Last active
February 1, 2016 00:53
-
-
Save rcarver/59633d83fb9ef4634111 to your computer and use it in GitHub Desktop.
Example pipeline program in go
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Package main runs a program that counts input lines. It's an example of a | |
// program that cleanly handles input and output to pipes, and cancellation. | |
// | |
// Examples | |
// | |
// # Compile | |
// | |
// > go build -o go-pipeline . | |
// | |
// # Success | |
// | |
// > echo 'abc\ndefg\nhij\na\nb\nc' | ./go-pipeline -name a -sleep 0 | ./go-pipeline -name b -sleep 2s | |
// a Counted 6 | |
// a Exiting | |
// b:1 a:1 abc | |
// b:2 a:2 defg | |
// b:3 a:3 hij | |
// b:4 a:4 a | |
// b:5 a:5 b | |
// b:6 a:6 c | |
// b Counted 6 | |
// b Exiting | |
// | |
// # Cancel the second process with CTRL-C | |
// | |
// > echo 'abc\ndefg\nhij\na\nb\nc' | ./go-pipeline -name a -sleep 0 | ./go-pipeline -name b -sleep 1s
// a Counted 6 | |
// a Exiting | |
// b:1 a:1 abc | |
// b:2 a:2 defg | |
// ^Cb Cancelling: interrupt | |
// b Counted 2 | |
// | |
// # Cancel both processes with CTRL-C | |
// | |
// > echo 'abc\ndefg\nhij\na\nb\nc' | ./go-pipeline -name a -sleep 1s | ./go-pipeline -name b -sleep 1s
// b:1 a:1 abc | |
// b:2 a:2 defg | |
// ^Cb Cancelling: interrupt | |
// a Cancelling: interrupt | |
// a Counted 2 | |
// b Counted 2 | |
// | |
package main | |
import ( | |
"bufio" | |
"flag" | |
"log" | |
"os" | |
"os/signal" | |
"sync" | |
"time" | |
"golang.org/x/net/context" | |
) | |
func main() { | |
// Parse inputs | |
var ( | |
name string | |
sleep time.Duration | |
) | |
flag.StringVar(&name, "name", "a", "name, used in output") | |
flag.DurationVar(&sleep, "sleep", 0, "sleep between lines") | |
flag.Parse() | |
// Initialize output streams. | |
stdout := log.New(os.Stdout, "", 0) | |
stderr := log.New(os.Stderr, "", 0) | |
// Verify input stream. | |
if !isPipe(os.Stdin) { | |
stderr.Printf("No input") | |
os.Exit(1) | |
} | |
// Catch signals for cancellation. | |
sigs := make(chan os.Signal) | |
signal.Notify(sigs, | |
os.Interrupt, // CTRL-C | |
) | |
// Context drives control flow for cancellation. | |
ctx, cancel := context.WithCancel(context.Background()) | |
defer cancel() | |
// Handle signals. | |
go func() { | |
// Handle signal. | |
sig := <-sigs | |
stderr.Printf("%s Cancelling: %s", name, sig) | |
cancel() | |
}() | |
// Handle shutdown. | |
var shutdown sync.WaitGroup | |
shutdown.Add(1) | |
go func() { | |
// Coordinate other shutdown activity. | |
<-ctx.Done() | |
stderr.Printf("%s Exiting", name) | |
shutdown.Done() | |
}() | |
// Main program. | |
var cnt int | |
scanner := bufio.NewScanner(os.Stdin) | |
for scanner.Scan() { | |
select { | |
case <-ctx.Done(): // Exit loop if canceled. | |
break | |
default: | |
cnt++ | |
text := scanner.Text() | |
stdout.Printf("%s:%d %s\n", name, cnt, text) | |
select { | |
case <-time.After(sleep): | |
case <-ctx.Done(): // Abort sleep if canceled. | |
} | |
} | |
} | |
stderr.Printf("%s Counted %d", name, cnt) | |
// Exit. | |
cancel() // Ensure shutdown is triggered. | |
shutdown.Wait() // Ensure shutdown is complete. | |
os.Exit(0) | |
} | |
// isPipe reports whether f is usable as piped input, which here means f can
// be stat'ed and is not a character device (such as an interactive terminal).
// Pipes and redirected regular files therefore both yield true.
//
// It returns false when f.Stat fails (for example on a nil or closed file)
// instead of dereferencing the missing FileInfo.
func isPipe(f *os.File) bool {
	info, err := f.Stat()
	if err != nil {
		return false
	}
	return info.Mode()&os.ModeCharDevice == 0
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Both targets name commands rather than files, so never treat them as up to date.
.PHONY: run build

# run builds the binary, then pipes five lines through two chained instances:
# stage "a" pauses 1s per line and stage "b" pauses 2s per line.
# printf is used instead of echo because echo's handling of \n differs
# between shells.
run: build
	printf 'a\nb\nc\nd\ne\n' | ./go-pipeline -name a -sleep 1s | ./go-pipeline -name b -sleep 2s

# build compiles the package in the current directory into ./go-pipeline.
build:
	go build -o go-pipeline .
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment