I have a Kafka queue from which I read data as shown below:
private static void startKafkaConsumerStream() {
        try {
            System.out.println("Print method: startKafkaConsumerStream");
            Dataset<String> lines = (Dataset<String>) _spark
                    .readStream()
                    .format("kafka")
                    .option("kafka.bootstrap.servers", getProperty("kafka.bootstrap.servers"))
                    .option("subscribe", HTTP_FED_VO_TOPIC)
                    .option("startingOffsets", "latest")
                    .load()
                    .selectExpr("CAST(value AS STRING)")
                    .as(Encoders.STRING());
            StreamingQuery query = lines.writeStream()
                    .outputMode("append")
                    .format("console")
                    .start();
            query.awaitTermination();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
The Requirement: With the above code, I am able to print records to the console. However, I am stuck on how to pass these records to a method that will process them.
I looked through the documentation but could not find anything relevant. As I am new to this, the question might sound a bit silly, but I am stuck and would highly appreciate any hints.
Goal of the app: The app accepts a request and sends it to Kafka. A Kafka reader running in a separate thread is then responsible for reading and processing the request and producing the output to another Kafka queue. I am only implementing this; the architecture is not my idea.
 
                        
You can use a ForeachWriter[T] on the sink side of the Kafka streaming application to process each row of your query, like this:
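A minimal Java sketch of that idea, assuming Spark 2.4 or later (where `open` takes a partition id and an epoch id) and the `Dataset<String> lines` from the question. The names `ForeachSinkSketch`, `RecordWriter`, `handleRecord` and `startProcessing` are hypothetical and only there for illustration; the trimming inside `handleRecord` is a stand-in for whatever processing you actually need.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.ForeachWriter;
import org.apache.spark.sql.streaming.StreamingQuery;

public class ForeachSinkSketch {

    // Hypothetical processing method: replace the body with your own logic.
    static String handleRecord(String value) {
        return value == null ? "" : value.trim();
    }

    // Spark calls open() once per partition and epoch, process() once per row,
    // and close() when the partition is finished or an error occurred.
    static class RecordWriter extends ForeachWriter<String> {
        @Override
        public boolean open(long partitionId, long epochId) {
            // Open connections or other resources here; returning true
            // tells Spark to go ahead and call process() for this partition.
            return true;
        }

        @Override
        public void process(String value) {
            // Each Kafka record value arrives here as a String.
            String result = handleRecord(value);
            System.out.println("Processed: " + result);
        }

        @Override
        public void close(Throwable errorOrNull) {
            // Release whatever open() acquired; errorOrNull is non-null on failure.
        }
    }

    // Attaches the writer as the sink in place of format("console").
    static StreamingQuery startProcessing(Dataset<String> lines) throws Exception {
        return lines.writeStream()
                .outputMode("append")
                .foreach(new RecordWriter())
                .start();
    }
}
```

In the question's method you would replace the `format("console")` sink with `.foreach(new RecordWriter())` and keep `query.awaitTermination()`. Note that the writer is serialized and runs on the executors, so anything it holds should be created inside `open` rather than in the constructor.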