TiCDCオープンプロトコル
TiCDCオープンプロトコルは、行レベルのデータ変更通知プロトコルであり、監視、キャッシュ、全文インデックス作成、分析エンジン、そして異なるデータベース間のプライマリ-セカンダリレプリケーションのためのデータソースを提供します。TiCDCはTiCDCオープンプロトコルに準拠しており、TiDBのデータ変更をMQ(メッセージキュー)などのサードパーティのデータメディアに複製します。
TiCDCオープンプロトコルは、データ変更イベントを下流に複製するための基本単位としてイベントを使用します。イベントは以下の3つのカテゴリに分類されます。
- 行変更イベント:行のデータ変更を表します。行が変更されると、このイベントが送信され、変更された行に関する情報が含まれます。
- DDLイベント:DDLの変更を表します。このイベントは、上流でDDL文が正常に実行された後に送信されます。DDLイベントはすべてのMQパーティションにブロードキャストされます。
- 解決されたイベント: 受信したイベントが完了する特別な時点を表します。
制限
- ほとんどの場合、バージョンの行変更イベントは 1 回だけ送信されますが、ノード障害やネットワーク パーティションなどの特別な状況では、同じバージョンの行変更イベントが複数回送信されることがあります。
- 同じテーブルで、最初に送信された各バージョンの行変更イベントは、イベント ストリーム内のタイムスタンプ (TS) の順に増加します。
- 解決済みイベントは、各MQパーティションに定期的にブロードキャストされます。解決済みイベントとは、解決済みイベントTSよりも前のTSを持つイベントがダウンストリームに送信されたことを意味します。
- DDL イベントは各 MQ パーティションにブロードキャストされます。
- 1 つの行の複数の行変更イベントが同じ MQ パーティションに送信されます。
メッセージ形式
メッセージには、次の形式で配置された 1 つ以上のイベントが含まれます。
鍵:
価値:
LengthNN番目のキー/値の長さを表します。- 長さとプロトコルバージョンはビッグエンディアン
int64型です。 - 現在のプロトコルのバージョンは
1です。
イベント形式
このセクションでは、行変更イベント、DDL イベント、解決イベントの形式について説明します。
行変更イベント
鍵:
{ "ts":<TS>, "scm":<Schema Name>, "tbl":<Table Name>, "t":1 }価値:
Insertイベント。新しく追加された行データが出力されます。{ "u":{ <Column Name>:{ "t":<Column Type>, "h":<Where Handle>, "f":<Flag>, "v":<Column Value> }, <Column Name>:{ "t":<Column Type>, "h":<Where Handle>, "f":<Flag>, "v":<Column Value> } } }Updateイベント。新しく追加された行データ("u")と更新前の行データ("p")が出力されます。{ "u":{ <Column Name>:{ "t":<Column Type>, "h":<Where Handle>, "f":<Flag>, "v":<Column Value> }, <Column Name>:{ "t":<Column Type>, "h":<Where Handle>, "f":<Flag>, "v":<Column Value> } }, "p":{ <Column Name>:{ "t":<Column Type>, "h":<Where Handle>, "f":<Flag>, "v":<Column Value> }, <Column Name>:{ "t":<Column Type>, "h":<Where Handle>, "f":<Flag>, "v":<Column Value> } } }Deleteイベント。削除された行データが出力されます。{ "d":{ <Column Name>:{ "t":<Column Type>, "h":<Where Handle>, "f":<Flag>, "v":<Column Value> }, <Column Name>:{ "t":<Column Type>, "h":<Where Handle>, "f":<Flag>, "v":<Column Value> } } }
DDLイベント
鍵:
{ "ts":<TS>, "scm":<Schema Name>, "tbl":<Table Name>, "t":2 }価値:
{ "q":<DDL Query>, "t":<DDL Type> }
解決されたイベント
鍵:
{ "ts":<TS>, "t":3 }Value: None
イベントストリーム出力の例
このセクションでは、イベント ストリームの出力ログを表示します。
アップストリームで次の SQL ステートメントを実行し、MQ パーティション番号が 2 であるとします。
CREATE TABLE test.t1(id int primary key, val varchar(16));
次のログ 1 とログ 3 から、DDL イベントがすべての MQ パーティションにブロードキャストされ、解決されたイベントが各 MQ パーティションに定期的にブロードキャストされていることがわかります。
1. [partition=0] [key="{\"ts\":415508856908021766,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":2}"] [value="{\"q\":\"CREATE TABLE test.t1(id int primary key, val varchar(16))\",\"t\":3}"]
2. [partition=0] [key="{\"ts\":415508856908021766,\"t\":3}"] [value=]
3. [partition=1] [key="{\"ts\":415508856908021766,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":2}"] [value="{\"q\":\"CREATE TABLE test.t1(id int primary key, val varchar(16))\",\"t\":3}"]
4. [partition=1] [key="{\"ts\":415508856908021766,\"t\":3}"] [value=]
アップストリームで次の SQL ステートメントを実行します。
BEGIN;
INSERT INTO test.t1(id, val) VALUES (1, 'aa');
INSERT INTO test.t1(id, val) VALUES (2, 'aa');
UPDATE test.t1 SET val = 'bb' WHERE id = 2;
INSERT INTO test.t1(id, val) VALUES (3, 'cc');
COMMIT;
- 次のログ 5 とログ 6 から、同じテーブル上の行変更イベントは主キーに基づいて異なるパーティションに送信される可能性がありますが、同じ行への変更は同じパーティションに送信されるため、ダウンストリームでイベントを簡単に同時に処理できることがわかります。
- ログ 6 以降、トランザクション内の同じ行に対する複数の変更は、1 つの行変更イベントでのみ送信されます。
- ログ 8 は、ログ 7 の繰り返しイベントです。行変更イベントは繰り返される可能性がありますが、各バージョンの最初のイベントは順番に送信されます。
5. [partition=0] [key="{\"ts\":415508878783938562,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"u\":{\"id\":{\"t\":3,\"h\":true,\"v\":1},\"val\":{\"t\":15,\"v\":\"aa\"}}}"]
6. [partition=1] [key="{\"ts\":415508878783938562,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"u\":{\"id\":{\"t\":3,\"h\":true,\"v\":2},\"val\":{\"t\":15,\"v\":\"bb\"}}}"]
7. [partition=0] [key="{\"ts\":415508878783938562,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"u\":{\"id\":{\"t\":3,\"h\":true,\"v\":3},\"val\":{\"t\":15,\"v\":\"cc\"}}}"]
8. [partition=0] [key="{\"ts\":415508878783938562,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"u\":{\"id\":{\"t\":3,\"h\":true,\"v\":3},\"val\":{\"t\":15,\"v\":\"cc\"}}}"]
アップストリームで次の SQL ステートメントを実行します。
BEGIN;
DELETE FROM test.t1 WHERE id = 1;
UPDATE test.t1 SET val = 'dd' WHERE id = 3;
UPDATE test.t1 SET id = 4, val = 'ee' WHERE id = 2;
COMMIT;
- ログ9は、
Deleteタイプの行変更イベントです。このタイプのイベントには、主キー列または一意のインデックス列のみが含まれます。 - ログ13とログ14は解決済みイベントです。解決済みイベントとは、このパーティションにおいて、解決済みTSよりも小さいイベント(行変更イベントとDDLイベントを含む)が送信されたことを意味します。
9. [partition=0] [key="{\"ts\":415508881418485761,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"d\":{\"id\":{\"t\":3,\"h\":true,\"v\":1}}}"]
10. [partition=1] [key="{\"ts\":415508881418485761,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"d\":{\"id\":{\"t\":3,\"h\":true,\"v\":2}}}"]
11. [partition=0] [key="{\"ts\":415508881418485761,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"u\":{\"id\":{\"t\":3,\"h\":true,\"v\":3},\"val\":{\"t\":15,\"v\":\"ZGQ=\"}}}"]
12. [partition=0] [key="{\"ts\":415508881418485761,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"u\":{\"id\":{\"t\":3,\"h\":true,\"v\":4},\"val\":{\"t\":15,\"v\":\"ZWU=\"}}}"]
13. [partition=0] [key="{\"ts\":415508881038376963,\"t\":3}"] [value=]
14. [partition=1] [key="{\"ts\":415508881038376963,\"t\":3}"] [value=]
消費者向けプロトコル解析
現在、TiCDCはTiCDCオープンプロトコル用の標準解析ライブラリを提供していませんが、 Golang版とJava版の解析例が提供されています。このドキュメントで提供されているデータ形式と以下の例を参考に、コンシューマー向けのプロトコル解析を実装できます。
カラムタイプコード
Column Type Code 、行変更イベントの列データ型を表します。
DDLタイプコード
DDL Type Code 、DDL イベントの DDL ステートメント タイプを表します。
列のビットフラグ
ビット フラグは列の特定の属性を表します。
例:
列フラグの値が85の場合、その列は NULL 可能列、一意のインデックス列、生成された列、およびバイナリ エンコード列になります。
85 == 0b_101_0101
== NullableFlag | UniqueKeyFlag | GeneratedColumnFlag | BinaryFlag
列の値が46の場合、その列は複合インデックス列、主キー列、生成列、およびハンドル キー列になります。
46 == 0b_010_1110
== MultipleKeyFlag | PrimaryKeyFlag | GeneratedColumnFlag | HandleKeyFlag
注記:
BinaryFlag、列の型が BLOB/ TEXT (TINYBLOB/TINYTEXT、BINARY/CHAR を含む)の場合にのみ意味を持ちます。上流の列が BLOB 型の場合、BinaryFlag値は1に設定されます。上流の列がTEXT型の場合、BinaryFlag値は0に設定されます。- TiCDCは、上流からテーブルを複製するために、ハンドルインデックスとして有効なインデックス選択します。ハンドルインデックス列の
HandleKeyFlagの値は1に設定されます。