TiCDC オープンプロトコル

TiCDC オープン プロトコルは、行レベルのデータ変更通知プロトコルであり、監視、キャッシュ、フルテキスト インデックス、分析エンジン、および異なるデータベース間のプライマリ セカンダリ レプリケーション用のデータ ソースを提供します。TiCDC は TiCDC オープン プロトコルに準拠しており、TiDB のデータ変更を MQ (メッセージ キュー) などのサードパーティ データ メディアに複製します。

TiCDC オープン プロトコルは、データ変更イベントをダウンストリームに複製するための基本単位としてイベントを使用します。イベントは次の 3 つのカテゴリに分けられます。

  • 行変更イベント: 行のデータの変更を表します。行が変更されると、このイベントが送信され、変更された行に関する情報が含まれます。
  • DDL イベント: DDL の変更を表します。このイベントは、アップストリームで DDL ステートメントが正常に実行された後に送信されます。DDL イベントは、すべての MQ パーティションにブロードキャストされます。
  • 解決されたイベント: 受信したイベントが完了する特別な時点を表します。

制限

  • ほとんどの場合、バージョンの行変更イベントは 1 回だけ送信されますが、ノード障害やネットワーク パーティションなどの特殊な状況では、同じバージョンの行変更イベントが複数回送信されることがあります。
  • 同じテーブルで、最初に送信される各バージョンの行変更イベントは、イベント ストリーム内のタイムスタンプ (TS) の順に増加されます。
  • 解決されたイベントは、各 MQ パーティションに定期的にブロードキャストされます。解決されたイベントとは、解決されたイベント TS より前の TS を持つすべてのイベントがダウンストリームに送信されたことを意味します。
  • DDL イベントは各 MQ パーティションにブロードキャストされます。
  • 行の複数の行変更イベントが同じ MQ パーティションに送信されます。

メッセージ形式

メッセージには、次の形式で配置された 1 つ以上のイベントが含まれます。

鍵:

オフセット(バイト)0〜78〜1516~(15+長さ1)......
パラメータプロトコルバージョン長さ1イベントキー1長さNイベントキーN

価値:

オフセット(バイト)0〜78~(7+長さ1)......
パラメータ長さ1イベント値1長さNイベント値N
  • LengthN N番目のキー/値の長さを表します。
  • 長さとプロトコルバージョンはビッグエンディアンint64型です。
  • 現在のプロトコルのバージョンは1です。

イベント形式

このセクションでは、行変更イベント、DDL イベント、解決イベントの形式について説明します。

行変更イベント

  • 鍵:

    { "ts":<TS>, "scm":<Schema Name>, "tbl":<Table Name>, "t":1 }
    パラメータタイプ説明
    TS番号行の変更の原因となったトランザクションのタイムスタンプ。
    スキーマ名行が含まれるスキーマの名前。
    テーブル名行が含まれているテーブルの名前。
  • 価値:

    Insertイベント。新しく追加された行データが出力されます。

    { "u":{ <Column Name>:{ "t":<Column Type>, "h":<Where Handle>, "f":<Flag>, "v":<Column Value> }, <Column Name>:{ "t":<Column Type>, "h":<Where Handle>, "f":<Flag>, "v":<Column Value> } } }

    Updateイベント。新しく追加された行データ(「u」)と更新前の行データ(「p」)が出力されます。

    { "u":{ <Column Name>:{ "t":<Column Type>, "h":<Where Handle>, "f":<Flag>, "v":<Column Value> }, <Column Name>:{ "t":<Column Type>, "h":<Where Handle>, "f":<Flag>, "v":<Column Value> } }, "p":{ <Column Name>:{ "t":<Column Type>, "h":<Where Handle>, "f":<Flag>, "v":<Column Value> }, <Column Name>:{ "t":<Column Type>, "h":<Where Handle>, "f":<Flag>, "v":<Column Value> } } }

    Deleteイベント。削除された行データが出力されます。

    { "d":{ <Column Name>:{ "t":<Column Type>, "h":<Where Handle>, "f":<Flag>, "v":<Column Value> }, <Column Name>:{ "t":<Column Type>, "h":<Where Handle>, "f":<Flag>, "v":<Column Value> } } }
    パラメータタイプ説明
    カラム名列名。
    カラムタイプ番号列の種類。詳細についてはカラムタイプコード参照してください。
    ハンドルの場所ブールこの列がWhere句のフィルター条件になるかどうかを決定します。この列がテーブル上で一意である場合、 Where Handletrueになります。
    フラグ番号列のビットフラグ。詳細は列のビットフラグを参照。
    カラムの値どれでもカラムの値。

DDLイベント

  • 鍵:

    { "ts":<TS>, "scm":<Schema Name>, "tbl":<Table Name>, "t":2 }
    パラメータタイプ説明
    TS番号DDL 変更を実行するトランザクションのタイムスタンプ。
    スキーマ名DDL 変更のスキーマ名。空の文字列になる場合があります。
    テーブル名DDL 変更のテーブル名。空の文字列になる場合があります。
  • 価値:

    { "q":<DDL Query>, "t":<DDL Type> }
    パラメータタイプ説明
    DDLクエリDDLクエリSQL
    DDLタイプDDLタイプ。詳細はDDL タイプコード参照してください。

解決されたイベント

  • 鍵:

    { "ts":<TS>, "t":3 }
    パラメータタイプ説明
    TS番号解決されたタイムスタンプ。このイベントより前の TS は送信されています。
  • 値:なし

イベントストリーム出力の例

このセクションでは、イベント ストリームの出力ログを表示します。

アップストリームで次の SQL ステートメントを実行し、MQ パーティション番号が 2 であるとします。

CREATE TABLE test.t1(id int primary key, val varchar(16));

次のログ 1 とログ 3 から、DDL イベントがすべての MQ パーティションにブロードキャストされ、解決されたイベントが各 MQ パーティションに定期的にブロードキャストされていることがわかります。

1. [partition=0] [key="{\"ts\":415508856908021766,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":2}"] [value="{\"q\":\"CREATE TABLE test.t1(id int primary key, val varchar(16))\",\"t\":3}"] 2. [partition=0] [key="{\"ts\":415508856908021766,\"t\":3}"] [value=] 3. [partition=1] [key="{\"ts\":415508856908021766,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":2}"] [value="{\"q\":\"CREATE TABLE test.t1(id int primary key, val varchar(16))\",\"t\":3}"] 4. [partition=1] [key="{\"ts\":415508856908021766,\"t\":3}"] [value=]

アップストリームで次の SQL ステートメントを実行します。

BEGIN; INSERT INTO test.t1(id, val) VALUES (1, 'aa'); INSERT INTO test.t1(id, val) VALUES (2, 'aa'); UPDATE test.t1 SET val = 'bb' WHERE id = 2; INSERT INTO test.t1(id, val) VALUES (3, 'cc'); COMMIT;
  • 次のログ 5 とログ 6 から、同じテーブル上の行変更イベントは主キーに基づいて異なるパーティションに送信される可能性がありますが、同じ行への変更は同じパーティションに送信されるため、ダウンストリームでイベントを同時に簡単に処理できることがわかります。
  • ログ 6 から、トランザクション内の同じ行に対する複数の変更は、1 つの行変更イベントでのみ送信されます。
  • ログ 8 は、ログ 7 の繰り返しイベントです。行変更イベントは繰り返される可能性がありますが、各バージョンの最初のイベントは順番に送信されます。
5. [partition=0] [key="{\"ts\":415508878783938562,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"u\":{\"id\":{\"t\":3,\"h\":true,\"v\":1},\"val\":{\"t\":15,\"v\":\"YWE=\"}}}"] 6. [partition=1] [key="{\"ts\":415508878783938562,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"u\":{\"id\":{\"t\":3,\"h\":true,\"v\":2},\"val\":{\"t\":15,\"v\":\"YmI=\"}}}"] 7. [partition=0] [key="{\"ts\":415508878783938562,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"u\":{\"id\":{\"t\":3,\"h\":true,\"v\":3},\"val\":{\"t\":15,\"v\":\"Y2M=\"}}}"] 8. [partition=0] [key="{\"ts\":415508878783938562,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"u\":{\"id\":{\"t\":3,\"h\":true,\"v\":3},\"val\":{\"t\":15,\"v\":\"Y2M=\"}}}"]

アップストリームで次の SQL ステートメントを実行します。

BEGIN; DELETE FROM test.t1 WHERE id = 1; UPDATE test.t1 SET val = 'dd' WHERE id = 3; UPDATE test.t1 SET id = 4, val = 'ee' WHERE id = 2; COMMIT;
  • ログ 9 は、 Deleteタイプの行変更イベントです。このタイプのイベントには、主キー列または一意のインデックス列のみが含まれます。
  • ログ 13 とログ 14 は解決されたイベントです。解決されたイベントとは、このパーティションで、解決された TS よりも小さいイベント (行変更イベントや DDL イベントを含む) が送信されたことを意味します。
9. [partition=0] [key="{\"ts\":415508881418485761,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"d\":{\"id\":{\"t\":3,\"h\":true,\"v\":1}}}"] 10. [partition=1] [key="{\"ts\":415508881418485761,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"d\":{\"id\":{\"t\":3,\"h\":true,\"v\":2}}}"] 11. [partition=0] [key="{\"ts\":415508881418485761,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"u\":{\"id\":{\"t\":3,\"h\":true,\"v\":3},\"val\":{\"t\":15,\"v\":\"ZGQ=\"}}}"] 12. [partition=0] [key="{\"ts\":415508881418485761,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"u\":{\"id\":{\"t\":3,\"h\":true,\"v\":4},\"val\":{\"t\":15,\"v\":\"ZWU=\"}}}"] 13. [partition=0] [key="{\"ts\":415508881038376963,\"t\":3}"] [value=] 14. [partition=1] [key="{\"ts\":415508881038376963,\"t\":3}"] [value=]

消費者向けプロトコル解析

現在、TiCDC は TiCDC Open Protocol の標準解析ライブラリを提供していませんが、 GolangバージョンとJavaバージョンの解析例が提供されています。このドキュメントで提供されているデータ形式と次の例を参照して、コンシューマー向けのプロトコル解析を実装できます。

カラムタイプコード

Column Type Codeは行変更イベントの列データ型を表します。

タイプコード出力例説明
TINYINT/ブール1{"t":1,"v":1}
スモールイント2{"t":2,"v":1}
内部3{"t":3,"v":123}
浮く4{"t":4,"v":153.123}
ダブル5{"t":5,"v":153.123}
ヌル6{"t":6,"v":null}
タイムスタンプ7{"t":7,"v":"1973-12-30 15:30:00"}
ビッグイント8{"t":8,"v":123}
ミディアムミント9{"t":9,"v":123}
日付10/14{"t":10,"v":"2000-01-01"}
時間11{"t":11,"v":"23:59:59"}
日付時刻12{"t":12,"v":"2015-12-20 23:58:58"}
13{"t":13,"v":1970}
VARCHAR/VARBINARY15/253{"t":15,"v":"テスト"} / {"t":15,"v":"\x89PNG\r\n\x1a\n"}値は UTF-8 でエンコードされます。アップストリーム タイプが VARBINARY の場合、非表示の文字はエスケープされます。
少し16{"t":16,"v":81}
翻訳245{"t":245,"v":"{\"キー1\": \"値1\"}"}
小数点246{"t":246,"v":"129012.1230000"}
列挙247{"t":247,"v":1}
セット248{"t":248,"v":3}
TINYテキスト/TINYブロブ249{"t":249,"v":"5rWL6K+VdGV4dA=="}値は Base64 でエンコードされます。
MEDIUMテキスト/MEDIUMブロブ250{"t":250,"v":"5rWL6K+VdGV4dA=="}値は Base64 でエンコードされます。
LONGTEXT/LONGBLOB251{"t":251,"v":"5rWL6K+VdGV4dA=="}値は Base64 でエンコードされます。
TEXT/BLOB252{"t":252,"v":"5rWL6K+VdGV4dA=="}値は Base64 でエンコードされます。
文字/バイナリ254{"t":254,"v":"テスト"} / {"t":254,"v":"\x89PNG\r\n\x1a\n"}値は UTF-8 でエンコードされます。アップストリーム タイプが BINARY の場合、非表示の文字はエスケープされます。
幾何学255サポートされていません

DDL タイプコード

DDL Type Code DDL イベントの DDL ステートメント タイプを表します。

タイプコード
スキーマの作成1
スキーマの削除2
テーブルを作成3
ドロップテーブル4
カラムを追加5
カラムの削除6
インデックスを追加7
ドロップインデックス8
外部キーの追加9
外部キーの削除10
テーブルを切り捨て11
カラムの変更12
自動IDのリベース13
テーブル名の変更14
デフォルト値を設定する15
シャード行ID16
テーブルコメントの変更17
インデックスの名前を変更18
テーブルパーティションの追加19
テーブルパーティションの削除20
ビューの作成21
表の文字セットと照合を変更する22
テーブルパーティションの切り捨て23
ドロップビュー24
テーブルの回復25
スキーマ文字セットと照合を変更する26
ロックテーブル27
テーブルのロックを解除28
修理表29
TiFlashレプリカを設定する30
TiFlashレプリカのステータスを更新31
主キーの追加32
主キーを削除33
シーケンスを作成34
シーケンスの変更35
ドロップシーケンス36

列のビットフラグ

ビット フラグは列の特定の属性を表します。

少し価値名前説明
10x01バイナリフラグ列がバイナリエンコードされた列であるかどうか。
20x02ハンドルキーフラグ列がハンドル インデックス列であるかどうか。
30x04生成された列フラグ列が生成された列であるかどうか。
40x08プライマリキーフラグ列が主キー列であるかどうか。
50x10ユニークキーフラグ列が一意のインデックス列であるかどうか。
60x20複数キーフラグ列が複合インデックス列であるかどうか。
70x40Nullableフラグ列が NULL 可能列であるかどうか。
80x80未署名フラグ列が符号なし列であるかどうか。

例:

列フラグの値が85場合、その列は NULL 可能列、一意のインデックス列、生成された列、およびバイナリ エンコードされた列です。

85 == 0b_101_0101 == NullableFlag | UniqueKeyFlag | GeneratedColumnFlag | BinaryFlag

列の値が46の場合、その列は複合インデックス列、主キー列、生成列、およびハンドル キー列です。

46 == 0b_010_1110 == MultipleKeyFlag | PrimaryKeyFlag | GeneratedColumnFlag | HandleKeyFlag

注記:

  • BinaryFlagは、列タイプが BLOB/ TEXT (TINYBLOB/TINYTEXT および BINARY/CHAR を含む) の場合にのみ意味を持ちます。上流列が BLOB タイプの場合、 BinaryFlag値は1に設定されます。上流列がTEXTタイプの場合、 BinaryFlag値は0に設定されます。
  • アップストリームからテーブルを複製するために、TiCDC はハンドル インデックスとして有効なインデックスを選択します。ハンドル インデックス列のHandleKeyFlag値は1に設定されます。

このページは役に立ちましたか?