My use case:
- The Subscriber will be a server (binding to a port) and it will wait for messages from multiple publishers.
- The Publishers will be initialized inside different threads as clients (connecting to the port).
- The data to publish in each thread will be a couple of messages.
- It is important that, while the subscriber is connected, it gets every message, and as soon as possible.
- If the subscriber is not connected, I don't want to keep the publisher thread blocked; ideally a timeout of around 1-2 s works.
The slow joiner problem:
When running more than 1000 threads (publishers), I get all the data at the Subscriber only 1 or 2 times. Adding a sleep of a few milliseconds solves the issue, so I'm 99.9% sure that I'm a victim of the well-known slow-joiner syndrome. However, the sleep workaround is not a good solution in my case, as the connect time for a publisher can be variable and I want the data at the subscriber as soon as possible.
My thoughts and experiment code on solving this issue:
My solution is based on the XPUB socket's recv method: initialize the publisher as XPUB and set RCVTIMEO to 1000 ms. After the publisher connects, I add a recv() call to check whether there is a subscriber. When I get the subscription message, I know that the connection has been finalized and that I can send data without any of it being lost (unless something goes wrong on the subscriber's side, but I don't care about that). If I don't get any subscription message, recv() times out after 1000 ms and the thread is terminated.
Here is sample code in Python (pyzmq) to test this implementation (for the publisher I don't use threads but a while loop, and I run multiple publishers at the same time), and it works as I wanted:
publisher.py:
import zmq

def main():
    """ main method """
    i = 0
    while True:
        # Prepare context and publisher
        context = zmq.Context()
        publisher = context.socket(zmq.XPUB)
        publisher.connect("tcp://0.0.0.0:5650")
        publisher.setsockopt(zmq.RCVTIMEO, 1000)
        # Wait up to 1000 ms to get a subscription
        i = i + 1
        try:
            publisher.recv()
            # Send the message
            publisher.send_multipart([b'test', bytes(str(i), 'utf-8')])
        except Exception as e:
            print(e, flush=True)
        # Terminate socket and context
        publisher.close()
        context.term()
        if i >= 10000:
            break

if __name__ == "__main__":
    main()
subscriber.py:
import zmq

def main():
    """ main method """
    # Prepare our context and subscriber
    context = zmq.Context()
    subscriber = context.socket(zmq.SUB)
    uri = "tcp://0.0.0.0:5650"
    subscriber.bind(uri)
    subscriber.setsockopt(zmq.SUBSCRIBE, b'')
    print('Subscriber binds to %s' % (uri), flush=True)
    # Receive messages
    i = 0
    while True:
        [topic, data] = subscriber.recv_multipart()
        i = i + 1
        print("%s: %s %s" % (i, topic, data), flush=True)

if __name__ == "__main__":
    main()
My question:
Is the solution really that simple? Am I missing anything (related to the slow joiner) that will cause loss of data while a subscriber is active?
On the very contrary: for what was posted above, the solution is over-complicated w.r.t. the so-far-posted use-case requirements.
a) Given the requirements above, it is possible to eliminate all the costs associated with the setup & maintenance of the ISO-OSI-L3 tcp:// Transport Class when communicating among threads co-located on the same host and belonging to the same process. Rather go in for the ultra-fast, stack-less, memory-mapped inproc:// Transport Class to avoid all of these inefficiencies. The ZeroMQ API v4.0+ also offers the comfort of no other conditions on the inproc:// Transport Class { .bind() | .connect() } order of appearance, so we may enjoy the utmost MEM-mapped, ultra-low-latency flagging of Zero-Copy "transported" messages (without moving a byte of in-RAM data) - cool, isn't it? (Unless you need a MITM protocol-sniffer to be injected, remove the tcp:// overkill.)
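A minimal sketch of the inproc:// route, merging the O/P's XPUB-handshake with a thread-shared Context() (the endpoint name inproc://fan-in, the thread count and the timeout values are illustrative assumptions, not taken from the O/P):

import threading
import zmq

# one process-wide Context(): inproc:// endpoints are visible only to
# sockets created from the very same Context instance
context = zmq.Context()

def publisher_thread(ctx, payload):
    pub = ctx.socket(zmq.XPUB)              # same handshake idea as in the O/P
    pub.setsockopt(zmq.RCVTIMEO, 1000)
    pub.connect("inproc://fan-in")
    try:
        pub.recv()                          # wait max. 1000 ms for a subscription
        pub.send_multipart([b'test', payload])
    except zmq.Again:
        pass                                # no subscriber showed up in time
    finally:
        pub.close()

subscriber = context.socket(zmq.SUB)
subscriber.bind("inproc://fan-in")          # bound before any thread connects
subscriber.setsockopt(zmq.SUBSCRIBE, b'')
subscriber.setsockopt(zmq.RCVTIMEO, 2000)   # don't hang if all publishers timed out

threads = [threading.Thread(target=publisher_thread,
                            args=(context, bytes(str(i), 'utf-8')))
           for i in range(1000)]
for t in threads:
    t.start()

received = 0
while received < 1000:
    try:
        subscriber.recv_multipart()
        received += 1
    except zmq.Again:
        break                               # publishers have gone quiet

for t in threads:
    t.join()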
b) Given the requirements above, a delivery of a couple of messages, where the "static" SUB-side subscribes to all of them, is an enormously inefficient use of the PUB/SUB Scalable Formal Communications Pattern Archetype. Your code has to pay all the costs to set up a new SUB-instance, then it crawls to set up a valid connection (over the tcp:// Transport Class' stack, hopefully removed under a)), next wrangling to set up a new TOPIC-filter (be it operated on the SUB-side in earlier versions, or on the PUB-side in the newer ZeroMQ releases - all at remarkable costs of doing so, just to receive all messages, i.e. with no filtering at all). The same formal service is achievable with a way more lightweight many-nodes-PUSH/PULL-on-one-node setup. If there is no other need for any reverse / bi-directional / more complex formal communications, this single PUSH/PULL will be able to do the job requested (see the sketch below).
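A sketch of that lighter-weight shape, with both sides squeezed into one process for brevity (the port number is kept from the O/P; the payload is illustrative). A connect()-ed PUSH queues messages into its pipe even before the link is fully up, so the subscription race disappears altogether, while LINGER bounds how long close() may keep flushing:

import zmq

context = zmq.Context()

# ---- the one collecting node (replaces the SUB side; no TOPIC-filter at all)
pull = context.socket(zmq.PULL)
pull.bind("tcp://0.0.0.0:5650")

# ---- each of the many producing nodes (replaces an XPUB publisher thread)
push = context.socket(zmq.PUSH)
push.setsockopt(zmq.LINGER, 1000)           # let close() flush for at most ~1 s
push.connect("tcp://localhost:5650")
push.send_multipart([b'test', b'1'])        # queued; delivered once the link is up
push.close()

topic, data = pull.recv_multipart()
print(topic, data, flush=True)

context.term()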
c) Given the requirements above, your accent seems to have been put on not losing messages by prematurely sending them over the connection. There are tools for ascertaining that in ZeroMQ settings, yet you take no care to use them (a sketch combining both follows this list):
- zmq.IMMEDIATE may use the blocking state of an AccessNode in cases where no ready-made connection is working yet (or ever);
- errno (or zmq.errno() for POSIX-incompatible operating systems / Win32 and the like) handling may help your code to detect & react to any and all specific situations that happen in the "network-of-autonomous-agents" throughout the whole span of its lifecycle in distributed computing (no matter whether the agents are indeed actually "physically" distributed or co-located, as is the case here). Not losing control is the core responsibility here. What is a control-code that self-locks in a lost-control state, where it cannot control even itself ;) ?
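A sketch combining both tools on a PUSH-side sender (endpoint and timeout values are illustrative assumptions): with zmq.IMMEDIATE set, messages are queued only onto fully-established connections, so together with SNDTIMEO a send can fail fast instead of silently parking data for a peer that never arrives:

import zmq

context = zmq.Context()
push = context.socket(zmq.PUSH)
push.setsockopt(zmq.IMMEDIATE, 1)           # queue only on completed connections
push.setsockopt(zmq.SNDTIMEO, 1000)         # fail a send after 1000 ms, don't block
push.connect("tcp://localhost:5650")

try:
    push.send_multipart([b'test', b'payload'])
except zmq.Again:
    # EAGAIN: no live peer within 1000 ms - the message was NOT queued
    print("no connected peer, message not sent", flush=True)
except zmq.ZMQError as e:
    # any other error: inspect e.errno and react to the specific situation
    print("ZMQError:", e.errno, zmq.strerror(e.errno), flush=True)
finally:
    push.close()
    context.term()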
d) Never use a blocking form of the { .recv() | .send() | .poll() | ... } methods. Schoolbook examples are rather anti-patterns of how a professional signaling/messaging meta-plane implementation ought to look. Indeed never - ref. item 5) above.
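For instance, the subscriber's blocking recv_multipart() can become a Poller-driven loop that never hangs and keeps the control-loop responsive (the 100 ms polling granularity is an illustrative choice):

import zmq

context = zmq.Context()
subscriber = context.socket(zmq.SUB)
subscriber.bind("tcp://0.0.0.0:5650")
subscriber.setsockopt(zmq.SUBSCRIBE, b'')

poller = zmq.Poller()
poller.register(subscriber, zmq.POLLIN)

while True:
    events = dict(poller.poll(timeout=100))  # returns within ~100 ms, never blocks forever
    if subscriber in events:
        topic, data = subscriber.recv_multipart(zmq.NOBLOCK)
        print(topic, data, flush=True)
    else:
        # nothing arrived: free to run watchdogs, check shutdown flags, etc.
        pass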
e) Better re-use a Context()-instance, instead of making it a consumable/disposable as was sketched above. Threads can freely share a pre-instantiated Context()-engine, avoiding the huge amounts of repetitive add-on overhead costs that re-instantiating a consumable/disposable Context() per each forked, just short-lived, peer client thread would incur.
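A minimal sketch of that re-use, assuming the clients are threads of one process (the socket type, port and message content are illustrative assumptions):

import threading
import zmq

context = zmq.Context()                     # instantiated once, shared by all threads

def client_thread(ctx, n):
    # a socket per thread is cheap; a Context() per thread is not
    sock = ctx.socket(zmq.PUSH)
    sock.connect("tcp://127.0.0.1:5650")
    sock.send(bytes(str(n), 'utf-8'))
    sock.close()                            # close the socket, keep the Context alive

sink = context.socket(zmq.PULL)             # a trivial counterpart, so sends can drain
sink.bind("tcp://0.0.0.0:5650")

threads = [threading.Thread(target=client_thread, args=(context, n))
           for n in range(100)]
for t in threads:
    t.start()
for t in threads:
    t.join()

for _ in range(100):
    sink.recv()

sink.close()
context.term()                              # terminated exactly once, at shutdown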
f) If anybody knows a better solution, keep us posted :o)
Questions from comments
- Sure, NP here. The { pgm:// | epgm:// | tipc:// } transports might be interesting here, if going further in the direction of higher performance levels.
- Well, not mentioned in the O/P. Any layering of XPUBs/XSUBs may work well; the problems are on the connection-management level.
- Sure, having no subscriber available on an RTO-connected link, ready for an immediate delivery "across the wire", no messages could ever be delivered (and they could be silently dropped, which is what you try to fight against, don't you?). That is what zmq.IMMEDIATE can manage, via a call to the .setsockopt()-method.