Can Netty automatically handle queueing of requests?

In the Apache PLC4X project (https://plc4x.apache.org) we are implementing drivers for industrial PLCs using Netty. These drivers usually layer several protocols on top of each other, and sometimes one layer requires us to split one message into multiple messages of the underlying layer. Now we are facing one big problem: one protocol negotiates a maximum number of unconfirmed messages per connection. We can't have more messages in flight than this maximum, or the receiver will simply send an error response.

Now we would need to not add things to "out" in the encode method, but to add them to some sort of queue and have some Netty mechanism take care of draining that queue ... is there such a mechanism in Netty? If not, what would be the best way to implement this?

Would also be cool if someone with good Netty insight could join our project mailing list ([email protected]) as we're also working on some really cool additions for Netty (Raw Socket transport on Ethernet Frame and one on IP packet base) ... I bet both projects could benefit greatly from each other.

Best answer

Netty does not provide such a handler out of the box, but because of its internal design it is easy to write one that limits the number of concurrent pending requests.

Such a handler can be built using the PendingWriteQueue class from Netty in combination with a generic handler:

import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.PendingWriteQueue;

// ChannelDuplexHandler is the Netty 4.x base class that exposes both inbound and outbound callbacks
public class MaxPendingRequestHandler extends ChannelDuplexHandler {

    private PendingWriteQueue queue;
    private int freeSlots;

    public MaxPendingRequestHandler(int maxRequests) {
        this.freeSlots = maxRequests;
    }

    private synchronized void trySendMessages(ChannelHandlerContext ctx) {
        // Write queued messages while slots remain; removeAndWrite() returns null once the queue is empty
        while (this.freeSlots > 0 && this.queue.removeAndWrite() != null) {
            this.freeSlots--;
        }
        ctx.flush();
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        this.queue = new PendingWriteQueue(ctx);
        // Pass the event on so later handlers in the pipeline still see it
        super.channelRegistered(ctx);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        // Write out everything still queued so those pending writes fail properly
        this.queue.removeAndWriteAll();
        super.channelInactive(ctx);
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        this.queue.removeAndWriteAll();
        super.channelUnregistered(ctx);
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        this.queue.add(msg, promise);
        trySendMessages(ctx);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        synchronized(this) {
            this.freeSlots++;
            trySendMessages(ctx);
        }
        super.channelRead(ctx, msg);
    }

}

The handler stores every outgoing message in a queue and, on every write and every read, checks whether there are free slots on the wire. Each inbound message is treated as a confirmation that frees one slot.
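The slot accounting described above can be looked at in isolation. The following is a minimal sketch in plain Java without Netty; the class name SlotWindow and its methods are hypothetical and only mirror the counting logic of the handler, not its channel handling:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Netty-free model of the handler's slot accounting (illustrative; SlotWindow is a hypothetical name).
class SlotWindow<T> {
    private final Queue<T> pending = new ArrayDeque<>(); // messages waiting for a free slot
    private final List<T> sent = new ArrayList<>();      // messages released to the "wire"
    private int freeSlots;

    SlotWindow(int maxUnconfirmed) {
        this.freeSlots = maxUnconfirmed;
    }

    // Corresponds to write(): queue first, then send as much as the window allows.
    void write(T msg) {
        pending.add(msg);
        drain();
    }

    // Corresponds to channelRead(): one response frees one slot.
    void onResponse() {
        freeSlots++;
        drain();
    }

    private void drain() {
        while (freeSlots > 0 && !pending.isEmpty()) {
            sent.add(pending.remove());
            freeSlots--;
        }
    }

    List<T> sent() {
        return sent;
    }

    int queued() {
        return pending.size();
    }

    public static void main(String[] args) {
        SlotWindow<String> window = new SlotWindow<>(3);
        for (int i = 1; i <= 6; i++) {
            window.write(String.valueOf(i));
        }
        System.out.println(window.sent() + " queued=" + window.queued()); // [1, 2, 3] queued=3
        window.onResponse();
        System.out.println(window.sent() + " queued=" + window.queued()); // [1, 2, 3, 4] queued=2
    }
}
```

As in the handler, every response is assumed to confirm exactly one request. If your protocol can send unsolicited messages, they would have to be filtered out before they are counted.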

Note that this handler must be placed in the pipeline after the packet decoders/encoders. Otherwise a single incoming packet may be counted as several, because the handler would see raw data instead of decoded messages. For example:

pipeline.addLast(new PacketCodec()); // A codec consists of an encoder and a decoder; you can also add them separately
// pipeline.addLast(new TrafficShapingHandler()); // Optional, depending on your required protocols
// pipeline.addLast(new IdleStateHandler());      // Optional, depending on your required protocols
pipeline.addLast(new MaxPendingRequestHandler(3)); // 3 is only an example; use the limit your protocol negotiated
pipeline.addLast(new BusinessHandler());
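To illustrate why the ordering matters, here is a small sketch in plain Java. It assumes a newline-delimited text protocol purely for illustration (not the framing of any particular PLC protocol); FrameCountDemo and decode are hypothetical names. Two responses arrive in three network chunks, so counting chunks instead of decoded frames would free one slot too many:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative only: assumes newline-terminated frames, which is not taken from the original question.
class FrameCountDemo {

    // Reassembles raw chunks into complete frames, roughly what a line-based decoder does.
    static List<String> decode(List<String> chunks) {
        List<String> frames = new ArrayList<>();
        StringBuilder buffer = new StringBuilder();
        for (String chunk : chunks) {
            buffer.append(chunk);
            int newline;
            while ((newline = buffer.indexOf("\n")) >= 0) {
                frames.add(buffer.substring(0, newline));
                buffer.delete(0, newline + 1); // drop the frame and its delimiter
            }
        }
        return frames;
    }

    public static void main(String[] args) {
        // Two responses, split across three chunks by the transport
        List<String> chunks = Arrays.asList("RE:", " 1\nRE: 2", "\n");
        System.out.println("chunks seen before the decoder: " + chunks.size());        // 3
        System.out.println("frames seen after the decoder: " + decode(chunks).size()); // 2
    }
}
```

A handler placed before the decoder would see three reads here and release three slots, although only two requests were actually confirmed.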

Of course, you will also want to verify that the handler works. This can be done with a unit test using EmbeddedChannel and JUnit:

import io.netty.channel.embedded.EmbeddedChannel;
import org.junit.Assert;
import org.junit.Test;

public class MaxPendingRequestHandlerTest {

    @Test
    public void testMaxPending() {
        EmbeddedChannel channel = new EmbeddedChannel(new MaxPendingRequestHandler(3));

        // channel.writeOutbound("1", "2", "3"); // This doesn't work as it expects operations to complete directly

        channel.write("1");
        channel.write("2");
        channel.write("3");
        channel.write("4");
        channel.write("5");
        channel.write("6");

        // Only the first three messages fit into the window
        Assert.assertEquals("1", channel.readOutbound());
        Assert.assertEquals("2", channel.readOutbound());
        Assert.assertEquals("3", channel.readOutbound());
        Assert.assertNull(channel.readOutbound());
    }

    @Test
    public void testMaxPendingWhenAResponseHasReceived() {
        EmbeddedChannel channel = new EmbeddedChannel(new MaxPendingRequestHandler(3));

        // channel.writeOutbound("1", "2", "3"); // This doesn't work as it expects operations to complete directly

        channel.write("1");
        channel.write("2");
        channel.write("3");
        channel.write("4");
        channel.write("5");
        channel.write("6");

        channel.writeInbound("RE: 1");

        // One response frees one slot, so the fourth message is released
        Assert.assertEquals("1", channel.readOutbound());
        Assert.assertEquals("2", channel.readOutbound());
        Assert.assertEquals("3", channel.readOutbound());
        Assert.assertEquals("4", channel.readOutbound());
        Assert.assertNull(channel.readOutbound());
    }

    @Test
    public void testMaxPendingWhenAllResponseHasReceived() {
        EmbeddedChannel channel = new EmbeddedChannel(new MaxPendingRequestHandler(3));

        // channel.writeOutbound("1", "2", "3"); // This doesn't work as it expects operations to complete directly

        channel.write("1");
        channel.write("2");
        channel.write("3");
        channel.write("4");
        channel.write("5");
        channel.write("6");

        channel.writeInbound("RE: 1");
        channel.writeInbound("RE: 2");
        channel.writeInbound("RE: 3");
        channel.writeInbound("RE: 4");
        channel.writeInbound("RE: 5");
        channel.writeInbound("RE: 6");

        // All six messages have been released once every response has arrived
        Assert.assertEquals("1", channel.readOutbound());
        Assert.assertEquals("2", channel.readOutbound());
        Assert.assertEquals("3", channel.readOutbound());
        Assert.assertEquals("4", channel.readOutbound());
        Assert.assertEquals("5", channel.readOutbound());
        Assert.assertEquals("6", channel.readOutbound());
        Assert.assertNull(channel.readOutbound());
    }

    @Test
    public void testMaxPendingWhenAllResponseHasReceivedAndNewMessagesAreSend() {
        EmbeddedChannel channel = new EmbeddedChannel(new MaxPendingRequestHandler(3));

        // channel.writeOutbound("1", "2", "3"); // This doesn't work as it expects operations to complete directly

        channel.write("1");
        channel.write("2");
        channel.write("3");
        channel.write("4");
        channel.write("5");
        channel.write("6");

        channel.writeInbound("RE: 1");
        channel.writeInbound("RE: 2");
        channel.writeInbound("RE: 3");
        channel.writeInbound("RE: 4");
        channel.writeInbound("RE: 5");
        channel.writeInbound("RE: 6");

        channel.write("7");
        channel.write("8");
        channel.write("9");
        channel.write("10");

        // The window is empty again, so "7" to "9" go out and "10" stays queued
        Assert.assertEquals("1", channel.readOutbound());
        Assert.assertEquals("2", channel.readOutbound());
        Assert.assertEquals("3", channel.readOutbound());
        Assert.assertEquals("4", channel.readOutbound());
        Assert.assertEquals("5", channel.readOutbound());
        Assert.assertEquals("6", channel.readOutbound());
        Assert.assertEquals("7", channel.readOutbound());
        Assert.assertEquals("8", channel.readOutbound());
        Assert.assertEquals("9", channel.readOutbound());
        Assert.assertNull(channel.readOutbound());
    }

}