Merge lp:~salgado/psycogreen/gevent-connection-pool into lp:~ubuntuone-pqm-team/psycogreen/trunk

Proposed by Guilherme Salgado
Status: Merged
Approved by: Guilherme Salgado
Approved revision: 21
Merged at revision: 21
Proposed branch: lp:~salgado/psycogreen/gevent-connection-pool
Merge into: lp:~ubuntuone-pqm-team/psycogreen/trunk
Diff against target: 144 lines (+121/-0)
2 files modified
psycogreen/gevent.py (+53/-0)
tests/test_gevent_connection_pool.py (+68/-0)
To merge this branch: bzr merge lp:~salgado/psycogreen/gevent-connection-pool
Reviewer Review Type Date Requested Status
Sidnei da Silva (community) Approve
Samuele Pedroni Approve
Review via email: mp+156627@code.launchpad.net

Commit message

Add a gevent ConnectionPool class, which lets greenlets share database connections by serializing access to them.

Description of the change

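Add a gevent ConnectionPool class, which lets greenlets share database connections by serializing access to them. The pool is used as a context manager: entering it hands out a connection (a semaphore limits the number in use to max_con), and leaving it commits on success or rolls back if an exception was raised.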

To post a comment you must log in.
Revision history for this message
Samuele Pedroni (pedronis) wrote :

looks good

review: Approve
Revision history for this message
Sidnei da Silva (sidnei) :
review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'psycogreen/gevent.py'
2--- psycogreen/gevent.py 2012-10-10 23:17:24 +0000
3+++ psycogreen/gevent.py 2013-04-02 16:16:30 +0000
4@@ -10,10 +10,14 @@
5 from __future__ import absolute_import
6
7 import psycopg2
8+from psycopg2.extras import RealDictConnection
9 from psycopg2 import extensions
10
11+from gevent.coros import Semaphore
12+from gevent.local import local as gevent_local
13 from gevent.socket import wait_read, wait_write
14
15+
16 def patch_psycopg():
17 """Configure Psycopg to be used with gevent in non-blocking way."""
18 if not hasattr(extensions, 'set_wait_callback'):
19@@ -36,3 +40,52 @@
20 else:
21 raise psycopg2.OperationalError(
22 "Bad result from poll: %r" % state)
23+
24+
25+class ConnectionPool(object):
26+
27+ def __init__(self, dsn, max_con=10, max_idle=3,
28+ connection_factory=RealDictConnection):
29+ self.dsn = dsn
30+ self.max_con = max_con
31+ self.max_idle = max_idle
32+ self.connection_factory = connection_factory
33+ self._sem = Semaphore(max_con)
34+ self._free = []
35+ self._local = gevent_local()
36+
37+ def __enter__(self):
38+ self._sem.acquire()
39+ try:
40+ if getattr(self._local, 'con', None) is not None:
41+ raise RuntimeError("Attempting to re-enter connection pool?")
42+ if self._free:
43+ con = self._free.pop()
44+ else:
45+ con = psycopg2.connect(
46+ dsn=self.dsn, connection_factory=self.connection_factory)
47+ self._local.con = con
48+ return con
49+ except StandardError:
50+ self._sem.release()
51+ raise
52+
53+ def __exit__(self, exc_type, exc_value, traceback):
54+ try:
55+ if self._local.con is None:
56+ raise RuntimeError("Exit connection pool with no connection?")
57+ if exc_type is not None:
58+ self.rollback()
59+ else:
60+ self.commit()
61+ if len(self._free) < self.max_idle:
62+ self._free.append(self._local.con)
63+ self._local.con = None
64+ finally:
65+ self._sem.release()
66+
67+ def commit(self):
68+ self._local.con.commit()
69+
70+ def rollback(self):
71+ self._local.con.rollback()
72
73=== added file 'tests/test_gevent_connection_pool.py'
74--- tests/test_gevent_connection_pool.py 1970-01-01 00:00:00 +0000
75+++ tests/test_gevent_connection_pool.py 2013-04-02 16:16:30 +0000
76@@ -0,0 +1,68 @@
77+import unittest
78+
79+import gevent
80+import psycopg2
81+
82+from psycogreen.gevent import ConnectionPool
83+
84+
85+class FakeConnection(object):
86+
87+ rollback_called = False
88+ commit_called = False
89+
90+ def rollback(self):
91+ self.rollback_called = True
92+
93+ def commit(self):
94+ self.commit_called = True
95+
96+
97+class TestConnectionPool(unittest.TestCase):
98+
99+ def setUp(self):
100+ super(TestConnectionPool, self).setUp()
101+ self._orig_connect = psycopg2.connect
102+ self.conn = FakeConnection()
103+ self.pool = ConnectionPool('bogus-dsn')
104+ psycopg2.connect = lambda **kwargs: self.conn
105+ self.addCleanup(self._restore_psycopg2_connect)
106+
107+ def _restore_psycopg2_connect(self):
108+ psycopg2.connect = self._orig_connect
109+
110+ def test_commit_is_called_on_success(self):
111+ with self.pool:
112+ self.assertTrue(isinstance(self.pool._local, gevent.local.local))
113+ self.assertEqual(self.conn, self.pool._local.con)
114+ self.assertTrue(self.conn.commit_called)
115+
116+ def test_rollback_is_called_on_success(self):
117+ try:
118+ with self.pool:
119+ raise ValueError("anything")
120+ except ValueError:
121+ pass
122+ self.assertTrue(self.conn.rollback_called)
123+
124+ def test_cannot_reenter_connection_pool(self):
125+ try:
126+ with self.pool:
127+ with self.pool:
128+ pass
129+ except RuntimeError:
130+ pass
131+ else:
132+ self.fail("Should not be allowed to re-enter pool")
133+
134+ def test_pool_locks_when_max_connections_reached(self):
135+ pool = ConnectionPool('bogus-dsn', max_con=1)
136+ def f():
137+ with pool:
138+ self.assertTrue(pool._sem.locked())
139+
140+ gevent.joinall([gevent.spawn(f)])
141+
142+
143+if __name__ == "__main__":
144+ unittest.main()

Subscribers

People subscribed via source and target branches

to all changes: