重要
このページは英語版のページを機械翻訳しています。原文はこちらからご覧ください。

TiCDCAvroプロトコル

Avroは、 ApacheAvro™で定義され、デフォルトのデータ交換フォーマットとしてコンフルエントなプラットフォームで選択されるデータ交換フォーマットプロトコルです。このドキュメントでは、TiDB拡張フィールド、Avroデータ形式の定義、Avroとコンフルエントなスキーマレジストリの間の相互作用など、TiCDCでのAvroデータ形式の実装について説明します。

Avroを使用する

メッセージキュー(MQ)をダウンストリームシンクとして使用する場合、 sink-uriでAvroを指定できます。 TiCDCは、TiDB DMLイベントをキャプチャし、これらのイベントからAvroメッセージを作成して、メッセージをダウンストリームに送信します。 Avroはスキーマの変更を検出すると、最新のスキーマをスキーマレジストリに登録します。

以下は、Avroを使用した構成例です。

cdc cli changefeed create --pd=http://127.0.0.1:2379 --changefeed-id="kafka-avro" --sink-uri="kafka://127.0.0.1:9092/topic-name?protocol=avro" --schema-registry=http://127.0.0.1:8081 --config changefeed_config.toml
[sink]
dispatchers = [
 {matcher = ['*.*'], topic = "tidb_{schema}_{table}"},
]

--schema-registryは、 httpsプロトコルとusername:password認証をサポートします(例: --schema-registry=https://username:password@schema-registry-uri.com )。ユーザー名とパスワードはURLエンコードする必要があります。

TiDB拡張フィールド

デフォルトでは、AvroはDMLイベントで変更された行のデータのみを収集し、データ変更のタイプまたはTiDB固有のCommitTS(トランザクションの一意の識別子)を収集しません。この問題に対処するために、TiCDCは次の3つのTiDB拡張フィールドをAvroプロトコルメッセージに導入します。 sink-urienable-tidb-extensiontrue (デフォルトではfalse )に設定されている場合、TiCDCはメッセージ生成中にこれらの3つのフィールドをAvroメッセージに追加します。

  • _tidb_op :DMLタイプ。 「c」は挿入を示し、「u」は更新を示します。
  • _tidb_commit_ts :トランザクションの一意の識別子。
  • _tidb_commit_physical_time :トランザクション識別子の物理タイムスタンプ。

構成例を次に示します。

cdc cli changefeed create --pd=http://127.0.0.1:2379 --changefeed-id="kafka-avro-enable-extension" --sink-uri="kafka://127.0.0.1:9092/topic-name?protocol=avro&enable-tidb-extension=true" --schema-registry=http://127.0.0.1:8081 --config changefeed_config.toml
[sink]
dispatchers = [
 {matcher = ['*.*'], topic = "tidb_{schema}_{table}"},
]

データ形式の定義

TiCDCはDMLイベントをKafkaイベントに変換し、イベントのキーと値はAvroプロトコルに従ってエンコードされます。

キーデータ形式

{
    "name":"{{TableName}}",
    "namespace":"{{Namespace}}",
    "type":"record",
    "fields":[
        {{ColumnValueBlock}},
        {{ColumnValueBlock}},
    ]
}
  • {{TableName}}は、イベントが発生するテーブルの名前を示します。
  • {{Namespace}}はAvroの名前空間です。
  • {{ColumnValueBlock}}は、データの各列の形式を定義します。

キーのfieldsには、主キー列または一意のインデックス列のみが含まれます。

値のデータ形式

{
    "name":"{{TableName}}",
    "namespace":"{{Namespace}}",
    "type":"record",
    "fields":[
        {{ColumnValueBlock}},
        {{ColumnValueBlock}},
    ]
}

デフォルトでは、Valueのデータ形式はKeyのデータ形式と同じです。ただし、値のfieldsには、主キー列だけでなく、すべての列が含まれます。

enable-tidb-extensionを有効にすると、値のデータ形式は次のようになります。

{
    "name":"{{TableName}}",
    "namespace":"{{Namespace}}",
    "type":"record",
    "fields":[
        {{ColumnValueBlock}},
        {{ColumnValueBlock}},
        {
            "name":"_tidb_op",
            "type":"string"
        },
        {
            "name":"_tidb_commit_ts",
            "type":"long"
        },
        {
            "name":"_tidb_commit_physical_time",
            "type":"long"
        }
    ]
}

enable-tidb-extensionが無効になっている値データ形式と比較して、 _tidb_op 、および_tidb_commit_physical_time_tidb_commit_tsつの新しいフィールドが追加されています。

カラムデータ形式

カラムデータは、キー/値データ形式の{{ColumnValueBlock}}の部分です。 TiCDCは、SQLタイプに基づいてカラムデータ形式を生成します。基本的なカラムデータ形式は次のとおりです。

{
    "name":"{{ColumnName}}",
    "type":{
        "connect.parameters":{
            "tidb_type":"{{TIDB_TYPE}}"
        },
        "type":"{{AVRO_TYPE}}"
    }
}

1つの列をNULLにすることができる場合、カラムのデータ形式は次のようになります。

{
    "default":null,
    "name":"{{ColumnName}}",
    "type":[
        "null",
        {
            "connect.parameters":{
                "tidb_type":"{{TIDB_TYPE}}"
            },
            "type":"{{AVRO_TYPE}}"
        }
    ]
}
  • {{ColumnName}}は列名を示します。
  • {{TIDB_TYPE}}は、TiDBのタイプを示します。これは、SQLタイプとの1対1のマッピングではありません。
  • {{AVRO_TYPE}}avroスペックのタイプを示します。
SQLタイプTIDB_TYPEAVRO_TYPE説明
BOOLINTint
TINYINTINTint符号なしの場合、TIDB_TYPEはINTUNSIGNEDです。
SMALLINTINTint符号なしの場合、TIDB_TYPEはINTUNSIGNEDです。
MEDIUMINTINTint符号なしの場合、TIDB_TYPEはINTUNSIGNEDです。
INTINTint符号なしの場合、TIDB_TYPEはINT UNSIGNEDであり、AVRO_TYPEは長いです。
BIGINTBIGINT長いです符号なしの場合、TIDB_TYPEはBIGINTUNSIGNEDです。 avro-bigint-unsigned-handling-modeが文字列の場合、AVRO_TYPEは文字列です。
TINYBLOBBLOBバイト
BLOBBLOBバイト
MEDIUMBLOBBLOBバイト
LONGBLOBBLOBバイト
バイナリBLOBバイト
VARBINARYBLOBバイト
TINYTEXT文章ストリング
文章文章ストリング
MEDIUMTEXT文章ストリング
LONGTEXT文章ストリング
CHAR文章ストリング
VARCHAR文章ストリング
浮く浮くダブル
ダブルダブルダブル
日にち日にちストリング
日付時刻日付時刻ストリング
タイムスタンプタイムスタンプストリング
時間時間ストリング
int
少し少しバイト
JSONJSONストリング
ENUMENUMストリング
設定設定ストリング
10進数10進数バイトavro-decimal-handling-modeが文字列の場合、AVRO_TYPEは文字列です。

Avroプロトコルでは、他の2つのsink-uriパラメーター( avro-decimal-handling-modeavro-bigint-unsigned-handling-mode )もカラムデータ形式に影響を与える可能性があります。

  • avro-decimal-handling-modeは、Avroが次のような10進フィールドを処理する方法を制御します。

    • string:Avroは10進フィールドを文字列として処理します。
    • 正確:Avroは10進フィールドをバイトとして処理します。
  • avro-bigint-unsigned-handling-modeは、Avroが次のようなBIGINTUNSIGNEDフィールドを処理する方法を制御します。

    • 文字列:AvroはBIGINTUNSIGNEDフィールドを文字列として処理します。
    • long:AvroはBIGINTUNSIGNEDフィールドを64ビットの符号付き整数として処理します。値が9223372036854775807より大きい場合、オーバーフローが発生します。

構成例を次に示します。

cdc cli changefeed create --pd=http://127.0.0.1:2379 --changefeed-id="kafka-avro-string-option" --sink-uri="kafka://127.0.0.1:9092/topic-name?protocol=avro&avro-decimal-handling-mode=string&avro-bigint-unsigned-handling-mode=string" --schema-registry=http://127.0.0.1:8081 --config changefeed_config.toml
[sink]
dispatchers = [
 {matcher = ['*.*'], topic = "tidb_{schema}_{table}"},
]

ほとんどのSQLタイプは、基本のカラムデータ形式にマップされます。他のいくつかのSQLタイプは、基本データ形式を拡張してより多くの情報を提供します。

BIT(64)

{
    "name":"{{ColumnName}}",
    "type":{
        "connect.parameters":{
            "tidb_type":"BIT",
            "length":"64"
        },
        "type":"bytes"
    }
}

ENUM / SET(a、b、c)

{
    "name":"{{ColumnName}}",
    "type":{
        "connect.parameters":{
            "tidb_type":"ENUM/SET",
            "allowed":"a,b,c"
        },
        "type":"string"
    }
}

DECIMAL(10、4)

{
    "name":"{{ColumnName}}",
    "type":{
        "connect.parameters":{
            "tidb_type":"DECIMAL",
        },
        "logicalType":"decimal",
        "precision":10,
        "scale":4,
        "type":"bytes"
    }
}

DDLイベントとスキーマの変更

AvroはダウンストリームでDDLイベントを生成しません。 DMLイベントが発生するたびにスキーマが変更されるかどうかをチェックします。スキーマが変更されると、Avroは新しいスキーマを生成し、それをスキーマレジストリに登録します。スキーマの変更が互換性チェックに合格しない場合、登録は失敗します。 Avroは、スキーマの互換性の問題を解決しません。

スキーマの変更が互換性チェックに合格し、新しいバージョンが登録された場合でも、システムの正常な実行を保証するために、データプロデューサーとコンシューマーはアップグレードを実行する必要があることに注意してください。

Confluent Schema Registryのデフォルトの互換性ポリシーがBACKWARDであると想定し、空でない列をソーステーブルに追加します。この状況では、Avroは新しいスキーマを生成しますが、互換性の問題のためにスキーマレジストリへの登録に失敗します。このとき、チェンジフィードはエラー状態になります。

スキーマの詳細については、 スキーマレジストリ関連ドキュメントを参照してください。

トピックの配布

スキーマレジストリは、TopicNameStrategy、RecordNameStrategy、およびTopicRecordNameStrategyの主題名戦略つをサポートします。現在、TiCDC AvroはTopicNameStrategyのみをサポートしています。つまり、Kafkaトピックは1つのデータ形式でのみデータを受信できます。したがって、TiCDC Avroは、複数のテーブルを同じトピックにマッピングすることを禁止しています。チェンジフィードを作成するときに、トピックルールに構成済みの配布ルールに{schema}{table}のプレースホルダーが含まれていない場合、エラーが報告されます。