Merge lp:~allenap/maas/narrow-port-range--bug-1352923 into lp:~maas-committers/maas/trunk

Proposed by Gavin Panella
Status: Merged
Approved by: Gavin Panella
Approved revision: no longer in the source branch.
Merged at revision: 3930
Proposed branch: lp:~allenap/maas/narrow-port-range--bug-1352923
Merge into: lp:~maas-committers/maas/trunk
Diff against target: 299 lines (+154/-21)
4 files modified
INSTALL.txt (+15/-0)
src/maasserver/rpc/regionservice.py (+33/-5)
src/maasserver/rpc/testing/fixtures.py (+1/-1)
src/maasserver/rpc/tests/test_regionservice.py (+105/-15)
To merge this branch: bzr merge lp:~allenap/maas/narrow-port-range--bug-1352923
Reviewer Review Type Date Requested Status
Mike Pontillo (community) Approve
Review via email: mp+259911@code.launchpad.net

Commit message

Bind the region's RPC endpoints to a narrow range of ports between 5250 and 5259 inclusive.

To post a comment you must log in.
Revision history for this message
Mike Pontillo (mpontillo) wrote :

I had a quick chat with Gavin to answer some of my ignorant questions about this MP.

Q: Why do we need so many ports?!
A: Because each cluster is responsible for connecting to every regiond process. (each RPC endpoint is listed in JSON format if you hit <SERVER_URL>/MAAS/rpc/. (clusters periodically refresh this)

Q: Why can't the cluster just connect to any random regiond?
A: Because any arbitrary regiond needs to be able to use this control channel to talk to any arbitrary clusterd. Any individual regiond has no way to directly contact any arbitrary clusterd otherwise.

That being the case, I think this change looks good. I noted a minor issue with an 'except' clause below, and possibly a more robust test case.

review: Approve
Revision history for this message
Raphaël Badin (rvb) :
Revision history for this message
Gavin Panella (allenap) wrote :

Thanks dudes!

I've also added a little bit of documentation.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'INSTALL.txt'
2--- INSTALL.txt 2014-11-10 16:39:12 +0000
3+++ INSTALL.txt 2015-05-22 16:26:39 +0000
4@@ -323,5 +323,20 @@
5 https://www.symantec.com/business/support/index?page=content&id=HOWTO6019
6
7
8+Traffic between the region contoller and cluster controllers
9+------------------------------------------------------------
10+
11+* Each cluster controller must be able to:
12+
13+ * Initiate TCP connections (for HTTP) to each region controller on
14+ port 80 or port 5240, the choice of which depends on the setting of
15+ ``MAAS_URL``.
16+
17+ * Initiate TCP connections (for RPC) to each region controller between
18+ port 5250 and 5259 inclusive. This permits up to 10 ``maas-regiond``
19+ processes on each region controller host. At present this is not
20+ configurable.
21+
22+
23 Once everything is set up and running, you are ready to :doc:`start
24 enlisting nodes <nodes>`
25
26=== modified file 'src/maasserver/rpc/regionservice.py'
27--- src/maasserver/rpc/regionservice.py 2015-05-07 18:14:38 +0000
28+++ src/maasserver/rpc/regionservice.py 2015-05-22 16:26:39 +0000
29@@ -464,7 +464,10 @@
30 This is a service - in the Twisted sense - that exposes the
31 ``Region`` protocol on a port.
32
33- :ivar endpoints: The endpoints on which to listen.
34+ :ivar endpoints: The endpoints on which to listen, as a list of lists.
35+ Only one endpoint in each nested list will be bound (they will be
36+ tried in order until the first success). In this way it is possible to
37+ specify, say, a range of ports, but only bind one of them.
38 :ivar ports: The opened :py:class:`IListeningPort`s.
39 :ivar connections: Maps :class:`Region` connections to clusters.
40 :ivar waiters: Maps cluster idents to callers waiting for a connection.
41@@ -478,7 +481,10 @@
42
43 def __init__(self):
44 super(RegionService, self).__init__()
45- self.endpoints = [TCP4ServerEndpoint(reactor, 0)]
46+ self.endpoints = [
47+ [TCP4ServerEndpoint(reactor, port)
48+ for port in xrange(5250, 5260)],
49+ ]
50 self.connections = defaultdict(set)
51 self.waiters = defaultdict(set)
52 self.factory = Factory.forProtocol(RegionServer)
53@@ -529,15 +535,37 @@
54 elif result.check(defer.CancelledError):
55 pass # Ignore.
56 else:
57- log.err(result)
58+ log.err(result, "RegionServer endpoint failed to listen.")
59+
60+ @inlineCallbacks
61+ def _bindFirst(self, endpoints, factory):
62+ """Return the first endpoint to successfully listen.
63+
64+ :param endpoints: A sized iterable of `IStreamServerEndpoint`.
65+ :param factory: A protocol factory.
66+
67+ :return: A `Deferred` yielding a :class:`twisted.internet.tcp.Port` or
68+ the error encountered when trying to listen on the last of the
69+ given endpoints.
70+ """
71+ assert len(endpoints) > 0, "No endpoint options specified."
72+ last = len(endpoints) - 1
73+ for index, endpoint in enumerate(endpoints):
74+ try:
75+ port = yield endpoint.listen(factory)
76+ except:
77+ if index == last:
78+ raise
79+ else:
80+ returnValue(port)
81
82 @asynchronous
83 def startService(self):
84 """Start listening on an ephemeral port."""
85 super(RegionService, self).startService()
86 self.starting = defer.DeferredList(
87- (endpoint.listen(self.factory) for endpoint in self.endpoints),
88- consumeErrors=True)
89+ (self._bindFirst(endpoint_options, self.factory)
90+ for endpoint_options in self.endpoints))
91
92 def log_failure(failure):
93 if failure.check(defer.CancelledError):
94
95=== modified file 'src/maasserver/rpc/testing/fixtures.py'
96--- src/maasserver/rpc/testing/fixtures.py 2015-05-16 06:21:20 +0000
97+++ src/maasserver/rpc/testing/fixtures.py 2015-05-22 16:26:39 +0000
98@@ -272,7 +272,7 @@
99 assert isinstance(self.rpc.endpoints, list)
100 # Patch a fake UNIX endpoint in to the RPC service.
101 endpoint = endpoints.UNIXServerEndpoint(reactor, self.sockfile)
102- self.monkey.add_patch(self.rpc, "endpoints", [endpoint])
103+ self.monkey.add_patch(self.rpc, "endpoints", [[endpoint]])
104
105 # The RPC service uses a defaultdict(set) to manage connections, but
106 # let's check those assumptions.
107
108=== modified file 'src/maasserver/rpc/tests/test_regionservice.py'
109--- src/maasserver/rpc/tests/test_regionservice.py 2015-05-07 18:14:38 +0000
110+++ src/maasserver/rpc/tests/test_regionservice.py 2015-05-22 16:26:39 +0000
111@@ -83,6 +83,7 @@
112 ANY,
113 call,
114 Mock,
115+ sentinel,
116 )
117 import netaddr
118 from provisioningserver.network import discover_networks
119@@ -144,8 +145,10 @@
120 HasLength,
121 Is,
122 IsInstance,
123+ MatchesAll,
124 MatchesListwise,
125 MatchesStructure,
126+ Not,
127 )
128 from twisted.application.service import Service
129 from twisted.internet import (
130@@ -160,6 +163,7 @@
131 inlineCallbacks,
132 succeed,
133 )
134+from twisted.internet.endpoints import TCP4ServerEndpoint
135 from twisted.internet.error import ConnectionClosed
136 from twisted.internet.interfaces import IStreamServerEndpoint
137 from twisted.internet.protocol import Factory
138@@ -167,6 +171,7 @@
139 from twisted.protocols import amp
140 from twisted.python import log
141 from twisted.python.failure import Failure
142+from twisted.python.reflect import fullyQualifiedName
143 from zope.interface.verify import verifyObject
144
145
146@@ -1223,7 +1228,8 @@
147 self.assertThat(service.connections, IsInstance(defaultdict))
148 self.assertThat(service.connections.default_factory, Is(set))
149 self.assertThat(
150- service.endpoints, AllMatch(Provides(IStreamServerEndpoint)))
151+ service.endpoints, AllMatch(
152+ AllMatch(Provides(IStreamServerEndpoint))))
153 self.assertThat(service.factory, IsInstance(Factory))
154 self.assertThat(service.factory.protocol, Equals(RegionServer))
155
156@@ -1274,8 +1280,8 @@
157 service = RegionService()
158
159 # Return an inert Deferred from the listen() call.
160- endpoints = self.patch(service, "endpoints", [Mock()])
161- endpoints[0].listen.return_value = Deferred()
162+ endpoints = self.patch(service, "endpoints", [[Mock()]])
163+ endpoints[0][0].listen.return_value = Deferred()
164
165 service.startService()
166 self.assertThat(service.starting, IsInstance(Deferred))
167@@ -1296,28 +1302,112 @@
168
169 # Ensure that endpoint.listen fails with a obvious error.
170 exception = ValueError("This is not the messiah.")
171- endpoints = self.patch(service, "endpoints", [Mock()])
172- endpoints[0].listen.return_value = fail(exception)
173-
174- err_calls = []
175- self.patch(log, "err", err_calls.append)
176-
177- err_calls_expected = [
178+ endpoints = self.patch(service, "endpoints", [[Mock()]])
179+ endpoints[0][0].listen.return_value = fail(exception)
180+
181+ logged_failures = []
182+ self.patch(log, "msg", (
183+ lambda failure, **kw: logged_failures.append(failure)))
184+
185+ logged_failures_expected = [
186 AfterPreprocessing(
187 (lambda failure: failure.value),
188 Is(exception)),
189 ]
190
191 yield service.startService()
192- self.assertThat(err_calls, MatchesListwise(err_calls_expected))
193+ self.assertThat(
194+ logged_failures, MatchesListwise(logged_failures_expected))
195+
196+ @wait_for_reactor
197+ @inlineCallbacks
198+ def test_start_up_binds_first_of_endpoint_options(self):
199+ service = RegionService()
200+
201+ endpoint_1 = Mock()
202+ endpoint_1.listen.return_value = succeed(sentinel.port1)
203+ endpoint_2 = Mock()
204+ endpoint_2.listen.return_value = succeed(sentinel.port2)
205+ service.endpoints = [[endpoint_1, endpoint_2]]
206+
207+ yield service.startService()
208+
209+ self.assertThat(service.ports, Equals([sentinel.port1]))
210+
211+ @wait_for_reactor
212+ @inlineCallbacks
213+ def test_start_up_binds_first_of_real_endpoint_options(self):
214+ service = RegionService()
215+
216+ # endpoint_1.listen(...) will bind to a random high-numbered port.
217+ endpoint_1 = TCP4ServerEndpoint(reactor, 0)
218+ # endpoint_2.listen(...), if attempted, will crash because only root
219+ # (or a user with explicit capabilities) can do stuff like that. It's
220+ # a reasonable assumption that the user running these tests is not
221+ # root, but we'll check the port number later too to be sure.
222+ endpoint_2 = TCP4ServerEndpoint(reactor, 1)
223+
224+ service.endpoints = [[endpoint_1, endpoint_2]]
225+
226+ yield service.startService()
227+ self.addCleanup(wait_for_reactor(service.stopService))
228+
229+ # A single port has been bound.
230+ self.assertThat(service.ports, MatchesAll(
231+ HasLength(1), AllMatch(IsInstance(tcp.Port))))
232+
233+ # The port is not listening on port 1; i.e. a belt-n-braces check that
234+ # endpoint_2 was not used.
235+ [port] = service.ports
236+ self.assertThat(port.getHost().port, Not(Equals(1)))
237+
238+ @wait_for_reactor
239+ @inlineCallbacks
240+ def test_start_up_binds_first_successful_of_endpoint_options(self):
241+ service = RegionService()
242+
243+ endpoint_broken = Mock()
244+ endpoint_broken.listen.return_value = fail(factory.make_exception())
245+ endpoint_okay = Mock()
246+ endpoint_okay.listen.return_value = succeed(sentinel.port)
247+ service.endpoints = [[endpoint_broken, endpoint_okay]]
248+
249+ yield service.startService()
250+
251+ self.assertThat(service.ports, Equals([sentinel.port]))
252+
253+ @wait_for_reactor
254+ @inlineCallbacks
255+ def test_start_up_logs_failure_if_all_endpoint_options_fail(self):
256+ service = RegionService()
257+
258+ error_1 = factory.make_exception_type()
259+ error_2 = factory.make_exception_type()
260+
261+ endpoint_1 = Mock()
262+ endpoint_1.listen.return_value = fail(error_1())
263+ endpoint_2 = Mock()
264+ endpoint_2.listen.return_value = fail(error_2())
265+ service.endpoints = [[endpoint_1, endpoint_2]]
266+
267+ with TwistedLoggerFixture() as logger:
268+ yield service.startService()
269+
270+ self.assertDocTestMatches(
271+ """\
272+ RegionServer endpoint failed to listen.
273+ Traceback (most recent call last):
274+ Failure: %s:
275+ """ % fullyQualifiedName(error_2),
276+ logger.output)
277
278 @wait_for_reactor
279 def test_stopping_cancels_startup(self):
280 service = RegionService()
281
282 # Return an inert Deferred from the listen() call.
283- endpoints = self.patch(service, "endpoints", [Mock()])
284- endpoints[0].listen.return_value = Deferred()
285+ endpoints = self.patch(service, "endpoints", [[Mock()]])
286+ endpoints[0][0].listen.return_value = Deferred()
287
288 service.startService()
289 service.stopService()
290@@ -1391,8 +1481,8 @@
291
292 # Ensure that endpoint.listen fails with a obvious error.
293 exception = ValueError("This is a very naughty boy.")
294- endpoints = self.patch(service, "endpoints", [Mock()])
295- endpoints[0].listen.return_value = fail(exception)
296+ endpoints = self.patch(service, "endpoints", [[Mock()]])
297+ endpoints[0][0].listen.return_value = fail(exception)
298 # Suppress logged messages.
299 self.patch(log.theLogPublisher, "observers", [])
300