プライベートリンク接続を介して Alibaba Cloud Self-Hosted Kafka に接続する
このドキュメントでは、 Alibaba Cloud Endpoint Service プライベートリンク接続を使用して、 TiDB Cloud Essential クラスターを Alibaba Cloud 内のセルフホスト型 Kafka クラスターに接続する方法について説明します。
このメカニズムは次のように機能します。
- プライベート リンク接続は、
advertised.listenersで定義されたブローカー外部アドレスを返すブートストラップ ポートを使用して Alibaba Cloud エンドポイント サービスに接続します。 - プライベート リンク接続は、ブローカーの外部アドレスを使用してエンドポイント サービスに接続します。
- Alibaba Cloud エンドポイント サービスは、リクエストをロード バランサーに転送します。
- ロード バランサーは、ポート マッピングに基づいて、対応する Kafka ブローカーにリクエストを転送します。
たとえば、ポート マッピングは次のようになります。
| ブローカー外部アドレスポート | ロードバランサーのリスナーポート | ロードバランサバックエンドサーバー |
|---|---|---|
| 9093 | 9093 | ブローカーノード1:39092 |
| 9094 | 9094 | ブローカーノード2:39092 |
| 9095 | 9095 | ブローカーノード3:39092 |
前提条件
Kafka クラスターがあること、またはクラスターを設定するための次の権限があることを確認します。
- ECSノードを管理する
- VPCとvSwitchを管理する
- ECS ノードに接続して Kafka ノードを構成する
Alibaba Cloud アカウントでロードバランサーとエンドポイント サービスを設定するには、次の権限があることを確認してください。
- ロードバランサーを管理する
- エンドポイントサービスの管理
TiDB Cloud EssentialはAlibaba Cloudでホストされており、アクティブです。後で使用するために、以下の詳細情報を取得して保存してください。
- Alibaba CloudアカウントID
- 可用性ゾーン(AZ)
Alibaba Cloud アカウント ID とアベイラビリティーゾーンを表示するには、次の手順を実行します。
- TiDB Cloudコンソールで、TiDB クラスターのクラスター概要ページに移動し、左側のナビゲーション ペインで[設定] > [ネットワーク]をクリックします。
- [データフローのプライベート リンク接続]領域で、 [プライベート リンク接続の作成] をクリックします。
- 表示されたダイアログで、Alibaba Cloud アカウント ID とアベイラビリティーゾーンを見つけることができます。
次の表は、展開情報の例を示しています。
| 情報 | 価値 | 注記 |
|---|---|---|
| リージョン | ap-southeast-1 | 該当なし |
| TiDB Cloud Alibaba Cloud アカウント | <account_id> | 該当なし |
| AZ ID | ap-southeast-1a ap-southeast-1b ap-southeast-1c | 該当なし |
| Kafka アドバタイズドリスナーパターン | <ブローカーID>.unique_name.alicloud.plc.tidbcloud.com:<ポート> | unique_nameはプレースホルダーであり、 ステップ4の実際の値に置き換えられます。 |
ステップ1. Kafkaクラスターをセットアップする
新しいクラスターをデプロイする必要がある場合は、 新しいKafkaクラスターをデプロイ手順に従ってください。
既存のクラスターを公開する必要がある場合は、 実行中の Kafka クラスターを再構成する手順に従ってください。
新しいKafkaクラスターをデプロイ
1. Kafka VPC を設定する
Kafka VPC には次のものが必要です。
- ブローカー用のプライベート vSwitch が 3 つ (AZ ごとに 1 つ)。
- 任意の AZ にパブリック vSwitch を 1 つ、インターネットに接続できる Bastion ノードを 1 つ、プライベート vSwitch を 3 つ配置することで、Kafka クラスターを簡単にセットアップできます。本番環境では、Kafka VPC に接続できる独自の Bastion ノードを配置することもできます。
Kafka VPC を作成するには、次の手順を実行します。
1.1. Kafka VPCを作成する
Alibaba Cloud コンソール > VPC ダッシュボードに進み、Kafka をデプロイするリージョンに切り替えます。
「VPCの作成」をクリックします。VPC設定ページで以下の情報を入力します。
名前を入力します (例:
Kafka VPC)。TiDB Cloudでプライベート リンク接続を設定するリージョンを選択します。
[IPv4 CIDR ブロックを手動で入力]を選択し、IPv4 CIDR (例:
10.0.0.0/16を入力します。Kafkaブローカーをデプロイする各AZごとにvSwitchを作成し、IPv4 CIDRを設定します。例:
- broker-ap-southeast-1a vSwitch
ap-southeast-1a: 10.0.0.0/18 - broker-ap-southeast-1b vSwitch
ap-southeast-1b: 10.0.64.0/18 - broker-ap-southeast-1c vSwitch
ap-southeast-1c: 10.0.128.0/18 ap-southeast-1aの bastion vSwitch: 10.0.192.0/18
- broker-ap-southeast-1a vSwitch
その他のオプションはデフォルト値を使用します。 「OK」をクリックします。
VPC の詳細ページで、VPC ID (例:
vpc-t4nfx2vcqazc862e9fg06) をメモします。
2. Kafkaブローカーを設定する
2.1. 要塞ノードを作成する
ECSコンソールに進みます。要塞 vSwitch に要塞ノードを作成します。
- ネットワークとゾーン:
Kafka VPCおよびbastionvSwitch。 - インスタンスとイメージ: インスタンス タイプが
ecs.t5-lc1m2.small、イメージがAlibaba Cloud Linux。 - ネットワークとSecurityグループ:
Assign Public IPv4 Addressを選択します。 - キーペア:
kafka-vpc-key-pair。kafka-vpc-key-pairという名前の新しいキーペアを作成します。kafka-vpc-key-pair.pemローカルマシンにダウンロードして、後で設定します。 - Securityグループ:どこからでもSSHログインを許可する新しいセキュリティグループを作成します。本番環境の安全性を確保するために、ルールを絞り込むことができます。
- インスタンス名:
bastion-node。
2.2. ブローカーノードを作成する
ECSコンソールに進みます。vSwitch に 3 つのブローカー ノード (AZ ごとに 1 つ) を作成します。
vSwitch 1 のブローカー
broker-ap-southeast-1a- ネットワークとゾーン:
Kafka VPCおよびbroker-ap-southeast-1avSwitch - インスタンスとイメージ:
ecs.t5-lc1m2.smallインスタンスタイプとAlibaba Cloud Linuxイメージ - キーペア:再利用
kafka-vpc-key-pair。 - インスタンス名:
broker-node1 - Securityグループ: Kafka VPCからのすべてのTCPを許可する新しいセキュリティグループを作成します。本番環境では、安全性を考慮してルールを絞り込むことができます。インバウンドルール: -プロトコル:
TCP-ポート範囲:All-ソース:10.0.0.0/16
- ネットワークとゾーン:
vSwitch
broker-ap-southeast-1bのブローカー 2- ネットワークとゾーン:
Kafka VPCおよびbroker-ap-southeast-1bvSwitch - インスタンスとイメージ:
ecs.t5-lc1m2.smallインスタンスタイプとAlibaba Cloud Linuxイメージ - キーペア:再利用
kafka-vpc-key-pair。 - インスタンス名:
broker-node2 - Securityグループ: Kafka VPCからのすべてのTCPを許可する新しいセキュリティグループを作成します。本番環境では、安全性を考慮してルールを絞り込むことができます。インバウンドルール: -プロトコル:
TCP-ポート範囲:All-ソース:10.0.0.0/16
- ネットワークとゾーン:
vSwitch
broker-ap-southeast-1cのブローカー 3- ネットワークとゾーン:
Kafka VPCおよびbroker-ap-southeast-1cvSwitch - インスタンスとイメージ:
ecs.t5-lc1m2.smallインスタンスタイプとAlibaba Cloud Linuxイメージ - キーペア:再利用
kafka-vpc-key-pair。 - インスタンス名:
broker-node3 - Securityグループ: Kafka VPCからのすべてのTCPを許可する新しいセキュリティグループを作成します。本番環境では、安全性を考慮してルールを絞り込むことができます。インバウンドルール: -プロトコル:
TCP-ポート範囲:All-ソース:10.0.0.0/16
- ネットワークとゾーン:
2.3. Kafkaランタイムバイナリの準備
要塞ノードの詳細ページに移動します。パブリックIPv4アドレスを取得します。SSHを使用して、先ほどダウンロードした
kafka-vpc-key-pair.pemを使用してノードにログインします。chmod 400 kafka-vpc-key-pair.pem scp -i "kafka-vpc-key-pair.pem" kafka-vpc-key-pair.pem root@{bastion_public_ip}:~/ # replace {bastion_public_ip} with the IP address of your bastion node ssh -i "kafka-vpc-key-pair.pem" root@{bastion_public_ip}バイナリを要塞ノードにダウンロードします。
# Download Kafka and OpenJDK, and then extract the files. You can choose the binary version based on your preference. wget https://archive.apache.org/dist/kafka/3.7.1/kafka_2.13-3.7.1.tgz tar -zxf kafka_2.13-3.7.1.tgz wget https://download.java.net/java/GA/jdk22.0.2/c9ecb94cd31b495da20a27d4581645e8/9/GPL/openjdk-22.0.2_linux-x64_bin.tar.gz tar -zxf openjdk-22.0.2_linux-x64_bin.tar.gz要塞ノードから各ブローカー ノードにバイナリをコピーします。
# Replace {broker-node1-ip} with your broker-node1 IP address scp -i "kafka-vpc-key-pair.pem" kafka_2.13-3.7.1.tgz root@{broker-node1-ip}:~/ ssh -i "kafka-vpc-key-pair.pem" root@{broker-node1-ip} "tar -zxf kafka_2.13-3.7.1.tgz" scp -i "kafka-vpc-key-pair.pem" openjdk-22.0.2_linux-x64_bin.tar.gz root@{broker-node1-ip}:~/ ssh -i "kafka-vpc-key-pair.pem" root@{broker-node1-ip} "tar -zxf openjdk-22.0.2_linux-x64_bin.tar.gz" # Replace {broker-node2-ip} with your broker-node2 IP address scp -i "kafka-vpc-key-pair.pem" kafka_2.13-3.7.1.tgz root@{broker-node2-ip}:~/ ssh -i "kafka-vpc-key-pair.pem" root@{broker-node2-ip} "tar -zxf kafka_2.13-3.7.1.tgz" scp -i "kafka-vpc-key-pair.pem" openjdk-22.0.2_linux-x64_bin.tar.gz root@{broker-node2-ip}:~/ ssh -i "kafka-vpc-key-pair.pem" root@{broker-node2-ip} "tar -zxf openjdk-22.0.2_linux-x64_bin.tar.gz" # Replace {broker-node3-ip} with your broker-node3 IP address scp -i "kafka-vpc-key-pair.pem" kafka_2.13-3.7.1.tgz root@{broker-node3-ip}:~/ ssh -i "kafka-vpc-key-pair.pem" root@{broker-node3-ip} "tar -zxf kafka_2.13-3.7.1.tgz" scp -i "kafka-vpc-key-pair.pem" openjdk-22.0.2_linux-x64_bin.tar.gz root@{broker-node3-ip}:~/ ssh -i "kafka-vpc-key-pair.pem" root@{broker-node3-ip} "tar -zxf openjdk-22.0.2_linux-x64_bin.tar.gz"
2.4. 各ブローカーノードにKafkaノードを設定する
2.4.1 3つのノードを持つKRaft Kafkaクラスターをセットアップする
各ノードはブローカーとコントローラーの役割を担います。各ブローカーに対して以下の操作を実行してください。
listeners項目の場合、3 つのブローカーはすべて同じであり、ブローカーとコントローラーのロールとして機能します。- すべてのコントローラーロールノードに同じ CONTROLLER リスナーを設定します。ブローカーロールノードのみを追加する場合は、
server.propertiesの CONTROLLER リスナーは必要ありません。 - ブローカーリスナーを 2 つ構成します。3
INTERNALは内部アクセス用、EXTERNALはTiDB Cloudからの外部アクセス用です。
- すべてのコントローラーロールノードに同じ CONTROLLER リスナーを設定します。ブローカーロールノードのみを追加する場合は、
advertised.listeners項目については、次の操作を行います。各ブローカーに対して、ブローカーノードの内部IPアドレスを使用して、INTERNALアドバタイズリスナーを設定します。アドバタイズされた内部Kafkaクライアントは、このアドレスを使用してブローカーにアクセスします。
TiDB Cloudから取得したKafkaアドバタイズリスナーパターンに基づいて、各ブローカーノードに外部アドバタイズリスナーを設定することで、 TiDB Cloudが複数のブローカーを区別できるようになります。異なる外部アドバタイズリスナーを設定することで、 TiDB CloudのKafkaクライアントはリクエストを適切なブローカーにルーティングできるようになります。
<port>ブローカーと Kafka プライベートリンクサービスのアクセスポイントを区別します。すべてのブローカーの EXTERNAL アドバタイズリスナーのポート範囲を計画してください。これらのポートは、ブローカーが実際にリッスンするポートである必要はありません。これらは、リクエストを別のブローカーに転送するプライベートリンクサービスのロードバランサーがリッスンするポートです。- Kafka アドバタイズドリスナーパターンの
AZ ID、ブローカーがデプロイされている場所を示します。TiDB TiDB Cloud は、 AZ ID に基づいてリクエストを異なるエンドポイント DNS 名にルーティングします。
トラブルシューティングを容易にするために、ブローカーごとに異なるブローカー ID を構成することをお勧めします。
計画値は次のとおりです。
- コントローラーポート:
29092 - 内部ポート:
9092 - 外部:
39092 - 外部アドバタイズリスナーポート範囲:
9093~9095
- コントローラーポート:
2.4.2. 設定ファイルを作成する
SSHを使用してすべてのブローカーノードにログインします。以下の内容の設定ファイル~/config/server.propertiesを作成します。
# brokers in ap-southeast-1a
# broker-node1 ~/config/server.properties
# 1. Replace {broker-node1-ip}, {broker-node2-ip}, {broker-node3-ip} with the actual IP addresses.
# 2. Configure EXTERNAL in "advertised.listeners" based on the "Kafka Advertised Listener Pattern" in the "Prerequisites" section.
# 2.1 The pattern is "<broker_id>.unique_name.alicloud.plc.tidbcloud.com:<port>".
# 2.2 If there are more broker role nodes, you can configure them in the same way.
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@{broker-node1-ip}:29092,2@{broker-node2-ip}:29092,3@{broker-node3-ip}:29092
listeners=INTERNAL://0.0.0.0:9092,CONTROLLER://0.0.0.0:29092,EXTERNAL://0.0.0.0:39092
inter.broker.listener.name=INTERNAL
advertised.listeners=INTERNAL://{broker-node1-ip}:9092,EXTERNAL://b1.unique_name.alicloud.plc.tidbcloud.com:9093
controller.listener.names=CONTROLLER
listener.security.protocol.map=INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
log.dirs=./data
# brokers in ap-southeast-1b
# broker-node2 ~/config/server.properties
# 1. Replace {broker-node1-ip}, {broker-node2-ip}, {broker-node3-ip} with the actual IP addresses.
# 2. Configure EXTERNAL in "advertised.listeners" based on the "Kafka Advertised Listener Pattern" in the "Prerequisites" section.
# 2.1 The pattern is "<broker_id>.unique_name.alicloud.plc.tidbcloud.com:<port>".
# 2.2 If there are more broker role nodes, you can configure them in the same way.
process.roles=broker,controller
node.id=2
controller.quorum.voters=1@{broker-node1-ip}:29092,2@{broker-node2-ip}:29092,3@{broker-node3-ip}:29092
listeners=INTERNAL://0.0.0.0:9092,CONTROLLER://0.0.0.0:29092,EXTERNAL://0.0.0.0:39092
inter.broker.listener.name=INTERNAL
advertised.listeners=INTERNAL://{broker-node2-ip}:9092,EXTERNAL://b2.unique_name.alicloud.plc.tidbcloud.com:9094
controller.listener.names=CONTROLLER
listener.security.protocol.map=INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
log.dirs=./data
# brokers in ap-southeast-1c
# broker-node3 ~/config/server.properties
# 1. Replace {broker-node1-ip}, {broker-node2-ip}, {broker-node3-ip} with the actual IP addresses.
# 2. Configure EXTERNAL in "advertised.listeners" based on the "Kafka Advertised Listener Pattern" in the "Prerequisites" section.
# 2.1 The pattern is "<broker_id>.unique_name.alicloud.plc.tidbcloud.com:<port>".
# 2.2 If there are more broker role nodes, you can configure them in the same way.
process.roles=broker,controller
node.id=3
controller.quorum.voters=1@{broker-node1-ip}:29092,2@{broker-node2-ip}:29092,3@{broker-node3-ip}:29092
listeners=INTERNAL://0.0.0.0:9092,CONTROLLER://0.0.0.0:29092,EXTERNAL://0.0.0.0:39092
inter.broker.listener.name=INTERNAL
advertised.listeners=INTERNAL://{broker-node3-ip}:9092,EXTERNAL://b3.ap-southeast-1c.unique_name.alicloud.plc.tidbcloud.com:9095
controller.listener.names=CONTROLLER
listener.security.protocol.map=INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
log.dirs=./data
2.4.3 Kafkaブローカーを起動する
スクリプトを作成し、それを実行して各ブローカー ノードで Kafka ブローカーを起動します。
#!/bin/bash
# Get the directory of the current script
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
# Set JAVA_HOME to the Java installation within the script directory
export JAVA_HOME="$SCRIPT_DIR/jdk-22.0.2"
# Define the vars
KAFKA_DIR="$SCRIPT_DIR/kafka_2.13-3.7.1/bin"
KAFKA_STORAGE_CMD=$KAFKA_DIR/kafka-storage.sh
KAFKA_START_CMD=$KAFKA_DIR/kafka-server-start.sh
KAFKA_DATA_DIR=$SCRIPT_DIR/data
KAFKA_LOG_DIR=$SCRIPT_DIR/log
KAFKA_CONFIG_DIR=$SCRIPT_DIR/config
# Cleanup step, which makes it easy for multiple experiments
# Find all Kafka process IDs
KAFKA_PIDS=$(ps aux | grep 'kafka.Kafka' | grep -v grep | awk '{print $2}')
if [ -z "$KAFKA_PIDS" ]; then
echo "No Kafka processes are running."
else
# Kill each Kafka process
echo "Killing Kafka processes with PIDs: $KAFKA_PIDS"
for PID in $KAFKA_PIDS; do
kill -9 $PID
echo "Killed Kafka process with PID: $PID"
done
echo "All Kafka processes have been killed."
fi
rm -rf $KAFKA_DATA_DIR
mkdir -p $KAFKA_DATA_DIR
rm -rf $KAFKA_LOG_DIR
mkdir -p $KAFKA_LOG_DIR
# Magic id: BRl69zcmTFmiPaoaANybiw, you can use your own
$KAFKA_STORAGE_CMD format -t "BRl69zcmTFmiPaoaANybiw" -c "$KAFKA_CONFIG_DIR/server.properties" > $KAFKA_LOG_DIR/server_format.log
LOG_DIR=$KAFKA_LOG_DIR nohup $KAFKA_START_CMD "$KAFKA_CONFIG_DIR/server.properties" &
2.5. 要塞ノードでクラスター設定をテストする
Kafka ブートストラップをテストします。
export JAVA_HOME=~/jdk-22.0.2 # Bootstrap from INTERNAL listener ./kafka_2.13-3.7.1/bin/kafka-broker-api-versions.sh --bootstrap-server {one_of_broker_ip}:9092 | grep 9092 # Expected output (the actual order might be different) {broker-node1-ip}:9092 (id: 1 rack: null) -> ( {broker-node2-ip}:9092 (id: 2 rack: null) -> ( {broker-node3-ip}:9092 (id: 3 rack: null) -> ( # Bootstrap from EXTERNAL listener ./kafka_2.13-3.7.1/bin/kafka-broker-api-versions.sh --bootstrap-server {one_of_broker_ip}:39092 # Expected output for the last 3 lines (the actual order might be different) # The difference in the output from "bootstrap from INTERNAL listener" is that exceptions or errors might occur because advertised listeners cannot be resolved in Kafka VPC. # We will make them resolvable on the TiDB Cloud side and route requests to the right broker when you create a changefeed that connects to this Kafka cluster via Private Link. b1.unique_name.alicloud.plc.tidbcloud.com:9093 (id: 1 rack: null) -> ERROR: org.apache.kafka.common.errors.DisconnectException b2.unique_name.alicloud.plc.tidbcloud.com:9094 (id: 2 rack: null) -> ERROR: org.apache.kafka.common.errors.DisconnectException b3.unique_name.alicloud.plc.tidbcloud.com:9095 (id: 3 rack: null) -> ERROR: org.apache.kafka.common.errors.DisconnectException要塞ノードにプロデューサー スクリプト
produce.shを作成します。#!/bin/bash BROKER_LIST=$1 # "{broker_address1},{broker_address2}..." # Get the directory of the current script SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" # Set JAVA_HOME to the Java installation within the script directory export JAVA_HOME="$SCRIPT_DIR/jdk-22.0.2" # Define the Kafka directory KAFKA_DIR="$SCRIPT_DIR/kafka_2.13-3.7.1/bin" TOPIC="test-topic" # Create a topic if it does not exist create_topic() { echo "Creating topic if it does not exist..." $KAFKA_DIR/kafka-topics.sh --create --topic $TOPIC --bootstrap-server $BROKER_LIST --if-not-exists --partitions 3 --replication-factor 3 } # Produce messages to the topic produce_messages() { echo "Producing messages to the topic..." for ((chrono=1; chrono <= 10; chrono++)); do message="Test message "$chrono echo "Create "$message echo $message | $KAFKA_DIR/kafka-console-producer.sh --broker-list $BROKER_LIST --topic $TOPIC done } create_topic produce_messages要塞ノードにコンシューマー スクリプト
consume.shを作成します。#!/bin/bash BROKER_LIST=$1 # "{broker_address1},{broker_address2}..." # Get the directory of the current script SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" # Set JAVA_HOME to the Java installation within the script directory export JAVA_HOME="$SCRIPT_DIR/jdk-22.0.2" # Define the Kafka directory KAFKA_DIR="$SCRIPT_DIR/kafka_2.13-3.7.1/bin" TOPIC="test-topic" CONSUMER_GROUP="test-group" # Consume messages from the topic consume_messages() { echo "Consuming messages from the topic..." $KAFKA_DIR/kafka-console-consumer.sh --bootstrap-server $BROKER_LIST --topic $TOPIC --from-beginning --timeout-ms 5000 --consumer-property group.id=$CONSUMER_GROUP } consume_messagesproduce.shとconsume.shを実行して、Kafkaクラスターが実行中であることを確認してください。これらのスクリプトは、後ほどネットワーク接続テストにも再利用されます。スクリプトは--partitions 3 --replication-factor 3のトピックを作成します。これら3つのブローカーすべてにデータが含まれていることを確認してください。ネットワーク接続がテストされることを保証するために、スクリプトが3つのブローカーすべてに接続することを確認してください。# Test write message. sh produce.sh {one_of_broker_ip}:9092# Expected output Creating topic if it does not exist... Producing messages to the topic... Create Test message 1 >>Create Test message 2 >>Create Test message 3 >>Create Test message 4 >>Create Test message 5 >>Create Test message 6 >>Create Test message 7 >>Create Test message 8 >>Create Test message 9 >>Create Test message 10# Test read message sh consume.sh {one_of_broker_ip}:9092# Expected example output (the actual message order might be different) Consuming messages from the topic... Test message 3 Test message 4 Test message 5 Test message 9 Test message 10 Test message 6 Test message 8 Test message 1 Test message 2 Test message 7 [2024-11-01 08:54:27,547] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$) org.apache.kafka.common.errors.TimeoutException Processed a total of 10 messages
実行中の Kafka クラスターを再構成する
Kafka クラスターが TiDB クラスターと同じリージョンおよび AZ にデプロイされていることを確認してください。ブローカーが異なる AZ にある場合は、正しい AZ に移動してください。
1. ブローカーの外部リスナーを構成する
以下の設定はKafka KRaftクラスターに適用されます。ZKモードの設定も同様です。
構成の変更を計画します。
TiDB Cloudからの外部アクセス用に、各ブローカーに EXTERNALリスナーを設定します。EXTERNAL ポートとして、一意のポート(例:
39092)を選択します。TiDB Cloudから取得したKafkaアドバタイズリスナーパターンに基づいて、各ブローカーノードにEXTERNALアドバタイズリスナーを設定することで、 TiDB Cloudが複数のブローカーを区別できるようになります。異なるEXTERNALアドバタイズリスナーを設定することで、TiDB CloudのKafkaクライアントはリクエストを適切なブローカーにルーティングできるようになります。
<port>ブローカーと Kafka プライベートリンクサービスのアクセスポイントを区別します。すべてのブローカーの EXTERNAL アドバタイズリスナーのポート範囲を計画してください(例:range from 9093)。これらのポートは、ブローカーが実際にリッスンするポートである必要はありません。これらは、リクエストを別のブローカーに転送するプライベートリンクサービスのロードバランサーがリッスンするポートです。
トラブルシューティングを容易にするために、ブローカーごとに異なるブローカー ID を構成することをお勧めします。
SSHを使用して各ブローカーノードにログインします。各ブローカーの設定ファイルを以下の内容に変更します。
# brokers in ap-southeast-1a # Add EXTERNAL listener listeners=INTERNAL:...,EXTERNAL://0.0.0.0:39092 # Add EXTERNAL advertised listeners based on the "Kafka Advertised Listener Pattern" in "Prerequisites" section # 1. The pattern is "<broker_id>.unique_name.alicloud.plc.tidbcloud.com:<port>" # 2. So the EXTERNAL can be "b1.unique_name.alicloud.plc.tidbcloud.com:9093", replace <broker_id> with "b" prefix plus "node.id" properties, replace <port> with a unique port(9093) in EXTERNAL advertised listener ports range advertised.listeners=...,EXTERNAL://b1.unique_name.alicloud.plc.tidbcloud.com:9093 # Configure EXTERNAL map listener.security.protocol.map=...,EXTERNAL:PLAINTEXT# brokers in ap-southeast-1b # Add EXTERNAL listener listeners=INTERNAL:...,EXTERNAL://0.0.0.0:39092 # Add EXTERNAL advertised listeners based on the "Kafka Advertised Listener Pattern" in "Prerequisites" section # 1. The pattern is "<broker_id>.unique_name.alicloud.plc.tidbcloud.com:<port>" # 2. So the EXTERNAL can be "b2.unique_name.alicloud.plc.tidbcloud.com:9094". Replace <broker_id> with "b" prefix plus "node.id" properties, and replace <port> with a unique port(9094) in EXTERNAL advertised listener ports range. advertised.listeners=...,EXTERNAL://b2.unique_name.alicloud.plc.tidbcloud.com:9094 # Configure EXTERNAL map listener.security.protocol.map=...,EXTERNAL:PLAINTEXT# brokers in ap-southeast-1c # Add EXTERNAL listener listeners=INTERNAL:...,EXTERNAL://0.0.0.0:39092 # Add EXTERNAL advertised listeners based on the "Kafka Advertised Listener Pattern" in "Prerequisites" section # 1. The pattern is "<broker_id>.unique_name.alicloud.plc.tidbcloud.com:<port>" # 2. So the EXTERNAL can be "b2.unique_name.alicloud.plc.tidbcloud.com:9095". Replace <broker_id> with "b" prefix plus "node.id" properties, and replace <port> with a unique port(9095) in EXTERNAL advertised listener ports range. advertised.listeners=...,EXTERNAL://b3.unique_name.alicloud.plc.tidbcloud.com:9095 # Configure EXTERNAL map listener.security.protocol.map=...,EXTERNAL:PLAINTEXTすべてのブローカーを再構成したら、Kafka ブローカーを 1 つずつ再起動します。
2. 内部ネットワークで外部リスナーの設定をテストする
Kafka と OpenJDK を Kafka クライアント ノードにダウンロードできます。
# Download Kafka and OpenJDK, and then extract the files. You can choose the binary version based on your preference.
wget https://archive.apache.org/dist/kafka/3.7.1/kafka_2.13-3.7.1.tgz
tar -zxf kafka_2.13-3.7.1.tgz
wget https://download.java.net/java/GA/jdk22.0.2/c9ecb94cd31b495da20a27d4581645e8/9/GPL/openjdk-22.0.2_linux-x64_bin.tar.gz
tar -zxf openjdk-22.0.2_linux-x64_bin.tar.gz
次のスクリプトを実行して、ブートストラップが期待どおりに動作するかどうかをテストします。
export JAVA_HOME=/root/jdk-22.0.2
# Bootstrap from the EXTERNAL listener
./kafka_2.13-3.7.1/bin/kafka-broker-api-versions.sh --bootstrap-server {one_of_broker_ip}:39092
# Expected output for the last 3 lines (the actual order might be different)
# There will be some exceptions or errors because advertised listeners cannot be resolved in your Kafka network.
# We will make them resolvable on the TiDB Cloud side and route requests to the right broker when you create a changefeed that connects to this Kafka cluster via Private Link.
b1.ap-southeast-1a.unique_name.alicloud.plc.tidbcloud.com:9093 (id: 1 rack: null) -> ERROR: org.apache.kafka.common.errors.DisconnectException
b2.ap-southeast-1b.unique_name.alicloud.plc.tidbcloud.com:9094 (id: 2 rack: null) -> ERROR: org.apache.kafka.common.errors.DisconnectException
b3.ap-southeast-1c.unique_name.alicloud.plc.tidbcloud.com:9095 (id: 3 rack: null) -> ERROR: org.apache.kafka.common.errors.DisconnectException
ステップ2. Kafkaクラスターをプライベートリンクサービスとして公開する
1. ロードバランサーを設定する
異なるポートを持つ4つのサーバーグループを持つネットワークロードバランサーを作成します。1つのサーバーグループはブートストラップ用で、他のサーバーグループは異なるブローカーにマッピングされます。
- ブートストラップサーバーグループ => 9092 => broker-node1:39092、broker-node2:39092、broker-node3:39092
- ブローカーサーバーグループ 1 => 9093 => broker-node1:39092
- ブローカーサーバーグループ 2 => 9094 => broker-node2:39092
- ブローカーサーバーグループ 3 => 9095 => broker-node3:39092
ブローカーロールノードが複数ある場合は、マッピングを追加する必要があります。ブートストラップターゲットグループに少なくとも1つのノードがあることを確認してください。耐障害性を確保するため、各AZに1つずつ、合計3つのノードを追加することをお勧めします。
ロードバランサーを設定するには、次の手順を実行します。
サーバーグループに進み、4 つのサーバーグループを作成します。
ブートストラップサーバーグループ
- サーバーグループタイプ:
Serverを選択 - サーバーグループ名:
bootstrap-server-group - VPC :
Kafka VPC - バックエンドサーバープロトコル:
TCPを選択 - バックエンドサーバー: 作成したサーバーグループをクリックし、
broker-node1:39092broker-node3:39092含むバックエンドサーバーを追加しますbroker-node2:39092
- サーバーグループタイプ:
ブローカーサーバーグループ1
- サーバーグループタイプ:
Serverを選択 - サーバーグループ名:
broker-server-group-1 - VPC :
Kafka VPC - バックエンドサーバープロトコル:
TCPを選択 - バックエンドサーバー: 作成したサーバーグループをクリックし、バックエンドサーバー
broker-node1:39092を追加します。
- サーバーグループタイプ:
ブローカーサーバーグループ2
- サーバーグループタイプ:
Serverを選択 - サーバーグループ名:
broker-server-group-2 - VPC :
Kafka VPC - バックエンドサーバープロトコル:
TCPを選択 - バックエンドサーバー: 作成したサーバーグループをクリックし、バックエンドサーバー
broker-node2:39092を追加します。
- サーバーグループタイプ:
ブローカーサーバーグループ3
- サーバーグループタイプ:
Serverを選択 - サーバーグループ名:
broker-server-group-3 - VPC :
Kafka VPC - バックエンドサーバープロトコル:
TCPを選択 - バックエンドサーバー: 作成したサーバーグループをクリックし、バックエンドサーバー
broker-node3:39092を追加します。
- サーバーグループタイプ:
ナショナルリーグに進み、ネットワーク ロード バランサーを作成します。
- ネットワークタイプ:
Internal-facingを選択 - VPC :
Kafka VPC - ゾーン:
ap-southeast-1aとbroker-ap-southeast-1a vswitchap-southeast-1bとbroker-ap-southeast-1b vswitchap-southeast-1cとbroker-ap-southeast-1c vswitch
- IPバージョン:
IPv4を選択 - インスタンス名:
kafka-nlb - 「今すぐ作成」をクリックしてロードバランサーを作成します。
- ネットワークタイプ:
作成したロード バランサーを見つけて、 [リスナーの作成]をクリックして 4 つの TCP リスナーを作成します。
ブートストラップサーバーグループ
- リスナープロトコル:
TCPを選択 - リスナーポート:
9092 - サーバー グループ: 以前に作成したサーバーグループ
bootstrap-server-groupを選択します。
- リスナープロトコル:
ブローカーサーバーグループ1
- リスナープロトコル:
TCPを選択 - リスナーポート:
9093 - サーバー グループ: 以前に作成したサーバーグループ
broker-server-group-1を選択します。
- リスナープロトコル:
ブローカーサーバーグループ2
- リスナープロトコル:
TCPを選択 - リスナーポート:
9094 - サーバー グループ: 以前に作成したサーバーグループ
broker-server-group-2を選択します。
- リスナープロトコル:
ブローカーサーバーグループ3
- リスナープロトコル:
TCPを選択 - リスナーポート:
9095 - サーバー グループ: 以前に作成したサーバーグループ
broker-server-group-3を選択します。
- リスナープロトコル:
要塞ノードでロードバランサーをテストします。この例では、Kafka ブートストラップのみをテストします。ロードバランサーは Kafka EXTERNAL リスナーをリッスンしているため、EXTERNAL アドバタイズされたリスナーのアドレスは要塞ノードでは解決できません。ロードバランサーの詳細ページから
kafka-lbDNS 名(例:nlb-o21d6wyjknamw8hjxb.ap-southeast-1.nlb.aliyuncsslbintl.com)をメモしておいてください。要塞ノードでスクリプトを実行してください。# Replace {lb_dns_name} to your actual value export JAVA_HOME=~/jdk-22.0.2 ./kafka_2.13-3.7.1/bin/kafka-broker-api-versions.sh --bootstrap-server {lb_dns_name}:9092 # Expected output for the last 3 lines (the actual order might be different) b1.unique_name.alicloud.plc.tidbcloud.com:9093 (id: 1 rack: null) -> ERROR: org.apache.kafka.common.errors.DisconnectException b2.unique_name.alicloud.plc.tidbcloud.com:9094 (id: 2 rack: null) -> ERROR: org.apache.kafka.common.errors.DisconnectException b3.unique_name.alicloud.plc.tidbcloud.com:9095 (id: 3 rack: null) -> ERROR: org.apache.kafka.common.errors.DisconnectException
2. Alibaba Cloudエンドポイントサービスを設定する
同じリージョンにエンドポイント サービスを設定します。
エンドポイント サービスを作成するには、 エンドポイントサービスに進みます。
- サービスリソースタイプ:
NLB選択 - サービス リソースの選択: NLB が含まれるすべてのゾーンを選択し、前の手順で作成した NLB を選択します。
- エンドポイント接続を自動的に受け入れる:
No選択することをお勧めします
- サービスリソースタイプ:
エンドポイントサービスの詳細ページに移動し、エンドポイントサービス名(例:
com.aliyuncs.privatelink.<region>.xxxxx)をコピーします。これは後でTiDB Cloudで使用する必要があります。エンドポイントサービスの詳細ページで、「サービスホワイトリスト」タブをクリックし、 「ホワイトリストに追加」をクリックして、 前提条件で取得した Alibaba Cloud アカウント ID を入力します。
ステップ3. TiDB Cloudでプライベートリンク接続を作成する
TiDB Cloudでプライベート リンク接続を作成するには、次の手順を実行します。
ステップ2で取得した Alibaba Cloud エンドポイント サービス名 (例:
com.aliyuncs.privatelink.<region>.xxxxx) を使用して、 TiDB Cloudにプライベート リンク接続を作成します。詳細についてはAlibaba Cloud Endpoint Service のプライベートリンク接続を作成する参照してください。
TiDB Cloudのデータフロー サービスが Kafka クラスターにアクセスできるように、プライベート リンク接続にドメインをアタッチします。
詳細については、 プライベートリンク接続にドメインを添付するを参照してください。 「ドメインのアタッチ」ダイアログで、ドメインの種類として「TiDB Cloud Managed」を選択し、生成されたドメインの一意の名前を後で使用するためにコピーする必要があることに注意してください。
ステップ4. Kafka設定内の一意の名前プレースホルダーを置き換える
- Kafka ブローカー ノードに戻り、各ブローカーの
advertised.listeners構成内のunique_nameプレースホルダーを、前の手順で取得した実際の一意の名前に置き換えます。 - すべてのブローカーを再構成したら、Kafka ブローカーを 1 つずつ再起動します。
これで、このプライベート リンク接続と 9092 をブートストラップ ポートとして使用し、 TiDB Cloudから Kafka クラスターに接続できるようになります。