Created
March 21, 2016 15:03
-
-
Save benbjohnson/9d2ebbc90b8b52f3fe25 to your computer and use it in GitHub Desktop.
boltpanic
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"encoding/json" | |
"errors" | |
"flag" | |
"fmt" | |
"io" | |
"io/ioutil" | |
"log" | |
"math/rand" | |
"os" | |
"strconv" | |
"sync" | |
"time" | |
"github.com/boltdb/bolt" | |
) | |
// Settings shared by the generate and replay subcommands.
const (
	// OplogExt is the suffix appended to a database path to form the path of
	// its operation log.
	OplogExt = ".log"

	// BucketName is the name of the single top-level bucket that every
	// put, get and delete operates on.
	BucketName = "kv"

	// CommitN is the number of commits generate asks generateFile to perform
	// for each database file.
	CommitN = 50
)
func main() { | |
rand.Seed(time.Now().UnixNano()) | |
// Parse command line arguments. | |
fs := flag.NewFlagSet("boltpanic", flag.ContinueOnError) | |
srcDBPath := fs.String("src-db", "", "source db path") | |
since := fs.Int("start-txid", 0, "replay from txid") | |
if err := fs.Parse(os.Args[1:]); err != nil { | |
fmt.Fprintln(os.Stderr, err) | |
os.Exit(1) | |
} | |
// First argument specifies a subcommand to run. | |
switch fs.Arg(0) { | |
case "generate": | |
if err := generate(); err != nil { | |
fmt.Fprintln(os.Stderr, err) | |
os.Exit(1) | |
} | |
case "replay": | |
if err := replay(fs.Arg(1), *srcDBPath, *since); err != nil { | |
fmt.Fprintln(os.Stderr, err) | |
os.Exit(1) | |
} | |
case "": | |
fmt.Fprintln(os.Stderr, "command name required") | |
os.Exit(1) | |
default: | |
fmt.Fprintf(os.Stderr, "unknown command: %q\n", fs.Arg(0)) | |
os.Exit(1) | |
} | |
} | |
// generate randomly generates databases until one panics. | |
func generate() error { | |
for { | |
// Generate temporary path. | |
path := MustTempPath() | |
log.Printf("GENERATE: %s", path) | |
// If an error is returned then simply stop. | |
if err := generateFile(path, CommitN); err != nil { | |
log.Fatalf("error occurred: %s", err) | |
} | |
// If we successfully generated a database then try again. | |
log.Printf("GENERATION SUCCESSFUL, RETRYING") | |
os.Remove(path) | |
os.Remove(path + OplogExt) | |
println("") | |
} | |
return nil | |
} | |
// generateFile generates a bolt file with a maximum of n commits. | |
func generateFile(path string, n int) error { | |
// Create and open database. | |
db := NewDB(path) | |
if err := db.Open(); err != nil { | |
return err | |
} | |
defer db.Close() | |
// Generate as many operations as possible in a given period. | |
ticker := time.NewTicker(1 * time.Second) | |
defer ticker.Stop() | |
for i := 0; i < n; i++ { | |
tx, err := db.db.Begin(true) | |
if err != nil { | |
return err | |
} | |
inner: | |
for { | |
switch rand.Intn(3) { | |
case 0: // PUT | |
k, v := strconv.Itoa(rand.Intn(65536)), RandBlob() | |
db.mustAppendLogEntry(&LogEntry{Op: "put", Key: k, Value: v}) | |
if err := tx.Bucket([]byte(BucketName)).Put([]byte(k), v); err != nil { | |
log.Fatalf("put error:", err) | |
} | |
case 1: | |
k := strconv.Itoa(rand.Intn(65536)) | |
db.mustAppendLogEntry(&LogEntry{Op: "get", Key: k}) | |
if v := tx.Bucket([]byte(BucketName)).Get([]byte(k)); len(v) > 1000000 { // this is added to avoid optimization | |
log.Println("...") | |
} | |
case 2: | |
k := strconv.Itoa(rand.Intn(32768)) | |
db.mustAppendLogEntry(&LogEntry{Op: "delete", Key: k}) | |
if err := tx.Bucket([]byte(BucketName)).Delete([]byte(k)); err != nil { | |
log.Fatalf("delete error:", err) | |
} | |
} | |
select { | |
case <-ticker.C: | |
break inner | |
default: | |
} | |
} | |
db.mustAppendLogEntry(&LogEntry{Op: "commit"}) | |
if err := tx.Commit(); err != nil { | |
return err | |
} | |
} | |
return nil | |
} | |
// replay executes a set of generated data against the database. | |
func replay(logPath, srcDBPath string, sinceTxid int) error { | |
if logPath == "" { | |
return errors.New("log path required") | |
} | |
// Generate temporary path. | |
dbPath := MustTempPath() | |
log.Printf("READ FROM: %s", logPath) | |
log.Printf("REPLAY INTO: %s", dbPath) | |
// Copy from source database, if specified. | |
if srcDBPath != "" { | |
log.Printf("COPYING DB FROM: %s", srcDBPath) | |
buf, err := ioutil.ReadFile(srcDBPath) | |
if err != nil { | |
return err | |
} | |
if err := ioutil.WriteFile(dbPath, buf, 0666); err != nil { | |
return err | |
} | |
} | |
// Create and open database. | |
db := NewDB(dbPath) | |
if err := db.Open(); err != nil { | |
return err | |
} | |
db.db.StrictMode = true | |
defer db.Close() | |
f, err := os.Open(logPath) | |
if err != nil { | |
return err | |
} | |
defer f.Close() | |
dec := json.NewDecoder(f) | |
// Skip over early transactions if requested. | |
if sinceTxid > 0 { | |
log.Printf("SKIPPING TO TX: %d", sinceTxid) | |
for txid := 3; txid < sinceTxid; { | |
// Increment txid on commit. | |
var entry LogEntry | |
if err := dec.Decode(&entry); err == io.EOF { | |
break | |
} else if err != nil { | |
return err | |
} else if entry.Op == "commit" { | |
log.Printf("[skip #%d]", txid) | |
txid++ | |
} | |
} | |
} | |
var done bool | |
for { | |
if err := db.db.Update(func(tx *bolt.Tx) error { | |
log.Printf("executing tx: id=%d", tx.ID()) | |
for { | |
// Decode entry. | |
var entry LogEntry | |
if err := dec.Decode(&entry); err == io.EOF { | |
done = true | |
return nil | |
} else if err != nil { | |
return err | |
} | |
// Execute entry. | |
println("op", entry.Op) | |
switch entry.Op { | |
case "put": | |
if err := tx.Bucket([]byte(BucketName)).Put([]byte(entry.Key), entry.Value); err != nil { | |
log.Fatalf("put error:", err) | |
} | |
case "get": | |
if v := tx.Bucket([]byte(BucketName)).Get([]byte(entry.Key)); len(v) > 1000000 { // this is added to avoid optimization | |
log.Println("...") | |
} | |
case "delete": | |
if err := tx.Bucket([]byte(BucketName)).Delete([]byte(entry.Key)); err != nil { | |
log.Fatalf("delete error:", err) | |
} | |
case "commit": | |
return nil // exit inner function and commit | |
default: | |
log.Fatalf("invalid op: %s", entry.Op) | |
} | |
} | |
}); err != nil { | |
return err | |
} else if done { | |
break | |
} | |
} | |
log.Printf("SUCCESSFUL REPLAY, DELETING DB: %s", dbPath) | |
if err := os.Remove(dbPath); err != nil { | |
return err | |
} | |
return nil | |
} | |
// DB wraps a bolt.DB to provide periodic flushes. | |
type DB struct { | |
mu sync.Mutex | |
path string | |
db *bolt.DB | |
oplog *os.File | |
} | |
// NewDB returns a new instance of DB. | |
func NewDB(path string) *DB { | |
return &DB{ | |
path: path, | |
} | |
} | |
// Open opens and initializes the database. | |
func (db *DB) Open() error { | |
// Open database file. | |
d, err := bolt.Open(db.path, 0666, &bolt.Options{Timeout: 1 * time.Second}) | |
if err != nil { | |
return err | |
} | |
db.db = d | |
// Create top level bucket. | |
if err := db.db.Update(func(tx *bolt.Tx) error { | |
if _, err := tx.CreateBucketIfNotExists([]byte(BucketName)); err != nil { | |
return err | |
} | |
return nil | |
}); err != nil { | |
return err | |
} | |
// Open operation log. | |
f, err := os.Create(db.path + OplogExt) | |
if err != nil { | |
return err | |
} | |
db.oplog = f | |
return nil | |
} | |
// Close closes the database. | |
func (db *DB) Close() error { | |
db.db.Close() | |
db.oplog.Close() | |
return nil | |
} | |
// blobs is the fixed pool of values that Put operations draw from.
var blobs = [][]byte{}

