I need to support MQTT in my project. I use RabbitMQ as the broker. I developed a Spring Boot application that uses Spring Integration MQTT.
@Configuration
public class MqttConfig {

    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MqttPahoClientFactory clientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[]{ "tcp://localhost:1883" });
        options.setUserName("user_admin");
        options.setPassword("password".toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }

    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter("siSampleConsumer", clientFactory(), "example_topic");
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                System.out.println("MESSAGE: " + message.getPayload());
            }
        };
    }
}
pom.xml:
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
    <version>6.2.1</version>
</dependency>
I run two instances of the same application on different ports, 8081 and 8082:
java -jar target/mqtt_project-0.0.1-SNAPSHOT.jar --server.port=8081
java -jar target/mqtt_project-0.0.1-SNAPSHOT.jar --server.port=8082
I use the MQTTX client toolbox for testing.
When I send several messages to the topic example_topic, they always arrive at the same instance, the one on port 8082:
MESSAGE: Hello World // port 8082
MESSAGE: Hello World // port 8082
MESSAGE: Hello World // port 8082
How can I implement MQTT shared subscriptions to get client load balancing, meaning that the broker splits the message load equally among the clients subscribed to a particular topic? I would expect output like this:
MESSAGE: Hello World // port 8081
MESSAGE: Hello World // port 8082
MESSAGE: Hello World // port 8081
MESSAGE: Hello World // port 8082
If the broker supports shared subscriptions, then using them is just a matter of adding the right prefix to the topic the clients subscribe to.
E.g.
$share/group-identifier/example_topic
Where group-identifier is a unique ID for the collection of clients that should share the messages.
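As a rough sketch of the prefix described above, the helper below builds a shared-subscription topic filter in the MQTT 5 form $share/<group>/<topic filter>. The class name SharedSubscriptions, the method name toSharedFilter and the group name "group1" are hypothetical, chosen only for illustration. Whether the broker honours the prefix depends on the broker and the protocol version in use, so check its documentation first.

```java
import java.util.Objects;

/**
 * Illustrative helper (hypothetical, not part of Spring Integration or Paho):
 * builds a topic filter of the form $share/<group>/<topic filter>.
 */
final class SharedSubscriptions {

    private SharedSubscriptions() {
    }

    /**
     * @param group       share name used by every instance that should split the load
     * @param topicFilter ordinary topic filter, e.g. "example_topic"
     * @return the shared-subscription filter to subscribe to
     */
    static String toSharedFilter(String group, String topicFilter) {
        Objects.requireNonNull(group, "group");
        Objects.requireNonNull(topicFilter, "topicFilter");
        // The MQTT 5 specification does not allow '/', '+' or '#' in the share name.
        if (group.isEmpty() || group.contains("/") || group.contains("+") || group.contains("#")) {
            throw new IllegalArgumentException("Invalid share name: " + group);
        }
        if (topicFilter.isEmpty()) {
            throw new IllegalArgumentException("Topic filter must not be empty");
        }
        return "$share/" + group + "/" + topicFilter;
    }
}
```

In the inbound() bean from the question, the topic argument "example_topic" would then be replaced by SharedSubscriptions.toSharedFilter("group1", "example_topic"), with the same group name in both instances. Each instance would most likely also need its own client ID instead of the shared "siSampleConsumer", since a broker normally allows only one connection per client ID.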