ZeroMQ pub-sub socket: error receiving a serialized Cap'n Proto message object (C++)

I am using ZeroMQ and Cap'n Proto to send serialized messages between two processes, but I am getting the following error when I try to receive the message on the receiving end:

terminate called after throwing an instance of 'kj::ExceptionImpl'
  what():  capnp/message.c++:99: failed: expected segment != nullptr && segment->checkObject(segment->getStartPtr(), ONE * WORDS); Message did not contain a root pointer.
stack: 7efc84cd8dd4 558b95e72030 558b95e71488 7efc848ebd09 558b95e71259
Aborted 

I have checked that the message and message builder are properly initialized and that the serialization and ZeroMQ message creation steps are being executed successfully. I am not sure what could be causing the "no root pointer" error on the receiving end. Can anyone suggest what might be causing this error and how I can fix it?

Note (in my code): The message.capnp.h file is the generated code from my Cap'n Proto schema file, which defines the Message struct with the preparation and heading fields.

My code for Receiver.cpp

#include "../schema/message.capnp.h"

#include <capnp/message.h>
#include <capnp/serialize.h>
#include <kj/std/iostream.h>
#include <zmq.hpp>

int main() {
  
  zmq::context_t context(1);
  zmq::socket_t socket(context, ZMQ_SUB);

  socket.connect("tcp://localhost:5555"); //or *
  socket.set(zmq::sockopt::subscribe, "");

  zmq::message_t zmq_message;
  socket.recv(zmq_message);

  // create a memory buffer from the received message
  kj::ArrayPtr<capnp::word> buffer(reinterpret_cast<capnp::word*>(zmq_message.data()), zmq_message.size() / sizeof(capnp::word));

  // create a message reader over the memory buffer
  capnp::FlatArrayMessageReader message_reader(buffer);
  Message::Reader message = message_reader.getRoot<Message>();

  // print the preparation and heading fields
  std::cout << "Preparation: " << message.getPreparation().cStr() << std::endl;
  std::cout << "Heading: " << message.getHeading().cStr() << std::endl;

  return 0;
}

My code for Sender.cpp


#include "../schema/message.capnp.h"

#include <capnp/message.h>
#include <capnp/serialize.h>
#include <kj/std/iostream.h>
#include <zmq.hpp>

#include <cstring> // memcpy

int main() {
  
  // create a message builder
  capnp::MallocMessageBuilder message;
  Message::Builder messageBuilder = message.initRoot<Message>();

  // set the preparation and heading fields
  messageBuilder.setPreparation("prep");
  messageBuilder.setHeading("heading");

  zmq::context_t context(1);
  zmq::socket_t socket(context, ZMQ_PUB);

  socket.bind("tcp://*:5555");

  // serialize the message to a memory buffer
  kj::Array<capnp::word> serialized_message = capnp::messageToFlatArray(message); 

  // create a ZeroMQ message from the serialized buffer
  zmq::message_t zmq_message(serialized_message.size() * sizeof(capnp::word));
  memcpy(zmq_message.data(), serialized_message.begin(), serialized_message.size() * sizeof(capnp::word));

  while(true)
  {
    socket.send(zmq_message);
  }

  return 0;
}

My Cap'n Proto schema file, Message.capnp

@0xbf5147cbbecf40c1;

struct Message {
  preparation @0 :Text;
  heading @1 :Text;
}
There is 1 best solution below

In the two places in your sender code where the multiplication is commented out as /** sizeof(capnp::word)*/, you should uncomment it so that it reads * sizeof(capnp::word).

This multiplication is needed because the array's size is counted in words, but ZeroMQ wants a size in bytes. Without it, the message is truncated to 1/8 of its actual size, which probably leaves fewer than 8 bytes in total. On the receiving end, since you (correctly) divide the byte count by the size of a word, you then end up with a zero-length array. Cap'n Proto complains because that array contains no data, hence "Message did not contain a root pointer."
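
The arithmetic above can be checked without ZeroMQ or Cap'n Proto. The following is only a minimal sketch: Word is a stand-in for capnp::word (assumed to be 8 bytes), and toWire and wordsSeenByReceiver are hypothetical helpers that mimic the sender's memcpy into the ZeroMQ message and the receiver's division by the word size. They are not part of either library.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

// Stand-in for capnp::word (assumption: one word is 8 bytes).
using Word = std::uint64_t;

// Hypothetical helper: copy the first `byteCount` bytes of `words` into a
// byte buffer, the way the sender copies into a ZeroMQ message of that size.
std::vector<unsigned char> toWire(const std::vector<Word>& words,
                                  std::size_t byteCount) {
  // The copy must never read past the end of the source array.
  assert(byteCount <= words.size() * sizeof(Word));
  std::vector<unsigned char> wire(byteCount);
  if (byteCount > 0) {
    std::memcpy(wire.data(), words.data(), byteCount);
  }
  return wire;
}

// Hypothetical helper: number of whole words the receiver obtains after
// dividing the received byte count by the word size.
std::size_t wordsSeenByReceiver(const std::vector<unsigned char>& wire) {
  return wire.size() / sizeof(Word);
}
```

For a hypothetical 6-word message, passing words.size() * sizeof(Word) as the byte count puts 48 bytes on the wire and the receiver recovers all 6 words. Passing words.size() alone puts only 6 bytes on the wire, and the integer division on the receiving side yields 0 words, which matches the empty array described above.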