MQTT Connectivity issue - Message Expiry Interval


I have been encountering an issue for a long time but I haven't found any solution yet, despite reading the protocol version 5 standard and the emqx documentation.

I want to publish messages with a time limit so that, if my device is unavailable when the limit expires, the broker deletes the message and the device never receives it. The device should be unavailable because it is in an enclosed area, such as a tunnel, or in an area with no cellular coverage, and not because it initiated a disconnect from the broker or because the “keepalive” value has expired. My understanding is that I can use the “messageExpiryInterval” property (in protocol version 5) to achieve this.

I used the EMQX broker as follows:

const connectUrl = 'mqtt://broker.emqx.io:1883';

Along with the following connection configuration:

const client = mqtt.connect(connectUrl, {
  clientId: 'mqtt_dani_pub',
  protocolVersion: 5,
  keepalive: 1800,
  clean: true
});

When sending a message, I use the following options:

const options = { 
  qos: 0,
  retain: false,
  properties:  { messageExpiryInterval: 30 }
};

As you can see, I used a high value, 1800, for “keepalive” to ensure that the device will be connected to the broker for a long time.
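For reference, the MQTT specification has the server close the network connection if it receives no control packet from the client within one and a half times the keep-alive period. A minimal sketch of that arithmetic follows; `keepAliveTimeoutSeconds` is a hypothetical helper name and not part of mqtt.js or EMQX. It compares a 120-second outage with that window:

```javascript
// Hypothetical helper (not part of mqtt.js): the window after which the MQTT
// spec has the server close a silent connection, i.e. 1.5x the keep-alive.
function keepAliveTimeoutSeconds(keepAliveSec) {
  return keepAliveSec * 1.5;
}

const timeoutSec = keepAliveTimeoutSeconds(1800);
console.log(timeoutSec);        // 2700
console.log(120 < timeoutSec);  // true: a 120 s outage ends before the window does
```

With keepalive set to 1800, a short network outage is therefore unlikely to make the broker treat the subscriber as disconnected, assuming the broker follows the specification here.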

To simulate this situation, I used one publisher on one PC, and one subscriber on another PC.

The scenario is as follows:

  1. Connect the publisher and the subscriber to the EMQX broker.
     a. Verify that MQTT v5 is in use.
  2. Publish a message (while the subscriber is connected) to EMQX with messageExpiryInterval: 30.
     a. The subscriber receives the message.
  3. Turn OFF the Wi-Fi on the subscriber computer.
  4. Publish a message to EMQX with messageExpiryInterval: 30.
  5. Wait for 120 seconds.
  6. Turn ON the Wi-Fi on the subscriber computer.
     a. Expected: the subscriber does not receive the message. Actual: it does receive the message!

In addition, section 3.3.2.3.3 of the MQTT version 5 standard (Message Expiry Interval - https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.pdf ) says that the PUBLISH packet sent to a Client by the Server MUST contain a Message Expiry Interval set to the received value minus the time that the Application Message has been waiting in the Server, so this may be related to the problem.
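The rule quoted from section 3.3.2.3.3 can be sketched as simple arithmetic. This is only an illustration of the wording in the standard, not EMQX's actual implementation; `remainingExpiry` is a hypothetical helper name. The same section also says the server deletes its copy of the message once the interval has passed, provided it has not managed to start onward delivery to the subscriber.

```javascript
// Hypothetical helper illustrating the spec wording: the interval forwarded to
// a subscriber is the received interval minus the time spent waiting in the server.
function remainingExpiry(receivedIntervalSec, waitedSec) {
  const remaining = receivedIntervalSec - Math.floor(waitedSec);
  // Zero or less means the lifetime has passed; null stands for "expired".
  return remaining > 0 ? remaining : null;
}

console.log(remainingExpiry(30, 10));  // 20
console.log(remainingExpiry(30, 120)); // null
```

In the scenario above, a message published with an interval of 30 that really waited 120 seconds in the server would fall into the second case.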

My publisher JS code:

import mqtt from 'mqtt';
import * as readline from 'node:readline';
import { stdin, stdout } from 'process';

const connectUrl = 'mqtt://broker.emqx.io:1883';
const clientId = 'mqtt_dani_pub';
const topic = 'dani/test';
const subject = 'Publisher';

const rl = readline.createInterface({
  input:stdin,
  output:stdout
});

const client = mqtt.connect(connectUrl, {
  clientId,
  protocolVersion: 5,
  keepalive: 1800,
  clean: true
});

client.on('connect', () => {
  console.log(`${subject} client connected..`)
  client.subscribe([topic], () => {
    console.log(`Subscribe to topic '${topic}'`);
  })
});

const options = { 
  qos: 0,
  retain: false,
  properties:  { messageExpiryInterval: 30 }
};

const publishMsg = (message) => {
  client.publish(topic,
    `${clientId} - ${Date.now()} - ${message}`, 
    options,
    (error) => {
      if (error) {
        console.error(error)
      }
    }
  );
};

const input2topic = () => {
  return new Promise(resolve => {
    rl.question(`send message to topic ${topic}: `,
      (input) => {
        if(input !== 'exit'){
          console.log(`writing to topic ${topic}..`);
          publishMsg(input);
          resolve(true);
        } else{
          console.log('exit...');
          resolve(false);
        }
      });
  });
}

const main = async () => {
  publishMsg('first message');
  let flag = true;
  while(flag){
    await new Promise(resolve => setTimeout(resolve, 1000));
    flag = await input2topic();
  }
  rl.close();
  client.end();
}

main();

My subscriber JS code:

import mqtt from 'mqtt';

const connectUrl = 'mqtt://broker.emqx.io:1883';
const clientId = 'mqtt_dani_sub';
const topic = 'dani/test';
const subject = 'Subscriber';

const client = mqtt.connect(connectUrl, {
  clientId,
  keepalive: 1800,
  protocolVersion: 5,
})

client.on('connect', () => {
  console.log(`${subject} client connected`)
  client.subscribe([topic], {qos: 0}, () => {
    console.log(`Subscribe to topic '${topic}'`)
  })
});

client.on('message', (topic, payload, packet) => {
  console.log('\nReceived Message:', { 
    ...packet,
    message: payload.toString(),
    msg_length: payload.toString().length,
    time: new Date(),
  });
});
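As a debugging aid, the subscriber could report how old each message is when it arrives, using the timestamp the publisher already embeds in the payload (`${clientId} - ${Date.now()} - ${message}`). The sketch below assumes that payload format and roughly synchronized clocks on the two PCs; `messageAgeSeconds` is a hypothetical helper, not part of mqtt.js.

```javascript
// Hypothetical helper: parses "<clientId> - <epoch ms> - <message>" and
// returns the age of the message in seconds, or null if the format differs.
function messageAgeSeconds(payloadText, nowMs = Date.now()) {
  const parts = payloadText.split(' - ');
  const sentMs = Number(parts[1]);
  if (parts.length < 3 || !Number.isFinite(sentMs)) {
    return null;
  }
  return (nowMs - sentMs) / 1000;
}

// Example with fixed timestamps: sent at 1000 ms, received at 121000 ms.
console.log(messageAgeSeconds('mqtt_dani_pub - 1000 - hello', 121000)); // 120
```

Calling `messageAgeSeconds(payload.toString())` inside the 'message' handler, next to the packet's `properties` field that the handler already logs, would show whether a message older than 30 seconds is being delivered.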