Kafka streams with Scala

1.8k Views Asked by At

I am trying to use kafka streams with scala below is my code in Java which works perfectly fine:

KStreamBuilder builder = new KStreamBuilder();
    KStream<String, String> textLines = builder.stream("TextLinesTopic");
    textLines.foreach((key,values) -> {
        System.out.println(values);
    });

    KafkaStreams streams = new KafkaStreams(builder, config);
    streams.start();

My scala is code as follows:

  val builder = new KStreamBuilder
  val textLines:KStream[String, String]  = builder.stream("TextLinesTopic")
  textLines.foreach((key,value)-> {
   println(key)
  })

  val streams = new KafkaStreams(builder, config)
  streams.start()

The scala code throws compilation error. Type mismatch expected:ForEachAction[>String,>String],Actual((any,any), Unit) not found:value key not found: value value

Does anyone know how to use streams API in scala

2

There are 2 best solutions below

0
On BEST ANSWER

Your syntax is wrong :). -> is just operator for creating pairs, so expression

(key,value)-> {
  println(key)
}

Has type ((Any, Any), Unit) because compiler cannot infer any type information (and key and value are missing)

If you are using scala 2.12 replacing -> with => should solve the problem, but if you are using older version of scala, you will have to implement java bifunction explicitly:

 textLines.foreach(new BiFunction[T1, T2] { ... })
0
On

You can directly print kafkastream using print method.

textlines.print

It will print the kafka stream. You can even print either key or values by passing argument to print function.