TiCDC Canal- JSON プロトコル
Canal-JSON は、 アリババ運河で定義されたデータ交換形式のプロトコルです。このドキュメントでは、TiDB 拡張フィールド、Canal-JSON データ形式の定義、公式の Canal との比較など、Canal-JSON データ形式が TiCDC でどのように実装されているかを学ぶことができます。
Canal-JSON を使用する
Message Queue (MQ) をダウンストリーム Sink として使用する場合、 sink-uriで Canal-JSON を指定できます。 TiCDC は Event を基本単位として Canal-JSON メッセージをラップして構築し、TiDB のデータ変更イベントを下流に送信します。
イベントには次の 3 種類があります。
- DDL イベント: DDL 変更レコードを表します。上流の DDL ステートメントが正常に実行された後に送信されます。 DDL イベントは、インデックスが 0 の MQ パーティションに送信されます。
- DML イベント: 行データ変更レコードを表します。このタイプのイベントは、行の変更が発生したときに送信されます。変更が発生した後の行に関する情報が含まれています。
- WATERMARK イベント: 特別な時点を表します。これは、この時点より前に受信したイベントが完了したことを示します。 TiDB 拡張フィールドのみに適用され、
sink-uriでenable-tidb-extensionからtrue設定すると有効になります。
以下はCanal-JSONの使用例です。
cdc cli changefeed create --server=http://127.0.0.1:8300 --changefeed-id="kafka-canal-json" --sink-uri="kafka://127.0.0.1:9092/topic-name?kafka-version=2.4.0&protocol=canal-json"
TiDB 拡張フィールド
Canal-JSON プロトコルは、もともと MySQL 用に設計されました。 CommitTS トランザクションの TiDB 固有の一意の識別子などの重要なフィールドは含まれていません。この問題を解決するために、TiCDC は TiDB 拡張フィールドを Canal-JSON プロトコル形式に追加します。 sink-uriでenable-tidb-extensionからtrue (デフォルトではfalse ) を設定すると、TiCDC は Canal-JSON メッセージを生成するときに次のように動作します。
- TiCDC は、
_tidbという名前のフィールドを含む DML イベントおよび DDL イベント メッセージを送信します。 - TiCDC は WATERMARK イベント メッセージを送信します。
次に例を示します。
cdc cli changefeed create --server=http://127.0.0.1:8300 --changefeed-id="kafka-canal-json-enable-tidb-extension" --sink-uri="kafka://127.0.0.1:9092/topic-name?kafka-version=2.4.0&protocol=canal-json&enable-tidb-extension=true"
メッセージ形式の定義
このセクションでは、DDL イベント、DML イベント、および WATERMARK イベントの形式と、データがコンシューマー側でどのように解決されるかについて説明します。
DDL イベント
TiCDC は、DDL イベントを次の Canal-JSON 形式にエンコードします。
{
"id": 0,
"database": "test",
"table": "",
"pkNames": null,
"isDdl": true,
"type": "QUERY",
"es": 1639633094670,
"ts": 1639633095489,
"sql": "drop database if exists test",
"sqlType": null,
"mysqlType": null,
"data": null,
"old": null,
"_tidb": { // TiDB extension field
"commitTs": 163963309467037594
}
}
フィールドは次のように説明されています。
| 分野 | タイプ | 説明 |
|---|---|---|
| ID | 番号 | TiCDC のデフォルト値は 0 です。 |
| データベース | 弦 | 行が配置されているデータベースの名前 |
| テーブル | 弦 | 行が配置されているテーブルの名前 |
| pkNames | 配列 | 主キーを構成するすべての列の名前 |
| isDdl | ブール | メッセージが DDL イベントかどうか |
| タイプ | 弦 | Canal-JSON で定義されたイベント タイプ |
| エス | 番号 | メッセージを生成したイベントが発生したときの 13 ビット (ミリ秒) のタイムスタンプ |
| TS | 番号 | TiCDC がメッセージを生成したときの 13 ビット (ミリ秒) のタイムスタンプ |
| SQL | 弦 | isDdl がtrueの場合、対応する DDL ステートメントを記録します |
| sqlType | 物体 | isDdl がfalseの場合、各列のデータ型がJavaでどのように表現されるかを記録します |
| mysql タイプ | 物体 | isDdl がfalseの場合、各列のデータ型が MySQL でどのように表現されるかを記録します |
| データ | 物体 | isDdl がfalseの場合、各列の名前とそのデータ値を記録します |
| 年 | 物体 | メッセージが更新イベントによって生成された場合のみ、各列の名前と更新前のデータ値を記録します |
| _tidb | 物体 | TiDB 拡張フィールド。 enable-tidb-extension ~ trueを設定した場合にのみ存在します。値commitTsは、行の変更の原因となったトランザクションの TSO です。 |
DML イベント
TiCDC は、DML データ変更イベントの行を次のようにエンコードします。
{
"id": 0,
"database": "test",
"table": "tp_int",
"pkNames": [
"id"
],
"isDdl": false,
"type": "INSERT",
"es": 1639633141221,
"ts": 1639633142960,
"sql": "",
"sqlType": {
"c_bigint": -5,
"c_int": 4,
"c_mediumint": 4,
"c_smallint": 5,
"c_tinyint": -6,
"id": 4
},
"mysqlType": {
"c_bigint": "bigint",
"c_int": "int",
"c_mediumint": "mediumint",
"c_smallint": "smallint",
"c_tinyint": "tinyint",
"id": "int"
},
"data": [
{
"c_bigint": "9223372036854775807",
"c_int": "2147483647",
"c_mediumint": "8388607",
"c_smallint": "32767",
"c_tinyint": "127",
"id": "2"
}
],
"old": null,
"_tidb": { // TiDB extension field
"commitTs": 163963314122145239
}
}
ウォーターマーク イベント
TiCDC は、 enable-tidb-extensionからtrueを設定した場合にのみ WATERMARK イベントを送信します。 typeフィールドの値はTIDB_WATERMARKです。 Event には_tidbフィールドが含まれており、このフィールドにはパラメーターwatermarkTsが 1 つだけ含まれています。値watermarkTsは、イベントが送信されたときに記録される TSO です。
このタイプのイベントを受信すると、 commitTsがwatermarkTs未満のすべてのイベントが送信されています。 TiCDC は「少なくとも 1 回」のセマンティクスを提供するため、データが繰り返し送信される可能性があります。 commitTsがwatermarkTs未満の後続のイベントを受信した場合、このイベントを安全に無視できます。
以下は WATERMARK イベントの例です。
{
"id": 0,
"database": "",
"table": "",
"pkNames": null,
"isDdl": false,
"type": "TIDB_WATERMARK",
"es": 1640007049196,
"ts": 1640007050284,
"sql": "",
"sqlType": null,
"mysqlType": null,
"data": null,
"old": null,
"_tidb": { // TiDB extension field
"watermarkTs": 429918007904436226
}
}
消費者側のデータ解決
上記の例からわかるように、Canal-JSON には統一されたデータ形式があり、さまざまなイベント タイプに対してさまざまなフィールド入力規則があります。統一された方法を使用してこの JSON 形式のデータを解決し、フィールド値を確認してイベント タイプを判断できます。
isDdlがtrueの場合、メッセージには DDL イベントが含まれます。isDdlがfalse場合、さらにtypeフィールドを確認する必要があります。typeがTIDB_WATERMARKの場合、それは WATERMARK イベントです。それ以外の場合は、DML イベントです。
フィールドの説明
Canal-JSON 形式では、対応するデータ型がmysqlTypeフィールドとsqlTypeフィールドに記録されます。
MySQL タイプ フィールド
mysqlTypeフィールドには、Canal-JSON 形式で各列に MySQL Type の文字列が記録されます。詳細については、 TiDB データ型を参照してください。
SQL タイプ フィールド
sqlTypeフィールドには、Canal-JSON 形式で各列のJava SQL Type が記録されます。これは、JDBC のデータに対応するデータ型です。その値は、MySQL タイプと特定のデータ値によって計算できます。マッピングは次のとおりです。
| MySQL タイプ | Java SQL タイプ コード |
|---|---|
| ブール値 | -6 |
| 浮く | 7 |
| ダブル | 8 |
| 小数 | 3 |
| シャア | 1 |
| ヴァルチャー | 12 |
| バイナリ | 2004年 |
| バーバイナリ | 2004年 |
| タイニーテキスト | 2005年 |
| 文章 | 2005年 |
| ミディアムテキスト | 2005年 |
| 長文 | 2005年 |
| タイニブロブ | 2004年 |
| ブロブ | 2004年 |
| ミディアムブロブ | 2004年 |
| ロングブロブ | 2004年 |
| 日にち | 91 |
| 日付時刻 | 93 |
| タイムスタンプ | 93 |
| 時間 | 92 |
| 年 | 12 |
| 列挙型 | 4 |
| 設定 | -7 |
| 少し | -7 |
| JSON | 12 |
整数型
次の表に示すように、 整数型にUnsigned制約と値のサイズがあるかどうかを考慮する必要があります。これは、それぞれ異なるJava SQL 型コードに対応しています。
| MySQL タイプ文字列 | 値の範囲 | Java SQL タイプ コード |
|---|---|---|
| tinyint | [-128、127] | -6 |
| tinyint unsigned | [0, 127] | -6 |
| tinyint unsigned | [128、255] | 5 |
| 小さい整数 | [-32768、32767] | 5 |
| smallint unsigned | [0, 32767] | 5 |
| smallint unsigned | [32768, 65535] | 4 |
| mediumint | [-8388608、8388607] | 4 |
| mediumint 符号なし | [0、8388607] | 4 |
| mediumint 符号なし | [8388608、16777215] | 4 |
| 整数 | [-2147483648、2147483647] | 4 |
| int unsigned | [0, 2147483647] | 4 |
| int unsigned | [2147483648、4294967295] | -5 |
| bigint | [-9223372036854775808、9223372036854775807] | -5 |
| bigint 署名なし | [0, 9223372036854775807] | -5 |
| bigint 署名なし | [9223372036854775808、18446744073709551615] | 3 |
次の表は、TiCDC のJava SQL 型とそのコードの間のマッピング関係を示しています。
| Java SQL タイプ | Java SQL タイプ コード |
|---|---|
| CHAR | 1 |
| 小数 | 3 |
| 整数 | 4 |
| SMALLINT | 5 |
| 本物 | 7 |
| ダブル | 8 |
| VARCHAR | 12 |
| 日にち | 91 |
| 時間 | 92 |
| タイムスタンプ | 93 |
| BLOB | 2004年 |
| CLOB | 2005年 |
| BIGINT | -5 |
| TINYINT | -6 |
| 少し | -7 |
Java SQL 型の詳細については、 Java SQL クラス タイプを参照してください。
TiCDC Canal-JSON と公式 Canal の比較
TiCDC が Canal-JSON データ形式を実装する方法 ( UpdateイベントとmysqlTypeフィールドを含む) は、公式の Canal とは異なります。次の表に、主な違いを示します。
| アイテム | TiCDC Canal-JSON | 運河 |
|---|---|---|
Update種類のイベント | oldフィールドにはすべての列データが含まれます | oldフィールドには、変更された列データのみが含まれます |
mysqlTypeフィールド | パラメーターを持つ型の場合、型パラメーターの情報は含まれません | パラメーターを持つ型の場合、型パラメーターの完全な情報が含まれます |
Update型のイベント
Updateタイプのイベントの場合:
- TiCDC では、
oldフィールドにすべての列データが含まれます。 - 公式の Canal では、
oldフィールドには変更された列データのみが含まれます。
次の SQL ステートメントが上流の TiDB で順次実行されるとします。
create table tp_int
(
id int auto_increment,
c_tinyint tinyint null,
c_smallint smallint null,
c_mediumint mediumint null,
c_int int null,
c_bigint bigint null,
constraint pk
primary key (id)
);
insert into tp_int(c_tinyint, c_smallint, c_mediumint, c_int, c_bigint)
values (127, 32767, 8388607, 2147483647, 9223372036854775807);
update tp_int set c_int = 0, c_tinyint = 0 where c_smallint = 32767;
updateステートメントの場合、以下に示すように、TiCDC はtypeをUPDATEとしてイベント メッセージを出力します。 updateステートメントは、 c_intとc_tinyint列のみを変更します。出力イベント メッセージのoldフィールドには、すべての列データが含まれます。
{
"id": 0,
...
"type": "UPDATE",
...
"sqlType": {
...
},
"mysqlType": {
...
},
"data": [
{
"c_bigint": "9223372036854775807",
"c_int": "0",
"c_mediumint": "8388607",
"c_smallint": "32767",
"c_tinyint": "0",
"id": "2"
}
],
"old": [ // In TiCDC, this field contains all the column data.
{
"c_bigint": "9223372036854775807",
"c_int": "2147483647", // Modified column
"c_mediumint": "8388607",
"c_smallint": "32767",
"c_tinyint": "127", // Modified column
"id": "2"
}
]
}
公式の Canal の場合、出力イベント メッセージのoldフィールドには、以下に示すように、変更された列データのみが含まれます。
{
"id": 0,
...
"type": "UPDATE",
...
"sqlType": {
...
},
"mysqlType": {
...
},
"data": [
{
"c_bigint": "9223372036854775807",
"c_int": "0",
"c_mediumint": "8388607",
"c_smallint": "32767",
"c_tinyint": "0",
"id": "2"
}
],
"old": [ // In Canal, this field contains only the modified column data.
{
"c_int": "2147483647", // Modified column
"c_tinyint": "127", // Modified column
}
]
}
mysqlTypeフィールド
mysqlTypeフィールドの場合、型にパラメーターが含まれている場合、公式の Canal には型パラメーターの完全な情報が含まれています。 TiCDC にはそのような情報は含まれていません。
次の例では、テーブル定義 SQL ステートメントに、 decimal 、 char 、 varchar 、およびenumなど、各列のパラメーターが含まれています。 TiCDC と公式の Canal によって生成された Canal-JSON 形式を比較すると、TiCDC にはmysqlTypeフィールドに基本的な MySQL 情報しか含まれていないことがわかります。型パラメーターの完全な情報が必要な場合は、別の方法で実装する必要があります。
次の SQL ステートメントが上流の TiDB で順次実行されるとします。
create table t (
id int auto_increment,
c_decimal decimal(10, 4) null,
c_char char(16) null,
c_varchar varchar(16) null,
c_binary binary(16) null,
c_varbinary varbinary(16) null,
c_enum enum('a','b','c') null,
c_set set('a','b','c') null,
c_bit bit(64) null,
constraint pk
primary key (id)
);
insert into t (c_decimal, c_char, c_varchar, c_binary, c_varbinary, c_enum, c_set, c_bit)
values (123.456, "abc", "abc", "abc", "abc", 'a', 'a,b', b'1000001');
TiCDC の出力は次のとおりです。
{
"id": 0,
...
"isDdl": false,
"sqlType": {
...
},
"mysqlType": {
"c_binary": "binary",
"c_bit": "bit",
"c_char": "char",
"c_decimal": "decimal",
"c_enum": "enum",
"c_set": "set",
"c_varbinary": "varbinary",
"c_varchar": "varchar",
"id": "int"
},
"data": [
{
...
}
],
"old": null,
}
公式 Canal の出力は次のとおりです。
{
"id": 0,
...
"isDdl": false,
"sqlType": {
...
},
"mysqlType": {
"c_binary": "binary(16)",
"c_bit": "bit(64)",
"c_char": "char(16)",
"c_decimal": "decimal(10, 4)",
"c_enum": "enum('a','b','c')",
"c_set": "set('a','b','c')",
"c_varbinary": "varbinary(16)",
"c_varchar": "varchar(16)",
"id": "int"
},
"data": [
{
...
}
],
"old": null,
}
TiCDC Canalの変更 -JSON
DeleteイベントのOldフィールドの変更
v5.4.0 から、 Deleteのイベントのoldフィールドが変更されました。
以下は、 Deleteイベント メッセージです。 v5.4.0 より前では、 oldフィールドには「データ」フィールドと同じ内容が含まれています。 v5.4.0 以降のバージョンでは、 oldフィールドは null に設定されます。 「データ」フィールドを使用して、削除されたデータを取得できます。
{
"id": 0,
"database": "test",
...
"type": "DELETE",
...
"sqlType": {
...
},
"mysqlType": {
...
},
"data": [
{
"c_bigint": "9223372036854775807",
"c_int": "0",
"c_mediumint": "8388607",
"c_smallint": "32767",
"c_tinyint": "0",
"id": "2"
}
],
"old": null,
// The following is an example before v5.4.0. The `old` field contains the same content as the "data" field.
"old": [
{
"c_bigint": "9223372036854775807",
"c_int": "0",
"c_mediumint": "8388607",
"c_smallint": "32767",
"c_tinyint": "0",
"id": "2"
}
]
}