Skip to content

Instantly share code, notes, and snippets.

@ifuller1
Last active November 12, 2018 19:04
Show Gist options
  • Save ifuller1/a295c271a3b39827cb4c5b3fd4b3635d to your computer and use it in GitHub Desktop.
Save ifuller1/a295c271a3b39827cb4c5b3fd4b3635d to your computer and use it in GitHub Desktop.
import logging
import datetime
import json
import glob
from airflow import DAG
from airflow import models
from airflow.hooks.http_hook import HttpHook
from airflow.operators.slack_operator import SlackAPIPostOperator
from airflow.operators.python_operator import PythonOperator
# Ticker symbols; the DAG creates one pricing task for each entry.
SYMBOLS = ['AAPL', 'GOOGL', 'SNAP']

# Midnight (naive, local time) at the start of the previous calendar day.
YESTERDAY = datetime.datetime.combine(
    datetime.date.today() - datetime.timedelta(days=1),
    datetime.time.min,
)
def say_hello_method(**_context):
    """Log a greeting message at INFO level.

    Args:
        **_context: Arbitrary keyword arguments; accepted so the function can
            be called with a task context, but not used.
    """
    logging.info('Hello Cloud Composer!')
def create_pricing_file(**context):
    """Fetch the latest quote for one symbol and store it as a JSON file.

    The symbol is taken from ``context['params']['symbol']``. The quote is
    requested with a GET on ``/1.0/stock/<symbol>/quote`` through the
    ``iex_http_connection`` HTTP connection, and the result is written to
    ``/home/airflow/gcs/data/<symbol>.json`` as
    ``{"symbol": <symbol>, "price": <latestPrice>}``.

    Args:
        **context: Task context; must contain ``params`` with a ``symbol``
            entry.

    Raises:
        KeyError: If ``params``/``symbol`` is missing from the context, or the
            response JSON has no ``latestPrice`` field.
    """
    symbol = context['params']['symbol']

    iex_hook = HttpHook(
        method='GET',
        http_conn_id='iex_http_connection',
    )
    iex_response = iex_hook.run(
        f'/1.0/stock/{symbol}/quote',
        extra_options={"verify": True},
    )
    response_json = json.loads(iex_response.text)

    # Keep only the fields the message builder needs.
    pricing_data = {
        "symbol": symbol,
        "price": response_json['latestPrice'],
    }

    pricing_file = f'/home/airflow/gcs/data/{symbol}.json'
    # Explicit encoding so the file content does not depend on the worker's
    # locale settings.
    with open(pricing_file, 'w', encoding='utf-8') as text_file:
        json.dump(pricing_data, text_file)
def parse_message_data(filename):
    """Build a one-line price summary from a pricing JSON file.

    Args:
        filename: Path to a JSON file containing an object with ``symbol``
            and ``price`` keys.

    Returns:
        A string of the form ``Symbol:<symbol> is at $<price>``.

    Raises:
        OSError: If the file cannot be opened.
        ValueError: If the file does not contain valid JSON.
        KeyError: If ``symbol`` or ``price`` is missing from the JSON object.
    """
    with open(filename, 'r', encoding='utf-8') as text_file:
        symbol_json = json.load(text_file)
    return f'Symbol:{symbol_json["symbol"]} is at ${symbol_json["price"]}'
def build_message(**context):
    """Combine all pricing files into one message and push it to XCom.

    Every ``*.json`` file in ``/home/airflow/gcs/data/`` is turned into one
    summary line by ``parse_message_data``; the lines are joined with
    newlines and pushed under the XCom key ``html_body``.

    Args:
        **context: Task context; must contain ``task_instance``, whose
            ``xcom_push`` method is used to publish the message.
    """
    # glob returns paths in arbitrary order; sort them so the message lines
    # appear in a stable order from run to run.
    # NOTE(review): any JSON file in this directory is picked up, not only
    # the ones written during the current run -- confirm that is intended.
    json_files = sorted(glob.glob("/home/airflow/gcs/data/*.json"))
    html_body = '\n'.join(
        parse_message_data(filename) for filename in json_files
    )
    context['task_instance'].xcom_push(key='html_body', value=html_body)
# Arguments passed to the DAG as ``default_args``.
DEFAULT_ARGS = {
    # With a start date of yesterday, the DAG starts immediately once it is
    # detected in the Cloud Storage bucket.
    'start_date': YESTERDAY,
    # No e-mail notifications on failure or retry.
    'email_on_failure': False,
    'email_on_retry': False,
    # Retry up to two times, one minute apart.
    'retries': 2,
    'retry_delay': datetime.timedelta(minutes=1),
    # Read from the ``gcp_project`` Airflow variable when this module loads.
    'project_id': models.Variable.get('gcp_project'),
}
# Pipeline: say_hello -> one pricing task per symbol -> build_message -> Slack.
with DAG(
    'example',
    default_args=DEFAULT_ARGS,
    schedule_interval=datetime.timedelta(minutes=90),
    catchup=False,
) as dag:
    say_hello_task = PythonOperator(
        task_id='say_hello',
        python_callable=say_hello_method,
        provide_context=True,
    )

    build_message_task = PythonOperator(
        task_id='build_message_task',
        python_callable=build_message,
        provide_context=True,
    )

    send_slack_message_task = SlackAPIPostOperator(
        task_id='send_slack_message_task',
        channel="#ian-test",
        token=models.Variable.get('slack_token'),
        # The template pulls with a list of task ids and takes element [0],
        # i.e. the value pushed by build_message_task under "html_body".
        text='{{ task_instance.xcom_pull(task_ids=["build_message_task"], key="html_body")[0] }}',
    )

    # One fetch task per symbol; the symbol is handed to the callable through
    # ``params``.
    create_pricing_file_tasks = [
        PythonOperator(
            task_id=f'create_pricing_file_{symbol}',
            params={'symbol': symbol},
            python_callable=create_pricing_file,
            provide_context=True,
        )
        for symbol in SYMBOLS
    ]

    # Fan out to the per-symbol tasks, then fan back in before messaging.
    (
        say_hello_task
        >> create_pricing_file_tasks
        >> build_message_task
        >> send_slack_message_task
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment