I'm trying to join a KStream with a GlobalKTable by key, but with custom matching logic.
StreamsBuilder builder = new StreamsBuilder();
KStream<String, Integer> stream = builder.stream(inputTopic1); // key = "ABC"
GlobalKTable<String, Integer> table = builder.globalTable(inputTopic2); // key = "ABC"
stream.join(table, // join first by "ABC" = "ABC", then by "AB" = "AB", then by "A" = "A"
(key, value) -> key,
(valueLeft, valueRight) -> {/* identify by which condition the join was performed */});
For example, if the key = "ABC", then:
- first, join by the complete key - i.e. "ABC" = "ABC"
- then, if not joined, join by the first two symbols (one symbol removed) - i.e. "AB" = "AB"
- finally, try to join by only one symbol - i.e. "A" = "A"
Additionally, I need to know which condition the join was performed on, e.g. by 3 letters, by 2 letters, or by 1 letter.
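To pin down the intended rule, here is a plain-Java sketch with no Kafka involved. The class and method names are made up for illustration, a HashMap stands in for the table, and it assumes the table side already contains entries under the shortened keys.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper; none of these names come from Kafka Streams.
final class PrefixFallback {

    // A successful lookup: the table value plus how many key characters matched.
    static final class Match {
        final int value;
        final int matchedLength;

        Match(int value, int matchedLength) {
            this.value = value;
            this.matchedLength = matchedLength;
        }
    }

    // Try the full key first, then drop one trailing character at a time.
    // Assumes a non-null key.
    static Optional<Match> lookup(Map<String, Integer> table, String key) {
        for (int len = key.length(); len >= 1; len--) {
            Integer value = table.get(key.substring(0, len));
            if (value != null) {
                return Optional.of(new Match(value, len));
            }
        }
        return Optional.empty(); // no prefix of the key is present
    }

    public static void main(String[] args) {
        Map<String, Integer> table = new HashMap<>();
        table.put("AB", 20);
        table.put("A", 10);

        // "ABC" is absent, so the lookup falls back to "AB".
        lookup(table, "ABC").ifPresent(m ->
                System.out.println(m.value + " matched by " + m.matchedLength + " letters"));
    }
}
```

The matched length returned alongside the value is what I would like to have available inside the ValueJoiner.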
Is this possible at all, or should I look for a workaround? For example, I could make copies of the GlobalKTable with the corresponding keys (one table keyed by "ABC", one by "AB", and one by "A") and perform 3 separate joins. Any other suggestions are welcome.
Thanks in advance!
Using a series of left joins against multiple tables would be possible (if you know how often you want to try the join). If a join succeeds, you skip the next join. Using a combination of leftJoin() and branch() should allow you to split the stream after each join into "joined" and "retry". At the end, you can merge() the different result streams together if you want.
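A rough sketch of that chain, under several assumptions: the topic names ("input-topic-1", "table-by-3", "table-by-2", "table-by-1", "output-topic") are hypothetical, each table topic is assumed to be already keyed by prefixes of the given length, keys are non-null, and the Attempt class is an illustrative carrier rather than anything from the library. It uses pairs of filter() calls in place of branch(), since branch() is deprecated in newer releases in favor of split(); the split into "joined" and "retry" is the same.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

final class FallbackJoinSketch {

    // Hypothetical carrier for one join attempt; right == null means "no match".
    static final class Attempt {
        final Integer left;
        final Integer right;
        final int matchedLength; // 3, 2 or 1 on a match, 0 otherwise

        Attempt(Integer left, Integer right, int len) {
            this.left = left;
            this.right = right;
            this.matchedLength = (right == null) ? 0 : len;
        }
    }

    // First len characters of the key (the whole key if it is shorter).
    static String prefix(String key, int len) {
        return key.length() <= len ? key : key.substring(0, len);
    }

    static Consumed<String, Integer> consumed() {
        return Consumed.with(Serdes.String(), Serdes.Integer());
    }

    // One left join against a table keyed by prefixes of length len.
    static KStream<String, Attempt> attempt(KStream<String, Integer> in,
                                            GlobalKTable<String, Integer> table,
                                            int len) {
        return in.leftJoin(table,
                (key, value) -> prefix(key, len),
                (left, right) -> new Attempt(left, right, len));
    }

    // The "retry" side: unmatched records, restored to their original value.
    static KStream<String, Integer> retry(KStream<String, Attempt> attempts) {
        return attempts.filter((key, a) -> a.right == null)
                       .mapValues(a -> a.left);
    }

    static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, Integer> stream = builder.stream("input-topic-1", consumed());
        GlobalKTable<String, Integer> by3 = builder.globalTable("table-by-3", consumed());
        GlobalKTable<String, Integer> by2 = builder.globalTable("table-by-2", consumed());
        GlobalKTable<String, Integer> by1 = builder.globalTable("table-by-1", consumed());

        KStream<String, Attempt> try3 = attempt(stream, by3, 3);
        KStream<String, Attempt> try2 = attempt(retry(try3), by2, 2);
        KStream<String, Attempt> try1 = attempt(retry(try2), by1, 1);

        try3.filter((key, a) -> a.right != null)          // joined on 3 letters
            .merge(try2.filter((key, a) -> a.right != null)) // joined on 2 letters
            .merge(try1) // joined on 1 letter, plus final misses (matchedLength == 0)
            .mapValues(a -> a.left + "," + a.right + "," + a.matchedLength)
            .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));

        return builder.build();
    }
}
```

Because the KeyValueMapper passed to leftJoin() picks the lookup key, the stream itself never needs to be re-keyed; only the table topics have to exist under the shortened keys. Note that merge() gives no ordering guarantee between records coming from the different branches.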