Created
July 14, 2020 18:51
-
-
Save briansorahan/1da8a795ac4ab982652363b6b8b1bcf2 to your computer and use it in GitHub Desktop.
Debugging parquet-go
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"bufio" | |
"compress/gzip" | |
"io" | |
"log" | |
"os" | |
"time" | |
"github.com/pkg/errors" | |
"github.com/xitongsys/parquet-go-source/local" | |
"github.com/xitongsys/parquet-go/writer" | |
) | |
func main() { | |
f, err := os.Open("lease_production.ndjson.gz") | |
if err != nil { | |
log.Fatal(err) | |
} | |
defer f.Close() | |
r, err := gzip.NewReader(f) | |
if err != nil { | |
log.Fatal(err) | |
} | |
defer r.Close() | |
if err := run(r); err != nil { | |
log.Fatal(err) | |
} | |
} | |
func run(r io.Reader) error { | |
var ( | |
count int64 | |
br = bufio.NewReader(r) | |
start = time.Now() | |
) | |
fw, err := local.NewLocalFileWriter("lease_production.parquet") | |
if err != nil { | |
return errors.Wrap(err, "creating parquet file writer") | |
} | |
pw, err := writer.NewJSONWriter(schema, fw, 4) | |
if err != nil { | |
return errors.Wrap(err, "creating parquet writer") | |
} | |
defer func() { | |
pw.WriteStop() | |
log.Println("closed parquet writer") | |
fw.Close() | |
log.Println("closed file writer") | |
}() | |
ReadLoop: | |
for { | |
line, err := br.ReadString(0x0A) | |
if err == io.EOF { | |
break ReadLoop | |
} | |
if err != nil { | |
return errors.Wrap(err, "reading line") | |
} | |
if err := process(line, pw); err != nil { | |
return errors.Wrapf(err, "processing line %d %s", count, line) | |
} | |
count++ | |
} | |
log.Printf("processed %d lines in %s", count, time.Since(start)) | |
return nil | |
} | |
func process(line string, pw *writer.JSONWriter) error { | |
if err := pw.Write(line); err != nil { | |
return errors.Wrap(err, "writing json to parquet") | |
} | |
return nil | |
} | |
// schema is the parquet-go JSON schema string that run passes to
// writer.NewJSONWriter. Its root is tagged "name=parquet-go-root", and each
// entry in Fields declares one column by name, Parquet type and repetition:
//   - entity_id, lease_no, district, lease_type, field_no and production_date
//     are REQUIRED UTF8 strings;
//   - operator_no is an OPTIONAL UTF8 string;
//   - oil, casinghead, condensate, gas, flow_count and lift_count are
//     OPTIONAL DOUBLEs.
//
// NOTE(review): the column names presumably match the keys of each NDJSON
// record — verify against the input. production_date is stored as UTF8
// rather than a date type, and flow_count/lift_count as DOUBLE rather than an
// integer type; confirm these choices match the source data.
var schema = `{
"Tag": "name=parquet-go-root",
"Fields": [
{"Tag": "name=entity_id, type=UTF8, repetitiontype=REQUIRED"}
,{"Tag": "name=lease_no, type=UTF8, repetitiontype=REQUIRED"}
,{"Tag": "name=district, type=UTF8, repetitiontype=REQUIRED"}
,{"Tag": "name=lease_type, type=UTF8, repetitiontype=REQUIRED"}
,{"Tag": "name=field_no, type=UTF8, repetitiontype=REQUIRED"}
,{"Tag": "name=production_date, type=UTF8, repetitiontype=REQUIRED"}
,{"Tag": "name=operator_no, type=UTF8, repetitiontype=OPTIONAL"}
,{"Tag": "name=oil, type=DOUBLE, repetitiontype=OPTIONAL"}
,{"Tag": "name=casinghead, type=DOUBLE, repetitiontype=OPTIONAL"}
,{"Tag": "name=condensate, type=DOUBLE, repetitiontype=OPTIONAL"}
,{"Tag": "name=gas, type=DOUBLE, repetitiontype=OPTIONAL"}
,{"Tag": "name=flow_count, type=DOUBLE, repetitiontype=OPTIONAL"}
,{"Tag": "name=lift_count, type=DOUBLE, repetitiontype=OPTIONAL"}
]
}
`
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment