How to call a function when a work item is finished in Boost.Asio?


I would like to implement a command queue which handles incoming commands concurrently with a thread pool (so the queue grows temporarily when all threads are working). I would like to post a callback to the callers when a command worker is started and finished. My implementation is based on this example from the Asio website.

Is there a way to hook into these events and signal somehow? I would like to avoid the command functors knowing about the callbacks (since obviously I could call the callbacks inside the command functors).

Pseudocode to illustrate (initialization and error handling omitted for brevity):

class CommandQueue
{
public:
    void handle_command(CmdId id, int param)
    {
        io_service.post(boost::bind(dispatch_map[id], param));

        // PSEUDOCODE:
        // when one of the worker threads starts on this item, I want to call
        callback_site.cmd_started(id, param);

        // when the command functor returns and the thread finished
        callback_site.cmd_finished(id, param);
    }

private:
    boost::asio::io_service io_service;
    boost::asio::io_service::work work;
    std::map<CmdId, CommandHandler> dispatch_map; // CommandHandler is a functor taking an int parameter
    CallbackSite callback_site;
};

Is there a way to do this without having the command functors depend on the CallbackSite?


There are 2 best solutions below

Best answer

So what you want is a hook that fires when one of the threads calling run() starts processing a command, and another that fires when the command returns.

Personally, I do this by wrapping the function call:

    class CommandQueue
    {
    public:
        void handle_command(CmdId id, int param)
        {
            io_service.post(boost::bind(&CommandQueue::DispatchCommand, this,id,param));
        }

    private:
        boost::asio::io_service io_service;
        boost::asio::io_service::work work;
        std::map<CmdId, CommandHandler> dispatch_map; // CommandHandler is a functor taking an int parameter
        CallbackSite callback_site;

        // Runs on whichever worker thread picks up the posted item.
        void DispatchCommand(CmdId id, int param)
        {
          // Called just before the command functor starts.
          callback_site.cmd_started(id, param);
          dispatch_map[id](param);
          // Called once the command functor has returned.
          callback_site.cmd_finished(id, param);
        }
    };

This is also the pattern I use when I want to handle exceptions thrown by the dispatched commands. You can also post the callbacks as separate events instead of running them inline.
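
The same wrapping idea can be sketched in a reusable form. This is a minimal illustration that assumes C++11 or later is available (with an older compiler the same shape can be built with boost::bind and a small functor). The helper name `with_notifications` and its policy on exceptions (report the finish, then rethrow) are assumptions made for the example and are not part of Asio.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

// Hypothetical helper (not part of Asio): wraps `work` so that `on_start`
// runs before it and `on_finish` runs after it, even if `work` throws.
inline std::function<void()> with_notifications(std::function<void()> work,
                                                std::function<void()> on_start,
                                                std::function<void()> on_finish)
{
    assert(work && on_start && on_finish);  // all three callables must be set
    return [work, on_start, on_finish]() {
        on_start();
        try {
            work();
        } catch (...) {
            on_finish();  // still report completion, then let the error propagate
            throw;
        }
        on_finish();
    };
}

// Small demonstration that records the order of events in a log.
inline std::vector<std::string> demo_wrapped_command()
{
    std::vector<std::string> log;
    std::function<void()> task = with_notifications(
        [&log] { log.push_back("command"); },   // knows nothing about callbacks
        [&log] { log.push_back("started"); },
        [&log] { log.push_back("finished"); });
    // Run inline here; in the real queue the wrapped task would be handed to
    // io_service.post() and executed by a worker thread.
    task();
    return log;
}
```

Because the wrapper is just another nullary callable, the command functors stay unaware of the CallbackSite, which is the separation the question asks for.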

Second answer

My initial response would be that std::future is what you want, given that there is now even built-in support for it. However, given the way this question is tagged, you will have to make do with boost::future.

Basically, you create a boost::promise for the task you are about to hand to Asio, but first call get_future on it and store the returned future, which shares state with the promise. When the task finishes, it calls promise::set_value. Another thread can then check whether this has happened with future::is_ready (non-blocking) or future::wait (blocking), retrieve the value, and call the appropriate callback functions.

For example, the value set could be the CmdId from your example, which tells you which callback to call.
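
The promise/future hand-off might look roughly like the sketch below. It makes two loud assumptions: it uses std::promise and std::future (C++11) as stand-ins for boost::promise and boost::future, and it replaces the io_service with a plain std::deque of tasks so that the example stays self-contained. `CmdId` as an int, `TaskQueue`, `post_command`, and `is_ready` are illustrative names. std::future has no is_ready member, so the non-blocking check is done with wait_for and a zero timeout.

```cpp
#include <cassert>
#include <chrono>
#include <deque>
#include <functional>
#include <future>
#include <memory>

typedef int CmdId;  // assumption: the real CmdId may be an enum or similar

// Stand-in for io_service: tasks are queued here and run later.
typedef std::deque<std::function<void()> > TaskQueue;

// Queues a command and returns a future that becomes ready once it has run.
inline std::future<CmdId> post_command(TaskQueue& queue, CmdId id,
                                       std::function<void(int)> handler, int param)
{
    assert(handler);  // an empty handler cannot be dispatched
    // std::function needs a copyable callable, so the promise lives in a shared_ptr.
    std::shared_ptr<std::promise<CmdId> > done =
        std::make_shared<std::promise<CmdId> >();
    std::future<CmdId> result = done->get_future();  // shares state with the promise
    queue.push_back([done, id, handler, param] {
        handler(param);       // the command itself knows nothing about the promise
        done->set_value(id);  // signal completion and say which command finished
    });
    return result;
}

// Non-blocking readiness check (boost::future offers is_ready() directly).
inline bool is_ready(const std::future<CmdId>& f)
{
    return f.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
}
```

In the real setup the queued task would run on a thread calling io_service::run(), and the caller would poll `is_ready` (or block on the future) before invoking the matching cmd_finished callback with the CmdId it gets back.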