Returning a value from a Co-routine started via boost::asio::co_spawn

236 Views Asked by At

I have some asio code which is used for transferring a file from a client. This code is working correctly and is as follows:

bool FileTransferBehaviour::ConnectAndTransferFile(const std::string& _sender,
                                               const std::string& _filePath,
                                               size_t _numBytes)
{
  // use a coroutine to accept the connection and receive the file.
  bool result = false;

  asio::co_spawn(m_ioContext,
     [&result, &filePath, _numBytes, this]() mutable
      -> asio::awaitable<void> {
        auto maybeSock = co_await CoAwaitClientConnection();
        if (maybeSock.has_value()) {
            result = ReceiveFile(maybeSock.value(),                                               filePath, _numBytes);
        }
    }, asio::detached);

  m_ioContext.restart();
  m_ioContext.run();

  return result;
}

Currently a result is set in the internal lambda expression. I would prefer to return this result from the coroutine instead of capturing a local variable by reference, i.e.

bool FileTransferBehaviour::ConnectAndTransferFile(const std::string& _sender,
                                               const std::string& _filePath,
                                               size_t _numBytes)
{
  // use a coroutine to accept the connection and receive the file.
  bool result = co_await asio::co_spawn(m_ioContext,
     &filePath, _numBytes, this]() mutable
      -> asio::awaitable<bool> {
        auto maybeSock = co_await CoAwaitClientConnection();
        if (maybeSock.has_value()) {
            co_return ReceiveFile(maybeSock.value(),                                               filePath, _numBytes);
        }
        co_return false;
    }, asio::detached);

  m_ioContext.restart();
  m_ioContext.run();

  return result;
} 

When I change the return type of the lambda the code fails with

FileTransferBehaviour.cpp(101,42): error C2228: left of '.await_ready' must have 
class/struct/union [MyProject,vcxproj]
FileTransferBehaviour.cpp(101,17): error C2440: 'initializing': cannot convert from 
'void' to 'bool' [MyProject.vcxproj]

I am using C++20 with CL version 19.37.32824 (Visual Studio 2022)

With GCC 11.4.0 I get the following error:

