Please don't mark this question as a duplicate of kafka-streams join produce duplicates. I think my scenario is different. I'm also already using kafka EOS via processing.guarantee=exactly_once
I have an input topic transactions_topic with json data that looks like
{
  "timestamp": "2022-10-08T13:04:30Z",
  "transactionId": "842d38ea-1d3d-41a4-b724-bcc7e81aec9a",
  "accountId": "account123",
  "amount": 1.0
}
It's represented as a simple class using lombok @Data
@Data
class Transaction {
    String transactionId;
    String timestamp;
    String accountId;
    Double amount;
}
I want to compute the total amount spent per accountId over the past 1 hour, past 1 day and past 30 days. These computations are the features represented by the following class
@Data
public class Features {
    double totalAmount1Hour;
    double totalAmount1Day;
    double totalAmount30Day;
}
I'm using kafka-streams and springboot to achieve this.
First I subscribe to the input topic and select the accountId as key
KStream<String, Transaction> kStream = builder.stream(inputTopic,
        Consumed.with(Serdes.String(), new JsonSerde<>(Transaction.class)).
                withTimestampExtractor(new TransactionTimestampExtractor())).
        selectKey((k, v) -> v.getAccountId());
TransactionTimestampExtractor is implemented as follows
public class TransactionTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> consumerRecord, long l) {
        Transaction value = (Transaction) consumerRecord.value();
        long epoch = Instant.parse(value.getTimestamp()).toEpochMilli();
        return epoch;
    }
}
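As written, the extractor throws if a record has a null value or a timestamp that is not valid ISO-8601. Below is a minimal plain-Java sketch of the parsing step with a fallback. The class and method names are hypothetical and not part of my application. In a real extractor the fallback could be the second argument of extract (the partition time), but that choice is an assumption and not something I have settled on.

```java
import java.time.Instant;
import java.time.format.DateTimeParseException;

// Hypothetical helper, shown only to illustrate the parsing step.
final class TimestampParsing {

    // Parses an ISO-8601 instant such as "2022-10-08T13:04:30Z" into epoch milliseconds.
    // Returns the supplied fallback when the text is null or cannot be parsed.
    static long toEpochMillis(String isoTimestamp, long fallback) {
        if (isoTimestamp == null) {
            return fallback;
        }
        try {
            return Instant.parse(isoTimestamp).toEpochMilli();
        } catch (DateTimeParseException e) {
            return fallback;
        }
    }

    public static void main(String[] args) {
        System.out.println(toEpochMillis("2022-10-08T13:04:30Z", -1L)); // 1665234270000
        System.out.println(toEpochMillis("not-a-timestamp", -1L));      // -1
    }
}
```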
Now in order to compute the total amount for the past 1 hour, past 1 day and past 30 days, I created a function that will aggregate the amount based on a sliding window
private <T> KStream<String, T> windowAggregate(KStream<String, Transaction> kStream,
                                               SlidingWindows window,
                                               Initializer<T> initializer,
                                               Aggregator<String, Transaction, T> aggregator,
                                               Class<T> t) {
    return kStream.
            groupByKey(Grouped.with(Serdes.String(), new JsonSerde<>(Transaction.class))).
            windowedBy(window).
            aggregate(initializer,
                    aggregator,
                    Materialized.with(Serdes.String(), Serdes.serdeFrom(t))).
            suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())).
            toStream().
            map((k, v) -> KeyValue.pair(k.key(), v));
}
Now we can use it like
Aggregator<String, Transaction, Double> amountAggregator = (k, v, aggregate) -> aggregate + v.getAmount();
KStream<String, Double> totalAmount1Hour = windowAggregate(kStream, SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(1)), () -> 0.0, amountAggregator, Double.class);
KStream<String, Double> totalAmount1Day = windowAggregate(kStream, SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofDays(1)), () -> 0.0, amountAggregator, Double.class);
KStream<String, Double> totalAmount30Day = windowAggregate(kStream, SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofDays(30)), () -> 0.0, amountAggregator, Double.class);
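To make the intended semantics concrete, here is a plain-Java sketch (no Kafka involved) of what each of the three totals should contain for one account at a given point in time. The WindowTotals and Tx names are hypothetical stand-ins, and treating both window bounds as inclusive is my assumption about how the sliding window behaves.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;

final class WindowTotals {

    // Hypothetical stand-in for Transaction, with the timestamp already parsed.
    static final class Tx {
        final String accountId;
        final Instant timestamp;
        final double amount;

        Tx(String accountId, String isoTimestamp, double amount) {
            this.accountId = accountId;
            this.timestamp = Instant.parse(isoTimestamp);
            this.amount = amount;
        }
    }

    // Sums the amounts of one account whose timestamps fall in [end - window, end].
    static double total(List<Tx> txs, String accountId, Instant end, Duration window) {
        Instant start = end.minus(window);
        double sum = 0.0;
        for (Tx tx : txs) {
            boolean inWindow = !tx.timestamp.isBefore(start) && !tx.timestamp.isAfter(end);
            if (tx.accountId.equals(accountId) && inWindow) {
                sum += tx.amount;
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        // The same payload processed twice, as in my test.
        List<Tx> txs = Arrays.asList(
                new Tx("account1", "2022-10-08T01:09:32Z", 1.0),
                new Tx("account1", "2022-10-08T01:09:32Z", 1.0));
        Instant end = Instant.parse("2022-10-08T01:09:32Z");
        System.out.println(total(txs, "account1", end, Duration.ofHours(1))); // 2.0
        System.out.println(total(txs, "account1", end, Duration.ofDays(1)));  // 2.0
        System.out.println(total(txs, "account1", end, Duration.ofDays(30))); // 2.0
    }
}
```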
Now all I need to do is to join these streams and return a new stream with Features as values
private KStream<String, Features> joinAmounts(KStream<String, Double> totalAmount1Hour, KStream<String, Double> totalAmount1Day, KStream<String, Double> totalAmount30Day) {
    JoinWindows joinWindows = JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(0));
    KStream<String, Features> totalAmount1HourAnd1Day = totalAmount1Hour.join(totalAmount1Day,
            (amount1Hour, amount1Day) -> {
                Features features = new Features();
                features.setTotalAmount1Hour(amount1Hour);
                features.setTotalAmount1Day(amount1Day);
                return features;
            },
            joinWindows,
            StreamJoined.with(Serdes.String(), Serdes.Double(), Serdes.Double()));
    KStream<String, Features> featuresKStream = totalAmount1HourAnd1Day.join(totalAmount30Day,
            (features, amount30Day) -> {
                features.setTotalAmount30Day(amount30Day);
                return features;
            },
            joinWindows,
            StreamJoined.with(Serdes.String(), new JsonSerde<>(Features.class), Serdes.Double()));
    return featuresKStream;
}
I print the features stream for debugging purposes
KStream<String, Features> features = joinAmounts(totalAmount1Hour, totalAmount1Day, totalAmount30Day);
features.print(Printed.<String, Features>toSysOut().withLabel("features"));
This works and prints the correct values for the features. However, when I process the same payload more than once, the features stream produces duplicates. For example, processing the following payload twice produces the output below.
{
  "timestamp":"2022-10-08T01:09:32Z",
  "accountId":"account1",
  "transactionId":"33694a6e-8c15-4cc2-964a-b8b0ecce2682",
  "amount":1.0
}
Output
[features]: account1, Features(totalAmount1Hour=2.0, totalAmount1Day=1.0, totalAmount30Day=1.0)
[features]: account1, Features(totalAmount1Hour=1.0, totalAmount1Day=2.0, totalAmount30Day=1.0)
[features]: account1, Features(totalAmount1Hour=2.0, totalAmount1Day=2.0, totalAmount30Day=1.0)
[features]: account1, Features(totalAmount1Hour=1.0, totalAmount1Day=1.0, totalAmount30Day=2.0)
[features]: account1, Features(totalAmount1Hour=2.0, totalAmount1Day=1.0, totalAmount30Day=2.0)
[features]: account1, Features(totalAmount1Hour=1.0, totalAmount1Day=2.0, totalAmount30Day=2.0)
[features]: account1, Features(totalAmount1Hour=2.0, totalAmount1Day=2.0, totalAmount30Day=2.0)
My expected output would be just the last one
[features]: account1, Features(totalAmount1Hour=2.0, totalAmount1Day=2.0, totalAmount30Day=2.0)
How can I achieve this and get rid of the duplicates in the features stream? Is kafka-streams join() doing a cartesian product because I have the same timestamp and key?
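One reading of the output above, offered as an assumption and not a confirmed diagnosis: if each of the three aggregate streams emitted two updates (1.0, then 2.0) with the same key and the same timestamp, a zero-width join window would match every update on one side with every update on the other. The plain-Java sketch below (no Kafka involved, JoinFanOut is a hypothetical name) only counts the combinations. It does not reproduce the emission order, which depends on arrival order. It yields 2 x 2 x 2 = 8 combinations, which would fit the seven lines printed here if (1.0, 1.0, 1.0) was already printed after the first payload.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class JoinFanOut {

    // Pairs every update from each stream with every update from the other two,
    // mimicking a join in which all records share one key and one timestamp.
    static List<double[]> joinAll(List<Double> oneHour, List<Double> oneDay, List<Double> thirtyDay) {
        List<double[]> results = new ArrayList<>();
        for (double h : oneHour) {
            for (double d : oneDay) {
                for (double m : thirtyDay) {
                    results.add(new double[] {h, d, m});
                }
            }
        }
        return results;
    }

    public static void main(String[] args) {
        // Assumed updates per stream after processing the same payload twice.
        List<Double> updates = Arrays.asList(1.0, 2.0);
        for (double[] r : joinAll(updates, updates, updates)) {
            System.out.println(Arrays.toString(r));
        }
    }
}
```

If each stream carried only its final value per key and timestamp, the same function would return a single combination.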
Yes, the toStream will convert from a KTable back to a KStream, giving you the full changelogs for the tables. Then, for every single change in each of the 3 tables, you will also get a join result.

Maybe a better idea to achieve what you want is to chain your aggregations, so that you generate the KTable for 1 hour changes, and from this table you derive the 1 day changes, and from the resulting table you finally generate the 30 day changes. See this Wiki page for an example: https://cwiki.apache.org/confluence/display/KAFKA/Windowed+aggregations+over+successively+increasing+timed+windows
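To give a rough feel for the chaining idea, here is a plain-Java sketch that does not use the Kafka Streams API. It assumes fixed, epoch-aligned windows, not the sliding windows from the question, and all names in it are hypothetical. Each transaction is aggregated once at the hourly level, and the coarser totals are derived from the finer ones, so there is nothing to join.

```java
import java.time.Duration;
import java.util.Map;
import java.util.TreeMap;

final class ChainedRollup {

    // Start of the epoch-aligned window of the given size that contains the timestamp.
    static long windowStart(long epochMillis, Duration size) {
        long ms = size.toMillis();
        return Math.floorDiv(epochMillis, ms) * ms;
    }

    // First stage: sums raw amounts into hourly windows keyed by window start.
    static TreeMap<Long, Double> hourlyTotals(long[] timestamps, double[] amounts) {
        TreeMap<Long, Double> out = new TreeMap<>();
        for (int i = 0; i < timestamps.length; i++) {
            out.merge(windowStart(timestamps[i], Duration.ofHours(1)), amounts[i], Double::sum);
        }
        return out;
    }

    // Later stages: re-buckets finer window totals into coarser windows by summing them.
    static TreeMap<Long, Double> rollUp(Map<Long, Double> fineTotals, Duration coarseSize) {
        TreeMap<Long, Double> out = new TreeMap<>();
        for (Map.Entry<Long, Double> e : fineTotals.entrySet()) {
            out.merge(windowStart(e.getKey(), coarseSize), e.getValue(), Double::sum);
        }
        return out;
    }

    public static void main(String[] args) {
        // Two payloads at 2022-10-08T01:09:32Z and one at 2022-10-08T00:30:00Z.
        long[] timestamps = {1665191372000L, 1665191372000L, 1665189000000L};
        double[] amounts = {1.0, 1.0, 5.0};

        TreeMap<Long, Double> hourly = hourlyTotals(timestamps, amounts);
        TreeMap<Long, Double> daily = rollUp(hourly, Duration.ofDays(1));
        TreeMap<Long, Double> thirtyDay = rollUp(daily, Duration.ofDays(30));

        System.out.println(hourly);
        System.out.println(daily);
        System.out.println(thirtyDay);
    }
}
```

In Kafka Streams the equivalent would re-key each windowed result and feed it into the next aggregation, as the Wiki page describes. The sketch only shows the data flow.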