C++ nested coroutine destruction issue

1k Views Asked by At

I'm experimenting with clang-5 and its coroutine TS implementation. I'm trying to use it with boost asio but run into issues, it seems like my coroutine stack frame is destroyed twice but I'm having trouble figuring out what I'm doing wrong.

This is my simplified reproduction scenario:

#include <experimental/coroutine>
#include <boost/asio.hpp>
#include <iostream>

using namespace boost::asio;

inline auto async_connect(ip::tcp::socket& s, const ip::tcp::endpoint& ep)
{
    struct Awaiter
    {
        ip::tcp::socket& s;
        ip::tcp::endpoint ep;
        boost::system::error_code ec;
        bool ready = false;

        Awaiter(ip::tcp::socket& s, const ip::tcp::endpoint& ep) : s(s) , ep(ep) {}
        bool await_ready() { return ready; }
        void await_resume()
        {
            if (ec)
            {
                throw boost::system::system_error(ec);
            }
        }
        void await_suspend(std::experimental::coroutine_handle<> coro)
        {
            s.async_connect(this->ep, [this, coro] (auto ec) mutable {
                this->ready = true;
                this->ec = ec;
                std::cout << "Connect ready: resume " << coro.address() << "\n";
                coro.resume();
            });

            std::cout << "Connect initiated\n";
        }
    };

    return Awaiter(s, ep);
}

struct Future
{
    struct promise_type
    {
        bool _ready = false;
        std::experimental::coroutine_handle<> _waiter = nullptr;

        Future get_return_object()
        {
            auto coro = std::experimental::coroutine_handle<promise_type>::from_promise(*this);
            std::cout << __FUNCTION__ << ":" << __LINE__ << " [" << coro.address() << "]\n";
            return Future(coro);
        }

        auto initial_suspend()
        {
            std::cout << __FUNCTION__ << ":" << __LINE__ << " [" << std::experimental::coroutine_handle<promise_type>::from_promise(*this).address() << "]\n";
            return std::experimental::suspend_never();
        }

        auto final_suspend()
        {
            std::cout << __FUNCTION__ << ":" << __LINE__ << " [" << std::experimental::coroutine_handle<promise_type>::from_promise(*this).address() << "]\n";
            return std::experimental::suspend_always();
        }

        void return_void()
        {
            _ready = true;
            if (_waiter)
            {
                std::cout << __FUNCTION__ << ":" << __LINE__ << " [" << std::experimental::coroutine_handle<promise_type>::from_promise(*this).address() << "] resume waiter[" << _waiter.address() << "]\n ";
                _waiter.resume();
            }
        }

        void unhandled_exception()
        {
            std::cout << __FUNCTION__ << ":" << __LINE__ << " [" << std::experimental::coroutine_handle<promise_type>::from_promise(*this).address() << "]\n";
            std::terminate();
        }
    };

    Future() = default;
    Future(const Future&) = delete;
    Future& operator=(const Future&) = delete;
    Future& operator=(Future&& other)
    {
        _coro = other._coro;
        other._coro = nullptr;
        return *this;
    }

    explicit Future(std::experimental::coroutine_handle<promise_type> coro)
    : _coro(coro)
    {
        std::cout << __FUNCTION__ << ":" << __LINE__ << " [" << _coro.address() << "]\n";
    }

    Future(Future&& f)
    : _coro(f._coro)
    {
        f._coro = nullptr;
    }

    ~Future()
    {
        if (_coro)
        {
            std::cout << __FUNCTION__ << ":" << __LINE__ << " [" << _coro.address() << "]\n";
            _coro.destroy();
            //_coro = nullptr;
        }
    }

    bool await_ready()
    {
        assert(_coro);
        std::cout << __FUNCTION__ << ":" << __LINE__ << ": ready " << _coro.promise()._ready << std::endl;
        return _coro.promise()._ready;
    }

    void await_suspend(std::experimental::coroutine_handle<> callerCoro)
    {
        std::cout << __FUNCTION__ << ":" << __LINE__ << " [" << _coro.address() << "] waiter [" << callerCoro.address() << "]\n";
        _coro.promise()._waiter = callerCoro;
    }

    void await_resume()
    {
        std::cout << __FUNCTION__ << ":" << __LINE__ << " [" << _coro.address() << "]\n";
    }

    void* address()
    {
        return _coro.address();
    }

