TiCDC オープン プロトコル

TiCDC オープン プロトコルは、監視、キャッシュ、全文インデックス作成、分析エンジン、および異なるデータベース間のプライマリ - セカンダリ レプリケーションのためのデータ ソースを提供する行レベルのデータ変更通知プロトコルです。 TiCDC は TiCDC Open Protocol に準拠し、TiDB のデータ変更を MQ (Message Queue) などのサードパーティ データ メディアに複製します。

TiCDC オープン プロトコルは、データ変更イベントをダウンストリームに複製するための基本単位としてイベントを使用します。イベントは 3 つのカテゴリに分類されます。

  • 行変更イベント: 行内のデータ変更を表します。行が変更されると、このイベントが送信され、変更された行に関する情報が含まれます。
  • DDL イベント: DDL の変更を表します。このイベントは、DDL ステートメントがアップストリームで正常に実行された後に送信されます。 DDL イベントはすべての MQ パーティションにブロードキャストされます。
  • 解決済みイベント: 受信したイベントが完了する前の特別な時点を表します。

制限

  • ほとんどの場合、あるバージョンの行変更イベントは 1 回だけ送信されますが、ノード障害やネットワークの分断などの特殊な状況では、同じバージョンの行変更イベントが複数回送信されることがあります。
  • 同じテーブル上で、最初に送信される各バージョンの行変更イベントは、イベント ストリーム内のタイムスタンプ (TS) の順に増加します。
  • 解決されたイベントは、各 MQ パーティションに定期的にブロードキャストされます。解決済みイベントは、解決済みイベント TS より前の TS を持つイベントがダウンストリームに送信されたことを意味します。
  • DDL イベントは各 MQ パーティションにブロードキャストされます。
  • 1 つの行の複数の行変更イベントが同じ MQ パーティションに送信されます。

メッセージフォーマット

メッセージには、次の形式で配置された 1 つ以上のイベントが含まれます。

鍵:

オフセット(バイト)0~78~1516~(15+長さ1)......
パラメータプロトコルのバージョン長さ1イベントキー1長さNイベントキーN

価値:

オフセット(バイト)0~78~(7+長さ1)......
パラメータ長さ1イベント値1長さNイベント値N
  • LengthN N番目のキー/値の長さを表します。
  • 長さとプロトコルバージョンはビッグエンディアンint64タイプです。
  • 現在のプロトコルのバージョンは1です。

イベント形式

このセクションでは、行変更イベント、DDL イベント、および解決済みイベントの形式を紹介します。

行変更イベント

  • 鍵:

    { "ts":<TS>, "scm":<Schema Name>, "tbl":<Table Name>, "t":1 }
    パラメータタイプ説明
    TS番号行の変更を引き起こしたトランザクションのタイムスタンプ。
    スキーマ名行が存在するスキーマの名前。
    テーブル名行が存在するテーブルの名前。
  • 価値:

    Insertイベント。新たに追加された行データが出力されます。

    { "u":{ <Column Name>:{ "t":<Column Type>, "h":<Where Handle>, "f":<Flag>, "v":<Column Value> }, <Column Name>:{ "t":<Column Type>, "h":<Where Handle>, "f":<Flag>, "v":<Column Value> } } }

    Updateイベント。新たに追加された行データ("u")と更新前の行データ("p")が出力されます。後者 (「p」) は、古い値機能が有効な場合にのみ出力されます。

    { "u":{ <Column Name>:{ "t":<Column Type>, "h":<Where Handle>, "f":<Flag>, "v":<Column Value> }, <Column Name>:{ "t":<Column Type>, "h":<Where Handle>, "f":<Flag>, "v":<Column Value> } }, "p":{ <Column Name>:{ "t":<Column Type>, "h":<Where Handle>, "f":<Flag>, "v":<Column Value> }, <Column Name>:{ "t":<Column Type>, "h":<Where Handle>, "f":<Flag>, "v":<Column Value> } } }

    Deleteイベント。削除されたロウデータが出力されます。古い値機能が有効になっている場合、 Deleteイベントには削除された行データのすべての列が含まれます。この機能が無効になっている場合、 Deleteイベントにはハンドルキー列のみが含まれます。

    { "d":{ <Column Name>:{ "t":<Column Type>, "h":<Where Handle>, "f":<Flag>, "v":<Column Value> }, <Column Name>:{ "t":<Column Type>, "h":<Where Handle>, "f":<Flag>, "v":<Column Value> } } }
    パラメータタイプ説明
    カラム名列名。
    カラムの種類番号列のタイプ。詳細はカラムタイプコードを参照してください。
    ハンドルの場所ブール値この列をWhere句のフィルター条件にできるかどうかを決定します。この列がテーブル上で一意である場合、 Where Handletrueになります。
    国旗番号列のビットフラグ。詳細はカラムのビットフラグを参照してください。
    カラムの値どれでもカラムの値。

