Sink to MySQL
本文档介绍如何使用 Sink to MySQL changefeed,将数据从 TiDB Cloud 流式同步到 MySQL。
限制
- 对于每个 TiDB Cloud 集群,最多可以创建 100 个 changefeed。
- 由于 TiDB Cloud 使用 TiCDC 建立 changefeed,因此具有与 TiCDC 相同的限制。
- 如果需要同步的表没有主键或非空唯一索引,则在某些重试场景下,由于同步过程中缺少唯一约束,可能会导致下游插入重复数据。
前提条件
在创建 changefeed 之前,你需要完成以下前提条件:
- 设置网络连接
- 导出并加载已有数据到 MySQL(可选)
- 如果你不加载已有数据,仅希望将增量数据同步到 MySQL,则需要在 MySQL 中创建对应的目标表
网络
确保你的 TiDB Cloud 集群可以连接到 MySQL service。
如果你的 MySQL service 位于没有公网访问权限的 AWS VPC 中,请按照以下步骤操作:
在 MySQL service 所在 VPC 与 TiDB 集群之间 建立 VPC Peering 连接。
修改 MySQL service 关联的安全组的入站规则。
你必须将 TiDB Cloud 集群所在区域的 CIDR 添加到入站规则中。这样可以允许来自 TiDB 集群到 MySQL 实例的流量。
如果 MySQL URL 包含主机名,你需要允许 TiDB Cloud 能够解析 MySQL service 的 DNS 主机名。
- 按照 为 VPC Peering 连接启用 DNS 解析 中的步骤操作。
- 启用 Accepter DNS resolution 选项。
如果你的 MySQL service 位于没有公网访问权限的 Google Cloud VPC 中,请按照以下步骤操作:
如果你的 MySQL service 是 Google Cloud SQL,必须在 Google Cloud SQL 实例关联的 VPC 中暴露一个 MySQL endpoint。你可能需要使用 Google 提供的 Cloud SQL Auth proxy。
在 MySQL service 所在 VPC 与 TiDB 集群之间 建立 VPC Peering 连接。
修改 MySQL 所在 VPC 的 ingress 防火墙规则。
你必须将 TiDB Cloud 集群所在区域的 CIDR 添加到 ingress 防火墙规则中。这样可以允许来自 TiDB Cloud 集群到 MySQL endpoint 的流量。
私有 endpoint 利用云服务商的 Private Link 或 Private Service Connect 技术,使你 VPC 中的资源能够通过私有 IP 地址连接到其他 VPC 的 service,就像这些 service 直接托管在你的 VPC 内一样。
你可以通过私有 endpoint,将 TiDB Cloud 集群安全地连接到你的 MySQL service。如果你的 MySQL service 尚未启用私有 endpoint,请参考 为 Changefeed 设置私有 endpoint 创建一个。
加载已有数据(可选)
Sink to MySQL 连接器只能将某一时间戳之后的增量数据从 TiDB 集群同步到 MySQL。如果你的 TiDB 集群中已经有数据,可以在启用 Sink to MySQL 之前,将 TiDB 集群的已有数据导出并加载到 MySQL。
加载已有数据的步骤如下:
将 tidb_gc_life_time 设置为大于以下两个操作总耗时的值,以防止这段时间内的历史数据被 TiDB 垃圾回收。
- 导出和导入已有数据所需的时间
- 创建 Sink to MySQL 所需的时间
例如:
SET GLOBAL tidb_gc_life_time = '720h';使用 Dumpling 从 TiDB 集群导出数据,然后使用 mydumper/myloader 等社区工具将数据加载到 MySQL service。
从 Dumpling 导出的文件 中,在元信息文件获取 MySQL sink 的起始位置:
以下是一个元信息文件的示例片段。
SHOW MASTER STATUS的Pos即为已有数据的 TSO,也是 MySQL sink 的起始位置。Started dump at: 2020-11-10 10:40:19 SHOW MASTER STATUS: Log: tidb-binlog Pos: 420747102018863124 Finished dump at: 2020-11-10 10:40:20
在 MySQL 中创建目标表
如果你没有加载已有数据,则需要在 MySQL 中手动创建对应的目标表,用于存储来自 TiDB 的增量数据。否则,数据将无法同步。
创建 MySQL sink
完成前提条件后,你可以将数据同步到 MySQL。
进入目标 TiDB 集群的概览页面,在左侧导航栏点击 Data > Changefeed。
点击 Create Changefeed,并选择 MySQL 作为 Destination。
在 Connectivity Method 中,选择连接 MySQL service 的方法。
- 如果选择 VPC Peering 或 Public IP,请填写你的 MySQL endpoint。
- 如果选择 Private Link,请选择你在 网络 部分创建的私有 endpoint,并填写 MySQL service 的 port。
在 Authentication 中,填写 MySQL service 的用户名和密码。
点击 Next,测试 TiDB 是否能成功连接到 MySQL:
- 如果连接成功,将进入下一步配置。
- 如果连接失败,会显示连接错误,你需要处理该错误。错误解决后,再次点击 Next。
自定义 Table Filter,筛选你希望同步的表。规则语法详见 table filter rules。
- Case Sensitive:你可以设置 filter 规则中数据库和表名的匹配是否大小写敏感。默认情况下,匹配不区分大小写。
- Filter Rules:你可以在此列设置 filter 规则。默认有一条规则
*.*,表示同步所有表。添加新规则时,TiDB Cloud 会查询 TiDB 中的所有表,并在右侧仅显示符合规则的表。最多可添加 100 条 filter 规则。 - Tables with valid keys:此列显示具有有效键(包括主键或唯一索引)的表。
- Tables without valid keys:此列显示缺少主键或唯一键的表。这些表在同步时存在挑战,因为缺少唯一标识,可能导致下游处理重复事件时数据不一致。为保证数据一致性,建议在同步前为这些表添加唯一键或主键,或者通过添加 filter 规则排除这些表。例如,可以通过规则
"!test.tbl1"排除表test.tbl1。
自定义 Event Filter,筛选你希望同步的事件。
- Tables matching:你可以在此列设置 event filter 应用到哪些表。规则语法与前述 Table Filter 区域相同。每个 changefeed 最多可添加 10 条 event filter 规则。
- Event Filter:你可以使用以下 event filter,从 changefeed 中排除特定事件:
- Ignore event:排除指定类型的事件。
- Ignore SQL:排除匹配指定表达式的 DDL 事件。例如,
^drop排除以DROP开头的语句,add column排除包含ADD COLUMN的语句。 - Ignore insert value expression:排除满足特定条件的
INSERT语句。例如,id >= 100排除id大于等于 100 的INSERT语句。 - Ignore update new value expression:排除新值满足指定条件的
UPDATE语句。例如,gender = 'male'排除修改后gender为male的更新。 - Ignore update old value expression:排除旧值满足指定条件的
UPDATE语句。例如,age < 18排除旧值age小于 18 的更新。 - Ignore delete value expression:排除满足指定条件的
DELETE语句。例如,name = 'john'排除name为'john'的删除。
在 Start Replication Position 中,配置 MySQL sink 的起始位置。
- 如果你已通过 Dumpling 加载了已有数据,请选择 Start replication from a specific TSO,并填写从 Dumpling 导出元信息文件中获取的 TSO。
- 如果上游 TiDB 集群中没有任何数据,选择 Start replication from now on。
- 否则,你可以通过选择 Start replication from a specific time 自定义起始时间点。
点击 Next,配置 changefeed 规格。
- 在 Changefeed Specification 区域,指定 changefeed 使用的 Replication Capacity Units (RCUs) 数量。
- 在 Changefeed Name 区域,指定 changefeed 的名称。
点击 Next,检查 changefeed 配置。
如果确认所有配置无误,检查跨区域同步的合规性,并点击 Create。
如果需要修改某些配置,点击 Previous 返回上一步配置页面。
sink 很快会启动,你可以看到 sink 状态从 Creating 变为 Running。
点击 changefeed 名称,可以查看更多 changefeed 详情,如 checkpoint、同步延时、其他统计/指标(信息)等。
如果你已通过 Dumpling 加载了已有数据,在 sink 创建完成后,需要将 GC 时间恢复为原值(默认值为
10m):
SET GLOBAL tidb_gc_life_time = '10m';