Merge lp:~jameinel/maas/tag-updating into lp:maas/trunk

Proposed by John A Meinel on 2012-10-04
Status: Merged
Approved by: John A Meinel on 2012-10-04
Approved revision: 1143
Merged at revision: 1162
Proposed branch: lp:~jameinel/maas/tag-updating
Merge into: lp:maas/trunk
Diff against target: 445 lines (+371/-14)
5 files modified
src/provisioningserver/tags.py (+150/-0)
src/provisioningserver/tasks.py (+14/-1)
src/provisioningserver/testing/testcase.py (+26/-0)
src/provisioningserver/tests/test_boot_images.py (+0/-13)
src/provisioningserver/tests/test_tags.py (+181/-0)
To merge this branch: bzr merge lp:~jameinel/maas/tag-updating
Reviewer                    Review Type  Date Requested  Status
Martin Packman (community)               2012-10-04      Approve on 2012-10-04
Review via email: mp+127962@code.launchpad.net

Commit Message

Add a task for the provisioning_server that can update tags using the APIs we just added.

This allows us to farm out all the work for processing 100,000 tags into the provisioning_servers, which should allow us to consistently evaluate a Tag definition in under 10s, even as we scale.
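
The per-nodegroup work described above can be sketched roughly as follows. This is a minimal illustration, not the code from the branch: it uses the standard library's xml.etree.ElementTree instead of lxml so that it has no dependencies, and `has_element`, `partition_in_batches`, `fake_fetch` and `FAKE_HARDWARE` are hypothetical names standing in for the XPath evaluation and the MAAS API calls.

```python
import xml.etree.ElementTree as ET


def has_element(xml_text, tag):
    """Return True if any element named `tag` occurs in the document.

    Hypothetical stand-in for evaluating the XPath `//tag` with lxml.
    """
    root = ET.fromstring(xml_text)
    # Element.iter() walks the root element and all of its descendants.
    return any(True for _ in root.iter(tag))


def partition_in_batches(system_ids, fetch_details, tag, batch_size=100):
    """Split system_ids into (matched, unmatched), fetching in batches."""
    matched, unmatched = [], []
    for start in range(0, len(system_ids), batch_size):
        batch = system_ids[start:start + batch_size]
        # fetch_details is assumed to return (system_id, xml) pairs.
        for system_id, xml_text in fetch_details(batch):
            if has_element(xml_text, tag):
                matched.append(system_id)
            else:
                unmatched.append(system_id)
    return matched, unmatched


# Hypothetical in-memory data standing in for the MAAS API.
FAKE_HARDWARE = {
    'a': '<node />',
    'b': '<not-node />',
    'c': '<parent><node /></parent>',
}


def fake_fetch(batch):
    return [(system_id, FAKE_HARDWARE[system_id]) for system_id in batch]


result = partition_in_batches(['a', 'b', 'c'], fake_fetch, 'node', batch_size=2)
```

Only the final matched/unmatched lists need to go back to the server, so the hardware XML is fetched in small batches while the update itself can be a single request.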

Description of the Change

Add a task for the provisioning_server that can update tags using the APIs we just added.

This uses a lot of mocking to make sure the calls are correct, but I'll still need to add tests on the other side to confirm that it actually works end-to-end.

I can wait to land this until the next patch is complete if so desired, but I would like to get feedback on the mock work that was done here.
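
For readers unfamiliar with the mocking approach, here is a minimal sketch of the pattern: replace the client's `get` with a mock, call the function under test, then assert on both the parsed result and the exact call made. It assumes Python 3's `unittest.mock` (the branch itself uses the separate `mock` package and `self.patch`), and `list_nodes` is a hypothetical simplified stand-in for `get_nodes_for_node_group`.

```python
import json
from unittest.mock import MagicMock


class FakeResponse:
    """Minimal response object with the two attributes the code reads."""

    def __init__(self, status_code, content):
        self.status_code = status_code
        self.content = content


def list_nodes(client, nodegroup_uuid):
    # Hypothetical simplified version of get_nodes_for_node_group.
    path = 'api/1.0/nodegroup/%s/' % (nodegroup_uuid,)
    response = client.get(path, op='list_nodes')
    return json.loads(response.content)


# The mock records how it was called and returns a canned response.
client = MagicMock()
client.get.return_value = FakeResponse(200, '["system-id1", "system-id2"]')

result = list_nodes(client, 'uuid-1')

# Raises AssertionError if the call did not happen exactly once, or if the
# arguments differ.
client.get.assert_called_once_with(
    'api/1.0/nodegroup/uuid-1/', op='list_nodes')
```

This checks that the right URL and operation were requested, but not that the server accepts them, which is why end-to-end testing is still needed.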

lp:~jameinel/maas/tag-updating updated on 2012-10-04
1141. By John A Meinel on 2012-10-04

Gavin confirmed that you just pass it as a python list.

1142. By John A Meinel on 2012-10-04

Revert the changes to the acceptance Makefile, as it was probably specific for Jelmer's machine.

Martin Packman (gz) wrote :

-include /etc/lsb-release
+DISTRIB_CODENAME=unstable

This seems unrelated, probably wants reverting.

+ # XXX: Check the response code before we parse the content

A helper in client that raised an exception on a non-2XX response would be good, can be added later.
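
A rough sketch of what such a helper might look like is below. The names `UnexpectedStatus` and `parse_json_response` are hypothetical and are not part of apiclient; the only assumption about the response object is that it has `status_code` and `content` attributes, as the `FakeResponse` class in the tests does.

```python
import json


class UnexpectedStatus(Exception):
    """Hypothetical error raised for a non-2XX response."""

    def __init__(self, status_code, content):
        super().__init__('Unexpected HTTP status %d' % status_code)
        self.status_code = status_code
        self.content = content


def parse_json_response(response):
    """Return the decoded JSON body, or raise if the status is not 2XX."""
    if not 200 <= response.status_code < 300:
        raise UnexpectedStatus(response.status_code, response.content)
    return json.loads(response.content)


class FakeResponse:
    """Stand-in response object for demonstration."""

    def __init__(self, status_code, content):
        self.status_code = status_code
        self.content = content


ok = parse_json_response(FakeResponse(200, '["system-id1"]'))

try:
    parse_json_response(FakeResponse(500, 'Internal error'))
except UnexpectedStatus as error:
    failed_status = error.status_code
else:
    failed_status = None
```

With a helper along these lines, each `# XXX: Check the response code` comment could become a single call, and a failed request would surface as an exception rather than a JSON parsing error.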

The rest of the stuff I understand looks good. :)

review: Approve
lp:~jameinel/maas/tag-updating updated on 2012-10-04
1143. By John A Meinel on 2012-10-04

Simplify to a single update_node_tags call.

The bandwidth required should be reasonable.
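
The estimate behind this, taken from the comment in process_all (about 41 bytes per node, 10,000 nodes per nodegroup, a 10Mbit network), can be checked with a few lines of arithmetic. The sketch assumes decimal megabits and ignores HTTP and protocol overhead, so it is a lower bound rather than a measurement.

```python
# Figures quoted in the comment in process_all.
BYTES_PER_NODE = 41
NODES_PER_NODEGROUP = 10000
# Assumption: 10 Mbit/s means 10 * 10**6 bits per second.
LINK_BITS_PER_SECOND = 10 * 1000 * 1000

payload_bytes = BYTES_PER_NODE * NODES_PER_NODEGROUP
transfer_seconds = payload_bytes * 8 / LINK_BITS_PER_SECOND

print('%d bytes, about %.3f s' % (payload_bytes, transfer_seconds))
```

That gives 410,000 bytes and roughly a third of a second of raw transfer time, consistent with the "<1s" claim.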

Preview Diff

=== added file 'src/provisioningserver/tags.py'
--- src/provisioningserver/tags.py 1970-01-01 00:00:00 +0000
+++ src/provisioningserver/tags.py 2012-10-04 09:47:27 +0000
@@ -0,0 +1,150 @@
+# Copyright 2012 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Celery jobs for managing tags.
+
+"""
+
+import json
+from lxml import etree
+
+from apiclient.maas_client import (
+    MAASClient,
+    MAASDispatcher,
+    MAASOAuth,
+    )
+
+from provisioningserver.auth import (
+    get_recorded_api_credentials,
+    get_recorded_maas_url,
+    get_recorded_nodegroup_uuid,
+    )
+
+from provisioningserver.logging import task_logger
+
+
+DEFAULT_BATCH_SIZE = 100
+
+
+def get_cached_knowledge():
+    """Get all the information that we need to know, or log an error.
+
+    :return: (client, nodegroup_uuid), or (None, None) if anything is unset
+    """
+    maas_url = get_recorded_maas_url()
+    if maas_url is None:
+        task_logger.error("Not updating tags: don't have API URL yet.")
+        return None, None
+    api_credentials = get_recorded_api_credentials()
+    if api_credentials is None:
+        task_logger.error("Not updating tags: don't have API key yet.")
+        return None, None
+    nodegroup_uuid = get_recorded_nodegroup_uuid()
+    if nodegroup_uuid is None:
+        task_logger.error("Not updating tags: don't have UUID yet.")
+        return None, None
+    client = MAASClient(MAASOAuth(*api_credentials), MAASDispatcher(),
+        maas_url)
+    return client, nodegroup_uuid
+
+
+def get_nodes_for_node_group(client, nodegroup_uuid):
+    """Retrieve the UUIDs of nodes in a particular group.
+
+    :param client: MAAS client instance
+    :param nodegroup_uuid: Node group for which to retrieve nodes
+    :return: List of UUIDs for nodes in nodegroup
+    """
+    path = 'api/1.0/nodegroup/%s/' % (nodegroup_uuid,)
+    response = client.get(path, op='list_nodes')
+    # XXX: Check the response code before we parse the content
+    return json.loads(response.content)
+
+
+def get_hardware_details_for_nodes(client, nodegroup_uuid, system_ids):
+    """Retrieve the lshw output for a set of nodes.
+
+    :param client: MAAS client
+    :param system_ids: List of UUIDs of systems for which to fetch lshw data
+    :return: List of [system_id, lshw XML] pairs, one per node
+    """
+    path = 'api/1.0/nodegroup/%s/' % (nodegroup_uuid,)
+    response = client.get(
+        path, op='node_hardware_details', system_ids=system_ids)
+    # XXX: Check the response code before we parse the content
+    return json.loads(response.content)
+
+
+def update_node_tags(client, tag_name, uuid, added, removed):
+    """Update the nodes relevant for a particular tag.
+
+    :param client: MAAS client
+    :param tag_name: Name of tag
+    :param uuid: NodeGroup uuid of this worker. Needed for security
+        permissions. (The nodegroup worker is only allowed to touch nodes in
+        its nodegroup, otherwise you need to be a superuser.)
+    :param added: Set of nodes to add
+    :param removed: Set of nodes to remove
+    """
+    path = 'api/1.0/tags/%s/' % (tag_name,)
+    response = client.post(path, op='update_nodes', add=added, remove=removed)
+    # XXX: Check the response code before we parse the content
+    return json.loads(response.content)
+
+
+def process_batch(xpath, hardware_details):
+    """Get the details for one batch, and process whether they match or not.
+    """
+    # Evaluate the XPath against each node's hardware details
+    matched_nodes = []
+    unmatched_nodes = []
+    for system_id, hw_xml in hardware_details:
+        xml = etree.XML(hw_xml)
+        if xpath(xml):
+            matched_nodes.append(system_id)
+        else:
+            unmatched_nodes.append(system_id)
+    return matched_nodes, unmatched_nodes
+
+
+def process_all(client, tag_name, nodegroup_uuid, system_ids, xpath,
+                batch_size=None):
+    if batch_size is None:
+        batch_size = DEFAULT_BATCH_SIZE
+    all_matched = []
+    all_unmatched = []
+    for i in range(0, len(system_ids), batch_size):
+        selected_ids = system_ids[i:i + batch_size]
+        details = get_hardware_details_for_nodes(
+            client, nodegroup_uuid, selected_ids)
+        matched, unmatched = process_batch(xpath, details)
+        all_matched.extend(matched)
+        all_unmatched.extend(unmatched)
+    # Upload all updates for one nodegroup at one time. This should be no more
+    # than ~41*10,000 = 410kB. That should take <1s even on a 10Mbit network.
+    # This also allows us to track if a nodegroup has been processed in the DB,
+    # without having to add another API call.
+    update_node_tags(client, tag_name, nodegroup_uuid, all_matched,
+        all_unmatched)
+
+
+def process_node_tags(tag_name, tag_definition, batch_size=None):
+    """Update the nodes for a new/changed tag definition.
+
+    :param tag_name: Name of the tag to update nodes for
+    :param tag_definition: Tag definition
+    :param batch_size: Size of batch
+    """
+    client, nodegroup_uuid = get_cached_knowledge()
+    if not all([client, nodegroup_uuid]):
+        task_logger.error('Unable to update tag: %s for definition %r.'
+                          ' Please refresh secrets, then rebuild this tag.'
+                          % (tag_name, tag_definition))
+        return
+    # We evaluate this early, so we can fail before sending a bunch of data to
+    # the server
+    xpath = etree.XPath(tag_definition)
+    # Get nodes to process
+    system_ids = get_nodes_for_node_group(client, nodegroup_uuid)
+    process_all(client, tag_name, nodegroup_uuid, system_ids, xpath,
+                batch_size=batch_size)

=== modified file 'src/provisioningserver/tasks.py'
--- src/provisioningserver/tasks.py 2012-09-29 23:49:08 +0000
+++ src/provisioningserver/tasks.py 2012-10-04 09:47:27 +0000
@@ -30,7 +30,10 @@

 from celery.app import app_or_default
 from celery.task import task
-from provisioningserver import boot_images
+from provisioningserver import (
+    boot_images,
+    tags,
+    )
 from provisioningserver.auth import (
     record_api_credentials,
     record_maas_url,
@@ -335,3 +338,13 @@
 def report_boot_images():
     """For master worker only: report available netboot images."""
     boot_images.report_to_server()
+
+
+@task
+def update_node_tags(tag_name, tag_definition):
+    """Update the nodes for a new/changed tag definition.
+
+    :param tag_name: Name of the tag to update nodes for
+    :param tag_definition: Tag definition
+    """
+    tags.process_node_tags(tag_name, tag_definition)

=== modified file 'src/provisioningserver/testing/testcase.py'
--- src/provisioningserver/testing/testcase.py 2012-08-24 02:51:42 +0000
+++ src/provisioningserver/testing/testcase.py 2012-10-04 09:47:27 +0000
@@ -14,7 +14,14 @@
     'PservTestCase',
     ]

+from apiclient.testing.credentials import make_api_credentials
 from maastesting import testcase
+from maastesting.factory import factory
+from provisioningserver.auth import (
+    record_api_credentials,
+    record_maas_url,
+    record_nodegroup_uuid,
+    )
 from provisioningserver.testing.worker_cache import WorkerCacheFixture


@@ -23,3 +30,22 @@
     def setUp(self):
         super(PservTestCase, self).setUp()
         self.useFixture(WorkerCacheFixture())
+
+    def make_maas_url(self):
+        return 'http://127.0.0.1/%s' % factory.make_name('path')
+
+    def set_maas_url(self):
+        record_maas_url(self.make_maas_url())
+
+    def set_api_credentials(self):
+        record_api_credentials(':'.join(make_api_credentials()))
+
+    def set_node_group_uuid(self):
+        nodegroup_uuid = factory.make_name('nodegroupuuid')
+        record_nodegroup_uuid(nodegroup_uuid)
+
+    def set_secrets(self):
+        """Setup all secrets that we would get from refresh_secrets."""
+        self.set_maas_url()
+        self.set_api_credentials()
+        self.set_node_group_uuid()

=== modified file 'src/provisioningserver/tests/test_boot_images.py'
--- src/provisioningserver/tests/test_boot_images.py 2012-09-25 14:45:59 +0000
+++ src/provisioningserver/tests/test_boot_images.py 2012-10-04 09:47:27 +0000
@@ -15,14 +15,8 @@
 import json

 from apiclient.maas_client import MAASClient
-from apiclient.testing.credentials import make_api_credentials
-from maastesting.factory import factory
 from mock import Mock
 from provisioningserver import boot_images
-from provisioningserver.auth import (
-    record_api_credentials,
-    record_maas_url,
-    )
 from provisioningserver.pxe import tftppath
 from provisioningserver.testing.boot_images import make_boot_image_params
 from provisioningserver.testing.config import ConfigFixture
@@ -35,13 +29,6 @@
         super(TestBootImagesTasks, self).setUp()
         self.useFixture(ConfigFixture({'tftp': {'root': self.make_dir()}}))

-    def set_maas_url(self):
-        record_maas_url(
-            'http://127.0.0.1/%s' % factory.make_name('path'))
-
-    def set_api_credentials(self):
-        record_api_credentials(':'.join(make_api_credentials()))
-
     def test_sends_boot_images_to_server(self):
         self.set_maas_url()
         self.set_api_credentials()

=== added file 'src/provisioningserver/tests/test_tags.py'
--- src/provisioningserver/tests/test_tags.py 1970-01-01 00:00:00 +0000
+++ src/provisioningserver/tests/test_tags.py 2012-10-04 09:47:27 +0000
@@ -0,0 +1,181 @@
+# Copyright 2012 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+"""Tests for tag updating."""
+
+from __future__ import (
+    absolute_import,
+    print_function,
+    unicode_literals,
+    )
+
+__metaclass__ = type
+__all__ = []
+
+from apiclient.maas_client import MAASClient
+import httplib
+from lxml import etree
+from maastesting.factory import factory
+from maastesting.fakemethod import (
+    FakeMethod,
+    MultiFakeMethod,
+    )
+from mock import MagicMock
+from provisioningserver.auth import (
+    get_recorded_nodegroup_uuid,
+    )
+from provisioningserver.testing.testcase import PservTestCase
+from provisioningserver import tags
+
+
+class FakeResponse:
+
+    def __init__(self, status_code, content):
+        self.status_code = status_code
+        self.content = content
+
+
+class TestTagUpdating(PservTestCase):
+
+    def test_get_cached_knowledge_knows_nothing(self):
+        # If we haven't given it any secrets, we should get back nothing
+        self.assertEqual((None, None), tags.get_cached_knowledge())
+
+    def test_get_cached_knowledge_with_only_url(self):
+        self.set_maas_url()
+        self.assertEqual((None, None), tags.get_cached_knowledge())
+
+    def test_get_cached_knowledge_with_only_url_creds(self):
+        self.set_maas_url()
+        self.set_api_credentials()
+        self.assertEqual((None, None), tags.get_cached_knowledge())
+
+    def test_get_cached_knowledge_with_all_info(self):
+        self.set_maas_url()
+        self.set_api_credentials()
+        self.set_node_group_uuid()
+        client, uuid = tags.get_cached_knowledge()
+        self.assertIsNot(None, client)
+        self.assertIsInstance(client, MAASClient)
+        self.assertIsNot(None, uuid)
+        self.assertEqual(get_recorded_nodegroup_uuid(), uuid)
+
+    def fake_client(self):
+        return MAASClient(None, None, self.make_maas_url())
+
+    def fake_cached_knowledge(self):
+        nodegroup_uuid = factory.make_name('nodegroupuuid')
+        return self.fake_client(), nodegroup_uuid
+
+    def test_get_nodes_calls_correct_api_and_parses_result(self):
+        client, uuid = self.fake_cached_knowledge()
+        response = FakeResponse(httplib.OK, '["system-id1", "system-id2"]')
+        mock = MagicMock(return_value=response)
+        self.patch(client, 'get', mock)
+        result = tags.get_nodes_for_node_group(client, uuid)
+        self.assertEqual(['system-id1', 'system-id2'], result)
+        url = 'api/1.0/nodegroup/%s/' % (uuid,)
+        mock.assert_called_once_with(url, op='list_nodes')
+
+    def test_get_hardware_details_calls_correct_api_and_parses_result(self):
+        client, uuid = self.fake_cached_knowledge()
+        xml_data = "<test><data /></test>"
+        content = '[["system-id1", "%s"]]' % (xml_data,)
+        response = FakeResponse(httplib.OK, content)
+        mock = MagicMock(return_value=response)
+        self.patch(client, 'get', mock)
+        result = tags.get_hardware_details_for_nodes(
+            client, uuid, ['system-id1', 'system-id2'])
+        self.assertEqual([['system-id1', xml_data]], result)
+        url = 'api/1.0/nodegroup/%s/' % (uuid,)
+        mock.assert_called_once_with(
+            url, op='node_hardware_details',
+            system_ids=["system-id1", "system-id2"])
+
+    def test_update_node_tags_calls_correct_api_and_parses_result(self):
+        client, uuid = self.fake_cached_knowledge()
+        content = '{"added": 1, "removed": 2}'
+        response = FakeResponse(httplib.OK, content)
+        mock = MagicMock(return_value=response)
+        self.patch(client, 'post', mock)
+        name = factory.make_name('tag')
+        result = tags.update_node_tags(client, name, uuid,
+            ['add-system-id'], ['remove-1', 'remove-2'])
+        self.assertEqual({'added': 1, 'removed': 2}, result)
+        url = 'api/1.0/tags/%s/' % (name,)
+        mock.assert_called_once_with(
+            url, op='update_nodes',
+            add=['add-system-id'], remove=['remove-1', 'remove-2'])
+
+    def test_process_batch_evaluates_xpath(self):
+        # Yay, something that doesn't need patching...
+        xpath = etree.XPath('//node')
+        node_details = [['a', '<node />'],
+                        ['b', '<not-node />'],
+                        ['c', '<parent><node /></parent>'],
+                       ]
+        self.assertEqual(
+            (['a', 'c'], ['b']),
+            tags.process_batch(xpath, node_details))
+
+    def test_process_node_tags_no_secrets(self):
+        self.patch(MAASClient, 'get')
+        self.patch(MAASClient, 'post')
+        tag_name = factory.make_name('tag')
+        tags.process_node_tags(tag_name, '//node')
+        self.assertFalse(MAASClient.get.called)
+        self.assertFalse(MAASClient.post.called)
+
+    def test_process_node_tags_integration(self):
+        self.set_secrets()
+        get_nodes = FakeMethod(
+            result=FakeResponse(httplib.OK, '["system-id1", "system-id2"]'))
+        get_hw_details = FakeMethod(
+            result=FakeResponse(httplib.OK,
+                '[["system-id1", "<node />"], ["system-id2", "<no-node />"]]'))
+        get_fake = MultiFakeMethod([get_nodes, get_hw_details])
+        post_fake = FakeMethod(
+            result=FakeResponse(httplib.OK, '{"added": 1, "removed": 1}'))
+        self.patch(MAASClient, 'get', get_fake)
+        self.patch(MAASClient, 'post', post_fake)
+        tag_name = factory.make_name('tag')
+        nodegroup_uuid = get_recorded_nodegroup_uuid()
+        tags.process_node_tags(tag_name, '//node')
+        nodegroup_url = 'api/1.0/nodegroup/%s/' % (nodegroup_uuid,)
+        tag_url = 'api/1.0/tags/%s/' % (tag_name,)
+        self.assertEqual([((nodegroup_url,), {'op': 'list_nodes'})],
+                         get_nodes.calls)
+        self.assertEqual([((nodegroup_url,),
+                           {'op': 'node_hardware_details',
+                            'system_ids': ['system-id1', 'system-id2']})],
+                         get_hw_details.calls)
+        self.assertEqual([((tag_url,),
+                           {'op': 'update_nodes',
+                            'add': ['system-id1'],
+                            'remove': ['system-id2'],
+                           })], post_fake.calls)
+
+    def test_process_node_tags_requests_details_in_batches(self):
+        client = object()
+        uuid = factory.make_name('nodegroupuuid')
+        self.patch(
+            tags, 'get_cached_knowledge',
+            MagicMock(return_value=(client, uuid)))
+        self.patch(
+            tags, 'get_nodes_for_node_group',
+            MagicMock(return_value=['a', 'b', 'c']))
+        fake_first = FakeMethod(
+            result=[['a', '<node />'], ['b', '<not-node />']])
+        fake_second = FakeMethod(
+            result=[['c', '<parent><node /></parent>']])
+        self.patch(tags, 'get_hardware_details_for_nodes',
+            MultiFakeMethod([fake_first, fake_second]))
+        self.patch(tags, 'update_node_tags')
+        tag_name = factory.make_name('tag')
+        tags.process_node_tags(tag_name, '//node', batch_size=2)
+        tags.get_cached_knowledge.assert_called_once_with()
+        tags.get_nodes_for_node_group.assert_called_once_with(client, uuid)
+        self.assertEqual([((client, uuid, ['a', 'b']), {})], fake_first.calls)
+        self.assertEqual([((client, uuid, ['c']), {})], fake_second.calls)
+        tags.update_node_tags.assert_called_once_with(
+            client, tag_name, uuid, ['a', 'c'], ['b'])