Merge lp:~blake-rouse/maas/queue-status-messages into lp:~maas-committers/maas/trunk
- queue-status-messages
- Merge into trunk
Status: | Merged |
---|---|
Approved by: | Blake Rouse |
Approved revision: | no longer in the source branch. |
Merged at revision: | 5763 |
Proposed branch: | lp:~blake-rouse/maas/queue-status-messages |
Merge into: | lp:~maas-committers/maas/trunk |
Diff against target: |
1575 lines (+1274/-25) 13 files modified
src/maasserver/eventloop.py (+14/-3) src/maasserver/models/event.py (+4/-3) src/maasserver/models/node.py (+1/-1) src/maasserver/models/tests/test_event.py (+10/-0) src/maasserver/preseed.py (+1/-0) src/maasserver/tests/test_eventloop.py (+19/-2) src/maasserver/tests/test_plugin.py (+1/-0) src/maasserver/tests/test_preseed.py (+3/-0) src/maasserver/tests/test_webapp.py (+87/-4) src/maasserver/webapp.py (+76/-10) src/metadataserver/api.py (+2/-2) src/metadataserver/api_twisted.py (+290/-0) src/metadataserver/tests/test_api_twisted.py (+766/-0) |
To merge this branch: | bzr merge lp:~blake-rouse/maas/queue-status-messages |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Andres Rodriguez (community) | Approve | ||
Review via email: mp+317394@code.launchpad.net |
Commit message
Status messages from curtin and cloud-init are now handled by Twisted. Each message is placed in a queue that is processed every second. This reduces the load on the regiond processes by more than 50%. This branch also allows a Twisted API endpoint to overlay a Django endpoint.
Description of the change
Blake Rouse (blake-rouse) : | # |
MAAS Lander (maas-lander) wrote : | # |
The attempt to merge lp:~blake-rouse/maas/queue-status-messages into lp:maas failed. Below is the output from the failed tests.
Hit:1 http://
Get:2 http://
Get:3 http://
Get:4 http://
Get:5 http://
Get:6 http://
Get:7 http://
Get:8 http://
Get:9 http://
Get:10 http://
Fetched 1,793 kB in 0s (2,724 kB/s)
Reading package lists...
sudo DEBIAN_
--no-
Reading package lists...
Building dependency tree...
Reading state information...
authbind is already the newest version (2.1.1+nmu1).
avahi-utils is already the newest version (0.6.32~
build-essential is already the newest version (12.1ubuntu2).
debhelper is already the newest version (9.20160115ubun
distro-info is already the newest version (0.14build1).
git is already the newest version (1:2.7.4-0ubuntu1).
libjs-angularjs is already the newest version (1.2.28-1ubuntu2).
libjs-jquery is ...
Preview Diff
1 | === modified file 'src/maasserver/eventloop.py' |
2 | --- src/maasserver/eventloop.py 2016-12-05 09:38:41 +0000 |
3 | +++ src/maasserver/eventloop.py 2017-02-28 20:14:47 +0000 |
4 | @@ -120,6 +120,11 @@ |
5 | return RackControllerService(postgresListener, advertisingService) |
6 | |
7 | |
8 | +def make_StatusWorkerService(dbtasks): |
9 | + from metadataserver.api_twisted import StatusWorkerService |
10 | + return StatusWorkerService(dbtasks) |
11 | + |
12 | + |
13 | def make_ServiceMonitorService(advertisingService): |
14 | from maasserver.regiondservices import service_monitor_service |
15 | return service_monitor_service.ServiceMonitorService(advertisingService) |
16 | @@ -151,7 +156,7 @@ |
17 | return ntp.RegionNetworkTimeProtocolService(reactor) |
18 | |
19 | |
20 | -def make_WebApplicationService(postgresListener): |
21 | +def make_WebApplicationService(postgresListener, statusWorker): |
22 | from maasserver.webapp import WebApplicationService |
23 | site_port = DEFAULT_PORT # config["port"] |
24 | # Make a socket with SO_REUSEPORT set so that we can run multiple web |
25 | @@ -169,7 +174,8 @@ |
26 | site_endpoint = AdoptedStreamServerEndpoint(reactor, s.fileno(), s.family) |
27 | site_endpoint.port = site_port # Make it easy to get the port number. |
28 | site_endpoint.socket = s # Prevent garbage collection. |
29 | - site_service = WebApplicationService(site_endpoint, postgresListener) |
30 | + site_service = WebApplicationService( |
31 | + site_endpoint, postgresListener, statusWorker) |
32 | return site_service |
33 | |
34 | |
35 | @@ -245,13 +251,18 @@ |
36 | "web": { |
37 | "only_on_master": False, |
38 | "factory": make_WebApplicationService, |
39 | - "requires": ["postgres-listener"], |
40 | + "requires": ["postgres-listener", "status-worker"], |
41 | }, |
42 | "service-monitor": { |
43 | "only_on_master": True, |
44 | "factory": make_ServiceMonitorService, |
45 | "requires": ["rpc-advertise"], |
46 | }, |
47 | + "status-worker": { |
48 | + "only_on_master": False, |
49 | + "factory": make_StatusWorkerService, |
50 | + "requires": ["database-tasks"], |
51 | + }, |
52 | "networks-monitor": { |
53 | "only_on_master": False, |
54 | "factory": make_NetworksMonitoringService, |
55 | |
56 | === modified file 'src/maasserver/models/event.py' |
57 | --- src/maasserver/models/event.py 2016-12-07 12:46:14 +0000 |
58 | +++ src/maasserver/models/event.py 2017-02-28 20:14:47 +0000 |
59 | @@ -32,7 +32,8 @@ |
60 | |
61 | def register_event_and_event_type( |
62 | self, system_id, type_name, type_description='', |
63 | - type_level=logging.INFO, event_action='', event_description=''): |
64 | + type_level=logging.INFO, event_action='', event_description='', |
65 | + created=None): |
66 | """Register EventType if it does not exist, then register the Event.""" |
67 | node = Node.objects.get(system_id=system_id) |
68 | try: |
69 | @@ -42,9 +43,9 @@ |
70 | # We didn't find it so register it. |
71 | event_type = EventType.objects.register( |
72 | type_name, type_description, type_level) |
73 | - Event.objects.create( |
74 | + return Event.objects.create( |
75 | node=node, type=event_type, action=event_action, |
76 | - description=event_description) |
77 | + description=event_description, created=created) |
78 | |
79 | def create_node_event( |
80 | self, system_id, event_type, event_action='', |
81 | |
82 | === modified file 'src/maasserver/models/node.py' |
83 | --- src/maasserver/models/node.py 2017-02-17 21:27:46 +0000 |
84 | +++ src/maasserver/models/node.py 2017-02-28 20:14:47 +0000 |
85 | @@ -1037,7 +1037,7 @@ |
86 | |
87 | def __str__(self): |
88 | if self.hostname: |
89 | - return "%s (%s)" % (self.system_id, self.fqdn) |
90 | + return "%s (%s)" % (self.system_id, self.hostname) |
91 | else: |
92 | return self.system_id |
93 | |
94 | |
95 | === modified file 'src/maasserver/models/tests/test_event.py' |
96 | --- src/maasserver/models/tests/test_event.py 2016-12-07 12:46:14 +0000 |
97 | +++ src/maasserver/models/tests/test_event.py 2017-02-28 20:14:47 +0000 |
98 | @@ -33,6 +33,16 @@ |
99 | system_id=node.system_id, type_name=event_type.name) |
100 | self.assertIsNotNone(Event.objects.get(node=node)) |
101 | |
102 | + def test_register_event_and_event_type_registers_event_with_datetime(self): |
103 | + # EventType exists |
104 | + node = factory.make_Node() |
105 | + event_type = factory.make_EventType() |
106 | + created = factory.make_date() |
107 | + event = Event.objects.register_event_and_event_type( |
108 | + system_id=node.system_id, type_name=event_type.name, |
109 | + created=created) |
110 | + self.assertEqual(created, event.created) |
111 | + |
112 | def test_register_event_and_event_type_registers_event_for_new_type(self): |
113 | # EventType does not exist |
114 | node = factory.make_Node() |
115 | |
116 | === modified file 'src/maasserver/preseed.py' |
117 | --- src/maasserver/preseed.py 2017-02-22 14:55:09 +0000 |
118 | +++ src/maasserver/preseed.py 2017-02-28 20:14:47 +0000 |
119 | @@ -191,6 +191,7 @@ |
120 | 'consumer_key': token.consumer.key, |
121 | 'token_key': token.key, |
122 | 'token_secret': token.secret, |
123 | + 'level': 'INFO', |
124 | }, |
125 | }, |
126 | 'install': { |
127 | |
128 | === modified file 'src/maasserver/tests/test_eventloop.py' |
129 | --- src/maasserver/tests/test_eventloop.py 2017-01-28 00:51:47 +0000 |
130 | +++ src/maasserver/tests/test_eventloop.py 2017-02-28 20:14:47 +0000 |
131 | @@ -32,6 +32,7 @@ |
132 | ) |
133 | from maastesting.factory import factory |
134 | from maastesting.testcase import MAASTestCase |
135 | +from metadataserver import api_twisted |
136 | from testtools.matchers import ( |
137 | Equals, |
138 | IsInstance, |
139 | @@ -254,7 +255,7 @@ |
140 | |
141 | def test_make_WebApplicationService(self): |
142 | service = eventloop.make_WebApplicationService( |
143 | - FakePostgresListenerService()) |
144 | + FakePostgresListenerService(), sentinel.status_worker) |
145 | self.assertThat(service, IsInstance(webapp.WebApplicationService)) |
146 | # The endpoint is set to port 5243 on localhost. |
147 | self.assertThat(service.endpoint, MatchesStructure.byEquality( |
148 | @@ -271,7 +272,7 @@ |
149 | eventloop.loop.factories["web"]["factory"]) |
150 | # Has a dependency of postgres-listener. |
151 | self.assertEquals( |
152 | - ["postgres-listener"], |
153 | + ["postgres-listener", "status-worker"], |
154 | eventloop.loop.factories["web"]["requires"]) |
155 | self.assertFalse( |
156 | eventloop.loop.factories["web"]["only_on_master"]) |
157 | @@ -308,6 +309,22 @@ |
158 | self.assertTrue( |
159 | eventloop.loop.factories["service-monitor"]["only_on_master"]) |
160 | |
161 | + def test_make_StatusWorkerService(self): |
162 | + service = eventloop.make_StatusWorkerService( |
163 | + sentinel.dbtasks) |
164 | + self.assertThat(service, IsInstance( |
165 | + api_twisted.StatusWorkerService)) |
166 | + # It is registered as a factory in RegionEventLoop. |
167 | + self.assertIs( |
168 | + eventloop.make_StatusWorkerService, |
169 | + eventloop.loop.factories["status-worker"]["factory"]) |
170 | + # Has a dependency of database-tasks. |
171 | + self.assertEquals( |
172 | + ["database-tasks"], |
173 | + eventloop.loop.factories["status-worker"]["requires"]) |
174 | + self.assertFalse( |
175 | + eventloop.loop.factories["status-worker"]["only_on_master"]) |
176 | + |
177 | |
178 | class TestDisablingDatabaseConnections(MAASServerTestCase): |
179 | |
180 | |
181 | === modified file 'src/maasserver/tests/test_plugin.py' |
182 | --- src/maasserver/tests/test_plugin.py 2017-01-28 00:51:47 +0000 |
183 | +++ src/maasserver/tests/test_plugin.py 2017-02-28 20:14:47 +0000 |
184 | @@ -108,6 +108,7 @@ |
185 | "rpc-advertise", |
186 | "service-monitor", |
187 | "status-monitor", |
188 | + "status-worker", |
189 | "web", |
190 | ] |
191 | self.assertItemsEqual(expected_services, service.namedServices.keys()) |
192 | |
193 | === modified file 'src/maasserver/tests/test_preseed.py' |
194 | --- src/maasserver/tests/test_preseed.py 2017-02-24 15:17:17 +0000 |
195 | +++ src/maasserver/tests/test_preseed.py 2017-02-28 20:14:47 +0000 |
196 | @@ -646,6 +646,9 @@ |
197 | self.assertEqual( |
198 | token.secret, |
199 | reporter['reporting']['maas']['token_secret']) |
200 | + self.assertEqual( |
201 | + 'INFO', |
202 | + reporter['reporting']['maas']['level']) |
203 | |
204 | def test__curtin_maas_reporter_without_events_support(self): |
205 | node = factory.make_Node_with_Interface_on_Subnet() |
206 | |
207 | === modified file 'src/maasserver/tests/test_webapp.py' |
208 | --- src/maasserver/tests/test_webapp.py 2017-01-28 00:51:47 +0000 |
209 | +++ src/maasserver/tests/test_webapp.py 2017-02-28 20:14:47 +0000 |
210 | @@ -17,6 +17,7 @@ |
211 | webapp, |
212 | ) |
213 | from maasserver.testing.listener import FakePostgresListenerService |
214 | +from maasserver.webapp import OverlaySite |
215 | from maasserver.websockets.protocol import WebSocketFactory |
216 | from maastesting.factory import factory |
217 | from maastesting.matchers import MockCalledOnceWith |
218 | @@ -27,10 +28,15 @@ |
219 | Is, |
220 | IsInstance, |
221 | MatchesStructure, |
222 | + Not, |
223 | ) |
224 | from twisted.internet import reactor |
225 | from twisted.internet.endpoints import TCP4ServerEndpoint |
226 | -from twisted.web.resource import Resource |
227 | +from twisted.web.error import UnsupportedMethod |
228 | +from twisted.web.resource import ( |
229 | + NoResource, |
230 | + Resource, |
231 | +) |
232 | from twisted.web.server import Site |
233 | from twisted.web.test.requesthelper import ( |
234 | DummyChannel, |
235 | @@ -76,6 +82,70 @@ |
236 | sentinel.command, single_path, sentinel.version)) |
237 | |
238 | |
239 | +class TestOverlaySite(MAASTestCase): |
240 | + |
241 | + def test__init__(self): |
242 | + root = Resource() |
243 | + site = OverlaySite(root) |
244 | + self.assertThat(site, IsInstance(Site)) |
245 | + |
246 | + def test_getResourceFor_returns_no_resource_wo_underlay(self): |
247 | + root = Resource() |
248 | + site = OverlaySite(root) |
249 | + request = DummyRequest([b'MAAS']) |
250 | + resource = site.getResourceFor(request) |
251 | + self.assertThat(resource, IsInstance(NoResource)) |
252 | + |
253 | + def test_getResourceFor_wraps_render_wo_underlay(self): |
254 | + root = Resource() |
255 | + maas = Resource() |
256 | + mock_render = self.patch(maas, 'render') |
257 | + root.putChild(b'MAAS', maas) |
258 | + site = OverlaySite(root) |
259 | + request = DummyRequest([b'MAAS']) |
260 | + resource = site.getResourceFor(request) |
261 | + self.assertThat(resource, Is(maas)) |
262 | + self.assertThat(resource.render, Not(Is(mock_render))) |
263 | + resource.render(request) |
264 | + self.assertThat(mock_render, MockCalledOnceWith(request)) |
265 | + |
266 | + def test_getResourceFor_wraps_render_wo_underlay_raises_no_method(self): |
267 | + root = Resource() |
268 | + maas = Resource() |
269 | + root.putChild(b'MAAS', maas) |
270 | + site = OverlaySite(root) |
271 | + request = DummyRequest([b'MAAS']) |
272 | + resource = site.getResourceFor(request) |
273 | + self.assertThat(resource, Is(maas)) |
274 | + self.assertRaises(UnsupportedMethod, resource.render, request) |
275 | + |
276 | + def test_getResourceFor_returns_resource_from_underlay(self): |
277 | + underlay_root = Resource() |
278 | + underlay_maas = Resource() |
279 | + underlay_root.putChild(b'MAAS', underlay_maas) |
280 | + overlay_root = Resource() |
281 | + site = OverlaySite(overlay_root) |
282 | + site.underlay = Site(underlay_root) |
283 | + request = DummyRequest([b'MAAS']) |
284 | + resource = site.getResourceFor(request) |
285 | + self.assertThat(resource, Is(underlay_maas)) |
286 | + |
287 | + def test_getResourceFor_calls_render_on_underlay_when_no_method(self): |
288 | + underlay_root = Resource() |
289 | + underlay_maas = Resource() |
290 | + mock_underlay_maas_render = self.patch(underlay_maas, 'render') |
291 | + underlay_root.putChild(b'MAAS', underlay_maas) |
292 | + overlay_root = Resource() |
293 | + overlay_maas = Resource() |
294 | + overlay_root.putChild(b'MAAS', overlay_maas) |
295 | + site = OverlaySite(overlay_root) |
296 | + site.underlay = Site(underlay_root) |
297 | + request = DummyRequest([b'MAAS']) |
298 | + resource = site.getResourceFor(request) |
299 | + resource.render(request) |
300 | + self.assertThat(mock_underlay_maas_render, MockCalledOnceWith(request)) |
301 | + |
302 | + |
303 | class TestResourceOverlay(MAASTestCase): |
304 | |
305 | def make_resourceoverlay(self): |
306 | @@ -100,7 +170,7 @@ |
307 | listener = FakePostgresListenerService() |
308 | service_endpoint = self.make_endpoint() |
309 | service = webapp.WebApplicationService( |
310 | - service_endpoint, listener) |
311 | + service_endpoint, listener, sentinel.status_worker) |
312 | # Patch the getServiceNamed so the WebSocketFactory does not |
313 | # error trying to register for events from the RPC service. In this |
314 | # test the RPC service is not started. |
315 | @@ -197,11 +267,24 @@ |
316 | |
317 | service.startService() |
318 | |
319 | + # Overlay |
320 | + site = service.site |
321 | + self.assertThat(site, IsInstance(OverlaySite)) |
322 | resource = service.site.resource |
323 | self.assertThat(resource, IsInstance(Resource)) |
324 | overlay_resource = resource.getChildWithDefault(b"MAAS", request=None) |
325 | - self.assertThat(overlay_resource, IsInstance(webapp.ResourceOverlay)) |
326 | - self.assertThat(overlay_resource.basis, MatchesStructure( |
327 | + self.assertThat(overlay_resource, IsInstance(Resource)) |
328 | + |
329 | + # Underlay |
330 | + site = service.site.underlay |
331 | + self.assertThat(site, IsInstance(Site)) |
332 | + underlay_resource = site.resource |
333 | + self.assertThat(underlay_resource, IsInstance(Resource)) |
334 | + underlay_maas_resource = underlay_resource.getChildWithDefault( |
335 | + b"MAAS", request=None) |
336 | + self.assertThat( |
337 | + underlay_maas_resource, IsInstance(webapp.ResourceOverlay)) |
338 | + self.assertThat(underlay_maas_resource.basis, MatchesStructure( |
339 | _reactor=Is(reactor), _threadpool=Is(service.threadpool), |
340 | _application=IsInstance(WSGIHandler))) |
341 | |
342 | |
343 | === modified file 'src/maasserver/webapp.py' |
344 | --- src/maasserver/webapp.py 2016-10-28 15:58:32 +0000 |
345 | +++ src/maasserver/webapp.py 2017-02-28 20:14:47 +0000 |
346 | @@ -1,4 +1,4 @@ |
347 | -# Copyright 2014-2016 Canonical Ltd. This software is licensed under the |
348 | +# Copyright 2014-2017 Canonical Ltd. This software is licensed under the |
349 | # GNU Affero General Public License version 3 (see the file LICENSE). |
350 | |
351 | """The MAAS Web Application.""" |
352 | @@ -7,6 +7,8 @@ |
353 | "WebApplicationService", |
354 | ] |
355 | |
356 | +import copy |
357 | +from functools import partial |
358 | from http.client import SERVICE_UNAVAILABLE |
359 | import re |
360 | |
361 | @@ -19,6 +21,7 @@ |
362 | lookupProtocolForFactory, |
363 | WebSocketsResource, |
364 | ) |
365 | +from metadataserver.api_twisted import StatusHandlerResource |
366 | from provisioningserver.logger import LegacyLogger |
367 | from provisioningserver.utils.twisted import ( |
368 | asynchronous, |
369 | @@ -30,8 +33,10 @@ |
370 | reactor, |
371 | ) |
372 | from twisted.python import failure |
373 | +from twisted.web.error import UnsupportedMethod |
374 | from twisted.web.resource import ( |
375 | ErrorPage, |
376 | + NoResource, |
377 | Resource, |
378 | ) |
379 | from twisted.web.server import ( |
380 | @@ -81,6 +86,49 @@ |
381 | command, path, version) |
382 | |
383 | |
384 | +class OverlaySite(Site): |
385 | + """A site that is over another site. |
386 | + |
387 | + If this site cannot resolve a valid resource to handle the request, then |
388 | + the underlay site gets passed the request to process. |
389 | + """ |
390 | + |
391 | + underlay = None |
392 | + |
393 | + def getResourceFor(self, request): |
394 | + """Override to support an underlay site. |
395 | + |
396 | + If this site cannot return a valid resource, the request is passed to |
397 | + the underlay site to be resolved. |
398 | + """ |
399 | + def call_underlay(request): |
400 | + # Reset the paths and forward to the underlay site. |
401 | + request.prepath = [] |
402 | + request.postpath = postpath |
403 | + return self.underlay.getResourceFor(request) |
404 | + |
405 | + def wrap_render(orig_render, request): |
406 | + # Wrap the render call of the resource, catching any |
407 | + # UnsupportedMethod exceptions and forwarding those onto |
408 | + # the underlay site. |
409 | + try: |
410 | + return orig_render(request) |
411 | + except UnsupportedMethod: |
412 | + if self.underlay is not None: |
413 | + resource = call_underlay(request) |
414 | + return resource.render(request) |
415 | + else: |
416 | + raise |
417 | + |
418 | + postpath = copy.copy(request.postpath) |
419 | + result = super(OverlaySite, self).getResourceFor(request) |
420 | + if isinstance(result, NoResource) and self.underlay is not None: |
421 | + return call_underlay(request) |
422 | + else: |
423 | + result.render = partial(wrap_render, result.render) |
424 | + return result |
425 | + |
426 | + |
427 | class ResourceOverlay(Resource, object): |
428 | """A resource that can fall-back to a basis resource. |
429 | |
430 | @@ -126,13 +174,14 @@ |
431 | the web application. |
432 | """ |
433 | |
434 | - def __init__(self, endpoint, listener): |
435 | - self.site = Site(StartPage()) |
436 | + def __init__(self, endpoint, listener, status_worker): |
437 | + self.site = OverlaySite(StartPage()) |
438 | self.site.requestFactory = CleanPathRequest |
439 | super(WebApplicationService, self).__init__(endpoint, self.site) |
440 | self.websocket = WebSocketFactory(listener) |
441 | self.threadpool = ThreadPoolLimiter( |
442 | reactor.threadpoolForDatabase, concurrency.webapp) |
443 | + self.status_worker = status_worker |
444 | |
445 | def prepareApplication(self): |
446 | """Return the WSGI application. |
447 | @@ -157,16 +206,33 @@ |
448 | with RegionConfiguration.open() as config: |
449 | static_root = File(config.static_root) |
450 | |
451 | + # Setup resources to process paths that twisted handles. |
452 | + metadata = Resource() |
453 | + metadata.putChild(b'status', StatusHandlerResource(self.status_worker)) |
454 | + |
455 | + maas = Resource() |
456 | + maas.putChild(b'metadata', metadata) |
457 | + maas.putChild(b'static', static_root) |
458 | + maas.putChild( |
459 | + b'ws', |
460 | + WebSocketsResource(lookupProtocolForFactory(self.websocket))) |
461 | + |
462 | root = Resource() |
463 | - webapp = ResourceOverlay( |
464 | + root.putChild(b'', Redirect(b"MAAS/")) |
465 | + root.putChild(b'MAAS', maas) |
466 | + |
467 | + # Setup the resources to process paths that django handles. |
468 | + underlay_maas = ResourceOverlay( |
469 | WSGIResource(reactor, self.threadpool, application)) |
470 | - root.putChild(b"", Redirect(b"MAAS/")) |
471 | - root.putChild(b"MAAS", webapp) |
472 | - webapp.putChild( |
473 | - b'ws', |
474 | - WebSocketsResource(lookupProtocolForFactory(self.websocket))) |
475 | - webapp.putChild(b'static', static_root) |
476 | + underlay_root = Resource() |
477 | + underlay_root.putChild(b'MAAS', underlay_maas) |
478 | + underlay_site = Site(underlay_root) |
479 | + underlay_site.requestFactory = CleanPathRequest |
480 | + |
481 | + # Setup the main resource as the twisted handler and the underlay |
482 | + # resource as the django handler. |
483 | self.site.resource = root |
484 | + self.site.underlay = underlay_site |
485 | |
486 | def installFailed(self, failure): |
487 | """Display a page explaining why the web app could not start.""" |
488 | |
489 | === modified file 'src/metadataserver/api.py' |
490 | --- src/metadataserver/api.py 2017-02-17 14:23:04 +0000 |
491 | +++ src/metadataserver/api.py 2017-02-28 20:14:47 +0000 |
492 | @@ -176,7 +176,7 @@ |
493 | |
494 | |
495 | def add_event_to_node_event_log( |
496 | - node, origin, action, description, result=None): |
497 | + node, origin, action, description, result=None, created=None): |
498 | """Add an entry to the node's event log.""" |
499 | if node.status == NODE_STATUS.COMMISSIONING: |
500 | if result in ['SUCCESS', None]: |
501 | @@ -205,7 +205,7 @@ |
502 | node.system_id, type_name, type_level=event_details.level, |
503 | type_description=event_details.description, |
504 | event_action=action, |
505 | - event_description="'%s' %s" % (origin, description)) |
506 | + event_description="'%s' %s" % (origin, description), created=created) |
507 | |
508 | |
509 | def process_file( |
510 | |
511 | === added file 'src/metadataserver/api_twisted.py' |
512 | --- src/metadataserver/api_twisted.py 1970-01-01 00:00:00 +0000 |
513 | +++ src/metadataserver/api_twisted.py 2017-02-28 20:14:47 +0000 |
514 | @@ -0,0 +1,290 @@ |
515 | +# Copyright 2017 Canonical Ltd. This software is licensed under the |
516 | +# GNU Affero General Public License version 3 (see the file LICENSE). |
517 | + |
518 | +"""Metadata API that runs in the Twisted reactor.""" |
519 | + |
520 | +import base64 |
521 | +import bz2 |
522 | +from collections import defaultdict |
523 | +from datetime import datetime |
524 | +import json |
525 | + |
526 | +from maasserver.api.utils import extract_oauth_key_from_auth_header |
527 | +from maasserver.enum import ( |
528 | + NODE_STATUS, |
529 | + NODE_TYPE, |
530 | +) |
531 | +from maasserver.utils.orm import ( |
532 | + in_transaction, |
533 | + transactional, |
534 | + TransactionManagementError, |
535 | +) |
536 | +from maasserver.utils.threads import deferToDatabase |
537 | +from metadataserver import logger |
538 | +from metadataserver.api import ( |
539 | + add_event_to_node_event_log, |
540 | + process_file, |
541 | +) |
542 | +from metadataserver.models import NodeKey |
543 | +from provisioningserver.logger import LegacyLogger |
544 | +from twisted.application.internet import TimerService |
545 | +from twisted.internet import reactor |
546 | +from twisted.web.resource import Resource |
547 | + |
548 | + |
549 | +log = LegacyLogger() |
550 | + |
551 | + |
552 | +class StatusHandlerResource(Resource): |
553 | + |
554 | + # Has no children, so getChild will not be called. |
555 | + isLeaf = True |
556 | + |
557 | + # Only POST operations are allowed. |
558 | + allowedMethods = [b'POST'] |
559 | + |
560 | + # Required keys in the message. |
561 | + requiredMessageKeys = ['event_type', 'origin', 'name', 'description'] |
562 | + |
563 | + def __init__(self, status_worker): |
564 | + self.worker = status_worker |
565 | + |
566 | + def render_POST(self, request): |
567 | + # Extract the authorization from request. This only does a basic |
568 | + # check that it's provided. The status worker will do the authorization; |
569 | + # the downside is that the calling client will not know. To |
570 | + # them the message was accepted. This overall is okay since they are |
571 | + # just status messages. |
572 | + authorization = request.getHeader(b'authorization') |
573 | + if not authorization: |
574 | + request.setResponseCode(401) |
575 | + return b"" |
576 | + authorization = extract_oauth_key_from_auth_header( |
577 | + authorization.decode('utf-8')) |
578 | + if authorization is None: |
579 | + request.setResponseCode(401) |
580 | + return b"" |
581 | + |
582 | + # Load the content to ensure that it's at least correct before placing |
583 | + # it into the status worker. |
584 | + payload = request.content.read() |
585 | + try: |
586 | + payload = payload.decode("ascii") |
587 | + except UnicodeDecodeError as error: |
588 | + request.setResponseCode(400) |
589 | + error_msg = "Status payload must be ASCII-only: %s" % error |
590 | + logger.error(error_msg) |
591 | + return error_msg.encode('ascii') |
592 | + |
593 | + try: |
594 | + message = json.loads(payload) |
595 | + except ValueError: |
596 | + request.setResponseCode(400) |
597 | + error_msg = "Status payload is not valid JSON:\n%s\n\n" % payload |
598 | + logger.error(error_msg) |
599 | + return error_msg.encode('ascii') |
600 | + |
601 | + # Filter the level early so less messages need to be processed. |
602 | + level = message.get('level', None) |
603 | + if level is not None and level == "DEBUG": |
604 | + # Ignore all debug messages. |
605 | + request.setResponseCode(204) |
606 | + return b"" |
607 | + |
608 | + # Ensure the other required keys exist. |
609 | + missing_keys = [ |
610 | + key |
611 | + for key in self.requiredMessageKeys |
612 | + if key not in message |
613 | + ] |
614 | + if len(missing_keys) > 0: |
615 | + request.setResponseCode(400) |
616 | + error_msg = ( |
617 | + 'Missing parameter(s) %s in ' |
618 | + 'status message.' % ', '.join(missing_keys)) |
619 | + logger.error(error_msg) |
620 | + return error_msg.encode('ascii') |
621 | + |
622 | + # Queue the message with its authorization in the status worker. |
623 | + self.worker.queueMessage(authorization, message) |
624 | + request.setResponseCode(204) |
625 | + return b"" |
626 | + |
627 | + |
628 | +class StatusWorkerService(TimerService, object): |
629 | + """Service to update nodes from recieved status messages.""" |
630 | + |
631 | + check_interval = 60 # Every second. |
632 | + |
633 | + def __init__(self, dbtasks, clock=reactor): |
634 | + # Call self._tryUpdateNodes() every self.check_interval. |
635 | + super(StatusWorkerService, self).__init__( |
636 | + self.check_interval, self._tryUpdateNodes) |
637 | + self.dbtasks = dbtasks |
638 | + self.clock = clock |
639 | + self.queue = defaultdict(list) |
640 | + |
641 | + def _tryUpdateNodes(self): |
642 | + if len(self.queue) != 0: |
643 | + queue, self.queue = self.queue, defaultdict(list) |
644 | + d = deferToDatabase(self._preProcessQueue, queue) |
645 | + d.addCallback(self._processMessagesLater) |
646 | + d.addErrback(log.err, "Failed to process node status messages.") |
647 | + return d |
648 | + |
649 | + @transactional |
650 | + def _preProcessQueue(self, queue): |
651 | + """Check authorizations. |
652 | + |
653 | + Return a list of (node, messages) tuples, where each node is found |
654 | + from its authorisation. |
655 | + """ |
656 | + keys = NodeKey.objects.filter( |
657 | + key__in=list(queue.keys())).select_related('node') |
658 | + return [ |
659 | + (key.node, queue[key.key]) |
660 | + for key in keys |
661 | + ] |
662 | + |
663 | + def _processMessagesLater(self, tasks): |
664 | + # Move all messages on the queue off onto the database tasks queue. |
665 | + # We're not going to wait for them to be processed because we can't / |
666 | + # don't apply back-pressure to those systems that are producing these |
667 | + # messages anyway. |
668 | + for node, messages in tasks: |
669 | + self.dbtasks.addTask(self._processMessages, node, messages) |
670 | + |
671 | + def _processMessages(self, node, messages): |
672 | + # Push the messages into the database, recording them for this node. |
673 | + # This should be called in a non-reactor thread with a pre-existing |
674 | + # connection (e.g. via deferToDatabase). |
675 | + if in_transaction(): |
676 | + raise TransactionManagementError( |
677 | + "_processMessages must be called from " |
678 | + "outside of a transaction.") |
679 | + else: |
680 | + # Here we're in a database thread, with a database connection. |
681 | + for message in messages: |
682 | + try: |
683 | + self._processMessage(node, message) |
684 | + except: |
685 | + log.err( |
686 | + None, |
687 | + "Failed to process message " |
688 | + "for node: %s" % node.hostname) |
689 | + |
690 | + @transactional |
691 | + def _processMessage(self, node, message): |
692 | + event_type = message['event_type'] |
693 | + origin = message['origin'] |
694 | + activity_name = message['name'] |
695 | + description = message['description'] |
696 | + result = message.get('result', None) |
697 | + |
698 | + # Add this event to the node event log. |
699 | + add_event_to_node_event_log( |
700 | + node, origin, activity_name, description, result, |
701 | + message['timestamp']) |
702 | + |
703 | + # Group files together with the ScriptResult they belong. |
704 | + results = {} |
705 | + for sent_file in message.get('files', []): |
706 | + # Set the result type according to the node's status. |
707 | + if node.status == NODE_STATUS.TESTING: |
708 | + script_set = node.current_testing_script_set |
709 | + elif (node.status == NODE_STATUS.COMMISSIONING or |
710 | + node.node_type != NODE_TYPE.MACHINE): |
711 | + script_set = node.current_commissioning_script_set |
712 | + elif node.status == NODE_STATUS.DEPLOYING: |
713 | + script_set = node.current_installation_script_set |
714 | + else: |
715 | + raise ValueError( |
716 | + "Invalid status for saving files: %d" % node.status) |
717 | + |
718 | + script_name = sent_file['path'] |
719 | + content = self._retrieve_content( |
720 | + compression=sent_file.get('compression', None), |
721 | + encoding=sent_file['encoding'], |
722 | + content=sent_file['content']) |
723 | + process_file(results, script_set, script_name, content, sent_file) |
724 | + |
725 | + # Commit results to the database. |
726 | + for script_result, args in results.items(): |
727 | + script_result.store_result(**args) |
728 | + |
729 | + # Update the last ping in any status which uses a script_set whenever a |
730 | + # node in that status contacts us. |
731 | + script_set_statuses = { |
732 | + NODE_STATUS.COMMISSIONING: node.current_commissioning_script_set, |
733 | + NODE_STATUS.TESTING: node.current_testing_script_set, |
734 | + NODE_STATUS.DEPLOYING: node.current_installation_script_set, |
735 | + } |
736 | + script_set = script_set_statuses.get(node.status) |
737 | + if script_set is not None: |
738 | + script_set.last_ping = message['timestamp'] |
739 | + script_set.save() |
740 | + |
741 | + # At the end of a top-level event, we change the node status. |
742 | + save_node = False |
743 | + if self._is_top_level(activity_name) and event_type == 'finish': |
744 | + if node.status == NODE_STATUS.COMMISSIONING: |
745 | + if result in ['FAIL', 'FAILURE']: |
746 | + node.status = NODE_STATUS.FAILED_COMMISSIONING |
747 | + save_node = True |
748 | + elif node.status == NODE_STATUS.DEPLOYING: |
749 | + if result in ['FAIL', 'FAILURE']: |
750 | + node.mark_failed( |
751 | + comment="Installation failed (refer to the " |
752 | + "installation log for more information).") |
753 | + save_node = True |
754 | + elif node.status == NODE_STATUS.DISK_ERASING: |
755 | + if result in ['FAIL', 'FAILURE']: |
756 | + node.mark_failed(comment="Failed to erase disks.") |
757 | + save_node = True |
758 | + |
759 | + # Deallocate the node if we enter any terminal state. |
760 | + if node.node_type == NODE_TYPE.MACHINE and node.status in [ |
761 | + NODE_STATUS.READY, |
762 | + NODE_STATUS.FAILED_COMMISSIONING]: |
763 | + node.status_expires = None |
764 | + node.owner = None |
765 | + node.error = 'failed: %s' % description |
766 | + save_node = True |
767 | + |
768 | + if save_node: |
769 | + node.save() |
770 | + |
771 | + def _retrieve_content(self, compression, encoding, content): |
772 | + """Extract the content of the sent file.""" |
773 | + # Select the appropriate decompressor. |
774 | + if compression is None: |
775 | + decompress = lambda s: s |
776 | + elif compression == 'bzip2': |
777 | + decompress = bz2.decompress |
778 | + else: |
779 | + raise ValueError('Invalid compression: %s' % compression) |
780 | + |
781 | + # Select the appropriate decoder. |
782 | + if encoding == 'base64': |
783 | + decode = base64.decodebytes |
784 | + else: |
785 | + raise ValueError('Invalid encoding: %s' % encoding) |
786 | + |
787 | + return decompress(decode(content.encode("ascii"))) |
788 | + |
789 | + def _is_top_level(self, activity_name): |
790 | + """Top-level events do not have slashes in their names.""" |
791 | + return '/' not in activity_name |
792 | + |
793 | + def queueMessage(self, authorization, message): |
794 | + """Queue message for processing.""" |
795 | + # Ensure a timestamp exists in the message and convert it to a |
796 | + # datetime object. This is used to update the `last_ping` and the |
797 | + # time for the event message. |
798 | + timestamp = message.get('timestamp', None) |
799 | + if timestamp is not None: |
800 | + message['timestamp'] = datetime.utcfromtimestamp( |
801 | + message['timestamp']) |
802 | + else: |
803 | + message['timestamp'] = datetime.utcnow() |
804 | + self.queue[authorization].append(message) |
805 | |
806 | === added file 'src/metadataserver/tests/test_api_twisted.py' |
807 | --- src/metadataserver/tests/test_api_twisted.py 1970-01-01 00:00:00 +0000 |
808 | +++ src/metadataserver/tests/test_api_twisted.py 2017-02-28 20:14:47 +0000 |
809 | @@ -0,0 +1,766 @@ |
810 | +# Copyright 2017 Canonical Ltd. This software is licensed under the |
811 | +# GNU Affero General Public License version 3 (see the file LICENSE). |
812 | + |
813 | +"""Tests for the twisted metadata API.""" |
814 | + |
815 | +__all__ = [] |
816 | + |
817 | +import base64 |
818 | +import bz2 |
819 | +from datetime import datetime |
820 | +from io import BytesIO |
821 | +import json |
822 | +from unittest.mock import ( |
823 | + Mock, |
824 | + sentinel, |
825 | +) |
826 | + |
827 | +from crochet import wait_for |
828 | +from maasserver.enum import NODE_STATUS |
829 | +from maasserver.models import ( |
830 | + Event, |
831 | + Tag, |
832 | +) |
833 | +from maasserver.models.signals.testing import SignalsDisabled |
834 | +from maasserver.testing.factory import factory |
835 | +from maasserver.testing.testcase import ( |
836 | + MAASServerTestCase, |
837 | + MAASTransactionServerTestCase, |
838 | +) |
839 | +from maasserver.utils.orm import ( |
840 | + reload_object, |
841 | + transactional, |
842 | + TransactionManagementError, |
843 | +) |
844 | +from maasserver.utils.threads import deferToDatabase |
845 | +from maastesting.matchers import ( |
846 | + MockCalledOnceWith, |
847 | + MockNotCalled, |
848 | +) |
849 | +from maastesting.testcase import MAASTestCase |
850 | +from metadataserver import api |
851 | +from metadataserver.api_twisted import ( |
852 | + StatusHandlerResource, |
853 | + StatusWorkerService, |
854 | +) |
855 | +from metadataserver.enum import SCRIPT_STATUS |
856 | +from metadataserver.models import NodeKey |
857 | +from testtools import ExpectedException |
858 | +from testtools.matchers import ( |
859 | + Equals, |
860 | + MatchesListwise, |
861 | + MatchesSetwise, |
862 | +) |
863 | +from twisted.internet.defer import inlineCallbacks |
864 | +from twisted.web.test.requesthelper import DummyRequest |
865 | + |
866 | + |
867 | +wait_for_reactor = wait_for(30) |
868 | + |
869 | + |
870 | +class TestStatusHandlerResource(MAASTestCase): |
871 | + |
872 | + def make_request(self, content=None, token=None): |
873 | + request = DummyRequest([]) |
874 | + if token is None: |
875 | + token = factory.make_name('token') |
876 | + request.requestHeaders.addRawHeader( |
877 | + b'authorization', 'oauth_token=%s' % token) |
878 | + if content is not None: |
879 | + request.content = BytesIO(content) |
880 | + return request |
881 | + |
882 | + def test__init__(self): |
883 | + resource = StatusHandlerResource(sentinel.status_worker) |
884 | + self.assertIs(sentinel.status_worker, resource.worker) |
885 | + self.assertTrue(resource.isLeaf) |
886 | + self.assertEquals([b'POST'], resource.allowedMethods) |
887 | + self.assertEquals( |
888 | + ['event_type', 'origin', 'name', 'description'], |
889 | + resource.requiredMessageKeys) |
890 | + |
891 | + def test__render_POST_missing_authorization(self): |
892 | + resource = StatusHandlerResource(sentinel.status_worker) |
893 | + request = DummyRequest([]) |
894 | + output = resource.render_POST(request) |
895 | + self.assertEquals(b'', output) |
896 | + self.assertEquals(401, request.responseCode) |
897 | + |
898 | + def test__render_POST_empty_authorization(self): |
899 | + resource = StatusHandlerResource(sentinel.status_worker) |
900 | + request = DummyRequest([]) |
901 | + request.requestHeaders.addRawHeader(b'authorization', '') |
902 | + output = resource.render_POST(request) |
903 | + self.assertEquals(b'', output) |
904 | + self.assertEquals(401, request.responseCode) |
905 | + |
906 | + def test__render_POST_bad_authorization(self): |
907 | + resource = StatusHandlerResource(sentinel.status_worker) |
908 | + request = DummyRequest([]) |
909 | + request.requestHeaders.addRawHeader( |
910 | + b'authorization', factory.make_name('auth')) |
911 | + output = resource.render_POST(request) |
912 | + self.assertEquals(b'', output) |
913 | + self.assertEquals(401, request.responseCode) |
914 | + |
915 | + def test__render_POST_body_must_be_ascii(self): |
916 | + resource = StatusHandlerResource(sentinel.status_worker) |
917 | + request = self.make_request(content=b'\xe9') |
918 | + output = resource.render_POST(request) |
919 | + self.assertEquals( |
920 | + b"Status payload must be ASCII-only: 'ascii' codec can't " |
921 | + b"decode byte 0xe9 in position 0: ordinal not in range(128)", |
922 | + output) |
923 | + self.assertEquals(400, request.responseCode) |
924 | + |
925 | + def test__render_POST_body_must_be_valid_json(self): |
926 | + resource = StatusHandlerResource(sentinel.status_worker) |
927 | + request = self.make_request(content=b'testing not json') |
928 | + output = resource.render_POST(request) |
929 | + self.assertEquals( |
930 | + b"Status payload is not valid JSON:\ntesting not json\n\n", |
931 | + output) |
932 | + self.assertEquals(400, request.responseCode) |
933 | + |
934 | + def test__render_POST_ignores_debug_messages(self): |
935 | + resource = StatusHandlerResource(sentinel.status_worker) |
936 | + request = self.make_request(content=json.dumps({ |
937 | + 'level': 'DEBUG', |
938 | + }).encode('ascii')) |
939 | + output = resource.render_POST(request) |
940 | + self.assertEquals(b'', output) |
941 | + self.assertEquals(204, request.responseCode) |
942 | + |
943 | + def test__render_POST_validates_required_keys(self): |
944 | + resource = StatusHandlerResource(sentinel.status_worker) |
945 | + request = self.make_request(content=json.dumps({}).encode('ascii')) |
946 | + output = resource.render_POST(request) |
947 | + self.assertEquals( |
948 | + b'Missing parameter(s) event_type, origin, name, description ' |
949 | + b'in status message.', output) |
950 | + self.assertEquals(400, request.responseCode) |
951 | + |
952 | + def test__render_POST_queue_messages(self): |
953 | + status_worker = Mock() |
954 | + status_worker.queueMessage = Mock() |
955 | + resource = StatusHandlerResource(status_worker) |
956 | + message = { |
957 | + 'event_type': factory.make_name('type'), |
958 | + 'origin': factory.make_name('origin'), |
959 | + 'name': factory.make_name('name'), |
960 | + 'description': factory.make_name('description'), |
961 | + } |
962 | + token = factory.make_name('token') |
963 | + request = self.make_request( |
964 | + content=json.dumps(message).encode('ascii'), token=token) |
965 | + output = resource.render_POST(request) |
966 | + self.assertEquals(b'', output) |
967 | + self.assertEquals(204, request.responseCode) |
968 | + self.assertThat( |
969 | + status_worker.queueMessage, MockCalledOnceWith(token, message)) |
970 | + |
971 | + |
972 | +class TestStatusWorkerServiceTransactional(MAASTransactionServerTestCase): |
973 | + |
974 | + @transactional |
975 | + def make_nodes_with_tokens(self): |
976 | + nodes = [ |
977 | + factory.make_Node() |
978 | + for _ in range(3) |
979 | + ] |
980 | + return [ |
981 | + (node, NodeKey.objects.get_token_for_node(node)) |
982 | + for node in nodes |
983 | + ] |
984 | + |
985 | + def make_message(self): |
986 | + return { |
987 | + 'event_type': factory.make_name('type'), |
988 | + 'origin': factory.make_name('origin'), |
989 | + 'name': factory.make_name('name'), |
990 | + 'description': factory.make_name('description'), |
991 | + 'timestamp': datetime.utcnow().timestamp(), |
992 | + } |
993 | + |
994 | + def test__init__(self): |
995 | + worker = StatusWorkerService(sentinel.dbtasks, clock=sentinel.reactor) |
996 | + self.assertEqual(sentinel.dbtasks, worker.dbtasks) |
997 | + self.assertEqual(sentinel.reactor, worker.clock) |
998 | + self.assertEqual(60, worker.step) |
999 | + self.assertEqual((worker._tryUpdateNodes, tuple(), {}), worker.call) |
1000 | + |
1001 | + def test__tryUpdateNodes_returns_None_when_empty_queue(self): |
1002 | + worker = StatusWorkerService(sentinel.dbtasks) |
1003 | + self.assertIsNone(worker._tryUpdateNodes()) |
1004 | + |
1005 | + @wait_for_reactor |
1006 | + @inlineCallbacks |
1007 | + def test__tryUpdateNodes_sends_work_to_dbtasks(self): |
1008 | + nodes_with_tokens = yield deferToDatabase(self.make_nodes_with_tokens) |
1009 | + node_messages = { |
1010 | + node: [ |
1011 | + self.make_message() |
1012 | + for _ in range(3) |
1013 | + ] |
1014 | + for node, _ in nodes_with_tokens |
1015 | + } |
1016 | + dbtasks = Mock() |
1017 | + dbtasks.addTask = Mock() |
1018 | + worker = StatusWorkerService(dbtasks) |
1019 | + for node, token in nodes_with_tokens: |
1020 | + for message in node_messages[node]: |
1021 | + worker.queueMessage(token.key, message) |
1022 | + yield worker._tryUpdateNodes() |
1023 | + call_args = [ |
1024 | + (call_arg[0][1], call_arg[0][2]) |
1025 | + for call_arg in dbtasks.addTask.call_args_list |
1026 | + ] |
1027 | + self.assertThat(call_args, MatchesSetwise(*[ |
1028 | + MatchesListwise([Equals(node), Equals(messages)]) |
1029 | + for node, messages in node_messages.items() |
1030 | + ])) |
1031 | + |
1032 | + @wait_for_reactor |
1033 | + @inlineCallbacks |
1034 | + def test__processMessages_fails_when_in_transaction(self): |
1035 | + worker = StatusWorkerService(sentinel.dbtasks) |
1036 | + with ExpectedException(TransactionManagementError): |
1037 | + yield deferToDatabase( |
1038 | + transactional(worker._processMessages), |
1039 | + sentinel.node, [sentinel.message]) |
1040 | + |
1041 | + @wait_for_reactor |
1042 | + @inlineCallbacks |
1043 | + def test__processMessages_calls_processMessage(self): |
1044 | + worker = StatusWorkerService(sentinel.dbtasks) |
1045 | + mock_processMessage = self.patch(worker, "_processMessage") |
1046 | + yield deferToDatabase( |
1047 | + worker._processMessages, sentinel.node, [sentinel.message]) |
1048 | + self.assertThat( |
1049 | + mock_processMessage, |
1050 | + MockCalledOnceWith(sentinel.node, sentinel.message)) |
1051 | + |
1052 | + |
1053 | +def encode_as_base64(content): |
1054 | + return base64.encodebytes(content).decode("ascii") |
1055 | + |
1056 | + |
1057 | +class TestStatusWorkerService(MAASServerTestCase): |
1058 | + |
1059 | + def setUp(self): |
1060 | + super(TestStatusWorkerService, self).setUp() |
1061 | + self.useFixture(SignalsDisabled("power")) |
1062 | + |
1063 | + def processMessage(self, node, payload): |
1064 | + worker = StatusWorkerService(sentinel.dbtasks) |
1065 | + worker._processMessage(node, payload) |
1066 | + |
1067 | + def test_status_installation_result_does_not_affect_other_node(self): |
1068 | + node1 = factory.make_Node(status=NODE_STATUS.DEPLOYING) |
1069 | + node2 = factory.make_Node(status=NODE_STATUS.DEPLOYING) |
1070 | + payload = { |
1071 | + 'event_type': 'finish', |
1072 | + 'result': 'SUCCESS', |
1073 | + 'origin': 'curtin', |
1074 | + 'name': 'cmd-install', |
1075 | + 'description': 'Command Install', |
1076 | + 'timestamp': datetime.utcnow(), |
1077 | + } |
1078 | + self.processMessage(node1, payload) |
1079 | + self.assertEqual( |
1080 | + NODE_STATUS.DEPLOYING, reload_object(node2).status) |
1081 | + # Check last node1 event. |
1082 | + self.assertEqual( |
1083 | + "'curtin' Command Install", |
1084 | + Event.objects.filter(node=node1).last().description) |
1085 | + # There must be no events for node2. |
1086 | + self.assertFalse(Event.objects.filter(node=node2).exists()) |
1087 | + |
1088 | + def test_status_installation_success_leaves_node_deploying(self): |
1089 | + node = factory.make_Node(interface=True, status=NODE_STATUS.DEPLOYING) |
1090 | + payload = { |
1091 | + 'event_type': 'finish', |
1092 | + 'result': 'SUCCESS', |
1093 | + 'origin': 'curtin', |
1094 | + 'name': 'cmd-install', |
1095 | + 'description': 'Command Install', |
1096 | + 'timestamp': datetime.utcnow(), |
1097 | + } |
1098 | + self.processMessage(node, payload) |
1099 | + self.assertEqual(NODE_STATUS.DEPLOYING, reload_object(node).status) |
1100 | + # Check last node event. |
1101 | + self.assertEqual( |
1102 | + "'curtin' Command Install", |
1103 | + Event.objects.filter(node=node).last().description) |
1104 | + |
1105 | + def test_status_commissioning_failure_leaves_node_failed(self): |
1106 | + node = factory.make_Node( |
1107 | + interface=True, status=NODE_STATUS.COMMISSIONING) |
1108 | + payload = { |
1109 | + 'event_type': 'finish', |
1110 | + 'result': 'FAILURE', |
1111 | + 'origin': 'curtin', |
1112 | + 'name': 'commissioning', |
1113 | + 'description': 'Commissioning', |
1114 | + 'timestamp': datetime.utcnow(), |
1115 | + } |
1116 | + self.processMessage(node, payload) |
1117 | + self.assertEqual( |
1118 | + NODE_STATUS.FAILED_COMMISSIONING, reload_object(node).status) |
1119 | + # Check last node event. |
1120 | + self.assertEqual( |
1121 | + "'curtin' Commissioning", |
1122 | + Event.objects.filter(node=node).last().description) |
1123 | + |
1124 | + def test_status_commissioning_failure_clears_owner(self): |
1125 | + user = factory.make_User() |
1126 | + node = factory.make_Node( |
1127 | + interface=True, status=NODE_STATUS.COMMISSIONING, owner=user) |
1128 | + payload = { |
1129 | + 'event_type': 'finish', |
1130 | + 'result': 'FAILURE', |
1131 | + 'origin': 'curtin', |
1132 | + 'name': 'commissioning', |
1133 | + 'description': 'Commissioning', |
1134 | + 'timestamp': datetime.utcnow(), |
1135 | + } |
1136 | + self.assertEqual(user, node.owner) # Node has an owner |
1137 | + self.processMessage(node, payload) |
1138 | + self.assertEqual( |
1139 | + NODE_STATUS.FAILED_COMMISSIONING, reload_object(node).status) |
1140 | + self.assertIsNone(reload_object(node).owner) |
1141 | + |
1142 | + def test_status_installation_failure_leaves_node_failed(self): |
1143 | + node = factory.make_Node(interface=True, status=NODE_STATUS.DEPLOYING) |
1144 | + payload = { |
1145 | + 'event_type': 'finish', |
1146 | + 'result': 'FAILURE', |
1147 | + 'origin': 'curtin', |
1148 | + 'name': 'cmd-install', |
1149 | + 'description': 'Command Install', |
1150 | + 'timestamp': datetime.utcnow(), |
1151 | + } |
1152 | + self.processMessage(node, payload) |
1153 | + self.assertEqual( |
1154 | + NODE_STATUS.FAILED_DEPLOYMENT, reload_object(node).status) |
1155 | + # Check last node event. |
1156 | + self.assertEqual( |
1157 | + "Installation failed (refer to the installation" |
1158 | + " log for more information).", |
1159 | + Event.objects.filter(node=node).last().description) |
1160 | + |
1161 | + def test_status_installation_fail_leaves_node_failed(self): |
1162 | + node = factory.make_Node(interface=True, status=NODE_STATUS.DEPLOYING) |
1163 | + payload = { |
1164 | + 'event_type': 'finish', |
1165 | + 'result': 'FAIL', |
1166 | + 'origin': 'curtin', |
1167 | + 'name': 'cmd-install', |
1168 | + 'description': 'Command Install', |
1169 | + 'timestamp': datetime.utcnow(), |
1170 | + } |
1171 | + self.processMessage(node, payload) |
1172 | + self.assertEqual( |
1173 | + NODE_STATUS.FAILED_DEPLOYMENT, reload_object(node).status) |
1174 | + # Check last node event. |
1175 | + self.assertEqual( |
1176 | + "Installation failed (refer to the installation" |
1177 | + " log for more information).", |
1178 | + Event.objects.filter(node=node).last().description) |
1179 | + |
1180 | + def test_status_installation_failure_doesnt_clear_owner(self): |
1181 | + user = factory.make_User() |
1182 | + node = factory.make_Node( |
1183 | + interface=True, status=NODE_STATUS.DEPLOYING, owner=user) |
1184 | + payload = { |
1185 | + 'event_type': 'finish', |
1186 | + 'result': 'FAILURE', |
1187 | + 'origin': 'curtin', |
1188 | + 'name': 'cmd-install', |
1189 | + 'description': 'Command Install', |
1190 | + 'timestamp': datetime.utcnow(), |
1191 | + } |
1192 | + self.assertEqual(user, node.owner) # Node has an owner |
1193 | + self.processMessage(node, payload) |
1194 | + self.assertEqual( |
1195 | + NODE_STATUS.FAILED_DEPLOYMENT, reload_object(node).status) |
1196 | + self.assertIsNotNone(reload_object(node).owner) |
1197 | + |
1198 | + def test_status_commissioning_failure_does_not_populate_tags(self): |
1199 | + populate_tags_for_single_node = self.patch( |
1200 | + api, "populate_tags_for_single_node") |
1201 | + node = factory.make_Node( |
1202 | + interface=True, status=NODE_STATUS.COMMISSIONING) |
1203 | + payload = { |
1204 | + 'event_type': 'finish', |
1205 | + 'result': 'FAILURE', |
1206 | + 'origin': 'curtin', |
1207 | + 'name': 'commissioning', |
1208 | + 'description': 'Commissioning', |
1209 | + 'timestamp': datetime.utcnow(), |
1210 | + } |
1211 | + self.processMessage(node, payload) |
1212 | + self.assertEqual( |
1213 | + NODE_STATUS.FAILED_COMMISSIONING, reload_object(node).status) |
1214 | + self.assertThat(populate_tags_for_single_node, MockNotCalled()) |
1215 | + |
1216 | + def test_status_erasure_failure_leaves_node_failed(self): |
1217 | + node = factory.make_Node( |
1218 | + interface=True, status=NODE_STATUS.DISK_ERASING) |
1219 | + payload = { |
1220 | + 'event_type': 'finish', |
1221 | + 'result': 'FAILURE', |
1222 | + 'origin': 'curtin', |
1223 | + 'name': 'cmd-erase', |
1224 | + 'description': 'Erasing disk', |
1225 | + 'timestamp': datetime.utcnow(), |
1226 | + } |
1227 | + self.processMessage(node, payload) |
1228 | + self.assertEqual( |
1229 | + NODE_STATUS.FAILED_DISK_ERASING, reload_object(node).status) |
1230 | + # Check last node event. |
1231 | + self.assertEqual( |
1232 | + "Failed to erase disks.", |
1233 | + Event.objects.filter(node=node).last().description) |
1234 | + |
1235 | + def test_status_erasure_failure_does_not_populate_tags(self): |
1236 | + populate_tags_for_single_node = self.patch( |
1237 | + api, "populate_tags_for_single_node") |
1238 | + node = factory.make_Node( |
1239 | + interface=True, status=NODE_STATUS.DISK_ERASING) |
1240 | + payload = { |
1241 | + 'event_type': 'finish', |
1242 | + 'result': 'FAILURE', |
1243 | + 'origin': 'curtin', |
1244 | + 'name': 'cmd-erase', |
1245 | + 'description': 'Erasing disk', |
1246 | + 'timestamp': datetime.utcnow(), |
1247 | + } |
1248 | + self.processMessage(node, payload) |
1249 | + self.assertEqual( |
1250 | + NODE_STATUS.FAILED_DISK_ERASING, reload_object(node).status) |
1251 | + self.assertThat(populate_tags_for_single_node, MockNotCalled()) |
1252 | + |
1253 | + def test_status_erasure_failure_doesnt_clear_owner(self): |
1254 | + user = factory.make_User() |
1255 | + node = factory.make_Node( |
1256 | + interface=True, status=NODE_STATUS.DISK_ERASING, owner=user) |
1257 | + payload = { |
1258 | + 'event_type': 'finish', |
1259 | + 'result': 'FAILURE', |
1260 | + 'origin': 'curtin', |
1261 | + 'name': 'cmd-erase', |
1262 | + 'description': 'Erasing disk', |
1263 | + 'timestamp': datetime.utcnow(), |
1264 | + } |
1265 | + self.processMessage(node, payload) |
1266 | + self.assertEqual( |
1267 | + NODE_STATUS.FAILED_DISK_ERASING, reload_object(node).status) |
1268 | + self.assertEqual(user, node.owner) |
1269 | + |
1270 | + def test_status_with_file_bad_encoder_fails(self): |
1271 | + node = factory.make_Node( |
1272 | + interface=True, status=NODE_STATUS.COMMISSIONING) |
1273 | + contents = b'These are the contents of the file.' |
1274 | + encoded_content = encode_as_base64(bz2.compress(contents)) |
1275 | + payload = { |
1276 | + 'event_type': 'finish', |
1277 | + 'result': 'FAILURE', |
1278 | + 'origin': 'curtin', |
1279 | + 'name': 'commissioning', |
1280 | + 'description': 'Commissioning', |
1281 | + 'timestamp': datetime.utcnow(), |
1282 | + 'files': [ |
1283 | + { |
1284 | + "path": "sample.txt", |
1285 | + "encoding": "uuencode", |
1286 | + "compression": "bzip2", |
1287 | + "content": encoded_content |
1288 | + } |
1289 | + ] |
1290 | + } |
1291 | + with ExpectedException(ValueError): |
1292 | + self.processMessage(node, payload) |
1293 | + |
1294 | + def test_status_with_file_bad_compression_fails(self): |
1295 | + node = factory.make_Node( |
1296 | + interface=True, status=NODE_STATUS.COMMISSIONING) |
1297 | + contents = b'These are the contents of the file.' |
1298 | + encoded_content = encode_as_base64(bz2.compress(contents)) |
1299 | + payload = { |
1300 | + 'event_type': 'finish', |
1301 | + 'result': 'FAILURE', |
1302 | + 'origin': 'curtin', |
1303 | + 'name': 'commissioning', |
1304 | + 'description': 'Commissioning', |
1305 | + 'timestamp': datetime.utcnow(), |
1306 | + 'files': [ |
1307 | + { |
1308 | + "path": "sample.txt", |
1309 | + "encoding": "base64", |
1310 | + "compression": "jpeg", |
1311 | + "content": encoded_content |
1312 | + } |
1313 | + ] |
1314 | + } |
1315 | + with ExpectedException(ValueError): |
1316 | + self.processMessage(node, payload) |
1317 | + |
1318 | + def test_status_with_file_no_compression_succeeds(self): |
1319 | + node = factory.make_Node( |
1320 | + interface=True, status=NODE_STATUS.COMMISSIONING, |
1321 | + with_empty_script_sets=True) |
1322 | + script_result = ( |
1323 | + node.current_commissioning_script_set.scriptresult_set.first()) |
1324 | + script_result.status = SCRIPT_STATUS.RUNNING |
1325 | + script_result.save() |
1326 | + contents = b'These are the contents of the file.' |
1327 | + encoded_content = encode_as_base64(contents) |
1328 | + payload = { |
1329 | + 'event_type': 'finish', |
1330 | + 'result': 'FAILURE', |
1331 | + 'origin': 'curtin', |
1332 | + 'name': 'commissioning', |
1333 | + 'description': 'Commissioning', |
1334 | + 'timestamp': datetime.utcnow(), |
1335 | + 'files': [ |
1336 | + { |
1337 | + "path": script_result.name, |
1338 | + "encoding": "base64", |
1339 | + "content": encoded_content |
1340 | + } |
1341 | + ] |
1342 | + } |
1343 | + self.processMessage(node, payload) |
1344 | + self.assertEqual(contents, reload_object(script_result).stdout) |
1345 | + |
1346 | + def test_status_with_file_invalid_statuses_fails(self): |
1347 | + """Adding files should fail for every status that's not one of |
1348 | + COMMISSIONING, TESTING or DEPLOYING""" |
1349 | + for node_status in [ |
1350 | + NODE_STATUS.DEFAULT, |
1351 | + NODE_STATUS.NEW, |
1352 | + NODE_STATUS.FAILED_COMMISSIONING, |
1353 | + NODE_STATUS.MISSING, |
1354 | + NODE_STATUS.READY, |
1355 | + NODE_STATUS.RESERVED, |
1356 | + NODE_STATUS.DEPLOYED, |
1357 | + NODE_STATUS.RETIRED, |
1358 | + NODE_STATUS.BROKEN, |
1359 | + NODE_STATUS.ALLOCATED, |
1360 | + NODE_STATUS.FAILED_DEPLOYMENT, |
1361 | + NODE_STATUS.RELEASING, |
1362 | + NODE_STATUS.FAILED_RELEASING, |
1363 | + NODE_STATUS.DISK_ERASING, |
1364 | + NODE_STATUS.FAILED_DISK_ERASING]: |
1365 | + node = factory.make_Node(interface=True, status=node_status) |
1366 | + contents = b'These are the contents of the file.' |
1367 | + encoded_content = encode_as_base64(bz2.compress(contents)) |
1368 | + payload = { |
1369 | + 'event_type': 'finish', |
1370 | + 'result': 'FAILURE', |
1371 | + 'origin': 'curtin', |
1372 | + 'name': 'commissioning', |
1373 | + 'description': 'Commissioning', |
1374 | + 'timestamp': datetime.utcnow(), |
1375 | + 'files': [ |
1376 | + { |
1377 | + "path": "sample.txt", |
1378 | + "encoding": "base64", |
1379 | + "compression": "bzip2", |
1380 | + "content": encoded_content |
1381 | + } |
1382 | + ] |
1383 | + } |
1384 | + with ExpectedException(ValueError): |
1385 | + self.processMessage(node, payload) |
1386 | + |
1387 | + def test_status_with_file_succeeds(self): |
1388 | + """Adding files should succeed for every status that's either |
1389 | + COMMISSIONING or DEPLOYING""" |
1390 | + for node_status, target_status in [ |
1391 | + (NODE_STATUS.COMMISSIONING, NODE_STATUS.FAILED_COMMISSIONING), |
1392 | + (NODE_STATUS.DEPLOYING, NODE_STATUS.FAILED_DEPLOYMENT)]: |
1393 | + node = factory.make_Node( |
1394 | + interface=True, status=node_status, |
1395 | + with_empty_script_sets=True) |
1396 | + if node_status == NODE_STATUS.COMMISSIONING: |
1397 | + script_set = node.current_commissioning_script_set |
1398 | + elif node_status == NODE_STATUS.DEPLOYING: |
1399 | + script_set = node.current_installation_script_set |
1400 | + script_result = script_set.scriptresult_set.first() |
1401 | + script_result.status = SCRIPT_STATUS.RUNNING |
1402 | + script_result.save() |
1403 | + contents = b'These are the contents of the file.' |
1404 | + encoded_content = encode_as_base64(bz2.compress(contents)) |
1405 | + payload = { |
1406 | + 'event_type': 'finish', |
1407 | + 'result': 'FAILURE', |
1408 | + 'origin': 'curtin', |
1409 | + 'name': 'commissioning', |
1410 | + 'description': 'Commissioning', |
1411 | + 'timestamp': datetime.utcnow(), |
1412 | + 'files': [ |
1413 | + { |
1414 | + "path": script_result.name, |
1415 | + "encoding": "base64", |
1416 | + "compression": "bzip2", |
1417 | + "content": encoded_content |
1418 | + } |
1419 | + ] |
1420 | + } |
1421 | + self.processMessage(node, payload) |
1422 | + self.assertEqual( |
1423 | + target_status, reload_object(node).status) |
1424 | + # Check the node result. |
1425 | + self.assertEqual(contents, reload_object(script_result).stdout) |
1426 | + |
1427 | + def test_status_with_results_succeeds(self): |
1428 | + """Adding a script result should succeed""" |
1429 | + node = factory.make_Node( |
1430 | + interface=True, status=NODE_STATUS.COMMISSIONING, |
1431 | + with_empty_script_sets=True) |
1432 | + script_result = ( |
1433 | + node.current_commissioning_script_set.scriptresult_set.first()) |
1434 | + script_result.status = SCRIPT_STATUS.RUNNING |
1435 | + script_result.save() |
1436 | + contents = b'These are the contents of the file.' |
1437 | + encoded_content = encode_as_base64(bz2.compress(contents)) |
1438 | + payload = { |
1439 | + 'event_type': 'finish', |
1440 | + 'result': 'FAILURE', |
1441 | + 'origin': 'curtin', |
1442 | + 'name': 'commissioning', |
1443 | + 'description': 'Commissioning', |
1444 | + 'timestamp': datetime.utcnow(), |
1445 | + 'files': [ |
1446 | + { |
1447 | + "path": script_result.name, |
1448 | + "encoding": "base64", |
1449 | + "compression": "bzip2", |
1450 | + "content": encoded_content, |
1451 | + "result": -42 |
1452 | + } |
1453 | + ] |
1454 | + } |
1455 | + self.processMessage(node, payload) |
1456 | + script_result = reload_object(script_result) |
1457 | + self.assertEqual(contents, script_result.stdout) |
1458 | + self.assertEqual(-42, script_result.exit_status) |
1459 | + |
1460 | + def test_status_with_results_no_exit_status_defaults_to_zero(self): |
1461 | + """Adding a script result without a return code should succeed and |
1462 | + default the exit status to zero.""" |
1463 | + node = factory.make_Node( |
1464 | + interface=True, status=NODE_STATUS.COMMISSIONING, |
1465 | + with_empty_script_sets=True) |
1466 | + script_result = ( |
1467 | + node.current_commissioning_script_set.scriptresult_set.first()) |
1468 | + script_result.status = SCRIPT_STATUS.RUNNING |
1469 | + script_result.save() |
1470 | + contents = b'These are the contents of the file.' |
1471 | + encoded_content = encode_as_base64(bz2.compress(contents)) |
1472 | + payload = { |
1473 | + 'event_type': 'finish', |
1474 | + 'result': 'FAILURE', |
1475 | + 'origin': 'curtin', |
1476 | + 'name': 'commissioning', |
1477 | + 'description': 'Commissioning', |
1478 | + 'timestamp': datetime.utcnow(), |
1479 | + 'files': [ |
1480 | + { |
1481 | + "path": script_result.name, |
1482 | + "encoding": "base64", |
1483 | + "compression": "bzip2", |
1484 | + "content": encoded_content, |
1485 | + } |
1486 | + ] |
1487 | + } |
1488 | + self.processMessage(node, payload) |
1489 | + self.assertEqual(0, reload_object(script_result).exit_status) |
1490 | + |
1491 | + def test_status_stores_virtual_tag_on_node_if_virtual(self): |
1492 | + node = factory.make_Node( |
1493 | + status=NODE_STATUS.COMMISSIONING, with_empty_script_sets=True) |
1494 | + content = 'virtual'.encode('utf-8') |
1495 | + payload = { |
1496 | + 'event_type': 'finish', |
1497 | + 'result': 'SUCCESS', |
1498 | + 'origin': 'curtin', |
1499 | + 'name': 'commissioning', |
1500 | + 'description': 'Commissioning', |
1501 | + 'timestamp': datetime.utcnow(), |
1502 | + 'files': [ |
1503 | + { |
1504 | + "path": "00-maas-02-virtuality", |
1505 | + "encoding": "base64", |
1506 | + "content": encode_as_base64(content), |
1507 | + } |
1508 | + ] |
1509 | + } |
1510 | + self.processMessage(node, payload) |
1511 | + node = reload_object(node) |
1512 | + self.assertEqual( |
1513 | + ["virtual"], [each_tag.name for each_tag in node.tags.all()]) |
1514 | + for script_result in node.current_commissioning_script_set: |
1515 | + if script_result.name == "00-maas-02-virtuality": |
1516 | + break |
1517 | + self.assertEqual(content, script_result.stdout) |
1518 | + |
1519 | + def test_status_removes_virtual_tag_on_node_if_not_virtual(self): |
1520 | + node = factory.make_Node( |
1521 | + status=NODE_STATUS.COMMISSIONING, with_empty_script_sets=True) |
1522 | + tag, _ = Tag.objects.get_or_create(name='virtual') |
1523 | + node.tags.add(tag) |
1524 | + content = 'none'.encode('utf-8') |
1525 | + payload = { |
1526 | + 'event_type': 'finish', |
1527 | + 'result': 'SUCCESS', |
1528 | + 'origin': 'curtin', |
1529 | + 'name': 'commissioning', |
1530 | + 'description': 'Commissioning', |
1531 | + 'timestamp': datetime.utcnow(), |
1532 | + 'files': [ |
1533 | + { |
1534 | + "path": "00-maas-02-virtuality", |
1535 | + "encoding": "base64", |
1536 | + "content": encode_as_base64(content), |
1537 | + } |
1538 | + ] |
1539 | + } |
1540 | + self.processMessage(node, payload) |
1541 | + node = reload_object(node) |
1542 | + self.assertEqual( |
1543 | + [], [each_tag.name for each_tag in node.tags.all()]) |
1544 | + for script_result in node.current_commissioning_script_set: |
1545 | + if script_result.name == "00-maas-02-virtuality": |
1546 | + break |
1547 | + self.assertEqual(content, script_result.stdout) |
1548 | + |
1549 | + def test_status_updates_script_status_last_ping(self): |
1550 | + nodes = { |
1551 | + status: factory.make_Node( |
1552 | + status=status, with_empty_script_sets=True) |
1553 | + for status in ( |
1554 | + NODE_STATUS.COMMISSIONING, |
1555 | + NODE_STATUS.TESTING, |
1556 | + NODE_STATUS.DEPLOYING) |
1557 | + } |
1558 | + |
1559 | + for status, node in nodes.items(): |
1560 | + payload = { |
1561 | + 'event_type': 'progress', |
1562 | + 'origin': 'curtin', |
1563 | + 'name': 'test', |
1564 | + 'description': 'testing', |
1565 | + 'timestamp': datetime.utcnow(), |
1566 | + } |
1567 | + self.processMessage(node, payload) |
1568 | + script_set_statuses = { |
1569 | + NODE_STATUS.COMMISSIONING: ( |
1570 | + node.current_commissioning_script_set), |
1571 | + NODE_STATUS.TESTING: node.current_testing_script_set, |
1572 | + NODE_STATUS.DEPLOYING: node.current_installation_script_set, |
1573 | + } |
1574 | + script_set = script_set_statuses.get(node.status) |
1575 | + self.assertIsNotNone(script_set.last_ping) |
Overall this looks good. Just a few comments inline.