How to use inproc transport with pyzmq?

4.2k Views Asked by At

I have set up two small scripts imitating a publish and subscribe procedure with pyzmq. However, I am unable to send messages over to my subscriber client using the inproc transport. I am able to use tcp://127.0.0.1:8080 fine, just not inproc.

pub_server.py

import zmq
import random
import sys
import time

context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("inproc://stream")

while True:
    socket.send_string("Hello")
    time.sleep(1)

sub_client.py

import sys
import zmq

# Socket to talk to server
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.setsockopt_string(zmq.SUBSCRIBE, '')
socket.connect("inproc://stream")

for x in range (5):
    string = socket.recv()
    print(string)

How can I successfully alter my code so that I'm able to use the inproc transport method between my two scripts?

EDIT:

I have updated my code to further reflect @larsks comment. I am still not receiving my published string - what is it that I am doing wrong?

import threading
import zmq

def pub():
    context = zmq.Context()
    sender = context.socket(zmq.PUB)
    sender.connect("inproc://hello")
    lock = threading.RLock()

    with lock:
        sender.send(b"")

def sub():
    context = zmq.Context()
    receiver = context.socket(zmq.SUB)
    receiver.bind("inproc://hello")

    pub()

    # Wait for signal
    string = receiver.recv()
    print(string)
    print("Test successful!")

    receiver.close()

if __name__ == "__main__":
    sub()
3

There are 3 best solutions below

5
larsks On

As the name implies, inproc sockets can only be used within the same process. If you were to rewrite your client and server such that there were two threads in the same process you could use inproc, but otherwise this socket type simply isn't suitable for what you're doing.

The documentation is very clear on this point:

The in-process transport passes messages via memory directly between threads sharing a single ØMQ context.

Update

Taking a look at the updated code, the problem that stands out first is that while the documentation quoted above says "...between threads sharing a single ØMQ context", you are creating two contexts in your code. Typically, you will only call zmq.Context() once in your program.

Next, you are never subscribing your subscriber to any messages, so even in the event that everything else was working correctly you would not actually receive any messages.

Lastly, your code is going to experience the slow joiner problem:

There is one more important thing to know about PUB-SUB sockets: you do not know precisely when a subscriber starts to get messages. Even if you start a subscriber, wait a while, and then start the publisher, the subscriber will always miss the first messages that the publisher sends. This is because as the subscriber connects to the publisher (something that takes a small but non-zero time), the publisher may already be sending messages out.

The pub/sub model isn't meant for single messages, nor is it meant to be a reliable transport.

So, to sum up:

  • You need to create a shared ZMQ context before you creating your sockets.
  • You probably want your publisher to publish in a loop instead of publishing a single message. Since you're trying to use inproc sockets you're going to need to put your two functions into separate threads.
  • You need to set a subscription filter in order to receive messages.

There is an example using PAIR sockets in the ZMQ documentation that might provide a useful starting point. PAIR sockets are designed for coordinating threads over inproc sockets, and unlike pub/sub sockets they are bidirectional and are not impacted by the "slow joiner" issue.

2
user52610 On

As mention earlier by @larsks, the context object should be the same. Declare the context object globally and use it in both pub and sub functions instead of creating new ones for each.

0
JSON On

Zmq contexts are the only thread safe data type when using zmq and a common context must be shared between threads that use inproc transport, regardless of language. The zmq context points to where io takes place within zmq. Inproc sockets share the same event loop where other transports may (and often will) use separate io loops within separate contexts. Here’s an example for python multi threaded inproc http://zguide2.zeromq.org/py:mtrelay

Mind that zmq does io within an isolated thread pool per context. While it’s possible to increase the io thread count on a given context, loading all io into the same thread pool is not always optimal, which is the case where multiple contexts may be used.

Also note that inproc is possible within the same thread as long as your not using blocking socket types. There are few cases where this is practical, unit testing being the only case I can imagine.