The complete reproducible code is on GitHub; memory usage rockets soon after launching the executable. The code resides mostly in the AsyncBlockingQueue.cs class.
The following code implements a simple async "blocking" queue:
public async Task<T> DequeueAsync(
    int timeoutInMs = -1,
    CancellationToken cancellationToken = default)
{
    try
    {
        // Combine the caller's token with an optional timeout token
        using (CancellationTokenSource cts = this.GetCancellationTokenSource(timeoutInMs, cancellationToken))
        {
            T value = await this._channel.Reader.ReadAsync(cts?.Token ?? cancellationToken).ConfigureAwait(false);
            return value;
        }
    }
    catch (ChannelClosedException cce)
    {
        await Console.Error.WriteLineAsync("Channel is closed.");
        throw new ObjectDisposedException("Queue is disposed", cce);
    }
    catch (OperationCanceledException)
    {
        // Propagate timeouts and external cancellation to the caller
        throw;
    }
    catch (Exception ex)
    {
        await Console.Error.WriteLineAsync($"Dequeue failed: {ex.Message}");
        throw;
    }
}
private CancellationTokenSource GetCancellationTokenSource(
    int timeoutInMs,
    CancellationToken cancellationToken)
{
    // A non-positive timeout means "no timeout": the caller's token is used directly
    if (timeoutInMs <= 0)
    {
        return null;
    }

    CancellationTokenSource cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
    cts.CancelAfter(TimeSpan.FromMilliseconds(timeoutInMs));
    return cts;
}
When used this way, it leaks memory:
try
{
    string message = await this._inputQueue.DequeueAsync(10, cancellationToken).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
    // timeout
}

Update
From the comments:

This means that what's really needed is a way to batch messages by both count and period. Doing either is relatively easy.

This method batches by count. The method adds messages to the batch list until the limit is reached, then sends the data downstream and clears the list:
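A minimal sketch of such a count-based batcher, written as an extension method on ChannelReader<T>; the method name, the T[] batch type and the unbounded inner channel are assumptions for illustration:

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class BatchExtensions
{
    public static ChannelReader<T[]> BatchByCount<T>(
        this ChannelReader<T> input, int count, CancellationToken token = default)
    {
        var channel = Channel.CreateUnbounded<T[]>();
        var writer = channel.Writer;
        _ = Task.Run(async () =>
        {
            var batch = new List<T>(count);
            await foreach (var item in input.ReadAllAsync(token))
            {
                batch.Add(item);
                if (batch.Count == count)
                {
                    // Limit reached: send the batch downstream and start a new one
                    await writer.WriteAsync(batch.ToArray(), token);
                    batch.Clear();
                }
            }
            // Input completed: flush the final partial batch
            if (batch.Count > 0)
            {
                await writer.WriteAsync(batch.ToArray(), token);
            }
        }, token).ContinueWith(t => writer.TryComplete(t.Exception));
        return channel.Reader;
    }
}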
A method that batches by period is more complicated, as the timer can fire at the same time a message is received. Interlocked.Exchange replaces the existing batch list with a new one and sends the batched data downstream:
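A sketch of the period-based version under the same assumptions. Note the comment on the remaining race between Add and the swap; that residual window is exactly what the lock mentioned next is for:

// Same using directives and static class as the sketch above
public static ChannelReader<T[]> BatchByPeriod<T>(
    this ChannelReader<T> input, TimeSpan period, CancellationToken token = default)
{
    var channel = Channel.CreateUnbounded<T[]>();
    var writer = channel.Writer;
    var batch = new List<T>();
    // Every period, atomically swap in a fresh list and send the old one downstream
    var timer = new Timer(_ =>
    {
        var oldBatch = Interlocked.Exchange(ref batch, new List<T>());
        if (oldBatch.Count > 0)
        {
            writer.TryWrite(oldBatch.ToArray());
        }
    }, null, period, period);
    _ = Task.Run(async () =>
    {
        await foreach (var item in input.ReadAllAsync(token))
        {
            // The timer can still fire between reading `batch` and Add;
            // closing that window entirely needs the lock discussed below
            batch.Add(item);
        }
    }, token).ContinueWith(t =>
    {
        timer.Dispose();
        // Flush whatever is left and complete the output channel
        var remaining = Interlocked.Exchange(ref batch, new List<T>());
        if (remaining.Count > 0)
        {
            writer.TryWrite(remaining.ToArray());
        }
        writer.TryComplete(t.Exception);
    });
    return channel.Reader;
}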
To do both - I'm still working on that. The problem is that both the count limit and the timer expiration can occur at the same time. Worst case, lock(batch) can be used to ensure that only one thread or loop can send the data downstream.

Original Answer
Channels don't leak when used properly - just like any other container. A Channel isn't an asynchronous queue and definitely not a blocking one. It's a very different construct, with completely different idioms. It's a higher-level container that uses queues. There's a very good reason there are separate ChannelReader and ChannelWriter classes.
The typical scenario is to have a publisher create and own the channel. Only the publisher can write to that channel and call Complete() on it. Channel doesn't implement IDisposable, so it can't be disposed. The publisher only provides a ChannelReader to subscribers.

Subscribers only see a ChannelReader and read from it until it completes. By using ReadAllAsync, a subscriber can keep reading from a ChannelReader until it completes.

This is a typical example:
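A sketch of such a publisher, assuming a trivial Message record and a periodic production loop:

using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public record Message(int Id);

public ChannelReader<Message> Publish(CancellationToken token = default)
{
    var channel = Channel.CreateUnbounded<Message>();
    var writer = channel.Writer;
    _ = Task.Run(async () =>
    {
        for (var i = 0; !token.IsCancellationRequested; i++)
        {
            await writer.WriteAsync(new Message(i), token);
            await Task.Delay(1000, token);
        }
    })
    // Only the publisher completes the channel, propagating any failure
    .ContinueWith(t => writer.TryComplete(t.Exception));
    return channel.Reader;
}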
The subscriber only needs a ChannelReader to work. By using ChannelReader.ReadAllAsync, the subscriber only needs await foreach to process messages:
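A matching subscriber sketch, reusing the assumed Message record from above:

public async Task Subscribe(ChannelReader<Message> reader, CancellationToken token = default)
{
    // The loop completes when the publisher calls Complete() on the channel
    await foreach (var msg in reader.ReadAllAsync(token))
    {
        Console.WriteLine($"Received {msg.Id}");
    }
}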
The subscriber can produce its own messages by returning a ChannelReader. And this is where things become very interesting, as the Subscriber method becomes a step in a pipeline of chained steps. If we convert the methods to extension methods on ChannelReader, we can easily create an entire pipeline.

Let's generate some numbers:
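A sketch of a generator step; the count and delay are arbitrary:

static ChannelReader<int> Generate(int count)
{
    var channel = Channel.CreateUnbounded<int>();
    var writer = channel.Writer;
    _ = Task.Run(async () =>
    {
        for (var i = 1; i <= count; i++)
        {
            await writer.WriteAsync(i);
            await Task.Delay(100);
        }
    }).ContinueWith(t => writer.TryComplete(t.Exception));
    return channel.Reader;
}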
Then double and square them:
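Sketches of the two transforming steps, with assumed names:

// Extension methods must live in a static class
static ChannelReader<int> Double(this ChannelReader<int> input)
{
    var channel = Channel.CreateUnbounded<int>();
    var writer = channel.Writer;
    _ = Task.Run(async () =>
    {
        await foreach (var n in input.ReadAllAsync())
        {
            await writer.WriteAsync(n * 2);
        }
    }).ContinueWith(t => writer.TryComplete(t.Exception));
    return channel.Reader;
}

static ChannelReader<int> Square(this ChannelReader<int> input)
{
    var channel = Channel.CreateUnbounded<int>();
    var writer = channel.Writer;
    _ = Task.Run(async () =>
    {
        await foreach (var n in input.ReadAllAsync())
        {
            await writer.WriteAsync(n * n);
        }
    }).ContinueWith(t => writer.TryComplete(t.Exception));
    return channel.Reader;
}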
And finally, print them:
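A terminal step sketch that just drains the reader:

static async Task Print(this ChannelReader<int> input)
{
    await foreach (var n in input.ReadAllAsync())
    {
        Console.WriteLine(n);
    }
}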
Now we can build a pipeline:
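With the sketches above in scope, the composition could look like this:

await Generate(100)
    .Double()
    .Square()
    .Print();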
And add a cancellation token to all steps:
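A sketch of the token-aware variant: each step gains a CancellationToken parameter that it passes to ReadAllAsync and WriteAsync, shown here for Double, with the other steps assumed to follow the same pattern:

static ChannelReader<int> Double(this ChannelReader<int> input, CancellationToken token = default)
{
    var channel = Channel.CreateUnbounded<int>();
    var writer = channel.Writer;
    _ = Task.Run(async () =>
    {
        await foreach (var n in input.ReadAllAsync(token))
        {
            await writer.WriteAsync(n * 2, token);
        }
    }, token).ContinueWith(t => writer.TryComplete(t.Exception));
    return channel.Reader;
}

// Assuming Generate, Square and Print gain the same parameter:
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
await Generate(100, cts.Token)
    .Double(cts.Token)
    .Square(cts.Token)
    .Print(cts.Token);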
Memory usage can increase if one step produces messages faster than they're consumed for a long time. This is easily handled by using a bounded instead of an unbounded channel. That way, if a method is too slow, all the previous methods will have to await before publishing new data.
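For example, a bounded channel created with Wait mode makes WriteAsync wait for free space instead of buffering without limit:

var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(10)
{
    // When full, WriteAsync waits for free space instead of buffering forever
    FullMode = BoundedChannelFullMode.Wait
});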