Merge lp:~gnuoy/charms/trusty/ceph/1453940 into lp:~openstack-charmers-archive/charms/trusty/ceph/next
- Trusty Tahr (14.04)
- 1453940
- Merge into next
Status: | Merged |
---|---|
Merged at revision: | 117 |
Proposed branch: | lp:~gnuoy/charms/trusty/ceph/1453940 |
Merge into: | lp:~openstack-charmers-archive/charms/trusty/ceph/next |
Diff against target: |
1195 lines (+885/-62) 7 files modified
hooks/ceph_broker.py (+12/-2) hooks/charmhelpers/contrib/storage/linux/ceph.py (+224/-2) hooks/hooks.py (+9/-1) tests/charmhelpers/contrib/amulet/utils.py (+234/-52) tests/charmhelpers/contrib/openstack/amulet/deployment.py (+20/-5) tests/charmhelpers/contrib/openstack/amulet/utils.py (+359/-0) unit_tests/test_ceph_broker.py (+27/-0) |
To merge this branch: | bzr merge lp:~gnuoy/charms/trusty/ceph/1453940 |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Edward Hope-Morley | Approve | ||
Review via email: mp+268614@code.launchpad.net |
Commit message
Description of the change
Edward Hope-Morley (hopem) : | # |
Liam Young (gnuoy) : | # |
Edward Hope-Morley (hopem) : | # |
uosci-testing-bot (uosci-testing-bot) wrote : | # |
charm_unit_test #8678 ceph-next for gnuoy mp268614
UNIT OK: passed
uosci-testing-bot (uosci-testing-bot) wrote : | # |
charm_lint_check #9384 ceph-next for gnuoy mp268614
LINT OK: passed
uosci-testing-bot (uosci-testing-bot) wrote : | # |
charm_amulet_test #6247 ceph-next for gnuoy mp268614
AMULET OK: passed
Build: http://
- 138. By Liam Young
-
Charm helper sync
uosci-testing-bot (uosci-testing-bot) wrote : | # |
charm_unit_test #8778 ceph-next for gnuoy mp268614
UNIT OK: passed
uosci-testing-bot (uosci-testing-bot) wrote : | # |
charm_lint_check #9538 ceph-next for gnuoy mp268614
LINT OK: passed
uosci-testing-bot (uosci-testing-bot) wrote : | # |
charm_amulet_test #6308 ceph-next for gnuoy mp268614
AMULET OK: passed
Build: http://
- 139. By Liam Young
-
Charm helper sync
uosci-testing-bot (uosci-testing-bot) wrote : | # |
charm_lint_check #9710 ceph-next for gnuoy mp268614
LINT OK: passed
uosci-testing-bot (uosci-testing-bot) wrote : | # |
charm_unit_test #8943 ceph-next for gnuoy mp268614
UNIT OK: passed
uosci-testing-bot (uosci-testing-bot) wrote : | # |
charm_amulet_test #6344 ceph-next for gnuoy mp268614
AMULET OK: passed
Build: http://
Preview Diff
1 | === modified file 'hooks/ceph_broker.py' |
2 | --- hooks/ceph_broker.py 2014-11-19 21:33:12 +0000 |
3 | +++ hooks/ceph_broker.py 2015-09-10 09:35:21 +0000 |
4 | @@ -31,10 +31,16 @@ |
5 | This is a versioned api. API version must be supplied by the client making |
6 | the request. |
7 | """ |
8 | + request_id = reqs.get('request-id') |
9 | try: |
10 | version = reqs.get('api-version') |
11 | if version == 1: |
12 | - return process_requests_v1(reqs['ops']) |
13 | + log('Processing request {}'.format(request_id), level=DEBUG) |
14 | + resp = process_requests_v1(reqs['ops']) |
15 | + if request_id: |
16 | + resp['request-id'] = request_id |
17 | + |
18 | + return resp |
19 | |
20 | except Exception as exc: |
21 | log(str(exc), level=ERROR) |
22 | @@ -44,7 +50,11 @@ |
23 | return {'exit-code': 1, 'stderr': msg} |
24 | |
25 | msg = ("Missing or invalid api version (%s)" % (version)) |
26 | - return {'exit-code': 1, 'stderr': msg} |
27 | + resp = {'exit-code': 1, 'stderr': msg} |
28 | + if request_id: |
29 | + resp['request-id'] = request_id |
30 | + |
31 | + return resp |
32 | |
33 | |
34 | def process_requests_v1(reqs): |
35 | |
36 | === modified file 'hooks/charmhelpers/contrib/storage/linux/ceph.py' |
37 | --- hooks/charmhelpers/contrib/storage/linux/ceph.py 2015-07-29 10:48:21 +0000 |
38 | +++ hooks/charmhelpers/contrib/storage/linux/ceph.py 2015-09-10 09:35:21 +0000 |
39 | @@ -28,6 +28,7 @@ |
40 | import shutil |
41 | import json |
42 | import time |
43 | +import uuid |
44 | |
45 | from subprocess import ( |
46 | check_call, |
47 | @@ -35,8 +36,10 @@ |
48 | CalledProcessError, |
49 | ) |
50 | from charmhelpers.core.hookenv import ( |
51 | + local_unit, |
52 | relation_get, |
53 | relation_ids, |
54 | + relation_set, |
55 | related_units, |
56 | log, |
57 | DEBUG, |
58 | @@ -411,17 +414,52 @@ |
59 | |
60 | The API is versioned and defaults to version 1. |
61 | """ |
62 | - def __init__(self, api_version=1): |
63 | + def __init__(self, api_version=1, request_id=None): |
64 | self.api_version = api_version |
65 | + if request_id: |
66 | + self.request_id = request_id |
67 | + else: |
68 | + self.request_id = str(uuid.uuid1()) |
69 | self.ops = [] |
70 | |
71 | def add_op_create_pool(self, name, replica_count=3): |
72 | self.ops.append({'op': 'create-pool', 'name': name, |
73 | 'replicas': replica_count}) |
74 | |
75 | + def set_ops(self, ops): |
76 | + """Set request ops to provided value. |
77 | + |
78 | + Useful for injecting ops that come from a previous request |
79 | + to allow comparisons to ensure validity. |
80 | + """ |
81 | + self.ops = ops |
82 | + |
83 | @property |
84 | def request(self): |
85 | - return json.dumps({'api-version': self.api_version, 'ops': self.ops}) |
86 | + return json.dumps({'api-version': self.api_version, 'ops': self.ops, |
87 | + 'request-id': self.request_id}) |
88 | + |
89 | + def _ops_equal(self, other): |
90 | + if len(self.ops) == len(other.ops): |
91 | + for req_no in range(0, len(self.ops)): |
92 | + for key in ['replicas', 'name', 'op']: |
93 | + if self.ops[req_no][key] != other.ops[req_no][key]: |
94 | + return False |
95 | + else: |
96 | + return False |
97 | + return True |
98 | + |
99 | + def __eq__(self, other): |
100 | + if not isinstance(other, self.__class__): |
101 | + return False |
102 | + if self.api_version == other.api_version and \ |
103 | + self._ops_equal(other): |
104 | + return True |
105 | + else: |
106 | + return False |
107 | + |
108 | + def __ne__(self, other): |
109 | + return not self.__eq__(other) |
110 | |
111 | |
112 | class CephBrokerRsp(object): |
113 | @@ -431,14 +469,198 @@ |
114 | |
115 | The API is versioned and defaults to version 1. |
116 | """ |
117 | + |
118 | def __init__(self, encoded_rsp): |
119 | self.api_version = None |
120 | self.rsp = json.loads(encoded_rsp) |
121 | |
122 | @property |
123 | + def request_id(self): |
124 | + return self.rsp.get('request-id') |
125 | + |
126 | + @property |
127 | def exit_code(self): |
128 | return self.rsp.get('exit-code') |
129 | |
130 | @property |
131 | def exit_msg(self): |
132 | return self.rsp.get('stderr') |
133 | + |
134 | + |
135 | +# Ceph Broker Conversation: |
136 | +# If a charm needs an action to be taken by ceph it can create a CephBrokerRq |
137 | +# and send that request to ceph via the ceph relation. The CephBrokerRq has a |
138 | +# unique id so that the client can identify which CephBrokerRsp is associated |
139 | +# with the request. Ceph will also respond to each client unit individually |
140 | +# creating a response key per client unit eg glance/0 will get a CephBrokerRsp |
141 | +# via key broker-rsp-glance-0 |
142 | +# |
143 | +# To use this the charm can just do something like: |
144 | +# |
145 | +# from charmhelpers.contrib.storage.linux.ceph import ( |
146 | +# send_request_if_needed, |
147 | +# is_request_complete, |
148 | +# CephBrokerRq, |
149 | +# ) |
150 | +# |
151 | +# @hooks.hook('ceph-relation-changed') |
152 | +# def ceph_changed(): |
153 | +# rq = CephBrokerRq() |
154 | +# rq.add_op_create_pool(name='poolname', replica_count=3) |
155 | +# |
156 | +# if is_request_complete(rq): |
157 | +# <Request complete actions> |
158 | +# else: |
159 | +# send_request_if_needed(get_ceph_request()) |
160 | +# |
161 | +# CephBrokerRq and CephBrokerRsp are serialized into JSON. Below is an example |
162 | +# of glance having sent a request to ceph which ceph has successfully processed |
163 | +# 'ceph:8': { |
164 | +# 'ceph/0': { |
165 | +# 'auth': 'cephx', |
166 | +# 'broker-rsp-glance-0': '{"request-id": "0bc7dc54", "exit-code": 0}', |
167 | +# 'broker_rsp': '{"request-id": "0da543b8", "exit-code": 0}', |
168 | +# 'ceph-public-address': '10.5.44.103', |
169 | +# 'key': 'AQCLDttVuHXINhAAvI144CB09dYchhHyTUY9BQ==', |
170 | +# 'private-address': '10.5.44.103', |
171 | +# }, |
172 | +# 'glance/0': { |
173 | +# 'broker_req': ('{"api-version": 1, "request-id": "0bc7dc54", ' |
174 | +# '"ops": [{"replicas": 3, "name": "glance", ' |
175 | +# '"op": "create-pool"}]}'), |
176 | +# 'private-address': '10.5.44.109', |
177 | +# }, |
178 | +# } |
179 | + |
180 | +def get_previous_request(rid): |
181 | + """Return the last ceph broker request sent on a given relation |
182 | + |
183 | + @param rid: Relation id to query for request |
184 | + """ |
185 | + request = None |
186 | + broker_req = relation_get(attribute='broker_req', rid=rid, |
187 | + unit=local_unit()) |
188 | + if broker_req: |
189 | + request_data = json.loads(broker_req) |
190 | + request = CephBrokerRq(api_version=request_data['api-version'], |
191 | + request_id=request_data['request-id']) |
192 | + request.set_ops(request_data['ops']) |
193 | + |
194 | + return request |
195 | + |
196 | + |
197 | +def get_request_states(request): |
198 | + """Return a dict of requests per relation id with their corresponding |
199 | + completion state. |
200 | + |
201 | + This allows a charm, which has a request for ceph, to see whether there is |
202 | + an equivalent request already being processed and if so what state that |
203 | + request is in. |
204 | + |
205 | + @param request: A CephBrokerRq object |
206 | + """ |
207 | + complete = [] |
208 | + requests = {} |
209 | + for rid in relation_ids('ceph'): |
210 | + complete = False |
211 | + previous_request = get_previous_request(rid) |
212 | + if request == previous_request: |
213 | + sent = True |
214 | + complete = is_request_complete_for_rid(previous_request, rid) |
215 | + else: |
216 | + sent = False |
217 | + complete = False |
218 | + |
219 | + requests[rid] = { |
220 | + 'sent': sent, |
221 | + 'complete': complete, |
222 | + } |
223 | + |
224 | + return requests |
225 | + |
226 | + |
227 | +def is_request_sent(request): |
228 | + """Check to see if a functionally equivalent request has already been sent |
229 | + |
230 | + Returns True if a similar request has been sent |
231 | + |
232 | + @param request: A CephBrokerRq object |
233 | + """ |
234 | + states = get_request_states(request) |
235 | + for rid in states.keys(): |
236 | + if not states[rid]['sent']: |
237 | + return False |
238 | + |
239 | + return True |
240 | + |
241 | + |
242 | +def is_request_complete(request): |
243 | + """Check to see if a functionally equivalent request has already been |
244 | + completed |
245 | + |
246 | + Returns True if a similar request has been completed |
247 | + |
248 | + @param request: A CephBrokerRq object |
249 | + """ |
250 | + states = get_request_states(request) |
251 | + for rid in states.keys(): |
252 | + if not states[rid]['complete']: |
253 | + return False |
254 | + |
255 | + return True |
256 | + |
257 | + |
258 | +def is_request_complete_for_rid(request, rid): |
259 | + """Check if a given request has been completed on the given relation |
260 | + |
261 | + @param request: A CephBrokerRq object |
262 | + @param rid: Relation ID |
263 | + """ |
264 | + broker_key = get_broker_rsp_key() |
265 | + for unit in related_units(rid): |
266 | + rdata = relation_get(rid=rid, unit=unit) |
267 | + if rdata.get(broker_key): |
268 | + rsp = CephBrokerRsp(rdata.get(broker_key)) |
269 | + if rsp.request_id == request.request_id: |
270 | + if not rsp.exit_code: |
271 | + return True |
272 | + else: |
273 | + # The remote unit sent no reply targeted at this unit so either the |
274 | + # remote ceph cluster does not support unit targeted replies or it |
275 | + # has not processed our request yet. |
276 | + if rdata.get('broker_rsp'): |
277 | + request_data = json.loads(rdata['broker_rsp']) |
278 | + if request_data.get('request-id'): |
279 | + log('Ignoring legacy broker_rsp without unit key as remote ' |
280 | + 'service supports unit specific replies', level=DEBUG) |
281 | + else: |
282 | + log('Using legacy broker_rsp as remote service does not ' |
283 | + 'supports unit specific replies', level=DEBUG) |
284 | + rsp = CephBrokerRsp(rdata['broker_rsp']) |
285 | + if not rsp.exit_code: |
286 | + return True |
287 | + |
288 | + return False |
289 | + |
290 | + |
291 | +def get_broker_rsp_key(): |
292 | + """Return broker response key for this unit |
293 | + |
294 | + This is the key that ceph is going to use to pass request status |
295 | + information back to this unit |
296 | + """ |
297 | + return 'broker-rsp-' + local_unit().replace('/', '-') |
298 | + |
299 | + |
300 | +def send_request_if_needed(request): |
301 | + """Send broker request if an equivalent request has not already been sent |
302 | + |
303 | + @param request: A CephBrokerRq object |
304 | + """ |
305 | + if is_request_sent(request): |
306 | + log('Request already sent but not complete, not sending new request', |
307 | + level=DEBUG) |
308 | + else: |
309 | + for rid in relation_ids('ceph'): |
310 | + log('Sending request {}'.format(request.request_id), level=DEBUG) |
311 | + relation_set(relation_id=rid, broker_req=request.request) |
312 | |
313 | === modified file 'hooks/hooks.py' |
314 | --- hooks/hooks.py 2015-03-23 17:40:42 +0000 |
315 | +++ hooks/hooks.py 2015-09-10 09:35:21 +0000 |
316 | @@ -319,7 +319,15 @@ |
317 | log("Not leader - ignoring broker request", level=DEBUG) |
318 | else: |
319 | rsp = process_requests(settings['broker_req']) |
320 | - relation_set(relation_settings={'broker_rsp': rsp}) |
321 | + unit_id = remote_unit().replace('/', '-') |
322 | + unit_response_key = 'broker-rsp-' + unit_id |
323 | + # broker_rsp is being left for backward compatibility, |
324 | + # unit_response_key supersedes it |
325 | + data = { |
326 | + 'broker_rsp': rsp, |
327 | + unit_response_key: rsp, |
328 | + } |
329 | + relation_set(relation_settings=data) |
330 | else: |
331 | log('mon cluster not in quorum', level=DEBUG) |
332 | |
333 | |
334 | === modified file 'tests/charmhelpers/contrib/amulet/utils.py' |
335 | --- tests/charmhelpers/contrib/amulet/utils.py 2015-08-19 00:51:43 +0000 |
336 | +++ tests/charmhelpers/contrib/amulet/utils.py 2015-09-10 09:35:21 +0000 |
337 | @@ -19,9 +19,11 @@ |
338 | import logging |
339 | import os |
340 | import re |
341 | +import socket |
342 | import subprocess |
343 | import sys |
344 | import time |
345 | +import uuid |
346 | |
347 | import amulet |
348 | import distro_info |
349 | @@ -114,7 +116,7 @@ |
350 | # /!\ DEPRECATION WARNING (beisner): |
351 | # New and existing tests should be rewritten to use |
352 | # validate_services_by_name() as it is aware of init systems. |
353 | - self.log.warn('/!\\ DEPRECATION WARNING: use ' |
354 | + self.log.warn('DEPRECATION WARNING: use ' |
355 | 'validate_services_by_name instead of validate_services ' |
356 | 'due to init system differences.') |
357 | |
358 | @@ -269,33 +271,52 @@ |
359 | """Get last modification time of directory.""" |
360 | return sentry_unit.directory_stat(directory)['mtime'] |
361 | |
362 | - def _get_proc_start_time(self, sentry_unit, service, pgrep_full=False): |
363 | - """Get process' start time. |
364 | - |
365 | - Determine start time of the process based on the last modification |
366 | - time of the /proc/pid directory. If pgrep_full is True, the process |
367 | - name is matched against the full command line. |
368 | - """ |
369 | - if pgrep_full: |
370 | - cmd = 'pgrep -o -f {}'.format(service) |
371 | - else: |
372 | - cmd = 'pgrep -o {}'.format(service) |
373 | - cmd = cmd + ' | grep -v pgrep || exit 0' |
374 | - cmd_out = sentry_unit.run(cmd) |
375 | - self.log.debug('CMDout: ' + str(cmd_out)) |
376 | - if cmd_out[0]: |
377 | - self.log.debug('Pid for %s %s' % (service, str(cmd_out[0]))) |
378 | - proc_dir = '/proc/{}'.format(cmd_out[0].strip()) |
379 | - return self._get_dir_mtime(sentry_unit, proc_dir) |
380 | + def _get_proc_start_time(self, sentry_unit, service, pgrep_full=None): |
381 | + """Get start time of a process based on the last modification time |
382 | + of the /proc/pid directory. |
383 | + |
384 | + :sentry_unit: The sentry unit to check for the service on |
385 | + :service: service name to look for in process table |
386 | + :pgrep_full: [Deprecated] Use full command line search mode with pgrep |
387 | + :returns: epoch time of service process start |
388 | + :param commands: list of bash commands |
389 | + :param sentry_units: list of sentry unit pointers |
390 | + :returns: None if successful; Failure message otherwise |
391 | + """ |
392 | + if pgrep_full is not None: |
393 | + # /!\ DEPRECATION WARNING (beisner): |
394 | + # No longer implemented, as pidof is now used instead of pgrep. |
395 | + # https://bugs.launchpad.net/charm-helpers/+bug/1474030 |
396 | + self.log.warn('DEPRECATION WARNING: pgrep_full bool is no ' |
397 | + 'longer implemented re: lp 1474030.') |
398 | + |
399 | + pid_list = self.get_process_id_list(sentry_unit, service) |
400 | + pid = pid_list[0] |
401 | + proc_dir = '/proc/{}'.format(pid) |
402 | + self.log.debug('Pid for {} on {}: {}'.format( |
403 | + service, sentry_unit.info['unit_name'], pid)) |
404 | + |
405 | + return self._get_dir_mtime(sentry_unit, proc_dir) |
406 | |
407 | def service_restarted(self, sentry_unit, service, filename, |
408 | - pgrep_full=False, sleep_time=20): |
409 | + pgrep_full=None, sleep_time=20): |
410 | """Check if service was restarted. |
411 | |
412 | Compare a service's start time vs a file's last modification time |
413 | (such as a config file for that service) to determine if the service |
414 | has been restarted. |
415 | """ |
416 | + # /!\ DEPRECATION WARNING (beisner): |
417 | + # This method is prone to races in that no before-time is known. |
418 | + # Use validate_service_config_changed instead. |
419 | + |
420 | + # NOTE(beisner) pgrep_full is no longer implemented, as pidof is now |
421 | + # used instead of pgrep. pgrep_full is still passed through to ensure |
422 | + # deprecation WARNS. lp1474030 |
423 | + self.log.warn('DEPRECATION WARNING: use ' |
424 | + 'validate_service_config_changed instead of ' |
425 | + 'service_restarted due to known races.') |
426 | + |
427 | time.sleep(sleep_time) |
428 | if (self._get_proc_start_time(sentry_unit, service, pgrep_full) >= |
429 | self._get_file_mtime(sentry_unit, filename)): |
430 | @@ -304,15 +325,15 @@ |
431 | return False |
432 | |
433 | def service_restarted_since(self, sentry_unit, mtime, service, |
434 | - pgrep_full=False, sleep_time=20, |
435 | - retry_count=2): |
436 | + pgrep_full=None, sleep_time=20, |
437 | + retry_count=2, retry_sleep_time=30): |
438 | """Check if service was been started after a given time. |
439 | |
440 | Args: |
441 | sentry_unit (sentry): The sentry unit to check for the service on |
442 | mtime (float): The epoch time to check against |
443 | service (string): service name to look for in process table |
444 | - pgrep_full (boolean): Use full command line search mode with pgrep |
445 | + pgrep_full: [Deprecated] Use full command line search mode with pgrep |
446 | sleep_time (int): Seconds to sleep before looking for process |
447 | retry_count (int): If service is not found, how many times to retry |
448 | |
449 | @@ -321,30 +342,44 @@ |
450 | False if service is older than mtime or if service was |
451 | not found. |
452 | """ |
453 | - self.log.debug('Checking %s restarted since %s' % (service, mtime)) |
454 | + # NOTE(beisner) pgrep_full is no longer implemented, as pidof is now |
455 | + # used instead of pgrep. pgrep_full is still passed through to ensure |
456 | + # deprecation WARNS. lp1474030 |
457 | + |
458 | + unit_name = sentry_unit.info['unit_name'] |
459 | + self.log.debug('Checking that %s service restarted since %s on ' |
460 | + '%s' % (service, mtime, unit_name)) |
461 | time.sleep(sleep_time) |
462 | - proc_start_time = self._get_proc_start_time(sentry_unit, service, |
463 | - pgrep_full) |
464 | - while retry_count > 0 and not proc_start_time: |
465 | - self.log.debug('No pid file found for service %s, will retry %i ' |
466 | - 'more times' % (service, retry_count)) |
467 | - time.sleep(30) |
468 | - proc_start_time = self._get_proc_start_time(sentry_unit, service, |
469 | - pgrep_full) |
470 | - retry_count = retry_count - 1 |
471 | + proc_start_time = None |
472 | + tries = 0 |
473 | + while tries <= retry_count and not proc_start_time: |
474 | + try: |
475 | + proc_start_time = self._get_proc_start_time(sentry_unit, |
476 | + service, |
477 | + pgrep_full) |
478 | + self.log.debug('Attempt {} to get {} proc start time on {} ' |
479 | + 'OK'.format(tries, service, unit_name)) |
480 | + except IOError: |
481 | + # NOTE(beisner) - race avoidance, proc may not exist yet. |
482 | + # https://bugs.launchpad.net/charm-helpers/+bug/1474030 |
483 | + self.log.debug('Attempt {} to get {} proc start time on {} ' |
484 | + 'failed'.format(tries, service, unit_name)) |
485 | + time.sleep(retry_sleep_time) |
486 | + tries += 1 |
487 | |
488 | if not proc_start_time: |
489 | self.log.warn('No proc start time found, assuming service did ' |
490 | 'not start') |
491 | return False |
492 | if proc_start_time >= mtime: |
493 | - self.log.debug('proc start time is newer than provided mtime' |
494 | - '(%s >= %s)' % (proc_start_time, mtime)) |
495 | + self.log.debug('Proc start time is newer than provided mtime' |
496 | + '(%s >= %s) on %s (OK)' % (proc_start_time, |
497 | + mtime, unit_name)) |
498 | return True |
499 | else: |
500 | - self.log.warn('proc start time (%s) is older than provided mtime ' |
501 | - '(%s), service did not restart' % (proc_start_time, |
502 | - mtime)) |
503 | + self.log.warn('Proc start time (%s) is older than provided mtime ' |
504 | + '(%s) on %s, service did not ' |
505 | + 'restart' % (proc_start_time, mtime, unit_name)) |
506 | return False |
507 | |
508 | def config_updated_since(self, sentry_unit, filename, mtime, |
509 | @@ -374,8 +409,9 @@ |
510 | return False |
511 | |
512 | def validate_service_config_changed(self, sentry_unit, mtime, service, |
513 | - filename, pgrep_full=False, |
514 | - sleep_time=20, retry_count=2): |
515 | + filename, pgrep_full=None, |
516 | + sleep_time=20, retry_count=2, |
517 | + retry_sleep_time=30): |
518 | """Check service and file were updated after mtime |
519 | |
520 | Args: |
521 | @@ -383,9 +419,10 @@ |
522 | mtime (float): The epoch time to check against |
523 | service (string): service name to look for in process table |
524 | filename (string): The file to check mtime of |
525 | - pgrep_full (boolean): Use full command line search mode with pgrep |
526 | - sleep_time (int): Seconds to sleep before looking for process |
527 | + pgrep_full: [Deprecated] Use full command line search mode with pgrep |
528 | + sleep_time (int): Initial sleep in seconds to pass to test helpers |
529 | retry_count (int): If service is not found, how many times to retry |
530 | + retry_sleep_time (int): Time in seconds to wait between retries |
531 | |
532 | Typical Usage: |
533 | u = OpenStackAmuletUtils(ERROR) |
534 | @@ -402,15 +439,25 @@ |
535 | mtime, False if service is older than mtime or if service was |
536 | not found or if filename was modified before mtime. |
537 | """ |
538 | - self.log.debug('Checking %s restarted since %s' % (service, mtime)) |
539 | - time.sleep(sleep_time) |
540 | - service_restart = self.service_restarted_since(sentry_unit, mtime, |
541 | - service, |
542 | - pgrep_full=pgrep_full, |
543 | - sleep_time=0, |
544 | - retry_count=retry_count) |
545 | - config_update = self.config_updated_since(sentry_unit, filename, mtime, |
546 | - sleep_time=0) |
547 | + |
548 | + # NOTE(beisner) pgrep_full is no longer implemented, as pidof is now |
549 | + # used instead of pgrep. pgrep_full is still passed through to ensure |
550 | + # deprecation WARNS. lp1474030 |
551 | + |
552 | + service_restart = self.service_restarted_since( |
553 | + sentry_unit, mtime, |
554 | + service, |
555 | + pgrep_full=pgrep_full, |
556 | + sleep_time=sleep_time, |
557 | + retry_count=retry_count, |
558 | + retry_sleep_time=retry_sleep_time) |
559 | + |
560 | + config_update = self.config_updated_since( |
561 | + sentry_unit, |
562 | + filename, |
563 | + mtime, |
564 | + sleep_time=0) |
565 | + |
566 | return service_restart and config_update |
567 | |
568 | def get_sentry_time(self, sentry_unit): |
569 | @@ -428,7 +475,6 @@ |
570 | """Return a list of all Ubuntu releases in order of release.""" |
571 | _d = distro_info.UbuntuDistroInfo() |
572 | _release_list = _d.all |
573 | - self.log.debug('Ubuntu release list: {}'.format(_release_list)) |
574 | return _release_list |
575 | |
576 | def file_to_url(self, file_rel_path): |
577 | @@ -568,6 +614,142 @@ |
578 | |
579 | return None |
580 | |
581 | + def validate_sectionless_conf(self, file_contents, expected): |
582 | + """A crude conf parser. Useful to inspect configuration files which |
583 | + do not have section headers (as would be necessary in order to use |
584 | + the configparser). Such as openstack-dashboard or rabbitmq confs.""" |
585 | + for line in file_contents.split('\n'): |
586 | + if '=' in line: |
587 | + args = line.split('=') |
588 | + if len(args) <= 1: |
589 | + continue |
590 | + key = args[0].strip() |
591 | + value = args[1].strip() |
592 | + if key in expected.keys(): |
593 | + if expected[key] != value: |
594 | + msg = ('Config mismatch. Expected, actual: {}, ' |
595 | + '{}'.format(expected[key], value)) |
596 | + amulet.raise_status(amulet.FAIL, msg=msg) |
597 | + |
598 | + def get_unit_hostnames(self, units): |
599 | + """Return a dict of juju unit names to hostnames.""" |
600 | + host_names = {} |
601 | + for unit in units: |
602 | + host_names[unit.info['unit_name']] = \ |
603 | + str(unit.file_contents('/etc/hostname').strip()) |
604 | + self.log.debug('Unit host names: {}'.format(host_names)) |
605 | + return host_names |
606 | + |
607 | + def run_cmd_unit(self, sentry_unit, cmd): |
608 | + """Run a command on a unit, return the output and exit code.""" |
609 | + output, code = sentry_unit.run(cmd) |
610 | + if code == 0: |
611 | + self.log.debug('{} `{}` command returned {} ' |
612 | + '(OK)'.format(sentry_unit.info['unit_name'], |
613 | + cmd, code)) |
614 | + else: |
615 | + msg = ('{} `{}` command returned {} ' |
616 | + '{}'.format(sentry_unit.info['unit_name'], |
617 | + cmd, code, output)) |
618 | + amulet.raise_status(amulet.FAIL, msg=msg) |
619 | + return str(output), code |
620 | + |
621 | + def file_exists_on_unit(self, sentry_unit, file_name): |
622 | + """Check if a file exists on a unit.""" |
623 | + try: |
624 | + sentry_unit.file_stat(file_name) |
625 | + return True |
626 | + except IOError: |
627 | + return False |
628 | + except Exception as e: |
629 | + msg = 'Error checking file {}: {}'.format(file_name, e) |
630 | + amulet.raise_status(amulet.FAIL, msg=msg) |
631 | + |
632 | + def file_contents_safe(self, sentry_unit, file_name, |
633 | + max_wait=60, fatal=False): |
634 | + """Get file contents from a sentry unit. Wrap amulet file_contents |
635 | + with retry logic to address races where a file checks as existing, |
636 | + but no longer exists by the time file_contents is called. |
637 | + Return None if file not found. Optionally raise if fatal is True.""" |
638 | + unit_name = sentry_unit.info['unit_name'] |
639 | + file_contents = False |
640 | + tries = 0 |
641 | + while not file_contents and tries < (max_wait / 4): |
642 | + try: |
643 | + file_contents = sentry_unit.file_contents(file_name) |
644 | + except IOError: |
645 | + self.log.debug('Attempt {} to open file {} from {} ' |
646 | + 'failed'.format(tries, file_name, |
647 | + unit_name)) |
648 | + time.sleep(4) |
649 | + tries += 1 |
650 | + |
651 | + if file_contents: |
652 | + return file_contents |
653 | + elif not fatal: |
654 | + return None |
655 | + elif fatal: |
656 | + msg = 'Failed to get file contents from unit.' |
657 | + amulet.raise_status(amulet.FAIL, msg) |
658 | + |
659 | + def port_knock_tcp(self, host="localhost", port=22, timeout=15): |
660 | + """Open a TCP socket to check for a listening service on a host. |
661 | + |
662 | + :param host: host name or IP address, default to localhost |
663 | + :param port: TCP port number, default to 22 |
664 | + :param timeout: Connect timeout, default to 15 seconds |
665 | + :returns: True if successful, False if connect failed |
666 | + """ |
667 | + |
668 | + # Resolve host name if possible |
669 | + try: |
670 | + connect_host = socket.gethostbyname(host) |
671 | + host_human = "{} ({})".format(connect_host, host) |
672 | + except socket.error as e: |
673 | + self.log.warn('Unable to resolve address: ' |
674 | + '{} ({}) Trying anyway!'.format(host, e)) |
675 | + connect_host = host |
676 | + host_human = connect_host |
677 | + |
678 | + # Attempt socket connection |
679 | + try: |
680 | + knock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
681 | + knock.settimeout(timeout) |
682 | + knock.connect((connect_host, port)) |
683 | + knock.close() |
684 | + self.log.debug('Socket connect OK for host ' |
685 | + '{} on port {}.'.format(host_human, port)) |
686 | + return True |
687 | + except socket.error as e: |
688 | + self.log.debug('Socket connect FAIL for' |
689 | + ' {} port {} ({})'.format(host_human, port, e)) |
690 | + return False |
691 | + |
692 | + def port_knock_units(self, sentry_units, port=22, |
693 | + timeout=15, expect_success=True): |
694 | + """Open a TCP socket to check for a listening service on each |
695 | + listed juju unit. |
696 | + |
697 | + :param sentry_units: list of sentry unit pointers |
698 | + :param port: TCP port number, default to 22 |
699 | + :param timeout: Connect timeout, default to 15 seconds |
700 | + :expect_success: True by default, set False to invert logic |
701 | + :returns: None if successful, Failure message otherwise |
702 | + """ |
703 | + for unit in sentry_units: |
704 | + host = unit.info['public-address'] |
705 | + connected = self.port_knock_tcp(host, port, timeout) |
706 | + if not connected and expect_success: |
707 | + return 'Socket connect failed.' |
708 | + elif connected and not expect_success: |
709 | + return 'Socket connected unexpectedly.' |
710 | + |
711 | + def get_uuid_epoch_stamp(self): |
712 | + """Returns a stamp string based on uuid4 and epoch time. Useful in |
713 | + generating test messages which need to be unique-ish.""" |
714 | + return '[{}-{}]'.format(uuid.uuid4(), time.time()) |
715 | + |
716 | +# amulet juju action helpers: |
717 | def run_action(self, unit_sentry, action, |
718 | _check_output=subprocess.check_output): |
719 | """Run the named action on a given unit sentry. |
720 | |
721 | === modified file 'tests/charmhelpers/contrib/openstack/amulet/deployment.py' |
722 | --- tests/charmhelpers/contrib/openstack/amulet/deployment.py 2015-08-19 00:51:43 +0000 |
723 | +++ tests/charmhelpers/contrib/openstack/amulet/deployment.py 2015-09-10 09:35:21 +0000 |
724 | @@ -44,8 +44,15 @@ |
725 | Determine if the local branch being tested is derived from its |
726 | stable or next (dev) branch, and based on this, use the corresonding |
727 | stable or next branches for the other_services.""" |
728 | + |
729 | + # Charms outside the lp:~openstack-charmers namespace |
730 | base_charms = ['mysql', 'mongodb', 'nrpe'] |
731 | |
732 | + # Force these charms to current series even when using an older series. |
733 | + # ie. Use trusty/nrpe even when series is precise, as the P charm |
734 | + # does not possess the necessary external master config and hooks. |
735 | + force_series_current = ['nrpe'] |
736 | + |
737 | if self.series in ['precise', 'trusty']: |
738 | base_series = self.series |
739 | else: |
740 | @@ -53,11 +60,17 @@ |
741 | |
742 | if self.stable: |
743 | for svc in other_services: |
744 | + if svc['name'] in force_series_current: |
745 | + base_series = self.current_next |
746 | + |
747 | temp = 'lp:charms/{}/{}' |
748 | svc['location'] = temp.format(base_series, |
749 | svc['name']) |
750 | else: |
751 | for svc in other_services: |
752 | + if svc['name'] in force_series_current: |
753 | + base_series = self.current_next |
754 | + |
755 | if svc['name'] in base_charms: |
756 | temp = 'lp:charms/{}/{}' |
757 | svc['location'] = temp.format(base_series, |
758 | @@ -77,21 +90,23 @@ |
759 | |
760 | services = other_services |
761 | services.append(this_service) |
762 | + |
763 | + # Charms which should use the source config option |
764 | use_source = ['mysql', 'mongodb', 'rabbitmq-server', 'ceph', |
765 | 'ceph-osd', 'ceph-radosgw'] |
766 | - # Most OpenStack subordinate charms do not expose an origin option |
767 | - # as that is controlled by the principle. |
768 | - ignore = ['cinder-ceph', 'hacluster', 'neutron-openvswitch', 'nrpe'] |
769 | + |
770 | + # Charms which can not use openstack-origin, ie. many subordinates |
771 | + no_origin = ['cinder-ceph', 'hacluster', 'neutron-openvswitch', 'nrpe'] |
772 | |
773 | if self.openstack: |
774 | for svc in services: |
775 | - if svc['name'] not in use_source + ignore: |
776 | + if svc['name'] not in use_source + no_origin: |
777 | config = {'openstack-origin': self.openstack} |
778 | self.d.configure(svc['name'], config) |
779 | |
780 | if self.source: |
781 | for svc in services: |
782 | - if svc['name'] in use_source and svc['name'] not in ignore: |
783 | + if svc['name'] in use_source and svc['name'] not in no_origin: |
784 | config = {'source': self.source} |
785 | self.d.configure(svc['name'], config) |
786 | |
787 | |
788 | === modified file 'tests/charmhelpers/contrib/openstack/amulet/utils.py' |
789 | --- tests/charmhelpers/contrib/openstack/amulet/utils.py 2015-06-29 14:25:54 +0000 |
790 | +++ tests/charmhelpers/contrib/openstack/amulet/utils.py 2015-09-10 09:35:21 +0000 |
791 | @@ -27,6 +27,7 @@ |
792 | import heatclient.v1.client as heat_client |
793 | import keystoneclient.v2_0 as keystone_client |
794 | import novaclient.v1_1.client as nova_client |
795 | +import pika |
796 | import swiftclient |
797 | |
798 | from charmhelpers.contrib.amulet.utils import ( |
799 | @@ -602,3 +603,361 @@ |
800 | self.log.debug('Ceph {} samples (OK): ' |
801 | '{}'.format(sample_type, samples)) |
802 | return None |
803 | + |
804 | +# rabbitmq/amqp specific helpers: |
805 | + def add_rmq_test_user(self, sentry_units, |
806 | + username="testuser1", password="changeme"): |
807 | + """Add a test user via the first rmq juju unit, check connection as |
808 | + the new user against all sentry units. |
809 | + |
810 | + :param sentry_units: list of sentry unit pointers |
811 | + :param username: amqp user name, default to testuser1 |
812 | + :param password: amqp user password |
813 | + :returns: None if successful. Raise on error. |
814 | + """ |
815 | + self.log.debug('Adding rmq user ({})...'.format(username)) |
816 | + |
817 | + # Check that user does not already exist |
818 | + cmd_user_list = 'rabbitmqctl list_users' |
819 | + output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_list) |
820 | + if username in output: |
821 | + self.log.warning('User ({}) already exists, returning ' |
822 | + 'gracefully.'.format(username)) |
823 | + return |
824 | + |
825 | + perms = '".*" ".*" ".*"' |
826 | + cmds = ['rabbitmqctl add_user {} {}'.format(username, password), |
827 | + 'rabbitmqctl set_permissions {} {}'.format(username, perms)] |
828 | + |
829 | + # Add user via first unit |
830 | + for cmd in cmds: |
831 | + output, _ = self.run_cmd_unit(sentry_units[0], cmd) |
832 | + |
833 | + # Check connection against the other sentry_units |
834 | + self.log.debug('Checking user connect against units...') |
835 | + for sentry_unit in sentry_units: |
836 | + connection = self.connect_amqp_by_unit(sentry_unit, ssl=False, |
837 | + username=username, |
838 | + password=password) |
839 | + connection.close() |
840 | + |
841 | + def delete_rmq_test_user(self, sentry_units, username="testuser1"): |
842 | + """Delete a rabbitmq user via the first rmq juju unit. |
843 | + |
844 | + :param sentry_units: list of sentry unit pointers |
845 | + :param username: amqp user name, default to testuser1 |
846 | + (deletion does not require a password) |
847 | + :returns: None if successful or no such user. |
848 | + """ |
849 | + self.log.debug('Deleting rmq user ({})...'.format(username)) |
850 | + |
851 | + # Check that the user exists |
852 | + cmd_user_list = 'rabbitmqctl list_users' |
853 | + output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_list) |
854 | + |
855 | + if username not in output: |
856 | + self.log.warning('User ({}) does not exist, returning ' |
857 | + 'gracefully.'.format(username)) |
858 | + return |
859 | + |
860 | + # Delete the user |
861 | + cmd_user_del = 'rabbitmqctl delete_user {}'.format(username) |
862 | + output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_del) |
863 | + |
864 | + def get_rmq_cluster_status(self, sentry_unit): |
865 | + """Execute rabbitmq cluster status command on a unit and return |
866 | + the full output. |
867 | + |
868 | + :param sentry_unit: sentry unit |
869 | + :returns: String containing console output of cluster status command |
870 | + """ |
871 | + cmd = 'rabbitmqctl cluster_status' |
872 | + output, _ = self.run_cmd_unit(sentry_unit, cmd) |
873 | + self.log.debug('{} cluster_status:\n{}'.format( |
874 | + sentry_unit.info['unit_name'], output)) |
875 | + return str(output) |
876 | + |
877 | + def get_rmq_cluster_running_nodes(self, sentry_unit): |
878 | + """Parse rabbitmqctl cluster_status output string, return list of |
879 | + running rabbitmq cluster nodes. |
880 | + |
881 | + :param sentry_unit: sentry unit |
882 | + :returns: List containing node names of running nodes |
883 | + """ |
884 | + # NOTE(beisner): rabbitmqctl cluster_status output is not |
885 | + # json-parsable, do string chop foo, then json.loads that. |
886 | + str_stat = self.get_rmq_cluster_status(sentry_unit) |
887 | + if 'running_nodes' in str_stat: |
888 | + pos_start = str_stat.find("{running_nodes,") + 15 |
889 | + pos_end = str_stat.find("]},", pos_start) + 1 |
890 | + str_run_nodes = str_stat[pos_start:pos_end].replace("'", '"') |
891 | + run_nodes = json.loads(str_run_nodes) |
892 | + return run_nodes |
893 | + else: |
894 | + return [] |
895 | + |
896 | + def validate_rmq_cluster_running_nodes(self, sentry_units): |
897 | + """Check that all rmq unit hostnames are represented in the |
898 | + cluster_status output of all units. |
899 | + |
900 | + :param sentry_units: list of sentry unit pointers (all rmq units) |
901 | + (host names are derived internally via get_unit_hostnames) |
902 | + :returns: None if successful, otherwise return error message |
903 | + """ |
904 | + host_names = self.get_unit_hostnames(sentry_units) |
905 | + errors = [] |
906 | + |
907 | + # Query every unit for cluster_status running nodes |
908 | + for query_unit in sentry_units: |
909 | + query_unit_name = query_unit.info['unit_name'] |
910 | + running_nodes = self.get_rmq_cluster_running_nodes(query_unit) |
911 | + |
912 | + # Confirm that every unit is represented in the queried unit's |
913 | + # cluster_status running nodes output. |
914 | + for validate_unit in sentry_units: |
915 | + val_host_name = host_names[validate_unit.info['unit_name']] |
916 | + val_node_name = 'rabbit@{}'.format(val_host_name) |
917 | + |
918 | + if val_node_name not in running_nodes: |
919 | + errors.append('Cluster member check failed on {}: {} not ' |
920 | + 'in {}\n'.format(query_unit_name, |
921 | + val_node_name, |
922 | + running_nodes)) |
923 | + if errors: |
924 | + return ''.join(errors) |
925 | + |
926 | + def rmq_ssl_is_enabled_on_unit(self, sentry_unit, port=None): |
927 | + """Check a single juju rmq unit for ssl and port in the config file.""" |
928 | + host = sentry_unit.info['public-address'] |
929 | + unit_name = sentry_unit.info['unit_name'] |
930 | + |
931 | + conf_file = '/etc/rabbitmq/rabbitmq.config' |
932 | + conf_contents = str(self.file_contents_safe(sentry_unit, |
933 | + conf_file, max_wait=16)) |
934 | + # Checks |
935 | + conf_ssl = 'ssl' in conf_contents |
936 | + conf_port = str(port) in conf_contents |
937 | + |
938 | + # Port explicitly checked in config |
939 | + if port and conf_port and conf_ssl: |
940 | + self.log.debug('SSL is enabled @{}:{} ' |
941 | + '({})'.format(host, port, unit_name)) |
942 | + return True |
943 | + elif port and not conf_port and conf_ssl: |
944 | + self.log.debug('SSL is enabled @{} but not on port {} ' |
945 | + '({})'.format(host, port, unit_name)) |
946 | + return False |
947 | + # Port not checked (useful when checking that ssl is disabled) |
948 | + elif not port and conf_ssl: |
949 | + self.log.debug('SSL is enabled @{}:{} ' |
950 | + '({})'.format(host, port, unit_name)) |
951 | + return True |
952 | + elif not port and not conf_ssl: |
953 | + self.log.debug('SSL not enabled @{}:{} ' |
954 | + '({})'.format(host, port, unit_name)) |
955 | + return False |
956 | + else: |
957 | + msg = ('Unknown condition when checking SSL status @{}:{} ' |
958 | + '({})'.format(host, port, unit_name)) |
959 | + amulet.raise_status(amulet.FAIL, msg) |
960 | + |
961 | + def validate_rmq_ssl_enabled_units(self, sentry_units, port=None): |
962 | + """Check that ssl is enabled on rmq juju sentry units. |
963 | + |
964 | + :param sentry_units: list of all rmq sentry units |
965 | + :param port: optional ssl port override to validate |
966 | + :returns: None if successful, otherwise return error message |
967 | + """ |
968 | + for sentry_unit in sentry_units: |
969 | + if not self.rmq_ssl_is_enabled_on_unit(sentry_unit, port=port): |
970 | + return ('Unexpected condition: ssl is disabled on unit ' |
971 | + '({})'.format(sentry_unit.info['unit_name'])) |
972 | + return None |
973 | + |
974 | + def validate_rmq_ssl_disabled_units(self, sentry_units): |
975 | + """Check that ssl is enabled on listed rmq juju sentry units. |
976 | + |
977 | + :param sentry_units: list of all rmq sentry units |
978 | + :returns: None if successful, otherwise return error message |
979 | + """ |
980 | + for sentry_unit in sentry_units: |
981 | + if self.rmq_ssl_is_enabled_on_unit(sentry_unit): |
982 | + return ('Unexpected condition: ssl is enabled on unit ' |
983 | + '({})'.format(sentry_unit.info['unit_name'])) |
984 | + return None |
985 | + |
986 | + def configure_rmq_ssl_on(self, sentry_units, deployment, |
987 | + port=None, max_wait=60): |
988 | + """Turn ssl charm config option on, with optional non-default |
989 | + ssl port specification. Confirm that it is enabled on every |
990 | + unit. |
991 | + |
992 | + :param sentry_units: list of sentry units |
993 | + :param deployment: amulet deployment object pointer |
994 | + :param port: amqp port, use defaults if None |
995 | + :param max_wait: maximum time to wait in seconds to confirm |
996 | + :returns: None if successful. Raise on error. |
997 | + """ |
998 | + self.log.debug('Setting ssl charm config option: on') |
999 | + |
1000 | + # Enable RMQ SSL |
1001 | + config = {'ssl': 'on'} |
1002 | + if port: |
1003 | + config['ssl_port'] = port |
1004 | + |
1005 | + deployment.configure('rabbitmq-server', config) |
1006 | + |
1007 | + # Confirm |
1008 | + tries = 0 |
1009 | + ret = self.validate_rmq_ssl_enabled_units(sentry_units, port=port) |
1010 | + while ret and tries < (max_wait / 4): |
1011 | + time.sleep(4) |
1012 | + self.log.debug('Attempt {}: {}'.format(tries, ret)) |
1013 | + ret = self.validate_rmq_ssl_enabled_units(sentry_units, port=port) |
1014 | + tries += 1 |
1015 | + |
1016 | + if ret: |
1017 | + amulet.raise_status(amulet.FAIL, ret) |
1018 | + |
1019 | + def configure_rmq_ssl_off(self, sentry_units, deployment, max_wait=60): |
1020 | + """Turn ssl charm config option off, confirm that it is disabled |
1021 | + on every unit. |
1022 | + |
1023 | + :param sentry_units: list of sentry units |
1024 | + :param deployment: amulet deployment object pointer |
1025 | + :param max_wait: maximum time to wait in seconds to confirm |
1026 | + :returns: None if successful. Raise on error. |
1027 | + """ |
1028 | + self.log.debug('Setting ssl charm config option: off') |
1029 | + |
1030 | + # Disable RMQ SSL |
1031 | + config = {'ssl': 'off'} |
1032 | + deployment.configure('rabbitmq-server', config) |
1033 | + |
1034 | + # Confirm |
1035 | + tries = 0 |
1036 | + ret = self.validate_rmq_ssl_disabled_units(sentry_units) |
1037 | + while ret and tries < (max_wait / 4): |
1038 | + time.sleep(4) |
1039 | + self.log.debug('Attempt {}: {}'.format(tries, ret)) |
1040 | + ret = self.validate_rmq_ssl_disabled_units(sentry_units) |
1041 | + tries += 1 |
1042 | + |
1043 | + if ret: |
1044 | + amulet.raise_status(amulet.FAIL, ret) |
1045 | + |
1046 | + def connect_amqp_by_unit(self, sentry_unit, ssl=False, |
1047 | + port=None, fatal=True, |
1048 | + username="testuser1", password="changeme"): |
1049 | + """Establish and return a pika amqp connection to the rabbitmq service |
1050 | + running on a rmq juju unit. |
1051 | + |
1052 | + :param sentry_unit: sentry unit pointer |
1053 | + :param ssl: boolean, default to False |
1054 | + :param port: amqp port, use defaults if None |
1055 | + :param fatal: boolean, default to True (raises on connect error) |
1056 | + :param username: amqp user name, default to testuser1 |
1057 | + :param password: amqp user password |
1058 | + :returns: pika amqp connection pointer or None if failed and non-fatal |
1059 | + """ |
1060 | + host = sentry_unit.info['public-address'] |
1061 | + unit_name = sentry_unit.info['unit_name'] |
1062 | + |
1063 | + # Default port logic if port is not specified |
1064 | + if ssl and not port: |
1065 | + port = 5671 |
1066 | + elif not ssl and not port: |
1067 | + port = 5672 |
1068 | + |
1069 | + self.log.debug('Connecting to amqp on {}:{} ({}) as ' |
1070 | + '{}...'.format(host, port, unit_name, username)) |
1071 | + |
1072 | + try: |
1073 | + credentials = pika.PlainCredentials(username, password) |
1074 | + parameters = pika.ConnectionParameters(host=host, port=port, |
1075 | + credentials=credentials, |
1076 | + ssl=ssl, |
1077 | + connection_attempts=3, |
1078 | + retry_delay=5, |
1079 | + socket_timeout=1) |
1080 | + connection = pika.BlockingConnection(parameters) |
1081 | + assert connection.server_properties['product'] == 'RabbitMQ' |
1082 | + self.log.debug('Connect OK') |
1083 | + return connection |
1084 | + except Exception as e: |
1085 | + msg = ('amqp connection failed to {}:{} as ' |
1086 | + '{} ({})'.format(host, port, username, str(e))) |
1087 | + if fatal: |
1088 | + amulet.raise_status(amulet.FAIL, msg) |
1089 | + else: |
1090 | + self.log.warn(msg) |
1091 | + return None |
1092 | + |
1093 | + def publish_amqp_message_by_unit(self, sentry_unit, message, |
1094 | + queue="test", ssl=False, |
1095 | + username="testuser1", |
1096 | + password="changeme", |
1097 | + port=None): |
1098 | + """Publish an amqp message to a rmq juju unit. |
1099 | + |
1100 | + :param sentry_unit: sentry unit pointer |
1101 | + :param message: amqp message string |
1102 | + :param queue: message queue, default to test |
1103 | + :param username: amqp user name, default to testuser1 |
1104 | + :param password: amqp user password |
1105 | + :param ssl: boolean, default to False |
1106 | + :param port: amqp port, use defaults if None |
1107 | + :returns: None. Raises exception if publish failed. |
1108 | + """ |
1109 | + self.log.debug('Publishing message to {} queue:\n{}'.format(queue, |
1110 | + message)) |
1111 | + connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl, |
1112 | + port=port, |
1113 | + username=username, |
1114 | + password=password) |
1115 | + |
1116 | + # NOTE(beisner): extra debug here re: pika hang potential: |
1117 | + # https://github.com/pika/pika/issues/297 |
1118 | + # https://groups.google.com/forum/#!topic/rabbitmq-users/Ja0iyfF0Szw |
1119 | + self.log.debug('Defining channel...') |
1120 | + channel = connection.channel() |
1121 | + self.log.debug('Declaring queue...') |
1122 | + channel.queue_declare(queue=queue, auto_delete=False, durable=True) |
1123 | + self.log.debug('Publishing message...') |
1124 | + channel.basic_publish(exchange='', routing_key=queue, body=message) |
1125 | + self.log.debug('Closing channel...') |
1126 | + channel.close() |
1127 | + self.log.debug('Closing connection...') |
1128 | + connection.close() |
1129 | + |
1130 | + def get_amqp_message_by_unit(self, sentry_unit, queue="test", |
1131 | + username="testuser1", |
1132 | + password="changeme", |
1133 | + ssl=False, port=None): |
1134 | + """Get an amqp message from a rmq juju unit. |
1135 | + |
1136 | + :param sentry_unit: sentry unit pointer |
1137 | + :param queue: message queue, default to test |
1138 | + :param username: amqp user name, default to testuser1 |
1139 | + :param password: amqp user password |
1140 | + :param ssl: boolean, default to False |
1141 | + :param port: amqp port, use defaults if None |
1142 | + :returns: amqp message body as string. Raise if get fails. |
1143 | + """ |
1144 | + connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl, |
1145 | + port=port, |
1146 | + username=username, |
1147 | + password=password) |
1148 | + channel = connection.channel() |
1149 | + method_frame, _, body = channel.basic_get(queue) |
1150 | + |
1151 | + if method_frame: |
1152 | + self.log.debug('Retrieved message from {} queue:\n{}'.format(queue, |
1153 | + body)) |
1154 | + channel.basic_ack(method_frame.delivery_tag) |
1155 | + channel.close() |
1156 | + connection.close() |
1157 | + return body |
1158 | + else: |
1159 | + msg = 'No message retrieved.' |
1160 | + amulet.raise_status(amulet.FAIL, msg) |
1161 | |
1162 | === modified file 'unit_tests/test_ceph_broker.py' |
1163 | --- unit_tests/test_ceph_broker.py 2014-11-09 12:58:04 +0000 |
1164 | +++ unit_tests/test_ceph_broker.py 2015-09-10 09:35:21 +0000 |
1165 | @@ -70,3 +70,30 @@ |
1166 | mock_pool_exists.assert_called_with(service='admin', name='foo') |
1167 | self.assertFalse(mock_create_pool.called) |
1168 | self.assertEqual(json.loads(rc), {'exit-code': 0}) |
1169 | + |
1170 | + @mock.patch('ceph_broker.create_pool') |
1171 | + @mock.patch('ceph_broker.pool_exists') |
1172 | + @mock.patch('ceph_broker.log') |
1173 | + def test_process_requests_create_pool_rid(self, mock_log, mock_pool_exists, |
1174 | + mock_create_pool): |
1175 | + mock_pool_exists.return_value = False |
1176 | + reqs = json.dumps({'api-version': 1, |
1177 | + 'request-id': '1ef5aede', |
1178 | + 'ops': [{'op': 'create-pool', 'name': |
1179 | + 'foo', 'replicas': 3}]}) |
1180 | + rc = ceph_broker.process_requests(reqs) |
1181 | + mock_pool_exists.assert_called_with(service='admin', name='foo') |
1182 | + mock_create_pool.assert_called_with(service='admin', name='foo', |
1183 | + replicas=3) |
1184 | + self.assertEqual(json.loads(rc)['exit-code'], 0) |
1185 | + self.assertEqual(json.loads(rc)['request-id'], '1ef5aede') |
1186 | + |
1187 | + @mock.patch('ceph_broker.log') |
1188 | + def test_process_requests_invalid_api_rid(self, mock_log): |
1189 | + reqs = json.dumps({'api-version': 0, 'request-id': '1ef5aede', |
1190 | + 'ops': [{'op': 'create-pool'}]}) |
1191 | + rc = ceph_broker.process_requests(reqs) |
1192 | + self.assertEqual(json.loads(rc)['exit-code'], 1) |
1193 | + self.assertEqual(json.loads(rc)['stderr'], |
1194 | + "Missing or invalid api version (0)") |
1195 | + self.assertEqual(json.loads(rc)['request-id'], '1ef5aede') |
I've tested this out and it seems good. Tried deploying then scaling for
ceph, cinder, glance and nova-compute and all had updated ceph.conf on the
client side. The use of request-id combined with unit-name scoped responses
seems to nicely avoid any collisions as well. I have a few comments on some
minor fixups inline.