Binlog Consumer Client User Guide
Binlog Consumer Client is used to consume TiDB secondary binlog data from Kafka and output the data in a specific format. Currently, Drainer supports several kinds of downstream targets, including MySQL, TiDB, file, and Kafka. However, users sometimes need to output data to other systems, such as Elasticsearch or Hive, so this feature is introduced.
Configure Drainer
Modify the configuration file of Drainer and set it to output the data to Kafka:
[syncer]
db-type = "kafka"
[syncer.to]
# the Kafka address
kafka-addrs = "127.0.0.1:9092"
# the Kafka version
kafka-version = "0.8.2.0"
Customized development
Data format
First, you need to know the format of the data that Drainer outputs to Kafka:
// `Column` stores the column data in the corresponding variable based on the data type.
message Column {
  // Indicates whether the data is null
  optional bool is_null = 1 [ default = false ];
  // Stores `int` data
  optional int64 int64_value = 2;
  // Stores `uint`, `enum`, and `set` data
  optional uint64 uint64_value = 3;
  // Stores `float` and `double` data
  optional double double_value = 4;
  // Stores `bit`, `blob`, `binary`, and `json` data
  optional bytes bytes_value = 5;
  // Stores `date`, `time`, `decimal`, `text`, and `char` data
  optional string string_value = 6;
}

// `ColumnInfo` stores the column information, including the column name, type, and whether it is the primary key.
message ColumnInfo {
  optional string name = 1 [ (gogoproto.nullable) = false ];
  // the lower case column field type in MySQL
  // https://dev.mysql.com/doc/refman/8.0/en/data-types.html
  // for the `numeric` type: int bigint smallint tinyint float double decimal bit
  // for the `string` type: text longtext mediumtext char tinytext varchar
  // blob longblob mediumblob binary tinyblob varbinary
  // enum set
  // for the `json` type: json
  optional string mysql_type = 2 [ (gogoproto.nullable) = false ];
  optional bool is_primary_key = 3 [ (gogoproto.nullable) = false ];
}

// `Row` stores the actual data of a row.
message Row { repeated Column columns = 1; }

// `MutationType` indicates the DML type.
enum MutationType {
  Insert = 0;
  Update = 1;
  Delete = 2;
}

// `Table` contains mutations in a table.
message Table {
  optional string schema_name = 1;
  optional string table_name = 2;
  repeated ColumnInfo column_info = 3;
  repeated TableMutation mutations = 4;
}

// `TableMutation` stores mutations of a row.
message TableMutation {
  required MutationType type = 1;
  // data after modification
  required Row row = 2;
  // data before modification. It only takes effect when the mutation type is `Update`.
  optional Row change_row = 3;
}

// `DMLData` stores all the mutations caused by DML in a transaction.
message DMLData {
  // `tables` contains all the table changes in the transaction.
  repeated Table tables = 1;
}

// `DDLData` stores the DDL information.
message DDLData {
  // the database currently in use
  optional string schema_name = 1;
  // the related table
  optional string table_name = 2;
  // `ddl_query` is the original DDL statement query.
  optional bytes ddl_query = 3;
}

// `BinlogType` indicates the binlog type, including DML and DDL.
enum BinlogType {
  DML = 0; // Has `dml_data`
  DDL = 1; // Has `ddl_query`
}

// `Binlog` stores all the changes in a transaction. Kafka stores the serialized result of the structured data.
message Binlog {
  optional BinlogType type = 1 [ (gogoproto.nullable) = false ];
  optional int64 commit_ts = 2 [ (gogoproto.nullable) = false ];
  optional DMLData dml_data = 3;
  optional DDLData ddl_data = 4;
}
For the definition of the data format, see binlog.proto.
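To get a concrete feel for this format, the following Go sketch unmarshals the value of one Kafka message into the Binlog message generated from binlog.proto and walks through its content. It is only a sketch: the import path of the generated package is a placeholder (generate the Go code from binlog.proto yourself), the getter and enum names follow standard protoc-gen-go conventions and may differ slightly depending on how you generate the code, and error handling is minimal.

package consumer

import (
	"fmt"

	"github.com/golang/protobuf/proto"

	// Placeholder import path for the Go code generated from binlog.proto;
	// replace it with the package you generate yourself.
	pb "example.com/yourproject/binlog"
)

// decodeBinlog parses the raw value of one Kafka message into a Binlog
// and prints a summary of its content.
func decodeBinlog(value []byte) error {
	binlog := new(pb.Binlog)
	if err := proto.Unmarshal(value, binlog); err != nil {
		return fmt.Errorf("unmarshal binlog: %v", err)
	}

	switch binlog.GetType() {
	case pb.BinlogType_DDL:
		// A DDL binlog carries the original DDL statement.
		fmt.Printf("DDL at commit ts %d: %s\n", binlog.GetCommitTs(), binlog.GetDdlData().GetDdlQuery())
	case pb.BinlogType_DML:
		// A DML binlog groups mutations by table.
		for _, table := range binlog.GetDmlData().GetTables() {
			for _, mutation := range table.GetMutations() {
				fmt.Printf("%s on %s.%s, %d columns\n",
					mutation.GetType(), table.GetSchemaName(), table.GetTableName(),
					len(mutation.GetRow().GetColumns()))
			}
		}
	}
	return nil
}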
Driver
The TiDB-Tools project provides Driver, which is used to read the binlog data in Kafka. It has the following features:
- Read the Kafka data.
- Locate the binlog stored in Kafka based on the commit ts.
You need to configure the following information when using Driver:
- KafkaAddr: the address of the Kafka cluster
- CommitTS: from which commit ts to start reading the binlog
- Offset: from which Kafka offset to start reading data. If CommitTS is set, you do not need to configure this parameter.
- ClusterID: the cluster ID of the TiDB cluster
- Topic: the topic name of Kafka. If Topic is empty, use the default name <ClusterID>_obinlog in Drainer.
To use Driver, import the Driver package in your code, and refer to the example code provided with Driver to learn how to use Driver and parse the binlog data.
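If you want to see what reading the topic looks like at the Kafka level (or cannot use Driver), the sketch below is a hand-rolled consumer built on the sarama Kafka client rather than on Driver itself, so it does not implement Driver's commit ts lookup. The Kafka address and cluster ID are placeholders, the topic name follows the default <ClusterID>_obinlog convention described above, and the sketch assumes the binlog is written to a single partition.

package main

import (
	"fmt"
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	// Placeholder values: point these at your own Kafka cluster and TiDB cluster ID.
	kafkaAddrs := []string{"127.0.0.1:9092"}
	clusterID := "your-tidb-cluster-id"
	topic := clusterID + "_obinlog" // default topic name used by Drainer

	consumer, err := sarama.NewConsumer(kafkaAddrs, nil)
	if err != nil {
		log.Fatalf("create consumer: %v", err)
	}
	defer consumer.Close()

	// Assumes a single partition (0); start from the oldest retained offset.
	pc, err := consumer.ConsumePartition(topic, 0, sarama.OffsetOldest)
	if err != nil {
		log.Fatalf("consume partition: %v", err)
	}
	defer pc.Close()

	for msg := range pc.Messages() {
		// msg.Value holds one serialized Binlog message in the format shown above.
		fmt.Printf("offset %d: %d bytes\n", msg.Offset, len(msg.Value))
		// Decode it with a function such as decodeBinlog from the earlier sketch.
	}
}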
Currently, two examples are provided:
- Using Driver to replicate data to MySQL. This example shows how to convert a binlog to SQL (a simplified sketch of the conversion follows this list).
- Using Driver to print data.
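To give a flavor of what the first example involves, here is a much-simplified sketch that turns one Insert mutation into an INSERT statement, reusing the placeholder generated package from the earlier sketch. It assumes the columns of a row appear in the same order as column_info, handles only a few column variants, and skips value escaping, which a real converter must handle.

package consumer

import (
	"fmt"
	"strconv"
	"strings"

	// Same placeholder package as in the earlier sketch, generated from binlog.proto.
	pb "example.com/yourproject/binlog"
)

// insertSQL builds a naive INSERT statement for one Insert mutation.
func insertSQL(table *pb.Table, row *pb.Row) (string, error) {
	infos := table.GetColumnInfo()
	columns := row.GetColumns()
	if len(infos) != len(columns) {
		return "", fmt.Errorf("column count mismatch: %d column_info vs %d values", len(infos), len(columns))
	}

	var names, values []string
	for i, info := range infos {
		names = append(names, "`"+info.GetName()+"`")
		col := columns[i]
		switch {
		case col.GetIsNull():
			values = append(values, "NULL")
		case col.Int64Value != nil:
			values = append(values, strconv.FormatInt(col.GetInt64Value(), 10))
		case col.Uint64Value != nil:
			values = append(values, strconv.FormatUint(col.GetUint64Value(), 10))
		case col.DoubleValue != nil:
			values = append(values, strconv.FormatFloat(col.GetDoubleValue(), 'g', -1, 64))
		case col.StringValue != nil:
			values = append(values, "'"+col.GetStringValue()+"'") // no escaping in this sketch
		default:
			values = append(values, "NULL") // bytes and other variants omitted in this sketch
		}
	}

	return fmt.Sprintf("INSERT INTO `%s`.`%s` (%s) VALUES (%s)",
		table.GetSchemaName(), table.GetTableName(),
		strings.Join(names, ", "), strings.Join(values, ", ")), nil
}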
- The example code only shows how to use Driver. If you want to use Driver in a production environment, you need to optimize the code.
- Currently, only the Golang version of Driver and the example code are available. If you want to use another language, you need to generate the code file in that language based on the binlog proto file and then develop an application that reads the binlog data from Kafka, parses it, and outputs it downstream. You are also welcome to optimize the example code and submit example code in other languages to TiDB-Tools.