Write byte array received from mqtt into image in C++

Hello, I am trying to read a byte array (the binary contents of an image) from MQTT and write it back to a PNG/JPEG file (based on the original file's extension) in C++. My code is below. It reads the binary data and writes it to the image file, but when I try to open the image I get this error:

"Error interpreting JPEG image file(improper call to JPEG library in state 201)"

Below is the image I am using for my test. For testing, I publish the image with the Python code below.

Update: as suggested by @hardillb, I also tried publish.single, but the result is the same.

import paho.mqtt.client as mqtt
import paho.mqtt.publish as publish

# Read the image as raw bytes (3.7 KiB file in the same folder).
with open("/home/anup/Documents/img_2.jpg", "rb") as f:
    byteArr = bytearray(f.read())

# First attempt: publish through a client instance.
broker_address = "0.0.0.0"
client = mqtt.Client("P1")  # create new instance
client.connect(broker_address, 1883)  # connect to broker; the port must be an int
res = client.publish("in", byteArr)  # publish
client.loop()
print(res)

# Second attempt (see the update above): publish.single.
publish.single("in", payload=byteArr, hostname="localhost", port=1883)

JPG source image: (not reproduced here)

I am using the MQTT binaries from the aschiffler/cqq-mqtt repo, compiled with Ninja.

Below is my C++ subscriber code:

// Start by `#include`-ing the Eclipse Paho MQTT C++ library and other standard libraries.
#include <mqtt/client.h>  // Paho MQTT C++ client.
#include <iostream>  // std::cout.
#include <string>
#include <cstring>
#include <cstdlib>
#include <unistd.h>
#include <fstream>


// With the library header files included, continue by defining a main function.
int main()
{
    // In order to connect the mqtt client to a broker, 
    // Define an Ip address pointing to a broker. In this case, the localhost on port 1883.
    std::string ip = "localhost:1883";
    // Then, define an ID to be used by the client when communicating with the broker.
    std::string id = "consumer";

    // Construct a client using the Ip and Id, specifying usage of MQTT V5.
    mqtt::client client(ip, id, mqtt::create_options(MQTTVERSION_5));
    // Use the connect method of the client to establish a connection to the broker.
    client.connect();
    // In order to receive messages from the broker, specify a topic to subscribe to.
    client.subscribe("in");
    // Begin the client's message processing loop, filling a queue with messages.
    client.start_consuming();

    bool running = true;
    while (running)
    {
        // Construct a message pointer to hold an incoming message.
        mqtt::const_message_ptr messagePointer;

        // Try to consume a message, passing messagePointer by reference.
        // If a message is consumed, the function will return `true`, 
        // allowing control to enter the if-statement body.
        if (client.try_consume_message(&messagePointer))
        {
            // Construct a string from the message payload.
            std::string messageString = messagePointer -> get_payload_str();
            // Print payload string to console (debugging).
            std::cout << messageString << std::endl;

            std::string filename = "/home/anup/Documents/mqtt/cpp-mqtt/image.jpg";
            std:: ofstream outfile(filename, std::ios::binary);
            outfile.write((char*)messageString.c_str(), sizeof(messageString));
            outfile.close();


            // Perform processing on the string.
            // This is where message processing can be passed onto different
            // functions for parsing. 
            // Here, we break the loop and exit the program if a `quit` is received.
            if (messageString == "quit")
            {
                running = false;
            } 
        }
    }

    return 0;
}

The output that I see on the subscriber's console (truncated to fit here):


�Y,���àyPà�U���ٳ�F��!*�G9�6dD������xr��é�ì�Tm塦d��˨�TKGj��W�5��6'��nvK��W�D��۩(ځf՛��6��˲p�
                                                                                          �V�+°��?)°27M���̴I��R�������8��/;�޽���<�ì�DB�����5�u^�M�j�ò��K���U��gg��QF�/3%����MЮ��W��8_���d�0j£�"�OX�R���mm�K��K�E4H���L��
                                           ���w��F;�'���8�9�O ��
                                                                        �S)M��é���-ĜBxcP���§-� ֆ����9��+><$?�g�Rf��+a6��1R���.�
e��d��O�ù����� ��Y�_�h�.X�i�mù����W��x�9eG�v3é��T:�����1Z�
�n�����7�_�$G7���ù
W)Wn�=�R���z�è��uè�XZ*(ø�>F���$�B&��h�>�e�;Ҩ�x�d�aò�?��y��՛�è�nҕ�����5���jI�^ç��/e���a�
                                                                                        ��a�òm�N*�߀U��5.<���8z:A��ӍZ��&V§��.�IEND�Bù�

There is 1 solution below.

Answer by Explorer:

After a bit of struggle and valuable guidance from Mark Setchell and @hardillb, I was able to get the consumer working. I will keep the original question as is and post the working code below, in case it comes in handy for others.
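Before the full listing, a note on what most likely went wrong in the original code. The call outfile.write(..., sizeof(messageString)) passes the size of the std::string object itself (typically a few dozen bytes, for example 32 with GCC's libstdc++ on 64-bit Linux), not the length of the payload, so only the first few bytes of the 3.7 KiB image reached the file. The sketch below shows the same write with the byte count taken from size(). The names writePayload, readFile and payload_demo.bin are hypothetical and used only for illustration; they are not part of the MQTT library.

```cpp
#include <fstream>
#include <iterator>
#include <string>

// Write every byte of `payload` to `path`.
// Returns true if the stream was still in a good state after the write.
bool writePayload(const std::string& path, const std::string& payload)
{
    std::ofstream out(path, std::ios::binary);
    // payload.size() is the number of bytes held by the string, including
    // any embedded zero bytes; sizeof(payload) would only be the size of the
    // std::string object and is unrelated to the data length.
    out.write(payload.data(), static_cast<std::streamsize>(payload.size()));
    return out.good();
}

// Read a file back as raw bytes (used here only to check what was written).
std::string readFile(const std::string& path)
{
    std::ifstream in(path, std::ios::binary);
    return std::string(std::istreambuf_iterator<char>(in),
                       std::istreambuf_iterator<char>());
}
```

Calling writePayload("payload_demo.bin", data) and then comparing readFile("payload_demo.bin") with data is a quick way to confirm that the file on disk has the same length and contents as the received payload.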

// Start by `#include`-ing the Eclipse Paho MQTT C++ library and other standard libraries.
#include <mqtt/client.h>  // Paho MQTT C++ client.
#include <iostream>  // std::cout.
#include <string>
#include <cstring>
#include <cstdlib>
#include <unistd.h>
#include <fstream>
#include <cstddef>
#include <sstream>
#include <vector>


// With the library header files included, continue by defining a main function.
int main()
{
    // In order to connect the mqtt client to a broker, 
    // Define an Ip address pointing to a broker. In this case, the localhost on port 1883.
    std::string ip = "localhost:1883";
    // Then, define an ID to be used by the client when communicating with the broker.
    std::string id = "consumer";

    // Construct a client using the Ip and Id, specifying usage of MQTT V5.
    mqtt::client client(ip, id, mqtt::create_options(MQTTVERSION_5));
    // Use the connect method of the client to establish a connection to the broker.
    client.connect();
    // In order to receive messages from the broker, specify a topic to subscribe to.
    client.subscribe("in");
    // Begin the client's message processing loop, filling a queue with messages.
    client.start_consuming();

    bool running = true;
    while (running)
    {
        // Construct a message pointer to hold an incoming message.
        mqtt::const_message_ptr messagePointer;
        if (client.try_consume_message(&messagePointer))
        {
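            // Get a reference to the raw (binary) message payload.
            const auto& byteArry  = messagePointer -> get_payload();
            // A JPEG starts with FF D8 FF and ends with FF D9; check the size first so the indexing is safe.
            if (byteArry.size() >= 4 && byteArry[0] == '\xFF' && byteArry[1] == '\xD8' && byteArry[2] == '\xFF' && byteArry.back() == '\xD9'){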
                std::cout << "The byte array is a JPEG" << std::endl;
                std::string filename = "/home/user/Documents/mqtt/cpp-mqtt/image.jpg";
                std:: ofstream outfile(filename, std::ios::binary);
                outfile.write(reinterpret_cast<const char*>(byteArry.data()), byteArry.size());
                outfile.close();

            }
            else {
                // Anything not recognised as a JPEG is assumed to be a PNG; no signature check is done here.
                std::cout << "The byte array is a png" << std::endl;
                std::string filename = "/home/user/Documents/mqtt/cpp-mqtt/image.png";
                std:: ofstream outfile(filename, std::ios::binary);
                outfile.write(reinterpret_cast<const char*>(byteArry.data()), byteArry.size());
                outfile.close();
            }
        }
    }

    return 0;
}
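
The listing above treats every payload that is not a JPEG as a PNG. A slightly stricter variant could check the PNG signature as well and reject anything else. Below is a minimal sketch of that idea. The function name guessImageExtension is hypothetical, and unlike the listing above it looks only at the leading bytes, not at the trailing FF D9 of a JPEG.

```cpp
#include <cstddef>
#include <cstring>
#include <string>

// Guess a file extension from the leading "magic" bytes of the data.
// Returns ".jpg", ".png", or an empty string if neither signature matches.
std::string guessImageExtension(const char* data, std::size_t size)
{
    // JPEG data starts with the SOI marker FF D8, followed by another FF.
    static const unsigned char jpegMagic[] = {0xFF, 0xD8, 0xFF};
    // PNG data starts with a fixed 8-byte signature.
    static const unsigned char pngMagic[] = {0x89, 'P', 'N', 'G',
                                             0x0D, 0x0A, 0x1A, 0x0A};

    // Compare lengths first so memcmp never reads past the end of the data.
    if (size >= sizeof(jpegMagic) &&
        std::memcmp(data, jpegMagic, sizeof(jpegMagic)) == 0)
        return ".jpg";
    if (size >= sizeof(pngMagic) &&
        std::memcmp(data, pngMagic, sizeof(pngMagic)) == 0)
        return ".png";
    return "";
}
```

In the consumer it could be called as guessImageExtension(byteArry.data(), byteArry.size()), and a payload with an empty result (for example a plain text message) could simply be skipped instead of being written to image.png.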