I have implemented a producer-consumer pattern using TPL Dataflow. The use case is that the code reads messages from a Kafka bus. For efficiency, we need to process messages in batches when writing to the database.
Is there a way in TPL Dataflow to hold on to messages and fire whenever a size or duration threshold is hit?
For example, the current implementation posts each message as soon as it is pulled from the queue:
postedSuccessfully = targetBuffer.Post(msg.Value);
Buffering by count and duration is already available through System.Reactive, specifically the Buffer operator. Buffer collects incoming events until either the desired count is reached or its timespan expires.
Dataflow blocks are designed to work with System.Reactive. Blocks can be converted to Observables and Observers by using the DataflowBlock.AsObservable() and AsObserver() extension methods.
This makes building a buffering block very easy:
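The code for this step is missing here, so what follows is a minimal sketch reconstructed from the description rather than the original listing. The class name BufferingBlocks and the method name CreateBuffer are assumptions chosen for illustration; the calls themselves (AsObservable, AsObserver, Buffer, ObserveOn, DataflowBlock.Encapsulate) come from System.Threading.Tasks.Dataflow and System.Reactive.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper class; the name is an assumption for this sketch.
public static class BufferingBlocks
{
    // Returns a block that accepts single items and emits batches of up to
    // `count` items, or whatever has arrived when `timeSpan` elapses.
    public static IPropagatorBlock<TIn, IList<TIn>> CreateBuffer<TIn>(TimeSpan timeSpan, int count)
    {
        var inBlock = new BufferBlock<TIn>();          // buffers incoming items
        var outBlock = new BufferBlock<IList<TIn>>();  // buffers finished batches

        // Expose the output block as an observer so Rx can push batches into it.
        var outObserver = outBlock.AsObserver();

        inBlock.AsObservable()
               .Buffer(timeSpan, count)               // close a batch on size or time
               .ObserveOn(TaskPoolScheduler.Default)  // deliver batches on a task pool thread
               .Subscribe(outObserver);

        // Present the two blocks to callers as a single propagator block.
        return DataflowBlock.Encapsulate(inBlock, outBlock);
    }
}
```

One thing to check in your own setup: as far as I know, Buffer with a timespan also emits an empty list when the timespan elapses without any items, so the consumer may need to skip empty batches.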
This method uses two buffer blocks to buffer inputs and outputs. Buffer() reads from the input block (the observable) and writes to the output block (the observer) when either the batch is full or the timespan expires.
By default, Rx works on the current thread. By calling ObserveOn(TaskPoolScheduler.Default) we tell it to process data on a Task pool thread.
Example
This code creates a buffer block for 5 items or 1 second. It starts by posting 7 items, waits 1.1 seconds, then posts another 7 items. Each batch is written to the console together with the thread ID:
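The example listing is also missing here, so this is a hedged reconstruction of the scenario just described, not the original code. It repeats a compact CreateBuffer helper so that it compiles on its own; the names Program, CreateBuffer and printBlock are assumptions. The thread ID printed is that of the thread running the ActionBlock delegate.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class Program
{
    // Same idea as described above: batch by count or by elapsed time.
    static IPropagatorBlock<T, IList<T>> CreateBuffer<T>(TimeSpan timeSpan, int count)
    {
        var inBlock = new BufferBlock<T>();
        var outBlock = new BufferBlock<IList<T>>();
        inBlock.AsObservable()
               .Buffer(timeSpan, count)
               .ObserveOn(TaskPoolScheduler.Default)
               .Subscribe(outBlock.AsObserver());
        return DataflowBlock.Encapsulate(inBlock, outBlock);
    }

    public static async Task Main()
    {
        // Batches of up to 5 items, flushed at least once per second.
        var buffer = CreateBuffer<string>(TimeSpan.FromSeconds(1), 5);

        // Print each batch along with the ID of the thread that handles it.
        var printBlock = new ActionBlock<IList<string>>(items =>
            Console.WriteLine($"[{Environment.CurrentManagedThreadId}] {string.Join(",", items)}"));

        buffer.LinkTo(printBlock, new DataflowLinkOptions { PropagateCompletion = true });

        // First burst: 7 items.
        for (int i = 0; i < 7; i++) buffer.Post(i.ToString());

        // Wait longer than the 1 second window so the partial batch is flushed.
        await Task.Delay(1100);

        // Second burst: another 7 items.
        for (int i = 7; i < 14; i++) buffer.Post(i.ToString());

        // Signal the end of input and wait for the remaining batches to print.
        buffer.Complete();
        await printBlock.Completion;
    }
}
```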
The output is: