Idiomatic variable-size worker pool in Go


I'm trying to implement a pool of workers in Go. The go-wiki (and Effective Go, in the Channels section) features excellent examples of bounding resource use: make a channel with a buffer as large as the worker pool, fill that channel with workers, and send each worker back into the channel when it's done. Receiving from the channel blocks until a worker is available, so the channel and a loop are the entire implementation. Very cool!

Alternatively, one could start with an empty channel and block on sending into it, but it's the same idea.
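For reference, a minimal sketch of the pattern described above, using the block-on-send variant. The names runBounded, limit and peak are made up for this illustration and do not come from the wiki or Effective Go; the function also tracks the highest concurrency it observed so the bound can be checked.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runBounded (hypothetical name) starts n jobs but lets at most limit of
// them run at the same time. It returns the peak concurrency it observed.
func runBounded(n, limit int) int32 {
	sem := make(chan struct{}, limit) // the buffer size is the pool size
	var wg sync.WaitGroup
	var running, peak int32

	for i := 0; i < n; i++ {
		sem <- struct{}{} // blocks while limit jobs are already in flight
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() { <-sem }() // free the slot for the next job

			cur := atomic.AddInt32(&running, 1)
			for { // remember the highest value of running seen so far
				p := atomic.LoadInt32(&peak)
				if cur <= p || atomic.CompareAndSwapInt32(&peak, p, cur) {
					break
				}
			}
			// The real work would happen here.
			atomic.AddInt32(&running, -1)
		}()
	}
	wg.Wait()
	return peak
}

func main() {
	fmt.Println("peak concurrency within limit:", runBounded(20, 3) <= 3)
}
```

The capacity passed to make is fixed for the lifetime of the channel, which is exactly why resizing the pool needs something more than this.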

My question is about changing the size of the worker pool while it's running. I don't believe there's a way to change the size of a channel. I have some ideas, but most of them seem way too complicated. This page actually implements a semaphore using a channel and empty structs in much the same way, but it has the same problem (these things come up all the time while Googling for "golang semaphore").


There are 2 best solutions below

Best answer

I would do it the other way round. Instead of spawning many goroutines (which still require a considerable amount of memory) and using a channel to block them, I would model the workers as goroutines and use a channel to distribute the work. Something like this:

package main

import (
    "fmt"
    "sync"
)

type Task string

func worker(tasks <-chan Task, quit <-chan bool, wg *sync.WaitGroup) {
    defer wg.Done()
    for {
        select {
        case task, ok := <-tasks:
            if !ok {
                return
            }
            fmt.Println("processing task", task)
        case <-quit:
            return
        }
    }
}

func main() {
    tasks := make(chan Task, 128)
    quit := make(chan bool)
    var wg sync.WaitGroup

    // spawn 5 workers
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go worker(tasks, quit, &wg)
    }

    // distribute some tasks
    tasks <- Task("foo")
    tasks <- Task("bar")

    // remove two workers
    quit <- true
    quit <- true

    // add three more workers
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go worker(tasks, quit, &wg)
    }

    // distribute more tasks
    for i := 0; i < 20; i++ {
        tasks <- Task(fmt.Sprintf("additional_%d", i+1))
    }

    // end of tasks. the workers should quit afterwards
    close(tasks)
    // use "close(quit)", if you do not want to wait for the remaining tasks

    // wait for all workers to shut down properly
    wg.Wait()
}

It might be a good idea to create a separate WorkerPool type with some convenience methods. Also, instead of type Task string, it is quite common to use a struct that also contains a done channel, which is used to signal that the task has been executed successfully.
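A minimal sketch of the task-with-done-channel idea, under a few assumptions: the field names Name and Done, the constructor NewTask and the helper process are all invented for this illustration, and Done carries an error (nil on success) rather than a bare signal.

```go
package main

import (
	"errors"
	"fmt"
)

// Task is a hypothetical replacement for "type Task string": the payload
// plus a channel on which the worker reports the outcome.
type Task struct {
	Name string
	Done chan error
}

// NewTask gives Done a buffer of one so the worker never blocks on a
// caller that does not wait for the result.
func NewTask(name string) Task {
	return Task{Name: name, Done: make(chan error, 1)}
}

// process is a stand-in for the real work (assumption for this sketch).
func process(name string) error {
	if name == "" {
		return errors.New("empty task name")
	}
	fmt.Println("processing task", name)
	return nil
}

// worker handles tasks until the channel is closed and reports each result.
func worker(tasks <-chan Task) {
	for task := range tasks {
		task.Done <- process(task.Name)
	}
}

func main() {
	tasks := make(chan Task)
	go worker(tasks)

	foo := NewTask("foo")
	tasks <- foo
	if err := <-foo.Done; err != nil { // blocks until the worker is finished
		fmt.Println("task failed:", err)
	} else {
		fmt.Println("task", foo.Name, "finished")
	}
	close(tasks)
}
```

A plain chan struct{} that the worker closes would do just as well if the caller only needs to know that the task finished.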

Edit: I've played around a bit more and come up with the following: http://play.golang.org/p/VlEirPRk8V. It's basically the same example, with a nicer API.

Second answer

A simple change I can think of is to have a channel that controls how big the semaphore is. The relevant part is the select statement. If there is more work in the queue, process it with the current semaphore. If there is a request to change the size of the semaphore, change it and continue processing the request queue with the new semaphore. Note that the old one is going to be garbage collected.

package main

import (
    "fmt"
    "time"
)

type Request struct{ num int }

var quit = make(chan struct{})

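func Serve(queue chan *Request, resize chan int, semsize int) {
    // Create the semaphore once, outside the loop. Re-creating it on every
    // iteration would give each request a fresh, empty semaphore and bound nothing.
    sem := make(chan struct{}, semsize)
    for {
        select {
        case semsize = <-resize:
            // Swap in a semaphore of the new size. Handlers that are still
            // running release the old one, which is then garbage collected.
            sem = make(chan struct{}, semsize)
            fmt.Println("changing semaphore size to", semsize)
        case req := <-queue:
            sem <- struct{}{}   // Block until there's capacity to process a request.
            go handle(req, sem) // Don't wait for handle to finish.
        case <-quit:
            return
        }
    }
}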

func process(r *Request) {
    fmt.Println("Handled Request", r.num)
}

func handle(r *Request, sem chan struct{}) {
    process(r) // May take a long time & use a lot of memory or CPU
    <-sem      // Done; enable next request to run.
}

func main() {
    workq := make(chan *Request, 1)
    ctrlq := make(chan int)
    // producer: one request every 100ms, then ask Serve to quit
    go func() {
        for i := 0; i < 20; i++ {
            time.Sleep(100 * time.Millisecond)
            workq <- &Request{i}
        }
        time.Sleep(500 * time.Millisecond)
        quit <- struct{}{}
    }()
    // controller: after 500ms, grow the semaphore from 1 to 10
    go func() {
        time.Sleep(500 * time.Millisecond)
        ctrlq <- 10
    }()
    Serve(workq, ctrlq, 1)
}

http://play.golang.org/p/AHOLlAv2LH