Created
December 11, 2019 08:12
-
-
Save edthix/8bcb0eb8415d01e4302640cddf57f2b6 to your computer and use it in GitHub Desktop.
Sample Airflow DAG for an SSH tunnel + Postgres (assumes both the SERVER_ssh_connector and SERVER_ssh_postresql_tunnel_connector connections are already configured in Airflow)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Sample Airflow DAG: run a command over SSH, then query Postgres through an
# SSH tunnel.
#
# Expects these Airflow connections:
#   - SERVER_ssh_connector
#   - SERVER_ssh_postresql_tunnel_connector
# Expects these Airflow Variables:
#   - SERVER_REMOTE_BIND_IP
#   - SERVER_REMOTE_BIND_PORT
#   - SERVER_LOCAL_BIND_PORT
from datetime import timedelta, datetime
import airflow
from airflow import DAG
from airflow.models import Variable
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.operators.postgres_operator import PostgresOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 5,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    dag_id='testing_postgres_tunnel_ssh',
    default_args=default_args,
    start_date=datetime(2019, 12, 1),
    end_date=datetime(2019, 12, 30),
    schedule_interval=timedelta(minutes=10),  # alternatively '@daily'
)

# Tunnel endpoints. Variable.get returns strings, so the ports are converted
# with int() where they are used below.
REMOTE_BIND_IP = Variable.get('SERVER_REMOTE_BIND_IP')
REMOTE_BIND_PORT = Variable.get('SERVER_REMOTE_BIND_PORT')
LOCAL_BIND_PORT = Variable.get('SERVER_LOCAL_BIND_PORT')

# Keep the hook itself. The tunnel's start() returns None, so chaining
# .start() onto SSHHook(...) would leave ssh_hook as None, and SSHOperator
# would have no hook to connect with.
ssh_hook = SSHHook(ssh_conn_id='SERVER_ssh_connector', keepalive_interval=60)

# Forward localhost:LOCAL_BIND_PORT to REMOTE_BIND_IP:REMOTE_BIND_PORT, as
# seen from the SSH server. The Postgres connection used below must therefore
# point at localhost:LOCAL_BIND_PORT.
# NOTE(review): this runs every time the file is parsed, and the tunnel only
# exists in the process that parsed it. The Postgres task may execute in a
# different worker process, so confirm this works in your deployment.
tunnel = ssh_hook.get_tunnel(
    int(REMOTE_BIND_PORT),
    remote_host=REMOTE_BIND_IP,
    local_port=int(LOCAL_BIND_PORT),
)
tunnel.start()

# Runs a trivial command over the same SSH connection.
ssh_operator = SSHOperator(
    ssh_hook=ssh_hook,
    task_id='open_tunnel_to_SERVER',
    command='ls -al',
    dag=dag,
)

# Queries Postgres via the tunnelled connection (expected to target localhost).
postgres_operator = PostgresOperator(
    postgres_conn_id='SERVER_ssh_postresql_tunnel_connector',
    sql="select * from users limit 100",
    task_id='get_users_from_SERVER_postgres_table',
    dag=dag,
)

ssh_operator >> postgres_operator
There are a couple of things to tweak here to make this work!
The tunnel's start() returns None, so chaining it onto SSHHook(...) leaves ssh_hook as None. The hook part should look something like this:
ssh_hook = SSHHook(keepalive_interval=60, ssh_conn_id='SERVER_ssh_connector')
tunnel = ssh_hook.get_tunnel(
    int(REMOTE_BIND_PORT),
    remote_host=REMOTE_BIND_IP,
    local_port=int(LOCAL_BIND_PORT),
)
# start() returns None, so hold on to the hook rather than start()'s result
tunnel.start()
# and then pass the hook itself to the operator
ssh_operator = SSHOperator(
    task_id='open_tunnel_to_SERVER',
    ssh_hook=ssh_hook,
    command='ls -al',
    dag=dag,
)
!! The IMPORTANT thing to remember is how the remote host and port are mapped to your local machine.
When opening the tunnel, set remote_host to your Postgres host and the remote port to the port Postgres listens on.
Then the connection used by your PostgresOperator should point to localhost and the local port of the tunnel.
So what this setup does is create a tunnel that lets you reach postgres_host:port through localhost:local_port.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hey guys, I made a Kubernetes SSH tunnel operator and dashboard. Check it out, it may help:
https://github.com/ngn-au/whiplash