TiCDCクラスタとレプリケーション タスクの管理
このドキュメントでは、TiUP を使用して TiCDC クラスターをアップグレードし、TiCDC クラスターの構成を変更する方法、およびコマンドライン ツールを使用して TiCDC クラスターとレプリケーション タスクを管理する方法について説明しますcdc cli
。
HTTP インターフェイス (TiCDC OpenAPI 機能) を使用して、TiCDC クラスターとレプリケーション タスクを管理することもできます。詳細については、 TiCDC OpenAPIを参照してください。
TiUP を使用して TiCDC をアップグレードする
このセクションでは、TiUP を使用して TiCDC クラスターをアップグレードする方法を紹介します。次の例では、TiCDC と TiDB クラスター全体を v6.1.2 にアップグレードする必要があると想定しています。
tiup update --self && \
tiup update --all && \
tiup cluster upgrade <cluster-name> v6.1.2
バージョンアップ時の注意事項
changefeed
の構成は、TiCDC v4.0.2 で変更されました。詳細は構成ファイルの互換性に関する注意事項を参照してください。- 問題が発生した場合は、 TiUP を使用して TiDB をアップグレードする -FAQを参照してください。
TiUP を使用して TiCDC 構成を変更する
このセクションでは、TiUP のtiup cluster edit-config
コマンドを使用して、TiCDC クラスターの構成を変更する方法を紹介します。次の例では、値gc-ttl
をデフォルトの86400
から3600
、つまり 1 時間に変更します。
まず、次のコマンドを実行します。 <cluster-name>
を実際のクラスター名に置き換える必要があります。
tiup cluster edit-config <cluster-name>
次に、vi エディター ページに入り、 server-configs
の下のcdc
構成を変更します。構成を以下に示します。
server_configs:
tidb: {}
tikv: {}
pd: {}
tiflash: {}
tiflash-learner: {}
pump: {}
drainer: {}
cdc:
gc-ttl: 3600
変更後、 tiup cluster reload -R cdc
コマンドを実行して構成をリロードします。
TLS を使用する
暗号化データ転送 (TLS) の使用について詳しくは、 TiDB コンポーネント間の TLS を有効にするを参照してください。
cdc cli
を使用してクラスターのステータスとデータ複製タスクを管理する
このセクションでは、 cdc cli
を使用して TiCDC クラスターとデータ複製タスクを管理する方法を紹介します。 cdc cli
は、 cdc
バイナリを使用して実行されるcli
サブコマンドです。以下の説明では、次のことを前提としています。
cli
コマンドはcdc
バイナリを使用して直接実行されます。- PD は
10.0.10.25
でリッスンし、ポートは2379
です。
ノート:
PD が listen する IP アドレスとポートは、
pd-server
始動時に指定されたadvertise-client-urls
パラメーターに対応します。複数のpd-server
には複数のadvertise-client-urls
パラメータがあり、1 つまたは複数のパラメータを指定できます。たとえば、--pd=http://10.0.10.25:2379
または--pd=http://10.0.10.25:2379,http://10.0.10.26:2379,http://10.0.10.27:2379
です。
TiUP を使用して TiCDC をデプロイする場合は、次のコマンドのcdc cli
をtiup ctl:<cluster-version> cdc
に置き換えます。
TiCDC サービスの進行状況を管理する ( capture
)
capture
のリストをクエリします。cdc cli capture list --pd=http://10.0.10.25:2379[ { "id": "806e3a1b-0e31-477f-9dd6-f3f2c570abdd", "is-owner": true, "address": "127.0.0.1:8300" }, { "id": "ea2a4203-56fe-43a6-b442-7b295f458ebc", "is-owner": false, "address": "127.0.0.1:8301" } ]id
: サービス プロセスの ID。is-owner
: サービスプロセスがオーナーノードかどうかを示します。address
: サービス プロセスが外部へのインターフェイスを提供するためのアドレス。
レプリケーション タスクの管理 ( changefeed
)
レプリケーション タスクの状態転送
レプリケーション タスクの状態は、レプリケーション タスクの実行ステータスを表します。 TiCDC の実行中に、レプリケーション タスクがエラーで失敗したり、手動で一時停止、再開したり、指定されたTargetTs
に達したりする場合があります。これらの動作により、レプリケーション タスクの状態が変化する可能性があります。このセクションでは、TiCDC レプリケーション タスクの状態と、状態間の転送関係について説明します。
上記の状態遷移図の状態は、次のように説明されています。
Normal
: レプリケーション タスクは正常に実行され、checkpoint-ts は正常に進行します。Stopped
: ユーザーが変更フィードを手動で一時停止したため、レプリケーション タスクは停止されています。この状態の変更フィードは、GC 操作をブロックします。Error
: レプリケーション タスクはエラーを返します。いくつかの回復可能なエラーが原因で、レプリケーションを続行できません。この状態の changefeed は、状態がNormal
に移行するまで再開を試み続けます。この状態の変更フィードは、GC 操作をブロックします。Finished
: レプリケーション タスクが完了し、プリセットTargetTs
に達しました。この状態の変更フィードは、GC 操作をブロックしません。Failed
: レプリケーション タスクは失敗します。一部の回復不能なエラーが原因で、レプリケーション タスクを再開できず、回復できません。この状態の変更フィードは、GC 操作をブロックしません。
上記の状態遷移図の番号は、次のように記述されます。
- ①
changefeed pause
コマンドを実行します。 - ②
changefeed resume
コマンドを実行してレプリケーションタスクを再開します。 - ③
changefeed
動作中に回復可能なエラーが発生し、自動的に動作が再開されます。 - ④
changefeed resume
コマンドを実行してレプリケーションタスクを再開します。 - ⑤
changefeed
目の操作で回復不可能なエラーが発生した。 - ⑥
changefeed
がプリセットTargetTs
に到達し、レプリケーションが自動的に停止されます。 - ⑦
changefeed
はgc-ttl
で指定された期間を超えて停止し、再開することはできません。 - ⑧
changefeed
は、自動回復を実行しようとしたときに、回復不能なエラーが発生しました。
レプリケーション タスクを作成する
次のコマンドを実行して、レプリケーション タスクを作成します。
cdc cli changefeed create --pd=http://10.0.10.25:2379 --sink-uri="mysql://root:123456@127.0.0.1:3306/" --changefeed-id="simple-replication-task" --sort-engine="unified"
Create changefeed successfully!
ID: simple-replication-task
Info: {"sink-uri":"mysql://root:123456@127.0.0.1:3306/","opts":{},"create-time":"2020-03-12T22:04:08.103600025+08:00","start-ts":415241823337054209,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":".","config":{"case-sensitive":true,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"ddl-allow-list":null},"mounter":{"worker-num":16},"sink":{"dispatchers":null},"scheduler":{"type":"table-number","polling-time":-1}},"state":"normal","history":null,"error":null}
--changefeed-id
: レプリケーション タスクの ID。形式は^[a-zA-Z0-9]+(\-[a-zA-Z0-9]+)*$
の正規表現と一致する必要があります。この ID が指定されていない場合、TiCDC は ID として UUID (バージョン 4 形式) を自動的に生成します。--sink-uri
: レプリケーション タスクのダウンストリーム アドレス。--sink-uri
を次の形式に従って構成します。現在、スキームはmysql
/tidb
/kafka
/pulsar
/s3
/local
をサポートしています。[scheme]://[userinfo@][host]:[port][/path]?[query_parameters]URI に特殊文字が含まれている場合、URL エンコーディングを使用してこれらの特殊文字を処理する必要があります。
--start-ts
:changefeed
の開始 TSO を指定します。この TSO から、TiCDC クラスターはデータのプルを開始します。デフォルト値は現在の時刻です。--target-ts
:changefeed
の終了 TSO を指定します。この TSO に対して、TiCDC クラスターはデータのプルを停止します。デフォルト値は空です。これは、TiCDC がデータのプルを自動的に停止しないことを意味します。--sort-engine
:changefeed
のソート エンジンを指定します。 TiDB と TiKV は分散アーキテクチャを採用しているため、TiCDC はデータの変更をシンクに書き込む前にソートする必要があります。このオプションはunified
(デフォルト)/memory
/file
をサポートします。unified
:unified
を使用すると、TiCDC はメモリ内でのデータの並べ替えを優先します。メモリが不足している場合、TiCDC は自動的にディスクを使用して一時データを保存します。これはデフォルト値の--sort-engine
です。memory
: メモリ内のデータ変更をソートします。大量のデータをレプリケートすると OOM が簡単にトリガーされるため、この並べ替えエンジンの使用はお勧めしません。file
: ディスクを完全に使用して一時データを格納します。この機能は非推奨です。どのような状況でも使用することはお勧めしません。
--config
:changefeed
の構成ファイルを指定します。sort-dir
: ソート エンジンが使用する一時ファイル ディレクトリを指定します。このオプションは、TiDB v4.0.13、v5.0.3、および v5.1.0 以降ではサポートされていないことに注意してください。もう使用しないでください。
mysql
/ tidb
でシンク URI を構成する
サンプル構成:
--sink-uri="mysql://root:123456@127.0.0.1:3306/?worker-count=16&max-txn-row=5000&transaction-atomicity=table"
以下は、 mysql
/ tidb
を使用してシンク URI に構成できるパラメーターとパラメーター値の説明です。
パラメータ/パラメータ値 | 説明 |
---|---|
root | ダウンストリーム データベースのユーザー名 |
123456 | ダウンストリーム データベースのパスワード |
127.0.0.1 | ダウンストリーム データベースの IP アドレス |
3306 | ダウンストリーム データのポート |
worker-count | ダウンストリームに対して同時に実行できる SQL ステートメントの数 (オプション、既定では16 ) |
max-txn-row | ダウンストリームに対して実行できるトランザクション バッチのサイズ (オプション、既定では256 ) |
ssl-ca | ダウンストリームの MySQL インスタンスに接続するために必要な CA 証明書ファイルのパス (オプション) |
ssl-cert | ダウンストリームの MySQL インスタンスに接続するために必要な証明書ファイルのパス (オプション) |
ssl-key | ダウンストリームの MySQL インスタンスに接続するために必要な証明書キー ファイルのパス (オプション) |
time-zone | ダウンストリームの MySQL インスタンスに接続するときに使用されるタイム ゾーン。v4.0.8 以降で有効です。これはオプションのパラメーターです。このパラメーターが指定されていない場合、TiCDC サービス プロセスのタイム ゾーンが使用されます。このパラメータが空の値に設定されている場合、TiCDC がダウンストリームの MySQL インスタンスに接続するときにタイム ゾーンが指定されず、ダウンストリームのデフォルトのタイム ゾーンが使用されます。 |
transaction-atomicity | トランザクションの原子性レベル。これはオプションのパラメーターで、デフォルト値はtable です。値がtable の場合、TiCDC は単一テーブル トランザクションの原子性を保証します。値がnone の場合、TiCDC は単一テーブル トランザクションを分割します。 |
kafka
でシンク URI を構成する
サンプル構成:
--sink-uri="kafka://127.0.0.1:9092/topic-name?kafka-version=2.4.0&partition-num=6&max-message-bytes=67108864&replication-factor=1"
以下は、 kafka
のシンク URI に構成できるパラメーターとパラメーター値の説明です。
パラメータ/パラメータ値 | 説明 |
---|---|
127.0.0.1 | ダウンストリーム Kafka サービスの IP アドレス |
9092 | 下流の Kafka のポート |
topic-name | 変数。 Kafka トピックの名前 |
kafka-version | ダウンストリーム Kafka のバージョン (オプション、デフォルトでは2.4.0 現在、サポートされている最も古い Kafka バージョンは0.11.0.2 で、最新のものは3.1.0 です。この値は、ダウンストリーム Kafka の実際のバージョンと一致する必要があります) |
kafka-client-id | レプリケーション タスクの Kafka クライアント ID を指定します (オプション。既定ではTiCDC_sarama_producer_replication ID )。 |
partition-num | ダウンストリーム Kafka パーティションの数 (オプション。値は、実際のパーティション数を超えてはなりません。そうでない場合、レプリケーション タスクは正常に作成されません。デフォルトでは3 ) |
max-message-bytes | 毎回 Kafka ブローカーに送信されるデータの最大サイズ (オプション、デフォルトでは10MB )。 v5.0.6 および v4.0.6 から、デフォルト値が 64MB および 256MB から 10MB に変更されました。 |
replication-factor | 保存できる Kafka メッセージ レプリカの数 (オプション、既定では1 ) |
protocol | メッセージが Kafka に出力されるプロトコル。値のオプションはcanal-json 、 open-protocol 、 canal 、 avro 、およびmaxwell です。 |
auto-create-topic | 渡されたtopic-name が Kafka クラスターに存在しない場合に、TiCDC がトピックを自動的に作成するかどうかを決定します (オプション、デフォルトではtrue )。 |
enable-tidb-extension | オプション。デフォルトではfalse です。出力プロトコルがcanal-json の場合、値がtrue の場合、TiCDC は Resolved イベントを送信し、TiDB 拡張フィールドを Kafka メッセージに追加します。 v6.1.0 から、このパラメーターはavro プロトコルにも適用されます。値がtrue の場合、TiCDC は 3 つの TiDB 拡張フィールドを Kafka メッセージに追加します。 |
max-batch-size | v4.0.9 の新機能。メッセージ プロトコルが 1 つの Kafka メッセージへの複数のデータ変更の出力をサポートしている場合、このパラメーターは 1 つの Kafka メッセージ内のデータ変更の最大数を指定します。現在、Kafka のprotocol がopen-protocol の場合にのみ有効です。 (オプション、デフォルトで16 ) |
enable-tls | TLS を使用してダウンストリーム Kafka インスタンスに接続するかどうか (オプション、デフォルトではfalse ) |
ca | ダウンストリーム Kafka インスタンスに接続するために必要な CA 証明書ファイルのパス (オプション) |
cert | ダウンストリームの Kafka インスタンスに接続するために必要な証明書ファイルのパス (オプション) |
key | ダウンストリーム Kafka インスタンスに接続するために必要な証明書キー ファイルのパス (オプション) |
sasl-user | ダウンストリームの Kafka インスタンスに接続するために必要な SASL/PLAIN または SASL/SCRAM 認証の ID (authcid) (オプション) |
sasl-password | ダウンストリーム Kafka インスタンスに接続するために必要な SASL/PLAIN または SASL/SCRAM 認証のパスワード (オプション) |
sasl-mechanism | ダウンストリーム Kafka インスタンスに接続するために必要な SASL 認証の名前。値はplain 、 scram-sha-256 、 scram-sha-512 、またはgssapi です。 |
sasl-gssapi-auth-type | gssapi 認証タイプ。値はuser またはkeytab です (オプション) |
sasl-gssapi-keytab-path | gssapi キータブ パス (オプション) |
sasl-gssapi-kerberos-config-path | gssapi kerberos 構成パス (オプション) |
sasl-gssapi-service-name | gssapi サービス名 (オプション) |
sasl-gssapi-user | gssapi 認証のユーザー名 (オプション) |
sasl-gssapi-password | gssapi 認証のパスワード (オプション) |
sasl-gssapi-realm | gssapi レルム名 (オプション) |
sasl-gssapi-disable-pafxfast | gssapi PA-FX-FAST を無効にするかどうか (オプション) |
dial-timeout | ダウンストリーム Kafka との接続を確立する際のタイムアウト。デフォルト値は10s です |
read-timeout | ダウンストリーム Kafka から返された応答を取得する際のタイムアウト。デフォルト値は10s です |
write-timeout | ダウンストリーム Kafka にリクエストを送信する際のタイムアウト。デフォルト値は10s です |
avro-decimal-handling-mode | avro プロトコルでのみ有効です。 Avro が DECIMAL フィールドを処理する方法を決定します。値はstring またはprecise で、DECIMAL フィールドを文字列または正確な浮動小数点数にマッピングすることを示します。 |
avro-bigint-unsigned-handling-mode | avro プロトコルでのみ有効です。 Avro が BIGINT UNSIGNED フィールドを処理する方法を決定します。値はstring またはlong で、BIGINT UNSIGNED フィールドを 64 ビットの符号付き数値または文字列にマッピングすることを示します。 |
ベストプラクティス:
- 独自の Kafka トピックを作成することをお勧めします。少なくとも、トピックが Kafka ブローカーに送信できる各メッセージの最大データ量と、ダウンストリーム Kafka パーティションの数を設定する必要があります。 changefeed を作成すると、これら 2 つの設定はそれぞれ
max-message-bytes
とpartition-num
に対応します。 - まだ存在しないトピックで変更フィードを作成すると、TiCDC は
partition-num
とreplication-factor
のパラメーターを使用してトピックを作成しようとします。これらのパラメーターを明示的に指定することをお勧めします。 - ほとんどの場合、
canal-json
プロトコルを使用することをお勧めします。
ノート:
protocol
がopen-protocol
の場合、TiCDC は長さがmax-message-bytes
を超えるメッセージの生成を回避しようとします。ただし、1 つの変更だけで長さがmax-message-bytes
を超える行が非常に大きい場合、TiCDC はサイレント エラーを回避するために、このメッセージを出力しようとし、ログに警告を出力します。
TiCDC は Kafka の認証と承認を使用します
以下は、Kafka SASL 認証を使用する場合の例です。
SASL/プレーン
--sink-uri="kafka://127.0.0.1:9092/topic-name?kafka-version=2.4.0&sasl-user=alice-user&sasl-password=alice-secret&sasl-mechanism=plain"SASL/スクラム
SCRAM-SHA-256 と SCRAM-SHA-512 は PLAIN メソッドに似ています。対応する認証方法として
sasl-mechanism
を指定するだけです。SASL/GSSAPI
SASL/GSSAPI
user
認証:--sink-uri="kafka://127.0.0.1:9092/topic-name?kafka-version=2.4.0&sasl-mechanism=gssapi&sasl-gssapi-auth-type=user&sasl-gssapi-kerberos-config-path=/etc/krb5.conf&sasl-gssapi-service-name=kafka&sasl-gssapi-user=alice/for-kafka&sasl-gssapi-password=alice-secret&sasl-gssapi-realm=example.com"sasl-gssapi-user
とsasl-gssapi-realm
の値は、kerberos で指定された原理に関連しています。たとえば、原則がalice/for-kafka@example.com
に設定されている場合、sasl-gssapi-user
とsasl-gssapi-realm
はそれぞれalice/for-kafka
とexample.com
として指定されます。SASL/GSSAPI
keytab
認証:--sink-uri="kafka://127.0.0.1:9092/topic-name?kafka-version=2.4.0&sasl-mechanism=gssapi&sasl-gssapi-auth-type=keytab&sasl-gssapi-kerberos-config-path=/etc/krb5.conf&sasl-gssapi-service-name=kafka&sasl-gssapi-user=alice/for-kafka&sasl-gssapi-keytab-path=/var/lib/secret/alice.key&sasl-gssapi-realm=example.com"SASL/GSSAPI 認証方式の詳細については、 GSSAPI の設定を参照してください。
TLS/SSL 暗号化
Kafka ブローカーで TLS/SSL 暗号化が有効になっている場合は、
-enable-tls=true
パラメーターを--sink-uri
に追加する必要があります。自己署名証明書を使用する場合は、--sink-uri
でca
、cert
、およびkey
も指定する必要があります。ACL 承認
TiCDC が適切に機能するために必要な最小限のアクセス許可セットは次のとおりです。
- トピックリソースタイプの
Create
とWrite
のアクセス許可。 - クラスタリソース タイプの
DescribeConfigs
のアクセス許可。
- トピックリソースタイプの
TiCDC を Kafka Connect (コンフルエント プラットフォーム) と統合する
Confluent が提供するデータ コネクタを使用してデータをリレーショナル データベースまたは非リレーショナル データベースにストリーミングするには、 avro
プロトコルを使用してコンフルエント スキーマ レジストリ in schema-registry
の URL を提供する必要があります。
サンプル構成:
--sink-uri="kafka://127.0.0.1:9092/topic-name?&protocol=avro&replication-factor=3" --schema-registry="http://127.0.0.1:8081" --config changefeed_config.toml
[sink]
dispatchers = [
{matcher = ['*.*'], topic = "tidb_{schema}_{table}"},
]
詳細な統合ガイドについては、 TiDB と Confluent Platform の統合に関するクイック スタート ガイドを参照してください。
pulsar
でシンク URI を構成する
サンプル構成:
--sink-uri="pulsar://127.0.0.1:6650/topic-name?connectionTimeout=2s"
以下は、 pulsar
でシンク URI に構成できるパラメーターの説明です。
パラメータ | 説明 |
---|---|
connectionTimeout | ダウンストリーム Pulsar への接続を確立するためのタイムアウト。これはオプションであり、デフォルトは 30 (秒) です。 |
operationTimeout | ダウンストリーム Pulsar で操作を実行するためのタイムアウト。これはオプションであり、デフォルトは 30 (秒) です。 |
tlsTrustCertsFilePath | ダウンストリームの Pulsar インスタンスに接続するために必要な CA 証明書ファイルのパス (オプション) |
tlsAllowInsecureConnection | TLS が有効になった後に暗号化されていない接続を許可するかどうかを決定します (オプション) |
tlsValidateHostname | ダウンストリーム Pulsar からの証明書のホスト名を検証するかどうかを決定します (オプション) |
maxConnectionsPerBroker | 単一のダウンストリーム Pulsar ブローカーに許可される接続の最大数。これはオプションで、デフォルトは 1 です。 |
auth.tls | TLS モードを使用して、下流のパルサーを検証します (オプション)。たとえば、 auth=tls&auth.tlsCertFile=/path/to/cert&auth.tlsKeyFile=/path/to/key です。 |
auth.token | トークン モードを使用して、下流のパルサーを検証します (オプション)。たとえば、 auth=token&auth.token=secret-token またはauth=token&auth.file=path/to/secret-token-file です。 |
name | TiCDC のパルサー プロデューサーの名前 (オプション) |
protocol | メッセージがパルサーに出力されるプロトコル。値のオプションはcanal-json 、 open-protocol 、 canal 、 avro 、およびmaxwell です。 |
maxPendingMessages | 保留中のメッセージ キューの最大サイズを設定します。これはオプションで、デフォルトは 1000 です。たとえば、Pulsar からの確認メッセージを保留します。 |
disableBatching | バッチでのメッセージの自動送信を無効にします (オプション) |
batchingMaxPublishDelay | 送信されたメッセージがバッチ化される期間を設定します (デフォルト: 10ms) |
compressionType | メッセージの送信に使用される圧縮アルゴリズムを設定します (オプション)。値のオプションはNONE 、 LZ4 、 ZLIB 、およびZSTD です。 (デフォルトではNONE ) |
hashingScheme | メッセージの送信先のパーティションを選択するために使用されるハッシュ アルゴリズム (オプション)。値のオプションはJavaStringHash (デフォルト) とMurmur3 です。 |
properties.* | TiCDC の Pulsar プロデューサーに追加されたカスタマイズされたプロパティ (オプション)。たとえば、 properties.location=Hangzhou です。 |
Pulsar のその他のパラメータについては、 pulsar-client-go ClientOptionsおよびpulsar-client-go ProducerOptionsを参照してください。
タスク構成ファイルを使用する
レプリケーション構成の詳細 (単一テーブルのレプリケーションを指定するなど) については、 タスク構成ファイルを参照してください。
構成ファイルを使用して、次の方法でレプリケーション タスクを作成できます。
cdc cli changefeed create --pd=http://10.0.10.25:2379 --sink-uri="mysql://root:123456@127.0.0.1:3306/" --config changefeed.toml
上記のコマンドで、 changefeed.toml
はレプリケーション タスクの構成ファイルです。
レプリケーション タスク リストを照会する
次のコマンドを実行して、レプリケーション タスク リストを照会します。
cdc cli changefeed list --pd=http://10.0.10.25:2379
[{
"id": "simple-replication-task",
"summary": {
"state": "normal",
"tso": 417886179132964865,
"checkpoint": "2020-07-07 16:07:44.881",
"error": null
}
}]
checkpoint
は、この時点より前に TiCDC が既にデータをダウンストリームにレプリケートしたことを示します。state
は、レプリケーション タスクの状態を示します。normal
: レプリケーション タスクは正常に実行されます。stopped
: レプリケーション タスクは停止しています (手動で一時停止)。error
: レプリケーション タスクは (エラーにより) 停止されました。removed
: レプリケーション タスクは削除されます。この状態のタスクは、オプション--all
を指定した場合にのみ表示されます。このオプションが指定されていない場合にこれらのタスクを表示するには、changefeed query
コマンドを実行します。finished
: レプリケーション タスクが完了しました (データはtarget-ts
にレプリケートされます)。この状態のタスクは、オプション--all
を指定した場合にのみ表示されます。このオプションが指定されていない場合にこれらのタスクを表示するには、changefeed query
コマンドを実行します。
特定のレプリケーション タスクを照会する
特定のレプリケーション タスクを照会するには、 changefeed query
コマンドを実行します。クエリ結果には、タスク情報とタスク状態が含まれます。 --simple
または-s
引数を指定して、基本的なレプリケーション状態とチェックポイント情報のみを含むクエリ結果を簡素化できます。この引数を指定しない場合、詳細なタスク構成、レプリケーション状態、およびレプリケーション テーブル情報が出力されます。
cdc cli changefeed query -s --pd=http://10.0.10.25:2379 --changefeed-id=simple-replication-task
{
"state": "normal",
"tso": 419035700154597378,
"checkpoint": "2020-08-27 10:12:19.579",
"error": null
}
上記のコマンドと結果:
state
は、現在のchangefeed
の複製状態です。各状態はchangefeed list
の状態と一致している必要があります。tso
は、現在のchangefeed
でダウンストリームに正常に複製された最大のトランザクション TSO を表します。checkpoint
は、ダウンストリームに正常に複製された現在のchangefeed
の最大トランザクション TSO の対応する時間を表します。error
は、現在のchangefeed
でエラーが発生したかどうかを記録します。
cdc cli changefeed query --pd=http://10.0.10.25:2379 --changefeed-id=simple-replication-task
{
"info": {
"sink-uri": "mysql://127.0.0.1:3306/?max-txn-row=20\u0026worker-number=4",
"opts": {},
"create-time": "2020-08-27T10:33:41.687983832+08:00",
"start-ts": 419036036249681921,
"target-ts": 0,
"admin-job-type": 0,
"sort-engine": "unified",
"sort-dir": ".",
"config": {
"case-sensitive": true,
"enable-old-value": false,
"filter": {
"rules": [
"*.*"
],
"ignore-txn-start-ts": null,
"ddl-allow-list": null
},
"mounter": {
"worker-num": 16
},
"sink": {
"dispatchers": null,
},
"scheduler": {
"type": "table-number",
"polling-time": -1
}
},
"state": "normal",
"history": null,
"error": null
},
"status": {
"resolved-ts": 419036036249681921,
"checkpoint-ts": 419036036249681921,
"admin-job-type": 0
},
"count": 0,
"task-status": [
{
"capture-id": "97173367-75dc-490c-ae2d-4e990f90da0f",
"status": {
"tables": {
"47": {
"start-ts": 419036036249681921
}
},
"operation": null,
"admin-job-type": 0
}
}
]
}
上記のコマンドと結果:
info
は、照会されたchangefeed
の複製構成です。status
は、照会されたchangefeed
の複製状態です。resolved-ts
: 現在のchangefeed
の中で最大のトランザクションTS
。このTS
は TiKV から TiCDC に正常に送信されていることに注意してください。checkpoint-ts
: 現在のchangefeed
の中で最大のトランザクションTS
。このTS
はダウンストリームに正常に書き込まれていることに注意してください。admin-job-type
:changefeed
のステータス:0
: 状態は正常です。1
: タスクは一時停止されています。タスクが一時停止すると、レプリケートされたすべてのprocessor
が終了します。タスクの構成とレプリケーション ステータスが保持されるため、タスクをcheckpiont-ts
から再開できます。2
: タスクは再開されます。レプリケーション タスクはcheckpoint-ts
から再開します。3
: タスクは削除されます。タスクが削除されると、複製されたすべてのprocessor
が終了し、複製タスクの構成情報がクリアされます。以降のクエリでは、レプリケーション ステータスのみが保持されます。
task-status
は、照会されたchangefeed
の各複製サブタスクの状態を示します。
レプリケーション タスクを一時停止する
次のコマンドを実行して、レプリケーション タスクを一時停止します。
cdc cli changefeed pause --pd=http://10.0.10.25:2379 --changefeed-id simple-replication-task
上記のコマンドで:
--changefeed-id=uuid
は、一時停止するレプリケーション タスクに対応するchangefeed
の ID を表します。
レプリケーション タスクを再開する
次のコマンドを実行して、一時停止したレプリケーション タスクを再開します。
cdc cli changefeed resume --pd=http://10.0.10.25:2379 --changefeed-id simple-replication-task
上記のコマンドで:
--changefeed-id=uuid
は、再開するレプリケーション タスクに対応するchangefeed
の ID を表します。
レプリケーション タスクを削除する
次のコマンドを実行して、レプリケーション タスクを削除します。
cdc cli changefeed remove --pd=http://10.0.10.25:2379 --changefeed-id simple-replication-task
上記のコマンドで:
--changefeed-id=uuid
は、削除するレプリケーション タスクに対応するchangefeed
の ID を表します。
タスク構成の更新
v4.0.4 以降、TiCDC はレプリケーション タスクの構成の変更をサポートしています (動的ではありません)。 changefeed
構成を変更するには、タスクを一時停止し、構成を変更してから、タスクを再開します。
cdc cli changefeed pause -c test-cf --pd=http://10.0.10.25:2379
cdc cli changefeed update -c test-cf --pd=http://10.0.10.25:2379 --sink-uri="mysql://127.0.0.1:3306/?max-txn-row=20&worker-number=8" --config=changefeed.toml
cdc cli changefeed resume -c test-cf --pd=http://10.0.10.25:2379
現在、次の構成項目を変更できます。
changefeed
のsink-uri
。changefeed
の構成ファイルと、ファイル内のすべての構成アイテム。- ファイルの並べ替え機能と並べ替えディレクトリを使用するかどうか。
changefeed
のtarget-ts
。
レプリケーション サブタスクの処理単位を管理する ( processor
)
processor
のリストをクエリします。cdc cli processor list --pd=http://10.0.10.25:2379[ { "id": "9f84ff74-abf9-407f-a6e2-56aa35b33888", "capture-id": "b293999a-4168-4988-a4f4-35d9589b226b", "changefeed-id": "simple-replication-task" } ]特定のレプリケーション タスクのステータスに対応する特定の
changefeed
を照会します。cdc cli processor query --pd=http://10.0.10.25:2379 --changefeed-id=simple-replication-task --capture-id=b293999a-4168-4988-a4f4-35d9589b226b{ "status": { "tables": { "56": { # ID of the replication table, corresponding to tidb_table_id of a table in TiDB "start-ts": 417474117955485702 } }, "operation": null, "admin-job-type": 0 }, "position": { "checkpoint-ts": 417474143881789441, "resolved-ts": 417474143881789441, "count": 0 } }上記のコマンドでは:
status.tables
: 各キー番号はレプリケーション テーブルの ID を表し、TiDB のテーブルのtidb_table_id
に対応します。resolved-ts
: 現在のプロセッサでソートされたデータの中で最大の TSO。checkpoint-ts
: 現在のプロセッサでダウンストリームに正常に書き込まれた最大の TSO。
タスク構成ファイル
このセクションでは、レプリケーション タスクの構成について説明します。
# Specifies whether the database names and tables in the configuration file are case-sensitive.
# The default value is true.
# This configuration item affects configurations related to filter and sink.
case-sensitive = true
# Specifies whether to output the old value. New in v4.0.5. Since v5.0, the default value is `true`.
enable-old-value = true
[filter]
# Ignores the transaction of specified start_ts.
ignore-txn-start-ts = [1, 2]
# Filter rules.
# Filter syntax: https://docs.pingcap.com/tidb/stable/table-filter#syntax.
rules = ['*.*', '!test.*']
[mounter]
# mounter thread counts, which is used to decode the TiKV output data.
worker-num = 16
[sink]
# For the sink of MQ type, you can use dispatchers to configure the event dispatcher.
# Since v6.1, TiDB supports two types of event dispatchers: partition and topic. For more information, see the following section.
# The matching syntax of matcher is the same as the filter rule syntax. For details about the matcher rules, see the following section.
dispatchers = [
{matcher = ['test1.*', 'test2.*'], topic = "Topic expression 1", partition = "ts" },
{matcher = ['test3.*', 'test4.*'], topic = "Topic expression 2", partition = "index-value" },
{matcher = ['test1.*', 'test5.*'], topic = "Topic expression 3", partition = "table"},
{matcher = ['test6.*'], partition = "ts"}
]
# For the sink of MQ type, you can specify the protocol format of the message.
# Currently the following protocols are supported: canal-json, open-protocol, canal, avro, and maxwell.
protocol = "canal-json"
互換性に関する注意事項
- TiCDC v4.0.0 では、
ignore-txn-commit-ts
が削除され、ignore-txn-start-ts
が追加され、start_ts を使用してトランザクションをフィルタリングします。 - TiCDC v4.0.2 では、
db-dbs
/db-tables
/ignore-dbs
/ignore-tables
が削除され、データベースとテーブルに新しいフィルター ルールを使用するrules
が追加されました。詳細なフィルター構文については、 テーブル フィルターを参照してください。
Kafka Sink のトピックおよびパーティション ディスパッチャーのルールをカスタマイズする
マッチャーのルール
前のセクションの例では:
- マッチャー ルールに一致するテーブルについては、対応するトピック式で指定されたポリシーに従ってディスパッチされます。たとえば、
test3.aa
テーブルは「トピック式 2」に従ってディスパッチされます。test5.aa
テーブルは「トピック式 3」に従ってディスパッチされます。 - 複数のマッチャー ルールに一致するテーブルの場合、最初に一致したトピック式に従ってディスパッチされます。たとえば、「トピック表現 1」に従って、
test1.aa
のテーブルが分散されます。 - どのマッチャー ルールにも一致しないテーブルの場合、対応するデータ変更イベントが
--sink-uri
で指定されたデフォルト トピックに送信されます。たとえば、test10.aa
テーブルはデフォルト トピックに送信されます。 - マッチャー ルールに一致するが、トピック ディスパッチャーが指定されていないテーブルの場合、対応するデータ変更は
--sink-uri
で指定されたデフォルト トピックに送信されます。たとえば、test6.aa
テーブルはデフォルト トピックに送信されます。
トピック ディスパッチャ
topic = "xxx" を使用してトピック ディスパッチャを指定し、トピック式を使用して柔軟なトピック ディスパッチ ポリシーを実装できます。トピックの総数は 1000 未満にすることをお勧めします。
Topic 式の形式は[prefix]{schema}[middle][{table}][suffix]
です。
prefix
: オプション。トピック名のプレフィックスを示します。{schema}
: 必須。スキーマ名と一致させるために使用されます。middle
: オプション。スキーマ名とテーブル名の間の区切り文字を示します。{table}
: オプション。テーブル名と一致させるために使用されます。suffix
: オプション。トピック名のサフィックスを示します。
prefix
、 middle
、およびsuffix
には、次の文字のみを含めることができます: a-z
、 A-Z
、 0-9
、 .
、 _
、および-
。 {schema}
と{table}
は両方とも小文字です。 {Schema}
や{TABLE}
などのプレースホルダーは無効です。
いくつかの例:
matcher = ['test1.table1', 'test2.table2'], topic = "hello_{schema}_{table}"
test1.table1
に対応するデータ変更イベントは、hello_test1_table1
という名前のトピックに送信されます。test2.table2
に対応するデータ変更イベントは、hello_test2_table2
という名前のトピックに送信されます。
matcher = ['test3.*', 'test4.*'], topic = "hello_{schema}_world"
test3
のすべてのテーブルに対応するデータ変更イベントは、hello_test3_world
という名前のトピックに送信されます。test4
のすべてのテーブルに対応するデータ変更イベントは、hello_test4_world
という名前のトピックに送信されます。
matcher = ['*.*'], topic = "{schema}_{table}"
- TiCDC がリッスンするすべてのテーブルは、「schema_table」ルールに従って個別のトピックにディスパッチされます。たとえば、
test.account
テーブルの場合、TiCDC はそのデータ変更ログをtest_account
という名前のトピックにディスパッチします。
- TiCDC がリッスンするすべてのテーブルは、「schema_table」ルールに従って個別のトピックにディスパッチされます。たとえば、
DDL イベントのディスパッチ
スキーマレベルの DDL
create database
やdrop database
など、特定のテーブルに関連付けられていない DDL は、スキーマ レベルの DDL と呼ばれます。スキーマレベルの DDL に対応するイベントは、 --sink-uri
で指定されたデフォルトのトピックに送信されます。
テーブルレベルの DDL
alter table
やcreate table
など、特定のテーブルに関連する DDL はテーブルレベル DDL と呼ばれます。テーブルレベルの DDL に対応するイベントは、ディスパッチャの構成に従って、対応するトピックに送信されます。
たとえば、 matcher = ['test.*'], topic = {schema}_{table}
のようなディスパッチャーの場合、DDL イベントは次のようにディスパッチされます。
- DDL イベントに含まれるテーブルが 1 つの場合、DDL イベントは対応するトピックにそのまま送信されます。たとえば、DDL イベント
drop table test.table1
の場合、イベントはtest_table1
という名前のトピックに送信されます。 - DDL イベントに複数のテーブルが含まれる場合 (
rename table
/drop table
/drop view
は複数のテーブルが含まれる場合があります)、DDL イベントは複数のイベントに分割され、対応するトピックに送信されます。たとえば、DDL イベントrename table test.table1 to test.table10, test.table2 to test.table20
の場合、イベントrename table test.table1 to test.table10
はtest_table1
という名前のトピックに送信され、イベントrename table test.table2 to test.table20
はtest.table2
という名前のトピックに送信されます。
区画ディスパッチャー
partition = "xxx"
を使用して、パーティション ディスパッチャーを指定できます。デフォルト、ts、インデックス値、およびテーブルの 4 つのディスパッチャがサポートされています。ディスパッチャのルールは次のとおりです。
- デフォルト: 複数の一意のインデックス (主キーを含む) が存在する場合、または古い値機能が有効になっている場合、イベントはテーブル モードでディスパッチされます。一意のインデックス (または主キー) が 1 つだけ存在する場合、イベントはインデックス値モードで送出されます。
- ts: 行変更の commitTs を使用して、イベントをハッシュおよびディスパッチします。
- index-value: 主キーの値またはテーブルの一意のインデックスを使用して、イベントをハッシュしてディスパッチします。
- table: テーブルのスキーマ名とテーブル名を使用して、イベントをハッシュしてディスパッチします。
ノート:
v6.1 以降、構成の意味を明確にするために、パーティション ディスパッチャーを指定するために使用される構成が
dispatcher
からpartition
に変更されましたpartition
はdispatcher
のエイリアスです。たとえば、次の 2 つのルールはまったく同じです。
[sink] dispatchers = [ {matcher = ['*.*'], dispatcher = "ts"}, {matcher = ['*.*'], partition = "ts"}, ]ただし、
dispatcher
とpartition
を同じルールに含めることはできません。たとえば、次のルールは無効です。
{matcher = ['*.*'], dispatcher = "ts", partition = "table"},
行変更イベントの履歴値を出力v4.0.5 の新機能
デフォルトの構成では、レプリケーション タスクの TiCDC Open Protocol 出力の Row Changed Event には、変更前の値ではなく、変更された値のみが含まれます。したがって、出力値は、TiCDC Open Protocol のコンシューマー側で行変更イベントの履歴値として使用することはできません。
v4.0.5 以降、TiCDC は行変更イベントの履歴値の出力をサポートしています。この機能を有効にするには、ルート レベルのchangefeed
の構成ファイルで次の構成を指定します。
enable-old-value = true
この機能は、v5.0 以降、デフォルトで有効になっています。この機能を有効にした後の TiCDC Open Protocol の出力形式については、 TiCDC オープン プロトコル - 行変更イベントを参照してください。
照合の新しいフレームワークを有効にしてテーブルを複製する
v4.0.15、v5.0.4、v5.1.1、および v5.2.0 以降、TiCDC は照合のための新しいフレームワークを有効にしたテーブルをサポートします。
有効なインデックスのないテーブルをレプリケートする
v4.0.8 以降、TiCDC は、タスク構成を変更することにより、有効なインデックスを持たないテーブルの複製をサポートします。この機能を有効にするには、 changefeed
構成ファイルで次のように構成します。
enable-old-value = true
force-replicate = true
ユニファイドソーター
ユニファイド ソーターは、TiCDC のソーティング エンジンです。次のシナリオによって発生する OOM の問題を軽減できます。
- TiCDC のデータ レプリケーション タスクは長時間一時停止されます。その間、大量の増分データが蓄積され、レプリケートする必要があります。
- データ複製タスクは早いタイムスタンプから開始されるため、大量の増分データを複製する必要があります。
v4.0.13 以降のcdc cli
を使用して作成された変更フィードの場合、Unified Sorter はデフォルトで有効になっています。 v4.0.13 より前に存在していた変更フィードについては、以前の構成が使用されます。
ユニファイド ソーター機能が変更フィードで有効になっているかどうかを確認するには、次のコマンド例を実行します (PD インスタンスの IP アドレスがhttp://10.0.10.25:2379
であると仮定します)。
cdc cli --pd="http://10.0.10.25:2379" changefeed query --changefeed-id=simple-replication-task | grep 'sort-engine'
上記のコマンドの出力で、値sort-engine
が「unified」の場合、変更フィードでユニファイド ソーターが有効になっていることを意味します。
ノート:
- サーバーが機械式ハード ドライブまたはその他のストレージ デバイスを使用しており、レイテンシーが大きいか帯域幅が限られている場合は、統合ソーターを慎重に使用してください。
- デフォルトでは、Unified Sorter は
data_dir
を使用して一時ファイルを保存します。空きディスク容量が 500 GiB 以上であることを確認することをお勧めします。実稼働環境では、各ノードの空きディスク容量が (ビジネスで許容される最大checkpoint-ts
遅延) * (ビジネス ピーク時のアップストリーム書き込みトラフィック) より大きいことを確認することをお勧めします。また、changefeed
の作成後に大量の履歴データをレプリケートする予定がある場合は、各ノードの空き容量がレプリケートされたデータの量よりも多いことを確認してください。- 統合ソーターはデフォルトで有効になっています。サーバーが上記の要件に一致せず、統合ソーターを無効にする場合は、changefeed の
sort-engine
からmemory
を手動で設定する必要があります。memory
を使用してソートする既存の変更フィードでユニファイド ソーターを有効にするには、 タスクの中断後に TiCDC が再起動された後に発生する OOM を処理するにはどうすればよいですか?で提供されているメソッドを参照してください。
災害シナリオにおける結果整合性レプリケーション
v5.3.0 以降、TiCDC はアップストリームの TiDB クラスターから S3 ストレージまたはダウンストリーム クラスターの NFS ファイル システムへの増分データのバックアップをサポートします。アップストリーム クラスターが災害に遭遇して利用できなくなった場合、TiCDC はダウンストリーム データを最新の結果整合性のある状態に復元できます。これは、TiCDC が提供する結果整合性のあるレプリケーション機能です。この機能を使用すると、アプリケーションをダウンストリーム クラスターにすばやく切り替えて、長時間のダウンタイムを回避し、サービスの継続性を向上させることができます。
現在、TiCDC は、TiDB クラスターから別の TiDB クラスターまたは MySQL 互換データベース システム ( Aurora、MySQL、および MariaDB を含む) に増分データを複製できます。アップストリーム クラスターがクラッシュした場合、災害前の TiCDC のレプリケーション ステータスが正常であり、レプリケーション ラグが小さいという条件を考えると、TiCDC は 5 分以内にダウンストリーム クラスターのデータを復元できます。最大で 10 秒のデータ損失が許容されます。つまり、RTO <= 5 分、および P95 RPO <= 10 秒です。
次のシナリオでは、TiCDC のレプリケーション ラグが増加します。
- 短時間でTPSが大幅に上昇
- アップストリームで大規模または長時間のトランザクションが発生する
- アップストリームの TiKV または TiCDC クラスターがリロードまたはアップグレードされている
add index
などの時間のかかる DDL ステートメントはアップストリームで実行されます。- PD はアグレッシブなスケジューリング戦略で構成されているため、リージョンリーダーが頻繁に異動したり、リージョンの合併やリージョンの分割が頻繁に発生したりします。
前提条件
- TiCDC のリアルタイム増分データ バックアップ ファイルを格納するために、高可用性 Amazon S3 ストレージまたは NFS システムを準備します。これらのファイルには、プライマリ クラスタの障害が発生した場合にアクセスできます。
- 災害シナリオで結果整合性を確保する必要がある変更フィードに対して、この機能を有効にします。これを有効にするには、changefeed 構成ファイルに次の構成を追加します。
[consistent]
# Consistency level. Options include:
# - none: the default value. In a non-disaster scenario, eventual consistency is only guaranteed if and only if finished-ts is specified.
# - eventual: Uses redo log to guarantee eventual consistency in case of the primary cluster disasters.
level = "eventual"
# Individual redo log file size, in MiB. By default, it's 64. It is recommended to be no more than 128.
max-log-size = 64
# The interval for flushing or uploading redo logs to S3, in milliseconds. By default, it's 1000. The recommended range is 500-2000.
flush-interval = 1000
# Form of storing redo log, including nfs (NFS directory) and S3 (uploading to S3).
storage = "s3://logbucket/test-changefeed?endpoint=http://$S3_ENDPOINT/"
災害からの回復
主クラスタで障害が発生した場合、 cdc redo
コマンドを実行して副クラスタで手動で復旧する必要があります。回復プロセスは次のとおりです。
- すべての TiCDC プロセスが終了していることを確認します。これは、データ リカバリ中にプライマリ クラスタがサービスを再開するのを防ぎ、TiCDC がデータ同期を再開するのを防ぐためです。
- データの回復には cdc バイナリを使用します。次のコマンドを実行します。
cdc redo apply --tmp-dir="/tmp/cdc/redo/apply" \
--storage="s3://logbucket/test-changefeed?endpoint=http://10.0.10.25:24927/" \
--sink-uri="mysql://normal:123456@10.0.10.55:3306/"
このコマンドでは:
tmp-dir
: TiCDC 増分データ バックアップ ファイルをダウンロードするための一時ディレクトリを指定します。storage
: Amazon S3 ストレージまたは NFS ディレクトリのいずれかで、TiCDC 増分データ バックアップ ファイルを保存するためのアドレスを指定します。sink-uri
: データを復元するセカンダリ クラスタ アドレスを指定します。スキームはmysql
のみです。