We are experiencing what amounts to "lost" records in our stream processing when we group with a custom key via groupBy(). We have a single processor topology: we read records from a source topic, group and aggregate them, and produce output based on a computation that requires access to a state store.
Let me give greater details of the problem and how we have tried to understand it until now, below:
Overview
We are setting up a Kafka Streams application in which we have to perform windowed operations. We are grouping devices based on a specific key. The sample fields we use to build the grouping key are:
+------------+-------------+
| Field Name | Field Value |
+------------+-------------+
| A          | 12          |
| B          | abc         |
| C          | x13         |
+------------+-------------+
Sample Key based on the above data: 12abcx13 where key = Field (A) + Field (B) + Field (C)
Problem
We get a different count of records for the same key in two scenarios:
1. When specifying the key ourselves using groupBy().
2. When using groupByKey() to group the data on the input Kafka topic's partitioning key.
Description
We were first using the groupBy() function of Kafka Streams to group the devices by the key described above. In this case, the streams application dropped several records and produced fewer records than expected. However, when we did not specify a custom grouping via groupBy() and instead used groupByKey() to keep the original incoming Kafka partition key, we got exactly the number of records expected.
To verify that our custom groupBy() key was identical to the key on the input topic, we compared both keys within the code. The input topic key and the custom key were exactly the same.
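As a side note for anyone reproducing this comparison: in Java, comparing keys with `==` compares object references and can give a misleading result; `.equals()` compares contents. A minimal standalone illustration, using the sample key from above (the helper name `buildKey` is ours, not from the application code):

```java
public class KeyCompare {
    // Builds the grouping key the same way a groupBy() mapper might: A + B + C.
    static String buildKey(String a, String b, String c) {
        return a + b + c;
    }

    public static void main(String[] args) {
        String topicKey = "12abcx13"; // key as read from the input topic
        String customKey = buildKey("12", "abc", "x13");

        System.out.println(topicKey == customKey);      // false: '==' compares references
        System.out.println(topicKey.equals(customKey)); // true: contents are identical
    }
}
```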
We have therefore concluded that there is some internal behaviour of groupBy() that we do not understand, because of which groupBy() and groupByKey() report different counts for the same key. We have searched multiple forums but have been unable to find the reason for this behaviour.
Code Snippet:
With groupByKey():
KStream<String, Output> myStream = this.stream
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofMinutes(windowTime))
        .advanceBy(Duration.ofMinutes(advanceByTime))
        .grace(Duration.ofSeconds(gracePeriod)))
    .reduce((value1, value2) -> value2)
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
    .toStream()
    .transform(new myTransformer(this.store.name()), this.store.name());
With groupBy():
KStream<String, Output> myStream = this.stream
    .groupBy((key, value) -> value.A + value.B + value.C,
        Grouped.with(Serdes.String(), SerdesCollection.getInputSerdes()))
    .windowedBy(TimeWindows.of(Duration.ofMinutes(windowTime))
        .advanceBy(Duration.ofMinutes(advanceByTime))
        .grace(Duration.ofSeconds(gracePeriod)))
    .reduce((value1, value2) -> value2)
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
    .toStream()
    .transform(new myTransformer(this.store.name()), this.store.name());
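For context on the windowing configuration used above: with suppress(untilWindowCloses(...)), a window's final result is emitted only once stream time passes the window's close time (window end plus grace period), and records for that window arriving after the close time are dropped. A small sketch of that arithmetic, with illustrative numbers that are not taken from our real configuration:

```java
import java.time.Duration;

public class WindowCloseTime {
    // Close time of a window [start, start + size): start + size + grace.
    static long closeTimeMs(long startMs, long sizeMs, long graceMs) {
        return startMs + sizeMs + graceMs;
    }

    public static void main(String[] args) {
        // Illustrative values: 5-minute windows with a 10-second grace period.
        long sizeMs = Duration.ofMinutes(5).toMillis();
        long graceMs = Duration.ofSeconds(10).toMillis();
        // A record belonging to the window starting at t=0 is dropped if it
        // arrives after stream time reaches this close time.
        System.out.println(closeTimeMs(0L, sizeMs, graceMs)); // 310000
    }
}
```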
Kafka Cluster Setup
+--------------+------+
| No. of Nodes | 3    |
| CPU Cores    | 2    |
| RAM          | 8 GB |
+--------------+------+
Streaming Application
+-----------------------+-------+
| Kafka Streams Version | 2.3.0 |
| Java Version          | 11    |
+-----------------------+-------+
If key == value.A + value.B + value.C, both programs should do the same thing. Not sure why you want to use groupBy() though: it overwrites key with the same value? The difference between both programs is that groupBy() will result in an (unnecessary) repartition topic, compared to groupByKey(). In general, repartitioning may lead to out-of-order data, which could impact your downstream processing. However, in your case, where you don't change the key, out-of-order data should not be introduced, as the partition of a record should not change. The only case I can think of is that your upstream application uses a different partitioning strategy compared to Kafka Streams (note that not all tools use the same hash partitioning as the Java clients).
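To check that last point, one can compute the partition the Java clients' default partitioner would pick for a key and compare it with the partition the upstream producer actually wrote to. The sketch below re-implements the client's murmur2-based logic (mirroring org.apache.kafka.common.utils.Utils.murmur2 and the default partitioner's toPositive(hash) % numPartitions); treat it as illustrative rather than a drop-in replacement for the real client code, and note that the partition count of 6 in main is an assumed example value:

```java
import java.nio.charset.StandardCharsets;

public class DefaultPartitionerSketch {

    // Re-implementation of the murmur2 hash used by the Java clients
    // (mirrors org.apache.kafka.common.utils.Utils.murmur2).
    static int murmur2(final byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            final int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Handle the last few bytes of the input array (intentional fall-through).
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
            default:
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Partition chosen by the default partitioner for a keyed record:
    // toPositive(murmur2(keyBytes)) % numPartitions.
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return (murmur2(keyBytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // Sample key from the question; 6 is an assumed partition count.
        System.out.println(partitionFor("12abcx13", 6));
    }
}
```

If the upstream producer is not a Java client (or uses a custom partitioner), the partition it chose for a key may differ from what this computes, which would confirm a mismatch in partitioning strategies.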