How to reuse a goroutine instead of creating a new goroutine for each async call


For this use case, a pool seems like it might be valid. This is dumbed-down code, but you should get the idea:

func Log(v interface{}) {
  // 1. get json - json.Marshal some large object v
  // 2. write the big buf to a file
}

...the above could be pretty blocking, so maybe we want to put it in a goroutine:

func Log(v interface{}) {
  go func() {
    // 1. get json - json.Marshal some large object v
    // 2. write the big buf to a file (not necessarily stdout)
  }()
}

But the problem with that is that we create a new goroutine for every call, which might defeat the purpose of offloading the work in the first place.

So my question is: is there a way to use a pool of goroutines instead? Even just 10 in a pool would be fabulous.

I assume channels are used to communicate with them, but I am not sure exactly how; a rough sketch of what I have in mind follows the stipulations below.

Two stipulations:

  1. only 1 job needs to be added to the queue at a time (but jobs are added asynchronously)
  2. if the queue is full, the job should just run on a new, separate goroutine instead of being queued
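
Something like the following minimal sketch is roughly what I have in mind (all names here are hypothetical, and I am not sure it is correct or idiomatic): a fixed set of workers ranges over one shared buffered channel, and a select with a default case spills over to a fresh goroutine when the buffer is full, per stipulation 2.

package main

import (
    "fmt"
    "sync"
)

type LogPool struct {
    jobs chan func()
}

// NewLogPool starts size long-lived workers that drain a shared queue.
func NewLogPool(size int) *LogPool {
    p := &LogPool{jobs: make(chan func(), size)}
    for i := 0; i < size; i++ {
        go func() {
            for job := range p.jobs {
                job()
            }
        }()
    }
    return p
}

// Run queues the job if there is room, otherwise runs it on a new goroutine.
func (p *LogPool) Run(job func()) {
    select {
    case p.jobs <- job: // a pooled worker will pick it up
    default:
        go job() // queue is full, so spill over (stipulation 2)
    }
}

func main() {
    p := NewLogPool(10)
    var wg sync.WaitGroup
    wg.Add(1)
    p.Run(func() {
        defer wg.Done()
        fmt.Println("marshal v and write the buffer to a file")
    })
    wg.Wait()
}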

There is 1 solution below.

Alexander Mills:

This code basically works. If we were only writing to stdout, the pool would be unnecessary, but since we are writing to multiple files, the work can benefit from running on pooled goroutines.

package pool

import (
    "sync"
)

// ChanMessage carries a task plus the WaitGroup that the worker
// waits on after running the task.
type ChanMessage struct {
    f  func(*sync.WaitGroup)
    wg *sync.WaitGroup
}

// Worker owns one long-lived goroutine and a buffered channel of
// pending messages. isBusy is a hint that Run uses to prefer idle workers.
type Worker struct {
    c      chan *ChanMessage
    mtx    sync.Mutex
    isBusy bool
}

// Pool is a fixed-size set of workers. Count tracks tasks currently
// dispatched to the pool and is guarded by mtx.
type Pool struct {
    mtx               *sync.Mutex
    Size              int
    workers           []*Worker
    Count             int
    RoundRobinCounter int
}

func (p *Pool) createWorkers() {

    for i := 0; i < p.Size; i++ {

        var w = &Worker{
            c:      make(chan *ChanMessage, 1),
            mtx:    sync.Mutex{},
            isBusy: false,
        }

        go func(w *Worker) {
            // ranging over the channel lets a future close(w.c) stop the worker
            for m := range w.c {
                w.mtx.Lock()
                w.isBusy = true
                w.mtx.Unlock()
                m.f(m.wg)
                // the task must call Done for the Add(1) made in Run; Wait
                // also covers any extra work the task added to the group
                m.wg.Wait()
                p.mtx.Lock()
                p.Count-- // incremented in Run, under the same lock
                p.mtx.Unlock()
                w.mtx.Lock()
                w.isBusy = false
                w.mtx.Unlock()
            }
        }(w)

        p.workers = append(p.workers, w)
    }
}

func CreatePool(size int) *Pool {

    var p = &Pool{
        mtx:               &sync.Mutex{},
        Size:              size,
        Count:             0,
        RoundRobinCounter: size + 1,
    }

    p.createWorkers()

    return p
}

func (p *Pool) Run(z func(*sync.WaitGroup)) {

    var wg = &sync.WaitGroup{}
    wg.Add(1)

    p.mtx.Lock()

    if p.Count >= p.Size {
        p.mtx.Unlock()
        // every worker is occupied, so spill over to a fresh goroutine
        // instead of queueing (stipulation 2 from the question)
        go z(wg)
        return
    }

    p.Count++

    var m = &ChanMessage{
        f:  z,
        wg: wg,
    }

    // prefer an idle worker; isBusy is read and written under the worker's lock
    for _, w := range p.workers {
        w.mtx.Lock()
        if !w.isBusy {
            w.isBusy = true
            w.mtx.Unlock()
            p.mtx.Unlock()
            w.c <- m
            return
        }
        w.mtx.Unlock()
    }

    // couldn't find a non-busy one, so just round robin to the next worker;
    // the buffered channel holds the message until that worker frees up
    p.RoundRobinCounter = (p.RoundRobinCounter + 1) % p.Size
    var w = p.workers[p.RoundRobinCounter]

    p.mtx.Unlock()

    w.c <- m

}
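
To connect this back to the question, the Log function could dispatch through the pool roughly like this (a sketch, not part of the package above; logFile and fileMtx are hypothetical stand-ins for the real destination):

package logger

import (
    "encoding/json"
    "os"
    "sync"

    "github.com/oresoftware/json-logging/jlog/pool"
)

var (
    p       = pool.CreatePool(10)
    fileMtx sync.Mutex  // serializes writes to the shared destination
    logFile = os.Stdout // hypothetical; a real *os.File in practice
)

func Log(v interface{}) {
    p.Run(func(g *sync.WaitGroup) {
        defer g.Done() // matches the wg.Add(1) performed inside Run
        buf, err := json.Marshal(v)
        if err != nil {
            return // a real implementation would report the error
        }
        fileMtx.Lock()
        defer fileMtx.Unlock()
        logFile.Write(buf)
        logFile.Write([]byte("\n"))
    })
}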

Here are some tests:

package pool_test

import (
    "github.com/oresoftware/json-logging/jlog/pool"
    "sync"
    "testing"
    "time"
)

// Test if the worker pool can handle a simple task
func TestWorkerPoolSimpleTask(t *testing.T) {
    poolSize := 3
    p := pool.CreatePool(poolSize)

    var wg sync.WaitGroup
    wg.Add(1)

    p.Run(func(g *sync.WaitGroup) {
        // Perform a simple task
        t.Logf("Simple task executed")
        g.Done()
        wg.Done()
    })

    wg.Wait()

    // Count is decremented after the task returns, so give the worker a moment
    time.Sleep(10 * time.Millisecond)

    // Ensure the pool count is back at zero
    if p.Count != 0 {
        t.Errorf("Expected pool count to be 0, got %d", p.Count)
    }
}

// Test if the worker pool can handle multiple concurrent tasks
func TestWorkerPoolConcurrentTasks(t *testing.T) {
    poolSize := 5
    p := pool.CreatePool(poolSize)

    numTasks := 10
    var wg sync.WaitGroup
    wg.Add(numTasks)

    for i := 0; i < numTasks; i++ {
        go func(index int) {
            p.Run(func(g *sync.WaitGroup) {
                // Perform a task and log the index
                t.Logf("Task %d executed", index)
                g.Done()
                wg.Done()
            })
        }(i)
    }

    wg.Wait()

    // Count is decremented after each task returns, so give the workers a moment
    time.Sleep(10 * time.Millisecond)

    // Ensure the pool count is back at zero
    if p.Count != 0 {
        t.Errorf("Expected pool count to be 0, got %d", p.Count)
    }
}

// Test if the worker pool can handle more tasks than the pool size
func TestWorkerPoolFullQueue(t *testing.T) {
    poolSize := 3
    p := pool.CreatePool(poolSize)

    numTasks := 5
    var wg sync.WaitGroup
    wg.Add(numTasks)

    for i := 0; i < numTasks; i++ {
        go func(index int) {
            p.Run(func(g *sync.WaitGroup) {
                // Perform a task and log the index
                t.Logf("Task %d executed", index)
                time.Sleep(50 * time.Millisecond) // Simulate task execution time
                g.Done()
                wg.Done()
            })
        }(i)
    }

    wg.Wait()

    // Count is decremented after each task returns, so give the workers a moment
    time.Sleep(10 * time.Millisecond)

    // Ensure the pool count is back at zero
    if p.Count != 0 {
        t.Errorf("Expected pool count to be 0, got %d", p.Count)
    }
}

// Test if the worker pool handles a large number of tasks without deadlocks or panics
func TestWorkerPoolStressTest(t *testing.T) {
    poolSize := 10
    p := pool.CreatePool(poolSize)

    numTasks := 1000
    var wg sync.WaitGroup
    wg.Add(numTasks)

    for i := 0; i < numTasks; i++ {
        go func(index int) {
            p.Run(func(g *sync.WaitGroup) {
                // Perform a task and log the index
                t.Logf("Task %d executed", index)
                g.Done()
                wg.Done()
            })
        }(i)
    }

    wg.Wait()

    // Count is decremented after each task returns, so give the workers a moment
    time.Sleep(10 * time.Millisecond)

    // Ensure the pool count is back at zero
    if p.Count != 0 {
        t.Errorf("Expected pool count to be 0, got %d", p.Count)
    }
}
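
Since the pool guards its shared state with mutexes, these tests are worth running with go test -race to confirm the implementation is free of data races.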

And here is the real-world use case:

func (l *Logger) writeJSON(level shared.LogLevel, mf *MetaFields, args *[]interface{}) {

    date := time.Now().UTC().String()
    date = date[:26]
    var strLevel = shared.LevelToString[level]
    var pid = shared.PID

    if mf == nil {
        mf = NewMetaFields(&MF{})
    }

    shared.StdioPool.Run(func(g *sync.WaitGroup) {

        // TODO: maybe manually generating JSON is better? prob not worth it
        buf, err := json.Marshal([8]interface{}{"@bunion:v1", l.AppName, strLevel, pid, l.HostName, date, mf.m, args})

        if err != nil {

            _, file, line, _ := runtime.Caller(3)
            DefaultLogger.Warn("could not marshal the slice:", err.Error(), "file://"+file+":"+strconv.Itoa(line))


            var cache = map[*interface{}]*interface{}{}
            var cleaned = make([]interface{}, 0)

            for i := 0; i < len(*args); i++ {
                // TODO: for now, instead of CleanUp we could just use fmt.Sprintf()
                v := &(*args)[i]
                c := hlpr.CleanUp(v, &cache)
                cleaned = append(cleaned, c)
            }

            buf, err = json.Marshal([8]interface{}{"@bunion:v1", l.AppName, strLevel, pid, l.HostName, date, mf.m, cleaned})

            if err != nil {
                fmt.Println(errors.New("Json-Logging: could not marshal the slice: " + err.Error()))
                g.Done()
                return
            }
        }

        shared.M1.Lock()
        safeStdout.Write(buf)
        safeStdout.Write([]byte("\n"))
        shared.M1.Unlock()
        g.Done()

    })

}

In my case I don't need the WaitGroup, since the work inside the task is all synchronous, but having the wg makes the pool more generic.
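
For example (a hypothetical task, assuming the pool p from above), a task can fan out internally with extra Add/Done pairs, and the worker's m.wg.Wait() keeps the slot occupied until everything finishes:

p.Run(func(g *sync.WaitGroup) {
    g.Add(1) // one extra unit for the async write below
    go func() {
        defer g.Done()
        // ... write the buffer to a second file ...
    }()
    // ... write the buffer to the first file ...
    g.Done() // matches the wg.Add(1) performed inside Run
})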