Apache Kafka にシンクする

このドキュメントでは、 TiDB Cloudから Apache Kafka にデータをストリーミングするための変更フィードを作成する方法について説明します。

注記:

  • changefeed 機能を使用するには、 TiDB Cloud Dedicated クラスターのバージョンが v6.1.3 以降であることを確認してください。
  • TiDB Cloudサーバーレス クラスター場合、changefeed 機能は使用できません。

制限

  • TiDB Cloudクラスターごとに、最大 100 個の変更フィードを作成できます。
  • 現在、 TiDB Cloud は、 Kafka ブローカーに接続するための自己署名 TLS 証明書のアップロードをサポートしていません。
  • TiDB Cloud は変更フィードを確立するために TiCDC を使用するため、同じTiCDCとしての制限持ちます。
  • 複製するテーブルに主キーまたは null 以外の一意のインデックスがない場合、複製中に一意の制約がないと、再試行シナリオによっては下流に重複したデータが挿入される可能性があります。

前提条件

Apache Kafka にデータをストリーミングするための変更フィードを作成する前に、次の前提条件を完了する必要があります。

  • ネットワーク接続を設定する
  • Kafka ACL 認証の権限を追加する

ネットワーク

TiDB クラスターが Apache Kafka サービスに接続できることを確認します。

Apache Kafka サービスがインターネットにアクセスできない AWS VPC にある場合は、次の手順を実行します。

  1. Apache Kafka サービスの VPC と TiDB クラスター間の接続はVPCピアリング接続を設定する

  2. Apache Kafka サービスが関連付けられているセキュリティ グループの受信ルールを変更します。

    TiDB Cloudクラスターが配置されているリージョンの CIDR をインバウンド ルールに追加する必要があります。CIDR はVPC ピアリングページで確認できます。これにより、トラフィックが TiDB クラスターから Kafka ブローカーに流れるようになります。

  3. Apache Kafka URL にホスト名が含まれている場合は、 TiDB Cloud がApache Kafka ブローカーの DNS ホスト名を解決できるようにする必要があります。

    1. VPC ピアリング接続の DNS 解決を有効にするの手順に従います。
    2. Accepter DNS 解決オプションを有効にします。

Apache Kafka サービスがインターネットにアクセスできない Google Cloud VPC にある場合は、次の手順に従います。

  1. Apache Kafka サービスの VPC と TiDB クラスター間の接続はVPCピアリング接続を設定する

  2. Apache Kafka が配置されている VPC の Ingress ファイアウォール ルールを変更します。

    TiDB Cloudクラスターが配置されているリージョンの CIDR を、イングレス ファイアウォール ルールに追加する必要があります。CIDR は、 VPC ピアリングページで確認できます。これにより、トラフィックが TiDB クラスターから Kafka ブローカーに流れるようになります。

Kafka ACL 認証

TiDB Cloud の変更フィードが Apache Kafka にデータをストリーミングし、Kafka トピックを自動的に作成できるようにするには、Kafka に次の権限が追加されていることを確認します。

  • Kafka のトピック リソース タイプにCreateおよびWrite権限が追加されます。
  • Kafka のクラスター リソース タイプにDescribeConfigs権限が追加されます。

たとえば、Kafka クラスターが Confluent Cloud にある場合、詳細については Confluent ドキュメントのリソースACLの追加参照してください。

ステップ1. Apache Kafkaのchangefeedページを開く

  1. TiDB Cloudコンソールで、ターゲット TiDB クラスターのクラスター概要ページに移動し、左側のナビゲーション ペインで[Changefeed]をクリックします。
  2. 「Changefeed の作成」をクリックし、ターゲット タイプとしてKafka を選択します。

ステップ2. changefeedターゲットを構成する

  1. 「ブローカーコンフィグレーション」で、Kafka ブローカーのエンドポイントを入力します。複数のエンドポイントを区切るには、コンマ,使用できます。

  2. Kafka 認証構成に応じて認証オプションを選択します。

    • Kafka で認証が不要な場合は、デフォルトのオプション「Disable」のままにします。
    • Kafka に認証が必要な場合は、対応する認証タイプを選択し、認証用の Kafka アカウントのユーザー名とパスワードを入力します。
  3. Kafka のバージョンを選択します。不明な場合は、Kafka V2 を使用してください。

  4. この変更フィード内のデータに必要な圧縮タイプを選択します。

  5. Kafka で TLS 暗号化が有効になっていて、Kafka 接続に TLS 暗号化を使用する場合は、 TLS 暗号化オプションを有効にします。

  6. 「次へ」をクリックして設定した構成を確認し、次のページに進みます。

ステップ3. チェンジフィードを設定する

  1. テーブル フィルターをカスタマイズして、複製するテーブルをフィルターします。ルール構文については、 テーブルフィルタルールを参照してください。

    • フィルター ルール: この列でフィルター ルールを設定できます。デフォルトでは、すべてのテーブルを複製するルール*.*があります。新しいルールを追加すると、 TiDB Cloud はTiDB 内のすべてのテーブルを照会し、右側のボックスにルールに一致するテーブルのみを表示します。最大 100 個のフィルター ルールを追加できます。
    • 有効なキーを持つテーブル: この列には、主キーや一意のインデックスなど、有効なキーを持つテーブルが表示されます。
    • 有効なキーのないテーブル: この列には、主キーまたは一意のキーがないテーブルが表示されます。これらのテーブルは、一意の識別子がないと、ダウンストリームが重複イベントを処理するときにデータの一貫性がなくなる可能性があるため、レプリケーション中に問題が発生します。データの一貫性を確保するには、レプリケーションを開始する前に、これらのテーブルに一意のキーまたは主キーを追加することをお勧めします。または、フィルター ルールを追加して、これらのテーブルを除外することもできます。たとえば、ルール"!test.tbl1"使用してテーブルtest.tbl1を除外できます。
  2. イベント フィルターをカスタマイズして、複製するイベントをフィルターします。

    • 一致するテーブル: この列で、イベント フィルターを適用するテーブルを設定できます。ルールの構文は、前のテーブル フィルター領域で使用した構文と同じです。変更フィードごとに最大 10 個のイベント フィルター ルールを追加できます。
    • 無視されるイベント: イベント フィルターが変更フィードから除外するイベントの種類を設定できます。
  3. 「データ形式」領域で、Kafka メッセージの希望する形式を選択します。

    • Avro は、豊富なデータ構造を備えたコンパクトで高速なバイナリ データ形式で、さまざまなフロー システムで広く使用されています。詳細については、 Avroデータ形式参照してください。
    • Canal-JSON は解析しやすいプレーンな JSON テキスト形式です。詳細については、 Canal-JSON データ形式参照してください。
  4. TiDB 拡張フィールドを Kafka メッセージ本文に追加する場合は、 TiDB 拡張オプションを有効にします。

    TiDB拡張フィールドの詳細については、 Avro データ形式の TiDB 拡張フィールドおよびCanal-JSON データ形式の TiDB 拡張フィールド参照してください。

  5. データ形式としてAvro を選択した場合、ページに Avro 固有の構成がいくつか表示されます。これらの構成は次のように入力できます。

    • DecimalおよびUnsigned BigInt構成では、 TiDB Cloud がKafka メッセージ内の Decimal および Unsigned Bigint データ型を処理する方法を指定します。
    • スキーマ レジストリ領域で、スキーマ レジストリ エンドポイントを入力します。HTTP認証を有効にすると、ユーザー名とパスワードのフィールドが表示され、TiDB クラスターのエンドポイントとパスワードが自動的に入力されます。
  6. トピック配布」領域で配布モードを選択し、モードに応じてトピック名の設定を入力します。

    データ形式としてAvro を選択した場合は、 「配布モード」ドロップダウン リストで「変更ログをテーブルごとに Kafka トピックに配布」モードのみを選択できます。

    配布モードは、変更フィードが Kafka トピックをテーブル別、データベース別、またはすべての変更ログに対して 1 つのトピックを作成する方法を制御します。

    • テーブルごとに変更ログを Kafka Topics に配布する

      changefeed で各テーブル専用の Kafka トピックを作成する場合は、このモードを選択します。すると、テーブルのすべての Kafka メッセージが専用の Kafka トピックに送信されます。トピック プレフィックス、データベース名とテーブル名の間の区切り文字、およびサフィックスを設定することで、テーブルのトピック名をカスタマイズできます。たとえば、区切り文字を_に設定すると、トピック名の形式は<Prefix><DatabaseName>_<TableName><Suffix>になります。

      スキーマ イベントの作成などの行以外のイベントの変更ログの場合は、 [既定のトピック名]フィールドにトピック名を指定できます。変更フィードは、そのような変更ログを収集するためにそれに応じてトピックを作成します。

    • データベースごとに変更ログを Kafka Topics に配布する

      変更フィードで各データベース専用の Kafka トピックを作成する場合は、このモードを選択します。すると、データベースのすべての Kafka メッセージが専用の Kafka トピックに送信されます。トピックのプレフィックスとサフィックスを設定することで、データベースのトピック名をカスタマイズできます。

      解決された Ts イベントなどの行以外のイベントの変更ログの場合は、 [デフォルトのトピック名]フィールドにトピック名を指定できます。変更フィードは、それに応じてトピックを作成し、そのような変更ログを収集します。

    • すべての変更ログを指定された Kafka トピックに送信する

      変更フィードですべての変更ログに対して 1 つの Kafka トピックを作成する場合は、このモードを選択します。すると、変更フィード内のすべての Kafka メッセージが 1 つの Kafka トピックに送信されます。トピック名は[トピック名]フィールドで定義できます。

  7. パーティション配布領域では、Kafka メッセージが送信されるパーティションを決定できます。

    • インデックス値によって変更ログを Kafka パーティションに配布する

      変更フィードでテーブルの Kafka メッセージを異なるパーティションに送信する場合は、この分散方法を選択します。行の変更ログのインデックス値によって、変更ログが送信されるパーティションが決まります。この分散方法により、パーティションのバランスが向上し、行レベルの秩序性が確保されます。

    • テーブルごとに変更ログを Kafka パーティションに配布する

      変更フィードでテーブルの Kafka メッセージを 1 つの Kafka パーティションに送信する場合は、この分散方法を選択します。行変更ログのテーブル名によって、変更ログが送信されるパーティションが決まります。この分散方法では、テーブルの秩序性が確保されますが、パーティションのバランスが崩れる可能性があります。

  8. トピックコンフィグレーション領域で、次の番号を設定します。変更フィードは、番号に従って Kafka トピックを自動的に作成します。

    • レプリケーション係数: 各 Kafka メッセージが複製される Kafka サーバーの数を制御します。有効な値の範囲はmin.insync.replicasから Kafka ブローカーの数までです。
    • パーティション数: トピック内に存在するパーティションの数を制御します。有効な値の範囲は[1, 10 * the number of Kafka brokers]です。
  9. 「次へ」をクリックします。

ステップ4. チェンジフィード仕様を構成する

  1. 「Changefeed 仕様」領域で、Changefeed で使用されるレプリケーション容量単位 (RCU) の数を指定します。
  2. 「Changefeed 名」領域で、Changefeed の名前を指定します。
  3. 「次へ」をクリックして設定した構成を確認し、次のページに進みます。

ステップ5. 構成を確認する

このページでは、設定したすべての changefeed 構成を確認できます。

エラーが見つかった場合は、戻ってエラーを修正できます。エラーがない場合は、下部にあるチェックボックスをクリックし、 [作成]をクリックして変更フィードを作成できます。

このページは役に立ちましたか?