TiCDC Debezium Protocol

Debezium 是一个用于捕获数据库变更的工具。它会将捕获的数据库变更的每一条记录转化为一个被称为“事件” (event) 的消息 (message),并将这些事件发送到 Kafka 中。从 v8.0.0 起,TiCDC 支持将 TiDB 的变更以 Debezium 的格式直接传输到 Kafka,为之前使用 Debezium 的 MySQL 集成的用户简化了从 MySQL 数据库迁移的过程。从 v9.0.0 起,TiCDC 支持 DDL 事件和 WATERMARK 事件。

使用 Debezium 消息格式

当使用 Kafka 作为下游 Sink 时,你可以将 sink-uriprotocol 字段指定为 debezium,TiCDC 将以 event 为基本单位封装构造 Debezium 消息,向下游发送 TiDB 的数据变更事件。

Debezium 协议支持以下类型的事件:

  • DDL 事件:表示 DDL 变更记录。在上游 DDL 语句成功执行后,DDL 事件被发送到每个 MQ (Message Queue) 分区。
  • DML 事件:表示一行数据变更记录。在行变更发生时,DML 事件被发出,包含变更后该行的相关信息。
  • WATERMARK 事件:表示一个特殊的时间点。在这个时间点之前收到的事件是完整的。仅适用于 TiDB 扩展字段,当你在 sink-uri 中设置 enable-tidb-extensiontrue 时生效。

使用 Debezium 消息格式时的配置样例如下所示:

cdc cli changefeed create --server=http://127.0.0.1:8300 --changefeed-id="kafka-debezium" --sink-uri="kafka://127.0.0.1:9092/topic-name?kafka-version=2.4.0&protocol=debezium"

Debezium 输出格式中包含当前行的 Schema 信息,以便下游消费者更好地理解当前行的数据结构。对于不需要输出 Schema 信息的场景,也可以通过在 changefeed 的配置文件或者 sink-uri 中将 debezium-disable-schema 参数设置为 true 来关闭 Schema 信息的输出。

此外,Debezium 原有格式中并不包含 TiDB 专有的 CommitTS 事务唯一标识等重要字段。为了保证数据的完整性,TiCDC 在 Debezium 格式中增加了 CommitTsClusterID 两个字段,用于标识 TiDB 数据变更的相关信息。

消息格式定义

本节介绍 DDL 事件、DML 事件和 WATERMARK 事件的消息格式。

DDL 事件

TiCDC 会将一个 DDL 事件转换为一条 Kafka 消息,其中消息的 key 和 value 都按照 Debezium 协议进行编码。

Key 数据格式

{ "payload": { "databaseName": "test" }, "schema": { "type": "struct", "name": "io.debezium.connector.mysql.SchemaChangeKey", "optional": false, "version": 1, "fields": [ { "field": "databaseName", "optional": false, "type": "string" } ] } }

Key 中的字段仅包含数据库名称。字段解释如下:

字段类型说明
payloadJSON数据库名称信息。
schema.fieldsJSONpayload 中各个字段的类型信息。
schema.type字符串字段类型。
schema.optional布尔值该字段是否为选填项。值为 true 表示该字段为选填项。
schema.version字符串schema 的版本。

Value 数据格式

