A-A+
python3 dbutils 解决mysql连接关闭及返回数据格式为字段对应值问题
【注意:此文章为博主原创文章!转载需注意,请带原文链接,至少也要是txt格式!】
自己重新封装了一个Mysql的命令执行的类,但是经常会自动关闭mysql数据库的连接,自己写法完全按照官方的来的,但是就是会出问题,自己也是一脸懵逼,后来启用dbutils解决此问题了,但是又出新问题,就是查询返回数据是tuple,而且是0,1,2,3,4……等对应查询的值,并非字段返回字段对应值,艾玛,想着不可能有这么大缺陷,然后一顿狂看接口API,最终找到解决方法。下看如下代码。
DBUtils 是 Python 的一个用于实现数据库连接池的模块 , 此连接池有两种连接模式
模式一 🍀
为每个线程创建一个连接 , 线程即使调用了 close 方法 , 也不会关闭 , 只是把连接重新放到连接池 , 供自己线程再次使用 , 当线程终止时 , 连接自动关闭
import time
import pymysql
import threading
from DBUtils.PooledDB import PooledDB, SharedDBConnection
POOL = PersistentDB(
# 使用链接数据库的模块
creator=pymysql,
# 一个链接最多被重复使用的次数,None表示无限制
maxusage=None,
# 开始会话前执行的命令列表,如:["set datestyle to ...", "set time zone ..."]
setsession=[],
# ping MySQL服务端,检查是否服务可用
# 0 = None = never,
# 1 = default = whenever it is requested,
# 2 = when a cursor is created,
# 4 = when a query is executed,
# 7 = always
ping=0,
# 如果为False,conn.close()实际上被忽略,供下次使用,再线程关闭时,才会自动关闭链接
# 如果为True,conn.close()则关闭链接,那么再次调用pool.connection时就会报错,因为已经真的关闭了连接(pool.steady_connection()可以获取一个新的链接)
closeable=False,
# 本线程独享值得对象,用于保存链接对象,如果链接对象被重置
threadlocal=None,
host='127.0.0.1',
port=3306,
user='root',
password='123',
database='pooldb',
charset='utf8',
cursorclass=DictCursor ###重点注意这里,如果没加此行,则查询表返回数据就是tuple格式,加了此行返回数据就是list字段对应值。
)
def func():
conn = POOL.connection(shareable=False)
cursor = conn.cursor()
cursor.execute('select * from tb1')
result = cursor.fetchall()
cursor.close()
conn.close()
func()
模式二 🍀
创建一批连接到连接池 , 供所有线程共享使用 (由于 pymysql , MySQLdb 等 threadsafety 值为 1 , 所以该模式连接池中的线程会被所有线程共享)
import time
import pymysql
import threading
from DBUtils.PooledDB import PooledDB, SharedDBConnection
POOL = PooledDB(
# 使用链接数据库的模块
creator=pymysql,
# 连接池允许的最大连接数,0和None表示不限制连接数
maxconnections=6,
# 初始化时,链接池中至少创建的空闲的链接,0表示不创建
mincached=2,
# 链接池中最多闲置的链接,0和None不限制
maxcached=5,
# 链接池中最多共享的链接数量,0和None表示全部共享
# PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享
maxshared=3,
# 连接池中如果没有可用连接后,是否阻塞等待;True,等待;False,不等待然后报错
blocking=True,
# 一个链接最多被重复使用的次数,None表示无限制
maxusage=None,
# 开始会话前执行的命令列表,如:["set datestyle to ...", "set time zone ..."]
setsession=[],
# ping MySQL服务端,检查是否服务可用
# 0 = None = never,
# 1 = default = whenever it is requested,
# 2 = when a cursor is created,
# 4 = when a query is executed,
# 7 = always
ping=0,
host='127.0.0.1',
port=3306,
user='root',
password='123',
database='pooldb',
charset='utf8',
cursorclass=DictCursor ###重点注意这里,如果没加此行,则查询表返回数据就是tuple格式,加了此行返回数据就是list字段对应值。
)
def func():
# 检测当前正在运行连接数的是否小于最大链接数,如果不小于则等待或报raise TooManyConnections异常
# 否则则优先去初始化时创建的链接中获取链接 SteadyDBConnection,
# 然后将SteadyDBConnection对象封装到PooledDedicatedDBConnection中并返回,
# 如果最开始创建的链接没有链接,则去创建一个SteadyDBConnection对象,再封装到PooledDedicatedDBConnection中并返回,
# 一旦关闭链接后,连接就返回到连接池让后续线程继续使用
conn = POOL.connection()
# print(th, '链接被拿走了', conn1._con)
# print(th, '池子里目前有', pool._idle_cache, '\r\n')
cursor = conn.cursor()
cursor.execute('select * from tb1')
result = cursor.fetchall()
conn.close()
func()
以上数据来源:https://github.com/lyonyang/blogs/blob/8042919c035f4586196ccf34d85ec4dbe95d06ea/04-Web-Framework/Flask/DBUtils.md
下面奉上我自己封装的python3 mysql调用类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 | # -*-coding:utf-8-*- import pymysql import socket import struct import time from flask import jsonify from pymysql.cursors import DictCursor from DBUtils.PooledDB import PooledDB class MysqlPool: def __init__(self, dbname, username, password, host='127.0.0.1', port=3306, charset='utf8'): self.pool = PooledDB( # 使用链接数据库的模块 creator=pymysql, host=host, port=port, user=username, password=password, database=dbname, charset=charset, cursorclass=DictCursor ) def Time_Y_M_D(self, time): return time.strftime('%Y-%m-%d %H:%M:%S', time) def ip2int(self, ip): return struct.unpack("!I", socket.inet_aton(ip))[0] def int2ip(self, ip_num): return socket.inet_ntoa(struct.pack("!I", ip_num)) def linux_10_nowtime(self): return int(time.time()) def sql_execute(self, sql, sql_args=None): try: conn = self.pool.connection() with conn.cursor() as cursor: cursor.execute(sql, sql_args) conn.commit() return 1 except Exception as e: return jsonify({"status": 500, "data": "sql命令执行失败。{}".format(e)}) finally: # cursor.close() print("") def insert_execute(self, sql, sql_args=None): try: conn = self.pool.connection() # 和sql_execute一样的,分开来写,仅仅是为了可读性 with conn.cursor() as cursor: cursor.execute(sql, sql_args) conn.commit() return 1 except Exception as e: return jsonify({"status": 500, "data": "添加失败。{}".format(e)}) def update_execute(self, sql, sql_args=None): try: conn = self.pool.connection() # 和sql_execute一样的,分开来写,仅仅是为了可读性 with conn.cursor() as cursor: cursor.execute(sql, sql_args=None) conn.commit() return 1 except Exception as e: return jsonify({"status": 500, "data": "更新失败。{}".format(e)}) def delete_execute(self, sql, sql_args=None): try: conn = self.pool.connection() # 和sql_execute一样的,分开来写,仅仅是为了可读性 with conn.cursor() as cursor: cursor.execute(sql, sql_args) conn.commit() return 1 except Exception as e: return jsonify({"status": 500, "data": "删除失败。{}".format(e)}) def select_count(self, sql, sql_args=None): try: num = "" conn = self.pool.connection() with conn.cursor() as cursor: cursor.execute(sql, sql_args) result = cursor.fetchall() if isinstance(result[0], dict): num = result[0]["count(*)"] elif isinstance(result[0], list): num = result[0][0] return num # return是在finnally之后执行 except Exception as e: # 异常的时候返回None return jsonify({"status": 500, "data": "数量查询失败。{}".format(e)}) def select_execute(self, sql, sql_args=None): try: conn = self.pool.connection() with conn.cursor() as cursor: cursor.execute(sql, sql_args) result = cursor.fetchall() # result的type为tuple,已经是个具体的集合了 return result except Exception as e: # 异常的时候返回None return jsonify({"status": 500, "data": "sql执行失败。{}".format(e)}) def select_one(self, sql, sql_args=None): try: conn = self.pool.connection() with conn.cursor() as cursor: cursor.execute(sql, sql_args) result = cursor.fetchone() return result # return是在finnally之后执行 except Exception as e: # 异常的时候返回None return jsonify({"status": 500, "data": "sql获取一条数据失败。{}".format(e)}) def __del__(self): self.pool.close() |
布施恩德可便相知重
微信扫一扫打赏
支付宝扫一扫打赏