Kafka へのデータのレプリケーション
このドキュメントでは、TiCDC を使用して増分データを Apache Kafka にレプリケートするチェンジフィードを作成する方法について説明します。
レプリケーションタスクを作成する
次のコマンドを実行して、レプリケーション タスクを作成します。
cdc cli changefeed create \
    --server=http://10.0.10.25:8300 \
    --sink-uri="kafka://127.0.0.1:9092/topic-name?protocol=canal-json&kafka-version=2.4.0&partition-num=6&max-message-bytes=67108864&replication-factor=1" \
    --changefeed-id="simple-replication-task"
Create changefeed successfully!
ID: simple-replication-task
Info: {"sink-uri":"kafka://127.0.0.1:9092/topic-name?protocol=canal-json&kafka-version=2.4.0&partition-num=6&max-message-bytes=67108864&replication-factor=1","opts":{},"create-time":"2023-12-21T22:04:08.103600025+08:00","start-ts":415241823337054209,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":".","config":{"case-sensitive":false,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"ddl-allow-list":null},"mounter":{"worker-num":16},"sink":{"dispatchers":null},"scheduler":{"type":"table-number","polling-time":-1}},"state":"normal","history":null,"error":null}
--server: TiCDC クラスター内の任意の TiCDCサーバーのアドレス。--changefeed-id: レプリケーション タスクの ID。形式は^[a-zA-Z0-9]+(\-[a-zA-Z0-9]+)*$正規表現と一致する必要があります。この ID が指定されていない場合、TiCDC は UUID (バージョン 4 形式) を ID として自動的に生成します。--sink-uri: レプリケーションタスクの下流アドレス。詳細はkafkaを使用してシンク URI を構成するを参照してください。--start-ts: チェンジフィードの開始 TSO を指定します。この TSO から、TiCDC クラスターはデータのプルを開始します。デフォルト値は現在時刻です。--target-ts: チェンジフィードの終了 TSO を指定します。この TSO に対して、TiCDC クラスターはデータのプルを停止します。デフォルト値は空です。これは、TiCDC がデータのプルを自動的に停止しないことを意味します。--config: チェンジフィード構成ファイルを指定します。詳細はTiCDC Changefeedコンフィグレーションパラメータを参照してください。
Kafka のシンク URI を構成する
シンク URI は、TiCDC ターゲット システムの接続情報を指定するために使用されます。形式は次のとおりです。
[scheme]://[userinfo@][host]:[port][/path]?[query_parameters]
サンプル構成:
--sink-uri="kafka://127.0.0.1:9092/topic-name?protocol=canal-json&kafka-version=2.4.0&partition-num=6&max-message-bytes=67108864&replication-factor=1"
以下は、Kafka 用に構成できるシンク URI パラメーターと値の説明です。
| パラメータ/パラメータ値 | 説明 | 
|---|---|
127.0.0.1 | ダウンストリーム Kafka サービスの IP アドレス。 | 
9092 | ダウンストリーム Kafka のポート。 | 
topic-name | 変数。 Kafka トピックの名前。 | 
kafka-version | ダウンストリーム Kafka のバージョン (オプション、デフォルトは2.4.0現在、サポートされている最も古い Kafka バージョンは0.11.0.2で、最新のものは3.2.0です。この値は、ダウンストリーム Kafka の実際のバージョンと一致している必要があります)。 | 
kafka-client-id | レプリケーション タスクの Kafka クライアント ID を指定します (オプション。デフォルトではTiCDC_sarama_producer_replication ID )。 | 
partition-num | ダウンストリーム Kafka パーティションの数 (オプション。値は実際のパーティション数以下である必要があります。そうでない場合、レプリケーション タスクは正常に作成できません。デフォルトでは3 )。 | 
max-message-bytes | 毎回 Kafka ブローカーに送信されるデータの最大サイズ (オプション、デフォルトでは10MB )。 v5.0.6 および v4.0.6 から、デフォルト値は64MBおよび256MBから10MBに変更されました。 | 
replication-factor | 保存できる Kafka メッセージ レプリカの数 (オプション、デフォルトでは1 )。この値は、Kafka の値min.insync.replicas以上である必要があります。 | 
required-acks | Produceリクエストで使用されるパラメータ。応答する前に受信する必要があるレプリカ確認応答の数をブローカーに通知します。値のオプションは0 ( NoResponse : 応答なし、 TCP ACKのみが提供される)、 1 ( WaitForLocal : ローカル コミットが正常に送信された後にのみ応答する)、および-1 ( WaitForAll : すべての複製されたレプリカが正常にコミットされた後に応答する) です。最小数は構成できます。ブローカーのmin.insync.replicas構成項目を使用して複製されたレプリカの数)。 (オプション、デフォルト値は-1 )。 | 
compression | メッセージの送信時に使用される圧縮アルゴリズム (値のオプションはnone 、 lz4 、 gzip 、 snappy 、およびzstdです。デフォルトではnoneです)。 Snappy 圧縮ファイルは公式の Snappy フォーマットにある必要があることに注意してください。 Snappy 圧縮の他のバリアントはサポートされていません。 | 
protocol | Kafka へのメッセージの出力に使用されるプロトコル。値のオプションはcanal-json 、 open-protocol 、 canal 、 avroおよびmaxwellです。 | 
auto-create-topic | 渡されたtopic-name Kafka クラスターに存在しない場合に、TiCDC がトピックを自動的に作成するかどうかを決定します (オプション、デフォルトではtrue )。 | 
enable-tidb-extension | オプション。デフォルトではfalse 。出力プロトコルがcanal-jsonの場合、値がtrueの場合、TiCDC はウォーターマークイベント送信し、 TiDB 拡張フィールドを Kafka メッセージに追加します。 v6.1.0 以降、このパラメータはavroプロトコルにも適用されます。値がtrueの場合、TiCDC は Kafka メッセージに3 つの TiDB 拡張フィールドを追加します。 | 
max-batch-size | v4.0.9 の新機能。メッセージ プロトコルが 1 つの Kafka メッセージへの複数のデータ変更の出力をサポートしている場合、このパラメーターは 1 つの Kafka メッセージ内のデータ変更の最大数を指定します。現在、Kafka のprotocolがopen-protocol (オプション、デフォルトでは16 ) の場合にのみ有効になります。 | 
enable-tls | TLS を使用してダウンストリーム Kafka インスタンスに接続するかどうか (オプション、デフォルトではfalse )。 | 
ca | ダウンストリーム Kafka インスタンスに接続するために必要な CA 証明書ファイルのパス (オプション)。 | 
cert | ダウンストリーム Kafka インスタンスに接続するために必要な証明書ファイルのパス (オプション)。 | 
key | ダウンストリーム Kafka インスタンスに接続するために必要な証明書キー ファイルのパス (オプション)。 | 
insecure-skip-verify | ダウンストリーム Kafka インスタンスに接続するときに証明書の検証をスキップするかどうか (オプション、デフォルトではfalse )。 | 
sasl-user | ダウンストリーム Kafka インスタンスに接続するために必要な SASL/PLAIN または SASL/SCRAM 認証の ID (authcid) (オプション)。 | 
sasl-password | ダウンストリーム Kafka インスタンスに接続するために必要な SASL/PLAIN または SASL/SCRAM 認証のパスワード (オプション)。特殊文字が含まれている場合は、URL エンコードする必要があります。 | 
sasl-mechanism | ダウンストリーム Kafka インスタンスに接続するために必要な SASL 認証の名前。値はplain 、 scram-sha-256 、 scram-sha-512 、またはgssapiです。 | 
sasl-gssapi-auth-type | gssapi 認証タイプ。値はuserまたはkeytab (オプション) です。 | 
sasl-gssapi-keytab-path | gssapi keytab パス (オプション)。 | 
sasl-gssapi-kerberos-config-path | gssapi kerberos 構成パス (オプション)。 | 
sasl-gssapi-service-name | gssapi サービス名 (オプション)。 | 
sasl-gssapi-user | gssapi 認証のユーザー名 (オプション)。 | 
sasl-gssapi-password | gssapi 認証のパスワード (オプション)。特殊文字が含まれている場合は、URL エンコードする必要があります。 | 
sasl-gssapi-realm | gssapi レルム名 (オプション)。 | 
sasl-gssapi-disable-pafxfast | gssapi PA-FX-FAST を無効にするかどうか (オプション)。 | 
dial-timeout | ダウンストリーム Kafka との接続を確立する際のタイムアウト。デフォルト値は10sです。 | 
read-timeout | ダウンストリーム Kafka から返される応答を取得する際のタイムアウト。デフォルト値は10sです。 | 
write-timeout | ダウンストリーム Kafka にリクエストを送信する際のタイムアウト。デフォルト値は10sです。 | 
avro-decimal-handling-mode | avroプロトコルでのみ有効です。 Avro が DECIMAL フィールドを処理する方法を決定します。値はstringまたはpreciseで、DECIMAL フィールドを文字列または正確な浮動小数点数にマッピングすることを示します。 | 
avro-bigint-unsigned-handling-mode | avroプロトコルでのみ有効です。 Avro が BIGINT UNSIGNED フィールドを処理する方法を決定します。値はstringまたはlongで、BIGINT UNSIGNED フィールドを 64 ビットの符号付き数値または文字列にマッピングすることを示します。 | 
ベストプラクティス
- 独自の Kafka トピックを作成することをお勧めします。少なくとも、トピックが Kafka ブローカーに送信できる各メッセージの最大データ量と、ダウンストリーム Kafka パーティションの数を設定する必要があります。チェンジフィードを作成するとき、これら 2 つの設定はそれぞれ
max-message-bytesとpartition-numに対応します。 - まだ存在しないトピックを含むチェンジフィードを作成する場合、TiCDC は
partition-numおよびreplication-factorパラメーターを使用してトピックを作成しようとします。これらのパラメータを明示的に指定することをお勧めします。 - ほとんどの場合、 
canal-jsonプロトコルを使用することをお勧めします。 
注記:
protocolがopen-protocolの場合、TiCDC は長さがmax-message-bytesを超えるメッセージの生成を回避しようとします。ただし、行が大きすぎて 1 つの変更だけで長さがmax-message-bytesを超える場合、サイレント障害を避けるために、TiCDC はこのメッセージの出力を試行し、ログに警告を出力。
TiCDC は Kafka の認証と認可を使用します。
以下は、Kafka SASL 認証を使用する場合の例です。
SASL/プレーン
--sink-uri="kafka://127.0.0.1:9092/topic-name?kafka-version=2.4.0&sasl-user=alice-user&sasl-password=alice-secret&sasl-mechanism=plain"SASL/スクラム
SCRAM-SHA-256 および SCRAM-SHA-512 は PLAIN メソッドに似ています。対応する認証方法として
sasl-mechanismを指定するだけです。SASL/GSSAPI
SASL/GSSAPI
user認証:--sink-uri="kafka://127.0.0.1:9092/topic-name?kafka-version=2.4.0&sasl-mechanism=gssapi&sasl-gssapi-auth-type=user&sasl-gssapi-kerberos-config-path=/etc/krb5.conf&sasl-gssapi-service-name=kafka&sasl-gssapi-user=alice/for-kafka&sasl-gssapi-password=alice-secret&sasl-gssapi-realm=example.com"sasl-gssapi-userとsasl-gssapi-realmの値は、kerberos で指定された原理に関連しています。たとえば、原則がalice/for-kafka@example.comに設定されている場合、sasl-gssapi-userとsasl-gssapi-realmそれぞれalice/for-kafkaとexample.comとして指定されます。SASL/GSSAPI
keytab認証:--sink-uri="kafka://127.0.0.1:9092/topic-name?kafka-version=2.4.0&sasl-mechanism=gssapi&sasl-gssapi-auth-type=keytab&sasl-gssapi-kerberos-config-path=/etc/krb5.conf&sasl-gssapi-service-name=kafka&sasl-gssapi-user=alice/for-kafka&sasl-gssapi-keytab-path=/var/lib/secret/alice.key&sasl-gssapi-realm=example.com"SASL/GSSAPI 認証方法の詳細については、 GSSAPIの構成を参照してください。
TLS/SSL暗号化
Kafka ブローカーで TLS/SSL 暗号化が有効になっている場合は、
-enable-tls=trueパラメーターを--sink-uriに追加する必要があります。自己署名証明書を使用する場合は、--sink-uriにca、cert、およびkeyも指定する必要があります。ACL認可
TiCDC が適切に機能するために必要な最小限の権限セットは次のとおりです。
- トピックリソースタイプの
Create、Write、およびDescribe権限。 - クラスタリソース タイプの
DescribeConfigs権限。 
- トピックリソースタイプの
 
TiCDC と Kafka Connect (Confluent プラットフォーム) の統合
Confluent が提供するデータコネクタ使用してデータをリレーショナル データベースまたは非リレーショナル データベースにストリーミングするには、 avroプロトコルを使用し、 Confluent スキーマ レジストリ in schema-registryの URL を指定する必要があります。
サンプル構成:
--sink-uri="kafka://127.0.0.1:9092/topic-name?&protocol=avro&replication-factor=3" --schema-registry="http://127.0.0.1:8081" --config changefeed_config.toml
[sink]
dispatchers = [
 {matcher = ['*.*'], topic = "tidb_{schema}_{table}"},
]
詳細な統合ガイドについては、 TiDB と Confluent プラットフォームの統合に関するクイック スタート ガイドを参照してください。
Kafka シンクのトピックおよびパーティション ディスパッチャーのルールをカスタマイズする
マッチャーのルール
例として、次のdispatchersの構成を取り上げます。
[sink]
dispatchers = [
  {matcher = ['test1.*', 'test2.*'], topic = "Topic expression 1", partition = "ts" },
  {matcher = ['test3.*', 'test4.*'], topic = "Topic expression 2", partition = "index-value" },
  {matcher = ['test1.*', 'test5.*'], topic = "Topic expression 3", partition = "table"},
  {matcher = ['test6.*'], partition = "ts"}
]
- マッチャー ルールに一致するテーブルの場合、対応するトピック式で指定されたポリシーに従ってディスパッチされます。たとえば、テーブル
test3.aaは「トピック式 2」に従ってディスパッチされます。test5.aaテーブルは「トピック式 3」に従ってディスパッチされます。 - 複数のマッチャー ルールに一致するテーブルの場合、最初に一致したトピック式に従ってディスパッチされます。例えば、 
test1.aaテーブルは「トピック表現1」に従って配布されます。 - どのマッチャー ルールにも一致しないテーブルの場合、対応するデータ変更イベントは
--sink-uriで指定されたデフォルトのトピックに送信されます。たとえば、test10.aaテーブルはデフォルトのトピックに送信されます。 - マッチャー ルールに一致するがトピック ディスパッチャーを指定していないテーブルの場合、対応するデータ変更は
--sink-uriで指定されたデフォルトのトピックに送信されます。たとえば、test6.aaテーブルはデフォルトのトピックに送信されます。 
トピックディスパッチャー
topic = "xxx" を使用してトピック ディスパッチャを指定し、トピック式を使用して柔軟なトピック ディスパッチ ポリシーを実装できます。トピックの総数は 1000 未満にすることをお勧めします。
Topic 式の形式は[prefix]{schema}[middle][{table}][suffix]です。
prefix: オプション。トピック名のプレフィックスを示します。{schema}: 必須。スキーマ名と一致させるために使用されます。middle: オプション。スキーマ名とテーブル名の間の区切り文字を示します。{table}: オプション。テーブル名と一致させるために使用されます。suffix: オプション。トピック名の接尾辞を示します。
prefix 、 middleおよびsuffixには、文字a-z 、 A-Z 、 0-9 、 . 、 _および-のみを含めることができます。 {schema}と{table}は両方とも小文字です。 {Schema}や{TABLE}などのプレースホルダは無効です。
いくつかの例:
matcher = ['test1.table1', 'test2.table2'], topic = "hello_{schema}_{table}"test1.table1に対応するデータ変更イベントは、hello_test1_table1という名前のトピックに送信されます。test2.table2に対応するデータ変更イベントは、hello_test2_table2という名前のトピックに送信されます。
matcher = ['test3.*', 'test4.*'], topic = "hello_{schema}_world"test3のすべてのテーブルに対応するデータ変更イベントは、hello_test3_worldという名前のトピックに送信されます。test4のすべてのテーブルに対応するデータ変更イベントは、hello_test4_worldという名前のトピックに送信されます。
matcher = ['*.*'], topic = "{schema}_{table}"- TiCDC によってリッスンされるすべてのテーブルは、「schema_table」ルールに従って別のトピックにディスパッチされます。たとえば、テーブル
test.accountの場合、TiCDC はデータ変更ログをtest_accountという名前のトピックにディスパッチします。 
- TiCDC によってリッスンされるすべてのテーブルは、「schema_table」ルールに従って別のトピックにディスパッチされます。たとえば、テーブル
 
