TiCDC 单行数据正确性校验

从 v7.1.0 开始,TiCDC 引入了单行数据正确性校验功能。该功能基于 Checksum 算法,校验一行数据从 TiDB 写入、通过 TiCDC 同步,到写入 Kafka 集群的过程中数据内容是否发生错误。TiCDC 数据正确性校验功能仅支持下游是 Kafka 的 Changefeed,目前支持 Avro 协议。

实现原理

在启用单行数据 Checksum 正确性校验功能后,TiDB 使用 CRC32 算法计算该行数据的 Checksum 值,并将其一并写入 TiKV。TiCDC 从 TiKV 读取数据,根据相同的算法重新计算 Checksum,如果该值与 TiDB 写入的值相同,则可以证明数据在 TiDB 至 TiCDC 的传输过程中是正确的。

TiCDC 将数据编码成特定格式并发送至 Kafka。Kafka Consumer 读取数据后,可以使用与 TiDB 相同的算法计算得到新的 Checksum,将此值与数据中携带的 Checksum 值进行比较,若二者一致,则可证明从 TiCDC 至 Kafka Consumer 的传输链路上的数据是正确的。

关于 Checksum 值的计算规则,请参考 Checksum 计算规则

启用功能

TiCDC 数据正确性校验功能默认关闭,要使用该功能,请执行以下步骤:

  1. 首先,你需要在上游 TiDB 中开启行数据 Checksum 功能 (tidb_enable_row_level_checksum):

    SET GLOBAL tidb_enable_row_level_checksum = ON;

    上述配置仅对新创建的会话生效,因此需要重新连接 TiDB。

  2. 在创建 Changefeed 的 --config 参数所指定的配置文件中,添加如下配置:

    [integrity] integrity-check-level = "correctness" corruption-handle-level = "warn"
  3. 当使用 Avro 作为数据编码格式时,你需要在 sink-uri 中设置 enable-tidb-extension=true。同时,为了防止数值类型在网络传输过程中发生精度丢失,导致 Checksum 校验失败,还需要设置 avro-decimal-handling-mode=stringavro-bigint-unsigned-handling-mode=string。下面是一个配置示例:

    cdc cli changefeed create --server=http://127.0.0.1:8300 --changefeed-id="kafka-avro-checksum" --sink-uri="kafka://127.0.0.1:9092/topic-name?protocol=avro&enable-tidb-extension=true&avro-decimal-handling-mode=string&avro-bigint-unsigned-handling-mode=string" --schema-registry=http://127.0.0.1:8081 --config changefeed_config.toml

    通过上述配置,Changefeed 会在每条写入 Kafka 的消息中携带该消息对应数据的 Checksum,你可以根据此 Checksum 的值进行数据一致性校验。

关闭功能

TiCDC 默认关闭单行数据的 Checksum 校验功能。若要在开启此功能后将其关闭,请执行以下步骤:

  1. 首先,按照 TiCDC 更新同步任务配置的说明,按照 暂停任务 -> 修改配置 -> 恢复任务 的流程更新 Changefeed 的配置内容。在 Changefeed 的 --config 参数所指定的配置文件中调整 [integrity] 的配置内容为:

    [integrity] integrity-check-level = "none" corruption-handle-level = "warn"
  2. 在上游 TiDB 中关闭行数据 Checksum 功能 (tidb_enable_row_level_checksum),执行如下 SQL 语句:

    SET GLOBAL tidb_enable_row_level_checksum = OFF;

    上述配置仅对新创建的会话生效。在所有写入 TiDB 的客户端都完成数据库连接重建后,Changefeed 写入 Kafka 的消息中将不再携带该条消息对应数据的 Checksum 值。

Checksum 计算规则

Checksum 计算算法的伪代码如下:

fn checksum(columns) { let result = 0 for column in sort_by_schema_order(columns) { result = crc32.update(result, encode(column)) } return result }
  • columns 应该按照 column ID 排序。在 Avro schema 中,各个字段已经按照 column ID 的顺序排序,因此可以直接按照此顺序排序 columns

  • encode(column) 函数将 column 的值编码为字节,编码规则取决于该 column 的数据类型。具体规则如下:

    • TINYINT、SMALLINT、INT、BIGINT、MEDIUMINT 和 YEAR 类型会被转换为 UINT64 类型,并按照小端序编码。例如,数字 0x0123456789abcdef 会被编码为 hex'0x0123456789abcdef'

    • FLOAT 和 DOUBLE 类型会被转换为 DOUBLE 类型,然后转换为 IEEE754 格式的 UINT64 类型。

    • BIT、ENUM 和 SET 类型会被转换为 UINT64 类型。

      • BIT 类型按照二进制转换为 UINT64 类型。
      • ENUM 和 SET 类型按照其对应的 INT 值转换为 UINT64 类型。例如,SET('a','b','c') 类型 column 的数据值为 'a,c',则该值将被编码为 0b101,即 5
    • TIMESTAMP、DATE、DURATION、DATETIME、JSON 和 DECIMAL 类型会被转换为 STRING 类型,然后转换为字节。

    • CHAR、VARCHAR、VARSTRING、STRING、TEXT、BLOB(包括 TINY、MEDIUM 和 LONG)等字符类型,会直接使用字节。

    • NULL 和 GEOMETRY 类型不会被纳入到 Checksum 计算中,返回空字节。

基于 Golang 的 Avro 数据消费和 Checksum 校验,可以参考 TiCDC 行数据 Checksum 校验

文档内容是否有帮助?