Merge lp:~hazmat/pyjuju/unit-agent-resolved into lp:pyjuju

Proposed by Kapil Thangavelu
Status: Merged
Approved by: Gustavo Niemeyer
Approved revision: 275
Merged at revision: 224
Proposed branch: lp:~hazmat/pyjuju/unit-agent-resolved
Merge into: lp:pyjuju
Prerequisite: lp:~hazmat/pyjuju/ensemble-resolved
Diff against target: 1276 lines (+718/-94)
7 files modified
ensemble/control/tests/test_resolved.py (+0/-1)
ensemble/state/service.py (+18/-5)
ensemble/state/tests/test_service.py (+71/-1)
ensemble/unit/lifecycle.py (+85/-21)
ensemble/unit/tests/test_lifecycle.py (+242/-3)
ensemble/unit/tests/test_workflow.py (+154/-12)
ensemble/unit/workflow.py (+148/-51)
To merge this branch: bzr merge lp:~hazmat/pyjuju/unit-agent-resolved
Reviewer Review Type Date Requested Status
Gustavo Niemeyer Approve
Review via email: mp+59713@code.launchpad.net

This proposal supersedes a proposal from 2011-05-02.

Description of the change

This implements the unit lifecycle resolving unit relations, and additional changes to enable resolution (transition actions receiving hook execution flags).

The branch is large enough that I'm going to split the remaining unit agent work on resolving unit relations into another branch.

Gustavo Niemeyer (niemeyer) wrote :

Kapil mentioned he's still working on this one.

Gustavo Niemeyer (niemeyer) wrote :

[1]

+    def do_retry_start(self, fire_hooks=True):
+        return self._invoke_lifecycle(
+            self._lifecycle.start, fire_hooks=fire_hooks)

We've already debated this live over a voice call, and had already
covered the topic in a previous conversation: I think it is a mistake
to introduce variables which define how the transition should work.

This is increasing the complexity of actions in an unpredictable way
(a single action with parameters could handle *all* the possible
transitions in the state machine).

review: Needs Fixing
Kapil Thangavelu (hazmat) wrote :

Changed over to using additional transitions; it's a bit nicer now, thanks.
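
The change described above can be pictured with a minimal sketch. It is not the branch's actual code: `TinyLifecycle`, `TinyWorkflow`, the state names, and the transition table are all hypothetical, and only the `do_retry_start` naming and the `fire_hooks` flag echo the excerpts quoted in this review. The point is the shape of the design: each retry variant is its own named transition with its own argument-free action, rather than one action whose behaviour depends on a parameter.

```python
class TinyLifecycle(object):
    """Hypothetical stand-in for the unit lifecycle; records what ran."""

    def __init__(self):
        self.log = []

    def start(self, fire_hooks=True):
        if fire_hooks:
            self.log.append("hook:start")
        self.log.append("started")


class TinyWorkflow(object):
    """Hypothetical workflow: one transition name maps to one action."""

    # transition name -> (source state, destination state); assumed names
    transitions = {
        "retry_start": ("start_error", "started"),
        "retry_start_hook": ("start_error", "started"),
    }

    def __init__(self, lifecycle, state="start_error"):
        self._lifecycle = lifecycle
        self.state = state

    def fire_transition(self, name):
        source, destination = self.transitions[name]
        if self.state != source:
            raise ValueError("%s is not valid from %s" % (name, self.state))
        # The action takes no arguments: the transition name alone
        # decides what happens.
        getattr(self, "do_" + name)()
        self.state = destination

    def do_retry_start(self):
        # Resolve without re-running the hook.
        self._lifecycle.start(fire_hooks=False)

    def do_retry_start_hook(self):
        # Resolve and re-run the hook.
        self._lifecycle.start(fire_hooks=True)
```

With this layout the set of possible behaviours is visible in the transition table itself, which is the predictability the review asked for.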

Gustavo Niemeyer (niemeyer) wrote :

Nice, thanks for the changes. Looking good!

+1, taking these in consideration:

[2]

+    def do_retry_upgrade_formula_hook(self, fire_hooks=True):

The fire_hooks argument seems to be a leftover.

[3]

+        return self._invoke_lifecycle(
+            self._lifecycle.start, fire_hooks=False).addCallback(
+            lambda x: self._invoke_lifecycle(
+                self._lifecycle.configure, fire_hooks=False))
(...)
+        return self._invoke_lifecycle(
+            self._lifecycle.start, fire_hooks=False).addCallback(
+            lambda x: self._invoke_lifecycle(
+                self._lifecycle.configure))

Breaking these down would make them more readable than the huge one-liners.

[4]

+# Another interesting issue, process recovery using the on disk state,
+# is complicated by consistency to the the in memory state, which
+# won't be directly recoverable anymore without some state specific

Sweet. Thanks for capturing those ideas.

[5]

+            # If the unit lifecycle isn't running we shouldn't process
+            # any relation resolutions.
+            if not self._running:
+                self._log.debug("stop watch relation resolved changes")
+                self._watching_relation_resolved = False
+                raise StopWatcher()

It's not clear what's the intention here, and test coverage also
looks a bit poor in this area (e.g. replacing everything under the "if" by
"return" still passes tests fine).

What do we want to happen in this case, and what's the rationale
behind it?

[6]

+            if self._client.connected:
+                yield self._process_relation_resolved_changes()

What's the "if connected" test about? It feels like pretty much every
interaction with zk could have such a test. Why is this location
special?

review: Approve
Kapil Thangavelu (hazmat) wrote :

Excerpts from Gustavo Niemeyer's message of Thu May 05 02:01:15 UTC 2011:
> Review: Approve
> Nice, thanks for the changes. Looking good!
>
> +1, taking these in consideration:
>
> [5]
>
> +            # If the unit lifecycle isn't running we shouldn't process
> +            # any relation resolutions.
> +            if not self._running:
> +                self._log.debug("stop watch relation resolved changes")
> +                self._watching_relation_resolved = False
> +                raise StopWatcher()
>
> It's not clear what's the intention here, and test coverage also
> looks a bit poor in this area (e.g. replacing everything under the "if" by
> "return" still passes tests fine).
>
> What do we want to happen in this case, and what's the rationale
> behind it?
>

The notion is that if we're not running, the watcher terminates; when we start watching
again, the watcher is restarted if it doesn't already exist. It's designed to
avoid concurrent watchers and to allow for watch termination.

The design rationale is that if a unit is not running, it needs to be resolved before
its unit relations can be fixed.

Testing this correctly runs into another behavior of the unit lifecycle,
where we automatically repair unit relations when the unit is started. I've
introduced a separate error state for relations in this branch, so we can choose
not to have unit relation errors automatically recovered, in which case the logic
in question can be tested easily by bringing the unit back online and then verifying
that the resolve action was performed.
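
A minimal sketch of the watcher behaviour described above, assuming a synchronous, in-memory stand-in for the ZooKeeper watch. `FakeWatchedNode` and `TinyLifecycle` are hypothetical; only `StopWatcher`, `_running`, and `_watching_relation_resolved` echo names from the branch. It illustrates why raising `StopWatcher` differs from a plain `return`: the watch is dropped rather than left armed, and the flag lets `start` re-establish it exactly once.

```python
class StopWatcher(Exception):
    """Raised by a callback to ask the watch machinery to stop."""


class FakeWatchedNode(object):
    """Hypothetical in-memory stand-in for a watched ZooKeeper node."""

    def __init__(self):
        self.callbacks = []

    def watch(self, callback):
        self.callbacks.append(callback)

    def change(self, event):
        # Deliver the event; drop any callback that raises StopWatcher.
        for callback in list(self.callbacks):
            try:
                callback(event)
            except StopWatcher:
                self.callbacks.remove(callback)


class TinyLifecycle(object):
    """Hypothetical lifecycle with the running and watching flags."""

    def __init__(self, node):
        self._node = node
        self._running = False
        self._watching_relation_resolved = False
        self.processed = []

    def start(self):
        # Only establish the watch if one is not already active, so
        # repeated starts never create concurrent watchers.
        if not self._watching_relation_resolved:
            self._node.watch(self._on_relation_resolved_changes)
            self._watching_relation_resolved = True
        self._running = True

    def stop(self):
        self._running = False

    def _on_relation_resolved_changes(self, event):
        if not self._running:
            # Not running: terminate the watch instead of processing.
            self._watching_relation_resolved = False
            raise StopWatcher()
        self.processed.append(event)
```

A test that only checks "nothing was processed while stopped" cannot tell `raise StopWatcher()` from `return`; checking that the callback is no longer registered, and that a later `start` registers exactly one, does distinguish them.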

>
> [6]
>
> +            if self._client.connected:
> +                yield self._process_relation_resolved_changes()
>
> What's the "if connected" test about? It feels like pretty much every
> interaction with zk could have such a test. Why is this location
> special?
>

It's typically done in watch callbacks, so that asynchronous background execution while a test
is shutting down does not trigger a ZooKeeper closing exception; it minimizes spurious errors.
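
The guard described above can be sketched as follows, assuming a hypothetical `FakeClient` with a `connected` attribute in place of the real ZooKeeper client (`ClosedError`, `make_watch_callback`, and the `/resolved` path are also made up for illustration). The idea is only that a callback firing late, after the connection has been closed, becomes a no-op instead of touching the closed client.

```python
class ClosedError(Exception):
    """Hypothetical error raised when a closed client is used."""


class FakeClient(object):
    """Hypothetical client exposing only what the sketch needs."""

    def __init__(self):
        self.connected = True
        self.data = {"/resolved": "retry"}

    def get(self, path):
        if not self.connected:
            raise ClosedError(path)
        return self.data.get(path)

    def close(self):
        self.connected = False


def make_watch_callback(client, results):
    def on_change(event):
        # A watch may fire in the background after the client is closed
        # (for example during test teardown); skip the work in that case.
        if not client.connected:
            return
        results.append(client.get("/resolved"))
    return on_change
```

Watch callbacks are the natural place for this check because, unlike ordinary calls, they are invoked by the client at a time the caller does not control.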

276. By Kapil Thangavelu

Merged ensemble-resolved into unit-agent-resolved.

277. By Kapil Thangavelu

Merged ensemble-resolved into unit-agent-resolved.

278. By Kapil Thangavelu

Merged ensemble-resolved into unit-agent-resolved.

279. By Kapil Thangavelu

address review comments re formatting

Kapil Thangavelu (hazmat) wrote :

Excerpts from Kapil Thangavelu's message of Thu May 05 13:26:02 -0400 2011:
> Excerpts from Gustavo Niemeyer's message of Thu May 05 02:01:15 UTC 2011:
> > Review: Approve
> > Nice, thanks for the changes. Looking good!
> >
> > +1, taking these in consideration:
> >
> > [5]
> >
> > +            # If the unit lifecycle isn't running we shouldn't process
> > +            # any relation resolutions.
> > +            if not self._running:
> > +                self._log.debug("stop watch relation resolved changes")
> > +                self._watching_relation_resolved = False
> > +                raise StopWatcher()
> >
> > It's not clear what's the intention here, and test coverage also
> > looks a bit poor in this area (e.g. replacing everything under the "if" by
> > "return" still passes tests fine).
> >
> > What do we want to happen in this case, and what's the rationale
> > behind it?
> >
>
> The notion is that if we're not running, the watcher terminates; when we start watching
> again, the watcher is restarted if it doesn't already exist. It's designed to
> avoid concurrent watchers and to allow for watch termination.
>
> The design rationale is that if a unit is not running, it needs to be resolved before
> its unit relations can be fixed.
>
> Testing this correctly runs into another behavior of the unit lifecycle,
> where we automatically repair unit relations when the unit is started. I've
> introduced a separate error state for relations in this branch, so we can choose
> not to have unit relation errors automatically recovered, in which case the logic
> in question can be tested easily by bringing the unit back online and then verifying
> that the resolve action was performed.

Just to be clear, there is a test for the overall functionality in

test_resolved_relation_watch_unit_lifecycle_not_running

But because of the above constraint regarding automatic recovery on start, it can
only verify that the watcher has stopped, not that the recovery proceeds normally
after restart, so it instead verifies that the resolved setting has persisted
past the end of the watcher.

Preview Diff

=== modified file 'ensemble/control/tests/test_resolved.py'
--- ensemble/control/tests/test_resolved.py 2011-04-28 17:09:36 +0000
+++ ensemble/control/tests/test_resolved.py 2011-05-05 17:43:23 +0000
@@ -2,7 +2,6 @@
 from yaml import dump
 
 from ensemble.control import main
-from ensemble.control.resolved import resolved
 from ensemble.control.tests.common import ControlToolTest
 from ensemble.formula.tests.test_repository import RepositoryTestBase
 

=== modified file 'ensemble/state/service.py'
--- ensemble/state/service.py 2011-05-05 16:18:34 +0000
+++ ensemble/state/service.py 2011-05-05 17:43:23 +0000
@@ -16,11 +16,10 @@
     ServiceUnitStateMachineAlreadyAssigned, ServiceStateNameInUse,
     BadDescriptor, BadServiceStateName, NoUnusedMachines,
     ServiceUnitDebugAlreadyEnabled, ServiceUnitResolvedAlreadyEnabled,
-    ServiceUnitRelationResolvedAlreadyEnabled)
+    ServiceUnitRelationResolvedAlreadyEnabled, StopWatcher)
 from ensemble.state.formula import FormulaStateManager
 from ensemble.state.relation import ServiceRelationState, RelationStateManager
 from ensemble.state.machine import _public_machine_id, MachineState
-
 from ensemble.state.utils import remove_tree, dict_merge, YAMLState
 
 RETRY_HOOKS = 1000
@@ -516,7 +515,7 @@
         its `write` method invoked to publish the state to Zookeeper.
         """
         config_node = YAMLState(self._client,
-                                "/services/%s/config" % self._internal_id )
+                                "/services/%s/config" % self._internal_id)
         yield config_node.read()
         returnValue(config_node)
 
@@ -944,9 +943,13 @@
         def watcher(change_event):
             if not self._client.connected:
                 returnValue(None)
+
             exists_d, watch_d = self._client.exists_and_watch(
                 self._unit_resolve_path)
-            yield callback(change_event)
+            try:
+                yield callback(change_event)
+            except StopWatcher:
+                returnValue(None)
             watch_d.addCallback(watcher)
 
         exists_d, watch_d = self._client.exists_and_watch(
@@ -958,6 +961,8 @@
         callback_d = maybeDeferred(callback, bool(exists))
         callback_d.addCallback(
             lambda x: watch_d.addCallback(watcher) and x)
+        callback_d.addErrback(
+            lambda failure: failure.trap(StopWatcher))
 
     @property
     def _relation_resolved_path(self):
@@ -1033,6 +1038,8 @@
 
     @inlineCallbacks
     def clear_relation_resolved(self):
+        """ Clear the relation resolved setting.
+        """
         try:
             yield self._client.delete(self._relation_resolved_path)
         except zookeeper.NoNodeException:
@@ -1055,7 +1062,11 @@
                 returnValue(None)
             exists_d, watch_d = self._client.exists_and_watch(
                 self._relation_resolved_path)
-            yield callback(change_event)
+            try:
+                yield callback(change_event)
+            except StopWatcher:
+                returnValue(None)
+
             watch_d.addCallback(watcher)
 
         exists_d, watch_d = self._client.exists_and_watch(
@@ -1068,6 +1079,8 @@
         callback_d = maybeDeferred(callback, bool(exists))
         callback_d.addCallback(
             lambda x: watch_d.addCallback(watcher) and x)
+        callback_d.addErrback(
+            lambda failure: failure.trap(StopWatcher))
 
 
 def _parse_unit_name(unit_name):

=== modified file 'ensemble/state/tests/test_service.py'
--- ensemble/state/tests/test_service.py 2011-05-05 14:31:41 +0000
+++ ensemble/state/tests/test_service.py 2011-05-05 17:43:23 +0000
@@ -17,7 +17,7 @@
     ServiceUnitStateMachineAlreadyAssigned, ServiceStateNameInUse,
     BadDescriptor, BadServiceStateName, ServiceUnitDebugAlreadyEnabled,
     MachineStateNotFound, NoUnusedMachines, ServiceUnitResolvedAlreadyEnabled,
-    ServiceUnitRelationResolvedAlreadyEnabled)
+    ServiceUnitRelationResolvedAlreadyEnabled, StopWatcher)
 
 
 from ensemble.state.tests.common import StateTestBase
@@ -790,6 +790,42 @@
             {"retry": NO_HOOKS})
 
     @inlineCallbacks
+    def test_stop_watch_resolved(self):
+        """A unit resolved watch can be instituted on a permanent basis.
+
+        However the callback can raise StopWatcher at anytime to stop the watch
+        """
+        unit_state = yield self.get_unit_state()
+
+        results = []
+
+        def callback(value):
+            results.append(value)
+            if len(results) == 1:
+                raise StopWatcher()
+            if len(results) == 3:
+                raise StopWatcher()
+
+        unit_state.watch_resolved(callback)
+        yield unit_state.set_resolved(RETRY_HOOKS)
+        yield unit_state.clear_resolved()
+        yield self.poke_zk()
+
+        unit_state.watch_resolved(callback)
+        yield unit_state.set_resolved(NO_HOOKS)
+        yield unit_state.clear_resolved()
+
+        yield self.poke_zk()
+
+        self.assertEqual(len(results), 3)
+        self.assertIdentical(results.pop(0), False)
+        self.assertIdentical(results.pop(0), False)
+        self.assertEqual(results.pop(0).type_name, "created")
+
+        self.assertEqual(
+            (yield unit_state.get_resolved()), None)
+
+    @inlineCallbacks
     def test_get_set_clear_relation_resolved(self):
         """The a unit's realtions can be set to resolved to mark a
         future transition, with an optional retry flag."""
@@ -850,6 +886,40 @@
             {"0": NO_HOOKS})
 
     @inlineCallbacks
+    def test_stop_watch_relation_resolved(self):
+        """A unit resolved watch can be instituted on a permanent basis."""
+        unit_state = yield self.get_unit_state()
+
+        results = []
+
+        def callback(value):
+            results.append(value)
+
+            if len(results) == 1:
+                raise StopWatcher()
+
+            if len(results) == 3:
+                raise StopWatcher()
+
+        unit_state.watch_relation_resolved(callback)
+        yield unit_state.set_relation_resolved({"0": RETRY_HOOKS})
+        yield unit_state.clear_relation_resolved()
+        yield self.poke_zk()
+        self.assertEqual(len(results), 1)
+
+        unit_state.watch_relation_resolved(callback)
+        yield unit_state.set_relation_resolved({"0": RETRY_HOOKS})
+        yield unit_state.clear_relation_resolved()
+        yield self.poke_zk()
+        self.assertEqual(len(results), 3)
+        self.assertIdentical(results.pop(0), False)
+        self.assertIdentical(results.pop(0), False)
+        self.assertEqual(results.pop(0).type_name, "created")
+
+        self.assertEqual(
+            (yield unit_state.get_relation_resolved()), None)
+
+    @inlineCallbacks
     def test_watch_resolved_slow_callback(self):
         """A slow watch callback is still invoked serially."""
         unit_state = yield self.get_unit_state()

=== modified file 'ensemble/unit/lifecycle.py'
--- ensemble/unit/lifecycle.py 2011-05-05 14:31:43 +0000
+++ ensemble/unit/lifecycle.py 2011-05-05 17:43:23 +0000
@@ -2,7 +2,7 @@
 import logging
 
 from twisted.internet.defer import (
-    inlineCallbacks, DeferredLock)
+    inlineCallbacks, DeferredLock, returnValue)
 
 from ensemble.hooks.invoker import Invoker
 from ensemble.hooks.scheduler import HookScheduler
@@ -32,7 +32,8 @@
         self._unit_path = unit_path
         self._relations = {}
         self._running = False
-        self._watching = False
+        self._watching_relation_memberships = False
+        self._watching_relation_resolved = False
         self._run_lock = DeferredLock()
         self._log = logging.getLogger("unit.lifecycle")
 
@@ -45,21 +46,23 @@
         return self._relations[relation_id]
 
     @inlineCallbacks
-    def install(self):
+    def install(self, fire_hooks=True):
         """Invoke the unit's install hook.
         """
-        yield self._execute_hook("install")
+        if fire_hooks:
+            yield self._execute_hook("install")
 
     @inlineCallbacks
-    def upgrade_formula(self):
+    def upgrade_formula(self, fire_hooks=True):
         """Invoke the unit's upgrade-formula hook.
         """
-        yield self._execute_hook("upgrade-formula", now=True)
+        if fire_hooks:
+            yield self._execute_hook("upgrade-formula", now=True)
         # Restart hook queued hook execution.
         self._executor.start()
 
     @inlineCallbacks
-    def start(self):
+    def start(self, fire_hooks=True):
         """Invoke the start hook, and setup relation watching.
         """
         self._log.debug("pre-start acquire, running:%s", self._running)
@@ -70,22 +73,29 @@
             assert not self._running, "Already started"
 
             # Execute the start hook
-            yield self._execute_hook("start")
+            if fire_hooks:
+                yield self._execute_hook("start")
 
             # If we have any existing relations in memory, start them.
             if self._relations:
                 self._log.debug("starting relation lifecycles")
 
             for workflow in self._relations.values():
-                # should not transition an
                 yield workflow.transition_state("up")
 
             # Establish a watch on the existing relations.
-            if not self._watching:
+            if not self._watching_relation_memberships:
                 self._log.debug("starting service relation watch")
                 yield self._service.watch_relation_states(
                     self._on_service_relation_changes)
-                self._watching = True
+                self._watching_relation_memberships = True
+
+            # Establish a watch for resolved relations
+            if not self._watching_relation_resolved:
+                self._log.debug("starting unit relation resolved watch")
+                yield self._unit.watch_relation_resolved(
+                    self._on_relation_resolved_changes)
+                self._watching_relation_resolved = True
 
             # Set current status
             self._running = True
@@ -94,7 +104,7 @@
         self._log.debug("started unit lifecycle")
 
     @inlineCallbacks
-    def stop(self):
+    def stop(self, fire_hooks=True):
         """Stop the unit, executes the stop hook, and stops relation watching.
         """
         self._log.debug("pre-stop acquire, running:%s", self._running)
@@ -110,7 +120,8 @@
             for workflow in self._relations.values():
                 yield workflow.transition_state("down")
 
-            yield self._execute_hook("stop")
+            if fire_hooks:
+                yield self._execute_hook("stop")
 
             # Set current status
             self._running = False
@@ -119,9 +130,11 @@
         self._log.debug("stopped unit lifecycle")
 
     @inlineCallbacks
-    def configure(self):
+    def configure(self, fire_hooks=True):
         """Inform the unit that its service config has changed.
         """
+        if not fire_hooks:
+            returnValue(None)
         yield self._run_lock.acquire()
         try:
             # Verify State
@@ -134,6 +147,49 @@
         self._log.debug("configured unit")
 
     @inlineCallbacks
+    def _on_relation_resolved_changes(self, event):
+        """Callback for unit relation resolved watching.
+
+        The callback is invoked whenever the relation resolved
+        settings change.
+        """
+        self._log.debug("relation resolved changed")
+        # Acquire the run lock, and process the changes.
+        yield self._run_lock.acquire()
+
+        try:
+            # If the unit lifecycle isn't running we shouldn't process
+            # any relation resolutions.
+            if not self._running:
+                self._log.debug("stop watch relation resolved changes")
+                self._watching_relation_resolved = False
+                raise StopWatcher()
+
+            self._log.info("processing relation resolved changed")
+            if self._client.connected:
+                yield self._process_relation_resolved_changes()
+        finally:
+            yield self._run_lock.release()
+
+    @inlineCallbacks
+    def _process_relation_resolved_changes(self):
+        """Invoke retry transitions on relations if their not running.
+        """
+        relation_resolved = yield self._unit.get_relation_resolved()
+        if relation_resolved is None:
+            returnValue(None)
+        else:
+            yield self._unit.clear_relation_resolved()
+
+        keys = set(relation_resolved).intersection(self._relations)
+        for rel_id in keys:
+            relation_workflow = self._relations[rel_id]
+            relation_state = yield relation_workflow.get_state()
+            if relation_state == "up":
+                continue
+            yield relation_workflow.transition_state("up")
+
+    @inlineCallbacks
     def _on_service_relation_changes(self, old_relations, new_relations):
         """Callback for service relation watching.
 
@@ -153,7 +209,7 @@
             # If the lifecycle is not running, then stop the watcher
             if not self._running:
                 self._log.debug("stop service-rel watcher, discarding changes")
-                self._watching = False
+                self._watching_relation_memberships = False
                 raise StopWatcher()
 
             self._log.debug("processing relations changed")
@@ -163,9 +219,9 @@
 
     @inlineCallbacks
     def _process_service_changes(self, old_relations, new_relations):
-        """Add and remove unit lifecycles per the service relations changes.
+        """Add and remove unit lifecycles per the service relations Determine.
         """
-        # Determine relation delta of global zk state with our memory state.
+        # changes relation delta of global zk state with our memory state.
         new_relations = dict([(service_relation.internal_relation_id,
                                service_relation) for
                               service_relation in new_relations])
@@ -352,8 +408,11 @@
         self._error_handler = handler
 
     @inlineCallbacks
-    def start(self):
+    def start(self, watches=True):
         """Start watching related units and executing change hooks.
+
+        @param watches: boolean parameter denoting if relation watches
+            should be started.
         """
         yield self._run_lock.acquire()
         try:
@@ -364,19 +423,24 @@
                 self._watcher = yield self._unit_relation.watch_related_units(
                     self._scheduler.notify_change)
             # And start the watcher.
-            yield self._watcher.start()
+            if watches:
+                yield self._watcher.start()
         finally:
             self._run_lock.release()
         self._log.debug(
             "started relation:%s lifecycle", self._relation_name)
 
     @inlineCallbacks
-    def stop(self):
+    def stop(self, watches=True):
         """Stop watching changes and stop executing relation change hooks.
+
+        @param watches: boolean parameter denoting if relation watches
+            should be stopped.
         """
         yield self._run_lock.acquire()
         try:
-            self._watcher.stop()
+            if watches and self._watcher:
+                self._watcher.stop()
             self._scheduler.stop()
         finally:
             yield self._run_lock.release()

=== modified file 'ensemble/unit/tests/test_lifecycle.py'
--- ensemble/unit/tests/test_lifecycle.py 2011-05-05 09:41:11 +0000
+++ ensemble/unit/tests/test_lifecycle.py 2011-05-05 17:43:23 +0000
@@ -12,6 +12,8 @@
12from ensemble.unit.lifecycle import (12from ensemble.unit.lifecycle import (
13 UnitLifecycle, UnitRelationLifecycle, RelationInvoker)13 UnitLifecycle, UnitRelationLifecycle, RelationInvoker)
1414
15from ensemble.unit.workflow import RelationWorkflowState
16
15from ensemble.hooks.invoker import Invoker17from ensemble.hooks.invoker import Invoker
16from ensemble.hooks.executor import HookExecutor18from ensemble.hooks.executor import HookExecutor
1719
@@ -19,9 +21,11 @@
1921
20from ensemble.state.endpoint import RelationEndpoint22from ensemble.state.endpoint import RelationEndpoint
21from ensemble.state.relation import ClientServerUnitWatcher23from ensemble.state.relation import ClientServerUnitWatcher
24from ensemble.state.service import NO_HOOKS
22from ensemble.state.tests.test_relation import RelationTestBase25from ensemble.state.tests.test_relation import RelationTestBase
23from ensemble.state.hook import RelationChange26from ensemble.state.hook import RelationChange
2427
28
25from ensemble.lib.testing import TestCase29from ensemble.lib.testing import TestCase
26from ensemble.lib.mocker import MATCH30from ensemble.lib.mocker import MATCH
2731
@@ -97,6 +101,8 @@
97 results.append(hook_name)101 results.append(hook_name)
98 if debug:102 if debug:
99 print "-> exec hook", hook_name103 print "-> exec hook", hook_name
104 if d.called:
105 return
100 if results == sequence:106 if results == sequence:
101 d.callback(True)107 d.callback(True)
102 if hook_name == name and count is None:108 if hook_name == name and count is None:
@@ -145,6 +151,182 @@
145 return output151 return output
146152
147153
154class LifecycleResolvedTest(LifecycleTestBase):
155
156 @inlineCallbacks
157 def setUp(self):
158 yield super(LifecycleResolvedTest, self).setUp()
159 yield self.setup_default_test_relation()
160 self.lifecycle = UnitLifecycle(
161 self.client, self.states["unit"], self.states["service"],
162 self.unit_directory, self.executor)
163
164 def get_unit_relation_workflow(self, states):
165 state_dir = os.path.join(self.ensemble_directory, "state")
166 lifecycle = UnitRelationLifecycle(
167 self.client,
168 states["unit_relation"],
169 states["service_relation"].relation_name,
170 self.unit_directory,
171 self.executor)
172
173 workflow = RelationWorkflowState(
174 self.client,
175 states["unit_relation"],
176 lifecycle,
177 state_dir)
178
179 return (workflow, lifecycle)
180
181 @inlineCallbacks
182 def test_resolved_relation_watch_unit_lifecycle_not_running(self):
183 """If the unit is not running then no relation resolving is performed.
184 However the resolution value remains the same.
185 """
186 # Start the unit.
187 yield self.lifecycle.start()
188
189 # Wait for the relation to be started.... TODO: async background work
190 yield self.sleep(0.1)
191
192 # Simulate relation down on an individual unit relation
193 workflow = self.lifecycle.get_relation_workflow(
194 self.states["unit_relation"].internal_relation_id)
195 self.assertEqual("up", (yield workflow.get_state()))
196
197 yield workflow.transition_state("down")
198 resolved = self.wait_on_state(workflow, "up")
199
200 # Stop the unit lifecycle
201 yield self.lifecycle.stop()
202
203 # Set the relation to resolved
204 yield self.states["unit"].set_relation_resolved(
205 {self.states["unit_relation"].internal_relation_id: NO_HOOKS})
206
207 # Give a moment for the watch to fire erroneously
208 yield self.sleep(0.2)
209
210 # Ensure we didn't attempt a transition.
211 self.assertFalse(resolved.called)
212 self.assertEqual(
213 {self.states["unit_relation"].internal_relation_id: NO_HOOKS},
214 (yield self.states["unit"].get_relation_resolved()))
215
216 # If the unit is restarted start, we currently have the
217 # behavior that the unit relation workflow will automatically
218 # be transitioned back to running, as part of the normal state
219 # transition. Sigh.. we should have a separate error
220 # state for relation hooks then down with state variable usage.
221
222 @inlineCallbacks
223 def test_resolved_relation_watch_relation_up(self):
224 """If a relation marked as to be resolved is already running,
225 then no work is performed.
226 """
227 # Start the unit.
228 yield self.lifecycle.start()
229
230 # Wait for the relation to be started.... TODO: async background work
231 yield self.sleep(0.1)
232
233 # get a hold of the unit relation and verify state
234 workflow = self.lifecycle.get_relation_workflow(
235 self.states["unit_relation"].internal_relation_id)
236 self.assertEqual("up", (yield workflow.get_state()))
237
238 # Set the relation to resolved
239 yield self.states["unit"].set_relation_resolved(
240 {self.states["unit_relation"].internal_relation_id: NO_HOOKS})
241
242 # Give a moment for the async background work.
243 yield self.sleep(0.1)
244
245 # Ensure we're still up and the relation resolved setting has been
246 # cleared.
247 self.assertEqual(
248 None, (yield self.states["unit"].get_relation_resolved()))
249 self.assertEqual("up", (yield workflow.get_state()))
250
251 @inlineCallbacks
252 def test_resolved_relation_watch_from_error(self):
253 """Unit lifecycles will process a unit relation resolved
254 setting, and transition an errored relation back to a running
255 state.
256 """
257 log_output = self.capture_logging(
258 "unit.lifecycle", level=logging.DEBUG)
259
260 # Start the unit.
261 yield self.lifecycle.start()
262
263 # Wait for the relation to be started... TODO: async background work
264 yield self.sleep(0.1)
265
266 # Simulate an error condition
267 workflow = self.lifecycle.get_relation_workflow(
268 self.states["unit_relation"].internal_relation_id)
269 self.assertEqual("up", (yield workflow.get_state()))
270 yield workflow.fire_transition("error")
271
272 resolved = self.wait_on_state(workflow, "up")
273
274 # Set the relation to resolved
275 yield self.states["unit"].set_relation_resolved(
276 {self.states["unit_relation"].internal_relation_id: NO_HOOKS})
277
278 # Wait for the relation to come back up
279 value = yield self.states["unit"].get_relation_resolved()
280
281 yield resolved
282
283 # Verify state
284 value = yield workflow.get_state()
285 self.assertEqual(value, "up")
286
287 self.assertIn(
288 "processing relation resolved changed", log_output.getvalue())
289
290 @inlineCallbacks
291 def test_resolved_relation_watch(self):
292 """Unit lifecycles will process a unit relation resolved
293 setting, and transition a down relation back to a running
294 state.
295 """
296 log_output = self.capture_logging(
297 "unit.lifecycle", level=logging.DEBUG)
298
299 # Start the unit.
300 yield self.lifecycle.start()
301
302 # Wait for the relation to be started... TODO: async background work
303 yield self.sleep(0.1)
304
305 # Simulate an error condition
306 workflow = self.lifecycle.get_relation_workflow(
307 self.states["unit_relation"].internal_relation_id)
308 self.assertEqual("up", (yield workflow.get_state()))
309 yield workflow.transition_state("down")
310
311 resolved = self.wait_on_state(workflow, "up")
312
313 # Set the relation to resolved
314 yield self.states["unit"].set_relation_resolved(
315 {self.states["unit_relation"].internal_relation_id: NO_HOOKS})
316
317 # Wait for the relation to come back up
318 value = yield self.states["unit"].get_relation_resolved()
319
320 yield resolved
321
322 # Verify state
323 value = yield workflow.get_state()
324 self.assertEqual(value, "up")
325
326 self.assertIn(
327 "processing relation resolved changed", log_output.getvalue())
328
329
148class UnitLifecycleTest(LifecycleTestBase):330class UnitLifecycleTest(LifecycleTestBase):
149331
150 @inlineCallbacks332 @inlineCallbacks
@@ -187,6 +369,45 @@
187 # verify the sockets are cleaned up.369 # verify the sockets are cleaned up.
188 self.assertEqual(os.listdir(self.unit_directory), ["formula"])370 self.assertEqual(os.listdir(self.unit_directory), ["formula"])
189371
372 @inlineCallbacks
373 def test_start_sans_hook(self):
374 """The lifecycle start can be invoked without firing hooks."""
375 self.write_hook("start", "#!/bin/sh\n exit 1")
376 start_executed = self.wait_on_hook("start")
377 yield self.lifecycle.start(fire_hooks=False)
378 # Wait for unit relation background processing....
379 yield self.sleep(0.1)
380 self.assertFalse(start_executed.called)
381
382 @inlineCallbacks
383 def test_stop_sans_hook(self):
384 """The lifecycle stop can be invoked without firing hooks."""
385 self.write_hook("stop", "#!/bin/sh\n exit 1")
386 stop_executed = self.wait_on_hook("stop")
387 yield self.lifecycle.start()
388 yield self.lifecycle.stop(fire_hooks=False)
389 # Wait for unit relation background processing....
390 yield self.sleep(0.1)
391 self.assertFalse(stop_executed.called)
392
393 @inlineCallbacks
394 def test_install_sans_hook(self):
395 """The lifecycle install can be invoked without firing hooks."""
396 self.write_hook("install", "#!/bin/sh\n exit 1")
397 install_executed = self.wait_on_hook("install")
398 yield self.lifecycle.install(fire_hooks=False)
399 self.assertFalse(install_executed.called)
400
401 @inlineCallbacks
402 def test_upgrade_sans_hook(self):
403 """The lifecycle upgrade can be invoked without firing hooks."""
404 self.executor.stop()
405 self.write_hook("upgrade-formula", "#!/bin/sh\n exit 1")
406 upgrade_executed = self.wait_on_hook("upgrade-formula")
407 yield self.lifecycle.upgrade_formula(fire_hooks=False)
408 self.assertFalse(upgrade_executed.called)
409 self.assertTrue(self.executor.running)
410
190 def test_hook_error(self):411 def test_hook_error(self):
191 """Verify hook execution error, raises an exception."""412 """Verify hook execution error, raises an exception."""
192 self.write_hook("install", '#!/bin/sh\n exit 1')413 self.write_hook("install", '#!/bin/sh\n exit 1')
@@ -196,14 +417,12 @@
196 def test_hook_not_executable(self):417 def test_hook_not_executable(self):
197 """A hook not executable, raises an exception."""418 """A hook not executable, raises an exception."""
198 self.write_hook("install", '#!/bin/sh\n exit 0', no_exec=True)419 self.write_hook("install", '#!/bin/sh\n exit 0', no_exec=True)
199 # It would be preferrable if this was also a formulainvocation error.
200 return self.failUnlessFailure(420 return self.failUnlessFailure(
201 self.lifecycle.install(), FormulaError)421 self.lifecycle.install(), FormulaError)
202422
203 def test_hook_not_formatted_correctly(self):423 def test_hook_not_formatted_correctly(self):
204 """Hook execution error, raises an exception."""424 """Hook execution error, raises an exception."""
205 self.write_hook("install", '!/bin/sh\n exit 0')425 self.write_hook("install", '!/bin/sh\n exit 0')
206 # It would be preferrable if this was also a formulainvocation error.
207 return self.failUnlessFailure(426 return self.failUnlessFailure(
208 self.lifecycle.install(), FormulaInvocationError)427 self.lifecycle.install(), FormulaInvocationError)
209428
@@ -532,7 +751,7 @@
532 @inlineCallbacks751 @inlineCallbacks
533 def test_initial_start_lifecycle_no_related_no_exec(self):752 def test_initial_start_lifecycle_no_related_no_exec(self):
534 """753 """
535 If there are no related units on startup, the relation changed hook754 If there are no related units on startup, the relation joined hook
536 is not invoked.755 is not invoked.
537 """756 """
538 file_path = self.makeFile()757 file_path = self.makeFile()
@@ -545,6 +764,26 @@
545 self.assertFalse(os.path.exists(file_path))764 self.assertFalse(os.path.exists(file_path))
546765
547 @inlineCallbacks766 @inlineCallbacks
767 def test_stop_can_continue_watching(self):
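768 """Stopping with watches=False keeps the watches, so a hook
769 scheduled while stopped runs on start(watches=False)."""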
770 file_path = self.makeFile()
771 self.write_hook(
772 "%s-relation-changed" % self.relation_name,
773 ("#!/bin/bash\n" "echo executed >> %s\n" % file_path))
774 rel_states = yield self.add_opposite_service_unit(self.states)
775 yield self.lifecycle.start()
776 yield self.wait_on_hook(
777 sequence=["app-relation-joined", "app-relation-changed"])
778 changed_executed = self.wait_on_hook("app-relation-changed")
779 yield self.lifecycle.stop(watches=False)
780 rel_states["unit_relation"].set_data(yaml.dump(dict(hello="world")))
781 yield self.sleep(0.1)
782 self.assertFalse(changed_executed.called)
783 yield self.lifecycle.start(watches=False)
784 yield changed_executed
785
786 @inlineCallbacks
548 def test_initial_start_lifecycle_with_related(self):787 def test_initial_start_lifecycle_with_related(self):
549 """788 """
550 If there are related units on startup, the relation changed hook789 If there are related units on startup, the relation changed hook
551790
=== modified file 'ensemble/unit/tests/test_workflow.py'
--- ensemble/unit/tests/test_workflow.py 2011-05-05 14:31:43 +0000
+++ ensemble/unit/tests/test_workflow.py 2011-05-05 17:43:23 +0000
@@ -83,11 +83,30 @@
83 self.assertFalse(result)83 self.assertFalse(result)
84 current_state = yield self.workflow.get_state()84 current_state = yield self.workflow.get_state()
85 yield self.assertEqual(current_state, "install_error")85 yield self.assertEqual(current_state, "install_error")
86 self.write_hook("install", "#!/bin/bash\necho hello\n")
87 result = yield self.workflow.fire_transition("retry_install")86 result = yield self.workflow.fire_transition("retry_install")
88 yield self.assertState(self.workflow, "installed")87 yield self.assertState(self.workflow, "installed")
8988
90 @inlineCallbacks89 @inlineCallbacks
90 def test_install_error_with_retry_hook(self):
91 """If the install hook fails, the workflow is transitioned to the
92 install_error state.
93 """
94 self.write_hook("install", "#!/bin/bash\nexit 1")
95 result = yield self.workflow.fire_transition("install")
96 self.assertFalse(result)
97 current_state = yield self.workflow.get_state()
98 yield self.assertEqual(current_state, "install_error")
99
100 result = yield self.workflow.fire_transition("retry_install_hook")
101 yield self.assertState(self.workflow, "install_error")
102
103 self.write_hook("install", "#!/bin/bash\necho hello\n")
104 hook_deferred = self.wait_on_hook("install")
105 result = yield self.workflow.fire_transition_alias("retry_hook")
106 yield hook_deferred
107 yield self.assertState(self.workflow, "installed")
108
109 @inlineCallbacks
91 def test_start(self):110 def test_start(self):
92 file_path = self.makeFile()111 file_path = self.makeFile()
93 self.write_hook(112 self.write_hook(
@@ -131,12 +150,41 @@
131 current_state = yield self.workflow.get_state()150 current_state = yield self.workflow.get_state()
132 self.assertEqual(current_state, "start_error")151 self.assertEqual(current_state, "start_error")
133152
134 self.write_hook("start", "#!/bin/bash\necho hello\n")
135 result = yield self.workflow.fire_transition("retry_start")153 result = yield self.workflow.fire_transition("retry_start")
136 yield self.assertState(self.workflow, "started")154 yield self.assertState(self.workflow, "started")
137 # If we don't stop, we'll end up with the relation lifecycle155
138 # watches firing in the background when the test stops.156 # If we don't stop, we'll end up with the relation lifecycle
139 self.write_hook("stop", "#!/bin/bash\necho hello\n")157 # watches firing in the background when the test stops.
158 result = yield self.workflow.fire_transition("stop")
159 yield self.assertState(self.workflow, "stopped")
160
161 @inlineCallbacks
162 def test_start_error_with_retry_hook(self):
163 """Executing the start transition with a hook error, results in the
164 workflow going to the start_error state. The start can be retried.
165 """
166 self.write_hook("install", "#!/bin/bash\necho hello\n")
167 result = yield self.workflow.fire_transition("install")
168 self.assertTrue(result)
169 self.write_hook("start", "#!/bin/bash\nexit 1")
170 result = yield self.workflow.fire_transition("start")
171 self.assertFalse(result)
172 current_state = yield self.workflow.get_state()
173 self.assertEqual(current_state, "start_error")
174
175 hook_deferred = self.wait_on_hook("start")
176 result = yield self.workflow.fire_transition("retry_start_hook")
177 yield hook_deferred
178 yield self.assertState(self.workflow, "start_error")
179
180 self.write_hook("start", "#!/bin/bash\nexit 0")
181 hook_deferred = self.wait_on_hook("start")
182 result = yield self.workflow.fire_transition_alias("retry_hook")
183 yield hook_deferred
184 yield self.assertState(self.workflow, "started")
185
186 # If we don't stop, we'll end up with the relation lifecycle
187 # watches firing in the background when the test stops.
140 result = yield self.workflow.fire_transition("stop")188 result = yield self.workflow.fire_transition("stop")
141 yield self.assertState(self.workflow, "stopped")189 yield self.assertState(self.workflow, "stopped")
142190
@@ -193,13 +241,49 @@
193 yield self.assertState(self.workflow, "configure_error")241 yield self.assertState(self.workflow, "configure_error")
194242
195 # Verify recovery from error state243 # Verify recovery from error state
244 result = yield self.workflow.fire_transition_alias("retry")
245 self.assertTrue(result)
246 yield self.assertState(self.workflow, "started")
247
248 # Stop any background processing
249 yield self.workflow.fire_transition("stop")
250
251 @inlineCallbacks
252 def test_configure_error_and_retry_hook(self):
253 """An error while configuring, transitions the unit and
254 stops the lifecycle."""
255 #self.capture_output()
256 yield self.workflow.fire_transition("install")
257 result = yield self.workflow.fire_transition("start")
258 self.assertTrue(result)
259 self.assertState(self.workflow, "started")
260
261 # Verify transition to error state
262 hook_deferred = self.wait_on_hook("config-changed")
263 self.write_hook("config-changed", "#!/bin/bash\nexit 1")
264 result = yield self.workflow.fire_transition("reconfigure")
265 yield hook_deferred
266 self.assertFalse(result)
267 yield self.assertState(self.workflow, "configure_error")
268
269 # Verify retry hook with hook error stays in error state
270 hook_deferred = self.wait_on_hook("config-changed")
271 result = yield self.workflow.fire_transition("retry_configure_hook")
272
273 self.assertFalse(result)
274 yield hook_deferred
275 yield self.assertState(self.workflow, "configure_error")
276
196 hook_deferred = self.wait_on_hook("config-changed")277 hook_deferred = self.wait_on_hook("config-changed")
197 self.write_hook("config-changed", "#!/bin/bash\nexit 0")278 self.write_hook("config-changed", "#!/bin/bash\nexit 0")
198 result = yield self.workflow.fire_transition("retry_configure")279 result = yield self.workflow.fire_transition_alias("retry_hook")
199 yield hook_deferred280 yield hook_deferred
200 self.assertTrue(result)
201 yield self.assertState(self.workflow, "started")281 yield self.assertState(self.workflow, "started")
202282
283 # Stop any background processing
284 yield self.workflow.fire_transition("stop")
285 yield self.sleep(0.1)
286
203 @inlineCallbacks287 @inlineCallbacks
204 def test_upgrade(self):288 def test_upgrade(self):
205 """Upgrading a workflow results in the upgrade hook being289 """Upgrading a workflow results in the upgrade hook being
@@ -235,7 +319,7 @@
235 yield self.workflow.fire_transition("stop")319 yield self.workflow.fire_transition("stop")
236320
237 @inlineCallbacks321 @inlineCallbacks
238 def test_upgrade_error_state(self):322 def test_upgrade_error_retry(self):
239 """A hook error during an upgrade transitions to323 """A hook error during an upgrade transitions to
240 upgrade_error.324 upgrade_error.
241 """325 """
@@ -259,6 +343,41 @@
259 yield self.workflow.fire_transition("retry_upgrade_formula")343 yield self.workflow.fire_transition("retry_upgrade_formula")
260 current_state = yield self.workflow.get_state()344 current_state = yield self.workflow.get_state()
261 self.assertEqual(current_state, "started")345 self.assertEqual(current_state, "started")
346
347 # Stop any background activity
348 yield self.workflow.fire_transition("stop")
349
350 @inlineCallbacks
351 def test_upgrade_error_retry_hook(self):
352 """A hook error during an upgrade transitions to
353 upgrade_error, and can be re-tried with hook execution.
354 """
355 yield self.workflow.fire_transition("install")
356 yield self.workflow.fire_transition("start")
357 current_state = yield self.workflow.get_state()
358 self.assertEqual(current_state, "started")
359
360 # Agent prepares this.
361 self.executor.stop()
362
363 self.write_hook("upgrade-formula", "#!/bin/bash\nexit 1")
364 hook_deferred = self.wait_on_hook("upgrade-formula")
365 yield self.workflow.fire_transition("upgrade_formula")
366 yield hook_deferred
367 current_state = yield self.workflow.get_state()
368 self.assertEqual(current_state, "formula_upgrade_error")
369
370 hook_deferred = self.wait_on_hook("upgrade-formula")
371 self.write_hook("upgrade-formula", "#!/bin/bash\nexit 0")
372 # The upgrade error hook should ensure that the executor is stopped.
373 self.assertFalse(self.executor.running)
374 yield self.workflow.fire_transition_alias("retry_hook")
375 yield hook_deferred
376 current_state = yield self.workflow.get_state()
377 self.assertEqual(current_state, "started")
378 self.assertTrue(self.executor.running)
379
380 # Stop any background activity
262 yield self.workflow.fire_transition("stop")381 yield self.workflow.fire_transition("stop")
263382
264 @inlineCallbacks383 @inlineCallbacks
@@ -301,13 +420,36 @@
301 self.assertTrue(result)420 self.assertTrue(result)
302 result = yield self.workflow.fire_transition("start")421 result = yield self.workflow.fire_transition("start")
303 self.assertTrue(result)422 self.assertTrue(result)
423
304 self.write_hook("stop", "#!/bin/bash\nexit 1")424 self.write_hook("stop", "#!/bin/bash\nexit 1")
305 result = yield self.workflow.fire_transition("stop")425 result = yield self.workflow.fire_transition("stop")
306 self.assertFalse(result)426 self.assertFalse(result)
307 current_state = yield self.workflow.get_state()427
308 self.assertEqual(current_state, "stop_error")428 yield self.assertState(self.workflow, "stop_error")
309 self.write_hook("stop", "#!/bin/bash\necho hello\n")429 self.write_hook("stop", "#!/bin/bash\necho hello\n")
310 result = yield self.workflow.fire_transition("retry_stop")430 result = yield self.workflow.fire_transition("retry_stop")
431
432 yield self.assertState(self.workflow, "stopped")
433
434 @inlineCallbacks
435 def test_stop_error_with_retry_hook(self):
436 self.write_hook("install", "#!/bin/bash\necho hello\n")
437 self.write_hook("start", "#!/bin/bash\necho hello\n")
438 result = yield self.workflow.fire_transition("install")
439 self.assertTrue(result)
440 result = yield self.workflow.fire_transition("start")
441 self.assertTrue(result)
442
443 self.write_hook("stop", "#!/bin/bash\nexit 1")
444 result = yield self.workflow.fire_transition("stop")
445 self.assertFalse(result)
446 yield self.assertState(self.workflow, "stop_error")
447
448 result = yield self.workflow.fire_transition_alias("retry_hook")
449 yield self.assertState(self.workflow, "stop_error")
450
451 self.write_hook("stop", "#!/bin/bash\nexit 0")
452 result = yield self.workflow.fire_transition_alias("retry_hook")
311 yield self.assertState(self.workflow, "stopped")453 yield self.assertState(self.workflow, "stopped")
312454
313 @inlineCallbacks455 @inlineCallbacks
@@ -447,7 +589,7 @@
447 # Add a new unit, and wait for the broken hook to result in589 # Add a new unit, and wait for the broken hook to result in
448 # the transition to the down state.590 # the transition to the down state.
449 yield self.add_opposite_service_unit(self.states)591 yield self.add_opposite_service_unit(self.states)
450 yield self.wait_on_state(self.workflow, "down")592 yield self.wait_on_state(self.workflow, "error")
451593
452 f_state, history, zk_state = yield self.read_persistent_state(594 f_state, history, zk_state = yield self.read_persistent_state(
453 history_id=self.workflow.zk_state_id)595 history_id=self.workflow.zk_state_id)
@@ -458,7 +600,7 @@
458 "formula", "hooks", "app-relation-changed"))600 "formula", "hooks", "app-relation-changed"))
459601
460 self.assertEqual(f_state,602 self.assertEqual(f_state,
461 {"state": "down",603 {"state": "error",
462 "state_variables": {604 "state_variables": {
463 "change_type": "joined",605 "change_type": "joined",
464 "error_message": error}})606 "error_message": error}})
465607
=== modified file 'ensemble/unit/workflow.py'
--- ensemble/unit/workflow.py 2011-05-05 14:31:43 +0000
+++ ensemble/unit/workflow.py 2011-05-05 17:43:23 +0000
@@ -14,31 +14,56 @@
1414
1515
16UnitWorkflow = Workflow(16UnitWorkflow = Workflow(
17 # Install transitions
17 Transition("install", "Install", None, "installed",18 Transition("install", "Install", None, "installed",
18 error_transition_id="error_install"),19 error_transition_id="error_install"),
19 Transition("error_install", "Install Error", None, "install_error"),20 Transition("error_install", "Install error", None, "install_error"),
20 Transition("retry_install", "Retry Install", "install_error", "installed"),21
22 Transition("retry_install", "Retry install", "install_error", "installed",
23 alias="retry"),
24 Transition("retry_install_hook", "Retry install with hook",
25 "install_error", "installed", alias="retry_hook"),
26
27 # Start transitions
21 Transition("start", "Start", "installed", "started",28 Transition("start", "Start", "installed", "started",
22 error_transition_id="error_start"),29 error_transition_id="error_start"),
23 Transition("error_start", "Start Error", "installed", "start_error"),30 Transition("error_start", "Start error", "installed", "start_error"),
24 Transition("retry_start", "Retry Start", "start_error", "started"),31 Transition("retry_start", "Retry start", "start_error", "started",
32 alias="retry"),
33 Transition("retry_start_hook", "Retry start with hook",
34 "start_error", "started", alias="retry_hook"),
35
36 # Stop transitions
25 Transition("stop", "Stop", "started", "stopped",37 Transition("stop", "Stop", "started", "stopped",
26 error_transition_id="error_stop"),38 error_transition_id="error_stop"),
27 Transition("error_stop", "Stop Error", "started", "stop_error"),39 Transition("error_stop", "Stop error", "started", "stop_error"),
28 Transition("retry_stop", "Retry Stop", "stop_error", "stopped"),40 Transition("retry_stop", "Retry stop", "stop_error", "stopped",
2941 alias="retry"),
30 # Upgrade Transitions (stay in state, with success transitition)42 Transition("retry_stop_hook", "Retry stop with hook",
43 "stop_error", "stopped", alias="retry_hook"),
44
45 # Restart transitions
46 Transition("restart", "Restart", "stop", "start",
47 error_transition_id="error_start", alias="retry"),
48 Transition("restart_with_hook", "Restart with hook",
49 "stop", "start", alias="retry_hook",
50 error_transition_id="error_start"),
51
52 # Upgrade transitions
31 Transition(53 Transition(
32 "upgrade_formula", "Upgrade", "started", "started",54 "upgrade_formula", "Upgrade", "started", "started",
33 error_transition_id="upgrade_formula_error"),55 error_transition_id="upgrade_formula_error"),
34 Transition(56 Transition(
35 "upgrade_formula_error", "On upgrade error",57 "upgrade_formula_error", "Upgrade from stop error",
36 "started", "formula_upgrade_error"),58 "started", "formula_upgrade_error"),
37 Transition(59 Transition(
38 "retry_upgrade_formula", "Retry failed upgrade",60 "retry_upgrade_formula", "Upgrade from stop error",
39 "formula_upgrade_error", "started"),61 "formula_upgrade_error", "started", alias="retry"),
62 Transition(
63 "retry_upgrade_formula_hook", "Upgrade from stop error with hook",
64 "formula_upgrade_error", "started", alias="retry_hook"),
4065
41 # Configuration Transitions (stay in state, with success transition)66 # Configuration Transitions
42 Transition(67 Transition(
43 "reconfigure", "Reconfigure", "started", "started",68 "reconfigure", "Reconfigure", "started", "started",
44 error_transition_id="error_configure"),69 error_transition_id="error_configure"),
@@ -46,28 +71,56 @@
46 "error_configure", "On configure error",71 "error_configure", "On configure error",
47 "started", "configure_error"),72 "started", "configure_error"),
48 Transition(73 Transition(
74 "retry_error", "On retry configure error",
75 "configure_error", "configure_error"),
76 Transition(
49 "retry_configure", "Retry configure",77 "retry_configure", "Retry configure",
50 "configure_error", "started")78 "configure_error", "started", alias="retry",
79 error_transition_id="retry_error"),
80 Transition(
81 "retry_configure_hook", "Retry configure with hooks",
82 "configure_error", "started", alias="retry_hook",
83 error_transition_id="retry_error")
51 )84 )
5285
5386
54# There's been some discussion, if we should have per change type error states87# Unit relation error states
55# here, corresponding to the different changes that the relation-changed hook88#
56# is invoked for. The important aspects to capture are both observability of89# There's been some discussion, if we should have per change type
57# error type locally and globally (zk), and per error type and instance90# error states here, corresponding to the different changes that the
58# recovery of the same. To provide for this functionality without additional91# relation-changed hook is invoked for. The important aspects to
59# states, the error information (change type, and error message) are captured92# capture are both observability of error type locally and globally
60# in state variables which are locally and globally observable. Future93# (zk), and per error type and instance recovery of the same. To
61# extension of the restart transition action, will allow for customized94# provide for this functionality without additional states, the error
62# recovery based on the change type state variable. Effectively this95# information (change type, and error message) are captured in state
63# differs from the unit definition, in that it collapses three possible error96# variables which are locally and globally observable. Future
64# states, into a behavior off switch. A separate state will be needed to97# extension of the restart transition action, will allow for
65# denote departing.98# customized recovery based on the change type state
99# variable. Effectively this differs from the unit definition, in that
100# it collapses three possible error states, into a behavior off
101# switch. A separate state will be needed to denote departing.
102
103
104# Process recovery using on disk workflow state
105#
106# Another interesting issue, process recovery using the on disk state,
107# is complicated by consistency with the in-memory state, which
108# won't be directly recoverable anymore without some state specific
109# semantics for recovering from on disk state, i.e. a restarted unit
110# agent, with a relation in an error state, would require special
111# semantics around loading from disk to ensure that the in-memory
112# process state (watching and scheduling but not executing) matches
113# the recovery transition actions (which just restart hook execution,
114# but assume the watch continues). This functionality was added to
115# better allow for the behavior that, while down due to a hook error,
116# the relation continues to schedule pending hooks.
66117
67RelationWorkflow = Workflow(118RelationWorkflow = Workflow(
68 Transition("start", "Start", None, "up"),119 Transition("start", "Start", None, "up"),
69 Transition("stop", "Stop", "up", "down"),120 Transition("stop", "Stop", "up", "down"),
70 Transition("restart", "Restart", "down", "up"),121 Transition("restart", "Restart", "down", "up", alias="retry"),
122 Transition("error", "Relation hook error", "up", "error"),
123 Transition("reset", "Recover from hook error", "error", "up"),
71 Transition("depart", "Relation broken", "up", "departed"),124 Transition("depart", "Relation broken", "up", "departed"),
72 Transition("down_depart", "Relation broken", "down", "departed"),125 Transition("down_depart", "Relation broken", "down", "departed"),
73 )126 )
@@ -223,7 +276,6 @@
223 row per entry with CSV escaping.276 row per entry with CSV escaping.
224 """277 """
225 state_serialized = yaml.safe_dump(state_dict)278 state_serialized = yaml.safe_dump(state_dict)
226
227 # State File279 # State File
228 with open(self.state_file_path, "w") as handle:280 with open(self.state_file_path, "w") as handle:
229 handle.write(state_serialized)281 handle.write(state_serialized)
@@ -243,6 +295,7 @@
243 return {"state": None}295 return {"state": None}
244 with open(self.state_file_path, "r") as handle:296 with open(self.state_file_path, "r") as handle:
245 content = handle.read()297 content = handle.read()
298
246 return yaml.load(content)299 return yaml.load(content)
247300
248301
@@ -282,51 +335,77 @@
282 self._lifecycle = lifecycle335 self._lifecycle = lifecycle
283336
284 @inlineCallbacks337 @inlineCallbacks
285 def _invoke_lifecycle(self, method):338 def _invoke_lifecycle(self, method, *args, **kw):
286 try:339 try:
287 result = yield method()340 result = yield method(*args, **kw)
288 except (FileNotFound, FormulaError, FormulaInvocationError), e:341 except (FileNotFound, FormulaError, FormulaInvocationError), e:
289 raise TransitionError(e)342 raise TransitionError(e)
290 returnValue(result)343 returnValue(result)
291344
292 # Transition Actions345 # Install transitions
293 def do_install(self):346 def do_install(self):
294 return self._invoke_lifecycle(self._lifecycle.install)347 return self._invoke_lifecycle(self._lifecycle.install)
295348
349 def do_retry_install(self):
350 return self._invoke_lifecycle(self._lifecycle.install,
351 fire_hooks=False)
352
353 def do_retry_install_hook(self):
354 return self._invoke_lifecycle(self._lifecycle.install)
355
356 # Start transitions
296 def do_start(self):357 def do_start(self):
297 return self._invoke_lifecycle(self._lifecycle.start)358 return self._invoke_lifecycle(self._lifecycle.start)
298359
299 def do_stop(self):
300 return self._invoke_lifecycle(self._lifecycle.stop)
301
302 def do_retry_start(self):360 def do_retry_start(self):
361 return self._invoke_lifecycle(self._lifecycle.start,
362 fire_hooks=False)
363
364 def do_retry_start_hook(self):
303 return self._invoke_lifecycle(self._lifecycle.start)365 return self._invoke_lifecycle(self._lifecycle.start)
304366
367 # Stop transitions
368 def do_stop(self):
369 return self._invoke_lifecycle(self._lifecycle.stop)
370
305 def do_retry_stop(self):371 def do_retry_stop(self):
306 self._invoke_lifecycle(self._lifecycle.stop)372 return self._invoke_lifecycle(self._lifecycle.stop,
307373 fire_hooks=False)
308 def do_retry_install(self):374
309 return self._invoke_lifecycle(self._lifecycle.install)375 def do_retry_stop_hook(self):
376 return self._invoke_lifecycle(self._lifecycle.stop)
377
378 # Upgrade transitions
379 def do_upgrade_formula(self):
380 return self._invoke_lifecycle(self._lifecycle.upgrade_formula)
310381
311 def do_retry_upgrade_formula(self):382 def do_retry_upgrade_formula(self):
312 return self._invoke_lifecycle(self._lifecycle.upgrade_formula)383 return self._invoke_lifecycle(self._lifecycle.upgrade_formula,
313384 fire_hooks=False)
314 def do_upgrade_formula(self):385
315 return self._invoke_lifecycle(self._lifecycle.upgrade_formula)386 def do_retry_upgrade_formula_hook(self):
316387 return self._invoke_lifecycle(self._lifecycle.upgrade_formula)
317 # Some of this needs support from the resolved branches, as we388
318 # want to fire some of these lifecycle methods sans hooks.389 # Config transitions
319 def do_error_configure(self):390 def do_error_configure(self):
320 # self._invoke_lifecycle(self._lifecycle.stop, fire_hooks=False)391 return self._invoke_lifecycle(self._lifecycle.stop, fire_hooks=False)
321 pass
322392
323 def do_reconfigure(self):393 def do_reconfigure(self):
324 return self._invoke_lifecycle(self._lifecycle.configure)394 return self._invoke_lifecycle(self._lifecycle.configure)
325395
396 def do_retry_error(self):
397 return self._invoke_lifecycle(self._lifecycle.stop, fire_hooks=False)
398
399 @inlineCallbacks
326 def do_retry_configure(self):400 def do_retry_configure(self):
327 # self._invoke_lifecycle(self._lifecycle.start, fire_hooks=False)401 yield self._invoke_lifecycle(self._lifecycle.start, fire_hooks=False)
328 self._invoke_lifecycle(402 yield self._invoke_lifecycle(self._lifecycle.configure,
329 self._lifecycle.configure) # fire_hooks=False)403 fire_hooks=False)
404
405 @inlineCallbacks
406 def do_retry_configure_hook(self):
407 yield self._invoke_lifecycle(self._lifecycle.start, fire_hooks=False)
408 yield self._invoke_lifecycle(self._lifecycle.configure)
330409
331410
332class RelationWorkflowState(DiskWorkflowState):411class RelationWorkflowState(DiskWorkflowState):
@@ -360,7 +439,7 @@
360439
361 @param: error: The error from hook invocation.440 @param: error: The error from hook invocation.
362 """441 """
363 yield self.fire_transition("stop",442 yield self.fire_transition("error",
364 change_type=relation_change.change_type,443 change_type=relation_change.change_type,
365 error_message=str(error))444 error_message=str(error))
366445
@@ -369,12 +448,30 @@
369 """Transition the workflow to the 'down' state.448 """Transition the workflow to the 'down' state.
370449
371 Turns off the unit-relation lifecycle monitoring and hook execution.450 Turns off the unit-relation lifecycle monitoring and hook execution.
451
452 :param error_info: If called on relation hook error, contains
453 error variables.
372 """454 """
373 yield self._lifecycle.stop()455 yield self._lifecycle.stop()
374456
375 @inlineCallbacks457 @inlineCallbacks
458 def do_reset(self):
459 """Transition the workflow to the 'up' state from an error state.
460
461 Turns on the unit-relation lifecycle monitoring and hook execution.
462 """
463 yield self._lifecycle.start(watches=False)
464
465 @inlineCallbacks
466 def do_error(self, **error_info):
467 """A relation hook error, stops further execution hooks but
468 continues to watch for changes.
469 """
470 yield self._lifecycle.stop(watches=False)
471
472 @inlineCallbacks
376 def do_restart(self):473 def do_restart(self):
377 """Transition the workflow to the 'up' state.474 """Transition the workflow to the 'up' state from the down state.
378475
379 Turns on the unit-relation lifecycle monitoring and hook execution.476 Turns on the unit-relation lifecycle monitoring and hook execution.
380 """477 """
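
The retry transitions in the diff above come in pairs: a plain retry that re-enters the state without running the hook (fire_hooks=False), and a "_hook" variant that re-executes it. A minimal sketch of that pattern follows. It is synchronous for brevity, whereas the branch uses Twisted's inlineCallbacks. SketchLifecycle and SketchWorkflow are hypothetical stand-ins, not the ensemble classes, and the pass-through behaviour of _invoke_lifecycle is an assumption.

```python
# Hypothetical stand-ins for illustration; not the ensemble classes in the diff.
class SketchLifecycle:
    def __init__(self):
        # Records (method name, fire_hooks) so the call order can be inspected.
        self.calls = []

    def start(self, fire_hooks=True):
        self.calls.append(("start", fire_hooks))

    def configure(self, fire_hooks=True):
        self.calls.append(("configure", fire_hooks))


class SketchWorkflow:
    def __init__(self, lifecycle):
        self._lifecycle = lifecycle

    def _invoke_lifecycle(self, method, *args, **kw):
        # Assumption: the real helper does more than this (for example,
        # error handling); here it only forwards the call.
        return method(*args, **kw)

    def do_retry_configure(self):
        # Plain retry: re-enter the state, skipping both hooks.
        self._invoke_lifecycle(self._lifecycle.start, fire_hooks=False)
        self._invoke_lifecycle(self._lifecycle.configure, fire_hooks=False)

    def do_retry_configure_hook(self):
        # Hook retry: skip the start hook, but re-run the configure hook.
        self._invoke_lifecycle(self._lifecycle.start, fire_hooks=False)
        self._invoke_lifecycle(self._lifecycle.configure)


if __name__ == "__main__":
    lifecycle = SketchLifecycle()
    SketchWorkflow(lifecycle).do_retry_configure_hook()
    print(lifecycle.calls)
```

Keeping the flag confined to the action methods means each transition name still maps to one fixed behaviour, which is one way to address the concern raised in review about parameterised transitions.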
