TiCDCオープンプロトコル

TiCDC Open Protocolは、行レベルのデータ変更通知プロトコルであり、監視、キャッシング、フルテキストインデックス作成、分析エンジン、および異なるデータベース間のプライマリ-セカンダリレプリケーション用のデータソースを提供します。 TiCDCはTiCDCOpenProtocolに準拠し、TiDBのデータ変更をMQ(メッセージキュー)などのサードパーティのデータメディアに複製します。

TiCDC Open Protocolは、イベントを基本ユニットとして使用して、データ変更イベントをダウンストリームに複製します。イベントは3つのカテゴリに分けられます。

  • 行変更イベント:行のデータ変更を表します。行が変更されると、このイベントが送信され、変更された行に関する情報が含まれます。
  • DDLイベント:DDLの変更を表します。このイベントは、アップストリームでDDLステートメントが正常に実行された後に送信されます。 DDLイベントはすべてのMQパーティションにブロードキャストされます。
  • 解決されたイベント:受信したイベントが完了する前の特別な時点を表します。

制限

  • ほとんどの場合、バージョンの行変更イベントは1回だけ送信されますが、ノード障害やネットワークパーティションなどの特殊な状況では、同じバージョンの行変更イベントが複数回送信される場合があります。
  • 同じテーブルで、最初に送信された各バージョンの行変更イベントは、イベントストリームのタイムスタンプ(TS)の順序でインクリメントされます。
  • 解決されたイベントは、各MQパーティションに定期的にブロードキャストされます。解決済みイベントとは、解決済みイベントTSより前のTSを持つイベントがダウンストリームに送信されたことを意味します。
  • DDLイベントは各MQパーティションにブロードキャストされます。
  • 行の複数の行変更イベントが同じMQパーティションに送信されます。

メッセージ形式

メッセージには、次の形式で配置された1つ以上のイベントが含まれます。

鍵:

オフセット(バイト)0〜78〜1516〜(15 +長さ1)..。..。
パラメータプロトコルバージョン長さ1イベントキー1長さNイベントKeyN

価値:

オフセット(バイト)0〜78〜(7 +長さ1)..。..。
パラメータ長さ1イベント値1長さNイベント値N
  • LengthNは、 N番目のキー/値の長さを表します。
  • 長さとプロトコルバージョンはビッグエンディアンint64タイプです。
  • 現在のプロトコルのバージョンは1です。

イベント形式

このセクションでは、行変更イベント、DDLイベント、および解決済みイベントの形式を紹介します。

行変更イベント

  • 鍵:

    { "ts":<TS>, "scm":<Schema Name>, "tbl":<Table Name>, "t":1 }
    パラメータタイプ説明
    TS番号行の変更を引き起こすトランザクションのタイムスタンプ。
    スキーマ名行が含まれるスキーマの名前。
    テーブル名行が含まれるテーブルの名前。
  • 価値:

    Insertイベント。新しく追加された行データが出力されます。

    { "u":{ <Column Name>:{ "t":<Column Type>, "h":<Where Handle>, "f":<Flag>, "v":<Column Value> }, <Column Name>:{ "t":<Column Type>, "h":<Where Handle>, "f":<Flag>, "v":<Column Value> } } }

    Updateイベント。新しく追加された行データ( "u")と更新前の行データ( "p")が出力されます。後者( "p")は、古い値機能が有効になっている場合にのみ出力されます。

    { "u":{ <Column Name>:{ "t":<Column Type>, "h":<Where Handle>, "f":<Flag>, "v":<Column Value> }, <Column Name>:{ "t":<Column Type>, "h":<Where Handle>, "f":<Flag>, "v":<Column Value> } }, "p":{ <Column Name>:{ "t":<Column Type>, "h":<Where Handle>, "f":<Flag>, "v":<Column Value> }, <Column Name>:{ "t":<Column Type>, "h":<Where Handle>, "f":<Flag>, "v":<Column Value> } } }

    Deleteイベント。削除した行データを出力します。古い値機能が有効になっている場合、 Deleteイベントには、削除された行データのすべての列が含まれます。この機能を無効にすると、 DeleteイベントにはHandleKey列のみが含まれます。

    { "d":{ <Column Name>:{ "t":<Column Type>, "h":<Where Handle>, "f":<Flag>, "v":<Column Value> }, <Column Name>:{ "t":<Column Type>, "h":<Where Handle>, "f":<Flag>, "v":<Column Value> } } }
    パラメータタイプ説明
    列名列名。
    列タイプ番号列タイプ。詳細については、 列タイプコードを参照してください。
    ハンドルする場所ブール値この列がWhere節のフィルター条件になり得るかどうかを判別します。この列がテーブル上で一意である場合、 Where Handletrueです。
    国旗番号列のビットフラグ。詳細については、 列のビットフラグを参照してください。
    列の値どれでも列の値。

DDLイベント

  • 鍵:

    { "ts":<TS>, "scm":<Schema Name>, "tbl":<Table Name>, "t":2 }
    パラメータタイプ説明
    TS番号DDL変更を実行するトランザクションのタイムスタンプ。
    スキーマ名DDL変更のスキーマ名。空の文字列である可能性があります。
    テーブル名DDL変更のテーブル名。空の文字列である可能性があります。
  • 価値:

    { "q":<DDL Query>, "t":<DDL Type> }
    パラメータタイプ説明
    DDLクエリDDLクエリSQL
    DDLタイプDDLタイプ。詳細については、 DDLタイプコードを参照してください。

解決されたイベント

  • 鍵:

    { "ts":<TS>, "t":3 }
    パラメータタイプ説明
    TS番号解決されたタイムスタンプ。このイベントより前のTSが送信されました。
  • 値:なし

イベントストリーム出力の例

このセクションでは、イベントストリームの出力ログを表示および表示します。

アップストリームで次のSQLステートメントを実行し、MQパーティション番号が2であるとします。

CREATE TABLE test.t1(id int primary key, val varchar(16));

次のログ1とログ3から、DDLイベントがすべてのMQパーティションにブロードキャストされ、解決されたイベントが各MQパーティションに定期的にブロードキャストされていることがわかります。

1. [partition=0] [key="{\"ts\":415508856908021766,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":2}"] [value="{\"q\":\"CREATE TABLE test.t1(id int primary key, val varchar(16))\",\"t\":3}"] 2. [partition=0] [key="{\"ts\":415508856908021766,\"t\":3}"] [value=] 3. [partition=1] [key="{\"ts\":415508856908021766,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":2}"] [value="{\"q\":\"CREATE TABLE test.t1(id int primary key, val varchar(16))\",\"t\":3}"] 4. [partition=1] [key="{\"ts\":415508856908021766,\"t\":3}"] [value=]

アップストリームで次のSQLステートメントを実行します。

BEGIN; INSERT INTO test.t1(id, val) VALUES (1, 'aa'); INSERT INTO test.t1(id, val) VALUES (2, 'aa'); UPDATE test.t1 SET val = 'bb' WHERE id = 2; INSERT INTO test.t1(id, val) VALUES (3, 'cc'); COMMIT;
  • 次のログ5とログ6から、同じテーブルの行変更イベントが主キーに基づいて異なるパーティションに送信される可能性があることがわかりますが、同じ行への変更は同じパーティションに送信されるため、ダウンストリームは簡単に実行できますイベントを同時に処理します。
  • ログ6から、トランザクション内の同じ行に対する複数の変更は、1つの行変更イベントでのみ送信されます。
  • ログ8は、ログ7の繰り返しイベントです。行変更イベントは繰り返される場合がありますが、各バージョンの最初のイベントは順番に送信されます。
5. [partition=0] [key="{\"ts\":415508878783938562,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"u\":{\"id\":{\"t\":3,\"h\":true,\"v\":1},\"val\":{\"t\":15,\"v\":\"YWE=\"}}}"] 6. [partition=1] [key="{\"ts\":415508878783938562,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"u\":{\"id\":{\"t\":3,\"h\":true,\"v\":2},\"val\":{\"t\":15,\"v\":\"YmI=\"}}}"] 7. [partition=0] [key="{\"ts\":415508878783938562,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"u\":{\"id\":{\"t\":3,\"h\":true,\"v\":3},\"val\":{\"t\":15,\"v\":\"Y2M=\"}}}"] 8. [partition=0] [key="{\"ts\":415508878783938562,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"u\":{\"id\":{\"t\":3,\"h\":true,\"v\":3},\"val\":{\"t\":15,\"v\":\"Y2M=\"}}}"]

アップストリームで次のSQLステートメントを実行します。

BEGIN; DELETE FROM test.t1 WHERE id = 1; UPDATE test.t1 SET val = 'dd' WHERE id = 3; UPDATE test.t1 SET id = 4, val = 'ee' WHERE id = 2; COMMIT;
  • ログ9は、 Deleteタイプの行変更イベントです。このタイプのイベントには、主キー列または一意のインデックス列のみが含まれます。
  • ログ13とログ14は解決されたイベントです。解決済みイベントとは、このパーティションで、解決済みTSよりも小さいイベント(行変更イベントおよびDDLイベントを含む)が送信されたことを意味します。
9. [partition=0] [key="{\"ts\":415508881418485761,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"d\":{\"id\":{\"t\":3,\"h\":true,\"v\":1}}}"] 10. [partition=1] [key="{\"ts\":415508881418485761,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"d\":{\"id\":{\"t\":3,\"h\":true,\"v\":2}}}"] 11. [partition=0] [key="{\"ts\":415508881418485761,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"u\":{\"id\":{\"t\":3,\"h\":true,\"v\":3},\"val\":{\"t\":15,\"v\":\"ZGQ=\"}}}"] 12. [partition=0] [key="{\"ts\":415508881418485761,\"scm\":\"test\",\"tbl\":\"t1\",\"t\":1}"] [value="{\"u\":{\"id\":{\"t\":3,\"h\":true,\"v\":4},\"val\":{\"t\":15,\"v\":\"ZWU=\"}}}"] 13. [partition=0] [key="{\"ts\":415508881038376963,\"t\":3}"] [value=] 14. [partition=1] [key="{\"ts\":415508881038376963,\"t\":3}"] [value=]

消費者のためのプロトコル解析

現在、TiCDCはTiCDC Open Protocolの標準解析ライブラリを提供していませんが、GolangバージョンとJavaバージョンの解析例が提供されています。このドキュメントと次の例で提供されているデータ形式を参照して、コンシューマーのプロトコル解析を実装できます。

列タイプコード

Column Type Codeは、行変更イベントの列データ型を表します。

タイプコード出力例説明
TINYINT / BOOLEAN1{"t":1、 "v":1}
SMALLINT2{"t":2、 "v":1}
INT3{"t":3、 "v":123}
浮く4{"t":4、 "v":153.123}
ダブル5{"t":5、 "v":153.123}
ヌル6{"t":6、 "v":null}
タイムスタンプ7{"t":7、 "v""1973-12-30 15:30:00"}
BIGINT8{"t":8、 "v":123}
MEDIUMINT9{"t":9、 "v":123}
日にち10/14{"t":10、 "v""2000-01-01"}
時間11{"t":11、 "v""23:59:59"}
日付時刻12{"t":12、 "v""2015-12-20 23:58:58"}
13{"t":13、 "v":1970}
VARCHAR / VARBINARY15/253{"t":15、 "v""test"} / {"t":15、 "v""\ x89PNG \ r \ n \ x1a \ n"}値はUTF-8でエンコードされます。アップストリームタイプがVARBINARYの場合、非表示の文字はエスケープされます。
少し16{"t":16、 "v":81}
JSON245{"t":245、 "v""{\" key1 \ ":\" value1 \ "}"}
10進数246{"t":246、 "v""129012.1230000"}
ENUM247{"t":247、 "v":1}
設定248{"t":248、 "v":3}
TINYTEXT / TINYBLOB249{"t":249、 "v""5rWL6K + VdGV4dA =="}値はBase64でエンコードされます。
MEDIUMTEXT / MEDIUMBLOB250{"t":250、 "v""5rWL6K + VdGV4dA =="}値はBase64でエンコードされます。
LONGTEXT / LONGBLOB251{"t":251、 "v""5rWL6K + VdGV4dA =="}値はBase64でエンコードされます。
TEXT / BLOB252{"t":252、 "v""5rWL6K + VdGV4dA =="}値はBase64でエンコードされます。
CHAR / BINARY254{"t":254、 "v""test"} / {"t":254、 "v""\ x89PNG \ r \ n \ x1a \ n"}値はUTF-8でエンコードされます。アップストリームタイプがBINARYの場合、非表示の文字はエスケープされます。
幾何学255サポートされていません

DDLタイプコード

DDL Type Codeは、DDLイベントのDDLステートメントタイプを表します。

タイプコード
スキーマの作成1
ドロップスキーマ2
テーブルの作成3
ドロップテーブル4
列を追加5
ドロップカラム6
インデックスを追加7
ドロップインデックス8
外部キーを追加する9
外部キーをドロップする10
テーブルの切り捨て11
列を変更12
自動IDをリベース13
テーブルの名前を変更14
デフォルト値の設定15
シャードRowID16
テーブルコメントの変更17
インデックスの名前を変更18
テーブルパーティションの追加19
テーブルパーティションの削除20
ビューの作成21
テーブルの文字セットを変更して照合する22
テーブルパーティションを切り捨てる23
ドロップビュー24
テーブルを回復する25
スキーマ文字セットを変更して照合する26
ロックテーブル27
テーブルのロックを解除28
修理テーブル29
TiFlashレプリカを設定します30
TiFlashレプリカのステータスを更新する31
主キーを追加する32
主キーを削除します33
シーケンスの作成34
シーケンスの変更35
ドロップシーケンス36

列のビットフラグ

ビットフラグは、列の特定の属性を表します。

少し価値名前説明
10x01BinaryFlag列がバイナリエンコードされた列であるかどうか。
20x02HandleKeyFlag列がハンドルインデックス列であるかどうか。
30x04GeneratedColumnFlag列が生成された列であるかどうか。
40x08PrimaryKeyFlag列が主キー列であるかどうか。
50x10UniqueKeyFlag列が一意のインデックス列であるかどうか。
60x20MultipleKeyFlag列が複合インデックス列であるかどうか。
70x40NullableFlag列がNULL可能列であるかどうか。
80x80UnsignedFlag列が符号なし列であるかどうか。

例:

列フラグの値が85の場合、その列はnull許容列、一意のインデックス列、生成された列、およびバイナリエンコードされた列です。

85 == 0b_101_0101 == NullableFlag | UniqueKeyFlag | GeneratedColumnFlag | BinaryFlag

列の値が46の場合、その列は複合インデックス列、主キー列、生成された列、およびハンドルキー列です。

46 == 0b_010_1110 == MultipleKeyFlag | PrimaryKeyFlag | GeneratedColumnFlag | HandleKeyFlag

ノート:

  • BinaryFlagは、列タイプがBLOB / TEXT(TINYBLOB/TINYTEXTおよびBINARY/CHARを含む)の場合にのみ意味があります。アップストリーム列がBLOBタイプの場合、 BinaryFlagの値は1に設定されます。アップストリーム列がTEXTタイプの場合、 BinaryFlagの値は0に設定されます。
  • アップストリームからテーブルを複製するために、TiCDCはハンドルインデックスとして有効なインデックスを選択します。 Handleインデックス列のHandleKeyFlag値は1に設定されます。

このページは役に立ちましたか?