I've created a template class for IPC using Message Queue.
I'm running my program in an infinite while loop (called the main loop).
I collect data from various sub-systems (sensors) over Ethernet and pass the received data to the appropriate processes using message queues (there are multiple different processes that can act as data sinks, each with its own message queue).
I've just run the program without performing any activity. This is the only program running, and I reboot the OS before every run.
The program is just running in the while loop with all flags set to false; hence it is just running a blank (empty) loop.
Randomly, I'm getting boost::interprocess_exception::library_error. As there is no activity, I expected that there should be no error.
I commented out the Ethernet related code but still I'm getting the same error.
I'm getting the error in this statement:
// Excerpt from Ipc::listen(): each node receives on the queue it reads from.
if (primaryNode == true)
{
// Primary node: read from the secondary-to-primary queue.
this->mSecondaryToPrimaryMessageQueue->receive(
&receiveData,
sizeof(receiveData),
receiveLength,
priority
);
}
else
{
// Secondary node: read from the primary-to-secondary queue.
this->mPrimaryToSecondaryMessageQueue->receive(
&receiveData,
sizeof(receiveData),
receiveLength,
priority
);
}
I tried with primaryNode set to both true and false; I get the same error either way.
Code :
ipc.hpp
#pragma once
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <functional>
#include <iostream>
#include <memory>
#include <optional>
#include <string>
#include <thread>
#include <type_traits>
#include <variant>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/lockfree/spsc_queue.hpp>
/// @brief Bidirectional inter-process channel built on two Boost.Interprocess message queues.
///
/// open() opens-or-creates a "<name>_out" (primary -> secondary) queue and a "<name>_in"
/// (secondary -> primary) queue and starts a listener thread. The listener copies every
/// received message into a local lock-free single-producer/single-consumer buffer and then
/// invokes the registered callback; receive() pops from that local buffer.
///
/// NOTE(review): no destructor is declared. Destroying an Ipc whose listener thread is still
/// joinable ends in std::terminate (std::thread semantics) - confirm the intended lifetime.
///
/// @tparam T1 Specifies the data-type that is sent by this node
/// @tparam T2 Specifies the data-type that is received by this node
/// @tparam primaryNode Denotes whether this node is the primary node; it selects which of the
///         two queues is used for sending and which one for receiving
template<typename T1, typename T2, bool primaryNode>
class Ipc
{
private:
    /// Number of messages each message queue, and the local receive buffer, can hold.
    /// Declared constexpr (implicitly inline since C++17) so that odr-uses - for example
    /// forwarding it by reference through std::make_unique - need no out-of-class definition.
    static constexpr std::uint8_t MAX_MESSAGE_DEPTH = 5;

    using callback_t = std::function<void(void)>;

    /// Invoked by the listener thread after a message has been received.
    callback_t mCallback;

    /// Queue carrying messages from the primary node to the secondary node.
    std::unique_ptr<boost::interprocess::message_queue> mPrimaryToSecondaryMessageQueue;
    /// Queue carrying messages from the secondary node to the primary node.
    std::unique_ptr<boost::interprocess::message_queue> mSecondaryToPrimaryMessageQueue;

    /// System-wide names of the two queues; filled in by open().
    std::string mPrimaryToSecondaryMessageQueueName;
    std::string mSecondaryToPrimaryMessageQueueName;

    /// Thread running listen(); started by open().
    std::thread mReceiveThread;
    /// Set by close() to ask the listener loop to stop.
    std::atomic_bool mExitReceiveThread{ false };

    /// Local buffer between the listener thread (producer) and receive() (consumer).
    boost::lockfree::spsc_queue<T2, boost::lockfree::capacity<MAX_MESSAGE_DEPTH>> mReceiveDataQueue;

    /// Listener-thread body: receives messages and hands them to the local buffer.
    void listen(void);

public:
    Ipc() {}

    /// Opens (or creates) both queues derived from @p queueName and starts the listener thread.
    /// @return true on success, false if an exception was thrown while opening.
    bool open(const std::string& queueName);

    /// Requests the listener thread to stop and removes both queues from the system.
    bool close(void);

