I am trying to write some simple Go code that uses the librdkafka-based Kafka consumer to consume messages from a Kafka topic and publish each message to an HTTP endpoint that has been upgraded to a WebSocket. The flow looks like this:
Kafka topic (myTopic) --> Go --> client. The client connects over HTTP, is upgraded to a WebSocket, keeps the connection open, and should receive every message from the Kafka topic via the Go code. Here is what I have tried so far. While I am able to consume messages from the topic and to test my WebSocket code separately, I am unable to stitch the two together: I cannot figure out how to pass a Kafka message to the upgraded WebSocket connection.
At this stage, I have kept all the code in a single file, which goes against best practices; I will refactor the code once I get it working.
import (
"fmt"
"log"
"net/http"
"github.com/gorilla/websocket"
"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)
// wsBufferSize is the value used for both the read and the write buffer size
// of upgrader.
const wsBufferSize = 1024

// upgrader holds the settings used by wsEndpoint to upgrade an incoming HTTP
// request to a websocket connection.
var upgrader = websocket.Upgrader{
	ReadBufferSize:  wsBufferSize,
	WriteBufferSize: wsBufferSize,
}
// reader echoes messages back to the peer on conn until a read or a write
// fails.
//
// Every message received is logged and then written back with the same
// message type. Reading in a loop (instead of once) keeps the connection
// serviced for as long as the peer stays connected. The function logs the
// error and returns as soon as ReadMessage or WriteMessage reports one.
func reader(conn *websocket.Conn) {
	for {
		messageType, p, err := conn.ReadMessage()
		if err != nil {
			log.Println(err)
			return
		}
		log.Println(string(p))

		if err := conn.WriteMessage(messageType, p); err != nil {
			log.Println(err)
			return
		}
	}
}
// homePage is the handler registered for "/". It responds with the plain
// text "Home Page".
func homePage(w http.ResponseWriter, r *http.Request) {
	fmt.Fprint(w, "Home Page")
}
// wsEndpoint is the handler registered for "/ws". It upgrades the HTTP
// request to a websocket connection and hands that connection to reader.
//
// CheckOrigin always returns true, i.e. the origin check is disabled.
func wsEndpoint(w http.ResponseWriter, r *http.Request) {
	// Configure a copy so that concurrent requests never write to the shared
	// package-level upgrader.
	up := upgrader
	up.CheckOrigin = func(*http.Request) bool { return true }

	ws, err := up.Upgrade(w, r, nil)
	if err != nil {
		// The upgrade failed, so there is no usable connection to hand to
		// reader; log and stop here.
		log.Println(err)
		return
	}
	// Release the connection once reader is done with it.
	defer ws.Close()

	log.Println("Client Succesfully connected")
	reader(ws)
}
// setupRoutes registers the HTTP handlers on the default serve mux: "/" is
// served by homePage and "/ws" by wsEndpoint.
func setupRoutes() {
	routes := []struct {
		pattern string
		handler http.HandlerFunc
	}{
		{"/", homePage},
		{"/ws", wsEndpoint},
	}
	for _, rt := range routes {
		http.HandleFunc(rt.pattern, rt.handler)
	}
}
// main creates a Kafka consumer subscribed to "myTopic", prints every message
// it receives, and serves the HTTP/websocket routes on port 8081.
//
// The consume loop never terminates, so it runs in its own goroutine;
// otherwise the route setup and the HTTP server below it would never be
// reached. The goroutine lives for the lifetime of the process.
//
// NOTE(review): consumed messages are only printed here. Forwarding them to
// connected websocket clients needs shared state (for example a channel or a
// registry of connections) between this loop and wsEndpoint.
func main() {
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost",
		"group.id":          "myGroup",
		"auto.offset.reset": "earliest",
	})
	if err != nil {
		panic(err)
	}

	// Subscribe to "myTopic". Any message produced on myTopic will be
	// consumed by this consumer.
	if err := c.SubscribeTopics([]string{"myTopic"}, nil); err != nil {
		c.Close()
		panic(err)
	}

	go func() {
		for {
			msg, err := c.ReadMessage(-1)
			if err != nil {
				// The client will automatically try to recover from all errors.
				fmt.Printf("Consumer error: %v (%v)\n", err, msg)
				continue
			}
			fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
		}
	}()

	setupRoutes()
	// log.Fatal exits the process when the server stops, so the consumer is
	// not closed explicitly on that path.
	log.Fatal(http.ListenAndServe(":8081", nil))
}