I made a PUB/SUB connection using zmqpp and now I want to send data from the publisher to the subscribers using the header-only, C++11 version of msgpack-c.
The publisher has to send two int64_t numbers, header_1 and header_2, followed by a std::vector<T>, data, where T is determined by the (header_1, header_2) combination.
Since there aren't many examples of how to combine msgpack and zmqpp, the idea I came up with is to send a 3-part message using zmqpp::message::add/add_raw. Each part would be packed/unpacked using msgpack.
The publisher packs a single data part as follows:
zmqpp::message msg;
int64_t header_1 = 1234567;
msgpack::sbuffer buffer;
msgpack::pack(buffer, header_1);
msg.add(buffer.data(), buffer.size());
And the receiver unpacks it like this:
zmqpp::message msg;
subscriberSock.receive(msg);
int64_t header_1;
msgpack::unpacked unpackedData;
// crash !
msgpack::unpack(unpackedData,
                static_cast<const char*>(msg.raw_data(0)),
                msg.size(0));
unpackedData.get().convert(&header_1);
When I run the code, I get the following error on the subscriber side:
terminate called after throwing an instance of 'msgpack::v1::insufficient_bytes'
what(): insufficient bytes
Aborted
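To make the error more concrete, here is a minimal sketch of what "insufficient bytes" means for a value like 1234567. It assumes msgpack-c stores this value in the MessagePack "uint 32" format (marker 0xce plus 4 big-endian bytes), which is my understanding of how it picks the smallest fitting integer format. The helpers encode_uint32 and decode_uint32 are hand-written for illustration only and are not msgpack-c functions.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <vector>

// Hand-rolled encoder for the MessagePack "uint 32" format only:
// marker byte 0xce followed by the value as 4 big-endian bytes.
inline std::vector<unsigned char> encode_uint32(std::uint32_t v)
{
    return { 0xce,
             static_cast<unsigned char>(v >> 24),
             static_cast<unsigned char>(v >> 16),
             static_cast<unsigned char>(v >> 8),
             static_cast<unsigned char>(v) };
}

// Toy decoder for the same format. It throws when the buffer is shorter
// than the 5 bytes the marker promises, loosely mimicking the situation
// that an "insufficient bytes" error reports.
inline std::uint32_t decode_uint32(const unsigned char* p, std::size_t len)
{
    assert(p != nullptr);
    if (len == 0 || p[0] != 0xce)
        throw std::invalid_argument("not a uint 32");
    if (len < 5)
        throw std::length_error("insufficient bytes");
    return (static_cast<std::uint32_t>(p[1]) << 24) |
           (static_cast<std::uint32_t>(p[2]) << 16) |
           (static_cast<std::uint32_t>(p[3]) << 8)  |
            static_cast<std::uint32_t>(p[4]);
}
```

With these helpers, encode_uint32(1234567) yields the 5 bytes ce 00 12 d6 87, and decoding only a prefix of them throws. So if the first message part arrives with fewer bytes than were packed, the unpacker has a marker but not the payload it announces. Note that the second byte is 0x00, so anything that treated the buffer as a NUL-terminated string would keep only the first byte; whether that is what happens here is a guess on my part.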
Also, it seems that zmqpp has generated a 5-part message, even though I called add() only 3 times.
Q1: Am I packing/unpacking the data correctly?
Q2: Is this the proper method for sending msgpack buffers using zmqpp?
Here are the important parts of the code:
Publisher
zmqpp::socket publisherSock;
/* connection setup stuff ...*/
// forever send data to the subscribers
while(true)
{
    zmqpp::message msg;
    // meta info about the data
    int64_t header_1 = 1234567;
    int64_t header_2 = 89;
    // sample data
    std::vector<double> data;
    data.push_back(1.2);
    data.push_back(3.4);
    data.push_back(5.6);
    {
        msgpack::sbuffer buffer;
        msgpack::pack(buffer, header_1);
        msg.add(buffer.data(), buffer.size());
        cout << "header_1:" << header_1 << endl; // header_1:1234567
    }
    {
        msgpack::sbuffer buffer;
        msgpack::pack(buffer, header_2);
        msg.add(buffer.data(), buffer.size());
        cout << "header_2:" << header_2 << endl; // header_2:89
    }
    {
        msgpack::sbuffer buffer;
        msgpack::pack(buffer, data);
        msg.add_raw(buffer.data(), buffer.size());
        std::cout << "data: " << data << std::endl; // data:[1.2 3.4 5.6]
    }
    std::cout << msg.parts() << " parts" << std::endl; // prints "5 parts"... why ?
    publisherSock.send(msg);
    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
Subscriber
zmqpp::socket subscriberSock;
/* connection setup stuff ...*/
zmqpp::message msg;
subscriberSock.receive(msg);
int64_t header_1;
int64_t header_2;
std::vector<double> data;
std::cout << msg.parts() << " parts" << std::endl; // prints "5 parts"
{
    // header 1
    {
        msgpack::unpacked unpackedData;
        // crash !
        msgpack::unpack(unpackedData,
                        static_cast<const char*>(msg.raw_data(0)),
                        msg.size(0));
        unpackedData.get().convert(&header_1);
        cout << "header_1:" << header_1 << endl;
    }
    // header 2
    {
        msgpack::unpacked unpackedData;
        msgpack::unpack(unpackedData,
                        static_cast<const char*>(msg.raw_data(1)),
                        msg.size(1));
        unpackedData.get().convert(&header_2);
        cout << "header_2:" << header_2 << endl;
    }
    // data
    {
        msgpack::unpacked unpacked_data;
        msgpack::unpack(unpacked_data,
                        static_cast<const char*>(msg.raw_data(2)),
                        msg.size(2));
        unpacked_data.get().convert(&data);
        std::cout << "data:" << data << std::endl;
    }
}
EDIT: Problem solved. As pointed out by @Jens, the correct way of packing/sending the data is to use zmqpp::message::add_raw():
zmqpp::message msg;
int64_t header_1 = 1234567;
msgpack::sbuffer buffer;
msgpack::pack(buffer, header_1);
msg.add_raw(buffer.data(), buffer.size());
I think the calls to msg.add(buffer.data(), buffer.size()) do not add an array of buffer.size() bytes, but call message::add(Type const& part, Args &&...args), which does two things:
1. msg << buffer.data(), which probably calls message::operator<<(bool), since a pointer converts to bool
2. add(buffer.size()), which then calls msg << buffer.size(), which adds a size_t value as the next part.
Looking at the zmqpp::message class, using message::add_raw should do the trick.
PS: This is all without any guarantee because I have never used zmqpp or msgpack.
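The part-count arithmetic can be reproduced without zmqpp. The sketch below uses ToyMessage, a hypothetical stand-in that only imitates the shape of a variadic add() next to an add_raw(pointer, size); it is not zmqpp's real class and says nothing about how zmqpp encodes each argument. It only shows why a variadic add(pointer, size) produces two parts where add_raw produces one.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a multipart message; NOT zmqpp's real class.
class ToyMessage {
public:
    // One call, one part: copies exactly `size` bytes.
    void add_raw(const void* data, std::size_t size)
    {
        const char* bytes = static_cast<const char*>(data);
        parts_.emplace_back(bytes, size);
    }

    // Nothing left to add: ends the variadic recursion below.
    void add() {}

    // Variadic add: every argument becomes its own part.
    template <typename T, typename... Args>
    void add(const T& part, Args&&... args)
    {
        append(part);
        add(std::forward<Args>(args)...);
    }

    std::size_t parts() const { return parts_.size(); }

private:
    // The toy only records that a part was added, not a real encoding.
    template <typename T>
    void append(const T&) { parts_.emplace_back("<one argument>"); }

    std::vector<std::string> parts_;
};

// Mirrors the call pattern in the question: add() twice, add_raw() once.
inline std::size_t parts_like_the_question()
{
    const char buffer[5] = {0};          // placeholder for a packed value
    ToyMessage msg;
    msg.add(buffer, sizeof buffer);      // 2 parts: pointer argument + size argument
    msg.add(buffer, sizeof buffer);      // 2 more parts
    msg.add_raw(buffer, sizeof buffer);  // 1 part holding the 5 bytes
    assert(msg.parts() == 5);
    return msg.parts();
}
```

Under these assumptions, two add(pointer, size) calls plus one add_raw call give 2 + 2 + 1 = 5 parts, which matches the "5 parts" printed by the publisher; switching the two header calls to add_raw would give 3.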