A-A+

python3 dbutils 解决mysql连接关闭及返回数据格式为字段对应值问题

2019年03月04日 17:07 学习笔记 暂无评论 共5390字 (阅读2,524 views次)

【注意:此文章为博主原创文章!转载需注意,请带原文链接,至少也要是txt格式!】

自己重新封装了一个Mysql的命令执行的类,但是经常会自动关闭mysql数据库的连接,自己写法完全按照官方的来的,但是就是会出问题,自己也是一脸懵逼,后来启用dbutils解决此问题了,但是又出新问题,就是查询返回数据是tuple,而且是0,1,2,3,4……等对应查询的值,并非字段返回字段对应值,艾玛,想着不可能有这么大缺陷,然后一顿狂看接口API,最终找到解决方法。下看如下代码。

DBUtils 是 Python 的一个用于实现数据库连接池的模块 , 此连接池有两种连接模式

模式一 🍀

为每个线程创建一个连接 , 线程即使调用了 close 方法 , 也不会关闭 , 只是把连接重新放到连接池 , 供自己线程再次使用 , 当线程终止时 , 连接自动关闭

 

import time
import pymysql
import threading
from DBUtils.PooledDB import PooledDB, SharedDBConnection

POOL = PersistentDB(
    # 使用链接数据库的模块
    creator=pymysql,  
    
    # 一个链接最多被重复使用的次数,None表示无限制
    maxusage=None,  
    
    # 开始会话前执行的命令列表,如:["set datestyle to ...", "set time zone ..."]
    setsession=[],  
    
    # ping MySQL服务端,检查是否服务可用
    # 0 = None = never, 
    # 1 = default = whenever it is requested, 
    # 2 = when a cursor is created, 
    # 4 = when a query is executed, 
    # 7 = always
    ping=0,
    
    # 如果为False,conn.close()实际上被忽略,供下次使用,再线程关闭时,才会自动关闭链接
    # 如果为True,conn.close()则关闭链接,那么再次调用pool.connection时就会报错,因为已经真的关闭了连接(pool.steady_connection()可以获取一个新的链接)
    closeable=False,
    
    # 本线程独享值得对象,用于保存链接对象,如果链接对象被重置
    threadlocal=None,  
    host='127.0.0.1',
    port=3306,
    user='root',
    password='123',
    database='pooldb',
    charset='utf8',
    cursorclass=DictCursor    ###重点注意这里,如果没加此行,则查询表返回数据就是tuple格式,加了此行返回数据就是list字段对应值。
)

def func():
    conn = POOL.connection(shareable=False)
    cursor = conn.cursor()
    cursor.execute('select * from tb1')
    result = cursor.fetchall()
    cursor.close()
    conn.close()

func()

 

模式二 🍀

创建一批连接到连接池 , 供所有线程共享使用 (由于 pymysql , MySQLdb 等 threadsafety 值为 1 , 所以该模式连接池中的线程会被所有线程共享)

 

import time
import pymysql
import threading
from DBUtils.PooledDB import PooledDB, SharedDBConnection
POOL = PooledDB(
    # 使用链接数据库的模块
    creator=pymysql,  
    
    # 连接池允许的最大连接数,0和None表示不限制连接数
    maxconnections=6,  
    
    # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
    mincached=2,  
    
    # 链接池中最多闲置的链接,0和None不限制
    maxcached=5,  
    
    # 链接池中最多共享的链接数量,0和None表示全部共享
    # PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享
    maxshared=3,
    
    # 连接池中如果没有可用连接后,是否阻塞等待;True,等待;False,不等待然后报错
    blocking=True,  
    
    # 一个链接最多被重复使用的次数,None表示无限制
    maxusage=None,  
    
    # 开始会话前执行的命令列表,如:["set datestyle to ...", "set time zone ..."]
    setsession=[],  
    
    # ping MySQL服务端,检查是否服务可用
    # 0 = None = never, 
    # 1 = default = whenever it is requested, 
    # 2 = when a cursor is created, 
    # 4 = when a query is executed, 
    # 7 = always
    ping=0,
    
    host='127.0.0.1',
    port=3306,
    user='root',
    password='123',
    database='pooldb',
    charset='utf8',
    cursorclass=DictCursor    ###重点注意这里,如果没加此行,则查询表返回数据就是tuple格式,加了此行返回数据就是list字段对应值。
)


def func():
    # 检测当前正在运行连接数的是否小于最大链接数,如果不小于则等待或报raise TooManyConnections异常
    # 否则则优先去初始化时创建的链接中获取链接 SteadyDBConnection,
    # 然后将SteadyDBConnection对象封装到PooledDedicatedDBConnection中并返回,
    # 如果最开始创建的链接没有链接,则去创建一个SteadyDBConnection对象,再封装到PooledDedicatedDBConnection中并返回,
    # 一旦关闭链接后,连接就返回到连接池让后续线程继续使用
    conn = POOL.connection()

    # print(th, '链接被拿走了', conn1._con)
    # print(th, '池子里目前有', pool._idle_cache, '\r\n')
    cursor = conn.cursor()
    cursor.execute('select * from tb1')
    result = cursor.fetchall()
    conn.close()

func()

以上数据来源:https://github.com/lyonyang/blogs/blob/8042919c035f4586196ccf34d85ec4dbe95d06ea/04-Web-Framework/Flask/DBUtils.md

下面奉上我自己封装的python3 mysql调用类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# -*-coding:utf-8-*-
import pymysql
import socket
import struct
import time
from flask import jsonify
from pymysql.cursors import DictCursor
from DBUtils.PooledDB import PooledDB
 
 
class MysqlPool:
 
    def __init__(self, dbname, username, password, host='127.0.0.1', port=3306, charset='utf8'):
        self.pool = PooledDB(
            # 使用链接数据库的模块
            creator=pymysql,
            host=host,
            port=port,
            user=username,
            password=password,
            database=dbname,
            charset=charset,
            cursorclass=DictCursor
        )
 
 
    def Time_Y_M_D(self, time):
        return time.strftime('%Y-%m-%d %H:%M:%S', time)
 
    def ip2int(self, ip):
        return struct.unpack("!I", socket.inet_aton(ip))[0]
 
    def int2ip(self, ip_num):
        return socket.inet_ntoa(struct.pack("!I", ip_num))
 
    def linux_10_nowtime(self):
        return int(time.time())
 
    def sql_execute(self, sql, sql_args=None):
        try:
            conn = self.pool.connection()
            with conn.cursor() as cursor:
                cursor.execute(sql, sql_args)
            conn.commit()
            return 1
        except Exception as e:
            return jsonify({"status": 500, "data": "sql命令执行失败。{}".format(e)})
        finally:
            # cursor.close()
            print("")
 
    def insert_execute(self, sql, sql_args=None):
        try:
            conn = self.pool.connection()
            # 和sql_execute一样的,分开来写,仅仅是为了可读性
            with conn.cursor() as cursor:
                cursor.execute(sql, sql_args)
            conn.commit()
            return 1
        except Exception as e:
            return jsonify({"status": 500, "data": "添加失败。{}".format(e)})
 
    def update_execute(self, sql, sql_args=None):
        try:
            conn = self.pool.connection()
            # 和sql_execute一样的,分开来写,仅仅是为了可读性
            with conn.cursor() as cursor:
                cursor.execute(sql, sql_args=None)
            conn.commit()
            return 1
        except Exception as e:
            return jsonify({"status": 500, "data": "更新失败。{}".format(e)})
 
    def delete_execute(self, sql, sql_args=None):
        try:
            conn = self.pool.connection()
            # 和sql_execute一样的,分开来写,仅仅是为了可读性
            with conn.cursor() as cursor:
                cursor.execute(sql, sql_args)
            conn.commit()
            return 1
        except Exception as e:
            return jsonify({"status": 500, "data": "删除失败。{}".format(e)})
 
    def select_count(self, sql, sql_args=None):
        try:
            num = ""
            conn = self.pool.connection()
            with conn.cursor() as cursor:
                cursor.execute(sql, sql_args)
            result = cursor.fetchall()
            if isinstance(result[0], dict):
                num = result[0]["count(*)"]
            elif isinstance(result[0], list):
                num = result[0][0]
            return num  # return是在finnally之后执行
        except Exception as e:  # 异常的时候返回None
            return jsonify({"status": 500, "data": "数量查询失败。{}".format(e)})
 
    def select_execute(self, sql, sql_args=None):
        try:
            conn = self.pool.connection()
            with conn.cursor() as cursor:
                cursor.execute(sql, sql_args)
            result = cursor.fetchall()  # result的type为tuple,已经是个具体的集合了
            return result
        except Exception as e:  # 异常的时候返回None
            return jsonify({"status": 500, "data": "sql执行失败。{}".format(e)})
 
    def select_one(self, sql, sql_args=None):
        try:
            conn = self.pool.connection()
            with conn.cursor() as cursor:
                cursor.execute(sql, sql_args)
            result = cursor.fetchone()
            return result  # return是在finnally之后执行
        except Exception as e:  # 异常的时候返回None
            return jsonify({"status": 500, "data": "sql获取一条数据失败。{}".format(e)})
 
    def __del__(self):
        self.pool.close()

布施恩德可便相知重

微信扫一扫打赏

支付宝扫一扫打赏

×
标签:

给我留言