Merge lp:~hazmat/pyjuju/unit-agent-resolved into lp:pyjuju

Proposed by Kapil Thangavelu
Status: Superseded
Proposed branch: lp:~hazmat/pyjuju/unit-agent-resolved
Merge into: lp:pyjuju
Diff against target: 2237 lines (+1531/-93)
16 files modified
Makefile (+1/-1)
ensemble/control/__init__.py (+2/-0)
ensemble/control/resolved.py (+110/-0)
ensemble/control/tests/test_resolved.py (+390/-0)
ensemble/lib/statemachine.py (+16/-10)
ensemble/lib/tests/test_statemachine.py (+9/-0)
ensemble/state/errors.py (+24/-0)
ensemble/state/service.py (+186/-2)
ensemble/state/tests/test_errors.py (+20/-1)
ensemble/state/tests/test_service.py (+286/-2)
ensemble/state/tests/test_utils.py (+26/-12)
ensemble/state/utils.py (+18/-2)
ensemble/unit/lifecycle.py (+84/-21)
ensemble/unit/tests/test_lifecycle.py (+240/-3)
ensemble/unit/tests/test_workflow.py (+23/-3)
ensemble/unit/workflow.py (+96/-36)
To merge this branch: bzr merge lp:~hazmat/pyjuju/unit-agent-resolved
Reviewer: Juju Engineering (status: Pending)
Review via email: mp+59712@code.launchpad.net

This proposal has been superseded by a proposal from 2011-05-02.

Description of the change

This implements resolution of unit relations in the unit lifecycle, along with the additional changes needed to enable resolution (transition actions now receive hook execution flags).

The branch is large enough that I'm going to split the remaining work, having the unit agent resolve unit relations, into another branch.
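The statemachine change is what lets transition actions receive hook execution flags: keyword arguments given to fire_transition (or fire_transition_alias) are now passed to the matching do_<transition_id> action, saved as state variables, and forwarded to any success transition. The following is a minimal synchronous sketch of that flow only, assuming no Twisted Deferreds, persistence or error transitions. MiniWorkflowState, RetryingUnit, the retry_start/retried transitions and the retry_hooks flag are hypothetical names, not the unit workflow's real ones.

```python
class TransitionError(Exception):
    """Raised by an action to abort its transition."""


class MiniWorkflowState(object):
    """Hypothetical synchronous stand-in for the workflow state in
    ensemble/lib/statemachine.py; it keeps only the variable flow."""

    def __init__(self, transitions, state):
        # transitions maps transition_id -> (source, destination, success_id)
        self.transitions = transitions
        self.state = state
        self.variables = {}

    def fire_transition(self, transition_id, **transition_variables):
        source, destination, success_id = self.transitions[transition_id]
        if source != self.state:
            return False
        action = getattr(self, "do_%s" % transition_id, None)
        if action is not None:
            try:
                # The keyword arguments are handed to the action...
                result = action(**transition_variables)
            except TransitionError:
                return False
            # ...and a dict it returns is merged into them.
            if isinstance(result, dict):
                transition_variables.update(result)
        # The merged variables are saved with the destination state...
        self.state = destination
        self.variables = transition_variables
        if success_id:
            # ...and forwarded to the success transition.
            self.fire_transition(success_id, **transition_variables)
        return True


class RetryingUnit(MiniWorkflowState):
    """Hypothetical workflow with a two-step retry out of start_error."""

    def __init__(self):
        super(RetryingUnit, self).__init__(
            {"retry_start": ("start_error", "retrying", "retried"),
             "retried": ("retrying", "started", None)},
            "start_error")
        self.ran_hooks = []
        self.seen = []

    def do_retry_start(self, retry_hooks=False):
        # Hypothetical flag: rerun the failed hook only when requested.
        if retry_hooks:
            self.ran_hooks.append("start")
        return {"attempt": 1}

    def do_retried(self, **variables):
        # Record what the success transition's action received.
        self.seen.append(dict(variables))


unit = RetryingUnit()
unit.fire_transition("retry_start", retry_hooks=True)
```

After the call, the unit ends in "started", the start hook was rerun once, and do_retried saw both the caller's retry_hooks flag and the attempt value returned by the first action.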

269. By Kapil Thangavelu

update unit tests to use relation workflow accessor on lifecycle.

270. By Kapil Thangavelu

remove passing action transition/state variables

271. By Kapil Thangavelu

separate transitions for retry with hook

272. By Kapil Thangavelu

merge trunk resolve conflict.

273. By Kapil Thangavelu

resolve merge conflict

274. By Kapil Thangavelu

expand out additional recovery transition actions

275. By Kapil Thangavelu

complete the change over to more transitions, fill out coverage

276. By Kapil Thangavelu

Merged ensemble-resolved into unit-agent-resolved.

277. By Kapil Thangavelu

Merged ensemble-resolved into unit-agent-resolved.

278. By Kapil Thangavelu

Merged ensemble-resolved into unit-agent-resolved.

279. By Kapil Thangavelu

address review comments re formatting

Unmerged revisions

Preview Diff

=== modified file 'Makefile'
--- Makefile 2011-02-11 14:17:18 +0000
+++ Makefile 2011-05-02 21:57:29 +0000
@@ -31,7 +31,7 @@
 	@test -n "$(modified)" && echo $(modified) | xargs pyflakes


-modified=$(shell bzr status -S -r ancestor:../trunk |grep -P '^\s*M' | awk '{print $$2;}'| grep -P ".py$$")
+modified=$(shell bzr status -S -r ancestor:../../trunk |grep -P '^\s*M' | awk '{print $$2;}'| grep -P ".py$$")
 review:
 	@test -n "$(modified)" && echo $(modified) | xargs $(PEP8) --repeat
 	@test -n "$(modified)" && echo $(modified) | xargs pyflakes

=== modified file 'ensemble/control/__init__.py'
--- ensemble/control/__init__.py 2011-04-25 17:53:19 +0000
+++ ensemble/control/__init__.py 2011-05-02 21:57:29 +0000
@@ -17,6 +17,7 @@
 import open_tunnel
 import remove_relation
 import remove_unit
+import resolved
 import status
 import shutdown
 import ssh
@@ -39,6 +40,7 @@
     open_tunnel,
     remove_relation,
     remove_unit,
+    resolved,
     status,
     ssh,
     shutdown,

=== added file 'ensemble/control/resolved.py'
--- ensemble/control/resolved.py 1970-01-01 00:00:00 +0000
+++ ensemble/control/resolved.py 2011-05-02 21:57:29 +0000
@@ -0,0 +1,110 @@
+"""Implementation of resolved subcommand"""
+
+
+from twisted.internet.defer import inlineCallbacks, returnValue
+
+from ensemble.control.utils import get_environment
+
+from ensemble.state.service import ServiceStateManager, RETRY_HOOKS, NO_HOOKS
+from ensemble.state.relation import RelationStateManager
+from ensemble.state.errors import RelationStateNotFound
+from ensemble.unit.workflow import is_unit_running, is_relation_running
+
+
+def configure_subparser(subparsers):
+    """Configure resolved subcommand"""
+    sub_parser = subparsers.add_parser(
+        "resolved", help=command.__doc__, description=resolved.__doc__)
+    sub_parser.add_argument(
+        "--retry", "-r", action="store_true",
+        help="Retry failed hook.")
+    sub_parser.add_argument(
+        "--environment", "-e",
+        help="Ensemble environment to operate in.")
+    sub_parser.add_argument(
+        "service_unit_name",
+        help="Name of the service unit that should be resolved")
+    sub_parser.add_argument(
+        "relation_name", nargs="?", default=None,
+        help="Name of the unit relation that should be resolved")
+    return sub_parser
+
+
+def command(options):
+    """Mark an error as resolved in a unit or unit relation."""
+    environment = get_environment(options)
+    return resolved(
+        options.environments,
+        environment,
+        options.verbose,
+        options.log,
+        options.service_unit_name,
+        options.relation_name,
+        options.retry)
+
+
+@inlineCallbacks
+def resolved(
+        config, environment, verbose, log, unit_name, relation_name, retry):
+    """Mark an error as resolved in a unit or unit relation.
+
+    If one of a unit's formula non-relation hooks returns a non-zero exit
+    status, the entire unit can be considered to be in a non-running state.
+
+    As a resolution, the unit can be manually returned to a running state
+    via the ensemble resolved command. Optionally this command can also
+    rerun the failed hook.
+
+    This resolution also applies separately to each of the unit's relations,
+    for the case where one of the relation hooks has failed. In that case
+    there is no notion of retrying (the change is gone), but resolving will
+    allow additional relation hooks for that relation to proceed.
+    """
+    provider = environment.get_machine_provider()
+    client = yield provider.connect()
+    service_manager = ServiceStateManager(client)
+    relation_manager = RelationStateManager(client)
+
+    unit_state = yield service_manager.get_unit_state(unit_name)
+    service_state = yield service_manager.get_service_state(
+        unit_name.split("/")[0])
+
+    retry = RETRY_HOOKS if retry else NO_HOOKS
+
+    if not relation_name:
+        running, workflow_state = yield is_unit_running(client, unit_state)
+        if running:
+            log.info("Unit %r already running: %s", unit_name, workflow_state)
+            client.close()
+            returnValue(False)
+
+        yield unit_state.set_resolved(retry)
+        log.info("Marked unit %r as resolved", unit_name)
+        returnValue(True)
+
+    # Check for the matching relations
+    service_relations = yield relation_manager.get_relations_for_service(
+        service_state)
+    service_relations = [
+        sr for sr in service_relations if sr.relation_name == relation_name]
+    if not service_relations:
+        raise RelationStateNotFound()
+
+    # Verify the relations are in need of resolution.
+    resolve_relations = {}
+    for service_relation in service_relations:
+        unit_relation = yield service_relation.get_unit_state(unit_state)
+        running, state = yield is_relation_running(client, unit_relation)
+        if not running:
+            resolve_relations[unit_relation.internal_relation_id] = retry
+
+    if not resolve_relations:
+        log.warning("Matched relations are all running")
+        client.close()
+        returnValue(False)
+
+    # Mark the relations for resolution.
+    yield unit_state.set_relation_resolved(resolve_relations)
+    log.info(
+        "Marked unit %r relation %r as resolved", unit_name, relation_name)
+    client.close()

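The resolved command reduces to a small decision: map the --retry flag to RETRY_HOOKS or NO_HOOKS, then either mark the unit (when it is not running) or mark each matching relation that is not running. The sketch below mirrors configure_subparser with the standard argparse module and replays that decision as a pure function. plan_resolution, the (relation_name, internal_relation_id, running) tuples and the "relation-1" style ids are hypothetical; the ZooKeeper client, workflow lookups and logging are left out.

```python
import argparse

# Values copied from ensemble/state/service.py in this diff.
RETRY_HOOKS = 1000
NO_HOOKS = 1001


class RelationStateNotFound(Exception):
    """Stand-in for ensemble.state.errors.RelationStateNotFound."""


def build_parser():
    # Mirrors the arguments registered by configure_subparser.
    parser = argparse.ArgumentParser(prog="ensemble")
    subparsers = parser.add_subparsers(dest="subcommand")
    sub_parser = subparsers.add_parser("resolved")
    sub_parser.add_argument(
        "--retry", "-r", action="store_true", help="Retry failed hook.")
    sub_parser.add_argument(
        "--environment", "-e", help="Ensemble environment to operate in.")
    sub_parser.add_argument("service_unit_name")
    # nargs="?" makes the relation name optional; it defaults to None.
    sub_parser.add_argument("relation_name", nargs="?", default=None)
    return parser


def plan_resolution(unit_running, relations, relation_name, retry):
    """Hypothetical pure version of the decision made by resolved().

    unit_running: whether the unit workflow is already running.
    relations: (relation_name, internal_relation_id, running) tuples for
        the unit's service relations.
    Returns the setting that would be written, or None when nothing
    needs resolving.
    """
    retry = RETRY_HOOKS if retry else NO_HOOKS
    if not relation_name:
        if unit_running:
            return None
        return {"retry": retry}

    matched = [r for r in relations if r[0] == relation_name]
    if not matched:
        raise RelationStateNotFound(relation_name)
    # Only relations that are not running get marked for resolution.
    plan = dict(
        (rel_id, retry) for _, rel_id, running in matched if not running)
    return plan or None


options = build_parser().parse_args(
    ["resolved", "--retry", "wordpress/0", "db"])
relations = [("db", "relation-1", False), ("cache", "relation-2", True)]
print(plan_resolution(False, relations, options.relation_name, options.retry))
# prints {'relation-1': 1000}
```

Note that the relation branch ignores the unit's own running state, as the command does: a running unit can still have a broken relation.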
=== added file 'ensemble/control/tests/test_resolved.py'
--- ensemble/control/tests/test_resolved.py 1970-01-01 00:00:00 +0000
+++ ensemble/control/tests/test_resolved.py 2011-05-02 21:57:29 +0000
@@ -0,0 +1,390 @@
+from twisted.internet.defer import inlineCallbacks, returnValue
+from yaml import dump
+
+from ensemble.control import main
+from ensemble.control.resolved import resolved
+from ensemble.control.tests.common import ControlToolTest
+from ensemble.formula.tests.test_repository import RepositoryTestBase
+
+from ensemble.state.service import RETRY_HOOKS, NO_HOOKS
+from ensemble.state.tests.test_service import ServiceStateManagerTestBase
+from ensemble.state.errors import ServiceStateNotFound
+
+from ensemble.unit.workflow import UnitWorkflowState, RelationWorkflowState
+from ensemble.unit.lifecycle import UnitRelationLifecycle
+from ensemble.hooks.executor import HookExecutor
+
+
+class ControlResolvedTest(
+    ServiceStateManagerTestBase, ControlToolTest, RepositoryTestBase):
+
+    @inlineCallbacks
+    def setUp(self):
+        yield super(ControlResolvedTest, self).setUp()
+        config = {
+            "ensemble": "environments",
+            "environments": {"firstenv": {"type": "dummy"}}}
+
+        self.write_config(dump(config))
+        self.config.load()
+
+        yield self.add_relation_state("wordpress", "mysql")
+        yield self.add_relation_state("wordpress", "varnish")
+
+        self.service1 = yield self.service_state_manager.get_service_state(
+            "mysql")
+        self.service_unit1 = yield self.service1.add_unit_state()
+        self.service_unit2 = yield self.service1.add_unit_state()
+
+        self.unit1_workflow = UnitWorkflowState(
+            self.client, self.service_unit1, None, self.makeDir())
+        yield self.unit1_workflow.set_state("started")
+
+        self.environment = self.config.get_default()
+        self.provider = self.environment.get_machine_provider()
+
+        self.output = self.capture_logging()
+        self.stderr = self.capture_stream("stderr")
+        self.executor = HookExecutor()
+
+    @inlineCallbacks
+    def add_relation_state(self, *service_names):
+        for service_name in service_names:
+            try:
+                yield self.service_state_manager.get_service_state(
+                    service_name)
+            except ServiceStateNotFound:
+                yield self.add_service_from_formula(service_name)
+
+        endpoint_pairs = yield self.service_state_manager.join_descriptors(
+            *service_names)
+        endpoints = endpoint_pairs[0]
+        # If both endpoints are identical (e.g. a peer relation), keep one.
+        if endpoints[0] == endpoints[1]:
+            endpoints = endpoints[0:1]
+        relation_state = (yield self.relation_state_manager.add_relation_state(
+            *endpoints))[0]
+        returnValue(relation_state)
+
+    @inlineCallbacks
+    def get_named_service_relation(self, service_state, relation_name):
+        if isinstance(service_state, str):
+            service_state = yield self.service_state_manager.get_service_state(
+                service_state)
+
+        rels = yield self.relation_state_manager.get_relations_for_service(
+            service_state)
+
+        rels = [sr for sr in rels if sr.relation_name == relation_name]
+        if len(rels) == 1:
+            returnValue(rels[0])
+        returnValue(rels)
+
+    @inlineCallbacks
+    def setup_unit_relations(self, service_relation, *units):
+        """
+        Given a service relation and set of unit tuples in the form
+        unit_state, unit_relation_workflow_state, will add unit relations
+        for these units and update their workflow state to the desired/given
+        state.
+        """
+        for unit, state in units:
+            unit_relation = yield service_relation.add_unit_state(unit)
+            lifecycle = UnitRelationLifecycle(
+                self.client, unit_relation, service_relation.relation_name,
+                self.makeDir(), self.executor)
+            workflow_state = RelationWorkflowState(
+                self.client, unit_relation, lifecycle, self.makeDir())
+            yield workflow_state.set_state(state)
+
+    @inlineCallbacks
+    def test_resolved(self):
+        """
+        'ensemble resolved <unit_name>' will schedule a unit for
+        retrying from an error state.
+        """
+        # Push the unit into an error state
+        yield self.unit1_workflow.set_state("start_error")
+        self.setup_exit(0)
+        finished = self.setup_cli_reactor()
+        self.mocker.replay()
+
+        self.assertEqual(
+            (yield self.service_unit1.get_resolved()), None)
+
+        main(["resolved", "mysql/0"])
+        yield finished
+
+        self.assertEqual(
+            (yield self.service_unit1.get_resolved()), {"retry": NO_HOOKS})
+        self.assertIn(
+            "Marked unit 'mysql/0' as resolved",
+            self.output.getvalue())
+
+    @inlineCallbacks
+    def test_resolved_retry(self):
+        """
+        'ensemble resolved --retry <unit_name>' will schedule a unit
+        for retrying from an error state with a retry of hook
+        executions.
+        """
+        yield self.unit1_workflow.set_state("start_error")
+        self.setup_exit(0)
+        finished = self.setup_cli_reactor()
+        self.mocker.replay()
+
+        self.assertEqual(
+            (yield self.service_unit1.get_resolved()), None)
+
+        main(["resolved", "--retry", "mysql/0"])
+        yield finished
+
+        self.assertEqual(
+            (yield self.service_unit1.get_resolved()), {"retry": RETRY_HOOKS})
+        self.assertIn(
+            "Marked unit 'mysql/0' as resolved",
+            self.output.getvalue())
+
+    @inlineCallbacks
+    def test_relation_resolved(self):
+        """
+        'ensemble resolved <unit_name> <rel_name>' will schedule
+        the broken unit relations to be resolved.
+        """
+        service_relation = yield self.get_named_service_relation(
+            self.service1, "server")
+
+        yield self.setup_unit_relations(
+            service_relation,
+            (self.service_unit1, "down"),
+            (self.service_unit2, "up"))
+
+        yield self.unit1_workflow.set_state("start_error")
+        self.setup_exit(0)
+        finished = self.setup_cli_reactor()
+        self.mocker.replay()
+
+        self.assertEqual(
+            (yield self.service_unit1.get_relation_resolved()), None)
+
+        main(["resolved", "--retry", "mysql/0",
+              service_relation.relation_name])
+        yield finished
+
+        self.assertEqual(
+            (yield self.service_unit1.get_relation_resolved()),
+            {service_relation.internal_relation_id: RETRY_HOOKS})
+        self.assertEqual(
+            (yield self.service_unit2.get_relation_resolved()),
+            None)
+        self.assertIn(
+            "Marked unit 'mysql/0' relation 'server' as resolved",
+            self.output.getvalue())
+
+    @inlineCallbacks
+    def test_resolved_relation_some_already_resolved(self):
+        """
+        'ensemble resolved <unit_name> <rel_name>' will mark the unit's
+        down relations as resolved, keeping any existing resolved settings.
+        """
+
+        service2 = yield self.service_state_manager.get_service_state(
+            "wordpress")
+        service_unit1 = yield service2.add_unit_state()
+
+        service_relation = yield self.get_named_service_relation(
+            service2, "db")
+        yield self.setup_unit_relations(
+            service_relation, (service_unit1, "down"))
+
+        service_relation2 = yield self.get_named_service_relation(
+            service2, "cache")
+        yield self.setup_unit_relations(
+            service_relation2, (service_unit1, "down"))
+
+        yield service_unit1.set_relation_resolved(
+            {service_relation.internal_relation_id: NO_HOOKS})
+
+        self.setup_exit(0)
+        finished = self.setup_cli_reactor()
+        self.mocker.replay()
+
+        main(["resolved", "--retry", "wordpress/0", "cache"])
+        yield finished
+
+        self.assertEqual(
+            (yield service_unit1.get_relation_resolved()),
+            {service_relation.internal_relation_id: NO_HOOKS,
+             service_relation2.internal_relation_id: RETRY_HOOKS})
+
+        self.assertIn(
+            "Marked unit 'wordpress/0' relation 'cache' as resolved",
+            self.output.getvalue())
+
+    @inlineCallbacks
+    def test_resolved_relation_some_already_resolved_conflict(self):
+        """
+        'ensemble resolved <unit_name> <rel_name>' will report an error
+        if the unit relation is already marked resolved, leaving it as is.
+        """
+
+        service2 = yield self.service_state_manager.get_service_state(
+            "wordpress")
+        service_unit1 = yield service2.add_unit_state()
+
+        service_relation = yield self.get_named_service_relation(
+            service2, "db")
+        yield self.setup_unit_relations(
+            service_relation, (service_unit1, "down"))
+
+        yield service_unit1.set_relation_resolved(
+            {service_relation.internal_relation_id: NO_HOOKS})
+
+        self.setup_exit(0)
+        finished = self.setup_cli_reactor()
+        self.mocker.replay()
+
+        main(["resolved", "--retry", "wordpress/0", "db"])
+        yield finished
+
+        self.assertEqual(
+            (yield service_unit1.get_relation_resolved()),
+            {service_relation.internal_relation_id: NO_HOOKS})
+
+        self.assertIn(
+            "Service unit 'wordpress/0' already has relations marked as resol",
+            self.output.getvalue())
+
+    @inlineCallbacks
+    def test_resolved_unknown_service(self):
+        """
+        'ensemble resolved <unit_name>' will report if a service is
+        invalid.
+        """
+        self.setup_exit(0)
+        finished = self.setup_cli_reactor()
+        self.mocker.replay()
+        main(["resolved", "zebra/0"])
+        yield finished
+        self.assertIn("Service 'zebra' was not found", self.stderr.getvalue())
+
+    @inlineCallbacks
+    def test_resolved_unknown_unit(self):
+        """
+        'ensemble resolved <unit_name>' will report if a unit is
+        invalid.
+        """
+        self.setup_exit(0)
+        finished = self.setup_cli_reactor()
+        self.mocker.replay()
+        main(["resolved", "mysql/5"])
+        yield finished
+        self.assertIn(
+            "Service unit 'mysql/5' was not found", self.output.getvalue())
+
+    @inlineCallbacks
+    def test_resolved_unknown_unit_relation(self):
+        """
+        'ensemble resolved <unit_name> <rel_name>' will report if a
+        relation is invalid.
+        """
+        self.setup_exit(0)
+        finished = self.setup_cli_reactor()
+        self.mocker.replay()
+
+        self.assertEqual(
+            (yield self.service_unit1.get_resolved()), None)
+
+        main(["resolved", "mysql/0", "magic"])
+        yield finished
+
+        self.assertIn("Relation not found", self.output.getvalue())
+
+    @inlineCallbacks
+    def test_resolved_already_running(self):
+        """
+        'ensemble resolved <unit_name>' will report if
+        the unit is already running.
+        """
+        # Just verify we don't accidentally mark up another unit of the service
+        unit2_workflow = UnitWorkflowState(
+            self.client, self.service_unit2, None, self.makeDir())
+        yield unit2_workflow.set_state("start_error")
+
+        self.setup_exit(0)
+        finished = self.setup_cli_reactor()
+        self.mocker.replay()
+
+        main(["resolved", "mysql/0"])
+        yield finished
+
+        self.assertEqual(
+            (yield self.service_unit2.get_resolved()), None)
+        self.assertEqual(
+            (yield self.service_unit1.get_resolved()), None)
+
+        self.assertIn(
+            "Unit 'mysql/0' already running: started",
+            self.output.getvalue())
+
+    @inlineCallbacks
+    def test_resolved_already_resolved(self):
+        """
+        'ensemble resolved <unit_name>' will report if
+        the unit is already resolved.
+        """
+        # Mark the unit as resolved and as in an error state.
+        yield self.service_unit1.set_resolved(RETRY_HOOKS)
+        yield self.unit1_workflow.set_state("start_error")
+
+        unit2_workflow = UnitWorkflowState(
+            self.client, self.service_unit2, None, self.makeDir())
+        yield unit2_workflow.set_state("start_error")
+
+        self.assertEqual(
+            (yield self.service_unit2.get_resolved()), None)
+
+        self.setup_exit(0)
+        finished = self.setup_cli_reactor()
+        self.mocker.replay()
+
+        main(["resolved", "mysql/0"])
+        yield finished
+
+        self.assertEqual(
+            (yield self.service_unit1.get_resolved()),
+            {"retry": RETRY_HOOKS})
+        self.assertNotIn(
+            "Marked unit 'mysql/0' as resolved",
+            self.output.getvalue())
+        self.assertIn(
+            "Service unit 'mysql/0' is already marked as resolved.",
+            self.stderr.getvalue())
+
+    @inlineCallbacks
+    def test_resolved_relation_already_running(self):
+        """
+        'ensemble resolved <unit_name> <rel_name>' will report
+        if the relation is already running.
+        """
+        service2 = yield self.service_state_manager.get_service_state(
+            "wordpress")
+        service_unit1 = yield service2.add_unit_state()
+
+        service_relation = yield self.get_named_service_relation(
+            service2, "db")
+        yield self.setup_unit_relations(
+            service_relation, (service_unit1, "up"))
+
+        self.setup_exit(0)
+        finished = self.setup_cli_reactor()
+        self.mocker.replay()
+
+        main(["resolved", "wordpress/0", "db"])
+        yield finished
+
+        self.assertIn("Matched relations are all running",
+                      self.output.getvalue())
+        self.assertEqual(
+            (yield service_unit1.get_relation_resolved()),
+            None)

=== modified file 'ensemble/lib/statemachine.py'
--- ensemble/lib/statemachine.py 2011-04-15 20:20:57 +0000
+++ ensemble/lib/statemachine.py 2011-05-02 21:57:29 +0000
@@ -61,7 +61,7 @@
         returnValue(self._workflow.get_transitions(state_id))

     @inlineCallbacks
-    def fire_transition_alias(self, transition_alias):
+    def fire_transition_alias(self, transition_alias, **transition_variables):
         """Fire a transition with the matching alias.

         A transition from the current state with the given alias will
@@ -81,6 +81,9 @@

         Ambigious (multiple) or no matching transitions cause an exception
         InvalidTransition to be raised.
+
+        Keyword args are used as transition variables and given to the
+        associated transition action.
         """

         found = []
@@ -100,7 +103,8 @@
                 "No transition found for alias:%s state:%s" % (
                     transition_alias, current_state))

-        value = yield self.fire_transition(found[0].transition_id)
+        value = yield self.fire_transition(
+            found[0].transition_id, **transition_variables)
         returnValue(value)

     @inlineCallbacks
@@ -135,7 +139,8 @@
         returnValue(False)

     @inlineCallbacks
-    def fire_transition(self, transition_id, **state_variables):
+    def fire_transition(
+            self, transition_id, **transition_variables):
         """Fire a transition with given id.

         Invokes any transition actions, saves state and state variables, and
@@ -156,10 +161,9 @@
             transition_id,
             transition.source,
             transition.destination,
-            state_variables)
+            transition_variables)

         # Execute any per transition action.
-        state_variables = state_variables
         action_id = "do_%s" % transition_id
         action = getattr(self, action_id, None)

@@ -167,9 +171,9 @@
             try:
                 log.debug("%s: execute action %s",
                           class_name(self), action.__name__)
-                variables = yield action()
+                variables = yield action(**transition_variables)
                 if isinstance(variables, dict):
-                    state_variables.update(variables)
+                    transition_variables.update(variables)
             except TransitionError, e:
                 # If an error happens during the transition, allow for
                 # executing an error transition.
@@ -187,14 +191,16 @@
             returnValue(False)

         # Set the state with state variables
-        yield self.set_state(transition.destination, **state_variables)
+        yield self.set_state(transition.destination, **transition_variables)
         log.debug("%s: transition complete %s (state %s) %r",
                   class_name(self), transition_id,
-                  transition.destination, state_variables)
+                  transition.destination, transition_variables)
         if transition.success_transition_id:
             log.debug("%s: initiating success transition: %s",
                       class_name(self), transition.success_transition_id)
-            yield self.fire_transition(transition.success_transition_id)
+            yield self.fire_transition(
+                transition.success_transition_id,
+                **transition_variables)
         returnValue(True)

     @inlineCallbacks

=== modified file 'ensemble/lib/tests/test_statemachine.py'
--- ensemble/lib/tests/test_statemachine.py 2011-04-15 20:20:57 +0000
+++ ensemble/lib/tests/test_statemachine.py 2011-05-02 21:57:29 +0000
@@ -295,6 +295,11 @@
             Transition("continue", "", "next-state", "final-state"))

         workflow_state = AttributeWorkflowState(workflow)
+        results = []
+
+        def do_begin(*args, **kw):
+            results.append(kw)
+        workflow_state.do_begin = do_begin

         yield workflow_state.fire_transition(
             "begin", rabbit="moon", hello=True)
@@ -303,6 +308,10 @@
         variables = yield workflow_state.get_state_variables()
         self.assertEqual({"rabbit": "moon", "hello": True}, variables)

+        # Ensure the variables made it through to the transition action
+        self.assertEqual(
+            {"rabbit": "moon", "hello": True}, results[0])
+
         yield workflow_state.fire_transition("continue")
         current_state = yield workflow_state.get_state()
         self.assertEqual(current_state, "final-state")

=== modified file 'ensemble/state/errors.py'
--- ensemble/state/errors.py 2011-04-28 17:52:55 +0000
+++ ensemble/state/errors.py 2011-05-02 21:57:29 +0000
@@ -151,6 +151,30 @@
             self.unit_name


+class ServiceUnitResolvedAlreadyEnabled(StateError):
+    """The unit has already been marked resolved.
+    """
+
+    def __init__(self, unit_name):
+        self.unit_name = unit_name
+
+    def __str__(self):
+        return "Service unit %r is already marked as resolved." % (
+            self.unit_name)
+
+
+class ServiceUnitRelationResolvedAlreadyEnabled(StateError):
+    """The unit's relations have already been marked resolved.
+    """
+
+    def __init__(self, unit_name):
+        self.unit_name = unit_name
+
+    def __str__(self):
+        return "Service unit %r already has relations marked as resolved." % (
+            self.unit_name)
+
+
 class RelationAlreadyExists(StateError):

     def __init__(self, *endpoints):

693=== modified file 'ensemble/state/service.py'
694--- ensemble/state/service.py 2011-04-27 16:23:16 +0000
695+++ ensemble/state/service.py 2011-05-02 21:57:29 +0000
696@@ -15,11 +15,16 @@
697 StateChanged, ServiceStateNotFound, ServiceUnitStateNotFound,
698 ServiceUnitStateMachineAlreadyAssigned, ServiceStateNameInUse,
699 BadDescriptor, BadServiceStateName, NoUnusedMachines,
700- ServiceUnitDebugAlreadyEnabled)
701+ ServiceUnitDebugAlreadyEnabled, ServiceUnitResolvedAlreadyEnabled,
702+ ServiceUnitRelationResolvedAlreadyEnabled, StopWatcher)
703 from ensemble.state.formula import FormulaStateManager
704 from ensemble.state.relation import ServiceRelationState, RelationStateManager
705 from ensemble.state.machine import _public_machine_id, MachineState
706-from ensemble.state.utils import remove_tree
707+from ensemble.state.utils import remove_tree, dict_merge
708+
709+
710+RETRY_HOOKS = 1000
711+NO_HOOKS = 1001
712
713
714 class ServiceStateManager(StateBase):
715@@ -855,6 +860,185 @@
716 callback_d.addCallback(
717 lambda x: watch_d.addCallback(watcher) and x)
718
719+ @property
720+ def _unit_resolve_path(self):
721+ return "/units/%s/resolved" % self.internal_id
722+
723+ @inlineCallbacks
724+ def set_resolved(self, retry):
725+ """Mark the unit as in need of being resolved.
726+
727+ :param retry: A boolean denoting if hooks should fire as a result
728+ of the retry.
729+
730+ The resolved setting is set by the command line to inform
731+ a unit to attempt an retry transition from an error state.
732+ """
733+
734+ if not retry in (RETRY_HOOKS, NO_HOOKS):
735+ raise ValueError("invalid retry value %r" % retry)
736+
737+ try:
738+ yield self._client.create(
739+ self._unit_resolve_path, yaml.safe_dump({"retry": retry}))
740+ except zookeeper.NodeExistsException:
741+ raise ServiceUnitResolvedAlreadyEnabled(self.unit_name)
742+
743+ @inlineCallbacks
744+ def get_resolved(self):
745+ """Get the value of the resolved setting if any.
746+
747+ The resolved setting is retrieved by the unit agent and if
748+ found instructs it to attempt an retry transition from an
749+ error state.
750+ """
751+ try:
752+ content, stat = yield self._client.get(self._unit_resolve_path)
753+ except zookeeper.NoNodeException:
754+ # Return a default value.
755+ returnValue(None)
756+ returnValue(yaml.load(content))
757+
758+ @inlineCallbacks
759+ def clear_resolved(self):
760+ """Remove any resolved setting on the unit."""
761+ try:
762+ yield self._client.delete(self._unit_resolve_path)
763+ except zookeeper.NoNodeException:
764+ # We get to the same end state.
765+ pass
766+
767+ @inlineCallbacks
768+ def watch_resolved(self, callback):
769+ """Set a callback to be invoked when an unit is marked resolved.
770+
771+ :param callback: The callback recieves a single parameter, the
772+ change event. The watcher always recieve an initial
773+ boolean value invocation denoting the existence of the
774+ resolved setting. Subsequent invocations will be with change
775+ events.
776+ """
777+ @inlineCallbacks
778+ def watcher(change_event):
779+ if not self._client.connected:
780+ returnValue(None)
781+
782+ exists_d, watch_d = self._client.exists_and_watch(
783+ self._unit_resolve_path)
784+ try:
785+ yield callback(change_event)
786+ except StopWatcher:
787+ returnValue(None)
788+ watch_d.addCallback(watcher)
789+
790+ exists_d, watch_d = self._client.exists_and_watch(
791+ self._unit_resolve_path)
792+ exists = yield exists_d
793+
794+ # Setup the watch deferred callback after the user defined callback
795+ # has returned successfully from the existence invocation.
796+ callback_d = maybeDeferred(callback, bool(exists))
797+ callback_d.addCallback(
798+ lambda x: watch_d.addCallback(watcher) and x)
799+ callback_d.addErrback(
800+ lambda failure: failure.trap(StopWatcher))
801+
802+ @property
803+ def _relation_resolved_path(self):
804+ return "/units/%s/relation-resolved" % self.internal_id
805+
806+ @inlineCallbacks
807+ def set_relation_resolved(self, relation_map):
808+ """Mark a unit's relations as in need of being resolved.
809+
810+ :param relation_map: A map of internal relation ids, to retry hook
811+ values either ensemble.state.service.NO_HOOKS or
812+ RETRY_HOOKS.
813+ """
814+ if not isinstance(relation_map, dict):
815+ raise ValueError(
816+ "Relation map must be a dictionary %r" % relation_map)
817+
818+ if [v for v in relation_map.values() if v not in (
819+ RETRY_HOOKS, NO_HOOKS)]:
820+
821+ print relation_map
822+ raise ValueError("Invalid setting for retry hook")
823+
824+ def update_relation_resolved(content, stat):
825+ if not content:
826+ return yaml.safe_dump(relation_map)
827+
828+ content = yaml.safe_dump(
829+ dict_merge(yaml.load(content), relation_map))
830+ return content
831+
832+ try:
833+ yield retry_change(
834+ self._client,
835+ self._relation_resolved_path,
836+ update_relation_resolved)
837+ except StateChanged:
838+ raise ServiceUnitRelationResolvedAlreadyEnabled(self.unit_name)
839+ returnValue(True)
840+
841+ @inlineCallbacks
842+ def get_relation_resolved(self):
843+ """Retrieve any resolved flags set for this unit's relations.
844+ """
845+ try:
846+ content, stat = yield self._client.get(
847+ self._relation_resolved_path)
848+ except zookeeper.NoNodeException:
849+ returnValue(None)
850+ returnValue(yaml.load(content))
851+
852+ @inlineCallbacks
853+ def clear_relation_resolved(self):
854+ """ Clear the relation resolved setting.
855+ """
856+ try:
857+ yield self._client.delete(self._relation_resolved_path)
858+ except zookeeper.NoNodeException:
859+ # We get to the same end state.
860+ pass
861+
862+ @inlineCallbacks
863+ def watch_relation_resolved(self, callback):
864+ """Set a callback to be invoked when a unit's relations are resolved.
865+
866+ :param callback: The callback recieves a single parameter, the
867+ change event. The watcher always recieve an initial
868+ boolean value invocation denoting the existence of the
869+ resolved setting. Subsequent invocations will be with change
870+ events.
871+ """
872+ @inlineCallbacks
873+ def watcher(change_event):
874+ if not self._client.connected:
875+ returnValue(None)
876+ exists_d, watch_d = self._client.exists_and_watch(
877+ self._relation_resolved_path)
878+ try:
879+ yield callback(change_event)
880+ except StopWatcher:
881+ returnValue(None)
882+
883+ watch_d.addCallback(watcher)
884+
885+ exists_d, watch_d = self._client.exists_and_watch(
886+ self._relation_resolved_path)
887+
888+ exists = yield exists_d
889+
890+ # Set up the watch deferred callback after the user-defined callback
891+ # has returned successfully from the existence invocation.
892+ callback_d = maybeDeferred(callback, bool(exists))
893+ callback_d.addCallback(
894+ lambda x: watch_d.addCallback(watcher) and x)
895+ callback_d.addErrback(
896+ lambda failure: failure.trap(StopWatcher))
897+
898
899 def _parse_unit_name(unit_name):
900 """Parse a unit's name into the service name and its sequence.
901
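The contract of `watch_relation_resolved` works in three steps. The callback is first invoked with a boolean for node existence. Later invocations receive change events. Raising `StopWatcher` ends the watch. This contract can be illustrated without Twisted or ZooKeeper. The sketch below is a synchronous, hypothetical simplification: `FakeNode`, `OneShotWatch` and `watch_node` are illustrative names that stand in for the ZooKeeper node, the watch deferred returned by `exists_and_watch`, and the method itself. Plain strings replace real change events.

```python
class StopWatcher(Exception):
    """Raised by a callback to end the watch (same role as in the diff)."""


class OneShotWatch:
    """Hypothetical stand-in for the watch deferred: fires at most once."""

    def __init__(self):
        self.callbacks = []

    def add_callback(self, fn):
        self.callbacks.append(fn)


class FakeNode:
    """Hypothetical in-memory node whose watches are one-shot, like ZooKeeper's."""

    def __init__(self):
        self.exists = False
        self._watches = []

    def exists_and_watch(self):
        watch = OneShotWatch()
        self._watches.append(watch)
        return self.exists, watch

    def _fire(self, event):
        # Each watch fires once; callbacks must re-register to keep watching.
        watches, self._watches = self._watches, []
        for watch in watches:
            for fn in watch.callbacks:
                fn(event)

    def create(self):
        self.exists = True
        self._fire("created")

    def delete(self):
        self.exists = False
        self._fire("deleted")


def watch_node(node, callback):
    """Call callback(bool) once, then callback(event) for each change."""

    def watcher(event):
        # Re-arm the node watch before running the callback.
        _, watch = node.exists_and_watch()
        try:
            callback(event)
        except StopWatcher:
            return  # the fresh watch gets no callback, so watching ends
        watch.add_callback(watcher)

    exists, watch = node.exists_and_watch()
    try:
        callback(bool(exists))
    except StopWatcher:
        return
    # Only chain the watcher once the initial callback has succeeded.
    watch.add_callback(watcher)


node = FakeNode()
results = []
watch_node(node, results.append)
node.create()
node.delete()
node.create()
# results is now [False, "created", "deleted", "created"]
```

Unlike a real ZooKeeper node, this fake delivers every change. Real watches can collapse rapid changes into a single event while a slow callback is still running, as the slow-callback tests in this branch show.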
902=== modified file 'ensemble/state/tests/test_errors.py'
903--- ensemble/state/tests/test_errors.py 2011-02-23 17:47:46 +0000
904+++ ensemble/state/tests/test_errors.py 2011-05-02 21:57:29 +0000
905@@ -11,7 +11,9 @@
906 UnitRelationStateAlreadyAssigned, UnknownRelationRole,
907 BadDescriptor, DuplicateEndpoints, IncompatibleEndpoints,
908 NoMatchingEndpoints, AmbiguousEndpoints,
909- ServiceUnitStateMachineNotAssigned, ServiceUnitDebugAlreadyEnabled)
910+ ServiceUnitStateMachineNotAssigned, ServiceUnitDebugAlreadyEnabled,
911+ ServiceUnitResolvedAlreadyEnabled,
912+ ServiceUnitRelationResolvedAlreadyEnabled)
913
914
915 class StateErrorsTest(TestCase):
916@@ -89,6 +91,23 @@
917 str(error),
918 "Service unit 'wordpress/0' is already in debug mode.")
919
920+ def test_unit_already_in_resolved_mode(self):
921+ error = ServiceUnitResolvedAlreadyEnabled("wordpress/0")
922+ self.assertIsStateError(error)
923+ self.assertEquals(error.unit_name, "wordpress/0")
924+ self.assertEquals(
925+ str(error),
926+ "Service unit 'wordpress/0' is already marked as resolved.")
927+
928+ def test_unit_already_in_relation_resolved_mode(self):
929+ error = ServiceUnitRelationResolvedAlreadyEnabled("wordpress/0")
930+ self.assertIsStateError(error)
931+ self.assertEquals(error.unit_name, "wordpress/0")
932+ self.assertEquals(
933+ str(error),
934+ "Service unit %r already has relations marked as resolved." % (
935+ "wordpress/0"))
936+
937 def test_service_name_in_use(self):
938 error = ServiceStateNameInUse("wordpress")
939 self.assertIsStateError(error)
940
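The tests above define the message formats of the two new errors. The following is a minimal sketch of exception classes that would satisfy them. It assumes a simple `StateError` base class, which is a hypothetical placeholder because the branch's actual `ensemble/state/errors.py` is not shown here.

```python
class StateError(Exception):
    """Hypothetical placeholder for ensemble's common state error base."""


class ServiceUnitResolvedAlreadyEnabled(StateError):
    """The unit already has a resolved setting."""

    def __init__(self, unit_name):
        super().__init__(unit_name)
        self.unit_name = unit_name

    def __str__(self):
        return "Service unit %r is already marked as resolved." % self.unit_name


class ServiceUnitRelationResolvedAlreadyEnabled(StateError):
    """The unit already has a conflicting relation resolved setting."""

    def __init__(self, unit_name):
        super().__init__(unit_name)
        self.unit_name = unit_name

    def __str__(self):
        return ("Service unit %r already has relations marked as resolved."
                % self.unit_name)
```

Using `%r` makes the unit name appear quoted, which matches the `'wordpress/0'` expected in the test strings.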
941=== modified file 'ensemble/state/tests/test_service.py'
942--- ensemble/state/tests/test_service.py 2011-04-23 01:13:27 +0000
943+++ ensemble/state/tests/test_service.py 2011-05-02 21:57:29 +0000
944@@ -9,14 +9,15 @@
945 from ensemble.formula.tests.test_metadata import test_repository_path
946 from ensemble.state.endpoint import RelationEndpoint
947 from ensemble.state.formula import FormulaStateManager
948-from ensemble.state.service import ServiceStateManager
949+from ensemble.state.service import ServiceStateManager, NO_HOOKS, RETRY_HOOKS
950 from ensemble.state.machine import MachineStateManager
951 from ensemble.state.relation import RelationStateManager
952 from ensemble.state.errors import (
953 StateChanged, ServiceStateNotFound, ServiceUnitStateNotFound,
954 ServiceUnitStateMachineAlreadyAssigned, ServiceStateNameInUse,
955 BadDescriptor, BadServiceStateName, ServiceUnitDebugAlreadyEnabled,
956- MachineStateNotFound, NoUnusedMachines)
957+ MachineStateNotFound, NoUnusedMachines, ServiceUnitResolvedAlreadyEnabled,
958+ ServiceUnitRelationResolvedAlreadyEnabled, StopWatcher)
959
960
961 from ensemble.state.tests.common import StateTestBase
962@@ -738,6 +739,283 @@
963 None)
964
965 @inlineCallbacks
966+ def test_get_set_clear_resolved(self):
967+ """A unit can be set to resolved to mark a future transition, with
968+ an optional retry flag."""
969+
970+ unit_state = yield self.get_unit_state()
971+
972+ self.assertIdentical((yield unit_state.get_resolved()), None)
973+ yield unit_state.set_resolved(NO_HOOKS)
974+
975+ yield self.assertFailure(
976+ unit_state.set_resolved(NO_HOOKS),
977+ ServiceUnitResolvedAlreadyEnabled)
978+ self.assertEqual(
979+ (yield unit_state.get_resolved()), {"retry": NO_HOOKS})
981+
982+ yield unit_state.clear_resolved()
983+ self.assertIdentical((yield unit_state.get_resolved()), None)
984+ yield unit_state.clear_resolved()
985+
986+ yield self.assertFailure(unit_state.set_resolved(None), ValueError)
987+
988+ @inlineCallbacks
989+ def test_watch_resolved(self):
990+ """A unit resolved watch can be instituted on a permanent basis."""
991+ unit_state = yield self.get_unit_state()
992+
993+ results = []
994+
995+ def callback(value):
996+ results.append(value)
997+
998+ unit_state.watch_resolved(callback)
999+ yield unit_state.set_resolved(RETRY_HOOKS)
1000+ yield unit_state.clear_resolved()
1001+ yield unit_state.set_resolved(NO_HOOKS)
1002+
1003+ yield self.poke_zk()
1004+
1005+ self.assertEqual(len(results), 4)
1006+ self.assertIdentical(results.pop(0), False)
1007+ self.assertEqual(results.pop(0).type_name, "created")
1008+ self.assertEqual(results.pop(0).type_name, "deleted")
1009+ self.assertEqual(results.pop(0).type_name, "created")
1010+
1011+ self.assertEqual(
1012+ (yield unit_state.get_resolved()),
1013+ {"retry": NO_HOOKS})
1014+
1015+ @inlineCallbacks
1016+ def test_stop_watch_resolved(self):
1017+ """A unit resolved watch can be instituted on a permanent basis.
1018+
1019+ However, the callback can raise StopWatcher at any time to stop the watch.
1020+ """
1021+ unit_state = yield self.get_unit_state()
1022+
1023+ results = []
1024+
1025+ def callback(value):
1026+ results.append(value)
1027+ if len(results) == 1:
1028+ raise StopWatcher()
1029+ if len(results) == 3:
1030+ raise StopWatcher()
1031+
1032+ unit_state.watch_resolved(callback)
1033+ yield unit_state.set_resolved(RETRY_HOOKS)
1034+ yield unit_state.clear_resolved()
1035+ yield self.poke_zk()
1036+
1037+ unit_state.watch_resolved(callback)
1038+ yield unit_state.set_resolved(NO_HOOKS)
1039+ yield unit_state.clear_resolved()
1040+
1041+ yield self.poke_zk()
1042+
1043+ self.assertEqual(len(results), 3)
1044+ self.assertIdentical(results.pop(0), False)
1045+ self.assertIdentical(results.pop(0), False)
1046+ self.assertEqual(results.pop(0).type_name, "created")
1047+
1048+ self.assertEqual(
1049+ (yield unit_state.get_resolved()), None)
1050+
1051+ @inlineCallbacks
1052+ def test_get_set_clear_relation_resolved(self):
1053+ """A unit's relations can be set to resolved to mark a
1054+ future transition, with an optional retry flag."""
1055+
1056+ unit_state = yield self.get_unit_state()
1057+
1058+ self.assertIdentical((yield unit_state.get_relation_resolved()), None)
1059+ yield unit_state.set_relation_resolved({"0": RETRY_HOOKS})
1060+
1061+ # Setting a conflicting value for an existing relation raises an error.
1062+ yield self.assertFailure(
1063+ unit_state.set_relation_resolved({"0": NO_HOOKS}),
1064+ ServiceUnitRelationResolvedAlreadyEnabled)
1065+
1066+ # Doing the same thing is fine
1067+ yield unit_state.set_relation_resolved({"0": RETRY_HOOKS})
1068+
1069+ # It's fine to add values for new relations.
1070+ yield unit_state.set_relation_resolved({"21": RETRY_HOOKS})
1071+ self.assertEqual(
1072+ (yield unit_state.get_relation_resolved()),
1073+ {"0": RETRY_HOOKS, "21": RETRY_HOOKS})
1074+
1075+ yield unit_state.clear_relation_resolved()
1076+ self.assertIdentical((yield unit_state.get_relation_resolved()), None)
1077+ yield unit_state.clear_relation_resolved()
1078+
1079+ yield self.assertFailure(
1080+ unit_state.set_relation_resolved(True), ValueError)
1081+ yield self.assertFailure(
1082+ unit_state.set_relation_resolved(None), ValueError)
1083+
1084+ @inlineCallbacks
1085+ def test_watch_relation_resolved(self):
1086+ """A unit relation resolved watch can be instituted on a permanent basis."""
1087+ unit_state = yield self.get_unit_state()
1088+
1089+ results = []
1090+
1091+ def callback(value):
1092+ results.append(value)
1093+
1094+ unit_state.watch_relation_resolved(callback)
1095+ yield unit_state.set_relation_resolved({"0": RETRY_HOOKS})
1096+ yield unit_state.clear_relation_resolved()
1097+ yield unit_state.set_relation_resolved({"0": NO_HOOKS})
1098+
1099+ yield self.poke_zk()
1100+
1101+ self.assertEqual(len(results), 4)
1102+ self.assertIdentical(results.pop(0), False)
1103+ self.assertEqual(results.pop(0).type_name, "created")
1104+ self.assertEqual(results.pop(0).type_name, "deleted")
1105+ self.assertEqual(results.pop(0).type_name, "created")
1106+
1107+ self.assertEqual(
1108+ (yield unit_state.get_relation_resolved()),
1109+ {"0": NO_HOOKS})
1110+
1111+ @inlineCallbacks
1112+ def test_stop_watch_relation_resolved(self):
1113+ """A unit relation resolved watch stops when the callback raises StopWatcher."""
1114+ unit_state = yield self.get_unit_state()
1115+
1116+ results = []
1117+
1118+ def callback(value):
1119+ results.append(value)
1120+
1121+ if len(results) == 1:
1122+ raise StopWatcher()
1123+
1124+ if len(results) == 3:
1125+ raise StopWatcher()
1126+
1127+ unit_state.watch_relation_resolved(callback)
1128+ yield unit_state.set_relation_resolved({"0": RETRY_HOOKS})
1129+ yield unit_state.clear_relation_resolved()
1130+ yield self.poke_zk()
1131+ self.assertEqual(len(results), 1)
1132+
1133+ unit_state.watch_relation_resolved(callback)
1134+ yield unit_state.set_relation_resolved({"0": RETRY_HOOKS})
1135+ yield unit_state.clear_relation_resolved()
1136+ yield self.poke_zk()
1137+ self.assertEqual(len(results), 3)
1138+ self.assertIdentical(results.pop(0), False)
1139+ self.assertIdentical(results.pop(0), False)
1140+ self.assertEqual(results.pop(0).type_name, "created")
1141+
1142+ self.assertEqual(
1143+ (yield unit_state.get_relation_resolved()), None)
1144+
1145+ @inlineCallbacks
1146+ def test_watch_resolved_slow_callback(self):
1147+ """A slow watch callback is still invoked serially."""
1148+ unit_state = yield self.get_unit_state()
1149+
1150+ callbacks = [Deferred() for i in range(5)]
1151+ results = []
1152+ contents = []
1153+
1154+ @inlineCallbacks
1155+ def watch(value):
1156+ results.append(value)
1157+ yield callbacks[len(results) - 1]
1158+ contents.append((yield unit_state.get_resolved()))
1159+
1160+ yield unit_state.watch_resolved(watch)
1161+
1162+ # These get collapsed into a single event
1163+ yield unit_state.set_resolved(RETRY_HOOKS)
1164+ yield unit_state.clear_resolved()
1165+ yield self.poke_zk()
1166+
1167+ # Verify the callback hasn't completed
1168+ self.assertEqual(len(results), 1)
1169+ self.assertEqual(len(contents), 0)
1170+
1171+ # Let it finish
1172+ callbacks[0].callback(True)
1173+ yield self.poke_zk()
1174+
1175+ # Verify result counts
1176+ self.assertEqual(len(results), 2)
1177+ self.assertEqual(len(contents), 1)
1178+
1179+ # Verify result values. Even though we received a created event, the
1180+ # retrieved setting shows that resolved is no longer set.
1181+ self.assertEqual(results[-1].type_name, "created")
1182+ self.assertEqual(contents[-1], None)
1183+
1184+ yield unit_state.set_resolved(NO_HOOKS)
1185+ callbacks[1].callback(True)
1186+ yield self.poke_zk()
1187+
1188+ self.assertEqual(len(results), 3)
1189+ self.assertEqual(contents[-1], {"retry": NO_HOOKS})
1190+
1191+ # Clear out any pending activity.
1192+ yield self.poke_zk()
1193+
1194+ @inlineCallbacks
1195+ def test_watch_relation_resolved_slow_callback(self):
1196+ """A slow watch callback is still invoked serially."""
1197+ unit_state = yield self.get_unit_state()
1198+
1199+ callbacks = [Deferred() for i in range(5)]
1200+ results = []
1201+ contents = []
1202+
1203+ @inlineCallbacks
1204+ def watch(value):
1205+ results.append(value)
1206+ yield callbacks[len(results) - 1]
1207+ contents.append((yield unit_state.get_relation_resolved()))
1208+
1209+ yield unit_state.watch_relation_resolved(watch)
1210+
1211+ # These get collapsed into a single event
1212+ yield unit_state.set_relation_resolved({"0": RETRY_HOOKS})
1213+ yield unit_state.clear_relation_resolved()
1214+ yield self.poke_zk()
1215+
1216+ # Verify the callback hasn't completed
1217+ self.assertEqual(len(results), 1)
1218+ self.assertEqual(len(contents), 0)
1219+
1220+ # Let it finish
1221+ callbacks[0].callback(True)
1222+ yield self.poke_zk()
1223+
1224+ # Verify result counts
1225+ self.assertEqual(len(results), 2)
1226+ self.assertEqual(len(contents), 1)
1227+
1228+ # Verify result values. Even though we received a created event, the
1229+ # retrieved setting shows that relation resolved is no longer set.
1230+ self.assertEqual(results[-1].type_name, "created")
1231+ self.assertEqual(contents[-1], None)
1232+
1233+ yield unit_state.set_relation_resolved({"0": RETRY_HOOKS})
1234+ callbacks[1].callback(True)
1235+ yield self.poke_zk()
1236+
1237+ self.assertEqual(len(results), 3)
1238+ self.assertEqual(contents[-1], {"0": RETRY_HOOKS})
1239+ # Clear out any pending activity.
1240+ yield self.poke_zk()
1241+
1242+ @inlineCallbacks
1243 def test_set_and_clear_upgrade_flag(self):
1244 """An upgrade flag can be set on a unit."""
1245
1246@@ -870,6 +1148,9 @@
1247 self.assertEqual(results[-1].type_name, "created")
1248 self.assertEqual(contents[-1], True)
1249
1250+ # Clear out any pending activity.
1251+ yield self.poke_zk()
1252+
1253 @inlineCallbacks
1254 def test_enable_debug_hook(self):
1255 """Unit hook debugging can be enabled on the unit state."""
1256@@ -1061,6 +1342,9 @@
1257 self.assertEqual(results[-1].type_name, "created")
1258 self.assertEqual(contents[-1], {"debug_hooks": ["*"]})
1259
1260+ # Clear out any pending activity.
1261+ yield self.poke_zk()
1262+
1263 @inlineCallbacks
1264 def test_service_unit_agent(self):
1265 """A service unit state has an associated unit agent."""
1266
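`test_get_set_clear_resolved` specifies a small contract. Setting resolved twice fails, even with the same value. An invalid retry value raises `ValueError`. Clearing is idempotent. The following in-memory sketch illustrates that contract. The values of `NO_HOOKS` and `RETRY_HOOKS` are hypothetical, because the real constants live in `ensemble/state/service.py`, which is not shown here. `InMemoryUnitResolved` and `ResolvedAlreadyEnabled` are illustrative names, and a plain attribute replaces the ZooKeeper node.

```python
NO_HOOKS = 0  # hypothetical value; the real constant is in ensemble.state.service
RETRY_HOOKS = 1  # hypothetical value


class ResolvedAlreadyEnabled(Exception):
    """Stand-in for ServiceUnitResolvedAlreadyEnabled."""


class InMemoryUnitResolved:
    """Hypothetical in-memory model of the unit resolved setting."""

    def __init__(self):
        self._resolved = None  # None plays the role of a missing node

    def set_resolved(self, retry):
        if retry not in (NO_HOOKS, RETRY_HOOKS):
            raise ValueError("Invalid setting for retry hook")
        # Any existing setting is a conflict, even an identical one.
        if self._resolved is not None:
            raise ResolvedAlreadyEnabled()
        self._resolved = {"retry": retry}

    def get_resolved(self):
        return self._resolved

    def clear_resolved(self):
        # Clearing an unset value is not an error; the end state is the same.
        self._resolved = None
```

This differs from the relation variant tested above. There, re-setting an identical value is accepted and only a conflicting value fails.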
1267=== modified file 'ensemble/state/tests/test_utils.py'
1268--- ensemble/state/tests/test_utils.py 2011-04-28 17:52:55 +0000
1269+++ ensemble/state/tests/test_utils.py 2011-05-02 21:57:29 +0000
1270@@ -12,9 +12,10 @@
1271 import yaml
1272
1273 from ensemble.lib.testing import TestCase
1274-from ensemble.state.errors import StateNotFound
1275-from ensemble.state.utils import (PortWatcher, remove_tree,
1276+from ensemble.state.errors import StateChanged, StateNotFound
1277+from ensemble.state.utils import (PortWatcher, remove_tree, dict_merge,
1278 get_open_port, YAMLState)
1279+
1280 from ensemble.tests.common import get_test_zookeeper_address
1281
1282
1283@@ -206,6 +207,26 @@
1284 self.assertNotIn("zoo", children)
1285
1286
1287+class DictMergeTest(TestCase):
1288+
1289+ def test_merge_no_match(self):
1290+ self.assertEqual(
1291+ dict_merge(dict(a=1), dict(b=2)),
1292+ dict(a=1, b=2))
1293+
1294+ def test_merge_matching_keys_same_value(self):
1295+ self.assertEqual(
1296+ dict_merge(dict(a=1, b=2), dict(b=2, c=1)),
1297+ dict(a=1, b=2, c=1))
1298+
1299+ def test_merge_conflict(self):
1300+ self.assertRaises(
1301+ StateChanged,
1302+ dict_merge,
1303+ dict(a=1, b=3),
1304+ dict(b=2, c=1))
1305+
1306+
1307 class OpenPortTest(TestCase):
1308
1309 def test_get_open_port(self):
1310@@ -301,7 +322,6 @@
1311 zk_data = yaml.load(zk_data)
1312 self.assertEqual(zk_data, options)
1313
1314-
1315 @inlineCallbacks
1316 def test_conflict_on_set(self):
1317 """Version conflict error tests.
1318@@ -357,8 +377,8 @@
1319 yield node.read()
1320
1321 options = dict(alpha="beta", one=1)
1322- node["alpha"] = "beta"
1323- node["one"] = 1
1324+ node["alpha"] = "beta"
1325+ node["one"] = 1
1326 yield node.write()
1327
1328 # a local get should reflect proper data
1329@@ -369,7 +389,6 @@
1330 zk_data = yaml.load(zk_data)
1331 self.assertEqual(zk_data, options)
1332
1333-
1334 @inlineCallbacks
1335 def test_multiple_reads(self):
1336 """Calling read resets state to ZK after multiple round-trips."""
1337@@ -415,7 +434,7 @@
1338
1339 result = node.pop("foo")
1340 self.assertEqual(result, "bar")
1341- self.assertEqual(node, {"alpha": "beta",})
1342+ self.assertEqual(node, {"alpha": "beta"})
1343
1344 node["delta"] = "gamma"
1345 self.assertEqual(set(node.keys()), set(("alpha", "delta")))
1346@@ -424,7 +443,6 @@
1347 self.assertIn(("alpha", "beta"), result)
1348 self.assertIn(("delta", "gamma"), result)
1349
1350-
1351 @inlineCallbacks
1352 def test_del_empties_state(self):
1353 d = YAMLState(self.client, self.path)
1354@@ -500,12 +518,8 @@
1355 yield d1.read()
1356 self.assertEquals(d1, d2)
1357
1358-
1359 @inlineCallbacks
1360 def test_read_requires_node(self):
1361 """Validate that read raises when required=True."""
1362 d1 = YAMLState(self.client, self.path)
1363 yield self.assertFailure(d1.read(True), StateNotFound)
1364-
1365-
1366-
1367
1368=== modified file 'ensemble/state/utils.py'
1369--- ensemble/state/utils.py 2011-04-28 17:52:55 +0000
1370+++ ensemble/state/utils.py 2011-05-02 21:57:29 +0000
1371@@ -9,8 +9,10 @@
1372 import yaml
1373 import zookeeper
1374
1375+from ensemble.state.errors import StateChanged
1376 from ensemble.state.errors import StateNotFound
1377
1378+
1379 class PortWatcher(object):
1380
1381 def __init__(self, host, port, timeout, listen=False):
1382@@ -91,6 +93,22 @@
1383 return port
1384
1385
1386+def dict_merge(d1, d2):
1387+ """Return a union of dicts if they have no conflicting values.
1388+
1389+ Otherwise raise a StateChanged error.
1390+ """
1391+ must_match = set(d1).intersection(d2)
1392+ for k in must_match:
1393+ if d1[k] != d2[k]:
1394+ raise StateChanged()
1395+
1396+ d = {}
1397+ d.update(d1)
1398+ d.update(d2)
1399+ return d
1400+
1401+
1402 class YAMLState(DictMixin):
1403 """Provides a dict like interface around a Zookeeper node
1404 containing serialised YAML data. The dict provided represents the
1405@@ -144,7 +162,6 @@
1406 if required:
1407 raise StateNotFound(self._path)
1408
1409-
1410 def _check(self):
1411 """Verify that sync was called for operations which expect it."""
1412 if self._pristine_cache is None:
1413@@ -164,7 +181,6 @@
1414 self._check()
1415 self._cache[key] = value
1416
1417-
1418 def __delitem__(self, key):
1419 self._check()
1420 del self._cache[key]
1421
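`set_relation_resolved` combines `dict_merge` with `retry_change`. Its update function merges the new relation map into the stored one and raises `StateChanged` on a conflicting value. The caller then turns that into `ServiceUnitRelationResolvedAlreadyEnabled`. The following is a simplified, synchronous sketch of that optimistic read-modify-write pattern. `VersionedStore`, `retry_change` and `relation_resolved_change` here are hypothetical stand-ins for the ZooKeeper node and the `retry_change` helper used in the diff. The real helper takes a client, a path and a `(content, stat)` change function. JSON replaces YAML to keep the sketch standard-library only.

```python
import json


class StateChanged(Exception):
    """Stand-in for ensemble.state.errors.StateChanged."""


def dict_merge(d1, d2):
    """Union of two dicts; raise StateChanged if a shared key differs."""
    for key in set(d1).intersection(d2):
        if d1[key] != d2[key]:
            raise StateChanged()
    merged = dict(d1)
    merged.update(d2)
    return merged


class VersionedStore:
    """Hypothetical node: writes succeed only against the version last read."""

    def __init__(self):
        self.content = None
        self.version = -1  # -1 means the node does not exist yet

    def get(self):
        return self.content, self.version

    def set(self, content, expected_version):
        if expected_version != self.version:
            return False  # another writer got there first
        self.content = content
        self.version += 1
        return True


def retry_change(store, change, max_attempts=10):
    """Re-read and re-apply `change` until a write lands on a fresh version."""
    for _ in range(max_attempts):
        content, version = store.get()
        new_content = change(content)  # may raise StateChanged
        if store.set(new_content, version):
            return new_content
    raise RuntimeError("too many concurrent modifications")


def relation_resolved_change(relation_map):
    """Build a change function in the style of update_relation_resolved."""
    def change(content):
        if not content:
            return json.dumps(relation_map, sort_keys=True)
        current = json.loads(content)
        return json.dumps(dict_merge(current, relation_map), sort_keys=True)
    return change


store = VersionedStore()
retry_change(store, relation_resolved_change({"0": 1}))
retry_change(store, relation_resolved_change({"21": 1}))
# store.content is now '{"0": 1, "21": 1}'
```

Because the change function is re-run on every attempt, a concurrent write is merged rather than overwritten. A genuinely conflicting value surfaces as `StateChanged` instead of silently replacing the stored setting.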
1422=== modified file 'ensemble/unit/lifecycle.py'
1423--- ensemble/unit/lifecycle.py 2011-04-15 18:53:29 +0000
1424+++ ensemble/unit/lifecycle.py 2011-05-02 21:57:29 +0000
1425@@ -2,14 +2,14 @@
1426 import logging
1427
1428 from twisted.internet.defer import (
1429- inlineCallbacks, DeferredLock)
1430+ inlineCallbacks, DeferredLock, returnValue)
1431
1432 from ensemble.hooks.invoker import Invoker
1433 from ensemble.hooks.scheduler import HookScheduler
1434 from ensemble.state.hook import RelationChange
1435 from ensemble.state.errors import StopWatcher, UnitRelationStateNotFound
1436
1437-from ensemble.unit.workflow import RelationWorkflowState, RelationWorkflow
1438+from ensemble.unit.workflow import RelationWorkflowState
1439
1440
1441 HOOK_SOCKET_FILE = ".ensemble.hookcli.sock"
1442@@ -32,7 +32,8 @@
1443 self._unit_path = unit_path
1444 self._relations = {}
1445 self._running = False
1446- self._watching = False
1447+ self._watching_relation_memberships = False
1448+ self._watching_relation_resolved = False
1449 self._run_lock = DeferredLock()
1450 self._log = logging.getLogger("unit.lifecycle")
1451
1452@@ -45,21 +46,23 @@
1453 return self._relations[relation_id]
1454
1455 @inlineCallbacks
1456- def install(self):
1457+ def install(self, fire_hooks=True):
1458 """Invoke the unit's install hook.
1459 """
1460- yield self._execute_hook("install")
1461+ if fire_hooks:
1462+ yield self._execute_hook("install")
1463
1464 @inlineCallbacks
1465- def upgrade_formula(self):
1466+ def upgrade_formula(self, fire_hooks=True):
1467 """Invoke the unit's upgrade-formula hook.
1468 """
1469- yield self._execute_hook("upgrade-formula", now=True)
1470+ if fire_hooks:
1471+ yield self._execute_hook("upgrade-formula", now=True)
1472 # Restart hook queued hook execution.
1473 self._executor.start()
1474
1475 @inlineCallbacks
1476- def start(self):
1477+ def start(self, fire_hooks=True):
1478 """Invoke the start hook, and setup relation watching.
1479 """
1480 self._log.debug("pre-start acquire, running:%s", self._running)
1481@@ -70,22 +73,29 @@
1482 assert not self._running, "Already started"
1483
1484 # Execute the start hook
1485- yield self._execute_hook("start")
1486+ if fire_hooks:
1487+ yield self._execute_hook("start")
1488
1489 # If we have any existing relations in memory, start them.
1490 if self._relations:
1491 self._log.debug("starting relation lifecycles")
1492
1493 for workflow in self._relations.values():
1494- # should not transition an
1495 yield workflow.transition_state("up")
1496
1497 # Establish a watch on the existing relations.
1498- if not self._watching:
1499+ if not self._watching_relation_memberships:
1500 self._log.debug("starting service relation watch")
1501 yield self._service.watch_relation_states(
1502 self._on_service_relation_changes)
1503- self._watching = True
1504+ self._watching_relation_memberships = True
1505+
1506+ # Establish a watch for resolved relations
1507+ if not self._watching_relation_resolved:
1508+ self._log.debug("starting unit relation resolved watch")
1509+ yield self._unit.watch_relation_resolved(
1510+ self._on_relation_resolved_changes)
1511+ self._watching_relation_resolved = True
1512
1513 # Set current status
1514 self._running = True
1515@@ -94,7 +104,7 @@
1516 self._log.debug("started unit lifecycle")
1517
1518 @inlineCallbacks
1519- def stop(self):
1520+ def stop(self, fire_hooks=True):
1521 """Stop the unit, executes the stop hook, and stops relation watching.
1522 """
1523 self._log.debug("pre-stop acquire, running:%s", self._running)
1524@@ -106,10 +116,12 @@
1525 # Stop relation lifecycles
1526 if self._relations:
1527 self._log.debug("stopping relation lifecycles")
1528+
1529 for workflow in self._relations.values():
1530 yield workflow.transition_state("down")
1531
1532- yield self._execute_hook("stop")
1533+ if fire_hooks:
1534+ yield self._execute_hook("stop")
1535
1536 # Set current status
1537 self._running = False
1538@@ -118,6 +130,49 @@
1539 self._log.debug("stopped unit lifecycle")
1540
1541 @inlineCallbacks
1542+ def _on_relation_resolved_changes(self, event):
1543+ """Callback for unit relation resolved watching.
1544+
1545+ The callback is invoked whenever the relation resolved
1546+ settings change.
1547+ """
1548+ self._log.debug("relation resolved changed")
1549+ # Acquire the run lock, and process the changes.
1550+ yield self._run_lock.acquire()
1551+
1552+ try:
1553+ # If the unit lifecycle isn't running we shouldn't process
1554+ # any relation resolutions.
1555+ if not self._running:
1556+ self._log.debug("stop watch relation resolved changes")
1557+ self._watching_relation_resolved = False
1558+ raise StopWatcher()
1559+
1560+ self._log.info("processing relation resolved changed")
1561+ if self._client.connected:
1562+ yield self._process_relation_resolved_changes()
1563+ finally:
1564+ yield self._run_lock.release()
1565+
1566+ @inlineCallbacks
1567+ def _process_relation_resolved_changes(self):
1568+ """Invoke retry transitions on relations if they are not running.
1569+ """
1570+ relation_resolved = yield self._unit.get_relation_resolved()
1571+ if relation_resolved is None:
1572+ returnValue(None)
1573+ else:
1574+ yield self._unit.clear_relation_resolved()
1575+
1576+ keys = set(relation_resolved).intersection(self._relations)
1577+ for rel_id in keys:
1578+ relation_workflow = self._relations[rel_id]
1579+ relation_state = yield relation_workflow.get_state()
1580+ if relation_state == "up":
1581+ continue
1582+ yield relation_workflow.transition_state("up")
1583+
1584+ @inlineCallbacks
1585 def _on_service_relation_changes(self, old_relations, new_relations):
1586 """Callback for service relation watching.
1587
1588@@ -137,7 +192,7 @@
1589 # If the lifecycle is not running, then stop the watcher
1590 if not self._running:
1591 self._log.debug("stop service-rel watcher, discarding changes")
1592- self._watching = False
1593+ self._watching_relation_memberships = False
1594 raise StopWatcher()
1595
1596 self._log.debug("processing relations changed")
1597@@ -147,9 +202,9 @@
1598
1599 @inlineCallbacks
1600 def _process_service_changes(self, old_relations, new_relations):
1601- """Add and remove unit lifecycles per the service relations changes.
1602+ """Add and remove unit lifecycles per the service relations changes.
1603 """
1604- # Determine relation delta of global zk state with our memory state.
1605+ # Determine relation delta of global zk state with our memory state.
1606 new_relations = dict([(service_relation.internal_relation_id,
1607 service_relation) for
1608 service_relation in new_relations])
1609@@ -336,8 +391,11 @@
1610 self._error_handler = handler
1611
1612 @inlineCallbacks
1613- def start(self):
1614+ def start(self, watches=True):
1615 """Start watching related units and executing change hooks.
1616+
1617+ @param watches: boolean parameter denoting if relation watches
1618+ should be started.
1619 """
1620 yield self._run_lock.acquire()
1621 try:
1622@@ -348,19 +406,24 @@
1623 self._watcher = yield self._unit_relation.watch_related_units(
1624 self._scheduler.notify_change)
1625 # And start the watcher.
1626- yield self._watcher.start()
1627+ if watches:
1628+ yield self._watcher.start()
1629 finally:
1630 self._run_lock.release()
1631 self._log.debug(
1632 "started relation:%s lifecycle", self._relation_name)
1633
1634 @inlineCallbacks
1635- def stop(self):
1636+ def stop(self, watches=True):
1637 """Stop watching changes and stop executing relation change hooks.
1638+
1639+ @param watches: boolean parameter denoting if relation watches
1640+ should be stopped.
1641 """
1642 yield self._run_lock.acquire()
1643 try:
1644- self._watcher.stop()
1645+ if watches and self._watcher:
1646+ self._watcher.stop()
1647 self._scheduler.stop()
1648 finally:
1649 yield self._run_lock.release()
1650
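`_process_relation_resolved_changes` follows a short sequence. It reads the relation resolved map and clears it. It then transitions back to "up" only those known relations that are not already up. Entries for relations the lifecycle does not track are dropped. The following is a synchronous sketch of that logic. It assumes hypothetical `FakeUnit` and `FakeRelationWorkflow` stand-ins for the unit state and `RelationWorkflowState`, and `process_relation_resolved` is an illustrative name. In the real code the methods return Deferreds and run under the lifecycle's run lock. The sketch also sorts relation ids for deterministic output, whereas the diff iterates a set.

```python
class FakeRelationWorkflow:
    """Hypothetical stand-in for RelationWorkflowState."""

    def __init__(self, state):
        self.state = state
        self.transitions = []

    def get_state(self):
        return self.state

    def transition_state(self, state):
        self.transitions.append(state)
        self.state = state


class FakeUnit:
    """Hypothetical stand-in for the service unit state's resolved API."""

    def __init__(self, relation_resolved=None):
        self._relation_resolved = relation_resolved

    def get_relation_resolved(self):
        return self._relation_resolved

    def clear_relation_resolved(self):
        self._relation_resolved = None


def process_relation_resolved(unit, relations):
    """Bring resolved, non-running relations back up; return their ids."""
    resolved = unit.get_relation_resolved()
    if resolved is None:
        return []
    # Clear the setting before acting on it, as the diff does.
    unit.clear_relation_resolved()

    recovered = []
    # Only consider ids for relations this unit currently tracks.
    for rel_id in sorted(set(resolved).intersection(relations)):
        workflow = relations[rel_id]
        if workflow.get_state() == "up":
            continue  # already running: nothing to do
        workflow.transition_state("up")
        recovered.append(rel_id)
    return recovered
```

This mirrors what `test_resolved_relation_watch_relation_up` checks. A resolved request for a relation that is already up still clears the setting but performs no transition.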
1651=== modified file 'ensemble/unit/tests/test_lifecycle.py'
1652--- ensemble/unit/tests/test_lifecycle.py 2011-04-28 17:55:11 +0000
1653+++ ensemble/unit/tests/test_lifecycle.py 2011-05-02 21:57:29 +0000
1654@@ -12,6 +12,8 @@
1655 from ensemble.unit.lifecycle import (
1656 UnitLifecycle, UnitRelationLifecycle, RelationInvoker)
1657
1658+from ensemble.unit.workflow import RelationWorkflowState
1659+
1660 from ensemble.hooks.invoker import Invoker
1661 from ensemble.hooks.executor import HookExecutor
1662
1663@@ -19,9 +21,11 @@
1664
1665 from ensemble.state.endpoint import RelationEndpoint
1666 from ensemble.state.relation import ClientServerUnitWatcher
1667+from ensemble.state.service import NO_HOOKS
1668 from ensemble.state.tests.test_relation import RelationTestBase
1669 from ensemble.state.hook import RelationChange
1670
1671+
1672 from ensemble.lib.testing import TestCase
1673 from ensemble.lib.mocker import MATCH
1674
1675@@ -145,6 +149,182 @@
1676 return output
1677
1678
1679+class LifecycleResolvedTest(LifecycleTestBase):
1680+
1681+ @inlineCallbacks
1682+ def setUp(self):
1683+ yield super(LifecycleResolvedTest, self).setUp()
1684+ yield self.setup_default_test_relation()
1685+ self.lifecycle = UnitLifecycle(
1686+ self.client, self.states["unit"], self.states["service"],
1687+ self.unit_directory, self.executor)
1688+
1689+ def get_unit_relation_workflow(self, states):
1690+ state_dir = os.path.join(self.ensemble_directory, "state")
1691+ lifecycle = UnitRelationLifecycle(
1692+ self.client,
1693+ states["unit_relation"],
1694+ states["service_relation"].relation_name,
1695+ self.unit_directory,
1696+ self.executor)
1697+
1698+ workflow = RelationWorkflowState(
1699+ self.client,
1700+ states["unit_relation"],
1701+ lifecycle,
1702+ state_dir)
1703+
1704+ return (workflow, lifecycle)
1705+
1706+ @inlineCallbacks
1707+ def test_resolved_relation_watch_unit_lifecycle_not_running(self):
1708+ """If the unit is not running then no relation resolving is performed.
1709+ However the resolution value remains the same.
1710+ """
1711+ # Start the unit.
1712+ yield self.lifecycle.start()
1713+
1714+ # Wait for the relation to be started.... TODO: async background work
1715+ yield self.sleep(0.1)
1716+
1717+ # Simulate relation down on an individual unit relation
1718+ workflow = self.lifecycle._relations.get(
1719+ self.states["unit_relation"].internal_relation_id)
1720+ self.assertEqual("up", (yield workflow.get_state()))
1721+
1722+ yield workflow.transition_state("down")
1723+ resolved = self.wait_on_state(workflow, "up")
1724+
1725+ # Stop the unit lifecycle
1726+ yield self.lifecycle.stop()
1727+
1728+ # Set the relation to resolved
1729+ yield self.states["unit"].set_relation_resolved(
1730+ {self.states["unit_relation"].internal_relation_id: NO_HOOKS})
1731+
1732+ # Give a moment for the watch to fire erroneously
1733+ yield self.sleep(0.2)
1734+
1735+ # Ensure we didn't attempt a transition.
1736+ self.assertFalse(resolved.called)
1737+ self.assertEqual(
1738+ {self.states["unit_relation"].internal_relation_id: NO_HOOKS},
1739+ (yield self.states["unit"].get_relation_resolved()))
1740+
1741+ # If the unit is restarted, the unit relation workflow is
1742+ # currently transitioned back to running automatically as part
1743+ # of the normal state transition. Ideally relation hooks would
1744+ # have a separate error state, rather than reusing "down" with
1745+ # state variables.
1746+
1747+ @inlineCallbacks
1748+ def test_resolved_relation_watch_relation_up(self):
1749+ """If a relation marked as to be resolved is already running,
1750+ then no work is performed.
1751+ """
1752+ # Start the unit.
1753+ yield self.lifecycle.start()
1754+
1755+ # Wait for the relation to be started.... TODO: async background work
1756+ yield self.sleep(0.1)
1757+
1758+ # get a hold of the unit relation and verify state
1759+ workflow = self.lifecycle._relations.get(
1760+ self.states["unit_relation"].internal_relation_id)
1761+ self.assertEqual("up", (yield workflow.get_state()))
1762+
1763+ # Set the relation to resolved
1764+ yield self.states["unit"].set_relation_resolved(
1765+ {self.states["unit_relation"].internal_relation_id: NO_HOOKS})
1766+
1767+ # Give a moment for the async background work.
1768+ yield self.sleep(0.1)
1769+
1770+ # Ensure we're still up and the relation resolved setting has been
1771+ # cleared.
1772+ self.assertEqual(
1773+ None, (yield self.states["unit"].get_relation_resolved()))
1774+ self.assertEqual("up", (yield workflow.get_state()))
1775+
1776+ @inlineCallbacks
1777+ def test_resolved_relation_watch_from_error(self):
1778+ """Unit lifecycles will process a unit relation resolved
1779+ setting, and transition a relation in an error state back
1780+ to a running state.
1781+ """
1782+ log_output = self.capture_logging(
1783+ "unit.lifecycle", level=logging.DEBUG)
1784+
1785+ # Start the unit.
1786+ yield self.lifecycle.start()
1787+
1788+ # Wait for the relation to be started... TODO: async background work
1789+ yield self.sleep(0.1)
1790+
1791+ # Simulate an error condition
1792+ workflow = self.lifecycle._relations.get(
1793+ self.states["unit_relation"].internal_relation_id)
1794+ self.assertEqual("up", (yield workflow.get_state()))
1795+ yield workflow.fire_transition("error")
1796+
1797+ resolved = self.wait_on_state(workflow, "up")
1798+
1799+ # Set the relation to resolved
1800+ yield self.states["unit"].set_relation_resolved(
1801+ {self.states["unit_relation"].internal_relation_id: NO_HOOKS})
1802+
1803+ # Wait for the relation to come back up
1804+ value = yield self.states["unit"].get_relation_resolved()
1805+
1806+ yield resolved
1807+
1808+ # Verify state
1809+ value = yield workflow.get_state()
1810+ self.assertEqual(value, "up")
1811+
1812+ self.assertIn(
1813+ "processing relation resolved changed", log_output.getvalue())
1814+
1815+ @inlineCallbacks
1816+ def test_resolved_relation_watch(self):
1817+ """Unit lifecycles will process a unit relation resolved
1818+ setting, and transition a down relation back to a running
1819+ state.
1820+ """
1821+ log_output = self.capture_logging(
1822+ "unit.lifecycle", level=logging.DEBUG)
1823+
1824+ # Start the unit.
1825+ yield self.lifecycle.start()
1826+
1827+ # Wait for the relation to be started... TODO: async background work
1828+ yield self.sleep(0.1)
1829+
1830+ # Simulate the relation going down
1831+ workflow = self.lifecycle._relations.get(
1832+ self.states["unit_relation"].internal_relation_id)
1833+ self.assertEqual("up", (yield workflow.get_state()))
1834+ yield workflow.transition_state("down")
1835+
1836+ resolved = self.wait_on_state(workflow, "up")
1837+
1838+ # Set the relation to resolved
1839+ yield self.states["unit"].set_relation_resolved(
1840+ {self.states["unit_relation"].internal_relation_id: NO_HOOKS})
1841+
1842+ # Wait for the relation to come back up
1843+ value = yield self.states["unit"].get_relation_resolved()
1844+
1845+ yield resolved
1846+
1847+ # Verify state
1848+ value = yield workflow.get_state()
1849+ self.assertEqual(value, "up")
1850+
1851+ self.assertIn(
1852+ "processing relation resolved changed", log_output.getvalue())
1853+
1854+
1855 class UnitLifecycleTest(LifecycleTestBase):
1856
1857 @inlineCallbacks
1858@@ -187,6 +367,45 @@
1859 # verify the sockets are cleaned up.
1860 self.assertEqual(os.listdir(self.unit_directory), ["formula"])
1861
1862+ @inlineCallbacks
1863+ def test_start_sans_hook(self):
1864+ """The lifecycle start can be invoked without firing hooks."""
1865+ self.write_hook("start", "#!/bin/sh\n exit 1")
1866+ start_executed = self.wait_on_hook("start")
1867+ yield self.lifecycle.start(fire_hooks=False)
1868+ # Wait for unit relation background processing...
1869+ yield self.sleep(0.1)
1870+ self.assertFalse(start_executed.called)
1871+
1872+ @inlineCallbacks
1873+ def test_stop_sans_hook(self):
1874+ """The lifecycle stop can be invoked without firing hooks."""
1875+ self.write_hook("stop", "#!/bin/sh\n exit 1")
1876+ stop_executed = self.wait_on_hook("stop")
1877+ yield self.lifecycle.start()
1878+ yield self.lifecycle.stop(fire_hooks=False)
1879+ # Wait for unit relation background processing...
1880+ yield self.sleep(0.1)
1881+ self.assertFalse(stop_executed.called)
1882+
1883+ @inlineCallbacks
1884+ def test_install_sans_hook(self):
1885+ """The lifecycle install can be invoked without firing hooks."""
1886+ self.write_hook("install", "#!/bin/sh\n exit 1")
1887+ install_executed = self.wait_on_hook("install")
1888+ yield self.lifecycle.install(fire_hooks=False)
1889+ self.assertFalse(install_executed.called)
1890+
1891+ @inlineCallbacks
1892+ def test_upgrade_sans_hook(self):
1893+ """The lifecycle upgrade can be invoked without firing hooks."""
1894+ self.executor.stop()
1895+ self.write_hook("upgrade-formula", "#!/bin/sh\n exit 1")
1896+ upgrade_executed = self.wait_on_hook("upgrade-formula")
1897+ yield self.lifecycle.upgrade_formula(fire_hooks=False)
1898+ self.assertFalse(upgrade_executed.called)
1899+ self.assertTrue(self.executor.running)
1900+
1901 def test_hook_error(self):
1902 """Verify hook execution error, raises an exception."""
1903 self.write_hook("install", '#!/bin/sh\n exit 1')
1904@@ -196,14 +415,12 @@
1905 def test_hook_not_executable(self):
1906 """A hook not executable, raises an exception."""
1907 self.write_hook("install", '#!/bin/sh\n exit 0', no_exec=True)
1908- # It would be preferrable if this was also a formulainvocation error.
1909 return self.failUnlessFailure(
1910 self.lifecycle.install(), FormulaError)
1911
1912 def test_hook_not_formatted_correctly(self):
1913 """Hook execution error, raises an exception."""
1914 self.write_hook("install", '!/bin/sh\n exit 0')
1915- # It would be preferrable if this was also a formulainvocation error.
1916 return self.failUnlessFailure(
1917 self.lifecycle.install(), FormulaInvocationError)
1918
1919@@ -510,7 +727,7 @@
1920 @inlineCallbacks
1921 def test_initial_start_lifecycle_no_related_no_exec(self):
1922 """
1923- If there are no related units on startup, the relation changed hook
1924+ If there are no related units on startup, the relation joined hook
1925 is not invoked.
1926 """
1927 file_path = self.makeFile()
1928@@ -523,6 +740,26 @@
1929 self.assertFalse(os.path.exists(file_path))
1930
1931 @inlineCallbacks
1932+ def test_stop_can_continue_watching(self):
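1933+ """Stopping the lifecycle with watches=False pauses hook execution
1934+ while relation watches keep queuing changes, which run on restart."""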
1935+ file_path = self.makeFile()
1936+ self.write_hook(
1937+ "%s-relation-changed" % self.relation_name,
1938+ ("#!/bin/bash\n" "echo executed >> %s\n" % file_path))
1939+ rel_states = yield self.add_opposite_service_unit(self.states)
1940+ yield self.lifecycle.start()
1941+ yield self.wait_on_hook(
1942+ sequence=["app-relation-joined", "app-relation-changed"])
1943+ changed_executed = self.wait_on_hook("app-relation-changed")
1944+ yield self.lifecycle.stop(watches=False)
1945+ rel_states["unit_relation"].set_data(yaml.dump(dict(hello="world")))
1946+ yield self.sleep(0.1)
1947+ self.assertFalse(changed_executed.called)
1948+ yield self.lifecycle.start(watches=False)
1949+ yield changed_executed
1950+
1951+ @inlineCallbacks
1952 def test_initial_start_lifecycle_with_related(self):
1953 """
1954 If there are related units on startup, the relation changed hook
1955
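`test_stop_can_continue_watching` above relies on stopping hook execution while the relation watches keep queuing changes, then resuming execution so the queued change runs. A minimal synchronous sketch of that behavior follows; `HookScheduler` is a hypothetical class, not ensemble's lifecycle API, and Twisted deferreds are deliberately left out.

```python
from collections import deque


class HookScheduler(object):
    """Hypothetical model of a unit relation lifecycle's hook scheduling.

    Watches enqueue relation changes; hooks run only while execution is on.
    """

    def __init__(self, run_hook):
        self._run_hook = run_hook
        self._pending = deque()
        self.watching = False
        self.executing = False

    def start(self, watches=True):
        # watches=False resumes hook execution without touching the watches.
        if watches:
            self.watching = True
        self.executing = True
        self._drain()

    def stop(self, watches=True):
        # watches=False halts hook execution but keeps queuing changes.
        if watches:
            self.watching = False
        self.executing = False

    def notify(self, hook_name):
        # Simulated watch callback; changes are ignored once watches are off.
        if not self.watching:
            return
        self._pending.append(hook_name)
        self._drain()

    def _drain(self):
        # Run queued hooks in order, but only while execution is enabled.
        while self.executing and self._pending:
            self._run_hook(self._pending.popleft())


executed = []
scheduler = HookScheduler(executed.append)
scheduler.start()
scheduler.stop(watches=False)
scheduler.notify("app-relation-changed")
assert executed == []  # change queued, hook not run yet
scheduler.start(watches=False)
assert executed == ["app-relation-changed"]
```

Keeping "watching" and "executing" as separate flags is what lets the error path halt hooks without losing relation changes that arrive in the meantime.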
1956=== modified file 'ensemble/unit/tests/test_workflow.py'
1957--- ensemble/unit/tests/test_workflow.py 2011-04-28 17:55:11 +0000
1958+++ ensemble/unit/tests/test_workflow.py 2011-05-02 21:57:29 +0000
1959@@ -10,7 +10,7 @@
1960
1961 from ensemble.unit.workflow import (
1962 UnitWorkflowState, RelationWorkflowState, WorkflowStateClient,
1963- is_unit_running)
1964+ is_unit_running, is_relation_running)
1965
1966
1967 class WorkflowTestBase(LifecycleTestBase):
1968@@ -321,6 +321,26 @@
1969 self.state_directory)
1970
1971 @inlineCallbacks
1972+ def test_is_relation_running(self):
1973+ """The unit relation's workflow state can be categorized as a
1974+ boolean.
1975+ """
1976+ running, state = yield is_relation_running(
1977+ self.client, self.states["unit_relation"])
1978+ self.assertIdentical(running, False)
1979+ self.assertIdentical(state, None)
1980+ yield self.workflow.fire_transition("start")
1981+ running, state = yield is_relation_running(
1982+ self.client, self.states["unit_relation"])
1983+ self.assertIdentical(running, True)
1984+ self.assertEqual(state, "up")
1985+ yield self.workflow.fire_transition("stop")
1986+ running, state = yield is_relation_running(
1987+ self.client, self.states["unit_relation"])
1988+ self.assertIdentical(running, False)
1989+ self.assertEqual(state, "down")
1990+
1991+ @inlineCallbacks
1992 def test_up_down_cycle(self):
1993 """The workflow can be transitioned from up to down, and back.
1994 """
1995@@ -381,7 +401,7 @@
1996 # Add a new unit, and wait for the broken hook to result in
1997 # the transition to the error state.
1998 yield self.add_opposite_service_unit(self.states)
1999- yield self.wait_on_state(self.workflow, "down")
2000+ yield self.wait_on_state(self.workflow, "error")
2001
2002 f_state, history, zk_state = yield self.read_persistent_state(
2003 history_id=self.workflow.zk_state_id)
2004@@ -392,7 +412,7 @@
2005 "formula", "hooks", "app-relation-changed"))
2006
2007 self.assertEqual(f_state,
2008- {"state": "down",
2009+ {"state": "error",
2010 "state_variables": {
2011 "change_type": "joined",
2012 "error_message": error}})
2013
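The updated tests expect a relation hook failure to land in the new `error` state rather than `down`, and `is_relation_running` to treat only `up` as running. A hypothetical in-memory model of the `RelationWorkflow` transitions defined in `ensemble/unit/workflow.py` makes those paths easy to trace. `RelationStateModel` is illustrative only and skips ensemble's persistence, state variables, and transition actions.

```python
# (transition_id, label, from_state, to_state), mirroring RelationWorkflow
RELATION_TRANSITIONS = (
    ("start", "Start", None, "up"),
    ("stop", "Stop", "up", "down"),
    ("restart", "Restart", "down", "up"),
    ("error", "Relation hook error", "up", "error"),
    ("reset", "Recover from hook error", "error", "up"),
    ("depart", "Relation broken", "up", "departed"),
    ("down_depart", "Relation broken", "down", "departed"),
)


class RelationStateModel(object):
    """Hypothetical model of the relation workflow (not ensemble's Workflow)."""

    def __init__(self, transitions=RELATION_TRANSITIONS):
        self._transitions = dict((t[0], t) for t in transitions)
        self.state = None

    def fire_transition(self, transition_id):
        _, _, from_state, to_state = self._transitions[transition_id]
        if from_state != self.state:
            raise ValueError(
                "%s not valid from state %r" % (transition_id, self.state))
        self.state = to_state
        return to_state

    def is_running(self):
        # Same categorization as is_relation_running: only "up" is running.
        if not self.state:
            return (False, None)
        return (self.state == "up", self.state)


relation = RelationStateModel()
relation.fire_transition("start")
relation.fire_transition("error")
print(relation.is_running())  # (False, 'error')
```

Because `restart` only leaves `down` and `reset` only leaves `error`, the two recovery paths cannot be confused: an errored relation must be reset, not restarted.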
2014=== modified file 'ensemble/unit/workflow.py'
2015--- ensemble/unit/workflow.py 2011-04-19 18:55:06 +0000
2016+++ ensemble/unit/workflow.py 2011-05-02 21:57:29 +0000
2017@@ -17,15 +17,19 @@
2018 Transition("install", "Install", None, "installed",
2019 error_transition_id="error_install"),
2020 Transition("error_install", "Install Error", None, "install_error"),
2021- Transition("retry_install", "Retry Install", "install_error", "installed"),
2022+ Transition("retry_install", "Retry Install", "install_error", "installed",
2023+ alias="retry"),
2024 Transition("start", "Start", "installed", "started",
2025 error_transition_id="error_start"),
2026 Transition("error_start", "Start Error", "installed", "start_error"),
2027- Transition("retry_start", "Retry Start", "start_error", "started"),
2028+ Transition("retry_start", "Retry Start", "start_error", "started",
2029+ alias="retry"),
2030 Transition("stop", "Stop", "started", "stopped",
2031 error_transition_id="error_stop"),
2032 Transition("error_stop", "Stop Error", "started", "stop_error"),
2033- Transition("retry_stop", "Retry Stop", "stop_error", "stopped"),
2034+ Transition("retry_stop", "Retry Stop", "stop_error", "stopped",
2035+ alias="retry"),
2036+ Transition("restart", "Restart", "stopped", "started", alias="retry"),
2037
2038 # Upgrade Transitions (stay in state, with success transition)
2039 Transition(
2040@@ -36,27 +40,47 @@
2041 "started", "formula_upgrade_error"),
2042 Transition(
2043 "retry_upgrade_formula", "Upgrade from stop error",
2044- "formula_upgrade_error", "started")
2045+ "formula_upgrade_error", "started", alias="retry")
2046 )
2047
2048
2049-# There's been some discussion, if we should have per change type error states
2050-# here, corresponding to the different changes that the relation-changed hook
2051-# is invoked for. The important aspects to capture are both observability of
2052-# error type locally and globally (zk), and per error type and instance
2053-# recovery of the same. To provide for this functionality without additional
2054-# states, the error information (change type, and error message) are captured
2055-# in state variables which are locally and globally observable. Future
2056-# extension of the restart transition action, will allow for customized
2057-# recovery based on the change type state variable. Effectively this
2058-# differs from the unit definition, in that it collapses three possible error
2059-# states, into a behavior off switch. A separate state will be needed to
2060-# denote departing.
2061+# Unit relation error states
2062+#
2063+ # There has been some discussion of whether we should have per
2064+ # change type error states here, corresponding to the different
2065+ # changes the relation-changed hook is invoked for. The important
2066+ # aspects to capture are observability of the error type, both
2067+ # locally and globally (zk), and per error type and per instance
2068+ # recovery. To provide this without additional states, the error
2069+ # information (change type and error message) is captured in state
2070+ # variables, which are observable locally and globally. A future
2071+ # extension of the restart transition action will allow customized
2072+ # recovery based on the change type state variable. This differs
2073+ # from the unit workflow definition in that it collapses three
2074+ # possible error states into a single behavior off switch. A
2075+ # separate state will be needed to denote departing.
2076+
2077+
2078+# Process recovery using on disk workflow state
2079+#
2080+ # A related issue: process recovery from the on disk state is
2081+ # complicated by keeping it consistent with the in-memory state,
2082+ # which is no longer directly recoverable without state specific
2083+ # semantics for loading from disk. For example, a restarted unit
2084+ # agent with a relation in the error state would need special
2085+ # handling when loading from disk, to ensure that the in-memory
2086+ # process state (watching and scheduling, but not executing) matches
2087+ # what the recovery transition actions expect (they only restart
2088+ # hook execution, and assume the watch is still running). This
2089+ # functionality was added so that, while halted by a hook error,
2090+ # the relation continues to schedule pending hooks.
2091
2092 RelationWorkflow = Workflow(
2093 Transition("start", "Start", None, "up"),
2094 Transition("stop", "Stop", "up", "down"),
2095- Transition("restart", "Restart", "down", "up"),
2096+ Transition("restart", "Restart", "down", "up", alias="retry"),
2097+ Transition("error", "Relation hook error", "up", "error"),
2098+ Transition("reset", "Recover from hook error", "error", "up"),
2099 Transition("depart", "Relation broken", "up", "departed"),
2100 Transition("down_depart", "Relation broken", "down", "departed"),
2101 )
2102@@ -67,12 +91,26 @@
2103 """Is the service unit in a running state.
2104
2105 Returns a boolean which is true if the unit is running, and
2106- the unit state in two element tuple.
2107+ the unit workflow state in a two element tuple.
2108 """
2109 workflow_state = yield WorkflowStateClient(client, unit).get_state()
2110 if not workflow_state:
2111 returnValue((False, None))
2112- running = workflow_state in ("started",)
2113+ running = workflow_state == "started"
2114+ returnValue((running, workflow_state))
2115+
2116+
2117+@inlineCallbacks
2118+def is_relation_running(client, relation):
2119+ """Is the unit relation in a running state.
2120+
2121+ Returns a boolean which is true if the relation is running, and
2122+ the unit relation workflow state in a two element tuple.
2123+ """
2124+ workflow_state = yield WorkflowStateClient(client, relation).get_state()
2125+ if not workflow_state:
2126+ returnValue((False, None))
2127+ running = workflow_state == "up"
2128 returnValue((running, workflow_state))
2129
2130
2131@@ -198,7 +236,6 @@
2132 row per entry with CSV escaping.
2133 """
2134 state_serialized = yaml.safe_dump(state_dict)
2135-
2136 # State File
2137 with open(self.state_file_path, "w") as handle:
2138 handle.write(state_serialized)
2139@@ -218,6 +255,7 @@
2140 return {"state": None}
2141 with open(self.state_file_path, "r") as handle:
2142 content = handle.read()
2143+
2144 return yaml.load(content)
2145
2146
2147@@ -257,9 +295,9 @@
2148 self._lifecycle = lifecycle
2149
2150 @inlineCallbacks
2151- def _invoke_lifecycle(self, method):
2152+ def _invoke_lifecycle(self, method, *args, **kw):
2153 try:
2154- result = yield method()
2155+ result = yield method(*args, **kw)
2156 except (FileNotFound, FormulaError, FormulaInvocationError), e:
2157 raise TransitionError(e)
2158 returnValue(result)
2159@@ -274,21 +312,25 @@
2160 def do_stop(self):
2161 return self._invoke_lifecycle(self._lifecycle.stop)
2162
2163- def do_retry_start(self):
2164- return self._invoke_lifecycle(self._lifecycle.start)
2165-
2166- def do_retry_stop(self):
2167- self._invoke_lifecycle(self._lifecycle.stop)
2168-
2169- def do_retry_install(self):
2170- return self._invoke_lifecycle(self._lifecycle.install)
2171-
2172- def do_retry_upgrade_formula(self):
2173- return self._invoke_lifecycle(self._lifecycle.upgrade_formula)
2174-
2175 def do_upgrade_formula(self):
2176 return self._invoke_lifecycle(self._lifecycle.upgrade_formula)
2177
2178+ def do_retry_start(self, fire_hooks=True):
2179+ return self._invoke_lifecycle(
2180+ self._lifecycle.start, fire_hooks=fire_hooks)
2181+
2182+ def do_retry_stop(self, fire_hooks=True):
2183+ return self._invoke_lifecycle(
2184+ self._lifecycle.stop, fire_hooks=fire_hooks)
2185+
2186+ def do_retry_install(self, fire_hooks=True):
2187+ return self._invoke_lifecycle(
2188+ self._lifecycle.install, fire_hooks=fire_hooks)
2189+
2190+ def do_retry_upgrade_formula(self, fire_hooks=True):
2191+ return self._invoke_lifecycle(
2192+ self._lifecycle.upgrade_formula, fire_hooks=fire_hooks)
2193+
2194
2195 class RelationWorkflowState(DiskWorkflowState):
2196
2197@@ -321,7 +363,7 @@
2198
2199 @param: error: The error from hook invocation.
2200 """
2201- yield self.fire_transition("stop",
2202+ yield self.fire_transition("error",
2203 change_type=relation_change.change_type,
2204 error_message=str(error))
2205
2206@@ -330,12 +372,30 @@
2207 """Transition the workflow to the 'down' state.
2208
2209 Turns off the unit-relation lifecycle monitoring and hook execution.
2210+
2211+ :param error_info: If called on relation hook error, contains
2212+ error variables.
2213 """
2214 yield self._lifecycle.stop()
2215
2216 @inlineCallbacks
2217+ def do_reset(self):
2218+ """Transition the workflow to the 'up' state from an error state.
2219+
2220+ Resumes hook execution; the relation watches were left running.
2221+ """
2222+ yield self._lifecycle.start(watches=False)
2223+
2224+ @inlineCallbacks
2225+ def do_error(self, **error_info):
2226+ """On a relation hook error, stop further hook execution but
2227+ continue to watch for changes.
2228+ """
2229+ yield self._lifecycle.stop(watches=False)
2230+
2231+ @inlineCallbacks
2232 def do_restart(self):
2233- """Transition the workflow to the 'up' state.
2234+ """Transition the workflow to the 'up' state from the down state.
2235
2236 Turns on the unit-relation lifecycle monitoring and hook execution.
2237 """

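The `alias="retry"` keyword added to the unit workflow transitions lets a caller ask for "retry" without knowing which error state the unit is in. The `ensemble/lib/statemachine.py` changes that implement alias lookup are not part of this excerpt, so the following is only a sketch of the idea under that assumption: the hypothetical `resolve_alias` picks the single transition carrying the alias that is valid from the current state, and the `Transition` namedtuple is a simplified stand-in, not ensemble's class.

```python
from collections import namedtuple

# Hypothetical stand-in for ensemble's Transition; only the fields needed
# to illustrate alias lookup are modeled.
Transition = namedtuple(
    "Transition", ["transition_id", "source", "destination", "alias"])

UNIT_TRANSITIONS = (
    Transition("install", None, "installed", None),
    Transition("retry_install", "install_error", "installed", "retry"),
    Transition("start", "installed", "started", None),
    Transition("retry_start", "start_error", "started", "retry"),
    Transition("stop", "started", "stopped", None),
    Transition("retry_stop", "stop_error", "stopped", "retry"),
    Transition("retry_upgrade_formula", "formula_upgrade_error", "started",
               "retry"),
)


def resolve_alias(transitions, current_state, alias):
    """Return the id of the one transition named by alias from current_state."""
    matches = [t.transition_id for t in transitions
               if t.alias == alias and t.source == current_state]
    if len(matches) != 1:
        raise LookupError(
            "expected one %r transition from %r, found %d"
            % (alias, current_state, len(matches)))
    return matches[0]


print(resolve_alias(UNIT_TRANSITIONS, "start_error", "retry"))  # retry_start
```

Requiring exactly one match keeps the alias unambiguous: if two transitions with the same alias shared a source state, a "retry" request could not be resolved.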