Skip to content

Instantly share code, notes, and snippets.

@nomkhonwaan
Last active December 19, 2019 13:35
Show Gist options
  • Save nomkhonwaan/40dceee35bc2c4ba112dbb2496c21643 to your computer and use it in GitHub Desktop.
Save nomkhonwaan/40dceee35bc2c4ba112dbb2496c21643 to your computer and use it in GitHub Desktop.
ต่อ-การทำ-graceful-shutdown-กับ-worker-pool-ใน-go-5df5070c8883fdeaea7306cb
package main
import (
"context"
"fmt"
"math/rand"
"os"
"os/signal"
"sync"
"syscall"
"time"
)
func init() {
rand.Seed(time.Now().UnixNano())
}
// worker is a member of the Worker Pool which handles an asynchronous task
type worker struct {
id int
wg *sync.WaitGroup
jobs chan int
results chan int
}
// do handles all tasks
func (w *worker) do() {
for j := range w.jobs {
fmt.Println("worker", w.id, "started job", j)
time.Sleep(time.Second)
fmt.Println("worker", w.id, "finished job", j)
w.results <- j * 2
}
w.wg.Done()
}
func main() {
jobs := make(chan int, 100)
ingest := make(chan int)
results := make(chan int, 100)
wg := new(sync.WaitGroup)
stop := handleSignals()
ctx, cancel := context.WithCancel(context.Background())
go func(ctx context.Context, ingest <-chan int, jobs chan<- int) {
for {
select {
case v := <-ingest:
jobs <- v
case _ = <-ctx.Done():
fmt.Println("received shutdown signal, closing jobs!")
close(jobs)
fmt.Println("jobs closed")
return
}
}
}(ctx, ingest, jobs)
wg.Add(3)
for w := 1; w <= 3; w++ {
go (&worker{id: w, wg: wg, jobs: jobs, results: results}).do()
}
go func(ingest chan<- int) {
for j := 1; j <= 5; j++ {
time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
ingest <- j
}
}(ingest)
<-stop
cancel()
wg.Wait()
}
// handleSignals listens on SIGINT and SIGTERM if one of them has been notified,
// will close stop channel otherwise will exist immediately with non-zero
func handleSignals() <-chan struct{} {
stop := make(chan struct{})
sigs := make(chan os.Signal, 2)
signal.Notify(sigs, os.Interrupt, syscall.SIGTERM)
go func() {
<-sigs
close(stop)
<-sigs
os.Exit(1)
}()
return stop
}
package main
import (
"fmt"
"sync"
"time"
)
// worker is a member of the Worker Pool which handles an asynchronous task
type worker struct {
id int
wg *sync.WaitGroup
jobs chan int
results chan int
}
// do handles all tasks
func (w *worker) do() {
for j := range w.jobs {
fmt.Println("worker", w.id, "started job", j)
time.Sleep(time.Second)
fmt.Println("worker", w.id, "finished job", j)
w.results <- j * 2
}
w.wg.Done()
}
func main() {
jobs := make(chan int, 100)
results := make(chan int, 100)
wg := new(sync.WaitGroup)
wg.Add(3)
for w := 1; w <= 3; w++ {
go (&worker{id: w, wg: wg, jobs: jobs, results: results}).do()
}
for j := 1; j <= 5; j++ {
jobs <- j
}
close(jobs)
wg.Wait()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment