I have an asynchronous sequence (stream) of messages that sometimes arrive in bursts and sometimes sporadically, and I would like to process them in batches of 10 messages per batch. I also want to enforce an upper limit on the latency between receiving a message and processing it, so a batch with fewer than 10 messages should also be processed if 5 seconds have passed since the first message of the batch was received. I found that I can solve the first part of the problem by using the Buffer operator from the System.Interactive.Async package:
IAsyncEnumerable<Message> source = GetStreamOfMessages();
IAsyncEnumerable<IList<Message>> batches = source.Buffer(10);
await foreach (IList<Message> batch in batches)
{
    // Process batch
}
The signature of the Buffer operator:
public static IAsyncEnumerable<IList<TSource>> Buffer<TSource>(
    this IAsyncEnumerable<TSource> source, int count);
Unfortunately the Buffer operator has no overload with a TimeSpan parameter, so I can't solve the second part of the problem so easily. I'll have to somehow implement a batching operator with a timer myself. My question is: how can I implement a variant of the Buffer operator that has the signature below?
public static IAsyncEnumerable<IList<TSource>> Buffer<TSource>(
    this IAsyncEnumerable<TSource> source, TimeSpan timeSpan, int count);
The timeSpan parameter should affect the behavior of the Buffer operator like so:
- A batch must be emitted when the timeSpan has elapsed after emitting the previous batch (or initially after the invocation of the Buffer method).
- An empty batch must be emitted if the timeSpan has elapsed after emitting the previous batch, and no messages have been received during this time.
- Emitting batches more frequently than every timeSpan implies that the batches are full. Emitting a batch with fewer than count messages before the timeSpan has elapsed is not desirable.
I am OK with adding external dependencies to my project if needed, like the System.Interactive.Async or the System.Linq.Async packages.
P.S. this question was inspired by a recent question related to channels and memory leaks.
The solution below uses the PeriodicTimer class (.NET 6) for receiving timer notifications, and the Task.WhenAny method for coordinating the timer and enumeration tasks. The PeriodicTimer class is more convenient than the Task.Delay method for this purpose, because it can be disposed directly, instead of requiring an accompanying CancellationTokenSource.

The timer is restarted each time a chunk is emitted, after the consumer has finished consuming the chunk.
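A minimal sketch of the approach described above, assuming .NET 6 or later. It is an illustration of the technique, not necessarily identical to the answer's own listing; the class name AsyncEnumerableBufferSketch and the local helpers StartTimer and TakeChunk are names chosen here for the example.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncEnumerableBufferSketch
{
    // Sketch only: emits a chunk when it is full or when timeSpan has elapsed.
    public static async IAsyncEnumerable<IList<TSource>> Buffer<TSource>(
        this IAsyncEnumerable<TSource> source, TimeSpan timeSpan, int count,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(source);
        if (timeSpan < TimeSpan.FromMilliseconds(1))
            throw new ArgumentOutOfRangeException(nameof(timeSpan));
        if (count < 1) throw new ArgumentOutOfRangeException(nameof(count));

        using var linkedCts = CancellationTokenSource
            .CreateLinkedTokenSource(cancellationToken);
        PeriodicTimer? timer = null;
        Task<bool> StartTimer()
        {
            timer?.Dispose(); // Disposing completes any pending tick with false
            timer = new PeriodicTimer(timeSpan);
            return timer.WaitForNextTickAsync().AsTask();
        }
        var enumerator = source.GetAsyncEnumerator(linkedCts.Token);
        Task<bool>? moveNext = null;
        try
        {
            var buffer = new List<TSource>();
            IList<TSource> TakeChunk()
            {
                TSource[] chunk = buffer.ToArray();
                buffer.Clear();
                return chunk;
            }
            Task<bool> timerTask = StartTimer();
            while (true)
            {
                // Keep at most one MoveNextAsync operation in flight.
                moveNext ??= enumerator.MoveNextAsync().AsTask();
                Task winner = await Task.WhenAny(moveNext, timerTask)
                    .ConfigureAwait(false);
                if (ReferenceEquals(winner, timerTask))
                {
                    // Time is up: emit whatever is buffered, possibly nothing.
                    yield return TakeChunk();
                    timerTask = StartTimer(); // Restart after consumption
                    continue;
                }
                bool moved = await moveNext.ConfigureAwait(false);
                moveNext = null;
                if (!moved) break;
                buffer.Add(enumerator.Current);
                if (buffer.Count == count)
                {
                    yield return TakeChunk();
                    timerTask = StartTimer();
                }
            }
            if (buffer.Count > 0) yield return TakeChunk();
        }
        finally
        {
            try { linkedCts.Cancel(); } // Ask a pending MoveNextAsync to stop
            finally
            {
                // Never dispose the enumerator while MoveNextAsync is pending.
                if (moveNext is not null)
                    await Task.WhenAny(moveNext).ConfigureAwait(false);
                await enumerator.DisposeAsync().ConfigureAwait(false);
                timer?.Dispose();
            }
        }
    }
}
```

With this sketch in scope, the call from the question would read source.Buffer(TimeSpan.FromSeconds(5), 10), consumed with await foreach. Because the timer is only restarted after the yield return resumes, the time the consumer spends processing a chunk does not count against the next chunk's timeSpan.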
Online demo.
This implementation is destructive, meaning that in case the source sequence fails or the enumeration is canceled, any elements that have been consumed previously from the source and are buffered will be lost. See this question for ideas about how to inject a non-destructive behavior.

Care has been taken to avoid leaking fire-and-forget MoveNextAsync operations or timers.

For an implementation that uses the Task.Delay method instead of the PeriodicTimer class, and so can be used with .NET versions earlier than 6.0, you can look at the 7th revision of this answer. That revision also includes a tempting but flawed Rx-based implementation.
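To illustrate why Task.Delay needs an accompanying CancellationTokenSource, here is a rough sketch of a disposable one-shot timer built on it. This is not the code from the revision mentioned above; DelayTimer and its Elapsed property are hypothetical names introduced only for this example.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, named for illustration only: a one-shot timer built on
// Task.Delay, for runtimes where PeriodicTimer is not available.
public sealed class DelayTimer : IDisposable
{
    private readonly CancellationTokenSource _cts = new CancellationTokenSource();
    private int _disposed;

    // Completes successfully when the delay elapses, or as canceled on Dispose.
    public Task Elapsed { get; }

    public DelayTimer(TimeSpan delay)
    {
        Elapsed = Task.Delay(delay, _cts.Token);
    }

    // Task.Delay cannot be disposed, so stopping it means canceling its token.
    public void Dispose()
    {
        if (Interlocked.Exchange(ref _disposed, 1) != 0) return;
        _cts.Cancel();
        _cts.Dispose();
    }
}
```

A Buffer variant for older runtimes could create a new DelayTimer for each chunk, pass its Elapsed task to Task.WhenAny alongside the pending MoveNextAsync task, and dispose it when the chunk is emitted so that no delay is left running in the background.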