XACK is not deleting the message, even if it is processed successfully?

76 Views Asked by At

I am trying to implement redis stream where we have a producer.

package producer

import (
    "RedisStream/models"
    "encoding/json"
    "fmt"
    "github.com/garyburd/redigo/redis"
)

type Producer struct {
    streamName string
}

func NewProducer(streamName string) *Producer {
    return &Producer{streamName: streamName}
}

func (p *Producer) WriteEvents(conn redis.Conn, key string) {
    // Create a new struct
    employee := models.Employee{
        Name:     "ashutosh",
        Employer: "self-employee",
    }
    // Convert struct to JSON
    e, _ := json.Marshal(employee)

    // Send key and value to Redis stream
    _, err := conn.Do("XADD", p.streamName, "*", key, e)
    if err != nil {
        fmt.Println(err)
    }
    fmt.Println("Successfully sent data to Redis stream")
}

then I have implemented a consumer

func (c *Consumer) ReadEventsCons1() {
    // Connect to Redis
    conn, err := redis.Dial("tcp", ":6379")
    if err != nil {
        fmt.Println(err)
        return
    }
    defer conn.Close()
    for {
        // Read key and value from Redis stream
        reply, err := conn.Do("XREADGROUP", "GROUP", c.groupName[0], "ashu", "COUNT", "1", "STREAMS", c.streamName, ">")
        vs, err := redis.Values(reply, err)
        if err != nil {
            if errors.Is(err, redis.ErrNil) {
                continue
            }
            fmt.Printf("Error: %+v", err)
        }

        // Get the first and only value in the array since we're only
        // reading from one stream "some-stream-name" here.
        vs, err = redis.Values(vs[0], nil)
        if err != nil {
            fmt.Printf("Error: %+v", err)
        }

        // Ignore the stream name as the first value as we already have
        // that in hand! Just get the second value which is guaranteed to
        // exist per the docs, and parse it as some stream entries.
        res, err := entries(vs[1], nil)
        if err != nil {
            fmt.Errorf("error parsing entries: %w", err)
        }
        for _, val := range res {
            for k, v := range val.Fields {
                empl := &models.Employee{}
                _ = json.Unmarshal(v, empl)
                fmt.Printf("From Consumer Ashu:  Key: %s and val: %+v \n", k, empl)
            }
            reply, err := redis.Int(conn.Do("XACK", c.streamName, c.groupName[0], val.ID))
            if reply != 1 {
                fmt.Printf("failed to ack: err: %+v", err)
            }
        }

    }
}

Once a consumer from a consumergroup successfully processed a message, I sent acknowledgement to redis.But messages still resides in redis stream. because post running

XLEN streamName I can see length is growing. This may create memory challenge, since messages are residing in perpetuity. Is there any intelligent way to handle this issue?

0

There are 0 best solutions below