Merge lp:~julian-edwards/maas/periodic-lease-upload-pserv into lp:~maas-committers/maas/trunk

Proposed by Julian Edwards
Status: Merged
Approved by: Julian Edwards
Approved revision: no longer in the source branch.
Merged at revision: 2830
Proposed branch: lp:~julian-edwards/maas/periodic-lease-upload-pserv
Merge into: lp:~maas-committers/maas/trunk
Diff against target: 906 lines (+558/-64)
15 files modified
docs/development/lease-scanning-and-dns.rst (+4/-10)
etc/celeryconfig_cluster.py (+0/-8)
src/maasserver/rpc/leases.py (+45/-0)
src/maasserver/rpc/regionservice.py (+10/-0)
src/maasserver/rpc/tests/test_regionservice.py (+40/-0)
src/provisioningserver/dhcp/leases.py (+18/-8)
src/provisioningserver/dhcp/tests/test_leases.py (+11/-11)
src/provisioningserver/plugin.py (+13/-0)
src/provisioningserver/pserv_services/lease_upload_service.py (+145/-0)
src/provisioningserver/pserv_services/tests/test_lease_upload_service.py (+235/-0)
src/provisioningserver/rpc/region.py (+16/-0)
src/provisioningserver/tasks.py (+0/-13)
src/provisioningserver/testing/testcase.py (+18/-0)
src/provisioningserver/tests/test_plugin.py (+2/-1)
src/provisioningserver/tests/test_tasks.py (+1/-13)
To merge this branch: bzr merge lp:~julian-edwards/maas/periodic-lease-upload-pserv
Reviewer: Gavin Panella (community)
Review status: Approve
Review via email: mp+231164@code.launchpad.net

Commit message

Add a pserv service that replaces the Celery task that periodically uploads DHCP leases.

Description of the change

This branch contains everything needed to remove the old Celery task to upload leases, and replace it with a pserv-based one.

Roughly, the following changes were made:
 * Remove all trace of the Celery config, code and tests.
 * Update the dev documentation.
 * Add an RPC command definition for the UpdateLeases call.
 * Add code to translate between the RPC format and the format needed by the existing code that does the actual updating.
 * Add a new pserv service.
 * Tests. Lots of tests!

Sorry it's on the large side, but it is a complete cut-over. I've tested that it works on my local rig.
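
To illustrate the translation step listed above, here is a minimal standalone sketch of the round trip between the two formats. It follows the shape of the convert_leases_to_mappings() and convert_mappings_to_leases() helpers in this branch; the sample IP and MAC addresses are made up for illustration.

```python
def convert_leases_to_mappings(leases):
    """Turn a {ip: mac} dict into a list of {"ip": ..., "mac": ...} dicts."""
    return [{"ip": ip, "mac": mac} for ip, mac in leases.items()]


def convert_mappings_to_leases(mappings):
    """Turn a list of {"ip": ..., "mac": ...} dicts back into {ip: mac}."""
    return {mapping["ip"]: mapping["mac"] for mapping in mappings}


# Hypothetical sample data, for illustration only.
leases = {
    "192.168.0.10": "00:16:3e:aa:bb:01",
    "192.168.0.11": "00:16:3e:aa:bb:02",
}

# The list form is what travels over RPC; the dict form is what the
# existing lease-updating code expects.
mappings = convert_leases_to_mappings(leases)
round_tripped = convert_mappings_to_leases(mappings)
```

The list-of-dicts form exists because the RPC command declares its argument as a list of (ip, mac) records, while the existing update code works on a dict keyed by IP.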

Gavin Panella (allenap) wrote :

Looking good so far :) Lots of comments, but none of them are blockers.

Julian Edwards (julian-edwards) wrote :

Thanks for the comments!

I tried your fixture suggestion in the last test but I can't get it to work. Can you take a look please?

Take this branch and apply this diff:
http://paste.ubuntu.com/8095590/

I get this output:
======================================================================
FAIL: provisioningserver.tests.test_lease_upload_service.TestPeriodicImageDownloadService.test_upload_is_initiated
----------------------------------------------------------------------
_StringException: Failed expectation: {{{
File "/home/ed/canonical/maas/sandbox/src/maastesting/crochet.py", line 101, in __checkResults
    fail_count, Equals(0), "Unfired and/or unhandled "
  File "/usr/lib/python2.7/dist-packages/testtools/testcase.py", line 447, in expectThat
    postfix_content="MismatchError: " + str(mismatch_error)
MismatchError: 0 != 1: Unfired and/or unhandled EventualResult(s); see test details.
}}}

Unfired/unhandled EventualResult #1: {{{
*** EventualResult has not fired:
<crochet._eventloop.EventualResult object at 0x7f9e58fcfbd0>
*** It was connected to a Deferred:
<Deferred at 0x7f9e58fd35a8>
}}}

traceback-1: {{{
Traceback (most recent call last):
AssertionError: Forced Test Failure
}}}

Traceback (most recent call last):
  File "/home/ed/canonical/maas/sandbox/src/provisioningserver/tests/test_lease_upload_service.py", line 214, in test_upload_is_initiated
    MockCalledOnceWith(uuid=uuid, mappings=mappings))
  File "/usr/lib/python2.7/dist-packages/testtools/testcase.py", line 406, in assertThat
    raise mismatch_error
MismatchError: Expected to be called once. Called 0 times.

-------------------- >> begin captured logging << --------------------
twisted: INFO: AMPTestProtocol#1 connection established (HOST:<twisted.test.iosim.FakeAddress object at 0x7f9e58fc64d0> PEER:<twisted.test.iosim.FakeAddress object at 0x7f9e58fc6510>)
twisted: INFO: ClusterClient connection established (HOST:<twisted.test.iosim.FakeAddress object at 0x7f9e58fc6550> PEER:<twisted.test.iosim.FakeAddress object at 0x7f9e58fc6590>)
--------------------- >> end captured logging << ---------------------

Gavin Panella (allenap) wrote :
Julian Edwards (julian-edwards) wrote :

This is now all working on my test rig. I still can't get your modified tests to pass though, so I'll get a review and land without those changes as you're off today. We can look at it again when you're back.

Julian Edwards (julian-edwards) wrote :

I made a couple more changes:
 - Catch service errors and log them — the code may look somewhat familiar to you :)
 - moved the code to the new pserv_services dir
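
As a rough illustration of the first change, here is a plain-Python analogue of the wrapper: run the upload, log the full error in one place and a concise message in another, and never let the exception escape. The real service does this with a Twisted errback on a Deferred; the logger names and the run_upload_safely() helper below are hypothetical.

```python
import logging

# Hypothetical logger names, standing in for the Twisted log and the maas log.
detail_log = logging.getLogger("sketch.detail")
concise_log = logging.getLogger("sketch.maas")


def run_upload_safely(upload):
    """Call upload(); log any failure instead of propagating it.

    Returns True if upload() succeeded, False if it raised.
    """
    try:
        upload()
    except Exception as error:
        # Full traceback, for debugging.
        detail_log.debug("Lease upload failed", exc_info=True)
        # One-line summary, for operators.
        concise_log.error("Failed to upload leases: %s", error)
        return False
    return True


def broken_upload():
    raise ZeroDivisionError("Such a shame I can't divide by zero")


succeeded = run_upload_safely(broken_upload)
```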

Gavin Panella (allenap) wrote :

Looks grand.

review: Approve
Julian Edwards (julian-edwards) wrote :

Thanks for reviewing! I'm going to leave the lock for now; see my comment. It's easily changed later if needed.
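
For context, the lock here is presumably the module-level DeferredLock that maybe_start_upload() checks before starting an upload. The "skip this run if one is already in progress" pattern can be sketched with the standard library as follows. This is an analogue using threading.Lock rather than Twisted, and the names upload_lock and maybe_start_upload below are stand-ins, not the branch's code.

```python
import threading

# Stand-in for the module-level lock shared by all upload attempts.
upload_lock = threading.Lock()


def maybe_start_upload(upload):
    """Run upload() unless another upload already holds the lock.

    Returns True if the upload ran, False if it was skipped.
    """
    # Non-blocking acquire: do not wait for a running upload to finish.
    if not upload_lock.acquire(False):
        return False
    try:
        upload()
    finally:
        # Always release, even if upload() raised.
        upload_lock.release()
    return True


calls = []
ran_first = maybe_start_upload(lambda: calls.append("first"))

# Simulate an upload in progress by holding the lock ourselves.
upload_lock.acquire()
ran_second = maybe_start_upload(lambda: calls.append("second"))
upload_lock.release()
```

Skipping rather than queueing means a slow upload never causes a backlog of pending runs; the next timer tick simply tries again.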

Julian Edwards (julian-edwards) wrote :

Tested again on my local rig and it seems fine, so landing now.

Preview Diff

1=== modified file 'docs/development/lease-scanning-and-dns.rst'
2--- docs/development/lease-scanning-and-dns.rst 2013-09-13 04:08:22 +0000
3+++ docs/development/lease-scanning-and-dns.rst 2014-08-28 03:19:24 +0000
4@@ -20,16 +20,10 @@
5 ===============
6
7 MAAS will periodically scan the DHCP leases file using the
8-``upload_dhcp_leases()`` celery task, which originates via celerybeat.
9-
10-As leases are discovered, it calls the api function ``update_leases()`` which
11-will subsequently generate a new task called ``add_new_dhcp_host_map()`` that
12-will use omshell to write a permanent mapping from MAC to IP in the DHCP
13-server.
14-
15-That host map remains until the node is deleted, at which point the
16-``remove_dhcp_host_map()`` task is invoked which uses omshell to remove the
17-map.
18+``PeriodicLeaseUploadService()`` pserv service.
19+
20+As leases are discovered, it calls the RPC function ``UpdateLeases`` which
21+stores the active leases in the DHCPLease table.
22
23
24 Updating the DNS zone file
25
26=== modified file 'etc/celeryconfig_cluster.py'
27--- etc/celeryconfig_cluster.py 2014-08-26 23:55:04 +0000
28+++ etc/celeryconfig_cluster.py 2014-08-28 03:19:24 +0000
29@@ -40,14 +40,6 @@
30 PROBE_DHCP_SERVERS_SCHEDULE = timedelta(minutes=1)
31
32 CELERYBEAT_SCHEDULE = {
33- 'unconditional-dhcp-lease-upload': {
34- 'task': 'provisioningserver.tasks.upload_dhcp_leases',
35- 'schedule': DHCP_LEASE_UPLOAD_SCHEDULE,
36- 'options': {
37- 'queue': CLUSTER_UUID,
38- 'expires': int(DHCP_LEASE_UPLOAD_SCHEDULE.total_seconds()),
39- },
40- },
41 'report-boot-images': {
42 'task': 'provisioningserver.tasks.report_boot_images',
43 'schedule': REPORT_BOOT_IMAGES_SCHEDULE,
44
45=== added file 'src/maasserver/rpc/leases.py'
46--- src/maasserver/rpc/leases.py 1970-01-01 00:00:00 +0000
47+++ src/maasserver/rpc/leases.py 2014-08-28 03:19:24 +0000
48@@ -0,0 +1,45 @@
49+# Copyright 2014 Canonical Ltd. This software is licensed under the
50+# GNU Affero General Public License version 3 (see the file LICENSE).
51+
52+"""RPC helpers relating to DHCP leases."""
53+
54+from __future__ import (
55+ absolute_import,
56+ print_function,
57+ unicode_literals,
58+ )
59+
60+str = None
61+
62+__metaclass__ = type
63+__all__ = [
64+ "update_leases",
65+]
66+
67+from django.shortcuts import get_object_or_404
68+from maasserver.models.dhcplease import DHCPLease
69+from maasserver.models.nodegroup import NodeGroup
70+from maasserver.utils.async import transactional
71+from provisioningserver.pserv_services.lease_upload_service import (
72+ convert_mappings_to_leases,
73+ )
74+from provisioningserver.utils.twisted import synchronous
75+
76+
77+@synchronous
78+@transactional
79+def update_leases(uuid, mappings):
80+ """Updates DHCP leases on a cluster given the mappings in UpdateLeases.
81+
82+ :param uuid: Cluster UUID as found in
83+ :py:class:`~provisioningserver.rpc.region.UpdateLeases`.
84+ :param mappings: List of pairs of (ip, mac) as defined in
85+ :py:class:`~provisioningserver.rpc.region.UpdateLeases`.
86+
87+ Converts the mappings format into a dict that
88+ DHCPLease.objects.update_leases needs and then calls it.
89+ """
90+ nodegroup = get_object_or_404(NodeGroup, uuid=uuid)
91+ leases = convert_mappings_to_leases(mappings)
92+ DHCPLease.objects.update_leases(nodegroup, leases)
93+ return {}
94
95=== modified file 'src/maasserver/rpc/regionservice.py'
96--- src/maasserver/rpc/regionservice.py 2014-08-26 14:29:13 +0000
97+++ src/maasserver/rpc/regionservice.py 2014-08-28 03:19:24 +0000
98@@ -34,6 +34,7 @@
99 bootsources,
100 configuration,
101 events,
102+ leases,
103 nodes,
104 )
105 from maasserver.utils import synchronised
106@@ -99,6 +100,15 @@
107 """
108 return {}
109
110+ @region.UpdateLeases.responder
111+ def update_leases(self, uuid, mappings):
112+ """update_leases(uuid, mappings)
113+
114+ Implementation of
115+ :py:class:`~provisioningserver.rpc.region.UpdateLeases`.
116+ """
117+ return deferToThread(leases.update_leases, uuid, mappings)
118+
119 @amp.StartTLS.responder
120 def get_tls_parameters(self):
121 """get_tls_parameters()
122
123=== modified file 'src/maasserver/rpc/tests/test_regionservice.py'
124--- src/maasserver/rpc/tests/test_regionservice.py 2014-08-26 17:32:40 +0000
125+++ src/maasserver/rpc/tests/test_regionservice.py 2014-08-28 03:19:24 +0000
126@@ -34,6 +34,7 @@
127 POWER_STATE,
128 )
129 from maasserver.models.config import Config
130+from maasserver.models.dhcplease import DHCPLease
131 from maasserver.models.event import Event
132 from maasserver.models.eventtype import EventType
133 from maasserver.models.node import Node
134@@ -80,6 +81,7 @@
135 RegisterEventType,
136 ReportBootImages,
137 SendEvent,
138+ UpdateLeases,
139 UpdateNodePowerState,
140 )
141 from provisioningserver.rpc.testing import (
142@@ -234,6 +236,44 @@
143 return d.addCallback(check)
144
145
146+class TestRegionProtocol_UpdateLeases(TransactionTestCase):
147+
148+ def test_update_leases_is_registered(self):
149+ protocol = Region()
150+ responder = protocol.locateResponder(UpdateLeases.commandName)
151+ self.assertIsNot(responder, None)
152+
153+ @transactional
154+ def make_node_group(self, uuid):
155+ return factory.make_node_group(uuid=uuid)
156+
157+ @transactional
158+ def get_leases_for(self, nodegroup):
159+ return [
160+ (ng.ip, ng.mac)
161+ for ng in DHCPLease.objects.filter(nodegroup=nodegroup)]
162+
163+ @wait_for_reactor
164+ @inlineCallbacks
165+ def test__stores_leases(self):
166+ uuid = factory.make_name("uuid")
167+ nodegroup = yield deferToThread(self.make_node_group, uuid)
168+ mapping = {
169+ "ip": factory.getRandomIPAddress(),
170+ "mac": factory.getRandomMACAddress()
171+ }
172+
173+ response = yield call_responder(Region(), UpdateLeases, {
174+ b"uuid": uuid, b"mappings": [mapping]})
175+
176+ self.assertThat(response, Equals({}))
177+
178+ [(ip, mac)] = yield deferToThread(
179+ self.get_leases_for, nodegroup=nodegroup)
180+ self.expectThat(ip, Equals(mapping["ip"]))
181+ self.expectThat(mac, Equals(mapping["mac"]))
182+
183+
184 class TestRegionProtocol_GetBootSources(TransactionTestCase):
185
186 def test_get_boot_sources_is_registered(self):
187
188=== modified file 'src/provisioningserver/dhcp/leases.py'
189--- src/provisioningserver/dhcp/leases.py 2014-08-13 21:49:35 +0000
190+++ src/provisioningserver/dhcp/leases.py 2014-08-28 03:19:24 +0000
191@@ -34,6 +34,7 @@
192 ]
193
194
195+from collections import defaultdict
196 import errno
197 import json
198 from os import (
199@@ -46,8 +47,6 @@
200 MAASDispatcher,
201 MAASOAuth,
202 )
203-from celery.app import app_or_default
204-from provisioningserver import cache
205 from provisioningserver.auth import (
206 get_recorded_api_credentials,
207 get_recorded_nodegroup_uuid,
208@@ -59,6 +58,12 @@
209
210 maaslog = get_maas_logger("dhcp.leases")
211
212+# This used to be the cache in provisioningserver.cache, but that
213+# unfortunately makes Twisted fail in ways we can't work out, probably
214+# because of its use of multiprocessing. Instead it's now using a
215+# simple dict, because there are no plans to make pserv multi-process.
216+cache = defaultdict()
217+
218
219 # Cache key for the modification time on last-processed leases file.
220 LEASES_TIME_CACHE_KEY = 'leases_time'
221@@ -69,8 +74,13 @@
222
223
224 def get_leases_file():
225- """Get the location of the DHCP leases file from the config."""
226- return app_or_default().conf.DHCP_LEASES_FILE
227+ """Return the location of the DHCP leases file."""
228+ # This used to be celery config-based so that the development env could
229+ # have a different location. However, nobody seems to be
230+ # provisioning from a dev environment so it's hard-coded until that
231+ # need arises, as converting to the pserv config would be wasted
232+ # work right now.
233+ return "/var/lib/maas/dhcp/dhcpd.leases"
234
235
236 def get_leases_timestamp():
237@@ -115,8 +125,8 @@
238 # These variables are shared between worker threads/processes.
239 # A bit of inconsistency due to concurrent updates is not a problem,
240 # but read them both at once here to reduce the scope for trouble.
241- previous_leases = cache.cache.get(LEASES_CACHE_KEY)
242- previous_leases_time = cache.cache.get(LEASES_TIME_CACHE_KEY)
243+ previous_leases = cache.get(LEASES_CACHE_KEY)
244+ previous_leases_time = cache.get(LEASES_TIME_CACHE_KEY)
245
246 if get_leases_timestamp() == previous_leases_time:
247 return None
248@@ -139,8 +149,8 @@
249 :param leases: A dict mapping each leased IP address to the MAC address
250 that it has been assigned to.
251 """
252- cache.cache.set(LEASES_TIME_CACHE_KEY, last_change)
253- cache.cache.set(LEASES_CACHE_KEY, leases)
254+ cache[LEASES_TIME_CACHE_KEY] = last_change
255+ cache[LEASES_CACHE_KEY] = leases
256
257
258 def list_missing_items(knowledge):
259
260=== modified file 'src/provisioningserver/dhcp/tests/test_leases.py'
261--- src/provisioningserver/dhcp/tests/test_leases.py 2014-07-18 17:05:57 +0000
262+++ src/provisioningserver/dhcp/tests/test_leases.py 2014-08-28 03:19:24 +0000
263@@ -30,10 +30,10 @@
264 get_write_time,
265 )
266 from mock import Mock
267-from provisioningserver import cache
268 from provisioningserver.auth import NODEGROUP_UUID_CACHE_KEY
269 from provisioningserver.dhcp import leases as leases_module
270 from provisioningserver.dhcp.leases import (
271+ cache,
272 check_lease_changes,
273 LEASES_CACHE_KEY,
274 LEASES_TIME_CACHE_KEY,
275@@ -56,8 +56,8 @@
276 record_lease_state(time, leases)
277 self.assertEqual(
278 (time, leases), (
279- cache.cache.get(LEASES_TIME_CACHE_KEY),
280- cache.cache.get(LEASES_CACHE_KEY),
281+ cache.get(LEASES_TIME_CACHE_KEY),
282+ cache.get(LEASES_CACHE_KEY),
283 ))
284
285
286@@ -115,12 +115,12 @@
287 def set_nodegroup_uuid(self):
288 """Set the recorded nodegroup uuid for the duration of this test."""
289 uuid = factory.make_UUID()
290- cache.cache.set(NODEGROUP_UUID_CACHE_KEY, uuid)
291+ cache[NODEGROUP_UUID_CACHE_KEY] = uuid
292 return uuid
293
294 def clear_nodegroup_uuid(self):
295 """Clear the recorded nodegroup uuid."""
296- cache.cache.set(NODEGROUP_UUID_CACHE_KEY, None)
297+ cache[NODEGROUP_UUID_CACHE_KEY] = None
298
299 def set_maas_url(self):
300 """Set the recorded MAAS URL for the duration of this test."""
301@@ -138,11 +138,11 @@
302 """Set recorded API credentials for the duration of this test."""
303 creds_string = ':'.join(
304 factory.make_string() for counter in range(3))
305- cache.cache.set('api_credentials', creds_string)
306+ cache['api_credentials'] = creds_string
307
308 def clear_api_credentials(self):
309 """Clear recorded API credentials."""
310- cache.cache.set('api_credentials', None)
311+ cache['api_credentials'] = None
312
313 def set_items_needed_for_lease_update(self):
314 """Set the recorded items required by `update_leases`."""
315@@ -157,8 +157,8 @@
316 state so that it gets reset at the end of the test. Using this will
317 prevent recorded lease state from leaking into other tests.
318 """
319- cache.cache.set(LEASES_TIME_CACHE_KEY, time)
320- cache.cache.set(LEASES_CACHE_KEY, leases)
321+ cache[LEASES_TIME_CACHE_KEY] = time
322+ cache[LEASES_CACHE_KEY] = leases
323
324 def test_record_lease_state_sets_leases_and_timestamp(self):
325 time = datetime.utcnow()
326@@ -167,8 +167,8 @@
327 record_lease_state(time, leases)
328 self.assertEqual(
329 (time, leases), (
330- cache.cache.get(LEASES_TIME_CACHE_KEY),
331- cache.cache.get(LEASES_CACHE_KEY),
332+ cache.get(LEASES_TIME_CACHE_KEY),
333+ cache.get(LEASES_CACHE_KEY),
334 ))
335
336 def test_check_lease_changes_returns_tuple_if_no_state_cached(self):
337
338=== modified file 'src/provisioningserver/plugin.py'
339--- src/provisioningserver/plugin.py 2014-08-27 03:38:28 +0000
340+++ src/provisioningserver/plugin.py 2014-08-28 03:19:24 +0000
341@@ -34,6 +34,9 @@
342 from provisioningserver.pserv_services.image_download_service import (
343 PeriodicImageDownloadService,
344 )
345+from provisioningserver.pserv_services.lease_upload_service import (
346+ PeriodicLeaseUploadService,
347+ )
348 from provisioningserver.pserv_services.node_power_monitor_service import (
349 NodePowerMonitorService,
350 )
351@@ -232,6 +235,12 @@
352 image_download_service.setName("image_download")
353 return image_download_service
354
355+ def _makePeriodicLeaseUploadService(self, rpc_service):
356+ lease_upload_service = PeriodicLeaseUploadService(
357+ rpc_service, reactor, get_cluster_uuid())
358+ lease_upload_service.setName("lease_upload")
359+ return lease_upload_service
360+
361 def _makeNodePowerMonitorService(self, rpc_service):
362 node_monitor = NodePowerMonitorService(
363 rpc_service, reactor, get_cluster_uuid())
364@@ -267,4 +276,8 @@
365 rpc_service)
366 image_download_service.setServiceParent(services)
367
368+ lease_upload_service = self._makePeriodicLeaseUploadService(
369+ rpc_service)
370+ lease_upload_service.setServiceParent(services)
371+
372 return services
373
374=== added file 'src/provisioningserver/pserv_services/lease_upload_service.py'
375--- src/provisioningserver/pserv_services/lease_upload_service.py 1970-01-01 00:00:00 +0000
376+++ src/provisioningserver/pserv_services/lease_upload_service.py 2014-08-28 03:19:24 +0000
377@@ -0,0 +1,145 @@
378+# Copyright 2014 Canonical Ltd. This software is licensed under the
379+# GNU Affero General Public License version 3 (see the file LICENSE).
380+
381+"""Twisted service that periodically uploads DHCP leases to the region."""
382+
383+from __future__ import (
384+ absolute_import,
385+ print_function,
386+ unicode_literals,
387+ )
388+
389+str = None
390+
391+__metaclass__ = type
392+__all__ = [
393+ "convert_leases_to_mappings",
394+ "convert_mappings_to_leases",
395+ "PeriodicLeaseUploadService",
396+ ]
397+
398+
399+from provisioningserver.dhcp.leases import (
400+ check_lease_changes,
401+ record_lease_state,
402+ )
403+from provisioningserver.logger import get_maas_logger
404+from provisioningserver.rpc.exceptions import NoConnectionsAvailable
405+from provisioningserver.rpc.region import UpdateLeases
406+from provisioningserver.utils.twisted import (
407+ pause,
408+ retries,
409+ )
410+from twisted.application.internet import TimerService
411+from twisted.internet.defer import (
412+ DeferredLock,
413+ inlineCallbacks,
414+ )
415+from twisted.internet.threads import deferToThread
416+from twisted.python import log
417+
418+
419+maaslog = get_maas_logger("lease_upload_service")
420+service_lock = DeferredLock()
421+
422+
423+def convert_mappings_to_leases(mappings):
424+ """Convert AMP mappings to record_lease_state() leases.
425+
426+ Take mappings, as used by UpdateLeases, and turn into leases
427+ as used by record_lease_state().
428+ """
429+ return {
430+ mapping["ip"]: mapping["mac"]
431+ for mapping in mappings
432+ }
433+
434+
435+def convert_leases_to_mappings(leases):
436+ """Convert record_lease_state() leases into UpdateLeases mappings.
437+
438+ Take the leases dict, as used by record_lease_state(), and
439+ turn it into a mappings list suitable for transportation in
440+ the UpdateLeases AMP command.
441+ """
442+ return [
443+ {"ip": ip, "mac": leases[ip]}
444+ for ip in leases
445+ ]
446+
447+
448+class PeriodicLeaseUploadService(TimerService, object):
449+ """Twisted service to periodically upload DHCP leases to the region.
450+
451+ :param client_service: A `ClusterClientService` instance for talking
452+ to the region controller.
453+ :param reactor: An `IReactor` instance.
454+ """
455+
456+ check_interval = 60 # In seconds.
457+
458+ def __init__(self, client_service, reactor, cluster_uuid):
459+ # Call self.try_upload() every self.check_interval.
460+ super(PeriodicLeaseUploadService, self).__init__(
461+ self.check_interval, self.try_upload)
462+ self.clock = reactor
463+ self.client_service = client_service
464+ self.uuid = cluster_uuid
465+ maaslog.info("PeriodicLeaseUploadService starting.")
466+
467+ def try_upload(self):
468+ """Wrap upload attempts in something that catches Failures.
469+
470+ Log the full error to the Twisted log, and a concise error to
471+ the maas log.
472+ """
473+ def upload_failure(failure):
474+ log.err(failure)
475+ maaslog.error(
476+ "Failed to upload leases: %s", failure.getErrorMessage())
477+
478+ return self.maybe_start_upload().addErrback(upload_failure)
479+
480+ @inlineCallbacks
481+ def maybe_start_upload(self):
482+ """Initiate a new upload if one is not already in progress."""
483+ # Use a DeferredLock to prevent simultaneous uploads.
484+ if service_lock.locked:
485+ # Don't want to block on lock release.
486+ return
487+ yield service_lock.acquire()
488+ try:
489+ yield self._get_client_and_start_upload()
490+ finally:
491+ service_lock.release()
492+
493+ @inlineCallbacks
494+ def _get_client_and_start_upload(self):
495+ # Retry a few times, since this service usually comes up before
496+ # the RPC service.
497+ for elapsed, remaining, wait in retries(15, 5, self.clock):
498+ try:
499+ client = self.client_service.getClient()
500+ break
501+ except NoConnectionsAvailable:
502+ yield pause(wait, clock=self.clock)
503+ else:
504+ maaslog.error(
505+ "Failed to connect to region controller, cannot upload leases")
506+ return
507+ yield self._start_upload(client)
508+
509+ @inlineCallbacks
510+ def _start_upload(self, client):
511+ maaslog.info("Scanning DHCP leases...")
512+ updated_lease_info = yield deferToThread(check_lease_changes)
513+ if updated_lease_info is None:
514+ maaslog.info("No leases changed since last scan")
515+ else:
516+ timestamp, leases = updated_lease_info
517+ record_lease_state(timestamp, leases)
518+ mappings = convert_leases_to_mappings(leases)
519+ maaslog.info("Uploading DHCP leases to region controller.")
520+ yield client(
521+ UpdateLeases, uuid=self.uuid, mappings=mappings)
522+ maaslog.info("Lease upload complete.")
523
524=== added file 'src/provisioningserver/pserv_services/tests/test_lease_upload_service.py'
525--- src/provisioningserver/pserv_services/tests/test_lease_upload_service.py 1970-01-01 00:00:00 +0000
526+++ src/provisioningserver/pserv_services/tests/test_lease_upload_service.py 2014-08-28 03:19:24 +0000
527@@ -0,0 +1,235 @@
528+# Copyright 2014 Canonical Ltd. This software is licensed under the
529+# GNU Affero General Public License version 3 (see the file LICENSE).
530+
531+"""Tests for src/provisioningserver/pserv_services/lease_upload_service.py"""
532+
533+from __future__ import (
534+ absolute_import,
535+ print_function,
536+ unicode_literals,
537+ )
538+
539+str = None
540+
541+__metaclass__ = type
542+__all__ = []
543+
544+from datetime import datetime
545+
546+from fixtures import FakeLogger
547+from maastesting.factory import factory
548+from maastesting.matchers import (
549+ MockCalledOnceWith,
550+ MockCallsMatch,
551+ MockNotCalled,
552+ )
553+from mock import (
554+ ANY,
555+ call,
556+ Mock,
557+ sentinel,
558+ )
559+from provisioningserver import services
560+from provisioningserver.dhcp.leases import check_lease_changes
561+from provisioningserver.pserv_services import lease_upload_service
562+from provisioningserver.pserv_services.lease_upload_service import (
563+ convert_leases_to_mappings,
564+ convert_mappings_to_leases,
565+ PeriodicLeaseUploadService,
566+ service_lock,
567+ )
568+from provisioningserver.rpc.exceptions import NoConnectionsAvailable
569+from provisioningserver.rpc.region import UpdateLeases
570+from provisioningserver.rpc.testing import (
571+ MockClusterToRegionRPCFixture,
572+ TwistedLoggerFixture,
573+ )
574+from provisioningserver.testing.testcase import PservTestCase
575+from provisioningserver.utils.twisted import pause
576+from testtools.deferredruntest import extract_result
577+from twisted.application.internet import TimerService
578+from twisted.internet import defer
579+from twisted.internet.task import Clock
580+
581+
582+def make_random_lease():
583+ ip = factory.getRandomIPAddress()
584+ mac = factory.getRandomMACAddress()
585+ return {ip: mac}
586+
587+
588+def make_random_mapping():
589+ ip = factory.getRandomIPAddress()
590+ mac = factory.getRandomMACAddress()
591+ mapping = {"ip": ip, "mac": mac}
592+ return mapping
593+
594+
595+class TestHelperFunctions(PservTestCase):
596+
597+ def test_convert_leases_to_mappings_maps_correctly(self):
598+ mappings = list()
599+ for _ in xrange(3):
600+ mappings.append(make_random_mapping())
601+
602+ # Convert to leases.
603+ leases = convert_mappings_to_leases(mappings)
604+ # Convert back and test against our original mappings.
605+ observed = convert_leases_to_mappings(leases)
606+ self.assertItemsEqual(mappings, observed)
607+
608+ def test_convert_leases_to_mappings_converts_correctly(self):
609+ leases = dict()
610+ for _ in xrange(3):
611+ leases.update(make_random_lease())
612+
613+ # Convert to mappings.
614+ mappings = convert_leases_to_mappings(leases)
615+ # Convert back and test against our original leases.
616+ observed = convert_mappings_to_leases(mappings)
617+ self.assertEqual(observed, leases)
618+
619+
620+class TestPeriodicImageDownloadService(PservTestCase):
621+
622+ def test_init(self):
623+ service = PeriodicLeaseUploadService(
624+ sentinel.service, sentinel.clock, sentinel.uuid)
625+ self.assertIsInstance(service, TimerService)
626+ self.assertIs(service.clock, sentinel.clock)
627+ self.assertIs(service.uuid, sentinel.uuid)
628+ self.assertIs(service.client_service, sentinel.service)
629+
630+ def patch_upload(self, service, return_value=None):
631+ patched = self.patch(service, '_get_client_and_start_upload')
632+ patched.return_value = defer.succeed(return_value)
633+ return patched
634+
635+ def test_is_called_every_interval(self):
636+ clock = Clock()
637+ service = PeriodicLeaseUploadService(
638+ sentinel.service, clock, sentinel.uuid)
639+ # Avoid actual uploads:
640+ start_upload = self.patch_upload(service)
641+
642+ # There are no calls before the service is started.
643+ self.assertThat(start_upload, MockNotCalled())
644+
645+ service.startService()
646+
647+ # The first call is issued at startup.
648+ self.assertThat(start_upload, MockCalledOnceWith())
649+
650+ # Wind clock forward one second less than the desired interval.
651+ clock.advance(service.check_interval - 1)
652+ # No more periodic calls made.
653+ self.assertThat(start_upload, MockCalledOnceWith())
654+
655+ # Wind clock forward one second, past the interval.
656+ clock.advance(1)
657+
658+ # Now there were two calls.
659+ self.assertThat(start_upload, MockCallsMatch(call(), call()))
660+
661+ # Forward another interval, should be three calls.
662+ clock.advance(service.check_interval)
663+ self.assertThat(
664+ start_upload, MockCallsMatch(call(), call(), call()))
665+
666+ @defer.inlineCallbacks
667+ def test_does_not_run_if_lock_taken(self):
668+ service = PeriodicLeaseUploadService(
669+ sentinel.rpc, Clock(), sentinel.uuid)
670+ start_upload = self.patch_upload(service)
671+ yield service_lock.acquire()
672+ self.addCleanup(service_lock.release)
673+ service.startService()
674+ self.assertThat(start_upload, MockNotCalled())
675+
676+ def test_takes_lock_when_running(self):
677+ clock = Clock()
678+ service = PeriodicLeaseUploadService(
679+ sentinel.rpc, clock, sentinel.uuid)
680+
681+ # Patch the upload func so it's just a Deferred that waits for
682+ # one second.
683+ _start_upload = self.patch(service, '_get_client_and_start_upload')
684+ _start_upload.return_value = pause(1, clock)
685+
686+ # Lock is acquired for the first upload after startup.
687+ self.assertFalse(service_lock.locked)
688+ service.startService()
689+ self.assertTrue(service_lock.locked)
690+
691+ # Lock is released once the upload is done.
692+ clock.advance(1)
693+ self.assertFalse(service_lock.locked)
694+
695+ def test_no_upload_if_no_rpc_connections(self):
696+ rpc_client = Mock()
697+ rpc_client.getClient.side_effect = NoConnectionsAvailable()
698+
699+ clock = Clock()
700+ service = PeriodicLeaseUploadService(
701+ rpc_client, clock, sentinel.uuid)
702+ start_upload = self.patch(service, '_start_upload')
703+ service.startService()
704+ # Wind clock past all the retries. You can't do this in one big
705+ # lump, it seems. The test looks like it passes, but the
706+ # maybe_start_upload() method never returns properly which
707+ # leaves the service_lock locked.
708+ clock.pump((5, 5, 5))
709+ self.assertThat(start_upload, MockNotCalled())
710+
711+ def test_upload_is_initiated(self):
712+ # We're pretending to be the reactor in this thread. To ensure correct
713+ # operation from things like the @asynchronous decorators we need to
714+ # register as the IO thread.
715+ self.register_as_io_thread()
716+
717+ # Create a fixture for the region side of the RPC.
718+ rpc_fixture = self.useFixture(MockClusterToRegionRPCFixture())
719+ rpc_service = services.getServiceNamed('rpc')
720+ server, io = rpc_fixture.makeEventLoop(UpdateLeases)
721+ server.UpdateLeases.return_value = defer.succeed({})
722+
723+ # Create a mock response to "check_lease_changes()"
724+ fake_lease = make_random_lease()
725+ deferToThread = self.patch(lease_upload_service, 'deferToThread')
726+ deferToThread.return_value = defer.succeed(
727+ (datetime.now(), fake_lease),)
728+ mappings = convert_leases_to_mappings(fake_lease)
729+
730+ # Start the service.
731+ uuid = factory.make_UUID()
732+ service = PeriodicLeaseUploadService(rpc_service, Clock(), uuid)
733+ service.startService()
734+
735+ # Gavin says that I need to pump my IO. I don't know what this
736+ # means but it sounds important!
737+ io.pump()
738+
739+ # Ensure it called out to a new thread to get and parse the leases.
740+ self.assertThat(deferToThread, MockCalledOnceWith(check_lease_changes))
741+
742+ # Ensure it sent them to the region using RPC.
743+ self.assertThat(
744+ server.UpdateLeases,
745+ MockCalledOnceWith(ANY, uuid=uuid, mappings=mappings))
746+
747+ def test_logs_other_errors(self):
748+ service = PeriodicLeaseUploadService(
749+ sentinel.rpc, Clock(), sentinel.uuid)
750+
751+ maybe_start_upload = self.patch(service, "maybe_start_upload")
752+ maybe_start_upload.return_value = defer.fail(
753+ ZeroDivisionError("Such a shame I can't divide by zero"))
754+
755+ with FakeLogger("maas") as maaslog, TwistedLoggerFixture():
756+ d = service.try_upload()
757+
758+ self.assertEqual(None, extract_result(d))
759+ self.assertDocTestMatches(
760+ "Failed to upload leases: "
761+ "Such a shame I can't divide by zero",
762+ maaslog.output)
763
764=== modified file 'src/provisioningserver/rpc/region.py'
765--- src/provisioningserver/rpc/region.py 2014-08-25 17:19:56 +0000
766+++ src/provisioningserver/rpc/region.py 2014-08-28 03:19:24 +0000
767@@ -104,6 +104,22 @@
768 errors = []
769
770
771+class UpdateLeases(amp.Command):
772+ """Report DHCP leases on the invoking cluster controller.
773+
774+ :since: 1.7
775+ """
776+ arguments = [
777+ # The cluster UUID.
778+ (b"uuid", amp.Unicode()),
779+ (b"mappings", amp.AmpList(
780+ [(b"ip", amp.Unicode()),
781+ (b"mac", amp.Unicode())]))
782+ ]
783+ response = []
784+ errors = []
785+
786+
787 class GetProxies(amp.Command):
788 """Return the HTTP and HTTPS proxies to use.
789
790
791=== modified file 'src/provisioningserver/tasks.py'
792--- src/provisioningserver/tasks.py 2014-08-26 23:55:04 +0000
793+++ src/provisioningserver/tasks.py 2014-08-28 03:19:24 +0000
794@@ -51,7 +51,6 @@
795 restart_dhcpv4,
796 stop_dhcpv4,
797 )
798-from provisioningserver.dhcp.leases import upload_leases
799 from provisioningserver.dns.config import (
800 DNSConfig,
801 execute_rndc_command,
802@@ -321,18 +320,6 @@
803
804
805 @task
806-@log_task_events(level=logging.DEBUG)
807-@log_exception_text
808-def upload_dhcp_leases():
809- """Upload DHCP leases.
810-
811- Uploads leases to the MAAS API, using cached credentials -- the task
812- originates with celerybeat, not with a server request.
813- """
814- upload_leases()
815-
816-
817-@task
818 @log_task_events()
819 @log_exception_text
820 def add_new_dhcp_host_map(mappings, server_address, shared_key):
821
822=== modified file 'src/provisioningserver/testing/testcase.py'
823--- src/provisioningserver/testing/testcase.py 2013-10-07 09:12:40 +0000
824+++ src/provisioningserver/testing/testcase.py 2014-08-28 03:19:24 +0000
825@@ -25,6 +25,8 @@
826 record_nodegroup_uuid,
827 )
828 from provisioningserver.testing.worker_cache import WorkerCacheFixture
829+from twisted.internet import reactor
830+from twisted.python import threadable
831
832
833 class PservTestCase(testcase.MAASTestCase):
834@@ -52,3 +54,19 @@
835 self.set_maas_url()
836 self.set_api_credentials()
837 self.set_node_group_uuid()
838+
839+ def register_as_io_thread(self):
840+ """Make the current thread the IO thread.
841+
842+ When pretending to be the reactor, by using clocks and suchlike,
843+ register the current thread as the reactor thread, a.k.a. the IO
844+ thread, to ensure correct operation from things like the `synchronous`
845+ and `asynchronous` decorators.
846+
847+ Do not use this when the reactor is running.
848+ """
849+ self.assertFalse(
850+ reactor.running, "Do not use this to change the IO thread "
851+ "while the reactor is running.")
852+ self.addCleanup(setattr, threadable, "ioThread", threadable.ioThread)
853+ threadable.ioThread = threadable.getThreadID()
854
855=== modified file 'src/provisioningserver/tests/test_plugin.py'
856--- src/provisioningserver/tests/test_plugin.py 2014-08-27 03:38:28 +0000
857+++ src/provisioningserver/tests/test_plugin.py 2014-08-28 03:19:24 +0000
858@@ -114,7 +114,8 @@
859 service = service_maker.makeService(options)
860 self.assertIsInstance(service, MultiService)
861 self.assertSequenceEqual(
862- ["image_download", "log", "node_monitor", "oops", "rpc", "tftp"],
863+ ["image_download", "lease_upload", "log", "node_monitor", "oops",
864+ "rpc", "tftp"],
865 sorted(service.namedServices))
866 self.assertEqual(
867 len(service.namedServices), len(service.services),
868
869=== modified file 'src/provisioningserver/tests/test_tasks.py'
870--- src/provisioningserver/tests/test_tasks.py 2014-08-25 03:14:30 +0000
871+++ src/provisioningserver/tests/test_tasks.py 2014-08-28 03:19:24 +0000
872@@ -14,7 +14,6 @@
873 __metaclass__ = type
874 __all__ = []
875
876-from datetime import datetime
877 import json
878 import os
879 import random
880@@ -50,10 +49,7 @@
881 tasks,
882 )
883 from provisioningserver.boot import tftppath
884-from provisioningserver.dhcp import (
885- config,
886- leases,
887- )
888+from provisioningserver.dhcp import config
889 from provisioningserver.dhcp.testing.config import make_subnet_config
890 from provisioningserver.dns.config import (
891 celery_conf,
892@@ -195,14 +191,6 @@
893 'dhcp_subnets': [make_subnet_config()],
894 }
895
896- def test_upload_dhcp_leases(self):
897- self.patch(
898- leases, 'parse_leases_file',
899- Mock(return_value=(datetime.utcnow(), {})))
900- self.patch(leases, 'process_leases', Mock())
901- tasks.upload_dhcp_leases.delay()
902- self.assertEqual(1, leases.process_leases.call_count)
903-
904 def test_add_new_dhcp_host_map(self):
905 # We don't want to actually run omshell in the task, so we stub
906 # out the wrapper class's _run method and record what it would
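
Note on the UpdateLeases payload: the command above carries "mappings" as an AmpList of ip/mac pairs, and the description mentions code that translates between that RPC format and the format the existing lease-handling code expects. A minimal sketch of such a round trip follows. It assumes leases are held as a plain {ip: mac} dict, which the diff excerpt does not confirm, and the helper names are hypothetical, not the functions added by this branch.

```python
# Hypothetical helpers for illustration only. The names, and the assumption
# that leases are a {ip: mac} dict, are not taken from this branch.

def leases_to_mappings(leases):
    """Turn a {ip: mac} dict into a list of {"ip": ..., "mac": ...} dicts.

    This is the list-of-records shape that an AmpList argument with "ip"
    and "mac" fields would carry. Sorting only makes the output stable.
    """
    return [{"ip": ip, "mac": mac} for ip, mac in sorted(leases.items())]


def mappings_to_leases(mappings):
    """Inverse of leases_to_mappings: rebuild the {ip: mac} dict."""
    return {mapping["ip"]: mapping["mac"] for mapping in mappings}


leases = {
    "192.168.0.2": "aa:bb:cc:dd:ee:01",
    "192.168.0.1": "aa:bb:cc:dd:ee:02",
}
mappings = leases_to_mappings(leases)
```

Keeping the two conversions as pure functions means each side of the RPC can be tested without a reactor or a fake region, which would complement the fixture-based test in test_upload_is_initiated.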