TiCDC のアーキテクチャと原則
TiCDCアーキテクチャ
複数の TiCDC ノードで構成される TiCDC クラスターは、分散型のステートレスアーキテクチャを使用します。 TiCDC とそのコンポーネントの設計は次のとおりです。
TiCDC コンポーネント
上の図では、TiCDC クラスターは、TiCDC インスタンスを実行する複数のノードで構成されています。各 TiCDC インスタンスはキャプチャ プロセスを実行します。 Capture プロセスの 1 つが所有者 Capture として選出され、ワークロードのスケジュール設定、DDL ステートメントのレプリケート、および管理タスクの実行を担当します。
各キャプチャ プロセスには、上流の TiDB 内のテーブルからデータを複製するための 1 つまたは複数のプロセッサ スレッドが含まれています。 TiCDC ではテーブルがデータ レプリケーションの最小単位であるため、プロセッサは複数のテーブル パイプラインで構成されます。
各パイプラインには、プーラー、ソーター、マウンター、シンクのコンポーネントが含まれています。
これらのコンポーネントは相互に直列に動作して、データのプル、データの並べ替え、データのロード、上流から下流へのデータの複製などのレプリケーション プロセスを完了します。コンポーネントは次のように説明されます。
- プラー: TiKV ノードから DDL と行の変更をプルします。
- ソーター: TiKV ノードから受信した変更をタイムスタンプの昇順に並べ替えます。
- マウンタ: 変更を、スキーマ情報に基づいて TiCDC シンクが処理できる形式に変換します。
- シンク: 変更をダウンストリーム システムにレプリケートします。
高可用性を実現するために、各 TiCDC クラスターは複数の TiCDC ノードを実行します。これらのノードは定期的にステータスを PD の etcd クラスターに報告し、ノードの 1 つを TiCDC クラスターの所有者として選択します。オーナーノードは、etcd に保存されているステータスに基づいてデータをスケジュールし、スケジュール結果を etcd に書き込みます。プロセッサは、etcd のステータスに従ってタスクを完了します。プロセッサを実行しているノードに障害が発生した場合、クラスタはテーブルを他のノードにスケジュールします。所有者ノードに障害が発生した場合、他のノードのキャプチャ プロセスが新しい所有者を選出します。次の図を参照してください。
変更フィードとタスク
TiCDC におけるチェンジフィードとタスクは 2 つの論理概念です。具体的な説明は以下の通りです。
- Changefeed: レプリケーション タスクを表します。これには、複製されるテーブルとダウンストリームに関する情報が含まれます。
- タスク: TiCDC はレプリケーション タスクを受信すると、このタスクをいくつかのサブタスクに分割します。このようなサブタスクはタスクと呼ばれます。これらのタスクは、処理のために TiCDC ノードのキャプチャ プロセスに割り当てられます。
例えば:
cdc cli changefeed create --server="http://127.0.0.1:8300" --sink-uri="kafka://127.0.0.1:9092/cdc-test?kafka-version=2.4.0&partition-num=6&max-message-bytes=67108864&replication-factor=1"
cat changefeed.toml
......
[sink]
dispatchers = [
{matcher = ['test1.tab1', 'test2.tab2'], topic = "{schema}_{table}"},
{matcher = ['test3.tab3', 'test4.tab4'], topic = "{schema}_{table}"},
]
前述のcdc cli changefeed create
コマンドのパラメーターの詳細については、 TiCDC Changefeedコンフィグレーションパラメータを参照してください。
前述のcdc cli changefeed create
コマンドは、 test1.tab1
、 test1.tab2
、 test3.tab3
、およびtest4.tab4
を Kafka クラスターにレプリケートする変更フィード タスクを作成します。 TiCDC がこのコマンドを受信した後の処理フローは次のとおりです。
- TiCDC は、このタスクを所有者のキャプチャ プロセスに送信します。
- 所有者のキャプチャ プロセスは、この変更フィード タスクに関する情報を PD の etcd に保存します。
- 所有者のキャプチャ プロセスは、チェンジフィード タスクを複数のタスクに分割し、完了するタスクを他のキャプチャ プロセスに通知します。
- キャプチャ プロセスは、TiKV ノードからのデータの取得を開始し、データを処理して、レプリケーションを完了します。
以下は、Changefeed と Task が含まれる TiCDCアーキテクチャ図です。
上の図では、4 つのテーブルをダウンストリームにレプリケートするために変更フィードが作成されます。この変更フィードは 3 つのタスクに分割され、TiCDC クラスター内の 3 つのキャプチャ プロセスにそれぞれ送信されます。 TiCDC がデータを処理した後、データはダウンストリーム システムに複製されます。
TiCDC は、MySQL、TiDB、および Kafka データベースへのデータのレプリケーションをサポートしています。上の図は、チェンジフィード レベルでのデータ転送のプロセスのみを示しています。次のセクションでは、テーブルtable1
を複製する Task1 を例として、TiCDC がデータを処理する方法について詳しく説明します。
- データのプッシュ: データ変更が発生すると、TiKV はデータを Puller モジュールにプッシュします。
- 増分データのスキャン: Puller モジュールは、受信したデータ変更が連続していないと判断した場合、TiKV からデータをプルします。
- データのソート: ソーター モジュールは、TiKV から受信したデータをタイムスタンプに基づいてソートし、ソートされたデータをマウンター モジュールに送信します。
- データのマウント: データ変更を受信した後、マウンター モジュールは TiCDC シンクが理解できる形式でデータをロードします。
- データのレプリケート: シンク モジュールは、データの変更をダウンストリームにレプリケートします。
TiCDC の上流には、トランザクションをサポートする分散リレーショナル データベース TiDB があります。 TiCDC がデータをレプリケートする場合、複数のテーブルをレプリケートするときにデータとトランザクションの一貫性を確保する必要がありますが、これは大きな課題です。次のセクションでは、この課題に対処するために TiCDC が使用する主要なテクノロジーと概念を紹介します。
TiCDC の主要な概念
ダウンストリームのリレーショナル データベースの場合、TiCDC は単一テーブル内のトランザクションの一貫性と、複数のテーブル内の最終的なトランザクションの一貫性を保証します。さらに、TiCDC は、アップストリームの TiDB クラスターで発生したデータ変更を少なくとも 1 回はダウンストリームに複製できることを保証します。
アーキテクチャ関連の概念
- キャプチャ: TiCDC ノードを実行するプロセス。複数のキャプチャ プロセスが TiCDC クラスターを構成します。各キャプチャ プロセスは、データ変更の受信とアクティブなプル、ダウンストリームへのデータの複製など、TiKV でのデータ変更の複製を担当します。
- Capture 所有者: 複数の Capture プロセス間の Capture 所有者。 TiCDC クラスターには一度に 1 つの所有者ロールのみが存在します。キャプチャ オーナーは、クラスター内のデータのスケジュールを担当します。
- プロセッサ: Capture 内の論理スレッド。各プロセッサは、同じレプリケーション ストリーム内の 1 つ以上のテーブルのデータを処理します。キャプチャ ノードは複数のプロセッサを実行できます。
- Changefeed: 上流の TiDB クラスターから下流のシステムにデータを複製するタスク。変更フィードには複数のタスクが含まれており、各タスクはキャプチャ ノードによって処理されます。
タイムスタンプ関連の概念
TiCDC では、データ レプリケーションのステータスを示す一連のタイムスタンプ (TS) が導入されています。これらのタイムスタンプは、データが少なくとも 1 回はダウンストリームにレプリケートされ、データの一貫性が保証されることを保証するために使用されます。
解決済みTS
このタイムスタンプは TiKV と TiCDC の両方に存在します。
TiKV の ResolvedTS:リージョンリーダーの最も早いトランザクションの開始時刻を表します。つまり、
ResolvedTS
= max(ResolvedTS
, min(StartTS
))。 TiDB クラスターには複数の TiKV ノードが含まれるため、すべての TiKV ノード上のリージョンリーダーの最小 ResolvedTS はグローバル ResolvedTS と呼ばれます。 TiDB クラスターは、グローバル ResolvedTS の前のすべてのトランザクションがコミットされることを保証します。あるいは、このタイムスタンプより前にコミットされていないトランザクションがないと仮定することもできます。TiCDC の解決済みTS:
- table ResolvedTS: 各テーブルにはテーブルレベルの ResolvedTS があり、Resolved TS より小さいテーブル内のすべてのデータ変更が受信されたことを示します。簡単に説明すると、このタイムスタンプは、TiKV ノード上のこのテーブルに対応するすべてのリージョンの ResolvedTS の最小値と同じです。
- global ResolvedTS: すべての TiCDC ノード上のすべてのプロセッサの最小 ResolvedTS。各 TiCDC ノードには 1 つ以上のプロセッサがあるため、各プロセッサは複数のテーブル パイプラインに対応します。
TiCDC の場合、TiKV によって送信される ResolvedTS は、
<resolvedTS: timestamp>
の形式の特別なイベントです。一般に、ResolvedTS は次の制約を満たします。table ResolvedTS >= global ResolvedTS
チェックポイントTS
このタイムスタンプは TiCDC にのみ存在します。これは、このタイムスタンプより前に発生したデータ変更がダウンストリーム システムにレプリケートされたことを意味します。
- テーブル CheckpointTS: TiCDC はテーブル内のデータを複製するため、テーブル CheckpointTS は、CheckpointTS がテーブル レベルで複製される前に発生したすべてのデータ変更を示します。
- プロセッサー CheckpointTS: プロセッサー上の最小テーブル CheckpointTS を示します。
- global CheckpointTS: すべてのプロセッサの最小の CheckpointTS を示します。
一般に、チェックポイントTS は次の制約を満たします。
table CheckpointTS >= global CheckpointTS
TiCDC はグローバル ResolvedTS よりも小さいデータのみをダウンストリームにレプリケートするため、完全な制約は次のようになります。
table ResolvedTS >= global ResolvedTS >= table CheckpointTS >= global CheckpointTS
データが変更され、トランザクションがコミットされた後も、TiKV ノード上の ResolvedTS は続行し、TiCDC ノード上の Puller モジュールは TiKV によってプッシュされたデータを受信し続けます。また、Puller モジュールは、受信したデータ変更に基づいて増分データをスキャンするかどうかを決定します。これにより、すべてのデータ変更が TiCDC ノードに確実に送信されます。
Sorter モジュールは、Puller モジュールが受信したデータをタイムスタンプに従って昇順に並べ替えます。このプロセスにより、テーブル レベルでのデータの一貫性が保証されます。次に、マウンター モジュールは、アップストリームからのデータ変更をシンク モジュールが使用できる形式に組み立て、シンク モジュールに送信します。シンク モジュールは、CheckpointTS と ResolvedTS の間のデータ変更をタイムスタンプの順に下流に複製し、下流がデータ変更を受信した後にチェックポイント TS を進めます。
前のセクションでは、DML ステートメントのデータ変更のみを説明しており、DDL ステートメントは含まれていません。次のセクションでは、DDL ステートメントに関連するタイムスタンプを紹介します。
バリアTS
バリア TS は、DDL 変更イベントがある場合、または同期ポイントが使用される場合に生成されます。
- DDL 変更イベント: バリア TS は、DDL ステートメント前のすべての変更がダウンストリームにレプリケートされることを保証します。この DDL ステートメントが実行されてレプリケートされた後、TiCDC は他のデータ変更のレプリケーションを開始します。 DDL ステートメントはキャプチャ オーナーによって処理されるため、DDL ステートメントに対応するバリア TS はオーナー ノードによってのみ生成されます。
- 同期ポイント: TiCDC の同期ポイント機能を有効にすると、指定した
sync-point-interval
に従って TiCDC によってバリア TS が生成されます。このバリア TS が複製される前にすべてのテーブルが変更されると、TiCDC は現在のグローバル チェックポイント TS をプライマリ TS としてダウンストリームの tsMap を記録するテーブルに挿入します。その後、TiCDC はデータ レプリケーションを続行します。
バリア TS が生成された後、TiCDC は、このバリア TS が生成される前に発生したデータ変更のみがダウンストリームに複製されることを保証します。これらのデータ変更がダウンストリームにレプリケートされるまで、レプリケーション タスクは続行されません。オーナー TiCDC は、グローバル CheckpointTS と Barrier TS を継続的に比較することにより、すべてのターゲット データが複製されたかどうかをチェックします。グローバル CheckpointTS が Barrier TS と等しい場合、TiCDC は指定された操作 (DDL ステートメントの実行やグローバル CheckpointTS ダウンストリームの記録など) を実行した後、レプリケーションを続行します。それ以外の場合、TiCDC は、バリア TS がダウンストリームに複製される前に発生するすべてのデータ変更を待機します。
主な工程
このセクションでは、TiCDC の動作原理をより深く理解できるように、TiCDC の主要なプロセスについて説明します。
次のプロセスは TiCDC 内でのみ発生し、ユーザーには透過的であることに注意してください。したがって、どの TiCDC ノードを開始するかを気にする必要はありません。
TiCDC を開始する
所有者ではない TiCDC ノードの場合、次のように動作します。
- キャプチャプロセスを開始します。
- プロセッサを起動します。
- オーナーが実行したタスクスケジュールコマンドを受け取ります。
- スケジューリングコマンドに従って tablePipeline を開始または停止します。
所有者の TiCDC ノードの場合、次のように動作します。
- キャプチャプロセスを開始します。
- ノードが所有者として選出され、対応するスレッドが開始されます。
- チェンジフィード情報を読み取ります。
- チェンジフィード管理プロセスを開始します。
- 変更フィード構成と最新の CheckpointTS に従って TiKV 内のスキーマ情報を読み取り、レプリケートするテーブルを決定します。
- 各プロセッサーによって現在複製されているテーブルのリストを読み取り、追加するテーブルを配布します。
- レプリケーションの進行状況を更新します。
TiCDC を停止する
通常、TiCDC ノードをアップグレードする必要がある場合、または計画されたメンテナンス操作を実行する必要がある場合は、TiCDC ノードを停止します。 TiCDC ノードを停止するプロセスは次のとおりです。
- ノードは自身を停止するコマンドを受け取ります。
- ノードはサービス ステータスを利用不可に設定します。
- ノードは新しいレプリケーション タスクの受信を停止します。
- ノードは、データ複製タスクを他のノードに転送するように所有者ノードに通知します。
- レプリケーションタスクが他のノードに転送された後、ノードは停止します。