Skip to content

Instantly share code, notes, and snippets.

@db7
Created June 20, 2019 10:44
Show Gist options
  • Select an option

  • Save db7/38791842583c0eabde523191521a64fc to your computer and use it in GitHub Desktop.

Select an option

Save db7/38791842583c0eabde523191521a64fc to your computer and use it in GitHub Desktop.
Use view to initialize and update a data structure.
package main
import (
"context"
"log"
"sync"
"time"
"github.com/lovoo/goka"
"github.com/lovoo/goka/codec"
"github.com/lovoo/goka/storage"
"github.com/lovoo/goka/tester"
)
const (
tableTopic = "match-table"
table = goka.Table(tableTopic)
inputTopic = "input-stream"
inputStream = goka.Stream(inputTopic)
group = goka.Group("my-group")
)
var (
brokers = []string{"localhost:9092"}
m sync.Mutex
keys = map[string]bool{}
)
func main() {
var (
ctx = context.Background()
tester = tester.New(new(nullt))
)
view, _ := goka.NewView(brokers, table, new(codec.String),
goka.WithViewCallback(
func(s storage.Storage, partition int32, key string, value []byte) error {
// update data structure
addKey(key)
// call the default update to store in the local storage
return goka.DefaultUpdate(s, partition, key, value)
}),
goka.WithViewTester(tester),
)
// iterate over view and intialize data structure before consuming new events
initialize(view)
// start consuming new events
go view.Run(ctx)
// wait until view is recovered
for !view.Recovered() {
time.Sleep(1 * time.Second)
}
// now start processor
proc, _ := goka.NewProcessor(brokers,
goka.DefineGroup(
group,
goka.Input(inputStream, new(codec.String), func(ctx goka.Context, m interface{}) {
// do match check
if match(ctx.Key()) {
log.Println("match:", ctx.Key(), m)
} else {
log.Println("no match:", ctx.Key(), m)
}
}),
),
goka.WithTester(tester),
)
go proc.Run(ctx)
// some events
tester.Consume(tableTopic, "key1", "")
tester.Consume(tableTopic, "key2", "")
tester.Consume(inputTopic, "key1", "should match event value")
tester.Consume(inputTopic, "key3", "should not match event value")
}
func addKey(k string) {
m.Lock()
keys[k] = true
m.Unlock()
}
func match(k string) bool {
m.Lock()
defer m.Unlock()
return keys[k]
}
func initialize(view *goka.View) {
// iterate over the whole table
it, err := view.Iterator()
defer it.Release()
if err != nil {
panic(err)
}
for it.Next() {
addKey(it.Key())
}
}
type nullt int
func (nullt) Errorf(format string, args ...interface{}) { log.Printf(format, args) }
func (nullt) Fatalf(format string, args ...interface{}) { log.Fatalf(format, args) }
func (nullt) Fatal(args ...interface{}) { log.Fatal(args) }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment