ZeroMQ: Using EPGM transport


I am trying to use the epgm transport in my simple publisher-subscriber program, but I cannot get it to work. As far as I can tell, the problem is that I am not supplying a correct address string in the bind and connect statements.

The publisher and subscriber can be running on the same machine or on different machines.

Below is the relevant code, which uses the tcp transport and works correctly. It uses cppzmq: https://github.com/zeromq/cppzmq.

Publisher code:

#include <zmq.hpp>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>

int main () {
  zmq::context_t context (1);
  zmq::socket_t publisher (context, ZMQ_PUB);
  publisher.bind("tcp://10.1.1.8:5000");

  int i = 0;
  while (1) {
    int topic = 101;

    zmq::message_t message(50);
    snprintf ((char *) message.data(), 50, "%03d %10d %10d", topic, i, i);
    //fprintf(stderr, "message: %s\n", (char *) message.data());

    publisher.send(message);
    ++i;
  }
  return 0;
}

Subscriber code:

#include <zmq.hpp>
#include <iostream>
#include <sstream>
#include <unistd.h>
#include <cassert>
#include <cstdio>   // fprintf
#include <cstring>  // strlen

int main (int argc, char *argv[]) {
  zmq::context_t context (1);

  zmq::socket_t subscriber (context, ZMQ_SUB);
  subscriber.connect("tcp://10.1.1.8:5000");

  const char *filter = "101 ";
  subscriber.setsockopt(ZMQ_SUBSCRIBE, filter, strlen (filter));

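  int maxx = 0;
  for (int i = 0; i < 1000; ++i) {
    zmq::message_t update;
    int topic = 0, a = 0, b = 0;
    if(subscriber.krecv(&update, ZMQ_DONTWAIT)) {
      //fprintf(stderr, "size of data received: %zu\n", update.size());
      // The publisher null-terminates the payload, so it can be parsed as a C string.
      std::istringstream iss(static_cast<char*>(update.data()));
      iss >> topic >> a >> b;
      assert(a == b);
      // Update the maximum only when a message was actually received.
      maxx = a > maxx ? a : maxx;
    }
    else {
      // Nothing available yet: retry without counting this iteration.
      --i;
    }
  }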

  fprintf(stderr, "maxx = %d\n", maxx);
  return 0;
}

The krecv method used in the subscriber:

inline bool krecv (message_t *msg_, int flags_ = 0) {
  int nbytes = zmq_msg_recv (&(msg_->msg), ptr, flags_);
  if (nbytes >= 0)
    return true;
  if (zmq_errno () == EAGAIN)
    return false;
  return false;
}

I tried changing the bind statement in the publisher to each of the following:

  1. publisher.bind("epgm://10.1.1.8:5000");
  2. publisher.bind("epgm://224.1.1.1:5000");
  3. publisher.bind("epgm://eth0;224.1.1.1:5000");
  4. publisher.bind("epgm://10.1.1.8;224.1.1.1:5000");
  5. publisher.bind("epgm://localhost:5000");

In all 5 cases, the program crashes with Assertion failed: false (src/pgm_socket.cpp:165). In the 5th case (epgm://localhost:5000), I also receive the following warnings along with the crash:

Warn: Interface lo reports as a loopback device.
Warn: Interface lo reports as a non-multicast capable device.

How can I resolve this issue? I am guessing that the address change will be the same in both the publisher and the subscriber?

I am using libpgm 5.2.122 with zeromq-4.1.3.

Note that the machine has the following interfaces:

  1. eth0 (Ethernet) -- inet addr:10.1.1.8
  2. ib0 (InfiniBand) -- inet addr:10.1.3.8
  3. lo (Local Loopback) -- inet addr:127.0.0.1

There are 2 best solutions below

Answer 1:

Try a 239.0.0.0/8 IP in your bind:

publisher.bind("epgm://;239.0.0.1:5000");
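As a rough illustration of how such an address string is put together: the zmq_pgm documentation describes the endpoint as an interface, a semicolon, a multicast group, a colon, and a port. The sketch below builds a string of that shape and rejects groups that are not IPv4 multicast addresses (224.0.0.0/4). The helper names is_ipv4_multicast and make_epgm_endpoint are hypothetical and are not part of ZeroMQ or cppzmq; the code assumes a POSIX system.

```cpp
#include <arpa/inet.h>   // inet_pton, ntohl
#include <netinet/in.h>  // in_addr
#include <cstdint>
#include <stdexcept>
#include <string>

// Hypothetical helper: true if `addr` parses as an IPv4 address
// inside the multicast block 224.0.0.0/4 (top four bits are 1110).
bool is_ipv4_multicast(const std::string& addr) {
    in_addr parsed{};
    if (inet_pton(AF_INET, addr.c_str(), &parsed) != 1)
        return false;  // not a dotted-quad IPv4 address
    return (ntohl(parsed.s_addr) >> 28) == 0xE;
}

// Hypothetical helper: builds "epgm://<iface>;<group>:<port>".
// `iface` may be an interface name, an interface address, or empty
// (as in the string suggested in this answer).
std::string make_epgm_endpoint(const std::string& iface,
                               const std::string& group,
                               std::uint16_t port) {
    if (!is_ipv4_multicast(group))
        throw std::invalid_argument("not an IPv4 multicast group: " + group);
    return "epgm://" + iface + ";" + group + ":" + std::to_string(port);
}
```

With these assumptions, make_epgm_endpoint("eth0", "239.0.0.1", 5000) yields "epgm://eth0;239.0.0.1:5000", while passing "10.1.1.8" as the group throws, since that is a unicast address. The same string would be used on both the publisher and the subscriber side; the examples in the zmq_pgm man page pass it to zmq_connect.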

Wikipedia:

The 239.0.0.0/8 range is assigned by RFC 2365 for private use within an organization. From the RFC, packets destined to administratively scoped IPv4 multicast addresses do not cross administratively defined organizational boundaries, and administratively scoped IPv4 multicast addresses are locally assigned and do not have to be globally unique.
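To make the quoted range concrete, here is a minimal sketch that tests whether an IPv4 address falls inside 239.0.0.0/8, i.e. whether its first octet is 239. The function name is_admin_scoped_multicast is made up for this illustration, and the code assumes a POSIX system for inet_pton.

```cpp
#include <arpa/inet.h>   // inet_pton, ntohl
#include <netinet/in.h>  // in_addr
#include <string>

// Hypothetical helper: true if `addr` is an IPv4 address in 239.0.0.0/8,
// the administratively scoped multicast range described by RFC 2365.
bool is_admin_scoped_multicast(const std::string& addr) {
    in_addr parsed{};
    if (inet_pton(AF_INET, addr.c_str(), &parsed) != 1)
        return false;  // not a dotted-quad IPv4 address
    // After conversion to host byte order, the first octet is the top byte.
    return (ntohl(parsed.s_addr) >> 24) == 239;
}
```

By this check, 239.0.0.1 is inside the range, while the 224.1.1.1 group from the question is a multicast address outside it.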

Answer 2:

I have used epgm with zeromq on Linux, and it's tricky to configure correctly.

Assuming you are using Linux, read on; if not, disregard this, as I have no experience with Windows:

  1. epgm does not work with the loopback adapter on Linux, so forget that.
  2. Your eth0 should work. Is multicast definitely enabled on it (check the MULTICAST flag in the ifconfig output)?
  3. Port usage: is the port already in use?
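The first two checks in the list above can also be done from code instead of reading ifconfig output. The following is only a sketch, assuming Linux (or another system providing getifaddrs): it collects the UP, LOOPBACK and MULTICAST flags for each interface. The struct IfaceFlags and the function list_interface_flags are names invented for this example.

```cpp
#include <ifaddrs.h>     // getifaddrs, freeifaddrs
#include <net/if.h>      // IFF_UP, IFF_LOOPBACK, IFF_MULTICAST
#include <sys/socket.h>
#include <map>
#include <string>

// Hypothetical record of the flags relevant to multicast transports.
struct IfaceFlags {
    bool up;
    bool loopback;
    bool multicast;
};

// Hypothetical helper: maps each interface name to its flags.
// Returns an empty map if getifaddrs fails.
std::map<std::string, IfaceFlags> list_interface_flags() {
    std::map<std::string, IfaceFlags> result;
    ifaddrs* head = nullptr;
    if (getifaddrs(&head) != 0)
        return result;
    // An interface can appear once per address family; the flags are the
    // same for each entry, so overwriting the map slot is harmless.
    for (ifaddrs* it = head; it != nullptr; it = it->ifa_next) {
        if (it->ifa_name == nullptr)
            continue;
        const unsigned int f = it->ifa_flags;
        result[it->ifa_name] = IfaceFlags{(f & IFF_UP) != 0,
                                          (f & IFF_LOOPBACK) != 0,
                                          (f & IFF_MULTICAST) != 0};
    }
    freeifaddrs(head);
    return result;
}
```

An interface is a plausible candidate for epgm when its entry has up and multicast set and loopback clear; for the machine in the question one would look at the entry for eth0. This does not cover the third check (whether the port is already in use).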

I link zeromq with openpgm, and there are some rather subtle differences in the way port reuse works between different Linux kernels.

I added some code to the openpgm repo to fix my issues with RHEL 7: https://github.com/steve-o/openpgm/pull/52

James