    /// Sends @p data on this node's outgoing queue with the given @p priority.
    /// @return true on success, false if sending threw an exception.
    bool send(const T1& data, std::uint32_t priority = 10);

    /// Pops the oldest buffered message, or returns std::nullopt when the buffer is empty.
    std::optional<T2> receive(void);

    /// Stores the callback the listener thread invokes after each received message.
    bool register_callback(callback_t callback_implementation);

    /// @return true when at least one received message is waiting in the local buffer.
    bool isDataAvailableInReceiveDataQueue(void) const;
};
/// @brief Listener-thread body: receives messages from this node's inbound queue until the
///        exit flag is set.
///
/// Every received message is pushed into the local receive buffer and the registered
/// callback (if any) is invoked afterwards. Exceptions are logged and the loop continues.
///
/// NOTE: message_queue::receive() blocks while the queue is empty, so a change of the exit
/// flag is only noticed after the next message arrives or after receive() throws.
template<typename T1, typename T2, bool primaryNode>
inline void Ipc<T1, T2, primaryNode>::listen(void)
{
    // receive() writes raw bytes into the buffer, so T2 has to be byte-copyable.
    static_assert(std::is_trivially_copyable<T2>::value,
                  "Ipc: the received type T2 must be trivially copyable");

    // receive() takes `size_type&` and `unsigned int&` out-parameters. Using exactly those
    // types (rather than std::uint64_t / std::uint32_t) keeps the call well-formed on
    // platforms where the fixed-width aliases name different underlying types.
    using size_type = boost::interprocess::message_queue::size_type;

    while (this->mExitReceiveThread.load() == false)
    {
        try
        {
            T2 receiveData{};            // Zero-initialised buffer for the incoming message
            size_type receiveLength = 0; // Number of bytes actually received
            unsigned int priority = 0;   // Priority of the received message

            // The primary node reads what the secondary node wrote, and vice versa.
            auto& inbound = primaryNode ? this->mSecondaryToPrimaryMessageQueue
                                        : this->mPrimaryToSecondaryMessageQueue;
            inbound->receive(&receiveData, sizeof(receiveData), receiveLength, priority);

            // push() returns false when the local buffer is full; report the lost message
            // instead of dropping it silently.
            if (this->mReceiveDataQueue.push(receiveData) == false)
            {
                std::cout << "Receive buffer full, message dropped\n";
            }

            // Calling an empty std::function throws std::bad_function_call, so only
            // notify when a callback has actually been registered.
            if (this->mCallback)
            {
                this->mCallback();
            }
        }
        catch (const std::exception& ex)
        {
            std::cout << "Inside Listen Exception\n";
            std::cout << ex.what() << std::endl;
        }
    }
}
/// @brief Opens (or creates) the two message queues of this channel and starts the listener.
///
/// The queue names are "<queueName>_out" (primary -> secondary) and "<queueName>_in"
/// (secondary -> primary) on both nodes, so both ends attach to the same pair of queues.
///
/// @param queueName Base name of the channel.
/// @return true when both queues are open and the listener thread has been started;
///         false when the channel is already open or an exception was thrown.
template<typename T1, typename T2, bool primaryNode>
inline bool Ipc<T1, T2, primaryNode>::open(const std::string& queueName)
{
    try
    {
        // Assigning a new std::thread over a joinable one calls std::terminate,
        // so refuse to open a channel whose listener is already running.
        if (this->mReceiveThread.joinable())
        {
            std::cout << "Ipc::open: channel is already open\n";
            return false;
        }

        this->mPrimaryToSecondaryMessageQueueName = queueName + "_out";
        this->mSecondaryToPrimaryMessageQueueName = queueName + "_in";

        // T1 is what THIS node sends and T2 what it receives. The primary node sends on the
        // primary->secondary queue, the secondary node receives from it, so the message size
        // of each queue depends on which side of the channel this instantiation represents.
        constexpr std::size_t primaryToSecondarySize = primaryNode ? sizeof(T1) : sizeof(T2);
        constexpr std::size_t secondaryToPrimarySize = primaryNode ? sizeof(T2) : sizeof(T1);

        // Local copy: make_unique takes its arguments by reference, which would odr-use the
        // static class member directly.
        constexpr std::size_t depth = MAX_MESSAGE_DEPTH;

        //Open-Create message queue to send data from primary node to secondary node
        this->mPrimaryToSecondaryMessageQueue = std::make_unique<boost::interprocess::message_queue>(
            boost::interprocess::open_or_create,
            this->mPrimaryToSecondaryMessageQueueName.c_str(),
            depth,
            primaryToSecondarySize
        );

        //Open-Create message queue to send data from secondary node to primary node
        this->mSecondaryToPrimaryMessageQueue = std::make_unique<boost::interprocess::message_queue>(
            boost::interprocess::open_or_create,
            this->mSecondaryToPrimaryMessageQueueName.c_str(),
            depth,
            secondaryToPrimarySize
        );

        //Start listener thread only after both queues exist, since listen() uses them
        this->mReceiveThread = std::thread(&Ipc::listen, this);
        return true;
    }
    catch (const std::exception& ex)
    {
        std::cout << ex.what() << std::endl;
        return false;
    }
}
/// @brief Asks the listener thread to stop and removes both message queues from the system.
///
/// The listener thread is neither joined nor interrupted here; it sees the exit flag the
/// next time its loop condition is evaluated.
///
/// @return true when the request was carried out, false if an exception was thrown.
template<typename T1, typename T2, bool primaryNode>
inline bool Ipc<T1, T2, primaryNode>::close(void)
{
    try
    {
        this->mExitReceiveThread.store(true); //Marked to close thread

        // NOTE(review): the bool results of remove() are not checked - the queue may already
        // have been removed by the peer process. Confirm whether that should be reported.
        boost::interprocess::message_queue::remove(this->mPrimaryToSecondaryMessageQueueName.c_str());//Delete Primary to Secondary Message Queue
        boost::interprocess::message_queue::remove(this->mSecondaryToPrimaryMessageQueueName.c_str());//Delete Secondary to Primary Message Queue

        // The success path previously fell off the end of a bool function (undefined behaviour).
        return true;
    }
    catch (const std::exception& ex)
    {
        std::cout << ex.what() << std::endl;
        return false;
    }
}
/// @brief Sends one message on this node's outgoing queue.
///
/// The primary node writes to the primary->secondary queue, the secondary node writes to
/// the secondary->primary queue.
///
/// @param data     Message to send; sizeof(data) bytes are copied into the queue.
/// @param priority Priority handed to the message queue.
/// @return true when the message was handed to the queue, false if sending threw.
template<typename T1, typename T2, bool primaryNode>
inline bool Ipc<T1, T2, primaryNode>::send(const T1& data, std::uint32_t priority)
{
    try
    {
        // Pick the outgoing queue for this side of the channel.
        auto& outbound = primaryNode ? this->mPrimaryToSecondaryMessageQueue
                                     : this->mSecondaryToPrimaryMessageQueue;
        outbound->send(&data, sizeof(data), priority);
        return true;
    }
    catch (const std::exception& ex)
    {
        std::cout << ex.what() << std::endl;
        return false;
    }
}
/// @brief Takes the oldest message out of the local receive buffer.
/// @return The message, or std::nullopt when no message is buffered.
template<typename T1, typename T2, bool primaryNode>
inline std::optional<T2> Ipc<T1, T2, primaryNode>::receive(void)
{
    // Nothing buffered: report "no value" right away.
    if (this->mReceiveDataQueue.read_available() == 0)
    {
        return std::optional<T2>{ std::nullopt };
    }

    // Copy the front element out before discarding it from the buffer.
    std::optional<T2> oldest{ std::nullopt };
    oldest = this->mReceiveDataQueue.front();
    this->mReceiveDataQueue.pop();
    return oldest;
}
/// @brief Stores the callable that the listener thread invokes after receiving a message.
/// @param callbackImplementation Callable taking no arguments and returning nothing.
/// @return true when the callback was stored, false if storing it threw an exception.
template<typename T1, typename T2, bool primaryNode>
inline bool Ipc<T1, T2, primaryNode>::register_callback(callback_t callbackImplementation)
{
    bool stored = false;
    try
    {
        this->mCallback = callbackImplementation;
        stored = true;
    }
    catch (const std::exception& ex)
    {
        std::cerr << ex.what() << '\n';
    }
    return stored;
}
/// @brief Tells whether the local receive buffer currently holds at least one message.
/// @return true if a message can be read, false otherwise.
template<typename T1, typename T2, bool primaryNode>
inline bool Ipc<T1, T2, primaryNode>::isDataAvailableInReceiveDataQueue(void) const
{
    const bool hasData = this->mReceiveDataQueue.read_available() > 0;
    return hasData;
}
main.cpp
#include <ipc.hpp>
#include <iostream>
// Naming used below: "P1" is Process 1 and "P2" is Process 2.

