TPL Dataflow SendAsync task never completes when block is linked


I was hoping for a clean way to throttle a specific type of producer while the consumer is busy processing, without writing a custom block of my own. I had hoped the code below would do exactly that, but once the capacity limit is hit, SendAsync is postponed and its task never completes, suggesting that the postponed message is never consumed.

_block = new TransformBlock<int, string>(async i =>
{
    // Send the next request to own input queue
    // before processing this request, or block
    // while pipeline is full. 
    // Do not start processing if pipeline is full!
    await _block.SendAsync(i + 1);
    // Process this request and pass it on to the
    // next block in the pipeline.
    return i.ToString();
}, 
// A TransformBlock has input and output buffers. Limit both, 
// otherwise requests that cannot be passed on to the next 
// block in the pipeline will be cached in this block's output 
// buffer, never throttling this block.
new ExecutionDataflowBlockOptions { BoundedCapacity = 5 });

// This block is linked to the output of the 
// transform block. 
var action = new ActionBlock<string>(async i =>
{
    // Do some very long processing on the transformed element.
    await Task.Delay(1000);
}, 
// Limit buffer size, and consequently throttle previous blocks 
// in the pipeline.
new ExecutionDataflowBlockOptions { BoundedCapacity = 5 });
_block.LinkTo(action);

// Start running.
_block.Post(0);

I was wondering if there is any reason why the linked ActionBlock does not consume the postponed message.


I faced the same problem as you. I didn't dig deep into the implementation of LinkTo, but I think the source only offers a message to its linked targets when it receives a new one. That is, the source block may have messages sitting in its input queue, but it won't offer them to the target until the next Post/SendAsync arrives, and that's exactly your case.

Here is my solution, and it works for me.

First, declare the "engine":

/// <summary>
/// Engine-class (like a car engine) that produces a large (or infinite) number of items.
/// </summary>
public class Engine
{
    private BufferBlock<int> _bufferBlock;

    /// <summary>
    /// Creates a source block that produces stub data.
    /// </summary>
    /// <param name="count">Count of items. If count = 0, the loop is infinite.</param>
    /// <param name="boundedCapacity">Bounded capacity (throttling).</param>
    /// <param name="cancellationToken">Cancellation token (used to stop the infinite loop).</param>
    /// <returns>Source block that constantly produces 0-values.</returns>
    public ISourceBlock<int> CreateEngine(int count, int boundedCapacity, CancellationToken cancellationToken)
    {
        _bufferBlock = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = boundedCapacity });

        Task.Run(async () =>
        {
            var counter = 0;
            while (count == 0 || counter < count)
            {
                // Check for cancellation before sending, so we don't
                // produce one extra item after cancellation is requested.
                if (cancellationToken.IsCancellationRequested)
                    return;
                await _bufferBlock.SendAsync(0, cancellationToken);
                counter++;
            }
        }, cancellationToken).ContinueWith((task) =>
        {
            _bufferBlock.Complete();
        });

        return _bufferBlock;
    }
}
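As a minimal usage sketch (my own example, not part of the original answer), the engine can be linked straight to a consumer; with PropagateCompletion, the consumer completes once the requested count has been produced and drained:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

// Minimal sketch: produce 3 stub items and print them.
var cts = new CancellationTokenSource();
var engine = new Engine().CreateEngine(count: 3, boundedCapacity: 2, cts.Token);

var printer = new ActionBlock<int>(i => Console.WriteLine(i),
    new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });

engine.LinkTo(printer, new DataflowLinkOptions { PropagateCompletion = true });
await printer.Completion; // completes after all 3 items are consumed
```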

And then a Producer that uses the engine:

/// <summary>
/// Producer that generates random byte blobs of a specified size.
/// </summary>
public class Producer
{
    private static Random random = new Random();

    /// <summary>
    /// Returns a source block that produces byte arrays. 
    /// </summary>
    /// <param name="blobSize">Size of byte arrays.</param>
    /// <param name="count">Total count of blobs (if 0 then infinite).</param>
    /// <param name="boundedCapacity">Bounded capacity (throttling).</param>
    /// <param name="cancellationToken">Cancellation token (used to stop infinite loop).</param>
    /// <returns>Source block.</returns>
    public static ISourceBlock<byte[]> BlobsSourceBlock(int blobSize, int count, int boundedCapacity, CancellationToken cancellationToken)
    {
        // Creating engine with specified bounded capacity.
        var engine = new Engine().CreateEngine(count, boundedCapacity, cancellationToken);

        // Creating a transform block that uses our engine as a source.
        var block = new TransformBlock<int, byte[]>(
            // Useful work.
            i => CreateBlob(blobSize),
            new ExecutionDataflowBlockOptions
            {
                // Here you can specify your own throttling. 
                BoundedCapacity = boundedCapacity,
                MaxDegreeOfParallelism = Environment.ProcessorCount,
            });
        // Linking engine (and engine is already working at that time).
        engine.LinkTo(block, new DataflowLinkOptions { PropagateCompletion = true });
        return block;
    }

    /// <summary>
    /// Simple random byte[] generator.
    /// </summary>
    /// <param name="size">Array size.</param>
    /// <returns>byte[]</returns>
    private static byte[] CreateBlob(int size)
    {
        var buffer = new byte[size];
        random.NextBytes(buffer);
        return buffer;
    }
}

Now you can use the producer with a consumer (e.g. an ActionBlock):

// 1 MB blobs, count = 0 (infinite), bounded capacity 10.
var blobsProducer = Producer.BlobsSourceBlock(1024 * 1024, 0, 10, cancellationTokenSource.Token);

var md5Hash = MD5.Create();

var actionBlock = new ActionBlock<byte[]>(b =>
{
    Console.WriteLine(GetMd5Hash(md5Hash, b));
},
new ExecutionDataflowBlockOptions() { BoundedCapacity = 10 });

blobsProducer.LinkTo(actionBlock);
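The GetMd5Hash helper isn't shown in the answer; a plausible implementation (my assumption, following the usual hex-string formatting pattern) would be:

```csharp
using System.Security.Cryptography;
using System.Text;

// Hypothetical helper (not part of the original answer):
// hashes the input bytes and formats the digest as a lowercase hex string.
static string GetMd5Hash(MD5 md5Hash, byte[] input)
{
    byte[] data = md5Hash.ComputeHash(input);
    var sb = new StringBuilder();
    foreach (byte b in data)
        sb.Append(b.ToString("x2"));
    return sb.ToString();
}
```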

Hope it will help you!