I am trying to use Kafka Streams with Scala. Below is my code in Java, which works perfectly fine:
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> textLines = builder.stream("TextLinesTopic");
textLines.foreach((key,values) -> {
    System.out.println(values);
});
KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
My Scala code is as follows:
val builder = new KStreamBuilder
val textLines:KStream[String, String] = builder.stream("TextLinesTopic")
textLines.foreach((key,value)-> {
    println(key)
})
val streams = new KafkaStreams(builder, config)
streams.start()
The Scala code fails to compile with these errors:
Type mismatch, expected: ForeachAction[_ >: String, _ >: String], actual: ((Any, Any), Unit)
not found: value key
not found: value value
Does anyone know how to use the Streams API in Scala?
Your syntax is wrong :).
`->` is just an operator for creating pairs, so the expression `(key,value)-> {...}` has type ((Any, Any), Unit), because the compiler cannot infer any type information (and `key` and `value` are not defined anywhere, hence the "not found" errors).
If you are using Scala 2.12, replacing `->` with `=>` should solve the problem. If you are using an older version of Scala, you will have to implement the Java functional interface (here `ForeachAction`) explicitly:
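A minimal sketch of that explicit implementation, assuming the same pre-1.0 Kafka Streams API as in the question (`KStreamBuilder`) and that `ForeachAction` is the interface the compiler is asking for. The name `printKey` is made up for illustration, and the `KafkaStreams` setup with `config` is left exactly as in the question:

```scala
import org.apache.kafka.streams.kstream.{ForeachAction, KStream, KStreamBuilder}

val builder = new KStreamBuilder
val textLines: KStream[String, String] = builder.stream("TextLinesTopic")

// Anonymous class implementing the Java interface directly.
// This works on Scala 2.11 and older, where lambdas are not
// converted to Java functional interfaces automatically.
// `printKey` is a hypothetical name used only in this sketch.
val printKey = new ForeachAction[String, String] {
  override def apply(key: String, value: String): Unit =
    println(key)
}

textLines.foreach(printKey)

// On Scala 2.12 the same thing can be written as a lambda with `=>`:
// textLines.foreach((key: String, value: String) => println(key))
```

The parameter types are written out in the 2.12 variant to keep type inference out of the picture; they can probably be dropped, but I have not checked that against every Kafka version.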