Kafka にデータを複製する

このドキュメントでは、TiCDC を使用して増分データを Apache Kafka に複製する変更フィードを作成する方法について説明します。

レプリケーションタスクを作成する

次のコマンドを実行してレプリケーション タスクを作成します。

cdc cli changefeed create \ --server=http://10.0.10.25:8300 \ --sink-uri="kafka://127.0.0.1:9092/topic-name?protocol=canal-json&kafka-version=2.4.0&partition-num=6&max-message-bytes=67108864&replication-factor=1" \ --changefeed-id="simple-replication-task"
Create changefeed successfully! ID: simple-replication-task Info: {"sink-uri":"kafka://127.0.0.1:9092/topic-name?protocol=canal-json&kafka-version=2.4.0&partition-num=6&max-message-bytes=67108864&replication-factor=1","opts":{},"create-time":"2023-11-28T22:04:08.103600025+08:00","start-ts":415241823337054209,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":".","config":{"case-sensitive":false,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"ddl-allow-list":null},"mounter":{"worker-num":16},"sink":{"dispatchers":null},"scheduler":{"type":"table-number","polling-time":-1}},"state":"normal","history":null,"error":null}
  • --server : TiCDC クラスター内の任意の TiCDCサーバーのアドレス。
  • --changefeed-id : レプリケーション タスクの ID。形式は^[a-zA-Z0-9]+(\-[a-zA-Z0-9]+)*$の正規表現と一致する必要があります。この ID が指定されていない場合、TiCDC は ID として UUID (バージョン 4 形式) を自動的に生成します。
  • --sink-uri : レプリケーションタスクのダウンストリームアドレス。詳細については、 kafkaでシンク URI を設定する参照してください。
  • --start-ts : 変更フィードの開始 TSO を指定します。この TSO から、TiCDC クラスターはデータのプルを開始します。デフォルト値は現在の時刻です。
  • --target-ts : 変更フィード終了 TSO を指定します。この TSO まで、TiCDC クラスターはデータのプルを停止します。デフォルト値は空です。つまり、TiCDC はデータのプルを自動的に停止しません。
  • --config : changefeed設定ファイルを指定します。詳細についてはTiCDC Changefeedコンフィグレーションパラメータ参照してください。

サポートされている Kafka バージョン

次の表は、各 TiCDC バージョンでサポートされる最小 Kafka バージョンを示しています。

TiCDC バージョンサポートされている最小の Kafka バージョン
TiCDC >= v8.1.02.1.0
v7.6.0 <= TiCDC < v8.1.02.4.0
v7.5.2 <= TiCDC < v8.0.02.1.0
v7.5.0 <= TiCDC < v7.5.22.4.0
v6.5.0 <= TiCDC < v7.5.02.1.0
v6.1.0 <= TiCDC < v6.5.02.0.0

Kafka のシンク URI を構成する

シンク URI は、TiCDC ターゲット システムの接続情報を指定するために使用されます。形式は次のとおりです。

[scheme]://[host]:[port][/path]?[query_parameters]

ヒント:

ダウンストリーム Kafka に複数のホストまたはポートがある場合は、シンク URI に複数の[host]:[port]設定できます。例:

[scheme]://[host]:[port],[host]:[port],[host]:[port][/path]?[query_parameters]

サンプル構成:

--sink-uri="kafka://127.0.0.1:9092/topic-name?protocol=canal-json&kafka-version=2.4.0&partition-num=6&max-message-bytes=67108864&replication-factor=1"

以下は、Kafka に設定できるシンク URI パラメータと値の説明です。

パラメータ/パラメータ値説明
127.0.0.1ダウンストリーム Kafka サービスの IP アドレス。
9092ダウンストリーム Kafka のポート。
topic-name変数。Kafka トピックの名前。
protocolメッセージが Kafka に出力されるプロトコル。値のオプションはcanal-jsonopen-protocolavrodebeziumsimpleです。
kafka-versionダウンストリーム Kafka のバージョン。この値は、ダウンストリーム Kafka の実際のバージョンと一致している必要があります。
kafka-client-idレプリケーション タスクの Kafka クライアント ID を指定します (オプション。デフォルトはTiCDC_sarama_producer_replication ID )。
partition-numダウンストリーム Kafka パーティションの数 (オプション。値は実際のパーティション数以下にする必要があります。そうでない場合、レプリケーション タスクを正常に作成できません。デフォルトは3 )。
max-message-bytesKafka ブローカーに毎回送信されるデータの最大サイズ (オプション、デフォルトは10MB )。v5.0.6 および v4.0.6 から、デフォルト値は64MB256MBから10MBに変更されました。
replication-factor保存できる Kafka メッセージ レプリカの数 (オプション、デフォルトは1 )。この値は、Kafka の値min.insync.replicas以上である必要があります。
required-acksProduceリクエストで使用されるパラメータ。ブローカーが応答する前に受信する必要があるレプリカ確認応答の数を通知します。値のオプションは、 0 ( NoResponse : 応答なし、 TCP ACKのみが提供されます)、 1 ( WaitForLocal : ローカルコミットが正常に送信された後にのみ応答します)、および-1 ( WaitForAll : すべてのレプリケートされたレプリカが正常にコミットされた後に応答します。ブローカーのmin.insync.replicas構成項目を使用して、レプリケートされたレプリカの最小数を設定できます) です。(オプション、デフォルト値は-1です)。
compressionメッセージを送信するときに使用する圧縮アルゴリズム (値のオプションはnonelz4gzipsnappyzstdで、デフォルトはnoneです)。Snappy 圧縮ファイルは公式Snappyフォーマットである必要があります。Snappy 圧縮の他のバリアントはサポートされていません。
auto-create-topic渡されたtopic-name Kafka クラスターに存在しない場合に、TiCDC がトピックを自動的に作成するかどうかを決定します (オプション、デフォルトはtrue )。
enable-tidb-extensionオプション。デフォルトはfalseです。出力プロトコルがcanal-json場合、値がtrueであれば、TiCDC はウォーターマークイベントを送信し、 TiDB拡張フィールド Kafka メッセージに追加します。v6.1.0 以降では、このパラメータはavroプロトコルにも適用されます。値がtrueの場合、TiCDC は3つのTiDB拡張フィールド Kafka メッセージに追加します。
max-batch-sizev4.0.9 の新機能。メッセージ プロトコルが 1 つの Kafka メッセージに複数のデータ変更を出力することをサポートしている場合、このパラメーターは 1 つの Kafka メッセージ内のデータ変更の最大数を指定します。現在、Kafka のprotocolopen-protocol (オプション、デフォルトは16 ) の場合にのみ有効です。
enable-tlsダウンストリーム Kafka インスタンスに接続するために TLS を使用するかどうか (オプション、デフォルトはfalse )。
caダウンストリーム Kafka インスタンスに接続するために必要な CA 証明書ファイルのパス (オプション)。
certダウンストリーム Kafka インスタンスに接続するために必要な証明書ファイルのパス (オプション)。
keyダウンストリーム Kafka インスタンスに接続するために必要な証明書キー ファイルのパス (オプション)。
insecure-skip-verifyダウンストリーム Kafka インスタンスに接続するときに証明書の検証をスキップするかどうか (オプション、デフォルトはfalse )。
sasl-userダウンストリーム Kafka インスタンスに接続するために必要な SASL/PLAIN または SASL/SCRAM 認証の ID (authcid) (オプション)。
sasl-passwordダウンストリーム Kafka インスタンスに接続するために必要な SASL/PLAIN または SASL/SCRAM 認証のパスワード (オプション)。特殊文字が含まれている場合は、URL エンコードする必要があります。
sasl-mechanismダウンストリーム Kafka インスタンスに接続するために必要な SASL 認証の名前。値はplainscram-sha-256scram-sha-512 、またはgssapiになります。
sasl-gssapi-auth-typegssapi 認証タイプ。値はuserまたはkeytab (オプション) です。
sasl-gssapi-keytab-pathgssapi キータブ パス (オプション)。
sasl-gssapi-kerberos-config-pathgssapi kerberos 構成パス (オプション)。
sasl-gssapi-service-namegssapi サービス名 (オプション)。
sasl-gssapi-usergssapi 認証のユーザー名 (オプション)。
sasl-gssapi-passwordgssapi 認証のパスワード (オプション)。特殊文字が含まれている場合は、URL エンコードする必要があります。
sasl-gssapi-realmgssapi レルム名 (オプション)。
sasl-gssapi-disable-pafxfastgssapi PA-FX-FAST を無効にするかどうか (オプション)。
dial-timeoutダウンストリーム Kafka との接続を確立する際のタイムアウト。デフォルト値は10sです。
read-timeoutダウンストリーム Kafka から返される応答を取得する際のタイムアウト。デフォルト値は10sです。
write-timeoutダウンストリーム Kafka にリクエストを送信する際のタイムアウト。デフォルト値は10sです。
avro-decimal-handling-modeavroプロトコルでのみ有効です。Avro が DECIMAL フィールドを処理する方法を決定します。値はstringまたはpreciseで、DECIMAL フィールドを文字列または正確な浮動小数点数にマッピングすることを示します。
avro-bigint-unsigned-handling-modeavroプロトコルでのみ有効です。Avro が BIGINT UNSIGNED フィールドを処理する方法を決定します。値はstringまたはlongで、BIGINT UNSIGNED フィールドを 64 ビットの符号付き数値または文字列にマッピングすることを示します。

ベストプラクティス

  • 独自の Kafka Topic を作成することをお勧めします。少なくとも、Topic が Kafka ブローカーに送信できる各メッセージの最大データ量と、下流の Kafka パーティションの数を設定する必要があります。changefeed を作成する場合、これら 2 つの設定はそれぞれmax-message-bytespartition-numに対応します。
  • まだ存在しないトピックで changefeed を作成すると、TiCDC はpartition-numreplication-factorパラメータを使用してトピックを作成しようとします。これらのパラメータを明示的に指定することをお勧めします。
  • ほとんどの場合、 canal-jsonプロトコルを使用することをお勧めします。

注記:

protocolopen-protocol場合、TiCDC は複数のイベントを 1 つの Kafka メッセージにエンコードし、 max-message-bytesで指定された長さを超えるメッセージの生成を回避します。単一行変更イベントのエンコードされた結果がmax-message-bytesの値を超えると、変更フィードはエラーを報告し、ログを出力。

TiCDCはKafkaの認証と認可を使用します

以下は、Kafka SASL 認証を使用する場合の例です。

  • SASL/プレーン

    --sink-uri="kafka://127.0.0.1:9092/topic-name?kafka-version=2.4.0&sasl-user=alice-user&sasl-password=alice-secret&sasl-mechanism=plain"
  • SASL/SCRAM

    SCRAM-SHA-256 と SCRAM-SHA-512 は PLAIN 方式に似ています。対応する認証方法としてsasl-mechanism指定するだけです。

  • SASL/GSSAPI

    SASL/GSSAPI user認証:

    --sink-uri="kafka://127.0.0.1:9092/topic-name?kafka-version=2.4.0&sasl-mechanism=gssapi&sasl-gssapi-auth-type=user&sasl-gssapi-kerberos-config-path=/etc/krb5.conf&sasl-gssapi-service-name=kafka&sasl-gssapi-user=alice/for-kafka&sasl-gssapi-password=alice-secret&sasl-gssapi-realm=example.com"

    sasl-gssapi-usersasl-gssapi-realmの値は、Kerberos で指定された原理に関連しています。たとえば、プリンシパルがalice/for-kafka@example.comに設定されている場合、 sasl-gssapi-usersasl-gssapi-realmそれぞれalice/for-kafkaexample.comとして指定されます。

    SASL/GSSAPI keytab認証:

    --sink-uri="kafka://127.0.0.1:9092/topic-name?kafka-version=2.4.0&sasl-mechanism=gssapi&sasl-gssapi-auth-type=keytab&sasl-gssapi-kerberos-config-path=/etc/krb5.conf&sasl-gssapi-service-name=kafka&sasl-gssapi-user=alice/for-kafka&sasl-gssapi-keytab-path=/var/lib/secret/alice.key&sasl-gssapi-realm=example.com"

    SASL/GSSAPI 認証方式の詳細については、 GSSAPI の設定参照してください。

  • TLS/SSL暗号化

    Kafka ブローカーで TLS/SSL 暗号化が有効になっている場合は、 --sink-uri-enable-tls=trueパラメータを追加する必要があります。自己署名証明書を使用する場合は、 --sink-uricacertkeyも指定する必要があります。

  • ACL 認証

    TiCDC が適切に機能するために必要な最小限の権限セットは次のとおりです。

    • トピックリソースタイプCreateWrite 、およびDescribe権限。
    • クラスタリソース タイプに対するDescribeConfigの権限。

    各権限の使用シナリオは次のとおりです。

    リソースタイプ操作の種類シナリオ
    クラスタDescribeConfig変更フィードの実行中にクラスターのメタデータを取得します
    トピックDescribeチェンジフィードの開始時にトピックを作成しようとします
    トピックCreateチェンジフィードの開始時にトピックを作成しようとします
    トピックWriteトピックにデータを送信する

    変更フィードを作成または開始するときに、指定された Kafka トピックがすでに存在する場合は、 DescribeおよびCreate権限を無効にすることができます。

TiCDC を Kafka Connect (Confluent Platform) と統合する

Confluent が提供するデータコネクタ使用してリレーショナル データベースまたは非リレーショナル データベースにデータをストリーミングするには、 avroプロトコル使用し、 schema-registryConfluent スキーマ レジストリの URL を指定する必要があります。

サンプル構成:

--sink-uri="kafka://127.0.0.1:9092/topic-name?&protocol=avro&replication-factor=3" --schema-registry="http://127.0.0.1:8081" --config changefeed_config.toml
[sink] dispatchers = [ {matcher = ['*.*'], topic = "tidb_{schema}_{table}"}, ]

詳細な統合ガイドについては、 TiDB と Confluent Platform の統合に関するクイック スタート ガイド参照してください。

TiCDC を AWS Glue スキーマレジストリと統合する

v7.4.0 以降、TiCDC は、ユーザーがデータ レプリケーションにアブロプロトコル選択した場合に、スキーマ レジストリとしてAWS Glue スキーマレジストリ使用することをサポートします。構成例は次のとおりです。

./cdc cli changefeed create --server=127.0.0.1:8300 --changefeed-id="kafka-glue-test" --sink-uri="kafka://127.0.0.1:9092/topic-name?&protocol=avro&replication-factor=3" --config changefeed_glue.toml
[sink] [sink.kafka-config.glue-schema-registry-config] region="us-west-1" registry-name="ticdc-test" access-key="xxxx" secret-access-key="xxxx" token="xxxx"

上記の設定では、 regionregistry-name必須フィールドですが、 access-keysecret-access-keytokenはオプションフィールドです。ベストプラクティスは、AWS 認証情報を環境変数として設定するか、changefeed 設定ファイルで設定するのではなく、 ~/.aws/credentialsファイルに保存することです。

詳細については、 Go V2 向け公式 AWS SDK ドキュメントを参照してください。

Kafka シンクのトピックおよびパーティションディスパッチャーのルールをカスタマイズする

マッチャールール

dispatchersの次の構成を例に挙げます。

[sink] dispatchers = [ {matcher = ['test1.*', 'test2.*'], topic = "Topic expression 1", partition = "ts" }, {matcher = ['test3.*', 'test4.*'], topic = "Topic expression 2", partition = "index-value" }, {matcher = ['test1.*', 'test5.*'], topic = "Topic expression 3", partition = "table"}, {matcher = ['test6.*'], partition = "ts"} ]
  • マッチャールールに一致するテーブルについては、対応するトピック式で指定されたポリシーに従ってディスパッチされます。たとえば、テーブルtest3.aaは「トピック式 2」に従ってディスパッチされ、テーブルtest5.aaは「トピック式 3」に従ってディスパッチされます。
  • 複数のマッチャールールに一致するテーブルの場合、最初に一致するトピック式に従ってディスパッチされます。たとえば、 test1.aaテーブルは「トピック式 1」に従って配布されます。
  • どのマッチャールールにも一致しないテーブルの場合、対応するデータ変更イベントは--sink-uriで指定されたデフォルトのトピックに送信されます。たとえば、 test10.aaテーブルはデフォルトのトピックに送信されます。
  • マッチャー ルールに一致するがトピック ディスパッチャーを指定していないテーブルの場合、対応するデータ変更は--sink-uriで指定されたデフォルト トピックに送信されます。たとえば、 test6.aaテーブルはデフォルト トピックに送信されます。

トピックディスパッチャ

topic = "xxx" を使用してトピック ディスパッチャーを指定し、トピック式を使用して柔軟なトピック ディスパッチ ポリシーを実装できます。トピックの合計数は 1000 未満にすることをお勧めします。

トピック式の形式は[prefix]{schema}[middle][{table}][suffix]です。

  • prefix : オプション。トピック名のプレフィックスを示します。
  • {schema} : 必須。スキーマ名を一致させるために使用されます。v7.1.4 以降では、このパラメーターはオプションです。
  • middle : オプション。スキーマ名とテーブル名の間の区切り文字を示します。
  • {table} : オプション。テーブル名を一致させるために使用されます。
  • suffix : オプション。トピック名の接尾辞を示します。

prefixmiddlesuffixa-zA-Z0-9._-の文字のみを含めることができます。 {schema}{table}両方とも小文字です。 {Schema}{TABLE}などのプレースホルダーは無効です。

例:

  • matcher = ['test1.table1', 'test2.table2'], topic = "hello_{schema}_{table}"
    • test1.table1に対応するデータ変更イベントは、 hello_test1_table1という名前のトピックに送信されます。
    • test2.table2に対応するデータ変更イベントは、 hello_test2_table2という名前のトピックに送信されます。
  • matcher = ['test3.*', 'test4.*'], topic = "hello_{schema}_world"
    • test3内のすべてのテーブルに対応するデータ変更イベントは、 hello_test3_worldという名前のトピックに送信されます。
    • test4内のすべてのテーブルに対応するデータ変更イベントは、 hello_test4_worldという名前のトピックに送信されます。
  • matcher = ['test5.*, 'test6.*'], topic = "hard_code_topic_name"
    • test5test6のすべてのテーブルに対応するデータ変更イベントは、 hard_code_topic_nameという名前のトピックに送信されます。トピック名を直接指定できます。
  • matcher = ['*.*'], topic = "{schema}_{table}"
    • TiCDC がリッスンするすべてのテーブルは、「schema_table」ルールに従って個別のトピックにディスパッチされます。たとえば、 test.accountテーブルの場合、TiCDC はデータ変更ログをtest_accountという名前のトピックにディスパッチします。

DDLイベントをディスパッチする

スキーマレベルの DDL

特定のテーブルに関連しない DDL は、 create databasedrop databaseなどのスキーマ レベル DDL と呼ばれます。スキーマ レベル DDL に対応するイベントは、 --sink-uriで指定されたデフォルト トピックに送信されます。

テーブルレベルの DDL

特定のテーブルに関連する DDL は、 alter tablecreate tableなどのテーブル レベル DDL と呼ばれます。テーブル レベル DDL に対応するイベントは、ディスパッチャ構成に従って対応するトピックに送信されます。

たとえば、 matcher = ['test.*'], topic = {schema}_{table}ようなディスパッチャーの場合、DDL イベントは次のようにディスパッチされます。

  • DDL イベントに 1 つのテーブルが関係している場合、DDL イベントはそのまま対応するトピックに送信されます。たとえば、DDL イベントdrop table test.table1の場合、イベントはtest_table1という名前のトピックに送信されます。
  • DDL イベントに複数のテーブルが関係する場合 ( rename table / drop table / drop viewは複数のテーブルが関係する可能性があります)、DDL イベントは複数のイベントに分割され、対応するトピックに送信されます。たとえば、DDL イベントrename table test.table1 to test.table10, test.table2 to test.table20の場合、イベントrename table test.table1 to test.table10トピックtest_table1に送信され、イベントrename table test.table2 to test.table20はトピックtest.table2に送信されます。

パーティションディスパッチャ

partition = "xxx"使用してパーティション ディスパッチャーを指定できます。 defaultindex-valuecolumnstable 、およびtsの 5 つのディスパッチャーがサポートされています。ディスパッチャーのルールは次のとおりです。

  • default : デフォルトでtableディスパッチャ ルールを使用します。スキーマ名とテーブル名を使用してパーティション番号を計算し、テーブルのデータが同じパーティションに送信されるようにします。その結果、1 つのテーブルのデータは 1 つのパーティションにのみ存在し、順序付けが保証されます。ただし、このディスパッチャ ルールは送信スループットを制限し、コンシューマーを追加しても消費速度を向上させることはできません。
  • index-value : 主キー、一意のインデックス、またはindexで明示的に指定されたインデックスを使用してパーティション番号を計算し、テーブル データを複数のパーティションに分散します。1 つのテーブルのデータは複数のパーティションに送信され、各パーティションのデータは順序付けされます。コンシューマーを追加することで、消費速度を向上させることができます。
  • columns : 明示的に指定された列の値を使用してパーティション番号を計算し、テーブル データを複数のパーティションに分散します。単一のテーブルのデータは複数のパーティションに送信され、各パーティションのデータは順序付けされます。コンシューマーを追加することで、消費速度を向上させることができます。
  • table : スキーマ名とテーブル名を使用してパーティション番号を計算します。
  • ts : 行変更の commitTs を使用してパーティション番号を計算し、テーブル データを複数のパーティションに分散します。単一のテーブルからのデータは複数のパーティションに送信され、各パーティションのデータは順序付けされます。コンシューマーを追加することで、消費速度を向上させることができます。ただし、データ項目の複数の変更が異なるパーティションに送信され、異なるコンシューマーのコンシューマーの進行状況が異なる場合があり、データの不整合が発生する可能性があります。したがって、コンシューマーは、消費する前に、複数のパーティションからのデータを commitTs で並べ替える必要があります。

dispatchersの次の構成を例に挙げます。

[sink] dispatchers = [ {matcher = ['test.*'], partition = "index-value"}, {matcher = ['test1.*'], partition = "index-value", index = "index1"}, {matcher = ['test2.*'], partition = "columns", columns = ["id", "a"]}, {matcher = ['test3.*'], partition = "table"}, ]
  • testデータベース内のテーブルはindex-valueディスパッチャを使用し、主キーまたは一意のインデックスの値を使用してパーティション番号を計算します。主キーが存在する場合は、主キーが使用されます。それ以外の場合は、最短の一意のインデックスが使用されます。
  • test1テーブル内のテーブルはindex-valueディスパッチャを使用し、 index1という名前のインデックス内のすべての列の値を使用してパーティション番号を計算します。指定されたインデックスが存在しない場合は、エラーが報告されます。 indexで指定されたインデックスは一意のインデックスである必要があることに注意してください。
  • test2データベース内のテーブルはcolumnsディスパッチャを使用し、列idaの値を使用してパーティション番号を計算します。いずれかの列が存在しない場合は、エラーが報告されます。
  • test3データベース内のテーブルはtableディスパッチャーを使用します。
  • test4データベース内のテーブルは、前述のルールのいずれにも一致しないため、 defaultディスパッチャー、つまりtableディスパッチャーを使用します。

テーブルが複数のディスパッチャー ルールに一致する場合、最初に一致するルールが優先されます。

注記:

v6.1.0 以降、構成の意味を明確にするために、パーティション ディスパッチャーを指定するために使用される構成がdispatcherからpartitionに変更され、 partitiondispatcherの別名になりました。たとえば、次の 2 つのルールはまったく同じです。

[sink] dispatchers = [ {matcher = ['*.*'], dispatcher = "index-value"}, {matcher = ['*.*'], partition = "index-value"}, ]

ただし、 dispatcherpartition同じルールに出現させることはできません。たとえば、次のルールは無効です。

{matcher = ['*.*'], dispatcher = "index-value", partition = "table"},

カラムセレクター

列セレクター機能は、イベントから列を選択し、それらの列に関連するデータの変更のみをダウンストリームに送信することをサポートします。

column-selectorsの次の構成を例に挙げます。

[sink] column-selectors = [ {matcher = ['test.t1'], columns = ['a', 'b']}, {matcher = ['test.*'], columns = ["*", "!b"]}, {matcher = ['test1.t1'], columns = ['column*', '!column1']}, {matcher = ['test3.t'], columns = ["column?", "!column1"]}, ]
  • test.t1の場合、列abのみが送信されます。
  • testデータベース内のテーブル ( t1テーブルを除く) の場合、 bを除くすべての列が送信されます。
  • test1.t1の場合、 column1を除くcolumnで始まるすべての列が送信されます。
  • test3.tの場合、 column1を除く、 columnで始まる 7 文字の列が送信されます。
  • どのルールにも一致しないテーブルの場合、すべての列が送信されます。

注記:

column-selectorsルールでフィルタリングされた後、テーブル内のデータには、複製される主キーまたは一意のキーが必要です。そうでない場合、変更フィードは作成時または実行時にエラーを報告します。

単一の大きなテーブルの負荷を複数の TiCDC ノードにスケールアウトする

この機能は、1 つの大きなテーブルのデータ レプリケーション範囲を、データ量と 1 分あたりの変更行数に応じて複数の範囲に分割し、各範囲でレプリケートされるデータ量と変更行数をほぼ同じにします。この機能は、これらの範囲を複数の TiCDC ノードに分散してレプリケーションするため、複数の TiCDC ノードが同時に 1 つの大きなテーブルをレプリケートできます。この機能により、次の 2 つの問題を解決できます。

  • 単一の TiCDC ノードでは、単一の大きなテーブルを時間内に複製することはできません。
  • TiCDC ノードによって消費されるリソース (CPU やメモリなど) は均等に分散されません。

サンプル構成:

[scheduler] # The default value is "false". You can set it to "true" to enable this feature. enable-table-across-nodes = true # When you enable this feature, it only takes effect for tables with the number of regions greater than the `region-threshold` value. region-threshold = 100000 # When you enable this feature, it takes effect for tables with the number of rows modified per minute greater than the `write-key-threshold` value. # Note: # * The default value of `write-key-threshold` is 0, which means that the feature does not split the table replication range according to the number of rows modified in a table by default. # * You can configure this parameter according to your cluster workload. For example, if it is configured as 30000, it means that the feature will split the replication range of a table when the number of modified rows per minute in the table exceeds 30000. # * When `region-threshold` and `write-key-threshold` are configured at the same time: # TiCDC will check whether the number of modified rows is greater than `write-key-threshold` first. # If not, next check whether the number of Regions is greater than `region-threshold`. write-key-threshold = 30000

次の SQL ステートメントを使用して、テーブルに含まれるリージョンの数を照会できます。

SELECT COUNT(*) FROM INFORMATION_SCHEMA.TIKV_REGION_STATUS WHERE DB_NAME="database1" AND TABLE_NAME="table1" AND IS_INDEX=0;

Kafkaトピックの制限を超えるメッセージを処理する

Kafka トピックは、受信できるメッセージのサイズに制限を設定します。この制限は、 max.message.bytesパラメータによって制御されます。TiCDC Kafka シンクがこの制限を超えるデータを送信すると、変更フィードはエラーを報告し、データのレプリケーションを続行できません。この問題を解決するために、TiCDC は新しい構成large-message-handle-optionを追加し、次のソリューションを提供します。

現在、この機能は Canal-JSON と Open Protocol の 2 つのエンコード プロトコルをサポートしています。Canal-JSON プロトコルを使用する場合は、 sink-uriのうちenable-tidb-extension=true指定する必要があります。

TiCDC データ圧縮

v7.4.0 以降、TiCDC Kafka シンクは、エンコード直後にデータを圧縮し、圧縮されたデータ サイズをメッセージ サイズ制限と比較することをサポートします。この機能により、サイズ制限を超えるメッセージの発生を効果的に減らすことができます。

構成例は次のとおりです。

[sink.kafka-config.large-message-handle] # This configuration is introduced in v7.4.0. # "none" by default, which means that the compression feature is disabled. # Possible values are "none", "lz4", and "snappy". The default value is "none". large-message-handle-compression = "none"

large-message-handle-compressionが有効になっている場合、コンシューマーが受信したメッセージは特定の圧縮プロトコルを使用してエンコードされ、コンシューマー アプリケーションは指定された圧縮プロトコルを使用してデータをデコードする必要があります。

この機能は、Kafka プロデューサーの圧縮機能とは異なります。

  • large-message-handle-compressionで指定された圧縮アルゴリズムは、単一の Kafka メッセージを圧縮します。圧縮は、メッセージ サイズの制限と比較する前に実行されます。
  • 同時に、 sink-uricompressionパラメータを使用して圧縮アルゴリズムを構成することもできます。この圧縮アルゴリズムは、複数の Kafka メッセージを含むデータ送信リクエスト全体に適用されます。

large-message-handle-compression設定すると、TiCDC はメッセージを受信すると、まずメッセージ サイズ制限パラメータの値と比較し、サイズ制限より大きいメッセージを圧縮します。 sink-uricompressionも設定すると、TiCDC はsink-uri設定に基づいて、送信データ要求全体をシンク レベルで再度圧縮します。

前述の 2 つの圧縮方法の圧縮率は次のように計算されますcompression ratio = size before compression / size after compression * 100

ハンドルキーのみ送信

v7.3.0 以降、TiCDC Kafka シンクは、メッセージ サイズが制限を超えた場合にハンドル キーのみの送信をサポートします。これにより、メッセージ サイズが大幅に削減され、メッセージ サイズが Kafka トピック制限を超えたために発生する変更フィード エラーやタスクの失敗を回避できます。ハンドル キーとは、次のものを指します。

  • 複製するテーブルに主キーがある場合、主キーはハンドル キーになります。
  • テーブルに主キーがなく、NOT NULL 一意キーがある場合、NOT NULL 一意キーがハンドル キーになります。

サンプル構成は次のとおりです。

[sink.kafka-config.large-message-handle] # large-message-handle-option is introduced in v7.3.0. # Defaults to "none". When the message size exceeds the limit, the changefeed fails. # When set to "handle-key-only", if the message size exceeds the limit, only the handle key is sent in the data field. If the message size still exceeds the limit, the changefeed fails. large-message-handle-option = "claim-check"

ハンドルキーのみでメッセージを消費する

ハンドル キーのみのメッセージ形式は次のとおりです。

{ "id": 0, "database": "test", "table": "tp_int", "pkNames": [ "id" ], "isDdl": false, "type": "INSERT", "es": 1639633141221, "ts": 1639633142960, "sql": "", "sqlType": { "id": 4 }, "mysqlType": { "id": "int" }, "data": [ { "id": "2" } ], "old": null, "_tidb": { // TiDB extension fields "commitTs": 429918007904436226, // A TiDB TSO timestamp "onlyHandleKey": true } }

Kafka コンシューマーがメッセージを受信すると、まずonlyHandleKeyフィールドをチェックします。このフィールドが存在し、 trueである場合、メッセージには完全なデータのハンドル キーのみが含まれていることを意味します。この場合、完全なデータを取得するには、上流の TiDB をクエリして履歴データを読み取るためのtidb_snapshot使用する必要があります。

大きなメッセージを外部storageに送信する

v7.4.0 以降、TiCDC Kafka シンクは、メッセージ サイズが制限を超えた場合に、大きなメッセージを外部storageに送信することをサポートします。一方、TiCDC は、外部storage内の大きなメッセージのアドレスを含むメッセージを Kafka に送信します。これにより、メッセージ サイズが Kafka トピック制限を超えたために発生する変更フィード障害を回避できます。

構成例は次のとおりです。

[sink.kafka-config.large-message-handle] # large-message-handle-option is introduced in v7.3.0. # Defaults to "none". When the message size exceeds the limit, the changefeed fails. # When set to "handle-key-only", if the message size exceeds the limit, only the handle key is sent in the data field. If the message size still exceeds the limit, the changefeed fails. # When set to "claim-check", if the message size exceeds the limit, the message is sent to external storage. large-message-handle-option = "claim-check" claim-check-storage-uri = "s3://claim-check-bucket"

large-message-handle-option "claim-check"に設定する場合、 claim-check-storage-uri有効な外部storageアドレスに設定する必要があります。そうしないと、変更フィードの作成が失敗します。

ヒント

TiCDC における Amazon S3、GCS、Azure Blob Storage の URI パラメータの詳細については、 外部ストレージサービスの URI 形式参照してください。

TiCDC は外部storageサービス上のメッセージをクリーンアップしません。データ コンシューマーは外部storageサービスを独自に管理する必要があります。

外部storageから大きなメッセージを消費する

Kafka コンシューマーは、外部storage内の大きなメッセージのアドレスを含むメッセージを受信します。メッセージの形式は次のとおりです。

{ "id": 0, "database": "test", "table": "tp_int", "pkNames": [ "id" ], "isDdl": false, "type": "INSERT", "es": 1639633141221, "ts": 1639633142960, "sql": "", "sqlType": { "id": 4 }, "mysqlType": { "id": "int" }, "data": [ { "id": "2" } ], "old": null, "_tidb": { // TiDB extension fields "commitTs": 429918007904436226, // A TiDB TSO timestamp "claimCheckLocation": "s3:/claim-check-bucket/${uuid}.json" } }

メッセージにclaimCheckLocationフィールドが含まれている場合、Kafka コンシューマーは、フィールドによって提供されるアドレスに従って、JSON 形式で保存された大きなメッセージ データを読み取ります。メッセージの形式は次のとおりです。

{ key: "xxx", value: "xxx", }

keyおよびvalueフィールドは、Kafka メッセージの同じ名前のフィールドに対応します。コンシューマーは、これらの 2 つのフィールドのデータを解析することで、元の大きなメッセージを取得できます。オープン プロトコルでエンコードされた Kafka メッセージのみ、 keyフィールドに有効なコンテンツが含まれます。TiCDC は、 keyvalue両方を 1 つの JSON オブジェクトにエンコードして、完全なメッセージを一度に配信します。他のプロトコルの場合、 keyフィールドは常に空です。

valueフィールドを外部storageにのみ送信する

v8.4.0 以降、TiCDC は Kafka メッセージのvalueフィールドのみを外部storageに送信することをサポートします。この機能は、非オープン プロトコル シナリオにのみ適用されます。この機能は、 claim-check-raw-valueパラメータ (デフォルトはfalseを設定することで制御できます。

注記:

オープンプロトコルを使用する場合、 claim-check-raw-valuetrueに設定するとエラーが発生します。

claim-check-raw-value trueに設定すると、changefeed は Kafka メッセージのvalueフィールドを、 keyvalueの追加の JSON シリアル化なしで外部storageに直接送信します。これにより、CPU オーバーヘッドが削減されます。さらに、コンシューマーは外部storageから直接消費可能なデータを読み取ることができるため、逆シリアル化のオーバーヘッドが削減されます。

構成例は次のとおりです。

protocol = "simple" [sink.kafka-config.large-message-handle] large-message-handle-option = "claim-check" claim-check-storage-uri = "s3://claim-check-bucket" claim-check-raw-value = true

このページは役に立ちましたか?