- Will generate exceptions that cannot be caught
-
boost::asio::detail::cancellation_handler_base::call[virtual] == 0xFFFFFFFFFFFFFFFF。 /// Emits the signal and causes invocation of the slot's handler, if any. void emit(cancellation_type_t type) { if (handler_) handler_->call(type); }
I can't find where the problem is anymore, the exception cannot be caught
ServerDome:
#include <iostream>
#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/asio/thread_pool.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <boost/asio/experimental/channel.hpp>
#include <boost/asio/experimental/as_tuple.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/signal_set.hpp>
#include <cstdio>
using boost::asio::ip::tcp;
using boost::asio::awaitable;
using boost::asio::co_spawn;
using boost::asio::detached;
using boost::asio::steady_timer;
using boost::asio::use_awaitable;
using boost::asio::thread_pool;
using std::chrono::steady_clock;
using boost::asio::as_tuple;
using boost::asio::buffer;
using boost::asio::experimental::channel;
using boost::asio::io_context;
using boost::asio::steady_timer;
namespace this_coro = boost::asio::this_coro;
using ssl_socket = boost::asio::ssl::stream<boost::asio::ip::tcp::socket>;
constexpr auto use_nothrow_awaitable = boost::asio::experimental::as_tuple(boost::asio::use_awaitable);
using namespace std::literals::chrono_literals;
using namespace boost::asio::experimental::awaitable_operators;
awaitable<void> echo(tcp::socket socket)
{
steady_timer timer(co_await this_coro::executor);
try
{
char data[2000];
for (;;)
{
timer.expires_after(2ms);
auto result1 = co_await(async_read(socket, buffer(data, 2000), use_nothrow_awaitable) || timer.async_wait(use_nothrow_awaitable));
if (result1.index() == 1) {
//std::cout << "time out." << std::endl;
}
else {
auto [e,n] = std::get<0>(result1);
if (!e)
{
if (n)
co_await async_write(socket, boost::asio::buffer(data, n), use_awaitable);
}
else {
std::cout << e.message() << std::endl;
}
}
}
}
catch (std::exception& e)
{
std::printf("echo Exception: %s\n", e.what());
}
}
awaitable<void> listener()
{
auto executor = co_await this_coro::executor;
tcp::acceptor acceptor(executor, { tcp::v4(), 5555 });
for (;;)
{
tcp::socket socket = co_await acceptor.async_accept(use_awaitable);
co_spawn(executor, echo(std::move(socket)), detached);
}
}
int main(int argc, char* argv[])
{
try
{
thread_pool pol(8);
boost::asio::signal_set signals(pol, SIGINT, SIGTERM);
signals.async_wait([&](auto, auto) { pol.stop(); });
co_spawn(pol, listener(), detached);
pol.wait();
}
catch (std::exception& e)
{
std::printf("Exception: %s\n", e.what());
}
}
ClientDome:
#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/asio/thread_pool.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <boost/asio/experimental/channel.hpp>
#include <boost/asio/experimental/as_tuple.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/signal_set.hpp>
#include <boost/asio/write.hpp>
#include <boost/asio/experimental/as_tuple.hpp>
#include <cstdio>
using boost::asio::ip::tcp;
using boost::asio::awaitable;
using boost::asio::co_spawn;
using boost::asio::detached;
using boost::asio::steady_timer;
using boost::asio::use_awaitable;
using std::chrono::steady_clock;
using boost::asio::thread_pool;
using boost::asio::as_tuple;
using boost::asio::buffer;
using boost::asio::experimental::channel;
using boost::asio::io_context;
namespace this_coro = boost::asio::this_coro;
constexpr auto use_nothrow_awaitable = boost::asio::experimental::as_tuple(boost::asio::use_awaitable);
using namespace boost::asio::experimental::awaitable_operators;
using namespace std::literals::chrono_literals;
thread_pool io_context(3);
awaitable<void> timeout(steady_clock::duration duration)
{
steady_timer timer(co_await this_coro::executor);
timer.expires_after(duration);
co_await timer.async_wait(use_nothrow_awaitable);
}
awaitable<void> echo3(tcp::socket socket)
{
try
{
char data[4024];
for (;;)
{
co_await timeout(std::chrono::milliseconds(2));
co_await async_write(socket, boost::asio::buffer(data, 2000), use_awaitable);
}
}
catch (const std::exception& e)
{
std::printf("echo2 Exception: %s\n", e.what());
}
}
awaitable<void> listener(tcp::socket socket)
{
try
{
auto executor = co_await this_coro::executor;
auto listen_endpoint =
*tcp::resolver(socket.get_executor()).resolve("127.0.0.1", std::to_string(5555),
tcp::resolver::passive).begin();
co_await socket.async_connect(listen_endpoint, use_awaitable);
co_spawn(executor, echo3(std::move(socket)), detached);
}
catch (const std::exception& e)
{
std::printf("connect Exception: %s\n", e.what());
}
}
int main()
{
try
{
boost::asio::signal_set signals(io_context, SIGINT, SIGTERM);
signals.async_wait([&](auto, auto) { io_context.stop(); });
for (size_t i = 0; i < 3; i++)
{
co_spawn(io_context, listener(std::move(tcp::socket(io_context))), detached);
}
io_context.wait();
}
catch (std::exception& e)
{
std::printf("main Exception: %s\n", e.what());
}
}
I expect the program to run without errors and exceptions
There are a two main things:
Strands
I noticed sparsely occuring SEGV in
deadline_timer_service. I noticed that you have a multi-threaded server context,but didn't guard the connections (echosessions with strand). To be completely honest, I didn't intuitively think you would require one, but I'm thinking that theparallel_groupthat implements the||awaitable operator uses a shared cancellation mechanism that may not be threadsafe. In that case, a strand might be required:With that the SEGV goes away.
Deadlock
I noticed client and server stop progressing after a while. After counting lines I noticed it would freeze up after the same amount of messages transferred each time: ~7800x2000 bytes. That's about 5GiB per socket.
I figured that the kernel might just block writes when the read buffer is at maximum size, because the client never reads the echoed packets in the client.
So, I added that. The neat thing to do would be to
co_spawna full-duplex coroutine, but since we know that all packets are echoed 1:1, I did the simple thing and put the read inline with the write loop.Effectively we replace the
sleep(2ms)with a "read indefinitely" (here it's important that thedatabuffer has a higher capacity than the maximum expected reply) limited to 2ms:Combined Test Program
With that change, all is well, and the server/client kept running without failures, tripping UBSan/ASan or deadlocking.
This program (potentially) combines client and server, time limited to 30s only for COLIRU. It logs frequent messages once-in-a-thousand times:
Live On Coliru ²
Local demo for clarity:
² also using
experimental::as_tuplefor COLIRU