Merge lp:~hazmat/pyjuju/resolved-state-api into lp:pyjuju
- resolved-state-api
- Merge into trunk
Status: | Merged |
---|---|
Approved by: | Gustavo Niemeyer |
Approved revision: | 202 |
Merged at revision: | 222 |
Proposed branch: | lp:~hazmat/pyjuju/resolved-state-api |
Merge into: | lp:pyjuju |
Diff against target: |
792 lines (+537/-22) 9 files modified
ensemble/state/errors.py (+25/-0)
ensemble/state/service.py (+194/-2)
ensemble/state/tests/test_errors.py (+20/-1)
ensemble/state/tests/test_service.py (+216/-2)
ensemble/state/tests/test_utils.py (+26/-12)
ensemble/state/utils.py (+18/-2)
ensemble/unit/lifecycle.py (+1/-0)
ensemble/unit/tests/test_workflow.py (+21/-1)
ensemble/unit/workflow.py (+16/-2) |
To merge this branch: | bzr merge lp:~hazmat/pyjuju/resolved-state-api |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Gustavo Niemeyer | Approve | ||
Review via email: mp+58579@code.launchpad.net |
Commit message
Description of the change
I'm not particularly thrilled with the method names, but this branch implements a get/set/del/watch api for unit resolved and unit relation resolved, to enable sending resolve requests to the unit agent.
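As a rough illustration of the get/set/clear semantics described above, here is a minimal in-memory sketch. It is not the ZooKeeper-backed code from the branch: `InMemoryUnitState` and `AlreadyResolved` are hypothetical names made up for this example, and only the `RETRY_HOOKS` / `NO_HOOKS` values and the method names are taken from the diff below.

```python
RETRY_HOOKS = 1000
NO_HOOKS = 1001


class AlreadyResolved(Exception):
    """Hypothetical stand-in for ServiceUnitResolvedAlreadyEnabled."""


class InMemoryUnitState(object):
    """Hypothetical model; the branch keeps this setting in ZooKeeper."""

    def __init__(self, unit_name):
        self.unit_name = unit_name
        self._resolved = None

    def set_resolved(self, retry):
        # Only the two known retry values are accepted.
        if retry not in (RETRY_HOOKS, NO_HOOKS):
            raise ValueError("invalid retry value %r" % (retry,))
        # Setting twice without clearing in between is an error.
        if self._resolved is not None:
            raise AlreadyResolved(self.unit_name)
        self._resolved = {"retry": retry}

    def get_resolved(self):
        # None means no resolved request is pending.
        return self._resolved

    def clear_resolved(self):
        # Clearing an unset value is a no-op.
        self._resolved = None
```

In this sketch a second `set_resolved` call fails until `clear_resolved` has been called, and `clear_resolved` can be called repeatedly without error, which matches what the tests in the diff exercise.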
Kapil Thangavelu (hazmat) wrote:
Excerpts from Gustavo Niemeyer's message of Tue Apr 26 18:04:27 UTC 2011:
> Review: Needs Fixing
> Some comments:
>
>
> [1]
>
> +class ServiceUnitRela
> + """The unit has already been marked resolved.
>
> Documentation needs updating.
how so?
>
> [2]
>
> + def get_resolved(self):
> + """Get the value of the resolved flag if any.
>
> The result of this method isn't a flag.
>
fixed.
> [3]
>
> + def get_resolved(self):
> (...)
> + except zookeeper.
> + # We get to the same end state.
> + returnValue(None)
>
> This is a bit misleading. We're just reading, so there's no "end state" to be
> achieved. The existence of the file is actually what determines if resolved
> was requested or not.
>
fixed.
> [4]
>
> + unit_resolve_path = "/units/
>
> Given that this was used 5 times, it would be handy to have something like:
>
> @property
> def _unit_resolved_path
> return "/units/
>
> Then you can use self._unit_
>
done.
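For reference, the refactoring suggested in [4] looks roughly like the sketch below. `UnitState` is a placeholder class for illustration only; the property name and path format follow the merged diff.

```python
class UnitState(object):
    """Placeholder class; only the path property is of interest here."""

    def __init__(self, internal_id):
        self.internal_id = internal_id

    @property
    def _unit_resolve_path(self):
        # Defined in one place instead of being formatted at each call site.
        return "/units/%s/resolved" % self.internal_id
```

Callers then read `self._unit_resolve_path` wherever the path is needed.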
> [5]
>
> + rel_resolved_path = "/units/
>
> Our file paths so far are all dashed. It'd be good to keep the convention
> inside ZooKeeper too.
>
> May also be factored out into a property as per the point above.
done
>
> [6]
>
> + def set_relation_
> + """Mark a unit's relations as in need of being resolved.
>
> Doesn't this actually mean something like:
>
> "Mark the problem in the unit's relation as being resolved."
>
> IOW, it's not requesting for the problem to be resolved, but stating
> that it has been.
>
It is actually making a request for the problem to be solved, not solving the
problem directly, as there are behavioral aspects to transitions that need be
done inside of the unit agent.
> [7]
>
> + :param relation_map: A map of internal relation ids, to retry booleans.
>
> This feels like a non-ideal API for a couple of reasons:
>
> a) It's exposing internal ids rather than using the relation names as we have
> elsewhere.
>
> b) set_relation_
> resolved, and cache as unresolved.
>
> It's a bit hard to suggest something else without knowing exactly how you
> intend to use. Let's talk about it.
>
As per irc discussion, we've deferred a) to a later time. There's a commit in the branch
which does switch the syntax to
set_relation_
but it's then inconsistent with the get_resolved api, which retrieves a dictionary of
relation ids : retry hook boolean
> [8]
>
> + d1_keys = set(d1.keys())
> + d2_keys = set(d2.keys())
> +
> + must_match = d1_keys.
>
> As a hint, this could be spelled as:
>
> must_match = set(d1)
fixed.
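The hint in [8] works because iterating a dict yields its keys and `set.intersection` accepts any iterable. A minimal standalone sketch of a merge helper written that way follows; it raises `ValueError` as a stand-in, whereas the branch raises `StateChanged`.

```python
def dict_merge(d1, d2):
    """Return the union of two dicts, refusing conflicting values."""
    # Keys present in both dicts must map to equal values.
    must_match = set(d1).intersection(d2)
    for key in must_match:
        if d1[key] != d2[key]:
            # Stand-in exception; the branch uses StateChanged here.
            raise ValueError("conflicting values for %r" % (key,))
    merged = dict(d1)
    merged.update(d2)
    return merged
```

Neither input dict is modified; the result is a new dict.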
>
> [9]
>
> + running = workflow_state in ("up",)
>
> FWIW, this pattern feels slightly weird when there's a single value.
> workflow_state == "up" would be more straightforward.
>
> [10]
>
> + self.assert...
- 202. By Kapil Thangavelu
-
merge trunk and pep8ify it.
Gustavo Niemeyer (niemeyer) wrote:
Looks good, +1. Just two follow ups on your questioning above:
> > [1]
> >
> > +class ServiceUnitRela
> > + """The unit has already been marked resolved.
> >
> > Documentation needs updating.
>
> how so?
s/unit/relation/
> > [6]
> >
> > + def set_relation_
> > + """Mark a unit's relations as in need of being resolved.
> >
> > Doesn't this actually mean something like:
> >
> > "Mark the problem in the unit's relation as being resolved."
> >
> > IOW, it's not requesting for the problem to be resolved, but stating
> > that it has been.
> >
>
> It is actually making a request for the problem to be solved, not solving the
> problem directly, as there are behavioral aspects to transitions that need be
> done inside of the unit agent.
By the time someone calls "resolved", the *problem* which caused the hook to fail
must already have been fixed (hence the 'd' in "resolved"). We're not fixing the
problem with that action, we're simply stating that it's fine to go ahead because
someone else hopefully already fixed it.
- 203. By Kapil Thangavelu
-
merge trunk conflict
- 204. By Kapil Thangavelu
-
merge trunk
- 205. By Kapil Thangavelu
-
document set_relation_resolved api todo discussion
- 206. By Kapil Thangavelu
-
doc string cleanups per review.
Preview Diff
1 | === modified file 'ensemble/state/errors.py' |
2 | --- ensemble/state/errors.py 2011-05-04 07:40:25 +0000 |
3 | +++ ensemble/state/errors.py 2011-05-05 16:20:19 +0000 |
4 | @@ -21,6 +21,7 @@ |
5 | """Exception value to denote watching should stop. |
6 | """ |
7 | |
8 | + |
9 | class StateNotFound(StateError): |
10 | """State not found. |
11 | |
12 | @@ -151,6 +152,30 @@ |
13 | self.unit_name |
14 | |
15 | |
16 | +class ServiceUnitResolvedAlreadyEnabled(StateError): |
17 | + """The unit has already been marked resolved. |
18 | + """ |
19 | + |
20 | + def __init__(self, unit_name): |
21 | + self.unit_name = unit_name |
22 | + |
23 | + def __str__(self): |
24 | + return "Service unit %r is already marked as resolved." % ( |
25 | + self.unit_name) |
26 | + |
27 | + |
28 | +class ServiceUnitRelationResolvedAlreadyEnabled(StateError): |
29 | + """The relation has already been marked resolved. |
30 | + """ |
31 | + |
32 | + def __init__(self, unit_name): |
33 | + self.unit_name = unit_name |
34 | + |
35 | + def __str__(self): |
36 | + return "Service unit %r already has relations marked as resolved." % ( |
37 | + self.unit_name) |
38 | + |
39 | + |
40 | class RelationAlreadyExists(StateError): |
41 | |
42 | def __init__(self, *endpoints): |
43 | |
44 | === modified file 'ensemble/state/service.py' |
45 | --- ensemble/state/service.py 2011-05-05 09:41:11 +0000 |
46 | +++ ensemble/state/service.py 2011-05-05 16:20:19 +0000 |
47 | @@ -15,11 +15,16 @@ |
48 | StateChanged, ServiceStateNotFound, ServiceUnitStateNotFound, |
49 | ServiceUnitStateMachineAlreadyAssigned, ServiceStateNameInUse, |
50 | BadDescriptor, BadServiceStateName, NoUnusedMachines, |
51 | - ServiceUnitDebugAlreadyEnabled) |
52 | + ServiceUnitDebugAlreadyEnabled, ServiceUnitResolvedAlreadyEnabled, |
53 | + ServiceUnitRelationResolvedAlreadyEnabled) |
54 | from ensemble.state.formula import FormulaStateManager |
55 | from ensemble.state.relation import ServiceRelationState, RelationStateManager |
56 | from ensemble.state.machine import _public_machine_id, MachineState |
57 | -from ensemble.state.utils import remove_tree, YAMLState |
58 | + |
59 | +from ensemble.state.utils import remove_tree, dict_merge, YAMLState |
60 | + |
61 | +RETRY_HOOKS = 1000 |
62 | +NO_HOOKS = 1001 |
63 | |
64 | |
65 | class ServiceStateManager(StateBase): |
66 | @@ -877,6 +882,193 @@ |
67 | callback_d.addCallback( |
68 | lambda x: watch_d.addCallback(watcher) and x) |
69 | |
70 | + @property |
71 | + def _unit_resolve_path(self): |
72 | + return "/units/%s/resolved" % self.internal_id |
73 | + |
74 | + @inlineCallbacks |
75 | + def set_resolved(self, retry): |
76 | + """Mark the unit as in need of being resolved. |
77 | + |
78 | + :param retry: A boolean denoting if hooks should fire as a result |
79 | + of the retry. |
80 | + |
81 | + The resolved setting is set by the command line to inform |
82 | + a unit to attempt a retry transition from an error state. |
83 | + """ |
84 | + |
85 | + if retry not in (RETRY_HOOKS, NO_HOOKS): |
86 | + raise ValueError("invalid retry value %r" % retry) |
87 | + |
88 | + try: |
89 | + yield self._client.create( |
90 | + self._unit_resolve_path, yaml.safe_dump({"retry": retry})) |
91 | + except zookeeper.NodeExistsException: |
92 | + raise ServiceUnitResolvedAlreadyEnabled(self.unit_name) |
93 | + |
94 | + @inlineCallbacks |
95 | + def get_resolved(self): |
96 | + """Get the value of the resolved setting if any. |
97 | + |
98 | + The resolved setting is retrieved by the unit agent and if |
99 | + found instructs it to attempt a retry transition from an |
100 | + error state. |
101 | + """ |
102 | + try: |
103 | + content, stat = yield self._client.get(self._unit_resolve_path) |
104 | + except zookeeper.NoNodeException: |
105 | + # Return a default value. |
106 | + returnValue(None) |
107 | + returnValue(yaml.load(content)) |
108 | + |
109 | + @inlineCallbacks |
110 | + def clear_resolved(self): |
111 | + """Remove any resolved setting on the unit.""" |
112 | + try: |
113 | + yield self._client.delete(self._unit_resolve_path) |
114 | + except zookeeper.NoNodeException: |
115 | + # We get to the same end state. |
116 | + pass |
117 | + |
118 | + @inlineCallbacks |
119 | + def watch_resolved(self, callback): |
120 | + """Set a callback to be invoked when a unit is marked resolved. |
121 | + |
122 | + :param callback: The callback receives a single parameter, the |
123 | + change event. The watcher always receives an initial |
124 | + boolean value invocation denoting the existence of the |
125 | + resolved setting. Subsequent invocations will be with change |
126 | + events. |
127 | + """ |
128 | + @inlineCallbacks |
129 | + def watcher(change_event): |
130 | + if not self._client.connected: |
131 | + returnValue(None) |
132 | + exists_d, watch_d = self._client.exists_and_watch( |
133 | + self._unit_resolve_path) |
134 | + yield callback(change_event) |
135 | + watch_d.addCallback(watcher) |
136 | + |
137 | + exists_d, watch_d = self._client.exists_and_watch( |
138 | + self._unit_resolve_path) |
139 | + exists = yield exists_d |
140 | + |
141 | + # Setup the watch deferred callback after the user defined callback |
142 | + # has returned successfully from the existence invocation. |
143 | + callback_d = maybeDeferred(callback, bool(exists)) |
144 | + callback_d.addCallback( |
145 | + lambda x: watch_d.addCallback(watcher) and x) |
146 | + |
147 | + @property |
148 | + def _relation_resolved_path(self): |
149 | + return "/units/%s/relation-resolved" % self.internal_id |
150 | + |
151 | + @inlineCallbacks |
152 | + def set_relation_resolved(self, relation_map): |
153 | + """Mark a unit's relations as being resolved. |
154 | + |
155 | + The unit agent will watch this setting and unblock the unit, |
156 | + via manipulation of the unit workflow and lifecycle. |
157 | + |
158 | + :param relation_map: A map of internal relation ids, to retry hook |
159 | + values either ensemble.state.service.NO_HOOKS or |
160 | + RETRY_HOOKS. |
161 | + |
162 | + TODO: |
163 | + The api currently takes internal relation ids, this should be |
164 | + cleaned up with a refactor to state request protocol objects. |
165 | + Only public names should be exposed beyond the state api. |
166 | + |
167 | + There's an ongoing discussion on whether this needs to support |
168 | + retries. Currently it doesn't, and the arg to this |
169 | + method could just be a list of relations. Supporting retries |
170 | + would mean capturing enough information to retry the hook, and |
171 | + has reconciliation issues wrt to what's current at the time of |
172 | + re-execution. The existing hook scheduler automatically |
173 | + performs merges of redundant events. The retry could execute a |
174 | + relation change hook, for a remote unit that has already |
175 | + departed at the time of re-execution (and for which we have |
176 | + a pending hook execution), which would be inconsistent, wrt |
177 | + to what would be exposed via the hook cli api. With support |
178 | + for on disk persistence and recovery, some of this temporal |
179 | + synchronization would already be in place. |
180 | + """ |
181 | + if not isinstance(relation_map, dict): |
182 | + raise ValueError( |
183 | + "Relation map must be a dictionary %r" % relation_map) |
184 | + |
185 | + if [v for v in relation_map.values() if v not in ( |
186 | + RETRY_HOOKS, NO_HOOKS)]: |
187 | + |
188 | + print relation_map |
189 | + raise ValueError("Invalid setting for retry hook") |
190 | + |
191 | + def update_relation_resolved(content, stat): |
192 | + if not content: |
193 | + return yaml.safe_dump(relation_map) |
194 | + |
195 | + content = yaml.safe_dump( |
196 | + dict_merge(yaml.load(content), relation_map)) |
197 | + return content |
198 | + |
199 | + try: |
200 | + yield retry_change( |
201 | + self._client, |
202 | + self._relation_resolved_path, |
203 | + update_relation_resolved) |
204 | + except StateChanged: |
205 | + raise ServiceUnitRelationResolvedAlreadyEnabled(self.unit_name) |
206 | + returnValue(True) |
207 | + |
208 | + @inlineCallbacks |
209 | + def get_relation_resolved(self): |
210 | + """Retrieve any resolved flags set for this unit's relations. |
211 | + """ |
212 | + try: |
213 | + content, stat = yield self._client.get( |
214 | + self._relation_resolved_path) |
215 | + except zookeeper.NoNodeException: |
216 | + returnValue(None) |
217 | + returnValue(yaml.load(content)) |
218 | + |
219 | + @inlineCallbacks |
220 | + def clear_relation_resolved(self): |
221 | + try: |
222 | + yield self._client.delete(self._relation_resolved_path) |
223 | + except zookeeper.NoNodeException: |
224 | + # We get to the same end state. |
225 | + pass |
226 | + |
227 | + @inlineCallbacks |
228 | + def watch_relation_resolved(self, callback): |
229 | + """Set a callback to be invoked when a unit's relations are resolved. |
230 | + |
231 | + :param callback: The callback receives a single parameter, the |
232 | + change event. The watcher always receives an initial |
233 | + boolean value invocation denoting the existence of the |
234 | + resolved setting. Subsequent invocations will be with change |
235 | + events. |
236 | + """ |
237 | + @inlineCallbacks |
238 | + def watcher(change_event): |
239 | + if not self._client.connected: |
240 | + returnValue(None) |
241 | + exists_d, watch_d = self._client.exists_and_watch( |
242 | + self._relation_resolved_path) |
243 | + yield callback(change_event) |
244 | + watch_d.addCallback(watcher) |
245 | + |
246 | + exists_d, watch_d = self._client.exists_and_watch( |
247 | + self._relation_resolved_path) |
248 | + |
249 | + exists = yield exists_d |
250 | + |
251 | + # Setup the watch deferred callback after the user defined callback |
252 | + # has returned successfully from the existence invocation. |
253 | + callback_d = maybeDeferred(callback, bool(exists)) |
254 | + callback_d.addCallback( |
255 | + lambda x: watch_d.addCallback(watcher) and x) |
256 | + |
257 | |
258 | def _parse_unit_name(unit_name): |
259 | """Parse a unit's name into the service name and its sequence. |
260 | |
261 | === modified file 'ensemble/state/tests/test_errors.py' |
262 | --- ensemble/state/tests/test_errors.py 2011-02-23 17:47:46 +0000 |
263 | +++ ensemble/state/tests/test_errors.py 2011-05-05 16:20:19 +0000 |
264 | @@ -11,7 +11,9 @@ |
265 | UnitRelationStateAlreadyAssigned, UnknownRelationRole, |
266 | BadDescriptor, DuplicateEndpoints, IncompatibleEndpoints, |
267 | NoMatchingEndpoints, AmbiguousEndpoints, |
268 | - ServiceUnitStateMachineNotAssigned, ServiceUnitDebugAlreadyEnabled) |
269 | + ServiceUnitStateMachineNotAssigned, ServiceUnitDebugAlreadyEnabled, |
270 | + ServiceUnitResolvedAlreadyEnabled, |
271 | + ServiceUnitRelationResolvedAlreadyEnabled) |
272 | |
273 | |
274 | class StateErrorsTest(TestCase): |
275 | @@ -89,6 +91,23 @@ |
276 | str(error), |
277 | "Service unit 'wordpress/0' is already in debug mode.") |
278 | |
279 | + def test_unit_already_in_resolved_mode(self): |
280 | + error = ServiceUnitResolvedAlreadyEnabled("wordpress/0") |
281 | + self.assertIsStateError(error) |
282 | + self.assertEquals(error.unit_name, "wordpress/0") |
283 | + self.assertEquals( |
284 | + str(error), |
285 | + "Service unit 'wordpress/0' is already marked as resolved.") |
286 | + |
287 | + def test_unit_already_in_relation_resolved_mode(self): |
288 | + error = ServiceUnitRelationResolvedAlreadyEnabled("wordpress/0") |
289 | + self.assertIsStateError(error) |
290 | + self.assertEquals(error.unit_name, "wordpress/0") |
291 | + self.assertEquals( |
292 | + str(error), |
293 | + "Service unit %r already has relations marked as resolved." % ( |
294 | + "wordpress/0")) |
295 | + |
296 | def test_service_name_in_use(self): |
297 | error = ServiceStateNameInUse("wordpress") |
298 | self.assertIsStateError(error) |
299 | |
300 | === modified file 'ensemble/state/tests/test_service.py' |
301 | --- ensemble/state/tests/test_service.py 2011-05-05 09:41:11 +0000 |
302 | +++ ensemble/state/tests/test_service.py 2011-05-05 16:20:19 +0000 |
303 | @@ -9,14 +9,15 @@ |
304 | from ensemble.formula.tests.test_metadata import test_repository_path |
305 | from ensemble.state.endpoint import RelationEndpoint |
306 | from ensemble.state.formula import FormulaStateManager |
307 | -from ensemble.state.service import ServiceStateManager |
308 | +from ensemble.state.service import ServiceStateManager, NO_HOOKS, RETRY_HOOKS |
309 | from ensemble.state.machine import MachineStateManager |
310 | from ensemble.state.relation import RelationStateManager |
311 | from ensemble.state.errors import ( |
312 | StateChanged, ServiceStateNotFound, ServiceUnitStateNotFound, |
313 | ServiceUnitStateMachineAlreadyAssigned, ServiceStateNameInUse, |
314 | BadDescriptor, BadServiceStateName, ServiceUnitDebugAlreadyEnabled, |
315 | - MachineStateNotFound, NoUnusedMachines) |
316 | + MachineStateNotFound, NoUnusedMachines, ServiceUnitResolvedAlreadyEnabled, |
317 | + ServiceUnitRelationResolvedAlreadyEnabled) |
318 | |
319 | |
320 | from ensemble.state.tests.common import StateTestBase |
321 | @@ -739,6 +740,213 @@ |
322 | None) |
323 | |
324 | @inlineCallbacks |
325 | + def test_get_set_clear_resolved(self): |
326 | + """A unit can be set to resolved to mark a future transition, with |
327 | + an optional retry flag.""" |
328 | + |
329 | + unit_state = yield self.get_unit_state() |
330 | + |
331 | + self.assertIdentical((yield unit_state.get_resolved()), None) |
332 | + yield unit_state.set_resolved(NO_HOOKS) |
333 | + |
334 | + yield self.assertFailure( |
335 | + unit_state.set_resolved(NO_HOOKS), |
336 | + ServiceUnitResolvedAlreadyEnabled) |
337 | + yield self.assertEqual( |
338 | + |
339 | + (yield unit_state.get_resolved()), {"retry": NO_HOOKS}) |
340 | + |
341 | + yield unit_state.clear_resolved() |
342 | + self.assertIdentical((yield unit_state.get_resolved()), None) |
343 | + yield unit_state.clear_resolved() |
344 | + |
345 | + yield self.assertFailure(unit_state.set_resolved(None), ValueError) |
346 | + |
347 | + @inlineCallbacks |
348 | + def test_watch_resolved(self): |
349 | + """A unit resolved watch can be instituted on a permanent basis.""" |
350 | + unit_state = yield self.get_unit_state() |
351 | + |
352 | + results = [] |
353 | + |
354 | + def callback(value): |
355 | + results.append(value) |
356 | + |
357 | + unit_state.watch_resolved(callback) |
358 | + yield unit_state.set_resolved(RETRY_HOOKS) |
359 | + yield unit_state.clear_resolved() |
360 | + yield unit_state.set_resolved(NO_HOOKS) |
361 | + |
362 | + yield self.poke_zk() |
363 | + |
364 | + self.assertEqual(len(results), 4) |
365 | + self.assertIdentical(results.pop(0), False) |
366 | + self.assertEqual(results.pop(0).type_name, "created") |
367 | + self.assertEqual(results.pop(0).type_name, "deleted") |
368 | + self.assertEqual(results.pop(0).type_name, "created") |
369 | + |
370 | + self.assertEqual( |
371 | + (yield unit_state.get_resolved()), |
372 | + {"retry": NO_HOOKS}) |
373 | + |
374 | + @inlineCallbacks |
375 | + def test_get_set_clear_relation_resolved(self): |
376 | + """A unit's relations can be set to resolved to mark a |
377 | + future transition, with an optional retry flag.""" |
378 | + |
379 | + unit_state = yield self.get_unit_state() |
380 | + |
381 | + self.assertIdentical((yield unit_state.get_relation_resolved()), None) |
382 | + yield unit_state.set_relation_resolved({"0": RETRY_HOOKS}) |
383 | + |
384 | + # Trying to set a conflicting value raises an error |
385 | + yield self.assertFailure( |
386 | + unit_state.set_relation_resolved({"0": NO_HOOKS}), |
387 | + ServiceUnitRelationResolvedAlreadyEnabled) |
388 | + |
389 | + # Doing the same thing is fine |
390 | + yield unit_state.set_relation_resolved({"0": RETRY_HOOKS}) |
391 | + |
392 | + # It's fine to put in new values |
393 | + yield unit_state.set_relation_resolved({"21": RETRY_HOOKS}) |
394 | + yield self.assertEqual( |
395 | + (yield unit_state.get_relation_resolved()), |
396 | + {"0": RETRY_HOOKS, "21": RETRY_HOOKS}) |
397 | + |
398 | + yield unit_state.clear_relation_resolved() |
399 | + self.assertIdentical((yield unit_state.get_relation_resolved()), None) |
400 | + yield unit_state.clear_relation_resolved() |
401 | + |
402 | + yield self.assertFailure( |
403 | + unit_state.set_relation_resolved(True), ValueError) |
404 | + yield self.assertFailure( |
405 | + unit_state.set_relation_resolved(None), ValueError) |
406 | + |
407 | + @inlineCallbacks |
408 | + def test_watch_relation_resolved(self): |
409 | + """A unit relation resolved watch can be instituted on a permanent basis.""" |
410 | + unit_state = yield self.get_unit_state() |
411 | + |
412 | + results = [] |
413 | + |
414 | + def callback(value): |
415 | + results.append(value) |
416 | + |
417 | + unit_state.watch_relation_resolved(callback) |
418 | + yield unit_state.set_relation_resolved({"0": RETRY_HOOKS}) |
419 | + yield unit_state.clear_relation_resolved() |
420 | + yield unit_state.set_relation_resolved({"0": NO_HOOKS}) |
421 | + |
422 | + yield self.poke_zk() |
423 | + |
424 | + self.assertEqual(len(results), 4) |
425 | + self.assertIdentical(results.pop(0), False) |
426 | + self.assertEqual(results.pop(0).type_name, "created") |
427 | + self.assertEqual(results.pop(0).type_name, "deleted") |
428 | + self.assertEqual(results.pop(0).type_name, "created") |
429 | + |
430 | + self.assertEqual( |
431 | + (yield unit_state.get_relation_resolved()), |
432 | + {"0": NO_HOOKS}) |
433 | + |
434 | + @inlineCallbacks |
435 | + def test_watch_resolved_slow_callback(self): |
436 | + """A slow watch callback is still invoked serially.""" |
437 | + unit_state = yield self.get_unit_state() |
438 | + |
439 | + callbacks = [Deferred() for i in range(5)] |
440 | + results = [] |
441 | + contents = [] |
442 | + |
443 | + @inlineCallbacks |
444 | + def watch(value): |
445 | + results.append(value) |
446 | + yield callbacks[len(results) - 1] |
447 | + contents.append((yield unit_state.get_resolved())) |
448 | + |
449 | + yield unit_state.watch_resolved(watch) |
450 | + |
451 | + # These get collapsed into a single event |
452 | + yield unit_state.set_resolved(RETRY_HOOKS) |
453 | + yield unit_state.clear_resolved() |
454 | + yield self.poke_zk() |
455 | + |
456 | + # Verify the callback hasn't completed |
457 | + self.assertEqual(len(results), 1) |
458 | + self.assertEqual(len(contents), 0) |
459 | + |
460 | + # Let it finish |
461 | + callbacks[0].callback(True) |
462 | + yield self.poke_zk() |
463 | + |
464 | + # Verify result counts |
465 | + self.assertEqual(len(results), 2) |
466 | + self.assertEqual(len(contents), 1) |
467 | + |
468 | + # Verify result values. Even though we have a created event, the |
469 | + # setting retrieved shows the hook is not enabled. |
470 | + self.assertEqual(results[-1].type_name, "created") |
471 | + self.assertEqual(contents[-1], None) |
472 | + |
473 | + yield unit_state.set_resolved(NO_HOOKS) |
474 | + callbacks[1].callback(True) |
475 | + yield self.poke_zk() |
476 | + |
477 | + self.assertEqual(len(results), 3) |
478 | + self.assertEqual(contents[-1], {"retry": NO_HOOKS}) |
479 | + |
480 | + # Clear out any pending activity. |
481 | + yield self.poke_zk() |
482 | + |
483 | + @inlineCallbacks |
484 | + def test_watch_relation_resolved_slow_callback(self): |
485 | + """A slow watch callback is still invoked serially.""" |
486 | + unit_state = yield self.get_unit_state() |
487 | + |
488 | + callbacks = [Deferred() for i in range(5)] |
489 | + results = [] |
490 | + contents = [] |
491 | + |
492 | + @inlineCallbacks |
493 | + def watch(value): |
494 | + results.append(value) |
495 | + yield callbacks[len(results) - 1] |
496 | + contents.append((yield unit_state.get_relation_resolved())) |
497 | + |
498 | + yield unit_state.watch_relation_resolved(watch) |
499 | + |
500 | + # These get collapsed into a single event |
501 | + yield unit_state.set_relation_resolved({"0": RETRY_HOOKS}) |
502 | + yield unit_state.clear_relation_resolved() |
503 | + yield self.poke_zk() |
504 | + |
505 | + # Verify the callback hasn't completed |
506 | + self.assertEqual(len(results), 1) |
507 | + self.assertEqual(len(contents), 0) |
508 | + |
509 | + # Let it finish |
510 | + callbacks[0].callback(True) |
511 | + yield self.poke_zk() |
512 | + |
513 | + # Verify result counts |
514 | + self.assertEqual(len(results), 2) |
515 | + self.assertEqual(len(contents), 1) |
516 | + |
517 | + # Verify result values. Even though we have a created event, the |
518 | + # setting retrieved shows the hook is not enabled. |
519 | + self.assertEqual(results[-1].type_name, "created") |
520 | + self.assertEqual(contents[-1], None) |
521 | + |
522 | + yield unit_state.set_relation_resolved({"0": RETRY_HOOKS}) |
523 | + callbacks[1].callback(True) |
524 | + yield self.poke_zk() |
525 | + |
526 | + self.assertEqual(len(results), 3) |
527 | + self.assertEqual(contents[-1], {"0": RETRY_HOOKS}) |
528 | + # Clear out any pending activity. |
529 | + yield self.poke_zk() |
530 | + |
531 | + @inlineCallbacks |
532 | def test_set_and_clear_upgrade_flag(self): |
533 | """An upgrade flag can be set on a unit.""" |
534 | |
535 | @@ -871,6 +1079,9 @@ |
536 | self.assertEqual(results[-1].type_name, "created") |
537 | self.assertEqual(contents[-1], True) |
538 | |
539 | + # Clear out any pending activity. |
540 | + yield self.poke_zk() |
541 | + |
542 | @inlineCallbacks |
543 | def test_enable_debug_hook(self): |
544 | """Unit hook debugging can be enabled on the unit state.""" |
545 | @@ -1063,6 +1274,9 @@ |
546 | self.assertEqual(results[-1].type_name, "created") |
547 | self.assertEqual(contents[-1], {"debug_hooks": ["*"]}) |
548 | |
549 | + # Clear out any pending activity. |
550 | + yield self.poke_zk() |
551 | + |
552 | @inlineCallbacks |
553 | def test_service_unit_agent(self): |
554 | """A service unit state has an associated unit agent.""" |
555 | |
556 | === modified file 'ensemble/state/tests/test_utils.py' |
557 | --- ensemble/state/tests/test_utils.py 2011-05-03 08:54:13 +0000 |
558 | +++ ensemble/state/tests/test_utils.py 2011-05-05 16:20:19 +0000 |
559 | @@ -12,9 +12,10 @@ |
560 | import yaml |
561 | |
562 | from ensemble.lib.testing import TestCase |
563 | -from ensemble.state.errors import StateNotFound |
564 | -from ensemble.state.utils import (PortWatcher, remove_tree, |
565 | +from ensemble.state.errors import StateChanged, StateNotFound |
566 | +from ensemble.state.utils import (PortWatcher, remove_tree, dict_merge, |
567 | get_open_port, YAMLState) |
568 | + |
569 | from ensemble.tests.common import get_test_zookeeper_address |
570 | |
571 | |
572 | @@ -206,6 +207,26 @@ |
573 | self.assertNotIn("zoo", children) |
574 | |
575 | |
576 | +class DictMergeTest(TestCase): |
577 | + |
578 | + def test_merge_no_match(self): |
579 | + self.assertEqual( |
580 | + dict_merge(dict(a=1), dict(b=2)), |
581 | + dict(a=1, b=2)) |
582 | + |
583 | + def test_merge_matching_keys_same_value(self): |
584 | + self.assertEqual( |
585 | + dict_merge(dict(a=1, b=2), dict(b=2, c=1)), |
586 | + dict(a=1, b=2, c=1)) |
587 | + |
588 | + def test_merge_conflict(self): |
589 | + self.assertRaises( |
590 | + StateChanged, |
591 | + dict_merge, |
592 | + dict(a=1, b=3), |
593 | + dict(b=2, c=1)) |
594 | + |
595 | + |
596 | class OpenPortTest(TestCase): |
597 | |
598 | def test_get_open_port(self): |
599 | @@ -301,7 +322,6 @@ |
600 | zk_data = yaml.load(zk_data) |
601 | self.assertEqual(zk_data, options) |
602 | |
603 | - |
604 | @inlineCallbacks |
605 | def test_conflict_on_set(self): |
606 | """Version conflict error tests. |
607 | @@ -357,8 +377,8 @@ |
608 | yield node.read() |
609 | |
610 | options = dict(alpha="beta", one=1) |
611 | - node["alpha"] = "beta" |
612 | - node["one"] = 1 |
613 | + node["alpha"] = "beta" |
614 | + node["one"] = 1 |
615 | yield node.write() |
616 | |
617 | # a local get should reflect proper data |
618 | @@ -369,7 +389,6 @@ |
619 | zk_data = yaml.load(zk_data) |
620 | self.assertEqual(zk_data, options) |
621 | |
622 | - |
623 | @inlineCallbacks |
624 | def test_multiple_reads(self): |
625 | """Calling read resets state to ZK after multiple round-trips.""" |
626 | @@ -415,7 +434,7 @@ |
627 | |
628 | result = node.pop("foo") |
629 | self.assertEqual(result, "bar") |
630 | - self.assertEqual(node, {"alpha": "beta",}) |
631 | + self.assertEqual(node, {"alpha": "beta"}) |
632 | |
633 | node["delta"] = "gamma" |
634 | self.assertEqual(set(node.keys()), set(("alpha", "delta"))) |
635 | @@ -424,7 +443,6 @@ |
636 | self.assertIn(("alpha", "beta"), result) |
637 | self.assertIn(("delta", "gamma"), result) |
638 | |
639 | - |
640 | @inlineCallbacks |
641 | def test_del_empties_state(self): |
642 | d = YAMLState(self.client, self.path) |
643 | @@ -500,12 +518,8 @@ |
644 | yield d1.read() |
645 | self.assertEquals(d1, d2) |
646 | |
647 | - |
648 | @inlineCallbacks |
649 | def test_read_requires_node(self): |
650 | """Validate that read raises when required=True.""" |
651 | d1 = YAMLState(self.client, self.path) |
652 | yield self.assertFailure(d1.read(True), StateNotFound) |
653 | - |
654 | - |
655 | - |
656 | |
657 | === modified file 'ensemble/state/utils.py' |
658 | --- ensemble/state/utils.py 2011-05-03 08:54:13 +0000 |
659 | +++ ensemble/state/utils.py 2011-05-05 16:20:19 +0000 |
660 | @@ -9,8 +9,10 @@ |
661 | import yaml |
662 | import zookeeper |
663 | |
664 | +from ensemble.state.errors import StateChanged |
665 | from ensemble.state.errors import StateNotFound |
666 | |
667 | + |
668 | class PortWatcher(object): |
669 | |
670 | def __init__(self, host, port, timeout, listen=False): |
671 | @@ -91,6 +93,22 @@ |
672 | return port |
673 | |
674 | |
675 | +def dict_merge(d1, d2): |
676 | + """Return a union of dicts if they have no conflicting values. |
677 | + |
678 | + Else raise a StateChanged error. |
679 | + """ |
680 | + must_match = set(d1).intersection(d2) |
681 | + for k in must_match: |
682 | + if not d1[k] == d2[k]: |
683 | + raise StateChanged() |
684 | + |
685 | + d = {} |
686 | + d.update(d1) |
687 | + d.update(d2) |
688 | + return d |
689 | + |
690 | + |
691 | class YAMLState(DictMixin): |
692 | """Provides a dict like interface around a Zookeeper node |
693 | containing serialised YAML data. The dict provided represents the |
694 | @@ -144,7 +162,6 @@ |
695 | if required: |
696 | raise StateNotFound(self._path) |
697 | |
698 | - |
699 | def _check(self): |
700 | """Verify that sync was called for operations which expect it.""" |
701 | if self._pristine_cache is None: |
702 | @@ -164,7 +181,6 @@ |
703 | self._check() |
704 | self._cache[key] = value |
705 | |
706 | - |
707 | def __delitem__(self, key): |
708 | self._check() |
709 | del self._cache[key] |
710 | |
711 | === modified file 'ensemble/unit/lifecycle.py' |
712 | --- ensemble/unit/lifecycle.py 2011-05-05 09:41:11 +0000 |
713 | +++ ensemble/unit/lifecycle.py 2011-05-05 16:20:19 +0000 |
714 | @@ -106,6 +106,7 @@ |
715 | # Stop relation lifecycles |
716 | if self._relations: |
717 | self._log.debug("stopping relation lifecycles") |
718 | + |
719 | for workflow in self._relations.values(): |
720 | yield workflow.transition_state("down") |
721 | |
722 | |
723 | === modified file 'ensemble/unit/tests/test_workflow.py' |
724 | --- ensemble/unit/tests/test_workflow.py 2011-05-05 09:41:11 +0000 |
725 | +++ ensemble/unit/tests/test_workflow.py 2011-05-05 16:20:19 +0000 |
726 | @@ -10,7 +10,7 @@ |
727 | |
728 | from ensemble.unit.workflow import ( |
729 | UnitWorkflowState, RelationWorkflowState, WorkflowStateClient, |
730 | - is_unit_running) |
731 | + is_unit_running, is_relation_running) |
732 | |
733 | |
734 | class WorkflowTestBase(LifecycleTestBase): |
735 | @@ -367,6 +367,26 @@ |
736 | self.state_directory) |
737 | |
738 | @inlineCallbacks |
739 | + def test_is_relation_running(self): |
740 | + """The unit relation's workflow state can be categorized as a |
741 | + boolean. |
742 | + """ |
743 | + running, state = yield is_relation_running( |
744 | + self.client, self.states["unit_relation"]) |
745 | + self.assertIdentical(running, False) |
746 | + self.assertIdentical(state, None) |
747 | + yield self.workflow.fire_transition("start") |
748 | + running, state = yield is_relation_running( |
749 | + self.client, self.states["unit_relation"]) |
750 | + self.assertIdentical(running, True) |
751 | + self.assertEqual(state, "up") |
752 | + yield self.workflow.fire_transition("stop") |
753 | + running, state = yield is_relation_running( |
754 | + self.client, self.states["unit_relation"]) |
755 | + self.assertIdentical(running, False) |
756 | + self.assertEqual(state, "down") |
757 | + |
758 | + @inlineCallbacks |
759 | def test_up_down_cycle(self): |
760 | """The workflow can be transition from up to down, and back. |
761 | """ |
762 | |
763 | === modified file 'ensemble/unit/workflow.py' |
764 | --- ensemble/unit/workflow.py 2011-05-05 09:41:11 +0000 |
765 | +++ ensemble/unit/workflow.py 2011-05-05 16:20:19 +0000 |
766 | @@ -78,12 +78,26 @@ |
767 | """Is the service unit in a running state. |
768 | |
769 | Returns a boolean which is true if the unit is running, and |
770 | - the unit state in two element tuple. |
771 | + the unit workflow state in a two element tuple. |
772 | """ |
773 | workflow_state = yield WorkflowStateClient(client, unit).get_state() |
774 | if not workflow_state: |
775 | returnValue((False, None)) |
776 | - running = workflow_state in ("started",) |
777 | + running = workflow_state == "started" |
778 | + returnValue((running, workflow_state)) |
779 | + |
780 | + |
781 | +@inlineCallbacks |
782 | +def is_relation_running(client, relation): |
783 | + """Is the unit relation in a running state. |
784 | + |
785 | + Returns a boolean which is true if the relation is running, and |
786 | + the unit relation workflow state in a two element tuple. |
787 | + """ |
788 | + workflow_state = yield WorkflowStateClient(client, relation).get_state() |
789 | + if not workflow_state: |
790 | + returnValue((False, None)) |
791 | + running = workflow_state == "up" |
792 | returnValue((running, workflow_state)) |
793 | |
794 |
Some comments:
[1]
+class ServiceUnitRelationResolvedAlreadyEnabled(StateError):
+ """The unit has already been marked resolved.
Documentation needs updating.
[2]
+ def get_resolved(self):
+ """Get the value of the resolved flag if any.
The result of this method isn't a flag.
[3]
+ def get_resolved(self):
(...)
+ except zookeeper.NoNodeException:
+ # We get to the same end state.
+ returnValue(None)
This is a bit misleading. We're just reading, so there's no "end state" to be
achieved. The existence of the file is actually what determines if resolved
was requested or not.
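To illustrate the point, here is a minimal sketch in which a missing node simply means "no resolve request". The `FakeClient` and `NoNode` names are hypothetical in-memory stand-ins, not the real ZooKeeper client or its exception, and the path is only an example.

```python
class NoNode(Exception):
    """Hypothetical stand-in for the client's 'node does not exist' error."""


class FakeClient(object):
    """Hypothetical in-memory stand-in for a ZooKeeper client."""

    def __init__(self):
        self.nodes = {}

    def get(self, path):
        try:
            return self.nodes[path]
        except KeyError:
            raise NoNode(path)


def get_resolved(client, path):
    """Return the resolved setting, or None if none was requested."""
    try:
        return client.get(path)
    except NoNode:
        # No node means resolved was never requested; nothing to converge on.
        return None


client = FakeClient()
before = get_resolved(client, "/units/unit-1/resolved")
client.nodes["/units/unit-1/resolved"] = {"retry": True}
after = get_resolved(client, "/units/unit-1/resolved")
```

The comment in the except branch then describes what the absence of the node means rather than an "end state".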
[4]
+ unit_resolve_path = "/units/%s/resolved" % self.internal_id
Given that this was used 5 times, it would be handy to have something like:
@property
def _unit_resolved_path(self):
    return "/units/%s/resolved" % self.internal_id
Then you can use self._unit_resolved_path everywhere.
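A self-contained sketch of that suggestion follows. The class name `UnitStateSketch` and the example id are assumptions made only so the snippet runs on its own; the real class lives in the branch under review.

```python
class UnitStateSketch(object):
    """Hypothetical stand-in for the unit state class in this branch."""

    def __init__(self, internal_id):
        self.internal_id = internal_id

    @property
    def _unit_resolved_path(self):
        # The only place that knows how the resolved path is laid out.
        return "/units/%s/resolved" % self.internal_id


unit = UnitStateSketch("unit-0000000001")
path = unit._unit_resolved_path
```

With the property in place, a later change to the path layout touches one line instead of five.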
[5]
+ rel_resolved_path = "/units/%s/relation_resolved" % self._internal_id
Our file paths so far are all dashed. It'd be good to keep the convention
inside ZooKeeper too.
May also be factored out into a property as per the point above.
[6]
+ def set_relation_resolved(self, relation_map):
+ """Mark a unit's relations as in need of being resolved.
Doesn't this actually mean something like:
"Mark the problem in the unit's relation as being resolved."
IOW, it's not requesting for the problem to be resolved, but stating
that it has been.
[7]
+ :param relation_map: A map of internal relation ids, to retry booleans.
This feels like a non-ideal API for a couple of reasons:
a) It's exposing internal ids rather than using the relation names as we have
elsewhere.
b) set_relation_resolved({"db": True, "cache": False}) feels like marking db as
resolved, and cache as unresolved.
It's a bit hard to suggest something else without knowing exactly how you
intend to use it. Let's talk about it.
[8]
+ d1_keys = set(d1.keys())
+ d2_keys = set(d2.keys())
+
+ must_match = d1_keys.intersection(d2_keys)
As a hint, this could be spelled as:
must_match = set(d1).intersection(d2)
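The shorter spelling works because iterating a dict yields its keys and set.intersection accepts any iterable. A runnable sketch of the merge helper using it is below; the `StateChanged` class here is a local stand-in so the snippet is self-contained, not the one from ensemble.state.errors.

```python
class StateChanged(Exception):
    """Local stand-in for the branch's StateChanged error."""


def dict_merge(d1, d2):
    """Return the union of two dicts, raising if shared keys disagree."""
    # set(d1) holds d1's keys; intersection() iterates d2's keys directly.
    must_match = set(d1).intersection(d2)
    for key in must_match:
        if d1[key] != d2[key]:
            raise StateChanged()
    merged = dict(d1)
    merged.update(d2)
    return merged


merged = dict_merge({"a": 1, "b": 2}, {"b": 2, "c": 3})

try:
    dict_merge({"a": 1}, {"a": 2})
    conflict = False
except StateChanged:
    conflict = True
```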
[9]
+ running = workflow_state in ("up",)
FWIW, this pattern feels slightly weird when there's a single value.
workflow_state == "up" would be more straightforward.
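A small sketch of why the direct comparison is the safer spelling for a single value: the two forms agree only while the trailing comma is present. The variable names are illustrative.

```python
workflow_state = "up"

# Both spellings agree when the one-element tuple is written correctly.
running_tuple = workflow_state in ("up",)
running_eq = workflow_state == "up"

# Without the trailing comma, ("up") is just the string "up", so `in`
# silently becomes a substring test.
substring_hit = "u" in ("up")
tuple_miss = "u" in ("up",)
```

Here `substring_hit` is true while `tuple_miss` is false, which is the kind of slip the `==` form cannot produce.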
[10]
+ self.assertIdentical(results.pop(0).type_name, "created")
assertIdentical is being overused a bit. There's no reason why these should be
allocated in the same memory location.
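As a sketch of the distinction, assuming assertIdentical checks object identity (`is`) while assertEqual checks value equality (`==`): two strings can hold the same value without being the same object, and whether they share an object is an interpreter detail.

```python
expected = "created"

# Build an equal string at runtime; it need not be the same object.
actual = "".join(["cre", "ated"])

equal = actual == expected         # what assertEqual would check
same_object = actual is expected   # what an identity check would test
```

Only `equal` is guaranteed by the language here, so an equality assertion is the one that matches what the test means. Identity checks remain appropriate for singletons such as True, False and None.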