TCP inbound gateway does not call the serializer, but the deserializer is called

We need to listen on a port, receive a message, and then write a reply back to the output stream of the same connection. We have written a custom deserializer, which works fine, but our custom serializer is never called. We are using a Spring Integration flow.

@Bean
public IntegrationFlow integrationFlow() {
    return IntegrationFlows.from(Tcp.inboundGateway(Tcp.nioServer(port)
                    .deserializer(new TestDeserializer())
                    .serializer(new TestSerializer())))
            .transform(Transformers.objectToString())
            .handle("outboundService", "processAndSendMessage")
            .get();
}

The custom deserializer:

public class TestDeserializer implements Deserializer<String>{

    private static final char END_OF_BLOCK = '\u001c';
    
    @Override
    public String deserialize(InputStream inputStream) throws IOException {
        boolean end_of_message = false;
        int characterReceived = 0;
        StringBuffer parsedMessage = new StringBuffer();
        characterReceived = inputStream.read();
        
        while (!end_of_message) {
            characterReceived = inputStream.read();
            if (characterReceived == END_OF_BLOCK) {
                characterReceived = inputStream.read();
                end_of_message = true;
            }else {
                parsedMessage.append((char) characterReceived);
            }
        }
        String message = parsedMessage.toString();
        inputStream.close();
        parsedMessage = null;
        return message;
    }

}

The custom serializer:

public class TestSerializer implements Serializer<String>{

    @Override
    public void serialize(String object, OutputStream outputStream) throws IOException {
        System.out.println("inside Serializer -- "+object);
        outputStream.write(object.getBytes());
        System.out.println("inside after Serializer -- "+object);
    }

}

The custom serializer is never called in the Spring Integration flow.

There are 2 best solutions below

The serializer is called by Tcp.inboundGateway() only when the downstream flow returns a reply.

Make sure that your .handle("outboundService", "processAndSendMessage") returns something. Otherwise there is no point in using Tcp.inboundGateway().
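
As a minimal sketch of what "returns something" could look like: the class below is hypothetical and assumes it is registered as the bean named outboundService. The "ACK: " reply text is invented for illustration; the original handler is not shown in the question. The point is only the return type: a void method or a null return value produces no reply message, so the gateway has nothing to hand to the serializer.

```java
public class OutboundService {

    // Hypothetical handler. Returning a non-null value makes it the reply
    // payload that flows back to the inbound gateway and on to the serializer.
    // With a void return type (or a null result) there is no reply at all.
    public String processAndSendMessage(String message) {
        // ... process the incoming message here ...
        return "ACK: " + message; // assumed reply format, for illustration only
    }

    public static void main(String[] args) {
        System.out.println(new OutboundService().processAndSendMessage("foo"));
    }
}
```

If the flow really has nothing to send back, an inbound channel adapter is probably a better fit than a gateway.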

There is something wrong with your environment; this works fine. If you can't figure it out, post an MCRE.

@SpringBootApplication
public class So77329593Application {

    private static final Logger log = LoggerFactory.getLogger(So77329593Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So77329593Application.class, args);
    }

    @Bean
    IntegrationFlow flow() {
        return IntegrationFlow.from(Tcp.inboundGateway(Tcp.nioServer(1234)
                .deserializer(new MyDe())
                .serializer(new MySe())))
            .log()
            .bridge() // just echo back
            .get();
    }

    @Bean
    ApplicationRunner runner() {
        ByteArrayCrLfSerializer deser = new ByteArrayCrLfSerializer();
        return args -> {
            Socket sock = SocketFactory.getDefault().createSocket("localhost", 1234);
            sock.getOutputStream().write("foo\r\n".getBytes());
            log.info(new String(deser.deserialize(sock.getInputStream())));
            sock.close();
        };
    }

    static class MyDe implements Deserializer<String> {

        private final ByteArrayCrLfSerializer deser = new ByteArrayCrLfSerializer();

        @Override
        public String deserialize(InputStream inputStream) throws IOException {
            log.info("In deserializer");
            return new String(this.deser.deserialize(inputStream));
        }

    }

    static class MySe implements Serializer<String> {

        @Override
        public void serialize(String str, OutputStream outputStream) throws IOException {
            log.info("In serializer");
            outputStream.write(str.getBytes());
            outputStream.write("\r\n".getBytes());
        }

    }

}

[main] Started So77329593Application in 0.786 seconds (process running for 1.158)
[pool-2-thread-3] In deserializer
[pool-2-thread-3] GenericMessage [payload=foo, headers= ...
[pool-2-thread-3] In serializer
[main] foo
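
Note that the echo above only reaches the client because MySe appends the same "\r\n" terminator that the client's ByteArrayCrLfSerializer waits for. A rough sketch of that framing in plain Java, with no Spring classes, is shown here; CrLfFraming, write and read are hypothetical names used only for this illustration, not part of any library.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

public class CrLfFraming {

    // Writes the payload followed by CRLF, mirroring what MySe does.
    public static void write(String payload, OutputStream out) throws IOException {
        out.write(payload.getBytes(StandardCharsets.UTF_8));
        out.write("\r\n".getBytes(StandardCharsets.UTF_8));
    }

    // Reads up to the first CRLF and returns the bytes before it as a String.
    public static String read(InputStream in) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        int previous = -1;
        int current;
        while ((current = in.read()) != -1) {
            if (previous == '\r' && current == '\n') {
                byte[] bytes = buffer.toByteArray();
                // The '\r' is already in the buffer, so drop the last byte.
                return new String(bytes, 0, bytes.length - 1, StandardCharsets.UTF_8);
            }
            buffer.write(current);
            previous = current;
        }
        // Without the terminator the reader cannot tell where the message ends.
        throw new IOException("Stream closed before CRLF terminator");
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream wire = new ByteArrayOutputStream();
        write("foo", wire);
        System.out.println(read(new ByteArrayInputStream(wire.toByteArray())));
    }
}
```

The same reasoning applies to any custom pair: whatever delimiter the receiving side's deserializer expects, the sending side's serializer has to write.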