We want to do Kstream-Kstream join based on the common Field(primary key). Currently with the below code we are getting result as just merging 2 Streams without any primary key constraint.
val userRegions: KStream[String, String] = builder.stream(inputTopic1)
val regionMetrics: KStream[String, String] = builder.stream(inputTopic2)
userRegions.join(regionMetrics)(
((regionValue, metricValue) => regionValue + "/" + metricValue),
JoinWindows.of(Duration.ofMinutes(5L))
).to(outputTopicName)
Could you please suggest how to join 2 Streams based on common field/Column.
In order to join the 2 streams based on a common field/column you can use the selectKey() function, I will provide you an snipped that can help you.
I hope this solution helps you.
Thank you