Java NIO: How can I read into a buffer possibly multiple times?


I am working on a Java project. The code below is part of my Selector's while loop.

The response is supposed to be shorter than 256 bytes, and it ends with a line break. But as far as I understand, it is possible (even though I haven't observed it, as the message is pretty small) that only part of the message is available when OP_READ fires.

As you can see, in the event of a read I'm allocating a Buffer, reading it and converting to a String.

What's the appropriate way to handle a partial message? Ideally, I'd like to wait for the whole message to be ready before reading it. A naive way would be to have a Map<SocketChannel, Buffer> and read until the connection is closed.

if (key.isReadable()) {
    SocketChannel socketChannel = (SocketChannel) key.channel();

    // Read the response from the server (a fresh buffer on every OP_READ)
    ByteBuffer buffer = ByteBuffer.allocate(256);
    int bytesRead = socketChannel.read(buffer);

    if (bytesRead == -1) {
        // The server closed the connection (closing the channel also cancels its key)
        socketChannel.close();
        key.cancel();
        channels.remove(socketChannel);
    } else {
        buffer.flip();
        byte[] responseBytes = new byte[buffer.remaining()];
        buffer.get(responseBytes);
        // Decode explicitly instead of relying on the platform default charset
        String response = new String(responseBytes, StandardCharsets.UTF_8);
        if (response.endsWith("\n")) {
            // Complete response: we are done with this connection
            socketChannel.close();
            key.cancel();
            channels.remove(socketChannel);
            responses.add(response);
            System.out.println(response);
        } else {
            // Partial response: these bytes are lost once 'buffer' goes out of scope
            System.out.println("need to read more!!!!");
        }
    }
}

Answer

What's the appropriate way to handle a partial message?

Yes, indeed, that can happen, and async systems need to handle this notion of 'we are half done'.

As a broad overview, well, you said it: Buffers.

If a message has been half received, the data you have received so far simply has to be stored someplace. If you don't use async and use threads instead, all that 'state' (such as 'half a message') is, effectively, stored automatically on the stack. With async you have to manage all of it by hand.

And you probably should use threads. The performance improvements of async are virtually always vastly smaller than fanboy async blogs tend to peddle, and Project Loom (the OpenJDK project that works on new language features and libraries to improve threading in the JDK) has added virtual threads, which close whatever little distance remained. Virtual threads were previewed in JDK 19 and 20 and are final as of JDK 21.

The intended place to store that stuff is in buffers: ByteBuffer and friends are designed to let you do it efficiently.
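A minimal sketch of that idea for the question's newline-terminated messages, assuming one instance per connection (kept, for example, as the `SelectionKey` attachment via `key.attach(...)` or the third argument of `register`, or in the `Map<SocketChannel, ...>` the question mentions). Each OP_READ appends whatever has arrived to the same `ByteBuffer`; complete lines are cut out, and the leftover partial line is moved to the front with `compact()`. The class `LineAccumulator` and its methods are hypothetical names, and the "line too long" error is one assumed policy among several.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class LineAccumulator {
    // Stays in "write mode" between calls: [0, position) holds the bytes
    // received so far that are not yet part of a complete line.
    private final ByteBuffer buf;

    public LineAccumulator(int capacity) {
        buf = ByteBuffer.allocate(capacity);
    }

    // Call on every OP_READ. Returns -1 at end of stream, else the bytes read.
    public int readFrom(ReadableByteChannel ch) throws IOException {
        if (!buf.hasRemaining()) {
            // Assumed policy: a line that cannot fit in the buffer is an error.
            throw new IOException("line longer than " + buf.capacity() + " bytes");
        }
        return ch.read(buf);
    }

    // Removes and returns every complete '\n'-terminated line received so far.
    public List<String> pollLines() {
        List<String> lines = new ArrayList<>();
        buf.flip(); // read mode: [0, limit) is everything buffered
        int start = 0;
        for (int i = 0; i < buf.limit(); i++) {
            if (buf.get(i) == '\n') {
                // allocate() gives a heap buffer with array offset 0
                lines.add(new String(buf.array(), start, i - start, StandardCharsets.UTF_8));
                start = i + 1;
            }
        }
        buf.position(start); // skip the consumed lines
        buf.compact();       // move the partial line (if any) to the front
        return lines;
    }
}
```

In the question's `if (key.isReadable())` branch, you would call `readFrom` on the channel, close it on `-1`, and process whatever `pollLines()` returns; an empty list simply means "wait for the next OP_READ".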

By way of a chat app, here's how this is all supposed to work using low-level async NIO primitives: ByteBuffer, channels, and selectors.

Situation: You are a chat system that lets 20 people chat together in a group. All 20 clients connect once and maintain a long-running connection; clients can send a message 'SEND stuff to show to the other 19 participants goes here', and that would result in the other 19 connections getting some data transferred from server to those 19 clients with 'SHOW stuff to show to the other 19 participants goes here'.
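The toy protocol itself fits in a few lines. This is a sketch assuming each command is one UTF-8 line and that anything not starting with `SEND ` is ignored (the answer doesn't say what happens to other input); `ChatProtocol.toShow` is a hypothetical name.

```java
import java.util.Optional;

public final class ChatProtocol {
    private static final String SEND = "SEND ";
    private static final String SHOW = "SHOW ";

    // Turns one complete client line ("SEND text", without its '\n') into the
    // line broadcast to everyone else ("SHOW text\n").
    // Returns empty for lines this sketch doesn't understand.
    public static Optional<String> toShow(String line) {
        if (!line.startsWith(SEND)) {
            return Optional.empty();
        }
        return Optional.of(SHOW + line.substring(SEND.length()) + "\n");
    }

    private ChatProtocol() {}
}
```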

Our current state is that all 20 connections have sent all they need to send; they are now all waiting for somebody to type something.

These 20 connections are all configured the same way:

  • You register with your NIO selector that you are currently NOT interested in sending data (because there's nothing to send), but you ARE interested in reading data. You register this for all 20 connections, then tell your selector to go to sleep and wake you up if something 'interesting' happens, interesting defined as: there's something to read. Now your system goes to sleep, because there really is nothing to do.
  • Jane types something, so the selector wakes up the thread and says: here you go, this socket (Jane's socket) is now capable of doing something you told me you were interested in: you can read from it.
  • You ask that socket to fill your buffer. The buffer is filled with 'SEND Hello ever' and that's it. Jane clearly isn't done typing yet - the data doesn't end in a newline.
  • You... just go back to sleep. There is no need to change the list of 'events you are interested in': you are still interested in whether any of the 20 sockets has data to read, and you are still not interested in whether any of them can accept writes. That is very important: if you say you are interested in writability, the selector wakes up continuously and your CPU goes to 100%, because all 20 sockets are capable of accepting data (we don't care, because there still isn't anything to send). Jane is still typing (or she's finished but her network connection is rather slow), so the selector goes back to sleep because none of the things you are interested in is available. Jane's socket has nothing to read; you read all there is so far. That half-baked message is hanging out in a ByteBuffer.
  • Eventually your NIO selector wakes up again and tells you that, again, Jane's socket has more to read. Your code hands that socket the same ByteBuffer (its position is partway in, as 'SEND Hello ever' is already in it) and asks it to fill it as far as it can. The socket writes 'ybody, how's li' into it, and we do this song and dance routine a second time.
  • The NIO selector wakes up a third time, you do the same thing, and the socket writes 'fe!\n' into the buffer and is done. As the two times before, you check whether your buffer contains at least one whole message, and this time it really does (you see that newline and you know what it means).
  • You now update the selector's 'what am I interested in' list (the interest set of each selection key): you are now interested in 'ready to write' for 19 of the 20 sockets. Let's say in this system Jane can't send any more data until this is out (I wouldn't really design it that way, but for the example's sake), so you update Jane's socket to say you aren't interested in anything, not even if there's more data to read. That data will just stay in the network buffer.
  • You tell your selector to sleep. That call finishes instantly because there are events you are interested in, namely: all those 19 sockets are ready to send and you said you were interested in that.
  • You go through your list of 'stuff I was interested in that is ready to go' and start sending 'SHOW Hello everybody, how's life!\n'. However, while the first socket takes that whole thing in one go, the second socket only gets 'SHOW Hello everybod' out the door before the write returns; it cannot accept any more right now (for some reason this person is on a teensy tiny network buffer, who knows). You do the same thing as with reading: 'y, how's life!\n' stays in a ByteBuffer, and you still need to send that to socket 2.
  • You remain interested in socket 2's 'ready to write' event. Eventually you get through all 19 sockets, and for two of them you weren't able to get the whole message out. For those 2 you remain interested in 'ready to write'; for the other 17 you clear that interest (you are NOT interested in whether they are ready to write). Eventually the selector wakes you again because those 2 are ready for more, and you eventually get all the data out. Somewhere in this process you re-enable interest in reading from them, in case somebody types while not everything is out; in a good chat app, read interest stays enabled throughout.
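The walkthrough above can be condensed into one single-threaded selector loop. This is a minimal sketch under assumptions the answer doesn't make: UTF-8 lines of at most 1024 bytes, a fixed 64 KiB outbound buffer per connection, port 5000, and the "good chat app" variant in which read interest stays on the whole time (so Jane is not frozen while her message goes out). When a slow peer's outbound buffer is full, it simply drops the message, which is just one of the policy choices the answer leaves open. `Conn`, `serve`, `broadcast`, `onReadable` and `onWritable` are hypothetical names.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

public class ChatServer {
    // Per-connection state, kept as the SelectionKey attachment.
    // Both buffers stay in "write mode": [0, position) is pending data.
    static final class Conn {
        final ByteBuffer in = ByteBuffer.allocate(1024);       // half-received line(s)
        final ByteBuffer out = ByteBuffer.allocate(64 * 1024); // bytes still to send
    }

    public static void main(String[] args) throws IOException {
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress(5000)); // arbitrary port for this sketch
        serve(server);
    }

    static void serve(ServerSocketChannel server) throws IOException {
        Selector selector = Selector.open();
        server.configureBlocking(false);
        server.register(selector, SelectionKey.OP_ACCEPT);
        while (true) {
            selector.select(); // sleeps until some key's interest set is ready
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                if (!key.isValid()) continue;
                try {
                    if (key.isAcceptable()) {
                        SocketChannel ch = server.accept();
                        if (ch == null) continue;
                        ch.configureBlocking(false);
                        // Read interest only: asking for OP_WRITE now would make select() spin.
                        ch.register(selector, SelectionKey.OP_READ, new Conn());
                        continue;
                    }
                    if (key.isReadable()) onReadable(key, selector);
                    if (key.isValid() && key.isWritable()) onWritable(key);
                } catch (IOException e) {
                    key.cancel();
                    key.channel().close(); // peer vanished: drop it
                }
            }
        }
    }

    static void onReadable(SelectionKey key, Selector selector) throws IOException {
        SocketChannel ch = (SocketChannel) key.channel();
        Conn c = (Conn) key.attachment();
        if (!c.in.hasRemaining() || ch.read(c.in) == -1) {
            // Peer closed, or a line longer than the buffer: drop the connection.
            key.cancel();
            ch.close();
            return;
        }
        c.in.flip();
        int start = 0;
        for (int i = 0; i < c.in.limit(); i++) {
            if (c.in.get(i) != '\n') continue;
            String line = new String(c.in.array(), start, i - start, StandardCharsets.UTF_8);
            start = i + 1;
            if (line.startsWith("SEND ")) broadcast(selector, key, "SHOW " + line.substring(5) + "\n");
        }
        c.in.position(start);
        c.in.compact(); // keep the half-received line for the next OP_READ
    }

    static void broadcast(Selector selector, SelectionKey from, String msg) {
        byte[] bytes = msg.getBytes(StandardCharsets.UTF_8);
        for (SelectionKey k : selector.keys()) {
            if (k == from || !k.isValid() || !(k.attachment() instanceof Conn c)) continue;
            if (c.out.remaining() < bytes.length) continue; // full: this sketch drops it
            c.out.put(bytes);
            k.interestOps(k.interestOps() | SelectionKey.OP_WRITE); // now something to send
        }
    }

    static void onWritable(SelectionKey key) throws IOException {
        Conn c = (Conn) key.attachment();
        c.out.flip();
        ((SocketChannel) key.channel()).write(c.out); // may send only part
        c.out.compact(); // unsent tail moves to the front
        if (c.out.position() == 0) {
            // All sent: drop OP_WRITE interest so select() can sleep again.
            key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
        }
    }
}
```

The pattern-matching `instanceof` requires JDK 16 or later. Note how OP_WRITE is switched on only while a connection's outbound buffer is non-empty, which is exactly the "don't ask for writability you can't use" rule from the walkthrough.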

All sorts of really hairy situations occur here, and you have to handle all of them. For example, imagine people are typing so frantically, and Jane's network connection is so flaky, that folks are producing data faster than Jane's network can ever carry it. The ByteBuffer holding everything that still has to be sent to Jane just grows and grows: every time somebody types a message, it is appended to every other connection's 'stuff to send' buffer. From time to time those buffers are 'spooled': the data still left to send is moved to the front so there's room to append more at the end (this is what ByteBuffer.compact() does).
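The "spooling" step in isolation: a sketch of one outbound buffer per connection, where messages are appended with `put`, a write may take only part of the pending bytes, and `compact()` slides the unsent remainder to the front so new messages can be appended behind it. `OutboundSpool` is a hypothetical name, the 64-byte capacity is arbitrary, and `TinyChannel`, which accepts at most a fixed number of bytes per `write`, is a hypothetical stand-in for a congested socket.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;

public class OutboundSpool {
    // Hypothetical channel that accepts at most `perWrite` bytes per call,
    // standing in for a socket whose send buffer is nearly full.
    static final class TinyChannel implements WritableByteChannel {
        final ByteArrayOutputStream received = new ByteArrayOutputStream();
        final int perWrite;

        TinyChannel(int perWrite) { this.perWrite = perWrite; }

        @Override
        public int write(ByteBuffer src) {
            int n = Math.min(perWrite, src.remaining());
            byte[] chunk = new byte[n];
            src.get(chunk);
            received.write(chunk, 0, n);
            return n;
        }

        @Override public boolean isOpen() { return true; }
        @Override public void close() {}
    }

    // Kept in "write mode": bytes [0, position) are still waiting to be sent.
    final ByteBuffer pending = ByteBuffer.allocate(64);

    // Appends a message; throws BufferOverflowException if it doesn't fit.
    void enqueue(String msg) {
        pending.put(msg.getBytes(StandardCharsets.UTF_8));
    }

    // One OP_WRITE's worth of work. Returns true once nothing is left to send.
    boolean flushOnce(WritableByteChannel ch) throws IOException {
        pending.flip();     // read mode: [position, limit) is the unsent data
        ch.write(pending);  // may send only a prefix
        pending.compact();  // slide the unsent tail to index 0, back to write mode
        return pending.position() == 0;
    }
}
```

With a `TinyChannel(19)`, the first `flushOnce` after enqueueing "SHOW Hello everybody, how's life!\n" sends exactly "SHOW Hello everybod", mirroring socket 2 in the walkthrough; a later message can be enqueued right behind the unsent "y, how's life!\n".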

Eventually Jane's buffer is simply full. You can, of course, write code to grow it (make a new, larger ByteBuffer, copy the data from the smaller one into it, and replace the entry in the hashmap with the new, larger ByteBuffer), but if you do that, you eventually end up with a 'to send' ByteBuffer that is 1 gigabyte large. At some point you have to decide. It's your app; there is no one standard answer. Do you want to prevent the other 19 from typing any more until there's enough room in Jane's buffer? Do you want to disconnect Jane? Do you want to stop putting messages in Jane's buffer (once her network catches up she'll see NEW messages, but the skipped ones would be lost to her)? Do you want to cache them to disk? Do you want to drop Jane and tell the others that the system is doing that? Endless answers; you write code to do the thing you feel is right for your application.
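One of those policies as a sketch: grow the outbound buffer by doubling (copy into a larger `ByteBuffer`, as described above) up to a hard cap, and report failure past the cap so the caller can decide whether to disconnect Jane, drop the message, or something else. `BoundedOutbox`, `offer`, and the idea of a fixed cap are assumptions for illustration, not a recommendation.

```java
import java.nio.ByteBuffer;

public class BoundedOutbox {
    private final int maxCapacity;
    private ByteBuffer buf; // write mode: [0, position) is unsent data

    public BoundedOutbox(int initialCapacity, int maxCapacity) {
        if (initialCapacity < 1 || maxCapacity < initialCapacity) {
            throw new IllegalArgumentException("need 1 <= initial <= max");
        }
        this.buf = ByteBuffer.allocate(initialCapacity);
        this.maxCapacity = maxCapacity;
    }

    // Appends msg, growing the buffer if needed. Returns false (and appends
    // nothing) if that would exceed maxCapacity; the caller picks the policy.
    public boolean offer(byte[] msg) {
        int needed = buf.position() + msg.length;
        if (needed > buf.capacity()) {
            if (needed > maxCapacity) {
                return false;
            }
            int newCap = buf.capacity();
            while (newCap < needed) {
                // long arithmetic avoids int overflow when doubling
                newCap = (int) Math.min((long) newCap * 2, maxCapacity);
            }
            ByteBuffer bigger = ByteBuffer.allocate(newCap);
            buf.flip();       // read mode over the unsent bytes
            bigger.put(buf);  // copy them into the larger buffer
            buf = bigger;     // the old, smaller buffer becomes garbage
        }
        buf.put(msg);
        return true;
    }

    public int capacity() { return buf.capacity(); }
    public int pending() { return buf.position(); }
}
```

For example, starting at 8 bytes with a cap of 32, two 10-byte messages grow the buffer to 16 and then 32, and a further 20-byte message is refused with `false`.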

If that sounds incredibly complicated, that's because it is! The advantage is that a single thread can do the reading and writing for all 20 connections at once, though the performance gain is minimal and probably negative. A thread is not some dangerous boogeyman: 20 threads is nothing, and the overhead of them, as well as of switching between them, is infinitesimal. The NIO setup above has overhead too: all those buffers.
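For comparison, a sketch of the thread-per-connection style, assuming JDK 21 or later, where `Executors.newVirtualThreadPerTaskExecutor()` is a final (non-preview) API. `BufferedReader.readLine()` blocks until a whole line has arrived, so the half-received message lives inside the reader and on that thread's stack rather than in a buffer you track yourself. `BlockingLineServer`, `serve`, port 5000 and the `ECHO` reply are hypothetical stand-ins for real chat logic.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BlockingLineServer {
    // Reads whole lines and answers each one. A half-received line sits
    // inside BufferedReader; readLine() blocks until '\n' or end of stream.
    static void serve(InputStream rawIn, OutputStream rawOut) throws IOException {
        BufferedReader in = new BufferedReader(new InputStreamReader(rawIn, StandardCharsets.UTF_8));
        Writer out = new OutputStreamWriter(rawOut, StandardCharsets.UTF_8);
        String line;
        while ((line = in.readLine()) != null) {
            out.write("ECHO " + line + "\n");
            out.flush();
        }
    }

    public static void main(String[] args) throws IOException {
        // One cheap virtual thread per connection; port 5000 is arbitrary.
        try (ServerSocket server = new ServerSocket(5000);
             ExecutorService pool = Executors.newVirtualThreadPerTaskExecutor()) {
            while (true) {
                Socket client = server.accept();
                pool.submit(() -> {
                    try (client) {
                        serve(client.getInputStream(), client.getOutputStream());
                    } catch (IOException e) {
                        // Connection dropped; this virtual thread simply ends.
                    }
                });
            }
        }
    }
}
```

There are no interest sets, no flip/compact bookkeeping, and no per-connection map: the control flow of each connection is ordinary sequential code.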

There are various frameworks like Grizzly that should make it easier to write apps like this.