Apache Kafka にシンクする

このドキュメントでは、TiDB Cloudから Apache Kafka にデータをストリーミングするための変更フィードを作成する方法について説明します。

注記:

  • changefeed 機能を使用するには、 TiDB Cloud Dedicated クラスターのバージョンが v6.1.3 以降であることを確認してください。
  • TiDB Cloudサーバーレス クラスターの場合、changefeed 機能は使用できません。

制限

  • TiDB Cloudクラスターごとに、最大 100 個の変更フィードを作成できます。

  • 現在、 TiDB Cloud は、 Kafka ブローカーに接続するための自己署名 TLS 証明書のアップロードをサポートしていません。

  • TiDB Cloud は、変更フィードを確立するために TiCDC を使用するため、同じTiCDCとしての制限持ちます。

  • レプリケートするテーブルに主キーまたは NULL 以外の一意のインデックスがない場合、レプリケーション中に一意の制約がないと、再試行シナリオによっては下流に重複したデータが挿入される可能性があります。

  • ネットワーク接続方法として Private Link または Private Service Connect を選択する場合は、TiDB クラスターのバージョンが次の要件を満たしていることを確認してください。

    • v6.5.xの場合: バージョンv6.5.9以降
    • v7.1.xの場合: バージョンv7.1.4以降
    • v7.5.x の場合: バージョン v7.5.1 以降
    • v8.1.xの場合: v8.1.x以降のすべてのバージョンがサポートされています
  • データ形式として Debezium を使用する場合は、TiDB クラスターのバージョンが v8.1.0 以降であることを確認してください。

  • Kafka メッセージのパーティション分散については、次の点に注意してください。

    • 指定されたインデックス名を持つ Kafka パーティションに主キーまたはインデックス値による変更ログを配布する場合は、TiDB クラスターのバージョンが v7.5.0 以降であることを確認してください。
    • 列値ごとに変更ログを Kafka パーティションに配布する場合は、TiDB クラスターのバージョンが v7.5.0 以降であることを確認してください。

前提条件

Apache Kafka にデータをストリーミングするための変更フィードを作成する前に、次の前提条件を完了する必要があります。

  • ネットワーク接続を設定する
  • Kafka ACL 認証の権限を追加する

ネットワーク

TiDB クラスターが Apache Kafka サービスに接続できることを確認してください。以下のいずれかの接続方法を選択できます。

  • Private Connect: VPC CIDR の競合を回避し、セキュリティ コンプライアンスを満たすのに最適ですが、追加のプライベートデータリンクコストが発生します。
  • VPC ピアリング: コスト効率の高いオプションとして適していますが、潜在的な VPC CIDR の競合とセキュリティ上の考慮事項を管理する必要があります。
  • パブリック IP: 素早いセットアップに適しています。
  • Private Connect
  • VPC Peering
  • Public IP

Private Connect は、クラウド プロバイダーのPrivate LinkまたはPrivate Service Connectテクノロジーを活用して、VPC 内のリソースがプライベート IP アドレスを使用して他の VPC 内のサービスに接続できるようにします。これは、それらのサービスが VPC 内で直接ホストされているかのように機能します。

