TiCDC シンプルプロトコル

TiCDC シンプル プロトコルは、監視、キャッシュ、フルテキスト インデックス、分析エンジン、異種データベース間のプライマリ/セカンダリ レプリケーション用のデータ ソースを提供する行レベルのデータ変更通知プロトコルです。このドキュメントでは、TiCDC シンプル プロトコルの使用法とデータ形式の実装について説明します。

TiCDCシンプルプロトコルを使用する

Kafka をダウンストリームとして使用する場合は、changefeed 構成でprotocol"simple"に指定します。すると、TiCDC は各行の変更または DDL イベントをメッセージとしてエンコードし、データ変更イベントをダウンストリームに送信します。

シンプル プロトコルを使用するための構成例は次のとおりです。

sink-uri構成:

--sink-uri = "kafka://127.0.0.1:9092/topic-name?kafka-version=2.4.0"

Changefeed 構成:

[sink] protocol = "simple" # The following configuration parameters control the sending behavior of bootstrap messages. # send-bootstrap-interval-in-sec controls the time interval for sending bootstrap messages, in seconds. # The default value is 120 seconds, which means that a bootstrap message is sent every 120 seconds for each table. send-bootstrap-interval-in-sec = 120 # send-bootstrap-in-msg-count controls the message interval for sending bootstrap, in message count. # The default value is 10000, which means that a bootstrap message is sent every 10000 row changed messages for each table. send-bootstrap-in-msg-count = 10000 # Note: If you want to disable the sending of bootstrap messages, set both send-bootstrap-interval-in-sec and send-bootstrap-in-msg-count to 0. # send-bootstrap-to-all-partition controls whether to send bootstrap messages to all partitions. # The default value is true, which means that bootstrap messages are sent to all partitions of the corresponding table topic. # Setting it to false means bootstrap messages are sent to only the first partition of the corresponding table topic. send-bootstrap-to-all-partition = true [sink.kafka-config.codec-config] # encoding-format controls the encoding format of the Simple protocol messages. Currently, the Simple protocol message supports "json" and "avro" encoding formats. # The default value is "json". encoding-format = "json"

メッセージの種類

TiCDC シンプル プロトコルには、次のメッセージ タイプがあります。

DDL:

  • CREATE : テーブル作成イベント。
  • RENAME : テーブルの名前変更イベント。
  • CINDEX : インデックス作成イベント。
  • DINDEX : インデックス削除イベント。
  • ERASE : テーブル削除イベント。
  • TRUNCATE : テーブル切り捨てイベント。
  • ALTER : 列の追加、列の削除、列の種類の変更、および TiCDC でサポートされているその他のALTER TABLEつのステートメントを含む、テーブル変更イベント。
  • QUERY : その他の DDL イベント。

DM: いいえ

  • INSERT : 挿入イベント。
  • UPDATE : 更新イベント。
  • DELETE : 削除イベント。

他の:

  • WATERMARK : 上流 TiDB クラスターの TSO (つまり、64 ビットのタイムスタンプ) を含み、テーブル レプリケーションの進行状況を示します。ウォーターマークより前のすべてのイベントは下流に送信されています。
  • BOOTSTRAP : ダウンストリームのテーブル スキーマを構築するために使用されるテーブルのスキーマ情報が含まれます。

メッセージ形式

Simple プロトコルでは、各メッセージには 1 つのイベントのみが含まれます。Simple プロトコルは、JSON 形式と Avro 形式のメッセージのエンコードをサポートしています。このドキュメントでは、例として JSON 形式を使用します。Avro 形式のメッセージの場合、フィールドと意味は JSON 形式のメッセージと同じですが、エンコード形式が異なります。Avro 形式の詳細については、 シンプルプロトコル Avro スキーマ参照してください。

DDL

TiCDC は、DDL イベントを次の JSON 形式でエンコードします。

{ "version":1, "type":"ALTER", "sql":"ALTER TABLE `user` ADD COLUMN `createTime` TIMESTAMP", "commitTs":447987408682614795, "buildTs":1708936343598, "tableSchema":{ "schema":"simple", "table":"user", "tableID":148, "version":447987408682614791, "columns":[ { "name":"id", "dataType":{ "mysqlType":"int", "charset":"binary", "collate":"binary", "length":11 }, "nullable":false, "default":null }, { "name":"name", "dataType":{ "mysqlType":"varchar", "charset":"utf8mb4", "collate":"utf8mb4_bin", "length":255 }, "nullable":true, "default":null }, { "name":"age", "dataType":{ "mysqlType":"int", "charset":"binary", "collate":"binary", "length":11 }, "nullable":true, "default":null }, { "name":"score", "dataType":{ "mysqlType":"float", "charset":"binary", "collate":"binary", "length":12 }, "nullable":true, "default":null }, { "name":"createTime", "dataType":{ "mysqlType":"timestamp", "charset":"binary", "collate":"binary", "length":19 }, "nullable":true, "default":null } ], "indexes":[ { "name":"primary", "unique":true, "primary":true, "nullable":false, "columns":[ "id" ] } ] }, "preTableSchema":{ "schema":"simple", "table":"user", "tableID":148, "version":447984074911121426, "columns":[ { "name":"id", "dataType":{ "mysqlType":"int", "charset":"binary", "collate":"binary", "length":11 }, "nullable":false, "default":null }, { "name":"name", "dataType":{ "mysqlType":"varchar", "charset":"utf8mb4", "collate":"utf8mb4_bin", "length":255 }, "nullable":true, "default":null }, { "name":"age", "dataType":{ "mysqlType":"int", "charset":"binary", "collate":"binary", "length":11 }, "nullable":true, "default":null }, { "name":"score", "dataType":{ "mysqlType":"float", "charset":"binary", "collate":"binary", "length":12 }, "nullable":true, "default":null } ], "indexes":[ { "name":"primary", "unique":true, "primary":true, "nullable":false, "columns":[ "id" ] } ] } }

上記の JSON データのフィールドは次のように説明されます。

分野タイプ説明
version番号プロトコルのバージョン番号。現在は1です。
typeDDL イベント タイプCREATERENAMECINDEXDINDEXERASETRUNCATEALTERQUERY
sqlDDL ステートメント。
commitTs番号DDL ステートメントの実行がアップストリームで完了したときのコミット タイムスタンプ。
buildTs番号TiCDC 内でメッセージが正常にエンコードされたときの UNIX タイムスタンプ。
tableSchema物体テーブルの現在のスキーマ情報。詳細については、 テーブルスキーマの定義参照してください。
preTableSchema物体DDL ステートメントが実行される前のテーブルのスキーマ情報。1 種類の DDL イベントを除くすべてCREATE DDL イベントにこのフィールドがあります。

DMML の

入れる

TiCEC は、 INSERTイベントを次の JSON 形式でエンコードします。

{ "version":1, "database":"simple", "table":"user", "tableID":148, "type":"INSERT", "commitTs":447984084414103554, "buildTs":1708923662983, "schemaVersion":447984074911121426, "data":{ "age":"25", "id":"1", "name":"John Doe", "score":"90.5" } }

上記の JSON データのフィールドは次のように説明されます。

分野タイプ説明
version番号プロトコルのバージョン番号。現在は1です。
databaseデータベースの名前。
tableテーブルの名前。
tableID番号テーブルの ID。
typeDML イベント タイプINSERTUPDATEDELETEを含む)。
commitTs番号アップストリームで DML ステートメントの実行が完了したときのコミット タイムスタンプ。
buildTs番号TiCDC 内でメッセージが正常にエンコードされたときの UNIX タイムスタンプ。
schemaVersion番号DML メッセージがエンコードされるときのテーブルのスキーマ バージョン番号。
data物体挿入されたデータ。フィールド名は列名、フィールド値は列値です。

INSERTイベントにはdataフィールドが含まれ、 oldフィールドは含まれません。

アップデート

TiCDC は、 UPDATEイベントを次の JSON 形式でエンコードします。

{ "version":1, "database":"simple", "table":"user", "tableID":148, "type":"UPDATE", "commitTs":447984099186180098, "buildTs":1708923719184, "schemaVersion":447984074911121426, "data":{ "age":"25", "id":"1", "name":"John Doe", "score":"95" }, "old":{ "age":"25", "id":"1", "name":"John Doe", "score":"90.5" } }

上記の JSON データのフィールドは次のように説明されます。

分野タイプ説明
version番号プロトコルのバージョン番号。現在は1です。
databaseデータベースの名前。
tableテーブルの名前。
tableID番号テーブルの ID。
typeDML イベント タイプINSERTUPDATEDELETEを含む)。
commitTs番号アップストリームで DML ステートメントの実行が完了したときのコミット タイムスタンプ。
buildTs番号TiCDC 内でメッセージが正常にエンコードされたときの UNIX タイムスタンプ。
schemaVersion番号DML メッセージがエンコードされるときのテーブルのスキーマ バージョン番号。
data物体更新後のデータ。フィールド名は列名、フィールド値は列値です。
old物体更新前のデータ。フィールド名は列名、フィールド値は列値です。

UPDATEイベントには、それぞれ更新後のデータと更新前のデータを表すdataフィールドとoldフィールドの両方が含まれます。

消去

TiCDC は、 DELETEイベントを次の JSON 形式でエンコードします。

{ "version":1, "database":"simple", "table":"user", "tableID":148, "type":"DELETE", "commitTs":447984114259722243, "buildTs":1708923776484, "schemaVersion":447984074911121426, "old":{ "age":"25", "id":"1", "name":"John Doe", "score":"95" } }

上記の JSON データのフィールドは次のように説明されます。

分野タイプ説明
version番号プロトコルのバージョン番号。現在は1です。
databaseデータベースの名前。
tableテーブルの名前。
tableID番号テーブルの ID。
typeDML イベント タイプINSERTUPDATEDELETEを含む)。
commitTs番号アップストリームで DML ステートメントの実行が完了したときのコミット タイムスタンプ。
buildTs番号TiCDC 内でメッセージが正常にエンコードされたときの UNIX タイムスタンプ。
schemaVersion番号DML メッセージがエンコードされるときのテーブルのスキーマ バージョン番号。
old物体削除されたデータ。フィールド名は列名、フィールド値は列値です。

DELETEイベントにはoldフィールドが含まれ、 dataフィールドは含まれません。

透かし

TiCDC は、 WATERMARKイベントを次の JSON 形式でエンコードします。

{ "version":1, "type":"WATERMARK", "commitTs":447984124732375041, "buildTs":1708923816911 }

上記の JSON データのフィールドは次のように説明されます。

分野タイプ説明
version番号プロトコルのバージョン番号。現在は1です。
typeWATERMARKイベントタイプ。
commitTs番号WATERMARKのコミットタイムスタンプ。
buildTs番号TiCDC 内でメッセージが正常にエンコードされたときの UNIX タイムスタンプ。

ブートストラップ

TiCDC はBOOTSTRAPイベントを次の JSON 形式でエンコードします。

{ "version":1, "type":"BOOTSTRAP", "commitTs":0, "buildTs":1708924603278, "tableSchema":{ "schema":"simple", "table":"new_user", "tableID":148, "version":447984074911121426, "columns":[ { "name":"id", "dataType":{ "mysqlType":"int", "charset":"binary", "collate":"binary", "length":11 }, "nullable":false, "default":null }, { "name":"name", "dataType":{ "mysqlType":"varchar", "charset":"utf8mb4", "collate":"utf8mb4_bin", "length":255 }, "nullable":true, "default":null }, { "name":"age", "dataType":{ "mysqlType":"int", "charset":"binary", "collate":"binary", "length":11 }, "nullable":true, "default":null }, { "name":"score", "dataType":{ "mysqlType":"float", "charset":"binary", "collate":"binary", "length":12 }, "nullable":true, "default":null } ], "indexes":[ { "name":"primary", "unique":true, "primary":true, "nullable":false, "columns":[ "id" ] } ] } }

上記の JSON データのフィールドは次のように説明されます。

分野タイプ説明
version番号プロトコルのバージョン番号。現在は1です。
typeBOOTSTRAPイベントタイプ。
commitTs番号BOOTSTRAPのうちのcommitTs0です。これは TiCDC によって内部的に生成されるため、そのcommitTsは意味がありません。
buildTs番号TiCDC 内でメッセージが正常にエンコードされたときの UNIX タイムスタンプ。
tableSchema物体テーブルのスキーマ情報。詳細については、 テーブルスキーマの定義参照してください。

メッセージの生成と送信ルール

DDL

  • 生成時間: TiCDC は、この DDL イベントの前のすべてのトランザクションが送信された後に DDL イベントを送信します。
  • 宛先: TiCDC は、対応するトピックのすべてのパーティションに DDL イベントを送信します。

DMML の

  • 生成時間: TiCDC はトランザクションのcommitTsの順序で DML イベントを送信します。
  • 宛先: TiCDC は、ユーザーが設定したディスパッチ ルールに従って、対応するトピックの対応するパーティションに DDL イベントを送信します。

透かし

  • 生成時間: TiCDC は、変更フィードのレプリケーションの進行状況を示すために、定期的にWATERMARKイベントを送信します。現在の間隔は 1 秒です。
  • 宛先: TiCDC は、対応するトピックのすべてのパーティションにWATERMARKイベントを送信します。

ブートストラップ

  • 生成時間:
    • 新しい変更フィードを作成した後、テーブルの最初の DML イベントが送信される前に、TiCDC はテーブル スキーマを構築するためにBOOTSTRAPイベントをダウンストリームに送信します。
    • さらに、TiCDC は、新しく参加したコンシューマーがテーブル スキーマを構築できるように、 BOOTSTRAPイベントsink定期的に送信します。デフォルトの間隔は 120 秒または 10000 メッセージごとです。7 構成でsend-bootstrap-interval-in-secおよびsend-bootstrap-in-msg-countパラメータを構成することで、送信間隔を調整できます。
    • テーブルが 30 分以内に新しい DML メッセージを受信しない場合、テーブルは非アクティブであると見なされます。TiCDC は、新しい DML イベントが受信されるまで、テーブルへのBOOTSTRAPイベントの送信を停止します。
  • 送信先: デフォルトでは、TiCDC は対応するトピックのすべてのパーティションにBOOTSTRAPイベントを送信します。シンク構成でsend-bootstrap-to-all-partitionパラメータを構成することで、送信戦略を調整できます。

メッセージの消費方法

TiCDC Simple プロトコルは、DML メッセージを送信するときにテーブルのスキーマ情報を含まないため、ダウンストリームは DDL または BOOTSTRAP メッセージを受信し、DML メッセージを消費する前にテーブルのスキーマ情報をキャッシュする必要があります。ダウンストリームは、DML メッセージを受信すると、DML メッセージのtable名前とschemaVersionフィールドを検索してキャッシュから対応するテーブル スキーマ情報を取得し、DML メッセージを正しく消費します。

以下では、ダウンストリームが DDL または BOOTSTRAP メッセージに基づいて DML メッセージを使用する方法について説明します。前述の説明によると、次の情報がわかっています。

  • 各 DML メッセージには、DML メッセージに対応するテーブルのスキーマ バージョン番号をマークするためのschemaVersionフィールドが含まれています。
  • 各 DDL メッセージには、DDL イベントの前後のテーブルのスキーマ情報をマークするためのtableSchemaフィールドとpreTableSchemaフィールドが含まれています。
  • 各 BOOTSTRAP メッセージには、BOOTSTRAP メッセージに対応するテーブルのスキーマ情報をマークするためのtableSchemaフィールドが含まれています。

消費方法は次の2つのシナリオで紹介されています。

シナリオ1: 消費者が最初から消費を始める

このシナリオでは、コンシューマーはテーブルの作成から消費を開始するため、テーブルのすべての DDL および BOOTSTRAP メッセージを受信できます。この場合、コンシューマーは DML メッセージのtable名前とschemaVersionフィールドを通じてテーブルのスキーマ情報を取得できます。詳細なプロセスは次のとおりです。

TiCDC Simple Protocol consumer scene 1

シナリオ2: 消費者が中間層から消費を始める

新しいコンシューマーがコンシューマー グループに参加すると、途中から消費を開始する可能性があるため、テーブルの以前の DDL および BOOTSTRAP メッセージを見逃す可能性があります。この場合、コンシューマーはテーブルのスキーマ情報を取得する前にいくつかの DML メッセージを受信する可能性があります。したがって、コンシューマーはテーブルのスキーマ情報を取得するために DDL または BOOTSTRAP メッセージを受信するまで一定時間待機する必要があります。TiCDC は BOOTSTRAP メッセージを定期的に送信するため、コンシューマーは常に一定期間内にテーブルのスキーマ情報を取得できます。詳細なプロセスは次のとおりです。

TiCDC Simple Protocol consumer scene 2

参照

テーブルスキーマの定義

TableSchema は、テーブル名、テーブル ID、テーブル バージョン番号、列情報、インデックス情報など、テーブルのスキーマ情報を含む JSON オブジェクトです。JSON メッセージの形式は次のとおりです。

{ "schema":"simple", "table":"user", "tableID":148, "version":447984074911121426, "columns":[ { "name":"id", "dataType":{ "mysqlType":"int", "charset":"binary", "collate":"binary", "length":11 }, "nullable":false, "default":null }, { "name":"name", "dataType":{ "mysqlType":"varchar", "charset":"utf8mb4", "collate":"utf8mb4_bin", "length":255 }, "nullable":true, "default":null }, { "name":"age", "dataType":{ "mysqlType":"int", "charset":"binary", "collate":"binary", "length":11 }, "nullable":true, "default":null }, { "name":"score", "dataType":{ "mysqlType":"float", "charset":"binary", "collate":"binary", "length":12 }, "nullable":true, "default":null } ], "indexes":[ { "name":"primary", "unique":true, "primary":true, "nullable":false, "columns":[ "id" ] } ] }

上記の JSON データの説明は次のとおりです。

分野タイプ説明
schemaデータベースの名前。
tableテーブルの名前。
tableID番号テーブルの ID。
version番号テーブルのスキーマ バージョン番号。
columns配列列名、データ型、null が可能かどうか、デフォルト値などの列情報。
indexes配列インデックス名、インデックスが一意かどうか、インデックスが主キーかどうか、インデックス列などのインデックス情報。

テーブル名とスキーマ バージョン番号によって、テーブルのスキーマ情報を一意に識別できます。

注記:

TiDB の実装上の制限により、 RENAME TABLE DDL 操作が実行されても、テーブルのスキーマ バージョン番号は変更されません。

カラムの定義

カラムは、列名、データ型、null が可能かどうか、デフォルト値など、列のスキーマ情報を含む JSON オブジェクトです。

{ "name":"id", "dataType":{ "mysqlType":"int", "charset":"binary", "collate":"binary", "length":11 }, "nullable":false, "default":null }

上記の JSON データの説明は次のとおりです。

分野タイプ説明
name列の名前。
dataType物体MySQL データ型、文字セット、照合順序、フィールド長などのデータ型情報。
nullableブール列が null になるかどうか。
default列のデフォルト値。

インデックスの定義

インデックスは、インデックス名、インデックスが一意かどうか、主キーであるかどうか、インデックス列など、インデックスのスキーマ情報を含む JSON オブジェクトです。

{ "name":"primary", "unique":true, "primary":true, "nullable":false, "columns":[ "id" ] }

上記の JSON データの説明は次のとおりです。

分野タイプ説明
nameインデックスの名前。
uniqueブールインデックスが一意であるかどうか。
primaryブールインデックスが主キーであるかどうか。
nullableブールインデックスが null になるかどうか。
columns配列インデックスに含まれる列名。

mysqlType 参照テーブル

次の表は、TiCDC Simple プロトコルのmysqlTypeフィールドの値の範囲と、TiDB (Golang) および Avro (Java) でのその型を示しています。DML メッセージを解析する必要がある場合は、使用するプロトコルと言語に応じて、この表と DML メッセージのmysqlTypeフィールドに従ってデータを正しく解析できます。

TiDB 型 (Golang) は、 TiDB および TiCDC (Golang) で処理されるときに対応するmysqlTypeの型を表します。Avro型 (Java) は、 Avro 形式のメッセージにエンコードされるときに対応するmysqlTypeの型を表します。

mysqlタイプ値の範囲TiDB タイプ (Golang)Avro 型 (Java)
ちっちゃい[-128, 127]整数64長さ
tinyint 符号なし[0, 255]uint64長さ
小さい整数[-32768, 32767]整数64長さ
符号なし小整数[0, 65535]uint64長さ
中程度[-8388608, 8388607]整数64長さ
mediumint 符号なし[0, 16777215]uint64長さ
整数[-2147483648, 2147483647]整数64長さ
符号なし整数[0, 4294967295]uint64長さ
ビッグイント[-9223372036854775808, 9223372036854775807]整数64長さ
bigint 符号なし[0, 9223372036854775807]uint64長さ
bigint 符号なし[9223372036854775808, 18446744073709551615]uint64
浮く/浮動小数点数32浮く
ダブル/フロート64ダブル
小数点/
varchar/[]uint8
文字/[]uint8
varbinary/[]uint8バイト
バイナリ/[]uint8バイト
小さなテキスト/[]uint8
文章/[]uint8
中テキスト/[]uint8
長文/[]uint8
小さな塊/[]uint8バイト
ブロブ/[]uint8バイト
中くらいの塊/[]uint8バイト
ロングブロブ/[]uint8バイト
日付/
日付時刻/
タイムスタンプ/
時間/
/整数64長さ
列挙型/uint64長さ
セット/uint64長さ
少し/uint64長さ
json/
ブール/整数64長さ

Avro スキーマ定義

Simple プロトコルは、Avro 形式でのメッセージの出力をサポートしています。Avro 形式の詳細については、 シンプルプロトコル Avro スキーマを参照してください。

このページは役に立ちましたか?