在 AWS 中搭建自托管 Kafka Private Link 服务
本文档介绍如何在 AWS 中为自托管 Kafka 搭建 Private Link 服务,并使其与 TiDB Cloud 协同工作。
其机制如下:
- TiDB Cloud VPC 通过私有端点连接到 Kafka VPC。
- Kafka 客户端需要直接与所有 Kafka broker 通信。
- 每个 Kafka broker 映射到 TiDB Cloud VPC 内端点的唯一端口。
- 利用 Kafka 的 bootstrap 机制和 AWS 资源实现端口映射。
下图展示了该机制。

本文以在 AWS 三个可用区(AZ)部署的 Kafka Private Link 服务为例进行说明。你也可以基于类似的端口映射原理进行其他配置,但本文档覆盖了 Kafka Private Link 服务的基础搭建流程。对于生产环境,建议搭建更具弹性、具备更好运维性和可观测性的 Kafka Private Link 服务。
前置条件
确保你拥有在自己的 AWS 账号中搭建 Kafka Private Link 服务的以下权限:
- 管理 EC2 节点
- 管理 VPC
- 管理子网
- 管理安全组
- 管理负载均衡器
- 管理端点服务
- 连接 EC2 节点以配置 Kafka 节点
如果你还没有 TiDB Cloud Dedicated 集群,请创建 TiDB Cloud Dedicated 集群。
从你的 TiDB Cloud Dedicated 集群获取 Kafka 部署信息。
- 在 TiDB Cloud 控制台中,进入 TiDB 集群的集群总览页面,然后点击左侧导航栏的 Data > Changefeed。
- 在总览页面,找到 TiDB 集群所在的 region。确保你的 Kafka 集群也部署在同一区域。
- 点击 Create Changefeed。
- 在 Destination 选择 Kafka。
- 在 Connectivity Method 选择 Private Link。
- 记下 Reminders before proceeding 中 TiDB Cloud AWS 账号的信息。你需要用它来授权 TiDB Cloud 创建 Kafka Private Link 服务的端点。
- 选择 Number of AZs。本例选择 3 AZs。记下你希望部署 Kafka 集群的 AZ ID。如果你想了解 AZ 名称与 AZ ID 的对应关系,请参见 Availability Zone IDs for your AWS resources。
- 为你的 Kafka Private Link 服务输入唯一的 Kafka Advertised Listener Pattern。
- 输入一个唯一的随机字符串,只能包含数字或小写字母。你将用它来生成 Kafka Advertised Listener Pattern。
- 点击 Check usage and generate 检查该随机字符串是否唯一,并生成用于组装 Kafka broker EXTERNAL advertised listener 的 Kafka Advertised Listener Pattern。
请记下所有部署信息,后续配置 Kafka Private Link 服务时需要用到。
下表为部署信息示例。
| 信息 | 值 | 备注 |
|---|---|---|
| Region | Oregon (us-west-2) | N/A |
| Principal of TiDB Cloud AWS Account | arn:aws:iam::<account_id>:root | N/A |
| AZ IDs |
| 将 AZ ID 与 AWS 账号中的 AZ 名称对应。 示例:
|
| Kafka Advertised Listener Pattern | 唯一随机字符串:abc 为各 AZ 生成的 pattern:
| 将 AZ 名称映射到 AZ 专属 pattern。确保后续在特定 AZ 的 broker 上配置正确的 pattern。
|
步骤 1. 搭建 Kafka 集群
如果你需要部署新集群,请参考 部署新 Kafka 集群。
如果你需要暴露已有集群,请参考 重配置运行中的 Kafka 集群。
部署新 Kafka 集群
1. 搭建 Kafka VPC
Kafka VPC 需要满足以下要求:
- 为每个 AZ 分别创建三个 broker 专用私有子网。
- 在任意 AZ 创建一个带有堡垒机节点的公有子网,该节点可连接互联网和三个私有子网,便于搭建 Kafka 集群。生产环境下你可以有自己的堡垒机节点连接 Kafka VPC。
在创建子网前,请根据 AZ ID 与 AZ 名称的映射关系创建子网。以如下映射为例:
usw2-az1=>us-west-2ausw2-az2=>us-west-2cusw2-az3=>us-west-2b
在以下 AZ 创建私有子网:
us-west-2aus-west-2cus-west-2b
按以下步骤创建 Kafka VPC。
1.1. 创建 Kafka VPC
进入 AWS 控制台 > VPC 控制面板,切换到你希望部署 Kafka 的 region。
点击 Create VPC,在 VPC settings 页面填写如下信息:
- 选择 VPC only。
- 在 Name tag 输入标签,例如
Kafka VPC。 - 选择 IPv4 CIDR manual input,输入 IPv4 CIDR,例如
10.0.0.0/16。 - 其他选项保持默认,点击 Create VPC。
- 在 VPC 详情页记下 VPC ID,例如
vpc-01f50b790fa01dffa。
1.2. 在 Kafka VPC 中创建私有子网
进入 Subnets 列表页面。
点击 Create subnet。
选择之前记下的 VPC ID(本例为
vpc-01f50b790fa01dffa)。添加三个子网,建议在子网名称中包含 AZ ID,便于后续配置 broker,因为 TiDB Cloud 要求在 broker 的
advertised.listener配置中编码 AZ ID。us-west-2a的子网- Subnet name:
broker-usw2-az1 - Availability Zone:
us-west-2a - IPv4 subnet CIDR block:
10.0.0.0/18
- Subnet name:
us-west-2c的子网- Subnet name:
broker-usw2-az2 - Availability Zone:
us-west-2c - IPv4 subnet CIDR block:
10.0.64.0/18
- Subnet name:
us-west-2b的子网- Subnet name:
broker-usw2-az3 - Availability Zone:
us-west-2b - IPv4 subnet CIDR block:
10.0.128.0/18
- Subnet name:
点击 Create subnet,进入 Subnets Listing 页面。
1.3. 在 Kafka VPC 中创建公有子网
点击 Create subnet。
选择之前记下的 VPC ID(本例为
vpc-01f50b790fa01dffa)。在任意 AZ 添加公有子网,填写如下信息:
- Subnet name:
bastion - IPv4 subnet CIDR block:
10.0.192.0/18
- Subnet name:
将 bastion 子网配置为公有子网。
进入 VPC 控制台 > Internet gateways,创建名为
kafka-vpc-igw的 Internet Gateway。在 Internet gateways Detail 页面,点击 Actions 下的 Attach to VPC,将 Internet Gateway 绑定到 Kafka VPC。
进入 VPC 控制台 > Route tables,在 Kafka VPC 中创建路由表并添加如下路由:
- Name:
kafka-vpc-igw-route-table - VPC:
Kafka VPC - Route:
- Destination:
0.0.0.0/0 - Target:
Internet Gateway,kafka-vpc-igw
- Destination:
- Name:
将路由表关联到 bastion 子网。在路由表详情页点击 Subnet associations > Edit subnet associations,添加 bastion 子网并保存。
2. 搭建 Kafka broker
2.1. 创建堡垒机节点
进入 EC2 列表页面,在 bastion 子网中创建堡垒机节点。
Name:
bastion-nodeAmazon Machine Image:
Amazon linuxInstance Type:
t2.smallKey pair:
kafka-vpc-key-pair。新建名为kafka-vpc-key-pair的密钥对,并下载 kafka-vpc-key-pair.pem 以便后续配置。网络设置
- VPC:
Kafka VPC - Subnet:
bastion - Auto-assign public IP:
Enable - Security Group: 新建安全组,允许任意来源 SSH 登录。生产环境下可收紧规则以提升安全性。
- VPC:
2.2. 创建 broker 节点
进入 EC2 列表页面,在各 broker 子网中分别创建三个 broker 节点,每个 AZ 一个。
broker-usw2-az1子网的 Broker 1Name:
broker-node1Amazon Machine Image:
Amazon linuxInstance Type:
t2.largeKey pair: 复用
kafka-vpc-key-pair网络设置
- VPC:
Kafka VPC - Subnet:
broker-usw2-az1 - Auto-assign public IP:
Disable - Security Group: 新建安全组,允许来自 Kafka VPC 的所有 TCP。生产环境下可收紧规则。
- Protocol:
TCP - Port range:
0 - 65535 - Source:
10.0.0.0/16
- Protocol:
- VPC:
broker-usw2-az2子网的 Broker 2Name:
broker-node2Amazon Machine Image:
Amazon linuxInstance Type:
t2.largeKey pair: 复用
kafka-vpc-key-pair网络设置
- VPC:
Kafka VPC - Subnet:
broker-usw2-az2 - Auto-assign public IP:
Disable - Security Group: 新建安全组,允许来自 Kafka VPC 的所有 TCP。生产环境下可收紧规则。
- Protocol:
TCP - Port range:
0 - 65535 - Source:
10.0.0.0/16
- Protocol:
- VPC:
broker-usw2-az3子网的 Broker 3Name:
broker-node3Amazon Machine Image:
Amazon linuxInstance Type:
t2.largeKey pair: 复用
kafka-vpc-key-pair网络设置
- VPC:
Kafka VPC - Subnet:
broker-usw2-az3 - Auto-assign public IP:
Disable - Security Group: 新建安全组,允许来自 Kafka VPC 的所有 TCP。生产环境下可收紧规则。
- Protocol:
TCP - Port range:
0 - 65535 - Source:
10.0.0.0/16
- Protocol:
- VPC:
2.3. 准备 Kafka 运行时二进制文件
进入堡垒机节点详情页,获取 Public IPv4 address。使用 SSH 登录该节点,并用之前下载的
kafka-vpc-key-pair.pem。chmod 400 kafka-vpc-key-pair.pem ssh -i "kafka-vpc-key-pair.pem" ec2-user@{bastion_public_ip} # 将 {bastion_public_ip} 替换为你的堡垒机节点 IP,例如 54.186.149.187 scp -i "kafka-vpc-key-pair.pem" kafka-vpc-key-pair.pem ec2-user@{bastion_public_ip}:~/下载二进制文件。
# 下载 Kafka 和 OpenJDK 并解压。你可以根据需要选择二进制版本。 wget https://archive.apache.org/dist/kafka/3.7.1/kafka_2.13-3.7.1.tgz tar -zxf kafka_2.13-3.7.1.tgz wget https://download.java.net/java/GA/jdk22.0.2/c9ecb94cd31b495da20a27d4581645e8/9/GPL/openjdk-22.0.2_linux-x64_bin.tar.gz tar -zxf openjdk-22.0.2_linux-x64_bin.tar.gz将二进制文件拷贝到每个 broker 节点。
# 将 {broker-node1-ip} 替换为你的 broker-node1 IP scp -i "kafka-vpc-key-pair.pem" kafka_2.13-3.7.1.tgz ec2-user@{broker-node1-ip}:~/ ssh -i "kafka-vpc-key-pair.pem" ec2-user@{broker-node1-ip} "tar -zxf kafka_2.13-3.7.1.tgz" scp -i "kafka-vpc-key-pair.pem" openjdk-22.0.2_linux-x64_bin.tar.gz ec2-user@{broker-node1-ip}:~/ ssh -i "kafka-vpc-key-pair.pem" ec2-user@{broker-node1-ip} "tar -zxf openjdk-22.0.2_linux-x64_bin.tar.gz" # 将 {broker-node2-ip} 替换为你的 broker-node2 IP scp -i "kafka-vpc-key-pair.pem" kafka_2.13-3.7.1.tgz ec2-user@{broker-node2-ip}:~/ ssh -i "kafka-vpc-key-pair.pem" ec2-user@{broker-node2-ip} "tar -zxf kafka_2.13-3.7.1.tgz" scp -i "kafka-vpc-key-pair.pem" openjdk-22.0.2_linux-x64_bin.tar.gz ec2-user@{broker-node2-ip}:~/ ssh -i "kafka-vpc-key-pair.pem" ec2-user@{broker-node2-ip} "tar -zxf openjdk-22.0.2_linux-x64_bin.tar.gz" # 将 {broker-node3-ip} 替换为你的 broker-node3 IP scp -i "kafka-vpc-key-pair.pem" kafka_2.13-3.7.1.tgz ec2-user@{broker-node3-ip}:~/ ssh -i "kafka-vpc-key-pair.pem" ec2-user@{broker-node3-ip} "tar -zxf kafka_2.13-3.7.1.tgz" scp -i "kafka-vpc-key-pair.pem" openjdk-22.0.2_linux-x64_bin.tar.gz ec2-user@{broker-node3-ip}:~/ ssh -i "kafka-vpc-key-pair.pem" ec2-user@{broker-node3-ip} "tar -zxf openjdk-22.0.2_linux-x64_bin.tar.gz"
2.4. 在每个 broker 节点上搭建 Kafka 节点
2.4.1 搭建三节点 KRaft Kafka 集群
每个节点同时作为 broker 和 controller。对每个 broker 执行以下操作:
对于
listeners配置项,所有三个 broker 都相同,均作为 broker 和 controller 角色:- 对所有 controller 角色节点配置相同的 CONTROLLER listener。如果只添加 broker 角色节点,则无需在
server.properties中配置 CONTROLLER listener。 - 配置两个 broker listener,
INTERNAL用于内部访问,EXTERNAL用于 TiDB Cloud 的外部访问。
- 对所有 controller 角色节点配置相同的 CONTROLLER listener。如果只添加 broker 角色节点,则无需在
对于
advertised.listeners配置项,操作如下:为每个 broker 配置 INTERNAL advertised listener,使用 broker 节点的内网 IP。内部 Kafka 客户端通过该地址访问 broker。
为每个 broker 节点配置基于 TiDB Cloud 获取的 Kafka Advertised Listener Pattern 的 EXTERNAL advertised listener,帮助 TiDB Cloud 区分不同 broker。不同的 EXTERNAL advertised listener 使 TiDB Cloud 的 Kafka 客户端能够路由到正确的 broker。
<port>用于区分来自 Kafka Private Link Service 访问点的不同 broker。为所有 broker 的 EXTERNAL advertised listener 规划一个端口区间。这些端口不必是 broker 实际监听的端口,而是 Private Link Service 负载均衡器监听的端口,会转发到不同 broker。- Kafka Advertised Listener Pattern 中的
AZ ID表示 broker 部署的可用区。TiDB Cloud 会根据 AZ ID 路由到不同的端点 DNS 名称。
建议为不同 broker 配置不同的 broker ID,便于排查问题。
规划值如下:
- CONTROLLER 端口:
29092 - INTERNAL 端口:
9092 - EXTERNAL:
39092 - EXTERNAL advertised listener 端口区间:
9093~9095
- CONTROLLER 端口:
2.4.2. 创建配置文件
使用 SSH 登录每个 broker 节点,创建配置文件 ~/config/server.properties,内容如下。
# brokers in usw2-az1
# broker-node1 ~/config/server.properties
# 1. 将 {broker-node1-ip}, {broker-node2-ip}, {broker-node3-ip} 替换为实际 IP。
# 2. 按 "前置条件" 部分的 "Kafka Advertised Listener Pattern" 配置 "advertised.listeners" 中的 EXTERNAL。
# 2.1 AZ(ID: usw2-az1) 的 pattern 是 "<broker_id>.usw2-az1.abc.us-west-2.aws.3199015.tidbcloud.com:<port>"。
# 2.2 所以 EXTERNAL 可以为 "b1.usw2-az1.abc.us-west-2.aws.3199015.tidbcloud.com:9093"。<broker_id> 用 "b" 前缀加 "node.id",<port> 用 EXTERNAL advertised listener 端口区间内唯一端口(9093)。
# 2.3 如果同一 AZ 有更多 broker 角色节点,也可同样配置。
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@{broker-node1-ip}:29092,2@{broker-node2-ip}:29092,3@{broker-node3-ip}:29092
listeners=INTERNAL://0.0.0.0:9092,CONTROLLER://0.0.0.0:29092,EXTERNAL://0.0.0.0:39092
inter.broker.listener.name=INTERNAL
advertised.listeners=INTERNAL://{broker-node1-ip}:9092,EXTERNAL://b1.usw2-az1.abc.us-west-2.aws.3199015.tidbcloud.com:9093
controller.listener.names=CONTROLLER
listener.security.protocol.map=INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
log.dirs=./data
# brokers in usw2-az2
# broker-node2 ~/config/server.properties
# 1. 将 {broker-node1-ip}, {broker-node2-ip}, {broker-node3-ip} 替换为实际 IP。
# 2. 按 "前置条件" 部分的 "Kafka Advertised Listener Pattern" 配置 "advertised.listeners" 中的 EXTERNAL。
# 2.1 AZ(ID: usw2-az2) 的 pattern 是 "<broker_id>.usw2-az2.abc.us-west-2.aws.3199015.tidbcloud.com:<port>"。
# 2.2 所以 EXTERNAL 可以为 "b2.usw2-az2.abc.us-west-2.aws.3199015.tidbcloud.com:9094"。<broker_id> 用 "b" 前缀加 "node.id",<port> 用 EXTERNAL advertised listener 端口区间内唯一端口(9094)。
# 2.3 如果同一 AZ 有更多 broker 角色节点,也可同样配置。
process.roles=broker,controller
node.id=2
controller.quorum.voters=1@{broker-node1-ip}:29092,2@{broker-node2-ip}:29092,3@{broker-node3-ip}:29092
listeners=INTERNAL://0.0.0.0:9092,CONTROLLER://0.0.0.0:29092,EXTERNAL://0.0.0.0:39092
inter.broker.listener.name=INTERNAL
advertised.listeners=INTERNAL://{broker-node2-ip}:9092,EXTERNAL://b2.usw2-az2.abc.us-west-2.aws.3199015.tidbcloud.com:9094
controller.listener.names=CONTROLLER
listener.security.protocol.map=INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
log.dirs=./data
# brokers in usw2-az3
# broker-node3 ~/config/server.properties
# 1. 将 {broker-node1-ip}, {broker-node2-ip}, {broker-node3-ip} 替换为实际 IP。
# 2. 按 "前置条件" 部分的 "Kafka Advertised Listener Pattern" 配置 "advertised.listeners" 中的 EXTERNAL。
# 2.1 AZ(ID: usw2-az3) 的 pattern 是 "<broker_id>.usw2-az3.abc.us-west-2.aws.3199015.tidbcloud.com:<port>"。
# 2.2 所以 EXTERNAL 可以为 "b3.usw2-az3.abc.us-west-2.aws.3199015.tidbcloud.com:9095"。<broker_id> 用 "b" 前缀加 "node.id",<port> 用 EXTERNAL advertised listener 端口区间内唯一端口(9095)。
# 2.3 如果同一 AZ 有更多 broker 角色节点,也可同样配置。
process.roles=broker,controller
node.id=3
controller.quorum.voters=1@{broker-node1-ip}:29092,2@{broker-node2-ip}:29092,3@{broker-node3-ip}:29092
listeners=INTERNAL://0.0.0.0:9092,CONTROLLER://0.0.0.0:29092,EXTERNAL://0.0.0.0:39092
inter.broker.listener.name=INTERNAL
advertised.listeners=INTERNAL://{broker-node3-ip}:9092,EXTERNAL://b3.usw2-az3.abc.us-west-2.aws.3199015.tidbcloud.com:9095
controller.listener.names=CONTROLLER
listener.security.protocol.map=INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
log.dirs=./data
2.4.3 启动 Kafka broker
创建脚本并在每个 broker 节点执行以启动 Kafka broker。
#!/bin/bash
# 获取当前脚本目录
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
# 设置 JAVA_HOME
export JAVA_HOME="$SCRIPT_DIR/jdk-22.0.2"
# 定义变量
KAFKA_DIR="$SCRIPT_DIR/kafka_2.13-3.7.1/bin"
KAFKA_STORAGE_CMD=$KAFKA_DIR/kafka-storage.sh
KAFKA_START_CMD=$KAFKA_DIR/kafka-server-start.sh
KAFKA_DATA_DIR=$SCRIPT_DIR/data
KAFKA_LOG_DIR=$SCRIPT_DIR/log
KAFKA_CONFIG_DIR=$SCRIPT_DIR/config
# 清理步骤,便于多次实验
# 查找所有 Kafka 进程
KAFKA_PIDS=$(ps aux | grep 'kafka.Kafka' | grep -v grep | awk '{print $2}')
if [ -z "$KAFKA_PIDS" ]; then
echo "No Kafka processes are running."
else
# 杀死每个 Kafka 进程
echo "Killing Kafka processes with PIDs: $KAFKA_PIDS"
for PID in $KAFKA_PIDS; do
kill -9 $PID
echo "Killed Kafka process with PID: $PID"
done
echo "All Kafka processes have been killed."
fi
rm -rf $KAFKA_DATA_DIR
mkdir -p $KAFKA_DATA_DIR
rm -rf $KAFKA_LOG_DIR
mkdir -p $KAFKA_LOG_DIR
# Magic id: BRl69zcmTFmiPaoaANybiw, 你可以自定义
$KAFKA_STORAGE_CMD format -t "BRl69zcmTFmiPaoaANybiw" -c "$KAFKA_CONFIG_DIR/server.properties" > $KAFKA_LOG_DIR/server_format.log
LOG_DIR=$KAFKA_LOG_DIR nohup $KAFKA_START_CMD "$KAFKA_CONFIG_DIR/server.properties" &
2.5. 在堡垒机节点测试集群设置
测试 Kafka bootstrap。
export JAVA_HOME=/home/ec2-user/jdk-22.0.2 # 从 INTERNAL listener 启动 ./kafka_2.13-3.7.1/bin/kafka-broker-api-versions.sh --bootstrap-server {one_of_broker_ip}:9092 | grep 9092 # 期望输出(实际顺序可能不同) {broker-node1-ip}:9092 (id: 1 rack: null) -> ( {broker-node2-ip}:9092 (id: 2 rack: null) -> ( {broker-node3-ip}:9092 (id: 3 rack: null) -> ( # 从 EXTERNAL listener 启动 ./kafka_2.13-3.7.1/bin/kafka-broker-api-versions.sh --bootstrap-server {one_of_broker_ip}:39092 # 最后 3 行期望输出(实际顺序可能不同) # 与 "从 INTERNAL listener 启动" 的区别在于,因 advertised listener 在 Kafka VPC 内无法解析,可能出现异常或错误。 # 我们会在 TiDB Cloud 侧使其可解析,并在你通过 Private Link 创建 changefeed 连接该 Kafka 集群时路由到正确的 broker。 b1.usw2-az1.abc.us-west-2.aws.3199015.tidbcloud.com:9093 (id: 1 rack: null) -> ERROR: org.apache.kafka.common.errors.DisconnectException b2.usw2-az2.abc.us-west-2.aws.3199015.tidbcloud.com:9094 (id: 2 rack: null) -> ERROR: org.apache.kafka.common.errors.DisconnectException b3.usw2-az3.abc.us-west-2.aws.3199015.tidbcloud.com:9095 (id: 3 rack: null) -> ERROR: org.apache.kafka.common.errors.DisconnectException在堡垒机节点创建生产者脚本
produce.sh。#!/bin/bash BROKER_LIST=$1 # "{broker_address1},{broker_address2}..." # 获取当前脚本目录 SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" # 设置 JAVA_HOME export JAVA_HOME="$SCRIPT_DIR/jdk-22.0.2" # 定义 Kafka 目录 KAFKA_DIR="$SCRIPT_DIR/kafka_2.13-3.7.1/bin" TOPIC="test-topic" # 如果不存在则创建 topic create_topic() { echo "Creating topic if it does not exist..." $KAFKA_DIR/kafka-topics.sh --create --topic $TOPIC --bootstrap-server $BROKER_LIST --if-not-exists --partitions 3 --replication-factor 3 } # 向 topic 生产消息 produce_messages() { echo "Producing messages to the topic..." for ((chrono=1; chrono <= 10; chrono++)); do message="Test message "$chrono echo "Create "$message echo $message | $KAFKA_DIR/kafka-console-producer.sh --broker-list $BROKER_LIST --topic $TOPIC done } create_topic produce_messages在堡垒机节点创建消费者脚本
consume.sh。#!/bin/bash BROKER_LIST=$1 # "{broker_address1},{broker_address2}..." # 获取当前脚本目录 SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" # 设置 JAVA_HOME export JAVA_HOME="$SCRIPT_DIR/jdk-22.0.2" # 定义 Kafka 目录 KAFKA_DIR="$SCRIPT_DIR/kafka_2.13-3.7.1/bin" TOPIC="test-topic" CONSUMER_GROUP="test-group" # 从 topic 消费消息 consume_messages() { echo "Consuming messages from the topic..." $KAFKA_DIR/kafka-console-consumer.sh --bootstrap-server $BROKER_LIST --topic $TOPIC --from-beginning --timeout-ms 5000 --consumer-property group.id=$CONSUMER_GROUP } consume_messages执行
produce.sh和consume.sh验证 Kafka 集群运行正常。这些脚本也会用于后续网络连通性测试。脚本会用--partitions 3 --replication-factor 3创建 topic,确保三台 broker 都有数据。确保脚本会连接所有三台 broker,以保证网络连通性被测试到。# 测试写消息 ./produce.sh {one_of_broker_ip}:9092# 期望输出 Creating topic if it does not exist... Producing messages to the topic... Create Test message 1 >>Create Test message 2 >>Create Test message 3 >>Create Test message 4 >>Create Test message 5 >>Create Test message 6 >>Create Test message 7 >>Create Test message 8 >>Create Test message 9 >>Create Test message 10# 测试读消息 ./consume.sh {one_of_broker_ip}:9092# 期望输出示例(实际消息顺序可能不同) Consuming messages from the topic... Test message 3 Test message 4 Test message 5 Test message 9 Test message 10 Test message 6 Test message 8 Test message 1 Test message 2 Test message 7 [2024-11-01 08:54:27,547] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$) org.apache.kafka.common.errors.TimeoutException Processed a total of 10 messages
重配置运行中的 Kafka 集群
确保你的 Kafka 集群部署在与 TiDB 集群相同的 region 和 AZ。如果有 broker 在不同 AZ,请将其迁移到正确的 AZ。
1. 为 broker 配置 EXTERNAL listener
以下配置适用于 Kafka KRaft 集群。ZK 模式配置类似。
规划配置变更。
为每个 broker 配置 EXTERNAL listener,用于 TiDB Cloud 的外部访问。选择唯一端口作为 EXTERNAL 端口,例如
39092。为每个 broker 节点配置基于 TiDB Cloud 获取的 Kafka Advertised Listener Pattern 的 EXTERNAL advertised listener,帮助 TiDB Cloud 区分不同 broker。不同的 EXTERNAL advertised listener 使 TiDB Cloud 的 Kafka 客户端能够路由到正确的 broker。
<port>用于区分来自 Kafka Private Link Service 访问点的不同 broker。为所有 broker 的 EXTERNAL advertised listener 规划一个端口区间,例如从9093开始。这些端口不必是 broker 实际监听的端口,而是 Private Link Service 负载均衡器监听的端口,会转发到不同 broker。- Kafka Advertised Listener Pattern 中的
AZ ID表示 broker 部署的可用区。TiDB Cloud 会根据 AZ ID 路由到不同的端点 DNS 名称。
建议为不同 broker 配置不同的 broker ID,便于排查问题。
使用 SSH 登录每个 broker 节点,修改每个 broker 的配置文件如下:
# brokers in usw2-az1 # 添加 EXTERNAL listener listeners=INTERNAL:...,EXTERNAL://0.0.0.0:39092 # 按 "前置条件" 部分的 "Kafka Advertised Listener Pattern" 添加 EXTERNAL advertised listeners # 1. AZ(ID: usw2-az1) 的 pattern 是 "<broker_id>.usw2-az1.abc.us-west-2.aws.3199015.tidbcloud.com:<port>" # 2. 所以 EXTERNAL 可以为 "b1.usw2-az1.abc.us-west-2.aws.3199015.tidbcloud.com:9093",<broker_id> 用 "b" 前缀加 "node.id",<port> 用 EXTERNAL advertised listener 端口区间内唯一端口(9093) advertised.listeners=...,EXTERNAL://b1.usw2-az1.abc.us-west-2.aws.3199015.tidbcloud.com:9093 # 配置 EXTERNAL map listener.security.protocol.map=...,EXTERNAL:PLAINTEXT# brokers in usw2-az2 # 添加 EXTERNAL listener listeners=INTERNAL:...,EXTERNAL://0.0.0.0:39092 # 按 "前置条件" 部分的 "Kafka Advertised Listener Pattern" 添加 EXTERNAL advertised listeners # 1. AZ(ID: usw2-az2) 的 pattern 是 "<broker_id>.usw2-az2.abc.us-west-2.aws.3199015.tidbcloud.com:<port>" # 2. 所以 EXTERNAL 可以为 "b2.usw2-az2.abc.us-west-2.aws.3199015.tidbcloud.com:9094",<broker_id> 用 "b" 前缀加 "node.id",<port> 用 EXTERNAL advertised listener 端口区间内唯一端口(9094) advertised.listeners=...,EXTERNAL://b2.usw2-az2.abc.us-west-2.aws.3199015.tidbcloud.com:9094 # 配置 EXTERNAL map listener.security.protocol.map=...,EXTERNAL:PLAINTEXT# brokers in usw2-az3 # 添加 EXTERNAL listener listeners=INTERNAL:...,EXTERNAL://0.0.0.0:39092 # 按 "前置条件" 部分的 "Kafka Advertised Listener Pattern" 添加 EXTERNAL advertised listeners # 1. AZ(ID: usw2-az3) 的 pattern 是 "<broker_id>.usw2-az3.abc.us-west-2.aws.3199015.tidbcloud.com:<port>" # 2. 所以 EXTERNAL 可以为 "b2.usw2-az3.abc.us-west-2.aws.3199015.tidbcloud.com:9095",<broker_id> 用 "b" 前缀加 "node.id",<port> 用 EXTERNAL advertised listener 端口区间内唯一端口(9095) advertised.listeners=...,EXTERNAL://b3.usw2-az3.abc.us-west-2.aws.3199015.tidbcloud.com:9095 # 配置 EXTERNAL map listener.security.protocol.map=...,EXTERNAL:PLAINTEXT重新配置所有 broker 后,依次重启你的 Kafka broker。
2. 在内网测试 EXTERNAL listener 设置
你可以在 Kafka 客户端节点下载 Kafka 和 OpenJDK。
# 下载 Kafka 和 OpenJDK 并解压。你可以根据需要选择二进制版本。
wget https://archive.apache.org/dist/kafka/3.7.1/kafka_2.13-3.7.1.tgz
tar -zxf kafka_2.13-3.7.1.tgz
wget https://download.java.net/java/GA/jdk22.0.2/c9ecb94cd31b495da20a27d4581645e8/9/GPL/openjdk-22.0.2_linux-x64_bin.tar.gz
tar -zxf openjdk-22.0.2_linux-x64_bin.tar.gz
执行以下脚本测试 bootstrap 是否正常。
export JAVA_HOME=/home/ec2-user/jdk-22.0.2
# 从 EXTERNAL listener 启动
./kafka_2.13-3.7.1/bin/kafka-broker-api-versions.sh --bootstrap-server {one_of_broker_ip}:39092
# 最后 3 行期望输出(实际顺序可能不同)
# 因为 advertised listener 在你的 Kafka 网络内无法解析,会有一些异常或错误。
# 我们会在 TiDB Cloud 侧使其可解析,并在你通过 Private Link 创建 changefeed 连接该 Kafka 集群时路由到正确的 broker。
b1.usw2-az1.abc.us-west-2.aws.3199015.tidbcloud.com:9093 (id: 1 rack: null) -> ERROR: org.apache.kafka.common.errors.DisconnectException
b2.usw2-az2.abc.us-west-2.aws.3199015.tidbcloud.com:9094 (id: 2 rack: null) -> ERROR: org.apache.kafka.common.errors.DisconnectException
b3.usw2-az3.abc.us-west-2.aws.3199015.tidbcloud.com:9095 (id: 3 rack: null) -> ERROR: org.apache.kafka.common.errors.DisconnectException
步骤 2. 将 Kafka 集群暴露为 Private Link 服务
1. 搭建负载均衡器
创建一个网络负载均衡器(NLB),包含四个不同端口的目标组。一个目标组用于 bootstrap,其他分别映射到不同 broker。
- bootstrap 目标组 => 9092 => broker-node1:39092,broker-node2:39092,broker-node3:39092
- broker 目标组 1 => 9093 => broker-node1:39092
- broker 目标组 2 => 9094 => broker-node2:39092
- broker 目标组 3 => 9095 => broker-node3:39092
如果有更多 broker 角色节点,需要添加更多映射。确保 bootstrap 目标组至少有一个节点,建议每个 AZ 各加一个节点以提升可用性。
按以下步骤搭建负载均衡器:
进入 Target groups 创建四个目标组。
Bootstrap 目标组
- Target type:
Instances - Target group name:
bootstrap-target-group - Protocol:
TCP - Port:
9092 - IP address type:
IPv4 - VPC:
Kafka VPC - Health check protocol:
TCP - Register targets:
broker-node1:39092,broker-node2:39092,broker-node3:39092
- Target type:
Broker 目标组 1
- Target type:
Instances - Target group name:
broker-target-group-1 - Protocol:
TCP - Port:
9093 - IP address type:
IPv4 - VPC:
Kafka VPC - Health check protocol:
TCP - Register targets:
broker-node1:39092
- Target type:
Broker 目标组 2
- Target type:
Instances - Target group name:
broker-target-group-2 - Protocol:
TCP - Port:
9094 - IP address type:
IPv4 - VPC:
Kafka VPC - Health check protocol:
TCP - Register targets:
broker-node2:39092
- Target type:
Broker 目标组 3
- Target type:
Instances - Target group name:
broker-target-group-3 - Protocol:
TCP - Port:
9095 - IP address type:
IPv4 - VPC:
Kafka VPC - Health check protocol:
TCP - Register targets:
broker-node3:39092
- Target type:
进入 Load balancers 创建网络负载均衡器。
- Load balancer name:
kafka-lb - Schema:
Internal - Load balancer IP address type:
IPv4 - VPC:
Kafka VPC - Availability Zones:
usw2-az1绑定broker-usw2-az1 subnetusw2-az2绑定broker-usw2-az2 subnetusw2-az3绑定broker-usw2-az3 subnet
- Security groups: 新建安全组,规则如下:
- 入站规则允许来自 Kafka VPC 的所有 TCP:Type -
{ports of target groups},如9092-9095;Source -{TiDB Cloud 的 CIDR}。获取 region 内 TiDB Cloud 的 CIDR,请在 TiDB Cloud 控制台 左上角切换到目标项目,点击 Project Settings > Network Access,再点击 Project CIDR > AWS。 - 出站规则允许所有 TCP 到 Kafka VPC:Type -
All TCP;Destination -Anywhere-IPv4
- 入站规则允许来自 Kafka VPC 的所有 TCP:Type -
- 监听器与路由:
- Protocol:
TCP; Port:9092; Forward to:bootstrap-target-group - Protocol:
TCP; Port:9093; Forward to:broker-target-group-1 - Protocol:
TCP; Port:9094; Forward to:broker-target-group-2 - Protocol:
TCP; Port:9095; Forward to:broker-target-group-3
- Protocol:
- Load balancer name:
在堡垒机节点测试负载均衡器。本例只测试 Kafka bootstrap。由于负载均衡器监听的是 Kafka EXTERNAL listener,EXTERNAL advertised listener 的地址在堡垒机节点无法解析。记下负载均衡器详情页的
kafka-lbDNS 名称,例如kafka-lb-77405fa57191adcb.elb.us-west-2.amazonaws.com。在堡垒机节点执行脚本。# 将 {lb_dns_name} 替换为实际值 export JAVA_HOME=/home/ec2-user/jdk-22.0.2 ./kafka_2.13-3.7.1/bin/kafka-broker-api-versions.sh --bootstrap-server {lb_dns_name}:9092 # 最后 3 行期望输出(实际顺序可能不同) b1.usw2-az1.abc.us-west-2.aws.3199015.tidbcloud.com:9093 (id: 1 rack: null) -> ERROR: org.apache.kafka.common.errors.DisconnectException b2.usw2-az2.abc.us-west-2.aws.3199015.tidbcloud.com:9094 (id: 2 rack: null) -> ERROR: org.apache.kafka.common.errors.DisconnectException b3.usw2-az3.abc.us-west-2.aws.3199015.tidbcloud.com:9095 (id: 3 rack: null) -> ERROR: org.apache.kafka.common.errors.DisconnectException # 你也可以尝试在 9093/9094/9095 端口 bootstrap。由于 AWS 的 NLB 默认将 LB DNS 解析到任意可用区的 IP 并禁用跨区负载均衡,因此有概率成功。 # 如果你在 LB 启用跨区负载均衡,则会成功,但这通常不需要,且可能带来额外的跨 AZ 流量。
2. 搭建 Private Link 服务
进入 Endpoint service,点击 Create endpoint service,为 Kafka 负载均衡器创建 Private Link 服务。
- Name:
kafka-pl-service - Load balancer type:
Network - Load balancers:
kafka-lb - Included Availability Zones:
usw2-az1,usw2-az2,usw2-az3 - Require acceptance for endpoint:
Acceptance required - Enable private DNS name:
No
- Name:
记下 Service name,你需要将其提供给 TiDB Cloud,例如
com.amazonaws.vpce.us-west-2.vpce-svc-0f49e37e1f022cd45。在 kafka-pl-service 详情页,点击 Allow principals 标签页,允许 TiDB Cloud 的 AWS 账号创建端点。你可以在 前置条件 获取 TiDB Cloud 的 AWS 账号,例如
arn:aws:iam::<account_id>:root。
步骤 3. 从 TiDB Cloud 连接
回到 TiDB Cloud 控制台,为 集群 创建 changefeed,通过 Private Link 连接到 Kafka 集群。详细信息参见 Sink to Apache Kafka。
当你进行 Configure the changefeed target > Connectivity Method > Private Link 时,按需填写以下字段及其他字段:
- Kafka Type:
3 AZs。确保你的 Kafka 集群部署在同样的三个 AZ。 - Kafka Advertised Listener Pattern:
abc。与 前置条件 中用于生成 Kafka Advertised Listener Pattern 的唯一随机字符串一致。 - Endpoint Service Name: Kafka 服务名。
- Bootstrap Ports:
9092。单端口即可,因为你已在其后配置了专用的 bootstrap 目标组。
- Kafka Type:
按照 Sink to Apache Kafka 的步骤继续操作。
现在你已成功完成该任务。
FAQ
如何从两个不同的 TiDB Cloud 项目连接同一个 Kafka Private Link 服务?
如果你已按本文档成功完成第一个项目的连接,可以按如下方式让第二个项目也连接同一个 Kafka Private Link 服务:
按本文档从头操作。
当你进行 步骤 1. 搭建 Kafka 集群 时,参考 重配置运行中的 Kafka 集群 新增一组 EXTERNAL listener 和 advertised listener,可以命名为 EXTERNAL2。注意 EXTERNAL2 的端口区间不能与 EXTERNAL 重叠。
重新配置 broker 后,在负载均衡器中新增一组目标组,包括 bootstrap 和 broker 目标组。
配置 TiDB Cloud 连接时填写以下信息:
- 新的 Bootstrap 端口
- 新的 Kafka Advertised Listener Group
- 相同的 Endpoint Service