トランザクション制限
この文書では、TiDBにおけるトランザクション制約について簡単に説明します。
隔離レベル
TiDBがサポートする分離レベルは、RC(Read Committed)とSI(Snapshot Isolation)であり、 SIは基本的にRR(Repeatable Read)分離レベルと同等です。
スナップショット分離により、ファントムリードを回避できます。
TiDB のSI分離レベルではファントム リードを回避できますが、ANSI/ISO SQL 標準のRRでは回避できません。
以下の2つの例は、ファントムリードがどのようなものかを示しています。
例 1:トランザクションA は、まずクエリに従って
n行を取得し、次にトランザクションB は、これらのm行以外のn行を変更するか、トランザクションAのクエリに一致するm行を追加します。トランザクションAが再度クエリを実行すると、条件に一致するn+m行が存在することがわかります。これはファントムのようなものなので、ファントム リードと呼ばれます。例2:管理者Aがデータベース内のすべての学生の成績を特定の点数からABCDEの成績に変更しますが、このとき管理者Bが特定の点数のレコードを挿入します。管理者Aが変更を終え、まだ変更されていないレコード(管理者Bが挿入したレコード)が残っていることに気付いた場合、それはファントムリードです。
SIは書き込みスキューを回避できない
TiDBのSI分離レベルでは、書き込みスキュー例外を回避できません。書き込みスキュー例外を回避するには、 SELECT FOR UPDATE構文を使用してください。
書き込みスキュー例外は、2つの同時実行トランザクションが異なる関連性のあるレコードを読み取り、各トランザクションが読み取ったデータを更新して最終的にトランザクションをコミットしたときに発生します。これらの関連レコード間に、複数のトランザクションによって同時に変更できない制約が存在する場合、最終結果はその制約に違反することになります。
例えば、病院の医師の勤務シフト管理プログラムを作成するとします。病院では通常、複数の医師が同時に待機する必要がありますが、最低限必要なのは少なくとも1人の医師が待機していることです。医師は、少なくとも1人の医師が待機している限り、勤務をキャンセルすることができます(例えば、体調不良の場合など)。
現在、医師AliceとBobが当直中です。2人とも体調が悪く、病気休暇を取ることにしました。偶然にも、2人は同時にボタンをクリックしました。次のプログラムでこのプロセスをシミュレートしてみましょう。
package com.pingcap.txn.write.skew;
import com.zaxxer.hikari.HikariDataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class EffectWriteSkew {
public static void main(String[] args) throws SQLException, InterruptedException {
HikariDataSource ds = new HikariDataSource();
ds.setJdbcUrl("jdbc:mysql://localhost:4000/test?useServerPrepStmts=true&cachePrepStmts=true");
ds.setUsername("root");
// prepare data
Connection connection = ds.getConnection();
createDoctorTable(connection);
createDoctor(connection, 1, "Alice", true, 123);
createDoctor(connection, 2, "Bob", true, 123);
createDoctor(connection, 3, "Carol", false, 123);
Semaphore txn1Pass = new Semaphore(0);
CountDownLatch countDownLatch = new CountDownLatch(2);
ExecutorService threadPool = Executors.newFixedThreadPool(2);
threadPool.execute(() -> {
askForLeave(ds, txn1Pass, 1, 1);
countDownLatch.countDown();
});
threadPool.execute(() -> {
askForLeave(ds, txn1Pass, 2, 2);
countDownLatch.countDown();
});
countDownLatch.await();
}
public static void createDoctorTable(Connection connection) throws SQLException {
connection.createStatement().executeUpdate("CREATE TABLE `doctors` (" +
" `id` int NOT NULL," +
" `name` varchar(255) DEFAULT NULL," +
" `on_call` tinyint DEFAULT NULL," +
" `shift_id` int DEFAULT NULL," +
" PRIMARY KEY (`id`)," +
" KEY `idx_shift_id` (`shift_id`)" +
" ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin");
}
public static void createDoctor(Connection connection, Integer id, String name, Boolean onCall, Integer shiftID) throws SQLException {
PreparedStatement insert = connection.prepareStatement(
"INSERT INTO `doctors` (`id`, `name`, `on_call`, `shift_id`) VALUES (?, ?, ?, ?)");
insert.setInt(1, id);
insert.setString(2, name);
insert.setBoolean(3, onCall);
insert.setInt(4, shiftID);
insert.executeUpdate();
}
public static void askForLeave(HikariDataSource ds, Semaphore txn1Pass, Integer txnID, Integer doctorID) {
try(Connection connection = ds.getConnection()) {
try {
connection.setAutoCommit(false);
String comment = txnID == 2 ? " " : "" + "/* txn #{txn_id} */ ";
connection.createStatement().executeUpdate(comment + "BEGIN");
// Txn 1 should be waiting for txn 2 done
if (txnID == 1) {
txn1Pass.acquire();
}
PreparedStatement currentOnCallQuery = connection.prepareStatement(comment +
"SELECT COUNT(*) AS `count` FROM `doctors` WHERE `on_call` = ? AND `shift_id` = ?");
currentOnCallQuery.setBoolean(1, true);
currentOnCallQuery.setInt(2, 123);
ResultSet res = currentOnCallQuery.executeQuery();
if (!res.next()) {
throw new RuntimeException("error query");
} else {
int count = res.getInt("count");
if (count >= 2) {
// If current on-call doctor has 2 or more, this doctor can leave
PreparedStatement insert = connection.prepareStatement( comment +
"UPDATE `doctors` SET `on_call` = ? WHERE `id` = ? AND `shift_id` = ?");
insert.setBoolean(1, false);
insert.setInt(2, doctorID);
insert.setInt(3, 123);
insert.executeUpdate();
connection.commit();
} else {
throw new RuntimeException("At least one doctor is on call");
}
}
// Txn 2 done, let txn 1 run again
if (txnID == 2) {
txn1Pass.release();
}
} catch (Exception e) {
// If got any error, you should roll back, data is priceless
connection.rollback();
e.printStackTrace();
}
} catch (SQLException e) {
e.printStackTrace();
}
}
}
TiDB トランザクションを適応させるには、次のコードに従ってユーティリティを作成します。
package main
import (
"database/sql"
"fmt"
"sync"
"github.com/pingcap-inc/tidb-example-golang/util"
_ "github.com/go-sql-driver/mysql"
)
func main() {
openDB("mysql", "root:@tcp(127.0.0.1:4000)/test", func(db *sql.DB) {
writeSkew(db)
})
}
func openDB(driverName, dataSourceName string, runnable func(db *sql.DB)) {
db, err := sql.Open(driverName, dataSourceName)
if err != nil {
panic(err)
}
defer db.Close()
runnable(db)
}
func writeSkew(db *sql.DB) {
err := prepareData(db)
if err != nil {
panic(err)
}
waitingChan, waitGroup := make(chan bool), sync.WaitGroup{}
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
err = askForLeave(db, waitingChan, 1, 1)
if err != nil {
panic(err)
}
}()
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
err = askForLeave(db, waitingChan, 2, 2)
if err != nil {
panic(err)
}
}()
waitGroup.Wait()
}
func askForLeave(db *sql.DB, waitingChan chan bool, goroutineID, doctorID int) error {
txnComment := fmt.Sprintf("/* txn %d */ ", goroutineID)
if goroutineID != 1 {
txnComment = "\t" + txnComment
}
txn, err := util.TiDBSqlBegin(db, true)
if err != nil {
return err
}
fmt.Println(txnComment + "start txn")
// Txn 1 should be waiting until txn 2 is done.
if goroutineID == 1 {
<-waitingChan
}
txnFunc := func() error {
queryCurrentOnCall := "SELECT COUNT(*) AS `count` FROM `doctors` WHERE `on_call` = ? AND `shift_id` = ?"
rows, err := txn.Query(queryCurrentOnCall, true, 123)
if err != nil {
return err
}
defer rows.Close()
fmt.Println(txnComment + queryCurrentOnCall + " successful")
count := 0
if rows.Next() {
err = rows.Scan(&count)
if err != nil {
return err
}
}
rows.Close()
if count < 2 {
return fmt.Errorf("at least one doctor is on call")
}
shift := "UPDATE `doctors` SET `on_call` = ? WHERE `id` = ? AND `shift_id` = ?"
_, err = txn.Exec(shift, false, doctorID, 123)
if err == nil {
fmt.Println(txnComment + shift + " successful")
}
return err
}
err = txnFunc()
if err == nil {
txn.Commit()
fmt.Println("[runTxn] commit success")
} else {
txn.Rollback()
fmt.Printf("[runTxn] got an error, rollback: %+v\n", err)
}
// Txn 2 is done. Let txn 1 run again.
if goroutineID == 2 {
waitingChan <- true
}
return nil
}
func prepareData(db *sql.DB) error {
err := createDoctorTable(db)
if err != nil {
return err
}
err = createDoctor(db, 1, "Alice", true, 123)
if err != nil {
return err
}
err = createDoctor(db, 2, "Bob", true, 123)
if err != nil {
return err
}
err = createDoctor(db, 3, "Carol", false, 123)
if err != nil {
return err
}
return nil
}
func createDoctorTable(db *sql.DB) error {
_, err := db.Exec("CREATE TABLE IF NOT EXISTS `doctors` (" +
" `id` int NOT NULL," +
" `name` varchar(255) DEFAULT NULL," +
" `on_call` tinyint DEFAULT NULL," +
" `shift_id` int DEFAULT NULL," +
" PRIMARY KEY (`id`)," +
" KEY `idx_shift_id` (`shift_id`)" +
" ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")
return err
}
func createDoctor(db *sql.DB, id int, name string, onCall bool, shiftID int) error {
_, err := db.Exec("INSERT INTO `doctors` (`id`, `name`, `on_call`, `shift_id`) VALUES (?, ?, ?, ?)",
id, name, onCall, shiftID)
return err
}
SQLログ:
/* txn 1 */ BEGIN
/* txn 2 */ BEGIN
/* txn 2 */ SELECT COUNT(*) as `count` FROM `doctors` WHERE `on_call` = 1 AND `shift_id` = 123
/* txn 2 */ UPDATE `doctors` SET `on_call` = 0 WHERE `id` = 2 AND `shift_id` = 123
/* txn 2 */ COMMIT
/* txn 1 */ SELECT COUNT(*) AS `count` FROM `doctors` WHERE `on_call` = 1 and `shift_id` = 123
/* txn 1 */ UPDATE `doctors` SET `on_call` = 0 WHERE `id` = 1 AND `shift_id` = 123
/* txn 1 */ COMMIT
実行結果:
mysql> SELECT * FROM doctors;
+----+-------+---------+----------+
| id | name | on_call | shift_id |
+----+-------+---------+----------+
| 1 | Alice | 0 | 123 |
| 2 | Bob | 0 | 123 |
| 3 | Carol | 0 | 123 |
+----+-------+---------+----------+
どちらのトランザクションでも、アプリケーションはまず 2 人以上の医師が待機しているかどうかを確認します。待機している場合は、1 人の医師が安全に休暇を取れると想定します。データベースはスナップショット分離を使用しているため、どちらのチェックも2を返すため、両方のトランザクションは次のステージに進みます。 Alice自分のレコードを非番に更新し、 Bobも同様に更新します。両方のトランザクションは正常にコミットされます。これで、待機中の医師がいなくなり、少なくとも 1 人の医師が待機している必要があるという要件に違反します。次の図 ( Designing Data-Intensive Applicationsから引用) は、実際に何が起こるかを示しています。
それでは、書き込みのずれの問題を回避するために、サンプルプログラムをSELECT FOR UPDATEを使用するように変更してみましょう。
package com.pingcap.txn.write.skew;
import com.zaxxer.hikari.HikariDataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class EffectWriteSkew {
public static void main(String[] args) throws SQLException, InterruptedException {
HikariDataSource ds = new HikariDataSource();
ds.setJdbcUrl("jdbc:mysql://localhost:4000/test?useServerPrepStmts=true&cachePrepStmts=true");
ds.setUsername("root");
// prepare data
Connection connection = ds.getConnection();
createDoctorTable(connection);
createDoctor(connection, 1, "Alice", true, 123);
createDoctor(connection, 2, "Bob", true, 123);
createDoctor(connection, 3, "Carol", false, 123);
Semaphore txn1Pass = new Semaphore(0);
CountDownLatch countDownLatch = new CountDownLatch(2);
ExecutorService threadPool = Executors.newFixedThreadPool(2);
threadPool.execute(() -> {
askForLeave(ds, txn1Pass, 1, 1);
countDownLatch.countDown();
});
threadPool.execute(() -> {
askForLeave(ds, txn1Pass, 2, 2);
countDownLatch.countDown();
});
countDownLatch.await();
}
public static void createDoctorTable(Connection connection) throws SQLException {
connection.createStatement().executeUpdate("CREATE TABLE `doctors` (" +
" `id` int NOT NULL," +
" `name` varchar(255) DEFAULT NULL," +
" `on_call` tinyint DEFAULT NULL," +
" `shift_id` int DEFAULT NULL," +
" PRIMARY KEY (`id`)," +
" KEY `idx_shift_id` (`shift_id`)" +
" ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin");
}
public static void createDoctor(Connection connection, Integer id, String name, Boolean onCall, Integer shiftID) throws SQLException {
PreparedStatement insert = connection.prepareStatement(
"INSERT INTO `doctors` (`id`, `name`, `on_call`, `shift_id`) VALUES (?, ?, ?, ?)");
insert.setInt(1, id);
insert.setString(2, name);
insert.setBoolean(3, onCall);
insert.setInt(4, shiftID);
insert.executeUpdate();
}
public static void askForLeave(HikariDataSource ds, Semaphore txn1Pass, Integer txnID, Integer doctorID) {
try(Connection connection = ds.getConnection()) {
try {
connection.setAutoCommit(false);
String comment = txnID == 2 ? " " : "" + "/* txn #{txn_id} */ ";
connection.createStatement().executeUpdate(comment + "BEGIN");
// Txn 1 should be waiting for txn 2 done
if (txnID == 1) {
txn1Pass.acquire();
}
PreparedStatement currentOnCallQuery = connection.prepareStatement(comment +
"SELECT COUNT(*) AS `count` FROM `doctors` WHERE `on_call` = ? AND `shift_id` = ? FOR UPDATE");
currentOnCallQuery.setBoolean(1, true);
currentOnCallQuery.setInt(2, 123);
ResultSet res = currentOnCallQuery.executeQuery();
if (!res.next()) {
throw new RuntimeException("error query");
} else {
int count = res.getInt("count");
if (count >= 2) {
// If current on-call doctor has 2 or more, this doctor can leave
PreparedStatement insert = connection.prepareStatement( comment +
"UPDATE `doctors` SET `on_call` = ? WHERE `id` = ? AND `shift_id` = ?");
insert.setBoolean(1, false);
insert.setInt(2, doctorID);
insert.setInt(3, 123);
insert.executeUpdate();
connection.commit();
} else {
throw new RuntimeException("At least one doctor is on call");
}
}
// Txn 2 done, let txn 1 run again
if (txnID == 2) {
txn1Pass.release();
}
} catch (Exception e) {
// If got any error, you should roll back, data is priceless
connection.rollback();
e.printStackTrace();
}
} catch (SQLException e) {
e.printStackTrace();
}
}
}
package main
import (
"database/sql"
"fmt"
"sync"
"github.com/pingcap-inc/tidb-example-golang/util"
_ "github.com/go-sql-driver/mysql"
)
func main() {
openDB("mysql", "root:@tcp(127.0.0.1:4000)/test", func(db *sql.DB) {
writeSkew(db)
})
}
func openDB(driverName, dataSourceName string, runnable func(db *sql.DB)) {
db, err := sql.Open(driverName, dataSourceName)
if err != nil {
panic(err)
}
defer db.Close()
runnable(db)
}
func writeSkew(db *sql.DB) {
err := prepareData(db)
if err != nil {
panic(err)
}
waitingChan, waitGroup := make(chan bool), sync.WaitGroup{}
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
err = askForLeave(db, waitingChan, 1, 1)
if err != nil {
panic(err)
}
}()
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
err = askForLeave(db, waitingChan, 2, 2)
if err != nil {
panic(err)
}
}()
waitGroup.Wait()
}
func askForLeave(db *sql.DB, waitingChan chan bool, goroutineID, doctorID int) error {
txnComment := fmt.Sprintf("/* txn %d */ ", goroutineID)
if goroutineID != 1 {
txnComment = "\t" + txnComment
}
txn, err := util.TiDBSqlBegin(db, true)
if err != nil {
return err
}
fmt.Println(txnComment + "start txn")
// Txn 1 should be waiting until txn 2 is done.
if goroutineID == 1 {
<-waitingChan
}
txnFunc := func() error {
queryCurrentOnCall := "SELECT COUNT(*) AS `count` FROM `doctors` WHERE `on_call` = ? AND `shift_id` = ?"
rows, err := txn.Query(queryCurrentOnCall, true, 123)
if err != nil {
return err
}
defer rows.Close()
fmt.Println(txnComment + queryCurrentOnCall + " successful")
count := 0
if rows.Next() {
err = rows.Scan(&count)
if err != nil {
return err
}
}
rows.Close()
if count < 2 {
return fmt.Errorf("at least one doctor is on call")
}
shift := "UPDATE `doctors` SET `on_call` = ? WHERE `id` = ? AND `shift_id` = ?"
_, err = txn.Exec(shift, false, doctorID, 123)
if err == nil {
fmt.Println(txnComment + shift + " successful")
}
return err
}
err = txnFunc()
if err == nil {
txn.Commit()
fmt.Println("[runTxn] commit success")
} else {
txn.Rollback()
fmt.Printf("[runTxn] got an error, rollback: %+v\n", err)
}
// Txn 2 is done. Let txn 1 run again.
if goroutineID == 2 {
waitingChan <- true
}
return nil
}
func prepareData(db *sql.DB) error {
err := createDoctorTable(db)
if err != nil {
return err
}
err = createDoctor(db, 1, "Alice", true, 123)
if err != nil {
return err
}
err = createDoctor(db, 2, "Bob", true, 123)
if err != nil {
return err
}
err = createDoctor(db, 3, "Carol", false, 123)
if err != nil {
return err
}
return nil
}
func createDoctorTable(db *sql.DB) error {
_, err := db.Exec("CREATE TABLE IF NOT EXISTS `doctors` (" +
" `id` int NOT NULL," +
" `name` varchar(255) DEFAULT NULL," +
" `on_call` tinyint DEFAULT NULL," +
" `shift_id` int DEFAULT NULL," +
" PRIMARY KEY (`id`)," +
" KEY `idx_shift_id` (`shift_id`)" +
" ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")
return err
}
func createDoctor(db *sql.DB, id int, name string, onCall bool, shiftID int) error {
_, err := db.Exec("INSERT INTO `doctors` (`id`, `name`, `on_call`, `shift_id`) VALUES (?, ?, ?, ?)",
id, name, onCall, shiftID)
return err
}
SQLログ:
/* txn 1 */ BEGIN
/* txn 2 */ BEGIN
/* txn 2 */ SELECT COUNT(*) AS `count` FROM `doctors` WHERE on_call = 1 AND `shift_id` = 123 FOR UPDATE
/* txn 2 */ UPDATE `doctors` SET on_call = 0 WHERE `id` = 2 AND `shift_id` = 123
/* txn 2 */ COMMIT
/* txn 1 */ SELECT COUNT(*) AS `count` FROM `doctors` WHERE `on_call` = 1 FOR UPDATE
At least one doctor is on call
/* txn 1 */ ROLLBACK
実行結果:
mysql> SELECT * FROM doctors;
+----+-------+---------+----------+
| id | name | on_call | shift_id |
+----+-------+---------+----------+
| 1 | Alice | 1 | 123 |
| 2 | Bob | 0 | 123 |
| 3 | Carol | 0 | 123 |
+----+-------+---------+----------+
savepointとネストされたトランザクションのサポート
注記:
TiDBはv6.2.0以降、
savepoint機能をサポートしています。TiDBのバージョンがv6.2.0より前の場合、PROPAGATION_NESTEDの動作はサポートされていません。v6.2.0以降のバージョンにアップグレードすることをお勧めします。TiDBのアップグレードが不可能な場合、アプリケーションがPROPAGATION_NESTED伝播動作を使用するJava Springフレームワークに基づいている場合は、ネストされたトランザクションのロジックを削除するようにアプリケーション側で対応する必要があります。
SpringがサポートするPROPAGATION_NESTED伝播動作は、ネストされたトランザクションをトリガーします。これは、現在のトランザクションとは独立して開始される子トランザクションです。ネストされたトランザクションが開始されると、 savepointが記録されます。ネストされたトランザクションが失敗した場合、トランザクションはsavepointの状態にロールバックされます。ネストされたトランザクションは外側のトランザクションの一部であり、外側のトランザクションと一緒にコミットされます。
以下の例はsavepointメカニズムを示しています。
mysql> BEGIN;
mysql> INSERT INTO T2 VALUES(100);
mysql> SAVEPOINT svp1;
mysql> INSERT INTO T2 VALUES(200);
mysql> ROLLBACK TO SAVEPOINT svp1;
mysql> RELEASE SAVEPOINT svp1;
mysql> COMMIT;
mysql> SELECT * FROM T2;
+------+
| ID |
+------+
| 100 |
+------+
大口取引制限
基本原則は、トランザクションのサイズを制限することです。KVレベルでは、TiDBは単一トランザクションのサイズに制限を設けています。SQLレベルでは、1行のデータが1つのKVエントリにマッピングされ、インデックスを追加するたびに1つのKVエントリが追加されます。SQLレベルでの制限は以下のとおりです。
1行あたりのレコードの最大サイズは120MiBです。
- TiDB v4.0.10 以降、v4.0.x バージョン、および TiDB v5.0.0 以降のバージョンでは、tidb-server の
performance.txn-entry-size-limit設定パラメータを使用して調整できます。v4.0.10 より前のバージョンでは、値は6 MBです。 - バージョン7.6.0以降では、
tidb_txn_entry_size_limitシステム変数を使用して、この設定項目の値を動的に変更できます。
- TiDB v4.0.10 以降、v4.0.x バージョン、および TiDB v5.0.0 以降のバージョンでは、tidb-server の
単一トランザクションでサポートされる最大サイズは1 TiBです。
- TiDB v4.0 以降のバージョンでは、
performance.txn-total-size-limitで設定できます。以前のバージョンでは、値は100 MBです。 - TiDB v6.5.0以降のバージョンでは、この設定は推奨されなくなりました。詳細については、
performance.txn-total-size-limit参照してください。
- TiDB v4.0 以降のバージョンでは、
サイズ制限と行数制限の両方において、トランザクション実行時のエンコード処理とトランザクション用の追加キーのオーバーヘッドも考慮する必要があることに注意してください。最適なパフォーマンスを実現するには、100~500行ごとに1つのトランザクションを書き込むことをお勧めします。
自動コミットされたSELECT FOR UPDATEステートメントはロックを待機しません
現在、自動コミットされたSELECT FOR UPDATEステートメントにはロックが追加されません。次のスクリーンショットは、2 つの別々のセッションでの影響を示しています。
これはMySQLとの互換性に関する既知の問題です。 BEGIN;COMMIT;ステートメントを明示的に使用することで、この問題を解決できます。
お困りですか?
- 不和or スラックコミュニティに質問してください。
- TiDB Cloudのサポートチケットを送信してください
- TiDB Self-Managedのサポートチケットを送信してください


