Skip to content

Instantly share code, notes, and snippets.

@snaffi
Created April 25, 2019 07:27
Show Gist options
  • Select an option

  • Save snaffi/7565bc14981d039f0dccd02eb4745505 to your computer and use it in GitHub Desktop.

Select an option

Save snaffi/7565bc14981d039f0dccd02eb4745505 to your computer and use it in GitHub Desktop.
package main
import (
"log"
"os"
"time"
"github.com/gocql/gocql"
"github.com/pkg/errors"
"github.com/rcrowley/go-metrics"
"golang.org/x/sync/errgroup"
)
// Latency timers for the two insert strategies compared in main. Both are
// created with a nil registry argument; main dumps metrics.DefaultRegistry to
// stdout when it exits.
var (
	// batchLat times each call to batchInserts.
	batchLat = metrics.GetOrRegisterTimer("batch_lat", nil)

	// singleLat times each call to singleInserts.
	singleLat = metrics.GetOrRegisterTimer("single_lat", nil)
)
// main benchmarks two ways of writing the same rows into the "updates" table
// of the "test_updates" keyspace:
//
//  1. partitioned unlogged batches, executed by batchInserts and timed with
//     batchLat;
//  2. one INSERT per row, executed by singleInserts and timed with singleLat.
//
// Each strategy runs for a fixed number of rounds, and every round writes one
// row per generated user ID. Only the execution of the inserts is timed;
// building the batch or the query objects happens outside the timers. The
// collected metrics are written to stdout when main returns or panics.
func main() {
	const (
		userCount  = 250  // user IDs generated once and reused in every round
		rounds     = 2000 // rounds executed per strategy
		insertStmt = "INSERT INTO updates (user_id, update_id, data) VALUES (?, ?, ?)"
		payload    = "some text data"
	)

	// Registered first so it runs last. Errors below use panic rather than
	// log.Fatal, so this deferred dump (and the session Close) still runs.
	defer metrics.WriteOnce(metrics.DefaultRegistry, os.Stdout)

	// NOTE(review): the argument list is elided ("...") in this snippet; the
	// real cluster hosts must be supplied before this file can compile.
	cluster := gocql.NewCluster(...)
	cluster.Keyspace = "test_updates"
	cluster.RetryPolicy = &gocql.ExponentialBackoffRetryPolicy{NumRetries: 999999, Min: time.Millisecond * 5, Max: time.Second * 5}
	cluster.NumConns = 50
	cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(gocql.RoundRobinHostPolicy())
	cluster.Compressor = gocql.SnappyCompressor{}
	cluster.Timeout = time.Minute * 5
	cluster.SocketKeepalive = time.Minute
	cluster.Consistency = gocql.LocalOne

	sess, err := cluster.CreateSession()
	if err != nil {
		panic(err)
	}
	defer sess.Close()

	// The final length is known, so allocate the backing array once.
	users := make([]gocql.UUID, 0, userCount)
	for i := 0; i < userCount; i++ {
		users = append(users, gocql.TimeUUID())
	}

	log.Println("batch processing")
	for round := 0; round < rounds; round++ {
		// NOTE(review): NewPartitionedBatch is not part of upstream gocql;
		// presumably it comes from a fork, and the `true` argument is
		// presumably an idempotency flag — confirm against that fork.
		partitionedBatch, err := sess.NewPartitionedBatch(
			insertStmt, gocql.UnloggedBatch, true, gocql.One,
		)
		if err != nil {
			panic(errors.Wrap(err, "can not create partitioned batch object"))
		}
		for _, userID := range users {
			if err := partitionedBatch.Query(userID, gocql.TimeUUID(), payload); err != nil {
				panic(errors.Wrap(err, "can not add query params to batch"))
			}
		}
		batchLat.Time(func() {
			batchInserts(sess, partitionedBatch)
		})
	}

	log.Println("single query processing")
	for round := 0; round < rounds; round++ {
		// One query per user; pre-size to avoid regrowing the slice each round.
		queries := make([]*gocql.Query, 0, len(users))
		for _, userID := range users {
			q := sess.Query(
				insertStmt, userID, gocql.TimeUUID(), payload,
			).Idempotent(true).Consistency(gocql.One)
			queries = append(queries, q)
		}
		singleLat.Time(func() {
			singleInserts(sess, queries)
		})
	}
}
// batchInserts executes every batch returned by partitionedBatch.Batches()
// on sess, one goroutine per batch, and blocks until all of them have
// finished.
//
// If any execution fails, it panics with the error reported by the group
// (errgroup returns the first non-nil error), wrapped with the "executeBatch"
// context.
func batchInserts(sess *gocql.Session, partitionedBatch *gocql.PartitionedBatch) {
	var g errgroup.Group
	for _, b := range partitionedBatch.Batches() {
		batch := b // per-iteration copy; before Go 1.22 closures share the loop variable
		g.Go(func() error {
			return sess.ExecuteBatch(batch)
		})
	}
	// Wrap the error Wait already returned instead of calling Wait a second time.
	if err := g.Wait(); err != nil {
		panic(errors.Wrap(err, "executeBatch"))
	}
}
// singleInserts executes each query in queries, one goroutine per query, and
// blocks until all of them have finished.
//
// If any execution fails, it panics with the error reported by the group
// (errgroup returns the first non-nil error), wrapped with the
// "singleInserts" context.
//
// sess is not used by the body; it is kept so the signature mirrors
// batchInserts.
func singleInserts(sess *gocql.Session, queries []*gocql.Query) {
	var g errgroup.Group
	for _, q := range queries {
		query := q // per-iteration copy; before Go 1.22 closures share the loop variable
		g.Go(func() error {
			return query.Exec()
		})
	}
	// Wrap the error Wait already returned instead of calling Wait a second time.
	if err := g.Wait(); err != nil {
		panic(errors.Wrap(err, "singleInserts"))
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment