JeroMQ: connection does not recover reliably

484 Views Asked by At

I have two applications that send messages asynchronously in both directions. Both sides use sockets of type ZMQ.DEALER. The connection status is additionally monitored via heartbeating.

I now have problems getting the connection to recover reliably after connection problems (a line failure or an application restart on one side). When I restart the application on the server side (the side doing the bind()), the client side does not always reconnect successfully and then needs to be restarted, especially when the local buffer has reached the HWM limit.

The only way I found to make connection recovery reliable is to reset the complete ZMQ.Context whenever heartbeats fail or send() returns false: I call Context.term() and then create the Context and Socket again. This seemed to work fine in my tests, but I have now observed occasional hangups inside Context.term(), which are rare and hard to reproduce. I know that the Context should be created just once at application startup, but, as said, I found no other way to re-establish a broken connection.

I am using JeroMQ 0.3.4. The source of a test application is below, ~200 lines of code.

Any hints to solve this are very much appreciated.

import java.nio.charset.Charset;
import java.util.Calendar;
import java.util.concurrent.locks.ReentrantLock;
import org.zeromq.ZMQ;

public class JeroMQTest {
    /** Callback for messages read by the receiver thread; it is invoked on that thread. */
    public interface IMsgListener {
        public void newMsg(byte[] message);
    }

    /** Base loop period in ms; a heartbeat older than 4 * delay counts as lost. */
    final static int delay = 100;
    /** When true, a lost connection is handled by tearing down and recreating context and socket. */
    final static boolean doResetContext = true;
    static JeroMQTest jeroMQTest;
    /** True when started with the argument "true": this side binds, the other side connects. */
    static boolean isServer;

    /** Poll timeout in ms used while holding the socket lock; short so senders are not delayed. */
    private static final int RECEIVE_POLL_MS = 10;
    /** Upper bound in ms that closeJmq() waits for the receiver thread to finish. */
    private static final long RECEIVER_JOIN_TIMEOUT_MS = 1000;
    /** Explicit payload charset so both peers agree regardless of the platform default. */
    private static final Charset UTF8 = Charset.forName("UTF-8");

    private ZMQ.Context zContext;
    private ZMQ.Socket zSocket;
    /**
     * Serializes every operation on the socket. The socket is used by the main loop (send) and by
     * the receiver thread (poll/recv, plus heartbeat replies on the server side), and ZeroMQ
     * sockets must not be used by several threads at once. The lock is fair, so a waiting sender
     * gets it as soon as the receiver's short poll returns.
     */
    private final ReentrantLock socketLock = new ReentrantLock(true);
    private String address = "tcp://localhost:9889";
    // Written by one thread and read by another, hence volatile.
    private volatile long lastHeartbeatReceived = 0;
    private volatile long lastHeartbeatReplyReceived;
    private volatile boolean sendStat = true;
    private volatile boolean receiverInterrupted = false;
    // Only touched by the main thread.
    private boolean serverIsActive = false;
    private Thread receiverThread;
    // Assigned before the receiver thread is started, so Thread.start() publishes it.
    private IMsgListener msgListener;

    public static void main(String[] args) {
        isServer = args.length > 0 && args[0].equals("true");

        if (isServer) {
            new JeroMQTest().runServer();
        }
        else {
            new JeroMQTest().runClient();
        }
    }

    /**
     * Server loop (binding side). Answers every HEARTBEAT with a HEARTBEAT_REP and, while
     * heartbeats are current, sends a numbered "SERVER n" message every {@code delay} ms.
     * On heartbeat loss or a failed send it optionally resets the connection. Never returns.
     */
    public void runServer() {
        msgListener = new IMsgListener() {
            @Override
            public void newMsg(byte[] message) {
                String msgReceived = new String(message, UTF8);
                if (msgReceived.startsWith("HEARTBEAT")) {
                    // Echo the heartbeat id (the text after "HEARTBEAT "); a bare "HEARTBEAT"
                    // must not throw and kill the receiver thread.
                    String msgSent = "HEARTBEAT_REP "
                            + msgReceived.substring(Math.min(10, msgReceived.length()));
                    sendStat = send(msgSent);
                    System.out.println("heartbeat rcvd, reply sent, status:" + sendStat);
                    lastHeartbeatReceived = getNow();
                } else {
                    System.out.println("msg received:" + msgReceived);
                }
            }
        };

        createJmq();
        sleep(1000);

        int ct = 1;
        while (true) {
            boolean heartbeatsOk = lastHeartbeatReceived > getNow() - delay * 4;
            if (heartbeatsOk) {
                serverIsActive = true;
                String msg = "SERVER " + ct;
                sendStat = send(msg);
                System.out.println("msg sent:" + msg + ", status:" + sendStat);
                ct++;
            }

            if (serverIsActive && (!heartbeatsOk || !sendStat)) {
                serverIsActive = false;
                if (doResetContext) {
                    resetContext();
                }
            }
            sleep(delay);
        }
    }

