TiCDCオープンプロトコル
TiCDC Open Protocolは、行レベルのデータ変更通知プロトコルであり、監視、キャッシング、フルテキストインデックス作成、分析エンジン、および異なるデータベース間のプライマリ-セカンダリレプリケーション用のデータソースを提供します。 TiCDCはTiCDCOpenProtocolに準拠し、TiDBのデータ変更をMQ(メッセージキュー)などのサードパーティのデータメディアに複製します。
TiCDC Open Protocolは、イベントを基本ユニットとして使用して、データ変更イベントをダウンストリームに複製します。イベントは3つのカテゴリに分けられます。
- 行変更イベント:行のデータ変更を表します。行が変更されると、このイベントが送信され、変更された行に関する情報が含まれます。
- DDLイベント:DDLの変更を表します。このイベントは、アップストリームでDDLステートメントが正常に実行された後に送信されます。 DDLイベントはすべてのMQパーティションにブロードキャストされます。
- 解決されたイベント:受信したイベントが完了する前の特別な時点を表します。
制限
- ほとんどの場合、バージョンの行変更イベントは1回だけ送信されますが、ノード障害やネットワークパーティションなどの特殊な状況では、同じバージョンの行変更イベントが複数回送信される場合があります。
- 同じテーブルで、最初に送信された各バージョンの行変更イベントは、イベントストリームのタイムスタンプ(TS)の順序でインクリメントされます。
- 解決されたイベントは、各MQパーティションに定期的にブロードキャストされます。解決済みイベントとは、解決済みイベントTSより前のTSを持つイベントがダウンストリームに送信されたことを意味します。
- DDLイベントは各MQパーティションにブロードキャストされます。
- 行の複数の行変更イベントが同じMQパーティションに送信されます。
メッセージ形式
メッセージには、次の形式で配置された1つ以上のイベントが含まれます。
鍵:
オフセット(バイト) | 0〜7 | 8〜15 | 16〜(15 +長さ1) | ..。 | ..。 |
---|---|---|---|---|---|
パラメータ | プロトコルバージョン | 長さ1 | イベントキー1 | 長さN | イベントKeyN |
価値:
オフセット(バイト) | 0〜7 | 8〜(7 +長さ1) | ..。 | ..。 |
---|---|---|---|---|
パラメータ | 長さ1 | イベント値1 | 長さN | イベント値N |
LengthN
は、N
番目のキー/値の長さを表します。- 長さとプロトコルバージョンはビッグエンディアン
int64
タイプです。 - 現在のプロトコルのバージョンは
1
です。
イベント形式
このセクションでは、行変更イベント、DDLイベント、および解決済みイベントの形式を紹介します。
行変更イベント
鍵:
{ "ts":<TS>, "scm":<Schema Name>, "tbl":<Table Name>, "t":1 }パラメータ タイプ 説明 TS 番号 行の変更を引き起こすトランザクションのタイムスタンプ。 スキーマ名 弦 行が含まれるスキーマの名前。 テーブル名 弦 行が含まれるテーブルの名前。 価値:
Insert
イベント。新しく追加された行データが出力されます。{ "u":{ <Column Name>:{ "t":<Column Type>, "h":<Where Handle>, "f":<Flag>, "v":<Column Value> }, <Column Name>:{ "t":<Column Type>, "h":<Where Handle>, "f":<Flag>, "v":<Column Value> } } }Update
イベント。新しく追加された行データ( "u")と更新前の行データ( "p")が出力されます。後者( "p")は、古い値機能が有効になっている場合にのみ出力されます。{ "u":{ <Column Name>:{ "t":<Column Type>, "h":<Where Handle>, "f":<Flag>, "v":<Column Value> }, <Column Name>:{ "t":<Column Type>, "h":<Where Handle>, "f":<Flag>, "v":<Column Value> } }, "p":{ <Column Name>:{ "t":<Column Type>, "h":<Where Handle>, "f":<Flag>, "v":<Column Value> }, <Column Name>:{ "t":<Column Type>, "h":<Where Handle>, "f":<Flag>, "v":<Column Value> } } }Delete
イベント。削除した行データを出力します。古い値機能が有効になっている場合、Delete
イベントには、削除された行データのすべての列が含まれます。この機能を無効にすると、Delete
イベントにはHandleKey列のみが含まれます。{ "d":{ <Column Name>:{ "t":<Column Type>, "h":<Where Handle>, "f":<Flag>, "v":<Column Value> }, <Column Name>:{ "t":<Column Type>, "h":<Where Handle>, "f":<Flag>, "v":<Column Value> } } }パラメータ タイプ 説明 列名 弦 列名。 列タイプ 番号 列タイプ。詳細については、 列タイプコードを参照してください。 ハンドルする場所 ブール値 この列が Where
節のフィルター条件になり得るかどうかを判別します。この列がテーブル上で一意である場合、Where Handle
はtrue
です。国旗 番号 列のビットフラグ。詳細については、 列のビットフラグを参照してください。 列の値 どれでも 列の値。
DDLイベント
鍵:
{ "ts":<TS>, "scm":<Schema Name>, "tbl":<Table Name>, "t":2 }パラメータ タイプ 説明 TS 番号 DDL変更を実行するトランザクションのタイムスタンプ。 スキーマ名 弦 DDL変更のスキーマ名。空の文字列である可能性があります。 テーブル名 弦 DDL変更のテーブル名。空の文字列である可能性があります。 価値:
{ "q":<DDL Query>, "t":<DDL Type> }パラメータ タイプ 説明 DDLクエリ 弦 DDLクエリSQL DDLタイプ 弦 DDLタイプ。詳細については、 DDLタイプコードを参照してください。
解決されたイベント
鍵:
{ "ts":<TS>, "t":3 }パラメータ タイプ 説明 TS 番号 解決されたタイムスタンプ。このイベントより前のTSが送信されました。 値:なし
イベントストリーム出力の例
このセクションでは、イベントストリームの出力ログを表示および表示します。
アップストリームで次のSQLステートメントを実行し、MQパーティション番号が2であるとします。
CREATE TABLE test.t1(id int primary key, val varchar(16));
次のログ1とログ3から、DDLイベントがすべてのMQパーティションにブロードキャストされ、解決されたイベントが各MQパーティションに定期的にブロードキャストされていることがわかります。
1. [partition=0] [key="{\"ts\":415508856908021766,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":2}"] [value="{\"q\":\"CREATE TABLE test.t1(id int primary key, val varchar(16))\",\"t\":3}"]
2. [partition=0] [key="{\"ts\":415508856908021766,\"t\":3}"] [value=]
3. [partition=1] [key="{\"ts\":415508856908021766,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":2}"] [value="{\"q\":\"CREATE TABLE test.t1(id int primary key, val varchar(16))\",\"t\":3}"]
4. [partition=1] [key="{\"ts\":415508856908021766,\"t\":3}"] [value=]
アップストリームで次のSQLステートメントを実行します。
BEGIN;
INSERT INTO test.t1(id, val) VALUES (1, 'aa');
INSERT INTO test.t1(id, val) VALUES (2, 'aa');
UPDATE test.t1 SET val = 'bb' WHERE id = 2;
INSERT INTO test.t1(id, val) VALUES (3, 'cc');
COMMIT;
- 次のログ5とログ6から、同じテーブルの行変更イベントが主キーに基づいて異なるパーティションに送信される可能性があることがわかりますが、同じ行への変更は同じパーティションに送信されるため、ダウンストリームは簡単に実行できますイベントを同時に処理します。
- ログ6から、トランザクション内の同じ行に対する複数の変更は、1つの行変更イベントでのみ送信されます。
- ログ8は、ログ7の繰り返しイベントです。行変更イベントは繰り返される場合がありますが、各バージョンの最初のイベントは順番に送信されます。
5. [partition=0] [key="{\"ts\":415508878783938562,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"u\":{\"id\":{\"t\":3,\"h\":true,\"v\":1},\"val\":{\"t\":15,\"v\":\"YWE=\"}}}"]
6. [partition=1] [key="{\"ts\":415508878783938562,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"u\":{\"id\":{\"t\":3,\"h\":true,\"v\":2},\"val\":{\"t\":15,\"v\":\"YmI=\"}}}"]
7. [partition=0] [key="{\"ts\":415508878783938562,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"u\":{\"id\":{\"t\":3,\"h\":true,\"v\":3},\"val\":{\"t\":15,\"v\":\"Y2M=\"}}}"]
8. [partition=0] [key="{\"ts\":415508878783938562,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"u\":{\"id\":{\"t\":3,\"h\":true,\"v\":3},\"val\":{\"t\":15,\"v\":\"Y2M=\"}}}"]
アップストリームで次のSQLステートメントを実行します。
BEGIN;
DELETE FROM test.t1 WHERE id = 1;
UPDATE test.t1 SET val = 'dd' WHERE id = 3;
UPDATE test.t1 SET id = 4, val = 'ee' WHERE id = 2;
COMMIT;
- ログ9は、
Delete
タイプの行変更イベントです。このタイプのイベントには、主キー列または一意のインデックス列のみが含まれます。 - ログ13とログ14は解決されたイベントです。解決済みイベントとは、このパーティションで、解決済みTSよりも小さいイベント(行変更イベントおよびDDLイベントを含む)が送信されたことを意味します。
9. [partition=0] [key="{\"ts\":415508881418485761,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"d\":{\"id\":{\"t\":3,\"h\":true,\"v\":1}}}"]
10. [partition=1] [key="{\"ts\":415508881418485761,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"d\":{\"id\":{\"t\":3,\"h\":true,\"v\":2}}}"]
11. [partition=0] [key="{\"ts\":415508881418485761,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"u\":{\"id\":{\"t\":3,\"h\":true,\"v\":3},\"val\":{\"t\":15,\"v\":\"ZGQ=\"}}}"]
12. [partition=0] [key="{\"ts\":415508881418485761,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"u\":{\"id\":{\"t\":3,\"h\":true,\"v\":4},\"val\":{\"t\":15,\"v\":\"ZWU=\"}}}"]
13. [partition=0] [key="{\"ts\":415508881038376963,\"t\":3}"] [value=]
14. [partition=1] [key="{\"ts\":415508881038376963,\"t\":3}"] [value=]
消費者のためのプロトコル解析
現在、TiCDCはTiCDC Open Protocolの標準解析ライブラリを提供していませんが、GolangバージョンとJavaバージョンの解析例が提供されています。このドキュメントと次の例で提供されているデータ形式を参照して、コンシューマーのプロトコル解析を実装できます。
列タイプコード
Column Type Code
は、行変更イベントの列データ型を表します。
タイプ | コード | 出力例 | 説明 |
---|---|---|---|
TINYINT / BOOLEAN | 1 | {"t":1、 "v":1} | |
SMALLINT | 2 | {"t":2、 "v":1} | |
INT | 3 | {"t":3、 "v":123} | |
浮く | 4 | {"t":4、 "v":153.123} | |
ダブル | 5 | {"t":5、 "v":153.123} | |
ヌル | 6 | {"t":6、 "v":null} | |
タイムスタンプ | 7 | {"t":7、 "v": "1973-12-30 15:30:00"} | |
BIGINT | 8 | {"t":8、 "v":123} | |
MEDIUMINT | 9 | {"t":9、 "v":123} | |
日にち | 10/14 | {"t":10、 "v": "2000-01-01"} | |
時間 | 11 | {"t":11、 "v": "23:59:59"} | |
日付時刻 | 12 | {"t":12、 "v": "2015-12-20 23:58:58"} | |
年 | 13 | {"t":13、 "v":1970} | |
VARCHAR / VARBINARY | 15/253 | {"t":15、 "v": "test"} / {"t":15、 "v": "\ x89PNG \ r \ n \ x1a \ n"} | 値はUTF-8でエンコードされます。アップストリームタイプがVARBINARYの場合、非表示の文字はエスケープされます。 |
少し | 16 | {"t":16、 "v":81} | |
JSON | 245 | {"t":245、 "v": "{\" key1 \ ":\" value1 \ "}"} | |
10進数 | 246 | {"t":246、 "v": "129012.1230000"} | |
ENUM | 247 | {"t":247、 "v":1} | |
設定 | 248 | {"t":248、 "v":3} | |
TINYTEXT / TINYBLOB | 249 | {"t":249、 "v": "5rWL6K + VdGV4dA =="} | 値はBase64でエンコードされます。 |
MEDIUMTEXT / MEDIUMBLOB | 250 | {"t":250、 "v": "5rWL6K + VdGV4dA =="} | 値はBase64でエンコードされます。 |
LONGTEXT / LONGBLOB | 251 | {"t":251、 "v": "5rWL6K + VdGV4dA =="} | 値はBase64でエンコードされます。 |
TEXT / BLOB | 252 | {"t":252、 "v": "5rWL6K + VdGV4dA =="} | 値はBase64でエンコードされます。 |
CHAR / BINARY | 254 | {"t":254、 "v": "test"} / {"t":254、 "v": "\ x89PNG \ r \ n \ x1a \ n"} | 値はUTF-8でエンコードされます。アップストリームタイプがBINARYの場合、非表示の文字はエスケープされます。 |
幾何学 | 255 | サポートされていません |
DDLタイプコード
DDL Type Code
は、DDLイベントのDDLステートメントタイプを表します。
タイプ | コード |
---|---|
スキーマの作成 | 1 |
ドロップスキーマ | 2 |
テーブルの作成 | 3 |
ドロップテーブル | 4 |
列を追加 | 5 |
ドロップカラム | 6 |
インデックスを追加 | 7 |
ドロップインデックス | 8 |
外部キーを追加する | 9 |
外部キーをドロップする | 10 |
テーブルの切り捨て | 11 |
列を変更 | 12 |
自動IDをリベース | 13 |
テーブルの名前を変更 | 14 |
デフォルト値の設定 | 15 |
シャードRowID | 16 |
テーブルコメントの変更 | 17 |
インデックスの名前を変更 | 18 |
テーブルパーティションの追加 | 19 |
テーブルパーティションの削除 | 20 |
ビューの作成 | 21 |
テーブルの文字セットを変更して照合する | 22 |
テーブルパーティションを切り捨てる | 23 |
ドロップビュー | 24 |
テーブルを回復する | 25 |
スキーマ文字セットを変更して照合する | 26 |
ロックテーブル | 27 |
テーブルのロックを解除 | 28 |
修理テーブル | 29 |
TiFlashレプリカを設定します | 30 |
TiFlashレプリカのステータスを更新する | 31 |
主キーを追加する | 32 |
主キーを削除します | 33 |
シーケンスの作成 | 34 |
シーケンスの変更 | 35 |
ドロップシーケンス | 36 |
列のビットフラグ
ビットフラグは、列の特定の属性を表します。
少し | 価値 | 名前 | 説明 |
---|---|---|---|
1 | 0x01 | BinaryFlag | 列がバイナリエンコードされた列であるかどうか。 |
2 | 0x02 | HandleKeyFlag | 列がハンドルインデックス列であるかどうか。 |
3 | 0x04 | GeneratedColumnFlag | 列が生成された列であるかどうか。 |
4 | 0x08 | PrimaryKeyFlag | 列が主キー列であるかどうか。 |
5 | 0x10 | UniqueKeyFlag | 列が一意のインデックス列であるかどうか。 |
6 | 0x20 | MultipleKeyFlag | 列が複合インデックス列であるかどうか。 |
7 | 0x40 | NullableFlag | 列がNULL可能列であるかどうか。 |
8 | 0x80 | UnsignedFlag | 列が符号なし列であるかどうか。 |
例:
列フラグの値が85
の場合、その列はnull許容列、一意のインデックス列、生成された列、およびバイナリエンコードされた列です。
85 == 0b_101_0101
== NullableFlag | UniqueKeyFlag | GeneratedColumnFlag | BinaryFlag
列の値が46
の場合、その列は複合インデックス列、主キー列、生成された列、およびハンドルキー列です。
46 == 0b_010_1110
== MultipleKeyFlag | PrimaryKeyFlag | GeneratedColumnFlag | HandleKeyFlag
ノート:
BinaryFlag
は、列タイプがBLOB / TEXT(TINYBLOB/TINYTEXTおよびBINARY/CHARを含む)の場合にのみ意味があります。アップストリーム列がBLOBタイプの場合、BinaryFlag
の値は1
に設定されます。アップストリーム列がTEXTタイプの場合、BinaryFlag
の値は0
に設定されます。- アップストリームからテーブルを複製するために、TiCDCはハンドルインデックスとして有効なインデックスを選択します。 Handleインデックス列の
HandleKeyFlag
値は1
に設定されます。