TiCDC Avro プロトコル

Avro は、 アパッチ アブロ™によって定義され、デフォルトのデータ交換フォーマットとしてコンフルエントなプラットフォームによって選択されたデータ交換フォーマット プロトコルです。このドキュメントでは、TiDB 拡張フィールド、Avro データ形式の定義、および Avro とコンフルエント スキーマ レジストリの間の相互作用を含む、TiCDC での Avro データ形式の実装について説明します。

アブロを使用

Message Queue (MQ) をダウンストリーム シンクとして使用する場合、 Avro をsink-uriで指定できます。 TiCDC は TiDB DML イベントをキャプチャし、これらのイベントから Avro メッセージを作成して、メッセージをダウンストリームに送信します。 Avro は、スキーマの変更を検出すると、最新のスキーマをスキーマ レジストリに登録します。

以下は、Avro を使用した構成例です。

cdc cli changefeed create --pd=http://127.0.0.1:2379 --changefeed-id="kafka-avro" --sink-uri="kafka://127.0.0.1:9092/topic-name?protocol=avro" --schema-registry=http://127.0.0.1:8081 --config changefeed_config.toml
[sink]
dispatchers = [
 {matcher = ['*.*'], topic = "tidb_{schema}_{table}"},
]

--schema-registryhttpsプロトコルとusername:password認証をサポートします (例: --schema-registry=https://username:password@schema-registry-uri.com )。ユーザー名とパスワードは URL エンコードする必要があります。

TiDB 拡張フィールド

デフォルトでは、Avro は DML イベントで変更された行のデータのみを収集し、データ変更のタイプや TiDB 固有の CommitTS (トランザクションの一意の識別子) は収集しません。この問題に対処するために、TiCDC は次の 3 つの TiDB 拡張フィールドを Avro プロトコル メッセージに導入します。 sink-urienable-tidb-extensiontrue (デフォルトではfalse ) に設定されている場合、TiCDC はメッセージ生成中にこれら 3 つのフィールドを Avro メッセージに追加します。

  • _tidb_op : DML タイプ。 「c」は挿入を示し、「u」は更新を示します。
  • _tidb_commit_ts : トランザクションの一意の識別子。
  • _tidb_commit_physical_time : トランザクション ID の物理タイムスタンプ。

次に設定例を示します。

cdc cli changefeed create --pd=http://127.0.0.1:2379 --changefeed-id="kafka-avro-enable-extension" --sink-uri="kafka://127.0.0.1:9092/topic-name?protocol=avro&enable-tidb-extension=true" --schema-registry=http://127.0.0.1:8081 --config changefeed_config.toml
[sink]
dispatchers = [
 {matcher = ['*.*'], topic = "tidb_{schema}_{table}"},
]

データ形式の定義

TiCDC は DML イベントを Kafka イベントに変換し、イベントのキーと値は Avro プロトコルに従ってエンコードされます。

鍵データ形式

{
    "name":"{{TableName}}",
    "namespace":"{{Namespace}}",
    "type":"record",
    "fields":[
        {{ColumnValueBlock}},
        {{ColumnValueBlock}},
    ]
}
  • {{TableName}}は、イベントが発生したテーブルの名前を示します。
  • {{Namespace}}は Avro の名前空間です。
  • {{ColumnValueBlock}}は、データの各列の形式を定義します。

キーのfieldsには、主キー列または一意のインデックス列のみが含まれます。

値のデータ形式

{
    "name":"{{TableName}}",
    "namespace":"{{Namespace}}",
    "type":"record",
    "fields":[
        {{ColumnValueBlock}},
        {{ColumnValueBlock}},
    ]
}

デフォルトでは、Value のデータ形式は Key と同じです。ただし、値のfieldsには、主キー列だけでなく、すべての列が含まれます。

enable-tidb-extensionを有効にすると、値のデータ形式は次のようになります。

{
    "name":"{{TableName}}",
    "namespace":"{{Namespace}}",
    "type":"record",
    "fields":[
        {{ColumnValueBlock}},
        {{ColumnValueBlock}},
        {
            "name":"_tidb_op",
            "type":"string"
        },
        {
            "name":"_tidb_commit_ts",
            "type":"long"
        },
        {
            "name":"_tidb_commit_physical_time",
            "type":"long"
        }
    ]
}

enable-tidb-extensionが無効になっている値のデータ形式と比較すると、3 つの新しいフィールドが追加されています: _tidb_op_tidb_commit_ts 、および_tidb_commit_physical_time

カラムデータ形式

カラムデータは、キー/値データ形式の{{ColumnValueBlock}}の部分です。 TiCDC は、SQL タイプに基づいてカラムデータ形式を生成します。基本的なカラムデータ形式は次のとおりです。

{
    "name":"{{ColumnName}}",
    "type":{
        "connect.parameters":{
            "tidb_type":"{{TIDB_TYPE}}"
        },
        "type":"{{AVRO_TYPE}}"
    }
}

1 つの列が NULL になる可能性がある場合、カラムのデータ形式は次のようになります。

{
    "default":null,
    "name":"{{ColumnName}}",
    "type":[
        "null",
        {
            "connect.parameters":{
                "tidb_type":"{{TIDB_TYPE}}"
            },
            "type":"{{AVRO_TYPE}}"
        }
    ]
}
  • {{ColumnName}}は列名を示します。
  • {{TIDB_TYPE}}は、TiDB の型を示します。これは、SQL 型との 1 対 1 のマッピングではありません。
  • {{AVRO_TYPE}}アブロスペックのタイプを示します。
SQL タイプTIDB_TYPEAVRO_TYPE説明
ブールINT整数
TINYINTINT整数unsigned の場合、TIDB_TYPE は INT UNSIGNED です。
SMALLINTINT整数unsigned の場合、TIDB_TYPE は INT UNSIGNED です。
ミディアムミントINT整数unsigned の場合、TIDB_TYPE は INT UNSIGNED です。
INTINT整数unsigned の場合、TIDB_TYPE は INT UNSIGNED で、AVRO_TYPE は long です。
BIGINTBIGINT長いです署名されていない場合、TIDB_TYPE は BIGINT UNSIGNED です。 avro-bigint-unsigned-handling-modeが文字列の場合、AVRO_TYPE は文字列です。
小さな塊BLOBバイト
BLOBBLOBバイト
ミディアムブロブBLOBバイト
ロングブロブBLOBバイト
バイナリBLOBバイト
VARBINARYBLOBバイト
小さなテキスト文章ストリング
文章文章ストリング
中文文章ストリング
ロングテキスト文章ストリング
CHAR文章ストリング
VARCHAR文章ストリング
浮く浮くダブル
ダブルダブルダブル
日にち日にちストリング
日付時刻日付時刻ストリング
タイムスタンプタイムスタンプストリング
時間時間ストリング
整数
少し少しバイト
JSONJSONストリング
列挙型列挙型ストリング
設定設定ストリング
小数小数バイトavro-decimal-handling-modeが文字列の場合、AVRO_TYPE は文字列です。

Avro プロトコルでは、他の 2 つのsink-uriパラメーター ( avro-decimal-handling-modeおよびavro-bigint-unsigned-handling-mode ) もカラムデータ形式に影響を与える可能性があります。

  • avro-decimal-handling-modeは、Avro が以下を含む小数フィールドを処理する方法を制御します。

    • string: Avro は 10 進数フィールドを文字列として処理します。
    • 正確: Avro は 10 進数フィールドをバイトとして処理します。
  • avro-bigint-unsigned-handling-modeは、次のような BIGINT UNSIGNED フィールドを Avro が処理する方法を制御します。

    • string: Avro は BIGINT UNSIGNED フィールドを文字列として処理します。
    • long: Avro は BIGINT UNSIGNED フィールドを 64 ビット符号付き整数として処理します。値が9223372036854775807より大きい場合、オーバーフローが発生します。

次に設定例を示します。

cdc cli changefeed create --pd=http://127.0.0.1:2379 --changefeed-id="kafka-avro-string-option" --sink-uri="kafka://127.0.0.1:9092/topic-name?protocol=avro&avro-decimal-handling-mode=string&avro-bigint-unsigned-handling-mode=string" --schema-registry=http://127.0.0.1:8081 --config changefeed_config.toml
[sink]
dispatchers = [
 {matcher = ['*.*'], topic = "tidb_{schema}_{table}"},
]

ほとんどの SQL タイプは、基本カラムデータ形式にマップされます。他の一部の SQL タイプは、基本データ形式を拡張して、より多くの情報を提供します。

ビット(64)

{
    "name":"{{ColumnName}}",
    "type":{
        "connect.parameters":{
            "tidb_type":"BIT",
            "length":"64"
        },
        "type":"bytes"
    }
}

ENUM/SET(a,b,c)

{
    "name":"{{ColumnName}}",
    "type":{
        "connect.parameters":{
            "tidb_type":"ENUM/SET",
            "allowed":"a,b,c"
        },
        "type":"string"
    }
}

DECIMAL(10, 4)

{
    "name":"{{ColumnName}}",
    "type":{
        "connect.parameters":{
            "tidb_type":"DECIMAL",
        },
        "logicalType":"decimal",
        "precision":10,
        "scale":4,
        "type":"bytes"
    }
}

DDL イベントとスキーマの変更

Avro はダウンストリームの DDL イベントを生成しません。 DML イベントが発生するたびにスキーマが変更されているかどうかを確認します。スキーマが変更されると、Avro は新しいスキーマを生成し、スキーマ レジストリに登録します。スキーマの変更が互換性チェックに合格しない場合、登録は失敗します。 Avro はスキーマの互換性の問題を解決しません。

スキーマの変更が互換性チェックに合格し、新しいバージョンが登録された場合でも、データのプロデューサーとコンシューマーは、システムを正常に実行するためにアップグレードを実行する必要があることに注意してください。

Confluent Schema Registry のデフォルトの互換性ポリシーがBACKWARDであると仮定し、ソース テーブルに空でない列を追加します。この状況では、Avro は新しいスキーマを生成しますが、互換性の問題によりスキーマ レジストリへの登録に失敗します。このとき、changefeed はエラー状態になります。

スキーマの詳細については、 スキーマ レジストリ関連ドキュメントを参照してください。

トピックの配布

スキーマ レジストリは、TopicNameStrategy、RecordNameStrategy、および TopicRecordNameStrategy の 3 つサブジェクト名戦略をサポートします。現在、TiCDC Avro は TopicNameStrategy のみをサポートしています。つまり、Kafka トピックは 1 つのデータ形式でしかデータを受信できません。したがって、TiCDC Avro では、複数のテーブルを同じトピックにマッピングすることを禁止しています。変更フィードを作成するときに、構成された配布ルールにトピック ルールに{schema}{table}のプレースホルダーが含まれていない場合、エラーが報告されます。

エコシステム
TiDB
TiKV
TiSpark
Chaos Mesh
© 2022 PingCAP. All Rights Reserved.