Merge lp:~1chb1n/charm-helpers/amulet-rmq-helpers into lp:charm-helpers
- amulet-rmq-helpers
- Merge into devel
Status: | Merged |
---|---|
Merged at revision: | 445 |
Proposed branch: | lp:~1chb1n/charm-helpers/amulet-rmq-helpers |
Merge into: | lp:charm-helpers |
Diff against target: |
827 lines (+613/-57) 3 files modified
charmhelpers/contrib/amulet/utils.py (+234/-52) charmhelpers/contrib/openstack/amulet/deployment.py (+20/-5) charmhelpers/contrib/openstack/amulet/utils.py (+359/-0) |
To merge this branch: | bzr merge lp:~1chb1n/charm-helpers/amulet-rmq-helpers |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Liam Young (community) | Approve | ||
David Ames (community) | Approve | ||
Review via email: mp+267859@code.launchpad.net |
Commit message
Description of the change
Add amulet & openstack/amulet helpers for rabbitmq-server tests; resolve misc. race conditions in amulet helpers.
Add exception handling to the retry logic in service_restarted_since.
Add deprecation WARN to the service_restarted amulet helper.
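Update _determine_branch_locations to force the nrpe charm to the current series, even when an older series is under test.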
Note re: validate_
This charm-helpers proposal is a prerequisite for the refactored rmq tests:
https:/
- 440. By Ryan Beisner
-
lint cleanup
- 441. By Ryan Beisner
-
clarify comment, fix typo, update add_rmq_test_user per review
Ryan Beisner (1chb1n) wrote : | # |
Thank you for your review. Items addressed and pushed.
David Ames (thedac) wrote : | # |
Looks good to me.
Good work.
- 442. By Ryan Beisner
-
re-merge lp:~1chb1n/charm-helpers/amulet-svc-restart-race for updates
Preview Diff
1 | === modified file 'charmhelpers/contrib/amulet/utils.py' |
2 | --- charmhelpers/contrib/amulet/utils.py 2015-08-17 10:47:36 +0000 |
3 | +++ charmhelpers/contrib/amulet/utils.py 2015-09-02 01:36:03 +0000 |
4 | @@ -19,9 +19,11 @@ |
5 | import logging |
6 | import os |
7 | import re |
8 | +import socket |
9 | import subprocess |
10 | import sys |
11 | import time |
12 | +import uuid |
13 | |
14 | import amulet |
15 | import distro_info |
16 | @@ -114,7 +116,7 @@ |
17 | # /!\ DEPRECATION WARNING (beisner): |
18 | # New and existing tests should be rewritten to use |
19 | # validate_services_by_name() as it is aware of init systems. |
20 | - self.log.warn('/!\\ DEPRECATION WARNING: use ' |
21 | + self.log.warn('DEPRECATION WARNING: use ' |
22 | 'validate_services_by_name instead of validate_services ' |
23 | 'due to init system differences.') |
24 | |
25 | @@ -269,33 +271,52 @@ |
26 | """Get last modification time of directory.""" |
27 | return sentry_unit.directory_stat(directory)['mtime'] |
28 | |
29 | - def _get_proc_start_time(self, sentry_unit, service, pgrep_full=False): |
30 | - """Get process' start time. |
31 | - |
32 | - Determine start time of the process based on the last modification |
33 | - time of the /proc/pid directory. If pgrep_full is True, the process |
34 | - name is matched against the full command line. |
35 | - """ |
36 | - if pgrep_full: |
37 | - cmd = 'pgrep -o -f {}'.format(service) |
38 | - else: |
39 | - cmd = 'pgrep -o {}'.format(service) |
40 | - cmd = cmd + ' | grep -v pgrep || exit 0' |
41 | - cmd_out = sentry_unit.run(cmd) |
42 | - self.log.debug('CMDout: ' + str(cmd_out)) |
43 | - if cmd_out[0]: |
44 | - self.log.debug('Pid for %s %s' % (service, str(cmd_out[0]))) |
45 | - proc_dir = '/proc/{}'.format(cmd_out[0].strip()) |
46 | - return self._get_dir_mtime(sentry_unit, proc_dir) |
47 | + def _get_proc_start_time(self, sentry_unit, service, pgrep_full=None): |
48 | + """Get start time of a process based on the last modification time |
49 | + of the /proc/pid directory. |
50 | + |
51 | + :sentry_unit: The sentry unit to check for the service on |
52 | + :service: service name to look for in process table |
53 | + :pgrep_full: [Deprecated] Use full command line search mode with pgrep |
54 | + :returns: epoch time of service process start |
55 | + :param commands: list of bash commands |
56 | + :param sentry_units: list of sentry unit pointers |
57 | + :returns: None if successful; Failure message otherwise |
58 | + """ |
59 | + if pgrep_full is not None: |
60 | + # /!\ DEPRECATION WARNING (beisner): |
61 | + # No longer implemented, as pidof is now used instead of pgrep. |
62 | + # https://bugs.launchpad.net/charm-helpers/+bug/1474030 |
63 | + self.log.warn('DEPRECATION WARNING: pgrep_full bool is no ' |
64 | + 'longer implemented re: lp 1474030.') |
65 | + |
66 | + pid_list = self.get_process_id_list(sentry_unit, service) |
67 | + pid = pid_list[0] |
68 | + proc_dir = '/proc/{}'.format(pid) |
69 | + self.log.debug('Pid for {} on {}: {}'.format( |
70 | + service, sentry_unit.info['unit_name'], pid)) |
71 | + |
72 | + return self._get_dir_mtime(sentry_unit, proc_dir) |
73 | |
74 | def service_restarted(self, sentry_unit, service, filename, |
75 | - pgrep_full=False, sleep_time=20): |
76 | + pgrep_full=None, sleep_time=20): |
77 | """Check if service was restarted. |
78 | |
79 | Compare a service's start time vs a file's last modification time |
80 | (such as a config file for that service) to determine if the service |
81 | has been restarted. |
82 | """ |
83 | + # /!\ DEPRECATION WARNING (beisner): |
84 | + # This method is prone to races in that no before-time is known. |
85 | + # Use validate_service_config_changed instead. |
86 | + |
87 | + # NOTE(beisner) pgrep_full is no longer implemented, as pidof is now |
88 | + # used instead of pgrep. pgrep_full is still passed through to ensure |
89 | + # deprecation WARNS. lp1474030 |
90 | + self.log.warn('DEPRECATION WARNING: use ' |
91 | + 'validate_service_config_changed instead of ' |
92 | + 'service_restarted due to known races.') |
93 | + |
94 | time.sleep(sleep_time) |
95 | if (self._get_proc_start_time(sentry_unit, service, pgrep_full) >= |
96 | self._get_file_mtime(sentry_unit, filename)): |
97 | @@ -304,15 +325,15 @@ |
98 | return False |
99 | |
100 | def service_restarted_since(self, sentry_unit, mtime, service, |
101 | - pgrep_full=False, sleep_time=20, |
102 | - retry_count=2): |
103 | + pgrep_full=None, sleep_time=20, |
104 | + retry_count=2, retry_sleep_time=30): |
105 | """Check if service was been started after a given time. |
106 | |
107 | Args: |
108 | sentry_unit (sentry): The sentry unit to check for the service on |
109 | mtime (float): The epoch time to check against |
110 | service (string): service name to look for in process table |
111 | - pgrep_full (boolean): Use full command line search mode with pgrep |
112 | + pgrep_full: [Deprecated] Use full command line search mode with pgrep |
113 | sleep_time (int): Seconds to sleep before looking for process |
114 | retry_count (int): If service is not found, how many times to retry |
115 | |
116 | @@ -321,30 +342,44 @@ |
117 | False if service is older than mtime or if service was |
118 | not found. |
119 | """ |
120 | - self.log.debug('Checking %s restarted since %s' % (service, mtime)) |
121 | + # NOTE(beisner) pgrep_full is no longer implemented, as pidof is now |
122 | + # used instead of pgrep. pgrep_full is still passed through to ensure |
123 | + # deprecation WARNS. lp1474030 |
124 | + |
125 | + unit_name = sentry_unit.info['unit_name'] |
126 | + self.log.debug('Checking that %s service restarted since %s on ' |
127 | + '%s' % (service, mtime, unit_name)) |
128 | time.sleep(sleep_time) |
129 | - proc_start_time = self._get_proc_start_time(sentry_unit, service, |
130 | - pgrep_full) |
131 | - while retry_count > 0 and not proc_start_time: |
132 | - self.log.debug('No pid file found for service %s, will retry %i ' |
133 | - 'more times' % (service, retry_count)) |
134 | - time.sleep(30) |
135 | - proc_start_time = self._get_proc_start_time(sentry_unit, service, |
136 | - pgrep_full) |
137 | - retry_count = retry_count - 1 |
138 | + proc_start_time = None |
139 | + tries = 0 |
140 | + while tries <= retry_count and not proc_start_time: |
141 | + try: |
142 | + proc_start_time = self._get_proc_start_time(sentry_unit, |
143 | + service, |
144 | + pgrep_full) |
145 | + self.log.debug('Attempt {} to get {} proc start time on {} ' |
146 | + 'OK'.format(tries, service, unit_name)) |
147 | + except IOError: |
148 | + # NOTE(beisner) - race avoidance, proc may not exist yet. |
149 | + # https://bugs.launchpad.net/charm-helpers/+bug/1474030 |
150 | + self.log.debug('Attempt {} to get {} proc start time on {} ' |
151 | + 'failed'.format(tries, service, unit_name)) |
152 | + time.sleep(retry_sleep_time) |
153 | + tries += 1 |
154 | |
155 | if not proc_start_time: |
156 | self.log.warn('No proc start time found, assuming service did ' |
157 | 'not start') |
158 | return False |
159 | if proc_start_time >= mtime: |
160 | - self.log.debug('proc start time is newer than provided mtime' |
161 | - '(%s >= %s)' % (proc_start_time, mtime)) |
162 | + self.log.debug('Proc start time is newer than provided mtime' |
163 | + '(%s >= %s) on %s (OK)' % (proc_start_time, |
164 | + mtime, unit_name)) |
165 | return True |
166 | else: |
167 | - self.log.warn('proc start time (%s) is older than provided mtime ' |
168 | - '(%s), service did not restart' % (proc_start_time, |
169 | - mtime)) |
170 | + self.log.warn('Proc start time (%s) is older than provided mtime ' |
171 | + '(%s) on %s, service did not ' |
172 | + 'restart' % (proc_start_time, mtime, unit_name)) |
173 | return False |
174 | |
175 | def config_updated_since(self, sentry_unit, filename, mtime, |
176 | @@ -374,8 +409,9 @@ |
177 | return False |
178 | |
179 | def validate_service_config_changed(self, sentry_unit, mtime, service, |
180 | - filename, pgrep_full=False, |
181 | - sleep_time=20, retry_count=2): |
182 | + filename, pgrep_full=None, |
183 | + sleep_time=20, retry_count=2, |
184 | + retry_sleep_time=30): |
185 | """Check service and file were updated after mtime |
186 | |
187 | Args: |
188 | @@ -383,9 +419,10 @@ |
189 | mtime (float): The epoch time to check against |
190 | service (string): service name to look for in process table |
191 | filename (string): The file to check mtime of |
192 | - pgrep_full (boolean): Use full command line search mode with pgrep |
193 | - sleep_time (int): Seconds to sleep before looking for process |
194 | + pgrep_full: [Deprecated] Use full command line search mode with pgrep |
195 | + sleep_time (int): Initial sleep in seconds to pass to test helpers |
196 | retry_count (int): If service is not found, how many times to retry |
197 | + retry_sleep_time (int): Time in seconds to wait between retries |
198 | |
199 | Typical Usage: |
200 | u = OpenStackAmuletUtils(ERROR) |
201 | @@ -402,15 +439,25 @@ |
202 | mtime, False if service is older than mtime or if service was |
203 | not found or if filename was modified before mtime. |
204 | """ |
205 | - self.log.debug('Checking %s restarted since %s' % (service, mtime)) |
206 | - time.sleep(sleep_time) |
207 | - service_restart = self.service_restarted_since(sentry_unit, mtime, |
208 | - service, |
209 | - pgrep_full=pgrep_full, |
210 | - sleep_time=0, |
211 | - retry_count=retry_count) |
212 | - config_update = self.config_updated_since(sentry_unit, filename, mtime, |
213 | - sleep_time=0) |
214 | + |
215 | + # NOTE(beisner) pgrep_full is no longer implemented, as pidof is now |
216 | + # used instead of pgrep. pgrep_full is still passed through to ensure |
217 | + # deprecation WARNS. lp1474030 |
218 | + |
219 | + service_restart = self.service_restarted_since( |
220 | + sentry_unit, mtime, |
221 | + service, |
222 | + pgrep_full=pgrep_full, |
223 | + sleep_time=sleep_time, |
224 | + retry_count=retry_count, |
225 | + retry_sleep_time=retry_sleep_time) |
226 | + |
227 | + config_update = self.config_updated_since( |
228 | + sentry_unit, |
229 | + filename, |
230 | + mtime, |
231 | + sleep_time=0) |
232 | + |
233 | return service_restart and config_update |
234 | |
235 | def get_sentry_time(self, sentry_unit): |
236 | @@ -428,7 +475,6 @@ |
237 | """Return a list of all Ubuntu releases in order of release.""" |
238 | _d = distro_info.UbuntuDistroInfo() |
239 | _release_list = _d.all |
240 | - self.log.debug('Ubuntu release list: {}'.format(_release_list)) |
241 | return _release_list |
242 | |
243 | def file_to_url(self, file_rel_path): |
244 | @@ -568,6 +614,142 @@ |
245 | |
246 | return None |
247 | |
248 | + def validate_sectionless_conf(self, file_contents, expected): |
249 | + """A crude conf parser. Useful to inspect configuration files which |
250 | + do not have section headers (as would be necessary in order to use |
251 | + the configparser). Such as openstack-dashboard or rabbitmq confs.""" |
252 | + for line in file_contents.split('\n'): |
253 | + if '=' in line: |
254 | + args = line.split('=') |
255 | + if len(args) <= 1: |
256 | + continue |
257 | + key = args[0].strip() |
258 | + value = args[1].strip() |
259 | + if key in expected.keys(): |
260 | + if expected[key] != value: |
261 | + msg = ('Config mismatch. Expected, actual: {}, ' |
262 | + '{}'.format(expected[key], value)) |
263 | + amulet.raise_status(amulet.FAIL, msg=msg) |
264 | + |
265 | + def get_unit_hostnames(self, units): |
266 | + """Return a dict of juju unit names to hostnames.""" |
267 | + host_names = {} |
268 | + for unit in units: |
269 | + host_names[unit.info['unit_name']] = \ |
270 | + str(unit.file_contents('/etc/hostname').strip()) |
271 | + self.log.debug('Unit host names: {}'.format(host_names)) |
272 | + return host_names |
273 | + |
274 | + def run_cmd_unit(self, sentry_unit, cmd): |
275 | + """Run a command on a unit, return the output and exit code.""" |
276 | + output, code = sentry_unit.run(cmd) |
277 | + if code == 0: |
278 | + self.log.debug('{} `{}` command returned {} ' |
279 | + '(OK)'.format(sentry_unit.info['unit_name'], |
280 | + cmd, code)) |
281 | + else: |
282 | + msg = ('{} `{}` command returned {} ' |
283 | + '{}'.format(sentry_unit.info['unit_name'], |
284 | + cmd, code, output)) |
285 | + amulet.raise_status(amulet.FAIL, msg=msg) |
286 | + return str(output), code |
287 | + |
288 | + def file_exists_on_unit(self, sentry_unit, file_name): |
289 | + """Check if a file exists on a unit.""" |
290 | + try: |
291 | + sentry_unit.file_stat(file_name) |
292 | + return True |
293 | + except IOError: |
294 | + return False |
295 | + except Exception as e: |
296 | + msg = 'Error checking file {}: {}'.format(file_name, e) |
297 | + amulet.raise_status(amulet.FAIL, msg=msg) |
298 | + |
299 | + def file_contents_safe(self, sentry_unit, file_name, |
300 | + max_wait=60, fatal=False): |
301 | + """Get file contents from a sentry unit. Wrap amulet file_contents |
302 | + with retry logic to address races where a file checks as existing, |
303 | + but no longer exists by the time file_contents is called. |
304 | + Return None if file not found. Optionally raise if fatal is True.""" |
305 | + unit_name = sentry_unit.info['unit_name'] |
306 | + file_contents = False |
307 | + tries = 0 |
308 | + while not file_contents and tries < (max_wait / 4): |
309 | + try: |
310 | + file_contents = sentry_unit.file_contents(file_name) |
311 | + except IOError: |
312 | + self.log.debug('Attempt {} to open file {} from {} ' |
313 | + 'failed'.format(tries, file_name, |
314 | + unit_name)) |
315 | + time.sleep(4) |
316 | + tries += 1 |
317 | + |
318 | + if file_contents: |
319 | + return file_contents |
320 | + elif not fatal: |
321 | + return None |
322 | + elif fatal: |
323 | + msg = 'Failed to get file contents from unit.' |
324 | + amulet.raise_status(amulet.FAIL, msg) |
325 | + |
326 | + def port_knock_tcp(self, host="localhost", port=22, timeout=15): |
327 | + """Open a TCP socket to check for a listening sevice on a host. |
328 | + |
329 | + :param host: host name or IP address, default to localhost |
330 | + :param port: TCP port number, default to 22 |
331 | + :param timeout: Connect timeout, default to 15 seconds |
332 | + :returns: True if successful, False if connect failed |
333 | + """ |
334 | + |
335 | + # Resolve host name if possible |
336 | + try: |
337 | + connect_host = socket.gethostbyname(host) |
338 | + host_human = "{} ({})".format(connect_host, host) |
339 | + except socket.error as e: |
340 | + self.log.warn('Unable to resolve address: ' |
341 | + '{} ({}) Trying anyway!'.format(host, e)) |
342 | + connect_host = host |
343 | + host_human = connect_host |
344 | + |
345 | + # Attempt socket connection |
346 | + try: |
347 | + knock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
348 | + knock.settimeout(timeout) |
349 | + knock.connect((connect_host, port)) |
350 | + knock.close() |
351 | + self.log.debug('Socket connect OK for host ' |
352 | + '{} on port {}.'.format(host_human, port)) |
353 | + return True |
354 | + except socket.error as e: |
355 | + self.log.debug('Socket connect FAIL for' |
356 | + ' {} port {} ({})'.format(host_human, port, e)) |
357 | + return False |
358 | + |
359 | + def port_knock_units(self, sentry_units, port=22, |
360 | + timeout=15, expect_success=True): |
361 | + """Open a TCP socket to check for a listening sevice on each |
362 | + listed juju unit. |
363 | + |
364 | + :param sentry_units: list of sentry unit pointers |
365 | + :param port: TCP port number, default to 22 |
366 | + :param timeout: Connect timeout, default to 15 seconds |
367 | + :expect_success: True by default, set False to invert logic |
368 | + :returns: None if successful, Failure message otherwise |
369 | + """ |
370 | + for unit in sentry_units: |
371 | + host = unit.info['public-address'] |
372 | + connected = self.port_knock_tcp(host, port, timeout) |
373 | + if not connected and expect_success: |
374 | + return 'Socket connect failed.' |
375 | + elif connected and not expect_success: |
376 | + return 'Socket connected unexpectedly.' |
377 | + |
378 | + def get_uuid_epoch_stamp(self): |
379 | + """Returns a stamp string based on uuid4 and epoch time. Useful in |
380 | + generating test messages which need to be unique-ish.""" |
381 | + return '[{}-{}]'.format(uuid.uuid4(), time.time()) |
382 | + |
383 | +# amulet juju action helpers: |
384 | def run_action(self, unit_sentry, action, |
385 | _check_output=subprocess.check_output): |
386 | """Run the named action on a given unit sentry. |
387 | |
388 | === modified file 'charmhelpers/contrib/openstack/amulet/deployment.py' |
389 | --- charmhelpers/contrib/openstack/amulet/deployment.py 2015-08-10 19:56:36 +0000 |
390 | +++ charmhelpers/contrib/openstack/amulet/deployment.py 2015-09-02 01:36:03 +0000 |
391 | @@ -44,8 +44,15 @@ |
392 | Determine if the local branch being tested is derived from its |
393 | stable or next (dev) branch, and based on this, use the corresonding |
394 | stable or next branches for the other_services.""" |
395 | + |
396 | + # Charms outside the lp:~openstack-charmers namespace |
397 | base_charms = ['mysql', 'mongodb', 'nrpe'] |
398 | |
399 | + # Force these charms to current series even when using an older series. |
400 | + # ie. Use trusty/nrpe even when series is precise, as the P charm |
401 | + # does not possess the necessary external master config and hooks. |
402 | + force_series_current = ['nrpe'] |
403 | + |
404 | if self.series in ['precise', 'trusty']: |
405 | base_series = self.series |
406 | else: |
407 | @@ -53,11 +60,17 @@ |
408 | |
409 | if self.stable: |
410 | for svc in other_services: |
411 | + if svc['name'] in force_series_current: |
412 | + base_series = self.current_next |
413 | + |
414 | temp = 'lp:charms/{}/{}' |
415 | svc['location'] = temp.format(base_series, |
416 | svc['name']) |
417 | else: |
418 | for svc in other_services: |
419 | + if svc['name'] in force_series_current: |
420 | + base_series = self.current_next |
421 | + |
422 | if svc['name'] in base_charms: |
423 | temp = 'lp:charms/{}/{}' |
424 | svc['location'] = temp.format(base_series, |
425 | @@ -77,21 +90,23 @@ |
426 | |
427 | services = other_services |
428 | services.append(this_service) |
429 | + |
430 | + # Charms which should use the source config option |
431 | use_source = ['mysql', 'mongodb', 'rabbitmq-server', 'ceph', |
432 | 'ceph-osd', 'ceph-radosgw'] |
433 | - # Most OpenStack subordinate charms do not expose an origin option |
434 | - # as that is controlled by the principle. |
435 | - ignore = ['cinder-ceph', 'hacluster', 'neutron-openvswitch', 'nrpe'] |
436 | + |
437 | + # Charms which can not use openstack-origin, ie. many subordinates |
438 | + no_origin = ['cinder-ceph', 'hacluster', 'neutron-openvswitch', 'nrpe'] |
439 | |
440 | if self.openstack: |
441 | for svc in services: |
442 | - if svc['name'] not in use_source + ignore: |
443 | + if svc['name'] not in use_source + no_origin: |
444 | config = {'openstack-origin': self.openstack} |
445 | self.d.configure(svc['name'], config) |
446 | |
447 | if self.source: |
448 | for svc in services: |
449 | - if svc['name'] in use_source and svc['name'] not in ignore: |
450 | + if svc['name'] in use_source and svc['name'] not in no_origin: |
451 | config = {'source': self.source} |
452 | self.d.configure(svc['name'], config) |
453 | |
454 | |
455 | === modified file 'charmhelpers/contrib/openstack/amulet/utils.py' |
456 | --- charmhelpers/contrib/openstack/amulet/utils.py 2015-06-29 13:19:46 +0000 |
457 | +++ charmhelpers/contrib/openstack/amulet/utils.py 2015-09-02 01:36:03 +0000 |
458 | @@ -27,6 +27,7 @@ |
459 | import heatclient.v1.client as heat_client |
460 | import keystoneclient.v2_0 as keystone_client |
461 | import novaclient.v1_1.client as nova_client |
462 | +import pika |
463 | import swiftclient |
464 | |
465 | from charmhelpers.contrib.amulet.utils import ( |
466 | @@ -602,3 +603,361 @@ |
467 | self.log.debug('Ceph {} samples (OK): ' |
468 | '{}'.format(sample_type, samples)) |
469 | return None |
470 | + |
471 | +# rabbitmq/amqp specific helpers: |
472 | + def add_rmq_test_user(self, sentry_units, |
473 | + username="testuser1", password="changeme"): |
474 | + """Add a test user via the first rmq juju unit, check connection as |
475 | + the new user against all sentry units. |
476 | + |
477 | + :param sentry_units: list of sentry unit pointers |
478 | + :param username: amqp user name, default to testuser1 |
479 | + :param password: amqp user password |
480 | + :returns: None if successful. Raise on error. |
481 | + """ |
482 | + self.log.debug('Adding rmq user ({})...'.format(username)) |
483 | + |
484 | + # Check that user does not already exist |
485 | + cmd_user_list = 'rabbitmqctl list_users' |
486 | + output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_list) |
487 | + if username in output: |
488 | + self.log.warning('User ({}) already exists, returning ' |
489 | + 'gracefully.'.format(username)) |
490 | + return |
491 | + |
492 | + perms = '".*" ".*" ".*"' |
493 | + cmds = ['rabbitmqctl add_user {} {}'.format(username, password), |
494 | + 'rabbitmqctl set_permissions {} {}'.format(username, perms)] |
495 | + |
496 | + # Add user via first unit |
497 | + for cmd in cmds: |
498 | + output, _ = self.run_cmd_unit(sentry_units[0], cmd) |
499 | + |
500 | + # Check connection against the other sentry_units |
501 | + self.log.debug('Checking user connect against units...') |
502 | + for sentry_unit in sentry_units: |
503 | + connection = self.connect_amqp_by_unit(sentry_unit, ssl=False, |
504 | + username=username, |
505 | + password=password) |
506 | + connection.close() |
507 | + |
508 | + def delete_rmq_test_user(self, sentry_units, username="testuser1"): |
509 | + """Delete a rabbitmq user via the first rmq juju unit. |
510 | + |
511 | + :param sentry_units: list of sentry unit pointers |
512 | + :param username: amqp user name, default to testuser1 |
513 | + :param password: amqp user password |
514 | + :returns: None if successful or no such user. |
515 | + """ |
516 | + self.log.debug('Deleting rmq user ({})...'.format(username)) |
517 | + |
518 | + # Check that the user exists |
519 | + cmd_user_list = 'rabbitmqctl list_users' |
520 | + output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_list) |
521 | + |
522 | + if username not in output: |
523 | + self.log.warning('User ({}) does not exist, returning ' |
524 | + 'gracefully.'.format(username)) |
525 | + return |
526 | + |
527 | + # Delete the user |
528 | + cmd_user_del = 'rabbitmqctl delete_user {}'.format(username) |
529 | + output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_del) |
530 | + |
531 | + def get_rmq_cluster_status(self, sentry_unit): |
532 | + """Execute rabbitmq cluster status command on a unit and return |
533 | + the full output. |
534 | + |
535 | + :param unit: sentry unit |
536 | + :returns: String containing console output of cluster status command |
537 | + """ |
538 | + cmd = 'rabbitmqctl cluster_status' |
539 | + output, _ = self.run_cmd_unit(sentry_unit, cmd) |
540 | + self.log.debug('{} cluster_status:\n{}'.format( |
541 | + sentry_unit.info['unit_name'], output)) |
542 | + return str(output) |
543 | + |
544 | + def get_rmq_cluster_running_nodes(self, sentry_unit): |
545 | + """Parse rabbitmqctl cluster_status output string, return list of |
546 | + running rabbitmq cluster nodes. |
547 | + |
548 | + :param unit: sentry unit |
549 | + :returns: List containing node names of running nodes |
550 | + """ |
551 | + # NOTE(beisner): rabbitmqctl cluster_status output is not |
552 | + # json-parsable, do string chop foo, then json.loads that. |
553 | + str_stat = self.get_rmq_cluster_status(sentry_unit) |
554 | + if 'running_nodes' in str_stat: |
555 | + pos_start = str_stat.find("{running_nodes,") + 15 |
556 | + pos_end = str_stat.find("]},", pos_start) + 1 |
557 | + str_run_nodes = str_stat[pos_start:pos_end].replace("'", '"') |
558 | + run_nodes = json.loads(str_run_nodes) |
559 | + return run_nodes |
560 | + else: |
561 | + return [] |
562 | + |
563 | + def validate_rmq_cluster_running_nodes(self, sentry_units): |
564 | + """Check that all rmq unit hostnames are represented in the |
565 | + cluster_status output of all units. |
566 | + |
567 | + :param host_names: dict of juju unit names to host names |
568 | + :param units: list of sentry unit pointers (all rmq units) |
569 | + :returns: None if successful, otherwise return error message |
570 | + """ |
571 | + host_names = self.get_unit_hostnames(sentry_units) |
572 | + errors = [] |
573 | + |
574 | + # Query every unit for cluster_status running nodes |
575 | + for query_unit in sentry_units: |
576 | + query_unit_name = query_unit.info['unit_name'] |
577 | + running_nodes = self.get_rmq_cluster_running_nodes(query_unit) |
578 | + |
579 | + # Confirm that every unit is represented in the queried unit's |
580 | + # cluster_status running nodes output. |
581 | + for validate_unit in sentry_units: |
582 | + val_host_name = host_names[validate_unit.info['unit_name']] |
583 | + val_node_name = 'rabbit@{}'.format(val_host_name) |
584 | + |
585 | + if val_node_name not in running_nodes: |
586 | + errors.append('Cluster member check failed on {}: {} not ' |
587 | + 'in {}\n'.format(query_unit_name, |
588 | + val_node_name, |
589 | + running_nodes)) |
590 | + if errors: |
591 | + return ''.join(errors) |
592 | + |
593 | + def rmq_ssl_is_enabled_on_unit(self, sentry_unit, port=None): |
594 | + """Check a single juju rmq unit for ssl and port in the config file.""" |
595 | + host = sentry_unit.info['public-address'] |
596 | + unit_name = sentry_unit.info['unit_name'] |
597 | + |
598 | + conf_file = '/etc/rabbitmq/rabbitmq.config' |
599 | + conf_contents = str(self.file_contents_safe(sentry_unit, |
600 | + conf_file, max_wait=16)) |
601 | + # Checks |
602 | + conf_ssl = 'ssl' in conf_contents |
603 | + conf_port = str(port) in conf_contents |
604 | + |
605 | + # Port explicitly checked in config |
606 | + if port and conf_port and conf_ssl: |
607 | + self.log.debug('SSL is enabled @{}:{} ' |
608 | + '({})'.format(host, port, unit_name)) |
609 | + return True |
610 | + elif port and not conf_port and conf_ssl: |
611 | + self.log.debug('SSL is enabled @{} but not on port {} ' |
612 | + '({})'.format(host, port, unit_name)) |
613 | + return False |
614 | + # Port not checked (useful when checking that ssl is disabled) |
615 | + elif not port and conf_ssl: |
616 | + self.log.debug('SSL is enabled @{}:{} ' |
617 | + '({})'.format(host, port, unit_name)) |
618 | + return True |
619 | + elif not port and not conf_ssl: |
620 | + self.log.debug('SSL not enabled @{}:{} ' |
621 | + '({})'.format(host, port, unit_name)) |
622 | + return False |
623 | + else: |
624 | + msg = ('Unknown condition when checking SSL status @{}:{} ' |
625 | + '({})'.format(host, port, unit_name)) |
626 | + amulet.raise_status(amulet.FAIL, msg) |
627 | + |
628 | + def validate_rmq_ssl_enabled_units(self, sentry_units, port=None): |
629 | + """Check that ssl is enabled on rmq juju sentry units. |
630 | + |
631 | + :param sentry_units: list of all rmq sentry units |
632 | + :param port: optional ssl port override to validate |
633 | + :returns: None if successful, otherwise return error message |
634 | + """ |
635 | + for sentry_unit in sentry_units: |
636 | + if not self.rmq_ssl_is_enabled_on_unit(sentry_unit, port=port): |
637 | + return ('Unexpected condition: ssl is disabled on unit ' |
638 | + '({})'.format(sentry_unit.info['unit_name'])) |
639 | + return None |
640 | + |
641 | + def validate_rmq_ssl_disabled_units(self, sentry_units): |
642 | + """Check that ssl is enabled on listed rmq juju sentry units. |
643 | + |
644 | + :param sentry_units: list of all rmq sentry units |
645 | + :returns: True if successful. Raise on error. |
646 | + """ |
647 | + for sentry_unit in sentry_units: |
648 | + if self.rmq_ssl_is_enabled_on_unit(sentry_unit): |
649 | + return ('Unexpected condition: ssl is enabled on unit ' |
650 | + '({})'.format(sentry_unit.info['unit_name'])) |
651 | + return None |
652 | + |
653 | + def configure_rmq_ssl_on(self, sentry_units, deployment, |
654 | + port=None, max_wait=60): |
655 | + """Turn ssl charm config option on, with optional non-default |
656 | + ssl port specification. Confirm that it is enabled on every |
657 | + unit. |
658 | + |
659 | + :param sentry_units: list of sentry units |
660 | + :param deployment: amulet deployment object pointer |
661 | + :param port: amqp port, use defaults if None |
662 | + :param max_wait: maximum time to wait in seconds to confirm |
663 | + :returns: None if successful. Raise on error. |
664 | + """ |
665 | + self.log.debug('Setting ssl charm config option: on') |
666 | + |
667 | + # Enable RMQ SSL |
668 | + config = {'ssl': 'on'} |
669 | + if port: |
670 | + config['ssl_port'] = port |
671 | + |
672 | + deployment.configure('rabbitmq-server', config) |
673 | + |
674 | + # Confirm |
675 | + tries = 0 |
676 | + ret = self.validate_rmq_ssl_enabled_units(sentry_units, port=port) |
677 | + while ret and tries < (max_wait / 4): |
678 | + time.sleep(4) |
679 | + self.log.debug('Attempt {}: {}'.format(tries, ret)) |
680 | + ret = self.validate_rmq_ssl_enabled_units(sentry_units, port=port) |
681 | + tries += 1 |
682 | + |
683 | + if ret: |
684 | + amulet.raise_status(amulet.FAIL, ret) |
685 | + |
686 | + def configure_rmq_ssl_off(self, sentry_units, deployment, max_wait=60): |
687 | + """Turn ssl charm config option off, confirm that it is disabled |
688 | + on every unit. |
689 | + |
690 | + :param sentry_units: list of sentry units |
691 | + :param deployment: amulet deployment object pointer |
692 | + :param max_wait: maximum time to wait in seconds to confirm |
693 | + :returns: None if successful. Raise on error. |
694 | + """ |
695 | + self.log.debug('Setting ssl charm config option: off') |
696 | + |
697 | + # Disable RMQ SSL |
698 | + config = {'ssl': 'off'} |
699 | + deployment.configure('rabbitmq-server', config) |
700 | + |
701 | + # Confirm |
702 | + tries = 0 |
703 | + ret = self.validate_rmq_ssl_disabled_units(sentry_units) |
704 | + while ret and tries < (max_wait / 4): |
705 | + time.sleep(4) |
706 | + self.log.debug('Attempt {}: {}'.format(tries, ret)) |
707 | + ret = self.validate_rmq_ssl_disabled_units(sentry_units) |
708 | + tries += 1 |
709 | + |
710 | + if ret: |
711 | + amulet.raise_status(amulet.FAIL, ret) |
712 | + |
713 | + def connect_amqp_by_unit(self, sentry_unit, ssl=False, |
714 | + port=None, fatal=True, |
715 | + username="testuser1", password="changeme"): |
716 | + """Establish and return a pika amqp connection to the rabbitmq service |
717 | + running on a rmq juju unit. |
718 | + |
719 | + :param sentry_unit: sentry unit pointer |
720 | + :param ssl: boolean, default to False |
721 | + :param port: amqp port, use defaults if None |
722 | + :param fatal: boolean, default to True (raises on connect error) |
723 | + :param username: amqp user name, default to testuser1 |
724 | + :param password: amqp user password |
725 | + :returns: pika amqp connection pointer or None if failed and non-fatal |
726 | + """ |
727 | + host = sentry_unit.info['public-address'] |
728 | + unit_name = sentry_unit.info['unit_name'] |
729 | + |
730 | + # Default port logic if port is not specified |
731 | + if ssl and not port: |
732 | + port = 5671 |
733 | + elif not ssl and not port: |
734 | + port = 5672 |
735 | + |
736 | + self.log.debug('Connecting to amqp on {}:{} ({}) as ' |
737 | + '{}...'.format(host, port, unit_name, username)) |
738 | + |
739 | + try: |
740 | + credentials = pika.PlainCredentials(username, password) |
741 | + parameters = pika.ConnectionParameters(host=host, port=port, |
742 | + credentials=credentials, |
743 | + ssl=ssl, |
744 | + connection_attempts=3, |
745 | + retry_delay=5, |
746 | + socket_timeout=1) |
747 | + connection = pika.BlockingConnection(parameters) |
748 | + assert connection.server_properties['product'] == 'RabbitMQ' |
749 | + self.log.debug('Connect OK') |
750 | + return connection |
751 | + except Exception as e: |
752 | + msg = ('amqp connection failed to {}:{} as ' |
753 | + '{} ({})'.format(host, port, username, str(e))) |
754 | + if fatal: |
755 | + amulet.raise_status(amulet.FAIL, msg) |
756 | + else: |
757 | + self.log.warn(msg) |
758 | + return None |
759 | + |
760 | + def publish_amqp_message_by_unit(self, sentry_unit, message, |
761 | + queue="test", ssl=False, |
762 | + username="testuser1", |
763 | + password="changeme", |
764 | + port=None): |
765 | + """Publish an amqp message to a rmq juju unit. |
766 | + |
767 | + :param sentry_unit: sentry unit pointer |
768 | + :param message: amqp message string |
769 | + :param queue: message queue, default to test |
770 | + :param username: amqp user name, default to testuser1 |
771 | + :param password: amqp user password |
772 | + :param ssl: boolean, default to False |
773 | + :param port: amqp port, use defaults if None |
774 | + :returns: None. Raises exception if publish failed. |
775 | + """ |
776 | + self.log.debug('Publishing message to {} queue:\n{}'.format(queue, |
777 | + message)) |
778 | + connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl, |
779 | + port=port, |
780 | + username=username, |
781 | + password=password) |
782 | + |
783 | + # NOTE(beisner): extra debug here re: pika hang potential: |
784 | + # https://github.com/pika/pika/issues/297 |
785 | + # https://groups.google.com/forum/#!topic/rabbitmq-users/Ja0iyfF0Szw |
786 | + self.log.debug('Defining channel...') |
787 | + channel = connection.channel() |
788 | + self.log.debug('Declaring queue...') |
789 | + channel.queue_declare(queue=queue, auto_delete=False, durable=True) |
790 | + self.log.debug('Publishing message...') |
791 | + channel.basic_publish(exchange='', routing_key=queue, body=message) |
792 | + self.log.debug('Closing channel...') |
793 | + channel.close() |
794 | + self.log.debug('Closing connection...') |
795 | + connection.close() |
796 | + |
797 | + def get_amqp_message_by_unit(self, sentry_unit, queue="test", |
798 | + username="testuser1", |
799 | + password="changeme", |
800 | + ssl=False, port=None): |
801 | + """Get an amqp message from a rmq juju unit. |
802 | + |
803 | + :param sentry_unit: sentry unit pointer |
804 | + :param queue: message queue, default to test |
805 | + :param username: amqp user name, default to testuser1 |
806 | + :param password: amqp user password |
807 | + :param ssl: boolean, default to False |
808 | + :param port: amqp port, use defaults if None |
809 | + :returns: amqp message body as string. Raise if get fails. |
810 | + """ |
811 | + connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl, |
812 | + port=port, |
813 | + username=username, |
814 | + password=password) |
815 | + channel = connection.channel() |
816 | + method_frame, _, body = channel.basic_get(queue) |
817 | + |
818 | + if method_frame: |
819 | + self.log.debug('Retreived message from {} queue:\n{}'.format(queue, |
820 | + body)) |
821 | + channel.basic_ack(method_frame.delivery_tag) |
822 | + channel.close() |
823 | + connection.close() |
824 | + return body |
825 | + else: |
826 | + msg = 'No message retrieved.' |
827 | + amulet.raise_status(amulet.FAIL, msg) |
Just a few comments in-line.