Python 第三方模块之 DButils - 数据库连接池

使用pymysql来连接数据库时,单线程应用完全没有问题,但如果涉及到多线程应用那么就需要加锁,一旦加锁那么连接势必就会排队等待,当请求比较多时,性能就会降低了。所以我们需要使用 DButils 模块

DBUtils是Python的一个用于实现数据库连接池的模块。并允许对非线程安全的数据库接口进行线程安全包装。DBUtils来自Webware for Python。

DBUtils提供两种外部接口:

  • PersistentDB :提供线程专用的数据库连接,并自动管理连接。
  • PooledDB :提供线程间可共享的数据库连接,并自动管理连接。

安装

1
2
3
4
5
# DBUtils 
pip install DBUtils

# pymysql
pip install pymysql/MySQLdb

连接模式

DButils 连接池有两种连接模式:

  • 模式一:为每个线程创建一个连接,线程即使调用了close方法,也不会关闭,只是把连接重新放到连接池,供自己线程再次使用。当线程终止时,连接自动关闭。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    POOL = PersistentDB(
    creator=pymysql, # 使用链接数据库的模块,需要符合PEP249协议, 如MySQLdb, mysql.connector等
    closeable=False, # 如果为False时, conn.close() 实际上被忽略,供下次使用,在线程关闭时,才会自动关闭链接。
    # 如果为True时, conn.close()则关闭链接,那么再次调用pool.connection时就会报错,因为已经真的关闭了连接(pool.steady_connection()可以获取一个新的链接)

    threadlocal=None, # 本线程独享值得对象,用于保存链接对象,如果链接对象被重置
    host='127.0.0.1',
    port=3306, # 必须为int 类型
    user='root',
    password='123',
    database='pooldb',
    charset='utf8'
    )

    def func():
    conn = POOL.connection(shareable=False)
    cursor = conn.cursor()
    cursor.execute('select * from tb1')
    result = cursor.fetchall()
    cursor.close()
    conn.close()

    func()
  • 模式二:创建一批连接到连接池,供所有线程共享使用。
    PS:由于pymysql、MySQLdb等threadsafety值为1,所以该模式连接池中的线程会被所有线程共享。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    import time
    import pymysql
    import threading
    from DBUtils.PooledDB import PooledDB, SharedDBConnection
    POOL = PooledDB(
    creator=pymysql, # 使用链接数据库的模块
    host='127.0.0.1',
    port=3306,
    user='root',
    password='123',
    database='pooldb', # python2 是 db
    charset='utf8'
    )


    def func():
    # 检测当前正在运行连接数的是否小于最大链接数,如果不小于则:等待或报raise TooManyConnections异常
    # 否则 ,则优先去初始化时创建的链接中获取链接 SteadyDBConnection。
    # 然后将SteadyDBConnection对象封装到PooledDedicatedDBConnection中并返回。
    # 如果最开始创建的链接没有链接,则去创建一个SteadyDBConnection对象,再封装到PooledDedicatedDBConnection中并返回。
    # 一旦关闭链接后,连接就返回到连接池让后续线程继续使用。
    conn = POOL.connection()

    # print(th, '链接被拿走了', conn1._con)
    # print(th, '池子里目前有', pool._idle_cache, '\r\n')

    cursor = conn.cursor()
    cursor.execute('select * from tb1')
    result = cursor.fetchall()
    conn.close()

    func()

参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
POOL = PersistentDB(
maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数,需要符合PEP249协议, 如MySQLdb, mysql.connector等
mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
maxcached=5, # 链接池中最多闲置的链接,0和None不限制
maxshared=3, # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
ping=0, # ping MySQL服务端,检查是否服务可用。
# 0 = None = never, 从来不检查
# 1 = default = whenever it is requested, 默认值,从连接池中拿出来时检查
# 2 = when a cursor is created, 当游标cursor创建时
# 4 = when a query is executed, 当调用excute方法时
# 7 = always 上面的几种情况
# 连接池对象有个空闲连接列表的属性,当服务端与数据库端没有什么数据发送时,此时中的空闲连接若长时间没有向数据发送数据,如果该连接被防火墙干掉,然而此时
# 客户端中的连接池中的空闲连接并不知道自己异常,当再次发送数据时,会连接重试最后大概15分钟后报错超时。所以上面的集中参数都没有考虑到,若要解决这个问题
# 我们需要定时向数据发送数据。

reset=True, # 连接归还给pool时,是否进行重置
host='127.0.0.1',
port=3306, # 必须为int 类型
user='root',
password='123',
database='pooldb',
charset='utf8'
)

使用

config.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#-*- coding: UTF-8 -*-

#TEST数据库信息
DB_TEST_HOST="192.168.88.6";
DB_TEST_PORT=3306;
DB_TEST_DBNAME="asterisk";
DB_TEST_USER="root";
DB_TEST_PASSWORD="kalamodo";


#数据库连接编码
DB_CHARSET="utf8";

#mincached : 启动时开启的闲置连接数量(缺省值 0 开始时不创建连接)
DB_MIN_CACHED=10;

#maxcached : 连接池中允许的闲置的最多连接数量(缺省值 0 代表不闲置连接池大小)
DB_MAX_CACHED=10;

#maxshared : 共享连接数允许的最大数量(缺省值 0 代表所有连接都是专用的)如果达到了最大数量,被请求为共享的连接将会被共享使用
DB_MAX_SHARED=20;

#maxconnecyions : 创建连接池的最大数量(缺省值 0 代表不限制)
DB_MAX_CONNECYIONS=100;

#blocking : 设置在连接池达到最大数量时的行为(缺省值 0 或 False 代表返回一个错误<toMany......>; 其他代表阻塞直到连接数减少,连接被分配)
DB_BLOCKING=True;

#maxusage : 单个连接的最大允许复用次数(缺省值 0 或 False 代表不限制的复用).当达到最大数时,连接会自动重新连接(关闭和重新打开)
DB_MAX_USAGE=0;

#setsession : 一个可选的SQL命令列表用于准备每个会话,如["set datestyle to german", ...]
DB_SET_SESSION=None;

MySQLPool

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
import pymysql
from dbutils.pooled_db import PooledDB

from sem.utils.config import conf

from sem.utils.log import logger


class MySQLPoolLocal(object):
__pool = None

def __init__(self):
# self.conn = self.__get_conn()
# self.cursor = self.conn.cursor()
pass

def __get_conn(self):
if self.__pool is None:
self.__pool = PooledDB(
creator=pymysql, # 使用链接数据库的模块,需要符合PEP249协议, 如MySQLdb, mysql.connector等
mincached=10, # 最小缓存的连接数, pool初始化时将会预先建立mincached个数的连接(缺省值 0 开始时不创建连接)
maxcached=10, # 连接池中允许的闲置的最多连接数量(缺省值 0 代表不闲置连接池大小)
maxshared=0, # 当连接数达到maxshared后,开始复用连接(0表示连接不进行复用)。该参数受数据库驱动的threadsafety参数影响(缺省值 0 代表所有连接都是专用的)
maxconnections=100, # 连接数上限, 0为不设上限,每当连接达到这个数时,将会阻塞或是抛出异常。shared机制下,不受该参数影响(缺省值 0 代表不限制)
blocking=True, # 设置在连接池达到最大数量时的行为(缺省值 0 或 False 代表抛出异常; 其他代表阻塞直到连接数减少,连接被分配)
maxusage=0, # 单个连接的最大允许复用次数(缺省值 0 或 False 代表不限制的复用).当达到最大数时,连接会自动重新连接(关闭和重新打开)
host=conf.MYSQL_HOST,
port=3306,
user=conf.MYSQL_USER,
passwd=conf.MYSQL_PASSWD,
db=conf.MYSQL_DB,
use_unicode=True, # 是否使用 unicode 编码
charset='utf8'
)
# 每次需要数据库连接就是用connection()函数获取连接
return self.__pool.connection()

def get_conn(self, cursor_dict=False):
"""
连接池中取出一个连接
:return:
"""
try:
conn = self.__get_conn()
if cursor_dict:
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
else:
cursor = conn.cursor()
return cursor, conn
except:
logger.exception("get db connect error")
return "", ""

def __exit__(self, type, value, trace):
self.cursor.close()
self.conn.close()

def close(self):
"""
关闭连接,归还给连接池
:return:
"""
self.cursor.close()
self.conn.close()


class MySQLPool(object):
mysql = None

def __init__(self, mysql_pool_obj=MySQLPoolLocal):
self.db = mysql_pool_obj()

def __new__(cls, *args, **kwargs):
if not hasattr(cls, 'inst'):
cls.inst = super(MySQLPool, cls).__new__(cls, *args, **kwargs)
return cls.inst

def select_all(self, sql='', param=(), cursor_dict=False):
"""
查询全部
:param sql:
:param param:
:return:
"""
cursor, conn = self.db.get_conn(cursor_dict=cursor_dict)
try:
cursor.execute(sql, param)
res = cursor.fetchall()
self.close(cursor, conn)
print(res)
return res
except Exception as e:
logger.exception("select_all except")
self.close(cursor, conn)
return None

def select_one(self, sql='', param=()):
"""
查询一条
:param sql:
:param param:
:return:
"""
cursor, conn = self.db.get_conn()
try:
cursor.execute(sql, param)
res = cursor.fetchone()
self.close(cursor, conn)
return res
except Exception as e:
logger.exception("select_one except")
self.close(cursor, conn)
return None

def select(self, sql='', param=(), cursor_dict=False):
return self.select_all(sql, param, cursor_dict=cursor_dict)

def insert(self, sql='', param=()):
"""
增加
:param sql:
:param param:
:return:
"""

return self.execute(sql, param)

def insert_many(self, sql='', param=()):
"""
增加多行
:param sql:
:param param:
:return:
"""
return self.execute(sql, param)

def delete(self, sql='', param=()):
"""
删除
:param sql:
:param param:
:return:
"""
return self.execute(sql, param)

def update(self, sql='', param=()):
"""
更新
:param sql:
:param param:
:return:
"""
return self.execute(sql, param)

def execute(self, sql='', param=(), ):
cursor, conn = self.db.get_conn()

try:
if param:
cursor.execute(sql, param)
else:
cursor.execute(sql)
_id = cursor.lastrowid
conn.commit()
self.close(cursor, conn)
# 防止表中没有id返回0
if _id == 0:
return True
return _id
except Exception as e:
logger.exception("error")
if conn: conn.rollback()
return False

def execute_many(self, list=[]):
"""
执行多条命令
:param list:
:return:
"""
cursor, conn = self.db.get_conn()
try:
for order in list:
sql = order['sql']
param = order['param']
if param:
cursor.execute(sql, param)
else:
cursor.execute(sql)
conn.commit()
self.close(cursor, conn)
return True
except Exception as e:
logger.exception("error")
conn.rollback()
self.close(cursor, conn)
return False

def close(self, cursor, conn):
cursor and cursor.close()
conn and conn.close()


mysql_pool = MySQLPool()

Python 第三方模块之 DButils - 数据库连接池
https://flepeng.github.io/021-Python-31-Python-第三方模块-41-数据库相关-Python-第三方模块之-DButils-数据库连接池/
作者
Lepeng
发布于
2021年4月27日
许可协议