How do I create a queue that holds boost::packaged_task<> with functions that return arbitrary types?

I'm trying to construct a work queue of functions that need to be executed by one thread and can be fed by many threads. To accomplish this, I was planning on using the boost::packaged_task and boost::unique_future. The idea would be you would do:

Foo value = queue.add(myFunc).get();

which would block, until the function is executed. So queue.add(...) takes in a boost::function, and returns a boost::unique_future. Internally it then creates a boost::packaged_task using the boost::function for its constructor.

The problem I'm running into is that boost::function<...> won't be the same every time. Specifically, the return value for it will change (the functions, however, will never take any parameters). Thus, I have to have an add function that looks something like:

template <typename ResultType>
boost::unique_future<ResultType> add(boost::function<ResultType ()> f) {
   boost::packaged_task<boost::function<ResultType ()> > task(f);
   queue.push_back(task);
   return task.get_future();
}

Okay, that doesn't seem too bad, but then I ran into the problem of how to define 'queue'. I think I have no choice but to use boost::any, since the types will not be constant:

std::list<boost::any> queue; // note: I'm not concerned with thread-safety yet

But then I run into a problem when I try to implement my executeSingle (takes just a single item off the queue to execute):

void executeSingle() {
    boost::any value = queue.back();
    boost::packaged_task<?> task = boost::packaged_task<?>(boost::move(value));
    // actually execute task
    task();
    queue.pop_back();
}

The '?' denote what I'm unsure about. I can't call executeSingle with a template, as it's called from a separate thread. I tried using boost::any, but I get the error:

  conversion from 'boost::any' to non-scalar type  boost::detail::thread_move_t<boost:thread>' requested.

The funny part is, I actually don't care about the return type of packaged_task at this point. I just want to execute it, but I can't figure out the template details.

Any insight would be greatly appreciated!

There are 3 best solutions below

9
On BEST ANSWER

You should store boost::function<void()> objects. Note that boost::packaged_task<R>::operator() doesn't return anything; it populates the associated boost::unique_future<R>. Even if it did return something, you could still use boost::function<void()>, since you have no interest in the returned value: all you care about is calling queue.back()(). In that case boost::function<void()>::operator() would discard the returned value for you.
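To make the two claims above concrete, here is a minimal sketch. It uses the C++11 standard equivalents (std::packaged_task, std::function) rather than Boost so that it compiles without extra libraries; the helper name call_and_discard is made up for illustration.

```cpp
#include <cassert>
#include <functional>
#include <future>
#include <type_traits>
#include <utility>

// Invoking a packaged_task yields void: the result travels through the future.
static_assert(
    std::is_void<decltype(std::declval<std::packaged_task<int()>&>()())>::value,
    "packaged_task::operator() returns void");

// Hypothetical helper: stores an int-returning callable in a
// std::function<void()>, calls it once, and reports how often it ran.
inline int call_and_discard() {
    int calls = 0;
    std::function<void()> erased = [&calls]() -> int {
        ++calls;
        return 42;  // dropped by the void() wrapper
    };
    erased();
    return calls;
}
```

The static_assert checks the first claim at compile time; call_and_discard shows that a void() wrapper accepts a callable with a non-void result and simply ignores it.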

As a minor note, you might want to change the signature of your add method to be templated on a generic type Functor rather than a boost::function, and use boost::result_of to get the result type for boost::packaged_task.

My suggestion as a whole:

template<typename Functor>
boost::unique_future<typename boost::result_of<Functor()>::type>
queue::add(Functor functor) // assuming your class is named queue
{
    typedef typename boost::result_of<Functor()>::type result_type;
    boost::packaged_task<result_type> task(functor);
    boost::unique_future<result_type> future = task.get_future();
    // assuming an internal_queue member holding boost::function<void()>;
    // see the EDIT below if moving the task in does not compile
    internal_queue.push_back(boost::move(task));
    return boost::move(future);
}

void
queue::executeSingle()
{
    // Note: do you really want LIFO here?
    internal_queue.back()();
    internal_queue.pop_back();
}

EDIT

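How to take care of move semantics inside queue::add. A boost::packaged_task is move-only, while boost::function needs a copyable callable, so hold the task through a boost::shared_ptr and store a small copyable wrapper that invokes it: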

typedef typename boost::result_of<Functor()>::type result_type;
typedef boost::packaged_task<result_type> task_type;
boost::shared_ptr<task_type> task = boost::make_shared<task_type>(functor);
boost::unique_future<result_type> future = task->get_future();

/* boost::shared_ptr is possibly move-enabled so you can try moving it */
internal_queue.push_back( boost::bind(dereference_functor(), task) );

return boost::move(future);

where dereference_functor could be:

struct dereference_functor {
    template<typename Pointer>
    void
    operator()(Pointer const& p) const
    {
        (*p)();
    }
};

You could also replace the bind expression with the more direct

boost::bind(&task_type::operator(), task)

which also doesn't require a custom functor. However, if there are multiple overloads of task_type::operator(), this might need disambiguation; the code could also break if a future change in Boost.Thread introduces an overload.
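Putting the pieces of this answer together, the following is a sketch under a few assumptions rather than the exact code above. It swaps in the C++11 standard types (std::packaged_task, std::future, std::function, std::shared_ptr) for their Boost counterparts, uses a lambda where the answer uses boost::bind, and runs tasks oldest-first instead of LIFO. The names work_queue, execute_single, and demo are hypothetical, and there is no locking, as in the question.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <future>
#include <memory>
#include <string>
#include <utility>

// Hypothetical class name; not thread-safe on purpose.
class work_queue {
public:
    template <typename Functor>
    auto add(Functor functor) -> std::future<decltype(functor())> {
        typedef decltype(functor()) result_type;
        typedef std::packaged_task<result_type()> task_type;

        // packaged_task is move-only but std::function needs a copyable
        // callable, so the task is held through a shared_ptr.
        std::shared_ptr<task_type> task =
            std::make_shared<task_type>(std::move(functor));
        std::future<result_type> future = task->get_future();

        // The stored wrapper has signature void(), whatever result_type is.
        tasks_.push_back([task]() { (*task)(); });
        return future;
    }

    // Runs the oldest pending task; returns false if nothing was queued.
    bool execute_single() {
        if (tasks_.empty()) return false;
        std::function<void()> job = std::move(tasks_.front());
        tasks_.pop_front();
        job();
        return true;
    }

private:
    std::deque<std::function<void()> > tasks_;
};

// Queues two tasks with different result types, drains the queue,
// then reads both futures.
inline std::string demo() {
    work_queue q;
    std::future<int> a = q.add([] { return 6 * 7; });
    std::future<std::string> b = q.add([] { return std::string("done"); });
    while (q.execute_single()) {}
    return std::to_string(a.get()) + " " + b.get();
}
```

In real use, execute_single would run on the worker thread and the container would need a mutex and condition variable; that part is left out here because the question sets thread-safety aside.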

0
On

Somewhat belatedly, but you might want to consider using Boost.Asio instead of rolling your own queue-runner solution.

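While this grew up as an I/O library, it does support asynchronous calls just like this. Simply define an io_service somewhere, run it inside a thread, and then post functors to get called on that thread. Keep in mind that io_service::run() returns once it runs out of work, so keep an io_service::work object alive for as long as the thread should keep waiting for new functors.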

1
On

You can use old-fashioned virtual functions. Define a base class task_base with a virtual execute method, then define a template derived class which holds a specific task instance. Something along these lines:

struct task_base {
  virtual ~task_base() { }  // lets the queue delete holders through a task_base pointer
  virtual void execute() = 0;
};
template<typename ResultType>
struct task_holder : task_base {
  task_holder(boost::packaged_task<ResultType>&& task)
    : m_task(boost::move(task)) { }  // a named rvalue reference is an lvalue, so move explicitly
  void execute() {
    m_task();
  }
private:
  boost::packaged_task<ResultType> m_task;
};

And define your queue to hold std::unique_ptr<task_base>. This is essentially the type erasure that boost::any performs, only here the erased interface exposes the one operation you need, namely execute.

NOTE: Untested code! And I'm still not very familiar with rvalue references. This is just to give you the idea of how the code would look.
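For reference, a self-contained variant of this idea might look like the sketch below. It assumes C++11 and uses std::packaged_task and std::future in place of the Boost types; the enqueue helper and the demo_virtual function are hypothetical names added for illustration.

```cpp
#include <cassert>
#include <deque>
#include <future>
#include <memory>
#include <utility>

struct task_base {
    virtual ~task_base() {}
    virtual void execute() = 0;
};

template <typename ResultType>
struct task_holder : task_base {
    explicit task_holder(std::packaged_task<ResultType()> task)
        : m_task(std::move(task)) {}
    void execute() override { m_task(); }  // result goes to the future
private:
    std::packaged_task<ResultType()> m_task;
};

typedef std::deque<std::unique_ptr<task_base> > task_queue;

// Hypothetical helper: takes the future first, then moves the task
// into a holder owned by the queue.
template <typename ResultType>
std::future<ResultType> enqueue(task_queue& queue,
                                std::packaged_task<ResultType()> task) {
    std::future<ResultType> future = task.get_future();
    queue.push_back(std::unique_ptr<task_base>(
        new task_holder<ResultType>(std::move(task))));
    return future;
}

// Queues an int task and a double task, runs both through the common
// base-class interface, and sums the results.
inline double demo_virtual() {
    task_queue queue;
    std::future<int> a =
        enqueue(queue, std::packaged_task<int()>([] { return 3; }));
    std::future<double> b =
        enqueue(queue, std::packaged_task<double()>([] { return 0.5; }));
    while (!queue.empty()) {
        queue.front()->execute();
        queue.pop_front();
    }
    return a.get() + b.get();
}
```

The executing side only ever sees task_base, so it needs no template parameter, which is the property the question was after.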