TiDBとConfluentプラットフォームの統合に関するクイックスタートガイド
このドキュメントでは、 TiCDCを使用してTiDBをConfluentプラットフォームに統合する方法を紹介します。
コンフルエントなプラットフォームは、ApacheKafkaをコアとするデータストリーミングプラットフォームです。多くの公式およびサードパーティのシンクコネクタを備えたConfluentPlatformを使用すると、ストリームソースをリレーショナルデータベースまたは非リレーショナルデータベースに簡単に接続できます。
TiDBをConfluentPlatformと統合するには、TiCDCコンポーネントをAvroプロトコルで使用できます。 TiCDCは、ConfluentPlatformが認識する形式でデータ変更をKafkaにストリーミングできます。詳細な統合ガイドについては、次のセクションを参照してください。
前提条件
ノート:
このチュートリアルでは、 JDBCシンクコネクタを使用してTiDBデータをダウンストリームのリレーショナルデータベースに複製します。簡単にするために、ここでは例としてSQLiteを使用しています。
Zookeeper、Kafka、およびSchemaRegistryが正しくインストールされていることを確認してください。 Confluentプラットフォームクイックスタートガイドに従って、ローカルテスト環境を展開することをお勧めします。
次のコマンドを実行して、JDBCシンクコネクタがインストールされていることを確認します。結果には
jdbc-sink
が含まれている必要があります。confluent local services connect connector list
統合手順
次の構成を
jdbc-sink-connector.json
に保存します。{ "name": "jdbc-sink-connector", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "tasks.max": "1", "topics": "testdb_test", "connection.url": "sqlite:test.db", "connection.ds.pool.size": 5, "table.name.format": "test", "auto.create": true, "auto.evolve": true } }次のコマンドを実行して、JDBCシンクコネクタのインスタンスを作成します(Kafkaが
127.0.0.1:8083
をリッスンしていると仮定します)。curl -X POST -H "Content-Type: application/json" -d @jdbc-sink-connector.json http://127.0.0.1:8083/connectors次のいずれかの方法でTiCDCをデプロイします。 TiCDCがすでに展開されている場合は、この手順をスキップできます。
- TiUPを使用してTiCDCを含む新しいTiDBクラスタをデプロイします
- TiUPを使用して既存のTiDBクラスタにTiCDCを追加します
- バイナリを使用して既存のTiDBクラスタにTiCDCを追加します(非推奨)
続行する前に、TiDBおよびTiCDCクラスターが正常であることを確認してください。
cdc cli
コマンドを実行してchangefeed
を作成します。./cdc cli changefeed create --pd="http://127.0.0.1:2379" --sink-uri="kafka://127.0.0.1:9092/testdb_test?protocol=avro" --opts "registry=http://127.0.0.1:8081"ノート:
PD、Kafka、およびSchemaRegistryがそれぞれのデフォルトポートで実行されていることを確認してください。
データ複製のテスト
TiDBがConfluentPlatformと統合された後、以下の手順例に従ってデータ複製をテストできます。
TiDBクラスタに
testdb
のデータベースを作成します。CREATE DATABASE IF NOT EXISTS testdb;testdb
でtest
のテーブルを作成します。USE testdb; CREATE TABLE test ( id INT PRIMARY KEY, v TEXT );ノート:
データベース名またはテーブル名を変更する必要がある場合は、それに応じて
jdbc-sink-connector.json
のtopics
を変更してください。TiDBにデータを挿入します。
INSERT INTO test (id, v) values (1, 'a'); INSERT INTO test (id, v) values (2, 'b'); INSERT INTO test (id, v) values (3, 'c'); INSERT INTO test (id, v) values (4, 'd');データがダウンストリームに複製されるまでしばらく待ちます。次に、ダウンストリームでデータを確認します。
sqlite3 test.db sqlite> SELECT * from test;