重要

TiDB v6.2 (DMR) のドキュメントを表示しています。PingCap は v6.2 のバグ修正を提供していません。バグは将来のリリースで修正される予定です。

一般的な目的では、TiDBデータベースの最新の安定バージョンを使用してください。
重要
このページは英語版のページを機械翻訳しています。原文はこちらからご覧ください。

Apache Kafka および Apache Flink とのデータの統合

このドキュメントでは、 TiCDCを使用して TiDB データを Apache Kafka および Apache Flink に複製する方法について説明します。このドキュメントの構成は次のとおりです。

  1. TiCDC を含む TiDB クラスターをすばやくデプロイし、Kafka クラスターと Flink クラスターを作成します。
  2. TiDB から Kafka にデータをレプリケートする変更フィードを作成します。
  3. go-tpc を使用して TiDB にデータを書き込みます。
  4. Kafka コンソール コンシューマーでデータを観察し、データが指定された Kafka トピックにレプリケートされていることを確認します。
  5. (オプション) Kafka データを使用するように Flink クラスターを構成します。

上記の手順は、ラボ環境で実行されます。これらの手順を参照して、本番環境にクラスターをデプロイすることもできます。

ステップ 1. 環境をセットアップする

  1. TiCDC を含む TiDB クラスターをデプロイします。

    ラボまたはテスト環境では、TiUP Playground を使用して、TiCDC を含む TiDB クラスターをすばやくデプロイできます。

    tiup playground --host 0.0.0.0 --db 1 --pd 1 --kv 1 --tiflash 0 --ticdc 1
    # View cluster status
    tiup status
    

    TiUP がインストールされていない場合は、 TiUPをインストールするを参照してください。実稼働環境では、 TiCDC をデプロイの指示に従って TiCDC をデプロイできます。

  2. Kafka クラスターを作成します。

  3. (オプション) Flink クラスターを作成します。

ステップ 2. Kafka チェンジフィードを作成する

  1. changefeed 構成ファイルを作成します。

    Flink の要求に応じて、各テーブルの増分データを独立したトピックに送信する必要があり、主キーの値に基づいて各イベントに対してパーティションをディスパッチする必要があります。したがって、次の内容で changefeed 構成ファイルchangefeed.confを作成する必要があります。

    [sink]
    dispatchers = [
    {matcher = ['*.*'], topic = "tidb_{schema}_{table}", partition="index-value"},
    ]
    

    構成ファイルのdispatchersの詳細な説明については、 Kafka Sink のトピックおよびパーティション ディスパッチャーのルールをカスタマイズするを参照してください。

  2. 増分データを Kafka にレプリケートする変更フィードを作成します。

    tiup ctl:v6.2.0 cdc changefeed create --pd="http://127.0.0.1:2379" --sink-uri="kafka://127.0.0.1:9092/kafka-topic-name?protocol=canal-json" --changefeed-id="kafka-changefeed" --config="changefeed.conf"
    
    • 変更フィードが正常に作成されると、次のように、変更フィード ID などの変更フィード情報が表示されます。

      Create changefeed successfully!
      ID: kafka-changefeed
      Info: {... changfeed info json struct ...}
      
    • コマンドの実行後に結果が返されない場合は、コマンドを実行したサーバーとシンク URI で指定された Kafka マシンとの間のネットワーク接続を確認してください。

    本番環境では、Kafka クラスターに複数のブローカー ノードがあります。したがって、複数のブローカーのアドレスをシンク UIR に追加できます。これにより、Kafka クラスターへの安定したアクセスが保証されます。 Kafka クラスターがダウンしても、changefeed は引き続き機能します。 Kafka クラスターに 3 つのブローカー ノードがあり、IP アドレスがそれぞれ 127.0.0.1:9092、127.0.0.2:9092、127.0.0.3:9092 であるとします。次のシンク URI を使用して、変更フィードを作成できます。

    tiup ctl:v6.2.0 cdc changefeed create --pd="http://127.0.0.1:2379" --sink-uri="kafka://127.0.0.1:9092,127.0.0.2:9092,127.0.0.3:9092/kafka-topic-name?protocol=canal-json&partition-num=3&replication-factor=1&max-message-bytes=1048576" --config="changefeed.conf"
    
  3. 変更フィードを作成したら、次のコマンドを実行して変更フィードのステータスを確認します。

    tiup ctl:v6.2.0 cdc changefeed list --pd="http://127.0.0.1:2379"
    

    TiCDCクラスタとレプリケーション タスクの管理を参照して、変更フィードを管理できます。

ステップ 3. データを書き込んで変更ログを生成する

上記の手順が完了すると、TiCDC は TiDB クラスター内の増分データの変更ログを Kafka に送信します。このセクションでは、TiDB にデータを書き込んで変更ログを生成する方法について説明します。

  1. サービスのワークロードをシミュレートします。

    ラボ環境で変更ログを生成するには、go-tpc を使用してデータを TiDB クラスターに書き込みます。具体的には、次のコマンドを実行して、TiUP ベンチを使用してtpccデータベースを作成し、この新しいデータベースにデータを書き込みます。

    tiup bench tpcc -H 127.0.0.1 -P 4000 -D tpcc --warehouses 4 prepare
    tiup bench tpcc -H 127.0.0.1 -P 4000 -D tpcc --warehouses 4 run --time 300s
    

    go-tpc の詳細については、 TiDB で TPC-C テストを実行する方法を参照してください。

  2. Kafka トピックでデータを使用します。

    changefeed が正常に機能すると、Kafka トピックにデータが書き込まれます。 kafka-console-consumer.shを実行します。 Kafka トピックにデータが正常に書き込まれていることがわかります。

    ./bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic `${topic-name}`
    

この時点で、TiDB データベースの増分データが Kafka に正常に複製されます。次に、Flink を使用して Kafka データを使用できます。または、特定のサービス シナリオ用に Kafka コンシューマー クライアントを独自に開発することもできます。

  1. Flink Kafka コネクタをインストールします。

    Flink エコシステムでは、Flink Kafka コネクタを使用して Kafka データを消費し、データを Flink に出力します。ただし、Flink Kafka コネクタは自動的にインストールされません。これを使用するには、Flink のインストール後に、Flink Kafka コネクタとその依存関係を Flink インストール ディレクトリに追加します。具体的には、以下の jar ファイルを Flink インストール ディレクトリのlibディレクトリにダウンロードします。 Flink クラスターを既に実行している場合は、再起動して新しいプラグインをロードします。

  2. テーブルを作成します。

    Flink がインストールされているディレクトリで、次のコマンドを実行して Flink SQL クライアントを起動します。

    [root@flink flink-1.15.0]# ./bin/sql-client.sh
    

    次に、次のコマンドを実行して、 tpcc_ordersという名前のテーブルを作成します。

    CREATE TABLE tpcc_orders (
        o_id INTEGER,
        o_d_id INTEGER,
        o_w_id INTEGER,
        o_c_id INTEGER,
        o_entry_d STRING,
        o_carrier_id INTEGER,
        o_ol_cnt INTEGER,
        o_all_local INTEGER
    ) WITH (
    'connector' = 'kafka',
    'topic' = 'tidb_tpcc_orders',
    'properties.bootstrap.servers' = '127.0.0.1:9092',
    'properties.group.id' = 'testGroup',
    'format' = 'canal-json',
    'scan.startup.mode' = 'earliest-offset',
    'properties.auto.offset.reset' = 'earliest'
    )
    

    topicproperties.bootstrap.serversを環境の実際の値に置き換えます。

  3. テーブルのデータをクエリします。

    次のコマンドを実行して、 tpcc_ordersテーブルのデータをクエリします。

    SELECT * FROM tpcc_orders;
    

    このコマンドを実行すると、次の図に示すように、テーブルに新しいデータがあることがわかります。

    SQL query result

Kafka とのデータ統合が行われます。