Created
May 2, 2019 18:24
-
-
Save p5k6/6941109ca0e04c21017c57b76b2c1389 to your computer and use it in GitHub Desktop.
Postgres version of the sample DAG for the Amundsen databuilder.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import textwrap | |
from datetime import datetime, timedelta | |
import uuid | |
from elasticsearch import Elasticsearch | |
from airflow import DAG # noqa | |
from airflow import macros # noqa | |
from airflow.operators.python_operator import PythonOperator # noqa | |
from pyhocon import ConfigFactory | |
from databuilder.extractor.neo4j_search_data_extractor import Neo4jSearchDataExtractor | |
from databuilder.extractor.postgres_table_metadata_extractor import PostgresTableMetadataExtractor | |
from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor | |
from databuilder.publisher.elasticsearch_publisher import ElasticsearchPublisher | |
from databuilder.extractor.neo4j_extractor import Neo4jExtractor | |
from databuilder.job.job import DefaultJob | |
from databuilder.models.table_metadata import DESCRIPTION_NODE_LABEL | |
from databuilder.loader.file_system_elasticsearch_json_loader import FSElasticsearchJSONLoader | |
from databuilder.loader.file_system_neo4j_csv_loader import FsNeo4jCSVLoader | |
from databuilder.publisher import neo4j_csv_publisher | |
from databuilder.publisher.neo4j_csv_publisher import Neo4jCsvPublisher | |
from databuilder.task.task import DefaultTask | |
from databuilder.transformer.base_transformer import NoopTransformer | |
from databuilder.transformer.elasticsearch_document_transformer import ElasticsearchDocumentTransformer | |
from databuilder import Scoped | |
# Airflow DAG-level arguments (passed as **dag_args to the DAG constructor).
dag_args = {
    'concurrency': 10,
    # One dagrun at a time
    'max_active_runs': 1,
    # 4AM, 4PM PST
    # NOTE(review): '0 11 * * *' fires once daily at 11:00 UTC (4AM PDT);
    # the "4AM, 4PM" comment suggests a second run was intended — confirm.
    'schedule_interval': '0 11 * * *',
    'catchup': False
}
# Per-task default arguments applied to every operator in the DAG.
default_args = {
    'owner': 'amundsen',
    'start_date': datetime(2018, 6, 18),
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 3,
    'priority_weight': 10,
    'retry_delay': timedelta(minutes=5),
    'execution_timeout': timedelta(minutes=120)
}
# NEO4J cluster endpoints
NEO4J_ENDPOINT = 'bolt://neo4j:7687'
neo4j_endpoint = NEO4J_ENDPOINT
neo4j_user = 'neo4j'
neo4j_password = 'test'
# Elasticsearch client shared by the publisher job below.
# NOTE(review): host-only entry uses the client's default port (9200) — confirm.
es = Elasticsearch([
    {'host': 'elasticsearch'},
])
# Todo: user needs to modify and provide a hivemetastore connection string | |
def connection_string():
    """Return the SQLAlchemy connection string for the source database.

    Reads ``AMUNDSEN_CONNECTION_STRING`` from the environment when set,
    falling back to the original hard-coded demo value so existing
    behavior is unchanged.

    :return: a SQLAlchemy-style connection URL string.

    NOTE(review): credentials should not live in source control; prefer
    setting the environment variable (or an Airflow Connection).
    """
    import os  # local import: keeps the file's top-level imports untouched
    return os.environ.get(
        'AMUNDSEN_CONNECTION_STRING',
        'postgresql://jstanfield:[email protected]:5433/moviesdemo')
def create_table_extract_job(**kwargs):
    """Extract Postgres table metadata and publish it to Neo4j.

    Pipeline: PostgresTableMetadataExtractor -> FsNeo4jCSVLoader
    (intermediate CSVs under /var/tmp/amundsen) -> Neo4jCsvPublisher.

    :param kwargs: Airflow context; ``ds`` is used as the publish tag
        when present (requires the operator to pass context).
    """
    # Restrict extraction to the single demo table.
    where_clause_suffix = textwrap.dedent("""
        where table_name='movies'
        and table_catalog='moviesdemo' and table_schema='moviesdemo'
    """)

    tmp_folder = '/var/tmp/amundsen/table_metadata'
    node_files_folder = '{tmp_folder}/nodes/'.format(tmp_folder=tmp_folder)
    relationship_files_folder = '{tmp_folder}/relationships/'.format(tmp_folder=tmp_folder)

    # The publisher needs a tag unique to this run (the original code used
    # the constant 'unique_tag' and flagged it with a TODO). Prefer the
    # Airflow execution date; fall back to a random uuid when the context
    # is not provided.
    job_publish_tag = kwargs.get('ds') or str(uuid.uuid4())

    job_config = ConfigFactory.from_dict({
        'extractor.postgres_table_metadata.{}'.format(PostgresTableMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY):
            where_clause_suffix,
        'extractor.postgres_table_metadata.extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING):
            connection_string(),
        'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.NODE_DIR_PATH):
            node_files_folder,
        'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.RELATION_DIR_PATH):
            relationship_files_folder,
        'publisher.neo4j.{}'.format(neo4j_csv_publisher.NODE_FILES_DIR):
            node_files_folder,
        'publisher.neo4j.{}'.format(neo4j_csv_publisher.RELATION_FILES_DIR):
            relationship_files_folder,
        'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_END_POINT_KEY):
            neo4j_endpoint,
        'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_USER):
            neo4j_user,
        'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_PASSWORD):
            neo4j_password,
        'publisher.neo4j.{}'.format(neo4j_csv_publisher.JOB_PUBLISH_TAG):
            job_publish_tag,
    })
    job = DefaultJob(conf=job_config,
                     task=DefaultTask(extractor=PostgresTableMetadataExtractor(), loader=FsNeo4jCSVLoader()),
                     publisher=Neo4jCsvPublisher())
    job.launch()
def create_es_publisher_sample_job():
    """Extract table search documents from Neo4j and publish to Elasticsearch.

    Pipeline: Neo4jSearchDataExtractor -> ElasticsearchDocumentTransformer
    -> FSElasticsearchJSONLoader (staging JSON file) -> ElasticsearchPublisher,
    which swaps the 'table_search_index' alias onto the freshly built index.
    """
    # Staging file: the loader writes here, the publisher reads it back.
    staging_path = '/var/tmp/amundsen/search_data.json'

    es_task = DefaultTask(loader=FSElasticsearchJSONLoader(),
                          extractor=Neo4jSearchDataExtractor(),
                          transformer=ElasticsearchDocumentTransformer())

    # Elasticsearch client instance to publish with.
    es_client = es
    # Unique name of the new Elasticsearch index.
    new_index = 'tables' + str(uuid.uuid4())
    # Mapping type (see /databuilder/publisher/elasticsearch_publisher.py#L38).
    new_index_doc_type = 'table'
    # Alias the search service reads (amundsensearchlibrary search_service/config.py).
    index_alias = 'table_search_index'

    es_job_config = ConfigFactory.from_dict({
        f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.GRAPH_URL_CONFIG_KEY}': neo4j_endpoint,
        f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.MODEL_CLASS_CONFIG_KEY}':
            'databuilder.models.neo4j_data.Neo4jDataResult',
        f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.NEO4J_AUTH_USER}': neo4j_user,
        f'extractor.search_data.extractor.neo4j.{Neo4jExtractor.NEO4J_AUTH_PW}': neo4j_password,
        f'loader.filesystem.elasticsearch.{FSElasticsearchJSONLoader.FILE_PATH_CONFIG_KEY}':
            staging_path,
        f'loader.filesystem.elasticsearch.{FSElasticsearchJSONLoader.FILE_MODE_CONFIG_KEY}': 'w',
        f'transformer.elasticsearch.{ElasticsearchDocumentTransformer.ELASTICSEARCH_INDEX_CONFIG_KEY}':
            new_index,
        f'transformer.elasticsearch.{ElasticsearchDocumentTransformer.ELASTICSEARCH_DOC_CONFIG_KEY}':
            new_index_doc_type,
        f'publisher.elasticsearch.{ElasticsearchPublisher.FILE_PATH_CONFIG_KEY}':
            staging_path,
        f'publisher.elasticsearch.{ElasticsearchPublisher.FILE_MODE_CONFIG_KEY}': 'r',
        f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_CLIENT_CONFIG_KEY}':
            es_client,
        f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_NEW_INDEX_CONFIG_KEY}':
            new_index,
        f'publisher.elasticsearch.{ElasticsearchPublisher.ELASTICSEARCH_ALIAS_CONFIG_KEY}':
            index_alias
    })

    DefaultJob(conf=es_job_config,
               task=es_task,
               publisher=ElasticsearchPublisher()).launch()
with DAG('amundsen_databuilder', default_args=default_args, **dag_args) as dag:
    # NOTE(review): the original bound this operator to the name
    # `create_table_extract_job`, shadowing the callable of the same name.
    # Renamed the variable; task_ids are unchanged.
    create_table_extract_task = PythonOperator(
        task_id='create_table_extract_job',
        python_callable=create_table_extract_job
    )

    create_es_index_task = PythonOperator(
        task_id='create_es_publisher_sample_job',
        python_callable=create_es_publisher_sample_job
    )

    # The search-data extractor reads from Neo4j, so table metadata must be
    # published to Neo4j before the Elasticsearch index is built.
    create_table_extract_task >> create_es_index_task
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment