Merge lp:~blake-rouse/maas/cordinate-notifies into lp:~maas-committers/maas/trunk
- cordinate-notifies
- Merge into trunk
Proposed by
Blake Rouse
Status: | Merged | ||||
---|---|---|---|---|---|
Approved by: | Blake Rouse | ||||
Approved revision: | no longer in the source branch. | ||||
Merged at revision: | 3850 | ||||
Proposed branch: | lp:~blake-rouse/maas/cordinate-notifies | ||||
Merge into: | lp:~maas-committers/maas/trunk | ||||
Diff against target: |
398 lines (+99/-60) 2 files modified
src/maasserver/websockets/listener.py (+54/-23) src/maasserver/websockets/tests/test_listener.py (+45/-37) |
||||
To merge this branch: | bzr merge lp:~blake-rouse/maas/cordinate-notifies | ||||
Related bugs: |
|
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Gavin Panella (community) | Approve | ||
Review via email: mp+257634@code.launchpad.net |
Commit message
Coordinate the notify messages from postgres into a set, handling the notification messages in the set instead of as they come in from postgres.
This helps remove duplicate notifications, as some notifications can happen multiple times in quick succession while the database is being updated. It also helps coordinate the notifications by processing only one at a time, instead of hammering the database and processing all notifications in parallel or until the threadpool is maxed.
Description of the change
To post a comment you must log in.
Revision history for this message
Gavin Panella (allenap) wrote : | # |
Revision history for this message
Blake Rouse (blake-rouse) wrote : | # |
I merged your branch; can you give this a final blessing?
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === modified file 'src/maasserver/websockets/listener.py' |
2 | --- src/maasserver/websockets/listener.py 2015-03-27 21:21:18 +0000 |
3 | +++ src/maasserver/websockets/listener.py 2015-04-28 14:27:40 +0000 |
4 | @@ -64,11 +64,18 @@ |
5 | |
6 | implements(interfaces.IReadDescriptor) |
7 | |
8 | + # Seconds to wait to handle new notifications. When the notifications set |
9 | + # is empty it will wait this amount of time to check again for new |
10 | + # notifications. |
11 | + HANDLE_NOTIFY_DELAY = 0.5 |
12 | + |
13 | def __init__(self, alias="default"): |
14 | self.alias = alias |
15 | self.listeners = defaultdict(list) |
16 | self.autoReconnect = False |
17 | self.connection = None |
18 | + self.notifications = set() |
19 | + self.notifier = task.LoopingCall(self.handleNotifies) |
20 | |
21 | def start(self): |
22 | """Start the listener.""" |
23 | @@ -80,6 +87,7 @@ |
24 | self.autoReconnect = False |
25 | if self.connected(): |
26 | reactor.removeReader(self) |
27 | + self.cancelHandleNotify() |
28 | return deferToThread(self.stopConnection) |
29 | else: |
30 | return defer.succeed(None) |
31 | @@ -119,14 +127,18 @@ |
32 | # contains no pgcode or pgerror to identify the reason, so best |
33 | # assumtion is that the connection has been lost. |
34 | reactor.removeReader(self) |
35 | + self.cancelHandleNotify() |
36 | self.connectionLost(failure.Failure(error.ConnectionClosed())) |
37 | else: |
38 | - # Process all of the notify messages inside of a Cooperator so |
39 | - # each notification will be handled in order. |
40 | + # Add each notify to to the notifications set. This helps |
41 | + # removes duplicate notifications as one entity in the database |
42 | + # can send multiple notifies as it can be updated quickly. |
43 | + # Accumulate the notifications and the listener passes them on to |
44 | + # be handled in batches. |
45 | notifies = self.connection.connection.notifies |
46 | if len(notifies) != 0: |
47 | - # Pass a *copy* of the received notifies list. |
48 | - task.cooperate(self.handleNotifies(notifies[:])) |
49 | + for notify in notifies: |
50 | + self.notifications.add((notify.channel, notify.payload)) |
51 | # Delete the contents of the connection's notifies list so |
52 | # that we don't process them a second time. |
53 | del notifies[:] |
54 | @@ -184,6 +196,8 @@ |
55 | d.addCallback(lambda _: deferToThread(self.registerChannels)) |
56 | d.addCallback(lambda _: reactor.addReader(self)) |
57 | d.addCallback( |
58 | + lambda _: self.runHandleNotify(delay=self.HANDLE_NOTIFY_DELAY)) |
59 | + d.addCallback( |
60 | lambda _: self.logMsg("Listening for notificaton from database.")) |
61 | d.addErrback(failureToConnect) |
62 | return d |
63 | @@ -213,22 +227,39 @@ |
64 | "%s action is not supported." % action) |
65 | return channel, action |
66 | |
67 | - def handleNotifies(self, notifies): |
68 | - """Process each notify message yeilding to the returned defer. |
69 | - |
70 | - This method should be called from the global Cooperator so each notify |
71 | - message is handled in order. |
72 | - """ |
73 | - for notify in notifies: |
74 | - try: |
75 | - channel, action = self.convertChannel(notify.channel) |
76 | - except PostgresListenerNotifyError as e: |
77 | - # Log the error and continue processing the remaining |
78 | - # notifications. |
79 | - self.logErr(e) |
80 | - else: |
81 | - handlers = self.listeners[channel] |
82 | - for handler in handlers: |
83 | - yield defer.maybeDeferred( |
84 | - handler, action, notify.payload).addErrback( |
85 | - self.logErr) |
86 | + def runHandleNotify(self, delay=0, clock=reactor): |
87 | + """Defer later the `handleNotify`.""" |
88 | + if not self.notifier.running: |
89 | + self.notifier.start(delay, now=False) |
90 | + |
91 | + def cancelHandleNotify(self): |
92 | + """Cancel the deferred `handleNotify` call.""" |
93 | + if self.notifier.running: |
94 | + self.notifier.stop() |
95 | + |
96 | + def handleNotifies(self, clock=reactor): |
97 | + """Process all notify message in the notifications set.""" |
98 | + def gen_notifications(notifications): |
99 | + while len(notifications) != 0: |
100 | + yield notifications.pop() |
101 | + return task.coiterate( |
102 | + self.handleNotify(notification, clock=clock) |
103 | + for notification in gen_notifications(self.notifications)) |
104 | + |
105 | + def handleNotify(self, notification, clock=reactor): |
106 | + """Process a notify message in the notifications set.""" |
107 | + channel, payload = notification |
108 | + try: |
109 | + channel, action = self.convertChannel(channel) |
110 | + except PostgresListenerNotifyError as e: |
111 | + # Log the error and continue processing the remaining |
112 | + # notifications. |
113 | + self.logErr(e) |
114 | + else: |
115 | + defers = [] |
116 | + handlers = self.listeners[channel] |
117 | + for handler in handlers: |
118 | + d = defer.maybeDeferred(handler, action, payload) |
119 | + d.addErrback(self.logErr) |
120 | + defers.append(d) |
121 | + return defer.DeferredList(defers) |
122 | |
123 | === modified file 'src/maasserver/websockets/tests/test_listener.py' |
124 | --- src/maasserver/websockets/tests/test_listener.py 2015-03-31 18:30:56 +0000 |
125 | +++ src/maasserver/websockets/tests/test_listener.py 2015-04-28 14:27:40 +0000 |
126 | @@ -14,6 +14,8 @@ |
127 | __metaclass__ = type |
128 | __all__ = [] |
129 | |
130 | +from collections import namedtuple |
131 | + |
132 | from crochet import wait_for_reactor |
133 | from django.contrib.auth.models import User |
134 | from django.db import connection |
135 | @@ -41,12 +43,14 @@ |
136 | ) |
137 | from provisioningserver.utils.twisted import DeferredValue |
138 | from psycopg2 import OperationalError |
139 | -from testtools.matchers import HasLength |
140 | from twisted.internet import reactor |
141 | from twisted.internet.defer import inlineCallbacks |
142 | from twisted.internet.threads import deferToThread |
143 | |
144 | |
145 | +FakeNotify = namedtuple("FakeNotify", ["channel", "payload"]) |
146 | + |
147 | + |
148 | class TestPostgresListener(MAASServerTestCase): |
149 | |
150 | @transactional |
151 | @@ -185,31 +189,35 @@ |
152 | listener.connectionLost, |
153 | MockCalledOnceWith(ANY)) |
154 | |
155 | - def test__doRead_copies_and_clears_notifies_list(self): |
156 | + def test__doRead_adds_notifies_to_notifications(self): |
157 | listener = PostgresListener() |
158 | + notifications = [ |
159 | + FakeNotify( |
160 | + channel=factory.make_name("channel_action"), |
161 | + payload=factory.make_name("payload")) |
162 | + for _ in range(3) |
163 | + ] |
164 | |
165 | connection = self.patch(listener, "connection") |
166 | connection.connection.poll.return_value = None |
167 | - # The new notifies received from the database. |
168 | - connection.connection.notifies = [sentinel.notify] |
169 | - |
170 | - handleNotifies = self.patch(listener, "handleNotifies") |
171 | - # The returned value from handleNotifies() is used as input into the |
172 | - # Twisted cooperator. Here we ensure that it has nothing to do. |
173 | - handleNotifies.return_value = iter(()) |
174 | + # Add the notifications twice, so it can test that duplicates are |
175 | + # accumulated together. |
176 | + connection.connection.notifies = notifications + notifications |
177 | + self.patch(listener, "handleNotify") |
178 | |
179 | listener.doRead() |
180 | - self.assertThat( |
181 | - handleNotifies, |
182 | - MockCalledOnceWith([sentinel.notify])) |
183 | - self.assertThat( |
184 | - connection.connection.notifies, |
185 | - HasLength(0)) |
186 | + self.assertItemsEqual( |
187 | + listener.notifications, set(notifications)) |
188 | |
189 | |
190 | class TransactionalHelpersMixin: |
191 | """Helpers performing actions in transactions.""" |
192 | |
193 | + def make_listener_without_delay(self): |
194 | + listener = PostgresListener() |
195 | + self.patch(listener, "HANDLE_NOTIFY_DELAY", 0) |
196 | + return listener |
197 | + |
198 | @transactional |
199 | def create_node(self, params=None): |
200 | if params is None: |
201 | @@ -360,7 +368,7 @@ |
202 | @inlineCallbacks |
203 | def test__calls_handler_on_create_notification(self): |
204 | yield deferToThread(register_all_triggers) |
205 | - listener = PostgresListener() |
206 | + listener = self.make_listener_without_delay() |
207 | dv = DeferredValue() |
208 | listener.register(self.listener, lambda *args: dv.set(args)) |
209 | yield listener.start() |
210 | @@ -375,7 +383,7 @@ |
211 | @inlineCallbacks |
212 | def test__calls_handler_on_update_notification(self): |
213 | yield deferToThread(register_all_triggers) |
214 | - listener = PostgresListener() |
215 | + listener = self.make_listener_without_delay() |
216 | dv = DeferredValue() |
217 | listener.register(self.listener, lambda *args: dv.set(args)) |
218 | node = yield deferToThread(self.create_node, self.params) |
219 | @@ -395,7 +403,7 @@ |
220 | @inlineCallbacks |
221 | def test__calls_handler_on_delete_notification(self): |
222 | yield deferToThread(register_all_triggers) |
223 | - listener = PostgresListener() |
224 | + listener = self.make_listener_without_delay() |
225 | dv = DeferredValue() |
226 | listener.register(self.listener, lambda *args: dv.set(args)) |
227 | node = yield deferToThread(self.create_node, self.params) |
228 | @@ -417,7 +425,7 @@ |
229 | @inlineCallbacks |
230 | def test__calls_handler_on_create_notification(self): |
231 | yield deferToThread(register_all_triggers) |
232 | - listener = PostgresListener() |
233 | + listener = self.make_listener_without_delay() |
234 | dv = DeferredValue() |
235 | listener.register("nodegroup", lambda *args: dv.set(args)) |
236 | yield listener.start() |
237 | @@ -432,7 +440,7 @@ |
238 | @inlineCallbacks |
239 | def test__calls_handler_on_update_notification(self): |
240 | yield deferToThread(register_all_triggers) |
241 | - listener = PostgresListener() |
242 | + listener = self.make_listener_without_delay() |
243 | dv = DeferredValue() |
244 | listener.register("nodegroup", lambda *args: dv.set(args)) |
245 | nodegroup = yield deferToThread(self.create_nodegroup) |
246 | @@ -452,7 +460,7 @@ |
247 | @inlineCallbacks |
248 | def test__calls_handler_on_delete_notification(self): |
249 | yield deferToThread(register_all_triggers) |
250 | - listener = PostgresListener() |
251 | + listener = self.make_listener_without_delay() |
252 | dv = DeferredValue() |
253 | listener.register("nodegroup", lambda *args: dv.set(args)) |
254 | nodegroup = yield deferToThread(self.create_nodegroup) |
255 | @@ -476,7 +484,7 @@ |
256 | yield deferToThread(register_all_triggers) |
257 | nodegroup = yield deferToThread(self.create_nodegroup) |
258 | |
259 | - listener = PostgresListener() |
260 | + listener = self.make_listener_without_delay() |
261 | dv = DeferredValue() |
262 | listener.register("nodegroup", lambda *args: dv.set(args)) |
263 | yield listener.start() |
264 | @@ -496,7 +504,7 @@ |
265 | interface = yield deferToThread( |
266 | self.create_nodegroupinterface, nodegroup) |
267 | |
268 | - listener = PostgresListener() |
269 | + listener = self.make_listener_without_delay() |
270 | dv = DeferredValue() |
271 | listener.register("nodegroup", lambda *args: dv.set(args)) |
272 | yield listener.start() |
273 | @@ -518,7 +526,7 @@ |
274 | interface = yield deferToThread( |
275 | self.create_nodegroupinterface, nodegroup) |
276 | |
277 | - listener = PostgresListener() |
278 | + listener = self.make_listener_without_delay() |
279 | dv = DeferredValue() |
280 | listener.register("nodegroup", lambda *args: dv.set(args)) |
281 | yield listener.start() |
282 | @@ -538,7 +546,7 @@ |
283 | @inlineCallbacks |
284 | def test__calls_handler_on_create_notification(self): |
285 | yield deferToThread(register_all_triggers) |
286 | - listener = PostgresListener() |
287 | + listener = self.make_listener_without_delay() |
288 | dv = DeferredValue() |
289 | listener.register("zone", lambda *args: dv.set(args)) |
290 | yield listener.start() |
291 | @@ -553,7 +561,7 @@ |
292 | @inlineCallbacks |
293 | def test__calls_handler_on_update_notification(self): |
294 | yield deferToThread(register_all_triggers) |
295 | - listener = PostgresListener() |
296 | + listener = self.make_listener_without_delay() |
297 | dv = DeferredValue() |
298 | listener.register("zone", lambda *args: dv.set(args)) |
299 | zone = yield deferToThread(self.create_zone) |
300 | @@ -573,7 +581,7 @@ |
301 | @inlineCallbacks |
302 | def test__calls_handler_on_delete_notification(self): |
303 | yield deferToThread(register_all_triggers) |
304 | - listener = PostgresListener() |
305 | + listener = self.make_listener_without_delay() |
306 | dv = DeferredValue() |
307 | listener.register("zone", lambda *args: dv.set(args)) |
308 | zone = yield deferToThread(self.create_zone) |
309 | @@ -609,7 +617,7 @@ |
310 | node = yield deferToThread(self.create_node, self.params) |
311 | tag = yield deferToThread(self.create_tag) |
312 | |
313 | - listener = PostgresListener() |
314 | + listener = self.make_listener_without_delay() |
315 | dv = DeferredValue() |
316 | listener.register(self.listener, lambda *args: dv.set(args)) |
317 | yield listener.start() |
318 | @@ -627,7 +635,7 @@ |
319 | node = yield deferToThread(self.create_node, self.params) |
320 | tag = yield deferToThread(self.create_tag) |
321 | |
322 | - listener = PostgresListener() |
323 | + listener = self.make_listener_without_delay() |
324 | dv = DeferredValue() |
325 | listener.register(self.listener, lambda *args: dv.set(args)) |
326 | yield listener.start() |
327 | @@ -646,7 +654,7 @@ |
328 | tag = yield deferToThread(self.create_tag) |
329 | yield deferToThread(self.add_node_to_tag, node, tag) |
330 | |
331 | - listener = PostgresListener() |
332 | + listener = self.make_listener_without_delay() |
333 | dv = DeferredValue() |
334 | listener.register(self.listener, lambda *args: dv.set(args)) |
335 | yield listener.start() |
336 | @@ -667,7 +675,7 @@ |
337 | @inlineCallbacks |
338 | def test__calls_handler_on_create_notification(self): |
339 | yield deferToThread(register_all_triggers) |
340 | - listener = PostgresListener() |
341 | + listener = self.make_listener_without_delay() |
342 | dv = DeferredValue() |
343 | listener.register("user", lambda *args: dv.set(args)) |
344 | yield listener.start() |
345 | @@ -682,7 +690,7 @@ |
346 | @inlineCallbacks |
347 | def test__calls_handler_on_update_notification(self): |
348 | yield deferToThread(register_all_triggers) |
349 | - listener = PostgresListener() |
350 | + listener = self.make_listener_without_delay() |
351 | dv = DeferredValue() |
352 | listener.register("user", lambda *args: dv.set(args)) |
353 | user = yield deferToThread(self.create_user) |
354 | @@ -702,7 +710,7 @@ |
355 | @inlineCallbacks |
356 | def test__calls_handler_on_delete_notification(self): |
357 | yield deferToThread(register_all_triggers) |
358 | - listener = PostgresListener() |
359 | + listener = self.make_listener_without_delay() |
360 | dv = DeferredValue() |
361 | listener.register("user", lambda *args: dv.set(args)) |
362 | user = yield deferToThread(self.create_user) |
363 | @@ -723,7 +731,7 @@ |
364 | @inlineCallbacks |
365 | def test__calls_handler_on_create_notification(self): |
366 | yield deferToThread(register_all_triggers) |
367 | - listener = PostgresListener() |
368 | + listener = self.make_listener_without_delay() |
369 | dv = DeferredValue() |
370 | listener.register("event", lambda *args: dv.set(args)) |
371 | yield listener.start() |
372 | @@ -738,7 +746,7 @@ |
373 | @inlineCallbacks |
374 | def test__calls_handler_on_update_notification(self): |
375 | yield deferToThread(register_all_triggers) |
376 | - listener = PostgresListener() |
377 | + listener = self.make_listener_without_delay() |
378 | dv = DeferredValue() |
379 | listener.register("event", lambda *args: dv.set(args)) |
380 | event = yield deferToThread(self.create_event) |
381 | @@ -758,7 +766,7 @@ |
382 | @inlineCallbacks |
383 | def test__calls_handler_on_delete_notification(self): |
384 | yield deferToThread(register_all_triggers) |
385 | - listener = PostgresListener() |
386 | + listener = self.make_listener_without_delay() |
387 | dv = DeferredValue() |
388 | listener.register("event", lambda *args: dv.set(args)) |
389 | event = yield deferToThread(self.create_event) |
390 | @@ -793,7 +801,7 @@ |
391 | yield deferToThread(register_all_triggers) |
392 | node = yield deferToThread(self.create_node, self.params) |
393 | |
394 | - listener = PostgresListener() |
395 | + listener = self.make_listener_without_delay() |
396 | dv = DeferredValue() |
397 | listener.register(self.listener, lambda *args: dv.set(args)) |
398 | yield listener.start() |
It looks good, and it's a cool idea.
Twisted has a couple of things that'll make the implementation much easier: LoopingCall and coiterate(). I've made some changes in another branch, lp:~allenap/maas/coordinate-notifies, which you might want to merge into this branch.
I also think you need another test, to explicitly test this new behaviour.