I'm trying to wrap my head around "completion" in TPL Dataflow blocks. In particular, the TransformBlock doesn't seem to ever complete. Why?
Sample program
My code calculates the square of all integers from 1 to 1000. I used a BufferBlock and a TransformBlock for that. Later in my code, I await completion of the TransformBlock. The block never actually completes though, and I don't understand why.
static void Main(string[] args)
{
var bufferBlock = new BufferBlock<int>();
var calculatorBlock = new TransformBlock<int, int>(i =>
{
Console.WriteLine("Calculating {0}²", i);
return (int)Math.Pow(i, 2);
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });
using (bufferBlock.LinkTo(calculatorBlock, new DataflowLinkOptions { PropagateCompletion = true }))
{
foreach (var number in Enumerable.Range(1, 1000))
{
bufferBlock.Post(number);
}
bufferBlock.Complete();
// This line never completes
calculatorBlock.Completion.Wait();
// Unreachable code
IList<int> results;
if (calculatorBlock.TryReceiveAll(out results))
{
foreach (var result in results)
{
Console.WriteLine("x² = {0}", result);
}
}
}
}
At first I thought I created a deadlock situation, but that doesn't seem to be true. When I inspected the calculatorBlock.Completion task in the debugger, its Status property was set to WaitingForActivation. That was the moment when my brain blue screened.
The reason your pipeline hangs is that both
BufferBlockandTransformBlockevidently don't complete until they emptied themselves of items (I guess that the desired behavior ofIPropagatorBlocks although I haven't found documentation on it).This can be verified with a more minimal example:
This blocks indefinitely unless you add
bufferBlock.Receive();before completing.If you remove the items from your pipeline before blocking by either your
TryReceiveAllcode block, connecting anotherActionBlockto the pipeline, converting yourTransformBlockto anActionBlockor any other way this will no longer block.About your specific solution, it seems that you don't need a
BufferBlockorTransformBlockat all since blocks have an input queue for themselves and you don't use the return value of theTransformBlock. This could be achieved with just anActionBlock: