- 关于 TiDB
- 快速上手
- 应用开发
- 概览
- 快速开始
- 示例程序
- 连接到 TiDB
- 数据库模式设计
- 数据写入
- 数据读取
- 事务
- 优化 SQL 性能
- 故障诊断
- 引用文档
- 云原生开发环境
- 部署标准集群
- 数据迁移
- 运维操作
- 监控与告警
- 故障诊断
- 性能调优
- 优化手册
- 配置调优
- SQL 性能调优
- SQL 性能调优概览
- 理解 TiDB 执行计划
- SQL 优化流程
- 控制执行计划
- 教程
- 同城多中心部署
- 两地三中心部署
- 同城两中心部署
- 读取历史数据
- 使用 Stale Read 功能读取历史数据(推荐)
- 使用系统变量
tidb_snapshot
读取历史数据
- 最佳实践
- Placement Rules 使用文档
- Load Base Split 使用文档
- Store Limit 使用文档
- TiDB 工具
- 功能概览
- 适用场景
- 工具下载
- TiUP
- 文档地图
- 概览
- 术语及核心概念
- TiUP 组件管理
- FAQ
- 故障排查
- TiUP 命令参考手册
- 命令概览
- TiUP 命令
- TiUP Cluster 命令
- TiUP Cluster 命令概览
- tiup cluster audit
- tiup cluster check
- tiup cluster clean
- tiup cluster deploy
- tiup cluster destroy
- tiup cluster disable
- tiup cluster display
- tiup cluster edit-config
- tiup cluster enable
- tiup cluster help
- tiup cluster import
- tiup cluster list
- tiup cluster patch
- tiup cluster prune
- tiup cluster reload
- tiup cluster rename
- tiup cluster replay
- tiup cluster restart
- tiup cluster scale-in
- tiup cluster scale-out
- tiup cluster start
- tiup cluster stop
- tiup cluster template
- tiup cluster upgrade
- TiUP DM 命令
- TiUP DM 命令概览
- tiup dm audit
- tiup dm deploy
- tiup dm destroy
- tiup dm disable
- tiup dm display
- tiup dm edit-config
- tiup dm enable
- tiup dm help
- tiup dm import
- tiup dm list
- tiup dm patch
- tiup dm prune
- tiup dm reload
- tiup dm replay
- tiup dm restart
- tiup dm scale-in
- tiup dm scale-out
- tiup dm start
- tiup dm stop
- tiup dm template
- tiup dm upgrade
- TiDB 集群拓扑文件配置
- DM 集群拓扑文件配置
- TiUP 镜像参考指南
- TiUP 组件文档
- PingCAP Clinic 诊断服务 (Technical Preview)
- TiDB Operator
- Dumpling
- TiDB Lightning
- TiDB Data Migration
- 关于 Data Migration
- 架构简介
- 快速开始
- 部署 DM 集群
- 入门指南
- 进阶教程
- 运维管理
- 参考手册
- 使用示例
- 异常解决
- 版本发布历史
- Backup & Restore (BR)
- TiDB Binlog
- TiCDC
- TiUniManager
- sync-diff-inspector
- TiSpark
- 参考指南
- 架构
- 监控指标
- 安全加固
- 权限
- SQL
- SQL 语言结构和语法
- SQL 语句
ADD COLUMN
ADD INDEX
ADMIN
ADMIN CANCEL DDL
ADMIN CHECKSUM TABLE
ADMIN CHECK [TABLE|INDEX]
ADMIN SHOW DDL [JOBS|QUERIES]
ADMIN SHOW TELEMETRY
ALTER DATABASE
ALTER INDEX
ALTER INSTANCE
ALTER PLACEMENT POLICY
ALTER TABLE
ALTER TABLE COMPACT
ALTER USER
ANALYZE TABLE
BACKUP
BATCH
BEGIN
CHANGE COLUMN
CHANGE DRAINER
CHANGE PUMP
COMMIT
CREATE [GLOBAL|SESSION] BINDING
CREATE DATABASE
CREATE INDEX
CREATE PLACEMENT POLICY
CREATE ROLE
CREATE SEQUENCE
CREATE TABLE LIKE
CREATE TABLE
CREATE USER
CREATE VIEW
DEALLOCATE
DELETE
DESC
DESCRIBE
DO
DROP [GLOBAL|SESSION] BINDING
DROP COLUMN
DROP DATABASE
DROP INDEX
DROP PLACEMENT POLICY
DROP ROLE
DROP SEQUENCE
DROP STATS
DROP TABLE
DROP USER
DROP VIEW
EXECUTE
EXPLAIN ANALYZE
EXPLAIN
FLASHBACK TABLE
FLUSH PRIVILEGES
FLUSH STATUS
FLUSH TABLES
GRANT <privileges>
GRANT <role>
INSERT
KILL [TIDB]
LOAD DATA
LOAD STATS
MODIFY COLUMN
PREPARE
RECOVER TABLE
RENAME INDEX
RENAME TABLE
REPLACE
RESTORE
REVOKE <privileges>
REVOKE <role>
ROLLBACK
SELECT
SET DEFAULT ROLE
SET [NAMES|CHARACTER SET]
SET PASSWORD
SET ROLE
SET TRANSACTION
SET [GLOBAL|SESSION] <variable>
SHOW [BACKUPS|RESTORES]
SHOW ANALYZE STATUS
SHOW [GLOBAL|SESSION] BINDINGS
SHOW BUILTINS
SHOW CHARACTER SET
SHOW COLLATION
SHOW [FULL] COLUMNS FROM
SHOW CONFIG
SHOW CREATE PLACEMENT POLICY
SHOW CREATE SEQUENCE
SHOW CREATE TABLE
SHOW CREATE USER
SHOW DATABASES
SHOW DRAINER STATUS
SHOW ENGINES
SHOW ERRORS
SHOW [FULL] FIELDS FROM
SHOW GRANTS
SHOW INDEX [FROM|IN]
SHOW INDEXES [FROM|IN]
SHOW KEYS [FROM|IN]
SHOW MASTER STATUS
SHOW PLACEMENT
SHOW PLACEMENT FOR
SHOW PLACEMENT LABELS
SHOW PLUGINS
SHOW PRIVILEGES
SHOW [FULL] PROCESSSLIST
SHOW PROFILES
SHOW PUMP STATUS
SHOW SCHEMAS
SHOW STATS_HEALTHY
SHOW STATS_HISTOGRAMS
SHOW STATS_META
SHOW STATUS
SHOW TABLE NEXT_ROW_ID
SHOW TABLE REGIONS
SHOW TABLE STATUS
SHOW [FULL] TABLES
SHOW [GLOBAL|SESSION] VARIABLES
SHOW WARNINGS
SHUTDOWN
SPLIT REGION
START TRANSACTION
TABLE
TRACE
TRUNCATE
UPDATE
USE
WITH
- 数据类型
- 函数与操作符
- 聚簇索引
- 约束
- 生成列
- SQL 模式
- 表属性
- 事务
- 视图
- 分区表
- 临时表
- 缓存表
- 字符集和排序
- Placement Rules in SQL
- 系统表
mysql
- INFORMATION_SCHEMA
- Overview
ANALYZE_STATUS
CLIENT_ERRORS_SUMMARY_BY_HOST
CLIENT_ERRORS_SUMMARY_BY_USER
CLIENT_ERRORS_SUMMARY_GLOBAL
CHARACTER_SETS
CLUSTER_CONFIG
CLUSTER_HARDWARE
CLUSTER_INFO
CLUSTER_LOAD
CLUSTER_LOG
CLUSTER_SYSTEMINFO
COLLATIONS
COLLATION_CHARACTER_SET_APPLICABILITY
COLUMNS
DATA_LOCK_WAITS
DDL_JOBS
DEADLOCKS
ENGINES
INSPECTION_RESULT
INSPECTION_RULES
INSPECTION_SUMMARY
KEY_COLUMN_USAGE
METRICS_SUMMARY
METRICS_TABLES
PARTITIONS
PLACEMENT_POLICIES
PROCESSLIST
REFERENTIAL_CONSTRAINTS
SCHEMATA
SEQUENCES
SESSION_VARIABLES
SLOW_QUERY
STATISTICS
TABLES
TABLE_CONSTRAINTS
TABLE_STORAGE_STATS
TIDB_HOT_REGIONS
TIDB_HOT_REGIONS_HISTORY
TIDB_INDEXES
TIDB_SERVERS_INFO
TIDB_TRX
TIFLASH_REPLICA
TIKV_REGION_PEERS
TIKV_REGION_STATUS
TIKV_STORE_STATUS
USER_PRIVILEGES
VIEWS
METRICS_SCHEMA
- UI
- CLI
- 命令行参数
- 配置文件参数
- 系统变量
- 存储引擎
- 遥测
- 错误码
- 通过拓扑 label 进行副本调度
- 常见问题解答 (FAQ)
- 版本发布历史
- 术语表
事务限制
本章将简单介绍 TiDB 中的事务限制。
隔离级别
TiDB 支持的隔离级别是 RC(Read Committed)与 SI(Snapshot Isolation),其中 SI 与 RR(Repeatable Read)隔离级别基本等价。
SI 可以克服幻读
TiDB 的 SI 隔离级别可以克服幻读异常 (Phantom Reads),但 ANSI/ISO SQL 标准 中的 RR 不能。
所谓幻读是指:事务 A 首先根据条件查询得到 n 条记录,然后事务 B 改变了这 n 条记录之外的 m 条记录或者增添了 m 条符合事务 A 查询条件的记录,导致事务 A 再次发起请求时发现有 n+m 条符合条件记录,就产生了幻读。
例如:系统管理员 A 将数据库中所有学生的成绩从具体分数改为 ABCDE 等级,但是系统管理员 B 就在这个时候插入了一条具体分数的记录,当系统管理员 A 改结束后发现还有一条记录没有改过来,就好像发生了幻觉一样,这就叫幻读。
SI 不能克服写偏斜
TiDB 的 SI 隔离级别不能克服写偏斜异常(Write Skew),需要使用 Select for update 语法来克服写偏斜异常。
写偏斜异常是指两个并发的事务读取了不同但相关的记录,接着这两个事务各自更新了自己读到的数据,并最终都提交了事务,如果这些相关的记录之间存在着不能被多个事务并发修改的约束,那么最终结果将是违反约束的。
举个例子,假设你正在为医院写一个医生轮班管理程序。医院通常会同时要求几位医生待命,但底线是至少有一位医生在待命。医生可以放弃他们的班次(例如,如果他们自己生病了),只要至少有一个同事在这一班中继续工作。
现在出现这样一种情况,Alice 和 Bob 是两位值班医生。两人都感到不适,所以他们都决定请假。不幸的是,他们恰好在同一时间点击按钮下班。下面用程序来模拟一下这个过程。
package com.pingcap.txn.write.skew;
import com.zaxxer.hikari.HikariDataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class EffectWriteSkew {
public static void main(String[] args) throws SQLException, InterruptedException {
HikariDataSource ds = new HikariDataSource();
ds.setJdbcUrl("jdbc:mysql://localhost:4000/test?useServerPrepStmts=true&cachePrepStmts=true");
ds.setUsername("root");
// prepare data
Connection connection = ds.getConnection();
createDoctorTable(connection);
createDoctor(connection, 1, "Alice", true, 123);
createDoctor(connection, 2, "Bob", true, 123);
createDoctor(connection, 3, "Carol", false, 123);
Semaphore txn1Pass = new Semaphore(0);
CountDownLatch countDownLatch = new CountDownLatch(2);
ExecutorService threadPool = Executors.newFixedThreadPool(2);
threadPool.execute(() -> {
askForLeave(ds, txn1Pass, 1, 1);
countDownLatch.countDown();
});
threadPool.execute(() -> {
askForLeave(ds, txn1Pass, 2, 2);
countDownLatch.countDown();
});
countDownLatch.await();
}
public static void createDoctorTable(Connection connection) throws SQLException {
connection.createStatement().executeUpdate("CREATE TABLE `doctors` (" +
" `id` int(11) NOT NULL," +
" `name` varchar(255) DEFAULT NULL," +
" `on_call` tinyint(1) DEFAULT NULL," +
" `shift_id` int(11) DEFAULT NULL," +
" PRIMARY KEY (`id`)," +
" KEY `idx_shift_id` (`shift_id`)" +
" ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin");
}
public static void createDoctor(Connection connection, Integer id, String name, Boolean onCall, Integer shiftID) throws SQLException {
PreparedStatement insert = connection.prepareStatement(
"INSERT INTO `doctors` (`id`, `name`, `on_call`, `shift_id`) VALUES (?, ?, ?, ?)");
insert.setInt(1, id);
insert.setString(2, name);
insert.setBoolean(3, onCall);
insert.setInt(4, shiftID);
insert.executeUpdate();
}
public static void askForLeave(HikariDataSource ds, Semaphore txn1Pass, Integer txnID, Integer doctorID) {
try(Connection connection = ds.getConnection()) {
try {
connection.setAutoCommit(false);
String comment = txnID == 2 ? " " : "" + "/* txn #{txn_id} */ ";
connection.createStatement().executeUpdate(comment + "BEGIN");
// Txn 1 should be waiting until txn 2 is done.
if (txnID == 1) {
txn1Pass.acquire();
}
PreparedStatement currentOnCallQuery = connection.prepareStatement(comment +
"SELECT COUNT(*) AS `count` FROM `doctors` WHERE `on_call` = ? AND `shift_id` = ?");
currentOnCallQuery.setBoolean(1, true);
currentOnCallQuery.setInt(2, 123);
ResultSet res = currentOnCallQuery.executeQuery();
if (!res.next()) {
throw new RuntimeException("error query");
} else {
int count = res.getInt("count");
if (count >= 2) {
// If current on-call doctor has 2 or more, this doctor can leave
PreparedStatement insert = connection.prepareStatement( comment +
"UPDATE `doctors` SET `on_call` = ? WHERE `id` = ? AND `shift_id` = ?");
insert.setBoolean(1, false);
insert.setInt(2, doctorID);
insert.setInt(3, 123);
insert.executeUpdate();
connection.commit();
} else {
throw new RuntimeException("At least one doctor is on call");
}
}
// Txn 2 is done. Let txn 1 run again.
if (txnID == 2) {
txn1Pass.release();
}
} catch (Exception e) {
// If got any error, you should roll back, data is priceless
connection.rollback();
e.printStackTrace();
}
} catch (SQLException e) {
e.printStackTrace();
}
}
}
首先,封装一个用于适配 TiDB 事务的工具包 util,随后编写以下代码:
package main
import (
"database/sql"
"fmt"
"sync"
"github.com/pingcap-inc/tidb-example-golang/util"
_ "github.com/go-sql-driver/mysql"
)
func main() {
openDB("mysql", "root:@tcp(127.0.0.1:4000)/test", func(db *sql.DB) {
writeSkew(db)
})
}
func openDB(driverName, dataSourceName string, runnable func(db *sql.DB)) {
db, err := sql.Open(driverName, dataSourceName)
if err != nil {
panic(err)
}
defer db.Close()
runnable(db)
}
func writeSkew(db *sql.DB) {
err := prepareData(db)
if err != nil {
panic(err)
}
waitingChan, waitGroup := make(chan bool), sync.WaitGroup{}
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
err = askForLeave(db, waitingChan, 1, 1)
if err != nil {
panic(err)
}
}()
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
err = askForLeave(db, waitingChan, 2, 2)
if err != nil {
panic(err)
}
}()
waitGroup.Wait()
}
func askForLeave(db *sql.DB, waitingChan chan bool, goroutineID, doctorID int) error {
txnComment := fmt.Sprintf("/* txn %d */ ", goroutineID)
if goroutineID != 1 {
txnComment = "\t" + txnComment
}
txn, err := util.TiDBSqlBegin(db, true)
if err != nil {
return err
}
fmt.Println(txnComment + "start txn")
// Txn 1 should be waiting until txn 2 is done.
if goroutineID == 1 {
<-waitingChan
}
txnFunc := func() error {
queryCurrentOnCall := "SELECT COUNT(*) AS `count` FROM `doctors` WHERE `on_call` = ? AND `shift_id` = ?"
rows, err := txn.Query(queryCurrentOnCall, true, 123)
if err != nil {
return err
}
defer rows.Close()
fmt.Println(txnComment + queryCurrentOnCall + " successful")
count := 0
if rows.Next() {
err = rows.Scan(&count)
if err != nil {
return err
}
}
rows.Close()
if count < 2 {
return fmt.Errorf("at least one doctor is on call")
}
shift := "UPDATE `doctors` SET `on_call` = ? WHERE `id` = ? AND `shift_id` = ?"
_, err = txn.Exec(shift, false, doctorID, 123)
if err == nil {
fmt.Println(txnComment + shift + " successful")
}
return err
}
err = txnFunc()
if err == nil {
txn.Commit()
fmt.Println("[runTxn] commit success")
} else {
txn.Rollback()
fmt.Printf("[runTxn] got an error, rollback: %+v\n", err)
}
// Txn 2 is done. Let txn 1 run again.
if goroutineID == 2 {
waitingChan <- true
}
return nil
}
func prepareData(db *sql.DB) error {
err := createDoctorTable(db)
if err != nil {
return err
}
err = createDoctor(db, 1, "Alice", true, 123)
if err != nil {
return err
}
err = createDoctor(db, 2, "Bob", true, 123)
if err != nil {
return err
}
err = createDoctor(db, 3, "Carol", false, 123)
if err != nil {
return err
}
return nil
}
func createDoctorTable(db *sql.DB) error {
_, err := db.Exec("CREATE TABLE IF NOT EXISTS `doctors` (" +
" `id` int(11) NOT NULL," +
" `name` varchar(255) DEFAULT NULL," +
" `on_call` tinyint(1) DEFAULT NULL," +
" `shift_id` int(11) DEFAULT NULL," +
" PRIMARY KEY (`id`)," +
" KEY `idx_shift_id` (`shift_id`)" +
" ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")
return err
}
func createDoctor(db *sql.DB, id int, name string, onCall bool, shiftID int) error {
_, err := db.Exec("INSERT INTO `doctors` (`id`, `name`, `on_call`, `shift_id`) VALUES (?, ?, ?, ?)",
id, name, onCall, shiftID)
return err
}
SQL 日志:
/* txn 1 */ BEGIN
/* txn 2 */ BEGIN
/* txn 2 */ SELECT COUNT(*) as `count` FROM `doctors` WHERE `on_call` = 1 AND `shift_id` = 123
/* txn 2 */ UPDATE `doctors` SET `on_call` = 0 WHERE `id` = 2 AND `shift_id` = 123
/* txn 2 */ COMMIT
/* txn 1 */ SELECT COUNT(*) AS `count` FROM `doctors` WHERE `on_call` = 1 and `shift_id` = 123
/* txn 1 */ UPDATE `doctors` SET `on_call` = 0 WHERE `id` = 1 AND `shift_id` = 123
/* txn 1 */ COMMIT
执行结果:
mysql> SELECT * FROM doctors;
+----+-------+---------+----------+
| id | name | on_call | shift_id |
+----+-------+---------+----------+
| 1 | Alice | 0 | 123 |
| 2 | Bob | 0 | 123 |
| 3 | Carol | 0 | 123 |
+----+-------+---------+----------+
在两个事务中,应用首先检查是否有两个或以上的医生正在值班;如果是的话,它就假定一名医生可以安全地休班。由于数据库使用快照隔离,两次检查都返回 2 ,所以两个事务都进入下一个阶段。Alice 更新自己的记录休班了,而 Bob 也做了一样的事情。两个事务都成功提交了,现在没有医生值班了。违反了至少有一名医生在值班的要求。下图(引用自《Designing Data-Intensive Application》)说明了实际发生的情况:
现在更改示例程序,使用 SELECT FOR UPDATE
来克服写偏斜问题:
package com.pingcap.txn.write.skew;
import com.zaxxer.hikari.HikariDataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class EffectWriteSkew {
public static void main(String[] args) throws SQLException, InterruptedException {
HikariDataSource ds = new HikariDataSource();
ds.setJdbcUrl("jdbc:mysql://localhost:4000/test?useServerPrepStmts=true&cachePrepStmts=true");
ds.setUsername("root");
// prepare data
Connection connection = ds.getConnection();
createDoctorTable(connection);
createDoctor(connection, 1, "Alice", true, 123);
createDoctor(connection, 2, "Bob", true, 123);
createDoctor(connection, 3, "Carol", false, 123);
Semaphore txn1Pass = new Semaphore(0);
CountDownLatch countDownLatch = new CountDownLatch(2);
ExecutorService threadPool = Executors.newFixedThreadPool(2);
threadPool.execute(() -> {
askForLeave(ds, txn1Pass, 1, 1);
countDownLatch.countDown();
});
threadPool.execute(() -> {
askForLeave(ds, txn1Pass, 2, 2);
countDownLatch.countDown();
});
countDownLatch.await();
}
public static void createDoctorTable(Connection connection) throws SQLException {
connection.createStatement().executeUpdate("CREATE TABLE `doctors` (" +
" `id` int(11) NOT NULL," +
" `name` varchar(255) DEFAULT NULL," +
" `on_call` tinyint(1) DEFAULT NULL," +
" `shift_id` int(11) DEFAULT NULL," +
" PRIMARY KEY (`id`)," +
" KEY `idx_shift_id` (`shift_id`)" +
" ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin");
}
public static void createDoctor(Connection connection, Integer id, String name, Boolean onCall, Integer shiftID) throws SQLException {
PreparedStatement insert = connection.prepareStatement(
"INSERT INTO `doctors` (`id`, `name`, `on_call`, `shift_id`) VALUES (?, ?, ?, ?)");
insert.setInt(1, id);
insert.setString(2, name);
insert.setBoolean(3, onCall);
insert.setInt(4, shiftID);
insert.executeUpdate();
}
public static void askForLeave(HikariDataSource ds, Semaphore txn1Pass, Integer txnID, Integer doctorID) {
try(Connection connection = ds.getConnection()) {
try {
connection.setAutoCommit(false);
String comment = txnID == 2 ? " " : "" + "/* txn #{txn_id} */ ";
connection.createStatement().executeUpdate(comment + "BEGIN");
// Txn 1 should be waiting until txn 2 is done.
if (txnID == 1) {
txn1Pass.acquire();
}
PreparedStatement currentOnCallQuery = connection.prepareStatement(comment +
"SELECT COUNT(*) AS `count` FROM `doctors` WHERE `on_call` = ? AND `shift_id` = ? FOR UPDATE");
currentOnCallQuery.setBoolean(1, true);
currentOnCallQuery.setInt(2, 123);
ResultSet res = currentOnCallQuery.executeQuery();
if (!res.next()) {
throw new RuntimeException("error query");
} else {
int count = res.getInt("count");
if (count >= 2) {
// If current on-call doctor has 2 or more, this doctor can leave
PreparedStatement insert = connection.prepareStatement( comment +
"UPDATE `doctors` SET `on_call` = ? WHERE `id` = ? AND `shift_id` = ?");
insert.setBoolean(1, false);
insert.setInt(2, doctorID);
insert.setInt(3, 123);
insert.executeUpdate();
connection.commit();
} else {
throw new RuntimeException("At least one doctor is on call");
}
}
// Txn 2 is done. Let txn 1 run again.
if (txnID == 2) {
txn1Pass.release();
}
} catch (Exception e) {
// If got any error, you should roll back, data is priceless
connection.rollback();
e.printStackTrace();
}
} catch (SQLException e) {
e.printStackTrace();
}
}
}
package main
import (
"database/sql"
"fmt"
"sync"
"github.com/pingcap-inc/tidb-example-golang/util"
_ "github.com/go-sql-driver/mysql"
)
func main() {
openDB("mysql", "root:@tcp(127.0.0.1:4000)/test", func(db *sql.DB) {
writeSkew(db)
})
}
func openDB(driverName, dataSourceName string, runnable func(db *sql.DB)) {
db, err := sql.Open(driverName, dataSourceName)
if err != nil {
panic(err)
}
defer db.Close()
runnable(db)
}
func writeSkew(db *sql.DB) {
err := prepareData(db)
if err != nil {
panic(err)
}
waitingChan, waitGroup := make(chan bool), sync.WaitGroup{}
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
err = askForLeave(db, waitingChan, 1, 1)
if err != nil {
panic(err)
}
}()
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
err = askForLeave(db, waitingChan, 2, 2)
if err != nil {
panic(err)
}
}()
waitGroup.Wait()
}
func askForLeave(db *sql.DB, waitingChan chan bool, goroutineID, doctorID int) error {
txnComment := fmt.Sprintf("/* txn %d */ ", goroutineID)
if goroutineID != 1 {
txnComment = "\t" + txnComment
}
txn, err := util.TiDBSqlBegin(db, true)
if err != nil {
return err
}
fmt.Println(txnComment + "start txn")
// Txn 1 should be waiting until txn 2 is done.
if goroutineID == 1 {
<-waitingChan
}
txnFunc := func() error {
queryCurrentOnCall := "SELECT COUNT(*) AS `count` FROM `doctors` WHERE `on_call` = ? AND `shift_id` = ?"
rows, err := txn.Query(queryCurrentOnCall, true, 123)
if err != nil {
return err
}
defer rows.Close()
fmt.Println(txnComment + queryCurrentOnCall + " successful")
count := 0
if rows.Next() {
err = rows.Scan(&count)
if err != nil {
return err
}
}
rows.Close()
if count < 2 {
return fmt.Errorf("at least one doctor is on call")
}
shift := "UPDATE `doctors` SET `on_call` = ? WHERE `id` = ? AND `shift_id` = ?"
_, err = txn.Exec(shift, false, doctorID, 123)
if err == nil {
fmt.Println(txnComment + shift + " successful")
}
return err
}
err = txnFunc()
if err == nil {
txn.Commit()
fmt.Println("[runTxn] commit success")
} else {
txn.Rollback()
fmt.Printf("[runTxn] got an error, rollback: %+v\n", err)
}
// Txn 2 is done. Let txn 1 run again.
if goroutineID == 2 {
waitingChan <- true
}
return nil
}
func prepareData(db *sql.DB) error {
err := createDoctorTable(db)
if err != nil {
return err
}
err = createDoctor(db, 1, "Alice", true, 123)
if err != nil {
return err
}
err = createDoctor(db, 2, "Bob", true, 123)
if err != nil {
return err
}
err = createDoctor(db, 3, "Carol", false, 123)
if err != nil {
return err
}
return nil
}
func createDoctorTable(db *sql.DB) error {
_, err := db.Exec("CREATE TABLE IF NOT EXISTS `doctors` (" +
" `id` int(11) NOT NULL," +
" `name` varchar(255) DEFAULT NULL," +
" `on_call` tinyint(1) DEFAULT NULL," +
" `shift_id` int(11) DEFAULT NULL," +
" PRIMARY KEY (`id`)," +
" KEY `idx_shift_id` (`shift_id`)" +
" ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")
return err
}
func createDoctor(db *sql.DB, id int, name string, onCall bool, shiftID int) error {
_, err := db.Exec("INSERT INTO `doctors` (`id`, `name`, `on_call`, `shift_id`) VALUES (?, ?, ?, ?)",
id, name, onCall, shiftID)
return err
}
SQL 日志:
/* txn 1 */ BEGIN
/* txn 2 */ BEGIN
/* txn 2 */ SELECT COUNT(*) AS `count` FROM `doctors` WHERE on_call = 1 AND `shift_id` = 123 FOR UPDATE
/* txn 2 */ UPDATE `doctors` SET on_call = 0 WHERE `id` = 2 AND `shift_id` = 123
/* txn 2 */ COMMIT
/* txn 1 */ SELECT COUNT(*) AS `count` FROM `doctors` WHERE `on_call` = 1 FOR UPDATE
At least one doctor is on call
/* txn 1 */ ROLLBACK
执行结果:
mysql> SELECT * FROM doctors;
+----+-------+---------+----------+
| id | name | on_call | shift_id |
+----+-------+---------+----------+
| 1 | Alice | 1 | 123 |
| 2 | Bob | 0 | 123 |
| 3 | Carol | 0 | 123 |
+----+-------+---------+----------+
不支持 savepoint 和嵌套事务
Spring 支持的 PROPAGATION_NESTED 传播行为会启动一个嵌套的事务,它是当前事务之上独立启动的一个子事务。嵌套事务开始时会记录一个 savepoint ,如果嵌套事务执行失败,事务将会回滚到 savepoint 的状态。嵌套事务是外层事务的一部分,它将会在外层事务提交时一起被提交。下面案例展示了 savepoint 机制:
mysql> BEGIN;
mysql> INSERT INTO T2 VALUES(100);
mysql> SAVEPOINT svp1;
mysql> INSERT INTO T2 VALUES(200);
mysql> ROLLBACK TO SAVEPOINT svp1;
mysql> RELEASE SAVEPOINT svp1;
mysql> COMMIT;
mysql> SELECT * FROM T2;
+------+
| ID |
+------+
| 100 |
+------+
TiDB 不支持 savepoint 机制,因此也不支持 PROPAGATION_NESTED 传播行为。基于 Java Spring 框架的应用如果使用了 PROPAGATION_NESTED 传播行为,需要在应用端做出调整,将嵌套事务的逻辑移除。
大事务限制
基本原则是要限制事务的大小。TiDB 对单个事务的大小有限制,这层限制是在 KV 层面。反映在 SQL 层面的话,简单来说一行数据会映射为一个 KV entry,每多一个索引,也会增加一个 KV entry。所以这个限制反映在 SQL 层面是:
- 最大单行记录容量为 120MB(TiDB v5.0 及更高的版本可通过 tidb-server 配置项
performance.txn-entry-size-limit
调整,低于 TiDB v5.0 的版本支持的单行容量为 6MB)。 - 支持的最大单个事务容量为 10GB(TiDB v4.0 及更高版本可通过 tidb-server 配置项
performance.txn-total-size-limit
调整,低于 TiDB v4.0 的版本支持的最大单个事务容量为 100MB)。
另外注意,无论是大小限制还是行数限制,还要考虑事务执行过程中,TiDB 做编码以及事务额外 Key 的开销。在使用的时候,为了使性能达到最优,建议每 100 ~ 500 行写入一个事务。
自动提交的 SELECT FOR UPDATE 语句不会等锁
自动提交下的 SELECT FOR UPDATE 目前不会加锁。效果如下图所示:
这是已知的与 MySQL 不兼容的地方。
可以通过使用显式的 BEGIN;COMMIT;
解决该问题。