Pulsar へのデータの複製

このドキュメントでは、TiCDC を使用して増分データを Pulsar にレプリケートするチェンジフィードを作成する方法について説明します。

レプリケーション タスクを作成して増分データを Pulsar にレプリケートする

次のコマンドを実行して、レプリケーション タスクを作成します。

cdc cli changefeed create \ --server=http://127.0.0.1:8300 \ --sink-uri="pulsar://127.0.0.1:6650/persistent://public/default/yktest?protocol=canal-json" \ --config=./t_changefeed.toml \ --changefeed-id="simple-replication-task"
Create changefeed successfully! ID: simple-replication-task Info: {"upstream_id":7277814241002263370,"namespace":"default","id":"simple-replication-task","sink_uri":"pulsar://127.0.0.1:6650/consumer-test?protocol=canal-json","create_time":"2024-02-29T14:42:32.000904+08:00","start_ts":444203257406423044,"config":{"memory_quota":1073741824,"case_sensitive":false,"force_replicate":false,"ignore_ineligible_table":false,"check_gc_safe_point":true,"enable_sync_point":false,"bdr_mode":false,"sync_point_interval":600000000000,"sync_point_retention":86400000000000,"filter":{"rules":["pulsar_test.*"]},"mounter":{"worker_num":16},"sink":{"protocol":"canal-json","csv":{"delimiter":",","quote":"\"","null":"\\N","include_commit_ts":false,"binary_encoding_method":"base64"},"dispatchers":[{"matcher":["pulsar_test.*"],"partition":"","topic":"test_{schema}_{table}"}],"encoder_concurrency":16,"terminator":"\r\n","date_separator":"day","enable_partition_separator":true,"only_output_updated_columns":false,"delete_only_output_handle_key_columns":false,"pulsar_config":{"connection-timeout":30,"operation-timeout":30,"batching-max-messages":1000,"batching-max-publish-delay":10,"send-timeout":30},"advance_timeout":150},"consistent":{"level":"none","max_log_size":64,"flush_interval":2000,"use_file_backend":false},"scheduler":{"enable_table_across_nodes":false,"region_threshold":100000,"write_key_threshold":0},"integrity":{"integrity_check_level":"none","corruption_handle_level":"warn"}},"state":"normal","creator_version":"v7.5.1","resolved_ts":444203257406423044,"checkpoint_ts":444203257406423044,"checkpoint_time":"2024-02-29 14:42:31.410"}

各パラメータの意味は次のとおりです。

  • --server : TiCDC クラスター内の TiCDCサーバーのアドレス。
  • --changefeed-id : レプリケーション タスクの ID。形式は正規表現^[a-zA-Z0-9]+(\-[a-zA-Z0-9]+)*$と一致する必要があります。 ID が指定されていない場合、TiCDC は ID として UUID (バージョン 4 形式) を自動的に生成します。
  • --sink-uri : レプリケーションタスクの下流アドレス。 シンク URI を使用して Pulsar を構成するを参照してください。
  • --start-ts : チェンジフィードの開始 TSO。 TiCDC クラスターは、この TSO からのデータのプルを開始します。デフォルト値は現在時刻です。
  • --target-ts : チェンジフィードのターゲット TSO。 TiCDC クラスターは、この TSO でデータのプルを停止します。デフォルトでは空です。これは、TiCDC がデータのプルを自動的に停止しないことを意味します。
  • --config : チェンジフィード構成ファイル。 TiCDC チェンジフィード構成パラメータを参照してください。

シンク URI と変更フィード構成を使用して Pulsar を構成する

シンク URI を使用して TiCDC ターゲット システムの接続情報を指定し、changefeed config を使用して Pulsar に関連するパラメーターを構成できます。

シンク URI

シンク URI は次の形式に従います。

[scheme]://[userinfo@][host]:[port][/path]?[query_parameters]

コンフィグレーション例1:

--sink-uri="pulsar://127.0.0.1:6650/persistent://abc/def/yktest?protocol=canal-json"

コンフィグレーション例2:

--sink-uri="pulsar://127.0.0.1:6650/yktest?protocol=canal-json"

URI で構成可能なパラメータは次のとおりです。

パラメータ説明
127.0.0.1ダウンストリーム Pulsar がサービスを提供する IP アドレス。
6650ダウンストリーム Pulsar の接続ポート。
persistent://abc/def/yktest前述の構成例 1 に示すように、このパラメーターは、Pulsar のテナント、名前空間、およびトピックを指定するために使用されます。
yktest前述の構成例 2 に示すように、指定するトピックが Pulsar のデフォルト テナントpublicのデフォルト ネームスペースdefaultにある場合は、トピック名のみを使用して URI を構成できます (例: yktest )。これは、トピックをpersistent://public/default/yktestとして指定するのと同じです。

変更フィード構成パラメータ

以下は、changefeed 構成パラメーターの例です。

[sink] # `dispatchers` is used to specify matching rules. # Note: When the downstream MQ is Pulsar, if the routing rule for `partition` is not specified as any of `ts`, `index-value`, `table`, or `default`, each Pulsar message will be routed using the string you set as the key. # For example, if you specify the routing rule for a matcher as the string `code`, then all Pulsar messages that match that matcher will be routed with `code` as the key. # dispatchers = [ # {matcher = ['test1.*', 'test2.*'], topic = "Topic expression 1", partition = "ts" }, # {matcher = ['test3.*', 'test4.*'], topic = "Topic expression 2", partition = "index-value" }, # {matcher = ['test1.*', 'test5.*'], topic = "Topic expression 3", partition = "table"}, # {matcher = ['test6.*'], partition = "default"}, # {matcher = ['test7.*'], partition = "test123"} # ] # `protocol` is used to specify the protocol format for encoding messages. # When the downstream is Pulsar, the protocol can only be canal-json. # protocol = "canal-json" # The following parameters only take effect when the downstream is Pulsar. [sink.pulsar-config] # Authentication on the Pulsar server is done using a token. Specify the value of the token. authentication-token = "xxxxxxxxxxxxx" # When you use a token for Pulsar server authentication, specify the path to the file where the token is located. token-from-file="/data/pulsar/token-file.txt" # Pulsar uses the basic account and password to authenticate the identity. Specify the account. basic-user-name="root" # Pulsar uses the basic account and password to authenticate the identity. Specify the password. basic-password="password" # The certificate path on the client, which is required when Pulsar enables the mTLS authentication. auth-tls-certificate-path="/data/pulsar/certificate" # The private key path on the client, which is required when Pulsar enables the mTLS authentication. auth-tls-private-key-path="/data/pulsar/certificate.key" # The path to the trusted certificate file of the Pulsar TLS authentication, which is required when Pulsar enables the mTLS authentication or TLS encrypted transmission. tls-trust-certs-file-path="/data/pulsar/tls-trust-certs-file" # The path to the encrypted private key on the client, which is required when Pulsar enables TLS encrypted transmission. tls-key-file-path="/data/pulsar/tls-key-file" # The path to the encrypted certificate file on the client, which is required when Pulsar enables TLS encrypted transmission. tls-certificate-file="/data/pulsar/tls-certificate-file" # Pulsar oauth2 issuer-url. For more information, see the Pulsar website: https://pulsar.apache.org/docs/2.10.x/client-libraries-go/#tls-encryption-and-authentication oauth2.oauth2-issuer-url="https://xxxx.auth0.com" # Pulsar oauth2 audience oauth2.oauth2-audience="https://xxxx.auth0.com/api/v2/" # Pulsar oauth2 private-key oauth2.oauth2-private-key="/data/pulsar/privateKey" # Pulsar oauth2 client-id oauth2.oauth2-client-id="0Xx...Yyxeny" # Pulsar oauth2 oauth2-scope oauth2.oauth2-scope="xxxx" # The number of cached Pulsar producers in TiCDC. The value is 10240 by default. Each Pulsar producer corresponds to one topic. If the number of topics you need to replicate is larger than the default value, you need to increase the number. pulsar-producer-cache-size=10240 # Pulsar data compression method. No compression is used by default. Optional values are "lz4", "zlib", and "zstd". compression-type="" # The timeout for the Pulsar client to establish a TCP connection with the server. The value is 5 seconds by default. connection-timeout=5 # The timeout for Pulsar clients to initiate operations such as creating and subscribing to a topic. The value is 30 seconds by default. operation-timeout=30 # The maximum number of messages in a single batch for a Pulsar producer to send. The value is 1000 by default. batching-max-messages=1000 # The interval at which Pulsar producer messages are saved for batching. The value is 10 milliseconds by default. batching-max-publish-delay=10 # The timeout for a Pulsar producer to send a message. The value is 30 seconds by default. send-timeout=30

ベストプラクティス

  • 変更フィードを作成するときは、 protocolパラメーターを指定する必要があります。現在、Pulsar へのデータの複製ではcanal-jsonプロトコルのみがサポートされています。
  • pulsar-producer-cache-sizeパラメータは、Pulsar クライアントにキャッシュされたプロデューサーの数を示します。 Pulsar の各プロデューサーは 1 つのトピックにのみ対応できるため、TiCDC は LRU 方式を採用してプロデューサーをキャッシュし、デフォルトの制限は 10240 です。複製する必要があるトピックの数がデフォルト値より大きい場合は、その数を増やす必要があります。 。

TLS暗号化通信

v7.5.1 以降の v7.5 パッチ バージョンでは、TiCDC は Pulsar の TLS 暗号化送信をサポートします。構成例は以下のとおりです。

シンク URI:

--sink-uri="pulsar+ssl://127.0.0.1:6651/persistent://public/default/yktest?protocol=canal-json"

コンフィグレーション:

[sink.pulsar-config] tls-trust-certs-file-path="/data/pulsar/tls-trust-certs-file"

Pulsarサーバーに対してtlsRequireTrustedClientCertOnConnect=trueパラメータが設定されている場合は、changefeed 設定ファイルでtls-key-file-pathおよびtls-certificate-fileパラメータも設定する必要があります。例えば:

[sink.pulsar-config] tls-trust-certs-file-path="/data/pulsar/tls-trust-certs-file" tls-certificate-file="/data/pulsar/tls-certificate-file" tls-key-file-path="/data/pulsar/tls-key-file"

Pulsar の TiCDC 認証と認可

以下は、Pulsar でトークン認証を使用する場合のサンプル構成です。

  • トークン

    シンク URI:

    --sink-uri="pulsar://127.0.0.1:6650/persistent://public/default/yktest?protocol=canal-json"

    設定パラメータ:

    [sink.pulsar-config] authentication-token = "xxxxxxxxxxxxx"
  • ファイルからのトークン

    シンク URI:

    --sink-uri="pulsar://127.0.0.1:6650/persistent://public/default/yktest?protocol=canal-json"

    設定パラメータ:

    [sink.pulsar-config] # Pulsar uses tokens for authentication on the Pulsar server. Specify the path to the token file, which will be read from the TiCDC server. token-from-file="/data/pulsar/token-file.txt"
  • mTLS認証

    シンク URI:

    --sink-uri="pulsar+ssl://127.0.0.1:6651/persistent://public/default/yktest?protocol=canal-json"

    構成パラメータ:

    [sink.pulsar-config] # Certificate path of the Pulsar mTLS authentication auth-tls-certificate-path="/data/pulsar/certificate" # Private key path of the Pulsar mTLS authentication auth-tls-private-key-path="/data/pulsar/certificate.key" # Path to the trusted certificate file of the Pulsar mTLS authentication tls-trust-certs-file-path="/data/pulsar/tls-trust-certs-file"
  • OAuth2認証

    v7.5.1 以降の v7.5 パッチ バージョンの場合、TiCDC は Pulsar の OAuth2 認証をサポートします。

    シンク URI:

    --sink-uri="pulsar://127.0.0.1:6650/persistent://public/default/yktest?protocol=canal-json"

    構成パラメータ:

    [sink.pulsar-config] # Pulsar oauth2 issuer-url. For more information, see the Pulsar website: https://pulsar.apache.org/docs/2.10.x/client-libraries-go/#oauth2-authentication oauth2.oauth2-issuer-url="https://xxxx.auth0.com" # Pulsar oauth2 audience oauth2.oauth2-audience="https://xxxx.auth0.com/api/v2/" # Pulsar oauth2 private-key oauth2.oauth2-private-key="/data/pulsar/privateKey" # Pulsar oauth2 client-id oauth2.oauth2-client-id="0Xx...Yyxeny" # Pulsar oauth2 oauth2-scope oauth2.oauth2-scope="xxxx"

Pulsar シンクのトピックとパーティションのディスパッチ ルールをカスタマイズする

Matcher のマッチング ルール

例として、次のサンプル構成ファイルのdispatchers構成項目を取り上げます。

[sink] dispatchers = [ {matcher = ['test1.*', 'test2.*'], topic = "Topic expression 1", partition = "ts" }, {matcher = ['test3.*', 'test4.*'], topic = "Topic expression 2", partition = "index-value" }, {matcher = ['test1.*', 'test5.*'], topic = "Topic expression 3", partition = "table"}, {matcher = ['test6.*'], partition = "default"}, {matcher = ['test7.*'], partition = "test123"} ]
  • マッチャー ルールに一致するテーブルは、対応するトピック式で指定されたポリシーに従ってディスパッチされます。たとえば、テーブルtest3.aaTopic expression 2に従ってディスパッチされ、テーブルtest5.aaTopic expression 3に従ってディスパッチされます。
  • 複数のマッチャー ルールに一致するテーブルの場合、最初に一致したトピック式に従ってテーブルがディスパッチされます。たとえば、テーブルtest1.aaTopic expression 1に従ってディスパッチされます。
  • どのマッチャーにも一致しないテーブルの場合、対応するデータ変更イベントは-sink-uriで指定されたデフォルトのトピックに送信されます。たとえば、テーブルtest10.aaはデフォルトのトピックに送信されます。
  • マッチャー ルールに一致するがトピック ディスパッチャーが指定されていないテーブルの場合、対応するデータ変更は-sink-uriで指定されたデフォルトのトピックに送信されます。たとえば、テーブルtest6.abcはデフォルトのトピックに送信されます。

トピックディスパッチャー

topic = "xxx"を使用してトピック ディスパッチャを指定し、トピック式を使用して柔軟なトピック ディスパッチ ポリシーを実装できます。トピックの総数は 1000 未満にすることをお勧めします。

トピック式の形式は[prefix]{schema}[middle][{table}][suffix]です。各部分の意味は次のとおりです。

  • prefix : オプション。トピック名のプレフィックスを表します。
  • {schema} : オプション。データベース名を表します。
  • middle : オプション。データベース名とテーブル名の間の区切り文字を表します。
  • {table} : オプション。テーブル名を表します。
  • suffix : オプション。トピック名の接尾辞を表します。

prefixmiddle 、およびsuffix 、大文字と小文字 ( a-zA-Z )、数字 ( 0-9 )、ドット ( . )、アンダースコア ( _ )、およびハイフン ( - ) のみをサポートします。 {schema}{table}小文字でなければなりません。 {Schema}{TABLE}などの大文字を含むプレースホルダーは無効です。

以下にいくつかの例を示します。

  • matcher = ['test1.table1', 'test2.table2'], topic = "hello_{schema}_{table}"

    • テーブルtest1.table1に対応するデータ変更イベントは、 hello_test1_table1という名前のトピックに送出されます。
    • テーブルtest2.table2に対応するデータ変更イベントは、 hello_test2_table2という名前のトピックに送出されます。
  • matcher = ['test3.*', 'test4.*'], topic = "hello_{schema}_world"

    • test3の下のすべてのテーブルのデータ変更イベントは、 hello_test3_worldという名前のトピックに送出されます。
    • test4の下のすべてのテーブルのデータ変更イベントは、 hello_test4_worldという名前のトピックに送出されます。
  • matcher = ['*.*'], topic = "{schema}_{table}"

    • TiCDC がリッスンするすべてのテーブルについては、ルールdatabaseName_tableNameに従って別のトピックにディスパッチされます。たとえば、テーブルtest.accountの場合、TiCDC はデータ変更ログをtest_accountという名前のトピックに送信します。

DDL イベントをディスパッチする

データベースレベルのDDLイベント

CREATE DATABASEDROP DATABASEなど、特定のテーブルに関連しない DDL ステートメントは、データベース レベルの DDL ステートメントと呼ばれます。データベースレベルの DDL ステートメントに対応するイベントは、 --sink-uriで指定されたデフォルトのトピックにディスパッチされます。

テーブルレベルのDDLイベント

特定のテーブルに関連するALTER TABLECREATE TABLEなどの DDL ステートメントは、テーブル レベルの DDL ステートメントと呼ばれます。テーブルレベルの DDL ステートメントに対応するイベントは、 dispatchersの構成に従って適切なトピックにディスパッチされます。

たとえば、 matcher = ['test.*'], topic = {schema}_{table}のようなdispatchers構成の場合、DDL イベントは次のように送出されます。

  • DDL イベントに 1 つのテーブルのみが関与する場合、DDL イベントはそのまま適切なトピックにディスパッチされます。たとえば、DDL イベントDROP TABLE test.table1の場合、イベントはtest_table1という名前のトピックにディスパッチされます。

  • DDL イベントに複数のテーブルが関与する場合 ( RENAME TABLEDROP TABLE 、およびDROP VIEWはすべて複数のテーブルに関与する可能性があります)、単一の DDL イベントは複数の DDL イベントに分割され、適切なトピックにディスパッチされます。たとえば、DDL イベントRENAME TABLE test.table1 TO test.table10, test.table2 TO test.table20の場合、処理は次のようになります。

    • RENAME TABLE test.table1 TO test.table10の DDL イベントをtest_table1という名前のトピックにディスパッチします。
    • RENAME TABLE test.table2 TO test.table20の DDL イベントをtest_table2という名前のトピックにディスパッチします。

パーティションディスパッチャー

現在、TiCDC は、コンシューマが排他的サブスクリプション モデルを使用してメッセージを消費することのみをサポートしています。つまり、各コンシューマはトピック内のすべてのパーティションからメッセージを消費できます。

partition = "xxx"を使用してパーティション ディスパッチャを指定できます。次のパーティション ディスパッチがサポートされています: defaulttsindex-value 、およびtable 。他の文字列を入力すると、TiCDC はその文字列を Pulsarサーバーに送信されるメッセージのメッセージのkeyとして渡します。

派遣ルールは以下の通りです。

  • default : デフォルトでは、 tableを指定した場合と同じスキーマ名とテーブル名によってイベントが送出されます。
  • ts : 行変更の commitT を使用してハッシュ計算を実行し、イベントをディスパッチします。
  • index-value : テーブルの主キーまたは一意のインデックスの値を使用して、ハッシュ計算を実行し、イベントをディスパッチします。
  • table : スキーマ名とテーブル名を使用してハッシュ計算を実行し、イベントをディスパッチします。
  • その他の自己定義文字列: 自己定義文字列は Pulsar メッセージのキーとして直接使用され、Pulsar プロデューサはこのキー値をディスパッチに使用します。

このページは役に立ちましたか?

Playground
新規
登録なしで TiDB の機能をワンストップでインタラクティブに体験できます。
製品
TiDB Cloud
TiDB
価格
PoC お問い合わせ
エコシステム
TiKV
TiFlash
OSS Insight
© 2024 PingCAP. All Rights Reserved.
Privacy Policy.