Kafka Streams with Scala


I am trying to use Kafka Streams with Scala. Below is my Java code, which works perfectly fine:

    KStreamBuilder builder = new KStreamBuilder();
    KStream<String, String> textLines = builder.stream("TextLinesTopic");
    textLines.foreach((key, values) -> {
        System.out.println(values);
    });

    KafkaStreams streams = new KafkaStreams(builder, config);
    streams.start();

My Scala code is as follows:

  val builder = new KStreamBuilder
  val textLines:KStream[String, String]  = builder.stream("TextLinesTopic")
  textLines.foreach((key,value)-> {
   println(key)
  })

  val streams = new KafkaStreams(builder, config)
  streams.start()

The Scala code fails to compile with the following errors:

    Type mismatch, expected: ForeachAction[_ >: String, _ >: String], actual: ((Any, Any), Unit)
    not found: value key
    not found: value value

Does anyone know how to use the Streams API in Scala?

Answer by L.Lampart (best answer)

Your syntax is wrong :). In Scala, -> is just an operator for creating pairs, so the expression

(key,value)-> {
  println(key)
}

has type ((Any, Any), Unit), because the compiler cannot infer any type information. The names key and value are also not defined anywhere in that scope, which explains the two "not found" errors.
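
To make the difference concrete, here is a minimal sketch in plain Scala with no Kafka dependency. The object name ArrowVsLambda and the values pair and keyOf are made up for illustration only.

```scala
object ArrowVsLambda {
  // `->` comes from Predef.ArrowAssoc and simply pairs its two operands.
  val pair: (String, Int) = "answer" -> 42

  // `=>` separates a lambda's parameter list from its body.
  val keyOf: (String, String) => String = (key, value) => key

  def main(args: Array[String]): Unit = {
    println(pair == (("answer", 42))) // the arrow built an ordinary Tuple2
    println(keyOf("k1", "v1"))        // the lambda returns its first argument
  }
}
```

In the question's code the left-hand side (key, value) is therefore evaluated as a tuple of two existing values, not as a parameter list, which is why the compiler goes looking for variables named key and value.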

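If you are using Scala 2.12, replacing -> with => should solve the problem, because 2.12 can convert a lambda to a Java single-method interface. If you are using an older version of Scala, you will have to implement the Java interface (ForeachAction, as the error message says) explicitly: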

 textLines.foreach(new ForeachAction[String, String] { ... })
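
For Scala 2.11 and earlier, a fuller version of the explicit approach could look roughly like the sketch below. It assumes the pre-1.0 Kafka Streams API used in the question (KStreamBuilder), the kafka-streams library on the classpath, and a config object supplied by the caller. The names PrintKeys, printKey, and start are hypothetical.

```scala
import java.util.Properties
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.kstream.{ForeachAction, KStream, KStreamBuilder}

object PrintKeys {
  // Explicit implementation of the Java interface; no lambda conversion needed.
  val printKey: ForeachAction[String, String] = new ForeachAction[String, String] {
    override def apply(key: String, value: String): Unit = println(key)
  }

  // Assumption: `config` already carries the application id, the bootstrap
  // servers, and String serdes for keys and values.
  def start(config: Properties): KafkaStreams = {
    val builder = new KStreamBuilder
    val textLines: KStream[String, String] =
      builder.stream[String, String]("TextLinesTopic")
    textLines.foreach(printKey)

    val streams = new KafkaStreams(builder, config)
    streams.start()
    streams
  }
}
```

On Scala 2.12 the printKey value could instead be passed inline as a lambda written with =>, as described above.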
Answer by Mahesh Chand

You can print a KStream directly using its print method:

textLines.print()

This prints the records of the Kafka stream. You can also print only the keys or only the values by passing an argument to print.