    std::experimental::coroutine_handle<promise_type> _coro = nullptr;
};

struct RaiiCheck
{
    ~RaiiCheck()
    {
        std::cout << "******** ~RaiiCheck " << this << " ********\n";
    }
};

Future asioConnect(ip::tcp::socket& s, const ip::tcp::endpoint& ep)
{
    RaiiCheck rc;
    s.open(ep.protocol());
    co_await async_connect(s, ep);
}

template <typename Awaitable>
Future waitForIt(io_service& io, Awaitable&& awaitable)
{
    co_await awaitable;
    io.stop();
}

static Future performRequest(io_service& io)
{
    try
    {
        auto addr = ip::tcp::endpoint(ip::address::from_string("8.8.8.8"), 53);
        ip::tcp::socket socket(io);

        // Works
        //socket.open(addr.protocol());
        //co_await async_connect(socket, addr);

        // Crashes
        std::cout << "asioConnect\n";
        co_await asioConnect(socket, addr);
        assert(socket.is_open());
    }
    catch (const boost::system::system_error& e)
    {
        std::cerr << "Failed to parse http response: " << e.what() << "\n";
    }
}

int main(int, char**)
{
    io_service io;

    auto task = performRequest(io);

    Future fut;
    io.post([&] () {
        fut = waitForIt(io, task);
    });

    io.run();
}

It generates the following output:

get_return_object:51 [0x7fbbeed03c00]
Future:99 [0x7fbbeed03c00]
initial_suspend:57 [0x7fbbeed03c00]
asioConnect
get_return_object:51 [0x7fbbeed03d50]
Future:99 [0x7fbbeed03d50]
initial_suspend:57 [0x7fbbeed03d50]
Connect initiated
await_suspend:129 [0x7fbbeed03d50] waiter [0x7fbbeed03c00]
get_return_object:51 [0x7fbbeec028a0]
Future:99 [0x7fbbeec028a0]
initial_suspend:57 [0x7fbbeec028a0]
await_suspend:129 [0x7fbbeed03c00] waiter [0x7fbbeec028a0]
Connect ready: resume 0x7fbbeed03d50
******** ~RaiiCheck 0x7fbbeed03e14 ********
return_void:69 [0x7fbbeed03d50] resume waiter[0x7fbbeed03c00]
await_resume:135 [0x7fbbeed03d50]
~Future:113 [0x7fbbeed03d50]
******** ~RaiiCheck 0x7fbbeed03e14 ********
return_void:69 [0x7fbbeed03c00] resume waiter[0x7fbbeec028a0]
await_resume:135 [0x7fbbeed03c00]
return_void:69 [0x7fbbeec028a0]
final_suspend:63 [0x7fbbeec028a0]
final_suspend:63 [0x7fbbeed03c00]
final_suspend:63 [0x7fbbeed03d50]
~Future:113 [0x7fbbeec028a0]
~Future:113 [0x7fbbeed03c00]
corotest(74949,0x7fffc163c3c0) malloc: *** error for object 0x7fbbeed03d50: incorrect checksum for freed object - object was probably modified after being freed.
*** set a breakpoint in malloc_error_break to debug
Abort trap: 6

As you can see the RaiiCheck object in the asioConnect function gets destroyed twice.

The Future object inside the asioConnect function has the coroutine_handle with address 0x7fbbeed03d50. When this coroutine is resumed in the s.async_connect completion callback the RaiiCheck object is destroyed for the first time. Then I see the return_void function getting called which resumes the parent coroutine, which seems to resume the inner coroutine again causing another destruction of the RaiiCheck object.

It seems to be related to my nesting of coroutines because it works if I co_await directly on the async_connect call instead of putting the async_connect call in another coroutine.

Any hint to solve this issue is highly appreciated.

1

There are 1 best solutions below

2
On BEST ANSWER

You should only resume the waiter after final_suspend(), not in return_void(), because the coroutine frame is not unwound before final_suspend(), you'll have trouble calling destroy() before it's finally suspended.

The corrected code for final_suspend() looks like this:

auto final_suspend()
{
    struct awaiter
    {
        std::experimental::coroutine_handle<> _waiter;

        bool await_ready() { return false; }

        void await_suspend(std::experimental::coroutine_handle<> coro)
        {
            if (_waiter)
            {
                _waiter();
            }
        }

        void await_resume() {}
    };
    return awaiter{_waiter};
}