@libesz
Last active January 20, 2018 11:27
Go program to demonstrate the basic pattern of a simple worker pool
/* Dummy single-file Go program to demonstrate the basic pattern
of a simple worker pool with a context, a waitgroup and a channel.
I crafted this when I wanted to (once again) understand the Go concurrency
patterns. Now I find it clean enough, so I added verbose comments
so that anyone can understand it or complain :).
The following language properties can be inspected:
- Channels may have multiple senders and/or receivers
  - Here we have multiple receivers (the workers in a pool) and a single
    sender (who is producing the work).
- Channels can be used to create cancellable, but otherwise blocking
  and event-driven data consumers (select) -> no polling or busy loop.
- Contexts are inheritable and safe for concurrent use
  - Here we use them to carry the properties shared by all workers as well
    as an individual ID for each.
- Deferred actions are performed in the opposite order of their placement.
Usage: go run workerpool.go
*/
package main

import (
	"context"
	"log"
	"math/rand"
	"sync"
	"time"
)
// Data is attached to a context through the context package's WithValue()
// function, which has a generic key-value interface, so we craft a custom
// datatype for the keys.
type ctxKey int

// Simple key choices so that the getter and the setter sides agree on
// which key belongs to which value.
const chanKey = ctxKey(0)
const wgKey = ctxKey(1)
const idKey = ctxKey(2)
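
// A minimal accessor sketch (not part of the original demo and not used by
// it): wrapping the type assertion in a small helper keeps the ctx.Value()
// casting noise in one place if the pattern grows beyond this example.
func workChanFrom(ctx context.Context) (chan int, bool) {
	ch, ok := ctx.Value(chanKey).(chan int)
	return ch, ok
}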
func main() {
	// Greetings... and immediately say good-bye.
	log.Println("Main starting")
	// Since this is the first deferred function call, this will be executed
	// as the very last one.
	defer log.Println("Main exiting")
	// Here we create the root context. We start with Background(), which
	// is totally empty: it has no properties or functionality. Then we wrap
	// it immediately in another one, which gives it a cancel capability.
	// WithCancel() returns a cancel function which can be called later on.
	workerCtx, workerCancelFunc := context.WithCancel(context.Background())
	// We create the channel with which we can push tasks to the workers.
	// It could be of an arbitrary datatype; here it is integer. It is not
	// buffered, which means whenever the sender tries to send something,
	// it will block until somebody is able to consume it in parallel.
	workerInputChan := make(chan int)
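	// Note that a buffered channel, e.g. make(chan int, 16), would let the
	// producer run ahead of the workers; the unbuffered form keeps every
	// hand-off synchronous, which the shutdown reasoning further below
	// relies on.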
	// Add it to the context as a value, using the global key identifier, so
	// anyone having this context can extract it.
	workerCtx = context.WithValue(workerCtx, chanKey, workerInputChan)
	// Create a so-called wait group. It helps to synchronize parallel things.
	// Here we use it to wait for the workers to finish when main() is already
	// about to exit. It is thread-safe, so multiple concurrent go-routines may
	// contribute to the counter which it holds.
	var wg sync.WaitGroup
	// Add it to the context as well, with its own key.
	workerCtx = context.WithValue(workerCtx, wgKey, &wg)
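	// Note that we store a pointer (&wg) on purpose: a sync.WaitGroup must not
	// be copied after first use, and the pointer makes every worker share the
	// same counter.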
	// Let's defer a wait call at the end of main(). This will block until
	// the wait group counter is 0. Note that we have a single wait statement
	// for all the workers.
	defer wg.Wait()
	// Loop to start up multiple workers with roughly the same context.
	for id := 0; id < 10; id++ {
		// Increment the waitgroup counter before firing a new worker.
		// The worker is responsible for decrementing it when finished.
		wg.Add(1)
		// Here we add the last value to the workers' context. It is an
		// identifier, which, as the name suggests, shall be unique for each.
		// We assign the loop counter, which will override the context data
		// in every iteration. So every worker will share the channel and the
		// WaitGroup, but will have a unique ID.
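		// Note that WithValue() does not mutate the context: it wraps it in a
		// new layer, and value lookups walk the layers from the newest
		// outwards. Each worker receives the context value as it was at the
		// moment it was started, so it sees its own ID.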
		workerCtx = context.WithValue(workerCtx, idKey, id)
		// Start a worker in a go-routine. The only parameter is the prepared
		// context.
		go worker(workerCtx)
	}
	// Let's fire the cancellation of the workers. Since this is the last defer
	// call, this will be executed first, after the written code of main()
	// concludes.
	defer workerCancelFunc()
	// Last but not least, push some work to the workers by sending data of
	// the required type (integer) into the channel.
	for i := 0; i < 1000; i++ {
		workerInputChan <- i
	}
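	// Note that each send above blocks until some worker has received the
	// item (the channel is unbuffered), so once this loop finishes, every
	// item has already been handed off to a worker.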
	// So here, as we seemingly return from main(), the following deferred
	// function calls will be issued, in this order (which is the opposite of
	// the order in the code above):
	// - workerCancelFunc()
	// - wg.Wait()
	// - log.Println("Main exiting")
}
// The function which represents a single worker.
func worker(ctx context.Context) {
	// Extract the defined context data and cast them to the valid types.
	in, _ := ctx.Value(chanKey).(chan int)
	wg, _ := ctx.Value(wgKey).(*sync.WaitGroup)
	id, _ := ctx.Value(idKey).(int)
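	// Note that the comma-ok form of the type assertion silently yields the
	// zero value (a nil channel, a nil pointer, 0) if a key is missing or
	// holds a different type; a more defensive version would check the ok
	// results and fail loudly instead.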
	// Make sure that we decrement the waitgroup counter when the actual worker
	// exits.
	defer wg.Done()
	// Create a loop. Yes, it seems infinite, but it will block in every
	// iteration until either work arrives in the channel or the context is
	// cancelled by the workerCancelFunc up there...
	for {
		select {
		case workItem := <-in:
			// We successfully got an item from the work channel. Since all the
			// workers are trying to consume from this channel, the load is
			// shared between the workers.
			log.Println("ID:", id, "Working on item:", workItem)
			// Simulate random working time, 0-99 msec. time.After creates a
			// channel on which it will send the current time after the given
			// duration.
			timer := time.After(time.Millisecond * time.Duration(rand.Int()%100))
			// We don't use the data from the timer channel, so we simply block
			// here until it fires.
			<-timer
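			// Note that time.Sleep() would achieve the same pause more
			// directly; time.After() is used here to show the channel-based
			// form of waiting, which also composes with select when needed.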
		case <-ctx.Done():
			// This channel is closed when the worker is asked to exit.
			// We might ask how it is ensured that all the work is done before
			// this case is selected. It is not the select statement itself:
			// when several cases are ready, select picks one of them at
			// random. The guarantee comes from main(): the work channel is
			// unbuffered, so every send there completes only once a worker
			// has received the item, and the deferred workerCancelFunc() runs
			// only after the whole send loop has finished. By the time Done()
			// is closed, there is no more work in flight.
			log.Println("ID", id, "Worker cancelled.")
			return
		}
	}
}
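
// A variant sketch (not part of the original demo and not called by it):
// if main() closed workerInputChan after the send loop instead of cancelling
// the context, a worker could simply range over the channel and exit once
// the channel is drained and closed, with no select needed.
func rangeWorker(ctx context.Context) {
	in, _ := ctx.Value(chanKey).(chan int)
	wg, _ := ctx.Value(wgKey).(*sync.WaitGroup)
	id, _ := ctx.Value(idKey).(int)
	defer wg.Done()
	// The range loop ends when the channel has been closed and emptied.
	for workItem := range in {
		log.Println("ID:", id, "Working on item:", workItem)
	}
	log.Println("ID", id, "Worker finished (channel closed).")
}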