Kafka にデータを複製する
このドキュメントでは、TiCDC を使用して増分データを Apache Kafka に複製する変更フィードを作成する方法について説明します。
レプリケーションタスクを作成する
次のコマンドを実行してレプリケーション タスクを作成します。
cdc cli changefeed create \
--server=http://10.0.10.25:8300 \
--sink-uri="kafka://127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094/topic-name?protocol=canal-json&kafka-version=2.4.0&partition-num=6&max-message-bytes=67108864&replication-factor=1" \
--changefeed-id="simple-replication-task"
Create changefeed successfully!
ID: simple-replication-task
Info: {"sink-uri":"kafka://127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094/topic-name?protocol=canal-json&kafka-version=2.4.0&partition-num=6&max-message-bytes=67108864&replication-factor=1","opts":{},"create-time":"2023-11-28T22:04:08.103600025+08:00","start-ts":415241823337054209,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":".","config":{"case-sensitive":false,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"ddl-allow-list":null},"mounter":{"worker-num":16},"sink":{"dispatchers":null},"scheduler":{"type":"table-number","polling-time":-1}},"state":"normal","history":null,"error":null}
--server
: TiCDC クラスター内の任意の TiCDCサーバーのアドレス。--changefeed-id
: レプリケーションタスクのID。形式は正規表現^[a-zA-Z0-9]+(\-[a-zA-Z0-9]+)*$
一致する必要があります。このIDが指定されていない場合、TiCDCは自動的にUUID(バージョン4形式)をIDとして生成します。--sink-uri
: レプリケーションタスクのダウンストリームアドレス。詳細はkafka
でシンクURIを設定する参照してください。--start-ts
: チェンジフィードの開始TSOを指定します。このTSOから、TiCDCクラスターはデータのプルを開始します。デフォルト値は現在時刻です。--target-ts
: チェンジフィードの終了TSOを指定します。このTSOまで、TiCDCクラスターはデータのプルを停止します。デフォルト値は空で、TiCDCはデータのプルを自動的に停止しません。--config
: changefeed設定ファイルを指定します。詳細はTiCDC Changefeedコンフィグレーションパラメータ参照してください。
サポートされているKafkaのバージョン
次の表は、各 TiCDC バージョンでサポートされる最小 Kafka バージョンを示しています。
TiCDCバージョン | サポートされている最小の Kafka バージョン |
---|---|
TiCDC >= v8.1.0 | 2.1.0 |
v7.6.0 <= TiCDC < v8.1.0 | 2.4.0 |
v7.5.2 <= TiCDC < v8.0.0 | 2.1.0 |
v7.5.0 <= TiCDC < v7.5.2 | 2.4.0 |
v6.5.0 <= TiCDC < v7.5.0 | 2.1.0 |
v6.1.0 <= TiCDC < v6.5.0 | 2.0.0 |
Kafka のシンク URI を構成する
シンクURIは、TiCDCターゲットシステムの接続情報を指定するために使用されます。形式は次のとおりです。
[scheme]://[host]:[port][/path]?[query_parameters]
ヒント:
下流のKafkaに複数のホストまたはポートがある場合は、シンクURIに複数の
[host]:[port]
設定できます。例:
[scheme]://[host]:[port],[host]:[port],[host]:[port][/path]?[query_parameters]
サンプル構成:
--sink-uri="kafka://127.0.0.1:9092/topic-name?protocol=canal-json&kafka-version=2.4.0&partition-num=6&max-message-bytes=67108864&replication-factor=1"
以下は、Kafka に対して構成できるシンク URI パラメータと値の説明です。
パラメータ/パラメータ値 | 説明 |
---|---|
host | ダウンストリーム Kafka サービスの IP アドレス。 |
port | ダウンストリーム Kafka のポート。 |
topic-name | 変数。Kafka トピックの名前。 |
protocol | Kafka にメッセージを出力するプロトコル。値のオプションはcanal-json 、 open-protocol 、 avro 、 debezium 、 simple です。 |
kafka-version | ダウンストリーム Kafka のバージョン。この値は、ダウンストリーム Kafka の実際のバージョンと一致している必要があります。 |
kafka-client-id | レプリケーション タスクの Kafka クライアント ID を指定します (オプション。デフォルトはTiCDC_sarama_producer_replication ID )。 |
partition-num | ダウンストリーム Kafka パーティションの数 (オプション。値は実際のパーティション数以下にする必要があります。そうでない場合、レプリケーション タスクを正常に作成できません。デフォルトは3 )。 |
max-message-bytes | Kafkaブローカーに毎回送信されるデータの最大サイズ(オプション、デフォルトは10MB )。v5.0.6およびv4.0.6以降、デフォルト値は64MB と256MB から10MB に変更されました。 |
replication-factor | 保存できるKafkaメッセージレプリカの数(オプション、デフォルトは1 )。この値はKafkaのmin.insync.replicas の値である必要があります。 |
required-acks | Produce リクエストで使用されるパラメータ。ブローカーが応答するまでに受信する必要があるレプリカ確認応答の数を通知します。値のオプションは0 ( NoResponse : 応答なし、 TCP ACK のみ提供)、 1 ( WaitForLocal : ローカルコミットが正常に送信された後にのみ応答)、および-1 ( WaitForAll : すべての複製レプリカが正常にコミットされた後に応答) です。複製レプリカの最小数は、ブローカーのmin.insync.replicas 設定項目を使用して設定できます。(オプション、デフォルト値は-1 )。 |
compression | メッセージを送信する際に使用する圧縮アルゴリズム(値の選択肢はnone 、 lz4 、 gzip 、 snappy 、 zstd 。デフォルトはnone )。Snappy圧縮ファイルは公式Snappyフォーマットである必要があります。その他のSnappy圧縮形式はサポートされていません。 |
auto-create-topic | 渡されたtopic-name Kafka クラスターに存在しない場合に、TiCDC がトピックを自動的に作成するかどうかを決定します (オプション、デフォルトはtrue )。 |
enable-tidb-extension | オプション。デフォルトはfalse 。出力プロトコルがcanal-json 場合、値がtrue であれば、TiCDC はウォーターマークイベント送信し、Kafka メッセージにTiDB拡張フィールド追加します。v6.1.0 以降では、このパラメータはavro プロトコルにも適用されます。値がtrue 場合、TiCDC は Kafka メッセージに3つのTiDB拡張フィールド追加します。 |
max-batch-size | v4.0.9 の新機能。メッセージプロトコルが 1 つの Kafka メッセージに複数のデータ変更を出力することをサポートしている場合、このパラメータは 1 つの Kafka メッセージに含まれるデータ変更の最大数を指定します。現在、このパラメータは Kafka のprotocol がopen-protocol (オプション、デフォルトは16 )の場合にのみ有効です。 |
enable-tls | ダウンストリーム Kafka インスタンスに接続するために TLS を使用するかどうか (オプション、デフォルトはfalse )。 |
ca | ダウンストリーム Kafka インスタンスに接続するために必要な CA 証明書ファイルのパス (オプション)。 |
cert | ダウンストリーム Kafka インスタンスに接続するために必要な証明書ファイルのパス (オプション)。 |
key | ダウンストリーム Kafka インスタンスに接続するために必要な証明書キー ファイルのパス (オプション)。 |
insecure-skip-verify | ダウンストリーム Kafka インスタンスに接続するときに証明書の検証をスキップするかどうか (オプション、デフォルトはfalse )。 |
sasl-user | ダウンストリーム Kafka インスタンスに接続するために必要な SASL/PLAIN または SASL/SCRAM 認証の ID (authcid) (オプション)。 |
sasl-password | 下流のKafkaインスタンスに接続するために必要なSASL/PLAINまたはSASL/SCRAM認証のパスワード(オプション)。特殊文字が含まれている場合は、URLエンコードする必要があります。 |
sasl-mechanism | 下流のKafkaインスタンスに接続するために必要なSASL認証の名前。値はplain 、 scram-sha-256 、 scram-sha-512 、またはgssapi です。 |
sasl-gssapi-auth-type | gssapi 認証タイプ。値はuser またはkeytab (オプション)です。 |
sasl-gssapi-keytab-path | gssapi キータブ パス (オプション)。 |
sasl-gssapi-kerberos-config-path | gssapi kerberos 構成パス (オプション)。 |
sasl-gssapi-service-name | gssapi サービス名 (オプション)。 |
sasl-gssapi-user | gssapi 認証のユーザー名 (オプション)。 |
sasl-gssapi-password | gssapi認証のパスワード(オプション)。特殊文字が含まれている場合は、URLエンコードする必要があります。 |
sasl-gssapi-realm | gssapi 領域名 (オプション)。 |
sasl-gssapi-disable-pafxfast | gssapi PA-FX-FAST を無効にするかどうか (オプション)。 |
dial-timeout | 下流のKafkaとの接続を確立する際のタイムアウト。デフォルト値は10s です。 |
read-timeout | 下流のKafkaから返されるレスポンスを取得する際のタイムアウト。デフォルト値は10s です。 |
write-timeout | 下流のKafkaにリクエストを送信する際のタイムアウト。デフォルト値は10s です。 |
avro-decimal-handling-mode | avro プロトコルでのみ有効です。Avro が DECIMAL フィールドをどのように処理するかを指定します。値はstring またはprecise で、DECIMAL フィールドを文字列または正確な浮動小数点数にマッピングすることを示します。 |
avro-bigint-unsigned-handling-mode | avro プロトコルでのみ有効です。Avro が BIGINT UNSIGNED フィールドを処理する方法を指定します。値はstring またはlong で、それぞれ BIGINT UNSIGNED フィールドを 64 ビット符号付き数値または文字列にマッピングすることを示します。 |
ベストプラクティス
- 独自のKafkaトピックを作成することをお勧めします。少なくとも、トピックがKafkaブローカーに送信できる各メッセージの最大データ量と、下流のKafkaパーティションの数を設定する必要があります。チェンジフィードを作成する場合、これらの2つの設定はそれぞれ
max-message-bytes
とpartition-num
に対応します。 - まだ存在しないトピックでチェンジフィードを作成した場合、TiCDCは
partition-num
とreplication-factor
パラメータを使用してトピックを作成しようとします。これらのパラメータは明示的に指定することをお勧めします。 - ほとんどの場合、
canal-json
プロトコルを使用することをお勧めします。
注記:
protocol
がopen-protocol
場合、TiCDC は複数のイベントを 1 つの Kafka メッセージにエンコードし、max-message-bytes
で指定された長さを超えるメッセージの生成を回避します。1 行の変更イベントのエンコード結果がmax-message-bytes
を超える場合、変更フィードはエラーを報告し、ログを出力。
TiCDCはKafkaの認証と認可を使用します
以下は、Kafka SASL 認証を使用する場合の例です。
SASL/プレーン
--sink-uri="kafka://127.0.0.1:9092/topic-name?kafka-version=2.4.0&sasl-user=alice-user&sasl-password=alice-secret&sasl-mechanism=plain"SASL/SCRAM
SCRAM-SHA-256とSCRAM-SHA-512はPLAIN方式に似ています。対応する認証方式として
sasl-mechanism
指定するだけです。SASL/GSSAPI
SASL/GSSAPI
user
認証:--sink-uri="kafka://127.0.0.1:9092/topic-name?kafka-version=2.4.0&sasl-mechanism=gssapi&sasl-gssapi-auth-type=user&sasl-gssapi-kerberos-config-path=/etc/krb5.conf&sasl-gssapi-service-name=kafka&sasl-gssapi-user=alice/for-kafka&sasl-gssapi-password=alice-secret&sasl-gssapi-realm=example.com"sasl-gssapi-user
とsasl-gssapi-realm
の値は、Kerberos で指定されている原理と関連しています。例えば、プリンシパルがalice/for-kafka@example.com
に設定されている場合、sasl-gssapi-user
とsasl-gssapi-realm
それぞれalice/for-kafka
とexample.com
として指定されます。SASL/GSSAPI
keytab
認証:--sink-uri="kafka://127.0.0.1:9092/topic-name?kafka-version=2.4.0&sasl-mechanism=gssapi&sasl-gssapi-auth-type=keytab&sasl-gssapi-kerberos-config-path=/etc/krb5.conf&sasl-gssapi-service-name=kafka&sasl-gssapi-user=alice/for-kafka&sasl-gssapi-keytab-path=/var/lib/secret/alice.key&sasl-gssapi-realm=example.com"SASL/GSSAPI 認証方式の詳細については、 GSSAPIの設定参照してください。
TLS/SSL暗号化
KafkaブローカーでTLS/SSL暗号化が有効になっている場合は、
--sink-uri
に-enable-tls=true
パラメータを追加する必要があります。自己署名証明書を使用する場合は、--sink-uri
にca
、cert
、key
も指定する必要があります。ACL認証
TiCDC が適切に機能するために必要な最小限の権限セットは次のとおりです。
- トピックリソースタイプの
Create
、Write
、およびDescribe
権限。 - クラスタリソース タイプに対する
DescribeConfig
権限。
各権限の使用シナリオは次のとおりです。
リソースタイプ 操作の種類 シナリオ クラスタ DescribeConfig
変更フィードの実行中にクラスターのメタデータを取得します トピック Describe
チェンジフィードの開始時にトピックを作成しようとします トピック Create
チェンジフィードの開始時にトピックを作成しようとします トピック Write
トピックにデータを送信します 変更フィードを作成または開始するときに、指定された Kafka トピックがすでに存在する場合は、
Describe
およびCreate
権限を無効にすることができます。- トピックリソースタイプの
TiCDC を Kafka Connect (Confluent Platform) と統合する
Confluent が提供するデータコネクタ使用して、データをリレーショナル データベースまたは非リレーショナル データベースにストリーミングするには、 avro
プロトコル使用し、 schema-registry
でConfluent スキーマレジストリ URL を指定する必要があります。
サンプル構成:
--sink-uri="kafka://127.0.0.1:9092/topic-name?&protocol=avro&replication-factor=3" --schema-registry="http://127.0.0.1:8081" --config changefeed_config.toml
[sink]
dispatchers = [
{matcher = ['*.*'], topic = "tidb_{schema}_{table}"},
]
詳細な統合ガイドについては、 TiDB と Confluent Platform の統合に関するクイック スタート ガイド参照してください。
TiCDC を AWS Glue スキーマレジストリと統合する
v7.4.0以降、TiCDCは、ユーザーがデータレプリケーションにアブロプロトコル選択した場合、スキーマレジストリとしてAWS Glue スキーマレジストリ使用をサポートします。設定例は次のとおりです。
./cdc cli changefeed create --server=127.0.0.1:8300 --changefeed-id="kafka-glue-test" --sink-uri="kafka://127.0.0.1:9092/topic-name?&protocol=avro&replication-factor=3" --config changefeed_glue.toml
[sink]
[sink.kafka-config.glue-schema-registry-config]
region="us-west-1"
registry-name="ticdc-test"
access-key="xxxx"
secret-access-key="xxxx"
token="xxxx"
上記の設定では、 region
とregistry-name
必須フィールドですが、 access-key
、 secret-access-key
、 token
オプションフィールドです。AWS認証情報は、changefeed設定ファイルではなく、環境変数として設定するか、 ~/.aws/credentials
ファイルに保存するのがベストプラクティスです。
詳細については、 Go V2 向け公式 AWS SDK ドキュメントを参照してください。
Kafka シンクのトピックおよびパーティションディスパッチャーのルールをカスタマイズする
マッチャールール
dispatchers
の次の構成を例に挙げます。
[sink]
dispatchers = [
{matcher = ['test1.*', 'test2.*'], topic = "Topic expression 1", partition = "ts" },
{matcher = ['test3.*', 'test4.*'], topic = "Topic expression 2", partition = "index-value" },
{matcher = ['test1.*', 'test5.*'], topic = "Topic expression 3", partition = "table"},
{matcher = ['test6.*'], partition = "ts"}
]
- マッチャールールに一致するテーブルは、対応するトピック式で指定されたポリシーに従ってディスパッチされます。例えば、テーブル
test3.aa
は「トピック式2」に従ってディスパッチされ、テーブルtest5.aa
は「トピック式3」に従ってディスパッチされます。 - 複数のマッチャールールに一致するテーブルの場合、最初に一致するトピック式に従ってディスパッチされます。例えば、
test1.aa
テーブルは「トピック式 1」に従ってディスパッチされます。 - どのマッチャールールにも一致しないテーブルの場合、対応するデータ変更イベントは
--sink-uri
で指定されたデフォルトトピックに送信されます。例えば、test10.aa
テーブルはデフォルトトピックに送信されます。 - マッチャールールに一致するもののトピックディスパッチャーを指定していないテーブルの場合、対応するデータ変更は
--sink-uri
で指定されたデフォルトトピックに送信されます。例えば、test6.aa
テーブルはデフォルトトピックに送信されます。
トピックディスパッチャ
topic = "xxx" を使用してトピックディスパッチャを指定し、トピック式を使用して柔軟なトピックディスパッチポリシーを実装できます。トピックの総数は1000未満にすることをお勧めします。
Topic 式の形式は[prefix]{schema}[middle][{table}][suffix]
です。
prefix
: オプション。トピック名のプレフィックスを示します。{schema}
: 必須。スキーマ名を一致させるために使用されます。v7.1.4以降、このパラメータはオプションです。middle
: オプション。スキーマ名とテーブル名の間の区切り文字を示します。{table}
: オプション。テーブル名と一致させるために使用されます。suffix
: オプション。トピック名の接尾辞を示します。
prefix
、 middle
、 suffix
、 a-z
、 A-Z
、 0-9
、 .
、 _
、 -
の文字のみを使用できます。 {schema}
と{table}
どちらも小文字です。 {Schema}
や{TABLE}
などのプレースホルダーは無効です。
例:
matcher = ['test1.table1', 'test2.table2'], topic = "hello_{schema}_{table}"
test1.table1
に対応するデータ変更イベントは、hello_test1_table1
という名前のトピックに送信されます。test2.table2
に対応するデータ変更イベントは、hello_test2_table2
という名前のトピックに送信されます。
matcher = ['test3.*', 'test4.*'], topic = "hello_{schema}_world"
test3
内のすべてのテーブルに対応するデータ変更イベントは、hello_test3_world
という名前のトピックに送信されます。test4
内のすべてのテーブルに対応するデータ変更イベントは、hello_test4_world
という名前のトピックに送信されます。
matcher = ['test5.*, 'test6.*'], topic = "hard_code_topic_name"
test5
とtest6
のすべてのテーブルに対応するデータ変更イベントは、トピックhard_code_topic_name
に送信されます。トピック名は直接指定できます。
matcher = ['*.*'], topic = "{schema}_{table}"
- TiCDC が監視するすべてのテーブルは、「schema_table」ルールに従って個別のトピックにディスパッチされます。例えば、テーブル
test.account
場合、TiCDC はデータ変更ログをtest_account
という名前のトピックにディスパッチします。
- TiCDC が監視するすべてのテーブルは、「schema_table」ルールに従って個別のトピックにディスパッチされます。例えば、テーブル
DDLイベントをディスパッチする
スキーマレベルのDDL
特定のテーブルに関連しないDDLは、スキーマレベルDDLと呼ばれます( create database
やdrop database
など)。スキーマレベルDDLに対応するイベントは、 --sink-uri
で指定されたデフォルトトピックに送信されます。
テーブルレベルのDDL
特定のテーブルに関連するDDLは、テーブルレベルDDLと呼ばれます(例: alter table
create table
。テーブルレベルDDLに対応するイベントは、ディスパッチャの設定に従って対応するトピックに送信されます。
たとえば、 matcher = ['test.*'], topic = {schema}_{table}
ようなディスパッチャーの場合、DDL イベントは次のようにディスパッチされます。
- DDLイベントに単一のテーブルが関係している場合、DDLイベントは対応するトピックにそのまま送信されます。例えば、DDLイベント
drop table test.table1
場合、イベントはtest_table1
という名前のトピックに送信されます。 - DDLイベントに複数のテーブルが関係する場合(
rename table
/drop table
/drop view
複数のテーブルに関係する可能性があります)、DDLイベントは複数のイベントに分割され、対応するトピックに送信されます。例えば、DDLイベントrename table test.table1 to test.table10, test.table2 to test.table20
場合、イベントrename table test.table1 to test.table10
トピックtest_table1
に送信され、イベントrename table test.table2 to test.table20
トピックtest.table2
に送信されます。
パーティションディスパッチャ
partition = "xxx"
パーティションディスパッチャを指定するために使用できます。3、5、7、9、11 default
index-value
ts
のディスパッチャcolumns
サポートされてtable
ます。ディスパッチャのルールは次のとおりです。
default
: デフォルトでtable
ディスパッチャルールを使用します。スキーマ名とテーブル名に基づいてパーティション番号が計算され、テーブルからのデータが必ず同じパーティションに送信されます。その結果、1つのテーブルからのデータは1つのパーティションにのみ存在し、順序付けが保証されます。ただし、このディスパッチャルールは送信スループットを制限し、コンシューマーを追加しても消費速度を向上させることはできません。index-value
: 主キー、一意のインデックス、またはindex
で明示的に指定されたインデックスのいずれかを使用してパーティション番号を計算し、テーブルデータを複数のパーティションに分散します。単一のテーブルのデータは複数のパーティションに送信され、各パーティションのデータは順序付けされます。コンシューマーを追加することで、消費速度を向上させることができます。columns
: 明示的に指定された列の値に基づいてパーティション番号を計算し、テーブルデータを複数のパーティションに分散します。単一のテーブルのデータは複数のパーティションに送信され、各パーティションのデータは順序付けされます。コンシューマーを追加することで、消費速度を向上させることができます。table
: スキーマ名とテーブル名を使用してパーティション番号を計算します。ts
: 行変更のコミットタイムを用いてパーティション番号を計算し、テーブルデータを複数のパーティションに分散します。単一テーブルのデータは複数のパーティションに送信され、各パーティションのデータは順序付けされます。コンシューマーを追加することで、消費速度を向上させることができます。ただし、データ項目に対する複数の変更が異なるパーティションに送信され、各コンシューマーのコンシューマー進行状況が異なる場合があり、データの不整合が発生する可能性があります。そのため、コンシューマーは複数のパーティションからデータを消費する前に、コミットタイムでソートする必要があります。
dispatchers
の次の構成を例に挙げます。
[sink]
dispatchers = [
{matcher = ['test.*'], partition = "index-value"},
{matcher = ['test1.*'], partition = "index-value", index = "index1"},
{matcher = ['test2.*'], partition = "columns", columns = ["id", "a"]},
{matcher = ['test3.*'], partition = "table"},
]
test
データベース内のテーブルはindex-value
ディスパッチャを使用し、主キーまたは一意のインデックスの値を使用してパーティション番号を計算します。主キーが存在する場合は主キーが使用され、存在しない場合は最短の一意のインデックスが使用されます。test1
テーブル内のテーブルはindex-value
ディスパッチャを使用し、index1
という名前のインデックスに含まれるすべての列の値を使用してパーティション番号を計算します。指定されたインデックスが存在しない場合はエラーが報告されますindex
で指定されたインデックスは一意のインデックスである必要があります。test2
データベース内のテーブルはcolumns
ディスパッチャを使用し、列id
とa
の値を使用してパーティション番号を計算します。いずれかの列が存在しない場合は、エラーが報告されます。test3
データベース内のテーブルはtable
ディスパッチャーを使用します。test4
データベース内のテーブルは、前述のルールのいずれにも一致しないため、default
ディスパッチャ、つまりtable
ディスパッチャを使用します。
テーブルが複数のディスパッチャー ルールに一致する場合、最初に一致するルールが優先されます。
注記:
バージョン6.1.0以降、設定の意味を明確にするため、パーティションディスパッチャーを指定するための設定が
dispatcher
からpartition
に変更されました。5partition
dispatcher
の別名です。例えば、次の2つのルールは全く同じ意味です。
[sink] dispatchers = [ {matcher = ['*.*'], dispatcher = "index-value"}, {matcher = ['*.*'], partition = "index-value"}, ]ただし、
dispatcher
とpartition
同じルール内に出現させることはできません。例えば、次のルールは無効です。
{matcher = ['*.*'], dispatcher = "index-value", partition = "table"},
カラムセレクター
列セレクター機能は、イベントから列を選択し、それらの列に関連するデータの変更のみをダウンストリームに送信することをサポートします。
column-selectors
の次の構成を例に挙げます。
[sink]
column-selectors = [
{matcher = ['test.t1'], columns = ['a', 'b']},
{matcher = ['test.*'], columns = ["*", "!b"]},
{matcher = ['test1.t1'], columns = ['column*', '!column1']},
{matcher = ['test3.t'], columns = ["column?", "!column1"]},
]
- 表
test.t1
場合、列a
とb
のみが送信されます。 test
データベース内のテーブル (t1
テーブルを除く) の場合、b
を除くすべての列が送信されます。- 表
test1.t1
場合、column1
を除く、column
で始まるすべての列が送信されます。 - 表
test3.t
場合、column1
を除く、column
で始まる 7 文字の列が送信されます。 - どのルールにも一致しないテーブルの場合、すべての列が送信されます。
注記:
column-selectors
ルールでフィルタリングされた後、テーブル内のデータは、複製されるために主キーまたは一意キーを持っている必要があります。そうでない場合、変更フィードは作成時または実行時にエラーを報告します。
単一の大きなテーブルの負荷を複数の TiCDC ノードにスケールアウトする
この機能は、単一の大規模テーブルのデータレプリケーション範囲を、データ量と1分あたりの変更行数に応じて複数の範囲に分割し、各範囲でレプリケーションされるデータ量と変更行数をほぼ一定にします。この機能は、これらの範囲を複数のTiCDCノードに分散してレプリケーションすることで、複数のTiCDCノードが同時に単一の大規模テーブルをレプリケーションできるようにします。この機能により、以下の2つの問題が解決されます。
- 単一の TiCDC ノードでは、単一の大きなテーブルを時間内に複製することはできません。
- TiCDC ノードによって消費されるリソース (CPU やメモリなど) は均等に分散されません。
サンプル構成:
[scheduler]
# The default value is "false". You can set it to "true" to enable this feature.
enable-table-across-nodes = true
# When you enable this feature, it only takes effect for tables with the number of regions greater than the `region-threshold` value.
region-threshold = 100000
# When you enable this feature, it takes effect for tables with the number of rows modified per minute greater than the `write-key-threshold` value.
# Note:
# * The default value of `write-key-threshold` is 0, which means that the feature does not split the table replication range according to the number of rows modified in a table by default.
# * You can configure this parameter according to your cluster workload. For example, if it is configured as 30000, it means that the feature will split the replication range of a table when the number of modified rows per minute in the table exceeds 30000.
# * When `region-threshold` and `write-key-threshold` are configured at the same time:
# TiCDC will check whether the number of modified rows is greater than `write-key-threshold` first.
# If not, next check whether the number of Regions is greater than `region-threshold`.
write-key-threshold = 30000
次の SQL ステートメントを使用して、テーブルに含まれる地域の数を照会できます。
SELECT COUNT(*) FROM INFORMATION_SCHEMA.TIKV_REGION_STATUS WHERE DB_NAME="database1" AND TABLE_NAME="table1" AND IS_INDEX=0;
Kafka トピックの制限を超えるメッセージを処理する
Kafkaトピックは受信できるメッセージのサイズに制限を設定します。この制限はパラメータmax.message.bytes
によって制御されます。TiCDC Kafkaシンクがこの制限を超えるデータを送信した場合、変更フィードはエラーを報告し、データのレプリケーションを続行できません。この問題を解決するために、TiCDCは新しい設定large-message-handle-option
を追加し、以下のソリューションを提供します。
現在、この機能はCanal-JSONとOpen Protocolの2つのエンコーディングプロトコルをサポートしています。Canal-JSONプロトコルを使用する場合は、 sink-uri
のうちenable-tidb-extension=true
指定する必要があります。
TiCDCデータ圧縮
バージョン7.4.0以降、TiCDC Kafkaシンクは、エンコード直後にデータを圧縮し、圧縮データのサイズとメッセージサイズ制限を比較する機能をサポートしています。この機能により、サイズ制限を超えるメッセージの発生を効果的に削減できます。
構成例は次のとおりです。
[sink.kafka-config.large-message-handle]
# This configuration is introduced in v7.4.0.
# "none" by default, which means that the compression feature is disabled.
# Possible values are "none", "lz4", and "snappy". The default value is "none".
large-message-handle-compression = "none"
large-message-handle-compression
有効になっている場合、コンシューマーが受信したメッセージは特定の圧縮プロトコルを使用してエンコードされ、コンシューマー アプリケーションは指定された圧縮プロトコルを使用してデータをデコードする必要があります。
この機能は、Kafka プロデューサーの圧縮機能とは異なります。
large-message-handle-compression
で指定された圧縮アルゴリズムは、単一のKafkaメッセージを圧縮します。圧縮は、メッセージサイズの制限と比較する前に実行されます。- 同時に、
sink-uri
のcompression
パラメータを使用して圧縮アルゴリズムを設定することもできます。この圧縮アルゴリズムは、複数のKafkaメッセージを含むデータ送信リクエスト全体に適用されます。
large-message-handle-compression
設定した場合、TiCDC はメッセージを受信すると、まずメッセージサイズ制限パラメータの値と比較し、サイズ制限を超えるメッセージを圧縮します。5 にcompression
sink-uri
設定した場合、TiCDC はsink-uri
設定に基づいて、送信データ要求全体をシンクレベルで再度圧縮します。
前述の 2 つの圧縮方法の圧縮率は次のように計算されますcompression ratio = size before compression / size after compression * 100
。
ハンドルキーのみ送信
v7.3.0以降、TiCDC Kafkaシンクは、メッセージサイズが制限を超えた場合にハンドルキーのみを送信することをサポートします。これにより、メッセージサイズが大幅に削減され、Kafkaトピックの制限を超えたメッセージサイズに起因するチェンジフィードエラーやタスクの失敗を回避できます。ハンドルキーとは、以下のものを指します。
- 複製するテーブルに主キーがある場合、主キーはハンドル キーになります。
- テーブルに主キーがなく、NOT NULL 一意キーがある場合、NOT NULL 一意キーがハンドル キーになります。
サンプル構成は次のとおりです。
[sink.kafka-config.large-message-handle]
# large-message-handle-option is introduced in v7.3.0.
# Defaults to "none". When the message size exceeds the limit, the changefeed fails.
# When set to "handle-key-only", if the message size exceeds the limit, only the handle key is sent in the data field. If the message size still exceeds the limit, the changefeed fails.
large-message-handle-option = "claim-check"
ハンドルキーのみでメッセージを消費する
ハンドル キーのみのメッセージ形式は次のとおりです。
{
"id": 0,
"database": "test",
"table": "tp_int",
"pkNames": [
"id"
],
"isDdl": false,
"type": "INSERT",
"es": 1639633141221,
"ts": 1639633142960,
"sql": "",
"sqlType": {
"id": 4
},
"mysqlType": {
"id": "int"
},
"data": [
{
"id": "2"
}
],
"old": null,
"_tidb": { // TiDB extension fields
"commitTs": 429918007904436226, // A TiDB TSO timestamp
"onlyHandleKey": true
}
}
Kafkaコンシューマーはメッセージを受信すると、まずonlyHandleKey
フィールドをチェックします。このフィールドが存在し、値がtrue
場合、メッセージには完全なデータのハンドルキーのみが含まれていることを意味します。この場合、完全なデータを取得するには、上流のTiDBにクエリを実行し、 履歴データを読み取るためのtidb_snapshot
使用する必要があります。
大きなメッセージを外部storageに送信する
バージョン7.4.0以降、TiCDC Kafkaシンクは、メッセージサイズが制限を超えた場合に、大容量メッセージを外部storageに送信できるようになりました。同時に、TiCDCは外部storage内の大容量メッセージのアドレスを含むメッセージをKafkaに送信します。これにより、メッセージサイズがKafkaトピックの制限を超えたことによる変更フィードの失敗を回避できます。
構成例は次のとおりです。
[sink.kafka-config.large-message-handle]
# large-message-handle-option is introduced in v7.3.0.
# Defaults to "none". When the message size exceeds the limit, the changefeed fails.
# When set to "handle-key-only", if the message size exceeds the limit, only the handle key is sent in the data field. If the message size still exceeds the limit, the changefeed fails.
# When set to "claim-check", if the message size exceeds the limit, the message is sent to external storage.
large-message-handle-option = "claim-check"
claim-check-storage-uri = "s3://claim-check-bucket"
large-message-handle-option
"claim-check"
に設定する場合、 claim-check-storage-uri
有効な外部storageアドレスに設定する必要があります。そうでない場合、変更フィードの作成は失敗します。
ヒント
TiCDC における Amazon S3、GCS、Azure Blob Storage の URI パラメータの詳細については、 外部ストレージサービスのURI形式参照してください。
TiCDCは外部storageサービス上のメッセージをクリーンアップしません。データ利用者は外部storageサービスを独自に管理する必要があります。
外部storageから大きなメッセージを消費する
Kafkaコンシューマーは、外部storage内の大きなメッセージのアドレスを含むメッセージを受信します。メッセージの形式は次のとおりです。
{
"id": 0,
"database": "test",
"table": "tp_int",
"pkNames": [
"id"
],
"isDdl": false,
"type": "INSERT",
"es": 1639633141221,
"ts": 1639633142960,
"sql": "",
"sqlType": {
"id": 4
},
"mysqlType": {
"id": "int"
},
"data": [
{
"id": "2"
}
],
"old": null,
"_tidb": { // TiDB extension fields
"commitTs": 429918007904436226, // A TiDB TSO timestamp
"claimCheckLocation": "s3:/claim-check-bucket/${uuid}.json"
}
}
メッセージにclaimCheckLocation
フィールドが含まれている場合、Kafka コンシューマーは、フィールドで指定されたアドレスに従って、JSON 形式で保存された大容量メッセージデータを読み取ります。メッセージ形式は次のとおりです。
{
key: "xxx",
value: "xxx",
}
key
とvalue
フィールドには、エンコードされた大きなメッセージが含まれています。これは、Kafka メッセージの対応するフィールドに送信されるはずでした。コンシューマーは、これらの 2 つの部分のデータを解析することで、大きなメッセージの内容を復元できます。