Merge lp:~hazmat/pyjuju/unit-agent-resolved into lp:pyjuju

Proposed by Kapil Thangavelu
Status: Superseded
Proposed branch: lp:~hazmat/pyjuju/unit-agent-resolved
Merge into: lp:pyjuju
Diff against target: 2237 lines (+1531/-93)
16 files modified
Makefile (+1/-1)
ensemble/control/__init__.py (+2/-0)
ensemble/control/resolved.py (+110/-0)
ensemble/control/tests/test_resolved.py (+390/-0)
ensemble/lib/statemachine.py (+16/-10)
ensemble/lib/tests/test_statemachine.py (+9/-0)
ensemble/state/errors.py (+24/-0)
ensemble/state/service.py (+186/-2)
ensemble/state/tests/test_errors.py (+20/-1)
ensemble/state/tests/test_service.py (+286/-2)
ensemble/state/tests/test_utils.py (+26/-12)
ensemble/state/utils.py (+18/-2)
ensemble/unit/lifecycle.py (+84/-21)
ensemble/unit/tests/test_lifecycle.py (+240/-3)
ensemble/unit/tests/test_workflow.py (+23/-3)
ensemble/unit/workflow.py (+96/-36)
To merge this branch: bzr merge lp:~hazmat/pyjuju/unit-agent-resolved
Reviewer Review Type Date Requested Status
Juju Engineering Pending
Review via email: mp+59712@code.launchpad.net

This proposal has been superseded by a proposal from 2011-05-02.

Description of the change

This implements resolution of unit relations in the unit lifecycle, along with additional changes needed to enable resolution (transition actions now receive hook execution flags).

The branch is large enough that I'm going to split the remaining work (the unit agent resolving unit relations) into another branch.

To post a comment you must log in.
269. By Kapil Thangavelu

update unit tests to use relation workflow accessor on lifecycle.

270. By Kapil Thangavelu

remove passing action transition/state variables

271. By Kapil Thangavelu

separate transitions for retry with hook

272. By Kapil Thangavelu

merge trunk resolve conflict.

273. By Kapil Thangavelu

resolve merge conflict

274. By Kapil Thangavelu

expand out additional recovery transition actions

275. By Kapil Thangavelu

complete the change over to more transitions, fill out coverage

276. By Kapil Thangavelu

Merged ensemble-resolved into unit-agent-resolved.

277. By Kapil Thangavelu

Merged ensemble-resolved into unit-agent-resolved.

278. By Kapil Thangavelu

Merged ensemble-resolved into unit-agent-resolved.

279. By Kapil Thangavelu

address review comments re formatting

Unmerged revisions

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'Makefile'
--- Makefile 2011-02-11 14:17:18 +0000
+++ Makefile 2011-05-02 21:57:29 +0000
@@ -31,7 +31,7 @@
31 @test -n "$(modified)" && echo $(modified) | xargs pyflakes31 @test -n "$(modified)" && echo $(modified) | xargs pyflakes
3232
3333
34modified=$(shell bzr status -S -r ancestor:../trunk |grep -P '^\s*M' | awk '{print $$2;}'| grep -P ".py$$")34modified=$(shell bzr status -S -r ancestor:../../trunk |grep -P '^\s*M' | awk '{print $$2;}'| grep -P ".py$$")
35review:35review:
36 @test -n "$(modified)" && echo $(modified) | xargs $(PEP8) --repeat36 @test -n "$(modified)" && echo $(modified) | xargs $(PEP8) --repeat
37 @test -n "$(modified)" && echo $(modified) | xargs pyflakes37 @test -n "$(modified)" && echo $(modified) | xargs pyflakes
3838
=== modified file 'ensemble/control/__init__.py'
--- ensemble/control/__init__.py 2011-04-25 17:53:19 +0000
+++ ensemble/control/__init__.py 2011-05-02 21:57:29 +0000
@@ -17,6 +17,7 @@
17import open_tunnel17import open_tunnel
18import remove_relation18import remove_relation
19import remove_unit19import remove_unit
20import resolved
20import status21import status
21import shutdown22import shutdown
22import ssh23import ssh
@@ -39,6 +40,7 @@
39 open_tunnel,40 open_tunnel,
40 remove_relation,41 remove_relation,
41 remove_unit,42 remove_unit,
43 resolved,
42 status,44 status,
43 ssh,45 ssh,
44 shutdown,46 shutdown,
4547
=== added file 'ensemble/control/resolved.py'
--- ensemble/control/resolved.py 1970-01-01 00:00:00 +0000
+++ ensemble/control/resolved.py 2011-05-02 21:57:29 +0000
@@ -0,0 +1,110 @@
1"""Implementation of resolved subcommand"""
2
3
4from twisted.internet.defer import inlineCallbacks, returnValue
5
6from ensemble.control.utils import get_environment
7
8from ensemble.state.service import ServiceStateManager, RETRY_HOOKS, NO_HOOKS
9from ensemble.state.relation import RelationStateManager
10from ensemble.state.errors import RelationStateNotFound
11from ensemble.unit.workflow import is_unit_running, is_relation_running
12
13
def configure_subparser(subparsers):
    """Configure the ``resolved`` subcommand on the given subparsers.

    The subcommand's help and description text come from the docstrings
    of ``command`` and ``resolved`` respectively.
    """
    sub_parser = subparsers.add_parser(
        "resolved", help=command.__doc__, description=resolved.__doc__)
    # Note: no trailing comma after add_argument(); the original one built
    # and discarded a one-element tuple.
    sub_parser.add_argument(
        "--retry", "-r", action="store_true",
        help="Retry failed hook.")
    sub_parser.add_argument(
        "--environment", "-e",
        help="Ensemble environment to operate in.")
    sub_parser.add_argument(
        "service_unit_name",
        help="Name of the service unit that should be resolved")
    # Optional: when omitted, the unit itself (not a relation) is resolved.
    sub_parser.add_argument(
        "relation_name", nargs="?", default=None,
        help="Name of the unit relation that should be resolved")
    return sub_parser
31
32
def command(options):
    """Mark an error as resolved in a unit or unit relation."""
    # NOTE: this docstring is also the subcommand's --help text (it is
    # passed as help= in configure_subparser), so it stays short.
    environment = get_environment(options)
    return resolved(
        config=options.environments,
        environment=environment,
        verbose=options.verbose,
        log=options.log,
        unit_name=options.service_unit_name,
        relation_name=options.relation_name,
        retry=options.retry)
44
45
@inlineCallbacks
def resolved(
    config, environment, verbose, log, unit_name, relation_name, retry):
    """Mark an error as resolved in a unit or unit relation.

    If one of a unit's formula non-relation hooks returns a non-zero exit
    status, the entire unit can be considered to be in a non-running state.

    As a resolution, the unit can be manually returned to a running state
    via the ensemble resolved command. Optionally this command can also
    rerun the failed hook.

    This resolution also applies separately to each of the unit's relations.
    If one of the relation hooks failed, there is no notion of retrying
    (the change is gone), but resolving will allow additional relation
    hooks for that relation to proceed.
    """
    # The docstring above is user-facing (it is the subcommand description),
    # so implementation notes live in comments instead:
    #  - the deferred fires with True when a resolved setting was recorded,
    #    and False when the unit / matched relations were already running;
    #  - the client connection is closed on every exit path, including
    #    errors raised while looking up or updating state.
    provider = environment.get_machine_provider()
    client = yield provider.connect()
    try:
        service_manager = ServiceStateManager(client)
        relation_manager = RelationStateManager(client)

        unit_state = yield service_manager.get_unit_state(unit_name)
        service_state = yield service_manager.get_service_state(
            unit_name.split("/")[0])

        retry = RETRY_HOOKS if retry else NO_HOOKS

        if not relation_name:
            running, workflow_state = yield is_unit_running(
                client, unit_state)
            if running:
                log.info(
                    "Unit %r already running: %s", unit_name, workflow_state)
                returnValue(False)

            yield unit_state.set_resolved(retry)
            log.info("Marked unit %r as resolved", unit_name)
            returnValue(True)

        # Check for the matching relations.
        service_relations = yield relation_manager.get_relations_for_service(
            service_state)
        service_relations = [
            sr for sr in service_relations
            if sr.relation_name == relation_name]
        if not service_relations:
            raise RelationStateNotFound()

        # Only relations that are not running need resolution.
        resolve_relations = {}
        for service_relation in service_relations:
            unit_relation = yield service_relation.get_unit_state(unit_state)
            running, _ = yield is_relation_running(client, unit_relation)
            if not running:
                resolve_relations[unit_relation.internal_relation_id] = retry

        if not resolve_relations:
            log.warning("Matched relations are all running")
            returnValue(False)

        # Mark the relations for resolution.
        yield unit_state.set_relation_resolved(resolve_relations)
        log.info(
            "Marked unit %r relation %r as resolved", unit_name, relation_name)
        returnValue(True)
    finally:
        # returnValue() raises internally, so this also runs on the
        # early-return paths above.
        client.close()
0111
=== added file 'ensemble/control/tests/test_resolved.py'
--- ensemble/control/tests/test_resolved.py 1970-01-01 00:00:00 +0000
+++ ensemble/control/tests/test_resolved.py 2011-05-02 21:57:29 +0000
@@ -0,0 +1,390 @@
1from twisted.internet.defer import inlineCallbacks, returnValue
2from yaml import dump
3
4from ensemble.control import main
5from ensemble.control.resolved import resolved
6from ensemble.control.tests.common import ControlToolTest
7from ensemble.formula.tests.test_repository import RepositoryTestBase
8
9from ensemble.state.service import RETRY_HOOKS, NO_HOOKS
10from ensemble.state.tests.test_service import ServiceStateManagerTestBase
11from ensemble.state.errors import ServiceStateNotFound
12
13from ensemble.unit.workflow import UnitWorkflowState, RelationWorkflowState
14from ensemble.unit.lifecycle import UnitRelationLifecycle
15from ensemble.hooks.executor import HookExecutor
16
17
class ControlResolvedTest(
    ServiceStateManagerTestBase, ControlToolTest, RepositoryTestBase):
    """Tests for the ``ensemble resolved`` subcommand."""

    @inlineCallbacks
    def setUp(self):
        yield super(ControlResolvedTest, self).setUp()
        config = {
            "ensemble": "environments",
            "environments": {"firstenv": {"type": "dummy"}}}

        self.write_config(dump(config))
        self.config.load()

        yield self.add_relation_state("wordpress", "mysql")
        yield self.add_relation_state("wordpress", "varnish")

        self.service1 = yield self.service_state_manager.get_service_state(
            "mysql")
        self.service_unit1 = yield self.service1.add_unit_state()
        self.service_unit2 = yield self.service1.add_unit_state()

        # mysql/0 starts out "started"; individual tests push it into an
        # error state as needed.
        self.unit1_workflow = UnitWorkflowState(
            self.client, self.service_unit1, None, self.makeDir())
        yield self.unit1_workflow.set_state("started")

        self.environment = self.config.get_default()
        self.provider = self.environment.get_machine_provider()

        self.output = self.capture_logging()
        self.stderr = self.capture_stream("stderr")
        self.executor = HookExecutor()

    @inlineCallbacks
    def add_relation_state(self, *service_names):
        """Create the named services if needed, then a relation between them.

        Returns the first relation state created.
        """
        for service_name in service_names:
            try:
                yield self.service_state_manager.get_service_state(
                    service_name)
            except ServiceStateNotFound:
                yield self.add_service_from_formula(service_name)

        endpoint_pairs = yield self.service_state_manager.join_descriptors(
            *service_names)
        endpoints = endpoint_pairs[0]
        # When both endpoints are identical, only pass one of them.
        if endpoints[0] == endpoints[1]:
            endpoints = endpoints[0:1]
        relation_state = (yield self.relation_state_manager.add_relation_state(
            *endpoints))[0]
        returnValue(relation_state)

    @inlineCallbacks
    def get_named_service_relation(self, service_state, relation_name):
        """Return the service relation(s) of a service with a given name.

        ``service_state`` may be a service state or a service name. A single
        match is returned directly; otherwise the (possibly empty) list of
        matches is returned.
        """
        if isinstance(service_state, str):
            service_state = yield self.service_state_manager.get_service_state(
                service_state)

        rels = yield self.relation_state_manager.get_relations_for_service(
            service_state)

        rels = [sr for sr in rels if sr.relation_name == relation_name]
        if len(rels) == 1:
            returnValue(rels[0])
        returnValue(rels)

    @inlineCallbacks
    def setup_unit_relations(self, service_relation, *units):
        """
        Given a service relation and set of unit tuples in the form
        unit_state, unit_relation_workflow_state, will add unit relations
        for these units and update their workflow state to the desired/given
        state.
        """
        for unit, state in units:
            unit_relation = yield service_relation.add_unit_state(unit)
            lifecycle = UnitRelationLifecycle(
                self.client, unit_relation, service_relation.relation_name,
                self.makeDir(), self.executor)
            workflow_state = RelationWorkflowState(
                self.client, unit_relation, lifecycle, self.makeDir())
            yield workflow_state.set_state(state)

    @inlineCallbacks
    def test_resolved(self):
        """
        'ensemble resolved <unit_name>' will schedule a unit for
        retrying from an error state.
        """
        # Push the unit into an error state
        yield self.unit1_workflow.set_state("start_error")
        self.setup_exit(0)
        finished = self.setup_cli_reactor()
        self.mocker.replay()

        self.assertEqual(
            (yield self.service_unit1.get_resolved()), None)

        main(["resolved", "mysql/0"])
        yield finished

        self.assertEqual(
            (yield self.service_unit1.get_resolved()), {"retry": NO_HOOKS})
        self.assertIn(
            "Marked unit 'mysql/0' as resolved",
            self.output.getvalue())

    @inlineCallbacks
    def test_resolved_retry(self):
        """
        'ensemble resolved --retry <unit_name>' will schedule a unit
        for retrying from an error state with a retry of hooks
        executions.
        """
        yield self.unit1_workflow.set_state("start_error")
        self.setup_exit(0)
        finished = self.setup_cli_reactor()
        self.mocker.replay()

        self.assertEqual(
            (yield self.service_unit1.get_resolved()), None)

        main(["resolved", "--retry", "mysql/0"])
        yield finished

        self.assertEqual(
            (yield self.service_unit1.get_resolved()), {"retry": RETRY_HOOKS})
        self.assertIn(
            "Marked unit 'mysql/0' as resolved",
            self.output.getvalue())

    @inlineCallbacks
    def test_relation_resolved(self):
        """
        'ensemble resolved <unit_name> <rel_name>' will schedule
        the broken unit relations for being resolved.
        """
        service_relation = yield self.get_named_service_relation(
            self.service1, "server")

        # Must be waited on so the relation workflow states exist before
        # the command runs.
        yield self.setup_unit_relations(
            service_relation,
            (self.service_unit1, "down"),
            (self.service_unit2, "up"))

        yield self.unit1_workflow.set_state("start_error")
        self.setup_exit(0)
        finished = self.setup_cli_reactor()
        self.mocker.replay()

        self.assertEqual(
            (yield self.service_unit1.get_relation_resolved()), None)

        main(["resolved", "--retry", "mysql/0",
              service_relation.relation_name])
        yield finished

        self.assertEqual(
            (yield self.service_unit1.get_relation_resolved()),
            {service_relation.internal_relation_id: RETRY_HOOKS})
        self.assertEqual(
            (yield self.service_unit2.get_relation_resolved()),
            None)
        self.assertIn(
            "Marked unit 'mysql/0' relation 'server' as resolved",
            self.output.getvalue())

    @inlineCallbacks
    def test_resolved_relation_some_already_resolved(self):
        """
        'ensemble resolved <unit_name> <rel_name>' will mark the named
        relation resolved, preserving the resolved settings already
        recorded for the unit's other relations.
        """

        service2 = yield self.service_state_manager.get_service_state(
            "wordpress")
        service_unit1 = yield service2.add_unit_state()

        service_relation = yield self.get_named_service_relation(
            service2, "db")
        yield self.setup_unit_relations(
            service_relation, (service_unit1, "down"))

        service_relation2 = yield self.get_named_service_relation(
            service2, "cache")
        yield self.setup_unit_relations(
            service_relation2, (service_unit1, "down"))

        yield service_unit1.set_relation_resolved(
            {service_relation.internal_relation_id: NO_HOOKS})

        self.setup_exit(0)
        finished = self.setup_cli_reactor()
        self.mocker.replay()

        main(["resolved", "--retry", "wordpress/0", "cache"])
        yield finished

        self.assertEqual(
            (yield service_unit1.get_relation_resolved()),
            {service_relation.internal_relation_id: NO_HOOKS,
             service_relation2.internal_relation_id: RETRY_HOOKS})

        self.assertIn(
            "Marked unit 'wordpress/0' relation 'cache' as resolved",
            self.output.getvalue())

    @inlineCallbacks
    def test_resolved_relation_some_already_resolved_conflict(self):
        """
        'ensemble resolved <unit_name> <rel_name>' will not overwrite a
        relation already marked resolved with a different retry setting,
        and reports the conflict.
        """

        service2 = yield self.service_state_manager.get_service_state(
            "wordpress")
        service_unit1 = yield service2.add_unit_state()

        service_relation = yield self.get_named_service_relation(
            service2, "db")
        yield self.setup_unit_relations(
            service_relation, (service_unit1, "down"))

        yield service_unit1.set_relation_resolved(
            {service_relation.internal_relation_id: NO_HOOKS})

        self.setup_exit(0)
        finished = self.setup_cli_reactor()
        self.mocker.replay()

        main(["resolved", "--retry", "wordpress/0", "db"])
        yield finished

        self.assertEqual(
            (yield service_unit1.get_relation_resolved()),
            {service_relation.internal_relation_id: NO_HOOKS})

        self.assertIn(
            "Service unit 'wordpress/0' already has relations marked as resol",
            self.output.getvalue())

    @inlineCallbacks
    def test_resolved_unknown_service(self):
        """
        'ensemble resolved <unit_name>' will report if a service is
        invalid.
        """
        self.setup_exit(0)
        finished = self.setup_cli_reactor()
        self.mocker.replay()
        main(["resolved", "zebra/0"])
        yield finished
        self.assertIn("Service 'zebra' was not found", self.stderr.getvalue())

    @inlineCallbacks
    def test_resolved_unknown_unit(self):
        """
        'ensemble resolved <unit_name>' will report if a unit is
        invalid.
        """
        self.setup_exit(0)
        finished = self.setup_cli_reactor()
        self.mocker.replay()
        main(["resolved", "mysql/5"])
        yield finished
        self.assertIn(
            "Service unit 'mysql/5' was not found", self.output.getvalue())

    @inlineCallbacks
    def test_resolved_unknown_unit_relation(self):
        """
        'ensemble resolved <unit_name> <rel_name>' will report if a
        relation is invalid.
        """
        self.setup_exit(0)
        finished = self.setup_cli_reactor()
        self.mocker.replay()

        self.assertEqual(
            (yield self.service_unit1.get_resolved()), None)

        main(["resolved", "mysql/0", "magic"])
        yield finished

        self.assertIn("Relation not found", self.output.getvalue())

    @inlineCallbacks
    def test_resolved_already_running(self):
        """
        'ensemble resolved <unit_name>' will report if
        the unit is already running.
        """
        # Just verify we don't accidentally mark up another unit of the service
        unit2_workflow = UnitWorkflowState(
            self.client, self.service_unit2, None, self.makeDir())
        yield unit2_workflow.set_state("start_error")

        self.setup_exit(0)
        finished = self.setup_cli_reactor()
        self.mocker.replay()

        main(["resolved", "mysql/0"])
        yield finished

        self.assertEqual(
            (yield self.service_unit2.get_resolved()), None)
        self.assertEqual(
            (yield self.service_unit1.get_resolved()), None)

        # The original assertNotIn used a malformed string (missing quote)
        # and so could never fail; assert the report is actually logged.
        self.assertIn(
            "Unit 'mysql/0' already running",
            self.output.getvalue())

    @inlineCallbacks
    def test_resolved_already_resolved(self):
        """
        'ensemble resolved <unit_name>' will report if
        the unit is already resolved.
        """
        # Mark the unit as resolved and as in an error state.
        yield self.service_unit1.set_resolved(RETRY_HOOKS)
        yield self.unit1_workflow.set_state("start_error")

        # Put the service's second unit into an error state as well; it
        # must not be marked resolved by resolving mysql/0.
        unit2_workflow = UnitWorkflowState(
            self.client, self.service_unit2, None, self.makeDir())
        yield unit2_workflow.set_state("start_error")

        self.assertEqual(
            (yield self.service_unit2.get_resolved()), None)

        self.setup_exit(0)
        finished = self.setup_cli_reactor()
        self.mocker.replay()

        main(["resolved", "mysql/0"])
        yield finished

        self.assertEqual(
            (yield self.service_unit1.get_resolved()),
            {"retry": RETRY_HOOKS})
        self.assertEqual(
            (yield self.service_unit2.get_resolved()), None)
        self.assertNotIn(
            "Marked unit 'mysql/0' as resolved",
            self.output.getvalue())
        self.assertIn(
            "Service unit 'mysql/0' is already marked as resolved.",
            self.stderr.getvalue())

    @inlineCallbacks
    def test_resolved_relation_already_running(self):
        """
        'ensemble resolved <unit_name> <rel_name>' will report
        if the relation is already running.
        """
        service2 = yield self.service_state_manager.get_service_state(
            "wordpress")
        service_unit1 = yield service2.add_unit_state()

        service_relation = yield self.get_named_service_relation(
            service2, "db")
        yield self.setup_unit_relations(
            service_relation, (service_unit1, "up"))

        self.setup_exit(0)
        finished = self.setup_cli_reactor()
        self.mocker.replay()

        main(["resolved", "wordpress/0", "db"])
        yield finished

        self.assertIn("Matched relations are all running",
                      self.output.getvalue())
        self.assertEqual(
            (yield service_unit1.get_relation_resolved()),
            None)
0391
=== modified file 'ensemble/lib/statemachine.py'
--- ensemble/lib/statemachine.py 2011-04-15 20:20:57 +0000
+++ ensemble/lib/statemachine.py 2011-05-02 21:57:29 +0000
@@ -61,7 +61,7 @@
61 returnValue(self._workflow.get_transitions(state_id))61 returnValue(self._workflow.get_transitions(state_id))
6262
63 @inlineCallbacks63 @inlineCallbacks
64 def fire_transition_alias(self, transition_alias):64 def fire_transition_alias(self, transition_alias, **transition_variables):
65 """Fire a transition with the matching alias.65 """Fire a transition with the matching alias.
6666
67 A transition from the current state with the given alias will67 A transition from the current state with the given alias will
@@ -81,6 +81,9 @@
8181
82 Ambigious (multiple) or no matching transitions cause an exception82 Ambigious (multiple) or no matching transitions cause an exception
83 InvalidTransition to be raised.83 InvalidTransition to be raised.
84
85 Keyword args are used as transition variables and given to the
86 associated transtion action.
84 """87 """
8588
86 found = []89 found = []
@@ -100,7 +103,8 @@
100 "No transition found for alias:%s state:%s" % (103 "No transition found for alias:%s state:%s" % (
101 transition_alias, current_state))104 transition_alias, current_state))
102105
103 value = yield self.fire_transition(found[0].transition_id)106 value = yield self.fire_transition(
107 found[0].transition_id, **transition_variables)
104 returnValue(value)108 returnValue(value)
105109
106 @inlineCallbacks110 @inlineCallbacks
@@ -135,7 +139,8 @@
135 returnValue(False)139 returnValue(False)
136140
137 @inlineCallbacks141 @inlineCallbacks
138 def fire_transition(self, transition_id, **state_variables):142 def fire_transition(
143 self, transition_id, **transition_variables):
139 """Fire a transition with given id.144 """Fire a transition with given id.
140145
141 Invokes any transition actions, saves state and state variables, and146 Invokes any transition actions, saves state and state variables, and
@@ -156,10 +161,9 @@
156 transition_id,161 transition_id,
157 transition.source,162 transition.source,
158 transition.destination,163 transition.destination,
159 state_variables)164 transition_variables)
160165
161 # Execute any per transition action.166 # Execute any per transition action.
162 state_variables = state_variables
163 action_id = "do_%s" % transition_id167 action_id = "do_%s" % transition_id
164 action = getattr(self, action_id, None)168 action = getattr(self, action_id, None)
165169
@@ -167,9 +171,9 @@
167 try:171 try:
168 log.debug("%s: execute action %s",172 log.debug("%s: execute action %s",
169 class_name(self), action.__name__)173 class_name(self), action.__name__)
170 variables = yield action()174 variables = yield action(**transition_variables)
171 if isinstance(variables, dict):175 if isinstance(variables, dict):
172 state_variables.update(variables)176 transition_variables.update(variables)
173 except TransitionError, e:177 except TransitionError, e:
174 # If an error happens during the transition, allow for178 # If an error happens during the transition, allow for
175 # executing an error transition.179 # executing an error transition.
@@ -187,14 +191,16 @@
187 returnValue(False)191 returnValue(False)
188192
189 # Set the state with state variables193 # Set the state with state variables
190 yield self.set_state(transition.destination, **state_variables)194 yield self.set_state(transition.destination, **transition_variables)
191 log.debug("%s: transition complete %s (state %s) %r",195 log.debug("%s: transition complete %s (state %s) %r",
192 class_name(self), transition_id,196 class_name(self), transition_id,
193 transition.destination, state_variables)197 transition.destination, transition_variables)
194 if transition.success_transition_id:198 if transition.success_transition_id:
195 log.debug("%s: initiating success transition: %s",199 log.debug("%s: initiating success transition: %s",
196 class_name(self), transition.success_transition_id)200 class_name(self), transition.success_transition_id)
197 yield self.fire_transition(transition.success_transition_id)201 yield self.fire_transition(
202 transition.success_transition_id,
203 **transition_variables)
198 returnValue(True)204 returnValue(True)
199205
200 @inlineCallbacks206 @inlineCallbacks
201207
=== modified file 'ensemble/lib/tests/test_statemachine.py'
--- ensemble/lib/tests/test_statemachine.py 2011-04-15 20:20:57 +0000
+++ ensemble/lib/tests/test_statemachine.py 2011-05-02 21:57:29 +0000
@@ -295,6 +295,11 @@
295 Transition("continue", "", "next-state", "final-state"))295 Transition("continue", "", "next-state", "final-state"))
296296
297 workflow_state = AttributeWorkflowState(workflow)297 workflow_state = AttributeWorkflowState(workflow)
298 results = []
299
300 def do_begin(*args, **kw):
301 results.append(kw)
302 workflow_state.do_begin = do_begin
298303
299 yield workflow_state.fire_transition(304 yield workflow_state.fire_transition(
300 "begin", rabbit="moon", hello=True)305 "begin", rabbit="moon", hello=True)
@@ -303,6 +308,10 @@
303 variables = yield workflow_state.get_state_variables()308 variables = yield workflow_state.get_state_variables()
304 self.assertEqual({"rabbit": "moon", "hello": True}, variables)309 self.assertEqual({"rabbit": "moon", "hello": True}, variables)
305310
311 # Ensure the variables made it through to the transition action
312 self.assertEqual(
313 {"rabbit": "moon", "hello": True}, results[0])
314
306 yield workflow_state.fire_transition("continue")315 yield workflow_state.fire_transition("continue")
307 current_state = yield workflow_state.get_state()316 current_state = yield workflow_state.get_state()
308 self.assertEqual(current_state, "final-state")317 self.assertEqual(current_state, "final-state")
309318
=== modified file 'ensemble/state/errors.py'
--- ensemble/state/errors.py 2011-04-28 17:52:55 +0000
+++ ensemble/state/errors.py 2011-05-02 21:57:29 +0000
@@ -151,6 +151,30 @@
151 self.unit_name151 self.unit_name
152152
153153
class ServiceUnitResolvedAlreadyEnabled(StateError):
    """A resolved setting already exists for the unit.
    """

    def __init__(self, unit_name):
        # Name of the service unit that is already marked resolved.
        self.unit_name = unit_name

    def __str__(self):
        template = "Service unit %r is already marked as resolved."
        return template % self.unit_name
164
165
class ServiceUnitRelationResolvedAlreadyEnabled(StateError):
    """The unit already has relations marked as resolved.
    """

    def __init__(self, unit_name):
        # Name of the service unit whose relations are already resolved.
        self.unit_name = unit_name

    def __str__(self):
        # Wrap in a 1-tuple so %-formatting is safe for any unit_name value.
        return "Service unit %r already has relations marked as resolved." % (
            self.unit_name,)
