Merge ~r00ta/maas:lp-2027735-3.4 into maas:3.4

Proposed by Jacopo Rota
Status: Merged
Approved by: Adam Collard
Approved revision: 5ba89afc21bd8c0ddd4751c02b1ee3156617b004
Merge reported by: MAAS Lander
Merged at revision: not available
Proposed branch: ~r00ta/maas:lp-2027735-3.4
Merge into: maas:3.4
Diff against target: 444 lines (+121/-52)
7 files modified
snap/local/tree/bin/run-regiond (+1/-1)
src/maasserver/regiondservices/http.py (+17/-5)
src/maasserver/regiondservices/tests/test_http.py (+20/-7)
src/maasserver/templates/http/regiond.nginx.conf.template (+3/-1)
src/maasserver/tests/test_workers.py (+51/-20)
src/maasserver/webapp.py (+9/-7)
src/maasserver/workers.py (+20/-11)
Reviewer Review Type Date Requested Status
MAAS Lander Approve
Jacopo Rota Approve
Review via email: mp+446954@code.launchpad.net

Commit message

Cherry-pick fdd4e6da22c3f23bde83cc3f138a00ab3db42f46 - bind each regiond process to a dedicated unix socket and let nginx load-balance the requests

Description of the change

Cherry-pick fdd4e6da22c3f23bde83cc3f138a00ab3db42f46 - bind each regiond process to a dedicated unix socket and let nginx load-balance the requests

To post a comment you must log in.
Revision history for this message
Jacopo Rota (r00ta) :
review: Approve
Revision history for this message
MAAS Lander (maas-lander) wrote :

UNIT TESTS
-b lp-2027735-3.4 lp:~r00ta/maas/+git/maas into -b 3.4 lp:~maas-committers/maas

STATUS: FAILED
LOG: http://maas-ci.internal:8080/job/maas-tester/3057/console
COMMIT: 5ba89afc21bd8c0ddd4751c02b1ee3156617b004

review: Needs Fixing
Revision history for this message
Jacopo Rota (r00ta) wrote :

jenkins: !test

Revision history for this message
MAAS Lander (maas-lander) wrote :

UNIT TESTS
-b lp-2027735-3.4 lp:~r00ta/maas/+git/maas into -b 3.4 lp:~maas-committers/maas

STATUS: FAILED
LOG: http://maas-ci.internal:8080/job/maas-tester/3063/console
COMMIT: 5ba89afc21bd8c0ddd4751c02b1ee3156617b004

review: Needs Fixing
Revision history for this message
Jacopo Rota (r00ta) wrote :

jenkins: !test

Revision history for this message
MAAS Lander (maas-lander) wrote :

UNIT TESTS
-b lp-2027735-3.4 lp:~r00ta/maas/+git/maas into -b 3.4 lp:~maas-committers/maas

