前言 最近捣鼓项目时,pymysql用executemany方法批量更新大量数据过程中,对处理时长有点不满意,于是看了下源代码,发现里面实现批量update居然是循环去execute的,吃鲸。数据少其实没多大影响,如果大量数据批量执行,那数据库压力可想而知,难怪速度提不上来;于是乎,请教了下万能的百度,得到一个还算可以的解决方案–【创建临时表实现批量更新】,以下就原方法和新实现的方法写个示例做个对比。
先来看下pymysql的源码(cursors.py)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 RE_INSERT_VALUES = re.compile ( r"\s*((?:INSERT|REPLACE)\b.+\bVALUES?\s*)" + r"(\(\s*(?:%s|%\(.+\)s)\s*(?:,\s*(?:%s|%\(.+\)s)\s*)*\))" + r"(\s*(?:ON DUPLICATE.*)?);?\s*\Z" , re.IGNORECASE | re.DOTALL, ) def executemany (self, query, args ): if not args: return m = RE_INSERT_VALUES.match (query) if m: q_prefix = m.group(1 ) % () q_values = m.group(2 ).rstrip() q_postfix = m.group(3 ) or "" assert q_values[0 ] == "(" and q_values[-1 ] == ")" return self ._do_execute_many( q_prefix, q_values, q_postfix, args, self .max_stmt_length, self ._get_db().encoding, ) self .rowcount = sum (self .execute(query, arg) for arg in args) return self .rowcount
可以看到,executemany 只针对 INSERT|REPLACE、ON DUPLICATE 这些才能实现真正批量,而对于单纯的批量update 只能循环execute
实现步骤 准备
连接数据库 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 from typing import List import pymysqlimport timefrom pymysql.cursors import DictCursordbconfig = { "host" : "127.0.0.1" , "user" : "root" , "password" : "123456" , "db" : "testdb" } conn = pymysql.connect(**dbconfig) value = [] for i in range (4000 ): value.append((str (i)+"name" , str (i)+"ip" , i))
executemany实现批量更新 1 2 3 4 5 6 7 8 9 10 11 12 13 def updatemany (value:List ): start_time = time.time() with conn.cursor(DictCursor) as cursor: sql = "UPDATE log SET name=%s,ip=%s WHERE id=%s" cursor.executemany(sql, value) conn.commit() end_time = time.time() print (f"【executemany】批量更新:用时{end_time-start_time} " ) updatemany(value)
创建临时表 实现批量更新 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 def updatemanytemp (value:List ): start_time = time.time() with conn.cursor(DictCursor) as cursor: sql_temp = """ CREATE TEMPORARY TABLE log_temp SELECT id,name,ip FROM log LIMIT 0 """ sql_insert = """ INSERT INTO log_temp (name,ip,id) VALUES (%s, %s, %s) """ sql_update = """ UPDATE log, log_temp SET log.name=log_temp.name,log.ip=log_temp.ip WHERE log.id=log_temp.id """ cursor.execute(sql_temp) cursor.executemany(sql_insert, value) cursor.execute(sql_update) conn.commit() end_time = time.time() print (f"【创建临时表 】批量更新:用时{end_time-start_time} " ) updatemanytemp(value)
结果 从上图看到,2者相差10倍左右。这次示例的数据表只有3个字段4000条数据,所以这点时间看不出有多大影响,如果字段更多数据量更大,时间可能就不止相差这点了
最后 把方法写得通用一点,直接传递表名和参数即可,方便照搬
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 from typing import List import pymysqlimport timefrom pymysql.cursors import DictCursordbconfig = { "host" : "127.0.0.1" , "user" : "root" , "password" : "123456" , "db" : "testdb" } header = ["name" ,"ip" ] where = ["id" ] value = [] for i in range (4000 ): value.append((str (i)+"name" , str (i)+"ip" , i)) conn = pymysql.connect(**dbconfig) def updatemany (table:str , header:List , where:List , value:List ): start_time = time.time() with conn.cursor(DictCursor) as cursor: set_str = "," .join([f"{sql} =%s" for sql in header]) where_str = " AND " .join([f"{sql} =%s" for sql in where]) sql = f"UPDATE {table} SET {set_str} WHERE {where_str} " cursor.executemany(sql, value) conn.commit() end_time = time.time() print (f"【executemany】批量更新:用时{end_time-start_time} " ) def updatemanytemp (table:str , header:List , where:List , value:List ): start_time = time.time() with conn.cursor(DictCursor) as cursor: set_str = "," .join([f"{table} .{sql} ={table} _temp.{sql} " for sql in header]) where_str = " AND " .join([f"{table} .{sql} ={table} _temp.{sql} " for sql in where]) sql_temp = f""" CREATE TEMPORARY TABLE {table} _temp SELECT {',' .join(where + header)} FROM {table} LIMIT 0 """ sql_insert = f""" INSERT INTO {table} _temp ({',' .join(header + where)} ) VALUES {',' .join([str (v) for v in value])} """ sql_update = f""" UPDATE {table} , {table} _temp SET {set_str} WHERE {where_str} """ cursor.execute(sql_temp) cursor.execute(sql_insert) cursor.execute(sql_update) conn.commit() end_time = time.time() print (f"【创建临时表 】批量更新:用时{end_time-start_time} " ) updatemanytemp("log" , header, where, value) updatemany("log" , header, where, value)