Merge lp:~chris.macnaughton/charms/trusty/ceph-osd/trunk into lp:~openstack-charmers-archive/charms/trusty/ceph-osd/next

Proposed by Chris MacNaughton on 2015-11-10
Status: Needs review
Proposed branch: lp:~chris.macnaughton/charms/trusty/ceph-osd/trunk
Merge into: lp:~openstack-charmers-archive/charms/trusty/ceph-osd/next
Diff against target: 1778 lines (+1103/-119)
18 files modified
config.yaml (+2/-1)
hooks/ceph_hooks.py (+19/-3)
hooks/charmhelpers/cli/__init__.py (+3/-3)
hooks/charmhelpers/contrib/charmsupport/nrpe.py (+44/-8)
hooks/charmhelpers/contrib/network/ip.py (+5/-3)
hooks/charmhelpers/core/hookenv.py (+46/-0)
hooks/charmhelpers/core/host.py (+60/-17)
hooks/charmhelpers/core/hugepage.py (+10/-1)
hooks/charmhelpers/core/kernel.py (+68/-0)
hooks/charmhelpers/core/services/helpers.py (+5/-2)
hooks/charmhelpers/core/strutils.py (+30/-0)
hooks/charmhelpers/core/templating.py (+13/-6)
hooks/charmhelpers/fetch/__init__.py (+1/-1)
metadata.yaml (+5/-0)
tests/charmhelpers/contrib/amulet/deployment.py (+4/-2)
tests/charmhelpers/contrib/amulet/utils.py (+284/-62)
tests/charmhelpers/contrib/openstack/amulet/deployment.py (+123/-10)
tests/charmhelpers/contrib/openstack/amulet/utils.py (+381/-0)
To merge this branch: bzr merge lp:~chris.macnaughton/charms/trusty/ceph-osd/trunk
Reviewer Review Type Date Requested Status
James Page 2015-11-10 Needs Fixing on 2015-12-09
Review via email: mp+277135@code.launchpad.net

Description of the change

Add storage hooks support
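The change wires Juju storage hooks into the charm so that devices attached with `--storage` are enumerated alongside the `osd-devices` config option. A minimal sketch of that enumeration pattern follows; `storage_list` and `storage_get` are stand-in stubs for the Juju hook tools of the same names (the real charmhelpers wrappers shell out to `storage-list`/`storage-get`), and the device paths are invented for illustration:

```python
# Sketch of the device-enumeration pattern this MP adds: config-declared
# devices are merged with any block devices Juju attached via --storage.

def storage_list(storage_name):
    # Hypothetical stub: the real hook tool returns storage instance IDs
    # (e.g. 'osd-devices/0') for the named store.
    fake = {'osd-devices': ['osd-devices/0'], 'osd-fs': []}
    return fake.get(storage_name, [])

def storage_get(attribute, storage_id):
    # Hypothetical stub: the real hook tool returns one attribute of a
    # storage instance; 'location' is its block device path.
    fake = {'osd-devices/0': '/dev/vdb'}
    return fake[storage_id] if attribute == 'location' else None

def get_devices(configured):
    """Merge configured osd-devices with Juju-attached storage devices."""
    devices = configured.split(' ') if configured else []
    storage_ids = storage_list('osd-devices')
    storage_ids.extend(storage_list('osd-fs'))
    devices.extend(storage_get('location', s) for s in storage_ids)
    return devices

print(get_devices('/dev/sdb /dev/sdc'))  # ['/dev/sdb', '/dev/sdc', '/dev/vdb']
```

This mirrors the `get_devices()` change in the diff below, where the same merge is done against the live hook tools.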



charm_unit_test #12559 ceph-osd-next for chris.macnaughton mp277135
    UNIT OK: passed

Build: http://10.245.162.77:8080/job/charm_unit_test/12559/

charm_lint_check #13425 ceph-osd-next for chris.macnaughton mp277135
    LINT FAIL: lint-test failed
    LINT FAIL: charm-proof failed

LINT Results (max last 2 lines):
make: *** [lint] Error 200
ERROR:root:Make target returned non-zero.

Full lint test output: http://paste.ubuntu.com/13216177/
Build: http://10.245.162.77:8080/job/charm_lint_check/13425/

charm_amulet_test #7833 ceph-osd-next for chris.macnaughton mp277135
    AMULET FAIL: amulet-test failed

AMULET Results (max last 2 lines):
make: *** [functional_test] Error 124
ERROR:root:Make target returned non-zero.

Full amulet test output: http://paste.ubuntu.com/13216928/
Build: http://10.245.162.77:8080/job/charm_amulet_test/7833/

56. By Chris MacNaughton on 2015-11-18

add-storage-hooks

charm_unit_test #13062 ceph-osd-next for chris.macnaughton mp277135
    UNIT OK: passed

Build: http://10.245.162.77:8080/job/charm_unit_test/13062/

charm_lint_check #14013 ceph-osd-next for chris.macnaughton mp277135
    LINT FAIL: lint-test failed
    LINT FAIL: charm-proof failed

LINT Results (max last 2 lines):
make: *** [lint] Error 200
ERROR:root:Make target returned non-zero.

Full lint test output: http://paste.ubuntu.com/13334730/
Build: http://10.245.162.77:8080/job/charm_lint_check/14013/

charm_amulet_test #7940 ceph-osd-next for chris.macnaughton mp277135
    AMULET OK: passed

Build: http://10.245.162.77:8080/job/charm_amulet_test/7940/

Ryan Beisner (1chb1n) wrote :

FYI, the lint fail is https://github.com/juju/charm-tools/issues/41 (charm proof isn't yet storage-aware). Fix-committed upstream, package not yet fix-released.

James Page (james-page) wrote :

Hi Chris

Please can you update this proposal to add support for the journal device as well - I've just landed that into the ceph charm.

Cheers

James

review: Needs Fixing
James Page (james-page) wrote :

Chris - no not that one - specifically the changes that landed into ceph:

storage:
  osd-devices:
    type: block
    multiple:
      range: 0-
  osd-journal:
    type: block
    multiple:
      range: 0-1
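Applying that request to this charm's metadata.yaml would look roughly like the following. This is a sketch only, merging James's requested stanza into the file this branch modifies; note that the branch's diff writes `range: 0+` where the ceph charm snippet above uses `range: 0-` for an unbounded count:

```yaml
requires:
  mon:
    interface: ceph-osd
storage:
  osd-devices:
    type: block
    multiple:
      range: 0-
  osd-journal:
    type: block
    multiple:
      range: 0-1
```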

Unmerged revisions

56. By Chris MacNaughton on 2015-11-18

add-storage-hooks

55. By Chris MacNaughton on 2015-11-10

add storage hooks

54. By Chris MacNaughton on 2015-11-10

update charmhelpers

Preview Diff

1=== modified file 'config.yaml'
2--- config.yaml 2015-07-10 14:12:01 +0000
3+++ config.yaml 2015-11-18 20:55:22 +0000
4@@ -6,7 +6,8 @@
5 The devices to format and set up as osd volumes.
6
7 These devices are the range of devices that will be checked for and
8- used across all service units.
9+ used across all service units, in addition to any volumes attached
10+ via the --storage flag during deployment
11
12 For ceph >= 0.56.6 these can also be directories instead of devices - the
13 charm assumes anything not starting with /dev is a directory instead.
14
15=== modified file 'hooks/ceph_hooks.py'
16--- hooks/ceph_hooks.py 2015-10-30 02:22:54 +0000
17+++ hooks/ceph_hooks.py 2015-11-18 20:55:22 +0000
18@@ -24,6 +24,8 @@
19 UnregisteredHookError,
20 service_name,
21 status_set,
22+ storage_get,
23+ storage_list
24 )
25 from charmhelpers.core.host import (
26 umount,
27@@ -128,7 +130,14 @@
28 ceph.zap_disk(osd_journal)
29 with open(JOURNAL_ZAPPED, 'w') as zapped:
30 zapped.write('DONE')
31-
32+ storage_changed()
33+
34+
35+@hooks.hook('osd-devices-storage-attached',
36+ 'osd-fs-storage-attached',
37+ 'osd-devices-storage-detaching',
38+ 'osd-fs-storage-detaching')
39+def storage_changed():
40 if ceph.is_bootstrapped():
41 log('ceph bootstrapped, rescanning disks')
42 emit_cephconf()
43@@ -180,9 +189,16 @@
44
45 def get_devices():
46 if config('osd-devices'):
47- return config('osd-devices').split(' ')
48+ devices = config('osd-devices').split(' ')
49 else:
50- return []
51+ devices = []
52+ # List storage instances for the 'osd-devices'
53+ # store declared for this charm too, and add
54+ # their block device paths to the list.
55+ storage_ids = storage_list('osd-devices')
56+ storage_ids.extend(storage_list('osd-fs'))
57+ devices.extend((storage_get('location', s) for s in storage_ids))
58+ return devices
59
60
61 @hooks.hook('mon-relation-changed',
62
63=== modified file 'hooks/charmhelpers/cli/__init__.py'
64--- hooks/charmhelpers/cli/__init__.py 2015-08-19 13:50:42 +0000
65+++ hooks/charmhelpers/cli/__init__.py 2015-11-18 20:55:22 +0000
66@@ -20,7 +20,7 @@
67
68 from six.moves import zip
69
70-from charmhelpers.core import unitdata
71+import charmhelpers.core.unitdata
72
73
74 class OutputFormatter(object):
75@@ -163,8 +163,8 @@
76 if getattr(arguments.func, '_cli_no_output', False):
77 output = ''
78 self.formatter.format_output(output, arguments.format)
79- if unitdata._KV:
80- unitdata._KV.flush()
81+ if charmhelpers.core.unitdata._KV:
82+ charmhelpers.core.unitdata._KV.flush()
83
84
85 cmdline = CommandLine()
86
87=== modified file 'hooks/charmhelpers/contrib/charmsupport/nrpe.py'
88--- hooks/charmhelpers/contrib/charmsupport/nrpe.py 2015-06-17 16:59:15 +0000
89+++ hooks/charmhelpers/contrib/charmsupport/nrpe.py 2015-11-18 20:55:22 +0000
90@@ -148,6 +148,13 @@
91 self.description = description
92 self.check_cmd = self._locate_cmd(check_cmd)
93
94+ def _get_check_filename(self):
95+ return os.path.join(NRPE.nrpe_confdir, '{}.cfg'.format(self.command))
96+
97+ def _get_service_filename(self, hostname):
98+ return os.path.join(NRPE.nagios_exportdir,
99+ 'service__{}_{}.cfg'.format(hostname, self.command))
100+
101 def _locate_cmd(self, check_cmd):
102 search_path = (
103 '/usr/lib/nagios/plugins',
104@@ -163,9 +170,21 @@
105 log('Check command not found: {}'.format(parts[0]))
106 return ''
107
108+ def _remove_service_files(self):
109+ if not os.path.exists(NRPE.nagios_exportdir):
110+ return
111+ for f in os.listdir(NRPE.nagios_exportdir):
112+ if f.endswith('_{}.cfg'.format(self.command)):
113+ os.remove(os.path.join(NRPE.nagios_exportdir, f))
114+
115+ def remove(self, hostname):
116+ nrpe_check_file = self._get_check_filename()
117+ if os.path.exists(nrpe_check_file):
118+ os.remove(nrpe_check_file)
119+ self._remove_service_files()
120+
121 def write(self, nagios_context, hostname, nagios_servicegroups):
122- nrpe_check_file = '/etc/nagios/nrpe.d/{}.cfg'.format(
123- self.command)
124+ nrpe_check_file = self._get_check_filename()
125 with open(nrpe_check_file, 'w') as nrpe_check_config:
126 nrpe_check_config.write("# check {}\n".format(self.shortname))
127 nrpe_check_config.write("command[{}]={}\n".format(
128@@ -180,9 +199,7 @@
129
130 def write_service_config(self, nagios_context, hostname,
131 nagios_servicegroups):
132- for f in os.listdir(NRPE.nagios_exportdir):
133- if re.search('.*{}.cfg'.format(self.command), f):
134- os.remove(os.path.join(NRPE.nagios_exportdir, f))
135+ self._remove_service_files()
136
137 templ_vars = {
138 'nagios_hostname': hostname,
139@@ -192,8 +209,7 @@
140 'command': self.command,
141 }
142 nrpe_service_text = Check.service_template.format(**templ_vars)
143- nrpe_service_file = '{}/service__{}_{}.cfg'.format(
144- NRPE.nagios_exportdir, hostname, self.command)
145+ nrpe_service_file = self._get_service_filename(hostname)
146 with open(nrpe_service_file, 'w') as nrpe_service_config:
147 nrpe_service_config.write(str(nrpe_service_text))
148
149@@ -218,12 +234,32 @@
150 if hostname:
151 self.hostname = hostname
152 else:
153- self.hostname = "{}-{}".format(self.nagios_context, self.unit_name)
154+ nagios_hostname = get_nagios_hostname()
155+ if nagios_hostname:
156+ self.hostname = nagios_hostname
157+ else:
158+ self.hostname = "{}-{}".format(self.nagios_context, self.unit_name)
159 self.checks = []
160
161 def add_check(self, *args, **kwargs):
162 self.checks.append(Check(*args, **kwargs))
163
164+ def remove_check(self, *args, **kwargs):
165+ if kwargs.get('shortname') is None:
166+ raise ValueError('shortname of check must be specified')
167+
168+ # Use sensible defaults if they're not specified - these are not
169+ # actually used during removal, but they're required for constructing
170+ # the Check object; check_disk is chosen because it's part of the
171+ # nagios-plugins-basic package.
172+ if kwargs.get('check_cmd') is None:
173+ kwargs['check_cmd'] = 'check_disk'
174+ if kwargs.get('description') is None:
175+ kwargs['description'] = ''
176+
177+ check = Check(*args, **kwargs)
178+ check.remove(self.hostname)
179+
180 def write(self):
181 try:
182 nagios_uid = pwd.getpwnam('nagios').pw_uid
183
184=== modified file 'hooks/charmhelpers/contrib/network/ip.py'
185--- hooks/charmhelpers/contrib/network/ip.py 2015-09-03 09:42:18 +0000
186+++ hooks/charmhelpers/contrib/network/ip.py 2015-11-18 20:55:22 +0000
187@@ -23,7 +23,7 @@
188 from functools import partial
189
190 from charmhelpers.core.hookenv import unit_get
191-from charmhelpers.fetch import apt_install
192+from charmhelpers.fetch import apt_install, apt_update
193 from charmhelpers.core.hookenv import (
194 log,
195 WARNING,
196@@ -32,13 +32,15 @@
197 try:
198 import netifaces
199 except ImportError:
200- apt_install('python-netifaces')
201+ apt_update(fatal=True)
202+ apt_install('python-netifaces', fatal=True)
203 import netifaces
204
205 try:
206 import netaddr
207 except ImportError:
208- apt_install('python-netaddr')
209+ apt_update(fatal=True)
210+ apt_install('python-netaddr', fatal=True)
211 import netaddr
212
213
214
215=== modified file 'hooks/charmhelpers/core/hookenv.py'
216--- hooks/charmhelpers/core/hookenv.py 2015-09-03 09:42:18 +0000
217+++ hooks/charmhelpers/core/hookenv.py 2015-11-18 20:55:22 +0000
218@@ -491,6 +491,19 @@
219
220
221 @cached
222+def peer_relation_id():
223+ '''Get a peer relation id if a peer relation has been joined, else None.'''
224+ md = metadata()
225+ section = md.get('peers')
226+ if section:
227+ for key in section:
228+ relids = relation_ids(key)
229+ if relids:
230+ return relids[0]
231+ return None
232+
233+
234+@cached
235 def relation_to_interface(relation_name):
236 """
237 Given the name of a relation, return the interface that relation uses.
238@@ -623,6 +636,38 @@
239 return unit_get('private-address')
240
241
242+@cached
243+def storage_get(attribute="", storage_id=""):
244+ """Get storage attributes"""
245+ _args = ['storage-get', '--format=json']
246+ if storage_id:
247+ _args.extend(('-s', storage_id))
248+ if attribute:
249+ _args.append(attribute)
250+ try:
251+ return json.loads(subprocess.check_output(_args).decode('UTF-8'))
252+ except ValueError:
253+ return None
254+
255+
256+@cached
257+def storage_list(storage_name=""):
258+ """List the storage IDs for the unit"""
259+ _args = ['storage-list', '--format=json']
260+ if storage_name:
261+ _args.append(storage_name)
262+ try:
263+ return json.loads(subprocess.check_output(_args).decode('UTF-8'))
264+ except ValueError:
265+ return None
266+ except OSError as e:
267+ import errno
268+ if e.errno == errno.ENOENT:
269+ # storage-list does not exist
270+ return []
271+ raise
272+
273+
274 class UnregisteredHookError(Exception):
275 """Raised when an undefined hook is called"""
276 pass
277@@ -788,6 +833,7 @@
278
279 def translate_exc(from_exc, to_exc):
280 def inner_translate_exc1(f):
281+ @wraps(f)
282 def inner_translate_exc2(*args, **kwargs):
283 try:
284 return f(*args, **kwargs)
285
286=== modified file 'hooks/charmhelpers/core/host.py'
287--- hooks/charmhelpers/core/host.py 2015-08-19 13:50:42 +0000
288+++ hooks/charmhelpers/core/host.py 2015-11-18 20:55:22 +0000
289@@ -63,32 +63,48 @@
290 return service_result
291
292
293-def service_pause(service_name, init_dir=None):
294+def service_pause(service_name, init_dir="/etc/init", initd_dir="/etc/init.d"):
295 """Pause a system service.
296
297 Stop it, and prevent it from starting again at boot."""
298- if init_dir is None:
299- init_dir = "/etc/init"
300 stopped = service_stop(service_name)
301- # XXX: Support systemd too
302- override_path = os.path.join(
303- init_dir, '{}.override'.format(service_name))
304- with open(override_path, 'w') as fh:
305- fh.write("manual\n")
306+ upstart_file = os.path.join(init_dir, "{}.conf".format(service_name))
307+ sysv_file = os.path.join(initd_dir, service_name)
308+ if os.path.exists(upstart_file):
309+ override_path = os.path.join(
310+ init_dir, '{}.override'.format(service_name))
311+ with open(override_path, 'w') as fh:
312+ fh.write("manual\n")
313+ elif os.path.exists(sysv_file):
314+ subprocess.check_call(["update-rc.d", service_name, "disable"])
315+ else:
316+ # XXX: Support SystemD too
317+ raise ValueError(
318+ "Unable to detect {0} as either Upstart {1} or SysV {2}".format(
319+ service_name, upstart_file, sysv_file))
320 return stopped
321
322
323-def service_resume(service_name, init_dir=None):
324+def service_resume(service_name, init_dir="/etc/init",
325+ initd_dir="/etc/init.d"):
326 """Resume a system service.
327
328 Reenable starting again at boot. Start the service"""
329- # XXX: Support systemd too
330- if init_dir is None:
331- init_dir = "/etc/init"
332- override_path = os.path.join(
333- init_dir, '{}.override'.format(service_name))
334- if os.path.exists(override_path):
335- os.unlink(override_path)
336+ upstart_file = os.path.join(init_dir, "{}.conf".format(service_name))
337+ sysv_file = os.path.join(initd_dir, service_name)
338+ if os.path.exists(upstart_file):
339+ override_path = os.path.join(
340+ init_dir, '{}.override'.format(service_name))
341+ if os.path.exists(override_path):
342+ os.unlink(override_path)
343+ elif os.path.exists(sysv_file):
344+ subprocess.check_call(["update-rc.d", service_name, "enable"])
345+ else:
346+ # XXX: Support SystemD too
347+ raise ValueError(
348+ "Unable to detect {0} as either Upstart {1} or SysV {2}".format(
349+ service_name, upstart_file, sysv_file))
350+
351 started = service_start(service_name)
352 return started
353
354@@ -550,7 +566,14 @@
355 os.chdir(cur)
356
357
358-def chownr(path, owner, group, follow_links=True):
359+def chownr(path, owner, group, follow_links=True, chowntopdir=False):
360+ """
361+ Recursively change user and group ownership of files and directories
362+ in given path. Doesn't chown path itself by default, only its children.
363+
364+ :param bool follow_links: Also Chown links if True
365+ :param bool chowntopdir: Also chown path itself if True
366+ """
367 uid = pwd.getpwnam(owner).pw_uid
368 gid = grp.getgrnam(group).gr_gid
369 if follow_links:
370@@ -558,6 +581,10 @@
371 else:
372 chown = os.lchown
373
374+ if chowntopdir:
375+ broken_symlink = os.path.lexists(path) and not os.path.exists(path)
376+ if not broken_symlink:
377+ chown(path, uid, gid)
378 for root, dirs, files in os.walk(path):
379 for name in dirs + files:
380 full = os.path.join(root, name)
381@@ -568,3 +595,19 @@
382
383 def lchownr(path, owner, group):
384 chownr(path, owner, group, follow_links=False)
385+
386+
387+def get_total_ram():
388+ '''The total amount of system RAM in bytes.
389+
390+ This is what is reported by the OS, and may be overcommitted when
391+ there are multiple containers hosted on the same machine.
392+ '''
393+ with open('/proc/meminfo', 'r') as f:
394+ for line in f.readlines():
395+ if line:
396+ key, value, unit = line.split()
397+ if key == 'MemTotal:':
398+ assert unit == 'kB', 'Unknown unit'
399+ return int(value) * 1024 # Classic, not KiB.
400+ raise NotImplementedError()
401
402=== modified file 'hooks/charmhelpers/core/hugepage.py'
403--- hooks/charmhelpers/core/hugepage.py 2015-08-19 13:50:42 +0000
404+++ hooks/charmhelpers/core/hugepage.py 2015-11-18 20:55:22 +0000
405@@ -25,11 +25,13 @@
406 fstab_mount,
407 mkdir,
408 )
409+from charmhelpers.core.strutils import bytes_from_string
410+from subprocess import check_output
411
412
413 def hugepage_support(user, group='hugetlb', nr_hugepages=256,
414 max_map_count=65536, mnt_point='/run/hugepages/kvm',
415- pagesize='2MB', mount=True):
416+ pagesize='2MB', mount=True, set_shmmax=False):
417 """Enable hugepages on system.
418
419 Args:
420@@ -44,11 +46,18 @@
421 group_info = add_group(group)
422 gid = group_info.gr_gid
423 add_user_to_group(user, group)
424+ if max_map_count < 2 * nr_hugepages:
425+ max_map_count = 2 * nr_hugepages
426 sysctl_settings = {
427 'vm.nr_hugepages': nr_hugepages,
428 'vm.max_map_count': max_map_count,
429 'vm.hugetlb_shm_group': gid,
430 }
431+ if set_shmmax:
432+ shmmax_current = int(check_output(['sysctl', '-n', 'kernel.shmmax']))
433+ shmmax_minsize = bytes_from_string(pagesize) * nr_hugepages
434+ if shmmax_minsize > shmmax_current:
435+ sysctl_settings['kernel.shmmax'] = shmmax_minsize
436 sysctl.create(yaml.dump(sysctl_settings), '/etc/sysctl.d/10-hugepage.conf')
437 mkdir(mnt_point, owner='root', group='root', perms=0o755, force=False)
438 lfstab = fstab.Fstab()
439
440=== added file 'hooks/charmhelpers/core/kernel.py'
441--- hooks/charmhelpers/core/kernel.py 1970-01-01 00:00:00 +0000
442+++ hooks/charmhelpers/core/kernel.py 2015-11-18 20:55:22 +0000
443@@ -0,0 +1,68 @@
444+#!/usr/bin/env python
445+# -*- coding: utf-8 -*-
446+
447+# Copyright 2014-2015 Canonical Limited.
448+#
449+# This file is part of charm-helpers.
450+#
451+# charm-helpers is free software: you can redistribute it and/or modify
452+# it under the terms of the GNU Lesser General Public License version 3 as
453+# published by the Free Software Foundation.
454+#
455+# charm-helpers is distributed in the hope that it will be useful,
456+# but WITHOUT ANY WARRANTY; without even the implied warranty of
457+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
458+# GNU Lesser General Public License for more details.
459+#
460+# You should have received a copy of the GNU Lesser General Public License
461+# along with charm-helpers. If not, see <http://www.gnu.org/licenses/>.
462+
463+__author__ = "Jorge Niedbalski <jorge.niedbalski@canonical.com>"
464+
465+from charmhelpers.core.hookenv import (
466+ log,
467+ INFO
468+)
469+
470+from subprocess import check_call, check_output
471+import re
472+
473+
474+def modprobe(module, persist=True):
475+ """Load a kernel module and configure for auto-load on reboot."""
476+ cmd = ['modprobe', module]
477+
478+ log('Loading kernel module %s' % module, level=INFO)
479+
480+ check_call(cmd)
481+ if persist:
482+ with open('/etc/modules', 'r+') as modules:
483+ if module not in modules.read():
484+ modules.write(module)
485+
486+
487+def rmmod(module, force=False):
488+ """Remove a module from the linux kernel"""
489+ cmd = ['rmmod']
490+ if force:
491+ cmd.append('-f')
492+ cmd.append(module)
493+ log('Removing kernel module %s' % module, level=INFO)
494+ return check_call(cmd)
495+
496+
497+def lsmod():
498+ """Shows what kernel modules are currently loaded"""
499+ return check_output(['lsmod'],
500+ universal_newlines=True)
501+
502+
503+def is_module_loaded(module):
504+ """Checks if a kernel module is already loaded"""
505+ matches = re.findall('^%s[ ]+' % module, lsmod(), re.M)
506+ return len(matches) > 0
507+
508+
509+def update_initramfs(version='all'):
510+ """Updates an initramfs image"""
511+ return check_call(["update-initramfs", "-k", version, "-u"])
512
513=== modified file 'hooks/charmhelpers/core/services/helpers.py'
514--- hooks/charmhelpers/core/services/helpers.py 2015-08-19 13:50:42 +0000
515+++ hooks/charmhelpers/core/services/helpers.py 2015-11-18 20:55:22 +0000
516@@ -249,16 +249,18 @@
517 :param int perms: The permissions of the rendered file
518 :param partial on_change_action: functools partial to be executed when
519 rendered file changes
520+ :param jinja2 loader template_loader: A jinja2 template loader
521 """
522 def __init__(self, source, target,
523 owner='root', group='root', perms=0o444,
524- on_change_action=None):
525+ on_change_action=None, template_loader=None):
526 self.source = source
527 self.target = target
528 self.owner = owner
529 self.group = group
530 self.perms = perms
531 self.on_change_action = on_change_action
532+ self.template_loader = template_loader
533
534 def __call__(self, manager, service_name, event_name):
535 pre_checksum = ''
536@@ -269,7 +271,8 @@
537 for ctx in service.get('required_data', []):
538 context.update(ctx)
539 templating.render(self.source, self.target, context,
540- self.owner, self.group, self.perms)
541+ self.owner, self.group, self.perms,
542+ template_loader=self.template_loader)
543 if self.on_change_action:
544 if pre_checksum == host.file_hash(self.target):
545 hookenv.log(
546
547=== modified file 'hooks/charmhelpers/core/strutils.py'
548--- hooks/charmhelpers/core/strutils.py 2015-04-16 21:32:48 +0000
549+++ hooks/charmhelpers/core/strutils.py 2015-11-18 20:55:22 +0000
550@@ -18,6 +18,7 @@
551 # along with charm-helpers. If not, see <http://www.gnu.org/licenses/>.
552
553 import six
554+import re
555
556
557 def bool_from_string(value):
558@@ -40,3 +41,32 @@
559
560 msg = "Unable to interpret string value '%s' as boolean" % (value)
561 raise ValueError(msg)
562+
563+
564+def bytes_from_string(value):
565+ """Interpret human readable string value as bytes.
566+
567+ Returns int
568+ """
569+ BYTE_POWER = {
570+ 'K': 1,
571+ 'KB': 1,
572+ 'M': 2,
573+ 'MB': 2,
574+ 'G': 3,
575+ 'GB': 3,
576+ 'T': 4,
577+ 'TB': 4,
578+ 'P': 5,
579+ 'PB': 5,
580+ }
581+ if isinstance(value, six.string_types):
582+ value = six.text_type(value)
583+ else:
584+ msg = "Unable to interpret non-string value '%s' as boolean" % (value)
585+ raise ValueError(msg)
586+ matches = re.match("([0-9]+)([a-zA-Z]+)", value)
587+ if not matches:
588+ msg = "Unable to interpret string value '%s' as bytes" % (value)
589+ raise ValueError(msg)
590+ return int(matches.group(1)) * (1024 ** BYTE_POWER[matches.group(2)])
591
592=== modified file 'hooks/charmhelpers/core/templating.py'
593--- hooks/charmhelpers/core/templating.py 2015-02-26 13:37:18 +0000
594+++ hooks/charmhelpers/core/templating.py 2015-11-18 20:55:22 +0000
595@@ -21,7 +21,7 @@
596
597
598 def render(source, target, context, owner='root', group='root',
599- perms=0o444, templates_dir=None, encoding='UTF-8'):
600+ perms=0o444, templates_dir=None, encoding='UTF-8', template_loader=None):
601 """
602 Render a template.
603
604@@ -52,17 +52,24 @@
605 apt_install('python-jinja2', fatal=True)
606 from jinja2 import FileSystemLoader, Environment, exceptions
607
608- if templates_dir is None:
609- templates_dir = os.path.join(hookenv.charm_dir(), 'templates')
610- loader = Environment(loader=FileSystemLoader(templates_dir))
611+ if template_loader:
612+ template_env = Environment(loader=template_loader)
613+ else:
614+ if templates_dir is None:
615+ templates_dir = os.path.join(hookenv.charm_dir(), 'templates')
616+ template_env = Environment(loader=FileSystemLoader(templates_dir))
617 try:
618 source = source
619- template = loader.get_template(source)
620+ template = template_env.get_template(source)
621 except exceptions.TemplateNotFound as e:
622 hookenv.log('Could not load template %s from %s.' %
623 (source, templates_dir),
624 level=hookenv.ERROR)
625 raise e
626 content = template.render(context)
627- host.mkdir(os.path.dirname(target), owner, group, perms=0o755)
628+ target_dir = os.path.dirname(target)
629+ if not os.path.exists(target_dir):
630+ # This is a terrible default directory permission, as the file
631+ # or its siblings will often contain secrets.
632+ host.mkdir(os.path.dirname(target), owner, group, perms=0o755)
633 host.write_file(target, content.encode(encoding), owner, group, perms)
634
635=== modified file 'hooks/charmhelpers/fetch/__init__.py'
636--- hooks/charmhelpers/fetch/__init__.py 2015-08-19 13:50:42 +0000
637+++ hooks/charmhelpers/fetch/__init__.py 2015-11-18 20:55:22 +0000
638@@ -225,12 +225,12 @@
639
640 def apt_mark(packages, mark, fatal=False):
641 """Flag one or more packages using apt-mark"""
642+ log("Marking {} as {}".format(packages, mark))
643 cmd = ['apt-mark', mark]
644 if isinstance(packages, six.string_types):
645 cmd.append(packages)
646 else:
647 cmd.extend(packages)
648- log("Holding {}".format(packages))
649
650 if fatal:
651 subprocess.check_call(cmd, universal_newlines=True)
652
653=== added symlink 'hooks/osd-devices-storage-attached'
654=== target is u'./ceph_hooks.py'
655=== added symlink 'hooks/osd-devices-storage-detaching'
656=== target is u'./ceph_hooks.py'
657=== modified file 'metadata.yaml'
658--- metadata.yaml 2015-11-18 10:30:34 +0000
659+++ metadata.yaml 2015-11-18 20:55:22 +0000
660@@ -19,3 +19,8 @@
661 requires:
662 mon:
663 interface: ceph-osd
664+storage:
665+ osd-devices:
666+ type: block
667+ multiple:
668+ range: 0+
669\ No newline at end of file
670
671=== modified file 'tests/charmhelpers/contrib/amulet/deployment.py'
672--- tests/charmhelpers/contrib/amulet/deployment.py 2015-01-26 11:51:28 +0000
673+++ tests/charmhelpers/contrib/amulet/deployment.py 2015-11-18 20:55:22 +0000
674@@ -51,7 +51,8 @@
675 if 'units' not in this_service:
676 this_service['units'] = 1
677
678- self.d.add(this_service['name'], units=this_service['units'])
679+ self.d.add(this_service['name'], units=this_service['units'],
680+ constraints=this_service.get('constraints'))
681
682 for svc in other_services:
683 if 'location' in svc:
684@@ -64,7 +65,8 @@
685 if 'units' not in svc:
686 svc['units'] = 1
687
688- self.d.add(svc['name'], charm=branch_location, units=svc['units'])
689+ self.d.add(svc['name'], charm=branch_location, units=svc['units'],
690+ constraints=svc.get('constraints'))
691
692 def _add_relations(self, relations):
693 """Add all of the relations for the services."""
694
695=== modified file 'tests/charmhelpers/contrib/amulet/utils.py'
696--- tests/charmhelpers/contrib/amulet/utils.py 2015-08-19 13:50:42 +0000
697+++ tests/charmhelpers/contrib/amulet/utils.py 2015-11-18 20:55:22 +0000
698@@ -19,9 +19,11 @@
699 import logging
700 import os
701 import re
702+import socket
703 import subprocess
704 import sys
705 import time
706+import uuid
707
708 import amulet
709 import distro_info
710@@ -114,7 +116,7 @@
711 # /!\ DEPRECATION WARNING (beisner):
712 # New and existing tests should be rewritten to use
713 # validate_services_by_name() as it is aware of init systems.
714- self.log.warn('/!\\ DEPRECATION WARNING: use '
715+ self.log.warn('DEPRECATION WARNING: use '
716 'validate_services_by_name instead of validate_services '
717 'due to init system differences.')
718
719@@ -269,33 +271,52 @@
720 """Get last modification time of directory."""
721 return sentry_unit.directory_stat(directory)['mtime']
722
723- def _get_proc_start_time(self, sentry_unit, service, pgrep_full=False):
724- """Get process' start time.
725-
726- Determine start time of the process based on the last modification
727- time of the /proc/pid directory. If pgrep_full is True, the process
728- name is matched against the full command line.
729- """
730- if pgrep_full:
731- cmd = 'pgrep -o -f {}'.format(service)
732- else:
733- cmd = 'pgrep -o {}'.format(service)
734- cmd = cmd + ' | grep -v pgrep || exit 0'
735- cmd_out = sentry_unit.run(cmd)
736- self.log.debug('CMDout: ' + str(cmd_out))
737- if cmd_out[0]:
738- self.log.debug('Pid for %s %s' % (service, str(cmd_out[0])))
739- proc_dir = '/proc/{}'.format(cmd_out[0].strip())
740- return self._get_dir_mtime(sentry_unit, proc_dir)
741+ def _get_proc_start_time(self, sentry_unit, service, pgrep_full=None):
742+ """Get start time of a process based on the last modification time
743+ of the /proc/pid directory.
744+
745+ :sentry_unit: The sentry unit to check for the service on
746+ :service: service name to look for in process table
747+ :pgrep_full: [Deprecated] Use full command line search mode with pgrep
748+ :returns: epoch time of service process start
749+ :param commands: list of bash commands
750+ :param sentry_units: list of sentry unit pointers
751+ :returns: None if successful; Failure message otherwise
752+ """
753+ if pgrep_full is not None:
754+ # /!\ DEPRECATION WARNING (beisner):
755+ # No longer implemented, as pidof is now used instead of pgrep.
756+ # https://bugs.launchpad.net/charm-helpers/+bug/1474030
757+ self.log.warn('DEPRECATION WARNING: pgrep_full bool is no '
758+ 'longer implemented re: lp 1474030.')
759+
760+ pid_list = self.get_process_id_list(sentry_unit, service)
761+ pid = pid_list[0]
762+ proc_dir = '/proc/{}'.format(pid)
763+ self.log.debug('Pid for {} on {}: {}'.format(
764+ service, sentry_unit.info['unit_name'], pid))
765+
766+ return self._get_dir_mtime(sentry_unit, proc_dir)
767
768 def service_restarted(self, sentry_unit, service, filename,
769- pgrep_full=False, sleep_time=20):
770+ pgrep_full=None, sleep_time=20):
771 """Check if service was restarted.
772
773 Compare a service's start time vs a file's last modification time
774 (such as a config file for that service) to determine if the service
775 has been restarted.
776 """
777+ # /!\ DEPRECATION WARNING (beisner):
778+ # This method is prone to races in that no before-time is known.
779+ # Use validate_service_config_changed instead.
780+
781+ # NOTE(beisner) pgrep_full is no longer implemented, as pidof is now
782+ # used instead of pgrep. pgrep_full is still passed through to ensure
783+ # deprecation WARNS. lp1474030
784+ self.log.warn('DEPRECATION WARNING: use '
785+ 'validate_service_config_changed instead of '
786+ 'service_restarted due to known races.')
787+
788 time.sleep(sleep_time)
789 if (self._get_proc_start_time(sentry_unit, service, pgrep_full) >=
790 self._get_file_mtime(sentry_unit, filename)):
791@@ -304,78 +325,122 @@
792 return False
793
794 def service_restarted_since(self, sentry_unit, mtime, service,
795- pgrep_full=False, sleep_time=20,
796- retry_count=2):
797+ pgrep_full=None, sleep_time=20,
798+ retry_count=30, retry_sleep_time=10):
799 """Check if service was been started after a given time.
800
801 Args:
802 sentry_unit (sentry): The sentry unit to check for the service on
803 mtime (float): The epoch time to check against
804 service (string): service name to look for in process table
805- pgrep_full (boolean): Use full command line search mode with pgrep
806- sleep_time (int): Seconds to sleep before looking for process
807- retry_count (int): If service is not found, how many times to retry
808+ pgrep_full: [Deprecated] Use full command line search mode with pgrep
809+ sleep_time (int): Initial sleep time (s) before looking for file
810+ retry_sleep_time (int): Time (s) to sleep between retries
811+            retry_count (int): If proc is not found, how many times to retry
812
813 Returns:
814               bool: True if service found and its start time is newer than mtime,
815 False if service is older than mtime or if service was
816 not found.
817 """
818- self.log.debug('Checking %s restarted since %s' % (service, mtime))
819+ # NOTE(beisner) pgrep_full is no longer implemented, as pidof is now
820+ # used instead of pgrep. pgrep_full is still passed through to ensure
821+ # deprecation WARNS. lp1474030
822+
823+ unit_name = sentry_unit.info['unit_name']
824+ self.log.debug('Checking that %s service restarted since %s on '
825+ '%s' % (service, mtime, unit_name))
826 time.sleep(sleep_time)
827- proc_start_time = self._get_proc_start_time(sentry_unit, service,
828- pgrep_full)
829- while retry_count > 0 and not proc_start_time:
830- self.log.debug('No pid file found for service %s, will retry %i '
831- 'more times' % (service, retry_count))
832- time.sleep(30)
833- proc_start_time = self._get_proc_start_time(sentry_unit, service,
834- pgrep_full)
835- retry_count = retry_count - 1
836+ proc_start_time = None
837+ tries = 0
838+ while tries <= retry_count and not proc_start_time:
839+ try:
840+ proc_start_time = self._get_proc_start_time(sentry_unit,
841+ service,
842+ pgrep_full)
843+ self.log.debug('Attempt {} to get {} proc start time on {} '
844+ 'OK'.format(tries, service, unit_name))
845+ except IOError as e:
846+ # NOTE(beisner) - race avoidance, proc may not exist yet.
847+ # https://bugs.launchpad.net/charm-helpers/+bug/1474030
848+ self.log.debug('Attempt {} to get {} proc start time on {} '
849+ 'failed\n{}'.format(tries, service,
850+ unit_name, e))
851+ time.sleep(retry_sleep_time)
852+ tries += 1
853
854 if not proc_start_time:
855 self.log.warn('No proc start time found, assuming service did '
856 'not start')
857 return False
858 if proc_start_time >= mtime:
859- self.log.debug('proc start time is newer than provided mtime'
860- '(%s >= %s)' % (proc_start_time, mtime))
861+            self.log.debug('Proc start time is newer than provided mtime '
862+ '(%s >= %s) on %s (OK)' % (proc_start_time,
863+ mtime, unit_name))
864 return True
865 else:
866- self.log.warn('proc start time (%s) is older than provided mtime '
867- '(%s), service did not restart' % (proc_start_time,
868- mtime))
869+ self.log.warn('Proc start time (%s) is older than provided mtime '
870+ '(%s) on %s, service did not '
871+ 'restart' % (proc_start_time, mtime, unit_name))
872 return False
873
874 def config_updated_since(self, sentry_unit, filename, mtime,
875- sleep_time=20):
876+ sleep_time=20, retry_count=30,
877+ retry_sleep_time=10):
878 """Check if file was modified after a given time.
879
880 Args:
881 sentry_unit (sentry): The sentry unit to check the file mtime on
882 filename (string): The file to check mtime of
883 mtime (float): The epoch time to check against
884- sleep_time (int): Seconds to sleep before looking for process
885+ sleep_time (int): Initial sleep time (s) before looking for file
886+ retry_sleep_time (int): Time (s) to sleep between retries
887+ retry_count (int): If file is not found, how many times to retry
888
889 Returns:
890 bool: True if file was modified more recently than mtime, False if
891- file was modified before mtime,
892+ file was modified before mtime, or if file not found.
893 """
894- self.log.debug('Checking %s updated since %s' % (filename, mtime))
895+ unit_name = sentry_unit.info['unit_name']
896+ self.log.debug('Checking that %s updated since %s on '
897+ '%s' % (filename, mtime, unit_name))
898 time.sleep(sleep_time)
899- file_mtime = self._get_file_mtime(sentry_unit, filename)
900+ file_mtime = None
901+ tries = 0
902+ while tries <= retry_count and not file_mtime:
903+ try:
904+ file_mtime = self._get_file_mtime(sentry_unit, filename)
905+ self.log.debug('Attempt {} to get {} file mtime on {} '
906+ 'OK'.format(tries, filename, unit_name))
907+ except IOError as e:
908+ # NOTE(beisner) - race avoidance, file may not exist yet.
909+ # https://bugs.launchpad.net/charm-helpers/+bug/1474030
910+ self.log.debug('Attempt {} to get {} file mtime on {} '
911+ 'failed\n{}'.format(tries, filename,
912+ unit_name, e))
913+ time.sleep(retry_sleep_time)
914+ tries += 1
915+
916+ if not file_mtime:
917+ self.log.warn('Could not determine file mtime, assuming '
918+ 'file does not exist')
919+ return False
920+
921 if file_mtime >= mtime:
922 self.log.debug('File mtime is newer than provided mtime '
923- '(%s >= %s)' % (file_mtime, mtime))
924+ '(%s >= %s) on %s (OK)' % (file_mtime,
925+ mtime, unit_name))
926 return True
927 else:
928- self.log.warn('File mtime %s is older than provided mtime %s'
929- % (file_mtime, mtime))
930+            self.log.warn('File mtime is older than provided mtime '
931+                          '(%s < %s) on %s' % (file_mtime,
932+                                               mtime, unit_name))
933 return False
934
935 def validate_service_config_changed(self, sentry_unit, mtime, service,
936- filename, pgrep_full=False,
937- sleep_time=20, retry_count=2):
938+ filename, pgrep_full=None,
939+ sleep_time=20, retry_count=30,
940+ retry_sleep_time=10):
941 """Check service and file were updated after mtime
942
943 Args:
944@@ -383,9 +448,10 @@
945 mtime (float): The epoch time to check against
946 service (string): service name to look for in process table
947 filename (string): The file to check mtime of
948- pgrep_full (boolean): Use full command line search mode with pgrep
949- sleep_time (int): Seconds to sleep before looking for process
950+ pgrep_full: [Deprecated] Use full command line search mode with pgrep
951+ sleep_time (int): Initial sleep in seconds to pass to test helpers
952 retry_count (int): If service is not found, how many times to retry
953+ retry_sleep_time (int): Time in seconds to wait between retries
954
955 Typical Usage:
956 u = OpenStackAmuletUtils(ERROR)
957@@ -402,15 +468,27 @@
958 mtime, False if service is older than mtime or if service was
959 not found or if filename was modified before mtime.
960 """
961- self.log.debug('Checking %s restarted since %s' % (service, mtime))
962- time.sleep(sleep_time)
963- service_restart = self.service_restarted_since(sentry_unit, mtime,
964- service,
965- pgrep_full=pgrep_full,
966- sleep_time=0,
967- retry_count=retry_count)
968- config_update = self.config_updated_since(sentry_unit, filename, mtime,
969- sleep_time=0)
970+
971+ # NOTE(beisner) pgrep_full is no longer implemented, as pidof is now
972+ # used instead of pgrep. pgrep_full is still passed through to ensure
973+ # deprecation WARNS. lp1474030
974+
975+ service_restart = self.service_restarted_since(
976+ sentry_unit, mtime,
977+ service,
978+ pgrep_full=pgrep_full,
979+ sleep_time=sleep_time,
980+ retry_count=retry_count,
981+ retry_sleep_time=retry_sleep_time)
982+
983+ config_update = self.config_updated_since(
984+ sentry_unit,
985+ filename,
986+ mtime,
987+ sleep_time=sleep_time,
988+ retry_count=retry_count,
989+ retry_sleep_time=retry_sleep_time)
990+
991 return service_restart and config_update
992
993 def get_sentry_time(self, sentry_unit):
994@@ -428,7 +506,6 @@
995 """Return a list of all Ubuntu releases in order of release."""
996 _d = distro_info.UbuntuDistroInfo()
997 _release_list = _d.all
998- self.log.debug('Ubuntu release list: {}'.format(_release_list))
999 return _release_list
1000
1001 def file_to_url(self, file_rel_path):
1002@@ -568,6 +645,142 @@
1003
1004 return None
1005
1006+ def validate_sectionless_conf(self, file_contents, expected):
1007+        """A crude conf parser, useful for inspecting configuration files
1008+        which do not have section headers (as configparser would require),
1009+        such as openstack-dashboard or rabbitmq confs."""
1010+ for line in file_contents.split('\n'):
1011+ if '=' in line:
1012+ args = line.split('=')
1013+ if len(args) <= 1:
1014+ continue
1015+ key = args[0].strip()
1016+ value = args[1].strip()
1017+ if key in expected.keys():
1018+ if expected[key] != value:
1019+ msg = ('Config mismatch. Expected, actual: {}, '
1020+ '{}'.format(expected[key], value))
1021+ amulet.raise_status(amulet.FAIL, msg=msg)
1022+
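The crude parser above can be exercised outside amulet; a minimal standalone sketch of the same split-on-`=` approach (the sample file contents below are invented for illustration):

```python
# Standalone sketch of the section-less conf parsing used above:
# split each line on '=', strip whitespace, collect key/value pairs.
SAMPLE = """
# no [section] headers, e.g. openstack-dashboard or rabbitmq-env style
NODENAME = rabbit@host1
NODE_PORT = 5672
"""


def parse_sectionless(contents):
    """Return key/value pairs from a conf file without section headers."""
    found = {}
    for line in contents.split('\n'):
        if '=' in line:
            args = line.split('=')
            if len(args) <= 1:
                continue
            found[args[0].strip()] = args[1].strip()
    return found


print(parse_sectionless(SAMPLE))
```

Note that `split('=')` keeps only the first chunk after the key, so a value containing `=` would be truncated; the helper above has the same limitation.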
1023+ def get_unit_hostnames(self, units):
1024+ """Return a dict of juju unit names to hostnames."""
1025+ host_names = {}
1026+ for unit in units:
1027+ host_names[unit.info['unit_name']] = \
1028+ str(unit.file_contents('/etc/hostname').strip())
1029+ self.log.debug('Unit host names: {}'.format(host_names))
1030+ return host_names
1031+
1032+ def run_cmd_unit(self, sentry_unit, cmd):
1033+ """Run a command on a unit, return the output and exit code."""
1034+ output, code = sentry_unit.run(cmd)
1035+ if code == 0:
1036+ self.log.debug('{} `{}` command returned {} '
1037+ '(OK)'.format(sentry_unit.info['unit_name'],
1038+ cmd, code))
1039+ else:
1040+ msg = ('{} `{}` command returned {} '
1041+ '{}'.format(sentry_unit.info['unit_name'],
1042+ cmd, code, output))
1043+ amulet.raise_status(amulet.FAIL, msg=msg)
1044+ return str(output), code
1045+
1046+ def file_exists_on_unit(self, sentry_unit, file_name):
1047+ """Check if a file exists on a unit."""
1048+ try:
1049+ sentry_unit.file_stat(file_name)
1050+ return True
1051+ except IOError:
1052+ return False
1053+ except Exception as e:
1054+ msg = 'Error checking file {}: {}'.format(file_name, e)
1055+ amulet.raise_status(amulet.FAIL, msg=msg)
1056+
1057+ def file_contents_safe(self, sentry_unit, file_name,
1058+ max_wait=60, fatal=False):
1059+ """Get file contents from a sentry unit. Wrap amulet file_contents
1060+ with retry logic to address races where a file checks as existing,
1061+ but no longer exists by the time file_contents is called.
1062+ Return None if file not found. Optionally raise if fatal is True."""
1063+ unit_name = sentry_unit.info['unit_name']
1064+ file_contents = False
1065+ tries = 0
1066+ while not file_contents and tries < (max_wait / 4):
1067+ try:
1068+ file_contents = sentry_unit.file_contents(file_name)
1069+ except IOError:
1070+ self.log.debug('Attempt {} to open file {} from {} '
1071+ 'failed'.format(tries, file_name,
1072+ unit_name))
1073+ time.sleep(4)
1074+ tries += 1
1075+
1076+ if file_contents:
1077+ return file_contents
1078+ elif not fatal:
1079+ return None
1080+ elif fatal:
1081+ msg = 'Failed to get file contents from unit.'
1082+ amulet.raise_status(amulet.FAIL, msg)
1083+
1084+ def port_knock_tcp(self, host="localhost", port=22, timeout=15):
1085+        """Open a TCP socket to check for a listening service on a host.
1086+
1087+ :param host: host name or IP address, default to localhost
1088+ :param port: TCP port number, default to 22
1089+ :param timeout: Connect timeout, default to 15 seconds
1090+ :returns: True if successful, False if connect failed
1091+ """
1092+
1093+ # Resolve host name if possible
1094+ try:
1095+ connect_host = socket.gethostbyname(host)
1096+ host_human = "{} ({})".format(connect_host, host)
1097+ except socket.error as e:
1098+ self.log.warn('Unable to resolve address: '
1099+ '{} ({}) Trying anyway!'.format(host, e))
1100+ connect_host = host
1101+ host_human = connect_host
1102+
1103+ # Attempt socket connection
1104+ try:
1105+ knock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1106+ knock.settimeout(timeout)
1107+ knock.connect((connect_host, port))
1108+ knock.close()
1109+ self.log.debug('Socket connect OK for host '
1110+ '{} on port {}.'.format(host_human, port))
1111+ return True
1112+ except socket.error as e:
1113+ self.log.debug('Socket connect FAIL for'
1114+ ' {} port {} ({})'.format(host_human, port, e))
1115+ return False
1116+
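The knock is a plain TCP connect with a timeout; a self-contained sketch of the same pattern, knocking a throwaway local listener (the listener and its ephemeral port exist purely for illustration):

```python
import socket
import threading


def knock_tcp(host, port, timeout=5):
    """Return True if a TCP connect to host:port succeeds within timeout."""
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(timeout)
        sock.connect((host, port))
        sock.close()
        return True
    except socket.error:
        return False


# Stand up a throwaway listener on an ephemeral port to knock against.
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(('127.0.0.1', 0))
listener.listen(1)
port = listener.getsockname()[1]
threading.Thread(target=listener.accept, daemon=True).start()

print(knock_tcp('127.0.0.1', port))   # knock the open port
listener.close()
```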
1117+ def port_knock_units(self, sentry_units, port=22,
1118+ timeout=15, expect_success=True):
1119+        """Open a TCP socket to check for a listening service on each
1120+ listed juju unit.
1121+
1122+ :param sentry_units: list of sentry unit pointers
1123+ :param port: TCP port number, default to 22
1124+ :param timeout: Connect timeout, default to 15 seconds
1125+        :param expect_success: True by default, set False to invert logic
1126+ :returns: None if successful, Failure message otherwise
1127+ """
1128+ for unit in sentry_units:
1129+ host = unit.info['public-address']
1130+ connected = self.port_knock_tcp(host, port, timeout)
1131+ if not connected and expect_success:
1132+ return 'Socket connect failed.'
1133+ elif connected and not expect_success:
1134+ return 'Socket connected unexpectedly.'
1135+
1136+ def get_uuid_epoch_stamp(self):
1137+ """Returns a stamp string based on uuid4 and epoch time. Useful in
1138+ generating test messages which need to be unique-ish."""
1139+ return '[{}-{}]'.format(uuid.uuid4(), time.time())
1140+
1141+    # amulet juju action helpers:
1142 def run_action(self, unit_sentry, action,
1143 _check_output=subprocess.check_output):
1144 """Run the named action on a given unit sentry.
1145@@ -594,3 +807,12 @@
1146 output = _check_output(command, universal_newlines=True)
1147 data = json.loads(output)
1148 return data.get(u"status") == "completed"
1149+
1150+ def status_get(self, unit):
1151+ """Return the current service status of this unit."""
1152+ raw_status, return_code = unit.run(
1153+ "status-get --format=json --include-data")
1154+ if return_code != 0:
1155+ return ("unknown", "")
1156+ status = json.loads(raw_status)
1157+ return (status["status"], status["message"])
1158
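The `status-get` call above assumes JSON shaped like the sample below; a small standalone mirror of the parse-and-fallback logic (the sample payload is invented, not captured from a real unit):

```python
import json

# Invented sample of `status-get --format=json --include-data` output.
RAW_STATUS = '{"status": "active", "message": "Unit is ready"}'


def parse_status(raw_status, return_code=0):
    """Mirror status_get: fall back to ("unknown", "") if the command failed."""
    if return_code != 0:
        return ("unknown", "")
    status = json.loads(raw_status)
    return (status["status"], status["message"])


print(parse_status(RAW_STATUS))          # ('active', 'Unit is ready')
print(parse_status('', return_code=1))   # ('unknown', '')
```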
1159=== modified file 'tests/charmhelpers/contrib/openstack/amulet/deployment.py'
1160--- tests/charmhelpers/contrib/openstack/amulet/deployment.py 2015-08-19 13:50:42 +0000
1161+++ tests/charmhelpers/contrib/openstack/amulet/deployment.py 2015-11-18 20:55:22 +0000
1162@@ -14,12 +14,18 @@
1163 # You should have received a copy of the GNU Lesser General Public License
1164 # along with charm-helpers. If not, see <http://www.gnu.org/licenses/>.
1165
1166+import logging
1167+import re
1168+import sys
1169 import six
1170 from collections import OrderedDict
1171 from charmhelpers.contrib.amulet.deployment import (
1172 AmuletDeployment
1173 )
1174
1175+DEBUG = logging.DEBUG
1176+ERROR = logging.ERROR
1177+
1178
1179 class OpenStackAmuletDeployment(AmuletDeployment):
1180 """OpenStack amulet deployment.
1181@@ -28,9 +34,12 @@
1182 that is specifically for use by OpenStack charms.
1183 """
1184
1185- def __init__(self, series=None, openstack=None, source=None, stable=True):
1186+ def __init__(self, series=None, openstack=None, source=None,
1187+ stable=True, log_level=DEBUG):
1188 """Initialize the deployment environment."""
1189 super(OpenStackAmuletDeployment, self).__init__(series)
1190+ self.log = self.get_logger(level=log_level)
1191+ self.log.info('OpenStackAmuletDeployment: init')
1192 self.openstack = openstack
1193 self.source = source
1194 self.stable = stable
1195@@ -38,26 +47,55 @@
1196 # out.
1197 self.current_next = "trusty"
1198
1199+ def get_logger(self, name="deployment-logger", level=logging.DEBUG):
1200+ """Get a logger object that will log to stdout."""
1201+ log = logging
1202+ logger = log.getLogger(name)
1203+ fmt = log.Formatter("%(asctime)s %(funcName)s "
1204+ "%(levelname)s: %(message)s")
1205+
1206+ handler = log.StreamHandler(stream=sys.stdout)
1207+ handler.setLevel(level)
1208+ handler.setFormatter(fmt)
1209+
1210+ logger.addHandler(handler)
1211+ logger.setLevel(level)
1212+
1213+ return logger
1214+
1215 def _determine_branch_locations(self, other_services):
1216 """Determine the branch locations for the other services.
1217
1218 Determine if the local branch being tested is derived from its
1219         stable or next (dev) branch, and based on this, use the corresponding
1220 stable or next branches for the other_services."""
1221+
1222+ self.log.info('OpenStackAmuletDeployment: determine branch locations')
1223+
1224+ # Charms outside the lp:~openstack-charmers namespace
1225 base_charms = ['mysql', 'mongodb', 'nrpe']
1226
1227+ # Force these charms to current series even when using an older series.
1228+ # ie. Use trusty/nrpe even when series is precise, as the P charm
1229+ # does not possess the necessary external master config and hooks.
1230+ force_series_current = ['nrpe']
1231+
1232 if self.series in ['precise', 'trusty']:
1233 base_series = self.series
1234 else:
1235 base_series = self.current_next
1236
1237- if self.stable:
1238- for svc in other_services:
1239+ for svc in other_services:
1240+ if svc['name'] in force_series_current:
1241+ base_series = self.current_next
1242+ # If a location has been explicitly set, use it
1243+ if svc.get('location'):
1244+ continue
1245+ if self.stable:
1246 temp = 'lp:charms/{}/{}'
1247 svc['location'] = temp.format(base_series,
1248 svc['name'])
1249- else:
1250- for svc in other_services:
1251+ else:
1252 if svc['name'] in base_charms:
1253 temp = 'lp:charms/{}/{}'
1254 svc['location'] = temp.format(base_series,
1255@@ -66,10 +104,13 @@
1256 temp = 'lp:~openstack-charmers/charms/{}/{}/next'
1257 svc['location'] = temp.format(self.current_next,
1258 svc['name'])
1259+
1260 return other_services
1261
1262 def _add_services(self, this_service, other_services):
1263 """Add services to the deployment and set openstack-origin/source."""
1264+ self.log.info('OpenStackAmuletDeployment: adding services')
1265+
1266 other_services = self._determine_branch_locations(other_services)
1267
1268 super(OpenStackAmuletDeployment, self)._add_services(this_service,
1269@@ -77,29 +118,101 @@
1270
1271 services = other_services
1272 services.append(this_service)
1273+
1274+ # Charms which should use the source config option
1275 use_source = ['mysql', 'mongodb', 'rabbitmq-server', 'ceph',
1276 'ceph-osd', 'ceph-radosgw']
1277- # Most OpenStack subordinate charms do not expose an origin option
1278- # as that is controlled by the principle.
1279- ignore = ['cinder-ceph', 'hacluster', 'neutron-openvswitch', 'nrpe']
1280+
1281+ # Charms which can not use openstack-origin, ie. many subordinates
1282+ no_origin = ['cinder-ceph', 'hacluster', 'neutron-openvswitch', 'nrpe']
1283
1284 if self.openstack:
1285 for svc in services:
1286- if svc['name'] not in use_source + ignore:
1287+ if svc['name'] not in use_source + no_origin:
1288 config = {'openstack-origin': self.openstack}
1289 self.d.configure(svc['name'], config)
1290
1291 if self.source:
1292 for svc in services:
1293- if svc['name'] in use_source and svc['name'] not in ignore:
1294+ if svc['name'] in use_source and svc['name'] not in no_origin:
1295 config = {'source': self.source}
1296 self.d.configure(svc['name'], config)
1297
1298 def _configure_services(self, configs):
1299 """Configure all of the services."""
1300+ self.log.info('OpenStackAmuletDeployment: configure services')
1301 for service, config in six.iteritems(configs):
1302 self.d.configure(service, config)
1303
1304+ def _auto_wait_for_status(self, message=None, exclude_services=None,
1305+ include_only=None, timeout=1800):
1306+ """Wait for all units to have a specific extended status, except
1307+ for any defined as excluded. Unless specified via message, any
1308+ status containing any case of 'ready' will be considered a match.
1309+
1310+ Examples of message usage:
1311+
1312+ Wait for all unit status to CONTAIN any case of 'ready' or 'ok':
1313+ message = re.compile('.*ready.*|.*ok.*', re.IGNORECASE)
1314+
1315+ Wait for all units to reach this status (exact match):
1316+ message = re.compile('^Unit is ready and clustered$')
1317+
1318+ Wait for all units to reach any one of these (exact match):
1319+ message = re.compile('Unit is ready|OK|Ready')
1320+
1321+ Wait for at least one unit to reach this status (exact match):
1322+ message = {'ready'}
1323+
1324+ See Amulet's sentry.wait_for_messages() for message usage detail.
1325+ https://github.com/juju/amulet/blob/master/amulet/sentry.py
1326+
1327+ :param message: Expected status match
1328+ :param exclude_services: List of juju service names to ignore,
1329+            not to be used in conjunction with include_only.
1330+ :param include_only: List of juju service names to exclusively check,
1331+            not to be used in conjunction with exclude_services.
1332+ :param timeout: Maximum time in seconds to wait for status match
1333+ :returns: None. Raises if timeout is hit.
1334+ """
1335+ self.log.info('Waiting for extended status on units...')
1336+
1337+ all_services = self.d.services.keys()
1338+
1339+ if exclude_services and include_only:
1340+ raise ValueError('exclude_services can not be used '
1341+ 'with include_only')
1342+
1343+ if message:
1344+ if isinstance(message, re._pattern_type):
1345+ match = message.pattern
1346+ else:
1347+ match = message
1348+
1349+ self.log.debug('Custom extended status wait match: '
1350+ '{}'.format(match))
1351+ else:
1352+ self.log.debug('Default extended status wait match: contains '
1353+ 'READY (case-insensitive)')
1354+ message = re.compile('.*ready.*', re.IGNORECASE)
1355+
1356+ if exclude_services:
1357+ self.log.debug('Excluding services from extended status match: '
1358+ '{}'.format(exclude_services))
1359+ else:
1360+ exclude_services = []
1361+
1362+ if include_only:
1363+ services = include_only
1364+ else:
1365+ services = list(set(all_services) - set(exclude_services))
1366+
1367+ self.log.debug('Waiting up to {}s for extended status on services: '
1368+ '{}'.format(timeout, services))
1369+ service_messages = {service: message for service in services}
1370+ self.d.sentry.wait_for_messages(service_messages, timeout=timeout)
1371+ self.log.info('OK')
1372+
1373 def _get_openstack_release(self):
1374 """Get openstack release.
1375
1376
1377=== modified file 'tests/charmhelpers/contrib/openstack/amulet/utils.py'
1378--- tests/charmhelpers/contrib/openstack/amulet/utils.py 2015-06-29 14:24:05 +0000
1379+++ tests/charmhelpers/contrib/openstack/amulet/utils.py 2015-11-18 20:55:22 +0000
1380@@ -18,6 +18,7 @@
1381 import json
1382 import logging
1383 import os
1384+import re
1385 import six
1386 import time
1387 import urllib
1388@@ -27,6 +28,7 @@
1389 import heatclient.v1.client as heat_client
1390 import keystoneclient.v2_0 as keystone_client
1391 import novaclient.v1_1.client as nova_client
1392+import pika
1393 import swiftclient
1394
1395 from charmhelpers.contrib.amulet.utils import (
1396@@ -602,3 +604,382 @@
1397 self.log.debug('Ceph {} samples (OK): '
1398 '{}'.format(sample_type, samples))
1399 return None
1400+
1401+ # rabbitmq/amqp specific helpers:
1402+
1403+ def rmq_wait_for_cluster(self, deployment, init_sleep=15, timeout=1200):
1404+ """Wait for rmq units extended status to show cluster readiness,
1405+ after an optional initial sleep period. Initial sleep is likely
1406+ necessary to be effective following a config change, as status
1407+ message may not instantly update to non-ready."""
1408+
1409+ if init_sleep:
1410+ time.sleep(init_sleep)
1411+
1412+ message = re.compile('^Unit is ready and clustered$')
1413+ deployment._auto_wait_for_status(message=message,
1414+ timeout=timeout,
1415+ include_only=['rabbitmq-server'])
1416+
1417+ def add_rmq_test_user(self, sentry_units,
1418+ username="testuser1", password="changeme"):
1419+ """Add a test user via the first rmq juju unit, check connection as
1420+ the new user against all sentry units.
1421+
1422+ :param sentry_units: list of sentry unit pointers
1423+ :param username: amqp user name, default to testuser1
1424+ :param password: amqp user password
1425+ :returns: None if successful. Raise on error.
1426+ """
1427+ self.log.debug('Adding rmq user ({})...'.format(username))
1428+
1429+ # Check that user does not already exist
1430+ cmd_user_list = 'rabbitmqctl list_users'
1431+ output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_list)
1432+ if username in output:
1433+ self.log.warning('User ({}) already exists, returning '
1434+ 'gracefully.'.format(username))
1435+ return
1436+
1437+ perms = '".*" ".*" ".*"'
1438+ cmds = ['rabbitmqctl add_user {} {}'.format(username, password),
1439+ 'rabbitmqctl set_permissions {} {}'.format(username, perms)]
1440+
1441+ # Add user via first unit
1442+ for cmd in cmds:
1443+ output, _ = self.run_cmd_unit(sentry_units[0], cmd)
1444+
1445+ # Check connection against the other sentry_units
1446+ self.log.debug('Checking user connect against units...')
1447+ for sentry_unit in sentry_units:
1448+ connection = self.connect_amqp_by_unit(sentry_unit, ssl=False,
1449+ username=username,
1450+ password=password)
1451+ connection.close()
1452+
1453+ def delete_rmq_test_user(self, sentry_units, username="testuser1"):
1454+ """Delete a rabbitmq user via the first rmq juju unit.
1455+
1456+ :param sentry_units: list of sentry unit pointers
1457+ :param username: amqp user name, default to testuser1
1459+ :returns: None if successful or no such user.
1460+ """
1461+ self.log.debug('Deleting rmq user ({})...'.format(username))
1462+
1463+ # Check that the user exists
1464+ cmd_user_list = 'rabbitmqctl list_users'
1465+ output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_list)
1466+
1467+ if username not in output:
1468+ self.log.warning('User ({}) does not exist, returning '
1469+ 'gracefully.'.format(username))
1470+ return
1471+
1472+ # Delete the user
1473+ cmd_user_del = 'rabbitmqctl delete_user {}'.format(username)
1474+ output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_del)
1475+
1476+ def get_rmq_cluster_status(self, sentry_unit):
1477+ """Execute rabbitmq cluster status command on a unit and return
1478+ the full output.
1479+
1480+        :param sentry_unit: sentry unit
1481+ :returns: String containing console output of cluster status command
1482+ """
1483+ cmd = 'rabbitmqctl cluster_status'
1484+ output, _ = self.run_cmd_unit(sentry_unit, cmd)
1485+ self.log.debug('{} cluster_status:\n{}'.format(
1486+ sentry_unit.info['unit_name'], output))
1487+ return str(output)
1488+
1489+ def get_rmq_cluster_running_nodes(self, sentry_unit):
1490+ """Parse rabbitmqctl cluster_status output string, return list of
1491+ running rabbitmq cluster nodes.
1492+
1493+        :param sentry_unit: sentry unit
1494+ :returns: List containing node names of running nodes
1495+ """
1496+ # NOTE(beisner): rabbitmqctl cluster_status output is not
1497+ # json-parsable, do string chop foo, then json.loads that.
1498+ str_stat = self.get_rmq_cluster_status(sentry_unit)
1499+ if 'running_nodes' in str_stat:
1500+ pos_start = str_stat.find("{running_nodes,") + 15
1501+ pos_end = str_stat.find("]},", pos_start) + 1
1502+ str_run_nodes = str_stat[pos_start:pos_end].replace("'", '"')
1503+ run_nodes = json.loads(str_run_nodes)
1504+ return run_nodes
1505+ else:
1506+ return []
1507+
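The string-chop above can be tried against canned `rabbitmqctl cluster_status` text; a standalone sketch (the sample output below is fabricated, following the usual Erlang-term shape):

```python
import json

# Fabricated but representative `rabbitmqctl cluster_status` output.
STR_STAT = ("Cluster status of node 'rabbit@host1' ...\n"
            "[{nodes,[{disc,['rabbit@host1','rabbit@host2']}]},\n"
            " {running_nodes,['rabbit@host1','rabbit@host2']},\n"
            " {partitions,[]}]\n")


def running_nodes(str_stat):
    """Chop the running_nodes Erlang list out and json.loads it."""
    if 'running_nodes' not in str_stat:
        return []
    pos_start = str_stat.find("{running_nodes,") + 15
    pos_end = str_stat.find("]},", pos_start) + 1
    str_run_nodes = str_stat[pos_start:pos_end].replace("'", '"')
    return json.loads(str_run_nodes)


print(running_nodes(STR_STAT))   # ['rabbit@host1', 'rabbit@host2']
```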
1508+ def validate_rmq_cluster_running_nodes(self, sentry_units):
1509+ """Check that all rmq unit hostnames are represented in the
1510+ cluster_status output of all units.
1511+
1512+        :param sentry_units: list of sentry unit pointers (all rmq units)
1514+ :returns: None if successful, otherwise return error message
1515+ """
1516+ host_names = self.get_unit_hostnames(sentry_units)
1517+ errors = []
1518+
1519+ # Query every unit for cluster_status running nodes
1520+ for query_unit in sentry_units:
1521+ query_unit_name = query_unit.info['unit_name']
1522+ running_nodes = self.get_rmq_cluster_running_nodes(query_unit)
1523+
1524+ # Confirm that every unit is represented in the queried unit's
1525+ # cluster_status running nodes output.
1526+ for validate_unit in sentry_units:
1527+ val_host_name = host_names[validate_unit.info['unit_name']]
1528+ val_node_name = 'rabbit@{}'.format(val_host_name)
1529+
1530+ if val_node_name not in running_nodes:
1531+ errors.append('Cluster member check failed on {}: {} not '
1532+ 'in {}\n'.format(query_unit_name,
1533+ val_node_name,
1534+ running_nodes))
1535+ if errors:
1536+ return ''.join(errors)
1537+
1538+ def rmq_ssl_is_enabled_on_unit(self, sentry_unit, port=None):
1539+ """Check a single juju rmq unit for ssl and port in the config file."""
1540+ host = sentry_unit.info['public-address']
1541+ unit_name = sentry_unit.info['unit_name']
1542+
1543+ conf_file = '/etc/rabbitmq/rabbitmq.config'
1544+ conf_contents = str(self.file_contents_safe(sentry_unit,
1545+ conf_file, max_wait=16))
1546+ # Checks
1547+ conf_ssl = 'ssl' in conf_contents
1548+ conf_port = str(port) in conf_contents
1549+
1550+ # Port explicitly checked in config
1551+ if port and conf_port and conf_ssl:
1552+ self.log.debug('SSL is enabled @{}:{} '
1553+ '({})'.format(host, port, unit_name))
1554+ return True
1555+ elif port and not conf_port and conf_ssl:
1556+ self.log.debug('SSL is enabled @{} but not on port {} '
1557+ '({})'.format(host, port, unit_name))
1558+ return False
1559+ # Port not checked (useful when checking that ssl is disabled)
1560+ elif not port and conf_ssl:
1561+ self.log.debug('SSL is enabled @{}:{} '
1562+ '({})'.format(host, port, unit_name))
1563+ return True
1564+ elif not conf_ssl:
1565+ self.log.debug('SSL not enabled @{}:{} '
1566+ '({})'.format(host, port, unit_name))
1567+ return False
1568+ else:
1569+ msg = ('Unknown condition when checking SSL status @{}:{} '
1570+ '({})'.format(host, port, unit_name))
1571+ amulet.raise_status(amulet.FAIL, msg)
1572+
1573+ def validate_rmq_ssl_enabled_units(self, sentry_units, port=None):
1574+ """Check that ssl is enabled on rmq juju sentry units.
1575+
1576+ :param sentry_units: list of all rmq sentry units
1577+ :param port: optional ssl port override to validate
1578+ :returns: None if successful, otherwise return error message
1579+ """
1580+ for sentry_unit in sentry_units:
1581+ if not self.rmq_ssl_is_enabled_on_unit(sentry_unit, port=port):
1582+ return ('Unexpected condition: ssl is disabled on unit '
1583+ '({})'.format(sentry_unit.info['unit_name']))
1584+ return None
1585+
1586+ def validate_rmq_ssl_disabled_units(self, sentry_units):
1587+        """Check that ssl is disabled on listed rmq juju sentry units.
1588+
1589+ :param sentry_units: list of all rmq sentry units
1590+        :returns: None if successful, otherwise return error message
1591+ """
1592+ for sentry_unit in sentry_units:
1593+ if self.rmq_ssl_is_enabled_on_unit(sentry_unit):
1594+ return ('Unexpected condition: ssl is enabled on unit '
1595+ '({})'.format(sentry_unit.info['unit_name']))
1596+ return None
1597+
1598+ def configure_rmq_ssl_on(self, sentry_units, deployment,
1599+ port=None, max_wait=60):
1600+ """Turn ssl charm config option on, with optional non-default
1601+ ssl port specification. Confirm that it is enabled on every
1602+ unit.
1603+
1604+ :param sentry_units: list of sentry units
1605+ :param deployment: amulet deployment object pointer
1606+ :param port: amqp port, use defaults if None
1607+ :param max_wait: maximum time to wait in seconds to confirm
1608+ :returns: None if successful. Raise on error.
1609+ """
1610+ self.log.debug('Setting ssl charm config option: on')
1611+
1612+ # Enable RMQ SSL
1613+ config = {'ssl': 'on'}
1614+ if port:
1615+ config['ssl_port'] = port
1616+
1617+ deployment.d.configure('rabbitmq-server', config)
1618+
1619+ # Wait for unit status
1620+ self.rmq_wait_for_cluster(deployment)
1621+
1622+ # Confirm
1623+ tries = 0
1624+ ret = self.validate_rmq_ssl_enabled_units(sentry_units, port=port)
1625+ while ret and tries < (max_wait / 4):
1626+ time.sleep(4)
1627+ self.log.debug('Attempt {}: {}'.format(tries, ret))
1628+ ret = self.validate_rmq_ssl_enabled_units(sentry_units, port=port)
1629+ tries += 1
1630+
1631+ if ret:
1632+ amulet.raise_status(amulet.FAIL, ret)
1633+
1634+ def configure_rmq_ssl_off(self, sentry_units, deployment, max_wait=60):
1635+ """Turn ssl charm config option off, confirm that it is disabled
1636+ on every unit.
1637+
1638+ :param sentry_units: list of sentry units
1639+ :param deployment: amulet deployment object pointer
1640+ :param max_wait: maximum time to wait in seconds to confirm
1641+ :returns: None if successful. Raise on error.
1642+ """
1643+ self.log.debug('Setting ssl charm config option: off')
1644+
1645+ # Disable RMQ SSL
1646+ config = {'ssl': 'off'}
1647+ deployment.d.configure('rabbitmq-server', config)
1648+
1649+ # Wait for unit status
1650+ self.rmq_wait_for_cluster(deployment)
1651+
1652+ # Confirm
1653+ tries = 0
1654+ ret = self.validate_rmq_ssl_disabled_units(sentry_units)
1655+ while ret and tries < (max_wait / 4):
1656+ time.sleep(4)
1657+ self.log.debug('Attempt {}: {}'.format(tries, ret))
1658+ ret = self.validate_rmq_ssl_disabled_units(sentry_units)
1659+ tries += 1
1660+
1661+ if ret:
1662+ amulet.raise_status(amulet.FAIL, ret)
1663+
1664+ def connect_amqp_by_unit(self, sentry_unit, ssl=False,
1665+ port=None, fatal=True,
1666+ username="testuser1", password="changeme"):
1667+ """Establish and return a pika amqp connection to the rabbitmq service
1668+ running on a rmq juju unit.
1669+
1670+ :param sentry_unit: sentry unit pointer
1671+ :param ssl: boolean, default to False
1672+ :param port: amqp port, use defaults if None
1673+ :param fatal: boolean, default to True (raises on connect error)
1674+ :param username: amqp user name, default to testuser1
1675+ :param password: amqp user password
1676+ :returns: pika amqp connection pointer or None if failed and non-fatal
1677+ """
1678+ host = sentry_unit.info['public-address']
1679+ unit_name = sentry_unit.info['unit_name']
1680+
1681+ # Default port logic if port is not specified
1682+ if ssl and not port:
1683+ port = 5671
1684+ elif not ssl and not port:
1685+ port = 5672
1686+
1687+ self.log.debug('Connecting to amqp on {}:{} ({}) as '
1688+ '{}...'.format(host, port, unit_name, username))
1689+
1690+ try:
1691+ credentials = pika.PlainCredentials(username, password)
1692+ parameters = pika.ConnectionParameters(host=host, port=port,
1693+ credentials=credentials,
1694+ ssl=ssl,
1695+ connection_attempts=3,
1696+ retry_delay=5,
1697+ socket_timeout=1)
1698+ connection = pika.BlockingConnection(parameters)
1699+ assert connection.server_properties['product'] == 'RabbitMQ'
1700+ self.log.debug('Connect OK')
1701+ return connection
1702+ except Exception as e:
1703+ msg = ('amqp connection failed to {}:{} as '
1704+ '{} ({})'.format(host, port, username, str(e)))
1705+ if fatal:
1706+ amulet.raise_status(amulet.FAIL, msg)
1707+ else:
1708+ self.log.warn(msg)
1709+ return None
1710+
1711+ def publish_amqp_message_by_unit(self, sentry_unit, message,
1712+ queue="test", ssl=False,
1713+ username="testuser1",
1714+ password="changeme",
1715+ port=None):
1716+ """Publish an amqp message to a rmq juju unit.
1717+
1718+ :param sentry_unit: sentry unit pointer
1719+ :param message: amqp message string
1720+ :param queue: message queue, default to test
1721+ :param username: amqp user name, default to testuser1
1722+ :param password: amqp user password
1723+ :param ssl: boolean, default to False
1724+ :param port: amqp port, use defaults if None
1725+ :returns: None. Raises exception if publish failed.
1726+ """
1727+ self.log.debug('Publishing message to {} queue:\n{}'.format(queue,
1728+ message))
1729+ connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl,
1730+ port=port,
1731+ username=username,
1732+ password=password)
1733+
1734+ # NOTE(beisner): extra debug here re: pika hang potential:
1735+ # https://github.com/pika/pika/issues/297
1736+ # https://groups.google.com/forum/#!topic/rabbitmq-users/Ja0iyfF0Szw
1737+ self.log.debug('Defining channel...')
1738+ channel = connection.channel()
1739+ self.log.debug('Declaring queue...')
1740+ channel.queue_declare(queue=queue, auto_delete=False, durable=True)
1741+ self.log.debug('Publishing message...')
1742+ channel.basic_publish(exchange='', routing_key=queue, body=message)
1743+ self.log.debug('Closing channel...')
1744+ channel.close()
1745+ self.log.debug('Closing connection...')
1746+ connection.close()
1747+
1748+ def get_amqp_message_by_unit(self, sentry_unit, queue="test",
1749+ username="testuser1",
1750+ password="changeme",
1751+ ssl=False, port=None):
1752+ """Get an amqp message from a rmq juju unit.
1753+
1754+ :param sentry_unit: sentry unit pointer
1755+ :param queue: message queue, default to test
1756+ :param username: amqp user name, default to testuser1
1757+ :param password: amqp user password
1758+ :param ssl: boolean, default to False
1759+ :param port: amqp port, use defaults if None
1760+ :returns: amqp message body as string. Raise if get fails.
1761+ """
1762+ connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl,
1763+ port=port,
1764+ username=username,
1765+ password=password)
1766+ channel = connection.channel()
1767+ method_frame, _, body = channel.basic_get(queue)
1768+
1769+ if method_frame:
1770+ self.log.debug('Retrieved message from {} queue:\n{}'.format(queue,
1771+ body))
1772+ channel.basic_ack(method_frame.delivery_tag)
1773+ channel.close()
1774+ connection.close()
1775+ return body
1776+ else:
1777+ msg = 'No message retrieved.'
1778+ amulet.raise_status(amulet.FAIL, msg)
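
The confirm loops in `configure_rmq_ssl_on` / `configure_rmq_ssl_off` above poll a validator every few seconds until it returns None or `max_wait` is exhausted. A minimal self-contained sketch of that pattern (the function name `wait_until_ok` is illustrative, not part of charmhelpers):

```python
import time

def wait_until_ok(validate, max_wait=60, interval=4):
    """Poll validate() until it succeeds or attempts are exhausted;
    mirrors the confirm loops in the amulet ssl helpers.

    :param validate: callable returning None on success, else an
        error string
    :param max_wait: maximum total time to wait in seconds
    :param interval: seconds between attempts
    :returns: None on success, or the last error string on timeout
    """
    tries = 0
    ret = validate()
    while ret and tries < (max_wait / interval):
        time.sleep(interval)
        ret = validate()
        tries += 1
    return ret

# Example: a validator that only succeeds on its third call
calls = {'n': 0}

def validator():
    calls['n'] += 1
    return None if calls['n'] >= 3 else 'not ready'

print(wait_until_ok(validator, max_wait=1, interval=0.01))  # prints None
```

In the helpers themselves, a non-None result after the loop is fatal (`amulet.raise_status(amulet.FAIL, ret)`); here it is simply returned so the caller can decide.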