Merge lp:~ccx/txpostgres/notify into lp:~wulczer/txpostgres/trunk

Proposed by Jan Pobrislo
Status: Needs review
Proposed branch: lp:~ccx/txpostgres/notify
Merge into: lp:~wulczer/txpostgres/trunk
Diff against target: 166 lines (+80/-4)
2 files modified
test/test_txpostgres.py (+15/-0)
txpostgres/txpostgres.py (+65/-4)
To merge this branch: bzr merge lp:~ccx/txpostgres/notify
Reviewer Review Type Date Requested Status
Jan UrbaƄski Pending
Review via email: mp+38376@code.launchpad.net

Description of the change

Support for NOTIFY/LISTEN events as supported by psycopg2 by connection.notifies.

To post a comment you must log in.

Unmerged revisions

17. By Jan Pobrislo

add test case for notify events

16. By Jan Pobrislo

handle adding/removing reader and writer more elegantly

15. By Jan Pobrislo

Add support for NOTIFY/LISTEN events.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'test/test_txpostgres.py'
2--- test/test_txpostgres.py 2010-07-02 00:55:29 +0000
3+++ test/test_txpostgres.py 2010-10-14 00:11:49 +0000
4@@ -305,6 +305,21 @@
5 d = c.execute("drop table simple")
6 return d.addCallback(lambda _: self.conn.close())
7
8+class TxPostgresNotifyObserverTestCase(_SimpleDBSetupMixin, Psycopg2TestCase):
9+
10+ def test_notifyObserver(self):
11+ notify_d = defer.Deferred()
12+ self.conn.addNotifyObserver(
13+ lambda pid, name: notify_d.callback((pid,name))
14+ )
15+ c = self.conn.cursor()
16+ d = c.execute("LISTEN FOO"
17+ ).addCallback(
18+ lambda c: c.execute("NOTIFY FOO")
19+ ).addCallback(
20+ lambda c: notify_d
21+ )
22+ return d
23
24 class TxPostgresManualQueryTestCase(_SimpleDBSetupMixin, Psycopg2TestCase):
25
26
27=== modified file 'txpostgres/txpostgres.py'
28--- txpostgres/txpostgres.py 2010-07-02 00:55:29 +0000
29+++ txpostgres/txpostgres.py 2010-10-14 00:11:49 +0000
30@@ -54,6 +54,10 @@
31 prefix = "pollable"
32 _pollingD = None
33
34+ def __init__(self):
35+ self._is_reading = False
36+ self._is_writing = False
37+
38 def pollable(self):
39 """
40 Return the pollable object. Subclasses should override this.
41@@ -62,6 +66,26 @@
42 """
43 raise NotImplementedError()
44
45+ def addReader(self):
46+ if not self._is_reading:
47+ self.reactor.addReader(self)
48+ self._is_reading = True
49+
50+ def addWriter(self):
51+ if not self._is_writing:
52+ self.reactor.addWriter(self)
53+ self._is_writing = True
54+
55+ def removeReader(self):
56+ if self._is_reading:
57+ self.reactor.removeReader(self)
58+ self._is_reading = False
59+
60+ def removeWriter(self):
61+ if self._is_writing:
62+ self.reactor.removeWriter(self)
63+ self._is_writing = False
64+
65 def poll(self):
66 """
67 Start polling the wrapped pollable.
68@@ -85,9 +109,9 @@
69 d, self._pollingD = self._pollingD, None
70 d.callback(self)
71 elif self._pollingState == psycopg2.extensions.POLL_WRITE:
72- self.reactor.addWriter(self)
73+ self.addWriter()
74 elif self._pollingState == psycopg2.extensions.POLL_READ:
75- self.reactor.addReader(self)
76+ self.addReader()
77 else:
78 d, self._pollingD = self._pollingD, None
79 d.errback(UnexpectedPollResult())
80@@ -95,11 +119,11 @@
81 return ret
82
83 def doRead(self):
84- self.reactor.removeReader(self)
85+ self.removeReader()
86 self.poll()
87
88 def doWrite(self):
89- self.reactor.removeWriter(self)
90+ self.removeWriter()
91 self.poll()
92
93 def logPrefix(self):
94@@ -132,6 +156,8 @@
95 """
96
97 def __init__(self, cursor, connection):
98+ super(Cursor, self).__init__()
99+
100 self.reactor = connection.reactor
101 self.prefix = "cursor"
102
103@@ -237,6 +263,8 @@
104 cursorFactory = Cursor
105
106 def __init__(self, reactor=None):
107+ super(Connection, self).__init__()
108+
109 if not reactor:
110 from twisted.internet import reactor
111 self.reactor = reactor
112@@ -245,10 +273,39 @@
113 # this lock will be used to prevent concurrent query execution
114 self.lock = defer.DeferredLock()
115 self._connection = None
116+ self._notify_observers = set()
117
118 def pollable(self):
119 return self._connection
120
121+ def addNotifyObserver(self, callback):
122+ """
123+ Add callable to be called back when NOTIFY event are delivered.
124+
125+ @type callback: Callable that takes pid, name as parameters.
126+ """
127+ self._notify_observers.add(callback)
128+ if self.pollable():
129+ if self not in self.reactor.getReaders():
130+ self.reactor.addReader(self)
131+
132+ def removeNotifyObserver(self, callback):
133+ """
134+ Remove callable from NOTIFY event are delivery.
135+
136+ @type callback: Callable previously registered by addNotifyObserver.
137+ """
138+ self._notify_observers.remove(callback)
139+
140+ def doRead(self):
141+ super(Connection, self).doRead()
142+ if self._notify_observers and self.pollable():
143+ self.addReader()
144+ while self.notifies:
145+ pid, name = self.notifies.pop(0)
146+ for observer in self._notify_observers:
147+ observer(pid, name)
148+
149 def connect(self, *args, **kwargs):
150 """
151 Connect to the database.
152@@ -276,6 +333,8 @@
153 """
154 _connection, self._connection = self._connection, None
155 _connection.close()
156+ self.removeReader()
157+ self.removeWriter()
158
159 def cursor(self):
160 """
161@@ -575,3 +634,5 @@
162 c = self.connections.pop()
163 d = c.runInteraction(interaction, *args, **kwargs)
164 return d.addBoth(self._putBackAndPassthrough, c)
165+
166+# vim: ft=python et sts=4 ts=4

Subscribers

People subscribed via source and target branches

to all changes: