- 关于 TiDB
- 主要概念
- 操作指南
- 快速上手
- 部署
- 配置
- 安全
- 安全传输层协议 (TLS)
- 生成自签名证书
- 监控
- 迁移
- 运维
- 扩容缩容
- 升级
- 故障诊断
- 参考手册
- SQL
- 与 MySQL 兼容性对比
- SQL 语言结构
- 数据类型
- 函数与操作符
- SQL 语句
ADD COLUMN
ADD INDEX
ADMIN
ADMIN CANCEL DDL
ADMIN CHECKSUM TABLE
ADMIN CHECK [TABLE|INDEX]
ADMIN SHOW DDL [JOBS|QUERIES]
ALTER DATABASE
ALTER TABLE
ALTER USER
ANALYZE TABLE
BEGIN
CHANGE COLUMN
COMMIT
CREATE DATABASE
CREATE INDEX
CREATE TABLE LIKE
CREATE TABLE
CREATE USER
DEALLOCATE
DELETE
DESC
DESCRIBE
DO
DROP COLUMN
DROP DATABASE
DROP INDEX
DROP TABLE
DROP USER
EXECUTE
EXPLAIN ANALYZE
EXPLAIN
FLUSH PRIVILEGES
FLUSH STATUS
FLUSH TABLES
GRANT <privileges>
INSERT
KILL [TIDB]
LOAD DATA
LOAD STATS
MODIFY COLUMN
PREPARE
RENAME INDEX
RENAME TABLE
REPLACE
REVOKE <privileges>
ROLLBACK
SELECT
SET [NAMES|CHARACTER SET]
SET PASSWORD
SET TRANSACTION
SET [GLOBAL|SESSION] <variable>
SHOW CHARACTER SET
SHOW COLLATION
SHOW [FULL] COLUMNS FROM
SHOW CREATE TABLE
SHOW DATABASES
SHOW ENGINES
SHOW ERRORS
SHOW [FULL] FIELDS FROM
SHOW GRANTS
SHOW INDEXES [FROM|IN]
SHOW INDEX [FROM|IN]
SHOW KEYS [FROM|IN]
SHOW PRIVILEGES
SHOW [FULL] PROCESSSLIST
SHOW SCHEMAS
SHOW STATUS
SHOW [FULL] TABLES
SHOW TABLE STATUS
SHOW [GLOBAL|SESSION] VARIABLES
SHOW WARNINGS
START TRANSACTION
TRACE
TRUNCATE
UPDATE
USE
- 约束
- 生成列
- 字符集
- 配置
- tidb-server
- pd-server
- tikv-server
- 安全
- 事务
- 系统数据库
- 错误码
- 支持的连接器和 API
- 垃圾回收 (GC)
- 性能调优
- 监控指标
- 报警规则
- 最佳实践
- TiSpark 使用指南
- TiDB Binlog
- 周边工具
- 常见问题 (FAQ)
- 技术支持
- 贡献
- 版本发布历史
- 术语表
重要
你正在查看 TiDB 数据库的较旧版本 (TiDB v2.1) 的文档。
如无特殊需求,建议使用 TiDB 数据库的最新 LTS 版本。
Binlog Consumer Client 用户文档
目前 Drainer 提供了多种输出方式,包括 MySQL、TiDB、file 等。但是用户往往有一些自定义的需求,比如输出到 Elasticsearch、Hive 等,这些需求 Drainer 现在还没有实现,因此 Drainer 增加了输出到 Kafka 的功能,将 binlog 数据解析后按一定的格式再输出到 Kafka 中,用户编写代码从 Kafka 中读出数据再进行处理。
配置 Kafka Drainer
修改 Drainer 的配置文件,设置输出为 Kafka,相关配置如下:
[syncer]
db-type = "kafka"
[syncer.to]
# Kafka 地址
kafka-addrs = "127.0.0.1:9092"
# Kafka 版本号
kafka-version = "0.8.2.0"
自定义开发
数据格式
首先需要了解 Drainer 写入到 Kafka 中的数据格式:
// Column 保存列的数据,针对数据的类型,保存在对应的变量中
message Column {
// 数据是否为 null
optional bool is_null = 1 [ default = false ];
// 保存 int 类型的数据
optional int64 int64_value = 2;
// 保存 uint、enum, set 类型的数据
optional uint64 uint64_value = 3;
// 保存 float、double 类型的数据
optional double double_value = 4;
// 保存 bit、blob、binary、json 类型的数据
optional bytes bytes_value = 5;
// 保存 date、time、decimal、text、char 类型的数据
optional string string_value = 6;
}
// ColumnInfo 保存列的信息,包括列名、类型、是否为主键
message ColumnInfo {
optional string name = 1 [ (gogoproto.nullable) = false ];
// MySQL 中小写的列字段类型
// https://dev.mysql.com/doc/refman/8.0/en/data-types.html
// numeric 类型:int bigint smallint tinyint float double decimal bit
// string 类型:text longtext mediumtext char tinytext varchar
// blob longblob mediumblob binary tinyblob varbinary
// enum set
// json 类型:json
optional string mysql_type = 2 [ (gogoproto.nullable) = false ];
optional bool is_primary_key = 3 [ (gogoproto.nullable) = false ];
}
// Row 保存一行的具体数据
message Row { repeated Column columns = 1; }
// MutationType 表示 DML 的类型
enum MutationType {
Insert = 0;
Update = 1;
Delete = 2;
}
// Table 包含一个表的数据变更
message Table {
optional string schema_name = 1;
optional string table_name = 2;
repeated ColumnInfo column_info = 3;
repeated TableMutation mutations = 4;
}
// TableMutation 保存一行数据的变更
message TableMutation {
required MutationType type = 1;
// 修改后的数据
required Row row = 2;
// 修改前的数据,只对 Update MutationType 有效
optional Row change_row = 3;
}
// DMLData 保存一个事务所有的 DML 造成的数据变更
message DMLData {
// `tables` 包含事务中所有表的数据变更
repeated Table tables = 1;
}
// DDLData 保存 DDL 的信息
message DDLData {
// 当前使用的数据库
optional string schema_name = 1;
// 相关表
optional string table_name = 2;
// `ddl_query` 是原始的 DDL 语句 query
optional bytes ddl_query = 3;
}
// BinlogType 为 Binlog 的类型,分为 DML 和 DDL
enum BinlogType {
DML = 0; // Has `dml_data`
DDL = 1; // Has `ddl_query`
}
// Binlog 保存一个事务所有的变更,Kafka 中保存的数据为该结构数据序列化后的结果
message Binlog {
optional BinlogType type = 1 [ (gogoproto.nullable) = false ];
optional int64 commit_ts = 2 [ (gogoproto.nullable) = false ];
optional DMLData dml_data = 3;
optional DDLData ddl_data = 4;
}
查看数据格式的具体定义,参见 binlog.proto。
Driver
TiDB-Tools 项目提供了用于读取 Kafka 中 binlog 数据的 Driver,具有如下功能:
- 读取 Kafka 的数据
- 根据 commit ts 查找 binlog 在 kafka 中的储存位置
使用该 Driver 时,用户需要配置如下信息:
- KafkaAddr:Kafka 集群的地址
- CommitTS:从哪个 commit ts 开始读取 binlog
- Offset:从 Kafka 哪个 offset 开始读取,如果设置了 CommitTS 就不用配置该参数
- ClusterID:TiDB 集群的 cluster ID
- Topic: Kafka Topic 名称,如果 Topic 名称为空,将会使用
drainer <ClusterID>_obinlog
中的默认名称
用户以包的形式引用 Driver 的代码即可使用,可以参考 Driver 中提供的示例代码来学习如何使用 Driver 以及 binlog 数据的解析,目前提供了两个例子:
- 使用该 Driver 将数据同步到 MySQL,该示例包含将 binlog 转化为 SQL 的具体方法
- 使用该 Driver 将数据打印出来
Driver 项目地址:TiDB Binlog Driver。
注意
- 示例代码仅仅用于示范如何使用 Driver,如果需要用于生产环境需要优化代码。
- 目前仅提供了 golang 版本的 Driver 以及示例代码。如果需要使用其他语言,用户需要根据 binlog 的 proto 文件生成相应语言的代码文件,并自行开发程序读取 Kafka 中的 binlog 数据、解析数据、输出到下游。也欢迎用户优化 example 代码,以及提交其他语言的示例代码到 TiDB-Tools。
文档内容是否有帮助?