I am getting this exception:
org.zeromq.ZMQException: Errno 4
at org.zeromq.ZMQ$Socket.mayRaise(ZMQ.java:3732) ~[jeromq-0.5.3.jar:na]
at org.zeromq.ZMQ$Socket.recv(ZMQ.java:3530) ~[jeromq-0.5.3.jar:na]
at com.forexassistant.service.zeromq.CurrencyStrengthZeroMQ.sendCurrencyStrengthRequest(CurrencyStrengthZeroMQ.java:30) ~[classes/:na]
at com.forexassistant.service.algorithmlogic.AlgorithmLogic.getCurrencyStrength(AlgorithmLogic.java:209) ~[classes/:na]
at sun.reflect.GeneratedMethodAccessor111.invoke(Unknown Source) ~[na:na]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_241]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_241]
at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84) ~[spring-context-5.3.22.jar:5.3.22]
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) ~[spring-context-5.3.22.jar:5.3.22]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_241]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_241]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_241]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_241]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_241]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_241]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_241]
Suppressed: org.zeromq.ZMQException: Errno 4
at zmq.Ctx.terminate(Ctx.java:304) ~[jeromq-0.5.3.jar:na]
at org.zeromq.ZMQ$Context.term(ZMQ.java:671) ~[jeromq-0.5.3.jar:na]
at org.zeromq.ZContext.destroy(ZContext.java:136) ~[jeromq-0.5.3.jar:na]
at org.zeromq.ZContext.close(ZContext.java:463) ~[jeromq-0.5.3.jar:na]
at com.forexassistant.service.zeromq.CurrencyStrengthZeroMQ.sendCurrencyStrengthRequest(CurrencyStrengthZeroMQ.java:37) ~[classes/:na]
... 13 common frames omitted
I have no idea why it is happening, the logic in question is this that communicates with the socket:
public String sendCurrencyStrengthRequest() {
try (ZContext context = new ZContext()) {
ZMQ.Socket socket = context.createSocket(SocketType.PUSH);
socket.connect("tcp://localhost:32868");
ZMQ.Socket socket2 = context.createSocket(SocketType.PULL);
socket2.connect("tcp://localhost:32869");
String msg = "GET_CURRENCY_STRENGTHS";
socket.send(msg,1);
while (!Thread.currentThread().isInterrupted()) {
byte[] reply = socket2.recv(0);
if(reply!=null) {
currencyStrengths= new String(reply, ZMQ.CHARSET);
}
context.close();
break;
}
}
return currencyStrengths;
}
This is the code from the other bit:
// CREATE ZeroMQ Context
Context context(PROJECT_NAME);
// CREATE ZMQ_PUSH SOCKET
Socket pushSocket(context, ZMQ_PUSH);
// CREATE ZMQ_PULL SOCKET
Socket pullSocket(context, ZMQ_PULL);
// CREATE ZMQ_PUB SOCKET
Socket pubSocket(context, ZMQ_PUB);
int OnInit() {
EventSetMillisecondTimer(MILLISECOND_TIMER); // Set Millisecond Timer to get client socket input
//EventSetTimer(1790);
context.setBlocky(false);
// Send responses to PULL_PORT that client is listening on.
if(!pushSocket.bind(StringFormat("%s://%s:%d", ZEROMQ_PROTOCOL, HOSTNAME, PULL_PORT))) {
Print("[PUSH] ####ERROR#### Binding MT4 Server to Socket on Port " + IntegerToString(PULL_PORT) + "..");
return(INIT_FAILED);
} else {
Print("[PUSH] Binding MT4 Server to Socket on Port " + IntegerToString(PULL_PORT) + "..");
pushSocket.setSendHighWaterMark(1);
pushSocket.setLinger(0);
}
// Receive commands from PUSH_PORT that client is sending to.
if(!pullSocket.bind(StringFormat("%s://%s:%d", ZEROMQ_PROTOCOL, HOSTNAME, PUSH_PORT))) {
Print("[PULL] ####ERROR#### Binding MT4 Server to Socket on Port " + IntegerToString(PUSH_PORT) + "..");
return(INIT_FAILED);
} else {
Print("[PULL] Binding MT4 Server to Socket on Port " + IntegerToString(PUSH_PORT) + "..");
pullSocket.setReceiveHighWaterMark(1);
pullSocket.setLinger(0);
}
// Send new market data to PUB_PORT that client is subscribed to.
if(!pubSocket.bind(StringFormat("%s://%s:%d", ZEROMQ_PROTOCOL, HOSTNAME, PUB_PORT))) {
Print("[PUB] ####ERROR#### Binding MT4 Server to Socket on Port " + IntegerToString(PUB_PORT) + "..");
return(INIT_FAILED);
} else {
Print("[PUB] Binding MT4 Server to Socket on Port " + IntegerToString(PUB_PORT) + "..");
pubSocket.setSendHighWaterMark(1);
pubSocket.setLinger(0);
}
return(INIT_SUCCEEDED);
}
//+------------------------------------------------------------------+
//| Expert deinitialization function |
//+------------------------------------------------------------------+
void OnDeinit(const int reason) {
Print("[PUSH] Unbinding MT4 Server from Socket on Port " + IntegerToString(PULL_PORT) + "..");
pushSocket.unbind(StringFormat("%s://%s:%d", ZEROMQ_PROTOCOL, HOSTNAME, PULL_PORT));
pushSocket.disconnect(StringFormat("%s://%s:%d", ZEROMQ_PROTOCOL, HOSTNAME, PULL_PORT));
Print("[PULL] Unbinding MT4 Server from Socket on Port " + IntegerToString(PUSH_PORT) + "..");
pullSocket.unbind(StringFormat("%s://%s:%d", ZEROMQ_PROTOCOL, HOSTNAME, PUSH_PORT));
pullSocket.disconnect(StringFormat("%s://%s:%d", ZEROMQ_PROTOCOL, HOSTNAME, PUSH_PORT));
if (Publish_MarketData == true || Publish_MarketRates == true) {
Print("[PUB] Unbinding MT4 Server from Socket on Port " + IntegerToString(PUB_PORT) + "..");
pubSocket.unbind(StringFormat("%s://%s:%d", ZEROMQ_PROTOCOL, HOSTNAME, PUB_PORT));
pubSocket.disconnect(StringFormat("%s://%s:%d", ZEROMQ_PROTOCOL, HOSTNAME, PUB_PORT));
}
// Shutdown ZeroMQ Context
context.shutdown();
context.destroy(0);
EventKillTimer();
}
//+------------------------------------------------------------------+
//| Expert timer function |
//+------------------------------------------------------------------+
void OnTimer() {
/*
Use this OnTimer() function to get and respond to commands
*/
if(CheckServerStatus() == true) {
// Get client's response, but don't block.
bool hasAllOtherRequest = pullSocket.recv(request, true);
if(hasAllOtherRequest){
if (request.size() > 0) {
// Wait
// pullSocket.recv(request,false);
// MessageHandler() should go here.
ZmqMsg reply = MessageHandler(request);
// Send response, and block
// pushSocket.send(reply);
// Send response, but don't block
if(!pushSocket.send(reply, true)) {
Print("###ERROR### Sending message");
}
}
}
// update prices regularly in case there was no tick within X milliseconds (for non-chart symbols).
if (GetTickCount() >= lastUpdateMillis + MILLISECOND_TIMER_PRICES) OnTick();
}
}
//+------------------------------------------------------------------+
ZmqMsg MessageHandler(ZmqMsg &_request) {
// Output object
ZmqMsg reply;
// Message components for later.
string components[11];
if(_request.size() > 0) {
// Get data from request
ArrayResize(_data, _request.size());
_request.getData(_data);
string dataStr = CharArrayToString(_data);
// Process data
ParseZmqMessage(dataStr, components);
// Interpret data
InterpretZmqMessage(pushSocket, components);
} else {
// NO DATA RECEIVED
}
return(reply);
}
//+------------------------------------------------------------------+
// Interpret Zmq Message and perform actions
void InterpretZmqMessage(Socket &pSocket, string &compArray[]) {
int switch_action = 0;
/* 02-08-2019 10:41 CEST - HEARTBEAT */
if(compArray[0] == "GET_CURRENCY_STRENGTHS")
switch_action = 14;
/* Setup processing variables */
string zmq_ret = "";
string response = "";
string responseArray[10];
string ret = "";
int ticket = -1;
bool ans = false;
/****************************
* PERFORM SOME CHECKS HERE *
****************************/
if (CheckOpsStatus(pSocket, switch_action) == true) {
switch(switch_action) {
case 14:
getCurrencyStrengths(responseArray);
response = response + responseArray[0] + ", ";
response = response + responseArray[1] + ", ";
response = response + responseArray[2] + ", ";
response = response + responseArray[3] + ", ";
response = response + responseArray[4] + ", ";
response = response + responseArray[5];
InformPullClient(pSocket, response);
break;
default:
break;
}
}
}
// Inform Client
void InformPullClient(Socket& pSocket, string message) {
ZmqMsg pushReply(message);
pSocket.send(pushReply,true); // NON-BLOCKING
}
When the error happens then the line to print Print("###ERROR### Sending message"); is hit and causes that to be printed in the other application.
Am I doing this wrong? sendCurrencyStrengthRequest() is scheduled in Spring to be called every 5 seconds and there is another function that will be called every 30mins that uses a different pull and push socket within a different context, all this works for a while and then this error gets thrown, any idea?
I'm assuming that, what's not shown, is the other program paired with this serving as the other endpoints for the PUSH and PULL sockets that you create and connect.
I think that the problem lies in your other program that we're not seeing. Is that receiving the
GET_CURRENCY_STRENGTHS
message, replying, and then immediately terminating, or immediately cleaning up its context in much the same way as this code snippet is?If so, it is the immediate termination / clean up that is the problem. The act of sending a message using ZMQ is non blocking; you're just pushing the message on to a queue that's managed by the thread(s) that ZMQ starts up and manages in the background. If after a ZMQ send() you immediately terminate the program or clean up, that means in all likelihood the management threads haven't actually had any time to do anything at all - maybe they've not even been scheduled by the OS at this point.
The result of the program terminating is that the OS cleans up whatever resources have not yet been cleaned up by the program itself - sockets, allocated memory, threads, the lot.
The fact that this is all on localhost is interesting, because the OS has full visibility of the underlying tcp socket and can tell the connecting end (this program) a lot more about the state of the tcp socket than if the other endpoints were on another computer.
If this is the case, at other end of the connections (in the code snippet you've given us), what's happening is that ZMQ is patiently waiting in a blocked system call for something to come in via a tcp socket. Except, that socket is getting torn down by the OS (or the other end cleaning up). So, the socket read is terminated (because the socket no longer exists) and you get an ugly exception as shown.
At least, that's my guess. If that's correct, try putting in some delay in the other program (that we've not seen) between the zmq send() and the termination of the program, allow things to happen in the background.
The variability in whether your code succeeds or not would be down to the random nature of the threads being scheduled or not following the zmq send(). The communication between the application program and the zme management thread(s) is done via IPC pipes (or other things like semaphores), all of which give the OS an opportunity to re-schedule threads. Sometimes it might say one thread gets scheduled, another time it may not, depending on how much of a time slice the main application thread has had (and a heap of other mysterious, arcane factors).
In General
In ZMQ in particular, and in other Actor Model systems in general, termination is something that has to be agreed on. Because it's buffering messages inside the transport, you've no idea whether "that last, final message" has made it through to the receiving end and that it is safe to terminate. What you need to have is a either a time delay before closing to allow everything to settle down to quiescence, or some explicit messaging acknowledging that the final message has propagated to everywhere.