What is the most efficient ZeroMQ polling on a single PUB/SUB socket?

The ZeroMQ documentation mentions zmq_poll as a method for multiplexing multiple sockets on a single thread. Is there any benefit to polling in a thread that simply consumes data from one socket? Or should I just use a blocking zmq_recv?

For example:

/*                                                POLLING A SINGLE SOCKET */
while (true) {
   zmq::poll(&items[0], 1, -1);
   if (items[0].revents & ZMQ_POLLIN) {
      int size = zmq_recv(receiver, msg, 255, 0);
      if (size != -1) {
         // do something with msg
      }
   }
}

vs.

/*                                               NO POLLING AND BLOCKING RECV */
while (true) {
    int size = zmq_recv(receiver, msg, 255, 0);
    if (size != -1) {
        // do something with msg
    }
}

Is there ever a situation in which the polling version is preferable, or should I only use polling for multiplexing? Does polling result in more efficient CPU usage? Does the answer depend on the rate of messages being received?

*** Editing this post to include a toy example ***

The reason for asking this question is that I have observed that I can achieve much higher throughput (more than an order of magnitude) on my subscriber if I do not poll.

#include <thread>
#include <zmq.hpp>
#include <iostream>
#include <unistd.h>
#include <chrono>
#include <cstring>   // memcpy
#include <cstdint>   // uint64_t
#include <string>    // std::string

using msg_t = char[88];
using timepoint_t = std::chrono::steady_clock::time_point;   // must match the clock used in now()
using milliseconds = std::chrono::milliseconds;
using microseconds = std::chrono::microseconds;

/* Log stats about how many packets were sent/received */
class SocketStats {
   public:
      SocketStats(const std::string& name) : m_socketName(name), m_timePrev(now()) {}
      void update() {
         m_numPackets++;
         timepoint_t timeNow = now();
         if (duration(timeNow, m_timePrev) > m_logIntervalMs) {
            uint64_t packetsPerSec = m_numPackets - m_numPacketsPrev;
            std::cout << m_socketName << " : " << "processed " << (packetsPerSec) << " packets" << std::endl;
            m_numPacketsPrev = m_numPackets;
            m_timePrev = timeNow;
         }
      }
   private:
      timepoint_t now() { return std::chrono::steady_clock::now(); }
      static milliseconds duration(timepoint_t timeNow, timepoint_t timePrev) { 
         return std::chrono::duration_cast<milliseconds>(timeNow - timePrev);
      }
      timepoint_t m_timePrev;
      uint64_t m_numPackets = 0;
      uint64_t m_numPacketsPrev = 0;
      milliseconds m_logIntervalMs = milliseconds{1000};
      const std::string m_socketName;
};

/* non-polling subscriber uses blocking receive and no poll */
void startNonPollingSubscriber(){
   SocketStats subStats("NonPollingSubscriber");
   zmq::context_t ctx(1);
   zmq::socket_t sub(ctx, ZMQ_SUB);
   sub.connect("tcp://127.0.0.1:5602");
   sub.setsockopt(ZMQ_SUBSCRIBE, "", 0);

   while (true) {
      zmq::message_t msg;
      bool success = sub.recv(&msg);
      if (success) { subStats.update(); }
   }
}

/* polling subscriber receives messages when available */
void startPollingSubscriber(){
   SocketStats subStats("PollingSubscriber");
   zmq::context_t ctx(1);
   zmq::socket_t sub(ctx, ZMQ_SUB);
   sub.connect("tcp://127.0.0.1:5602");
   sub.setsockopt(ZMQ_SUBSCRIBE, "", 0);

   zmq::pollitem_t items [] = {{static_cast<void*>(sub), 0, ZMQ_POLLIN, 0 }};

   while (true) {
      zmq::message_t msg;
      int rc = zmq::poll (&items[0], 1, -1);
      if (rc < 1) { continue; }
      if (items[0].revents & ZMQ_POLLIN) {
         bool success = sub.recv(&msg, ZMQ_DONTWAIT);
         if (success) { subStats.update(); }
      }
   }
}

void startFastPublisher() {
   SocketStats pubStats("FastPublisher");
   zmq::context_t ctx(1);
   zmq::socket_t pub(ctx, ZMQ_PUB);
   pub.bind("tcp://127.0.0.1:5602");

   while (true) {
      msg_t mymessage = {};                          // payload contents are irrelevant for this test
      zmq::message_t msg(sizeof(msg_t));
      memcpy(msg.data(), &mymessage, sizeof(msg_t));
      bool success = pub.send(msg, ZMQ_DONTWAIT);    // pass by reference so ZMQ_DONTWAIT is taken as a flag
      if (success) { pubStats.update(); }
   }
}

int main() {
    std::thread t_sub1(startPollingSubscriber);
    sleep(1); 
    std::thread t_sub2(startNonPollingSubscriber);
    sleep(1);
    std::thread t_pub(startFastPublisher); 
    while(true) {
       sleep(10);
    }
}

There are 2 answers below.

Answer 1:

Q : "Is there any benefit to polling in a thread that simply consumes data from one socket?"

Oh sure there is.

As a principal promoter of non-blocking designs, I always advocate designing zero-waiting .poll()-s before deciding on any .recv()-calls.
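
For illustration only, a minimal sketch of that idea with the plain C API: a zero-timeout poll in front of a non-blocking recv, so the thread never waits inside the receive call and stays free for other work between checks. The endpoint, buffer size and endless loop are placeholders of mine, not part of the question or this answer.

/*                  SKETCH: ZERO-WAIT POLL BEFORE A NON-BLOCKING RECV */
#include <zmq.h>

int main (void)
{
   void *ctx      = zmq_ctx_new ();
   void *receiver = zmq_socket (ctx, ZMQ_SUB);
   zmq_connect (receiver, "tcp://127.0.0.1:5602");     /* placeholder endpoint */
   zmq_setsockopt (receiver, ZMQ_SUBSCRIBE, "", 0);    /* subscribe to all     */

   zmq_pollitem_t items[] = { { receiver, 0, ZMQ_POLLIN, 0 } };
   char msg[256];

   while (1) {
      int rc = zmq_poll (items, 1, 0);                 /* timeout 0: never waits */
      if (rc > 0 && (items[0].revents & ZMQ_POLLIN)) {
         int size = zmq_recv (receiver, msg, sizeof (msg), ZMQ_DONTWAIT);
         if (size != -1) {
            /* do something with msg */
         }
      }
      else {
         /* nothing queued: do other, possibly higher-priority, work here */
      }
   }

   zmq_close (receiver);                               /* not reached in this sketch */
   zmq_ctx_term (ctx);
   return 0;
}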

Q : "Does polling result in more efficient CPU usage?"

A harder one, yet I love it:

This question is decidable in two distinct manners:

a) read the source-code of both the .poll()-method and the .recv()-method, as ported onto your target platform, and guesstimate the costs of calling each vs. the other.

b) test both use-cases inside your run-time ecosystem and have the hard facts micro-benchmarked in vivo.

Either way, you see the difference.

What you cannot see at the moment are the add-on costs and other impacts that appear once you try (or are forced) to extend the use-case so as to accommodate other properties not included in either the former or the latter.

Here, my principal preference to use .poll() before deciding further enables priority-based re-ordering of the actual .recv()-calls and other, higher-level decisions that neither the source-code nor the tests could ever make for you.

Do not hesitate to test first, and if the tests seem inconclusive (on your scale of { low | ultra-low }-latency sensitivity), dive deep into the source-code to see why.

Answer 2:

Typically you will get the best performance by draining the socket before doing another poll. In other words, once poll returns, you read until recv reports "no more data" (EAGAIN), at which point you go back and call poll again.

For an example, see https://github.com/nyfix/OZ/blob/4627b0364be80de4451bf1a80a26c00d0ba9310f/src/transport.c#L524

There are trade-offs with this approach (mentioned in the code comments), but if you're only reading from a single socket you can ignore these.
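
For a rough sketch of what that drain loop can look like with the plain C API (my illustration, not the code from the linked repository; the receiver socket is assumed to be already connected and subscribed):

/*                  SKETCH: DRAIN THE SOCKET AFTER EACH POLL */
#include <errno.h>
#include <zmq.h>

void drain_loop (void *receiver)                 /* assumed: connected SUB socket */
{
   zmq_pollitem_t items[] = { { receiver, 0, ZMQ_POLLIN, 0 } };
   char msg[256];

   while (1) {
      zmq_poll (items, 1, -1);                   /* block until readable */
      if (!(items[0].revents & ZMQ_POLLIN))
         continue;

      for (;;) {                                 /* drain until the queue is empty */
         int size = zmq_recv (receiver, msg, sizeof (msg), ZMQ_DONTWAIT);
         if (size == -1) {
            if (zmq_errno () == EAGAIN)          /* no more data: back to poll */
               break;
            break;                               /* other error: also back to poll */
         }
         /* do something with msg */
      }
   }
}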