Skip to content

Instantly share code, notes, and snippets.

@tameem92
Last active April 29, 2021 12:18
Show Gist options
  • Save tameem92/ac72dee0298f6190daddbccbca5543fc to your computer and use it in GitHub Desktop.
Save tameem92/ac72dee0298f6190daddbccbca5543fc to your computer and use it in GitHub Desktop.
from datetime import datetime, timedelta
import os
from airflow.models import DAG, Variable
from airflow.contrib.kubernetes.secret import Secret
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
# Custom slack notifications
from utils.slack import task_fail_slack_alert
from utils.slack import pipeline_success_slack_alert
# Version tag interpolated into the DAG id by ProcessMessages.build().
VERSION = 'v1.0.1'
# Naive UTC timestamp one day before the moment this module is imported.
# Not referenced anywhere in the visible part of this file.
YESTERDAY = datetime.utcnow() - timedelta(days=1)
class ProcessMessages:
    """Builder for the versioned ``process_messages`` DAG.

    The DAG consists of two Kubernetes pod tasks that run in sequence:
    ``fetch_messages`` followed by ``process_messages``.

    Args:
        environment_config: Mapping of environment-level settings. Only the
            optional ``'owner'`` key is read (defaults to ``'test'``).
        config: Mapping of pipeline settings. The keys read are
            ``'start_date'`` (a ``YYYY-MM-DD`` string), ``'schedule'``
            (passed to the DAG as ``schedule_interval``) and ``'image'``
            (container image used by both pod tasks).

    Raises:
        KeyError: If ``config`` has no ``'start_date'`` entry.
        ValueError: If ``config['start_date']`` is not in ``%Y-%m-%d`` form.
    """

    def __init__(self, environment_config, config):
        self.config = config
        self.environment_config = environment_config
        self.default_args = {
            'owner': self.environment_config.get('owner', 'test'),
            'start_date': datetime.strptime(self.config['start_date'], '%Y-%m-%d'),
            'retries': 0,
            'on_failure_callback': task_fail_slack_alert,
        }

    def _pod_task(self, dag, task_id, shell_command, secrets):
        """Create one pod task that runs ``shell_command`` through bash.

        Args:
            dag: The DAG the task is attached to.
            task_id: Used both as the Airflow task id and as the pod name.
            shell_command: Command string handed to ``/bin/bash -c``.
            secrets: List of ``Secret`` objects to expose to the pod.

        Returns:
            The configured ``KubernetesPodOperator``.
        """
        # NOTE(review): Kubernetes object names do not allow underscores;
        # whether the operator sanitizes ``name`` depends on the Airflow
        # version in use -- confirm before relying on these pod names.
        return KubernetesPodOperator(
            task_id=task_id,
            name=task_id,
            namespace='default',
            is_delete_operator_pod=True,
            image_pull_policy='Always',
            image_pull_secrets='gitlab-registry',
            image=self.config['image'],
            # '-c' (not 'c') is required for bash to execute the next
            # argument as a command string instead of opening a script file.
            cmds=['/bin/bash', '-c', shell_command],
            secrets=secrets,
            dag=dag,
        )

    def build(self):
        """Assemble and return the DAG with its two chained pod tasks.

        Returns:
            The ``DAG`` whose id is ``process_messages_<VERSION>``.

        Raises:
            KeyError: If ``config`` lacks ``'schedule'`` or ``'image'``.
        """
        dag = DAG(
            dag_id=f'process_messages_{VERSION}',
            default_args=self.default_args,
            schedule_interval=self.config['schedule'],
            catchup=False,
            template_searchpath=['/home/airflow/gcs/dags'],
        )

        secret_volume = Secret(
            'volume',                # Deploy type: mount the secret as a volume
            '/var/secrets/google',   # Path where the secret volume is mounted
            'service-account',       # Name of the Kubernetes Secret
            'service-account.json',  # Key within the Secret (service account file)
        )

        # Both steps are plain pod tasks that differ only in id and command.
        fetch_messages = self._pod_task(
            dag, 'fetch_messages', 'echo "fetch"', [secret_volume],
        )
        process_messages = self._pod_task(
            dag, 'process_messages', 'echo "hey"', [secret_volume],
        )

        fetch_messages >> process_messages
        return dag
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment