Merge lp:~julian-edwards/maas/cluster-timer-service into lp:~maas-committers/maas/trunk

Proposed by Julian Edwards
Status: Merged
Approved by: Raphaël Badin
Approved revision: no longer in the source branch.
Merged at revision: 2895
Proposed branch: lp:~julian-edwards/maas/cluster-timer-service
Merge into: lp:~maas-committers/maas/trunk
Diff against target: 355 lines (+276/-5)
5 files modified
src/provisioningserver/rpc/cluster.py (+4/-5)
src/provisioningserver/rpc/clusterservice.py (+14/-0)
src/provisioningserver/rpc/tests/test_clusterservice.py (+48/-0)
src/provisioningserver/rpc/tests/test_timers.py (+133/-0)
src/provisioningserver/rpc/timers.py (+77/-0)
To merge this branch: bzr merge lp:~julian-edwards/maas/cluster-timer-service
Reviewer Review Type Date Requested Status
Raphaël Badin (community) Approve
Review via email: mp+233324@code.launchpad.net

Commit message

Add a timer service on the cluster that the region can start over RPC; the region receives an RPC callback if it does not cancel a timer before its deadline is reached.

To post a comment you must log in.
Revision history for this message
Raphaël Badin (rvb) wrote :

Looks good.

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'src/provisioningserver/rpc/cluster.py'
2--- src/provisioningserver/rpc/cluster.py 2014-09-04 04:40:11 +0000
3+++ src/provisioningserver/rpc/cluster.py 2014-09-04 09:43:13 +0000
4@@ -320,15 +320,14 @@
5 errors = []
6
7
8-class CancelTimers(amp.Command):
9- """Cancels existing timer(s) on the cluster.
10+class CancelTimer(amp.Command):
11+ """Cancels an existing timer on the cluster.
12
13 :since: 1.7
14 """
15
16 arguments = [
17- (b"timers", amp.AmpList(
18- (b"id", amp.Unicode()),
19- ))]
20+ (b"id", amp.Unicode()),
21+ ]
22 response = []
23 error = []
24
25=== modified file 'src/provisioningserver/rpc/clusterservice.py'
26--- src/provisioningserver/rpc/clusterservice.py 2014-09-02 02:48:11 +0000
27+++ src/provisioningserver/rpc/clusterservice.py 2014-09-04 09:43:13 +0000
28@@ -54,6 +54,10 @@
29 change_power_state,
30 get_power_state,
31 )
32+from provisioningserver.rpc.timers import (
33+ cancel_timer,
34+ start_timers,
35+ )
36 from twisted.application.internet import TimerService
37 from twisted.internet import ssl
38 from twisted.internet.defer import inlineCallbacks
39@@ -197,6 +201,16 @@
40 remove_host_maps(ip_addresses, shared_key)
41 return {}
42
43+ @cluster.StartTimers.responder
44+ def start_timers(self, timers):
45+ start_timers(timers)
46+ return {}
47+
48+ @cluster.CancelTimer.responder
49+ def cancel_timer(self, id):
50+ cancel_timer(id)
51+ return {}
52+
53 @amp.StartTLS.responder
54 def get_tls_parameters(self):
55 """get_tls_parameters()
56
57=== modified file 'src/provisioningserver/rpc/tests/test_clusterservice.py'
58--- src/provisioningserver/rpc/tests/test_clusterservice.py 2014-09-02 09:30:04 +0000
59+++ src/provisioningserver/rpc/tests/test_clusterservice.py 2014-09-04 09:43:13 +0000
60@@ -14,6 +14,10 @@
61 __metaclass__ = type
62 __all__ = []
63
64+from datetime import (
65+ datetime,
66+ timedelta,
67+ )
68 from itertools import product
69 import json
70 import os.path
71@@ -73,9 +77,14 @@
72 DummyConnection,
73 StubOS,
74 )
75+from provisioningserver.rpc.timers import (
76+ cancel_timer,
77+ running_timers,
78+ )
79 from provisioningserver.testing.config import set_tftp_root
80 from testtools import ExpectedException
81 from testtools.matchers import (
82+ Contains,
83 Equals,
84 HasLength,
85 Is,
86@@ -84,6 +93,7 @@
87 MatchesAll,
88 MatchesListwise,
89 MatchesStructure,
90+ Not,
91 )
92 from twisted.application.internet import TimerService
93 from twisted.internet import error
94@@ -1052,3 +1062,41 @@
95 self.assertThat(
96 remove_host_maps, MockCalledOnceWith(
97 ip_addresses, shared_key))
98+
99+
100+class TestClusterProtocol_StartTimers(MAASTestCase):
101+
102+ def test__is_registered(self):
103+ protocol = Cluster()
104+ responder = protocol.locateResponder(
105+ cluster.StartTimers.commandName)
106+ self.assertIsNot(responder, None)
107+
108+ def test__executes_start_timers(self):
109+ deadline = datetime.now(amp.utc) + timedelta(seconds=10)
110+ timers = [{
111+ "deadline": deadline, "context": factory.make_name("ctx"),
112+ "id": factory.make_name("id")}]
113+ d = call_responder(Cluster(), cluster.StartTimers, {"timers": timers})
114+ self.addCleanup(cancel_timer, timers[0]["id"])
115+ self.assertTrue(d.called)
116+ self.assertThat(running_timers, Contains(timers[0]["id"]))
117+
118+
119+class TestClusterProtocol_CancelTimer(MAASTestCase):
120+
121+ def test__is_registered(self):
122+ protocol = Cluster()
123+ responder = protocol.locateResponder(
124+ cluster.CancelTimer.commandName)
125+ self.assertIsNot(responder, None)
126+
127+ def test__executes_cancel_timer(self):
128+ deadline = datetime.now(amp.utc) + timedelta(seconds=10)
129+ timers = [{
130+ "deadline": deadline, "context": factory.make_name("ctx"),
131+ "id": factory.make_name("id")}]
132+ call_responder(Cluster(), cluster.StartTimers, {"timers": timers})
133+
134+ call_responder(Cluster(), cluster.CancelTimer, {"id": timers[0]["id"]})
135+ self.assertThat(running_timers, Not(Contains(timers[0]["id"])))
136
137=== added file 'src/provisioningserver/rpc/tests/test_timers.py'
138--- src/provisioningserver/rpc/tests/test_timers.py 1970-01-01 00:00:00 +0000
139+++ src/provisioningserver/rpc/tests/test_timers.py 2014-09-04 09:43:13 +0000
140@@ -0,0 +1,133 @@
141+# Copyright 2014 Canonical Ltd. This software is licensed under the
142+# GNU Affero General Public License version 3 (see the file LICENSE).
143+
144+"""Tests for :py:mod:`~provisioningserver.rpc.timers`."""
145+
146+from __future__ import (
147+ absolute_import,
148+ print_function,
149+ unicode_literals,
150+ )
151+
152+str = None
153+
154+__metaclass__ = type
155+__all__ = []
156+
157+from datetime import (
158+ datetime,
159+ timedelta,
160+ )
161+
162+from maastesting.factory import factory
163+from maastesting.matchers import MockCalledOnceWith
164+from mock import Mock
165+from provisioningserver.rpc import timers as timers_module
166+from provisioningserver.rpc.region import TimerExpired
167+from provisioningserver.rpc.timers import (
168+ cancel_timer,
169+ running_timers,
170+ start_timers,
171+ )
172+from provisioningserver.testing.testcase import PservTestCase
173+from testtools.matchers import (
174+ Contains,
175+ Equals,
176+ HasLength,
177+ IsInstance,
178+ Not,
179+ )
180+from twisted.internet.base import DelayedCall
181+from twisted.internet.task import Clock
182+from twisted.protocols import amp
183+
184+
185+def make_timers(time_now=None):
186+ """Make some StartTimers, set to go off one second apart starting in
187+ one second"""
188+ if time_now is None:
189+ time_now = datetime.now(amp.utc)
190+ timers = []
191+ for i in xrange(2):
192+ timers.append({
193+ "deadline": time_now + timedelta(seconds=i + 1),
194+ "context": factory.make_name("context"),
195+ "id": factory.make_name("id"),
196+ })
197+ return timers
198+
199+
200+class TestStartTimers(PservTestCase):
201+ """Tests for `~provisioningserver.rpc.timers.start_timers`."""
202+
203+ def test__sets_up_running_timers(self):
204+ clock = Clock()
205+ timers = make_timers()
206+ start_timers(timers, clock)
207+
208+ self.expectThat(running_timers, HasLength(len(timers)))
209+ for timer in timers:
210+ id = timer["id"]
211+ self.expectThat(running_timers[id], IsInstance(tuple))
212+ delayed_call, context = running_timers[id]
213+ self.expectThat(delayed_call, IsInstance(DelayedCall))
214+ self.expectThat(context, Equals(timer["context"]))
215+
216+ def test__removes_from_running_timers_when_timer_expires(self):
217+ self.patch(timers_module, "getRegionClient")
218+ clock = Clock()
219+ timers = make_timers()
220+ start_timers(timers, clock)
221+
222+ # Expire the first timer.
223+ clock.advance(1)
224+ self.assertThat(running_timers, Not(Contains(timers[0]["id"])))
225+ self.assertThat(running_timers, Contains(timers[1]["id"]))
226+
227+ # Expire the other timer.
228+ clock.advance(1)
229+ self.assertThat(running_timers, Not(Contains(timers[1]["id"])))
230+
231+ def test__calls_TimerExpired_when_timer_expires(self):
232+ getRegionClient = self.patch(timers_module, "getRegionClient")
233+ client = Mock()
234+ getRegionClient.return_value = client
235+ clock = Clock()
236+ timers = make_timers()
237+ # Just use the first one for this test.
238+ timer = timers[0]
239+ start_timers([timer], clock)
240+ clock.advance(1)
241+
242+ self.assertThat(
243+ client,
244+ MockCalledOnceWith(
245+ TimerExpired, id=timer["id"],
246+ context=timer["context"]))
247+
248+
249+class TestCancelTimer(PservTestCase):
250+ """Tests for `~provisioningserver.rpc.timers.cancel_timer`."""
251+
252+ def test__cancels_running_timer(self):
253+ timers = make_timers()
254+ clock = Clock()
255+ start_timers(timers, clock)
256+ dc, _ = running_timers[timers[0]["id"]]
257+
258+ cancel_timer(timers[0]["id"])
259+
260+ self.expectThat(running_timers, Not(Contains(timers[0]["id"])))
261+ self.expectThat(running_timers, Contains(timers[1]["id"]))
262+ self.assertTrue(dc.cancelled)
263+
264+ def test__silently_ignores_already_cancelled_timer(self):
265+ timers = make_timers()
266+ clock = Clock()
267+ self.addCleanup(running_timers.clear)
268+ start_timers(timers, clock)
269+
270+ cancel_timer(factory.make_string())
271+
272+ self.expectThat(running_timers, Contains(timers[0]["id"]))
273+ self.expectThat(running_timers, Contains(timers[1]["id"]))
274
275=== added file 'src/provisioningserver/rpc/timers.py'
276--- src/provisioningserver/rpc/timers.py 1970-01-01 00:00:00 +0000
277+++ src/provisioningserver/rpc/timers.py 2014-09-04 09:43:13 +0000
278@@ -0,0 +1,77 @@
279+# Copyright 2014 Canonical Ltd. This software is licensed under the
280+# GNU Affero General Public License version 3 (see the file LICENSE).
281+
282+"""RPC helpers for timers."""
283+
284+from __future__ import (
285+ absolute_import,
286+ print_function,
287+ unicode_literals,
288+ )
289+
290+str = None
291+
292+__metaclass__ = type
293+__all__ = [
294+ "cancel_timer",
295+ "start_timers",
296+]
297+
298+from datetime import datetime
299+
300+from provisioningserver.logger import get_maas_logger
301+from provisioningserver.rpc import getRegionClient
302+from provisioningserver.rpc.exceptions import NoConnectionsAvailable
303+from provisioningserver.rpc.region import TimerExpired
304+from twisted.internet import reactor
305+from twisted.protocols import amp
306+
307+
308+maaslog = get_maas_logger("timers")
309+
310+
311+# Currently running timers: a dict mapping each timer ID to a
312+# (delayed_call, context) pair.
313+running_timers = dict()
314+
315+
316+def start_timers(timers, clock=reactor):
317+ """RPC responder to start timers as specified.
318+
319+ :param timers: a `StartTimers` message.
320+
321+ Creates one delayed call for each timer; if a timer reaches its
322+ deadline, `TimerExpired` is called in the region, passing back the
323+ timer ID and context.
324+ """
325+ for timer in timers:
326+ delay = timer["deadline"] - datetime.now(amp.utc)
327+ timer_id = timer["id"]
328+ call = clock.callLater(delay.total_seconds(), timer_expired, timer_id)
329+ running_timers[timer_id] = (call, timer["context"])
330+
331+
332+def timer_expired(timer_id):
333+ """Called when a timer hits its deadline.
334+
335+ Call TimerExpired with the context for the timer.
336+ """
337+ _, context = running_timers.pop(timer_id)
338+ try:
339+ client = getRegionClient()
340+ except NoConnectionsAvailable:
341+ maaslog.error(
342+ "Lost connection to the region, unable to fire timer with ID: %s",
343+ timer_id)
344+ return None
345+
346+ return client(TimerExpired, id=timer_id, context=context)
347+
348+
349+def cancel_timer(timer_id):
350+ """Called from the region to cancel a running timer."""
351+ try:
352+ dc, _ = running_timers.pop(timer_id)
353+ except KeyError:
354+ return
355+ dc.cancel()