I am developing a Spark consumer application that consumes messages from a Kafka broker. I want to compute the average of the readings arriving at the Spark consumer, and finally I want to store that average in Cassandra.
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import scala.util.parsing.json.JSON

val Array(brokers, topics) = args
val sparkConf = new SparkConf().setAppName("MyDirectKafkaWordCount").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(20))

// Direct Kafka stream over the comma-separated topic list
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicsSet)
val lines = messages.map(_._2) // keep only the JSON payloads

// Number of messages in each batch
val count = lines.count()
count.print()

// Parse each payload and sum the "mVolts1" readings per batch
val total = lines.map(JSON.parseFull(_)
  .asInstanceOf[Option[List[Map[String, List[Map[String, Double]]]]]]
  .map(_(1)("MeterData").map(_("mVolts1")))
  .getOrElse(List()))
  .flatMap(list => list)
  .reduce((x, y) => x + y)
total.print()

// Problem line: both arguments of this reduce are elements of `total` itself
val avg = total.reduce((total, count) => total / count)
avg.print()

ssc.start()
ssc.awaitTermination()
In the above code, total and count come out exactly as I expect, but I am not able to compute the average, because count is a DStream[Long] and total is a DStream[Double].

I think this line is the problem: val avg = total.reduce((total, count) => total / count). Any help is appreciated.

Output: Count: the per-batch count I get in the stream, as a DStream[Long]. Total: the per-batch total I get in the same stream, as a DStream[Double].
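For completeness, here is a minimal sketch of one approach I am considering, under the assumption that the per-batch sum and count can be carried through a single DStream as (Double, Long) pairs instead of living in two separate streams. values below just rebuilds the flat-mapped DStream[Double] of mVolts1 readings from the code above:

val values = lines.map(JSON.parseFull(_)
  .asInstanceOf[Option[List[Map[String, List[Map[String, Double]]]]]]
  .map(_(1)("MeterData").map(_("mVolts1")))
  .getOrElse(List()))
  .flatMap(list => list)

// Pair each reading with a count of 1, combine (sum, count) pairs per batch,
// then divide once at the end, inside the same stream
val avgSketch = values
  .map(v => (v, 1L))
  .reduce { case ((s1, c1), (s2, c2)) => (s1 + s2, c1 + c2) }
  .map { case (sum, cnt) => sum / cnt }
avgSketch.print()

The point of the sketch is that reduce only ever combines elements of one stream, so the division has to happen in a map over a single stream that already holds both numbers.

For the final Cassandra write, a hedged sketch assuming spark-cassandra-connector is on the classpath; the keyspace, table, and column names (my_keyspace, averages, avg_mvolts) are hypothetical:

import com.datastax.spark.connector._
import com.datastax.spark.connector.streaming._

// Wrap each average in a one-column row and append it to the (assumed) table
avgSketch.map(Tuple1(_))
  .saveToCassandra("my_keyspace", "averages", SomeColumns("avg_mvolts"))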