How to aggregate a total sum over a period of time (window) with Kafka?


The code below processes only the first message that arrives and publishes it correctly, but no further messages are processed afterwards. (I'm using kafka-console-consumer.bat in a terminal to monitor the messages published to total-amount-by-id.)

Kafka Streams:

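import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;

// builder is the application's StreamsBuilder
KStream<String, String> totalAmount = builder.stream("data-consumed", Consumed.with(Serdes.String(), Serdes.String()));

totalAmount
  .mapValues(v -> Integer.valueOf(v))                     // parse the String value as an Integer
  .groupByKey()
  .windowedBy(TimeWindows.of(Duration.ofMillis(100)))     // 100 ms tumbling windows
  .aggregate(
        () -> 0,                                          // initial aggregate for each new window
        (key, value, aggregate) -> {
            System.out.println("value: " + value);
            System.out.println("aggregate: " + aggregate);
            return value + aggregate;
        },
        Materialized.with(Serdes.String(), Serdes.Integer())
  )
  .toStream()
  .map((key, aggregate) -> new KeyValue<>(key.key(), aggregate))   // drop the window from the key
  .to("total-amount-by-id", Produced.with(Serdes.String(), Serdes.Integer()));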

Tests:

  • I'm publishing one message every 100 ms, always with the same key, to the topic "data-consumed"
  • The first four (k,v) pairs published to the topic "data-consumed" were: (1,1), (1,2), (1,4), (1,1)
  • Kafka Streams published (1,1) to "total-amount-by-id", but nothing else came after that
  • The System.out.println() calls in the code above printed only: "value: 1", "aggregate: 0", "value: 2", "aggregate: 0"

Any idea what is causing this problem?

*I was expecting the second aggregate to be equal to 1 (aggregate: 1)
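
One thing that may be worth checking for the "aggregate: 0" output is how the 100 ms tumbling window lines up with the 100 ms publishing interval. Tumbling windows in Kafka Streams are aligned to the epoch and keep a separate aggregate per window, so records that land in different windows each start again from the initializer. The plain-Java sketch below does not use Kafka at all; it only imitates the window assignment. The class name, the helper methods and the timestamps (0, 100, 200, 300 ms) are assumptions made for illustration, since the real record timestamps are not shown in the question.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class TumblingWindowSketch {

    // Start of the epoch-aligned tumbling window that contains the timestamp.
    // The window covers [start, start + windowSizeMs).
    static long windowStart(long timestampMs, long windowSizeMs) {
        return (timestampMs / windowSizeMs) * windowSizeMs;
    }

    // Sums values per window start, keeping one independent aggregate per window.
    static Map<Long, Integer> sumPerWindow(long[] timestampsMs, int[] values, long windowSizeMs) {
        Map<Long, Integer> sums = new LinkedHashMap<>();
        for (int i = 0; i < values.length; i++) {
            long start = windowStart(timestampsMs[i], windowSizeMs);
            sums.merge(start, values[i], Integer::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        // Hypothetical timestamps: one record every 100 ms, values taken from the question.
        long[] timestamps = {0L, 100L, 200L, 300L};
        int[] values = {1, 2, 4, 1};

        // 100 ms windows: every record falls into its own window.
        System.out.println(sumPerWindow(timestamps, values, 100L));   // {0=1, 100=2, 200=4, 300=1}

        // 1000 ms windows: all four records share one window.
        System.out.println(sumPerWindow(timestamps, values, 1000L));  // {0=8}
    }
}
```

Under these assumed timestamps, every record is the first one in its window, which would match an aggregate of 0 being printed each time. With a window larger than the publishing interval, the running total carries over within a window. This only speaks to the aggregate value; it does not by itself explain why no further records showed up in the console consumer.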