176
177
154class RelationAlreadyExists(StateError):178class RelationAlreadyExists(StateError):
155179
156 def __init__(self, *endpoints):180 def __init__(self, *endpoints):
157181
=== modified file 'ensemble/state/service.py'
--- ensemble/state/service.py 2011-04-27 16:23:16 +0000
+++ ensemble/state/service.py 2011-05-02 21:57:29 +0000
@@ -15,11 +15,16 @@
15 StateChanged, ServiceStateNotFound, ServiceUnitStateNotFound,15 StateChanged, ServiceStateNotFound, ServiceUnitStateNotFound,
16 ServiceUnitStateMachineAlreadyAssigned, ServiceStateNameInUse,16 ServiceUnitStateMachineAlreadyAssigned, ServiceStateNameInUse,
17 BadDescriptor, BadServiceStateName, NoUnusedMachines,17 BadDescriptor, BadServiceStateName, NoUnusedMachines,
18 ServiceUnitDebugAlreadyEnabled)18 ServiceUnitDebugAlreadyEnabled, ServiceUnitResolvedAlreadyEnabled,
19 ServiceUnitRelationResolvedAlreadyEnabled, StopWatcher)
19from ensemble.state.formula import FormulaStateManager20from ensemble.state.formula import FormulaStateManager
20from ensemble.state.relation import ServiceRelationState, RelationStateManager21from ensemble.state.relation import ServiceRelationState, RelationStateManager
21from ensemble.state.machine import _public_machine_id, MachineState22from ensemble.state.machine import _public_machine_id, MachineState
22from ensemble.state.utils import remove_tree23from ensemble.state.utils import remove_tree, dict_merge
24
25
# Accepted values for a unit's (or unit relation's) resolved setting:
# whether hooks should be executed when retrying (RETRY_HOOKS) or not
# (NO_HOOKS).
RETRY_HOOKS, NO_HOOKS = 1000, 1001
2328
2429
25class ServiceStateManager(StateBase):30class ServiceStateManager(StateBase):
@@ -855,6 +860,185 @@
855 callback_d.addCallback(860 callback_d.addCallback(
856 lambda x: watch_d.addCallback(watcher) and x)861 lambda x: watch_d.addCallback(watcher) and x)
857862
863 @property
864 def _unit_resolve_path(self):
865 return "/units/%s/resolved" % self.internal_id
866
867 @inlineCallbacks
868 def set_resolved(self, retry):
869 """Mark the unit as in need of being resolved.
870
871 :param retry: A boolean denoting if hooks should fire as a result
872 of the retry.
873
874 The resolved setting is set by the command line to inform
875 a unit to attempt an retry transition from an error state.
876 """
877
878 if not retry in (RETRY_HOOKS, NO_HOOKS):
879 raise ValueError("invalid retry value %r" % retry)
880
881 try:
882 yield self._client.create(
883 self._unit_resolve_path, yaml.safe_dump({"retry": retry}))
884 except zookeeper.NodeExistsException:
885 raise ServiceUnitResolvedAlreadyEnabled(self.unit_name)
886
887 @inlineCallbacks
888 def get_resolved(self):
889 """Get the value of the resolved setting if any.
890
891 The resolved setting is retrieved by the unit agent and if
892 found instructs it to attempt an retry transition from an
893 error state.
894 """
895 try:
896 content, stat = yield self._client.get(self._unit_resolve_path)
897 except zookeeper.NoNodeException:
898 # Return a default value.
899 returnValue(None)
900 returnValue(yaml.load(content))
901
902 @inlineCallbacks
903 def clear_resolved(self):
904 """Remove any resolved setting on the unit."""
905 try:
906 yield self._client.delete(self._unit_resolve_path)
907 except zookeeper.NoNodeException:
908 # We get to the same end state.
909 pass
910
911 @inlineCallbacks
912 def watch_resolved(self, callback):
913 """Set a callback to be invoked when an unit is marked resolved.
914
915 :param callback: The callback recieves a single parameter, the
916 change event. The watcher always recieve an initial
917 boolean value invocation denoting the existence of the
918 resolved setting. Subsequent invocations will be with change
919 events.
920 """
921 @inlineCallbacks
922 def watcher(change_event):
923 if not self._client.connected:
924 returnValue(None)
925
926 exists_d, watch_d = self._client.exists_and_watch(
927 self._unit_resolve_path)
928 try:
929 yield callback(change_event)
930 except StopWatcher:
931 returnValue(None)
932 watch_d.addCallback(watcher)
933
934 exists_d, watch_d = self._client.exists_and_watch(
935 self._unit_resolve_path)
936 exists = yield exists_d
937
938 # Setup the watch deferred callback after the user defined callback
939 # has returned successfully from the existence invocation.
940 callback_d = maybeDeferred(callback, bool(exists))
941 callback_d.addCallback(
942 lambda x: watch_d.addCallback(watcher) and x)
943 callback_d.addErrback(
944 lambda failure: failure.trap(StopWatcher))
945
946 @property
947 def _relation_resolved_path(self):
948 return "/units/%s/relation-resolved" % self.internal_id
949
950 @inlineCallbacks
951 def set_relation_resolved(self, relation_map):
952 """Mark a unit's relations as in need of being resolved.
953
954 :param relation_map: A map of internal relation ids, to retry hook
955 values either ensemble.state.service.NO_HOOKS or
956 RETRY_HOOKS.
957 """
958 if not isinstance(relation_map, dict):
959 raise ValueError(
960 "Relation map must be a dictionary %r" % relation_map)
961
962 if [v for v in relation_map.values() if v not in (
963 RETRY_HOOKS, NO_HOOKS)]:
964
965 print relation_map
966 raise ValueError("Invalid setting for retry hook")
967
968 def update_relation_resolved(content, stat):
969 if not content:
970 return yaml.safe_dump(relation_map)
971
972 content = yaml.safe_dump(
973 dict_merge(yaml.load(content), relation_map))
974 return content
975
976 try:
977 yield retry_change(
978 self._client,
979 self._relation_resolved_path,
980 update_relation_resolved)
981 except StateChanged:
982 raise ServiceUnitRelationResolvedAlreadyEnabled(self.unit_name)
983 returnValue(True)
984
985 @inlineCallbacks
986 def get_relation_resolved(self):
987 """Retrieve any resolved flags set for this unit's relations.
988 """
989 try:
990 content, stat = yield self._client.get(
991 self._relation_resolved_path)
992 except zookeeper.NoNodeException:
993 returnValue(None)
994 returnValue(yaml.load(content))
995
996 @inlineCallbacks
997 def clear_relation_resolved(self):
998 """ Clear the relation resolved setting.
999 """
1000 try:
1001 yield self._client.delete(self._relation_resolved_path)
1002 except zookeeper.NoNodeException:
1003 # We get to the same end state.
1004 pass
1005
1006 @inlineCallbacks
1007 def watch_relation_resolved(self, callback):
1008 """Set a callback to be invoked when a unit's relations are resolved.
1009
1010 :param callback: The callback recieves a single parameter, the
1011 change event. The watcher always recieve an initial
1012 boolean value invocation denoting the existence of the
1013 resolved setting. Subsequent invocations will be with change
1014 events.
1015 """
1016 @inlineCallbacks
1017 def watcher(change_event):
1018 if not self._client.connected:
1019 returnValue(None)
1020 exists_d, watch_d = self._client.exists_and_watch(
1021 self._relation_resolved_path)
1022 try:
1023 yield callback(change_event)
1024 except StopWatcher:
1025 returnValue(None)
1026
1027 watch_d.addCallback(watcher)
1028
1029 exists_d, watch_d = self._client.exists_and_watch(
1030 self._relation_resolved_path)
1031
1032 exists = yield exists_d
1033
1034 # Setup the watch deferred callback after the user defined callback
1035 # has returned successfully from the existence invocation.
1036 callback_d = maybeDeferred(callback, bool(exists))
1037 callback_d.addCallback(
1038 lambda x: watch_d.addCallback(watcher) and x)
1039 callback_d.addErrback(
1040 lambda failure: failure.trap(StopWatcher))
1041
8581042
859def _parse_unit_name(unit_name):1043def _parse_unit_name(unit_name):
860 """Parse a unit's name into the service name and its sequence.1044 """Parse a unit's name into the service name and its sequence.
8611045
=== modified file 'ensemble/state/tests/test_errors.py'
--- ensemble/state/tests/test_errors.py 2011-02-23 17:47:46 +0000
+++ ensemble/state/tests/test_errors.py 2011-05-02 21:57:29 +0000
@@ -11,7 +11,9 @@
11 UnitRelationStateAlreadyAssigned, UnknownRelationRole,11 UnitRelationStateAlreadyAssigned, UnknownRelationRole,
12 BadDescriptor, DuplicateEndpoints, IncompatibleEndpoints,12 BadDescriptor, DuplicateEndpoints, IncompatibleEndpoints,
13 NoMatchingEndpoints, AmbiguousEndpoints,13 NoMatchingEndpoints, AmbiguousEndpoints,
14 ServiceUnitStateMachineNotAssigned, ServiceUnitDebugAlreadyEnabled)14 ServiceUnitStateMachineNotAssigned, ServiceUnitDebugAlreadyEnabled,
15 ServiceUnitResolvedAlreadyEnabled,
16 ServiceUnitRelationResolvedAlreadyEnabled)
1517
1618
17class StateErrorsTest(TestCase):19class StateErrorsTest(TestCase):
@@ -89,6 +91,23 @@
89 str(error),91 str(error),
90 "Service unit 'wordpress/0' is already in debug mode.")92 "Service unit 'wordpress/0' is already in debug mode.")
9193
94 def test_unit_already_in_resolved_mode(self):
95 error = ServiceUnitResolvedAlreadyEnabled("wordpress/0")
96 self.assertIsStateError(error)
97 self.assertEquals(error.unit_name, "wordpress/0")
98 self.assertEquals(
99 str(error),
100 "Service unit 'wordpress/0' is already marked as resolved.")
101
102 def test_unit_already_in_relation_resolved_mode(self):
103 error = ServiceUnitRelationResolvedAlreadyEnabled("wordpress/0")
104 self.assertIsStateError(error)
105 self.assertEquals(error.unit_name, "wordpress/0")
106 self.assertEquals(
107 str(error),
108 "Service unit %r already has relations marked as resolved." % (
109 "wordpress/0"))
110
92 def test_service_name_in_use(self):111 def test_service_name_in_use(self):
93 error = ServiceStateNameInUse("wordpress")112 error = ServiceStateNameInUse("wordpress")
94 self.assertIsStateError(error)113 self.assertIsStateError(error)
95114
=== modified file 'ensemble/state/tests/test_service.py'
--- ensemble/state/tests/test_service.py 2011-04-23 01:13:27 +0000
+++ ensemble/state/tests/test_service.py 2011-05-02 21:57:29 +0000
@@ -9,14 +9,15 @@
9from ensemble.formula.tests.test_metadata import test_repository_path9from ensemble.formula.tests.test_metadata import test_repository_path
10from ensemble.state.endpoint import RelationEndpoint10from ensemble.state.endpoint import RelationEndpoint
11from ensemble.state.formula import FormulaStateManager11from ensemble.state.formula import FormulaStateManager
12from ensemble.state.service import ServiceStateManager12from ensemble.state.service import ServiceStateManager, NO_HOOKS, RETRY_HOOKS
13from ensemble.state.machine import MachineStateManager13from ensemble.state.machine import MachineStateManager
14from ensemble.state.relation import RelationStateManager14from ensemble.state.relation import RelationStateManager
15from ensemble.state.errors import (15from ensemble.state.errors import (
16 StateChanged, ServiceStateNotFound, ServiceUnitStateNotFound,16 StateChanged, ServiceStateNotFound, ServiceUnitStateNotFound,
17 ServiceUnitStateMachineAlreadyAssigned, ServiceStateNameInUse,17 ServiceUnitStateMachineAlreadyAssigned, ServiceStateNameInUse,
18 BadDescriptor, BadServiceStateName, ServiceUnitDebugAlreadyEnabled,18 BadDescriptor, BadServiceStateName, ServiceUnitDebugAlreadyEnabled,
19 MachineStateNotFound, NoUnusedMachines)19 MachineStateNotFound, NoUnusedMachines, ServiceUnitResolvedAlreadyEnabled,
20 ServiceUnitRelationResolvedAlreadyEnabled, StopWatcher)
2021
2122
22from ensemble.state.tests.common import StateTestBase23from ensemble.state.tests.common import StateTestBase
@@ -738,6 +739,283 @@
738 None)739 None)
739740
740 @inlineCallbacks741 @inlineCallbacks
742 def test_get_set_clear_resolved(self):
743 """A unit can be set to resolved to mark a future transition, with
744 an optional retry flag."""
745
746 unit_state = yield self.get_unit_state()
747
748 self.assertIdentical((yield unit_state.get_resolved()), None)
749 yield unit_state.set_resolved(NO_HOOKS)
750
751 yield self.assertFailure(
752 unit_state.set_resolved(NO_HOOKS),
753 ServiceUnitResolvedAlreadyEnabled)
754 yield self.assertEqual(
755
756 (yield unit_state.get_resolved()), {"retry": NO_HOOKS})
757
758 yield unit_state.clear_resolved()
759 self.assertIdentical((yield unit_state.get_resolved()), None)
760 yield unit_state.clear_resolved()
761
762 yield self.assertFailure(unit_state.set_resolved(None), ValueError)
763
764 @inlineCallbacks
765 def test_watch_resolved(self):
766 """A unit resolved watch can be instituted on a permanent basis."""
767 unit_state = yield self.get_unit_state()
768
769 results = []
770
771 def callback(value):
772 results.append(value)
773
774 unit_state.watch_resolved(callback)
775 yield unit_state.set_resolved(RETRY_HOOKS)
776 yield unit_state.clear_resolved()
777 yield unit_state.set_resolved(NO_HOOKS)
778
779 yield self.poke_zk()
780
781 self.assertEqual(len(results), 4)
782 self.assertIdentical(results.pop(0), False)
783 self.assertEqual(results.pop(0).type_name, "created")
784 self.assertEqual(results.pop(0).type_name, "deleted")
785 self.assertEqual(results.pop(0).type_name, "created")
786
787 self.assertEqual(
788 (yield unit_state.get_resolved()),
789 {"retry": NO_HOOKS})
790
791 @inlineCallbacks
792 def test_stop_watch_resolved(self):
793 """A unit resolved watch can be instituted on a permanent basis.
794
795 However, the callback can raise StopWatcher at any time to stop the watch.
796 """
797 unit_state = yield self.get_unit_state()
798
799 results = []
800
801 def callback(value):
802 results.append(value)
803 if len(results) == 1:
804 raise StopWatcher()
805 if len(results) == 3:
806 raise StopWatcher()
807
808 unit_state.watch_resolved(callback)
809 yield unit_state.set_resolved(RETRY_HOOKS)
810 yield unit_state.clear_resolved()
811 yield self.poke_zk()
812
813 unit_state.watch_resolved(callback)
814 yield unit_state.set_resolved(NO_HOOKS)
815 yield unit_state.clear_resolved()
816
817 yield self.poke_zk()
818
819 self.assertEqual(len(results), 3)
820 self.assertIdentical(results.pop(0), False)
821 self.assertIdentical(results.pop(0), False)
822 self.assertEqual(results.pop(0).type_name, "created")
823
824 self.assertEqual(
825 (yield unit_state.get_resolved()), None)
826
827 @inlineCallbacks
828 def test_get_set_clear_relation_resolved(self):
829 """A unit's relations can be set to resolved to mark a
830 future transition, with an optional retry flag."""
831
832 unit_state = yield self.get_unit_state()
833
834 self.assertIdentical((yield unit_state.get_relation_resolved()), None)
835 yield unit_state.set_relation_resolved({"0": RETRY_HOOKS})
836
837 # Trying to set a conflicting value raises an error
838 yield self.assertFailure(
839 unit_state.set_relation_resolved({"0": NO_HOOKS}),
840 ServiceUnitRelationResolvedAlreadyEnabled)
841
842 # Doing the same thing is fine
843 yield unit_state.set_relation_resolved({"0": RETRY_HOOKS}),
844
845 # It's fine to add new values
846 yield unit_state.set_relation_resolved({"21": RETRY_HOOKS})
847 yield self.assertEqual(
848 (yield unit_state.get_relation_resolved()),
849 {"0": RETRY_HOOKS, "21": RETRY_HOOKS})
850
851 yield unit_state.clear_relation_resolved()
852 self.assertIdentical((yield unit_state.get_relation_resolved()), None)
853 yield unit_state.clear_relation_resolved()
854
855 yield self.assertFailure(
856 unit_state.set_relation_resolved(True), ValueError)
857 yield self.assertFailure(
858 unit_state.set_relation_resolved(None), ValueError)
859
860 @inlineCallbacks
861 def test_watch_relation_resolved(self):
862 """A unit relation resolved watch can be instituted on a permanent basis."""
863 unit_state = yield self.get_unit_state()
864
865 results = []
866
867 def callback(value):
868 results.append(value)
869
870 unit_state.watch_relation_resolved(callback)
871 yield unit_state.set_relation_resolved({"0": RETRY_HOOKS})
872 yield unit_state.clear_relation_resolved()
873 yield unit_state.set_relation_resolved({"0": NO_HOOKS})
874
875 yield self.poke_zk()
876
877 self.assertEqual(len(results), 4)
878 self.assertIdentical(results.pop(0), False)
879 self.assertEqual(results.pop(0).type_name, "created")
880 self.assertEqual(results.pop(0).type_name, "deleted")
881 self.assertEqual(results.pop(0).type_name, "created")
882
883 self.assertEqual(
884 (yield unit_state.get_relation_resolved()),
885 {"0": NO_HOOKS})
886
887 @inlineCallbacks
888 def test_stop_watch_relation_resolved(self):
889 """A unit relation resolved watch is stopped when the callback raises StopWatcher."""
890 unit_state = yield self.get_unit_state()
891
892 results = []
893
894 def callback(value):
895 results.append(value)
896
897 if len(results) == 1:
898 raise StopWatcher()
899
900 if len(results) == 3:
901 raise StopWatcher()
902
903 unit_state.watch_relation_resolved(callback)
904 yield unit_state.set_relation_resolved({"0": RETRY_HOOKS})
905 yield unit_state.clear_relation_resolved()
906 yield self.poke_zk()
907 self.assertEqual(len(results), 1)
908
909 unit_state.watch_relation_resolved(callback)
910 yield unit_state.set_relation_resolved({"0": RETRY_HOOKS})
911 yield unit_state.clear_relation_resolved()
912 yield self.poke_zk()
913 self.assertEqual(len(results), 3)
914 self.assertIdentical(results.pop(0), False)
915 self.assertIdentical(results.pop(0), False)
916 self.assertEqual(results.pop(0).type_name, "created")
917
918 self.assertEqual(
919 (yield unit_state.get_relation_resolved()), None)
920
921 @inlineCallbacks
922 def test_watch_resolved_slow_callback(self):
923 """A slow watch callback is still invoked serially."""
924 unit_state = yield self.get_unit_state()
925
926 callbacks = [Deferred() for i in range(5)]
927 results = []
928 contents = []
929
930 @inlineCallbacks
931 def watch(value):
932 results.append(value)
933 yield callbacks[len(results) - 1]
934 contents.append((yield unit_state.get_resolved()))
935
936 yield unit_state.watch_resolved(watch)
937
938 # These get collapsed into a single event
939 yield unit_state.set_resolved(RETRY_HOOKS)
940 yield unit_state.clear_resolved()
941 yield self.poke_zk()
942
943 # Verify the callback hasn't completed
944 self.assertEqual(len(results), 1)
945 self.assertEqual(len(contents), 0)
946
947 # Let it finish
948 callbacks[0].callback(True)
949 yield self.poke_zk()
950
951 # Verify result counts
952 self.assertEqual(len(results), 2)
953 self.assertEqual(len(contents), 1)
954
955 # Verify result values. Even though we have created event, the
956 # setting retrieved shows the hook is not enabled.
957 self.assertEqual(results[-1].type_name, "created")
958 self.assertEqual(contents[-1], None)
959
960 yield unit_state.set_resolved(NO_HOOKS)
961 callbacks[1].callback(True)
962 yield self.poke_zk()
963
964 self.assertEqual(len(results), 3)
965 self.assertEqual(contents[-1], {"retry": NO_HOOKS})
966
967 # Clear out any pending activity.
968 yield self.poke_zk()
969
970 @inlineCallbacks
971 def test_watch_relation_resolved_slow_callback(self):
972 """A slow watch callback is still invoked serially."""
973 unit_state = yield self.get_unit_state()
974
975 callbacks = [Deferred() for i in range(5)]
976 results = []
977 contents = []
978
979 @inlineCallbacks
980 def watch(value):
981 results.append(value)
982 yield callbacks[len(results) - 1]
983 contents.append((yield unit_state.get_relation_resolved()))
984
985 yield unit_state.watch_relation_resolved(watch)
986
987 # These get collapsed into a single event
988 yield unit_state.set_relation_resolved({"0": RETRY_HOOKS})
989 yield unit_state.clear_relation_resolved()
990 yield self.poke_zk()
991
992 # Verify the callback hasn't completed
993 self.assertEqual(len(results), 1)
994 self.assertEqual(len(contents), 0)
995
996 # Let it finish
997 callbacks[0].callback(True)
998 yield self.poke_zk()
999
1000 # Verify result counts
1001 self.assertEqual(len(results), 2)
1002 self.assertEqual(len(contents), 1)
1003
1004 # Verify result values. Even though we have created event, the
1005 # setting retrieved shows the hook is not enabled.
1006 self.assertEqual(results[-1].type_name, "created")
1007 self.assertEqual(contents[-1], None)
1008
1009 yield unit_state.set_relation_resolved({"0": RETRY_HOOKS})
1010 callbacks[1].callback(True)
1011 yield self.poke_zk()
1012
1013 self.assertEqual(len(results), 3)
1014 self.assertEqual(contents[-1], {"0": RETRY_HOOKS})
1015 # Clear out any pending activity.
1016 yield self.poke_zk()
1017
1018 @inlineCallbacks
741 def test_set_and_clear_upgrade_flag(self):1019 def test_set_and_clear_upgrade_flag(self):
742 """An upgrade flag can be set on a unit."""1020 """An upgrade flag can be set on a unit."""
7431021
@@ -870,6 +1148,9 @@
870 self.assertEqual(results[-1].type_name, "created")1148 self.assertEqual(results[-1].type_name, "created")
871 self.assertEqual(contents[-1], True)1149 self.assertEqual(contents[-1], True)
8721150
1151 # Clear out any pending activity.
1152 yield self.poke_zk()
1153
873 @inlineCallbacks1154 @inlineCallbacks
874 def test_enable_debug_hook(self):1155 def test_enable_debug_hook(self):
875 """Unit hook debugging can be enabled on the unit state."""1156 """Unit hook debugging can be enabled on the unit state."""
@@ -1061,6 +1342,9 @@
1061 self.assertEqual(results[-1].type_name, "created")1342 self.assertEqual(results[-1].type_name, "created")
1062 self.assertEqual(contents[-1], {"debug_hooks": ["*"]})1343 self.assertEqual(contents[-1], {"debug_hooks": ["*"]})
10631344
1345 # Clear out any pending activity.
1346 yield self.poke_zk()
1347
1064 @inlineCallbacks1348 @inlineCallbacks
1065 def test_service_unit_agent(self):1349 def test_service_unit_agent(self):
1066 """A service unit state has an associated unit agent."""1350 """A service unit state has an associated unit agent."""
10671351
=== modified file 'ensemble/state/tests/test_utils.py'
--- ensemble/state/tests/test_utils.py 2011-04-28 17:52:55 +0000
+++ ensemble/state/tests/test_utils.py 2011-05-02 21:57:29 +0000
@@ -12,9 +12,10 @@
12import yaml12import yaml
1313
14from ensemble.lib.testing import TestCase14from ensemble.lib.testing import TestCase
15from ensemble.state.errors import StateNotFound15from ensemble.state.errors import StateChanged, StateNotFound
16from ensemble.state.utils import (PortWatcher, remove_tree,16from ensemble.state.utils import (PortWatcher, remove_tree, dict_merge,
17 get_open_port, YAMLState)17 get_open_port, YAMLState)
18
18from ensemble.tests.common import get_test_zookeeper_address19from ensemble.tests.common import get_test_zookeeper_address
1920
2021
@@ -206,6 +207,26 @@
206 self.assertNotIn("zoo", children)207 self.assertNotIn("zoo", children)
207208
208209
210class DictMergeTest(TestCase):
211
212 def test_merge_no_match(self):
213 self.assertEqual(
214 dict_merge(dict(a=1), dict(b=2)),
215 dict(a=1, b=2))
216
217 def test_merge_matching_keys_same_value(self):
218 self.assertEqual(
219 dict_merge(dict(a=1, b=2), dict(b=2, c=1)),
220 dict(a=1, b=2, c=1))
221
222 def test_merge_conflict(self):
223 self.assertRaises(
224 StateChanged,
225 dict_merge,
226 dict(a=1, b=3),
227 dict(b=2, c=1))
228
229
209class OpenPortTest(TestCase):230class OpenPortTest(TestCase):
210231
211 def test_get_open_port(self):232 def test_get_open_port(self):
@@ -301,7 +322,6 @@
301 zk_data = yaml.load(zk_data)322 zk_data = yaml.load(zk_data)
302 self.assertEqual(zk_data, options)323 self.assertEqual(zk_data, options)
303324
304
305 @inlineCallbacks325 @inlineCallbacks
306 def test_conflict_on_set(self):326 def test_conflict_on_set(self):
307 """Version conflict error tests.327 """Version conflict error tests.
@@ -357,8 +377,8 @@
357 yield node.read()377 yield node.read()
358378
359 options = dict(alpha="beta", one=1)379 options = dict(alpha="beta", one=1)
360 node["alpha"] = "beta"380 node["alpha"] = "beta"
361 node["one"] = 1381 node["one"] = 1
362 yield node.write()382 yield node.write()
363383
364 # a local get should reflect proper data384 # a local get should reflect proper data
@@ -369,7 +389,6 @@
369 zk_data = yaml.load(zk_data)389 zk_data = yaml.load(zk_data)
370 self.assertEqual(zk_data, options)390 self.assertEqual(zk_data, options)
371391
372
373 @inlineCallbacks392 @inlineCallbacks
374 def test_multiple_reads(self):393 def test_multiple_reads(self):
375 """Calling read resets state to ZK after multiple round-trips."""394 """Calling read resets state to ZK after multiple round-trips."""
@@ -415,7 +434,7 @@
415434
416 result = node.pop("foo")435 result = node.pop("foo")
417 self.assertEqual(result, "bar")436 self.assertEqual(result, "bar")
418 self.assertEqual(node, {"alpha": "beta",})437 self.assertEqual(node, {"alpha": "beta"})
419438
420 node["delta"] = "gamma"439 node["delta"] = "gamma"
421 self.assertEqual(set(node.keys()), set(("alpha", "delta")))440 self.assertEqual(set(node.keys()), set(("alpha", "delta")))
@@ -424,7 +443,6 @@
424 self.assertIn(("alpha", "beta"), result)443 self.assertIn(("alpha", "beta"), result)
425 self.assertIn(("delta", "gamma"), result)444 self.assertIn(("delta", "gamma"), result)
426445
427
428 @inlineCallbacks446 @inlineCallbacks
429 def test_del_empties_state(self):447 def test_del_empties_state(self):
430 d = YAMLState(self.client, self.path)448 d = YAMLState(self.client, self.path)
@@ -500,12 +518,8 @@
500 yield d1.read()518 yield d1.read()
501 self.assertEquals(d1, d2)519 self.assertEquals(d1, d2)
502520
503
504 @inlineCallbacks521 @inlineCallbacks
505 def test_read_requires_node(self):522 def test_read_requires_node(self):
506 """Validate that read raises when required=True."""523 """Validate that read raises when required=True."""
507 d1 = YAMLState(self.client, self.path)524 d1 = YAMLState(self.client, self.path)
508 yield self.assertFailure(d1.read(True), StateNotFound)525 yield self.assertFailure(d1.read(True), StateNotFound)
509
510
511
512526
=== modified file 'ensemble/state/utils.py'
--- ensemble/state/utils.py 2011-04-28 17:52:55 +0000
+++ ensemble/state/utils.py 2011-05-02 21:57:29 +0000
@@ -9,8 +9,10 @@
9import yaml9import yaml
10import zookeeper10import zookeeper
1111
12from ensemble.state.errors import StateChanged
12from ensemble.state.errors import StateNotFound13from ensemble.state.errors import StateNotFound
1314
15
14class PortWatcher(object):16class PortWatcher(object):
1517
16 def __init__(self, host, port, timeout, listen=False):18 def __init__(self, host, port, timeout, listen=False):
@@ -91,6 +93,22 @@
91 return port93 return port
9294
9395
96def dict_merge(d1, d2):
97 """Return a union of dicts if they have no conflicting values.
98
99 Else raise a StateChanged error.
100 """
101 must_match = set(d1).intersection(d2)
102 for k in must_match:
103 if not d1[k] == d2[k]:
104 raise StateChanged()
105
106 d = {}
107 d.update(d1)
108 d.update(d2)
109 return d
110
111
94class YAMLState(DictMixin):112class YAMLState(DictMixin):
95 """Provides a dict like interface around a Zookeeper node113 """Provides a dict like interface around a Zookeeper node
96 containing serialised YAML data. The dict provided represents the114 containing serialised YAML data. The dict provided represents the
@@ -144,7 +162,6 @@
144 if required:162 if required:
145 raise StateNotFound(self._path)163 raise StateNotFound(self._path)
146164
147
148 def _check(self):165 def _check(self):
149 """Verify that sync was called for operations which expect it."""166 """Verify that sync was called for operations which expect it."""
150 if self._pristine_cache is None:167 if self._pristine_cache is None:
@@ -164,7 +181,6 @@
164 self._check()181 self._check()
165 self._cache[key] = value182 self._cache[key] = value
166183
167
168 def __delitem__(self, key):184 def __delitem__(self, key):
169 self._check()185 self._check()
170 del self._cache[key]186 del self._cache[key]
171187
=== modified file 'ensemble/unit/lifecycle.py'
--- ensemble/unit/lifecycle.py 2011-04-15 18:53:29 +0000
+++ ensemble/unit/lifecycle.py 2011-05-02 21:57:29 +0000
@@ -2,14 +2,14 @@
2import logging2import logging
33
4from twisted.internet.defer import (4from twisted.internet.defer import (
5 inlineCallbacks, DeferredLock)5 inlineCallbacks, DeferredLock, returnValue)
66
7from ensemble.hooks.invoker import Invoker7from ensemble.hooks.invoker import Invoker
8from ensemble.hooks.scheduler import HookScheduler8from ensemble.hooks.scheduler import HookScheduler
9from ensemble.state.hook import RelationChange9from ensemble.state.hook import RelationChange
10from ensemble.state.errors import StopWatcher, UnitRelationStateNotFound10from ensemble.state.errors import StopWatcher, UnitRelationStateNotFound
1111
12from ensemble.unit.workflow import RelationWorkflowState, RelationWorkflow12from ensemble.unit.workflow import RelationWorkflowState
1313
1414
15HOOK_SOCKET_FILE = ".ensemble.hookcli.sock"15HOOK_SOCKET_FILE = ".ensemble.hookcli.sock"
@@ -32,7 +32,8 @@
32 self._unit_path = unit_path32 self._unit_path = unit_path
33 self._relations = {}33 self._relations = {}
34 self._running = False34 self._running = False
35 self._watching = False35 self._watching_relation_memberships = False
36 self._watching_relation_resolved = False
36 self._run_lock = DeferredLock()37 self._run_lock = DeferredLock()
37 self._log = logging.getLogger("unit.lifecycle")38 self._log = logging.getLogger("unit.lifecycle")
3839
@@ -45,21 +46,23 @@
45 return self._relations[relation_id]46 return self._relations[relation_id]
4647
47 @inlineCallbacks48 @inlineCallbacks
48 def install(self):49 def install(self, fire_hooks=True):
49 """Invoke the unit's install hook.50 """Invoke the unit's install hook.
50 """51 """
51 yield self._execute_hook("install")52 if fire_hooks:
53 yield self._execute_hook("install")
5254
53 @inlineCallbacks55 @inlineCallbacks
54 def upgrade_formula(self):56 def upgrade_formula(self, fire_hooks=True):
55 """Invoke the unit's upgrade-formula hook.57 """Invoke the unit's upgrade-formula hook.
56 """58 """
57 yield self._execute_hook("upgrade-formula", now=True)59 if fire_hooks:
60 yield self._execute_hook("upgrade-formula", now=True)
58 # Restart hook queued hook execution.61 # Restart hook queued hook execution.
59 self._executor.start()62 self._executor.start()
6063
61 @inlineCallbacks64 @inlineCallbacks
62 def start(self):65 def start(self, fire_hooks=True):
63 """Invoke the start hook, and setup relation watching.66 """Invoke the start hook, and setup relation watching.
64 """67 """
65 self._log.debug("pre-start acquire, running:%s", self._running)68 self._log.debug("pre-start acquire, running:%s", self._running)
@@ -70,22 +73,29 @@
70 assert not self._running, "Already started"73 assert not self._running, "Already started"
7174
72 # Execute the start hook75 # Execute the start hook
73 yield self._execute_hook("start")76 if fire_hooks:
77 yield self._execute_hook("start")
7478
75 # If we have any existing relations in memory, start them.79 # If we have any existing relations in memory, start them.
76 if self._relations:80 if self._relations:
77 self._log.debug("starting relation lifecycles")81 self._log.debug("starting relation lifecycles")
7882
79 for workflow in self._relations.values():83 for workflow in self._relations.values():
80 # should not transition an
81 yield workflow.transition_state("up")84 yield workflow.transition_state("up")
8285
83 # Establish a watch on the existing relations.86 # Establish a watch on the existing relations.
84 if not self._watching:87 if not self._watching_relation_memberships:
85 self._log.debug("starting service relation watch")88 self._log.debug("starting service relation watch")
86 yield self._service.watch_relation_states(89 yield self._service.watch_relation_states(
87 self._on_service_relation_changes)90 self._on_service_relation_changes)
88 self._watching = True91 self._watching_relation_memberships = True
92
93 # Establish a watch for resolved relations
94 if not self._watching_relation_resolved:
95 self._log.debug("starting unit relation resolved watch")
96 yield self._unit.watch_relation_resolved(
97 self._on_relation_resolved_changes)
98 self._watching_relation_resolved = True
8999
90 # Set current status100 # Set current status
91 self._running = True101 self._running = True
@@ -94,7 +104,7 @@
94 self._log.debug("started unit lifecycle")104 self._log.debug("started unit lifecycle")
95105
96 @inlineCallbacks106 @inlineCallbacks
97 def stop(self):107 def stop(self, fire_hooks=True):
98 """Stop the unit, executes the stop hook, and stops relation watching.108 """Stop the unit, executes the stop hook, and stops relation watching.
99 """109 """
100 self._log.debug("pre-stop acquire, running:%s", self._running)110 self._log.debug("pre-stop acquire, running:%s", self._running)
@@ -106,10 +116,12 @@
106 # Stop relation lifecycles116 # Stop relation lifecycles
107 if self._relations:117 if self._relations:
108 self._log.debug("stopping relation lifecycles")118 self._log.debug("stopping relation lifecycles")
119
109 for workflow in self._relations.values():120 for workflow in self._relations.values():
110 yield workflow.transition_state("down")121 yield workflow.transition_state("down")
111122
112 yield self._execute_hook("stop")123 if fire_hooks:
124 yield self._execute_hook("stop")
113125
114 # Set current status126 # Set current status
115 self._running = False127 self._running = False
@@ -118,6 +130,49 @@
118 self._log.debug("stopped unit lifecycle")130 self._log.debug("stopped unit lifecycle")
119131
120 @inlineCallbacks132 @inlineCallbacks
133 def _on_relation_resolved_changes(self, event):
134 """Callback for unit relation resolved watching.
135
136 The callback is invoked whenever the relation resolved
137 settings change.
138 """
139 self._log.debug("relation resolved changed")
140 # Acquire the run lock, and process the changes.
141 yield self._run_lock.acquire()
142
143 try:
144 # If the unit lifecycle isn't running we shouldn't process
145 # any relation resolutions.
146 if not self._running:
147 self._log.debug("stop watch relation resolved changes")
148 self._watching_relation_resolved = False
149 raise StopWatcher()
150
151 self._log.info("processing relation resolved changed")
152 if self._client.connected:
153 yield self._process_relation_resolved_changes()
154 finally:
155 yield self._run_lock.release()
156
157 @inlineCallbacks
158 def _process_relation_resolved_changes(self):
159 """Invoke retry transitions on relations if they are not running.
160 """
161 relation_resolved = yield self._unit.get_relation_resolved()
162 if relation_resolved is None:
163 returnValue(None)
164 else:
165 yield self._unit.clear_relation_resolved()
166
167 keys = set(relation_resolved).intersection(self._relations)
168 for rel_id in keys:
169 relation_workflow = self._relations[rel_id]
170 relation_state = yield relation_workflow.get_state()
171 if relation_state == "up":
172 continue
173 yield relation_workflow.transition_state("up")
174
175 @inlineCallbacks
121 def _on_service_relation_changes(self, old_relations, new_relations):176 def _on_service_relation_changes(self, old_relations, new_relations):
122 """Callback for service relation watching.177 """Callback for service relation watching.
123178
@@ -137,7 +192,7 @@
137 # If the lifecycle is not running, then stop the watcher192 # If the lifecycle is not running, then stop the watcher
138 if not self._running:193 if not self._running:
139 self._log.debug("stop service-rel watcher, discarding changes")194 self._log.debug("stop service-rel watcher, discarding changes")
140 self._watching = False195 self._watching_relation_memberships = False
141 raise StopWatcher()196 raise StopWatcher()
142197
143 self._log.debug("processing relations changed")198 self._log.debug("processing relations changed")
@@ -147,9 +202,9 @@
147202
148 @inlineCallbacks203 @inlineCallbacks
149 def _process_service_changes(self, old_relations, new_relations):204 def _process_service_changes(self, old_relations, new_relations):
150 """Add and remove unit lifecycles per the service relations changes.205 """Add and remove unit lifecycles per the service relations changes.
151 """206 """
152 # Determine relation delta of global zk state with our memory state.207 # Determine relation delta of global zk state with our memory state.
153 new_relations = dict([(service_relation.internal_relation_id,208 new_relations = dict([(service_relation.internal_relation_id,
154 service_relation) for209 service_relation) for
155 service_relation in new_relations])210 service_relation in new_relations])
@@ -336,8 +391,11 @@
336 self._error_handler = handler391 self._error_handler = handler
337392
338 @inlineCallbacks393 @inlineCallbacks
339 def start(self):394 def start(self, watches=True):
340 """Start watching related units and executing change hooks.395 """Start watching related units and executing change hooks.
396
397 @param watches: boolean parameter denoting if relation watches
398 should be started.
341 """399 """
342 yield self._run_lock.acquire()400 yield self._run_lock.acquire()
343 try:401 try:
@@ -348,19 +406,24 @@
348 self._watcher = yield self._unit_relation.watch_related_units(406 self._watcher = yield self._unit_relation.watch_related_units(
349 self._scheduler.notify_change)407 self._scheduler.notify_change)
350 # And start the watcher.408 # And start the watcher.
351 yield self._watcher.start()409 if watches:
410 yield self._watcher.start()
352 finally:411 finally:
353 self._run_lock.release()412 self._run_lock.release()
354 self._log.debug(413 self._log.debug(
355 "started relation:%s lifecycle", self._relation_name)414 "started relation:%s lifecycle", self._relation_name)
356415
357 @inlineCallbacks416 @inlineCallbacks
358 def stop(self):417 def stop(self, watches=True):
359 """Stop watching changes and stop executing relation change hooks.418 """Stop watching changes and stop executing relation change hooks.
419
420 @param watches: boolean parameter denoting if relation watches
421 should be stopped.
360 """422 """
361 yield self._run_lock.acquire()423 yield self._run_lock.acquire()
362 try:424 try:
363 self._watcher.stop()425 if watches and self._watcher:
426 self._watcher.stop()
364 self._scheduler.stop()427 self._scheduler.stop()
365 finally:428 finally:
366 yield self._run_lock.release()429 yield self._run_lock.release()
367430
=== modified file 'ensemble/unit/tests/test_lifecycle.py'
--- ensemble/unit/tests/test_lifecycle.py 2011-04-28 17:55:11 +0000
+++ ensemble/unit/tests/test_lifecycle.py 2011-05-02 21:57:29 +0000
@@ -12,6 +12,8 @@
12from ensemble.unit.lifecycle import (12from ensemble.unit.lifecycle import (
13 UnitLifecycle, UnitRelationLifecycle, RelationInvoker)13 UnitLifecycle, UnitRelationLifecycle, RelationInvoker)
1414
15from ensemble.unit.workflow import RelationWorkflowState
16
15from ensemble.hooks.invoker import Invoker17from ensemble.hooks.invoker import Invoker
16from ensemble.hooks.executor import HookExecutor18from ensemble.hooks.executor import HookExecutor
1719
@@ -19,9 +21,11 @@
1921
20from ensemble.state.endpoint import RelationEndpoint22from ensemble.state.endpoint import RelationEndpoint
21from ensemble.state.relation import ClientServerUnitWatcher23from ensemble.state.relation import ClientServerUnitWatcher
24from ensemble.state.service import NO_HOOKS
22from ensemble.state.tests.test_relation import RelationTestBase25from ensemble.state.tests.test_relation import RelationTestBase
23from ensemble.state.hook import RelationChange26from ensemble.state.hook import RelationChange
2427
28
25from ensemble.lib.testing import TestCase29from ensemble.lib.testing import TestCase
26from ensemble.lib.mocker import MATCH30from ensemble.lib.mocker import MATCH
2731
@@ -145,6 +149,182 @@
145 return output149 return output
146150
147151
class LifecycleResolvedTest(LifecycleTestBase):
    """Unit lifecycle processing of unit relation 'resolved' settings."""

    @inlineCallbacks
    def setUp(self):
        yield super(LifecycleResolvedTest, self).setUp()
        yield self.setup_default_test_relation()
        self.lifecycle = UnitLifecycle(
            self.client, self.states["unit"], self.states["service"],
            self.unit_directory, self.executor)

    def get_unit_relation_workflow(self, states):
        """Build a unit relation lifecycle and its workflow for `states`.

        @param states: test state dict providing "unit_relation" and
            "service_relation" entries.
        @return: a ``(workflow, lifecycle)`` tuple.
        """
        state_dir = os.path.join(self.ensemble_directory, "state")
        lifecycle = UnitRelationLifecycle(
            self.client,
            states["unit_relation"],
            states["service_relation"].relation_name,
            self.unit_directory,
            self.executor)

        workflow = RelationWorkflowState(
            self.client,
            states["unit_relation"],
            lifecycle,
            state_dir)

        return (workflow, lifecycle)

    @inlineCallbacks
    def _assert_resolved_restores_relation(self, break_relation):
        """Break the unit relation, mark it resolved, and verify that the
        unit lifecycle brings it back to the 'up' state.

        @param break_relation: callable taking the unit relation workflow
            and returning a Deferred that moves it out of the 'up' state.
        """
        log_output = self.capture_logging(
            "unit.lifecycle", level=logging.DEBUG)

        # Start the unit.
        yield self.lifecycle.start()

        # Wait for the relation to be started... TODO: async background work
        yield self.sleep(0.1)

        workflow = self.lifecycle._relations.get(
            self.states["unit_relation"].internal_relation_id)
        self.assertEqual("up", (yield workflow.get_state()))

        # Take the relation out of the running state.
        yield break_relation(workflow)

        resolved = self.wait_on_state(workflow, "up")

        # Set the relation to resolved
        yield self.states["unit"].set_relation_resolved(
            {self.states["unit_relation"].internal_relation_id: NO_HOOKS})

        # Wait for the relation to come back up
        yield resolved

        # Verify state
        value = yield workflow.get_state()
        self.assertEqual(value, "up")

        self.assertIn(
            "processing relation resolved changed", log_output.getvalue())

    @inlineCallbacks
    def test_resolved_relation_watch_unit_lifecycle_not_running(self):
        """If the unit is not running then no relation resolving is
        performed, and the resolved setting is left in place.
        """
        # Start the unit.
        yield self.lifecycle.start()

        # Wait for the relation to be started.... TODO: async background work
        yield self.sleep(0.1)

        # Simulate relation down on an individual unit relation
        workflow = self.lifecycle._relations.get(
            self.states["unit_relation"].internal_relation_id)
        self.assertEqual("up", (yield workflow.get_state()))

        yield workflow.transition_state("down")
        resolved = self.wait_on_state(workflow, "up")

        # Stop the unit lifecycle
        yield self.lifecycle.stop()

        # Set the relation to resolved
        yield self.states["unit"].set_relation_resolved(
            {self.states["unit_relation"].internal_relation_id: NO_HOOKS})

        # Give a moment for the watch to fire erroneously
        yield self.sleep(0.2)

        # Ensure we didn't attempt a transition.
        self.assertFalse(resolved.called)
        self.assertEqual(
            {self.states["unit_relation"].internal_relation_id: NO_HOOKS},
            (yield self.states["unit"].get_relation_resolved()))

        # NOTE: if the unit lifecycle is started again, the unit relation
        # workflow is currently transitioned back to running as part of
        # the normal start transition, independent of the resolved setting.

    @inlineCallbacks
    def test_resolved_relation_watch_relation_up(self):
        """If a relation marked as to be resolved is already running,
        then no work is performed.
        """
        # Start the unit.
        yield self.lifecycle.start()

        # Wait for the relation to be started.... TODO: async background work
        yield self.sleep(0.1)

        # get a hold of the unit relation and verify state
        workflow = self.lifecycle._relations.get(
            self.states["unit_relation"].internal_relation_id)
        self.assertEqual("up", (yield workflow.get_state()))

        # Set the relation to resolved
        yield self.states["unit"].set_relation_resolved(
            {self.states["unit_relation"].internal_relation_id: NO_HOOKS})

        # Give a moment for the async background work.
        yield self.sleep(0.1)

        # Ensure we're still up and the relation resolved setting has been
        # cleared.
        self.assertEqual(
            None, (yield self.states["unit"].get_relation_resolved()))
        self.assertEqual("up", (yield workflow.get_state()))

    def test_resolved_relation_watch_from_error(self):
        """The unit lifecycle processes a unit relation resolved setting,
        and transitions a relation in the 'error' state back to 'up'.
        """
        return self._assert_resolved_restores_relation(
            lambda workflow: workflow.fire_transition("error"))

    def test_resolved_relation_watch(self):
        """The unit lifecycle processes a unit relation resolved setting,
        and transitions a relation in the 'down' state back to 'up'.
        """
        return self._assert_resolved_restores_relation(
            lambda workflow: workflow.transition_state("down"))
326
327
148class UnitLifecycleTest(LifecycleTestBase):328class UnitLifecycleTest(LifecycleTestBase):
149329
150 @inlineCallbacks330 @inlineCallbacks
@@ -187,6 +367,45 @@
187 # verify the sockets are cleaned up.367 # verify the sockets are cleaned up.
188 self.assertEqual(os.listdir(self.unit_directory), ["formula"])368 self.assertEqual(os.listdir(self.unit_directory), ["formula"])
189369
@inlineCallbacks
def test_start_sans_hook(self):
    """Starting the lifecycle with fire_hooks=False skips the start hook."""
    # Install a start hook that exits non-zero; it must never be run.
    self.write_hook("start", "#!/bin/sh\n exit 1")
    start_hook_ran = self.wait_on_hook("start")

    yield self.lifecycle.start(fire_hooks=False)

    # Give unit relation background processing a chance to run.
    yield self.sleep(0.1)
    self.assertFalse(start_hook_ran.called)
379
@inlineCallbacks
def test_stop_sans_hook(self):
    """Stopping the lifecycle with fire_hooks=False skips the stop hook."""
    # Install a stop hook that exits non-zero; it must never be run.
    self.write_hook("stop", "#!/bin/sh\n exit 1")
    stop_hook_ran = self.wait_on_hook("stop")

    yield self.lifecycle.start()
    yield self.lifecycle.stop(fire_hooks=False)

    # Give unit relation background processing a chance to run.
    yield self.sleep(0.1)
    self.assertFalse(stop_hook_ran.called)
390
@inlineCallbacks
def test_install_sans_hook(self):
    """Installing with fire_hooks=False skips the install hook."""
    # Install an install hook that exits non-zero; it must never be run.
    self.write_hook("install", "#!/bin/sh\n exit 1")
    install_hook_ran = self.wait_on_hook("install")

    yield self.lifecycle.install(fire_hooks=False)
    self.assertFalse(install_hook_ran.called)
398
@inlineCallbacks
def test_upgrade_sans_hook(self):
    """Upgrading with fire_hooks=False skips the upgrade-formula hook."""
    # Stop the executor up front; it must be running again afterwards.
    self.executor.stop()

    # Install an upgrade hook that exits non-zero; it must never be run.
    self.write_hook("upgrade-formula", "#!/bin/sh\n exit 1")
    upgrade_hook_ran = self.wait_on_hook("upgrade-formula")

    yield self.lifecycle.upgrade_formula(fire_hooks=False)

    self.assertFalse(upgrade_hook_ran.called)
    self.assertTrue(self.executor.running)
408
190 def test_hook_error(self):409 def test_hook_error(self):
191 """Verify hook execution error, raises an exception."""410 """Verify hook execution error, raises an exception."""
192 self.write_hook("install", '#!/bin/sh\n exit 1')411 self.write_hook("install", '#!/bin/sh\n exit 1')
@@ -196,14 +415,12 @@
def test_hook_not_executable(self):
    """An install hook without the executable bit fails with FormulaError."""
    self.write_hook("install", '#!/bin/sh\n exit 0', no_exec=True)
    install_result = self.lifecycle.install()
    return self.failUnlessFailure(install_result, FormulaError)
