Merge lp:~allenap/maas/rpc-power-sync-async-mix-up into lp:~maas-committers/maas/trunk
- rpc-power-sync-async-mix-up
- Merge into trunk
Proposed by
Gavin Panella
Status: | Merged |
---|---|
Approved by: | Gavin Panella |
Approved revision: | no longer in the source branch. |
Merged at revision: | 2975 |
Proposed branch: | lp:~allenap/maas/rpc-power-sync-async-mix-up |
Merge into: | lp:~maas-committers/maas/trunk |
Diff against target: |
544 lines (+89/-100) 3 files modified
src/provisioningserver/events.py (+2/-0) src/provisioningserver/rpc/power.py (+18/-3) src/provisioningserver/rpc/tests/test_power.py (+69/-97) |
To merge this branch: | bzr merge lp:~allenap/maas/rpc-power-sync-async-mix-up |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Raphaël Badin (community) | Approve | ||
Review via email: mp+234465@code.launchpad.net |
Commit message
Prevent mix-ups between asynchronous and synchronous code in the cluster power control code, and fix the existing mix-ups.
Description of the change
To post a comment you must log in.
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === modified file 'src/provisioningserver/events.py' |
2 | --- src/provisioningserver/events.py 2014-09-04 15:46:06 +0000 |
3 | +++ src/provisioningserver/events.py 2014-09-12 13:25:08 +0000 |
4 | @@ -31,6 +31,7 @@ |
5 | RegisterEventType, |
6 | SendEvent, |
7 | ) |
8 | +from provisioningserver.utils.twisted import asynchronous |
9 | from twisted.internet.defer import inlineCallbacks |
10 | |
11 | |
12 | @@ -84,6 +85,7 @@ |
13 | } |
14 | |
15 | |
16 | +@asynchronous |
17 | @inlineCallbacks |
18 | def send_event_node(event_type, system_id, hostname, description=''): |
19 | """Send the given node event to the region. |
20 | |
21 | === modified file 'src/provisioningserver/rpc/power.py' |
22 | --- src/provisioningserver/rpc/power.py 2014-09-11 21:08:12 +0000 |
23 | +++ src/provisioningserver/rpc/power.py 2014-09-12 13:25:08 +0000 |
24 | @@ -32,7 +32,11 @@ |
25 | MarkNodeFailed, |
26 | UpdateNodePowerState, |
27 | ) |
28 | -from provisioningserver.utils.twisted import pause |
29 | +from provisioningserver.utils.twisted import ( |
30 | + asynchronous, |
31 | + pause, |
32 | + synchronous, |
33 | + ) |
34 | from twisted.internet import reactor |
35 | from twisted.internet.defer import ( |
36 | inlineCallbacks, |
37 | @@ -51,6 +55,7 @@ |
38 | maaslog = get_maas_logger("power") |
39 | |
40 | |
41 | +@asynchronous |
42 | @inlineCallbacks |
43 | def power_change_failure(system_id, hostname, power_change, message): |
44 | """Deal with a node failing to be powered up or down.""" |
45 | @@ -72,6 +77,7 @@ |
46 | yield send_event_node(event_type, system_id, hostname, message) |
47 | |
48 | |
49 | +@synchronous |
50 | def perform_power_change(system_id, hostname, power_type, power_change, |
51 | context): |
52 | """Issue the given `power_change` command. |
53 | @@ -85,10 +91,12 @@ |
54 | except PowerActionFail as error: |
55 | message = "Node could not be powered %s: %s" % ( |
56 | power_change, error) |
57 | - power_change_failure(system_id, hostname, power_change, message) |
58 | + power_change_failure( |
59 | + system_id, hostname, power_change, message).wait(15) |
60 | raise |
61 | |
62 | |
63 | +@asynchronous |
64 | @inlineCallbacks |
65 | def power_change_success(system_id, hostname, power_change): |
66 | assert power_change in ['on', 'off'], ( |
67 | @@ -105,6 +113,7 @@ |
68 | yield send_event_node(event_type, system_id, hostname) |
69 | |
70 | |
71 | +@asynchronous |
72 | @inlineCallbacks |
73 | def power_change_starting(system_id, hostname, power_change): |
74 | assert power_change in ['on', 'off'], ( |
75 | @@ -123,6 +132,7 @@ |
76 | default_waiting_policy = (3, 5, 10) |
77 | |
78 | |
79 | +@asynchronous |
80 | @inlineCallbacks |
81 | def change_power_state(system_id, hostname, power_type, power_change, context, |
82 | clock=reactor): |
83 | @@ -154,7 +164,7 @@ |
84 | perform_power_change, system_id, hostname, power_type, |
85 | 'query', context) |
86 | if new_power_state == power_change: |
87 | - power_change_success(system_id, hostname, power_change) |
88 | + yield power_change_success(system_id, hostname, power_change) |
89 | return |
90 | |
91 | # Failure: the power state of the node hasn't changed: mark it as |
92 | @@ -163,6 +173,7 @@ |
93 | yield power_change_failure(system_id, hostname, power_change, message) |
94 | |
95 | |
96 | +@asynchronous |
97 | @inlineCallbacks |
98 | def power_state_update(system_id, state): |
99 | """Update a node's power state""" |
100 | @@ -174,6 +185,7 @@ |
101 | ) |
102 | |
103 | |
104 | +@asynchronous |
105 | @inlineCallbacks |
106 | def power_query_failure(system_id, hostname, message): |
107 | """Deal with a node failing to be queried.""" |
108 | @@ -189,6 +201,7 @@ |
109 | system_id, hostname, message) |
110 | |
111 | |
112 | +@synchronous |
113 | def perform_power_query(system_id, hostname, power_type, context): |
114 | """Issue the given `power_query` command. |
115 | |
116 | @@ -200,6 +213,7 @@ |
117 | return action.execute(power_change='query', **context) |
118 | |
119 | |
120 | +@asynchronous |
121 | @inlineCallbacks |
122 | def get_power_state(system_id, hostname, power_type, context, clock=reactor): |
123 | if power_type not in QUERY_POWER_TYPES: |
124 | @@ -230,6 +244,7 @@ |
125 | returnValue('error') |
126 | |
127 | |
128 | +@asynchronous |
129 | @inlineCallbacks |
130 | def query_all_nodes(nodes, clock=reactor): |
131 | """Performs `power_query` on all nodes. If the nodes state has changed, |
132 | |
133 | === modified file 'src/provisioningserver/rpc/tests/test_power.py' |
134 | --- src/provisioningserver/rpc/tests/test_power.py 2014-09-02 07:42:30 +0000 |
135 | +++ src/provisioningserver/rpc/tests/test_power.py 2014-09-12 13:25:08 +0000 |
136 | @@ -42,21 +42,24 @@ |
137 | power, |
138 | region, |
139 | ) |
140 | -from provisioningserver.rpc.testing import MockClusterToRegionRPCFixture |
141 | -from testtools.deferredruntest import assert_fails_with |
142 | +from provisioningserver.rpc.testing import ( |
143 | + MockClusterToRegionRPCFixture, |
144 | + MockLiveClusterToRegionRPCFixture, |
145 | + ) |
146 | +from testtools import ExpectedException |
147 | +from testtools.deferredruntest import extract_result |
148 | from twisted.internet import reactor |
149 | from twisted.internet.defer import ( |
150 | fail, |
151 | inlineCallbacks, |
152 | maybeDeferred, |
153 | + returnValue, |
154 | ) |
155 | from twisted.internet.task import Clock |
156 | |
157 | |
158 | class TestPowerHelpers(MAASTestCase): |
159 | |
160 | - run_tests_with = MAASTwistedRunTest.make_factory(timeout=5) |
161 | - |
162 | def patch_rpc_methods(self): |
163 | fixture = self.useFixture(MockClusterToRegionRPCFixture()) |
164 | protocol, io = fixture.makeEventLoop( |
165 | @@ -86,7 +89,7 @@ |
166 | system_id=system_id, |
167 | description='') |
168 | ) |
169 | - return d |
170 | + self.assertIsNone(extract_result(d)) |
171 | |
172 | def test_power_change_starting_emits_event(self): |
173 | system_id = factory.make_name('system_id') |
174 | @@ -103,7 +106,7 @@ |
175 | system_id=system_id, |
176 | description='') |
177 | ) |
178 | - return d |
179 | + self.assertIsNone(extract_result(d)) |
180 | |
181 | def test_power_change_failure_emits_event(self): |
182 | system_id = factory.make_name('system_id') |
183 | @@ -122,7 +125,7 @@ |
184 | system_id=system_id, |
185 | description=message) |
186 | ) |
187 | - return d |
188 | + self.assertIsNone(extract_result(d)) |
189 | |
190 | def test_power_query_failure_emits_event(self): |
191 | system_id = factory.make_name('system_id') |
192 | @@ -133,7 +136,7 @@ |
193 | system_id, hostname, message) |
194 | # This blocks until the deferred is complete |
195 | io.flush() |
196 | - self.assertTrue(d.called) |
197 | + self.assertIsNone(extract_result(d)) |
198 | self.assertThat( |
199 | protocol.SendEvent, |
200 | MockCalledOnceWith( |
201 | @@ -142,7 +145,6 @@ |
202 | system_id=system_id, |
203 | description=message) |
204 | ) |
205 | - return d |
206 | |
207 | def test_power_query_failure_marks_node_broken(self): |
208 | system_id = factory.make_name('system_id') |
209 | @@ -153,7 +155,7 @@ |
210 | system_id, hostname, message) |
211 | # This blocks until the deferred is complete |
212 | io.flush() |
213 | - self.assertTrue(d.called) |
214 | + self.assertIsNone(extract_result(d)) |
215 | self.assertThat( |
216 | protocol.MarkNodeFailed, |
217 | MockCalledOnceWith( |
218 | @@ -161,7 +163,6 @@ |
219 | system_id=system_id, |
220 | error_description=message) |
221 | ) |
222 | - return d |
223 | |
224 | def test_power_state_update_calls_UpdateNodePowerState(self): |
225 | system_id = factory.make_name('system_id') |
226 | @@ -171,7 +172,7 @@ |
227 | system_id, state) |
228 | # This blocks until the deferred is complete |
229 | io.flush() |
230 | - self.assertTrue(d.called) |
231 | + self.assertIsNone(extract_result(d)) |
232 | self.assertThat( |
233 | protocol.UpdateNodePowerState, |
234 | MockCalledOnceWith( |
235 | @@ -179,18 +180,12 @@ |
236 | system_id=system_id, |
237 | power_state=state) |
238 | ) |
239 | - return d |
240 | |
241 | |
242 | class TestChangePowerChange(MAASTestCase): |
243 | |
244 | run_tests_with = MAASTwistedRunTest.make_factory(timeout=5) |
245 | |
246 | - def setUp(self): |
247 | - super(TestChangePowerChange, self).setUp() |
248 | - self.patch( |
249 | - provisioningserver.rpc.power, 'deferToThread', maybeDeferred) |
250 | - |
251 | def patch_power_action(self, return_value=None, side_effect=None): |
252 | """Patch the PowerAction object. |
253 | |
254 | @@ -212,15 +207,18 @@ |
255 | power_action.return_value = power_action_obj |
256 | return power_action, power_action_obj_execute |
257 | |
258 | + @inlineCallbacks |
259 | def patch_rpc_methods(self, return_value={}, side_effect=None): |
260 | - fixture = self.useFixture(MockClusterToRegionRPCFixture()) |
261 | - protocol, io = fixture.makeEventLoop( |
262 | + fixture = self.useFixture(MockLiveClusterToRegionRPCFixture()) |
263 | + protocol, connecting = fixture.makeEventLoop( |
264 | region.MarkNodeFailed, region.UpdateNodePowerState, |
265 | region.SendEvent) |
266 | protocol.MarkNodeFailed.return_value = return_value |
267 | protocol.MarkNodeFailed.side_effect = side_effect |
268 | - return protocol.MarkNodeFailed, io |
269 | + self.addCleanup((yield connecting)) |
270 | + returnValue(protocol.MarkNodeFailed) |
271 | |
272 | + @inlineCallbacks |
273 | def test_change_power_state_changes_power_state(self): |
274 | system_id = factory.make_name('system_id') |
275 | hostname = factory.make_name('hostname') |
276 | @@ -234,11 +232,10 @@ |
277 | # in the required power state. |
278 | power_action, execute = self.patch_power_action( |
279 | return_value=power_change) |
280 | - markNodeBroken, io = self.patch_rpc_methods() |
281 | + markNodeBroken = yield self.patch_rpc_methods() |
282 | |
283 | - d = power.change_power_state( |
284 | + yield power.change_power_state( |
285 | system_id, hostname, power_type, power_change, context) |
286 | - io.flush() |
287 | self.assertThat( |
288 | execute, |
289 | MockCallsMatch( |
290 | @@ -250,8 +247,8 @@ |
291 | ) |
292 | # The node hasn't been marked broken. |
293 | self.assertThat(markNodeBroken, MockNotCalled()) |
294 | - return d |
295 | |
296 | + @inlineCallbacks |
297 | def test_change_power_state_doesnt_retry_for_certain_power_types(self): |
298 | system_id = factory.make_name('system_id') |
299 | hostname = factory.make_name('hostname') |
300 | @@ -264,11 +261,10 @@ |
301 | self.patch(power, 'pause') |
302 | power_action, execute = self.patch_power_action( |
303 | return_value=random.choice(['on', 'off'])) |
304 | - markNodeBroken, io = self.patch_rpc_methods() |
305 | + markNodeBroken = yield self.patch_rpc_methods() |
306 | |
307 | - d = power.change_power_state( |
308 | + yield power.change_power_state( |
309 | system_id, hostname, power_type, power_change, context) |
310 | - io.flush() |
311 | self.assertThat( |
312 | execute, |
313 | MockCallsMatch( |
314 | @@ -278,8 +274,8 @@ |
315 | ) |
316 | # The node hasn't been marked broken. |
317 | self.assertThat(markNodeBroken, MockNotCalled()) |
318 | - return d |
319 | |
320 | + @inlineCallbacks |
321 | def test_change_power_state_retries_if_power_state_doesnt_change(self): |
322 | system_id = factory.make_name('system_id') |
323 | hostname = factory.make_name('hostname') |
324 | @@ -292,11 +288,10 @@ |
325 | # Simulate a failure to power up the node, then a success. |
326 | power_action, execute = self.patch_power_action( |
327 | side_effect=[None, 'off', None, 'on']) |
328 | - markNodeBroken, io = self.patch_rpc_methods() |
329 | + markNodeBroken = yield self.patch_rpc_methods() |
330 | |
331 | - d = power.change_power_state( |
332 | + yield power.change_power_state( |
333 | system_id, hostname, power_type, power_change, context) |
334 | - io.flush() |
335 | self.assertThat( |
336 | execute, |
337 | MockCallsMatch( |
338 | @@ -308,8 +303,8 @@ |
339 | ) |
340 | # The node hasn't been marked broken. |
341 | self.assertThat(markNodeBroken, MockNotCalled()) |
342 | - return d |
343 | |
344 | + @inlineCallbacks |
345 | def test_change_power_state_marks_the_node_broken_if_failure(self): |
346 | system_id = factory.make_name('system_id') |
347 | hostname = factory.make_name('hostname') |
348 | @@ -321,11 +316,10 @@ |
349 | self.patch(power, 'pause') |
350 | # Simulate a persistent failure. |
351 | power_action, execute = self.patch_power_action(return_value='off') |
352 | - markNodeBroken, io = self.patch_rpc_methods() |
353 | + markNodeBroken = yield self.patch_rpc_methods() |
354 | |
355 | - d = power.change_power_state( |
356 | + yield power.change_power_state( |
357 | system_id, hostname, power_type, power_change, context) |
358 | - io.flush() |
359 | |
360 | # The node has been marked broken. |
361 | msg = "Timeout after %s tries" % len( |
362 | @@ -337,8 +331,8 @@ |
363 | system_id=system_id, |
364 | error_description=msg) |
365 | ) |
366 | - return d |
367 | |
368 | + @inlineCallbacks |
369 | def test_change_power_state_marks_the_node_broken_if_exception(self): |
370 | system_id = factory.make_name('system_id') |
371 | hostname = factory.make_name('hostname') |
372 | @@ -352,22 +346,18 @@ |
373 | exception_message = factory.make_name('exception') |
374 | power_action, execute = self.patch_power_action( |
375 | side_effect=PowerActionFail(exception_message)) |
376 | - markNodeBroken, io = self.patch_rpc_methods() |
377 | - |
378 | - d = power.change_power_state( |
379 | - system_id, hostname, power_type, power_change, context) |
380 | - io.flush() |
381 | - assert_fails_with(d, PowerActionFail) |
382 | + markNodeBroken = yield self.patch_rpc_methods() |
383 | + |
384 | + with ExpectedException(PowerActionFail): |
385 | + yield power.change_power_state( |
386 | + system_id, hostname, power_type, power_change, context) |
387 | + |
388 | error_message = "Node could not be powered on: %s" % exception_message |
389 | - |
390 | - def check(failure): |
391 | - self.assertThat( |
392 | - markNodeBroken, |
393 | - MockCalledOnceWith( |
394 | - ANY, system_id=system_id, error_description=error_message)) |
395 | - |
396 | - return d.addCallback(check) |
397 | - |
398 | + self.assertThat( |
399 | + markNodeBroken, MockCalledOnceWith( |
400 | + ANY, system_id=system_id, error_description=error_message)) |
401 | + |
402 | + @inlineCallbacks |
403 | def test_change_power_state_pauses_inbetween_retries(self): |
404 | system_id = factory.make_name('system_id') |
405 | hostname = factory.make_name('hostname') |
406 | @@ -379,42 +369,27 @@ |
407 | # Simulate two failures to power up the node, then a success. |
408 | power_action, execute = self.patch_power_action( |
409 | side_effect=[None, 'off', None, 'off', None, 'on']) |
410 | - self.patch(power, "deferToThread", maybeDeferred) |
411 | - markNodeBroken, io = self.patch_rpc_methods() |
412 | - clock = Clock() |
413 | - |
414 | - calls_and_pause = [ |
415 | - ([ |
416 | - call(power_change=power_change, **context), |
417 | - ], 3), |
418 | - ([ |
419 | - call(power_change='query', **context), |
420 | - call(power_change=power_change, **context), |
421 | - ], 5), |
422 | - ([ |
423 | - call(power_change='query', **context), |
424 | - call(power_change=power_change, **context), |
425 | - ], 10), |
426 | - ([ |
427 | - call(power_change='query', **context), |
428 | - ], 0), |
429 | - ] |
430 | - calls = [] |
431 | - d = power.change_power_state( |
432 | - system_id, hostname, power_type, power_change, context, |
433 | - clock=clock) |
434 | - for newcalls, waiting_time in calls_and_pause: |
435 | - calls.extend(newcalls) |
436 | - io.flush() |
437 | - self.assertThat(execute, MockCallsMatch(*calls)) |
438 | - clock.advance(waiting_time) |
439 | - return d |
440 | + # Patch calls to pause() to `execute` so that we record both in the |
441 | + # same place, and can thus see ordering. |
442 | + self.patch(power, 'pause', execute) |
443 | + |
444 | + yield self.patch_rpc_methods() |
445 | + |
446 | + yield power.change_power_state( |
447 | + system_id, hostname, power_type, power_change, context) |
448 | + |
449 | + self.assertThat(execute, MockCallsMatch( |
450 | + call(power_change=power_change, **context), |
451 | + call(3, reactor), # pause(3, reactor) |
452 | + call(power_change='query', **context), |
453 | + call(power_change=power_change, **context), |
454 | + call(5, reactor), # pause(5, reactor) |
455 | + call(power_change='query', **context), |
456 | + )) |
457 | |
458 | |
459 | class TestPowerQuery(MAASTestCase): |
460 | |
461 | - run_tests_with = MAASTwistedRunTest.make_factory(timeout=5) |
462 | - |
463 | def setUp(self): |
464 | super(TestPowerQuery, self).setUp() |
465 | self.patch( |
466 | @@ -468,7 +443,7 @@ |
467 | system_id, hostname, power_type, context) |
468 | # This blocks until the deferred is complete |
469 | io.flush() |
470 | - self.assertTrue(d.called) |
471 | + self.assertEqual(power_state, extract_result(d)) |
472 | self.assertThat( |
473 | execute, |
474 | MockCallsMatch( |
475 | @@ -476,8 +451,6 @@ |
476 | call(power_change='query', **context), |
477 | ), |
478 | ) |
479 | - self.assertEqual(power_state, d.result) |
480 | - return d |
481 | |
482 | def test_get_power_state_returns_unknown_for_certain_power_types(self): |
483 | system_id = factory.make_name('system_id') |
484 | @@ -493,9 +466,7 @@ |
485 | system_id, hostname, power_type, context) |
486 | # This blocks until the deferred is complete |
487 | io.flush() |
488 | - self.assertTrue(d.called) |
489 | - self.assertEqual('unknown', d.result) |
490 | - return d |
491 | + self.assertEqual('unknown', extract_result(d)) |
492 | |
493 | def test_get_power_state_retries_if_power_query_fails(self): |
494 | system_id = factory.make_name('system_id') |
495 | @@ -516,7 +487,7 @@ |
496 | system_id, hostname, power_type, context) |
497 | # This blocks until the deferred is complete |
498 | io.flush() |
499 | - self.assertTrue(d.called) |
500 | + self.assertEqual(power_state, extract_result(d)) |
501 | self.assertThat( |
502 | execute, |
503 | MockCallsMatch( |
504 | @@ -526,8 +497,6 @@ |
505 | ) |
506 | # The node hasn't been marked broken. |
507 | self.assertThat(markNodeBroken, MockNotCalled()) |
508 | - self.assertEqual(power_state, d.result) |
509 | - return d |
510 | |
511 | def test_get_power_state_marks_the_node_broken_if_failure(self): |
512 | system_id = factory.make_name('system_id') |
513 | @@ -547,7 +516,7 @@ |
514 | system_id, hostname, power_type, context) |
515 | # This blocks until the deferred is complete |
516 | io.flush() |
517 | - self.assertTrue(d.called) |
518 | + self.assertEqual('error', extract_result(d)) |
519 | # The node has been marked broken. |
520 | self.assertThat( |
521 | markNodeBroken, |
522 | @@ -557,8 +526,6 @@ |
523 | error_description="Node could not be queried %s (%s) %s" % ( |
524 | system_id, hostname, err_msg)) |
525 | ) |
526 | - self.assertEqual('error', d.result) |
527 | - return d |
528 | |
529 | def test_get_power_state_pauses_inbetween_retries(self): |
530 | system_id = factory.make_name('system_id') |
531 | @@ -594,7 +561,12 @@ |
532 | io.flush() |
533 | self.assertThat(execute, MockCallsMatch(*calls)) |
534 | clock.advance(waiting_time) |
535 | - return d |
536 | + self.assertEqual("off", extract_result(d)) |
537 | + |
538 | + |
539 | +class TestPowerQueryAsync(MAASTestCase): |
540 | + |
541 | + run_tests_with = MAASTwistedRunTest.make_factory(timeout=5) |
542 | |
543 | def make_node(self): |
544 | system_id = factory.make_name('system_id') |
Looks good. I've tested this code on my NUCs and it seems to solve the bug in question.