重要
このページは英語版のページを機械翻訳しています。原文はこちらからご覧ください。

TiCDCクラスターおよびレプリケーションタスクの管理

このドキュメントでは、TiUPを使用してTiCDCクラスタの構成を変更する方法、およびコマンドラインツールcdc cliを使用してTiCDCクラスタとレプリケーションタスクを管理する方法について説明します。

HTTPインターフェイス(TiCDC OpenAPI機能)を使用して、TiCDCクラスタおよびレプリケーションタスクを管理することもできます。詳細については、 TiCDC OpenAPIを参照してください。

TiUPを使用してTiCDCをアップグレードする

このセクションでは、TiUPを使用してTiCDCクラスタをアップグレードする方法を紹介します。次の例では、TiCDCとTiDBクラスタ全体をv6.1.0にアップグレードする必要があると想定しています。

tiup update --self && \
tiup update --all && \
tiup cluster upgrade <cluster-name> v6.1.0

アップグレードに関する注意事項

TiUPを使用してTiCDC構成を変更する

このセクションでは、TiUPのtiup cluster edit-configコマンドを使用してTiCDCクラスタの構成を変更する方法を紹介します。次の例では、 gc-ttlの値をデフォルトの86400から3600 、つまり1時間に変更します。

まず、次のコマンドを実行します。 <cluster-name>を実際のクラスタ名に置き換える必要があります。

tiup cluster edit-config <cluster-name>

次に、viエディターページに入り、 server-configsの下のcdc構成を変更します。構成を以下に示します。

 server_configs:
  tidb: {}
  tikv: {}
  pd: {}
  tiflash: {}
  tiflash-learner: {}
  pump: {}
  drainer: {}
  cdc:
    gc-ttl: 3600

変更後、 tiup cluster reload -R cdcコマンドを実行して構成を再ロードします。

TLSを使用する

暗号化データ送信(TLS)の使用の詳細については、 TiDBコンポーネント間のTLSを有効にするを参照してください。

cdc cliを使用して、クラスタステータスとデータレプリケーションタスクを管理します

このセクションでは、 cdc cliを使用してTiCDCクラスタおよびデータ複製タスクを管理する方法を紹介します。 cdc cliは、 cdcバイナリを使用して実行されるcliサブコマンドです。以下の説明は、次のことを前提としています。

  • cliコマンドはcdcバイナリを使用して直接実行されます。
  • PDは10.0.10.25でリッスンし、ポートは2379です。

ノート:

PDがリッスンするIPアドレスとポートは、 pd-serverの起動時に指定されたadvertise-client-urlsつのパラメーターに対応します。複数のpd-serverには複数のadvertise-client-urlsパラメーターがあり、1つまたは複数のパラメーターを指定できます。たとえば、 --pd=http://10.0.10.25:2379または--pd=http://10.0.10.25:2379,http://10.0.10.26:2379,http://10.0.10.27:2379

TiUPを使用してTiCDCを展開する場合は、次のコマンドのcdc clitiup ctl cdcに置き換えます。

TiCDCサービスの進捗状況を管理する( capture

  • captureのリストを照会します。

    cdc cli capture list --pd=http://10.0.10.25:2379
    
    [
      {
        "id": "806e3a1b-0e31-477f-9dd6-f3f2c570abdd",
        "is-owner": true,
        "address": "127.0.0.1:8300"
      },
      {
        "id": "ea2a4203-56fe-43a6-b442-7b295f458ebc",
        "is-owner": false,
        "address": "127.0.0.1:8301"
      }
    ]
    
    • id :サービスプロセスのID。
    • is-owner :サービスプロセスが所有者ノードであるかどうかを示します。
    • address :サービスプロセスが外部へのインターフェイスを提供するために使用するアドレス。

レプリケーションタスクの管理( changefeed

レプリケーションタスクの状態転送

レプリケーションタスクの状態は、レプリケーションタスクの実行ステータスを表します。 TiCDCの実行中に、レプリケーションタスクがエラーで失敗したり、手動で一時停止、再開したり、指定されたTargetTsに到達したりする場合があります。これらの動作は、レプリケーションタスクの状態の変化につながる可能性があります。このセクションでは、TiCDCレプリケーションタスクの状態と状態間の転送関係について説明します。

TiCDC state transfer

上記の状態転送図の状態は、次のように説明されています。

  • Normal :レプリケーションタスクは正常に実行され、チェックポイント-tsは正常に進行します。
  • Stopped :ユーザーが手動でチェンジフィードを一時停止したため、レプリケーションタスクが停止しました。この状態のチェンジフィードは、GC操作をブロックします。
  • Error :レプリケーションタスクがエラーを返します。一部の回復可能なエラーのため、レプリケーションを続行できません。この状態のチェンジフィードは、状態がNormalに移行するまで再開を試み続けます。この状態のチェンジフィードは、GC操作をブロックします。
  • Finished :レプリケーションタスクが終了し、プリセットTargetTsに到達しました。この状態のチェンジフィードは、GC操作をブロックしません。
  • Failed :レプリケーションタスクは失敗します。一部の回復不能なエラーが原因で、レプリケーションタスクを再開できず、回復できません。この状態のチェンジフィードは、GC操作をブロックしません。

上記の状態転送図の番号は次のとおりです。

  • changefeed pauseコマンドを実行します
  • changefeed resumeコマンドを実行してレプリケーションタスクを再開します
  • ③1 changefeedの操作で回復可能なエラーが発生し、自動的に操作を再開します。
  • changefeed resumeコマンドを実行してレプリケーションタスクを再開します
  • ⑤1 changefeedの操作で回復可能なエラーが発生する
  • TargetTs changefeed到達すると、レプリケーションが自動的に停止します。
  • changefeedgc-ttlで指定された期間より長く中断され、再開できません。
  • changefeedは、自動回復を実行しようとしたときに回復不能なエラーが発生しました。

レプリケーションタスクを作成する

次のコマンドを実行して、レプリケーションタスクを作成します。

cdc cli changefeed create --pd=http://10.0.10.25:2379 --sink-uri="mysql://root:123456@127.0.0.1:3306/" --changefeed-id="simple-replication-task" --sort-engine="unified"
Create changefeed successfully!
ID: simple-replication-task
Info: {"sink-uri":"mysql://root:123456@127.0.0.1:3306/","opts":{},"create-time":"2020-03-12T22:04:08.103600025+08:00","start-ts":415241823337054209,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":".","config":{"case-sensitive":true,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"ddl-allow-list":null},"mounter":{"worker-num":16},"sink":{"dispatchers":null},"scheduler":{"type":"table-number","polling-time":-1}},"state":"normal","history":null,"error":null}
  • --changefeed-id :レプリケーションタスクのID。形式は^[a-zA-Z0-9]+(\-[a-zA-Z0-9]+)*$の正規表現と一致する必要があります。このIDが指定されていない場合、TiCDCはIDとしてUUID(バージョン4形式)を自動的に生成します。

  • --sink-uri :レプリケーションタスクのダウンストリームアドレス。次の形式に従って--sink-uriを構成します。現在、このkafkamysql local pulsarしてs3 tidb

    [scheme]://[userinfo@][host]:[port][/path]?[query_parameters]
    

    URIに特殊文字が含まれている場合、URLエンコードを使用してこれらの特殊文字を処理する必要があります。

  • --start-tschangefeedの開始TSOを指定します。このTSOから、TiCDCクラスタはデータのプルを開始します。デフォルト値は現在の時刻です。

  • --target-tschangefeedの終了TSOを指定します。このTSOに対して、TiCDCクラスタはデータのプルを停止します。デフォルト値は空です。これは、TiCDCがデータのプルを自動的に停止しないことを意味します。

  • --sort-enginechangefeedのソートエンジンを指定します。 TiDBとTiKVは分散アーキテクチャを採用しているため、TiCDCはデータの変更をシンクに書き込む前にソートする必要があります。このオプションは、 unified (デフォルト)/ memory / fileをサポートします。

    • unifiedunifiedが使用されている場合、TiCDCはメモリ内のデータ並べ替えを優先します。メモリが不足している場合、TiCDCは自動的にディスクを使用して一時データを保存します。これはデフォルト値の--sort-engineです。
    • memory :メモリ内のデータ変更をソートします。大量のデータを複製するとOOMが簡単にトリガーされるため、この並べ替えエンジンの使用はお勧めしません
    • file :ディスクを完全に使用して一時データを保存します。この機能は非推奨ですいかなる状況でも使用することはお勧めしません
  • --configchangefeedの構成ファイルを指定します。

  • sort-dir :ソートエンジンが使用する一時ファイルディレクトリを指定します。 TiDB v4.0.13、v5.0.3、およびv5.1.0以降、このオプションはサポートされていないことに注意してください。もう使用しないでください

mysql / tidbを使用してシンクURIを構成します

構成例:

--sink-uri="mysql://root:123456@127.0.0.1:3306/?worker-count=16&max-txn-row=5000"

以下は、 mysql / tidbのシンクURI用に構成できるパラメーターとパラメーター値の説明です。

パラメータ/パラメータ値説明
rootダウンストリームデータベースのユーザー名
123456ダウンストリームデータベースのパスワード
127.0.0.1ダウンストリームデータベースのIPアドレス
3306ダウンストリームデータのポート
worker-countダウンストリームに対して同時に実行できるSQLステートメントの数(オプション、デフォルトでは16
max-txn-rowダウンストリームで実行できるトランザクションバッチのサイズ(オプション、デフォルトで256
ssl-caダウンストリームのMySQLインスタンスに接続するために必要なCA証明書ファイルのパス(オプション)
ssl-certダウンストリームのMySQLインスタンスに接続するために必要な証明書ファイルのパス(オプション)
ssl-keyダウンストリームのMySQLインスタンスに接続するために必要な証明書キーファイルのパス(オプション)
time-zoneダウンストリームのMySQLインスタンスに接続するときに使用されるタイムゾーン。v4.0.8以降で有効です。これはオプションのパラメーターです。このパラメーターが指定されていない場合、TiCDCサービスプロセスのタイムゾーンが使用されます。このパラメーターが空の値に設定されている場合、TiCDCがダウンストリームのMySQLインスタンスに接続するときにタイムゾーンが指定されず、ダウンストリームのデフォルトのタイムゾーンが使用されます。

kafkaでシンクURIを構成する

構成例:

--sink-uri="kafka://127.0.0.1:9092/topic-name?kafka-version=2.4.0&partition-num=6&max-message-bytes=67108864&replication-factor=1"

以下は、 kafkaを使用してシンクURIに構成できるパラメーターとパラメーター値の説明です。

パラメータ/パラメータ値説明
127.0.0.1ダウンストリームKafkaサービスのIPアドレス
9092下流のカフカの港
topic-name変数。カフカトピックの名前
kafka-versionダウンストリームKafkaのバージョン(オプション、デフォルトで2.4.0現在、サポートされている最も古いKafkaバージョンは0.11.0.2で、最新のバージョンは2.7.0です。この値はダウンストリームKafkaの実際のバージョンと一致している必要があります)
kafka-client-idレプリケーションタスクのKafkaクライアントIDを指定します(オプション。デフォルトではTiCDC_sarama_producer_replication ID )。
partition-numダウンストリームKafkaパーティションの数(オプション。値は実際のパーティション数以下である必要があります。それ以外の場合、レプリケーションタスクは正常に作成できません。デフォルトでは3 )。
max-message-bytes毎回Kafkaブローカーに送信されるデータの最大サイズ(オプション、デフォルトでは10MB )。 v5.0.6およびv4.0.6から、デフォルト値が64MBおよび256MBから10MBに変更されました。
replication-factor保存できるKafkaメッセージレプリカの数(オプション、デフォルトで1
protocolメッセージがKafkaに出力されるプロトコル。値のmaxwellは、 canal-jsonavro canal open-protocol
auto-create-topic渡されたtopic-nameがKafkaクラスタに存在しない場合にTiCDCがトピックを自動的に作成するかどうかを決定します(オプション、デフォルトでtrue
enable-tidb-extensionオプション。デフォルトではfalse 。出力プロトコルがcanal-jsonの場合、値がtrueの場合、TiCDCは解決済みイベントを送信し、TiDB拡張フィールドをKafkaメッセージに追加します。 v6.1.0以降、このパラメーターはavroプロトコルにも適用できます。値がtrueの場合、TiCDCは3つのTiDB拡張フィールドをKafkaメッセージに追加します。
max-batch-sizev4.0.9の新機能。メッセージプロトコルが1つのKafkaメッセージへの複数のデータ変更の出力をサポートしている場合、このパラメーターは1つのKafkaメッセージでのデータ変更の最大数を指定します。現在、Kafkaのprotocolopen-protocolの場合にのみ有効になります。 (オプション、デフォルトで16
enable-tlsTLSを使用してダウンストリームKafkaインスタンスに接続するかどうか(オプション、デフォルトでfalse
caダウンストリームのKafkaインスタンスに接続するために必要なCA証明書ファイルのパス(オプション)
certダウンストリームのKafkaインスタンスに接続するために必要な証明書ファイルのパス(オプション)
keyダウンストリームのKafkaインスタンスに接続するために必要な証明書キーファイルのパス(オプション)
sasl-userダウンストリームのKafkaインスタンスに接続するために必要なSASL/PLAINまたはSASL/SCRAM認証のID(authcid)(オプション)
sasl-passwordダウンストリームのKafkaインスタンスに接続するために必要なSASL/PLAINまたはSASL/SCRAM認証のパスワード(オプション)
sasl-mechanismダウンストリームのKafkaインスタンスに接続するために必要なSASL認証の名前。 scram-sha-256は、 plain 、またはscram-sha-512にすることができgssapi
sasl-gssapi-auth-typegssapi認証タイプ。値はuserまたはkeytabにすることができます(オプション)
sasl-gssapi-keytab-pathgssapiキータブパス(オプション)
sasl-gssapi-kerberos-config-pathgssapi kerberos構成パス(オプション)
sasl-gssapi-service-namegssapiサービス名(オプション)
sasl-gssapi-usergssapi認証のユーザー名(オプション)
sasl-gssapi-passwordgssapi認証のパスワード(オプション)
sasl-gssapi-realmgssapiレルム名(オプション)
sasl-gssapi-disable-pafxfastgssapi PA-FX-FASTを無効にするかどうか(オプション)
dial-timeoutダウンストリームKafkaとの接続を確立する際のタイムアウト。デフォルト値は10sです
read-timeoutダウンストリームのKafkaから返される応答を取得する際のタイムアウト。デフォルト値は10sです
write-timeoutダウンストリームKafkaにリクエストを送信する際のタイムアウト。デフォルト値は10sです
avro-decimal-handling-modeavroのプロトコルでのみ有効です。 AvroがDECIMALフィールドを処理する方法を決定します。値はstringまたはpreciseで、DECIMALフィールドを文字列または正確な浮動小数点数にマッピングすることを示します。
avro-bigint-unsigned-handling-modeavroのプロトコルでのみ有効です。 AvroがBIGINTUNSIGNEDフィールドを処理する方法を決定します。値はstringまたはlongで、BIGINTUNSIGNEDフィールドを64ビットの符号付き数値または文字列にマッピングすることを示します。

ベストプラクティス:

  • 独自のKafkaトピックを作成することをお勧めします。少なくとも、トピックがKafkaブローカーに送信できる各メッセージのデータの最大量と、ダウンストリームのKafkaパーティションの数を設定する必要があります。チェンジフィードを作成する場合、これら2つの設定はそれぞれmax-message-bytespartition-numに対応します。
  • まだ存在しないトピックを使用してチェンジフィードを作成する場合、TiCDCはpartition-numおよびreplication-factorパラメーターを使用してトピックを作成しようとします。これらのパラメーターを明示的に指定することをお勧めします。
  • ほとんどの場合、 canal-jsonプロトコルを使用することをお勧めします。

ノート:

protocolopen-protocolの場合、TiCDCは長さがmax-message-bytesを超えるメッセージの生成を回避しようとします。ただし、行が大きすぎて1回の変更だけで長さがmax-message-bytesを超える場合、サイレント障害を回避するために、TiCDCはこのメッセージを出力しようとし、ログに警告を出力します。

TiCDCはKafkaの認証と承認を使用します

以下は、KafkaSASL認証を使用する場合の例です。

  • SASL / PLAIN

    --sink-uri="kafka://127.0.0.1:9092/topic-name?kafka-version=2.4.0&sasl-user=alice-user&sasl-password=alice-secret&sasl-mechanism=plain"
    
  • SASL / SCRAM

    SCRAM-SHA-256およびSCRAM-SHA-512は、PLAINメソッドに似ています。対応する認証方法としてsasl-mechanismを指定する必要があります。

  • SASL / GSSAPI

    SASL / GSSAPI user認証:

    --sink-uri="kafka://127.0.0.1:9092/topic-name?kafka-version=2.4.0&sasl-mechanism=gssapi&sasl-gssapi-auth-type=user&sasl-gssapi-kerberos-config-path=/etc/krb5.conf&sasl-gssapi-service-name=kafka&sasl-gssapi-user=alice/for-kafka&sasl-gssapi-password=alice-secret&sasl-gssapi-realm=example.com"
    

    sasl-gssapi-usersasl-gssapi-realmの値は、Kerberosで指定された原理に関連しています。たとえば、原則がalice/for-kafka@example.comに設定されている場合、 sasl-gssapi-usersasl-gssapi-realmはそれぞれalice/for-kafkaexample.comとして指定されます。

    SASL / GSSAPI keytab認証:

    --sink-uri="kafka://127.0.0.1:9092/topic-name?kafka-version=2.4.0&sasl-mechanism=gssapi&sasl-gssapi-auth-type=keytab&sasl-gssapi-kerberos-config-path=/etc/krb5.conf&sasl-gssapi-service-name=kafka&sasl-gssapi-user=alice/for-kafka&sasl-gssapi-keytab-path=/var/lib/secret/alice.key&sasl-gssapi-realm=example.com"
    

    SASL / GSSAPI認証方法の詳細については、 GSSAPIの構成を参照してください。

  • TLS/SSL暗号化

    KafkaブローカーでTLS/SSL暗号化が有効になっている場合は、 -enable-tls=trueパラメーターを--sink-uriに追加する必要があります。自己署名証明書を使用する場合は、 cert caおよびkeyも指定する必要があり--sink-uri

  • ACL認証

    TiCDCが正しく機能するために必要な最小限の権限セットは次のとおりです。

    • トピックリソースタイプCreateおよびWriteの権限。
    • クラスターリソースタイプのDescribeConfigsのアクセス許可。

TiCDCをKafkaConnect(Confluent Platform)と統合する

Confluentが提供するデータコネクタを使用してデータをリレーショナルデータベースまたは非リレーショナルデータベースにストリーミングするには、 avroプロトコルを使用し、 schema-registryのURLを指定する必要がありコンフルエントなスキーマレジストリ

構成例:

--sink-uri="kafka://127.0.0.1:9092/topic-name?&protocol=avro&replication-factor=3" --schema-registry="http://127.0.0.1:8081" --config changefeed_config.toml
[sink]
dispatchers = [
 {matcher = ['*.*'], topic = "tidb_{schema}_{table}"},
]

詳細な統合ガイドについては、 TiDBとConfluentプラットフォームの統合に関するクイックスタートガイドを参照してください。

pulsarでシンクURIを構成する

警告

これはまだ実験的機能です。実稼働環境では使用しないでください。

構成例:

--sink-uri="pulsar://127.0.0.1:6650/topic-name?connectionTimeout=2s"

以下は、 pulsarを使用してシンクURIに構成できるパラメーターの説明です。

パラメータ説明
connectionTimeoutダウンストリームパルサーへの接続を確立するためのタイムアウト。これはオプションで、デフォルトは30(秒)です。
operationTimeoutダウンストリームパルサーで操作を実行するためのタイムアウト。これはオプションで、デフォルトは30(秒)です。
tlsTrustCertsFilePathダウンストリームのPulsarインスタンスに接続するために必要なCA証明書ファイルのパス(オプション)
tlsAllowInsecureConnectionTLSを有効にした後で暗号化されていない接続を許可するかどうかを決定します(オプション)
tlsValidateHostnameダウンストリームパルサーからの証明書のホスト名を確認するかどうかを決定します(オプション)
maxConnectionsPerBroker単一のダウンストリームPulsarブローカーに許可される接続の最大数。これはオプションであり、デフォルトは1です。
auth.tlsTLSモードを使用して、ダウンストリームのパルサーを検証します(オプション)。たとえば、 auth=tls&auth.tlsCertFile=/path/to/cert&auth.tlsKeyFile=/path/to/key
auth.tokenトークンモードを使用して、ダウンストリームのパルサーを検証します(オプション)。たとえば、 auth=token&auth.token=secret-tokenまたはauth=token&auth.file=path/to/secret-token-file
nameTiCDCのパルサープロデューサーの名前(オプション)
protocolメッセージがパルサーに出力されるプロトコル。値のmaxwellは、 canal-jsonavro canal open-protocol
maxPendingMessages保留中のメッセージキューの最大サイズを設定します。これはオプションで、デフォルトは1000です。たとえば、Pulsarからの確認メッセージの保留中です。
disableBatchingメッセージをバッチで自動的に送信することを無効にします(オプション)
batchingMaxPublishDelay送信されたメッセージがバッチ処理される期間を設定します(デフォルト:10ms)
compressionTypeメッセージの送信に使用される圧縮アルゴリズムを設定します(オプション)。値のオプションは、 NONEZSTD ZLIB LZ4 (デフォルトではNONE
hashingSchemeメッセージの送信先のパーティションを選択するために使用されるハッシュアルゴリズム(オプション)。値のオプションはJavaStringHash (デフォルト)とMurmur3です。
properties.*TiCDCのパルサープロデューサーに追加されたカスタマイズされたプロパティ(オプション)。たとえば、 properties.location=Hangzhou

パルサーのその他のパラメーターについては、 pulsar-client-go ClientOptionspulsar-client-go ProducerOptionsを参照してください。

タスク構成ファイルを使用する

その他のレプリケーション構成(たとえば、単一のテーブルのレプリケーションを指定する)については、 タスク構成ファイルを参照してください。

構成ファイルを使用して、次の方法でレプリケーションタスクを作成できます。

cdc cli changefeed create --pd=http://10.0.10.25:2379 --sink-uri="mysql://root:123456@127.0.0.1:3306/" --config changefeed.toml

上記のコマンドで、 changefeed.tomlはレプリケーションタスクの構成ファイルです。

レプリケーションタスクリストをクエリする

次のコマンドを実行して、レプリケーションタスクリストを照会します。

cdc cli changefeed list --pd=http://10.0.10.25:2379
[{
    "id": "simple-replication-task",
    "summary": {
      "state": "normal",
      "tso": 417886179132964865,
      "checkpoint": "2020-07-07 16:07:44.881",
      "error": null
    }
}]
  • checkpointは、この時点より前にTiCDCがすでにデータをダウンストリームに複製していることを示します。
  • stateは、レプリケーションタスクの状態を示します。
    • normal :レプリケーションタスクは正常に実行されます。
    • stopped :レプリケーションタスクが停止します(手動で一時停止します)。
    • error :レプリケーションタスクは(エラーによって)停止されます。
    • removed :レプリケーションタスクが削除されます。この状態のタスクは、 --allオプションを指定した場合にのみ表示されます。このオプションが指定されていないときにこれらのタスクを表示するには、 changefeed queryコマンドを実行します。
    • finished :レプリケーションタスクが終了しました(データはtarget-tsにレプリケートされます)。この状態のタスクは、 --allオプションを指定した場合にのみ表示されます。このオプションが指定されていないときにこれらのタスクを表示するには、 changefeed queryコマンドを実行します。

特定のレプリケーションタスクをクエリする

特定のレプリケーションタスクをクエリするには、 changefeed queryコマンドを実行します。クエリ結果には、タスク情報とタスク状態が含まれます。 --simpleまたは-s引数を指定して、基本的なレプリケーション状態とチェックポイント情報のみを含むクエリ結果を簡略化できます。この引数を指定しない場合、詳細なタスク構成、レプリケーション状態、およびレプリケーションテーブル情報が出力されます。

cdc cli changefeed query -s --pd=http://10.0.10.25:2379 --changefeed-id=simple-replication-task
{
 "state": "normal",
 "tso": 419035700154597378,
 "checkpoint": "2020-08-27 10:12:19.579",
 "error": null
}

上記のコマンドと結果では、次のようになります。

  • stateは、現在のchangefeedの複製状態です。各状態は、 changefeed listの状態と一致している必要があります。
  • tsoは、ダウンストリームに正常に複製された、現在のchangefeedで最大のトランザクションTSOを表します。
  • checkpointは、ダウンストリームに正常に複製された、現在のchangefeedの最大のトランザクションTSOの対応する時間を表します。
  • errorは、現在のchangefeedでエラーが発生したかどうかを記録します。
cdc cli changefeed query --pd=http://10.0.10.25:2379 --changefeed-id=simple-replication-task
{
  "info": {
    "sink-uri": "mysql://127.0.0.1:3306/?max-txn-row=20\u0026worker-number=4",
    "opts": {},
    "create-time": "2020-08-27T10:33:41.687983832+08:00",
    "start-ts": 419036036249681921,
    "target-ts": 0,
    "admin-job-type": 0,
    "sort-engine": "unified",
    "sort-dir": ".",
    "config": {
      "case-sensitive": true,
      "enable-old-value": false,
      "filter": {
        "rules": [
          "*.*"
        ],
        "ignore-txn-start-ts": null,
        "ddl-allow-list": null
      },
      "mounter": {
        "worker-num": 16
      },
      "sink": {
        "dispatchers": null,
      },
      "scheduler": {
        "type": "table-number",
        "polling-time": -1
      }
    },
    "state": "normal",
    "history": null,
    "error": null
  },
  "status": {
    "resolved-ts": 419036036249681921,
    "checkpoint-ts": 419036036249681921,
    "admin-job-type": 0
  },
  "count": 0,
  "task-status": [
    {
      "capture-id": "97173367-75dc-490c-ae2d-4e990f90da0f",
      "status": {
        "tables": {
          "47": {
            "start-ts": 419036036249681921
          }
        },
        "operation": null,
        "admin-job-type": 0
      }
    }
  ]
}

上記のコマンドと結果では、次のようになります。

  • infoは、照会されたchangefeedの複製構成です。
  • statusは、照会されたchangefeedの複製状態です。
    • resolved-ts :現在のchangefeedで最大のトランザクションTS 。このTSはTiKVからTiCDCに正常に送信されていることに注意してください。
    • checkpoint-ts :現在のchangefeedで最大のトランザクションTS 。このTSはダウンストリームに正常に書き込まれていることに注意してください。
    • admin-job-typechangefeedのステータス:
      • 0 :状態は正常です。
      • 1 :タスクは一時停止されています。タスクが一時停止されると、複製されたすべてのprocessorが終了します。タスクの構成とレプリケーションステータスは保持されるため、 checkpiont-tsからタスクを再開できます。
      • 2 :タスクが再開されます。レプリケーションタスクはcheckpoint-tsから再開します。
      • 3 :タスクは削除されます。タスクが削除されると、複製されたprocessorがすべて終了し、複製タスクの構成情報がクリアされます。レプリケーションステータスのみが、後のクエリのために保持されます。
  • task-statusは、クエリされたchangefeedの各レプリケーションサブタスクの状態を示します。

レプリケーションタスクを一時停止します

次のコマンドを実行して、レプリケーションタスクを一時停止します。

cdc cli changefeed pause --pd=http://10.0.10.25:2379 --changefeed-id simple-replication-task

上記のコマンドでは:

  • --changefeed-id=uuidは、一時停止するレプリケーションタスクに対応するchangefeedのIDを表します。

レプリケーションタスクを再開します

次のコマンドを実行して、一時停止したレプリケーションタスクを再開します。

cdc cli changefeed resume --pd=http://10.0.10.25:2379 --changefeed-id simple-replication-task

上記のコマンドでは:

  • --changefeed-id=uuidは、再開するレプリケーションタスクに対応するchangefeedのIDを表します。

レプリケーションタスクを削除する

次のコマンドを実行して、レプリケーションタスクを削除します。

cdc cli changefeed remove --pd=http://10.0.10.25:2379 --changefeed-id simple-replication-task

上記のコマンドでは:

  • --changefeed-id=uuidは、削除するレプリケーションタスクに対応するchangefeedのIDを表します。

タスク構成の更新

v4.0.4以降、TiCDCはレプリケーションタスクの構成の変更をサポートします(動的ではありません)。 changefeedの構成を変更するには、タスクを一時停止し、構成を変更してから、タスクを再開します。

cdc cli changefeed pause -c test-cf --pd=http://10.0.10.25:2379
cdc cli changefeed update -c test-cf --pd=http://10.0.10.25:2379 --sink-uri="mysql://127.0.0.1:3306/?max-txn-row=20&worker-number=8" --config=changefeed.toml
cdc cli changefeed resume -c test-cf --pd=http://10.0.10.25:2379

現在、次の構成アイテムを変更できます。

  • changefeedsink-uri
  • changefeedの構成ファイルとファイル内のすべての構成アイテム。
  • ファイルの並べ替え機能と並べ替えディレクトリを使用するかどうか。
  • changefeedのうちのtarget-tsつ。

レプリケーションサブタスクの処理ユニットを管理する( processor

  • processorのリストを照会します。

    cdc cli processor list --pd=http://10.0.10.25:2379
    
    [
            {
                    "id": "9f84ff74-abf9-407f-a6e2-56aa35b33888",
                    "capture-id": "b293999a-4168-4988-a4f4-35d9589b226b",
                    "changefeed-id": "simple-replication-task"
            }
    ]
    
  • 特定のレプリケーションタスクのステータスに対応する特定のchangefeedをクエリします。

    cdc cli processor query --pd=http://10.0.10.25:2379 --changefeed-id=simple-replication-task --capture-id=b293999a-4168-4988-a4f4-35d9589b226b
    
    {
      "status": {
        "tables": {
          "56": {    # ID of the replication table, corresponding to tidb_table_id of a table in TiDB
            "start-ts": 417474117955485702
          }
        },
        "operation": null,
        "admin-job-type": 0
      },
      "position": {
        "checkpoint-ts": 417474143881789441,
        "resolved-ts": 417474143881789441,
        "count": 0
      }
    }
    

    上記のコマンドでは:

    • status.tables :各キー番号はレプリケーションテーブルのIDを表し、TiDBのテーブルのtidb_table_idに対応します。
    • resolved-ts :現在のプロセッサでソートされたデータの中で最大のTSO。
    • checkpoint-ts :現在のプロセッサのダウンストリームに正常に書き込まれた最大のTSO。

タスク構成ファイル

このセクションでは、レプリケーションタスクの構成を紹介します。

# Specifies whether the database names and tables in the configuration file are case-sensitive.
# The default value is true.
# This configuration item affects configurations related to filter and sink.
case-sensitive = true

# Specifies whether to output the old value. New in v4.0.5. Since v5.0, the default value is `true`.
enable-old-value = true

[filter]
# Ignores the transaction of specified start_ts.
ignore-txn-start-ts = [1, 2]

# Filter rules.
# Filter syntax: https://docs.pingcap.com/tidb/stable/table-filter#syntax.
rules = ['*.*', '!test.*']

[mounter]
# mounter thread counts, which is used to decode the TiKV output data.
worker-num = 16

[sink]
# For the sink of MQ type, you can use dispatchers to configure the event dispatcher.
# Since v6.1, TiDB supports two types of event dispatchers: partition and topic. For more information, see the following section.
# The matching syntax of matcher is the same as the filter rule syntax. For details about the matcher rules, see the following section.

dispatchers = [
    {matcher = ['test1.*', 'test2.*'], topic = "Topic expression 1", partition = "ts" },
    {matcher = ['test3.*', 'test4.*'], topic = "Topic expression 2", partition = "index-value" },
    {matcher = ['test1.*', 'test5.*'], topic = "Topic expression 3", partition = "table"},
    {matcher = ['test6.*'], partition = "ts"}
]
# For the sink of MQ type, you can specify the protocol format of the message.
# Currently the following protocols are supported: canal-json, open-protocol, canal, avro, and maxwell.
protocol = "canal-json"

互換性に関する注意事項

  • TiCDC v4.0.0では、 ignore-txn-commit-tsが削除され、 ignore-txn-start-tsが追加されます。これは、start_tsを使用してトランザクションをフィルタリングします。
  • TiCDC v4.0.2では、 db-dbs / db-tables / ignore-dbs / ignore-tablesが削除され、 rulesが追加されました。これは、データベースとテーブルに新しいフィルタールールを使用します。フィルタ構文の詳細については、 テーブルフィルターを参照してください。

KafkaSinkのトピックおよびパーティションディスパッチャのルールをカスタマイズします

マッチャールール

前のセクションの例では:

  • マッチャールールに一致するテーブルの場合、対応するトピック式で指定されたポリシーに従ってディスパッチされます。たとえば、 test3.aaテーブルは「トピック式2」に従ってディスパッチされます。 test5.aaテーブルは「トピック式3」に従ってディスパッチされます。
  • 複数のマッチャールールに一致するテーブルの場合、最初に一致するトピック式に従ってディスパッチされます。たとえば、 test1.aaテーブルは「トピック式1」に従って配布されます。
  • どのマッチャールールにも一致しないテーブルの場合、対応するデータ変更イベントは、 --sink-uriで指定されたデフォルトのトピックに送信されます。たとえば、 test10.aaテーブルはデフォルトのトピックに送信されます。
  • マッチャールールに一致するがトピックディスパッチャーを指定しないテーブルの場合、対応するデータ変更は--sink-uriで指定されたデフォルトトピックに送信されます。たとえば、 test6.aaテーブルはデフォルトのトピックに送信されます。

トピックディスパッチャ

topic = "xxx"を使用してトピックディスパッチャーを指定し、トピック式を使用して柔軟なトピックディスパッチポリシーを実装できます。トピックの総数は1000未満にすることをお勧めします。

トピック式の形式は[prefix]{schema}[middle][{table}][suffix]です。

  • prefix :オプション。トピック名のプレフィックスを示します。
  • {schema} :必須。スキーマ名を照合するために使用されます。
  • middle :オプション。スキーマ名とテーブル名の間の区切り文字を示します。
  • {table} :オプション。テーブル名と一致させるために使用されます。
  • suffix :オプション。トピック名のサフィックスを示します。

prefix 、およびsuffixには、 middle0-9 A-Z_のみを.ことが- a-z{schema}{table}は両方とも小文字です。 {Schema}{TABLE}などのプレースホルダーは無効です。

いくつかの例:

  • matcher = ['test1.table1', 'test2.table2'], topic = "hello_{schema}_{table}"
    • test1.table1に対応するデータ変更イベントは、 hello_test1_table1という名前のトピックに送信されます。
    • test2.table2に対応するデータ変更イベントは、 hello_test2_table2という名前のトピックに送信されます。
  • matcher = ['test3.*', 'test4.*'], topic = "hello_{schema}_world"
    • test3のすべてのテーブルに対応するデータ変更イベントは、 hello_test3_worldという名前のトピックに送信されます。
    • test4のすべてのテーブルに対応するデータ変更イベントは、 hello_test4_worldという名前のトピックに送信されます。
  • matcher = ['*.*'], topic = "{schema}_{table}"
    • TiCDCによってリッスンされるすべてのテーブルは、「schema_table」ルールに従って個別のトピックにディスパッチされます。たとえば、 test.accountテーブルの場合、TiCDCはデータ変更ログをtest_accountという名前のトピックにディスパッチします。

DDLイベントをディスパッチします

スキーマレベルのDDL

特定のテーブルに関連しないDDLは、 create databasedrop databaseなどのスキーマレベルのDDLと呼ばれます。スキーマレベルのDDLに対応するイベントは、 --sink-uriで指定されたデフォルトのトピックに送信されます。

テーブルレベルのDDL

特定のテーブルに関連するDDLは、 alter tablecreate tableなどのテーブルレベルのDDLと呼ばれます。テーブルレベルのDDLに対応するイベントは、ディスパッチャの構成に従って対応するトピックに送信されます。

たとえば、 matcher = ['test.*'], topic = {schema}_{table}のようなディスパッチャの場合、DDLイベントは次のようにディスパッチャされます。

  • 単一のテーブルがDDLイベントに関係している場合、DDLイベントは対応するトピックにそのまま送信されます。たとえば、DDLイベントdrop table test.table1の場合、イベントはtest_table1という名前のトピックに送信されます。
  • 複数のテーブルがDDLイベントに関与している場合( rename tableは複数のテーブルに関与している可能性がありdrop table )、DDLイベントは複数のイベントに分割され、対応するトピックに送信されdrop view 。たとえば、DDLイベントrename table test.table1 to test.table10, test.table2 to test.table20の場合、イベントrename table test.table1 to test.table10test_table1という名前のトピックに送信され、イベントrename table test.table2 to test.table20test.table2という名前のトピックに送信されます。

パーティションディスパッチャ

partition = "xxx"を使用して、パーティションディスパッチャを指定できます。 default、ts、index-value、tableの4つのディスパッチャーをサポートします。ディスパッチャのルールは次のとおりです。

  • デフォルト:複数の一意のインデックス(主キーを含む)が存在する場合、または古い値機能が有効になっている場合、イベントはテーブルモードでディスパッチされます。一意のインデックス(または主キー)が1つしかない場合、イベントはインデックス値モードでディスパッチされます。
  • ts:行変更のcommitTを使用して、イベントをハッシュおよびディスパッチします。
  • index-value:主キーの値またはテーブルの一意のインデックスを使用して、イベントをハッシュおよびディスパッチします。
  • table:テーブルのスキーマ名とテーブル名を使用して、イベントをハッシュおよびディスパッチします。

ノート:

v6.1以降、構成の意味を明確にするために、パーティションディスパッチャーを指定するために使用される構成がdispatcherからpartitionに変更され、 partitiondispatcherのエイリアスになりました。たとえば、次の2つのルールはまったく同じです。

[sink]
dispatchers = [
   {matcher = ['*.*'], dispatcher = "ts"},
   {matcher = ['*.*'], partition = "ts"},
]

ただし、 dispatcherpartitionを同じルールに含めることはできません。たとえば、次のルールは無効です。

{matcher = ['*.*'], dispatcher = "ts", partition = "table"},

行変更イベントの履歴値を出力するv4.0.5の新機能

デフォルトの構成では、レプリケーションタスクのTiCDC Open Protocol出力の行変更イベントには変更された値のみが含まれ、変更前の値は含まれません。したがって、出力値は、行変更イベントの履歴値としてTiCDCOpenProtocolのコンシューマー側で使用することはできません。

v4.0.5以降、TiCDCは行変更イベントの履歴値の出力をサポートします。この機能を有効にするには、ルートレベルのchangefeedの構成ファイルで次の構成を指定します。

enable-old-value = true

この機能は、v5.0以降デフォルトで有効になっています。この機能を有効にした後のTiCDCOpenProtocolの出力形式については、 TiCDCオープンプロトコル-行変更イベントを参照してください。

照合用の新しいフレームワークを有効にしてテーブルを複製する

v4.0.15、v5.0.4、v5.1.1、およびv5.2.0以降、TiCDCは照合のための新しいフレームワークを有効にしたテーブルをサポートします。

有効なインデックスなしでテーブルを複製する

v4.0.8以降、TiCDCは、タスク構成を変更することにより、有効なインデックスを持たないテーブルの複製をサポートします。この機能を有効にするには、 changefeedの構成ファイルで次のように構成します。

enable-old-value = true
force-replicate = true
警告

有効なインデックスのないテーブルの場合、 INSERTREPLACEなどの操作は再入可能ではないため、データが冗長になるリスクがあります。 TiCDCは、レプリケーションプロセス中にデータが少なくとも1回だけ配布されることを保証します。したがって、この機能を有効にして有効なインデックスなしでテーブルを複製できるようにすると、データの冗長性が確実に発生します。データの冗長性を受け入れない場合は、 AUTO RANDOM属性を持つ主キー列を追加するなど、効果的なインデックスを追加することをお勧めします。

ユニファイドソーター

統合ソーターは、TiCDCのソーティングエンジンです。次のシナリオによって引き起こされるOOMの問題を軽減できます。

  • TiCDCのデータ複製タスクは長時間一時停止されます。その間、大量の増分データが蓄積され、複製する必要があります。
  • データ複製タスクは早いタイムスタンプから開始されるため、大量の増分データを複製する必要があります。

v4.0.13以降にcdc cliを使用して作成されたチェンジフィードの場合、UnifiedSorterはデフォルトで有効になっています。 v4.0.13より前に存在していたチェンジフィードの場合、以前の構成が使用されます。

チェンジフィードでユニファイドソーター機能が有効になっているかどうかを確認するには、次のコマンド例を実行できます(PDインスタンスのIPアドレスがhttp://10.0.10.25:2379であると想定)。

cdc cli --pd="http://10.0.10.25:2379" changefeed query --changefeed-id=simple-replication-task | grep 'sort-engine'

上記のコマンドの出力で、値sort-engineが「unified」の場合、チェンジフィードでUnifiedSorterが有効になっていることを意味します。

ノート:

  • サーバーで、待ち時間が長いか帯域幅が制限されているメカニカルハードドライブやその他のストレージデバイスを使用している場合は、統合ソーターの使用に注意してください。
  • デフォルトでは、UnifiedSorterはdata_dirを使用して一時ファイルを保存します。空きディスク容量が500GiB以上であることを確認することをお勧めします。実稼働環境では、各ノードの空きディスク容量が(ビジネスで許可される最大checkpoint-tsの遅延)*(ビジネスのピーク時のアップストリーム書き込みトラフィック)よりも大きいことを確認することをお勧めします。さらに、 changefeedの作成後に大量の履歴データを複製する場合は、各ノードの空き領域が複製されたデータの量よりも大きいことを確認してください。
  • 統合ソーターはデフォルトで有効になっています。サーバーが上記の要件に一致せず、統合ソーターを無効にする場合は、チェンジフィードにsort-engineからmemoryを手動で設定する必要があります。
  • memoryを使用してソートする既存のチェンジフィードでUnifiedSorterを有効にするには、 タスクの中断後にTiCDCが再起動された後に発生するOOMを処理するにはどうすればよいですか?で提供されているメソッドを参照してください。

災害シナリオでの結果整合性レプリケーション

警告

現在、災害シナリオで結果整合性のあるレプリケーションを使用することはお勧めしません。詳細については、 重大なバグ#6189を参照してください。

v5.3.0以降、TiCDCは、アップストリームTiDBクラスタからS3ストレージまたはダウンストリームクラスタのNFSファイルシステムへの増分データのバックアップをサポートします。アップストリームクラスタで災害が発生して使用できなくなった場合、TiCDCはダウンストリームデータを結果整合性のある最近の状態に復元できます。これは、TiCDCによって提供される結果整合性のあるレプリケーション機能です。この機能を使用すると、アプリケーションをダウンストリームクラスタにすばやく切り替えることができ、長時間のダウンタイムを回避し、サービスの継続性を向上させることができます。

現在、TiCDCは、増分データをTiDBクラスタから別のTiDBクラスタまたはMySQL互換データベースシステム( Aurora、MySQL、MariaDBを含む)に複製できます。アップストリームクラスタがクラッシュした場合、災害前のTiCDCのレプリケーションステータスは正常であり、レプリケーションラグが小さいという条件で、TiCDCは5分以内にダウンストリームクラスタのデータを復元できます。これにより、最大で10秒のデータ損失が可能になります。つまり、RTO <= 5分、P95 RPO<=10秒です。

TiCDCレプリケーションラグは、次のシナリオで増加します。

  • TPSは短時間で大幅に増加します
  • 大規模または長いトランザクションがアップストリームで発生します
  • アップストリームのTiKVまたはTiCDCクラスタがリロードまたはアップグレードされます
  • add indexなどの時間のかかるDDLステートメントはアップストリームで実行されます
  • PDは積極的なスケジューリング戦略で構成されているため、リージョンリーダーが頻繁に異動したり、リージョンのマージや分割が頻繁に行われたりしリージョン。

前提条件

  • TiCDCのリアルタイムインクリメンタルデータバックアップファイルを保存するための高可用性AmazonS3ストレージまたはNFSシステムを準備します。これらのファイルは、プライマリクラスタの障害が発生した場合にアクセスできます。
  • 災害シナリオで結果整合性を持たせる必要があるチェンジフィードに対して、この機能を有効にします。これを有効にするには、次の構成をチェンジフィード構成ファイルに追加します。
[consistent]
# Consistency level. Options include:
# - none: the default value. In a non-disaster scenario, eventual consistency is only guaranteed if and only if finished-ts is specified.
# - eventual: Uses redo log to guarantee eventual consistency in case of the primary cluster disasters.
level = "eventual"

# Individual redo log file size, in MiB. By default, it's 64. It is recommended to be no more than 128.
max-log-size = 64

# The interval for flushing or uploading redo logs to S3, in milliseconds. By default, it's 1000. The recommended range is 500-2000.
flush-interval = 1000

# Form of storing redo log, including nfs (NFS directory) and S3 (uploading to S3).
storage = "s3://logbucket/test-changefeed?endpoint=http://$S3_ENDPOINT/"

災害からの回復

プライマリクラスタで災害が発生した場合は、 cdc redoコマンドを実行してセカンダリクラスタで手動で回復する必要があります。復旧プロセスは以下のとおりです。

  1. すべてのTiCDCプロセスが終了したことを確認します。これは、データ回復中にプライマリクラスタがサービスを再開するのを防ぎ、TiCDCがデータ同期を再開するのを防ぐためです。
  2. データ回復にはcdcバイナリを使用します。次のコマンドを実行します。
cdc redo apply --tmp-dir="/tmp/cdc/redo/apply" \
    --storage="s3://logbucket/test-changefeed?endpoint=http://10.0.10.25:24927/" \
    --sink-uri="mysql://normal:123456@10.0.10.55:3306/"

このコマンドの場合:

  • tmp-dir :TiCDCインクリメンタルデータバックアップファイルをダウンロードするための一時ディレクトリを指定します。
  • storage :AmazonS3ストレージまたはNFSディレクトリのいずれかのTiCDCインクリメンタルデータバックアップファイルを保存するためのアドレスを指定します。
  • sink-uri :データを復元するセカンダリクラスタアドレスを指定します。スキームはmysqlのみです。
このページの内容