Intel TBB computational graph: how to specify input queue capacity of the node


I'm looking for a C++ analog of the .NET TPL Dataflow library.

In TPL Dataflow you can configure a block's degree of parallelism and its capacity. If a block's input queue reaches its capacity, the producer feeding that block is suspended until space frees up:

var buffer = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 10 });

var producer = Task.Run(async () => {
    for (int i = 0; i < 1000; i++) {
        // SendAsync waits while the buffer is at its BoundedCapacity.
        await buffer.SendAsync(i);
    }
    buffer.Complete();
});

var fstAction = new TransformBlock<int, int>(i => i * i,
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4, BoundedCapacity = 10 });

var sndAction = new ActionBlock<int>(async i => {
    await Task.Delay(5000);
    Console.WriteLine(i);
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4, BoundedCapacity = 10 });

buffer.LinkTo(fstAction, new DataflowLinkOptions { PropagateCompletion = true });
fstAction.LinkTo(sndAction, new DataflowLinkOptions { PropagateCompletion = true });

sndAction.Completion.Wait();

And I need similar functionality in C++. TBB seems a good choice, but I cannot find how to specify the capacity on function_node/buffer_node. Here is an example:

#include <tbb/flow_graph.h>

#include <chrono>
#include <string>
#include <thread>
#include <vector>

namespace flow = tbb::flow;
using namespace std::literals;

std::size_t exportConcurrency = 16;
std::size_t uploadConcurrency = 16;

flow::graph graph;

std::size_t count = 1000;
std::size_t idx = 0;

// Produces a stream of dummy id batches.
flow::source_node<std::vector<std::string>> producerNode(graph, [&count, &idx](auto& out) {
    out = { "0"s };
    return ++idx != count;
});

flow::function_node<std::vector<std::string>, std::string> exportNode(graph, exportConcurrency, [](auto& ids) {
    return "0"s;
});

flow::function_node<std::string, std::string> uploadNode(graph, uploadConcurrency, [](auto& chunk) {
    std::this_thread::sleep_for(5s);
    return "0"s;
});

flow::make_edge(producerNode, exportNode);
flow::make_edge(exportNode, uploadNode);

graph.wait_for_all();

1 Answer

The official docs list three recommended ways to limit resource consumption, one of which is using a limiter_node:

One way to limit resource consumption is to use a limiter_node to set a limit on the number of messages that can flow through a given point in your graph.
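
For instance, here is a rough sketch (not tested) of how a limiter_node could cap the number of chunks in flight in the upload stage of the graph from the question, reusing its graph, exportNode and uploadConcurrency declarations. The threshold of 10 and the continue_msg-emitting upload node are my own choices, and the decrement port is called decrementer() in recent TBB releases (older releases expose a decrement member instead):

// Allow at most 10 chunks past this point at a time.
flow::limiter_node<std::string> uploadLimiter(graph, 10);

// Rework the upload node to emit a continue_msg when it finishes,
// so each completed upload releases one slot in the limiter.
flow::function_node<std::string, flow::continue_msg> uploadNode(graph, uploadConcurrency, [](const std::string& chunk) {
    std::this_thread::sleep_for(5s);
    return flow::continue_msg{};
});

flow::make_edge(exportNode, uploadLimiter);
flow::make_edge(uploadLimiter, uploadNode);
flow::make_edge(uploadNode, uploadLimiter.decrementer());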

The limiter_node is not exactly what you asked for, but it is still worth investigating. I was also able to find the concurrent queue classes section; tbb::concurrent_bounded_queue can be given a bounded capacity via its set_capacity method, so maybe you can manage it that way. Hope this helps.
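
As a minimal illustration of that second option (my own sketch, and it lives outside the flow graph, so you would have to drive the producer and consumer threads yourself), a concurrent_bounded_queue blocks the pushing thread once the capacity set via set_capacity is reached:

#include <tbb/concurrent_queue.h>

#include <string>

int main() {
    tbb::concurrent_bounded_queue<std::string> chunks;
    chunks.set_capacity(10);  // push() blocks once 10 items are queued

    // Producer side: blocks while the queue is full.
    chunks.push("chunk");

    // Consumer side: pop() blocks until an item is available.
    std::string item;
    chunks.pop(item);
}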