I thought this was a very basic approach, but I haven't found an example yet. I have one producer and one consumer, and I want to finish the pipeline once at least x objects have been processed. Additionally, I need to know which objects were received.
This is how I do it:
    public class BlockTester
    {
        private static TransformBlock<int, int> _worker;

        public static async Task StartAsync()
        {
            _worker = new TransformBlock<int, int>(s => s + s);
            var buffer = new BufferBlock<int>();
            var consumeTask = Consume(buffer);
            _worker.LinkTo(buffer, new DataflowLinkOptions { PropagateCompletion = true });
            foreach (var value in Enumerable.Range(0, 100))
            {
                _worker.Post(value);
            }
            _worker.Complete();
            await buffer.Completion;
            if (buffer.TryReceiveAll(out var received))
            {
                Console.WriteLine(string.Join(", ", received));
            }
        }

        public static async Task<IReadOnlyCollection<int>> Consume(ISourceBlock<int> buffer)
        {
            var received = new List<int>();
            while (await buffer.OutputAvailableAsync())
            {
                var current = buffer.Receive();
                received.Add(current);
                if (current > 25)
                {
                    _worker.Complete();
                }
            }
            return received;
        }
    }
I am a bit confused about buffer.TryReceiveAll. What's the difference between awaiting the consume task and calling TryReceiveAll? Why does TryReceiveAll return false in my scenario? I guess there's still something wrong with my approach.
Your Consume method should be an ActionBlock. There's no need to use OutputAvailableAsync or TryReceiveAll. Replace the BufferBlock with an ActionBlock and do your processing within the ActionBlock. It's not clear why you would need the TransformBlock either, unless you have more than one step in the process.
Or with a complex message object:
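As a rough illustration of the ActionBlock approach with a message object, here is a minimal sketch. The `WorkItem` class, the `ActionBlockSketch` name, and the choice of `BoundedCapacity = 1` with `SendAsync` are assumptions made for this example rather than part of the original answer. The bound is there so that calling `Complete()` inside the block should stop the pipeline promptly instead of letting already queued items run.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical message type, introduced only for this sketch.
public sealed class WorkItem
{
    public WorkItem(int input) { Input = input; }
    public int Input { get; }
    public int Result { get; set; }
}

public static class ActionBlockSketch
{
    public static async Task<IReadOnlyList<WorkItem>> RunAsync()
    {
        var processed = new List<WorkItem>();
        ActionBlock<WorkItem> consumer = null;

        consumer = new ActionBlock<WorkItem>(item =>
        {
            // The processing step lives inside the ActionBlock itself.
            item.Result = item.Input + item.Input;
            processed.Add(item);

            // Stop accepting further items once the condition is met.
            if (item.Result > 25)
            {
                consumer.Complete();
            }
        },
        // Assumption: a capacity of 1 keeps items from queuing up inside the
        // block, so nothing is left to process after Complete() is called.
        new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

        foreach (var value in Enumerable.Range(0, 100))
        {
            // SendAsync waits for room in the block and returns false
            // once the block declines the item.
            if (!await consumer.SendAsync(new WorkItem(value)))
            {
                break;
            }
        }

        consumer.Complete();
        await consumer.Completion;
        return processed;
    }

    public static void Main()
    {
        var processed = RunAsync().GetAwaiter().GetResult();
        Console.WriteLine(string.Join(", ", processed.Select(p => p.Result)));
    }
}
```

The list of processed items is read only after `consumer.Completion` has finished, so no extra locking is used here.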
Edit: To answer the original question:
Because by the time TryReceiveAll is run, the BufferBlock has "completed". For a block to be completed, it must contain 0 items in its output buffer. The Consume method was pulling all the items out before the block was allowed to complete, so you'd finally call TryReceiveAll on an empty block.
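The completion behaviour described above can be checked in isolation with a small sketch. The `CompletionSketch` class and its variable names are made up for this example. It posts two items, calls `Complete()`, and compares the state of the block before and after the output buffer is drained.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class CompletionSketch
{
    public static async Task<(bool CompletedWhileFull, int Drained, bool ReceivedAfterCompletion)> RunAsync()
    {
        var buffer = new BufferBlock<int>();
        buffer.Post(1);
        buffer.Post(2);
        buffer.Complete();

        // Items are still waiting in the output buffer, so the
        // Completion task cannot have finished yet.
        bool completedWhileFull = buffer.Completion.IsCompleted;

        // Draining the buffer is what allows the block to complete.
        buffer.TryReceiveAll(out IList<int> items);
        int drained = items?.Count ?? 0;

        await buffer.Completion;

        // The block is now completed and empty, so there is nothing to receive.
        bool receivedAfterCompletion = buffer.TryReceiveAll(out IList<int> leftover);

        return (completedWhileFull, drained, receivedAfterCompletion);
    }

    public static void Main()
    {
        var result = RunAsync().GetAwaiter().GetResult();
        Console.WriteLine(result);
    }
}
```

In the question's code the Consume loop plays the role of the draining call, which is why `await buffer.Completion` only returns once nothing is left for TryReceiveAll.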