I have a firehose of messages coming through a Redis pub/sub, and I need to distribute them to a number of WebSocket connections. Basically, whenever a message arrives from Redis, it needs to be sent to every WebSocket connection.
I want multiple consumers, and each of them should get all the messages.
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(1)
{
    FullMode = BoundedChannelFullMode.Wait,
    SingleReader = false
});
var cts = new CancellationTokenSource();
var producer = Task.Run(async () =>
{
    int i = 0;
    while (!cts.IsCancellationRequested)
    {
        channel.Writer.TryWrite(i++);
        await Task.Delay(TimeSpan.FromMilliseconds(250));
    }
});
var readerOneTask = Task.Run(async () =>
{
    await foreach (var i in channel.Reader.ReadAllAsync())
    {
        Console.WriteLine($"Reader one: {i}");
    }
});
var readerTwoTask = Task.Run(async () =>
{
    await foreach (var i in channel.Reader.ReadAllAsync())
    {
        Console.WriteLine($"Reader two: {i}");
    }
});
cts.CancelAfter(TimeSpan.FromSeconds(5));
Console.ReadLine();
A single Channel<T> cannot broadcast messages to multiple consumers. Every time a message is read from the channel, the message is consumed, and no other consumer is going to get it. If you want to broadcast all messages to all consumers, you'll have to create one dedicated Channel<T> per consumer.
You might find this question interesting: Factory for IAsyncEnumerable or IAsyncEnumerator. It shows various ways to implement a source/controller for an IAsyncEnumerable<T> sequence, including channels and Rx subjects.
Update: Below is a demo of how you could use multiple channels in order to propagate all the messages to all the consumers.
Try it on Fiddle.
The CreateConsumer is an asynchronous method that is responsible for creating the channel and adding it to the list. It is also responsible for removing the channel from the list when the consumer completes. This is important, because otherwise, if a consumer fails, the producer would continue pushing messages into the dead channel, resulting in a memory leak.
The "body" of the consumer, which can be different for each consumer, is passed as an asynchronous lambda to the CreateConsumer method.
It is important that all consumers are started, and their channels are created, before starting the producer. That's why the CreateConsumer method is not wrapped in a Task.Run. This way the code inside CreateConsumer, up to the first await, runs synchronously on the same thread that invoked CreateConsumer.
Every access to the list of channels is protected with a lock, because multiple threads might attempt to read or modify the list at the same time.
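The design described above can be sketched roughly as follows. This is a minimal reconstruction under assumptions, not necessarily the code from the linked fiddle: the field name _channels, the Snapshot helper, the use of unbounded channels, and the two-second run time are all illustrative choices made here.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class Program
{
    // One channel per live consumer (hypothetical field name).
    // Every access to this list is guarded by a lock on the list itself.
    private static readonly List<Channel<int>> _channels = new List<Channel<int>>();

    public static async Task Main()
    {
        // Start the consumers first, without Task.Run, so that their
        // channels are registered before the producer writes anything.
        Task consumerOne = CreateConsumer(async reader =>
        {
            await foreach (int i in reader.ReadAllAsync())
                Console.WriteLine($"Reader one: {i}");
        });
        Task consumerTwo = CreateConsumer(async reader =>
        {
            await foreach (int i in reader.ReadAllAsync())
                Console.WriteLine($"Reader two: {i}");
        });

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2));
        Task producer = Task.Run(async () =>
        {
            int i = 0;
            while (!cts.IsCancellationRequested)
            {
                // Broadcast: write the same message to every registered channel.
                foreach (Channel<int> channel in Snapshot())
                    channel.Writer.TryWrite(i);
                i++;
                await Task.Delay(TimeSpan.FromMilliseconds(250));
            }
            // Signal the end of the stream, so that each ReadAllAsync loop exits.
            foreach (Channel<int> channel in Snapshot())
                channel.Writer.TryComplete();
        });

        await producer;
        await Task.WhenAll(consumerOne, consumerTwo);
    }

    // Copies the list under the lock, so the producer never enumerates
    // the list while a consumer is adding or removing its channel.
    private static Channel<int>[] Snapshot()
    {
        lock (_channels) return _channels.ToArray();
    }

    // Creates a dedicated channel, registers it, runs the consumer body,
    // and unregisters the channel when the body completes or fails.
    private static async Task CreateConsumer(Func<ChannelReader<int>, Task> body)
    {
        // Assumption: unbounded, so a slow consumer buffers instead of dropping.
        Channel<int> channel = Channel.CreateUnbounded<int>();
        lock (_channels) _channels.Add(channel);
        try
        {
            // Everything up to the first incomplete await inside the body
            // runs synchronously on the caller's thread.
            await body(channel.Reader);
        }
        finally
        {
            lock (_channels) _channels.Remove(channel);
        }
    }
}
```

With this arrangement, both readers should print every number the producer emits. An unbounded channel trades memory for simplicity: if a consumer can fall far behind, a bounded channel with an explicit full-mode policy may be a better fit.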