TiCDC シンプルプロトコル
v8.0.0 以降、TiCDC は Simple プロトコルをサポートします。このドキュメントでは、TiCDC Simple プロトコルの使用方法とデータ形式の実装について説明します。
TiCDCシンプルプロトコルを使用する
Kafka をダウンストリームとして使用する場合は、changefeed 構成でprotocolを"simple"に指定します。すると、TiCDC は各行の変更または DDL イベントをメッセージとしてエンコードし、データ変更イベントをダウンストリームに送信します。
シンプル プロトコルを使用するための構成例は次のとおりです。
sink-uri構成:
--sink-uri = "kafka://127.0.0.1:9092/topic-name?kafka-version=2.4.0"
Changefeed 構成:
[sink]
protocol = "simple"
# The following configuration parameters control the sending behavior of bootstrap messages.
# send-bootstrap-interval-in-sec controls the time interval for sending bootstrap messages, in seconds.
# The default value is 120 seconds, which means that a bootstrap message is sent every 120 seconds for each table.
send-bootstrap-interval-in-sec = 120
# send-bootstrap-in-msg-count controls the message interval for sending bootstrap, in message count.
# The default value is 10000, which means that a bootstrap message is sent every 10000 row changed messages for each table.
send-bootstrap-in-msg-count = 10000
# Note: If you want to disable the sending of bootstrap messages, set both send-bootstrap-interval-in-sec and send-bootstrap-in-msg-count to 0.
# send-bootstrap-to-all-partition controls whether to send bootstrap messages to all partitions.
# The default value is true, which means that bootstrap messages are sent to all partitions of the corresponding table topic.
# Setting it to false means bootstrap messages are sent to only the first partition of the corresponding table topic.
send-bootstrap-to-all-partition = true
[sink.kafka-config.codec-config]
# encoding-format controls the encoding format of the Simple protocol messages. Currently, the Simple protocol message supports "json" and "avro" encoding formats.
# The default value is "json".
encoding-format = "json"
メッセージの種類
TiCDC シンプル プロトコルには、次のメッセージ タイプがあります。
DDL:
CREATE: テーブル作成イベント。RENAME: テーブルの名前変更イベント。CINDEX: インデックス作成イベント。DINDEX: インデックス削除イベント。ERASE: テーブル削除イベント。TRUNCATE: テーブル切り捨てイベント。ALTER: 列の追加、列の削除、列タイプの変更、および TiCDC でサポートされているその他のALTER TABLEステートメントを含む、テーブル変更イベント。QUERY: その他の DDL イベント。
DM: いいえ
INSERT: 挿入イベント。UPDATE: 更新イベント。DELETE: 削除イベント。
他の:
WATERMARK: 上流 TiDB クラスターの TSO (つまり、64 ビットのタイムスタンプ) を含み、テーブル レプリケーションの進行状況を示します。ウォーターマークより前のすべてのイベントは下流に送信されています。BOOTSTRAP: ダウンストリームのテーブル スキーマを構築するために使用されるテーブルのスキーマ情報が含まれます。
メッセージ形式
Simple プロトコルでは、各メッセージには 1 つのイベントのみが含まれます。Simple プロトコルは、JSON 形式と Avro 形式のメッセージのエンコードをサポートしています。このドキュメントでは、例として JSON 形式を使用します。Avro 形式のメッセージの場合、フィールドと意味は JSON 形式のメッセージと同じですが、エンコード形式が異なります。Avro 形式の詳細については、 シンプルプロトコル Avro スキーマ参照してください。
DDL
TiCDC は、DDL イベントを次の JSON 形式でエンコードします。
{
"version":1,
"type":"ALTER",
"sql":"ALTER TABLE `user` ADD COLUMN `createTime` TIMESTAMP",
"commitTs":447987408682614795,
"buildTs":1708936343598,
"tableSchema":{
"schema":"simple",
"table":"user",
"tableID":148,
"version":447987408682614791,
"columns":[
{
"name":"id",
"dataType":{
"mysqlType":"int",
"charset":"binary",
"collate":"binary",
"length":11
},
"nullable":false,
"default":null
},
{
"name":"name",
"dataType":{
"mysqlType":"varchar",
"charset":"utf8mb4",
"collate":"utf8mb4_bin",
"length":255
},
"nullable":true,
"default":null
},
{
"name":"age",
"dataType":{
"mysqlType":"int",
"charset":"binary",
"collate":"binary",
"length":11
},
"nullable":true,
"default":null
},
{
"name":"score",
"dataType":{
"mysqlType":"float",
"charset":"binary",
"collate":"binary",
"length":12
},
"nullable":true,
"default":null
},
{
"name":"createTime",
"dataType":{
"mysqlType":"timestamp",
"charset":"binary",
"collate":"binary",
"length":19
},
"nullable":true,
"default":null
}
],
"indexes":[
{
"name":"primary",
"unique":true,
"primary":true,
"nullable":false,
"columns":[
"id"
]
}
]
},
"preTableSchema":{
"schema":"simple",
"table":"user",
"tableID":148,
"version":447984074911121426,
"columns":[
{
"name":"id",
"dataType":{
"mysqlType":"int",
"charset":"binary",
"collate":"binary",
"length":11
},
"nullable":false,
"default":null
},
{
"name":"name",
"dataType":{
"mysqlType":"varchar",
"charset":"utf8mb4",
"collate":"utf8mb4_bin",
"length":255
},
"nullable":true,
"default":null
},
{
"name":"age",
"dataType":{
"mysqlType":"int",
"charset":"binary",
"collate":"binary",
"length":11
},
"nullable":true,
"default":null
},
{
"name":"score",
"dataType":{
"mysqlType":"float",
"charset":"binary",
"collate":"binary",
"length":12
},
"nullable":true,
"default":null
}
],
"indexes":[
{
"name":"primary",
"unique":true,
"primary":true,
"nullable":false,
"columns":[
"id"
]
}
]
}
}
The fields in the preceding JSON data are explained as follows:
DMML の
入れる
TiCDC はINSERTイベントを次の JSON 形式でエンコードします。
{
"version":1,
"database":"simple",
"table":"user",
"tableID":148,
"type":"INSERT",
"commitTs":447984084414103554,
"buildTs":1708923662983,
"schemaVersion":447984074911121426,
"data":{
"age":"25",
"id":"1",
"name":"John Doe",
"score":"90.5"
}
}
上記の JSON データのフィールドは次のように説明されます。
INSERTイベントにはdataフィールドが含まれ、 oldフィールドは含まれません。
アップデート
TiCDC はUPDATEイベントを次の JSON 形式でエンコードします。
{
"version":1,
"database":"simple",
"table":"user",
"tableID":148,
"type":"UPDATE",
"commitTs":447984099186180098,
"buildTs":1708923719184,
"schemaVersion":447984074911121426,
"data":{
"age":"25",
"id":"1",
"name":"John Doe",
"score":"95"
},
"old":{
"age":"25",
"id":"1",
"name":"John Doe",
"score":"90.5"
}
}
上記の JSON データのフィールドは次のように説明されます。
UPDATEイベントには、それぞれ更新後のデータと更新前のデータを表すdataフィールドとoldフィールドの両方が含まれます。
消去
TiCDC はDELETEイベントを次の JSON 形式でエンコードします。
{
"version":1,
"database":"simple",
"table":"user",
"tableID":148,
"type":"DELETE",
"commitTs":447984114259722243,
"buildTs":1708923776484,
"schemaVersion":447984074911121426,
"old":{
"age":"25",
"id":"1",
"name":"John Doe",
"score":"95"
}
}
上記の JSON データのフィールドは次のように説明されます。
DELETEイベントにはoldフィールドが含まれ、 dataフィールドは含まれません。
透かし
TiCDC encodes a WATERMARK event in the following JSON format:
{
"version":1,
"type":"WATERMARK",
"commitTs":447984124732375041,
"buildTs":1708923816911
}
上記の JSON データのフィールドは次のように説明されます。
ブートストラップ
TiCDC はBOOTSTRAPイベントを次の JSON 形式でエンコードします。
{
"version":1,
"type":"BOOTSTRAP",
"commitTs":0,
"buildTs":1708924603278,
"tableSchema":{
"schema":"simple",
"table":"new_user",
"tableID":148,
"version":447984074911121426,
"columns":[
{
"name":"id",
"dataType":{
"mysqlType":"int",
"charset":"binary",
"collate":"binary",
"length":11
},
"nullable":false,
"default":null
},
{
"name":"name",
"dataType":{
"mysqlType":"varchar",
"charset":"utf8mb4",
"collate":"utf8mb4_bin",
"length":255
},
"nullable":true,
"default":null
},
{
"name":"age",
"dataType":{
"mysqlType":"int",
"charset":"binary",
"collate":"binary",
"length":11
},
"nullable":true,
"default":null
},
{
"name":"score",
"dataType":{
"mysqlType":"float",
"charset":"binary",
"collate":"binary",
"length":12
},
"nullable":true,
"default":null
}
],
"indexes":[
{
"name":"primary",
"unique":true,
"primary":true,
"nullable":false,
"columns":[
"id"
]
}
]
}
}
上記の JSON データのフィールドは次のように説明されます。
メッセージの生成と送信ルール
DDL
- 生成時間: TiCDC は、この DDL イベントの前のすべてのトランザクションが送信された後に DDL イベントを送信します。
- 宛先: TiCDC は、対応するトピックのすべてのパーティションに DDL イベントを送信します。
DMML の
- 生成時間: TiCDC はトランザクションの
commitTs順序で DML イベントを送信します。 - 宛先: TiCDC は、ユーザーが設定したディスパッチ ルールに従って、対応するトピックの対応するパーティションに DDL イベントを送信します。
透かし
- 生成時間: TiCDC は、変更フィードのレプリケーションの進行状況を示すために、定期的に
WATERMARKイベントを送信します。現在の間隔は 1 秒です。 - 宛先: TiCDC は、対応するトピックのすべてのパーティションに
WATERMARKイベントを送信します。
ブートストラップ
- 生成時間:
- 新しい変更フィードを作成した後、テーブルの最初の DML イベントが送信される前に、TiCDC はテーブル スキーマを構築するために
BOOTSTRAPイベントをダウンストリームに送信します。 - さらに、TiCDC は、新しく参加したコンシューマーがテーブル スキーマ
sink構築できるように、BOOTSTRAPイベントを定期的に送信します。デフォルトの間隔は 120 秒または 10000 メッセージごとです。7 構成でsend-bootstrap-interval-in-secおよびsend-bootstrap-in-msg-countパラメータを構成することで、送信間隔を調整できます。 - テーブルが 30 分以内に新しい DML メッセージを受信しない場合、テーブルは非アクティブであると見なされます。TiCDC は、新しい DML イベントが受信されるまで、テーブルへの
BOOTSTRAPイベントの送信を停止します。
- 新しい変更フィードを作成した後、テーブルの最初の DML イベントが送信される前に、TiCDC はテーブル スキーマを構築するために
- 送信先: デフォルトでは、TiCDC は対応するトピックのすべてのパーティションに
BOOTSTRAPイベントを送信します。シンク構成でsend-bootstrap-to-all-partitionパラメータを構成することで、送信戦略を調整できます。
メッセージの消費方法
TiCDC Simple プロトコルは、DML メッセージを送信するときにテーブルのスキーマ情報を含まないため、ダウンストリームは DDL または BOOTSTRAP メッセージを受信し、DML メッセージを消費する前にテーブルのスキーマ情報をキャッシュする必要があります。ダウンストリームは、DML メッセージを受信すると、DML メッセージのtable名前とschemaVersionフィールドを検索してキャッシュから対応するテーブル スキーマ情報を取得し、DML メッセージを正しく消費します。
以下では、ダウンストリームが DDL または BOOTSTRAP メッセージに基づいて DML メッセージを使用する方法について説明します。前述の説明によると、次の情報がわかっています。
- 各 DML メッセージには、DML メッセージに対応するテーブルのスキーマ バージョン番号をマークするための
schemaVersionフィールドが含まれています。 - 各 DDL メッセージには、DDL イベントの前後のテーブルのスキーマ情報をマークするための
tableSchemaフィールドとpreTableSchemaフィールドが含まれています。 - 各 BOOTSTRAP メッセージには、BOOTSTRAP メッセージに対応するテーブルのスキーマ情報をマークするための
tableSchemaフィールドが含まれています。
消費方法は次の2つのシナリオで紹介されています。
シナリオ1: 消費者が最初から消費を始める
このシナリオでは、コンシューマーはテーブルの作成から消費を開始するため、テーブルのすべての DDL および BOOTSTRAP メッセージを受信できます。この場合、コンシューマーは DML メッセージのtable名前とschemaVersionフィールドを通じてテーブルのスキーマ情報を取得できます。詳細なプロセスは次のとおりです。
シナリオ2: 消費者が中間層から消費を始める
新しいコンシューマーがコンシューマー グループに参加すると、途中から消費を開始する可能性があるため、テーブルの以前の DDL および BOOTSTRAP メッセージを見逃す可能性があります。この場合、コンシューマーはテーブルのスキーマ情報を取得する前にいくつかの DML メッセージを受信する可能性があります。したがって、コンシューマーはテーブルのスキーマ情報を取得するために DDL または BOOTSTRAP メッセージを受信するまで、一定期間待機する必要があります。TiCDC は BOOTSTRAP メッセージを定期的に送信するため、コンシューマーは常に一定期間内にテーブルのスキーマ情報を取得できます。詳細なプロセスは次のとおりです。
参照
テーブルスキーマの定義
TableSchema は、テーブル名、テーブル ID、テーブル バージョン番号、列情報、インデックス情報など、テーブルのスキーマ情報を含む JSON オブジェクトです。JSON メッセージの形式は次のとおりです。
{
"schema":"simple",
"table":"user",
"tableID":148,
"version":447984074911121426,
"columns":[
{
"name":"id",
"dataType":{
"mysqlType":"int",
"charset":"binary",
"collate":"binary",
"length":11
},
"nullable":false,
"default":null
},
{
"name":"name",
"dataType":{
"mysqlType":"varchar",
"charset":"utf8mb4",
"collate":"utf8mb4_bin",
"length":255
},
"nullable":true,
"default":null
},
{
"name":"age",
"dataType":{
"mysqlType":"int",
"charset":"binary",
"collate":"binary",
"length":11
},
"nullable":true,
"default":null
},
{
"name":"score",
"dataType":{
"mysqlType":"float",
"charset":"binary",
"collate":"binary",
"length":12
},
"nullable":true,
"default":null
}
],
"indexes":[
{
"name":"primary",
"unique":true,
"primary":true,
"nullable":false,
"columns":[
"id"
]
}
]
}
上記の JSON データの説明は次のとおりです。
テーブル名とスキーマ バージョン番号によって、テーブルのスキーマ情報を一意に識別できます。
注記:
TiDB の実装上の制限により、
RENAME TABLEDDL 操作が実行されても、テーブルのスキーマ バージョン番号は変更されません。
カラムの定義
カラムは、列名、データ型、null が可能かどうか、デフォルト値など、列のスキーマ情報を含む JSON オブジェクトです。
{
"name":"id",
"dataType":{
"mysqlType":"int",
"charset":"binary",
"collate":"binary",
"length":11
},
"nullable":false,
"default":null
}
上記の JSON データの説明は次のとおりです。
インデックスの定義
インデックスは、インデックス名、インデックスが一意かどうか、主キーかどうか、インデックス列など、インデックスのスキーマ情報を含む JSON オブジェクトです。
{
"name":"primary",
"unique":true,
"primary":true,
"nullable":false,
"columns":[
"id"
]
}
上記の JSON データの説明は次のとおりです。
mysqlType 参照テーブル
次の表は、TiCDC Simple プロトコルのmysqlTypeフィールドの値の範囲と、TiDB (Golang) および Avro (Java) でのそのタイプを示しています。DML メッセージを解析する必要がある場合は、使用するプロトコルと言語に応じて、この表と DML メッセージのmysqlTypeフィールドに従ってデータを正しく解析できます。
TiDB 型 (Golang) は、 TiDB および TiCDC (Golang) で処理されるときに対応するmysqlTypeの型を表します。Avro型 (Java) は、 Avro 形式のメッセージにエンコードされるときに対応するmysqlTypeの型を表します。
Avro スキーマ定義
Simple プロトコルは、Avro 形式でのメッセージの出力をサポートしています。Avro 形式の詳細については、 シンプルプロトコル Avro スキーマ参照してください。