    /**
     * Client loop (connecting side). Sends a "HEARTBEAT n" followed by a "MSG n" every
     * {@code delay} ms. When heartbeat replies stop arriving or a send fails, it optionally
     * resets the connection; the reconnectDone flag ensures at most one reset per failure streak.
     * Never returns.
     */
    public void runClient() {
        msgListener = new IMsgListener() {
            @Override
            public void newMsg(byte[] message) {
                String msgReceived = new String(message, UTF8);
                if (msgReceived.startsWith("HEARTBEAT_REP")) {
                    System.out.println("HEARTBEAT_REP received:" + msgReceived);
                    lastHeartbeatReplyReceived = getNow();
                }
                else {
                    System.out.println("msg received:" + msgReceived);
                }
            }
        };

        createJmq();
        sleep(1000);

        int ct = 1;
        boolean reconnectDone = false;
        while (true) {
            boolean heartbeatsOK = lastHeartbeatReplyReceived > getNow() - delay * 4;
            String msg = "HEARTBEAT " + (ct++);
            sendStat = send(msg);
            System.out.println("heartbeat sent:" + msg + ", status:" + sendStat);
            sleep(delay / 2);

            if (sendStat) {
                msg = "MSG " + ct;
                sendStat = send(msg);
                System.out.println("msg sent:" + msg + ", status:" + sendStat);
                reconnectDone = false;
            }

            if ((!heartbeatsOK && lastHeartbeatReplyReceived > 0) || (!sendStat && !reconnectDone)) {
                if (doResetContext) {
                    resetContext();
                }
                lastHeartbeatReplyReceived = 0;
                reconnectDone = true;
            }
            sleep(delay / 2);
        }
    }

    /** Tears down socket, receiver thread and context, waits one second, then recreates them. */
    public void resetContext() {
        closeJmq();
        sleep(1000);
        createJmq();
        System.out.println("resetContext done");
    }

    /**
     * Sends {@code text} as one UTF-8 frame while holding the socket lock.
     *
     * @return the result of {@code ZMQ.Socket.send}; false when the send failed (presumably a
     *     send timeout, e.g. because the high-water mark was reached)
     */
    private boolean send(String text) {
        socketLock.lock();
        try {
            return zSocket.send(text.getBytes(UTF8));
        } finally {
            socketLock.unlock();
        }
    }

    /**
     * Creates a new context and DEALER socket, binds (server) or connects (client), and starts
     * the receiver thread that dispatches incoming messages to msgListener.
     */
    private void createJmq() {
        zContext = ZMQ.context(1);
        zSocket = zContext.socket(ZMQ.DEALER);
        zSocket.setSendTimeOut(100);
        zSocket.setReceiveTimeOut(100);
        zSocket.setSndHWM(10);
        zSocket.setRcvHWM(10);
        zSocket.setLinger(100);

        if (isServer) {
            zSocket.bind(address);
        } else {
            zSocket.connect(address);
        }

        // Reset the stop flag before start(). Resetting it inside run() could overwrite a
        // closeJmq() issued before the thread got scheduled, leaving the thread running forever.
        receiverInterrupted = false;
        // Each receiver only ever touches the socket it was created for, even if it outlives a reset.
        final ZMQ.Socket socket = zSocket;
        receiverThread = new Thread("JeroMQ-receiver") {
            @Override
            public void run() {
                try {
                    ZMQ.Poller poller = new ZMQ.Poller(1);
                    socketLock.lock();
                    try {
                        poller.register(socket, ZMQ.Poller.POLLIN);
                    } finally {
                        socketLock.unlock();
                    }
                    while (!receiverInterrupted) {
                        byte[] received = null;
                        socketLock.lock();
                        try {
                            if (poller.poll(RECEIVE_POLL_MS) > 0) {
                                received = socket.recv(0);
                            }
                        } finally {
                            socketLock.unlock();
                        }
                        // Deliver outside the lock: the server's listener sends a reply itself.
                        // recv() can return null (e.g. receive timeout), so skip that case.
                        if (received != null) {
                            msgListener.newMsg(received);
                        }
                    }
                    socketLock.lock();
                    try {
                        poller.unregister(socket);
                    } finally {
                        socketLock.unlock();
                    }
                } catch (Throwable e) {
                    System.out.println("Exception in ReceiverThread.run:" + e);
                }
            }
        };
        receiverThread.start();
    }

    /**
     * Stops the receiver thread, then closes the socket and terminates the context. The receiver
     * is joined (bounded wait) and the close happens under the socket lock, so the socket is never
     * closed while another thread is using it.
     */
    public void closeJmq() {
        receiverInterrupted = true;
        Thread receiver = receiverThread;
        if (receiver != null) {
            try {
                receiver.join(RECEIVER_JOIN_TIMEOUT_MS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            if (receiver.isAlive()) {
                System.out.println("closeJmq: receiver thread still running after "
                        + RECEIVER_JOIN_TIMEOUT_MS + " ms");
            }
        }
        socketLock.lock();
        try {
            zSocket.close();
        } finally {
            socketLock.unlock();
        }
        zContext.term();
    }

    /** Returns the current wall-clock time in milliseconds since the epoch. */
    long getNow() {
        return System.currentTimeMillis();
    }

    /** Sleeps for {@code mSleep} ms. */
    private static void sleep(int mSleep) {
        try {
            Thread.sleep(mSleep);
        } catch (InterruptedException ignored) {
            // Deliberately ignored: the endless demo loops have no way to react to interruption,
            // and restoring the flag would make every later sleep return immediately (busy loop).
        }
    }
}
0

There are 0 best solutions below