Nanomsg non-blocking bidirectional socket with multi-threaded application

1.6k Views Asked by At

I'm using Nanomsg to do IPC in my system using C++. I want to create a background thread to handle the message both send and recv. I use pair paradigm and I use nn_poll to check if the socket fd is writable or readable, if readable then read; if writable then pop one item from message queue and send. My problem is the backgroudn thread I create is using to much CPU usage since there is no sleep in nn_poll loop, is there any way to reduce the CPU usage but still make the latency like there is no sleep? Below are my example code. Thanks.

Server.cpp

#include <iostream>
#include <thread>
#include <string>
#include <queue>
#include <utility>
#include <mutex>
#include <nanomsg/pair.h>
#include <nanomsg/nn.h>

class Nanomsg {
private:
    bool _server;
    bool _stop;

    int _sock;
    std::string _url;
    std::thread _th;
    std::queue<std::string> _queue;

    std::mutex _queueMutex;

    void _start() {
        _sock = nn_socket(AF_SP, NN_PAIR);

        if (_sock < 0) {
            std::cout << "failed to create socket" << std::endl;
            return;
        }

        int rc = 0;

        if (_server) {
            rc = nn_bind(_sock, _url.c_str());
        } else {
            rc = nn_connect(_sock, _url.c_str());
        }

        if (rc < 0) {
            std::cout << "failed to connect/bind socket" << std::endl;
            return;
        }

        struct nn_pollfd pfd{};
        pfd.fd = _sock;
        pfd.events = NN_POLLIN | NN_POLLOUT;

        while (!_stop) {
            std::cout << "ssasd" << std::endl;
            rc = nn_poll(&pfd, 1, 2000);

            if (rc == 0) {
                std::cout << "timeout" << std::endl;
                continue;
            }

            if (rc == -1) {
                std::cout << "error!" << std::endl;
                return;
            }

            if (pfd.revents & NN_POLLIN) {
                char *buf = nullptr;
                int rbs = nn_recv(_sock, &buf, NN_MSG, 0);

                if (rbs < 0) {
                    continue;
                }

                std::string r(buf, rbs);

                std::cout << "received [" << r << "]" << std::endl;

                nn_freemsg(buf);
            }

            if (pfd.revents & NN_POLLOUT) {
                std::cout << "asd" << std::endl;
                if (_queue.empty()) {
                    continue;
                }

                {
                    std::lock_guard<std::mutex> lock(_queueMutex);
                    auto msg = _queue.front();

                    std::cout << "send [" << msg << "]" << std::endl;

                    rc = nn_send(_sock, msg.c_str(), msg.length(), 0);
                    if (rc >= 0) {
                        _queue.pop();
                    }
                }
            }
        }

    }

public:
    Nanomsg() : _sock(0), _server(false), _stop(false), _url("ipc:///tmp/test.ipc") {

    }

    Nanomsg(std::string url, bool server) : _url(std::move(url)), _sock(0), _server(server), _stop(false) {

    }

    void start() {
        _th = std::thread([=]() {
            _start();
        });
    }

    void stop() {
        _stop = true;

        if (_th.joinable()) {
            _th.join();
        }
    }

    void send(const std::string& msg) {
        {
            std::lock_guard<std::mutex> lock(_queueMutex);
            _queue.push(msg);
        }
    }

};

int main() {

    Nanomsg server("ipc:///tmp/test.ipc", true);

    server.start();

    while (true) {
        server.send("test");
        std::this_thread::sleep_for(std::chrono::seconds(3));
    }

    return 0;
}

Client.cpp

#include <iostream>
#include <thread>
#include <string>
#include <queue>
#include <utility>
#include <mutex>
#include <nanomsg/pair.h>
#include <nanomsg/nn.h>

struct nn_pollf {
    int fd;
    short events;
    short revents;
};

class Nanomsg {
private:
    bool _server;
    bool _stop;

    int _sock;
    std::string _url;
    std::thread _th;
    std::queue<std::string> _queue;

    std::mutex _queueMutex;

    void _start() {
        _sock = nn_socket(AF_SP, NN_PAIR);

        if (_sock < 0) {
            std::cout << "failed to create socket" << std::endl;
            return;
        }

        int rc = 0;

        if (_server) {
            rc = nn_bind(_sock, _url.c_str());
        } else {
            rc = nn_connect(_sock, _url.c_str());
        }

        if (rc < 0) {
            std::cout << "failed to connect/bind socket" << std::endl;
            return;
        }

        struct nn_pollfd pfd{};
        pfd.fd = _sock;
        pfd.events = NN_POLLIN | NN_POLLOUT;

        while (!_stop) {
            std::cout << "ssasd" << std::endl;
            rc = nn_poll(&pfd, 1, 2000);

            if (rc == 0) {
                std::cout << "timeout" << std::endl;
                continue;
            }

            if (rc == -1) {
                std::cout << "error!" << std::endl;
                return;
            }

            if (pfd.revents & NN_POLLIN) {
                char *buf = nullptr;
                int rbs = nn_recv(_sock, &buf, NN_MSG, 0);

                if (rbs < 0) {
                    continue;
                }

                std::string r(buf, rbs);

                std::cout << "received [" << r << "]" << std::endl;

                nn_freemsg(buf);
            }

            if (pfd.revents & NN_POLLOUT) {
                std::cout << "asd" << std::endl;
                if (_queue.empty()) {
                    continue;
                }

                {
                    std::lock_guard<std::mutex> lock(_queueMutex);
                    auto msg = _queue.front();

                    std::cout << "send [" << msg << "]" << std::endl;

                    rc = nn_send(_sock, msg.c_str(), msg.length(), 0);
                    if (rc >= 0) {
                        _queue.pop();
                    }
                }
            }
        }

    }

public:
    Nanomsg() : _sock(0), _server(false), _stop(false), _url("ipc:///tmp/test.ipc") {

    }

    Nanomsg(std::string url, bool server) : _url(std::move(url)), _sock(0), _server(server), _stop(false) {

    }

    void start() {
        _start();
//        _th = std::thread([=]() {
//            _start();
//        });
    }

    void stop() {
        _stop = true;

        if (_th.joinable()) {
            _th.join();
        }
    }

    void send(const std::string& msg) {
        {
            std::lock_guard<std::mutex> lock(_queueMutex);
            _queue.push(msg);
        }
    }

};

int main() {

    Nanomsg client("ipc:///tmp/test.ipc", false);

    client.start();

    return 0;
}
1

There are 1 best solutions below

1
nullptr On

If there is nothing to send and nothing to receive, put your thread to sleep for a millisecond. Pretty much the only thing you can do in your current design.

If possible you could use nanomsg next generation (nng) and give its asynchronous interface a chance. Seems you are implementing an asynchronous interface yourself anyways, so why not use nanomsg for that as well? They have all the features of your OS's network API available and thus should be able to provide the best latency without wasting CPU time.

Create an async I/O handle and set a callback with nng_aio_alloc(3). Call nng_recv_aio(3) to get notified on reception of data. Don't manage your own send queue, use nng_send_aio(3) in void Nanomsg::send() instead.

Unfortunately nng is a separate library and your are using the classic nanomsg. I noticed that only mid-writing..