Java Selector for socket client is not woken up after interest ops are changed from a different thread

I use a Java Selector for both the server and the client. On the server side it works perfectly: the thread blocks when I call select() and wakes up when I change the interest ops and the channel is ready for that operation.

Unfortunately, it does not work the same way for the socket client. The thread blocks and does not wake up for reading or writing when I change the interestOps.

Creation of client connection:

selector = Selector.open()
SocketChannel.open().apply {
    configureBlocking(false)
    connect(address)
    val key = register(selector, SelectionKey.OP_READ or SelectionKey.OP_CONNECT)
    val connection = ClientConnection(key) // Some stuff to hold the key for events
    key.attach(connection)
}

Handling selection inside the while loop:

val readyChannels = selector.select()
if (readyChannels == 0) continue

val keyIterator = selector.selectedKeys().iterator()
while (keyIterator.hasNext()) {
    val key = keyIterator.next()
    when (key.readyOps()) {
        SelectionKey.OP_CONNECT -> {
            val socket = (key.channel() as SocketChannel)
            socket.finishConnect()
            key.interestOps(key.interestOps() and SelectionKey.OP_CONNECT.inv())
       
            // WORKS FINE!!!!!
            key.interestOps(key.interestOps() and SelectionKey.OP_WRITE)

            // Does not work at all. Selector will not wake up!
            Thread(){
                key.interestOps(key.interestOps() and SelectionKey.OP_WRITE)  
            }.start()
        }
        SelectionKey.OP_READ -> readPackets(key)
        SelectionKey.OP_WRITE -> writePackets(key)
        SelectionKey.OP_READ or SelectionKey.OP_WRITE -> {
            writePackets(key)
            readPackets(key)
        }
    }

    keyIterator.remove()
}

So changing the interestOps from a different thread does not work for client sockets, but it works fine for server sockets.

Found solutions:

  • selector.select(300) -> use a timeout so the selector wakes up periodically
  • selector.selectNow() -> use the non-blocking method and check the count of events
  • selector.wakeup() -> keep a reference to the selector and wake it up manually

The question is: why does it not work? Did I make a mistake? Did I miss something?

UPD: Server-side socket and selector

Creation of the server socket:

selector = Selector.open()
serverSocket = ServerSocketChannel.open().apply {
    socket().bind(address)
    configureBlocking(false)
    register(selector, SelectionKey.OP_ACCEPT)
}

Iterating the selector inside the loop:

val readyChannels = selector.select()
if (readyChannels == 0) continue
            
val keyIterator = selector.selectedKeys().iterator()
while (keyIterator.hasNext()) {
    val key = keyIterator.next()
    when (key.readyOps()) {
        SelectionKey.OP_ACCEPT -> {
            val socket = serverSocket.accept().apply {
                configureBlocking(false)
            }
            val client = clientFactory.createClient(selector, socket)
            // Emitted to coroutines running in another thread context;
            // interestOps are changed there to send the first data
            _selectionAcceptFlow.tryEmit(client)
        }
        SelectionKey.OP_READ -> readPackets(key)
        SelectionKey.OP_WRITE -> writePackets(key)
        SelectionKey.OP_READ or SelectionKey.OP_WRITE -> {
            writePackets(key)
            readPackets(key)
        }
    }
            
    keyIterator.remove()
}

There is 1 best solution below

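If you call key.interestOps(...) from a separate thread, you create a race condition between that call and the call to selector.select() in the client loop. The Selector documentation states that changes to a key's interest set made while a selection operation is in progress have no effect on that operation; they are only seen by the next one.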

Your initial call to register does not contain SelectionKey.OP_WRITE. The first event triggered will be SelectionKey.OP_CONNECT. When handling that event, you indicate that in the future you are also interested in processing OP_WRITE.

  • If you do that in the same thread, then you are guaranteed that the interestOps are set the way you want them before the client loop reaches the call to selector.select(). If there is an OP_WRITE event available, you will process it immediately; otherwise the call blocks until one is available.
  • If you do that in a separate thread, then, depending on timing, you may run into a case where the client loop reaches the call to selector.select() and blocks even though there is an OP_WRITE event available. Since the separate thread has not yet changed the interestOps, the OP_WRITE event is ignored.
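
For the separate-thread case, one way to make the change visible is the selector.wakeup() option already listed in the question: set the interest ops, then wake the selector so that its next select() picks up the new interest set. Below is a minimal sketch of that pattern, not the asker's actual code. The function name writeSeenAfterWakeup, the loopback server on an ephemeral port, and the 100 ms sleep are assumptions made only for illustration.

```kotlin
import java.net.InetSocketAddress
import java.nio.channels.SelectionKey
import java.nio.channels.Selector
import java.nio.channels.ServerSocketChannel
import java.nio.channels.SocketChannel
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

// Hypothetical helper: returns true if the selector thread observed OP_WRITE
// after another thread changed the interest set and called wakeup().
fun writeSeenAfterWakeup(): Boolean {
    // Loopback server on an ephemeral port; it only exists so the client can connect.
    val server = ServerSocketChannel.open().apply { bind(InetSocketAddress("localhost", 0)) }
    val selector = Selector.open()
    // Connect in blocking mode for brevity, then switch to non-blocking for the selector.
    val client = SocketChannel.open(server.localAddress).apply { configureBlocking(false) }
    // Empty interest set, so select() has nothing to report and will block.
    val key = client.register(selector, 0)

    val sawWrite = CountDownLatch(1)
    Thread {
        while (selector.isOpen && sawWrite.count > 0) {
            selector.select() // returns when a key is ready or when wakeup() is called
            val keys = selector.selectedKeys().iterator()
            while (keys.hasNext()) {
                val selected = keys.next()
                keys.remove()
                if (selected.isWritable) {
                    selected.interestOps(0) // stop listening for OP_WRITE again
                    sawWrite.countDown()
                }
            }
        }
    }.apply { isDaemon = true; start() }

    // Crude way (assumption) to let the selector thread reach select() first.
    Thread.sleep(100)

    // This runs on a different thread than the selector loop:
    // change the interest set, then wake the selector so it selects again.
    key.interestOps(SelectionKey.OP_WRITE)
    selector.wakeup()

    val seen = sawWrite.await(2, TimeUnit.SECONDS)
    selector.close()
    client.close()
    server.close()
    return seen
}

fun main() {
    println("OP_WRITE seen after wakeup: ${writeSeenAfterWakeup()}")
}
```

If no selection is in progress when wakeup() is called, the next select() returns immediately, so the order in which the two threads run does not matter with this pattern.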

I've included a self-contained example (client sending a message to server).
To test different cases, you can comment/uncomment the sections in the client's OP_CONNECT handler (around line 90).

import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.channels.SelectionKey
import java.nio.channels.Selector
import java.nio.channels.ServerSocketChannel
import java.nio.channels.SocketChannel
import java.util.concurrent.CountDownLatch


val address = InetSocketAddress("localhost", 5454)

fun main() {
    val serverSocketSignal = CountDownLatch(1)

    Thread {
        startServer(serverSocketSignal)
    }.start()

    Thread {
        startClient(serverSocketSignal)
    }.start()
}

fun startServer(serverSocketSignal: CountDownLatch) {
    //prepare server socket
    val selector = Selector.open()
    val serverSocket = ServerSocketChannel.open().apply {
        socket().bind(address)
        configureBlocking(false)
        register(selector, SelectionKey.OP_ACCEPT)
    }
    serverSocketSignal.countDown()

    //run server loop
    while (true) {
        println("Server loop")
        val readyChannels = selector.select()
        if (readyChannels == 0) continue

        val keyIterator = selector.selectedKeys().iterator()
        while (keyIterator.hasNext()) {
            val key = keyIterator.next()
            when (key.readyOps()) {
                SelectionKey.OP_ACCEPT -> {
                    println("Server ACCEPT")
                    val socket = serverSocket.accept().apply {
                        configureBlocking(false)
                    }
                    socket.register(selector, SelectionKey.OP_READ)
                }
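                SelectionKey.OP_READ -> {
                    val buffer = ByteBuffer.allocate(1024)
                    val count = (key.channel() as SocketChannel).read(buffer)
                    if (count < 0) {
                        // Peer closed the connection; closing the channel also cancels the key
                        key.channel().close()
                    } else {
                        val message = String(buffer.array(), 0, count)
                        println("Server READ - $message")
                    }
                }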
            }
            keyIterator.remove()
        }
    }
}

fun startClient(serverSocketSignal: CountDownLatch) {
    serverSocketSignal.await()

    //prepare client socket
    val selector = Selector.open()
    SocketChannel.open().apply {
        configureBlocking(false)
        connect(address)
        register(selector, SelectionKey.OP_CONNECT or SelectionKey.OP_READ)
    }

    //run client loop
    while (true) {
        println("Client loop")
        val readyChannels = selector.select()
        if (readyChannels == 0) continue

        val keyIterator = selector.selectedKeys().iterator()
        while (keyIterator.hasNext()) {
            val key = keyIterator.next()
            when (key.readyOps()) {
                SelectionKey.OP_CONNECT -> {
                    println("Client CONNECT")
                    val socket = (key.channel() as SocketChannel)
                    socket.finishConnect()
                    key.interestOpsAnd(SelectionKey.OP_CONNECT.inv())

                    // This works: the interest set is replaced with OP_WRITE on the selector thread.
                    key.interestOps(SelectionKey.OP_WRITE)

                    // This doesn't work: we're AND-ing the interestOps with OP_WRITE, and OP_WRITE
                    // was not specified when calling register(), so the result is 0.
//                    key.interestOpsAnd(SelectionKey.OP_WRITE)

                    // This may or may not work, depending on which thread gets executed first:
                    // - it works if interestOps=OP_WRITE is set in the new thread before
                    //   selector.select() is called in the client loop
                    // - it does not work if selector.select() in the client loop is called before
                    //   interestOps=OP_WRITE is set in the new thread, since there is nothing to
                    //   process and selector.select() blocks
                    // On my machine, pausing the client loop even briefly was enough to change
                    // the result (e.g. the Thread.sleep(1) below).
//                    Thread {
//                        println("Client setting interestedOps to OP_WRITE from new thread")
//                        key.interestOps(SelectionKey.OP_WRITE)
//                    }.start()
//                    //Thread.sleep(1)
                }
                SelectionKey.OP_WRITE -> {
                    println("Client WRITE")
                    val buffer = ByteBuffer.wrap("test message from client".toByteArray())
                    (key.channel() as SocketChannel).write(buffer)
                    key.interestOps(0)
                }

            }
            keyIterator.remove()
        }
    }
}

As for why it works for you on the server side: you would have to share the full code for the server and client (it might be a timing issue, or your selector might be woken up by events you did not intend to listen for). The snippets provided in the question do not contain enough information.
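
On the AND-ing case flagged in the example's comments: the interest set is a plain bit mask, so the effect can be checked without any sockets. The sketch below only does the arithmetic; the variable names are made up for illustration. To add an op to the existing set you would OR it in (since Java 11, SelectionKey also has interestOpsOr next to the interestOpsAnd used above).

```kotlin
import java.nio.channels.SelectionKey

// Interest set as registered in the question: OP_READ and OP_CONNECT, no OP_WRITE.
val registered = SelectionKey.OP_READ or SelectionKey.OP_CONNECT

// AND keeps only the bits present on both sides; OP_WRITE is not in `registered`,
// so nothing survives and the key ends up with an empty interest set.
val anded = registered and SelectionKey.OP_WRITE

// OR adds the OP_WRITE bit while keeping what was already there.
val ored = registered or SelectionKey.OP_WRITE

// AND with the inverse clears a single bit; this is how OP_CONNECT is dropped
// after finishConnect().
val withoutConnect = ored and SelectionKey.OP_CONNECT.inv()

fun main() {
    println("and: $anded")                        // prints "and: 0"
    println("or: $ored")                          // prints "or: 13"
    println("without connect: $withoutConnect")   // prints "without connect: 5"
}
```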