📣
TiDB Cloud Essential 开放公测中。此页面由 AI 自动翻译,英文原文请见此处。

事务限制

本文档简要介绍了 TiDB 中的事务限制。

隔离级别

TiDB 支持的隔离级别有 RC (Read Committed) 和 SI (Snapshot Isolation),其中 SI 基本等同于 RR (Repeatable Read) 隔离级别。

isolation level

Snapshot Isolation 可以避免幻读

TiDB 的 SI 隔离级别可以避免幻读,但 ANSI/ISO SQL 标准中的 RR 隔离级别无法避免幻读。

下面两个例子展示了什么是 幻读

  • 示例 1:事务 A 首先根据查询获取了 n 行,然后 事务 B 修改了这 n 行之外的 m 行,或者新增了 m 行满足 事务 A 查询条件的数据。当 事务 A 再次执行该查询时,发现有 n+m 行满足条件。就像出现了幻影一样,因此称为 幻读

  • 示例 2:管理员 A 将数据库中所有学生的成绩从具体分数改为 ABCDE 等级,但此时 管理员 B 插入了一条具体分数的记录。当 管理员 A 完成修改后,发现还有一条记录(即 管理员 B 插入的)没有被修改。这也是 幻读

SI 无法避免写偏斜

TiDB 的 SI 隔离级别无法避免写偏斜异常。你可以使用 SELECT FOR UPDATE 语法来避免写偏斜异常。

写偏斜异常发生在两个并发事务分别读取了不同但相关的记录,然后每个事务都对自己读取到的数据进行了 update 并最终提交。如果这些相关记录之间存在不能被多个事务并发修改的约束,那么最终结果就会违反该约束。

例如,假设你正在为医院编写一个医生值班管理程序。医院通常要求同时有多名医生值班,但最低要求是至少有一名医生值班。只要该班次至少有一名医生值班,医生就可以请假(比如身体不适)。

现在有这样一种情况:医生 Alice 和 Bob 正在值班。两人都感觉不适,于是都决定请病假,并且恰好同时点击了请假按钮。我们用下面的程序来模拟这个过程:

    package com.pingcap.txn.write.skew; import com.zaxxer.hikari.HikariDataSource; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; public class EffectWriteSkew { public static void main(String[] args) throws SQLException, InterruptedException { HikariDataSource ds = new HikariDataSource(); ds.setJdbcUrl("jdbc:mysql://localhost:4000/test?useServerPrepStmts=true&cachePrepStmts=true"); ds.setUsername("root"); // prepare data Connection connection = ds.getConnection(); createDoctorTable(connection); createDoctor(connection, 1, "Alice", true, 123); createDoctor(connection, 2, "Bob", true, 123); createDoctor(connection, 3, "Carol", false, 123); Semaphore txn1Pass = new Semaphore(0); CountDownLatch countDownLatch = new CountDownLatch(2); ExecutorService threadPool = Executors.newFixedThreadPool(2); threadPool.execute(() -> { askForLeave(ds, txn1Pass, 1, 1); countDownLatch.countDown(); }); threadPool.execute(() -> { askForLeave(ds, txn1Pass, 2, 2); countDownLatch.countDown(); }); countDownLatch.await(); } public static void createDoctorTable(Connection connection) throws SQLException { connection.createStatement().executeUpdate("CREATE TABLE `doctors` (" + " `id` int NOT NULL," + " `name` varchar(255) DEFAULT NULL," + " `on_call` tinyint DEFAULT NULL," + " `shift_id` int DEFAULT NULL," + " PRIMARY KEY (`id`)," + " KEY `idx_shift_id` (`shift_id`)" + " ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"); } public static void createDoctor(Connection connection, Integer id, String name, Boolean onCall, Integer shiftID) throws SQLException { PreparedStatement insert = connection.prepareStatement( "INSERT INTO `doctors` (`id`, `name`, `on_call`, `shift_id`) VALUES (?, ?, ?, ?)"); insert.setInt(1, id); insert.setString(2, name); 
insert.setBoolean(3, onCall); insert.setInt(4, shiftID); insert.executeUpdate(); } public static void askForLeave(HikariDataSource ds, Semaphore txn1Pass, Integer txnID, Integer doctorID) { try(Connection connection = ds.getConnection()) { try { connection.setAutoCommit(false); String comment = txnID == 2 ? " " : "" + "/* txn #{txn_id} */ "; connection.createStatement().executeUpdate(comment + "BEGIN"); // Txn 1 should be waiting for txn 2 done if (txnID == 1) { txn1Pass.acquire(); } PreparedStatement currentOnCallQuery = connection.prepareStatement(comment + "SELECT COUNT(*) AS `count` FROM `doctors` WHERE `on_call` = ? AND `shift_id` = ?"); currentOnCallQuery.setBoolean(1, true); currentOnCallQuery.setInt(2, 123); ResultSet res = currentOnCallQuery.executeQuery(); if (!res.next()) { throw new RuntimeException("error query"); } else { int count = res.getInt("count"); if (count >= 2) { // If current on-call doctor has 2 or more, this doctor can leave PreparedStatement insert = connection.prepareStatement( comment + "UPDATE `doctors` SET `on_call` = ? WHERE `id` = ? AND `shift_id` = ?"); insert.setBoolean(1, false); insert.setInt(2, doctorID); insert.setInt(3, 123); insert.executeUpdate(); connection.commit(); } else { throw new RuntimeException("At least one doctor is on call"); } } // Txn 2 done, let txn 1 run again if (txnID == 2) { txn1Pass.release(); } } catch (Exception e) { // If got any error, you should roll back, data is priceless connection.rollback(); e.printStackTrace(); } } catch (SQLException e) { e.printStackTrace(); } } }

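    要适配 TiDB 事务,请编写一个 util 包(以下 Go 示例从 `github.com/pingcap-inc/tidb-example-golang/util` 导入该包,并调用其中的 `util.TiDBSqlBegin` 开启事务),然后参考以下代码: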

    package main import ( "database/sql" "fmt" "sync" "github.com/pingcap-inc/tidb-example-golang/util" _ "github.com/go-sql-driver/mysql" ) func main() { openDB("mysql", "root:@tcp(127.0.0.1:4000)/test", func(db *sql.DB) { writeSkew(db) }) } func openDB(driverName, dataSourceName string, runnable func(db *sql.DB)) { db, err := sql.Open(driverName, dataSourceName) if err != nil { panic(err) } defer db.Close() runnable(db) } func writeSkew(db *sql.DB) { err := prepareData(db) if err != nil { panic(err) } waitingChan, waitGroup := make(chan bool), sync.WaitGroup{} waitGroup.Add(1) go func() { defer waitGroup.Done() err = askForLeave(db, waitingChan, 1, 1) if err != nil { panic(err) } }() waitGroup.Add(1) go func() { defer waitGroup.Done() err = askForLeave(db, waitingChan, 2, 2) if err != nil { panic(err) } }() waitGroup.Wait() } func askForLeave(db *sql.DB, waitingChan chan bool, goroutineID, doctorID int) error { txnComment := fmt.Sprintf("/* txn %d */ ", goroutineID) if goroutineID != 1 { txnComment = "\t" + txnComment } txn, err := util.TiDBSqlBegin(db, true) if err != nil { return err } fmt.Println(txnComment + "start txn") // Txn 1 should be waiting until txn 2 is done. if goroutineID == 1 { <-waitingChan } txnFunc := func() error { queryCurrentOnCall := "SELECT COUNT(*) AS `count` FROM `doctors` WHERE `on_call` = ? AND `shift_id` = ?" rows, err := txn.Query(queryCurrentOnCall, true, 123) if err != nil { return err } defer rows.Close() fmt.Println(txnComment + queryCurrentOnCall + " successful") count := 0 if rows.Next() { err = rows.Scan(&count) if err != nil { return err } } rows.Close() if count < 2 { return fmt.Errorf("at least one doctor is on call") } shift := "UPDATE `doctors` SET `on_call` = ? WHERE `id` = ? AND `shift_id` = ?" 
_, err = txn.Exec(shift, false, doctorID, 123) if err == nil { fmt.Println(txnComment + shift + " successful") } return err } err = txnFunc() if err == nil { txn.Commit() fmt.Println("[runTxn] commit success") } else { txn.Rollback() fmt.Printf("[runTxn] got an error, rollback: %+v\n", err) } // Txn 2 is done. Let txn 1 run again. if goroutineID == 2 { waitingChan <- true } return nil } func prepareData(db *sql.DB) error { err := createDoctorTable(db) if err != nil { return err } err = createDoctor(db, 1, "Alice", true, 123) if err != nil { return err } err = createDoctor(db, 2, "Bob", true, 123) if err != nil { return err } err = createDoctor(db, 3, "Carol", false, 123) if err != nil { return err } return nil } func createDoctorTable(db *sql.DB) error { _, err := db.Exec("CREATE TABLE IF NOT EXISTS `doctors` (" + " `id` int NOT NULL," + " `name` varchar(255) DEFAULT NULL," + " `on_call` tinyint DEFAULT NULL," + " `shift_id` int DEFAULT NULL," + " PRIMARY KEY (`id`)," + " KEY `idx_shift_id` (`shift_id`)" + " ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin") return err } func createDoctor(db *sql.DB, id int, name string, onCall bool, shiftID int) error { _, err := db.Exec("INSERT INTO `doctors` (`id`, `name`, `on_call`, `shift_id`) VALUES (?, ?, ?, ?)", id, name, onCall, shiftID) return err }

    SQL 日志:

    /* txn 1 */ BEGIN /* txn 2 */ BEGIN /* txn 2 */ SELECT COUNT(*) as `count` FROM `doctors` WHERE `on_call` = 1 AND `shift_id` = 123 /* txn 2 */ UPDATE `doctors` SET `on_call` = 0 WHERE `id` = 2 AND `shift_id` = 123 /* txn 2 */ COMMIT /* txn 1 */ SELECT COUNT(*) AS `count` FROM `doctors` WHERE `on_call` = 1 and `shift_id` = 123 /* txn 1 */ UPDATE `doctors` SET `on_call` = 0 WHERE `id` = 1 AND `shift_id` = 123 /* txn 1 */ COMMIT

    运行结果:

    mysql> SELECT * FROM doctors; +----+-------+---------+----------+ | id | name | on_call | shift_id | +----+-------+---------+----------+ | 1 | Alice | 0 | 123 | | 2 | Bob | 0 | 123 | | 3 | Carol | 0 | 123 | +----+-------+---------+----------+

    在两个事务中,应用程序首先检查是否有两名或以上医生值班,如果是,则认为可以有一名医生请假。由于数据库使用快照隔离,两个检查都返回 2,因此两个事务都进入下一步。Alice 将自己的记录 update 为不值班,Bob 也做了同样的操作。两个事务都成功提交。现在没有医生值班,违反了至少有一名医生值班的要求。下图(引用自 Designing Data-Intensive Applications)展示了实际发生的情况。

    Write Skew

    现在我们将示例程序改为使用 SELECT FOR UPDATE,以避免写偏斜问题:

      package com.pingcap.txn.write.skew; import com.zaxxer.hikari.HikariDataSource; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; public class EffectWriteSkew { public static void main(String[] args) throws SQLException, InterruptedException { HikariDataSource ds = new HikariDataSource(); ds.setJdbcUrl("jdbc:mysql://localhost:4000/test?useServerPrepStmts=true&cachePrepStmts=true"); ds.setUsername("root"); // prepare data Connection connection = ds.getConnection(); createDoctorTable(connection); createDoctor(connection, 1, "Alice", true, 123); createDoctor(connection, 2, "Bob", true, 123); createDoctor(connection, 3, "Carol", false, 123); Semaphore txn1Pass = new Semaphore(0); CountDownLatch countDownLatch = new CountDownLatch(2); ExecutorService threadPool = Executors.newFixedThreadPool(2); threadPool.execute(() -> { askForLeave(ds, txn1Pass, 1, 1); countDownLatch.countDown(); }); threadPool.execute(() -> { askForLeave(ds, txn1Pass, 2, 2); countDownLatch.countDown(); }); countDownLatch.await(); } public static void createDoctorTable(Connection connection) throws SQLException { connection.createStatement().executeUpdate("CREATE TABLE `doctors` (" + " `id` int NOT NULL," + " `name` varchar(255) DEFAULT NULL," + " `on_call` tinyint DEFAULT NULL," + " `shift_id` int DEFAULT NULL," + " PRIMARY KEY (`id`)," + " KEY `idx_shift_id` (`shift_id`)" + " ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"); } public static void createDoctor(Connection connection, Integer id, String name, Boolean onCall, Integer shiftID) throws SQLException { PreparedStatement insert = connection.prepareStatement( "INSERT INTO `doctors` (`id`, `name`, `on_call`, `shift_id`) VALUES (?, ?, ?, ?)"); insert.setInt(1, id); insert.setString(2, name); 
insert.setBoolean(3, onCall); insert.setInt(4, shiftID); insert.executeUpdate(); } public static void askForLeave(HikariDataSource ds, Semaphore txn1Pass, Integer txnID, Integer doctorID) { try(Connection connection = ds.getConnection()) { try { connection.setAutoCommit(false); String comment = txnID == 2 ? " " : "" + "/* txn #{txn_id} */ "; connection.createStatement().executeUpdate(comment + "BEGIN"); // Txn 1 should be waiting for txn 2 done if (txnID == 1) { txn1Pass.acquire(); } PreparedStatement currentOnCallQuery = connection.prepareStatement(comment + "SELECT COUNT(*) AS `count` FROM `doctors` WHERE `on_call` = ? AND `shift_id` = ? FOR UPDATE"); currentOnCallQuery.setBoolean(1, true); currentOnCallQuery.setInt(2, 123); ResultSet res = currentOnCallQuery.executeQuery(); if (!res.next()) { throw new RuntimeException("error query"); } else { int count = res.getInt("count"); if (count >= 2) { // If current on-call doctor has 2 or more, this doctor can leave PreparedStatement insert = connection.prepareStatement( comment + "UPDATE `doctors` SET `on_call` = ? WHERE `id` = ? AND `shift_id` = ?"); insert.setBoolean(1, false); insert.setInt(2, doctorID); insert.setInt(3, 123); insert.executeUpdate(); connection.commit(); } else { throw new RuntimeException("At least one doctor is on call"); } } // Txn 2 done, let txn 1 run again if (txnID == 2) { txn1Pass.release(); } } catch (Exception e) { // If got any error, you should roll back, data is priceless connection.rollback(); e.printStackTrace(); } } catch (SQLException e) { e.printStackTrace(); } } }
      package main import ( "database/sql" "fmt" "sync" "github.com/pingcap-inc/tidb-example-golang/util" _ "github.com/go-sql-driver/mysql" ) func main() { openDB("mysql", "root:@tcp(127.0.0.1:4000)/test", func(db *sql.DB) { writeSkew(db) }) } func openDB(driverName, dataSourceName string, runnable func(db *sql.DB)) { db, err := sql.Open(driverName, dataSourceName) if err != nil { panic(err) } defer db.Close() runnable(db) } func writeSkew(db *sql.DB) { err := prepareData(db) if err != nil { panic(err) } waitingChan, waitGroup := make(chan bool), sync.WaitGroup{} waitGroup.Add(1) go func() { defer waitGroup.Done() err = askForLeave(db, waitingChan, 1, 1) if err != nil { panic(err) } }() waitGroup.Add(1) go func() { defer waitGroup.Done() err = askForLeave(db, waitingChan, 2, 2) if err != nil { panic(err) } }() waitGroup.Wait() } func askForLeave(db *sql.DB, waitingChan chan bool, goroutineID, doctorID int) error { txnComment := fmt.Sprintf("/* txn %d */ ", goroutineID) if goroutineID != 1 { txnComment = "\t" + txnComment } txn, err := util.TiDBSqlBegin(db, true) if err != nil { return err } fmt.Println(txnComment + "start txn") // Txn 1 should be waiting until txn 2 is done. if goroutineID == 1 { <-waitingChan } txnFunc := func() error { queryCurrentOnCall := "SELECT COUNT(*) AS `count` FROM `doctors` WHERE `on_call` = ? AND `shift_id` = ?" rows, err := txn.Query(queryCurrentOnCall, true, 123) if err != nil { return err } defer rows.Close() fmt.Println(txnComment + queryCurrentOnCall + " successful") count := 0 if rows.Next() { err = rows.Scan(&count) if err != nil { return err } } rows.Close() if count < 2 { return fmt.Errorf("at least one doctor is on call") } shift := "UPDATE `doctors` SET `on_call` = ? WHERE `id` = ? AND `shift_id` = ?" 
_, err = txn.Exec(shift, false, doctorID, 123) if err == nil { fmt.Println(txnComment + shift + " successful") } return err } err = txnFunc() if err == nil { txn.Commit() fmt.Println("[runTxn] commit success") } else { txn.Rollback() fmt.Printf("[runTxn] got an error, rollback: %+v\n", err) } // Txn 2 is done. Let txn 1 run again. if goroutineID == 2 { waitingChan <- true } return nil } func prepareData(db *sql.DB) error { err := createDoctorTable(db) if err != nil { return err } err = createDoctor(db, 1, "Alice", true, 123) if err != nil { return err } err = createDoctor(db, 2, "Bob", true, 123) if err != nil { return err } err = createDoctor(db, 3, "Carol", false, 123) if err != nil { return err } return nil } func createDoctorTable(db *sql.DB) error { _, err := db.Exec("CREATE TABLE IF NOT EXISTS `doctors` (" + " `id` int NOT NULL," + " `name` varchar(255) DEFAULT NULL," + " `on_call` tinyint DEFAULT NULL," + " `shift_id` int DEFAULT NULL," + " PRIMARY KEY (`id`)," + " KEY `idx_shift_id` (`shift_id`)" + " ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin") return err } func createDoctor(db *sql.DB, id int, name string, onCall bool, shiftID int) error { _, err := db.Exec("INSERT INTO `doctors` (`id`, `name`, `on_call`, `shift_id`) VALUES (?, ?, ?, ?)", id, name, onCall, shiftID) return err }

      SQL 日志:

      /* txn 1 */ BEGIN /* txn 2 */ BEGIN /* txn 2 */ SELECT COUNT(*) AS `count` FROM `doctors` WHERE `on_call` = 1 AND `shift_id` = 123 FOR UPDATE /* txn 2 */ UPDATE `doctors` SET `on_call` = 0 WHERE `id` = 2 AND `shift_id` = 123 /* txn 2 */ COMMIT /* txn 1 */ SELECT COUNT(*) AS `count` FROM `doctors` WHERE `on_call` = 1 AND `shift_id` = 123 FOR UPDATE At least one doctor is on call /* txn 1 */ ROLLBACK

      运行结果:

      mysql> SELECT * FROM doctors; +----+-------+---------+----------+ | id | name | on_call | shift_id | +----+-------+---------+----------+ | 1 | Alice | 1 | 123 | | 2 | Bob | 0 | 123 | | 3 | Carol | 0 | 123 | +----+-------+---------+----------+

      savepoint 和嵌套事务的支持

      Spring 支持的 PROPAGATION_NESTED 传播行为会触发嵌套事务,即在当前事务内部开启的一个子事务。嵌套事务开始时会记录一个 savepoint。如果嵌套事务失败,则会回滚到该 savepoint 的状态。嵌套事务属于外部事务的一部分,最终会与外部事务一起提交。

      下面的例子演示了 savepoint 机制:

      mysql> BEGIN; mysql> INSERT INTO T2 VALUES(100); mysql> SAVEPOINT svp1; mysql> INSERT INTO T2 VALUES(200); mysql> ROLLBACK TO SAVEPOINT svp1; mysql> RELEASE SAVEPOINT svp1; mysql> COMMIT; mysql> SELECT * FROM T2; +------+ | ID | +------+ | 100 | +------+

      大事务限制

      基本原则是限制事务的大小。在 KV 层,TiDB 对单个事务的大小有限制。在 SQL 层,一行数据映射为一个 KV entry,每增加一个索引会多一个 KV entry。SQL 层的限制如下:

      注意,无论是大小限制还是行数限制,在事务执行过程中还需考虑编码和事务额外 key 的开销。为获得最佳性能,建议每个事务写入 100 ~ 500 行。

      自动提交的 SELECT FOR UPDATE 语句不会等待锁

      目前,自动提交的 SELECT FOR UPDATE 语句不会加锁。下图展示了在两个独立会话中的效果:

      The situation in TiDB

      这是已知的与 MySQL 不兼容的问题。你可以通过显式使用 BEGIN; 和 COMMIT; 语句包裹该 SELECT FOR UPDATE 语句来解决该问题。

      需要帮助?

      在 Discord 或 Slack 社区提问,或提交支持工单

      文档内容是否有帮助?