Skip to content

Commit 559e4c4

Browse files
committed
Add pools module
1 parent dd47b39 commit 559e4c4

6 files changed

Lines changed: 204 additions & 2 deletions

File tree

MANIFEST.in

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
include README.rst LICENSE CHANGELOG
22
include runtests.py tox.ini
3-
include example.py
3+
include example.py example_pool

README.rst

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,17 @@ This package contains a fork of PyMySQL supporting Tornado.
1212
Example
1313
-------
1414

15+
Simple
16+
~~~~~~~
17+
1518
.. include:: example.py
1619
:code: python
1720

21+
pool
22+
~~~~~
23+
24+
.. include:: example_pool.py
25+
:code: python
1826

1927
Requirements
2028
-------------

example_pool.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
#!/usr/bin/env python
2+
from __future__ import print_function
3+
import random
4+
5+
from tornado import ioloop, gen
6+
from tornado_mysql import pools
7+
8+
9+
pools.DEBUG = True
10+
11+
12+
POOL = pools.Pool(
13+
dict(host='127.0.0.1', port=3306, user='root', passwd='', db='mysql'),
14+
max_idle_connections=1,
15+
max_recycle_sec=3)
16+
17+
18+
@gen.coroutine
19+
def worker(n):
20+
for i in range(10):
21+
t = 1
22+
print(n, "sleeping", t, "seconds")
23+
cur = yield POOL.execute("SELECT SLEEP(%s)", (t,))
24+
print(n, cur.fetchall())
25+
26+
27+
@gen.coroutine
28+
def main():
29+
workers = [worker(i) for i in range(10)]
30+
yield workers
31+
32+
33+
ioloop.IOLoop.current().run_sync(main)
34+
print(POOL._opened_conns)

tornado_mysql/connections.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -738,6 +738,8 @@ def connect(self):
738738
yield self._get_server_information()
739739
yield self._request_authentication()
740740

741+
self.connected_time = self.io_loop.time()
742+
741743
if self.sql_mode is not None:
742744
yield self.query("SET sql_mode=%s" % (self.sql_mode,))
743745

tornado_mysql/cursors.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ def nextset(self):
8989
raise gen.Return()
9090
if not current_result.has_next:
9191
raise gen.Return()
92-
conn.next_result()
92+
yield conn.next_result()
9393
self._do_get_result()
9494
raise gen.Return(True)
9595

tornado_mysql/pools.py

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
"""Connection pool"""
2+
from __future__ import absolute_import, division, print_function
3+
4+
from collections import deque
5+
import sys
6+
import warnings
7+
8+
from tornado.ioloop import IOLoop
9+
from tornado.gen import coroutine, Return
10+
from tornado.concurrent import Future
11+
from tornado_mysql import connect
12+
13+
14+
DEBUG = False
15+
16+
17+
def _debug(*msg):
18+
if DEBUG:
19+
print(*msg)
20+
21+
22+
class Pool(object):
23+
"""Connection pool like Golang's database/sql.DB.
24+
25+
This connection pool is based on autocommit mode.
26+
You can execute query without knowing connection.
27+
28+
When transaction is necessary, you can checkout transaction object.
29+
"""
30+
31+
def __init__(self,
32+
connect_kwargs,
33+
max_idle_connections=1,
34+
max_recycle_sec=3600,
35+
io_loop=None,
36+
):
37+
"""
38+
:param dict connect_kwargs: kwargs for tornado_mysql.connect()
39+
:param int max_idle_connections: Max number of keeping connections.
40+
:param int max_recycle_sec: How long connections are recycled.
41+
"""
42+
connect_kwargs['autocommit'] = True
43+
self.io_loop = io_loop or IOLoop.current()
44+
self.connect_kwargs = connect_kwargs
45+
self.max_idle_connections = max_idle_connections
46+
self.max_recycle_sec = max_recycle_sec
47+
48+
self._opened_conns = 0
49+
self._free_conn = deque()
50+
51+
def _get_conn(self):
52+
now = self.io_loop.time()
53+
while self._free_conn:
54+
conn = self._free_conn.popleft()
55+
if now - conn.connected_time > self.max_recycle_sec:
56+
self._close_async(conn)
57+
continue
58+
_debug("Reusing connection from pool (opened=%d)" % (self._opened_conns,))
59+
fut = Future()
60+
fut.set_result(conn)
61+
return fut
62+
63+
self._opened_conns += 1
64+
_debug("Creating new connection (opened=%d)" % (self._opened_conns,))
65+
return connect(**self.connect_kwargs)
66+
67+
def _put_conn(self, conn):
68+
if (len(self._free_conn) < self.max_idle_connections and
69+
self.io_loop.time() - conn.connected_time < self.max_recycle_sec):
70+
self._free_conn.append(conn)
71+
else:
72+
self._close_async(conn)
73+
74+
def _close_async(self, conn):
75+
self.io_loop.add_future(conn.close_async(), callback=lambda f: None)
76+
self._opened_conns -= 1
77+
78+
def _close_conn(self, conn):
79+
conn.close()
80+
self._opened_conns -= 1
81+
82+
@coroutine
83+
def execute(self, query, params):
84+
"""Execute query in pool.
85+
86+
Returns future yielding closed cursor.
87+
You can get rows, lastrowid, etc from the cursor.
88+
89+
:return: Future of cursor
90+
:rtype: Future
91+
"""
92+
conn = yield self._get_conn()
93+
try:
94+
cur = conn.cursor()
95+
yield cur.execute(query, params)
96+
yield cur.close()
97+
self._put_conn(conn)
98+
except:
99+
self._opened_conns -= 1
100+
conn.close()
101+
raise
102+
raise Return(cur)
103+
104+
@coroutine
105+
def begin(self):
106+
"""Start transaction
107+
108+
Wait to get connection and returns `Transaction` object.
109+
110+
:return: Future[Transaction]
111+
:rtype: Future
112+
"""
113+
conn = yield self._get_conn()
114+
trx = Transaction(self, conn)
115+
raise gen.Return(trx)
116+
117+
118+
class Transaction(object):
119+
"""Represents transaction in pool"""
120+
def __init__(self, pool, conn):
121+
self._pool = pool
122+
self._conn = conn
123+
124+
def _ensure_conn(self):
125+
if self._conn is None:
126+
raise Exception("Transaction is closed already")
127+
128+
def _close(self):
129+
self._pool._put_conn(self._conn)
130+
self._pool = self._conn = None
131+
132+
@coroutine
133+
def execute(self, query, args):
134+
"""
135+
:return: Future[Cursor]
136+
:rtype: Future
137+
"""
138+
self._ensure_conn()
139+
cur = self._conn.cursor()
140+
yield cur.execute(query, args)
141+
raise Return(cur)
142+
143+
@coroutine
144+
def commit(self):
145+
self._ensure_conn()
146+
yield self._conn.commit()
147+
self._close()
148+
149+
@coroutine
150+
def rollback(self):
151+
self._ensure_conn()
152+
yield self._conn.rollback()
153+
self._close()
154+
155+
def __del__(self):
156+
if self._pool is not None:
157+
warnings.warn("Transaction has not committed or rollbacked.")
158+
self._pool._close_conn(self._conn)

0 commit comments

Comments
 (0)