My Spark Streaming context subscribes successfully to the Kafka topic that my Twitter producer publishes tweets to, but no messages from that topic are ever streamed into my Spark Streaming job. Here is my code:
def main(args: Array[String]){
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "127.0.0.1:9093",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "use_a_separate_group_id_for_each_stream",
"auto.offset.reset" -> "earliest", //earliest/latest
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val sparkConf = new SparkConf().setAppName("StreamTweets").setMaster("local[*]")
val ssc = new StreamingContext(sparkConf, Seconds(1))
val topics = List("topic_three")
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))
stream.map(record => (record.key, record.value))
stream.print()
stream.foreachRDD { rdd =>
rdd.foreach { record =>
val value = record.value()
val tweet = scala.util.parsing.json.JSON.parseFull(value)
val map:Map[String,Any] = tweet.get.asInstanceOf[Map[String, Any]]
println(map.get("text"))
}
}
ssc.start()
ssc.awaitTermination()