# Gist: xtrmstep/f62925b7785f65ca78c7acd230fd030d (created August 26, 2024 12:50).
# Pipeline: convert newline-delimited JSON to Avro using a schema and store it
# compressed, then read the compressed Avro file back and store it uncompressed.
# NOTE: GitHub warned that this file may contain hidden or bidirectional Unicode
# text that could be interpreted or compiled differently than it appears; review
# it in an editor that reveals hidden Unicode characters.
import contextlib
import json
import os

import fastavro
from fastavro.schema import load_schema
def _parse_json_lines(lines):
    """Decode newline-delimited JSON, one record per non-blank line.

    Blank or whitespace-only lines (e.g. a trailing empty line) are skipped
    silently. Lines that fail to decode are reported on stdout with their
    1-based line number and skipped, so one bad line does not abort the
    whole conversion.

    Args:
        lines: An iterable of text lines, e.g. an open text file.

    Returns:
        A list of the decoded JSON values, in input order.
    """
    records = []
    for line_number, line in enumerate(lines, start=1):
        text = line.strip()
        if not text:
            continue
        try:
            records.append(json.loads(text))
        except json.JSONDecodeError as e:
            print(f"Invalid JSON found on line {line_number}: {text} ({e})")
    return records


def json_to_avro(json_file_path, avro_file_path, schema_file_path, compression='deflate'):
    """Convert a newline-delimited JSON file to an Avro container file.

    Errors are reported on stdout rather than raised; on any failure the
    function returns early without writing a success message. If writing
    fails after the output file has been opened, the partially written file
    is removed so later steps do not pick up a truncated or corrupt Avro file.

    Args:
        json_file_path: Path to the UTF-8 input file, one JSON record per line.
        avro_file_path: Path of the Avro file to create (overwritten if present).
        schema_file_path: Path to the Avro schema file passed to load_schema.
        compression: Codec name passed to fastavro.writer as ``codec``
            (default 'deflate').
    """
    try:
        schema = load_schema(schema_file_path)
    except Exception as e:
        print(f"Failed to load schema from {schema_file_path}: {e}")
        return

    try:
        with open(json_file_path, 'r', encoding='utf-8') as json_file:
            records = _parse_json_lines(json_file)
    except Exception as e:
        print(f"Failed to read or parse JSON file {json_file_path}: {e}")
        return

    opened = False
    try:
        with open(avro_file_path, 'wb') as avro_file:
            opened = True
            fastavro.writer(avro_file, schema, records, codec=compression)
    except Exception as e:
        print(f"Failed to write Avro file {avro_file_path}: {e}")
        if opened:
            # open(..., 'wb') already created/truncated the file; drop the
            # partial output instead of leaving a corrupt Avro file behind.
            with contextlib.suppress(OSError):
                os.remove(avro_file_path)
        return
    print(f"Converted {len(records)} records to Avro with {compression} compression.")
def read_and_write_avro_no_compression(input_avro_file, output_avro_file, schema_file_path):
    """Re-write an Avro container file without compression (codec 'null').

    All records are read from *input_avro_file* into memory first. They are
    read with the schema loaded from *schema_file_path* as the reader schema,
    then written to *output_avro_file* with that same schema. Errors are
    reported on stdout rather than raised. If writing fails after the output
    file has been opened, the partial file is removed.

    Args:
        input_avro_file: Path of the Avro file to read (any codec fastavro reads).
        output_avro_file: Path of the uncompressed Avro file to create.
        schema_file_path: Path to the Avro schema file passed to load_schema.
    """
    try:
        schema = load_schema(schema_file_path)
    except Exception as e:
        print(f"Failed to load schema from {schema_file_path}: {e}")
        return

    try:
        with open(input_avro_file, 'rb') as f_in:
            # Using the target schema as the reader schema applies Avro schema
            # resolution, so the records match the schema they are written with.
            records = list(fastavro.reader(f_in, reader_schema=schema))
    except Exception as e:
        print(f"Failed to read the input Avro file {input_avro_file}: {e}")
        return

    opened = False
    try:
        with open(output_avro_file, 'wb') as f_out:
            opened = True
            fastavro.writer(f_out, schema, records, codec='null')  # No compression
    except Exception as e:
        print(f"Failed to write the output Avro file {output_avro_file}: {e}")
        if opened:
            # The file was already created/truncated; don't leave a partial one.
            with contextlib.suppress(OSError):
                os.remove(output_avro_file)
        return
    print(f"Successfully wrote {len(records)} records to {output_avro_file} without compression.")
if __name__ == "__main__":
    # Run the pipeline only when executed as a script, not on import.
    # Step 1: newline-delimited JSON -> compressed Avro.
    json_to_avro(
        'data.json',
        'output.avro',
        'schema.avsc',
        # Any codec fastavro supports, e.g. 'bzip2' or 'snappy'
        # ('snappy' may require an optional extra package).
        compression='deflate',
    )
    # Step 2: compressed Avro -> uncompressed Avro with the same schema.
    # NOTE(review): this runs even if step 1 failed; a stale output.avro from
    # an earlier run would then be converted.
    read_and_write_avro_no_compression(
        'output.avro',
        'uncompressed_output.avro',
        'schema.avsc',
    )
# Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.