My scenario is as follows: I'm using zmqpp, a high-level C++ binding for 0MQ.
I have a publisher socket (XPUB), to which multiple subscribers (XSUB) connect.
The subscription message starts a cascade of events in my application (that's why I use XPUB). I want to pass some extra information as part of the subscription message, but I don't want it to be part of the subscription topic, so I put it in the second frame of the message (it can't be sent as a separate message because of race conditions). Sadly (or not...), the moment the XPUB socket sees the subscription byte (0x01), it seems to assume that the message can only have one frame and splits it (at least I think this is the reason for the behavior). So the second frame arrives as a separate message.
I thought of reading both of these messages and restoring the original 2-frame message, either by simply reading them one by one or by using the socket's receive_raw() function. My only fear is that, for some reason, the two frames of the original message could be interleaved with messages from other clients working concurrently. I use a poll-in poller with zmqpp::reactor on the XPUB socket to receive the messages and forward them to the internal components, all in a single thread.
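The "read them one by one" idea could look roughly like the sketch below. It assumes, without any confirmation, that the extra-data frame always arrives directly after its subscription frame, which is exactly the point in doubt. `SubscriptionReassembler` is a hypothetical class made up for illustration, not something zmqpp provides, and it works on plain strings so that it does not depend on any socket API.

```cpp
#include <cassert>
#include <string>
#include <utility>

// Hypothetical helper, not part of zmqpp: pairs a subscription frame
// with the single frame that is received right after it.
class SubscriptionReassembler {
public:
    // Feed one single-frame message as read from the XPUB socket.
    // Returns true when a {subscription frame, extra data} pair is complete
    // and has been stored in `out`.
    bool feed(const std::string& frame, std::pair<std::string, std::string>& out)
    {
        if (!has_pending_) {
            // Only a frame starting with the subscribe byte 0x01 opens a pair;
            // anything else is ignored by this sketch.
            if (!frame.empty() && frame[0] == '\x01') {
                pending_ = frame;
                has_pending_ = true;
            }
            return false;
        }
        // ASSUMPTION: the next frame belongs to the pending subscription.
        out = std::make_pair(pending_, frame);
        has_pending_ = false;
        return true;
    }

private:
    std::string pending_;
    bool has_pending_ = false;
};

// Small usage example.
void reassembler_example()
{
    SubscriptionReassembler reassembler;
    std::pair<std::string, std::string> pair;
    reassembler.feed(std::string("\x01subs", 5), pair);  // opens a pair
    const bool complete = reassembler.feed("data", pair); // closes it
    assert(complete && pair.second == "data");
}
```

If frames from different clients can in fact interleave, this pairing would attach the wrong data to a subscription, so the sketch is only valid under the stated assumption.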
So, my question is: are my fears justified, or are they baseless?
This is the code that causes the behavior:
#include <zmqpp/zmqpp.hpp>
#include <cstring>
#include <iostream>

int main(int argc, char **argv)
{
    using namespace zmqpp;
    using namespace std;

    // First frame: subscription byte (0x01) followed by the topic.
    char frame1[] = {1, 's', 'u', 'b', 's', '\0'};
    // Second frame: the extra information.
    char frame2[] = {'d','a','t','a', '\0'};

    context_t context;
    socket_t xpubSocket(context, socket_type::xpub);
    xpubSocket.bind("inproc://pub");
    socket_t xsubSocket(context, socket_type::xsub);
    xsubSocket.connect("inproc://pub");

    // Two-frame message whose first frame starts with the subscription byte.
    xsubSocket.send_raw(frame1, strlen(frame1), socket_t::send_more);
    xsubSocket.send_raw(frame2, strlen(frame2));

    // I changed the first byte to be non-subscription
    char frame1_1[] = {3, 's', 'u', 'b', 's', '\0'};
    xsubSocket.send_raw(frame1_1, strlen(frame1_1), socket_t::send_more);
    xsubSocket.send_raw(frame2, strlen(frame2));

    // Read everything that is available without blocking and print each frame.
    bool doContinue = true;
    while(doContinue)
    {
        message_t incoming;
        doContinue = xpubSocket.receive(incoming, true);
        if(!doContinue)
        {
            break;
        }
        cout << endl << "New Message: " << endl;
        for(size_t i = 0 ; i < incoming.parts() ; ++i)
        {
            cout << "Frame: " << i << "\t" << incoming.get(i) << endl;
        }
    }
}
The output is:
New Message:
Frame: 0 subs
New Message:
Frame: 0 data
New Message:
Frame: 0 subs
Frame: 1 data
So, it looks like whenever the XPUB socket sees a subscription byte, it assumes the message has only one frame.
I'm running this on Ubuntu; here are the package versions:
dpkg -l | grep zmq
ii libzmq3:i386 4.0.4+dfsg-2 i386 lightweight messaging kernel (shared library)
ii libzmq3-dev:i386 4.0.4+dfsg-2 i386 lightweight messaging kernel (development files)
ii libzmqpp-dev:i386 3.2.0-0ubuntu3 i386 High-level C++ bindings for zeromq3 - development files
ii libzmqpp3:i386 3.2.0-0ubuntu3 i386 High-level C++ bindings for zeromq3