Import Confluent topic names to DataHub (without schema)
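Requires the acryl-datahub, confluent-kafka and stackprinter packages (names inferred from the imports below). Connection details come from four environment variables: DATAHUB_GMS_HOST (defaults to http://localhost:8080), UAT_CONFLUENT_BOOTSTRAP, UAT_CONFLUENT_API_KEY_ID and UAT_CONFLUENT_API_KEY_SECRET.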
import logging
import os
import re
from dataclasses import dataclass
from typing import Optional

import datahub.emitter.mce_builder as builder
import stackprinter
from confluent_kafka import KafkaException
from confluent_kafka.admin import AdminClient, ConfigResource, TopicMetadata
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (ChangeTypeClass,
                                             DatasetPropertiesClass,
                                             DomainsClass)

stackprinter.set_excepthook(style='lightbg')
logging.basicConfig(level=logging.INFO)
logging.info(__file__)

DATAHUB_GMS_HOST = os.getenv("DATAHUB_GMS_HOST", "http://localhost:8080")
logging.info(f"DATAHUB_GMS_HOST={DATAHUB_GMS_HOST}")

UAT_CONFLUENT_BOOTSTRAP = os.getenv("UAT_CONFLUENT_BOOTSTRAP")
logging.info(f"UAT_CONFLUENT_BOOTSTRAP={UAT_CONFLUENT_BOOTSTRAP}")

# Log only the first four characters of each credential. Slicing raises a
# TypeError if either variable is unset, so this doubles as an early check.
UAT_CONFLUENT_API_KEY_ID = os.getenv("UAT_CONFLUENT_API_KEY_ID")
logging.info(f"UAT_CONFLUENT_API_KEY_ID={UAT_CONFLUENT_API_KEY_ID[:4]}*******")
UAT_CONFLUENT_API_KEY_SECRET = os.getenv("UAT_CONFLUENT_API_KEY_SECRET")
logging.info(
    f"UAT_CONFLUENT_API_KEY_SECRET={UAT_CONFLUENT_API_KEY_SECRET[:4]}*******")

@dataclass
class NoSchemaDataset:
    """A Kafka topic to be registered in DataHub as a dataset without a schema."""
    env: str
    platform: str
    topic_metadata: TopicMetadata
    description: Optional[str] = None
    platform_instance: Optional[str] = None
    domain: Optional[str] = None

class NoSchemaDatasetEmitter:
    def __init__(self, emitter: DatahubRestEmitter, admin_client: AdminClient):
        self.emitter = emitter
        self.admin_client = admin_client

    def emit(self, dataset: NoSchemaDataset):
        logging.info(
            f"Starting metadata ingestion with provided dataset: {dataset.topic_metadata}")
        if dataset.platform_instance:
            dataset_urn = builder.make_dataset_urn_with_platform_instance(
                platform=dataset.platform,
                platform_instance=dataset.platform_instance,
                env=dataset.env,
                name=dataset.topic_metadata.topic)
        else:
            dataset_urn = builder.make_dataset_urn(
                platform=dataset.platform,
                env=dataset.env,
                name=dataset.topic_metadata.topic)

        # Attach the topic's broker-side config, plus the partition count,
        # as custom properties on the dataset
        topic_config = describe_topic(
            self.admin_client, dataset.topic_metadata.topic)
        topic_config['partitions'] = str(
            len(dataset.topic_metadata.partitions))
        props = DatasetPropertiesClass(
            customProperties=topic_config
        )
        dataset_properties_mcp = MetadataChangeProposalWrapper(
            entityType="dataset",
            changeType=ChangeTypeClass.UPSERT,
            entityUrn=dataset_urn,
            aspectName="datasetProperties",
            aspect=props,
        )
        logging.info(
            f"Emitting 'datasetProperties' MCP for dataset {dataset_urn}")
        logging.debug(dataset_properties_mcp)
        self.emitter.emit_mcp(dataset_properties_mcp)

        # Domain aspect
        if dataset.domain:
            domains_aspect = DomainsClass(
                [builder.make_domain_urn(dataset.domain)])
            dataset_domain_mcp = MetadataChangeProposalWrapper(
                entityType="dataset",
                changeType=ChangeTypeClass.UPSERT,
                entityUrn=dataset_urn,
                aspectName="domains",
                aspect=domains_aspect,
            )
            logging.info(f"Emitting 'domains' MCP for dataset {dataset_urn}")
            logging.debug(dataset_domain_mcp)
            self.emitter.emit_mcp(dataset_domain_mcp)
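
# Illustrative example (assuming the builder's default URN layout, in which
# the platform instance is prefixed to the dataset name): a topic 'orders'
# on cluster 'lkc-abc123' in UAT would get the URN
#   urn:li:dataset:(urn:li:dataPlatform:kafka,lkc-abc123.orders,UAT)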

def get_cluster_id(a: AdminClient) -> str:
    md = a.list_topics(timeout=10)
    return md.cluster_id

def list_topics(a: AdminClient) -> 'list[TopicMetadata]':
    md = a.list_topics(timeout=10)
    logging.info("Found {} topics".format(len(md.topics)))
    for t in md.topics.values():
        logging.debug("{}".format(t))
    return list(md.topics.values())

def describe_topic(a: AdminClient, topic: str) -> dict:
    resources = [ConfigResource("topic", topic)]
    # describe_configs is async and returns a dict of futures keyed by resource
    fs = a.describe_configs(resources)
    # We only ever send one resource, so always take the first result
    f = fs[resources[0]]
    try:
        configs = f.result()
        return {v.name: v.value for k, v in configs.items()}
    except KafkaException as e:
        logging.error("Failed to describe {}: {}".format(topic, e))
        raise
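
# The returned dict maps config names to their effective string values,
# e.g. (illustrative values only):
#   {'cleanup.policy': 'delete', 'retention.ms': '604800000', ...}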

def filter_deny(ip: 'list[TopicMetadata]', terms: 'list[str]') -> 'list[TopicMetadata]':
    """
    Returns a filtered version of a list, removing topics whose names match
    any of the supplied regex patterns
    """
    patterns = [re.compile(t) for t in terms]
    op = ip
    for p in patterns:
        op = [tmd for tmd in op if not p.match(tmd.topic)]
        logging.info(
            f"Removing matches for pattern '{p.pattern}'... filter_deny now has {len(op)} items")
    return op
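
# For example, filter_deny(topics, ['_.*']) would drop internal topics such
# as '__consumer_offsets', whose names start with an underscore.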

# Typical Confluent Cloud connection settings: SASL/PLAIN over TLS,
# authenticating with an API key/secret pair
a = AdminClient({
    'bootstrap.servers': UAT_CONFLUENT_BOOTSTRAP,
    'ssl.endpoint.identification.algorithm': 'https',
    'sasl.mechanism': 'PLAIN',
    'request.timeout.ms': '20000',
    'sasl.username': UAT_CONFLUENT_API_KEY_ID,
    'sasl.password': UAT_CONFLUENT_API_KEY_SECRET,
    'security.protocol': 'SASL_SSL'
})

emitter = DatahubRestEmitter(DATAHUB_GMS_HOST)
cluster_id = get_cluster_id(a)
all_topics = list_topics(a)

# Filter out internal and platform housekeeping topics before ingesting
after_deny = filter_deny(
    all_topics, ['_.*',
                 'pksql.*',
                 'cp-connect.*'])

# Pattern to extract a domain when the topic is in 'atech.domain.topicname' format
domain_pattern = re.compile(r"^atech\.(.*?)\..*")

for t in after_deny:
    found_domain = domain_pattern.match(t.topic)
    if found_domain:
        domain = found_domain.group(1)
        logging.info(
            f"Found a domain name from the topic name: '{domain}'")
    else:
        domain = None
    nsd = NoSchemaDataset(
        platform="kafka",
        platform_instance=cluster_id,
        env="UAT",
        topic_metadata=t,
        domain=domain,
    )
    NoSchemaDatasetEmitter(emitter=emitter, admin_client=a).emit(nsd)
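
# Because every aspect is emitted with ChangeTypeClass.UPSERT, re-running the
# script should be idempotent: the datasetProperties and domains aspects are
# overwritten in place rather than duplicated.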