Merge lp:~hazmat/pyjuju/lp-616504-provision-this into lp:pyjuju
- lp-616504-provision-this
- Merge into trunk
Proposed by
Kapil Thangavelu
Status: Merged
Approved by: Gustavo Niemeyer
Approved revision: 90
Merged at revision: 72
Proposed branch: lp:~hazmat/pyjuju/lp-616504-provision-this
Merge into: lp:pyjuju
Diff against target: |
1087 lines (+719/-70) (has conflicts) 16 files modified
ensemble/agents/base.py (+11/-2) ensemble/agents/provision.py (+137/-32) ensemble/agents/tests/test_base.py (+6/-0) ensemble/agents/tests/test_provision.py (+301/-27) ensemble/errors.py (+10/-0) ensemble/ftests/test_ec2_provider.py (+1/-1) ensemble/lib/testing.py (+3/-0) ensemble/lib/tests/test_twistutils.py (+96/-0) ensemble/lib/twistutils.py (+31/-0) ensemble/providers/common.py (+35/-0) ensemble/providers/ec2/__init__.py (+20/-4) ensemble/providers/ec2/launch.py (+5/-0) ensemble/providers/ec2/tests/test_launch.py (+6/-2) ensemble/providers/tests/test_common.py (+47/-0) ensemble/state/tests/common.py (+1/-1) ensemble/tests/test_errors.py (+9/-1) Text conflict in ensemble/providers/common.py Text conflict in ensemble/providers/ec2/__init__.py Text conflict in ensemble/providers/tests/test_common.py |
||||
To merge this branch: | bzr merge lp:~hazmat/pyjuju/lp-616504-provision-this | ||||
Related bugs: |
|
Reviewer | Review Type | Date Requested | Status
---|---|---|---
Gustavo Niemeyer | | | Approve
Review via email: mp+35709@code.launchpad.net |
Commit message
Description of the change
Provisioning Agent Implementation!!
To post a comment you must log in.
- 90. By Kapil Thangavelu
-
enable test for removed machine state
Revision history for this message
Gustavo Niemeyer (niemeyer) wrote : | # |
As per the conversation:
[1]
In the concurrent_execution_guard test (test_guard_multiple_execution):
+ def validate_results(results):
+     success, value = results[0]
+     self.assertTrue(success)
+     self.assertEqual(value, 1)
+
+     success, value = results[1]
+     self.assertTrue(success)
+     self.assertEqual(value, False)
- 91. By Kapil Thangavelu
-
address review comment, and guard execution value had skipped the concurrent call
Revision history for this message
Kapil Thangavelu (hazmat) wrote : | # |
thanks, addressed [1], and merged.
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === modified file 'bin/ensemble-admin' (properties changed: -x to +x) |
2 | === modified file 'ensemble/agents/base.py' |
3 | --- ensemble/agents/base.py 2010-09-14 20:03:06 +0000 |
4 | +++ ensemble/agents/base.py 2010-09-17 14:41:43 +0000 |
5 | @@ -44,6 +44,9 @@ |
6 | name = "ensemble-agent-unknown" |
7 | client = None |
8 | |
9 | + # Flag when enabling persistent topology watches, testing aid. |
10 | + _watch_enabled = True |
11 | + |
12 | @classmethod |
13 | def run(cls): |
14 | """Runs the agent as a unix daemon. |
15 | @@ -130,8 +133,8 @@ |
16 | # conventions. |
17 | @inlineCallbacks |
18 | def startService(self): |
19 | - self.client = yield self.connect() |
20 | - self.start() |
21 | + yield self.connect() |
22 | + yield self.start() |
23 | |
24 | @inlineCallbacks |
25 | def stopService(self): |
26 | @@ -141,6 +144,12 @@ |
27 | if self.client and self.client.connected: |
28 | self.client.close() |
29 | |
30 | + def set_watch_enabled(self, flag): |
31 | + self._watch_enabled = bool(flag) |
32 | + |
33 | + def get_watch_enabled(self): |
34 | + return self._watch_enabled |
35 | + |
36 | |
37 | def setup_default_agent_options(parser, cls): |
38 | |
39 | |
40 | === modified file 'ensemble/agents/provision.py' |
41 | --- ensemble/agents/provision.py 2010-09-14 20:03:06 +0000 |
42 | +++ ensemble/agents/provision.py 2010-09-17 14:41:43 +0000 |
43 | @@ -3,8 +3,12 @@ |
44 | from twisted.internet.defer import inlineCallbacks, returnValue |
45 | |
46 | from ensemble.environment.config import EnvironmentsConfig |
47 | +from ensemble.errors import ProviderInteractionError |
48 | +from ensemble.lib.twistutils import concurrent_execution_guard |
49 | from ensemble.state.machine import MachineStateManager |
50 | |
51 | +from ensemble.state.errors import MachineStateNotFound, StateChanged |
52 | + |
53 | from .base import BaseAgent |
54 | |
55 | |
56 | @@ -12,16 +16,25 @@ |
57 | |
58 | name = "ensemble-provisoning-agent" |
59 | |
60 | - # @inlineCallbacks |
61 | - # def start(self): |
62 | - # self.environment = yield self.startup_wait_for_environment() |
63 | - # self.provider = self.environment.get_machine_provider() |
64 | - # self.state_manager = MachineStateManager(self.client) |
65 | - # self.state_manager.watch_machine_changes(self.watch_machine_changes) |
66 | - |
67 | - @inlineCallbacks |
68 | - def startup_wait_for_environment(self): |
69 | - """On startup the provisioning agent needs to wait for its environment. |
70 | + _current_machines = () |
71 | + _machine_watch_enabled = True |
72 | + |
73 | + machine_check_period = 60 # time in seconds |
74 | + |
75 | + @inlineCallbacks |
76 | + def start(self): |
77 | + self.environment = yield self.configure_environment() |
78 | + self.provider = self.environment.get_machine_provider() |
79 | + self.state_manager = MachineStateManager(self.client) |
80 | + if self.get_watch_enabled(): |
81 | + self.state_manager.watch_machine_states(self.watch_machine_changes) |
82 | + from twisted.internet import reactor |
83 | + reactor.callLater( |
84 | + self.machine_check_period, self.periodic_machine_check) |
85 | + |
86 | + @inlineCallbacks |
87 | + def configure_environment(self): |
88 | + """The provisioning agent configure its environment on start or change. |
89 | |
90 | The environment contains the configuration th agent needs to interact |
91 | with its machine provider, in order to do its work. This configuration |
92 | @@ -30,29 +43,59 @@ |
93 | The agent waits for this data to exist before completing its startup. |
94 | """ |
95 | try: |
96 | - environment_data, stat = yield self.client.get("/environment") |
97 | + get_d, watch_d = self.client.get_and_watch("/environment") |
98 | + environment_data, stat = yield get_d |
99 | + watch_d.addCallback(self._on_environment_changed) |
100 | except NoNodeException: |
101 | # Wait till the environment node appears. play twisted gymnastics |
102 | exists_d, watch_d = self.client.exists_and_watch("/environment") |
103 | stat = yield exists_d |
104 | if stat: |
105 | - environment = yield self.startup_wait_for_environment() |
106 | - returnValue(environment) |
107 | + environment = yield self.configure_environment() |
108 | else: |
109 | watch_d.addCallback( |
110 | - lambda result: self.startup_wait_for_environment()) |
111 | + lambda result: self.configure_environment()) |
112 | + if not stat: |
113 | environment = yield watch_d |
114 | - returnValue(environment) |
115 | + returnValue(environment) |
116 | |
117 | config = EnvironmentsConfig() |
118 | config.parse(environment_data) |
119 | returnValue(config.get_default()) |
120 | |
121 | - def watch_environment_changes(self, event): |
122 | + @inlineCallbacks |
123 | + def _on_environment_changed(self, event): |
124 | """Reload the environment if its data changes.""" |
125 | |
126 | + if event.type_name == "deleted": |
127 | + return |
128 | + |
129 | + self.environment = yield self.configure_environment() |
130 | + self.provider = self.environment.get_machine_provider() |
131 | + |
132 | + def periodic_machine_check(self): |
133 | + """A periodic checking of machine states and provider machines. |
134 | + |
135 | + In addition to the on demand changes to zookeeper states that are |
136 | + monitored by L{watch_machine_changes}, the periodic machine check |
137 | + performs non zookeeper state related verification by periodically |
138 | + checking the last current provider machine states against the |
139 | + last known zookeeper state. |
140 | + |
141 | + Primarily this helps in recovering from transient error conditions |
142 | + which may have prevent processing of an individual machine state, as |
143 | + well as verifying the current state of the provider's running machines |
144 | + against the zk state, thus pruning unused resources. |
145 | + """ |
146 | + from twisted.internet import reactor |
147 | + d = self.process_machines(self._current_machines) |
148 | + d.addBoth( |
149 | + lambda result: reactor.callLater( |
150 | + self.machine_check_period, self.periodic_machine_check)) |
151 | + return d |
152 | + |
153 | def watch_machine_changes(self, old_machines, new_machines): |
154 | - """Ensure the currently running machines correspond to state. |
155 | + """Watches and processes machine state changes. |
156 | |
157 | This function is used to subscribe to topology changes, and |
158 | specifically changes to machines within the topology. It performs |
159 | @@ -64,21 +107,83 @@ |
160 | function will automatically be rescheduled to run whenever a topology |
161 | state change happens that involves machines. |
162 | |
163 | + This functional also caches the current set of machines as an agent |
164 | + instance attribute. |
165 | + |
166 | + @param old_machines machine ids as existed in the previous topology. |
167 | + @param new_machines machine ids as exist in the current topology. |
168 | + """ |
169 | + self._current_machines = new_machines |
170 | + return self.process_machines(self._current_machines) |
171 | + |
172 | + @concurrent_execution_guard("_processing_machines") |
173 | + @inlineCallbacks |
174 | + def process_machines(self, current_machines): |
175 | + """Ensure the currently running machines correspond to state. |
176 | + |
177 | + At the end of each process_machines execution, verify that all |
178 | + running machines within the provider correspond to machine_ids within |
179 | + the topology. If they don't then shut them down. |
180 | + |
181 | + Utilizes concurrent execution guard, to ensure that this is only being |
182 | + executed at most once per process. |
183 | + """ |
184 | + # XXX this is obviously broken, but the margins of 80 columns prevent |
185 | + # me from describing. hint think concurrent agents, and use a lock. |
186 | + |
187 | + # map of instance_id -> machine |
188 | + try: |
189 | + provider_machines = yield self.provider.list_machines() |
190 | + except ProviderInteractionError: |
191 | + # XXX log me |
192 | + return |
193 | + |
194 | + provider_machines = dict( |
195 | + [(m.instance_id, m) for m in provider_machines]) |
196 | + |
197 | + instance_ids = [] |
198 | + for machine_state_id in current_machines: |
199 | + try: |
200 | + instance_id = yield self.process_machine( |
201 | + machine_state_id, provider_machines) |
202 | + except (StateChanged, |
203 | + MachineStateNotFound, |
204 | + ProviderInteractionError): |
205 | + # XXX log me |
206 | + continue |
207 | + instance_ids.append(instance_id) |
208 | + |
209 | + # Terminate all unused ensemble machines running within the cluster. |
210 | + unused = set(provider_machines.keys()) - set(instance_ids) |
211 | + for instance_id in unused: |
212 | + machine = provider_machines[instance_id] |
213 | + try: |
214 | + yield self.provider.shutdown_machine(machine) |
215 | + except ProviderInteractionError: |
216 | + # XXX log me |
217 | + continue |
218 | + |
219 | + @inlineCallbacks |
220 | + def process_machine(self, machine_state_id, provider_machine_map): |
221 | + """Ensure a provider machine for a machine state id. |
222 | + |
223 | For each machine_id in new machines which represents the current state |
224 | of the topology, check to ensure its state reflects that it has been |
225 | launched. If it hasn't then create the machine and update the state. |
226 | - |
227 | - At the end of each watch_machine_change execution, verify that all |
228 | - running machines within the provider correspond to machine_ids within |
229 | - the topology. If they don't then shut them down. |
230 | - |
231 | - @param old_machines machine ids as existed in the previous topology. |
232 | - @param new_machines machine ids as exist in the current topology. |
233 | - """ |
234 | - |
235 | - def process_machine(self, machine_id): |
236 | - """Verify a machine id has state and is running, else launch it.""" |
237 | - |
238 | - def terminate_unused(self, machine_states): |
239 | - """Terminate all unused ensemble machines running within the cluster. |
240 | - """ |
241 | + """ |
242 | + # fetch the machine state |
243 | + machine_state = yield self.state_manager.get_machine_state( |
244 | + machine_state_id) |
245 | + instance_id = yield machine_state.get_instance_id() |
246 | + |
247 | + # Verify a machine id has state and is running, else launch it. |
248 | + if instance_id is None or not instance_id in provider_machine_map: |
249 | + machines = yield self.provider.start_machine( |
250 | + {"machine_id": machine_state.id}) |
251 | + instance_id = machines[0].instance_id |
252 | + yield machine_state.set_instance_id(instance_id) |
253 | + |
254 | + returnValue(instance_id) |
255 | + |
256 | +if __name__ == '__main__': |
257 | + ProvisioningAgent().run() |
258 | |
259 | === modified file 'ensemble/agents/tests/test_base.py' |
260 | --- ensemble/agents/tests/test_base.py 2010-09-14 19:37:16 +0000 |
261 | +++ ensemble/agents/tests/test_base.py 2010-09-17 14:41:43 +0000 |
262 | @@ -272,3 +272,9 @@ |
263 | agent = BaseAgent() |
264 | self.assertRaises( |
265 | NoConnection, agent.configure, {"zookeeper_servers": None}) |
266 | + |
267 | + def test_watch_enabled_accessors(self): |
268 | + agent = BaseAgent() |
269 | + self.assertTrue(agent.get_watch_enabled()) |
270 | + agent.set_watch_enabled(False) |
271 | + self.assertFalse(agent.get_watch_enabled()) |
272 | |
273 | === modified file 'ensemble/agents/tests/test_provision.py' |
274 | --- ensemble/agents/tests/test_provision.py 2010-09-14 20:03:06 +0000 |
275 | +++ ensemble/agents/tests/test_provision.py 2010-09-17 14:41:43 +0000 |
276 | @@ -1,10 +1,10 @@ |
277 | import zookeeper |
278 | |
279 | -from twisted.internet.defer import inlineCallbacks |
280 | +from twisted.internet.defer import inlineCallbacks, fail, succeed |
281 | from twisted.internet import reactor |
282 | |
283 | -from txzookeeper import ZookeeperClient |
284 | from txzookeeper.tests.utils import deleteTree |
285 | +from txzookeeper.client import ZOO_OPEN_ACL_UNSAFE |
286 | |
287 | from ensemble.agents.provision import ProvisioningAgent |
288 | from ensemble.agents.base import TwistedOptionNamespace |
289 | @@ -13,22 +13,24 @@ |
290 | from ensemble.environment.errors import EnvironmentsConfigError |
291 | from ensemble.environment.tests.test_config import SAMPLE_ENV |
292 | |
293 | -from ensemble.lib.testing import TestCase |
294 | - |
295 | - |
296 | -class ProvisioningAgentStartupTest(TestCase): |
297 | +from ensemble.errors import ProviderInteractionError |
298 | +from ensemble.lib.mocker import MATCH |
299 | +from ensemble.providers.dummy import DummyMachine |
300 | +from ensemble.state.machine import MachineStateManager |
301 | +from ensemble.state.tests.common import StateTestBase |
302 | + |
303 | +MATCH_MACHINE = MATCH(lambda x: isinstance(x, DummyMachine)) |
304 | + |
305 | + |
306 | +class ProvisioningTestBase(StateTestBase): |
307 | |
308 | @inlineCallbacks |
309 | def setUp(self): |
310 | - zookeeper.set_debug_level(0) |
311 | - self.client = ZookeeperClient() |
312 | - yield self.client.connect("127.0.0.1:2181") |
313 | + yield super(ProvisioningTestBase, self).setUp() |
314 | self.options = TwistedOptionNamespace() |
315 | self.options["zookeeper_servers"] = "127.0.0.1:2181" |
316 | - |
317 | self.agent = ProvisioningAgent() |
318 | self.agent.configure(self.options) |
319 | - yield self.agent.connect() |
320 | |
321 | def tearDown(self): |
322 | if self.agent.client and self.agent.client.connected: |
323 | @@ -43,13 +45,21 @@ |
324 | config.parse(SAMPLE_ENV) |
325 | return config.serialize("myfirstenv") |
326 | |
327 | + |
328 | +class ProvisioningAgentStartupTest(ProvisioningTestBase): |
329 | + |
330 | + @inlineCallbacks |
331 | + def setUp(self): |
332 | + yield super(ProvisioningAgentStartupTest, self).setUp() |
333 | + yield self.agent.connect() |
334 | + |
335 | def test_agent_waits_for_environment(self): |
336 | """ |
337 | When the agent starts it waits for the /environment node to exist. |
338 | As soon as it does, the agent will fetch the environment, and |
339 | deserialize it into an environment object. |
340 | """ |
341 | - env_loaded_deferred = self.agent.startup_wait_for_environment() |
342 | + env_loaded_deferred = self.agent.configure_environment() |
343 | |
344 | def verify_environment(result): |
345 | self.assertTrue(isinstance(result, Environment)) |
346 | @@ -76,27 +86,291 @@ |
347 | self.assertTrue(isinstance(result, Environment)) |
348 | self.assertEqual(result.name, "myfirstenv") |
349 | |
350 | - d = self.agent.startup_wait_for_environment() |
351 | + d = self.agent.configure_environment() |
352 | d.addCallback(verify_environment) |
353 | yield d |
354 | |
355 | @inlineCallbacks |
356 | def test_agent_with_invalid_environment(self): |
357 | yield self.client.create("/environment", "WAHOO!") |
358 | - d = self.agent.startup_wait_for_environment() |
359 | + d = self.agent.configure_environment() |
360 | yield self.assertFailure(d, EnvironmentsConfigError) |
361 | |
362 | - |
363 | -class ProvisoningAgentTest(TestCase): |
364 | - |
365 | + def test_agent_with_nonexistent_environment_created_concurrently(self): |
366 | + """ |
367 | + If the environment node does not initially exist but it is created |
368 | + while the agent is processing the NoNodeException, it should detect |
369 | + this and configure normally. |
370 | + """ |
371 | + data = self.get_serialized_environment() |
372 | + exists_and_watch = self.agent.client.exists_and_watch |
373 | + |
374 | + mock_client = self.mocker.patch(self.agent.client) |
375 | + mock_client.exists_and_watch("/environment") |
376 | + |
377 | + def inject_creation(path): |
378 | + zookeeper.create( |
379 | + self.agent.client.handle, path, data, [ZOO_OPEN_ACL_UNSAFE]) |
380 | + return exists_and_watch(path) |
381 | + |
382 | + self.mocker.call(inject_creation) |
383 | + self.mocker.replay() |
384 | + |
385 | + def verify_configured(result): |
386 | + self.assertTrue(isinstance(result, Environment)) |
387 | + self.assertEqual(result.type, "dummy") |
388 | + # mocker magic test |
389 | + d = self.agent.configure_environment() |
390 | + d.addCallback(verify_configured) |
391 | + return d |
392 | + |
393 | + |
394 | +class ProvisioningAgentTest(ProvisioningTestBase): |
395 | + |
396 | + @inlineCallbacks |
397 | def setUp(self): |
398 | - pass |
399 | - |
400 | - def test_process_machine_id(self): |
401 | - pass |
402 | - |
403 | - def test_terminate_unusued_doesnt_touch_used(self): |
404 | - pass |
405 | - |
406 | - def test_terminate_unusued(self): |
407 | - pass |
408 | + yield super(ProvisioningAgentTest, self).setUp() |
409 | + yield self.client.create( |
410 | + "/environment", self.get_serialized_environment()) |
411 | + self.agent.set_watch_enabled(False) |
412 | + yield self.agent.startService() |
413 | + |
414 | + @inlineCallbacks |
415 | + def test_watch_machine_changes_processes_new_machine_id(self): |
416 | + """The agent should process a new machine id by creating it""" |
417 | + manager = MachineStateManager(self.client) |
418 | + machine_state0 = yield manager.add_machine_state() |
419 | + machine_state1 = yield manager.add_machine_state() |
420 | + |
421 | + yield self.agent.watch_machine_changes( |
422 | + None, [machine_state0.id, machine_state1.id]) |
423 | + |
424 | + machines = yield self.agent.provider.list_machines() |
425 | + self.assertEquals(len(machines), 2) |
426 | + |
427 | + instance_id = yield machine_state0.get_instance_id() |
428 | + self.assertEqual(instance_id, 0) |
429 | + |
430 | + instance_id = yield machine_state1.get_instance_id() |
431 | + self.assertEqual(instance_id, 1) |
432 | + |
433 | + @inlineCallbacks |
434 | + def test_watch_machine_changes_ignores_running_machine(self): |
435 | + manager = MachineStateManager(self.client) |
436 | + machine_state0 = yield manager.add_machine_state() |
437 | + yield self.agent.provider.start_machine() |
438 | + yield machine_state0.set_instance_id(0) |
439 | + |
440 | + machine_state1 = yield manager.add_machine_state() |
441 | + |
442 | + yield self.agent.watch_machine_changes( |
443 | + None, [machine_state0.id, machine_state1.id]) |
444 | + |
445 | + machines = yield self.agent.provider.list_machines() |
446 | + self.assertEquals(len(machines), 2) |
447 | + |
448 | + instance_id = yield machine_state1.get_instance_id() |
449 | + self.assertEqual(instance_id, 1) |
450 | + |
451 | + @inlineCallbacks |
452 | + def test_watch_machine_changes_terminates_unused(self): |
453 | + # start an unused machine within the dummy provider instance |
454 | + yield self.agent.provider.start_machine() |
455 | + yield self.agent.watch_machine_changes(None, []) |
456 | + |
457 | + machines = yield self.agent.provider.list_machines() |
458 | + self.assertFalse(machines) |
459 | + |
460 | + @inlineCallbacks |
461 | + def test_new_machine_state_removed_while_processing(self): |
462 | + """ |
463 | + If the machine state is removed while the event is processing the |
464 | + state, the watch function should process it normally. |
465 | + """ |
466 | + yield self.agent.watch_machine_changes( |
467 | + None, [0]) |
468 | + machines = yield self.agent.provider.list_machines() |
469 | + self.assertEquals(len(machines), 0) |
470 | + |
471 | + @inlineCallbacks |
472 | + def test_process_machines_non_concurrency(self): |
473 | + """ |
474 | + Process machines should only be executed serially by an |
475 | + agent. |
476 | + """ |
477 | + manager = MachineStateManager(self.client) |
478 | + machine_state0 = yield manager.add_machine_state() |
479 | + machine_state1 = yield manager.add_machine_state() |
480 | + |
481 | + call_1 = self.agent.process_machines([machine_state0.id]) |
482 | + |
483 | + # The second call should return immediately due to the |
484 | + # instance attribute guard. |
485 | + call_2 = self.agent.process_machines([machine_state1.id]) |
486 | + self.assertEqual(call_2.called, True) |
487 | + self.assertEqual(call_2.result, False) |
488 | + |
489 | + # The first call should have started a provider machine |
490 | + yield call_1 |
491 | + |
492 | + machines = yield self.agent.provider.list_machines() |
493 | + self.assertEquals(len(machines), 1) |
494 | + |
495 | + instance_id_0 = yield machine_state0.get_instance_id() |
496 | + self.assertEqual(instance_id_0, 0) |
497 | + |
498 | + instance_id_1 = yield machine_state1.get_instance_id() |
499 | + self.assertEqual(instance_id_1, None) |
500 | + |
501 | + def test_new_machine_state_removed_while_processing_get_provider_id(self): |
502 | + """ |
503 | + If the machine state is removed while the event is processing the |
504 | + state, the watch function should process it normally. |
505 | + """ |
506 | + yield self.agent.watch_machine_changes( |
507 | + None, [0]) |
508 | + machines = yield self.agent.provider.list_machines() |
509 | + self.assertEquals(len(machines), 0) |
510 | + |
511 | + @inlineCallbacks |
512 | + def test_on_environment_change_agent_reconfigures(self): |
513 | + """ |
514 | + If the environment changes the agent reconfigures itself |
515 | + """ |
516 | + provider = self.agent.provider |
517 | + data = self.get_serialized_environment() |
518 | + yield self.client.set("/environment", data) |
519 | + yield self.sleep(0.2) |
520 | + self.assertNotIdentical(provider, self.agent.provider) |
521 | + |
522 | + @inlineCallbacks |
523 | + def test_machine_state_reflects_invalid_provider_state(self): |
524 | + """ |
525 | + If a machine state has an invalid instance_id, it should be detected, |
526 | + and a new machine started and the machine state updated with the |
527 | + new instance_id. |
528 | + """ |
529 | + machine_manager = MachineStateManager(self.client) |
530 | + m1 = yield machine_manager.add_machine_state() |
531 | + yield m1.set_instance_id("zebra") |
532 | + |
533 | + m2 = yield machine_manager.add_machine_state() |
534 | + yield self.agent.watch_machine_changes(None, [m1.id, m2.id]) |
535 | + |
536 | + m1_instance_id = yield m1.get_instance_id() |
537 | + self.assertEqual(m1_instance_id, 0) |
538 | + |
539 | + m2_instance_id = yield m2.get_instance_id() |
540 | + self.assertEqual(m2_instance_id, 1) |
541 | + |
542 | + def test_periodic_task(self): |
543 | + """ |
544 | + The agent schedules period checks that execute the process machines |
545 | + call. |
546 | + """ |
547 | + mock_reactor = self.mocker.patch(reactor) |
548 | + mock_reactor.callLater(self.agent.machine_check_period, |
549 | + self.agent.periodic_machine_check) |
550 | + mock_agent = self.mocker.patch(self.agent) |
551 | + mock_agent.process_machines(()) |
552 | + self.mocker.result(succeed(None)) |
553 | + self.mocker.replay() |
554 | + |
555 | + # mocker magic test |
556 | + self.agent.periodic_machine_check() |
557 | + |
558 | + @inlineCallbacks |
559 | + def test_transient_provider_error_on_start_machine(self): |
560 | + """ |
561 | + If there's an error when processing changes, the agent should log |
562 | + the error and continue. |
563 | + """ |
564 | + manager = MachineStateManager(self.client) |
565 | + machine_state0 = yield manager.add_machine_state() |
566 | + machine_state1 = yield manager.add_machine_state() |
567 | + |
568 | + mock_provider = self.mocker.patch(self.agent.provider) |
569 | + mock_provider.start_machine({"machine_id": 0}) |
570 | + self.mocker.result(fail(ProviderInteractionError(OSError("Bad")))) |
571 | + |
572 | + mock_provider.start_machine({"machine_id": 1}) |
573 | + self.mocker.passthrough() |
574 | + self.mocker.replay() |
575 | + |
576 | + yield self.agent.watch_machine_changes( |
577 | + [], [machine_state0.id, machine_state1.id]) |
578 | + |
579 | + machine1_instance_id = yield machine_state1.get_instance_id() |
580 | + self.assertEqual(machine1_instance_id, 0) |
581 | + |
582 | + @inlineCallbacks |
583 | + def test_transient_provider_error_on_shutdown_machine(self): |
584 | + |
585 | + yield self.agent.provider.start_machine() |
586 | + mock_provider = self.mocker.patch(self.agent.provider) |
587 | + |
588 | + mock_provider.shutdown_machine(MATCH_MACHINE) |
589 | + self.mocker.result(fail(ProviderInteractionError("Bad"))) |
590 | + |
591 | + mock_provider.shutdown_machine(MATCH_MACHINE) |
592 | + self.mocker.passthrough() |
593 | + |
594 | + self.mocker.replay() |
595 | + try: |
596 | + yield self.agent.process_machines([]) |
597 | + except: |
598 | + self.fail("Should not raise") |
599 | + |
600 | + machines = yield self.agent.provider.list_machines() |
601 | + self.assertTrue(machines) |
602 | + |
603 | + yield self.agent.process_machines([]) |
604 | + machines = yield self.agent.provider.list_machines() |
605 | + self.assertFalse(machines) |
606 | + |
607 | + @inlineCallbacks |
608 | + def test_transient_provider_error_on_list_machines(self): |
609 | + manager = MachineStateManager(self.client) |
610 | + machine_state0 = yield manager.add_machine_state() |
611 | + |
612 | + mock_provider = self.mocker.patch(self.agent.provider) |
613 | + mock_provider.list_machines() |
614 | + self.mocker.result(fail(ProviderInteractionError("Bad"))) |
615 | + |
616 | + mock_provider.list_machines() |
617 | + self.mocker.passthrough() |
618 | + |
619 | + self.mocker.replay() |
620 | + try: |
621 | + yield self.agent.process_machines([machine_state0.id]) |
622 | + except: |
623 | + self.fail("Should not raise") |
624 | + |
625 | + instance_id = yield machine_state0.get_instance_id() |
626 | + self.assertEqual(instance_id, None) |
627 | + |
628 | + yield self.agent.process_machines( |
629 | + [machine_state0.id]) |
630 | + |
631 | + instance_id = yield machine_state0.get_instance_id() |
632 | + self.assertEqual(instance_id, 0) |
633 | + |
634 | + @inlineCallbacks |
635 | + def test_start_agent_with_watch(self): |
636 | + mock_reactor = self.mocker.patch(reactor) |
637 | + mock_reactor.callLater( |
638 | + self.agent.machine_check_period, |
639 | + self.agent.periodic_machine_check) |
640 | + self.mocker.replay() |
641 | + |
642 | + self.agent.set_watch_enabled(True) |
643 | + yield self.agent.start() |
644 | + |
645 | + manager = MachineStateManager(self.client) |
646 | + machine_state0 = yield manager.add_machine_state() |
647 | + |
648 | + # the watch invocation happens out of band, sleep for |
649 | + # a moment so it has a chance to run. |
650 | + yield self.sleep(0.1) |
651 | + instance_id = yield machine_state0.get_instance_id() |
652 | + self.assertEqual(instance_id, 0) |
653 | |
654 | === modified file 'ensemble/errors.py' |
655 | --- ensemble/errors.py 2010-08-27 20:48:57 +0000 |
656 | +++ ensemble/errors.py 2010-09-17 14:41:43 +0000 |
657 | @@ -74,3 +74,13 @@ |
658 | |
659 | class ProviderError(EnsembleError): |
660 | """Raised when an exception occurs in a provider.""" |
661 | + |
662 | + |
663 | +class ProviderInteractionError(ProviderError): |
664 | + |
665 | + def __init__(self, error): |
666 | + self.error = error |
667 | + |
668 | + def __str__(self): |
669 | + return "ProviderError: Interaction with machine provider failed: %r" \ |
670 | + % self.error |
671 | |
672 | === modified file 'ensemble/ftests/test_ec2_provider.py' |
673 | --- ensemble/ftests/test_ec2_provider.py 2010-09-08 17:03:26 +0000 |
674 | +++ ensemble/ftests/test_ec2_provider.py 2010-09-17 14:41:43 +0000 |
675 | @@ -100,7 +100,7 @@ |
676 | return provider_instances |
677 | |
678 | @inlineCallbacks |
679 | - def xtest_shutdown(self): |
680 | + def test_shutdown(self): |
681 | """ |
682 | Shutting down the provider, terminates all instances associated to |
683 | the provider instance. |
684 | |
685 | === modified file 'ensemble/lib/testing.py' |
686 | --- ensemble/lib/testing.py 2010-08-31 16:53:06 +0000 |
687 | +++ ensemble/lib/testing.py 2010-09-17 14:41:43 +0000 |
688 | @@ -64,3 +64,6 @@ |
689 | os.environ.update(original_environ) |
690 | |
691 | os.environ.update(kw) |
692 | + |
693 | + def assertInstance(self, instance, type): |
694 | + self.assertTrue(isinstance(instance, type)) |
695 | |
696 | === added file 'ensemble/lib/tests/test_twistutils.py' |
697 | --- ensemble/lib/tests/test_twistutils.py 1970-01-01 00:00:00 +0000 |
698 | +++ ensemble/lib/tests/test_twistutils.py 2010-09-17 14:41:43 +0000 |
699 | @@ -0,0 +1,96 @@ |
700 | +from twisted.internet.defer import ( |
701 | + succeed, fail, Deferred, DeferredList, inlineCallbacks, returnValue) |
702 | +from twisted.internet import reactor |
703 | + |
704 | +from ensemble.lib.testing import TestCase |
705 | +from ensemble.lib.twistutils import concurrent_execution_guard |
706 | + |
707 | + |
708 | +class Bar(object): |
709 | + |
710 | + def __init__(self): |
711 | + self._count = 0 |
712 | + |
713 | + @concurrent_execution_guard("guard") |
714 | + def my_function(self, a, b=0): |
715 | + """zebra""" |
716 | + return succeed(a / b) |
717 | + |
718 | + @concurrent_execution_guard("other_guard") |
719 | + def other_function(self, a): |
720 | + return fail(OSError("Bad")) |
721 | + |
722 | + @concurrent_execution_guard("increment_guard") |
723 | + def slow_increment(self, delay=0.1): |
724 | + deferred = Deferred() |
725 | + |
726 | + def _increment(): |
727 | + self._count += 1 |
728 | + return deferred.callback(self._count) |
729 | + |
730 | + reactor.callLater(delay, _increment) |
731 | + return deferred |
732 | + |
733 | + @concurrent_execution_guard("inline_guard") |
734 | + @inlineCallbacks |
735 | + def inline_increment(self): |
736 | + result = yield self.slow_increment() |
737 | + returnValue(result * 100) |
738 | + |
739 | + |
740 | +class ExecutionGuardTest(TestCase): |
741 | + |
742 | + def test_guarded_function_metadata(self): |
743 | + self.assertEqual(Bar().my_function.__name__, "my_function") |
744 | + self.assertEqual(Bar().my_function.__doc__, "zebra") |
745 | + |
746 | + def test_guarded_function_failure(self): |
747 | + foo = Bar() |
748 | + return self.assertFailure(foo.other_function("1"), OSError) |
749 | + |
750 | + def test_guarded_function_sync_exception(self): |
751 | + foo = Bar() |
752 | + try: |
753 | + result = foo.my_function(1) |
754 | + except: |
755 | + self.fail("Should not raise exception") |
756 | + |
757 | + self.assertFailure(result, ZeroDivisionError) |
758 | + self.assertFailure(foo.my_function(1), ZeroDivisionError) |
759 | + self.assertFalse(foo.guard, False) |
760 | + |
761 | + def test_guard_multiple_execution(self): |
762 | + foo = Bar() |
763 | + |
764 | + d1 = foo.slow_increment() |
765 | + d2 = foo.slow_increment() |
766 | + |
767 | + def validate_results(results): |
768 | + success, value = results[0] |
769 | + self.assertTrue(success) |
770 | + self.assertEqual(value, 1) |
771 | + |
772 | + success, value = results[1] |
773 | + self.assertTrue(success) |
774 | + self.assertEqual(value, False) |
775 | + return foo.slow_increment() |
776 | + |
777 | + def validate_value(results): |
778 | +            # if the guard had not prevented execution, the value |
779 | +            # would be 3. |
780 | + self.assertEqual(results, 2) |
781 | + |
782 | + dlist = DeferredList([d1, d2]) |
783 | + dlist.addCallback(validate_results) |
784 | + dlist.addCallback(validate_value) |
785 | + return dlist |
786 | + |
787 | + def test_guard_w_inline_callbacks(self): |
788 | + foo = Bar() |
789 | + |
790 | + def validate_result(result): |
791 | + self.assertEqual(result, 100) |
792 | + |
793 | + d = foo.inline_increment() |
794 | + d.addCallback(validate_result) |
795 | + return d |
796 | |
797 | === added file 'ensemble/lib/twistutils.py' |
798 | --- ensemble/lib/twistutils.py 1970-01-01 00:00:00 +0000 |
799 | +++ ensemble/lib/twistutils.py 2010-09-17 14:41:43 +0000 |
800 | @@ -0,0 +1,31 @@ |
801 | +from twisted.internet.defer import maybeDeferred, succeed |
802 | +from twisted.python.util import mergeFunctionMetadata |
803 | + |
804 | + |
805 | +def concurrent_execution_guard(attribute): |
806 | + """Sets attribute to True/False during execution of the decorated method. |
807 | + |
808 | + Used to ensure non concurrent execution of the decorated function via |
809 | +    an instance attribute. *The underlying function must return a deferred*. |
810 | + """ |
811 | + |
812 | + def guard(f): |
813 | + |
814 | + def guard_execute(self, *args, **kw): |
815 | + value = getattr(self, attribute, None) |
816 | + if value: |
817 | + return succeed(False) |
818 | + else: |
819 | + setattr(self, attribute, True) |
820 | + |
821 | + d = maybeDeferred(f, self, *args, **kw) |
822 | + |
823 | + def post_execute(result): |
824 | + setattr(self, attribute, False) |
825 | + return result |
826 | + d.addBoth(post_execute) |
827 | + return d |
828 | + |
829 | + return mergeFunctionMetadata(f, guard_execute) |
830 | + |
831 | + return guard |
832 | |
833 | === modified file 'ensemble/providers/common.py' |
834 | --- ensemble/providers/common.py 2010-09-16 17:00:40 +0000 |
835 | +++ ensemble/providers/common.py 2010-09-17 14:41:43 +0000 |
836 | @@ -1,4 +1,6 @@ |
837 | import os |
838 | +from twisted.python.failure import Failure |
839 | +from ensemble.errors import EnsembleError, ProviderInteractionError |
840 | |
841 | BOOTSTRAP_PACKAGES = [ |
842 | "bzr", |
843 | @@ -18,7 +20,40 @@ |
844 | "python-zookeeper"] |
845 | |
846 | |
847 | +<<<<<<< TREE |
848 | def get_user_authorized_keys(config): |
849 | +======= |
850 | +def convert_unknown_error(failure): |
851 | + """ |
852 | +    Convert any non-ensemble errors to a provider interaction error. |
853 | + |
854 | + Supports both usage from within an except clause, and as an |
855 | + errback handler ie. both the following forms are supported. |
856 | +    errback handler, i.e. both of the following forms are supported. |
857 | + ... |
858 | + try: |
859 | + something() |
860 | + except Exception, e: |
861 | +           convert_unknown_error(e) |
862 | + |
863 | + ... |
864 | +        d.addErrback(convert_unknown_error) |
865 | + """ |
866 | + if isinstance(failure, Failure): |
867 | + error = failure.value |
868 | + else: |
869 | + error = failure |
870 | + |
871 | + if not isinstance(error, EnsembleError): |
872 | + error = ProviderInteractionError(error) |
873 | + |
874 | + if isinstance(failure, Failure): |
875 | + return Failure(error) |
876 | + raise error |
877 | + |
878 | + |
879 | +def get_user_public_key(config): |
880 | +>>>>>>> MERGE-SOURCE |
881 | """ |
882 | Locate a public key for the user. Either one explicitly passed |
883 | in or one in the user's .ssh directory. |
884 | |
885 | === modified file 'ensemble/providers/ec2/__init__.py' |
886 | --- ensemble/providers/ec2/__init__.py 2010-09-17 14:09:20 +0000 |
887 | +++ ensemble/providers/ec2/__init__.py 2010-09-17 14:41:43 +0000 |
888 | @@ -3,6 +3,7 @@ |
889 | |
890 | from txaws.service import AWSServiceRegion, REGION_US |
891 | |
892 | +<<<<<<< TREE |
893 | from ensemble.environment.errors import EnvironmentsConfigError |
894 | from ensemble.providers.common import get_user_authorized_keys |
895 | |
896 | @@ -12,6 +13,16 @@ |
897 | from launch import EC2LaunchMachine, EC2Bootstrap |
898 | from shutdown import EC2Shutdown, EC2ShutdownMachine |
899 | from state import EC2SaveState, EC2LoadState |
900 | +======= |
901 | +from ensemble.providers.common import convert_unknown_error |
902 | + |
903 | +from .connect import EC2Connect |
904 | +from .files import FileStorage |
905 | +from .iterate import EC2MachineIteration |
906 | +from .launch import EC2LaunchMachine, EC2Bootstrap |
907 | +from .shutdown import EC2Shutdown, EC2ShutdownMachine |
908 | +from .state import EC2SaveState, EC2LoadState |
909 | +>>>>>>> MERGE-SOURCE |
910 | |
911 | |
912 | class MachineProvider(object): |
913 | @@ -35,9 +46,9 @@ |
914 | "and authorized-keys-path. Pick one!") |
915 | |
916 | def get_serialization_data(self): |
917 | - """ |
918 | - Return a yaml serialization of the provider configuration within |
919 | - the environment. |
920 | + """Return a dictionary serialization of the provider configuration. |
921 | + |
922 | +        Additionally this extracts credential information from the environment. |
923 | """ |
924 | data = copy.deepcopy(self.config) |
925 | data["secret-key"] = self.config.get( |
926 | @@ -48,6 +59,11 @@ |
927 | data.pop("authorized-keys-path", None) |
928 | return data |
929 | |
930 | + def _run_operation(self, operation, *args, **kw): |
931 | + d = operation.run(*args, **kw) |
932 | + d.addErrback(convert_unknown_error) |
933 | + return d |
934 | + |
935 | def connect(self): |
936 | """ |
937 | Connect to the zookeeper ensemble running in the machine provider. |
938 | @@ -56,7 +72,7 @@ |
939 | C{ensemble.storage.connection.TunnelProtocol} |
940 | """ |
941 | connect = EC2Connect(self) |
942 | - return connect.run() |
943 | + return self._run_operation(connect) |
944 | |
945 | def get_file_storage(self): |
946 | """Retrieve the provider C{FileStorage} abstraction.""" |
947 | |
948 | === modified file 'ensemble/providers/ec2/launch.py' |
949 | --- ensemble/providers/ec2/launch.py 2010-09-16 17:00:40 +0000 |
950 | +++ ensemble/providers/ec2/launch.py 2010-09-17 14:41:43 +0000 |
951 | @@ -231,4 +231,9 @@ |
952 | admin_identity) |
953 | |
954 | variables["scripts"].append(initialize_script) |
955 | + |
956 | + provision_agent_start = "python -m %s -n --zookeeper-servers %s" %( |
957 | + "ensemble.agents.provision", "127.0.0.1:2181") |
958 | + variables["scripts"].append(provision_agent_start) |
959 | + |
960 | return variables |
961 | |
962 | === modified file 'ensemble/providers/ec2/tests/test_launch.py' |
963 | --- ensemble/providers/ec2/tests/test_launch.py 2010-09-16 17:00:40 +0000 |
964 | +++ ensemble/providers/ec2/tests/test_launch.py 2010-09-17 14:41:43 +0000 |
965 | @@ -287,12 +287,16 @@ |
966 | self.assertEqual( |
967 | config["packages"], |
968 | list(DEFAULT_PACKAGES) + BOOTSTRAP_PACKAGES) |
969 | - self.failUnlessIn("admin-identity", config["runcmd"][-1]) |
970 | + self.failUnlessIn("admin-identity", config["runcmd"][-2]) |
971 | |
972 | script = '%s initialize --admin-identity="%s"' % ( |
973 | "sudo /usr/local/bin/ensemble-admin", |
974 | admin_identity) |
975 | |
976 | + self.assertEqual(config["runcmd"][-2], script) |
977 | + |
978 | + script = "python -m %s -n --zookeeper-servers %s" % ( |
979 | + "ensemble.agents.provision", "127.0.0.1:2181") |
980 | self.assertEqual(config["runcmd"][-1], script) |
981 | return True |
982 | |
983 | @@ -443,4 +447,4 @@ |
984 | bootstrap = EC2Bootstrap(provider) |
985 | variables = bootstrap.get_machine_variables() |
986 | identity = make_identity("admin:%s" % config["admin-secret"]) |
987 | - self.failUnlessIn(identity, variables["scripts"][-1]) |
988 | + self.failUnlessIn(identity, variables["scripts"][-2]) |
989 | |
990 | === modified file 'ensemble/providers/tests/test_common.py' |
991 | --- ensemble/providers/tests/test_common.py 2010-09-17 14:09:20 +0000 |
992 | +++ ensemble/providers/tests/test_common.py 2010-09-17 14:41:43 +0000 |
993 | @@ -1,7 +1,15 @@ |
994 | import os |
995 | |
996 | +from twisted.python.failure import Failure |
997 | + |
998 | from ensemble.environment.tests.test_config import EnvironmentsConfigTestBase |
999 | +<<<<<<< TREE |
1000 | from ensemble.providers.common import get_user_authorized_keys |
1001 | +======= |
1002 | +from ensemble.errors import ProviderInteractionError, EnsembleError |
1003 | +from ensemble.providers.common import ( |
1004 | + get_user_public_key, convert_unknown_error) |
1005 | +>>>>>>> MERGE-SOURCE |
1006 | |
1007 | |
1008 | class CommonProviderTests(EnvironmentsConfigTestBase): |
1009 | @@ -46,5 +54,44 @@ |
1010 | |
1011 | def test_invalid_key_specified(self): |
1012 | """If an invalid key is specified, a LookupError is raised.""" |
1013 | +<<<<<<< TREE |
1014 | config = {"authorized-keys-path": "zebra_moon.pub"} |
1015 | self.assertRaises(LookupError, get_user_authorized_keys, config) |
1016 | +======= |
1017 | + config = {"ssh_public_key": "zebra_moon.pub"} |
1018 | + self.assertRaises(LookupError, get_user_public_key, config) |
1019 | + |
1020 | + def test_convert_unknown_error(self): |
1021 | + error = self.assertRaises( |
1022 | + ProviderInteractionError, |
1023 | + convert_unknown_error, |
1024 | + OSError("Bad")) |
1025 | + self.assertInstance(error, EnsembleError) |
1026 | + self.assertEqual( |
1027 | + str(error), |
1028 | + "ProviderError: Interaction with machine provider failed: " |
1029 | + "OSError('Bad',)") |
1030 | + |
1031 | + def test_convert_unknown_error_ignores_ensemble_error(self): |
1032 | + error = self.assertRaises( |
1033 | + EnsembleError, |
1034 | + convert_unknown_error, |
1035 | + EnsembleError("Magic")) |
1036 | + self.assertEqual( |
1037 | + str(error), |
1038 | + "Magic") |
1039 | + |
1040 | + def test_convert_unknown_error_ignores_ensemble_failure(self): |
1041 | + failure = convert_unknown_error(Failure(EnsembleError("Magic"))) |
1042 | + self.assertTrue(isinstance(failure, Failure)) |
1043 | + self.assertEqual(failure.value.__class__, EnsembleError) |
1044 | + |
1045 | + def test_convert_unknown_error_with_failure(self): |
1046 | + failure = convert_unknown_error(Failure(OSError("Bad"))) |
1047 | + self.assertTrue(isinstance(failure, Failure)) |
1048 | + self.assertInstance(failure.value, ProviderInteractionError) |
1049 | + self.assertEqual( |
1050 | + str(failure.value), |
1051 | + "ProviderError: Interaction with machine provider failed: " |
1052 | + "OSError('Bad',)") |
1053 | +>>>>>>> MERGE-SOURCE |
1054 | |
1055 | === modified file 'ensemble/state/tests/common.py' |
1056 | --- ensemble/state/tests/common.py 2010-09-15 13:21:49 +0000 |
1057 | +++ ensemble/state/tests/common.py 2010-09-17 14:41:43 +0000 |
1058 | @@ -57,4 +57,4 @@ |
1059 | """Non-blocking sleep.""" |
1060 | deferred = Deferred() |
1061 | reactor.callLater(delay, deferred.callback, None) |
1062 | - return deferred |
1063 | + return deferred |
1064 | |
1065 | === modified file 'ensemble/tests/test_errors.py' |
1066 | --- ensemble/tests/test_errors.py 2010-08-27 20:48:57 +0000 |
1067 | +++ ensemble/tests/test_errors.py 2010-09-17 14:41:43 +0000 |
1068 | @@ -1,6 +1,6 @@ |
1069 | from ensemble.errors import ( |
1070 | EnsembleError, FileNotFound, FileAlreadyExists, InvalidEnsembleHeaderValue, |
1071 | - NoConnection, ProviderError) |
1072 | + NoConnection, ProviderError, ProviderInteractionError) |
1073 | |
1074 | from ensemble.lib.testing import TestCase |
1075 | |
1076 | @@ -37,3 +37,11 @@ |
1077 | def test_ProviderError(self): |
1078 | error = ProviderError("Invalid credentials") |
1079 | self.assertIsEnsembleError(error) |
1080 | + |
1081 | + def test_ProviderInteractionError(self): |
1082 | + error = ProviderInteractionError(OSError("Bad Stuff")) |
1083 | + self.assertIsEnsembleError(error) |
1084 | + self.assertEquals( |
1085 | + str(error), |
1086 | + "ProviderError: Interaction with machine provider failed: " |
1087 | + "OSError('Bad Stuff',)") |
That's been reviewed live.