I am trying to use the epgm
transport in my simple publisher-subscriber program, but I am unable to do so. From what I understand, I am unable to supply a correct address string in bind
and connect
statements.
The publisher and subscriber can be running on same or different machines.
Below is the required code which usees tcp
transport and works correctly. It uses cppzmq
: https://github.com/zeromq/cppzmq.
Publisher code:
#include <zmq.hpp>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>
int main () {
zmq::context_t context (1);
zmq::socket_t publisher (context, ZMQ_PUB);
publisher.bind("tcp://10.1.1.8:5000");
int i = 0;
while (1) {
int topic = 101;
zmq::message_t message(50);
snprintf ((char *) message.data(), 50, "%03d %10d %10d", topic, i, i);
//fprintf(stderr, "message: %s\n", (char *) message.data());
publisher.send(message);
++i;
}
return 0;
}
Subscriber code:
#include <zmq.hpp>
#include <iostream>
#include <sstream>
#include <unistd.h>
#include <cassert>
int main (int argc, char *argv[]) {
zmq::context_t context (1);
zmq::socket_t subscriber (context, ZMQ_SUB);
subscriber.connect("tcp://10.1.1.8:5000");
const char *filter = "101 ";
subscriber.setsockopt(ZMQ_SUBSCRIBE, filter, strlen (filter));
zmq::message_t tp;
int maxx = 0;
for (int i = 0; i < 1000; ++i) {
zmq::message_t update;
int topic, a, b;
if(subscriber.krecv(&update, ZMQ_DONTWAIT)) {
//fprintf(stderr, "size of data received: %zd\n", sizeof(update.data()));
std::istringstream iss(static_cast<char*>(update.data()));
iss >> topic >> a >> b;
assert(a == b);
}
else {
--i;
}
maxx = a > maxx ? a : maxx;
}
fprintf(stderr, "maxx = %d\n", maxx);
return 0;
}
krecv
method that is used in subscriber:
inline bool krecv (message_t *msg_, int flags_ = 0) {
int nbytes = zmq_msg_recv (&(msg_->msg), ptr, flags_);
if (nbytes >= 0)
return true;
if (zmq_errno () == EAGAIN)
return false;
return false;
}
I tried changing the bind
statement in publisher to following:
publisher.bind("epgm://10.1.1.8:5000");
publisher.bind("epgm://224.1.1.1:5000");
publisher.bind("epgm://eth0;224.1.1.1:5000");
publisher.bind("epgm://10.1.1.8;224.1.1.1:5000");
publisher.bind("epgm://localhost:5000");
For all 5 cases, the program crashes with Assertion failed: false (src/pgm_socket.cpp:165)
. For the 5th case (epgm://localhost:5000
), I also receive following warnings along with the crash:
Warn: Interface lo reports as a loopback device.
Warn: Interface lo reports as a non-multicast capable device.
How can I resolve this issue? I am guessing that the address change will be same in both publisher and subscriber?
I am using libpgm 5.2.122
with zeromq-4.1.3
.
Note that the machine has following interfaces:
eth0
(Ethernet) -- inet addr:10.1.1.8
ib0
(InfiniBand) -- inet addr:10.1.3.8
lo
(Local Loopback) -- inet addr:127.0.0.1
Try a
239.0.0.0/8
IP in your bind:publisher.bind("epgm://;239.0.0.1:5000");
Wikipedia: