In the sample coding below, the source and IObservable and also part of a pipeline. The sink at the end of the pipeline gets all the messages. But the observer gets only the first and last messages.
public class Test
{
public static async Task Run()
{
var source = new BroadcastBlock<int>(x => x);
var transform = new TransformBlock<int, int>(x => x * 2);
var sink = new ActionBlock<int>(x => Console.WriteLine($"from ActionBlock: {x}"));
source.LinkTo(transform);
transform.LinkTo(sink);
var observable = source.AsObservable();
IDisposable subscription = observable.Subscribe(x => Console.WriteLine($"from observer: {x}"));
await source.SendAsync(1);
await source.SendAsync(2);
await source.SendAsync(3);
await source.SendAsync(4);
await source.SendAsync(5);
await source.SendAsync(6);
subscription.Dispose();
}
}
from observer: 1
from ActionBlock: 2
from ActionBlock: 4
from ActionBlock: 6
from ActionBlock: 8
from ActionBlock: 10
from ActionBlock: 12
from observer: 6
From a relevant GitHub issue regarding similar problems with the
WriteOnceBlock<T>:(Stephen Toub commented on Jun 30, 2022)
If I a was you I wouldn't mess with TPL Dataflow and Rx interop. It seems that these two technologies have not been rigorously tested together, and there are bugs lurking in there. Bugs that currently no one cares about, because no one actually uses these two technologies together to do anything serious.