I have the following topology in Kafka Streams 7.2.2-ccs, shown here in code:
val groupedStream = StreamsBuilder().stream<String, Quote>("quotes").groupByKey()
for (windowSize in windows()) {
    groupedStream
        .windowedBy(TimeWindows.ofSizeWithNoGrace(windowSize))
        .aggregate({ Aggregator() }, { _, quote, aggregator -> aggregator.execute(quote) })
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
        .toStream()
        .to("outputTopic")
}
I am using io.micrometer.core.instrument.binder.kafka.KafkaStreamsMetrics
to monitor the application. I have some questions:
- Why aren't there any metrics for the unbounded suppressed store? There are many with the label rocksdb_window_state_id, but none for suppressed.
- How many RocksDB instances will be created if the input topic has 3 partitions? There seems to be a segment concept for window stores, but I couldn't find out how many segments are created per window.
- Is there a way to configure RocksDB to flush to disk all keys for windows that have closed? The container is using too much off-heap memory, which keeps growing, and I suspect this is the cause.
Suppress is not based on RocksDB, but uses an in-memory store. There should be something like in-memory-suppression for the available metrics (cf. https://docs.confluent.io/platform/current/streams/monitoring.html#state-store-metrics).
You should get 3 segments (i.e., 3 RocksDB instances) times the number of partitions, i.e., 9 RocksDB instances in your case with 3 input topic partitions.
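To make the arithmetic concrete, here is a rough estimate in Kotlin. The function name and the windowedStores parameter are hypothetical. It takes the figure of 3 segments per store from the answer above, and it assumes that each aggregate() call in the question's loop creates its own window store, so the total would scale with the number of window sizes.

```kotlin
// Rough estimate only: partitions x windowed stores x segments per store.
// segmentsPerStore = 3 is the figure quoted in the answer, not a value read from Kafka Streams.
fun estimateRocksDbInstances(
    partitions: Int,
    windowedStores: Int,
    segmentsPerStore: Int = 3
): Int = partitions * windowedStores * segmentsPerStore

fun main() {
    // One windowed store on a 3-partition input topic.
    println(estimateRocksDbInstances(partitions = 3, windowedStores = 1)) // prints 9
    // Assumption: four window sizes in the loop, hence four separate stores.
    println(estimateRocksDbInstances(partitions = 3, windowedStores = 4)) // prints 36
}
```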
No, you cannot control the flushing. However, you can limit RocksDB memory usage via a RocksDBConfigSetter that you can pass via StreamsConfig.
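As a minimal sketch of that approach, the class below shares one block cache and one write buffer manager across all RocksDB instances in the JVM, so that their combined off-heap usage is bounded. The class name BoundedMemoryRocksDBConfig and the 256 MiB budget are assumptions chosen for illustration, not values from the question; tune them for your container.

```kotlin
import java.util.Properties
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.state.RocksDBConfigSetter
import org.rocksdb.BlockBasedTableConfig
import org.rocksdb.Cache
import org.rocksdb.LRUCache
import org.rocksdb.Options
import org.rocksdb.WriteBufferManager

// Hypothetical config setter: every store (and every segment) gets the same
// shared cache, so memory is capped per JVM rather than per RocksDB instance.
class BoundedMemoryRocksDBConfig : RocksDBConfigSetter {

    override fun setConfig(storeName: String, options: Options, configs: Map<String, Any>) {
        val tableConfig = options.tableFormatConfig() as BlockBasedTableConfig
        // Serve block reads from the shared cache and count index/filter blocks against it.
        tableConfig.setBlockCache(cache)
        tableConfig.setCacheIndexAndFilterBlocks(true)
        // Charge memtable (write buffer) memory to the same cache.
        options.setWriteBufferManager(writeBufferManager)
        options.setTableFormatConfig(tableConfig)
    }

    override fun close(storeName: String, options: Options) {
        // The cache and write buffer manager are shared by all stores,
        // so they are intentionally not closed here.
    }

    companion object {
        // Assumed budget for illustration only.
        private const val TOTAL_OFF_HEAP_BYTES = 256L * 1024 * 1024
        private val cache: Cache = LRUCache(TOTAL_OFF_HEAP_BYTES)
        private val writeBufferManager = WriteBufferManager(TOTAL_OFF_HEAP_BYTES / 2, cache)
    }
}

// Register the setter in the properties used to build StreamsConfig / KafkaStreams.
fun streamsProperties(): Properties = Properties().apply {
    put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, BoundedMemoryRocksDBConfig::class.java)
}
```

Note that this only bounds RocksDB. The suppression buffer is held in memory separately and is unbounded with Suppressed.BufferConfig.unbounded(), so it is not affected by this setting.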