KTable GroupBy Function Overload in Kotlin

295 Views Asked by At

Attempting to replicate a simple word count example for Kafka Streams.

val groupedByWord: KTable<String, Long> = source
        .flatMapValues<Any> { value: String ->
            listOf(
                value.lowercase(Locale.getDefault()).split("\\W+").toTypedArray()
            )
        }
        .groupBy( { (key: String, word: String) -> word }, Grouped.with(Serdes.String(), Serdes.String()) )

Unfortunately compilation fails due to the second parameter in the groupby being confused with Serialized.

None of the following functions can be called with the arguments supplied: 
public abstract fun <KR : Any!> groupBy(p0: KeyValueMapper<in String!, in Any!, TypeVariable(KR)!>!, p1: Grouped<TypeVariable(KR)!, Any!>!): KGroupedStream<TypeVariable(KR)!, Any!>! defined in org.apache.kafka.streams.kstream.KStream
public abstract fun <KR : Any!> groupBy(p0: KeyValueMapper<in String!, in Any!, TypeVariable(KR)!>!, p1: Serialized<TypeVariable(KR)!, Any!>!): KGroupedStream<TypeVariable(KR)!, Any!>! defined in org.apache.kafka.streams.kstream.KStream

I'm looking for suggestions on how to resolve this.

1

There are 1 best solutions below

2
On

if there's no type change from your original stream Serdes you should be able to just use the first arg and in Kotlin you can also remove the parentheses in this case, like :

groupBy { _, word -> word }