Merge lp:~hazmat/pyjuju/unit-agent-resolved into lp:pyjuju
- unit-agent-resolved
- Merge into trunk
Status: | Superseded |
---|---|
Proposed branch: | lp:~hazmat/pyjuju/unit-agent-resolved |
Merge into: | lp:pyjuju |
Diff against target: |
2237 lines (+1531/-93) 16 files modified
Makefile (+1/-1) ensemble/control/__init__.py (+2/-0) ensemble/control/resolved.py (+110/-0) ensemble/control/tests/test_resolved.py (+390/-0) ensemble/lib/statemachine.py (+16/-10) ensemble/lib/tests/test_statemachine.py (+9/-0) ensemble/state/errors.py (+24/-0) ensemble/state/service.py (+186/-2) ensemble/state/tests/test_errors.py (+20/-1) ensemble/state/tests/test_service.py (+286/-2) ensemble/state/tests/test_utils.py (+26/-12) ensemble/state/utils.py (+18/-2) ensemble/unit/lifecycle.py (+84/-21) ensemble/unit/tests/test_lifecycle.py (+240/-3) ensemble/unit/tests/test_workflow.py (+23/-3) ensemble/unit/workflow.py (+96/-36) |
To merge this branch: | bzr merge lp:~hazmat/pyjuju/unit-agent-resolved |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Juju Engineering | Pending | ||
Review via email:
|
This proposal has been superseded by a proposal from 2011-05-02.
Commit message
Description of the change
This implements the unit lifecycle resolving unit relations, and additional changes to enable resolution (transition actions receiving hook execution flags).
The branch is large enough that I'm going to split the remaining work (the unit agent resolving unit relations) into another branch.
- 269. By Kapil Thangavelu
-
update unit tests to use relation workflow accessor on lifecycle.
- 270. By Kapil Thangavelu
-
remove passing action transition/state variables
- 271. By Kapil Thangavelu
-
separate transitions for retry with hook
- 272. By Kapil Thangavelu
-
merge trunk resolve conflict.
- 273. By Kapil Thangavelu
-
resolve merge conflict
- 274. By Kapil Thangavelu
-
expand out additional recovery transition actions
- 275. By Kapil Thangavelu
-
complete the change over to more transitions, fill out coverage
- 276. By Kapil Thangavelu
-
Merged ensemble-resolved into unit-agent-resolved.
- 277. By Kapil Thangavelu
-
Merged ensemble-resolved into unit-agent-resolved.
- 278. By Kapil Thangavelu
-
Merged ensemble-resolved into unit-agent-resolved.
- 279. By Kapil Thangavelu
-
address review comments re formatting
Unmerged revisions
Preview Diff
1 | === modified file 'Makefile' | |||
2 | --- Makefile 2011-02-11 14:17:18 +0000 | |||
3 | +++ Makefile 2011-05-02 21:57:29 +0000 | |||
4 | @@ -31,7 +31,7 @@ | |||
5 | 31 | @test -n "$(modified)" && echo $(modified) | xargs pyflakes | 31 | @test -n "$(modified)" && echo $(modified) | xargs pyflakes |
6 | 32 | 32 | ||
7 | 33 | 33 | ||
9 | 34 | modified=$(shell bzr status -S -r ancestor:../trunk |grep -P '^\s*M' | awk '{print $$2;}'| grep -P ".py$$") | 34 | modified=$(shell bzr status -S -r ancestor:../../trunk |grep -P '^\s*M' | awk '{print $$2;}'| grep -P ".py$$") |
10 | 35 | review: | 35 | review: |
11 | 36 | @test -n "$(modified)" && echo $(modified) | xargs $(PEP8) --repeat | 36 | @test -n "$(modified)" && echo $(modified) | xargs $(PEP8) --repeat |
12 | 37 | @test -n "$(modified)" && echo $(modified) | xargs pyflakes | 37 | @test -n "$(modified)" && echo $(modified) | xargs pyflakes |
13 | 38 | 38 | ||
14 | === modified file 'ensemble/control/__init__.py' | |||
15 | --- ensemble/control/__init__.py 2011-04-25 17:53:19 +0000 | |||
16 | +++ ensemble/control/__init__.py 2011-05-02 21:57:29 +0000 | |||
17 | @@ -17,6 +17,7 @@ | |||
18 | 17 | import open_tunnel | 17 | import open_tunnel |
19 | 18 | import remove_relation | 18 | import remove_relation |
20 | 19 | import remove_unit | 19 | import remove_unit |
21 | 20 | import resolved | ||
22 | 20 | import status | 21 | import status |
23 | 21 | import shutdown | 22 | import shutdown |
24 | 22 | import ssh | 23 | import ssh |
25 | @@ -39,6 +40,7 @@ | |||
26 | 39 | open_tunnel, | 40 | open_tunnel, |
27 | 40 | remove_relation, | 41 | remove_relation, |
28 | 41 | remove_unit, | 42 | remove_unit, |
29 | 43 | resolved, | ||
30 | 42 | status, | 44 | status, |
31 | 43 | ssh, | 45 | ssh, |
32 | 44 | shutdown, | 46 | shutdown, |
33 | 45 | 47 | ||
34 | === added file 'ensemble/control/resolved.py' | |||
35 | --- ensemble/control/resolved.py 1970-01-01 00:00:00 +0000 | |||
36 | +++ ensemble/control/resolved.py 2011-05-02 21:57:29 +0000 | |||
37 | @@ -0,0 +1,110 @@ | |||
38 | 1 | """Implementation of resolved subcommand""" | ||
39 | 2 | |||
40 | 3 | |||
41 | 4 | from twisted.internet.defer import inlineCallbacks, returnValue | ||
42 | 5 | |||
43 | 6 | from ensemble.control.utils import get_environment | ||
44 | 7 | |||
45 | 8 | from ensemble.state.service import ServiceStateManager, RETRY_HOOKS, NO_HOOKS | ||
46 | 9 | from ensemble.state.relation import RelationStateManager | ||
47 | 10 | from ensemble.state.errors import RelationStateNotFound | ||
48 | 11 | from ensemble.unit.workflow import is_unit_running, is_relation_running | ||
49 | 12 | |||
50 | 13 | |||
51 | 14 | def configure_subparser(subparsers): | ||
52 | 15 | """Configure resolved subcommand""" | ||
53 | 16 | sub_parser = subparsers.add_parser( | ||
54 | 17 | "resolved", help=command.__doc__, description=resolved.__doc__) | ||
55 | 18 | sub_parser.add_argument( | ||
56 | 19 | "--retry", "-r", action="store_true", | ||
57 | 20 | help="Retry failed hook."), | ||
58 | 21 | sub_parser.add_argument( | ||
59 | 22 | "--environment", "-e", | ||
60 | 23 | help="Ensemble environment to operate in.") | ||
61 | 24 | sub_parser.add_argument( | ||
62 | 25 | "service_unit_name", | ||
63 | 26 | help="Name of the service unit that should be resolved") | ||
64 | 27 | sub_parser.add_argument( | ||
65 | 28 | "relation_name", nargs="?", default=None, | ||
66 | 29 | help="Name of the unit relation that should be resolved") | ||
67 | 30 | return sub_parser | ||
68 | 31 | |||
69 | 32 | |||
70 | 33 | def command(options): | ||
71 | 34 | """Mark an error as resolved in a unit or unit relation.""" | ||
72 | 35 | environment = get_environment(options) | ||
73 | 36 | return resolved( | ||
74 | 37 | options.environments, | ||
75 | 38 | environment, | ||
76 | 39 | options.verbose, | ||
77 | 40 | options.log, | ||
78 | 41 | options.service_unit_name, | ||
79 | 42 | options.relation_name, | ||
80 | 43 | options.retry) | ||
81 | 44 | |||
82 | 45 | |||
83 | 46 | @inlineCallbacks | ||
84 | 47 | def resolved( | ||
85 | 48 | config, environment, verbose, log, unit_name, relation_name, retry): | ||
86 | 49 | """Mark an error as resolved in a unit or unit relation. | ||
87 | 50 | |||
88 | 51 | If one of a unit's formula non-relation hooks returns a non-zero exit | ||
89 | 52 | status, the entire unit can be considered to be in a non-running state. | ||
90 | 53 | |||
91 | 54 | As a resolution, the the unit can be manually returned a running state | ||
92 | 55 | via the ensemble resolved command. Optionally this command can also | ||
93 | 56 | rerun the failed hook. | ||
94 | 57 | |||
95 | 58 | This resolution also applies separately to each of the unit's relations. | ||
96 | 59 | If one of the relation-hooks failed. In that case there is no | ||
97 | 60 | notion of retrying (the change is gone), but resolving will allow | ||
98 | 61 | additional relation hooks for that relation to proceed. | ||
99 | 62 | """ | ||
100 | 63 | provider = environment.get_machine_provider() | ||
101 | 64 | client = yield provider.connect() | ||
102 | 65 | service_manager = ServiceStateManager(client) | ||
103 | 66 | relation_manager = RelationStateManager(client) | ||
104 | 67 | |||
105 | 68 | unit_state = yield service_manager.get_unit_state(unit_name) | ||
106 | 69 | service_state = yield service_manager.get_service_state( | ||
107 | 70 | unit_name.split("/")[0]) | ||
108 | 71 | |||
109 | 72 | retry = retry and RETRY_HOOKS or NO_HOOKS | ||
110 | 73 | |||
111 | 74 | if not relation_name: | ||
112 | 75 | running, workflow_state = yield is_unit_running(client, unit_state) | ||
113 | 76 | if running: | ||
114 | 77 | log.info("Unit %r already running: %s", unit_name, workflow_state) | ||
115 | 78 | client.close() | ||
116 | 79 | returnValue(False) | ||
117 | 80 | |||
118 | 81 | yield unit_state.set_resolved(retry) | ||
119 | 82 | log.info("Marked unit %r as resolved", unit_name) | ||
120 | 83 | returnValue(True) | ||
121 | 84 | |||
122 | 85 | # Check for the matching relations | ||
123 | 86 | service_relations = yield relation_manager.get_relations_for_service( | ||
124 | 87 | service_state) | ||
125 | 88 | service_relations = [ | ||
126 | 89 | sr for sr in service_relations if sr.relation_name == relation_name] | ||
127 | 90 | if not service_relations: | ||
128 | 91 | raise RelationStateNotFound() | ||
129 | 92 | |||
130 | 93 | # Verify the relations are in need of resolution. | ||
131 | 94 | resolve_relations = {} | ||
132 | 95 | for service_relation in service_relations: | ||
133 | 96 | unit_relation = yield service_relation.get_unit_state(unit_state) | ||
134 | 97 | running, state = yield is_relation_running(client, unit_relation) | ||
135 | 98 | if not running: | ||
136 | 99 | resolve_relations[unit_relation.internal_relation_id] = retry | ||
137 | 100 | |||
138 | 101 | if not resolve_relations: | ||
139 | 102 | log.warning("Matched relations are all running") | ||
140 | 103 | client.close() | ||
141 | 104 | returnValue(False) | ||
142 | 105 | |||
143 | 106 | # Mark the relations for resolution. | ||
144 | 107 | yield unit_state.set_relation_resolved(resolve_relations) | ||
145 | 108 | log.info( | ||
146 | 109 | "Marked unit %r relation %r as resolved", unit_name, relation_name) | ||
147 | 110 | client.close() | ||
148 | 0 | 111 | ||
149 | === added file 'ensemble/control/tests/test_resolved.py' | |||
150 | --- ensemble/control/tests/test_resolved.py 1970-01-01 00:00:00 +0000 | |||
151 | +++ ensemble/control/tests/test_resolved.py 2011-05-02 21:57:29 +0000 | |||
152 | @@ -0,0 +1,390 @@ | |||
153 | 1 | from twisted.internet.defer import inlineCallbacks, returnValue | ||
154 | 2 | from yaml import dump | ||
155 | 3 | |||
156 | 4 | from ensemble.control import main | ||
157 | 5 | from ensemble.control.resolved import resolved | ||
158 | 6 | from ensemble.control.tests.common import ControlToolTest | ||
159 | 7 | from ensemble.formula.tests.test_repository import RepositoryTestBase | ||
160 | 8 | |||
161 | 9 | from ensemble.state.service import RETRY_HOOKS, NO_HOOKS | ||
162 | 10 | from ensemble.state.tests.test_service import ServiceStateManagerTestBase | ||
163 | 11 | from ensemble.state.errors import ServiceStateNotFound | ||
164 | 12 | |||
165 | 13 | from ensemble.unit.workflow import UnitWorkflowState, RelationWorkflowState | ||
166 | 14 | from ensemble.unit.lifecycle import UnitRelationLifecycle | ||
167 | 15 | from ensemble.hooks.executor import HookExecutor | ||
168 | 16 | |||
169 | 17 | |||
170 | 18 | class ControlResolvedTest( | ||
171 | 19 | ServiceStateManagerTestBase, ControlToolTest, RepositoryTestBase): | ||
172 | 20 | |||
173 | 21 | @inlineCallbacks | ||
174 | 22 | def setUp(self): | ||
175 | 23 | yield super(ControlResolvedTest, self).setUp() | ||
176 | 24 | config = { | ||
177 | 25 | "ensemble": "environments", | ||
178 | 26 | "environments": {"firstenv": {"type": "dummy"}}} | ||
179 | 27 | |||
180 | 28 | self.write_config(dump(config)) | ||
181 | 29 | self.config.load() | ||
182 | 30 | |||
183 | 31 | yield self.add_relation_state("wordpress", "mysql") | ||
184 | 32 | yield self.add_relation_state("wordpress", "varnish") | ||
185 | 33 | |||
186 | 34 | self.service1 = yield self.service_state_manager.get_service_state( | ||
187 | 35 | "mysql") | ||
188 | 36 | self.service_unit1 = yield self.service1.add_unit_state() | ||
189 | 37 | self.service_unit2 = yield self.service1.add_unit_state() | ||
190 | 38 | |||
191 | 39 | self.unit1_workflow = UnitWorkflowState( | ||
192 | 40 | self.client, self.service_unit1, None, self.makeDir()) | ||
193 | 41 | yield self.unit1_workflow.set_state("started") | ||
194 | 42 | |||
195 | 43 | self.environment = self.config.get_default() | ||
196 | 44 | self.provider = self.environment.get_machine_provider() | ||
197 | 45 | |||
198 | 46 | self.output = self.capture_logging() | ||
199 | 47 | self.stderr = self.capture_stream("stderr") | ||
200 | 48 | self.executor = HookExecutor() | ||
201 | 49 | |||
202 | 50 | @inlineCallbacks | ||
203 | 51 | def add_relation_state(self, *service_names): | ||
204 | 52 | for service_name in service_names: | ||
205 | 53 | try: | ||
206 | 54 | yield self.service_state_manager.get_service_state( | ||
207 | 55 | service_name) | ||
208 | 56 | except ServiceStateNotFound: | ||
209 | 57 | yield self.add_service_from_formula(service_name) | ||
210 | 58 | |||
211 | 59 | endpoint_pairs = yield self.service_state_manager.join_descriptors( | ||
212 | 60 | *service_names) | ||
213 | 61 | endpoints = endpoint_pairs[0] | ||
214 | 62 | endpoints = endpoint_pairs[0] | ||
215 | 63 | if endpoints[0] == endpoints[1]: | ||
216 | 64 | endpoints = endpoints[0:1] | ||
217 | 65 | relation_state = (yield self.relation_state_manager.add_relation_state( | ||
218 | 66 | *endpoints))[0] | ||
219 | 67 | returnValue(relation_state) | ||
220 | 68 | |||
221 | 69 | @inlineCallbacks | ||
222 | 70 | def get_named_service_relation(self, service_state, relation_name): | ||
223 | 71 | if isinstance(service_state, str): | ||
224 | 72 | service_state = yield self.service_state_manager.get_service_state( | ||
225 | 73 | service_state) | ||
226 | 74 | |||
227 | 75 | rels = yield self.relation_state_manager.get_relations_for_service( | ||
228 | 76 | service_state) | ||
229 | 77 | |||
230 | 78 | rels = [sr for sr in rels if sr.relation_name == relation_name] | ||
231 | 79 | if len(rels) == 1: | ||
232 | 80 | returnValue(rels[0]) | ||
233 | 81 | returnValue(rels) | ||
234 | 82 | |||
235 | 83 | @inlineCallbacks | ||
236 | 84 | def setup_unit_relations(self, service_relation, *units): | ||
237 | 85 | """ | ||
238 | 86 | Given a service relation and set of unit tuples in the form | ||
239 | 87 | unit_state, unit_relation_workflow_state, will add unit relations | ||
240 | 88 | for these units and update their workflow state to the desired/given | ||
241 | 89 | state. | ||
242 | 90 | """ | ||
243 | 91 | for unit, state in units: | ||
244 | 92 | unit_relation = yield service_relation.add_unit_state(unit) | ||
245 | 93 | lifecycle = UnitRelationLifecycle( | ||
246 | 94 | self.client, unit_relation, service_relation.relation_name, | ||
247 | 95 | self.makeDir(), self.executor) | ||
248 | 96 | workflow_state = RelationWorkflowState( | ||
249 | 97 | self.client, unit_relation, lifecycle, self.makeDir()) | ||
250 | 98 | yield workflow_state.set_state(state) | ||
251 | 99 | |||
252 | 100 | @inlineCallbacks | ||
253 | 101 | def test_resolved(self): | ||
254 | 102 | """ | ||
255 | 103 | 'ensemble resolved <unit_name>' will schedule a unit for | ||
256 | 104 | retrying from an error state. | ||
257 | 105 | """ | ||
258 | 106 | # Push the unit into an error state | ||
259 | 107 | yield self.unit1_workflow.set_state("start_error") | ||
260 | 108 | self.setup_exit(0) | ||
261 | 109 | finished = self.setup_cli_reactor() | ||
262 | 110 | self.mocker.replay() | ||
263 | 111 | |||
264 | 112 | self.assertEqual( | ||
265 | 113 | (yield self.service_unit1.get_resolved()), None) | ||
266 | 114 | |||
267 | 115 | main(["resolved", "mysql/0"]) | ||
268 | 116 | yield finished | ||
269 | 117 | |||
270 | 118 | self.assertEqual( | ||
271 | 119 | (yield self.service_unit1.get_resolved()), {"retry": NO_HOOKS}) | ||
272 | 120 | self.assertIn( | ||
273 | 121 | "Marked unit 'mysql/0' as resolved", | ||
274 | 122 | self.output.getvalue()) | ||
275 | 123 | |||
276 | 124 | @inlineCallbacks | ||
277 | 125 | def test_resolved_retry(self): | ||
278 | 126 | """ | ||
279 | 127 | 'ensemble resolved --retry <unit_name>' will schedule a unit | ||
280 | 128 | for retrying from an error state with a retry of hooks | ||
281 | 129 | executions. | ||
282 | 130 | """ | ||
283 | 131 | yield self.unit1_workflow.set_state("start_error") | ||
284 | 132 | self.setup_exit(0) | ||
285 | 133 | finished = self.setup_cli_reactor() | ||
286 | 134 | self.mocker.replay() | ||
287 | 135 | |||
288 | 136 | self.assertEqual( | ||
289 | 137 | (yield self.service_unit1.get_resolved()), None) | ||
290 | 138 | |||
291 | 139 | main(["resolved", "--retry", "mysql/0"]) | ||
292 | 140 | yield finished | ||
293 | 141 | |||
294 | 142 | self.assertEqual( | ||
295 | 143 | (yield self.service_unit1.get_resolved()), {"retry": RETRY_HOOKS}) | ||
296 | 144 | self.assertIn( | ||
297 | 145 | "Marked unit 'mysql/0' as resolved", | ||
298 | 146 | self.output.getvalue()) | ||
299 | 147 | |||
300 | 148 | @inlineCallbacks | ||
301 | 149 | def test_relation_resolved(self): | ||
302 | 150 | """ | ||
303 | 151 | 'ensemble relation <unit_name> <rel_name>' will schedule | ||
304 | 152 | the broken unit relations for being resolved. | ||
305 | 153 | """ | ||
306 | 154 | service_relation = yield self.get_named_service_relation( | ||
307 | 155 | self.service1, "server") | ||
308 | 156 | |||
309 | 157 | self.setup_unit_relations( | ||
310 | 158 | service_relation, | ||
311 | 159 | (self.service_unit1, "down"), | ||
312 | 160 | (self.service_unit2, "up")) | ||
313 | 161 | |||
314 | 162 | yield self.unit1_workflow.set_state("start_error") | ||
315 | 163 | self.setup_exit(0) | ||
316 | 164 | finished = self.setup_cli_reactor() | ||
317 | 165 | self.mocker.replay() | ||
318 | 166 | |||
319 | 167 | self.assertEqual( | ||
320 | 168 | (yield self.service_unit1.get_relation_resolved()), None) | ||
321 | 169 | |||
322 | 170 | main(["resolved", "--retry", "mysql/0", | ||
323 | 171 | service_relation.relation_name]) | ||
324 | 172 | yield finished | ||
325 | 173 | |||
326 | 174 | self.assertEqual( | ||
327 | 175 | (yield self.service_unit1.get_relation_resolved()), | ||
328 | 176 | {service_relation.internal_relation_id: RETRY_HOOKS}) | ||
329 | 177 | self.assertEqual( | ||
330 | 178 | (yield self.service_unit2.get_relation_resolved()), | ||
331 | 179 | None) | ||
332 | 180 | self.assertIn( | ||
333 | 181 | "Marked unit 'mysql/0' relation 'server' as resolved", | ||
334 | 182 | self.output.getvalue()) | ||
335 | 183 | |||
336 | 184 | @inlineCallbacks | ||
337 | 185 | def test_resolved_relation_some_already_resolved(self): | ||
338 | 186 | """ | ||
339 | 187 | 'ensemble resolved <service_name> <rel_name>' will mark | ||
340 | 188 | resolved all down units that are not already marked resolved. | ||
341 | 189 | """ | ||
342 | 190 | |||
343 | 191 | service2 = yield self.service_state_manager.get_service_state( | ||
344 | 192 | "wordpress") | ||
345 | 193 | service_unit1 = yield service2.add_unit_state() | ||
346 | 194 | |||
347 | 195 | service_relation = yield self.get_named_service_relation( | ||
348 | 196 | service2, "db") | ||
349 | 197 | yield self.setup_unit_relations( | ||
350 | 198 | service_relation, (service_unit1, "down")) | ||
351 | 199 | |||
352 | 200 | service_relation2 = yield self.get_named_service_relation( | ||
353 | 201 | service2, "cache") | ||
354 | 202 | yield self.setup_unit_relations( | ||
355 | 203 | service_relation2, (service_unit1, "down")) | ||
356 | 204 | |||
357 | 205 | yield service_unit1.set_relation_resolved( | ||
358 | 206 | {service_relation.internal_relation_id: NO_HOOKS}) | ||
359 | 207 | |||
360 | 208 | self.setup_exit(0) | ||
361 | 209 | finished = self.setup_cli_reactor() | ||
362 | 210 | self.mocker.replay() | ||
363 | 211 | |||
364 | 212 | main(["resolved", "--retry", "wordpress/0", "cache"]) | ||
365 | 213 | yield finished | ||
366 | 214 | |||
367 | 215 | self.assertEqual( | ||
368 | 216 | (yield service_unit1.get_relation_resolved()), | ||
369 | 217 | {service_relation.internal_relation_id: NO_HOOKS, | ||
370 | 218 | service_relation2.internal_relation_id: RETRY_HOOKS}) | ||
371 | 219 | |||
372 | 220 | self.assertIn( | ||
373 | 221 | "Marked unit 'wordpress/0' relation 'cache' as resolved", | ||
374 | 222 | self.output.getvalue()) | ||
375 | 223 | |||
376 | 224 | @inlineCallbacks | ||
377 | 225 | def test_resolved_relation_some_already_resolved_conflict(self): | ||
378 | 226 | """ | ||
379 | 227 | 'ensemble resolved <service_name> <rel_name>' will mark | ||
380 | 228 | resolved all down units that are not already marked resolved. | ||
381 | 229 | """ | ||
382 | 230 | |||
383 | 231 | service2 = yield self.service_state_manager.get_service_state( | ||
384 | 232 | "wordpress") | ||
385 | 233 | service_unit1 = yield service2.add_unit_state() | ||
386 | 234 | |||
387 | 235 | service_relation = yield self.get_named_service_relation( | ||
388 | 236 | service2, "db") | ||
389 | 237 | yield self.setup_unit_relations( | ||
390 | 238 | service_relation, (service_unit1, "down")) | ||
391 | 239 | |||
392 | 240 | yield service_unit1.set_relation_resolved( | ||
393 | 241 | {service_relation.internal_relation_id: NO_HOOKS}) | ||
394 | 242 | |||
395 | 243 | self.setup_exit(0) | ||
396 | 244 | finished = self.setup_cli_reactor() | ||
397 | 245 | self.mocker.replay() | ||
398 | 246 | |||
399 | 247 | main(["resolved", "--retry", "wordpress/0", "db"]) | ||
400 | 248 | yield finished | ||
401 | 249 | |||
402 | 250 | self.assertEqual( | ||
403 | 251 | (yield service_unit1.get_relation_resolved()), | ||
404 | 252 | {service_relation.internal_relation_id: NO_HOOKS}) | ||
405 | 253 | |||
406 | 254 | self.assertIn( | ||
407 | 255 | "Service unit 'wordpress/0' already has relations marked as resol", | ||
408 | 256 | self.output.getvalue()) | ||
409 | 257 | |||
410 | 258 | @inlineCallbacks | ||
411 | 259 | def test_resolved_unknown_service(self): | ||
412 | 260 | """ | ||
413 | 261 | 'ensemble resolved <unit_name>' will report if a service is | ||
414 | 262 | invalid. | ||
415 | 263 | """ | ||
416 | 264 | self.setup_exit(0) | ||
417 | 265 | finished = self.setup_cli_reactor() | ||
418 | 266 | self.mocker.replay() | ||
419 | 267 | main(["resolved", "zebra/0"]) | ||
420 | 268 | yield finished | ||
421 | 269 | self.assertIn("Service 'zebra' was not found", self.stderr.getvalue()) | ||
422 | 270 | |||
423 | 271 | @inlineCallbacks | ||
424 | 272 | def test_resolved_unknown_unit(self): | ||
425 | 273 | """ | ||
426 | 274 | 'ensemble resolved <unit_name>' will report if a unit is | ||
427 | 275 | invalid. | ||
428 | 276 | """ | ||
429 | 277 | self.setup_exit(0) | ||
430 | 278 | finished = self.setup_cli_reactor() | ||
431 | 279 | self.mocker.replay() | ||
432 | 280 | main(["resolved", "mysql/5"]) | ||
433 | 281 | yield finished | ||
434 | 282 | self.assertIn( | ||
435 | 283 | "Service unit 'mysql/5' was not found", self.output.getvalue()) | ||
436 | 284 | |||
437 | 285 | @inlineCallbacks | ||
438 | 286 | def test_resolved_unknown_unit_relation(self): | ||
439 | 287 | """ | ||
440 | 288 | 'ensemble resolved <unit_name>' will report if a relation is | ||
441 | 289 | invalid. | ||
442 | 290 | """ | ||
443 | 291 | self.setup_exit(0) | ||
444 | 292 | finished = self.setup_cli_reactor() | ||
445 | 293 | self.mocker.replay() | ||
446 | 294 | |||
447 | 295 | self.assertEqual( | ||
448 | 296 | (yield self.service_unit1.get_resolved()), None) | ||
449 | 297 | |||
450 | 298 | main(["resolved", "mysql/0", "magic"]) | ||
451 | 299 | yield finished | ||
452 | 300 | |||
453 | 301 | self.assertIn("Relation not found", self.output.getvalue()) | ||
454 | 302 | |||
455 | 303 | @inlineCallbacks | ||
456 | 304 | def test_resolved_already_running(self): | ||
457 | 305 | """ | ||
458 | 306 | 'ensemble resolved <unit_name>' will report if | ||
459 | 307 | the unit is already running. | ||
460 | 308 | """ | ||
461 | 309 | # Just verify we don't accidentally mark up another unit of the service | ||
462 | 310 | unit2_workflow = UnitWorkflowState( | ||
463 | 311 | self.client, self.service_unit2, None, self.makeDir()) | ||
464 | 312 | unit2_workflow.set_state("start_error") | ||
465 | 313 | |||
466 | 314 | self.setup_exit(0) | ||
467 | 315 | finished = self.setup_cli_reactor() | ||
468 | 316 | self.mocker.replay() | ||
469 | 317 | |||
470 | 318 | main(["resolved", "mysql/0"]) | ||
471 | 319 | yield finished | ||
472 | 320 | |||
473 | 321 | self.assertEqual( | ||
474 | 322 | (yield self.service_unit2.get_resolved()), None) | ||
475 | 323 | self.assertEqual( | ||
476 | 324 | (yield self.service_unit1.get_resolved()), None) | ||
477 | 325 | |||
478 | 326 | self.assertNotIn( | ||
479 | 327 | "Unit 'mysql/0 already running: started", | ||
480 | 328 | self.output.getvalue()) | ||
481 | 329 | |||
482 | 330 | @inlineCallbacks | ||
483 | 331 | def test_resolved_already_resolved(self): | ||
484 | 332 | """ | ||
485 | 333 | 'ensemble resolved <unit_name>' will report if | ||
486 | 334 | the unit is already resolved. | ||
487 | 335 | """ | ||
488 | 336 | # Mark the unit as resolved and as in an error state. | ||
489 | 337 | yield self.service_unit1.set_resolved(RETRY_HOOKS) | ||
490 | 338 | yield self.unit1_workflow.set_state("start_error") | ||
491 | 339 | |||
492 | 340 | unit2_workflow = UnitWorkflowState( | ||
493 | 341 | self.client, self.service_unit1, None, self.makeDir()) | ||
494 | 342 | unit2_workflow.set_state("start_error") | ||
495 | 343 | |||
496 | 344 | self.assertEqual( | ||
497 | 345 | (yield self.service_unit2.get_resolved()), None) | ||
498 | 346 | |||
499 | 347 | self.setup_exit(0) | ||
500 | 348 | finished = self.setup_cli_reactor() | ||
501 | 349 | self.mocker.replay() | ||
502 | 350 | |||
503 | 351 | main(["resolved", "mysql/0"]) | ||
504 | 352 | yield finished | ||
505 | 353 | |||
506 | 354 | self.assertEqual( | ||
507 | 355 | (yield self.service_unit1.get_resolved()), | ||
508 | 356 | {"retry": RETRY_HOOKS}) | ||
509 | 357 | self.assertNotIn( | ||
510 | 358 | "Marked unit 'mysql/0' as resolved", | ||
511 | 359 | self.output.getvalue()) | ||
512 | 360 | self.assertIn( | ||
513 | 361 | "Service unit 'mysql/0' is already marked as resolved.", | ||
514 | 362 | self.stderr.getvalue(), "") | ||
515 | 363 | |||
516 | 364 | @inlineCallbacks | ||
517 | 365 | def test_resolved_relation_already_running(self): | ||
518 | 366 | """ | ||
519 | 367 | 'ensemble resolved <unit_name> <rel_name>' will report | ||
520 | 368 | if the relation is already running. | ||
521 | 369 | """ | ||
522 | 370 | service2 = yield self.service_state_manager.get_service_state( | ||
523 | 371 | "wordpress") | ||
524 | 372 | service_unit1 = yield service2.add_unit_state() | ||
525 | 373 | |||
526 | 374 | service_relation = yield self.get_named_service_relation( | ||
527 | 375 | service2, "db") | ||
528 | 376 | yield self.setup_unit_relations( | ||
529 | 377 | service_relation, (service_unit1, "up")) | ||
530 | 378 | |||
531 | 379 | self.setup_exit(0) | ||
532 | 380 | finished = self.setup_cli_reactor() | ||
533 | 381 | self.mocker.replay() | ||
534 | 382 | |||
535 | 383 | main(["resolved", "wordpress/0", "db"]) | ||
536 | 384 | yield finished | ||
537 | 385 | |||
538 | 386 | self.assertIn("Matched relations are all running", | ||
539 | 387 | self.output.getvalue()) | ||
540 | 388 | self.assertEqual( | ||
541 | 389 | (yield service_unit1.get_relation_resolved()), | ||
542 | 390 | None) | ||
543 | 0 | 391 | ||
544 | === modified file 'ensemble/lib/statemachine.py' | |||
545 | --- ensemble/lib/statemachine.py 2011-04-15 20:20:57 +0000 | |||
546 | +++ ensemble/lib/statemachine.py 2011-05-02 21:57:29 +0000 | |||
547 | @@ -61,7 +61,7 @@ | |||
548 | 61 | returnValue(self._workflow.get_transitions(state_id)) | 61 | returnValue(self._workflow.get_transitions(state_id)) |
549 | 62 | 62 | ||
550 | 63 | @inlineCallbacks | 63 | @inlineCallbacks |
552 | 64 | def fire_transition_alias(self, transition_alias): | 64 | def fire_transition_alias(self, transition_alias, **transition_variables): |
553 | 65 | """Fire a transition with the matching alias. | 65 | """Fire a transition with the matching alias. |
554 | 66 | 66 | ||
555 | 67 | A transition from the current state with the given alias will | 67 | A transition from the current state with the given alias will |
556 | @@ -81,6 +81,9 @@ | |||
557 | 81 | 81 | ||
558 | 82 | Ambigious (multiple) or no matching transitions cause an exception | 82 | Ambigious (multiple) or no matching transitions cause an exception |
559 | 83 | InvalidTransition to be raised. | 83 | InvalidTransition to be raised. |
560 | 84 | |||
561 | 85 | Keyword args are used as transition variables and given to the | ||
562 | 86 | associated transtion action. | ||
563 | 84 | """ | 87 | """ |
564 | 85 | 88 | ||
565 | 86 | found = [] | 89 | found = [] |
566 | @@ -100,7 +103,8 @@ | |||
567 | 100 | "No transition found for alias:%s state:%s" % ( | 103 | "No transition found for alias:%s state:%s" % ( |
568 | 101 | transition_alias, current_state)) | 104 | transition_alias, current_state)) |
569 | 102 | 105 | ||
571 | 103 | value = yield self.fire_transition(found[0].transition_id) | 106 | value = yield self.fire_transition( |
572 | 107 | found[0].transition_id, **transition_variables) | ||
573 | 104 | returnValue(value) | 108 | returnValue(value) |
574 | 105 | 109 | ||
575 | 106 | @inlineCallbacks | 110 | @inlineCallbacks |
576 | @@ -135,7 +139,8 @@ | |||
577 | 135 | returnValue(False) | 139 | returnValue(False) |
578 | 136 | 140 | ||
579 | 137 | @inlineCallbacks | 141 | @inlineCallbacks |
581 | 138 | def fire_transition(self, transition_id, **state_variables): | 142 | def fire_transition( |
582 | 143 | self, transition_id, **transition_variables): | ||
583 | 139 | """Fire a transition with given id. | 144 | """Fire a transition with given id. |
584 | 140 | 145 | ||
585 | 141 | Invokes any transition actions, saves state and state variables, and | 146 | Invokes any transition actions, saves state and state variables, and |
586 | @@ -156,10 +161,9 @@ | |||
587 | 156 | transition_id, | 161 | transition_id, |
588 | 157 | transition.source, | 162 | transition.source, |
589 | 158 | transition.destination, | 163 | transition.destination, |
591 | 159 | state_variables) | 164 | transition_variables) |
592 | 160 | 165 | ||
593 | 161 | # Execute any per transition action. | 166 | # Execute any per transition action. |
594 | 162 | state_variables = state_variables | ||
595 | 163 | action_id = "do_%s" % transition_id | 167 | action_id = "do_%s" % transition_id |
596 | 164 | action = getattr(self, action_id, None) | 168 | action = getattr(self, action_id, None) |
597 | 165 | 169 | ||
598 | @@ -167,9 +171,9 @@ | |||
599 | 167 | try: | 171 | try: |
600 | 168 | log.debug("%s: execute action %s", | 172 | log.debug("%s: execute action %s", |
601 | 169 | class_name(self), action.__name__) | 173 | class_name(self), action.__name__) |
603 | 170 | variables = yield action() | 174 | variables = yield action(**transition_variables) |
604 | 171 | if isinstance(variables, dict): | 175 | if isinstance(variables, dict): |
606 | 172 | state_variables.update(variables) | 176 | transition_variables.update(variables) |
607 | 173 | except TransitionError, e: | 177 | except TransitionError, e: |
608 | 174 | # If an error happens during the transition, allow for | 178 | # If an error happens during the transition, allow for |
609 | 175 | # executing an error transition. | 179 | # executing an error transition. |
610 | @@ -187,14 +191,16 @@ | |||
611 | 187 | returnValue(False) | 191 | returnValue(False) |
612 | 188 | 192 | ||
613 | 189 | # Set the state with state variables | 193 | # Set the state with state variables |
615 | 190 | yield self.set_state(transition.destination, **state_variables) | 194 | yield self.set_state(transition.destination, **transition_variables) |
616 | 191 | log.debug("%s: transition complete %s (state %s) %r", | 195 | log.debug("%s: transition complete %s (state %s) %r", |
617 | 192 | class_name(self), transition_id, | 196 | class_name(self), transition_id, |
619 | 193 | transition.destination, state_variables) | 197 | transition.destination, transition_variables) |
620 | 194 | if transition.success_transition_id: | 198 | if transition.success_transition_id: |
621 | 195 | log.debug("%s: initiating success transition: %s", | 199 | log.debug("%s: initiating success transition: %s", |
622 | 196 | class_name(self), transition.success_transition_id) | 200 | class_name(self), transition.success_transition_id) |
624 | 197 | yield self.fire_transition(transition.success_transition_id) | 201 | yield self.fire_transition( |
625 | 202 | transition.success_transition_id, | ||
626 | 203 | **transition_variables) | ||
627 | 198 | returnValue(True) | 204 | returnValue(True) |
628 | 199 | 205 | ||
629 | 200 | @inlineCallbacks | 206 | @inlineCallbacks |
630 | 201 | 207 | ||
631 | === modified file 'ensemble/lib/tests/test_statemachine.py' | |||
632 | --- ensemble/lib/tests/test_statemachine.py 2011-04-15 20:20:57 +0000 | |||
633 | +++ ensemble/lib/tests/test_statemachine.py 2011-05-02 21:57:29 +0000 | |||
634 | @@ -295,6 +295,11 @@ | |||
635 | 295 | Transition("continue", "", "next-state", "final-state")) | 295 | Transition("continue", "", "next-state", "final-state")) |
636 | 296 | 296 | ||
637 | 297 | workflow_state = AttributeWorkflowState(workflow) | 297 | workflow_state = AttributeWorkflowState(workflow) |
638 | 298 | results = [] | ||
639 | 299 | |||
640 | 300 | def do_begin(*args, **kw): | ||
641 | 301 | results.append(kw) | ||
642 | 302 | workflow_state.do_begin = do_begin | ||
643 | 298 | 303 | ||
644 | 299 | yield workflow_state.fire_transition( | 304 | yield workflow_state.fire_transition( |
645 | 300 | "begin", rabbit="moon", hello=True) | 305 | "begin", rabbit="moon", hello=True) |
646 | @@ -303,6 +308,10 @@ | |||
647 | 303 | variables = yield workflow_state.get_state_variables() | 308 | variables = yield workflow_state.get_state_variables() |
648 | 304 | self.assertEqual({"rabbit": "moon", "hello": True}, variables) | 309 | self.assertEqual({"rabbit": "moon", "hello": True}, variables) |
649 | 305 | 310 | ||
650 | 311 | # Ensure the variables made it through to the transition action | ||
651 | 312 | self.assertEqual( | ||
652 | 313 | {"rabbit": "moon", "hello": True}, results[0]) | ||
653 | 314 | |||
654 | 306 | yield workflow_state.fire_transition("continue") | 315 | yield workflow_state.fire_transition("continue") |
655 | 307 | current_state = yield workflow_state.get_state() | 316 | current_state = yield workflow_state.get_state() |
656 | 308 | self.assertEqual(current_state, "final-state") | 317 | self.assertEqual(current_state, "final-state") |
657 | 309 | 318 | ||
658 | === modified file 'ensemble/state/errors.py' | |||
659 | --- ensemble/state/errors.py 2011-04-28 17:52:55 +0000 | |||
660 | +++ ensemble/state/errors.py 2011-05-02 21:57:29 +0000 | |||
661 | @@ -151,6 +151,30 @@ | |||
662 | 151 | self.unit_name | 151 | self.unit_name |
663 | 152 | 152 | ||
664 | 153 | 153 | ||
665 | 154 | class ServiceUnitResolvedAlreadyEnabled(StateError): | ||
666 | 155 | """The unit has already been marked resolved. | ||
667 | 156 | """ | ||
668 | 157 | |||
669 | 158 | def __init__(self, unit_name): | ||
670 | 159 | self.unit_name = unit_name | ||
671 | 160 | |||
672 | 161 | def __str__(self): | ||
673 | 162 | return "Service unit %r is already marked as resolved." % ( | ||
674 | 163 | self.unit_name) | ||
675 | 164 | |||
676 | 165 | |||
677 | 166 | class ServiceUnitRelationResolvedAlreadyEnabled(StateError): | ||
678 | 167 | """The unit's relations have already been marked resolved. | ||
679 | 168 | """ | ||
680 | 169 | |||
681 | 170 | def __init__(self, unit_name): | ||
682 | 171 | self.unit_name = unit_name | ||
683 | 172 | |||
684 | 173 | def __str__(self): | ||
685 | 174 | return "Service unit %r already has relations marked as resolved." % ( | ||
686 | 175 | self.unit_name) | ||
687 | 176 | |||
688 | 177 | |||
689 | 154 | class RelationAlreadyExists(StateError): | 178 | class RelationAlreadyExists(StateError): |
690 | 155 | 179 | ||
691 | 156 | def __init__(self, *endpoints): | 180 | def __init__(self, *endpoints): |
692 | 157 | 181 | ||
693 | === modified file 'ensemble/state/service.py' | |||
694 | --- ensemble/state/service.py 2011-04-27 16:23:16 +0000 | |||
695 | +++ ensemble/state/service.py 2011-05-02 21:57:29 +0000 | |||
696 | @@ -15,11 +15,16 @@ | |||
697 | 15 | StateChanged, ServiceStateNotFound, ServiceUnitStateNotFound, | 15 | StateChanged, ServiceStateNotFound, ServiceUnitStateNotFound, |
698 | 16 | ServiceUnitStateMachineAlreadyAssigned, ServiceStateNameInUse, | 16 | ServiceUnitStateMachineAlreadyAssigned, ServiceStateNameInUse, |
699 | 17 | BadDescriptor, BadServiceStateName, NoUnusedMachines, | 17 | BadDescriptor, BadServiceStateName, NoUnusedMachines, |
701 | 18 | ServiceUnitDebugAlreadyEnabled) | 18 | ServiceUnitDebugAlreadyEnabled, ServiceUnitResolvedAlreadyEnabled, |
702 | 19 | ServiceUnitRelationResolvedAlreadyEnabled, StopWatcher) | ||
703 | 19 | from ensemble.state.formula import FormulaStateManager | 20 | from ensemble.state.formula import FormulaStateManager |
704 | 20 | from ensemble.state.relation import ServiceRelationState, RelationStateManager | 21 | from ensemble.state.relation import ServiceRelationState, RelationStateManager |
705 | 21 | from ensemble.state.machine import _public_machine_id, MachineState | 22 | from ensemble.state.machine import _public_machine_id, MachineState |
707 | 22 | from ensemble.state.utils import remove_tree | 23 | from ensemble.state.utils import remove_tree, dict_merge |
708 | 24 | |||
709 | 25 | |||
710 | 26 | RETRY_HOOKS = 1000 | ||
711 | 27 | NO_HOOKS = 1001 | ||
712 | 23 | 28 | ||
713 | 24 | 29 | ||
714 | 25 | class ServiceStateManager(StateBase): | 30 | class ServiceStateManager(StateBase): |
715 | @@ -855,6 +860,185 @@ | |||
716 | 855 | callback_d.addCallback( | 860 | callback_d.addCallback( |
717 | 856 | lambda x: watch_d.addCallback(watcher) and x) | 861 | lambda x: watch_d.addCallback(watcher) and x) |
718 | 857 | 862 | ||
719 | 863 | @property | ||
720 | 864 | def _unit_resolve_path(self): | ||
721 | 865 | return "/units/%s/resolved" % self.internal_id | ||
722 | 866 | |||
723 | 867 | @inlineCallbacks | ||
724 | 868 | def set_resolved(self, retry): | ||
725 | 869 | """Mark the unit as in need of being resolved. | ||
726 | 870 | |||
727 | 871 | :param retry: Either RETRY_HOOKS or NO_HOOKS, denoting if hooks | ||
728 | 872 | should fire as a result of the retry. | ||
729 | 873 | |||
730 | 874 | The resolved setting is set by the command line to inform | ||
731 | 875 | a unit to attempt a retry transition from an error state. | ||
732 | 876 | """ | ||
733 | 877 | |||
734 | 878 | if not retry in (RETRY_HOOKS, NO_HOOKS): | ||
735 | 879 | raise ValueError("invalid retry value %r" % retry) | ||
736 | 880 | |||
737 | 881 | try: | ||
738 | 882 | yield self._client.create( | ||
739 | 883 | self._unit_resolve_path, yaml.safe_dump({"retry": retry})) | ||
740 | 884 | except zookeeper.NodeExistsException: | ||
741 | 885 | raise ServiceUnitResolvedAlreadyEnabled(self.unit_name) | ||
742 | 886 | |||
743 | 887 | @inlineCallbacks | ||
744 | 888 | def get_resolved(self): | ||
745 | 889 | """Get the value of the resolved setting if any. | ||
746 | 890 | |||
747 | 891 | The resolved setting is retrieved by the unit agent and if | ||
748 | 892 | found instructs it to attempt a retry transition from an | ||
749 | 893 | error state. | ||
750 | 894 | """ | ||
751 | 895 | try: | ||
752 | 896 | content, stat = yield self._client.get(self._unit_resolve_path) | ||
753 | 897 | except zookeeper.NoNodeException: | ||
754 | 898 | # Return a default value. | ||
755 | 899 | returnValue(None) | ||
756 | 900 | returnValue(yaml.load(content)) | ||
757 | 901 | |||
758 | 902 | @inlineCallbacks | ||
759 | 903 | def clear_resolved(self): | ||
760 | 904 | """Remove any resolved setting on the unit.""" | ||
761 | 905 | try: | ||
762 | 906 | yield self._client.delete(self._unit_resolve_path) | ||
763 | 907 | except zookeeper.NoNodeException: | ||
764 | 908 | # We get to the same end state. | ||
765 | 909 | pass | ||
766 | 910 | |||
767 | 911 | @inlineCallbacks | ||
768 | 912 | def watch_resolved(self, callback): | ||
769 | 913 | """Set a callback to be invoked when a unit is marked resolved. | ||
770 | 914 | |||
771 | 915 | :param callback: The callback receives a single parameter, the | ||
772 | 916 | change event. The watcher always receives an initial | ||
773 | 917 | boolean value invocation denoting the existence of the | ||
774 | 918 | resolved setting. Subsequent invocations will be with change | ||
775 | 919 | events. | ||
776 | 920 | """ | ||
777 | 921 | @inlineCallbacks | ||
778 | 922 | def watcher(change_event): | ||
779 | 923 | if not self._client.connected: | ||
780 | 924 | returnValue(None) | ||
781 | 925 | |||
782 | 926 | exists_d, watch_d = self._client.exists_and_watch( | ||
783 | 927 | self._unit_resolve_path) | ||
784 | 928 | try: | ||
785 | 929 | yield callback(change_event) | ||
786 | 930 | except StopWatcher: | ||
787 | 931 | returnValue(None) | ||
788 | 932 | watch_d.addCallback(watcher) | ||
789 | 933 | |||
790 | 934 | exists_d, watch_d = self._client.exists_and_watch( | ||
791 | 935 | self._unit_resolve_path) | ||
792 | 936 | exists = yield exists_d | ||
793 | 937 | |||
794 | 938 | # Setup the watch deferred callback after the user defined callback | ||
795 | 939 | # has returned successfully from the existence invocation. | ||
796 | 940 | callback_d = maybeDeferred(callback, bool(exists)) | ||
797 | 941 | callback_d.addCallback( | ||
798 | 942 | lambda x: watch_d.addCallback(watcher) and x) | ||
799 | 943 | callback_d.addErrback( | ||
800 | 944 | lambda failure: failure.trap(StopWatcher)) | ||
801 | 945 | |||
802 | 946 | @property | ||
803 | 947 | def _relation_resolved_path(self): | ||
804 | 948 | return "/units/%s/relation-resolved" % self.internal_id | ||
805 | 949 | |||
806 | 950 | @inlineCallbacks | ||
807 | 951 | def set_relation_resolved(self, relation_map): | ||
808 | 952 | """Mark a unit's relations as in need of being resolved. | ||
809 | 953 | |||
810 | 954 | :param relation_map: A map of internal relation ids, to retry hook | ||
811 | 955 | values either ensemble.state.service.NO_HOOKS or | ||
812 | 956 | RETRY_HOOKS. | ||
813 | 957 | """ | ||
814 | 958 | if not isinstance(relation_map, dict): | ||
815 | 959 | raise ValueError( | ||
816 | 960 | "Relation map must be a dictionary %r" % relation_map) | ||
817 | 961 | |||
818 | 962 | if [v for v in relation_map.values() if v not in ( | ||
819 | 963 | RETRY_HOOKS, NO_HOOKS)]: | ||
820 | 964 | |||
821 | 965 | print relation_map | ||
822 | 966 | raise ValueError("Invalid setting for retry hook") | ||
823 | 967 | |||
824 | 968 | def update_relation_resolved(content, stat): | ||
825 | 969 | if not content: | ||
826 | 970 | return yaml.safe_dump(relation_map) | ||
827 | 971 | |||
828 | 972 | content = yaml.safe_dump( | ||
829 | 973 | dict_merge(yaml.load(content), relation_map)) | ||
830 | 974 | return content | ||
831 | 975 | |||
832 | 976 | try: | ||
833 | 977 | yield retry_change( | ||
834 | 978 | self._client, | ||
835 | 979 | self._relation_resolved_path, | ||
836 | 980 | update_relation_resolved) | ||
837 | 981 | except StateChanged: | ||
838 | 982 | raise ServiceUnitRelationResolvedAlreadyEnabled(self.unit_name) | ||
839 | 983 | returnValue(True) | ||
840 | 984 | |||
841 | 985 | @inlineCallbacks | ||
842 | 986 | def get_relation_resolved(self): | ||
843 | 987 | """Retrieve any resolved flags set for this unit's relations. | ||
844 | 988 | """ | ||
845 | 989 | try: | ||
846 | 990 | content, stat = yield self._client.get( | ||
847 | 991 | self._relation_resolved_path) | ||
848 | 992 | except zookeeper.NoNodeException: | ||
849 | 993 | returnValue(None) | ||
850 | 994 | returnValue(yaml.load(content)) | ||
851 | 995 | |||
852 | 996 | @inlineCallbacks | ||
853 | 997 | def clear_relation_resolved(self): | ||
854 | 998 | """ Clear the relation resolved setting. | ||
855 | 999 | """ | ||
856 | 1000 | try: | ||
857 | 1001 | yield self._client.delete(self._relation_resolved_path) | ||
858 | 1002 | except zookeeper.NoNodeException: | ||
859 | 1003 | # We get to the same end state. | ||
860 | 1004 | pass | ||
861 | 1005 | |||
862 | 1006 | @inlineCallbacks | ||
863 | 1007 | def watch_relation_resolved(self, callback): | ||
864 | 1008 | """Set a callback to be invoked when a unit's relations are resolved. | ||
865 | 1009 | |||
866 | 1010 | :param callback: The callback receives a single parameter, the | ||
867 | 1011 | change event. The watcher always receives an initial | ||
868 | 1012 | boolean value invocation denoting the existence of the | ||
869 | 1013 | resolved setting. Subsequent invocations will be with change | ||
870 | 1014 | events. | ||
871 | 1015 | """ | ||
872 | 1016 | @inlineCallbacks | ||
873 | 1017 | def watcher(change_event): | ||
874 | 1018 | if not self._client.connected: | ||
875 | 1019 | returnValue(None) | ||
876 | 1020 | exists_d, watch_d = self._client.exists_and_watch( | ||
877 | 1021 | self._relation_resolved_path) | ||
878 | 1022 | try: | ||
879 | 1023 | yield callback(change_event) | ||
880 | 1024 | except StopWatcher: | ||
881 | 1025 | returnValue(None) | ||
882 | 1026 | |||
883 | 1027 | watch_d.addCallback(watcher) | ||
884 | 1028 | |||
885 | 1029 | exists_d, watch_d = self._client.exists_and_watch( | ||
886 | 1030 | self._relation_resolved_path) | ||
887 | 1031 | |||
888 | 1032 | exists = yield exists_d | ||
889 | 1033 | |||
890 | 1034 | # Setup the watch deferred callback after the user defined callback | ||
891 | 1035 | # has returned successfully from the existence invocation. | ||
892 | 1036 | callback_d = maybeDeferred(callback, bool(exists)) | ||
893 | 1037 | callback_d.addCallback( | ||
894 | 1038 | lambda x: watch_d.addCallback(watcher) and x) | ||
895 | 1039 | callback_d.addErrback( | ||
896 | 1040 | lambda failure: failure.trap(StopWatcher)) | ||
897 | 1041 | |||
898 | 858 | 1042 | ||
899 | 859 | def _parse_unit_name(unit_name): | 1043 | def _parse_unit_name(unit_name): |
900 | 860 | """Parse a unit's name into the service name and its sequence. | 1044 | """Parse a unit's name into the service name and its sequence. |
901 | 861 | 1045 | ||
902 | === modified file 'ensemble/state/tests/test_errors.py' | |||
903 | --- ensemble/state/tests/test_errors.py 2011-02-23 17:47:46 +0000 | |||
904 | +++ ensemble/state/tests/test_errors.py 2011-05-02 21:57:29 +0000 | |||
905 | @@ -11,7 +11,9 @@ | |||
906 | 11 | UnitRelationStateAlreadyAssigned, UnknownRelationRole, | 11 | UnitRelationStateAlreadyAssigned, UnknownRelationRole, |
907 | 12 | BadDescriptor, DuplicateEndpoints, IncompatibleEndpoints, | 12 | BadDescriptor, DuplicateEndpoints, IncompatibleEndpoints, |
908 | 13 | NoMatchingEndpoints, AmbiguousEndpoints, | 13 | NoMatchingEndpoints, AmbiguousEndpoints, |
910 | 14 | ServiceUnitStateMachineNotAssigned, ServiceUnitDebugAlreadyEnabled) | 14 | ServiceUnitStateMachineNotAssigned, ServiceUnitDebugAlreadyEnabled, |
911 | 15 | ServiceUnitResolvedAlreadyEnabled, | ||
912 | 16 | ServiceUnitRelationResolvedAlreadyEnabled) | ||
913 | 15 | 17 | ||
914 | 16 | 18 | ||
915 | 17 | class StateErrorsTest(TestCase): | 19 | class StateErrorsTest(TestCase): |
916 | @@ -89,6 +91,23 @@ | |||
917 | 89 | str(error), | 91 | str(error), |
918 | 90 | "Service unit 'wordpress/0' is already in debug mode.") | 92 | "Service unit 'wordpress/0' is already in debug mode.") |
919 | 91 | 93 | ||
920 | 94 | def test_unit_already_in_resolved_mode(self): | ||
921 | 95 | error = ServiceUnitResolvedAlreadyEnabled("wordpress/0") | ||
922 | 96 | self.assertIsStateError(error) | ||
923 | 97 | self.assertEquals(error.unit_name, "wordpress/0") | ||
924 | 98 | self.assertEquals( | ||
925 | 99 | str(error), | ||
926 | 100 | "Service unit 'wordpress/0' is already marked as resolved.") | ||
927 | 101 | |||
928 | 102 | def test_unit_already_in_relation_resolved_mode(self): | ||
929 | 103 | error = ServiceUnitRelationResolvedAlreadyEnabled("wordpress/0") | ||
930 | 104 | self.assertIsStateError(error) | ||
931 | 105 | self.assertEquals(error.unit_name, "wordpress/0") | ||
932 | 106 | self.assertEquals( | ||
933 | 107 | str(error), | ||
934 | 108 | "Service unit %r already has relations marked as resolved." % ( | ||
935 | 109 | "wordpress/0")) | ||
936 | 110 | |||
937 | 92 | def test_service_name_in_use(self): | 111 | def test_service_name_in_use(self): |
938 | 93 | error = ServiceStateNameInUse("wordpress") | 112 | error = ServiceStateNameInUse("wordpress") |
939 | 94 | self.assertIsStateError(error) | 113 | self.assertIsStateError(error) |
940 | 95 | 114 | ||
941 | === modified file 'ensemble/state/tests/test_service.py' | |||
942 | --- ensemble/state/tests/test_service.py 2011-04-23 01:13:27 +0000 | |||
943 | +++ ensemble/state/tests/test_service.py 2011-05-02 21:57:29 +0000 | |||
944 | @@ -9,14 +9,15 @@ | |||
945 | 9 | from ensemble.formula.tests.test_metadata import test_repository_path | 9 | from ensemble.formula.tests.test_metadata import test_repository_path |
946 | 10 | from ensemble.state.endpoint import RelationEndpoint | 10 | from ensemble.state.endpoint import RelationEndpoint |
947 | 11 | from ensemble.state.formula import FormulaStateManager | 11 | from ensemble.state.formula import FormulaStateManager |
949 | 12 | from ensemble.state.service import ServiceStateManager | 12 | from ensemble.state.service import ServiceStateManager, NO_HOOKS, RETRY_HOOKS |
950 | 13 | from ensemble.state.machine import MachineStateManager | 13 | from ensemble.state.machine import MachineStateManager |
951 | 14 | from ensemble.state.relation import RelationStateManager | 14 | from ensemble.state.relation import RelationStateManager |
952 | 15 | from ensemble.state.errors import ( | 15 | from ensemble.state.errors import ( |
953 | 16 | StateChanged, ServiceStateNotFound, ServiceUnitStateNotFound, | 16 | StateChanged, ServiceStateNotFound, ServiceUnitStateNotFound, |
954 | 17 | ServiceUnitStateMachineAlreadyAssigned, ServiceStateNameInUse, | 17 | ServiceUnitStateMachineAlreadyAssigned, ServiceStateNameInUse, |
955 | 18 | BadDescriptor, BadServiceStateName, ServiceUnitDebugAlreadyEnabled, | 18 | BadDescriptor, BadServiceStateName, ServiceUnitDebugAlreadyEnabled, |
957 | 19 | MachineStateNotFound, NoUnusedMachines) | 19 | MachineStateNotFound, NoUnusedMachines, ServiceUnitResolvedAlreadyEnabled, |
958 | 20 | ServiceUnitRelationResolvedAlreadyEnabled, StopWatcher) | ||
959 | 20 | 21 | ||
960 | 21 | 22 | ||
961 | 22 | from ensemble.state.tests.common import StateTestBase | 23 | from ensemble.state.tests.common import StateTestBase |
962 | @@ -738,6 +739,283 @@ | |||
963 | 738 | None) | 739 | None) |
964 | 739 | 740 | ||
965 | 740 | @inlineCallbacks | 741 | @inlineCallbacks |
966 | 742 | def test_get_set_clear_resolved(self): | ||
967 | 743 | """A unit can be set to resolved to mark a future transition, with | ||
968 | 744 | an optional retry flag.""" | ||
969 | 745 | |||
970 | 746 | unit_state = yield self.get_unit_state() | ||
971 | 747 | |||
972 | 748 | self.assertIdentical((yield unit_state.get_resolved()), None) | ||
973 | 749 | yield unit_state.set_resolved(NO_HOOKS) | ||
974 | 750 | |||
975 | 751 | yield self.assertFailure( | ||
976 | 752 | unit_state.set_resolved(NO_HOOKS), | ||
977 | 753 | ServiceUnitResolvedAlreadyEnabled) | ||
978 | 754 | yield self.assertEqual( | ||
979 | 755 | |||
980 | 756 | (yield unit_state.get_resolved()), {"retry": NO_HOOKS}) | ||
981 | 757 | |||
982 | 758 | yield unit_state.clear_resolved() | ||
983 | 759 | self.assertIdentical((yield unit_state.get_resolved()), None) | ||
984 | 760 | yield unit_state.clear_resolved() | ||
985 | 761 | |||
986 | 762 | yield self.assertFailure(unit_state.set_resolved(None), ValueError) | ||
987 | 763 | |||
988 | 764 | @inlineCallbacks | ||
989 | 765 | def test_watch_resolved(self): | ||
990 | 766 | """A unit resolved watch can be instituted on a permanent basis.""" | ||
991 | 767 | unit_state = yield self.get_unit_state() | ||
992 | 768 | |||
993 | 769 | results = [] | ||
994 | 770 | |||
995 | 771 | def callback(value): | ||
996 | 772 | results.append(value) | ||
997 | 773 | |||
998 | 774 | unit_state.watch_resolved(callback) | ||
999 | 775 | yield unit_state.set_resolved(RETRY_HOOKS) | ||
1000 | 776 | yield unit_state.clear_resolved() | ||
1001 | 777 | yield unit_state.set_resolved(NO_HOOKS) | ||
1002 | 778 | |||
1003 | 779 | yield self.poke_zk() | ||
1004 | 780 | |||
1005 | 781 | self.assertEqual(len(results), 4) | ||
1006 | 782 | self.assertIdentical(results.pop(0), False) | ||
1007 | 783 | self.assertEqual(results.pop(0).type_name, "created") | ||
1008 | 784 | self.assertEqual(results.pop(0).type_name, "deleted") | ||
1009 | 785 | self.assertEqual(results.pop(0).type_name, "created") | ||
1010 | 786 | |||
1011 | 787 | self.assertEqual( | ||
1012 | 788 | (yield unit_state.get_resolved()), | ||
1013 | 789 | {"retry": NO_HOOKS}) | ||
1014 | 790 | |||
1015 | 791 | @inlineCallbacks | ||
1016 | 792 | def test_stop_watch_resolved(self): | ||
1017 | 793 | """A unit resolved watch can be instituted on a permanent basis. | ||
1018 | 794 | |||
1019 | 795 | However the callback can raise StopWatcher at anytime to stop the watch | ||
1020 | 796 | """ | ||
1021 | 797 | unit_state = yield self.get_unit_state() | ||
1022 | 798 | |||
1023 | 799 | results = [] | ||
1024 | 800 | |||
1025 | 801 | def callback(value): | ||
1026 | 802 | results.append(value) | ||
1027 | 803 | if len(results) == 1: | ||
1028 | 804 | raise StopWatcher() | ||
1029 | 805 | if len(results) == 3: | ||
1030 | 806 | raise StopWatcher() | ||
1031 | 807 | |||
1032 | 808 | unit_state.watch_resolved(callback) | ||
1033 | 809 | yield unit_state.set_resolved(RETRY_HOOKS) | ||
1034 | 810 | yield unit_state.clear_resolved() | ||
1035 | 811 | yield self.poke_zk() | ||
1036 | 812 | |||
1037 | 813 | unit_state.watch_resolved(callback) | ||
1038 | 814 | yield unit_state.set_resolved(NO_HOOKS) | ||
1039 | 815 | yield unit_state.clear_resolved() | ||
1040 | 816 | |||
1041 | 817 | yield self.poke_zk() | ||
1042 | 818 | |||
1043 | 819 | self.assertEqual(len(results), 3) | ||
1044 | 820 | self.assertIdentical(results.pop(0), False) | ||
1045 | 821 | self.assertIdentical(results.pop(0), False) | ||
1046 | 822 | self.assertEqual(results.pop(0).type_name, "created") | ||
1047 | 823 | |||
1048 | 824 | self.assertEqual( | ||
1049 | 825 | (yield unit_state.get_resolved()), None) | ||
1050 | 826 | |||
1051 | 827 | @inlineCallbacks | ||
1052 | 828 | def test_get_set_clear_relation_resolved(self): | ||
1053 | 829 | """A unit's relations can be set to resolved to mark a | ||
1054 | 830 | future transition, with an optional retry flag.""" | ||
1055 | 831 | |||
1056 | 832 | unit_state = yield self.get_unit_state() | ||
1057 | 833 | |||
1058 | 834 | self.assertIdentical((yield unit_state.get_relation_resolved()), None) | ||
1059 | 835 | yield unit_state.set_relation_resolved({"0": RETRY_HOOKS}) | ||
1060 | 836 | |||
1061 | 837 | # Trying to set a conflicting value raises an error | ||
1062 | 838 | yield self.assertFailure( | ||
1063 | 839 | unit_state.set_relation_resolved({"0": NO_HOOKS}), | ||
1064 | 840 | ServiceUnitRelationResolvedAlreadyEnabled) | ||
1065 | 841 | |||
1066 | 842 | # Doing the same thing is fine | ||
1067 | 843 | yield unit_state.set_relation_resolved({"0": RETRY_HOOKS}), | ||
1068 | 844 | |||
1069 | 845 | # It's fine to put in new values | ||
1070 | 846 | yield unit_state.set_relation_resolved({"21": RETRY_HOOKS}) | ||
1071 | 847 | yield self.assertEqual( | ||
1072 | 848 | (yield unit_state.get_relation_resolved()), | ||
1073 | 849 | {"0": RETRY_HOOKS, "21": RETRY_HOOKS}) | ||
1074 | 850 | |||
1075 | 851 | yield unit_state.clear_relation_resolved() | ||
1076 | 852 | self.assertIdentical((yield unit_state.get_relation_resolved()), None) | ||
1077 | 853 | yield unit_state.clear_relation_resolved() | ||
1078 | 854 | |||
1079 | 855 | yield self.assertFailure( | ||
1080 | 856 | unit_state.set_relation_resolved(True), ValueError) | ||
1081 | 857 | yield self.assertFailure( | ||
1082 | 858 | unit_state.set_relation_resolved(None), ValueError) | ||
1083 | 859 | |||
1084 | 860 | @inlineCallbacks | ||
1085 | 861 | def test_watch_relation_resolved(self): | ||
1086 | 862 | """A unit resolved watch can be instituted on a permanent basis.""" | ||
1087 | 863 | unit_state = yield self.get_unit_state() | ||
1088 | 864 | |||
1089 | 865 | results = [] | ||
1090 | 866 | |||
1091 | 867 | def callback(value): | ||
1092 | 868 | results.append(value) | ||
1093 | 869 | |||
1094 | 870 | unit_state.watch_relation_resolved(callback) | ||
1095 | 871 | yield unit_state.set_relation_resolved({"0": RETRY_HOOKS}) | ||
1096 | 872 | yield unit_state.clear_relation_resolved() | ||
1097 | 873 | yield unit_state.set_relation_resolved({"0": NO_HOOKS}) | ||
1098 | 874 | |||
1099 | 875 | yield self.poke_zk() | ||
1100 | 876 | |||
1101 | 877 | self.assertEqual(len(results), 4) | ||
1102 | 878 | self.assertIdentical(results.pop(0), False) | ||
1103 | 879 | self.assertEqual(results.pop(0).type_name, "created") | ||
1104 | 880 | self.assertEqual(results.pop(0).type_name, "deleted") | ||
1105 | 881 | self.assertEqual(results.pop(0).type_name, "created") | ||
1106 | 882 | |||
1107 | 883 | self.assertEqual( | ||
1108 | 884 | (yield unit_state.get_relation_resolved()), | ||
1109 | 885 | {"0": NO_HOOKS}) | ||
1110 | 886 | |||
1111 | 887 | @inlineCallbacks | ||
1112 | 888 | def test_stop_watch_relation_resolved(self): | ||
1113 | 889 | """A unit resolved watch can be instituted on a permanent basis.""" | ||
1114 | 890 | unit_state = yield self.get_unit_state() | ||
1115 | 891 | |||
1116 | 892 | results = [] | ||
1117 | 893 | |||
1118 | 894 | def callback(value): | ||
1119 | 895 | results.append(value) | ||
1120 | 896 | |||
1121 | 897 | if len(results) == 1: | ||
1122 | 898 | raise StopWatcher() | ||
1123 | 899 | |||
1124 | 900 | if len(results) == 3: | ||
1125 | 901 | raise StopWatcher() | ||
1126 | 902 | |||
1127 | 903 | unit_state.watch_relation_resolved(callback) | ||
1128 | 904 | yield unit_state.set_relation_resolved({"0": RETRY_HOOKS}) | ||
1129 | 905 | yield unit_state.clear_relation_resolved() | ||
1130 | 906 | yield self.poke_zk() | ||
1131 | 907 | self.assertEqual(len(results), 1) | ||
1132 | 908 | |||
1133 | 909 | unit_state.watch_relation_resolved(callback) | ||
1134 | 910 | yield unit_state.set_relation_resolved({"0": RETRY_HOOKS}) | ||
1135 | 911 | yield unit_state.clear_relation_resolved() | ||
1136 | 912 | yield self.poke_zk() | ||
1137 | 913 | self.assertEqual(len(results), 3) | ||
1138 | 914 | self.assertIdentical(results.pop(0), False) | ||
1139 | 915 | self.assertIdentical(results.pop(0), False) | ||
1140 | 916 | self.assertEqual(results.pop(0).type_name, "created") | ||
1141 | 917 | |||
1142 | 918 | self.assertEqual( | ||
1143 | 919 | (yield unit_state.get_relation_resolved()), None) | ||
1144 | 920 | |||
1145 | 921 | @inlineCallbacks | ||
1146 | 922 | def test_watch_resolved_slow_callback(self): | ||
1147 | 923 | """A slow watch callback is still invoked serially.""" | ||
1148 | 924 | unit_state = yield self.get_unit_state() | ||
1149 | 925 | |||
1150 | 926 | callbacks = [Deferred() for i in range(5)] | ||
1151 | 927 | results = [] | ||
1152 | 928 | contents = [] | ||
1153 | 929 | |||
1154 | 930 | @inlineCallbacks | ||
1155 | 931 | def watch(value): | ||
1156 | 932 | results.append(value) | ||
1157 | 933 | yield callbacks[len(results) - 1] | ||
1158 | 934 | contents.append((yield unit_state.get_resolved())) | ||
1159 | 935 | |||
1160 | 936 | yield unit_state.watch_resolved(watch) | ||
1161 | 937 | |||
1162 | 938 | # These get collapsed into a single event | ||
1163 | 939 | yield unit_state.set_resolved(RETRY_HOOKS) | ||
1164 | 940 | yield unit_state.clear_resolved() | ||
1165 | 941 | yield self.poke_zk() | ||
1166 | 942 | |||
1167 | 943 | # Verify the callback hasn't completed | ||
1168 | 944 | self.assertEqual(len(results), 1) | ||
1169 | 945 | self.assertEqual(len(contents), 0) | ||
1170 | 946 | |||
1171 | 947 | # Let it finish | ||
1172 | 948 | callbacks[0].callback(True) | ||
1173 | 949 | yield self.poke_zk() | ||
1174 | 950 | |||
1175 | 951 | # Verify result counts | ||
1176 | 952 | self.assertEqual(len(results), 2) | ||
1177 | 953 | self.assertEqual(len(contents), 1) | ||
1178 | 954 | |||
1179 | 955 | # Verify result values. Even though we have a created event, the | ||
1180 | 956 | # setting retrieved shows the hook is not enabled. | ||
1181 | 957 | self.assertEqual(results[-1].type_name, "created") | ||
1182 | 958 | self.assertEqual(contents[-1], None) | ||
1183 | 959 | |||
1184 | 960 | yield unit_state.set_resolved(NO_HOOKS) | ||
1185 | 961 | callbacks[1].callback(True) | ||
1186 | 962 | yield self.poke_zk() | ||
1187 | 963 | |||
1188 | 964 | self.assertEqual(len(results), 3) | ||
1189 | 965 | self.assertEqual(contents[-1], {"retry": NO_HOOKS}) | ||
1190 | 966 | |||
1191 | 967 | # Clear out any pending activity. | ||
1192 | 968 | yield self.poke_zk() | ||
1193 | 969 | |||
1194 | 970 | @inlineCallbacks | ||
1195 | 971 | def test_watch_relation_resolved_slow_callback(self): | ||
1196 | 972 | """A slow watch callback is still invoked serially.""" | ||
1197 | 973 | unit_state = yield self.get_unit_state() | ||
1198 | 974 | |||
1199 | 975 | callbacks = [Deferred() for i in range(5)] | ||
1200 | 976 | results = [] | ||
1201 | 977 | contents = [] | ||
1202 | 978 | |||
1203 | 979 | @inlineCallbacks | ||
1204 | 980 | def watch(value): | ||
1205 | 981 | results.append(value) | ||
1206 | 982 | yield callbacks[len(results) - 1] | ||
1207 | 983 | contents.append((yield unit_state.get_relation_resolved())) | ||
1208 | 984 | |||
1209 | 985 | yield unit_state.watch_relation_resolved(watch) | ||
1210 | 986 | |||
1211 | 987 | # These get collapsed into a single event | ||
1212 | 988 | yield unit_state.set_relation_resolved({"0": RETRY_HOOKS}) | ||
1213 | 989 | yield unit_state.clear_relation_resolved() | ||
1214 | 990 | yield self.poke_zk() | ||
1215 | 991 | |||
1216 | 992 | # Verify the callback hasn't completed | ||
1217 | 993 | self.assertEqual(len(results), 1) | ||
1218 | 994 | self.assertEqual(len(contents), 0) | ||
1219 | 995 | |||
1220 | 996 | # Let it finish | ||
1221 | 997 | callbacks[0].callback(True) | ||
1222 | 998 | yield self.poke_zk() | ||
1223 | 999 | |||
1224 | 1000 | # Verify result counts | ||
1225 | 1001 | self.assertEqual(len(results), 2) | ||
1226 | 1002 | self.assertEqual(len(contents), 1) | ||
1227 | 1003 | |||
1228 | 1004 | # Verify result values. Even though we have a created event, the | ||
1229 | 1005 | # setting retrieved shows the hook is not enabled. | ||
1230 | 1006 | self.assertEqual(results[-1].type_name, "created") | ||
1231 | 1007 | self.assertEqual(contents[-1], None) | ||
1232 | 1008 | |||
1233 | 1009 | yield unit_state.set_relation_resolved({"0": RETRY_HOOKS}) | ||
1234 | 1010 | callbacks[1].callback(True) | ||
1235 | 1011 | yield self.poke_zk() | ||
1236 | 1012 | |||
1237 | 1013 | self.assertEqual(len(results), 3) | ||
1238 | 1014 | self.assertEqual(contents[-1], {"0": RETRY_HOOKS}) | ||
1239 | 1015 | # Clear out any pending activity. | ||
1240 | 1016 | yield self.poke_zk() | ||
1241 | 1017 | |||
1242 | 1018 | @inlineCallbacks | ||
1243 | 741 | def test_set_and_clear_upgrade_flag(self): | 1019 | def test_set_and_clear_upgrade_flag(self): |
1244 | 742 | """An upgrade flag can be set on a unit.""" | 1020 | """An upgrade flag can be set on a unit.""" |
1245 | 743 | 1021 | ||
1246 | @@ -870,6 +1148,9 @@ | |||
1247 | 870 | self.assertEqual(results[-1].type_name, "created") | 1148 | self.assertEqual(results[-1].type_name, "created") |
1248 | 871 | self.assertEqual(contents[-1], True) | 1149 | self.assertEqual(contents[-1], True) |
1249 | 872 | 1150 | ||
1250 | 1151 | # Clear out any pending activity. | ||
1251 | 1152 | yield self.poke_zk() | ||
1252 | 1153 | |||
1253 | 873 | @inlineCallbacks | 1154 | @inlineCallbacks |
1254 | 874 | def test_enable_debug_hook(self): | 1155 | def test_enable_debug_hook(self): |
1255 | 875 | """Unit hook debugging can be enabled on the unit state.""" | 1156 | """Unit hook debugging can be enabled on the unit state.""" |
1256 | @@ -1061,6 +1342,9 @@ | |||
1257 | 1061 | self.assertEqual(results[-1].type_name, "created") | 1342 | self.assertEqual(results[-1].type_name, "created") |
1258 | 1062 | self.assertEqual(contents[-1], {"debug_hooks": ["*"]}) | 1343 | self.assertEqual(contents[-1], {"debug_hooks": ["*"]}) |
1259 | 1063 | 1344 | ||
1260 | 1345 | # Clear out any pending activity. | ||
1261 | 1346 | yield self.poke_zk() | ||
1262 | 1347 | |||
1263 | 1064 | @inlineCallbacks | 1348 | @inlineCallbacks |
1264 | 1065 | def test_service_unit_agent(self): | 1349 | def test_service_unit_agent(self): |
1265 | 1066 | """A service unit state has an associated unit agent.""" | 1350 | """A service unit state has an associated unit agent.""" |
1266 | 1067 | 1351 | ||
1267 | === modified file 'ensemble/state/tests/test_utils.py' | |||
1268 | --- ensemble/state/tests/test_utils.py 2011-04-28 17:52:55 +0000 | |||
1269 | +++ ensemble/state/tests/test_utils.py 2011-05-02 21:57:29 +0000 | |||
1270 | @@ -12,9 +12,10 @@ | |||
1271 | 12 | import yaml | 12 | import yaml |
1272 | 13 | 13 | ||
1273 | 14 | from ensemble.lib.testing import TestCase | 14 | from ensemble.lib.testing import TestCase |
1276 | 15 | from ensemble.state.errors import StateNotFound | 15 | from ensemble.state.errors import StateChanged, StateNotFound |
1277 | 16 | from ensemble.state.utils import (PortWatcher, remove_tree, | 16 | from ensemble.state.utils import (PortWatcher, remove_tree, dict_merge, |
1278 | 17 | get_open_port, YAMLState) | 17 | get_open_port, YAMLState) |
1279 | 18 | |||
1280 | 18 | from ensemble.tests.common import get_test_zookeeper_address | 19 | from ensemble.tests.common import get_test_zookeeper_address |
1281 | 19 | 20 | ||
1282 | 20 | 21 | ||
1283 | @@ -206,6 +207,26 @@ | |||
1284 | 206 | self.assertNotIn("zoo", children) | 207 | self.assertNotIn("zoo", children) |
1285 | 207 | 208 | ||
1286 | 208 | 209 | ||
1287 | 210 | class DictMergeTest(TestCase): | ||
1288 | 211 | |||
1289 | 212 | def test_merge_no_match(self): | ||
1290 | 213 | self.assertEqual( | ||
1291 | 214 | dict_merge(dict(a=1), dict(b=2)), | ||
1292 | 215 | dict(a=1, b=2)) | ||
1293 | 216 | |||
1294 | 217 | def test_merge_matching_keys_same_value(self): | ||
1295 | 218 | self.assertEqual( | ||
1296 | 219 | dict_merge(dict(a=1, b=2), dict(b=2, c=1)), | ||
1297 | 220 | dict(a=1, b=2, c=1)) | ||
1298 | 221 | |||
1299 | 222 | def test_merge_conflict(self): | ||
1300 | 223 | self.assertRaises( | ||
1301 | 224 | StateChanged, | ||
1302 | 225 | dict_merge, | ||
1303 | 226 | dict(a=1, b=3), | ||
1304 | 227 | dict(b=2, c=1)) | ||
1305 | 228 | |||
1306 | 229 | |||
1307 | 209 | class OpenPortTest(TestCase): | 230 | class OpenPortTest(TestCase): |
1308 | 210 | 231 | ||
1309 | 211 | def test_get_open_port(self): | 232 | def test_get_open_port(self): |
1310 | @@ -301,7 +322,6 @@ | |||
1311 | 301 | zk_data = yaml.load(zk_data) | 322 | zk_data = yaml.load(zk_data) |
1312 | 302 | self.assertEqual(zk_data, options) | 323 | self.assertEqual(zk_data, options) |
1313 | 303 | 324 | ||
1314 | 304 | |||
1315 | 305 | @inlineCallbacks | 325 | @inlineCallbacks |
1316 | 306 | def test_conflict_on_set(self): | 326 | def test_conflict_on_set(self): |
1317 | 307 | """Version conflict error tests. | 327 | """Version conflict error tests. |
1318 | @@ -357,8 +377,8 @@ | |||
1319 | 357 | yield node.read() | 377 | yield node.read() |
1320 | 358 | 378 | ||
1321 | 359 | options = dict(alpha="beta", one=1) | 379 | options = dict(alpha="beta", one=1) |
1324 | 360 | node["alpha"] = "beta" | 380 | node["alpha"] = "beta" |
1325 | 361 | node["one"] = 1 | 381 | node["one"] = 1 |
1326 | 362 | yield node.write() | 382 | yield node.write() |
1327 | 363 | 383 | ||
1328 | 364 | # a local get should reflect proper data | 384 | # a local get should reflect proper data |
1329 | @@ -369,7 +389,6 @@ | |||
1330 | 369 | zk_data = yaml.load(zk_data) | 389 | zk_data = yaml.load(zk_data) |
1331 | 370 | self.assertEqual(zk_data, options) | 390 | self.assertEqual(zk_data, options) |
1332 | 371 | 391 | ||
1333 | 372 | |||
1334 | 373 | @inlineCallbacks | 392 | @inlineCallbacks |
1335 | 374 | def test_multiple_reads(self): | 393 | def test_multiple_reads(self): |
1336 | 375 | """Calling read resets state to ZK after multiple round-trips.""" | 394 | """Calling read resets state to ZK after multiple round-trips.""" |
1337 | @@ -415,7 +434,7 @@ | |||
1338 | 415 | 434 | ||
1339 | 416 | result = node.pop("foo") | 435 | result = node.pop("foo") |
1340 | 417 | self.assertEqual(result, "bar") | 436 | self.assertEqual(result, "bar") |
1342 | 418 | self.assertEqual(node, {"alpha": "beta",}) | 437 | self.assertEqual(node, {"alpha": "beta"}) |
1343 | 419 | 438 | ||
1344 | 420 | node["delta"] = "gamma" | 439 | node["delta"] = "gamma" |
1345 | 421 | self.assertEqual(set(node.keys()), set(("alpha", "delta"))) | 440 | self.assertEqual(set(node.keys()), set(("alpha", "delta"))) |
1346 | @@ -424,7 +443,6 @@ | |||
1347 | 424 | self.assertIn(("alpha", "beta"), result) | 443 | self.assertIn(("alpha", "beta"), result) |
1348 | 425 | self.assertIn(("delta", "gamma"), result) | 444 | self.assertIn(("delta", "gamma"), result) |
1349 | 426 | 445 | ||
1350 | 427 | |||
1351 | 428 | @inlineCallbacks | 446 | @inlineCallbacks |
1352 | 429 | def test_del_empties_state(self): | 447 | def test_del_empties_state(self): |
1353 | 430 | d = YAMLState(self.client, self.path) | 448 | d = YAMLState(self.client, self.path) |
1354 | @@ -500,12 +518,8 @@ | |||
1355 | 500 | yield d1.read() | 518 | yield d1.read() |
1356 | 501 | self.assertEquals(d1, d2) | 519 | self.assertEquals(d1, d2) |
1357 | 502 | 520 | ||
1358 | 503 | |||
1359 | 504 | @inlineCallbacks | 521 | @inlineCallbacks |
1360 | 505 | def test_read_requires_node(self): | 522 | def test_read_requires_node(self): |
1361 | 506 | """Validate that read raises when required=True.""" | 523 | """Validate that read raises when required=True.""" |
1362 | 507 | d1 = YAMLState(self.client, self.path) | 524 | d1 = YAMLState(self.client, self.path) |
1363 | 508 | yield self.assertFailure(d1.read(True), StateNotFound) | 525 | yield self.assertFailure(d1.read(True), StateNotFound) |
1364 | 509 | |||
1365 | 510 | |||
1366 | 511 | |||
1367 | 512 | 526 | ||
1368 | === modified file 'ensemble/state/utils.py' | |||
1369 | --- ensemble/state/utils.py 2011-04-28 17:52:55 +0000 | |||
1370 | +++ ensemble/state/utils.py 2011-05-02 21:57:29 +0000 | |||
1371 | @@ -9,8 +9,10 @@ | |||
1372 | 9 | import yaml | 9 | import yaml |
1373 | 10 | import zookeeper | 10 | import zookeeper |
1374 | 11 | 11 | ||
1375 | 12 | from ensemble.state.errors import StateChanged | ||
1376 | 12 | from ensemble.state.errors import StateNotFound | 13 | from ensemble.state.errors import StateNotFound |
1377 | 13 | 14 | ||
1378 | 15 | |||
1379 | 14 | class PortWatcher(object): | 16 | class PortWatcher(object): |
1380 | 15 | 17 | ||
1381 | 16 | def __init__(self, host, port, timeout, listen=False): | 18 | def __init__(self, host, port, timeout, listen=False): |
1382 | @@ -91,6 +93,22 @@ | |||
1383 | 91 | return port | 93 | return port |
1384 | 92 | 94 | ||
1385 | 93 | 95 | ||
1386 | 96 | def dict_merge(d1, d2): | ||
1387 | 97 | """Return a union of dicts if they have no conflicting values. | ||
1388 | 98 | |||
1389 | 99 | Else raise a StateChanged error. | ||
1390 | 100 | """ | ||
1391 | 101 | must_match = set(d1).intersection(d2) | ||
1392 | 102 | for k in must_match: | ||
1393 | 103 | if not d1[k] == d2[k]: | ||
1394 | 104 | raise StateChanged() | ||
1395 | 105 | |||
1396 | 106 | d = {} | ||
1397 | 107 | d.update(d1) | ||
1398 | 108 | d.update(d2) | ||
1399 | 109 | return d | ||
1400 | 110 | |||
1401 | 111 | |||
1402 | 94 | class YAMLState(DictMixin): | 112 | class YAMLState(DictMixin): |
1403 | 95 | """Provides a dict like interface around a Zookeeper node | 113 | """Provides a dict like interface around a Zookeeper node |
1404 | 96 | containing serialised YAML data. The dict provided represents the | 114 | containing serialised YAML data. The dict provided represents the |
1405 | @@ -144,7 +162,6 @@ | |||
1406 | 144 | if required: | 162 | if required: |
1407 | 145 | raise StateNotFound(self._path) | 163 | raise StateNotFound(self._path) |
1408 | 146 | 164 | ||
1409 | 147 | |||
1410 | 148 | def _check(self): | 165 | def _check(self): |
1411 | 149 | """Verify that sync was called for operations which expect it.""" | 166 | """Verify that sync was called for operations which expect it.""" |
1412 | 150 | if self._pristine_cache is None: | 167 | if self._pristine_cache is None: |
1413 | @@ -164,7 +181,6 @@ | |||
1414 | 164 | self._check() | 181 | self._check() |
1415 | 165 | self._cache[key] = value | 182 | self._cache[key] = value |
1416 | 166 | 183 | ||
1417 | 167 | |||
1418 | 168 | def __delitem__(self, key): | 184 | def __delitem__(self, key): |
1419 | 169 | self._check() | 185 | self._check() |
1420 | 170 | del self._cache[key] | 186 | del self._cache[key] |
1421 | 171 | 187 | ||
1422 | === modified file 'ensemble/unit/lifecycle.py' | |||
1423 | --- ensemble/unit/lifecycle.py 2011-04-15 18:53:29 +0000 | |||
1424 | +++ ensemble/unit/lifecycle.py 2011-05-02 21:57:29 +0000 | |||
1425 | @@ -2,14 +2,14 @@ | |||
1426 | 2 | import logging | 2 | import logging |
1427 | 3 | 3 | ||
1428 | 4 | from twisted.internet.defer import ( | 4 | from twisted.internet.defer import ( |
1430 | 5 | inlineCallbacks, DeferredLock) | 5 | inlineCallbacks, DeferredLock, returnValue) |
1431 | 6 | 6 | ||
1432 | 7 | from ensemble.hooks.invoker import Invoker | 7 | from ensemble.hooks.invoker import Invoker |
1433 | 8 | from ensemble.hooks.scheduler import HookScheduler | 8 | from ensemble.hooks.scheduler import HookScheduler |
1434 | 9 | from ensemble.state.hook import RelationChange | 9 | from ensemble.state.hook import RelationChange |
1435 | 10 | from ensemble.state.errors import StopWatcher, UnitRelationStateNotFound | 10 | from ensemble.state.errors import StopWatcher, UnitRelationStateNotFound |
1436 | 11 | 11 | ||
1438 | 12 | from ensemble.unit.workflow import RelationWorkflowState, RelationWorkflow | 12 | from ensemble.unit.workflow import RelationWorkflowState |
1439 | 13 | 13 | ||
1440 | 14 | 14 | ||
1441 | 15 | HOOK_SOCKET_FILE = ".ensemble.hookcli.sock" | 15 | HOOK_SOCKET_FILE = ".ensemble.hookcli.sock" |
1442 | @@ -32,7 +32,8 @@ | |||
1443 | 32 | self._unit_path = unit_path | 32 | self._unit_path = unit_path |
1444 | 33 | self._relations = {} | 33 | self._relations = {} |
1445 | 34 | self._running = False | 34 | self._running = False |
1447 | 35 | self._watching = False | 35 | self._watching_relation_memberships = False |
1448 | 36 | self._watching_relation_resolved = False | ||
1449 | 36 | self._run_lock = DeferredLock() | 37 | self._run_lock = DeferredLock() |
1450 | 37 | self._log = logging.getLogger("unit.lifecycle") | 38 | self._log = logging.getLogger("unit.lifecycle") |
1451 | 38 | 39 | ||
1452 | @@ -45,21 +46,23 @@ | |||
1453 | 45 | return self._relations[relation_id] | 46 | return self._relations[relation_id] |
1454 | 46 | 47 | ||
1455 | 47 | @inlineCallbacks | 48 | @inlineCallbacks |
1457 | 48 | def install(self): | 49 | def install(self, fire_hooks=True): |
1458 | 49 | """Invoke the unit's install hook. | 50 | """Invoke the unit's install hook. |
1459 | 50 | """ | 51 | """ |
1461 | 51 | yield self._execute_hook("install") | 52 | if fire_hooks: |
1462 | 53 | yield self._execute_hook("install") | ||
1463 | 52 | 54 | ||
1464 | 53 | @inlineCallbacks | 55 | @inlineCallbacks |
1466 | 54 | def upgrade_formula(self): | 56 | def upgrade_formula(self, fire_hooks=True): |
1467 | 55 | """Invoke the unit's upgrade-formula hook. | 57 | """Invoke the unit's upgrade-formula hook. |
1468 | 56 | """ | 58 | """ |
1470 | 57 | yield self._execute_hook("upgrade-formula", now=True) | 59 | if fire_hooks: |
1471 | 60 | yield self._execute_hook("upgrade-formula", now=True) | ||
1472 | 58 | # Restart hook queued hook execution. | 61 | # Restart hook queued hook execution. |
1473 | 59 | self._executor.start() | 62 | self._executor.start() |
1474 | 60 | 63 | ||
1475 | 61 | @inlineCallbacks | 64 | @inlineCallbacks |
1477 | 62 | def start(self): | 65 | def start(self, fire_hooks=True): |
1478 | 63 | """Invoke the start hook, and setup relation watching. | 66 | """Invoke the start hook, and setup relation watching. |
1479 | 64 | """ | 67 | """ |
1480 | 65 | self._log.debug("pre-start acquire, running:%s", self._running) | 68 | self._log.debug("pre-start acquire, running:%s", self._running) |
1481 | @@ -70,22 +73,29 @@ | |||
1482 | 70 | assert not self._running, "Already started" | 73 | assert not self._running, "Already started" |
1483 | 71 | 74 | ||
1484 | 72 | # Execute the start hook | 75 | # Execute the start hook |
1486 | 73 | yield self._execute_hook("start") | 76 | if fire_hooks: |
1487 | 77 | yield self._execute_hook("start") | ||
1488 | 74 | 78 | ||
1489 | 75 | # If we have any existing relations in memory, start them. | 79 | # If we have any existing relations in memory, start them. |
1490 | 76 | if self._relations: | 80 | if self._relations: |
1491 | 77 | self._log.debug("starting relation lifecycles") | 81 | self._log.debug("starting relation lifecycles") |
1492 | 78 | 82 | ||
1493 | 79 | for workflow in self._relations.values(): | 83 | for workflow in self._relations.values(): |
1494 | 80 | # should not transition an | ||
1495 | 81 | yield workflow.transition_state("up") | 84 | yield workflow.transition_state("up") |
1496 | 82 | 85 | ||
1497 | 83 | # Establish a watch on the existing relations. | 86 | # Establish a watch on the existing relations. |
1499 | 84 | if not self._watching: | 87 | if not self._watching_relation_memberships: |
1500 | 85 | self._log.debug("starting service relation watch") | 88 | self._log.debug("starting service relation watch") |
1501 | 86 | yield self._service.watch_relation_states( | 89 | yield self._service.watch_relation_states( |
1502 | 87 | self._on_service_relation_changes) | 90 | self._on_service_relation_changes) |
1504 | 88 | self._watching = True | 91 | self._watching_relation_memberships = True |
1505 | 92 | |||
1506 | 93 | # Establish a watch for resolved relations | ||
1507 | 94 | if not self._watching_relation_resolved: | ||
1508 | 95 | self._log.debug("starting unit relation resolved watch") | ||
1509 | 96 | yield self._unit.watch_relation_resolved( | ||
1510 | 97 | self._on_relation_resolved_changes) | ||
1511 | 98 | self._watching_relation_resolved = True | ||
1512 | 89 | 99 | ||
1513 | 90 | # Set current status | 100 | # Set current status |
1514 | 91 | self._running = True | 101 | self._running = True |
1515 | @@ -94,7 +104,7 @@ | |||
1516 | 94 | self._log.debug("started unit lifecycle") | 104 | self._log.debug("started unit lifecycle") |
1517 | 95 | 105 | ||
1518 | 96 | @inlineCallbacks | 106 | @inlineCallbacks |
1520 | 97 | def stop(self): | 107 | def stop(self, fire_hooks=True): |
1521 | 98 | """Stop the unit, executes the stop hook, and stops relation watching. | 108 | """Stop the unit, executes the stop hook, and stops relation watching. |
1522 | 99 | """ | 109 | """ |
1523 | 100 | self._log.debug("pre-stop acquire, running:%s", self._running) | 110 | self._log.debug("pre-stop acquire, running:%s", self._running) |
1524 | @@ -106,10 +116,12 @@ | |||
1525 | 106 | # Stop relation lifecycles | 116 | # Stop relation lifecycles |
1526 | 107 | if self._relations: | 117 | if self._relations: |
1527 | 108 | self._log.debug("stopping relation lifecycles") | 118 | self._log.debug("stopping relation lifecycles") |
1528 | 119 | |||
1529 | 109 | for workflow in self._relations.values(): | 120 | for workflow in self._relations.values(): |
1530 | 110 | yield workflow.transition_state("down") | 121 | yield workflow.transition_state("down") |
1531 | 111 | 122 | ||
1533 | 112 | yield self._execute_hook("stop") | 123 | if fire_hooks: |
1534 | 124 | yield self._execute_hook("stop") | ||
1535 | 113 | 125 | ||
1536 | 114 | # Set current status | 126 | # Set current status |
1537 | 115 | self._running = False | 127 | self._running = False |
1538 | @@ -118,6 +130,49 @@ | |||
1539 | 118 | self._log.debug("stopped unit lifecycle") | 130 | self._log.debug("stopped unit lifecycle") |
1540 | 119 | 131 | ||
1541 | 120 | @inlineCallbacks | 132 | @inlineCallbacks |
1542 | 133 | def _on_relation_resolved_changes(self, event): | ||
1543 | 134 | """Callback for unit relation resolved watching. | ||
1544 | 135 | |||
1545 | 136 | The callback is invoked whenever the relation resolved | ||
1546 | 137 | settings change. | ||
1547 | 138 | """ | ||
1548 | 139 | self._log.debug("relation resolved changed") | ||
1549 | 140 | # Acquire the run lock, and process the changes. | ||
1550 | 141 | yield self._run_lock.acquire() | ||
1551 | 142 | |||
1552 | 143 | try: | ||
1553 | 144 | # If the unit lifecycle isn't running we shouldn't process | ||
1554 | 145 | # any relation resolutions. | ||
1555 | 146 | if not self._running: | ||
1556 | 147 | self._log.debug("stop watch relation resolved changes") | ||
1557 | 148 | self._watching_relation_resolved = False | ||
1558 | 149 | raise StopWatcher() | ||
1559 | 150 | |||
1560 | 151 | self._log.info("processing relation resolved changed") | ||
1561 | 152 | if self._client.connected: | ||
1562 | 153 | yield self._process_relation_resolved_changes() | ||
1563 | 154 | finally: | ||
1564 | 155 | yield self._run_lock.release() | ||
1565 | 156 | |||
1566 | 157 | @inlineCallbacks | ||
1567 | 158 | def _process_relation_resolved_changes(self): | ||
1568 | 159 | """Invoke retry transitions on relations if they're not running. | ||
1569 | 160 | """ | ||
1570 | 161 | relation_resolved = yield self._unit.get_relation_resolved() | ||
1571 | 162 | if relation_resolved is None: | ||
1572 | 163 | returnValue(None) | ||
1573 | 164 | else: | ||
1574 | 165 | yield self._unit.clear_relation_resolved() | ||
1575 | 166 | |||
1576 | 167 | keys = set(relation_resolved).intersection(self._relations) | ||
1577 | 168 | for rel_id in keys: | ||
1578 | 169 | relation_workflow = self._relations[rel_id] | ||
1579 | 170 | relation_state = yield relation_workflow.get_state() | ||
1580 | 171 | if relation_state == "up": | ||
1581 | 172 | continue | ||
1582 | 173 | yield relation_workflow.transition_state("up") | ||
1583 | 174 | |||
1584 | 175 | @inlineCallbacks | ||
1585 | 121 | def _on_service_relation_changes(self, old_relations, new_relations): | 176 | def _on_service_relation_changes(self, old_relations, new_relations): |
1586 | 122 | """Callback for service relation watching. | 177 | """Callback for service relation watching. |
1587 | 123 | 178 | ||
1588 | @@ -137,7 +192,7 @@ | |||
1589 | 137 | # If the lifecycle is not running, then stop the watcher | 192 | # If the lifecycle is not running, then stop the watcher |
1590 | 138 | if not self._running: | 193 | if not self._running: |
1591 | 139 | self._log.debug("stop service-rel watcher, discarding changes") | 194 | self._log.debug("stop service-rel watcher, discarding changes") |
1593 | 140 | self._watching = False | 195 | self._watching_relation_memberships = False |
1594 | 141 | raise StopWatcher() | 196 | raise StopWatcher() |
1595 | 142 | 197 | ||
1596 | 143 | self._log.debug("processing relations changed") | 198 | self._log.debug("processing relations changed") |
1597 | @@ -147,9 +202,9 @@ | |||
1598 | 147 | 202 | ||
1599 | 148 | @inlineCallbacks | 203 | @inlineCallbacks |
1600 | 149 | def _process_service_changes(self, old_relations, new_relations): | 204 | def _process_service_changes(self, old_relations, new_relations): |
1602 | 150 | """Add and remove unit lifecycles per the service relations changes. | 205 | """Add and remove unit lifecycles per the service relations changes. |
1603 | 151 | """ | 206 | """ |
1605 | 152 | # Determine relation delta of global zk state with our memory state. | 207 | # Determine relation delta of global zk state with our memory state. |
1606 | 153 | new_relations = dict([(service_relation.internal_relation_id, | 208 | new_relations = dict([(service_relation.internal_relation_id, |
1607 | 154 | service_relation) for | 209 | service_relation) for |
1608 | 155 | service_relation in new_relations]) | 210 | service_relation in new_relations]) |
1609 | @@ -336,8 +391,11 @@ | |||
1610 | 336 | self._error_handler = handler | 391 | self._error_handler = handler |
1611 | 337 | 392 | ||
1612 | 338 | @inlineCallbacks | 393 | @inlineCallbacks |
1614 | 339 | def start(self): | 394 | def start(self, watches=True): |
1615 | 340 | """Start watching related units and executing change hooks. | 395 | """Start watching related units and executing change hooks. |
1616 | 396 | |||
1617 | 397 | @param watches: boolean parameter denoting if relation watches | ||
1618 | 398 | should be started. | ||
1619 | 341 | """ | 399 | """ |
1620 | 342 | yield self._run_lock.acquire() | 400 | yield self._run_lock.acquire() |
1621 | 343 | try: | 401 | try: |
1622 | @@ -348,19 +406,24 @@ | |||
1623 | 348 | self._watcher = yield self._unit_relation.watch_related_units( | 406 | self._watcher = yield self._unit_relation.watch_related_units( |
1624 | 349 | self._scheduler.notify_change) | 407 | self._scheduler.notify_change) |
1625 | 350 | # And start the watcher. | 408 | # And start the watcher. |
1627 | 351 | yield self._watcher.start() | 409 | if watches: |
1628 | 410 | yield self._watcher.start() | ||
1629 | 352 | finally: | 411 | finally: |
1630 | 353 | self._run_lock.release() | 412 | self._run_lock.release() |
1631 | 354 | self._log.debug( | 413 | self._log.debug( |
1632 | 355 | "started relation:%s lifecycle", self._relation_name) | 414 | "started relation:%s lifecycle", self._relation_name) |
1633 | 356 | 415 | ||
1634 | 357 | @inlineCallbacks | 416 | @inlineCallbacks |
1636 | 358 | def stop(self): | 417 | def stop(self, watches=True): |
1637 | 359 | """Stop watching changes and stop executing relation change hooks. | 418 | """Stop watching changes and stop executing relation change hooks. |
1638 | 419 | |||
1639 | 420 | @param watches: boolean parameter denoting if relation watches | ||
1640 | 421 | should be stopped. | ||
1641 | 360 | """ | 422 | """ |
1642 | 361 | yield self._run_lock.acquire() | 423 | yield self._run_lock.acquire() |
1643 | 362 | try: | 424 | try: |
1645 | 363 | self._watcher.stop() | 425 | if watches and self._watcher: |
1646 | 426 | self._watcher.stop() | ||
1647 | 364 | self._scheduler.stop() | 427 | self._scheduler.stop() |
1648 | 365 | finally: | 428 | finally: |
1649 | 366 | yield self._run_lock.release() | 429 | yield self._run_lock.release() |
1650 | 367 | 430 | ||
1651 | === modified file 'ensemble/unit/tests/test_lifecycle.py' | |||
1652 | --- ensemble/unit/tests/test_lifecycle.py 2011-04-28 17:55:11 +0000 | |||
1653 | +++ ensemble/unit/tests/test_lifecycle.py 2011-05-02 21:57:29 +0000 | |||
1654 | @@ -12,6 +12,8 @@ | |||
1655 | 12 | from ensemble.unit.lifecycle import ( | 12 | from ensemble.unit.lifecycle import ( |
1656 | 13 | UnitLifecycle, UnitRelationLifecycle, RelationInvoker) | 13 | UnitLifecycle, UnitRelationLifecycle, RelationInvoker) |
1657 | 14 | 14 | ||
1658 | 15 | from ensemble.unit.workflow import RelationWorkflowState | ||
1659 | 16 | |||
1660 | 15 | from ensemble.hooks.invoker import Invoker | 17 | from ensemble.hooks.invoker import Invoker |
1661 | 16 | from ensemble.hooks.executor import HookExecutor | 18 | from ensemble.hooks.executor import HookExecutor |
1662 | 17 | 19 | ||
1663 | @@ -19,9 +21,11 @@ | |||
1664 | 19 | 21 | ||
1665 | 20 | from ensemble.state.endpoint import RelationEndpoint | 22 | from ensemble.state.endpoint import RelationEndpoint |
1666 | 21 | from ensemble.state.relation import ClientServerUnitWatcher | 23 | from ensemble.state.relation import ClientServerUnitWatcher |
1667 | 24 | from ensemble.state.service import NO_HOOKS | ||
1668 | 22 | from ensemble.state.tests.test_relation import RelationTestBase | 25 | from ensemble.state.tests.test_relation import RelationTestBase |
1669 | 23 | from ensemble.state.hook import RelationChange | 26 | from ensemble.state.hook import RelationChange |
1670 | 24 | 27 | ||
1671 | 28 | |||
1672 | 25 | from ensemble.lib.testing import TestCase | 29 | from ensemble.lib.testing import TestCase |
1673 | 26 | from ensemble.lib.mocker import MATCH | 30 | from ensemble.lib.mocker import MATCH |
1674 | 27 | 31 | ||
1675 | @@ -145,6 +149,182 @@ | |||
1676 | 145 | return output | 149 | return output |
1677 | 146 | 150 | ||
1678 | 147 | 151 | ||
1679 | 152 | class LifecycleResolvedTest(LifecycleTestBase): | ||
1680 | 153 | |||
1681 | 154 | @inlineCallbacks | ||
1682 | 155 | def setUp(self): | ||
1683 | 156 | yield super(LifecycleResolvedTest, self).setUp() | ||
1684 | 157 | yield self.setup_default_test_relation() | ||
1685 | 158 | self.lifecycle = UnitLifecycle( | ||
1686 | 159 | self.client, self.states["unit"], self.states["service"], | ||
1687 | 160 | self.unit_directory, self.executor) | ||
1688 | 161 | |||
1689 | 162 | def get_unit_relation_workflow(self, states): | ||
1690 | 163 | state_dir = os.path.join(self.ensemble_directory, "state") | ||
1691 | 164 | lifecycle = UnitRelationLifecycle( | ||
1692 | 165 | self.client, | ||
1693 | 166 | states["unit_relation"], | ||
1694 | 167 | states["service_relation"].relation_name, | ||
1695 | 168 | self.unit_directory, | ||
1696 | 169 | self.executor) | ||
1697 | 170 | |||
1698 | 171 | workflow = RelationWorkflowState( | ||
1699 | 172 | self.client, | ||
1700 | 173 | states["unit_relation"], | ||
1701 | 174 | lifecycle, | ||
1702 | 175 | state_dir) | ||
1703 | 176 | |||
1704 | 177 | return (workflow, lifecycle) | ||
1705 | 178 | |||
1706 | 179 | @inlineCallbacks | ||
1707 | 180 | def test_resolved_relation_watch_unit_lifecycle_not_running(self): | ||
1708 | 181 | """If the unit is not running then no relation resolving is performed. | ||
1709 | 182 | However the resolution value remains the same. | ||
1710 | 183 | """ | ||
1711 | 184 | # Start the unit. | ||
1712 | 185 | yield self.lifecycle.start() | ||
1713 | 186 | |||
1714 | 187 | # Wait for the relation to be started.... TODO: async background work | ||
1715 | 188 | yield self.sleep(0.1) | ||
1716 | 189 | |||
1717 | 190 | # Simulate relation down on an individual unit relation | ||
1718 | 191 | workflow = self.lifecycle._relations.get( | ||
1719 | 192 | self.states["unit_relation"].internal_relation_id) | ||
1720 | 193 | self.assertEqual("up", (yield workflow.get_state())) | ||
1721 | 194 | |||
1722 | 195 | yield workflow.transition_state("down") | ||
1723 | 196 | resolved = self.wait_on_state(workflow, "up") | ||
1724 | 197 | |||
1725 | 198 | # Stop the unit lifecycle | ||
1726 | 199 | yield self.lifecycle.stop() | ||
1727 | 200 | |||
1728 | 201 | # Set the relation to resolved | ||
1729 | 202 | yield self.states["unit"].set_relation_resolved( | ||
1730 | 203 | {self.states["unit_relation"].internal_relation_id: NO_HOOKS}) | ||
1731 | 204 | |||
1732 | 205 | # Give a moment for the watch to fire erroneously | ||
1733 | 206 | yield self.sleep(0.2) | ||
1734 | 207 | |||
1735 | 208 | # Ensure we didn't attempt a transition. | ||
1736 | 209 | self.assertFalse(resolved.called) | ||
1737 | 210 | self.assertEqual( | ||
1738 | 211 | {self.states["unit_relation"].internal_relation_id: NO_HOOKS}, | ||
1739 | 212 | (yield self.states["unit"].get_relation_resolved())) | ||
1740 | 213 | |||
1741 | 214 | # If the unit is restarted, we currently have the | ||
1742 | 215 | # behavior that the unit relation workflow will automatically | ||
1743 | 216 | # be transitioned back to running, as part of the normal state | ||
1744 | 217 | # transition. Sigh.. we should have a separate error | ||
1745 | 218 | # state for relation hooks rather than down with state variable usage. | ||
1746 | 219 | |||
1747 | 220 | @inlineCallbacks | ||
1748 | 221 | def test_resolved_relation_watch_relation_up(self): | ||
1749 | 222 | """If a relation marked as to be resolved is already running, | ||
1750 | 223 | then no work is performed. | ||
1751 | 224 | """ | ||
1752 | 225 | # Start the unit. | ||
1753 | 226 | yield self.lifecycle.start() | ||
1754 | 227 | |||
1755 | 228 | # Wait for the relation to be started.... TODO: async background work | ||
1756 | 229 | yield self.sleep(0.1) | ||
1757 | 230 | |||
1758 | 231 | # get a hold of the unit relation and verify state | ||
1759 | 232 | workflow = self.lifecycle._relations.get( | ||
1760 | 233 | self.states["unit_relation"].internal_relation_id) | ||
1761 | 234 | self.assertEqual("up", (yield workflow.get_state())) | ||
1762 | 235 | |||
1763 | 236 | # Set the relation to resolved | ||
1764 | 237 | yield self.states["unit"].set_relation_resolved( | ||
1765 | 238 | {self.states["unit_relation"].internal_relation_id: NO_HOOKS}) | ||
1766 | 239 | |||
1767 | 240 | # Give a moment for the async background work. | ||
1768 | 241 | yield self.sleep(0.1) | ||
1769 | 242 | |||
1770 | 243 | # Ensure we're still up and the relation resolved setting has been | ||
1771 | 244 | # cleared. | ||
1772 | 245 | self.assertEqual( | ||
1773 | 246 | None, (yield self.states["unit"].get_relation_resolved())) | ||
1774 | 247 | self.assertEqual("up", (yield workflow.get_state())) | ||
1775 | 248 | |||
1776 | 249 | @inlineCallbacks | ||
1777 | 250 | def test_resolved_relation_watch_from_error(self): | ||
1778 | 251 | """Unit lifecycles will process a unit relation resolved | ||
1779 | 252 | setting, and transition a down relation back to a running | ||
1780 | 253 | state. | ||
1781 | 254 | """ | ||
1782 | 255 | log_output = self.capture_logging( | ||
1783 | 256 | "unit.lifecycle", level=logging.DEBUG) | ||
1784 | 257 | |||
1785 | 258 | # Start the unit. | ||
1786 | 259 | yield self.lifecycle.start() | ||
1787 | 260 | |||
1788 | 261 | # Wait for the relation to be started... TODO: async background work | ||
1789 | 262 | yield self.sleep(0.1) | ||
1790 | 263 | |||
1791 | 264 | # Simulate an error condition | ||
1792 | 265 | workflow = self.lifecycle._relations.get( | ||
1793 | 266 | self.states["unit_relation"].internal_relation_id) | ||
1794 | 267 | self.assertEqual("up", (yield workflow.get_state())) | ||
1795 | 268 | yield workflow.fire_transition("error") | ||
1796 | 269 | |||
1797 | 270 | resolved = self.wait_on_state(workflow, "up") | ||
1798 | 271 | |||
1799 | 272 | # Set the relation to resolved | ||
1800 | 273 | yield self.states["unit"].set_relation_resolved( | ||
1801 | 274 | {self.states["unit_relation"].internal_relation_id: NO_HOOKS}) | ||
1802 | 275 | |||
1803 | 276 | # Wait for the relation to come back up | ||
1804 | 277 | value = yield self.states["unit"].get_relation_resolved() | ||
1805 | 278 | |||
1806 | 279 | yield resolved | ||
1807 | 280 | |||
1808 | 281 | # Verify state | ||
1809 | 282 | value = yield workflow.get_state() | ||
1810 | 283 | self.assertEqual(value, "up") | ||
1811 | 284 | |||
1812 | 285 | self.assertIn( | ||
1813 | 286 | "processing relation resolved changed", log_output.getvalue()) | ||
1814 | 287 | |||
1815 | 288 | @inlineCallbacks | ||
1816 | 289 | def test_resolved_relation_watch(self): | ||
1817 | 290 | """Unit lifecycles will process a unit relation resolved | ||
1818 | 291 | setting, and transition a down relation back to a running | ||
1819 | 292 | state. | ||
1820 | 293 | """ | ||
1821 | 294 | log_output = self.capture_logging( | ||
1822 | 295 | "unit.lifecycle", level=logging.DEBUG) | ||
1823 | 296 | |||
1824 | 297 | # Start the unit. | ||
1825 | 298 | yield self.lifecycle.start() | ||
1826 | 299 | |||
1827 | 300 | # Wait for the relation to be started... TODO: async background work | ||
1828 | 301 | yield self.sleep(0.1) | ||
1829 | 302 | |||
1830 | 303 | # Simulate an error condition | ||
1831 | 304 | workflow = self.lifecycle._relations.get( | ||
1832 | 305 | self.states["unit_relation"].internal_relation_id) | ||
1833 | 306 | self.assertEqual("up", (yield workflow.get_state())) | ||
1834 | 307 | yield workflow.transition_state("down") | ||
1835 | 308 | |||
1836 | 309 | resolved = self.wait_on_state(workflow, "up") | ||
1837 | 310 | |||
1838 | 311 | # Set the relation to resolved | ||
1839 | 312 | yield self.states["unit"].set_relation_resolved( | ||
1840 | 313 | {self.states["unit_relation"].internal_relation_id: NO_HOOKS}) | ||
1841 | 314 | |||
1842 | 315 | # Wait for the relation to come back up | ||
1843 | 316 | value = yield self.states["unit"].get_relation_resolved() | ||
1844 | 317 | |||
1845 | 318 | yield resolved | ||
1846 | 319 | |||
1847 | 320 | # Verify state | ||
1848 | 321 | value = yield workflow.get_state() | ||
1849 | 322 | self.assertEqual(value, "up") | ||
1850 | 323 | |||
1851 | 324 | self.assertIn( | ||
1852 | 325 | "processing relation resolved changed", log_output.getvalue()) | ||
1853 | 326 | |||
1854 | 327 | |||
1855 | 148 | class UnitLifecycleTest(LifecycleTestBase): | 328 | class UnitLifecycleTest(LifecycleTestBase): |
1856 | 149 | 329 | ||
1857 | 150 | @inlineCallbacks | 330 | @inlineCallbacks |
1858 | @@ -187,6 +367,45 @@ | |||
1859 | 187 | # verify the sockets are cleaned up. | 367 | # verify the sockets are cleaned up. |
1860 | 188 | self.assertEqual(os.listdir(self.unit_directory), ["formula"]) | 368 | self.assertEqual(os.listdir(self.unit_directory), ["formula"]) |
1861 | 189 | 369 | ||
1862 | 370 | @inlineCallbacks | ||
1863 | 371 | def test_start_sans_hook(self): | ||
1864 | 372 | """The lifecycle start can be invoked without firing hooks.""" | ||
1865 | 373 | self.write_hook("start", "#!/bin/sh\n exit 1") | ||
1866 | 374 | start_executed = self.wait_on_hook("start") | ||
1867 | 375 | yield self.lifecycle.start(fire_hooks=False) | ||
1868 | 376 | # Wait for unit relation background processing.... | ||
1869 | 377 | yield self.sleep(0.1) | ||
1870 | 378 | self.assertFalse(start_executed.called) | ||
1871 | 379 | |||
1872 | 380 | @inlineCallbacks | ||
1873 | 381 | def test_stop_sans_hook(self): | ||
1874 | 382 | """The lifecycle stop can be invoked without firing hooks.""" | ||
1875 | 383 | self.write_hook("stop", "#!/bin/sh\n exit 1") | ||
1876 | 384 | stop_executed = self.wait_on_hook("stop") | ||
1877 | 385 | yield self.lifecycle.start() | ||
1878 | 386 | yield self.lifecycle.stop(fire_hooks=False) | ||
1879 | 387 | # Wait for unit relation background processing.... | ||
1880 | 388 | yield self.sleep(0.1) | ||
1881 | 389 | self.assertFalse(stop_executed.called) | ||
1882 | 390 | |||
1883 | 391 | @inlineCallbacks | ||
1884 | 392 | def test_install_sans_hook(self): | ||
1885 | 393 | """The lifecycle install can be invoked without firing hooks.""" | ||
1886 | 394 | self.write_hook("install", "#!/bin/sh\n exit 1") | ||
1887 | 395 | install_executed = self.wait_on_hook("install") | ||
1888 | 396 | yield self.lifecycle.install(fire_hooks=False) | ||
1889 | 397 | self.assertFalse(install_executed.called) | ||
1890 | 398 | |||
1891 | 399 | @inlineCallbacks | ||
1892 | 400 | def test_upgrade_sans_hook(self): | ||
1893 | 401 | """The lifecycle upgrade can be invoked without firing hooks.""" | ||
1894 | 402 | self.executor.stop() | ||
1895 | 403 | self.write_hook("upgrade-formula", "#!/bin/sh\n exit 1") | ||
1896 | 404 | upgrade_executed = self.wait_on_hook("upgrade-formula") | ||
1897 | 405 | yield self.lifecycle.upgrade_formula(fire_hooks=False) | ||
1898 | 406 | self.assertFalse(upgrade_executed.called) | ||
1899 | 407 | self.assertTrue(self.executor.running) | ||
1900 | 408 | |||
1901 | 190 | def test_hook_error(self): | 409 | def test_hook_error(self): |
1902 | 191 | """Verify hook execution error, raises an exception.""" | 410 | """Verify hook execution error, raises an exception.""" |
1903 | 192 | self.write_hook("install", '#!/bin/sh\n exit 1') | 411 | self.write_hook("install", '#!/bin/sh\n exit 1') |
1904 | @@ -196,14 +415,12 @@ | |||
1905 | 196 | def test_hook_not_executable(self): | 415 | def test_hook_not_executable(self): |
1906 | 197 | """A hook not executable, raises an exception.""" | 416 | """A hook not executable, raises an exception.""" |
1907 | 198 | self.write_hook("install", '#!/bin/sh\n exit 0', no_exec=True) | 417 | self.write_hook("install", '#!/bin/sh\n exit 0', no_exec=True) |
1908 | 199 | # It would be preferrable if this was also a formulainvocation error. | ||
1909 | 200 | return self.failUnlessFailure( | 418 | return self.failUnlessFailure( |
1910 | 201 | self.lifecycle.install(), FormulaError) | 419 | self.lifecycle.install(), FormulaError) |
1911 | 202 | 420 | ||
1912 | 203 | def test_hook_not_formatted_correctly(self): | 421 | def test_hook_not_formatted_correctly(self): |
1913 | 204 | """Hook execution error, raises an exception.""" | 422 | """Hook execution error, raises an exception.""" |
1914 | 205 | self.write_hook("install", '!/bin/sh\n exit 0') | 423 | self.write_hook("install", '!/bin/sh\n exit 0') |
1915 | 206 | # It would be preferrable if this was also a formulainvocation error. | ||
1916 | 207 | return self.failUnlessFailure( | 424 | return self.failUnlessFailure( |
1917 | 208 | self.lifecycle.install(), FormulaInvocationError) | 425 | self.lifecycle.install(), FormulaInvocationError) |
1918 | 209 | 426 | ||
1919 | @@ -510,7 +727,7 @@ | |||
1920 | 510 | @inlineCallbacks | 727 | @inlineCallbacks |
1921 | 511 | def test_initial_start_lifecycle_no_related_no_exec(self): | 728 | def test_initial_start_lifecycle_no_related_no_exec(self): |
1922 | 512 | """ | 729 | """ |
1924 | 513 | If there are no related units on startup, the relation changed hook | 730 | If there are no related units on startup, the relation joined hook |
1925 | 514 | is not invoked. | 731 | is not invoked. |
1926 | 515 | """ | 732 | """ |
1927 | 516 | file_path = self.makeFile() | 733 | file_path = self.makeFile() |
1928 | @@ -523,6 +740,26 @@ | |||
1929 | 523 | self.assertFalse(os.path.exists(file_path)) | 740 | self.assertFalse(os.path.exists(file_path)) |
1930 | 524 | 741 | ||
1931 | 525 | @inlineCallbacks | 742 | @inlineCallbacks |
1932 | 743 | def test_stop_can_continue_watching(self): | ||
1933 | 744 | """ | ||
1934 | 745 | """ | ||
1935 | 746 | file_path = self.makeFile() | ||
1936 | 747 | self.write_hook( | ||
1937 | 748 | "%s-relation-changed" % self.relation_name, | ||
1938 | 749 | ("#!/bin/bash\n" "echo executed >> %s\n" % file_path)) | ||
1939 | 750 | rel_states = yield self.add_opposite_service_unit(self.states) | ||
1940 | 751 | yield self.lifecycle.start() | ||
1941 | 752 | yield self.wait_on_hook( | ||
1942 | 753 | sequence=["app-relation-joined", "app-relation-changed"]) | ||
1943 | 754 | changed_executed = self.wait_on_hook("app-relation-changed") | ||
1944 | 755 | yield self.lifecycle.stop(watches=False) | ||
1945 | 756 | rel_states["unit_relation"].set_data(yaml.dump(dict(hello="world"))) | ||
1946 | 757 | yield self.sleep(0.1) | ||
1947 | 758 | self.assertFalse(changed_executed.called) | ||
1948 | 759 | yield self.lifecycle.start(watches=False) | ||
1949 | 760 | yield changed_executed | ||
1950 | 761 | |||
1951 | 762 | @inlineCallbacks | ||
1952 | 526 | def test_initial_start_lifecycle_with_related(self): | 763 | def test_initial_start_lifecycle_with_related(self): |
1953 | 527 | """ | 764 | """ |
1954 | 528 | If there are related units on startup, the relation changed hook | 765 | If there are related units on startup, the relation changed hook |
1955 | 529 | 766 | ||
1956 | === modified file 'ensemble/unit/tests/test_workflow.py' | |||
1957 | --- ensemble/unit/tests/test_workflow.py 2011-04-28 17:55:11 +0000 | |||
1958 | +++ ensemble/unit/tests/test_workflow.py 2011-05-02 21:57:29 +0000 | |||
1959 | @@ -10,7 +10,7 @@ | |||
1960 | 10 | 10 | ||
1961 | 11 | from ensemble.unit.workflow import ( | 11 | from ensemble.unit.workflow import ( |
1962 | 12 | UnitWorkflowState, RelationWorkflowState, WorkflowStateClient, | 12 | UnitWorkflowState, RelationWorkflowState, WorkflowStateClient, |
1964 | 13 | is_unit_running) | 13 | is_unit_running, is_relation_running) |
1965 | 14 | 14 | ||
1966 | 15 | 15 | ||
1967 | 16 | class WorkflowTestBase(LifecycleTestBase): | 16 | class WorkflowTestBase(LifecycleTestBase): |
1968 | @@ -321,6 +321,26 @@ | |||
1969 | 321 | self.state_directory) | 321 | self.state_directory) |
1970 | 322 | 322 | ||
1971 | 323 | @inlineCallbacks | 323 | @inlineCallbacks |
1972 | 324 | def test_is_relation_running(self): | ||
1973 | 325 | """The unit relation's workflow state can be categorized as a | ||
1974 | 326 | boolean. | ||
1975 | 327 | """ | ||
1976 | 328 | running, state = yield is_relation_running( | ||
1977 | 329 | self.client, self.states["unit_relation"]) | ||
1978 | 330 | self.assertIdentical(running, False) | ||
1979 | 331 | self.assertIdentical(state, None) | ||
1980 | 332 | yield self.workflow.fire_transition("start") | ||
1981 | 333 | running, state = yield is_relation_running( | ||
1982 | 334 | self.client, self.states["unit_relation"]) | ||
1983 | 335 | self.assertIdentical(running, True) | ||
1984 | 336 | self.assertEqual(state, "up") | ||
1985 | 337 | yield self.workflow.fire_transition("stop") | ||
1986 | 338 | running, state = yield is_relation_running( | ||
1987 | 339 | self.client, self.states["unit_relation"]) | ||
1988 | 340 | self.assertIdentical(running, False) | ||
1989 | 341 | self.assertEqual(state, "down") | ||
1990 | 342 | |||
1991 | 343 | @inlineCallbacks | ||
1992 | 324 | def test_up_down_cycle(self): | 344 | def test_up_down_cycle(self): |
1993 | 325 | """The workflow can be transitioned from up to down, and back. | 345 | """The workflow can be transitioned from up to down, and back. |
1994 | 326 | """ | 346 | """ |
1995 | @@ -381,7 +401,7 @@ | |||
1996 | 381 | # Add a new unit, and wait for the broken hook to result in | 401 | # Add a new unit, and wait for the broken hook to result in |
1997 | 382 | # the transition to the down state. | 402 | # the transition to the down state. |
1998 | 383 | yield self.add_opposite_service_unit(self.states) | 403 | yield self.add_opposite_service_unit(self.states) |
2000 | 384 | yield self.wait_on_state(self.workflow, "down") | 404 | yield self.wait_on_state(self.workflow, "error") |
2001 | 385 | 405 | ||
2002 | 386 | f_state, history, zk_state = yield self.read_persistent_state( | 406 | f_state, history, zk_state = yield self.read_persistent_state( |
2003 | 387 | history_id=self.workflow.zk_state_id) | 407 | history_id=self.workflow.zk_state_id) |
2004 | @@ -392,7 +412,7 @@ | |||
2005 | 392 | "formula", "hooks", "app-relation-changed")) | 412 | "formula", "hooks", "app-relation-changed")) |
2006 | 393 | 413 | ||
2007 | 394 | self.assertEqual(f_state, | 414 | self.assertEqual(f_state, |
2009 | 395 | {"state": "down", | 415 | {"state": "error", |
2010 | 396 | "state_variables": { | 416 | "state_variables": { |
2011 | 397 | "change_type": "joined", | 417 | "change_type": "joined", |
2012 | 398 | "error_message": error}}) | 418 | "error_message": error}}) |
2013 | 399 | 419 | ||
2014 | === modified file 'ensemble/unit/workflow.py' | |||
2015 | --- ensemble/unit/workflow.py 2011-04-19 18:55:06 +0000 | |||
2016 | +++ ensemble/unit/workflow.py 2011-05-02 21:57:29 +0000 | |||
2017 | @@ -17,15 +17,19 @@ | |||
2018 | 17 | Transition("install", "Install", None, "installed", | 17 | Transition("install", "Install", None, "installed", |
2019 | 18 | error_transition_id="error_install"), | 18 | error_transition_id="error_install"), |
2020 | 19 | Transition("error_install", "Install Error", None, "install_error"), | 19 | Transition("error_install", "Install Error", None, "install_error"), |
2022 | 20 | Transition("retry_install", "Retry Install", "install_error", "installed"), | 20 | Transition("retry_install", "Retry Install", "install_error", "installed", |
2023 | 21 | alias="retry"), | ||
2024 | 21 | Transition("start", "Start", "installed", "started", | 22 | Transition("start", "Start", "installed", "started", |
2025 | 22 | error_transition_id="error_start"), | 23 | error_transition_id="error_start"), |
2026 | 23 | Transition("error_start", "Start Error", "installed", "start_error"), | 24 | Transition("error_start", "Start Error", "installed", "start_error"), |
2028 | 24 | Transition("retry_start", "Retry Start", "start_error", "started"), | 25 | Transition("retry_start", "Retry Start", "start_error", "started", |
2029 | 26 | alias="retry"), | ||
2030 | 25 | Transition("stop", "Stop", "started", "stopped", | 27 | Transition("stop", "Stop", "started", "stopped", |
2031 | 26 | error_transition_id="error_stop"), | 28 | error_transition_id="error_stop"), |
2032 | 27 | Transition("error_stop", "Stop Error", "started", "stop_error"), | 29 | Transition("error_stop", "Stop Error", "started", "stop_error"), |
2034 | 28 | Transition("retry_stop", "Retry Stop", "stop_error", "stopped"), | 30 | Transition("retry_stop", "Retry Stop", "stop_error", "stopped", |
2035 | 31 | alias="retry"), | ||
2036 | 32 | Transition("restart", "Restart", "stop", "start", alias="retry"), | ||
2037 | 29 | 33 | ||
2038 | 30 | # Upgrade Transitions (stay in state, with success transition) | 34 | # Upgrade Transitions (stay in state, with success transition) |
2039 | 31 | Transition( | 35 | Transition( |
2040 | @@ -36,27 +40,47 @@ | |||
2041 | 36 | "started", "formula_upgrade_error"), | 40 | "started", "formula_upgrade_error"), |
2042 | 37 | Transition( | 41 | Transition( |
2043 | 38 | "retry_upgrade_formula", "Upgrade from stop error", | 42 | "retry_upgrade_formula", "Upgrade from stop error", |
2045 | 39 | "formula_upgrade_error", "started") | 43 | "formula_upgrade_error", "started", alias="retry") |
2046 | 40 | ) | 44 | ) |
2047 | 41 | 45 | ||
2048 | 42 | 46 | ||
2061 | 43 | # There's been some discussion, if we should have per change type error states | 47 | # Unit relation error states |
2062 | 44 | # here, corresponding to the different changes that the relation-changed hook | 48 | # |
2063 | 45 | # is invoked for. The important aspects to capture are both observability of | 49 | # There's been some discussion, if we should have per change type |
2064 | 46 | # error type locally and globally (zk), and per error type and instance | 50 | # error states here, corresponding to the different changes that the |
2065 | 47 | # recovery of the same. To provide for this functionality without additional | 51 | # relation-changed hook is invoked for. The important aspects to |
2066 | 48 | # states, the error information (change type, and error message) are captured | 52 | # capture are both observability of error type locally and globally |
2067 | 49 | # in state variables which are locally and globally observable. Future | 53 | # (zk), and per error type and instance recovery of the same. To |
2068 | 50 | # extension of the restart transition action, will allow for customized | 54 | # provide for this functionality without additional states, the error |
2069 | 51 | # recovery based on the change type state variable. Effectively this | 55 | # information (change type, and error message) are captured in state |
2070 | 52 | # differs from the unit definition, in that it collapses three possible error | 56 | # variables which are locally and globally observable. Future |
2071 | 53 | # states, into a behavior off switch. A separate state will be needed to | 57 | # extension of the restart transition action, will allow for |
2072 | 54 | # denote departing. | 58 | # customized recovery based on the change type state |
2073 | 59 | # variable. Effectively this differs from the unit definition, in that | ||
2074 | 60 | # it collapses three possible error states, into a behavior off | ||
2075 | 61 | # switch. A separate state will be needed to denote departing. | ||
2076 | 62 | |||
2077 | 63 | |||
2078 | 64 | # Process recovery using on disk workflow state | ||
2079 | 65 | # | ||
2080 | 66 | # Another interesting issue, process recovery using the on disk state, | ||
2081 | 67 | # is complicated by consistency to the in memory state, which | ||
2082 | 68 | # won't be directly recoverable anymore without some state specific | ||
2083 | 69 | # semantics to recovering from on disk state, ie a restarted unit | ||
2084 | 70 | # agent, with a relation in an error state would require special | ||
2085 | 71 | # semantics around loading from disk to ensure that the in-memory | ||
2086 | 72 | # process state (watching and scheduling but not executing) matches | ||
2087 | 73 | # the recovery transition actions (which just restart hook execution, | ||
2088 | 74 | # but assume the watch continues).. this functionality added to better | ||
2089 | 75 | # allow for the behavior that while down due to a hook error, the | ||
2090 | 76 | # relation would continue to schedule pending hooks | ||
2091 | 55 | 77 | ||
2092 | 56 | RelationWorkflow = Workflow( | 78 | RelationWorkflow = Workflow( |
2093 | 57 | Transition("start", "Start", None, "up"), | 79 | Transition("start", "Start", None, "up"), |
2094 | 58 | Transition("stop", "Stop", "up", "down"), | 80 | Transition("stop", "Stop", "up", "down"), |
2096 | 59 | Transition("restart", "Restart", "down", "up"), | 81 | Transition("restart", "Restart", "down", "up", alias="retry"), |
2097 | 82 | Transition("error", "Relation hook error", "up", "error"), | ||
2098 | 83 | Transition("reset", "Recover from hook error", "error", "up"), | ||
2099 | 60 | Transition("depart", "Relation broken", "up", "departed"), | 84 | Transition("depart", "Relation broken", "up", "departed"), |
2100 | 61 | Transition("down_depart", "Relation broken", "down", "departed"), | 85 | Transition("down_depart", "Relation broken", "down", "departed"), |
2101 | 62 | ) | 86 | ) |
2102 | @@ -67,12 +91,26 @@ | |||
2103 | 67 | """Is the service unit in a running state. | 91 | """Is the service unit in a running state. |
2104 | 68 | 92 | ||
2105 | 69 | Returns a boolean which is true if the unit is running, and | 93 | Returns a boolean which is true if the unit is running, and |
2107 | 70 | the unit state in two element tuple. | 94 | the unit workflow state in a two element tuple. |
2108 | 71 | """ | 95 | """ |
2109 | 72 | workflow_state = yield WorkflowStateClient(client, unit).get_state() | 96 | workflow_state = yield WorkflowStateClient(client, unit).get_state() |
2110 | 73 | if not workflow_state: | 97 | if not workflow_state: |
2111 | 74 | returnValue((False, None)) | 98 | returnValue((False, None)) |
2113 | 75 | running = workflow_state in ("started",) | 99 | running = workflow_state == "started" |
2114 | 100 | returnValue((running, workflow_state)) | ||
2115 | 101 | |||
2116 | 102 | |||
2117 | 103 | @inlineCallbacks | ||
2118 | 104 | def is_relation_running(client, relation): | ||
2119 | 105 | """Is the unit relation in a running state. | ||
2120 | 106 | |||
2121 | 107 | Returns a boolean which is true if the relation is running, and | ||
2122 | 108 | the unit relation workflow state in a two element tuple. | ||
2123 | 109 | """ | ||
2124 | 110 | workflow_state = yield WorkflowStateClient(client, relation).get_state() | ||
2125 | 111 | if not workflow_state: | ||
2126 | 112 | returnValue((False, None)) | ||
2127 | 113 | running = workflow_state == "up" | ||
2128 | 76 | returnValue((running, workflow_state)) | 114 | returnValue((running, workflow_state)) |
2129 | 77 | 115 | ||
2130 | 78 | 116 | ||
2131 | @@ -198,7 +236,6 @@ | |||
2132 | 198 | row per entry with CSV escaping. | 236 | row per entry with CSV escaping. |
2133 | 199 | """ | 237 | """ |
2134 | 200 | state_serialized = yaml.safe_dump(state_dict) | 238 | state_serialized = yaml.safe_dump(state_dict) |
2135 | 201 | |||
2136 | 202 | # State File | 239 | # State File |
2137 | 203 | with open(self.state_file_path, "w") as handle: | 240 | with open(self.state_file_path, "w") as handle: |
2138 | 204 | handle.write(state_serialized) | 241 | handle.write(state_serialized) |
2139 | @@ -218,6 +255,7 @@ | |||
2140 | 218 | return {"state": None} | 255 | return {"state": None} |
2141 | 219 | with open(self.state_file_path, "r") as handle: | 256 | with open(self.state_file_path, "r") as handle: |
2142 | 220 | content = handle.read() | 257 | content = handle.read() |
2143 | 258 | |||
2144 | 221 | return yaml.load(content) | 259 | return yaml.load(content) |
2145 | 222 | 260 | ||
2146 | 223 | 261 | ||
2147 | @@ -257,9 +295,9 @@ | |||
2148 | 257 | self._lifecycle = lifecycle | 295 | self._lifecycle = lifecycle |
2149 | 258 | 296 | ||
2150 | 259 | @inlineCallbacks | 297 | @inlineCallbacks |
2152 | 260 | def _invoke_lifecycle(self, method): | 298 | def _invoke_lifecycle(self, method, *args, **kw): |
2153 | 261 | try: | 299 | try: |
2155 | 262 | result = yield method() | 300 | result = yield method(*args, **kw) |
2156 | 263 | except (FileNotFound, FormulaError, FormulaInvocationError), e: | 301 | except (FileNotFound, FormulaError, FormulaInvocationError), e: |
2157 | 264 | raise TransitionError(e) | 302 | raise TransitionError(e) |
2158 | 265 | returnValue(result) | 303 | returnValue(result) |
2159 | @@ -274,21 +312,25 @@ | |||
2160 | 274 | def do_stop(self): | 312 | def do_stop(self): |
2161 | 275 | return self._invoke_lifecycle(self._lifecycle.stop) | 313 | return self._invoke_lifecycle(self._lifecycle.stop) |
2162 | 276 | 314 | ||
2163 | 277 | def do_retry_start(self): | ||
2164 | 278 | return self._invoke_lifecycle(self._lifecycle.start) | ||
2165 | 279 | |||
2166 | 280 | def do_retry_stop(self): | ||
2167 | 281 | self._invoke_lifecycle(self._lifecycle.stop) | ||
2168 | 282 | |||
2169 | 283 | def do_retry_install(self): | ||
2170 | 284 | return self._invoke_lifecycle(self._lifecycle.install) | ||
2171 | 285 | |||
2172 | 286 | def do_retry_upgrade_formula(self): | ||
2173 | 287 | return self._invoke_lifecycle(self._lifecycle.upgrade_formula) | ||
2174 | 288 | |||
2175 | 289 | def do_upgrade_formula(self): | 315 | def do_upgrade_formula(self): |
2176 | 290 | return self._invoke_lifecycle(self._lifecycle.upgrade_formula) | 316 | return self._invoke_lifecycle(self._lifecycle.upgrade_formula) |
2177 | 291 | 317 | ||
2178 | 318 | def do_retry_start(self, fire_hooks=True): | ||
2179 | 319 | return self._invoke_lifecycle( | ||
2180 | 320 | self._lifecycle.start, fire_hooks=fire_hooks) | ||
2181 | 321 | |||
2182 | 322 | def do_retry_stop(self, fire_hooks=True): | ||
2183 | 323 | self._invoke_lifecycle( | ||
2184 | 324 | self._lifecycle.stop, fire_hooks=fire_hooks) | ||
2185 | 325 | |||
2186 | 326 | def do_retry_install(self, fire_hooks=True): | ||
2187 | 327 | return self._invoke_lifecycle( | ||
2188 | 328 | self._lifecycle.install, fire_hooks=fire_hooks) | ||
2189 | 329 | |||
2190 | 330 | def do_retry_upgrade_formula(self, fire_hooks=True): | ||
2191 | 331 | return self._invoke_lifecycle( | ||
2192 | 332 | self._lifecycle.upgrade_formula, fire_hooks=fire_hooks) | ||
2193 | 333 | |||
2194 | 292 | 334 | ||
2195 | 293 | class RelationWorkflowState(DiskWorkflowState): | 335 | class RelationWorkflowState(DiskWorkflowState): |
2196 | 294 | 336 | ||
2197 | @@ -321,7 +363,7 @@ | |||
2198 | 321 | 363 | ||
2199 | 322 | @param: error: The error from hook invocation. | 364 | @param: error: The error from hook invocation. |
2200 | 323 | """ | 365 | """ |
2202 | 324 | yield self.fire_transition("stop", | 366 | yield self.fire_transition("error", |
2203 | 325 | change_type=relation_change.change_type, | 367 | change_type=relation_change.change_type, |
2204 | 326 | error_message=str(error)) | 368 | error_message=str(error)) |
2205 | 327 | 369 | ||
2206 | @@ -330,12 +372,30 @@ | |||
2207 | 330 | """Transition the workflow to the 'down' state. | 372 | """Transition the workflow to the 'down' state. |
2208 | 331 | 373 | ||
2209 | 332 | Turns off the unit-relation lifecycle monitoring and hook execution. | 374 | Turns off the unit-relation lifecycle monitoring and hook execution. |
2210 | 375 | |||
2211 | 376 | :param error_info: If called on relation hook error, contains | ||
2212 | 377 | error variables. | ||
2213 | 333 | """ | 378 | """ |
2214 | 334 | yield self._lifecycle.stop() | 379 | yield self._lifecycle.stop() |
2215 | 335 | 380 | ||
2216 | 336 | @inlineCallbacks | 381 | @inlineCallbacks |
2217 | 382 | def do_reset(self): | ||
2218 | 383 | """Transition the workflow to the 'up' state from an error state. | ||
2219 | 384 | |||
2220 | 385 | Turns on the unit-relation lifecycle monitoring and hook execution. | ||
2221 | 386 | """ | ||
2222 | 387 | yield self._lifecycle.start(watches=False) | ||
2223 | 388 | |||
2224 | 389 | @inlineCallbacks | ||
2225 | 390 | def do_error(self, **error_info): | ||
2226 | 391 | """A relation hook error, stops further hook execution but | ||
2227 | 392 | continues to watch for changes. | ||
2228 | 393 | """ | ||
2229 | 394 | yield self._lifecycle.stop(watches=False) | ||
2230 | 395 | |||
2231 | 396 | @inlineCallbacks | ||
2232 | 337 | def do_restart(self): | 397 | def do_restart(self): |
2234 | 338 | """Transition the workflow to the 'up' state. | 398 | """Transition the workflow to the 'up' state from the down state. |
2235 | 339 | 399 | ||
2236 | 340 | Turns on the unit-relation lifecycle monitoring and hook execution. | 400 | Turns on the unit-relation lifecycle monitoring and hook execution. |
2237 | 341 | """ | 401 | """ |