My use case is to print the offset, partition, and topic of each record read from Kafka in a Spark Streaming application.
Currently, my code to create the discretized stream looks like this:
val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, params, topicPartition_map)
)
But to get metadata about each record, I believe I need to pass in a message handler, so I was expecting to use something like this:
val messageHandler = { mmd: MessageAndMetadata[String, String] => (mmd.topic, mmd.key, mmd.message) }
val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, params, topicPartition_map),
  messageHandler
)
or
val messageHandler = { mmd: MessageAndMetadata[String, String] => (mmd.topic, mmd.key, mmd.message) }
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String, String)](
  ssc, params, topicPartition_map, messageHandler)
But no createDirectStream overload that accepts a message handler is available.
My sbt dependency is:
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion exclude("org.slf4j","slf4j-log4j12") exclude("com.fasterxml.jackson.module","*")
scalaVersion := "2.12.12"
val sparkVersion = "2.4.3"
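One thing worth noting about what I am after: the message-handler overloads belong to the older spark-streaming-kafka-0-8 API, whereas the 0-10 stream above already yields ConsumerRecord objects, which expose topic(), partition() and offset() themselves. So the output I want might be reachable without a handler at all. A minimal sketch of that idea, where the RecordMetadata object and its describe and printMetadata helpers are hypothetical names of mine and not part of Spark or Kafka:

```scala
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.InputDStream

// Hypothetical helper object, for illustration only.
object RecordMetadata {
  // Formats the metadata that every ConsumerRecord already carries.
  def describe(record: ConsumerRecord[String, String]): String =
    s"topic=${record.topic} partition=${record.partition} offset=${record.offset}"

  // Prints one line per record. The println runs on the executors,
  // so outside local mode the output ends up in the executor logs.
  def printMetadata(stream: InputDStream[ConsumerRecord[String, String]]): Unit =
    stream.foreachRDD { rdd =>
      rdd.foreach(record => println(describe(record)))
    }
}
```

With the stream from my first snippet this would be called as RecordMetadata.printMetadata(stream), before ssc.start().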