AWS でセルフホスト型 Kafka プライベートリンク サービスをセットアップする
このドキュメントでは、AWS でセルフホスト型 Kafka 用の Private Link サービスを設定する方法と、それをTiDB Cloudで動作させる方法について説明します。
このメカニズムは次のように機能します。
- TiDB Cloud VPC はプライベート エンドポイントを介して Kafka VPC に接続します。
- Kafka クライアントはすべての Kafka ブローカーと直接通信する必要があります。
- 各 Kafka ブローカーは、 TiDB Cloud VPC 内のエンドポイントの一意のポートにマッピングされます。
- マッピングを実現するには、Kafka ブートストラップ メカニズムと AWS リソースを活用します。
次の図にその仕組みを示します。
このドキュメントでは、AWS の 3 つのアベイラビリティーゾーン (AZ) にデプロイされた Kafka Private Link サービスに接続する例を示します。同様のポートマッピング原則に基づいて他の構成も可能ですが、このドキュメントでは Kafka Private Link サービスの基本的なセットアッププロセスについて説明します。本番環境では、運用の保守性と監視性が強化された、より回復力の高い Kafka Private Link サービスが推奨されます。
前提条件
独自の AWS アカウントで Kafka Private Link サービスを設定するには、次の権限があることを確認してください。
- EC2ノードを管理する
- VPC の管理
- サブネットを管理する
- セキュリティ グループを管理する
- ロードバランサーを管理する
- エンドポイントサービスの管理
- EC2 ノードに接続して Kafka ノードを構成する
持っていない場合はTiDB Cloud専用クラスターを作成する 。
TiDB Cloud Dedicated クラスターから Kafka デプロイメント情報を取得します。
- TiDB Cloudコンソールで、TiDB クラスターのクラスター概要ページに移動し、左側のナビゲーション ペインで[Changefeed] をクリックします。
- 概要ページで、TiDB クラスターのリージョンを見つけます。Kafka クラスターが同じリージョンにデプロイされることを確認します。
- 「Changefeed の作成」をクリックします。
- ターゲットタイプで、 Kafkaを選択します。
- [接続方法]で、 [プライベート リンク]を選択します。
- 続行する前に、リマインダーにTiDB Cloud AWS アカウントの情報を書き留めておきます。これを使用して、 TiDB Cloud がKafka Private Link サービスのエンドポイントを作成することを承認します。
- AZ の数を選択します。この例では、 3 つの AZを選択します。Kafka クラスターをデプロイする AZ の ID を書き留めます。AZ 名と AZ ID の関係を知りたい場合は、 AWS リソースのアベイラビリティーゾーン ID参照してください。
- Kafka プライベート リンク サービスに固有のKafka アドバタイズ リスナー パターンを入力します。
- 一意のランダム文字列を入力します。数字または小文字のみを含めることができます。これは後でKafka アドバタイズ リスナー パターンを生成するために使用します。
- 「使用状況の確認と生成」をクリックして、ランダム文字列が一意であるかどうかを確認し、Kafka ブローカーの外部アドバタイズ リスナーを組み立てるために使用されるKafka アドバタイズ リスナー パターンを生成します。
すべてのデプロイメント情報を書き留めます。後で Kafka Private Link サービスを構成するときにこれを使用する必要があります。
次の表は、デプロイメント情報の例を示しています。
情報 | 価値 | 注記 |
---|---|---|
リージョン | オレゴン ( us-west-2 ) | 該当なし |
TiDB Cloud AWS アカウントのプリンシパル | arn:aws:iam::<account_id>:root | 該当なし |
AZ ID | usw2-az1 usw2-az2 usw2-az3 | AZ ID を AWS アカウントの AZ 名に合わせます。 例:
|
Kafka アドバタイズド リスナー パターン | 一意のランダム文字列: abc AZ 用に生成されたパターン:
| AZ 名を AZ 指定のパターンにマップします。後で特定の AZ のブローカーに適切なパターンを構成するようにしてください。
|
ステップ1. Kafkaクラスターをセットアップする
新しいクラスターをデプロイする必要がある場合は、 新しい Kafka クラスターをデプロイの手順に従ってください。
既存のクラスターを公開する必要がある場合は、 実行中の Kafka クラスターを再構成するの手順に従ってください。
新しい Kafka クラスターをデプロイ
1. Kafka VPCをセットアップする
Kafka VPC には次のものが必要です。
- ブローカー用のプライベートサブネットが 3 つ (AZ ごとに 1 つ)。
- 任意の AZ にインターネットに接続できる要塞ノードを持つ 1 つのパブリック サブネットと 3 つのプライベート サブネットがあり、Kafka クラスターを簡単にセットアップできます。本番環境では、Kafka VPC に接続できる独自の要塞ノードが存在する場合があります。
サブネットを作成する前に、AZ ID と AZ 名のマッピングに基づいて AZ にサブネットを作成します。次のマッピングを例に挙げます。
usw2-az1
=>us-west-2a
usw2-az2
=>us-west-2c
usw2-az3
=>us-west-2b
次の AZ にプライベート サブネットを作成します。
us-west-2a
us-west-2c
us-west-2b
Kafka VPC を作成するには、次の手順を実行します。
1.1. Kafka VPCを作成する
AWS コンソール > VPC ダッシュボードに進み、Kafka をデプロイするリージョンに切り替えます。
「VPC の作成」をクリックします。VPC設定ページで次のように情報を入力します。
- VPC のみを選択します。
- 名前タグにタグを入力します(例:
Kafka VPC
)。 - IPv4 CIDR 手動入力を選択し、IPv4 CIDR (例:
10.0.0.0/16
) を入力します。 - その他のオプションについてはデフォルト値を使用します。 「VPC の作成」をクリックします。
- VPC 詳細ページで、VPC ID (例:
vpc-01f50b790fa01dffa
) をメモします。
1.2. Kafka VPC にプライベートサブネットを作成する
サブネット一覧ページに進みます。
サブネットの作成をクリックします。
前に書き留めておいたVPC ID (この例では
vpc-01f50b790fa01dffa
) を選択します。次の情報を使用して 3 つのサブネットを追加します。TiDB TiDB Cloudブローカーの
advertised.listener
構成で AZ ID をエンコードする必要があるため、後でブローカーを簡単に構成できるように、サブネット名に AZ ID を入れることをお勧めします。サブネット
us-west-2a
- サブネット名:
broker-usw2-az1
- 利用可能ゾーン:
us-west-2a
- IPv4 サブネット CIDR ブロック:
10.0.0.0/18
- サブネット名:
サブネット2 in
us-west-2c
- サブネット名:
broker-usw2-az2
- 利用可能ゾーン:
us-west-2c
- IPv4 サブネット CIDR ブロック:
10.0.64.0/18
- サブネット名:
サブネット
us-west-2b
- サブネット名:
broker-usw2-az3
- 利用可能ゾーン:
us-west-2b
- IPv4 サブネット CIDR ブロック:
10.0.128.0/18
- サブネット名:
「サブネットの作成」をクリックします。サブネット一覧ページが表示されます。
1.3. Kafka VPCにパブリックサブネットを作成する
サブネットの作成をクリックします。
前に書き留めておいたVPC ID (この例では
vpc-01f50b790fa01dffa
) を選択します。次の情報を使用して、任意の AZ にパブリック サブネットを追加します。
- サブネット名:
bastion
- IPv4 サブネット CIDR ブロック:
10.0.192.0/18
- サブネット名:
「サブネットの作成」をクリックします。サブネット一覧ページが表示されます。
要塞サブネットをパブリックサブネットに構成します。
VPC ダッシュボード > インターネット ゲートウェイに進みます。
kafka-vpc-igw
という名前のインターネット ゲートウェイを作成します。インターネット ゲートウェイの詳細ページの[アクション]で、 [VPC に接続]をクリックして、インターネット ゲートウェイを Kafka VPC に接続します。
VPC ダッシュボード > ルートテーブルに進みます。Kafka VPC のインターネット ゲートウェイへのルート テーブルを作成し、次の情報を含む新しいルートを追加します。
- 名前:
kafka-vpc-igw-route-table
- VC :
Kafka VPC
- ルート:
- 目的地:
0.0.0.0/0
- ターゲット
kafka-vpc-igw
Internet Gateway
- 目的地:
- 名前:
ルート テーブルを要塞サブネットに接続します。ルート テーブルの詳細ページで、 [サブネットの関連付け] > [サブネットの関連付けの編集]をクリックして要塞サブネットを追加し、変更を保存します。
2. Kafkaブローカーを設定する
2.1. 要塞ノードを作成する
EC2 リストページに進みます。要塞サブネットに要塞ノードを作成します。
名前:
bastion-node
Amazon マシンイメージ:
Amazon linux
インスタンスタイプ:
t2.small
キー ペア:
kafka-vpc-key-pair
kafka-vpc-key-pair
という名前の新しいキー ペアを作成します。後の構成のためにkafka-vpc-key-pair.pem をローカルにダウンロードします。ネットワーク設定
- VC :
Kafka VPC
- サブネット:
bastion
- パブリック IP の自動割り当て:
Enable
- Securityグループ: どこからでも SSH ログインを許可する新しいセキュリティ グループを作成します。本番環境での安全性のためにルールを絞り込むことができます。
- VC :
2.2. ブローカーノードを作成する
EC2 リストページに進みます。ブローカー サブネットに 3 つのブローカー ノード (AZ ごとに 1 つ) を作成します。
サブネット
broker-usw2-az1
のブローカー 1名前:
broker-node1
Amazon マシンイメージ:
Amazon linux
インスタンスタイプ:
t2.large
キーペア: 再利用
kafka-vpc-key-pair
ネットワーク設定
- VC :
Kafka VPC
- サブネット:
broker-usw2-az1
- パブリック IP の自動割り当て:
Disable
- Securityグループ: Kafka VPC からのすべての TCP を許可する新しいセキュリティ グループを作成します。本番環境での安全性のためにルールを絞り込むことができます。
- プロトコル:
TCP
- ポート範囲:
0 - 65535
- 出典:
10.0.0.0/16
- プロトコル:
- VC :
サブネット
broker-usw2-az2
のブローカー 2名前:
broker-node2
Amazon マシンイメージ:
Amazon linux
インスタンスタイプ:
t2.large
キーペア: 再利用
kafka-vpc-key-pair
ネットワーク設定
- VC :
Kafka VPC
- サブネット:
broker-usw2-az2
- パブリック IP の自動割り当て:
Disable
- Securityグループ: Kafka VPC からのすべての TCP を許可する新しいセキュリティ グループを作成します。本番環境での安全性のためにルールを絞り込むことができます。
- プロトコル:
TCP
- ポート範囲:
0 - 65535
- 出典:
10.0.0.0/16
- プロトコル:
- VC :
サブネット
broker-usw2-az3
のブローカー 3名前:
broker-node3
Amazon マシンイメージ:
Amazon linux
インスタンスタイプ:
t2.large
キーペア: 再利用
kafka-vpc-key-pair
ネットワーク設定
- VC :
Kafka VPC
- サブネット:
broker-usw2-az3
- パブリック IP の自動割り当て:
Disable
- Securityグループ: Kafka VPC からのすべての TCP を許可する新しいセキュリティ グループを作成します。本番環境での安全性のためにルールを絞り込むことができます。
- プロトコル:
TCP
- ポート範囲:
0 - 65535
- 出典:
10.0.0.0/16
- プロトコル:
- VC :
2.3. Kafka ランタイムバイナリを準備する
要塞ノードの詳細ページに移動します。パブリック IPv4 アドレスを取得します。SSH を使用して、以前にダウンロードした
kafka-vpc-key-pair.pem
使用してノードにログインします。chmod 400 kafka-vpc-key-pair.pem ssh -i "kafka-vpc-key-pair.pem" ec2-user@{bastion_public_ip} # replace {bastion_public_ip} with the IP address of your bastion node, for example, 54.186.149.187 scp -i "kafka-vpc-key-pair.pem" kafka-vpc-key-pair.pem ec2-user@{bastion_public_ip}:~/バイナリをダウンロードします。
# Download Kafka and OpenJDK, and then extract the files. You can choose the binary version based on your preference. wget https://archive.apache.org/dist/kafka/3.7.1/kafka_2.13-3.7.1.tgz tar -zxf kafka_2.13-3.7.1.tgz wget https://download.java.net/java/GA/jdk22.0.2/c9ecb94cd31b495da20a27d4581645e8/9/GPL/openjdk-22.0.2_linux-x64_bin.tar.gz tar -zxf openjdk-22.0.2_linux-x64_bin.tar.gzバイナリを各ブローカー ノードにコピーします。
# Replace {broker-node1-ip} with your broker-node1 IP address scp -i "kafka-vpc-key-pair.pem" kafka_2.13-3.7.1.tgz ec2-user@{broker-node1-ip}:~/ ssh -i "kafka-vpc-key-pair.pem" ec2-user@{broker-node1-ip} "tar -zxf kafka_2.13-3.7.1.tgz" scp -i "kafka-vpc-key-pair.pem" openjdk-22.0.2_linux-x64_bin.tar.gz ec2-user@{broker-node1-ip}:~/ ssh -i "kafka-vpc-key-pair.pem" ec2-user@{broker-node1-ip} "tar -zxf openjdk-22.0.2_linux-x64_bin.tar.gz" # Replace {broker-node2-ip} with your broker-node2 IP address scp -i "kafka-vpc-key-pair.pem" kafka_2.13-3.7.1.tgz ec2-user@{broker-node2-ip}:~/ ssh -i "kafka-vpc-key-pair.pem" ec2-user@{broker-node2-ip} "tar -zxf kafka_2.13-3.7.1.tgz" scp -i "kafka-vpc-key-pair.pem" openjdk-22.0.2_linux-x64_bin.tar.gz ec2-user@{broker-node2-ip}:~/ ssh -i "kafka-vpc-key-pair.pem" ec2-user@{broker-node2-ip} "tar -zxf openjdk-22.0.2_linux-x64_bin.tar.gz" # Replace {broker-node3-ip} with your broker-node3 IP address scp -i "kafka-vpc-key-pair.pem" kafka_2.13-3.7.1.tgz ec2-user@{broker-node3-ip}:~/ ssh -i "kafka-vpc-key-pair.pem" ec2-user@{broker-node3-ip} "tar -zxf kafka_2.13-3.7.1.tgz" scp -i "kafka-vpc-key-pair.pem" openjdk-22.0.2_linux-x64_bin.tar.gz ec2-user@{broker-node3-ip}:~/ ssh -i "kafka-vpc-key-pair.pem" ec2-user@{broker-node3-ip} "tar -zxf openjdk-22.0.2_linux-x64_bin.tar.gz"
2.4. 各ブローカーノードにKafkaノードを設定する
2.4.1 3つのノードを持つKRaft Kafkaクラスターをセットアップする
各ノードはブローカーとコントローラーのロールとして機能します。各ブローカーに対して次の操作を実行します。
listeners
項目の場合、3 つのブローカーはすべて同じであり、ブローカーとコントローラーのロールとして機能します。- すべてのコントローラーロール ノードに対して同じ CONTROLLER リスナーを構成します。ブローカーロール ノードのみを追加する場合は、
server.properties
の CONTROLLER リスナーは必要ありません。 - ブローカーリスナーを 2 つ構成します。3
INTERNAL
内部アクセス用、EXTERNAL
TiDB Cloudからの外部アクセス用です。
- すべてのコントローラーロール ノードに対して同じ CONTROLLER リスナーを構成します。ブローカーロール ノードのみを追加する場合は、
advertised.listeners
項目については、次の操作を行います。ブローカー ノードの内部 IP を使用して、ブローカーごとに INTERNAL アドバタイズ リスナーを構成します。アドバタイズされた内部 Kafka クライアントはこのアドレスを使用してブローカーにアクセスします。
TiDB TiDB Cloud TiDB Cloudから取得したKafka アドバタイズ リスナー パターンに基づいて EXTERNAL アドバタイズ リスナーを構成します。さまざまな EXTERNAL アドバタイズ リスナーにより、 TiDB Cloudの Kafka クライアントはリクエストを適切なブローカーにルーティングできます。
<port>
、ブローカーを Kafka プライベート リンク サービス アクセス ポイントと区別します。すべてのブローカーの EXTERNAL アドバタイズ リスナーのポート範囲を計画します。これらのポートは、ブローカーがリッスンする実際のポートである必要はありません。これらは、リクエストを別のブローカーに転送するプライベート リンク サービスのロード バランサーがリッスンするポートです。- Kafka アドバタイズ リスナー パターンの
AZ ID
、ブローカーがデプロイされている場所を示します。TiDB TiDB Cloud は、 AZ ID に基づいて、リクエストを異なるエンドポイント DNS 名にルーティングします。
トラブルシューティングを容易にするために、ブローカーごとに異なるブローカー ID を構成することをお勧めします。
計画値は次のとおりです。
- コントローラーポート:
29092
- 内部ポート:
9092
- 外部:
39092
- 外部アドバタイズされたリスナーポート範囲:
9093~9095
- コントローラーポート:
2.4.2. 設定ファイルを作成する
SSH を使用してすべてのブローカー ノードにログインします。次の内容の構成ファイル~/config/server.properties
を作成します。
# brokers in usw2-az1
# broker-node1 ~/config/server.properties
# 1. Replace {broker-node1-ip}, {broker-node2-ip}, {broker-node3-ip} with the actual IP addresses.
# 2. Configure EXTERNAL in "advertised.listeners" based on the "Kafka Advertised Listener Pattern" in the "Prerequisites" section.
# 2.1 The pattern for AZ(ID: usw2-az1) is "<broker_id>.usw2-az1.abc.us-west-2.aws.3199015.tidbcloud.com:<port>".
# 2.2 So the EXTERNAL can be "b1.usw2-az1.abc.us-west-2.aws.3199015.tidbcloud.com:9093". Replace <broker_id> with "b" prefix plus "node.id" properties, and replace <port> with a unique port (9093) in the port range of the EXTERNAL advertised listener.
# 2.3 If there are more broker role nodes in the same AZ, you can configure them in the same way.
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@{broker-node1-ip}:29092,2@{broker-node2-ip}:29092,3@{broker-node3-ip}:29092
listeners=INTERNAL://0.0.0.0:9092,CONTROLLER://0.0.0.0:29092,EXTERNAL://0.0.0.0:39092
inter.broker.listener.name=INTERNAL
advertised.listeners=INTERNAL://{broker-node1-ip}:9092,EXTERNAL://b1.usw2-az1.abc.us-west-2.aws.3199015.tidbcloud.com:9093
controller.listener.names=CONTROLLER
listener.security.protocol.map=INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
log.dirs=./data
# brokers in usw2-az2
# broker-node2 ~/config/server.properties
# 1. Replace {broker-node1-ip}, {broker-node2-ip}, {broker-node3-ip} with the actual IP addresses.
# 2. Configure EXTERNAL in "advertised.listeners" based on the "Kafka Advertised Listener Pattern" in the "Prerequisites" section.
# 2.1 The pattern for AZ(ID: usw2-az2) is "<broker_id>.usw2-az2.abc.us-west-2.aws.3199015.tidbcloud.com:<port>".
# 2.2 So the EXTERNAL can be "b2.usw2-az2.abc.us-west-2.aws.3199015.tidbcloud.com:9094". Replace <broker_id> with "b" prefix plus "node.id" properties, and replace <port> with a unique port (9094) in the port range of the EXTERNAL advertised listener.
# 2.3 If there are more broker role nodes in the same AZ, you can configure them in the same way.
process.roles=broker,controller
node.id=2
controller.quorum.voters=1@{broker-node1-ip}:29092,2@{broker-node2-ip}:29092,3@{broker-node3-ip}:29092
listeners=INTERNAL://0.0.0.0:9092,CONTROLLER://0.0.0.0:29092,EXTERNAL://0.0.0.0:39092
inter.broker.listener.name=INTERNAL
advertised.listeners=INTERNAL://{broker-node2-ip}:9092,EXTERNAL://b2.usw2-az2.abc.us-west-2.aws.3199015.tidbcloud.com:9094
controller.listener.names=CONTROLLER
listener.security.protocol.map=INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
log.dirs=./data
# brokers in usw2-az3
# broker-node3 ~/config/server.properties
# 1. Replace {broker-node1-ip}, {broker-node2-ip}, {broker-node3-ip} with the actual IP addresses.
# 2. Configure EXTERNAL in "advertised.listeners" based on the "Kafka Advertised Listener Pattern" in the "Prerequisites" section.
# 2.1 The pattern for AZ(ID: usw2-az3) is "<broker_id>.usw2-az3.abc.us-west-2.aws.3199015.tidbcloud.com:<port>".
# 2.2 So the EXTERNAL can be "b3.usw2-az3.abc.us-west-2.aws.3199015.tidbcloud.com:9095". Replace <broker_id> with "b" prefix plus "node.id" properties, and replace <port> with a unique port (9095) in the port range of the EXTERNAL advertised listener.
# 2.3 If there are more broker role nodes in the same AZ, you can configure them in the same way.
process.roles=broker,controller
node.id=3
controller.quorum.voters=1@{broker-node1-ip}:29092,2@{broker-node2-ip}:29092,3@{broker-node3-ip}:29092
listeners=INTERNAL://0.0.0.0:9092,CONTROLLER://0.0.0.0:29092,EXTERNAL://0.0.0.0:39092
inter.broker.listener.name=INTERNAL
advertised.listeners=INTERNAL://{broker-node3-ip}:9092,EXTERNAL://b3.usw2-az3.abc.us-west-2.aws.3199015.tidbcloud.com:9095
controller.listener.names=CONTROLLER
listener.security.protocol.map=INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
log.dirs=./data
2.4.3 Kafkaブローカーを起動する
スクリプトを作成し、それを実行して各ブローカー ノードで Kafka ブローカーを起動します。
#!/bin/bash
# Get the directory of the current script
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
# Set JAVA_HOME to the Java installation within the script directory
export JAVA_HOME="$SCRIPT_DIR/jdk-22.0.2"
# Define the vars
KAFKA_DIR="$SCRIPT_DIR/kafka_2.13-3.7.1/bin"
KAFKA_STORAGE_CMD=$KAFKA_DIR/kafka-storage.sh
KAFKA_START_CMD=$KAFKA_DIR/kafka-server-start.sh
KAFKA_DATA_DIR=$SCRIPT_DIR/data
KAFKA_LOG_DIR=$SCRIPT_DIR/log
KAFKA_CONFIG_DIR=$SCRIPT_DIR/config
# Cleanup step, which makes it easy for multiple experiments
# Find all Kafka process IDs
KAFKA_PIDS=$(ps aux | grep 'kafka.Kafka' | grep -v grep | awk '{print $2}')
if [ -z "$KAFKA_PIDS" ]; then
echo "No Kafka processes are running."
else
# Kill each Kafka process
echo "Killing Kafka processes with PIDs: $KAFKA_PIDS"
for PID in $KAFKA_PIDS; do
kill -9 $PID
echo "Killed Kafka process with PID: $PID"
done
echo "All Kafka processes have been killed."
fi
rm -rf $KAFKA_DATA_DIR
mkdir -p $KAFKA_DATA_DIR
rm -rf $KAFKA_LOG_DIR
mkdir -p $KAFKA_LOG_DIR
# Magic id: BRl69zcmTFmiPaoaANybiw, you can use your own
$KAFKA_STORAGE_CMD format -t "BRl69zcmTFmiPaoaANybiw" -c "$KAFKA_CONFIG_DIR/server.properties" > $KAFKA_LOG_DIR/server_format.log
LOG_DIR=$KAFKA_LOG_DIR nohup $KAFKA_START_CMD "$KAFKA_CONFIG_DIR/server.properties" &
2.5. 要塞ノードでクラスター設定をテストする
Kafka ブートストラップをテストします。
export JAVA_HOME=/home/ec2-user/jdk-22.0.2 # Bootstrap from INTERNAL listener ./kafka_2.13-3.7.1/bin/kafka-broker-api-versions.sh --bootstrap-server {one_of_broker_ip}:9092 | grep 9092 # Expected output (the actual order might be different) {broker-node1-ip}:9092 (id: 1 rack: null) -> ( {broker-node2-ip}:9092 (id: 2 rack: null) -> ( {broker-node3-ip}:9092 (id: 3 rack: null) -> ( # Bootstrap from EXTERNAL listener ./kafka_2.13-3.7.1/bin/kafka-broker-api-versions.sh --bootstrap-server {one_of_broker_ip}:39092 # Expected output for the last 3 lines (the actual order might be different) # The difference in the output from "bootstrap from INTERNAL listener" is that exceptions or errors might occur because advertised listeners cannot be resolved in Kafka VPC. # We will make them resolvable in TiDB Cloud side and make it route to the right broker when you create a changefeed connect to this Kafka cluster by Private Link. b1.usw2-az1.abc.us-west-2.aws.3199015.tidbcloud.com:9093 (id: 1 rack: null) -> ERROR: org.apache.kafka.common.errors.DisconnectException b2.usw2-az2.abc.us-west-2.aws.3199015.tidbcloud.com:9094 (id: 2 rack: null) -> ERROR: org.apache.kafka.common.errors.DisconnectException b3.usw2-az3.abc.us-west-2.aws.3199015.tidbcloud.com:9095 (id: 3 rack: null) -> ERROR: org.apache.kafka.common.errors.DisconnectException要塞ノードにプロデューサー スクリプト
produce.sh
を作成します。#!/bin/bash BROKER_LIST=$1 # "{broker_address1},{broker_address2}..." # Get the directory of the current script SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" # Set JAVA_HOME to the Java installation within the script directory export JAVA_HOME="$SCRIPT_DIR/jdk-22.0.2" # Define the Kafka directory KAFKA_DIR="$SCRIPT_DIR/kafka_2.13-3.7.1/bin" TOPIC="test-topic" # Create a topic if it does not exist create_topic() { echo "Creating topic if it does not exist..." $KAFKA_DIR/kafka-topics.sh --create --topic $TOPIC --bootstrap-server $BROKER_LIST --if-not-exists --partitions 3 --replication-factor 3 } # Produce messages to the topic produce_messages() { echo "Producing messages to the topic..." for ((chrono=1; chrono <= 10; chrono++)); do message="Test message "$chrono echo "Create "$message echo $message | $KAFKA_DIR/kafka-console-producer.sh --broker-list $BROKER_LIST --topic $TOPIC done } create_topic produce_messages要塞ノードにコンシューマー スクリプト
consume.sh
を作成します。#!/bin/bash BROKER_LIST=$1 # "{broker_address1},{broker_address2}..." # Get the directory of the current script SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" # Set JAVA_HOME to the Java installation within the script directory export JAVA_HOME="$SCRIPT_DIR/jdk-22.0.2" # Define the Kafka directory KAFKA_DIR="$SCRIPT_DIR/kafka_2.13-3.7.1/bin" TOPIC="test-topic" CONSUMER_GROUP="test-group" # Consume messages from the topic consume_messages() { echo "Consuming messages from the topic..." $KAFKA_DIR/kafka-console-consumer.sh --bootstrap-server $BROKER_LIST --topic $TOPIC --from-beginning --timeout-ms 5000 --consumer-property group.id=$CONSUMER_GROUP } consume_messagesproduce.sh
とconsume.sh
実行して、Kafka クラスターが実行中であることを確認します。これらのスクリプトは、後のネットワーク接続テストでも再利用されます。スクリプトは--partitions 3 --replication-factor 3
でトピックを作成します。これら 3 つのブローカーすべてにデータが含まれていることを確認します。スクリプトが 3 つのブローカーすべてに接続して、ネットワーク接続がテストされることを保証します。# Test write message. ./produce.sh {one_of_broker_ip}:9092# Expected output Creating topic if it does not exist... Producing messages to the topic... Create Test message 1 >>Create Test message 2 >>Create Test message 3 >>Create Test message 4 >>Create Test message 5 >>Create Test message 6 >>Create Test message 7 >>Create Test message 8 >>Create Test message 9 >>Create Test message 10# Test read message ./consume.sh {one_of_broker_ip}:9092# Expected example output (the actual message order might be different) Consuming messages from the topic... Test message 3 Test message 4 Test message 5 Test message 9 Test message 10 Test message 6 Test message 8 Test message 1 Test message 2 Test message 7 [2024-11-01 08:54:27,547] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$) org.apache.kafka.common.errors.TimeoutException Processed a total of 10 messages
実行中の Kafka クラスターを再構成する
Kafka クラスターが TiDB クラスターと同じリージョンおよび AZ にデプロイされていることを確認します。ブローカーが異なる AZ にある場合は、正しい AZ に移動します。
1. ブローカーの外部リスナーを構成する
次の構成は、Kafka KRaft クラスターに適用されます。ZK モードの構成も同様です。
構成の変更を計画します。
TiDB Cloudからの外部アクセス用に、ブローカーごとに EXTERNALリスナーを構成します。 EXTERNAL ポートとして一意のポート (例:
39092
) を選択します。TiDB Cloudがさまざまなブローカーを区別できるように、すべてのブローカー ノードに対してTiDB Cloudから取得したKafka アドバタイズ リスナー パターンに基づいて EXTERNALアドバタイズ リスナーを構成します。さまざまな EXTERNAL アドバタイズ リスナーにより、 TiDB Cloudの Kafka クライアントはリクエストを適切なブローカーにルーティングできます。
<port>
ブローカーを Kafka プライベート リンク サービス アクセス ポイントと区別します。すべてのブローカーの EXTERNAL アドバタイズ リスナーのポート範囲を計画します (例:range from 9093
)。これらのポートは、ブローカーがリッスンする実際のポートである必要はありません。これらは、リクエストを別のブローカーに転送するプライベート リンク サービスのロード バランサーがリッスンするポートです。- Kafka アドバタイズ リスナー パターンの
AZ ID
、ブローカーがデプロイされている場所を示します。TiDB TiDB Cloud は、 AZ ID に基づいて、リクエストを異なるエンドポイント DNS 名にルーティングします。
トラブルシューティングを容易にするために、ブローカーごとに異なるブローカー ID を構成することをお勧めします。
SSH を使用して各ブローカー ノードにログインします。各ブローカーの構成ファイルを次の内容に変更します。
# brokers in usw2-az1 # Add EXTERNAL listener listeners=INTERNAL:...,EXTERNAL://0.0.0.0:39092 # Add EXTERNAL advertised listeners based on the "Kafka Advertised Listener Pattern" in "Prerequisites" section # 1. The pattern for AZ(ID: usw2-az1) is "<broker_id>.usw2-az1.abc.us-west-2.aws.3199015.tidbcloud.com:<port>" # 2. So the EXTERNAL can be "b1.usw2-az1.abc.us-west-2.aws.3199015.tidbcloud.com:9093", replace <broker_id> with "b" prefix plus "node.id" properties, replace <port> with a unique port(9093) in EXTERNAL advertised listener ports range advertised.listeners=...,EXTERNAL://b1.usw2-az1.abc.us-west-2.aws.3199015.tidbcloud.com:9093 # Configure EXTERNAL map listener.security.protocol.map=...,EXTERNAL:PLAINTEXT# brokers in usw2-az2 # Add EXTERNAL listener listeners=INTERNAL:...,EXTERNAL://0.0.0.0:39092 # Add EXTERNAL advertised listeners based on the "Kafka Advertised Listener Pattern" in "Prerequisites" section # 1. The pattern for AZ(ID: usw2-az2) is "<broker_id>.usw2-az2.abc.us-west-2.aws.3199015.tidbcloud.com:<port>" # 2. So the EXTERNAL can be "b2.usw2-az2.abc.us-west-2.aws.3199015.tidbcloud.com:9094". Replace <broker_id> with "b" prefix plus "node.id" properties, and replace <port> with a unique port(9094) in EXTERNAL advertised listener ports range. advertised.listeners=...,EXTERNAL://b2.usw2-az2.abc.us-west-2.aws.3199015.tidbcloud.com:9094 # Configure EXTERNAL map listener.security.protocol.map=...,EXTERNAL:PLAINTEXT# brokers in usw2-az3 # Add EXTERNAL listener listeners=INTERNAL:...,EXTERNAL://0.0.0.0:39092 # Add EXTERNAL advertised listeners based on the "Kafka Advertised Listener Pattern" in "Prerequisites" section # 1. The pattern for AZ(ID: usw2-az3) is "<broker_id>.usw2-az3.abc.us-west-2.aws.3199015.tidbcloud.com:<port>" # 2. So the EXTERNAL can be "b2.usw2-az3.abc.us-west-2.aws.3199015.tidbcloud.com:9095". Replace <broker_id> with "b" prefix plus "node.id" properties, and replace <port> with a unique port(9095) in EXTERNAL advertised listener ports range. advertised.listeners=...,EXTERNAL://b3.usw2-az3.abc.us-west-2.aws.3199015.tidbcloud.com:9095 # Configure EXTERNAL map listener.security.protocol.map=...,EXTERNAL:PLAINTEXTすべてのブローカーを再構成したら、Kafka ブローカーを 1 つずつ再起動します。
2. 内部ネットワークで外部リスナー設定をテストする
Kafka クライアント ノードに Kafka と OpenJDK をダウンロードできます。
# Download Kafka and OpenJDK, and then extract the files. You can choose the binary version based on your preference.
wget https://archive.apache.org/dist/kafka/3.7.1/kafka_2.13-3.7.1.tgz
tar -zxf kafka_2.13-3.7.1.tgz
wget https://download.java.net/java/GA/jdk22.0.2/c9ecb94cd31b495da20a27d4581645e8/9/GPL/openjdk-22.0.2_linux-x64_bin.tar.gz
tar -zxf openjdk-22.0.2_linux-x64_bin.tar.gz
次のスクリプトを実行して、ブートストラップが期待どおりに動作するかどうかをテストします。
export JAVA_HOME=/home/ec2-user/jdk-22.0.2
# Bootstrap from the EXTERNAL listener
./kafka_2.13-3.7.1/bin/kafka-broker-api-versions.sh --bootstrap-server {one_of_broker_ip}:39092
# Expected output for the last 3 lines (the actual order might be different)
# There will be some exceptions or errors because advertised listeners cannot be resolved in your Kafka network.
# We will make them resolvable in TiDB Cloud side and make it route to the right broker when you create a changefeed connect to this Kafka cluster by Private Link.
b1.usw2-az1.abc.us-west-2.aws.3199015.tidbcloud.com:9093 (id: 1 rack: null) -> ERROR: org.apache.kafka.common.errors.DisconnectException
b2.usw2-az2.abc.us-west-2.aws.3199015.tidbcloud.com:9094 (id: 2 rack: null) -> ERROR: org.apache.kafka.common.errors.DisconnectException
b3.usw2-az3.abc.us-west-2.aws.3199015.tidbcloud.com:9095 (id: 3 rack: null) -> ERROR: org.apache.kafka.common.errors.DisconnectException
ステップ 2. Kafka クラスターをプライベート リンク サービスとして公開する
1. ロードバランサーを設定する
異なるポートを持つ 4 つのターゲット グループを持つネットワーク ロード バランサーを作成します。1 つのターゲット グループはブートストラップ用で、他のターゲット グループは異なるブローカーにマップされます。
- ブートストラップ ターゲット グループ => 9092 => broker-node1:39092、broker-node2:39092、broker-node3:39092
- ブローカー ターゲット グループ 1 => 9093 => broker-node1:39092
- ブローカー ターゲット グループ 2 => 9094 => broker-node2:39092
- ブローカー ターゲット グループ 3 => 9095 => broker-node3:39092
ブローカー ロール ノードがさらにある場合は、マッピングを追加する必要があります。ブートストラップ ターゲット グループに少なくとも 1 つのノードがあることを確認してください。回復力のために、各 AZ に 1 つずつ、合計 3 つのノードを追加することをお勧めします。
ロード バランサーを設定するには、次の手順を実行します。
対象グループに進み、4 つのターゲット グループを作成します。
ブートストラップターゲットグループ
- ターゲットタイプ:
Instances
- 対象グループ名:
bootstrap-target-group
- プロトコル:
TCP
- ポート:
9092
- IPアドレスタイプ:
IPv4
- VC :
Kafka VPC
- ヘルスチェックプロトコル:
TCP
broker-node3:39092
ターゲットbroker-node2:39092
broker-node1:39092
- ターゲットタイプ:
ブローカーターゲットグループ1
- ターゲットタイプ:
Instances
- 対象グループ名:
broker-target-group-1
- プロトコル:
TCP
- ポート:
9093
- IPアドレスタイプ:
IPv4
- VC :
Kafka VPC
- ヘルスチェックプロトコル:
TCP
- 登録対象:
broker-node1:39092
- ターゲットタイプ:
ブローカーターゲットグループ2
- ターゲットタイプ:
Instances
- 対象グループ名:
broker-target-group-2
- プロトコル:
TCP
- ポート:
9094
- IPアドレスタイプ:
IPv4
- VC :
Kafka VPC
- ヘルスチェックプロトコル:
TCP
- 登録対象:
broker-node2:39092
- ターゲットタイプ:
ブローカーターゲットグループ3
- ターゲットタイプ:
Instances
- 対象グループ名:
broker-target-group-3
- プロトコル:
TCP
- ポート:
9095
- IPアドレスタイプ:
IPv4
- VC :
Kafka VPC
- ヘルスチェックプロトコル:
TCP
- 登録対象:
broker-node3:39092
- ターゲットタイプ:
ロードバランサーに進み、ネットワーク ロード バランサーを作成します。
- ロードバランサー名:
kafka-lb
- スキーマ:
Internal
- ロードバランサの IP アドレスタイプ:
IPv4
- VC :
Kafka VPC
- 可用性ゾーン:
usw2-az1
とbroker-usw2-az1 subnet
usw2-az2
とbroker-usw2-az2 subnet
usw2-az3
とbroker-usw2-az3 subnet
- Securityグループ: 次のルールで新しいセキュリティ グループを作成します。
- インバウンドルールは、Kafka VPC からのすべての TCP を許可します: タイプ -
All TCP
、ソース -Anywhere-IPv4
- アウトバウンドルールは、Kafka VPC へのすべての TCP を許可します: タイプ -
All TCP
、宛先 -Anywhere-IPv4
- インバウンドルールは、Kafka VPC からのすべての TCP を許可します: タイプ -
- リスナーとルーティング:
- プロトコル:
TCP
; ポート:9092
; 転送先:bootstrap-target-group
- プロトコル:
TCP
; ポート:9093
; 転送先:broker-target-group-1
- プロトコル:
TCP
; ポート:9094
; 転送先:broker-target-group-2
- プロトコル:
TCP
; ポート:9095
; 転送先:broker-target-group-3
- プロトコル:
- ロードバランサー名:
要塞ノードでロード バランサーをテストします。この例では、Kafka ブートストラップのみをテストします。ロード バランサーは Kafka EXTERNAL リスナーをリッスンしているため、EXTERNAL アドバタイズ リスナーのアドレスは要塞ノードで解決できません。ロード バランサーの詳細ページから
kafka-lb
DNS 名 (例kafka-lb-77405fa57191adcb.elb.us-west-2.amazonaws.com
を書き留めます。要塞ノードでスクリプトを実行します。# Replace {lb_dns_name} to your actual value export JAVA_HOME=/home/ec2-user/jdk-22.0.2 ./kafka_2.13-3.7.1/bin/kafka-broker-api-versions.sh --bootstrap-server {lb_dns_name}:9092 # Expected output for the last 3 lines (the actual order might be different) b1.usw2-az1.abc.us-west-2.aws.3199015.tidbcloud.com:9093 (id: 1 rack: null) -> ERROR: org.apache.kafka.common.errors.DisconnectException b2.usw2-az2.abc.us-west-2.aws.3199015.tidbcloud.com:9094 (id: 2 rack: null) -> ERROR: org.apache.kafka.common.errors.DisconnectException b3.usw2-az3.abc.us-west-2.aws.3199015.tidbcloud.com:9095 (id: 3 rack: null) -> ERROR: org.apache.kafka.common.errors.DisconnectException # You can also try bootstrap in other ports 9093/9094/9095. It will succeed probabilistically because NLB in AWS resolves LB DNS to the IP address of any availability zone and disables cross-zone load balancing by default. # If you enable cross-zone load balancing in LB, it will succeed. However, it is unnecessary and might cause additional cross-AZ traffic.
2. プライベートリンクサービスを設定する
エンドポイントサービスに進みます。 「エンドポイント サービスの作成」をクリックして、Kafka ロード バランサーのプライベート リンク サービスを作成します。
- 名前:
kafka-pl-service
- ロードバランサタイプ:
Network
- ロードバランサー:
kafka-lb
- 含ま
usw2-az3
アベイラビリティゾーンusw2-az2
usw2-az1
- エンドポイントの承認が必要:
Acceptance required
- プライベートDNS名を有効にする:
No
- 名前:
サービス名を書き留めます。これをTiDB Cloudに提供する必要があります (例
com.amazonaws.vpce.us-west-2.vpce-svc-0f49e37e1f022cd45
。kafka-pl-service の詳細ページで、「プリンシパルを許可」タブをクリックし、 TiDB Cloudの AWS アカウントがエンドポイントを作成できるようにします。 前提条件 、たとえば
arn:aws:iam::<account_id>:root
でTiDB Cloudの AWS アカウントを取得できます。
ステップ3. TiDB Cloudから接続する
TiDB Cloudコンソールに戻り、クラスターがプライベート リンクで Kafka クラスターに接続するための変更フィードを作成します。詳細については、 Apache Kafka にシンクする参照してください。
「ChangeFeed ターゲットの構成」>「接続方法」>「プライベート リンク」に進むときは、次のフィールドに対応する値を入力し、必要に応じて他のフィールドに入力します。
- Kafka タイプ:
3 AZs
クラスターが同じ 3 つの AZ にデプロイされていることを確認します。 - Kafka アドバタイズ リスナー パターン:
abc
。これは、 前提条件でKafka アドバタイズ リスナー パターンを生成するために使用する一意のランダム文字列と同じです。 - エンドポイント サービス名: Kafka サービス名。
- ブートストラップ ポート:
9092
背後に専用のブートストラップ ターゲット グループを構成するため、1 つのポートで十分です。
- Kafka タイプ:
Apache Kafka にシンクするの手順に進みます。
これでタスクは正常に完了しました。
FAQ
2 つの異なるTiDB Cloudプロジェクトから同じ Kafka Private Link サービスに接続するにはどうすればよいですか?
このドキュメントに従って最初のプロジェクトからの接続をすでに正常に設定している場合は、次のようにして 2 番目のプロジェクトから同じ Kafka Private Link サービスに接続できます。
このドキュメントの冒頭の指示に従ってください。
ステップ1. Kafkaクラスターをセットアップするに進んだら、 実行中の Kafka クラスターを再構成するに従って、別の EXTERNAL リスナーとアドバタイズされたリスナーのグループを作成します。このグループにはEXTERNAL2という名前を付けることができます。EXTERNAL2のポート範囲はEXTERNALと重複できないことに注意してください。
ブローカーを再構成した後、ブートストラップおよびブローカー ターゲット グループを含む別のターゲット グループをロード バランサーに追加します。
次の情報を使用してTiDB Cloud接続を構成します。
- 新しいブートストラップポート
- 新しい Kafka 広告リスナー グループ
- 同じエンドポイントサービス