Handle deleted MQTT topics with MappingJackson2MessageConverter


In a customized IntegrationFlow, I use MappingJackson2MessageConverter for subscriptions to generate corresponding POJOs from JSON. This works perfectly so far.

However, some topics carry retained messages, and these can be deleted. A deletion arrives as a message with an empty payload, which currently leads to an exception:

ERROR org.springframework.integration.handler.LoggingHandler [MQTT Call: ...]
    ...
Caused by: org.springframework.integration.transformer.MessageTransformationException: failed to transform message, failedMessage=GenericMessage [payload=byte[0], headers={...}]
    at org.springframework.integration.transformer.AbstractTransformer.transform(AbstractTransformer.java:44)
    at org.springframework.integration.transformer.MessageTransformingHandler.handleRequestMessage(MessageTransformingHandler.java:119)
    ...
Caused by: org.springframework.messaging.converter.MessageConversionException: Could not read JSON: No content to map due to end-of-input
 at [Source: (byte[])""; line: 1, column: 0], failedMessage=GenericMessage [payload=byte[0], headers={...}]
    at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertFromInternal(MappingJackson2MessageConverter.java:235)
    at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:185)
    ...
Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: No content to map due to end-of-input
 at [Source: (byte[])""; line: 1, column: 0]
    at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59)
    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4916)
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4818)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3866)
    at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertFromInternal(MappingJackson2MessageConverter.java:221)
    ...

Is there a way to intercept this case and handle the deletion using the topic from the message headers? I could easily live with getting null as a result. However, messaging does not allow null payloads, and returning null from the transformer would terminate the flow, so I am looking for an alternative. Would routeByException() be an option, or is there a better solution?

My setup looks more or less like this:

    <T> IntegrationFlowRegistration subscriber(
            final MqttConnectionOptions options,
            final String id,
            final String topic,
            final Class<T> type,
            final Class<?> view,
            final GenericHandler<T> handler
    ) {
        final var adapter = new Mqttv5PahoMessageDrivenChannelAdapter(options, id, topic);
        final IntegrationFlowBuilder builder = IntegrationFlow
                .from(adapter)
                .transform(new PojoTransformer<>(type, view))
                .handle(handler);
        return this.flowContext.registration(builder.get()).register();
    }


    public class PojoTransformer<T> extends AbstractTransformer {
        private final Class<T> type;
        private final Class<?> view;


        public PojoTransformer(
                final Class<T> type,
                final Class<?> view
        ) {
            this.type = type;
            this.view = view;
        }


        @Override
        protected Object doTransform(final Message<?> message) {
            // final Object o = message.getPayload();
            // if (o instanceof final byte[] bytes && bytes.length == 0 || o instanceof final String s && s.isBlank()) {
            //  return null;
            // }
            return new MappingJackson2MessageConverter().fromMessage(message, this.type, this.view);
        }
    }

There are 2 best solutions below

mrpiggi

I finally came up with something like this:

    <T> IntegrationFlowRegistration subscriber(
            final MqttConnectionOptions options,
            final String id,
            final String topic,
            final Class<T> type,
            final Class<?> view,
            final GenericHandler<T> handler,
            final Consumer<MessageHeaders> onDeletion
    ) {
        final var adapter = new Mqttv5PahoMessageDrivenChannelAdapter(options, id, topic);
        final IntegrationFlowBuilder builder = IntegrationFlow
                .from(adapter)
                // Route on whether the payload is empty, i.e. the retained message was deleted.
                .route(byte[].class, payload -> payload.length == 0, spec -> spec
                        // Empty payload: pass the headers to the callback and end the flow here.
                        .subFlowMapping(true, flow -> flow.handle((p, h) -> {
                            onDeletion.accept(h);
                            return null;
                        }))
                        // Any other payload continues with the transformer below.
                        .defaultOutputToParentFlow()
                )
                .transform(new PojoTransformer<>(type, view))
                .handle(handler);
        return this.flowContext.registration(builder.get()).register();
    }

Nevertheless, I am still interested in other and possibly more elegant solutions.
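
For what it is worth, the empty-payload check could be pulled into a small helper that also covers String payloads, like the commented-out check in the question does. This is only a minimal sketch in plain Java; the names RetainedDeletions and isDeletion are made up for illustration and are not part of Spring Integration:

```java
// Hypothetical helper, not a Spring Integration class.
final class RetainedDeletions {

    private RetainedDeletions() {
    }

    /**
     * Returns true when the payload carries no content, which is how a
     * deleted retained message shows up in the stack trace (payload=byte[0]).
     */
    static boolean isDeletion(final Object payload) {
        if (payload instanceof byte[] bytes) {
            return bytes.length == 0;
        }
        if (payload instanceof String s) {
            // A whitespace-only string has no JSON content to map either.
            return s.isBlank();
        }
        // Any other payload type is treated as a regular message.
        return false;
    }
}
```

Assuming the adapter may deliver either type, the router could then probably be declared with Object.class as the expected type and RetainedDeletions::isDeletion as the predicate, instead of payload.length == 0.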

Artem Bilan

So, it looks like your .transform(new PojoTransformer<>(type, view)) throws an exception in case of invalid JSON.

Consider using an ExpressionEvaluatingRequestHandlerAdvice with trapException = true so that the exception is not re-thrown, and configure a failureChannel to handle the failure from this transformer.

See more in docs: https://docs.spring.io/spring-integration/reference/handler-advice/classes.html#expression-advice
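
A rough sketch of how that advice might be wired up, assuming Spring Integration 6 with Java configuration. The bean names transformFailures, trapTransformFailures and transformFailureFlow are made up for illustration, and the "mqtt_receivedTopic" header name is an assumption about what the MQTT adapter sets, so check it against your version:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessagingException;

@Configuration
class TransformFailureConfig {

    // Hypothetical channel that receives an ErrorMessage for each trapped failure.
    @Bean
    MessageChannel transformFailures() {
        return new DirectChannel();
    }

    @Bean
    ExpressionEvaluatingRequestHandlerAdvice trapTransformFailures() {
        final var advice = new ExpressionEvaluatingRequestHandlerAdvice();
        // Do not re-throw the exception raised by the advised handler.
        advice.setTrapException(true);
        // Evaluated against the failed message; set explicitly so the failure channel is used.
        advice.setOnFailureExpressionString("payload");
        advice.setFailureChannel(transformFailures());
        return advice;
    }

    @Bean
    IntegrationFlow transformFailureFlow() {
        return IntegrationFlow.from(transformFailures())
                // The ErrorMessage payload is a MessagingException wrapping the original message.
                .handle(MessagingException.class, (failure, headers) -> {
                    final Message<?> failed = failure.getFailedMessage();
                    if (failed != null) {
                        // Assumed header name for the topic the message arrived on.
                        System.out.println("Transformation failed for topic: "
                                + failed.getHeaders().get("mqtt_receivedTopic"));
                    }
                    // Returning null ends the failure flow here.
                    return null;
                })
                .get();
    }
}
```

In the subscriber, the advice would then be attached to the transformer endpoint, along the lines of .transform(new PojoTransformer<>(type, view), e -> e.advice(trapTransformFailures)). Because a transformer normally requires a reply, adding .requiresReply(false) to that endpoint spec is probably needed as well, so that the trapped null result does not raise a ReplyRequiredException; treat that as an assumption to verify. Note also that this traps every transformation failure, not only empty payloads, so genuinely malformed JSON ends up on the same channel.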