How can we optimize this package where i am creating a multiple websocket clients in go

33 Views Asked by At

In order to create multiple connections, I created a package with the functions start(), sendmessage(), readmessage(), and connectionmanager() that are exposed to other packages that create websocket clients. The purpose of this package is to create new clients every 50 seconds from the time we first started the client connection, meaning that we are creating new clients every 50 seconds and closing the old ones after a minute. To manage the connections, I created a connection manager that handles the closing and reconnecting of the clients

In order to avoid sending requests to any closed connections, sendmeesage() is designed to send the message to a randomly selected client from the connectionpool.

I'm new to this, so is there a method to optimise the package as it's taking too long to complete?

this is what i wrote :

type Client struct {
    url            string
    sendMessage    chan []byte
    readMessage    chan []byte
    connectionPool []*websocket.Conn
    addConnection  chan *websocket.Conn
    removeConn     chan *websocket.Conn
}

func NewClient(url string) *Client {
    return &Client{
        url:            url,
        sendMessage:    make(chan []byte),
        readMessage:    make(chan []byte),
        connectionPool: make([]*websocket.Conn, 0),
        addConnection:  make(chan *websocket.Conn),
        removeConn:     make(chan *websocket.Conn),
    }
}

func (c *Client) Connect() {
    header := c.GenerateHeader()
    // log.Println("Log in Connect() function ")
    for {
        conn, _, err := websocket.DefaultDialer.Dial(c.url, header)
        if err != nil {
            log.Printf("Error connecting to server: %v\n", err)
            time.Sleep(5 * time.Second) // Retry after 5 seconds
            continue
        }
        c.addConnection <- conn
        go c.readMessages(conn)
        break
    }
}

func (c *Client) addConnectionToList(conn *websocket.Conn) {
    c.connectionPool = append(c.connectionPool, conn)
    fmt.Println("Connection added. Total connections:", len(c.connectionPool))
}

func (c *Client) removeConnectionFromList(conn *websocket.Conn) {
    for i, connection := range c.connectionPool {
        if connection == conn {
            c.connectionPool = append(c.connectionPool[:i], c.connectionPool[i+1:]...)
            break
        }
    }
    fmt.Println("Connection removed. Total connections:", len(c.connectionPool))
}

func (c *Client) readMessages(conn *websocket.Conn) {
    defer func() {
        // log.Println("i am coming in this function for what !!!")
        conn.Close()
        c.removeConn <- conn
    }()
    // log.Println("Log in readmessage() function ")
    for {
        // log.Println("Log in for of readMessage function ")
        _, msg, err := conn.ReadMessage()
        if websocket.IsCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
            log.Println("Connection closed by server.")
            break
        } else if err != nil {
            log.Printf("Error reading message: %v\n", err)
            break
        }
        c.recordMessage(msg)
        log.Println("Message received from Server:", string(msg))
    }
}

func (c *Client) writeMessages() {
    // log.Println("Log in writeMessage() function ")
    for {
        select {
        case msg := <-c.sendMessage:
            // log.Println("Log in select of writeMessage() function ")
            if len(c.connectionPool) > 0 {
                conn := c.connectionPool[rand.Intn(len(c.connectionPool))]
                err := conn.WriteMessage(websocket.TextMessage, msg)
                if err != nil {
                    log.Printf("Error writing message: %v\n", err)
                    return
                }
                log.Println("Message sent from Client:", string(msg))
            }
        }
    }
}

func (c *Client) SendMessage(message []byte) {
    // log.Println("Log in SendMessage() function ")
    c.sendMessage <- message
}

func (c *Client) recordMessage(message []byte) {
    // log.Println("Log in recordMessage() function ")
    c.readMessage <- message
}

func (c *Client) ReadMessage() []byte {
    // log.Println("Log in ReadMessage() function ")
    return <-c.readMessage
}

func (c *Client) Start() {
    go c.writeMessages()
    for i := 0; i < 5; i++ {
        go c.Connect()
        log.Println("Connection Established on Start()", i)
    }
}

func (c *Client) ConnectionManager() {
    tickerConnect := time.NewTicker(15 * time.Second)
    defer tickerConnect.Stop()

    // Initially, set disconnect ticker duration to a long duration to prevent immediate disconnect
    tickerDisconnect := time.NewTicker(10 * time.Minute)
    defer tickerDisconnect.Stop()

    for {
        select {
        case <-tickerConnect.C:
            log.Println("Connecting new websockets")
            c.Start()

            // If the disconnect ticker is still running, stop it
            tickerDisconnect.Stop()

            // Reset disconnect ticker to trigger after 5 seconds
            tickerDisconnect = time.NewTicker(5 * time.Second)

        case <-tickerDisconnect.C:
            log.Println("Disconnecting old websockets")
            c.Disconnect()

            // Reset disconnect ticker duration to a long duration
            tickerDisconnect.Stop()
            tickerDisconnect = time.NewTicker(10 * time.Minute)

        case connToAdd := <-c.addConnection:
            c.addConnectionToList(connToAdd)

        case connToRemove := <-c.removeConn:
            c.removeConnectionFromList(connToRemove)
        }
    }
}

func (c *Client) Disconnect() {
    // log.Println("Log in Disconnect() function ")
    for i := 0; i < 5; i++ {
        log.Println("Closing Connection")
        c.connectionPool[i].Close()
    }

}
0

There are 0 best solutions below