Merge lp:~julian-edwards/maas/periodic-lease-upload-pserv into lp:~maas-committers/maas/trunk
| Status: | Merged |
|---|---|
| Approved by: | Julian Edwards |
| Approved revision: | no longer in the source branch. |
| Merged at revision: | 2830 |
| Proposed branch: | lp:~julian-edwards/maas/periodic-lease-upload-pserv |
| Merge into: | lp:~maas-committers/maas/trunk |
| Diff against target: | 906 lines (+558/-64), 15 files modified |
| To merge this branch: | bzr merge lp:~julian-edwards/maas/periodic-lease-upload-pserv |
| Related bugs: | |

Files modified:
- docs/development/lease-scanning-and-dns.rst (+4/-10)
- etc/celeryconfig_cluster.py (+0/-8)
- src/maasserver/rpc/leases.py (+45/-0)
- src/maasserver/rpc/regionservice.py (+10/-0)
- src/maasserver/rpc/tests/test_regionservice.py (+40/-0)
- src/provisioningserver/dhcp/leases.py (+18/-8)
- src/provisioningserver/dhcp/tests/test_leases.py (+11/-11)
- src/provisioningserver/plugin.py (+13/-0)
- src/provisioningserver/pserv_services/lease_upload_service.py (+145/-0)
- src/provisioningserver/pserv_services/tests/test_lease_upload_service.py (+235/-0)
- src/provisioningserver/rpc/region.py (+16/-0)
- src/provisioningserver/tasks.py (+0/-13)
- src/provisioningserver/testing/testcase.py (+18/-0)
- src/provisioningserver/tests/test_plugin.py (+2/-1)
- src/provisioningserver/tests/test_tasks.py (+1/-13)
| Reviewer | Review Type | Date Requested | Status |
|---|---|---|---|
| Gavin Panella (community) | | | Approve |
Commit message
Add a pserv service that replaces the Celery task that periodically uploads DHCP leases.
Description of the change
This branch contains everything needed to remove the old Celery task to upload leases, and replace it with a pserv-based one.
Roughly, the following changes were made:
* Remove all trace of the Celery config, code and tests
* Update dev documentation
* RPC command definition for UpdateLeases call
* Code to translate between the RPC format and the format needed by the existing code that does the actual updating.
* New pserv service.
* Tests. Lots of tests!
Sorry it's on the large side, but it is a complete cut-over. I've tested that it works on my local rig.
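As a rough illustration of the translation step listed above, the sketch below mirrors the two helpers this branch adds in `lease_upload_service.py`: the `UpdateLeases` call carries a list of `{"ip": ..., "mac": ...}` dicts, while the existing lease code works with a dict mapping each IP to its MAC. The sample addresses are made up for the example.

```python
def convert_mappings_to_leases(mappings):
    """Turn a list of {"ip": ..., "mac": ...} dicts into an {ip: mac} dict."""
    return {mapping["ip"]: mapping["mac"] for mapping in mappings}


def convert_leases_to_mappings(leases):
    """Turn an {ip: mac} dict into a list of {"ip": ..., "mac": ...} dicts."""
    return [{"ip": ip, "mac": mac} for ip, mac in leases.items()]


# Made-up sample data, for illustration only.
leases = {
    "192.168.0.10": "00:16:3e:aa:bb:01",
    "192.168.0.11": "00:16:3e:aa:bb:02",
}
mappings = convert_leases_to_mappings(leases)
round_tripped = convert_mappings_to_leases(mappings)
```

Converting in one direction and back again should give the original data, which is also what the helper tests in this branch check.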
Gavin Panella (allenap) wrote:
Julian Edwards (julian-edwards) wrote:
Thanks for the comments!
I tried your fixture suggestion in the last test but I can't get it to work. Can you take a look please?
Take this branch and apply this diff:
http://
I get this output:
=======
FAIL: provisioningser
-------
_StringException: Failed expectation: {{{
File "/home/
fail_count, Equals(0), "Unfired and/or unhandled "
File "/usr/lib/
postfix_
MismatchError: 0 != 1: Unfired and/or unhandled EventualResult(s); see test details.
}}}
Unfired/unhandled EventualResult #1: {{{
*** EventualResult has not fired:
<crochet.
*** It was connected to a Deferred:
<Deferred at 0x7f9e58fd35a8>
}}}
traceback-1: {{{
Traceback (most recent call last):
AssertionError: Forced Test Failure
}}}
Traceback (most recent call last):
File "/home/
MockCalledO
File "/usr/lib/
raise mismatch_error
MismatchError: Expected to be called once. Called 0 times.
-------
twisted: INFO: AMPTestProtocol#1 connection established (HOST:<
twisted: INFO: ClusterClient connection established (HOST:<
-------
Gavin Panella (allenap) wrote:
Julian Edwards (julian-edwards) wrote:
This is now all working on my test rig. I still can't get your modified tests to pass though, so I'll get a review and land without the changes as you're off today, and then we can look at it again when you're back.
Julian Edwards (julian-edwards) wrote:
I made a couple more changes:
- Catch service errors and log them — the code may look somewhat familiar to you :)
- moved the code to the new pserv_services dir
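Roughly, the error handling has the shape sketched below. This is a plain-Python analogue using the standard `logging` module, not the Twisted errback used in the branch; the logger names and the `failing_upload` function are hypothetical and exist only for the example.

```python
import logging

# Hypothetical loggers: one stands in for the Twisted log (full detail),
# the other for the maas log (concise message).
full_log = logging.getLogger("sketch.full")
concise_log = logging.getLogger("sketch.concise")


def try_upload(upload):
    """Run upload(); log any error instead of letting it propagate.

    Returns True if the upload succeeded, False if an error was logged.
    """
    try:
        upload()
    except Exception as error:
        # Full traceback to one log, a one-line summary to the other.
        full_log.exception("Lease upload failed")
        concise_log.error("Failed to upload leases: %s", error)
        return False
    return True


def failing_upload():
    # Hypothetical upload that always fails, for demonstration.
    raise RuntimeError("region unreachable")


result_ok = try_upload(lambda: None)
result_failed = try_upload(failing_upload)
```

The point of the wrapper is that a failed upload is recorded in both logs and the caller carries on, rather than the error escaping from the periodic call.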
Julian Edwards (julian-edwards) wrote:
Thanks for reviewing! I'm going to leave the lock for now, see my comment. It's easily changed later if needed.
Julian Edwards (julian-edwards) wrote:
Tested again on my local rig and it seems fine, so landing now.
Preview Diff
1 | === modified file 'docs/development/lease-scanning-and-dns.rst' |
2 | --- docs/development/lease-scanning-and-dns.rst 2013-09-13 04:08:22 +0000 |
3 | +++ docs/development/lease-scanning-and-dns.rst 2014-08-28 03:19:24 +0000 |
4 | @@ -20,16 +20,10 @@ |
5 | =============== |
6 | |
7 | MAAS will periodically scan the DHCP leases file using the |
8 | -``upload_dhcp_leases()`` celery task, which originates via celerybeat. |
9 | - |
10 | -As leases are discovered, it calls the api function ``update_leases()`` which |
11 | -will subsequently generate a new task called ``add_new_dhcp_host_map()`` that |
12 | -will use omshell to write a permanent mapping from MAC to IP in the DHCP |
13 | -server. |
14 | - |
15 | -That host map remains until the node is deleted, at which point the |
16 | -``remove_dhcp_host_map()`` task is invoked which uses omshell to remove the |
17 | -map. |
18 | +``PeriodicLeaseUploadService()`` pserv service. |
19 | + |
20 | +As leases are discovered, it calls the RPC function ``UpdateLeases`` which |
21 | +stores the active leases in the DHCPLease table. |
22 | |
23 | |
24 | Updating the DNS zone file |
25 | |
26 | === modified file 'etc/celeryconfig_cluster.py' |
27 | --- etc/celeryconfig_cluster.py 2014-08-26 23:55:04 +0000 |
28 | +++ etc/celeryconfig_cluster.py 2014-08-28 03:19:24 +0000 |
29 | @@ -40,14 +40,6 @@ |
30 | PROBE_DHCP_SERVERS_SCHEDULE = timedelta(minutes=1) |
31 | |
32 | CELERYBEAT_SCHEDULE = { |
33 | - 'unconditional-dhcp-lease-upload': { |
34 | - 'task': 'provisioningserver.tasks.upload_dhcp_leases', |
35 | - 'schedule': DHCP_LEASE_UPLOAD_SCHEDULE, |
36 | - 'options': { |
37 | - 'queue': CLUSTER_UUID, |
38 | - 'expires': int(DHCP_LEASE_UPLOAD_SCHEDULE.total_seconds()), |
39 | - }, |
40 | - }, |
41 | 'report-boot-images': { |
42 | 'task': 'provisioningserver.tasks.report_boot_images', |
43 | 'schedule': REPORT_BOOT_IMAGES_SCHEDULE, |
44 | |
45 | === added file 'src/maasserver/rpc/leases.py' |
46 | --- src/maasserver/rpc/leases.py 1970-01-01 00:00:00 +0000 |
47 | +++ src/maasserver/rpc/leases.py 2014-08-28 03:19:24 +0000 |
48 | @@ -0,0 +1,45 @@ |
49 | +# Copyright 2014 Canonical Ltd. This software is licensed under the |
50 | +# GNU Affero General Public License version 3 (see the file LICENSE). |
51 | + |
52 | +"""RPC helpers relating to DHCP leases.""" |
53 | + |
54 | +from __future__ import ( |
55 | + absolute_import, |
56 | + print_function, |
57 | + unicode_literals, |
58 | + ) |
59 | + |
60 | +str = None |
61 | + |
62 | +__metaclass__ = type |
63 | +__all__ = [ |
64 | + "update_leases", |
65 | +] |
66 | + |
67 | +from django.shortcuts import get_object_or_404 |
68 | +from maasserver.models.dhcplease import DHCPLease |
69 | +from maasserver.models.nodegroup import NodeGroup |
70 | +from maasserver.utils.async import transactional |
71 | +from provisioningserver.pserv_services.lease_upload_service import ( |
72 | + convert_mappings_to_leases, |
73 | + ) |
74 | +from provisioningserver.utils.twisted import synchronous |
75 | + |
76 | + |
77 | +@synchronous |
78 | +@transactional |
79 | +def update_leases(uuid, mappings): |
80 | + """Updates DHCP leases on a cluster given the mappings in UpdateLeases. |
81 | + |
82 | + :param uuid: Cluster UUID as found in |
83 | + :py:class`~provisioningserver.rpc.region.UpdateLeases`. |
84 | + :param mappings: List of pairs of (ip, mac) as defined in |
85 | + :py:class`~provisioningserver.rpc.region.UpdateLeases`. |
86 | + |
87 | + Converts the mappings format into a dict that |
88 | + DHCPLease.objects.update_leases needs and then calls it. |
89 | + """ |
90 | + nodegroup = get_object_or_404(NodeGroup, uuid=uuid) |
91 | + leases = convert_mappings_to_leases(mappings) |
92 | + DHCPLease.objects.update_leases(nodegroup, leases) |
93 | + return {} |
94 | |
95 | === modified file 'src/maasserver/rpc/regionservice.py' |
96 | --- src/maasserver/rpc/regionservice.py 2014-08-26 14:29:13 +0000 |
97 | +++ src/maasserver/rpc/regionservice.py 2014-08-28 03:19:24 +0000 |
98 | @@ -34,6 +34,7 @@ |
99 | bootsources, |
100 | configuration, |
101 | events, |
102 | + leases, |
103 | nodes, |
104 | ) |
105 | from maasserver.utils import synchronised |
106 | @@ -99,6 +100,15 @@ |
107 | """ |
108 | return {} |
109 | |
110 | + @region.UpdateLeases.responder |
111 | + def update_leases(self, uuid, mappings): |
112 | + """update_leases(uuid, mappings) |
113 | + |
114 | + Implementation of |
115 | + :py:class`~provisioningserver.rpc.region.UpdateLeases`. |
116 | + """ |
117 | + return deferToThread(leases.update_leases, uuid, mappings) |
118 | + |
119 | @amp.StartTLS.responder |
120 | def get_tls_parameters(self): |
121 | """get_tls_parameters() |
122 | |
123 | === modified file 'src/maasserver/rpc/tests/test_regionservice.py' |
124 | --- src/maasserver/rpc/tests/test_regionservice.py 2014-08-26 17:32:40 +0000 |
125 | +++ src/maasserver/rpc/tests/test_regionservice.py 2014-08-28 03:19:24 +0000 |
126 | @@ -34,6 +34,7 @@ |
127 | POWER_STATE, |
128 | ) |
129 | from maasserver.models.config import Config |
130 | +from maasserver.models.dhcplease import DHCPLease |
131 | from maasserver.models.event import Event |
132 | from maasserver.models.eventtype import EventType |
133 | from maasserver.models.node import Node |
134 | @@ -80,6 +81,7 @@ |
135 | RegisterEventType, |
136 | ReportBootImages, |
137 | SendEvent, |
138 | + UpdateLeases, |
139 | UpdateNodePowerState, |
140 | ) |
141 | from provisioningserver.rpc.testing import ( |
142 | @@ -234,6 +236,44 @@ |
143 | return d.addCallback(check) |
144 | |
145 | |
146 | +class TestRegionProtocol_UpdateLeases(TransactionTestCase): |
147 | + |
148 | + def test_update_leases_is_registered(self): |
149 | + protocol = Region() |
150 | + responder = protocol.locateResponder(UpdateLeases.commandName) |
151 | + self.assertIsNot(responder, None) |
152 | + |
153 | + @transactional |
154 | + def make_node_group(self, uuid): |
155 | + return factory.make_node_group(uuid=uuid) |
156 | + |
157 | + @transactional |
158 | + def get_leases_for(self, nodegroup): |
159 | + return [ |
160 | + (ng.ip, ng.mac) |
161 | + for ng in DHCPLease.objects.filter(nodegroup=nodegroup)] |
162 | + |
163 | + @wait_for_reactor |
164 | + @inlineCallbacks |
165 | + def test__stores_leases(self): |
166 | + uuid = factory.make_name("uuid") |
167 | + nodegroup = yield deferToThread(self.make_node_group, uuid) |
168 | + mapping = { |
169 | + "ip": factory.getRandomIPAddress(), |
170 | + "mac": factory.getRandomMACAddress() |
171 | + } |
172 | + |
173 | + response = yield call_responder(Region(), UpdateLeases, { |
174 | + b"uuid": uuid, b"mappings": [mapping]}) |
175 | + |
176 | + self.assertThat(response, Equals({})) |
177 | + |
178 | + [(ip, mac)] = yield deferToThread( |
179 | + self.get_leases_for, nodegroup=nodegroup) |
180 | + self.expectThat(ip, Equals(mapping["ip"])) |
181 | + self.expectThat(mac, Equals(mapping["mac"])) |
182 | + |
183 | + |
184 | class TestRegionProtocol_GetBootSources(TransactionTestCase): |
185 | |
186 | def test_get_boot_sources_is_registered(self): |
187 | |
188 | === modified file 'src/provisioningserver/dhcp/leases.py' |
189 | --- src/provisioningserver/dhcp/leases.py 2014-08-13 21:49:35 +0000 |
190 | +++ src/provisioningserver/dhcp/leases.py 2014-08-28 03:19:24 +0000 |
191 | @@ -34,6 +34,7 @@ |
192 | ] |
193 | |
194 | |
195 | +from collections import defaultdict |
196 | import errno |
197 | import json |
198 | from os import ( |
199 | @@ -46,8 +47,6 @@ |
200 | MAASDispatcher, |
201 | MAASOAuth, |
202 | ) |
203 | -from celery.app import app_or_default |
204 | -from provisioningserver import cache |
205 | from provisioningserver.auth import ( |
206 | get_recorded_api_credentials, |
207 | get_recorded_nodegroup_uuid, |
208 | @@ -59,6 +58,12 @@ |
209 | |
210 | maaslog = get_maas_logger("dhcp.leases") |
211 | |
212 | +# This used to be the cache in provisioningserver.cache, but that |
213 | +# unfortunately makes Twisted fail in ways we can't work out, probably |
214 | +# because of its use of multiprocessing. Instead it's now using a |
215 | +# simple dict, because there are no plans to make pserv multi-process. |
216 | +cache = defaultdict() |
217 | + |
218 | |
219 | # Cache key for the modification time on last-processed leases file. |
220 | LEASES_TIME_CACHE_KEY = 'leases_time' |
221 | @@ -69,8 +74,13 @@ |
222 | |
223 | |
224 | def get_leases_file(): |
225 | - """Get the location of the DHCP leases file from the config.""" |
226 | - return app_or_default().conf.DHCP_LEASES_FILE |
227 | + """Return the location of the DHCP leases file.""" |
228 | + # This used to be celery config-based so that the development env could |
229 | + # have a different location. However, nobody seems to be |
230 | + # provisioning from a dev environment so it's hard-coded until that |
231 | + # need arises, as converting to the pserv config would be wasted |
232 | + # work right now. |
233 | + return "/var/lib/maas/dhcp/dhcpd.leases" |
234 | |
235 | |
236 | def get_leases_timestamp(): |
237 | @@ -115,8 +125,8 @@ |
238 | # These variables are shared between worker threads/processes. |
239 | # A bit of inconsistency due to concurrent updates is not a problem, |
240 | # but read them both at once here to reduce the scope for trouble. |
241 | - previous_leases = cache.cache.get(LEASES_CACHE_KEY) |
242 | - previous_leases_time = cache.cache.get(LEASES_TIME_CACHE_KEY) |
243 | + previous_leases = cache.get(LEASES_CACHE_KEY) |
244 | + previous_leases_time = cache.get(LEASES_TIME_CACHE_KEY) |
245 | |
246 | if get_leases_timestamp() == previous_leases_time: |
247 | return None |
248 | @@ -139,8 +149,8 @@ |
249 | :param leases: A dict mapping each leased IP address to the MAC address |
250 | that it has been assigned to. |
251 | """ |
252 | - cache.cache.set(LEASES_TIME_CACHE_KEY, last_change) |
253 | - cache.cache.set(LEASES_CACHE_KEY, leases) |
254 | + cache[LEASES_TIME_CACHE_KEY] = last_change |
255 | + cache[LEASES_CACHE_KEY] = leases |
256 | |
257 | |
258 | def list_missing_items(knowledge): |
259 | |
260 | === modified file 'src/provisioningserver/dhcp/tests/test_leases.py' |
261 | --- src/provisioningserver/dhcp/tests/test_leases.py 2014-07-18 17:05:57 +0000 |
262 | +++ src/provisioningserver/dhcp/tests/test_leases.py 2014-08-28 03:19:24 +0000 |
263 | @@ -30,10 +30,10 @@ |
264 | get_write_time, |
265 | ) |
266 | from mock import Mock |
267 | -from provisioningserver import cache |
268 | from provisioningserver.auth import NODEGROUP_UUID_CACHE_KEY |
269 | from provisioningserver.dhcp import leases as leases_module |
270 | from provisioningserver.dhcp.leases import ( |
271 | + cache, |
272 | check_lease_changes, |
273 | LEASES_CACHE_KEY, |
274 | LEASES_TIME_CACHE_KEY, |
275 | @@ -56,8 +56,8 @@ |
276 | record_lease_state(time, leases) |
277 | self.assertEqual( |
278 | (time, leases), ( |
279 | - cache.cache.get(LEASES_TIME_CACHE_KEY), |
280 | - cache.cache.get(LEASES_CACHE_KEY), |
281 | + cache.get(LEASES_TIME_CACHE_KEY), |
282 | + cache.get(LEASES_CACHE_KEY), |
283 | )) |
284 | |
285 | |
286 | @@ -115,12 +115,12 @@ |
287 | def set_nodegroup_uuid(self): |
288 | """Set the recorded nodegroup uuid for the duration of this test.""" |
289 | uuid = factory.make_UUID() |
290 | - cache.cache.set(NODEGROUP_UUID_CACHE_KEY, uuid) |
291 | + cache[NODEGROUP_UUID_CACHE_KEY] = uuid |
292 | return uuid |
293 | |
294 | def clear_nodegroup_uuid(self): |
295 | """Clear the recorded nodegroup uuid.""" |
296 | - cache.cache.set(NODEGROUP_UUID_CACHE_KEY, None) |
297 | + cache[NODEGROUP_UUID_CACHE_KEY] = None |
298 | |
299 | def set_maas_url(self): |
300 | """Set the recorded MAAS URL for the duration of this test.""" |
301 | @@ -138,11 +138,11 @@ |
302 | """Set recorded API credentials for the duration of this test.""" |
303 | creds_string = ':'.join( |
304 | factory.make_string() for counter in range(3)) |
305 | - cache.cache.set('api_credentials', creds_string) |
306 | + cache['api_credentials'] = creds_string |
307 | |
308 | def clear_api_credentials(self): |
309 | """Clear recorded API credentials.""" |
310 | - cache.cache.set('api_credentials', None) |
311 | + cache['api_credentials'] = None |
312 | |
313 | def set_items_needed_for_lease_update(self): |
314 | """Set the recorded items required by `update_leases`.""" |
315 | @@ -157,8 +157,8 @@ |
316 | state so that it gets reset at the end of the test. Using this will |
317 | prevent recorded lease state from leaking into other tests. |
318 | """ |
319 | - cache.cache.set(LEASES_TIME_CACHE_KEY, time) |
320 | - cache.cache.set(LEASES_CACHE_KEY, leases) |
321 | + cache[LEASES_TIME_CACHE_KEY] = time |
322 | + cache[LEASES_CACHE_KEY] = leases |
323 | |
324 | def test_record_lease_state_sets_leases_and_timestamp(self): |
325 | time = datetime.utcnow() |
326 | @@ -167,8 +167,8 @@ |
327 | record_lease_state(time, leases) |
328 | self.assertEqual( |
329 | (time, leases), ( |
330 | - cache.cache.get(LEASES_TIME_CACHE_KEY), |
331 | - cache.cache.get(LEASES_CACHE_KEY), |
332 | + cache.get(LEASES_TIME_CACHE_KEY), |
333 | + cache.get(LEASES_CACHE_KEY), |
334 | )) |
335 | |
336 | def test_check_lease_changes_returns_tuple_if_no_state_cached(self): |
337 | |
338 | === modified file 'src/provisioningserver/plugin.py' |
339 | --- src/provisioningserver/plugin.py 2014-08-27 03:38:28 +0000 |
340 | +++ src/provisioningserver/plugin.py 2014-08-28 03:19:24 +0000 |
341 | @@ -34,6 +34,9 @@ |
342 | from provisioningserver.pserv_services.image_download_service import ( |
343 | PeriodicImageDownloadService, |
344 | ) |
345 | +from provisioningserver.pserv_services.lease_upload_service import ( |
346 | + PeriodicLeaseUploadService, |
347 | + ) |
348 | from provisioningserver.pserv_services.node_power_monitor_service import ( |
349 | NodePowerMonitorService, |
350 | ) |
351 | @@ -232,6 +235,12 @@ |
352 | image_download_service.setName("image_download") |
353 | return image_download_service |
354 | |
355 | + def _makePeriodicLeaseUploadService(self, rpc_service): |
356 | + lease_upload_service = PeriodicLeaseUploadService( |
357 | + rpc_service, reactor, get_cluster_uuid()) |
358 | + lease_upload_service.setName("lease_upload") |
359 | + return lease_upload_service |
360 | + |
361 | def _makeNodePowerMonitorService(self, rpc_service): |
362 | node_monitor = NodePowerMonitorService( |
363 | rpc_service, reactor, get_cluster_uuid()) |
364 | @@ -267,4 +276,8 @@ |
365 | rpc_service) |
366 | image_download_service.setServiceParent(services) |
367 | |
368 | + lease_upload_service = self._makePeriodicLeaseUploadService( |
369 | + rpc_service) |
370 | + lease_upload_service.setServiceParent(services) |
371 | + |
372 | return services |
373 | |
374 | === added file 'src/provisioningserver/pserv_services/lease_upload_service.py' |
375 | --- src/provisioningserver/pserv_services/lease_upload_service.py 1970-01-01 00:00:00 +0000 |
376 | +++ src/provisioningserver/pserv_services/lease_upload_service.py 2014-08-28 03:19:24 +0000 |
377 | @@ -0,0 +1,145 @@ |
378 | +# Copyright 2014 Canonical Ltd. This software is licensed under the |
379 | +# GNU Affero General Public License version 3 (see the file LICENSE). |
380 | + |
381 | +"""Twisted service that periodically uploads DHCP leases to the region.""" |
382 | + |
383 | +from __future__ import ( |
384 | + absolute_import, |
385 | + print_function, |
386 | + unicode_literals, |
387 | + ) |
388 | + |
389 | +str = None |
390 | + |
391 | +__metaclass__ = type |
392 | +__all__ = [ |
393 | + "convert_leases_to_mappings", |
394 | + "convert_mappings_to_leases", |
395 | + "PeriodicLeaseUploadService", |
396 | + ] |
397 | + |
398 | + |
399 | +from provisioningserver.dhcp.leases import ( |
400 | + check_lease_changes, |
401 | + record_lease_state, |
402 | + ) |
403 | +from provisioningserver.logger import get_maas_logger |
404 | +from provisioningserver.rpc.exceptions import NoConnectionsAvailable |
405 | +from provisioningserver.rpc.region import UpdateLeases |
406 | +from provisioningserver.utils.twisted import ( |
407 | + pause, |
408 | + retries, |
409 | + ) |
410 | +from twisted.application.internet import TimerService |
411 | +from twisted.internet.defer import ( |
412 | + DeferredLock, |
413 | + inlineCallbacks, |
414 | + ) |
415 | +from twisted.internet.threads import deferToThread |
416 | +from twisted.python import log |
417 | + |
418 | + |
419 | +maaslog = get_maas_logger("lease_upload_service") |
420 | +service_lock = DeferredLock() |
421 | + |
422 | + |
423 | +def convert_mappings_to_leases(mappings): |
424 | + """Convert AMP mappings to record_lease_state() leases. |
425 | + |
426 | + Take mappings, as used by UpdateLeases, and turn into leases |
427 | + as used by record_lease_state(). |
428 | + """ |
429 | + return { |
430 | + mapping["ip"]: mapping["mac"] |
431 | + for mapping in mappings |
432 | + } |
433 | + |
434 | + |
435 | +def convert_leases_to_mappings(leases): |
436 | + """Convert record_lease_state() leases into UpdateLeases mappings. |
437 | + |
438 | + Take the leases dict, as returned by record_lease_state(), and |
439 | + turn it into a mappings list suitable for transportation in |
440 | + the UpdateLeases AMP command. |
441 | + """ |
442 | + return [ |
443 | + {"ip": ip, "mac": leases[ip]} |
444 | + for ip in leases |
445 | + ] |
446 | + |
447 | + |
448 | +class PeriodicLeaseUploadService(TimerService, object): |
449 | + """Twisted service to periodically upload DHCP leases to the region. |
450 | + |
451 | + :param client_service: A `ClusterClientService` instance for talking |
452 | + to the region controller. |
453 | + :param reactor: An `IReactor` instance. |
454 | + """ |
455 | + |
456 | + check_interval = 60 # In seconds. |
457 | + |
458 | + def __init__(self, client_service, reactor, cluster_uuid): |
459 | + # Call self.maybe_start_upload() every self.check_interval. |
460 | + super(PeriodicLeaseUploadService, self).__init__( |
461 | + self.check_interval, self.try_upload) |
462 | + self.clock = reactor |
463 | + self.client_service = client_service |
464 | + self.uuid = cluster_uuid |
465 | + maaslog.info("PeriodicLeaseUploadService starting.") |
466 | + |
467 | + def try_upload(self): |
468 | + """Wrap upload attempts in something that catches Failures. |
469 | + |
470 | + Log the full error to the Twisted log, and a concise error to |
471 | + the maas log. |
472 | + """ |
473 | + def upload_failure(failure): |
474 | + log.err(failure) |
475 | + maaslog.error( |
476 | + "Failed to upload leases: %s", failure.getErrorMessage()) |
477 | + |
478 | + return self.maybe_start_upload().addErrback(upload_failure) |
479 | + |
480 | + @inlineCallbacks |
481 | + def maybe_start_upload(self): |
482 | + """Initiate a new upload if one is not already in progress.""" |
483 | + # Use a DeferredLock to prevent simultaneous uploads. |
484 | + if service_lock.locked: |
485 | + # Don't want to block on lock release. |
486 | + return |
487 | + yield service_lock.acquire() |
488 | + try: |
489 | + yield self._get_client_and_start_upload() |
490 | + finally: |
491 | + service_lock.release() |
492 | + |
493 | + @inlineCallbacks |
494 | + def _get_client_and_start_upload(self): |
495 | + # Retry a few times, since this service usually comes up before |
496 | + # the RPC service. |
497 | + for elapsed, remaining, wait in retries(15, 5, self.clock): |
498 | + try: |
499 | + client = self.client_service.getClient() |
500 | + break |
501 | + except NoConnectionsAvailable: |
502 | + yield pause(wait, clock=self.clock) |
503 | + else: |
504 | + maaslog.error( |
505 | + "Failed to connect to region controller, cannot upload leases") |
506 | + return |
507 | + yield self._start_upload(client) |
508 | + |
509 | + @inlineCallbacks |
510 | + def _start_upload(self, client): |
511 | + maaslog.info("Scanning DHCP leases...") |
512 | + updated_lease_info = yield deferToThread(check_lease_changes) |
513 | + if updated_lease_info is None: |
514 | + maaslog.info("No leases changed since last scan") |
515 | + else: |
516 | + timestamp, leases = updated_lease_info |
517 | + record_lease_state(timestamp, leases) |
518 | + mappings = convert_leases_to_mappings(leases) |
519 | + maaslog.info("Uploading DHCP leases to region controller.") |
520 | + yield client( |
521 | + UpdateLeases, uuid=self.uuid, mappings=mappings) |
522 | + maaslog.info("Lease upload complete.") |
523 | |
524 | === added file 'src/provisioningserver/pserv_services/tests/test_lease_upload_service.py' |
525 | --- src/provisioningserver/pserv_services/tests/test_lease_upload_service.py 1970-01-01 00:00:00 +0000 |
526 | +++ src/provisioningserver/pserv_services/tests/test_lease_upload_service.py 2014-08-28 03:19:24 +0000 |
527 | @@ -0,0 +1,235 @@ |
528 | +# Copyright 2014 Canonical Ltd. This software is licensed under the |
529 | +# GNU Affero General Public License version 3 (see the file LICENSE). |
530 | + |
531 | +"""Tests for src/provisioningserver/pserv_services/lease_upload_service.py""" |
532 | + |
533 | +from __future__ import ( |
534 | + absolute_import, |
535 | + print_function, |
536 | + unicode_literals, |
537 | + ) |
538 | + |
539 | +str = None |
540 | + |
541 | +__metaclass__ = type |
542 | +__all__ = [] |
543 | + |
544 | +from datetime import datetime |
545 | + |
546 | +from fixtures import FakeLogger |
547 | +from maastesting.factory import factory |
548 | +from maastesting.matchers import ( |
549 | + MockCalledOnceWith, |
550 | + MockCallsMatch, |
551 | + MockNotCalled, |
552 | + ) |
553 | +from mock import ( |
554 | + ANY, |
555 | + call, |
556 | + Mock, |
557 | + sentinel, |
558 | + ) |
559 | +from provisioningserver import services |
560 | +from provisioningserver.dhcp.leases import check_lease_changes |
561 | +from provisioningserver.pserv_services import lease_upload_service |
562 | +from provisioningserver.pserv_services.lease_upload_service import ( |
563 | + convert_leases_to_mappings, |
564 | + convert_mappings_to_leases, |
565 | + PeriodicLeaseUploadService, |
566 | + service_lock, |
567 | + ) |
568 | +from provisioningserver.rpc.exceptions import NoConnectionsAvailable |
569 | +from provisioningserver.rpc.region import UpdateLeases |
570 | +from provisioningserver.rpc.testing import ( |
571 | + MockClusterToRegionRPCFixture, |
572 | + TwistedLoggerFixture, |
573 | + ) |
574 | +from provisioningserver.testing.testcase import PservTestCase |
575 | +from provisioningserver.utils.twisted import pause |
576 | +from testtools.deferredruntest import extract_result |
577 | +from twisted.application.internet import TimerService |
578 | +from twisted.internet import defer |
579 | +from twisted.internet.task import Clock |
580 | + |
581 | + |
582 | +def make_random_lease(): |
583 | + ip = factory.getRandomIPAddress() |
584 | + mac = factory.getRandomMACAddress() |
585 | + return {ip: mac} |
586 | + |
587 | + |
588 | +def make_random_mapping(): |
589 | + ip = factory.getRandomIPAddress() |
590 | + mac = factory.getRandomMACAddress() |
591 | + mapping = {"ip": ip, "mac": mac} |
592 | + return mapping |
593 | + |
594 | + |
595 | +class TestHelperFunctions(PservTestCase): |
596 | + |
597 | + def test_convert_leases_to_mappings_maps_correctly(self): |
598 | + mappings = list() |
599 | + for _ in xrange(3): |
600 | + mappings.append(make_random_mapping()) |
601 | + |
602 | + # Convert to leases. |
603 | + leases = convert_mappings_to_leases(mappings) |
604 | + # Convert back and test against our original mappings. |
605 | + observed = convert_leases_to_mappings(leases) |
606 | + self.assertItemsEqual(mappings, observed) |
607 | + |
608 | + def test_convert_leases_to_mappings_converts_correctly(self): |
609 | + leases = dict() |
610 | + for _ in xrange(3): |
611 | + leases.update(make_random_lease()) |
612 | + |
613 | + # Convert to mappings. |
614 | + mappings = convert_leases_to_mappings(leases) |
615 | + # Convert back and test against our original leases. |
616 | + observed = convert_mappings_to_leases(mappings) |
617 | + self.assertEqual(observed, leases) |
618 | + |
619 | + |
620 | +class TestPeriodicImageDownloadService(PservTestCase): |
621 | + |
622 | + def test_init(self): |
623 | + service = PeriodicLeaseUploadService( |
624 | + sentinel.service, sentinel.clock, sentinel.uuid) |
625 | + self.assertIsInstance(service, TimerService) |
626 | + self.assertIs(service.clock, sentinel.clock) |
627 | + self.assertIs(service.uuid, sentinel.uuid) |
628 | + self.assertIs(service.client_service, sentinel.service) |
629 | + |
630 | + def patch_upload(self, service, return_value=None): |
631 | + patched = self.patch(service, '_get_client_and_start_upload') |
632 | + patched.return_value = defer.succeed(return_value) |
633 | + return patched |
634 | + |
635 | + def test_is_called_every_interval(self): |
636 | + clock = Clock() |
637 | + service = PeriodicLeaseUploadService( |
638 | + sentinel.service, clock, sentinel.uuid) |
639 | + # Avoid actual uploads: |
640 | + start_upload = self.patch_upload(service) |
641 | + |
642 | + # There are no calls before the service is started. |
643 | + self.assertThat(start_upload, MockNotCalled()) |
644 | + |
645 | + service.startService() |
646 | + |
647 | + # The first call is issued at startup. |
648 | + self.assertThat(start_upload, MockCalledOnceWith()) |
649 | + |
650 | + # Wind clock forward one second less than the desired interval. |
651 | + clock.advance(service.check_interval - 1) |
652 | + # No more periodic calls made. |
653 | + self.assertThat(start_upload, MockCalledOnceWith()) |
654 | + |
655 | + # Wind clock forward one second, past the interval. |
656 | + clock.advance(1) |
657 | + |
658 | + # Now there were two calls. |
659 | + self.assertThat(start_upload, MockCallsMatch(call(), call())) |
660 | + |
661 | + # Forward another interval, should be three calls. |
662 | + clock.advance(service.check_interval) |
663 | + self.assertThat( |
664 | + start_upload, MockCallsMatch(call(), call(), call())) |
665 | + |
666 | + @defer.inlineCallbacks |
667 | + def test_does_not_run_if_lock_taken(self): |
668 | + service = PeriodicLeaseUploadService( |
669 | + sentinel.rpc, Clock(), sentinel.uuid) |
670 | + start_upload = self.patch_upload(service) |
671 | + yield service_lock.acquire() |
672 | + self.addCleanup(service_lock.release) |
673 | + service.startService() |
674 | + self.assertThat(start_upload, MockNotCalled()) |
675 | + |
676 | + def test_takes_lock_when_running(self): |
677 | + clock = Clock() |
678 | + service = PeriodicLeaseUploadService( |
679 | + sentinel.rpc, clock, sentinel.uuid) |
680 | + |
681 | + # Patch the upload func so it's just a Deferred that waits for |
682 | + # one second. |
683 | + _start_upload = self.patch(service, '_get_client_and_start_upload') |
684 | + _start_upload.return_value = pause(1, clock) |
685 | + |
686 | + # Lock is acquired for the first download after startup. |
687 | + self.assertFalse(service_lock.locked) |
688 | + service.startService() |
689 | + self.assertTrue(service_lock.locked) |
690 | + |
691 | + # Lock is released once the download is done. |
692 | + clock.advance(1) |
693 | + self.assertFalse(service_lock.locked) |
694 | + |
695 | + def test_no_upload_if_no_rpc_connections(self): |
696 | + rpc_client = Mock() |
697 | + rpc_client.getClient.side_effect = NoConnectionsAvailable() |
698 | + |
699 | + clock = Clock() |
700 | + service = PeriodicLeaseUploadService( |
701 | + rpc_client, clock, sentinel.uuid) |
702 | + start_upload = self.patch(service, '_start_upload') |
703 | + service.startService() |
704 | + # Wind clock past all the retries. You can't do this in one big |
705 | + # lump, it seems. The test looks like it passes, but the |
706 | + # maybe_start_upload() method never returns properly which |
707 | + # leaves the service_lock locked. |
708 | + clock.pump((5, 5, 5)) |
709 | + self.assertThat(start_upload, MockNotCalled()) |
710 | + |
711 | + def test_upload_is_initiated(self): |
712 | + # We're pretending to be the reactor in this thread. To ensure correct |
713 | + # operation from things like the @asynchronous decorators we need to |
714 | + # register as the IO thread. |
715 | + self.register_as_io_thread() |
716 | + |
717 | + # Create a fixture for the region side of the RPC. |
718 | + rpc_fixture = self.useFixture(MockClusterToRegionRPCFixture()) |
719 | + rpc_service = services.getServiceNamed('rpc') |
720 | + server, io = rpc_fixture.makeEventLoop(UpdateLeases) |
721 | + server.UpdateLeases.return_value = defer.succeed({}) |
722 | + |
723 | + # Create a mock response to "check_lease_changes()" |
724 | + fake_lease = make_random_lease() |
725 | + deferToThread = self.patch(lease_upload_service, 'deferToThread') |
726 | + deferToThread.return_value = defer.succeed( |
727 | + (datetime.now(), fake_lease),) |
728 | + mappings = convert_leases_to_mappings(fake_lease) |
729 | + |
730 | + # Start the service. |
731 | + uuid = factory.make_UUID() |
732 | + service = PeriodicLeaseUploadService(rpc_service, Clock(), uuid) |
733 | + service.startService() |
734 | + |
735 | + # Gavin says that I need to pump my IO. I don't know what this |
736 | + # means but it sounds important! |
737 | + io.pump() |
738 | + |
739 | + # Ensure it called out to a new thread to get and parse the leases. |
740 | + self.assertThat(deferToThread, MockCalledOnceWith(check_lease_changes)) |
741 | + |
742 | + # Ensure it sent them to the region using RPC. |
743 | + self.assertThat( |
744 | + server.UpdateLeases, |
745 | + MockCalledOnceWith(ANY, uuid=uuid, mappings=mappings)) |
746 | + |
747 | + def test_logs_other_errors(self): |
748 | + service = PeriodicLeaseUploadService( |
749 | + sentinel.rpc, Clock(), sentinel.uuid) |
750 | + |
751 | + maybe_start_upload = self.patch(service, "maybe_start_upload") |
752 | + maybe_start_upload.return_value = defer.fail( |
753 | + ZeroDivisionError("Such a shame I can't divide by zero")) |
754 | + |
755 | + with FakeLogger("maas") as maaslog, TwistedLoggerFixture(): |
756 | + d = service.try_upload() |
757 | + |
758 | + self.assertEqual(None, extract_result(d)) |
759 | + self.assertDocTestMatches( |
760 | + "Failed to upload leases: " |
761 | + "Such a shame I can't divide by zero", |
762 | + maaslog.output) |
763 | |
764 | === modified file 'src/provisioningserver/rpc/region.py' |
765 | --- src/provisioningserver/rpc/region.py 2014-08-25 17:19:56 +0000 |
766 | +++ src/provisioningserver/rpc/region.py 2014-08-28 03:19:24 +0000 |
767 | @@ -104,6 +104,22 @@ |
768 | errors = [] |
769 | |
770 | |
771 | +class UpdateLeases(amp.Command): |
772 | + """Report DHCP leases on the invoking cluster controller. |
773 | + |
774 | + :since: 1.7 |
775 | + """ |
776 | + arguments = [ |
777 | + # The cluster UUID. |
778 | + (b"uuid", amp.Unicode()), |
779 | + (b"mappings", amp.AmpList( |
780 | + [(b"ip", amp.Unicode()), |
781 | + (b"mac", amp.Unicode())])) |
782 | + ] |
783 | + response = [] |
784 | + errors = [] |
785 | + |
786 | + |
787 | class GetProxies(amp.Command): |
788 | """Return the HTTP and HTTPS proxies to use. |
789 | |
790 | |
791 | === modified file 'src/provisioningserver/tasks.py' |
792 | --- src/provisioningserver/tasks.py 2014-08-26 23:55:04 +0000 |
793 | +++ src/provisioningserver/tasks.py 2014-08-28 03:19:24 +0000 |
794 | @@ -51,7 +51,6 @@ |
795 | restart_dhcpv4, |
796 | stop_dhcpv4, |
797 | ) |
798 | -from provisioningserver.dhcp.leases import upload_leases |
799 | from provisioningserver.dns.config import ( |
800 | DNSConfig, |
801 | execute_rndc_command, |
802 | @@ -321,18 +320,6 @@ |
803 | |
804 | |
805 | @task |
806 | -@log_task_events(level=logging.DEBUG) |
807 | -@log_exception_text |
808 | -def upload_dhcp_leases(): |
809 | - """Upload DHCP leases. |
810 | - |
811 | - Uploads leases to the MAAS API, using cached credentials -- the task |
812 | - originates with celerybeat, not with a server request. |
813 | - """ |
814 | - upload_leases() |
815 | - |
816 | - |
817 | -@task |
818 | @log_task_events() |
819 | @log_exception_text |
820 | def add_new_dhcp_host_map(mappings, server_address, shared_key): |
821 | |
822 | === modified file 'src/provisioningserver/testing/testcase.py' |
823 | --- src/provisioningserver/testing/testcase.py 2013-10-07 09:12:40 +0000 |
824 | +++ src/provisioningserver/testing/testcase.py 2014-08-28 03:19:24 +0000 |
825 | @@ -25,6 +25,8 @@ |
826 | record_nodegroup_uuid, |
827 | ) |
828 | from provisioningserver.testing.worker_cache import WorkerCacheFixture |
829 | +from twisted.internet import reactor |
830 | +from twisted.python import threadable |
831 | |
832 | |
833 | class PservTestCase(testcase.MAASTestCase): |
834 | @@ -52,3 +54,19 @@ |
835 | self.set_maas_url() |
836 | self.set_api_credentials() |
837 | self.set_node_group_uuid() |
838 | + |
839 | + def register_as_io_thread(self): |
840 | + """Make the current thread the IO thread. |
841 | + |
842 | + When pretending to be the reactor, by using clocks and suchlike, |
843 | + register the current thread as the reactor thread, a.k.a. the IO |
844 | + thread, to ensure correct operation from things like the `synchronous` |
845 | + and `asynchronous` decorators. |
846 | + |
847 | + Do not use this when the reactor is running. |
848 | + """ |
849 | + self.assertFalse( |
850 | + reactor.running, "Do not use this to change the IO thread " |
851 | + "while the reactor is running.") |
852 | + self.addCleanup(setattr, threadable, "ioThread", threadable.ioThread) |
853 | + threadable.ioThread = threadable.getThreadID() |
854 | |
855 | === modified file 'src/provisioningserver/tests/test_plugin.py' |
856 | --- src/provisioningserver/tests/test_plugin.py 2014-08-27 03:38:28 +0000 |
857 | +++ src/provisioningserver/tests/test_plugin.py 2014-08-28 03:19:24 +0000 |
858 | @@ -114,7 +114,8 @@ |
859 | service = service_maker.makeService(options) |
860 | self.assertIsInstance(service, MultiService) |
861 | self.assertSequenceEqual( |
862 | - ["image_download", "log", "node_monitor", "oops", "rpc", "tftp"], |
863 | + ["image_download", "lease_upload", "log", "node_monitor", "oops", |
864 | + "rpc", "tftp"], |
865 | sorted(service.namedServices)) |
866 | self.assertEqual( |
867 | len(service.namedServices), len(service.services), |
868 | |
869 | === modified file 'src/provisioningserver/tests/test_tasks.py' |
870 | --- src/provisioningserver/tests/test_tasks.py 2014-08-25 03:14:30 +0000 |
871 | +++ src/provisioningserver/tests/test_tasks.py 2014-08-28 03:19:24 +0000 |
872 | @@ -14,7 +14,6 @@ |
873 | __metaclass__ = type |
874 | __all__ = [] |
875 | |
876 | -from datetime import datetime |
877 | import json |
878 | import os |
879 | import random |
880 | @@ -50,10 +49,7 @@ |
881 | tasks, |
882 | ) |
883 | from provisioningserver.boot import tftppath |
884 | -from provisioningserver.dhcp import ( |
885 | - config, |
886 | - leases, |
887 | - ) |
888 | +from provisioningserver.dhcp import config |
889 | from provisioningserver.dhcp.testing.config import make_subnet_config |
890 | from provisioningserver.dns.config import ( |
891 | celery_conf, |
892 | @@ -195,14 +191,6 @@ |
893 | 'dhcp_subnets': [make_subnet_config()], |
894 | } |
895 | |
896 | - def test_upload_dhcp_leases(self): |
897 | - self.patch( |
898 | - leases, 'parse_leases_file', |
899 | - Mock(return_value=(datetime.utcnow(), {}))) |
900 | - self.patch(leases, 'process_leases', Mock()) |
901 | - tasks.upload_dhcp_leases.delay() |
902 | - self.assertEqual(1, leases.process_leases.call_count) |
903 | - |
904 | def test_add_new_dhcp_host_map(self): |
905 | # We don't want to actually run omshell in the task, so we stub |
906 | # out the wrapper class's _run method and record what it would |
Looking good so far :) Lots of comments, but none of them are blockers.