Skip to content

Instantly share code, notes, and snippets.

@echohes
Last active March 9, 2023 08:13
Show Gist options
  • Save echohes/8f6b1002f99f2305456cc1fa6b224438 to your computer and use it in GitHub Desktop.
Save echohes/8f6b1002f99f2305456cc1fa6b224438 to your computer and use it in GitHub Desktop.
golang errgroup example with context and subcontext for gracefull shutdown
package main
import (
"context"
"errors"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
"golang.org/x/sync/errgroup"
)
func tick(ctx context.Context, sec int) error {
//additional goroutine
go tick2(ctx, sec+10)
ticker := time.NewTicker(2 * time.Second)
i := 1
for {
select {
case <-ticker.C:
log.Println(fmt.Sprintf("ticker %d s ticked, %d\n", sec, i))
case <-ctx.Done():
//imitation of slow closing
time.Sleep(2 * time.Second)
log.Println(fmt.Sprintf("closing ticker local %d s goroutine\n", sec))
return ctx.Err()
}
i = i + 1
}
}
func tick2(ctx context.Context, sec int) error {
ticker := time.NewTicker(2 * time.Second)
i := 1
for {
select {
case <-ticker.C:
log.Println(fmt.Sprintf("ticker2 %d s ticked, %d\n", sec, i))
// test error
if sec == 2 && i == 2 {
fmt.Println("test error ticker2")
return fmt.Errorf("test error ticker")
}
case <-ctx.Done():
log.Println(fmt.Sprintf("closing ticker2 local %d s goroutine\n", sec))
return ctx.Err()
}
i = i + 1
}
}
func sigtrap(ctx context.Context) error {
sigChannel := make(chan os.Signal, 1)
signal.Notify(sigChannel, os.Interrupt, syscall.SIGTERM)
select {
case sig := <-sigChannel:
fmt.Printf("received signal: %s\n", sig)
return fmt.Errorf(fmt.Sprintf("os signal %s", sig))
case <-ctx.Done():
fmt.Printf("closing signal goroutine\n")
return ctx.Err()
}
}
func main() {
mainctx, done := context.WithCancel(context.Background())
g, gctx := errgroup.WithContext(mainctx)
// goroutine to check for signals to gracefully finish all functions
g.Go(func() error {
return sigtrap(gctx)
})
//run 3 goroutine group
for _, num := range []int{1, 2, 3} {
num := num
g2, gsubctx := errgroup.WithContext(gctx)
g2.Go(func() error {
return tick(gsubctx, num)
})
g2.Go(func() error {
return tick2(gsubctx, num)
})
g.Go(func() error {
if err := g2.Wait(); err == nil || errors.Is(err, context.Canceled) {
//stop with cancel
fmt.Println(fmt.Sprintf("stop clear group %d", num))
return gsubctx.Err()
} else {
fmt.Println(fmt.Sprintf("stop with err %v, group %d", err, num))
//need return 'is cancel' for main context, all group continue work
return gctx.Err()
}
})
}
//after timeout, shutdown all
time.AfterFunc(12*time.Second, func() {
fmt.Printf("stop after 12sec\n")
//send to main context, canceled all goroutine and subcontext goroutine
done()
})
if err := g.Wait(); err == nil || errors.Is(err, context.Canceled) {
fmt.Printf("finished clean: %v", err)
} else {
fmt.Printf("received error: %v", err)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment