📣
TiDB Cloud Essential はパブリックプレビュー中です。このページは自動翻訳されたものです。原文はこちらからご覧ください。

Apache Kafka へのシンク(ベータ版)



このドキュメントでは、TiDB Cloud Essentialから Apache Kafka にデータをストリーミングするための変更フィードを作成する方法について説明します。

制限

  • TiDB Cloud Essentialクラスターごとに、最大 10 個の変更フィードを作成できます。
  • 現在、 TiDB Cloud Essential は、Kafka ブローカーに接続するための自己署名 TLS 証明書のアップロードをサポートしていません。
  • TiDB Cloud Essential は、変更フィードを確立するために TiCDC を使用するため、同じTiCDCとしての制限持ちます。
  • レプリケートするテーブルに主キーまたは NULL 以外の一意のインデックスがない場合、レプリケーション中に一意の制約がないと、再試行シナリオによっては下流に重複したデータが挿入される可能性があります。

前提条件

Apache Kafka にデータをストリーミングするための変更フィードを作成する前に、次の前提条件を完了する必要があります。

  • ネットワーク接続を設定する
  • Kafka ACL 認証の権限を追加する

ネットワーク

TiDB Cloud Essentialクラスターが Apache Kafka サービスに接続できることを確認してください。以下のいずれかの接続方法を選択できます。

  • プライベート リンク接続: セキュリティ コンプライアンスを満たし、ネットワーク品質を確保します。
  • パブリック ネットワーク: 素早いセットアップに適しています。

    プライベート リンク接続は、クラウド プロバイダーのプライベート リンクテクノロジーを活用して、VPC 内のリソースが、それらのサービスが VPC 内で直接ホストされているかのように、プライベート IP アドレスを使用して他の VPC 内のサービスに接続できるようにします。

    TiDB Cloud Essentialは現在、セルフホスト型Kafka、Confluent Cloud Dedicatedクラスター、およびAmazon MSK ProvisionedのPrivate Link接続のみをサポートしています。他のKafka SaaSサービスとの直接統合はサポートしていません。

    Kafka のデプロイメントとクラウド プロバイダーに基づいてプライベート リンク接続を設定するには、次のガイドを参照してください。

    Apache Kafka サービスへのパブリック アクセスを提供する場合は、すべての Kafka ブローカーにパブリック IP アドレスまたはドメイン名を割り当てます。

    本番環境でパブリック アクセスを使用することはお勧めしません。

    Kafka ACL 認証

    TiDB Cloud Essential変更フィードが Apache Kafka にデータをストリーミングし、Kafka トピックを自動的に作成できるようにするには、Kafka に次の権限が追加されていることを確認します。

    • Kafka のトピック リソース タイプにCreateおよびWrite権限が追加されます。
    • Kafka のクラスター リソース タイプにDescribeConfigs権限が追加されます。

    たとえば、Kafka クラスターが Confluent Cloud にある場合、詳細については Confluent ドキュメントのリソースACLの追加を参照してください。

    ステップ1. Apache KafkaのChangefeedページを開く

    1. TiDB Cloudコンソールにログインします。
    2. 対象のTiDB Cloud Essentialクラスターの概要ページに移動し、左側のナビゲーション ペインで[データ] > [Changefeed]をクリックします。
    3. 「Changefeed の作成」をクリックし、宛先としてKafka を選択します。

    ステップ2. changefeedターゲットを構成する

    手順は、選択した接続方法によって異なります。

      1. 接続方法「パブリック」を選択し、Kafkaブローカーのエンドポイントを入力します。複数のエンドポイントを指定する場合は、カンマ,で区切ることができます。

      2. Kafka 認証構成に応じて認証オプションを選択します。

        • Kafka で認証が不要な場合は、デフォルトのオプション[Disable]ままにしておきます。
        • Kafka に認証が必要な場合は、対応する認証タイプを選択し、認証用の Kafka アカウントのユーザー名パスワードを入力します。
      3. Kafka バージョンでは、Kafka のバージョンに基づいてKafka v2またはKafka v3を選択します。

      4. この変更フィード内のデータの圧縮タイプを選択します。

      5. Kafka で TLS 暗号化が有効になっていて、Kafka 接続に TLS 暗号化を使用する場合は、 TLS 暗号化オプションを有効にします。

      6. 「次へ」をクリックしてネットワーク接続をテストします。テストが成功すると、次のページに進みます。

      1. [接続方法][プライベート リンク]を選択します。

      2. 「プライベートリンク接続」で、セクションネットワークで作成したプライベートリンク接続を選択します。プライベートリンク接続のアベイラビリティゾーンがKafkaデプロイメントのアベイラビリティゾーンと一致していることを確認してください。

      3. セクションネットワークで取得したブートストラップポートを入力してください。Amazon MSKプロビジョニングプライベートリンク接続を使用している場合は、このフィールドをスキップできます。

      4. Kafka 認証構成に応じて認証オプションを選択します。

        • Kafka で認証が不要な場合は、デフォルトのオプション[Disable]ままにしておきます。
        • Kafka に認証が必要な場合は、対応する認証タイプを選択し、認証用の Kafka アカウントのユーザー名パスワードを入力します。
      5. Kafka バージョンでは、Kafka のバージョンに基づいてKafka v2またはKafka v3を選択します。

      6. この変更フィード内のデータの圧縮タイプを選択します。

      7. Kafka で TLS 暗号化が有効になっていて、Kafka 接続に TLS 暗号化を使用する場合は、 TLS 暗号化オプションを有効にします。

      8. Kafka で TLS SNI 検証が必要な場合は、 TLS サーバー名を入力します。例: Confluent Cloud Dedicated clusters

      9. 「次へ」をクリックしてネットワーク接続をテストします。テストが成功すると、次のページに進みます。

      ステップ3. チェンジフィードを設定する

      1. テーブルフィルターをカスタマイズして、複製するテーブルをフィルタリングします。ルールの構文については、 テーブルフィルタルールを参照してください。

        • レプリケーション スコープ: 有効なキーを持つテーブルのみをレプリケートするか、選択したすべてのテーブルをレプリケートするかを選択できます。
        • フィルタールール: この列でフィルタールールを設定できます。デフォルトでは、すべてのテーブルを複製するルール*.*が設定されています。新しいルールを追加して「適用」をクリックすると、 TiDB CloudはTiDB内のすべてのテーブルをクエリし、ルールに一致するテーブルのみを「フィルター結果」の下に表示されます。
        • 大文字と小文字を区別: フィルタールール内のデータベース名とテーブル名のマッチングで大文字と小文字を区別するかどうかを設定できます。デフォルトでは、大文字と小文字は区別されません。
        • 有効なキーで結果をフィルタリング: この列には、主キーや一意のインデックスなど、有効なキーを持つテーブルが表示されます。
        • 有効なキーのないフィルター結果: この列には、主キーまたは一意キーを持たないテーブルが表示されます。これらのテーブルは、一意の識別子がないと、下流で重複イベントを処理する際にデータの不整合が発生する可能性があるため、レプリケーション中に問題が発生します。データの整合性を確保するには、レプリケーションを開始する前に、これらのテーブルに一意のキーまたは主キーを追加することをお勧めします。または、これらのテーブルを除外するフィルタールールを追加することもできます。例えば、ルール"!test.tbl1"を使用してテーブルtest.tbl1を除外できます。
      2. イベント フィルターをカスタマイズして、複製するイベントをフィルターします。

        • 一致するテーブル: この列では、イベントフィルターを適用するテーブルを設定できます。ルールの構文は、前述の「テーブルフィルター」領域で使用されているものと同じです。
        • イベント フィルター: 無視するイベントを選択できます。
      3. カラムセレクターをカスタマイズして、イベントから列を選択し、それらの列に関連するデータの変更のみをダウンストリームに送信します。

        • 一致するテーブル: 列セレクターを適用するテーブルを指定します。どのルールにも一致しないテーブルの場合は、すべての列が送信されます。
        • カラムセレクター: 一致したテーブルのどの列をダウンストリームに送信するかを指定します。

        一致ルールの詳細については、 カラムセレクター参照してください。

      4. 「データ形式」領域で、Kafka メッセージの希望する形式を選択します。

        • Avroは、豊富なデータ構造を備えたコンパクトで高速なバイナリデータ形式で、様々なフローシステムで広く使用されています。詳細については、 Avroデータ形式参照してください。
        • Canal-JSONは、解析が容易なプレーンなJSONテキスト形式です。詳細については、 Canal-JSONデータ形式参照してください。
        • Open Protocolは、行レベルのデータ変更通知プロトコルであり、監視、キャッシュ、全文インデックス作成、分析エンジン、および異なるデータベース間のプライマリ-セカンダリレプリケーションのためのデータソースを提供します。詳細については、 オープンプロトコルデータ形式参照してください。
        • Debeziumはデータベースの変更をキャプチャするためのツールです。キャプチャされたデータベースの変更は「イベント」と呼ばれるメッセージに変換され、Kafkaに送信されます。詳細については、 Debeziumデータ形式参照してください。
      5. Kafka メッセージ本文に TiDB 拡張フィールドを追加する場合は、 TiDB 拡張オプションを有効にします。

        TiDB 拡張フィールドの詳細については、 Avro データ形式の TiDB 拡張フィールドおよびCanal-JSON データ形式の TiDB 拡張フィールド参照してください。

      6. データ形式としてAvroを選択した場合、ページにAvro固有の設定が表示されます。これらの設定は以下のように入力できます。

        • DecimalおよびUnsigned BigInt構成では、 TiDB Cloud がKafka メッセージの decimal および unsigned bigint データ型を処理する方法を指定します。
        • スキーマレジストリ領域で、スキーマレジストリエンドポイントを入力します。HTTP認証を有効にする場合は、ユーザー名とパスワードを入力します。
      7. 「トピック配布」領域で配布モードを選択し、モードに応じてトピック名の設定を入力します。

        データ形式としてAvro を選択した場合は、 「配布モード」ドロップダウン リストで「変更ログをテーブルごとに Kafka トピックに配布」モードのみを選択できます。

        配布モードは、変更フィードが Kafka トピックをテーブル別、データベース別、またはすべての変更ログに対して 1 つのトピックを作成する方法を制御します。

        • テーブルごとに変更ログを Kafka Topics に配布する

          変更フィードで各テーブル専用のKafkaトピックを作成したい場合は、このモードを選択してください。すると、テーブルのすべてのKafkaメッセージが専用のKafkaトピックに送信されます。トピックのプレフィックス、データベース名とテーブル名の間の区切り文字、およびサフィックスを設定することで、テーブルのトピック名をカスタマイズできます。例えば、区切り文字を_に設定すると、トピック名は<Prefix><DatabaseName>_<TableName><Suffix>という形式になります。

          スキーマ作成イベントなどの行イベント以外の変更ログについては、 「デフォルトのトピック名」フィールドにトピック名を指定できます。変更フィードは、このような変更ログを収集するために、それに応じたトピックを作成します。

        • データベースごとに変更ログを Kafka Topics に配布する

          変更フィードで各データベース専用のKafkaトピックを作成したい場合は、このモードを選択してください。これにより、データベースのすべてのKafkaメッセージが専用のKafkaトピックに送信されます。トピックのプレフィックスとサフィックスを設定することで、データベースのトピック名をカスタマイズできます。

          解決済みTsイベントなど、行イベント以外のイベントの変更ログについては、 「デフォルトのトピック名」フィールドにトピック名を指定できます。変更フィードは、指定されたトピックに基づいて変更ログを収集します。

        • すべての変更ログを指定された 1 つの Kafka トピックに送信する

          チェンジフィードですべての変更ログに対して1つのKafkaトピックを作成したい場合は、このモードを選択してください。そうすると、チェンジフィード内のすべてのKafkaメッセージが1つのKafkaトピックに送信されます。トピック名は「トピック名」フィールドで定義できます。

      8. パーティションディストリビューション領域では、Kafka メッセージを送信するパーティションを指定できます。すべてのテーブルに単一のパーティションディスパッチャを定義することも、テーブルごとに異なるパーティションディスパッチャを定義することもできます。TiDB TiDB Cloud、以下の 4 種類のディスパッチャが提供されています。

        • 主キーまたはインデックス値によって変更ログを Kafka パーティションに分散する

          変更フィードを使用してテーブルのKafkaメッセージを複数のパーティションに送信する場合は、この分散方法を選択してください。行の変更ログの主キーまたはインデックス値によって、変更ログが送信されるパーティションが決まります。主キーを使用する場合は、 「インデックス名」フィールドを空のままにしてください。この分散方法により、パーティションのバランスが向上し、行レベルの順序性が確保されます。

        • テーブルごとに変更ログを Kafka パーティションに分散する

          変更フィードを使用して、テーブルの Kafka メッセージを 1 つの Kafka パーティションに送信する場合は、この分散方法を選択してください。行変更ログのテーブル名によって、変更ログが送信されるパーティションが決まります。この分散方法はテーブルの順序性を保証しますが、パーティションの不均衡が発生する可能性があります。

        • タイムスタンプごとに変更ログを Kafka パーティションに配布する

          変更フィードがKafkaメッセージを異なるKafkaパーティションにランダムに送信するようにしたい場合は、この分散方法を選択してください。行の変更ログのcommitTsによって、変更ログが送信されるパーティションが決まります。この分散方法は、パーティションバランスを改善し、各パーティションの秩序性を確保します。ただし、データ項目の複数の変更が異なるパーティションに送信され、各コンシューマーの進行状況が異なる場合があり、データの不整合が発生する可能性があります。そのため、コンシューマーは複数のパーティションからデータを消費する前に、commitTsでソートする必要があります。

        • 列値ごとに変更ログを Kafka パーティションに分散する

          変更フィードを使用して、テーブルのKafkaメッセージを複数のパーティションに送信する場合は、この分散方法を選択してください。行の変更ログの指定された列値に基づいて、変更ログが送信されるパーティションが決定されます。この分散方法により、各パーティションの順序性が確保され、同じ列値を持つ変更ログが同じパーティションに送信されることが保証されます。

      9. トピックコンフィグレーションエリアで、以下の数値を設定します。変更フィードは、これらの数値に基づいてKafkaトピックを自動的に作成します。

        • レプリケーション係数: 各Kafkaメッセージが何台のKafkaサーバーに複製されるかを制御します。有効な値の範囲はmin.insync.replicasからKafkaブローカーの数までです。
        • パーティション数: トピック内に存在するパーティションの数を制御します。有効な値の範囲は[1, 10 * the number of Kafka brokers]です。
      10. 「イベントの分割」エリアでは、 UPDATEイベントをDELETEつとINSERTイベントに分割するか、 UPDATEイベントのままにするかを選択します。詳細については、 MySQL以外のシンクの主キーまたは一意キーのUPDATEイベントを分割する参照してください。

      11. 「次へ」をクリックします。

      ステップ4. 変更フィードを確認して作成する

      1. 「Changefeed 名」領域で、Changefeed の名前を指定します。
      2. 設定したすべての変更フィード設定を確認してください。必要に応じて「前へ」をクリックして変更してください。
      3. すべての構成が正しい場合は、 「送信」をクリックして変更フィードを作成します。

      このページは役に立ちましたか?