How to determine which side of a Go channel is waiting?


How do I determine which side of a Go channel is waiting on the other side?

I'd like to know this so I can figure out where my processing is being limited and respond by allocating more resources.

Some options

The two methods I thought of both require keeping a moving average of the recorded values so the measurements aren't too noisy, but that's not a big problem.


  1. Use a timer to check % of time waiting in consumer

In the case of a single consumer, I can start a timer before consuming from the channel and stop it after I get a record. I can keep track of the percentage of time I spend waiting and respond accordingly during each fetch cycle (see the first sketch after this list).

  2. Sample length of buffered channel

If the channel length is regularly 0, that means we are consuming faster than we are sending. Similarly, if the buffer is regularly full, we're sending faster than we can receive. We can check the length of our channel over time to determine which side is running slow (see the second sketch after this list).
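
A minimal sketch of option 1, assuming a single consumer goroutine; `ch`, `process`, and the one-second reporting window are placeholders, and the snippet uses the `time` and `log` packages:

    var waited, elapsed time.Duration

    for {
        start := time.Now()
        record, ok := <-ch // blocks while the producers are the slow side
        waited += time.Since(start)
        if !ok {
            break
        }

        process(record) // stand-in for the real per-record work
        elapsed += time.Since(start)

        // Periodically report (or feed a moving average with) the
        // fraction of time spent blocked on the channel.
        if elapsed > time.Second {
            log.Printf("consumer waited %.0f%% of the time",
                100*float64(waited)/float64(elapsed))
            waited, elapsed = 0, 0
        }
    }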
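
And a minimal sketch of option 2, sampling the channel's length from a separate goroutine; the 100ms tick interval is arbitrary, and `len` and `cap` on a channel are safe to call concurrently:

    go func() {
        ticker := time.NewTicker(100 * time.Millisecond)
        defer ticker.Stop()
        for range ticker.C {
            n, c := len(ch), cap(ch)
            switch {
            case n == 0:
                // Empty buffer: the consumer is keeping up, so the
                // workers are likely the slow side.
            case n == c:
                // Full buffer: the workers are outrunning the consumer,
                // so adding workers won't help.
            }
            // In practice, feed n (or n/c) into a moving average rather
            // than reacting to a single noisy sample.
        }
    }()

Either measurement should feed the moving average mentioned above rather than trigger a change in resources directly.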


Is there a good reason to prefer one of these to the other, for performance reasons or otherwise? Is there a simpler solution to this problem?

Example

I have a service that performs N HTTP requests to fetch content, using up to W goroutines at a time, and sends all of that content on a channel to a processor running in a single goroutine, which in turn feeds data back to a client.

Each worker task will result in a large number of messages sent on the channel. Each worker's task can take several minutes to complete.

The following diagram summarizes the flow of data with 3 concurrent workers (W=3).

    [worker: task 1] -
                      \
    [worker: task 2] - | --- [ channel ] --- [ processor ] -> [ client ]
                      /
    [worker: task 3] -

I want to know whether I should run more workers (increase W) or fewer workers (decrease W) during a request. This can vary a lot per request, since clients work over connections of very different speeds.

1 Answer

One way to reach your goal is to use "bounded send" and "bounded receive" operations—if you're able to come up with reasonable polling timeouts.

When any one of your workers attempts to send a completed result over the channel, don't let it block "forever" (until there's space in the channel's buffer); instead, only allow it to block some maximum amount of time. If the timeout occurs before there was space in the channel's buffer, you can react to that condition: count how many times it occurs, adjust future deadlines, throttle or reduce the worker count, and so on.

Similarly, for your "processor" receiving results from the workers, you can limit the amount of time it blocks. If the timeout occurs before there was a value available, the processor is starved. Create more workers to feed it more quickly (assuming the workers will benefit from such parallelism).

The downside to this approach is the overhead in creating timers for each send or receive operation.

Sketching, with these declarations accessible to each of your workers:

    // Shared between the workers: a floor on concurrency and the
    // current worker count, maintained with sync/atomic.
    const minWorkers = 3
    var workers uint32

In each worker goroutine:

    atomic.AddUint32(&workers, 1)
    for {
        result, ok := produce()
        if !ok {
            break
        }
        // Detect when channel "p"'s buffer is full.
        select {
        case p <- result:
        case <-time.After(500 * time.Millisecond):
            // Hand over the pending result, no matter how long it takes.
            p <- result
            // Reduce worker count if above minimum.
            if current := atomic.LoadUint32(&workers); current > minWorkers &&
                atomic.CompareAndSwapUint32(&workers, current, current-1) {
                return
            }
            // Consider whether to try decrementing the working count again
            // if we're still above the minimum. It's possible another one
            // of the workers also exited voluntarily, changing the count.
        }
    }
    // Decrement the worker count; this is sync/atomic's documented idiom
    // for subtracting 1 from an unsigned counter (AddUint32 rejects -1).
    atomic.AddUint32(&workers, ^uint32(0))

Note that as written above, you could achieve the same effect by timing how long it takes for the send to channel p to complete, and reacting to it having taken too long, as opposed to doing one bounded send followed by a potential blocking send. However, I sketched it that way because I suspect that such code would mature to include logging and instrumentation counter bumps when the timeout expires.
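
For reference, that timing-only variant of the send might look like this (recordSlowSend is a hypothetical instrumentation hook, not something defined in this answer):

    start := time.Now()
    p <- result // plain blocking send
    if wait := time.Since(start); wait > 500*time.Millisecond {
        // The processor kept us waiting: bump a counter, log it, or retire
        // this worker just as in the bounded-send version above.
        recordSlowSend(wait) // hypothetical hook
    }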
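
On the earlier point about timer overhead: if it ever shows up in profiles, one mitigation (my addition, not part of the approach above) is to reuse a single time.Timer per worker instead of calling time.After on every send. The sketch below relies on the Go 1.23+ guarantee that Reset discards any tick that fired but was never received; on older Go versions the timer would also have to be stopped and drained before each Reset.

    timer := time.NewTimer(time.Hour) // arbitrary duration; reset before every use
    defer timer.Stop()

    for {
        result, ok := produce()
        if !ok {
            break
        }
        // Go 1.23+ guarantees Reset discards any undelivered tick, so no
        // stop-and-drain step is needed before rearming the timer.
        timer.Reset(500 * time.Millisecond)
        select {
        case p <- result:
        case <-timer.C:
            // Same fallback as before: hand the result over regardless.
            p <- result
        }
    }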

Similarly, in your processor goroutine, you could limit the amount of time you block receiving a value from the workers:

    for {
        select {
        case result := <-p:
            consume(result)
        case <-time.After(500 * time.Millisecond):
            maybeStartAnotherWorker()
        }
    }

Obviously, there are many knobs you can attach to this contraption. You wind up coupling the scheduling of the producers to both the consumer and the producers themselves. Introducing an opaque "listener" to which the producers and consumer "complain" about delays allows you to break this circular relationship and more easily vary the policy that governs how you react to congestion.
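
As a sketch of that last idea (my own elaboration; delayEvent and its fields are made-up names): workers and the processor only report how long they were blocked, and a single policy goroutine owns the decision about W.

    // Reported by any goroutine that blocked for "too long" on the channel.
    type delayEvent struct {
        role string        // "producer" or "consumer"
        wait time.Duration // how long the reporter was blocked
    }

    delays := make(chan delayEvent, 64)

    // Workers and the processor send events instead of adjusting W themselves,
    // e.g.: delays <- delayEvent{role: "producer", wait: wait}

    go func() {
        var producerStalls, consumerStalls int
        for ev := range delays {
            switch ev.role {
            case "producer":
                producerStalls++ // channel was full: consider lowering W
            case "consumer":
                consumerStalls++ // channel was starved: consider raising W
            }
            _ = ev.wait // could also feed a moving average of the delays
            // All scaling policy lives here, decoupled from the goroutines
            // that do the actual work.
        }
    }()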