Skip to content

Instantly share code, notes, and snippets.

@z81
Created December 31, 2018 02:11

Revisions

  1. z81 created this gist Dec 31, 2018.
    127 changes: 127 additions & 0 deletions main.go
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,127 @@
    package main

    import (
    "database/sql"
    "fmt"
    "log"
    "math"
    "runtime"
    "sync"
    "time"

    humanize "github.com/dustin/go-humanize"
    "github.com/kshvakov/clickhouse"
    )

    type Stats struct {
    success int
    recived int
    startTime int64
    time int64
    }

    func write(connect *sql.DB, batchSize int, stats *Stats) {
    stats.recived += batchSize
    var (
    tx, _ = connect.Begin()
    stmt, _ = tx.Prepare("INSERT INTO data2 (name, user, ts, v1, v2) VALUES (?, ?, ?, ?, ?)")
    )

    graphCount := 3
    userCount := 1000

    for i := 0; i < (batchSize / graphCount / userCount); i++ {
    for g := 0; g < graphCount; g++ {
    for u := 0; u < userCount; u++ {

    val1 := float64(u % 100)
    val2 := float64(u % 25)

    if _, err := stmt.Exec(
    u,
    g,
    stats.time,
    val1,
    val2,
    // clickhouse.Array([]float64{val1, val2}),
    ); err != nil {
    log.Fatal(err)
    }
    }

    stats.time = (stats.time + 60)
    }
    }

    if err := tx.Commit(); err != nil {
    log.Fatal(err)
    return
    }

    stats.success += batchSize

    }

    func t(stats *Stats) {
    timeDiff := float64(time.Now().Unix() - stats.startTime)
    recAvgMsgSec := int(math.Floor(float64(stats.recived) / timeDiff))
    sucAvgMsgSec := int(math.Floor(float64(stats.success) / timeDiff))

    fmt.Printf("Suc %[1]s, Rec %[2]s, Avg rec %[3]s/sec, avg suc %[4]s/sec \n",
    humanize.Comma(int64(stats.success)),
    humanize.Comma(int64(stats.recived)),
    humanize.Comma(int64(recAvgMsgSec)),
    humanize.Comma(int64(sucAvgMsgSec)),
    )
    }

    func main() {
    runtime.GOMAXPROCS(4)

    batchSize := 350000
    msgSec := 1800000
    workers := 4

    stats := Stats{
    success: 0,
    recived: 0,
    startTime: time.Now().Unix(),
    time: 118771240,
    }

    timer1 := time.NewTicker(time.Second)
    go func() {
    for range timer1.C {
    t(&stats)
    }
    }()

    connect, err := sql.Open("clickhouse", "tcp://localhost:9000?debug=false")
    if err != nil {
    log.Fatal(err)
    }

    if err := connect.Ping(); err != nil {
    if exception, ok := err.(*clickhouse.Exception); ok {
    fmt.Printf("[%d] %s \n%s\n", exception.Code, exception.Message, exception.StackTrace)
    } else {
    fmt.Println(err)
    }
    return
    }

    var wg sync.WaitGroup
    wg.Add(1)

    oneTick := float64(time.Second) / float64(msgSec) * float64(batchSize)

    for w := 1; w <= workers; w++ {
    timer2 := time.NewTicker(time.Duration(oneTick / float64(workers)))
    go func() {
    for range timer2.C {
    write(connect, batchSize, &stats)
    }
    }()
    }
    wg.Wait()
    }