package main
import (
"container/heap"
"fmt"
"github.com/sirupsen/logrus"
"io"
"math/rand"
"net/http"
_ "net/http/pprof"
"sync"
"time"
)
// Item Base on Golang official demo
// An Item is something we manage in a priority queue.
type Item struct {
Data interface{} // The value of the item; arbitrary.
Priority int // The priority of the item in the queue.
// The index is needed by update and is maintained by the heap.Interface methods.
Index int // The index of the item in the heap.
}
// A SafePriorityQueue implements heap.Interface and holds Items.
type SafePriorityQueue struct {
IsMax bool // set type: max or min heap
Lock sync.RWMutex
Items []*Item
}
func (spq SafePriorityQueue) Len() int {
spq.Lock.RLock()
size := len(spq.Items)
spq.Lock.RUnlock()
return size
}
func (spq SafePriorityQueue) Less(i, j int) bool {
// We want Pop to give us the highest, not lowest, priority so we use greater than here.
if !spq.IsMax {
return spq.Items[i].Priority < spq.Items[j].Priority
}
return spq.Items[i].Priority > spq.Items[j].Priority
}
func (spq SafePriorityQueue) Swap(i, j int) {
spq.Lock.Lock()
spq.Items[i], spq.Items[j] = spq.Items[j], spq.Items[i]
spq.Items[i].Index = i
spq.Items[j].Index = j
spq.Lock.Unlock()
}
func (spq *SafePriorityQueue) Push(x interface{}) {
spq.Lock.Lock()
defer spq.Lock.Unlock()
n := len(spq.Items)
item := x.(*Item)
item.Index = n
spq.Items = append(spq.Items, item)
}
func (spq *SafePriorityQueue) Pop() interface{} {
spq.Lock.Lock()
defer spq.Lock.Unlock()
old := *spq
n := len(old.Items)
item := old.Items[n-1]
old.Items[n-1] = nil
spq.Items = old.Items[0 : n-1]
return item
}
// Update modifies the priority and value of an Item in the queue.
func (spq *SafePriorityQueue) Update(item *Item, priority int) {
item.Priority = priority
heap.Fix(spq, item.Index)
}
func (spq *SafePriorityQueue) Show() []*Item {
spq.Lock.RLock()
item := spq.Items
spq.Lock.RUnlock()
return item
}
var testQ = &SafePriorityQueue{
Lock: sync.RWMutex{},
Items: []*Item{},
}
var run = true
func main() {
for i := 0; i < 10; i++ {
go func(q *SafePriorityQueue) {
for {
s := rand.Intn(100)
time.Sleep(time.Millisecond * time.Duration(s))
task := &Item{
Data: "",
Priority: int(time.Now().UnixMilli()) + rand.Intn(1000),
}
if run {
//logrus.Infof("mutex:%p", &q.Lock)
heap.Push(q, task)
}
}
}(testQ)
}
go func() {
for {
if testQ.Len() == 0 {
time.Sleep(time.Millisecond * 50)
continue
}
item := heap.Pop(testQ).(*Item)
priority := int64(item.Priority)
logrus.Info("pop item priority,", GetTimeStr(time.UnixMilli(priority)))
}
}()
http.HandleFunc("/show", Show)
http.HandleFunc("/stop", Stop)
http.HandleFunc("/pop", Pop)
_ = http.ListenAndServe("0.0.0.0:6060", nil)
}
func Stop(w http.ResponseWriter, r *http.Request) {
x := testQ.Len()
run = !run
_, _ = io.WriteString(w, fmt.Sprintf("%d:stopped!\n", x))
}
func Pop(w http.ResponseWriter, r *http.Request) {
if testQ.Len() == 0 {
_, _ = io.WriteString(w, "noting to pop")
return
}
item := heap.Pop(testQ).(*Item)
priority := int64(item.Priority)
logrus.Info("pop item priority,", priority)
_, _ = io.WriteString(w, fmt.Sprintf("pop item priority:%d", priority))
}
func Show(w http.ResponseWriter, r *http.Request) {
x := testQ.Len()
logrus.WithField("pq", 1).Info("len:", x)
_, _ = io.WriteString(w, fmt.Sprintf("%d:show!\n", x))
}
var defLocal, _ = time.LoadLocation("Asia/Shanghai")
func GetTimeStr(t time.Time) string {
return t.In(defLocal).Format("2006-01-02 15:04:05.000")
}
Recently I take over the job of a colleague, sometimes the program hangs, the Pop goroutine doesn't pop anything, I checked the implementation of SafePriorityQueue, and found that he misused value receiver and pointer receiver, so I wrote this test program. Mostly the program panic, index out of range, I know why it panic, but sometimes it hangs, the pop goroutine sometimes hangs when call if testQ.Len() == 0 and other times when call item := heap.Pop(testQ).(*Item). What I’m confused is that who block it, since value receiver func always copy the struct, so the lock is a new lock, why it blocked forever when call testQ.Len() or heap.Pop(testQ).
