KStream-KStream join based on a common field


We want to perform a KStream-KStream join based on a common field (primary key). With the code below, the result is just a merge of the two streams, without any primary-key constraint.

val userRegions: KStream[String, String] = builder.stream(inputTopic1)
val regionMetrics: KStream[String, String] = builder.stream(inputTopic2)


userRegions.join(regionMetrics)(
  ((regionValue, metricValue) => regionValue + "/" + metricValue),
  JoinWindows.of(Duration.ofMinutes(5L))
).to(outputTopicName)

Could you please suggest how to join two streams based on a common field/column?

Answer by J. B

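To join the two streams on a common field/column, you can re-key both streams with selectKey() so that the common field becomes the record key. A KStream-KStream join always matches records by key (within the join window), so once both sides are keyed by the same field, only records with equal keys are joined. Kafka Streams repartitions the re-keyed streams automatically before the join. Here is a snippet that can help: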

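import java.time.Duration
import org.apache.kafka.streams.kstream.JoinWindows
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.kstream.KStream
// Kafka 2.7+; older versions use org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.serialization.Serdes._

val userRegions: KStream[String, String] = builder.stream(inputTopic1)
val regionMetrics: KStream[String, String] = builder.stream(inputTopic2)

// New code: placeholders, replace each body with logic that extracts
// the common field (e.g. the region) from that stream's value format.
def userRegionKey(value: String): String = ???
def regionMetricKey(value: String): String = ???

// In the Scala DSL, selectKey takes a (key, value) => newKey function.
val userRegionsWithKeys: KStream[String, String] =
  userRegions.selectKey((_, value) => userRegionKey(value))

val regionMetricsWithKeys: KStream[String, String] =
  regionMetrics.selectKey((_, value) => regionMetricKey(value))

// Both streams are now keyed by the common field, so the join only pairs
// records with equal keys whose timestamps are within 5 minutes.
userRegionsWithKeys.join(regionMetricsWithKeys)(
  (regionValue, metricValue) => regionValue + "/" + metricValue,
  JoinWindows.of(Duration.ofMinutes(5L))
).to(outputTopicName)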
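To see what the re-keyed join does, here is a plain-Scala model with no Kafka dependency. It imitates selectKey on both sides followed by an inner join with a symmetric 5-minute window, ignoring grace periods and repartitioning. The `Rec` type, the helper names and the `userId:region` / `region:metric` value formats are hypothetical, chosen only for illustration. It runs as a Scala script (e.g. a `.sc` file with scala-cli).

```scala
// Plain-Scala model (no Kafka): re-key both sides, then a windowed inner join.
// Rec and the value formats below are hypothetical, for illustration only.
final case class Rec(key: String, value: String, timestampMs: Long)

// Hypothetical formats: userRegions value = "userId:region",
// regionMetrics value = "region:metric".
def regionOfUser(value: String): String = value.split(":")(1)
def regionOfMetric(value: String): String = value.split(":")(0)

// Model of selectKey: replace the record key, keep value and timestamp.
def selectKey(recs: Seq[Rec])(f: String => String): Seq[Rec] =
  recs.map(r => r.copy(key = f(r.value)))

// Model of an inner windowed join: pair records with equal keys whose
// timestamps differ by at most windowMs (bounds inclusive, no grace period).
def windowedJoin(left: Seq[Rec], right: Seq[Rec], windowMs: Long)(
    joiner: (String, String) => String): Seq[(String, String)] =
  for {
    l <- left
    r <- right
    if l.key == r.key && math.abs(l.timestampMs - r.timestampMs) <= windowMs
  } yield (l.key, joiner(l.value, r.value))

val fiveMinutesMs = 5L * 60 * 1000

val userRegions = Seq(
  Rec("u1", "alice:eu", 0L),
  Rec("u2", "bob:us", 0L)
)
val regionMetrics = Seq(
  Rec("m1", "eu:42", 60000L),      // same region, 1 minute later: joins
  Rec("m2", "us:7", 10 * 60000L),  // same region, 10 minutes later: outside window
  Rec("m3", "apac:1", 0L)          // no user in this region: no output
)

val joined = windowedJoin(
  selectKey(userRegions)(regionOfUser),
  selectKey(regionMetrics)(regionOfMetric),
  fiveMinutesMs
)((regionValue, metricValue) => regionValue + "/" + metricValue)

joined.foreach(println)
// prints (eu,alice:eu/eu:42)
```

Only the `eu` pair is emitted: `bob`'s metric arrives outside the window and `apac` has no partner, which is the key-based constraint the question asks for.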

I hope this solution helps you.

Thank you