I have implemented a producer-consumer pattern using TPL Dataflow. The use case is code that reads messages from a Kafka bus. For efficiency, we need to process the messages in batches when writing to the database.
Is there a way in TPL Dataflow to hold on to messages and fire whenever a size or duration threshold is hit?
For example, the current implementation posts each message as soon as it is pulled from the queue:
postedSuccessfully = targetBuffer.Post(msg.Value);
While there is no out-of-the-box timeout, you can wire up a timer to call TriggerBatch whenever the downstream pipeline has waited long enough for a batch, then reset the timer whenever a batch flows through. The BatchBlock will take care of the rest for you.
For example, this sample has been configured to produce a batch of size 1 every time, even though the batch block would normally wait for 10 elements. The timeout forces the block to emit whatever is currently stored in the BatchBlock.
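Here is a minimal sketch of the timer approach described above. It is not a definitive implementation. The names TimedBatchSketch and RunAsync, the "msg-N" strings, and the 200 ms timeout are all hypothetical, and System.Threading.Timer is just one possible timer choice. The ActionBlock stands in for the database write and only records each batch size.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class TimedBatchSketch
{
    // Hypothetical helper: returns the size of every batch that reached the consumer.
    public static async Task<List<int>> RunAsync(int batchSize, TimeSpan timeout)
    {
        var batchSizes = new List<int>();
        var batchBlock = new BatchBlock<string>(batchSize);

        // Periodic timer: when the timeout elapses, flush whatever the BatchBlock holds.
        // TriggerBatch does nothing if the block is currently empty.
        using var timer = new Timer(_ => batchBlock.TriggerBatch(), null, timeout, timeout);

        var writer = new ActionBlock<string[]>(batch =>
        {
            // A batch arrived (by size or by timeout), so restart the countdown.
            timer.Change(timeout, timeout);
            batchSizes.Add(batch.Length); // stand-in for the real database write
        });

        batchBlock.LinkTo(writer, new DataflowLinkOptions { PropagateCompletion = true });

        // Three messages arrive, then the source goes quiet for longer than the timeout.
        for (var i = 0; i < 3; i++) batchBlock.Post($"msg-{i}");
        await Task.Delay(TimeSpan.FromMilliseconds(timeout.TotalMilliseconds * 3));

        // Two more messages, then shutdown. Complete() flushes any remaining items.
        for (var i = 3; i < 5; i++) batchBlock.Post($"msg-{i}");
        batchBlock.Complete();
        await writer.Completion;
        return batchSizes;
    }

    public static async Task Main()
    {
        var sizes = await RunAsync(batchSize: 10, timeout: TimeSpan.FromMilliseconds(200));
        Console.WriteLine("Batch sizes: " + string.Join(", ", sizes));
    }
}
```

The timer is periodic rather than one-shot on purpose. If it fired once while the block was empty, no batch would flow, nothing would reset it, and it would never fire again. In your Kafka loop you would keep calling Post (or SendAsync) on the BatchBlock exactly as you do with targetBuffer today.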