I have read about IntegrationFlows, subflows, splitters, and aggregators, but yet cannot find a good solution on how to implement a simple DRDA server based on spring integration:

DRDA sends command chunks prefixed with a 2-byte length, however, there is a flag that indicates last or more commands send through a socket. So I used ByteArrayLengthHeaderSerializer(2), but that does not create a single byte array, it splits the input stream into independent commands, which is good for processing them separately. On the other hand, I need to know how many of those input messages are chained together, in order to be able to send them back chained. Similar to the input, the output messages also need to have a flag indicating the last message or not.

Trying to make it better understandable, what I want to implement:

DSSCommand (LLxxxxEndFLAGxxxxCodePoint...)
DSSReply (LLxxxxEndFLAGxxxxCodePoint...)

So the input on the socket is for example:
DSSCommand1 (with EndFlag=off) DSSCommand2 (with EndFlag=on)
SpringIntegration should then process the Commands separately with Bean methods based on CodePoint which is an Integer.
On the Reply, if I use an aggregator, I need to set the EndFLAG only at the last reply DSSReply Object.
DSSReply1 (with Endflag=off) DSSReply2 (with EndFlag=on)

So my question is based on the fact, that the ByteArrayLengthHeaderSerializer already splits the input into separate messages and I would need to count them on a per input socket basis and check if one of them is the last one, but if I use an aggregator for the output, DSSReply1 and 2 would be a single message and DSSReply2 would miss the length field and the length Field of DSSReply1 would contain the length of both DSSReply1 and 2.
Is there are way once aggregated, to split the messages off again and then send those split messages in the right order (based on the end flag) back to the socket that has to send the requests? E.g. correlate by CONNECTION_ID?

The easiest way is probably to use my own serializer treating the chained input DSSCommands and the chained output DSSCommands as single-byte arrays without splitting them and operate on parsing the complete byte array based on isAvailable() on the socket. But I wonder if it's possible to do it differently. I would also need to throw an exception if the last input on the socket misses the Endflag as a protocol violation.

It might sound complicated, but I really would like to put as much logic as possible into spring integration.

Thanks.

1

There are 1 best solutions below

3
On

It sounds like a custom TcpConnection would be the best solution; overriding getPayload().

The TcpNetConnection implementation is this...

@Override
public Object getPayload() {
    InputStream inputStream;
    try {
        inputStream = inputStream();
    }
    catch (IOException e1) {
        throw new SoftEndOfStreamException("Socket closed when getting input stream", e1);
    }
    try {
        return getDeserializer()
                .deserialize(inputStream);
    }
    catch (IOException e) {
        throw new UncheckedIOException(e);
    }
}

If you subclass that you can call the deserializer multiple times and return a Collection of chunks.

Implement a custom TcpNetConnectionSupport to return your new connection type.