TiSparkユーザーガイド
TiSparkは、複雑なOLAPクエリに応答するためにTiDB/TiKV上でApacheSparkを実行するために構築されたシンレイヤーです。 Sparkプラットフォームと分散TiKVクラスタの両方を活用し、分散OLTPデータベースであるTiDBにシームレスに接着して、オンライントランザクションと分析の両方のワンストップソリューションとして機能するハイブリッドトランザクション/分析処理(HTAP)ソリューションを提供します。 。
TiFlashは、HTAPを有効にするもう1つのツールです。 TiFlashとTiSparkはどちらも、複数のホストを使用してOLTPデータに対してOLAPクエリを実行できます。 TiFlashはデータを列形式で保存するため、より効率的な分析クエリが可能になります。 TiFlashとTiSparkは一緒に使用できます。
TiSparkは、TiKVクラスタとPDクラスタに依存しています。また、Sparkクラスタをセットアップする必要があります。このドキュメントでは、TiSparkのセットアップと使用方法について簡単に紹介します。 ApacheSparkの基本的な知識が必要です。詳細については、 ApacheSparkのWebサイトを参照してください。
Spark Catalyst Engineと緊密に統合されたTiSparkは、コンピューティングを正確に制御します。これにより、SparkはTiKVからデータを効率的に読み取ることができます。 TiSparkは、高速ポイントクエリを可能にするインデックスシークもサポートしています。
TiSparkは、Spark SQLによって処理されるデータの量を減らすために、コンピューティングをTiKVにプッシュすることにより、データクエリを高速化します。一方、TiSparkは、TiDBの組み込み統計を使用して、最適なクエリプランを選択できます。
TiSparkとTiDBを使用すると、ETLを構築および保守することなく、同じプラットフォームでトランザクションタスクと分析タスクの両方を実行できます。これにより、システムアーキテクチャが簡素化され、メンテナンスのコストが削減されます。
Sparkエコシステムのツールを使用して、TiDBでのデータ処理を行うことができます。
- TiSpark:データ分析とETL
- TiKV:データ検索
- スケジューリングシステム:レポート生成
また、TiSparkはTiKVへの分散書き込みをサポートしています。 SparkおよびJDBCを使用したTiDBへの書き込みと比較して、TiKVへの分散書き込みはトランザクションを実装でき(すべてのデータが正常に書き込まれるか、すべての書き込みが失敗します)、書き込みが高速になります。
環境設定
次の表に、サポートされているTiSparkバージョンの互換性情報を示します。必要に応じてTiSparkバージョンを選択できます。
TiSparkバージョン | TiDB、TiKV、およびPDバージョン | Sparkバージョン | Scalaバージョン |
---|---|---|---|
2.4.x-scala_2.11 | 5.x、4.x | 2.3.x、2.4.x | 2.11 |
2.4.x-scala_2.12 | 5.x、4.x | 2.4.x | 2.12 |
2.5.x | 5.x、4.x | 3.0.x、3.1.x | 2.12 |
TiSparkは、YARN、Mesos、Standaloneなどの任意のSparkモードで実行されます。
推奨される構成
このセクションでは、TiKVとTiSparkの独立した展開、SparkとTiSparkの独立した展開、およびTiKVとTiSparkの同時展開の推奨構成について説明します。
TiUPを使用してTiSparkを展開する方法の詳細については、 TiSpark展開トポロジも参照してください。
TiKVとTiSparkの独立した展開のConfiguration / コンフィグレーション
TiKVとTiSparkを独立して展開するには、次の推奨事項を参照することをお勧めします。
- ハードウェア構成
- 一般的な目的については、TiDBおよびTiKVハードウェア構成推奨事項を参照してください。
- 使用法が分析シナリオに重点を置いている場合は、TiKVノードのメモリを少なくとも64Gに増やすことができます。
SparkとTiSparkの独立した展開のConfiguration / コンフィグレーション
ハードウェアの推奨事項の詳細については、 Spark公式サイトを参照してください。以下は、TiSpark構成の概要です。
Sparkに32Gメモリを割り当て、オペレーティングシステムとバッファキャッシュ用にメモリの少なくとも25%を予約することをお勧めします。
Sparkのマシンごとに少なくとも8〜16コアをプロビジョニングすることをお勧めします。最初に、すべてのCPUコアをSparkに割り当てることができます。
共同展開されたTiKVとTiSparkのConfiguration / コンフィグレーション
TiKVとTiSparkを共同展開するには、TiSparkに必要なリソースをTiKVの予約済みリソースに追加し、メモリの25%をシステムに割り当てます。
TiSparkクラスタをデプロイします
TiSparkのjarパッケージここをダウンロードし、 $SPARKPATH/jars
フォルダーに配置します。
ノート:
TiSpark v2.1.x以前のバージョンのファイル名は、
tispark-core-2.1.9-spark_2.4-jar-with-dependencies.jar
のようになります。必要なバージョンの正確なファイル名については、 GitHubのリリースページを確認してください。
以下は、TiSparkv2.4.1をインストールする方法の簡単な例です。
wget https://github.com/pingcap/tispark/releases/download/v2.4.1/tispark-assembly-2.4.1.jar
mv tispark-assembly-2.4.1.jar $SPARKPATH/jars/
spark-defaults.conf.template
ファイルからspark-defaults.conf
をコピーします。
cp conf/spark-defaults.conf.template conf/spark-defaults.conf
spark-defaults.conf
ファイルに、次の行を追加します。
spark.tispark.pd.addresses $pd_host:$pd_port
spark.sql.extensions org.apache.spark.sql.TiExtensions
spark.tispark.pd.addresses
の構成では、複数のPDサーバーを配置できます。それぞれのポート番号を指定します。たとえば、ポート2379を使用して10.16.20.1,10.16.20.2,10.16.20.3
に複数のPDサーバーがある場合は、 10.16.20.1:2379,10.16.20.2:2379,10.16.20.3:2379
として配置します。
ノート:
TiSparkが正しく通信できなかった場合は、ファイアウォールの構成を確認してください。ファイアウォールルールを調整するか、必要に応じて無効にすることができます。
既存のSparkクラスタにTiSparkをデプロイ
既存のSparkクラスタでTiSparkを実行する場合、クラスタを再起動する必要はありません。 Sparkの--jars
パラメーターを使用して、依存関係としてTiSparkを導入できます。
spark-shell --jars $TISPARK_FOLDER/tispark-${name_with_version}.jar
SparkクラスタなしでTiSparkをデプロイ
Sparkクラスタがない場合は、スタンドアロンモードを使用することをお勧めします。詳細については、 Sparkスタンドアロンを参照してください。問題が発生した場合は、 Spark公式サイトを参照してください。そして、GitHubで問題を提出するへようこそ。
SparkShellとSparkSQLを使用する
上記のように、TiSparkクラスタを正常に開始したと想定します。次に、 tpch
データベースのlineitem
という名前のテーブルでOLAP分析にSparkSQLを使用する方法について説明します。
192.168.1.101
で利用可能なTiDBサーバーを介してテストデータを生成するには:
tiup bench tpch prepare --host 192.168.1.101 --user root
PDノードが192.168.1.100
、ポート2379
にあると仮定して、次のコマンドを$SPARK_HOME/conf/spark-defaults.conf
に追加します。
spark.tispark.pd.addresses 192.168.1.100:2379
spark.sql.extensions org.apache.spark.sql.TiExtensions
SparkShellを起動します。
./bin/spark-shell
次に、ネイティブApache Sparkの場合と同様に、SparkShellに次のコマンドを入力します。
spark.sql("use tpch")
spark.sql("select count(*) from lineitem").show
結果は次のとおりです。
+-------------+
| Count (1) |
+-------------+
| 2000 |
+-------------+
Spark Shellの他に、SparkSQLも利用できます。 Spark SQLを使用するには、次のコマンドを実行します。
./bin/spark-sql
同じクエリを実行できます。
use tpch;
select count(*) from lineitem;
結果は次のとおりです。
2000
Time taken: 0.673 seconds, Fetched 1 row(s)
ThriftServerでJDBCサポートを使用する
JDBCサポートなしでSparkShellまたはSparkSQLを使用できます。ただし、beelineなどのツールにはJDBCサポートが必要です。 JDBCサポートは、Thriftサーバーによって提供されます。 SparkのThriftサーバーを使用するには、次のコマンドを実行します。
./sbin/start-thriftserver.sh
JDBCをThriftサーバーに接続するには、beelineなどのJDBC対応ツールを使用できます。
たとえば、beelineで使用するには:
./bin/beeline jdbc:hive2://localhost:10000
次のメッセージが表示された場合は、beelineが正常に有効になっています。
Beeline version 1.2.2 by Apache Hive
次に、クエリコマンドを実行できます。
1: jdbc:hive2://localhost:10000> use testdb;
+---------+--+
| Result |
+---------+--+
+---------+--+
No rows selected (0.013 seconds)
select count(*) from account;
+-----------+--+
| count(1) |
+-----------+--+
| 1000000 |
+-----------+--+
1 row selected (1.97 seconds)
TiSparkをHiveと一緒に使用する
TiSparkはHiveと一緒に使用できます。 Sparkを起動する前に、 HADOOP_CONF_DIR
の環境変数をHadoop構成フォルダーに設定し、 hive-site.xml
をspark/conf
フォルダーにコピーする必要があります。
val tisparkDF = spark.sql("select * from tispark_table").toDF
tisparkDF.write.saveAsTable("hive_table") // save table to hive
spark.sql("select * from hive_table a, tispark_table b where a.col1 = b.col1").show // join table across Hive and Tispark
TiSparkを使用してDataFrameをTiDBにバッチ書き込みします
v2.3以降、TiSparkはデータフレームのTiDBクラスターへのバッチ書き込みをネイティブにサポートします。この書き込みモードは、TiKVの2フェーズコミットプロトコルを介して実装されます。
Spark + JDBCを介した書き込みと比較して、TiSparkバッチ書き込みには次の利点があります。
比較する側面 | TiSparkバッチ書き込み | Spark+JDBC書き込み |
---|---|---|
アトミシティ | DataFrameはすべて正常に書き込まれるか、すべて失敗します。 | Sparkタスクが失敗し、書き込みプロセス中に終了した場合、データの一部が正常に書き込まれている可能性があります。 |
隔離 | 書き込みプロセス中、書き込まれているデータは他のトランザクションからは見えません。 | 書き込みプロセス中に、正常に書き込まれたデータの一部が他のトランザクションに表示されます。 |
エラー回復 | バッチ書き込みが失敗した場合は、Sparkを再実行するだけで済みます。 | べき等を達成するには、アプリケーションが必要です。たとえば、バッチ書き込みが失敗した場合は、正常に書き込まれたデータの一部をクリーンアップして、Sparkを再実行する必要があります。タスクの再試行によるデータの重複を防ぐには、 spark.task.maxFailures=1 を設定する必要があります。 |
スピード | データはTiKVに直接書き込まれます。TiKVの方が高速です。 | データはTiDBを介してTiKVに書き込まれ、速度に影響します。 |
次の例は、scalaAPIを介してTiSparkを使用してデータをバッチ書き込みする方法を示しています。
// select data to write
val df = spark.sql("select * from tpch.ORDERS")
// write data to tidb
df.write.
format("tidb").
option("tidb.addr", "127.0.0.1").
option("tidb.port", "4000").
option("tidb.user", "root").
option("tidb.password", "").
option("database", "tpch").
option("table", "target_orders").
mode("append").
save()
書き込むデータ量が多く、書き込み時間が10分を超える場合は、GC時間が書き込み時間より長くなるようにする必要があります。
UPDATE mysql.tidb SET VARIABLE_VALUE="6h" WHERE VARIABLE_NAME="tikv_gc_life_time";
詳細はこのドキュメントを参照してください。
JDBCを使用してSparkデータフレームをTiDBにロードします
TiSparkを使用してDataFrameをTiDBクラスタにバッチ書き込みすることに加えて、データ書き込みにSparkのネイティブJDBCサポートを使用することもできます。
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
val customer = spark.sql("select * from customer limit 100000")
// You might repartition the source to make it balance across nodes
// and increase the concurrency.
val df = customer.repartition(32)
df.write
.mode(saveMode = "append")
.format("jdbc")
.option("driver", "com.mysql.jdbc.Driver")
// Replace the host and port with that of your own and be sure to use the rewrite batch
.option("url", "jdbc:mysql://127.0.0.1:4000/test?rewriteBatchedStatements=true")
.option("useSSL", "false")
// As tested, 150 is good practice
.option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 150)
.option("dbtable", s"cust_test_select") // database name and table name here
.option("isolationLevel", "NONE") // recommended to set isolationLevel to NONE if you have a large DF to load.
.option("user", "root") // TiDB user here
.save()
TiDB OOMにつながる可能性のある大規模な単一トランザクションを回避するために、 isolationLevel
からNONE
に設定することをお勧めします。
ノート:
JDBCを使用する場合、デフォルト値の
isolationLevel
はREAD_UNCOMMITTED
です。これにより、サポートされていない分離レベルのトランザクションのエラーが発生します。isolationLevel
の値を設定することをお勧めしNONE
。
統計情報
TiSparkは、次の項目にTiDB統計情報を使用します。
- 推定最小コストでクエリプランのどのインデックスを使用するかを決定します。
- 効率的な放送参加を可能にする小さなテーブル放送。
TiSparkで統計情報を使用する場合は、最初に、関連するテーブルがすでに分析されていることを確認する必要があります。 テーブルを分析する方法についてもっと読む。
TiSpark 2.0以降、統計情報はデフォルトで自動ロードされます。
安全
TiSpark v2.5.0以降のバージョンを使用している場合は、TiDBを使用してTiSparkユーザーを認証および承認できます。
認証および承認機能はデフォルトで無効になっています。これを有効にするには、次の構成をSpark構成ファイルspark-defaults.conf
に追加します。
// Enable authentication and authorization
spark.sql.auth.enable true
// Configure TiDB information
spark.sql.tidb.addr $your_tidb_server_address
spark.sql.tidb.port $your_tidb_server_port
spark.sql.tidb.user $your_tidb_server_user
spark.sql.tidb.password $your_tidb_server_password
詳細については、 TiDBサーバーによる承認と認証を参照してください。
ノート:
認証および承認機能を有効にすると、TiSpark Spark SQLはデータソースとしてTiDBのみを使用できるため、他のデータソース(Hiveなど)に切り替えるとテーブルが非表示になります。
TiSpark FAQ
Q:既存のSpark / Hadoopクラスタとの共有リソースとは対照的に、独立したデプロイメントの長所/短所は何ですか?
A:個別のデプロイなしで既存のSparkクラスタを使用できますが、既存のクラスタがビジーの場合、TiSparkは目的の速度を達成できません。
Q:SparkをTiKVと混合できますか?
A:TiDBとTiKVが過負荷になり、重要なオンラインタスクを実行する場合は、TiSparkを個別にデプロイすることを検討してください。また、OLTPのネットワークリソースが危険にさらされてオンラインビジネスに影響を与えないように、さまざまなNICの使用を検討する必要があります。オンラインビジネスの要件が高くない場合、または負荷が十分に大きくない場合は、TiSparkとTiKVの展開を混在させることを検討できます。
Q:TiSparkを使用してSQLステートメントを実行するときにwarning:WARN ObjectStore:568 - Failed to get database
が返された場合、どうすればよいですか?
A:この警告は無視してかまいません。これは、Sparkがカタログに存在しない2つのデータベース( default
とglobal_temp
)を読み込もうとしたために発生します。この警告をミュートする場合は、 tispark/conf
のlog4j
ファイルにlog4j.logger.org.apache.hadoop.hive.metastore.ObjectStore=ERROR
を追加してlog4jを変更します。 Sparkの下のconfig
のlog4j
ファイルにパラメータを追加できます。サフィックスがtemplate
の場合、 mv
コマンドを使用してproperties
に変更できます。
Q:TiSparkを使用してSQLステートメントを実行するときにjava.sql.BatchUpdateException: Data Truncated
が返された場合、どうすればよいですか?
A:このエラーは、書き込まれたデータの長さがデータベースで定義されたデータ型の長さを超えているために発生します。フィールドの長さを確認し、それに応じて調整できます。
Q:TiSparkはデフォルトでHiveメタデータを読み取りますか?
A:デフォルトでは、TiSparkはhive-siteのHiveメタデータを読み取ることによってHiveデータベースを検索します。検索タスクが失敗した場合は、代わりにTiDBメタデータを読み取ってTiDBデータベースを検索します。
このデフォルトの動作が必要ない場合は、hive-siteでHiveメタデータを構成しないでください。
Q:TiSparkがSparkタスクを実行しているときにError:java.io.InvalidClassException: com.pingcap.tikv.region.TiRegion; local class incompatible: stream classdesc serialVersionUID ...
が返された場合、どうすればよいですか?
A:エラーメッセージはserialVersionUID
の競合を示しています。これは、 class
つとTiRegion
の異なるバージョンを使用したために発生します。 TiRegion
はTiSparkにのみ存在するため、TiSparkパッケージの複数のバージョンが使用される可能性があります。このエラーを修正するには、TiSpark依存関係のバージョンがクラスタのすべてのノード間で一貫していることを確認する必要があります。