I thought it's very basic approach but I haven't found any example yet. I have one producer and one consumer and I want to finish the pipeline when at least x objects were processed. Additionally I need to know what objects have been received.
That's how I do it:
public class BlockTester
{
private static TransformBlock<int, int> _worker;
public static async Task StartAsync()
{
_worker = new TransformBlock<int, int>(s => s + s);
var buffer = new BufferBlock<int>();
var consumeTask = Consume(buffer);
_worker.LinkTo(buffer, new DataflowLinkOptions{PropagateCompletion = true});
foreach (var value in Enumerable.Range(0,100))
{
_worker.Post(value);
}
_worker.Complete();
await buffer.Completion;
if(buffer.TryReceiveAll(out var received))
{
Console.WriteLine(string.Join(", ", received));
}
}
public static async Task<IReadOnlyCollection<int>> Consume(ISourceBlock<int> buffer)
{
var received = new List<int>();
while (await buffer.OutputAvailableAsync())
{
var current = buffer.Receive();
received.Add(current);
if (current > 25)
{
_worker.Complete();
}
}
return received;
}
}
I am a bit confused about the buffer.TryReceiveAll. What's the difference between awaiting the consume task and the TryReceiveAll? Why does TryReceiveAll is false in my scenario? I guess there's still something wrong with my approach to reach my goals.
Your
Consumemethod should be anActionBlock. There's no need to useOutputAvailableAsyncorTryRecieveAll. Replace theBufferBlockwith anActionBlockand do your processing within theActionBlock. It's not clear why you would need theTransformBlockeither unless you have more than one step in the process.Or with a complex message object:
Edit To answer the original question:
Because by the time
TryReceiveAllis ran theBufferBlockhas "completed". For a block to be completed it must contain 0 items in its output buffer. TheConsumemethod was pulling all the items out before the block was allowed to complete and you'd finally callTryRecieveAllon an empty block.