Merge lp:~hazmat/pyjuju/lp-616504-provision-this into lp:pyjuju

Proposed by Kapil Thangavelu
Status: Merged
Approved by: Gustavo Niemeyer
Approved revision: 90
Merged at revision: 72
Proposed branch: lp:~hazmat/pyjuju/lp-616504-provision-this
Merge into: lp:pyjuju
Diff against target: 1087 lines (+719/-70) (has conflicts)
16 files modified
ensemble/agents/base.py (+11/-2)
ensemble/agents/provision.py (+137/-32)
ensemble/agents/tests/test_base.py (+6/-0)
ensemble/agents/tests/test_provision.py (+301/-27)
ensemble/errors.py (+10/-0)
ensemble/ftests/test_ec2_provider.py (+1/-1)
ensemble/lib/testing.py (+3/-0)
ensemble/lib/tests/test_twistutils.py (+96/-0)
ensemble/lib/twistutils.py (+31/-0)
ensemble/providers/common.py (+35/-0)
ensemble/providers/ec2/__init__.py (+20/-4)
ensemble/providers/ec2/launch.py (+5/-0)
ensemble/providers/ec2/tests/test_launch.py (+6/-2)
ensemble/providers/tests/test_common.py (+47/-0)
ensemble/state/tests/common.py (+1/-1)
ensemble/tests/test_errors.py (+9/-1)
Text conflict in ensemble/providers/common.py
Text conflict in ensemble/providers/ec2/__init__.py
Text conflict in ensemble/providers/tests/test_common.py
To merge this branch: bzr merge lp:~hazmat/pyjuju/lp-616504-provision-this
Reviewer Review Type Date Requested Status
Gustavo Niemeyer Approve
Review via email: mp+35709@code.launchpad.net

Description of the change

Provisioning Agent Implementation!!

To post a comment you must log in.
90. By Kapil Thangavelu

enable test for removed machine state

Revision history for this message
Gustavo Niemeyer (niemeyer) wrote :

That's been reviewed live.

review: Approve
Revision history for this message
Gustavo Niemeyer (niemeyer) wrote :

As per the conversation:

[1]

In the concurrent_execution_guard tests, the first block in this test callback should be repeated, to ensure that the underlying method was only called twice:

+ def validate_results(results):
+ success, value = results[0]
+ self.assertTrue(success)
+ self.assertEqual(value, 1)
+
+ success, value = results[1]
+ self.assertTrue(success)
+ self.assertEqual(value, False)

91. By Kapil Thangavelu

address review comment, and guard execution value had skipped the concurrent call

Revision history for this message
Kapil Thangavelu (hazmat) wrote :

