Multiple consumers without losing messages


I have a firehose of stuff coming through a Redis pub/sub channel, and I need to distribute it to a number of WebSocket connections: whenever a message arrives from Redis, it has to be sent to every WebSocket connection.

I want multiple consumers, and each of them should receive every message.

using System.Threading.Channels;

var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(1)
{
    FullMode = BoundedChannelFullMode.Wait,
    SingleReader = false
});
var cts = new CancellationTokenSource();


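var producer = Task.Run(async () =>
{
    int i = 0;
    try
    {
        while (!cts.IsCancellationRequested)
        {
            // WriteAsync waits for room in the buffer; TryWrite returns false
            // and silently drops the item whenever the single slot is full.
            await channel.Writer.WriteAsync(i++, cts.Token);
            await Task.Delay(TimeSpan.FromMilliseconds(250), cts.Token);
        }
    }
    catch (OperationCanceledException) { }
    finally
    {
        // Completing the writer lets the ReadAllAsync loops below end.
        channel.Writer.Complete();
    }
});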

var readerOneTask = Task.Run(async () =>
{
    await foreach (var i in channel.Reader.ReadAllAsync())
    {
        Console.WriteLine($"Reader one: {i}");
    }
});

var readerTwoTask = Task.Run(async () =>
{
    await foreach (var i in channel.Reader.ReadAllAsync())
    {
        Console.WriteLine($"Reader two: {i}");
    }
});

cts.CancelAfter(TimeSpan.FromSeconds(5));

Console.ReadLine();

BEST ANSWER

A single Channel<T> cannot broadcast messages to multiple consumers. Every time a message is read from the channel, the message is consumed, and no other consumer is going to get it. If you want to broadcast all messages to all consumers, you'll have to create one dedicated Channel<T> per consumer.
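To see the "consumed once" behavior concretely, here is a minimal sketch, assuming .NET 5 or later (System.Threading.Channels plus C# 9 top-level statements). Two readers drain the same channel; the variable names are illustrative only. Every item reaches exactly one of the readers, so together they see the whole sequence, but neither is guaranteed to see all of it:

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

// One channel, two readers: each item is delivered to exactly one of them.
var channel = Channel.CreateUnbounded<int>();
var seenByOne = new ConcurrentQueue<int>();
var seenByTwo = new ConcurrentQueue<int>();

// Both readers drain the SAME ChannelReader, so they compete for items.
Task readerOne = Task.Run(async () =>
{
    await foreach (var i in channel.Reader.ReadAllAsync())
        seenByOne.Enqueue(i);
});
Task readerTwo = Task.Run(async () =>
{
    await foreach (var i in channel.Reader.ReadAllAsync())
        seenByTwo.Enqueue(i);
});

for (int i = 1; i <= 1000; i++)
    await channel.Writer.WriteAsync(i);
channel.Writer.Complete(); // lets both ReadAllAsync loops finish

await Task.WhenAll(readerOne, readerTwo);

// Together the readers saw every item exactly once, split between them.
var all = seenByOne.Concat(seenByTwo).OrderBy(x => x).ToList();
Console.WriteLine($"Reader one: {seenByOne.Count}, reader two: {seenByTwo.Count}, total: {all.Count}");
```

How the 1,000 items are split between the two readers varies from run to run; only the total is fixed.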

You might find this question interesting: Factory for IAsyncEnumerable or IAsyncEnumerator. It shows various ways to implement a source/controller for an IAsyncEnumerable<T> sequence, including channels and Rx subjects.


Update: Below is a demo of how you could use multiple channels, in order to propagate all the messages to all the consumers.

using System.Threading.Channels;

List<Channel<int>> channels = new();

async Task CreateConsumer(Func<Channel<int>, Task> body)
{
    var channel = Channel.CreateUnbounded<int>();
    lock (channels) channels.Add(channel);
    try
    {
        await Task.Run(() => body(channel)).ConfigureAwait(false);
    }
    finally
    {
        lock (channels) channels.Remove(channel);
    }
}

Task consumer1 = CreateConsumer(async channel =>
{
    await foreach (var i in channel.Reader.ReadAllAsync())
    {
        Console.WriteLine($"Consumer one: {i}");
    }
});

Task consumer2 = CreateConsumer(async channel =>
{
    await foreach (var i in channel.Reader.ReadAllAsync())
    {
        Console.WriteLine($"Consumer two: {i}");
    }
});

using CancellationTokenSource cts = new(TimeSpan.FromMilliseconds(3000));
Task producer = Task.Run(async () =>
{
    int i = 0;
    while (true)
    {
        i++;
        lock (channels) channels.ForEach(channel => channel.Writer.TryWrite(i));
        try { await Task.Delay(TimeSpan.FromMilliseconds(250), cts.Token); }
        catch (OperationCanceledException) { break; }
    }
});

try { producer.Wait(); } catch { }
lock (channels) channels.ForEach(channel => channel.Writer.Complete());
Task.WaitAll(producer, consumer1, consumer2);

Try it on Fiddle.

CreateConsumer is an asynchronous method that is responsible for creating the channel and adding it to the list. It is also responsible for removing the channel from the list when the consumer completes. This is important: otherwise, if a consumer fails, the producer would keep pushing messages into the dead channel, and since nobody reads from that unbounded channel anymore, the messages would pile up in memory, resulting in a memory leak.

The "body" of the consumer, which can be different for each consumer, is passed to the CreateConsumer method as an asynchronous lambda.

It is important that all consumers are started, and their channels are created, before the producer starts. That's why the call to CreateConsumer is not wrapped in a Task.Run. An async method runs synchronously on the calling thread until its first await, so the channel is created and added to the list before CreateConsumer returns its Task.

Every access to the list of channels is protected with a lock, because multiple threads might read or modify the list at the same time: the producer iterates over it, while the consumers add and remove their own channels.
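Two of the claims above, that the channel is registered before CreateConsumer returns and that it is removed again when the consumer fails, can be checked with a small sketch. It reuses the answer's CreateConsumer unchanged; the `gate` and the failing consumer are hypothetical and exist only for this demonstration:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

List<Channel<int>> channels = new();

// Same as CreateConsumer in the answer.
async Task CreateConsumer(Func<Channel<int>, Task> body)
{
    var channel = Channel.CreateUnbounded<int>();
    lock (channels) channels.Add(channel); // runs synchronously, before the first await
    try
    {
        await Task.Run(() => body(channel)).ConfigureAwait(false);
    }
    finally
    {
        lock (channels) channels.Remove(channel); // runs even if body throws
    }
}

// Hypothetical consumer that waits for a signal and then fails.
var gate = new TaskCompletionSource<bool>();
Task consumer = CreateConsumer(async channel =>
{
    await gate.Task;
    throw new InvalidOperationException("consumer failed");
});

// CreateConsumer has already returned, and its channel is registered.
int registeredBeforeFailure;
lock (channels) registeredBeforeFailure = channels.Count;

gate.SetResult(true); // let the consumer fail
try { await consumer; } catch (InvalidOperationException) { }

// The finally block removed the dead channel from the list.
int registeredAfterFailure;
lock (channels) registeredAfterFailure = channels.Count;
Console.WriteLine($"{registeredBeforeFailure} -> {registeredAfterFailure}"); // 1 -> 0
```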


What you ask is possible, just not in this way.

A Channel is a single asynchronous queue, kind of like a ConcurrentQueue with an async interface (plus ordering guarantees, backpressure and some other stuff). Just like with a ConcurrentQueue, when multiple consumers read from the queue, each message goes to only one of them. To send the same message to multiple consumers, you have to broadcast it.

Common Channel Pattern

A common pattern with channels is for each processing method to consume only a ChannelReader passed as input, create and own its output channel, and return that channel's reader as its output. This is very common in Go (blog post and talk), where channels are used extensively for producer/consumer communication and pipelines. If you replace <-chan int with ChannelReader<int>, you'll notice that almost all the methods receive a ChannelReader and return a new one.

This way the processing method controls the lifetime of its output channel. When the input completes or the work gets cancelled, the method completes its output channel, and that completion propagates to the downstream consumers. Because the worker created the output channel itself, it has full control over when that happens:

ChannelReader<string> Worker(ChannelReader<int> input,
                             CancellationToken token=default)
{
    var channel=Channel.CreateUnbounded<string>();
    var writer=channel.Writer;
   
    _ = Task.Run(async ()=>{
        await foreach(var msg in input.ReadAllAsync(token))
        {
            await writer.WriteAsync(msg.ToString(),token);
        }
    },token)
    .ContinueWith(t=>writer.TryComplete(t.Exception));

    return channel.Reader;
}
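A short usage sketch for Worker, assuming the caller creates and completes the input channel itself (the `source` channel below is illustrative, not part of the answer). Completing the input is enough for the `await foreach` over the output to end, because the ContinueWith in Worker completes the output channel once the input is drained:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Worker as in the answer: it owns its output channel and completes it.
ChannelReader<string> Worker(ChannelReader<int> input, CancellationToken token = default)
{
    var channel = Channel.CreateUnbounded<string>();
    var writer = channel.Writer;
    _ = Task.Run(async () =>
    {
        await foreach (var msg in input.ReadAllAsync(token))
            await writer.WriteAsync(msg.ToString(), token);
    }, token)
    .ContinueWith(t => writer.TryComplete(t.Exception));
    return channel.Reader;
}

// The caller owns the input channel, so the caller completes it.
var source = Channel.CreateUnbounded<int>();
ChannelReader<string> output = Worker(source.Reader);
for (int i = 1; i <= 3; i++) await source.Writer.WriteAsync(i);
source.Writer.Complete();

// Completion flows through Worker, so this loop ends on its own.
var results = new List<string>();
await foreach (var s in output.ReadAllAsync())
    results.Add(s);
Console.WriteLine(string.Join(",", results)); // 1,2,3
```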

This boilerplate can be generalized by passing the actual work as a Func<TIn,CancellationToken,TOut>, or as a Func<TIn,CancellationToken,Task<TOut>> for asynchronous methods:

ChannelReader<TOut> Work<TIn,TOut>(ChannelReader<TIn> input,
                             Func<TIn,CancellationToken,Task<TOut>> func,  
                             CancellationToken token=default)
{
    var channel=Channel.CreateUnbounded<TOut>();
    var writer=channel.Writer;
   
    _ = Task.Run(async ()=>{
        await foreach(var msg in input.ReadAllAsync(token))
        {
            var result=await func(msg,token);
            await writer.WriteAsync(result,token);
        }
    },token)
    .ContinueWith(t=>writer.TryComplete(t.Exception));

    return channel.Reader;
}

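ChannelReader<TOut> Work<TIn,TOut>(ChannelReader<TIn> input,
                             Func<TIn,CancellationToken,TOut> func,  
                             CancellationToken token=default)
{
    var channel=Channel.CreateUnbounded<TOut>();
    var writer=channel.Writer;
   
    _ = Task.Run(async ()=>{
        await foreach(var msg in input.ReadAllAsync(token))
        {
            //Same as the async overload, except that func is synchronous
            var result=func(msg,token);
            await writer.WriteAsync(result,token);
        }
    },token)
    .ContinueWith(t=>writer.TryComplete(t.Exception));

    return channel.Reader;
}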

This can be used to create any processing block, e.g.:

ChannelReader<int> step1Out=.....;
ChannelReader<int> step2Out=Work<int,int>(step1Out,async (i,token)=>{
    await Task.Delay(i*1000,token);
    return i;
});
ChannelReader<string> step3Out=Work<int,string>(step2Out,(i,token)=>{
    var line=$"Data is {i}";
    Console.WriteLine(line);
    return line;
});
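Since step1Out is left elided above, here is a self-contained sketch of the same pipeline under a few assumptions: a hypothetical Source helper produces step1Out from a fixed list, the delay is shortened to i*10 ms, step3 returns its line instead of printing it, and the two Work overloads are renamed WorkAsync and Work because C# local functions (used here so the sample runs as top-level statements) cannot be overloaded:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical source stage (not from the answer): emits the items, then completes.
ChannelReader<int> Source(IEnumerable<int> items)
{
    var channel = Channel.CreateUnbounded<int>();
    foreach (var item in items) channel.Writer.TryWrite(item); // always succeeds on an unbounded channel
    channel.Writer.Complete();
    return channel.Reader;
}

// Async overload of Work from the answer, renamed.
ChannelReader<TOut> WorkAsync<TIn, TOut>(ChannelReader<TIn> input,
    Func<TIn, CancellationToken, Task<TOut>> func, CancellationToken token = default)
{
    var channel = Channel.CreateUnbounded<TOut>();
    var writer = channel.Writer;
    _ = Task.Run(async () =>
    {
        await foreach (var msg in input.ReadAllAsync(token))
            await writer.WriteAsync(await func(msg, token), token);
    }, token)
    .ContinueWith(t => writer.TryComplete(t.Exception));
    return channel.Reader;
}

// Synchronous overload of Work from the answer.
ChannelReader<TOut> Work<TIn, TOut>(ChannelReader<TIn> input,
    Func<TIn, CancellationToken, TOut> func, CancellationToken token = default)
{
    var channel = Channel.CreateUnbounded<TOut>();
    var writer = channel.Writer;
    _ = Task.Run(async () =>
    {
        await foreach (var msg in input.ReadAllAsync(token))
            await writer.WriteAsync(func(msg, token), token);
    }, token)
    .ContinueWith(t => writer.TryComplete(t.Exception));
    return channel.Reader;
}

ChannelReader<int> step1Out = Source(new[] { 1, 2, 3 });
ChannelReader<int> step2Out = WorkAsync<int, int>(step1Out, async (i, token) =>
{
    await Task.Delay(i * 10, token);
    return i;
});
ChannelReader<string> step3Out = Work<int, string>(step2Out, (i, token) => $"Data is {i}");

// Each stage has a single reader, so the original order is preserved.
var lines = new List<string>();
await foreach (var line in step3Out.ReadAllAsync())
    lines.Add(line);
Console.WriteLine(string.Join(" | ", lines)); // Data is 1 | Data is 2 | Data is 3
```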

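A method that doesn't produce any output is simpler: it can be a plain async method that returns a Task. Such a sink would replace step3 above rather than run alongside it, because two readers on the same step2Out would split its messages between them: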

async Task Consume<TIn>(ChannelReader<TIn> input,
                             Action<TIn,CancellationToken> act,  
                             CancellationToken token=default)
{
    await foreach(var msg in input.ReadAllAsync(token))
    {
        act(msg,token);
    }
}

...

await Consume(step2Out,(i,token)=>Console.WriteLine($"Data is {i}"));

Broadcasting

This simple pattern can be adapted to broadcast the same message to N consumers, by creating N channels and returning their readers:

IList<ChannelReader<T>> Broadcast<T>(ChannelReader<T> input, int count, CancellationToken token=default)
{
    var channels=Enumerable.Range(0,count)
                           .Select(_=> Channel.CreateUnbounded<T>())
                           .ToList();
    var writers=channels.Select(c=>c.Writer).ToList();
   
    _ = Task.Run(async ()=>{
        await foreach(var msg in input.ReadAllAsync(token))
        {            
            //Offer the message to all output channels
            foreach(var w in writers)
            {
                await w.WriteAsync(msg,token);
            }
        }
    },token)
    .ContinueWith(t=>{
            foreach(var w in writers)
            {
                w.TryComplete(t.Exception);
            }
    });

    return channels.Select(c=>c.Reader).ToList();
}
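A quick check of Broadcast, as a sketch: the method body follows the answer, with the completion loop calling TryComplete on each `w`, while the three-way fan-out and the Drain helper are illustrative. Each output reader should receive the full input sequence, in order:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Broadcast as in the answer.
IList<ChannelReader<T>> Broadcast<T>(ChannelReader<T> input, int count, CancellationToken token = default)
{
    var channels = Enumerable.Range(0, count).Select(_ => Channel.CreateUnbounded<T>()).ToList();
    var writers = channels.Select(c => c.Writer).ToList();
    _ = Task.Run(async () =>
    {
        await foreach (var msg in input.ReadAllAsync(token))
            foreach (var w in writers)
                await w.WriteAsync(msg, token); // every output gets its own copy
    }, token)
    .ContinueWith(t =>
    {
        foreach (var w in writers) w.TryComplete(t.Exception);
    });
    return channels.Select(c => c.Reader).ToList();
}

// Hypothetical helper: read one output to the end.
async Task<List<int>> Drain(ChannelReader<int> reader)
{
    var items = new List<int>();
    await foreach (var item in reader.ReadAllAsync()) items.Add(item);
    return items;
}

var source = Channel.CreateUnbounded<int>();
var outputs = Broadcast(source.Reader, 3);
for (int i = 1; i <= 5; i++) await source.Writer.WriteAsync(i);
source.Writer.Complete();

List<int>[] received = await Task.WhenAll(outputs.Select(reader => Drain(reader)));
foreach (var list in received)
    Console.WriteLine(string.Join(",", list)); // 1,2,3,4,5 (printed three times)
```

Because Broadcast awaits each WriteAsync in turn, bounded output channels would let the slowest consumer throttle all the others; with the unbounded channels used here, a slow consumer instead lets its own buffer grow.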

This way, one can broadcast the same message to multiple consumers:

var broadcast=Broadcast<int>(step1Out,2);
var reader1=Consume(broadcast[0],(i,token)=>Console.WriteLine("Reader 0: {0}",i));
var reader2=Consume(broadcast[1],(i,token)=>Console.WriteLine("Reader 1: {0}",i));

Or even

var readers=broadcast.Select((b,idx)=>Consume(b,
                         (i,token)=>Console.WriteLine($"Reader {idx}: {i}")))
                     .ToList();
await Task.WhenAll(readers);