The complete reproducible code is on GitHub; memory usage climbs rapidly after launching the executable. The code resides mostly in the AsyncBlockingQueue.cs class.
The following code implements a simple async "blocking" queue:
public async Task<T> DequeueAsync(
    int timeoutInMs = -1,
    CancellationToken cancellationToken = default)
{
    try
    {
        using (CancellationTokenSource cts = this.GetCancellationTokenSource(timeoutInMs, cancellationToken))
        {
            T value = await this._channel.Reader
                .ReadAsync(cts?.Token ?? cancellationToken)
                .ConfigureAwait(false);
            return value;
        }
    }
    catch (ChannelClosedException cce)
    {
        await Console.Error.WriteLineAsync("Channel is closed.");
        throw new ObjectDisposedException("Queue is disposed", cce);
    }
    catch (OperationCanceledException)
    {
        throw;
    }
    catch (Exception)
    {
        await Console.Error.WriteLineAsync("Dequeue failed.");
        throw;
    }
}
private CancellationTokenSource GetCancellationTokenSource(
    int timeoutInMs,
    CancellationToken cancellationToken)
{
    if (timeoutInMs <= 0)
    {
        return null;
    }
    CancellationTokenSource cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
    cts.CancelAfter(TimeSpan.FromMilliseconds(timeoutInMs));
    return cts;
}
When used in this way, it leaks memory:
try
{
    string message = await this._inputQueue
        .DequeueAsync(10, cancellationToken)
        .ConfigureAwait(false);
}
catch (OperationCanceledException)
{
    // timeout
}
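As written, the catch block above treats an external cancellation request the same as a timeout. A hedged sketch of one way to tell them apart with an exception filter (the `AsyncBlockingQueue<T>` type name is inferred from the file name mentioned earlier, and the sentinel fallback is an illustrative choice):

```csharp
// Hypothetical caller; 'queue' is the AsyncBlockingQueue<string> described above.
public static async Task<string> DequeueWithTimeout(
    AsyncBlockingQueue<string> queue, CancellationToken cancellationToken)
{
    try
    {
        return await queue.DequeueAsync(10, cancellationToken).ConfigureAwait(false);
    }
    catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
    {
        // The caller asked to cancel; propagate instead of swallowing.
        throw;
    }
    catch (OperationCanceledException)
    {
        // The 10 ms timeout elapsed; fall back to a sentinel value.
        return null;
    }
}
```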
I was able to reproduce the issue you are observing. It is clearly a flaw in the Channels library, IMHO.
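A minimal sketch of such a repro, assuming an unbounded channel that is read with a short timeout in a loop while nothing is ever written to it (the iteration count and the `GC.GetTotalMemory` measurement are illustrative choices, not the original repro):

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class Program
{
    public static async Task Main()
    {
        // An unbounded channel that never receives any values,
        // so every read below ends in a timeout.
        Channel<int> channel = Channel.CreateUnbounded<int>();

        long before = GC.GetTotalMemory(forceFullCollection: true);
        for (int i = 0; i < 1_000; i++) // ~10 seconds at 10 ms per iteration
        {
            using var cts = new CancellationTokenSource(millisecondsDelay: 10);
            try
            {
                await channel.Reader.ReadAsync(cts.Token);
            }
            catch (OperationCanceledException)
            {
                // Timed out. The canceled read operation stays rooted
                // inside the channel until the next write occurs.
            }
        }
        long after = GC.GetTotalMemory(forceFullCollection: true);
        Console.WriteLine($"Retained after {1_000:N0} timed-out reads: {after - before:N0} bytes");
    }
}
```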
Around 800 bytes are leaked per operation, which is quite nasty. The memory is reclaimed every time a new value is written to the channel, so for a busy channel this design flaw should not be an issue. But for a channel that receives values only sporadically, it can be a showstopper.
There are other asynchronous queue implementations available that do not suffer from the same issue. In the repro, you can try commenting out the
await channel.Reader.ReadAsync(cts.Token);
line and uncommenting either of the two lines below it. You will see that both the BufferBlock<T>
from the TPL Dataflow library and the AsyncCollection<T>
from the Nito.AsyncEx.Coordination package allow asynchronous retrieval from the queue with a timeout, without leaking memory.
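For reference, hedged sketches of the equivalent timed reads with those two alternatives (method bodies are my own illustration; both packages must be added via NuGet):

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // NuGet: System.Threading.Tasks.Dataflow
using Nito.AsyncEx;                    // NuGet: Nito.AsyncEx.Coordination

public static class Alternatives
{
    // BufferBlock<T>: ReceiveAsync has a built-in timeout overload that
    // throws a TimeoutException when no item arrives in time.
    public static Task<int> FromBufferBlock(BufferBlock<int> block)
        => block.ReceiveAsync(TimeSpan.FromMilliseconds(10));

    // AsyncCollection<T>: TakeAsync accepts a CancellationToken, so a
    // timeout is expressed with a timed CancellationTokenSource.
    public static async Task<int> FromAsyncCollection(AsyncCollection<int> collection)
    {
        using var cts = new CancellationTokenSource(millisecondsDelay: 10);
        return await collection.TakeAsync(cts.Token);
    }
}
```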