How to handle a one-to-many relationship using Kafka Streams join operations


Can you please help me achieve this using Kafka Streams?

Scenario: Group all the invoices for an order. In real-time streaming, invoices may arrive late, so we want to wait 20 minutes to collect all the invoices before joining them with the order.

Example: Order 'x' has 3 invoices, which we expect to receive within a 20-minute period.

Expected output: The order and its 3 invoices should be available as a single record in the output topic.

We have the below topology to achieve this.

  1. We have the order stream and the invoice stream as separate streams.

  2. We group the invoices by order key, using 20-minute tumbling windows.

  3. We join the order data with the generated group of invoices.

  4. We write the output to a new topic.

Problem: Step 3 does not wait for step 2 to complete. The join is performed as soon as the order is received, so we do not get the expected output.

We tried to achieve the same using join windows, but since join windows are sliding windows, we get duplicate data in the output topic.

For the above example, if we use join windows instead of tumbling windows, we get 3 output records, in which the order has 1 invoice, 2 invoices, and 3 invoices respectively.

Please help me resolve this issue or suggest an alternative approach.

Code snippet:

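 // Step 2: collect the invoices for each order key in 20-minute tumbling windows.
 KTable<Windowed<String>, List<InvoiceList>> invoiceList = invoiceStream
         .groupByKey()
         .windowedBy(TimeWindows.of(Duration.ofSeconds(1200)))
         .aggregate(
                 () -> new ArrayList<InvoiceList>(),
                 (key, newValue, agg) -> {
                     agg.add(newValue); // append the new invoice to the running list
                     return agg;
                 },
                 // as(...) names the store; the instance methods keep that name while setting the serdes
                 Materialized.<String, List<InvoiceList>, WindowStore<Bytes, byte[]>>as("invoice-list")
                         .withKeySerde(Serdes.String())
                         .withValueSerde(new ArrayListSerde<InvoiceList>(AppSerdes.InvoiceList())));

 // Step 3: join each order with the grouped invoices.
 KStream<String, Order> orderOutput = orderStream.join(invoiceList, Joiner);

 // Step 4: write the joined result to the output topic.
 orderOutput.to(AppConfig.OutputTopic.OUTPUT_ORDER, Produced.with(Serdes.String(), AppSerdes.Order()));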

There are 2 best solutions below

Answer by S.N

I assume the order arrives first and then the invoice(s), not the other way around. If that assumption is right, your logic won't work: by the time the order arrives in your KStream, there may not be any invoices yet, so the join doesn't fetch any. Please remember that KStream-KTable joins are non-windowed joins and work like lookups against a KTable (changelog stream).
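To make the lookup behaviour concrete, here is a minimal plain-Java simulation. It is only a sketch and does not use the Kafka Streams API: the class `LookupJoinDemo` and its methods are hypothetical names, the map stands in for the KTable, and the list stands in for the output topic. It assumes inner-join semantics, where only stream-side records trigger output.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of lookup-style (stream-table) join semantics.
final class LookupJoinDemo {
    // Stands in for the KTable: invoices seen so far per order key.
    private final Map<String, List<String>> invoiceTable = new HashMap<>();
    // Stands in for the output topic.
    private final List<String> output = new ArrayList<>();

    // Table side: an invoice only updates the table; it never produces join output.
    void onInvoice(String orderKey, String invoiceId) {
        invoiceTable.computeIfAbsent(orderKey, k -> new ArrayList<>()).add(invoiceId);
    }

    // Stream side: the order is joined immediately against whatever the table holds right now.
    void onOrder(String orderKey) {
        List<String> invoices = invoiceTable.get(orderKey);
        if (invoices != null) { // inner join: no table entry means no output
            output.add(orderKey + "=" + invoices);
        }
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        LookupJoinDemo demo = new LookupJoinDemo();
        demo.onOrder("x");            // order arrives first: the table is empty, nothing is emitted
        demo.onInvoice("x", "inv-1");
        demo.onInvoice("x", "inv-2"); // later invoices do not re-trigger the join
        System.out.println(demo.output()); // prints []

        demo.onOrder("x");            // a later stream record sees only what has arrived so far
        System.out.println(demo.output()); // prints [x=[inv-1, inv-2]]
    }
}
```

In this model the order that arrives before its invoices is simply lost from the output, which matches the behaviour described in the question: the join does not wait for the grouping step.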

Answer by Devika Elangovan

This join didn't work in our case, so we consumed the data as two separate streams and added custom logic in the consumer to handle our use case.

Thanks!
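The answer does not show its custom logic, so the following is only one possible sketch of such consumer-side buffering, not the author's implementation. The class `OrderInvoiceBuffer`, its method names, and the string-based records are all hypothetical. It assumes the wait starts when the first record for a key is seen, and that the caller invokes `flush` periodically with the current time.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical buffer: holds an order and its invoices, then emits them once as a single record.
final class OrderInvoiceBuffer {
    private static final class Pending {
        String order;                                   // null until the order record arrives
        final List<String> invoices = new ArrayList<>();
        final Instant firstSeen;                        // arrival time of the first record for this key

        Pending(Instant firstSeen) {
            this.firstSeen = firstSeen;
        }
    }

    private final Duration wait;
    private final Map<String, Pending> pending = new LinkedHashMap<>();

    OrderInvoiceBuffer(Duration wait) {
        this.wait = wait;
    }

    void onOrder(String key, String order, Instant now) {
        pending.computeIfAbsent(key, k -> new Pending(now)).order = order;
    }

    void onInvoice(String key, String invoice, Instant now) {
        pending.computeIfAbsent(key, k -> new Pending(now)).invoices.add(invoice);
    }

    // Emits one combined record for every key whose wait has elapsed and whose order is known.
    // Keys that never receive an order stay buffered; a real system would need a policy for them.
    List<String> flush(Instant now) {
        List<String> out = new ArrayList<>();
        Iterator<Map.Entry<String, Pending>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Pending p = it.next().getValue();
            boolean expired = !now.isBefore(p.firstSeen.plus(wait));
            if (expired && p.order != null) {
                out.add(p.order + " -> " + p.invoices);
                it.remove();
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Instant t0 = Instant.parse("2024-01-01T00:00:00Z");
        OrderInvoiceBuffer buffer = new OrderInvoiceBuffer(Duration.ofMinutes(20));
        buffer.onOrder("x", "order-x", t0);
        buffer.onInvoice("x", "inv-1", t0.plusSeconds(60));
        buffer.onInvoice("x", "inv-2", t0.plusSeconds(300));
        buffer.onInvoice("x", "inv-3", t0.plusSeconds(900));
        System.out.println(buffer.flush(t0.plusSeconds(600)));  // prints []
        System.out.println(buffer.flush(t0.plusSeconds(1200))); // prints [order-x -> [inv-1, inv-2, inv-3]]
    }
}
```

Because each key is removed when it is emitted, the order appears once with all invoices collected during the wait, rather than once per invoice. Within Kafka Streams itself, a similar idea could probably be expressed with the Processor API, a state store, and a scheduled punctuation, though that is not shown here.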