アパッチ・パルサーへシンク
このドキュメントでは、TiDB CloudからApache Pulsarにデータをストリーミングするためのチェンジフィードを作成する方法について説明します。
注記:
- 変更フィード機能を使用してデータをApache Pulsarに複製するには、 TiDB Cloud Dedicatedクラスタのバージョンがv7.5.1以降であることを確認してください。
- TiDB Cloud Starterインスタンスでは、変更フィード機能は利用できません。
- TiDB Cloud Essentialインスタンスの場合、変更フィード機能はベータ版です。詳細については、 変更フィード(ベータ版)を参照してください。
制限
- TiDB Cloudの各クラスターにつき、最大100個のチェンジフィードを作成できます。
- 現在、 TiDB Cloudは、Pulsarブローカーに接続するための自己署名TLS証明書のアップロードをサポートしていません。
- TiDB Cloud はTiCDC を使用して変更フィードを確立するため、同じTiCDCの制限があります。
- 複製対象のテーブルに主キーまたはNULLを許容しない一意インデックスがない場合、複製中に一意制約が存在しないことで、一部の再試行シナリオにおいて、下流で重複データが挿入される可能性があります。
- 現在、TiCDCはPulsarトピックを自動的に作成しません。イベントをトピックに送信する前に、Pulsarにそのトピックが存在することを確認してください。
前提条件
Apache Pulsarにデータをストリーミングするためのチェンジフィードを作成する前に、以下の前提条件を満たす必要があります。
- ネットワーク接続を設定する
- Pulsar ACL認証のための権限を追加する
- Apache Pulsarでトピックを手動で作成するか、Apache Pulsarブローカーの設定で
allowAutoTopicCreation有効にしてください。
ネットワーク
TiDBクラスタがApache Pulsarサービスに接続できることを確認してください。接続方法は以下のいずれかを選択できます。
- VPCピアリング:潜在的なVPC CIDRの競合を回避するためのネットワーク計画と、セキュリティ上の懸念事項の考慮が必要です。
- パブリックIP:PulsarがパブリックIPをアドバタイズする場合に適しています。この方法は本番環境では推奨されず、セキュリティ上の懸念事項を慎重に検討する必要があります。
Apache PulsarサービスがインターネットにアクセスできないAWS VPC内にある場合は、以下の手順を実行してください。
Apache Pulsar サービスの VPC と TiDB クラスターの間でVPCピアリング接続を設定する。
Apache Pulsarサービスが関連付けられているセキュリティグループの受信ルールを変更します。
TiDB Cloudクラスタが配置されているリージョンのCIDRを受信ルールに追加する必要があります。CIDRはVPCピアリングページで確認できます。これにより、TiDBクラスタからPulsarブローカーへのトラフィックが流れるようになります。
Apache PulsarのURLにホスト名が含まれている場合、 TiDB CloudがApache PulsarブローカーのDNSホスト名を解決できるようにする必要があります。
- VPCピアリング接続のDNS解決を有効にするの手順に従います。
- アクセプターDNS解決オプションを有効にする。
Apache Pulsar サービスがインターネットにアクセスできない Google Cloud VPC 内にある場合は、以下の手順を実行してください。
Apache Pulsar サービスの VPC と TiDB クラスターの間でVPCピアリング接続を設定する。
Apache Pulsarが配置されているVPCのイングレスファイアウォールルールを変更します。
TiDB Cloudクラスタが配置されているリージョンのCIDRを、イングレスファイアウォールルールに追加する必要があります。CIDRはVPCピアリングページで確認できます。これにより、TiDBクラスタからPulsarブローカーへのトラフィックが流れるようになります。
Apache PulsarサービスにパブリックIPアクセスを提供する場合は、すべてのPulsarブローカーにパブリックIPアドレスを割り当ててください。
本番環境でパブリックIPを使用することは推奨されません。
Apache Pulsarでトピックを作成する
現在、TiCDCはPulsarトピックを自動的に作成しません。変更フィードを作成する前に、Pulsarで必要なトピックを作成する必要があります。トピックの数と名前は、優先する配信モードによって異なります。
- Pulsarのすべてのメッセージを単一のトピックに配信するには、任意の名前でトピックを1つ作成してください。
- 各テーブルの Pulsar メッセージを専用のトピックに配信するには、複製する各テーブルに対して
<Topic Prefix><DatabaseName><Separator><TableName><Topic Suffix>形式でトピックを作成します。 - データベースの Pulsar メッセージを専用トピックに配信するには、複製するデータベースごとに
<Topic Prefix><DatabaseName><Topic Suffix>形式でトピックを作成します。
設定によっては、行以外のイベント(スキーマの変更など)用のデフォルトトピックが必要になる場合もあります。
詳細については、Apache Pulsar ドキュメントのトピックの作成方法を参照してください。
ステップ1. Apache PulsarのChangefeedページを開きます。
- TiDB Cloudコンソールにログインします。
- 変更フィードイベントの発生源となるTiDBクラスタのクラスタ概要ページに移動し、左側のナビゲーションペインで「データ」 > 「変更フィード」をクリックします。
- 変更フィードを作成をクリックします。
ステップ2. changefeedの送信先を設定します
「目的地」セクションで「Pulsar」を選択してください。
接続セクションに、以下の情報を入力してください。
- 宛先プロトコル: PulsarまたはPulsar+SSLを選択してください。
- 接続方法:Pulsarエンドポイントへの接続方法に応じて、 「VPCピアリング」または「パブリック」を選択してください。
- Pulsar Broker : Pulsar Broker のエンドポイントを入力します。ポートとドメインまたは IP アドレスはコロンで区切ります。例
example.org:6650。
認証セクションで、Pulsarの認証設定に応じて「認証タイプ」オプションを選択します。選択したオプションに基づいて、要求された認証情報を入力します。
オプション:詳細設定セクションで、追加設定を構成します。
- 圧縮:この変更フィードのデータに対して、オプションの圧縮アルゴリズムを選択してください。
- バッチあたりの最大メッセージ数と最大公開遅延時間:Pulsarに送信されるイベントメッセージのバッチ処理を指定します。バッチあたりの最大メッセージ数は、バッチあたりの最大メッセージ数を設定し、最大公開遅延時間は、バッチを送信する前の最大待機時間を設定します。
- 接続タイムアウト:PulsarへのTCP接続を確立するためのタイムアウト時間を調整します。
- 操作タイムアウト:TiCDC Pulsarクライアントを使用して操作を開始する際のタイムアウト時間を調整します。
- 送信タイムアウト:TiCDC Pulsarプロデューサーがメッセージを送信するまでのタイムアウトを調整します。
「次へ」をクリックしてネットワーク接続をテストしてください。テストが成功すると、次のステップに進みます。
ステップ3. 変更フィードのレプリケーションを設定する
テーブル フィルターをカスタマイズして、複製するテーブルをフィルターします。ルールの構文については、テーブルフィルタルールを参照してください。
- 大文字小文字の区別:フィルタルールにおけるデータベース名とテーブル名の照合において、大文字小文字を区別するかどうかを設定できます。デフォルトでは、大文字小文字は区別されません。
- フィルタルール:この列でフィルタルールを設定できます。デフォルトでは、すべてのテーブルを複製するルール
*.*が設定されています。新しいルールを追加すると、 TiDB Cloud はTiDB 内のすべてのテーブルをクエリし、右側のボックスにルールに一致するテーブルのみを表示します。フィルタルールは最大 100 個まで追加できます。 - 有効なキーを持つテーブル:この列には、主キーや一意インデックスなど、有効なキーを持つテーブルが表示されます。
- 有効なキーのないテーブル: この列には、主キーまたは一意キーがないテーブルが表示されます。一意の識別子がないと、ダウンストリームが重複イベントを処理する際にデータの一貫性が失われる可能性があるため、これらのテーブルはレプリケーション中に問題となります。データの一貫性を確保するには、レプリケーションを開始する前に、これらのテーブルに一意キーまたは主キーを追加することをお勧めします。または、フィルタルールを追加してこれらのテーブルを除外することもできます。たとえば、ルール
test.tbl1を使用して、テーブル"!test.tbl1"除外できます。
イベントフィルターをカスタマイズして、複製したいイベントを絞り込みます。
- 一致するテーブル:この列では、イベントフィルターを適用するテーブルを設定できます。ルールの構文は、前のテーブルフィルター領域で使用されているものと同じです。変更フィードごとに最大10個のイベントフィルタールールを追加できます。
- イベントフィルター:以下のイベントフィルターを使用して、変更フィードから特定のイベントを除外できます。
- イベントを無視する:指定されたイベントタイプを除外します。
- SQL を無視: 指定された式に一致する DDL イベントを除外します。たとえば、
^dropDROPで始まるステートメントを除外し、add columnはADD COLUMNを含むステートメントを除外します。 - 挿入値の式を無視する: 特定の条件を満たす
INSERTステートメントを除外します。たとえば、id >= 100は、INSERTが 100 以上であるidステートメントを除外します。 - 新しい値の更新式を無視する: 新しい値が指定された条件に一致する
UPDATEステートメントを除外します。たとえば、gender = 'male'はgenderがmaleになるような更新を除外します。 - 古い値の更新を無視する式: 古い値が指定された条件に一致する
UPDATEステートメントを除外します。たとえば、age < 18ageの古い値が 18 未満である場合の更新を除外します。 - 削除値式を無視する: 指定された条件を満たす
DELETEステートメントを除外します。たとえば、name = 'john'はDELETEがnameである'john'ステートメントを除外します。
「レプリケーション開始位置」領域で、チェンジフィードがPulsarにデータをレプリケートする開始点を選択します。
- レプリケーションを今すぐ開始します。変更フィードは、現在の時点からデータのレプリケーションを開始します。
- 特定の TSO からレプリケーションを開始する: 変更フィードは、指定されたTSO以降のデータのレプリケーションを開始します。指定された TSO はガベージコレクションの安全地点内にある必要があります。
- 特定の時刻からレプリケーションを開始する:変更フィードは、指定されたタイムスタンプ以降からデータのレプリケーションを開始します。指定されたタイムスタンプは、ガベージコレクションのセーフポイント内である必要があります。
データフォーマットの領域で、希望するPulsarメッセージのフォーマットを選択してください。
Canal-JSONは、解析が容易なプレーンなJSONテキスト形式です。詳細については、 TiCDC Canal- JSONプロトコルを参照してください。
TiDB 拡張フィールドを Pulsar メッセージ本文に追加するには、 TiDB 拡張オプションを有効にします。詳細については、 TiCDC Canal- JSONプロトコルのTiDB拡張フィールド参照してください。
トピック配信エリアで配信モードを選択し、選択したモードに応じてトピック名の設定を入力します。
配信モードは、チェンジフィードがイベントメッセージをPulsarトピックにどのように配信するかを制御します。すべてのメッセージを1つのトピックに送信するか、テーブルごとまたはデータベースごとに特定のトピックに送信するかを選択できます。
注記:
ダウンストリームとしてPulsarを選択した場合、チェンジフィードは自動的にトピックを作成しません。必要なトピックは事前に作成しておく必要があります。
すべての変更ログを、指定された1つのPulsarトピックに送信する
変更フィードからすべてのメッセージを単一のPulsarトピックに送信する場合は、このモードを選択してください。トピック名は「トピック名」フィールドで指定できます。
変更履歴をテーブルごとにPulsarトピックに配布する
変更フィードで各テーブルのすべての Pulsar メッセージを専用の Pulsar トピックに送信する場合は、このモードを選択してください。トピック名は、トピック プレフィックス、データベース名とテーブル名の間の区切り文字、およびトピック サフィックスを設定することで指定できます。たとえば、区切り文字を
_に設定すると、Pulsar メッセージは<Topic Prefix><DatabaseName>_<TableName><Topic Suffix>の形式の名前を持つトピックに送信されます。これらのトピックは、事前に Pulsar 上に作成しておく必要があります。スキーマ作成イベントなどの行以外のイベントの変更ログについては、 「デフォルトトピック名」フィールドにトピック名を指定できます。変更フィードは、行以外のイベントをこのトピックに送信し、変更ログを収集します。
データベースごとに変更ログをPulsarトピックに配信する
変更フィードが各データベースのすべてのPulsarメッセージを専用のPulsarトピックに送信するようにするには、このモードを選択してください。トピックのプレフィックスとサフィックスを設定することで、データベースのトピック名を指定できます。
解決済みTsイベントなどの行以外のイベントの変更ログについては、 「デフォルトトピック名」フィールドにトピック名を指定できます。変更フィードは、行以外のイベントをこのトピックに送信し、そのような変更ログを収集します。
Pulsarはマルチテナントをサポートしているため、デフォルト設定と異なる場合は、 PulsarテナントとPulsarネームスペースを設定することもできます。
パーティション分散領域では、Pulsar メッセージの送信先パーティションを決定できます。すべてのテーブルに対して単一のパーティションディスパッチャを定義することも、テーブルごとに異なるパーティションディスパッチャを定義することもできます。TiDB Cloud、変更イベントを Pulsar パーティションに分散するための 4 つのルール オプションが提供されています。
主キーまたは一意インデックス
テーブルのPulsarメッセージを異なるパーティションに送信するように変更フィードを設定する場合は、この配信方法を選択してください。行変更ログの主キーまたはインデックス値によって、変更ログの送信先パーティションが決まります。この配信方法は、パーティション間のバランスを改善し、行レベルの順序性を確保します。
テーブル
変更フィードによってテーブルの Pulsar メッセージを 1 つの Pulsar パーティションに送信する場合は、この配信方法を選択してください。行の変更ログのテーブル名によって、変更ログの送信先パーティションが決まります。この配信方法はテーブルの順序性を確保しますが、パーティションのバランスが崩れる可能性があります。
タイムスタンプ
変更フィードがタイムスタンプに基づいて異なる Pulsar パーティションに Pulsar メッセージを送信する場合は、この配信方法を選択してください。行の変更ログの commitTs によって、変更ログが送信されるパーティションが決まります。この配信方法は、パーティションのバランスを改善し、各パーティションの順序性を確保します。ただし、データ項目の複数の変更が異なるパーティションに送信され、異なるコンシューマーのコンシューマー処理の進行状況が異なる場合があり、データの不整合が発生する可能性があります。そのため、コンシューマーは、複数のパーティションからのデータを消費する前に commitTs でソートする必要があります。
カラムの値
テーブルのPulsarメッセージを異なるパーティションに送信するように変更フィードを設定する場合は、この配信方法を選択してください。行の変更ログで指定された列の値によって、変更ログの送信先パーティションが決まります。この配信方法により、各パーティション内の順序が確保され、同じ列の値を持つ変更ログが同じパーティションに送信されることが保証されます。
[イベントの分割]エリアで、
UPDATEイベントを別々のDELETEとINSERTイベントに分割するか、生のUPDATEイベントとして保持するかを選択します。詳細については、 MySQL以外のシンクにおける、主キーまたは一意キーを分割したUPDATEイベント参照してください。「次へ」をクリックしてください。
ステップ4.仕様の設定とレビュー
仕様と名称のセクションで:
- チェンジフィードの複製容量単位(RCU)の数を指定します。
- 変更フィードの名前を入力してください。
変更フィードの設定をすべて確認してください。
- 問題が見つかった場合は、前の手順に戻って問題を解決できます。
- 問題がなければ、 「送信」をクリックして変更フィードを作成できます。