TiDB Cloudは現在、セルフホスト型KafkaのPrivate Connectのみをサポートしています。MSK、Confluent Kafka、その他のKafka SaaSサービスとの直接統合はサポートしていません。Private Connectを介してこれらのKafka SaaSサービスに接続するには、 kafkaプロキシを仲介としてデプロイし、Kafkaサービスをセルフホスト型Kafkaとして公開します。詳細な例については、 Google Cloud で Kafka-proxy を使用してセルフホスト型 Kafka プライベート サービス接続を設定する参照してください。この設定は、すべてのKafka SaaSサービスで共通です。

  • Apache Kafka サービスが AWS でホストされている場合は、手順AWS でセルフホスト型 Kafka プライベートリンク サービスをセットアップするに従ってネットワーク接続が適切に設定されていることを確認してください。セットアップ後、 TiDB Cloudコンソールで以下の情報を入力して変更フィードを作成してください。

    • Kafka アドバタイズドリスナーパターンの ID
    • エンドポイントサービス名
    • ブートストラップポート
  • Apache Kafka サービスが Google Cloud でホストされている場合は、手順Google Cloud でセルフホスト型 Kafka プライベート サービス接続を設定するに従ってネットワーク接続が適切に構成されていることを確認してください。セットアップ後、 TiDB Cloudコンソールで以下の情報を入力して変更フィードを作成してください。

    • Kafka アドバタイズドリスナーパターンの ID
    • サービスアタッチメント
    • ブートストラップポート
  • Apache Kafka サービスが Azure でホストされている場合は、手順Azure でセルフホスト型 Kafka プライベートリンク サービスをセットアップするに従ってネットワーク接続が適切に構成されていることを確認してください。セットアップ後、 TiDB Cloudコンソールで以下の情報を入力して変更フィードを作成してください。

    • Kafka アドバタイズドリスナーパターンの ID
    • プライベートリンクサービスの別名
    • ブートストラップポート

Apache Kafka サービスがインターネットにアクセスできない AWS VPC にある場合は、次の手順を実行します。

  1. Apache Kafka サービスの VPC と TiDB クラスター間の接続はVPCピアリング接続を設定する

  2. Apache Kafka サービスが関連付けられているセキュリティ グループの受信ルールを変更します。

    TiDB Cloudクラスターが配置されているリージョンの CIDR をインバウンドルールに追加する必要があります。CIDR はVPC ピアリングページで確認できます。これにより、TiDB クラスターから Kafka ブローカーへのトラフィックが許可される場合があります。

  3. Apache Kafka URL にホスト名が含まれている場合は、 TiDB Cloud がApache Kafka ブローカーの DNS ホスト名を解決できるようにする必要があります。

    1. VPC ピアリング接続の DNS 解決を有効にするの手順に従います。
    2. Accepter DNS 解決オプションを有効にします。

Apache Kafka サービスがインターネットにアクセスできない Google Cloud VPC にある場合は、次の手順を行います。

  1. Apache Kafka サービスの VPC と TiDB クラスター間の接続はVPCピアリング接続を設定する

  2. Apache Kafka が配置されている VPC の Ingress ファイアウォール ルールを変更します。

    TiDB Cloudクラスターが配置されているリージョンの CIDR を、Ingress ファイアウォール ルールに追加する必要があります。CIDR はVPC ピアリングページで確認できます。これにより、TiDB クラスターから Kafka ブローカーへのトラフィックが許可される場合があります。

Apache Kafka サービスにパブリック IP アクセスを提供する場合は、すべての Kafka ブローカーにパブリック IP アドレスを割り当てます。

本番環境でパブリック IP を使用することはお勧めしません

Kafka ACL 認証

TiDB Cloud変更フィードが Apache Kafka にデータをストリーミングし、Kafka トピックを自動的に作成できるようにするには、Kafka に次の権限が追加されていることを確認します。

  • Kafka のトピック リソース タイプにCreateおよびWrite権限が追加されます。
  • Kafka のクラスター リソース タイプにDescribeConfigs権限が追加されます。

たとえば、Kafka クラスターが Confluent Cloud にある場合、詳細については Confluent ドキュメントのリソースACLの追加参照してください。

ステップ1. Apache KafkaのChangefeedページを開く

  1. TiDB Cloudコンソールにログインします。
  2. ターゲット TiDB クラスターのクラスター概要ページに移動し、左側のナビゲーション ペインで[データ] > [Changefeed]クリックします。
  3. 「Changefeed の作成」をクリックし、宛先としてKafkaを選択します。

ステップ2. changefeedターゲットを構成する

