TiCDC シンプルプロトコル

TiCDC シンプル プロトコルは、監視、キャッシュ、フルテキスト インデックス、分析エンジン、異種データベース間のプライマリ/セカンダリ レプリケーション用のデータ ソースを提供する行レベルのデータ変更通知プロトコルです。このドキュメントでは、TiCDC シンプル プロトコルの使用法とデータ形式の実装について説明します。

TiCDCシンプルプロトコルを使用する

Kafka をダウンストリームとして使用する場合は、changefeed 構成でprotocol "simple"に指定します。すると、TiCDC は各行の変更または DDL イベントをメッセージとしてエンコードし、データ変更イベントをダウンストリームに送信します。

シンプル プロトコルを使用するための構成例は次のとおりです。

sink-uri構成:

--sink-uri = "kafka://127.0.0.1:9092/topic-name?kafka-version=2.4.0"

Changefeed 構成:

[sink] protocol = "simple" # The following configuration parameters control the sending behavior of bootstrap messages. # send-bootstrap-interval-in-sec controls the time interval for sending bootstrap messages, in seconds. # The default value is 120 seconds, which means that a bootstrap message is sent every 120 seconds for each table. send-bootstrap-interval-in-sec = 120 # send-bootstrap-in-msg-count controls the message interval for sending bootstrap, in message count. # The default value is 10000, which means that a bootstrap message is sent every 10000 row changed messages for each table. send-bootstrap-in-msg-count = 10000 # Note: If you want to disable the sending of bootstrap messages, set both send-bootstrap-interval-in-sec and send-bootstrap-in-msg-count to 0. # send-bootstrap-to-all-partition controls whether to send bootstrap messages to all partitions. # The default value is true, which means that bootstrap messages are sent to all partitions of the corresponding table topic. # Setting it to false means bootstrap messages are sent to only the first partition of the corresponding table topic. send-bootstrap-to-all-partition = true [sink.kafka-config.codec-config] # encoding-format controls the encoding format of the Simple protocol messages. Currently, the Simple protocol message supports "json" and "avro" encoding formats. # The default value is "json". encoding-format = "json"

メッセージの種類

TiCDC シンプル プロトコルには、次のメッセージ タイプがあります。

DDL:

  • CREATE : テーブル作成イベント。
  • RENAME : テーブルの名前変更イベント。
  • CINDEX : インデックス作成イベント。
  • DINDEX : インデックス削除イベント。
  • ERASE : テーブル削除イベント。
  • TRUNCATE : テーブル切り捨てイベント。
  • ALTER : 列の追加、列の削除、列タイプの変更、および TiCDC でサポートされているその他のALTER TABLEステートメントを含む、テーブル変更イベント。
  • QUERY : その他の DDL イベント。

DM: いいえ

  • INSERT : 挿入イベント。
  • UPDATE : 更新イベント。
  • DELETE : 削除イベント。

他の:

  • WATERMARK : 上流 TiDB クラスターの TSO (つまり、64 ビットのタイムスタンプ) を含み、テーブル レプリケーションの進行状況を示します。ウォーターマークより前のすべてのイベントは下流に送信されています。
  • BOOTSTRAP : ダウンストリームのテーブル スキーマを構築するために使用されるテーブルのスキーマ情報が含まれます。

メッセージ形式

Simple プロトコルでは、各メッセージには 1 つのイベントのみが含まれます。Simple プロトコルは、JSON 形式と Avro 形式のメッセージのエンコードをサポートしています。このドキュメントでは、例として JSON 形式を使用します。Avro 形式のメッセージの場合、フィールドと意味は JSON 形式のメッセージと同じですが、エンコード形式が異なります。Avro 形式の詳細については、 シンプルプロトコル Avro スキーマ参照してください。

DDL

TiCDC は、DDL イベントを次の JSON 形式でエンコードします。

{ "version":1, "type":"ALTER", "sql":"ALTER TABLE `user` ADD COLUMN `createTime` TIMESTAMP", "commitTs":447987408682614795, "buildTs":1708936343598, "tableSchema":{ "schema":"simple", "table":"user", "tableID":148, "version":447987408682614791, "columns":[ { "name":"id", "dataType":{ "mysqlType":"int", "charset":"binary", "collate":"binary", "length":11 }, "nullable":false, "default":null }, { "name":"name", "dataType":{ "mysqlType":"varchar", "charset":"utf8mb4", "collate":"utf8mb4_bin", "length":255 }, "nullable":true, "default":null }, { "name":"age", "dataType":{ "mysqlType":"int", "charset":"binary", "collate":"binary", "length":11 }, "nullable":true, "default":null }, { "name":"score", "dataType":{ "mysqlType":"float", "charset":"binary", "collate":"binary", "length":12 }, "nullable":true, "default":null }, { "name":"createTime", "dataType":{ "mysqlType":"timestamp", "charset":"binary", "collate":"binary", "length":19 }, "nullable":true, "default":null } ], "indexes":[ { "name":"primary", "unique":true, "primary":true, "nullable":false, "columns":[ "id" ] } ] }, "preTableSchema":{ "schema":"simple", "table":"user", "tableID":148, "version":447984074911121426, "columns":[ { "name":"id", "dataType":{ "mysqlType":"int", "charset":"binary", "collate":"binary", "length":11 }, "nullable":false, "default":null }, { "name":"name", "dataType":{ "mysqlType":"varchar", "charset":"utf8mb4", "collate":"utf8mb4_bin", "length":255 }, "nullable":true, "default":null }, { "name":"age", "dataType":{ "mysqlType":"int", "charset":"binary", "collate":"binary", "length":11 }, "nullable":true, "default":null }, { "name":"score", "dataType":{ "mysqlType":"float", "charset":"binary", "collate":"binary", "length":12 }, "nullable":true, "default":null } ], "indexes":[ { "name":"primary", "unique":true, "primary":true, "nullable":false, "columns":[ "id" ] } ] } }

上記の JSON データ内のフィールドは次のように説明されます。

分野タイプ説明
version番号プロトコルのバージョン番号。現在は1です。
typeDDL イベント タイプ ( CREATERENAMECINDEXDINDEXERASETRUNCATEALTERQUERY )。
sqlDDL ステートメント。
commitTs番号DDL ステートメントの実行がアップストリームで完了したときのコミット タイムスタンプ。
buildTs番号TiCDC 内でメッセージが正常にエンコードされたときの UNIX タイムスタンプ。
tableSchema物体テーブルの現在のスキーマ情報。詳細については、 TableSchema の定義参照してください。
preTableSchema物体DDL ステートメントが実行される前のテーブルのスキーマ情報。1 CREATEの DDL イベントを除くすべての DDL イベントにこのフィールドがあります。

DMML の

入れる

TiCDC は、 INSERTイベントを次の JSON 形式でエンコードします。

{ "version":1, "database":"simple", "table":"user", "tableID":148, "type":"INSERT", "commitTs":447984084414103554, "buildTs":1708923662983, "schemaVersion":447984074911121426, "data":{ "age":"25", "id":"1", "name":"John Doe", "score":"90.5" } }

上記の JSON データ内のフィールドは次のように説明されます。

分野タイプ説明
version番号プロトコルのバージョン番号。現在は1です。
databaseデータベースの名前。
tableテーブルの名前。
tableID番号テーブルの ID。
typeDML イベント タイプ ( INSERTUPDATEDELETEを含む)。
commitTs番号アップストリームで DML ステートメントの実行が完了したときのコミット タイムスタンプ。
buildTs番号TiCDC 内でメッセージが正常にエンコードされたときの UNIX タイムスタンプ。
schemaVersion番号DML メッセージがエンコードされるときのテーブルのスキーマ バージョン番号。
data物体挿入されたデータ。フィールド名は列名、フィールド値は列値です。

INSERTイベントにはdataフィールドが含まれ、 oldフィールドは含まれません。

アップデート

TiCDC は、 UPDATEイベントを次の JSON 形式でエンコードします。

{ "version":1, "database":"simple", "table":"user", "tableID":148, "type":"UPDATE", "commitTs":447984099186180098, "buildTs":1708923719184, "schemaVersion":447984074911121426, "data":{ "age":"25", "id":"1", "name":"John Doe", "score":"95" }, "old":{ "age":"25", "id":"1", "name":"John Doe", "score":"90.5" } }

上記の JSON データ内のフィールドは次のように説明されます。

分野タイプ説明
version番号プロトコルのバージョン番号。現在は1です。
databaseデータベースの名前。
tableテーブルの名前。
tableID番号テーブルの ID。
typeDML イベント タイプ ( INSERTUPDATEDELETEを含む)。
commitTs番号アップストリームで DML ステートメントの実行が完了したときのコミット タイムスタンプ。
buildTs番号TiCDC 内でメッセージが正常にエンコードされたときの UNIX タイムスタンプ。
schemaVersion番号DML メッセージがエンコードされるときのテーブルのスキーマ バージョン番号。
data物体更新後のデータ。フィールド名は列名、フィールド値は列値です。
old物体更新前のデータ。フィールド名は列名、フィールド値は列値です。

UPDATEイベントには、それぞれ更新後のデータと更新前のデータを表すdataフィールドとoldフィールドの両方が含まれます。

消去

TiCDC は、 DELETEイベントを次の JSON 形式でエンコードします。

{ "version":1, "database":"simple", "table":"user", "tableID":148, "type":"DELETE", "commitTs":447984114259722243, "buildTs":1708923776484, "schemaVersion":447984074911121426, "old":{ "age":"25", "id":"1", "name":"John Doe", "score":"95" } }

上記の JSON データ内のフィールドは次のように説明されます。

分野タイプ説明
version番号プロトコルのバージョン番号。現在は1です。
databaseデータベースの名前。
tableテーブルの名前。
tableID番号テーブルの ID。
typeDML イベント タイプ ( INSERTUPDATEDELETEを含む)。
commitTs番号アップストリームで DML ステートメントの実行が完了したときのコミット タイムスタンプ。
buildTs番号TiCDC 内でメッセージが正常にエンコードされたときの UNIX タイムスタンプ。
schemaVersion番号DML メッセージがエンコードされるときのテーブルのスキーマ バージョン番号。
old物体削除されたデータ。フィールド名は列名、フィールド値は列値です。

DELETEイベントにはoldフィールドが含まれ、 dataフィールドは含まれません。

透かし

TiCDC は、 WATERMARKイベントを次の JSON 形式でエンコードします。

{ "version":1, "type":"WATERMARK", "commitTs":447984124732375041, "buildTs":1708923816911 }

上記の JSON データ内のフィールドは次のように説明されます。

分野タイプ説明
version番号プロトコルのバージョン番号。現在は1です。
typeWATERMARKイベントタイプ。
commitTs番号WATERMARKのコミットタイムスタンプ。
buildTs番号TiCDC 内でメッセージが正常にエンコードされたときの UNIX タイムスタンプ。

ブートストラップ

TiCDC は、 BOOTSTRAPイベントを次の JSON 形式でエンコードします。

{ "version":1, "type":"BOOTSTRAP", "commitTs":0, "buildTs":1708924603278, "tableSchema":{ "schema":"simple", "table":"new_user", "tableID":148, "version":447984074911121426, "columns":[ { "name":"id", "dataType":{ "mysqlType":"int", "charset":"binary", "collate":"binary", "length":11 }, "nullable":false, "default":null }, { "name":"name", "dataType":{ "mysqlType":"varchar", "charset":"utf8mb4", "collate":"utf8mb4_bin", "length":255 }, "nullable":true, "default":null }, { "name":"age", "dataType":{ "mysqlType":"int", "charset":"binary", "collate":"binary", "length":11 }, "nullable":true, "default":null }, { "name":"score", "dataType":{ "mysqlType":"float", "charset":"binary", "collate":"binary", "length":12 }, "nullable":true, "default":null } ], "indexes":[ { "name":"primary", "unique":true, "primary":true, "nullable":false, "columns":[ "id" ] } ] } }

上記の JSON データ内のフィールドは次のように説明されます。

分野タイプ説明
version番号プロトコルのバージョン番号。現在は1です。
typeBOOTSTRAPイベントタイプ。
commitTs番号BOOTSTRAPのうちのcommitTs0です。これは TiCDC によって内部的に生成されるため、そのcommitTs意味がありません。
buildTs番号TiCDC 内でメッセージが正常にエンコードされたときの UNIX タイムスタンプ。
tableSchema物体テーブルのスキーマ情報。詳細については、 TableSchema の定義参照してください。

メッセージの生成と送信ルール

DDL

  • 生成時間: TiCDC は、この DDL イベントの前のすべてのトランザクションが送信された後に DDL イベントを送信します。
  • 宛先: TiCDC は、対応するトピックのすべてのパーティションに DDL イベントを送信します。

DMML の

  • 生成時間: TiCDC はトランザクションのcommitTsの順序で DML イベントを送信します。
  • 宛先: TiCDC は、ユーザーが設定したディスパッチ ルールに従って、対応するトピックの対応するパーティションに DDL イベントを送信します。

透かし

  • 生成時間: TiCDC は、変更フィードのレプリケーションの進行状況を示すために、定期的にWATERMARKイベントを送信します。現在の間隔は 1 秒です。
  • 宛先: TiCDC は、対応するトピックのすべてのパーティションにWATERMARKイベントを送信します。

ブートストラップ

  • 生成時間:
    • 新しい変更フィードを作成した後、テーブルの最初の DML イベントが送信される前に、TiCDC はテーブル スキーマを構築するためにBOOTSTRAPイベントをダウンストリームに送信します。
    • さらに、TiCDC は、新しく参加したコンシューマーがテーブル スキーマを構築できるように、定期的にBOOTSTRAPイベントを送信します。デフォルトの間隔は 120 秒または 10000 メッセージごとです。7 sinksend-bootstrap-interval-in-secおよびsend-bootstrap-in-msg-countパラメータを構成することで、送信間隔を調整できます。
    • テーブルが 30 分以内に新しい DML メッセージを受信しない場合、テーブルは非アクティブであると見なされます。TiCDC は、新しい DML イベントが受信されるまで、テーブルへのBOOTSTRAPイベントの送信を停止します。
  • 送信先: デフォルトでは、TiCDC は対応するトピックのすべてのパーティションにBOOTSTRAPイベントを送信します。シンク構成でsend-bootstrap-to-all-partitionパラメータを構成することで、送信戦略を調整できます。

メッセージの消費方法

TiCDC Simple プロトコルは、DML メッセージを送信するときにテーブルのスキーマ情報を含まないため、ダウンストリームは DDL または BOOTSTRAP メッセージを受信し、DML メッセージを消費する前にテーブルのスキーマ情報をキャッシュする必要があります。ダウンストリームは、DML メッセージを受信すると、DML メッセージのtableの名前とschemaVersionフィールドを検索してキャッシュから対応するテーブル スキーマ情報を取得し、DML メッセージを正しく消費します。

以下では、ダウンストリームが DDL または BOOTSTRAP メッセージに基づいて DML メッセージを使用する方法について説明します。前述の説明によると、次の情報がわかっています。

  • 各 DML メッセージには、DML メッセージに対応するテーブルのスキーマ バージョン番号をマークするためのschemaVersionフィールドが含まれています。
  • 各 DDL メッセージには、DDL イベントの前後のテーブルのスキーマ情報をマークするためのtableSchemaフィールドとpreTableSchemaフィールドが含まれています。
  • 各 BOOTSTRAP メッセージには、BOOTSTRAP メッセージに対応するテーブルのスキーマ情報をマークするためのtableSchemaフィールドが含まれています。

消費方法は次の2つのシナリオで紹介されています。

シナリオ1: 消費者が最初から消費を始める

このシナリオでは、コンシューマーはテーブルの作成から消費を開始するため、テーブルのすべての DDL および BOOTSTRAP メッセージを受信できます。この場合、コンシューマーは DML メッセージのtableの名前とschemaVersionフィールドを通じてテーブルのスキーマ情報を取得できます。詳細なプロセスは次のとおりです。

TiCDC Simple Protocol consumer scene 1

シナリオ2: 消費者が中間層から消費を始める

新しいコンシューマーがコンシューマー グループに参加すると、途中から消費を開始する可能性があるため、テーブルの以前の DDL および BOOTSTRAP メッセージを見逃す可能性があります。この場合、コンシューマーはテーブルのスキーマ情報を取得する前にいくつかの DML メッセージを受信する可能性があります。したがって、コンシューマーはテーブルのスキーマ情報を取得するために DDL または BOOTSTRAP メッセージを受信するまで、一定期間待機する必要があります。TiCDC は定期的に BOOTSTRAP メッセージを送信するため、コンシューマーは常に一定期間内にテーブルのスキーマ情報を取得できます。詳細なプロセスは次のとおりです。

TiCDC Simple Protocol consumer scene 2

参照

TableSchema の定義

TableSchema は、テーブル名、テーブル ID、テーブル バージョン番号、列情報、インデックス情報など、テーブルのスキーマ情報を含む JSON オブジェクトです。JSON メッセージの形式は次のとおりです。

{ "schema":"simple", "table":"user", "tableID":148, "version":447984074911121426, "columns":[ { "name":"id", "dataType":{ "mysqlType":"int", "charset":"binary", "collate":"binary", "length":11 }, "nullable":false, "default":null }, { "name":"name", "dataType":{ "mysqlType":"varchar", "charset":"utf8mb4", "collate":"utf8mb4_bin", "length":255 }, "nullable":true, "default":null }, { "name":"age", "dataType":{ "mysqlType":"int", "charset":"binary", "collate":"binary", "length":11 }, "nullable":true, "default":null }, { "name":"score", "dataType":{ "mysqlType":"float", "charset":"binary", "collate":"binary", "length":12 }, "nullable":true, "default":null } ], "indexes":[ { "name":"primary", "unique":true, "primary":true, "nullable":false, "columns":[ "id" ] } ] }

上記の JSON データの説明は次のとおりです。

分野タイプ説明
schemaデータベースの名前。
tableテーブルの名前。
tableID番号テーブルの ID。
version番号テーブルのスキーマ バージョン番号。
columns配列列名、データ型、null が可能かどうか、デフォルト値などの列情報。
indexes配列インデックス情報 (インデックス名、一意かどうか、主キーかどうか、インデックス列など)。

テーブル名とスキーマ バージョン番号によって、テーブルのスキーマ情報を一意に識別できます。

注記:

TiDB の実装上の制限により、 RENAME TABLE DDL 操作が実行されても、テーブルのスキーマ バージョン番号は変更されません。

カラムの定義

カラムは、列名、データ型、null が可能かどうか、デフォルト値など、列のスキーマ情報を含む JSON オブジェクトです。

{ "name":"id", "dataType":{ "mysqlType":"int", "charset":"binary", "collate":"binary", "length":11 }, "nullable":false, "default":null }

上記の JSON データの説明は次のとおりです。

分野タイプ説明
name列の名前。
dataType物体MySQL データ型、文字セット、照合順序、フィールド長などのデータ型情報。
nullableブール列が null になるかどうか。
default列のデフォルト値。

インデックスの定義

インデックスは、インデックス名、インデックスが一意かどうか、主キーかどうか、インデックス列など、インデックスのスキーマ情報を含む JSON オブジェクトです。

{ "name":"primary", "unique":true, "primary":true, "nullable":false, "columns":[ "id" ] }

上記の JSON データの説明は次のとおりです。

分野タイプ説明
nameインデックスの名前。
uniqueブールインデックスが一意であるかどうか。
primaryブールインデックスが主キーであるかどうか。
nullableブールインデックスが null になるかどうか。
columns配列インデックスに含まれる列名。

mysqlType 参照テーブル

次の表は、TiCDC Simple プロトコルのmysqlTypeフィールドの値の範囲と、TiDB (Golang) および Avro (Java) でのそのタイプを示しています。DML メッセージを解析する必要がある場合は、使用するプロトコルと言語に応じて、この表と DML メッセージのmysqlTypeフィールドに従ってデータを正しく解析できます。

TiDB 型 (Golang) は、 TiDB および TiCDC (Golang) で処理されるときに対応するmysqlTypeの型を表します。Avro型 (Java) は、 Avro 形式のメッセージにエンコードされるときに対応するmysqlTypeの型を表します。

mysqlタイプ値の範囲TiDB 型 (Golang)Avro 型 (Java)
ちっちゃい[-128, 127]整数64長さ
tinyint 符号なし[0, 255]uint64長さ
小さい整数[-32768, 32767]整数64長さ
符号なし小整数[0, 65535]uint64長さ
中程度[-8388608, 8388607]整数64長さ
mediumint 符号なし[0, 16777215]uint64長さ
整数[-2147483648, 2147483647]整数64長さ
符号なし整数[0, 4294967295]uint64長さ
ビッグイント[-9223372036854775808、9223372036854775807]整数64長さ
bigint 符号なし[0, 9223372036854775807]uint64長さ
bigint 符号なし[9223372036854775808、18446744073709551615]uint64
フロート/浮動小数点数32フロート
ダブル/フロート64ダブル
小数点/
varchar/[]uint8
文字/[]uint8
varbinary/[]uint8バイト
バイナリ/[]uint8バイト
小さなテキスト/[]uint8
文章/[]uint8
中程度のテキスト/[]uint8
長文/[]uint8
小さな塊/[]uint8バイト
ブロブ/[]uint8バイト
中程度のブロブ/[]uint8バイト
ロングブロブ/[]uint8バイト
日付/
日時/
タイムスタンプ/
時間/
/整数64長さ
列挙型/uint64長さ
セット/uint64長さ
少し/uint64長さ
json/
ブール/整数64長さ

Avro スキーマ定義

Simple プロトコルは、Avro 形式でのメッセージの出力をサポートしています。Avro 形式の詳細については、 シンプルプロトコル Avro スキーマ参照してください。

このページは役に立ちましたか?