I'm taking my first steps with NATS and I see behavior I cannot make sense of, even after reading the docs quite carefully. I have a local NATS server (2.6.5) running. It was started with

    ./nats-server -js

I generate some messages with the following code:
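
    import asyncio

    import nats

    async def main():
        nc = await nats.connect()
        js = nc.jetstream()
        # Recreate the stream so that every run starts with exactly 10 messages.
        # Note: delete_stream() raises an error if the stream does not exist yet.
        await js.delete_stream(name="hello")
        await js.add_stream(
            name="hello",
            subjects=["hello"],
        )
        for i in range(10):
            await js.publish("hello", f"hello world: {i}".encode())
        await nc.close()

    if __name__ == "__main__":
        asyncio.run(main())
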
If I run the code and then execute ./nats stream ls, I see 10 messages. Everything is fine so far. Next I run my consumer:

    import asyncio

    import nats

    async def main():
        nc = await nats.connect()
        js = nc.jetstream()
        sub = await js.pull_subscribe("hello", "hello")

        msg_count = 0
        while msg_count < 10:
            # Pull one message at a time.
            for msg in await sub.fetch(1):
                print("Received:", msg.data)
                msg_count = msg_count + 1
                # Try nack'ing every third message
                if msg_count % 3 == 0:
                    await msg.nak()
                else:
                    await msg.ack()

        await nc.close()

    if __name__ == "__main__":
        asyncio.run(main())

The output shows:
Received: b'hello world: 0'
Received: b'hello world: 1'
Received: b'hello world: 2'
Received: b'hello world: 2'
Received: b'hello world: 3'
Received: b'hello world: 4'
Received: b'hello world: 4'
Received: b'hello world: 5'
Received: b'hello world: 6'
Received: b'hello world: 6'
That makes sense: I pull 10 messages. Every third message is nak'ed, so it is delivered again by the next fetch. If I start the script again, the output is:
Received: b'hello world: 7'
Received: b'hello world: 8'
Received: b'hello world: 9'
Received: b'hello world: 9'
And after a few seconds I get a timeout. Obviously NATS somehow remembers my script and continues delivering messages where the previous run stopped. But I don't get how this happens. Is there a "global" cursor in the stream? In that case multiple clients would interfere with each other, which does not make sense to me. So I assume that NATS somehow remembers my client. How? And how would I tell NATS that I want to start over from the beginning? I would also appreciate a pointer to the docs that I obviously missed.

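When you create a pull subscription, the JetStream client API also creates a durable consumer with matching consumer options, in this case the stream (found here via the subject "hello") and a durable name (the second argument). A durable consumer lives on the server and tracks, under that name, which messages have been acknowledged, independently of any client connection. A restarted script that subscribes with the same durable name therefore resumes where the previous run stopped. There is no global cursor in the stream: clients that use different durable names get separate consumers, each with its own position, so they do not interfere. To start over, delete the durable consumer or subscribe with a new durable name.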
Source: https://nats.io/blog/jetstream-java-client-03-consume/#durable-vs-ephemeral
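
As a rough sketch of the "start over" option, the helper below deletes the durable consumer and subscribes again, so the server creates a fresh consumer that has no acknowledgement history. The helper name restart_consumer is made up for illustration. It assumes the nats-py JetStream context offers delete_consumer(stream, consumer) and pull_subscribe(subject, durable), as used in the question's code, so check both against the client version you have installed.

```python
async def restart_consumer(js, stream, subject, durable):
    """Drop the durable consumer and create a fresh pull subscription.

    `js` is assumed to be the object returned by nc.jetstream().
    """
    # Deleting the durable consumer discards the delivery state the server
    # keeps under this name. Assumption: the call fails if no consumer with
    # that name exists yet, so a first run may need to skip or guard it.
    await js.delete_consumer(stream, durable)

    # Subscribing with the same durable name creates a new consumer, which
    # should start from the first message in the stream under the default
    # deliver policy.
    return await js.pull_subscribe(subject, durable)
```

In the consumer script from the question, this would replace the pull_subscribe line with sub = await restart_consumer(js, "hello", "hello", "hello"). Subscribing with a different durable name gives the same effect without deleting anything, but the old consumer then stays on the server.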