TiDBとConfluentプラットフォームの統合に関するクイックスタートガイド

このドキュメントでは、 TiCDCを使用してTiDBをConfluentプラットフォームに統合する方法を紹介します。

コンフルエントなプラットフォームは、ApacheKafkaをコアとするデータストリーミングプラットフォームです。多くの公式およびサードパーティのシンクコネクタを備えたConfluentPlatformを使用すると、ストリームソースをリレーショナルデータベースまたは非リレーショナルデータベースに簡単に接続できます。

TiDBをConfluentPlatformと統合するには、TiCDCコンポーネントをAvroプロトコルで使用できます。 TiCDCは、ConfluentPlatformが認識する形式でデータ変更をKafkaにストリーミングできます。詳細な統合ガイドについては、次のセクションを参照してください。

前提条件

ノート:

このチュートリアルでは、 JDBCシンクコネクタを使用してTiDBデータをダウンストリームのリレーショナルデータベースに複製します。簡単にするために、ここでは例としてSQLiteを使用しています。

  • Zookeeper、Kafka、およびSchemaRegistryが正しくインストールされていることを確認してください。 Confluentプラットフォームクイックスタートガイドに従って、ローカルテスト環境を展開することをお勧めします。

  • 次のコマンドを実行して、JDBCシンクコネクタがインストールされていることを確認します。結果にはjdbc-sinkが含まれている必要があります。

    confluent local services connect connector list

統合手順

  1. 次の構成をjdbc-sink-connector.jsonに保存します。

    { "name": "jdbc-sink-connector", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "tasks.max": "1", "topics": "testdb_test", "connection.url": "sqlite:test.db", "connection.ds.pool.size": 5, "table.name.format": "test", "auto.create": true, "auto.evolve": true } }
  2. 次のコマンドを実行して、JDBCシンクコネクタのインスタンスを作成します(Kafkaが127.0.0.1:8083をリッスンしていると仮定します)。

    curl -X POST -H "Content-Type: application/json" -d @jdbc-sink-connector.json http://127.0.0.1:8083/connectors
  3. 次のいずれかの方法でTiCDCをデプロイします。 TiCDCがすでに展開されている場合は、この手順をスキップできます。

    続行する前に、TiDBおよびTiCDCクラスターが正常であることを確認してください。

  4. cdc cliコマンドを実行してchangefeedを作成します。

    ./cdc cli changefeed create --pd="http://127.0.0.1:2379" --sink-uri="kafka://127.0.0.1:9092/testdb_test?protocol=avro" --opts "registry=http://127.0.0.1:8081"

    ノート:

    PD、Kafka、およびSchemaRegistryがそれぞれのデフォルトポートで実行されていることを確認してください。

データ複製のテスト

TiDBがConfluentPlatformと統合された後、以下の手順例に従ってデータ複製をテストできます。

  1. TiDBクラスタにtestdbのデータベースを作成します。

    CREATE DATABASE IF NOT EXISTS testdb;

    testdbtestのテーブルを作成します。

    USE testdb; CREATE TABLE test ( id INT PRIMARY KEY, v TEXT );

    ノート:

    データベース名またはテーブル名を変更する必要がある場合は、それに応じてjdbc-sink-connector.jsontopicsを変更してください。

  2. TiDBにデータを挿入します。

    INSERT INTO test (id, v) values (1, 'a'); INSERT INTO test (id, v) values (2, 'b'); INSERT INTO test (id, v) values (3, 'c'); INSERT INTO test (id, v) values (4, 'd');
  3. データがダウンストリームに複製されるまでしばらく待ちます。次に、ダウンストリームでデータを確認します。

    sqlite3 test.db sqlite> SELECT * from test;

このページは役に立ちましたか?