-
-
Save hartym/c03ed10caa5a439a72b108057209f543 to your computer and use it in GitHub Desktop.
CSV Sanitizer Bonobo ETL Job with Atomic Writes
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
from time import sleep | |
import os | |
import re | |
import bonobo | |
from atomicwrites import atomic_write | |
from fs.osfs import OSFS | |
from texools.files import new_file_check | |
from texools.strings import asciify | |
from unittest.mock import patch | |
# Input CSV to sanitize and where the cleaned copy is written; both can be
# overridden through the environment.
SOURCE_FILE = os.getenv('SOURCE_FILE',
                        'dealervault/DealerVaultInventory.csv')
DESTINATION_FILE = os.getenv('DESTINATION_FILE',
                             'dealervault/DealerVaultInventory_processed.csv')

# Environment values are always strings, so ``bool(os.getenv(...))`` would
# treat "0" or "false" as true. Only explicit truthy spellings (case- and
# whitespace-insensitive) enable forcing; unset or empty means False.
SHOULD_FORCE = os.getenv('SHOULD_FORCE', '').strip().lower() in {
    '1', 'true', 'yes', 'on'}
# Keep a handle on the stock implementation so non-write modes can defer to it.
_open = OSFS.open


def _atomic_open(self, path, mode="r", **kwargs):
    """Drop-in replacement for ``OSFS.open`` that performs atomic writes.

    Modes starting with ``'w'`` (``'w'``, ``'w+'``, ``'wb'``, ...) are routed
    through ``atomicwrites.atomic_write``, which writes to a temporary file
    and only moves it over ``path`` when the returned context manager exits
    without an exception. Every other mode is delegated to the original
    ``OSFS.open`` unchanged.

    :param path: path inside this filesystem (relative to its root).
    :param mode: an ``open()``-style mode string.
    :param kwargs: extra ``open()`` options (e.g. ``encoding``), forwarded in
        both branches.
    :return: a file-like object usable as a context manager.
    """
    if mode.startswith('w'):
        # ``path`` is relative to the filesystem root, not to the process
        # working directory, so translate it to a real OS path first.
        # ``'w'`` means "create or truncate" for ``open()``, so the atomic
        # writer must be allowed to replace an existing file regardless of a
        # ``'+'``. The mode and the caller's ``open()`` options (such as the
        # requested encoding) are forwarded to the temporary file.
        return atomic_write(self.getsyspath(path), mode=mode,
                            overwrite=True, **kwargs)
    return _open(self, path, mode=mode, **kwargs)


# Patch pyfilesystem's OS filesystem globally so that anything opening files
# through an OSFS (presumably bonobo's FileWriter via its ``fs`` service)
# gets atomic writes.
OSFS.open = _atomic_open
def get_graph():
    """Build the bonobo graph for one run of the sanitizer.

    When ``SHOULD_FORCE`` is set, or ``new_file_check(SOURCE_FILE,
    DESTINATION_FILE)`` is truthy (presumably: the source is newer than the
    processed copy), the graph reads ``SOURCE_FILE`` with
    ``bonobo.FileReader``, passes each item through ``asciify`` and then
    ``extra_quote_killer``, and writes the result with ``bonobo.FileWriter``
    to ``DESTINATION_FILE``. Otherwise the graph holds a single node that
    waits one second and finishes.

    :return: the configured ``bonobo.Graph``.
    """
    graph = bonobo.Graph()
    if SHOULD_FORCE or new_file_check(SOURCE_FILE, DESTINATION_FILE):
        graph.add_chain(
            bonobo.FileReader(SOURCE_FILE),
            asciify,
            extra_quote_killer,
            bonobo.FileWriter(DESTINATION_FILE),
        )
    else:
        def idle():
            """Nothing to process: pause for one second, then finish."""
            sleep(1)

        # Register the callable itself. Calling ``sleep(1)`` here would block
        # while the graph is being *built* (this function also runs at import
        # time) and would add its return value, ``None``, which is not a
        # callable node.
        graph.add_node(idle)
    return graph
def extra_quote_killer(line: str) -> str:
    """Remove stray double quotes from one line of a fully quoted CSV file.

    The line is split on the ``","`` field separator. A quote opening the
    first field and a quote closing the last field are kept; every other
    double quote character is deleted. The separators are then restored, so
    the field layout is unchanged. A single trailing newline is preserved,
    and a quote immediately before it counts as the closing quote.

    Examples::

        >>> extra_quote_killer('"Car","Has 18" wheels","Blue"')
        '"Car","Has 18 wheels","Blue"'
        >>> extra_quote_killer('"x <> y","a<~>b"')
        '"x <> y","a<~>b"'

    :param line: one line of CSV text.
    :return: the line with embedded quotes removed.
    """
    # Work on the split fields directly rather than swapping separators and
    # boundary quotes for placeholder tokens: placeholders such as "<~>" or
    # "<>" can already occur in the data and would be turned into quotes.
    if line.endswith('\n'):
        body, newline = line[:-1], '\n'
    else:
        body, newline = line, ''

    fields = body.split('","')

    # Strip the opening quote first: for a one-character line '"', that same
    # quote must not also be treated as the closing one.
    opening = fields[0].startswith('"')
    if opening:
        fields[0] = fields[0][1:]
    closing = fields[-1].endswith('"')
    if closing:
        fields[-1] = fields[-1][:-1]

    cleaned = '","'.join(field.replace('"', '') for field in fields)
    return ('"' if opening else '') + cleaned + ('"' if closing else '') + newline
def get_services():
    """Return the extra services to hand to ``bonobo.run``.

    No custom services are registered; a new, empty mapping is returned on
    every call.
    """
    services = dict()
    return services
if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description='Sanitize SOURCE_FILE into DESTINATION_FILE with bonobo.')
    parser.add_argument(
        '--env', '-e', action='append', default=[], metavar='KEY=VALUE',
        help='set an environment variable before the job runs '
             '(may be given several times)')
    options = parser.parse_args()
    for assignment in options.env:
        key, separator, value = assignment.partition('=')
        if not separator or not key:
            parser.error(
                '--env expects KEY=VALUE, got {!r}'.format(assignment))
        os.environ[key] = value

    # SOURCE_FILE, DESTINATION_FILE and SHOULD_FORCE were read from the
    # environment when this module was first executed, before the overrides
    # above existed. Re-read them so that ``--env`` actually affects the run.
    SOURCE_FILE = os.getenv('SOURCE_FILE', SOURCE_FILE)
    DESTINATION_FILE = os.getenv('DESTINATION_FILE', DESTINATION_FILE)
    if 'SHOULD_FORCE' in os.environ:
        # Environment values are strings; only explicit truthy spellings
        # enable forcing, so "0" or "false" do not.
        SHOULD_FORCE = os.environ['SHOULD_FORCE'].strip().lower() in {
            '1', 'true', 'yes', 'on'}

    bonobo.run(get_graph(), services=get_services())
else:
    # When loaded under another name, expose the graph as a module-level
    # ``graph`` (presumably for tooling that looks for that global).
    graph = get_graph()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment