I currently have a Kafka Streams service:
{
  val _ = metrics
  val timeWindow = Duration.of(config.timeWindow.toMillis, ChronoUnit.MILLIS)
  val gracePeriod = Duration.of(config.gracePeriod.toMillis, ChronoUnit.MILLIS)

  val store = Materialized
    .as[AggregateKey, AggMetricDocument, ByteArrayWindowStore](
      config.storeNames.reducerStateStore
    )
    .withRetention(gracePeriod.plus(timeWindow))
    .withCachingEnabled()

  builder
    .stream[AggregateKey, AggMetricDocument](config.topicNames.aggMetricDocumentsIntermediate)
    .groupByKey
    .windowedBy(TimeWindows.of(timeWindow).grace(gracePeriod))
    .reduce { (metricDoc1, metricDoc2) =>
      metricDoc1.copy(
        metrics = metricDoc1.metrics
          .merge(metricDoc2.metrics, config.metricDocumentsReducerSamplesReservoirSize),
        docsCount = metricDoc1.docsCount + metricDoc2.docsCount
      )
    }(store)
    .toStream
    .to(config.topicNames.aggMetricDocuments)(
      Produced.`with`(AggregateKey.windowedSerde, AggMetricDocument.flattenSerde)
    )
}
with the following settings:
timeWindow = 1m
gracePeriod = 39h
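To give a sense of the state this implies, here is my own rough back-of-envelope estimate (assuming the ~15K unique-keys/sec rate from the test described below, and that each window is retained for timeWindow + gracePeriod before it can be dropped):

// rough scale estimate only, not part of the service code
val retentionMinutes    = 39 * 60 + 1             // gracePeriod + timeWindow ≈ 2341 minutes of retention
val newEntriesPerMinute = 15000L * 60             // ~15K msgs/sec, all unique keys => ~900K new window entries per minute
val retainedEntries     = newEntriesPerMinute * retentionMinutes // ≈ 2.1 billion live entries in the window store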
The stream works fine at normal cardinality, but when it starts processing high-cardinality data (more than 100 million distinct keys) the processing rate declines after some time.
Looking at the RocksDB metrics, the average fetch latency rises from 30µs to 600µs, and the hit rate of the filter and index blocks drops, as seen in the following test (sending ~15K messages/sec with unique keys):
The disk throughput and IO seem to be well under the disk limits. The CPU usage and load average keep increasing (the limit is 5 cores):
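For reference, the fetch-latency and filter/index hit-rate figures presumably come from the built-in per-store RocksDB metrics of Kafka Streams; a minimal sketch of how I assume they are enabled, since these metrics are only recorded at DEBUG level:

import java.util.Properties
import org.apache.kafka.streams.StreamsConfig

val props = new Properties()
// RocksDB state-store metrics are only reported when the recording level is DEBUG
props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG")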
I made some modifications to the RocksDB configuration (in a custom RocksDBConfigSetter):
private val cache = new LRUCache(2147483648L, -1, false, 0.9) // 2GB, 90% of it reserved for high-priority blocks
private val writeBufferManager = new WriteBufferManager(2147483648L, cache)

// inside setConfig(storeName, options, configs):
val tableConfig = options.tableFormatConfig.asInstanceOf[BlockBasedTableConfig]
tableConfig.setBlockCache(BoundedMemoryRocksDBConfig.cache)
tableConfig.setCacheIndexAndFilterBlocks(true) // default: false
tableConfig.setCacheIndexAndFilterBlocksWithHighPriority(true)
All other settings are left at the Kafka Streams defaults. It seems that increasing the LRUCache size helps for a while.
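For completeness, this is roughly how I understand the setter is wired up (a sketch following the usual RocksDBConfigSetter pattern; cache and writeBufferManager are the vals shown above, assumed to live in the companion object, and props stands for the Properties passed to KafkaStreams):

import java.util
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.state.RocksDBConfigSetter
import org.rocksdb.{BlockBasedTableConfig, Options}

class BoundedMemoryRocksDBConfig extends RocksDBConfigSetter {
  override def setConfig(storeName: String, options: Options, configs: util.Map[String, AnyRef]): Unit = {
    val tableConfig = options.tableFormatConfig.asInstanceOf[BlockBasedTableConfig]
    tableConfig.setBlockCache(BoundedMemoryRocksDBConfig.cache)
    tableConfig.setCacheIndexAndFilterBlocks(true)
    tableConfig.setCacheIndexAndFilterBlocksWithHighPriority(true)
    options.setTableFormatConfig(tableConfig)
    // bound memtable memory against the same cache as the block cache
    options.setWriteBufferManager(BoundedMemoryRocksDBConfig.writeBufferManager)
  }

  override def close(storeName: String, options: Options): Unit = ()
}

// registered on the streams configuration:
props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, classOf[BoundedMemoryRocksDBConfig])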
I am not sure what the core problem is. Does someone have an idea of what is causing it, and which configuration I should tune to get better performance on high-cardinality data?