{ "payload": { "source": { "version": "2.4.0.Final", "connector": "TiCDC", "name": "test_cluster", "ts_ms": 0, "snapshot": "false", "db": "test", "table": "table1", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread": 0, "query": null, "commit_ts": 1, "cluster_id": "test_cluster" }, "ts_ms": 1701326309000, "databaseName": "test", "schemaName": null, "ddl": "RENAME TABLE test.table1 to test.table2", "tableChanges": [ { "type": "ALTER", "id": "\"test\".\"table2\",\"test\".\"table1\"", "table": { "defaultCharsetName": "", "primaryKeyColumnNames": [ "id" ], "columns": [ { "name": "id", "jdbcType": 4, "nativeType": null, "comment": null, "defaultValueExpression": null, "enumValues": null, "typeName": "INT", "typeExpression": "INT", "charsetName": null, "length": 0, "scale": null, "position": 1, "optional": false, "autoIncremented": false, "generated": false } ], "comment": null } } ] }, "schema": { "optional": false, "type": "struct", "version": 1, "name": "io.debezium.connector.mysql.SchemaChangeValue", "fields": [ { "field": "source", "name": "io.debezium.connector.mysql.Source", "optional": false, "type": "struct", "fields": [ { "field": "version", "optional": false, "type": "string" }, { "field": "connector", "optional": false, "type": "string" }, { "field": "name", "optional": false, "type": "string" }, { "field": "ts_ms", "optional": false, "type": "int64" }, { "field": "snapshot", "optional": true, "type": "string", "parameters": { "allowed": "true,last,false,incremental" }, "default": "false", "name": "io.debezium.data.Enum", "version": 1 }, { "field": "db", "optional": false, "type": "string" }, { "field": "sequence", "optional": true, "type": "string" }, { "field": "table", "optional": true, "type": "string" }, { "field": "server_id", "optional": false, "type": "int64" }, { "field": "gtid", "optional": true, "type": "string" }, { "field": "file", "optional": false, "type": "string" }, { "field": "pos", "optional": false, "type": "int64" }, { "field": "row", "optional": false, "type": "int32" }, { "field": "thread", "optional": true, "type": "int64" }, { "field": "query", "optional": true, "type": "string" } ] }, { "field": "ts_ms", "optional": false, "type": "int64" }, { "field": "databaseName", "optional": true, "type": "string" }, { "field": "schemaName", "optional": true, "type": "string" }, { "field": "ddl", "optional": true, "type": "string" }, { "field": "tableChanges", "optional": false, "type": "array", "items": { "name": "io.debezium.connector.schema.Change", "optional": false, "type": "struct", "version": 1, "fields": [ { "field": "type", "optional": false, "type": "string" }, { "field": "id", "optional": false, "type": "string" }, { "field": "table", "optional": true, "type": "struct", "name": "io.debezium.connector.schema.Table", "version": 1, "fields": [ { "field": "defaultCharsetName", "optional": true, "type": "string" }, { "field": "primaryKeyColumnNames", "optional": true, "type": "array", "items": { "type": "string", "optional": false } }, { "field": "columns", "optional": false, "type": "array", "items": { "name": "io.debezium.connector.schema.Column", "optional": false, "type": "struct", "version": 1, "fields": [ { "field": "name", "optional": false, "type": "string" }, { "field": "jdbcType", "optional": false, "type": "int32" }, { "field": "nativeType", "optional": true, "type": "int32" }, { "field": "typeName", "optional": false, "type": "string" }, { "field": "typeExpression", "optional": true, "type": "string" }, { "field": "charsetName", "optional": true, "type": "string" }, { "field": "length", "optional": true, "type": "int32" }, { "field": "scale", "optional": true, "type": "int32" }, { "field": "position", "optional": false, "type": "int32" }, { "field": "optional", "optional": true, "type": "boolean" }, { "field": "autoIncremented", "optional": true, "type": "boolean" }, { "field": "generated", "optional": true, "type": "boolean" }, { "field": "comment", "optional": true, "type": "string" }, { "field": "defaultValueExpression", "optional": true, "type": "string" }, { "field": "enumValues", "optional": true, "type": "array", "items": { "type": "string", "optional": false } } ] } }, { "field": "comment", "optional": true, "type": "string" } ] } ] } } ] } }

以上 JSON 数据的重点字段解释如下:

字段类型说明
payload.ts_ms数值TiCDC 生成这条信息的时间戳(毫秒级别)。
payload.ddl字符串DDL 事件的 SQL 语句。
payload.databaseName字符串事件发生的数据库的名称。
payload.source.commit_ts数值该事件的 CommitTs 值。
payload.source.db字符串事件发生的数据库的名称。
payload.source.table字符串事件发生的数据表的名称。
payload.tableChanges数组在 schema 变更后的整个表 schema 的结构化表示。tableChanges 字段包含一个数组,其中包括表中每一列的条目。由于结构化表示以 JSON 或 Avro 格式呈现数据,因此消费者可以在不通过 DDL 解析器处理的情况下轻松读取消息。
payload.tableChanges.type字符串描述变更的类型。值为以下之一:CREATE,表示表已创建;ALTER,表示表已修改;DROP,表示表已删除。
payload.tableChanges.id字符串被创建、修改或删除的表的完整标识符。如果是表重命名,则该标识符是 <old><new> 表名的拼接。
payload.tableChanges.table.defaultCharsetName字符串事件发生的表的字符集。
payload.tableChanges.table.primaryKeyColumnNames字符串组成表主键的列的名称列表。
payload.tableChanges.table.columns数组变更的表中每一列的元数据。
payload.tableChanges.table.columns.name字符串列的名称。
payload.tableChanges.table.columns.jdbcType数值列的 JDBC 类型。
payload.tableChanges.table.columns.comment字符串列的注释。
payload.tableChanges.table.columns.defaultValueExpression字符串列的默认值。
payload.tableChanges.table.columns.enumValues字符串列的枚举值。格式为 ['e1', 'e2']
payload.tableChanges.table.columns.charsetName字符串列的字符集。
payload.tableChanges.table.columns.length数值列的长度。
payload.tableChanges.table.columns.scale数值列的精度。
payload.tableChanges.table.columns.position数值列的位置。
payload.tableChanges.table.columns.optional布尔值是否为可选列。值为 true 表示为可选列。
schema.fieldsJSONpayload 每个字段的类型信息,包括变更表的列 schema 信息。
schema.name字符串schema 的名称,格式为 "{cluster-name}.{schema-name}.{table-name}.SchemaChangeValue"
schema.optional布尔值该字段是否为选填项。值为 true 表示该字段为选填项。
schema.type字符串字段的数据类型。

DML 事件

TiCDC 会将一个 DML 事件转换为一条 Kafka 消息,其中消息的 key 和 value 都按照 Debezium 协议进行编码。

Key 数据格式

{ "payload": { "tiny": 1 }, "schema": { "fields": [ { "field":"tiny", "optional":true, "type":"int16" } ], "name": "test_cluster.test.table1.Key", "optional": false, "type":"struct" } }

Key 中的字段只包含主键或唯一索引列。字段解释如下:

字段类型说明
payloadJSON主键或唯一索引列的信息。每个字段的 key 和 value 分别为列名和当前值。
schema.fieldsJSONpayload 中各个字段的类型信息,包括对应行数据变更前后 schema 的信息。
schema.name字符串schema 的名称,格式为 "{cluster-name}.{schema-name}.{table-name}.Key"
schema.optional布尔值该字段是否为选填项。值为 true 表示该字段为选填项。
schema.type字符串字段的数据类型。

Value 数据格式

{ "payload": { "source": { "version": "2.4.0.Final", "connector": "TiCDC", "name": "test_cluster", "ts_ms": 0, "snapshot": "false", "db": "test", "table": "table1", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread": 0, "query": null, "commit_ts": 1, "cluster_id": "test_cluster" }, "ts_ms": 1701326309000, "transaction": null, "op": "u", "before": { "tiny": 2 }, "after": { "tiny": 1 } }, "schema": { "type": "struct", "optional": false, "name": "test_cluster.test.table1.Envelope", "version": 1, "fields": [ { "type": "struct", "optional": true, "name": "test_cluster.test.table1.Value", "field": "before", "fields": [{ "type": "int16", "optional": true, "field": "tiny" }] }, { "type": "struct", "optional": true, "name": "test_cluster.test.table1.Value", "field": "after", "fields": [{ "type": "int16", "optional": true, "field": "tiny" }] }, { "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "version" }, { "type": "string", "optional": false, "field": "connector" }, { "type": "string", "optional": false, "field": "name" }, { "type": "int64", "optional": false, "field": "ts_ms" }, { "type": "string", "optional": true, "name": "io.debezium.data.Enum", "version": 1, "parameters": { "allowed": "true,last,false,incremental" }, "default": "false", "field": "snapshot" }, { "type": "string", "optional": false, "field": "db" }, { "type": "string", "optional": true, "field": "sequence" }, { "type": "string", "optional": true, "field": "table" }, { "type": "int64", "optional": false, "field": "server_id" }, { "type": "string", "optional": true, "field": "gtid" }, { "type": "string", "optional": false, "field": "file" }, { "type": "int64", "optional": false, "field": "pos" }, { "type": "int32", "optional": false, "field": "row" }, { "type": "int64", "optional": true, "field": "thread" }, { "type": "string", "optional": true, "field": "query" } ], "optional": false, "name": "io.debezium.connector.mysql.Source", "field": "source" }, { "type": "string", "optional": false, "field": "op" }, { "type": "int64", "optional": true, "field": "ts_ms" }, { "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "id" }, { "type": "int64", "optional": false, "field": "total_order" }, { "type": "int64", "optional": false, "field": "data_collection_order" } ], "optional": true, "name": "event.block", "version": 1, "field": "transaction" } ] } }

以上 JSON 数据的重点字段解释如下:

字段类型说明
payload.op字符串变更事件类型。"c" 表示 INSERT 事件,"u" 表示 UPDATE 事件,"d" 表示 DELETE 事件。
payload.ts_ms数值TiCDC 生成这条信息的时间戳(毫秒级别)。
payload.beforeJSON这条事件语句变更前的数据值。对于 "c" 事件,before 字段的值为 null
payload.afterJSON这条事件语句变更后的数据值。对于 "d" 事件,after 字段的值为 null
payload.source.commit_ts数值该事件的 CommitTs 值。
payload.source.db字符串事件发生的数据库的名称。
payload.source.table字符串事件发生的数据表的名称。
schema.fieldsJSONpayload 中各个字段的类型信息,包括对应行数据变更前后 schema 的信息。
schema.fields[1].fields[n].tidb_type字符串payload.after 中每列的 TiDB 类型。仅在 enable-tidb-extension = true 时存在。
schema.name字符串schema 的名称,格式为 "{cluster-name}.{schema-name}.{table-name}.Envelope"
schema.optional布尔值该字段是否为选填项。值为 true 表示该字段为选填项。
schema.type字符串字段的类型。

WATERMARK 事件

TiCDC 会将一个 WATERMARK 事件转换为一条 Kafka 消息,其中消息的 key 和 value 都按照 Debezium 协议进行编码。

Key 数据格式

{ "payload": {}, "schema": { "fields": [], "optional": false, "name": "test_cluster.watermark.Key", "type": "struct" } }

Key 中的字段解释如下:

字段类型说明
schema.name字符串schema 的名称,格式为 "{cluster-name}.watermark.Key"

Value 数据格式

{ "payload": { "source": { "version": "2.4.0.Final", "connector": "TiCDC", "name": "test_cluster", "ts_ms": 0, "snapshot": "false", "db": "", "table": "", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread": 0, "query": null, "commit_ts": 3, "cluster_id": "test_cluster" }, "op": "m", "ts_ms": 1701326309000, "transaction": null }, "schema": { "type": "struct", "optional": false, "name": "test_cluster.watermark.Envelope", "version": 1, "fields": [ { "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "version" }, { "type": "string", "optional": false, "field": "connector" }, { "type": "string", "optional": false, "field": "name" }, { "type": "int64", "optional": false, "field": "ts_ms" }, { "type": "string", "optional": true, "name": "io.debezium.data.Enum", "version": 1, "parameters": { "allowed": "true,last,false,incremental" }, "default": "false", "field": "snapshot" }, { "type": "string", "optional": false, "field": "db" }, { "type": "string", "optional": true, "field": "sequence" }, { "type": "string", "optional": true, "field": "table" }, { "type": "int64", "optional": false, "field": "server_id" }, { "type": "string", "optional": true, "field": "gtid" }, { "type": "string", "optional": false, "field": "file" }, { "type": "int64", "optional": false, "field": "pos" }, { "type": "int32", "optional": false, "field": "row" }, { "type": "int64", "optional": true, "field": "thread" }, { "type": "string", "optional": true, "field": "query" } ], "optional": false, "name": "io.debezium.connector.mysql.Source", "field": "source" }, { "type": "string", "optional": false, "field": "op" }, { "type": "int64", "optional": true, "field": "ts_ms" }, { "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "id" }, { "type": "int64", "optional": false, "field": "total_order" }, { "type": "int64", "optional": false, "field": "data_collection_order" } ], "optional": true, "name": "event.block", "version": 1, "field": "transaction" } ] } }

以上 JSON 数据的重点字段解释如下:

字段类型说明
payload.op字符串变更事件类型。"m" 表示 WATERMARK 事件。
payload.ts_ms数值TiCDC 生成这条信息的时间戳(毫秒级别)。
payload.source.commit_ts数值该事件的 CommitTs 值。
payload.source.db字符串事件发生的数据库的名称。
payload.source.table字符串事件发生的数据表的名称。
schema.fieldsJSONpayload 中各个字段的类型信息,包括对应行数据变更前后 schema 的信息。
schema.name字符串schema 的名称,格式为 "{cluster-name}.watermark.Envelope"
schema.optional布尔值该字段是否为选填项。值为 true 表示该字段为选填项。
schema.type字符串字段的类型。

数据类型映射

TiCDC Debezium 消息中的数据格式映射基本遵循 Debezium 的数据类型映射规则,与 Debezium Connector for MySQL 原生消息大体一致。但是对于部分数据类型,TiCDC Debezium 的处理方式与 Debezium Connector Message 存在一定差异,具体如下:

  • 目前 TiDB 不支持空间数据类型,包括 GEOMETRY、LINESTRING、POLYGON、MULTIPOINT、MULTILINESTRING、MULTIPOLYGON、GEOMETRYCOLLECTION。

  • 对于 String-likes 的数据类型,包括 Varchar、String、VarString、TinyBlob、MediumBlob、BLOB、LongBlob 等,当该列具有 BINARY 标志时,TiCDC 会将其按照 Base64 编码后以 String 类型表示;当该列没有 BINARY 标志时,TiCDC 则直接将其编码为 String 类型。而原生的 Debezium Connector 会根据 binary.handling.mode 以不同的编码方式进行编码。

  • 对于 Decimal 数据类型,包括 DECIMAL 和 NUMERIC,TiCDC 均会使用 float64 类型来表示。而原生的 Debezium Connector 会根据数据类型的不同精度采用 float32 或者 float64 的方式进行编码。

  • TiCDC 将 REAL 转换为 DOUBLE;当长度为 1 时,将 BOOLEAN 转换为 TINYINT(1)。

  • 在 TiCDC 中,BLOB、TEXT、GEOMETRY、JSON 列没有默认值。

  • Debezium 将 FLOAT 类型的 "5.61" 转换为 "5.610000133514404",但 TiCDC 不会。

  • TiCDC 在处理 FLOAT 时打印了错误的 flen tidb#57060

  • 当列的排序规则为 utf8_unicode_ci 且字符集为 null 时,Debezium 将 charsetName 转换为 "utf8mb4",但 TiCDC 不会。

  • TiCDC 将 ENUM 元素中的 \ 视为转义引号,但 Debezium 不会。例如,TiCDC 将 ENUM 元素 ("c,\'d','g,''h") 编码为 ('c,'d', 'g,''h')

  • TiCDC 将 TIME 类型的默认值如 '1000-00-00 01:00:00.000' 转换为 "1000-00-00",但 Debezium 不会。

文档内容是否有帮助?