TPL Dataflow with Rx: Observer misses messages

58 Views Asked by At

In the sample coding below, the source and IObservable and also part of a pipeline. The sink at the end of the pipeline gets all the messages. But the observer gets only the first and last messages.

    public class Test
{
    public static async Task Run()
    {
        var source = new BroadcastBlock<int>(x => x);
        var transform = new TransformBlock<int, int>(x => x * 2);
        var sink = new ActionBlock<int>(x => Console.WriteLine($"from ActionBlock: {x}"));

        source.LinkTo(transform);
        transform.LinkTo(sink);

        var observable = source.AsObservable();

        IDisposable subscription = observable.Subscribe(x => Console.WriteLine($"from observer: {x}"));

        await source.SendAsync(1);
        await source.SendAsync(2);
        await source.SendAsync(3);
        await source.SendAsync(4);
        await source.SendAsync(5);
        await source.SendAsync(6);

        subscription.Dispose();
    }
}
from observer: 1
from ActionBlock: 2
from ActionBlock: 4
from ActionBlock: 6
from ActionBlock: 8
from ActionBlock: 10
from ActionBlock: 12
from observer: 6
1

There are 1 best solutions below

3
Theodor Zoulias On

From a relevant GitHub issue regarding similar problems with the WriteOnceBlock<T>:

at present WriteOnceBlock composability with AsObservable isn't perfect.

(Stephen Toub commented on Jun 30, 2022)

If I a was you I wouldn't mess with TPL Dataflow and Rx interop. It seems that these two technologies have not been rigorously tested together, and there are bugs lurking in there. Bugs that currently no one cares about, because no one actually uses these two technologies together to do anything serious.