AWS でセルフホスト型 Kafka プライベートリンク サービスをセットアップする

O
q

このドキュメントでは、AWS でセルフホスト型 Kafka 用の Private Link サービスを設定する方法と、それをTiDB Cloudで動作させる方法について説明します。

このメカニズムは次のように機能します。

  1. TiDB Cloud VPC はプライベート エンドポイントを介して Kafka VPC に接続します。
  2. Kafka クライアントはすべての Kafka ブローカーと直接通信する必要があります。
  3. 各 Kafka ブローカーは、 TiDB Cloud VPC 内のエンドポイントの一意のポートにマッピングされます。
  4. マッピングを実現するには、Kafka ブートストラップ メカニズムと AWS リソースを活用します。

次の図にその仕組みを示します。

Connect to AWS Self-Hosted Kafka Private Link Service

このドキュメントでは、AWS の 3 つのアベイラビリティーゾーン (AZ) にデプロイされた Kafka Private Link サービスに接続する例を示します。同様のポートマッピング原則に基づいて他の構成も可能ですが、このドキュメントでは Kafka Private Link サービスの基本的なセットアッププロセスについて説明します。本番環境では、運用の保守性と監視性が強化された、より回復力の高い Kafka Private Link サービスが推奨されます。

前提条件

  1. 独自の AWS アカウントで Kafka Private Link サービスを設定するには、次の権限があることを確認してください。

    • EC2ノードを管理する
    • VPC の管理
    • サブネットを管理する
    • セキュリティ グループを管理する
    • ロードバランサーを管理する
    • エンドポイントサービスの管理
    • EC2 ノードに接続して Kafka ノードを構成する
  2. 持っていない場合はTiDB Cloud専用クラスターを作成する

  3. TiDB Cloud Dedicated クラスターから Kafka デプロイメント情報を取得します。

    1. TiDB Cloudコンソールで、TiDB クラスターのクラスター概要ページに移動し、左側のナビゲーション ペインで[Changefeed]クリックします。
    2. 概要ページで、TiDB クラスターのリージョンを見つけます。Kafka クラスターが同じリージョンにデプロイされることを確認します。
    3. 「Changefeed の作成」をクリックします。
      1. ターゲットタイプで、 Kafkaを選択します。
      2. [接続方法]で、 [プライベート リンク]を選択します。
    4. 続行する前に、リマインダーにTiDB Cloud AWS アカウントの情報を書き留めておきます。これを使用して、 TiDB Cloud がKafka Private Link サービスのエンドポイントを作成することを承認します。
    5. AZ の数を選択します。この例では、 3 つの AZを選択します。Kafka クラスターをデプロイする AZ の ID を書き留めます。AZ 名と AZ ID の関係を知りたい場合は、 AWS リソースのアベイラビリティーゾーン ID参照してください。
    6. Kafka プライベート リンク サービスに固有のKafka アドバタイズ リスナー パターンを入力します。
      1. 一意のランダム文字列を入力します。数字または小文字のみを含めることができます。これは後でKafka アドバタイズ リスナー パターンを生成するために使用します。
      2. 「使用状況の確認と生成」をクリックして、ランダム文字列が一意であるかどうかを確認し、Kafka ブローカーの外部アドバタイズ リスナーを組み立てるために使用されるKafka アドバタイズ リスナー パターンを生成します。

すべてのデプロイメント情報を書き留めます。後で Kafka Private Link サービスを構成するときにこれを使用する必要があります。

次の表は、デプロイメント情報の例を示しています。

情報価値注記
リージョンオレゴン ( us-west-2 )該当なし
TiDB Cloud AWS アカウントのプリンシパルarn:aws:iam::<account_id>:root該当なし
AZ ID
  • usw2-az1
  • usw2-az2
  • usw2-az3
  • AZ ID を AWS アカウントの AZ 名に合わせます。
    例:
    • usw2-az1 => us-west-2a
    • usw2-az2 => us-west-2c
    • usw2-az3 => us-west-2b
    Kafka アドバタイズド リスナー パターン一意のランダム文字列: abc
    AZ 用に生成されたパターン:
    • usw2-az1 => <ブローカーID>.usw2-az1.abc.us-west-2.aws.3199015.tidbcloud.com:<ポート>
    • usw2-az2 => <ブローカーID>.usw2-az2.abc.us-west-2.aws.3199015.tidbcloud.com:<ポート>
    • usw2-az3 => <ブローカーID>.usw2-az3.abc.us-west-2.aws.3199015.tidbcloud.com:<ポート>
    AZ 名を AZ 指定のパターンにマップします。後で特定の AZ のブローカーに適切なパターンを構成するようにしてください。
    • us-west-2a => <ブローカーID>.usw2-az1.abc.us-west-2.aws.3199015.tidbcloud.com:<ポート>
    • us-west-2c => <ブローカーID>.usw2-az2.abc.us-west-2.aws.3199015.tidbcloud.com:<ポート>
    • us-west-2b => <ブローカーID>.usw2-az3.abc.us-west-2.aws.3199015.tidbcloud.com:<ポート>

    ステップ1. Kafkaクラスターをセットアップする

    新しいクラスターをデプロイする必要がある場合は、 新しい Kafka クラスターをデプロイの手順に従ってください。

    既存のクラスターを公開する必要がある場合は、 実行中の Kafka クラスターを再構成するの手順に従ってください。

    新しい Kafka クラスターをデプロイ

    1. Kafka VPCをセットアップする

    Kafka VPC には次のものが必要です。

    • ブローカー用のプライベートサブネットが 3 つ (AZ ごとに 1 つ)。
    • 任意の AZ にインターネットに接続できる要塞ノードを持つ 1 つのパブリック サブネットと 3 つのプライベート サブネットがあり、Kafka クラスターを簡単にセットアップできます。本番環境では、Kafka VPC に接続できる独自の要塞ノードが存在する場合があります。

    サブネットを作成する前に、AZ ID と AZ 名のマッピングに基づいて AZ にサブネットを作成します。次のマッピングを例に挙げます。

    • usw2-az1 => us-west-2a
    • usw2-az2 => us-west-2c
    • usw2-az3 => us-west-2b

    次の AZ にプライベート サブネットを作成します。

    • us-west-2a
    • us-west-2c
    • us-west-2b

    Kafka VPC を作成するには、次の手順を実行します。

    1.1. Kafka VPCを作成する

    1. AWS コンソール > VPC ダッシュボードに進み、Kafka をデプロイするリージョンに切り替えます。

    2. 「VPC の作成」をクリックします。VPC設定ページで次のように情報を入力します。

      1. VPC のみを選択します。
      2. 名前タグにタグを入力します(例: Kafka VPC )。
      3. IPv4 CIDR 手動入力を選択し、IPv4 CIDR (例: 10.0.0.0/16 ) を入力します。
      4. その他のオプションについてはデフォルト値を使用します。 「VPC の作成」をクリックします。
      5. VPC 詳細ページで、VPC ID (例: vpc-01f50b790fa01dffa ) をメモします。

    1.2. Kafka VPC にプライベートサブネットを作成する

    1. サブネット一覧ページに進みます。

    2. サブネットの作成をクリックします。

    3. 前に書き留めておいたVPC ID (この例ではvpc-01f50b790fa01dffa ) を選択します。

    4. 次の情報を使用して 3 つのサブネットを追加します。TiDB TiDB Cloudブローカーのadvertised.listener構成で AZ ID をエンコードする必要があるため、後でブローカーを簡単に構成できるように、サブネット名に AZ ID を入れることをお勧めします。

      • サブネットus-west-2a

        • サブネット名: broker-usw2-az1
        • 利用可能ゾーン: us-west-2a
        • IPv4 サブネット CIDR ブロック: 10.0.0.0/18
      • サブネット2 in us-west-2c

        • サブネット名: broker-usw2-az2
        • 利用可能ゾーン: us-west-2c
        • IPv4 サブネット CIDR ブロック: 10.0.64.0/18
      • サブネットus-west-2b

        • サブネット名: broker-usw2-az3
        • 利用可能ゾーン: us-west-2b
        • IPv4 サブネット CIDR ブロック: 10.0.128.0/18
    5. 「サブネットの作成」をクリックします。サブネット一覧ページが表示されます。

    1.3. Kafka VPCにパブリックサブネットを作成する

    1. サブネットの作成をクリックします。

    2. 前に書き留めておいたVPC ID (この例ではvpc-01f50b790fa01dffa ) を選択します。

    3. 次の情報を使用して、任意の AZ にパブリック サブネットを追加します。

      • サブネット名: bastion
      • IPv4 サブネット CIDR ブロック: 10.0.192.0/18
    4. 「サブネットの作成」をクリックします。サブネット一覧ページが表示されます。

    5. 要塞サブネットをパブリックサブネットに構成します。

      1. VPC ダッシュボード > インターネット ゲートウェイに進みます。 kafka-vpc-igwという名前のインターネット ゲートウェイを作成します。

      2. インターネット ゲートウェイの詳細ページの[アクション]で、 [VPC に接続]をクリックして、インターネット ゲートウェイを Kafka VPC に接続します。

      3. VPC ダッシュボード > ルートテーブルに進みます。Kafka VPC のインターネット ゲートウェイへのルート テーブルを作成し、次の情報を含む新しいルートを追加します。

        • 名前: kafka-vpc-igw-route-table
        • VC : Kafka VPC
        • ルート:
          • 目的地0.0.0.0/0
          • ターゲットkafka-vpc-igw Internet Gateway
      4. ルート テーブルを要塞サブネットに接続します。ルート テーブルの詳細ページで、 [サブネットの関連付け] > [サブネットの関連付けの編集]をクリックして要塞サブネットを追加し、変更を保存します。

    2. Kafkaブローカーを設定する

    2.1. 要塞ノードを作成する

    EC2 リストページに進みます。要塞サブネットに要塞ノードを作成します。

    • 名前: bastion-node

    • Amazon マシンイメージ: Amazon linux

    • インスタンスタイプ: t2.small

    • キー ペア: kafka-vpc-key-pair kafka-vpc-key-pairという名前の新しいキー ペアを作成します。後の構成のためにkafka-vpc-key-pair.pem をローカルにダウンロードします。

    • ネットワーク設定

      • VC : Kafka VPC
      • サブネット: bastion
      • パブリック IP の自動割り当て: Enable
      • Securityグループ: どこからでも SSH ログインを許可する新しいセキュリティ グループを作成します。本番環境での安全性のためにルールを絞り込むことができます。

    2.2. ブローカーノードを作成する

    EC2 リストページに進みます。ブローカー サブネットに 3 つのブローカー ノード (AZ ごとに 1 つ) を作成します。

    • サブネットbroker-usw2-az1のブローカー 1

      • 名前: broker-node1

      • Amazon マシンイメージ: Amazon linux

      • インスタンスタイプ: t2.large

      • キーペア: 再利用kafka-vpc-key-pair

      • ネットワーク設定

        • VC : Kafka VPC
        • サブネット: broker-usw2-az1
        • パブリック IP の自動割り当て: Disable
        • Securityグループ: Kafka VPC からのすべての TCP を許可する新しいセキュリティ グループを作成します。本番環境での安全性のためにルールを絞り込むことができます。
          • プロトコル: TCP
          • ポート範囲: 0 - 65535
          • 出典: 10.0.0.0/16
    • サブネットbroker-usw2-az2のブローカー 2

      • 名前: broker-node2

      • Amazon マシンイメージ: Amazon linux

      • インスタンスタイプ: t2.large

      • キーペア: 再利用kafka-vpc-key-pair

      • ネットワーク設定

        • VC : Kafka VPC
        • サブネット: broker-usw2-az2
        • パブリック IP の自動割り当て: Disable
        • Securityグループ: Kafka VPC からのすべての TCP を許可する新しいセキュリティ グループを作成します。本番環境での安全性のためにルールを絞り込むことができます。
          • プロトコル: TCP
          • ポート範囲: 0 - 65535
          • 出典: 10.0.0.0/16
    • サブネットbroker-usw2-az3のブローカー 3

      • 名前: broker-node3

      • Amazon マシンイメージ: Amazon linux

      • インスタンスタイプ: t2.large

      • キーペア: 再利用kafka-vpc-key-pair

      • ネットワーク設定

        • VC : Kafka VPC
        • サブネット: broker-usw2-az3
        • パブリック IP の自動割り当て: Disable
        • Securityグループ: Kafka VPC からのすべての TCP を許可する新しいセキュリティ グループを作成します。本番環境での安全性のためにルールを絞り込むことができます。
          • プロトコル: TCP
          • ポート範囲: 0 - 65535
          • 出典: 10.0.0.0/16

    2.3. Kafka ランタイムバイナリを準備する

    1. 要塞ノードの詳細ページに移動します。パブリック IPv4 アドレスを取得します。SSH を使用して、以前にダウンロードしたkafka-vpc-key-pair.pem使用してノードにログインします。

      chmod 400 kafka-vpc-key-pair.pem ssh -i "kafka-vpc-key-pair.pem" ec2-user@{bastion_public_ip} # replace {bastion_public_ip} with the IP address of your bastion node, for example, 54.186.149.187 scp -i "kafka-vpc-key-pair.pem" kafka-vpc-key-pair.pem ec2-user@{bastion_public_ip}:~/
    2. バイナリをダウンロードします。

      # Download Kafka and OpenJDK, and then extract the files. You can choose the binary version based on your preference. wget https://archive.apache.org/dist/kafka/3.7.1/kafka_2.13-3.7.1.tgz tar -zxf kafka_2.13-3.7.1.tgz wget https://download.java.net/java/GA/jdk22.0.2/c9ecb94cd31b495da20a27d4581645e8/9/GPL/openjdk-22.0.2_linux-x64_bin.tar.gz tar -zxf openjdk-22.0.2_linux-x64_bin.tar.gz
    3. バイナリを各ブローカー ノードにコピーします。

      # Replace {broker-node1-ip} with your broker-node1 IP address scp -i "kafka-vpc-key-pair.pem" kafka_2.13-3.7.1.tgz ec2-user@{broker-node1-ip}:~/ ssh -i "kafka-vpc-key-pair.pem" ec2-user@{broker-node1-ip} "tar -zxf kafka_2.13-3.7.1.tgz" scp -i "kafka-vpc-key-pair.pem" openjdk-22.0.2_linux-x64_bin.tar.gz ec2-user@{broker-node1-ip}:~/ ssh -i "kafka-vpc-key-pair.pem" ec2-user@{broker-node1-ip} "tar -zxf openjdk-22.0.2_linux-x64_bin.tar.gz" # Replace {broker-node2-ip} with your broker-node2 IP address scp -i "kafka-vpc-key-pair.pem" kafka_2.13-3.7.1.tgz ec2-user@{broker-node2-ip}:~/ ssh -i "kafka-vpc-key-pair.pem" ec2-user@{broker-node2-ip} "tar -zxf kafka_2.13-3.7.1.tgz" scp -i "kafka-vpc-key-pair.pem" openjdk-22.0.2_linux-x64_bin.tar.gz ec2-user@{broker-node2-ip}:~/ ssh -i "kafka-vpc-key-pair.pem" ec2-user@{broker-node2-ip} "tar -zxf openjdk-22.0.2_linux-x64_bin.tar.gz" # Replace {broker-node3-ip} with your broker-node3 IP address scp -i "kafka-vpc-key-pair.pem" kafka_2.13-3.7.1.tgz ec2-user@{broker-node3-ip}:~/ ssh -i "kafka-vpc-key-pair.pem" ec2-user@{broker-node3-ip} "tar -zxf kafka_2.13-3.7.1.tgz" scp -i "kafka-vpc-key-pair.pem" openjdk-22.0.2_linux-x64_bin.tar.gz ec2-user@{broker-node3-ip}:~/ ssh -i "kafka-vpc-key-pair.pem" ec2-user@{broker-node3-ip} "tar -zxf openjdk-22.0.2_linux-x64_bin.tar.gz"

    2.4. 各ブローカーノードにKafkaノードを設定する

    2.4.1 3つのノードを持つKRaft Kafkaクラスターをセットアップする

    各ノードはブローカーとコントローラーのロールとして機能します。各ブローカーに対して次の操作を実行します。

    1. listeners項目の場合、3 つのブローカーはすべて同じであり、ブローカーとコントローラーのロールとして機能します。

      1. すべてのコントローラーロール ノードに対して同じ CONTROLLER リスナーを構成します。ブローカーロール ノードのみを追加する場合は、 server.propertiesの CONTROLLER リスナーは必要ありません。
      2. ブローカーリスナーを 2 つ構成します。3 INTERNAL内部アクセス用、 EXTERNAL TiDB Cloudからの外部アクセス用です。
    2. advertised.listeners項目については、次の操作を行います。

      1. ブローカー ノードの内部 IP を使用して、ブローカーごとに INTERNAL アドバタイズ リスナーを構成します。アドバタイズされた内部 Kafka クライアントはこのアドレスを使用してブローカーにアクセスします。

      2. TiDB TiDB Cloud TiDB Cloudから取得したKafka アドバタイズ リスナー パターンに基づいて EXTERNAL アドバタイズ リスナーを構成します。さまざまな EXTERNAL アドバタイズ リスナーにより、 TiDB Cloudの Kafka クライアントはリクエストを適切なブローカーにルーティングできます。

        • <port> 、ブローカーを Kafka プライベート リンク サービス アクセス ポイントと区別します。すべてのブローカーの EXTERNAL アドバタイズ リスナーのポート範囲を計画します。これらのポートは、ブローカーがリッスンする実際のポートである必要はありません。これらは、リクエストを別のブローカーに転送するプライベート リンク サービスのロード バランサーがリッスンするポートです。
        • Kafka アドバタイズ リスナー パターンAZ ID 、ブローカーがデプロイされている場所を示します。TiDB TiDB Cloud は、 AZ ID に基づいて、リクエストを異なるエンドポイント DNS 名にルーティングします。

      トラブルシューティングを容易にするために、ブローカーごとに異なるブローカー ID を構成することをお勧めします。

    3. 計画値は次のとおりです。

      • コントローラーポート: 29092
      • 内部ポート: 9092
      • 外部: 39092
      • 外部アドバタイズされたリスナーポート範囲: 9093~9095

    2.4.2. 設定ファイルを作成する

    SSH を使用してすべてのブローカー ノードにログインします。次の内容の構成ファイル~/config/server.propertiesを作成します。

    # brokers in usw2-az1 # broker-node1 ~/config/server.properties # 1. Replace {broker-node1-ip}, {broker-node2-ip}, {broker-node3-ip} with the actual IP addresses. # 2. Configure EXTERNAL in "advertised.listeners" based on the "Kafka Advertised Listener Pattern" in the "Prerequisites" section. # 2.1 The pattern for AZ(ID: usw2-az1) is "<broker_id>.usw2-az1.abc.us-west-2.aws.3199015.tidbcloud.com:<port>". # 2.2 So the EXTERNAL can be "b1.usw2-az1.abc.us-west-2.aws.3199015.tidbcloud.com:9093". Replace <broker_id> with "b" prefix plus "node.id" properties, and replace <port> with a unique port (9093) in the port range of the EXTERNAL advertised listener. # 2.3 If there are more broker role nodes in the same AZ, you can configure them in the same way. process.roles=broker,controller node.id=1 controller.quorum.voters=1@{broker-node1-ip}:29092,2@{broker-node2-ip}:29092,3@{broker-node3-ip}:29092 listeners=INTERNAL://0.0.0.0:9092,CONTROLLER://0.0.0.0:29092,EXTERNAL://0.0.0.0:39092 inter.broker.listener.name=INTERNAL advertised.listeners=INTERNAL://{broker-node1-ip}:9092,EXTERNAL://b1.usw2-az1.abc.us-west-2.aws.3199015.tidbcloud.com:9093 controller.listener.names=CONTROLLER listener.security.protocol.map=INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL log.dirs=./data
    # brokers in usw2-az2 # broker-node2 ~/config/server.properties # 1. Replace {broker-node1-ip}, {broker-node2-ip}, {broker-node3-ip} with the actual IP addresses. # 2. Configure EXTERNAL in "advertised.listeners" based on the "Kafka Advertised Listener Pattern" in the "Prerequisites" section. # 2.1 The pattern for AZ(ID: usw2-az2) is "<broker_id>.usw2-az2.abc.us-west-2.aws.3199015.tidbcloud.com:<port>". # 2.2 So the EXTERNAL can be "b2.usw2-az2.abc.us-west-2.aws.3199015.tidbcloud.com:9094". Replace <broker_id> with "b" prefix plus "node.id" properties, and replace <port> with a unique port (9094) in the port range of the EXTERNAL advertised listener. # 2.3 If there are more broker role nodes in the same AZ, you can configure them in the same way. process.roles=broker,controller node.id=2 controller.quorum.voters=1@{broker-node1-ip}:29092,2@{broker-node2-ip}:29092,3@{broker-node3-ip}:29092 listeners=INTERNAL://0.0.0.0:9092,CONTROLLER://0.0.0.0:29092,EXTERNAL://0.0.0.0:39092 inter.broker.listener.name=INTERNAL advertised.listeners=INTERNAL://{broker-node2-ip}:9092,EXTERNAL://b2.usw2-az2.abc.us-west-2.aws.3199015.tidbcloud.com:9094 controller.listener.names=CONTROLLER listener.security.protocol.map=INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL log.dirs=./data
    # brokers in usw2-az3 # broker-node3 ~/config/server.properties # 1. Replace {broker-node1-ip}, {broker-node2-ip}, {broker-node3-ip} with the actual IP addresses. # 2. Configure EXTERNAL in "advertised.listeners" based on the "Kafka Advertised Listener Pattern" in the "Prerequisites" section. # 2.1 The pattern for AZ(ID: usw2-az3) is "<broker_id>.usw2-az3.abc.us-west-2.aws.3199015.tidbcloud.com:<port>". # 2.2 So the EXTERNAL can be "b3.usw2-az3.abc.us-west-2.aws.3199015.tidbcloud.com:9095". Replace <broker_id> with "b" prefix plus "node.id" properties, and replace <port> with a unique port (9095) in the port range of the EXTERNAL advertised listener. # 2.3 If there are more broker role nodes in the same AZ, you can configure them in the same way. process.roles=broker,controller node.id=3 controller.quorum.voters=1@{broker-node1-ip}:29092,2@{broker-node2-ip}:29092,3@{broker-node3-ip}:29092 listeners=INTERNAL://0.0.0.0:9092,CONTROLLER://0.0.0.0:29092,EXTERNAL://0.0.0.0:39092 inter.broker.listener.name=INTERNAL advertised.listeners=INTERNAL://{broker-node3-ip}:9092,EXTERNAL://b3.usw2-az3.abc.us-west-2.aws.3199015.tidbcloud.com:9095 controller.listener.names=CONTROLLER listener.security.protocol.map=INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL log.dirs=./data

    2.4.3 Kafkaブローカーを起動する

    スクリプトを作成し、それを実行して各ブローカー ノードで Kafka ブローカーを起動します。

    #!/bin/bash # Get the directory of the current script SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" # Set JAVA_HOME to the Java installation within the script directory export JAVA_HOME="$SCRIPT_DIR/jdk-22.0.2" # Define the vars KAFKA_DIR="$SCRIPT_DIR/kafka_2.13-3.7.1/bin" KAFKA_STORAGE_CMD=$KAFKA_DIR/kafka-storage.sh KAFKA_START_CMD=$KAFKA_DIR/kafka-server-start.sh KAFKA_DATA_DIR=$SCRIPT_DIR/data KAFKA_LOG_DIR=$SCRIPT_DIR/log KAFKA_CONFIG_DIR=$SCRIPT_DIR/config # Cleanup step, which makes it easy for multiple experiments # Find all Kafka process IDs KAFKA_PIDS=$(ps aux | grep 'kafka.Kafka' | grep -v grep | awk '{print $2}') if [ -z "$KAFKA_PIDS" ]; then echo "No Kafka processes are running." else # Kill each Kafka process echo "Killing Kafka processes with PIDs: $KAFKA_PIDS" for PID in $KAFKA_PIDS; do kill -9 $PID echo "Killed Kafka process with PID: $PID" done echo "All Kafka processes have been killed." fi rm -rf $KAFKA_DATA_DIR mkdir -p $KAFKA_DATA_DIR rm -rf $KAFKA_LOG_DIR mkdir -p $KAFKA_LOG_DIR # Magic id: BRl69zcmTFmiPaoaANybiw, you can use your own $KAFKA_STORAGE_CMD format -t "BRl69zcmTFmiPaoaANybiw" -c "$KAFKA_CONFIG_DIR/server.properties" > $KAFKA_LOG_DIR/server_format.log LOG_DIR=$KAFKA_LOG_DIR nohup $KAFKA_START_CMD "$KAFKA_CONFIG_DIR/server.properties" &

    2.5. 要塞ノードでクラスター設定をテストする

    1. Kafka ブートストラップをテストします。

      export JAVA_HOME=/home/ec2-user/jdk-22.0.2 # Bootstrap from INTERNAL listener ./kafka_2.13-3.7.1/bin/kafka-broker-api-versions.sh --bootstrap-server {one_of_broker_ip}:9092 | grep 9092 # Expected output (the actual order might be different) {broker-node1-ip}:9092 (id: 1 rack: null) -> ( {broker-node2-ip}:9092 (id: 2 rack: null) -> ( {broker-node3-ip}:9092 (id: 3 rack: null) -> ( # Bootstrap from EXTERNAL listener ./kafka_2.13-3.7.1/bin/kafka-broker-api-versions.sh --bootstrap-server {one_of_broker_ip}:39092 # Expected output for the last 3 lines (the actual order might be different) # The difference in the output from "bootstrap from INTERNAL listener" is that exceptions or errors might occur because advertised listeners cannot be resolved in Kafka VPC. # We will make them resolvable in TiDB Cloud side and make it route to the right broker when you create a changefeed connect to this Kafka cluster by Private Link. b1.usw2-az1.abc.us-west-2.aws.3199015.tidbcloud.com:9093 (id: 1 rack: null) -> ERROR: org.apache.kafka.common.errors.DisconnectException b2.usw2-az2.abc.us-west-2.aws.3199015.tidbcloud.com:9094 (id: 2 rack: null) -> ERROR: org.apache.kafka.common.errors.DisconnectException b3.usw2-az3.abc.us-west-2.aws.3199015.tidbcloud.com:9095 (id: 3 rack: null) -> ERROR: org.apache.kafka.common.errors.DisconnectException
    2. 要塞ノードにプロデューサー スクリプトproduce.shを作成します。

      #!/bin/bash BROKER_LIST=$1 # "{broker_address1},{broker_address2}..." # Get the directory of the current script SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" # Set JAVA_HOME to the Java installation within the script directory export JAVA_HOME="$SCRIPT_DIR/jdk-22.0.2" # Define the Kafka directory KAFKA_DIR="$SCRIPT_DIR/kafka_2.13-3.7.1/bin" TOPIC="test-topic" # Create a topic if it does not exist create_topic() { echo "Creating topic if it does not exist..." $KAFKA_DIR/kafka-topics.sh --create --topic $TOPIC --bootstrap-server $BROKER_LIST --if-not-exists --partitions 3 --replication-factor 3 } # Produce messages to the topic produce_messages() { echo "Producing messages to the topic..." for ((chrono=1; chrono <= 10; chrono++)); do message="Test message "$chrono echo "Create "$message echo $message | $KAFKA_DIR/kafka-console-producer.sh --broker-list $BROKER_LIST --topic $TOPIC done } create_topic produce_messages
    3. 要塞ノードにコンシューマー スクリプトconsume.shを作成します。

      #!/bin/bash BROKER_LIST=$1 # "{broker_address1},{broker_address2}..." # Get the directory of the current script SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" # Set JAVA_HOME to the Java installation within the script directory export JAVA_HOME="$SCRIPT_DIR/jdk-22.0.2" # Define the Kafka directory KAFKA_DIR="$SCRIPT_DIR/kafka_2.13-3.7.1/bin" TOPIC="test-topic" CONSUMER_GROUP="test-group" # Consume messages from the topic consume_messages() { echo "Consuming messages from the topic..." $KAFKA_DIR/kafka-console-consumer.sh --bootstrap-server $BROKER_LIST --topic $TOPIC --from-beginning --timeout-ms 5000 --consumer-property group.id=$CONSUMER_GROUP } consume_messages
    4. produce.shconsume.sh実行して、Kafka クラスターが実行中であることを確認します。これらのスクリプトは、後のネットワーク接続テストでも再利用されます。スクリプトは--partitions 3 --replication-factor 3でトピックを作成します。これら 3 つのブローカーすべてにデータが含まれていることを確認します。スクリプトが 3 つのブローカーすべてに接続して、ネットワーク接続がテストされることを保証します。

      # Test write message. ./produce.sh {one_of_broker_ip}:9092
      # Expected output Creating topic if it does not exist... Producing messages to the topic... Create Test message 1 >>Create Test message 2 >>Create Test message 3 >>Create Test message 4 >>Create Test message 5 >>Create Test message 6 >>Create Test message 7 >>Create Test message 8 >>Create Test message 9 >>Create Test message 10
      # Test read message ./consume.sh {one_of_broker_ip}:9092
      # Expected example output (the actual message order might be different) Consuming messages from the topic... Test message 3 Test message 4 Test message 5 Test message 9 Test message 10 Test message 6 Test message 8 Test message 1 Test message 2 Test message 7 [2024-11-01 08:54:27,547] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$) org.apache.kafka.common.errors.TimeoutException Processed a total of 10 messages

    実行中の Kafka クラスターを再構成する

    Kafka クラスターが TiDB クラスターと同じリージョンおよび AZ にデプロイされていることを確認します。ブローカーが異なる AZ にある場合は、正しい AZ に移動します。

    1. ブローカーの外部リスナーを構成する

    次の構成は、Kafka KRaft クラスターに適用されます。ZK モードの構成も同様です。

    1. 構成の変更を計画します。

      1. TiDB Cloudからの外部アクセス用に、ブローカーごとに EXTERNALリスナーを構成します。 EXTERNAL ポートとして一意のポート (例: 39092 ) を選択します。

      2. TiDB Cloudがさまざまなブローカーを区別できるように、すべてのブローカー ノードに対してTiDB Cloudから取得したKafka アドバタイズ リスナー パターンに基づいて EXTERNALアドバタイズ リスナーを構成します。さまざまな EXTERNAL アドバタイズ リスナーにより、 TiDB Cloudの Kafka クライアントはリクエストを適切なブローカーにルーティングできます。

        • <port>ブローカーを Kafka プライベート リンク サービス アクセス ポイントと区別します。すべてのブローカーの EXTERNAL アドバタイズ リスナーのポート範囲を計画します (例: range from 9093 )。これらのポートは、ブローカーがリッスンする実際のポートである必要はありません。これらは、リクエストを別のブローカーに転送するプライベート リンク サービスのロード バランサーがリッスンするポートです。
        • Kafka アドバタイズ リスナー パターンAZ ID 、ブローカーがデプロイされている場所を示します。TiDB TiDB Cloud は、 AZ ID に基づいて、リクエストを異なるエンドポイント DNS 名にルーティングします。

      トラブルシューティングを容易にするために、ブローカーごとに異なるブローカー ID を構成することをお勧めします。

    2. SSH を使用して各ブローカー ノードにログインします。各ブローカーの構成ファイルを次の内容に変更します。

      # brokers in usw2-az1 # Add EXTERNAL listener listeners=INTERNAL:...,EXTERNAL://0.0.0.0:39092 # Add EXTERNAL advertised listeners based on the "Kafka Advertised Listener Pattern" in "Prerequisites" section # 1. The pattern for AZ(ID: usw2-az1) is "<broker_id>.usw2-az1.abc.us-west-2.aws.3199015.tidbcloud.com:<port>" # 2. So the EXTERNAL can be "b1.usw2-az1.abc.us-west-2.aws.3199015.tidbcloud.com:9093", replace <broker_id> with "b" prefix plus "node.id" properties, replace <port> with a unique port(9093) in EXTERNAL advertised listener ports range advertised.listeners=...,EXTERNAL://b1.usw2-az1.abc.us-west-2.aws.3199015.tidbcloud.com:9093 # Configure EXTERNAL map listener.security.protocol.map=...,EXTERNAL:PLAINTEXT
      # brokers in usw2-az2 # Add EXTERNAL listener listeners=INTERNAL:...,EXTERNAL://0.0.0.0:39092 # Add EXTERNAL advertised listeners based on the "Kafka Advertised Listener Pattern" in "Prerequisites" section # 1. The pattern for AZ(ID: usw2-az2) is "<broker_id>.usw2-az2.abc.us-west-2.aws.3199015.tidbcloud.com:<port>" # 2. So the EXTERNAL can be "b2.usw2-az2.abc.us-west-2.aws.3199015.tidbcloud.com:9094". Replace <broker_id> with "b" prefix plus "node.id" properties, and replace <port> with a unique port(9094) in EXTERNAL advertised listener ports range. advertised.listeners=...,EXTERNAL://b2.usw2-az2.abc.us-west-2.aws.3199015.tidbcloud.com:9094 # Configure EXTERNAL map listener.security.protocol.map=...,EXTERNAL:PLAINTEXT
      # brokers in usw2-az3 # Add EXTERNAL listener listeners=INTERNAL:...,EXTERNAL://0.0.0.0:39092 # Add EXTERNAL advertised listeners based on the "Kafka Advertised Listener Pattern" in "Prerequisites" section # 1. The pattern for AZ(ID: usw2-az3) is "<broker_id>.usw2-az3.abc.us-west-2.aws.3199015.tidbcloud.com:<port>" # 2. So the EXTERNAL can be "b2.usw2-az3.abc.us-west-2.aws.3199015.tidbcloud.com:9095". Replace <broker_id> with "b" prefix plus "node.id" properties, and replace <port> with a unique port(9095) in EXTERNAL advertised listener ports range. advertised.listeners=...,EXTERNAL://b3.usw2-az3.abc.us-west-2.aws.3199015.tidbcloud.com:9095 # Configure EXTERNAL map listener.security.protocol.map=...,EXTERNAL:PLAINTEXT
    3. すべてのブローカーを再構成したら、Kafka ブローカーを 1 つずつ再起動します。

    2. 内部ネットワークで外部リスナー設定をテストする

    Kafka クライアント ノードに Kafka と OpenJDK をダウンロードできます。

    # Download Kafka and OpenJDK, and then extract the files. You can choose the binary version based on your preference. wget https://archive.apache.org/dist/kafka/3.7.1/kafka_2.13-3.7.1.tgz tar -zxf kafka_2.13-3.7.1.tgz wget https://download.java.net/java/GA/jdk22.0.2/c9ecb94cd31b495da20a27d4581645e8/9/GPL/openjdk-22.0.2_linux-x64_bin.tar.gz tar -zxf openjdk-22.0.2_linux-x64_bin.tar.gz

    次のスクリプトを実行して、ブートストラップが期待どおりに動作するかどうかをテストします。

    export JAVA_HOME=/home/ec2-user/jdk-22.0.2 # Bootstrap from the EXTERNAL listener ./kafka_2.13-3.7.1/bin/kafka-broker-api-versions.sh --bootstrap-server {one_of_broker_ip}:39092 # Expected output for the last 3 lines (the actual order might be different) # There will be some exceptions or errors because advertised listeners cannot be resolved in your Kafka network. # We will make them resolvable in TiDB Cloud side and make it route to the right broker when you create a changefeed connect to this Kafka cluster by Private Link. b1.usw2-az1.abc.us-west-2.aws.3199015.tidbcloud.com:9093 (id: 1 rack: null) -> ERROR: org.apache.kafka.common.errors.DisconnectException b2.usw2-az2.abc.us-west-2.aws.3199015.tidbcloud.com:9094 (id: 2 rack: null) -> ERROR: org.apache.kafka.common.errors.DisconnectException b3.usw2-az3.abc.us-west-2.aws.3199015.tidbcloud.com:9095 (id: 3 rack: null) -> ERROR: org.apache.kafka.common.errors.DisconnectException

    1. ロードバランサーを設定する

    異なるポートを持つ 4 つのターゲット グループを持つネットワーク ロード バランサーを作成します。1 つのターゲット グループはブートストラップ用で、他のターゲット グループは異なるブローカーにマップされます。

    1. ブートストラップ ターゲット グループ => 9092 => broker-node1:39092、broker-node2:39092、broker-node3:39092
    2. ブローカー ターゲット グループ 1 => 9093 => broker-node1:39092
    3. ブローカー ターゲット グループ 2 => 9094 => broker-node2:39092
    4. ブローカー ターゲット グループ 3 => 9095 => broker-node3:39092

    ブローカー ロール ノードがさらにある場合は、マッピングを追加する必要があります。ブートストラップ ターゲット グループに少なくとも 1 つのノードがあることを確認してください。回復力のために、各 AZ に 1 つずつ、合計 3 つのノードを追加することをお勧めします。

    ロード バランサーを設定するには、次の手順を実行します。

    1. 対象グループに進み、4 つのターゲット グループを作成します。

      • ブートストラップターゲットグループ

        • ターゲットタイプ: Instances
        • 対象グループ名: bootstrap-target-group
        • プロトコル: TCP
        • ポート: 9092
        • IPアドレスタイプ: IPv4
        • VC : Kafka VPC
        • ヘルスチェックプロトコル: TCP
        • broker-node3:39092ターゲットbroker-node2:39092 broker-node1:39092
      • ブローカーターゲットグループ1

        • ターゲットタイプ: Instances
        • 対象グループ名: broker-target-group-1
        • プロトコル: TCP
        • ポート: 9093
        • IPアドレスタイプ: IPv4
        • VC : Kafka VPC
        • ヘルスチェックプロトコル: TCP
        • 登録対象: broker-node1:39092
      • ブローカーターゲットグループ2

        • ターゲットタイプ: Instances
        • 対象グループ名: broker-target-group-2
        • プロトコル: TCP
        • ポート: 9094
        • IPアドレスタイプ: IPv4
        • VC : Kafka VPC
        • ヘルスチェックプロトコル: TCP
        • 登録対象: broker-node2:39092
      • ブローカーターゲットグループ3

        • ターゲットタイプ: Instances
        • 対象グループ名: broker-target-group-3
        • プロトコル: TCP
        • ポート: 9095
        • IPアドレスタイプ: IPv4
        • VC : Kafka VPC
        • ヘルスチェックプロトコル: TCP
        • 登録対象: broker-node3:39092
    2. ロードバランサーに進み、ネットワーク ロード バランサーを作成します。

      • ロードバランサー名: kafka-lb
      • スキーマ: Internal
      • ロードバランサの IP アドレスタイプ: IPv4
      • VC : Kafka VPC
      • 可用性ゾーン:
        • usw2-az1broker-usw2-az1 subnet
        • usw2-az2broker-usw2-az2 subnet
        • usw2-az3broker-usw2-az3 subnet
      • Securityグループ: 次のルールで新しいセキュリティ グループを作成します。
        • インバウンドルールは、Kafka VPC からのすべての TCP を許可します: タイプ - All TCP 、ソース - Anywhere-IPv4
        • アウトバウンドルールは、Kafka VPC へのすべての TCP を許可します: タイプ - All TCP 、宛先 - Anywhere-IPv4
      • リスナーとルーティング:
        • プロトコル: TCP ; ポート: 9092 ; 転送先: bootstrap-target-group
        • プロトコル: TCP ; ポート: 9093 ; 転送先: broker-target-group-1
        • プロトコル: TCP ; ポート: 9094 ; 転送先: broker-target-group-2
        • プロトコル: TCP ; ポート: 9095 ; 転送先: broker-target-group-3
    3. 要塞ノードでロード バランサーをテストします。この例では、Kafka ブートストラップのみをテストします。ロード バランサーは Kafka EXTERNAL リスナーをリッスンしているため、EXTERNAL アドバタイズ リスナーのアドレスは要塞ノードで解決できません。ロード バランサーの詳細ページからkafka-lb DNS 名 (例kafka-lb-77405fa57191adcb.elb.us-west-2.amazonaws.comを書き留めます。要塞ノードでスクリプトを実行します。

      # Replace {lb_dns_name} to your actual value export JAVA_HOME=/home/ec2-user/jdk-22.0.2 ./kafka_2.13-3.7.1/bin/kafka-broker-api-versions.sh --bootstrap-server {lb_dns_name}:9092 # Expected output for the last 3 lines (the actual order might be different) b1.usw2-az1.abc.us-west-2.aws.3199015.tidbcloud.com:9093 (id: 1 rack: null) -> ERROR: org.apache.kafka.common.errors.DisconnectException b2.usw2-az2.abc.us-west-2.aws.3199015.tidbcloud.com:9094 (id: 2 rack: null) -> ERROR: org.apache.kafka.common.errors.DisconnectException b3.usw2-az3.abc.us-west-2.aws.3199015.tidbcloud.com:9095 (id: 3 rack: null) -> ERROR: org.apache.kafka.common.errors.DisconnectException # You can also try bootstrap in other ports 9093/9094/9095. It will succeed probabilistically because NLB in AWS resolves LB DNS to the IP address of any availability zone and disables cross-zone load balancing by default. # If you enable cross-zone load balancing in LB, it will succeed. However, it is unnecessary and might cause additional cross-AZ traffic.
    1. エンドポイントサービスに進みます。 「エンドポイント サービスの作成」をクリックして、Kafka ロード バランサーのプライベート リンク サービスを作成します。

      • 名前: kafka-pl-service
      • ロードバランサタイプ: Network
      • ロードバランサー: kafka-lb
      • 含まusw2-az3アベイラビリティゾーンusw2-az2 usw2-az1
      • エンドポイントの承認が必要: Acceptance required
      • プライベートDNS名を有効にする: No
    2. サービス名を書き留めます。これをTiDB Cloudに提供する必要があります (例com.amazonaws.vpce.us-west-2.vpce-svc-0f49e37e1f022cd45

    3. kafka-pl-service の詳細ページで、「プリンシパルを許可」タブをクリックし、 TiDB Cloudの AWS アカウントがエンドポイントを作成できるようにします。 前提条件 、たとえばarn:aws:iam::<account_id>:rootでTiDB Cloudの AWS アカウントを取得できます。

    ステップ3. TiDB Cloudから接続する

    1. TiDB Cloudコンソールに戻り、クラスターがプライベート リンクで Kafka クラスターに接続するための変更フィードを作成します。詳細については、 Apache Kafka にシンクする参照してください。

    2. 「ChangeFeed ターゲットの構成」>「接続方法」>「プライベート リンク」に進むときは、次のフィールドに対応する値を入力し、必要に応じて他のフィールドに入力します。

      • Kafka タイプ: 3 AZsクラスターが同じ 3 つの AZ にデプロイされていることを確認します。
      • Kafka アドバタイズ リスナー パターン: abc 。これは、 前提条件Kafka アドバタイズ リスナー パターンを生成するために使用する一意のランダム文字列と同じです。
      • エンドポイント サービス名: Kafka サービス名。
      • ブートストラップ ポート: 9092背後に専用のブートストラップ ターゲット グループを構成するため、1 つのポートで十分です。
    3. Apache Kafka にシンクするの手順に進みます。

    これでタスクは正常に完了しました。

    FAQ

    このドキュメントに従って最初のプロジェクトからの接続をすでに正常に設定している場合は、次のようにして 2 番目のプロジェクトから同じ Kafka Private Link サービスに接続できます。

    1. このドキュメントの冒頭の指示に従ってください。

    2. ステップ1. Kafkaクラスターをセットアップするに進んだら、 実行中の Kafka クラスターを再構成するに従って、別の EXTERNAL リスナーとアドバタイズされたリスナーのグループを作成します。このグループにはEXTERNAL2という名前を付けることができます。EXTERNAL2ポート範囲はEXTERNALと重複できないことに注意してください。

    3. ブローカーを再構成した後、ブートストラップおよびブローカー ターゲット グループを含む別のターゲット グループをロード バランサーに追加します。

    4. 次の情報を使用してTiDB Cloud接続を構成します。

      • 新しいブートストラップポート
      • 新しい Kafka 広告リスナー グループ
      • 同じエンドポイントサービス

    このページは役に立ちましたか?