Aestate
pooled_db.py
Go to the documentation of this file.
1 from threading import Condition
2 
3 from . import __version__
4 from .steady_db import connect
5 
6 
class PooledDBError(Exception):
    """General PooledDB error."""


class InvalidConnection(PooledDBError):
    """Database connection is invalid."""


class NotSupportedError(PooledDBError):
    """DB-API module not supported by PooledDB."""


class TooManyConnections(PooledDBError):
    """Too many database connections were opened."""
21 
22 
class PooledDB:
    """Pool for DB-API 2 connections.

    After you have created the connection pool, you can use
    connection() to get pooled, steady DB-API 2 connections.
    """

    version = __version__

    def __init__(
            self, creator, mincached=0, maxcached=0,
            maxshared=0, maxconnections=0, blocking=False,
            maxusage=None, setsession=None, reset=True,
            failures=None, ping=1,
            *args, **kwargs):
        """Set up the DB-API 2 connection pool.

        creator: either an arbitrary function returning new DB-API 2
            connection objects or a DB-API 2 compliant database module
        mincached: initial number of idle connections in the pool
            (0 means no connections are made at startup)
        maxcached: maximum number of idle connections in the pool
            (0 or None means unlimited pool size)
        maxshared: maximum number of shared connections
            (0 or None means all connections are dedicated)
            When this maximum number is reached, connections are
            shared if they have been requested as shareable.
        maxconnections: maximum number of connections generally allowed
            (0 or None means an arbitrary number of connections)
        blocking: determines behavior when exceeding the maximum
            (if this is set to true, block and wait until the number of
            connections decreases, otherwise an error will be reported)
        maxusage: maximum number of reuses of a single connection
            (0 or None means unlimited reuse)
            When this maximum usage number of the connection is reached,
            the connection is automatically reset (closed and reopened).
        setsession: optional list of SQL commands that may serve to prepare
            the session, e.g. ["set datestyle to ...", "set time zone ..."]
        reset: how connections should be reset when returned to the pool
            (False or None to rollback transactions started with begin(),
            True to always issue a rollback for safety's sake)
        failures: an optional exception class or a tuple of exception classes
            for which the connection failover mechanism shall be applied,
            if the default (OperationalError, InternalError) is not adequate
        ping: determines when the connection should be checked with ping()
            (0 = None = never, 1 = default = whenever fetched from the pool,
            2 = when a cursor is created, 4 = when a query is executed,
            7 = always, and all other bit combinations of these values)
        args, kwargs: the parameters that shall be passed to the creator
            function or the connection constructor of the DB-API 2 module

        NOTE(review): earlier notes on this method also listed host, port,
        user, password, database, charset and POOL. None of these are
        parameters here; presumably they are meant to be forwarded to the
        creator through kwargs -- confirm against the callers.

        Raises NotSupportedError if the creator reports no thread safety.
        """
        try:
            threadsafety = creator.threadsafety
        except AttributeError:
            # No threadsafety attribute: something exposing a callable
            # connect() is rejected (0), anything else is assumed to be a
            # plain creator function and is given level 2.
            try:
                if not callable(creator.connect):
                    raise AttributeError
            except AttributeError:
                threadsafety = 2
            else:
                threadsafety = 0
        if not threadsafety:
            raise NotSupportedError("数据库模块未分配线程安全")
        self._creator = creator
        self._args, self._kwargs = args, kwargs
        self._blocking = blocking
        self._maxusage = maxusage
        self._setsession = setsession
        self._reset = reset
        self._failures = failures
        self._ping = ping
        # None is documented as equivalent to 0 for every limit, so
        # normalize before any numeric comparison (comparing an int
        # with None raises TypeError on Python 3).
        if mincached is None:
            mincached = 0
        if maxcached is None:
            maxcached = 0
        if maxshared is None:
            maxshared = 0
        if maxconnections is None:
            maxconnections = 0
        if maxcached:
            # the idle cache must be able to hold the initial connections
            if maxcached < mincached:
                maxcached = mincached
            self._maxcached = maxcached
        else:
            self._maxcached = 0
        if threadsafety > 1 and maxshared:
            self._maxshared = maxshared
            self._shared_cache = []  # cache of shared connections
        else:
            self._maxshared = 0
        if maxconnections:
            # the overall limit may not be lower than the cache sizes
            if maxconnections < maxcached:
                maxconnections = maxcached
            if maxconnections < maxshared:
                maxconnections = maxshared
            self._maxconnections = maxconnections
        else:
            self._maxconnections = 0
        self._idle_cache = []  # pool of idle connections
        self._lock = Condition()
        self._connections = 0
        # Establish an initial number of idle database connections:
        # open them all first, then close() each one so that it is
        # handed back to the idle cache.
        idle = [self.dedicated_connection() for _ in range(mincached)]
        while idle:
            idle.pop().close()

    def steady_connection(self):
        """Get a steady, unpooled DB-API 2 connection."""
        return connect(
            self._creator, self._maxusage, self._setsession,
            self._failures, self._ping, True, *self._args, **self._kwargs)

    def connection(self, shareable=True):
        """Get a steady, cached DB-API 2 connection from the pool.

        shareable: allow the connection to be shared with other callers

        Raises TooManyConnections when the connection limit is reached
        and the pool is not blocking.
        """
        if shareable and self._maxshared:
            with self._lock:
                while (not self._shared_cache and self._maxconnections
                        and self._connections >= self._maxconnections):
                    self._wait_lock()
                if len(self._shared_cache) < self._maxshared:
                    # shared cache is not full, get a dedicated connection
                    try:  # first try to get it from the idle cache
                        con = self._idle_cache.pop(0)
                    except IndexError:  # else get a fresh connection
                        con = self.steady_connection()
                    else:
                        con._ping_check()  # check this connection
                    con = SharedDBConnection(con)
                    self._connections += 1
                else:  # shared cache full or no more connections allowed
                    self._shared_cache.sort()  # least shared connection first
                    con = self._shared_cache.pop(0)  # get it
                    while con.con._transaction:
                        # do not share connections which are in a transaction
                        self._shared_cache.insert(0, con)
                        self._wait_lock()
                        self._shared_cache.sort()
                        con = self._shared_cache.pop(0)
                    con.con._ping_check()  # check the underlying connection
                    con.share()  # increase share of this connection
                # put the connection (back) into the shared cache
                self._shared_cache.append(con)
                self._lock.notify()
            con = PooledSharedDBConnection(self, con)
        else:  # try to get a dedicated connection
            with self._lock:
                while (self._maxconnections
                        and self._connections >= self._maxconnections):
                    self._wait_lock()
                # connection limit not reached, get a dedicated connection
                try:  # first try to get it from the idle cache
                    con = self._idle_cache.pop(0)
                except IndexError:  # else get a fresh connection
                    con = self.steady_connection()
                else:
                    con._ping_check()  # check connection
                con = PooledDedicatedDBConnection(self, con)
                self._connections += 1
        return con

    def dedicated_connection(self):
        """Alias for connection(shareable=False)."""
        return self.connection(False)

    def unshare(self, con):
        """Decrease the share of a connection in the shared cache."""
        with self._lock:
            con.unshare()
            shared = con.shared
            if not shared:  # connection is idle,
                try:  # so try to remove it
                    self._shared_cache.remove(con)  # from shared cache
                except ValueError:
                    pass  # pool has already been closed
        # cache() takes the lock itself, so call it after releasing it
        if not shared:  # connection has become idle,
            self.cache(con.con)  # so add it to the idle cache

    def cache(self, con):
        """Put a dedicated connection back into the idle cache."""
        with self._lock:
            if not self._maxcached or len(self._idle_cache) < self._maxcached:
                con._reset(force=self._reset)  # rollback possible transaction
                # the idle cache is not full, so put it there
                self._idle_cache.append(con)  # append it to the idle cache
            else:  # if the idle cache is already full,
                con.close()  # then close the connection
            self._connections -= 1
            self._lock.notify()

    def close(self):
        """Close all connections in the pool."""
        with self._lock:
            while self._idle_cache:  # close all idle connections
                con = self._idle_cache.pop(0)
                try:
                    con.close()
                except Exception:
                    pass  # best effort: keep closing the remaining ones
            if self._maxshared:  # close all shared connections
                while self._shared_cache:
                    con = self._shared_cache.pop(0).con
                    try:
                        con.close()
                    except Exception:
                        pass  # best effort: keep closing the remaining ones
                    self._connections -= 1
            # notify_all() replaces the deprecated notifyAll() alias
            self._lock.notify_all()

    def __del__(self):
        """Delete the pool."""
        try:
            self.close()
        except Exception:
            # Best effort only: the pool may be partially initialized
            # and a finalizer must never raise.
            pass

    def _wait_lock(self):
        """Wait until notified or report an error.

        Must be called with the pool lock held. Raises TooManyConnections
        immediately when the pool was created with blocking disabled.
        """
        if not self._blocking:
            raise TooManyConnections
        self._lock.wait()
281 
282 # Auxiliary classes for pooled connections
283 
class PooledDedicatedDBConnection:
    """Auxiliary proxy class for pooled dedicated connections."""

    def __init__(self, pool, con):
        """Create a pooled dedicated connection.

        pool: the corresponding PooledDB instance
        con: the underlying SteadyDB connection

        Raises NotSupportedError if the connection reports no thread safety.
        """
        # basic initialization to make finalizer work
        self._con = None
        # proper initialization of the connection
        if not con.threadsafety():
            raise NotSupportedError("数据库模块未分配线程安全")
        self._pool = pool
        self._con = con

    def close(self):
        """Close the pooled dedicated connection."""
        # Instead of actually closing the connection,
        # return it to the pool for future reuse.
        if self._con:
            self._pool.cache(self._con)
            self._con = None  # makes any further close() a no-op

    def __getattr__(self, name):
        """Proxy all members of the class.

        Raises InvalidConnection once the connection has been closed.
        """
        if self._con:
            return getattr(self._con, name)
        else:
            raise InvalidConnection

    def __del__(self):
        """Delete the pooled connection."""
        try:
            self.close()
        except Exception:
            # Best effort only: a finalizer must never raise.
            pass
322 
323 
class SharedDBConnection:
    """Auxiliary class for shared connections.

    Instances are ordered so that sorting puts the best candidate for
    sharing first: a connection that is not in a transaction sorts before
    one that is, and among connections with the same transaction state
    the one with the lower share count sorts first.
    """

    def __init__(self, con):
        """Create a shared connection.

        con: the underlying SteadyDB connection
        """
        self.con = con
        self.shared = 1  # number of users currently sharing this connection

    def __lt__(self, other):
        if self.con._transaction == other.con._transaction:
            return self.shared < other.shared
        else:
            # states differ: the one outside a transaction is "smaller"
            return not self.con._transaction

    def __le__(self, other):
        if self.con._transaction == other.con._transaction:
            return self.shared <= other.shared
        else:
            # states differ: the one outside a transaction is "smaller"
            return not self.con._transaction

    def __eq__(self, other):
        return (self.con._transaction == other.con._transaction
                and self.shared == other.shared)

    def __ne__(self, other):
        return not self.__eq__(other)

    def __gt__(self, other):
        return other.__lt__(self)

    def __ge__(self, other):
        return other.__le__(self)

    def share(self):
        """Increase the share of this connection."""
        self.shared += 1

    def unshare(self):
        """Decrease the share of this connection."""
        self.shared -= 1
367 
368 
class PooledSharedDBConnection:
    """Auxiliary proxy class for pooled shared connections."""

    def __init__(self, pool, shared_con):
        """Create a pooled shared connection.

        pool: the corresponding PooledDB instance
        shared_con: the underlying SharedDBConnection

        Raises NotSupportedError if the wrapped connection reports a
        thread safety level that is not greater than 1.
        """
        # basic initialization to make finalizer work
        self._con = None
        # proper initialization of the connection
        con = shared_con.con
        if not con.threadsafety() > 1:
            raise NotSupportedError("数据库模块未分配线程安全")
        self._pool = pool
        self._shared_con = shared_con
        self._con = con

    def close(self):
        """Close the pooled shared connection."""
        # Instead of actually closing the connection,
        # unshare it and/or return it to the pool.
        if self._con:
            self._pool.unshare(self._shared_con)
            # drop both references so any further close() is a no-op
            self._shared_con = self._con = None

    def __getattr__(self, name):
        """Proxy all members of the class.

        Raises InvalidConnection once the connection has been closed.
        """
        if self._con:
            return getattr(self._con, name)
        else:
            raise InvalidConnection

    def __del__(self):
        """Delete the pooled connection."""
        try:
            self.close()
        except Exception:
            # Best effort only: a finalizer must never raise.
            pass
aestate.opera.DBPool.pooled_db.SharedDBConnection.__lt__
def __lt__(self, other)
Definition: pooled_db.py:335
aestate.opera.DBPool.pooled_db.PooledDB.dedicated_connection
def dedicated_connection(self)
Definition: pooled_db.py:211
aestate.opera.DBPool.pooled_db.PooledDedicatedDBConnection.__del__
def __del__(self)
Definition: pooled_db.py:316
aestate.opera.DBPool.pooled_db.PooledDB.__del__
def __del__(self)
Definition: pooled_db.py:268
aestate.opera.DBPool.pooled_db.PooledDedicatedDBConnection.__init__
def __init__(self, pool, con)
Definition: pooled_db.py:287
aestate.opera.DBPool.pooled_db.PooledDB._kwargs
_kwargs
Definition: pooled_db.py:103
aestate.opera.DBPool.pooled_db.PooledSharedDBConnection.__getattr__
def __getattr__(self, name)
Definition: pooled_db.py:396
aestate.opera.DBPool.pooled_db.PooledSharedDBConnection._con
_con
Definition: pooled_db.py:379
aestate.opera.DBPool.pooled_db.PooledSharedDBConnection._pool
_pool
Definition: pooled_db.py:384
aestate.opera.DBPool.pooled_db.PooledDedicatedDBConnection._con
_con
Definition: pooled_db.py:294
aestate.opera.DBPool.pooled_db.PooledDedicatedDBConnection.close
def close(self)
Definition: pooled_db.py:301
aestate.opera.DBPool.pooled_db.InvalidConnection
Definition: pooled_db.py:11
aestate.opera.DBPool.pooled_db.PooledDedicatedDBConnection.__getattr__
def __getattr__(self, name)
Definition: pooled_db.py:309
aestate.opera.DBPool.pooled_db.PooledDB._idle_cache
_idle_cache
Definition: pooled_db.py:135
aestate.opera.DBPool.pooled_db.PooledDB._maxconnections
_maxconnections
Definition: pooled_db.py:132
aestate.opera.DBPool.pooled_db.SharedDBConnection.shared
shared
Definition: pooled_db.py:333
aestate.opera.DBPool.pooled_db.PooledDB.connection
def connection(self, shareable=True)
Definition: pooled_db.py:154
aestate.opera.DBPool.pooled_db.PooledDedicatedDBConnection
Definition: pooled_db.py:284
aestate.opera.DBPool.pooled_db.SharedDBConnection.__init__
def __init__(self, con)
Definition: pooled_db.py:327
aestate.opera.DBPool.pooled_db.PooledDB._maxcached
_maxcached
Definition: pooled_db.py:119
aestate.opera.DBPool.pooled_db.PooledDB._failures
_failures
Definition: pooled_db.py:108
aestate.opera.DBPool.pooled_db.PooledDB.__init__
def __init__(self, creator, mincached=0, maxcached=0, maxshared=0, maxconnections=0, blocking=False, maxusage=None, setsession=None, reset=True, failures=None, ping=1, *args, **kwargs)
Definition: pooled_db.py:32
aestate.opera.DBPool.pooled_db.SharedDBConnection.__eq__
def __eq__(self, other)
Definition: pooled_db.py:347
aestate.opera.DBPool.pooled_db.PooledDedicatedDBConnection._pool
_pool
Definition: pooled_db.py:298
aestate.opera.DBPool.pooled_db.SharedDBConnection
Definition: pooled_db.py:324
aestate.opera.DBPool.pooled_db.PooledDB._shared_cache
_shared_cache
Definition: pooled_db.py:124
aestate.opera.DBPool.pooled_db.PooledDB.cache
def cache(self, con)
Definition: pooled_db.py:231
aestate.opera.DBPool.pooled_db.PooledSharedDBConnection.__init__
def __init__(self, pool, shared_con)
Definition: pooled_db.py:372
aestate.opera.DBPool.pooled_db.TooManyConnections
Definition: pooled_db.py:19
aestate.opera.DBPool.steady_db.connect
def connect(creator, maxusage=None, setsession=None, failures=None, ping=1, closeable=True, *args, **kwargs)
Definition: steady_db.py:16
aestate.opera.DBPool.pooled_db.PooledSharedDBConnection.__del__
def __del__(self)
Definition: pooled_db.py:403
aestate.opera.DBPool.pooled_db.SharedDBConnection.__ge__
def __ge__(self, other)
Definition: pooled_db.py:357
aestate.opera.DBPool.pooled_db.SharedDBConnection.__le__
def __le__(self, other)
Definition: pooled_db.py:341
aestate.opera.DBPool.pooled_db.SharedDBConnection.unshare
def unshare(self)
Definition: pooled_db.py:364
aestate.opera.DBPool.pooled_db.PooledDBError
Definition: pooled_db.py:7
aestate.opera.DBPool.pooled_db.SharedDBConnection.con
con
Definition: pooled_db.py:332
aestate.opera.DBPool.pooled_db.PooledDB._connections
_connections
Definition: pooled_db.py:137
aestate.opera.DBPool.pooled_db.PooledDB.steady_connection
def steady_connection(self)
Definition: pooled_db.py:148
aestate.opera.DBPool.pooled_db.PooledDB._wait_lock
def _wait_lock(self)
Definition: pooled_db.py:275
aestate.opera.DBPool.pooled_db.PooledDB._creator
_creator
Definition: pooled_db.py:102
aestate.opera.DBPool.pooled_db.SharedDBConnection.__ne__
def __ne__(self, other)
Definition: pooled_db.py:351
aestate.opera.DBPool.pooled_db.PooledSharedDBConnection.close
def close(self)
Definition: pooled_db.py:388
aestate.opera.DBPool.pooled_db.PooledSharedDBConnection
Definition: pooled_db.py:369
aestate.opera.DBPool.pooled_db.PooledDB._ping
_ping
Definition: pooled_db.py:109
aestate.opera.DBPool.pooled_db.SharedDBConnection.share
def share(self)
Definition: pooled_db.py:360
aestate.opera.DBPool.pooled_db.PooledDB
Definition: pooled_db.py:23
aestate.opera.DBPool.pooled_db.PooledDB._reset
_reset
Definition: pooled_db.py:107
aestate.opera.DBPool.pooled_db.PooledDB.close
def close(self)
Definition: pooled_db.py:246
aestate.opera.DBPool.pooled_db.NotSupportedError
Definition: pooled_db.py:15
aestate.opera.DBPool.pooled_db.PooledSharedDBConnection._shared_con
_shared_con
Definition: pooled_db.py:385
aestate.opera.DBPool.pooled_db.PooledDB.unshare
def unshare(self, con)
Definition: pooled_db.py:215
aestate.opera.DBPool.pooled_db.PooledDB._lock
_lock
Definition: pooled_db.py:136
aestate.opera.DBPool.pooled_db.PooledDB._maxshared
_maxshared
Definition: pooled_db.py:123
aestate.opera.DBPool.pooled_db.SharedDBConnection.__gt__
def __gt__(self, other)
Definition: pooled_db.py:354
aestate.opera.DBPool.pooled_db.PooledDB._maxusage
_maxusage
Definition: pooled_db.py:105
aestate.opera.DBPool.pooled_db.PooledDB._setsession
_setsession
Definition: pooled_db.py:106
aestate.opera.DBPool.pooled_db.PooledDB._blocking
_blocking
Definition: pooled_db.py:104