データを Kafka にレプリケートする
このドキュメントでは、TiCDC を使用して増分データを Apache Kafka にレプリケートする変更フィードを作成する方法について説明します。
レプリケーション タスクを作成する
次のコマンドを実行して、レプリケーション タスクを作成します。
cdc cli changefeed create \
--server=http://10.0.10.25:8300 \
--sink-uri="kafka://127.0.0.1:9092/topic-name?protocol=canal-json&kafka-version=2.4.0&partition-num=6&max-message-bytes=67108864&replication-factor=1" \
--changefeed-id="simple-replication-task"
Create changefeed successfully!
ID: simple-replication-task
Info: {"sink-uri":"kafka://127.0.0.1:9092/topic-name?protocol=canal-json&kafka-version=2.4.0&partition-num=6&max-message-bytes=67108864&replication-factor=1","opts":{},"create-time":"2020-03-12T22:04:08.103600025+08:00","start-ts":415241823337054209,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":".","config":{"case-sensitive":true,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"ddl-allow-list":null},"mounter":{"worker-num":16},"sink":{"dispatchers":null},"scheduler":{"type":"table-number","polling-time":-1}},"state":"normal","history":null,"error":null}
--changefeed-id: レプリケーション タスクの ID。形式は^[a-zA-Z0-9]+(\-[a-zA-Z0-9]+)*$正規表現と一致する必要があります。この ID が指定されていない場合、TiCDC は ID として UUID (バージョン 4 形式) を自動的に生成します。--sink-uri: レプリケーション タスクのダウンストリーム アドレス。詳細については、kafkaでシンク URI を構成するを参照してください。--start-ts: 変更フィードの開始 TSO を指定します。この TSO から、TiCDC クラスターはデータのプルを開始します。デフォルト値は現在の時刻です。--target-ts: changefeed の終了 TSO を指定します。この TSO に対して、TiCDC クラスターはデータのプルを停止します。デフォルト値は空です。これは、TiCDC がデータのプルを自動的に停止しないことを意味します。--config: changefeed 構成ファイルを指定します。詳細については、 TiCDC Changefeedコンフィグレーションパラメータを参照してください。
Kafka のシンク URI を構成する
シンク URI は、TiCDC ターゲット システムの接続情報を指定するために使用されます。形式は次のとおりです。
[scheme]://[userinfo@][host]:[port][/path]?[query_parameters]
サンプル構成:
--sink-uri="kafka://127.0.0.1:9092/topic-name?protocol=canal-json&kafka-version=2.4.0&partition-num=6&max-message-bytes=67108864&replication-factor=1"
以下は、Kafka 用に構成できるシンク URI パラメーターと値の説明です。
ベストプラクティス
- 独自の Kafka トピックを作成することをお勧めします。少なくとも、トピックが Kafka ブローカーに送信できる各メッセージの最大データ量と、ダウンストリーム Kafka パーティションの数を設定する必要があります。 changefeed を作成すると、これら 2 つの設定はそれぞれ
max-message-bytesとpartition-numに対応します。 - まだ存在しないトピックで変更フィードを作成すると、TiCDC は
partition-numとreplication-factorパラメーターを使用してトピックを作成しようとします。これらのパラメーターを明示的に指定することをお勧めします。 - ほとんどの場合、
canal-jsonプロトコルを使用することをお勧めします。
ノート:
protocolがopen-protocolの場合、TiCDC は長さがmax-message-bytesを超えるメッセージの生成を回避しようとします。ただし、1 つの変更だけで長さがmax-message-bytesを超える行が非常に大きい場合、TiCDC はサイレント エラーを回避するために、このメッセージを出力しようとし、ログに警告を出力。
TiCDC は Kafka の認証と承認を使用します
以下は、Kafka SASL 認証を使用する場合の例です。
SASL/プレーン
--sink-uri="kafka://127.0.0.1:9092/topic-name?kafka-version=2.4.0&sasl-user=alice-user&sasl-password=alice-secret&sasl-mechanism=plain"SASL/スクラム
SCRAM-SHA-256 と SCRAM-SHA-512 は PLAIN メソッドに似ています。対応する認証方法として
sasl-mechanismを指定するだけです。SASL/GSSAPI
SASL/GSSAPI
user認証:--sink-uri="kafka://127.0.0.1:9092/topic-name?kafka-version=2.4.0&sasl-mechanism=gssapi&sasl-gssapi-auth-type=user&sasl-gssapi-kerberos-config-path=/etc/krb5.conf&sasl-gssapi-service-name=kafka&sasl-gssapi-user=alice/for-kafka&sasl-gssapi-password=alice-secret&sasl-gssapi-realm=example.com"sasl-gssapi-userとsasl-gssapi-realmの値は、kerberos で指定された原理に関連しています。たとえば、原則がalice/for-kafka@example.comに設定されている場合、sasl-gssapi-userとsasl-gssapi-realmそれぞれalice/for-kafkaとexample.comとして指定されます。SASL/GSSAPI
keytab認証:--sink-uri="kafka://127.0.0.1:9092/topic-name?kafka-version=2.4.0&sasl-mechanism=gssapi&sasl-gssapi-auth-type=keytab&sasl-gssapi-kerberos-config-path=/etc/krb5.conf&sasl-gssapi-service-name=kafka&sasl-gssapi-user=alice/for-kafka&sasl-gssapi-keytab-path=/var/lib/secret/alice.key&sasl-gssapi-realm=example.com"SASL/GSSAPI 認証方式の詳細については、 GSSAPI の設定を参照してください。
TLS/SSL 暗号化
Kafka ブローカーで TLS/SSL 暗号化が有効になっている場合は、
-enable-tls=trueパラメーターを--sink-uriに追加する必要があります。自己署名証明書を使用する場合は、--sink-uriでca、cert、およびkeyも指定する必要があります。ACL 認可
TiCDC が適切に機能するために必要な最小限のアクセス許可セットは次のとおりです。
- トピックリソースタイプの
CreateとWriteアクセス許可。 - クラスタリソース タイプの
DescribeConfigsアクセス許可。
- トピックリソースタイプの
TiCDC を Kafka Connect (コンフルエント プラットフォーム) と統合する
Confluent が提供するデータ コネクタ使用してデータをリレーショナル データベースまたは非リレーショナル データベースにストリーミングするには、 avroプロトコルを使用してコンフルエント スキーマ レジストリ in schema-registryの URL を提供する必要があります。
サンプル構成:
--sink-uri="kafka://127.0.0.1:9092/topic-name?&protocol=avro&replication-factor=3" --schema-registry="http://127.0.0.1:8081" --config changefeed_config.toml
[sink]
dispatchers = [
{matcher = ['*.*'], topic = "tidb_{schema}_{table}"},
]
詳細な統合ガイドについては、 TiDB と Confluent Platform の統合に関するクイック スタート ガイドを参照してください。
Kafka Sink のトピックおよびパーティション ディスパッチャーのルールをカスタマイズする
マッチャーのルール
前のセクションの例では:
- マッチャー ルールに一致するテーブルについては、対応するトピック式で指定されたポリシーに従ってディスパッチされます。たとえば、
test3.aaテーブルは「トピック式 2」に従ってディスパッチされます。test5.aaテーブルは「トピック式 3」に従ってディスパッチされます。 - 複数のマッチャー ルールに一致するテーブルの場合、最初に一致したトピック式に従ってディスパッチされます。たとえば、「トピック表現 1」に従って、
test1.aaテーブルが分散されます。 - どのマッチャー ルールにも一致しないテーブルの場合、対応するデータ変更イベントが
--sink-uriで指定されたデフォルト トピックに送信されます。たとえば、test10.aaテーブルはデフォルト トピックに送信されます。 - マッチャー ルールに一致するが、トピック ディスパッチャーが指定されていないテーブルの場合、対応するデータ変更は
--sink-uriで指定されたデフォルト トピックに送信されます。たとえば、test6.aaテーブルはデフォルト トピックに送信されます。
トピック ディスパッチャ
topic = "xxx" を使用してトピック ディスパッチャを指定し、トピック式を使用して柔軟なトピック ディスパッチ ポリシーを実装できます。トピックの総数は 1000 未満にすることをお勧めします。
Topic 式の形式は[prefix]{schema}[middle][{table}][suffix]です。
prefix: オプション。トピック名のプレフィックスを示します。{schema}: 必須。スキーマ名と一致させるために使用されます。middle: オプション。スキーマ名とテーブル名の間の区切り文字を示します。{table}: オプション。テーブル名と一致させるために使用されます。suffix: オプション。トピック名のサフィックスを示します。
prefix 、 middleおよびsuffixには、次の文字のみを含めることができます: a-z 、 A-Z 、 0-9 、 . 、 _ 、および- 。 {schema}と{table}は両方とも小文字です。 {Schema}や{TABLE}などのプレースホルダーは無効です。
いくつかの例:
matcher = ['test1.table1', 'test2.table2'], topic = "hello_{schema}_{table}"test1.table1に対応するデータ変更イベントは、hello_test1_table1という名前のトピックに送信されます。test2.table2に対応するデータ変更イベントは、hello_test2_table2という名前のトピックに送信されます。
matcher = ['test3.*', 'test4.*'], topic = "hello_{schema}_world"test3のすべてのテーブルに対応するデータ変更イベントは、hello_test3_worldという名前のトピックに送信されます。test4のすべてのテーブルに対応するデータ変更イベントは、hello_test4_worldという名前のトピックに送信されます。
matcher = ['*.*'], topic = "{schema}_{table}"- TiCDC がリッスンするすべてのテーブルは、「schema_table」ルールに従って個別のトピックにディスパッチされます。たとえば、
test.accountテーブルの場合、TiCDC はそのデータ変更ログをtest_accountという名前のトピックにディスパッチします。
- TiCDC がリッスンするすべてのテーブルは、「schema_table」ルールに従って個別のトピックにディスパッチされます。たとえば、
DDL イベントのディスパッチ
スキーマレベルの DDL
create databaseやdrop databaseなど、特定のテーブルに関連付けられていない DDL は、スキーマ レベルの DDL と呼ばれます。スキーマレベルの DDL に対応するイベントは、 --sink-uriで指定されたデフォルトのトピックに送信されます。
テーブルレベルの DDL
alter tableやcreate tableなど、特定のテーブルに関連する DDL はテーブルレベル DDL と呼ばれます。テーブルレベルの DDL に対応するイベントは、ディスパッチャの構成に従って、対応するトピックに送信されます。
たとえば、 matcher = ['test.*'], topic = {schema}_{table}のようなディスパッチャーの場合、DDL イベントは次のようにディスパッチされます。
- DDL イベントに含まれるテーブルが 1 つの場合、DDL イベントは対応するトピックにそのまま送信されます。たとえば、DDL イベント
drop table test.table1の場合、イベントはtest_table1という名前のトピックに送信されます。 - DDL イベントに複数のテーブルが含まれる場合 (
rename table/drop table/drop viewは複数のテーブルが含まれる場合があります)、DDL イベントは複数のイベントに分割され、対応するトピックに送信されます。たとえば、DDL イベントrename table test.table1 to test.table10, test.table2 to test.table20の場合、イベントrename table test.table1 to test.table10test_table1という名前のトピックに送信され、イベントrename table test.table2 to test.table20test.table2という名前のトピックに送信されます。
区画ディスパッチャー
partition = "xxx"使用して、パーティション ディスパッチャーを指定できます。デフォルト、ts、インデックス値、およびテーブルの 4 つのディスパッチャがサポートされています。ディスパッチャのルールは次のとおりです。
- デフォルト: 複数の一意のインデックス (主キーを含む) が存在する場合、または古い値機能が有効になっている場合、イベントはテーブル モードでディスパッチされます。一意のインデックス (または主キー) が 1 つだけ存在する場合、イベントはインデックス値モードで送出されます。
- ts: 行変更の commitTs を使用して、イベントをハッシュおよびディスパッチします。
- index-value: 主キーの値またはテーブルの一意のインデックスを使用して、イベントをハッシュしてディスパッチします。
- table: テーブルのスキーマ名とテーブル名を使用して、イベントをハッシュしてディスパッチします。
ノート:
v6.1.0 以降、構成の意味を明確にするために、パーティション ディスパッチャを指定するために使用される構成が
dispatcherからpartitionに変更されましたpartitionはdispatcherのエイリアスです。たとえば、次の 2 つのルールはまったく同じです。[sink] dispatchers = [ {matcher = ['*.*'], dispatcher = "ts"}, {matcher = ['*.*'], partition = "ts"}, ]ただし、
dispatcherとpartition同じルールに含めることはできません。たとえば、次のルールは無効です。{matcher = ['*.*'], dispatcher = "ts", partition = "table"},