I'm trying to learn Asio by writing a library that connects to a specific device over a UDP connection.
When a device turns on, it goes into broadcast mode, where it broadcasts its identity to the network. The host machine therefore needs to listen on a UDP port for packets that match the device format, and keep track of devices that have already been detected, because a device continues to broadcast even after connection.
I did some prototyping in Python, and an async generator lent itself quite well to this use case. But in Asio, the natural analog is the asio::ip::tcp::acceptor.
I am expecting my custom acceptor to be run from within a multi-threaded io_context because each device has a high data rate. So I think this means that I need to use a strand for the acceptor, as well as for each device controller, to order the operations. I have two questions on how to use the strand with respect to this bit of example code:
template <typename Executor>
class Acceptor {
    std::unordered_set<asio::ip::udp::endpoint> accepted_connections;
    asio::basic_datagram_socket<asio::ip::udp, Executor> receive_sock;

    std::shared_ptr<Device<Executor>> accept(asio::error_code& ec) {
        std::array<std::byte, buffer_size> buffer;
        typename asio::basic_datagram_socket<asio::ip::udp, Executor>::endpoint_type remote_endpoint;
        receive_sock.receive_from(asio::buffer(buffer), remote_endpoint, {}, ec);
        if (ec) { // receive failed: ec tells the caller why
            return {};
        }
        if (accepted_connections.contains(remote_endpoint)) { // already known device
            return {};
        }
        if (auto ret = std::start_lifetime_as<Header>(buffer.data())->validate(); !ret) {
            ec = ret.error(); // not a valid device packet
            return {};
        }
        accepted_connections.emplace(remote_endpoint); // remember the sender
        return MakeDevice(remote_endpoint, buffer);
    }
};
#1) How do I modify my blocking accept function to run inside the strand so that any access to the accepted_connections set is serialized correctly? From what I can tell, all of the options for execution on the strand (defer, dispatch, execute, post) are allowed to push the function onto the strand's queue and execute it later. This makes checking the error_code impossible: if the executor runs the contents of this function later, the ec variable won't yet be set when accept returns immediately. Due to the indirection that supports different native sockets, looking at the source code for how socket Acceptor::accept(endpoint& peer_endpoint, error_code& ec) works has not been very informative.
#2) When implementing async_accept, I think I need to use async_initiate to make it compatible with other completion tokens. But how do I incorporate the strand into that so the accepted_connections set is kept in order? Do I await the strand inside my coroutine, or do I need to do something to my async_initiate to make the coroutine run in the strand?
First off: I've beaten your sample into a self-contained example: https://coliru.stacked-crooked.com/a/b70927fe6853d5bf
That out of the way, let's answer your questions:
Question #1
Blocking operations should not be run on the service.
Even if you did, you would normally require the user to switch to the appropriate context, instead of doing that "transparently" for the user².
Yes, all of those (defer, dispatch, execute, post) may queue the function and run it later. If you must shoehorn the sync accept into a blocking handler¹, the typical pattern would look like (see How can I get a future from boost::asio::post?):
So in your example:
I'm not sure what it buys you, but it does address your question.
Question #2
That's one way to make composed operations, yes: https://www.boost.org/doc/libs/1_84_0/doc/html/boost_asio/examples/cpp20_examples.html#boost_asio.examples.cpp20_examples.operations
The latter: do something in your async_initiate. Again, like under #1, the caller is responsible for calling the initiation on the strand, IFF the implementation requires that (it's up to you to document whether that is required). However, you will at least want the container access to happen on the strand, and you will also want to honor the caller's executor specification, if any.
To this end, Asio I/O objects have associated executors, which are used as the default if the completion token has no associated executor. Composed operations must use the associated executor for any (intermediate) handlers they invoke.
Note how this is at odds with your goal of forcing specific intermediate step(s) to be on the strand. I think in this case the best you can do is dispatch to the strand, so that the intermediate completion resumes there:
Here's a tested example which exercises different completion token types, including bound ones and c++20 coroutines:
Live On Coliru
Online output:
Locally, with a steady stream of faux devices:
We get:
¹ (hint: don't; it will easily cause your threads to soft lock)
² The latter more closely approximates Active Object Pattern (see e.g. boost::asio and Active Object for inspiration)