Apache Kafkaへのシンク(ベータ版)
このドキュメントでは、TiDB Cloud EssentialからApache Kafkaへデータをストリーミングするためのチェンジフィードを作成する方法について説明します。
制限
- TiDB Cloud Essentialインスタンスごとに、最大10個のチェンジフィードを作成できます。
- 現在、 TiDB Cloud Essentialは、Kafkaブローカーへの接続に自己署名TLS証明書をアップロードすることをサポートしていません。
- TiDB Cloud Essential はTiCDC を使用して変更フィードを確立するため、同じTiCDCの制限があります。
- 複製対象のテーブルに主キーまたはNULLを許容しない一意インデックスがない場合、複製中に一意制約が存在しないことで、一部の再試行シナリオにおいて、下流で重複データが挿入される可能性があります。
前提条件
Apache Kafkaにデータをストリーミングするためのチェンジフィードを作成する前に、以下の前提条件を満たす必要があります。
- ネットワーク接続を設定する
- Kafka ACL認証の権限を追加する
ネットワーク
TiDB Cloud EssentialインスタンスがApache Kafkaサービスに接続できることを確認してください。接続方法は以下のいずれかを選択できます。
- プライベートリンク接続:セキュリティコンプライアンスを満たし、ネットワーク品質を確保します。
- 公共ネットワーク:迅速なセットアップに適しています。
プライベートリンク接続は、クラウドプロバイダーのプライベートリンク技術を活用し、VPC内のリソースがプライベートIPアドレスを使用して他のVPC内のサービスに接続できるようにします。これにより、あたかもそれらのサービスがVPC内で直接ホストされているかのように動作します。
TiDB Cloud Essentialは現在、セルフホスト型Kafka、Confluent Cloud Dedicatedクラスター、およびAmazon MSK Provisioned向けのプライベートリンク接続のみをサポートしています。他のKafka SaaSサービスとの直接統合はサポートしていません。
Kafkaのデプロイメントとクラウドプロバイダーに基づいてプライベートリンク接続を設定するには、以下のガイドを参照してください。
Apache Kafkaサービスへのパブリックアクセスを提供する場合は、すべてのKafkaブローカーにパブリックIPアドレスまたはドメイン名を割り当ててください。
本番環境でパブリックアクセスを使用することは推奨されません。
Kafka ACL認証
TiDB Cloud Essential の変更フィードが Apache Kafka にデータをストリーミングし、Kafka トピックを自動的に作成できるようにするには、Kafka に次の権限が追加されていることを確認してください。
- Kafka のトピック リソース タイプに
CreateおよびWrite権限が追加されます。 - Kafka のクラスタ リソース タイプに
DescribeConfigs権限が追加されます。
たとえば、Kafka クラスターが Confluent Cloud にある場合、詳細については、Confluent ドキュメントのリソースとACLの追加を参照してください。
ステップ1. Apache KafkaのChangefeedページを開きます。
- TiDB Cloudコンソールにログインします。
- 対象のTiDB Cloud Essentialインスタンスの概要ページに移動し、左側のナビゲーションペインで「データ」 > 「変更フィード」をクリックします。
- 「変更フィードの作成」をクリックし、次に「宛先」として「Kafka」を選択します。
ステップ2. changefeedターゲットを設定する
手順は、選択した接続方法によって異なります。
「接続方法」で「パブリック」を選択し、Kafkaブローカーのエンドポイントを入力します。複数のエンドポイントはカンマ
,を使用して区切ることができます。Kafkaの認証設定に応じて、認証オプションを選択してください。
- Kafkaで認証が必要ない場合は、デフォルトオプションの「無効」のままにしてください。
- Kafkaで認証が必要な場合は、該当する認証タイプを選択し、認証に使用するKafkaアカウントのユーザー名とパスワードを入力してください。
Kafkaのバージョンについては、ご使用のKafkaのバージョンに基づいて、 Kafka v2またはKafka v3を選択してください。
この変更フィード内のデータの圧縮タイプを選択してください。
KafkaでTLS暗号化が有効になっており、Kafka接続にTLS暗号化を使用する場合は、TLS暗号化オプションを有効にしてください。
「次へ」をクリックしてネットワーク接続をテストしてください。テストが成功すると、次のページに移動します。
接続方法で「プライベートリンク」を選択します。
「プライベートリンク接続」で、ネットワークセクションで作成したプライベートリンク接続を選択します。プライベートリンク接続のアベイラビリティゾーンが、Kafkaデプロイメントのアベイラビリティゾーンと一致していることを確認してください。
ネットワークセクションで取得したブートストラップポートを入力してください。Amazon MSKプロビジョニング済みプライベートリンク接続を使用している場合は、このフィールドはスキップできます。
Kafkaの認証設定に応じて、認証オプションを選択してください。
- Kafkaで認証が必要ない場合は、デフォルトオプションの「無効」のままにしてください。
- Kafkaで認証が必要な場合は、該当する認証タイプを選択し、認証に使用するKafkaアカウントのユーザー名とパスワードを入力してください。
Kafkaのバージョンについては、ご使用のKafkaのバージョンに基づいて、 Kafka v2またはKafka v3を選択してください。
この変更フィード内のデータの圧縮タイプを選択してください。
KafkaでTLS暗号化が有効になっており、Kafka接続にTLS暗号化を使用する場合は、TLS暗号化オプションを有効にしてください。
KafkaでTLS SNI検証が必要な場合は、 TLSサーバー名を入力してください。例:
Confluent Cloud Dedicated clusters。「次へ」をクリックしてネットワーク接続をテストしてください。テストが成功すると、次のページに移動します。
ステップ3.チェンジフィードを設定する
テーブル フィルターをカスタマイズして、複製するテーブルをフィルターします。ルールの構文については、 テーブルフィルタルールを参照してください。
- レプリケーション範囲:有効なキーを持つテーブルのみをレプリケートするか、選択したすべてのテーブルをレプリケートするかを選択できます。
- フィルタルール:この列でフィルタルールを設定できます。デフォルトでは、すべてのテーブルを複製するルール
*.*が設定されています。新しいルールを追加して[適用] をクリックすると、 TiDB Cloud はTiDB 内のすべてのテーブルをクエリし、フィルタ結果の下にルールに一致するテーブルのみを表示します。 - 大文字小文字の区別:フィルタルールにおけるデータベース名とテーブル名の照合において、大文字小文字を区別するかどうかを設定できます。デフォルトでは、大文字小文字は区別されません。
- 有効なキーで結果をフィルタリングする:この列には、主キーや一意インデックスなど、有効なキーを持つテーブルが表示されます。
- 有効なキーのない結果をフィルタリングする: この列には、主キーまたは一意キーがないテーブルが表示されます。一意の識別子がないと、ダウンストリームが重複イベントを処理する際にデータの一貫性が失われる可能性があるため、これらのテーブルはレプリケーション中に問題となります。データの一貫性を確保するには、レプリケーションを開始する前に、これらのテーブルに一意キーまたは主キーを追加することをお勧めします。または、フィルタ ルールを追加してこれらのテーブルを除外することもできます。たとえば、ルール
test.tbl1を使用して、テーブル"!test.tbl1"を除外できます。
イベントフィルターをカスタマイズして、複製したいイベントを絞り込みます。
- 一致するテーブル:この列では、イベントフィルターを適用するテーブルを設定できます。ルールの構文は、前のテーブルフィルター領域で使用されているものと同じです。
- イベントフィルター:無視したいイベントを選択できます。
カラムセレクタをカスタマイズして、イベントから列を選択し、選択した列に関連するデータ変更のみを下流に送信します。
- 一致するテーブル:列セレクターを適用するテーブルを指定します。どのルールにも一致しないテーブルの場合、すべての列が送信されます。
- カラムセレクター:一致したテーブルのどの列をダウンストリームに送信するかを指定します。
マッチングルールの詳細については、 カラムセレクターを参照してください。
データフォーマット領域で、希望するKafkaメッセージのフォーマットを選択してください。
- Avroは、コンパクトで高速なバイナリデータフォーマットであり、豊富なデータ構造を備え、様々なフローシステムで広く利用されています。詳細については、 Avroデータ形式参照してください。
- Canal-JSONは、解析が容易なプレーンなJSONテキスト形式です。詳細については、 Canal-JSONデータ形式参照してください。
- オープン プロトコルは、監視、キャッシュ、全文インデックス作成、分析エンジン、および異なるデータベース間のプライマリとセカンダリのレプリケーションのためのデータ ソースを提供する行レベルのデータ変更通知プロトコルです。詳細については、 オープンプロトコルデータフォーマットを参照してください。
- Debeziumは、データベースの変更をキャプチャするためのツールです。キャプチャされた各データベース変更を「イベント」と呼ばれるメッセージに変換し、これらのイベントをKafkaに送信します。詳細については、 Debeziumデータ形式参照してください。
TiDB拡張フィールドをKafkaメッセージ本文に追加する場合は、 TiDB拡張オプションを有効にしてください。
TiDB 拡張フィールドの詳細については、 Avroデータ形式のTiDB拡張フィールドフィールド」およびCanal-JSONデータ形式のTiDB拡張フィールド参照してください。
データ形式としてAvroを選択すると、ページにAvro固有の設定項目が表示されます。これらの設定項目は、以下のように入力できます。
- 「Decimal」および「Unsigned BigInt」の設定では、 TiDB CloudがKafkaメッセージ内のdecimal型およびunsigned bigint型のデータ型をどのように処理するかを指定します。
- スキーマレジストリ領域で、スキーマレジストリのエンドポイントを入力します。HTTP認証を有効にする場合は、ユーザー名とパスワードを入力します。
トピック配信エリアで配信モードを選択し、選択したモードに応じてトピック名の設定を入力します。
データ形式としてAvroを選択した場合、 [配信モード]ドロップダウンリストでは、[テーブルごとの変更ログをKafkaトピックに配信する]モードのみを選択できます。
配信モードは、変更フィードがKafkaトピックを作成する方法を制御します。テーブルごと、データベースごと、またはすべての変更ログに対して1つのトピックを作成するかを選択できます。
テーブルごとに変更ログをKafkaトピックに配信する
変更フィードでテーブルごとに専用の Kafka トピックを作成する場合は、このモードを選択します。そうすると、テーブルのすべての Kafka メッセージが専用の Kafka トピックに送信されます。トピックのプレフィックス、データベース名とテーブル名の間の区切り文字、およびサフィックスを設定することで、テーブルのトピック名をカスタマイズできます。たとえば、区切り文字を
_に設定すると、トピック名は<Prefix><DatabaseName>_<TableName><Suffix>の形式になります。スキーマ作成イベントなど、行以外のイベントの変更ログについては、 「デフォルトトピック名」フィールドにトピック名を指定できます。変更フィードは、指定されたトピックに基づいて、これらの変更ログを収集するためのトピックを作成します。
データベースごとに変更ログをKafkaトピックに配信する
変更フィードでデータベースごとに専用のKafkaトピックを作成する場合は、このモードを選択してください。そうすると、データベースのすべてのKafkaメッセージが専用のKafkaトピックに送信されます。トピックのプレフィックスとサフィックスを設定することで、データベースのトピック名をカスタマイズできます。
解決済みTsイベントなど、行以外のイベントの変更ログについては、 「デフォルトトピック名」フィールドにトピック名を指定できます。変更フィードは、指定されたトピックに基づいて、これらの変更ログを収集するためのトピックを作成します。
すべての変更ログを、指定された1つのKafkaトピックに送信する
変更フィードで全ての変更ログに対して1つのKafkaトピックを作成する場合は、このモードを選択してください。そうすると、変更フィード内のすべてのKafkaメッセージが1つのKafkaトピックに送信されます。トピック名は「トピック名」フィールドで指定できます。
パーティション分散領域では、Kafka メッセージの送信先パーティションを決定できます。すべてのテーブルに対して単一のパーティションディスパッチャを定義することも、テーブルごとに異なるパーティションディスパッチャを定義することもできます。TiDB Cloud、次の 4 種類のディスパッチャが提供されています。
プライマリキーまたはインデックス値に基づいて変更ログをKafkaパーティションに分散します。
変更フィードでテーブルの Kafka メッセージを異なるパーティションに送信する場合は、この分散方法を選択してください。行の変更ログの主キーまたはインデックス値によって、変更ログの送信先パーティションが決まります。主キーを使用する場合は、 「インデックス名」フィールドを空のままにしてください。この分散方法は、パーティションのバランスを改善し、行レベルの順序性を確保します。
変更ログをテーブルごとにKafkaパーティションに分散する
変更フィードによってテーブルの Kafka メッセージを 1 つの Kafka パーティションに送信する場合は、この分散方法を選択してください。行の変更ログのテーブル名によって、変更ログが送信されるパーティションが決まります。この分散方法はテーブルの順序性を保証しますが、パーティションのバランスが崩れる可能性があります。
変更ログをタイムスタンプに基づいてKafkaパーティションに分散する
変更フィードが Kafka メッセージをランダムに異なる Kafka パーティションに送信するようにするには、この分散方法を選択してください。行の変更ログの commitTs によって、変更ログが送信されるパーティションが決まります。この分散方法は、パーティションのバランスを改善し、各パーティションの順序性を確保します。ただし、データ項目の複数の変更が異なるパーティションに送信され、異なるコンシューマーのコンシューマー処理の進行状況が異なる場合があり、データの不整合が発生する可能性があります。そのため、コンシューマーは、複数のパーティションからのデータを消費する前に commitTs でソートする必要があります。
変更ログを列の値に基づいてKafkaパーティションに分散する
テーブルのKafkaメッセージを異なるパーティションに送信するように変更フィードを設定したい場合は、この配信方法を選択してください。行の変更ログで指定された列の値によって、変更ログの送信先パーティションが決まります。この配信方法は、各パーティション内の順序性を保証し、同じ列値を持つ変更ログが同じパーティションに送信されることを保証します。
トピックコンフィグレーション領域で、以下の数値を設定してください。changefeedは、これらの数値に基づいてKafkaトピックを自動的に作成します。
- レプリケーション係数:各KafkaメッセージがレプリケートされるKafkaサーバーの数を制御します。有効な値の範囲は、
min.insync.replicasからKafkaブローカーの数までです。 - パーティション番号:トピックに存在するパーティションの数を制御します。有効な値の範囲は
[1, 10 * the number of Kafka brokers]です。
- レプリケーション係数:各KafkaメッセージがレプリケートされるKafkaサーバーの数を制御します。有効な値の範囲は、
[イベントの分割]エリアで、
UPDATEイベントを別々のDELETEとINSERTイベントに分割するか、生のUPDATEイベントとして保持するかを選択します。詳細については、 MySQL以外のシンクにおける、主キーまたは一意キーを分割したUPDATEイベント参照してください。「次へ」をクリックしてください。
ステップ4.変更フィードを確認して作成する
- 変更フィード名欄に、変更フィードの名前を指定します。
- 設定した変更フィードの設定をすべて確認してください。必要に応じて「戻る」をクリックして変更してください。
- すべての設定が正しければ、 「送信」をクリックして変更フィードを作成します。