DDLイベント

  • 鍵:

    { "ts":<TS>, "scm":<Schema Name>, "tbl":<Table Name>, "t":2 }
    パラメータタイプ説明
    TS番号DDL 変更を実行するトランザクションのタイムスタンプ。
    スキーマ名DDL 変更のスキーマ名。空の文字列の場合があります。
    テーブル名DDL 変更のテーブル名。空の文字列の場合があります。
  • 価値:

    { "q":<DDL Query>, "t":<DDL Type> }
    パラメータタイプ説明
    DDLクエリDDLクエリSQL
    DDL タイプDDL タイプ。詳細はDDL タイプ コードを参照してください。

解決されたイベント

  • 鍵:

    { "ts":<TS>, "t":3 }
    パラメータタイプ説明
    TS番号解決されたタイムスタンプ。このイベントより前の TS は送信されています。
  • 値:なし

イベントストリームの出力例

このセクションでは、イベント ストリームの出力ログを表示します。

次の SQL ステートメントをアップストリームで実行し、MQ パーティション番号が 2 であるとします。

CREATE TABLE test.t1(id int primary key, val varchar(16));

次のログ 1 とログ 3 から、DDL イベントがすべての MQ パーティションにブロードキャストされ、解決されたイベントが各 MQ パーティションに定期的にブロードキャストされていることがわかります。

1. [partition=0] [key="{\"ts\":415508856908021766,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":2}"] [value="{\"q\":\"CREATE TABLE test.t1(id int primary key, val varchar(16))\",\"t\":3}"] 2. [partition=0] [key="{\"ts\":415508856908021766,\"t\":3}"] [value=] 3. [partition=1] [key="{\"ts\":415508856908021766,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":2}"] [value="{\"q\":\"CREATE TABLE test.t1(id int primary key, val varchar(16))\",\"t\":3}"] 4. [partition=1] [key="{\"ts\":415508856908021766,\"t\":3}"] [value=]

アップストリームで次の SQL ステートメントを実行します。

BEGIN; INSERT INTO test.t1(id, val) VALUES (1, 'aa'); INSERT INTO test.t1(id, val) VALUES (2, 'aa'); UPDATE test.t1 SET val = 'bb' WHERE id = 2; INSERT INTO test.t1(id, val) VALUES (3, 'cc'); COMMIT;
  • 次のログ 5 とログ 6 から、同じテーブルの行変更イベントは主キーに基づいて異なるパーティションに送信される可能性がありますが、同じ行への変更は同じパーティションに送信されるため、ダウンストリームが簡単に変更できることがわかります。イベントを同時に処理します。
  • ログ 6 から、トランザクション内の同じ行に対する複数の変更は 1 つの行変更イベントでのみ送信されます。
  • ログ 8 はログ 7 の繰り返しイベントです。行変更イベントは繰り返される可能性がありますが、各バージョンの最初のイベントは順番に送信されます。
