Apache Kafka にシンクする

このドキュメントでは、 TiDB Cloudから Apache Kafka にデータをストリーミングするための変更フィードを作成する方法について説明します。

注記:

  • changefeed 機能を使用するには、 TiDB Cloud Dedicated クラスターのバージョンが v6.1.3 以降であることを確認してください。
  • TiDB Cloudサーバーレス クラスター場合、changefeed 機能は使用できません。

制限

  • TiDB Cloudクラスターごとに、最大 100 個の変更フィードを作成できます。

  • 現在、 TiDB Cloud は、 Kafka ブローカーに接続するための自己署名 TLS 証明書のアップロードをサポートしていません。

  • TiDB Cloud は変更フィードを確立するために TiCDC を使用するため、同じTiCDCとしての制限持ちます。

  • 複製するテーブルに主キーまたは null 以外の一意のインデックスがない場合、複製中に一意の制約がないと、再試行シナリオによっては下流に重複したデータが挿入される可能性があります。

  • ネットワーク接続方法として Private Link または Private Service Connect を選択する場合は、TiDB クラスターのバージョンが次の要件を満たしていることを確認してください。

    • v6.5.xの場合: バージョンv6.5.9以降
    • v7.1.xの場合: バージョンv7.1.4以降
    • v7.5.xの場合: バージョンv7.5.1以降
    • v8.1.xの場合: v8.1.x以降のすべてのバージョンがサポートされています
  • データ形式として Debezium を使用する場合は、TiDB クラスターのバージョンが v8.1.0 以降であることを確認してください。

  • Kafka メッセージのパーティション分散については、次の点に注意してください。

    • 指定されたインデックス名を持つ Kafka パーティションに主キーまたはインデックス値による変更ログを配布する場合は、TiDB クラスターのバージョンが v7.5.0 以降であることを確認してください。
    • 列値ごとに変更ログを Kafka パーティションに配布する場合は、TiDB クラスターのバージョンが v7.5.0 以降であることを確認してください。

前提条件

Apache Kafka にデータをストリーミングするための変更フィードを作成する前に、次の前提条件を完了する必要があります。

  • ネットワーク接続を設定する
  • Kafka ACL 認証の権限を追加する

ネットワーク

TiDB クラスターが Apache Kafka サービスに接続できることを確認します。次のいずれかの接続方法を選択できます。

  • Private Connect (ベータ版): VPC CIDR の競合を回避し、セキュリティ コンプライアンスを満たすのに最適ですが、追加のプライベートデータリンクコストが発生します。
  • VPC ピアリング: コスト効率の高いオプションとして適していますが、潜在的な VPC CIDR の競合とセキュリティ上の考慮事項を管理する必要があります。
  • パブリック IP: 迅速なセットアップに適しています。
  • Private Connect (Beta)
  • VPC Peering
  • Public IP

Private Connect は、クラウド プロバイダーのPrivate LinkまたはPrivate Service Connectテクノロジーを活用して、VPC 内のリソースがプライベート IP アドレスを使用して他の VPC 内のサービスに接続できるようにします。その場合、それらのサービスが VPC 内で直接ホストされているかのようになります。

TiDB Cloud は現在、セルフホスト型 Kafka のプライベート接続のみをサポートしています。MSK、Confluent Kafka、またはその他の Kafka SaaS サービスとの直接統合はサポートしていません。プライベート接続を介してこれらの Kafka SaaS サービスに接続するには、 kafka プロキシを仲介としてデプロイし、Kafka サービスをセルフホスト型 Kafka として効果的に公開します。詳細な例については、 Google Cloud で Kafka-proxy を使用してセルフホスト型 Kafka Private Service Connect を設定する参照してください。この設定は、すべての Kafka SaaS サービスで同様です。

  • Apache Kafka サービスが AWS でホストされている場合は、 AWS でセルフホスト型 Kafka プライベートリンク サービスをセットアップするに従って、ネットワーク接続が適切に構成されていることを確認します。セットアップ後、 TiDB Cloudコンソールで次の情報を入力して、変更フィードを作成します。

    • Kafka アドバタイズド リスナー パターンの ID
    • エンドポイントサービス名
    • ブートストラップポート
  • Apache Kafka サービスが Google Cloud でホストされている場合は、 Google Cloud でセルフホスト型 Kafka プライベート サービス接続を設定するに従って、ネットワーク接続が適切に構成されていることを確認します。セットアップ後、 TiDB Cloudコンソールで次の情報を指定して、変更フィードを作成します。

    • Kafka アドバタイズド リスナー パターンの ID
    • サービスアタッチメント
    • ブートストラップポート

Apache Kafka サービスがインターネットにアクセスできない AWS VPC にある場合は、次の手順を実行します。

  1. Apache Kafka サービスの VPC と TiDB クラスター間の接続はVPCピアリング接続を設定する

  2. Apache Kafka サービスが関連付けられているセキュリティ グループの受信ルールを変更します。

    TiDB Cloudクラスターが配置されているリージョンの CIDR をインバウンド ルールに追加する必要があります。CIDR はVPC ピアリングページで確認できます。これにより、トラフィックが TiDB クラスターから Kafka ブローカーに流れるようになります。

  3. Apache Kafka URL にホスト名が含まれている場合は、 TiDB Cloud がApache Kafka ブローカーの DNS ホスト名を解決できるようにする必要があります。

    1. VPC ピアリング接続の DNS 解決を有効にするの手順に従います。
    2. Accepter DNS 解決オプションを有効にします。

Apache Kafka サービスがインターネットにアクセスできない Google Cloud VPC にある場合は、次の手順に従います。

  1. Apache Kafka サービスの VPC と TiDB クラスター間の接続はVPCピアリング接続を設定する

  2. Apache Kafka が配置されている VPC の Ingress ファイアウォール ルールを変更します。

    TiDB Cloudクラスターが配置されているリージョンの CIDR を、イングレス ファイアウォール ルールに追加する必要があります。CIDR は、 VPC ピアリングページで確認できます。これにより、トラフィックが TiDB クラスターから Kafka ブローカーに流れるようになります。

Apache Kafka サービスにパブリック IP アクセスを提供する場合は、すべての Kafka ブローカーにパブリック IP アドレスを割り当てます。

本番環境でパブリック IP を使用することはお勧めしませ

Kafka ACL 認証

TiDB Cloud の変更フィードが Apache Kafka にデータをストリーミングし、Kafka トピックを自動的に作成できるようにするには、Kafka に次の権限が追加されていることを確認します。

  • Kafka のトピック リソース タイプにCreateおよびWrite権限が追加されます。
  • Kafka のクラスター リソース タイプにDescribeConfigs権限が追加されます。

たとえば、Kafka クラスターが Confluent Cloud にある場合、詳細については Confluent ドキュメントのリソースACLの追加参照してください。

ステップ1. Apache Kafkaのchangefeedページを開く

  1. TiDB Cloudコンソールにログインします。
  2. ターゲット TiDB クラスターのクラスター概要ページに移動し、左側のナビゲーション ペインで[Changefeed]クリックします。
  3. 「Changefeed の作成」をクリックし、ターゲット タイプとしてKafka を選択します。

ステップ2. changefeedターゲットを構成する

