Sink to Apache Kafka (Beta)
This document describes how to create a changefeed to stream data from TiDB Cloud Essential to Apache Kafka.
Restrictions
- For each TiDB Cloud Essential cluster, you can create up to 10 changefeeds.
- Currently, TiDB Cloud Essential does not support uploading self-signed TLS certificates to connect to Kafka brokers.
- Because TiDB Cloud Essential uses TiCDC to establish changefeeds, it has the same restrictions as TiCDC.
- If the table to be replicated does not have a primary key or a non-null unique index, the absence of a unique constraint during replication could result in duplicated data being inserted downstream in some retry scenarios.
Prerequisites
Before creating a changefeed to stream data to Apache Kafka, you need to complete the following prerequisites:
- Set up your network connection
- Add permissions for Kafka ACL authorization
Network
Ensure that your TiDB Cloud Essential cluster can connect to the Apache Kafka service. You can choose one of the following connection methods:
- Private Link Connection: recommended for meeting security compliance requirements and ensuring network quality.
- Public Network: suitable for a quick setup.
Private link connections leverage Private Link technologies from cloud providers to enable resources in your VPC to connect to services in other VPCs using private IP addresses, as if those services were hosted directly within your VPC.
TiDB Cloud Essential currently supports Private Link connections only for self-hosted Kafka and Confluent Cloud Dedicated clusters. It does not support direct integration with Amazon MSK or other Kafka SaaS services.
To set up a Private Link connection, follow the guide that matches your Kafka deployment and cloud provider.
If you want to provide public access to your Apache Kafka service, assign public IP addresses or domain names to all your Kafka brokers.
It is not recommended to use public access in a production environment.
Kafka ACL authorization
To allow TiDB Cloud Essential changefeeds to stream data to Apache Kafka and create Kafka topics automatically, ensure that the following permissions are added in Kafka:
- The `Create` and `Write` permissions are added for the topic resource type in Kafka.
- The `DescribeConfigs` permission is added for the cluster resource type in Kafka.
For example, if your Kafka cluster is in Confluent Cloud, refer to Resources and Adding ACLs in the Confluent documentation for more information.
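The required permissions above can be summarized as a small checklist. The following sketch is illustrative only (the data layout is not the API of any Kafka client library); it encodes the documented ACL requirements and flags any that are missing from a granted set:

```python
# Required ACL entries for a TiDB Cloud Essential changefeed, per the list above.
# The (resource_type, operation) tuple layout here is illustrative, not a real
# Kafka client API.
REQUIRED_ACLS = [
    ("topic", "Create"),
    ("topic", "Write"),
    ("cluster", "DescribeConfigs"),
]

def missing_acls(granted):
    """Return the required (resource_type, operation) pairs absent from `granted`."""
    granted_set = set(granted)
    return [acl for acl in REQUIRED_ACLS if acl not in granted_set]
```

For example, a cluster that only grants `Create` on topics would report `Write` and `DescribeConfigs` as missing.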
Step 1. Open the Changefeed page for Apache Kafka
- Log in to the TiDB Cloud console.
- Navigate to the overview page of the target TiDB Cloud Essential cluster, and then click Data > Changefeed in the left navigation pane.
- Click Create Changefeed, and then select Kafka as Destination.
Step 2. Configure the changefeed target
The steps vary depending on the connectivity method you select.
In Connectivity Method, select Public, and fill in your Kafka broker endpoints. You can use commas (`,`) to separate multiple endpoints.

Select an Authentication option according to your Kafka authentication configuration.
- If your Kafka does not require authentication, keep the default option Disable.
- If your Kafka requires authentication, select the corresponding authentication type, and then fill in the user name and password of your Kafka account for authentication.
For Kafka Version, select Kafka v2 or Kafka v3 based on your Kafka version.
Select a Compression type for the data in this changefeed.
Enable the TLS Encryption option if your Kafka has enabled TLS encryption and you want to use TLS encryption for the Kafka connection.
Click Next to test the network connection. If the test succeeds, you will be directed to the next page.
In Connectivity Method, select Private Link.
In Private Link Connection, select the private link connection that you created in the Network section. Make sure the Availability Zones of the private link connection match those of the Kafka deployment.
Fill in the Bootstrap Port that you obtained from the Network section.
Select an Authentication option according to your Kafka authentication configuration.
- If your Kafka does not require authentication, keep the default option Disable.
- If your Kafka requires authentication, select the corresponding authentication type, and then fill in the user name and password of your Kafka account for authentication.
For Kafka Version, select Kafka v2 or Kafka v3 based on your Kafka version.
Select a Compression type for the data in this changefeed.
Enable the TLS Encryption option if your Kafka has enabled TLS encryption and you want to use TLS encryption for the Kafka connection.
If your Kafka requires TLS SNI verification, enter the TLS Server Name. This is required, for example, by Confluent Cloud Dedicated clusters.

Click Next to test the network connection. If the test succeeds, you will be directed to the next page.
Step 3. Set the changefeed
Customize Table Filter to filter the tables that you want to replicate. For the rule syntax, refer to table filter rules.
- Replication Scope: you can choose to only replicate tables with valid keys or replicate all selected tables.
- Filter Rules: you can set filter rules in this column. By default, there is a rule `*.*`, which stands for replicating all tables. When you add a new rule and click Apply, TiDB Cloud queries all the tables in TiDB and displays only the tables that match the rules under Filter results.
- Case Sensitive: you can set whether the matching of database and table names in filter rules is case-sensitive. By default, matching is case-insensitive.
- Filter results with valid keys: this column displays the tables that have valid keys, including primary keys or unique indexes.
- Filter results without valid keys: this column shows tables that lack primary keys or unique keys. These tables present a challenge during replication because the absence of a unique identifier can result in inconsistent data when the downstream handles duplicate events. To ensure data consistency, it is recommended to add unique keys or primary keys to these tables before initiating the replication. Alternatively, you can add filter rules to exclude these tables. For example, you can exclude the table `test.tbl1` by using the rule `"!test.tbl1"`.
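To make the rule behavior concrete, here is a simplified sketch of how filter rules such as `*.*` and `"!test.tbl1"` decide whether a table is replicated. It assumes last-match-wins semantics and case-insensitive matching (the UI default); the real table filter syntax supports more than shown here:

```python
from fnmatch import fnmatch

def table_included(db, table, rules):
    """Simplified sketch of table filter semantics: the last rule that matches
    `db.table` decides, and a rule starting with "!" excludes the table.
    Tables matched by no rule are excluded. Matching is case-insensitive,
    mirroring the Table Filter default."""
    included = False
    for rule in rules:
        negate = rule.startswith("!")
        pattern = rule[1:] if negate else rule
        db_pat, _, tbl_pat = pattern.partition(".")
        if fnmatch(db.lower(), db_pat.lower()) and fnmatch(table.lower(), tbl_pat.lower()):
            included = not negate
    return included
```

With the rules `["*.*", "!test.tbl1"]`, every table is replicated except `test.tbl1`.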
Customize Event Filter to filter the events that you want to replicate.
- Tables matching: you can set which tables the event filter will be applied to in this column. The rule syntax is the same as that used for the preceding Table Filter area.
- Event Filter: you can choose the events you want to ignore.
Customize Column Selector to select columns from events and send only the data changes related to those columns to the downstream.
- Tables matching: specify which tables the column selector applies to. For tables that do not match any rule, all columns are sent.
- Column Selector: specify which columns of the matched tables will be sent to the downstream.
For more information about the matching rules, see Column selectors.
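The effect of a column selector on a single row change can be sketched as follows. The row shape and the `"*"` wildcard handling here are illustrative simplifications of the real matching rules:

```python
def select_columns(row, selected):
    """Sketch of what a column selector does to one row change: only the
    data of the selected columns is sent downstream. `selected` holds
    column names; "*" keeps every column (an illustrative simplification
    of the real column selector syntax)."""
    if "*" in selected:
        return dict(row)
    return {col: val for col, val in row.items() if col in selected}
```

For example, selecting `["id", "name"]` drops any other column from the message sent to Kafka.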
In the Data Format area, select your desired format of Kafka messages.
- Avro is a compact, fast, and binary data format with rich data structures, which is widely used in various flow systems. For more information, see Avro data format.
- Canal-JSON is a plain JSON text format, which is easy to parse. For more information, see Canal-JSON data format.
- Open Protocol is a row-level data change notification protocol that provides data sources for monitoring, caching, full-text indexing, analysis engines, and primary-secondary replication between different databases. For more information, see Open Protocol data format.
- Debezium is a tool for capturing database changes. It converts each captured database change into a message called an "event" and sends these events to Kafka. For more information, see Debezium data format.
Enable the TiDB Extension option if you want to add TiDB-extension fields to the Kafka message body.
For more information about TiDB-extension fields, see TiDB extension fields in Avro data format and TiDB extension fields in Canal-JSON data format.
If you select Avro as your data format, you will see some Avro-specific configurations on the page. You can fill in these configurations as follows:
- In the Decimal and Unsigned BigInt configurations, specify how TiDB Cloud handles the decimal and unsigned bigint data types in Kafka messages.
- In the Schema Registry area, fill in your schema registry endpoint. If you enable HTTP Authentication, enter the user name and password.
In the Topic Distribution area, select a distribution mode, and then fill in the topic name configurations according to the mode.
If you select Avro as your data format, you can only choose the Distribute changelogs by table to Kafka Topics mode in the Distribution Mode drop-down list.
The distribution mode controls how the changefeed creates Kafka topics: one topic per table, one topic per database, or one topic for all changelogs.
Distribute changelogs by table to Kafka Topics
If you want the changefeed to create a dedicated Kafka topic for each table, choose this mode. Then, all Kafka messages of a table are sent to a dedicated Kafka topic. You can customize topic names for tables by setting a topic prefix, a separator between a database name and table name, and a suffix. For example, if you set the separator as `_`, the topic names are in the format of `<Prefix><DatabaseName>_<TableName><Suffix>`.

For changelogs of non-row events, such as Create Schema Event, you can specify a topic name in the Default Topic Name field. The changefeed will create a topic accordingly to collect such changelogs.
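The documented naming scheme is a simple concatenation, which can be sketched directly:

```python
def topic_name(database, table, prefix="", separator="_", suffix=""):
    """Builds a per-table topic name in the documented format
    <Prefix><DatabaseName><Separator><TableName><Suffix>."""
    return f"{prefix}{database}{separator}{table}{suffix}"
```

For example, with the prefix `tidb-` and the default separator `_`, the table `sales.orders` maps to the topic `tidb-sales_orders`.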
Distribute changelogs by database to Kafka Topics
If you want the changefeed to create a dedicated Kafka topic for each database, choose this mode. Then, all Kafka messages of a database are sent to a dedicated Kafka topic. You can customize topic names of databases by setting a topic prefix and a suffix.
For changelogs of non-row events, such as Resolved Ts Event, you can specify a topic name in the Default Topic Name field. The changefeed will create a topic accordingly to collect such changelogs.
Send all changelogs to one specified Kafka Topic
If you want the changefeed to create one Kafka topic for all changelogs, choose this mode. Then, all Kafka messages in the changefeed will be sent to one Kafka topic. You can define the topic name in the Topic Name field.
In the Partition Distribution area, you can decide which partition a Kafka message will be sent to. You can define a single partition dispatcher for all tables, or different partition dispatchers for different tables. TiDB Cloud provides four types of dispatchers:
Distribute changelogs by primary key or index value to Kafka partition
If you want the changefeed to send Kafka messages of a table to different partitions, choose this distribution method. The primary key or index value of a row changelog will determine which partition the changelog is sent to. Keep the Index Name field empty if you want to use the primary key. This distribution method provides a better partition balance and ensures row-level orderliness.
Distribute changelogs by table to Kafka partition
If you want the changefeed to send Kafka messages of a table to one Kafka partition, choose this distribution method. The table name of a row changelog will determine which partition the changelog is sent to. This distribution method ensures table orderliness but might cause unbalanced partitions.
Distribute changelogs by timestamp to Kafka partition
If you want the changefeed to send Kafka messages to different Kafka partitions randomly, choose this distribution method. The commitTs of a row changelog will determine which partition the changelog is sent to. This distribution method provides a better partition balance and ensures orderliness in each partition. However, multiple changes of a data item might be sent to different partitions and the consumer progress of different consumers might be different, which might cause data inconsistency. Therefore, the consumer needs to sort the data from multiple partitions by commitTs before consuming.
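The consumer-side sorting requirement above can be sketched as a merge of per-partition streams. The `(commit_ts, payload)` message shape is an assumption for illustration, not the actual wire format:

```python
def order_by_commit_ts(partitions):
    """Consumer-side sketch for the timestamp dispatcher: messages consumed
    from multiple partitions must be re-sorted by commitTs before they are
    applied, because multiple changes of one data item may land in different
    partitions. `partitions` is a list of per-partition message lists, each
    message a (commit_ts, payload) tuple already ordered within its partition."""
    merged = [msg for partition in partitions for msg in partition]
    # A stable sort preserves intra-partition order for equal commitTs values.
    merged.sort(key=lambda msg: msg[0])
    return merged
```

Only after this re-ordering does the consumer see changes in commit order across the whole changefeed.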
Distribute changelogs by column value to Kafka partition
If you want the changefeed to send Kafka messages of a table to different partitions, choose this distribution method. The specified column values of a row changelog will determine which partition the changelog is sent to. This distribution method ensures orderliness in each partition and guarantees that the changelog with the same column values is sent to the same partition.
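The guarantee that identical column values always land in the same partition comes from hashing those values. The following is a minimal sketch of that idea; the hash function shown is illustrative and differs from TiCDC's actual hashing:

```python
import hashlib

def partition_for(column_values, partition_count):
    """Sketch of a column-value dispatcher: rows with the same values in the
    dispatch columns always hash to the same partition. The SHA-256-based
    hash here is illustrative, not TiCDC's real algorithm."""
    key = "\x00".join(str(v) for v in column_values).encode()
    digest = int.from_bytes(hashlib.sha256(key).digest()[:8], "big")
    return digest % partition_count
```

Because the mapping is deterministic, all changelogs for rows sharing the dispatch column values stay ordered within one partition.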
In the Topic Configuration area, configure the following numbers. The changefeed will automatically create the Kafka topics according to the numbers.
- Replication Factor: controls how many Kafka servers each Kafka message is replicated to. The valid value ranges from `min.insync.replicas` to the number of Kafka brokers.
- Partition Number: controls how many partitions exist in a topic. The valid value range is `[1, 10 * the number of Kafka brokers]`.
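These two ranges can be checked mechanically. The helper below simply encodes the documented bounds; the parameter names are illustrative:

```python
def validate_topic_config(replication_factor, partition_number,
                          broker_count, min_insync_replicas):
    """Checks the documented valid ranges: replication factor in
    [min.insync.replicas, broker count], partition number in
    [1, 10 * broker count]."""
    ok_rf = min_insync_replicas <= replication_factor <= broker_count
    ok_parts = 1 <= partition_number <= 10 * broker_count
    return ok_rf and ok_parts
```

For example, with 3 brokers and `min.insync.replicas = 2`, a replication factor of 3 and 6 partitions is valid, while a replication factor of 4 is not.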
In the Split Event area, choose whether to split `UPDATE` events into separate `DELETE` and `INSERT` events or keep them as raw `UPDATE` events. For more information, see Split primary or unique key UPDATE events for non-MySQL sinks.

Click Next.
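Conceptually, splitting turns one `UPDATE` event carrying old and new row images into a `DELETE` followed by an `INSERT`. The event shape in this sketch is an assumption for illustration, not the actual message format:

```python
def split_update(event):
    """Sketch of the Split Event option: an UPDATE event carrying both the
    old and new row images becomes a DELETE of the old row followed by an
    INSERT of the new one. The dict-based event shape is illustrative."""
    if event["type"] != "UPDATE":
        return [event]
    return [
        {"type": "DELETE", "row": event["old"]},
        {"type": "INSERT", "row": event["new"]},
    ]
```

Non-`UPDATE` events pass through unchanged, so the transformation can be applied uniformly to the event stream.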
Step 4. Review and create your changefeed
- In the Changefeed Name area, specify a name for the changefeed.
- Review all the changefeed configurations that you set. Click Previous to make changes if necessary.
- If all configurations are correct, click Submit to create the changefeed.