データを Kafka にレプリケートする

このドキュメントでは、TiCDC を使用して増分データを Apache Kafka にレプリケートする変更フィードを作成する方法について説明します。

レプリケーション タスクを作成する

次のコマンドを実行して、レプリケーション タスクを作成します。

cdc cli changefeed create \ --server=http://10.0.10.25:8300 \ --sink-uri="kafka://127.0.0.1:9092/topic-name?protocol=canal-json&kafka-version=2.4.0&partition-num=6&max-message-bytes=67108864&replication-factor=1" \ --changefeed-id="simple-replication-task"
Create changefeed successfully! ID: simple-replication-task Info: {"sink-uri":"kafka://127.0.0.1:9092/topic-name?protocol=canal-json&kafka-version=2.4.0&partition-num=6&max-message-bytes=67108864&replication-factor=1","opts":{},"create-time":"2020-03-12T22:04:08.103600025+08:00","start-ts":415241823337054209,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":".","config":{"case-sensitive":true,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"ddl-allow-list":null},"mounter":{"worker-num":16},"sink":{"dispatchers":null},"scheduler":{"type":"table-number","polling-time":-1}},"state":"normal","history":null,"error":null}
  • --changefeed-id : レプリケーション タスクの ID。形式は^[a-zA-Z0-9]+(\-[a-zA-Z0-9]+)*$正規表現と一致する必要があります。この ID が指定されていない場合、TiCDC は ID として UUID (バージョン 4 形式) を自動的に生成します。
  • --sink-uri : レプリケーション タスクのダウンストリーム アドレス。詳細については、 kafkaでシンク URI を構成するを参照してください。
  • --start-ts : 変更フィードの開始 TSO を指定します。この TSO から、TiCDC クラスターはデータのプルを開始します。デフォルト値は現在の時刻です。
  • --target-ts : changefeed の終了 TSO を指定します。この TSO に対して、TiCDC クラスターはデータのプルを停止します。デフォルト値は空です。これは、TiCDC がデータのプルを自動的に停止しないことを意味します。
  • --config : changefeed 構成ファイルを指定します。詳細については、 TiCDC Changefeedコンフィグレーションパラメータを参照してください。

Kafka のシンク URI を構成する

シンク URI は、TiCDC ターゲット システムの接続情報を指定するために使用されます。形式は次のとおりです。

[scheme]://[userinfo@][host]:[port][/path]?[query_parameters]

サンプル構成:

--sink-uri="kafka://127.0.0.1:9092/topic-name?protocol=canal-json&kafka-version=2.4.0&partition-num=6&max-message-bytes=67108864&replication-factor=1"

以下は、Kafka 用に構成できるシンク URI パラメーターと値の説明です。

パラメータ/パラメータ値説明
127.0.0.1ダウンストリーム Kafka サービスの IP アドレス。
9092ダウンストリーム Kafka のポート。
topic-name変数。 Kafka トピックの名前。
kafka-versionダウンストリーム Kafka のバージョン (オプション、デフォルトでは2.4.0現在、サポートされている最も古い Kafka バージョンは0.11.0.2で、最新のものは3.2.0です。この値は、ダウンストリーム Kafka の実際のバージョンと一致する必要があります)。
kafka-client-idレプリケーション タスクの Kafka クライアント ID を指定します (オプション。既定ではTiCDC_sarama_producer_replication ID )。
partition-numダウンストリーム Kafka パーティションの数 (オプション。値は実際のパーティション数を超えてはなりません。そうでない場合、レプリケーション タスクは正常に作成されません。デフォルトでは3 )。
max-message-bytes毎回 Kafka ブローカーに送信されるデータの最大サイズ (オプション、デフォルトでは10MB )。 v5.0.6 および v4.0.6 から、デフォルト値が64MBおよび256MBから10MBに変更されました。
replication-factor保存できる Kafka メッセージ レプリカの数 (オプション、既定では1 )。
compressionメッセージの送信時に使用される圧縮アルゴリズム (値のオプションはnonelz4gzipsnappy 、およびzstdで、デフォルトではnoneです)。
protocolメッセージが Kafka に出力されるプロトコル。値のオプションはcanal-jsonopen-protocolcanalavroおよびmaxwellです。
auto-create-topic渡されたtopic-name Kafka クラスターに存在しない場合に、TiCDC が自動的にトピックを作成するかどうかを決定します (オプション、デフォルトではtrue )。
enable-tidb-extensionオプション。デフォルトではfalse 。出力プロトコルがcanal-jsonの場合、値がtrueの場合、TiCDC は Resolved イベントを送信し、TiDB 拡張フィールドを Kafka メッセージに追加します。 v6.1.0 から、このパラメーターはavroプロトコルにも適用されます。値がtrueの場合、TiCDC は Kafka メッセージに3 つの TiDB 拡張フィールドを追加します。
max-batch-sizev4.0.9 の新機能。メッセージ プロトコルが 1 つの Kafka メッセージへの複数のデータ変更の出力をサポートしている場合、このパラメーターは 1 つの Kafka メッセージ内のデータ変更の最大数を指定します。現在、Kafka のprotocolopen-protocol (オプション、デフォルトでは16 ) の場合にのみ有効です。
enable-tlsTLS を使用してダウンストリームの Kafka インスタンスに接続するかどうか (オプション、デフォルトではfalse )。
caダウンストリーム Kafka インスタンスに接続するために必要な CA 証明書ファイルのパス (オプション)。
certダウンストリーム Kafka インスタンスに接続するために必要な証明書ファイルのパス (オプション)。
keyダウンストリーム Kafka インスタンスに接続するために必要な証明書キー ファイルのパス (オプション)。
sasl-userダウンストリーム Kafka インスタンスに接続するために必要な SASL/PLAIN または SASL/SCRAM 認証の ID (authcid) (オプション)。
sasl-passwordダウンストリーム Kafka インスタンスに接続するために必要な SASL/PLAIN または SASL/SCRAM 認証のパスワード (オプション)。
sasl-mechanismダウンストリーム Kafka インスタンスに接続するために必要な SASL 認証の名前。値はplainscram-sha-256scram-sha-512 、またはgssapiです。
sasl-gssapi-auth-typegssapi 認証タイプ。値はuserまたはkeytabです (オプション)。
sasl-gssapi-keytab-pathgssapi キータブ パス (オプション)。
sasl-gssapi-kerberos-config-pathgssapi kerberos 構成パス (オプション)。
sasl-gssapi-service-namegssapi サービス名 (オプション)。
sasl-gssapi-usergssapi 認証のユーザー名 (オプション)。
sasl-gssapi-passwordgssapi 認証のパスワード (オプション)。
sasl-gssapi-realmgssapi レルム名 (オプション)。
sasl-gssapi-disable-pafxfastgssapi PA-FX-FAST を無効にするかどうか (オプション)。
dial-timeoutダウンストリーム Kafka との接続を確立する際のタイムアウト。デフォルト値は10sです。
read-timeoutダウンストリーム Kafka から返された応答を取得する際のタイムアウト。デフォルト値は10sです。
write-timeoutダウンストリーム Kafka にリクエストを送信する際のタイムアウト。デフォルト値は10sです。
avro-decimal-handling-modeavroプロトコルでのみ有効です。 Avro が DECIMAL フィールドを処理する方法を決定します。値はstringまたはpreciseで、DECIMAL フィールドを文字列または正確な浮動小数点数にマッピングすることを示します。
avro-bigint-unsigned-handling-modeavroプロトコルでのみ有効です。 Avro が BIGINT UNSIGNED フィールドを処理する方法を決定します。値はstringまたはlongで、BIGINT UNSIGNED フィールドを 64 ビットの符号付き数値または文字列にマッピングすることを示します。

ベストプラクティス

  • 独自の Kafka トピックを作成することをお勧めします。少なくとも、トピックが Kafka ブローカーに送信できる各メッセージの最大データ量と、ダウンストリーム Kafka パーティションの数を設定する必要があります。 changefeed を作成すると、これら 2 つの設定はそれぞれmax-message-bytespartition-numに対応します。
  • まだ存在しないトピックで変更フィードを作成すると、TiCDC はpartition-numreplication-factorパラメーターを使用してトピックを作成しようとします。これらのパラメーターを明示的に指定することをお勧めします。
  • ほとんどの場合、 canal-jsonプロトコルを使用することをお勧めします。

ノート:

protocolopen-protocolの場合、TiCDC は長さがmax-message-bytesを超えるメッセージの生成を回避しようとします。ただし、1 つの変更だけで長さがmax-message-bytesを超える行が非常に大きい場合、TiCDC はサイレント エラーを回避するために、このメッセージを出力しようとし、ログに警告を出力。

TiCDC は Kafka の認証と承認を使用します

以下は、Kafka SASL 認証を使用する場合の例です。

  • SASL/プレーン

    --sink-uri="kafka://127.0.0.1:9092/topic-name?kafka-version=2.4.0&sasl-user=alice-user&sasl-password=alice-secret&sasl-mechanism=plain"
  • SASL/スクラム

    SCRAM-SHA-256 と SCRAM-SHA-512 は PLAIN メソッドに似ています。対応する認証方法としてsasl-mechanismを指定するだけです。

  • SASL/GSSAPI

    SASL/GSSAPI user認証:

    --sink-uri="kafka://127.0.0.1:9092/topic-name?kafka-version=2.4.0&sasl-mechanism=gssapi&sasl-gssapi-auth-type=user&sasl-gssapi-kerberos-config-path=/etc/krb5.conf&sasl-gssapi-service-name=kafka&sasl-gssapi-user=alice/for-kafka&sasl-gssapi-password=alice-secret&sasl-gssapi-realm=example.com"

    sasl-gssapi-usersasl-gssapi-realmの値は、kerberos で指定された原理に関連しています。たとえば、原則がalice/for-kafka@example.comに設定されている場合、 sasl-gssapi-usersasl-gssapi-realmそれぞれalice/for-kafkaexample.comとして指定されます。

    SASL/GSSAPI keytab認証:

    --sink-uri="kafka://127.0.0.1:9092/topic-name?kafka-version=2.4.0&sasl-mechanism=gssapi&sasl-gssapi-auth-type=keytab&sasl-gssapi-kerberos-config-path=/etc/krb5.conf&sasl-gssapi-service-name=kafka&sasl-gssapi-user=alice/for-kafka&sasl-gssapi-keytab-path=/var/lib/secret/alice.key&sasl-gssapi-realm=example.com"

    SASL/GSSAPI 認証方式の詳細については、 GSSAPI の設定を参照してください。

  • TLS/SSL 暗号化

    Kafka ブローカーで TLS/SSL 暗号化が有効になっている場合は、 -enable-tls=trueパラメーターを--sink-uriに追加する必要があります。自己署名証明書を使用する場合は、 --sink-uricacert 、およびkeyも指定する必要があります。

  • ACL 認可

    TiCDC が適切に機能するために必要な最小限のアクセス許可セットは次のとおりです。

    • トピックリソースタイプCreateWriteアクセス許可。
    • クラスタリソース タイプのDescribeConfigsアクセス許可。

TiCDC を Kafka Connect (コンフルエント プラットフォーム) と統合する

Confluent が提供するデータ コネクタ使用してデータをリレーショナル データベースまたは非リレーショナル データベースにストリーミングするには、 avroプロトコルを使用してコンフルエント スキーマ レジストリ in schema-registryの URL を提供する必要があります。

サンプル構成:

--sink-uri="kafka://127.0.0.1:9092/topic-name?&protocol=avro&replication-factor=3" --schema-registry="http://127.0.0.1:8081" --config changefeed_config.toml
[sink] dispatchers = [ {matcher = ['*.*'], topic = "tidb_{schema}_{table}"}, ]

詳細な統合ガイドについては、 TiDB と Confluent Platform の統合に関するクイック スタート ガイドを参照してください。

Kafka Sink のトピックおよびパーティション ディスパッチャーのルールをカスタマイズする

マッチャーのルール

前のセクションの例では:

  • マッチャー ルールに一致するテーブルについては、対応するトピック式で指定されたポリシーに従ってディスパッチされます。たとえば、 test3.aaテーブルは「トピック式 2」に従ってディスパッチされます。 test5.aaテーブルは「トピック式 3」に従ってディスパッチされます。
  • 複数のマッチャー ルールに一致するテーブルの場合、最初に一致したトピック式に従ってディスパッチされます。たとえば、「トピック表現 1」に従って、 test1.aaテーブルが分散されます。
  • どのマッチャー ルールにも一致しないテーブルの場合、対応するデータ変更イベントが--sink-uriで指定されたデフォルト トピックに送信されます。たとえば、 test10.aaテーブルはデフォルト トピックに送信されます。
  • マッチャー ルールに一致するが、トピック ディスパッチャーが指定されていないテーブルの場合、対応するデータ変更は--sink-uriで指定されたデフォルト トピックに送信されます。たとえば、 test6.aaテーブルはデフォルト トピックに送信されます。

トピック ディスパッチャ

topic = "xxx" を使用してトピック ディスパッチャを指定し、トピック式を使用して柔軟なトピック ディスパッチ ポリシーを実装できます。トピックの総数は 1000 未満にすることをお勧めします。

Topic 式の形式は[prefix]{schema}[middle][{table}][suffix]です。

  • prefix : オプション。トピック名のプレフィックスを示します。
  • {schema} : 必須。スキーマ名と一致させるために使用されます。
  • middle : オプション。スキーマ名とテーブル名の間の区切り文字を示します。
  • {table} : オプション。テーブル名と一致させるために使用されます。
  • suffix : オプション。トピック名のサフィックスを示します。

prefixmiddleおよびsuffixには、次の文字のみを含めることができます: a-zA-Z0-9._ 、および-{schema}{table}は両方とも小文字です。 {Schema}{TABLE}などのプレースホルダーは無効です。

いくつかの例:

  • matcher = ['test1.table1', 'test2.table2'], topic = "hello_{schema}_{table}"
    • test1.table1に対応するデータ変更イベントは、 hello_test1_table1という名前のトピックに送信されます。
    • test2.table2に対応するデータ変更イベントは、 hello_test2_table2という名前のトピックに送信されます。
  • matcher = ['test3.*', 'test4.*'], topic = "hello_{schema}_world"
    • test3のすべてのテーブルに対応するデータ変更イベントは、 hello_test3_worldという名前のトピックに送信されます。
    • test4のすべてのテーブルに対応するデータ変更イベントは、 hello_test4_worldという名前のトピックに送信されます。
  • matcher = ['*.*'], topic = "{schema}_{table}"
    • TiCDC がリッスンするすべてのテーブルは、「schema_table」ルールに従って個別のトピックにディスパッチされます。たとえば、 test.accountテーブルの場合、TiCDC はそのデータ変更ログをtest_accountという名前のトピックにディスパッチします。

DDL イベントのディスパッチ

スキーマレベルの DDL

create databasedrop databaseなど、特定のテーブルに関連付けられていない DDL は、スキーマ レベルの DDL と呼ばれます。スキーマレベルの DDL に対応するイベントは、 --sink-uriで指定されたデフォルトのトピックに送信されます。

テーブルレベルの DDL

alter tablecreate tableなど、特定のテーブルに関連する DDL はテーブルレベル DDL と呼ばれます。テーブルレベルの DDL に対応するイベントは、ディスパッチャの構成に従って、対応するトピックに送信されます。

たとえば、 matcher = ['test.*'], topic = {schema}_{table}のようなディスパッチャーの場合、DDL イベントは次のようにディスパッチされます。

  • DDL イベントに含まれるテーブルが 1 つの場合、DDL イベントは対応するトピックにそのまま送信されます。たとえば、DDL イベントdrop table test.table1の場合、イベントはtest_table1という名前のトピックに送信されます。
  • DDL イベントに複数のテーブルが含まれる場合 ( rename table / drop table / drop viewは複数のテーブルが含まれる場合があります)、DDL イベントは複数のイベントに分割され、対応するトピックに送信されます。たとえば、DDL イベントrename table test.table1 to test.table10, test.table2 to test.table20の場合、イベントrename table test.table1 to test.table10 test_table1という名前のトピックに送信され、イベントrename table test.table2 to test.table20 test.table2という名前のトピックに送信されます。

区画ディスパッチャー

partition = "xxx"使用して、パーティション ディスパッチャーを指定できます。デフォルト、ts、インデックス値、およびテーブルの 4 つのディスパッチャがサポートされています。ディスパッチャのルールは次のとおりです。

  • デフォルト: 複数の一意のインデックス (主キーを含む) が存在する場合、または古い値機能が有効になっている場合、イベントはテーブル モードでディスパッチされます。一意のインデックス (または主キー) が 1 つだけ存在する場合、イベントはインデックス値モードで送出されます。
  • ts: 行変更の commitTs を使用して、イベントをハッシュおよびディスパッチします。
  • index-value: 主キーの値またはテーブルの一意のインデックスを使用して、イベントをハッシュしてディスパッチします。
  • table: テーブルのスキーマ名とテーブル名を使用して、イベントをハッシュしてディスパッチします。

ノート:

v6.1.0 以降、構成の意味を明確にするために、パーティション ディスパッチャを指定するために使用される構成がdispatcherからpartitionに変更されましたpartitiondispatcherのエイリアスです。たとえば、次の 2 つのルールはまったく同じです。

[sink] dispatchers = [ {matcher = ['*.*'], dispatcher = "ts"}, {matcher = ['*.*'], partition = "ts"}, ]

ただし、 dispatcherpartition同じルールに含めることはできません。たとえば、次のルールは無効です。

{matcher = ['*.*'], dispatcher = "ts", partition = "table"},

このページは役に立ちましたか?