Created
July 2, 2023 22:13
-
-
Save juanpabloaj/8855566497175255a1fc6f4b6febed74 to your computer and use it in GitHub Desktop.
heartbeats example
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// package main example of the book Concurrency in Go by Katherine Cox-Buday | |
// it includes the patterns: orChannel, orDone, take, and bridge. | |
// newSteward monitors a goroutine and restarts it if it stops sending heartbeats | |
package main | |
import ( | |
"log" | |
"time" | |
) | |
func or(channels ...<-chan interface{}) <-chan interface{} { | |
switch len(channels) { | |
case 0: | |
return nil | |
case 1: | |
return channels[0] | |
} | |
orDone := make(chan interface{}) | |
go func() { | |
defer close(orDone) | |
switch len(channels) { | |
case 2: | |
select { | |
case <-channels[0]: | |
case <-channels[1]: | |
} | |
default: | |
select { | |
case <-channels[0]: | |
case <-channels[1]: | |
case <-channels[2]: | |
case <-or(append(channels[3:], orDone)...): | |
} | |
} | |
}() | |
return orDone | |
} | |
func orDone(done, c <-chan interface{}) <-chan interface{} { | |
valStream := make(chan interface{}) | |
go func() { | |
defer close(valStream) | |
for { | |
select { | |
case <-done: | |
return | |
case v, ok := <-c: | |
if !ok { | |
return | |
} | |
select { | |
case valStream <- v: | |
case <-done: | |
} | |
} | |
} | |
}() | |
return valStream | |
} | |
func bridge( | |
done <-chan interface{}, | |
chanStream <-chan <-chan interface{}, | |
) <-chan interface{} { | |
valStream := make(chan interface{}) | |
go func() { | |
defer close(valStream) | |
for { | |
var stream <-chan interface{} | |
select { | |
case maybeStream, ok := <-chanStream: | |
if !ok { | |
return | |
} | |
stream = maybeStream | |
case <-done: | |
return | |
} | |
for val := range orDone(done, stream) { | |
select { | |
case valStream <- val: | |
case <-done: | |
} | |
} | |
} | |
}() | |
return valStream | |
} | |
type startGoroutineFn func( | |
done <-chan interface{}, | |
pulseInterval time.Duration, | |
) (heartbeat <-chan interface{}) | |
func newSteward( | |
timeout time.Duration, | |
startGoroutine startGoroutineFn, | |
) startGoroutineFn { | |
return func( | |
done <-chan interface{}, | |
pulseInterval time.Duration, | |
) <-chan interface{} { | |
heartbeat := make(chan interface{}) | |
go func() { | |
defer close(heartbeat) | |
var wardDone chan interface{} | |
var wardHeartbeat <-chan interface{} | |
startWard := func() { | |
wardDone = make(chan interface{}) | |
wardHeartbeat = startGoroutine(or(wardDone, done), timeout/2) | |
} | |
startWard() | |
pulse := time.Tick(pulseInterval) | |
monitorLoop: | |
for { | |
timeoutSignal := time.After(timeout) | |
for { | |
select { | |
case <-pulse: | |
select { | |
case heartbeat <- struct{}{}: | |
default: | |
} | |
case <-wardHeartbeat: | |
continue monitorLoop | |
case <-timeoutSignal: | |
log.Println("steward: ward unhealthy, restarting") | |
close(wardDone) | |
startWard() | |
continue monitorLoop | |
case <-done: | |
return | |
} | |
} | |
} | |
}() | |
return heartbeat | |
} | |
} | |
func take( | |
done <-chan interface{}, | |
valueStream <-chan interface{}, | |
num int, | |
) <-chan interface{} { | |
takeStream := make(chan interface{}) | |
go func() { | |
defer close(takeStream) | |
for i := 0; i < num; i++ { | |
select { | |
case <-done: | |
return | |
case takeStream <- <-valueStream: | |
} | |
} | |
}() | |
return takeStream | |
} | |
func main() { | |
log.SetFlags(log.Ltime | log.Lmicroseconds) | |
doWorkFn := func( | |
done <-chan interface{}, | |
intList ...int, | |
) (startGoroutineFn, <-chan interface{}) { | |
intChanStream := make(chan (<-chan interface{})) | |
intStreamOut := bridge(done, intChanStream) | |
doWork := func( | |
done <-chan interface{}, | |
pulseInterval time.Duration, | |
) <-chan interface{} { | |
intStream := make(chan interface{}) | |
heartbeat := make(chan interface{}) | |
go func() { | |
defer close(intStream) | |
select { | |
case intChanStream <- intStream: | |
case <-done: | |
return | |
} | |
pulse := time.Tick(pulseInterval) | |
for { | |
valueLoop: | |
for _, intVal := range intList { | |
if intVal < 0 { | |
log.Printf("negative value: %v\n", intVal) | |
return | |
} | |
for { | |
select { | |
case <-pulse: | |
select { | |
case heartbeat <- struct{}{}: | |
default: | |
} | |
case intStream <- intVal: | |
continue valueLoop | |
case <-done: | |
return | |
} | |
} | |
} | |
} | |
}() | |
return heartbeat | |
} | |
return doWork, intStreamOut | |
} | |
done := make(chan interface{}) | |
defer close(done) | |
doWork, intStream := doWorkFn(done, 1, 2, -1, 3, 4, 5) | |
doWorkWithSteward := newSteward(1*time.Millisecond, doWork) | |
doWorkWithSteward(done, 1*time.Hour) | |
for intVal := range take(done, intStream, 6) { | |
log.Printf("received %v\n", intVal) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment