pubsub.get_message() sometimes doesn't receive messages

324 Views Asked by At

Version: redis-py 4.5.1, redis-stack

Platform: Python 3.9.12 on Ubuntu 20.4

Description: To give some context, I have a notifier that publishes a user_info dict in redis every time there is an update in a user. I can see this working fine using redis-stack.

Then I have a websocket that receives a user connection and upon connecting it connects to redis and subscribes to that user's channel like so:

 async def subscribe_redis(self, channel: str):
        try:
            r = aioredis.from_url(settings.REDIS_HOST)
            self.redis = r
            
            pubsub = r.pubsub()
            await pubsub.subscribe(channel)

            asyncio.create_task(self.reader(pubsub))

        except Exception as ex:
            logging.info(f"Error while subscribing to Redis: {str(ex)}")
        
        return True

Upon subscribing to the channel, it waits for messages (in this case, a user_info). The reader function:

async def reader(self, pubsub: aioredis.client.PubSub):
        while True:
            try:                
                if not self.websocket:
                    break

                message = await pubsub.get_message(
                    ignore_subscribe_messages=True,
                    timeout=1.0
                )

                if message is not None:
                    user_info = eval(message["data"])
                    await self.send_user_info(user_info)
            
            except Exception as ex:
                logging.info(str(ex))
                break

        return True

The expected functioning is: I make a change in the user inside my database, the notifier publishes the new user_info in the user's redis pubsub channel, the get_message() receives the user_info.

However, something weird happens: sometimes it works, sometimes it doesn't. Within the same websocket connection, when it works, it always works, meaning the get_message() always gets every new message; and within the same websocket connection, when it doesn't work, it never works, meaning the get_message never receives a message from the channel, even though the new message is being successfully published in redis.

Another weird thing is that in the case when it doesn't work, the redis connection is there and pubsub as well, if I make a redis.ping() and a pubsub.ping() inside the while loop I get a response.

Note that I call the subscribe_redis() function like this:

self.task = asyncio.create_task(
            self.subscribe_redis(redis_channel)
        ) 

Is there something I am not seeing?

0

There are 0 best solutions below