📣

TiDB Cloud Serverless 现已更名为
Starter
!此页面由 AI 自动翻译,英文原文请见
此处。

更新数据

本文档介绍如何使用以下 SQL 语句结合各种编程语言在 TiDB 中更新数据:

  • UPDATE:用于修改指定表中的数据。
  • INSERT ON DUPLICATE KEY UPDATE:用于插入数据,如果存在主键或唯一键冲突,则更新该数据。如果表中有多个唯一键(包括主键),不推荐使用此语句。因为该语句在检测到任何唯一键(包括主键)冲突时会更新数据。当存在多个冲突行时,只会更新其中一行。

开始之前

在阅读本文档之前,你需要准备以下内容:

使用 UPDATE

要更新表中的现有行,你需要使用带有 WHERE 子句的 UPDATE 语句,以筛选需要更新的列。

UPDATE SQL 语法

在 SQL 中,UPDATE 语句通常如下所示:

UPDATE {table} SET {update_column} = {update_value} WHERE {filter_column} = {filter_value}
参数名称描述
{table}表名
{update_column}需要更新的列名
{update_value}需要更新的列值
{filter_column}用于筛选的列名
{filter_value}用于筛选的列值

详细信息请参见 UPDATE syntax

UPDATE 最佳实践

以下是一些更新数据的最佳实践:

  • 始终在 UPDATE 语句中指定 WHERE 子句。如果没有 WHERE 子句,TiDB 将会更新 所有行

UPDATE 示例

假设一位作者将她的名字改为 Helen Haruki,你需要更新 authors 表。假设她的唯一 id1,筛选条件为:id = 1

    UPDATE `authors` SET `name` = "Helen Haruki" WHERE `id` = 1;
    // ds 是一个 com.mysql.cj.jdbc.MysqlDataSource 的实体 try (Connection connection = ds.getConnection()) { PreparedStatement pstmt = connection.prepareStatement("UPDATE `authors` SET `name` = ? WHERE `id` = ?"); pstmt.setString(1, "Helen Haruki"); pstmt.setInt(2, 1); pstmt.executeUpdate(); } catch (SQLException e) { e.printStackTrace(); }

    使用 INSERT ON DUPLICATE KEY UPDATE

    如果你需要向表中插入新数据,但如果存在唯一键(包括主键)冲突,则会更新冲突的那条记录。你可以使用 INSERT ... ON DUPLICATE KEY UPDATE ... 语句实现插入或更新。

    INSERT ON DUPLICATE KEY UPDATE SQL 语法

    在 SQL 中,INSERT ... ON DUPLICATE KEY UPDATE ... 语句通常如下所示:

    INSERT INTO {table} ({columns}) VALUES ({values}) ON DUPLICATE KEY UPDATE {update_column} = {update_value};
    参数名称描述
    {table}表名
    {columns}要插入的列名
    {values}要插入的列值
    {update_column}要更新的列名
    {update_value}要更新的列值

    INSERT ON DUPLICATE KEY UPDATE 最佳实践

    • 仅在表中只有一个唯一键时使用 INSERT ON DUPLICATE KEY UPDATE。该语句在检测到任何 UNIQUE KEY(包括主键)冲突时会更新数据。如果存在多行冲突,只会更新其中一行。因此,除非你能保证冲突只有一行,否则不建议在具有多个唯一键的表中使用此语句。
    • 在创建数据或更新数据时使用此语句。

    INSERT ON DUPLICATE KEY UPDATE 示例

    例如,你需要更新 ratings 表,以包含用户对书的评分。如果用户尚未评分,则会创建新评分;如果已评分,则会更新之前的评分。

    在下面的示例中,主键是 book_iduser_id 的联合主键。用户 user_id = 1 给一本书 book_id = 1000 评分为 5

      INSERT INTO `ratings` (`book_id`, `user_id`, `score`, `rated_at`) VALUES (1000, 1, 5, NOW()) ON DUPLICATE KEY UPDATE `score` = 5, `rated_at` = NOW();
      // ds 是一个 com.mysql.cj.jdbc.MysqlDataSource 的实体 try (Connection connection = ds.getConnection()) { PreparedStatement p = connection.prepareStatement("INSERT INTO `ratings` (`book_id`, `user_id`, `score`, `rated_at`) VALUES (?, ?, ?, NOW()) ON DUPLICATE KEY UPDATE `score` = ?, `rated_at` = NOW()"); p.setInt(1, 1000); p.setInt(2, 1); p.setInt(3, 5); p.setInt(4, 5); p.executeUpdate(); } catch (SQLException e) { e.printStackTrace(); }

      批量更新

      当你需要在表中更新多行数据时,可以 使用 INSERT ON DUPLICATE KEY UPDATE 搭配 WHERE 子句筛选需要更新的数据。

      然而,如果你需要更新大量行(例如超过一万行),建议逐步迭代更新,即每次只更新部分数据,直到全部完成。这是因为 TiDB 限制单个事务的大小为 100 MB。一次性更新过多数据会导致持锁时间过长(pessimistic transactions)或引发冲突(optimistic transactions)。你可以在程序或脚本中使用循环完成此操作。

      本节提供了编写脚本以实现迭代更新的示例。此示例展示了如何结合 SELECTUPDATE 完成批量更新。

      编写批量更新循环

      首先,你应在你的应用或脚本中编写一个 SELECT 查询。该查询的返回值可以作为需要更新行的主键。注意在定义此 SELECT 查询时,必须使用 WHERE 子句筛选需要更新的行。

      示例

      假设你在 bookshop 网站上过去一年收集了大量用户对书的评分,但原有的 5 分制设计导致评分缺乏差异化,大部分书的评分都为 3。你决定将评分从 5 分制切换到 10 分制,以实现差异化。

      你需要将 ratings 表中的数据乘以 2,并新增一列用以标识是否已更新。利用此列,你可以在 SELECT 时筛选出已更新的行,避免脚本崩溃或多次更新同一行导致数据异常。

      例如,你创建一个名为 ten_point 的列,数据类型为 BOOL,用作是否为 10 分制的标识:

      ALTER TABLE `bookshop`.`ratings` ADD COLUMN `ten_point` BOOL NOT NULL DEFAULT FALSE;

        在 Golang 中,批量更新的示例类似如下:

        package main import ( "database/sql" "fmt" _ "github.com/go-sql-driver/mysql" "strings" "time" ) func main() { db, err := sql.Open("mysql", "root:@tcp(127.0.0.1:4000)/bookshop") if err != nil { panic(err) } defer db.Close() bookID, userID := updateBatch(db, true, 0, 0) fmt.Println("第一次批量更新成功") for { time.Sleep(time.Second) bookID, userID = updateBatch(db, false, bookID, userID) fmt.Printf("批量更新成功,[bookID] %d,[userID] %d\n", bookID, userID) } } // updateBatch 在最多 1000 行数据中选择,更新评分 func updateBatch(db *sql.DB, firstTime bool, lastBookID, lastUserID int64) (bookID, userID int64) { // 选择最多 1000 条未更新到 10 分制的数据的主键 var err error var rows *sql.Rows if firstTime { rows, err = db.Query("SELECT `book_id`, `user_id` FROM `bookshop`.`ratings` " + "WHERE `ten_point` != true ORDER BY `book_id`, `user_id` LIMIT 1000") } else { rows, err = db.Query("SELECT `book_id`, `user_id` FROM `bookshop`.`ratings` "+ "WHERE `ten_point` != true AND `book_id` > ? AND `user_id` > ? "+ "ORDER BY `book_id`, `user_id` LIMIT 1000", lastBookID, lastUserID) } if err != nil || rows == nil { panic(fmt.Errorf("发生错误或行为空: %+v", err)) } // 将所有ID合并成列表 var idList []interface{} for rows.Next() { var tempBookID, tempUserID int64 if err := rows.Scan(&tempBookID, &tempUserID); err != nil { panic(err) } idList = append(idList, tempBookID, tempUserID) bookID, userID = tempBookID, tempUserID } bulkUpdateSql := fmt.Sprintf("UPDATE `bookshop`.`ratings` SET `ten_point` = true, "+ "`score` = `score` * 2 WHERE (`book_id`, `user_id`) IN (%s)", placeHolder(len(idList))) db.Exec(bulkUpdateSql, idList...) return bookID, userID } // placeHolder 格式化SQL占位符 func placeHolder(n int) string { holderList := make([]string, n/2, n/2) for i := range holderList { holderList[i] = "(?,?)" } return strings.Join(holderList, ",") }

        每次循环中,SELECT 按主键顺序查询,最多选择 1000 行未更新到 10 分制(ten_pointfalse)的主键。每个 SELECT 语句会选择比上一次最大主键更大的主键,以避免重复。然后,利用批量更新,将 score 列乘以 2,并将 ten_point 设置为 true。更新 ten_point 的目的是为了防止在崩溃重启后,更新程序反复更新同一行,导致数据损坏。每个循环中的 time.Sleep(time.Second) 让更新暂停 1 秒,以减少硬件资源消耗。

        在 Java (JDBC) 中,批量更新的示例可能如下:

        代码:

        package com.pingcap.bulkUpdate; import com.mysql.cj.jdbc.MysqlDataSource; import java.sql.*; import java.util.LinkedList; import java.util.List; import java.util.concurrent.TimeUnit; public class BatchUpdateExample { static class UpdateID { private Long bookID; private Long userID; public UpdateID(Long bookID, Long userID) { this.bookID = bookID; this.userID = userID; } public Long getBookID() { return bookID; } public void setBookID(Long bookID) { this.bookID = bookID; } public Long getUserID() { return userID; } public void setUserID(Long userID) { this.userID = userID; } @Override public String toString() { return "[bookID] " + bookID + ", [userID] " + userID ; } } public static void main(String[] args) throws InterruptedException { // 配置示例数据库连接。 // 创建 Mysql 数据源实例。 MysqlDataSource mysqlDataSource = new MysqlDataSource(); // 设置服务器名、端口、数据库名、用户名和密码。 mysqlDataSource.setServerName("localhost"); mysqlDataSource.setPortNumber(4000); mysqlDataSource.setDatabaseName("bookshop"); mysqlDataSource.setUser("root"); mysqlDataSource.setPassword(""); UpdateID lastID = batchUpdate(mysqlDataSource, null); System.out.println("第一次批量更新成功"); while (true) { TimeUnit.SECONDS.sleep(1); lastID = batchUpdate(mysqlDataSource, lastID); System.out.println("批量更新成功,[lastID] " + lastID); } } public static UpdateID batchUpdate (MysqlDataSource ds, UpdateID lastID) { try (Connection connection = ds.getConnection()) { UpdateID updateID = null; PreparedStatement selectPs; if (lastID == null) { selectPs = connection.prepareStatement( "SELECT `book_id`, `user_id` FROM `bookshop`.`ratings` " + "WHERE `ten_point` != true ORDER BY `book_id`, `user_id` LIMIT 1000"); } else { selectPs = connection.prepareStatement( "SELECT `book_id`, `user_id` FROM `bookshop`.`ratings` "+ "WHERE `ten_point` != true AND `book_id` > ? AND `user_id` > ? "+ "ORDER BY `book_id`, `user_id` LIMIT 1000"); selectPs.setLong(1, lastID.getBookID()); selectPs.setLong(2, lastID.getUserID()); } List<Long> idList = new LinkedList<>(); ResultSet res = selectPs.executeQuery(); while (res.next()) { updateID = new UpdateID( res.getLong("book_id"), res.getLong("user_id") ); idList.add(updateID.getBookID()); idList.add(updateID.getUserID()); } if (idList.isEmpty()) { System.out.println("没有数据需要更新"); return null; } String updateSQL = "UPDATE `bookshop`.`ratings` SET `ten_point` = true, "+ "`score` = `score` * 2 WHERE (`book_id`, `user_id`) IN (" + placeHolder(idList.size() / 2) + ")"; PreparedStatement updatePs = connection.prepareStatement(updateSQL); for (int i = 0; i < idList.size(); i++) { updatePs.setLong(i + 1, idList.get(i)); } int count = updatePs.executeUpdate(); System.out.println("更新了 " + count + " 条数据"); return updateID; } catch (SQLException e) { e.printStackTrace(); } return null; } public static String placeHolder(int n) { StringBuilder sb = new StringBuilder(); for (int i = 0; i < n ; i++) { sb.append(i == 0 ? "(?,?)" : ",(?,?)"); } return sb.toString(); } }
        • hibernate.cfg.xml 配置:
        <?xml version='1.0' encoding='utf-8'?> <!DOCTYPE hibernate-configuration PUBLIC "-//Hibernate/Hibernate Configuration DTD 3.0//EN" "http://www.hibernate.org/dtd/hibernate-configuration-3.0.dtd"> <hibernate-configuration> <session-factory> <!-- 数据库连接设置 --> <property name="hibernate.connection.driver_class">com.mysql.cj.jdbc.Driver</property> <property name="hibernate.dialect">org.hibernate.dialect.TiDBDialect</property> <property name="hibernate.connection.url">jdbc:mysql://localhost:4000/movie</property> <property name="hibernate.connection.username">root</property> <property name="hibernate.connection.password"></property> <property name="hibernate.connection.autocommit">false</property> <property name="hibernate.jdbc.batch_size">20</property> <!-- 可选:显示 SQL 输出以便调试 --> <property name="hibernate.show_sql">true</property> <property name="hibernate.format_sql">true</property> </session-factory> </hibernate-configuration>

        每次循环中,SELECT 按主键顺序查询,最多选择 1000 行未更新到 10 分制(ten_pointfalse)的主键。每个 SELECT 语句会选择比上一次最大主键更大的主键,以避免重复。然后,利用批量更新,将 score 列乘以 2,并将 ten_point 设置为 true。更新 ten_point 的目的是为了防止在崩溃重启后,更新程序反复更新同一行,导致数据损坏。每个循环中的 TimeUnit.SECONDS.sleep(1); 让更新暂停 1 秒,以减少硬件资源消耗。

        需要帮助?

        DiscordSlack 上向社区提问,或 提交支持工单

        文档内容是否有帮助?