Last active
October 26, 2016 10:10
-
-
Save volnt/a4d1954eb7c7236661dd98b61cccf514 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
import re | |
import json | |
import sys | |
import argparse | |
from sys import argv | |
from collections import defaultdict | |
from google.cloud import storage | |
from google.cloud import bigquery | |
from google.appengine.api.files import records | |
from google.appengine.datastore import entity_pb | |
from google.appengine.api import datastore | |
def fetch_entities(bucket_name, folder_name):
    """Yield datastore entities decoded from a Datastore backup stored in GCS.

    Lists every blob under ``folder_name`` in ``bucket_name``, downloads each
    one to a scratch file, then parses it with the App Engine leveldb
    ``RecordsReader``: each record is an ``EntityProto`` that is converted to
    a high-level ``datastore.Entity``.

    Args:
        bucket_name: name of the GCS bucket holding the backup.
        folder_name: blob prefix (backup folder) to scan.

    Yields:
        google.appengine.api.datastore.Entity, one per backup record.
    """
    client = storage.Client()
    bucket = client.get_bucket(bucket_name)
    blobs = bucket.list_blobs(prefix=folder_name)
    print 'Reading blobs : ',
    for blob in blobs:
        # blob.path is URL-escaped ('%2F' is '/'); this prints the basename
        # as a progress indicator without a newline (Py2 trailing comma).
        print '{}..'.format(blob.path.split('%2F')[-1]),
        sys.stdout.flush()
        # Download to disk first, then re-open for reading: RecordsReader
        # needs a readable file object positioned at the start.
        with open('/tmp/blob.data', 'wb') as blob_file:
            blob.download_to_file(blob_file)
        with open('/tmp/blob.data', 'r') as blob_file:
            reader = records.RecordsReader(blob_file)
            for record in reader:
                entity_proto = entity_pb.EntityProto(contents=record)
                yield datastore.Entity.FromPb(entity_proto)
def remove_html_tags(markup):
    """Return *markup* with every ``<...>`` tag replaced by a single space.

    Non-greedy match, so adjacent tags each become their own space; text
    outside tags is left untouched.
    """
    tag_pattern = re.compile(r'<.*?>')
    return tag_pattern.sub(' ', markup)
def embedded_entity_to_dict(entity):
    """Decode the embedded 'template' protobuf of *entity* into a dict.

    The entity's ``template`` property holds a serialized ``EntityProto``.
    It is parsed partially (it carries no key), given a synthetic key so
    ``datastore.Entity.FromPb`` will accept it, and its JSON payload is
    extracted from the first element of its ``components`` property.

    Returns:
        dict parsed from the template's components JSON, or {} when the
        template has no components.
    """
    real_pb = entity_pb.EntityProto()
    real_pb.ParsePartialFromString(entity['template'])
    # The embedded proto lacks a key path; fabricate one element so the
    # conversion below does not reject it.
    real_pb.key().path().add_element()
    real_pb.key().path().element_list()[-1].type_ = 'Template'
    # NOTE(review): hard-coded application id — confirm this matches the
    # project the backup was taken from.
    real_pb.key().app_ = 'lumsites-dev'
    template = datastore.Entity.FromPb(real_pb)
    if not len(template.get('components', [])):
        return {}
    # components[0] is a JSON string wrapped in an extra pair of quotes with
    # backslash escapes; strip the quotes and unescape (Python 2-only codec).
    return json.loads(template['components'][0][1:-1].decode('string_escape'))
def entity_to_rows(entity):
    """Flatten a content entity into per-language BigQuery rows.

    Walks every component of the entity's embedded template and concatenates
    the HTML-stripped text per language.

    Args:
        entity: datastore entity with 'template', 'instance',
            'customContentType', and optional 'slug'/'customContentTypeTags'
            properties.

    Returns:
        List of (slug, instance, lang, text, type, tags) tuples, one per
        language whose accumulated text is non-blank; [] when the entity has
        no template.
    """
    if not entity['template']:
        return []
    components = embedded_entity_to_dict(entity)
    # Parse the slug translations once: this is loop-invariant and was
    # previously re-parsed for every component/language pair.
    slugs = json.loads(entity.get('slug', '{}'))
    rows = defaultdict(lambda: {
        'instance': str(entity['instance'].id()),
        'text': '',
        'type': str(entity['customContentType'].id()),
        'tags': entity.get('customContentTypeTags', [])
    })
    for cell in components.get('cells', []):
        for component in cell.get('components', []):
            # 'content' maps language code -> HTML fragment.
            for lang, data in component.get('properties', {}).get('content', {}).iteritems():
                rows[lang]['text'] += remove_html_tags(data) + u' '
                rows[lang]['lang'] = lang
                rows[lang]['slug'] = slugs.get(lang)
    return [(row['slug'], row['instance'], row['lang'], row['text'], row['type'], row['tags'])
            for row in rows.values() if row['text'].strip()]
def insert_entities(table, entities):
    """Insert every convertible entity into the BigQuery *table*.

    Each entity is flattened with ``entity_to_rows``; entities that yield no
    rows are skipped entirely.

    Returns:
        List of ``insert_data`` results, one per inserted batch, in input
        order.
    """
    return [table.insert_data(rows)
            for rows in (entity_to_rows(item) for item in entities)
            if rows]
def create_table(dataset_name, table_name):
    """Return a BigQuery table handle for the output, creating it if needed.

    Builds a table with a fixed six-column schema (slug/instance/lang/text/
    type as nullable strings, tags as a repeated string).

    Args:
        dataset_name: BigQuery dataset that holds (or will hold) the table.
        table_name: name of the output table.

    Returns:
        A bigquery table object with its metadata loaded.
    """
    # NOTE(review): dataset()/table(schema) is the legacy google-cloud-bigquery
    # client API — confirm against the pinned library version.
    client = bigquery.Client()
    dataset = client.dataset(dataset_name)
    table = dataset.table(table_name, [
        bigquery.SchemaField('slug', 'STRING', mode='nullable'),
        bigquery.SchemaField('instance', 'STRING', mode='nullable'),
        bigquery.SchemaField('lang', 'STRING', mode='nullable'),
        bigquery.SchemaField('text', 'STRING', mode='nullable'),
        bigquery.SchemaField('type', 'STRING', mode='nullable'),
        bigquery.SchemaField('tags', 'STRING', mode='repeated'),
    ])
    if not table.exists():
        table.create()
    # Fetch server-side metadata so the returned handle is fully populated.
    table.reload()
    return table
def main(argv=None): | |
parser = argparse.ArgumentParser() | |
parser.add_argument('--bucket', | |
dest='bucket', | |
required=True, | |
default='gs://lumsites-sandbox-datastore-backup', | |
help='Bucket containing the backup files.') | |
parser.add_argument('--folder', | |
required=True, | |
dest='folder', | |
default='', | |
help='Folder containing the backup files.') | |
parser.add_argument('--dataset', | |
required=True, | |
dest='dataset', | |
help='Output dataset to write results to.') | |
parser.add_argument('--table', | |
required=True, | |
dest='table', | |
help='Output table to write results to.') | |
known_args, pipeline_args = parser.parse_known_args(argv) | |
table = create_table(known_args.dataset, known_args.table) | |
output = insert_entities(table, fetch_entities(known_args.bucket, known_args.folder)) | |
if output: | |
print output[0] | |
if __name__ == "__main__": | |
main(argv) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment