TiCDC オープン プロトコル
TiCDC オープン プロトコルは、監視、キャッシュ、全文インデックス作成、分析エンジン、および異なるデータベース間のプライマリ - セカンダリ レプリケーションのためのデータ ソースを提供する行レベルのデータ変更通知プロトコルです。 TiCDC は TiCDC Open Protocol に準拠し、TiDB のデータ変更を MQ (Message Queue) などのサードパーティ データ メディアに複製します。
TiCDC オープン プロトコルは、データ変更イベントをダウンストリームに複製するための基本単位としてイベントを使用します。イベントは 3 つのカテゴリに分類されます。
- 行変更イベント: 行内のデータ変更を表します。行が変更されると、このイベントが送信され、変更された行に関する情報が含まれます。
 - DDL イベント: DDL の変更を表します。このイベントは、DDL ステートメントがアップストリームで正常に実行された後に送信されます。 DDL イベントはすべての MQ パーティションにブロードキャストされます。
 - 解決済みイベント: 受信したイベントが完了する前の特別な時点を表します。
 
制限
- ほとんどの場合、あるバージョンの行変更イベントは 1 回だけ送信されますが、ノード障害やネットワークの分断などの特殊な状況では、同じバージョンの行変更イベントが複数回送信されることがあります。
 - 同じテーブル上で、最初に送信される各バージョンの行変更イベントは、イベント ストリーム内のタイムスタンプ (TS) の順に増加します。
 - 解決されたイベントは、各 MQ パーティションに定期的にブロードキャストされます。解決済みイベントは、解決済みイベント TS より前の TS を持つイベントがダウンストリームに送信されたことを意味します。
 - DDL イベントは各 MQ パーティションにブロードキャストされます。
 - 1 つの行の複数の行変更イベントが同じ MQ パーティションに送信されます。
 
メッセージフォーマット
メッセージには、次の形式で配置された 1 つ以上のイベントが含まれます。
鍵:
| オフセット(バイト) | 0~7 | 8~15 | 16~(15+長さ1) | ... | ... | 
|---|---|---|---|---|---|
| パラメータ | プロトコルのバージョン | 長さ1 | イベントキー1 | 長さN | イベントキーN | 
価値:
| オフセット(バイト) | 0~7 | 8~(7+長さ1) | ... | ... | 
|---|---|---|---|---|
| パラメータ | 長さ1 | イベント値1 | 長さN | イベント値N | 
LengthNN番目のキー/値の長さを表します。- 長さとプロトコルバージョンはビッグエンディアン
int64タイプです。 - 現在のプロトコルのバージョンは
1です。 
イベント形式
このセクションでは、行変更イベント、DDL イベント、および解決済みイベントの形式を紹介します。
行変更イベント
鍵:
{ "ts":<TS>, "scm":<Schema Name>, "tbl":<Table Name>, "t":1 }パラメータ タイプ 説明 TS 番号 行の変更を引き起こしたトランザクションのタイムスタンプ。 スキーマ名 弦 行が存在するスキーマの名前。 テーブル名 弦 行が存在するテーブルの名前。 価値:
Insertイベント。新たに追加された行データが出力されます。{ "u":{ <Column Name>:{ "t":<Column Type>, "h":<Where Handle>, "f":<Flag>, "v":<Column Value> }, <Column Name>:{ "t":<Column Type>, "h":<Where Handle>, "f":<Flag>, "v":<Column Value> } } }Updateイベント。新たに追加された行データ("u")と更新前の行データ("p")が出力されます。後者 (「p」) は、古い値機能が有効な場合にのみ出力されます。{ "u":{ <Column Name>:{ "t":<Column Type>, "h":<Where Handle>, "f":<Flag>, "v":<Column Value> }, <Column Name>:{ "t":<Column Type>, "h":<Where Handle>, "f":<Flag>, "v":<Column Value> } }, "p":{ <Column Name>:{ "t":<Column Type>, "h":<Where Handle>, "f":<Flag>, "v":<Column Value> }, <Column Name>:{ "t":<Column Type>, "h":<Where Handle>, "f":<Flag>, "v":<Column Value> } } }Deleteイベント。削除されたロウデータが出力されます。古い値機能が有効になっている場合、Deleteイベントには削除された行データのすべての列が含まれます。この機能が無効になっている場合、Deleteイベントにはハンドルキー列のみが含まれます。{ "d":{ <Column Name>:{ "t":<Column Type>, "h":<Where Handle>, "f":<Flag>, "v":<Column Value> }, <Column Name>:{ "t":<Column Type>, "h":<Where Handle>, "f":<Flag>, "v":<Column Value> } } }パラメータ タイプ 説明 カラム名 弦 列名。 カラムの種類 番号 列のタイプ。詳細はカラムタイプコードを参照してください。 ハンドルの場所 ブール値 この列を Where句のフィルター条件にできるかどうかを決定します。この列がテーブル上で一意である場合、Where Handleはtrueになります。国旗 番号 列のビットフラグ。詳細はカラムのビットフラグを参照してください。 カラムの値 どれでも カラムの値。 
DDLイベント
鍵:
{ "ts":<TS>, "scm":<Schema Name>, "tbl":<Table Name>, "t":2 }パラメータ タイプ 説明 TS 番号 DDL 変更を実行するトランザクションのタイムスタンプ。 スキーマ名 弦 DDL 変更のスキーマ名。空の文字列の場合があります。 テーブル名 弦 DDL 変更のテーブル名。空の文字列の場合があります。 価値:
{ "q":<DDL Query>, "t":<DDL Type> }パラメータ タイプ 説明 DDLクエリ 弦 DDLクエリSQL DDL タイプ 弦 DDL タイプ。詳細はDDL タイプ コードを参照してください。 
解決されたイベント
鍵:
{ "ts":<TS>, "t":3 }パラメータ タイプ 説明 TS 番号 解決されたタイムスタンプ。このイベントより前の TS は送信されています。 値:なし
イベントストリームの出力例
このセクションでは、イベント ストリームの出力ログを表示します。
次の SQL ステートメントをアップストリームで実行し、MQ パーティション番号が 2 であるとします。
CREATE TABLE test.t1(id int primary key, val varchar(16));
次のログ 1 とログ 3 から、DDL イベントがすべての MQ パーティションにブロードキャストされ、解決されたイベントが各 MQ パーティションに定期的にブロードキャストされていることがわかります。
1. [partition=0] [key="{\"ts\":415508856908021766,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":2}"] [value="{\"q\":\"CREATE TABLE test.t1(id int primary key, val varchar(16))\",\"t\":3}"]
2. [partition=0] [key="{\"ts\":415508856908021766,\"t\":3}"] [value=]
3. [partition=1] [key="{\"ts\":415508856908021766,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":2}"] [value="{\"q\":\"CREATE TABLE test.t1(id int primary key, val varchar(16))\",\"t\":3}"]
4. [partition=1] [key="{\"ts\":415508856908021766,\"t\":3}"] [value=]
アップストリームで次の SQL ステートメントを実行します。
BEGIN;
INSERT INTO test.t1(id, val) VALUES (1, 'aa');
INSERT INTO test.t1(id, val) VALUES (2, 'aa');
UPDATE test.t1 SET val = 'bb' WHERE id = 2;
INSERT INTO test.t1(id, val) VALUES (3, 'cc');
COMMIT;
- 次のログ 5 とログ 6 から、同じテーブルの行変更イベントは主キーに基づいて異なるパーティションに送信される可能性がありますが、同じ行への変更は同じパーティションに送信されるため、ダウンストリームが簡単に変更できることがわかります。イベントを同時に処理します。
 - ログ 6 から、トランザクション内の同じ行に対する複数の変更は 1 つの行変更イベントでのみ送信されます。
 - ログ 8 はログ 7 の繰り返しイベントです。行変更イベントは繰り返される可能性がありますが、各バージョンの最初のイベントは順番に送信されます。
 
5. [partition=0] [key="{\"ts\":415508878783938562,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"u\":{\"id\":{\"t\":3,\"h\":true,\"v\":1},\"val\":{\"t\":15,\"v\":\"YWE=\"}}}"]
6. [partition=1] [key="{\"ts\":415508878783938562,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"u\":{\"id\":{\"t\":3,\"h\":true,\"v\":2},\"val\":{\"t\":15,\"v\":\"YmI=\"}}}"]
7. [partition=0] [key="{\"ts\":415508878783938562,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"u\":{\"id\":{\"t\":3,\"h\":true,\"v\":3},\"val\":{\"t\":15,\"v\":\"Y2M=\"}}}"]
8. [partition=0] [key="{\"ts\":415508878783938562,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"u\":{\"id\":{\"t\":3,\"h\":true,\"v\":3},\"val\":{\"t\":15,\"v\":\"Y2M=\"}}}"]
アップストリームで次の SQL ステートメントを実行します。
BEGIN;
DELETE FROM test.t1 WHERE id = 1;
UPDATE test.t1 SET val = 'dd' WHERE id = 3;
UPDATE test.t1 SET id = 4, val = 'ee' WHERE id = 2;
COMMIT;
- ログ 9 は、タイプ
Deleteの行変更イベントです。このタイプのイベントには、主キー列または一意のインデックス列のみが含まれます。 - ログ 13 とログ 14 は解決されたイベントです。解決済みイベントは、このパーティション内で、解決済み TS より小さいイベント (行変更イベントおよび DDL イベントを含む) が送信されたことを意味します。
 
9. [partition=0] [key="{\"ts\":415508881418485761,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"d\":{\"id\":{\"t\":3,\"h\":true,\"v\":1}}}"]
10. [partition=1] [key="{\"ts\":415508881418485761,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"d\":{\"id\":{\"t\":3,\"h\":true,\"v\":2}}}"]
11. [partition=0] [key="{\"ts\":415508881418485761,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"u\":{\"id\":{\"t\":3,\"h\":true,\"v\":3},\"val\":{\"t\":15,\"v\":\"ZGQ=\"}}}"]
12. [partition=0] [key="{\"ts\":415508881418485761,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"u\":{\"id\":{\"t\":3,\"h\":true,\"v\":4},\"val\":{\"t\":15,\"v\":\"ZWU=\"}}}"]
13. [partition=0] [key="{\"ts\":415508881038376963,\"t\":3}"] [value=]
14. [partition=1] [key="{\"ts\":415508881038376963,\"t\":3}"] [value=]
コンシューマ向けのプロトコル解析
現在、TiCDC は TiCDC Open Protocol の標準解析ライブラリを提供していませんが、 GolangバージョンとJavaバージョンの解析サンプルが提供されています。このドキュメントで提供されているデータ形式と次の例を参照して、コンシューマー向けのプロトコル解析を実装できます。
カラムの型コード
Column Type Codeは行変更イベントの列データ型を表します。
| タイプ | コード | 出力例 | 説明 | 
|---|---|---|---|
| TINYINT/ブール値 | 1 | {"t":1,"v":1} | |
| スモールント | 2 | {"t":2,"v":1} | |
| INT | 3 | {"t":3、"v":123} | |
| 浮く | 4 | {"t":4,"v":153.123} | |
| ダブル | 5 | {"t":5,"v":153.123} | |
| ヌル | 6 | {"t":6,"v":null} | |
| タイムスタンプ | 7 | {"t":7,"v":"1973-12-30 15:30:00"} | |
| BIGINT | 8 | {"t":8、"v":123} | |
| ミディアムミント | 9 | {"t":9、"v":123} | |
| 日にち | 10/14 | {"t":10,"v":"2000-01-01"} | |
| 時間 | 11 | {"t":11,"v":"23:59:59"} | |
| 日付時刻 | 12 | {"t":12,"v":"2015-12-20 23:58:58"} | |
| 年 | 13 | {"t":13,"v":1970} | |
| VARCHAR/VARBINARY | 15/253 | {"t":15,"v":"テスト"} / {"t":15,"v":"\x89PNG\r\n\x1a\n"} | 値は UTF-8 でエンコードされます。上流の型が VARBINARY の場合、非表示の文字はエスケープされます。 | 
| 少し | 16 | {"t":16、"v":81} | |
| JSON | 245 | {"t":245,"v":"{\"key1\": \"value1\"}"} | |
| 10進数 | 246 | {"t":246,"v":"129012.1230000"} | |
| ENUM | 247 | {"t":247,"v":1} | |
| 設定 | 248 | {"t":248,"v":3} | |
| TINYTEXT/TINYBLOB | 249 | {"t":249,"v":"5rWL6K+VdGV4dA=="} | 値は Base64 でエンコードされます。 | 
| メディアテキスト/メディアブロブ | 250 | {"t":250,"v":"5rWL6K+VdGV4dA=="} | 値は Base64 でエンコードされます。 | 
| ロングテキスト/ロングブロブ | 251 | {"t":251,"v":"5rWL6K+VdGV4dA=="} | 値は Base64 でエンコードされます。 | 
| TEXT/BLOB | 252 | {"t":252,"v":"5rWL6K+VdGV4dA=="} | 値は Base64 でエンコードされます。 | 
| 文字/バイナリ | 254 | {"t":254,"v":"テスト"} / {"t":254,"v":"\x89PNG\r\n\x1a\n"} | 値は UTF-8 でエンコードされます。上流の型が BINARY の場合、非表示の文字はエスケープされます。 | 
| 幾何学模様 | 255 | サポートされていません | 
DDL タイプ コード
DDL Type Code DDL イベントの DDL ステートメント タイプを表します。
| タイプ | コード | 
|---|---|
| スキーマの作成 | 1 | 
| スキーマの削除 | 2 | 
| テーブルの作成 | 3 | 
| ドロップテーブル | 4 | 
| カラムの追加 | 5 | 
| カラムをドロップ | 6 | 
| インデックスの追加 | 7 | 
| インデックスを削除 | 8 | 
| 外部キーの追加 | 9 | 
| 外部キーを削除 | 10 | 
| テーブルの切り捨て | 11 | 
| カラムの変更 | 12 | 
| 自動 ID をリベースする | 13 | 
| テーブル名の変更 | 14 | 
| デフォルト値の設定 | 15 | 
| シャード行ID | 16 | 
| テーブルコメントの変更 | 17 | 
| インデックス名の変更 | 18 | 
| テーブルパーティションの追加 | 19 | 
| テーブルパーティションの削除 | 20 | 
| ビューの作成 | 21 | 
| テーブルの文字セットを変更して照合する | 22 | 
| テーブルパーティションの切り捨て | 23 | 
| ドロップビュー | 24 | 
| テーブルを回復する | 25 | 
| スキーマ文字セットの変更と照合 | 26 | 
| ロックテーブル | 27 | 
| テーブルのロックを解除する | 28 | 
| 修理テーブル | 29 | 
| TiFlashレプリカを設定する | 30 | 
| TiFlashレプリカのステータスを更新する | 31 | 
| 主キーの追加 | 32 | 
| 主キーを削除 | 33 | 
| シーケンスの作成 | 34 | 
| シーケンスの変更 | 35 | 
| ドロップシーケンス | 36 | 
カラムのビットフラグ
ビット フラグは列の特定の属性を表します。
| 少し | 価値 | 名前 | 説明 | 
|---|---|---|---|
| 1 | 0x01 | バイナリフラグ | 列がバイナリエンコードされた列かどうか。 | 
| 2 | 0x02 | ハンドルキーフラグ | 列がハンドル インデックス列であるかどうか。 | 
| 3 | 0x04 | 生成された列フラグ | 列が生成された列かどうか。 | 
| 4 | 0x08 | 主キーフラグ | 列が主キー列であるかどうか。 | 
| 5 | 0x10 | ユニークキーフラグ | 列が一意のインデックス列であるかどうか。 | 
| 6 | 0x20 | 複数のキーフラグ | 列が複合インデックス列であるかどうか。 | 
| 7 | 0x40 | Nullableフラグ | 列が NULL 許容列かどうか。 | 
| 8 | 0x80 | 署名なしフラグ | 列が符号なし列かどうか。 | 
例:
列フラグの値が85の場合、その列は NULL 可能列、一意のインデックス列、生成列、およびバイナリ エンコード列です。
85 == 0b_101_0101
   == NullableFlag | UniqueKeyFlag | GeneratedColumnFlag | BinaryFlag
列の値が46の場合、その列は複合インデックス列、主キー列、生成列、およびハンドル キー列です。
46 == 0b_010_1110
   == MultipleKeyFlag | PrimaryKeyFlag | GeneratedColumnFlag | HandleKeyFlag
ノート:
BinaryFlag列の型が BLOB/ TEXT (TINYBLOB/TINYTEXT および BINARY/CHAR を含む) の場合にのみ意味があります。上流列が BLOB 型の場合、値BinaryFlagは1に設定されます。上流列がTEXT型の場合、値BinaryFlagは0に設定されます。- アップストリームからテーブルを複製するために、TiCDC はハンドル インデックスとして有効なインデックスを選択します。 Handle インデックス列の
 HandleKeyFlag値は1に設定されます。