アパッチパルサーに沈む
このドキュメントでは、 TiDB Cloudから Apache Pulsar にデータをストリーミングするための変更フィードを作成する方法について説明します。
注記:
- changefeed 機能を使用して Apache Pulsar にデータを複製するには、 TiDB Cloud Dedicated クラスターのバージョンが v7.5.1 以降であることを確認してください。
- TiDB Cloudサーバーレス クラスターの場合、changefeed 機能は使用できません。
制限
- TiDB Cloudクラスターごとに、最大 100 個の変更フィードを作成できます。
- 現在、 TiDB Cloud は、Pulsar ブローカーに接続するための自己署名 TLS 証明書のアップロードをサポートしていません。
- TiDB Cloud は、変更フィードを確立するために TiCDC を使用するため、同じTiCDCとしての制限持ちます。
- レプリケートするテーブルに主キーまたは NULL 以外の一意のインデックスがない場合、レプリケーション中に一意の制約がないと、再試行シナリオによっては下流に重複したデータが挿入される可能性があります。
- 現在、TiCDC は Pulsar トピックを自動的に作成しません。トピックにイベントをディスパッチする前に、そのトピックが Pulsar に存在することを確認してください。
前提条件
Apache Pulsar にデータをストリーミングするための変更フィードを作成する前に、次の前提条件を完了する必要があります。
- ネットワーク接続を設定する
- Pulsar ACL認証の権限を追加する
- Apache Pulsarでトピックを手動で作成するか、Apache Pulsarブローカー構成で
allowAutoTopicCreation
有効にします。
ネットワーク
TiDB クラスターが Apache Pulsar サービスに接続できることを確認してください。以下のいずれかの接続方法を選択できます。
- VPC ピアリング: 潜在的な VPC CIDR の競合を回避するためのネットワーク計画とセキュリティ上の懸念の考慮が必要です。
- パブリックIP: PulsarがパブリックIPをアドバタイズする場合のセットアップに適しています。この方法は本番環境には推奨されず、セキュリティ上の懸念事項を慎重に検討する必要があります。
- VPC Peering
- Public IP
Apache Pulsar サービスがインターネットにアクセスできない AWS VPC にある場合は、次の手順を実行します。
Apache Pulsar サービスの VPC と TiDB クラスター間の接続はVPCピアリング接続を設定する 。
Apache Pulsar サービスが関連付けられているセキュリティ グループの受信ルールを変更します。
TiDB Cloudクラスターが配置されているリージョンの CIDR をインバウンドルールに追加する必要があります。CIDR はVPC ピアリングページで確認できます。これにより、トラフィックが TiDB クラスターから Pulsar ブローカーに流れるようになります。
Apache Pulsar URL にホスト名が含まれている場合は、 TiDB Cloud がApache Pulsar ブローカーの DNS ホスト名を解決できるようにする必要があります。
- VPC ピアリング接続の DNS 解決を有効にするの手順に従います。
- Accepter DNS 解決オプションを有効にします。
Apache Pulsar サービスがインターネットにアクセスできない Google Cloud VPC にある場合は、次の手順に従います。
Apache Pulsar サービスの VPC と TiDB クラスター間の接続はVPCピアリング接続を設定する 。
Apache Pulsar が配置されている VPC の Ingress ファイアウォール ルールを変更します。
TiDB Cloudクラスターが配置されているリージョンの CIDR を、Ingress ファイアウォールルールに追加する必要があります。CIDR はVPC ピアリングページで確認できます。これにより、TiDB クラスターから Pulsar ブローカーへのトラフィックが許可される場合があります。
Apache Pulsar サービスにパブリック IP アクセスを提供する場合は、すべての Pulsar ブローカーにパブリック IP アドレスを割り当てます。
本番環境でパブリック IP を使用することはお勧めしません。
Apache Pulsarでトピックを作成する
現在、TiCDCはPulsarトピックを自動的に作成しません。変更フィードを作成する前に、Pulsarで必要なトピックを作成する必要があります。トピックの数と名前は、選択した配信モードによって異なります。
- すべての Pulsar メッセージを 1 つのトピックに配信するには、希望する名前で 1 つのトピックを作成します。
- 各テーブルの Pulsar メッセージを専用のトピックに配信するには、複製するテーブルごとに
<Topic Prefix><DatabaseName><Separator><TableName><Topic Suffix>
形式でトピックを作成します。 - データベースの Pulsar メッセージを専用のトピックに配信するには、複製するデータベースごとに
<Topic Prefix><DatabaseName><Topic Suffix>
形式でトピックを作成します。
構成によっては、行以外のイベント (スキーマの変更など) のデフォルト トピックが必要になる場合もあります。
詳細については、Apache Pulsar ドキュメントのトピックの作成方法参照してください。
ステップ1. Apache PulsarのChangefeedページを開く
- TiDB Cloudコンソールにログインします。
- 変更フィード イベントのソースとなる TiDB クラスターのクラスター概要ページに移動し、左側のナビゲーション ペインで[データ] > [変更フィード]をクリックします。
- 「Changefeed の作成」をクリックします。
ステップ2. チェンジフィードの送信先を設定する
宛先セクションで、 Pulsarを選択します。
接続セクションで、次の情報を入力します。
- 宛先プロトコル: PulsarまたはPulsar+SSLを選択します。
- 接続方法: Pulsar エンドポイントへの接続方法に応じて、 VPC ピアリングまたはパブリックを選択します。
- Pulsarブローカー: Pulsarブローカーのエンドポイントを入力します。ポートとドメインまたはIPアドレスをコロンで区切ってください(例:
example.org:6650
)。
「認証」セクションで、Pulsarの認証設定に応じて「認証タイプ」オプションを選択します。選択内容に基づいて、要求された認証情報を入力します。
オプション: [詳細設定]セクションで、追加の設定を構成します。
- 圧縮: この変更フィード内のデータに対してオプションの圧縮アルゴリズムを選択します。
- バッチあたりの最大メッセージ数と最大公開遅延:Pulsar に送信されるイベントメッセージのバッチ処理を指定します。バッチあたりの最大メッセージ数はバッチあたりの最大メッセージ数を設定し、最大公開遅延はバッチを送信するまでの最大待機時間を設定します。
- 接続タイムアウト: Pulsar への TCP 接続を確立するためのタイムアウトを調整します。
- 操作タイムアウト: TiCDC Pulsar クライアントを使用して操作を開始する際のタイムアウトを調整します。
- 送信タイムアウト: TiCDC Pulsar プロデューサーがメッセージを送信するまでのタイムアウトを調整します。
「次へ」をクリックしてネットワーク接続をテストします。テストが成功すると、次のステップに進みます。
ステップ3. 変更フィードのレプリケーションを構成する
テーブルフィルターをカスタマイズして、複製するテーブルをフィルタリングします。ルールの構文については、 テーブルフィルタルールを参照してください。
- フィルタールール: この列でフィルタールールを設定できます。デフォルトでは、すべてのテーブルを複製するルール
*.*
が設定されています。新しいルールを追加すると、 TiDB CloudはTiDB内のすべてのテーブルをクエリし、ルールに一致するテーブルのみを右側のボックスに表示されます。フィルタールールは最大100件まで追加できます。 - 有効なキーを持つテーブル: この列には、主キーや一意のインデックスなど、有効なキーを持つテーブルが表示されます。
- 有効なキーのないテーブル: この列には、主キーまたは一意キーを持たないテーブルが表示されます。これらのテーブルは、一意の識別子がないと、下流で重複イベントを処理する際にデータの不整合が発生する可能性があるため、レプリケーション中に問題が発生します。データの整合性を確保するには、レプリケーションを開始する前に、これらのテーブルに一意のキーまたは主キーを追加することをお勧めします。または、これらのテーブルを除外するフィルタールールを追加することもできます。例えば、ルール
"!test.tbl1"
を使用してテーブルtest.tbl1
を除外できます。
- フィルタールール: この列でフィルタールールを設定できます。デフォルトでは、すべてのテーブルを複製するルール
イベント フィルターをカスタマイズして、複製するイベントをフィルターします。
- 一致するテーブル: この列では、イベントフィルターを適用するテーブルを設定できます。ルールの構文は、前述の「テーブルフィルター」領域で使用した構文と同じです。変更フィードごとに最大10個のイベントフィルタールールを追加できます。
- 無視されるイベント: イベント フィルターが変更フィードから除外するイベントの種類を設定できます。
「レプリケーション開始位置」領域で、変更フィードが Pulsar にデータをレプリケートする開始点を選択します。
- 今からレプリケーションを開始します。変更フィードは現在の時点からデータのレプリケーションを開始します。
- 特定のTSOからレプリケーションを開始します。変更フィードは指定されたTSO以降のデータのレプリケーションを開始します。指定するTSOはガベージコレクションの安全ポイント以内である必要があります。
- 特定の時刻からレプリケーションを開始: チェンジフィードは指定されたタイムスタンプ以降のデータのレプリケーションを開始します。指定するタイムスタンプは、ガベージコレクションのセーフポイント内である必要があります。
「データ形式」領域で、希望する Pulsar メッセージの形式を選択します。
Canal-JSONは、解析が容易なプレーンなJSONテキスト形式です。詳細については、 TiCDC Canal-JSON プロトコルご覧ください。
Pulsarメッセージ本文にTiDB拡張フィールドを追加するには、 「TiDB拡張」オプションを有効にしてください。詳細については、 TiCDC Canal-JSON プロトコルの TiDB 拡張フィールド参照してください。
「トピック配布」領域で配布モードを選択し、モードに応じてトピック名の構成を入力します。
配布モードは、すべてのメッセージを 1 つのトピックに送信するか、テーブルまたはデータベースごとに特定のトピックに送信するかによって、変更フィードがイベント メッセージを Pulsar トピックに配布する方法を制御します。
注記:
ダウンストリームとしてPulsarを選択した場合、チェンジフィードはトピックを自動的に作成しません。必要なトピックは事前に作成する必要があります。
すべての変更ログを指定された Pulsar トピックに送信する
チェンジフィードですべてのメッセージを単一のPulsarトピックに送信したい場合は、このモードを選択してください。トピック名フィールドでトピック名を指定できます。
Pulsar Topicsにテーブルごとに変更ログを配布する
各テーブルのすべてのPulsarメッセージを専用のPulsarトピックに送信するように変更フィードを設定する場合は、このモードを選択してください。トピックプレフィックス、データベース名とテーブル名の間のセパレータ、トピックサフィックスを設定することで、テーブルのトピック名を指定できます。例えば、セパレータを
_
に設定すると、Pulsarメッセージは<Topic Prefix><DatabaseName>_<TableName><Topic Suffix>
という形式の名前を持つトピックに送信されます。これらのトピックは事前にPulsar上に作成しておく必要があります。スキーマ作成イベントなどの行以外のイベントの変更ログについては、 「デフォルトのトピック名」フィールドにトピック名を指定できます。変更フィードは、行以外のイベントをこのトピックに送信して、そのような変更ログを収集します。
データベースごとに変更ログをPulsar Topicsに配布する
各データベースのすべてのPulsarメッセージを専用のPulsarトピックに送信するように変更フィードを設定する場合は、このモードを選択してください。トピックプレフィックスとトピックサフィックスを設定することで、データベースのトピック名を指定できます。
解決済みTsイベントなどの行以外のイベントの変更ログについては、 「デフォルトのトピック名」フィールドにトピック名を指定できます。変更フィードは、行以外のイベントをこのトピックに送信して、そのような変更ログを収集します。
Pulsar はマルチテナントをサポートしているため、デフォルトと異なる場合はPulsar テナントとPulsar 名前空間も設定する必要があります。
「パーティション分散」領域では、Pulsarメッセージの送信先パーティションを指定できます。すべてのテーブルに対して単一のパーティションディスパッチャを定義することも、テーブルごとに異なるパーティションディスパッチャを定義することもできます。TiDB TiDB Cloudは、変更イベントをPulsarパーティションに分散するための4つのルールオプションを提供しています。
主キーまたは一意のインデックス
変更フィードによってテーブルのPulsarメッセージを複数のパーティションに送信する場合は、この分散方法を選択してください。行の変更ログの主キーまたはインデックス値によって、変更ログが送信されるパーティションが決まります。この分散方法により、パーティションのバランスが向上し、行レベルの順序性が確保されます。
テーブル
変更フィードでテーブルのPulsarメッセージを単一のPulsarパーティションに送信する場合は、この分散方法を選択してください。行変更ログのテーブル名によって、変更ログが送信されるパーティションが決まります。この分散方法はテーブルの整列性を保証しますが、パーティションの不均衡が生じる可能性があります。
タイムスタンプ
タイムスタンプに基づいて、変更フィードから異なるPulsarパーティションにPulsarメッセージを送信する場合は、この分散方法を選択してください。行変更ログのコミットTによって、変更ログが送信されるパーティションが決まります。この分散方法は、パーティションバランスを改善し、各パーティションの秩序性を確保します。ただし、データ項目の複数の変更が異なるパーティションに送信され、各コンシューマーの進行状況が異なる場合があり、データの不整合が発生する可能性があります。そのため、コンシューマーは複数のパーティションからデータを消費する前に、コミットTでソートする必要があります。
カラムの値
変更フィードによってテーブルのPulsarメッセージを複数のパーティションに送信する場合は、この分散方法を選択してください。行の変更ログの指定された列値によって、変更ログが送信されるパーティションが決まります。この分散方法により、各パーティションの順序性が確保され、同じ列値を持つ変更ログが同じパーティションに送信されることが保証されます。
「次へ」をクリックします。
ステップ4. 仕様の設定とレビュー
仕様と名前のセクションでは、次の操作を行います。
- チェンジフィードの数をレプリケーション容量単位 (RCU)に指定します。
- 変更フィードの名前を入力します。
すべての changefeed 構成を確認します。
- 問題が見つかった場合は、前の手順に戻って問題を解決することができます。
- 問題がなければ、 「送信」をクリックして変更フィードを作成できます。