前言

最近做项目时,用 pymysql 的 executemany 方法批量更新大量数据,对处理时长不太满意,于是看了下源代码,发现对 UPDATE 语句,executemany 内部居然是循环逐条调用 execute,令人吃惊。数据量少时影响不大;但数据量一大,每条语句都要单独往返一次数据库,数据库压力可想而知,难怪速度提不上来。于是请教了下万能的百度,得到一个还算可行的解决方案——【创建临时表实现批量更新】。下面就原方法和新方法各写一个示例做对比。

先来看下pymysql的源码(cursors.py)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#: Regular expression for :meth:`Cursor.executemany`.
#: executemany only supports simple bulk insert.
#: You can use it to load large dataset.
#:
#: The pattern captures three groups:
#:   1. the statement prefix: ``INSERT``/``REPLACE`` up to and including
#:      ``VALUE``/``VALUES``
#:   2. one parenthesised row of ``%s`` / ``%(name)s`` placeholders
#:   3. an optional ``ON DUPLICATE ...`` suffix (a trailing ``;`` is tolerated)
RE_INSERT_VALUES = re.compile(
r"\s*((?:INSERT|REPLACE)\b.+\bVALUES?\s*)"
+ r"(\(\s*(?:%s|%\(.+\)s)\s*(?:,\s*(?:%s|%\(.+\)s)\s*)*\))"
+ r"(\s*(?:ON DUPLICATE.*)?);?\s*\Z",
# Keywords match case-insensitively; DOTALL lets ``.`` span line breaks.
re.IGNORECASE | re.DOTALL,
)
def executemany(self, query, args):
    """Run *query* once for every parameter set in *args*.

    Statements matching ``RE_INSERT_VALUES`` (``INSERT``/``REPLACE ... VALUES``)
    are handed to ``self._do_execute_many`` together with the split prefix,
    value template and postfix. Any other statement, including ``UPDATE``,
    is sent with one ``self.execute`` call per parameter set.

    :param query: SQL statement containing ``%s`` / ``%(name)s`` placeholders.
    :param args: Sequence of parameter sets, one per execution.
    :return: ``None`` when *args* is empty; otherwise whatever
        ``_do_execute_many`` returns for the bulk path (presumably the
        affected row count -- confirm against its definition), or the sum of
        the individual ``execute`` results, which is also stored in
        ``self.rowcount``.
    """
    if not args:
        return
    m = RE_INSERT_VALUES.match(query)
    if m:
        # %-format the prefix with no arguments so that escaped ``%%``
        # sequences in it collapse to a literal ``%``.
        q_prefix = m.group(1) % ()
        q_values = m.group(2).rstrip()
        q_postfix = m.group(3) or ""
        assert q_values[0] == "(" and q_values[-1] == ")"
        return self._do_execute_many(
            q_prefix,
            q_values,
            q_postfix,
            args,
            self.max_stmt_length,
            self._get_db().encoding,
        )

    # Fallback: one round trip per parameter set.
    self.rowcount = sum(self.execute(query, arg) for arg in args)
    return self.rowcount

可以看到,只有能匹配 RE_INSERT_VALUES 的语句——即 INSERT / REPLACE ... VALUES (...)(后面可以带 ON DUPLICATE KEY UPDATE 子句)——executemany 才会走 _do_execute_many 做真正的批量执行;其他语句(包括单纯的批量 UPDATE)只能循环逐条 execute。

实现步骤

准备

1
pip install pymysql

连接数据库

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from typing import List
import pymysql
import time
from pymysql.cursors import DictCursor


# Connection settings for the local demo database.
# NOTE: the hard-coded credentials are for this example only.
dbconfig = {
    "host": "127.0.0.1",
    "user": "root",
    "password": "123456",
    # ``database`` is the documented keyword; ``db`` is a deprecated alias.
    "database": "testdb",
}

# Open the module-level connection using the settings in ``dbconfig``.
conn = pymysql.connect(**dbconfig)

# Sample payload: 4000 rows, each ordered as (name, ip, id).
value = [(f"{i}name", f"{i}ip", i) for i in range(4000)]

executemany实现批量更新

1
2
3
4
5
6
7
8
9
10
11
12
13
# Bulk update through plain ``executemany``.
def updatemany(value: List):
    """Update ``log`` rows with ``executemany`` and print the elapsed time.

    Args:
        value: Sequence of ``(name, ip, id)`` tuples, one per row to update.
            An empty sequence skips the database work entirely.

    Raises:
        Exception: Any database error is re-raised after the pending
            transaction has been rolled back.
    """
    # perf_counter is monotonic, so wall-clock adjustments cannot skew the
    # measured duration.
    start_time = time.perf_counter()

    if value:
        sql = "UPDATE log SET name=%s,ip=%s WHERE id=%s"
        with conn.cursor(DictCursor) as cursor:
            try:
                cursor.executemany(sql, value)
                conn.commit()
            except Exception:
                # Do not leave a half-applied batch pending on the shared
                # connection.
                conn.rollback()
                raise

    end_time = time.perf_counter()
    print(f"【executemany】批量更新:用时{end_time-start_time}")

# Run the executemany-based update on the sample rows.
updatemany(value)

创建临时表 实现批量更新

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# Bulk update by staging the rows in a temporary table.
def updatemanytemp(value: List):
    """Update ``log`` through a temporary table and print the elapsed time.

    The rows are inserted into ``log_temp`` with ``executemany`` and then
    applied to ``log`` with a single joined ``UPDATE``.

    Args:
        value: Sequence of ``(name, ip, id)`` tuples, one per row to update.
            An empty sequence skips the database work entirely.

    Raises:
        Exception: Any database error is re-raised after the pending
            transaction has been rolled back.
    """
    start_time = time.perf_counter()

    if value:
        # Temporary tables live as long as the connection, so drop any
        # leftover from a previous call; otherwise CREATE fails on reuse.
        sql_drop = "DROP TEMPORARY TABLE IF EXISTS log_temp"
        # ``LIMIT 0`` copies the column definitions without copying rows.
        sql_temp = """
            CREATE TEMPORARY TABLE log_temp SELECT id,name,ip FROM log LIMIT 0
        """
        # Column order matches the (name, ip, id) layout of ``value``.
        sql_insert = """
            INSERT INTO log_temp (name,ip,id) VALUES (%s, %s, %s)
        """
        # Apply every staged row to the real table in one statement.
        sql_update = """
            UPDATE log, log_temp SET log.name=log_temp.name,log.ip=log_temp.ip WHERE log.id=log_temp.id
        """

        with conn.cursor(DictCursor) as cursor:
            try:
                cursor.execute(sql_drop)
                cursor.execute(sql_temp)
                cursor.executemany(sql_insert, value)
                cursor.execute(sql_update)
                conn.commit()
            except Exception:
                conn.rollback()
                raise
            # Release the staging table right away instead of keeping it
            # until the connection closes.
            cursor.execute(sql_drop)

    end_time = time.perf_counter()
    print(f"【创建临时表 】批量更新:用时{end_time-start_time}")

# Run the temporary-table update on the sample rows.
updatemanytemp(value)

结果

(此处原为运行结果截图,图片未能保留。)
从截图可以看到,两者耗时相差 10 倍左右。这次示例的数据表只有 3 个字段、4000 条数据,所以绝对耗时的差别还不算明显;如果字段更多、数据量更大,差距可能就不止这些了。

最后

把方法写得通用一点,直接传递表名和参数即可,方便照搬

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
from typing import List
import pymysql
import time
from pymysql.cursors import DictCursor


# Connection settings for the local demo database.
# NOTE: the hard-coded credentials are for this example only.
dbconfig = {
    "host": "127.0.0.1",
    "user": "root",
    "password": "123456",
    # ``database`` is the documented keyword; ``db`` is a deprecated alias.
    "database": "testdb",
}

# Columns to overwrite and columns that identify the row to update.
header = ["name", "ip"]
where = ["id"]
# Sample payload: 4000 rows, each ordered as ``header`` columns followed by
# ``where`` columns, i.e. (name, ip, id).
value = [(f"{i}name", f"{i}ip", i) for i in range(4000)]

# Open the module-level connection using the settings in ``dbconfig``.
conn = pymysql.connect(**dbconfig)


# Bulk update through plain ``executemany``.
def updatemany(table: str, header: List, where: List, value: List):
    """Update rows of *table* with ``executemany`` and print the elapsed time.

    Table and column names are interpolated into the SQL text as-is, so they
    must come from trusted code; only the row values are sent as parameters.

    Args:
        table: Name of the table to update.
        header: Columns to overwrite (the ``SET`` list).
        where: Columns that identify the target row (combined with ``AND``).
        value: Sequence of tuples ordered as ``header`` values followed by
            ``where`` values. An empty sequence skips the database work.

    Raises:
        ValueError: If *header* or *where* is empty, which would otherwise
            produce a malformed statement.
        Exception: Any database error is re-raised after the pending
            transaction has been rolled back.
    """
    if not header or not where:
        raise ValueError("header and where must each name at least one column")

    # perf_counter is monotonic, so wall-clock adjustments cannot skew the
    # measured duration.
    start_time = time.perf_counter()

    if value:
        set_str = ",".join(f"{col}=%s" for col in header)
        where_str = " AND ".join(f"{col}=%s" for col in where)
        sql = f"UPDATE {table} SET {set_str} WHERE {where_str}"

        with conn.cursor(DictCursor) as cursor:
            try:
                cursor.executemany(sql, value)
                conn.commit()
            except Exception:
                # Do not leave a half-applied batch pending on the shared
                # connection.
                conn.rollback()
                raise

    end_time = time.perf_counter()
    print(f"【executemany】批量更新:用时{end_time-start_time}")


# Bulk update by staging the rows in a temporary table.
def updatemanytemp(table: str, header: List, where: List, value: List):
    """Update rows of *table* through a temporary table and print the timing.

    The rows are inserted into ``<table>_temp`` with a parameterized
    ``executemany`` and then applied to *table* with one joined ``UPDATE``.
    Table and column names are interpolated into the SQL text as-is, so they
    must come from trusted code; the row values are always sent as parameters.

    Args:
        table: Name of the table to update.
        header: Columns to overwrite (the ``SET`` list).
        where: Columns used to join the staged rows to *table*.
        value: Sequence of tuples ordered as ``header`` values followed by
            ``where`` values. An empty sequence skips the database work.

    Raises:
        ValueError: If *header* or *where* is empty, which would otherwise
            produce a malformed statement.
        Exception: Any database error is re-raised after the pending
            transaction has been rolled back.
    """
    if not header or not where:
        raise ValueError("header and where must each name at least one column")

    start_time = time.perf_counter()

    if value:
        temp_table = f"{table}_temp"
        # Same column order as each tuple in ``value``.
        insert_columns = header + where

        set_str = ",".join(
            f"{table}.{col}={temp_table}.{col}" for col in header
        )
        where_str = " AND ".join(
            f"{table}.{col}={temp_table}.{col}" for col in where
        )
        placeholders = ", ".join(["%s"] * len(insert_columns))

        # Temporary tables live as long as the connection, so drop any
        # leftover from a previous call; otherwise CREATE fails on reuse.
        sql_drop = f"DROP TEMPORARY TABLE IF EXISTS {temp_table}"
        # ``LIMIT 0`` copies the column definitions without copying rows.
        sql_temp = (
            f"CREATE TEMPORARY TABLE {temp_table} "
            f"SELECT {','.join(where + header)} FROM {table} LIMIT 0"
        )
        # Placeholders instead of ``str(tuple)``: values containing quotes,
        # ``None`` or one-column rows are escaped correctly by the driver.
        sql_insert = (
            f"INSERT INTO {temp_table} ({','.join(insert_columns)}) "
            f"VALUES ({placeholders})"
        )
        # Apply every staged row to the real table in one statement.
        sql_update = (
            f"UPDATE {table}, {temp_table} SET {set_str} WHERE {where_str}"
        )

        with conn.cursor(DictCursor) as cursor:
            try:
                cursor.execute(sql_drop)
                cursor.execute(sql_temp)
                cursor.executemany(sql_insert, value)
                cursor.execute(sql_update)
                conn.commit()
            except Exception:
                conn.rollback()
                raise
            # Release the staging table right away instead of keeping it
            # until the connection closes.
            cursor.execute(sql_drop)

    end_time = time.perf_counter()
    print(f"【创建临时表 】批量更新:用时{end_time-start_time}")

# Run both strategies against the ``log`` table so their printed timings can
# be compared: the temporary-table version first, then plain executemany.
updatemanytemp("log", header, where, value)
updatemany("log", header, where, value)