Channels with CancellationTokenSource with timeout memory leak after dispose


The complete reproducible code is on GitHub; memory usage rockets soon after launching the executable. The code resides mostly in the AsyncBlockingQueue.cs class.

The following code implements a simple async "blocking" queue:

        public async Task<T> DequeueAsync(
            int timeoutInMs = -1,
            CancellationToken cancellationToken = default)
        {
            try
            {
                using (CancellationTokenSource cts = this.GetCancellationTokenSource(timeoutInMs, cancellationToken))
                {
                    T value = await this._channel.Reader.ReadAsync(cts?.Token ?? cancellationToken).ConfigureAwait(false);
                    return value;
                }
            }
            catch (ChannelClosedException)
            {
                await Console.Error.WriteLineAsync("Channel is closed.");
                throw new ObjectDisposedException("Queue is disposed");
            }
            catch (OperationCanceledException)
            {
                throw;
            }
            catch (Exception ex)
            {
                await Console.Error.WriteLineAsync("Dequeue failed.");
                throw;
            }
        }


        private CancellationTokenSource GetCancellationTokenSource(
            int timeoutInMs,
            CancellationToken cancellationToken)
        {
            if (timeoutInMs <= 0)
            {
                return null;
            }

            CancellationTokenSource cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
            cts.CancelAfter(TimeSpan.FromMilliseconds(timeoutInMs));
            return cts;
        }

When used this way, it leaks memory:

try
{
   string message = await this._inputQueue.DequeueAsync(10,cancellationToken).ConfigureAwait(false);
}
catch(OperationCanceledException){
   // timeout 
}


There are 3 answers below.

Answer 1 (score: 10)

I was able to reproduce the issue you are observing. It is clearly a flaw in the Channels library IMHO. Here is my repro:

using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class Program
{
    public static async Task Main()
    {
        var channel = Channel.CreateUnbounded<int>();
        var bufferBlock = new BufferBlock<int>();
        var asyncCollection = new Nito.AsyncEx.AsyncCollection<int>();
        var mem0 = GC.GetTotalMemory(true);
        int timeouts = 0;
        for (int i = 0; i < 10; i++)
        {
            var stopwatch = Stopwatch.StartNew();
            while (stopwatch.ElapsedMilliseconds < 500)
            {
                using var cts = new CancellationTokenSource(1);
                try
                {
                    await channel.Reader.ReadAsync(cts.Token);
                    //await bufferBlock.ReceiveAsync(cts.Token);
                    //await asyncCollection.TakeAsync(cts.Token);
                }
                catch (OperationCanceledException) { timeouts++; }
            }
            var mem1 = GC.GetTotalMemory(true);
            Console.WriteLine($"{i + 1,2}) Timeouts: {timeouts,5:#,0},"
                + $" Allocated: {mem1 - mem0:#,0} bytes");
        }
    }
}

Output:

 1) Timeouts:   124, Allocated: 175,664 bytes
 2) Timeouts:   250, Allocated: 269,720 bytes
 3) Timeouts:   376, Allocated: 362,544 bytes
 4) Timeouts:   502, Allocated: 453,264 bytes
 5) Timeouts:   628, Allocated: 548,080 bytes
 6) Timeouts:   754, Allocated: 638,800 bytes
 7) Timeouts:   880, Allocated: 729,584 bytes
 8) Timeouts: 1,006, Allocated: 820,304 bytes
 9) Timeouts: 1,132, Allocated: 919,216 bytes
10) Timeouts: 1,258, Allocated: 1,011,928 bytes

Try it on Fiddle.

Around 800 bytes are leaked per operation, which is quite nasty. The memory is reclaimed every time a new value is written in the channel, so for a busy channel this design flaw should not be an issue. But for a channel that receives values sporadically, this can be a showstopper.

There are other asynchronous queue implementations available that do not suffer from the same issue. You can try commenting out the await channel.Reader.ReadAsync(cts.Token); line and uncommenting either of the two lines below it. You will see that both the BufferBlock<T> from the TPL Dataflow library and the AsyncCollection<T> from the Nito.AsyncEx.Coordination package allow asynchronous retrieval from the queue with a timeout, without leaking memory.
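As a minimal sketch of the BufferBlock<T> route: ReceiveAsync has a built-in TimeSpan overload, so no CancellationTokenSource is needed at all for a plain timeout. Note that the timeout surfaces as a TimeoutException rather than an OperationCanceledException, so the caller's catch clause changes accordingly.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class Program
{
    public static async Task Main()
    {
        var queue = new BufferBlock<int>();

        try
        {
            // ReceiveAsync(TimeSpan) handles the timeout internally,
            // without a per-call CancellationTokenSource registration.
            int value = await queue.ReceiveAsync(TimeSpan.FromMilliseconds(10));
            Console.WriteLine(value);
        }
        catch (TimeoutException)
        {
            // No item arrived within the timeout.
            Console.WriteLine("Timed out.");
        }
    }
}
```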

Answer 2 (score: 15)

Update

From the comments :

there is a processor to process the messages in batch. it starts to process when there are enough messages or time is up, that's where the timeout cancellation comes up

This means that what's really needed is a way to batch messages by both count and period. Doing either on its own is relatively easy.

This method batches by count. It adds messages to the batch list until the limit is reached, sends the batch downstream and clears the list:

static ChannelReader<Message[]> BatchByCount(this ChannelReader<Message> input, int count, CancellationToken token=default)
{
    var channel=Channel.CreateUnbounded<Message[]>();
    var writer=channel.Writer;

    _ = Task.Run(async ()=>{
        var batch=new List<Message>(count);
        await foreach(var msg in input.ReadAllAsync(token))
        {
            batch.Add(msg);
            if(batch.Count==count)
            {
                await writer.WriteAsync(batch.ToArray());
                batch.Clear();
            }
        }
        //Flush any remaining partial batch before completing
        if(batch.Count>0)
        {
            await writer.WriteAsync(batch.ToArray());
        }
    },token)
    .ContinueWith(t=>writer.TryComplete(t.Exception));
    return channel;
}

A method that batches by period is more complicated, as the timer can fire at the same time a message is received. Interlocked.Exchange replaces the existing batch list with a new one and sends the batched data downstream:

static ChannelReader<Message[]> BatchByPeriod(this ChannelReader<Message> input, TimeSpan period, CancellationToken token=default)
{
    var channel=Channel.CreateUnbounded<Message[]>();
    var writer=channel.Writer;

    var batch=new List<Message>();
    var timer=new Timer(async _ =>{
        var data=Interlocked.Exchange(ref batch,new List<Message>());
        await writer.WriteAsync(data.ToArray());
    },null,TimeSpan.Zero,period);

    _ = Task.Run(async ()=>{
        await foreach(var msg in input.ReadAllAsync(token))
        {
            batch.Add(msg);
        }
    },token)
    .ContinueWith(t=>{
        timer.Dispose();
        writer.TryComplete(t.Exception);
    });
    return channel;
}

To do both - I'm still working on that. The problem is that the count limit and the timer expiration can occur at the same time. Worst case, lock(batch) can be used to ensure that only one of the two, the timer callback or the read loop, sends the data downstream.
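A rough sketch of that combined approach, hedged rather than definitive: both flush paths go through one Flush method serialized with a lock, so each message lands in exactly one batch. Message is assumed to be the same placeholder type used in the methods above, and the use of TryWrite relies on the channel being unbounded (where it always succeeds).

```csharp
static ChannelReader<Message[]> BatchByCountOrPeriod(
    this ChannelReader<Message> input, int count, TimeSpan period,
    CancellationToken token = default)
{
    var channel = Channel.CreateUnbounded<Message[]>();
    var writer = channel.Writer;
    var batch = new List<Message>(count);

    // Called from both the timer and the read loop; the lock guarantees
    // only one of them snapshots and clears the batch at a time.
    void Flush()
    {
        Message[] data = null;
        lock (batch)
        {
            if (batch.Count > 0)
            {
                data = batch.ToArray();
                batch.Clear();
            }
        }
        // TryWrite always succeeds on an unbounded channel.
        if (data != null) writer.TryWrite(data);
    }

    var timer = new Timer(_ => Flush(), null, period, period);

    _ = Task.Run(async () =>
    {
        await foreach (var msg in input.ReadAllAsync(token))
        {
            bool full;
            lock (batch)
            {
                batch.Add(msg);
                full = batch.Count >= count;
            }
            if (full) Flush();
        }
    }, token)
    .ContinueWith(t =>
    {
        timer.Dispose();
        Flush(); // emit any trailing partial batch
        writer.TryComplete(t.Exception);
    });
    return channel;
}
```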

Original Answer

Channels don't leak when used properly - just like any other container. A Channel isn't an asynchronous queue and definitely not a blocking one. It's a very different construct, with completely different idioms. It's a higher-level container that uses queues. There's a very good reason there are separate ChannelReader and ChannelWriter classes.

The typical scenario is to have a publisher create and own the channel. Only the publisher can write to that channel and call Complete() on it. Channel doesn't implement IDisposable so it can't be disposed. The publisher only provides a ChannelReader to subscribers.

Subscribers only see a ChannelReader and read from it until it completes. By using ReadAllAsync a subscriber can keep reading from a ChannelReader until it completes.

This is a typical example :

ChannelReader<Message> Producer(CancellationToken token=default)
{
    var channel=Channel.CreateUnbounded<Message>();
    var writer=channel.Writer;

    //Create the actual "publisher" worker
    _ = Task.Run(async ()=>{
        for(int i=0;i<100;i++)
        {
            //Check for cancellation
            if(token.IsCancellationRequested)
            {
                return;
            }
            //Simulate some work
            await Task.Delay(100);
            await writer.WriteAsync(new Message(...));          
        }
    }  ,token)
    //Complete and propagate any exceptions
    .ContinueWith(t=>writer.TryComplete(t.Exception));

    //This casts to a ChannelReader
    return channel;
}

The subscriber only needs a ChannelReader to work. By using ChannelReader.ReadAllAsync the subscriber only needs await foreach to process messages:

async Task Subscriber(ChannelReader<Message> input,CancellationToken token=default)
{
    await foreach(var msg in input.ReadAllAsync(token))
    {
        //Use the message
    }
}

The subscriber can produce its own messages by returning a ChannelReader. And this is where things become very interesting, as the Subscriber method becomes a step in a pipeline of chained steps. If we convert the methods to extension methods on ChannelReader we can easily create an entire pipeline.

Let's generate some numbers :

ChannelReader<int> Generate(int nums,CancellationToken token=default)
{
    var channel=Channel.CreateBounded<int>(10);
    var writer=channel.Writer;

    //Create the actual "publisher" worker
    _ = Task.Run(async ()=>{
        for(int i=0;i<nums;i++)
        {
            //Check for cancellation
            if(token.IsCancellationRequested)
            {
                return;
            }

            await writer.WriteAsync(i*7);  
            await Task.Delay(100);        
        }
    }  ,token)
    //Complete and propagate any exceptions
    .ContinueWith(t=>writer.TryComplete(t.Exception));

    //This casts to a ChannelReader
    return channel;
}

Then double them and take their square roots :

ChannelReader<double> Double(this ChannelReader<int> input,CancellationToken token=default)
{
    var channel=Channel.CreateBounded<double>(10);
    var writer=channel.Writer;

    //Create the actual "publisher" worker
    _ = Task.Run(async ()=>{
        await foreach(var msg in input.ReadAllAsync(token))
        {
            await writer.WriteAsync(2.0*msg);          
        }
    }  ,token)
    //Complete and propagate any exceptions
    .ContinueWith(t=>writer.TryComplete(t.Exception));

    return channel;
}

ChannelReader<double> Root(this ChannelReader<double> input,CancellationToken token=default)
{
    var channel=Channel.CreateBounded<double>(10);
    var writer=channel.Writer;

    //Create the actual "publisher" worker
    _ = Task.Run(async ()=>{
        await foreach(var msg in input.ReadAllAsync(token))
        {
            await writer.WriteAsync(Math.Sqrt(msg));          
        }
    }  ,token)
    //Complete and propagate any exceptions
    .ContinueWith(t=>writer.TryComplete(t.Exception));

    return channel;
}

And finally print them

async Task Print(this ChannelReader<double> input,CancellationToken token=default)
{
    await foreach(var msg in input.ReadAllAsync(token))
    {
        Console.WriteLine(msg);
    }
}

Now we can build a pipeline :

await Generate(100)
          .Double()
          .Root()
          .Print();

And add a cancellation token to all steps :

using var cts=new CancellationTokenSource();
await Generate(100,cts.Token)
          .Double(cts.Token)
          .Root(cts.Token)
          .Print(cts.Token);

Memory usage can increase if one step produces messages faster than they're consumed for a long time. This is easily handled by using a bounded instead of an unbounded channel. This way, if a method is too slow all the previous methods will have to await before publishing new data.
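For illustration, a minimal sketch of that backpressure in action (the capacity of 10, item count, and delay are arbitrary choices): with a bounded channel, WriteAsync awaits whenever the buffer is full, so a slow consumer throttles the producer instead of letting memory grow.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class Program
{
    public static async Task Main()
    {
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(10)
        {
            // Wait is the default FullMode: producers await instead of
            // dropping items when the buffer is full.
            FullMode = BoundedChannelFullMode.Wait
        });

        var producer = Task.Run(async () =>
        {
            for (int i = 0; i < 100; i++)
            {
                // Suspends here whenever 10 items are already buffered.
                await channel.Writer.WriteAsync(i);
            }
            channel.Writer.Complete();
        });

        await foreach (var item in channel.Reader.ReadAllAsync())
        {
            await Task.Delay(1); // simulate a slow consumer
        }
        await producer;
    }
}
```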

Answer 3 (score: 6)

I was so preoccupied with the technical details of the actual problem I forgot the problem is already solved almost out-of-the-box.

From the comments, it looks like the actual question is :

there is a processor to process the messages in batch. it starts to process when there are enough messages or time is up, that's where the timeout cancellation comes up

This is provided by the Buffer operator of ReactiveX.NET, which is built by the same team that creates System.Linq.Async :

ChannelReader<Message> reader=_channel;

IAsyncEnumerable<IList<Message>> batchItems = reader.ReadAllAsync(token)
                                              .ToObservable()
                                              .Buffer(TimeSpan.FromSeconds(30), 5)
                                              .ToAsyncEnumerable();

await foreach(var batch in batchItems.WithCancellation(token))
{
 ....
}

These calls can be converted into an extension method, so instead of a DequeueAsync, the question's class could have a BufferAsync or GetWorkItemsAsync method:

public IAsyncEnumerable<T[]> BufferAsync(
            TimeSpan timeSpan,
            int count,
            CancellationToken cancellationToken = default)
{
    return _channel.Reader.BufferAsync(timeSpan,count,cancellationToken);
}

ToObservable and ToAsyncEnumerable are provided by System.Linq.Async and convert between IAsyncEnumerable and IObservable, the interface used by ReactiveX.NET.

Buffer is provided by System.Reactive and buffers item by count or period, even allowing for overlapping sequences.

While LINQ and LINQ to Async provide query operators over objects, Rx.NET does the same over time-based streams of events. It's possible to aggregate over time, buffer events by time, throttle them etc. The examples in the (unofficial) doc page for Buffer show how to create overlapping sequences (eg sliding windows). The same page shows how Sample or Throttle can be used to throttle fast event streams by propagating only the last event in a period.

Rx uses a push model (new events are pushed to subscribers) while IAsyncEnumerable, like IEnumerable, uses a pull model. ToAsyncEnumerable() will cache items until they're requested, which can lead to problems if nobody's listening.

With these methods, one could even create extension methods to buffer or throttle the publishers :

    //Returns all items in a period
    public static IAsyncEnumerable<IList<T>> BufferAsync<T>(
        this ChannelReader<T> reader, 
        TimeSpan timeSpan, 
        int count,
        CancellationToken token = default)
    {
        return reader.ReadAllAsync(token)
            .ToObservable()
            .Buffer(timeSpan, count)
            .ToAsyncEnumerable();
    }
        
        
    //Return the latest item in a period
    public static IAsyncEnumerable<T> SampleAsync<T>(
        this ChannelReader<T> reader, 
        TimeSpan timeSpan,
        CancellationToken token = default)
    {
        return reader.ReadAllAsync(token)
            .ToObservable()
            .Sample(timeSpan)
            .ToAsyncEnumerable();
    }