Skip to content

Instantly share code, notes, and snippets.

@thinktainer
Last active March 7, 2018 13:08
Show Gist options
  • Select an option

  • Save thinktainer/c608c4c9c81a4755205dece37bcac4b3 to your computer and use it in GitHub Desktop.

Select an option

Save thinktainer/c608c4c9c81a4755205dece37bcac4b3 to your computer and use it in GitHub Desktop.
parallel processing loop (generic / interface{} based)
package s3repo
import (
"context"
"log"
"sync"
"github.com/pkg/errors"
)
func processItemsParallel(c context.Context, numGoRoutines int, items []interface{}, getItem func(context.Context, interface{}) (interface{}, error), handleItem func(context.Context, interface{}) error) error {
ctx, cancel := context.WithCancel(c)
errChan := make(chan error, 1)
resChan := make(chan interface{})
done := make(chan struct{})
tokenBucket := make(chan struct{}, numGoRoutines)
go func() {
for r := range resChan {
if err := handleItem(ctx, r); err != nil {
errChan <- errors.Wrap(err, "handling item")
break
}
}
done <- struct{}{}
}()
go func() {
wg := &sync.WaitGroup{}
LOOP:
for _, item := range items {
i := item
select {
case <-ctx.Done():
break LOOP
case tokenBucket <- struct{}{}:
wg.Add(1)
go func() {
defer wg.Done()
result, err := getItem(ctx, i)
if err != nil {
select {
case errChan <- errors.Wrap(err, "fetching item"):
default:
}
<-tokenBucket
return
}
resChan <- result
<-tokenBucket
}()
}
}
wg.Wait()
close(resChan)
}()
var err error
select {
case <-done:
case e := <-errChan:
log.Printf("error: %v", e)
err = e
}
cancel()
return err
}
@thinktainer
Copy link
Copy Markdown
Author

parallel processing with error handling (fail fast)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment