Spring Integration MQTT shared subscriptions


I need to support MQTT in my project. I use RabbitMQ as the broker. I developed a Spring Boot application that uses Spring Integration MQTT.

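import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;

@Configuration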
public class MqttConfig {
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MqttPahoClientFactory clientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[]{ "tcp://localhost:1883" });
        options.setUserName("user_admin");
        options.setPassword("password".toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }

    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("siSampleConsumer", clientFactory(), "example_topic");

        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                System.out.println("MESSAGE: " + message.getPayload());
            }

        };
    }
}

pom.xml

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
    <version>6.2.1</version>
</dependency>

I run two instances of the same application on different ports, 8081 and 8082:

  java -jar target/mqtt_project-0.0.1-SNAPSHOT.jar --server.port=8081
  java -jar target/mqtt_project-0.0.1-SNAPSHOT.jar --server.port=8082

I use the MQTTX client toolbox for testing. When I send several messages to the topic example_topic, they always arrive at the same instance, the one on port 8082:

MESSAGE: Hello World // port 8082

MESSAGE: Hello World // port 8082

MESSAGE: Hello World // port 8082

How can I implement MQTT shared subscriptions for client load balancing, so that the broker splits the messages for a topic evenly among the subscribed clients? The expected output would look like this:

MESSAGE: Hello World // port 8081

MESSAGE: Hello World // port 8082

MESSAGE: Hello World // port 8081

MESSAGE: Hello World // port 8082

There are 2 best solutions below

hardillb On

If the broker supports shared subscriptions then using them is just a case of adding the right prefix to the topic the clients subscribe to.

E.g.

$share/group-identifier/example_topic

where group-identifier is a name that is common to all clients in the group and identifies that collection of clients to the broker.
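
To make the prefix rule concrete, here is a minimal sketch of a helper that builds such a topic filter. The class name SharedSubscriptionTopics and the method of are hypothetical names chosen for illustration. The validation assumes the MQTT 5 rule that the group name is non-empty and contains none of the characters /, + or #.

```java
import java.util.Objects;

/** Hypothetical helper: builds filters of the form $share/<group>/<topicFilter>. */
final class SharedSubscriptionTopics {

    private SharedSubscriptionTopics() {
    }

    static String of(String group, String topicFilter) {
        Objects.requireNonNull(group, "group");
        Objects.requireNonNull(topicFilter, "topicFilter");
        // Assumed MQTT 5 rule: the group name is non-empty and has no '/', '+' or '#'.
        if (group.isEmpty() || group.contains("/") || group.contains("+") || group.contains("#")) {
            throw new IllegalArgumentException("Invalid group name: " + group);
        }
        if (topicFilter.isEmpty()) {
            throw new IllegalArgumentException("Topic filter must not be empty");
        }
        return "$share/" + group + "/" + topicFilter;
    }

    public static void main(String[] args) {
        // Builds the filter for the group and topic used in this answer.
        System.out.println(of("group-identifier", "example_topic"));
    }
}
```

The resulting string would be passed as the topic argument wherever the client subscribes, for example in place of "example_topic" in the adapter constructor from the question. Publishers keep publishing to the plain topic, without the prefix.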

Prince Sachdev On

I was facing the same problem with the same kind of code. I solved it by having every instance subscribe to $share/groupID/my/topic, where groupID is the name of the group of subscriber applications across which the messages should be distributed.

NOTE: The groupID value must be the same for all the instances.
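
One related detail may be worth checking: while the group name is shared, MQTT expects each connected client to use its own client ID, and a broker closes an existing connection when another client connects with the same ID. The configuration in the question hardcodes siSampleConsumer for every instance, which may contribute to only one instance receiving messages. The sketch below keeps the group common and derives the client ID from the server port. The class InstanceSubscription and the port-suffix scheme are assumptions for illustration, not part of Spring Integration.

```java
/** Hypothetical holder for the per-instance values passed to the inbound adapter. */
final class InstanceSubscription {

    final String clientId;
    final String topic;

    InstanceSubscription(String baseClientId, int serverPort, String group, String topicFilter) {
        // Assumption: the server port is unique per instance, so it makes the client ID unique.
        this.clientId = baseClientId + "-" + serverPort;
        // The group name is the same for all instances, so they share one subscription.
        this.topic = "$share/" + group + "/" + topicFilter;
    }

    public static void main(String[] args) {
        InstanceSubscription first = new InstanceSubscription("siSampleConsumer", 8081, "groupID", "my/topic");
        InstanceSubscription second = new InstanceSubscription("siSampleConsumer", 8082, "groupID", "my/topic");
        System.out.println(first.clientId + " subscribes to " + first.topic);
        System.out.println(second.clientId + " subscribes to " + second.topic);
    }
}
```

In the question's configuration, these two values would take the place of the literals in new MqttPahoMessageDrivenChannelAdapter("siSampleConsumer", clientFactory(), "example_topic").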

I ran 4 application instances, and when I published 4 messages to my/topic, the instances received them in a round-robin manner.
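
The round-robin behaviour I observed can be pictured with the small simulation below. It is only a sketch of the idea, not broker code: the class RoundRobinGroup and the instance names are made up for illustration, and how a real broker picks the next subscriber in a group is up to that broker.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of one shared-subscription group served in round-robin order. */
final class RoundRobinGroup {

    private final List<String> members;
    private int next = 0;

    RoundRobinGroup(List<String> members) {
        if (members.isEmpty()) {
            throw new IllegalArgumentException("A group needs at least one member");
        }
        this.members = new ArrayList<>(members);
    }

    /** Returns the member that receives the next message and advances the cursor. */
    String deliver() {
        String target = members.get(next);
        next = (next + 1) % members.size();
        return target;
    }

    public static void main(String[] args) {
        RoundRobinGroup group = new RoundRobinGroup(
                Arrays.asList("instance-1", "instance-2", "instance-3", "instance-4"));
        // Four published messages: each one goes to exactly one group member.
        for (int i = 1; i <= 4; i++) {
            System.out.println("message " + i + " -> " + group.deliver());
        }
    }
}
```

Each message is delivered to exactly one member of the group, which is the difference from an ordinary subscription, where every subscriber gets its own copy.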

In my case I was using the Eclipse Mosquitto MQTT broker. I used https://cedalo.com/blog/mqtt-shared-subscriptions-guide/ as a guide. It also shows how the broker distributes messages among multiple groups, where each group has multiple instances.

This approach works well if you want your MQTT traffic to be load balanced among multiple subscriber applications in a round-robin manner.

Thanks.