final KStream<String, EmpModel> empModelStream = getMapOperator(empoutStream);
final KStream<String, EmpModel> empModelinput = getMapOperator(inputStream);
// empModelinput.print();
// empModelStream.print();
empModelStream.join(empModelinput, new ValueJoiner<EmpModel, EmpModel, Object>() {
@Override
public Object apply(EmpModel paramV1, EmpModel paramV2) {
System.out.println("Model1 "+paramV1.getKey());
System.out.println("Model2 "+paramV2.getKey());
return paramV1;
}
},JoinWindows.of("2000L"));
I get error:
Invalid topology building: KSTREAM-MAP-0000000003 and KSTREAM-MAP-0000000004 are not joinable
If you want to join two
KStreamsyou must ensure that both have the same number of partitions. (cf. "Note" box in http://docs.confluent.io/current/streams/developer-guide.html#joining-streams)If you use Kafka
v0.10.1+, repartitioning will happen automatically (cf. http://docs.confluent.io/current/streams/upgrade-guide.html#auto-repartitioning).For Kafka
v0.10.0.xyou have two options:.through("my-repartitioning-topic")to one of theKStreams before the join. You need to create the topic"my-repartioning-topic"with the right number of partitions (ie, same number of partitions as the secondKStream's original input topic) before you start your Streams application