Merge ~blake-rouse/maas:rack-ping-region into maas:master

Proposed by Blake Rouse
Status: Merged
Approved by: Blake Rouse
Approved revision: a5e703c0140cbbb4f9f59bbf2869a727c0770776
Merge reported by: MAAS Lander
Merged at revision: not available
Proposed branch: ~blake-rouse/maas:rack-ping-region
Merge into: maas:master
Prerequisite: ~blake-rouse/maas:dhcp-timeout
Diff against target: 276 lines (+160/-8)
5 files modified
src/provisioningserver/plugin.py (+9/-0)
src/provisioningserver/rpc/clusterservice.py (+54/-5)
src/provisioningserver/rpc/common.py (+20/-0)
src/provisioningserver/rpc/tests/test_clusterservice.py (+67/-1)
src/provisioningserver/tests/test_plugin.py (+10/-2)
Reviewer Review Type Date Requested Status
Andres Rodriguez (community) Approve
MAAS Lander Approve
Review via email: mp+343345@code.launchpad.net

Commit message

Ping the region over RPC every 30 seconds from the rack controller. If a ping fails or does not complete within 10 seconds, close the connection so that it can be re-established.

To post a comment you must log in.
Revision history for this message
MAAS Lander (maas-lander) wrote :

UNIT TESTS
-b rack-ping-region lp:~blake-rouse/maas/+git/maas into -b master lp:~maas-committers/maas

STATUS: SUCCESS
COMMIT: a5e703c0140cbbb4f9f59bbf2869a727c0770776

review: Approve
Revision history for this message
Andres Rodriguez (andreserl) wrote :

The code looks good to me, it is fairly straightforward.

review: Approve
Revision history for this message
Blake Rouse (blake-rouse) :
Revision history for this message
MAAS Lander (maas-lander) wrote :

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1diff --git a/src/provisioningserver/plugin.py b/src/provisioningserver/plugin.py
2index 031d53c..ba46e86 100644
3--- a/src/provisioningserver/plugin.py
4+++ b/src/provisioningserver/plugin.py
5@@ -134,6 +134,14 @@ class ProvisioningServiceMaker:
6 rpc_service.setName("rpc")
7 return rpc_service
8
9+ def _makeRPCPingService(self, rpc_service, clock=reactor):
10+ from provisioningserver.rpc.clusterservice import (
11+ ClusterClientCheckerService,
12+ )
13+ service = ClusterClientCheckerService(rpc_service, reactor)
14+ service.setName("rpc-ping")
15+ return service
16+
17 def _makeNetworksMonitoringService(self, rpc_service, clock=reactor):
18 from provisioningserver.rackdservices.networks_monitoring_service \
19 import RackNetworksMonitoringService
20@@ -167,6 +175,7 @@ class ProvisioningServiceMaker:
21 rpc_service = self._makeRPCService()
22 yield rpc_service
23 # Other services that make up the MAAS Region Controller.
24+ yield self._makeRPCPingService(rpc_service, clock=clock)
25 yield self._makeNetworksMonitoringService(rpc_service, clock=clock)
26 yield self._makeDHCPProbeService(rpc_service)
27 yield self._makeLeaseSocketService(rpc_service)
28diff --git a/src/provisioningserver/rpc/clusterservice.py b/src/provisioningserver/rpc/clusterservice.py
29index cd631c7..1ab1122 100644
30--- a/src/provisioningserver/rpc/clusterservice.py
31+++ b/src/provisioningserver/rpc/clusterservice.py
32@@ -62,7 +62,10 @@ from provisioningserver.rpc.boot_images import (
33 is_import_boot_images_running,
34 list_boot_images,
35 )
36-from provisioningserver.rpc.common import RPCProtocol
37+from provisioningserver.rpc.common import (
38+ Ping,
39+ RPCProtocol,
40+)
41 from provisioningserver.rpc.interfaces import IConnectionToRegion
42 from provisioningserver.rpc.osystems import (
43 gen_operating_systems,
44@@ -1162,10 +1165,12 @@ class ClusterClientService(TimerService, object):
45 # Fully connected to the region; update every so often.
46 return self.INTERVAL_HIGH
47
48- def _update_interval(self, num_eventloops, num_connections):
49+ def _update_interval(self, num_eventloops, num_connections, reset=False):
50 """Change the update interval."""
51 self._loop.interval = self.step = self._calculate_interval(
52 num_eventloops, num_connections)
53+ if reset and self._loop.running:
54+ self._loop.reset()
55
56 @inlineCallbacks
57 def _update_connections(self, eventloops):
58@@ -1337,6 +1342,50 @@ class ClusterClientService(TimerService, object):
59 "Lost all connections to region controllers. "
60 "Stopping service(s) %s." % ",".join(stopping_services))
61 service_monitor.ensureServices()
62- # Lower the interval so a re-check happens sooner instead of its
63- # currently set interval.
64- self._update_interval(0, 0)
65+ # Lower and reset the interval so a reconnection happens.
66+ self._update_interval(0, 0, reset=True)
67+
68+
69+class ClusterClientCheckerService(TimerService, object):
70+ """A cluster controller RPC client checker service.
71+
72+ This is a service - in the Twisted sense - that cordinates with the
73+ `ClusterClientService` to ensure that all RPC connections are functional.
74+ A ping is performed over each current connection to ensure that the
75+ connection is working properly. If connection is not operational then it
76+ is dropped allowing the `ClusterClientService` to make a new connection.
77+
78+ :ivar client_service: The `ClusterClientService` instance.
79+ """
80+
81+ def __init__(self, client_service, reactor):
82+ super(ClusterClientCheckerService, self).__init__(30, self.tryLoop)
83+ self.client_service = client_service
84+ self.clock = reactor
85+
86+ def tryLoop(self):
87+ d = self.loop()
88+ d.addErrback(
89+ log.err, "Failure while performing ping on RPC connections.")
90+ return d
91+
92+ def loop(self):
93+ return DeferredList([
94+ self._ping(client)
95+ for client in self.client_service.getAllClients()
96+ ], consumeErrors=True)
97+
98+ def _ping(self, client):
99+ """Ping the client to ensure it works."""
100+
101+ def _onFailure(failure):
102+ log.msg(
103+ "Failure on ping dropping connection to event-loop: %s" % (
104+ client.ident))
105+ # The protocol will call `remove_connection on the
106+ # `ClusterClientService` that will perform the reconnection.
107+ client._conn.transport.loseConnection()
108+
109+ d = client(Ping, _timeout=10)
110+ d.addErrback(_onFailure)
111+ return d
112diff --git a/src/provisioningserver/rpc/common.py b/src/provisioningserver/rpc/common.py
113index be168cf..6f74366 100644
114--- a/src/provisioningserver/rpc/common.py
115+++ b/src/provisioningserver/rpc/common.py
116@@ -79,6 +79,17 @@ class Authenticate(amp.Command):
117 errors = []
118
119
120+class Ping(amp.Command):
121+ """Ensure the connection is still good.
122+
123+ :since: 2.4
124+ """
125+
126+ arguments = []
127+ response = []
128+ errors = []
129+
130+
131 class Client:
132 """Wrapper around an :class:`amp.AMP` instance.
133
134@@ -283,3 +294,12 @@ class RPCProtocol(amp.AMP, object):
135 "Unhandled failure during AMP request. This is probably a bug. "
136 "Please ensure that this error is handled within application "
137 "code."))
138+
139+ @Ping.responder
140+ def ping(self):
141+ """ping()
142+
143+ Implementation of
144+ :py:class:`~provisioningserver.rpc.common.Ping`.
145+ """
146+ return {}
147diff --git a/src/provisioningserver/rpc/tests/test_clusterservice.py b/src/provisioningserver/rpc/tests/test_clusterservice.py
148index 1c9eb0f..2989a2c 100644
149--- a/src/provisioningserver/rpc/tests/test_clusterservice.py
150+++ b/src/provisioningserver/rpc/tests/test_clusterservice.py
151@@ -90,6 +90,7 @@ from provisioningserver.rpc import (
152 from provisioningserver.rpc.clusterservice import (
153 Cluster,
154 ClusterClient,
155+ ClusterClientCheckerService,
156 ClusterClientService,
157 executeScanNetworksSubprocess,
158 get_scan_all_networks_args,
159@@ -131,7 +132,10 @@ from testtools.matchers import (
160 )
161 from twisted import web
162 from twisted.application.internet import TimerService
163-from twisted.internet import error
164+from twisted.internet import (
165+ error,
166+ reactor,
167+)
168 from twisted.internet.defer import (
169 Deferred,
170 fail,
171@@ -1534,6 +1538,68 @@ class TestClusterClient(MAASTestCase):
172 version=get_maas_version()))
173
174
175+class TestClusterClientCheckerService(MAASTestCase):
176+
177+ run_tests_with = MAASTwistedRunTest.make_factory(timeout=5)
178+
179+ def make_client(self):
180+ client = Mock()
181+ client.return_value = succeed(None)
182+ return client
183+
184+ def test_init_sets_up_timer_correctly(self):
185+ service = ClusterClientCheckerService(
186+ sentinel.client_service, sentinel.clock)
187+ self.assertThat(service, MatchesStructure.byEquality(
188+ call=(service.tryLoop, (), {}),
189+ step=(30), client_service=sentinel.client_service,
190+ clock=sentinel.clock))
191+
192+ def test_tryLoop_calls_loop(self):
193+ service = ClusterClientCheckerService(
194+ sentinel.client_service, sentinel.clock)
195+ mock_loop = self.patch(service, "loop")
196+ mock_loop.return_value = succeed(None)
197+ service.tryLoop()
198+ self.assertThat(
199+ mock_loop,
200+ MockCalledOnceWith())
201+
202+ def test_loop_does_nothing_with_no_clients(self):
203+ mock_client_service = MagicMock()
204+ mock_client_service.getAllClients.return_value = []
205+ service = ClusterClientCheckerService(
206+ mock_client_service, reactor)
207+ # Test will timeout if this blocks longer than 5 seconds.
208+ return service.loop()
209+
210+ @inlineCallbacks
211+ def test_loop_calls_ping_for_each_client(self):
212+ clients = [
213+ self.make_client()
214+ for _ in range(3)
215+ ]
216+ mock_client_service = MagicMock()
217+ mock_client_service.getAllClients.return_value = clients
218+ service = ClusterClientCheckerService(
219+ mock_client_service, reactor)
220+ yield service.loop()
221+ for client in clients:
222+ self.expectThat(
223+ client, MockCalledOnceWith(common.Ping, _timeout=10))
224+
225+ @inlineCallbacks
226+ def test_ping_calls_loseConnection_on_failure(self):
227+ client = MagicMock()
228+ client.return_value = fail(factory.make_exception())
229+ mock_client_service = MagicMock()
230+ service = ClusterClientCheckerService(
231+ mock_client_service, reactor)
232+ yield service._ping(client)
233+ self.assertThat(
234+ client._conn.transport.loseConnection, MockCalledOnceWith())
235+
236+
237 class TestClusterProtocol_ListSupportedArchitectures(MAASTestCase):
238
239 run_tests_with = MAASTwistedRunTest.make_factory(timeout=5)
240diff --git a/src/provisioningserver/tests/test_plugin.py b/src/provisioningserver/tests/test_plugin.py
241index d62b765..caa9409 100644
242--- a/src/provisioningserver/tests/test_plugin.py
243+++ b/src/provisioningserver/tests/test_plugin.py
244@@ -45,6 +45,7 @@ from provisioningserver.rackdservices.tftp import (
245 TFTPService,
246 )
247 from provisioningserver.rackdservices.tftp_offload import TFTPOffloadService
248+from provisioningserver.rpc.clusterservice import ClusterClientCheckerService
249 from provisioningserver.testing.config import ClusterConfigurationFixture
250 from provisioningserver.utils.twisted import reducedWebLogFormatter
251 from testtools.matchers import (
252@@ -105,8 +106,8 @@ class TestProvisioningServiceMaker(MAASTestCase):
253 self.assertIsInstance(service, MultiService)
254 expected_services = [
255 "dhcp_probe", "networks_monitor", "image_download",
256- "lease_socket_service", "node_monitor", "ntp", "rpc", "tftp",
257- "image_service", "service_monitor",
258+ "lease_socket_service", "node_monitor", "ntp", "rpc", "rpc-ping",
259+ "tftp", "image_service", "service_monitor",
260 ]
261 self.assertThat(service.namedServices, KeysEqual(*expected_services))
262 self.assertEqual(
263@@ -177,6 +178,13 @@ class TestProvisioningServiceMaker(MAASTestCase):
264 service_monitor = service.getServiceNamed("service_monitor")
265 self.assertIsInstance(service_monitor, ServiceMonitorService)
266
267+ def test_rpc_ping_service(self):
268+ options = Options()
269+ service_maker = ProvisioningServiceMaker("Harry", "Hill")
270+ service = service_maker.makeService(options, clock=None)
271+ rpc_ping = service.getServiceNamed("rpc-ping")
272+ self.assertIsInstance(rpc_ping, ClusterClientCheckerService)
273+
274 def test_tftp_service(self):
275 # A TFTP service is configured and added to the top-level service.
276 options = Options()

Subscribers

People subscribed via source and target branches