/// Message layout named for the Process 1 -> Process 2 direction.
struct P1ToP2
{
    float a; // first payload field
    int b;   // second payload field
};
/// Message layout named for the Process 2 -> Process 1 direction.
struct P2ToP1
{
    int a; // first payload field
    int b; // second payload field
};
// Channel endpoint used by this program: sends P1ToP2, receives P2ToP1, and is
// instantiated as the secondary node (primaryNode == false).
Ipc<P1ToP2, P2ToP1, false> ipc1; //Global IPC object
/// Callback registered with ipc1: prints the fields of one buffered message, if there is one.
void message_queue_data_received(void)
{
    // Nothing buffered, nothing to print.
    if (ipc1.isDataAvailableInReceiveDataQueue() == false)
    {
        return;
    }

    const auto message = ipc1.receive();
    if (message.has_value() == false)
    {
        return;
    }

    std::cout << "a : " << message->a << "\tb : " << message->b << std::endl;
}
/// Program entry point: registers the receive callback, opens the "ipc1" channel and then
/// runs the main loop, forwarding data over IPC whenever the Ethernet flag is set.
int main(int argc, char *argv[])
{
    (void)argc; // Command-line arguments are not used
    (void)argv;

    // NOTE(review): the original comment says this flag is set by another thread; if so it
    // must become a std::atomic<bool> shared with that thread - confirm.
    bool dataReceivedOnEthernet = false;

    // Register the callback before open() so it is in place when the listener thread starts.
    ipc1.register_callback(message_queue_data_received);

    // `ipc1` is a global object; there is no `this` inside a free function.
    if (ipc1.open("ipc1") == false)
    {
        std::cerr << "Failed to open IPC channel \"ipc1\"\n";
        return 1;
    }

    while(true)
    {
        if(dataReceivedOnEthernet == true) //Flag set by other thread
        {
            P1ToP2 p{};
            p.a = 10.23f; //Some Data received over ethernet (float literal avoids double narrowing)
            p.b = 10; //Some Data received over ethernet
            ipc1.send(p); //Send data over IPC
        }
        //Other Code
    }
}
Error
boost::interprocess_exception::library_error
Why do the processes use a different type for the message, while silently assuming they're the same size (and both trivial and standard layout, etc.)? Are you mixing up the various types and queues? It looks like it.
It helps if you name things well. Also, remove duplication.
I'd separate the queues by message type. Name them by the roles:
Then, by defining a Channel type:
Now we can simply state:
With construction like
The listener queues received messages. The type depends on client/server mode:
Note how I opted for the much safer `jthread` with stop tokens for coordinating thread exit:
The external operations look much simpler, like:
Sample Client/Server
Let's define the above server to respond to `Requests` by sending back the square root of a and b/2:
That's all. Note how we've added a mechanism to tell the server the client wants it to exit (because the server owns the resources). The client may look something like this:
All it does is send some requests for ~10s and log the responses. Then it tells the server to quit, and closes. Only the server will remove the queues.
A simple main to switch client/server:
Live Demo
Live¹ On Coliru
File test.h
File test.cpp
With a live demo locally:
¹ sadly online compilers don't allow shared memory access