In order to create multiple connections, I created a package with the functions start(), sendmessage(), readmessage(), and connectionmanager() that are exposed to other packages that create websocket clients. The purpose of this package is to create new clients every 50 seconds from the time we first started the client connection, meaning that we are creating new clients every 50 seconds and closing the old ones after a minute. To manage the connections, I created a connection manager that handles the closing and reconnecting of the clients
In order to avoid sending requests to any closed connections, sendmeesage() is designed to send the message to a randomly selected client from the connectionpool.
I'm new to this, so is there a method to optimise the package as it's taking too long to complete?
this is what i wrote :
type Client struct {
url string
sendMessage chan []byte
readMessage chan []byte
connectionPool []*websocket.Conn
addConnection chan *websocket.Conn
removeConn chan *websocket.Conn
}
func NewClient(url string) *Client {
return &Client{
url: url,
sendMessage: make(chan []byte),
readMessage: make(chan []byte),
connectionPool: make([]*websocket.Conn, 0),
addConnection: make(chan *websocket.Conn),
removeConn: make(chan *websocket.Conn),
}
}
func (c *Client) Connect() {
header := c.GenerateHeader()
// log.Println("Log in Connect() function ")
for {
conn, _, err := websocket.DefaultDialer.Dial(c.url, header)
if err != nil {
log.Printf("Error connecting to server: %v\n", err)
time.Sleep(5 * time.Second) // Retry after 5 seconds
continue
}
c.addConnection <- conn
go c.readMessages(conn)
break
}
}
func (c *Client) addConnectionToList(conn *websocket.Conn) {
c.connectionPool = append(c.connectionPool, conn)
fmt.Println("Connection added. Total connections:", len(c.connectionPool))
}
func (c *Client) removeConnectionFromList(conn *websocket.Conn) {
for i, connection := range c.connectionPool {
if connection == conn {
c.connectionPool = append(c.connectionPool[:i], c.connectionPool[i+1:]...)
break
}
}
fmt.Println("Connection removed. Total connections:", len(c.connectionPool))
}
func (c *Client) readMessages(conn *websocket.Conn) {
defer func() {
// log.Println("i am coming in this function for what !!!")
conn.Close()
c.removeConn <- conn
}()
// log.Println("Log in readmessage() function ")
for {
// log.Println("Log in for of readMessage function ")
_, msg, err := conn.ReadMessage()
if websocket.IsCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
log.Println("Connection closed by server.")
break
} else if err != nil {
log.Printf("Error reading message: %v\n", err)
break
}
c.recordMessage(msg)
log.Println("Message received from Server:", string(msg))
}
}
func (c *Client) writeMessages() {
// log.Println("Log in writeMessage() function ")
for {
select {
case msg := <-c.sendMessage:
// log.Println("Log in select of writeMessage() function ")
if len(c.connectionPool) > 0 {
conn := c.connectionPool[rand.Intn(len(c.connectionPool))]
err := conn.WriteMessage(websocket.TextMessage, msg)
if err != nil {
log.Printf("Error writing message: %v\n", err)
return
}
log.Println("Message sent from Client:", string(msg))
}
}
}
}
func (c *Client) SendMessage(message []byte) {
// log.Println("Log in SendMessage() function ")
c.sendMessage <- message
}
func (c *Client) recordMessage(message []byte) {
// log.Println("Log in recordMessage() function ")
c.readMessage <- message
}
func (c *Client) ReadMessage() []byte {
// log.Println("Log in ReadMessage() function ")
return <-c.readMessage
}
func (c *Client) Start() {
go c.writeMessages()
for i := 0; i < 5; i++ {
go c.Connect()
log.Println("Connection Established on Start()", i)
}
}
func (c *Client) ConnectionManager() {
tickerConnect := time.NewTicker(15 * time.Second)
defer tickerConnect.Stop()
// Initially, set disconnect ticker duration to a long duration to prevent immediate disconnect
tickerDisconnect := time.NewTicker(10 * time.Minute)
defer tickerDisconnect.Stop()
for {
select {
case <-tickerConnect.C:
log.Println("Connecting new websockets")
c.Start()
// If the disconnect ticker is still running, stop it
tickerDisconnect.Stop()
// Reset disconnect ticker to trigger after 5 seconds
tickerDisconnect = time.NewTicker(5 * time.Second)
case <-tickerDisconnect.C:
log.Println("Disconnecting old websockets")
c.Disconnect()
// Reset disconnect ticker duration to a long duration
tickerDisconnect.Stop()
tickerDisconnect = time.NewTicker(10 * time.Minute)
case connToAdd := <-c.addConnection:
c.addConnectionToList(connToAdd)
case connToRemove := <-c.removeConn:
c.removeConnectionFromList(connToRemove)
}
}
}
func (c *Client) Disconnect() {
// log.Println("Log in Disconnect() function ")
for i := 0; i < 5; i++ {
log.Println("Closing Connection")
c.connectionPool[i].Close()
}
}