spark-streaming-kafka-0-10 does not support message handler


My use case is to print the offset, partition, and topic of each record read from Kafka in a Spark Streaming application.

Currently, my code to create the discretized stream looks like this:

val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, params, topicPartition_map)
)

But to get metadata about each record, I believe I need to pass in a message handler, so I was expecting to use something like this:

val messageHandler = { mmd: MessageAndMetadata[String, String] => (mmd.topic, mmd.key, mmd.message) }

val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, params, topicPartition_map),
  messageHandler
)

or

val messageHandler = { mmd: MessageAndMetadata[String, String] => (mmd.topic, mmd.key, mmd.message) }

KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String, String)](
  ssc, params, topicPartition_map, messageHandler)

However, createDirectStream has no overload that accepts a message handler.

My sbt build settings are:

scalaVersion := "2.12.12"

val sparkVersion = "2.4.3"

libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion exclude("org.slf4j", "slf4j-log4j12") exclude("com.fasterxml.jackson.module", "*")
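
For reference, MessageAndMetadata and the message-handler overload appear to belong to the older 0-8 integration. With spark-streaming-kafka-0-10, each element of the stream is already a ConsumerRecord, which carries its own topic, partition, and offset, so a handler may not be needed at all. A minimal sketch, assuming the stream created in the first snippet above; the object name RecordMetadata and the helpers describe and printMetadata are hypothetical names chosen for illustration:

```scala
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.InputDStream

// Hypothetical helper object, not part of Spark or Kafka.
object RecordMetadata {

  // Format the metadata that a single ConsumerRecord already carries.
  def describe(r: ConsumerRecord[String, String]): String =
    s"topic=${r.topic} partition=${r.partition} offset=${r.offset} key=${r.key} value=${r.value}"

  // Print one line per record. rdd.foreach runs on the executors,
  // so the output goes to the executor logs, not the driver console.
  def printMetadata(stream: InputDStream[ConsumerRecord[String, String]]): Unit =
    stream.foreachRDD { rdd =>
      rdd.foreach(r => println(describe(r)))
    }
}
```

Calling RecordMetadata.printMetadata(stream) before ssc.start() would register the output operation. If tuples like those the old handler produced are wanted instead, stream.map(r => (r.topic, r.key, r.value)) should give an equivalent DStream.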