DDL イベントをディスパッチする
スキーマレベルの DDL
特定のテーブルに関連付けられていない DDL は、 create databaseやdrop databaseなどのスキーマ レベル DDL と呼ばれます。スキーマレベルの DDL に対応するイベントは、 --sink-uriで指定されたデフォルトのトピックに送信されます。
テーブルレベルの DDL
特定のテーブルに関連する DDL は、 alter tableやcreate tableなどのテーブル レベル DDL と呼ばれます。テーブルレベルの DDL に対応するイベントは、ディスパッチャー構成に従って、対応するトピックに送信されます。
たとえば、 matcher = ['test.*'], topic = {schema}_{table}のようなディスパッチャの場合、DDL イベントは次のようにディスパッチされます。
- DDL イベントに単一のテーブルが関与している場合、DDL イベントは対応するトピックにそのまま送信されます。たとえば、DDL イベント
drop table test.table1の場合、イベントはtest_table1という名前のトピックに送信されます。 - DDL イベントに複数のテーブルが関与する場合 ( 
rename tabledrop tableは複数のテーブルが関与する場合があります)、DDL イベントは複数のイベントに分割され、対応するトピックに送信さdrop viewます。たとえば、DDL イベントrename table test.table1 to test.table10, test.table2 to test.table20の場合、イベントrename table test.table1 to test.table10test_table1という名前のトピックに送信され、イベントrename table test.table2 to test.table20test.table2という名前のトピックに送信されます。 
パーティションディスパッチャ
partition = "xxx"を使用してパーティション ディスパッチャを指定できます。これは、default、ts、index-value、および table の 4 つのディスパッチャーをサポートします。ディスパッチャのルールは次のとおりです。
- デフォルト: 複数の一意のインデックス (主キーを含む) が存在する場合、または古い値機能が有効になっている場合、イベントはテーブル モードで送出されます。一意のインデックス (または主キー) が 1 つだけ存在する場合、イベントはインデックス値モードで送出されます。
 - ts: 行変更の commitT を使用してイベントをハッシュし、ディスパッチします。
 - インデックス値: 主キーの値またはテーブルの一意のインデックスを使用して、イベントをハッシュし、ディスパッチします。
 - table: テーブルのスキーマ名とテーブル名を使用して、イベントをハッシュしてディスパッチします。
 
