NATS (nats.py) Jetstream with multiple consumers in one group

265 Views Asked by At

Can I create multiple consumers to one stream and one group?

What I want to achieve:

  • one stream "test-stream" with subjects "test.*"
  • two consumers:
    • consumer with subject "test.first"
    • consumer with subject "test.second"
  • consumers in one queue group "test-group" to start multiple worker-apps, group "test-group" will be as load balancer

So, I wrote this code with nats.py:

import asyncio
import nats
from nats.aio.msg import Msg

async def first_cb(msg: Msg):
    print("first")
    await msg.ack()

async def second_cb(msg: Msg):
    print("second")
    await msg.ack()

async def main():
    nc = await nats.connect(
        servers="nats://localhost:4222",
    )

    js = nc.jetstream()
    await js.add_stream(name="test-stream", subjects=["test.*"])

    await js.subscribe("test.first", "test-group", cb=first_cb, manual_ack=True)
    await js.subscribe("test.second", "test-group", cb=second_cb, manual_ack=True)

    while True:
        await asyncio.sleep(1)

if __name__ == '__main__':
    asyncio.run(main())

When I publish multiple messages with subject "test.first" next happens:

first
first
second
first

What I did wrong? Consumers should filter two different subjects, but consumer with subject "test.second" got message from "test.first".

If I make different queue groups for each consumer - works fine, but this is not exact wanted behaviour.

0

There are 0 best solutions below