202420
def test_hook_not_formatted_correctly(self):
    """A hook with a malformed shebang line fails with
    FormulaInvocationError."""
    # The leading '#' of the shebang is deliberately missing.
    self.write_hook("install", '!/bin/sh\n exit 0')
    install_result = self.lifecycle.install()
    return self.failUnlessFailure(install_result, FormulaInvocationError)
209426
@@ -510,7 +727,7 @@
510 @inlineCallbacks727 @inlineCallbacks
511 def test_initial_start_lifecycle_no_related_no_exec(self):728 def test_initial_start_lifecycle_no_related_no_exec(self):
512 """729 """
513 If there are no related units on startup, the relation changed hook730 If there are no related units on startup, the relation joined hook
514 is not invoked.731 is not invoked.
515 """732 """
516 file_path = self.makeFile()733 file_path = self.makeFile()
@@ -523,6 +740,26 @@
523 self.assertFalse(os.path.exists(file_path))740 self.assertFalse(os.path.exists(file_path))
524741
@inlineCallbacks
def test_stop_can_continue_watching(self):
    """Stopping with watches=False suspends hook execution but keeps the
    relation watch alive, so a change made while stopped fires the
    relation-changed hook once the lifecycle is started again with
    watches=False.
    """
    joined_hook = "%s-relation-joined" % self.relation_name
    changed_hook = "%s-relation-changed" % self.relation_name

    file_path = self.makeFile()
    self.write_hook(
        changed_hook,
        ("#!/bin/bash\n" "echo executed >> %s\n" % file_path))
    rel_states = yield self.add_opposite_service_unit(self.states)
    yield self.lifecycle.start()
    yield self.wait_on_hook(sequence=[joined_hook, changed_hook])

    changed_executed = self.wait_on_hook(changed_hook)
    yield self.lifecycle.stop(watches=False)

    # Change the related unit's settings while hook execution is stopped;
    # wait on the write so failures surface here rather than being lost.
    yield rel_states["unit_relation"].set_data(
        yaml.dump(dict(hello="world")))
    yield self.sleep(0.1)
    self.assertFalse(changed_executed.called)

    # Resuming hook execution runs the hook for the change seen meanwhile.
    yield self.lifecycle.start(watches=False)
    yield changed_executed
761
762 @inlineCallbacks
526 def test_initial_start_lifecycle_with_related(self):763 def test_initial_start_lifecycle_with_related(self):
527 """764 """
528 If there are related units on startup, the relation changed hook765 If there are related units on startup, the relation changed hook
529766
=== modified file 'ensemble/unit/tests/test_workflow.py'
--- ensemble/unit/tests/test_workflow.py 2011-04-28 17:55:11 +0000
+++ ensemble/unit/tests/test_workflow.py 2011-05-02 21:57:29 +0000
@@ -10,7 +10,7 @@
1010
11from ensemble.unit.workflow import (11from ensemble.unit.workflow import (
12 UnitWorkflowState, RelationWorkflowState, WorkflowStateClient,12 UnitWorkflowState, RelationWorkflowState, WorkflowStateClient,
13 is_unit_running)13 is_unit_running, is_relation_running)
1414
1515
16class WorkflowTestBase(LifecycleTestBase):16class WorkflowTestBase(LifecycleTestBase):
@@ -321,6 +321,26 @@
321 self.state_directory)321 self.state_directory)
322322
@inlineCallbacks
def test_is_relation_running(self):
    """is_relation_running reports a (running, state) pair for the unit
    relation's workflow, across its start and stop transitions.
    """
    def query():
        return is_relation_running(
            self.client, self.states["unit_relation"])

    # Before any transition there is no workflow state at all.
    running, state = yield query()
    self.assertIdentical(running, False)
    self.assertIdentical(state, None)

    expectations = (
        ("start", True, "up"),
        ("stop", False, "down"),
        )
    for transition_id, expected_running, expected_state in expectations:
        yield self.workflow.fire_transition(transition_id)
        running, state = yield query()
        self.assertIdentical(running, expected_running)
        self.assertEqual(state, expected_state)
342
343 @inlineCallbacks
324 def test_up_down_cycle(self):344 def test_up_down_cycle(self):
325 """The workflow can be transition from up to down, and back.345 """The workflow can be transition from up to down, and back.
326 """346 """
@@ -381,7 +401,7 @@
381 # Add a new unit, and wait for the broken hook to result in401 # Add a new unit, and wait for the broken hook to result in
382 # the transition to the down state.402 # the transition to the down state.
383 yield self.add_opposite_service_unit(self.states)403 yield self.add_opposite_service_unit(self.states)
384 yield self.wait_on_state(self.workflow, "down")404 yield self.wait_on_state(self.workflow, "error")
385405
386 f_state, history, zk_state = yield self.read_persistent_state(406 f_state, history, zk_state = yield self.read_persistent_state(
387 history_id=self.workflow.zk_state_id)407 history_id=self.workflow.zk_state_id)
@@ -392,7 +412,7 @@
392 "formula", "hooks", "app-relation-changed"))412 "formula", "hooks", "app-relation-changed"))
393413
394 self.assertEqual(f_state,414 self.assertEqual(f_state,
395 {"state": "down",415 {"state": "error",
396 "state_variables": {416 "state_variables": {
397 "change_type": "joined",417 "change_type": "joined",
398 "error_message": error}})418 "error_message": error}})
399419
=== modified file 'ensemble/unit/workflow.py'
--- ensemble/unit/workflow.py 2011-04-19 18:55:06 +0000
+++ ensemble/unit/workflow.py 2011-05-02 21:57:29 +0000
@@ -17,15 +17,19 @@
17 Transition("install", "Install", None, "installed",17 Transition("install", "Install", None, "installed",
18 error_transition_id="error_install"),18 error_transition_id="error_install"),
19 Transition("error_install", "Install Error", None, "install_error"),19 Transition("error_install", "Install Error", None, "install_error"),
20 Transition("retry_install", "Retry Install", "install_error", "installed"),20 Transition("retry_install", "Retry Install", "install_error", "installed",
21 alias="retry"),
21 Transition("start", "Start", "installed", "started",22 Transition("start", "Start", "installed", "started",
22 error_transition_id="error_start"),23 error_transition_id="error_start"),
23 Transition("error_start", "Start Error", "installed", "start_error"),24 Transition("error_start", "Start Error", "installed", "start_error"),
24 Transition("retry_start", "Retry Start", "start_error", "started"),25 Transition("retry_start", "Retry Start", "start_error", "started",
26 alias="retry"),
25 Transition("stop", "Stop", "started", "stopped",27 Transition("stop", "Stop", "started", "stopped",
26 error_transition_id="error_stop"),28 error_transition_id="error_stop"),
27 Transition("error_stop", "Stop Error", "started", "stop_error"),29 Transition("error_stop", "Stop Error", "started", "stop_error"),
28 Transition("retry_stop", "Retry Stop", "stop_error", "stopped"),30 Transition("retry_stop", "Retry Stop", "stop_error", "stopped",
31 alias="retry"),
32 Transition("restart", "Restart", "stop", "start", alias="retry"),
2933
30 # Upgrade Transitions (stay in state, with success transitition)34 # Upgrade Transitions (stay in state, with success transitition)
31 Transition(35 Transition(
@@ -36,27 +40,47 @@
36 "started", "formula_upgrade_error"),40 "started", "formula_upgrade_error"),
37 Transition(41 Transition(
38 "retry_upgrade_formula", "Upgrade from stop error",42 "retry_upgrade_formula", "Upgrade from stop error",
39 "formula_upgrade_error", "started")43 "formula_upgrade_error", "started", alias="retry")
40 )44 )
4145
4246
# Unit relation error states
#
# There has been some discussion about whether there should be per change
# type error states here, corresponding to the different changes that the
# relation-changed hook is invoked for. The important aspects to capture
# are observability of the error type, both locally and globally (zk), and
# per error type and per instance recovery. To provide this without
# additional states, the error information (change type and error message)
# is captured in state variables, which are observable both locally and
# globally. Future extension of the recovery transition actions (reset /
# restart) will allow for customized recovery based on the change type
# state variable. Effectively this differs from the unit workflow
# definition, in that it collapses the possible per change type error
# states into a single "error" state. A separate "departed" state denotes
# a broken relation.


# Process recovery using on disk workflow state
#
# Process recovery using the on disk state is complicated by the need to
# stay consistent with the in-memory state, which is no longer directly
# recoverable without state specific semantics for loading from disk. For
# example, a restarted unit agent with a relation in the error state would
# need special handling when loading from disk, to ensure that the
# in-memory process state (watching and scheduling, but not executing)
# matches what the recovery transition actions assume (they only restart
# hook execution, and assume the watch is still running). This behavior
# was added so that a relation in error due to a hook failure continues
# to schedule pending hooks.

# NOTE(review): "depart" is only defined from "up" and "down_depart" only
# from "down"; there is no transition from "error" to "departed". Confirm
# how a relation broken while in the error state is meant to be handled.
RelationWorkflow = Workflow(
    Transition("start", "Start", None, "up"),
    Transition("stop", "Stop", "up", "down"),
    Transition("restart", "Restart", "down", "up", alias="retry"),
    Transition("error", "Relation hook error", "up", "error"),
    Transition("reset", "Recover from hook error", "error", "up"),
    Transition("depart", "Relation broken", "up", "departed"),
    Transition("down_depart", "Relation broken", "down", "departed"),
    )
@@ -67,12 +91,26 @@
67 """Is the service unit in a running state.91 """Is the service unit in a running state.
6892
69 Returns a boolean which is true if the unit is running, and93 Returns a boolean which is true if the unit is running, and
70 the unit state in two element tuple.94 the unit workflow state in a two element tuple.
71 """95 """
72 workflow_state = yield WorkflowStateClient(client, unit).get_state()96 workflow_state = yield WorkflowStateClient(client, unit).get_state()
73 if not workflow_state:97 if not workflow_state:
74 returnValue((False, None))98 returnValue((False, None))
75 running = workflow_state in ("started",)99 running = workflow_state == "started"
100 returnValue((running, workflow_state))
101
102
@inlineCallbacks
def is_relation_running(client, relation):
    """Report whether a unit relation's workflow is running.

    The result is a two element tuple ``(running, state)``. ``running`` is
    true only when the workflow state is "up". If no workflow state has
    been recorded, the result is ``(False, None)``.
    """
    state_client = WorkflowStateClient(client, relation)
    current = yield state_client.get_state()
    if current:
        returnValue((current == "up", current))
    returnValue((False, None))
77115
78116
@@ -198,7 +236,6 @@
198 row per entry with CSV escaping.236 row per entry with CSV escaping.
199 """237 """
200 state_serialized = yaml.safe_dump(state_dict)238 state_serialized = yaml.safe_dump(state_dict)
201
202 # State File239 # State File
203 with open(self.state_file_path, "w") as handle:240 with open(self.state_file_path, "w") as handle:
204 handle.write(state_serialized)241 handle.write(state_serialized)
@@ -218,6 +255,7 @@
218 return {"state": None}255 return {"state": None}
219 with open(self.state_file_path, "r") as handle:256 with open(self.state_file_path, "r") as handle:
220 content = handle.read()257 content = handle.read()
258
221 return yaml.load(content)259 return yaml.load(content)
222260
223261
@@ -257,9 +295,9 @@
257 self._lifecycle = lifecycle295 self._lifecycle = lifecycle
258296
@inlineCallbacks
def _invoke_lifecycle(self, method, *args, **kw):
    """Call a lifecycle `method` and translate hook failures for the workflow.

    Extra positional and keyword arguments are forwarded to `method`.
    FileNotFound, FormulaError and FormulaInvocationError raised by the call
    are re-raised as TransitionError. Any other result is returned unchanged.
    """
    try:
        outcome = yield method(*args, **kw)
    except (FileNotFound, FormulaError, FormulaInvocationError) as failure:
        raise TransitionError(failure)
    else:
        returnValue(outcome)
@@ -274,21 +312,25 @@
def do_stop(self):
    """Transition action for 'stop': run the unit lifecycle's stop."""
    stop_method = self._lifecycle.stop
    return self._invoke_lifecycle(stop_method)
276314
def do_upgrade_formula(self):
    """Transition action for 'upgrade_formula'."""
    return self._invoke_lifecycle(self._lifecycle.upgrade_formula)

def do_retry_start(self, fire_hooks=True):
    """Retry action recovering from 'start_error'.

    @param fire_hooks: whether the start hook is executed.
    """
    return self._invoke_lifecycle(
        self._lifecycle.start, fire_hooks=fire_hooks)

def do_retry_stop(self, fire_hooks=True):
    """Retry action recovering from 'stop_error'.

    @param fire_hooks: whether the stop hook is executed.
    """
    # Return the result so the workflow waits on (and sees failures of)
    # the stop, like the other retry actions.
    return self._invoke_lifecycle(
        self._lifecycle.stop, fire_hooks=fire_hooks)

def do_retry_install(self, fire_hooks=True):
    """Retry action recovering from 'install_error'.

    @param fire_hooks: whether the install hook is executed.
    """
    return self._invoke_lifecycle(
        self._lifecycle.install, fire_hooks=fire_hooks)

def do_retry_upgrade_formula(self, fire_hooks=True):
    """Retry action recovering from 'formula_upgrade_error'.

    @param fire_hooks: whether the upgrade-formula hook is executed.
    """
    return self._invoke_lifecycle(
        self._lifecycle.upgrade_formula, fire_hooks=fire_hooks)
333
292334
293class RelationWorkflowState(DiskWorkflowState):335class RelationWorkflowState(DiskWorkflowState):
294336
@@ -321,7 +363,7 @@
321363
322 @param: error: The error from hook invocation.364 @param: error: The error from hook invocation.
323 """365 """
324 yield self.fire_transition("stop",366 yield self.fire_transition("error",
325 change_type=relation_change.change_type,367 change_type=relation_change.change_type,
326 error_message=str(error))368 error_message=str(error))
327369
@@ -330,12 +372,30 @@
330 """Transition the workflow to the 'down' state.372 """Transition the workflow to the 'down' state.
331373
332 Turns off the unit-relation lifecycle monitoring and hook execution.374 Turns off the unit-relation lifecycle monitoring and hook execution.
375
376 :param error_info: If called on relation hook error, contains
377 error variables.
333 """378 """
334 yield self._lifecycle.stop()379 yield self._lifecycle.stop()
335380
@inlineCallbacks
def do_reset(self):
    """Transition action for 'reset', moving from 'error' back to 'up'.

    Restarts unit-relation hook execution. ``watches=False`` is passed
    because the error action stops the relation lifecycle with
    ``watches=False``, leaving the relation watch running.
    """
    relation_lifecycle = self._lifecycle
    yield relation_lifecycle.start(watches=False)
388
@inlineCallbacks
def do_error(self, **error_info):
    """Transition action for 'error', entered after a relation hook fails.

    Stops further hook execution but keeps watching the relation for
    changes (the lifecycle is stopped with ``watches=False``).

    @param error_info: state variables supplied with the transition
        (presumably change_type and error_message, as passed by
        transition_error); not used by this action.
    """
    relation_lifecycle = self._lifecycle
    yield relation_lifecycle.stop(watches=False)
395
396 @inlineCallbacks
337 def do_restart(self):397 def do_restart(self):
338 """Transition the workflow to the 'up' state.398 """Transition the workflow to the 'up' state from the down state.
339399
340 Turns on the unit-relation lifecycle monitoring and hook execution.400 Turns on the unit-relation lifecycle monitoring and hook execution.
341 """401 """

Subscribers

People subscribed via source and target branches

to status/vote changes: