Multithread proccesing events from epoll_wait with coroutines

232 Views Asked by At

I have such io context class:

    class IOContext
    {
    public:
        explicit IOContext(uint32_t eventPoolCount);
        ~IOContext();

        IOContext(IOContext&) = delete;
        IOContext& operator=(IOContext&) = delete;
        IOContext(IOContext&&) = delete;
        IOContext& operator=(IOContext&&) = delete;

        void processAwaitingEvents(int timeout);

        void scheduleOperation(std::derived_from<IOOperation> auto& operation)
        {
            // ok?
            if(0 == epoll_ctl(this->epollFD, EPOLL_CTL_ADD, operation.fd, &operation.settings))
                return;

            if(errno == EEXIST)
                // re-init event
                if(0 == epoll_ctl(this->epollFD, EPOLL_CTL_MOD, operation.fd, &operation.settings))
                    return;

            throw std::system_error{errno, std::system_category(), strerror(errno)};
        }

    private:
        tinycoro::Generator<IOOperation::CoroHandle> yieldAwaitingEvents(int timeout);

        std::unique_ptr<epoll_event[]> eventsList;
        const uint32_t eventPoolCount;
        int epollFD = -1;
    };

    //.
    //. //constructor, destructor
    //.
    void IOContext::processAwaitingEvents(int timeout)
    {
        for(auto& event : this->yieldAwaitingEvents(timeout))
        {
            event.resume();
        }
    }

    tinycoro::Generator<IOOperation::CoroHandle> IOContext::yieldAwaitingEvents(int timeout)
    {
        int eventCount = epoll_wait(this->epollFD, eventsList.get(), this->eventPoolCount, timeout);
        if(eventCount == -1)
        {
            throw std::system_error{errno, std::system_category(), strerror(errno)};
        }

        for(int i = 0; i < eventCount; ++i)
        {
            co_yield std::coroutine_handle<IOOperation>::from_address(eventsList[i].data.ptr);
        }
    }

and I want to enable multi-threaded processing for io events (IOOperation is the base class, which contains fd), so when some events are ready, first from N thread will get them and process them. So, I think that i could move eventsList to cpp as thread_local variable, like this:

    //.
    //. //constructor, destructor
    //.
    void IOContext::processAwaitingEvents(int timeout)
    {
        for(auto& event : this->yieldAwaitingEvents(timeout))
        {
            event.resume();
        }
    }

    tinycoro::Generator<IOOperation::CoroHandle> IOContext::yieldAwaitingEvents(int timeout)
    {
        thread_local std::unique_ptr<epoll_event[]> eventsList = std::make_unique<epoll_event[]>(this->eventPollCount);
        int eventCount = epoll_wait(this->epollFD, eventsList.get(), this->eventPoolCount, timeout);
        if(eventCount == -1)
        {
            throw std::system_error{errno, std::system_category(), strerror(errno)};
        }

        for(int i = 0; i < eventCount; ++i)
        {
            co_yield std::coroutine_handle<IOOperation>::from_address(eventsList[i].data.ptr);
        }
    }

But I'm not sure it's good idea, ie if this solution is safe (I do some tests, everything was OK, but despite this, I have doubts).
I could make yieldAwaitingEvents as a public function, but I don't think that giving raw coroutine handle is good idea...
Thanks!

EDIT: I change a little bit code - initializing of thread_local variable was moved to yieldAwaitingEvents function

0

There are 0 best solutions below