goroutine hangs forever when using value receiver with mutex

52 Views Asked by At
package main

import (
    "container/heap"
    "fmt"
    "github.com/sirupsen/logrus"
    "io"
    "math/rand"
    "net/http"
    _ "net/http/pprof"
    "sync"
    "time"
)


// Item Base on Golang official demo
// An Item is something we manage in a priority queue.
type Item struct {
    Data     interface{} // The value of the item; arbitrary.
    Priority int         // The priority of the item in the queue.
    // The index is needed by update and is maintained by the heap.Interface methods.
    Index int // The index of the item in the heap.
}

// A SafePriorityQueue implements heap.Interface and holds Items.
type SafePriorityQueue struct {
    IsMax bool // set type: max or min heap
    Lock  sync.RWMutex
    Items []*Item
}

func (spq SafePriorityQueue) Len() int {
    spq.Lock.RLock()
    size := len(spq.Items)
    spq.Lock.RUnlock()
    return size
}

func (spq SafePriorityQueue) Less(i, j int) bool {
    // We want Pop to give us the highest, not lowest, priority so we use greater than here.
    if !spq.IsMax {
        return spq.Items[i].Priority < spq.Items[j].Priority
    }
    return spq.Items[i].Priority > spq.Items[j].Priority
}

func (spq SafePriorityQueue) Swap(i, j int) {
    spq.Lock.Lock()
    spq.Items[i], spq.Items[j] = spq.Items[j], spq.Items[i]
    spq.Items[i].Index = i
    spq.Items[j].Index = j
    spq.Lock.Unlock()
}

func (spq *SafePriorityQueue) Push(x interface{}) {
    spq.Lock.Lock()
    defer spq.Lock.Unlock()
    n := len(spq.Items)
    item := x.(*Item)
    item.Index = n
    spq.Items = append(spq.Items, item)
}

func (spq *SafePriorityQueue) Pop() interface{} {
    spq.Lock.Lock()
    defer spq.Lock.Unlock()
    old := *spq
    n := len(old.Items)
    item := old.Items[n-1]
    old.Items[n-1] = nil
    spq.Items = old.Items[0 : n-1]
    return item
}

// Update modifies the priority and value of an Item in the queue.
func (spq *SafePriorityQueue) Update(item *Item, priority int) {
    item.Priority = priority
    heap.Fix(spq, item.Index)
}

func (spq *SafePriorityQueue) Show() []*Item {
    spq.Lock.RLock()
    item := spq.Items
    spq.Lock.RUnlock()
    return item
}

var testQ = &SafePriorityQueue{
    Lock:  sync.RWMutex{},
    Items: []*Item{},
}

var run = true

func main() {

    for i := 0; i < 10; i++ {
        go func(q *SafePriorityQueue) {
            for {
                s := rand.Intn(100)
                time.Sleep(time.Millisecond * time.Duration(s))
                task := &Item{
                    Data:     "",
                    Priority: int(time.Now().UnixMilli()) + rand.Intn(1000),
                }
                if run {
                    //logrus.Infof("mutex:%p", &q.Lock)
                    heap.Push(q, task)
                }
            }
        }(testQ)
    }
    go func() {
        for {
            if testQ.Len() == 0 {
                time.Sleep(time.Millisecond * 50)
                continue
            }
            item := heap.Pop(testQ).(*Item)
            priority := int64(item.Priority)
            logrus.Info("pop item priority,", GetTimeStr(time.UnixMilli(priority)))
        }
    }()
    http.HandleFunc("/show", Show)
    http.HandleFunc("/stop", Stop)
    http.HandleFunc("/pop", Pop)
    _ = http.ListenAndServe("0.0.0.0:6060", nil)
}

func Stop(w http.ResponseWriter, r *http.Request) {
    x := testQ.Len()
    run = !run
    _, _ = io.WriteString(w, fmt.Sprintf("%d:stopped!\n", x))
}

func Pop(w http.ResponseWriter, r *http.Request) {
    if testQ.Len() == 0 {
        _, _ = io.WriteString(w, "noting to pop")
        return
    }
    item := heap.Pop(testQ).(*Item)
    priority := int64(item.Priority)
    logrus.Info("pop item priority,", priority)
    _, _ = io.WriteString(w, fmt.Sprintf("pop item priority:%d", priority))
}

func Show(w http.ResponseWriter, r *http.Request) {
    x := testQ.Len()
    logrus.WithField("pq", 1).Info("len:", x)
    _, _ = io.WriteString(w, fmt.Sprintf("%d:show!\n", x))
}

var defLocal, _ = time.LoadLocation("Asia/Shanghai")

func GetTimeStr(t time.Time) string {
    return t.In(defLocal).Format("2006-01-02 15:04:05.000")
}

Recently I take over the job of a colleague, sometimes the program hangs, the Pop goroutine doesn't pop anything, I checked the implementation of SafePriorityQueue, and found that he misused value receiver and pointer receiver, so I wrote this test program. Mostly the program panic, index out of range, I know why it panic, but sometimes it hangs, the pop goroutine sometimes hangs when call if testQ.Len() == 0 and other times when call item := heap.Pop(testQ).(*Item). What I’m confused is that who block it, since value receiver func always copy the struct, so the lock is a new lock, why it blocked forever when call testQ.Len() or heap.Pop(testQ).

0

There are 0 best solutions below