// init fills blobs with 1024 slices. Each slice has a random length in
// [100, 4196) and holds the repeating pattern byte(j % 254) at index j.
func init() {
	const count = 1024

	pool := make([][]byte, count)
	for i := range pool {
		b := make([]byte, 100+rand.Intn(4096))
		for j := range b {
			b[j] = byte(j % 254)
		}
		pool[i] = b
	}
	blobs = pool
}
func RandBlob() []byte { return blobs[rand.Intn(len(blobs))] } | |
// writeLogEntry writes an entry to the operations log. | |
func (db *DB) mustAppendLogEntry(e *LogEntry) { | |
if err := json.NewEncoder(db.oplog).Encode(e); err != nil { | |
panic(err) | |
} | |
if err := db.oplog.Sync(); err != nil { | |
panic(err) | |
} | |
} | |
// LogEntry is one record of the operation log. Fields left at their zero
// value are omitted from the JSON encoding.
type LogEntry struct {
	// Op names the operation: "put", "get", "delete" or "commit".
	Op string `json:"op,omitempty"`

	// Key is the key the operation applies to; "commit" entries carry none.
	Key string `json:"key,omitempty"`

	// Value is the payload stored by a "put" entry.
	Value []byte `json:"value,omitempty"`
}
// MustTempPath returns a unique path in the default temporary directory whose
// base name starts with "boltpanic-". No file exists at the returned path:
// the temporary file used to reserve the name is closed and removed before
// returning, so callers create the file themselves.
//
// It panics if the temporary file cannot be created, closed or removed.
func MustTempPath() string {
	f, err := ioutil.TempFile("", "boltpanic-")
	if err != nil {
		panic(err)
	}
	path := f.Name()

	if err := f.Close(); err != nil {
		panic(err)
	}
	if err := os.Remove(path); err != nil {
		panic(err)
	}
	return path
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment