I am examining the effects of replacing some instances of the regular C# event pattern with IAsyncEnumerable. This would be accomplished by lazily instantiating/activating an IAsyncEnumerable and caching that reference for use by all callers/listeners. Some quick tests (see below) show that this works, but I haven't seen other examples online using IAsyncEnumerable in this fashion.
I realize this isn't exactly what IAsyncEnumerable was created for, and that most would advocate for ReactiveX (https://github.com/dotnet/reactive) in this case. However, I'd appreciate an analysis of why one would or would not want to do this as described (instead of just how to do this with Rx). I've provided a couple of examples below. My candidate for event pattern replacement is one that is more of an event stream (like deserialized messages being produced from a serial connection, a UDP socket, etc.).
Example 1:
    using System;
    using System.Collections.Generic;
    using System.Threading.Tasks;

    class Program
    {
        public static async Task Main( string[] args )
        {
            // Cache an async enumerable (imagine stream of events e.g. deserialized messages arriving)
            var asyncEnumerable = GetNumbersAsync( 10 );
            // Now multiple consumers want to respond to incoming events/messages (in this case just numbers)
            await Task.WhenAll(
                Task.Run( () => ProcessNumbersAsync( 1, asyncEnumerable ) ),
                Task.Run( () => ProcessNumbersAsync( 2, asyncEnumerable ) ) );
            Console.WriteLine( "DONE!" );
        }

        private static async Task ProcessNumbersAsync( int id, IAsyncEnumerable<int> numbers )
        {
            await foreach ( var n in numbers )
                Console.WriteLine( $"{id}: Processing {n}" );
        }

        private static async IAsyncEnumerable<int> GetNumbersAsync( int maxNumber )
        {
            // This would really be async read operations from a remote source
            for ( var i = 0; i < maxNumber; i++ )
            {
                await Task.Delay( 100 );
                yield return i;
            }
        }
    }
This produces the output I would want as a user of this pattern:
1: Processing 0
2: Processing 0
2: Processing 1
1: Processing 1
2: Processing 2
1: Processing 2
2: Processing 3
1: Processing 3
2: Processing 4
1: Processing 4
2: Processing 5
1: Processing 5
1: Processing 6
2: Processing 6
1: Processing 7
2: Processing 7
2: Processing 8
1: Processing 8
2: Processing 9
1: Processing 9
DONE!
The previous example puts each consumer on a different thread, but depending on the context (perhaps a WPF app) there could be multiple consumers on the same thread (not possible with IEnumerable, but IAsyncEnumerable opens the door). The following is a console app, but one can imagine the producers and consumers being created on the UI thread of a WPF app.
Example 2:
    using System;
    using System.Collections.Generic;
    using System.Threading.Tasks;

    class Program
    {
        public static async Task Main( string[] args )
        {
            var producer = new Producer();
            var consumer1 = new Consumer( 1, producer );
            var consumer2 = new Consumer( 2, producer );
            var consumer3 = new Consumer( 3, producer );
            // Consumers 1 and 2 start on the calling thread; consumer 3 starts on the thread pool
            await Task.WhenAll(
                consumer1.ConsumeMessagesAsync(),
                consumer2.ConsumeMessagesAsync(),
                Task.Run( async () => await consumer3.ConsumeMessagesAsync() ) );
            Console.WriteLine( "DONE!" );
        }

        // Singleton producer
        private interface IProducer
        {
            IAsyncEnumerable<int> GetMessagesAsync();
        }

        // Transient consumer
        private interface IConsumer
        {
            Task ConsumeMessagesAsync();
        }

        private class Producer : IProducer
        {
            private const int _maxFakeMessages = 10;
            private readonly object _mutex = new object();
            private IAsyncEnumerable<int> _actualIncomingMessagesEnumerable;

            public IAsyncEnumerable<int> GetMessagesAsync()
            {
                // TODO: use AsyncEx AsyncLock
                lock ( _mutex )
                {
                    // Lazily create the enumerable once and hand the same instance to every caller
                    if ( _actualIncomingMessagesEnumerable == null )
                        _actualIncomingMessagesEnumerable = ReadIncomingMessagesAsync();
                    return _actualIncomingMessagesEnumerable;
                }
            }

            private static async IAsyncEnumerable<int> ReadIncomingMessagesAsync()
            {
                for ( var i = 0; i < _maxFakeMessages; i++ )
                {
                    await Task.Delay( 100 );
                    yield return i;
                }
            }
        }

        private class Consumer : IConsumer
        {
            private readonly int _id;
            private readonly IProducer _producer;

            public Consumer( int id, IProducer producer )
            {
                _id = id;
                _producer = producer;
            }

            public async Task ConsumeMessagesAsync()
            {
                await foreach ( var n in _producer.GetMessagesAsync() )
                    Console.WriteLine( $"{_id}: Processing {n}" );
            }
        }
    }
Again, the output from this is what I would want as a user:
1: Processing 0
2: Processing 0
3: Processing 0
2: Processing 1
3: Processing 1
1: Processing 1
2: Processing 2
1: Processing 2
3: Processing 2
1: Processing 3
2: Processing 3
3: Processing 3
1: Processing 4
2: Processing 4
3: Processing 4
3: Processing 5
1: Processing 5
2: Processing 5
1: Processing 6
3: Processing 6
2: Processing 6
2: Processing 7
1: Processing 7
3: Processing 7
3: Processing 8
2: Processing 8
1: Processing 8
1: Processing 9
3: Processing 9
2: Processing 9
DONE!
One benefit inherent to a pattern like this is that the consumer/caller's callback (the code handling each item of type T) can run within their own SynchronizationContext. Events from a SerialPort, Timer, or other source often occur on a background thread, and the user, especially one on a UI thread, may need to perform their own synchronization. With this pattern, a consumer on the UI thread always has their code run on the UI thread, while a consumer in a console app has it run on the thread pool.
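To make the context point concrete, here is a minimal sketch (the class and method names are hypothetical). By default, `await foreach` captures the current SynchronizationContext, if there is one, and resumes the loop body on it after each item; a consumer that does not need this can opt out with the `ConfigureAwait(false)` extension for IAsyncEnumerable. In a console app there is no SynchronizationContext, so both variants resume on the thread pool there.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ContextDemo
{
    // Hypothetical stand-in for a message source.
    public static async IAsyncEnumerable<int> CountAsync(int count)
    {
        for (var i = 0; i < count; i++)
        {
            await Task.Delay(10);
            yield return i;
        }
    }

    // The loop body resumes on the captured SynchronizationContext, if any
    // (for example the UI thread when called from a WPF event handler).
    public static async Task<int> SumOnCallerContextAsync(IAsyncEnumerable<int> numbers)
    {
        var sum = 0;
        await foreach (var n in numbers)
            sum += n;
        return sum;
    }

    // The loop body does not capture the caller's context.
    public static async Task<int> SumWithoutContextAsync(IAsyncEnumerable<int> numbers)
    {
        var sum = 0;
        await foreach (var n in numbers.ConfigureAwait(false))
            sum += n;
        return sum;
    }
}
```

Both methods compute the same result; they differ only in where the loop body runs when a SynchronizationContext is present.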
Am I missing something?
Let's slightly change the implementation of the "event source" of your first example, the GetNumbersAsync method. After this change, each consumer receives different "events"!
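As a hedged illustration of this effect (a sketch, not necessarily the exact change meant above): assume the source draws each value from shared state, here a hypothetical `_current` counter, instead of from a local loop variable. Two consumers enumerating the same cached IAsyncEnumerable then each run the iterator body separately and pull different values from the counter.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class SharedSourceDemo
{
    // Hypothetical shared state standing in for an external message source.
    private static int _current;

    // Every enumeration runs this iterator body from the top, but all
    // enumerations take their values from the same counter.
    public static async IAsyncEnumerable<int> GetNumbersAsync(int maxNumber)
    {
        for (var i = 0; i < maxNumber; i++)
        {
            await Task.Delay(10);
            yield return Interlocked.Increment(ref _current);
        }
    }

    // Collects everything one consumer sees.
    public static async Task<List<int>> CollectAsync(IAsyncEnumerable<int> numbers)
    {
        var seen = new List<int>();
        await foreach (var n in numbers)
            seen.Add(n);
        return seen;
    }

    // Two consumers enumerate one cached IAsyncEnumerable instance.
    public static async Task<(List<int> First, List<int> Second)> RunAsync()
    {
        var cached = GetNumbersAsync(5);
        var results = await Task.WhenAll(
            Task.Run(() => CollectAsync(cached)),
            Task.Run(() => CollectAsync(cached)));
        return (results[0], results[1]);
    }
}
```

Each consumer receives five values, and the two lists never share a value, which shows that the consumers are not observing the same stream of events.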
Although the IAsyncEnumerable in your example is a single cached instance, every time you try to enumerate it with an await foreach statement a new IAsyncEnumerator is created, with its lifetime bound to this specific enumeration. IAsyncEnumerators are neither thread-safe nor reusable, and if you try to cache one and share it between consumers, with each consumer calling its MoveNextAsync method without synchronization, you'll get undefined behavior.

If you want a source of IAsyncEnumerables that can be safely subscribed/unsubscribed at any time, and that propagates all messages to subscribers that may consume them at different paces, it's nowhere near as trivial as caching an IAsyncEnumerable created by a C# iterator (a method containing yield statements). You can find implementations of an AsyncEnumerableSource here.
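To give a rough idea of what such a source involves, here is a minimal sketch built on System.Threading.Channels. It is not the linked implementation; the class name `SimpleAsyncEnumerableSource` and its members are hypothetical. It assumes unbounded buffering per subscriber is acceptable, and that each sequence returned by `Subscribe` is enumerated exactly once by a single consumer (a subscription that is never enumerated is never removed).

```csharp
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical minimal broadcast source: every subscriber gets every item
// published after it subscribed, through its own buffer.
public sealed class SimpleAsyncEnumerableSource<T>
{
    private readonly object _gate = new object();
    private readonly List<Channel<T>> _subscribers = new List<Channel<T>>();
    private bool _completed;

    // Subscribes immediately; the returned sequence is meant for one consumer.
    public IAsyncEnumerable<T> Subscribe(CancellationToken cancellationToken = default)
    {
        var channel = Channel.CreateUnbounded<T>();
        lock (_gate)
        {
            if (_completed) channel.Writer.TryComplete();
            else _subscribers.Add(channel);
        }
        return ReadAndUnsubscribeAsync(channel, cancellationToken);
    }

    // Copies the item into every subscriber's buffer.
    public void Publish(T item)
    {
        lock (_gate)
        {
            foreach (var channel in _subscribers)
                channel.Writer.TryWrite(item);
        }
    }

    // Ends all current subscriptions; buffered items can still be drained.
    public void Complete()
    {
        lock (_gate)
        {
            _completed = true;
            foreach (var channel in _subscribers)
                channel.Writer.TryComplete();
            _subscribers.Clear();
        }
    }

    private async IAsyncEnumerable<T> ReadAndUnsubscribeAsync(
        Channel<T> channel,
        [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        try
        {
            await foreach (var item in channel.Reader.ReadAllAsync(cancellationToken))
                yield return item;
        }
        finally
        {
            // Runs when the consumer finishes, breaks out of its loop, or cancels.
            lock (_gate) { _subscribers.Remove(channel); }
        }
    }
}
```

A producer would call `Publish` from wherever messages arrive (for example a socket read loop), and each consumer would run `await foreach` over its own `Subscribe()` result. The per-subscriber channel is what lets consumers proceed at different paces, at the cost of unbounded memory if one of them falls far behind.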