- 关于 TiDB
- 快速上手
- 应用开发
- 概览
- 快速开始
- 示例程序
- 连接到 TiDB
- 数据库模式设计
- 数据写入
- 数据读取
- 事务
- 优化 SQL 性能
- 故障诊断
- 引用文档
- 云原生开发环境
- 部署标准集群
- 数据迁移
- 运维操作
- 监控与告警
- 故障诊断
- 性能调优
- 优化手册
- 配置调优
- SQL 性能调优
- SQL 性能调优概览
- 理解 TiDB 执行计划
- SQL 优化流程
- 控制执行计划
- 教程
- 同城多中心部署
- 两地三中心部署
- 同城两中心部署
- 读取历史数据
- 使用 Stale Read 功能读取历史数据(推荐)
- 使用系统变量
tidb_snapshot
读取历史数据
- 最佳实践
- Placement Rules 使用文档
- Load Base Split 使用文档
- Store Limit 使用文档
- TiDB 工具
- 功能概览
- 适用场景
- 工具下载
- TiUP
- 文档地图
- 概览
- 术语及核心概念
- TiUP 组件管理
- FAQ
- 故障排查
- TiUP 命令参考手册
- 命令概览
- TiUP 命令
- TiUP Cluster 命令
- TiUP Cluster 命令概览
- tiup cluster audit
- tiup cluster check
- tiup cluster clean
- tiup cluster deploy
- tiup cluster destroy
- tiup cluster disable
- tiup cluster display
- tiup cluster edit-config
- tiup cluster enable
- tiup cluster help
- tiup cluster import
- tiup cluster list
- tiup cluster patch
- tiup cluster prune
- tiup cluster reload
- tiup cluster rename
- tiup cluster replay
- tiup cluster restart
- tiup cluster scale-in
- tiup cluster scale-out
- tiup cluster start
- tiup cluster stop
- tiup cluster template
- tiup cluster upgrade
- TiUP DM 命令
- TiUP DM 命令概览
- tiup dm audit
- tiup dm deploy
- tiup dm destroy
- tiup dm disable
- tiup dm display
- tiup dm edit-config
- tiup dm enable
- tiup dm help
- tiup dm import
- tiup dm list
- tiup dm patch
- tiup dm prune
- tiup dm reload
- tiup dm replay
- tiup dm restart
- tiup dm scale-in
- tiup dm scale-out
- tiup dm start
- tiup dm stop
- tiup dm template
- tiup dm upgrade
- TiDB 集群拓扑文件配置
- DM 集群拓扑文件配置
- TiUP 镜像参考指南
- TiUP 组件文档
- PingCAP Clinic 诊断服务 (Technical Preview)
- TiDB Operator
- Dumpling
- TiDB Lightning
- TiDB Data Migration
- 关于 Data Migration
- 快速开始
- 部署 DM 集群
- 入门指南
- 进阶教程
- 运维管理
- 参考手册
- 使用示例
- 异常解决
- 版本发布历史
- Backup & Restore (BR)
- TiDB Binlog
- TiCDC
- TiUniManager
- sync-diff-inspector
- TiSpark
- 参考指南
- 架构
- 监控指标
- 安全加固
- 权限
- SQL
- SQL 语言结构和语法
- SQL 语句
ADD COLUMN
ADD INDEX
ADMIN
ADMIN CANCEL DDL
ADMIN CHECKSUM TABLE
ADMIN CHECK [TABLE|INDEX]
ADMIN SHOW DDL [JOBS|QUERIES]
ADMIN SHOW TELEMETRY
ALTER DATABASE
ALTER INDEX
ALTER INSTANCE
ALTER PLACEMENT POLICY
ALTER TABLE
ALTER TABLE COMPACT
ALTER USER
ANALYZE TABLE
BACKUP
BATCH
BEGIN
CHANGE COLUMN
CHANGE DRAINER
CHANGE PUMP
COMMIT
CREATE [GLOBAL|SESSION] BINDING
CREATE DATABASE
CREATE INDEX
CREATE PLACEMENT POLICY
CREATE ROLE
CREATE SEQUENCE
CREATE TABLE LIKE
CREATE TABLE
CREATE USER
CREATE VIEW
DEALLOCATE
DELETE
DESC
DESCRIBE
DO
DROP [GLOBAL|SESSION] BINDING
DROP COLUMN
DROP DATABASE
DROP INDEX
DROP PLACEMENT POLICY
DROP ROLE
DROP SEQUENCE
DROP STATS
DROP TABLE
DROP USER
DROP VIEW
EXECUTE
EXPLAIN ANALYZE
EXPLAIN
FLASHBACK TABLE
FLUSH PRIVILEGES
FLUSH STATUS
FLUSH TABLES
GRANT <privileges>
GRANT <role>
INSERT
KILL [TIDB]
LOAD DATA
LOAD STATS
MODIFY COLUMN
PREPARE
RECOVER TABLE
RENAME INDEX
RENAME TABLE
REPLACE
RESTORE
REVOKE <privileges>
REVOKE <role>
ROLLBACK
SELECT
SET DEFAULT ROLE
SET [NAMES|CHARACTER SET]
SET PASSWORD
SET ROLE
SET TRANSACTION
SET [GLOBAL|SESSION] <variable>
SHOW [BACKUPS|RESTORES]
SHOW ANALYZE STATUS
SHOW [GLOBAL|SESSION] BINDINGS
SHOW BUILTINS
SHOW CHARACTER SET
SHOW COLLATION
SHOW [FULL] COLUMNS FROM
SHOW CONFIG
SHOW CREATE PLACEMENT POLICY
SHOW CREATE SEQUENCE
SHOW CREATE TABLE
SHOW CREATE USER
SHOW DATABASES
SHOW DRAINER STATUS
SHOW ENGINES
SHOW ERRORS
SHOW [FULL] FIELDS FROM
SHOW GRANTS
SHOW INDEX [FROM|IN]
SHOW INDEXES [FROM|IN]
SHOW KEYS [FROM|IN]
SHOW MASTER STATUS
SHOW PLACEMENT
SHOW PLACEMENT FOR
SHOW PLACEMENT LABELS
SHOW PLUGINS
SHOW PRIVILEGES
SHOW [FULL] PROCESSSLIST
SHOW PROFILES
SHOW PUMP STATUS
SHOW SCHEMAS
SHOW STATS_HEALTHY
SHOW STATS_HISTOGRAMS
SHOW STATS_META
SHOW STATUS
SHOW TABLE NEXT_ROW_ID
SHOW TABLE REGIONS
SHOW TABLE STATUS
SHOW [FULL] TABLES
SHOW [GLOBAL|SESSION] VARIABLES
SHOW WARNINGS
SHUTDOWN
SPLIT REGION
START TRANSACTION
TABLE
TRACE
TRUNCATE
UPDATE
USE
WITH
- 数据类型
- 函数与操作符
- 聚簇索引
- 约束
- 生成列
- SQL 模式
- 表属性
- 事务
- 视图
- 分区表
- 临时表
- 缓存表
- 字符集和排序
- Placement Rules in SQL
- 系统表
mysql
- INFORMATION_SCHEMA
- Overview
ANALYZE_STATUS
CLIENT_ERRORS_SUMMARY_BY_HOST
CLIENT_ERRORS_SUMMARY_BY_USER
CLIENT_ERRORS_SUMMARY_GLOBAL
CHARACTER_SETS
CLUSTER_CONFIG
CLUSTER_HARDWARE
CLUSTER_INFO
CLUSTER_LOAD
CLUSTER_LOG
CLUSTER_SYSTEMINFO
COLLATIONS
COLLATION_CHARACTER_SET_APPLICABILITY
COLUMNS
DATA_LOCK_WAITS
DDL_JOBS
DEADLOCKS
ENGINES
INSPECTION_RESULT
INSPECTION_RULES
INSPECTION_SUMMARY
KEY_COLUMN_USAGE
METRICS_SUMMARY
METRICS_TABLES
PARTITIONS
PLACEMENT_POLICIES
PROCESSLIST
REFERENTIAL_CONSTRAINTS
SCHEMATA
SEQUENCES
SESSION_VARIABLES
SLOW_QUERY
STATISTICS
TABLES
TABLE_CONSTRAINTS
TABLE_STORAGE_STATS
TIDB_HOT_REGIONS
TIDB_HOT_REGIONS_HISTORY
TIDB_INDEXES
TIDB_SERVERS_INFO
TIDB_TRX
TIFLASH_REPLICA
TIKV_REGION_PEERS
TIKV_REGION_STATUS
TIKV_STORE_STATUS
USER_PRIVILEGES
VIEWS
METRICS_SCHEMA
- UI
- CLI
- 命令行参数
- 配置文件参数
- 系统变量
- 存储引擎
- 遥测
- 错误码
- 通过拓扑 label 进行副本调度
- 常见问题解答 (FAQ)
- 版本发布历史
- 术语表
从 TiDB 同步数据至 Apache Kafka
本文档介绍如何使用 TiCDC 将 TiDB 的数据同步到 Apache Kafka。主要包含以下内容:
- 快速搭建 TiCDC 集群和 Kafka 集群
- 创建以 Kafka 为 sink 的 changefeed
- 使用 go-tpc 写入数据到上游 TiDB,使用 Kafka console consumer 观察数据被写入到指定的 Topic
上述过程将会基于实验环境进行,你可以参考上述执行步骤,搭建生产级别的集群。
第 1 步:搭建环境
部署 TiCDC 集群。
你可以使用 TiUP Playground 功能,快速部署 TiCDC ,命令如下:
tiup playground --host 0.0.0.0 --db 1 --pd 1 --kv 1 --tiflash 0 --ticdc 1 # 查看集群状态 tiup status
生产环境下,可以参考 TiCDC 部署,完成 TiCDC 集群部署工作。
部署 Kafka 集群。
- 快速体验,你可以参考 Apache Kakfa Quickstart 启动 Kafka 节点。
- 生产环境下,可以参考 Running Kafka in Production 完成 Kafka 集群搭建。
第 2 步:创建 Kafka Changefeed
使用 TiUP,执行如下命令,创建一个将 Kafka 作为下游节点的 changefeed:
tiup ctl cdc changefeed create --pd="http://127.0.0.1:2379" --sink-uri="kafka://127.0.0.1:9092/kafka-topic-name?protocol=canal-json" --changefeed-id="kafka-changefeed"
如果命令执行成功,将会返回被创建的 changefeed 的相关信息,包含被创建的 changefeed 的 ID 以及相信信息,内容如下:
Create changefeed successfully!
ID: kafka-changefeed
Info: {"sink-uri":"kafka://127.0.0.1:9092/kafka-topic-name?protocol=canal-json","opts":{},"create-time":"2022-04-06T14:45:10.824475+08:00","start-ts":432335096583028737,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":"","config":{"case-sensitive":true,"enable-old-value":true,"force-replicate":false,"check-gc-safe-point":true,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null},"mounter":{"worker-num":16},"sink":{"dispatchers":null,"protocol":"canal-json","column-selectors":null},"cyclic-replication":{"enable":false,"replica-id":0,"filter-replica-ids":null,"id-buckets":0,"sync-ddl":false},"scheduler":{"type":"table-number","polling-time":-1},"consistent":{"level":"none","max-log-size":64,"flush-interval":1000,"storage":""}},"state":"normal","error":null,"sync-point-enabled":false,"sync-point-interval":600000000000,"creator-version":"v6.1.0-master"}
如果命令长时间没有返回,你需要检查当前执行命令所在服务器到 sink-uri 中指定的 Kafka 机器的网络可达性,保证二者之间的网络连接正常。
生产环境下 Kafka 集群通常有多个 broker 节点,你可以在 sink-uri 中配置多个 broker 的访问地址,这有助于提升 changefeed 到 Kafka 集群访问的稳定性,当部分被配置的 Kafka 节点故障的时候,changefeed 依旧可以正常工作。假设 Kafka 集群中有 3 个 broker 节点,地址分别为 127.0.0.1:9092 / 127.0.0.2:9092 / 127.0.0.3:9092,可以参考如下 sink-uri 创建 changefeed:
tiup ctl cdc changefeed create --pd="http://127.0.0.1:2379" --sink-uri="kafka://127.0.0.1:9092,127.0.0.2:9092,127.0.0.3:9092/kafka-topic-name?protocol=canal-json&partition-num=3&replication-factor=1&max-message-bytes=1048576"
上述命令执行返回之后,可以通过如下命令,查看 changefeed 的状态:
tiup ctl cdc changefeed list --pd="http://127.0.0.1:2379"
你可以参考 TiCDC 运维操作及任务管理,对 changefeed 状态进行管理。
第 3 步:TiDB 产生事件变更数据
完成以上步骤后,只要上游 TiDB 有事件变更操作,比如 INSERT
、UPDATE
、DELETE
或其他 DDL 操作,即会产生数据到 TiCDC,然后 TiCDC 会将数据发送到 changefeed 指定的 sink,当下游 sink 是 Kafka 时,数据将会被写入指定的 Kafka Topic 中。
模拟业务负载
在测试实验环境下,我们可以使用
go-tpc
向上游 TiDB 集群写入数据,以让 TiDB 产生事件变更数据。如下命令,首先在上游 TiDB 创建名为tpcc
的数据库,然后使用 TiUP bench 写入数据到刚创建的tpcc
数据库中。create database tpcc; tiup bench tpcc -H 127.0.0.1 -P 4000 -D tpcc --warehouses 4 prepare tiup bench tpcc -H 127.0.0.1 -P 4000 -D tpcc --warehouses 4 run --time 300s
关于 go-tpc 的更多详细内容,可以参考如何对 TiDB 进行 TPC-C 测试。
消费 Kafka Topic 中的数据
changefeed 正常运行时,会向 Kafka Topic 写入数据,你可以通过由 Kafka 提供的
kafka-console-consumer.sh
, 观测到数据成功被写入到 Kafka Topic 中:./bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic `${topic-name}`
生产环境下,你需要自行开发 Kafka Consumer 程序,对 Kafka Topic 中的数据进行消费。