Merge ~r00ta/maas:lp-2027735-3.3 into maas:3.3
- Git
- lp:~r00ta/maas
- lp-2027735-3.3
- Merge into 3.3
Proposed by
Jacopo Rota
Status: | Merged |
---|---|
Approved by: | Jacopo Rota |
Approved revision: | c809961416b5dab75cb3ad1e4b8053fd6cba9d76 |
Merge reported by: | MAAS Lander |
Merged at revision: | not available |
Proposed branch: | ~r00ta/maas:lp-2027735-3.3 |
Merge into: | maas:3.3 |
Diff against target: |
444 lines (+121/-52) 7 files modified
snap/local/tree/bin/run-regiond (+1/-1)
src/maasserver/regiondservices/http.py (+17/-5)
src/maasserver/regiondservices/tests/test_http.py (+20/-7)
src/maasserver/templates/http/regiond.nginx.conf.template (+3/-1)
src/maasserver/tests/test_workers.py (+51/-20)
src/maasserver/webapp.py (+9/-7)
src/maasserver/workers.py (+20/-11) |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
MAAS Lander | Approve | ||
Jacopo Rota | Approve | ||
Review via email: mp+446959@code.launchpad.net |
Commit message
Cherry pick fdd4e6da22c3f23
Description of the change
Cherry pick fdd4e6da22c3f23
To post a comment you must log in.
Revision history for this message
Jacopo Rota (r00ta) : | # |
review:
Approve
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | diff --git a/snap/local/tree/bin/run-regiond b/snap/local/tree/bin/run-regiond |
2 | index e7c86ea..f3f8e7d 100755 |
3 | --- a/snap/local/tree/bin/run-regiond |
4 | +++ b/snap/local/tree/bin/run-regiond |
5 | @@ -18,7 +18,7 @@ export MAAS_SYSLOG_LOG_DIR="$SNAP_COMMON/log" |
6 | export MAAS_IMAGES_KEYRING_FILEPATH="/snap/maas/current/usr/share/keyrings/ubuntu-cloudimage-keyring.gpg" |
7 | export MAAS_THIRD_PARTY_DRIVER_SETTINGS="$SNAP/etc/maas/drivers.yaml" |
8 | export MAAS_HTTP_CONFIG_DIR="$SNAP_DATA/http" |
9 | -export MAAS_HTTP_SOCKET_PATH="$SNAP_DATA/maas-regiond-webapp.sock" |
10 | +export MAAS_HTTP_SOCKET_WORKER_BASE_PATH="$SNAP_DATA/maas-regiond-webapp.sock" |
11 | |
12 | # ensure these dirs exist here since the region creates config files for other |
13 | # services |
14 | diff --git a/src/maasserver/regiondservices/http.py b/src/maasserver/regiondservices/http.py |
15 | index 0b8cb50..55c633a 100644 |
16 | --- a/src/maasserver/regiondservices/http.py |
17 | +++ b/src/maasserver/regiondservices/http.py |
18 | @@ -18,6 +18,7 @@ from maasserver.regiondservices import certificate_expiration_check |
19 | from maasserver.service_monitor import service_monitor |
20 | from maasserver.utils import load_template |
21 | from maasserver.utils.threads import deferToDatabase |
22 | +from maasserver.workers import WorkersService |
23 | from provisioningserver.certificates import Certificate |
24 | from provisioningserver.logger import LegacyLogger |
25 | from provisioningserver.path import get_maas_data_path |
26 | @@ -29,8 +30,17 @@ from provisioningserver.utils.fs import atomic_write, get_root_path, snap |
27 | |
28 | log = LegacyLogger() |
29 | |
30 | +REGIOND_SOCKET_PATH = os.getenv( |
31 | + "MAAS_HTTP_SOCKET_WORKER_BASE_PATH", |
32 | + get_maas_data_path("maas-regiond-webapp.sock"), |
33 | +) |
34 | + |
35 | |
36 | class RegionHTTPService(Service): |
37 | + @staticmethod |
38 | + def build_unix_socket_path_for_worker(worker_id: str) -> str: |
39 | + return f"{REGIOND_SOCKET_PATH}.{worker_id}" |
40 | + |
41 | def __init__(self, postgresListener: PostgresListenerService = None): |
42 | super().__init__() |
43 | self.listener = postgresListener |
44 | @@ -57,10 +67,12 @@ class RegionHTTPService(Service): |
45 | def _configure(self, configuration): |
46 | """Update the HTTP configuration for the region proxy service.""" |
47 | template = load_template("http", "regiond.nginx.conf.template") |
48 | - socket_path = os.getenv( |
49 | - "MAAS_HTTP_SOCKET_PATH", |
50 | - get_maas_data_path("maas-regiond-webapp.sock"), |
51 | - ) |
52 | + |
53 | + # Load balancing the unix sockets |
54 | + worker_socket_paths = [ |
55 | + self.build_unix_socket_path_for_worker(worker_id) |
56 | + for worker_id in WorkersService.get_worker_ids() |
57 | + ] |
58 | |
59 | if configuration.tls_enabled: |
60 | key_path, cert_path = self._create_cert_files(configuration.cert) |
61 | @@ -72,7 +84,7 @@ class RegionHTTPService(Service): |
62 | "tls_port": configuration.port, |
63 | "tls_key_path": key_path, |
64 | "tls_cert_path": cert_path, |
65 | - "socket_path": socket_path, |
66 | + "worker_socket_paths": worker_socket_paths, |
67 | "static_dir": str(get_root_path() / "usr/share/maas"), |
68 | } |
69 | rendered = template.substitute(environ).encode() |
70 | diff --git a/src/maasserver/regiondservices/tests/test_http.py b/src/maasserver/regiondservices/tests/test_http.py |
71 | index fc423b6..32f11a1 100644 |
72 | --- a/src/maasserver/regiondservices/tests/test_http.py |
73 | +++ b/src/maasserver/regiondservices/tests/test_http.py |
74 | @@ -15,6 +15,7 @@ from maasserver.secrets import SecretManager |
75 | from maasserver.testing.testcase import MAASTransactionServerTestCase |
76 | from maasserver.triggers.testing import TransactionalHelpersMixin |
77 | from maasserver.utils.threads import deferToDatabase |
78 | +from maasserver.workers import WorkersService |
79 | from maastesting.crochet import wait_for |
80 | from provisioningserver.testing.certificates import ( |
81 | get_sample_cert_with_cacerts, |
82 | @@ -85,6 +86,10 @@ class TestRegionHTTPService( |
83 | mock_cert_check.assert_called_once_with() |
84 | |
85 | def test_configure_not_snap(self): |
86 | + # MAASDataFixture updates `MAAS_DATA` in the environment to point to this new location. |
87 | + data_path = os.getenv("MAAS_DATA") |
88 | + http.REGIOND_SOCKET_PATH = f"{data_path}/maas-regiond-webapp.sock" |
89 | + |
90 | tempdir = self.make_dir() |
91 | nginx_conf = Path(tempdir) / "regiond.nginx.conf" |
92 | service = http.RegionHTTPService() |
93 | @@ -97,10 +102,14 @@ class TestRegionHTTPService( |
94 | |
95 | service._configure(http._Configuration(self.cert, port=5443)) |
96 | |
97 | - # MAASDataFixture updates `MAAS_DATA` in the environment to point to this new location. |
98 | - data_path = os.getenv("MAAS_DATA") |
99 | nginx_config = nginx_conf.read_text() |
100 | - self.assertIn(f"{data_path}/maas-regiond-webapp.sock;", nginx_config) |
101 | + |
102 | + worker_ids = WorkersService.get_worker_ids() |
103 | + for worker_id in worker_ids: |
104 | + self.assertIn( |
105 | + f"{data_path}/maas-regiond-webapp.sock.{worker_id};", |
106 | + nginx_config, |
107 | + ) |
108 | self.assertIn("root /usr/share/maas/web/static;", nginx_config) |
109 | self.assertIn("listen 5443 ssl http2;", nginx_config) |
110 | self.assertIn("ssl_certificate cert_path;", nginx_config) |
111 | @@ -111,11 +120,12 @@ class TestRegionHTTPService( |
112 | os, |
113 | "environ", |
114 | { |
115 | - "MAAS_HTTP_SOCKET_PATH": "/snap/maas/maas-regiond-webapp.sock", |
116 | "SNAP": "/snap/maas/5443", |
117 | "MAAS_HTTP_CONFIG_DIR": os.getenv("MAAS_DATA"), |
118 | }, |
119 | ) |
120 | + http.REGIOND_SOCKET_PATH = "/snap/maas/maas-regiond-webapp.sock" |
121 | + |
122 | tempdir = self.make_dir() |
123 | nginx_conf = Path(tempdir) / "regiond.nginx.conf" |
124 | service = http.RegionHTTPService() |
125 | @@ -129,9 +139,12 @@ class TestRegionHTTPService( |
126 | service._configure(http._Configuration(cert=self.cert, port=5443)) |
127 | |
128 | nginx_config = nginx_conf.read_text() |
129 | - self.assertIn( |
130 | - "server unix:/snap/maas/maas-regiond-webapp.sock;", nginx_config |
131 | - ) |
132 | + worker_ids = WorkersService.get_worker_ids() |
133 | + for worker_id in worker_ids: |
134 | + self.assertIn( |
135 | + f"server unix:/snap/maas/maas-regiond-webapp.sock.{worker_id};", |
136 | + nginx_config, |
137 | + ) |
138 | self.assertIn( |
139 | "root /snap/maas/5443/usr/share/maas/web/static;", nginx_config |
140 | ) |
141 | diff --git a/src/maasserver/templates/http/regiond.nginx.conf.template b/src/maasserver/templates/http/regiond.nginx.conf.template |
142 | index 13b1cbe..03480c3 100644 |
143 | --- a/src/maasserver/templates/http/regiond.nginx.conf.template |
144 | +++ b/src/maasserver/templates/http/regiond.nginx.conf.template |
145 | @@ -1,7 +1,9 @@ |
146 | # -*- mode: nginx -*- |
147 | |
148 | upstream regiond-webapp { |
149 | - server unix:{{socket_path}}; |
150 | +{{for worker_socket_path in worker_socket_paths}} |
151 | + server unix:{{worker_socket_path}}; |
152 | +{{endfor}} |
153 | } |
154 | |
155 | proxy_http_version 1.1; |
156 | diff --git a/src/maasserver/tests/test_workers.py b/src/maasserver/tests/test_workers.py |
157 | index 2da7ac8..8276a6b 100644 |
158 | --- a/src/maasserver/tests/test_workers.py |
159 | +++ b/src/maasserver/tests/test_workers.py |
160 | @@ -18,7 +18,6 @@ from maasserver.workers import ( |
161 | WorkersService, |
162 | ) |
163 | from maastesting.crochet import wait_for |
164 | -from maastesting.matchers import MockCalledOnceWith, MockCallsMatch |
165 | from maastesting.testcase import MAASTestCase |
166 | from provisioningserver.utils.twisted import DeferredValue |
167 | |
168 | @@ -29,14 +28,14 @@ class TestWorkersCount(MAASTestCase): |
169 | def test_MAX_WORKERS_COUNT_default_cpucount(self): |
170 | from maasserver.workers import MAX_WORKERS_COUNT |
171 | |
172 | - self.assertEqual(os.cpu_count(), MAX_WORKERS_COUNT) |
173 | + assert os.cpu_count() == MAX_WORKERS_COUNT |
174 | |
175 | def test_set_max_workers_count(self): |
176 | worker_count = random.randint(1, 8) |
177 | set_max_workers_count(worker_count) |
178 | from maasserver.workers import MAX_WORKERS_COUNT |
179 | |
180 | - self.assertEqual(worker_count, MAX_WORKERS_COUNT) |
181 | + assert worker_count == MAX_WORKERS_COUNT |
182 | |
183 | |
184 | class TestWorkersService(MAASTestCase): |
185 | @@ -47,31 +46,61 @@ class TestWorkersService(MAASTestCase): |
186 | |
187 | from maasserver.workers import MAX_WORKERS_COUNT |
188 | |
189 | - self.assertEqual(MAX_WORKERS_COUNT, service.worker_count) |
190 | - self.assertEqual(sys.argv[0], service.worker_cmd) |
191 | + assert MAX_WORKERS_COUNT == len(service.get_worker_ids()) |
192 | + assert sys.argv[0] == service.worker_cmd |
193 | |
194 | def test_calls_spawnWorkers_on_start(self): |
195 | service = WorkersService(reactor) |
196 | self.patch(service, "spawnWorkers") |
197 | service.startService() |
198 | - self.assertThat(service.spawnWorkers, MockCalledOnceWith()) |
199 | + service.spawnWorkers.assert_called_once() |
200 | |
201 | def test_spawnWorkers_calls__spawnWorker_for_missing_workers(self): |
202 | worker_count = random.randint(2, 16) |
203 | - service = WorkersService(reactor, worker_count=worker_count) |
204 | + set_max_workers_count(worker_count) |
205 | + service = WorkersService(reactor) |
206 | self.patch(service, "_spawnWorker") |
207 | pid = random.randint(1, 500) |
208 | - service.workers[pid] = WorkerProcess(service) |
209 | + service.workers[pid] = WorkerProcess(service, worker_id="0") |
210 | + service.missing_worker_ids.remove("0") |
211 | service.spawnWorkers() |
212 | - calls = [call(runningImport=True)] + [ |
213 | - call() for _ in range(worker_count - 2) |
214 | + calls = [call("1", runningImport=True)] + [ |
215 | + call(str(worker_id)) for worker_id in range(2, worker_count) |
216 | ] |
217 | - self.assertThat(service._spawnWorker, MockCallsMatch(*calls)) |
218 | + service._spawnWorker.assert_has_calls(calls) |
219 | + |
220 | + def test_registerWorker(self): |
221 | + worker_count = 2 |
222 | + set_max_workers_count(worker_count) |
223 | + service = WorkersService(reactor) |
224 | + self.patch(service, "_spawnWorker") |
225 | + |
226 | + worker = WorkerProcess(service, worker_id="0") |
227 | + worker.pid = 100 |
228 | + service.registerWorker(worker) |
229 | + |
230 | + service.spawnWorkers() |
231 | + calls = [call("1", runningImport=True)] |
232 | + service._spawnWorker.assert_has_calls(calls) |
233 | + |
234 | + def test_unregisterWorker(self): |
235 | + worker_count = 2 |
236 | + set_max_workers_count(worker_count) |
237 | + service = WorkersService(reactor) |
238 | + self.patch(service, "_spawnWorker") |
239 | + |
240 | + worker = WorkerProcess(service, worker_id="0") |
241 | + worker.pid = 100 |
242 | + service.registerWorker(worker) |
243 | + service.unregisterWorker(worker, None) |
244 | + calls = [call("1", runningImport=True), call("0")] |
245 | + service._spawnWorker.assert_has_calls(calls) |
246 | |
247 | @wait_for_reactor |
248 | @inlineCallbacks |
249 | def test_killWorker_spawns_another(self): |
250 | - service = WorkersService(reactor, worker_count=1, worker_cmd="cat") |
251 | + set_max_workers_count(1) |
252 | + service = WorkersService(reactor, worker_cmd="cat") |
253 | |
254 | dv = DeferredValue() |
255 | original_unregisterWorker = service.unregisterWorker |
256 | @@ -91,15 +120,16 @@ class TestWorkersService(MAASTestCase): |
257 | pid = list(service.workers.keys())[0] |
258 | service.killWorker(pid) |
259 | yield dv.get(timeout=2) |
260 | - self.assertNotIn(pid, service.workers) |
261 | - self.assertEqual(1, len(service.workers)) |
262 | + assert pid not in service.workers |
263 | + assert len(service.workers) == 1 |
264 | finally: |
265 | service.stopService() |
266 | |
267 | @wait_for_reactor |
268 | @inlineCallbacks |
269 | def test_termWorker_spawns_another(self): |
270 | - service = WorkersService(reactor, worker_count=1, worker_cmd="cat") |
271 | + set_max_workers_count(1) |
272 | + service = WorkersService(reactor, worker_cmd="cat") |
273 | |
274 | dv = DeferredValue() |
275 | original_unregisterWorker = service.unregisterWorker |
276 | @@ -119,15 +149,16 @@ class TestWorkersService(MAASTestCase): |
277 | pid = list(service.workers.keys())[0] |
278 | service.termWorker(pid) |
279 | yield dv.get(timeout=2) |
280 | - self.assertNotIn(pid, service.workers) |
281 | - self.assertEqual(1, len(service.workers)) |
282 | + assert pid not in service.workers |
283 | + assert len(service.workers) == 1 |
284 | finally: |
285 | service.stopService() |
286 | |
287 | @wait_for_reactor |
288 | @inlineCallbacks |
289 | def test_stopService_doesnt(self): |
290 | - service = WorkersService(reactor, worker_count=1, worker_cmd="cat") |
291 | + set_max_workers_count(1) |
292 | + service = WorkersService(reactor, worker_cmd="cat") |
293 | |
294 | dv = DeferredValue() |
295 | original_unregisterWorker = service.unregisterWorker |
296 | @@ -142,9 +173,9 @@ class TestWorkersService(MAASTestCase): |
297 | |
298 | try: |
299 | service.startService() |
300 | - self.assertEqual(1, len(service.workers)) |
301 | + assert len(service.workers) == 1 |
302 | finally: |
303 | service.stopService() |
304 | |
305 | yield dv.get(timeout=2) |
306 | - self.assertEqual({}, service.workers) |
307 | + assert len(service.workers) == 0 |
308 | diff --git a/src/maasserver/webapp.py b/src/maasserver/webapp.py |
309 | index 143364e..5363847 100644 |
310 | --- a/src/maasserver/webapp.py |
311 | +++ b/src/maasserver/webapp.py |
312 | @@ -20,6 +20,7 @@ from twisted.web.server import Request, Site |
313 | from twisted.web.wsgi import WSGIResource |
314 | |
315 | from maasserver import concurrency |
316 | +from maasserver.regiondservices.http import RegionHTTPService |
317 | from maasserver.utils.threads import deferToDatabase |
318 | from maasserver.utils.views import WebApplicationHandler |
319 | from maasserver.websockets.protocol import WebSocketFactory |
320 | @@ -29,7 +30,6 @@ from maasserver.websockets.websockets import ( |
321 | ) |
322 | from metadataserver.api_twisted import StatusHandlerResource |
323 | from provisioningserver.logger import LegacyLogger |
324 | -from provisioningserver.path import get_maas_data_path |
325 | from provisioningserver.utils.twisted import ( |
326 | asynchronous, |
327 | reducedWebLogFormatter, |
328 | @@ -227,16 +227,18 @@ class WebApplicationService(StreamServerEndpointService): |
329 | def _makeEndpoint(self): |
330 | """Make the endpoint for the webapp.""" |
331 | |
332 | - socket_path = os.getenv( |
333 | - "MAAS_HTTP_SOCKET_PATH", |
334 | - get_maas_data_path("maas-regiond-webapp.sock"), |
335 | + worker_id = os.getenv("MAAS_REGIOND_WORKER_ID", "") |
336 | + worker_socket_path = ( |
337 | + RegionHTTPService.build_unix_socket_path_for_worker(worker_id) |
338 | ) |
339 | |
340 | s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) |
341 | - if os.path.exists(socket_path): |
342 | - os.unlink(socket_path) |
343 | + try: |
344 | + os.unlink(worker_socket_path) |
345 | + except FileNotFoundError: |
346 | + pass |
347 | |
348 | - s.bind(socket_path) |
349 | + s.bind(worker_socket_path) |
350 | # Use a backlog of 50, which seems to be fairly common. |
351 | s.listen(50) |
352 | |
353 | diff --git a/src/maasserver/workers.py b/src/maasserver/workers.py |
354 | index 11c3a03..6384bd7 100644 |
355 | --- a/src/maasserver/workers.py |
356 | +++ b/src/maasserver/workers.py |
357 | @@ -26,9 +26,10 @@ def set_max_workers_count(worker_count): |
358 | |
359 | |
360 | class WorkerProcess(protocol.ProcessProtocol): |
361 | - def __init__(self, service, runningImport=False): |
362 | + def __init__(self, service, worker_id: str, runningImport=False): |
363 | super().__init__() |
364 | self.service = service |
365 | + self.worker_id = worker_id |
366 | self.runningImport = runningImport |
367 | |
368 | def connectionMade(self): |
369 | @@ -54,17 +55,19 @@ class WorkersService(service.Service): |
370 | Manages the lifecycle of the workers. |
371 | """ |
372 | |
373 | - def __init__(self, reactor, *, worker_count=None, worker_cmd=None): |
374 | + @staticmethod |
375 | + def get_worker_ids() -> list[str]: |
376 | + return [str(worker_id) for worker_id in range(MAX_WORKERS_COUNT)] |
377 | + |
378 | + def __init__(self, reactor, *, worker_cmd=None): |
379 | super().__init__() |
380 | self.reactor = reactor |
381 | self.stopping = False |
382 | - self.worker_count = worker_count |
383 | - if self.worker_count is None: |
384 | - self.worker_count = MAX_WORKERS_COUNT |
385 | self.worker_cmd = worker_cmd |
386 | if self.worker_cmd is None: |
387 | self.worker_cmd = sys.argv[0] |
388 | self.workers = {} |
389 | + self.missing_worker_ids = self.get_worker_ids() |
390 | |
391 | def startService(self): |
392 | """Start the workers.""" |
393 | @@ -83,27 +86,30 @@ class WorkersService(service.Service): |
394 | if self.stopping: |
395 | # Don't spwan new workers if the service is stopping. |
396 | return |
397 | - missing = self.worker_count - len(self.workers) |
398 | if self.workers: |
399 | runningImport = max( |
400 | worker.runningImport for worker in self.workers.values() |
401 | ) |
402 | else: |
403 | runningImport = False |
404 | - for _ in range(missing): |
405 | + |
406 | + # Work on a copy to avoid races |
407 | + for worker_id in list(self.missing_worker_ids): |
408 | if not runningImport: |
409 | - self._spawnWorker(runningImport=True) |
410 | + self._spawnWorker(worker_id, runningImport=True) |
411 | runningImport = True |
412 | else: |
413 | - self._spawnWorker() |
414 | + self._spawnWorker(worker_id) |
415 | |
416 | def registerWorker(self, worker): |
417 | """Register the worker.""" |
418 | self.workers[worker.pid] = worker |
419 | + self.missing_worker_ids.remove(worker.worker_id) |
420 | |
421 | def unregisterWorker(self, worker, status): |
422 | """Worker has died.""" |
423 | del self.workers[worker.pid] |
424 | + self.missing_worker_ids.append(worker.worker_id) |
425 | self.spawnWorkers() |
426 | |
427 | def termWorker(self, pid): |
428 | @@ -120,11 +126,14 @@ class WorkersService(service.Service): |
429 | log.msg("Killing worker pid:%d." % pid) |
430 | worker.signal("KILL") |
431 | |
432 | - def _spawnWorker(self, runningImport=False): |
433 | + def _spawnWorker(self, worker_id: str, runningImport: bool = False): |
434 | """Spawn a new worker.""" |
435 | - worker = WorkerProcess(self, runningImport=runningImport) |
436 | + worker = WorkerProcess( |
437 | + self, worker_id=worker_id, runningImport=runningImport |
438 | + ) |
439 | env = os.environ.copy() |
440 | env["MAAS_REGIOND_PROCESS_MODE"] = "worker" |
441 | + env["MAAS_REGIOND_WORKER_ID"] = worker_id |
442 | env["MAAS_REGIOND_WORKER_COUNT"] = str(MAX_WORKERS_COUNT) |
443 | if runningImport: |
444 | env["MAAS_REGIOND_RUN_IMPORTER_SERVICE"] = "true" |
UNIT TESTS
-b lp-2027735-3.3 lp:~r00ta/maas/+git/maas into -b 3.3 lp:~maas-committers/maas
STATUS: SUCCESS
COMMIT: c809961416b5dab75cb3ad1e4b8053fd6cba9d76