Librdkafka Go consumer and websocket issue


I am trying to write some simple Go code that uses the librdkafka-based Kafka consumer, consumes messages from a Kafka topic, and publishes each message to an HTTP endpoint that has been upgraded to a websocket. Something like the following:

Kafka topic (myTopic) --> Go --> Client

The client connects over HTTP, gets upgraded to a websocket, keeps the connection open, and receives all messages from the Kafka topic through the Go code. Here is what I have tried so far. I can consume messages from the topic, and I can test my websocket code separately, but I cannot stitch the two together: I cannot figure out how to pass a Kafka message to the upgraded websocket connection.

At this stage, I have kept all the code in a single file, which is against best practices. I will refactor the code once I get it working.


package main

import (
    "fmt"
    "log"
    "net/http"

    "github.com/gorilla/websocket"
    "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

var upgrader = websocket.Upgrader{
    ReadBufferSize:  1024,
    WriteBufferSize: 1024,
}

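// reader echoes every message received from the client back to it
// until a read or write on the connection fails.
func reader(conn *websocket.Conn) {
    for {
        messageType, p, err := conn.ReadMessage()
        if err != nil {
            log.Println(err)
            return
        }

        log.Println(string(p))

        if err := conn.WriteMessage(messageType, p); err != nil {
            log.Println(err)
            return
        }
    }
}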

//function that will be triggered with the / handler
func homePage(w http.ResponseWriter, r *http.Request) {
    fmt.Fprintf(w, "Home Page")
}

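//function that will be triggered with the /ws handler
func wsEndpoint(w http.ResponseWriter, r *http.Request) {
    // Accept connections from any origin (acceptable for local testing only).
    upgrader.CheckOrigin = func(r *http.Request) bool { return true }

    ws, err := upgrader.Upgrade(w, r, nil)
    if err != nil {
        // Upgrade has already written an HTTP error response; ws is nil here.
        log.Println(err)
        return
    }
    defer ws.Close()

    log.Println("Client successfully connected")
    reader(ws)
}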

//the handlers are initialized when the code is run.
func setupRoutes() {
    http.HandleFunc("/", homePage)
    http.HandleFunc("/ws", wsEndpoint)
}

func main() {
    // Start the HTTP/websocket server in its own goroutine. The consume loop
    // below never returns, so anything placed after it would never run.
    setupRoutes()
    go func() {
        log.Fatal(http.ListenAndServe(":8081", nil))
    }()

    c, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers": "localhost",
        "group.id":          "myGroup",
        "auto.offset.reset": "earliest",
    })
    if err != nil {
        panic(err)
    }
    defer c.Close()

    //Subscribe to "myTopic". Any message produced on myTopic will be consumed by this KafkaConsumer
    if err := c.SubscribeTopics([]string{"myTopic"}, nil); err != nil {
        panic(err)
    }

    for {
        msg, err := c.ReadMessage(-1)
        if err == nil {
            fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
        } else {
            // The client will automatically try to recover from all errors.
            fmt.Printf("Consumer error: %v (%v)\n", err, msg)
        }
    }
}
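
For reference, one common way to connect a consume loop to open websocket connections is a small fan-out "hub" built on channels. The following is only a minimal sketch of that idea, not a tested solution for the code above. It uses the standard library alone, so the Kafka consumer and the websocket connection are stubbed out, and the names Hub, NewHub, Subscribe, Unsubscribe and Broadcast are hypothetical helpers that do not come from gorilla/websocket or confluent-kafka-go.

```go
package main

import (
	"fmt"
	"sync"
)

// Hub fans each message out to every registered subscriber.
// Hypothetical helper for illustration; not part of any library.
type Hub struct {
	mu   sync.Mutex
	subs map[chan []byte]struct{}
}

// NewHub returns an empty hub.
func NewHub() *Hub {
	return &Hub{subs: make(map[chan []byte]struct{})}
}

// Subscribe registers a subscriber and returns its buffered channel.
func (h *Hub) Subscribe(buf int) chan []byte {
	ch := make(chan []byte, buf)
	h.mu.Lock()
	h.subs[ch] = struct{}{}
	h.mu.Unlock()
	return ch
}

// Unsubscribe removes the subscriber and closes its channel.
func (h *Hub) Unsubscribe(ch chan []byte) {
	h.mu.Lock()
	defer h.mu.Unlock()
	if _, ok := h.subs[ch]; ok {
		delete(h.subs, ch)
		close(ch)
	}
}

// Broadcast offers msg to every subscriber without blocking and returns
// how many accepted it. A subscriber whose buffer is full misses the message.
func (h *Hub) Broadcast(msg []byte) int {
	h.mu.Lock()
	defer h.mu.Unlock()
	delivered := 0
	for ch := range h.subs {
		select {
		case ch <- msg:
			delivered++
		default: // slow subscriber: drop instead of stalling the producer
		}
	}
	return delivered
}

func main() {
	hub := NewHub()

	// Stand-in for one connected websocket client.
	client := hub.Subscribe(8)

	// Stand-in for the Kafka consume loop.
	go func() {
		for _, v := range []string{"first", "second"} {
			hub.Broadcast([]byte(v))
		}
		hub.Unsubscribe(client)
	}()

	// Stand-in for the per-connection write loop.
	for m := range client {
		fmt.Println(string(m))
	}
}
```

Under these assumptions, the consume loop would call hub.Broadcast(msg.Value) for each message it reads, and wsEndpoint would call hub.Subscribe after the upgrade, then loop over the returned channel and call ws.WriteMessage(websocket.TextMessage, m) for each item, unsubscribing when a write fails. Dropping messages for slow clients is a design choice made here for simplicity; a real program might prefer to disconnect such clients instead.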
