python之数据库连接池DBUtils

DBUtils 是Python 的一个用于实现数据库连接池的模块。

此连接池有两种连接模式:

DBUtils :提供两种外部接口:

PersistentDB :提供线程专用的数据库连接,并自动管理连接。

PooledDB :提供线程间可共享的数据库连接,并自动管理连接。

介绍

PersistentDB模式

为每个线程创建一个连接,线程即使调用了 close 方法,也不会关闭,只是把链接重新放到链接池,供自己线程再次使用,当线程终止时,链接自动关闭。

from DBUtils.PersistentDB import PersistentDB
import pymysql
POOL = PersistentDB(
creator=pymysql, # 使用链接数据库的模块
maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
setsession=[], # 开始会话前执行的命令列表。
ping=0, # ping MySQL服务端,检查是否服务可用。
closeable=False, # 如果为False时, conn.close() 实际上被忽略,供下次使用,再线程关闭时,才会自动关闭链接。如果为True时, conn.close()则关闭链接,那么再次调用pool.connection时就会报错,因为已经真的关闭了连接(pool.steady_connection()可以获取一个新的链接)
threadlocal=None, # 本线程独享值得对象,用于保存链接对象,如果链接对象被重置
host='127.0.0.1',
port=3306,
user='root',
password='',
database='test',
charset='utf8'
) def func():
conn = POOL.connection(shareable=False)
cursor = conn.cursor()
cursor.execute('select * from user')
result = cursor.fetchall()
print(result)
cursor.close()
conn.close()
if __name__ == '__main__': func()

PooledDB模式

创建一批连接到连接池,供所有线程共享使用。

import pymysql

from DBUtils.PooledDB import PooledDB
POOL = PooledDB(
creator=pymysql, # 使用链接数据库的模块
maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数
mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
maxcached=5, # 链接池中最多闲置的链接,0和None不限制
maxshared=3, # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
setsession=[], # 开始会话前执行的命令列表。
ping=0, # ping MySQL服务端,检查是否服务可用。
host='127.0.0.1',
port=3306,
user='root',
password='',
database='test',
charset='utf8'
) def func():
# 检测当前正在运行连接数的是否小于最大链接数,如果不小于则等待或报raise TooManyConnections异常
# 否则则优先去初始化时创建的链接中获取链接 SteadyDBConnection。
# 然后将SteadyDBConnection对象封装到PooledDedicatedDBConnection中并返回。
# 如果最开始创建的链接没有链接,则去创建一个SteadyDBConnection对象,再封装到PooledDedicatedDBConnection中并返回。
# 一旦关闭链接后,连接就返回到连接池让后续线程继续使用。
conn = POOL.connection()
cursor = conn.cursor(pymysql.cursors.DictCursor)
cursor.execute('select * from user')
result = cursor.fetchall()
print(result)
conn.close() if __name__ == '__main__': func()

工具类

# TEST数据库信息
DB_TEST_HOST = "127.0.0.1"
DB_TEST_PORT = 3306
DB_TEST_DBNAME = ""
DB_TEST_USER = "root"
DB_TEST_PASSWORD = "root" # 数据库连接编码
DB_CHARSET = "utf8" # mincached : 启动时开启的闲置连接数量(缺省值 0 开始时不创建连接)
DB_MIN_CACHED = 10 # maxcached : 连接池中允许的闲置的最多连接数量(缺省值 0 代表不闲置连接池大小)
DB_MAX_CACHED = 10 # maxshared : 共享连接数允许的最大数量(缺省值 0 代表所有连接都是专用的)如果达到了最大数量,被请求为共享的连接将会被共享使用
DB_MAX_SHARED = 20 # maxconnecyions : 创建连接池的最大数量(缺省值 0 代表不限制)
DB_MAX_CONNECYIONS = 100 # blocking : 设置在连接池达到最大数量时的行为(缺省值 0 或 False 代表返回一个错误<toMany......>; 其他代表阻塞直到连接数减少,连接被分配)
DB_BLOCKING = True # maxusage : 单个连接的最大允许复用次数(缺省值 0 或 False 代表不限制的复用).当达到最大数时,连接会自动重新连接(关闭和重新打开)
DB_MAX_USAGE = 0 # setsession : 一个可选的SQL命令列表用于准备每个会话,如["set datestyle to german", ...]
DB_SET_SESSION = None

db_config.py

import pymysql
from DBUtils.PooledDB import PooledDB
import db_config as Config class ConnectionPool(object):
__pool = None def __enter__(self):
self.conn = self.__getConn()
self.cursor = self.conn.cursor()
print("数据库创建conn和cursor")
return self def __getConn(self):
if self.__pool is None:
self.__pool = PooledDB(creator=pymysql, mincached=Config.DB_MIN_CACHED, maxcached=Config.DB_MAX_CACHED,
maxshared=Config.DB_MAX_SHARED, maxconnections=Config.DB_MAX_CONNECYIONS,
blocking=Config.DB_BLOCKING, maxusage=Config.DB_MAX_USAGE,
setsession=Config.DB_SET_SESSION,
host=Config.DB_TEST_HOST, port=Config.DB_TEST_PORT,
user=Config.DB_TEST_USER, passwd=Config.DB_TEST_PASSWORD,
db=Config.DB_TEST_DBNAME, use_unicode=False, charset=Config.DB_CHARSET)
return self.__pool.connection() def __exit__(self, type, value, trace):
"""
@summary: 释放连接池资源
"""
self.cursor.close()
self.conn.close()
print("连接池释放conn和cursor") def getconn(self):
'''
从连接池中取出一个连接
'''
conn = self.__getConn()
cursor = conn.cursor(pymysql.cursors.DictCursor)
return cursor, conn def close(self):
'''
关闭连接归还给连接池
'''
self.cursor.close()
self.conn.close()
print("连接池释放conn和cursor") POOL = ConnectionPool() class MysqlHelper(object):
mysql = None def __init__(self):
self.db = POOL def __new__(cls, *args, **kwargs):
if not hasattr(cls, 'inst'):
cls.inst = super(MysqlHelper, cls).__new__(cls, *args, **kwargs)
return cls.inst def selectall(self, sql='', param=()):
'''
查询所有
''' try:
cursor, conn = self.execute(sql, param)
res = cursor.fetchall()
self.close(cursor, conn)
return res
except Exception as e:
print('selectall except ', e.args)
self.close(cursor, conn)
return None def selectone(self, sql='', param=()):
'''
查询一条
'''
try:
cursor, conn = self.execute(sql, param)
res = cursor.fetchone()
self.close(cursor, conn)
return res
except Exception as e:
print('selectone except ', e.args)
self.close(cursor, conn)
return None def insert(self, sql='', param=()):
'''
增加
'''
try:
cursor, conn = self.execute(sql, param)
print('============')
_id = cursor.lastrowid
print('_id ', _id)
conn.commit()
self.close(cursor, conn)
# 防止表中没有id返回0
if _id == 0:
return True
return _id
except Exception as e:
print('insert except ', e.args)
conn.rollback()
self.close(cursor, conn)
# self.conn.rollback()
return 0 def insertmany(self, sql='', param=()):
'''
增加多行
'''
cursor, conn = self.db.getconn()
try:
cursor.executemany(sql, param)
conn.commit()
self.close(cursor, conn)
return True
except Exception as e:
print('insert many except ', e.args)
conn.rollback()
self.close(cursor, conn)
return False def delete(self, sql='', param=()):
'''
删除
'''
try:
cursor, conn = self.execute(sql, param)
self.close(cursor, conn)
return True
except Exception as e:
print('delete except ', e.args)
conn.rollback()
self.close(cursor, conn)
return False def update(self, sql='', param=()):
'''
更新
'''
try:
cursor, conn = self.execute(sql, param)
self.close(cursor, conn)
return True
except Exception as e:
print('update except ', e.args)
conn.rollback()
self.close(cursor, conn)
return False @classmethod
def getInstance(self):
if MysqlHelper.mysql == None:
MysqlHelper.mysql = MysqlHelper()
return MysqlHelper.mysql # 执行命令
def execute(self, sql='', param=(), autoclose=False):
cursor, conn = self.db.getconn()
try:
if param:
cursor.execute(sql, param)
else:
cursor.execute(sql)
conn.commit()
if autoclose:
self.close(cursor, conn)
except Exception as e:
pass
return cursor, conn def executemany(self, list=[]):
'''
# 执行多条命令
'[{"sql":"xxx","param":"xx"}....]'
'''
cursor, conn = self.db.getconn()
try:
for order in list:
sql = order['sql']
param = order['param']
if param:
cursor.execute(sql, param)
else:
cursor.execute(sql)
conn.commit()
self.close(cursor, conn)
return True
except Exception as e:
print('execute failed========', e.args)
conn.rollback()
self.close(cursor, conn)
return False def close(self, cursor, conn):
cursor.close()
conn.close()
print("PT连接池释放con和cursor")

mysqlhelper

上一篇:ScheduledThreadPoolExecutor 使用线程池执行定时任务


下一篇:UVA10763:Foreign Exchange&&UVA10340: All in All(水题)