I am trying to send JSON as payload through MQTT messages using Spring Integration. This is my outbound handler:
@Bean
@ServiceActivator(inputChannel = MQTT_OUTBOUND_CHANNEL)
public MessageHandler mqttOutbound(final ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager,
final MqttClientPersistence persistence,
final MqttHeaderMapper mqttHeaderMapper,
final JacksonJsonMessageConverter jacksonJsonMessageConverter) {
final var messageHandler = new Mqttv5PahoMessageHandler(clientManager);
messageHandler.setHeaderMapper(mqttHeaderMapper);
messageHandler.setPersistence(persistence);
messageHandler.setAsync(true);
messageHandler.setAsyncEvents(false);
final var defaultPahoMessageConverter = new DefaultPahoMessageConverter(MqttQoS.AT_LEAST_ONCE.value(), false, StandardCharsets.UTF_8.name());
final var bytesMessageMapper = new ConvertingBytesMessageMapper(jacksonJsonMessageConverter);
defaultPahoMessageConverter.setBytesMessageMapper(bytesMessageMapper);
messageHandler.setConverter(defaultPahoMessageConverter);
return messageHandler;
}
However, this ends up asserting in org.springframework.integration.mqtt.outbound.Mqttv5PahoMessageHandler#onInit on
else {
Assert.state(!(getConverter() instanceof MqttMessageConverter),
"MessageConverter must not be an MqttMessageConverter");
}
How else can I do this?
I tried to study the Spring Integration samples source by I did not glean anything from it.
Yes. We just don't have enough resources on our side to cover all the features we implement for the community with samples. So, indeed there is no MQTT v5 sample over there.
As you see
Mqttv5PahoMessageHandlercannot be supplied with theMqttMessageConverter. Instead regular instance ofMessageConvertermust be used. In your case the providedjacksonJsonMessageConvertermust be used directly:See more info in docs: https://docs.spring.io/spring-integration/reference/mqtt.html#mqtt-v5