Should we use ValueTask within System.Threading.Channels.WaitToReadAsync loops?

905 Views Asked by At

Since we expect to be reading frequently, and for us to often be reading when data is already available to be consumed, should SendLoopAsync return ValueTask rather than Task, so that we can make it allocation-free?

// Caller
_ = Task.Factory.StartNew(_ => SendLoopAsync(cancellationToken), TaskCreationOptions.LongRunning, cancellationToken);

// Method
private async ValueTask SendLoopAsync(CancellationToken cancellationToken)
{
    while (await _outputChannel.Reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
    {
        while (_outputChannel.Reader.TryRead(out var message))
        {
            using (await _mutex.LockAsync(cancellationToken).ConfigureAwait(false))
            {
                await _clientWebSocket.SendAsync(message.Data.AsMemory(), message.MessageType, true, cancellationToken).ConfigureAwait(false);
            }
        }
    }
}
2

There are 2 best solutions below

7
On BEST ANSWER

No, there is no value at the SendLoopAsync returning a ValueTask instead of Task. This method is invoked only once in your code. The impact of avoiding a single allocation of a tiny object is practically zero. You should consider using ValueTasks for asynchronous methods that are invoked repeatedly in loops, especially in hot paths. That's not the case in the example presented in the question.

As a side note, invoking asynchronous methods with the Task.Factory.StartNew+TaskCreationOptions.LongRunning combination is pointless. The new Thread that is going to be created, will have a very short life. It's gonna be terminated immediately when the code reaches the first await of an incomplete awaitable inside the async method. Also you are getting back a nested Task<Task>, which is tricky to handle. Using Task.Run is preferable. You can read here the reasons why.

Also be aware that the Nito.AsyncEx.AsyncLock class is not optimized for memory-efficiency. Lots of allocations are happening every time the lock is acquired. If you want a low-allocation synchronization primitive that can be acquired asynchronously, your best bet currently is probably to use a Channel<object> instance, initialized with a single null value: retrieve the value to enter, store it back to release.

6
On

The idiomatic way to use channels doesn't need locks, semaphores or Task.Factory.StartNew. The typical way to use a channel is to have a method that accepts just a ChannelReader as input. If the method wants to use a Channel as output, it should create it itself and only return a ChannelReader that can be passed to other methods. By owning the channel the method knows when it can be closed.

In the questions case, the code is simple enough though. A simple await foreach should be enough:

private async ValueTask SendLoopAsync(ChannelReader<Message> reader,
                                      CancellationToken cancellationToken)
{
    await foreach (var msg in reader.ReadAllAsync(cancellationToken))
    {
        await _clientWebSocket.SendAsync(message.Data.AsMemory(), 
                      message.MessageType, 
                      true, cancellationToken);
    }
}

This method doesn't need an external Task.Run or Task.Factory.New to work. To run it, just call it and store its task somewhere, but not discarded:

public MyWorker(ChannelReader<Message> reader,CancellationToken token)
{
   .....
   _loopTask=SendLoopAsync(reader,token);
}

This way, once the input channel completes, the code can await for _loopTask to finish processing any pending messages.

Any blocking code should run inside it with Task.Run(), eg

private async ValueTask SendLoopAsync(ChannelReader<Message> reader,
                                      CancellationToken cancellationToken)
{
    await foreach (var msg in reader.ReadAllAsync(cancellationToken))
    {
        var new_msg=await Task.Run(()=>SomeHeavyWork(msg),cancellationToken);
        await _clientWebSocket.SendAsync(message.Data.AsMemory(), 
                      message.MessageType, 
                      true, cancellationToken);
    }
}

Concurrent Workers

This method could be used to start multiple concurrent workers too :


var tasks=Enumerable.Range(0,dop).Select(_=>SendLoopAsync(reader,token));
_loopTask=Task.WhenAll(tasks);
...
await _loopTask;

In .NET 6, Parallel.ForEachAsync can be used to process multiple messages with less code:

private async ValueTask SendLoopAsync(ChannelReader<Message> reader,
                                      CancellationToken cancellationToken)
{
    var options=new ParallelOptions {
        CancellationToke=cancellationToken,
        MaxDegreeOfParallellism=4
    };
    var input=reader.ReadAllAsync(cancellationToken);
    await Parallel.ForEachAsync(input,options,async (msg,token)=>{
        var new_msg=await Task.Run(()=>SomeHeavyWork(msg),token);
        await _clientWebSocket.SendAsync(message.Data.AsMemory(), 
                      message.MessageType, 
                      true, token);
    });
}

Idiomatic Channel Producers

Instead of using a class-level channel stored in a field, create the channel inside the producer method and only return its reader. This way the producer method has control of the channel's lifecycle and can close it when it's done. That's one of the reasons a Channel can only be accessed accessed only through its Reader and Writer classes.

A method can consume a ChannelReader and return another. This allows creating methods that can be chained together into a pipeline.

A simple producer can look like this:

ChannelReader<Message> Producer(CancellationToke token)
{
    var channel=Channel.CreateUnbounded<Message>();
    var writer=channel.Writer;
    _ = Task.Run(()=>{
        while(!token.IsCancellationRequested)
        {
            var msg=SomeHeavyJob();
            await writer.SendAsync(msg);
        },token)
    .ContinueWith(t=>writer.TryComplete(t));

    return channel.Reader;
}

When cancellation is signaled, the worker exits or an exception is thrown, the main task exists and ContinueWith calls TryComplete on the writer with any exception that may have been thrown. That's a simple non-blocking operation so it doesn't matter what thread it runs on.

A transforming method would look like this:

ChannelReader<Msg2> Transform(ChannelReader<Msg1> input,CancellationToke token)
{
    var channel=Channel.CreateUnbounded<Msg2>();
    var writer=channel.Writer;
    _ = Task.Run(()=>{
        await foreach(var msg1 in input.ReadAllAsync(token))
        {
            var msg2=SomeHeavyJob(msg1);
            await writer.SendAsync(msg2);
        },token)
    .ContinueWith(t=>writer.TryComplete(t));

    return channel.Reader;
}

Turning those methods into static extension methods would allow chaining them one after the other :

var task=Producer()
          .Transformer()
          .Consumer();

I don't set SingleWriter because this doesn't seem to be doing anything yet. Searching for this in the .NET runtime repo on Github doesn't show any results beyond test code.