Kafka Streams: joining two KTables invokes the join function twice


I am trying to join two KTables.

KTable<String, RecordBean> recordsTable = builder.table(Serdes.String(),
    new JsonPOJOSerde<>(RecordBean.class),
    bidTopic, RECORDS_STORE);

KTable<String, ImpressionBean> impressionsTable = builder.table(Serdes.String(),
    new JsonPOJOSerde<>(ImpressionBean.class),
    impressionTopic, IMPRESSIONS_STORE);

KTable<String, RecordBean> mergedByTxId = recordsTable
    .join(impressionsTable, merge());

The merge function is very simple: I am just copying a value from one bean to the other.

public static <K extends BidInfo, V extends BidInfo> ValueJoiner<K, V, K> merge() {
  // Copy the winning bid amount from the second value (v2) onto the first (v1).
  return (v1, v2) -> {
    v1.setRtbWinningBidAmount(v2.getRtbWinningBidAmount());
    return v1;
  };
}

But for some reason the join function is called twice for a single produced record. The streams and producer configs are below.

Properties streamsConfiguration = new Properties();
streamsConfiguration
    .put(StreamsConfig.APPLICATION_ID_CONFIG, "join-impressions");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());

streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zookeeperConnect());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, folder.newFolder("kafka-streams-tmp")
    .getAbsolutePath());

return streamsConfiguration;

Producer config:

Properties producerConfig = new Properties();
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

return producerConfig;

Next, I submit a single record to each topic. Both records have the same key. I expect to receive a single record as output.

IntegrationTestUtils.produceKeyValuesSynchronously(bidsTopic,
    Arrays.asList(new KeyValue("1", getRecordBean("1"))),
    getProducerProperties());

IntegrationTestUtils.produceKeyValuesSynchronously(impressionTopic,
    Arrays.asList(new KeyValue("1", getImpressionBean("1"))),
    getProducerProperties());

List<KeyValue<String, String>> parsedRecord =
    IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(getConsumerProperties(),
        outputTopic, 1);

But the ValueJoiner is triggered twice, and I get 2 identical output records instead of one. At the time it is triggered, the values from both topics already exist, and I cannot work out what triggers the second execution.

Without the join, I cannot reproduce this behavior. I cannot find any working example of a join between 2 KTables, so I cannot tell what is wrong with my approach.

Here is some simple code that demonstrates the same behavior:

KStreamBuilder builder = new KStreamBuilder();

KTable<String, String> first = builder.table("stream1", "storage1");
KTable<String, String> second = builder.table("stream2", "storage2");

KTable<String, String> joined = first.join(second, (value1, value2) -> value1);

joined.to("output");

KafkaStreams streams = new KafkaStreams(builder, getStreamingProperties());

streams.start();

IntegrationTestUtils.produceKeyValuesSynchronously("stream1",
    Arrays.asList(new KeyValue("1", "first stream")),
    getProducerProperties());

IntegrationTestUtils.produceKeyValuesSynchronously("stream2",
    Arrays.asList(new KeyValue("1", "second stream")),
    getProducerProperties());

List<KeyValue<String, String>> parsedRecord =
    IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(getConsumerProperties(),
        "output", 1);

There are 2 best solutions below

Best answer

I got the following explanation after posting a similar question to the Confluent mailing list.

I think this might be related to caching. The caches for the 2 tables are flushed independently, so there is a chance you will get the same record twice. If stream1 and stream2 both receive a record for the same key, and the cache flushes, then:

The cache from stream1 will flush, perform the join, and produce a record.

The cache from stream2 will flush, perform the join, and produce a record.

Technically this is ok as the result of the join is another KTable, so the value in the KTable will be the correct value.
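The two flush steps above can be sketched with a toy model in plain Java. This is only an illustration of the explanation, not how Kafka Streams is implemented: the ToyTable class, its store and cache maps, and the flush method are all hypothetical names invented for this sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CacheFlushJoinSketch {

    // Hypothetical stand-in for one KTable: current values plus a cache of unflushed keys.
    static class ToyTable {
        final Map<String, String> store = new HashMap<>();
        final Map<String, String> cache = new HashMap<>();

        void put(String key, String value) {
            store.put(key, value);  // value is immediately visible to lookups from the other side
            cache.put(key, value);  // key stays "dirty" until this table's cache is flushed
        }
    }

    // Flushing one side re-runs the inner join for each dirty key against the other side.
    static void flush(ToyTable flushing, ToyTable other, boolean flushingIsLeft, List<String> output) {
        for (Map.Entry<String, String> entry : flushing.cache.entrySet()) {
            String otherValue = other.store.get(entry.getKey());
            if (otherValue == null) {
                continue; // inner join: no match on the other side, no output
            }
            String left = flushingIsLeft ? entry.getValue() : otherValue;
            String right = flushingIsLeft ? otherValue : entry.getValue();
            output.add(entry.getKey() + "=" + left + "+" + right);
        }
        flushing.cache.clear();
    }

    static List<String> run() {
        ToyTable first = new ToyTable();
        ToyTable second = new ToyTable();
        List<String> output = new ArrayList<>();

        // Both tables receive a record for key "1" before either cache is flushed.
        first.put("1", "first stream");
        second.put("1", "second stream");

        // The caches are flushed independently, so the join runs once per flush.
        flush(first, second, true, output);
        flush(second, first, false, output);
        return output;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this model each flush sees both values, so the output list contains the same joined record twice, which matches the two identical records described in the question.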

After setting StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG to 0, the issue was resolved. I still get 2 records, but now one of them is joined with null, which is much clearer behavior according to the join semantics document that was mentioned above.
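As a minimal sketch of that setting, the snippet below builds the streams properties with plain string keys so that it runs without the Kafka jars. "cache.max.bytes.buffering" is the string behind StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG; the class name and the localhost:9092 bootstrap address are placeholders assumed for illustration.

```java
import java.util.Properties;

public class DisableCacheConfigSketch {

    // String value of StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG.
    static final String CACHE_MAX_BYTES_BUFFERING = "cache.max.bytes.buffering";

    static Properties streamsConfiguration() {
        Properties config = new Properties();
        config.put("application.id", "join-impressions");
        config.put("bootstrap.servers", "localhost:9092"); // placeholder address (assumption)
        // 0 bytes of record cache: every table update is forwarded downstream immediately.
        config.put(CACHE_MAX_BYTES_BUFFERING, 0);
        return config;
    }

    public static void main(String[] args) {
        System.out.println(streamsConfiguration().get(CACHE_MAX_BYTES_BUFFERING));
    }
}
```

In a real application the same put call would go into the streamsConfiguration object from the question, using the StreamsConfig constant instead of the string literal. Note that disabling the cache means more records are forwarded downstream, so it trades throughput for more predictable output.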

Another answer

I found the same behavior using leftJoin between two KTables and stumbled upon this post after googling. I don't know which version of kafka-streams you were using, but after debugging the Confluent code, it seems that kafka-streams version 2.0.1 deliberately sends both the old and the new value in certain types of joins, so you get two calls to the ValueJoiner.

Take a look at the implementation of org.apache.kafka.streams.kstream.internals.KTableImpl#buildJoin, which constructs the join topology, as well as org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin.KTableKTableRightJoinProcessor#process, which dispatches it at runtime. The join is clearly performed twice in some scenarios.
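To make the old/new value idea concrete, here is a simplified model in plain Java. It is a sketch of my reading of the behavior, not the actual Kafka Streams source: the Change class, the joinChange method, and the sendOldValues flag are toy names defined only for this example.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;

public class OldNewValueJoinSketch {

    // Toy update record carrying both the new and the previous value for a key.
    static final class Change {
        final String newValue;
        final String oldValue;

        Change(String newValue, String oldValue) {
            this.newValue = newValue;
            this.oldValue = oldValue;
        }
    }

    // Joins one side's update against the other side's current value.
    // When sendOldValues is true, the joiner is applied to the old value as well.
    static Change joinChange(Change update, String otherValue,
                             BiFunction<String, String, String> joiner,
                             boolean sendOldValues) {
        String joinedNew = null;
        String joinedOld = null;
        if (update.newValue != null) {
            joinedNew = joiner.apply(update.newValue, otherValue);
        }
        if (sendOldValues && update.oldValue != null) {
            joinedOld = joiner.apply(update.oldValue, otherValue);
        }
        return new Change(joinedNew, joinedOld);
    }

    // Counts how often the joiner runs for a single update that replaces "v1" with "v2".
    static int countJoinerCalls(boolean sendOldValues) {
        AtomicInteger calls = new AtomicInteger();
        BiFunction<String, String, String> joiner = (left, right) -> {
            calls.incrementAndGet();
            return left + "+" + right;
        };
        joinChange(new Change("v2", "v1"), "other", joiner, sendOldValues);
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println("joiner calls without old values: " + countJoinerCalls(false));
        System.out.println("joiner calls with old values: " + countJoinerCalls(true));
    }
}
```

In this model a single update invokes the joiner once when only the new value is joined, and twice when the old value is joined too, which is consistent with seeing two ValueJoiner calls per record.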

Here is some background on this behavior: https://issues.apache.org/jira/browse/KAFKA-2984