手順は、選択した接続方法によって異なります。

  • VPC Peering or Public IP
  • Private Link
  • Private Service Connect
  1. 接続方法で、 VPC ピアリングまたはパブリック IPを選択し、Kafka ブローカーのエンドポイントを入力します。複数のエンドポイントを区切るには、コンマ,使用できます。

  2. Kafka 認証構成に応じて認証オプションを選択します。

    • Kafka で認証が不要な場合は、デフォルトのオプション「Disable」のままにします。
    • Kafka に認証が必要な場合は、対応する認証タイプを選択し、認証用の Kafka アカウントのユーザー名パスワードを入力します。
  3. Kafka のバージョンを選択します。どれを使用すればよいかわからない場合は、 Kafka v2 を使用してください。

  4. この変更フィード内のデータの圧縮タイプを選択します。

  5. Kafka で TLS 暗号化が有効になっていて、Kafka 接続に TLS 暗号化を使用する場合は、 TLS 暗号化オプションを有効にします。

  6. 「次へ」をクリックして、ネットワーク接続をテストします。テストが成功すると、次のページに移動します。

  1. [接続方法]で、 [プライベート リンク]を選択します。

  2. エンドポイント サービスのエンドポイントを作成するには、 TiDB CloudのAWS プリンシパルを承認します。AWS プリンシパルは、Web ページのヒントに提供されています。

  3. ネットワークセクションでAWS でセルフホスト型 Kafka プライベートリンク サービスをセットアップする選択するときに、必ずKafka デプロイメントの**AZ の数と AZ ID を同じに選択し、 Kafka アドバタイズ リスナー パターン**に同じ一意の ID を入力してください。

  4. AWS でセルフホスト型 Kafka プライベートリンク サービスをセットアップするで設定したエンドポイント サービス名を入力します。

  5. ブートストラップ ポートを入力します。1 つの AZ に少なくとも 1 つのポートを設定することをお勧めします。複数のポートを区切るには、コンマ,使用できます。

  6. Kafka 認証構成に応じて認証オプションを選択します。

    • Kafka で認証が不要な場合は、デフォルトのオプション「Disable」のままにします。
    • Kafka に認証が必要な場合は、対応する認証タイプを選択し、認証用の Kafka アカウントのユーザー名パスワードを入力します。
  7. Kafka のバージョンを選択します。どれを使用すればよいかわからない場合は、 Kafka v2 を使用してください。

  8. この変更フィード内のデータの圧縮タイプを選択します。

  9. Kafka で TLS 暗号化が有効になっていて、Kafka 接続に TLS 暗号化を使用する場合は、 TLS 暗号化オプションを有効にします。

  10. 「次へ」をクリックして、ネットワーク接続をテストします。テストが成功すると、次のページに移動します。

  11. TiDB Cloud はPrivate Linkのエンドポイントを作成しますが、これには数分かかる場合があります。

  12. エンドポイントが作成されたら、クラウド プロバイダー コンソールにログインし、接続要求を承認します。

  13. TiDB Cloudコンソールに戻り、接続要求を受け入れたことを確認します。TiDB TiDB Cloud は接続をテストし、テストが成功すると次のページに進みます。

  1. 接続方法で、プライベート サービス接続を選択します。

  2. ネットワークセクションでGoogle Cloud でセルフホスト型 Kafka プライベート サービス接続を設定する入力するときに、 Kafka アドバタイズ リスナー パターンに同じ一意の ID を入力するようにしてください。

  3. Google Cloud でセルフホスト型 Kafka プライベート サービス接続をセットアップするで設定したサービスアタッチメントを入力します。

  4. ブートストラップ ポートを入力します。複数のポートを指定することをお勧めします。複数のポートを区切るには、コンマ,を使用できます。

  5. Kafka 認証構成に応じて認証オプションを選択します。

    • Kafka で認証が不要な場合は、デフォルトのオプション「Disable」のままにします。
    • Kafka に認証が必要な場合は、対応する認証タイプを選択し、認証用の Kafka アカウントのユーザー名パスワードを入力します。
  6. Kafka のバージョンを選択します。どれを使用すればよいかわからない場合は、 Kafka v2 を使用してください。

  7. この変更フィード内のデータの圧縮タイプを選択します。

  8. Kafka で TLS 暗号化が有効になっていて、Kafka 接続に TLS 暗号化を使用する場合は、 TLS 暗号化オプションを有効にします。

  9. 「次へ」をクリックして、ネットワーク接続をテストします。テストが成功すると、次のページに移動します。

  10. TiDB Cloud はPrivate Service Connectのエンドポイントを作成します。これには数分かかる場合があります。

  11. エンドポイントが作成されたら、クラウド プロバイダー コンソールにログインし、接続要求を承認します。

  12. TiDB Cloudコンソールに戻り、接続要求を受け入れたことを確認します。TiDB TiDB Cloud は接続をテストし、テストが成功すると次のページに進みます。

ステップ3. チェンジフィードを設定する

  1. テーブル フィルターをカスタマイズして、複製するテーブルをフィルターします。ルール構文については、 テーブルフィルタルールを参照してください。

    • フィルター ルール: この列でフィルター ルールを設定できます。デフォルトでは、すべてのテーブルを複製するルール*.*があります。新しいルールを追加すると、 TiDB Cloud はTiDB 内のすべてのテーブルを照会し、右側のボックスにルールに一致するテーブルのみを表示します。最大 100 個のフィルター ルールを追加できます。
    • 有効なキーを持つテーブル: この列には、主キーや一意のインデックスなど、有効なキーを持つテーブルが表示されます。
    • 有効なキーのないテーブル: この列には、主キーまたは一意のキーがないテーブルが表示されます。これらのテーブルは、一意の識別子がないと、ダウンストリームが重複イベントを処理するときにデータの一貫性がなくなる可能性があるため、レプリケーション中に問題が発生します。データの一貫性を確保するには、レプリケーションを開始する前に、これらのテーブルに一意のキーまたは主キーを追加することをお勧めします。または、フィルター ルールを追加して、これらのテーブルを除外することもできます。たとえば、ルール"!test.tbl1"使用してテーブルtest.tbl1を除外できます。
  2. イベント フィルターをカスタマイズして、複製するイベントをフィルターします。

    • 一致するテーブル: この列で、イベント フィルターを適用するテーブルを設定できます。ルールの構文は、前のテーブル フィルター領域で使用した構文と同じです。変更フィードごとに最大 10 個のイベント フィルター ルールを追加できます。
    • 無視されるイベント: イベント フィルターが変更フィードから除外するイベントの種類を設定できます。
  3. 「データ形式」領域で、Kafka メッセージの希望する形式を選択します。

    • Avro は、豊富なデータ構造を備えたコンパクトで高速なバイナリ データ形式で、さまざまなフロー システムで広く使用されています。詳細については、 Avroデータ形式参照してください。
    • Canal-JSON は解析しやすいプレーンな JSON テキスト形式です。詳細については、 Canal-JSON データ形式参照してください。
    • Open Protocol は、監視、キャッシュ、フルテキスト インデックス、分析エンジン、および異なるデータベース間のプライマリ/セカンダリ レプリケーション用のデータ ソースを提供する行レベルのデータ変更通知プロトコルです。詳細については、 オープンプロトコルデータ形式参照してください。
    • Debezium は、データベースの変更をキャプチャするためのツールです。キャプチャされた各データベースの変更を「イベント」と呼ばれるメッセージに変換し、これらのイベントを Kafka に送信します。詳細については、 Debezium データ形式参照してください。
  4. TiDB 拡張フィールドを Kafka メッセージ本文に追加する場合は、 TiDB 拡張オプションを有効にします。

    TiDB拡張フィールドの詳細については、 Avro データ形式の TiDB 拡張フィールドおよびCanal-JSON データ形式の TiDB 拡張フィールド参照してください。

  5. データ形式としてAvro を選択した場合、ページに Avro 固有の構成がいくつか表示されます。これらの構成は次のように入力できます。

    • DecimalおよびUnsigned BigInt構成では、 TiDB Cloud がKafka メッセージ内の Decimal および Unsigned Bigint データ型を処理する方法を指定します。
    • スキーマ レジストリ領域で、スキーマ レジストリ エンドポイントを入力します。HTTP認証を有効にすると、ユーザー名とパスワードのフィールドが表示され、TiDB クラスターのエンドポイントとパスワードが自動的に入力されます。
  6. トピック配布」領域で配布モードを選択し、モードに応じてトピック名の設定を入力します。

    データ形式としてAvro を選択した場合は、 「配布モード」ドロップダウン リストで「変更ログをテーブルごとに Kafka トピックに配布」モードのみを選択できます。

    配布モードは、変更フィードが Kafka トピックをテーブル別、データベース別、またはすべての変更ログに対して 1 つのトピックを作成する方法を制御します。

    • テーブルごとに変更ログを Kafka Topics に配布する

      変更フィードで各テーブル専用の Kafka トピックを作成する場合は、このモードを選択します。すると、テーブルのすべての Kafka メッセージが専用の Kafka トピックに送信されます。トピックのプレフィックス、データベース名とテーブル名の間の区切り文字、およびサフィックスを設定することで、テーブルのトピック名をカスタマイズできます。たとえば、区切り文字を_に設定すると、トピック名の形式は<Prefix><DatabaseName>_<TableName><Suffix>になります。

      スキーマ イベントの作成などの行以外のイベントの変更ログの場合は、 [既定のトピック名]フィールドにトピック名を指定できます。変更フィードは、そのような変更ログを収集するためにそれに応じてトピックを作成します。

    • データベースごとに変更ログを Kafka Topics に配布する

      変更フィードで各データベース専用の Kafka トピックを作成する場合は、このモードを選択します。すると、データベースのすべての Kafka メッセージが専用の Kafka トピックに送信されます。トピックのプレフィックスとサフィックスを設定することで、データベースのトピック名をカスタマイズできます。

      解決された Ts イベントなどの行以外のイベントの変更ログの場合は、 [デフォルトのトピック名]フィールドにトピック名を指定できます。変更フィードは、それに応じてトピックを作成し、そのような変更ログを収集します。

    • すべての変更ログを指定された Kafka トピックに送信する

      変更フィードですべての変更ログに対して 1 つの Kafka トピックを作成する場合は、このモードを選択します。すると、変更フィード内のすべての Kafka メッセージが 1 つの Kafka トピックに送信されます。トピック名は[トピック名]フィールドで定義できます。

  7. パーティション配布領域では、Kafka メッセージを送信するパーティションを決定できます。すべてのテーブルに対して単一のパーティション ディスパッチャーを定義することも、テーブルごとに異なるパーティション ディスパッチャーを定義することもできます。TiDB TiDB Cloud、次の 4 種類のディスパッチャーが提供されています。

    • 主キーまたはインデックス値によって変更ログを Kafka パーティションに分散する

      変更フィードでテーブルの Kafka メッセージを異なるパーティションに送信する場合は、この分散方法を選択します。行の変更ログの主キーまたはインデックス値によって、変更ログが送信されるパーティションが決まります。この分散方法により、パーティションのバランスが向上し、行レベルの秩序性が確保されます。

    • テーブルごとに変更ログを Kafka パーティションに配布する

      変更フィードでテーブルの Kafka メッセージを 1 つの Kafka パーティションに送信する場合は、この分散方法を選択します。行変更ログのテーブル名によって、変更ログが送信されるパーティションが決まります。この分散方法では、テーブルの秩序性が確保されますが、パーティションのバランスが崩れる可能性があります。

    • タイムスタンプごとに変更ログを Kafka パーティションに配布する

      変更フィードが Kafka メッセージを異なる Kafka パーティションにランダムに送信するようにする場合は、この分散方法を選択します。行の変更ログの commitTs によって、変更ログが送信されるパーティションが決まります。この分散方法により、パーティションのバランスが向上し、各パーティションの秩序が確保されます。ただし、データ項目の複数の変更が異なるパーティションに送信され、異なるコンシューマーの進行状況が異なる場合があり、データの不整合が発生する可能性があります。したがって、コンシューマーは、消費する前に、複数のパーティションのデータを commitTs で並べ替える必要があります。

    • 列値ごとに変更ログを Kafka パーティションに分散する

      変更フィードでテーブルの Kafka メッセージを異なるパーティションに送信する場合は、この分散方法を選択します。行の変更ログの指定された列値によって、変更ログが送信されるパーティションが決まります。この分散方法により、各パーティションの秩序が確保され、同じ列値の変更ログが同じパーティションに送信されることが保証されます。

  8. トピックコンフィグレーション領域で、次の番号を設定します。変更フィードは、番号に従って Kafka トピックを自動的に作成します。

    • レプリケーション係数: 各 Kafka メッセージが複製される Kafka サーバーの数を制御します。有効な値の範囲はmin.insync.replicasから Kafka ブローカーの数までです。
    • パーティション数: トピック内に存在するパーティションの数を制御します。有効な値の範囲は[1, 10 * the number of Kafka brokers]です。
  9. 「次へ」をクリックします。

ステップ4. チェンジフィード仕様を構成する

  1. 「Changefeed 仕様」領域で、Changefeed で使用されるレプリケーション容量単位 (RCU) の数を指定します。
  2. 「Changefeed 名」領域で、Changefeed の名前を指定します。
  3. 「次へ」をクリックして設定した構成を確認し、次のページに進みます。

ステップ5. 構成を確認する

このページでは、設定したすべての changefeed 構成を確認できます。

エラーが見つかった場合は、戻ってエラーを修正できます。エラーがない場合は、下部にあるチェックボックスをクリックし、 [作成]をクリックして変更フィードを作成できます。

このページは役に立ちましたか?