STATUS: SUCCESS
COMMIT: 5ba89afc21bd8c0ddd4751c02b1ee3156617b004

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1diff --git a/snap/local/tree/bin/run-regiond b/snap/local/tree/bin/run-regiond
2index e7c86ea..f3f8e7d 100755
3--- a/snap/local/tree/bin/run-regiond
4+++ b/snap/local/tree/bin/run-regiond
5@@ -18,7 +18,7 @@ export MAAS_SYSLOG_LOG_DIR="$SNAP_COMMON/log"
6 export MAAS_IMAGES_KEYRING_FILEPATH="/snap/maas/current/usr/share/keyrings/ubuntu-cloudimage-keyring.gpg"
7 export MAAS_THIRD_PARTY_DRIVER_SETTINGS="$SNAP/etc/maas/drivers.yaml"
8 export MAAS_HTTP_CONFIG_DIR="$SNAP_DATA/http"
9-export MAAS_HTTP_SOCKET_PATH="$SNAP_DATA/maas-regiond-webapp.sock"
10+export MAAS_HTTP_SOCKET_WORKER_BASE_PATH="$SNAP_DATA/maas-regiond-webapp.sock"
11
12 # ensure these dirs exist here since the region creates config files for other
13 # services
14diff --git a/src/maasserver/regiondservices/http.py b/src/maasserver/regiondservices/http.py
15index 0b8cb50..55c633a 100644
16--- a/src/maasserver/regiondservices/http.py
17+++ b/src/maasserver/regiondservices/http.py
18@@ -18,6 +18,7 @@ from maasserver.regiondservices import certificate_expiration_check
19 from maasserver.service_monitor import service_monitor
20 from maasserver.utils import load_template
21 from maasserver.utils.threads import deferToDatabase
22+from maasserver.workers import WorkersService
23 from provisioningserver.certificates import Certificate
24 from provisioningserver.logger import LegacyLogger
25 from provisioningserver.path import get_maas_data_path
26@@ -29,8 +30,17 @@ from provisioningserver.utils.fs import atomic_write, get_root_path, snap
27
28 log = LegacyLogger()
29
30+REGIOND_SOCKET_PATH = os.getenv(
31+ "MAAS_HTTP_SOCKET_WORKER_BASE_PATH",
32+ get_maas_data_path("maas-regiond-webapp.sock"),
33+)
34+
35
36 class RegionHTTPService(Service):
37+ @staticmethod
38+ def build_unix_socket_path_for_worker(worker_id: str) -> str:
39+ return f"{REGIOND_SOCKET_PATH}.{worker_id}"
40+
41 def __init__(self, postgresListener: PostgresListenerService = None):
42 super().__init__()
43 self.listener = postgresListener
44@@ -57,10 +67,12 @@ class RegionHTTPService(Service):
45 def _configure(self, configuration):
46 """Update the HTTP configuration for the region proxy service."""
47 template = load_template("http", "regiond.nginx.conf.template")
48- socket_path = os.getenv(
49- "MAAS_HTTP_SOCKET_PATH",
50- get_maas_data_path("maas-regiond-webapp.sock"),
51- )
52+
53+ # Load balancing the unix sockets
54+ worker_socket_paths = [
55+ self.build_unix_socket_path_for_worker(worker_id)
56+ for worker_id in WorkersService.get_worker_ids()
57+ ]
58
59 if configuration.tls_enabled:
60 key_path, cert_path = self._create_cert_files(configuration.cert)
61@@ -72,7 +84,7 @@ class RegionHTTPService(Service):
62 "tls_port": configuration.port,
63 "tls_key_path": key_path,
64 "tls_cert_path": cert_path,
65- "socket_path": socket_path,
66+ "worker_socket_paths": worker_socket_paths,
67 "static_dir": str(get_root_path() / "usr/share/maas"),
68 }
69 rendered = template.substitute(environ).encode()
70diff --git a/src/maasserver/regiondservices/tests/test_http.py b/src/maasserver/regiondservices/tests/test_http.py
71index fc423b6..32f11a1 100644
72--- a/src/maasserver/regiondservices/tests/test_http.py
73+++ b/src/maasserver/regiondservices/tests/test_http.py
74@@ -15,6 +15,7 @@ from maasserver.secrets import SecretManager
75 from maasserver.testing.testcase import MAASTransactionServerTestCase
76 from maasserver.triggers.testing import TransactionalHelpersMixin
77 from maasserver.utils.threads import deferToDatabase
78+from maasserver.workers import WorkersService
79 from maastesting.crochet import wait_for
80 from provisioningserver.testing.certificates import (
81 get_sample_cert_with_cacerts,
82@@ -85,6 +86,10 @@ class TestRegionHTTPService(
83 mock_cert_check.assert_called_once_with()
84
85 def test_configure_not_snap(self):
86+ # MAASDataFixture updates `MAAS_DATA` in the environment to point to this new location.
87+ data_path = os.getenv("MAAS_DATA")
88+ http.REGIOND_SOCKET_PATH = f"{data_path}/maas-regiond-webapp.sock"
89+
90 tempdir = self.make_dir()
91 nginx_conf = Path(tempdir) / "regiond.nginx.conf"
92 service = http.RegionHTTPService()
93@@ -97,10 +102,14 @@ class TestRegionHTTPService(
94
95 service._configure(http._Configuration(self.cert, port=5443))
96
97- # MAASDataFixture updates `MAAS_DATA` in the environment to point to this new location.
98- data_path = os.getenv("MAAS_DATA")
99 nginx_config = nginx_conf.read_text()
100- self.assertIn(f"{data_path}/maas-regiond-webapp.sock;", nginx_config)
101+
102+ worker_ids = WorkersService.get_worker_ids()
103+ for worker_id in worker_ids:
104+ self.assertIn(
105+ f"{data_path}/maas-regiond-webapp.sock.{worker_id};",
106+ nginx_config,
107+ )
108 self.assertIn("root /usr/share/maas/web/static;", nginx_config)
109 self.assertIn("listen 5443 ssl http2;", nginx_config)
110 self.assertIn("ssl_certificate cert_path;", nginx_config)
111@@ -111,11 +120,12 @@ class TestRegionHTTPService(
112 os,
113 "environ",
114 {
115- "MAAS_HTTP_SOCKET_PATH": "/snap/maas/maas-regiond-webapp.sock",
116 "SNAP": "/snap/maas/5443",
117 "MAAS_HTTP_CONFIG_DIR": os.getenv("MAAS_DATA"),
118 },
119 )
120+ http.REGIOND_SOCKET_PATH = "/snap/maas/maas-regiond-webapp.sock"
121+
122 tempdir = self.make_dir()
123 nginx_conf = Path(tempdir) / "regiond.nginx.conf"
124 service = http.RegionHTTPService()
125@@ -129,9 +139,12 @@ class TestRegionHTTPService(
126 service._configure(http._Configuration(cert=self.cert, port=5443))
127
128 nginx_config = nginx_conf.read_text()
129- self.assertIn(
130- "server unix:/snap/maas/maas-regiond-webapp.sock;", nginx_config
131- )
132+ worker_ids = WorkersService.get_worker_ids()
133+ for worker_id in worker_ids:
134+ self.assertIn(
135+ f"server unix:/snap/maas/maas-regiond-webapp.sock.{worker_id};",
136+ nginx_config,
137+ )
138 self.assertIn(
139 "root /snap/maas/5443/usr/share/maas/web/static;", nginx_config
140 )
141diff --git a/src/maasserver/templates/http/regiond.nginx.conf.template b/src/maasserver/templates/http/regiond.nginx.conf.template
142index 1c0c539..681d6e6 100644
143--- a/src/maasserver/templates/http/regiond.nginx.conf.template
144+++ b/src/maasserver/templates/http/regiond.nginx.conf.template
145@@ -1,7 +1,9 @@
146 # -*- mode: nginx -*-
147
148 upstream regiond-webapp {
149- server unix:{{socket_path}};
150+{{for worker_socket_path in worker_socket_paths}}
151+ server unix:{{worker_socket_path}};
152+{{endfor}}
153 }
154
155 proxy_http_version 1.1;
156diff --git a/src/maasserver/tests/test_workers.py b/src/maasserver/tests/test_workers.py
157index 2da7ac8..8276a6b 100644
158--- a/src/maasserver/tests/test_workers.py
159+++ b/src/maasserver/tests/test_workers.py
160@@ -18,7 +18,6 @@ from maasserver.workers import (
161 WorkersService,
162 )
163 from maastesting.crochet import wait_for
164-from maastesting.matchers import MockCalledOnceWith, MockCallsMatch
165 from maastesting.testcase import MAASTestCase
166 from provisioningserver.utils.twisted import DeferredValue
167
168@@ -29,14 +28,14 @@ class TestWorkersCount(MAASTestCase):
169 def test_MAX_WORKERS_COUNT_default_cpucount(self):
170 from maasserver.workers import MAX_WORKERS_COUNT
171
172- self.assertEqual(os.cpu_count(), MAX_WORKERS_COUNT)
173+ assert os.cpu_count() == MAX_WORKERS_COUNT
174
175 def test_set_max_workers_count(self):
176 worker_count = random.randint(1, 8)
177 set_max_workers_count(worker_count)
178 from maasserver.workers import MAX_WORKERS_COUNT
179
180- self.assertEqual(worker_count, MAX_WORKERS_COUNT)
181+ assert worker_count == MAX_WORKERS_COUNT
182
183
184 class TestWorkersService(MAASTestCase):
185@@ -47,31 +46,61 @@ class TestWorkersService(MAASTestCase):
186
187 from maasserver.workers import MAX_WORKERS_COUNT
188
189- self.assertEqual(MAX_WORKERS_COUNT, service.worker_count)
190- self.assertEqual(sys.argv[0], service.worker_cmd)
191+ assert MAX_WORKERS_COUNT == len(service.get_worker_ids())
192+ assert sys.argv[0] == service.worker_cmd
193
194 def test_calls_spawnWorkers_on_start(self):
195 service = WorkersService(reactor)
196 self.patch(service, "spawnWorkers")
197 service.startService()
198- self.assertThat(service.spawnWorkers, MockCalledOnceWith())
199+ service.spawnWorkers.assert_called_once()
200
201 def test_spawnWorkers_calls__spawnWorker_for_missing_workers(self):
202 worker_count = random.randint(2, 16)
203- service = WorkersService(reactor, worker_count=worker_count)
204+ set_max_workers_count(worker_count)
205+ service = WorkersService(reactor)
206 self.patch(service, "_spawnWorker")
207 pid = random.randint(1, 500)
208- service.workers[pid] = WorkerProcess(service)
209+ service.workers[pid] = WorkerProcess(service, worker_id="0")
210+ service.missing_worker_ids.remove("0")
211 service.spawnWorkers()
212- calls = [call(runningImport=True)] + [
213- call() for _ in range(worker_count - 2)
214+ calls = [call("1", runningImport=True)] + [
215+ call(str(worker_id)) for worker_id in range(2, worker_count)
216 ]
217- self.assertThat(service._spawnWorker, MockCallsMatch(*calls))
218+ service._spawnWorker.assert_has_calls(calls)
219+
220+ def test_registerWorker(self):
221+ worker_count = 2
222+ set_max_workers_count(worker_count)
223+ service = WorkersService(reactor)
224+ self.patch(service, "_spawnWorker")
225+
226+ worker = WorkerProcess(service, worker_id="0")
227+ worker.pid = 100
228+ service.registerWorker(worker)
229+
230+ service.spawnWorkers()
231+ calls = [call("1", runningImport=True)]
232+ service._spawnWorker.assert_has_calls(calls)
233+
234+ def test_unregisterWorker(self):
235+ worker_count = 2
236+ set_max_workers_count(worker_count)
237+ service = WorkersService(reactor)
238+ self.patch(service, "_spawnWorker")
239+
240+ worker = WorkerProcess(service, worker_id="0")
241+ worker.pid = 100
242+ service.registerWorker(worker)
243+ service.unregisterWorker(worker, None)
244+ calls = [call("1", runningImport=True), call("0")]
245+ service._spawnWorker.assert_has_calls(calls)
246
247 @wait_for_reactor
248 @inlineCallbacks
249 def test_killWorker_spawns_another(self):
250- service = WorkersService(reactor, worker_count=1, worker_cmd="cat")
251+ set_max_workers_count(1)
252+ service = WorkersService(reactor, worker_cmd="cat")
253
254 dv = DeferredValue()
255 original_unregisterWorker = service.unregisterWorker
256@@ -91,15 +120,16 @@ class TestWorkersService(MAASTestCase):
257 pid = list(service.workers.keys())[0]
258 service.killWorker(pid)
259 yield dv.get(timeout=2)
260- self.assertNotIn(pid, service.workers)
261- self.assertEqual(1, len(service.workers))
262+ assert pid not in service.workers
263+ assert len(service.workers) == 1
264 finally:
265 service.stopService()
266
267 @wait_for_reactor
268 @inlineCallbacks
269 def test_termWorker_spawns_another(self):
270- service = WorkersService(reactor, worker_count=1, worker_cmd="cat")
271+ set_max_workers_count(1)
272+ service = WorkersService(reactor, worker_cmd="cat")
273
274 dv = DeferredValue()
275 original_unregisterWorker = service.unregisterWorker
276@@ -119,15 +149,16 @@ class TestWorkersService(MAASTestCase):
277 pid = list(service.workers.keys())[0]
278 service.termWorker(pid)
279 yield dv.get(timeout=2)
280- self.assertNotIn(pid, service.workers)
281- self.assertEqual(1, len(service.workers))
282+ assert pid not in service.workers
283+ assert len(service.workers) == 1
284 finally:
285 service.stopService()
286
287 @wait_for_reactor
288 @inlineCallbacks
289 def test_stopService_doesnt(self):
290- service = WorkersService(reactor, worker_count=1, worker_cmd="cat")
291+ set_max_workers_count(1)
292+ service = WorkersService(reactor, worker_cmd="cat")
293
294 dv = DeferredValue()
295 original_unregisterWorker = service.unregisterWorker
296@@ -142,9 +173,9 @@ class TestWorkersService(MAASTestCase):
297
298 try:
299 service.startService()
300- self.assertEqual(1, len(service.workers))
301+ assert len(service.workers) == 1
302 finally:
303 service.stopService()
304
305 yield dv.get(timeout=2)
306- self.assertEqual({}, service.workers)
307+ assert len(service.workers) == 0
308diff --git a/src/maasserver/webapp.py b/src/maasserver/webapp.py
309index 143364e..5363847 100644
310--- a/src/maasserver/webapp.py
311+++ b/src/maasserver/webapp.py
312@@ -20,6 +20,7 @@ from twisted.web.server import Request, Site
313 from twisted.web.wsgi import WSGIResource
314
315 from maasserver import concurrency
316+from maasserver.regiondservices.http import RegionHTTPService
317 from maasserver.utils.threads import deferToDatabase
318 from maasserver.utils.views import WebApplicationHandler
319 from maasserver.websockets.protocol import WebSocketFactory
320@@ -29,7 +30,6 @@ from maasserver.websockets.websockets import (
321 )
322 from metadataserver.api_twisted import StatusHandlerResource
323 from provisioningserver.logger import LegacyLogger
324-from provisioningserver.path import get_maas_data_path
325 from provisioningserver.utils.twisted import (
326 asynchronous,
327 reducedWebLogFormatter,
328@@ -227,16 +227,18 @@ class WebApplicationService(StreamServerEndpointService):
329 def _makeEndpoint(self):
330 """Make the endpoint for the webapp."""
331
332- socket_path = os.getenv(
333- "MAAS_HTTP_SOCKET_PATH",
334- get_maas_data_path("maas-regiond-webapp.sock"),
335+ worker_id = os.getenv("MAAS_REGIOND_WORKER_ID", "")
336+ worker_socket_path = (
337+ RegionHTTPService.build_unix_socket_path_for_worker(worker_id)
338 )
339
340 s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
341- if os.path.exists(socket_path):
342- os.unlink(socket_path)
343+ try:
344+ os.unlink(worker_socket_path)
345+ except FileNotFoundError:
346+ pass
347
348- s.bind(socket_path)
349+ s.bind(worker_socket_path)
350 # Use a backlog of 50, which seems to be fairly common.
351 s.listen(50)
352
353diff --git a/src/maasserver/workers.py b/src/maasserver/workers.py
354index 9e9a0b1..2f40068 100644
355--- a/src/maasserver/workers.py
356+++ b/src/maasserver/workers.py
357@@ -26,9 +26,10 @@ def set_max_workers_count(worker_count):
358
359
360 class WorkerProcess(protocol.ProcessProtocol):
361- def __init__(self, service, runningImport=False):
362+ def __init__(self, service, worker_id: str, runningImport=False):
363 super().__init__()
364 self.service = service
365+ self.worker_id = worker_id
366 self.runningImport = runningImport
367
368 def connectionMade(self):
369@@ -54,17 +55,19 @@ class WorkersService(service.Service):
370 Manages the lifecycle of the workers.
371 """
372
373- def __init__(self, reactor, *, worker_count=None, worker_cmd=None):
374+ @staticmethod
375+ def get_worker_ids() -> list[str]:
376+ return [str(worker_id) for worker_id in range(MAX_WORKERS_COUNT)]
377+
378+ def __init__(self, reactor, *, worker_cmd=None):
379 super().__init__()
380 self.reactor = reactor
381 self.stopping = False
382- self.worker_count = worker_count
383- if self.worker_count is None:
384- self.worker_count = MAX_WORKERS_COUNT
385 self.worker_cmd = worker_cmd
386 if self.worker_cmd is None:
387 self.worker_cmd = sys.argv[0]
388 self.workers = {}
389+ self.missing_worker_ids = self.get_worker_ids()
390
391 def startService(self):
392 """Start the workers."""
393@@ -85,27 +88,30 @@ class WorkersService(service.Service):
394 if self.stopping:
395 # Don't spwan new workers if the service is stopping.
396 return
397- missing = self.worker_count - len(self.workers)
398 if self.workers:
399 runningImport = max(
400 worker.runningImport for worker in self.workers.values()
401 )
402 else:
403 runningImport = False
404- for _ in range(missing):
405+
406+ # Work on a copy to avoid races
407+ for worker_id in list(self.missing_worker_ids):
408 if not runningImport:
409- self._spawnWorker(runningImport=True)
410+ self._spawnWorker(worker_id, runningImport=True)
411 runningImport = True
412 else:
413- self._spawnWorker()
414+ self._spawnWorker(worker_id)
415
416 def registerWorker(self, worker):
417 """Register the worker."""
418 self.workers[worker.pid] = worker
419+ self.missing_worker_ids.remove(worker.worker_id)
420
421 def unregisterWorker(self, worker, status):
422 """Worker has died."""
423 del self.workers[worker.pid]
424+ self.missing_worker_ids.append(worker.worker_id)
425 self.spawnWorkers()
426
427 def termWorker(self, pid):
428@@ -122,11 +128,14 @@ class WorkersService(service.Service):
429 log.msg("Killing worker pid:%d." % pid)
430 worker.signal("KILL")
431
432- def _spawnWorker(self, runningImport=False):
433+ def _spawnWorker(self, worker_id: str, runningImport: bool = False):
434 """Spawn a new worker."""
435- worker = WorkerProcess(self, runningImport=runningImport)
436+ worker = WorkerProcess(
437+ self, worker_id=worker_id, runningImport=runningImport
438+ )
439 env = os.environ.copy()
440 env["MAAS_REGIOND_PROCESS_MODE"] = "worker"
441+ env["MAAS_REGIOND_WORKER_ID"] = worker_id
442 env["MAAS_REGIOND_WORKER_COUNT"] = str(MAX_WORKERS_COUNT)
443 if runningImport:
444 env["MAAS_REGIOND_RUN_IMPORTER_SERVICE"] = "true"

Subscribers

People subscribed via source and target branches