Merge lp:~fwereade/pyjuju/watch-related-units-callbacks into lp:pyjuju

Proposed by William Reade
Status: Superseded
Proposed branch: lp:~fwereade/pyjuju/watch-related-units-callbacks
Merge into: lp:pyjuju
Prerequisite: lp:~fwereade/pyjuju/resolve-unit-relation-diffs
Diff against target: 1341 lines (+310/-491)
5 files modified
juju/hooks/scheduler.py (+13/-27)
juju/hooks/tests/test_scheduler.py (+31/-39)
juju/state/relation.py (+9/-8)
juju/state/tests/test_relation.py (+255/-416)
juju/unit/lifecycle.py (+2/-1)
To merge this branch: bzr merge lp:~fwereade/pyjuju/watch-related-units-callbacks
Reviewer Review Type Date Requested Status
Juju Engineering Pending
Review via email: mp+84423@code.launchpad.net

This proposal has been superseded by a proposal from 2011-12-12.

Description of the change

Lack of separate callbacks for membership changes and settings changes made it harder for me to follow the code, and especially the tests. Should now be more comprehensible. Both callbacks still need to be synchronous, but this doesn't represent a real change IMO; it's just something to remain aware of.

To post a comment you must log in.
Revision history for this message
William Reade (fwereade) wrote :

rejecting due to abandonment of prereq

485. By William Reade

merge parent

486. By William Reade

merge parent

487. By William Reade

merge parent

488. By William Reade

merge parent

489. By William Reade

merge parent

490. By William Reade

rename foo_callback to cb_foo

491. By William Reade

poke_zk changes, as requested (since these are just checking for callbacks firing, I think they're suitable)

492. By William Reade

merge parent

493. By William Reade

merge parent

494. By William Reade

merge parent

495. By William Reade

merge parent

496. By William Reade

merge parent

497. By William Reade

merge parent

498. By William Reade

merge parent

499. By William Reade

merge parent

500. By William Reade

merge parent

501. By William Reade

merge parent

Unmerged revisions

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'juju/hooks/scheduler.py'
2--- juju/hooks/scheduler.py 2011-12-05 02:11:54 +0000
3+++ juju/hooks/scheduler.py 2011-12-05 02:11:55 +0000
4@@ -137,31 +137,11 @@
5 # occurs.
6 self._run_queue.put(None)
7
8- def notify_change(self, old_units=(), new_units=(), modified=()):
9- """Receive changes regarding related units and schedule hook execution.
10- """
11- log.debug("relation change old:%s, new:%s, modified:%s",
12- old_units, new_units, modified)
13-
14+ def members_changed(self, old_units, new_units):
15+ log.debug("members changed: old=%s, new=%s", old_units, new_units)
16+ scheduled = 0
17 self._clock_sequence += 1
18
19- # UnitRelationState.watch_related_units will EITHER call back with
20- # old_units and new_units, OR with modified; never both.
21- # This should in fact probably be two distinct callbacks...
22- if old_units or new_units:
23- assert not modified, "got mixed settings and membership changes"
24- scheduled = self._notify_membership_change(old_units, new_units)
25- else:
26- assert modified, "got no changes"
27- scheduled = self._notify_settings_change(modified)
28-
29- if scheduled:
30- self._run_queue.put(self._clock_sequence)
31- self._save_state()
32-
33- def _notify_membership_change(self, old_units, new_units):
34- scheduled = 0
35-
36 if self._context_members is None:
37 self._context_members = list(old_units)
38
39@@ -184,16 +164,22 @@
40 scheduled += self._queue_change(
41 unit_name, REMOVED, self._clock_sequence)
42
43- return scheduled
44+ if scheduled:
45+ self._run_queue.put(self._clock_sequence)
46+ self._save_state()
47
48- def _notify_settings_change(self, modified):
49+ def settings_changed(self, unit_versions):
50+ log.debug("settings changed: %s", unit_versions)
51 scheduled = 0
52- for (unit_name, version) in modified:
53+ self._clock_sequence += 1
54+ for (unit_name, version) in unit_versions:
55 if version > self._latest_members.get(unit_name, 0):
56 self._latest_members[unit_name] = version
57 scheduled += self._queue_change(
58 unit_name, MODIFIED, self._clock_sequence)
59- return scheduled
60+ if scheduled:
61+ self._run_queue.put(self._clock_sequence)
62+ self._save_state()
63
64 def get_hook_context(self, change):
65 """
66
67=== modified file 'juju/hooks/tests/test_scheduler.py'
68--- juju/hooks/tests/test_scheduler.py 2011-12-05 02:11:54 +0000
69+++ juju/hooks/tests/test_scheduler.py 2011-12-05 02:11:55 +0000
70@@ -56,14 +56,14 @@
71 results in a modify event.
72 """
73 self.write_single_unit_state()
74- self.scheduler.notify_change(old_units=["u-1"], new_units=[])
75- self.scheduler.notify_change(old_units=[], new_units=["u-1"])
76+ self.scheduler.members_changed(["u-1"], [])
77+ self.scheduler.members_changed([], ["u-1"])
78 self.scheduler.run()
79 self.assertEqual(len(self.executions), 1)
80 self.assertEqual(self.executions[0][1].change_type, "modified")
81
82- output = ("relation change old:['u-1'], new:[], modified:()",
83- "relation change old:[], new:['u-1'], modified:()",
84+ output = ("members changed: old=['u-1'], new=[]",
85+ "members changed: old=[], new=['u-1']",
86 "start",
87 "executing hook for u-1:modified\n")
88 self.assertEqual(self.log_stream.getvalue(), "\n".join(output))
89@@ -73,33 +73,33 @@
90 An extra validation of the previous test.
91 """
92 self.write_single_unit_state()
93- self.scheduler.notify_change(modified=[("u-1", 1)])
94- self.scheduler.notify_change(old_units=["u-1"], new_units=[])
95- self.scheduler.notify_change(old_units=[], new_units=["u-1"])
96+ self.scheduler.settings_changed([("u-1", 1)])
97+ self.scheduler.members_changed(["u-1"], [])
98+ self.scheduler.members_changed([], ["u-1"])
99 self.scheduler.run()
100 self.assertEqual(len(self.executions), 1)
101 self.assertEqual(self.executions[0][1].change_type, "modified")
102
103 def test_reduce_add_modify(self):
104 """An add and modify event for a node are coalesced to an add."""
105- self.scheduler.notify_change(old_units=[], new_units=["u-1"])
106- self.scheduler.notify_change(modified=[("u-1", 1)])
107+ self.scheduler.members_changed([], ["u-1"])
108+ self.scheduler.settings_changed([("u-1", 1)])
109 self.scheduler.run()
110 self.assertEqual(len(self.executions), 1)
111 self.assertEqual(self.executions[0][1].change_type, "joined")
112
113 def test_reduce_add_remove(self):
114 """an add followed by a removal results in a noop."""
115- self.scheduler.notify_change(old_units=[], new_units=["u-1"])
116- self.scheduler.notify_change(old_units=["u-1"], new_units=[])
117+ self.scheduler.members_changed([], ["u-1"])
118+ self.scheduler.members_changed(["u-1"], [])
119 self.scheduler.run()
120 self.assertEqual(len(self.executions), 0)
121
122 def test_reduce_modify_remove(self):
123 """Modifying and then removing a node, results in just the removal."""
124 self.write_single_unit_state()
125- self.scheduler.notify_change(modified=[("u-1", 1)])
126- self.scheduler.notify_change(old_units=["u-1"], new_units=[])
127+ self.scheduler.settings_changed([("u-1", 1)])
128+ self.scheduler.members_changed(["u-1"], [])
129 self.scheduler.run()
130 self.assertEqual(len(self.executions), 1)
131 self.assertEqual(self.executions[0][1].change_type, "departed")
132@@ -108,15 +108,15 @@
133 """Multiple modifies get coalesced to a single modify."""
134 # simulate normal startup, the first notify will always be the existing
135 # membership set.
136- self.scheduler.notify_change(old_units=[], new_units=["u-1"])
137+ self.scheduler.members_changed([], ["u-1"])
138 self.scheduler.run()
139 self.scheduler.stop()
140 self.assertEqual(len(self.executions), 1)
141
142 # Now continue the modify/modify reduction.
143- self.scheduler.notify_change(modified=[("u-1", 1)])
144- self.scheduler.notify_change(modified=[("u-1", 2)])
145- self.scheduler.notify_change(modified=[("u-1", 3)])
146+ self.scheduler.settings_changed([("u-1", 1)])
147+ self.scheduler.settings_changed([("u-1", 2)])
148+ self.scheduler.settings_changed([("u-1", 3)])
149 self.scheduler.run()
150
151 self.assertEqual(len(self.executions), 2)
152@@ -143,11 +143,9 @@
153 always see the membership of a relation as it was at the
154 time of their associated change.
155 """
156- self.scheduler.notify_change(
157- old_units=[], new_units=["u-1", "u-2"])
158- self.scheduler.notify_change(
159- old_units=["u-1", "u-2"], new_units=["u-2", "u-3"])
160- self.scheduler.notify_change(modified=[("u-2", 1)])
161+ self.scheduler.members_changed([], ["u-1", "u-2"])
162+ self.scheduler.members_changed(["u-1", "u-2"], ["u-2", "u-3"])
163+ self.scheduler.settings_changed([("u-2", 1)])
164
165 self.scheduler.run()
166 self.scheduler.stop()
167@@ -161,9 +159,8 @@
168 change_members = yield self.executions[0][0].get_members()
169 self.assertEqual(change_members, ["u-2"])
170
171- self.scheduler.notify_change(modified=[("u-2", 2)])
172- self.scheduler.notify_change(
173- old_units=["u-2", "u-3"], new_units=["u-2"])
174+ self.scheduler.settings_changed([("u-2", 2)])
175+ self.scheduler.members_changed(["u-2", "u-3"], ["u-2"])
176 self.scheduler.run()
177
178 self.assertEqual(len(self.executions), 4)
179@@ -187,10 +184,8 @@
180 "clock_queue": [],
181 "clock_sequence": 1}))
182
183- self.scheduler.notify_change(
184- old_units=["u-1", "u-2"],
185- new_units=["u-2", "u-3", "u-4"])
186- self.scheduler.notify_change(modified=[("u-2", 1)])
187+ self.scheduler.members_changed(["u-1", "u-2"], ["u-2", "u-3", "u-4"])
188+ self.scheduler.settings_changed([("u-2", 1)])
189
190 self.scheduler.run()
191 self.scheduler.stop()
192@@ -273,10 +268,8 @@
193 "clock_queue": [],
194 "clock_sequence": 7}))
195
196- self.scheduler.notify_change(
197- old_units=["u-1", "u-2"],
198- new_units=["u-2", "u-3"])
199- self.scheduler.notify_change(modified=[("u-2", 3)])
200+ self.scheduler.members_changed(["u-1", "u-2"], ["u-2", "u-3"])
201+ self.scheduler.settings_changed([("u-2", 3)])
202
203 # Add a stop instruction to the queue, which should *not* be saved.
204 self.scheduler.stop()
205@@ -358,7 +351,7 @@
206 version for that unit will be ignored.
207 """
208 self.write_single_unit_state()
209- self.scheduler.notify_change(modified=(("u-1", 0),))
210+ self.scheduler.settings_changed([("u-1", 0),])
211 self.scheduler.run()
212 self.assertEquals(len(self.executions), 0)
213
214@@ -367,8 +360,8 @@
215 When a unit is added, we assume its settings version to be 0, and
216 therefore modified events with version 0 will be ignored.
217 """
218- self.scheduler.notify_change(old_units=[], new_units=["u-1"])
219- self.scheduler.notify_change(modified=(("u-1", 0),))
220+ self.scheduler.members_changed([], ["u-1"])
221+ self.scheduler.settings_changed([("u-1", 0),])
222 self.scheduler.run()
223 self.assertEquals(len(self.executions), 1)
224 self.assertEqual(self.executions[0][1].change_type, "joined")
225@@ -387,12 +380,11 @@
226 "clock_queue": [],
227 "clock_sequence": 4}))
228
229- self.scheduler.notify_change(
230- old_units=["u-2"], new_units=["u-3", "u-4"])
231+ self.scheduler.members_changed(["u-2"], ["u-3", "u-4"])
232 self.scheduler.run()
233
234 output = (
235- "relation change old:['u-2'], new:['u-3', 'u-4'], modified:()",
236+ "members changed: old=['u-2'], new=['u-3', 'u-4']",
237 "old does not match last recorded units: ['u-1', 'u-2']",
238 "start",
239 "executing hook for u-3:joined",
240
241=== modified file 'juju/state/relation.py'
242--- juju/state/relation.py 2011-12-05 02:11:54 +0000
243+++ juju/state/relation.py 2011-12-05 02:11:55 +0000
244@@ -408,7 +408,7 @@
245 returnValue(endpoint_container)
246
247 @inlineCallbacks
248- def watch_related_units(self, callback):
249+ def watch_related_units(self, members_callback, settings_callback):
250 """Register a callback to be invoked when related units change.
251
252 @param: callback a function that gets invoked when related
253@@ -443,7 +443,8 @@
254 watcher_factory = PeerUnitWatcher
255
256 watcher = watcher_factory(
257- self._client, self, endpoint_container, callback)
258+ self._client, self, endpoint_container,
259+ members_callback, settings_callback)
260 returnValue(watcher)
261
262
263@@ -474,12 +475,14 @@
264 client,
265 watcher_unit,
266 unit_container_path,
267- callback):
268+ members_callback,
269+ settings_callback):
270 super(RelationUnitWatcherBase, self).__init__(client)
271 self._units = []
272 self._watcher_unit = watcher_unit
273 self._container_path = unit_container_path
274- self._callback = callback
275+ self._members_callback = members_callback
276+ self._settings_callback = settings_callback
277 self._stopped = None
278 self._unit_name_map = None
279 self._log = logging.getLogger("unit.relation.watch")
280@@ -606,9 +609,7 @@
281 # Invoke callback
282 callback_d.addCallback(
283 lambda (old_units, new_units): maybeDeferred(
284- self._callback,
285- old_units=sorted(old_units),
286- new_units=sorted(new_units)))
287+ self._members_callback, sorted(old_units), sorted(new_units)))
288
289 # Attach initial notifiers and change handlers to new nodes
290 if settings_watches:
291@@ -631,7 +632,7 @@
292 return
293 ((unit_name,),) = yield self._resolve_unit_names([unit_id])
294 node_info = (unit_name, stat["version"])
295- yield self._callback(modified=(node_info,))
296+ yield self._settings_callback((node_info,))
297
298 def _watch_settings_for_units(self, added):
299 """Setup watches on new unit relation setting nodes.
300
301=== modified file 'juju/state/tests/test_relation.py'
302--- juju/state/tests/test_relation.py 2011-12-05 02:11:54 +0000
303+++ juju/state/tests/test_relation.py 2011-12-05 02:11:55 +0000
304@@ -24,6 +24,18 @@
305 from juju.state.tests.common import StateTestBase
306
307
308+def _1_arg(f):
309+ def g(arg1):
310+ return f(arg1)
311+ return g
312+
313+
314+def _2_args(f):
315+ def g(arg1, arg2):
316+ return f(arg1, arg2)
317+ return g
318+
319+
320 class RelationTestBase(StateTestBase):
321
322 @inlineCallbacks
323@@ -716,23 +728,60 @@
324 self.service_state2)))
325
326
327+class WatchChecker(object):
328+
329+ def __init__(self, test, block_cbs=False):
330+ self.test = test
331+ self.results = []
332+ self.sentinels = [Deferred() for i in range(10)]
333+ self.blockers = [Deferred() if block_cbs else True for i in range(10)]
334+
335+ def _members_changed(self, old, new):
336+ self.results.append((old, new))
337+ return self._synchronize()
338+
339+ def _settings_changed(self, modified):
340+ (change,) = modified
341+ self.results.append(change)
342+ return self._synchronize()
343+
344+ @inlineCallbacks
345+ def _synchronize(self):
346+ index = len(self.results) - 1
347+ self.sentinels[index].callback(True)
348+ yield self.blockers[index]
349+
350+ def watch(self, unit_relation):
351+ return unit_relation.watch_related_units(
352+ self._members_changed, self._settings_changed)
353+
354+ def assert_cb_count(self, count):
355+ # *started* callbacks, including those whose return is blocked
356+ self.test.assertEquals(len(self.results), count)
357+
358+ def wait_for_cb(self, index):
359+ # wait for *call*, not return
360+ return self.sentinels[index]
361+
362+ def unblock_cb(self, index):
363+ self.blockers[index].callback(True)
364+
365+ @inlineCallbacks
366+ def assert_members_cb(self, index, old, new):
367+ yield self.wait_for_cb(index)
368+ for actual, expected in zip(self.results[index], (old, new)):
369+ for actual_name, expected_unit in zip(actual, expected):
370+ self.test.assertEquals(actual_name, expected_unit.unit_name)
371+
372+ @inlineCallbacks
373+ def assert_settings_cb(self, index, unit, version):
374+ yield self.wait_for_cb(index)
375+ change = self.results[index]
376+ self.test.assertEquals(change, (unit.unit_name, version))
377+
378+
379 class UnitRelationStateTest(RelationTestBase):
380
381- def verify_unit_watch_result(
382- self, result, old_units=None, new_units=None, modified=None):
383- old_actual, new_actual, modified_actual = result
384- for value, expected in zip(result, [old_units, new_units, modified]):
385- if expected is None:
386- self.assertFalse(value)
387- continue
388- for value_unit, expected_unit in zip(value, expected):
389- if isinstance(expected_unit, tuple):
390- self.assertEqual(
391- value_unit[0], expected_unit[0].unit_name)
392- self.assertEqual(value_unit[1], expected_unit[1])
393- else:
394- self.assertEqual(value_unit, expected_unit.unit_name)
395-
396 @inlineCallbacks
397 def test_properties(self):
398 states = yield self.add_relation_service_unit("webcache", "varnish")
399@@ -822,36 +871,29 @@
400
401 yield self.add_opposite_service_unit(wordpress_states)
402
403- results = []
404- wait_callback = [Deferred() for i in range(5)]
405-
406- def watch_related(old_units=None, new_units=None, modified=None):
407- results.append((old_units, new_units, modified))
408- wait_callback[len(results)-1].callback(True)
409-
410- def invoked(*args, **kw):
411- # sleep to make sure that things haven't fired till the watch is
412- # in place.
413- time.sleep(0.1)
414- self.assertFalse(results)
415-
416+ checker = WatchChecker(self)
417 mock_client = self.mocker.patch(self.client)
418 mock_client.get_children_and_watch("/relations/%s/server" % (
419 wordpress_states["relation"].internal_id))
420
421+ def invoked(*args, **kw):
422+ # sleep to make sure that things haven't fired till the watch is
423+ # in place.
424+ time.sleep(0.1)
425+ checker.assert_cb_count(0)
426+
427 self.mocker.call(invoked)
428 self.mocker.passthrough()
429 self.mocker.replay()
430
431- watcher = yield wordpress_states["unit_relation"].watch_related_units(
432- watch_related)
433+ watcher = yield checker.watch(wordpress_states["unit_relation"])
434 yield watcher.start()
435- yield wait_callback[0]
436+ yield checker.wait_for_cb(0)
437
438 @inlineCallbacks
439 def test_watch_start_new_service(self):
440 """Invoking watcher.start returns a deferred that only fires
441- after watch on the containr is in place. In the case of a new
442+ after watch on the container is in place. In the case of a new
443 service this after an existance watch is established on the
444 container.
445 """
446@@ -860,28 +902,23 @@
447 wordpress_states = yield self.add_relation_service_unit_from_endpoints(
448 wordpress_ep)
449
450- results = []
451-
452- def watch_related(old_units=None, new_units=None, modified=None):
453- results.append((old_units, new_units, modified))
454-
455- def invoked(*args, **kw):
456- # sleep to make sure that things haven't fired till the watch is
457- # in place.
458- time.sleep(0.1)
459- self.assertFalse(results)
460-
461+ checker = WatchChecker(self)
462 mock_client = self.mocker.patch(self.client)
463 mock_client.exists_and_watch("/relations/%s/server" % (
464 wordpress_states["relation"].internal_id))
465
466+ def invoked(*args, **kw):
467+ # sleep to make sure that things haven't fired till the watch is
468+ # in place.
469+ time.sleep(0.1)
470+ checker.assert_cb_count(0)
471+
472 self.mocker.call(invoked)
473 self.mocker.passthrough()
474 self.mocker.replay()
475
476- watcher = yield wordpress_states["unit_relation"].watch_related_units(
477- watch_related)
478- yield watcher.start().addCallback(lambda x: results.append(True))
479+ watcher = yield checker.watch(wordpress_states["unit_relation"])
480+ yield watcher.start()
481
482 @inlineCallbacks
483 def test_watch_client_server_with_new_service(self):
484@@ -896,16 +933,8 @@
485 wordpress_states = yield self.add_relation_service_unit_from_endpoints(
486 wordpress_ep, mysql_ep)
487
488- # setup watch callbacks, and start watching.
489- wait_callback = [Deferred() for i in range(5)]
490- results = []
491-
492- def watch_related(old_units=None, new_units=None, modified=None):
493- results.append((old_units, new_units, modified))
494- wait_callback[len(results) - 1].callback(True)
495-
496- watcher = yield wordpress_states["unit_relation"].watch_related_units(
497- watch_related)
498+ checker = WatchChecker(self)
499+ watcher = yield checker.watch(wordpress_states["unit_relation"])
500 yield watcher.start()
501
502 # adding another unit of wordpress, does not cause any changes
503@@ -915,13 +944,11 @@
504
505 # give chance for accidental watch firing.
506 yield self.sleep(0.1)
507-
508- # assert no firing of callback
509- self.assertFalse(results)
510+ checker.assert_cb_count(0)
511
512 # add the server service and a unit of that
513- mysql_states = yield self.add_opposite_service_unit(
514- wordpress_states)
515+ mysql_states = yield self.add_opposite_service_unit(wordpress_states)
516+ mysql_unit = mysql_states["unit"]
517
518 topology = yield self.get_topology()
519 # assert the relation is established correctly
520@@ -929,30 +956,19 @@
521 wordpress_states["relation"].internal_id)
522 self.assertEqual(len(services), 2)
523
524- # wait for callback
525- yield wait_callback[0]
526- self.verify_unit_watch_result(
527- results[0], old_units=[], new_units=[mysql_states["unit"]])
528-
529- # wait for initial settings version
530- yield wait_callback[1]
531- self.verify_unit_watch_result(
532- results[1], modified=((mysql_states["unit"], 0),))
533-
534- # modify the unit and get another callback
535+ # wait for initial callbacks
536+ yield checker.assert_members_cb(0, [], [mysql_unit])
537+ yield checker.assert_settings_cb(1, mysql_unit, 0)
538+
539+ # modify unit, wait for callback
540 yield mysql_states["unit_relation"].set_data(dict(hello="world"))
541- yield wait_callback[2]
542- self.verify_unit_watch_result(
543- results[2], modified=((mysql_states["unit"], 1),))
544+ yield checker.assert_settings_cb(2, mysql_unit, 1)
545
546- # add another unit of mysql
547+ # add another unit, wait for callback
548 mysql_unit2 = yield mysql_states["service"].add_unit_state()
549 yield mysql_states["service_relation"].add_unit_state(mysql_unit2)
550- yield wait_callback[3]
551- self.verify_unit_watch_result(
552- results[3],
553- old_units=[mysql_states["unit"]],
554- new_units=[mysql_states["unit"], mysql_unit2])
555+ yield checker.assert_members_cb(
556+ 3, [mysql_unit], [mysql_unit, mysql_unit2])
557
558 @inlineCallbacks
559 def test_watch_client_server_with_existing_service(self):
560@@ -969,34 +985,15 @@
561 wordpress_ep, mysql_ep)
562 mysql_states = yield self.add_opposite_service_unit(
563 wordpress_states)
564-
565- # setup callbacks and start observing.
566- wait_callback = [Deferred() for i in range(5)]
567- results = []
568-
569- def watch_related(old_units=None, new_units=None, modified=None):
570- results.append((old_units, new_units, modified))
571- wait_callback[len(results)-1].callback(True)
572-
573- watcher = yield wordpress_states["unit_relation"].watch_related_units(
574- watch_related)
575+ mysql_unit = mysql_states["unit"]
576+
577+ checker = WatchChecker(self)
578+ watcher = yield checker.watch(wordpress_states["unit_relation"])
579 yield watcher.start()
580-
581- # wait for initial callback
582- yield wait_callback[0]
583- self.verify_unit_watch_result(
584- results[0], old_units=[], new_units=[mysql_states["unit"]])
585-
586- # and initial settings version notification
587- yield wait_callback[1]
588- self.verify_unit_watch_result(
589- results[1], modified=((mysql_states["unit"], 0),))
590-
591- # modify the unit and get another callback, verify the result.
592+ yield checker.assert_members_cb(0, [], [mysql_unit])
593+ yield checker.assert_settings_cb(1, mysql_unit, 0)
594 yield mysql_states["unit_relation"].set_data(dict(hello="world"))
595- yield wait_callback[2]
596- self.verify_unit_watch_result(
597- results[2], modified=((mysql_states["unit"], 1),))
598+ yield checker.assert_settings_cb(2, mysql_unit, 1)
599
600 # directly delete the presence node to trigger a deletion notification
601 self.client.delete("/relations/%s/server/%s" % (
602@@ -1004,9 +1001,7 @@
603 mysql_states["unit"].internal_id))
604
605 # verify the deletion result.
606- yield wait_callback[3]
607- self.verify_unit_watch_result(
608- results[3], old_units=[mysql_states["unit"]], new_units=[])
609+ yield checker.assert_members_cb(3, [mysql_unit], [])
610
611 @inlineCallbacks
612 def test_watch_server_client_with_new_service(self):
613@@ -1020,32 +1015,18 @@
614 mysql_states = yield self.add_relation_service_unit_from_endpoints(
615 mysql_ep, wordpress_ep)
616
617- # setup callbacks and start observing.
618- wait_callback = [Deferred() for i in range(5)]
619- results = []
620-
621- def watch_related(old_units=None, new_units=None, modified=None):
622- results.append((old_units, new_units, modified))
623- wait_callback[len(results)-1].callback(True)
624-
625- watcher = yield mysql_states["unit_relation"].watch_related_units(
626- watch_related)
627+ checker = WatchChecker(self)
628+ watcher = yield checker.watch(mysql_states["unit_relation"])
629 yield watcher.start()
630-
631- # assert no firing of callback
632- self.assertFalse(results)
633+ yield self.sleep(0.1)
634+ checker.assert_cb_count(0)
635
636 # add the client service and a unit of that
637 wordpress_states = yield self.add_opposite_service_unit(
638 mysql_states)
639-
640- yield wait_callback[0]
641- self.verify_unit_watch_result(
642- results[0], old_units=[], new_units=[wordpress_states["unit"]])
643-
644- yield wait_callback[1]
645- self.verify_unit_watch_result(
646- results[1], modified=((wordpress_states["unit"], 0),))
647+ wordpress_unit = wordpress_states["unit"]
648+ yield checker.assert_members_cb(0, [], [wordpress_unit])
649+ yield checker.assert_settings_cb(1, wordpress_unit, 0)
650
651 @inlineCallbacks
652 def test_watch_peer(self):
653@@ -1059,58 +1040,37 @@
654 riak2_unit = yield riak_states["service"].add_unit_state()
655 yield riak_states["service_relation"].add_unit_state(riak2_unit)
656
657- # setup callbacks and start observing.
658- wait_callback = [Deferred() for i in range(10)]
659- results = []
660-
661- def watch_related(old_units=None, new_units=None, modified=None):
662- results.append((old_units, new_units, modified))
663- wait_callback[len(results)-1].callback(True)
664-
665- watcher = yield riak_states["unit_relation"].watch_related_units(
666- watch_related)
667+ checker = WatchChecker(self)
668+ watcher = yield checker.watch(riak_states["unit_relation"])
669 yield watcher.start()
670
671 # wait for initial callbacks
672- yield wait_callback[0]
673- self.verify_unit_watch_result(
674- results[0], old_units=[], new_units=[riak2_unit])
675- yield wait_callback[1]
676- self.verify_unit_watch_result(
677- results[1], modified=((riak2_unit, 0),))
678+ yield checker.assert_members_cb(0, [], [riak2_unit])
679+ yield checker.assert_settings_cb(1, riak2_unit, 0)
680
681 # verify modifying self does not cause a notification.
682 yield riak_states["unit_relation"].set_data(dict(hello="world"))
683 yield self.sleep(0.1)
684- self.assertEqual(len(results), 2)
685+ checker.assert_cb_count(2)
686
687 # add another unit
688 riak3_unit = yield riak_states["service"].add_unit_state()
689 riak3_relation = yield riak_states["service_relation"].add_unit_state(
690 riak3_unit)
691-
692- yield wait_callback[2]
693- self.verify_unit_watch_result(
694- results[2],
695- old_units=[riak2_unit], new_units=[riak2_unit, riak3_unit])
696- yield wait_callback[3]
697- self.verify_unit_watch_result(
698- results[3], modified=((riak3_unit, 0),))
699-
700- # add remove one (no api atm, so directly to trigger notification)
701+ yield checker.assert_members_cb(
702+ 2, [riak2_unit], [riak2_unit, riak3_unit])
703+ yield checker.assert_settings_cb(3, riak3_unit, 0)
704+
705+ # remove one (no api atm, so directly to trigger notification)
706 yield self.client.delete(
707 "/relations/%s/peer/%s" % (riak_states["relation"].internal_id,
708 riak2_unit.internal_id))
709- yield wait_callback[4]
710- self.verify_unit_watch_result(
711- results[4],
712- old_units=[riak2_unit, riak3_unit], new_units=[riak3_unit])
713+ yield checker.assert_members_cb(
714+ 4, [riak2_unit, riak3_unit], [riak3_unit])
715
716 # modify one.
717 yield riak3_relation.set_data(dict(later="eventually"))
718- yield wait_callback[5]
719- self.verify_unit_watch_result(
720- results[5], modified=((riak3_unit, 1),))
721+ yield checker.assert_settings_cb(5, riak3_unit, 1)
722
723 @inlineCallbacks
724 def test_watch_role_container_created_concurrently(self):
725@@ -1126,6 +1086,7 @@
726 "mysql", "client-server", "", "server")
727 wordpress_states = yield self.add_relation_service_unit_from_endpoints(
728 wordpress_ep, mysql_ep)
729+ wordpress_unit = wordpress_states["unit"]
730 mysql_states = yield self.add_opposite_service_unit(
731 wordpress_states)
732
733@@ -1146,23 +1107,10 @@
734
735 self.mocker.replay()
736
737- # setup callbacks and start observing.
738- wait_callback = [Deferred() for i in range(5)]
739- results = []
740-
741- def watch_related(old_units=None, new_units=None, modified=None):
742- results.append((old_units, new_units, modified))
743- wait_callback[len(results)-1].callback(True)
744-
745- # Watch
746- watcher = yield mysql_states["unit_relation"].watch_related_units(
747- watch_related)
748+ checker = WatchChecker(self)
749+ watcher = yield checker.watch(mysql_states["unit_relation"])
750 yield watcher.start()
751-
752- # Verify
753- yield wait_callback[0]
754- self.verify_unit_watch_result(
755- results[0], old_units=[], new_units=[wordpress_states["unit"]])
756+ yield checker.assert_members_cb(0, [], [wordpress_unit])
757
758 @inlineCallbacks
759 def test_watch_deleted_modify_notifications(self):
760@@ -1178,48 +1126,31 @@
761 "mysql", "client-server", "", "server")
762 wordpress_states = yield self.add_relation_service_unit_from_endpoints(
763 wordpress_ep, mysql_ep)
764+ wordpress_unit = wordpress_states["unit"]
765 mysql_states = yield self.add_opposite_service_unit(
766 wordpress_states)
767
768- # setup callbacks and start observing.
769- wait_callback = [Deferred() for i in range(5)]
770- results = []
771-
772- def watch_related(old_units=None, new_units=None, modified=None):
773- results.append((old_units, new_units, modified))
774- wait_callback[len(results)-1].callback(True)
775-
776 # Start watching
777- watcher = yield mysql_states["unit_relation"].watch_related_units(
778- watch_related)
779+ checker = WatchChecker(self)
780+ watcher = yield checker.watch(mysql_states["unit_relation"])
781 yield watcher.start()
782-
783- yield wait_callback[0]
784- self.verify_unit_watch_result(
785- results[0], old_units=[], new_units=[wordpress_states["unit"]])
786-
787- yield wait_callback[1]
788- self.verify_unit_watch_result(
789- results[1], modified=((wordpress_states["unit"], 0),))
790+ yield checker.assert_members_cb(0, [], [wordpress_unit])
791+ yield checker.assert_settings_cb(1, wordpress_unit, 0)
792
793 # Delete the presence path
794 presence_path = "/relations/%s/client/%s" % (
795 wordpress_states["relation"].internal_id,
796 wordpress_states["unit"].internal_id)
797 yield self.client.delete(presence_path)
798+ yield checker.assert_members_cb(2, [wordpress_unit], [])
799
800 # Modify the settings path
801 settings_path = self.get_unit_settings_path(wordpress_states)
802 yield self.client.set(settings_path, "some random string")
803
804- # Verify only deletion callback
805- yield wait_callback[2]
806- self.verify_unit_watch_result(
807- results[2], old_units=[wordpress_states["unit"]], new_units=[])
808-
809 # Give a moment to ensure we don't see any new callbacks
810 yield self.sleep(0.1)
811- self.assertEquals(len(results), 3)
812+ checker.assert_cb_count(3)
813
814 @inlineCallbacks
815 def test_watch_with_settings_deleted(self):
816@@ -1236,29 +1167,15 @@
817 "mysql", "client-server", "", "server")
818 wordpress_states = yield self.add_relation_service_unit_from_endpoints(
819 wordpress_ep, mysql_ep)
820+ wordpress_unit = wordpress_states["unit"]
821 mysql_states = yield self.add_opposite_service_unit(
822 wordpress_states)
823
824- # setup callbacks and start observing.
825- wait_callback = [Deferred() for i in range(5)]
826- results = []
827-
828- def watch_related(old_units=None, new_units=None, modified=None):
829- results.append((old_units, new_units, modified))
830- wait_callback[len(results)-1].callback(True)
831-
832- # Start watching
833- watcher = yield mysql_states["unit_relation"].watch_related_units(
834- watch_related)
835+ checker = WatchChecker(self)
836+ watcher = yield checker.watch(mysql_states["unit_relation"])
837 yield watcher.start()
838-
839- yield wait_callback[0]
840- self.verify_unit_watch_result(
841- results[0], old_units=[], new_units=[wordpress_states["unit"]])
842-
843- yield wait_callback[1]
844- self.verify_unit_watch_result(
845- results[1], modified=((wordpress_states["unit"], 0),))
846+ yield checker.assert_members_cb(0, [], [wordpress_unit])
847+ yield checker.assert_settings_cb(1, wordpress_unit, 0)
848
849 # Delete the settings path
850 settings_path = "/relations/%s/settings/%s" % (
851@@ -1268,7 +1185,7 @@
852
853 # Verify no callbacks
854 yield self.sleep(0.1)
855- self.assertEqual(len(results), 2)
856+ checker.assert_cb_count(2)
857
858 # Recreate the settings path; this should trigger a callback.
859 # Note that this is not likely to happen in reality, and if it does
860@@ -1276,15 +1193,11 @@
861 # to 0, and HookScheduler depends on that value continuing to increase
862 # so it can determine whether changes happened while it was inactive.
863 yield self.client.create(settings_path, "abc")
864- yield wait_callback[2]
865- self.verify_unit_watch_result(
866- results[2], modified=((wordpress_states["unit"], 0),))
867+ yield checker.assert_settings_cb(2, wordpress_unit, 0)
868
869 # And modify it.
870 yield self.client.set(settings_path, "123")
871- yield wait_callback[3]
872- self.verify_unit_watch_result(
873- results[3], modified=((wordpress_states["unit"], 1),))
874+ yield checker.assert_settings_cb(3, wordpress_unit, 1)
875
876 @inlineCallbacks
877 def test_watch_start_stop_start_with_existing_service(self):
878@@ -1304,28 +1217,15 @@
879 "mysql", "client-server", "", "server")
880 wordpress_states = yield self.add_relation_service_unit_from_endpoints(
881 wordpress_ep, mysql_ep)
882+ wordpress_unit = wordpress_states["unit"]
883 mysql_states = yield self.add_opposite_service_unit(
884 wordpress_states)
885
886- # setup callbacks and start observing.
887- wait_callback = [Deferred() for i in range(5)]
888- results = []
889-
890- def watch_related(old_units=None, new_units=None, modified=None):
891- results.append((old_units, new_units, modified))
892- wait_callback[len(results)-1].callback(True)
893-
894- # Start watching
895- watcher = yield mysql_states["unit_relation"].watch_related_units(
896- watch_related)
897+ checker = WatchChecker(self)
898+ watcher = yield checker.watch(mysql_states["unit_relation"])
899 yield watcher.start()
900-
901- yield wait_callback[0]
902- self.verify_unit_watch_result(
903- results[0], old_units=[], new_units=[wordpress_states["unit"]])
904- yield wait_callback[1]
905- self.verify_unit_watch_result(
906- results[1], modified=((wordpress_states["unit"], 0),))
907+ yield checker.assert_members_cb(0, [], [wordpress_unit])
908+ yield checker.assert_settings_cb(1, wordpress_unit, 0)
909
910 # Stop watching
911 watcher.stop()
912@@ -1333,26 +1233,20 @@
913 # Add a new unit
914 wordpress2_states = yield self.add_related_service_unit(
915 wordpress_states)
916+ wordpress2_unit = wordpress2_states["unit"]
917
918- # Modify a unit
919+ # Modify a unit (this change will not be detected, ever)
920 yield wordpress_states["unit_relation"].set_data(dict(hello="world"))
921
922 # Verify no callbacks
923 yield self.sleep(0.1)
924- self.assertEqual(len(results), 2)
925+ checker.assert_cb_count(2)
926
927- # Start watching
928+ # Start watching again; watch for addition
929 yield watcher.start()
930-
931- # Verify we see the addition.
932- yield wait_callback[2]
933- self.verify_unit_watch_result(
934- results[2],
935- old_units=[wordpress_states["unit"]],
936- new_units=[wordpress_states["unit"], wordpress2_states["unit"]])
937- yield wait_callback[3]
938- self.verify_unit_watch_result(
939- results[3], modified=((wordpress2_states["unit"], 0),))
940+ yield checker.assert_members_cb(
941+ 2, [wordpress_unit], [wordpress_unit, wordpress2_unit])
942+ yield checker.assert_settings_cb(3, wordpress2_unit, 0)
943
944 @inlineCallbacks
945 def test_watch_start_stop_start_with_new_service(self):
946@@ -1373,48 +1267,37 @@
947 mysql_states = yield self.add_relation_service_unit_from_endpoints(
948 mysql_ep, wordpress_ep)
949
950- # setup callbacks and start observing.
951- wait_callback = [Deferred() for i in range(5)]
952- results = []
953-
954- def watch_related(old_units=None, new_units=None, modified=None):
955- results.append((old_units, new_units, modified))
956- wait_callback[len(results)-1].callback(True)
957-
958- # Start watching
959- watcher = yield mysql_states["unit_relation"].watch_related_units(
960- watch_related)
961+ checker = WatchChecker(self)
962+ watcher = yield checker.watch(mysql_states["unit_relation"])
963 yield watcher.start()
964-
965- # Stop watching
966 watcher.stop()
967
968- # Add the new service and a unit
969+ # Add the new service and 2 units
970 wordpress_states = yield self.add_opposite_service_unit(
971 mysql_states)
972-
973- # Add another unit
974 wordpress2_states = yield self.add_related_service_unit(
975 wordpress_states)
976+ wordpress_unit = wordpress_states["unit"]
977+ wordpress2_unit = wordpress2_states["unit"]
978
979 # Modify a unit
980 yield wordpress_states["unit_relation"].set_data(dict(hello="world"))
981
982 # Verify no callbacks
983 yield self.sleep(0.1)
984- self.assertEqual(len(results), 0)
985+ checker.assert_cb_count(0)
986
987 # Start watching
988 yield watcher.start()
989-
990- # Wait a moment for callback
991- yield wait_callback[0]
992-
993- self.assertEqual(len(results), 1)
994- self.verify_unit_watch_result(
995- results[0],
996- old_units=[],
997- new_units=[wordpress_states["unit"], wordpress2_states["unit"]])
998+ yield checker.assert_members_cb(
999+ 0, [], [wordpress_unit, wordpress2_unit])
1000+
1001+ # We expect a settings callback for each...
1002+ yield checker.wait_for_cb(2)
1003+
1004+ # ...but only one
1005+ yield self.sleep(0.1)
1006+ checker.assert_cb_count(3)
1007
1008 @inlineCallbacks
1009 def test_watch_user_callback_invocation_delays_node_watch(self):
1010@@ -1430,18 +1313,8 @@
1011 riak_states = yield self.add_relation_service_unit(
1012 "riak", "riak", "kvstore", "peer")
1013
1014- wait_callback = [Deferred() for i in range(10)]
1015- finish_callback = [Deferred() for i in range(10)]
1016-
1017- results = []
1018-
1019- def watch_related(old_units=None, new_units=None, modified=None):
1020- results.append((old_units, new_units, modified))
1021- wait_callback[len(results)-1].callback(True)
1022- return finish_callback[len(results)-1]
1023-
1024- watcher = yield riak_states["unit_relation"].watch_related_units(
1025- watch_related)
1026+ checker = WatchChecker(self, block_cbs=True)
1027+ watcher = yield checker.watch(riak_states["unit_relation"])
1028 yield watcher.start()
1029
1030 # Create a new unit and add it to the relation.
1031@@ -1450,60 +1323,53 @@
1032 riak_unit2)
1033
1034 # Wait for it
1035- yield wait_callback[0]
1036- self.verify_unit_watch_result(
1037- results[0], old_units=[], new_units=[riak_unit2])
1038+ yield checker.assert_members_cb(0, [], [riak_unit2])
1039
1040 # We are also expecting a notification for the initial settings version
1041 # ...but we won't get that until the first callback is done
1042 self.sleep(0.1)
1043- self.assertEquals(len(results), 1)
1044+ checker.assert_cb_count(1)
1045
1046 # While we wait for this, someone modifies the settings
1047 yield riak_unit2_rel.set_data(dict(hello="world"))
1048
1049 # Hey, the add callback finished!
1050- finish_callback[0].callback(True)
1051+ checker.unblock_cb(0)
1052
1053 # OK, now we expect to see the initial setting version callback
1054- yield wait_callback[1]
1055- self.verify_unit_watch_result(
1056- results[1], modified=((riak_unit2, 0),))
1057+ yield checker.assert_settings_cb(1, riak_unit2, 0)
1058
1059 # ...but that is also taking a long time, so we shouldn't expect to see
1060 # the callback for the explicit modification yet...
1061- self.sleep(0.1)
1062- self.assertEquals(len(results), 2)
1063+ yield self.sleep(0.1)
1064+ checker.assert_cb_count(2)
1065
1066 # ...or, in fact, for this other modification that just happened, which
1067 # will be collapsed into the other one...
1068 yield riak_unit2_rel.set_data(dict(hello="world 2"))
1069- self.sleep(0.1)
1070- self.assertEquals(len(results), 2)
1071+ yield self.sleep(0.1)
1072+ checker.assert_cb_count(2)
1073
1074 # ...so, we should have 1 callback in progress, and only 1 pending
1075 # notification. OK, finish the callback...
1076- finish_callback[1].callback(True)
1077+ checker.unblock_cb(1)
1078
1079 # ...and wait for the change notification.
1080- yield wait_callback[2]
1081- self.assertEqual(len(results), 3)
1082- self.verify_unit_watch_result(
1083- results[2], modified=((riak_unit2, 2),))
1084+ yield checker.assert_settings_cb(2, riak_unit2, 2)
1085
1086 # Finish the callback and verify no other invocations.
1087- finish_callback[2].callback(True)
1088+ checker.unblock_cb(2)
1089 yield self.sleep(0.1)
1090- self.assertEqual(len(results), 3)
1091+ checker.assert_cb_count(3)
1092
1093- # Modify the node again, we should see this change, immediately.
1094+ # Modify the node again; we should see this change immediately.
1095 yield riak_unit2_rel.set_data(dict(hello="goodbye"))
1096- yield wait_callback[3]
1097- finish_callback[3].callback(True)
1098+ yield checker.assert_settings_cb(3, riak_unit2, 3)
1099
1100- self.verify_unit_watch_result(
1101- results[3], modified=((riak_unit2, 3),))
1102- self.assertEqual(len(results), 4)
1103+ # And again, finish the callback and verify no other invocations.
1104+ checker.unblock_cb(3)
1105+ yield self.sleep(0.1)
1106+ checker.assert_cb_count(4)
1107
1108 node_path = "/relations/relation-0000000000/settings/unit-0000000001"
1109 expected_output = (
1110@@ -1527,18 +1393,8 @@
1111 riak_states = yield self.add_relation_service_unit(
1112 "riak", "riak", "kvstore", "peer")
1113
1114- wait_callback = [Deferred() for i in range(10)]
1115- finish_callback = [Deferred() for i in range(10)]
1116-
1117- results = []
1118-
1119- def watch_related(old_units=None, new_units=None, modified=None):
1120- results.append((old_units, new_units, modified))
1121- wait_callback[len(results)-1].callback(True)
1122- return finish_callback[len(results)-1]
1123-
1124- watcher = yield riak_states["unit_relation"].watch_related_units(
1125- watch_related)
1126+ checker = WatchChecker(self, block_cbs=True)
1127+ watcher = yield checker.watch(riak_states["unit_relation"])
1128 yield watcher.start()
1129
1130 # Create a new unit and add it to the relation.
1131@@ -1547,9 +1403,7 @@
1132 riak_unit2)
1133
1134 # Wait for it
1135- yield wait_callback[0]
1136- self.verify_unit_watch_result(
1137- results[0], old_units=[], new_units=[riak_unit2])
1138+ yield checker.assert_members_cb(0, [], [riak_unit2])
1139
1140 # Now add a new unit: we won't see it immediately, since the callback
1141 # is still executing, but we will have a container change pending
1142@@ -1557,12 +1411,10 @@
1143 yield riak_states["service_relation"].add_unit_state(
1144 riak_unit3)
1145
1146- # Finish the first callback, immediately hit the settings version
1147+ # Finish the first callback; immediately hit the settings version
1148 # callback, which will also take a while
1149- finish_callback[0].callback(True)
1150- yield wait_callback[1]
1151- self.verify_unit_watch_result(
1152- results[1], modified=((riak_unit2, 0),))
1153+ checker.unblock_cb(0)
1154+ yield checker.assert_settings_cb(1, riak_unit2, 0)
1155
1156 # Adding another unit; will be rolled into the container change we're
1157 # already expecting from before
1158@@ -1572,12 +1424,9 @@
1159
1160 # Now release the container callback, and verify the callback
1161 # for both the new nodes.
1162- finish_callback[1].callback(True)
1163- yield wait_callback[2]
1164- self.verify_unit_watch_result(
1165- results[2],
1166- old_units=[riak_unit2],
1167- new_units=[riak_unit2, riak_unit3, riak_unit4])
1168+ checker.unblock_cb(1)
1169+ yield checker.assert_members_cb(
1170+ 2, [riak_unit2], [riak_unit2, riak_unit3, riak_unit4])
1171
1172 @inlineCallbacks
1173 def test_watch_concurrent_callback_execution(self):
1174@@ -1585,35 +1434,6 @@
1175
1176 IFF they are not synchronous and not on the same node.
1177 """
1178- #Setup parallel callbacks bookeeping and implementation
1179- wait_callback = dict([(i, Deferred()) for i in range(10)])
1180- results = {}
1181- success_deferrals = {
1182- 2: Deferred(),
1183- 5: Deferred()}
1184-
1185- @inlineCallbacks
1186- def watch_related(
1187- old_units=None, new_units=None, modified=None, callback_id=None):
1188- success_result = success_deferrals.get(callback_id)
1189- if success_result is not None:
1190- yield success_result
1191- results[callback_id] = (old_units, new_units, modified)
1192- wait_callback[callback_id].callback(True)
1193-
1194- class ParallelWatcherCallback(object):
1195-
1196- def __init__(self):
1197- self.callback_id_sequence = 0
1198-
1199- def __call__(self, *args, **kw):
1200- callback = functools.partial(
1201- watch_related, callback_id=self.callback_id_sequence)
1202- self.callback_id_sequence += 1
1203- return callback(*args, **kw)
1204-
1205- watcher_callback = ParallelWatcherCallback()
1206-
1207 # Add the relation, services, and related units.
1208 wordpress_ep = RelationEndpoint(
1209 "wordpress", "client-server", "", "client")
1210@@ -1621,56 +1441,74 @@
1211 "mysql", "client-server", "", "server")
1212 wordpress_states = yield self.add_relation_service_unit_from_endpoints(
1213 wordpress_ep, mysql_ep)
1214+ wordpress_unit = wordpress_states["unit"]
1215 mysql_states = yield self.add_opposite_service_unit(
1216 wordpress_states)
1217
1218- # Start watching, and give a moment to establish
1219- watcher = yield mysql_states["unit_relation"].watch_related_units(
1220- watcher_callback)
1221+ checker = WatchChecker(self, block_cbs=True)
1222+ # To verify parallel execution, this checker will make us wait for
1223+ # some of the callbacks (2 and 5), but leave the rest unimpeded.
1224+ for i in (0, 1, 3, 4, 6):
1225+ checker.unblock_cb(i)
1226+
1227+ watcher = yield checker.watch(mysql_states["unit_relation"])
1228 yield watcher.start()
1229+ yield checker.wait_for_cb(0)
1230+ yield checker.wait_for_cb(1)
1231
1232- # Modify a unit
1233+ # Modify a unit (blocking callback)
1234 yield wordpress_states["unit_relation"].set_data(dict(hello="world"))
1235+ yield checker.wait_for_cb(2)
1236+
1237+ # Modify the unit again (will wait for previous)
1238+ yield wordpress_states["unit_relation"].set_data(dict(hello="world 2"))
1239
1240 # Add a unit.
1241 wordpress2_states = yield self.add_related_service_unit(
1242 wordpress_states)
1243+ wordpress2_unit = wordpress2_states["unit"]
1244+ yield checker.wait_for_cb(3)
1245+ yield checker.wait_for_cb(4)
1246
1247- # Delete a unit
1248+ # Delete a unit (blocking callbck)
1249 presence_path = "/relations/%s/client/%s" % (
1250 wordpress_states["relation"].internal_id,
1251 wordpress_states["unit"].internal_id)
1252 yield self.client.delete(presence_path)
1253-
1254- # Verify the parallel execution, give a moment for callbacks to finish.
1255- yield self.sleep(0.1)
1256- self.assertEqual(len(results), 4)
1257-
1258- self.verify_unit_watch_result(
1259- results[0], old_units=[], new_units=[wordpress_states["unit"]])
1260- self.verify_unit_watch_result(
1261- results[1], modified=((wordpress_states["unit"], 0),))
1262- self.verify_unit_watch_result(
1263- results[3],
1264- old_units=[wordpress_states["unit"]],
1265- new_units=[wordpress_states["unit"], wordpress2_states["unit"]])
1266- self.verify_unit_watch_result(
1267- results[4], modified=((wordpress2_states["unit"], 0),))
1268-
1269- for k, v in success_deferrals.items():
1270- v.callback(True)
1271-
1272- # Verify all callback results, give a moment for them to finish.
1273- yield wait_callback[2]
1274- yield wait_callback[5]
1275- self.assertEquals(len(results), 6)
1276-
1277- self.verify_unit_watch_result(
1278- results[2], modified=((wordpress_states["unit"], 1),))
1279- self.verify_unit_watch_result(
1280- results[5],
1281- old_units=[wordpress_states["unit"], wordpress2_states["unit"]],
1282- new_units=[wordpress2_states["unit"]])
1283+ yield checker.wait_for_cb(5)
1284+
1285+ # ...and delete the other unit (also blocked)
1286+ presence_path = "/relations/%s/client/%s" % (
1287+ wordpress2_states["relation"].internal_id,
1288+ wordpress2_states["unit"].internal_id)
1289+ yield self.client.delete(presence_path)
1290+
1291+ # Verify that all unblocked callbacks have started correctly
1292+ yield checker.assert_members_cb(0, [], [wordpress_unit])
1293+ yield checker.assert_settings_cb(1, wordpress_unit, 0)
1294+ yield checker.assert_settings_cb(2, wordpress_unit, 1)
1295+ yield checker.assert_members_cb(
1296+ 3, [wordpress_unit], [wordpress_unit, wordpress2_unit])
1297+ yield checker.assert_settings_cb(4, wordpress2_unit, 0)
1298+ yield checker.assert_members_cb(
1299+ 5, [wordpress_unit, wordpress2_unit], [wordpress2_unit])
1300+ checker.assert_cb_count(6)
1301+ # OK, fine, but cbs 2 and 5 are still blocking.
1302+
1303+ # Whoops, looks like the unit got deleted before we could notify the
1304+ # second settings change. Check nothing happens:
1305+ checker.unblock_cb(2)
1306+ yield self.sleep(0.1)
1307+ checker.assert_cb_count(6)
1308+
1309+ # Now complete processing of the first delete, and check that we then
1310+ # *do* get notified of the second delete:
1311+ checker.unblock_cb(5)
1312+ yield checker.assert_members_cb(6, [wordpress2_unit], [])
1313+
1314+ # ...and finally double-check no further callbacks:
1315+ yield self.sleep(0.1)
1316+ checker.assert_cb_count(7)
1317
1318 @inlineCallbacks
1319 def test_watch_unknown_relation_role_error(self):
1320@@ -1685,5 +1523,6 @@
1321 self.fail("Should not be called")
1322
1323 yield self.failUnlessFailure(
1324- wordpress_states["unit_relation"].watch_related_units(not_called),
1325+ wordpress_states["unit_relation"].watch_related_units(
1326+ not_called, not_called),
1327 UnknownRelationRole)
1328
1329=== modified file 'juju/unit/lifecycle.py'
1330--- juju/unit/lifecycle.py 2011-12-05 02:11:54 +0000
1331+++ juju/unit/lifecycle.py 2011-12-05 02:11:55 +0000
1332@@ -436,7 +436,8 @@
1333 # Create a watcher if we don't have one yet.
1334 if self._watcher is None:
1335 self._watcher = yield self._unit_relation.watch_related_units(
1336- self._scheduler.notify_change)
1337+ self._scheduler.members_changed,
1338+ self._scheduler.settings_changed)
1339 # And start the watcher.
1340 if watches:
1341 yield self._watcher.start()

Subscribers

People subscribed via source and target branches

to status/vote changes: