重要

你正在查看 TiDB v6.0 (DMR) 的文档。PingCAP 不提供基于 v6.0 的 bug 修复版本,如有 bug,会在后续版本中修复。

如无特殊需求,建议使用 TiDB 数据库的最新 LTS 版本

TiCDC Canal-JSON Protocol

Canal-JSON 是由 Alibaba Canal 定义的一种数据交换格式协议。通过本文,你可以了解 TiCDC 对 Canal-JSON 数据格式的实现,包括 TiDB 扩展字段、Canal-JSON 数据格式定义,以及和官方实现进行对比等相关内容。

使用 Canal-JSON

当使用 MQ (Message Queue) 作为下游 Sink 时,你可以在 sink-uri 中指定使用 Canal-JSON,TiCDC 将以 Event 为基本单位封装构造 Canal-JSON Message,向下游发送 TiDB 的数据变更事件。

Event 分为三类:

  • DDL Event:代表 DDL 变更记录,在上游成功执行 DDL 语句后发出,DDL Event 会被发送到索引为 0 的 MQ Partition。
  • DML Event:代表一行数据变更记录,在行变更发生时该类 Event 被发出,包含变更后该行的相关信息。
  • WATERMARK Event:代表一个特殊的时间点,表示在这个时间点前收到的 Event 是完整的。仅适用于 TiDB 扩展字段,当你在 sink-uri 中设置 enable-tidb-extension=true 时生效。

使用 Canal-JSON 时的配置样例如下所示:

cdc cli changefeed create --pd=http://127.0.0.1:2379 --changefeed-id="kafka-canal-json" --sink-uri="kafka://127.0.0.1:9092/topic-name?kafka-version=2.4.0&protocol=canal-json"

TiDB 扩展字段

Canal-JSON 协议本是为 MySQL 设计的,其中并不包含 TiDB 专有的 CommitTS 事务唯一标识等重要字段。为了解决这个问题,TiCDC 在 Canal-JSON 协议格式中附加了 TiDB 扩展字段。在 sink-uri 中设置 enable-tidb-extensiontrue 后,TiCDC 生成 Canal-JSON 消息时的行为如下:

  • TiCDC 发送的 DML Event 和 DDL Event 类型消息中,将会含有一个名为 _tidb 的字段。
  • TiCDC 将会发送 WATERMARK Event 消息。·

配置样例如下所示:

cdc cli changefeed create --pd=http://127.0.0.1:2379 --changefeed-id="kafka-canal-json-enable-tidb-extension" --sink-uri="kafka://127.0.0.1:9092/topic-name?kafka-version=2.4.0&protocol=canal-json&enable-tidb-extension=true"

enable-tidb-extension 默认为 false,仅当使用 Canal-JSON 时生效。

Message 格式定义

下面介绍 DDL Event、DML Event 和 WATERMARK Event 的格式定义,以及消费端的数据解析。

DDL Event

TiCDC 会把一个 DDL Event 编码成如下 Canal-JSON 格式:

{
    "id": 0,
    "database": "test",
    "table": "",
    "pkNames": null,
    "isDdl": true,
    "type": "QUERY",
    "es": 1639633094670,
    "ts": 1639633095489,
    "sql": "drop database if exists test",
    "sqlType": null,
    "mysqlType": null,
    "data": null,
    "old": null,
    "_tidb": {     // TiDB 的扩展字段
        "commitTs": 163963309467037594
    }
}

以上 JSON 数据的字段解释如下:

字段类型说明
idNumberTiCDC 默认值为 0
databaseStringRow 所在的 Database 的名字
tableStringRow 所在的 Table 的名字
pkNamesArray组成 primary key 的所有列的名字
isDdlBool该条消息是否为 DDL 事件
typeStringCanal-JSON 定义的事件类型
esNumber产生该条消息的事件发生时的 13 位(毫秒级)时间戳
tsNumberTiCDC 生成该条消息时的 13 位(毫秒级)时间戳
sqlString当 isDdl 为 true 时,记录对应的 DDL 语句
sqlTypeObject当 isDdl 为 false 时,记录每一列数据类型在 Java 中的类型表示
mysqlTypeobject当 isDdl 为 false 时,记录每一列数据类型在 MySQL 中的类型表示
dataObject当 isDdl 为 false 时,记录每一列的名字及其数据值
oldObject仅当该条消息由 Update 类型事件产生时,记录每一列的名字,和 Update 之前的数据值
_tidbObjectTiDB 扩展字段,仅当 enable-tidb-extension 为 true 时才会存在。其中的 commitTs 值为造成 Row 变更的事务的 TSO

DML Event

对于一行 DML 数据变更事件,TiCDC 会将其编码成如下形式:

{
    "id": 0,
    "database": "test",
    "table": "tp_int",
    "pkNames": [
        "id"
    ],
    "isDdl": false,
    "type": "INSERT",
    "es": 1639633141221,
    "ts": 1639633142960,
    "sql": "",
    "sqlType": {
        "c_bigint": -5,
        "c_int": 4,
        "c_mediumint": 4,
        "c_smallint": 5,
        "c_tinyint": -6,
        "id": 4
    },
    "mysqlType": {
        "c_bigint": "bigint",
        "c_int": "int",
        "c_mediumint": "mediumint",
        "c_smallint": "smallint",
        "c_tinyint": "tinyint",
        "id": "int"
    },
    "data": [
        {
            "c_bigint": "9223372036854775807",
            "c_int": "2147483647",
            "c_mediumint": "8388607",
            "c_smallint": "32767",
            "c_tinyint": "127",
            "id": "2"
        }
    ],
    "old": null,
    "_tidb": {     // TiDB 的扩展字段
        "commitTs": 163963314122145239
    }
}

WATERMARK Event

仅当 enable-tidb-extensiontrue 时,TiCDC 才会发送 WATERMARK Event,其 type 字段值为 TIDB_WATERMARK。该类型事件具有 _tidb 字段,当前只含有 watermarkTs,其值为该 Event 发送时的 TSO。

当你收到一个该类型的事件,所有 commitTs 小于 watermarkTs 的事件均已发送完毕。因为 TiCDC 提供 At Least Once 语义,可能出现重复发送数据的情况。如果后续收到有 commitTs 小于 watermarkTs 的事件,可以忽略。

WATERMARK Event 的示例如下:

{
    "id": 0,
    "database": "",
    "table": "",
    "pkNames": null,
    "isDdl": false,
    "type": "TIDB_WATERMARK",
    "es": 1640007049196,
    "ts": 1640007050284,
    "sql": "",
    "sqlType": null,
    "mysqlType": null,
    "data": null,
    "old": null,
    "_tidb": {     // TiDB 的扩展字段
        "watermarkTs": 429918007904436226
    }
}

消费端数据解析

从上面的示例中可知,Canal-JSON 具有统一的数据格式,针对不同的事件类型,有不同的字段填充规则。消费者可以使用统一的方法对该 JSON 格式的数据进行解析,然后通过判断字段值的方式,来确定具体事件类型:

  • isDdl 为 true 时,该消息含有一条 DDL Event。
  • isDdl 为 false 时,需要对 type 字段加以判断。如果 typeTIDB_WATERMARK,可得知其为 WATERMARK Event,否则就是 DML Event。

字段说明

Canal-JSON 格式会在 mysqlType 字段和 sqlType 字段中记录对应的数据类型。

MySQL Type 字段

Canal-JSON 格式会在 mysqlType 字段中记录每一列的 MySQL Type 的字符串表示。相关详情可以参考 TiDB Data Types

SQL Type 字段

Canal-JSON 格式会在 sqlType 字段中记录每一列的 Java SQL Type,即每条数据在 JDBC 中对应的数据类型,其值可以通过 MySQL Type 和具体数据值计算得到。具体对应关系如下:

MySQL TypeJava SQL Type Code
Boolean-6
Float7
Double8
Decimal3
Char1
Varchar12
Binary2004
Varbinary2004
Tinytext2005
Text2005
Mediumtext2005
Longtext2005
Tinyblob2004
Blob2004
Mediumblob2004
Longblob2004
Date91
Datetime93
Timestamp93
Time92
Year12
Enum4
Set-7
Bit-7
JSON12

整数类型

你需要考虑整数类型是否有 Unsigned 约束,以及当前取值大小,分别对应不同的 Java SQL Type Code。如下表所示。

MySQL Type StringValue RangeJava SQL Type Code
tinyint[-128, 127]-6
tinyint unsigned[0, 127]-6
tinyint unsigned[128, 255]5
smallint[-32768, 32767]5
smallint unsigned[0, 32767]5
smallint unsigned[32768, 65535]4
mediumint[-8388608, 8388607]4
mediumint unsigned[0, 8388607]4
mediumint unsigned[8388608, 16777215]4
int[-2147483648, 2147483647]4
int unsigned[0, 2147483647]4
int unsigned[2147483648, 4294967295]-5
bigint[-9223372036854775808, 9223372036854775807]-5
bigint unsigned[0, 9223372036854775807]-5
bigint unsigned[9223372036854775808, 18446744073709551615]3

TiCDC 涉及的 Java SQL Type 及其 Code 映射关系如下表所示。

Java SQL TypeJava SQL Type Code
CHAR1
DECIMAL3
INTEGER4
SMALLINT5
REAL7
DOUBLE8
VARCHAR12
DATE91
TIME92
TIMESTAMP93
BLOB2004
CLOB2005
BIGINT-5
TINYINT-6
Bit-7

想要了解 Java SQL Type 的更多信息,请参考 Java SQL Class Types

TiCDC Canal-JSON 和 Canal 官方实现对比

TiCDC 对 Canal-JSON 数据格式的实现,包括 Update 类型事件和 mysqlType 字段,和官方有些许不同。主要差异见下表。

差异点TiCDCCanal
Update 类型事件old 字段包含所有列数据old 字段仅包含被修改的列数据
mysqlType 字段对于含有参数的类型,没有类型参数信息对于含有参数的类型,会包含完整的参数信息

Update 类型事件

对于 Update 类型事件,Canal 官方实现中,old 字段仅包含被修改的列数据,而 TiCDC 的实现则包含所有列数据。

假设在上游 TiDB 按顺序执行如下 SQL 语句:

create table tp_int
(
    id          int auto_increment,
    c_tinyint   tinyint   null,
    c_smallint  smallint  null,
    c_mediumint mediumint null,
    c_int       int       null,
    c_bigint    bigint    null,
    constraint pk
        primary key (id)
);

insert into tp_int(c_tinyint, c_smallint, c_mediumint, c_int, c_bigint)
values (127, 32767, 8388607, 2147483647, 9223372036854775807);

update tp_int set c_int = 0, c_tinyint = 0 where c_smallint = 32767;

对于 update 语句,TiCDC 将会输出一条 typeUPDATE 的事件消息,如下所示。该 update 语句仅对 c_intc_tinyint 两列进行了修改。输出事件消息的 old 字段,则包含所有列数据。

{
    "id": 0,
    ...
    "type": "UPDATE",
    ...
    "sqlType": {
        ...
    },
    "mysqlType": {
        ...
    },
    "data": [
        {
            "c_bigint": "9223372036854775807",
            "c_int": "0",
            "c_mediumint": "8388607",
            "c_smallint": "32767",
            "c_tinyint": "0",
            "id": "2"
        }
    ],
    "old": [                                 // TiCDC 输出事件消息的 `old` 字段,则包含所有列数据。
        {
            "c_bigint": "9223372036854775807",
            "c_int": "2147483647",           // 修改的列
            "c_mediumint": "8388607",
            "c_smallint": "32767",
            "c_tinyint": "127",              // 修改的列
            "id": "2"
        }
    ]
}

官方 Canal 输出事件消息的 old 字段仅包含被修改的列数据。示例如下。

{
    "id": 0,
    ...
    "type": "UPDATE",
    ...
    "sqlType": {
        ...
    },
    "mysqlType": {
        ...
    },
    "data": [
        {
            "c_bigint": "9223372036854775807",
            "c_int": "0",
            "c_mediumint": "8388607",
            "c_smallint": "32767",
            "c_tinyint": "0",
            "id": "2"
        }
    ],
    "old": [                                    // Canal 输出事件消息的 `old` 字段,仅包含被修改的列的数据。
        {
            "c_int": "2147483647",              // 修改的列
            "c_tinyint": "127",                 // 修改的列
        }
    ]
}

mysqlType 字段

对于 mysqlType 字段,Canal 官方实现中,对于含有参数的类型,会包含完整的参数信息,TiCDC 实现则没有类型参数信息。

在下面示例的表定义 SQL 语句中,如 decimal / char / varchar / enum 等类型,都含有参数。对比 TiCDC 和 Canal 官方实现分别生成的 Canal-JSON 格式数据可知,在 mysqlType 字段中的数据,TiCDC 实现只包含基本 MySQL Type。如果业务需要类型参数信息,需要你自行通过其他方式实现。

假设在上游数据库按顺序执行如下 SQL 语句:

create table t (
    id     int auto_increment,
    c_decimal    decimal(10, 4) null,
    c_char       char(16)      null,
    c_varchar    varchar(16)   null,
    c_binary     binary(16)    null,
    c_varbinary  varbinary(16) null,

    c_enum enum('a','b','c') null,
    c_set  set('a','b','c')  null,
    c_bit  bit(64)            null,
    constraint pk
        primary key (id)
);

insert into t (c_decimal, c_char, c_varchar, c_binary, c_varbinary, c_enum, c_set, c_bit)
values (123.456, "abc", "abc", "abc", "abc", 'a', 'a,b', b'1000001');

TiCDC 输出内容如下:

{
    "id": 0,
    ...
    "isDdl": false,
    "sqlType": {
        ...
    },
    "mysqlType": {
        "c_binary": "binary",
        "c_bit": "bit",
        "c_char": "char",
        "c_decimal": "decimal",
        "c_enum": "enum",
        "c_set": "set",
        "c_varbinary": "varbinary",
        "c_varchar": "varchar",
        "id": "int"
    },
    "data": [
        {
            ...
        }
    ],
    "old": null,
}

Canal 官方实现输出内容如下:

{
    "id": 0,
    ...
    "isDdl": false,
    "sqlType": {
        ...
    },
    "mysqlType": {
        "c_binary": "binary(16)",
        "c_bit": "bit(64)",
        "c_char": "char(16)",
        "c_decimal": "decimal(10, 4)",
        "c_enum": "enum('a','b','c')",
        "c_set": "set('a','b','c')",
        "c_varbinary": "varbinary(16)",
        "c_varchar": "varchar(16)",
        "id": "int"
    },
    "data": [
        {
            ...
        }
    ],
    "old": null,
}

TiCDC Canal-JSON 改动说明

Delete 类型事件中 Old 字段的变化说明

TiCDC 实现的 Canal-JSON 格式,v5.4.0 及以后版本的实现,和之前的有些许不同,具体如下:

  • Delete 类型事件,Old 字段的内容发生了变化。

如下是一个 DELETE 事件的数据内容,在 v5.4.0 前的实现中,"old" 的内容和 "data" 相同,在 v5.4.0 及之后的实现中,"old" 将被设为 null。你可以通过 "data" 字段获取到被删除的数据。

{
    "id": 0,
    "database": "test",
    ...
    "type": "DELETE",
    ...
    "sqlType": {
        ...
    },
    "mysqlType": {
        ...
    },
    "data": [
        {
            "c_bigint": "9223372036854775807",
            "c_int": "0",
            "c_mediumint": "8388607",
            "c_smallint": "32767",
            "c_tinyint": "0",
            "id": "2"
        }
    ],
    "old": null,

    // 以下示例是 v5.4.0 之前的实现,`old` 内容等同于 `data` 内容
    "old": [
        {
            "c_bigint": "9223372036854775807",
            "c_int": "0",
            "c_mediumint": "8388607",
            "c_smallint": "32767",
            "c_tinyint": "0",
            "id": "2"
        }
    ]
}
文档内容是否有帮助?