📣
TiDB Cloud Essential はパブリックプレビュー中です。このページは自動翻訳されたものです。原文はこちらからご覧ください。

Apache Kafkaへのシンク(ベータ版)



このドキュメントでは、TiDB Cloud EssentialからApache Kafkaへデータをストリーミングするためのチェンジフィードを作成する方法について説明します。

制限

  • TiDB Cloud Essentialインスタンスごとに、最大10個のチェンジフィードを作成できます。
  • 現在、 TiDB Cloud Essentialは、Kafkaブローカーへの接続に自己署名TLS証明書をアップロードすることをサポートしていません。
  • TiDB Cloud Essential はTiCDC を使用して変更フィードを確立するため、同じTiCDCの制限があります。
  • 複製対象のテーブルに主キーまたはNULLを許容しない一意インデックスがない場合、複製中に一意制約が存在しないことで、一部の再試行シナリオにおいて、下流で重複データが挿入される可能性があります。

前提条件

Apache Kafkaにデータをストリーミングするためのチェンジフィードを作成する前に、以下の前提条件を満たす必要があります。

  • ネットワーク接続を設定する
  • Kafka ACL認証の権限を追加する

ネットワーク

TiDB Cloud EssentialインスタンスがApache Kafkaサービスに接続できることを確認してください。接続方法は以下のいずれかを選択できます。

  • プライベートリンク接続:セキュリティコンプライアンスを満たし、ネットワーク品質を確保します。
  • 公共ネットワーク:迅速なセットアップに適しています。

    プライベートリンク接続は、クラウドプロバイダーのプライベートリンク技術を活用し、VPC内のリソースがプライベートIPアドレスを使用して他のVPC内のサービスに接続できるようにします。これにより、あたかもそれらのサービスがVPC内で直接ホストされているかのように動作します。

    TiDB Cloud Essentialは現在、セルフホスト型Kafka、Confluent Cloud Dedicatedクラスター、およびAmazon MSK Provisioned向けのプライベートリンク接続のみをサポートしています。他のKafka SaaSサービスとの直接統合はサポートしていません。

    Kafkaのデプロイメントとクラウドプロバイダーに基づいてプライベートリンク接続を設定するには、以下のガイドを参照してください。

    Apache Kafkaサービスへのパブリックアクセスを提供する場合は、すべてのKafkaブローカーにパブリックIPアドレスまたはドメイン名を割り当ててください。

    本番環境でパブリックアクセスを使用することは推奨されません。

    Kafka ACL認証

    TiDB Cloud Essential の変更フィードが Apache Kafka にデータをストリーミングし、Kafka トピックを自動的に作成できるようにするには、Kafka に次の権限が追加されていることを確認してください。

    • Kafka のトピック リソース タイプにCreateおよびWrite権限が追加されます。
    • Kafka のクラスタ リソース タイプにDescribeConfigs権限が追加されます。

    たとえば、Kafka クラスターが Confluent Cloud にある場合、詳細については、Confluent ドキュメントのリソースACLの追加を参照してください。

    ステップ1. Apache KafkaのChangefeedページを開きます。

    1. TiDB Cloudコンソールにログインします。
    2. 対象のTiDB Cloud Essentialインスタンスの概要ページに移動し、左側のナビゲーションペインで「データ」 > 「変更フィード」をクリックします。
    3. 「変更フィードの作成」をクリックし、次に「宛先」として「Kafka」を選択します。

    ステップ2. changefeedターゲットを設定する

    手順は、選択した接続方法によって異なります。

      1. 「接続方法」「パブリック」を選択し、Kafkaブローカーのエンドポイントを入力します。複数のエンドポイントはカンマ,を使用して区切ることができます。

      2. Kafkaの認証設定に応じて、認証オプションを選択してください。

        • Kafkaで認証が必要ない場合は、デフォルトオプションの「無効」のままにしてください。
        • Kafkaで認証が必要な場合は、該当する認証タイプを選択し、認証に使用するKafkaアカウントのユーザー名パスワードを入力してください。
      3. Kafkaのバージョンについては、ご使用のKafkaのバージョンに基づいて、 Kafka v2またはKafka v3を選択してください。

      4. この変更フィード内のデータの圧縮タイプを選択してください。

      5. KafkaでTLS暗号化が有効になっており、Kafka接続にTLS暗号化を使用する場合は、TLS暗号化オプションを有効にしてください。

      6. 「次へ」をクリックしてネットワーク接続をテストしてください。テストが成功すると、次のページに移動します。

      1. 接続方法「プライベートリンク」を選択します。

      2. 「プライベートリンク接続」で、ネットワークセクションで作成したプライベートリンク接続を選択します。プライベートリンク接続のアベイラビリティゾーンが、Kafkaデプロイメントのアベイラビリティゾーンと一致していることを確認してください。

      3. ネットワークセクションで取得したブートストラップポートを入力してください。Amazon MSKプロビジョニング済みプライベートリンク接続を使用している場合は、このフィールドはスキップできます。

      4. Kafkaの認証設定に応じて、認証オプションを選択してください。

        • Kafkaで認証が必要ない場合は、デフォルトオプションの「無効」のままにしてください。
        • Kafkaで認証が必要な場合は、該当する認証タイプを選択し、認証に使用するKafkaアカウントのユーザー名パスワードを入力してください。
      5. Kafkaのバージョンについては、ご使用のKafkaのバージョンに基づいて、 Kafka v2またはKafka v3を選択してください。

      6. この変更フィード内のデータの圧縮タイプを選択してください。

      7. KafkaでTLS暗号化が有効になっており、Kafka接続にTLS暗号化を使用する場合は、TLS暗号化オプションを有効にしてください。

      8. KafkaでTLS SNI検証が必要な場合は、 TLSサーバー名を入力してください。例: Confluent Cloud Dedicated clusters

      9. 「次へ」をクリックしてネットワーク接続をテストしてください。テストが成功すると、次のページに移動します。

      ステップ3.チェンジフィードを設定する

      1. テーブル フィルターをカスタマイズして、複製するテーブルをフィルターします。ルールの構文については、 テーブルフィルタルールを参照してください。

        • レプリケーション範囲:有効なキーを持つテーブルのみをレプリケートするか、選択したすべてのテーブルをレプリケートするかを選択できます。
        • フィルタルール:この列でフィルタルールを設定できます。デフォルトでは、すべてのテーブルを複製するルール*.*が設定されています。新しいルールを追加して[適用] をクリックすると、 TiDB Cloud はTiDB 内のすべてのテーブルをクエリし、フィルタ結果の下にルールに一致するテーブルのみを表示します。
        • 大文字小文字の区別:フィルタルールにおけるデータベース名とテーブル名の照合において、大文字小文字を区別するかどうかを設定できます。デフォルトでは、大文字小文字は区別されません。
        • 有効なキーで結果をフィルタリングする:この列には、主キーや一意インデックスなど、有効なキーを持つテーブルが表示されます。
        • 有効なキーのない結果をフィルタリングする: この列には、主キーまたは一意キーがないテーブルが表示されます。一意の識別子がないと、ダウンストリームが重複イベントを処理する際にデータの一貫性が失われる可能性があるため、これらのテーブルはレプリケーション中に問題となります。データの一貫性を確保するには、レプリケーションを開始する前に、これらのテーブルに一意キーまたは主キーを追加することをお勧めします。または、フィルタ ルールを追加してこれらのテーブルを除外することもできます。たとえば、ルールtest.tbl1を使用して、テーブル"!test.tbl1"を除外できます。
      2. イベントフィルターをカスタマイズして、複製したいイベントを絞り込みます。

        • 一致するテーブル:この列では、イベントフィルターを適用するテーブルを設定できます。ルールの構文は、前のテーブルフィルター領域で使用されているものと同じです。
        • イベントフィルター:無視したいイベントを選択できます。
      3. カラムセレクタをカスタマイズして、イベントから列を選択し、選択した列に関連するデータ変更のみを下流に送信します。

        • 一致するテーブル:列セレクターを適用するテーブルを指定します。どのルールにも一致しないテーブルの場合、すべての列が送信されます。
        • カラムセレクター:一致したテーブルのどの列をダウンストリームに送信するかを指定します。

        マッチングルールの詳細については、 カラムセレクターを参照してください。

      4. データフォーマット領域で、希望するKafkaメッセージのフォーマットを選択してください。

        • Avroは、コンパクトで高速なバイナリデータフォーマットであり、豊富なデータ構造を備え、様々なフローシステムで広く利用されています。詳細については、 Avroデータ形式参照してください。
        • Canal-JSONは、解析が容易なプレーンなJSONテキスト形式です。詳細については、 Canal-JSONデータ形式参照してください。
        • オープン プロトコルは、監視、キャッシュ、全文インデックス作成、分析エンジン、および異なるデータベース間のプライマリとセカンダリのレプリケーションのためのデータ ソースを提供する行レベルのデータ変更通知プロトコルです。詳細については、 オープンプロトコルデータフォーマットを参照してください。
        • Debeziumは、データベースの変更をキャプチャするためのツールです。キャプチャされた各データベース変更を「イベント」と呼ばれるメッセージに変換し、これらのイベントをKafkaに送信します。詳細については、 Debeziumデータ形式参照してください。
      5. TiDB拡張フィールドをKafkaメッセージ本文に追加する場合は、 TiDB拡張オプションを有効にしてください。

        TiDB 拡張フィールドの詳細については、 Avroデータ形式のTiDB拡張フィールドフィールド」およびCanal-JSONデータ形式のTiDB拡張フィールド参照してください。

      6. データ形式としてAvroを選択すると、ページにAvro固有の設定項目が表示されます。これらの設定項目は、以下のように入力できます。

        • 「Decimal」および「Unsigned BigInt」の設定では、 TiDB CloudがKafkaメッセージ内のdecimal型およびunsigned bigint型のデータ型をどのように処理するかを指定します。
        • スキーマレジストリ領域で、スキーマレジストリのエンドポイントを入力します。HTTP認証を有効にする場合は、ユーザー名とパスワードを入力します。
      7. トピック配信エリアで配信モードを選択し、選択したモードに応じてトピック名の設定を入力します。

        データ形式としてAvroを選択した場合、 [配信モード]ドロップダウンリストでは[テーブルごとの変更ログをKafkaトピックに配信する]モードのみを選択できます。

        配信モードは、変更フィードがKafkaトピックを作成する方法を制御します。テーブルごと、データベースごと、またはすべての変更ログに対して1つのトピックを作成するかを選択できます。

        • テーブルごとに変更ログをKafkaトピックに配信する

          変更フィードでテーブルごとに専用の Kafka トピックを作成する場合は、このモードを選択します。そうすると、テーブルのすべての Kafka メッセージが専用の Kafka トピックに送信されます。トピックのプレフィックス、データベース名とテーブル名の間の区切り文字、およびサフィックスを設定することで、テーブルのトピック名をカスタマイズできます。たとえば、区切り文字を_に設定すると、トピック名は<Prefix><DatabaseName>_<TableName><Suffix>の形式になります。

          スキーマ作成イベントなど、行以外のイベントの変更ログについては、 「デフォルトトピック名」フィールドにトピック名を指定できます。変更フィードは、指定されたトピックに基づいて、これらの変更ログを収集するためのトピックを作成します。

        • データベースごとに変更ログをKafkaトピックに配信する

          変更フィードでデータベースごとに専用のKafkaトピックを作成する場合は、このモードを選択してください。そうすると、データベースのすべてのKafkaメッセージが専用のKafkaトピックに送信されます。トピックのプレフィックスとサフィックスを設定することで、データベースのトピック名をカスタマイズできます。

          解決済みTsイベントなど、行以外のイベントの変更ログについては、 「デフォルトトピック名」フィールドにトピック名を指定できます。変更フィードは、指定されたトピックに基づいて、これらの変更ログを収集するためのトピックを作成します。

        • すべての変更ログを、指定された1つのKafkaトピックに送信する

          変更フィードで全ての変更ログに対して1つのKafkaトピックを作成する場合は、このモードを選択してください。そうすると、変更フィード内のすべてのKafkaメッセージが1つのKafkaトピックに送信されます。トピック名は「トピック名」フィールドで指定できます。

      8. パーティション分散領域では、Kafka メッセージの送信先パーティションを決定できます。すべてのテーブルに対して単一のパーティションディスパッチャを定義することも、テーブルごとに異なるパーティションディスパッチャを定義することもできます。TiDB Cloud、次の 4 種類のディスパッチャが提供されています。

        • プライマリキーまたはインデックス値に基づいて変更ログをKafkaパーティションに分散します。

          変更フィードでテーブルの Kafka メッセージを異なるパーティションに送信する場合は、この分散方法を選択してください。行の変更ログの主キーまたはインデックス値によって、変更ログの送信先パーティションが決まります。主キーを使用する場合は、 「インデックス名」フィールドを空のままにしてください。この分散方法は、パーティションのバランスを改善し、行レベルの順序性を確保します。

        • 変更ログをテーブルごとにKafkaパーティションに分散する

          変更フィードによってテーブルの Kafka メッセージを 1 つの Kafka パーティションに送信する場合は、この分散方法を選択してください。行の変更ログのテーブル名によって、変更ログが送信されるパーティションが決まります。この分散方法はテーブルの順序性を保証しますが、パーティションのバランスが崩れる可能性があります。

        • 変更ログをタイムスタンプに基づいてKafkaパーティションに分散する

          変更フィードが Kafka メッセージをランダムに異なる Kafka パーティションに送信するようにするには、この分散方法を選択してください。行の変更ログの commitTs によって、変更ログが送信されるパーティションが決まります。この分散方法は、パーティションのバランスを改善し、各パーティションの順序性を確保します。ただし、データ項目の複数の変更が異なるパーティションに送信され、異なるコンシューマーのコンシューマー処理の進行状況が異なる場合があり、データの不整合が発生する可能性があります。そのため、コンシューマーは、複数のパーティションからのデータを消費する前に commitTs でソートする必要があります。

        • 変更ログを列の値に基づいてKafkaパーティションに分散する

          テーブルのKafkaメッセージを異なるパーティションに送信するように変更フィードを設定したい場合は、この配信方法を選択してください。行の変更ログで指定された列の値によって、変更ログの送信先パーティションが決まります。この配信方法は、各パーティション内の順序性を保証し、同じ列値を持つ変更ログが同じパーティションに送信されることを保証します。

      9. トピックコンフィグレーション領域で、以下の数値を設定してください。changefeedは、これらの数値に基づいてKafkaトピックを自動的に作成します。

        • レプリケーション係数:各KafkaメッセージがレプリケートされるKafkaサーバーの数を制御します。有効な値の範囲は、 min.insync.replicasからKafkaブローカーの数までです。
        • パーティション番号:トピックに存在するパーティションの数を制御します。有効な値の範囲は[1, 10 * the number of Kafka brokers]です。
      10. [イベントの分割]エリアで、 UPDATEイベントを別々のDELETEINSERTイベントに分割するか、生のUPDATEイベントとして保持するかを選択します。詳細については、 MySQL以外のシンクにおける、主キーまたは一意キーを分割したUPDATEイベント参照してください。

      11. 「次へ」をクリックしてください。

      ステップ4.変更フィードを確認して作成する

      1. 変更フィード名欄に、変更フィードの名前を指定します。
      2. 設定した変更フィードの設定をすべて確認してください。必要に応じて「戻る」をクリックして変更してください。
      3. すべての設定が正しければ、 「送信」をクリックして変更フィードを作成します。

      このページは役に立ちましたか?