编程开源技术交流,分享技术与知识

网站首页 > 开源技术 正文

数据库连接池,本地线程,上下文管理

wxchong 2024-09-19 06:02:08 开源技术 17 ℃ 0 评论

一、数据库连接池

flask 中是没有 ORM 的,如果在 flask 里要连接数据库有两种方式

一:pymysql
二:SQLAlchemy
 是python 操作数据库的一个库。能够进行 orm 映射官方文档 sqlchemy
 SQLAlchemy“采用简单的Python语言,为高效和高性能的数据库访问设计,实现了完整的企业级持久模型”。SQLAlchemy的理念是,SQL数据库的量级和性能重要于对象集合;而对象集合的抽象又重要于表和行。

1. 链接池原理

- DBUtils数据库链接池 
 - 模式一:基于threaing.local实现为每一个线程创建一个连接,关闭是伪关闭,当前线程可以重复
 - 模式二:连接池原理
 - 可以设置连接池中最大连接数 9
 - 默认启动时,连接池中创建连接 5
 
 - 如果有三个线程来数据库中获取连接:
 - 如果三个同时来的,一人给一个链接
 - 如果一个一个来,有时间间隔,用一个链接就可以为三个线程提供服务
 - 说不准
 有可能:1个链接就可以为三个线程提供服务
 有可能:2个链接就可以为三个线程提供服务
 有可能:3个链接就可以为三个线程提供服务
 PS、:maxshared在使用pymysql中均无用。链接数据库的模块:只有threadsafety>1的时候才有用

2. 不使用连接池链接数据库

方式一:每次操作都要链接数据库,链接次数过多

#!usr/bin/env python3
# -*- coding:utf-8 -*-

import pymysql
from flask import Flask

app = Flask(__name__)

@app.route('/index')
def index():
 # 链接数据库
 conn = pymysql.connect(host="127.0.0.1",port=3306,user='root',password='123', database='pooldb',charset='utf8')
 cursor = conn.cursor()
 cursor.execute("select * from td where id=%s", [5, ])
 result = cursor.fetchall() # 获取数据
 cursor.close()
 conn.close() # 关闭链接
 print(result)
 return "执行成功"

if __name__ == '__main__':
 app.run(debug=True)

这种方式每次请求,反复创建数据库链接,多次链接数据库会非常耗时

这时,我们会想到一种解决方法,就是把数据库链接放到全局,即方式二

方式二:不支持并发

#!usr/bin/env python3
# -*- coding:utf-8 -*-

import pymysql
from flask import Flask
from threading import RLock

app = Flask(__name__)
CONN = pymysql.connect(host="127.0.0.1",port=3306,user='root',password='123', database='pooldb',charset='utf8')
# 方式二:放在全局,如果是单线程,这样就可以,但是如果是多线程,就得加把锁。这样就成串行的了, 不支持并发,也不好。所有我们选择用数据库连接池
@app.route('/index')
def index():
 with RLock:
 cursor = CONN.cursor()
 cursor.execute("select * from td where id=%s", [5, ])
 result = cursor.fetchall() # 获取数据
 cursor.close()
 print(result)
 return "执行成功"
if __name__ == '__main__':
 app.run(debug=True)

由于上面两种方案都不完美,所以得把方式一和方式二联合一下(既让减少链接次数,也能支持并发)所有了方式三,需要导入一个 DButils 模块,基于 DButils 实现的数据库连接池

3. 基于 DButils 实现的数据库连接池

模式一

为每一个线程创建一个链接(是基于本地线程来实现的。thread.local),每个线程独立使用自己的数据库链接,该线程关闭不是真正的关闭,本线程再次调用时,还是使用的最开始创建的链接,直到线程终止,数据库链接才关闭。

#!usr/bin/env python3
# -*- coding:utf-8 -*-

from flask import Flask
from DBUtils.PersistentDB import PersistentDB
import pymysql

app = Flask(__name__)

POOL = PersistentDB(
 creator=pymysql, # 使用链接数据库的模块
 maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
 setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
 ping=0,
 # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
 closeable=False,
 # 如果为False时, conn.close() 实际上被忽略,供下次使用,再线程关闭时,才会自动关闭链接。如果为True时, conn.close()则关闭链接,那么再次调用pool.connection时就会报错,因为已经真的关闭了连接(pool.steady_connection()可以获取一个新的链接)
 threadlocal=None, # 本线程独享值得对象,用于保存链接对象,如果链接对象被重置
 host='127.0.0.1',
 port=3306,
 user='root',
 password='123',
 database='pooldb',
 charset='utf8'
)

@app.route('/func')
def func():
  conn = POOL.connection()
  cursor = conn.cursor()
  cursor.execute('select * from tb1')
  result = cursor.fetchall()
  cursor.close()
  conn.close() # 不是真的关闭,而是假的关闭。 conn = pymysql.connect() conn.close()

  conn = POOL.connection()
  cursor = conn.cursor()
  cursor.execute('select * from tb1')
  result = cursor.fetchall()
  cursor.close()
  conn.close()
if __name__ == '__main__': app.run(debug=True)

缺点:如果线程比较多,还是会创建很多连接

模式二(推荐)

创建一个链接池,为所有线程提供连接,使用时来进行获取,使用完毕后在放回到连接池。

PS:假设最大链接数有 10 个,其实也就是一个列表,当你 pop 一个,系统会再 append 一个,链接池的所有的链接都是按照排队的这样的方式来链接的。链接池里所有的链接都能重复使用,共享的, 即实现了并发,又防止了链接次数太多

import time
import pymysql
import threading
from DBUtils.PooledDB import PooledDB, SharedDBConnection

POOL = PooledDB(
 creator=pymysql, # 使用链接数据库的模块
 maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数
 mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建


 maxcached=5, # 链接池中最多闲置的链接,0和None不限制
 maxshared=3, # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
 blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
 maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
 setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
 ping=0,
 # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
 host='127.0.0.1',
 port=3306,
 user='root',
 password='123',
 database='pooldb',
 charset='utf8'
)


def func():
 # 检测当前正在运行连接数的是否小于最大链接数,如果不小于则:等待或报raise TooManyConnections异常
 # 否则
 # 则优先去初始化时创建的链接中获取链接 SteadyDBConnection。
 # 然后将SteadyDBConnection对象封装到PooledDedicatedDBConnection中并返回。
 # 如果最开始创建的链接没有链接,则去创建一个SteadyDBConnection对象,再封装到PooledDedicatedDBConnection中并返回。
 # 一旦关闭链接后,连接就返回到连接池让后续线程继续使用。

 # PooledDedicatedDBConnection
 conn = POOL.connection()

 # print(th, '链接被拿走了', conn1._con)
 # print(th, '池子里目前有', pool._idle_cache, '\r\n')

 cursor = conn.cursor()
 cursor.execute('select * from tb1')
 result = cursor.fetchall()
 conn.close()

func()

二、本地线程

本地线程:保证每个线程都只有自己的一份数据,在操作时不会影响别人的,即使是多线程,自己的值也是互相隔离的

没用线程之前

import threading
import time

class Foo(object):
 def __init__(self):
 self.name = None
local_values = Foo()

def func(num):
 time.sleep(2)
 local_values.name = num
 print(local_values.name,threading.current_thread().name)

for i in range(5):
 th = threading.Thread(target=func, args=(i,), name='线程%s' % i)
 th.start()

打印结果:

1 线程1
0 线程0
2 线程2
3 线程3
4 线程4

用了本地线程之后

import threading
import time
# 本地线程对象
local_values = threading.local()
def func(num):

 """
 # 第一个线程进来,本地线程对象会为他创建一个
 # 第二个线程进来,本地线程对象会为他创建一个
 {
 线程1的唯一标识:{name:1},
 线程2的唯一标识:{name:2},
 }
 :param num:
 :return:
 """
 local_values.name = num # 4
 # 线程停下来了
 time.sleep(2)
 # 第二个线程: local_values.name,去local_values中根据自己的唯一标识作为key,获取value中name对应的值
 print(local_values.name, threading.current_thread().name)


for i in range(5):
 th = threading.Thread(target=func, args=(i,), name='线程%s' % i)
 th.start()

打印结果:

1 线程1
2 线程2
0 线程0
4 线程4
3 线程3

三、上下文管理

flask 的 request 和 session 设置方式比较新颖,如果没有这种方式,那么就只能通过参数的传递。

flask 是如何做的呢?

- 本地线程:是Flask自己创建的一个线程(猜想:内部是不是基于本地线程做的?)
 vals = threading.local()
 def task(arg):
 vals.name = num
 - 每个线程进来都是打印的自己的,只有自己的才能修改,
 - 通过他就能保证每一个线程里面有一个数据库链接,通过他就能创建出数据库链接池的第一种模式
 - 上下文原理
 - 类似于本地线程
 - 猜想:内部是不是基于本地线程做的?不是,是一个特殊的字典

1. 上下文原理

#!/usr/bin/env python3
# -*- coding:utf-8 -*-

from functools import partial
from flask.globals import LocalStack, LocalProxy
 
ls = LocalStack()
 
 
class RequestContext(object):
 def __init__(self, environ):
 self.request = environ
 
 
def _lookup_req_object(name):
 top = ls.top
 if top is None:
 raise RuntimeError(ls)
 return getattr(top, name)
 
 
session = LocalProxy(partial(_lookup_req_object, 'request'))
 
ls.push(RequestContext('c1')) # 当请求进来时,放入
print(session) # 视图函数使用
print(session) # 视图函数使用
ls.pop() # 请求结束pop
 
 
ls.push(RequestContext('c2'))
print(session)
 
ls.push(RequestContext('c3'))
print(session)

2. Flask 内部实现

#!/usr/bin/env python
# -*- coding:utf-8 -*-
 
from greenlet import getcurrent as get_ident
 
 
def release_local(local):
 local.__release_local__()
 
 
class Local(object):
 __slots__ = ('__storage__', '__ident_func__')  # __slots__的作用是用tuple定义允许绑定的属性名称
 
 def __init__(self):
 # self.__storage__ = {}  
 # self.__ident_func__ = get_ident 等价于下面两句,之所以这样,是因为如果直接按这种方式设置,通过.会自动调用__setattr___,而在下面的__setattr__中
      又要获取__storage__等方法的值,这样会会形成递归,所以采用这张设置方法

 object.__setattr__(self, '__storage__', {})
 object.__setattr__(self, '__ident_func__', get_ident)
 
 def __release_local__(self):
 self.__storage__.pop(self.__ident_func__(), None)
 
 def __getattr__(self, name):
 try:
 return self.__storage__[self.__ident_func__()][name]
 except KeyError:
 raise AttributeError(name)
 
 def __setattr__(self, name, value):
 ident = self.__ident_func__()
 storage = self.__storage__
 try:
 storage[ident][name] = value
 except KeyError:
 storage[ident] = {name: value}
 
 def __delattr__(self, name):
 try:
 del self.__storage__[self.__ident_func__()][name]
 except KeyError:
 raise AttributeError(name)
 
 
class LocalStack(object):

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表