Merge lp:~allenap/maas/rpc-power-sync-async-mix-up into lp:~maas-committers/maas/trunk

Proposed by Gavin Panella
Status: Merged
Approved by: Gavin Panella
Approved revision: no longer in the source branch.
Merged at revision: 2975
Proposed branch: lp:~allenap/maas/rpc-power-sync-async-mix-up
Merge into: lp:~maas-committers/maas/trunk
Diff against target: 544 lines (+89/-100)
3 files modified
src/provisioningserver/events.py (+2/-0)
src/provisioningserver/rpc/power.py (+18/-3)
src/provisioningserver/rpc/tests/test_power.py (+69/-97)
To merge this branch: bzr merge lp:~allenap/maas/rpc-power-sync-async-mix-up
Reviewer Review Type Date Requested Status
Raphaël Badin (community) Approve
Review via email: mp+234465@code.launchpad.net

Commit message

Prevent mix-ups between asynchronous and synchronous code in the cluster power control code, and fix the existing mix-ups.

To post a comment you must log in.
Revision history for this message
Raphaël Badin (rvb) wrote :

Looks good. I've tested this code on my NUCs and it seems to solve the bug in question.

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'src/provisioningserver/events.py'
2--- src/provisioningserver/events.py 2014-09-04 15:46:06 +0000
3+++ src/provisioningserver/events.py 2014-09-12 13:25:08 +0000
4@@ -31,6 +31,7 @@
5 RegisterEventType,
6 SendEvent,
7 )
8+from provisioningserver.utils.twisted import asynchronous
9 from twisted.internet.defer import inlineCallbacks
10
11
12@@ -84,6 +85,7 @@
13 }
14
15
16+@asynchronous
17 @inlineCallbacks
18 def send_event_node(event_type, system_id, hostname, description=''):
19 """Send the given node event to the region.
20
21=== modified file 'src/provisioningserver/rpc/power.py'
22--- src/provisioningserver/rpc/power.py 2014-09-11 21:08:12 +0000
23+++ src/provisioningserver/rpc/power.py 2014-09-12 13:25:08 +0000
24@@ -32,7 +32,11 @@
25 MarkNodeFailed,
26 UpdateNodePowerState,
27 )
28-from provisioningserver.utils.twisted import pause
29+from provisioningserver.utils.twisted import (
30+ asynchronous,
31+ pause,
32+ synchronous,
33+ )
34 from twisted.internet import reactor
35 from twisted.internet.defer import (
36 inlineCallbacks,
37@@ -51,6 +55,7 @@
38 maaslog = get_maas_logger("power")
39
40
41+@asynchronous
42 @inlineCallbacks
43 def power_change_failure(system_id, hostname, power_change, message):
44 """Deal with a node failing to be powered up or down."""
45@@ -72,6 +77,7 @@
46 yield send_event_node(event_type, system_id, hostname, message)
47
48
49+@synchronous
50 def perform_power_change(system_id, hostname, power_type, power_change,
51 context):
52 """Issue the given `power_change` command.
53@@ -85,10 +91,12 @@
54 except PowerActionFail as error:
55 message = "Node could not be powered %s: %s" % (
56 power_change, error)
57- power_change_failure(system_id, hostname, power_change, message)
58+ power_change_failure(
59+ system_id, hostname, power_change, message).wait(15)
60 raise
61
62
63+@asynchronous
64 @inlineCallbacks
65 def power_change_success(system_id, hostname, power_change):
66 assert power_change in ['on', 'off'], (
67@@ -105,6 +113,7 @@
68 yield send_event_node(event_type, system_id, hostname)
69
70
71+@asynchronous
72 @inlineCallbacks
73 def power_change_starting(system_id, hostname, power_change):
74 assert power_change in ['on', 'off'], (
75@@ -123,6 +132,7 @@
76 default_waiting_policy = (3, 5, 10)
77
78
79+@asynchronous
80 @inlineCallbacks
81 def change_power_state(system_id, hostname, power_type, power_change, context,
82 clock=reactor):
83@@ -154,7 +164,7 @@
84 perform_power_change, system_id, hostname, power_type,
85 'query', context)
86 if new_power_state == power_change:
87- power_change_success(system_id, hostname, power_change)
88+ yield power_change_success(system_id, hostname, power_change)
89 return
90
91 # Failure: the power state of the node hasn't changed: mark it as
92@@ -163,6 +173,7 @@
93 yield power_change_failure(system_id, hostname, power_change, message)
94
95
96+@asynchronous
97 @inlineCallbacks
98 def power_state_update(system_id, state):
99 """Update a node's power state"""
100@@ -174,6 +185,7 @@
101 )
102
103
104+@asynchronous
105 @inlineCallbacks
106 def power_query_failure(system_id, hostname, message):
107 """Deal with a node failing to be queried."""
108@@ -189,6 +201,7 @@
109 system_id, hostname, message)
110
111
112+@synchronous
113 def perform_power_query(system_id, hostname, power_type, context):
114 """Issue the given `power_query` command.
115
116@@ -200,6 +213,7 @@
117 return action.execute(power_change='query', **context)
118
119
120+@asynchronous
121 @inlineCallbacks
122 def get_power_state(system_id, hostname, power_type, context, clock=reactor):
123 if power_type not in QUERY_POWER_TYPES:
124@@ -230,6 +244,7 @@
125 returnValue('error')
126
127
128+@asynchronous
129 @inlineCallbacks
130 def query_all_nodes(nodes, clock=reactor):
131 """Performs `power_query` on all nodes. If the nodes state has changed,
132
133=== modified file 'src/provisioningserver/rpc/tests/test_power.py'
134--- src/provisioningserver/rpc/tests/test_power.py 2014-09-02 07:42:30 +0000
135+++ src/provisioningserver/rpc/tests/test_power.py 2014-09-12 13:25:08 +0000
136@@ -42,21 +42,24 @@
137 power,
138 region,
139 )
140-from provisioningserver.rpc.testing import MockClusterToRegionRPCFixture
141-from testtools.deferredruntest import assert_fails_with
142+from provisioningserver.rpc.testing import (
143+ MockClusterToRegionRPCFixture,
144+ MockLiveClusterToRegionRPCFixture,
145+ )
146+from testtools import ExpectedException
147+from testtools.deferredruntest import extract_result
148 from twisted.internet import reactor
149 from twisted.internet.defer import (
150 fail,
151 inlineCallbacks,
152 maybeDeferred,
153+ returnValue,
154 )
155 from twisted.internet.task import Clock
156
157
158 class TestPowerHelpers(MAASTestCase):
159
160- run_tests_with = MAASTwistedRunTest.make_factory(timeout=5)
161-
162 def patch_rpc_methods(self):
163 fixture = self.useFixture(MockClusterToRegionRPCFixture())
164 protocol, io = fixture.makeEventLoop(
165@@ -86,7 +89,7 @@
166 system_id=system_id,
167 description='')
168 )
169- return d
170+ self.assertIsNone(extract_result(d))
171
172 def test_power_change_starting_emits_event(self):
173 system_id = factory.make_name('system_id')
174@@ -103,7 +106,7 @@
175 system_id=system_id,
176 description='')
177 )
178- return d
179+ self.assertIsNone(extract_result(d))
180
181 def test_power_change_failure_emits_event(self):
182 system_id = factory.make_name('system_id')
183@@ -122,7 +125,7 @@
184 system_id=system_id,
185 description=message)
186 )
187- return d
188+ self.assertIsNone(extract_result(d))
189
190 def test_power_query_failure_emits_event(self):
191 system_id = factory.make_name('system_id')
192@@ -133,7 +136,7 @@
193 system_id, hostname, message)
194 # This blocks until the deferred is complete
195 io.flush()
196- self.assertTrue(d.called)
197+ self.assertIsNone(extract_result(d))
198 self.assertThat(
199 protocol.SendEvent,
200 MockCalledOnceWith(
201@@ -142,7 +145,6 @@
202 system_id=system_id,
203 description=message)
204 )
205- return d
206
207 def test_power_query_failure_marks_node_broken(self):
208 system_id = factory.make_name('system_id')
209@@ -153,7 +155,7 @@
210 system_id, hostname, message)
211 # This blocks until the deferred is complete
212 io.flush()
213- self.assertTrue(d.called)
214+ self.assertIsNone(extract_result(d))
215 self.assertThat(
216 protocol.MarkNodeFailed,
217 MockCalledOnceWith(
218@@ -161,7 +163,6 @@
219 system_id=system_id,
220 error_description=message)
221 )
222- return d
223
224 def test_power_state_update_calls_UpdateNodePowerState(self):
225 system_id = factory.make_name('system_id')
226@@ -171,7 +172,7 @@
227 system_id, state)
228 # This blocks until the deferred is complete
229 io.flush()
230- self.assertTrue(d.called)
231+ self.assertIsNone(extract_result(d))
232 self.assertThat(
233 protocol.UpdateNodePowerState,
234 MockCalledOnceWith(
235@@ -179,18 +180,12 @@
236 system_id=system_id,
237 power_state=state)
238 )
239- return d
240
241
242 class TestChangePowerChange(MAASTestCase):
243
244 run_tests_with = MAASTwistedRunTest.make_factory(timeout=5)
245
246- def setUp(self):
247- super(TestChangePowerChange, self).setUp()
248- self.patch(
249- provisioningserver.rpc.power, 'deferToThread', maybeDeferred)
250-
251 def patch_power_action(self, return_value=None, side_effect=None):
252 """Patch the PowerAction object.
253
254@@ -212,15 +207,18 @@
255 power_action.return_value = power_action_obj
256 return power_action, power_action_obj_execute
257
258+ @inlineCallbacks
259 def patch_rpc_methods(self, return_value={}, side_effect=None):
260- fixture = self.useFixture(MockClusterToRegionRPCFixture())
261- protocol, io = fixture.makeEventLoop(
262+ fixture = self.useFixture(MockLiveClusterToRegionRPCFixture())
263+ protocol, connecting = fixture.makeEventLoop(
264 region.MarkNodeFailed, region.UpdateNodePowerState,
265 region.SendEvent)
266 protocol.MarkNodeFailed.return_value = return_value
267 protocol.MarkNodeFailed.side_effect = side_effect
268- return protocol.MarkNodeFailed, io
269+ self.addCleanup((yield connecting))
270+ returnValue(protocol.MarkNodeFailed)
271
272+ @inlineCallbacks
273 def test_change_power_state_changes_power_state(self):
274 system_id = factory.make_name('system_id')
275 hostname = factory.make_name('hostname')
276@@ -234,11 +232,10 @@
277 # in the required power state.
278 power_action, execute = self.patch_power_action(
279 return_value=power_change)
280- markNodeBroken, io = self.patch_rpc_methods()
281+ markNodeBroken = yield self.patch_rpc_methods()
282
283- d = power.change_power_state(
284+ yield power.change_power_state(
285 system_id, hostname, power_type, power_change, context)
286- io.flush()
287 self.assertThat(
288 execute,
289 MockCallsMatch(
290@@ -250,8 +247,8 @@
291 )
292 # The node hasn't been marked broken.
293 self.assertThat(markNodeBroken, MockNotCalled())
294- return d
295
296+ @inlineCallbacks
297 def test_change_power_state_doesnt_retry_for_certain_power_types(self):
298 system_id = factory.make_name('system_id')
299 hostname = factory.make_name('hostname')
300@@ -264,11 +261,10 @@
301 self.patch(power, 'pause')
302 power_action, execute = self.patch_power_action(
303 return_value=random.choice(['on', 'off']))
304- markNodeBroken, io = self.patch_rpc_methods()
305+ markNodeBroken = yield self.patch_rpc_methods()
306
307- d = power.change_power_state(
308+ yield power.change_power_state(
309 system_id, hostname, power_type, power_change, context)
310- io.flush()
311 self.assertThat(
312 execute,
313 MockCallsMatch(
314@@ -278,8 +274,8 @@
315 )
316 # The node hasn't been marked broken.
317 self.assertThat(markNodeBroken, MockNotCalled())
318- return d
319
320+ @inlineCallbacks
321 def test_change_power_state_retries_if_power_state_doesnt_change(self):
322 system_id = factory.make_name('system_id')
323 hostname = factory.make_name('hostname')
324@@ -292,11 +288,10 @@
325 # Simulate a failure to power up the node, then a success.
326 power_action, execute = self.patch_power_action(
327 side_effect=[None, 'off', None, 'on'])
328- markNodeBroken, io = self.patch_rpc_methods()
329+ markNodeBroken = yield self.patch_rpc_methods()
330
331- d = power.change_power_state(
332+ yield power.change_power_state(
333 system_id, hostname, power_type, power_change, context)
334- io.flush()
335 self.assertThat(
336 execute,
337 MockCallsMatch(
338@@ -308,8 +303,8 @@
339 )
340 # The node hasn't been marked broken.
341 self.assertThat(markNodeBroken, MockNotCalled())
342- return d
343
344+ @inlineCallbacks
345 def test_change_power_state_marks_the_node_broken_if_failure(self):
346 system_id = factory.make_name('system_id')
347 hostname = factory.make_name('hostname')
348@@ -321,11 +316,10 @@
349 self.patch(power, 'pause')
350 # Simulate a persistent failure.
351 power_action, execute = self.patch_power_action(return_value='off')
352- markNodeBroken, io = self.patch_rpc_methods()
353+ markNodeBroken = yield self.patch_rpc_methods()
354
355- d = power.change_power_state(
356+ yield power.change_power_state(
357 system_id, hostname, power_type, power_change, context)
358- io.flush()
359
360 # The node has been marked broken.
361 msg = "Timeout after %s tries" % len(
362@@ -337,8 +331,8 @@
363 system_id=system_id,
364 error_description=msg)
365 )
366- return d
367
368+ @inlineCallbacks
369 def test_change_power_state_marks_the_node_broken_if_exception(self):
370 system_id = factory.make_name('system_id')
371 hostname = factory.make_name('hostname')
372@@ -352,22 +346,18 @@
373 exception_message = factory.make_name('exception')
374 power_action, execute = self.patch_power_action(
375 side_effect=PowerActionFail(exception_message))
376- markNodeBroken, io = self.patch_rpc_methods()
377-
378- d = power.change_power_state(
379- system_id, hostname, power_type, power_change, context)
380- io.flush()
381- assert_fails_with(d, PowerActionFail)
382+ markNodeBroken = yield self.patch_rpc_methods()
383+
384+ with ExpectedException(PowerActionFail):
385+ yield power.change_power_state(
386+ system_id, hostname, power_type, power_change, context)
387+
388 error_message = "Node could not be powered on: %s" % exception_message
389-
390- def check(failure):
391- self.assertThat(
392- markNodeBroken,
393- MockCalledOnceWith(
394- ANY, system_id=system_id, error_description=error_message))
395-
396- return d.addCallback(check)
397-
398+ self.assertThat(
399+ markNodeBroken, MockCalledOnceWith(
400+ ANY, system_id=system_id, error_description=error_message))
401+
402+ @inlineCallbacks
403 def test_change_power_state_pauses_inbetween_retries(self):
404 system_id = factory.make_name('system_id')
405 hostname = factory.make_name('hostname')
406@@ -379,42 +369,27 @@
407 # Simulate two failures to power up the node, then a success.
408 power_action, execute = self.patch_power_action(
409 side_effect=[None, 'off', None, 'off', None, 'on'])
410- self.patch(power, "deferToThread", maybeDeferred)
411- markNodeBroken, io = self.patch_rpc_methods()
412- clock = Clock()
413-
414- calls_and_pause = [
415- ([
416- call(power_change=power_change, **context),
417- ], 3),
418- ([
419- call(power_change='query', **context),
420- call(power_change=power_change, **context),
421- ], 5),
422- ([
423- call(power_change='query', **context),
424- call(power_change=power_change, **context),
425- ], 10),
426- ([
427- call(power_change='query', **context),
428- ], 0),
429- ]
430- calls = []
431- d = power.change_power_state(
432- system_id, hostname, power_type, power_change, context,
433- clock=clock)
434- for newcalls, waiting_time in calls_and_pause:
435- calls.extend(newcalls)
436- io.flush()
437- self.assertThat(execute, MockCallsMatch(*calls))
438- clock.advance(waiting_time)
439- return d
440+ # Patch calls to pause() to `execute` so that we record both in the
441+ # same place, and can thus see ordering.
442+ self.patch(power, 'pause', execute)
443+
444+ yield self.patch_rpc_methods()
445+
446+ yield power.change_power_state(
447+ system_id, hostname, power_type, power_change, context)
448+
449+ self.assertThat(execute, MockCallsMatch(
450+ call(power_change=power_change, **context),
451+ call(3, reactor), # pause(3, reactor)
452+ call(power_change='query', **context),
453+ call(power_change=power_change, **context),
454+ call(5, reactor), # pause(5, reactor)
455+ call(power_change='query', **context),
456+ ))
457
458
459 class TestPowerQuery(MAASTestCase):
460
461- run_tests_with = MAASTwistedRunTest.make_factory(timeout=5)
462-
463 def setUp(self):
464 super(TestPowerQuery, self).setUp()
465 self.patch(
466@@ -468,7 +443,7 @@
467 system_id, hostname, power_type, context)
468 # This blocks until the deferred is complete
469 io.flush()
470- self.assertTrue(d.called)
471+ self.assertEqual(power_state, extract_result(d))
472 self.assertThat(
473 execute,
474 MockCallsMatch(
475@@ -476,8 +451,6 @@
476 call(power_change='query', **context),
477 ),
478 )
479- self.assertEqual(power_state, d.result)
480- return d
481
482 def test_get_power_state_returns_unknown_for_certain_power_types(self):
483 system_id = factory.make_name('system_id')
484@@ -493,9 +466,7 @@
485 system_id, hostname, power_type, context)
486 # This blocks until the deferred is complete
487 io.flush()
488- self.assertTrue(d.called)
489- self.assertEqual('unknown', d.result)
490- return d
491+ self.assertEqual('unknown', extract_result(d))
492
493 def test_get_power_state_retries_if_power_query_fails(self):
494 system_id = factory.make_name('system_id')
495@@ -516,7 +487,7 @@
496 system_id, hostname, power_type, context)
497 # This blocks until the deferred is complete
498 io.flush()
499- self.assertTrue(d.called)
500+ self.assertEqual(power_state, extract_result(d))
501 self.assertThat(
502 execute,
503 MockCallsMatch(
504@@ -526,8 +497,6 @@
505 )
506 # The node hasn't been marked broken.
507 self.assertThat(markNodeBroken, MockNotCalled())
508- self.assertEqual(power_state, d.result)
509- return d
510
511 def test_get_power_state_marks_the_node_broken_if_failure(self):
512 system_id = factory.make_name('system_id')
513@@ -547,7 +516,7 @@
514 system_id, hostname, power_type, context)
515 # This blocks until the deferred is complete
516 io.flush()
517- self.assertTrue(d.called)
518+ self.assertEqual('error', extract_result(d))
519 # The node has been marked broken.
520 self.assertThat(
521 markNodeBroken,
522@@ -557,8 +526,6 @@
523 error_description="Node could not be queried %s (%s) %s" % (
524 system_id, hostname, err_msg))
525 )
526- self.assertEqual('error', d.result)
527- return d
528
529 def test_get_power_state_pauses_inbetween_retries(self):
530 system_id = factory.make_name('system_id')
531@@ -594,7 +561,12 @@
532 io.flush()
533 self.assertThat(execute, MockCallsMatch(*calls))
534 clock.advance(waiting_time)
535- return d
536+ self.assertEqual("off", extract_result(d))
537+
538+
539+class TestPowerQueryAsync(MAASTestCase):
540+
541+ run_tests_with = MAASTwistedRunTest.make_factory(timeout=5)
542
543 def make_node(self):
544 system_id = factory.make_name('system_id')