Merge lp:~ccx/txpostgres/notify into lp:~wulczer/txpostgres/trunk

Proposed by Jan Pobrislo
Status: Needs review
Proposed branch: lp:~ccx/txpostgres/notify
Merge into: lp:~wulczer/txpostgres/trunk
Diff against target: 166 lines (+80/-4)
2 files modified
test/test_txpostgres.py (+15/-0)
txpostgres/txpostgres.py (+65/-4)
To merge this branch: bzr merge lp:~ccx/txpostgres/notify
Reviewer Review Type Date Requested Status
Jan UrbaƄski Pending
Review via email: mp+38376@code.launchpad.net

Description of the change

Support for NOTIFY/LISTEN events as supported by psycopg2 by connection.notifies.

To post a comment you must log in.

Unmerged revisions

17. By Jan Pobrislo

add test case for notify events

16. By Jan Pobrislo

handle adding/removing reader and writer more elegantly

15. By Jan Pobrislo

Add support for NOTIFY/LISTEN events.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'test/test_txpostgres.py'
--- test/test_txpostgres.py 2010-07-02 00:55:29 +0000
+++ test/test_txpostgres.py 2010-10-14 00:11:49 +0000
@@ -305,6 +305,21 @@
305 d = c.execute("drop table simple")305 d = c.execute("drop table simple")
306 return d.addCallback(lambda _: self.conn.close())306 return d.addCallback(lambda _: self.conn.close())
307307
308class TxPostgresNotifyObserverTestCase(_SimpleDBSetupMixin, Psycopg2TestCase):
309
310 def test_notifyObserver(self):
311 notify_d = defer.Deferred()
312 self.conn.addNotifyObserver(
313 lambda pid, name: notify_d.callback((pid,name))
314 )
315 c = self.conn.cursor()
316 d = c.execute("LISTEN FOO"
317 ).addCallback(
318 lambda c: c.execute("NOTIFY FOO")
319 ).addCallback(
320 lambda c: notify_d
321 )
322 return d
308323
309class TxPostgresManualQueryTestCase(_SimpleDBSetupMixin, Psycopg2TestCase):324class TxPostgresManualQueryTestCase(_SimpleDBSetupMixin, Psycopg2TestCase):
310325
311326
=== modified file 'txpostgres/txpostgres.py'
--- txpostgres/txpostgres.py 2010-07-02 00:55:29 +0000
+++ txpostgres/txpostgres.py 2010-10-14 00:11:49 +0000
@@ -54,6 +54,10 @@
54 prefix = "pollable"54 prefix = "pollable"
55 _pollingD = None55 _pollingD = None
5656
57 def __init__(self):
58 self._is_reading = False
59 self._is_writing = False
60
57 def pollable(self):61 def pollable(self):
58 """62 """
59 Return the pollable object. Subclasses should override this.63 Return the pollable object. Subclasses should override this.
@@ -62,6 +66,26 @@
62 """66 """
63 raise NotImplementedError()67 raise NotImplementedError()
6468
69 def addReader(self):
70 if not self._is_reading:
71 self.reactor.addReader(self)
72 self._is_reading = True
73
74 def addWriter(self):
75 if not self._is_writing:
76 self.reactor.addWriter(self)
77 self._is_writing = True
78
79 def removeReader(self):
80 if self._is_reading:
81 self.reactor.removeReader(self)
82 self._is_reading = False
83
84 def removeWriter(self):
85 if self._is_writing:
86 self.reactor.removeWriter(self)
87 self._is_writing = False
88
65 def poll(self):89 def poll(self):
66 """90 """
67 Start polling the wrapped pollable.91 Start polling the wrapped pollable.
@@ -85,9 +109,9 @@
85 d, self._pollingD = self._pollingD, None109 d, self._pollingD = self._pollingD, None
86 d.callback(self)110 d.callback(self)
87 elif self._pollingState == psycopg2.extensions.POLL_WRITE:111 elif self._pollingState == psycopg2.extensions.POLL_WRITE:
88 self.reactor.addWriter(self)112 self.addWriter()
89 elif self._pollingState == psycopg2.extensions.POLL_READ:113 elif self._pollingState == psycopg2.extensions.POLL_READ:
90 self.reactor.addReader(self)114 self.addReader()
91 else:115 else:
92 d, self._pollingD = self._pollingD, None116 d, self._pollingD = self._pollingD, None
93 d.errback(UnexpectedPollResult())117 d.errback(UnexpectedPollResult())
@@ -95,11 +119,11 @@
95 return ret119 return ret
96120
97 def doRead(self):121 def doRead(self):
98 self.reactor.removeReader(self)122 self.removeReader()
99 self.poll()123 self.poll()
100124
101 def doWrite(self):125 def doWrite(self):
102 self.reactor.removeWriter(self)126 self.removeWriter()
103 self.poll()127 self.poll()
104128
105 def logPrefix(self):129 def logPrefix(self):
@@ -132,6 +156,8 @@
132 """156 """
133157
134 def __init__(self, cursor, connection):158 def __init__(self, cursor, connection):
159 super(Cursor, self).__init__()
160
135 self.reactor = connection.reactor161 self.reactor = connection.reactor
136 self.prefix = "cursor"162 self.prefix = "cursor"
137163
@@ -237,6 +263,8 @@
237 cursorFactory = Cursor263 cursorFactory = Cursor
238264
239 def __init__(self, reactor=None):265 def __init__(self, reactor=None):
266 super(Connection, self).__init__()
267
240 if not reactor:268 if not reactor:
241 from twisted.internet import reactor269 from twisted.internet import reactor
242 self.reactor = reactor270 self.reactor = reactor
@@ -245,10 +273,39 @@
245 # this lock will be used to prevent concurrent query execution273 # this lock will be used to prevent concurrent query execution
246 self.lock = defer.DeferredLock()274 self.lock = defer.DeferredLock()
247 self._connection = None275 self._connection = None
276 self._notify_observers = set()
248277
249 def pollable(self):278 def pollable(self):
250 return self._connection279 return self._connection
251280
281 def addNotifyObserver(self, callback):
282 """
283 Add callable to be called back when NOTIFY event are delivered.
284
285 @type callback: Callable that takes pid, name as parameters.
286 """
287 self._notify_observers.add(callback)
288 if self.pollable():
289 if self not in self.reactor.getReaders():
290 self.reactor.addReader(self)
291
292 def removeNotifyObserver(self, callback):
293 """
294 Remove callable from NOTIFY event are delivery.
295
296 @type callback: Callable previously registered by addNotifyObserver.
297 """
298 self._notify_observers.remove(callback)
299
300 def doRead(self):
301 super(Connection, self).doRead()
302 if self._notify_observers and self.pollable():
303 self.addReader()
304 while self.notifies:
305 pid, name = self.notifies.pop(0)
306 for observer in self._notify_observers:
307 observer(pid, name)
308
252 def connect(self, *args, **kwargs):309 def connect(self, *args, **kwargs):
253 """310 """
254 Connect to the database.311 Connect to the database.
@@ -276,6 +333,8 @@
276 """333 """
277 _connection, self._connection = self._connection, None334 _connection, self._connection = self._connection, None
278 _connection.close()335 _connection.close()
336 self.removeReader()
337 self.removeWriter()
279338
280 def cursor(self):339 def cursor(self):
281 """340 """
@@ -575,3 +634,5 @@
575 c = self.connections.pop()634 c = self.connections.pop()
576 d = c.runInteraction(interaction, *args, **kwargs)635 d = c.runInteraction(interaction, *args, **kwargs)
577 return d.addBoth(self._putBackAndPassthrough, c)636 return d.addBoth(self._putBackAndPassthrough, c)
637
638# vim: ft=python et sts=4 ts=4

Subscribers

People subscribed via source and target branches

to all changes: