I am new to Kafka Streams and I have the following scenario: a topic contains records of type Event, some of which carry additional info for an earlier Event (something like an update). The two records are produced almost simultaneously by the source and share the same key. I need to combine the two into a single updated Event and write it to a new topic.
So far I have done something like the following. Is it OK to join two KStreams that were created with .split() in the same topology? Is this a valid approach?
StreamsBuilder builder = new StreamsBuilder();
KStream<String, Event> eventsStream = builder.stream("events-topic");

Predicate<String, Event> isEventType = (key, value) -> value.getType().equals("event");
Predicate<String, Event> isUpdateType = (key, value) -> value.getType().equals("update");

// Merge the update's text into the original event.
// Note: mutating the left-hand value in place is risky (it may be cached/reused);
// building and returning a new Event is safer.
ValueJoiner<Event, Event, Event> joiner = (event, update) -> {
    event.setText(event.getText() + update.getAdditionalText());
    return event;
};

// Split events into sub-streams; the Named prefix "type_" is prepended
// to each branch name, so the map keys are "type_events", "type_updates", "type_other".
Map<String, KStream<String, Event>> subStreams = eventsStream
    .split(Named.as("type_"))
    .branch(isEventType, Branched.as("events"))
    .branch(isUpdateType, Branched.as("updates"))
    .defaultBranch(Branched.as("other"));

// Join the sub-stream of events with the sub-stream of updates
// (JoinWindows.of is deprecated in newer releases in favor of
// JoinWindows.ofTimeDifferenceWithNoGrace).
subStreams.get("type_events").join(
    subStreams.get("type_updates"),
    joiner,
    JoinWindows.of(Duration.ofMillis(10))
).to("out-topic");
Presently, Kafka Streams has a limitation in that it does not support self-joins; see https://issues.apache.org/jira/browse/KAFKA-7497.
Given that restriction, splitting the topic into two streams and then joining them is a suitable solution!
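One caveat with the code above: the ValueJoiner mutates the left-hand Event in place, which can cause surprises if the same object is cached or reused by the framework. The merge semantics can be sketched in plain Java, with a hypothetical stand-in for the asker's Event type, returning a fresh object instead of mutating:

```java
import java.util.function.BiFunction;

public class JoinerSketch {
    // Minimal hypothetical stand-in for the Event type from the question.
    static class Event {
        final String type;
        final String text;
        final String additionalText;
        Event(String type, String text, String additionalText) {
            this.type = type;
            this.text = text;
            this.additionalText = additionalText;
        }
    }

    // Same merge logic as the ValueJoiner in the topology, but building a
    // new Event rather than mutating the left-hand value in place.
    static final BiFunction<Event, Event, Event> joiner = (event, update) ->
        new Event(event.type, event.text + update.additionalText, null);

    public static void main(String[] args) {
        Event event = new Event("event", "hello", null);
        Event update = new Event("update", null, " world");
        Event merged = joiner.apply(event, update);
        System.out.println(merged.text); // prints "hello world"
    }
}
```

The same immutable style drops straight into the topology: have the ValueJoiner construct and return a new Event from the two inputs.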