I want to write a Spark Streaming job from Kafka to Elasticsearch, and I want to detect the schema dynamically while reading the data from Kafka.
Can you help me do that?
I know this can be done in Spark batch processing via the line below:
val schema = spark.read.json(dfKafkaPayload.select("value").as[String]).schema
But we cannot do the same in a Spark Streaming job, since a streaming query can have only one action.
Please let me know.
If you are reading from a Kafka topic, you cannot rely on Spark to automatically infer the JSON schema, since inferring it on the stream would take too much time. So you need to provide the schema to your application yourself.
If you are reading from a file source, however, schema inference is supported.
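A common workaround is to do a one-off *batch* read from the same Kafka topic when the job starts, infer the schema from that sample with exactly the `spark.read.json(...).schema` trick you mentioned, and then pass the inferred schema to `from_json` in the streaming query. A minimal sketch (the topic name `events`, broker `localhost:9092`, and index name `my-index` are placeholders; the Elasticsearch sink assumes the `elasticsearch-spark` connector is on the classpath):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json}

val spark = SparkSession.builder.appName("kafka-to-es").getOrCreate()
import spark.implicits._

// 1) One-off batch read of the data already in the topic, to infer the schema at startup.
val sample = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .load()
  .selectExpr("CAST(value AS STRING)")
  .as[String]

val schema = spark.read.json(sample).schema

// 2) Reuse the inferred schema in the streaming query.
val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .load()
  .select(from_json(col("value").cast("string"), schema).as("data"))
  .select("data.*")

// 3) Write to Elasticsearch.
stream.writeStream
  .format("es")
  .option("checkpointLocation", "/tmp/checkpoints/kafka-to-es")
  .start("my-index")
  .awaitTermination()
```

Note the trade-off: the schema is fixed at job start, so if producers later change the payload shape you would need to restart the job to re-infer it (or manage the schema externally, e.g. in a schema registry).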