FileTransferBehaviour.cpp:101:19: error: unable to find the promise type for this coroutine
  101 |     bool result = co_await asio::co_spawn(m_ioContext,

What am I missing?

2

There are 2 best solutions below

3
On

In your first version, ConnectAndTransferFile is not a coroutine. It is a function that spawns a coroutine that happens to be defined as a lambda within the function. This coroutine is handed off to ASIO, and your outer non-coroutine essentially waits for it to finish before returning.

This is a synchronous operation.

In your second version, you have changed ConnectAndTransferFile to become a coroutine. That's what happened when you did co_await asio::co_spawn(...) directly in ConnectAndTransferFile.

Of course, if a function is a coroutine, then it cannot return something; it must co_return it. Also, the function's signature now needs to indicate the promise type to be used with that coroutine. This is typically done using the return value. In your case, you probably mean for it to return asio::awaitable<bool>.

But this means that the code that invoked the coroutine has to wait on it to extract the bool result. Your coroutine probably also should not restart and run the context.

0
On

I'd suggest running the io service on some other thread. That way you can post a promise and await the future:

bool FileTransferBehaviour::ConnectAndTransferFile(         //
    std::string const& sender, std::string const& filePath) //
{
    std::promise<bool> p;
    std::future<bool>  f = p.get_future();

    // use a coroutine to accept the connection and receive the file.
    co_spawn(ioc,
        [=, this] -> asio::awaitable<bool> {
            if (auto s = co_await CoAwaitClientConnection(sender)) {
                bool r = co_await ReceiveFile(*s, filePath);
                co_return r;
            }
            co_return false;
        },
        [p = std::move(p)](std::exception_ptr e, bool r) mutable {
            if (e)
                p.set_exception(e);
            else
                p.set_value(r);
        });

    return f.get();
}

But see much simplified using asio::use_future below

With a working live example:

Live On Coliru

#include <boost/asio.hpp>
#include <boost/endian/arithmetic.hpp>
#include <filesystem>
#include <fstream>
#include <iomanip>
#include <iostream>
namespace asio = boost::asio;
using NetSize  = boost::endian::big_uint64_t;
using asio::ip::tcp;
using std::filesystem::path;

struct FileTransferBehaviour {
    bool ConnectAndTransferFile(std::string const& sender, std::string const& filePath);

  private:
    using Sock     = asio::deferred_t::as_default_on_t<tcp::socket>;
    using Resolver = asio::deferred_t::as_default_on_t<tcp::resolver>;

    asio::awaitable<std::optional<Sock>> CoAwaitClientConnection(std::string host) try {
        Sock s{ioc};
        co_await async_connect(s, co_await Resolver(ioc).async_resolve(host, "8989"));
        co_return s;
    } catch (...) {
        co_return std::nullopt;
    }

    asio::awaitable<bool> ReceiveFile(Sock& s, path filePath) try {
        NetSize header[2];
        co_await async_read(s, asio::buffer(header));

        auto& [content_size, name_len] = header;

        // read name and contents
        std::string       name(name_len, '\0');
        std::vector<char> data(content_size);
        co_await async_read(s, std::array{asio::buffer(name), asio::buffer(data)});

        // write target file
        std::ofstream ofs(filePath / name, std::ios::binary);
        ofs.write(data.data(), data.size());
        std::cout << "Wrote " << filePath / name << ", " << content_size << " bytes" << std::endl;
        co_return true;
    } catch (...) {
        co_return false;
    }

    asio::thread_pool ioc{1};
};

bool FileTransferBehaviour::ConnectAndTransferFile(         //
    std::string const& sender, std::string const& filePath) //
{
    std::promise<bool> p;
    std::future<bool>  f = p.get_future();

    // use a coroutine to accept the connection and receive the file.
    co_spawn(ioc,
        [=, this] -> asio::awaitable<bool> {
            if (auto s = co_await CoAwaitClientConnection(sender)) {
                bool r = co_await ReceiveFile(*s, filePath);
                co_return r;
            }
            co_return false;
        },
        [p = std::move(p)](std::exception_ptr e, bool r) mutable {
            if (e)
                p.set_exception(e);
            else
                p.set_value(r);
        });

    return f.get();
}

void demoServer(uint16_t port, std::string name) {
    asio::io_context  ioc;
    tcp::acceptor     acc{ioc, {{}, port}};
    tcp::socket client = acc.accept();

    std::ifstream     ifs(name, std::ios::binary);
    std::vector<char> payload(std::istreambuf_iterator{ifs}, {});

    NetSize content_size(payload.size()), name_len(name.length());

    std::cout << "Sending " << quoted(name) << ", " << content_size << " bytes" << std::endl;
    write(client,
          std::array{
              asio::buffer(&content_size, sizeof(content_size)),
              asio::buffer(&name_len, sizeof(name_len)),
              asio::buffer(name),
              asio::buffer(payload),
          });

    std::cout << "Server shutting down" << std::endl;
}

int main() {
    std::thread server(demoServer, 8989, "main.cpp");

    FileTransferBehaviour ftb;
    bool b = ftb.ConnectAndTransferFile("127.0.0.1", "outputdir");
    std::cout << "Received: " << std::boolalpha << b << std::endl;

    server.join();
    std::cout << "Bye" << std::endl;
}

Tested with

g++ -std=c++2b -O2 -Wall -pedantic -pthread main.cpp
mkdir -pv outputdir && ./a.out; md5sum main.cpp outputdir/*

Which prints

mkdir: created directory 'outputdir'

Sending "main.cpp", 3370 bytes
Server shutting down
Wrote "outputdir/main.cpp", 3370 bytes
Received: true
Bye

a6d1a03fd575856c9180919ebd710c24  main.cpp
a6d1a03fd575856c9180919ebd710c24  outputdir/main.cpp

Notes

It seems easier to just set the promise in a void awaitable:

bool FileTransferBehaviour::ConnectAndTransferFile(         //
    std::string const& sender, std::string const& filePath) //
{
    std::promise<bool> p;
    std::future<bool>  f = p.get_future();

    // use a coroutine to accept the connection and receive the file.
    co_spawn(ioc,
        [=, this, p = std::move(p)]() mutable -> asio::awaitable<void> {
            if (auto s = co_await CoAwaitClientConnection(sender)) {
                p.set_value(co_await ReceiveFile(*s, filePath));
            }
            p.set_value(false);
        },
        asio::detached);

    return f.get();
}

In fact, the "bool" and "optional" return values seem antipattern here to deal with exceptional situations. I'd simplify all the way:

void FileTransferBehaviour::ConnectAndTransferFile(std::string const& sender, std::string const& filePath) {
    return co_spawn(ioc, [=, this] -> asio::awaitable<void> {
                   auto s = co_await Connect(sender);
                   co_await ReceiveFile(s, filePath);
               },
               asio::use_future)
        .get();
}

Again with a Live Demo, this time also demonstrating error handling:

Live On Coliru

#include <boost/asio.hpp>
#include <boost/endian/arithmetic.hpp>
#include <filesystem>
#include <fstream>
#include <iomanip>
#include <iostream>
namespace asio = boost::asio;
using NetSize  = boost::endian::big_uint64_t;
using asio::ip::tcp;
using std::filesystem::path;

struct FileTransferBehaviour {
    void ConnectAndTransferFile(std::string const& sender, std::string const& filePath);

  private:
    using Sock     = asio::deferred_t::as_default_on_t<tcp::socket>;
    using Resolver = asio::deferred_t::as_default_on_t<tcp::resolver>;

    asio::awaitable<Sock> Connect(std::string host) {
        Sock s{ioc};
        co_await async_connect(s, co_await Resolver(ioc).async_resolve(host, "8989"));
        co_return s;
    }

    asio::awaitable<void> ReceiveFile(Sock& s, path filePath) {
        NetSize header[2];
        co_await async_read(s, asio::buffer(header));

        auto& [content_size, name_len] = header;

        // read name and contents
        std::string       name(name_len, '\0');
        std::vector<char> data(content_size);
        co_await async_read(s, std::array{asio::buffer(name), asio::buffer(data)});

        // write target file
        std::ofstream ofs(filePath / name, std::ios::binary);
        ofs.write(data.data(), data.size());
        std::cout << "Wrote " << filePath / name << ", " << content_size << " bytes" << std::endl;
    }

    asio::thread_pool ioc{1};
};

void FileTransferBehaviour::ConnectAndTransferFile(std::string const& sender, std::string const& filePath) {
    return co_spawn(
               ioc,
               [=, this]() mutable -> asio::awaitable<void> {
                   auto s = co_await Connect(sender);
                   co_await ReceiveFile(s, filePath);
               },
               asio::use_future)
        .get();
}

void demoServer(uint16_t port, std::string name) {
    asio::io_context  ioc;
    tcp::acceptor     acc{ioc, {{}, port}};
    tcp::socket client = acc.accept();

    std::ifstream     ifs(name, std::ios::binary);
    std::vector<char> payload(std::istreambuf_iterator{ifs}, {});

    NetSize content_size(payload.size()), name_len(name.length());

    std::cout << "Sending " << quoted(name) << ", " << content_size << " bytes" << std::endl;
    write(client,
          std::array{
              asio::buffer(&content_size, sizeof(content_size)),
              asio::buffer(&name_len, sizeof(name_len)),
              asio::buffer(name),
              asio::buffer(payload),
          });

    std::cout << "Server shutting down" << std::endl;
}

int main() {
    FileTransferBehaviour ftb;

    {
        std::thread server(demoServer, 8989, "main.cpp");

        ftb.ConnectAndTransferFile("127.0.0.1", "outputdir");
        std::cout << "Received!" << std::endl;
        server.join();
    }

    try {
        ftb.ConnectAndTransferFile("127.0.0.1", "outputdir"); // there is no server anymore
    } catch (boost::system::system_error const& se) {
        std::cout << "Second receive fails: " << se.code().message() << std::endl;
    }

    std::cout << "Bye" << std::endl;
}

Which prints:

mkdir: created directory 'outputdir'

Sending "main.cpp", 3091 bytes
Server shutting down
Wrote "outputdir/main.cpp", 3091 bytes
Received!

Second receive fails: Connection refused
Bye

4fe9a130e1e2f7516c0dfbe57a3e962a  main.cpp
4fe9a130e1e2f7516c0dfbe57a3e962a  outputdir/main.cpp