BOOST_ASSERT failure raised in boost::fiber on Visual Studio "Debug" build

I ran into an issue with boost::fiber. My code is based on the "work_stealing.cpp" example from boost::fiber, with a few additions. It works on Windows Subsystem for Linux (Ubuntu) in both Debug and Release builds. In fact, until last night it also worked in the Windows Visual Studio build. But today, when I tried to run some tests, a BOOST_ASSERT failure was raised in the Debug build. The Release build still works.

I don't know why. Does anyone know anything about this? Why does it happen only in the Windows Debug build? What did I do wrong?

I am using CMake as the build tool and Visual Studio 2019 Community Edition as the development environment. I also tested on WSL Ubuntu 20.04 and macOS 10.15.x (I cannot recall the exact version).

Thank you.

-Quan

The failure is raised from the following Boost code:

// <boost>/lib/fiber/src/scheduler.cpp
...
void
scheduler::detach_worker_context( context * ctx) noexcept {
    BOOST_ASSERT( nullptr != ctx);
    BOOST_ASSERT( ! ctx->ready_is_linked() );
#if ! defined(BOOST_FIBERS_NO_ATOMICS)
    BOOST_ASSERT( ! ctx->remote_ready_is_linked() );
#endif
    BOOST_ASSERT( ! ctx->sleep_is_linked() );
    BOOST_ASSERT( ! ctx->terminated_is_linked() );
    BOOST_ASSERT( ! ctx->wait_is_linked() ); // <-- [THE ERROR RAISED FROM HERE!]
    BOOST_ASSERT( ctx->worker_is_linked() );
    BOOST_ASSERT( ! ctx->is_context( type::pinned_context) );
    ctx->worker_unlink();
    BOOST_ASSERT( ! ctx->worker_is_linked() );
    ctx->scheduler_ = nullptr;
    // a detached context must not belong to any queue
}
...

My code looks like this (I removed some unrelated parts):

//coroutine.h
class CoroutineManager {
  ...

  typedef std::unique_lock<std::mutex> lock_type;
  typedef boost::fibers::algo::work_stealing scheduler_algorithm;
  typedef boost::fibers::default_stack stack_type;
  
  ...
  void wait() {
    lock_type shutdown_lock(shutdown_mutex);
    shutdown_condition.wait(shutdown_lock, [this]() { return 1 == this->shutdown_flag; });
    BOOST_ASSERT(1 == this->shutdown_flag);
  }

  void shutdown(bool wait = true) {
    lock_type shutdown_lock(shutdown_mutex);

    this->shutdown_flag = 1;

    shutdown_lock.unlock();

    this->shutdown_condition.notify_all();

    if (wait) {
      for (std::thread& t : threads) {
        if (t.joinable()) {
          t.join();
        }
      }
    }
  }
  
  ...

  template <class TaskType>
  void submit(const TaskType& task) {
    boost::fibers::fiber(boost::fibers::launch::dispatch, std::allocator_arg, stack_type(this->stack_size), task).detach();
  }
  
  ...
  
  void init() {
    if (verbose) spdlog::info("INIT THREAD({0}) STARTED.", std::this_thread::get_id());

    for (int i = 0; i < (this->thread_count - 1); ++i) {
      threads.push_back(std::thread(&CoroutineManager::thread, this));
    }

    boost::fibers::use_scheduling_algorithm<scheduler_algorithm>(this->thread_count);

    this->start_barrier.wait();
  }

  void dispose() {
    if (verbose) spdlog::info("INIT THREAD({0}) DISPOSED.", std::this_thread::get_id());
  }

  void thread() {
    if(verbose) spdlog::info("WORKER THREAD({0}) STARTED.", std::this_thread::get_id());

    boost::fibers::use_scheduling_algorithm<scheduler_algorithm>(this->thread_count);

    this->start_barrier.wait();

    this->wait();

    if (verbose) spdlog::info("WORKER THREAD({0}) DISPOSED.", std::this_thread::get_id());
  }

  ...
};

- - - - - - - - - - - - - - - - - - - - - - -

// coroutine_test.cpp
...

int main(int argc, char* argv[]) {
  init_file_logger("coroutine_test.log");
  time_tracker tracker("[coroutine_test.main]");

  typedef std::thread::id tid_t;
  typedef boost::fibers::fiber::id fid_t;
  typedef boost::fibers::buffered_channel<std::tuple<tid_t, fid_t, int>> channel_t;

  CoroutineManager manager(std::thread::hardware_concurrency(), 1024 * 1024 * 8, true);

  const int N = 1;

  channel_t chan { 8 };

  boost::fibers::barrier done_flag(2);

  manager.submit([&chan, &done_flag]() {
    std::tuple<tid_t, fid_t, int> p;

    while( boost::fibers::channel_op_status::success == chan.pop_wait_for(p, std::chrono::milliseconds(100))) {
      spdlog::info("[thread({0}) : fiber({1}) from {2}]", std::get<0>(p), std::get<1>(p), std::get<2>(p));
    }

    done_flag.wait();
  });

  for (int i = 0; i < N; ++i) {
    manager.submit([&chan, i]() {
      for (int j = 0; j < 1000; ++j) {
        chan.push(std::move(std::make_tuple(std::this_thread::get_id(), boost::this_fiber::get_id(), i)));
        boost::this_fiber::yield();
      }
    });
  }

  done_flag.wait();

  spdlog::info("-START TO SHUTDOWN-");

  manager.shutdown();

  spdlog::info("-END-");

  return 0;
}

[UPDATE] I added a screenshot to explain my situation more clearly.

There is 1 solution below

I think I found what triggers it (though not the root cause):

The failed assertion occurs when the buffered channel is used with the push/pop_wait_for pair. If you switch to the try_push/pop pair, the assertion failure goes away.

I suspect the problem is in pop_wait_for, but the stack trace gives very little information, so I cannot pin it down. I am not a Boost expert either. Hopefully someone else can work it out.
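
On the "only in Debug" part: by default BOOST_ASSERT(expr) expands to the standard assert(expr), and assert is compiled out when NDEBUG is defined, which CMake's default Release flags normally do. So the Release build most likely skips the check rather than passing it. The sketch below shows only that mechanism with plain assert; it does not use Boost.Fiber, and the two helper names (asserts_enabled, evaluations_of_checked_expression) are made up for illustration.

```cpp
#include <cassert>

// Hypothetical helper: reports whether assert() is active in this translation unit.
constexpr bool asserts_enabled() {
#ifdef NDEBUG
    return false;   // typical Release configuration: assert() expands to nothing
#else
    return true;    // typical Debug configuration: assert() checks its argument
#endif
}

// Hypothetical helper: counts how often the expression inside assert() is evaluated.
// With NDEBUG defined the expression disappears entirely, so the counter stays 0.
inline int evaluations_of_checked_expression() {
    int evaluated = 0;
    assert((++evaluated, true));
    return evaluated;
}
```

Built without NDEBUG, evaluations_of_checked_expression() returns 1; built with -DNDEBUG (or /DNDEBUG), it returns 0. A passing Release run therefore says nothing about whether the asserted condition in scheduler.cpp actually holds.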

So my code now looks like this:

  manager.submit([&chan, &done_flag]() {
    std::tuple<tid_t, fid_t, int> p;

    for (;;) {
      boost::fibers::channel_op_status status = chan.pop(p);
      if (boost::fibers::channel_op_status::success == status) {
        spdlog::info("[thread({0}) : fiber({1}) from {2}]", std::get<0>(p), std::get<1>(p), std::get<2>(p));
      } else if (boost::fibers::channel_op_status::closed == status) {
        spdlog::info("[ off loop ]");
        break;
      }

      boost::this_fiber::yield();
    }
  });

  for (int i = 0; i < N; ++i) {
    manager.submit([&chan, i]() {
      for (int j = 0; j < 1000; ++j) {
        boost::fibers::channel_op_status status = chan.try_push(std::make_tuple(std::this_thread::get_id(), boost::this_fiber::get_id(), i));
        spdlog::warn("[ worker ] status : {0}", (int)status);
      }
    });
  }
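
One caveat with this workaround: try_push does not wait when the channel is full, it returns channel_op_status::full and the value is not enqueued. With a small channel and a producer that only logs the status, some of the 1000 values may therefore never reach the consumer. The sketch below illustrates the difference between ignoring that status and retrying. It is a single-threaded stand-in built on the standard library only; TinyChannel, push_status and both helper functions are hypothetical names, not part of Boost.Fiber.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Hypothetical stand-in for the status codes of a bounded channel.
enum class push_status { success, full };

// TinyChannel is NOT boost::fibers::buffered_channel. It only mimics the
// "refuse instead of wait" behaviour of a non-blocking push. Single-threaded.
class TinyChannel {
public:
    explicit TinyChannel(std::size_t capacity) : capacity_(capacity) {}

    // Refuses the value when the buffer is full instead of waiting for room.
    push_status try_push(int value) {
        if (items_.size() >= capacity_) return push_status::full;
        items_.push_back(value);
        return push_status::success;
    }

    // Returns false when there is nothing to take.
    bool try_pop(int& out) {
        if (items_.empty()) return false;
        out = items_.front();
        items_.pop_front();
        return true;
    }

private:
    std::size_t capacity_;
    std::deque<int> items_;
};

// Pushes `count` values and ignores "full"; returns how many values were dropped.
inline int push_ignoring_status(TinyChannel& chan, int count) {
    int dropped = 0;
    for (int j = 0; j < count; ++j) {
        if (chan.try_push(j) == push_status::full) ++dropped;
    }
    return dropped;
}

// Pushes `count` values and, on "full", lets the consumer side take one item
// before retrying. Returns how many values were delivered in total.
inline int push_with_retry(TinyChannel& chan, int count) {
    int delivered = 0;
    for (int j = 0; j < count; ++j) {
        while (chan.try_push(j) == push_status::full) {
            int sink = 0;
            // In fiber code, this is roughly where boost::this_fiber::yield()
            // would give the consumer fiber a chance to run.
            if (chan.try_pop(sink)) ++delivered;
        }
    }
    int sink = 0;
    while (chan.try_pop(sink)) ++delivered;  // drain what is still buffered
    return delivered;
}
```

In this stand-in, push_ignoring_status on a channel of capacity 8 loses every value after the first 8, while push_with_retry delivers all of them. In the real fiber code the equivalent would be to loop on try_push while it returns channel_op_status::full, yielding between attempts.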

Previously it looked like this:

  manager.submit([&chan, &done_flag]() {
    std::tuple<tid_t, fid_t, int> p;

    while( boost::fibers::channel_op_status::success == chan.pop_wait_for(p, std::chrono::milliseconds(100))) {
      spdlog::info("[thread({0}) : fiber({1}) from {2}]", std::get<0>(p), std::get<1>(p), std::get<2>(p));
    }
  });

  for (int i = 0; i < N; ++i) {
    manager.submit([&chan, i]() {
      for (int j = 0; j < 1000; ++j) {
        chan.push(std::make_tuple(std::this_thread::get_id(), boost::this_fiber::get_id(), i));
      }
    });
  }

Hopefully, this can help others who run into the same situation.