Merge lp:~blake-rouse/maas/queue-status-messages into lp:~maas-committers/maas/trunk

Proposed by Blake Rouse
Status: Merged
Approved by: Blake Rouse
Approved revision: no longer in the source branch.
Merged at revision: 5763
Proposed branch: lp:~blake-rouse/maas/queue-status-messages
Merge into: lp:~maas-committers/maas/trunk
Diff against target: 1575 lines (+1274/-25)
13 files modified
src/maasserver/eventloop.py (+14/-3)
src/maasserver/models/event.py (+4/-3)
src/maasserver/models/node.py (+1/-1)
src/maasserver/models/tests/test_event.py (+10/-0)
src/maasserver/preseed.py (+1/-0)
src/maasserver/tests/test_eventloop.py (+19/-2)
src/maasserver/tests/test_plugin.py (+1/-0)
src/maasserver/tests/test_preseed.py (+3/-0)
src/maasserver/tests/test_webapp.py (+87/-4)
src/maasserver/webapp.py (+76/-10)
src/metadataserver/api.py (+2/-2)
src/metadataserver/api_twisted.py (+290/-0)
src/metadataserver/tests/test_api_twisted.py (+766/-0)
To merge this branch: bzr merge lp:~blake-rouse/maas/queue-status-messages
Reviewer Review Type Date Requested Status
Andres Rodriguez (community) Approve
Review via email: mp+317394@code.launchpad.net

Commit message

Status messages from curtin and cloud-init are now handled by Twisted. Each message is placed in a queue that is processed every second, which reduces the load on the regiond processes by more than 50%. This branch also allows a Twisted API endpoint to overlay a Django endpoint.
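
For illustration, a minimal sketch of the queue-and-flush pattern described above (the QueueingWorker class and the fake_db_write helper are hypothetical stand-ins, not MAAS code; the real StatusWorkerService also checks the NodeKey authorization for each token and hands each batch to the database-tasks service via deferToDatabase):

    from collections import defaultdict
    from twisted.application.internet import TimerService


    class QueueingWorker(TimerService, object):
        """Accumulate status messages in memory and flush them periodically."""

        def __init__(self, flush_interval=1.0):
            # TimerService calls self._flush every flush_interval seconds.
            super(QueueingWorker, self).__init__(flush_interval, self._flush)
            self.queue = defaultdict(list)  # auth token -> [messages]

        def queueMessage(self, token, message):
            # Called from the web resource; cheap, no database work here.
            self.queue[token].append(message)

        def _flush(self):
            if not self.queue:
                return
            # Swap the queue out so new messages keep accumulating while this
            # batch is handed off for processing in one go.
            batch, self.queue = self.queue, defaultdict(list)
            for token, messages in batch.items():
                fake_db_write(token, messages)  # stand-in for deferToDatabase(...)


    def fake_db_write(token, messages):
        print("processing %d message(s) for %s" % (len(messages), token))

Batching the writes this way lets each HTTP POST return 204 immediately, and the database sees one burst of work per flush interval instead of work for every incoming request.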

Revision history for this message
Andres Rodriguez (andreserl) wrote :

Overall this looks good. Just a few comments inline.

review: Approve
Revision history for this message
Blake Rouse (blake-rouse) :
Revision history for this message
MAAS Lander (maas-lander) wrote :

The attempt to merge lp:~blake-rouse/maas/queue-status-messages into lp:maas failed. Below is the output from the failed tests.

Hit:1 http://prodstack-zone-2.clouds.archive.ubuntu.com/ubuntu xenial InRelease
Get:2 http://prodstack-zone-2.clouds.archive.ubuntu.com/ubuntu xenial-updates InRelease [102 kB]
Get:3 http://prodstack-zone-2.clouds.archive.ubuntu.com/ubuntu xenial-backports InRelease [102 kB]
Get:4 http://security.ubuntu.com/ubuntu xenial-security InRelease [102 kB]
Get:5 http://prodstack-zone-2.clouds.archive.ubuntu.com/ubuntu xenial-updates/main Sources [233 kB]
Get:6 http://prodstack-zone-2.clouds.archive.ubuntu.com/ubuntu xenial-updates/main amd64 Packages [483 kB]
Get:7 http://prodstack-zone-2.clouds.archive.ubuntu.com/ubuntu xenial-updates/universe amd64 Packages [411 kB]
Get:8 http://security.ubuntu.com/ubuntu xenial-security/main Sources [61.1 kB]
Get:9 http://security.ubuntu.com/ubuntu xenial-security/main amd64 Packages [219 kB]
Get:10 http://security.ubuntu.com/ubuntu xenial-security/universe amd64 Packages [79.1 kB]
Fetched 1,793 kB in 0s (2,724 kB/s)
Reading package lists...
sudo DEBIAN_FRONTEND=noninteractive apt-get -y \
    --no-install-recommends install apache2 archdetect-deb authbind avahi-utils bash bind9 bind9utils build-essential bzr bzr-builddeb chromium-browser chromium-chromedriver curl daemontools debhelper dh-apport dh-systemd distro-info dnsutils firefox freeipmi-tools git gjs ipython isc-dhcp-common isc-dhcp-server libjs-angularjs libjs-jquery libjs-jquery-hotkeys libjs-yui3-full libjs-yui3-min libnss-wrapper libpq-dev make nodejs-legacy npm postgresql psmisc pxelinux python3-all python3-apt python3-attr python3-bson python3-convoy python3-crochet python3-cssselect python3-curtin python3-dev python3-distro-info python3-django python3-django-nose python3-django-piston3 python3-dnspython python3-docutils python3-formencode python3-hivex python3-httplib2 python3-jinja2 python3-jsonschema python3-lxml python3-netaddr python3-netifaces python3-novaclient python3-oauth python3-oauthlib python3-openssl python3-paramiko python3-petname python3-pexpect python3-psycopg2 python3-pyinotify python3-pyparsing python3-pyvmomi python3-requests python3-seamicroclient python3-setuptools python3-simplestreams python3-sphinx python3-tempita python3-twisted python3-txtftp python3-tz python3-yaml python3-zope.interface python-bson python-crochet python-django python-django-piston python-djorm-ext-pgarray python-formencode python-lxml python-netaddr python-netifaces python-pocket-lint python-psycopg2 python-simplejson python-tempita python-twisted python-yaml socat syslinux-common tgt ubuntu-cloudimage-keyring wget xvfb
Reading package lists...
Building dependency tree...
Reading state information...
authbind is already the newest version (2.1.1+nmu1).
avahi-utils is already the newest version (0.6.32~rc+dfsg-1ubuntu2).
build-essential is already the newest version (12.1ubuntu2).
debhelper is already the newest version (9.20160115ubuntu3).
distro-info is already the newest version (0.14build1).
git is already the newest version (1:2.7.4-0ubuntu1).
libjs-angularjs is already the newest version (1.2.28-1ubuntu2).
libjs-jquery is ...

Preview Diff

1=== modified file 'src/maasserver/eventloop.py'
2--- src/maasserver/eventloop.py 2016-12-05 09:38:41 +0000
3+++ src/maasserver/eventloop.py 2017-02-28 20:14:47 +0000
4@@ -120,6 +120,11 @@
5 return RackControllerService(postgresListener, advertisingService)
6
7
8+def make_StatusWorkerService(dbtasks):
9+ from metadataserver.api_twisted import StatusWorkerService
10+ return StatusWorkerService(dbtasks)
11+
12+
13 def make_ServiceMonitorService(advertisingService):
14 from maasserver.regiondservices import service_monitor_service
15 return service_monitor_service.ServiceMonitorService(advertisingService)
16@@ -151,7 +156,7 @@
17 return ntp.RegionNetworkTimeProtocolService(reactor)
18
19
20-def make_WebApplicationService(postgresListener):
21+def make_WebApplicationService(postgresListener, statusWorker):
22 from maasserver.webapp import WebApplicationService
23 site_port = DEFAULT_PORT # config["port"]
24 # Make a socket with SO_REUSEPORT set so that we can run multiple web
25@@ -169,7 +174,8 @@
26 site_endpoint = AdoptedStreamServerEndpoint(reactor, s.fileno(), s.family)
27 site_endpoint.port = site_port # Make it easy to get the port number.
28 site_endpoint.socket = s # Prevent garbage collection.
29- site_service = WebApplicationService(site_endpoint, postgresListener)
30+ site_service = WebApplicationService(
31+ site_endpoint, postgresListener, statusWorker)
32 return site_service
33
34
35@@ -245,13 +251,18 @@
36 "web": {
37 "only_on_master": False,
38 "factory": make_WebApplicationService,
39- "requires": ["postgres-listener"],
40+ "requires": ["postgres-listener", "status-worker"],
41 },
42 "service-monitor": {
43 "only_on_master": True,
44 "factory": make_ServiceMonitorService,
45 "requires": ["rpc-advertise"],
46 },
47+ "status-worker": {
48+ "only_on_master": False,
49+ "factory": make_StatusWorkerService,
50+ "requires": ["database-tasks"],
51+ },
52 "networks-monitor": {
53 "only_on_master": False,
54 "factory": make_NetworksMonitoringService,
55
56=== modified file 'src/maasserver/models/event.py'
57--- src/maasserver/models/event.py 2016-12-07 12:46:14 +0000
58+++ src/maasserver/models/event.py 2017-02-28 20:14:47 +0000
59@@ -32,7 +32,8 @@
60
61 def register_event_and_event_type(
62 self, system_id, type_name, type_description='',
63- type_level=logging.INFO, event_action='', event_description=''):
64+ type_level=logging.INFO, event_action='', event_description='',
65+ created=None):
66 """Register EventType if it does not exist, then register the Event."""
67 node = Node.objects.get(system_id=system_id)
68 try:
69@@ -42,9 +43,9 @@
70 # We didn't find it so register it.
71 event_type = EventType.objects.register(
72 type_name, type_description, type_level)
73- Event.objects.create(
74+ return Event.objects.create(
75 node=node, type=event_type, action=event_action,
76- description=event_description)
77+ description=event_description, created=created)
78
79 def create_node_event(
80 self, system_id, event_type, event_action='',
81
82=== modified file 'src/maasserver/models/node.py'
83--- src/maasserver/models/node.py 2017-02-17 21:27:46 +0000
84+++ src/maasserver/models/node.py 2017-02-28 20:14:47 +0000
85@@ -1037,7 +1037,7 @@
86
87 def __str__(self):
88 if self.hostname:
89- return "%s (%s)" % (self.system_id, self.fqdn)
90+ return "%s (%s)" % (self.system_id, self.hostname)
91 else:
92 return self.system_id
93
94
95=== modified file 'src/maasserver/models/tests/test_event.py'
96--- src/maasserver/models/tests/test_event.py 2016-12-07 12:46:14 +0000
97+++ src/maasserver/models/tests/test_event.py 2017-02-28 20:14:47 +0000
98@@ -33,6 +33,16 @@
99 system_id=node.system_id, type_name=event_type.name)
100 self.assertIsNotNone(Event.objects.get(node=node))
101
102+ def test_register_event_and_event_type_registers_event_with_datetime(self):
103+ # EventType exists
104+ node = factory.make_Node()
105+ event_type = factory.make_EventType()
106+ created = factory.make_date()
107+ event = Event.objects.register_event_and_event_type(
108+ system_id=node.system_id, type_name=event_type.name,
109+ created=created)
110+ self.assertEqual(created, event.created)
111+
112 def test_register_event_and_event_type_registers_event_for_new_type(self):
113 # EventType does not exist
114 node = factory.make_Node()
115
116=== modified file 'src/maasserver/preseed.py'
117--- src/maasserver/preseed.py 2017-02-22 14:55:09 +0000
118+++ src/maasserver/preseed.py 2017-02-28 20:14:47 +0000
119@@ -191,6 +191,7 @@
120 'consumer_key': token.consumer.key,
121 'token_key': token.key,
122 'token_secret': token.secret,
123+ 'level': 'INFO',
124 },
125 },
126 'install': {
127
128=== modified file 'src/maasserver/tests/test_eventloop.py'
129--- src/maasserver/tests/test_eventloop.py 2017-01-28 00:51:47 +0000
130+++ src/maasserver/tests/test_eventloop.py 2017-02-28 20:14:47 +0000
131@@ -32,6 +32,7 @@
132 )
133 from maastesting.factory import factory
134 from maastesting.testcase import MAASTestCase
135+from metadataserver import api_twisted
136 from testtools.matchers import (
137 Equals,
138 IsInstance,
139@@ -254,7 +255,7 @@
140
141 def test_make_WebApplicationService(self):
142 service = eventloop.make_WebApplicationService(
143- FakePostgresListenerService())
144+ FakePostgresListenerService(), sentinel.status_worker)
145 self.assertThat(service, IsInstance(webapp.WebApplicationService))
146 # The endpoint is set to port 5243 on localhost.
147 self.assertThat(service.endpoint, MatchesStructure.byEquality(
148@@ -271,7 +272,7 @@
149 eventloop.loop.factories["web"]["factory"])
150 # Has a dependency of postgres-listener.
151 self.assertEquals(
152- ["postgres-listener"],
153+ ["postgres-listener", "status-worker"],
154 eventloop.loop.factories["web"]["requires"])
155 self.assertFalse(
156 eventloop.loop.factories["web"]["only_on_master"])
157@@ -308,6 +309,22 @@
158 self.assertTrue(
159 eventloop.loop.factories["service-monitor"]["only_on_master"])
160
161+ def test_make_StatusWorkerService(self):
162+ service = eventloop.make_StatusWorkerService(
163+ sentinel.dbtasks)
164+ self.assertThat(service, IsInstance(
165+ api_twisted.StatusWorkerService))
166+ # It is registered as a factory in RegionEventLoop.
167+ self.assertIs(
168+ eventloop.make_StatusWorkerService,
169+ eventloop.loop.factories["status-worker"]["factory"])
170+ # Has a dependency of database-tasks.
171+ self.assertEquals(
172+ ["database-tasks"],
173+ eventloop.loop.factories["status-worker"]["requires"])
174+ self.assertFalse(
175+ eventloop.loop.factories["status-worker"]["only_on_master"])
176+
177
178 class TestDisablingDatabaseConnections(MAASServerTestCase):
179
180
181=== modified file 'src/maasserver/tests/test_plugin.py'
182--- src/maasserver/tests/test_plugin.py 2017-01-28 00:51:47 +0000
183+++ src/maasserver/tests/test_plugin.py 2017-02-28 20:14:47 +0000
184@@ -108,6 +108,7 @@
185 "rpc-advertise",
186 "service-monitor",
187 "status-monitor",
188+ "status-worker",
189 "web",
190 ]
191 self.assertItemsEqual(expected_services, service.namedServices.keys())
192
193=== modified file 'src/maasserver/tests/test_preseed.py'
194--- src/maasserver/tests/test_preseed.py 2017-02-24 15:17:17 +0000
195+++ src/maasserver/tests/test_preseed.py 2017-02-28 20:14:47 +0000
196@@ -646,6 +646,9 @@
197 self.assertEqual(
198 token.secret,
199 reporter['reporting']['maas']['token_secret'])
200+ self.assertEqual(
201+ 'INFO',
202+ reporter['reporting']['maas']['level'])
203
204 def test__curtin_maas_reporter_without_events_support(self):
205 node = factory.make_Node_with_Interface_on_Subnet()
206
207=== modified file 'src/maasserver/tests/test_webapp.py'
208--- src/maasserver/tests/test_webapp.py 2017-01-28 00:51:47 +0000
209+++ src/maasserver/tests/test_webapp.py 2017-02-28 20:14:47 +0000
210@@ -17,6 +17,7 @@
211 webapp,
212 )
213 from maasserver.testing.listener import FakePostgresListenerService
214+from maasserver.webapp import OverlaySite
215 from maasserver.websockets.protocol import WebSocketFactory
216 from maastesting.factory import factory
217 from maastesting.matchers import MockCalledOnceWith
218@@ -27,10 +28,15 @@
219 Is,
220 IsInstance,
221 MatchesStructure,
222+ Not,
223 )
224 from twisted.internet import reactor
225 from twisted.internet.endpoints import TCP4ServerEndpoint
226-from twisted.web.resource import Resource
227+from twisted.web.error import UnsupportedMethod
228+from twisted.web.resource import (
229+ NoResource,
230+ Resource,
231+)
232 from twisted.web.server import Site
233 from twisted.web.test.requesthelper import (
234 DummyChannel,
235@@ -76,6 +82,70 @@
236 sentinel.command, single_path, sentinel.version))
237
238
239+class TestOverlaySite(MAASTestCase):
240+
241+ def test__init__(self):
242+ root = Resource()
243+ site = OverlaySite(root)
244+ self.assertThat(site, IsInstance(Site))
245+
246+ def test_getResourceFor_returns_no_resource_wo_underlay(self):
247+ root = Resource()
248+ site = OverlaySite(root)
249+ request = DummyRequest([b'MAAS'])
250+ resource = site.getResourceFor(request)
251+ self.assertThat(resource, IsInstance(NoResource))
252+
253+ def test_getResourceFor_wraps_render_wo_underlay(self):
254+ root = Resource()
255+ maas = Resource()
256+ mock_render = self.patch(maas, 'render')
257+ root.putChild(b'MAAS', maas)
258+ site = OverlaySite(root)
259+ request = DummyRequest([b'MAAS'])
260+ resource = site.getResourceFor(request)
261+ self.assertThat(resource, Is(maas))
262+ self.assertThat(resource.render, Not(Is(mock_render)))
263+ resource.render(request)
264+ self.assertThat(mock_render, MockCalledOnceWith(request))
265+
266+ def test_getResourceFor_wraps_render_wo_underlay_raises_no_method(self):
267+ root = Resource()
268+ maas = Resource()
269+ root.putChild(b'MAAS', maas)
270+ site = OverlaySite(root)
271+ request = DummyRequest([b'MAAS'])
272+ resource = site.getResourceFor(request)
273+ self.assertThat(resource, Is(maas))
274+ self.assertRaises(UnsupportedMethod, resource.render, request)
275+
276+ def test_getResourceFor_returns_resource_from_underlay(self):
277+ underlay_root = Resource()
278+ underlay_maas = Resource()
279+ underlay_root.putChild(b'MAAS', underlay_maas)
280+ overlay_root = Resource()
281+ site = OverlaySite(overlay_root)
282+ site.underlay = Site(underlay_root)
283+ request = DummyRequest([b'MAAS'])
284+ resource = site.getResourceFor(request)
285+ self.assertThat(resource, Is(underlay_maas))
286+
287+ def test_getResourceFor_calls_render_on_underlay_when_no_method(self):
288+ underlay_root = Resource()
289+ underlay_maas = Resource()
290+ mock_underlay_maas_render = self.patch(underlay_maas, 'render')
291+ underlay_root.putChild(b'MAAS', underlay_maas)
292+ overlay_root = Resource()
293+ overlay_maas = Resource()
294+ overlay_root.putChild(b'MAAS', overlay_maas)
295+ site = OverlaySite(overlay_root)
296+ site.underlay = Site(underlay_root)
297+ request = DummyRequest([b'MAAS'])
298+ resource = site.getResourceFor(request)
299+ resource.render(request)
300+ self.assertThat(mock_underlay_maas_render, MockCalledOnceWith(request))
301+
302+
303 class TestResourceOverlay(MAASTestCase):
304
305 def make_resourceoverlay(self):
306@@ -100,7 +170,7 @@
307 listener = FakePostgresListenerService()
308 service_endpoint = self.make_endpoint()
309 service = webapp.WebApplicationService(
310- service_endpoint, listener)
311+ service_endpoint, listener, sentinel.status_worker)
312 # Patch the getServiceNamed so the WebSocketFactory does not
313 # error trying to register for events from the RPC service. In this
314 # test the RPC service is not started.
315@@ -197,11 +267,24 @@
316
317 service.startService()
318
319+ # Overlay
320+ site = service.site
321+ self.assertThat(site, IsInstance(OverlaySite))
322 resource = service.site.resource
323 self.assertThat(resource, IsInstance(Resource))
324 overlay_resource = resource.getChildWithDefault(b"MAAS", request=None)
325- self.assertThat(overlay_resource, IsInstance(webapp.ResourceOverlay))
326- self.assertThat(overlay_resource.basis, MatchesStructure(
327+ self.assertThat(overlay_resource, IsInstance(Resource))
328+
329+ # Underlay
330+ site = service.site.underlay
331+ self.assertThat(site, IsInstance(Site))
332+ underlay_resource = site.resource
333+ self.assertThat(underlay_resource, IsInstance(Resource))
334+ underlay_maas_resource = underlay_resource.getChildWithDefault(
335+ b"MAAS", request=None)
336+ self.assertThat(
337+ underlay_maas_resource, IsInstance(webapp.ResourceOverlay))
338+ self.assertThat(underlay_maas_resource.basis, MatchesStructure(
339 _reactor=Is(reactor), _threadpool=Is(service.threadpool),
340 _application=IsInstance(WSGIHandler)))
341
342
343=== modified file 'src/maasserver/webapp.py'
344--- src/maasserver/webapp.py 2016-10-28 15:58:32 +0000
345+++ src/maasserver/webapp.py 2017-02-28 20:14:47 +0000
346@@ -1,4 +1,4 @@
347-# Copyright 2014-2016 Canonical Ltd. This software is licensed under the
348+# Copyright 2014-2017 Canonical Ltd. This software is licensed under the
349 # GNU Affero General Public License version 3 (see the file LICENSE).
350
351 """The MAAS Web Application."""
352@@ -7,6 +7,8 @@
353 "WebApplicationService",
354 ]
355
356+import copy
357+from functools import partial
358 from http.client import SERVICE_UNAVAILABLE
359 import re
360
361@@ -19,6 +21,7 @@
362 lookupProtocolForFactory,
363 WebSocketsResource,
364 )
365+from metadataserver.api_twisted import StatusHandlerResource
366 from provisioningserver.logger import LegacyLogger
367 from provisioningserver.utils.twisted import (
368 asynchronous,
369@@ -30,8 +33,10 @@
370 reactor,
371 )
372 from twisted.python import failure
373+from twisted.web.error import UnsupportedMethod
374 from twisted.web.resource import (
375 ErrorPage,
376+ NoResource,
377 Resource,
378 )
379 from twisted.web.server import (
380@@ -81,6 +86,49 @@
381 command, path, version)
382
383
384+class OverlaySite(Site):
385+ """A site that is over another site.
386+
387+ If this site cannot resolve a valid resource to handle the request, then
388+ the underlay site gets passed the request to process.
389+ """
390+
391+ underlay = None
392+
393+ def getResourceFor(self, request):
394+ """Override to support an underlay site.
395+
396+ If this site cannot return a valid resource, the request is passed to
397+ the underlay site to be resolved.
398+ """
399+ def call_underlay(request):
400+ # Reset the paths and forward to the underlay site.
401+ request.prepath = []
402+ request.postpath = postpath
403+ return self.underlay.getResourceFor(request)
404+
405+ def wrap_render(orig_render, request):
406+ # Wrap the render call of the resource, catching any
407+ # UnsupportedMethod exceptions and forwarding those onto
408+ # the underlay site.
409+ try:
410+ return orig_render(request)
411+ except UnsupportedMethod:
412+ if self.underlay is not None:
413+ resource = call_underlay(request)
414+ return resource.render(request)
415+ else:
416+ raise
417+
418+ postpath = copy.copy(request.postpath)
419+ result = super(OverlaySite, self).getResourceFor(request)
420+ if isinstance(result, NoResource) and self.underlay is not None:
421+ return call_underlay(request)
422+ else:
423+ result.render = partial(wrap_render, result.render)
424+ return result
425+
426+
427 class ResourceOverlay(Resource, object):
428 """A resource that can fall-back to a basis resource.
429
430@@ -126,13 +174,14 @@
431 the web application.
432 """
433
434- def __init__(self, endpoint, listener):
435- self.site = Site(StartPage())
436+ def __init__(self, endpoint, listener, status_worker):
437+ self.site = OverlaySite(StartPage())
438 self.site.requestFactory = CleanPathRequest
439 super(WebApplicationService, self).__init__(endpoint, self.site)
440 self.websocket = WebSocketFactory(listener)
441 self.threadpool = ThreadPoolLimiter(
442 reactor.threadpoolForDatabase, concurrency.webapp)
443+ self.status_worker = status_worker
444
445 def prepareApplication(self):
446 """Return the WSGI application.
447@@ -157,16 +206,33 @@
448 with RegionConfiguration.open() as config:
449 static_root = File(config.static_root)
450
451+ # Set up resources for the paths that Twisted handles.
452+ metadata = Resource()
453+ metadata.putChild(b'status', StatusHandlerResource(self.status_worker))
454+
455+ maas = Resource()
456+ maas.putChild(b'metadata', metadata)
457+ maas.putChild(b'static', static_root)
458+ maas.putChild(
459+ b'ws',
460+ WebSocketsResource(lookupProtocolForFactory(self.websocket)))
461+
462 root = Resource()
463- webapp = ResourceOverlay(
464+ root.putChild(b'', Redirect(b"MAAS/"))
465+ root.putChild(b'MAAS', maas)
466+
467+ # Set up the resources for the paths that Django handles.
468+ underlay_maas = ResourceOverlay(
469 WSGIResource(reactor, self.threadpool, application))
470- root.putChild(b"", Redirect(b"MAAS/"))
471- root.putChild(b"MAAS", webapp)
472- webapp.putChild(
473- b'ws',
474- WebSocketsResource(lookupProtocolForFactory(self.websocket)))
475- webapp.putChild(b'static', static_root)
476+ underlay_root = Resource()
477+ underlay_root.putChild(b'MAAS', underlay_maas)
478+ underlay_site = Site(underlay_root)
479+ underlay_site.requestFactory = CleanPathRequest
480+
481+ # Set up the main resource as the Twisted handler and the underlay
482+ # resource as the Django handler.
483 self.site.resource = root
484+ self.site.underlay = underlay_site
485
486 def installFailed(self, failure):
487 """Display a page explaining why the web app could not start."""
488
489=== modified file 'src/metadataserver/api.py'
490--- src/metadataserver/api.py 2017-02-17 14:23:04 +0000
491+++ src/metadataserver/api.py 2017-02-28 20:14:47 +0000
492@@ -176,7 +176,7 @@
493
494
495 def add_event_to_node_event_log(
496- node, origin, action, description, result=None):
497+ node, origin, action, description, result=None, created=None):
498 """Add an entry to the node's event log."""
499 if node.status == NODE_STATUS.COMMISSIONING:
500 if result in ['SUCCESS', None]:
501@@ -205,7 +205,7 @@
502 node.system_id, type_name, type_level=event_details.level,
503 type_description=event_details.description,
504 event_action=action,
505- event_description="'%s' %s" % (origin, description))
506+ event_description="'%s' %s" % (origin, description), created=created)
507
508
509 def process_file(
510
511=== added file 'src/metadataserver/api_twisted.py'
512--- src/metadataserver/api_twisted.py 1970-01-01 00:00:00 +0000
513+++ src/metadataserver/api_twisted.py 2017-02-28 20:14:47 +0000
514@@ -0,0 +1,290 @@
515+# Copyright 2017 Canonical Ltd. This software is licensed under the
516+# GNU Affero General Public License version 3 (see the file LICENSE).
517+
518+"""Metadata API that runs in the Twisted reactor."""
519+
520+import base64
521+import bz2
522+from collections import defaultdict
523+from datetime import datetime
524+import json
525+
526+from maasserver.api.utils import extract_oauth_key_from_auth_header
527+from maasserver.enum import (
528+ NODE_STATUS,
529+ NODE_TYPE,
530+)
531+from maasserver.utils.orm import (
532+ in_transaction,
533+ transactional,
534+ TransactionManagementError,
535+)
536+from maasserver.utils.threads import deferToDatabase
537+from metadataserver import logger
538+from metadataserver.api import (
539+ add_event_to_node_event_log,
540+ process_file,
541+)
542+from metadataserver.models import NodeKey
543+from provisioningserver.logger import LegacyLogger
544+from twisted.application.internet import TimerService
545+from twisted.internet import reactor
546+from twisted.web.resource import Resource
547+
548+
549+log = LegacyLogger()
550+
551+
552+class StatusHandlerResource(Resource):
553+
554+ # Has no children, so getChild will not be called.
555+ isLeaf = True
556+
557+ # Only POST operations are allowed.
558+ allowedMethods = [b'POST']
559+
560+ # Required keys in the message.
561+ requiredMessageKeys = ['event_type', 'origin', 'name', 'description']
562+
563+ def __init__(self, status_worker):
564+ self.worker = status_worker
565+
566+ def render_POST(self, request):
567+ # Extract the authorization from the request. This only does a basic
568+ # check that it is provided. The status worker performs the actual
569+ # authorization; the downside is that the calling client will not know
570+ # if it fails, and will believe the message was accepted. That is
571+ # acceptable overall since these are just status messages.
572+ authorization = request.getHeader(b'authorization')
573+ if not authorization:
574+ request.setResponseCode(401)
575+ return b""
576+ authorization = extract_oauth_key_from_auth_header(
577+ authorization.decode('utf-8'))
578+ if authorization is None:
579+ request.setResponseCode(401)
580+ return b""
581+
582+ # Load the content to ensure that it is at least well-formed before
583+ # placing it into the status worker.
584+ payload = request.content.read()
585+ try:
586+ payload = payload.decode("ascii")
587+ except UnicodeDecodeError as error:
588+ request.setResponseCode(400)
589+ error_msg = "Status payload must be ASCII-only: %s" % error
590+ logger.error(error_msg)
591+ return error_msg.encode('ascii')
592+
593+ try:
594+ message = json.loads(payload)
595+ except ValueError:
596+ request.setResponseCode(400)
597+ error_msg = "Status payload is not valid JSON:\n%s\n\n" % payload
598+ logger.error(error_msg)
599+ return error_msg.encode('ascii')
600+
601+ # Filter on the level early so that fewer messages need to be processed.
602+ level = message.get('level', None)
603+ if level is not None and level == "DEBUG":
604+ # Ignore all debug messages.
605+ request.setResponseCode(204)
606+ return b""
607+
608+ # Ensure the other required keys exist.
609+ missing_keys = [
610+ key
611+ for key in self.requiredMessageKeys
612+ if key not in message
613+ ]
614+ if len(missing_keys) > 0:
615+ request.setResponseCode(400)
616+ error_msg = (
617+ 'Missing parameter(s) %s in '
618+ 'status message.' % ', '.join(missing_keys))
619+ logger.error(error_msg)
620+ return error_msg.encode('ascii')
621+
622+ # Queue the message with its authorization in the status worker.
623+ self.worker.queueMessage(authorization, message)
624+ request.setResponseCode(204)
625+ return b""
626+
627+
628+class StatusWorkerService(TimerService, object):
629+ """Service to update nodes from recieved status messages."""
630+
631+ check_interval = 60 # Every 60 seconds.
632+
633+ def __init__(self, dbtasks, clock=reactor):
634+ # Call self._tryUpdateNodes() every self.check_interval seconds.
635+ super(StatusWorkerService, self).__init__(
636+ self.check_interval, self._tryUpdateNodes)
637+ self.dbtasks = dbtasks
638+ self.clock = clock
639+ self.queue = defaultdict(list)
640+
641+ def _tryUpdateNodes(self):
642+ if len(self.queue) != 0:
643+ queue, self.queue = self.queue, defaultdict(list)
644+ d = deferToDatabase(self._preProcessQueue, queue)
645+ d.addCallback(self._processMessagesLater)
646+ d.addErrback(log.err, "Failed to process node status messages.")
647+ return d
648+
649+ @transactional
650+ def _preProcessQueue(self, queue):
651+ """Check authorizations.
652+
653+ Return a list of (node, messages) tuples, where each node is found
654+ from its authorization.
655+ """
656+ keys = NodeKey.objects.filter(
657+ key__in=list(queue.keys())).select_related('node')
658+ return [
659+ (key.node, queue[key.key])
660+ for key in keys
661+ ]
662+
663+ def _processMessagesLater(self, tasks):
664+ # Move all queued messages onto the database tasks queue.
665+ # We're not going to wait for them to be processed because we can't /
666+ # don't apply back-pressure to those systems that are producing these
667+ # messages anyway.
668+ for node, messages in tasks:
669+ self.dbtasks.addTask(self._processMessages, node, messages)
670+
671+ def _processMessages(self, node, messages):
672+ # Push the messages into the database, recording them for this node.
673+ # This should be called in a non-reactor thread with a pre-existing
674+ # connection (e.g. via deferToDatabase).
675+ if in_transaction():
676+ raise TransactionManagementError(
677+ "_processMessages must be called from "
678+ "outside of a transaction.")
679+ else:
680+ # Here we're in a database thread, with a database connection.
681+ for message in messages:
682+ try:
683+ self._processMessage(node, message)
684+ except:
685+ log.err(
686+ None,
687+ "Failed to process message "
688+ "for node: %s" % node.hostname)
689+
690+ @transactional
691+ def _processMessage(self, node, message):
692+ event_type = message['event_type']
693+ origin = message['origin']
694+ activity_name = message['name']
695+ description = message['description']
696+ result = message.get('result', None)
697+
698+ # Add this event to the node event log.
699+ add_event_to_node_event_log(
700+ node, origin, activity_name, description, result,
701+ message['timestamp'])
702+
703+ # Group files together with the ScriptResult they belong to.
704+ results = {}
705+ for sent_file in message.get('files', []):
706+ # Select the script set according to the node's status.
707+ if node.status == NODE_STATUS.TESTING:
708+ script_set = node.current_testing_script_set
709+ elif (node.status == NODE_STATUS.COMMISSIONING or
710+ node.node_type != NODE_TYPE.MACHINE):
711+ script_set = node.current_commissioning_script_set
712+ elif node.status == NODE_STATUS.DEPLOYING:
713+ script_set = node.current_installation_script_set
714+ else:
715+ raise ValueError(
716+ "Invalid status for saving files: %d" % node.status)
717+
718+ script_name = sent_file['path']
719+ content = self._retrieve_content(
720+ compression=sent_file.get('compression', None),
721+ encoding=sent_file['encoding'],
722+ content=sent_file['content'])
723+ process_file(results, script_set, script_name, content, sent_file)
724+
725+ # Commit results to the database.
726+ for script_result, args in results.items():
727+ script_result.store_result(**args)
728+
729+ # Update the last ping in any status which uses a script_set whenever a
730+ # node in that status contacts us.
731+ script_set_statuses = {
732+ NODE_STATUS.COMMISSIONING: node.current_commissioning_script_set,
733+ NODE_STATUS.TESTING: node.current_testing_script_set,
734+ NODE_STATUS.DEPLOYING: node.current_installation_script_set,
735+ }
736+ script_set = script_set_statuses.get(node.status)
737+ if script_set is not None:
738+ script_set.last_ping = message['timestamp']
739+ script_set.save()
740+
741+ # At the end of a top-level event, we change the node status.
742+ save_node = False
743+ if self._is_top_level(activity_name) and event_type == 'finish':
744+ if node.status == NODE_STATUS.COMMISSIONING:
745+ if result in ['FAIL', 'FAILURE']:
746+ node.status = NODE_STATUS.FAILED_COMMISSIONING
747+ save_node = True
748+ elif node.status == NODE_STATUS.DEPLOYING:
749+ if result in ['FAIL', 'FAILURE']:
750+ node.mark_failed(
751+ comment="Installation failed (refer to the "
752+ "installation log for more information).")
753+ save_node = True
754+ elif node.status == NODE_STATUS.DISK_ERASING:
755+ if result in ['FAIL', 'FAILURE']:
756+ node.mark_failed(comment="Failed to erase disks.")
757+ save_node = True
758+
759+ # Deallocate the node if we enter any terminal state.
760+ if node.node_type == NODE_TYPE.MACHINE and node.status in [
761+ NODE_STATUS.READY,
762+ NODE_STATUS.FAILED_COMMISSIONING]:
763+ node.status_expires = None
764+ node.owner = None
765+ node.error = 'failed: %s' % description
766+ save_node = True
767+
768+ if save_node:
769+ node.save()
770+
771+ def _retrieve_content(self, compression, encoding, content):
772+ """Extract the content of the sent file."""
773+ # Select the appropriate decompressor.
774+ if compression is None:
775+ decompress = lambda s: s
776+ elif compression == 'bzip2':
777+ decompress = bz2.decompress
778+ else:
779+ raise ValueError('Invalid compression: %s' % compression)
780+
781+ # Select the appropriate decoder.
782+ if encoding == 'base64':
783+ decode = base64.decodebytes
784+ else:
785+ raise ValueError('Invalid encoding: %s' % encoding)
786+
787+ return decompress(decode(content.encode("ascii")))
788+
789+ def _is_top_level(self, activity_name):
790+ """Top-level events do not have slashes in their names."""
791+ return '/' not in activity_name
792+
793+ def queueMessage(self, authorization, message):
794+ """Queue message for processing."""
795+ # Ensure a timestamp exists in the message and convert it to a
796+ # datetime object. This is used to update the `last_ping` and the
797+ # time for the event message.
798+ timestamp = message.get('timestamp', None)
799+ if timestamp is not None:
800+ message['timestamp'] = datetime.utcfromtimestamp(
801+ message['timestamp'])
802+ else:
803+ message['timestamp'] = datetime.utcnow()
804+ self.queue[authorization].append(message)
805
806=== added file 'src/metadataserver/tests/test_api_twisted.py'
807--- src/metadataserver/tests/test_api_twisted.py 1970-01-01 00:00:00 +0000
808+++ src/metadataserver/tests/test_api_twisted.py 2017-02-28 20:14:47 +0000
809@@ -0,0 +1,766 @@
810+# Copyright 2017 Canonical Ltd. This software is licensed under the
811+# GNU Affero General Public License version 3 (see the file LICENSE).
812+
813+"""Tests for the twisted metadata API."""
814+
815+__all__ = []
816+
817+import base64
818+import bz2
819+from datetime import datetime
820+from io import BytesIO
821+import json
822+from unittest.mock import (
823+ Mock,
824+ sentinel,
825+)
826+
827+from crochet import wait_for
828+from maasserver.enum import NODE_STATUS
829+from maasserver.models import (
830+ Event,
831+ Tag,
832+)
833+from maasserver.models.signals.testing import SignalsDisabled
834+from maasserver.testing.factory import factory
835+from maasserver.testing.testcase import (
836+ MAASServerTestCase,
837+ MAASTransactionServerTestCase,
838+)
839+from maasserver.utils.orm import (
840+ reload_object,
841+ transactional,
842+ TransactionManagementError,
843+)
844+from maasserver.utils.threads import deferToDatabase
845+from maastesting.matchers import (
846+ MockCalledOnceWith,
847+ MockNotCalled,
848+)
849+from maastesting.testcase import MAASTestCase
850+from metadataserver import api
851+from metadataserver.api_twisted import (
852+ StatusHandlerResource,
853+ StatusWorkerService,
854+)
855+from metadataserver.enum import SCRIPT_STATUS
856+from metadataserver.models import NodeKey
857+from testtools import ExpectedException
858+from testtools.matchers import (
859+ Equals,
860+ MatchesListwise,
861+ MatchesSetwise,
862+)
863+from twisted.internet.defer import inlineCallbacks
864+from twisted.web.test.requesthelper import DummyRequest
865+
866+
867+wait_for_reactor = wait_for(30)
868+
869+
870+class TestStatusHandlerResource(MAASTestCase):
871+
872+ def make_request(self, content=None, token=None):
873+ request = DummyRequest([])
874+ if token is None:
875+ token = factory.make_name('token')
876+ request.requestHeaders.addRawHeader(
877+ b'authorization', 'oauth_token=%s' % token)
878+ if content is not None:
879+ request.content = BytesIO(content)
880+ return request
881+
882+ def test__init__(self):
883+ resource = StatusHandlerResource(sentinel.status_worker)
884+ self.assertIs(sentinel.status_worker, resource.worker)
885+ self.assertTrue(resource.isLeaf)
886+ self.assertEquals([b'POST'], resource.allowedMethods)
887+ self.assertEquals(
888+ ['event_type', 'origin', 'name', 'description'],
889+ resource.requiredMessageKeys)
890+
891+ def test__render_POST_missing_authorization(self):
892+ resource = StatusHandlerResource(sentinel.status_worker)
893+ request = DummyRequest([])
894+ output = resource.render_POST(request)
895+ self.assertEquals(b'', output)
896+ self.assertEquals(401, request.responseCode)
897+
898+ def test__render_POST_empty_authorization(self):
899+ resource = StatusHandlerResource(sentinel.status_worker)
900+ request = DummyRequest([])
901+ request.requestHeaders.addRawHeader(b'authorization', '')
902+ output = resource.render_POST(request)
903+ self.assertEquals(b'', output)
904+ self.assertEquals(401, request.responseCode)
905+
906+ def test__render_POST_bad_authorization(self):
907+ resource = StatusHandlerResource(sentinel.status_worker)
908+ request = DummyRequest([])
909+ request.requestHeaders.addRawHeader(
910+ b'authorization', factory.make_name('auth'))
911+ output = resource.render_POST(request)
912+ self.assertEquals(b'', output)
913+ self.assertEquals(401, request.responseCode)
914+
915+ def test__render_POST_body_must_be_ascii(self):
916+ resource = StatusHandlerResource(sentinel.status_worker)
917+ request = self.make_request(content=b'\xe9')
918+ output = resource.render_POST(request)
919+ self.assertEquals(
920+ b"Status payload must be ASCII-only: 'ascii' codec can't "
921+ b"decode byte 0xe9 in position 0: ordinal not in range(128)",
922+ output)
923+ self.assertEquals(400, request.responseCode)
924+
925+ def test__render_POST_body_must_be_valid_json(self):
926+ resource = StatusHandlerResource(sentinel.status_worker)
927+ request = self.make_request(content=b'testing not json')
928+ output = resource.render_POST(request)
929+ self.assertEquals(
930+ b"Status payload is not valid JSON:\ntesting not json\n\n",
931+ output)
932+ self.assertEquals(400, request.responseCode)
933+
934+ def test__render_POST_ignores_debug_messages(self):
935+ resource = StatusHandlerResource(sentinel.status_worker)
936+ request = self.make_request(content=json.dumps({
937+ 'level': 'DEBUG',
938+ }).encode('ascii'))
939+ output = resource.render_POST(request)
940+ self.assertEquals(b'', output)
941+ self.assertEquals(204, request.responseCode)
942+
943+ def test__render_POST_validates_required_keys(self):
944+ resource = StatusHandlerResource(sentinel.status_worker)
945+ request = self.make_request(content=json.dumps({}).encode('ascii'))
946+ output = resource.render_POST(request)
947+ self.assertEquals(
948+ b'Missing parameter(s) event_type, origin, name, description '
949+ b'in status message.', output)
950+ self.assertEquals(400, request.responseCode)
951+
952+ def test__render_POST_queue_messages(self):
953+ status_worker = Mock()
954+ status_worker.queueMessage = Mock()
955+ resource = StatusHandlerResource(status_worker)
956+ message = {
957+ 'event_type': factory.make_name('type'),
958+ 'origin': factory.make_name('origin'),
959+ 'name': factory.make_name('name'),
960+ 'description': factory.make_name('description'),
961+ }
962+ token = factory.make_name('token')
963+ request = self.make_request(
964+ content=json.dumps(message).encode('ascii'), token=token)
965+ output = resource.render_POST(request)
966+ self.assertEquals(b'', output)
967+ self.assertEquals(204, request.responseCode)
968+ self.assertThat(
969+ status_worker.queueMessage, MockCalledOnceWith(token, message))
970+
971+
972+class TestStatusWorkerServiceTransactional(MAASTransactionServerTestCase):
973+
974+ @transactional
975+ def make_nodes_with_tokens(self):
976+ nodes = [
977+ factory.make_Node()
978+ for _ in range(3)
979+ ]
980+ return [
981+ (node, NodeKey.objects.get_token_for_node(node))
982+ for node in nodes
983+ ]
984+
985+ def make_message(self):
986+ return {
987+ 'event_type': factory.make_name('type'),
988+ 'origin': factory.make_name('origin'),
989+ 'name': factory.make_name('name'),
990+ 'description': factory.make_name('description'),
991+ 'timestamp': datetime.utcnow().timestamp(),
992+ }
993+
994+ def test__init__(self):
995+ worker = StatusWorkerService(sentinel.dbtasks, clock=sentinel.reactor)
996+ self.assertEqual(sentinel.dbtasks, worker.dbtasks)
997+ self.assertEqual(sentinel.reactor, worker.clock)
998+ self.assertEqual(60, worker.step)
999+ self.assertEqual((worker._tryUpdateNodes, tuple(), {}), worker.call)
1000+
1001+ def test__tryUpdateNodes_returns_None_when_empty_queue(self):
1002+ worker = StatusWorkerService(sentinel.dbtasks)
1003+ self.assertIsNone(worker._tryUpdateNodes())
1004+
1005+ @wait_for_reactor
1006+ @inlineCallbacks
1007+ def test__tryUpdateNodes_sends_work_to_dbtasks(self):
1008+ nodes_with_tokens = yield deferToDatabase(self.make_nodes_with_tokens)
1009+ node_messages = {
1010+ node: [
1011+ self.make_message()
1012+ for _ in range(3)
1013+ ]
1014+ for node, _ in nodes_with_tokens
1015+ }
1016+ dbtasks = Mock()
1017+ dbtasks.addTask = Mock()
1018+ worker = StatusWorkerService(dbtasks)
1019+ for node, token in nodes_with_tokens:
1020+ for message in node_messages[node]:
1021+ worker.queueMessage(token.key, message)
1022+ yield worker._tryUpdateNodes()
1023+ call_args = [
1024+ (call_arg[0][1], call_arg[0][2])
1025+ for call_arg in dbtasks.addTask.call_args_list
1026+ ]
1027+ self.assertThat(call_args, MatchesSetwise(*[
1028+ MatchesListwise([Equals(node), Equals(messages)])
1029+ for node, messages in node_messages.items()
1030+ ]))
1031+
1032+ @wait_for_reactor
1033+ @inlineCallbacks
1034+ def test__processMessages_fails_when_in_transaction(self):
1035+ worker = StatusWorkerService(sentinel.dbtasks)
1036+ with ExpectedException(TransactionManagementError):
1037+ yield deferToDatabase(
1038+ transactional(worker._processMessages),
1039+ sentinel.node, [sentinel.message])
1040+
1041+ @wait_for_reactor
1042+ @inlineCallbacks
1043+ def test__processMessages_calls_processMessage(self):
1044+ worker = StatusWorkerService(sentinel.dbtasks)
1045+ mock_processMessage = self.patch(worker, "_processMessage")
1046+ yield deferToDatabase(
1047+ worker._processMessages, sentinel.node, [sentinel.message])
1048+ self.assertThat(
1049+ mock_processMessage,
1050+ MockCalledOnceWith(sentinel.node, sentinel.message))
1051+
1052+
1053+def encode_as_base64(content):
1054+ return base64.encodebytes(content).decode("ascii")
1055+
1056+
1057+class TestStatusWorkerService(MAASServerTestCase):
1058+
1059+ def setUp(self):
1060+ super(TestStatusWorkerService, self).setUp()
1061+ self.useFixture(SignalsDisabled("power"))
1062+
1063+ def processMessage(self, node, payload):
1064+ worker = StatusWorkerService(sentinel.dbtasks)
1065+ worker._processMessage(node, payload)
1066+
1067+ def test_status_installation_result_does_not_affect_other_node(self):
1068+ node1 = factory.make_Node(status=NODE_STATUS.DEPLOYING)
1069+ node2 = factory.make_Node(status=NODE_STATUS.DEPLOYING)
1070+ payload = {
1071+ 'event_type': 'finish',
1072+ 'result': 'SUCCESS',
1073+ 'origin': 'curtin',
1074+ 'name': 'cmd-install',
1075+ 'description': 'Command Install',
1076+ 'timestamp': datetime.utcnow(),
1077+ }
1078+ self.processMessage(node1, payload)
1079+ self.assertEqual(
1080+ NODE_STATUS.DEPLOYING, reload_object(node2).status)
1081+ # Check last node1 event.
1082+ self.assertEqual(
1083+ "'curtin' Command Install",
1084+ Event.objects.filter(node=node1).last().description)
1085+ # There must be no events for node2.
1086+ self.assertFalse(Event.objects.filter(node=node2).exists())
1087+
1088+ def test_status_installation_success_leaves_node_deploying(self):
1089+ node = factory.make_Node(interface=True, status=NODE_STATUS.DEPLOYING)
1090+ payload = {
1091+ 'event_type': 'finish',
1092+ 'result': 'SUCCESS',
1093+ 'origin': 'curtin',
1094+ 'name': 'cmd-install',
1095+ 'description': 'Command Install',
1096+ 'timestamp': datetime.utcnow(),
1097+ }
1098+ self.processMessage(node, payload)
1099+ self.assertEqual(NODE_STATUS.DEPLOYING, reload_object(node).status)
1100+ # Check last node event.
1101+ self.assertEqual(
1102+ "'curtin' Command Install",
1103+ Event.objects.filter(node=node).last().description)
1104+
1105+ def test_status_commissioning_failure_leaves_node_failed(self):
1106+ node = factory.make_Node(
1107+ interface=True, status=NODE_STATUS.COMMISSIONING)
1108+ payload = {
1109+ 'event_type': 'finish',
1110+ 'result': 'FAILURE',
1111+ 'origin': 'curtin',
1112+ 'name': 'commissioning',
1113+ 'description': 'Commissioning',
1114+ 'timestamp': datetime.utcnow(),
1115+ }
1116+ self.processMessage(node, payload)
1117+ self.assertEqual(
1118+ NODE_STATUS.FAILED_COMMISSIONING, reload_object(node).status)
1119+ # Check last node event.
1120+ self.assertEqual(
1121+ "'curtin' Commissioning",
1122+ Event.objects.filter(node=node).last().description)
1123+
1124+ def test_status_commissioning_failure_clears_owner(self):
1125+ user = factory.make_User()
1126+ node = factory.make_Node(
1127+ interface=True, status=NODE_STATUS.COMMISSIONING, owner=user)
1128+ payload = {
1129+ 'event_type': 'finish',
1130+ 'result': 'FAILURE',
1131+ 'origin': 'curtin',
1132+ 'name': 'commissioning',
1133+ 'description': 'Commissioning',
1134+ 'timestamp': datetime.utcnow(),
1135+ }
1136+ self.assertEqual(user, node.owner) # Node has an owner
1137+ self.processMessage(node, payload)
1138+ self.assertEqual(
1139+ NODE_STATUS.FAILED_COMMISSIONING, reload_object(node).status)
1140+ self.assertIsNone(reload_object(node).owner)
1141+
1142+ def test_status_installation_failure_leaves_node_failed(self):
1143+ node = factory.make_Node(interface=True, status=NODE_STATUS.DEPLOYING)
1144+ payload = {
1145+ 'event_type': 'finish',
1146+ 'result': 'FAILURE',
1147+ 'origin': 'curtin',
1148+ 'name': 'cmd-install',
1149+ 'description': 'Command Install',
1150+ 'timestamp': datetime.utcnow(),
1151+ }
1152+ self.processMessage(node, payload)
1153+ self.assertEqual(
1154+ NODE_STATUS.FAILED_DEPLOYMENT, reload_object(node).status)
1155+ # Check last node event.
1156+ self.assertEqual(
1157+ "Installation failed (refer to the installation"
1158+ " log for more information).",
1159+ Event.objects.filter(node=node).last().description)
1160+
1161+ def test_status_installation_fail_leaves_node_failed(self):
1162+ node = factory.make_Node(interface=True, status=NODE_STATUS.DEPLOYING)
1163+ payload = {
1164+ 'event_type': 'finish',
1165+ 'result': 'FAIL',
1166+ 'origin': 'curtin',
1167+ 'name': 'cmd-install',
1168+ 'description': 'Command Install',
1169+ 'timestamp': datetime.utcnow(),
1170+ }
1171+ self.processMessage(node, payload)
1172+ self.assertEqual(
1173+ NODE_STATUS.FAILED_DEPLOYMENT, reload_object(node).status)
1174+ # Check last node event.
1175+ self.assertEqual(
1176+ "Installation failed (refer to the installation"
1177+ " log for more information).",
1178+ Event.objects.filter(node=node).last().description)
1179+
1180+ def test_status_installation_failure_doesnt_clear_owner(self):
1181+ user = factory.make_User()
1182+ node = factory.make_Node(
1183+ interface=True, status=NODE_STATUS.DEPLOYING, owner=user)
1184+ payload = {
1185+ 'event_type': 'finish',
1186+ 'result': 'FAILURE',
1187+ 'origin': 'curtin',
1188+ 'name': 'cmd-install',
1189+ 'description': 'Command Install',
1190+ 'timestamp': datetime.utcnow(),
1191+ }
1192+ self.assertEqual(user, node.owner) # Node has an owner
1193+ self.processMessage(node, payload)
1194+ self.assertEqual(
1195+ NODE_STATUS.FAILED_DEPLOYMENT, reload_object(node).status)
1196+ self.assertIsNotNone(reload_object(node).owner)
1197+
1198+ def test_status_commissioning_failure_does_not_populate_tags(self):
1199+ populate_tags_for_single_node = self.patch(
1200+ api, "populate_tags_for_single_node")
1201+ node = factory.make_Node(
1202+ interface=True, status=NODE_STATUS.COMMISSIONING)
1203+ payload = {
1204+ 'event_type': 'finish',
1205+ 'result': 'FAILURE',
1206+ 'origin': 'curtin',
1207+ 'name': 'commissioning',
1208+ 'description': 'Commissioning',
1209+ 'timestamp': datetime.utcnow(),
1210+ }
1211+ self.processMessage(node, payload)
1212+ self.assertEqual(
1213+ NODE_STATUS.FAILED_COMMISSIONING, reload_object(node).status)
1214+ self.assertThat(populate_tags_for_single_node, MockNotCalled())
1215+
1216+ def test_status_erasure_failure_leaves_node_failed(self):
1217+ node = factory.make_Node(
1218+ interface=True, status=NODE_STATUS.DISK_ERASING)
1219+ payload = {
1220+ 'event_type': 'finish',
1221+ 'result': 'FAILURE',
1222+ 'origin': 'curtin',
1223+ 'name': 'cmd-erase',
1224+ 'description': 'Erasing disk',
1225+ 'timestamp': datetime.utcnow(),
1226+ }
1227+ self.processMessage(node, payload)
1228+ self.assertEqual(
1229+ NODE_STATUS.FAILED_DISK_ERASING, reload_object(node).status)
1230+ # Check last node event.
1231+ self.assertEqual(
1232+ "Failed to erase disks.",
1233+ Event.objects.filter(node=node).last().description)
1234+
1235+ def test_status_erasure_failure_does_not_populate_tags(self):
1236+ populate_tags_for_single_node = self.patch(
1237+ api, "populate_tags_for_single_node")
1238+ node = factory.make_Node(
1239+ interface=True, status=NODE_STATUS.DISK_ERASING)
1240+ payload = {
1241+ 'event_type': 'finish',
1242+ 'result': 'FAILURE',
1243+ 'origin': 'curtin',
1244+ 'name': 'cmd-erase',
1245+ 'description': 'Erasing disk',
1246+ 'timestamp': datetime.utcnow(),
1247+ }
1248+ self.processMessage(node, payload)
1249+ self.assertEqual(
1250+ NODE_STATUS.FAILED_DISK_ERASING, reload_object(node).status)
1251+ self.assertThat(populate_tags_for_single_node, MockNotCalled())
1252+
1253+ def test_status_erasure_failure_doesnt_clear_owner(self):
1254+ user = factory.make_User()
1255+ node = factory.make_Node(
1256+ interface=True, status=NODE_STATUS.DISK_ERASING, owner=user)
1257+ payload = {
1258+ 'event_type': 'finish',
1259+ 'result': 'FAILURE',
1260+ 'origin': 'curtin',
1261+ 'name': 'cmd-erase',
1262+ 'description': 'Erasing disk',
1263+ 'timestamp': datetime.utcnow(),
1264+ }
1265+ self.processMessage(node, payload)
1266+ self.assertEqual(
1267+ NODE_STATUS.FAILED_DISK_ERASING, reload_object(node).status)
1268+ self.assertEqual(user, node.owner)
1269+
1270+ def test_status_with_file_bad_encoder_fails(self):
1271+ node = factory.make_Node(
1272+ interface=True, status=NODE_STATUS.COMMISSIONING)
1273+ contents = b'These are the contents of the file.'
1274+ encoded_content = encode_as_base64(bz2.compress(contents))
1275+ payload = {
1276+ 'event_type': 'finish',
1277+ 'result': 'FAILURE',
1278+ 'origin': 'curtin',
1279+ 'name': 'commissioning',
1280+ 'description': 'Commissioning',
1281+ 'timestamp': datetime.utcnow(),
1282+ 'files': [
1283+ {
1284+ "path": "sample.txt",
1285+ "encoding": "uuencode",
1286+ "compression": "bzip2",
1287+ "content": encoded_content
1288+ }
1289+ ]
1290+ }
1291+ with ExpectedException(ValueError):
1292+ self.processMessage(node, payload)
1293+
1294+ def test_status_with_file_bad_compression_fails(self):
1295+ node = factory.make_Node(
1296+ interface=True, status=NODE_STATUS.COMMISSIONING)
1297+ contents = b'These are the contents of the file.'
1298+ encoded_content = encode_as_base64(bz2.compress(contents))
1299+ payload = {
1300+ 'event_type': 'finish',
1301+ 'result': 'FAILURE',
1302+ 'origin': 'curtin',
1303+ 'name': 'commissioning',
1304+ 'description': 'Commissioning',
1305+ 'timestamp': datetime.utcnow(),
1306+ 'files': [
1307+ {
1308+ "path": "sample.txt",
1309+ "encoding": "base64",
1310+ "compression": "jpeg",
1311+ "content": encoded_content
1312+ }
1313+ ]
1314+ }
1315+ with ExpectedException(ValueError):
1316+ self.processMessage(node, payload)
1317+
1318+ def test_status_with_file_no_compression_succeeds(self):
1319+ node = factory.make_Node(
1320+ interface=True, status=NODE_STATUS.COMMISSIONING,
1321+ with_empty_script_sets=True)
1322+ script_result = (
1323+ node.current_commissioning_script_set.scriptresult_set.first())
1324+ script_result.status = SCRIPT_STATUS.RUNNING
1325+ script_result.save()
1326+ contents = b'These are the contents of the file.'
1327+ encoded_content = encode_as_base64(contents)
1328+ payload = {
1329+ 'event_type': 'finish',
1330+ 'result': 'FAILURE',
1331+ 'origin': 'curtin',
1332+ 'name': 'commissioning',
1333+ 'description': 'Commissioning',
1334+ 'timestamp': datetime.utcnow(),
1335+ 'files': [
1336+ {
1337+ "path": script_result.name,
1338+ "encoding": "base64",
1339+ "content": encoded_content
1340+ }
1341+ ]
1342+ }
1343+ self.processMessage(node, payload)
1344+ self.assertEqual(contents, reload_object(script_result).stdout)
1345+
1346+ def test_status_with_file_invalid_statuses_fails(self):
1347+ """Adding files should fail for every status that's neither
1348+ COMMISSIONING nor DEPLOYING"""
1349+ for node_status in [
1350+ NODE_STATUS.DEFAULT,
1351+ NODE_STATUS.NEW,
1352+ NODE_STATUS.FAILED_COMMISSIONING,
1353+ NODE_STATUS.MISSING,
1354+ NODE_STATUS.READY,
1355+ NODE_STATUS.RESERVED,
1356+ NODE_STATUS.DEPLOYED,
1357+ NODE_STATUS.RETIRED,
1358+ NODE_STATUS.BROKEN,
1359+ NODE_STATUS.ALLOCATED,
1360+ NODE_STATUS.FAILED_DEPLOYMENT,
1361+ NODE_STATUS.RELEASING,
1362+ NODE_STATUS.FAILED_RELEASING,
1363+ NODE_STATUS.DISK_ERASING,
1364+ NODE_STATUS.FAILED_DISK_ERASING]:
1365+ node = factory.make_Node(interface=True, status=node_status)
1366+ contents = b'These are the contents of the file.'
1367+ encoded_content = encode_as_base64(bz2.compress(contents))
1368+ payload = {
1369+ 'event_type': 'finish',
1370+ 'result': 'FAILURE',
1371+ 'origin': 'curtin',
1372+ 'name': 'commissioning',
1373+ 'description': 'Commissioning',
1374+ 'timestamp': datetime.utcnow(),
1375+ 'files': [
1376+ {
1377+ "path": "sample.txt",
1378+ "encoding": "base64",
1379+ "compression": "bzip2",
1380+ "content": encoded_content
1381+ }
1382+ ]
1383+ }
1384+ with ExpectedException(ValueError):
1385+ self.processMessage(node, payload)
1386+
1387+ def test_status_with_file_succeeds(self):
1388+ """Adding files should succeed for every status that's either
1389+ COMMISSIONING or DEPLOYING"""
1390+ for node_status, target_status in [
1391+ (NODE_STATUS.COMMISSIONING, NODE_STATUS.FAILED_COMMISSIONING),
1392+ (NODE_STATUS.DEPLOYING, NODE_STATUS.FAILED_DEPLOYMENT)]:
1393+ node = factory.make_Node(
1394+ interface=True, status=node_status,
1395+ with_empty_script_sets=True)
1396+ if node_status == NODE_STATUS.COMMISSIONING:
1397+ script_set = node.current_commissioning_script_set
1398+ elif node_status == NODE_STATUS.DEPLOYING:
1399+ script_set = node.current_installation_script_set
1400+ script_result = script_set.scriptresult_set.first()
1401+ script_result.status = SCRIPT_STATUS.RUNNING
1402+ script_result.save()
1403+ contents = b'These are the contents of the file.'
1404+ encoded_content = encode_as_base64(bz2.compress(contents))
1405+ payload = {
1406+ 'event_type': 'finish',
1407+ 'result': 'FAILURE',
1408+ 'origin': 'curtin',
1409+ 'name': 'commissioning',
1410+ 'description': 'Commissioning',
1411+ 'timestamp': datetime.utcnow(),
1412+ 'files': [
1413+ {
1414+ "path": script_result.name,
1415+ "encoding": "base64",
1416+ "compression": "bzip2",
1417+ "content": encoded_content
1418+ }
1419+ ]
1420+ }
1421+ self.processMessage(node, payload)
1422+ self.assertEqual(
1423+ target_status, reload_object(node).status)
1424+ # Check the node result.
1425+ self.assertEqual(contents, reload_object(script_result).stdout)
1426+
1427+ def test_status_with_results_succeeds(self):
1428+ """Adding a script result should succeed"""
1429+ node = factory.make_Node(
1430+ interface=True, status=NODE_STATUS.COMMISSIONING,
1431+ with_empty_script_sets=True)
1432+ script_result = (
1433+ node.current_commissioning_script_set.scriptresult_set.first())
1434+ script_result.status = SCRIPT_STATUS.RUNNING
1435+ script_result.save()
1436+ contents = b'These are the contents of the file.'
1437+ encoded_content = encode_as_base64(bz2.compress(contents))
1438+ payload = {
1439+ 'event_type': 'finish',
1440+ 'result': 'FAILURE',
1441+ 'origin': 'curtin',
1442+ 'name': 'commissioning',
1443+ 'description': 'Commissioning',
1444+ 'timestamp': datetime.utcnow(),
1445+ 'files': [
1446+ {
1447+ "path": script_result.name,
1448+ "encoding": "base64",
1449+ "compression": "bzip2",
1450+ "content": encoded_content,
1451+ "result": -42
1452+ }
1453+ ]
1454+ }
1455+ self.processMessage(node, payload)
1456+ script_result = reload_object(script_result)
1457+ self.assertEqual(contents, script_result.stdout)
1458+ self.assertEqual(-42, script_result.exit_status)
1459+
1460+ def test_status_with_results_no_exit_status_defaults_to_zero(self):
1461+ """Adding a script result should succeed without a return code defaults
1462+ it to zero."""
1463+ node = factory.make_Node(
1464+ interface=True, status=NODE_STATUS.COMMISSIONING,
1465+ with_empty_script_sets=True)
1466+ script_result = (
1467+ node.current_commissioning_script_set.scriptresult_set.first())
1468+ script_result.status = SCRIPT_STATUS.RUNNING
1469+ script_result.save()
1470+ contents = b'These are the contents of the file.'
1471+ encoded_content = encode_as_base64(bz2.compress(contents))
1472+ payload = {
1473+ 'event_type': 'finish',
1474+ 'result': 'FAILURE',
1475+ 'origin': 'curtin',
1476+ 'name': 'commissioning',
1477+ 'description': 'Commissioning',
1478+ 'timestamp': datetime.utcnow(),
1479+ 'files': [
1480+ {
1481+ "path": script_result.name,
1482+ "encoding": "base64",
1483+ "compression": "bzip2",
1484+ "content": encoded_content,
1485+ }
1486+ ]
1487+ }
1488+ self.processMessage(node, payload)
1489+ self.assertEqual(0, reload_object(script_result).exit_status)
1490+
1491+ def test_status_stores_virtual_tag_on_node_if_virtual(self):
1492+ node = factory.make_Node(
1493+ status=NODE_STATUS.COMMISSIONING, with_empty_script_sets=True)
1494+ content = 'virtual'.encode('utf-8')
1495+ payload = {
1496+ 'event_type': 'finish',
1497+ 'result': 'SUCCESS',
1498+ 'origin': 'curtin',
1499+ 'name': 'commissioning',
1500+ 'description': 'Commissioning',
1501+ 'timestamp': datetime.utcnow(),
1502+ 'files': [
1503+ {
1504+ "path": "00-maas-02-virtuality",
1505+ "encoding": "base64",
1506+ "content": encode_as_base64(content),
1507+ }
1508+ ]
1509+ }
1510+ self.processMessage(node, payload)
1511+ node = reload_object(node)
1512+ self.assertEqual(
1513+ ["virtual"], [each_tag.name for each_tag in node.tags.all()])
1514+ for script_result in node.current_commissioning_script_set:
1515+ if script_result.name == "00-maas-02-virtuality":
1516+ break
1517+ self.assertEqual(content, script_result.stdout)
1518+
1519+ def test_status_removes_virtual_tag_on_node_if_not_virtual(self):
1520+ node = factory.make_Node(
1521+ status=NODE_STATUS.COMMISSIONING, with_empty_script_sets=True)
1522+ tag, _ = Tag.objects.get_or_create(name='virtual')
1523+ node.tags.add(tag)
1524+ content = 'none'.encode('utf-8')
1525+ payload = {
1526+ 'event_type': 'finish',
1527+ 'result': 'SUCCESS',
1528+ 'origin': 'curtin',
1529+ 'name': 'commissioning',
1530+ 'description': 'Commissioning',
1531+ 'timestamp': datetime.utcnow(),
1532+ 'files': [
1533+ {
1534+ "path": "00-maas-02-virtuality",
1535+ "encoding": "base64",
1536+ "content": encode_as_base64(content),
1537+ }
1538+ ]
1539+ }
1540+ self.processMessage(node, payload)
1541+ node = reload_object(node)
1542+ self.assertEqual(
1543+ [], [each_tag.name for each_tag in node.tags.all()])
1544+ for script_result in node.current_commissioning_script_set:
1545+ if script_result.name == "00-maas-02-virtuality":
1546+ break
1547+ self.assertEqual(content, script_result.stdout)
1548+
1549+ def test_status_updates_script_status_last_ping(self):
1550+ nodes = {
1551+ status: factory.make_Node(
1552+ status=status, with_empty_script_sets=True)
1553+ for status in (
1554+ NODE_STATUS.COMMISSIONING,
1555+ NODE_STATUS.TESTING,
1556+ NODE_STATUS.DEPLOYING)
1557+ }
1558+
1559+ for status, node in nodes.items():
1560+ payload = {
1561+ 'event_type': 'progress',
1562+ 'origin': 'curtin',
1563+ 'name': 'test',
1564+ 'description': 'testing',
1565+ 'timestamp': datetime.utcnow(),
1566+ }
1567+ self.processMessage(node, payload)
1568+ script_set_statuses = {
1569+ NODE_STATUS.COMMISSIONING: (
1570+ node.current_commissioning_script_set),
1571+ NODE_STATUS.TESTING: node.current_testing_script_set,
1572+ NODE_STATUS.DEPLOYING: node.current_installation_script_set,
1573+ }
1574+ script_set = script_set_statuses.get(node.status)
1575+ self.assertIsNotNone(script_set.last_ping)