注記:
v6.1.0 以降、構成の意味を明確にするために、パーティション ディスパッチャーの指定に使用される構成は
dispatcherからpartitionに変更され、partitionはdispatcherのエイリアスです。たとえば、次の 2 つのルールはまったく同じです。[sink] dispatchers = [ {matcher = ['*.*'], dispatcher = "ts"}, {matcher = ['*.*'], partition = "ts"}, ]ただし、
dispatcherとpartition同じルールに含めることはできません。たとえば、次のルールは無効です。{matcher = ['*.*'], dispatcher = "ts", partition = "table"},
単一の大きなテーブルの負荷を複数の TiCDC ノードにスケールアウトします。
この機能は、1 つの大きなテーブルのデータ レプリケーション範囲を、データ量と 1 分あたりの変更行数に応じて複数の範囲に分割し、各範囲でレプリケートされるデータ量と変更行数をほぼ同じにします。この機能は、これらの範囲をレプリケーション用の複数の TiCDC ノードに分散するため、複数の TiCDC ノードが大きな単一テーブルを同時にレプリケートできます。この機能により、次の 2 つの問題が解決されます。
- 単一の TiCDC ノードは、大きな単一テーブルを時間内に複製できません。
 - TiCDC ノードによって消費されるリソース (CPU やメモリなど) は均等に分散されていません。
 
サンプル構成:
[scheduler]
# The default value is "false". You can set it to "true" to enable this feature.
enable-table-across-nodes = true
# When you enable this feature, it only takes effect for tables with the number of regions greater than the `region-threshold` value.
region-threshold = 100000
# When you enable this feature, it takes effect for tables with the number of rows modified per minute greater than the `write-key-threshold` value.
# Note:
# * The default value of `write-key-threshold` is 0, which means that the feature does not split the table replication range according to the number of rows modified in a table by default.
# * You can configure this parameter according to your cluster workload. For example, if it is configured as 30000, it means that the feature will split the replication range of a table when the number of modified rows per minute in the table exceeds 30000.
# * When `region-threshold` and `write-key-threshold` are configured at the same time:
#   TiCDC will check whether the number of modified rows is greater than `write-key-threshold` first.
#   If not, next check whether the number of Regions is greater than `region-threshold`.
write-key-threshold = 30000
次の SQL ステートメントを使用して、テーブルに含まれるリージョンの数をクエリできます。
SELECT COUNT(*) FROM INFORMATION_SCHEMA.TIKV_REGION_STATUS WHERE DB_NAME="database1" AND TABLE_NAME="table1" AND IS_INDEX=0;
Kafka トピックの制限を超えるメッセージを処理する
Kafka トピックは、受信できるメッセージのサイズに制限を設定します。この制限はmax.message.bytesパラメータによって制御されます。 TiCDC Kafka シンクがこの制限を超えるデータを送信すると、変更フィードはエラーを報告し、データの複製を続行できません。この問題を解決するために、TiCDC は次のソリューションを提供します。
ハンドル キーのみを送信する
v7.1.2 以降、TiCDC Kafka シンクは、メッセージ サイズが制限を超えた場合のハンドル キーの送信のみをサポートします。これにより、メッセージ サイズが大幅に削減され、Kafka トピックの制限を超えるメッセージ サイズによって引き起こされるチェンジフィード エラーやタスクの失敗を回避できます。ハンドル キーとは次のことを指します。
- レプリケートされるテーブルに主キーがある場合、主キーはハンドル キーになります。
 - テーブルに主キーがなくても NOT NULL 固有キーがある場合、NOT NULL 固有キーがハンドル キーになります。
 
現在、この機能は、Canal-JSON と Open Protocol の 2 つのエンコード プロトコルをサポートしています。 Canal-JSON プロトコルを使用する場合は、 enable-tidb-extension=true in sink-uriを指定する必要があります。
サンプル構成は次のとおりです。
[sink.kafka-config.large-message-handle]
# This configuration is introduced in v7.1.2.
# Empty by default, which means when the message size exceeds the limit, the changefeed fails.
# If this configuration is set to "handle-key-only", when the message size exceeds the limit, only the handle key is sent in the data field. If the message size still exceeds the limit, the changefeed fails.
large-message-handle-option = "handle-key-only"
ハンドル キーのみを使用してメッセージを消費する
ハンドルキーのみのメッセージフォーマットは次のとおりです。
{
    "id": 0,
    "database": "test",
    "table": "tp_int",
    "pkNames": [
        "id"
    ],
    "isDdl": false,
    "type": "INSERT",
    "es": 1639633141221,
    "ts": 1639633142960,
    "sql": "",
    "sqlType": {
        "id": 4
    },
    "mysqlType": {
        "id": "int"
    },
    "data": [
        {
          "id": "2"
        }
    ],
    "old": null,
    "_tidb": {     // TiDB extension fields
        "commitTs": 163963314122145239,
        "onlyHandleKey": true
    }
}
Kafka コンシューマはメッセージを受信すると、まずonlyHandleKeyフィールドをチェックします。このフィールドが存在し、 trueである場合、メッセージには完全なデータのハンドル キーのみが含まれていることを意味します。この場合、完全なデータを取得するには、上流の TiDB にクエリを実行し、 tidb_snapshot履歴データを読み取るを使用する必要があります。