- 文档中心
- 关于 TiDB
- 快速上手
- 应用开发
- 概览
- 快速开始
- 示例程序
- 连接到 TiDB
- 数据库模式设计
- 数据写入
- 数据读取
- 事务
- 优化 SQL 性能
- 故障诊断
- 引用文档
- 云原生开发环境
- 第三方软件支持
- 部署标准集群
- 数据迁移
- 数据集成
- 运维操作
- 监控与告警
- 故障诊断
- 性能调优
- 优化手册
- 配置调优
- SQL 性能调优
- SQL 性能调优概览
- 理解 TiDB 执行计划
- SQL 优化流程
- 控制执行计划
- 教程
- 同城多中心部署
- 两地三中心部署
- 同城两中心部署
- 读取历史数据
- 使用 Stale Read 功能读取历史数据(推荐)
- 使用系统变量
tidb_snapshot
读取历史数据
- 最佳实践
- Placement Rules 使用文档
- Load Base Split 使用文档
- Store Limit 使用文档
- TiDB 工具
- 功能概览
- 适用场景
- 工具下载
- TiUP
- 文档地图
- 概览
- 术语及核心概念
- TiUP 组件管理
- FAQ
- 故障排查
- TiUP 命令参考手册
- 命令概览
- TiUP 命令
- TiUP Cluster 命令
- TiUP Cluster 命令概览
- tiup cluster audit
- tiup cluster check
- tiup cluster clean
- tiup cluster deploy
- tiup cluster destroy
- tiup cluster disable
- tiup cluster display
- tiup cluster edit-config
- tiup cluster enable
- tiup cluster help
- tiup cluster import
- tiup cluster list
- tiup cluster patch
- tiup cluster prune
- tiup cluster reload
- tiup cluster rename
- tiup cluster replay
- tiup cluster restart
- tiup cluster scale-in
- tiup cluster scale-out
- tiup cluster start
- tiup cluster stop
- tiup cluster template
- tiup cluster upgrade
- TiUP DM 命令
- TiUP DM 命令概览
- tiup dm audit
- tiup dm deploy
- tiup dm destroy
- tiup dm disable
- tiup dm display
- tiup dm edit-config
- tiup dm enable
- tiup dm help
- tiup dm import
- tiup dm list
- tiup dm patch
- tiup dm prune
- tiup dm reload
- tiup dm replay
- tiup dm restart
- tiup dm scale-in
- tiup dm scale-out
- tiup dm start
- tiup dm stop
- tiup dm template
- tiup dm upgrade
- TiDB 集群拓扑文件配置
- DM 集群拓扑文件配置
- TiUP 镜像参考指南
- TiUP 组件文档
- PingCAP Clinic 诊断服务
- TiDB Operator
- Dumpling
- TiDB Lightning
- TiDB Data Migration
- 关于 Data Migration
- 架构简介
- 快速开始
- 部署 DM 集群
- 入门指南
- 进阶教程
- 运维管理
- 参考手册
- 使用示例
- 异常解决
- 版本发布历史
- Backup & Restore (BR)
- TiDB Binlog
- TiCDC
- TiUniManager
- sync-diff-inspector
- TiSpark
- 参考指南
- 架构
- 监控指标
- 安全加固
- 权限
- SQL
- SQL 语言结构和语法
- SQL 语句
ADD COLUMN
ADD INDEX
ADMIN
ADMIN CANCEL DDL
ADMIN CHECKSUM TABLE
ADMIN CHECK [TABLE|INDEX]
ADMIN SHOW DDL [JOBS|QUERIES]
ADMIN SHOW TELEMETRY
ALTER DATABASE
ALTER INDEX
ALTER INSTANCE
ALTER PLACEMENT POLICY
ALTER TABLE
ALTER TABLE COMPACT
ALTER USER
ANALYZE TABLE
BACKUP
BATCH
BEGIN
CHANGE COLUMN
CHANGE DRAINER
CHANGE PUMP
COMMIT
CREATE [GLOBAL|SESSION] BINDING
CREATE DATABASE
CREATE INDEX
CREATE PLACEMENT POLICY
CREATE ROLE
CREATE SEQUENCE
CREATE TABLE LIKE
CREATE TABLE
CREATE USER
CREATE VIEW
DEALLOCATE
DELETE
DESC
DESCRIBE
DO
DROP [GLOBAL|SESSION] BINDING
DROP COLUMN
DROP DATABASE
DROP INDEX
DROP PLACEMENT POLICY
DROP ROLE
DROP SEQUENCE
DROP STATS
DROP TABLE
DROP USER
DROP VIEW
EXECUTE
EXPLAIN ANALYZE
EXPLAIN
FLASHBACK TABLE
FLUSH PRIVILEGES
FLUSH STATUS
FLUSH TABLES
GRANT <privileges>
GRANT <role>
INSERT
KILL [TIDB]
LOAD DATA
LOAD STATS
MODIFY COLUMN
PREPARE
RECOVER TABLE
RENAME INDEX
RENAME TABLE
RENAME USER
REPLACE
RESTORE
REVOKE <privileges>
REVOKE <role>
ROLLBACK
SELECT
SET DEFAULT ROLE
SET [NAMES|CHARACTER SET]
SET PASSWORD
SET ROLE
SET TRANSACTION
SET [GLOBAL|SESSION] <variable>
SHOW [BACKUPS|RESTORES]
SHOW ANALYZE STATUS
SHOW [GLOBAL|SESSION] BINDINGS
SHOW BUILTINS
SHOW CHARACTER SET
SHOW COLLATION
SHOW [FULL] COLUMNS FROM
SHOW CONFIG
SHOW CREATE PLACEMENT POLICY
SHOW CREATE SEQUENCE
SHOW CREATE TABLE
SHOW CREATE USER
SHOW DATABASES
SHOW DRAINER STATUS
SHOW ENGINES
SHOW ERRORS
SHOW [FULL] FIELDS FROM
SHOW GRANTS
SHOW INDEX [FROM|IN]
SHOW INDEXES [FROM|IN]
SHOW KEYS [FROM|IN]
SHOW MASTER STATUS
SHOW PLACEMENT
SHOW PLACEMENT FOR
SHOW PLACEMENT LABELS
SHOW PLUGINS
SHOW PRIVILEGES
SHOW [FULL] PROCESSSLIST
SHOW PROFILES
SHOW PUMP STATUS
SHOW SCHEMAS
SHOW STATS_HEALTHY
SHOW STATS_HISTOGRAMS
SHOW STATS_META
SHOW STATUS
SHOW TABLE NEXT_ROW_ID
SHOW TABLE REGIONS
SHOW TABLE STATUS
SHOW [FULL] TABLES
SHOW [GLOBAL|SESSION] VARIABLES
SHOW WARNINGS
SHUTDOWN
SPLIT REGION
START TRANSACTION
TABLE
TRACE
TRUNCATE
UPDATE
USE
WITH
- 数据类型
- 函数与操作符
- 聚簇索引
- 约束
- 生成列
- SQL 模式
- 表属性
- 事务
- 视图
- 分区表
- 临时表
- 缓存表
- 字符集和排序
- Placement Rules in SQL
- 系统表
mysql
- INFORMATION_SCHEMA
- Overview
ANALYZE_STATUS
CLIENT_ERRORS_SUMMARY_BY_HOST
CLIENT_ERRORS_SUMMARY_BY_USER
CLIENT_ERRORS_SUMMARY_GLOBAL
CHARACTER_SETS
CLUSTER_CONFIG
CLUSTER_HARDWARE
CLUSTER_INFO
CLUSTER_LOAD
CLUSTER_LOG
CLUSTER_SYSTEMINFO
COLLATIONS
COLLATION_CHARACTER_SET_APPLICABILITY
COLUMNS
DATA_LOCK_WAITS
DDL_JOBS
DEADLOCKS
ENGINES
INSPECTION_RESULT
INSPECTION_RULES
INSPECTION_SUMMARY
KEY_COLUMN_USAGE
METRICS_SUMMARY
METRICS_TABLES
PARTITIONS
PLACEMENT_POLICIES
PROCESSLIST
REFERENTIAL_CONSTRAINTS
SCHEMATA
SEQUENCES
SESSION_VARIABLES
SLOW_QUERY
STATISTICS
TABLES
TABLE_CONSTRAINTS
TABLE_STORAGE_STATS
TIDB_HOT_REGIONS
TIDB_HOT_REGIONS_HISTORY
TIDB_INDEXES
TIDB_SERVERS_INFO
TIDB_TRX
TIFLASH_REPLICA
TIKV_REGION_PEERS
TIKV_REGION_STATUS
TIKV_STORE_STATUS
USER_PRIVILEGES
VIEWS
METRICS_SCHEMA
- UI
- CLI
- 命令行参数
- 配置文件参数
- 系统变量
- 存储引擎
- 遥测
- 错误码
- 通过拓扑 label 进行副本调度
- 常见问题解答 (FAQ)
- 版本发布历史
- 术语表
TiCDC Avro Protocol
Avro 是由 Apache Avro™ 定义的一种数据交换格式协议,Confluent Platform 选择它作为默认的数据交换格式。通过本文,你可以了解 TiCDC 对 Avro 数据格式的实现,包括 TiDB 扩展字段、Avro 数据格式定义,以及和 Confluent Schema Registry 的交互。
使用 Avro
当使用 Message Queue (MQ) 作为下游 Sink 时,你可以在 sink-uri
中指定使用 Avro。TiCDC 获取 TiDB 的 DML 事件,并将这些事件封装到 Avro Message,然后发送到下游。当 Avro 检测到 schema 变化时,会向 Schema Registry 注册最新的 schema。
使用 Avro 时的配置样例如下所示:
cdc cli changefeed create --pd=http://127.0.0.1:2379 --changefeed-id="kafka-avro" --sink-uri="kafka://127.0.0.1:9092/topic-name?protocol=avro" --schema-registry=http://127.0.0.1:8081 --config changefeed_config.toml
[sink]
dispatchers = [
{matcher = ['*.*'], topic = "tidb_{schema}_{table}"},
]
--schema-registry
的值支持 https 协议和 username:password 认证,比如--schema-registry=https://username:password@schema-registry-uri.com
,username 和 password 必须经过 URL 编码。
TiDB 扩展字段
默认情况下,Avro 只收集在 DML 事件中发生数据变更的行的所有数据信息,不收集数据变更的类型和 TiDB 专有的 CommitTS 事务唯一标识信息。为了解决这个问题,TiCDC 在 Avro 协议格式中附加了 TiDB 扩展字段。当 sink-uri
中设置 enable-tidb-extension
为 true
(默认为 false
)后,TiCDC 生成 Avro 消息时会新增三个字段:
_tidb_op
:DML 的类型,"c" 表示插入,"u" 表示更新。_tidb_commit_ts
:事务唯一标识信息。_tidb_commit_physical_time
:事务标识信息中现实时间的时间戳。
配置样例如下所示:
cdc cli changefeed create --pd=http://127.0.0.1:2379 --changefeed-id="kafka-avro-enable-extension" --sink-uri="kafka://127.0.0.1:9092/topic-name?protocol=avro&enable-tidb-extension=true" --schema-registry=http://127.0.0.1:8081 --config changefeed_config.toml
[sink]
dispatchers = [
{matcher = ['*.*'], topic = "tidb_{schema}_{table}"},
]
数据格式定义
TiCDC 会将一个 DML 事件转换为一个 kafka 事件,其中事件的 key 和 value 都按照 Avro 协议进行编码。
Key 数据格式
{
"name":"{{TableName}}",
"namespace":"{{Namespace}}",
"type":"record",
"fields":[
{{ColumnValueBlock}},
{{ColumnValueBlock}},
]
}
{{TableName}}
是事件来源表的名称。{{Namespace}}
是 Avro 的命名空间。{{ColumnValueBlock}}
是每列数据的格式定义。
Key 中的 fields
只包含主键或唯一索引列。
Value 数据格式
{
"name":"{{TableName}}",
"namespace":"{{Namespace}}",
"type":"record",
"fields":[
{{ColumnValueBlock}},
{{ColumnValueBlock}},
]
}
Value 数据格式默认与 Key 数据格式相同,但是 Value 的 fields
中包含了所有的列,而不仅仅是主键列。
如果开启了 TiDB 扩展字段,那么 Value 数据格式将会变成:
{
"name":"{{TableName}}",
"namespace":"{{Namespace}}",
"type":"record",
"fields":[
{{ColumnValueBlock}},
{{ColumnValueBlock}},
{
"name":"_tidb_op",
"type":"string"
},
{
"name":"_tidb_commit_ts",
"type":"long"
},
{
"name":"_tidb_commit_physical_time",
"type":"long"
}
]
}
相比于不打开 TiDB 扩展字段选项,多出了 _tidb_op
, _tidb_commit_ts
, _tidb_commit_physical_time
三个字段的定义。
Column 数据格式
Column 数据格式即 Key/Value 数据格式中的 {{ColumnValueBlock}}
部分,TiCDC 会根据 SQL Type 生成对应的 Column 数据格式。基础的 Column 数据格式是:
{
"name":"{{ColumnName}}",
"type":{
"connect.parameters":{
"tidb_type":"{{TIDB_TYPE}}"
},
"type":"{{AVRO_TYPE}}"
}
}
如果一列可以为 NULL,那么 Column 数据格式是:
{
"default":null,
"name":"{{ColumnName}}",
"type":[
"null",
{
"connect.parameters":{
"tidb_type":"{{TIDB_TYPE}}"
},
"type":"{{AVRO_TYPE}}"
}
]
}
{{ColumnName}}
表示列名。{{TIDB_TYPE}}
表示对应到 TiDB 中的类型,与原始的 SQL Type 不是一一对应关系。{{AVRO_TYPE}}
表示 avro spec 中的类型。
SQL TYPE | TIDB_TYPE | AVRO_TYPE | 说明 |
---|---|---|---|
BOOL | INT | int | |
TINYINT | INT | int | 当 TINYINT 为无符号值时,TIDB_TYPE 为 INT UNSIGNED。 |
SMALLINT | INT | int | 当 SMALLINT 为无符号值时,TIDB_TYPE 为 INT UNSIGNED. |
MEDIUMINT | INT | int | 当 MEDIUMINT 为无符号值时,TIDB_TYPE 为 INT UNSIGNED。 |
INT | INT | int | 当 INT 为无符号值时,TIDB_TYPE 为 INT UNSIGNED,AVRO_TYPE 为 long。 |
BIGINT | BIGINT | long | 当 BIGINT 为无符号值时,TIDB_TYPE 为 BIGINT UNSIGNED。当 avro-bigint-unsigned-handling-mode 为 string 时,AVRO_TYPE 也为 string。 |
TINYBLOB | BLOB | bytes | |
BLOB | BLOB | bytes | |
MEDIUMBLOB | BLOB | bytes | |
LONGBLOB | BLOB | bytes | |
BINARY | BLOB | bytes | |
VARBINARY | BLOB | bytes | |
TINYTEXT | TEXT | string | |
TEXT | TEXT | string | |
MEDIUMTEXT | TEXT | string | |
LONGTEXT | TEXT | string | |
CHAR | TEXT | string | |
VARCHAR | TEXT | string | |
FLOAT | FLOAT | double | |
DOUBLE | DOUBLE | double | |
DATE | DATE | string | |
DATETIME | DATETIME | string | |
TIMESTAMP | TIMESTAMP | string | |
TIME | TIME | string | |
YEAR | YEAR | int | |
BIT | BIT | bytes | |
JSON | JSON | string | |
ENUM | ENUM | string | |
SET | SET | string | |
DECIMAL | DECIMAL | bytes | 当 avro-decimal-handling-mode 为 string 时,AVRO_TYPE 也为 string。 |
对于 Avro 协议,另外两个 sink-uri
参数 avro-decimal-handling-mode
和 avro-bigint-unsigned-handling-mode
也会影响 Column 数据格式:
avro-decimal-handling-mode
决定了如何处理 DECIMAL 字段,它有两个选项:- string:Avro 将 DECIMAL 字段以 string 的方式处理。
- precise:Avro 将 DECIMAL 字段以字节的方式处理。
avro-bigint-unsigned-handling-mode
决定了如何处理 BIGINT UNSIGNED 字段,它有两个选项:- string:Avro 将 BIGINT UNSIGNED 字段以 string 的方式处理。
- long:Avro 将 BIGINT UNSIGNED 字段以 64 位有符号整数处理,大于 9223372036854775807 的值会发生溢出。
配置样例如下所示:
cdc cli changefeed create --pd=http://127.0.0.1:2379 --changefeed-id="kafka-avro-string-option" --sink-uri="kafka://127.0.0.1:9092/topic-name?protocol=avro&avro-decimal-handling-mode=string&avro-bigint-unsigned-handling-mode=string" --schema-registry=http://127.0.0.1:8081 --config changefeed_config.toml
[sink]
dispatchers = [
{matcher = ['*.*'], topic = "tidb_{schema}_{table}"},
]
大多数的 SQL Type 都会映射成基础的 Column 数据格式,但有一些类型会在基础数据格式上拓展,提供更多的信息。
BIT(64)
{
"name":"{{ColumnName}}",
"type":{
"connect.parameters":{
"tidb_type":"BIT",
"length":"64"
},
"type":"bytes"
}
}
ENUM/SET(a,b,c)
{
"name":"{{ColumnName}}",
"type":{
"connect.parameters":{
"tidb_type":"ENUM/SET",
"allowed":"a,b,c"
},
"type":"string"
}
}
DECIMAL(10, 4)
{
"name":"{{ColumnName}}",
"type":{
"connect.parameters":{
"tidb_type":"DECIMAL",
},
"logicalType":"decimal",
"precision":10,
"scale":4,
"type":"bytes"
}
}
DDL 事件与 Schema 变更
Avro 并不会向下游生成 DDL 事件。Avro 会在每次 DML 事件发生时检测是否发生 schema 变更,如果发生了 schema 变更,Avro 会生成新的 schema,并尝试向 Schema Registry 注册。注册时,Schema Registry 会做兼容性检测,如果此次 schema 变更没有通过兼容性检测,注册将会失败,Avro 并不会尝试解决 schema 的兼容性问题。
同时,即使 schema 变更通过兼容性检测并成功注册新版本,数据的生产者和消费者可能仍然需要升级才能正确工作。
比如,Confluent Schema Registry 默认的兼容性策略是 BACKWARD,在这种策略下,如果你在源表增加一个非空列,Avro 在生成新 schema 向 Schema Registry 注册时将会因为兼容性问题失败,这个时候 changefeed 将会进入 error 状态。
如需了解更多 schema 相关信息,请参阅 Schema Registry 的相关文档。
Topic 分发
Schema Registry 支持三种 Subject Name Strategy:TopicNameStrategy、RecordNameStrategy 和 TopicRecordNameStrategy。目前 TiCDC Avro 只支持 TopicNameStrategy 一种,这意味着一个 kafka topic 只能接收一种数据格式的数据,所以 TiCDC Avro 禁止将多张表映射到同一个 topic。在创建 changefeed 时,如果配置的分发规则中,topic 规则不包含 {schema}
和 {table}
占位符,将会报错。