取引トランザクション
このドキュメントでは、TiDB のトランザクション制約について簡単に説明します。
隔離レベル
TiDB でサポートされている分離レベルはRC (Read Committed)とSI (Snapshot Isolation)です。SIは基本的にRR (Repeatable Read)分離レベルと同等です。
スナップショット分離によりファントムリードを回避できる
TiDB の分離レベルSI
ではファントム リードを回避できますが、ANSI/ISO SQL 標準のRR
では回避できません。
次の 2 つの例は、ファントム リードが何であるかを示しています。
例 1:トランザクションA は最初にクエリに従って
n
行を取得し、次にトランザクションB はこれらのn
行以外のm
行を変更するか、トランザクションAのクエリに一致するm
行を追加します。トランザクションA が再度クエリを実行すると、条件に一致する行がn+m
行あることがわかります。これはファントムのようなもので、ファントム リードと呼ばれます。例 2:管理者 A は、データベース内のすべての生徒の成績を特定のスコアから ABCDE の成績に変更しますが、管理者 B は、この時点で特定のスコアのレコードを挿入します。管理者 A が変更を終了し、まだ変更されていないレコード (管理者 Bによって挿入されたレコード) が残っていることに気付きます。これがファントム リードです。
SIは書き込みスキューを回避できない
TiDB の SI 分離レベルでは、書き込みスキュー例外を回避できません。書き込みスキュー例外を回避するには、 SELECT FOR UPDATE
構文を使用します。
書き込みスキュー例外は、2 つの同時トランザクションが異なるが関連するレコードを読み取り、各トランザクションが読み取ったデータを更新して最終的にトランザクションをコミットするときに発生します。これらの関連レコード間に、複数のトランザクションによって同時に変更できない制約がある場合、最終結果は制約に違反します。
たとえば、病院の医師シフト管理プログラムを作成するとします。病院では通常、複数の医師が同時に待機する必要がありますが、最低要件は少なくとも 1 人の医師が待機していることです。医師は、そのシフト中に少なくとも 1 人の医師が待機している限り、シフトを中止できます (たとえば、体調が悪い場合)。
ここで、医師Alice
とBob
が待機している状況があります。2 人とも体調が悪く、病気休暇を取ることにしました。偶然、2 人は同時にボタンをクリックします。次のプログラムでこのプロセスをシミュレートしてみましょう。
- Java
- Golang
package com.pingcap.txn.write.skew;
import com.zaxxer.hikari.HikariDataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class EffectWriteSkew {
public static void main(String[] args) throws SQLException, InterruptedException {
HikariDataSource ds = new HikariDataSource();
ds.setJdbcUrl("jdbc:mysql://localhost:4000/test?useServerPrepStmts=true&cachePrepStmts=true");
ds.setUsername("root");
// prepare data
Connection connection = ds.getConnection();
createDoctorTable(connection);
createDoctor(connection, 1, "Alice", true, 123);
createDoctor(connection, 2, "Bob", true, 123);
createDoctor(connection, 3, "Carol", false, 123);
Semaphore txn1Pass = new Semaphore(0);
CountDownLatch countDownLatch = new CountDownLatch(2);
ExecutorService threadPool = Executors.newFixedThreadPool(2);
threadPool.execute(() -> {
askForLeave(ds, txn1Pass, 1, 1);
countDownLatch.countDown();
});
threadPool.execute(() -> {
askForLeave(ds, txn1Pass, 2, 2);
countDownLatch.countDown();
});
countDownLatch.await();
}
public static void createDoctorTable(Connection connection) throws SQLException {
connection.createStatement().executeUpdate("CREATE TABLE `doctors` (" +
" `id` int(11) NOT NULL," +
" `name` varchar(255) DEFAULT NULL," +
" `on_call` tinyint(1) DEFAULT NULL," +
" `shift_id` int(11) DEFAULT NULL," +
" PRIMARY KEY (`id`)," +
" KEY `idx_shift_id` (`shift_id`)" +
" ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin");
}
public static void createDoctor(Connection connection, Integer id, String name, Boolean onCall, Integer shiftID) throws SQLException {
PreparedStatement insert = connection.prepareStatement(
"INSERT INTO `doctors` (`id`, `name`, `on_call`, `shift_id`) VALUES (?, ?, ?, ?)");
insert.setInt(1, id);
insert.setString(2, name);
insert.setBoolean(3, onCall);
insert.setInt(4, shiftID);
insert.executeUpdate();
}
public static void askForLeave(HikariDataSource ds, Semaphore txn1Pass, Integer txnID, Integer doctorID) {
try(Connection connection = ds.getConnection()) {
try {
connection.setAutoCommit(false);
String comment = txnID == 2 ? " " : "" + "/* txn #{txn_id} */ ";
connection.createStatement().executeUpdate(comment + "BEGIN");
// Txn 1 should be waiting for txn 2 done
if (txnID == 1) {
txn1Pass.acquire();
}
PreparedStatement currentOnCallQuery = connection.prepareStatement(comment +
"SELECT COUNT(*) AS `count` FROM `doctors` WHERE `on_call` = ? AND `shift_id` = ?");
currentOnCallQuery.setBoolean(1, true);
currentOnCallQuery.setInt(2, 123);
ResultSet res = currentOnCallQuery.executeQuery();
if (!res.next()) {
throw new RuntimeException("error query");
} else {
int count = res.getInt("count");
if (count >= 2) {
// If current on-call doctor has 2 or more, this doctor can leave
PreparedStatement insert = connection.prepareStatement( comment +
"UPDATE `doctors` SET `on_call` = ? WHERE `id` = ? AND `shift_id` = ?");
insert.setBoolean(1, false);
insert.setInt(2, doctorID);
insert.setInt(3, 123);
insert.executeUpdate();
connection.commit();
} else {
throw new RuntimeException("At least one doctor is on call");
}
}
// Txn 2 done, let txn 1 run again
if (txnID == 2) {
txn1Pass.release();
}
} catch (Exception e) {
// If got any error, you should roll back, data is priceless
connection.rollback();
e.printStackTrace();
}
} catch (SQLException e) {
e.printStackTrace();
}
}
}
TiDB トランザクションを適応させるには、次のコードに従ってユーティリティを記述します。
package main
import (
"database/sql"
"fmt"
"sync"
"github.com/pingcap-inc/tidb-example-golang/util"
_ "github.com/go-sql-driver/mysql"
)
func main() {
openDB("mysql", "root:@tcp(127.0.0.1:4000)/test", func(db *sql.DB) {
writeSkew(db)
})
}
func openDB(driverName, dataSourceName string, runnable func(db *sql.DB)) {
db, err := sql.Open(driverName, dataSourceName)
if err != nil {
panic(err)
}
defer db.Close()
runnable(db)
}
func writeSkew(db *sql.DB) {
err := prepareData(db)
if err != nil {
panic(err)
}
waitingChan, waitGroup := make(chan bool), sync.WaitGroup{}
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
err = askForLeave(db, waitingChan, 1, 1)
if err != nil {
panic(err)
}
}()
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
err = askForLeave(db, waitingChan, 2, 2)
if err != nil {
panic(err)
}
}()
waitGroup.Wait()
}
func askForLeave(db *sql.DB, waitingChan chan bool, goroutineID, doctorID int) error {
txnComment := fmt.Sprintf("/* txn %d */ ", goroutineID)
if goroutineID != 1 {
txnComment = "\t" + txnComment
}
txn, err := util.TiDBSqlBegin(db, true)
if err != nil {
return err
}
fmt.Println(txnComment + "start txn")
// Txn 1 should be waiting until txn 2 is done.
if goroutineID == 1 {
<-waitingChan
}
txnFunc := func() error {
queryCurrentOnCall := "SELECT COUNT(*) AS `count` FROM `doctors` WHERE `on_call` = ? AND `shift_id` = ?"
rows, err := txn.Query(queryCurrentOnCall, true, 123)
if err != nil {
return err
}
defer rows.Close()
fmt.Println(txnComment + queryCurrentOnCall + " successful")
count := 0
if rows.Next() {
err = rows.Scan(&count)
if err != nil {
return err
}
}
rows.Close()
if count < 2 {
return fmt.Errorf("at least one doctor is on call")
}
shift := "UPDATE `doctors` SET `on_call` = ? WHERE `id` = ? AND `shift_id` = ?"
_, err = txn.Exec(shift, false, doctorID, 123)
if err == nil {
fmt.Println(txnComment + shift + " successful")
}
return err
}
err = txnFunc()
if err == nil {
txn.Commit()
fmt.Println("[runTxn] commit success")
} else {
txn.Rollback()
fmt.Printf("[runTxn] got an error, rollback: %+v\n", err)
}
// Txn 2 is done. Let txn 1 run again.
if goroutineID == 2 {
waitingChan <- true
}
return nil
}
func prepareData(db *sql.DB) error {
err := createDoctorTable(db)
if err != nil {
return err
}
err = createDoctor(db, 1, "Alice", true, 123)
if err != nil {
return err
}
err = createDoctor(db, 2, "Bob", true, 123)
if err != nil {
return err
}
err = createDoctor(db, 3, "Carol", false, 123)
if err != nil {
return err
}
return nil
}
func createDoctorTable(db *sql.DB) error {
_, err := db.Exec("CREATE TABLE IF NOT EXISTS `doctors` (" +
" `id` int(11) NOT NULL," +
" `name` varchar(255) DEFAULT NULL," +
" `on_call` tinyint(1) DEFAULT NULL," +
" `shift_id` int(11) DEFAULT NULL," +
" PRIMARY KEY (`id`)," +
" KEY `idx_shift_id` (`shift_id`)" +
" ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")
return err
}
func createDoctor(db *sql.DB, id int, name string, onCall bool, shiftID int) error {
_, err := db.Exec("INSERT INTO `doctors` (`id`, `name`, `on_call`, `shift_id`) VALUES (?, ?, ?, ?)",
id, name, onCall, shiftID)
return err
}
SQL ログ:
/* txn 1 */ BEGIN
/* txn 2 */ BEGIN
/* txn 2 */ SELECT COUNT(*) as `count` FROM `doctors` WHERE `on_call` = 1 AND `shift_id` = 123
/* txn 2 */ UPDATE `doctors` SET `on_call` = 0 WHERE `id` = 2 AND `shift_id` = 123
/* txn 2 */ COMMIT
/* txn 1 */ SELECT COUNT(*) AS `count` FROM `doctors` WHERE `on_call` = 1 and `shift_id` = 123
/* txn 1 */ UPDATE `doctors` SET `on_call` = 0 WHERE `id` = 1 AND `shift_id` = 123
/* txn 1 */ COMMIT
実行結果:
mysql> SELECT * FROM doctors;
+----+-------+---------+----------+
| id | name | on_call | shift_id |
+----+-------+---------+----------+
| 1 | Alice | 0 | 123 |
| 2 | Bob | 0 | 123 |
| 3 | Carol | 0 | 123 |
+----+-------+---------+----------+
どちらのトランザクションでも、アプリケーションは最初に 2 人以上の医師が待機中かどうかをチェックします。待機中の場合、1 人の医師は安全に休暇を取ることができると想定します。データベースはスナップショット分離を使用しているため、両方のチェックで2
が返され、両方のトランザクションが次の段階に進みます。 Alice
彼女の記録を非番として更新し、 Bob
も同様に更新します。両方のトランザクションが正常にコミットされました。これで、少なくとも 1 人の医師が待機中であるという要件に違反する、勤務中の医師がいなくなりました。次の図 ( Designing Data-Intensive Applicationsから引用) は、実際に何が起こるかを示しています。
ここで、書き込みスキューの問題を回避するために、サンプル プログラムを変更してSELECT FOR UPDATE
使用します。
- Java
- Golang
package com.pingcap.txn.write.skew;
import com.zaxxer.hikari.HikariDataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class EffectWriteSkew {
public static void main(String[] args) throws SQLException, InterruptedException {
HikariDataSource ds = new HikariDataSource();
ds.setJdbcUrl("jdbc:mysql://localhost:4000/test?useServerPrepStmts=true&cachePrepStmts=true");
ds.setUsername("root");
// prepare data
Connection connection = ds.getConnection();
createDoctorTable(connection);
createDoctor(connection, 1, "Alice", true, 123);
createDoctor(connection, 2, "Bob", true, 123);
createDoctor(connection, 3, "Carol", false, 123);
Semaphore txn1Pass = new Semaphore(0);
CountDownLatch countDownLatch = new CountDownLatch(2);
ExecutorService threadPool = Executors.newFixedThreadPool(2);
threadPool.execute(() -> {
askForLeave(ds, txn1Pass, 1, 1);
countDownLatch.countDown();
});
threadPool.execute(() -> {
askForLeave(ds, txn1Pass, 2, 2);
countDownLatch.countDown();
});
countDownLatch.await();
}
public static void createDoctorTable(Connection connection) throws SQLException {
connection.createStatement().executeUpdate("CREATE TABLE `doctors` (" +
" `id` int(11) NOT NULL," +
" `name` varchar(255) DEFAULT NULL," +
" `on_call` tinyint(1) DEFAULT NULL," +
" `shift_id` int(11) DEFAULT NULL," +
" PRIMARY KEY (`id`)," +
" KEY `idx_shift_id` (`shift_id`)" +
" ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin");
}
public static void createDoctor(Connection connection, Integer id, String name, Boolean onCall, Integer shiftID) throws SQLException {
PreparedStatement insert = connection.prepareStatement(
"INSERT INTO `doctors` (`id`, `name`, `on_call`, `shift_id`) VALUES (?, ?, ?, ?)");
insert.setInt(1, id);
insert.setString(2, name);
insert.setBoolean(3, onCall);
insert.setInt(4, shiftID);
insert.executeUpdate();
}
public static void askForLeave(HikariDataSource ds, Semaphore txn1Pass, Integer txnID, Integer doctorID) {
try(Connection connection = ds.getConnection()) {
try {
connection.setAutoCommit(false);
String comment = txnID == 2 ? " " : "" + "/* txn #{txn_id} */ ";
connection.createStatement().executeUpdate(comment + "BEGIN");
// Txn 1 should be waiting for txn 2 done
if (txnID == 1) {
txn1Pass.acquire();
}
PreparedStatement currentOnCallQuery = connection.prepareStatement(comment +
"SELECT COUNT(*) AS `count` FROM `doctors` WHERE `on_call` = ? AND `shift_id` = ? FOR UPDATE");
currentOnCallQuery.setBoolean(1, true);
currentOnCallQuery.setInt(2, 123);
ResultSet res = currentOnCallQuery.executeQuery();
if (!res.next()) {
throw new RuntimeException("error query");
} else {
int count = res.getInt("count");
if (count >= 2) {
// If current on-call doctor has 2 or more, this doctor can leave
PreparedStatement insert = connection.prepareStatement( comment +
"UPDATE `doctors` SET `on_call` = ? WHERE `id` = ? AND `shift_id` = ?");
insert.setBoolean(1, false);
insert.setInt(2, doctorID);
insert.setInt(3, 123);
insert.executeUpdate();
connection.commit();
} else {
throw new RuntimeException("At least one doctor is on call");
}
}
// Txn 2 done, let txn 1 run again
if (txnID == 2) {
txn1Pass.release();
}
} catch (Exception e) {
// If got any error, you should roll back, data is priceless
connection.rollback();
e.printStackTrace();
}
} catch (SQLException e) {
e.printStackTrace();
}
}
}
package main
import (
"database/sql"
"fmt"
"sync"
"github.com/pingcap-inc/tidb-example-golang/util"
_ "github.com/go-sql-driver/mysql"
)
func main() {
openDB("mysql", "root:@tcp(127.0.0.1:4000)/test", func(db *sql.DB) {
writeSkew(db)
})
}
func openDB(driverName, dataSourceName string, runnable func(db *sql.DB)) {
db, err := sql.Open(driverName, dataSourceName)
if err != nil {
panic(err)
}
defer db.Close()
runnable(db)
}
func writeSkew(db *sql.DB) {
err := prepareData(db)
if err != nil {
panic(err)
}
waitingChan, waitGroup := make(chan bool), sync.WaitGroup{}
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
err = askForLeave(db, waitingChan, 1, 1)
if err != nil {
panic(err)
}
}()
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
err = askForLeave(db, waitingChan, 2, 2)
if err != nil {
panic(err)
}
}()
waitGroup.Wait()
}
func askForLeave(db *sql.DB, waitingChan chan bool, goroutineID, doctorID int) error {
txnComment := fmt.Sprintf("/* txn %d */ ", goroutineID)
if goroutineID != 1 {
txnComment = "\t" + txnComment
}
txn, err := util.TiDBSqlBegin(db, true)
if err != nil {
return err
}
fmt.Println(txnComment + "start txn")
// Txn 1 should be waiting until txn 2 is done.
if goroutineID == 1 {
<-waitingChan
}
txnFunc := func() error {
queryCurrentOnCall := "SELECT COUNT(*) AS `count` FROM `doctors` WHERE `on_call` = ? AND `shift_id` = ?"
rows, err := txn.Query(queryCurrentOnCall, true, 123)
if err != nil {
return err
}
defer rows.Close()
fmt.Println(txnComment + queryCurrentOnCall + " successful")
count := 0
if rows.Next() {
err = rows.Scan(&count)
if err != nil {
return err
}
}
rows.Close()
if count < 2 {
return fmt.Errorf("at least one doctor is on call")
}
shift := "UPDATE `doctors` SET `on_call` = ? WHERE `id` = ? AND `shift_id` = ?"
_, err = txn.Exec(shift, false, doctorID, 123)
if err == nil {
fmt.Println(txnComment + shift + " successful")
}
return err
}
err = txnFunc()
if err == nil {
txn.Commit()
fmt.Println("[runTxn] commit success")
} else {
txn.Rollback()
fmt.Printf("[runTxn] got an error, rollback: %+v\n", err)
}
// Txn 2 is done. Let txn 1 run again.
if goroutineID == 2 {
waitingChan <- true
}
return nil
}
func prepareData(db *sql.DB) error {
err := createDoctorTable(db)
if err != nil {
return err
}
err = createDoctor(db, 1, "Alice", true, 123)
if err != nil {
return err
}
err = createDoctor(db, 2, "Bob", true, 123)
if err != nil {
return err
}
err = createDoctor(db, 3, "Carol", false, 123)
if err != nil {
return err
}
return nil
}
func createDoctorTable(db *sql.DB) error {
_, err := db.Exec("CREATE TABLE IF NOT EXISTS `doctors` (" +
" `id` int(11) NOT NULL," +
" `name` varchar(255) DEFAULT NULL," +
" `on_call` tinyint(1) DEFAULT NULL," +
" `shift_id` int(11) DEFAULT NULL," +
" PRIMARY KEY (`id`)," +
" KEY `idx_shift_id` (`shift_id`)" +
" ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")
return err
}
func createDoctor(db *sql.DB, id int, name string, onCall bool, shiftID int) error {
_, err := db.Exec("INSERT INTO `doctors` (`id`, `name`, `on_call`, `shift_id`) VALUES (?, ?, ?, ?)",
id, name, onCall, shiftID)
return err
}
SQL ログ:
/* txn 1 */ BEGIN
/* txn 2 */ BEGIN
/* txn 2 */ SELECT COUNT(*) AS `count` FROM `doctors` WHERE on_call = 1 AND `shift_id` = 123 FOR UPDATE
/* txn 2 */ UPDATE `doctors` SET on_call = 0 WHERE `id` = 2 AND `shift_id` = 123
/* txn 2 */ COMMIT
/* txn 1 */ SELECT COUNT(*) AS `count` FROM `doctors` WHERE `on_call` = 1 FOR UPDATE
At least one doctor is on call
/* txn 1 */ ROLLBACK
実行結果:
mysql> SELECT * FROM doctors;
+----+-------+---------+----------+
| id | name | on_call | shift_id |
+----+-------+---------+----------+
| 1 | Alice | 1 | 123 |
| 2 | Bob | 0 | 123 |
| 3 | Carol | 0 | 123 |
+----+-------+---------+----------+
savepoint
とネストされたトランザクションのサポート
SpringでサポートされているPROPAGATION_NESTED
伝播動作は、ネストされたトランザクションをトリガーします。ネストされたトランザクションは、現在のトランザクションとは独立して開始される子トランザクションです。ネストされたトランザクションが開始すると、 savepoint
が記録されます。ネストされたトランザクションが失敗すると、トランザクションはsavepoint
状態にロールバックされます。ネストされたトランザクションは外部トランザクションの一部であり、外部トランザクションと一緒にコミットされます。
次の例は、 savepoint
メカニズムを示しています。
mysql> BEGIN;
mysql> INSERT INTO T2 VALUES(100);
mysql> SAVEPOINT svp1;
mysql> INSERT INTO T2 VALUES(200);
mysql> ROLLBACK TO SAVEPOINT svp1;
mysql> RELEASE SAVEPOINT svp1;
mysql> COMMIT;
mysql> SELECT * FROM T2;
+------+
| ID |
+------+
| 100 |
+------+
注記:
v6.2.0 以降、TiDB は
savepoint
機能をサポートしています。TiDB クラスターが v6.2.0 より前の場合、TiDB クラスターはPROPAGATION_NESTED
動作をサポートしていません。アプリケーションがPROPAGATION_NESTED
伝播動作を使用するJava Springフレームワークに基づいている場合は、アプリケーション側でそれを適応させて、ネストされたトランザクションのロジックを削除する必要があります。
大規模取引制限
基本的な原則は、トランザクションのサイズを制限することです。KV レベルでは、TiDB は単一のトランザクションのサイズに制限を設けています。SQL レベルでは、1 行のデータに 1 つの KV エントリがマップされ、インデックスを追加するごとに 1 つの KV エントリが追加されます。SQL レベルでの制限は次のとおりです。
- 単一行レコードの最大サイズは
120 MB
です。TiDB v5.0 以降のバージョンでは、これをperformance.txn-entry-size-limit
に設定できます。それより前のバージョンでは、値は6 MB
です。 - サポートされる最大単一トランザクション サイズは
10 GB
です。TiDB v4.0 以降のバージョンでは、これをperformance.txn-total-size-limit
に設定できます。それより前のバージョンでは、値は100 MB
です。
サイズ制限と行制限の両方について、トランザクション実行中にトランザクションのエンコードと追加キーのオーバーヘッドも考慮する必要があることに注意してください。最適なパフォーマンスを実現するには、100〜500行ごとに1つのトランザクションを書き込むことをお勧めします。
自動コミットされたSELECT FOR UPDATE
文はロックを待機しません。
現在、自動コミットされたSELECT FOR UPDATE
ステートメントにはロックは追加されません。その効果は次の図に示されています。
これは MySQL との既知の非互換性の問題です。明示的なBEGIN;COMMIT;
ステートメントを使用することでこの問題を解決できます。