boost asio priority queue, add async operation from handler

1.7k Views Asked by At

I have a problem with a design based on the priority queue example from boost asio. If I add a wrapped handler from within a handler it seems to get lost:

See http://www.boost.org/doc/libs/1_55_0/doc/html/boost_asio/example/cpp03/invocation/prioritised_handlers.cpp for the example.

I have used everything as is and replaced the main() function with the code below:

//
// based on prioritised_handlers.cpp
// ~~~~~~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2015 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#include <boost/asio.hpp>
#include <boost/function.hpp>
#include <iostream>
#include <queue>

using boost::asio::ip::tcp;

class handler_priority_queue
{
public:
  void add(int priority, boost::function<void()> function)
  {
    handlers_.push(queued_handler(priority, function));
  }

  void execute_all()
  {
    while (!handlers_.empty())
    {
      queued_handler handler = handlers_.top();
      handler.execute();
      handlers_.pop();
    }
  }

  // A generic wrapper class for handlers to allow the invocation to be hooked.
  template <typename Handler>
  class wrapped_handler
  {
  public:
    wrapped_handler(handler_priority_queue& q, int p, Handler h)
      : queue_(q), priority_(p), handler_(h)
    {
    }

    void operator()()
    {
      handler_();
    }

    template <typename Arg1>
    void operator()(Arg1 arg1)
    {
      handler_(arg1);
    }

    template <typename Arg1, typename Arg2>
    void operator()(Arg1 arg1, Arg2 arg2)
    {
      handler_(arg1, arg2);
    }

  //private:
    handler_priority_queue& queue_;
    int priority_;
    Handler handler_;
  };

  template <typename Handler>
  wrapped_handler<Handler> wrap(int priority, Handler handler)
  {
    return wrapped_handler<Handler>(*this, priority, handler);
  }

private:
  class queued_handler
  {
  public:
    queued_handler(int p, boost::function<void()> f)
      : priority_(p), function_(f)
    {
    }

    void execute()
    {
      function_();
    }

    friend bool operator<(const queued_handler& a,
        const queued_handler& b)
    {
      return a.priority_ < b.priority_;
    }

  private:
    int priority_;
    boost::function<void()> function_;
  };

  std::priority_queue<queued_handler> handlers_;
};

// Custom invocation hook for wrapped handlers.
template <typename Function, typename Handler>
void asio_handler_invoke(Function f,
    handler_priority_queue::wrapped_handler<Handler>* h)
{
  h->queue_.add(h->priority_, f);
}
void low_priority_handler()
{
  std::cout << "Low priority handler\n";
}

int main()
{
  //
  // BASED ON prioritised_handlers.cpp
  // ~~~~~~~~~~~~~~~~~~~~~~~~
  //
  // Copyright (c) 2003-2011 Christopher M. Kohlhoff (chris at kohlhoff dot com)
  //
  // Distributed under the Boost Software License, Version 1.0. (See accompanying
  // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  //



  //----------------------------------------------------------------------
  using boost::asio::ip::tcp;
  boost::asio::io_service io_service;

  handler_priority_queue pri_queue;

  // Post a completion handler to be run immediately.
  io_service.post(pri_queue.wrap(0, low_priority_handler));


  // Set a deadline timer to expire immediately.
  boost::asio::deadline_timer timer1(io_service);
  timer1.expires_at(boost::posix_time::neg_infin);
  timer1.async_wait(pri_queue.wrap(42, [](const boost::system::error_code& )
                                   {
                                     std::cout << "now" << std::endl;
                                   }));
  // Set a deadline timer to expire later.
  boost::asio::deadline_timer timer2(io_service, boost::posix_time::milliseconds(100));
  boost::asio::deadline_timer timer3(io_service, boost::posix_time::milliseconds(200));
  timer2.async_wait(pri_queue.wrap(100, [&pri_queue, &timer3](const boost::system::error_code& )
                                   {
                                     std::cout << "100ms" << std::endl;
                                     timer3.async_wait(pri_queue.wrap(100, [](const boost::system::error_code& )
                                                                      {
                                                                        std::cout << "200ms" << std::endl;
                                                                      }));

                                   }));

  while (io_service.run_one())
    {
      // The custom invocation hook adds the handlers to the priority queue
      // rather than executing them from within the poll_one() call.
      while (io_service.poll_one())
        ;

      pri_queue.execute_all();
    }
}

//g++ -std=c++14 -Wall -Werror  -rdynamic -lboost_system -lboost_thread -lboost_log -lpthread prioritised_handlers.cpp

This prints:

now
Low priority handler
100ms

And the 200ms printout from timer3 is missing. Based on my printf debugging approach the custom invocation hook asio_handler_invoke() never gets called for the "200ms" action. Unfortunately I cannot see why.

What is wrong with above approach?

updated the code with the rest of the example after the hint from Technik

2

There are 2 best solutions below

3
On BEST ANSWER

There's nothing wrong with any of the code, except for the fact that the way you're keeping (or rather not keeping) the application alive long enough for the last handler to complete.

You're running the io_service, polling and running each job. The jobs all get processed, but they are delayed, asynchronous calls. You're counting on the io_service calls to keep the application alive, but as far as the io_service is concerned, all the work is done, so it stops blocking, your main() function exits and you never see the output from the final handler.

To prove this to you, simply wrap the io_service in a ::work object, which will prevent the io_service from ever thinking that's it out of work to do and therefore always block.

boost::asio::io_service::work w(io_service);

Add this right before your while (io_service.run_one()) line.

1
On

I think there is a bug in Christopher's implementation:

  void execute_all()
  {
    while (!handlers_.empty())
    {
      queued_handler handler = handlers_.top();
      handler.execute(); //point1
      handlers_.pop(); //point2
    }
  }

If you add new element to std::priority_queue at point1, which is same priority level (100 in your case) - there is a possibility that handler will be added to top of heap. In this case pop() on point2 will pop-out our new handler without executing. In this case your io_service loop will be stopped because no more job.

I think you can swap lines point1 and point2 to fix that.