Assertion error polling from inside a new thread in ZeroMQ (JeroMQ)

655 Views Asked by At

I have code that looks like this:

public void handleRequests() {
    ZMQ.Poller items = new ZMQ.Poller(1);
    items.register(clientEndpoint, ZMQ.Poller.POLLIN);
    while (!Thread.currentThread().isInterrupted()) {
        byte[] message;
        items.poll();  // this is the line that throws exception.
        if (items.pollin(0)) {
            message = clientEndpoint.recv(0);
        }
    }
}

It works fine when i call it directly:

foo.handleRequests();

but it fails regularly with assertion errors if it is run in a new thread:

final Runnable listener = worldviewServer::handleRequests;
Executors.newSingleThreadExecutor().execute(listener);

The stack trace I get is shown below:

Exception in thread "pool-6-thread-1" java.lang.AssertionError
at zmq.Mailbox.recv(Mailbox.java:113)
at zmq.SocketBase.process_commands(SocketBase.java:820)
at zmq.SocketBase.getsockopt(SocketBase.java:258)
at zmq.PollItem.readyOps(PollItem.java:107)
at zmq.ZMQ.zmq_poll(ZMQ.java:708)
at zmq.ZMQ.zmq_poll(ZMQ.java:600)
at org.zeromq.ZMQ$Poller.poll(ZMQ.java:1618)
at org.zeromq.ZMQ$Poller.poll(ZMQ.java:1592)
at com.tracelink.worldview.server.Head.handleRequests(Head.java:68)
at com.tracelink.worldview.server.WorldviewServer.handleRequests(WorldviewServer.java:236)
at com.tracelink.worldview.server.fsm.EnablingAction$$Lambda$12/404648734.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

I'm using Java 8 with JeroMQ 0.3.5-SNAPSHOT

1

There are 1 best solutions below

2
On

ZMQ sockets are not threadsafe. You have some limited ability to create a socket in one thread and then use it in another, but I'm guessing that the unseen code will branch off into multiple threads all attempting to use the socket at the same time. That's a ZMQ no-no. Generally, you should be creating sockets in the threads they'll be used in.

ZMQ contexts are threadsafe.