5. [partition=0] [key="{\"ts\":415508878783938562,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"u\":{\"id\":{\"t\":3,\"h\":true,\"v\":1},\"val\":{\"t\":15,\"v\":\"YWE=\"}}}"] 6. [partition=1] [key="{\"ts\":415508878783938562,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"u\":{\"id\":{\"t\":3,\"h\":true,\"v\":2},\"val\":{\"t\":15,\"v\":\"YmI=\"}}}"] 7. [partition=0] [key="{\"ts\":415508878783938562,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"u\":{\"id\":{\"t\":3,\"h\":true,\"v\":3},\"val\":{\"t\":15,\"v\":\"Y2M=\"}}}"] 8. [partition=0] [key="{\"ts\":415508878783938562,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"u\":{\"id\":{\"t\":3,\"h\":true,\"v\":3},\"val\":{\"t\":15,\"v\":\"Y2M=\"}}}"]

アップストリームで次の SQL ステートメントを実行します。

BEGIN; DELETE FROM test.t1 WHERE id = 1; UPDATE test.t1 SET val = 'dd' WHERE id = 3; UPDATE test.t1 SET id = 4, val = 'ee' WHERE id = 2; COMMIT;
  • ログ 9 は、タイプDeleteの行変更イベントです。このタイプのイベントには、主キー列または一意のインデックス列のみが含まれます。
  • ログ 13 とログ 14 は解決されたイベントです。解決済みイベントは、このパーティション内で、解決済み TS より小さいイベント (行変更イベントおよび DDL イベントを含む) が送信されたことを意味します。
9. [partition=0] [key="{\"ts\":415508881418485761,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"d\":{\"id\":{\"t\":3,\"h\":true,\"v\":1}}}"] 10. [partition=1] [key="{\"ts\":415508881418485761,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"d\":{\"id\":{\"t\":3,\"h\":true,\"v\":2}}}"] 11. [partition=0] [key="{\"ts\":415508881418485761,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"u\":{\"id\":{\"t\":3,\"h\":true,\"v\":3},\"val\":{\"t\":15,\"v\":\"ZGQ=\"}}}"] 12. [partition=0] [key="{\"ts\":415508881418485761,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"u\":{\"id\":{\"t\":3,\"h\":true,\"v\":4},\"val\":{\"t\":15,\"v\":\"ZWU=\"}}}"] 13. [partition=0] [key="{\"ts\":415508881038376963,\"t\":3}"] [value=] 14. [partition=1] [key="{\"ts\":415508881038376963,\"t\":3}"] [value=]

コンシューマ向けのプロトコル解析

現在、TiCDC は TiCDC Open Protocol の標準解析ライブラリを提供していませんが、 GolangバージョンとJavaバージョンの解析サンプルが提供されています。このドキュメントで提供されているデータ形式と次の例を参照して、コンシューマー向けのプロトコル解析を実装できます。

カラムの型コード

Column Type Codeは行変更イベントの列データ型を表します。

