Merge ~blake-rouse/maas:rack-ping-region into maas:master

Proposed by Blake Rouse
Status: Merged
Approved by: Blake Rouse
Approved revision: a5e703c0140cbbb4f9f59bbf2869a727c0770776
Merge reported by: MAAS Lander
Merged at revision: not available
Proposed branch: ~blake-rouse/maas:rack-ping-region
Merge into: maas:master
Prerequisite: ~blake-rouse/maas:dhcp-timeout
Diff against target: 276 lines (+160/-8)
5 files modified
src/provisioningserver/plugin.py (+9/-0)
src/provisioningserver/rpc/clusterservice.py (+54/-5)
src/provisioningserver/rpc/common.py (+20/-0)
src/provisioningserver/rpc/tests/test_clusterservice.py (+67/-1)
src/provisioningserver/tests/test_plugin.py (+10/-2)
Reviewer Review Type Date Requested Status
Andres Rodriguez (community) Approve
MAAS Lander Approve
Review via email: mp+343345@code.launchpad.net

Commit message

Ping the region through RPC every 30 seconds from the rack controller. If a ping fails, or gets no response within 10 seconds, close the connection.

To post a comment you must log in.
Revision history for this message
MAAS Lander (maas-lander) wrote :

UNIT TESTS
-b rack-ping-region lp:~blake-rouse/maas/+git/maas into -b master lp:~maas-committers/maas

STATUS: SUCCESS
COMMIT: a5e703c0140cbbb4f9f59bbf2869a727c0770776

review: Approve
Revision history for this message
Andres Rodriguez (andreserl) wrote :

The code looks good to me; it is fairly straightforward.

review: Approve
Revision history for this message
Blake Rouse (blake-rouse) :
Revision history for this message
MAAS Lander (maas-lander) wrote :

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
diff --git a/src/provisioningserver/plugin.py b/src/provisioningserver/plugin.py
index 031d53c..ba46e86 100644
--- a/src/provisioningserver/plugin.py
+++ b/src/provisioningserver/plugin.py
@@ -134,6 +134,14 @@ class ProvisioningServiceMaker:
134 rpc_service.setName("rpc")134 rpc_service.setName("rpc")
135 return rpc_service135 return rpc_service
136136
def _makeRPCPingService(self, rpc_service, clock=reactor):
    """Create the service that pings the region over each RPC connection.

    :param rpc_service: The RPC client service (`ClusterClientService`)
        whose connections are checked.
    :param clock: Reactor/clock used to drive the ping timer.
    :return: A `ClusterClientCheckerService` named "rpc-ping".
    """
    from provisioningserver.rpc.clusterservice import (
        ClusterClientCheckerService,
    )
    # Honour the supplied clock instead of the global reactor, exactly as
    # the other service factories do, so callers (and tests) control the
    # timer driving the pings.
    service = ClusterClientCheckerService(rpc_service, clock)
    service.setName("rpc-ping")
    return service
144
137 def _makeNetworksMonitoringService(self, rpc_service, clock=reactor):145 def _makeNetworksMonitoringService(self, rpc_service, clock=reactor):
138 from provisioningserver.rackdservices.networks_monitoring_service \146 from provisioningserver.rackdservices.networks_monitoring_service \
139 import RackNetworksMonitoringService147 import RackNetworksMonitoringService
@@ -167,6 +175,7 @@ class ProvisioningServiceMaker:
167 rpc_service = self._makeRPCService()175 rpc_service = self._makeRPCService()
168 yield rpc_service176 yield rpc_service
169 # Other services that make up the MAAS Region Controller.177 # Other services that make up the MAAS Region Controller.
178 yield self._makeRPCPingService(rpc_service, clock=clock)
170 yield self._makeNetworksMonitoringService(rpc_service, clock=clock)179 yield self._makeNetworksMonitoringService(rpc_service, clock=clock)
171 yield self._makeDHCPProbeService(rpc_service)180 yield self._makeDHCPProbeService(rpc_service)
172 yield self._makeLeaseSocketService(rpc_service)181 yield self._makeLeaseSocketService(rpc_service)
diff --git a/src/provisioningserver/rpc/clusterservice.py b/src/provisioningserver/rpc/clusterservice.py
index cd631c7..1ab1122 100644
--- a/src/provisioningserver/rpc/clusterservice.py
+++ b/src/provisioningserver/rpc/clusterservice.py
@@ -62,7 +62,10 @@ from provisioningserver.rpc.boot_images import (
62 is_import_boot_images_running,62 is_import_boot_images_running,
63 list_boot_images,63 list_boot_images,
64)64)
65from provisioningserver.rpc.common import RPCProtocol65from provisioningserver.rpc.common import (
66 Ping,
67 RPCProtocol,
68)
66from provisioningserver.rpc.interfaces import IConnectionToRegion69from provisioningserver.rpc.interfaces import IConnectionToRegion
67from provisioningserver.rpc.osystems import (70from provisioningserver.rpc.osystems import (
68 gen_operating_systems,71 gen_operating_systems,
@@ -1162,10 +1165,12 @@ class ClusterClientService(TimerService, object):
1162 # Fully connected to the region; update every so often.1165 # Fully connected to the region; update every so often.
1163 return self.INTERVAL_HIGH1166 return self.INTERVAL_HIGH
11641167
1165 def _update_interval(self, num_eventloops, num_connections):1168 def _update_interval(self, num_eventloops, num_connections, reset=False):
1166 """Change the update interval."""1169 """Change the update interval."""
1167 self._loop.interval = self.step = self._calculate_interval(1170 self._loop.interval = self.step = self._calculate_interval(
1168 num_eventloops, num_connections)1171 num_eventloops, num_connections)
# New: when reset=True and the loop is running, also reset the LoopingCall's
# timer so the newly calculated interval is counted from now instead of
# waiting for the previously scheduled iteration.
1172 if reset and self._loop.running:
1173 self._loop.reset()
11691174
1170 @inlineCallbacks1175 @inlineCallbacks
1171 def _update_connections(self, eventloops):1176 def _update_connections(self, eventloops):
@@ -1337,6 +1342,50 @@ class ClusterClientService(TimerService, object):
1337 "Lost all connections to region controllers. "1342 "Lost all connections to region controllers. "
1338 "Stopping service(s) %s." % ",".join(stopping_services))1343 "Stopping service(s) %s." % ",".join(stopping_services))
1339 service_monitor.ensureServices()1344 service_monitor.ensureServices()
1340 # Lower the interval so a re-check happens sooner instead of its1345 # Lower and reset the interval so a reconnection happens.
1341 # currently set interval.1346 self._update_interval(0, 0, reset=True)
1342 self._update_interval(0, 0)1347
1348
class ClusterClientCheckerService(TimerService, object):
    """A cluster controller RPC client checker service.

    This is a service - in the Twisted sense - that coordinates with the
    `ClusterClientService` to ensure that all RPC connections are functional.
    A ping is performed over each current connection to ensure that the
    connection is working properly. If a connection is not operational then
    it is dropped, allowing the `ClusterClientService` to make a new
    connection.

    :ivar client_service: The `ClusterClientService` instance.
    :ivar ping_timeout: Seconds to wait for a `Ping` response before the
        connection is considered broken.
    """

    # Seconds between rounds of pings.
    INTERVAL = 30

    # Seconds to wait for a single `Ping` to be answered.
    PING_TIMEOUT = 10

    def __init__(
            self, client_service, reactor,
            interval=INTERVAL, ping_timeout=PING_TIMEOUT):
        super(ClusterClientCheckerService, self).__init__(
            interval, self.tryLoop)
        self.client_service = client_service
        self.clock = reactor
        self.ping_timeout = ping_timeout

    def tryLoop(self):
        """Run one round of pings, logging any unexpected failure."""
        d = self.loop()
        d.addErrback(
            log.err, "Failure while performing ping on RPC connections.")
        return d

    def loop(self):
        """Ping every current client concurrently.

        :return: A `DeferredList` that fires once every ping has completed
            (errors are consumed, so individual failures do not propagate).
        """
        return DeferredList([
            self._ping(client)
            for client in self.client_service.getAllClients()
        ], consumeErrors=True)

    def _ping(self, client):
        """Ping `client`, dropping its connection if the ping fails.

        :return: A `Deferred` firing with the `Ping` response on success, or
            with `None` after the connection has been dropped on failure.
        """

        def _onFailure(failure):
            # Include the failure's message so dropped connections can be
            # diagnosed from the log.
            log.msg(
                "Failure on ping dropping connection to event-loop: "
                "%s (%s)" % (client.ident, failure.getErrorMessage()))
            # The protocol will call `remove_connection` on the
            # `ClusterClientService` that will perform the reconnection.
            client._conn.transport.loseConnection()

        d = client(Ping, _timeout=self.ping_timeout)
        d.addErrback(_onFailure)
        return d
diff --git a/src/provisioningserver/rpc/common.py b/src/provisioningserver/rpc/common.py
index be168cf..6f74366 100644
--- a/src/provisioningserver/rpc/common.py
+++ b/src/provisioningserver/rpc/common.py
@@ -79,6 +79,17 @@ class Authenticate(amp.Command):
79 errors = []79 errors = []
8080
8181
class Ping(amp.Command):
    """Check that an RPC connection is still alive.

    The command takes no arguments and returns an empty response; a
    successful round-trip is the only information it carries.

    :since: 2.4
    """

    errors = []
    response = []
    arguments = []
91
92
82class Client:93class Client:
83 """Wrapper around an :class:`amp.AMP` instance.94 """Wrapper around an :class:`amp.AMP` instance.
8495
@@ -283,3 +294,12 @@ class RPCProtocol(amp.AMP, object):
283 "Unhandled failure during AMP request. This is probably a bug. "294 "Unhandled failure during AMP request. This is probably a bug. "
284 "Please ensure that this error is handled within application "295 "Please ensure that this error is handled within application "
285 "code."))296 "code."))
297
@Ping.responder
def ping(self):
    """ping()

    Answer a :py:class:`~provisioningserver.rpc.common.Ping` request.

    There is nothing to report: replying at all shows the connection works.
    """
    return dict()
diff --git a/src/provisioningserver/rpc/tests/test_clusterservice.py b/src/provisioningserver/rpc/tests/test_clusterservice.py
index 1c9eb0f..2989a2c 100644
--- a/src/provisioningserver/rpc/tests/test_clusterservice.py
+++ b/src/provisioningserver/rpc/tests/test_clusterservice.py
@@ -90,6 +90,7 @@ from provisioningserver.rpc import (
90from provisioningserver.rpc.clusterservice import (90from provisioningserver.rpc.clusterservice import (
91 Cluster,91 Cluster,
92 ClusterClient,92 ClusterClient,
93 ClusterClientCheckerService,
93 ClusterClientService,94 ClusterClientService,
94 executeScanNetworksSubprocess,95 executeScanNetworksSubprocess,
95 get_scan_all_networks_args,96 get_scan_all_networks_args,
@@ -131,7 +132,10 @@ from testtools.matchers import (
131)132)
132from twisted import web133from twisted import web
133from twisted.application.internet import TimerService134from twisted.application.internet import TimerService
134from twisted.internet import error135from twisted.internet import (
136 error,
137 reactor,
138)
135from twisted.internet.defer import (139from twisted.internet.defer import (
136 Deferred,140 Deferred,
137 fail,141 fail,
@@ -1534,6 +1538,68 @@ class TestClusterClient(MAASTestCase):
1534 version=get_maas_version()))1538 version=get_maas_version()))
15351539
15361540
class TestClusterClientCheckerService(MAASTestCase):
    """Tests for `ClusterClientCheckerService`."""

    run_tests_with = MAASTwistedRunTest.make_factory(timeout=5)

    def make_client(self):
        """Return a fake client whose calls succeed immediately."""
        client = Mock()
        client.return_value = succeed(None)
        return client

    def test_init_sets_up_timer_correctly(self):
        service = ClusterClientCheckerService(
            sentinel.client_service, sentinel.clock)
        self.assertThat(service, MatchesStructure.byEquality(
            call=(service.tryLoop, (), {}),
            step=30, client_service=sentinel.client_service,
            clock=sentinel.clock))

    def test_tryLoop_calls_loop(self):
        service = ClusterClientCheckerService(
            sentinel.client_service, sentinel.clock)
        mock_loop = self.patch(service, "loop")
        mock_loop.return_value = succeed(None)
        service.tryLoop()
        self.assertThat(
            mock_loop,
            MockCalledOnceWith())

    def test_loop_does_nothing_with_no_clients(self):
        mock_client_service = MagicMock()
        mock_client_service.getAllClients.return_value = []
        service = ClusterClientCheckerService(
            mock_client_service, reactor)
        # Test will timeout if this blocks longer than 5 seconds.
        return service.loop()

    @inlineCallbacks
    def test_loop_calls_ping_for_each_client(self):
        clients = [
            self.make_client()
            for _ in range(3)
        ]
        mock_client_service = MagicMock()
        mock_client_service.getAllClients.return_value = clients
        service = ClusterClientCheckerService(
            mock_client_service, reactor)
        yield service.loop()
        for client in clients:
            self.expectThat(
                client, MockCalledOnceWith(common.Ping, _timeout=10))

    @inlineCallbacks
    def test_ping_keeps_connection_on_success(self):
        # A successful ping must not tear down a working connection.
        client = self.make_client()
        service = ClusterClientCheckerService(MagicMock(), reactor)
        yield service._ping(client)
        self.assertFalse(client._conn.transport.loseConnection.called)

    @inlineCallbacks
    def test_ping_calls_loseConnection_on_failure(self):
        client = MagicMock()
        client.return_value = fail(factory.make_exception())
        mock_client_service = MagicMock()
        service = ClusterClientCheckerService(
            mock_client_service, reactor)
        yield service._ping(client)
        self.assertThat(
            client._conn.transport.loseConnection, MockCalledOnceWith())
1601
1602
1537class TestClusterProtocol_ListSupportedArchitectures(MAASTestCase):1603class TestClusterProtocol_ListSupportedArchitectures(MAASTestCase):
15381604
1539 run_tests_with = MAASTwistedRunTest.make_factory(timeout=5)1605 run_tests_with = MAASTwistedRunTest.make_factory(timeout=5)
diff --git a/src/provisioningserver/tests/test_plugin.py b/src/provisioningserver/tests/test_plugin.py
index d62b765..caa9409 100644
--- a/src/provisioningserver/tests/test_plugin.py
+++ b/src/provisioningserver/tests/test_plugin.py
@@ -45,6 +45,7 @@ from provisioningserver.rackdservices.tftp import (
45 TFTPService,45 TFTPService,
46)46)
47from provisioningserver.rackdservices.tftp_offload import TFTPOffloadService47from provisioningserver.rackdservices.tftp_offload import TFTPOffloadService
48from provisioningserver.rpc.clusterservice import ClusterClientCheckerService
48from provisioningserver.testing.config import ClusterConfigurationFixture49from provisioningserver.testing.config import ClusterConfigurationFixture
49from provisioningserver.utils.twisted import reducedWebLogFormatter50from provisioningserver.utils.twisted import reducedWebLogFormatter
50from testtools.matchers import (51from testtools.matchers import (
@@ -105,8 +106,8 @@ class TestProvisioningServiceMaker(MAASTestCase):
105 self.assertIsInstance(service, MultiService)106 self.assertIsInstance(service, MultiService)
106 expected_services = [107 expected_services = [
107 "dhcp_probe", "networks_monitor", "image_download",108 "dhcp_probe", "networks_monitor", "image_download",
108 "lease_socket_service", "node_monitor", "ntp", "rpc", "tftp",109 "lease_socket_service", "node_monitor", "ntp", "rpc", "rpc-ping",
109 "image_service", "service_monitor",110 "tftp", "image_service", "service_monitor",
110 ]111 ]
111 self.assertThat(service.namedServices, KeysEqual(*expected_services))112 self.assertThat(service.namedServices, KeysEqual(*expected_services))
112 self.assertEqual(113 self.assertEqual(
@@ -177,6 +178,13 @@ class TestProvisioningServiceMaker(MAASTestCase):
177 service_monitor = service.getServiceNamed("service_monitor")178 service_monitor = service.getServiceNamed("service_monitor")
178 self.assertIsInstance(service_monitor, ServiceMonitorService)179 self.assertIsInstance(service_monitor, ServiceMonitorService)
179180
def test_rpc_ping_service(self):
    # The top-level service must include an "rpc-ping" child that checks
    # the RPC connections to the region.
    service_maker = ProvisioningServiceMaker("Harry", "Hill")
    top_level = service_maker.makeService(Options(), clock=None)
    self.assertIsInstance(
        top_level.getServiceNamed("rpc-ping"), ClusterClientCheckerService)
187
180 def test_tftp_service(self):188 def test_tftp_service(self):
181 # A TFTP service is configured and added to the top-level service.189 # A TFTP service is configured and added to the top-level service.
182 options = Options()190 options = Options()

Subscribers

People subscribed via source and target branches