NATS-Jetstream two consumer run issue

93 Views Asked by At

I am using nats-jetstream and in that made two consumers as per below code with diff. subjects and same stream. Single publisher code file sends event to both consumers with its subjects. Used durable consumer with unique name for both.

I deployed this two consumers code in two kuberenetes pods in same namespace also used another single pod for publisher. When publisher sends event to both, the first consumer gets the event. another one does not. I am trying to solve this issue considering as race condition for subscribe event in both consumer pods. Still exact don't know about this issue. Please help me.

Code: 
    func StartNatsJetstream() error {
    fmt.Println("StartNatsJetstream runs")
    StartSubject := common.SoConfig.MessageBus.WorkflowRequestQueue + ".*"
    startConsumer := "tart-consumer" //Consumer name can be anything
    fmt.Printf("StartNatsJetstream runs%s", tartSubject)
    // Connect to the NATS server
    nc, js, err := natsjetstream.SetupJetStream(startSubject)
    if err != nil {
        log.Errorf("Nats server is not running", err)
        return err
    }
    defer nc.Close()
    fmt.Println("Connection got")
    // Subscribe to the stream and process messages
    _, err = js.Subscribe(StartSubject, func(msg *nats.Msg) {
        fmt.Printf("Received message: %s\n", msg.Data)

        // Acknowledge the message to prevent it from being sent again
        err := msg.Ack()
        if err != nil {
            fmt.Printf("Error acknowledging message: %v\n", err)
            return
        }

        // Process the message
        receivedPayload := msg.Data
        var msgData Message
        err = json.Unmarshal([]byte(receivedPayload), &msgData)
        if err != nil {
            fmt.Println("Error unmarshaling JSON:", err)
            return
        }
        //Decoded byte message understand to the user in logs
        decodedPayload, err := base64.StdEncoding.DecodeString(msgData.Payload)
        if err != nil {
            fmt.Println("Error decoding payload:", err)
            return
        }

        var payloadData PayloadData
        err = json.Unmarshal(decodedPayload, &payloadData)
        if err != nil {
            fmt.Println("Error unmarshaling JSON from decoded payload:", err)
            return
        }

        msgData.Payload = string(decodedPayload)

        fmt.Printf("Received message: %+v\n", msgData)
        //Send byte msgData to process workflow req.
        go processMessageNatsJetstream(msgData)

    }, nats.Durable(StartConsumer))
    if err != nil {
        log.Errorf("Error while subscribe", err)
    }
    fmt.Println("Function completed")
    // To keep the goroutine running
    select {}
}

Please help me about to identify the issue.

0

There are 0 best solutions below