TiCDC オープンAPI v2
TiCDCは、TiCDCクラスターのクエリと操作のためのOpenAPI機能を提供します。OpenAPI機能は、 cdc cliツールのサブセットです。
注記:
TiCDC OpenAPI v1は将来削除される予定です。TiCDC OpenAPI v2のご利用をお勧めします。
API を使用して、TiCDC クラスターで次のメンテナンス操作を実行できます。
- TiCDCノードのステータス情報を取得する
- TiCDC クラスターのヘルスステータスを確認する
- レプリケーションタスクを作成する
- レプリケーションタスクを削除する
- レプリケーション構成を更新する
- レプリケーションタスクリストをクエリする
- 特定のレプリケーションタスクをクエリする
- レプリケーションタスクを一時停止する
- レプリケーションタスクを再開する
- レプリケーションサブタスクリストを照会する
- 特定のレプリケーションサブタスクをクエリする
- TiCDC サービス プロセス リストを照会する
- 所有者ノードの退去
- TiCDCサーバーのログレベルを動的に調整する
すべてのAPIのリクエストボディと戻り値はJSON形式です。リクエストが成功すると、 200 OKメッセージが返されます。以下のセクションでは、APIの具体的な使用方法について説明します。
以下の例では、TiCDCサーバーのリスニングIPアドレスは127.0.0.1 、ポートは8300です。TiCDCサーバーの起動時に、 --addr=ip:portでTiCDCにバインドされたIPアドレスとポートを指定できます。
APIエラーメッセージテンプレート
API リクエストの送信後にエラーが発生した場合、返されるエラー メッセージは次の形式になります。
{
"error_msg": "",
"error_code": ""
}
上記の JSON 出力では、 error_msgエラー メッセージを示し、 error_code対応するエラー コードを示します。
APIリストインターフェースの戻り形式
API リクエストがリソースのリスト (たとえば、すべてのCapturesのリスト) を返す場合、TiCDC の戻り形式は次のようになります。
{
"total": 2,
"items": [
{
"id": "d2912e63-3349-447c-90ba-wwww",
"is_owner": true,
"address": "127.0.0.1:8300"
},
{
"id": "d2912e63-3349-447c-90ba-xxxx",
"is_owner": false,
"address": "127.0.0.1:8302"
}
]
}
上記の例では、
total: リソースの合計数を示します。items: このリクエストによって返されるすべてのリソースを含む配列。配列のすべての要素は同じリソースです。
TiCDCノードのステータス情報を取得する
このAPIは同期インターフェースです。リクエストが成功すると、対応するノードのステータス情報が返されます。
リクエストURI
GET /api/v2/status
例
次のリクエストは、IP アドレスが127.0.0.1でポート番号が8300ある TiCDC ノードのステータス情報を取得します。
curl -X GET http://127.0.0.1:8300/api/v2/status
{
"version": "v8.5.3",
"git_hash": "10413bded1bdb2850aa6d7b94eb375102e9c44dc",
"id": "d2912e63-3349-447c-90ba-72a4e04b5e9e",
"pid": 1447,
"is_owner": true,
"liveness": 0
}
上記の出力のパラメータは次のとおりです。
version: TiCDC の現在のバージョン番号。git_hash: Git ハッシュ値。id: ノードのキャプチャ ID。pid: ノードのキャプチャプロセス ID (PID)。is_owner: ノードが所有者であるかどうかを示します。liveness: このノードがライブかどうか。20正常を意味します。41ノードがgraceful shutdown状態にあることを意味します。
TiCDC クラスターのヘルスステータスを確認する
このAPIは同期インターフェースです。クラスターが正常な場合は200 OK返されます。
リクエストURI
GET /api/v2/health
例
curl -X GET http://127.0.0.1:8300/api/v2/health
クラスターが正常な場合、応答は200 OKと空の JSON オブジェクトになります。
{}
クラスターが正常でない場合、応答はエラー メッセージを含む JSON オブジェクトになります。
レプリケーションタスクを作成する
このインターフェースは、TiCDCにレプリケーションタスクを送信するために使用されます。リクエストが成功した場合、 200 OKが返されます。返された結果は、サーバーがコマンドの実行に同意したことを意味するだけで、コマンドが正常に実行されることを保証するものではありません。
リクエストURI
POST /api/v2/changefeeds
パラメータの説明
{
"changefeed_id": "string",
"replica_config": {
"bdr_mode": true,
"case_sensitive": false,
"check_gc_safe_point": true,
"consistent": {
"flush_interval": 0,
"level": "string",
"max_log_size": 0,
"storage": "string"
},
"enable_sync_point": true,
"filter": {
"event_filters": [
{
"ignore_delete_value_expr": "string",
"ignore_event": [
"string"
],
"ignore_insert_value_expr": "string",
"ignore_sql": [
"string"
],
"ignore_update_new_value_expr": "string",
"ignore_update_old_value_expr": "string",
"matcher": [
"string"
]
}
],
"ignore_txn_start_ts": [
0
],
"rules": [
"string"
]
},
"force_replicate": true,
"ignore_ineligible_table": true,
"memory_quota": 0,
"mounter": {
"worker_num": 0
},
"sink": {
"column_selectors": [
{
"columns": [
"string"
],
"matcher": [
"string"
]
}
],
"csv": {
"delimiter": "string",
"include_commit_ts": true,
"null": "string",
"quote": "string"
},
"date_separator": "string",
"dispatchers": [
{
"matcher": [
"string"
],
"partition": "string",
"topic": "string"
}
],
"enable_partition_separator": true,
"encoder_concurrency": 0,
"protocol": "string",
"schema_registry": "string",
"terminator": "string",
"transaction_atomicity": "string"
},
"sync_point_interval": "string",
"sync_point_retention": "string"
},
"sink_uri": "string",
"start_ts": 0,
"target_ts": 0
}
パラメータの説明は次のとおりです。
changefeed_id target_ts意味とsink_uriは、 cdc cliを使用してレプリケーションタスクを作成するドキュメントに記載されているものと同じです。これらstart_tsパラメータの詳細については、 sink_uriのドキュメントを参照してください。11 で証明書パスを指定する際は、対応する証明書が対応する TiCDCサーバーにアップロードされていることを確認してください。
replica_configパラメータの説明は次のとおりです。
consistentパラメータは次のように記述されます。
filterパラメータは次のように記述されます。
filter.event_filtersパラメータの説明は以下のとおりです。詳細については変更フィードログフィルター参照してください。
mounterパラメータは次のように記述されます。
sinkパラメータは次のように記述されます。
sink.column_selectorsは配列です。パラメータは以下のとおりです。
sink.csvパラメータは次のように記述されます。
sink.dispatchers : MQタイプのシンクの場合、このパラメータを使用してイベントディスパッチャを設定できます。サポートされているディスパッチャはdefault 、 ts 、 index-value 、 tableです。ディスパッチャのルールは以下のとおりです。
default:tableモードでイベントを送信します。ts: 行変更の commitTs を使用してハッシュ値を作成し、イベントをディスパッチします。index-value: 選択した HandleKey 列の名前と値を使用してハッシュ値を作成し、イベントをディスパッチします。table: テーブルのスキーマ名とテーブル名を使用してハッシュ値を作成し、イベントをディスパッチします。
sink.dispatchersは配列です。パラメータは以下のとおりです。
sink.cloud_storage_configパラメータは次のように記述されます。
sink.openパラメータは次のように記述されます。
sink.debeziumパラメータは次のように記述されます。
例
次のリクエストは、ID がtest5でblackhome:// sink_uriあるレプリケーション タスクを作成します。
curl -X POST -H "Content-type: application/json" http://127.0.0.1:8300/api/v2/changefeeds -d '{"changefeed_id":"test5","sink_uri":"blackhole://"}'
リクエストが成功した場合は200 OK返されます。リクエストが失敗した場合は、エラーメッセージとエラーコードが返されます。
レスポンス本文の形式
{
"admin_job_type": 0,
"checkpoint_time": "string",
"checkpoint_ts": 0,
"config": {
"bdr_mode": true,
"case_sensitive": false,
"check_gc_safe_point": true,
"consistent": {
"flush_interval": 0,
"level": "string",
"max_log_size": 0,
"storage": "string"
},
"enable_sync_point": true,
"filter": {
"event_filters": [
{
"ignore_delete_value_expr": "string",
"ignore_event": [
"string"
],
"ignore_insert_value_expr": "string",
"ignore_sql": [
"string"
],
"ignore_update_new_value_expr": "string",
"ignore_update_old_value_expr": "string",
"matcher": [
"string"
]
}
],
"ignore_txn_start_ts": [
0
],
"rules": [
"string"
]
},
"force_replicate": true,
"ignore_ineligible_table": true,
"memory_quota": 0,
"mounter": {
"worker_num": 0
},
"sink": {
"column_selectors": [
{
"columns": [
"string"
],
"matcher": [
"string"
]
}
],
"csv": {
"delimiter": "string",
"include_commit_ts": true,
"null": "string",
"quote": "string"
},
"date_separator": "string",
"dispatchers": [
{
"matcher": [
"string"
],
"partition": "string",
"topic": "string"
}
],
"enable_partition_separator": true,
"encoder_concurrency": 0,
"protocol": "string",
"schema_registry": "string",
"terminator": "string",
"transaction_atomicity": "string"
},
"sync_point_interval": "string",
"sync_point_retention": "string"
},
"create_time": "string",
"creator_version": "string",
"error": {
"addr": "string",
"code": "string",
"message": "string"
},
"id": "string",
"resolved_ts": 0,
"sink_uri": "string",
"start_ts": 0,
"state": "string",
"target_ts": 0,
"task_status": [
{
"capture_id": "string",
"table_ids": [
0
]
}
]
}
パラメータの説明は次のとおりです。
task_statusパラメータは次のように記述されます。
errorパラメータは次のように記述されます。
レプリケーションタスクを削除する
このAPIは、レプリケーションタスクを削除するためのべき等インターフェース(つまり、最初の適用後、結果を変更することなく複数回適用できるインターフェース)です。リクエストが成功した場合、 200 OKが返されます。返された結果は、サーバーがコマンドの実行に同意したことを意味するだけで、コマンドが正常に実行されることを保証するものではありません。
リクエストURI
DELETE /api/v2/changefeeds/{changefeed_id}
パラメータの説明
パスパラメータ
例
次のリクエストは、ID test1のレプリケーション タスクを削除します。
curl -X DELETE http://127.0.0.1:8300/api/v2/changefeeds/test1
リクエストが成功した場合は200 OK返されます。リクエストが失敗した場合は、エラーメッセージとエラーコードが返されます。
レプリケーション構成を更新する
このAPIはレプリケーションタスクの更新に使用されます。リクエストが成功した場合、 200 OK返されます。返された結果は、サーバーがコマンドの実行に同意したことを意味するだけで、コマンドが正常に実行されることを保証するものではありません。
changefeed 設定を変更するには、 pause the replication task -> modify the configuration -> resume the replication taskの手順に従います。
リクエストURI
PUT /api/v2/changefeeds/{changefeed_id}
パラメータの説明
パスパラメータ
リクエスト本体のパラメータ
{
"replica_config": {
"bdr_mode": true,
"case_sensitive": false,
"check_gc_safe_point": true,
"consistent": {
"flush_interval": 0,
"level": "string",
"max_log_size": 0,
"storage": "string"
},
"enable_sync_point": true,
"filter": {
"event_filters": [
{
"ignore_delete_value_expr": "string",
"ignore_event": [
"string"
],
"ignore_insert_value_expr": "string",
"ignore_sql": [
"string"
],
"ignore_update_new_value_expr": "string",
"ignore_update_old_value_expr": "string",
"matcher": [
"string"
]
}
],
"ignore_txn_start_ts": [
0
],
"rules": [
"string"
]
},
"force_replicate": true,
"ignore_ineligible_table": true,
"memory_quota": 0,
"mounter": {
"worker_num": 0
},
"sink": {
"column_selectors": [
{
"columns": [
"string"
],
"matcher": [
"string"
]
}
],
"csv": {
"delimiter": "string",
"include_commit_ts": true,
"null": "string",
"quote": "string"
},
"date_separator": "string",
"dispatchers": [
{
"matcher": [
"string"
],
"partition": "string",
"topic": "string"
}
],
"enable_partition_separator": true,
"encoder_concurrency": 0,
"protocol": "string",
"schema_registry": "string",
"terminator": "string",
"transaction_atomicity": "string"
},
"sync_point_interval": "string",
"sync_point_retention": "string"
},
"sink_uri": "string",
"target_ts": 0
}
現在、API 経由で変更できるのは次の構成のみです。
上記のパラメータの意味はセクションレプリケーションタスクを作成すると同じです。詳細については、セクション1を参照してください。
例
次のリクエストは、ID test1のレプリケーション タスクのtarget_ts 32に更新します。
curl -X PUT -H "Content-type: application/json" http://127.0.0.1:8300/api/v2/changefeeds/test1 -d '{"target_ts":32}'
リクエストが成功した場合は200 OK返されます。リクエストが失敗した場合は、エラーメッセージとエラーコードが返されます。JSONレスポンスボディの意味はレプリケーションタスクを作成するセクションと同じです。詳細は3のセクションを参照してください。
レプリケーションタスクリストをクエリする
このAPIは同期インターフェースです。リクエストが成功すると、TiCDCクラスター内のすべてのレプリケーションタスク(変更フィード)の基本情報が返されます。
リクエストURI
GET /api/v2/changefeeds
パラメータの説明
クエリパラメータ
stateの値のオプションはall 、 normal 、 stopped 、 error 、 failed 、 finishedです。
このパラメータを指定しない場合は、 normal 、 stopped 、またはfailed状態のレプリケーション タスクの基本情報がデフォルトで返されます。
例
次のリクエストは、状態normalにあるすべてのレプリケーション タスクの基本情報を照会します。
curl -X GET http://127.0.0.1:8300/api/v2/changefeeds?state=normal
{
"total": 2,
"items": [
{
"id": "test",
"state": "normal",
"checkpoint_tso": 439749918821711874,
"checkpoint_time": "2023-02-27 23:46:52.888",
"error": null
},
{
"id": "test2",
"state": "normal",
"checkpoint_tso": 439749918821711874,
"checkpoint_time": "2023-02-27 23:46:52.888",
"error": null
}
]
}
上記の返された結果のパラメータは次のように説明されます。
id: レプリケーション タスクの ID。state: レプリケーション タスクの現在の州 。checkpoint_tso: レプリケーション タスクの現在のチェックポイントの TSO。checkpoint_time: レプリケーション タスクの現在のチェックポイントのフォーマットされた時刻。error: レプリケーション タスクのエラー情報。
特定のレプリケーションタスクをクエリする
このAPIは同期インターフェースです。リクエストが成功すると、指定されたレプリケーションタスク(changefeed)の詳細情報が返されます。
リクエストURI
GET /api/v2/changefeeds/{changefeed_id}
パラメータの説明
パスパラメータ
例
次のリクエストは、ID test1のレプリケーション タスクの詳細情報を照会します。
curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test1
JSONレスポンスボディの意味はセクションレプリケーションタスクを作成すると同じです。詳細はセクション1を参照してください。
特定のレプリケーションタスクが完了したかどうかを照会する
このAPIは同期インターフェースです。リクエストが成功すると、指定されたレプリケーションタスク(changefeed)の同期ステータス(タスクの完了の有無など)が返されます。
リクエストURI
GET /api/v2/changefeeds/{changefeed_id}/synced
パラメータの説明
パスパラメータ
例
次のリクエストは、ID test1のレプリケーション タスクの同期ステータスを照会します。
curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/test1/synced
例1: 同期が完了しました
{
"synced": true,
"sink_checkpoint_ts": "2023-11-30 15:14:11.015",
"puller_resolved_ts": "2023-11-30 15:14:12.215",
"last_synced_ts": "2023-11-30 15:08:35.510",
"now_ts": "2023-11-30 15:14:11.511",
"info": "Data syncing is finished"
}
応答には次のフィールドが含まれます。
synced: このレプリケーションタスクが完了したかどうか。2trueタスクが完了したこと、falseタスクが完了していない可能性があることを意味します。6false場合は、infoフィールドとその他のフィールドの両方で具体的なステータスを確認する必要があります。sink_checkpoint_ts: シンク モジュールのチェックポイント ts 値 (PD 時間)。puller_resolved_ts: PD 時間での、プラー モジュールのresolved-ts値。last_synced_ts: TiCDC によって処理された最新のデータの commit-ts 値 (PD 時間)。now_ts: 現在の PD 時間。info: 特にsyncedがfalse場合、同期ステータスの判別を支援する補足情報。
例2: 同期が完了していない
{
"synced": false,
"sink_checkpoint_ts": "2023-11-30 15:26:31.519",
"puller_resolved_ts": "2023-11-30 15:26:23.525",
"last_synced_ts": "2023-11-30 15:24:30.115",
"now_ts": "2023-11-30 15:26:31.511",
"info": "The data syncing is not finished, please wait"
}
この例は、進行中のレプリケーションタスクの応答を示しています。フィールドsyncedとinfo両方を確認することで、レプリケーションタスクがまだ完了しておらず、さらに待機する必要があることがわかります。
例3: 同期ステータスをさらに確認する必要がある
{
"synced":false,
"sink_checkpoint_ts":"2023-12-13 11:45:13.515",
"puller_resolved_ts":"2023-12-13 11:45:13.525",
"last_synced_ts":"2023-12-13 11:45:07.575",
"now_ts":"2023-12-13 11:50:24.875",
"info":"Please check whether PD is online and TiKV Regions are all available. If PD is offline or some TiKV regions are not available, it means that the data syncing process is complete. To check whether TiKV regions are all available, you can view 'TiKV-Details' > 'Resolved-Ts' > 'Max Leader Resolved TS gap' on Grafana. If the gap is large, such as a few minutes, it means that some regions in TiKV are unavailable. Otherwise, if the gap is small and PD is online, it means the data syncing is incomplete, so please wait"
}
このAPIを使用すると、上流クラスタで災害が発生した場合でも同期ステータスを照会できます。状況によっては、TiCDCの現在のデータレプリケーションタスクが完了しているかどうかを直接確認できない場合があります。そのような場合は、このAPIをリクエストし、レスポンスのinfoフィールドと上流クラスタの現在のステータスの両方を確認することで、具体的なステータスを確認できます。
この例では、 sink_checkpoint_ts now_tsより遅れていますが、これは TiCDC がまだデータレプリケーションに追いついていないか、PD または TiKV に障害が発生しているためです。TiCDC がまだデータレプリケーションに追いついていない場合は、レプリケーションタスクがまだ完了していないことを意味します。PD または TiKV に障害が発生した場合は、レプリケーションタスクが完了していることを意味します。したがって、クラスターのステータスを確認するには、 infoフィールドを確認する必要があります。
例4: クエリエラー
{
"error_msg": "[CDC:ErrPDEtcdAPIError]etcd api call error: context deadline exceeded",
"error_code": "CDC:ErrPDEtcdAPIError"
}
上流クラスターのPDに長時間障害が発生した場合、このAPIクエリを実行すると、前述のエラーと同様のエラーが返されることがあります。このエラーでは、それ以上確認するための情報は提供されません。PDの障害はTiCDCのデータレプリケーションに直接影響するため、このようなエラーが発生した場合、TiCDCは可能な限りデータレプリケーションを完了していると考えられますが、下流クラスターではPDの障害によりデータ損失が発生する可能性が依然としてあります。
レプリケーションタスクを一時停止する
このAPIはレプリケーションタスクを一時停止します。リクエストが成功した場合、 200 OKが返されます。返された結果は、サーバーがコマンドの実行に同意したことを意味するだけで、コマンドが正常に実行されることを保証するものではありません。
リクエストURI
POST /api/v2/changefeeds/{changefeed_id}/pause
パラメータの説明
パスパラメータ
例
次のリクエストは、ID test1のレプリケーション タスクを一時停止します。
curl -X POST http://127.0.0.1:8300/api/v2/changefeeds/test1/pause
リクエストが成功した場合は200 OK返されます。リクエストが失敗した場合は、エラーメッセージとエラーコードが返されます。
レプリケーションタスクを再開する
このAPIはレプリケーションタスクを再開します。リクエストが成功した場合、 200 OKが返されます。返された結果は、サーバーがコマンドの実行に同意したことを意味するだけで、コマンドが正常に実行されることを保証するものではありません。
リクエストURI
POST /api/v2/changefeeds/{changefeed_id}/resume
パラメータの説明
パスパラメータ
リクエスト本体のパラメータ
{
"overwrite_checkpoint_ts": 0
}
例
次のリクエストは、ID test1のレプリケーション タスクを再開します。
curl -X POST http://127.0.0.1:8300/api/v2/changefeeds/test1/resume -d '{}'
リクエストが成功した場合は200 OK返されます。リクエストが失敗した場合は、エラーメッセージとエラーコードが返されます。
レプリケーションサブタスクリストを照会する
このAPIは同期インターフェースです。リクエストが成功すると、すべてのレプリケーションサブタスク( processor )の基本情報が返されます。
リクエストURI
GET /api/v2/processors
例
curl -X GET http://127.0.0.1:8300/api/v2/processors
{
"total": 3,
"items": [
{
"changefeed_id": "test2",
"capture_id": "d2912e63-3349-447c-90ba-72a4e04b5e9e"
},
{
"changefeed_id": "test1",
"capture_id": "d2912e63-3349-447c-90ba-72a4e04b5e9e"
},
{
"changefeed_id": "test",
"capture_id": "d2912e63-3349-447c-90ba-72a4e04b5e9e"
}
]
}
パラメータの説明は次のとおりです。
changefeed_id: 変更フィード ID。capture_id: キャプチャ ID。
特定のレプリケーションサブタスクをクエリする
このAPIは同期インターフェースです。リクエストが成功すると、指定されたレプリケーションサブタスク( processor )の詳細情報を返します。
リクエストURI
GET /api/v2/processors/{changefeed_id}/{capture_id}
パラメータの説明
パスパラメータ
例
次のリクエストは、 changefeed_idがtest 、 capture_idが561c3784-77f0-4863-ad52-65a3436db6afあるサブタスクの詳細情報を取得します。サブタスクはchangefeed_idとcapture_idで識別できます。
curl -X GET http://127.0.0.1:8300/api/v2/processors/test/561c3784-77f0-4863-ad52-65a3436db6af
{
"table_ids": [
80
]
}
パラメータの説明は次のとおりです。
table_ids: このキャプチャで複製されるテーブル ID。
TiCDC サービス プロセス リストを照会する
このAPIは同期インターフェースです。リクエストが成功すると、すべてのレプリケーションプロセスの基本情報( capture )が返されます。
リクエストURI
GET /api/v2/captures
例
curl -X GET http://127.0.0.1:8300/api/v2/captures
{
"total": 1,
"items": [
{
"id": "d2912e63-3349-447c-90ba-72a4e04b5e9e",
"is_owner": true,
"address": "127.0.0.1:8300"
}
]
}
パラメータの説明は次のとおりです。
id: キャプチャ ID。is_owner: キャプチャが所有者であるかどうか。address: キャプチャのアドレス。
所有者ノードの退去
このAPIは非同期インターフェースです。リクエストが成功した場合、 200 OKが返されます。返された結果は、サーバーがコマンドの実行に同意したことを意味するだけで、コマンドが正常に実行されることを保証するものではありません。
リクエストURI
POST /api/v2/owner/resign
例
次のリクエストは、TiCDC の現在の所有者ノードを削除し、新しい所有者ノードを生成するための新しいラウンドの選挙をトリガーします。
curl -X POST http://127.0.0.1:8300/api/v2/owner/resign
リクエストが成功した場合は200 OK返されます。リクエストが失敗した場合は、エラーメッセージとエラーコードが返されます。
TiCDCサーバーのログレベルを動的に調整する
このAPIは同期インターフェースです。リクエストが成功すると200 OK返されます。
リクエストURI
POST /api/v2/log
リクエストパラメータ
リクエスト本体のパラメータ
log_level 、「debug」、「info」、「warn」、「error」、「dpanic」、「panic」、「fatal」のzapが提供するログレベルサポートします。
例
curl -X POST -H "Content-type: application/json" http://127.0.0.1:8300/api/v2/log -d '{"log_level":"debug"}'
リクエストが成功した場合は200 OK返されます。リクエストが失敗した場合は、エラーメッセージとエラーコードが返されます。