I need to implement a client for testing UDP + protobuf server to test the performance of the server. The client needs to implement the timeout retransmission function. Now I have a basic idea, but I don't know if there is a better implementation solution or an existing implementation solution.
A known:
- The communication mode between the client and the server is a request-response mode similar to http;
- The client and server will add
seq uint32andcmd uint32before the package before sending it; - A set of request-response corresponding cmd numbers is maintained. For example: the response cmd of a request with cmd 0x01 is 0x02;
- The seq number is a self-increasing number. When the server returns a response, it will add the same seq as the request in front of the package;
- The client can use seq and cmd to determine which request the received UDP packet is a response to.
When designing the logic of timeout retransmission, the basic data structure and functions I implemented are as follows:
type ResponseResult struct {
Response []byte
Err error
}
type MessageStatus struct {
SendData PackageData
Timestamp time.Time
FirstSent time.Time // Time when the message was first sent
LastAcked time.Time // Time when the message was last acknowledged
Attempts int // Number of times the message has been sent
IsAcknowledged bool
ResponseTime time.Duration // Time taken for the message to be acknowledged
ResponseChan chan *ResponseResult
}
type Tracker struct {
cmdToMsgStatus map[uint32]*MessageStatus // cmd to MessageStatus. Normal requests are placed here
seqToMsgStatus map[uint32]*MessageStatus // seq to MessageStatus. Some requests may be sent multiple times in a short period of time, and cannot use cmd as the key. Instead, use seq as the key
messages *list.List[*MessageStatus] // a linked list sorted by sending order
mutex *sync.RWMutex
}
// GetUnacknowledgedMessages return unacknowledged messages.
func (t *Tracker) GetUnacknowledgedMessages() chan *PackageData {
reSendData := make(chan *PackageData)
go func() {
defer close(reSendData)
for {
t.mutex.Lock()
nowTime := time.Now()
if t.messages.Len() <= 0 {
t.mutex.Unlock()
return
}
e := t.messages.Front()
status := e.Value
if nowTime.Sub(status.Timestamp) < t.RetryTime {
t.mutex.Unlock()
return
}
if status.IsAcknowledged {
delete(t.cmdToMsgStatus, status.SendData.Seq)
t.messages.Remove(e)
t.mutex.Unlock()
continue
}
if status.Attempts >= t.RetryLimit {
status.ResponseChan <- &ResponseResult{
Response: nil,
Err: fmt.Errorf("cmd: [%d], seq: [%d], exceeded retry limit", status.SendData.Cmd, status.SendData.Seq),
}
delete(t.cmdToMsgStatus, status.SendData.Seq)
t.messages.Remove(e) // Remove the message from the queue
t.mutex.Unlock()
continue
}
status.Timestamp = nowTime
t.messages.MoveToBack(e)
reSendData <- &status.SendData
status.Attempts++
t.RetryCount += status.Attempts
t.mutex.Unlock()
t.SendPreSecond.Add(1)
}
}()
return reSendData
}
There is a goroutine that regularly calls GetUnacknowledgedMessages to obtain all timed-out requests and resend them.
The GetUnacknowledgedMessages function keeps getting the elements of the first node from t.messages. Determine whether the request times out. If it times out, resend it and put the current node at the end of the linked list. If it does not time out, break of the loop.
Are there any risks in implementing this? Is there any suitable implementation method?
Looking forward to discussing with you, thank you!