Last active
July 10, 2019 01:26
-
-
Save jameshartig/95c1148db456190fc61034accf3e85d1 to your computer and use it in GitHub Desktop.
Reproduction for #1485
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"flag" | |
"math/rand" | |
"net/http" | |
_ "net/http/pprof" | |
"os" | |
"os/signal" | |
"sync" | |
"sync/atomic" | |
"time" | |
"go.opencensus.io/stats/view" | |
"cloud.google.com/go/pubsub" | |
"github.com/levenlabs/go-llog" | |
"google.golang.org/grpc/codes" | |
"google.golang.org/grpc/status" | |
) | |
func isGoogleCloudErrAlreadyExists(err error) bool { | |
if err == nil { | |
return false | |
} | |
s, ok := status.FromError(err) | |
return ok && s.Code() == codes.AlreadyExists | |
} | |
type ocstat int64 | |
// ExportView implements the view.Exporter interface | |
func (s *ocstat) ExportView(d *view.Data) { | |
if len(d.Rows) < 1 { | |
return | |
} | |
atomic.SwapInt64((*int64)(s), int64(d.Rows[0].Data.(*view.SumData).Value)) | |
} | |
func main() { | |
proj := flag.String("project", "admiral-1007", "project name") | |
topic := flag.String("topic", "pubsub_test", "name of the topic to use") | |
outstanding := flag.Int("num-outstanding", 1000, "number of oustanding messages") | |
numPublish := flag.Int("num-publish", 1000000, "number of jobs to publish") | |
flag.Parse() | |
go func() { | |
// useful for pprof | |
http.ListenAndServe("127.0.0.1:9999", nil) | |
}() | |
// the client, topic and subscription | |
psc, err := pubsub.NewClient(context.Background(), *proj) | |
if err != nil { | |
panic(err) | |
} | |
t, err := psc.CreateTopic(context.Background(), *topic) | |
if err != nil && !isGoogleCloudErrAlreadyExists(err) { | |
panic(err) | |
} else if t == nil { | |
t = psc.Topic(*topic) | |
} | |
sub, err := psc.CreateSubscription(context.Background(), *topic+"_test", pubsub.SubscriptionConfig{ | |
Topic: t, | |
AckDeadline: 15 * time.Second, | |
}) | |
if err != nil && !isGoogleCloudErrAlreadyExists(err) { | |
panic(err) | |
} else if sub == nil { | |
sub = psc.Subscription(*topic + "_test") | |
} | |
ctx, cancel := context.WithCancel(context.Background()) | |
var acked int64 | |
var published int64 | |
// setup goroutine to print out stats once a minute | |
{ | |
var pullCount ocstat | |
view.SetReportingPeriod(time.Second) // try to make sure we get an export close so it matches up | |
view.RegisterExporter(&pullCount) | |
view.Register(pubsub.PullCountView) | |
var lastPullCount int64 | |
go func() { | |
tick := time.NewTicker(time.Minute) | |
for { | |
select { | |
case <-tick.C: | |
a := atomic.SwapInt64(&acked, 0) | |
p := atomic.SwapInt64(&published, 0) | |
pc := atomic.LoadInt64((*int64)(&pullCount)) | |
lpc := atomic.SwapInt64(&lastPullCount, pc) | |
llog.Info("last minute", llog.KV{"acked": a, "pullCount": pc - lpc, "published": p}) | |
case <-ctx.Done(): | |
return | |
} | |
} | |
}() | |
} | |
// make a channel so we can start receiving after publishes finish | |
donePublishing := make(chan bool) | |
wg := new(sync.WaitGroup) | |
// publish all of the messages to pubsub | |
if *numPublish > 0 { | |
llog.Info("starting publishing") | |
innerWG := new(sync.WaitGroup) | |
publishers := 100 | |
per := *numPublish / publishers | |
for i := 0; i < 100; i++ { | |
innerWG.Add(1) | |
wg.Add(1) | |
go func() { | |
defer innerWG.Done() | |
defer wg.Done() | |
for i := 0; i < per; i++ { | |
_, err := t.Publish(ctx, &pubsub.Message{ | |
// just something from json-generator.com | |
Data: []byte(`{"_id":"5bc522c300adfb89aa57aa46","index":0,"guid":"7068a7a9-9fb1-4fc6-867a-60144f0a4312","greeting":"Hello, undefined! You have 5 unread messages.","favoriteFruit":"banana"}`), | |
}).Get(ctx) | |
if ctx.Err() == context.Canceled { | |
return | |
} | |
if err != nil { | |
llog.Warn("error publishing", llog.ErrKV(err)) | |
time.Sleep(time.Second) | |
} else { | |
atomic.AddInt64(&published, 1) | |
} | |
} | |
}() | |
} | |
// once all of our publishers are done then close the done ch | |
go func() { | |
innerWG.Wait() | |
close(donePublishing) | |
}() | |
} else { | |
close(donePublishing) | |
} | |
sub.ReceiveSettings.MaxOutstandingMessages = *outstanding | |
sub.ReceiveSettings.NumGoroutines = 10 | |
sub.ReceiveSettings.MaxExtension = time.Hour | |
wg.Add(1) | |
go func() { | |
defer wg.Done() | |
select { | |
case <-donePublishing: | |
llog.Info("starting receive") | |
case <-ctx.Done(): | |
return | |
} | |
for { | |
err := sub.Receive(ctx, func(mctx context.Context, msg *pubsub.Message) { | |
select { | |
// 100ms -> ~3 seconds | |
case <-time.After(time.Duration(100+rand.Int63n(3000)) * time.Millisecond): | |
msg.Ack() | |
atomic.AddInt64(&acked, 1) | |
case <-mctx.Done(): | |
msg.Nack() | |
} | |
}) | |
if ctx.Err() == context.Canceled { | |
return | |
} | |
if err != nil { | |
llog.Warn("error receiving", llog.ErrKV(err)) | |
time.Sleep(time.Second) | |
} | |
} | |
}() | |
c := make(chan os.Signal, 1) | |
signal.Notify(c, os.Interrupt) | |
<-c | |
cancel() | |
wg.Wait() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment