Merge lp:~fwereade/pyjuju/watch-related-units-callbacks into lp:pyjuju
- watch-related-units-callbacks
- Merge into trunk
Proposed by
William Reade
Status: Superseded
Proposed branch: | lp:~fwereade/pyjuju/watch-related-units-callbacks | ||||
Merge into: | lp:pyjuju | ||||
Prerequisite: | lp:~fwereade/pyjuju/resolve-unit-relation-diffs | ||||
Diff against target: 1341 lines (+310/-491), 5 files modified:
juju/hooks/scheduler.py (+13/-27), juju/hooks/tests/test_scheduler.py (+31/-39), juju/state/relation.py (+9/-8), juju/state/tests/test_relation.py (+255/-416), juju/unit/lifecycle.py (+2/-1)
To merge this branch: | bzr merge lp:~fwereade/pyjuju/watch-related-units-callbacks | ||||
Related bugs: |
|
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Juju Engineering | Pending | ||
Review via email:
|
This proposal has been superseded by a proposal from 2011-12-12.
Commit message
Description of the change
The lack of separate callbacks for membership changes and settings changes made the code — and especially the tests — harder to follow. With this change it should be more comprehensible. Both callbacks still need to be synchronous, but this doesn't represent a real behavioral change IMO; it's just something to remain aware of.
To post a comment you must log in.
Revision history for this message
![](/+icing/build/overlay/assets/skins/sam/images/close.gif)
William Reade (fwereade) wrote : | # |
- 485. By William Reade
-
merge parent
- 486. By William Reade
-
merge parent
- 487. By William Reade
-
merge parent
- 488. By William Reade
-
merge parent
- 489. By William Reade
-
merge parent
- 490. By William Reade
-
rename foo_callback to cb_foo
- 491. By William Reade
-
poke_zk changes, as requested (since these are just checking for callbacks firing, I think they're suitable)
- 492. By William Reade
-
merge parent
- 493. By William Reade
-
merge parent
- 494. By William Reade
-
merge parent
- 495. By William Reade
-
merge parent
- 496. By William Reade
-
merge parent
- 497. By William Reade
-
merge parent
- 498. By William Reade
-
merge parent
- 499. By William Reade
-
merge parent
- 500. By William Reade
-
merge parent
- 501. By William Reade
-
merge parent
Unmerged revisions
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === modified file 'juju/hooks/scheduler.py' |
2 | --- juju/hooks/scheduler.py 2011-12-05 02:11:54 +0000 |
3 | +++ juju/hooks/scheduler.py 2011-12-05 02:11:55 +0000 |
4 | @@ -137,31 +137,11 @@ |
5 | # occurs. |
6 | self._run_queue.put(None) |
7 | |
8 | - def notify_change(self, old_units=(), new_units=(), modified=()): |
9 | - """Receive changes regarding related units and schedule hook execution. |
10 | - """ |
11 | - log.debug("relation change old:%s, new:%s, modified:%s", |
12 | - old_units, new_units, modified) |
13 | - |
14 | + def members_changed(self, old_units, new_units): |
15 | + log.debug("members changed: old=%s, new=%s", old_units, new_units) |
16 | + scheduled = 0 |
17 | self._clock_sequence += 1 |
18 | |
19 | - # UnitRelationState.watch_related_units will EITHER call back with |
20 | - # old_units and new_units, OR with modified; never both. |
21 | - # This should in fact probably be two distinct callbacks... |
22 | - if old_units or new_units: |
23 | - assert not modified, "got mixed settings and membership changes" |
24 | - scheduled = self._notify_membership_change(old_units, new_units) |
25 | - else: |
26 | - assert modified, "got no changes" |
27 | - scheduled = self._notify_settings_change(modified) |
28 | - |
29 | - if scheduled: |
30 | - self._run_queue.put(self._clock_sequence) |
31 | - self._save_state() |
32 | - |
33 | - def _notify_membership_change(self, old_units, new_units): |
34 | - scheduled = 0 |
35 | - |
36 | if self._context_members is None: |
37 | self._context_members = list(old_units) |
38 | |
39 | @@ -184,16 +164,22 @@ |
40 | scheduled += self._queue_change( |
41 | unit_name, REMOVED, self._clock_sequence) |
42 | |
43 | - return scheduled |
44 | + if scheduled: |
45 | + self._run_queue.put(self._clock_sequence) |
46 | + self._save_state() |
47 | |
48 | - def _notify_settings_change(self, modified): |
49 | + def settings_changed(self, unit_versions): |
50 | + log.debug("settings changed: %s", unit_versions) |
51 | scheduled = 0 |
52 | - for (unit_name, version) in modified: |
53 | + self._clock_sequence += 1 |
54 | + for (unit_name, version) in unit_versions: |
55 | if version > self._latest_members.get(unit_name, 0): |
56 | self._latest_members[unit_name] = version |
57 | scheduled += self._queue_change( |
58 | unit_name, MODIFIED, self._clock_sequence) |
59 | - return scheduled |
60 | + if scheduled: |
61 | + self._run_queue.put(self._clock_sequence) |
62 | + self._save_state() |
63 | |
64 | def get_hook_context(self, change): |
65 | """ |
66 | |
67 | === modified file 'juju/hooks/tests/test_scheduler.py' |
68 | --- juju/hooks/tests/test_scheduler.py 2011-12-05 02:11:54 +0000 |
69 | +++ juju/hooks/tests/test_scheduler.py 2011-12-05 02:11:55 +0000 |
70 | @@ -56,14 +56,14 @@ |
71 | results in a modify event. |
72 | """ |
73 | self.write_single_unit_state() |
74 | - self.scheduler.notify_change(old_units=["u-1"], new_units=[]) |
75 | - self.scheduler.notify_change(old_units=[], new_units=["u-1"]) |
76 | + self.scheduler.members_changed(["u-1"], []) |
77 | + self.scheduler.members_changed([], ["u-1"]) |
78 | self.scheduler.run() |
79 | self.assertEqual(len(self.executions), 1) |
80 | self.assertEqual(self.executions[0][1].change_type, "modified") |
81 | |
82 | - output = ("relation change old:['u-1'], new:[], modified:()", |
83 | - "relation change old:[], new:['u-1'], modified:()", |
84 | + output = ("members changed: old=['u-1'], new=[]", |
85 | + "members changed: old=[], new=['u-1']", |
86 | "start", |
87 | "executing hook for u-1:modified\n") |
88 | self.assertEqual(self.log_stream.getvalue(), "\n".join(output)) |
89 | @@ -73,33 +73,33 @@ |
90 | An extra validation of the previous test. |
91 | """ |
92 | self.write_single_unit_state() |
93 | - self.scheduler.notify_change(modified=[("u-1", 1)]) |
94 | - self.scheduler.notify_change(old_units=["u-1"], new_units=[]) |
95 | - self.scheduler.notify_change(old_units=[], new_units=["u-1"]) |
96 | + self.scheduler.settings_changed([("u-1", 1)]) |
97 | + self.scheduler.members_changed(["u-1"], []) |
98 | + self.scheduler.members_changed([], ["u-1"]) |
99 | self.scheduler.run() |
100 | self.assertEqual(len(self.executions), 1) |
101 | self.assertEqual(self.executions[0][1].change_type, "modified") |
102 | |
103 | def test_reduce_add_modify(self): |
104 | """An add and modify event for a node are coalesced to an add.""" |
105 | - self.scheduler.notify_change(old_units=[], new_units=["u-1"]) |
106 | - self.scheduler.notify_change(modified=[("u-1", 1)]) |
107 | + self.scheduler.members_changed([], ["u-1"]) |
108 | + self.scheduler.settings_changed([("u-1", 1)]) |
109 | self.scheduler.run() |
110 | self.assertEqual(len(self.executions), 1) |
111 | self.assertEqual(self.executions[0][1].change_type, "joined") |
112 | |
113 | def test_reduce_add_remove(self): |
114 | """an add followed by a removal results in a noop.""" |
115 | - self.scheduler.notify_change(old_units=[], new_units=["u-1"]) |
116 | - self.scheduler.notify_change(old_units=["u-1"], new_units=[]) |
117 | + self.scheduler.members_changed([], ["u-1"]) |
118 | + self.scheduler.members_changed(["u-1"], []) |
119 | self.scheduler.run() |
120 | self.assertEqual(len(self.executions), 0) |
121 | |
122 | def test_reduce_modify_remove(self): |
123 | """Modifying and then removing a node, results in just the removal.""" |
124 | self.write_single_unit_state() |
125 | - self.scheduler.notify_change(modified=[("u-1", 1)]) |
126 | - self.scheduler.notify_change(old_units=["u-1"], new_units=[]) |
127 | + self.scheduler.settings_changed([("u-1", 1)]) |
128 | + self.scheduler.members_changed(["u-1"], []) |
129 | self.scheduler.run() |
130 | self.assertEqual(len(self.executions), 1) |
131 | self.assertEqual(self.executions[0][1].change_type, "departed") |
132 | @@ -108,15 +108,15 @@ |
133 | """Multiple modifies get coalesced to a single modify.""" |
134 | # simulate normal startup, the first notify will always be the existing |
135 | # membership set. |
136 | - self.scheduler.notify_change(old_units=[], new_units=["u-1"]) |
137 | + self.scheduler.members_changed([], ["u-1"]) |
138 | self.scheduler.run() |
139 | self.scheduler.stop() |
140 | self.assertEqual(len(self.executions), 1) |
141 | |
142 | # Now continue the modify/modify reduction. |
143 | - self.scheduler.notify_change(modified=[("u-1", 1)]) |
144 | - self.scheduler.notify_change(modified=[("u-1", 2)]) |
145 | - self.scheduler.notify_change(modified=[("u-1", 3)]) |
146 | + self.scheduler.settings_changed([("u-1", 1)]) |
147 | + self.scheduler.settings_changed([("u-1", 2)]) |
148 | + self.scheduler.settings_changed([("u-1", 3)]) |
149 | self.scheduler.run() |
150 | |
151 | self.assertEqual(len(self.executions), 2) |
152 | @@ -143,11 +143,9 @@ |
153 | always see the membership of a relation as it was at the |
154 | time of their associated change. |
155 | """ |
156 | - self.scheduler.notify_change( |
157 | - old_units=[], new_units=["u-1", "u-2"]) |
158 | - self.scheduler.notify_change( |
159 | - old_units=["u-1", "u-2"], new_units=["u-2", "u-3"]) |
160 | - self.scheduler.notify_change(modified=[("u-2", 1)]) |
161 | + self.scheduler.members_changed([], ["u-1", "u-2"]) |
162 | + self.scheduler.members_changed(["u-1", "u-2"], ["u-2", "u-3"]) |
163 | + self.scheduler.settings_changed([("u-2", 1)]) |
164 | |
165 | self.scheduler.run() |
166 | self.scheduler.stop() |
167 | @@ -161,9 +159,8 @@ |
168 | change_members = yield self.executions[0][0].get_members() |
169 | self.assertEqual(change_members, ["u-2"]) |
170 | |
171 | - self.scheduler.notify_change(modified=[("u-2", 2)]) |
172 | - self.scheduler.notify_change( |
173 | - old_units=["u-2", "u-3"], new_units=["u-2"]) |
174 | + self.scheduler.settings_changed([("u-2", 2)]) |
175 | + self.scheduler.members_changed(["u-2", "u-3"], ["u-2"]) |
176 | self.scheduler.run() |
177 | |
178 | self.assertEqual(len(self.executions), 4) |
179 | @@ -187,10 +184,8 @@ |
180 | "clock_queue": [], |
181 | "clock_sequence": 1})) |
182 | |
183 | - self.scheduler.notify_change( |
184 | - old_units=["u-1", "u-2"], |
185 | - new_units=["u-2", "u-3", "u-4"]) |
186 | - self.scheduler.notify_change(modified=[("u-2", 1)]) |
187 | + self.scheduler.members_changed(["u-1", "u-2"], ["u-2", "u-3", "u-4"]) |
188 | + self.scheduler.settings_changed([("u-2", 1)]) |
189 | |
190 | self.scheduler.run() |
191 | self.scheduler.stop() |
192 | @@ -273,10 +268,8 @@ |
193 | "clock_queue": [], |
194 | "clock_sequence": 7})) |
195 | |
196 | - self.scheduler.notify_change( |
197 | - old_units=["u-1", "u-2"], |
198 | - new_units=["u-2", "u-3"]) |
199 | - self.scheduler.notify_change(modified=[("u-2", 3)]) |
200 | + self.scheduler.members_changed(["u-1", "u-2"], ["u-2", "u-3"]) |
201 | + self.scheduler.settings_changed([("u-2", 3)]) |
202 | |
203 | # Add a stop instruction to the queue, which should *not* be saved. |
204 | self.scheduler.stop() |
205 | @@ -358,7 +351,7 @@ |
206 | version for that unit will be ignored. |
207 | """ |
208 | self.write_single_unit_state() |
209 | - self.scheduler.notify_change(modified=(("u-1", 0),)) |
210 | + self.scheduler.settings_changed([("u-1", 0),]) |
211 | self.scheduler.run() |
212 | self.assertEquals(len(self.executions), 0) |
213 | |
214 | @@ -367,8 +360,8 @@ |
215 | When a unit is added, we assume its settings version to be 0, and |
216 | therefore modified events with version 0 will be ignored. |
217 | """ |
218 | - self.scheduler.notify_change(old_units=[], new_units=["u-1"]) |
219 | - self.scheduler.notify_change(modified=(("u-1", 0),)) |
220 | + self.scheduler.members_changed([], ["u-1"]) |
221 | + self.scheduler.settings_changed([("u-1", 0),]) |
222 | self.scheduler.run() |
223 | self.assertEquals(len(self.executions), 1) |
224 | self.assertEqual(self.executions[0][1].change_type, "joined") |
225 | @@ -387,12 +380,11 @@ |
226 | "clock_queue": [], |
227 | "clock_sequence": 4})) |
228 | |
229 | - self.scheduler.notify_change( |
230 | - old_units=["u-2"], new_units=["u-3", "u-4"]) |
231 | + self.scheduler.members_changed(["u-2"], ["u-3", "u-4"]) |
232 | self.scheduler.run() |
233 | |
234 | output = ( |
235 | - "relation change old:['u-2'], new:['u-3', 'u-4'], modified:()", |
236 | + "members changed: old=['u-2'], new=['u-3', 'u-4']", |
237 | "old does not match last recorded units: ['u-1', 'u-2']", |
238 | "start", |
239 | "executing hook for u-3:joined", |
240 | |
241 | === modified file 'juju/state/relation.py' |
242 | --- juju/state/relation.py 2011-12-05 02:11:54 +0000 |
243 | +++ juju/state/relation.py 2011-12-05 02:11:55 +0000 |
244 | @@ -408,7 +408,7 @@ |
245 | returnValue(endpoint_container) |
246 | |
247 | @inlineCallbacks |
248 | - def watch_related_units(self, callback): |
249 | + def watch_related_units(self, members_callback, settings_callback): |
250 | """Register a callback to be invoked when related units change. |
251 | |
252 | @param: callback a function that gets invoked when related |
253 | @@ -443,7 +443,8 @@ |
254 | watcher_factory = PeerUnitWatcher |
255 | |
256 | watcher = watcher_factory( |
257 | - self._client, self, endpoint_container, callback) |
258 | + self._client, self, endpoint_container, |
259 | + members_callback, settings_callback) |
260 | returnValue(watcher) |
261 | |
262 | |
263 | @@ -474,12 +475,14 @@ |
264 | client, |
265 | watcher_unit, |
266 | unit_container_path, |
267 | - callback): |
268 | + members_callback, |
269 | + settings_callback): |
270 | super(RelationUnitWatcherBase, self).__init__(client) |
271 | self._units = [] |
272 | self._watcher_unit = watcher_unit |
273 | self._container_path = unit_container_path |
274 | - self._callback = callback |
275 | + self._members_callback = members_callback |
276 | + self._settings_callback = settings_callback |
277 | self._stopped = None |
278 | self._unit_name_map = None |
279 | self._log = logging.getLogger("unit.relation.watch") |
280 | @@ -606,9 +609,7 @@ |
281 | # Invoke callback |
282 | callback_d.addCallback( |
283 | lambda (old_units, new_units): maybeDeferred( |
284 | - self._callback, |
285 | - old_units=sorted(old_units), |
286 | - new_units=sorted(new_units))) |
287 | + self._members_callback, sorted(old_units), sorted(new_units))) |
288 | |
289 | # Attach initial notifiers and change handlers to new nodes |
290 | if settings_watches: |
291 | @@ -631,7 +632,7 @@ |
292 | return |
293 | ((unit_name,),) = yield self._resolve_unit_names([unit_id]) |
294 | node_info = (unit_name, stat["version"]) |
295 | - yield self._callback(modified=(node_info,)) |
296 | + yield self._settings_callback((node_info,)) |
297 | |
298 | def _watch_settings_for_units(self, added): |
299 | """Setup watches on new unit relation setting nodes. |
300 | |
301 | === modified file 'juju/state/tests/test_relation.py' |
302 | --- juju/state/tests/test_relation.py 2011-12-05 02:11:54 +0000 |
303 | +++ juju/state/tests/test_relation.py 2011-12-05 02:11:55 +0000 |
304 | @@ -24,6 +24,18 @@ |
305 | from juju.state.tests.common import StateTestBase |
306 | |
307 | |
308 | +def _1_arg(f): |
309 | + def g(arg1): |
310 | + return f(arg1) |
311 | + return g |
312 | + |
313 | + |
314 | +def _2_args(f): |
315 | + def g(arg1, arg2): |
316 | + return f(arg1, arg2) |
317 | + return g |
318 | + |
319 | + |
320 | class RelationTestBase(StateTestBase): |
321 | |
322 | @inlineCallbacks |
323 | @@ -716,23 +728,60 @@ |
324 | self.service_state2))) |
325 | |
326 | |
327 | +class WatchChecker(object): |
328 | + |
329 | + def __init__(self, test, block_cbs=False): |
330 | + self.test = test |
331 | + self.results = [] |
332 | + self.sentinels = [Deferred() for i in range(10)] |
333 | + self.blockers = [Deferred() if block_cbs else True for i in range(10)] |
334 | + |
335 | + def _members_changed(self, old, new): |
336 | + self.results.append((old, new)) |
337 | + return self._synchronize() |
338 | + |
339 | + def _settings_changed(self, modified): |
340 | + (change,) = modified |
341 | + self.results.append(change) |
342 | + return self._synchronize() |
343 | + |
344 | + @inlineCallbacks |
345 | + def _synchronize(self): |
346 | + index = len(self.results) - 1 |
347 | + self.sentinels[index].callback(True) |
348 | + yield self.blockers[index] |
349 | + |
350 | + def watch(self, unit_relation): |
351 | + return unit_relation.watch_related_units( |
352 | + self._members_changed, self._settings_changed) |
353 | + |
354 | + def assert_cb_count(self, count): |
355 | + # *started* callbacks, including those whose return is blocked |
356 | + self.test.assertEquals(len(self.results), count) |
357 | + |
358 | + def wait_for_cb(self, index): |
359 | + # wait for *call*, not return |
360 | + return self.sentinels[index] |
361 | + |
362 | + def unblock_cb(self, index): |
363 | + self.blockers[index].callback(True) |
364 | + |
365 | + @inlineCallbacks |
366 | + def assert_members_cb(self, index, old, new): |
367 | + yield self.wait_for_cb(index) |
368 | + for actual, expected in zip(self.results[index], (old, new)): |
369 | + for actual_name, expected_unit in zip(actual, expected): |
370 | + self.test.assertEquals(actual_name, expected_unit.unit_name) |
371 | + |
372 | + @inlineCallbacks |
373 | + def assert_settings_cb(self, index, unit, version): |
374 | + yield self.wait_for_cb(index) |
375 | + change = self.results[index] |
376 | + self.test.assertEquals(change, (unit.unit_name, version)) |
377 | + |
378 | + |
379 | class UnitRelationStateTest(RelationTestBase): |
380 | |
381 | - def verify_unit_watch_result( |
382 | - self, result, old_units=None, new_units=None, modified=None): |
383 | - old_actual, new_actual, modified_actual = result |
384 | - for value, expected in zip(result, [old_units, new_units, modified]): |
385 | - if expected is None: |
386 | - self.assertFalse(value) |
387 | - continue |
388 | - for value_unit, expected_unit in zip(value, expected): |
389 | - if isinstance(expected_unit, tuple): |
390 | - self.assertEqual( |
391 | - value_unit[0], expected_unit[0].unit_name) |
392 | - self.assertEqual(value_unit[1], expected_unit[1]) |
393 | - else: |
394 | - self.assertEqual(value_unit, expected_unit.unit_name) |
395 | - |
396 | @inlineCallbacks |
397 | def test_properties(self): |
398 | states = yield self.add_relation_service_unit("webcache", "varnish") |
399 | @@ -822,36 +871,29 @@ |
400 | |
401 | yield self.add_opposite_service_unit(wordpress_states) |
402 | |
403 | - results = [] |
404 | - wait_callback = [Deferred() for i in range(5)] |
405 | - |
406 | - def watch_related(old_units=None, new_units=None, modified=None): |
407 | - results.append((old_units, new_units, modified)) |
408 | - wait_callback[len(results)-1].callback(True) |
409 | - |
410 | - def invoked(*args, **kw): |
411 | - # sleep to make sure that things haven't fired till the watch is |
412 | - # in place. |
413 | - time.sleep(0.1) |
414 | - self.assertFalse(results) |
415 | - |
416 | + checker = WatchChecker(self) |
417 | mock_client = self.mocker.patch(self.client) |
418 | mock_client.get_children_and_watch("/relations/%s/server" % ( |
419 | wordpress_states["relation"].internal_id)) |
420 | |
421 | + def invoked(*args, **kw): |
422 | + # sleep to make sure that things haven't fired till the watch is |
423 | + # in place. |
424 | + time.sleep(0.1) |
425 | + checker.assert_cb_count(0) |
426 | + |
427 | self.mocker.call(invoked) |
428 | self.mocker.passthrough() |
429 | self.mocker.replay() |
430 | |
431 | - watcher = yield wordpress_states["unit_relation"].watch_related_units( |
432 | - watch_related) |
433 | + watcher = yield checker.watch(wordpress_states["unit_relation"]) |
434 | yield watcher.start() |
435 | - yield wait_callback[0] |
436 | + yield checker.wait_for_cb(0) |
437 | |
438 | @inlineCallbacks |
439 | def test_watch_start_new_service(self): |
440 | """Invoking watcher.start returns a deferred that only fires |
441 | - after watch on the containr is in place. In the case of a new |
442 | + after watch on the container is in place. In the case of a new |
443 | service this after an existance watch is established on the |
444 | container. |
445 | """ |
446 | @@ -860,28 +902,23 @@ |
447 | wordpress_states = yield self.add_relation_service_unit_from_endpoints( |
448 | wordpress_ep) |
449 | |
450 | - results = [] |
451 | - |
452 | - def watch_related(old_units=None, new_units=None, modified=None): |
453 | - results.append((old_units, new_units, modified)) |
454 | - |
455 | - def invoked(*args, **kw): |
456 | - # sleep to make sure that things haven't fired till the watch is |
457 | - # in place. |
458 | - time.sleep(0.1) |
459 | - self.assertFalse(results) |
460 | - |
461 | + checker = WatchChecker(self) |
462 | mock_client = self.mocker.patch(self.client) |
463 | mock_client.exists_and_watch("/relations/%s/server" % ( |
464 | wordpress_states["relation"].internal_id)) |
465 | |
466 | + def invoked(*args, **kw): |
467 | + # sleep to make sure that things haven't fired till the watch is |
468 | + # in place. |
469 | + time.sleep(0.1) |
470 | + checker.assert_cb_count(0) |
471 | + |
472 | self.mocker.call(invoked) |
473 | self.mocker.passthrough() |
474 | self.mocker.replay() |
475 | |
476 | - watcher = yield wordpress_states["unit_relation"].watch_related_units( |
477 | - watch_related) |
478 | - yield watcher.start().addCallback(lambda x: results.append(True)) |
479 | + watcher = yield checker.watch(wordpress_states["unit_relation"]) |
480 | + yield watcher.start() |
481 | |
482 | @inlineCallbacks |
483 | def test_watch_client_server_with_new_service(self): |
484 | @@ -896,16 +933,8 @@ |
485 | wordpress_states = yield self.add_relation_service_unit_from_endpoints( |
486 | wordpress_ep, mysql_ep) |
487 | |
488 | - # setup watch callbacks, and start watching. |
489 | - wait_callback = [Deferred() for i in range(5)] |
490 | - results = [] |
491 | - |
492 | - def watch_related(old_units=None, new_units=None, modified=None): |
493 | - results.append((old_units, new_units, modified)) |
494 | - wait_callback[len(results) - 1].callback(True) |
495 | - |
496 | - watcher = yield wordpress_states["unit_relation"].watch_related_units( |
497 | - watch_related) |
498 | + checker = WatchChecker(self) |
499 | + watcher = yield checker.watch(wordpress_states["unit_relation"]) |
500 | yield watcher.start() |
501 | |
502 | # adding another unit of wordpress, does not cause any changes |
503 | @@ -915,13 +944,11 @@ |
504 | |
505 | # give chance for accidental watch firing. |
506 | yield self.sleep(0.1) |
507 | - |
508 | - # assert no firing of callback |
509 | - self.assertFalse(results) |
510 | + checker.assert_cb_count(0) |
511 | |
512 | # add the server service and a unit of that |
513 | - mysql_states = yield self.add_opposite_service_unit( |
514 | - wordpress_states) |
515 | + mysql_states = yield self.add_opposite_service_unit(wordpress_states) |
516 | + mysql_unit = mysql_states["unit"] |
517 | |
518 | topology = yield self.get_topology() |
519 | # assert the relation is established correctly |
520 | @@ -929,30 +956,19 @@ |
521 | wordpress_states["relation"].internal_id) |
522 | self.assertEqual(len(services), 2) |
523 | |
524 | - # wait for callback |
525 | - yield wait_callback[0] |
526 | - self.verify_unit_watch_result( |
527 | - results[0], old_units=[], new_units=[mysql_states["unit"]]) |
528 | - |
529 | - # wait for initial settings version |
530 | - yield wait_callback[1] |
531 | - self.verify_unit_watch_result( |
532 | - results[1], modified=((mysql_states["unit"], 0),)) |
533 | - |
534 | - # modify the unit and get another callback |
535 | + # wait for initial callbacks |
536 | + yield checker.assert_members_cb(0, [], [mysql_unit]) |
537 | + yield checker.assert_settings_cb(1, mysql_unit, 0) |
538 | + |
539 | + # modify unit, wait for callback |
540 | yield mysql_states["unit_relation"].set_data(dict(hello="world")) |
541 | - yield wait_callback[2] |
542 | - self.verify_unit_watch_result( |
543 | - results[2], modified=((mysql_states["unit"], 1),)) |
544 | + yield checker.assert_settings_cb(2, mysql_unit, 1) |
545 | |
546 | - # add another unit of mysql |
547 | + # add another unit, wait for callback |
548 | mysql_unit2 = yield mysql_states["service"].add_unit_state() |
549 | yield mysql_states["service_relation"].add_unit_state(mysql_unit2) |
550 | - yield wait_callback[3] |
551 | - self.verify_unit_watch_result( |
552 | - results[3], |
553 | - old_units=[mysql_states["unit"]], |
554 | - new_units=[mysql_states["unit"], mysql_unit2]) |
555 | + yield checker.assert_members_cb( |
556 | + 3, [mysql_unit], [mysql_unit, mysql_unit2]) |
557 | |
558 | @inlineCallbacks |
559 | def test_watch_client_server_with_existing_service(self): |
560 | @@ -969,34 +985,15 @@ |
561 | wordpress_ep, mysql_ep) |
562 | mysql_states = yield self.add_opposite_service_unit( |
563 | wordpress_states) |
564 | - |
565 | - # setup callbacks and start observing. |
566 | - wait_callback = [Deferred() for i in range(5)] |
567 | - results = [] |
568 | - |
569 | - def watch_related(old_units=None, new_units=None, modified=None): |
570 | - results.append((old_units, new_units, modified)) |
571 | - wait_callback[len(results)-1].callback(True) |
572 | - |
573 | - watcher = yield wordpress_states["unit_relation"].watch_related_units( |
574 | - watch_related) |
575 | + mysql_unit = mysql_states["unit"] |
576 | + |
577 | + checker = WatchChecker(self) |
578 | + watcher = yield checker.watch(wordpress_states["unit_relation"]) |
579 | yield watcher.start() |
580 | - |
581 | - # wait for initial callback |
582 | - yield wait_callback[0] |
583 | - self.verify_unit_watch_result( |
584 | - results[0], old_units=[], new_units=[mysql_states["unit"]]) |
585 | - |
586 | - # and initial settings version notification |
587 | - yield wait_callback[1] |
588 | - self.verify_unit_watch_result( |
589 | - results[1], modified=((mysql_states["unit"], 0),)) |
590 | - |
591 | - # modify the unit and get another callback, verify the result. |
592 | + yield checker.assert_members_cb(0, [], [mysql_unit]) |
593 | + yield checker.assert_settings_cb(1, mysql_unit, 0) |
594 | yield mysql_states["unit_relation"].set_data(dict(hello="world")) |
595 | - yield wait_callback[2] |
596 | - self.verify_unit_watch_result( |
597 | - results[2], modified=((mysql_states["unit"], 1),)) |
598 | + yield checker.assert_settings_cb(2, mysql_unit, 1) |
599 | |
600 | # directly delete the presence node to trigger a deletion notification |
601 | self.client.delete("/relations/%s/server/%s" % ( |
602 | @@ -1004,9 +1001,7 @@ |
603 | mysql_states["unit"].internal_id)) |
604 | |
605 | # verify the deletion result. |
606 | - yield wait_callback[3] |
607 | - self.verify_unit_watch_result( |
608 | - results[3], old_units=[mysql_states["unit"]], new_units=[]) |
609 | + yield checker.assert_members_cb(3, [mysql_unit], []) |
610 | |
611 | @inlineCallbacks |
612 | def test_watch_server_client_with_new_service(self): |
613 | @@ -1020,32 +1015,18 @@ |
614 | mysql_states = yield self.add_relation_service_unit_from_endpoints( |
615 | mysql_ep, wordpress_ep) |
616 | |
617 | - # setup callbacks and start observing. |
618 | - wait_callback = [Deferred() for i in range(5)] |
619 | - results = [] |
620 | - |
621 | - def watch_related(old_units=None, new_units=None, modified=None): |
622 | - results.append((old_units, new_units, modified)) |
623 | - wait_callback[len(results)-1].callback(True) |
624 | - |
625 | - watcher = yield mysql_states["unit_relation"].watch_related_units( |
626 | - watch_related) |
627 | + checker = WatchChecker(self) |
628 | + watcher = yield checker.watch(mysql_states["unit_relation"]) |
629 | yield watcher.start() |
630 | - |
631 | - # assert no firing of callback |
632 | - self.assertFalse(results) |
633 | + yield self.sleep(0.1) |
634 | + checker.assert_cb_count(0) |
635 | |
636 | # add the client service and a unit of that |
637 | wordpress_states = yield self.add_opposite_service_unit( |
638 | mysql_states) |
639 | - |
640 | - yield wait_callback[0] |
641 | - self.verify_unit_watch_result( |
642 | - results[0], old_units=[], new_units=[wordpress_states["unit"]]) |
643 | - |
644 | - yield wait_callback[1] |
645 | - self.verify_unit_watch_result( |
646 | - results[1], modified=((wordpress_states["unit"], 0),)) |
647 | + wordpress_unit = wordpress_states["unit"] |
648 | + yield checker.assert_members_cb(0, [], [wordpress_unit]) |
649 | + yield checker.assert_settings_cb(1, wordpress_unit, 0) |
650 | |
651 | @inlineCallbacks |
652 | def test_watch_peer(self): |
653 | @@ -1059,58 +1040,37 @@ |
654 | riak2_unit = yield riak_states["service"].add_unit_state() |
655 | yield riak_states["service_relation"].add_unit_state(riak2_unit) |
656 | |
657 | - # setup callbacks and start observing. |
658 | - wait_callback = [Deferred() for i in range(10)] |
659 | - results = [] |
660 | - |
661 | - def watch_related(old_units=None, new_units=None, modified=None): |
662 | - results.append((old_units, new_units, modified)) |
663 | - wait_callback[len(results)-1].callback(True) |
664 | - |
665 | - watcher = yield riak_states["unit_relation"].watch_related_units( |
666 | - watch_related) |
667 | + checker = WatchChecker(self) |
668 | + watcher = yield checker.watch(riak_states["unit_relation"]) |
669 | yield watcher.start() |
670 | |
671 | # wait for initial callbacks |
672 | - yield wait_callback[0] |
673 | - self.verify_unit_watch_result( |
674 | - results[0], old_units=[], new_units=[riak2_unit]) |
675 | - yield wait_callback[1] |
676 | - self.verify_unit_watch_result( |
677 | - results[1], modified=((riak2_unit, 0),)) |
678 | + yield checker.assert_members_cb(0, [], [riak2_unit]) |
679 | + yield checker.assert_settings_cb(1, riak2_unit, 0) |
680 | |
681 | # verify modifying self does not cause a notification. |
682 | yield riak_states["unit_relation"].set_data(dict(hello="world")) |
683 | yield self.sleep(0.1) |
684 | - self.assertEqual(len(results), 2) |
685 | + checker.assert_cb_count(2) |
686 | |
687 | # add another unit |
688 | riak3_unit = yield riak_states["service"].add_unit_state() |
689 | riak3_relation = yield riak_states["service_relation"].add_unit_state( |
690 | riak3_unit) |
691 | - |
692 | - yield wait_callback[2] |
693 | - self.verify_unit_watch_result( |
694 | - results[2], |
695 | - old_units=[riak2_unit], new_units=[riak2_unit, riak3_unit]) |
696 | - yield wait_callback[3] |
697 | - self.verify_unit_watch_result( |
698 | - results[3], modified=((riak3_unit, 0),)) |
699 | - |
700 | - # add remove one (no api atm, so directly to trigger notification) |
701 | + yield checker.assert_members_cb( |
702 | + 2, [riak2_unit], [riak2_unit, riak3_unit]) |
703 | + yield checker.assert_settings_cb(3, riak3_unit, 0) |
704 | + |
705 | + # remove one (no api atm, so directly to trigger notification) |
706 | yield self.client.delete( |
707 | "/relations/%s/peer/%s" % (riak_states["relation"].internal_id, |
708 | riak2_unit.internal_id)) |
709 | - yield wait_callback[4] |
710 | - self.verify_unit_watch_result( |
711 | - results[4], |
712 | - old_units=[riak2_unit, riak3_unit], new_units=[riak3_unit]) |
713 | + yield checker.assert_members_cb( |
714 | + 4, [riak2_unit, riak3_unit], [riak3_unit]) |
715 | |
716 | # modify one. |
717 | yield riak3_relation.set_data(dict(later="eventually")) |
718 | - yield wait_callback[5] |
719 | - self.verify_unit_watch_result( |
720 | - results[5], modified=((riak3_unit, 1),)) |
721 | + yield checker.assert_settings_cb(5, riak3_unit, 1) |
722 | |
723 | @inlineCallbacks |
724 | def test_watch_role_container_created_concurrently(self): |
725 | @@ -1126,6 +1086,7 @@ |
726 | "mysql", "client-server", "", "server") |
727 | wordpress_states = yield self.add_relation_service_unit_from_endpoints( |
728 | wordpress_ep, mysql_ep) |
729 | + wordpress_unit = wordpress_states["unit"] |
730 | mysql_states = yield self.add_opposite_service_unit( |
731 | wordpress_states) |
732 | |
733 | @@ -1146,23 +1107,10 @@ |
734 | |
735 | self.mocker.replay() |
736 | |
737 | - # setup callbacks and start observing. |
738 | - wait_callback = [Deferred() for i in range(5)] |
739 | - results = [] |
740 | - |
741 | - def watch_related(old_units=None, new_units=None, modified=None): |
742 | - results.append((old_units, new_units, modified)) |
743 | - wait_callback[len(results)-1].callback(True) |
744 | - |
745 | - # Watch |
746 | - watcher = yield mysql_states["unit_relation"].watch_related_units( |
747 | - watch_related) |
748 | + checker = WatchChecker(self) |
749 | + watcher = yield checker.watch(mysql_states["unit_relation"]) |
750 | yield watcher.start() |
751 | - |
752 | - # Verify |
753 | - yield wait_callback[0] |
754 | - self.verify_unit_watch_result( |
755 | - results[0], old_units=[], new_units=[wordpress_states["unit"]]) |
756 | + yield checker.assert_members_cb(0, [], [wordpress_unit]) |
757 | |
758 | @inlineCallbacks |
759 | def test_watch_deleted_modify_notifications(self): |
760 | @@ -1178,48 +1126,31 @@ |
761 | "mysql", "client-server", "", "server") |
762 | wordpress_states = yield self.add_relation_service_unit_from_endpoints( |
763 | wordpress_ep, mysql_ep) |
764 | + wordpress_unit = wordpress_states["unit"] |
765 | mysql_states = yield self.add_opposite_service_unit( |
766 | wordpress_states) |
767 | |
768 | - # setup callbacks and start observing. |
769 | - wait_callback = [Deferred() for i in range(5)] |
770 | - results = [] |
771 | - |
772 | - def watch_related(old_units=None, new_units=None, modified=None): |
773 | - results.append((old_units, new_units, modified)) |
774 | - wait_callback[len(results)-1].callback(True) |
775 | - |
776 | # Start watching |
777 | - watcher = yield mysql_states["unit_relation"].watch_related_units( |
778 | - watch_related) |
779 | + checker = WatchChecker(self) |
780 | + watcher = yield checker.watch(mysql_states["unit_relation"]) |
781 | yield watcher.start() |
782 | - |
783 | - yield wait_callback[0] |
784 | - self.verify_unit_watch_result( |
785 | - results[0], old_units=[], new_units=[wordpress_states["unit"]]) |
786 | - |
787 | - yield wait_callback[1] |
788 | - self.verify_unit_watch_result( |
789 | - results[1], modified=((wordpress_states["unit"], 0),)) |
790 | + yield checker.assert_members_cb(0, [], [wordpress_unit]) |
791 | + yield checker.assert_settings_cb(1, wordpress_unit, 0) |
792 | |
793 | # Delete the presence path |
794 | presence_path = "/relations/%s/client/%s" % ( |
795 | wordpress_states["relation"].internal_id, |
796 | wordpress_states["unit"].internal_id) |
797 | yield self.client.delete(presence_path) |
798 | + yield checker.assert_members_cb(2, [wordpress_unit], []) |
799 | |
800 | # Modify the settings path |
801 | settings_path = self.get_unit_settings_path(wordpress_states) |
802 | yield self.client.set(settings_path, "some random string") |
803 | |
804 | - # Verify only deletion callback |
805 | - yield wait_callback[2] |
806 | - self.verify_unit_watch_result( |
807 | - results[2], old_units=[wordpress_states["unit"]], new_units=[]) |
808 | - |
809 | # Give a moment to ensure we don't see any new callbacks |
810 | yield self.sleep(0.1) |
811 | - self.assertEquals(len(results), 3) |
812 | + checker.assert_cb_count(3) |
813 | |
814 | @inlineCallbacks |
815 | def test_watch_with_settings_deleted(self): |
816 | @@ -1236,29 +1167,15 @@ |
817 | "mysql", "client-server", "", "server") |
818 | wordpress_states = yield self.add_relation_service_unit_from_endpoints( |
819 | wordpress_ep, mysql_ep) |
820 | + wordpress_unit = wordpress_states["unit"] |
821 | mysql_states = yield self.add_opposite_service_unit( |
822 | wordpress_states) |
823 | |
824 | - # setup callbacks and start observing. |
825 | - wait_callback = [Deferred() for i in range(5)] |
826 | - results = [] |
827 | - |
828 | - def watch_related(old_units=None, new_units=None, modified=None): |
829 | - results.append((old_units, new_units, modified)) |
830 | - wait_callback[len(results)-1].callback(True) |
831 | - |
832 | - # Start watching |
833 | - watcher = yield mysql_states["unit_relation"].watch_related_units( |
834 | - watch_related) |
835 | + checker = WatchChecker(self) |
836 | + watcher = yield checker.watch(mysql_states["unit_relation"]) |
837 | yield watcher.start() |
838 | - |
839 | - yield wait_callback[0] |
840 | - self.verify_unit_watch_result( |
841 | - results[0], old_units=[], new_units=[wordpress_states["unit"]]) |
842 | - |
843 | - yield wait_callback[1] |
844 | - self.verify_unit_watch_result( |
845 | - results[1], modified=((wordpress_states["unit"], 0),)) |
846 | + yield checker.assert_members_cb(0, [], [wordpress_unit]) |
847 | + yield checker.assert_settings_cb(1, wordpress_unit, 0) |
848 | |
849 | # Delete the settings path |
850 | settings_path = "/relations/%s/settings/%s" % ( |
851 | @@ -1268,7 +1185,7 @@ |
852 | |
853 | # Verify no callbacks |
854 | yield self.sleep(0.1) |
855 | - self.assertEqual(len(results), 2) |
856 | + checker.assert_cb_count(2) |
857 | |
858 | # Recreate the settings path; this should trigger a callback. |
859 | # Note that this is not likely to happen in reality, and if it does |
860 | @@ -1276,15 +1193,11 @@ |
861 | # to 0, and HookScheduler depends on that value continuing to increase |
862 | # so it can determine whether changes happened while it was inactive. |
863 | yield self.client.create(settings_path, "abc") |
864 | - yield wait_callback[2] |
865 | - self.verify_unit_watch_result( |
866 | - results[2], modified=((wordpress_states["unit"], 0),)) |
867 | + yield checker.assert_settings_cb(2, wordpress_unit, 0) |
868 | |
869 | # And modify it. |
870 | yield self.client.set(settings_path, "123") |
871 | - yield wait_callback[3] |
872 | - self.verify_unit_watch_result( |
873 | - results[3], modified=((wordpress_states["unit"], 1),)) |
874 | + yield checker.assert_settings_cb(3, wordpress_unit, 1) |
875 | |
876 | @inlineCallbacks |
877 | def test_watch_start_stop_start_with_existing_service(self): |
878 | @@ -1304,28 +1217,15 @@ |
879 | "mysql", "client-server", "", "server") |
880 | wordpress_states = yield self.add_relation_service_unit_from_endpoints( |
881 | wordpress_ep, mysql_ep) |
882 | + wordpress_unit = wordpress_states["unit"] |
883 | mysql_states = yield self.add_opposite_service_unit( |
884 | wordpress_states) |
885 | |
886 | - # setup callbacks and start observing. |
887 | - wait_callback = [Deferred() for i in range(5)] |
888 | - results = [] |
889 | - |
890 | - def watch_related(old_units=None, new_units=None, modified=None): |
891 | - results.append((old_units, new_units, modified)) |
892 | - wait_callback[len(results)-1].callback(True) |
893 | - |
894 | - # Start watching |
895 | - watcher = yield mysql_states["unit_relation"].watch_related_units( |
896 | - watch_related) |
897 | + checker = WatchChecker(self) |
898 | + watcher = yield checker.watch(mysql_states["unit_relation"]) |
899 | yield watcher.start() |
900 | - |
901 | - yield wait_callback[0] |
902 | - self.verify_unit_watch_result( |
903 | - results[0], old_units=[], new_units=[wordpress_states["unit"]]) |
904 | - yield wait_callback[1] |
905 | - self.verify_unit_watch_result( |
906 | - results[1], modified=((wordpress_states["unit"], 0),)) |
907 | + yield checker.assert_members_cb(0, [], [wordpress_unit]) |
908 | + yield checker.assert_settings_cb(1, wordpress_unit, 0) |
909 | |
910 | # Stop watching |
911 | watcher.stop() |
912 | @@ -1333,26 +1233,20 @@ |
913 | # Add a new unit |
914 | wordpress2_states = yield self.add_related_service_unit( |
915 | wordpress_states) |
916 | + wordpress2_unit = wordpress2_states["unit"] |
917 | |
918 | - # Modify a unit |
919 | + # Modify a unit (this change will not be detected, ever) |
920 | yield wordpress_states["unit_relation"].set_data(dict(hello="world")) |
921 | |
922 | # Verify no callbacks |
923 | yield self.sleep(0.1) |
924 | - self.assertEqual(len(results), 2) |
925 | + checker.assert_cb_count(2) |
926 | |
927 | - # Start watching |
928 | + # Start watching again; watch for addition |
929 | yield watcher.start() |
930 | - |
931 | - # Verify we see the addition. |
932 | - yield wait_callback[2] |
933 | - self.verify_unit_watch_result( |
934 | - results[2], |
935 | - old_units=[wordpress_states["unit"]], |
936 | - new_units=[wordpress_states["unit"], wordpress2_states["unit"]]) |
937 | - yield wait_callback[3] |
938 | - self.verify_unit_watch_result( |
939 | - results[3], modified=((wordpress2_states["unit"], 0),)) |
940 | + yield checker.assert_members_cb( |
941 | + 2, [wordpress_unit], [wordpress_unit, wordpress2_unit]) |
942 | + yield checker.assert_settings_cb(3, wordpress2_unit, 0) |
943 | |
944 | @inlineCallbacks |
945 | def test_watch_start_stop_start_with_new_service(self): |
946 | @@ -1373,48 +1267,37 @@ |
947 | mysql_states = yield self.add_relation_service_unit_from_endpoints( |
948 | mysql_ep, wordpress_ep) |
949 | |
950 | - # setup callbacks and start observing. |
951 | - wait_callback = [Deferred() for i in range(5)] |
952 | - results = [] |
953 | - |
954 | - def watch_related(old_units=None, new_units=None, modified=None): |
955 | - results.append((old_units, new_units, modified)) |
956 | - wait_callback[len(results)-1].callback(True) |
957 | - |
958 | - # Start watching |
959 | - watcher = yield mysql_states["unit_relation"].watch_related_units( |
960 | - watch_related) |
961 | + checker = WatchChecker(self) |
962 | + watcher = yield checker.watch(mysql_states["unit_relation"]) |
963 | yield watcher.start() |
964 | - |
965 | - # Stop watching |
966 | watcher.stop() |
967 | |
968 | - # Add the new service and a unit |
969 | + # Add the new service and 2 units |
970 | wordpress_states = yield self.add_opposite_service_unit( |
971 | mysql_states) |
972 | - |
973 | - # Add another unit |
974 | wordpress2_states = yield self.add_related_service_unit( |
975 | wordpress_states) |
976 | + wordpress_unit = wordpress_states["unit"] |
977 | + wordpress2_unit = wordpress2_states["unit"] |
978 | |
979 | # Modify a unit |
980 | yield wordpress_states["unit_relation"].set_data(dict(hello="world")) |
981 | |
982 | # Verify no callbacks |
983 | yield self.sleep(0.1) |
984 | - self.assertEqual(len(results), 0) |
985 | + checker.assert_cb_count(0) |
986 | |
987 | # Start watching |
988 | yield watcher.start() |
989 | - |
990 | - # Wait a moment for callback |
991 | - yield wait_callback[0] |
992 | - |
993 | - self.assertEqual(len(results), 1) |
994 | - self.verify_unit_watch_result( |
995 | - results[0], |
996 | - old_units=[], |
997 | - new_units=[wordpress_states["unit"], wordpress2_states["unit"]]) |
998 | + yield checker.assert_members_cb( |
999 | + 0, [], [wordpress_unit, wordpress2_unit]) |
1000 | + |
1001 | + # We expect a settings callback for each... |
1002 | + yield checker.wait_for_cb(2) |
1003 | + |
1004 | + # ...but only one |
1005 | + yield self.sleep(0.1) |
1006 | + checker.assert_cb_count(3) |
1007 | |
1008 | @inlineCallbacks |
1009 | def test_watch_user_callback_invocation_delays_node_watch(self): |
1010 | @@ -1430,18 +1313,8 @@ |
1011 | riak_states = yield self.add_relation_service_unit( |
1012 | "riak", "riak", "kvstore", "peer") |
1013 | |
1014 | - wait_callback = [Deferred() for i in range(10)] |
1015 | - finish_callback = [Deferred() for i in range(10)] |
1016 | - |
1017 | - results = [] |
1018 | - |
1019 | - def watch_related(old_units=None, new_units=None, modified=None): |
1020 | - results.append((old_units, new_units, modified)) |
1021 | - wait_callback[len(results)-1].callback(True) |
1022 | - return finish_callback[len(results)-1] |
1023 | - |
1024 | - watcher = yield riak_states["unit_relation"].watch_related_units( |
1025 | - watch_related) |
1026 | + checker = WatchChecker(self, block_cbs=True) |
1027 | + watcher = yield checker.watch(riak_states["unit_relation"]) |
1028 | yield watcher.start() |
1029 | |
1030 | # Create a new unit and add it to the relation. |
1031 | @@ -1450,60 +1323,53 @@ |
1032 | riak_unit2) |
1033 | |
1034 | # Wait for it |
1035 | - yield wait_callback[0] |
1036 | - self.verify_unit_watch_result( |
1037 | - results[0], old_units=[], new_units=[riak_unit2]) |
1038 | + yield checker.assert_members_cb(0, [], [riak_unit2]) |
1039 | |
1040 | # We are also expecting a notification for the initial settings version |
1041 | # ...but we won't get that until the first callback is done |
1042 | self.sleep(0.1) |
1043 | - self.assertEquals(len(results), 1) |
1044 | + checker.assert_cb_count(1) |
1045 | |
1046 | # While we wait for this, someone modifies the settings |
1047 | yield riak_unit2_rel.set_data(dict(hello="world")) |
1048 | |
1049 | # Hey, the add callback finished! |
1050 | - finish_callback[0].callback(True) |
1051 | + checker.unblock_cb(0) |
1052 | |
1053 | # OK, now we expect to see the initial setting version callback |
1054 | - yield wait_callback[1] |
1055 | - self.verify_unit_watch_result( |
1056 | - results[1], modified=((riak_unit2, 0),)) |
1057 | + yield checker.assert_settings_cb(1, riak_unit2, 0) |
1058 | |
1059 | # ...but that is also taking a long time, so we shouldn't expect to see |
1060 | # the callback for the explicit modification yet... |
1061 | - self.sleep(0.1) |
1062 | - self.assertEquals(len(results), 2) |
1063 | + yield self.sleep(0.1) |
1064 | + checker.assert_cb_count(2) |
1065 | |
1066 | # ...or, in fact, for this other modification that just happened, which |
1067 | # will be collapsed into the other one... |
1068 | yield riak_unit2_rel.set_data(dict(hello="world 2")) |
1069 | - self.sleep(0.1) |
1070 | - self.assertEquals(len(results), 2) |
1071 | + yield self.sleep(0.1) |
1072 | + checker.assert_cb_count(2) |
1073 | |
1074 | # ...so, we should have 1 callback in progress, and only 1 pending |
1075 | # notification. OK, finish the callback... |
1076 | - finish_callback[1].callback(True) |
1077 | + checker.unblock_cb(1) |
1078 | |
1079 | # ...and wait for the change notification. |
1080 | - yield wait_callback[2] |
1081 | - self.assertEqual(len(results), 3) |
1082 | - self.verify_unit_watch_result( |
1083 | - results[2], modified=((riak_unit2, 2),)) |
1084 | + yield checker.assert_settings_cb(2, riak_unit2, 2) |
1085 | |
1086 | # Finish the callback and verify no other invocations. |
1087 | - finish_callback[2].callback(True) |
1088 | + checker.unblock_cb(2) |
1089 | yield self.sleep(0.1) |
1090 | - self.assertEqual(len(results), 3) |
1091 | + checker.assert_cb_count(3) |
1092 | |
1093 | - # Modify the node again, we should see this change, immediately. |
1094 | + # Modify the node again; we should see this change immediately. |
1095 | yield riak_unit2_rel.set_data(dict(hello="goodbye")) |
1096 | - yield wait_callback[3] |
1097 | - finish_callback[3].callback(True) |
1098 | + yield checker.assert_settings_cb(3, riak_unit2, 3) |
1099 | |
1100 | - self.verify_unit_watch_result( |
1101 | - results[3], modified=((riak_unit2, 3),)) |
1102 | - self.assertEqual(len(results), 4) |
1103 | + # And again, finish the callback and verify no other invocations. |
1104 | + checker.unblock_cb(3) |
1105 | + yield self.sleep(0.1) |
1106 | + checker.assert_cb_count(4) |
1107 | |
1108 | node_path = "/relations/relation-0000000000/settings/unit-0000000001" |
1109 | expected_output = ( |
1110 | @@ -1527,18 +1393,8 @@ |
1111 | riak_states = yield self.add_relation_service_unit( |
1112 | "riak", "riak", "kvstore", "peer") |
1113 | |
1114 | - wait_callback = [Deferred() for i in range(10)] |
1115 | - finish_callback = [Deferred() for i in range(10)] |
1116 | - |
1117 | - results = [] |
1118 | - |
1119 | - def watch_related(old_units=None, new_units=None, modified=None): |
1120 | - results.append((old_units, new_units, modified)) |
1121 | - wait_callback[len(results)-1].callback(True) |
1122 | - return finish_callback[len(results)-1] |
1123 | - |
1124 | - watcher = yield riak_states["unit_relation"].watch_related_units( |
1125 | - watch_related) |
1126 | + checker = WatchChecker(self, block_cbs=True) |
1127 | + watcher = yield checker.watch(riak_states["unit_relation"]) |
1128 | yield watcher.start() |
1129 | |
1130 | # Create a new unit and add it to the relation. |
1131 | @@ -1547,9 +1403,7 @@ |
1132 | riak_unit2) |
1133 | |
1134 | # Wait for it |
1135 | - yield wait_callback[0] |
1136 | - self.verify_unit_watch_result( |
1137 | - results[0], old_units=[], new_units=[riak_unit2]) |
1138 | + yield checker.assert_members_cb(0, [], [riak_unit2]) |
1139 | |
1140 | # Now add a new unit: we won't see it immediately, since the callback |
1141 | # is still executing, but we will have a container change pending |
1142 | @@ -1557,12 +1411,10 @@ |
1143 | yield riak_states["service_relation"].add_unit_state( |
1144 | riak_unit3) |
1145 | |
1146 | - # Finish the first callback, immediately hit the settings version |
1147 | + # Finish the first callback; immediately hit the settings version |
1148 | # callback, which will also take a while |
1149 | - finish_callback[0].callback(True) |
1150 | - yield wait_callback[1] |
1151 | - self.verify_unit_watch_result( |
1152 | - results[1], modified=((riak_unit2, 0),)) |
1153 | + checker.unblock_cb(0) |
1154 | + yield checker.assert_settings_cb(1, riak_unit2, 0) |
1155 | |
1156 | # Adding another unit; will be rolled into the container change we're |
1157 | # already expecting from before |
1158 | @@ -1572,12 +1424,9 @@ |
1159 | |
1160 | # Now release the container callback, and verify the callback |
1161 | # for both the new nodes. |
1162 | - finish_callback[1].callback(True) |
1163 | - yield wait_callback[2] |
1164 | - self.verify_unit_watch_result( |
1165 | - results[2], |
1166 | - old_units=[riak_unit2], |
1167 | - new_units=[riak_unit2, riak_unit3, riak_unit4]) |
1168 | + checker.unblock_cb(1) |
1169 | + yield checker.assert_members_cb( |
1170 | + 2, [riak_unit2], [riak_unit2, riak_unit3, riak_unit4]) |
1171 | |
1172 | @inlineCallbacks |
1173 | def test_watch_concurrent_callback_execution(self): |
1174 | @@ -1585,35 +1434,6 @@ |
1175 | |
1176 | IFF they are not synchronous and not on the same node. |
1177 | """ |
1178 | - #Setup parallel callbacks bookeeping and implementation |
1179 | - wait_callback = dict([(i, Deferred()) for i in range(10)]) |
1180 | - results = {} |
1181 | - success_deferrals = { |
1182 | - 2: Deferred(), |
1183 | - 5: Deferred()} |
1184 | - |
1185 | - @inlineCallbacks |
1186 | - def watch_related( |
1187 | - old_units=None, new_units=None, modified=None, callback_id=None): |
1188 | - success_result = success_deferrals.get(callback_id) |
1189 | - if success_result is not None: |
1190 | - yield success_result |
1191 | - results[callback_id] = (old_units, new_units, modified) |
1192 | - wait_callback[callback_id].callback(True) |
1193 | - |
1194 | - class ParallelWatcherCallback(object): |
1195 | - |
1196 | - def __init__(self): |
1197 | - self.callback_id_sequence = 0 |
1198 | - |
1199 | - def __call__(self, *args, **kw): |
1200 | - callback = functools.partial( |
1201 | - watch_related, callback_id=self.callback_id_sequence) |
1202 | - self.callback_id_sequence += 1 |
1203 | - return callback(*args, **kw) |
1204 | - |
1205 | - watcher_callback = ParallelWatcherCallback() |
1206 | - |
1207 | # Add the relation, services, and related units. |
1208 | wordpress_ep = RelationEndpoint( |
1209 | "wordpress", "client-server", "", "client") |
1210 | @@ -1621,56 +1441,74 @@ |
1211 | "mysql", "client-server", "", "server") |
1212 | wordpress_states = yield self.add_relation_service_unit_from_endpoints( |
1213 | wordpress_ep, mysql_ep) |
1214 | + wordpress_unit = wordpress_states["unit"] |
1215 | mysql_states = yield self.add_opposite_service_unit( |
1216 | wordpress_states) |
1217 | |
1218 | - # Start watching, and give a moment to establish |
1219 | - watcher = yield mysql_states["unit_relation"].watch_related_units( |
1220 | - watcher_callback) |
1221 | + checker = WatchChecker(self, block_cbs=True) |
1222 | + # To verify parallel execution, this checker will make us wait for |
1223 | + # some of the callbacks (2 and 5), but leave the rest unimpeded. |
1224 | + for i in (0, 1, 3, 4, 6): |
1225 | + checker.unblock_cb(i) |
1226 | + |
1227 | + watcher = yield checker.watch(mysql_states["unit_relation"]) |
1228 | yield watcher.start() |
1229 | + yield checker.wait_for_cb(0) |
1230 | + yield checker.wait_for_cb(1) |
1231 | |
1232 | - # Modify a unit |
1233 | + # Modify a unit (blocking callback) |
1234 | yield wordpress_states["unit_relation"].set_data(dict(hello="world")) |
1235 | + yield checker.wait_for_cb(2) |
1236 | + |
1237 | + # Modify the unit again (will wait for previous) |
1238 | + yield wordpress_states["unit_relation"].set_data(dict(hello="world 2")) |
1239 | |
1240 | # Add a unit. |
1241 | wordpress2_states = yield self.add_related_service_unit( |
1242 | wordpress_states) |
1243 | + wordpress2_unit = wordpress2_states["unit"] |
1244 | + yield checker.wait_for_cb(3) |
1245 | + yield checker.wait_for_cb(4) |
1246 | |
1247 | - # Delete a unit |
1248 | + # Delete a unit (blocking callbck) |
1249 | presence_path = "/relations/%s/client/%s" % ( |
1250 | wordpress_states["relation"].internal_id, |
1251 | wordpress_states["unit"].internal_id) |
1252 | yield self.client.delete(presence_path) |
1253 | - |
1254 | - # Verify the parallel execution, give a moment for callbacks to finish. |
1255 | - yield self.sleep(0.1) |
1256 | - self.assertEqual(len(results), 4) |
1257 | - |
1258 | - self.verify_unit_watch_result( |
1259 | - results[0], old_units=[], new_units=[wordpress_states["unit"]]) |
1260 | - self.verify_unit_watch_result( |
1261 | - results[1], modified=((wordpress_states["unit"], 0),)) |
1262 | - self.verify_unit_watch_result( |
1263 | - results[3], |
1264 | - old_units=[wordpress_states["unit"]], |
1265 | - new_units=[wordpress_states["unit"], wordpress2_states["unit"]]) |
1266 | - self.verify_unit_watch_result( |
1267 | - results[4], modified=((wordpress2_states["unit"], 0),)) |
1268 | - |
1269 | - for k, v in success_deferrals.items(): |
1270 | - v.callback(True) |
1271 | - |
1272 | - # Verify all callback results, give a moment for them to finish. |
1273 | - yield wait_callback[2] |
1274 | - yield wait_callback[5] |
1275 | - self.assertEquals(len(results), 6) |
1276 | - |
1277 | - self.verify_unit_watch_result( |
1278 | - results[2], modified=((wordpress_states["unit"], 1),)) |
1279 | - self.verify_unit_watch_result( |
1280 | - results[5], |
1281 | - old_units=[wordpress_states["unit"], wordpress2_states["unit"]], |
1282 | - new_units=[wordpress2_states["unit"]]) |
1283 | + yield checker.wait_for_cb(5) |
1284 | + |
1285 | + # ...and delete the other unit (also blocked) |
1286 | + presence_path = "/relations/%s/client/%s" % ( |
1287 | + wordpress2_states["relation"].internal_id, |
1288 | + wordpress2_states["unit"].internal_id) |
1289 | + yield self.client.delete(presence_path) |
1290 | + |
1291 | + # Verify that all unblocked callbacks have started correctly |
1292 | + yield checker.assert_members_cb(0, [], [wordpress_unit]) |
1293 | + yield checker.assert_settings_cb(1, wordpress_unit, 0) |
1294 | + yield checker.assert_settings_cb(2, wordpress_unit, 1) |
1295 | + yield checker.assert_members_cb( |
1296 | + 3, [wordpress_unit], [wordpress_unit, wordpress2_unit]) |
1297 | + yield checker.assert_settings_cb(4, wordpress2_unit, 0) |
1298 | + yield checker.assert_members_cb( |
1299 | + 5, [wordpress_unit, wordpress2_unit], [wordpress2_unit]) |
1300 | + checker.assert_cb_count(6) |
1301 | + # OK, fine, but cbs 2 and 5 are still blocking. |
1302 | + |
1303 | + # Whoops, looks like the unit got deleted before we could notify the |
1304 | + # second settings change. Check nothing happens: |
1305 | + checker.unblock_cb(2) |
1306 | + yield self.sleep(0.1) |
1307 | + checker.assert_cb_count(6) |
1308 | + |
1309 | + # Now complete processing of the first delete, and check that we then |
1310 | + # *do* get notified of the second delete: |
1311 | + checker.unblock_cb(5) |
1312 | + yield checker.assert_members_cb(6, [wordpress2_unit], []) |
1313 | + |
1314 | + # ...and finally double-check no further callbacks: |
1315 | + yield self.sleep(0.1) |
1316 | + checker.assert_cb_count(7) |
1317 | |
1318 | @inlineCallbacks |
1319 | def test_watch_unknown_relation_role_error(self): |
1320 | @@ -1685,5 +1523,6 @@ |
1321 | self.fail("Should not be called") |
1322 | |
1323 | yield self.failUnlessFailure( |
1324 | - wordpress_states["unit_relation"].watch_related_units(not_called), |
1325 | + wordpress_states["unit_relation"].watch_related_units( |
1326 | + not_called, not_called), |
1327 | UnknownRelationRole) |
1328 | |
1329 | === modified file 'juju/unit/lifecycle.py' |
1330 | --- juju/unit/lifecycle.py 2011-12-05 02:11:54 +0000 |
1331 | +++ juju/unit/lifecycle.py 2011-12-05 02:11:55 +0000 |
1332 | @@ -436,7 +436,8 @@ |
1333 | # Create a watcher if we don't have one yet. |
1334 | if self._watcher is None: |
1335 | self._watcher = yield self._unit_relation.watch_related_units( |
1336 | - self._scheduler.notify_change) |
1337 | + self._scheduler.members_changed, |
1338 | + self._scheduler.settings_changed) |
1339 | # And start the watcher. |
1340 | if watches: |
1341 | yield self._watcher.start() |
rejecting due to abandonment of prereq