Last active
October 29, 2017 22:08
-
-
Save hartym/e676f1c74a4313db6b108ff31f4e66b7 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
import datetime | |
import os | |
import time | |
import bonobo | |
from atomicwrites import atomic_write | |
from bonobo.constants import NOT_MODIFIED | |
from fs.osfs import OSFS | |
# Keep a handle on the stock implementation so non-write modes can defer to it.
_open = OSFS.open


def _atomic_open(self, path, mode="r", **kwargs):
    """Replacement for ``OSFS.open`` that routes write modes through atomicwrites.

    For any mode starting with ``'w'`` this returns the object produced by
    ``atomic_write(path, overwrite=...)``, where ``overwrite`` is true only
    when the mode ends with ``'+'``. The extra keyword arguments are not
    forwarded on that path. Every other mode is delegated unchanged to the
    original ``OSFS.open``.

    :param path: path of the file to open
    :param mode: file mode string, ``'r'`` by default
    :param kwargs: extra options forwarded to the original ``OSFS.open``
    """
    if mode.startswith('w'):
        # NOTE(review): ``path`` is handed to atomic_write as given, i.e. it is
        # not resolved against this filesystem's root -- confirm that is what
        # callers expect when the OSFS root is not the working directory.
        return atomic_write(path, overwrite=mode.endswith('+'))
    # ``_open`` was taken off the class, so it is a plain function: the
    # filesystem instance has to be passed explicitly as ``self``.
    return _open(self, path, mode=mode, **kwargs)


# Install the patched opener on the class.
OSFS.open = _atomic_open
def is_update_needed():
    """Return True when the ``UPDATE`` environment variable is set to a non-empty value."""
    flag = os.environ.get('UPDATE', '')
    return flag != ''
# Single sample row fed into the graph; the timestamp is captured once, when
# this module is loaded.
input_data = [
    dict(id=1, timestamp=str(datetime.datetime.now())),
]
def take_your_time(*args, **kwargs):
    """Block for five seconds, then return ``NOT_MODIFIED``.

    All positional and keyword arguments are accepted and ignored.
    """
    delay_seconds = 5
    time.sleep(delay_seconds)
    return NOT_MODIFIED
def get_graph():
    """Build and return the bonobo graph for this job.

    The graph is returned empty unless ``is_update_needed()`` is true. In
    that case one chain is added: ``input_data`` -> ``take_your_time`` ->
    a ``CsvWriter`` targeting ``output.csv``.
    """
    pipeline = bonobo.Graph()

    # Nothing to do: hand back the empty graph.
    if not is_update_needed():
        return pipeline

    pipeline.add_chain(
        input_data,
        take_your_time,
        bonobo.CsvWriter('output.csv', ioformat='arg0'),
    )
    return pipeline
def get_services():
    """Return the services mapping passed to ``bonobo.run``; always a new empty dict."""
    services = dict()
    return services
if __name__ == '__main__':
    # Script entry point: apply any ``--env NAME=VALUE`` overrides to the
    # process environment, then build and run the graph.
    parser = argparse.ArgumentParser()
    parser.add_argument('--env', '-e', action='append', default=[])
    options = parser.parse_args()

    for env in options.env:
        # Split on the first '=' only, so values may themselves contain '='.
        k, sep, v = env.partition('=')
        if not sep or not k:
            # Report a malformed override as a usage error instead of letting
            # it surface as an unpacking/OS-level traceback.
            parser.error(
                'invalid --env value {!r}: expected NAME=VALUE'.format(env)
            )
        os.environ[k] = v

    # The environment must be updated before get_graph() runs, because the
    # graph contents depend on is_update_needed().
    bonobo.run(get_graph(), services=get_services())
else:
    # When imported rather than executed, expose the graph as a module-level
    # name (presumably for an external runner to pick up -- confirm).
    graph = get_graph()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment