JeroMQ from time to time yields "IOException. too many files open"

153 Views Asked by At

I have a tomcat7 application that communicates with another application of mine over an IPC endpoint using JeroMQ in Java. It follows a client-server scheme: the client waits for some time for the response from the server and, if it does not receive one, fails on the first attempt without retrying.

The code is below

/**
 * Maps every queued DTO to a {@code Message}, sends it, and collects the result of each send.
 *
 * <p>Messages are processed in iteration order. Before each send the thread sleeps for the
 * delay reported by {@code dto.getDelayBeforeSend()} (interpreted as milliseconds by
 * {@link Thread#sleep(long)}). A {@link RuntimeException} raised while processing stops the
 * loop, is logged, and is handed to the current thread's uncaught-exception handler; the
 * results gathered so far are still returned.
 *
 * @return the results of the messages that were sent, in send order
 * @throws Exception if the thread is interrupted while sleeping, or any other checked
 *         exception escapes the mapping/sending helpers
 */
@Override
public List<Result> call() throws Exception {
    final List<Result> results = new LinkedList<>();
    try {
        for (DTO dto : messages) {
            final Message m = MessageHelper.MessageMapper(dto);

            Thread.sleep(dto.getDelayBeforeSend());
            // Send the mapped message (not the raw DTO) and record that send's own result.
            final Result mtresult = send(m);
            results.add(mtresult);
        }
    } catch (RuntimeException e) {
        LOGGER.error("Flow => Uncaught Exception: {}", e.getMessage());
        LOGGER.debug("Flow => Uncaught Exception: ", e);
        // Route the failure through the thread's handler instead of rethrowing,
        // so the caller still receives the partial result list.
        Thread t = Thread.currentThread();
        t.getUncaughtExceptionHandler().uncaughtException(t, e);
    }
    return results;
}

/**
 * Sends one message to the server over a freshly created REQ socket and waits for the reply.
 *
 * <p>A requester is obtained from {@code MQSocketFactory} for every call and is always handed
 * back to {@code destroyRequester} in the {@code finally} block. Because {@code retriesLeft}
 * starts at 1, the request is attempted once: if nothing is readable when the poll returns,
 * the message is abandoned.
 *
 * @param m the message to serialize and send
 * @return the last reply parsed from the server (which may be an unsuccessful one), or a
 *         {@code MessageConstants.MESSAGE_FAIL} result when no reply was received or no
 *         requester socket could be created
 */
private Result send(Message m) {
    ZMQ.Socket client = MQSocketFactory.getMQSocket(serverEndpoint).createRequester();
    try {
        Result result = new Result(MessageConstants.MESSAGE_FAIL);
        if (client == null) {
            // createRequester() swallows its own exceptions and returns null in that case;
            // report a failed send instead of dying with a NullPointerException below.
            LOGGER.error("Could not create a requester for server:[{}], abandoning sending message [{}]!", serverEndpoint, m);
            return result;
        }

        final byte[] payload = Helper.serializeMessage(m);
        int retriesLeft = 1;

        while (retriesLeft > 0 && !Thread.currentThread().isInterrupted()) {

            client.send(payload, 0);
            boolean expectReply = true;

            while (expectReply) {

                // Rebuilt on every pass because the retry branch may replace the socket.
                ZMQ.PollItem[] items = { new ZMQ.PollItem(client, Poller.POLLIN) };
                int rc = ZMQ.poll(items, 3000);
                if (rc == -1) {
                    // Poll failed/interrupted. Give up completely: only leaving the inner
                    // loop would send again on a REQ socket that still awaits its reply.
                    return result;
                }

                if (items[0].isReadable()) {
                    final byte[] reply = client.recv(0);
                    if (reply == null) {
                        // Same reasoning as above: do not fall back into the send loop.
                        return result;
                    }
                    result = new Result(new String(reply));
                    if (result.isSuccessful()) {
                        LOGGER.trace("Server replied OK. Result: [{}]", result);
                        retriesLeft = 0;
                        expectReply = false;
                    } else {
                        LOGGER.error("Malformed reply from server: [{}]", result);
                    }

                } else if (--retriesLeft == 0) {
                    LOGGER.error("Server:[{}] seems to be offline, abandoning sending message [{}]!", serverEndpoint, m);
                    break;
                } else {
                    LOGGER.warn("No response from server, retrying...");
                    // Old socket is closed and replaced; the finally block destroys the new one.
                    client = MQSocketFactory.getMQSocket(serverEndpoint).resetRequester(client);
                    if (client == null) {
                        LOGGER.error("Could not create a requester for server:[{}], abandoning sending message [{}]!", serverEndpoint, m);
                        return result;
                    }
                    client.send(payload, 0);
                }
            }
        }
        return result;
    } finally {
        // destroyRequester() tolerates null, so this is safe on every exit path.
        MQSocketFactory.getMQSocket(serverEndpoint).destroyRequester(client);
    }
}

