Created
October 3, 2023 23:39
-
-
Save ecmonsen/76759c5ab42a1973ef2dac7668bfe883 to your computer and use it in GitHub Desktop.
Pseudo-python in response to a recent interview question.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Build the start of an e2e pipeline designed to be robust, extensible and scalable.
Approach and structure are open ended.
Use any packages you like but ensure code is as close to executable as possible.

Input
    API endpoint = "testurl.com/endpoint"
    - JSON response that contains some IDs, a description of each ID, and the most
      recent modification date for that ID's description.
      Has three columns:
      Ex: [{"id": "hello world", "description": "lorem", "timestamp": "ipsum"}, {}, ...]
        id
        description
        timestamp

Output
    ./output/<filename>.csv
    - A flat file that contains all of the records of the API call and has these four columns:
        id
        description
        timestamp
        ingestion_time - time at which you call the API
""" | |
import argparse
import csv
import json
import logging
import os
import time
from collections.abc import Iterable
from datetime import datetime, timezone

import requests
logger = logging.getLogger("pipeline")

# Columns copied from each API record when the caller does not supply its own.
_DEFAULT_FIELDS = ("id", "description", "timestamp")
# Seconds to wait on a single HTTP request before giving up.
_REQUEST_TIMEOUT_SECONDS = 30
# Total number of tries (first call included) for responses with a 5xx status.
_MAX_ATTEMPTS = 3


def _fetch_records(endpoint):
    """GET *endpoint* and return its decoded JSON body.

    A 5xx status is retried with exponential backoff, up to ``_MAX_ATTEMPTS``
    tries in total. Any other HTTP error status is logged and re-raised at
    once.

    Args:
        endpoint: URL of the API to call.

    Returns:
        The decoded JSON payload, or ``None`` when the body is not valid JSON
        (the failure is logged).

    Raises:
        requests.exceptions.HTTPError: on a 4xx status, or on a 5xx status
            that persists through every attempt.
    """
    attempt = 1
    while True:
        response = requests.get(endpoint, timeout=_REQUEST_TIMEOUT_SECONDS)
        try:
            response.raise_for_status()
        except requests.exceptions.HTTPError as err:
            out_of_retries = attempt >= _MAX_ATTEMPTS
            if response.status_code < 500 or out_of_retries:
                # Client errors will not fix themselves; do not retry them.
                logger.error("request to %s failed: %s", endpoint, err)
                raise
            delay = 2 ** (attempt - 1)
            logger.warning(
                "server error %s from %s (attempt %d of %d); retrying in %ds",
                response.status_code, endpoint, attempt, _MAX_ATTEMPTS, delay,
            )
            time.sleep(delay)
            attempt += 1
            continue
        try:
            return response.json()
        except ValueError:
            # Only the decode step is guarded: requests' invalid-URL errors
            # are ValueError subclasses too and must not be reported as JSON
            # problems.
            logger.exception("error reading response json")
            return None


def pipeline(endpoint, output_dir, fields: Iterable = _DEFAULT_FIELDS, *, fetch=None):
    """Fetch records from *endpoint* and write them to a timestamped CSV file.

    The file is created as ``<output_dir>/table_<UTC timestamp>.csv``. Its
    header is *fields* followed by ``ingestion_time``; each record contributes
    one row, with an empty string for every field the record lacks.

    Args:
        endpoint: URL of the API to call.
        output_dir: directory that receives the CSV file; created if missing.
        fields: names of the record keys to export, in column order.
        fetch: optional callable ``fetch(endpoint)`` returning the decoded
            records (an iterable of dicts), or ``None`` when the payload could
            not be decoded. Defaults to an HTTP GET with retries on 5xx.

    Returns:
        The path of the CSV file written, or ``None`` when the response body
        could not be decoded (nothing is written in that case).

    Raises:
        requests.exceptions.HTTPError: when the default fetcher receives a 4xx
            status, or a 5xx status on every attempt.
    """
    # Materialize once: a one-shot iterator would otherwise be exhausted by
    # the header row and leave every data row empty.
    columns = list(fields)

    # Taken before the request because ingestion_time is defined as the time
    # at which the API is called.
    called_at = datetime.now(timezone.utc)

    fetcher = _fetch_records if fetch is None else fetch
    records = fetcher(endpoint)
    if records is None:
        return None

    os.makedirs(output_dir, exist_ok=True)
    # A compact timestamp keeps the file name free of spaces and colons.
    output_path = os.path.join(
        output_dir, f"table_{called_at:%Y%m%dT%H%M%S%f}Z.csv"
    )
    ingestion_time = called_at.isoformat()

    # newline="" is required by the csv module so it controls line endings.
    with open(output_path, "w", newline="", encoding="utf-8") as csv_file:
        writer = csv.writer(csv_file)
        writer.writerow([*columns, "ingestion_time"])
        for record in records:
            if not isinstance(record, dict):
                logger.warning("skipping non-object record: %r", record)
                continue
            writer.writerow(
                [record.get(column, "") for column in columns] + [ingestion_time]
            )
    return output_path
# Placeholder endpoint taken from the task statement.
# NOTE(review): it carries no URL scheme; requests is expected to reject it
# as-is, so pass a full http(s):// URL via --endpoint for a real run.
DEFAULT_ENDPOINT = "testurl.com/endpoint"


def main(argv=None):
    """Parse command-line arguments and run the pipeline once.

    Args:
        argv: argument list to parse; ``None`` makes argparse read
            ``sys.argv``.
    """
    parser = argparse.ArgumentParser(
        description="Fetch records from an API endpoint and write them to a CSV file."
    )
    parser.add_argument(
        "--endpoint",
        default=DEFAULT_ENDPOINT,
        help="URL of the API to call (default: %(default)s)",
    )
    parser.add_argument(
        "--fields",
        nargs="+",
        default=["id", "description", "timestamp"],
        help="record keys to export, in column order (default: %(default)s)",
    )
    # The positional keeps its original name, but it is used as a directory:
    # the pipeline chooses the file name inside it.
    parser.add_argument(
        "output_path", help="directory in which the CSV file is created"
    )
    args = parser.parse_args(argv)

    # Without a configured handler only warnings and errors would be shown.
    logging.basicConfig(level=logging.INFO)
    pipeline(args.endpoint, args.output_path, args.fields)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment