I have a Kafka queue from which I read data as shown below:
private static void startKafkaConsumerStream() {
    try {
        System.out.println("Print method: startKafkaConsumerStream");
        Dataset<String> lines = (Dataset<String>) _spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", getProperty("kafka.bootstrap.servers"))
                .option("subscribe", HTTP_FED_VO_TOPIC)
                .option("startingOffsets", "latest")
                .load()
                .selectExpr("CAST(value AS STRING)")
                .as(Encoders.STRING());
        StreamingQuery query = lines.writeStream()
                .outputMode("append")
                .format("console")
                .start();
        query.awaitTermination();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
The requirement: With the above code I can print the records to the console, but I don't know how to pass them to a method that will process them.
I looked through the documentation but could not find anything relevant. I am new to this, so the question may sound a bit silly, but I am stuck and would highly appreciate any hints.
Goal of the app: The app accepts a request and sends it to Kafka. A Kafka reader running in a separate thread then reads and processes the request and writes the output to another Kafka queue. I am only implementing this; the architecture is not my idea.
You can use a ForeachWriter[T] on the sink side of the Kafka streaming application to process each row of your query, like this:
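Since the question is written in Java, here is a minimal Java sketch of the open/process/close lifecycle that such a writer follows. To keep it runnable without Spark on the classpath, it uses a hypothetical stand-in class, ForeachWriterStandIn, that only mirrors the three callbacks of Spark's org.apache.spark.sql.ForeachWriter<T>. RecordProcessor and handleRequest are likewise made-up names for illustration, and the upper-casing stands in for whatever processing the app really needs.

```java
import java.util.ArrayList;
import java.util.List;

public class ForeachSketch {

    // Hypothetical stand-in that mirrors the three callbacks of Spark's
    // org.apache.spark.sql.ForeachWriter<T>, so the sketch runs without Spark.
    public abstract static class ForeachWriterStandIn<T> {
        public abstract boolean open(long partitionId, long epochId);
        public abstract void process(T value);
        public abstract void close(Throwable errorOrNull);
    }

    // Hypothetical writer that hands every record to a processing method.
    public static class RecordProcessor extends ForeachWriterStandIn<String> {
        // Collected here only so the result can be inspected in this sketch.
        public final List<String> processed = new ArrayList<>();

        @Override
        public boolean open(long partitionId, long epochId) {
            // Acquire per-partition resources here (for example a Kafka producer).
            return true; // true means: go ahead and process this partition's rows
        }

        @Override
        public void process(String value) {
            // Called once per row; this is where the record reaches your method.
            processed.add(handleRequest(value));
        }

        @Override
        public void close(Throwable errorOrNull) {
            // Release whatever open() acquired; errorOrNull is null on success.
        }

        // Placeholder business logic (an assumption, not part of the question).
        static String handleRequest(String request) {
            return request.trim().toUpperCase();
        }
    }

    public static void main(String[] args) {
        // Simulates what the engine does for one partition of one micro-batch.
        RecordProcessor writer = new RecordProcessor();
        if (writer.open(0L, 0L)) {
            for (String record : new String[] {" first ", "second"}) {
                writer.process(record);
            }
        }
        writer.close(null);
        System.out.println(writer.processed); // prints [FIRST, SECOND]
    }
}
```

With Spark on the classpath, the writer would extend org.apache.spark.sql.ForeachWriter<String> instead of the stand-in and be attached with lines.writeStream().foreach(new RecordProcessor()).start() in place of .format("console"). Spark serializes the writer and runs it on the executors, so anything it collects or opens lives there rather than on the driver.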