Skip to content

Instantly share code, notes, and snippets.

@kenzo0107
Last active April 16, 2025 05:11
Show Gist options
  • Save kenzo0107/75c7b60a6bd0a3b206045f6349e1e449 to your computer and use it in GitHub Desktop.
Save kenzo0107/75c7b60a6bd0a3b206045f6349e1e449 to your computer and use it in GitHub Desktop.

https://iceberg.apache.org/docs/nightly/kafka-connect/

Kafka コネクト

Kafka Connectは、コネクタを介してKafkaとの間でデータをやり取りするための人気のフレームワークです。 KafkaからS3にデータを書き込むためのS3シンクや、リレーショナルデータベースからKafkaに変更データキャプチャレコードを書き込むための Debeziumソースコネクタなど、様々なコネクタが利用可能です。

シンプルで分散化された分散アーキテクチャを採用しています。クラスターは複数のワーカープロセスで構成され、 コネクタはこれらのプロセス上でタスクを実行して作業を実行します。 コネクタのデプロイは設定に基づいて行われるため、コネクタを実行するためにコードを記述する必要はありません。

Apache Iceberg シンクコネクタ

Kafka Connect 用の Apache Iceberg Sink Connector は、Kafka から Iceberg テーブルにデータを書き込むためのシンク コネクタです。

特徴

  • 集中型アイスバーグコミットのコミット調整
  • Exactly-once 正確に1回だけの配信セマンティクス
  • マルチテーブルファンアウト
  • 自動テーブル作成とスキーマ進化
  • Icebergの列マッピング機能によるフィールド名マッピング

インストール

コネクタのzipアーカイブはIcebergビルドの一部として作成されます。ビルドは以下から実行できます。

./gradlew -x test -x integrationTest clean build

zipアーカイブは./kafka-connect/kafka-connect-runtime/build/distributionsにあります。 Hiveメタストアクライアントと関連する依存関係をバンドルしたディストリビューションと、バンドルしていないディストリビューションがあります。 ディストリビューションアーカイブをすべてのノードのKafka Connectプラグインディレクトリにコピーしてください。

要件

シンクは、正確に1回だけのセマンティクスを実現するためにKIP-447に依存しています。これにはKafka 2.5以降が必要です。

構成

プロパティ 説明
iceberg.tables 宛先テーブルのコンマ区切りリスト
iceberg.tables.dynamic-enabled routeRegex を使用する代わりに、routeField で指定されたテーブルにルーティングするには true に設定します。デフォルトは false です。
iceberg.tables.route-field マルチテーブルファンアウトの場合、レコードをテーブルにルーティングするために使用されるフィールドの名前
iceberg.tables.default-commit-branch コミットのデフォルトブランチ。指定されていない場合はメインが使用されます。
iceberg.tables.default-id-columns テーブル内の行を識別する列のデフォルトのコンマ区切りリスト(主キー)
iceberg.tables.default-partition-by テーブルを作成するときに使用するパーティション フィールド名のデフォルトのコンマ区切りリスト
iceberg.tables.auto-create-enabled 宛先テーブルを自動的に作成するには true に設定し、デフォルトは false です
iceberg.tables.evolve-schema-enabled 不足しているレコードフィールドをテーブルスキーマに追加するには true に設定します。デフォルトは false です。
iceberg.tables.schema-force-optional テーブルの作成と展開中に列をオプションとして設定するには true に設定し、スキーマを尊重するにはデフォルトは false です。
iceberg.tables.schema-case-insensitive 大文字と小文字を区別しない名前でテーブル列を検索するには true に設定し、大文字と小文字を区別する場合は false に設定します。
iceberg.tables.auto-create-props.* 自動作成時に新しいテーブルに設定されるプロパティ
iceberg.tables.write-props.* Icebergライターの初期化に渡されるプロパティは、優先されます
iceberg.table.<table name>.commit-branch コミット用のテーブル固有のブランチ。指定されていない場合は iceberg.tables.default-commit-branch を使用します。
iceberg.table.<table name>.id-columns 表内の行を識別する列のカンマ区切りリスト(主キー)
iceberg.table.<table name>.partition-by テーブルの作成時に使用するパーティション フィールドのコンマ区切りリスト
iceberg.table.<table name>.route-regex レコードのルートフィールドをテーブルに一致させるために使用される正規表現
iceberg.control.topic 制御トピックの名前。デフォルトは control-iceberg です。
iceberg.control.group-id-prefix 制御コンシューマ グループのプレフィックス。デフォルトは cg-control です。
iceberg.control.commit.interval-ms コミット間隔(ミリ秒)。デフォルトは 300,000(5 分)
iceberg.control.commit.timeout-ms コミットタイムアウト間隔(ミリ秒)。デフォルトは30,000(30秒)です。
iceberg.control.commit.threads コミットに使用するスレッド数。デフォルトは (コア * 2)
iceberg.coordinator.transactional.prefix コーディネータ プロデューサーに使用するトランザクション ID のプレフィックス。デフォルトではプレフィックスなしまたは空のプレフィックスを使用します。
iceberg.catalog Name of the catalog, default is iceberg
iceberg.catalog.* Icebergカタログの初期化に渡されるプロパティ
iceberg.hadoop-conf-dir 指定すると、このディレクトリ内のHadoop設定ファイルがロードされます
iceberg.hadoop.* Hadoop 構成に渡されるプロパティ
iceberg.kafka.* トピック Kafka クライアントの初期化を制御するために渡されるプロパティ

iceberg.tables.dynamic-enabledfalse(デフォルト)の場合、iceberg.tables を指定する必要があります。iceberg.tables.dynamic-enabledtrue の場合、テーブル名を含む iceberg.tables.route-field を指定する必要があります。

Kafka の設定

デフォルトでは、コネクタはワーカープロパティからKafkaクライアント設定を使用して制御トピックに接続しようとします。 何らかの理由でその設定を読み取れない場合は、iceberg.kafka.*プロパティを使用してKafkaクライアント設定を明示的に設定できます。

メッセージ形式

メッセージは、適切な Kafka Connect コンバーターを使用して構造体またはマップに変換する必要があります。

カタログ構成

Icebergカタログへの接続には、iceberg.catalog.* プロパティが必要です。 REST、Glue、DynamoDB、Hadoop、Nessie、JDBC、Hiveなど、主要なカタログタイプはデフォルトのディストリビューションに含まれています。 JDBCドライバーはデフォルトのディストリビューションには含まれていないため、必要に応じて追加する必要があります。 Hiveカタログを使用する場合は、Hiveメタストアクライアントが含まれているディストリビューションを使用できます。 そうでない場合は、自分で追加する必要があります。

カタログの種類を設定するには、iceberg.catalog.typeresthive、または hadoop に設定します。 その他のカタログの種類の場合は、iceberg.catalog.catalog-impl をカタログクラス名に設定する必要があります。

RESTの例

"iceberg.catalog.type": "rest",
"iceberg.catalog.uri": "https://catalog-service",
"iceberg.catalog.credential": "<credential>",
"iceberg.catalog.warehouse": "<warehouse>",

Hiveの例

注: HMSクライアントが含まれているディストリビューションを使用してください(またはHMSクライアントを自分で追加してください)。 S3をストレージとして使用する場合は、S3FileIOを使用してください(デフォルトはHiveCatalogを使用したHadoopFileIOです)。

"iceberg.catalog.type": "hive",
"iceberg.catalog.uri": "thrift://hive:9083",
"iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
"iceberg.catalog.warehouse": "s3a://bucket/warehouse",
"iceberg.catalog.client.region": "us-east-1",
"iceberg.catalog.s3.access-key-id": "<AWS access>",
"iceberg.catalog.s3.secret-access-key": "<AWS secret>",

Glue の例

"iceberg.catalog.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
"iceberg.catalog.warehouse": "s3a://bucket/warehouse",
"iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",

Nessie の例

"iceberg.catalog.catalog-impl": "org.apache.iceberg.nessie.NessieCatalog",
"iceberg.catalog.uri": "http://localhost:19120/api/v2",
"iceberg.catalog.ref": "main",
"iceberg.catalog.warehouse": "s3a://bucket/warehouse",
"iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",

注記

設定によっては、iceberg.catalog.s3.endpointiceberg.catalog.s3.staging-dir、またはiceberg.catalog.s3.path-style-accessの設定も必要になる場合があります。 カタログの設定に関する詳細は、Icebergのドキュメントをご覧ください。

Hiveカタログを使用する場合は、Hiveメタストアクライアントが含まれているディストリビューションを使用できます。

Azure ADLS の構成例

ADLSを使用する場合、AzureのJava SDKではAZURE_CLIENT_ID、AZURE_TENANT_ID、AZURE_CLIENT_SECRETを渡す必要があります。 Kafka Connectをコンテナー内で実行している場合は、これらの値を環境変数として挿入してください。 詳細については、Java用Azure Identity Clientライブラリをご覧ください。

たとえば次のようになります。

AZURE_CLIENT_ID=e564f687-7b89-4b48-80b8-111111111111
AZURE_TENANT_ID=95f2f365-f5b7-44b1-88a1-111111111111
AZURE_CLIENT_SECRET="XXX"

CLIENT_IDはアプリ登録に登録されているアプリケーションのアプリケーションID、TENANT_IDはAzureテナントプロパティから取得します。 CLIENT_SECRETは、特定のアプリ登録を選択した後、「管理」の「証明書とシークレット」セクションで作成されます。 クライアントシークレットを生成するには、中央のパネルで「クライアントシークレット」を選択し、 「新しいクライアントシークレット」の前の「+」をクリックする必要がある場合があります。 この変数にはIDではなく値を設定するようにしてください。

また、ストレージ アカウントのアクセス制御 (IAM) で、アプリ登録に「ストレージ BLOB データ共同作成者」のロール割り当てが付与されていることも重要です。 付与されていない場合、そこに新しいファイルを書き込むことはできません。

次に、コネクタの設定に以下の内容を追加します。

"iceberg.catalog.type": "rest",
"iceberg.catalog.uri": "https://catalog:8181",
"iceberg.catalog.warehouse": "abfss://[email protected]/warehouse",
"iceberg.catalog.io-impl": "org.apache.iceberg.azure.adlsv2.ADLSFileIO",
"iceberg.catalog.include-credentials": "true"

ここで、storage-container-name はAzureストレージアカウント内のコンテナー名、/warehouse はApache Icebergファイルがデフォルトで書き込まれるコンテナー内の場所(またはiceberg.tables.auto-create-enabled=trueの場合)です。 include-credentialsパラメータはAzure Javaクライアントの資格情報を渡します。これにより、Iceberg Sinkコネクタがiceberg.catalog.uriのRESTカタログ実装に接続し、ADLSv2クライアントに必要な接続文字列を取得するように構成されます。

Google GCS の設定例

デフォルトでは、GCSへの接続にはアプリケーションのデフォルト認証情報(ADC)が使用されます。ADCの仕組みの詳細については、Google Cloudのドキュメントをご覧ください。

"iceberg.catalog.type": "rest",
"iceberg.catalog.uri": "https://catalog:8181",
"iceberg.catalog.warehouse": "gs://bucket-name/warehouse",
"iceberg.catalog.io-impl": "org.apache.iceberg.google.gcs.GCSFileIO"

Hadoopの設定

HDFSまたはHiveを使用する場合、シンクはHadoop設定を初期化します。まず、クラスパスから設定ファイルが読み込まれます。 次に、iceberg.hadoop-conf-dirが指定されている場合は、その場所から設定ファイルが読み込まれます。 最後に、シンク設定のiceberg.hadoop.*プロパティが適用されます。 これらのプロパティをマージする際の優先順位は、シンク設定 > 設定ディレクトリ > クラスパスの順になります。

初期設定

ソーストピック

これは、ソース トピックがすでに存在し、 events という名前が付けられていることを前提としています。

制御トピック

Kafkaクラスターの auto.create.topics.enable が true(デフォルト)に設定されている場合、 コントロールトピックは自動的に作成されます。 そうでない場合は、まずトピックを作成する必要があります。デフォルトのトピック名は control-iceberg です。

bin/kafka-topics  \
  --command-config command-config.props \
  --bootstrap-server ${CONNECT_BOOTSTRAP_SERVERS} \
  --create \
  --topic control-iceberg \
  --partitions 1

注意: Confluent Cloud で実行されているクラスターでは、auto.create.topics.enable はデフォルトで false に設定されています。

Iceberg カタログの構成

iceberg.catalog. というプレフィックスを持つ設定プロパティは、Icebergカタログの初期化に渡されます。 特定のカタログの設定方法の詳細については、Icebergのドキュメントをご覧ください。

単一の宛先テーブル

この例では、すべての受信レコードを 1 つのテーブルに書き込みます。

宛先テーブルを作成する

CREATE TABLE default.events (
    id STRING,
    type STRING,
    ts TIMESTAMP,
    payload STRING)
PARTITIONED BY (hours(ts))

コネクタ構成

このサンプル構成は、Iceberg REST カタログに接続します。

{
  "name": "events-sink",
  "config": {
    "connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
    "tasks.max": "2",
    "topics": "events",
    "iceberg.tables": "default.events",
    "iceberg.catalog.type": "rest",
    "iceberg.catalog.uri": "https://localhost",
    "iceberg.catalog.credential": "<credential>",
    "iceberg.catalog.warehouse": "<warehouse name>"
  }
}

マルチテーブルファンアウト、静的ルーティング

この例では、typeがlistに設定されたレコードをdefault.events_listテーブルに書き込み、typeがcreateに設定されたレコードをdefault.events_createテーブルに書き込みます。 その他のレコードはスキップされます。

2つの宛先テーブルを作成する

CREATE TABLE default.events_list (
    id STRING,
    type STRING,
    ts TIMESTAMP,
    payload STRING)
PARTITIONED BY (hours(ts));

CREATE TABLE default.events_create (
    id STRING,
    type STRING,
    ts TIMESTAMP,
    payload STRING)
PARTITIONED BY (hours(ts));

コネクタ構成

{
  "name": "events-sink",
  "config": {
    "connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
    "tasks.max": "2",
    "topics": "events",
    "iceberg.tables": "default.events_list,default.events_create",
    "iceberg.tables.route-field": "type",
    "iceberg.table.default.events_list.route-regex": "list",
    "iceberg.table.default.events_create.route-regex": "create",
    "iceberg.catalog.type": "rest",
    "iceberg.catalog.uri": "https://localhost",
    "iceberg.catalog.credential": "<credential>",
    "iceberg.catalog.warehouse": "<warehouse name>"
  }
}

マルチテーブルファンアウト、動的ルーティング

この例では、db_tableフィールドの値に基づいて名前が付けられたテーブルに書き込みます。 名前のテーブルが存在しない場合、レコードはスキップされます。 例えば、レコードのdb_tableフィールドがdefault.events_listに設定されている場合、レコードはdefault.events_listテーブルに書き込まれます。

2つの宛先テーブルを作成する

2 つのテーブルを作成する方法については上記を参照してください。

コネクタ構成

{
  "name": "events-sink",
  "config": {
    "connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
    "tasks.max": "2",
    "topics": "events",
    "iceberg.tables.dynamic-enabled": "true",
    "iceberg.tables.route-field": "db_table",
    "iceberg.catalog.type": "rest",
    "iceberg.catalog.uri": "https://localhost",
    "iceberg.catalog.credential": "<credential>",
    "iceberg.catalog.warehouse": "<warehouse name>"
  }
}

Apache Iceberg Sink コネクタ用の SMT

このプロジェクトには、Iceberg シンク コネクタで使用するために Kafka データを変換するときに役立つ SMT がいくつか含まれています。

CopyValue

(実験的)

CopyValue SMT は、あるフィールドの値を新しいフィールドにコピーします。

構成

プロパティ 説明
source.field ソースフィールド名
target.field 対象フィールド名

"transforms": "copyId",
"transforms.copyId.type": "org.apache.iceberg.connect.transforms.CopyValue",
"transforms.copyId.source.field": "id",
"transforms.copyId.target.field": "id_copy",

DmsTransform

(実験的)

DmsTransform SMTは、AWS DMS形式のメッセージをシンクのCDC機能で使用できるように変換します。 データ要素フィールドを最上位に昇格し、メタデータフィールド_cdc.op_cdc.ts_cdc.sourceを追加します。

構成

SMT には現在構成がありません。

DebeziumTransform

(実験的)

DebeziumTransform SMTは、シンクのCDC機能で使用できるようにDebezium形式のメッセージを変換します。 このSMTは、要素の前後のフィールドを最上位に昇格させ、次のメタデータフィールドを追加します: _cdc.op_cdc.ts_cdc.offset_cdc.source_cdc.target_cdc.key

構成
プロパティ 説明
cdc.target.pattern CDC ターゲット フィールド値を設定するために使用するパターン。デフォルトは {db}.{table} です。

JsonToMapTransform

(実験的)

JsonToMapTransform SMTは、文字列をJSONオブジェクトのペイロードとして解析し、スキーマを推論します。 スキーマレスデータ(Kafkaが提供するJsonConverterによって生成されたMapなど)用のiceberg-kafka-connectコネクタは、MapをIceberg構造体に変換します。 JSONが適切に構造化されている場合は問題ありませんが、キーが動的に変化するJSONオブジェクトの場合、スキーマの進化によりIcebergテーブルの列数が爆発的に増加します。

このSMTは、JSONが適切に構造化されていない場合に、データをIcebergに取り込み、クエリエンジンでより扱いやすい形式に処理するために役立ちます。 ネストされたオブジェクトをMapに変換し、Map型をスキーマに追加します。 コネクタはスキーマを尊重し、JSONオブジェクト用のIceberg Map (String)列を持つIcebergテーブルを作成します。

注記:

  • コネクタの value.converter 設定には、jsonConverter ではなく stringConverter を使用する必要があります。
    • これらの文字列には JSON オブジェクト ({...}) が必要です。
  • メッセージキー、トゥームストーン、ヘッダーは変換されず、SMTによってそのまま渡されます。
構成
プロパティ 説明
json.root (false) ルートから開始するブール値

transforms.IDENTIFIER_HERE.json.root は、最も不整合なデータを対象としています。 これは、Map<String, String> のスキーマを持つ、payload という単一のフィールドを持つ構造体を構築します。

transforms.IDENTIFIER_HERE.json.root が false(デフォルト)の場合、プリミティブ型および配列フィールドの推論されたスキーマを使用して構造体を構築します。ネストされたオブジェクトは Map<String, String> 型のフィールドになります。

空の配列と空のオブジェクトを持つキーは、最終スキーマから除外されます。 配列は型付けされますが、JSON配列に型が混在している場合は文字列の配列に変換されます。

例のJSON:

{
  "key": 1, 
  "array": [1,"two",3],
  "empty_obj": {},
  "nested_obj": {"some_key": ["one", "two"]}
}

json.root が true の場合は次のようになります。

SinkRecord.schema: 
  "payload" : (Optional) Map<String, String>

Sinkrecord.value (Struct): 
  "payload"  : Map(
    "key" : "1",
    "array" : "[1,"two",3]"
    "empty_obj": "{}"
    "nested_obj": "{"some_key":["one","two"]}}"
   )

json.rootがfalseの場合は次のようになります

SinkRecord.schema: 
  "key": (Optional) Int32,
  "array": (Optional) Array<String>,
  "nested_object": (Optional) Map<string, String>

SinkRecord.value (Struct):
 "key" 1, 
 "array" ["1", "two", "3"] 
 "nested_object" Map ("some_key" : "["one", "two"]") 

KafkaMetadataTransform

(実験的)

KafkaMetadata は、Kafka メッセージのプロパティであるtopicpartitionoffsettimestampを挿入します。

構成

プロパティ 説明
field_name フィールドのプレフィックス (_kafka_metadata)
nested (false) trueの場合、構造体にデータをネストし、そうでない場合はプレフィックス付きフィールドとしてトップレベルに追加します。
external_field (なし) メタデータに定数キーと値を追加します (例: クラスター名)

nestedがオンの場合: _kafka_metadata.topic_kafka_metadata.partition_kafka_metadata.offset_kafka_metadata.timestamp

nestedがオフの場合: _kafka_metdata_topic_kafka_metadata_partition_kafka_metadata_offset_kafka_metadata_timestamp

MongoDebeziumTransform

(実験的)

MongoDebeziumTransform SMT は、before / after の BSON 文字列を含む Mongo Debezium 形式のメッセージを、DebeziumTransform SMT が期待する before / after の型指定された構造体に変換します。

基礎となるカタログ タイプで mongodb 列がサポートされていない場合、列の名前変更は (まだ) サポートされません。

構成

プロパティ 説明
array_handling_mode arrayまたはdocumentを使用して配列処理モードを設定します

値配列(デフォルト)は、配列を配列データ型としてエンコードします。 特定の配列インスタンスのすべての要素が同じ型であることを保証するのはユーザーの責任です。 このオプションは制限が多いですが、下流のクライアントによる配列の処理を容易にします。

値ドキュメントは、BSONシリアル化と同様の方法で、配列を構造体の構造体に変換します。 メイン構造体には、_0、_1、_2などの名前を持つフィールドが含まれます。 これらの名前は、配列内の要素のインデックスを表します。 各要素は、指定されたフィールドの値として渡されます。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment