Merge lp:~hazmat/juju-deployer/refactor-placement-and-validate-feedback into lp:juju-deployer
- refactor-placement-and-validate-feedback
- Merge into trunk
Status: | Merged |
---|---|
Merged at revision: | 91 |
Proposed branch: | lp:~hazmat/juju-deployer/refactor-placement-and-validate-feedback |
Merge into: | lp:juju-deployer |
Diff against target: |
485 lines (+214/-129) 6 files modified
deployer/action/importer.py (+5/-4) deployer/deployment.py (+38/-102) deployer/feedback.py (+35/-0) deployer/service.py (+110/-0) deployer/tests/base.py (+8/-0) deployer/tests/test_deployment.py (+18/-23) |
To merge this branch: | bzr merge lp:~hazmat/juju-deployer/refactor-placement-and-validate-feedback |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Francesco Banconi (community) | Approve | ||
Review via email: mp+195903@code.launchpad.net |
Commit message
Description of the change
Refactor of unit placement and deployment validation/feedback handling.
Francesco Banconi (frankban) wrote : | # |
Kapil Thangavelu (hazmat) wrote : | # |
Just to be clear: the branch is currently still backwards compatible with the previous GUI (and other) usages. validate and the other methods still raise ErrorExit; the branch just provides a place to cleanly override how feedback is handled during deployment.
Francesco Banconi (frankban) wrote : | # |
This branch looks good Kapil, thank you!
I'll just add some minors/comments below.
105 + def get_unit_placement(self, svc, status):
106 + if isinstance(svc, (str, unicode)):
This can also be written as isinstance(svc, basestring) <shrug>.
229 + def _handle_feedback(self, feedback):
So, this can be a plan:
after this branch is merged, I'll start working on another
one that modifies the guiserver module in the following ways:
- _validate raises an error if any unit is in error (the deployer will
refuse to proceed anyway, and there is no reason for the
guiserver to wait for that);
- import_bundle instantiates a Deployment subclass overriding
_handle_feedback so that an error is raised if the feedback
has errors.
How does it sound?
336 + if p.isdigit() and p == '0':
337 + continue
This could just be "if p == '0'" <shrug>.
372 + if placement.isdigit() and placement == "0":
373 + return self._format_placement(placement, container)
The same here.
382 + # Prefer continuing deployment with a new machine rather
383 + # than an in-progress abort.
384 + return None
Nice.
Preview Diff
1 | === modified file 'deployer/action/importer.py' |
2 | --- deployer/action/importer.py 2013-11-12 04:54:58 +0000 |
3 | +++ deployer/action/importer.py 2013-11-20 05:09:11 +0000 |
4 | @@ -48,10 +48,9 @@ |
5 | env_status = self.env.status() |
6 | reloaded = True |
7 | |
8 | + placement = self.deployment.get_unit_placement(svc, env_status) |
9 | for mid in range(cur_units, svc.num_units): |
10 | - mspec = self.deployment.get_unit_placement( |
11 | - svc, mid, env_status) |
12 | - self.env.add_unit(svc.name, mspec) |
13 | + self.env.add_unit(svc.name, placement.get(mid)) |
14 | else: |
15 | self.env.add_units(svc.name, abs(delta)) |
16 | |
17 | @@ -94,6 +93,8 @@ |
18 | else: |
19 | num_units = svc.num_units |
20 | |
21 | + placement = self.deployment.get_unit_placement(svc, env_status) |
22 | + |
23 | self.env.deploy( |
24 | svc.name, |
25 | charm.charm_url, |
26 | @@ -101,7 +102,7 @@ |
27 | svc.config, |
28 | svc.constraints, |
29 | num_units, |
30 | - self.deployment.get_unit_placement(svc, 0, env_status)) |
31 | + placement.get(0)) |
32 | |
33 | if svc.annotations: |
34 | self.log.debug(" Setting annotations") |
35 | |
36 | === modified file 'deployer/deployment.py' |
37 | --- deployer/deployment.py 2013-10-31 19:19:36 +0000 |
38 | +++ deployer/deployment.py 2013-11-20 05:09:11 +0000 |
39 | @@ -6,7 +6,8 @@ |
40 | import yaml |
41 | |
42 | from .charm import Charm |
43 | -from .service import Service |
44 | +from .feedback import Feedback |
45 | +from .service import Service, ServiceUnitPlacement |
46 | from .relation import Endpoint |
47 | from .utils import path_join, yaml_dump, ErrorExit, resolve_include |
48 | |
49 | @@ -56,58 +57,10 @@ |
50 | return -1 |
51 | return cmp(svc_a.name, svc_b.name) |
52 | |
53 | - @staticmethod |
54 | - def _format_placement(machine, container=None): |
55 | - if container: |
56 | - return "%s:%s" % (container, machine) |
57 | - else: |
58 | - return machine |
59 | - |
60 | - def get_unit_placement(self, svc, unit_number, status): |
61 | - unit_mapping = svc.unit_placement |
62 | - if not unit_mapping: |
63 | - return None |
64 | - if len(unit_mapping) <= unit_number: |
65 | - return None |
66 | - |
67 | - unit_placement = placement = str(unit_mapping[unit_number]) |
68 | - container = None |
69 | - u_idx = unit_number |
70 | - |
71 | - if ':' in unit_placement: |
72 | - container, placement = unit_placement.split(":") |
73 | - if '=' in placement: |
74 | - placement, u_idx = placement.split("=") |
75 | - |
76 | - if placement.isdigit() and placement == "0": |
77 | - return self._format_placement(placement, container) |
78 | - |
79 | - with_service = status['services'].get(placement) |
80 | - if with_service is None: |
81 | - # Should be caught in validate relations but sanity check |
82 | - # for concurrency. |
83 | - self.log.error( |
84 | - "Service %s to be deployed with non existant service %s", |
85 | - svc.name, placement) |
86 | - # Prefer continuing deployment with a new machine rather |
87 | - # than an in-progress abort. |
88 | - return None |
89 | - |
90 | - svc_units = with_service['units'] |
91 | - if len(svc_units) <= unit_number: |
92 | - self.log.warning( |
93 | - "Service:%s deploy-with Service:%s, but no with unit found", |
94 | - svc.name, placement) |
95 | - return None |
96 | - unit_names = svc_units.keys() |
97 | - unit_names.sort() |
98 | - machine = svc_units[unit_names[int(u_idx)]].get('machine') |
99 | - if not machine: |
100 | - self.log.warning( |
101 | - "Service:%s deploy-with unit missing machine %s", |
102 | - svc.name, unit_names[unit_number]) |
103 | - return None |
104 | - return self._format_placement(machine, container) |
105 | + def get_unit_placement(self, svc, status): |
106 | + if isinstance(svc, (str, unicode)): |
107 | + svc = self.get_service(svc) |
108 | + return ServiceUnitPlacement(svc, self, status) |
109 | |
110 | def get_relations(self): |
111 | if 'relations' not in self.data: |
112 | @@ -213,6 +166,7 @@ |
113 | self.log.debug("Resolving configuration") |
114 | # XXX TODO, rename resolve, validate relations |
115 | # against defined services |
116 | + feedback = Feedback() |
117 | for svc_name, svc_data in self.data.get('services', {}).items(): |
118 | if not 'options' in svc_data: |
119 | continue |
120 | @@ -222,16 +176,21 @@ |
121 | |
122 | for k, v in svc_data['options'].items(): |
123 | if not k in config: |
124 | - self.log.error( |
125 | - "Invalid config charm %s %s=%s", charm.name, k, v) |
126 | - raise ErrorExit() |
127 | + feedback.error( |
128 | + "Invalid config charm %s %s=%s" % (charm.name, k, v)) |
129 | + continue |
130 | iv = self._resolve_include(svc_name, k, v) |
131 | + if isinstance(iv, Feedback): |
132 | + feedback.extend(iv) |
133 | + continue |
134 | if iv is not None: |
135 | v = iv |
136 | options[k] = v |
137 | svc_data['options'] = options |
138 | + self._handle_feedback(feedback) |
139 | |
140 | def _resolve_include(self, svc_name, k, v): |
141 | + feedback = Feedback() |
142 | for include_type in ["file", "base64"]: |
143 | if (not isinstance(v, basestring) |
144 | or not v.startswith( |
145 | @@ -240,70 +199,47 @@ |
146 | include, fname = v.split("://", 1) |
147 | ip = resolve_include(fname, self.include_dirs) |
148 | if ip is None: |
149 | - self.log.warning( |
150 | - "Invalid config %s.%s include not found %s", |
151 | - svc_name, k, v) |
152 | + feedback.error( |
153 | + "Invalid config %s.%s include not found %s" % ( |
154 | + svc_name, k, v)) |
155 | continue |
156 | with open(ip) as fh: |
157 | v = fh.read() |
158 | if include_type == "base64": |
159 | v = b64encode(v) |
160 | return v |
161 | + if feedback: |
162 | + return feedback |
163 | |
164 | def validate_relations(self): |
165 | # Could extend to do interface matching against charms. |
166 | services = dict([(s.name, s) for s in self.get_services()]) |
167 | + feedback = Feedback() |
168 | for e_a, e_b in self.get_relations(): |
169 | for ep in [Endpoint(e_a), Endpoint(e_b)]: |
170 | if not ep.service in services: |
171 | - self.log.error( |
172 | + feedback.error( |
173 | ("Invalid relation in config," |
174 | - " service %s not found, rel %s"), |
175 | - ep.service, "%s <-> %s" % (e_a, e_b)) |
176 | - raise ErrorExit() |
177 | + " service %s not found, rel %s") % ( |
178 | + ep.service, "%s <-> %s" % (e_a, e_b))) |
179 | + continue |
180 | + self._handle_feedback(feedback) |
181 | |
182 | def validate_placement(self): |
183 | services = dict([(s.name, s) for s in self.get_services()]) |
184 | + feedback = Feedback() |
185 | for name, s in services.items(): |
186 | - unit_placement = s.unit_placement |
187 | - if unit_placement is None: |
188 | - continue |
189 | - if not isinstance(unit_placement, list): |
190 | - unit_placement = [unit_placement] |
191 | - unit_placement = map(str, unit_placement) |
192 | - for idx, p in enumerate(unit_placement): |
193 | - if ':' in p: |
194 | - container, p = p.split(':') |
195 | - if container not in ('lxc', 'kvm'): |
196 | - self.log.error( |
197 | - "Invalid service:%s placement: %s", |
198 | - name, unit_placement[idx]) |
199 | - raise ErrorExit() |
200 | - if '=' in p: |
201 | - p, u_idx = p.split("=") |
202 | - if not u_idx.isdigit(): |
203 | - self.log.error( |
204 | - "Invalid service:%s placement: %s", |
205 | - name, unit_placement[idx]) |
206 | - raise ErrorExit() |
207 | - if p.isdigit() and p == '0': |
208 | - continue |
209 | - elif p.isdigit(): |
210 | - self.log.error( |
211 | - "Service placement to machine not supported %s to %s", |
212 | - name, unit_placement[idx]) |
213 | - raise ErrorExit() |
214 | - elif p in services: |
215 | - if services[p].unit_placement: |
216 | - self.log.error( |
217 | - "Nested placement not supported %s -> %s -> %s" % ( |
218 | - name, p, services[p].unit_placement)) |
219 | - raise ErrorExit() |
220 | - else: |
221 | - self.log.error( |
222 | - "Invalid service placement %s to %s" % ( |
223 | - name, unit_placement[idx])) |
224 | - raise ErrorExit() |
225 | + placement = self.get_unit_placement(s, {}) |
226 | + feedback.extend(placement.validate()) |
227 | + self._handle_feedback(feedback) |
228 | + |
229 | + def _handle_feedback(self, feedback): |
230 | + for e in feedback.get_errors(): |
231 | + self.log.error(e) |
232 | + for w in feedback.get_warnings(): |
233 | + self.log.warning(w) |
234 | + if feedback.has_errors: |
235 | + raise ErrorExit() |
236 | |
237 | def save(self, path): |
238 | with open(path, "w") as fh: |
239 | |
240 | === added file 'deployer/feedback.py' |
241 | --- deployer/feedback.py 1970-01-01 00:00:00 +0000 |
242 | +++ deployer/feedback.py 2013-11-20 05:09:11 +0000 |
243 | @@ -0,0 +1,35 @@ |
244 | + |
245 | + |
246 | +WARN = 3 |
247 | +ERROR = 7 |
248 | + |
249 | + |
250 | +class Feedback(object): |
251 | + |
252 | + def __init__(self): |
253 | + self.messages = [] |
254 | + self.has_errors = False |
255 | + |
256 | + def error(self, msg): |
257 | + self.messages.append((ERROR, msg)) |
258 | + self.has_errors = True |
259 | + |
260 | + def warn(self, msg): |
261 | + self.messages.append((WARN, msg)) |
262 | + |
263 | + def __iter__(self): |
264 | + return iter(self.messages) |
265 | + |
266 | + def __nonzero__(self): |
267 | + return bool(self.messages) |
268 | + |
269 | + def get_errors(self): |
270 | + return [m for (m_kind, m) in self.messages if m_kind == ERROR] |
271 | + |
272 | + def get_warnings(self): |
273 | + return [m for (m_kind, m) in self.messages if m_kind == WARN] |
274 | + |
275 | + def extend(self, other): |
276 | + self.messages.extend(other.messages) |
277 | + if not self.has_errors and other.has_errors: |
278 | + self.has_errors = True |
279 | |
280 | === modified file 'deployer/service.py' |
281 | --- deployer/service.py 2013-11-20 02:28:01 +0000 |
282 | +++ deployer/service.py 2013-11-20 05:09:11 +0000 |
283 | @@ -1,3 +1,6 @@ |
284 | +from feedback import Feedback |
285 | + |
286 | + |
287 | class Service(object): |
288 | |
289 | def __init__(self, name, svc_data): |
290 | @@ -43,3 +46,110 @@ |
291 | @property |
292 | def expose(self): |
293 | return self.svc_data.get('expose', False) |
294 | + |
295 | + |
296 | +class ServiceUnitPlacement(object): |
297 | + |
298 | + def __init__(self, service, deployment, status): |
299 | + self.service = service |
300 | + self.deployment = deployment |
301 | + self.status = status |
302 | + |
303 | + @staticmethod |
304 | + def _format_placement(machine, container=None): |
305 | + if container: |
306 | + return "%s:%s" % (container, machine) |
307 | + else: |
308 | + return machine |
309 | + |
310 | + def validate(self): |
311 | + feedback = Feedback() |
312 | + |
313 | + unit_placement = self.service.unit_placement |
314 | + if unit_placement is None: |
315 | + return feedback |
316 | + |
317 | + if not isinstance(unit_placement, list): |
318 | + unit_placement = [unit_placement] |
319 | + unit_placement = map(str, unit_placement) |
320 | + |
321 | + services = dict([(s.name, s) for s in self.deployment.get_services()]) |
322 | + |
323 | + for idx, p in enumerate(unit_placement): |
324 | + if ':' in p: |
325 | + container, p = p.split(':') |
326 | + if container not in ('lxc', 'kvm'): |
327 | + feedback.error( |
328 | + "Invalid service:%s placement: %s" % ( |
329 | + self.service.name, unit_placement[idx])) |
330 | + if '=' in p: |
331 | + p, u_idx = p.split("=") |
332 | + if not u_idx.isdigit(): |
333 | + feedback.error( |
334 | + "Invalid service:%s placement: %s", |
335 | + self.service.name, unit_placement[idx]) |
336 | + if p.isdigit() and p == '0': |
337 | + continue |
338 | + elif p.isdigit(): |
339 | + feedback.error( |
340 | + "Service placement to machine not supported %s to %s", |
341 | + self.service.name, unit_placement[idx]) |
342 | + elif p in services: |
343 | + if services[p].unit_placement: |
344 | + feedback.error( |
345 | + "Nested placement not supported %s -> %s -> %s" % ( |
346 | + self.service.name, p, services[p].unit_placement)) |
347 | + else: |
348 | + feedback.error( |
349 | + "Invalid service placement %s to %s" % ( |
350 | + self.service.name, unit_placement[idx])) |
351 | + return feedback |
352 | + |
353 | + def get(self, unit_number): |
354 | + status = self.status |
355 | + svc = self.service |
356 | + |
357 | + unit_mapping = svc.unit_placement |
358 | + if not unit_mapping: |
359 | + return None |
360 | + if len(unit_mapping) <= unit_number: |
361 | + return None |
362 | + |
363 | + unit_placement = placement = str(unit_mapping[unit_number]) |
364 | + container = None |
365 | + u_idx = unit_number |
366 | + |
367 | + if ':' in unit_placement: |
368 | + container, placement = unit_placement.split(":") |
369 | + if '=' in placement: |
370 | + placement, u_idx = placement.split("=") |
371 | + |
372 | + if placement.isdigit() and placement == "0": |
373 | + return self._format_placement(placement, container) |
374 | + |
375 | + with_service = status['services'].get(placement) |
376 | + if with_service is None: |
377 | + # Should be caught in validate relations but sanity check |
378 | + # for concurrency. |
379 | + self.deployment.log.error( |
380 | + "Service %s to be deployed with non existant service %s", |
381 | + svc.name, placement) |
382 | + # Prefer continuing deployment with a new machine rather |
383 | + # than an in-progress abort. |
384 | + return None |
385 | + |
386 | + svc_units = with_service['units'] |
387 | + if len(svc_units) <= unit_number: |
388 | + self.deployment.log.warning( |
389 | + "Service:%s deploy-with Service:%s, but no with unit found", |
390 | + svc.name, placement) |
391 | + return None |
392 | + unit_names = svc_units.keys() |
393 | + unit_names.sort() |
394 | + machine = svc_units[unit_names[int(u_idx)]].get('machine') |
395 | + if not machine: |
396 | + self.deployment.log.warning( |
397 | + "Service:%s deploy-with unit missing machine %s", |
398 | + svc.name, unit_names[unit_number]) |
399 | + return None |
400 | + return self._format_placement(machine, container) |
401 | |
402 | === modified file 'deployer/tests/base.py' |
403 | --- deployer/tests/base.py 2013-07-22 18:26:06 +0000 |
404 | +++ deployer/tests/base.py 2013-11-20 05:09:11 +0000 |
405 | @@ -7,6 +7,7 @@ |
406 | import tempfile |
407 | |
408 | import deployer |
409 | +from deployer.config import ConfigStack |
410 | |
411 | |
412 | class Base(unittest.TestCase): |
413 | @@ -14,6 +15,13 @@ |
414 | test_data_dir = os.path.join( |
415 | os.path.dirname(inspect.getabsfile(deployer)), "tests", "test_data") |
416 | |
417 | + def get_named_deployment(self, file_name, stack_name): |
418 | + """ Get deployment from test_data file. |
419 | + """ |
420 | + return ConfigStack( |
421 | + [os.path.join( |
422 | + self.test_data_dir, file_name)]).get(stack_name) |
423 | + |
424 | def capture_logging(self, name="", level=logging.INFO, |
425 | log_file=None, formatter=None): |
426 | if log_file is None: |
427 | |
428 | === modified file 'deployer/tests/test_deployment.py' |
429 | --- deployer/tests/test_deployment.py 2013-10-31 19:19:36 +0000 |
430 | +++ deployer/tests/test_deployment.py 2013-11-20 05:09:11 +0000 |
431 | @@ -16,11 +16,6 @@ |
432 | self.output = setup_logging( |
433 | debug=True, verbose=True, stream=StringIO.StringIO()) |
434 | |
435 | - def get_named_deployment(self, file_name, stack_name): |
436 | - return ConfigStack( |
437 | - [os.path.join( |
438 | - self.test_data_dir, file_name)]).get(stack_name) |
439 | - |
440 | def test_deployer(self): |
441 | d = ConfigStack( |
442 | [os.path.join( |
443 | @@ -97,24 +92,24 @@ |
444 | 'nova-compute/2': {'machine': '1'}, |
445 | 'nova-compute/3': {'machine': '2'}, |
446 | 'nova-compute/4': {'machine': '3'}}}}} |
447 | - svc = d.get_service('ceph') |
448 | - self.assertEqual(d.get_unit_placement(svc, 0, status), '1') |
449 | - self.assertEqual(d.get_unit_placement(svc, 1, status), '2') |
450 | - self.assertEqual(d.get_unit_placement(svc, 2, status), None) |
451 | - |
452 | - svc = d.get_service('quantum') |
453 | - self.assertEqual(d.get_unit_placement(svc, 0, status), 'lxc:1') |
454 | - self.assertEqual(d.get_unit_placement(svc, 2, status), 'lxc:3') |
455 | - self.assertEqual(d.get_unit_placement(svc, 3, status), None) |
456 | - |
457 | - svc = d.get_service('verity') |
458 | - self.assertEqual(d.get_unit_placement(svc, 0, status), 'lxc:3') |
459 | - |
460 | - svc = d.get_service('mysql') |
461 | - self.assertEqual(d.get_unit_placement(svc, 0, status), '0') |
462 | - |
463 | - svc = d.get_service('semper') |
464 | - self.assertEqual(d.get_unit_placement(svc, 0, status), '3') |
465 | + placement = d.get_unit_placement('ceph', status) |
466 | + self.assertEqual(placement.get(0), '1') |
467 | + self.assertEqual(placement.get(1), '2') |
468 | + self.assertEqual(placement.get(2), None) |
469 | + |
470 | + placement = d.get_unit_placement('quantum', status) |
471 | + self.assertEqual(placement.get(0), 'lxc:1') |
472 | + self.assertEqual(placement.get(2), 'lxc:3') |
473 | + self.assertEqual(placement.get(3), None) |
474 | + |
475 | + placement = d.get_unit_placement('verity', status) |
476 | + self.assertEqual(placement.get(0), 'lxc:3') |
477 | + |
478 | + placement = d.get_unit_placement('mysql', status) |
479 | + self.assertEqual(placement.get(0), '0') |
480 | + |
481 | + placement = d.get_unit_placement('semper', status) |
482 | + self.assertEqual(placement.get(0), '3') |
483 | |
484 | def test_multiple_relations_no_weight(self): |
485 | data = {"relations": {"wordpress": {"consumes": ["mysql"]}, |
The following summarizes what was discussed on IRC.
Currently the guiserver starts the deployer in a separate process using a concurrent.futures.ProcessPoolExecutor. The executor calls the validate and import_bundle functions in deployer.guiserver. This means that, as the branch is now, the log messages will appear in the guiserver logs, and that's good. But what we'd also need is something that the future returned by the executor can receive, i.e.
1) a picklable return value or 2) a picklable exception (that is what we do now).
Long story short, it would be nice to have that kind of feedback implemented as an exception itself, or returned by the functions we call (validate/import_bundle).
Some background:
From the guiserver perspective, in case of errors, we need some error info to send back to the user, and this can be stored in future.result() or future.exception() (right now the latter is used, i.e. validate() and import_bundle() just return None or raise an exception).
Currently sometimes the code path started by those functions just raises exceptions without messages. In those cases, we don't have feedback to send back to the guiserver client (usually the GUI).