Kafka Streams: Efficient Batch Collection and State Store Management

I have a Kafka topic with 100 partitions. I need to map each message into another type (this is slow processing logic, about 500 ms per message), then collect the results into batches and send them to an external service. Here is my KStream bean:

    @Bean
    public KStream<String, List<Request>> kStream(StreamsBuilder streamsBuilder) {
        var stream = streamsBuilder
                .stream(topic, Consumed.with(Serdes.String(), getIncomingMessageSerde()));

        var storeBuilder = Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore(storeName),
                Serdes.String(),
                getBatchSerde());
        streamsBuilder.addStateStore(storeBuilder);

        stream
                .mapValues(asyncService::processBatch) // map each message first (slow, about 500 ms)
                .process(this::getAggregationProcessor, storeName) // then the processor collects messages into a batch
                .foreach(asyncService::saveBatches); // then save each batch somewhere (approx. 30 messages per batch)

        return stream;
    }

I created this processor for batching messages:

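import java.time.Duration;

import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;

import static java.util.Objects.isNull;
import static java.util.UUID.randomUUID;

public class AggregationProcessor implements Processor<String, BatchInfo, String, AggregatedBatchesInfo> {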
    private final String stateStoreName;
    private final String storeKey;
    private final Integer batchSize;
    private final Integer flushInterval;
    private ProcessorContext<String, AggregatedBatchesInfo> context;
    private KeyValueStore<String, AggregatedBatchesInfo> stateStore;

    public AggregationProcessor(String stateStoreName, String storeKey, Integer batchSize, Integer flushInterval) {
        this.stateStoreName = stateStoreName;
        this.storeKey = storeKey;
        this.batchSize = batchSize;
        this.flushInterval = flushInterval;
    }

    @Override
    public void init(ProcessorContext<String, AggregatedBatchesInfo> context) {
        this.context = context;
        this.stateStore = context.getStateStore(stateStoreName);
        context.schedule(Duration.ofSeconds(flushInterval),
                PunctuationType.WALL_CLOCK_TIME,
                timestamp -> flushBatch());
    }

    @Override
    public void process(Record<String, BatchInfo> batchRecord) {
        var aggregatedBatches = getAggregatedBatches();
        aggregatedBatches.add(batchRecord.value());
        updateState(aggregatedBatches);
        if (aggregatedBatches.size() >= batchSize) {
            flushBatch();
        }
    }

    private void flushBatch() {
        var aggregatedBatches = getAggregatedBatches();
        if (!aggregatedBatches.isEmpty()) {
            context.forward(new Record<>(randomUUID().toString(), aggregatedBatches, System.currentTimeMillis()));
            updateState(new AggregatedBatchesInfo());
        }
    }

    private AggregatedBatchesInfo getAggregatedBatches() {
        final var storeValue = stateStore.get(storeKey);
        return isNull(storeValue) ? new AggregatedBatchesInfo() : storeValue;
    }

    private void updateState(AggregatedBatchesInfo aggregatedBatches) {
        stateStore.put(storeKey, aggregatedBatches);
    }

    @Override
    public void close() {
        flushBatch();
        stateStore.flush();
    }
}

The state store that holds the collected batches is backed by a topic with 100 partitions, one per Kafka Streams task. As a result, each store instance only collects messages from a single partition, so it takes a long time to fill a batch before it can be saved. Is it possible for different tasks to share one store so that batches fill faster? Or did I choose the wrong approach to this problem?
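
To make the slowdown concrete, here is a small simulation with no Kafka dependency. It is only a sketch under assumptions: messages are spread evenly (round-robin) over the partitions, the batch size is 30 as in the comment on `saveBatches`, and the wall-clock flush is ignored. `BatchFillSketch` and `messagesUntilFirstFlush` are hypothetical names used for illustration, not part of my application or of Kafka Streams.

```java
// Illustrative simulation only; all names are hypothetical.
class BatchFillSketch {

    /**
     * Returns how many messages must arrive before any buffer reaches batchSize,
     * assuming messages are distributed round-robin over the partitions.
     * sharedBuffer == false: one buffer per partition (one store per task).
     * sharedBuffer == true:  a single buffer for all partitions (the shared store I am asking about).
     */
    static int messagesUntilFirstFlush(int partitions, int batchSize, boolean sharedBuffer) {
        int[] sizes = new int[sharedBuffer ? 1 : partitions];
        int consumed = 0;
        while (true) {
            int partition = consumed % partitions;   // round-robin assignment
            int buffer = sharedBuffer ? 0 : partition;
            consumed++;
            sizes[buffer]++;
            if (sizes[buffer] >= batchSize) {
                return consumed;                     // first batch is full
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(messagesUntilFirstFlush(100, 30, false)); // prints 2901
        System.out.println(messagesUntilFirstFlush(100, 30, true));  // prints 30
    }
}
```

Under these assumptions, the per-task stores need 2901 messages in total before the first batch of 30 is complete, while a single shared buffer would need only 30. In practice the scheduled wall-clock flush would emit partial batches earlier than that.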
