Apache Kafka にシンクする

このドキュメントでは、 TiDB Cloudから Apache Kafka にデータをストリーミングするためのチェンジフィードを作成する方法について説明します。

注記:

  • チェンジフィード機能を使用するには、TiDB 専用クラスターのバージョンが v6.1.3 以降であることを確認してください。
  • TiDB サーバーレスクラスターの場合、チェンジフィード機能は使用できません。

制限

  • TiDB Cloudクラスターごとに、最大 100 個の変更フィードを作成できます。
  • 現在、 TiDB Cloud は、Kafka ブローカーに接続するための自己署名 TLS 証明書のアップロードをサポートしていません。
  • TiDB Cloud はTiCDC を使用して変更フィードを確立するため、同じTiCDC としての制限を持ちます。
  • レプリケートされるテーブルに主キーまたは NULL 以外の一意のインデックスがない場合、レプリケーション中に一意制約がないため、一部の再試行シナリオでは重複したデータがダウンストリームに挿入される可能性があります。

前提条件

データを Apache Kafka にストリーミングするための変更フィードを作成する前に、次の前提条件を満たしている必要があります。

  • ネットワーク接続をセットアップする
  • Kafka ACL 認可のためのアクセス許可を追加する

通信網

TiDB クラスターが Apache Kafka サービスに接続できることを確認してください。

Apache Kafka サービスがインターネットにアクセスできない AWS VPC にある場合は、次の手順を実行します。

  1. Apache Kafka サービスの VPC と TiDB クラスターの間のVPC ピアリング接続をセットアップする

  2. Apache Kafka サービスが関連付けられているセキュリティ グループの受信ルールを変更します。

    TiDB Cloudクラスターが配置されているリージョンの CIDR を受信ルールに追加する必要があります。 CIDR は、 「VPC ピアリング」ページにあります。これにより、TiDB クラスターから Kafka ブローカーにトラフィックが流れるようになります。

  3. Apache Kafka URL にホスト名が含まれている場合は、 TiDB Cloud がApache Kafka ブローカーの DNS ホスト名を解決できるようにする必要があります。

    1. VPC ピアリング接続の DNS 解決を有効にするの手順に従います。
    2. アクセプター DNS 解決オプションを有効にします。

Apache Kafka サービスがインターネットにアクセスできない Google Cloud VPC にある場合は、次の手順を実行します。

  1. Apache Kafka サービスの VPC と TiDB クラスターの間のVPC ピアリング接続をセットアップする

  2. Apache Kafka が配置されている VPC のイングレス ファイアウォール ルールを変更します。

    TiDB Cloudクラスターが配置されているリージョンの CIDR をイングレス ファイアウォール ルールに追加する必要があります。 CIDR は、 「VPC ピアリング」ページにあります。これにより、TiDB クラスターから Kafka ブローカーにトラフィックが流れるようになります。

Kafka ACL 認可

TiDB Cloud変更フィードがデータを Apache Kafka にストリーミングし、Kafka トピックを自動的に作成できるようにするには、次の権限が Kafka に追加されていることを確認します。

  • Kafka のトピック リソース タイプにCreateおよびWrite権限が追加されます。
  • Kafka のクラスター リソース タイプにDescribeConfigs権限が追加されます。

たとえば、Kafka クラスターが Confluent Cloud にある場合、詳細については Confluent ドキュメントのリソースACLの追加を参照してください。

ステップ 1. Apache Kafka のチェンジフィード ページを開く

  1. TiDB Cloudコンソールでは、ターゲット TiDB クラスターのクラスター概要ページに移動し、左側のナビゲーション ペインで[Changefeed]をクリックします。
  2. [変更フィードの作成]をクリックし、ターゲット タイプとしてKafkaを選択します。

ステップ 2. チェンジフィードターゲットを構成する

  1. [ブローカーのコンフィグレーション]で、Kafka ブローカー エンドポイントを入力します。カンマ,使用して複数のエンドポイントを区切ることができます。

  2. Kafka 認証構成に従って認証オプションを選択します。

    • Kafka が認証を必要としない場合は、デフォルトのオプションDisable のままにしておきます。
    • Kafka で認証が必要な場合は、対応する認証タイプを選択し、認証用の Kafka アカウントのユーザー名とパスワードを入力します。
  3. Kafka のバージョンを選択します。それがわからない場合は、Kafka V2 を使用してください。

  4. この変更フィード内のデータに必要な圧縮タイプを選択します。

  5. Kafka でTLS 暗号化が有効になっており、Kafka 接続に TLS 暗号化を使用する場合は、TLS 暗号化オプションを有効にします。

  6. 「次へ」をクリックして設定した構成を確認し、次のページに進みます。

ステップ 3. チェンジフィードを設定する

  1. テーブル フィルターをカスタマイズして、複製するテーブルをフィルターします。ルールの構文については、 テーブルフィルタールールを参照してください。

    • フィルター ルール: この列でフィルター ルールを設定できます。デフォルトでは、すべてのテーブルを複製することを表すルール*.*があります。新しいルールを追加すると、 TiDB CloudはTiDB 内のすべてのテーブルをクエリし、ルールに一致するテーブルのみを右側のボックスに表示します。最大 100 個のフィルター ルールを追加できます。
    • 有効なキーを持つテーブル: この列には、主キーや一意のインデックスなどの有効なキーを持つテーブルが表示されます。
    • 有効なキーのないテーブル: この列には、主キーまたは一意のキーがないテーブルが表示されます。これらのテーブルでは、ダウンストリームが重複イベントを処理するときに一意の識別子がないとデータの不整合が生じる可能性があるため、レプリケーション中に課題が生じます。データの一貫性を確保するには、レプリケーションを開始する前に、これらのテーブルに一意キーまたは主キーを追加することをお勧めします。あるいは、フィルター ルールを追加して、これらのテーブルを除外することもできます。たとえば、ルール"!test.tbl1"を使用してテーブルtest.tbl1を除外できます。
  2. イベント フィルターをカスタマイズして、複製するイベントをフィルターします。

    • 一致するテーブル: この列でイベント フィルターを適用するテーブルを設定できます。ルールの構文は、前述のテーブル フィルター領域で使用されるものと同じです。変更フィードごとに最大 10 個のイベント フィルター ルールを追加できます。
    • 無視されるイベント: イベント フィルターが変更フィードから除外するイベントのタイプを設定できます。
  3. 「データ形式」領域で、Kafka メッセージの希望の形式を選択します。

    • Avro は、豊富なデータ構造を備えたコンパクトで高速なバイナリ データ形式で、さまざまなフロー システムで広く使用されています。詳細については、 Avro データ形式を参照してください。
    • Canal-JSON はプレーンな JSON テキスト形式であり、解析が簡単です。詳細については、 Canal-JSON データ形式を参照してください。
  4. TiDB 拡張フィールドを Kafka メッセージ本文に追加する場合は、TiDB 拡張オプションを有効にします。

    TiDB 拡張フィールドの詳細については、 Avro データ形式の TiDB 拡張フィールドおよびCanal-JSON データ形式の TiDB 拡張フィールドを参照してください。

  5. データ形式としてAvroを選択すると、ページに Avro 固有の構成がいくつか表示されます。これらの構成は次のように入力できます。

    • DecimalおよびUnsigned BigInt構成では、 TiDB Cloud がKafka メッセージ内の 10 進数および符号なし bigint データ型を処理する方法を指定します。
    • [スキーマ レジストリ]領域で、スキーマ レジストリ エンドポイントを入力します。 HTTP 認証 を有効にすると、ユーザー名とパスワードのフィールドが表示され、TiDB クラスターのエンドポイントとパスワードが自動的に入力されます。
  6. [トピックの配布]領域で、配布モードを選択し、モードに従ってトピック名の設定を入力します。

    データ形式としてAvroを選択した場合は、[**配布モード]ドロップダウン リストで[変更ログをテーブルごとに Kafka トピックに配布]**モードのみを選択できます。

    分散モードは、テーブルごと、データベースごと、またはすべての変更ログに対して 1 つのトピックを作成するなど、変更フィードが Kafka トピックを作成する方法を制御します。

    • 変更ログをテーブルごとに Kafka トピックに配布する

      チェンジフィードでテーブルごとに専用の Kafka トピックを作成する場合は、このモードを選択します。次に、テーブルのすべての Kafka メッセージが専用の Kafka トピックに送信されます。テーブルのトピック名をカスタマイズするには、トピックのプレフィックス、データベース名とテーブル名の間の区切り文字、およびサフィックスを設定します。たとえば、区切り文字を_に設定すると、トピック名の形式は<Prefix><DatabaseName>_<TableName><Suffix>になります。

      「スキーマ作成イベント」などの行以外のイベントの変更ログの場合、 「デフォルトのトピック名」フィールドにトピック名を指定できます。変更フィードは、そのような変更ログを収集するために、それに応じてトピックを作成します。

    • データベースごとに変更ログを Kafka に配布する

      チェンジフィードでデータベースごとに専用の Kafka トピックを作成する場合は、このモードを選択します。次に、データベースのすべての Kafka メッセージが専用の Kafka トピックに送信されます。トピックのプレフィックスとサフィックスを設定することで、データベースのトピック名をカスタマイズできます。

      解決された Ts イベントなどの非行イベントの変更ログの場合、 「デフォルトのトピック名」フィールドにトピック名を指定できます。変更フィードは、そのような変更ログを収集するために、それに応じてトピックを作成します。

    • すべての変更ログを指定された 1 つの Kafka トピックに送信します

      変更フィードですべての変更ログに対して 1 つの Kafka トピックを作成する場合は、このモードを選択します。その後、変更フィード内のすべての Kafka メッセージが 1 つの Kafka トピックに送信されます。 「トピック名」フィールドでトピック名を定義できます。

  7. [パーティション配布]領域では、Kafka メッセージがどのパーティションに送信されるかを決定できます。

    • インデックス値ごとに変更ログを Kafka パーティションに配布する

      変更フィードでテーブルの Kafka メッセージを別のパーティションに送信する場合は、この分散方法を選択します。行変更ログのインデックス値によって、変更ログがどのパーティションに送信されるかが決まります。この分散方法により、パーティションのバランスが改善され、行レベルの順序性が保証されます。

    • 変更ログをテーブルごとに Kafka パーティションに分散する

      チェンジフィードでテーブルの Kafka メッセージを 1 つの Kafka パーティションに送信する場合は、この分散方法を選択します。行変更ログのテーブル名によって、変更ログがどのパーティションに送信されるかが決まります。この分散方法ではテーブルの順序性が保証されますが、パーティションの不均衡が発生する可能性があります。

  8. [トピックコンフィグレーション]エリアで、次の数値を設定します。チェンジフィードは、数値に従って Kafka トピックを自動的に作成します。

    • レプリケーション係数: 各 Kafka メッセージがレプリケートされる Kafka サーバーの数を制御します。有効な値の範囲は、 min.insync.replicasから Kafka ブローカーの数までです。
    • パーティション番号: トピック内に存在するパーティションの数を制御します。有効な値の範囲は[1, 10 * the number of Kafka brokers]です。
  9. 「次へ」をクリックします。

ステップ 4. 変更フィード仕様を構成する

  1. 「変更フィードの仕様」領域で、変更フィードで使用するレプリケーション キャパシティ ユニット (RCU) の数を指定します。
  2. 「変更フィード名」領域で、変更フィードの名前を指定します。
  3. 「次へ」をクリックして設定した構成を確認し、次のページに進みます。

ステップ 5. 構成を確認する

このページでは、設定したすべての変更フィード構成を確認できます。

エラーが見つかった場合は、戻ってエラーを修正できます。エラーがない場合は、下部のチェックボックスをクリックし、 [作成]をクリックして変更フィードを作成します。

このページは役に立ちましたか?

Playground
新規
登録なしで TiDB の機能をワンストップでインタラクティブに体験できます。
製品
TiDB Cloud
TiDB
価格
PoC お問い合わせ
エコシステム
TiKV
TiFlash
OSS Insight
© 2024 PingCAP. All Rights Reserved.
Privacy Policy.