TBB Flow Graph: how wait for a specific token from the output?

992 Views Asked by At

I'm trying to write an adapter to Flow Graph that imitates a pipeline-like synchronous function call. But I don't understand how to block and wait for the output for a specific token. Calling wait_for_all on the graph doesn't help as I don't need to wait for all values. Can anybody suggest a solution?

template <typename TOutput, typename TInput>
class FlowPathAdapter {
public:
    TOutput operator()(const TInput& val) {
        m_input->try_put(val);
        TOutput result;
        // What should be done here to ensure that
        // m_output returns the result corresponding to this specific token?
        m_output->try_get(result);
        return result;
    }

private:
    // input and output are connected in some graph constructed outside the adapter
    std::shared_ptr<tbb::flow::receiver<TInput>> m_input;
    std::shared_ptr<tbb::flow::sender<TOutput>> m_output;
};
2

There are 2 best solutions below

0
On

Waiting is generally avoided in TBB; waiting is non-productive.

The timing of task execution in TBB is also not guaranteed because we depend on the OS for scheduling. That is, doing out-of-band signaling such as an atomic operation will not occur after the message from a node is forwarded, so the result will not be ready. You would also have to use a queue_node or other buffering node to hold the result.

Given that, if you wish to do an explicit wait you can make a function_node that receives the final answer, assigns it to a result location, and does a signal to tell the result is ready. (the signal should be an atomic to enforce fencing.) Your other task can spin-wait on the atomic. Or you can use a condition variable.

Regards, Chris

1
On

The receiver and sender class are "pure virtual" classes (there are default behaviors for the try_* methods to simplify the definition of nodes.) For each of the flow::graph nodes they are overridden with the behavior particular to that class.

If you wish to create a particular kind of node (or an adapter), you should override the virtual methods. However, you can use a function_node to do what you are trying to do with this adapter. You can remove the try_get and try_put calls (which are part of the function_node's behavior.)

// notice TInput and TOutput are exchanged
template < typename TInput, typename TOutput >
class FlowPathAdapterBody {
public:
    TOutput operator()(const TInput& val) {
        // val has the value passed to us
        TOutput result;
        // computation turning val into result
        return result;
    }
};

tbb::flow::graph g;
// node is fully-parallel in this case
tbb::flow::function_node<int, int> fnode(g, tbb::flow::unlimited, FlowPathAdapterBody<int,int>());

// ...

You can attach multiple predecessors to the function_node's input, and multiple successors to its output.

By default the function_node has a buffer on its input, so even if the node is serial it will always accept inputs. If you wish it to reject inputs rather than buffer them, you can make the node a function_node<T,U,tbb::flow::rejecting>.

If you want more-advanced behavior (such as the ability to emit multiple messages per input, or optionally not emitting messages for certain inputs, or signaling special conditions, check out multifunction_node. A multifunction_node can have only one output if that is all that's needed.)

Regards, Chris