Using Python to read messages that are already being sent to Redis


A system I'm working on is currently receiving messages in Redis. I would like to write a Python script that reads those messages, modifies them, and writes them back to a different channel/key in Redis.

For example:

 redis-cli  -a 'redis' monitor | grep '"pushName"'

returns:

1707963448.404618 [0 192.168.86.75:51520] "LPUSH" "pushName" "{\"uuid\":"be70\"...,{}}

How do I subscribe to get the messages from pushName? When I try this:

mobile = r.pubsub()
mobile.subscribe('pushName')
for message in mobile.listen():
    print(message)

nothing is displayed except for:

{'type': 'subscribe', 'pattern': None, 'channel': 'pushName', 'data': 1}

I already know my connection settings are fine, because I can get a list of the keys when I do this:

index_list = []
for key in r.scan_iter('*'):
    index_list.append(key)

But messages are flying by when I do:

 redis-cli  -a 'redis' monitor | grep '"pushName"'

There are 2 best solutions below

Mark Setchell On

You seem to be confusing debug output with messages, with Redis LISTs and with Redis Pub/Sub.

If your other (real) code is doing LPUSH operations it is appending items to a Redis LIST.

Those LPUSH operations will show up in your redis-cli monitor command because that is a debugging tool showing you all Redis internal operations.

The LPUSH commands (which operate on a Redis LIST) will not show up when you subscribe to Redis Pub/Sub, because Pub/Sub is a completely separate feature of Redis. A subscriber only sees messages that somebody PUBLISHes to a channel it is subscribed to. Here nobody is publishing anything; they are only doing LPUSH onto a LIST.
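Since the data is in a LIST, one option is to read it with LIST commands instead of Pub/Sub. Here is a minimal sketch, assuming `client` is a `redis.Redis` instance and the payloads are JSON objects as the monitor output suggests. The destination key `pushNameOut` and the `add_seen_flag` transform are made up for illustration. Be aware that BRPOP removes the item from the list, so if another program is already consuming `pushName` this would take items away from it; in that case a non-destructive LRANGE may be the better fit.

```python
import json


def relay_one(client, source_key, dest_key, transform, timeout=1):
    """Move one item from source_key to dest_key, transforming it on the way.

    `client` is assumed to be a redis.Redis instance. Returns the value that
    was pushed, or None if nothing arrived within `timeout` seconds.
    """
    popped = client.brpop(source_key, timeout=timeout)  # (key, value) or None
    if popped is None:
        return None
    _key, raw = popped
    result = transform(raw)
    client.lpush(dest_key, result)
    return result


def add_seen_flag(raw):
    """Hypothetical transform: parse the JSON payload and add a field."""
    payload = json.loads(raw)
    payload["seen"] = True
    return json.dumps(payload)
```

With a real connection you would call something like `relay_one(r, 'pushName', 'pushNameOut', add_seen_flag)` in a loop. Because the sender uses LPUSH and this pops with BRPOP from the other end, items come out oldest first.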

Mark Setchell On

Maybe you could adapt things so they work more like you are expecting. If you want to be notified every time there is an LPUSH onto a LIST, you could change your LPUSH to a MULTI/EXEC transaction that does the LPUSH and then PUBLISHes a message to notify all interested parties, all in a single transaction. That would make your sender look like this:

#!/usr/bin/env python3

from time import sleep
import redis

# Redis connection
r = redis.Redis(host='192.168.0.10', port=6379, db=0)

# Pub/sub connection
p = r.pubsub()

# Empty list at start of each run
r.delete('myList')

# Push 10 items to Redis LIST, publishing that they are available
N = 10
for i in range(N):
    print(f'DEBUG: Publishing item {i} and notifying subscribers')
    # Start pipeline, i.e. MULTI/EXEC transaction
    pipe = r.pipeline()
    pipe.lpush('myList', i)
    pipe.publish('notifications', 'LPUSHed to myList')
    pipe.execute()
    sleep(1)

Then your receiver might look like this, subscribing to the notifications channel and reading the LIST when notified:

#!/usr/bin/env python3

from time import time, sleep
import redis

def msgHandler(message):
    """Called whenever a message is published"""
    print(message)
    # See what's in our LIST
    items = r.lrange('myList',0,100)
    print(items)
    # You could put the items into a regular Python queue here for main program to read

# Redis connection
r = redis.Redis(host='192.168.0.10', port=6379, db=0)

# Subscribe to notifications and register callback for when messages arrive
pubsub = r.pubsub(ignore_subscribe_messages=True)
pubsub.subscribe(**{'notifications': msgHandler})
# Start a background message receiver
# See https://redis.readthedocs.io/en/stable/advanced_features.html
thread = pubsub.run_in_thread(sleep_time=0.001)

# Do something else for 15 seconds
endTime = time() + 15
while time() < endTime:
    print('Waiting for data:')
    sleep(0.3)

# Stop the redis pub/sub listener
thread.stop()

When you run the receiver, it looks like this:

Waiting for data:
Waiting for data:
{'type': 'message', 'pattern': None, 'channel': b'notifications', 'data': b'LPUSHed to myList'}
[b'0']
Waiting for data:
Waiting for data:
Waiting for data:
Waiting for data:
{'type': 'message', 'pattern': None, 'channel': b'notifications', 'data': b'LPUSHed to myList'}
[b'1', b'0']
Waiting for data:
Waiting for data:
Waiting for data:
{'type': 'message', 'pattern': None, 'channel': b'notifications', 'data': b'LPUSHed to myList'}
[b'2', b'1', b'0']
...
...
Waiting for data:
Waiting for data:
{'type': 'message', 'pattern': None, 'channel': b'notifications', 'data': b'LPUSHed to myList'}
[b'9', b'8', b'7', b'6', b'5', b'4', b'3', b'2', b'1', b'0']
Waiting for data:

Note that there are actually 3 ways of waiting for messages you are subscribed to, and they are described in the redis-py documentation. Basically, you can use:

import time

while True:
    message = p.get_message()
    if message:
        print(message)  # do something with the message
    time.sleep(0.001)  # be nice to the system :)

Or you can use:

for message in p.listen():
    print(message)  # do something with the message

Or you can use a thread like I did.


Note that I am using a pipeline (MULTI/EXEC) for 2 reasons:

  • it is an atomic transaction, so we know the item will be pushed on the list before the PUBLISH
  • there is less overhead/latency because it is a single round-trip to the server, rather than one round-trip per operation

Note that you can chain the pipeline elements together, so that the 4-line pipeline in my sender code becomes:

r.pipeline().lpush('myList', i).publish('notifications', 'LPUSHed to myList').execute()
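If you do this from several places, the push-and-notify step could be wrapped in a small helper. This is only a sketch: `push_and_notify` is a name I made up, and `client` is assumed to be a `redis.Redis` instance. It uses the pipeline as a context manager, so the pipeline is reset even if something raises.

```python
def push_and_notify(client, list_key, channel, item):
    """LPUSH `item` and PUBLISH a notification in one MULTI/EXEC round-trip.

    `client` is assumed to be a redis.Redis instance. Returns a tuple of
    (new list length, number of subscribers that received the notification).
    """
    with client.pipeline(transaction=True) as pipe:
        pipe.lpush(list_key, item)
        pipe.publish(channel, f"LPUSHed to {list_key}")
        list_length, receivers = pipe.execute()
    return list_length, receivers
```

In the sender loop the four pipeline lines would then become `push_and_notify(r, 'myList', 'notifications', i)`. The second return value tells you how many subscribers were listening at that moment, which can be handy when debugging a receiver that seems to get nothing.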