The code below processes only the first message that arrives and publishes it correctly, but no further messages are processed afterwards. (I'm using kafka-console-consumer.bat in a terminal to monitor the messages published to total-amount-by-id.)
Kafka Streams:
KStream<String, String> totalAmount = builder.stream("data-consumed", Consumed.with(Serdes.String(), Serdes.String()));
totalAmount
    // parse the String payload into an Integer amount
    .mapValues(v -> Integer.valueOf(v))
    .groupByKey()
    // 100 ms tumbling windows
    .windowedBy(TimeWindows.of(Duration.ofMillis(100)))
    .aggregate(
        () -> 0,
        (key, value, aggregate) -> {
            System.out.println("value: " + value);
            System.out.println("aggregate: " + aggregate);
            return value + aggregate;
        },
        Materialized.with(Serdes.String(), Serdes.Integer())
    )
    .toStream()
    // drop the window from the key and keep only the original record key
    .map((key, aggregate) -> new KeyValue<>(key.key(), aggregate))
    .to("total-amount-by-id", Produced.with(Serdes.String(), Serdes.Integer()));
Tests:
- I'm publishing 1 message per 100ms, always with the same key, to the topic "data-consumed"
- The first four (k,v) published to the topic "data-consumed" were: (1,1),(1,2),(1,4),(1,1)
- Kafka Streams published (1,1) to "total-amount-by-id", but nothing else came after that
- The System.out.println() in the code above printed only: value: 1 aggregate: 0 value: 2 aggregate: 0
Any idea what is causing this?
*I was expecting the second aggregate to be equal to 1 (aggregate: 1)
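One thing that may be worth checking is how the 100 ms tumbling window lines up with records sent roughly 100 ms apart. The sketch below is plain Java with no Kafka dependency. It assumes tumbling windows are aligned to the epoch, so a record's window starts at its timestamp rounded down to a multiple of the window size. The class name, the helper and the timestamps are hypothetical and only illustrate the arithmetic; the real record timestamps depend on the producer and the configured timestamp extractor.

```java
public class WindowBuckets {

    // Start of the epoch-aligned tumbling window that contains timestampMs
    // (the alignment is an assumption of this sketch).
    static long windowStart(long timestampMs, long windowSizeMs) {
        return (timestampMs / windowSizeMs) * windowSizeMs;
    }

    public static void main(String[] args) {
        long windowSizeMs = 100L;
        // Hypothetical timestamps: one record every 100 ms, as in the test above
        long[] timestamps = {1000L, 1100L, 1200L, 1300L};
        for (long ts : timestamps) {
            System.out.println("timestamp " + ts
                    + " -> window starting at " + windowStart(ts, windowSizeMs));
        }
    }
}
```

If every record lands in a window of its own like this, the initializer would run again for each record, which would be consistent with "aggregate: 0" being printed twice. This is only a guess based on the arithmetic, not a confirmed diagnosis.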