手順は、選択した接続方法によって異なります。

  • VPC Peering or Public IP
  • Private Link (AWS)
  • Private Service Connect
  • Private Link (Azure)
  1. 接続方法「VPCピアリング」または「パブリックIP」を選択し、Kafkaブローカーのエンドポイントを入力します。複数のエンドポイントを指定する場合は、カンマ,で区切ることができます。

  2. Kafka 認証構成に応じて認証オプションを選択します。

    • Kafka で認証が不要な場合は、デフォルトのオプション[無効]ままにします。
    • Kafka に認証が必要な場合は、対応する認証タイプを選択し、認証用の Kafka アカウントのユーザー名パスワードを入力します。
  3. Kafka のバージョンを選択してください。どのバージョンを使用すればよいか分からない場合は、 Kafka v2を使用してください。

  4. この変更フィード内のデータの圧縮タイプを選択します。

  5. Kafka で TLS 暗号化が有効になっていて、Kafka 接続に TLS 暗号化を使用する場合は、 TLS 暗号化オプションを有効にします。

  6. 「次へ」をクリックしてネットワーク接続をテストします。テストが成功すると、次のページに進みます。

  1. [接続方法][プライベート リンク]を選択します。

  2. TiDB CloudのAWS プリンシパルエンドポイントサービスのエンドポイント作成を承認します。AWS プリンシパルは、Web ページのヒントに記載されています。

  3. ネットワークセクションでAWS でセルフホスト型 Kafka プライベートリンク サービスをセットアップする選択するときに、必ずKafka デプロイメントの**AZ の数と AZ ID を選択し、 Kafka アドバタイズ リスナー パターン**に同じ一意の ID を入力してください。

  4. AWS でセルフホスト型 Kafka プライベートリンク サービスをセットアップするで設定したエンドポイント サービス名を入力します。

  5. ブートストラップポートを入力してください。1つのAZにつき少なくとも1つのポートを設定することをお勧めします。複数のポートを指定する場合は、カンマ,で区切ることができます。

  6. Kafka 認証構成に応じて認証オプションを選択します。

    • Kafka で認証が不要な場合は、デフォルトのオプション[無効]ままにします。
    • Kafka に認証が必要な場合は、対応する認証タイプを選択し、認証用の Kafka アカウントのユーザー名パスワードを入力します。
  7. Kafka のバージョンを選択してください。どのバージョンを使用すればよいか分からない場合は、 Kafka v2を使用してください。

  8. この変更フィード内のデータの圧縮タイプを選択します。

  9. Kafka で TLS 暗号化が有効になっていて、Kafka 接続に TLS 暗号化を使用する場合は、 TLS 暗号化オプションを有効にします。

  10. 「次へ」をクリックしてネットワーク接続をテストします。テストが成功すると、次のページに進みます。

  11. TiDB Cloud はPrivate Linkのエンドポイントを作成します。これには数分かかる場合があります。

  12. エンドポイントが作成されたら、クラウド プロバイダー コンソールにログインし、接続要求を承認します。

  13. TiDB Cloudコンソールに戻り、接続リクエストを承認したことを確認してください。TiDB TiDB Cloud は接続をテストし、テストが成功すると次のページに進みます。

  1. [接続方法]で、 [プライベート サービス接続]を選択します。

  2. ネットワークセクションでGoogle Cloud でセルフホスト型 Kafka プライベート サービス接続を設定する入力するときに、 Kafka アドバタイズ リスナー パターンに同じ一意の ID を入力するようにしてください。

  3. Google Cloud でセルフホスト型 Kafka プライベート サービス接続をセットアップするで設定したサービスアタッチメントを入力します。

  4. ブートストラップポートを入力してください。複数のポートを指定することをお勧めします。複数のポートを指定する場合は、カンマ,で区切ることができます。

  5. Kafka 認証構成に応じて認証オプションを選択します。

    • Kafka で認証が不要な場合は、デフォルトのオプション[無効]ままにします。
    • Kafka に認証が必要な場合は、対応する認証タイプを選択し、認証用の Kafka アカウントのユーザー名パスワードを入力します。
  6. Kafka のバージョンを選択してください。どのバージョンを使用すればよいか分からない場合は、 Kafka v2を使用してください。

  7. この変更フィード内のデータの圧縮タイプを選択します。

  8. Kafka で TLS 暗号化が有効になっていて、Kafka 接続に TLS 暗号化を使用する場合は、 TLS 暗号化オプションを有効にします。

  9. 「次へ」をクリックしてネットワーク接続をテストします。テストが成功すると、次のページに進みます。

  10. TiDB Cloud はPrivate Service Connectのエンドポイントを作成します。これには数分かかる場合があります。

  11. エンドポイントが作成されたら、クラウド プロバイダー コンソールにログインし、接続要求を承認します。

  12. TiDB Cloudコンソールに戻り、接続リクエストを承認したことを確認してください。TiDB TiDB Cloud は接続をテストし、テストが成功すると次のページに進みます。

  1. [接続方法][プライベート リンク]を選択します。

  2. 変更フィードを作成する前に、 TiDB Cloudの Azure サブスクリプションを承認するか、エイリアスを持つすべてのユーザーが Private Link サービスにアクセスできるようにしてください。Azure サブスクリプションは、Web ページの「続行する前に確認する」ヒントに記載されています。Private Link サービスの可視性に関する詳細については、Azure ドキュメントの制御サービスの公開参照してください。

  3. ネットワークセクションでAzure でセルフホスト型 Kafka プライベートリンク サービスをセットアップする入力するときは、必ずKafka アドバタイズ リスナー パターンに同じ一意の ID を入力してください。

  4. Azure でセルフホスト型 Kafka プライベートリンク サービスをセットアップするで設定したプライベートリンクサービスのエイリアスを入力します。

  5. ブートストラップポートを入力してください。1つのAZにつき少なくとも1つのポートを設定することをお勧めします。複数のポートを指定する場合は、カンマ,で区切ることができます。

  6. Kafka 認証構成に応じて認証オプションを選択します。

    • Kafka で認証が不要な場合は、デフォルトのオプション[無効]ままにします。
    • Kafka に認証が必要な場合は、対応する認証タイプを選択し、認証用の Kafka アカウントのユーザー名パスワードを入力します。
  7. Kafka のバージョンを選択してください。どのバージョンを使用すればよいか分からない場合は、 Kafka v2を使用してください。

  8. この変更フィード内のデータの圧縮タイプを選択します。

  9. Kafka で TLS 暗号化が有効になっていて、Kafka 接続に TLS 暗号化を使用する場合は、 TLS 暗号化オプションを有効にします。

  10. 「次へ」をクリックしてネットワーク接続をテストします。テストが成功すると、次のページに進みます。

  11. TiDB Cloud はPrivate Linkのエンドポイントを作成します。これには数分かかる場合があります。

  12. エンドポイントが作成されたら、 Azureポータルにログインして接続要求を承認します。

  13. TiDB Cloudコンソールに戻り、接続リクエストを承認したことを確認してください。TiDB TiDB Cloud は接続をテストし、テストが成功すると次のページに進みます。

ステップ3. チェンジフィードを設定する

  1. テーブルフィルターをカスタマイズして、複製するテーブルをフィルタリングします。ルールの構文については、 テーブルフィルタルールを参照してください。

    • フィルタールール: この列でフィルタールールを設定できます。デフォルトでは、すべてのテーブルを複製するルール*.*が設定されています。新しいルールを追加すると、 TiDB CloudはTiDB内のすべてのテーブルをクエリし、ルールに一致するテーブルのみを右側のボックスに表示されます。フィルタールールは最大100件まで追加できます。
    • 有効なキーを持つテーブル: この列には、主キーや一意のインデックスなど、有効なキーを持つテーブルが表示されます。
    • 有効なキーのないテーブル: この列には、主キーまたは一意キーを持たないテーブルが表示されます。これらのテーブルは、一意の識別子がないと、下流で重複イベントを処理する際にデータの不整合が発生する可能性があるため、レプリケーション中に問題が発生します。データの整合性を確保するには、レプリケーションを開始する前に、これらのテーブルに一意のキーまたは主キーを追加することをお勧めします。または、これらのテーブルを除外するフィルタールールを追加することもできます。例えば、ルール"!test.tbl1"を使用してテーブルtest.tbl1を除外できます。
  2. イベント フィルターをカスタマイズして、複製するイベントをフィルターします。

    • 一致するテーブル: この列では、イベントフィルターを適用するテーブルを設定できます。ルールの構文は、前述の「テーブルフィルター」領域で使用した構文と同じです。変更フィードごとに最大10個のイベントフィルタールールを追加できます。
    • 無視されるイベント: イベント フィルターが変更フィードから除外するイベントの種類を設定できます。
  3. カラムセレクターをカスタマイズして、イベントから列を選択し、それらの列に関連するデータの変更のみをダウンストリームに送信します。

    • 一致するテーブル: 列セレクターを適用するテーブルを指定します。どのルールにも一致しないテーブルの場合は、すべての列が送信されます。
    • カラムセレクター: 一致したテーブルのどの列をダウンストリームに送信するかを指定します。

    一致ルールの詳細については、 カラムセレクター参照してください。

  4. 「データ形式」領域で、Kafka メッセージの希望する形式を選択します。

    • Avroは、豊富なデータ構造を備えたコンパクトで高速なバイナリデータ形式で、様々なフローシステムで広く使用されています。詳細については、 Avroデータ形式参照してください。
    • Canal-JSONは、解析が容易なプレーンなJSONテキスト形式です。詳細については、 Canal-JSONデータ形式ご覧ください。
    • Open Protocolは、行レベルのデータ変更通知プロトコルであり、監視、キャッシュ、全文インデックス作成、分析エンジン、および異なるデータベース間のプライマリ-セカンダリレプリケーションのためのデータソースを提供します。詳細については、 オープンプロトコルデータ形式参照してください。
    • Debeziumはデータベースの変更をキャプチャするためのツールです。キャプチャされたデータベースの変更は「イベント」と呼ばれるメッセージに変換され、Kafkaに送信されます。詳細については、 Debeziumデータ形式ご覧ください。
  5. Kafka メッセージ本文に TiDB 拡張フィールドを追加する場合は、 TiDB 拡張オプションを有効にします。

    TiDB 拡張フィールドの詳細については、 Avro データ形式の TiDB 拡張フィールドおよびCanal-JSON データ形式の TiDB 拡張フィールド参照してください。

  6. データ形式としてAvroを選択した場合、ページにAvro固有の設定が表示されます。これらの設定は以下のように入力できます。

    • DecimalおよびUnsigned BigInt構成では、 TiDB Cloud がKafka メッセージ内の Decimal および unsigned bigint データ型を処理する方法を指定します。
    • 「スキーマレジストリ」領域で、スキーマレジストリのエンドポイントを入力します。HTTP認証を有効にすると、ユーザー名とパスワードのフィールドが表示され、TiDBクラスターのエンドポイントとパスワードが自動的に入力されます。
  7. 「トピック配布」領域で配布モードを選択し、モードに応じてトピック名の構成を入力します。

    データ形式としてAvroを選択した場合は、 「配布モード」ドロップダウン リストで「変更ログをテーブルごとに Kafka トピックに配布」モードのみを選択できます。

    配布モードは、変更フィードが Kafka トピックをテーブル別、データベース別、またはすべての変更ログに対して 1 つのトピックを作成する方法を制御します。

    • テーブルごとに変更ログを Kafka Topics に配布する

      変更フィードで各テーブル専用のKafkaトピックを作成したい場合は、このモードを選択してください。すると、テーブルのすべてのKafkaメッセージが専用のKafkaトピックに送信されます。トピック名は、トピックプレフィックス、データベース名とテーブル名の間の区切り文字、およびサフィックスを設定することでカスタマイズできます。例えば、区切り文字を_に設定すると、トピック名は<Prefix><DatabaseName>_<TableName><Suffix>という形式になります。

      スキーマ作成イベントなどの行イベント以外の変更ログについては、 「デフォルトのトピック名」フィールドにトピック名を指定できます。変更フィードは、そのような変更ログを収集するために、それに応じたトピックを作成します。

    • データベースごとに変更ログを Kafka Topics に配布する

      変更フィードで各データベース専用のKafkaトピックを作成したい場合は、このモードを選択してください。すると、データベースのすべてのKafkaメッセージが専用のKafkaトピックに送信されます。トピックのプレフィックスとサフィックスを設定することで、データベースのトピック名をカスタマイズできます。

      解決済みTsイベントなど、行イベント以外のイベントの変更ログについては、 「デフォルトのトピック名」フィールドにトピック名を指定できます。変更フィードは、そのような変更ログを収集するために、それに応じたトピックを作成します。

    • すべての変更ログを指定された Kafka トピックに送信する

      変更フィードですべての変更ログに対して1つのKafkaトピックを作成したい場合は、このモードを選択してください。そうすると、変更フィード内のすべてのKafkaメッセージが1つのKafkaトピックに送信されます。トピック名は「トピック名」フィールドで定義できます。

  8. 「パーティション分散」領域では、Kafka メッセージを送信するパーティションを指定できます。すべてのテーブルに単一のパーティションディスパッチャを定義することも、テーブルごとに異なるパーティションディスパッチャを定義することもできます。TiDB TiDB Cloud、以下の 4 種類のディスパッチャが提供されています。

    • 主キーまたはインデックス値で変更ログを Kafka パーティションに分散する

      変更フィードを使用してテーブルのKafkaメッセージを複数のパーティションに送信する場合は、この分散方法を選択してください。行の変更ログの主キーまたはインデックス値によって、変更ログが送信されるパーティションが決まります。この分散方法により、パーティションのバランスが向上し、行レベルの順序性が確保されます。

    • テーブルごとに変更ログを Kafka パーティションに分散する

      変更フィードを使用して、テーブルの Kafka メッセージを単一の Kafka パーティションに送信する場合は、この分散方法を選択してください。行変更ログのテーブル名によって、変更ログが送信されるパーティションが決まります。この分散方法はテーブルの順序性を保証しますが、パーティションの不均衡が生じる可能性があります。

    • タイムスタンプで変更ログを Kafka パーティションに配布する

      変更フィードが Kafka メッセージを異なる Kafka パーティションにランダムに送信するようにしたい場合は、この分散方法を選択してください。行の変更ログの commitTs によって、変更ログが送信されるパーティションが決まります。この分散方法は、パーティションバランスを改善し、各パーティションの秩序性を確保します。ただし、データ項目の複数の変更が異なるパーティションに送信され、各コンシューマーの進行状況が異なる場合があり、データの不整合が発生する可能性があります。そのため、コンシューマーは複数のパーティションからデータを消費する前に、commitTs でソートする必要があります。

    • 列値ごとに変更ログを Kafka パーティションに分散する

      変更フィードを使用して、テーブルの Kafka メッセージを複数のパーティションに送信する場合は、この分散方法を選択してください。行の変更ログの指定された列値によって、変更ログが送信されるパーティションが決まります。この分散方法により、各パーティションの順序性が確保され、同じ列値を持つ変更ログが同じパーティションに送信されることが保証されます。

  9. トピックコンフィグレーションエリアで、以下の数値を設定します。変更フィードは、これらの数値に基づいてKafkaトピックを自動的に作成します。

    • レプリケーション係数: 各Kafkaメッセージが何台のKafkaサーバーに複製されるかを制御します。有効な値の範囲はmin.insync.replicasからKafkaブローカーの数までです。
    • パーティション数: トピック内に存在するパーティションの数を制御します。有効な値の範囲は[1, 10 * the number of Kafka brokers]です。
  10. 「次へ」をクリックします。

ステップ4. チェンジフィード仕様を構成する

  1. 「Changefeed 仕様」領域で、Changefeed で使用されるレプリケーション容量単位 (RCU) の数を指定します。
  2. 「Changefeed 名」領域で、Changefeed の名前を指定します。
  3. 「次へ」をクリックして設定した構成を確認し、次のページに進みます。

ステップ5. 構成を確認する

このページでは、設定したすべての changefeed 構成を確認できます。

エラーが見つかった場合は、戻って修正できます。エラーがない場合は、下部のチェックボックスをクリックし、「作成」をクリックして変更フィードを作成します。

このページは役に立ちましたか?