Fundamental difference between inbound and outbound channel adapters

My project uses the MQTT protocol, with RabbitMQ as the broker, together with Spring Integration. The flow is simple:

RabbitMQ(data source) --> inbound flow (convert data from Base64 to JsonObject) --> outbound flow (send data to another topic)

This is an example payload from RabbitMQ: eyJrZXkiOiJIZWxsbyBXb3JsZCJ9, which is the Base64 encoding of {"key":"Hello World"}.

As I understand it, the difference between inbound and outbound channel adapters is this: an inbound adapter gets data from an external system (the source of the data), and an outbound adapter is the terminus of that data.

  • In my case, is it correct that RabbitMQ is an external system?

  • How can I decode/encode the data in IntegrationFlow.transform, for example from the Base64 string eyJrZXkiOiJIZWxsbyBXb3JsZCJ9 to the JsonObject {"key":"Hello World"}?

  • Do I understand the idea of Spring Integration correctly? I think I am confusing the outbound and inbound channel adapters.

I read this documentation.

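import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;

@SpringBootApplication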
public class DeepDiveIntegrationApplication {
    public static void main(String[] args) {
        SpringApplication.run(DeepDiveIntegrationApplication.class, args);
    }


    @Value("${mqtt.username}")
    private String mqttBrokerUsername;

    @Value("${mqtt.password}")
    private String mqttBrokerPassword;

    @Bean
    MqttPahoClientFactory clientFactory(@Value("${mqtt.brokerUrl}") String host){
        var factory = new DefaultMqttPahoClientFactory();
        var options = new MqttConnectOptions();
        options.setServerURIs(new String[]{host});
        options.setUserName(mqttBrokerUsername);
        options.setPassword(mqttBrokerPassword.toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }

    @Bean
    MessageChannel integrationMessageChannels(){
        return MessageChannels.direct().getObject();
    }

    @Bean
    MqttPahoMessageDrivenChannelAdapter inboundAdapter(@Value("${mqtt.decodedTopic}") String topic, MqttPahoClientFactory factory){
        var adapter = new MqttPahoMessageDrivenChannelAdapter("consumerClientID", factory, topic);
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(integrationMessageChannels());

        return adapter;
    }

    @Bean
    IntegrationFlow inboundFlow(MqttPahoMessageDrivenChannelAdapter inboundAdapter){
        return IntegrationFlow
                .from(inboundAdapter)
                .transform( ... )  // How to implement decoder
                .handle((payload, headers) -> {
                    System.out.println(payload);
                    return null;
                })
                .get();
    }

    @Bean
    MqttPahoMessageHandler outboundAdapter(@Value("${mqtt.encodedTopic}") String topic, MqttPahoClientFactory factory){
        var mh = new MqttPahoMessageHandler("producerClientID", factory);
        mh.setDefaultTopic(topic);
        return mh;
    }

    @Bean
    IntegrationFlow outboundFlow(MessageChannel integrationMessageChannels, MqttPahoMessageHandler outboundAdapter){
        return IntegrationFlow
                .from(integrationMessageChannels)
                .handle(outboundAdapter)
                .get();
    }


}

Answer by Artem Bilan

Your understanding is correct. Let's correlate it with the RabbitMQ abstractions. A RabbitMQ publisher, where you send data to a destination, is an outbound channel adapter in terms of EIP. A RabbitMQ consumer, where you receive data from a destination, is an inbound channel adapter in terms of EIP and Spring Integration.
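To make the direction of each adapter concrete, here is a toy model in plain Java. It is only a sketch: Channel, ExternalTopic and wire are hypothetical names invented for illustration and are not Spring Integration classes. The point it tries to show is that "inbound" and "outbound" are named from the application's point of view, not the broker's.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy model only: every type here is hypothetical and NOT part of Spring Integration.
class AdapterDirectionSketch {

    /** Stand-in for a message channel inside the application; forwards to one subscriber. */
    static final class Channel {
        private Consumer<String> subscriber = payload -> { };

        void subscribe(Consumer<String> subscriber) {
            this.subscriber = subscriber;
        }

        void send(String payload) {
            subscriber.accept(payload);
        }
    }

    /** Stand-in for a topic on an external broker; records what was published. */
    static final class ExternalTopic {
        final List<String> published = new ArrayList<>();
        private final List<Consumer<String>> listeners = new ArrayList<>();

        void listen(Consumer<String> listener) {
            listeners.add(listener);
        }

        void publish(String payload) {
            published.add(payload);
            listeners.forEach(listener -> listener.accept(payload));
        }
    }

    /** Wires source topic -> inbound adapter -> channel -> outbound adapter -> target topic. */
    static void wire(ExternalTopic source, Channel channel, ExternalTopic target) {
        // Inbound adapter role: consume from the external system, push INTO the application.
        source.listen(channel::send);
        // Outbound adapter role: take messages from the application, publish OUT to the external system.
        channel.subscribe(target::publish);
    }

    public static void main(String[] args) {
        ExternalTopic source = new ExternalTopic();
        ExternalTopic target = new ExternalTopic();
        wire(source, new Channel(), target);
        source.publish("hello");
        System.out.println(target.published);
    }
}
```

In the question's code, MqttPahoMessageDrivenChannelAdapter plays the inbound role and MqttPahoMessageHandler plays the outbound role.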

The question about a transformer from Base64 is out of scope for this SO thread. Feel free to raise a new one.
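For readers who only need the decoding step itself, the following is a minimal sketch using the JDK's java.util.Base64. It is not part of the answer above. The class name Base64PayloadDecoder and its methods are hypothetical, and the sketch assumes the payload arrives as a UTF-8 Base64 string.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper; the class and method names are illustrative, not from the thread.
class Base64PayloadDecoder {

    /** Decodes a Base64 string into the UTF-8 text it represents. */
    static String decode(String base64) {
        // The basic decoder throws IllegalArgumentException if the input is not valid Base64.
        byte[] raw = Base64.getDecoder().decode(base64.trim());
        return new String(raw, StandardCharsets.UTF_8);
    }

    /** Inverse operation, for the encoding direction. */
    static String encode(String text) {
        return Base64.getEncoder().encodeToString(text.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        String sample = "eyJrZXkiOiJIZWxsbyBXb3JsZCJ9"; // sample payload from the question
        System.out.println(decode(sample));
    }
}
```

Inside the flow, such a method could presumably be passed to the DSL as .transform(String.class, Base64PayloadDecoder::decode), assuming the MQTT converter delivers the payload as a String. That wiring is an assumption and is not verified here. Turning the decoded text into a JSON object would need a JSON library, which this sketch leaves out.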