tl;dr: Is there a way to close a WebSocket that's currently blocked in a (sync) read() operation, if the server sends nothing for some time?
I wanted to make a simple WebSocket client with Boost.Beast. When I realized that read() is a blocking operation, and that there's no way to tell whether a message is coming, I created a sleeper thread. All the thread does is read(), and I can afford to have it blocked if no data is coming.
I want to be able to close the connection, so from the non-blocked thread I fire a websocket close(). This causes read() to trip a BOOST_ASSERT():
Assertion failed: ! impl.wr_close
How can I close the connection when (sync) read() is ongoing?
Code for reproduction of my scenario:
#include <string>
#include <thread>
#include <chrono>
#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/asio/connect.hpp>
#include <boost/asio/ip/tcp.hpp>

using namespace std::chrono_literals;

class HandlerThread {
    enum class Status {
        UNINITIALIZED,
        DISCONNECTED,
        CONNECTED,
        READING,
    };

    const std::string _host;
    const std::string _port;
    std::string _resolvedAddress;
    boost::asio::io_context _ioc;
    boost::asio::ip::tcp::resolver _resolver;
    boost::beast::websocket::stream<boost::asio::ip::tcp::socket> _websocket;
    boost::beast::flat_buffer _buffer;
    bool isRunning = true;
    Status _connectionStatus = Status::UNINITIALIZED;

public:
    HandlerThread(const std::string& host, const uint16_t port)
        : _host(std::move(host))
        , _port(std::to_string(port))
        , _ioc()
        , _resolver(_ioc)
        , _websocket(_ioc) {}

    void Run() {
        // isRunning is also useless, due to blocking boost::beast operations.
        while(isRunning) {
            switch (_connectionStatus) {
                case Status::UNINITIALIZED:
                case Status::DISCONNECTED:
                    if (!connect()) {
                        _connectionStatus = Status::DISCONNECTED;
                        break;
                    }
                case Status::CONNECTED:
                case Status::READING:
                    if (!read()) {
                        _connectionStatus = Status::DISCONNECTED;
                        break;
                    }
            }
        }
    }

    void Close()
    {
        isRunning = false;
        _websocket.close(boost::beast::websocket::close_code::normal);
    }

private:
    bool connect()
    {
        // All here is copy-paste from the examples.
        boost::system::error_code errorCode;

        // Look up the domain name
        auto const results = _resolver.resolve(_host, _port, errorCode);
        if (errorCode) return false;

        // Make the connection on the IP address we get from a lookup
        auto ep = boost::asio::connect(_websocket.next_layer(), results, errorCode);
        if (errorCode) return false;

        _resolvedAddress = _host + ':' + std::to_string(ep.port());
        _websocket.set_option(boost::beast::websocket::stream_base::decorator(
            [](boost::beast::websocket::request_type& req)
            {
                req.set(boost::beast::http::field::user_agent,
                    std::string(BOOST_BEAST_VERSION_STRING) +
                        " websocket-client-coro");
            }));

        boost::beast::websocket::response_type res;
        _websocket.handshake(res, _resolvedAddress, "/", errorCode);
        if (errorCode) return false;

        _connectionStatus = Status::CONNECTED;
        return true;
    }

    bool read()
    {
        boost::system::error_code errorCode;
        _websocket.read(_buffer, errorCode);
        if (errorCode) return false;

        if (_websocket.is_message_done()) {
            _connectionStatus = Status::CONNECTED;
            // notifyRead(_buffer);
            _buffer.clear();
        } else {
            _connectionStatus = Status::READING;
        }
        return true;
    }
};

int main() {
    HandlerThread handler("localhost", 8080);
    std::thread([&]{
        handler.Run();
    }).detach(); // bye!

    std::this_thread::sleep_for(3s);
    handler.Close(); // Bad idea...
    return 0;
}
There is no such thing. You might be able to force something at the TCP stack (so, operating system, usually) level. E.g. disabling the network interface involved.
Note that most synchronous code can trivially be transformed into asynchronous code with the exact same blocking semantics using asio::use_future. That means you can use async deadlines. And those are supported by Beast out of the box (base your websocket on beast::tcp_stream instead of asio::ip::tcp::socket).
UPDATE
To the added code
Review/Simplify
Reduced the code removing unneeded bits and adding some fixes and demonstration handler notification so we can test the functioning:
Exercising it for demonstration:
Design Issues
I'd argue there are three design issues:
- HandlerThread is not a thread
- detach()-ing threads makes them ungovernable by definition.

Here's a fix, naively only time-limiting operations like in your example:
Note how the code got simpler, shorter, and we even print error information. It's achieved by using use_future together with beast::tcp_stream::expires_after:
Full Cancellation
To allow for /externally triggered/ cancellation (instead of a fixed deadline), we can cheat a little by using 2 threads, so one can be "clogged" doing blocking waits on the futures:
Now, the thing runs, reconnecting happily until the user enters Stop on the terminal:
No Cheats, C++20
In C++20, using coroutines, you can have basically identical code that is truly async. This gets rid of the "cheating" extra thread:
Not Using C++20
This is conceptually identical, but more tedious as it requires explicit callback functions:
Again, same behaviour: