@klesouza
Created January 26, 2020 20:56
Analyse BigQuery data with TFDV (TensorFlow Data Validation)
import apache_beam as beam
import numpy as np
import tensorflow_data_validation as tfdv
from tensorflow_metadata.proto.v0 import statistics_pb2

# Replace [PROJECT_ID] with the GCP project that runs the BigQuery query.
pipeline_options = beam.pipeline.PipelineOptions.from_dictionary({
    'project': '[PROJECT_ID]'
})


def row_to_nparray(row: dict):
    # Wrap each column value in a one-element array, one entry per feature.
    return {k: np.asarray([v]) for k, v in row.items()}


# Build the pipeline without a `with` block: the context manager already runs
# the pipeline on exit, so also calling p.run() would execute the job twice.
p = beam.Pipeline(options=pipeline_options)
_ = (
    p
    | 'BQ read' >> beam.io.Read(beam.io.BigQuerySource(
        query='SELECT * FROM `[TABLE]`',
        use_standard_sql=True))
    | 'to dict' >> beam.Map(row_to_nparray)
    | 'batch' >> tfdv.utils.batch_util.BatchExamplesToArrowTables()
    | 'tfdv' >> tfdv.GenerateStatistics()
    # An empty shard_name_template writes one unsharded file named 'files'.
    | 'WriteStatsOutput' >> beam.io.WriteToTFRecord(
        'files', shard_name_template='',
        coder=beam.coders.ProtoCoder(
            statistics_pb2.DatasetFeatureStatisticsList))
)

result = p.run()
result.wait_until_finish()
print(result)
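
BigQuery columns can contain NULLs, which reach `row_to_nparray` as Python `None` and would be wrapped into an object-typed array. Below is a minimal sketch of a null-tolerant variant. The name `row_to_nparray_nullsafe` is hypothetical and not part of the gist. It assumes that the TFDV batching step treats a feature whose value is `None` as missing, so check that against the TFDV version you use.

```python
import numpy as np


def row_to_nparray_nullsafe(row: dict) -> dict:
    """Hypothetical variant of row_to_nparray for rows with NULL columns.

    Non-null values are wrapped in a one-element array, as in the gist.
    NULL values (None) are passed through as None instead of being wrapped,
    on the assumption that downstream code reads None as "feature missing".
    """
    return {
        k: (None if v is None else np.asarray([v]))
        for k, v in row.items()
    }


# Example row shaped like a dict coming from a BigQuery read.
example = row_to_nparray_nullsafe({'age': 42, 'name': 'ada', 'score': None})
```

To try it, pass `row_to_nparray_nullsafe` to the `'to dict'` step in place of `row_to_nparray`.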