Created
May 2, 2019 18:22
-
-
Save p5k6/f2181270a3a16fa46097f76686cd6cfa to your computer and use it in GitHub Desktop.
Amundsen Postgres version of a metadata extractor. Only tested on PostgreSQL 9.6, on a local install. No tests are provided at this point.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
from collections import namedtuple | |
from pyhocon import ConfigFactory, ConfigTree # noqa: F401 | |
from typing import Iterator, Union, Dict, Any # noqa: F401 | |
from databuilder import Scoped | |
from databuilder.extractor.base_extractor import Extractor | |
from databuilder.extractor.sql_alchemy_extractor import SQLAlchemyExtractor | |
from databuilder.models.table_metadata import TableMetadata, ColumnMetadata | |
from itertools import groupby | |
# Grouping key used to collect consecutive column rows that belong to the same table.
TableKey = namedtuple('TableKey', ['schema_name', 'table_name'])

# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)
class PostgresTableMetadataExtractor(Extractor):
    """
    Extracts Postgres table and column metadata from underlying meta store database using SQLAlchemyExtractor.

    The query returns one row per column, ordered so that all columns of a table are
    consecutive; the rows are then grouped into one TableMetadata record per table.
    """
    # SELECT statement from postgres information_schema to extract table and column metadata.
    # {where_clause_suffix} is substituted verbatim from configuration in init().
    SQL_STATEMENT = """
    SELECT
    c.table_catalog as cluster, c.table_schema as schema_name, c.table_name as name, null as description, c.column_name as col_name, c.data_type as col_type, null as col_description, ordinal_position as col_sort_order
    FROM INFORMATION_SCHEMA.COLUMNS c
    {where_clause_suffix}
    ORDER by cluster, schema_name, name, col_sort_order ;
    """

    # CONFIG KEYS
    WHERE_CLAUSE_SUFFIX_KEY = 'where_clause_suffix'
    # NOTE(review): the config key itself is the literal string 'master' (identical to its
    # default value below). The value read from it is stored in self._cluster but is not
    # used anywhere in this class: emitted TableMetadata takes its cluster from the
    # table_catalog column of each row. Left unchanged for compatibility -- confirm intent.
    CLUSTER_KEY = 'master'

    DEFAULT_CONFIG = ConfigFactory.from_dict({WHERE_CLAUSE_SUFFIX_KEY: ' ', CLUSTER_KEY: 'master'})

    def init(self, conf):
        # type: (ConfigTree) -> None
        """
        Builds the SQL statement from configuration and initializes the wrapped SQLAlchemyExtractor.

        :param conf: extractor configuration; missing keys fall back to DEFAULT_CONFIG
        :return:
        """
        conf = conf.with_fallback(PostgresTableMetadataExtractor.DEFAULT_CONFIG)
        self._cluster = '{}'.format(conf.get_string(PostgresTableMetadataExtractor.CLUSTER_KEY))

        self.sql_stmt = PostgresTableMetadataExtractor.SQL_STATEMENT.format(
            where_clause_suffix=conf.get_string(PostgresTableMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY))

        # Lazy %-style arguments: the message is only rendered if INFO is enabled.
        LOGGER.info('SQL for postgres metadata: %s', self.sql_stmt)

        self._alchemy_extractor = SQLAlchemyExtractor()
        # The SQL built above is only a fallback: an EXTRACT_SQL value already present in the
        # scoped SQLAlchemyExtractor configuration takes precedence.
        sql_alch_conf = Scoped.get_scoped_conf(conf, self._alchemy_extractor.get_scope())\
            .with_fallback(ConfigFactory.from_dict({SQLAlchemyExtractor.EXTRACT_SQL: self.sql_stmt}))

        self._alchemy_extractor.init(sql_alch_conf)
        # Created lazily on the first call to extract().
        self._extract_iter = None  # type: Union[None, Iterator]

    def extract(self):
        # type: () -> Union[TableMetadata, None]
        """
        Returns the next table's metadata, or None once every table has been returned.

        :return:
        """
        if self._extract_iter is None:
            self._extract_iter = self._get_extract_iter()
        try:
            return next(self._extract_iter)
        except StopIteration:
            return None

    def get_scope(self):
        # type: () -> str
        """
        Returns the configuration scope of this extractor.

        :return:
        """
        return 'extractor.postgres_table_metadata'

    def _get_extract_iter(self):
        # type: () -> Iterator[TableMetadata]
        """
        Using itertools.groupby and raw level iterator, it groups to table and yields TableMetadata.

        groupby only merges consecutive rows with the same key, so this relies on the
        ORDER BY in SQL_STATEMENT keeping all columns of a table together.

        :return:
        """
        for _, group in groupby(self._get_raw_extract_iter(), self._get_table_key):
            columns = []
            last_row = None

            for row in group:
                # Table-level fields are read from the last row of the group after the loop.
                last_row = row
                columns.append(ColumnMetadata(row['col_name'], row['col_description'],
                                              row['col_type'], row['col_sort_order']))

            yield TableMetadata('postgres', last_row['cluster'],
                                last_row['schema_name'],
                                last_row['name'],
                                last_row['description'],
                                columns)

    def _get_raw_extract_iter(self):
        # type: () -> Iterator[Dict[str, Any]]
        """
        Provides iterator of result row from SQLAlchemy extractor.

        Iteration stops at the first falsy value returned by the underlying extractor.

        :return:
        """
        row = self._alchemy_extractor.extract()
        while row:
            yield row
            row = self._alchemy_extractor.extract()

    def _get_table_key(self, row):
        # type: (Dict[str, Any]) -> Union[TableKey, None]
        """
        Table key consists of schema and table name.

        :param row: a result row exposing 'schema_name' and 'name'
        :return: the TableKey for the row, or None when the row is falsy
        """
        if row:
            return TableKey(schema_name=row['schema_name'], table_name=row['name'])

        return None
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hey @p5k6, I saw you created this gist for a Postgres extractor. I think it would be a good addition to Amundsen databuilder. Do you think you would have time to create an Amundsen databuilder PR, with unit tests, for this extractor?