Now the MQSocketFactory class is like below:

/**
 * Registry of {@link MQSocket} helpers, keyed by endpoint name.
 *
 * <p>Each endpoint gets exactly one {@code MQSocket}, created on first request and kept in a
 * static map for the lifetime of the class. Access to the map is guarded by a lock on the map
 * itself.
 */
public final class MQSocketFactory {

    private static final Map<String, MQSocket> store = new HashMap<>();

    private static final Logger LOGGER = LoggerFactory.getLogger(MQSocketFactory.class);

    /**
     * Returns the helper for the given endpoint, creating and caching it on first use.
     *
     * @param endpointName the endpoint the sockets will connect or bind to
     * @return the cached helper for that endpoint
     */
    public static MQSocket getMQSocket(String endpointName) {
        synchronized (store) {
            MQSocket result = store.get(endpointName);
            if (result == null) {
                result = new MQSocket(endpointName);
                store.put(endpointName, result);
            }
            return result;
        }
    }

    /**
     * Owns one ZeroMQ context for an endpoint and creates/destroys sockets on it.
     */
    public static final class MQSocket {

        private final String endpoint;
        private final ZMQ.Context ctx;

        // Responder unit: at most one REP socket per endpoint, created lazily.
        private ZMQ.Socket responder;

        private MQSocket(String endpointName) {
            this.endpoint = endpointName;
            this.ctx = ZMQ.context(1);
        }

        /**
         * Creates a REQ socket connected to this endpoint.
         *
         * @return the new socket; {@code null} if the socket could not be created (the
         *         failure is logged, not thrown)
         */
        public ZMQ.Socket createRequester() {
            ZMQ.Socket client = null;
            try {
                client = ctx.socket(ZMQ.REQ);
                // Linger 0 so that close() discards any request that was never delivered
                // instead of keeping the connection (and its file descriptors) around.
                // NOTE(review): relies on ZMQ_LINGER semantics; confirm the default linger
                // of the JeroMQ version in use.
                client.setLinger(0);
                client.connect(endpoint);
            } catch (Exception e) {
                LOGGER.error("Error: {}", e.getMessage());
                // No "{}" here: SLF4J treats a trailing Throwable as the exception to print,
                // so a placeholder would be left unformatted in the output.
                LOGGER.error("Error: ", e);
            }
            return client;
        }

        /**
         * Closes the given requester and returns a newly created one.
         *
         * @param socket the requester to discard; may be {@code null}
         * @return a new requester, or {@code null} if creation failed
         */
        public ZMQ.Socket resetRequester(ZMQ.Socket socket) {
            destroyRequester(socket);
            return createRequester();
        }

        /**
         * Closes the given requester, logging (not throwing) any failure.
         *
         * @param socket the requester to close; {@code null} is ignored
         */
        public void destroyRequester(ZMQ.Socket socket) {
            if (socket != null) {
                try {
                    socket.close();
                } catch (Exception e) {
                    LOGGER.error("Error: {}", e.getMessage());
                    LOGGER.debug("Error: ", e);
                }
            }
        }

        /**
         * @return the ZeroMQ context backing this endpoint's sockets
         */
        public ZMQ.Context getContext() {
            return ctx;
        }

        /**
         * Returns the REP socket bound to this endpoint, creating and binding it on first use.
         *
         * @return the responder socket
         */
        public ZMQ.Socket createResponder() {
            if (responder == null) {
                this.responder = ctx.socket(ZMQ.REP);
                responder.bind(endpoint);
            }
            return responder;
        }

        /**
         * Closes the current responder (if any) and returns a newly bound one.
         *
         * @return the new responder socket
         */
        public ZMQ.Socket resetResponder() {
            destroyResponder();
            return createResponder();
        }

        /**
         * Closes the responder, if one exists, logging (not throwing) any failure.
         *
         * <p>The field is cleared afterwards so that the next {@link #createResponder()} call
         * builds a fresh socket rather than handing back the closed one.
         */
        public void destroyResponder() {
            if (responder == null) {
                return;
            }
            try {
                responder.close();
            } catch (Exception e) {
                LOGGER.error("Error: {}", e.getMessage());
                LOGGER.debug("Error: ", e);
            } finally {
                responder = null;
            }
        }

    }

}

I have done this specifically so that every socket gets closed after the request is done, in order to avoid this particular problem with "IOException: Too many open files". However, very rarely I still get this issue and I cannot figure out why. The application may be working for days under pretty much the same load with everything being OK, but at some point it starts throwing the exception and I don't know why.

Also, is there a way to increase the ulimit in tomcat7? Right now it is 1024.

0

There are 0 best solutions below