Apache Kafka および Apache Flink とデータを統合する

このドキュメントでは、 TiCDCを使用して TiDB データを Apache Kafka および Apache Flink にレプリケートする方法について説明します。この文書の構成は次のとおりです。

  1. TiCDC を含む TiDB クラスターを迅速にデプロイし、Kafka クラスターと Flink クラスターを作成します。
  2. データを TiDB から Kafka にレプリケートする変更フィードを作成します。
  3. go-tpc を使用して TiDB にデータを書き込みます。
  4. Kafka コンソール コンシューマでデータを観察し、データが指定された Kafka トピックにレプリケートされていることを確認します。
  5. (オプション) Kafka データを消費するように Flink クラスターを構成します。

前述の手順はラボ環境で実行されます。これらの手順を参照して、本番環境にクラスターをデプロイすることもできます。

ステップ 1. 環境をセットアップする

  1. TiCDC を含む TiDB クラスターをデプロイ。

    ラボまたはテスト環境では、 TiUP Playground を使用して、TiCDC が組み込まれた TiDB クラスターを迅速にデプロイできます。

    tiup playground --host 0.0.0.0 --db 1 --pd 1 --kv 1 --tiflash 0 --ticdc 1 # View cluster status tiup status

    TiUPがまだインストールされていない場合は、 TiUPをインストールするを参照してください。本番環境では、 TiCDCのデプロイ手順に従って TiCDC をデプロイできます。

  2. Kafka クラスターを作成します。

  3. (オプション) Flink クラスターを作成します。

ステップ 2. Kafka チェンジフィードを作成する

  1. チェンジフィード構成ファイルを作成します。

    Flink の要求に応じて、各テーブルの増分データを独立したトピックに送信する必要があり、主キー値に基づいてイベントごとにパーティションをディスパッチする必要があります。したがって、次の内容で変更フィード構成ファイルchangefeed.confを作成する必要があります。

    [sink] dispatchers = [ {matcher = ['*.*'], topic = "tidb_{schema}_{table}", partition="index-value"}, ]

    設定ファイルのdispatchersの詳細については、 Kafka シンクのトピックおよびパーティション ディスパッチャーのルールをカスタマイズするを参照してください。

  2. 増分データを Kafka にレプリケートするための変更フィードを作成します。

    tiup ctl:v<CLUSTER_VERSION> cdc changefeed create --server="http://127.0.0.1:8300" --sink-uri="kafka://127.0.0.1:9092/kafka-topic-name?protocol=canal-json" --changefeed-id="kafka-changefeed" --config="changefeed.conf"
    • チェンジフィードが正常に作成されると、以下に示すように、チェンジフィード ID などのチェンジフィード情報が表示されます。

      Create changefeed successfully! ID: kafka-changefeed Info: {... changfeed info json struct ...}
    • コマンドの実行後に結果が返されない場合は、コマンドを実行したサーバーとシンク URI で指定された Kafka マシン間のネットワーク接続を確認してください。

    本番環境では、Kafka クラスターに複数のブローカー ノードがあります。したがって、複数のブローカーのアドレスをシンク UIR に追加できます。これにより、Kafka クラスターへの安定したアクセスが保証されます。 Kafka クラスターがダウンしても、チェンジフィードは引き続き機能します。 Kafka クラスターに 3 つのブローカー ノードがあり、IP アドレスがそれぞれ 127.0.0.1:9092、127.0.0.2:9092、および 127.0.0.3:9092 であるとします。次のシンク URI を使用して変更フィードを作成できます。

    tiup ctl:v<CLUSTER_VERSION> cdc changefeed create --server="http://127.0.0.1:8300" --sink-uri="kafka://127.0.0.1:9092,127.0.0.2:9092,127.0.0.3:9092/kafka-topic-name?protocol=canal-json&partition-num=3&replication-factor=1&max-message-bytes=1048576" --config="changefeed.conf"
  3. 変更フィードを作成した後、次のコマンドを実行して変更フィードのステータスを確認します。

    tiup ctl:v<CLUSTER_VERSION> cdc changefeed list --server="http://127.0.0.1:8300"

    チェンジフィードを管理するには、 TiCDC 変更フィードの管理を参照してください。

ステップ 3. データを書き込んで変更ログを生成する

前述の手順が完了すると、TiCDC は TiDB クラスター内の増分データの変更ログを Kafka に送信します。このセクションでは、TiDB にデータを書き込んで変更ログを生成する方法について説明します。

  1. サービスのワークロードをシミュレートします。

    ラボ環境で変更ログを生成するには、go-tpc を使用してデータを TiDB クラスターに書き込むことができます。具体的には、次のコマンドを実行してTiUPベンチを使用してtpccデータベースを作成し、この新しいデータベースにデータを書き込みます。

    tiup bench tpcc -H 127.0.0.1 -P 4000 -D tpcc --warehouses 4 prepare tiup bench tpcc -H 127.0.0.1 -P 4000 -D tpcc --warehouses 4 run --time 300s

    go-tpc の詳細については、 TiDB で TPC-C テストを実行する方法を参照してください。

  2. Kafka トピック内のデータを使用します。

    チェンジフィードが正常に動作すると、データが Kafka トピックに書き込まれます。 kafka-console-consumer.shを実行します。データが Kafka トピックに正常に書き込まれていることがわかります。

    ./bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic `${topic-name}`

この時点で、TiDB データベースの増分データは Kafka に正常にレプリケートされます。次に、Flink を使用して Kafka データを使用できます。あるいは、特定のサービス シナリオ向けに Kafka コンシューマ クライアントを自分で開発することもできます。

  1. Flink Kafka コネクタをインストールします。

    Flink エコシステムでは、Flink Kafka コネクタを使用して Kafka データを消費し、データを Flink に出力します。ただし、Flink Kafka コネクタは自動的にはインストールされません。これを使用するには、Flink をインストールした後、Flink Kafka コネクタとその依存関係を Flink インストール ディレクトリに追加します。具体的には、以下のjarファイルをFlinkインストールディレクトリのlibディレクトリにダウンロードします。すでに Flink クラスターを実行している場合は、再起動して新しいプラグインをロードします。

  2. テーブルを作成します。

    Flink がインストールされているディレクトリで、次のコマンドを実行して Flink SQL クライアントを起動します。

    [root@flink flink-1.15.0]# ./bin/sql-client.sh

    次に、次のコマンドを実行して、 tpcc_ordersという名前のテーブルを作成します。

    CREATE TABLE tpcc_orders ( o_id INTEGER, o_d_id INTEGER, o_w_id INTEGER, o_c_id INTEGER, o_entry_d STRING, o_carrier_id INTEGER, o_ol_cnt INTEGER, o_all_local INTEGER ) WITH ( 'connector' = 'kafka', 'topic' = 'tidb_tpcc_orders', 'properties.bootstrap.servers' = '127.0.0.1:9092', 'properties.group.id' = 'testGroup', 'format' = 'canal-json', 'scan.startup.mode' = 'earliest-offset', 'properties.auto.offset.reset' = 'earliest' )

    topicproperties.bootstrap.servers環境内の実際の値に置き換えます。

  3. テーブルのデータをクエリします。

    次のコマンドを実行して、 tpcc_ordersテーブルのデータをクエリします。

    SELECT * FROM tpcc_orders;

    このコマンドを実行すると、次の図に示すように、テーブルに新しいデータがあることがわかります。

    SQL query result

Kafka とのデータ統合が完了しました。

このページは役に立ちましたか?

Playground
新規
登録なしで TiDB の機能をワンストップでインタラクティブに体験できます。
製品
TiDB Cloud
TiDB
価格
PoC お問い合わせ
エコシステム
TiKV
TiFlash
OSS Insight
© 2024 PingCAP. All Rights Reserved.
Privacy Policy.