So I created an observable wrapper for Stephen Cleary's AsyncProducerConsumerQueue<T>
with the following code.
I'm wondering if anyone here knows how I could have done this in a much simpler way?
- Could it have been written without a wrapper class?
- Would it be possible to prevent errors from multiple wrappers being applied to one queue?
- Could I make it connect on the first subscription instead of via a direct call to
Connect
? If so, what are the implications of that? - Finally, how would you have done it?
using Nito.AsyncEx;
using System.Reactive;
static async Task ExampleUsage() {
var queue = new AsyncProducerConsumerQueue<int>();
var observable = queue.AsConnectableObservable();
await queue.EnqueueAsync(1);
observable.Subscribe(Console.WriteLine);
observable.Connect();
await queue.EnqueueAsync(2);
}
public static class AsyncExExtensions {
public static IConnectableObservable<T> AsConnectableObservable<T>(this AsyncProducerConsumerQueue<T> queue) {
return new ConnectableObservableForAsyncProducerConsumerQueue<T>(queue);
}
}
class ConnectableObservableForAsyncProducerConsumerQueue<T> : IConnectableObservable<T> {
readonly AsyncProducerConsumerQueue<T> Queue;
long _isConnected = 0;
ImmutableList<IObserver<T>> Observers = ImmutableList<IObserver<T>>.Empty;
public ConnectableObservableForAsyncProducerConsumerQueue(AsyncProducerConsumerQueue<T> queue) {
Queue = queue;
}
public IDisposable Connect() {
if (Interlocked.Exchange(ref _isConnected, 1) == 1) throw new InvalidOperationException("Observable cannot be connected more than once.");
var cts = new CancellationTokenSource();
var token = cts.Token;
Task.Run(async () => {
try {
while (true) {
token.ThrowIfCancellationRequested();
var @event = await Queue.DequeueAsync(token).ConfigureAwait(false);
foreach (var observer in Observers)
observer.OnNext(@event);
}
} catch (Exception x) when (x is OperationCanceledException || x is InvalidOperationException) {
foreach (var observer in Observers)
observer.OnCompleted();
}
});
return Disposable.Create(() => {
cts.Cancel();
cts.Dispose();
});
}
readonly object subscriberListMutex = new object();
public IDisposable Subscribe(IObserver<T> observer) {
lock (subscriberListMutex) {
Observers = Observers.Add(observer);
}
return Disposable.Create(() => {
lock (subscriberListMutex) {
Observers = Observers.Remove(observer);
}
});
}
}
DISCLAIMER: I am not an expert, so there may be aspects of this answer that I've overlooked - use with caution!
Consider the following two demos. These behave differently for the case where you have more than one observer. In the first demo, observers will compete for items on the queue, and in the second they will each receive a copy.
Demo #1 - Cold observable
Demo #2 - Hot observable
To answer your original questions:
Yes, see demos above.
The approaches demo-ed above do not prevent other parties from dequeueing items (or performing any other operation on the queue). If you want ensure you only expose a single
IObservable<T>
for a given queue, consider encapsulating the queue itself, by creating anObservableProducerConsumerQueue<T>
that internally creates and manages its ownAsyncProducerConsumerQueue
. You can expose anEnqueueAsync
method that just delegates to the internal queue and use one of the demo-ed observables above to either expose the observable as a property or implement theIObservable<T>
interface.Demo #2 shows this behaviour and describes the implications. If you want to be able to subscribe observers before connecting, skip the
RefCount
call and use theIConnectableObservable
returned byPublish
as before.As described above, I would have encapsulated the queue and exposed an
IObservable
orIConnectableObservable
using one of the approaches demo-ed above.