thanks, addressed [1], and merged.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'bin/ensemble-admin' (properties changed: -x to +x)
2=== modified file 'ensemble/agents/base.py'
3--- ensemble/agents/base.py 2010-09-14 20:03:06 +0000
4+++ ensemble/agents/base.py 2010-09-17 14:41:43 +0000
5@@ -44,6 +44,9 @@
6 name = "ensemble-agent-unknown"
7 client = None
8
9+ # Flag when enabling persistent topology watches, testing aid.
10+ _watch_enabled = True
11+
12 @classmethod
13 def run(cls):
14 """Runs the agent as a unix daemon.
15@@ -130,8 +133,8 @@
16 # conventions.
17 @inlineCallbacks
18 def startService(self):
19- self.client = yield self.connect()
20- self.start()
21+ yield self.connect()
22+ yield self.start()
23
24 @inlineCallbacks
25 def stopService(self):
26@@ -141,6 +144,12 @@
27 if self.client and self.client.connected:
28 self.client.close()
29
30+ def set_watch_enabled(self, flag):
31+ self._watch_enabled = bool(flag)
32+
33+ def get_watch_enabled(self):
34+ return self._watch_enabled
35+
36
37 def setup_default_agent_options(parser, cls):
38
39
40=== modified file 'ensemble/agents/provision.py'
41--- ensemble/agents/provision.py 2010-09-14 20:03:06 +0000
42+++ ensemble/agents/provision.py 2010-09-17 14:41:43 +0000
43@@ -3,8 +3,12 @@
44 from twisted.internet.defer import inlineCallbacks, returnValue
45
46 from ensemble.environment.config import EnvironmentsConfig
47+from ensemble.errors import ProviderInteractionError
48+from ensemble.lib.twistutils import concurrent_execution_guard
49 from ensemble.state.machine import MachineStateManager
50
51+from ensemble.state.errors import MachineStateNotFound, StateChanged
52+
53 from .base import BaseAgent
54
55
56@@ -12,16 +16,25 @@
57
58 name = "ensemble-provisoning-agent"
59
60- # @inlineCallbacks
61- # def start(self):
62- # self.environment = yield self.startup_wait_for_environment()
63- # self.provider = self.environment.get_machine_provider()
64- # self.state_manager = MachineStateManager(self.client)
65- # self.state_manager.watch_machine_changes(self.watch_machine_changes)
66-
67- @inlineCallbacks
68- def startup_wait_for_environment(self):
69- """On startup the provisioning agent needs to wait for its environment.
70+ _current_machines = ()
71+ _machine_watch_enabled = True
72+
73+ machine_check_period = 60 # time in seconds
74+
75+ @inlineCallbacks
76+ def start(self):
77+ self.environment = yield self.configure_environment()
78+ self.provider = self.environment.get_machine_provider()
79+ self.state_manager = MachineStateManager(self.client)
80+ if self.get_watch_enabled():
81+ self.state_manager.watch_machine_states(self.watch_machine_changes)
82+ from twisted.internet import reactor
83+ reactor.callLater(
84+ self.machine_check_period, self.periodic_machine_check)
85+
86+ @inlineCallbacks
87+ def configure_environment(self):
88+ """The provisioning agent configure its environment on start or change.
89
90 The environment contains the configuration th agent needs to interact
91 with its machine provider, in order to do its work. This configuration
92@@ -30,29 +43,59 @@
93 The agent waits for this data to exist before completing its startup.
94 """
95 try:
96- environment_data, stat = yield self.client.get("/environment")
97+ get_d, watch_d = self.client.get_and_watch("/environment")
98+ environment_data, stat = yield get_d
99+ watch_d.addCallback(self._on_environment_changed)
100 except NoNodeException:
101 # Wait till the environment node appears. play twisted gymnastics
102 exists_d, watch_d = self.client.exists_and_watch("/environment")
103 stat = yield exists_d
104 if stat:
105- environment = yield self.startup_wait_for_environment()
106- returnValue(environment)
107+ environment = yield self.configure_environment()
108 else:
109 watch_d.addCallback(
110- lambda result: self.startup_wait_for_environment())
111+ lambda result: self.configure_environment())
112+ if not stat:
113 environment = yield watch_d
114- returnValue(environment)
115+ returnValue(environment)
116
117 config = EnvironmentsConfig()
118 config.parse(environment_data)
119 returnValue(config.get_default())
120
121- def watch_environment_changes(self, event):
122+ @inlineCallbacks
123+ def _on_environment_changed(self, event):
124 """Reload the environment if its data changes."""
125
126+ if event.type_name == "deleted":
127+ return
128+
129+ self.environment = yield self.configure_environment()
130+ self.provider = self.environment.get_machine_provider()
131+
132+ def periodic_machine_check(self):
133+ """A periodic checking of machine states and provider machines.
134+
135+ In addition to the on demand changes to zookeeper states that are
136+ monitored by L{watch_machine_changes}, the periodic machine check
137+ performs non zookeeper state related verification by periodically
138+ checking the last current provider machine states against the
139+ last known zookeeper state.
140+
141+ Primarily this helps in recovering from transient error conditions
142+ which may have prevent processing of an individual machine state, as
143+ well as verifying the current state of the provider's running machines
144+ against the zk state, thus pruning unused resources.
145+ """
146+ from twisted.internet import reactor
147+ d = self.process_machines(self._current_machines)
148+ d.addBoth(
149+ lambda result: reactor.callLater(
150+ self.machine_check_period, self.periodic_machine_check))
151+ return d
152+
153 def watch_machine_changes(self, old_machines, new_machines):
154- """Ensure the currently running machines correspond to state.
155+ """Watches and processes machine state changes.
156
157 This function is used to subscribe to topology changes, and
158 specifically changes to machines within the topology. It performs
159@@ -64,21 +107,83 @@
160 function will automatically be rescheduled to run whenever a topology
161 state change happens that involves machines.
162
163+ This functional also caches the current set of machines as an agent
164+ instance attribute.
165+
166+ @param old_machines machine ids as existed in the previous topology.
167+ @param new_machines machine ids as exist in the current topology.
168+ """
169+ self._current_machines = new_machines
170+ return self.process_machines(self._current_machines)
171+
172+ @concurrent_execution_guard("_processing_machines")
173+ @inlineCallbacks
174+ def process_machines(self, current_machines):
175+ """Ensure the currently running machines correspond to state.
176+
177+ At the end of each process_machines execution, verify that all
178+ running machines within the provider correspond to machine_ids within
179+ the topology. If they don't then shut them down.
180+
181+ Utilizes concurrent execution guard, to ensure that this is only being
182+ executed at most once per process.
183+ """
184+ # XXX this is obviously broken, but the margins of 80 columns prevent
185+ # me from describing. hint think concurrent agents, and use a lock.
186+
187+ # map of instance_id -> machine
188+ try:
189+ provider_machines = yield self.provider.list_machines()
190+ except ProviderInteractionError:
191+ # XXX log me
192+ return
193+
194+ provider_machines = dict(
195+ [(m.instance_id, m) for m in provider_machines])
196+
197+ instance_ids = []
198+ for machine_state_id in current_machines:
199+ try:
200+ instance_id = yield self.process_machine(
201+ machine_state_id, provider_machines)
202+ except (StateChanged,
203+ MachineStateNotFound,
204+ ProviderInteractionError):
205+ # XXX log me
206+ continue
207+ instance_ids.append(instance_id)
208+
209+ # Terminate all unused ensemble machines running within the cluster.
210+ unused = set(provider_machines.keys()) - set(instance_ids)
211+ for instance_id in unused:
212+ machine = provider_machines[instance_id]
213+ try:
214+ yield self.provider.shutdown_machine(machine)
215+ except ProviderInteractionError:
216+ # XXX log me
217+ continue
218+
219+ @inlineCallbacks
220+ def process_machine(self, machine_state_id, provider_machine_map):
221+ """Ensure a provider machine for a machine state id.
222+
223 For each machine_id in new machines which represents the current state
224 of the topology, check to ensure its state reflects that it has been
225 launched. If it hasn't then create the machine and update the state.
226-
227- At the end of each watch_machine_change execution, verify that all
228- running machines within the provider correspond to machine_ids within
229- the topology. If they don't then shut them down.
230-
231- @param old_machines machine ids as existed in the previous topology.
232- @param new_machines machine ids as exist in the current topology.
233- """
234-
235- def process_machine(self, machine_id):
236- """Verify a machine id has state and is running, else launch it."""
237-
238- def terminate_unused(self, machine_states):
239- """Terminate all unused ensemble machines running within the cluster.
240- """
241+ """
242+ # fetch the machine state
243+ machine_state = yield self.state_manager.get_machine_state(
244+ machine_state_id)
245+ instance_id = yield machine_state.get_instance_id()
246+
247+ # Verify a machine id has state and is running, else launch it.
248+ if instance_id is None or not instance_id in provider_machine_map:
249+ machines = yield self.provider.start_machine(
250+ {"machine_id": machine_state.id})
251+ instance_id = machines[0].instance_id
252+ yield machine_state.set_instance_id(instance_id)
253+
254+ returnValue(instance_id)
255+
256+if __name__ == '__main__':
257+ ProvisioningAgent().run()
258
259=== modified file 'ensemble/agents/tests/test_base.py'
260--- ensemble/agents/tests/test_base.py 2010-09-14 19:37:16 +0000
261+++ ensemble/agents/tests/test_base.py 2010-09-17 14:41:43 +0000
262@@ -272,3 +272,9 @@
263 agent = BaseAgent()
264 self.assertRaises(
265 NoConnection, agent.configure, {"zookeeper_servers": None})
266+
267+ def test_watch_enabled_accessors(self):
268+ agent = BaseAgent()
269+ self.assertTrue(agent.get_watch_enabled())
270+ agent.set_watch_enabled(False)
271+ self.assertFalse(agent.get_watch_enabled())
272
273=== modified file 'ensemble/agents/tests/test_provision.py'
274--- ensemble/agents/tests/test_provision.py 2010-09-14 20:03:06 +0000
275+++ ensemble/agents/tests/test_provision.py 2010-09-17 14:41:43 +0000
276@@ -1,10 +1,10 @@
277 import zookeeper
278
279-from twisted.internet.defer import inlineCallbacks
280+from twisted.internet.defer import inlineCallbacks, fail, succeed
281 from twisted.internet import reactor
282
283-from txzookeeper import ZookeeperClient
284 from txzookeeper.tests.utils import deleteTree
285+from txzookeeper.client import ZOO_OPEN_ACL_UNSAFE
286
287 from ensemble.agents.provision import ProvisioningAgent
288 from ensemble.agents.base import TwistedOptionNamespace
289@@ -13,22 +13,24 @@
290 from ensemble.environment.errors import EnvironmentsConfigError
291 from ensemble.environment.tests.test_config import SAMPLE_ENV
292
293-from ensemble.lib.testing import TestCase
294-
295-
296-class ProvisioningAgentStartupTest(TestCase):
297+from ensemble.errors import ProviderInteractionError
298+from ensemble.lib.mocker import MATCH
299+from ensemble.providers.dummy import DummyMachine
300+from ensemble.state.machine import MachineStateManager
301+from ensemble.state.tests.common import StateTestBase
302+
303+MATCH_MACHINE = MATCH(lambda x: isinstance(x, DummyMachine))
304+
305+
306+class ProvisioningTestBase(StateTestBase):
307
308 @inlineCallbacks
309 def setUp(self):
310- zookeeper.set_debug_level(0)
311- self.client = ZookeeperClient()
312- yield self.client.connect("127.0.0.1:2181")
313+ yield super(ProvisioningTestBase, self).setUp()
314 self.options = TwistedOptionNamespace()
315 self.options["zookeeper_servers"] = "127.0.0.1:2181"
316-
317 self.agent = ProvisioningAgent()
318 self.agent.configure(self.options)
319- yield self.agent.connect()
320
321 def tearDown(self):
322 if self.agent.client and self.agent.client.connected:
323@@ -43,13 +45,21 @@
324 config.parse(SAMPLE_ENV)
325 return config.serialize("myfirstenv")
326
327+
328+class ProvisioningAgentStartupTest(ProvisioningTestBase):
329+
330+ @inlineCallbacks
331+ def setUp(self):
332+ yield super(ProvisioningAgentStartupTest, self).setUp()
333+ yield self.agent.connect()
334+
335 def test_agent_waits_for_environment(self):
336 """
337 When the agent starts it waits for the /environment node to exist.
338 As soon as it does, the agent will fetch the environment, and
339 deserialize it into an environment object.
340 """
341- env_loaded_deferred = self.agent.startup_wait_for_environment()
342+ env_loaded_deferred = self.agent.configure_environment()
343
344 def verify_environment(result):
345 self.assertTrue(isinstance(result, Environment))
346@@ -76,27 +86,291 @@
347 self.assertTrue(isinstance(result, Environment))
348 self.assertEqual(result.name, "myfirstenv")
349
350- d = self.agent.startup_wait_for_environment()
351+ d = self.agent.configure_environment()
352 d.addCallback(verify_environment)
353 yield d
354
355 @inlineCallbacks
356 def test_agent_with_invalid_environment(self):
357 yield self.client.create("/environment", "WAHOO!")
358- d = self.agent.startup_wait_for_environment()
359+ d = self.agent.configure_environment()
360 yield self.assertFailure(d, EnvironmentsConfigError)
361
362-
363-class ProvisoningAgentTest(TestCase):
364-
365+ def test_agent_with_nonexistent_environment_created_concurrently(self):
366+ """
367+ If the environment node does not initially exist but it is created
368+ while the agent is processing the NoNodeException, it should detect
369+ this and configure normally.
370+ """
371+ data = self.get_serialized_environment()
372+ exists_and_watch = self.agent.client.exists_and_watch
373+
374+ mock_client = self.mocker.patch(self.agent.client)
375+ mock_client.exists_and_watch("/environment")
376+
377+ def inject_creation(path):
378+ zookeeper.create(
379+ self.agent.client.handle, path, data, [ZOO_OPEN_ACL_UNSAFE])
380+ return exists_and_watch(path)
381+
382+ self.mocker.call(inject_creation)
383+ self.mocker.replay()
384+
385+ def verify_configured(result):
386+ self.assertTrue(isinstance(result, Environment))
387+ self.assertEqual(result.type, "dummy")
388+ # mocker magic test
389+ d = self.agent.configure_environment()
390+ d.addCallback(verify_configured)
391+ return d
392+
393+
394+class ProvisioningAgentTest(ProvisioningTestBase):
395+
396+ @inlineCallbacks
397 def setUp(self):
398- pass
399-
400- def test_process_machine_id(self):
401- pass
402-
403- def test_terminate_unusued_doesnt_touch_used(self):
404- pass
405-
406- def test_terminate_unusued(self):
407- pass
408+ yield super(ProvisioningAgentTest, self).setUp()
409+ yield self.client.create(
410+ "/environment", self.get_serialized_environment())
411+ self.agent.set_watch_enabled(False)
412+ yield self.agent.startService()
413+
414+ @inlineCallbacks
415+ def test_watch_machine_changes_processes_new_machine_id(self):
416+ """The agent should process a new machine id by creating it"""
417+ manager = MachineStateManager(self.client)
418+ machine_state0 = yield manager.add_machine_state()
419+ machine_state1 = yield manager.add_machine_state()
420+
421+ yield self.agent.watch_machine_changes(
422+ None, [machine_state0.id, machine_state1.id])
423+
424+ machines = yield self.agent.provider.list_machines()
425+ self.assertEquals(len(machines), 2)
426+
427+ instance_id = yield machine_state0.get_instance_id()
428+ self.assertEqual(instance_id, 0)
429+
430+ instance_id = yield machine_state1.get_instance_id()
431+ self.assertEqual(instance_id, 1)
432+
433+ @inlineCallbacks
434+ def test_watch_machine_changes_ignores_running_machine(self):
435+ manager = MachineStateManager(self.client)
436+ machine_state0 = yield manager.add_machine_state()
437+ yield self.agent.provider.start_machine()
438+ yield machine_state0.set_instance_id(0)
439+
440+ machine_state1 = yield manager.add_machine_state()
441+
442+ yield self.agent.watch_machine_changes(
443+ None, [machine_state0.id, machine_state1.id])
444+
445+ machines = yield self.agent.provider.list_machines()
446+ self.assertEquals(len(machines), 2)
447+
448+ instance_id = yield machine_state1.get_instance_id()
449+ self.assertEqual(instance_id, 1)
450+
451+ @inlineCallbacks
452+ def test_watch_machine_changes_terminates_unused(self):
453+ # start an unused machine within the dummy provider instance
454+ yield self.agent.provider.start_machine()
455+ yield self.agent.watch_machine_changes(None, [])
456+
457+ machines = yield self.agent.provider.list_machines()
458+ self.assertFalse(machines)
459+
460+ @inlineCallbacks
461+ def test_new_machine_state_removed_while_processing(self):
462+ """
463+ If the machine state is removed while the event is processing the
464+ state, the watch function should process it normally.
465+ """
466+ yield self.agent.watch_machine_changes(
467+ None, [0])
468+ machines = yield self.agent.provider.list_machines()
469+ self.assertEquals(len(machines), 0)
470+
471+ @inlineCallbacks
472+ def test_process_machines_non_concurrency(self):
473+ """
474+ Process machines should only be executed serially by an
475+ agent.
476+ """
477+ manager = MachineStateManager(self.client)
478+ machine_state0 = yield manager.add_machine_state()
479+ machine_state1 = yield manager.add_machine_state()
480+
481+ call_1 = self.agent.process_machines([machine_state0.id])
482+
483+ # The second call should return immediately due to the
484+ # instance attribute guard.
485+ call_2 = self.agent.process_machines([machine_state1.id])
486+ self.assertEqual(call_2.called, True)
487+ self.assertEqual(call_2.result, False)
488+
489+ # The first call should have started a provider machine
490+ yield call_1
491+
492+ machines = yield self.agent.provider.list_machines()
493+ self.assertEquals(len(machines), 1)
494+
495+ instance_id_0 = yield machine_state0.get_instance_id()
496+ self.assertEqual(instance_id_0, 0)
497+
498+ instance_id_1 = yield machine_state1.get_instance_id()
499+ self.assertEqual(instance_id_1, None)
500+
501+ def test_new_machine_state_removed_while_processing_get_provider_id(self):
502+ """
503+ If the machine state is removed while the event is processing the
504+ state, the watch function should process it normally.
505+ """
506+ yield self.agent.watch_machine_changes(
507+ None, [0])
508+ machines = yield self.agent.provider.list_machines()
509+ self.assertEquals(len(machines), 0)
510+
511+ @inlineCallbacks
512+ def test_on_environment_change_agent_reconfigures(self):
513+ """
514+ If the environment changes the agent reconfigures itself
515+ """
516+ provider = self.agent.provider
517+ data = self.get_serialized_environment()
518+ yield self.client.set("/environment", data)
519+ yield self.sleep(0.2)
520+ self.assertNotIdentical(provider, self.agent.provider)
521+
522+ @inlineCallbacks
523+ def test_machine_state_reflects_invalid_provider_state(self):
524+ """
525+ If a machine state has an invalid instance_id, it should be detected,
526+ and a new machine started and the machine state updated with the
527+ new instance_id.
528+ """
529+ machine_manager = MachineStateManager(self.client)
530+ m1 = yield machine_manager.add_machine_state()
531+ yield m1.set_instance_id("zebra")
532+
533+ m2 = yield machine_manager.add_machine_state()
534+ yield self.agent.watch_machine_changes(None, [m1.id, m2.id])
535+
536+ m1_instance_id = yield m1.get_instance_id()
537+ self.assertEqual(m1_instance_id, 0)
538+
539+ m2_instance_id = yield m2.get_instance_id()
540+ self.assertEqual(m2_instance_id, 1)
541+
542+ def test_periodic_task(self):
543+ """
544+ The agent schedules period checks that execute the process machines
545+ call.
546+ """
547+ mock_reactor = self.mocker.patch(reactor)
548+ mock_reactor.callLater(self.agent.machine_check_period,
549+ self.agent.periodic_machine_check)
550+ mock_agent = self.mocker.patch(self.agent)
551+ mock_agent.process_machines(())
552+ self.mocker.result(succeed(None))
553+ self.mocker.replay()
554+
555+ # mocker magic test
556+ self.agent.periodic_machine_check()
557+
558+ @inlineCallbacks
559+ def test_transient_provider_error_on_start_machine(self):
560+ """
561+ If there's an error when processing changes, the agent should log
562+ the error and continue.
563+ """
564+ manager = MachineStateManager(self.client)
565+ machine_state0 = yield manager.add_machine_state()
566+ machine_state1 = yield manager.add_machine_state()
567+
568+ mock_provider = self.mocker.patch(self.agent.provider)
569+ mock_provider.start_machine({"machine_id": 0})
570+ self.mocker.result(fail(ProviderInteractionError(OSError("Bad"))))
571+
572+ mock_provider.start_machine({"machine_id": 1})
573+ self.mocker.passthrough()
574+ self.mocker.replay()
575+
576+ yield self.agent.watch_machine_changes(
577+ [], [machine_state0.id, machine_state1.id])
578+
579+ machine1_instance_id = yield machine_state1.get_instance_id()
580+ self.assertEqual(machine1_instance_id, 0)
581+
582+ @inlineCallbacks
583+ def test_transient_provider_error_on_shutdown_machine(self):
584+
585+ yield self.agent.provider.start_machine()
586+ mock_provider = self.mocker.patch(self.agent.provider)
587+
588+ mock_provider.shutdown_machine(MATCH_MACHINE)
589+ self.mocker.result(fail(ProviderInteractionError("Bad")))
590+
591+ mock_provider.shutdown_machine(MATCH_MACHINE)
592+ self.mocker.passthrough()
593+
594+ self.mocker.replay()
595+ try:
596+ yield self.agent.process_machines([])
597+ except:
598+ self.fail("Should not raise")
599+
600+ machines = yield self.agent.provider.list_machines()
601+ self.assertTrue(machines)
602+
603+ yield self.agent.process_machines([])
604+ machines = yield self.agent.provider.list_machines()
605+ self.assertFalse(machines)
606+
607+ @inlineCallbacks
608+ def test_transient_provider_error_on_list_machines(self):
609+ manager = MachineStateManager(self.client)
610+ machine_state0 = yield manager.add_machine_state()
611+
612+ mock_provider = self.mocker.patch(self.agent.provider)
613+ mock_provider.list_machines()
614+ self.mocker.result(fail(ProviderInteractionError("Bad")))
615+
616+ mock_provider.list_machines()
617+ self.mocker.passthrough()
618+
619+ self.mocker.replay()
620+ try:
621+ yield self.agent.process_machines([machine_state0.id])
622+ except:
623+ self.fail("Should not raise")
624+
625+ instance_id = yield machine_state0.get_instance_id()
626+ self.assertEqual(instance_id, None)
627+
628+ yield self.agent.process_machines(
629+ [machine_state0.id])
630+
631+ instance_id = yield machine_state0.get_instance_id()
632+ self.assertEqual(instance_id, 0)
633+
634+ @inlineCallbacks
635+ def test_start_agent_with_watch(self):
636+ mock_reactor = self.mocker.patch(reactor)
637+ mock_reactor.callLater(
638+ self.agent.machine_check_period,
639+ self.agent.periodic_machine_check)
640+ self.mocker.replay()
641+
642+ self.agent.set_watch_enabled(True)
643+ yield self.agent.start()
644+
645+ manager = MachineStateManager(self.client)
646+ machine_state0 = yield manager.add_machine_state()
647+
648+ # the watch invocation happens out of band, sleep for
649+ # a moment so it has a chance to run.
650+ yield self.sleep(0.1)
651+ instance_id = yield machine_state0.get_instance_id()
652+ self.assertEqual(instance_id, 0)
653
654=== modified file 'ensemble/errors.py'
655--- ensemble/errors.py 2010-08-27 20:48:57 +0000
656+++ ensemble/errors.py 2010-09-17 14:41:43 +0000
657@@ -74,3 +74,13 @@
658
659 class ProviderError(EnsembleError):
660 """Raised when an exception occurs in a provider."""
661+
662+
663+class ProviderInteractionError(ProviderError):
664+
665+ def __init__(self, error):
666+ self.error = error
667+
668+ def __str__(self):
669+ return "ProviderError: Interaction with machine provider failed: %r" \
670+ % self.error
671
672=== modified file 'ensemble/ftests/test_ec2_provider.py'
673--- ensemble/ftests/test_ec2_provider.py 2010-09-08 17:03:26 +0000
674+++ ensemble/ftests/test_ec2_provider.py 2010-09-17 14:41:43 +0000
675@@ -100,7 +100,7 @@
676 return provider_instances
677
678 @inlineCallbacks
679- def xtest_shutdown(self):
680+ def test_shutdown(self):
681 """
682 Shutting down the provider, terminates all instances associated to
683 the provider instance.
684
685=== modified file 'ensemble/lib/testing.py'
686--- ensemble/lib/testing.py 2010-08-31 16:53:06 +0000
687+++ ensemble/lib/testing.py 2010-09-17 14:41:43 +0000
688@@ -64,3 +64,6 @@
689 os.environ.update(original_environ)
690
691 os.environ.update(kw)
692+
693+ def assertInstance(self, instance, type):
694+ self.assertTrue(isinstance(instance, type))
695
696=== added file 'ensemble/lib/tests/test_twistutils.py'
697--- ensemble/lib/tests/test_twistutils.py 1970-01-01 00:00:00 +0000
698+++ ensemble/lib/tests/test_twistutils.py 2010-09-17 14:41:43 +0000
699@@ -0,0 +1,96 @@
700+from twisted.internet.defer import (
701+ succeed, fail, Deferred, DeferredList, inlineCallbacks, returnValue)
702+from twisted.internet import reactor
703+
704+from ensemble.lib.testing import TestCase
705+from ensemble.lib.twistutils import concurrent_execution_guard
706+
707+
708+class Bar(object):
709+
710+ def __init__(self):
711+ self._count = 0
712+
713+ @concurrent_execution_guard("guard")
714+ def my_function(self, a, b=0):
715+ """zebra"""
716+ return succeed(a / b)
717+
718+ @concurrent_execution_guard("other_guard")
719+ def other_function(self, a):
720+ return fail(OSError("Bad"))
721+
722+ @concurrent_execution_guard("increment_guard")
723+ def slow_increment(self, delay=0.1):
724+ deferred = Deferred()
725+
726+ def _increment():
727+ self._count += 1
728+ return deferred.callback(self._count)
729+
730+ reactor.callLater(delay, _increment)
731+ return deferred
732+
733+ @concurrent_execution_guard("inline_guard")
734+ @inlineCallbacks
735+ def inline_increment(self):
736+ result = yield self.slow_increment()
737+ returnValue(result * 100)
738+
739+
740+class ExecutionGuardTest(TestCase):
741+
742+ def test_guarded_function_metadata(self):
743+ self.assertEqual(Bar().my_function.__name__, "my_function")
744+ self.assertEqual(Bar().my_function.__doc__, "zebra")
745+
746+ def test_guarded_function_failure(self):
747+ foo = Bar()
748+ return self.assertFailure(foo.other_function("1"), OSError)
749+
750+ def test_guarded_function_sync_exception(self):
751+ foo = Bar()
752+ try:
753+ result = foo.my_function(1)
754+ except:
755+ self.fail("Should not raise exception")
756+
757+ self.assertFailure(result, ZeroDivisionError)
758+ self.assertFailure(foo.my_function(1), ZeroDivisionError)
759+ self.assertFalse(foo.guard, False)
760+
761+ def test_guard_multiple_execution(self):
762+ foo = Bar()
763+
764+ d1 = foo.slow_increment()
765+ d2 = foo.slow_increment()
766+
767+ def validate_results(results):
768+ success, value = results[0]
769+ self.assertTrue(success)
770+ self.assertEqual(value, 1)
771+
772+ success, value = results[1]
773+ self.assertTrue(success)
774+ self.assertEqual(value, False)
775+ return foo.slow_increment()
776+
777+ def validate_value(results):
778+ # if the guard had not prevent execution the value
779+ # would be 3.
780+ self.assertEqual(results, 2)
781+
782+ dlist = DeferredList([d1, d2])
783+ dlist.addCallback(validate_results)
784+ dlist.addCallback(validate_value)
785+ return dlist
786+
787+ def test_guard_w_inline_callbacks(self):
788+ foo = Bar()
789+
790+ def validate_result(result):
791+ self.assertEqual(result, 100)
792+
793+ d = foo.inline_increment()
794+ d.addCallback(validate_result)
795+ return d
796
797=== added file 'ensemble/lib/twistutils.py'
798--- ensemble/lib/twistutils.py 1970-01-01 00:00:00 +0000
799+++ ensemble/lib/twistutils.py 2010-09-17 14:41:43 +0000
800@@ -0,0 +1,31 @@
801+from twisted.internet.defer import maybeDeferred, succeed
802+from twisted.python.util import mergeFunctionMetadata
803+
804+
805+def concurrent_execution_guard(attribute):
806+ """Sets attribute to True/False during execution of the decorated method.
807+
808+ Used to ensure non concurrent execution of the decorated function via
809+ an instance attribute. *The underlying function must return a defered*.
810+ """
811+
812+ def guard(f):
813+
814+ def guard_execute(self, *args, **kw):
815+ value = getattr(self, attribute, None)
816+ if value:
817+ return succeed(False)
818+ else:
819+ setattr(self, attribute, True)
820+
821+ d = maybeDeferred(f, self, *args, **kw)
822+
823+ def post_execute(result):
824+ setattr(self, attribute, False)
825+ return result
826+ d.addBoth(post_execute)
827+ return d
828+
829+ return mergeFunctionMetadata(f, guard_execute)
830+
831+ return guard
832
833=== modified file 'ensemble/providers/common.py'
834--- ensemble/providers/common.py 2010-09-16 17:00:40 +0000
835+++ ensemble/providers/common.py 2010-09-17 14:41:43 +0000
836@@ -1,4 +1,6 @@
837 import os
838+from twisted.python.failure import Failure
839+from ensemble.errors import EnsembleError, ProviderInteractionError
840
841 BOOTSTRAP_PACKAGES = [
842 "bzr",
843@@ -18,7 +20,40 @@
844 "python-zookeeper"]
845
846
847+<<<<<<< TREE
848 def get_user_authorized_keys(config):
849+=======
850+def convert_unknown_error(failure):
851+ """
852+ Convert any non ensemble errors to a provider interaction error.
853+
854+ Supports both usage from within an except clause, and as an
855+ errback handler ie. both the following forms are supported.
856+
857+ ...
858+ try:
859+ something()
860+ except Exception, e:
861+ convert_unknown_errors(e)
862+
863+ ...
864+ d.addErrback(convert_unknown_errors)
865+ """
866+ if isinstance(failure, Failure):
867+ error = failure.value
868+ else:
869+ error = failure
870+
871+ if not isinstance(error, EnsembleError):
872+ error = ProviderInteractionError(error)
873+
874+ if isinstance(failure, Failure):
875+ return Failure(error)
876+ raise error
877+
878+
879+def get_user_public_key(config):
880+>>>>>>> MERGE-SOURCE
881 """
882 Locate a public key for the user. Either one explicitly passed
883 in or one in the user's .ssh directory.
884
885=== modified file 'ensemble/providers/ec2/__init__.py'
886--- ensemble/providers/ec2/__init__.py 2010-09-17 14:09:20 +0000
887+++ ensemble/providers/ec2/__init__.py 2010-09-17 14:41:43 +0000
888@@ -3,6 +3,7 @@
889
890 from txaws.service import AWSServiceRegion, REGION_US
891
892+<<<<<<< TREE
893 from ensemble.environment.errors import EnvironmentsConfigError
894 from ensemble.providers.common import get_user_authorized_keys
895
896@@ -12,6 +13,16 @@
897 from launch import EC2LaunchMachine, EC2Bootstrap
898 from shutdown import EC2Shutdown, EC2ShutdownMachine
899 from state import EC2SaveState, EC2LoadState
900+=======
901+from ensemble.providers.common import convert_unknown_error
902+
903+from .connect import EC2Connect
904+from .files import FileStorage
905+from .iterate import EC2MachineIteration
906+from .launch import EC2LaunchMachine, EC2Bootstrap
907+from .shutdown import EC2Shutdown, EC2ShutdownMachine
908+from .state import EC2SaveState, EC2LoadState
909+>>>>>>> MERGE-SOURCE
910
911
912 class MachineProvider(object):
913@@ -35,9 +46,9 @@
914 "and authorized-keys-path. Pick one!")
915
916 def get_serialization_data(self):
917- """
918- Return a yaml serialization of the provider configuration within
919- the environment.
920+ """Return a dictionary serialization of the provider configuration.
921+
922+ Additionally this extracts credential information from the environment.
923 """
924 data = copy.deepcopy(self.config)
925 data["secret-key"] = self.config.get(
926@@ -48,6 +59,11 @@
927 data.pop("authorized-keys-path", None)
928 return data
929
930+ def _run_operation(self, operation, *args, **kw):
931+ d = operation.run(*args, **kw)
932+ d.addErrback(convert_unknown_error)
933+ return d
934+
935 def connect(self):
936 """
937 Connect to the zookeeper ensemble running in the machine provider.
938@@ -56,7 +72,7 @@
939 C{ensemble.storage.connection.TunnelProtocol}
940 """
941 connect = EC2Connect(self)
942- return connect.run()
943+ return self._run_operation(connect)
944
945 def get_file_storage(self):
946 """Retrieve the provider C{FileStorage} abstraction."""
947
948=== modified file 'ensemble/providers/ec2/launch.py'
949--- ensemble/providers/ec2/launch.py 2010-09-16 17:00:40 +0000
950+++ ensemble/providers/ec2/launch.py 2010-09-17 14:41:43 +0000
951@@ -231,4 +231,9 @@
952 admin_identity)
953
954 variables["scripts"].append(initialize_script)
955+
956+ provision_agent_start = "python -m %s -n --zookeeper-servers %s" %(
957+ "ensemble.agents.provision", "127.0.0.1:2181")
958+ variables["scripts"].append(provision_agent_start)
959+
960 return variables
961
962=== modified file 'ensemble/providers/ec2/tests/test_launch.py'
963--- ensemble/providers/ec2/tests/test_launch.py 2010-09-16 17:00:40 +0000
964+++ ensemble/providers/ec2/tests/test_launch.py 2010-09-17 14:41:43 +0000
965@@ -287,12 +287,16 @@
966 self.assertEqual(
967 config["packages"],
968 list(DEFAULT_PACKAGES) + BOOTSTRAP_PACKAGES)
969- self.failUnlessIn("admin-identity", config["runcmd"][-1])
970+ self.failUnlessIn("admin-identity", config["runcmd"][-2])
971
972 script = '%s initialize --admin-identity="%s"' % (
973 "sudo /usr/local/bin/ensemble-admin",
974 admin_identity)
975
976+ self.assertEqual(config["runcmd"][-2], script)
977+
978+ script = "python -m %s -n --zookeeper-servers %s" % (
979+ "ensemble.agents.provision", "127.0.0.1:2181")
980 self.assertEqual(config["runcmd"][-1], script)
981 return True
982
983@@ -443,4 +447,4 @@
984 bootstrap = EC2Bootstrap(provider)
985 variables = bootstrap.get_machine_variables()
986 identity = make_identity("admin:%s" % config["admin-secret"])
987- self.failUnlessIn(identity, variables["scripts"][-1])
988+ self.failUnlessIn(identity, variables["scripts"][-2])
989
990=== modified file 'ensemble/providers/tests/test_common.py'
991--- ensemble/providers/tests/test_common.py 2010-09-17 14:09:20 +0000
992+++ ensemble/providers/tests/test_common.py 2010-09-17 14:41:43 +0000
993@@ -1,7 +1,15 @@
994 import os
995
996+from twisted.python.failure import Failure
997+
998 from ensemble.environment.tests.test_config import EnvironmentsConfigTestBase
999+<<<<<<< TREE
1000 from ensemble.providers.common import get_user_authorized_keys
1001+=======
1002+from ensemble.errors import ProviderInteractionError, EnsembleError
1003+from ensemble.providers.common import (
1004+ get_user_public_key, convert_unknown_error)
1005+>>>>>>> MERGE-SOURCE
1006
1007
1008 class CommonProviderTests(EnvironmentsConfigTestBase):
1009@@ -46,5 +54,44 @@
1010
1011 def test_invalid_key_specified(self):
1012 """If an invalid key is specified, a LookupError is raised."""
1013+<<<<<<< TREE
1014 config = {"authorized-keys-path": "zebra_moon.pub"}
1015 self.assertRaises(LookupError, get_user_authorized_keys, config)
1016+=======
1017+ config = {"ssh_public_key": "zebra_moon.pub"}
1018+ self.assertRaises(LookupError, get_user_public_key, config)
1019+
1020+ def test_convert_unknown_error(self):
1021+ error = self.assertRaises(
1022+ ProviderInteractionError,
1023+ convert_unknown_error,
1024+ OSError("Bad"))
1025+ self.assertInstance(error, EnsembleError)
1026+ self.assertEqual(
1027+ str(error),
1028+ "ProviderError: Interaction with machine provider failed: "
1029+ "OSError('Bad',)")
1030+
1031+ def test_convert_unknown_error_ignores_ensemble_error(self):
1032+ error = self.assertRaises(
1033+ EnsembleError,
1034+ convert_unknown_error,
1035+ EnsembleError("Magic"))
1036+ self.assertEqual(
1037+ str(error),
1038+ "Magic")
1039+
1040+ def test_convert_unknown_error_ignores_ensemble_failure(self):
1041+ failure = convert_unknown_error(Failure(EnsembleError("Magic")))
1042+ self.assertTrue(isinstance(failure, Failure))
1043+ self.assertEqual(failure.value.__class__, EnsembleError)
1044+
1045+ def test_convert_unknown_error_with_failure(self):
1046+ failure = convert_unknown_error(Failure(OSError("Bad")))
1047+ self.assertTrue(isinstance(failure, Failure))
1048+ self.assertInstance(failure.value, ProviderInteractionError)
1049+ self.assertEqual(
1050+ str(failure.value),
1051+ "ProviderError: Interaction with machine provider failed: "
1052+ "OSError('Bad',)")
1053+>>>>>>> MERGE-SOURCE
1054
1055=== modified file 'ensemble/state/tests/common.py'
1056--- ensemble/state/tests/common.py 2010-09-15 13:21:49 +0000
1057+++ ensemble/state/tests/common.py 2010-09-17 14:41:43 +0000
1058@@ -57,4 +57,4 @@
1059 """Non-blocking sleep."""
1060 deferred = Deferred()
1061 reactor.callLater(delay, deferred.callback, None)
1062- return deferred
1063+ return deferred
1064
1065=== modified file 'ensemble/tests/test_errors.py'
1066--- ensemble/tests/test_errors.py 2010-08-27 20:48:57 +0000
1067+++ ensemble/tests/test_errors.py 2010-09-17 14:41:43 +0000
1068@@ -1,6 +1,6 @@
1069 from ensemble.errors import (
1070 EnsembleError, FileNotFound, FileAlreadyExists, InvalidEnsembleHeaderValue,
1071- NoConnection, ProviderError)
1072+ NoConnection, ProviderError, ProviderInteractionError)
1073
1074 from ensemble.lib.testing import TestCase
1075
1076@@ -37,3 +37,11 @@
1077 def test_ProviderError(self):
1078 error = ProviderError("Invalid credentials")
1079 self.assertIsEnsembleError(error)
1080+
1081+ def test_ProviderInteractionError(self):
1082+ error = ProviderInteractionError(OSError("Bad Stuff"))
1083+ self.assertIsEnsembleError(error)
1084+ self.assertEquals(
1085+ str(error),
1086+ "ProviderError: Interaction with machine provider failed: "
1087+ "OSError('Bad Stuff',)")

Subscribers

People subscribed via source and target branches

to status/vote changes: