Consume Kafka in batches when all data has been pushed


I'm new to Kafka and I would like to do the following:

  • I have a bunch of servers that push some data to Kafka every 10 minutes.
  • I have a Spark application that needs the latest data pushed by all the servers.

E.g.: I have 2 servers that push 'a' and 'b', respectively. I need the Spark app to receive the values 'a' and 'b' in a single DataFrame so that they can be processed together. 10 minutes later, the 2 servers push 'c' and 'd', and the Spark app should receive the values 'c' and 'd' together as well, and so on.

My Spark application needs all the latest data pushed, so I believe that a streaming approach is not the right fit and that a batch approach (or whatever it may be called) should be taken instead.

My Spark app expects a DataFrame.


2 Answers


Your problem does not sound like a typical Kafka use case. However, if using Kafka is a must, you can use Kafka topics to group data. By creating topics A_B and C_D you ensure that the values 'a' and 'b' will be consumed together and kept separate from the values 'c' and 'd'. Your Spark app then has to verify that it got all the needed data from A_B and C_D before proceeding with execution. This design works only if your Spark application is able to buffer all the data and determine when all needed messages have been consumed.
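
A minimal sketch of that idea, assuming topics named A_B and C_D exist; the broker addresses and the process function are placeholders for your own setup and logic:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("grouped-consumer").getOrCreate()

# Read both grouping topics in one batch read (topic names are illustrative)
df = spark.read.format("kafka") \
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
    .option("subscribe", "A_B,C_D") \
    .option("startingOffsets", "earliest") \
    .load() \
    .selectExpr("topic", "CAST(value AS STRING) AS value")

# Check that both topics actually contributed data before running the logic
topics_seen = {row["topic"] for row in df.select("topic").distinct().collect()}
if {"A_B", "C_D"}.issubset(topics_seen):
    process(df)  # 'process' is a placeholder for your own logic
else:
    pass  # data incomplete: buffer, wait, or retry later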


When your app first starts, you need to save the current offsets of your Kafka topic. Then, every 10 minutes you load the data from Kafka and perform your logic. So you can basically run the following routine:

  • Store current offset
  • Wait 10 minutes
  • Init the DataFrame and load data from Kafka (in your case, for the first time it will load 'a' and 'b')
  • Perform your logic

When you load the DataFrame, you can specify the offsets and hence make sure that you get the data received in the past 10 minutes.

For example, you can do something like this to init your DataFrame:

import json

# saved_offset is the partition-0 offset you stored before waiting;
# -1 in endingOffsets means "latest"
df = spark.read.format("kafka") \
     .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
     .option("subscribe", "your_topic") \
     .option("startingOffsets", json.dumps({"your_topic": {"0": saved_offset}})) \
     .option("endingOffsets", json.dumps({"your_topic": {"0": -1}})) \
     .load()
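
Putting the whole routine together could look roughly like this. It is only a sketch: the initial offset snapshot uses the kafka-python client (an assumption, any consumer/admin client would do), and the topic name, broker list and the df.show() call stand in for your own setup and logic:

import json
import time
from kafka import KafkaConsumer, TopicPartition   # assumes kafka-python is installed
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

TOPIC = "your_topic"
BROKERS = "host1:port1,host2:port2"

spark = SparkSession.builder.appName("batch-every-10-min").getOrCreate()

# Step 1: store the current end offsets once, at startup
consumer = KafkaConsumer(bootstrap_servers=BROKERS.split(","))
partitions = [TopicPartition(TOPIC, p) for p in consumer.partitions_for_topic(TOPIC)]
saved_offsets = {str(tp.partition): off for tp, off in consumer.end_offsets(partitions).items()}
consumer.close()

while True:
    # Step 2: wait 10 minutes
    time.sleep(600)

    # Step 3: load everything pushed since the saved offsets
    df = spark.read.format("kafka") \
        .option("kafka.bootstrap.servers", BROKERS) \
        .option("subscribe", TOPIC) \
        .option("startingOffsets", json.dumps({TOPIC: saved_offsets})) \
        .option("endingOffsets", json.dumps({TOPIC: {p: -1 for p in saved_offsets}})) \
        .load()

    # Step 4: perform your logic (placeholder)
    df.selectExpr("CAST(value AS STRING)").show()

    # Advance the saved offsets to just past what was read
    for row in df.groupBy("partition").agg(F.max("offset").alias("max_offset")).collect():
        saved_offsets[str(row["partition"])] = row["max_offset"] + 1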

Alternatively, if you want to use a streaming application, the idea is pretty much the same. However, in this case you start at the current Kafka position (or store the starting offset once and start from it), and every additional query you run will continue where the previous one left off. So in this case you first init the DataFrame and then run the following routine:

  • Wait 10 minutes
  • Query the DataFrame and perform your logic

Initializing the DataFrame in this case will be as follows:

df = spark.readStream.format("kafka") \
     .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
     .option("subscribe", "your_topic") \
     .load()
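
With the streaming DataFrame above, one way to get the "every 10 minutes" cadence is to let Structured Streaming do the scheduling with a processing-time trigger and run your logic inside foreachBatch; process_batch and the checkpoint path below are placeholders:

def process_batch(batch_df, batch_id):
    # batch_df contains everything that arrived since the previous trigger
    batch_df.selectExpr("CAST(value AS STRING)").show()  # replace with your logic

query = df.writeStream \
    .foreachBatch(process_batch) \
    .option("checkpointLocation", "/tmp/your_app_checkpoint") \
    .trigger(processingTime="10 minutes") \
    .start()

query.awaitTermination()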

You can get more info here: https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html