Apache Kafkaへのシンク
このドキュメントでは、TiDB CloudからApache Kafkaへデータをストリーミングするためのチェンジフィードを作成する方法について説明します。
注記:
変更フィード機能を使用するには、 TiDB Cloud Dedicatedクラスタのバージョンがv6.1.3以降であることを確認してください。
制限
- TiDB Cloud Dedicatedクラスターごとに、最大 100 個の変更フィードを作成できます。
- 現在、 TiDB Cloudは、Kafkaブローカーに接続するための自己署名TLS証明書のアップロードをサポートしていません。
- TiDB Cloud はTiCDC を使用して変更フィードを確立するため、同じTiCDCの制限があります。
- 複製対象のテーブルに主キーまたはNULLを許容しない一意インデックスがない場合、複製中に一意制約が存在しないことで、一部の再試行シナリオにおいて、下流で重複データが挿入される可能性があります。
ネットワーク接続方法としてプライベートリンクまたはプライベートサービスコネクトを選択する場合は、 TiDB Cloud Dedicatedクラスタのバージョンが以下の要件を満たしていることを確認してください。
- v6.5.xの場合:バージョンv6.5.9以降
- v7.1.xの場合:バージョンv7.1.4以降
- v7.5.xの場合:バージョンv7.5.1以降
- v8.1.xの場合:v8.1.x以降のすべてのバージョンがサポートされています。
データ形式としてDebeziumを使用する場合は、 TiDB Cloud Dedicatedクラスターのバージョンがv8.1.0以降であることを確認してください。
Kafkaメッセージのパーティション分散については、以下の点に注意してください。
- 変更ログをプライマリキーまたはインデックス値に基づいて、指定したインデックス名を持つ Kafka パーティションに分散する場合は、 TiDB Cloud Dedicatedクラスターのバージョンが v7.5.0 以降であることを確認してください。
- 変更ログを列値ごとにKafkaパーティションに分散する場合は、 TiDB Cloud Dedicatedクラスタのバージョンがv7.5.0以降であることを確認してください。
前提条件
Apache Kafkaにデータをストリーミングするためのチェンジフィードを作成する前に、以下の前提条件を満たす必要があります。
- ネットワーク接続を設定する
- Kafka ACL認証の権限を追加する
ネットワーク
TiDB Cloud DedicatedクラスターApache Kafka サービスに接続できることを確認します。次の接続方法のいずれかを選択できます。- プライベート接続: VPC CIDR の競合を回避し、セキュリティ コンプライアンスを満たすのに最適ですが、 プライベートデータリンクのコストが発生します。
- VPCピアリング:費用対効果の高い選択肢として適していますが、潜在的なVPC CIDRの競合やセキュリティ上の考慮事項を管理する必要があります。
- パブリックIP:迅速なセットアップに適しています。
プライベートコネクトは、クラウドプロバイダーのプライベートリンクまたはプライベートサービスコネクト技術を活用し、VPC内のリソースがプライベートIPアドレスを使用して他のVPC内のサービスに接続できるようにします。これにより、あたかもそれらのサービスがVPC内で直接ホストされているかのように動作します。
TiDB Cloud は現在、セルフホスト型 Kafka のプライベート接続のみをサポートしています。 MSK、Confluent Kafka、またはその他の Kafka SaaS サービスとの直接統合はサポートされていません。 Private Connect 経由でこれらの Kafka SaaS サービスに接続するには、 kafka-proxy仲介としてデプロイし、Kafka サービスを自己ホスト型 Kafka として効果的に公開できます。詳細な例については、 Google Cloud で Kafka-proxy を使用して自己ホスト型 Kafka プライベートサービス接続を設定する参照してください。この設定は、すべての Kafka SaaS サービスで同様です。
- Apache Kafka サービスが AWS でホストされている場合は、 AWSでセルフホスト型のKafkaプライベートリンクサービスをセットアップするセットアップする」に従ってネットワーク接続を構成し、ブートストラップ ポート情報を取得します。次にChangefeeds用のプライベートエンドポイントを設定するポイントを設定する」に従ってプライベート エンドポイントを作成します。
- Apache Kafka サービスが Google Cloud でホストされている場合は、 Google Cloud でセルフホスト型の Kafka プライベートサービスコネクトを設定するネットワーク接続を構成し、ブートストラップ ポート情報を取得します。次にChangefeeds用のプライベートエンドポイントを設定するポイントを設定するに従ってプライベート エンドポイントを作成します。
- Apache Kafka サービスが Azure でホストされている場合は、 Azureでセルフホスト型Kafkaプライベートリンクサービスをセットアップするセットアップする」に従ってネットワーク接続を構成し、ブートストラップ ポート情報を取得してから、Changefeeds用のプライベートエンドポイントを設定するエンドプライベートポイントを設定するに従ってプライベート エンドポイントを作成します。
Apache KafkaサービスがインターネットにアクセスできないAWS VPC内にある場合は、以下の手順を実行してください。
Apache Kafka サービスの VPC とTiDB Cloud Dedicatedクラスターの間でVPCピアリング接続を設定する。
Apache Kafkaサービスが関連付けられているセキュリティグループの受信ルールを変更します。
TiDB Cloud Dedicatedクラスターが配置されているリージョンの CIDR を受信ルールに追加する必要があります。CIDR はVPC ピアリングページで確認できます。これにより、TiDB Cloud Dedicatedクラスターから Kafka ブローカーへのトラフィックが流れるようになります。
Apache KafkaのURLにホスト名が含まれている場合、 TiDB CloudがApache KafkaブローカーのDNSホスト名を解決できるようにする必要があります。
- VPCピアリング接続のDNS解決を有効にするの手順に従います。
- アクセプターDNS解決オプションを有効にする。
Apache Kafka サービスがインターネットにアクセスできない Google Cloud VPC 内にある場合は、以下の手順を実行してください。
Apache Kafka サービスの VPC とTiDB Cloud Dedicatedクラスターの間でVPCピアリング接続を設定する。
Apache Kafkaが配置されているVPCのイングレスファイアウォールルールを変更します。
TiDB Cloud Dedicatedクラスターが配置されているリージョンの CIDR を、イングレス ファイアウォール ルールに追加する必要があります。CIDR は、VPC ピアリングページで確認できます。これにより、TiDB Cloud Dedicatedクラスターから Kafka ブローカーへのトラフィックが流れるようになります。
Apache KafkaサービスにパブリックIPアクセスを提供する場合は、すべてのKafkaブローカーにパブリックIPアドレスを割り当ててください。
本番環境でパブリックIPを使用することは推奨されません。
Kafka ACL認証
TiDB Cloudの変更フィードがデータをApache Kafkaにストリーミングし、Kafkaトピックを自動的に作成できるようにするには、Kafkaに以下の権限が追加されていることを確認してください。
- Kafka のトピック リソース タイプに
CreateおよびWrite権限が追加されます。 - Kafka のクラスタ リソース タイプに
DescribeConfigs権限が追加されます。
たとえば、Kafka クラスターが Confluent Cloud にある場合、詳細については Confluent ドキュメントのリソースとACLの追加参照してください。
ステップ1. Apache KafkaのChangefeedページを開きます。
- TiDB Cloudコンソールにログインします。
- ターゲットのTiDB Cloud Dedicatedクラスターの概要ページに移動し、左側のナビゲーション ペインで[データ] > [変更フィード]をクリックします。
- 「変更フィードの作成」をクリックし、宛先として「Kafka」を選択します。
ステップ2. changefeedターゲットを設定する
手順は、選択した接続方法によって異なります。
接続方法で「VPCピアリング」または「パブリックIP」を選択し、Kafkaブローカーのエンドポイントを入力します。複数のエンドポイントはカンマ
,で区切ることができます。Kafkaの認証設定に応じて、認証オプションを選択してください。
- Kafkaで認証が必要ない場合は、デフォルトオプションの「無効」のままにしてください。
- Kafkaで認証が必要な場合は、該当する認証タイプを選択し、認証に使用するKafkaアカウントのユーザー名とパスワードを入力してください。
Kafkaのバージョンを選択してください。どのバージョンを使用すればよいかわからない場合は、 Kafka v2を使用してください。
この変更フィード内のデータの圧縮タイプを選択してください。
KafkaでTLS暗号化が有効になっており、Kafka接続にTLS暗号化を使用する場合は、TLS暗号化オプションを有効にしてください。
「次へ」をクリックしてネットワーク接続をテストしてください。テストが成功すると、次のページに移動します。
接続方法で「プライベートリンク」を選択します。
「プライベートエンドポイント」で、ネットワークセクションで作成したプライベートエンドポイントを選択します。プライベートエンドポイントのAZがKafkaデプロイメントのAZと一致していることを確認してください。
ネットワークセクションで取得したブートストラップポートを入力してください。1つのAZにつき少なくとも1つのポートを設定することをお勧めします。複数のポートを指定する場合は、カンマ
,で区切ってください。Kafkaの認証設定に応じて、認証オプションを選択してください。
- Kafkaで認証が必要ない場合は、デフォルトオプションの「無効」のままにしてください。
- Kafkaで認証が必要な場合は、該当する認証タイプを選択し、認証に使用するKafkaアカウントのユーザー名とパスワードを入力してください。
Kafkaのバージョンを選択してください。どのバージョンを使用すればよいかわからない場合は、 Kafka v2を使用してください。
この変更フィード内のデータの圧縮タイプを選択してください。
KafkaでTLS暗号化が有効になっており、Kafka接続にTLS暗号化を使用する場合は、TLS暗号化オプションを有効にしてください。
「次へ」をクリックしてネットワーク接続をテストしてください。テストが成功すると、次のページに移動します。
接続方法で「プライベートサービス接続」を選択します。
[プライベート エンドポイント]で、ネットワークセクションで作成したプライベート エンドポイントを選択します。
ネットワークセクションで取得したブートストラップポートを入力してください。複数のポートを指定することをお勧めします。複数のポートを区切るには、カンマ
,を使用できます。Kafkaの認証設定に応じて、認証オプションを選択してください。
- Kafkaで認証が必要ない場合は、デフォルトオプションの「無効」のままにしてください。
- Kafkaで認証が必要な場合は、該当する認証タイプを選択し、認証に使用するKafkaアカウントのユーザー名とパスワードを入力してください。
Kafkaのバージョンを選択してください。どのバージョンを使用すればよいかわからない場合は、 Kafka v2を使用してください。
この変更フィード内のデータの圧縮タイプを選択してください。
KafkaでTLS暗号化が有効になっており、Kafka接続にTLS暗号化を使用する場合は、TLS暗号化オプションを有効にしてください。
「次へ」をクリックしてネットワーク接続をテストしてください。テストが成功すると、次のページに移動します。
TiDB Cloudはプライベートサービス接続用のエンドポイントを作成しますが、これには数分かかる場合があります。
エンドポイントが作成されたら、クラウドプロバイダーのコンソールにログインし、接続要求を承認してください。
TiDB Cloudコンソールに戻る 接続要求を承認したことを確認してください。TiDB Cloudは接続テストを実行し、テストが成功した場合は次のページに進みます。
接続方法で「プライベートリンク」を選択します。
[プライベート エンドポイント]で、ネットワークセクションで作成したプライベート エンドポイントを選択します。
ネットワークセクションで取得したブートストラップポートを入力してください。1つのAZにつき少なくとも1つのポートを設定することをお勧めします。複数のポートを指定する場合は、カンマ
,で区切ってください。Kafkaの認証設定に応じて、認証オプションを選択してください。
- Kafkaで認証が必要ない場合は、デフォルトオプションの「無効」のままにしてください。
- Kafkaで認証が必要な場合は、該当する認証タイプを選択し、認証に使用するKafkaアカウントのユーザー名とパスワードを入力してください。
Kafkaのバージョンを選択してください。どのバージョンを使用すればよいかわからない場合は、 Kafka v2を使用してください。
この変更フィード内のデータの圧縮タイプを選択してください。
KafkaでTLS暗号化が有効になっており、Kafka接続にTLS暗号化を使用する場合は、TLS暗号化オプションを有効にしてください。
「次へ」をクリックしてネットワーク接続をテストしてください。テストが成功すると、次のページに移動します。
TiDB Cloudはプライベートリンクのエンドポイントを作成しますが、これには数分かかる場合があります。
エンドポイントが作成されたら、 Azureポータルにログインして接続要求を承認してください。
TiDB Cloudコンソールに戻る 接続要求を承認したことを確認してください。TiDB Cloudは接続テストを実行し、テストが成功した場合は次のページに進みます。
ステップ3.チェンジフィードを設定する
テーブル フィルターをカスタマイズして、複製するテーブルをフィルターします。ルールの構文については、テーブルフィルタルールを参照してください。
- 大文字小文字の区別:フィルタルールにおけるデータベース名とテーブル名の照合において、大文字小文字を区別するかどうかを設定できます。デフォルトでは、大文字小文字は区別されません。
- フィルタルール:この列でフィルタルールを設定できます。デフォルトでは、すべてのテーブルを複製するルール
*.*が設定されています。新しいルールを追加すると、 TiDB Cloud はTiDB 内のすべてのテーブルをクエリし、右側のボックスにルールに一致するテーブルのみを表示します。フィルタルールは最大 100 個まで追加できます。 - 有効なキーを持つテーブル:この列には、主キーや一意インデックスなど、有効なキーを持つテーブルが表示されます。
- 有効なキーのないテーブル: この列には、主キーまたは一意キーがないテーブルが表示されます。一意の識別子がないと、ダウンストリームが重複イベントを処理する際にデータの一貫性が失われる可能性があるため、これらのテーブルはレプリケーション中に問題となります。データの一貫性を確保するには、レプリケーションを開始する前に、これらのテーブルに一意キーまたは主キーを追加することをお勧めします。または、フィルタルールを追加してこれらのテーブルを除外することもできます。たとえば、ルール
test.tbl1を使用して、テーブル"!test.tbl1"除外できます。
イベントフィルターをカスタマイズして、複製したいイベントを絞り込みます。
- 一致するテーブル:この列では、イベントフィルターを適用するテーブルを設定できます。ルールの構文は、前のテーブルフィルター領域で使用されているものと同じです。変更フィードごとに最大10個のイベントフィルタールールを追加できます。
- イベントフィルター:以下のイベントフィルターを使用して、変更フィードから特定のイベントを除外できます。
- イベントを無視する:指定されたイベントタイプを除外します。
- SQL を無視: 指定された式に一致する DDL イベントを除外します。たとえば、
^dropDROPで始まるステートメントを除外し、add columnはADD COLUMNを含むステートメントを除外します。 - 挿入値の式を無視する: 特定の条件を満たす
INSERTステートメントを除外します。たとえば、id >= 100は、INSERTが 100 以上であるidステートメントを除外します。 - 新しい値の更新式を無視する: 新しい値が指定された条件に一致する
UPDATEステートメントを除外します。たとえば、gender = 'male'はgenderがmaleになるような更新を除外します。 - 古い値の更新を無視する式: 古い値が指定された条件に一致する
UPDATEステートメントを除外します。たとえば、age < 18ageの古い値が 18 未満である場合の更新を除外します。 - 削除値式を無視する: 指定された条件を満たす
DELETEステートメントを除外します。たとえば、name = 'john'はDELETEがnameである'john'ステートメントを除外します。
カラムセレクタをカスタマイズして、イベントから列を選択し、選択した列に関連するデータ変更のみを下流に送信します。
- 一致するテーブル:列セレクターを適用するテーブルを指定します。どのルールにも一致しないテーブルの場合、すべての列が送信されます。
- カラムセレクター:一致したテーブルのどの列をダウンストリームに送信するかを指定します。
マッチングルールの詳細については、 カラムセレクターを参照してください。
データフォーマット領域で、希望するKafkaメッセージのフォーマットを選択してください。
- Avroは、コンパクトで高速なバイナリデータフォーマットであり、豊富なデータ構造を備え、様々なフローシステムで広く利用されています。詳細については、 Avroデータ形式参照してください。
- Canal-JSONは、解析が容易なプレーンなJSONテキスト形式です。詳細については、 Canal-JSONデータ形式参照してください。
- オープン プロトコルは、監視、キャッシュ、全文インデックス作成、分析エンジン、および異なるデータベース間のプライマリとセカンダリのレプリケーションのためのデータ ソースを提供する行レベルのデータ変更通知プロトコルです。詳細については、 オープンプロトコルデータフォーマットを参照してください。
- Debeziumは、データベースの変更をキャプチャするためのツールです。キャプチャされた各データベース変更を「イベント」と呼ばれるメッセージに変換し、これらのイベントをKafkaに送信します。詳細については、 Debeziumデータ形式参照してください。
TiDB拡張フィールドをKafkaメッセージ本文に追加する場合は、 TiDB拡張オプションを有効にしてください。
TiDB 拡張フィールドの詳細については、 Avroデータ形式のTiDB拡張フィールドフィールド」およびCanal-JSONデータ形式のTiDB拡張フィールド参照してください。
データ形式としてAvroを選択すると、ページにAvro固有の設定項目が表示されます。これらの設定項目は、以下のように入力できます。
- 「Decimal」および「Unsigned BigInt」の設定では、 TiDB CloudがKafkaメッセージ内のdecimal型およびunsigned bigint型データ型をどのように処理するかを指定します。
- スキーマレジストリ領域で、スキーマレジストリエンドポイントを入力します。HTTP認証を有効にすると、ユーザー名とパスワードのフィールドが表示され、 TiDB Cloud Dedicatedクラスターのエンドポイントとパスワードが自動的に入力されます。TiDB 。
トピック配信エリアで配信モードを選択し、選択したモードに応じてトピック名の設定を入力します。
データ形式としてAvroを選択した場合、 [配信モード]ドロップダウンリストでは、[テーブルごとの変更ログをKafkaトピックに配信する]モードのみを選択できます。
配信モードは、変更フィードがKafkaトピックを作成する方法を制御します。テーブルごと、データベースごと、またはすべての変更ログに対して1つのトピックを作成するかを選択できます。
テーブルごとに変更ログをKafkaトピックに配信する
変更フィードでテーブルごとに専用の Kafka トピックを作成する場合は、このモードを選択します。そうすると、テーブルのすべての Kafka メッセージが専用の Kafka トピックに送信されます。トピックのプレフィックス、データベース名とテーブル名の間の区切り文字、およびサフィックスを設定することで、テーブルのトピック名をカスタマイズできます。たとえば、区切り文字を
_に設定すると、トピック名は<Prefix><DatabaseName>_<TableName><Suffix>の形式になります。スキーマ作成イベントなど、行以外のイベントの変更ログについては、 「デフォルトトピック名」フィールドにトピック名を指定できます。変更フィードは、指定されたトピックに基づいて、これらの変更ログを収集するためのトピックを作成します。
データベースごとに変更ログをKafkaトピックに配信する
変更フィードでデータベースごとに専用のKafkaトピックを作成する場合は、このモードを選択してください。そうすると、データベースのすべてのKafkaメッセージが専用のKafkaトピックに送信されます。トピックのプレフィックスとサフィックスを設定することで、データベースのトピック名をカスタマイズできます。
解決済みTsイベントなど、行以外のイベントの変更ログについては、 「デフォルトトピック名」フィールドにトピック名を指定できます。変更フィードは、指定されたトピックに基づいて、これらの変更ログを収集するためのトピックを作成します。
すべての変更ログを、指定された1つのKafkaトピックに送信する
変更フィードで全ての変更ログに対して1つのKafkaトピックを作成する場合は、このモードを選択してください。そうすると、変更フィード内のすべてのKafkaメッセージが1つのKafkaトピックに送信されます。トピック名は「トピック名」フィールドで指定できます。
パーティション分散領域では、Kafka メッセージの送信先パーティションを決定できます。すべてのテーブルに対して単一のパーティションディスパッチャを定義することも、テーブルごとに異なるパーティションディスパッチャを定義することもできます。TiDB Cloud、次の 4 種類のディスパッチャが提供されています。
プライマリキーまたはインデックス値に基づいて変更ログをKafkaパーティションに分散します。
変更フィードを使用してテーブルの Kafka メッセージを異なるパーティションに送信する場合は、この分散方法を選択してください。行の変更ログの主キーまたはインデックス値によって、変更ログが送信されるパーティションが決まります。この分散方法は、パーティションのバランスを改善し、行レベルの順序性を確保します。
変更ログをテーブルごとにKafkaパーティションに分散する
変更フィードによってテーブルの Kafka メッセージを 1 つの Kafka パーティションに送信する場合は、この分散方法を選択してください。行の変更ログのテーブル名によって、変更ログが送信されるパーティションが決まります。この分散方法はテーブルの順序性を保証しますが、パーティションのバランスが崩れる可能性があります。
変更ログをタイムスタンプに基づいてKafkaパーティションに分散する
変更フィードが Kafka メッセージをランダムに異なる Kafka パーティションに送信するようにするには、この分散方法を選択してください。行の変更ログの commitTs によって、変更ログが送信されるパーティションが決まります。この分散方法は、パーティションのバランスを改善し、各パーティションの順序性を確保します。ただし、データ項目の複数の変更が異なるパーティションに送信され、異なるコンシューマーのコンシューマー処理の進行状況が異なる場合があり、データの不整合が発生する可能性があります。そのため、コンシューマーは、複数のパーティションからのデータを消費する前に commitTs でソートする必要があります。
変更ログを列の値に基づいてKafkaパーティションに分散する
テーブルのKafkaメッセージを複数のパーティションに送信するように変更フィードを設定したい場合は、この配信方法を選択してください。行の変更ログで指定された列の値によって、変更ログの送信先パーティションが決まります。この配信方法では、各パーティション内の順序が確保され、同じ列の値を持つ変更ログが同じパーティションに送信されることが保証されます。
トピックコンフィグレーション領域で、以下の数値を設定してください。changefeedは、これらの数値に基づいてKafkaトピックを自動的に作成します。
- レプリケーション係数:各KafkaメッセージがレプリケートされるKafkaサーバーの数を制御します。有効な値の範囲は、
min.insync.replicasからKafkaブローカーの数までです。 - パーティション番号:トピックに存在するパーティションの数を制御します。有効な値の範囲は
[1, 10 * the number of Kafka brokers]です。
- レプリケーション係数:各KafkaメッセージがレプリケートされるKafkaサーバーの数を制御します。有効な値の範囲は、
[イベントの分割]エリアで、
UPDATEイベントを別々のDELETEとINSERTイベントに分割するか、生のUPDATEイベントとして保持するかを選択します。詳細については、 MySQL以外のシンクにおける、主キーまたは一意キーを分割したUPDATEイベント参照してください。「次へ」をクリックしてください。
ステップ4. 変更フィード仕様を設定します
- 「チェンジフィードの仕様」領域で、チェンジフィードで使用する複製容量単位(RCU)チェンジフィードの数を指定します。
- 変更フィード名欄に、変更フィードの名前を指定します。
- 「次へ」をクリックして、設定した内容を確認し、次のページへ進んでください。
ステップ5.設定内容を確認する
このページでは、設定したすべての変更フィード設定を確認できます。
エラーが見つかった場合は、戻って修正できます。エラーがない場合は、下部のチェックボックスをクリックし、 「作成」をクリックして変更フィードを作成します。