Merge lp:~blake-rouse/maas/cordinate-notifies into lp:~maas-committers/maas/trunk

Proposed by Blake Rouse
Status: Merged
Approved by: Blake Rouse
Approved revision: no longer in the source branch.
Merged at revision: 3850
Proposed branch: lp:~blake-rouse/maas/cordinate-notifies
Merge into: lp:~maas-committers/maas/trunk
Diff against target: 398 lines (+99/-60)
2 files modified
src/maasserver/websockets/listener.py (+54/-23)
src/maasserver/websockets/tests/test_listener.py (+45/-37)
To merge this branch: bzr merge lp:~blake-rouse/maas/cordinate-notifies
Reviewer Review Type Date Requested Status
Gavin Panella (community) Approve
Review via email: mp+257634@code.launchpad.net

Commit message

Coordinate the notify messages from postgres into a set, handling the notification messages in the set instead of as they come in from postgres.

This helps remove duplicate notifications, as some notifications can happen multiple times in quick succession while the database is being updated. It also helps coordinate the notifications so that only one is processed at a time, instead of hammering the database by processing all notifications in parallel (or until the threadpool is maxed out).

To post a comment you must log in.
Revision history for this message
Gavin Panella (allenap) wrote :

It looks good, and it's a cool idea.

Twisted has a couple of things that'll make the implementation much easier: LoopingCall and coiterate(). I've made some changes in another branch, lp:~allenap/maas/coordinate-notifies, which you might want to merge into this branch.

I also think you need another test, to explicitly test this new behaviour.

Revision history for this message
Blake Rouse (blake-rouse) wrote :

I merged your branch; can you give this a final blessing?

Revision history for this message
Gavin Panella (allenap) wrote :

Niiice.

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'src/maasserver/websockets/listener.py'
2--- src/maasserver/websockets/listener.py 2015-03-27 21:21:18 +0000
3+++ src/maasserver/websockets/listener.py 2015-04-28 14:27:40 +0000
4@@ -64,11 +64,18 @@
5
6 implements(interfaces.IReadDescriptor)
7
8+ # Seconds to wait to handle new notifications. When the notifications set
9+ # is empty it will wait this amount of time to check again for new
10+ # notifications.
11+ HANDLE_NOTIFY_DELAY = 0.5
12+
13 def __init__(self, alias="default"):
14 self.alias = alias
15 self.listeners = defaultdict(list)
16 self.autoReconnect = False
17 self.connection = None
18+ self.notifications = set()
19+ self.notifier = task.LoopingCall(self.handleNotifies)
20
21 def start(self):
22 """Start the listener."""
23@@ -80,6 +87,7 @@
24 self.autoReconnect = False
25 if self.connected():
26 reactor.removeReader(self)
27+ self.cancelHandleNotify()
28 return deferToThread(self.stopConnection)
29 else:
30 return defer.succeed(None)
31@@ -119,14 +127,18 @@
32 # contains no pgcode or pgerror to identify the reason, so best
33 # assumtion is that the connection has been lost.
34 reactor.removeReader(self)
35+ self.cancelHandleNotify()
36 self.connectionLost(failure.Failure(error.ConnectionClosed()))
37 else:
38- # Process all of the notify messages inside of a Cooperator so
39- # each notification will be handled in order.
40+ # Add each notify to to the notifications set. This helps
41+ # removes duplicate notifications as one entity in the database
42+ # can send multiple notifies as it can be updated quickly.
43+ # Accumulate the notifications and the listener passes them on to
44+ # be handled in batches.
45 notifies = self.connection.connection.notifies
46 if len(notifies) != 0:
47- # Pass a *copy* of the received notifies list.
48- task.cooperate(self.handleNotifies(notifies[:]))
49+ for notify in notifies:
50+ self.notifications.add((notify.channel, notify.payload))
51 # Delete the contents of the connection's notifies list so
52 # that we don't process them a second time.
53 del notifies[:]
54@@ -184,6 +196,8 @@
55 d.addCallback(lambda _: deferToThread(self.registerChannels))
56 d.addCallback(lambda _: reactor.addReader(self))
57 d.addCallback(
58+ lambda _: self.runHandleNotify(delay=self.HANDLE_NOTIFY_DELAY))
59+ d.addCallback(
60 lambda _: self.logMsg("Listening for notificaton from database."))
61 d.addErrback(failureToConnect)
62 return d
63@@ -213,22 +227,39 @@
64 "%s action is not supported." % action)
65 return channel, action
66
67- def handleNotifies(self, notifies):
68- """Process each notify message yeilding to the returned defer.
69-
70- This method should be called from the global Cooperator so each notify
71- message is handled in order.
72- """
73- for notify in notifies:
74- try:
75- channel, action = self.convertChannel(notify.channel)
76- except PostgresListenerNotifyError as e:
77- # Log the error and continue processing the remaining
78- # notifications.
79- self.logErr(e)
80- else:
81- handlers = self.listeners[channel]
82- for handler in handlers:
83- yield defer.maybeDeferred(
84- handler, action, notify.payload).addErrback(
85- self.logErr)
86+ def runHandleNotify(self, delay=0, clock=reactor):
87+ """Defer later the `handleNotify`."""
88+ if not self.notifier.running:
89+ self.notifier.start(delay, now=False)
90+
91+ def cancelHandleNotify(self):
92+ """Cancel the deferred `handleNotify` call."""
93+ if self.notifier.running:
94+ self.notifier.stop()
95+
96+ def handleNotifies(self, clock=reactor):
97+ """Process all notify message in the notifications set."""
98+ def gen_notifications(notifications):
99+ while len(notifications) != 0:
100+ yield notifications.pop()
101+ return task.coiterate(
102+ self.handleNotify(notification, clock=clock)
103+ for notification in gen_notifications(self.notifications))
104+
105+ def handleNotify(self, notification, clock=reactor):
106+ """Process a notify message in the notifications set."""
107+ channel, payload = notification
108+ try:
109+ channel, action = self.convertChannel(channel)
110+ except PostgresListenerNotifyError as e:
111+ # Log the error and continue processing the remaining
112+ # notifications.
113+ self.logErr(e)
114+ else:
115+ defers = []
116+ handlers = self.listeners[channel]
117+ for handler in handlers:
118+ d = defer.maybeDeferred(handler, action, payload)
119+ d.addErrback(self.logErr)
120+ defers.append(d)
121+ return defer.DeferredList(defers)
122
123=== modified file 'src/maasserver/websockets/tests/test_listener.py'
124--- src/maasserver/websockets/tests/test_listener.py 2015-03-31 18:30:56 +0000
125+++ src/maasserver/websockets/tests/test_listener.py 2015-04-28 14:27:40 +0000
126@@ -14,6 +14,8 @@
127 __metaclass__ = type
128 __all__ = []
129
130+from collections import namedtuple
131+
132 from crochet import wait_for_reactor
133 from django.contrib.auth.models import User
134 from django.db import connection
135@@ -41,12 +43,14 @@
136 )
137 from provisioningserver.utils.twisted import DeferredValue
138 from psycopg2 import OperationalError
139-from testtools.matchers import HasLength
140 from twisted.internet import reactor
141 from twisted.internet.defer import inlineCallbacks
142 from twisted.internet.threads import deferToThread
143
144
145+FakeNotify = namedtuple("FakeNotify", ["channel", "payload"])
146+
147+
148 class TestPostgresListener(MAASServerTestCase):
149
150 @transactional
151@@ -185,31 +189,35 @@
152 listener.connectionLost,
153 MockCalledOnceWith(ANY))
154
155- def test__doRead_copies_and_clears_notifies_list(self):
156+ def test__doRead_adds_notifies_to_notifications(self):
157 listener = PostgresListener()
158+ notifications = [
159+ FakeNotify(
160+ channel=factory.make_name("channel_action"),
161+ payload=factory.make_name("payload"))
162+ for _ in range(3)
163+ ]
164
165 connection = self.patch(listener, "connection")
166 connection.connection.poll.return_value = None
167- # The new notifies received from the database.
168- connection.connection.notifies = [sentinel.notify]
169-
170- handleNotifies = self.patch(listener, "handleNotifies")
171- # The returned value from handleNotifies() is used as input into the
172- # Twisted cooperator. Here we ensure that it has nothing to do.
173- handleNotifies.return_value = iter(())
174+ # Add the notifications twice, so it can test that duplicates are
175+ # accumulated together.
176+ connection.connection.notifies = notifications + notifications
177+ self.patch(listener, "handleNotify")
178
179 listener.doRead()
180- self.assertThat(
181- handleNotifies,
182- MockCalledOnceWith([sentinel.notify]))
183- self.assertThat(
184- connection.connection.notifies,
185- HasLength(0))
186+ self.assertItemsEqual(
187+ listener.notifications, set(notifications))
188
189
190 class TransactionalHelpersMixin:
191 """Helpers performing actions in transactions."""
192
193+ def make_listener_without_delay(self):
194+ listener = PostgresListener()
195+ self.patch(listener, "HANDLE_NOTIFY_DELAY", 0)
196+ return listener
197+
198 @transactional
199 def create_node(self, params=None):
200 if params is None:
201@@ -360,7 +368,7 @@
202 @inlineCallbacks
203 def test__calls_handler_on_create_notification(self):
204 yield deferToThread(register_all_triggers)
205- listener = PostgresListener()
206+ listener = self.make_listener_without_delay()
207 dv = DeferredValue()
208 listener.register(self.listener, lambda *args: dv.set(args))
209 yield listener.start()
210@@ -375,7 +383,7 @@
211 @inlineCallbacks
212 def test__calls_handler_on_update_notification(self):
213 yield deferToThread(register_all_triggers)
214- listener = PostgresListener()
215+ listener = self.make_listener_without_delay()
216 dv = DeferredValue()
217 listener.register(self.listener, lambda *args: dv.set(args))
218 node = yield deferToThread(self.create_node, self.params)
219@@ -395,7 +403,7 @@
220 @inlineCallbacks
221 def test__calls_handler_on_delete_notification(self):
222 yield deferToThread(register_all_triggers)
223- listener = PostgresListener()
224+ listener = self.make_listener_without_delay()
225 dv = DeferredValue()
226 listener.register(self.listener, lambda *args: dv.set(args))
227 node = yield deferToThread(self.create_node, self.params)
228@@ -417,7 +425,7 @@
229 @inlineCallbacks
230 def test__calls_handler_on_create_notification(self):
231 yield deferToThread(register_all_triggers)
232- listener = PostgresListener()
233+ listener = self.make_listener_without_delay()
234 dv = DeferredValue()
235 listener.register("nodegroup", lambda *args: dv.set(args))
236 yield listener.start()
237@@ -432,7 +440,7 @@
238 @inlineCallbacks
239 def test__calls_handler_on_update_notification(self):
240 yield deferToThread(register_all_triggers)
241- listener = PostgresListener()
242+ listener = self.make_listener_without_delay()
243 dv = DeferredValue()
244 listener.register("nodegroup", lambda *args: dv.set(args))
245 nodegroup = yield deferToThread(self.create_nodegroup)
246@@ -452,7 +460,7 @@
247 @inlineCallbacks
248 def test__calls_handler_on_delete_notification(self):
249 yield deferToThread(register_all_triggers)
250- listener = PostgresListener()
251+ listener = self.make_listener_without_delay()
252 dv = DeferredValue()
253 listener.register("nodegroup", lambda *args: dv.set(args))
254 nodegroup = yield deferToThread(self.create_nodegroup)
255@@ -476,7 +484,7 @@
256 yield deferToThread(register_all_triggers)
257 nodegroup = yield deferToThread(self.create_nodegroup)
258
259- listener = PostgresListener()
260+ listener = self.make_listener_without_delay()
261 dv = DeferredValue()
262 listener.register("nodegroup", lambda *args: dv.set(args))
263 yield listener.start()
264@@ -496,7 +504,7 @@
265 interface = yield deferToThread(
266 self.create_nodegroupinterface, nodegroup)
267
268- listener = PostgresListener()
269+ listener = self.make_listener_without_delay()
270 dv = DeferredValue()
271 listener.register("nodegroup", lambda *args: dv.set(args))
272 yield listener.start()
273@@ -518,7 +526,7 @@
274 interface = yield deferToThread(
275 self.create_nodegroupinterface, nodegroup)
276
277- listener = PostgresListener()
278+ listener = self.make_listener_without_delay()
279 dv = DeferredValue()
280 listener.register("nodegroup", lambda *args: dv.set(args))
281 yield listener.start()
282@@ -538,7 +546,7 @@
283 @inlineCallbacks
284 def test__calls_handler_on_create_notification(self):
285 yield deferToThread(register_all_triggers)
286- listener = PostgresListener()
287+ listener = self.make_listener_without_delay()
288 dv = DeferredValue()
289 listener.register("zone", lambda *args: dv.set(args))
290 yield listener.start()
291@@ -553,7 +561,7 @@
292 @inlineCallbacks
293 def test__calls_handler_on_update_notification(self):
294 yield deferToThread(register_all_triggers)
295- listener = PostgresListener()
296+ listener = self.make_listener_without_delay()
297 dv = DeferredValue()
298 listener.register("zone", lambda *args: dv.set(args))
299 zone = yield deferToThread(self.create_zone)
300@@ -573,7 +581,7 @@
301 @inlineCallbacks
302 def test__calls_handler_on_delete_notification(self):
303 yield deferToThread(register_all_triggers)
304- listener = PostgresListener()
305+ listener = self.make_listener_without_delay()
306 dv = DeferredValue()
307 listener.register("zone", lambda *args: dv.set(args))
308 zone = yield deferToThread(self.create_zone)
309@@ -609,7 +617,7 @@
310 node = yield deferToThread(self.create_node, self.params)
311 tag = yield deferToThread(self.create_tag)
312
313- listener = PostgresListener()
314+ listener = self.make_listener_without_delay()
315 dv = DeferredValue()
316 listener.register(self.listener, lambda *args: dv.set(args))
317 yield listener.start()
318@@ -627,7 +635,7 @@
319 node = yield deferToThread(self.create_node, self.params)
320 tag = yield deferToThread(self.create_tag)
321
322- listener = PostgresListener()
323+ listener = self.make_listener_without_delay()
324 dv = DeferredValue()
325 listener.register(self.listener, lambda *args: dv.set(args))
326 yield listener.start()
327@@ -646,7 +654,7 @@
328 tag = yield deferToThread(self.create_tag)
329 yield deferToThread(self.add_node_to_tag, node, tag)
330
331- listener = PostgresListener()
332+ listener = self.make_listener_without_delay()
333 dv = DeferredValue()
334 listener.register(self.listener, lambda *args: dv.set(args))
335 yield listener.start()
336@@ -667,7 +675,7 @@
337 @inlineCallbacks
338 def test__calls_handler_on_create_notification(self):
339 yield deferToThread(register_all_triggers)
340- listener = PostgresListener()
341+ listener = self.make_listener_without_delay()
342 dv = DeferredValue()
343 listener.register("user", lambda *args: dv.set(args))
344 yield listener.start()
345@@ -682,7 +690,7 @@
346 @inlineCallbacks
347 def test__calls_handler_on_update_notification(self):
348 yield deferToThread(register_all_triggers)
349- listener = PostgresListener()
350+ listener = self.make_listener_without_delay()
351 dv = DeferredValue()
352 listener.register("user", lambda *args: dv.set(args))
353 user = yield deferToThread(self.create_user)
354@@ -702,7 +710,7 @@
355 @inlineCallbacks
356 def test__calls_handler_on_delete_notification(self):
357 yield deferToThread(register_all_triggers)
358- listener = PostgresListener()
359+ listener = self.make_listener_without_delay()
360 dv = DeferredValue()
361 listener.register("user", lambda *args: dv.set(args))
362 user = yield deferToThread(self.create_user)
363@@ -723,7 +731,7 @@
364 @inlineCallbacks
365 def test__calls_handler_on_create_notification(self):
366 yield deferToThread(register_all_triggers)
367- listener = PostgresListener()
368+ listener = self.make_listener_without_delay()
369 dv = DeferredValue()
370 listener.register("event", lambda *args: dv.set(args))
371 yield listener.start()
372@@ -738,7 +746,7 @@
373 @inlineCallbacks
374 def test__calls_handler_on_update_notification(self):
375 yield deferToThread(register_all_triggers)
376- listener = PostgresListener()
377+ listener = self.make_listener_without_delay()
378 dv = DeferredValue()
379 listener.register("event", lambda *args: dv.set(args))
380 event = yield deferToThread(self.create_event)
381@@ -758,7 +766,7 @@
382 @inlineCallbacks
383 def test__calls_handler_on_delete_notification(self):
384 yield deferToThread(register_all_triggers)
385- listener = PostgresListener()
386+ listener = self.make_listener_without_delay()
387 dv = DeferredValue()
388 listener.register("event", lambda *args: dv.set(args))
389 event = yield deferToThread(self.create_event)
390@@ -793,7 +801,7 @@
391 yield deferToThread(register_all_triggers)
392 node = yield deferToThread(self.create_node, self.params)
393
394- listener = PostgresListener()
395+ listener = self.make_listener_without_delay()
396 dv = DeferredValue()
397 listener.register(self.listener, lambda *args: dv.set(args))
398 yield listener.start()