Can I make multiple consumers for multiple subjects on one stream on NATS JetStream?

I'm trying to add multiple subscriptions to multiple subjects on a single stream using NATS JetStream. Is this possible? Am I going in the right direction, or am I doing something wrong? I'm using JetStream's pull subscribe feature to set up a pull/request relationship between my NATS server and the place the information is passed in, but I don't see how I can keep both subjects open at the same time. Any suggestions on how to do this would be greatly appreciated!

Here is the code (consumerNames is a map from consumer name to the subject name on the stream):

for consumerName, subjectName := range consumerNames {
    if _, err := js.AddConsumer(streamConfig.Name, &nats.ConsumerConfig{
        // Durable means that the consumer receives messages even if they are offline
        Durable:         consumerName,
        DeliverySubject: subjectName,
        AckPolicy:       nats.AckExplicitPolicy,
    }); err != nil {
        log.Fatalf("Can not add consumer to specified subject in stream: %v", err)
    }
    // Pull messages to consume - Bind function binds the consumer to the stream
    sub, err := js.PullSubscribe(subjectName, consumerName, nats.Bind(streamConfig.Name, consumerName), nats.MaxDeliver(2))
    if err != nil {
        log.Fatalf("Can not Subscribe to messages: %v", err)
    }
}

There are 2 best solutions below


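DeliverySubject (the field is actually named DeliverSubject in nats.ConsumerConfig) belongs to push consumers, so leave it empty for a pull consumer. If you need to restrict a consumer to certain subjects, use FilterSubject instead. This should do the trick.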

https://pkg.go.dev/github.com/nats-io/nats.go#StreamConfig

https://pkg.go.dev/github.com/nats-io/nats.go#ConsumerConfig

js.AddStream(&nats.StreamConfig{
    Name:     "ORDERS",
    Subjects: []string{"ORDERS.*"},
})

For more context, take a look at @Jarema's answer.


For some context:

  • A stream can have many subjects.
  • A consumer can have one filter subject, or none at all.
  • One stream can have many consumers (a consumer is a view into a stream).
  • The filter subject can contain wildcards, so if you have a stream:
nats.StreamConfig{
  Subjects: []string{"events.one", "events.two", "events.three", "data.one", "data.two"},
}

you can create a consumer that:

  1. Gets every message for every subject, by not specifying a filter at all
  2. Gets only the events messages, with the filter events.* or events.>

So either you create one consumer that gets all messages for all subjects on the stream, or you create many consumers on the same stream, each with its own filter. Both approaches are fine.
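
To make the filter rules above concrete, here is a minimal sketch in plain Go of how a filter subject selects subjects from the example stream. It only illustrates the wildcard rules ("*" matches exactly one token, ">" matches one or more trailing tokens). The helpers subjectMatches and selectSubjects are hypothetical names made up for this illustration and are not part of nats.go; the real matching is done by the NATS server.

```go
package main

import (
	"fmt"
	"strings"
)

// subjectMatches reports whether a concrete subject is selected by a filter.
// Hypothetical helper for illustration only; not part of nats.go.
func subjectMatches(filter, subject string) bool {
	if filter == "" {
		return true // no filter: the consumer sees every message in the stream
	}
	f := strings.Split(filter, ".")
	s := strings.Split(subject, ".")
	for i, tok := range f {
		if tok == ">" {
			// ">" must be the last filter token and needs at least one subject token left
			return i == len(f)-1 && len(s) > i
		}
		if i >= len(s) {
			return false // the subject has fewer tokens than the filter
		}
		if tok != "*" && tok != s[i] {
			return false // a literal token must match exactly
		}
	}
	return len(f) == len(s)
}

// selectSubjects returns the stream subjects a consumer with the given
// filter would receive. Hypothetical helper for illustration only.
func selectSubjects(filter string, streamSubjects []string) []string {
	var out []string
	for _, s := range streamSubjects {
		if subjectMatches(filter, s) {
			out = append(out, s)
		}
	}
	return out
}

func main() {
	streamSubjects := []string{"events.one", "events.two", "events.three", "data.one", "data.two"}
	for _, filter := range []string{"", "events.*", "events.>", "data.one"} {
		fmt.Printf("filter %q -> %v\n", filter, selectSubjects(filter, streamSubjects))
	}
}
```

With the example stream, events.* and events.> select the same three subjects. They would differ only for deeper subjects such as events.one.two, which events.> matches and events.* does not.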

Up to nats-server 2.9, you cannot specify multiple filters for one consumer. nats-server 2.10 and the matching clients support a consumer with multiple filter subjects, which often removes the need for several consumers on a single stream.

I would also recommend using the new JetStream API (the jetstream package in nats.go), which simplifies usage:

// Assumes nc is a connected *nats.Conn and ctx is a context.Context.
// Create a JetStream management interface
js, _ := jetstream.New(nc)

// get stream handle
stream, _ := js.Stream(ctx, "ORDERS")

// create consumer
cons, _ := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
    Durable:   "foo",
    AckPolicy: jetstream.AckExplicitPolicy,
})

// get messages for the consumer
consContext, _ := cons.Consume(func(msg jetstream.Msg) {
    fmt.Printf("Received a JetStream message: %s\n", string(msg.Data()))
    // with AckExplicitPolicy, each message must be acknowledged or it is redelivered
    msg.Ack()
})
defer consContext.Stop()

docs: https://github.com/nats-io/nats.go/blob/main/jetstream/README.md

@b2f's answer is correct, too. DeliverSubject is meant for push consumers. You can't set it (and don't need it) for pull consumers.