タイプコード出力例説明
TINYINT/ブール値1{"t":1,"v":1}
スモールント2{"t":2,"v":1}
INT3{"t":3、"v":123}
浮く4{"t":4,"v":153.123}
ダブル5{"t":5,"v":153.123}
ヌル6{"t":6,"v":null}
タイムスタンプ7{"t":7,"v":"1973-12-30 15:30:00"}
BIGINT8{"t":8、"v":123}
ミディアムミント9{"t":9、"v":123}
日にち10/14{"t":10,"v":"2000-01-01"}
時間11{"t":11,"v":"23:59:59"}
日付時刻12{"t":12,"v":"2015-12-20 23:58:58"}
13{"t":13,"v":1970}
VARCHAR/VARBINARY15/253{"t":15,"v":"テスト"} / {"t":15,"v":"\x89PNG\r\n\x1a\n"}値は UTF-8 でエンコードされます。上流の型が VARBINARY の場合、非表示の文字はエスケープされます。
少し16{"t":16、"v":81}
JSON245{"t":245,"v":"{\"key1\": \"value1\"}"}
10進数246{"t":246,"v":"129012.1230000"}
ENUM247{"t":247,"v":1}
設定248{"t":248,"v":3}
TINYTEXT/TINYBLOB249{"t":249,"v":"5rWL6K+VdGV4dA=="}値は Base64 でエンコードされます。
メディアテキスト/メディアブロブ250{"t":250,"v":"5rWL6K+VdGV4dA=="}値は Base64 でエンコードされます。
ロングテキスト/ロングブロブ251{"t":251,"v":"5rWL6K+VdGV4dA=="}値は Base64 でエンコードされます。
TEXT/BLOB252{"t":252,"v":"5rWL6K+VdGV4dA=="}値は Base64 でエンコードされます。
文字/バイナリ254{"t":254,"v":"テスト"} / {"t":254,"v":"\x89PNG\r\n\x1a\n"}値は UTF-8 でエンコードされます。上流の型が BINARY の場合、非表示の文字はエスケープされます。
幾何学模様255サポートされていません

DDL タイプ コード

DDL Type Code DDL イベントの DDL ステートメント タイプを表します。

タイプコード
スキーマの作成1
スキーマの削除2
テーブルの作成3
ドロップテーブル4
カラムの追加5
カラムをドロップ6
インデックスの追加7
インデックスを削除8
外部キーの追加9
外部キーを削除10
テーブルの切り捨て11
カラムの変更12
自動 ID をリベースする13
テーブル名の変更14
デフォルト値の設定15
シャード行ID16
テーブルコメントの変更17
インデックス名の変更18
テーブルパーティションの追加19
テーブルパーティションの削除20
ビューの作成21
テーブルの文字セットを変更して照合する22
テーブルパーティションの切り捨て23
ドロップビュー24
テーブルを回復する25
スキーマ文字セットの変更と照合26
ロックテーブル27
テーブルのロックを解除する28
修理テーブル29
TiFlashレプリカを設定する30
TiFlashレプリカのステータスを更新する31
主キーの追加32
主キーを削除33
シーケンスの作成34
シーケンスの変更35
ドロップシーケンス36

カラムのビットフラグ

ビット フラグは列の特定の属性を表します。

少し価値名前説明
10x01バイナリフラグ列がバイナリエンコードされた列かどうか。
20x02ハンドルキーフラグ列がハンドル インデックス列であるかどうか。
30x04生成された列フラグ列が生成された列かどうか。
40x08主キーフラグ列が主キー列であるかどうか。
50x10ユニークキーフラグ列が一意のインデックス列であるかどうか。
60x20複数のキーフラグ列が複合インデックス列であるかどうか。
70x40Nullableフラグ列が NULL 許容列かどうか。
80x80署名なしフラグ列が符号なし列かどうか。

例:

列フラグの値が85の場合、その列は NULL 可能列、一意のインデックス列、生成列、およびバイナリ エンコード列です。

85 == 0b_101_0101 == NullableFlag | UniqueKeyFlag | GeneratedColumnFlag | BinaryFlag

列の値が46の場合、その列は複合インデックス列、主キー列、生成列、およびハンドル キー列です。

46 == 0b_010_1110 == MultipleKeyFlag | PrimaryKeyFlag | GeneratedColumnFlag | HandleKeyFlag

ノート:

  • BinaryFlag列の型が BLOB/ TEXT (TINYBLOB/TINYTEXT および BINARY/CHAR を含む) の場合にのみ意味があります。上流列が BLOB 型の場合、値BinaryFlag1に設定されます。上流列がTEXT型の場合、値BinaryFlag0に設定されます。
  • アップストリームからテーブルを複製するために、TiCDC はハンドル インデックスとして有効なインデックスを選択します。 Handle インデックス列のHandleKeyFlag値は1に設定されます。

このページは役に立ちましたか?