I am using a Channel<T> in a producer-consumer scenario, and I have the requirement to consume the channel in batches of 10 items each, without letting any consumed item stay idle in a buffer for more than 5 seconds. This duration is the maximum latency allowed between reading an item from the channel and processing the batch that contains it. The maximum-latency policy takes precedence over the batch-size policy, so a batch should be processed even with fewer than 10 items in order to satisfy the max-latency requirement.
I was able to implement the first requirement, in the form of a ReadAllBatches extension method for the ChannelReader<T> class:
    public static async IAsyncEnumerable<T[]> ReadAllBatches<T>(
        this ChannelReader<T> channelReader, int batchSize)
    {
        List<T> buffer = new();
        while (true)
        {
            T item;
            try { item = await channelReader.ReadAsync(); }
            catch (ChannelClosedException) { break; }
            buffer.Add(item);
            if (buffer.Count == batchSize)
            {
                yield return buffer.ToArray();
                buffer.Clear();
            }
        }
        if (buffer.Count > 0) yield return buffer.ToArray();
        await channelReader.Completion; // Propagate possible failure
    }
I am planning to use it like this:
    await foreach (Item[] batch in myChannel.Reader.ReadAllBatches(10))
    {
        Console.WriteLine($"Processing batch of {batch.Length} items");
    }
My question is: how can I enhance my ReadAllBatches<T> implementation with an additional TimeSpan timeout parameter that enforces the aforementioned maximum-latency policy, without installing third-party packages in my project?
Important: The requested implementation should not be susceptible to the memory-leak issue that has been reported here. That is, the loop that consumes the channel should not cause a steady increase in the memory used by the application in case the producer that writes items to the channel has been idle for a prolonged period of time.
Note: I am aware that a similar question exists regarding batching the IAsyncEnumerable<T> interface, but I am not interested in that. I am interested in a method that targets the ChannelReader<T> type directly, for performance reasons.
Below is an implementation of an idea that was posted on GitHub by tkrafael. The internal CancellationTokenSource is scheduled with a timer for cancellation, immediately after consuming the first element in the batch.
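The idea can be sketched as follows. This is an illustrative reconstruction of the described approach, not necessarily the exact posted code; it assumes .NET 6+ for `CancellationTokenSource.TryReset`:

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;

public static class ChannelReaderExtensions
{
    public static async IAsyncEnumerable<T[]> ReadAllBatches<T>(
        this ChannelReader<T> source, int batchSize, TimeSpan timeout,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        CancellationTokenSource timerCts = CancellationTokenSource
            .CreateLinkedTokenSource(cancellationToken);
        try
        {
            List<T> buffer = new();
            while (true)
            {
                (bool HasItem, T Item) read = default;
                try
                {
                    read = (true, await source.ReadAsync(timerCts.Token)
                        .ConfigureAwait(false));
                }
                catch (ChannelClosedException) { break; }
                catch (OperationCanceledException)
                {
                    // External cancellation: emit the final batch, then throw below.
                    if (cancellationToken.IsCancellationRequested) break;
                    // Otherwise the max-latency timer elapsed: emit the partial batch.
                }
                if (read.HasItem)
                {
                    buffer.Add(read.Item);
                    // Schedule the timer upon consuming the first element in the batch.
                    if (buffer.Count == 1) timerCts.CancelAfter(timeout);
                    if (buffer.Count < batchSize) continue;
                }
                // Either the batch is full, or the timer elapsed on a partial batch.
                Debug.Assert(buffer.Count > 0);
                yield return buffer.ToArray();
                buffer.Clear();
                // Reuse the linked CTS when possible, otherwise replace it. No timer
                // is pending while the buffer is empty, so nothing accumulates while
                // the producer stays idle.
                if (!timerCts.TryReset())
                {
                    timerCts.Dispose();
                    timerCts = CancellationTokenSource
                        .CreateLinkedTokenSource(cancellationToken);
                }
            }
            // Non-destructive: emit any already-consumed items before surfacing errors.
            if (buffer.Count > 0) yield return buffer.ToArray();
            cancellationToken.ThrowIfCancellationRequested();
            await source.Completion.ConfigureAwait(false); // Propagate possible failure
        }
        finally { timerCts.Dispose(); }
    }
}
```

With this sketch the usage from the question becomes, for example, myChannel.Reader.ReadAllBatches(10, TimeSpan.FromSeconds(5)).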
This implementation is non-destructive, meaning that no items that have been consumed from the channel are in danger of being lost. In case the enumeration is canceled or the channel is faulted, any consumed items will be emitted in a final batch, before the propagation of the error.
Note: In case the source ChannelReader<T> is completed at the same time that the cancellationToken is canceled, the cancellation has precedence over the completion. This is the same behavior as that of all native methods of the ChannelReader<T> and ChannelWriter<T> classes. It means that it's possible (although rare) for an OperationCanceledException to be thrown, even in case all the work has completed.
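A consumer that passes a CancellationToken can guard against this rare case with an exception filter. The snippet below is a hypothetical sketch that assumes a ReadAllBatches overload accepting a batch size, a timeout, and a CancellationToken, as discussed above:

```csharp
using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(5));
try
{
    await foreach (Item[] batch in myChannel.Reader
        .ReadAllBatches(10, TimeSpan.FromSeconds(5), cts.Token))
    {
        Console.WriteLine($"Processing batch of {batch.Length} items");
    }
}
catch (OperationCanceledException) when (cts.IsCancellationRequested)
{
    // Rare but possible, even if the channel completed at the same moment.
    Console.WriteLine("The consumption was canceled.");
}
```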