join_node graph flow construction


I am experimenting with Intel's Flow Graph from TBB. I am very satisfied with the results and I find the product amazing, with unlimited possibilities. However, I ran into a problem that I fixed in a way I am not happy with. The problem is the following.

   message
A ----------\       tuple<message,message>             WHATEVER
   message   join ------------------------- C------------------------
B ----------/

This pattern is applied when we want to synchronize and avoid propagating a message (and its value) n times. Intel provides an example which explains the problem well (and the solution - Intel example). My problem is the tuple that gets constructed, and the fact that the graph construction uses a static approach. It is fully static, especially when the number of input edges to the join node (input_port<i> in the Intel example) is variable.

Does a guru of TBB flow graph know a "dynamic approach" to this problem?

Best,

Tim

[EDIT: my real problem, in code]

I can do:

std::vector<tbb::flow::function_node<std::size_t, message>> vec_node;

for (int i(0); i < 3; ++i)
    vec_node.emplace_back(my_amazing_function_to_create_node(g_));

tbb::flow::make_edge(vec_node[0], tbb::flow::input_port<0>(node_join_));
tbb::flow::make_edge(vec_node[1], tbb::flow::input_port<1>(node_join_));
tbb::flow::make_edge(vec_node[2], tbb::flow::input_port<2>(node_join_));

I can not do:

for (std::size_t i(0); i < vec_node.size(); ++i)
    tbb::flow::make_edge(vec_node[i], tbb::flow::input_port<i>(node_join_)); // does not compile

This is because the join_node's output tuple is statically typed, and the template argument of "tbb::flow::input_port" must be a compile-time constant, which the loop variable i is not.

There are 3 solutions below.

BEST ANSWER

The number of ports on a join_node is static (determined at compile time). If you want a variable number of inputs for one output, you need to be able to indicate which "port" the message came in on, as well as its value.

TBB has a variant type that encapsulates a port number and a value; it is the output of the indexer_node. You can use that type as the input type of a multifunction_node: define an indexer_node but don't instantiate it, and use the node's ::output_type. (A multifunction_node can potentially have more than one output, but it can also have just one.) Let the function body of the multifunction_node store values as they are input and decide when it has seen the "correct" number of inputs; at that point it can construct an output value and forward it to its successors.
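
A minimal sketch of that "define but don't instantiate" trick; the int/float port types and the vector<float> output are illustrative assumptions, not taken from the question:

#include <tbb/flow_graph.h>
#include <tuple>
#include <vector>

// Never instantiated: only used to name the variant type it would output.
using indexer_t = tbb::flow::indexer_node<int, float>;
using tagged_t  = indexer_t::output_type; // encapsulates port number + value

// A multifunction_node taking the variant as input, with a single output port.
using collector_t =
    tbb::flow::multifunction_node<tagged_t, std::tuple<std::vector<float>>>;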

The graph will look like: [figure: input nodes feeding the variant input of a multifunction_node, which feeds the successors]

One problem I see is you must define the output type of the multifunction_node. That is also a static declaration, though a variant tuple may be what you need there.

EDIT:

Let's make some simplifying assumptions:

  • Though N is not known at compile time, it is known and invariant at runtime. Relaxing this constraint will require some extra data to be passed with each message.
  • Though you were using a tuple, I believe that was because the output of a join_node is a tuple. (I tried to add vectors as a special case, but I don't think that is an option.) I assume all the function_nodes you have in the vector have the same output type; that way we can avoid using a variant type.
  • The data we are passing is not particularly big (copy-construction is not super-expensive). Relaxing this constraint will require a lot more care in accessing data in each node.
  • There is something that uniquely defines the messages that go together. For example, if you hand a read-only buffer of data to each function_node in the vector, the address of that buffer is what lets us know which messages to put together.

It has been a few years since I worked on TBB, so there may be some things I'm unaware of, but I can give you a sketch.

The graph will look like: [figure: graph using a vector of function_nodes]

(I am actually sketching out the structure of the tag-matching join, because it sounds like that is what you want.)

When you construct the vector of function_nodes, each function_body must know its own index. In general this means the vector holds pointers to function_nodes, and each node is constructed with its index as one of its parameters.

I am assuming the source_node's output is something like a buffer. That buffer is passed to each function_node in the vector, and each function_node has an output type that contains

  • the buffer address
  • the index of the function_node in that vector of nodes
  • other magical goodness

The multifunction_node is where most of the work will be done. It has

  • a vector of hash_maps, indexed by that function_node index and keyed by the buffer address, containing the results of each function_node for the various buffers.
  • a hash_map keyed by the buffer address, containing a count of the number of elements received for that buffer. When it reaches N, you have all your inputs.

When the multifunction_node receives a message, it

  • adds the data to hash_map[i][key], where i is the function_node index (in the input message) and key is the buffer address,
  • increments hash_count[key]; if this is now N, it
  • constructs a vector of the result values, pulling each out of the hash table for that index, and
  • forwards that value if it was constructed; otherwise it just returns.

There are some concerns for how the data is passed and stored, and how to clean up elements if you expect values to be reused, but that is the basic sketch.
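
Below is a minimal sketch of that design under the assumptions above. The Tagged struct, the node names, and the double payload are all illustrative; serial concurrency on the multifunction_node keeps the hash maps safe without locking:

#include <tbb/flow_graph.h>
#include <cstddef>
#include <memory>
#include <unordered_map>
#include <vector>

// Illustrative message type: the tag identifies messages that belong together,
// the index identifies which function_node produced the value.
struct Tagged {
    const void* tag;
    std::size_t index;
    double value;
};

int main() {
    const std::size_t N = 4; // known only at runtime, invariant afterwards
    tbb::flow::graph g;

    using collector_t =
        tbb::flow::multifunction_node<Tagged, std::tuple<std::vector<double>>>;

    std::unordered_map<const void*, std::vector<double>> partial; // tag -> result slots
    std::unordered_map<const void*, std::size_t> seen;            // tag -> count received

    // serial: the body never runs concurrently, so the maps need no locking
    collector_t collector(g, tbb::flow::serial,
        [&, N](const Tagged& msg, collector_t::output_ports_type& out) {
            auto& slots = partial[msg.tag];
            slots.resize(N);
            slots[msg.index] = msg.value;
            if (++seen[msg.tag] == N) {          // last input for this tag arrived
                std::get<0>(out).try_put(slots); // forward the assembled vector
                partial.erase(msg.tag);
                seen.erase(msg.tag);
            }
        });

    // The vector holds pointers so each node can be constructed with its index.
    std::vector<std::unique_ptr<tbb::flow::function_node<const void*, Tagged>>> workers;
    for (std::size_t i = 0; i < N; ++i) {
        workers.push_back(
            std::make_unique<tbb::flow::function_node<const void*, Tagged>>(
                g, tbb::flow::unlimited,
                [i](const void* buf) { return Tagged{buf, i, double(i) * 10.0}; }));
        tbb::flow::make_edge(*workers.back(), collector);
    }

    int buffer = 42; // stand-in for the real read-only buffer
    for (auto& w : workers)
        w->try_put(&buffer);
    g.wait_for_all();
}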

ANSWER 2

I had a similar situation where the number of function nodes was not determined at compile time. In my case I solved this problem the following way:

  • Connect each incoming edge/flow graph node with one port of a queueing join node.
  • Also connect the same edge/node to a multifunction_node. This multifunction_node outputs nothing until the number of received inputs matches the number of incoming edges.
  • Connect the output port of the multifunction_node to the second port of each of the queueing join_nodes. When the internal count reaches the limit, the multifunction_node releases the buffers, and their stored values are forwarded to their corresponding output ports.

If you want to perform a single action with all the data instead of forwarding the incoming buffered messages, you could do that by subsequently calling try_get on all the buffering nodes' output ports instead.

#include <tbb/flow_graph.h>

#include <atomic>
#include <cstddef>
#include <iterator>
#include <tuple>
#include <vector>

namespace flow = tbb::flow;

template <class Input, class Output>
struct DynamicJoinNode {
  using BufferNode = flow::join_node<std::tuple<Input, flow::continue_msg>, flow::queueing>;
  using ContinueNode = flow::multifunction_node<Input, std::tuple<flow::continue_msg>>;

  std::atomic<std::size_t> count;
  std::vector<BufferNode> buffers;
  ContinueNode customContinueNode;

  template <class InputNodeIt>
  DynamicJoinNode(flow::graph &graph, InputNodeIt first, InputNodeIt last) :
    count(0),
    buffers(std::distance(first, last), BufferNode(graph)),
    customContinueNode(graph, flow::unlimited,
        [this](Input, typename ContinueNode::output_ports_type &out) {
      std::size_t previous = count.load();
      std::size_t desired;
      do {
        desired = previous + 1;
        if (desired == buffers.size())
          desired = 0; // reached the last element: reset the count
      } while (!count.compare_exchange_weak(previous, desired));
      if (desired == 0) { // every input has arrived: release the buffers
        std::get<0>(out).try_put(flow::continue_msg{});
      }
    })
  {
    for (auto bufferIt = buffers.begin(); first != last; ++first, ++bufferIt) {
      flow::make_edge(*first, flow::input_port<0>(*bufferIt));
      flow::make_edge(*first, customContinueNode);
      flow::make_edge(flow::output_port<0>(customContinueNode), flow::input_port<1>(*bufferIt));
    }
  }
};
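
A hypothetical usage sketch for this struct; the producer and consumer lambdas are assumptions. Note the reserve() calls: flow-graph nodes must not be relocated once edges are made.

#include <cstdio>

int main() {
  tbb::flow::graph g;

  std::vector<tbb::flow::function_node<int, int>> producers;
  producers.reserve(3); // nodes must stay put after edges are made
  for (int i = 0; i < 3; ++i)
    producers.emplace_back(g, tbb::flow::unlimited,
                           [i](int v) { return v * (i + 1); });

  DynamicJoinNode<int, int> join(g, producers.begin(), producers.end());

  // Each join_node releases its tuple only once every producer has delivered,
  // so downstream consumers see synchronized "rounds" of values.
  using Out = std::tuple<int, tbb::flow::continue_msg>;
  std::vector<tbb::flow::function_node<Out, tbb::flow::continue_msg>> consumers;
  consumers.reserve(join.buffers.size());
  for (auto &buf : join.buffers) {
    consumers.emplace_back(g, tbb::flow::serial, [](const Out &t) {
      std::printf("%d\n", std::get<0>(t));
      return tbb::flow::continue_msg{};
    });
    tbb::flow::make_edge(buf, consumers.back());
  }

  for (auto &p : producers)
    p.try_put(7);
  g.wait_for_all();
}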

ANSWER 3

If you know N at compile time for a specific program, but want to implement the graph in a generic way for a library to be used in different programs, BOOST_PP is an option for small N.

I have implemented a graph that generates a continue_msg after the slowest connected node outputs a continue_msg. For that, I needed N buffer nodes and had to connect them to a join_node with N ports of the same type (tbb::flow::continue_msg).

Basically, the code below does what you intended with

for(int i(0); i < vec_node.size(); ++i)
    tbb::flow::make_edge(vec_node[i], tbb::flow::input_port<i>(node_join_));

...but uses the preprocessor to "write" multiple lines with the correct make_edge calls, up to N < MY_JOIN_NODE_VARIADIC_MAX (that limit is arbitrary, to keep N "small"):

    #include "boost/preprocessor/repetition/repeat_from_to.hpp"
    #include "boost/preprocessor/repetition/repeat.hpp"
    #include "boost/preprocessor/arithmetic/inc.hpp"

    ...

    #define MY_JOIN_NODE_VARIADIC_MAX 8
    #define MY_FUNC_IMPL(z, n, unused) tbb::flow::make_edge(vec_node[n], tbb::flow::input_port<n>(joinNode));
    #define MY_MAKE_IMPL(z, n, unused)                                  \
    template <size_t N, typename TJoinNode> void                        \
    makeAllEdges (TJoinNode& joinNode,                                  \
                     typename std::enable_if< N == n >::type * = 0)     \
    {                                                                   \
        BOOST_PP_REPEAT(n, MY_FUNC_IMPL, unused)                          \
    }
    BOOST_PP_REPEAT_FROM_TO(0, BOOST_PP_INC(MY_JOIN_NODE_VARIADIC_MAX), MY_MAKE_IMPL, unused)
    #undef MY_MAKE_IMPL
    #undef MY_FUNC_IMPL
    #undef MY_JOIN_NODE_VARIADIC_MAX

This code is a set of function definitions; "makeAllEdges" can then be called. (Note that in this example, I assume makeAllEdges is a class method and vec_node a member of the class, and as such known within the scope of makeAllEdges.)
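
For comparison, on a C++17 compiler the same compile-time unrolling can be written without the preprocessor, using std::index_sequence and a fold expression. This is a sketch mirroring the names above, written as free functions taking vec_node as a parameter:

    #include <tbb/flow_graph.h>
    #include <cstddef>
    #include <utility>

    template <typename NodeVec, typename JoinNode, std::size_t... Is>
    void makeAllEdgesImpl(NodeVec &vec_node, JoinNode &node_join_,
                          std::index_sequence<Is...>) {
        // Expands to one make_edge call per index 0..N-1; each Is is a
        // compile-time constant, so input_port<Is> is well-formed.
        (tbb::flow::make_edge(vec_node[Is],
                              tbb::flow::input_port<Is>(node_join_)), ...);
    }

    template <std::size_t N, typename NodeVec, typename JoinNode>
    void makeAllEdges(NodeVec &vec_node, JoinNode &node_join_) {
        makeAllEdgesImpl(vec_node, node_join_, std::make_index_sequence<N>{});
    }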