How to pull from IObservable

121 Views Asked by At

Suppose there is a Subject<T> at endpoint A and an IObservable<T> on endpoint B.

Endpoint A sends exactly one object of T using OnNext() and never calls OnComplete(). I don't have a control over that, it's in an external assembly. When I subscribe on endpoint B in my one, I don't get my lambda called:

endpoint.Subscribe(t => { doSomething(); });

However, when I subscribe like this:

using var cts = new CancellationTokenSource();
await endpoint.ForEachAsync(t => { doSomething(); /* cts.Cancel(); */ }, cts.Token);

I do get my lambda called. But there's a drawback. Since the publisher never calls OnCompleted(), I can't escape that async task unless I call cts.Cancel() after doSomething() method. It kinda works, but I don't like the idea of throwing exceptions as a part of a flow. Is there a better way?

2

There are 2 best solutions below

1
Serge Misnik On

Ok, one of complicated solutions could be:

using var cts = new CancellationTokenSource();
cts.CancelAfter(timeout);  // Some timeout
var timeoutTask = Task.Delay(timeout, cts.Token);
var stopperTask = new TaskCompletionSource();
var subscription = endpoint
                   .ObserveOn(CurrentThreadScheduler.Instance)
                   .Subscribe(t => {
                      // doSomething();
                      stopperTask.SetResult();
                   });
await Task.WhenAny(stopperTask.Task, timeoutTask);
subscription.Dispose();
if (!stopperTask.Task.IsCompleted) stopperTask.SetResult();
if (!timeoutTask.IsCompleted) cts.Cancel();

And thanks to @Enigmativity for a hint, it can be that simple:

// For whatever T
await endpoint.Amb(Observable.Timer(timeout).Select(_ => default(T))).Take(1);
3
Enigmativity On

To start with, your description is a bit confusing. If your endpoint sends a value without completing then you subscription should still fire fine. It just won't end without the completion or disposing of the subscription.

That can be easily fixed. Just add a .Take(1).

endpoint.Take(1).Subscribe(t => { doSomething(); });

It will complete automatically after the first value.

If, instead there is a delay in the value arriving and you want to put a timeout then you have two options.

(1) You can add the .Timeout(timeout) operator (if timeout is a TimeSpan).

endpoint
    .Timeout(timeout)
    .Subscribe(
        t => { doSomething(); },
        ex => { /* timeout here */ });

(2) You can use the .Amb operator - which produces a value based on which of the two input observables fires first - to produce a default value after the length of time if a real value doesn't come.

endpoint
    .Take(1)
    .Amb(Observable.Timer(timeout).Select(_ => -1))
    .Subscribe(t => { doSomething(); });

Here you get t == -1 if it times out - assuming that endpoint is producing an integer.

All of these options cancel when the subscription is disposed - so no need to muck around the cancellation tokens.