ZeroMQ: Is frames/messages order preserved when receiving using receive_raw from multiple clients?

1.1k Views Asked by At

My scenario is as follows: I'm using a high-level C++ binding for 0MQ zmqpp.

I have a publisher socket (XPUB), to which multiple subscribers connect(XSUB). The subscription message starts a cascade of events in my application (that's why I use XPUB). I want to pass some extra information as a part of the subscription message, though I don't want it to be the part of subscription topic, so I put this extra information in the second frame of the message (it can't be sent as a separate message for race-condition reasons). Sadly (or not...) the minute the XPUB socket sees the subscription byte (0x01), it takes for granted that this message can only have one frame and splits it (at least I think this is the reason for such behavior). So the second frame comes as another message.

I though of reading both of these messages and restoring the original 2-frame message either by simply reading them one by one, or by using the receive_raw() function of the socket. My only fear is, that for some reason, the two frames of the original message shall be interleaved with messages from the other clients, working concurrently. I use a poll-in poller with zmqpp::reactor on the XPUB socket to receive the messages and forward them to the internal components, single threaded.

So, my questions is: do my fears have some basis or they are baseless?

This is the code that causes the behavior:

#include <zmqpp/zmqpp.hpp>
#include <iostream>

int main(int argc, char **argv)
{
    using namespace zmqpp;
    using namespace std;

    char frame1[] = {1, 's', 'u', 'b', 's', '\0'};
    char frame2[] = {'d','a','t','a', '\0'};

    context_t context;

    socket_t xpubSocket(context, socket_type::xpub);
    xpubSocket.bind("inproc://pub");

    socket_t xsubSocket(context, socket_type::xsub);
    xsubSocket.connect("inproc://pub");

    xsubSocket.send_raw(frame1, strlen(frame1), socket_t::send_more);
    xsubSocket.send_raw(frame2, strlen(frame2));

    //I changed the first byte to be non-subscription
    char frame1_1[] = {3, 's', 'u', 'b', 's', '\0'};    
    xsubSocket.send_raw(frame1_1, strlen(frame1), socket_t::send_more);
    xsubSocket.send_raw(frame2, strlen(frame2));

    bool doContinue = true;
    while(doContinue)
    {
        message_t incoming;
        doContinue = xpubSocket.receive(incoming, true);

        if(!doContinue)
        {
            break;
        }
        cout << endl <<"New Message: " << endl;

        for(size_t i = 0 ; i < incoming.parts() ; ++i)
        {
            cout << "Frame: " << i << "\t" << incoming.get(i) << endl;
        }
    }
}

The output is:

New Message: 
Frame: 0    subs

New Message: 
Frame: 0    data

New Message: 
Frame: 0    subs
Frame: 1    data

So, it looks like whenever the XPUB socket sees a subscription byte on, it assumes the message has only one frame.

I'm running this on ubuntu, here are package versions:

dpkg -l | grep zmq
ii  libzmq3:i386                                          4.0.4+dfsg-2                                        i386         lightweight messaging kernel (shared library)
ii  libzmq3-dev:i386                                      4.0.4+dfsg-2                                        i386         lightweight messaging kernel (development files)
ii  libzmqpp-dev:i386                                     3.2.0-0ubuntu3                                      i386         High-level C++ bindings for zeromq3 - development files
ii  libzmqpp3:i386                                        3.2.0-0ubuntu3                                      i386         High-level C++ bindings for zeromq3
0

There are 0 best solutions below