Discussion on implementing UDP client timeout retransmission in Go


I need to implement a client for a UDP + protobuf server in order to test the server's performance. The client must support timeout retransmission. I have a basic idea, but I don't know whether there is a better approach or an existing implementation I could reuse.

Known facts:

  1. The client and the server communicate in a request-response mode, similar to HTTP;
  2. Before sending a packet, both the client and the server prepend seq (uint32) and cmd (uint32) to it;
  3. A table of corresponding request and response cmd numbers is maintained. For example, the response cmd for a request with cmd 0x01 is 0x02;
  4. seq is an auto-incrementing number. When the server returns a response, it prepends the same seq as the request to the packet;
  5. The client can use seq and cmd to determine which request a received UDP packet responds to.
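To make the framing in points 2 to 5 concrete, here is a minimal sketch of how the header could be written, read, and matched. The big-endian byte order, the fixed 8-byte layout, and the names encodePacket, decodePacket, isResponseTo, and responseCmd are assumptions made for illustration; only the 0x01 to 0x02 pair comes from the description above.

```go
package main

import (
	"encoding/binary"
	"errors"
)

// headerLen is the size of the assumed header: seq (4 bytes) + cmd (4 bytes).
const headerLen = 8

// responseCmd maps a request cmd to the cmd of its response.
// Only the 0x01 -> 0x02 pair is taken from the question.
var responseCmd = map[uint32]uint32{0x01: 0x02}

// encodePacket prepends seq and cmd to the payload.
// Big-endian byte order is an assumption; the real protocol may differ.
func encodePacket(seq, cmd uint32, payload []byte) []byte {
	buf := make([]byte, headerLen+len(payload))
	binary.BigEndian.PutUint32(buf[0:4], seq)
	binary.BigEndian.PutUint32(buf[4:8], cmd)
	copy(buf[headerLen:], payload)
	return buf
}

// decodePacket splits a datagram into seq, cmd, and the remaining payload.
func decodePacket(pkt []byte) (seq, cmd uint32, payload []byte, err error) {
	if len(pkt) < headerLen {
		return 0, 0, nil, errors.New("packet shorter than header")
	}
	seq = binary.BigEndian.Uint32(pkt[0:4])
	cmd = binary.BigEndian.Uint32(pkt[4:8])
	return seq, cmd, pkt[headerLen:], nil
}

// isResponseTo reports whether a packet carrying (respSeq, respCmd)
// answers the request that was sent with (reqSeq, reqCmd).
func isResponseTo(reqSeq, reqCmd, respSeq, respCmd uint32) bool {
	want, ok := responseCmd[reqCmd]
	return ok && respSeq == reqSeq && respCmd == want
}
```

With this layout, the receive loop would call decodePacket on every datagram and then look up the pending request by seq before checking the cmd pair.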

While designing the timeout retransmission logic, I implemented the following basic data structures and functions:

type ResponseResult struct {
    Response []byte
    Err      error
}

type MessageStatus struct {
    SendData       PackageData
    Timestamp      time.Time
    FirstSent      time.Time // Time when the message was first sent
    LastAcked      time.Time // Time when the message was last acknowledged
    Attempts       int // Number of times the message has been sent
    IsAcknowledged bool
    ResponseTime   time.Duration // Time taken for the message to be acknowledged
    ResponseChan   chan *ResponseResult
}

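type Tracker struct {
    cmdToMsgStatus map[uint32]*MessageStatus // cmd to MessageStatus. Normal requests are placed here
    seqToMsgStatus map[uint32]*MessageStatus // seq to MessageStatus. Requests that may be sent several times in a short period cannot use cmd as the key, so they are keyed by seq instead
    messages       *list.List[*MessageStatus] // a linked list sorted by sending order
    mutex          *sync.RWMutex

    // The fields below are used by GetUnacknowledgedMessages; their types are inferred from that usage.
    RetryTime     time.Duration // how long to wait before resending a message
    RetryLimit    int           // maximum number of send attempts
    RetryCount    int           // total number of retransmissions
    SendPreSecond atomic.Int64  // send counter (assumed type)
}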

// GetUnacknowledgedMessages returns a channel that yields the timed-out, unacknowledged messages to resend.
func (t *Tracker) GetUnacknowledgedMessages() chan *PackageData {
    reSendData := make(chan *PackageData)

    go func() {
        defer close(reSendData)

        for {
            t.mutex.Lock()
            nowTime := time.Now()

            if t.messages.Len() <= 0 {
                t.mutex.Unlock()
                return
            }

            e := t.messages.Front()
            status := e.Value

            if nowTime.Sub(status.Timestamp) < t.RetryTime {
                t.mutex.Unlock()
                return
            }

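            if status.IsAcknowledged {
                // Remove the entry from whichever map holds it, using the key that map is indexed by.
                delete(t.seqToMsgStatus, status.SendData.Seq)
                if t.cmdToMsgStatus[status.SendData.Cmd] == status {
                    delete(t.cmdToMsgStatus, status.SendData.Cmd)
                }
                t.messages.Remove(e)
                t.mutex.Unlock()
                continue
            }

            if status.Attempts >= t.RetryLimit {
                // This send happens while the lock is held, so it blocks the Tracker
                // unless ResponseChan has buffer space or a receiver is ready.
                status.ResponseChan <- &ResponseResult{
                    Response: nil,
                    Err:      fmt.Errorf("cmd: [%d], seq: [%d], exceeded retry limit", status.SendData.Cmd, status.SendData.Seq),
                }
                delete(t.seqToMsgStatus, status.SendData.Seq)
                if t.cmdToMsgStatus[status.SendData.Cmd] == status {
                    delete(t.cmdToMsgStatus, status.SendData.Cmd)
                }
                t.messages.Remove(e) // Remove the message from the queue
                t.mutex.Unlock()
                continue
            }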
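            status.Timestamp = nowTime
            status.Attempts++
            t.RetryCount++ // count each retransmission once
            t.messages.MoveToBack(e)
            data := status.SendData // copy under the lock so the receiver does not share tracker state
            t.mutex.Unlock()

            // Send after unlocking so that a slow receiver cannot block other Tracker operations.
            reSendData <- &data
            t.SendPreSecond.Add(1)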
        }
    }()

    return reSendData
}

A goroutine periodically calls GetUnacknowledgedMessages to obtain all timed-out requests and resend them.

GetUnacknowledgedMessages repeatedly takes the first node of t.messages and checks whether that request has timed out. If it has, the request is resent and its node is moved to the end of the linked list. If it has not, the loop exits.

Are there any risks in this implementation? Is there a more suitable way to implement it?

Looking forward to discussing with you, thank you!
