Created
November 11, 2017 18:26
-
-
Save cw-andrews/2b7a2f98475f90b52857f9b6cc75db5d to your computer and use it in GitHub Desktop.
CSV Sanitizer Bonobo ETL Job with Atomic Writes
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
from time import sleep | |
import os | |
import re | |
import bonobo | |
from atomicwrites import atomic_write | |
from fs.osfs import OSFS | |
from texools.files import new_file_check | |
from texools.strings import asciify | |
from unittest.mock import patch | |
# Spellings of an environment value that mean "switch is off".
_FALSY_FLAG_TEXT = ('', '0', 'false', 'no', 'off')


def _as_flag(value, default=False):
    """Convert the text of an environment variable into a boolean.

    ``bool()`` on a string is True for every non-empty value, so a setting
    such as ``SHOULD_FORCE=0`` would still switch the flag on.  Here the
    usual "off" spellings (empty, ``0``, ``false``, ``no``, ``off``; case and
    surrounding whitespace ignored) yield False and anything else yields True.

    :param value: raw text of the variable, or None when it is unset.
    :param default: result to use when *value* is None.
    :return: the interpreted boolean.
    """
    if value is None:
        return default
    return value.strip().lower() not in _FALSY_FLAG_TEXT


# Path handed to the graph's FileReader; override with $SOURCE_FILE.
SOURCE_FILE = os.getenv('SOURCE_FILE',
                        'dealervault/DealerVaultInventory.csv')
# Path handed to the graph's FileWriter; override with $DESTINATION_FILE.
DESTINATION_FILE = os.getenv('DESTINATION_FILE',
                             'dealervault/DealerVaultInventory_processed.csv')
# When true, get_graph() builds the processing chain without consulting
# new_file_check(); override with $SHOULD_FORCE.
SHOULD_FORCE = _as_flag(os.getenv('SHOULD_FORCE'))
# Keep a handle on the stock implementation so that every mode other than
# writing can be delegated to it unchanged.
_open = OSFS.open


def _atomic_open(self, path, mode="r", **kwargs):
    """Replacement for ``OSFS.open`` that makes write-mode opens atomic.

    For a *mode* starting with ``'w'`` the call returns the context manager
    produced by ``atomic_write``; an existing target may be overwritten only
    when the mode ends with ``'+'``.  Note that on this path neither the mode
    string itself nor ``**kwargs`` is forwarded to ``atomic_write``.

    Any other mode is passed straight through to the original ``OSFS.open``.

    :param path: path of the file, relative to this filesystem object.
    :param mode: open mode string; defaults to ``"r"``.
    :param kwargs: extra options for the original ``OSFS.open`` (non-write
        modes only).
    :return: an ``atomic_write`` context manager for write modes, otherwise
        whatever the original ``OSFS.open`` returns.
    """
    if mode.startswith('w'):
        # atomic_write works on real OS paths, so translate the FS-relative
        # path instead of relying on the process working directory.
        return atomic_write(self.getsyspath(path),
                            overwrite=mode.endswith('+'))
    # _open is the plain function taken off the class, so the instance has to
    # be supplied explicitly; without it `path` would be bound as `self`.
    return _open(self, path, mode=mode, **kwargs)


# Install the wrapper for every OSFS instance in this process.
OSFS.open = _atomic_open
def _idle(*_args, **_kwargs):
    """Pause for one second; the only node of the "nothing to do" graph."""
    sleep(1)


def get_graph():
    """Build the Bonobo graph for one run of the CSV sanitizing job.

    When ``SHOULD_FORCE`` is set, or ``new_file_check(SOURCE_FILE,
    DESTINATION_FILE)`` is truthy (presumably meaning the source is newer
    than the processed copy -- confirm against texools), the graph reads
    ``SOURCE_FILE`` line by line, passes each line through ``asciify`` and
    ``extra_quote_killer`` and writes the result to ``DESTINATION_FILE``.

    Otherwise the graph contains a single node that just sleeps one second.

    :return: the populated ``bonobo.Graph``.
    """
    graph = bonobo.Graph()
    if SHOULD_FORCE or new_file_check(SOURCE_FILE, DESTINATION_FILE):
        graph.add_chain(
            bonobo.FileReader(SOURCE_FILE),
            asciify,
            extra_quote_killer,
            bonobo.FileWriter(DESTINATION_FILE),
        )
    else:
        # Register the callable itself.  Writing `sleep(1)` here would sleep
        # while the graph is being built and then add its return value
        # (None) as a node.  add_chain is used, as in the branch above, so
        # the node is attached to the graph the same way as the reader is.
        graph.add_chain(_idle)
    return graph
# One alternation, tried left to right at each position: the structural quote
# sequences are captured in group 1, a bare quote matches without a group.
_QUOTE_TOKENS = re.compile(r'(","|^"|"$)|"')


def extra_quote_killer(line: str) -> str:
    """Strip stray double quotes from one line of a fully quoted CSV file.

    Three kinds of quote are considered structural and are kept:

    * the ``","`` sequence separating two quoted fields,
    * a quote that is the very first character of the line,
    * a quote that is the very last character of the line (or sits directly
      before a trailing newline).

    Every other ``"`` is removed.  The work is done in a single regex pass,
    so text that happens to contain sequences such as ``<>`` or ``<~>`` is
    left untouched (a placeholder-based approach would turn those into
    quotes).

    :param line: one raw line of the CSV file.
    :return: the line with non-structural double quotes deleted.
    """
    # Structural matches are re-emitted verbatim via group 1; for a bare
    # quote group 1 is None, so it is replaced by the empty string.
    return _QUOTE_TOKENS.sub(lambda match: match.group(1) or '', line)
def get_services():
    """Return the service mapping supplied to ``bonobo.run``.

    This job needs no injected services, so a new empty dict is returned on
    every call.
    """
    services = dict()
    return services
if __name__ == '__main__':
    # Script mode: accept repeated `--env KEY=VALUE` / `-e KEY=VALUE` options,
    # export them into the process environment, then run the graph.
    parser = argparse.ArgumentParser()
    parser.add_argument('--env', '-e', action='append', default=[])
    options = parser.parse_args()

    for env in options.env:
        k, separator, v = env.partition('=')
        if not separator or not k:
            # Exit with a usage message instead of an unpacking ValueError.
            parser.error('--env expects KEY=VALUE, got {!r}'.format(env))
        os.environ[k] = v

    # The module-level settings were read from the environment while the
    # file was being loaded, i.e. before the overrides above existed.
    # Re-read them here so that --env actually influences this run.
    SOURCE_FILE = os.getenv('SOURCE_FILE', SOURCE_FILE)
    DESTINATION_FILE = os.getenv('DESTINATION_FILE', DESTINATION_FILE)
    if 'SHOULD_FORCE' in os.environ:
        # Common "off" spellings disable the flag; bool() on the raw string
        # would treat "0" or "false" as True.
        SHOULD_FORCE = os.environ['SHOULD_FORCE'].strip().lower() not in (
            '', '0', 'false', 'no', 'off')

    bonobo.run(get_graph(), services=get_services())
else:
    # Imported rather than executed: expose a ready-built graph at module
    # level (presumably for an external runner that looks it up -- confirm).
    graph = get_graph()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment