Skip to content

Instantly share code, notes, and snippets.

@kenzo0107
Last active April 11, 2025 02:48
Show Gist options
  • Save kenzo0107/19410b226b972f06f904a70d4d853c51 to your computer and use it in GitHub Desktop.
Save kenzo0107/19410b226b972f06f904a70d4d853c51 to your computer and use it in GitHub Desktop.

https://debezium.io/documentation//reference/3.0/configuration/signalling.html

Debeziumコネクタへの信号の送信

概要

Debezium のシグナリングメカニズムは、コネクタの動作を変更したり、テーブルのアドホックスナップショットの開始など、 1 回限りのアクションをトリガーしたりする方法を提供します。 シグナルを使用してコネクタに特定のアクションを実行させるには、以下のチャネルのいずれかを使用するようにコネクタを設定します。

SourceSignalChannel

SQLコマンドを発行することで、専用のシグナリングデータコレクションにシグナルメッセージを追加できます。 ソースデータベース上に作成されるシグナリングデータコレクションは、Debeziumとの通信専用です。 シグナリングデータコレクションは、各コネクタインスタンスごとに一意である必要があります。

KafkaSignalChannel

シグナル メッセージを構成可能な Kafka トピックに送信します。

JmxSignalChannel

シグナルは JMX signal 操作を通じて送信します。

FileSignalChannel

ファイルを使用して信号を送信できます。

Custom

実装したカスタムチャネルにシグナルを送信します。 Debezium は、新しいログ記録レコードまたはアドホックスナップショットレコードがチャネルに追加されたことを検出すると、 そのシグナルを読み取り、要求された操作を開始します。

シグナリングは、以下のDebeziumコネクタで使用できます。

  • Db2
  • MariaDB (Technology Preview)
  • MongoDB
  • MySQL
  • Oracle
  • PostgreSQL
  • SQL Server

signal.enabled.channels 設定プロパティを設定することで、有効にするチャネルを指定できます。 このプロパティには、有効にするチャネルの名前がリストされます。Debezium はデフォルトで、sourcekafka のチャネルを提供しています。 source チャネルは、増分スナップショットシグナルに必要なため、デフォルトで有効になっています。

エラー処理

soure チャネルを除き、Debezium シグナリングチャネルには再試行ポリシーは実装されていません。 シグナルを開始した後は、必ず正常に完了したかどうかを確認してください。

通知を送信するようにコネクタを構成することで、 コネクタが増分スナップショットまたはブロックスナップショットの進行状況を自動的に報告するようにすることができます。

ソースシグナリングチャネルの有効化

デフォルトでは、Debezium ソース シグナリング チャネルが有効になっています。

シグナリングを使用するコネクタごとに、シグナリングを明示的に構成する必要があります。

手順

  1. ソースデータベースに、コネクタにシグナルを送信するためのシグナリング・データ収集テーブルを作成します。 シグナリング・データ収集に必要な構造については、「シグナリング・データ収集の構造」を参照してください。
  2. ネイティブの変更データ キャプチャ (CDC) メカニズムを実装する Db2 や SQL Server などのソース データベースの場合は、シグナリング テーブルに対して CDC を有効にします。
  3. シグナリング データ コレクションの名前を Debezium コネクタ構成に追加します。 コネクタ構成で、プロパティ signal.data.collection を追加し、その値を手順 1 で作成したシグナリング データコレクションの完全修飾名に設定します。

たとえば、signal.data.collection = inventory.debezium_signals です。 シグナリング コレクションの完全修飾名の形式は、コネクタによって異なります。 次の例は、各コネクタに使用する命名形式を示しています。

完全修飾テーブル名

  • Db2
    • <schemaName>.<tableName>
  • MariaDB (Technology Preview)
    • <databaseName>.<tableName>
  • MongoDB
    • <databaseName>.<collectionName>
  • MySQL
    • <databaseName>.<tableName>
  • Oracle
    • <databaseName>.<schemaName>.<tableName>
  • PostgreSQL
    • <schemaName>.<tableName>
  • SQL Server
    • <databaseName>.<schemaName>.<tableName>

signal.data.collection プロパティの設定の詳細については、コネクタの構成プロパティの表を参照してください。

シグナリングデータ収集の構造

シグナリングデータコレクション(シグナリングテーブル)は、指定された操作をトリガーするためにコネクタに送信するシグナルを格納します。 シグナリングテーブルの構造は、以下の標準形式に準拠する必要があります。

  • 3 つのフィールド (列) が含まれます。
  • フィールドは、表 1 に示すように特定の順序で配置されます。

シグナリングデータ収集の作成

標準の SQL DDL クエリをソース データベースに送信して、シグナリング テーブルを作成します。

前提条件

  • ソース データベースにテーブルを作成するための十分なアクセス権限があります。

手順

  • 次の例に示すように、ソース データベースに SQL クエリを送信して、必要な構造と一致するテーブルを作成します。
CREATE TABLE <tableName> (id VARCHAR(<varcharValue>) PRIMARY KEY, type VARCHAR(<varcharValue>) NOT NULL, data VARCHAR(<varcharValue>) NULL);

NOTE:

id 変数の VARCHAR パラメータに割り当てるスペースの量は、シグナリング テーブルに送信されるシグナルの ID 文字列のサイズを収容するのに十分な大きさである必要があります。 ID のサイズが使用可能なスペースを超えると、コネクタはシグナルを処理できません。

次の例は、3 列の debezium_signal テーブルを作成する CREATE TABLE コマンドを示しています。

CREATE TABLE debezium_signal (id VARCHAR(42) PRIMARY KEY, type VARCHAR(32) NOT NULL, data VARCHAR(2048) NULL);

Kafka シグナリングチャネルの有効化

Kafka シグナリングチャネルを有効にするには、signal.enabled.channels 設定プロパティにチャネルを追加し、 シグナルを受信するトピックの名前を signal.kafka.topic プロパティに追加します。 シグナリングチャネルを有効にすると、設定されたシグナルトピックに送信されたシグナルを消費するための Kafka コンシューマーが作成されます。

消費者が利用できる追加設定

NOTE:

Kafka シグナリングを使用してほとんどのコネクタでアドホック増分スナップショットをトリガーするには、まずコネクタ設定でソースシグナリングチャネルを有効にする必要があります。ソースチャネルは、増分スナップショットでキャプチャされ、ストリーミング再開後に再度キャプチャされる可能性のあるイベントの重複を排除するためのウォーターマークメカニズムを実装しています。GTID が有効になっている読み取り専用 MySQL データベースの増分スナップショットをシグナリングチャネルを使用してトリガーする場合は、ソースチャネルを有効にする必要はありません。詳細については、「MySQL 読み取り専用増分スナップショット」を参照してください。

メッセージ形式

Kafka メッセージのキーは、topic.prefix コネクタ構成オプションの値と一致する必要があります。

値は、typedata フィールドを持つ JSON オブジェクトです。

シグナル タイプが execute-snapshot に設定されている場合、data フィールドには次の表に示すフィールドが含まれている必要があります。

表2. スナップショットデータフィールドの実行

フィールド デフォルト
type incremental 実行するスナップショットの種類。現在、Debezium は増分型とブロッキング型をサポートしています。
data-collections N/A スナップショットに含めるデータコレクションの完全修飾名に一致する、カンマ区切りの正規表現の配列。
命名形式はデータベースによって異なります。
additional-conditions N/A スナップショットに含めるレコードのサブセットを決定するためにコネクタが評価する追加条件のセットを指定するオプションの配列。
追加条件はそれぞれ、アドホックスナップショットでキャプチャするデータをフィルタリングするための基準を指定するオブジェクトです。追加条件ごとに以下のプロパティを設定できます。
data-collection: フィルターが適用されるデータコレクションの完全修飾名。データコレクションごとに異なるフィルターを適用できます。
filter: スナップショットに含めるためにデータベースレコードに存在する必要がある列値を指定します(例:"color='blue')。 スナップショットプロセスは、データコレクション内のレコードをフィルター値と照合し、一致する値を含むレコードのみをキャプチャします。

フィルター プロパティに割り当てる具体的な値は、アドホック スナップショットの種類によって異なります。
・増分スナップショットの場合、スナップショットがクエリの条件句に追加する「color='blue'」などの検索条件フラグメントを指定します。
・ブロッキング スナップショットの場合は、snapshot.select.statement.overrides プロパティで設定するような完全な SELECT ステートメントを指定します。

次の例は、典型的な execute-snapshot Kafka メッセージを示しています。

Key = `test_connector`

Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`

信号アクション

シグナリングを使用して、次のアクションを開始できます。

一部の信号はすべてのコネクタと互換性がありません。

信号の記録

log シグナルタイプを指定したシグナリングテーブルエントリを作成することで、 コネクタにログへのエントリの追加をリクエストできます。 コネクタはシグナルを処理した後、指定されたメッセージをログに出力します。 必要に応じて、結果メッセージにストリーミング座標が含まれるようにシグナルを設定することもできます。

表5. ログメッセージを追加するためのシグナリングレコードの例

カラム 説明
id 924e3ff8-2245-43ca-ba77-2af9af02fa07
type log 信号のアクション タイプ
data {"message": "Signal message at offset {}"} message パラメータは、ログに出力する文字列を指定します。 メッセージにプレースホルダ ({}) を追加すると、ストリーミング座標に置き換えられます。

アドホックスナップショット信号

コネクタにアドホックスナップショットの開始を要求するには、execute-snapshotシグナルタイプを持つシグナルを作成します。 シグナルを処理後、コネクタは要求されたスナップショット操作を実行します。

コネクタが最初に起動した後に実行される初期スナップショットとは異なり、アドホックスナップショットは、 コネクタがデータベースから変更イベントのストリーミングを開始した後に、実行時に実行されます。 アドホックスナップショットはいつでも開始できます。

アドホック スナップショットは、次の Debezium コネクタで使用できます。

  • Db2
  • MariaDB (Technology Preview)
  • MongoDB
  • MySQL
  • Oracle
  • PostgreSQL
  • SQL Server

表6. アドホックスナップショット信号レコードの例

カラム
id d139b9b7-7777-4547-917d-e1775ea61d41
type execute-snapshot
data {"data-collections": ["public.MyFirstTable", "public.MySecondTable"]}

表7. アドホックスナップショット信号メッセージの例

キー
test_connector {"type":"execute-snapshot","data": {"data-collections": ["public.MyFirstTable"], "type": "INCREMENTAL", "additional-conditions":[{"data-collection": "public.MyFirstTable", "filter":"color='blue' AND brand='MyBrand'"}]}}

アドホック スナップショットの詳細については、コネクタのドキュメントの「スナップショット」トピックを参照してください。

追加リソース

アドホックスナップショット停止信号

進行中のアドホックスナップショットを停止するようコネクタに要求するには、 シグナルテーブルに stop-snapshot シグナルタイプでエントリを作成します。 シグナルを処理後、コネクタは現在進行中のスナップショット操作を停止します。

次の Debezium コネクタのアドホック スナップショットを停止できます。

  • Db2
  • MariaDB (Technology Preview)
  • MongoDB
  • MySQL
  • Oracle
  • PostgreSQL
  • SQL Server

表8. 停止アドホックスナップショット信号レコードの例

カラム
id d139b9b7-7777-4547-917d-e1775ea61d41
type stop-snapshot
data {"type":"INCREMENTAL", "data-collections": ["public.MyFirstTable"]}

シグナルの種類を指定する必要があります。 data-collections フィールドはオプションです。 コネクタに現在のスナップショットのすべてのアクティビティを停止するように要求するには、data-collections フィールドを空白のままにしてください。 増分スナップショットを続行する一方で、特定のコレクションをスナップショットから除外する場合は、除外するコレクションの名前または正規表現をカンマ区切りでリスト化して指定します。コネクタがシグナルを処理後、増分スナップショットは続行されますが、指定したコレクションからデータが除外されます。

増分スナップショット

増分スナップショットは、アドホックスナップショットの一種です。 増分スナップショットでは、コネクタは初期スナップショットと同様に、指定したテーブルのベースライン状態をキャプチャします。 ただし、初期スナップショットとは異なり、増分スナップショットではテーブルを一度にすべてキャプチャするのではなく、チャンク単位でキャプチャします。 コネクタは、ウォーターマーク方式を使用してスナップショットの進行状況を追跡します。

増分スナップショットでは、指定されたテーブルの初期状態を単一のモノリシック操作ではなくチャンクでキャプチャすることにより、初期スナップショット プロセスに比べて次の利点が得られます。

  • コネクタが指定されたテーブルのベースライン状態をキャプチャしている間、トランザクションログからのほぼリアルタイムのイベントのストリーミングは中断されずに継続されます。
  • 増分スナップショット プロセスが中断された場合、停止した時点から再開できます。
  • 増分スナップショットはいつでも開始できます。

増分スナップショットの一時停止信号

シグナルテーブルにpause-snapshotシグナルタイプを指定したエントリを作成することで、コネクタに進行中の増分スナップショットを一時停止するよう要求できます。 シグナル処理後、コネクタは進行中のスナップショット操作を停止します。 そのため、スナップショット処理はシグナル処理中の時点で一時停止されるため、データ収集を指定することはできません。

次の Debezium コネクタの増分スナップショットを一時停止できます。

  • Db2
  • MariaDB (Technology Preview)
  • MongoDB
  • MySQL
  • Oracle
  • PostgreSQL
  • SQL Server

表9. 一時停止増分スナップショット信号レコードの例

カラム
id d139b9b7-7777-4547-917d-e1775ea61d41
type pause-snapshot

信号の type を指定する必要があります。dataフィールドは無視されます。

増分スナップショット再開信号

一時停止中の増分スナップショットを再開するようコネクタに要求するには、resume-snapshotシグナルタイプを指定したシグナルテーブルエントリを作成します。シグナルを処理すると、コネクタは一時停止していたスナップショット操作を再開します。

次の Debezium コネクタの増分スナップショットを再開できます。

  • Db2
  • MariaDB (Technology Preview)
  • MongoDB
  • MySQL
  • Oracle
  • PostgreSQL
  • SQL Server

表10. 再開増分スナップショット信号レコードの例

カラム
id d139b9b7-7777-4547-917d-e1775ea61d41
type resume-snapshot

信号の type を指定する必要があります。dataフィールドは無視されます。

増分スナップショットの詳細については、コネクタのドキュメントの「スナップショット」トピックを参照してください。

追加リソース

スナップショット信号のブロック

コネクタにアドホックブロッキングスナップショットの開始を要求するには、 execute-snapshotシグナルタイプとdata.typeblockingという値を指定してシグナルを作成します。 シグナル処理後、コネクタは要求されたスナップショット操作を実行します。

コネクタが最初に起動した後に実行される初期スナップショットとは異なり、 アドホックブロッキングスナップショットは、コネクタがデータベースから変更イベントをストリーミングするために停止した後に、 実行時に実行されます。 アドホックブロッキングスナップショットはいつでも開始できます。

ブロッキング スナップショットは、次の Debezium コネクタで使用できます。

  • Db2
  • MariaDB (Technology Preview)
  • MongoDB
  • MySQL
  • Oracle
  • PostgreSQL
  • SQL Server

表11. ブロッキングスナップショット信号レコードの例

カラム
id d139b9b7-7777-4547-917d-e1775ea61d41
type execute-snapshot
data {"type": "blocking", "data-collections": ["schema1.table1", "schema1.table2"], "additional-conditions": [{"data-collection": "schema1.table1", "filter": "SELECT * FROM [schema1].[table1] WHERE column1 = 0 ORDER BY column2 DESC"}, {"data-collection": "schema1.table2", "filter": "SELECT * FROM [schema1].[table2] WHERE column2 > 0"}]}

表12. ブロッキングスナップショット信号メッセージの例

キー
test_connector {"type":"execute-snapshot","data": {"type": "blocking"}

スナップショットのブロックの詳細については、コネクタのドキュメントの「スナップショット」トピックを参照してください。

追加リソース

カスタムアクションの定義

カスタムアクションを使用すると、Debezium シグナリングフレームワークを拡張して、デフォルトの実装では利用できないアクションをトリガーできます。カスタムアクションは複数のコネクタで使用できます。

カスタム シグナル アクションを定義するには、次のインターフェースを定義する必要があります。

@FunctionalInterface
public interface SignalAction<P extends Partition> {

    /**
     * @param signalPayload the content of the signal
     * @return true if the signal was processed
     */
    boolean arrived(SignalPayload<P> signalPayload) throws InterruptedException;
}

io.debezium.pipeline.signal.actions.SignalAction は、シグナリング チャネルを介して送信されるメッセージ ペイロードを表す 1 つのパラメータを持つ単一のメソッドを公開します。

カスタム シグナリング アクションを定義した後、次の SPI インターフェイスを使用して、カスタム アクションをシグナリング メカニズムで使用できるようにします: io.debezium.pipeline.signal.actions.SignalActionProvider

public interface SignalActionProvider {

    /**
     * Create a map of signal action where the key is the name of the action.
     *
     * @param dispatcher the event dispatcher instance
     * @param connectorConfig the connector config
     * @return a concrete action
     */

    <P extends Partition> Map<String, SignalAction<P>> createActions(EventDispatcher<P, ? extends DataCollectionId> dispatcher, CommonConnectorConfig connectorConfig);
}

実装では、シグナルアクションのマップを返す必要があります。 マップキーをアクション名に設定してください。このキーはシグナルの type として使用されます。

Debeziumコアモジュールの依存関係

カスタムアクションのJavaプロジェクトは、Debeziumコアモジュールへのコンパイル依存関係を持っています。 プロジェクトのpom.xmlファイルに以下のコンパイル依存関係を含めてください。

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-core</artifactId>
    <version>${version.debezium}</version>
</dependency>

上記の例では、プレースホルダ ${version.debezium} はDebeziumコネクタのバージョンを表しています。 pom.xmlファイルの<properties>セクションにあるversion.debeziumプロパティの値を指定してください。 例えば、

<properties>
    <version.debezium>3.0.8.Final</version.debezium>
</properties>

META-INF/services/io.debezium.pipeline.signal.actions.SignalActionProvider ファイルでプロバイダーの実装を宣言します。

カスタムアクションの展開

前提条件

  • カスタムアクション Java プログラムがあります。

手順

  • Debezium コネクタでカスタム アクションを使用するには、Java プロジェクトを JAR ファイルにエクスポートし、そのファイルを、使用する各 Debezium コネクタの JAR ファイルが格納されているディレクトリにコピーします。 たとえば、一般的なデプロイメントでは、Debezium コネクタ ファイルは Kafka Connect ディレクトリ (/kafka/connect) のサブディレクトリに保存され、各コネクタ JAR は独自のサブディレクトリ (/kafka/connect/debezium-connector-db2/kafka/connect/debezium-connector-mysql など) に保存されます。

NOTE:

複数のコネクタでカスタム アクションを使用するには、カスタム シグナリング チャネル JAR ファイルのコピーを各コネクタのサブディレクトリに配置する必要があります。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment