Merge lp:~james-page/charms/trusty/swift-storage/resync into lp:~openstack-charmers-archive/charms/trusty/swift-storage/next

Proposed by James Page
Status: Merged
Merged at revision: 81
Proposed branch: lp:~james-page/charms/trusty/swift-storage/resync
Merge into: lp:~openstack-charmers-archive/charms/trusty/swift-storage/next
Diff against target: 1786 lines (+1339/-100)
10 files modified
charmhelpers/contrib/openstack/amulet/deployment.py (+20/-5)
charmhelpers/contrib/openstack/amulet/utils.py (+359/-0)
charmhelpers/contrib/openstack/context.py (+8/-9)
charmhelpers/contrib/openstack/utils.py (+4/-0)
charmhelpers/contrib/storage/linux/ceph.py (+226/-13)
charmhelpers/core/host.py (+32/-16)
charmhelpers/core/kernel.py (+68/-0)
tests/charmhelpers/contrib/amulet/utils.py (+243/-52)
tests/charmhelpers/contrib/openstack/amulet/deployment.py (+20/-5)
tests/charmhelpers/contrib/openstack/amulet/utils.py (+359/-0)
To merge this branch: bzr merge lp:~james-page/charms/trusty/swift-storage/resync
Reviewer Review Type Date Requested Status
OpenStack Charmers Pending
Review via email: mp+270946@code.launchpad.net
To post a comment you must log in.
Revision history for this message
uosci-testing-bot (uosci-testing-bot) wrote :

charm_lint_check #9931 swift-storage-next for james-page mp270946
    LINT OK: passed

Build: http://10.245.162.77:8080/job/charm_lint_check/9931/

Revision history for this message
uosci-testing-bot (uosci-testing-bot) wrote :

charm_unit_test #9150 swift-storage-next for james-page mp270946
    UNIT OK: passed

Build: http://10.245.162.77:8080/job/charm_unit_test/9150/

Revision history for this message
uosci-testing-bot (uosci-testing-bot) wrote :

charm_amulet_test #6414 swift-storage-next for james-page mp270946
    AMULET OK: passed

Build: http://10.245.162.77:8080/job/charm_amulet_test/6414/

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'charmhelpers/contrib/openstack/amulet/deployment.py'
2--- charmhelpers/contrib/openstack/amulet/deployment.py 2015-08-17 13:44:42 +0000
3+++ charmhelpers/contrib/openstack/amulet/deployment.py 2015-09-14 11:48:26 +0000
4@@ -44,8 +44,15 @@
5 Determine if the local branch being tested is derived from its
6 stable or next (dev) branch, and based on this, use the corresponding
7 stable or next branches for the other_services."""
8+
9+ # Charms outside the lp:~openstack-charmers namespace
10 base_charms = ['mysql', 'mongodb', 'nrpe']
11
12+ # Force these charms to current series even when using an older series.
13+ # ie. Use trusty/nrpe even when series is precise, as the P charm
14+ # does not possess the necessary external master config and hooks.
15+ force_series_current = ['nrpe']
16+
17 if self.series in ['precise', 'trusty']:
18 base_series = self.series
19 else:
20@@ -53,11 +60,17 @@
21
22 if self.stable:
23 for svc in other_services:
24+ if svc['name'] in force_series_current:
25+ base_series = self.current_next
26+
27 temp = 'lp:charms/{}/{}'
28 svc['location'] = temp.format(base_series,
29 svc['name'])
30 else:
31 for svc in other_services:
32+ if svc['name'] in force_series_current:
33+ base_series = self.current_next
34+
35 if svc['name'] in base_charms:
36 temp = 'lp:charms/{}/{}'
37 svc['location'] = temp.format(base_series,
38@@ -77,21 +90,23 @@
39
40 services = other_services
41 services.append(this_service)
42+
43+ # Charms which should use the source config option
44 use_source = ['mysql', 'mongodb', 'rabbitmq-server', 'ceph',
45 'ceph-osd', 'ceph-radosgw']
46- # Most OpenStack subordinate charms do not expose an origin option
47- # as that is controlled by the principle.
48- ignore = ['cinder-ceph', 'hacluster', 'neutron-openvswitch', 'nrpe']
49+
50+ # Charms which can not use openstack-origin, ie. many subordinates
51+ no_origin = ['cinder-ceph', 'hacluster', 'neutron-openvswitch', 'nrpe']
52
53 if self.openstack:
54 for svc in services:
55- if svc['name'] not in use_source + ignore:
56+ if svc['name'] not in use_source + no_origin:
57 config = {'openstack-origin': self.openstack}
58 self.d.configure(svc['name'], config)
59
60 if self.source:
61 for svc in services:
62- if svc['name'] in use_source and svc['name'] not in ignore:
63+ if svc['name'] in use_source and svc['name'] not in no_origin:
64 config = {'source': self.source}
65 self.d.configure(svc['name'], config)
66
67
68=== modified file 'charmhelpers/contrib/openstack/amulet/utils.py'
69--- charmhelpers/contrib/openstack/amulet/utils.py 2015-08-11 08:17:24 +0000
70+++ charmhelpers/contrib/openstack/amulet/utils.py 2015-09-14 11:48:26 +0000
71@@ -27,6 +27,7 @@
72 import heatclient.v1.client as heat_client
73 import keystoneclient.v2_0 as keystone_client
74 import novaclient.v1_1.client as nova_client
75+import pika
76 import swiftclient
77
78 from charmhelpers.contrib.amulet.utils import (
79@@ -602,3 +603,361 @@
80 self.log.debug('Ceph {} samples (OK): '
81 '{}'.format(sample_type, samples))
82 return None
83+
84+# rabbitmq/amqp specific helpers:
85+ def add_rmq_test_user(self, sentry_units,
86+ username="testuser1", password="changeme"):
87+ """Add a test user via the first rmq juju unit, check connection as
88+ the new user against all sentry units.
89+
90+ :param sentry_units: list of sentry unit pointers
91+ :param username: amqp user name, default to testuser1
92+ :param password: amqp user password
93+ :returns: None if successful. Raise on error.
94+ """
95+ self.log.debug('Adding rmq user ({})...'.format(username))
96+
97+ # Check that user does not already exist
98+ cmd_user_list = 'rabbitmqctl list_users'
99+ output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_list)
100+ if username in output:
101+ self.log.warning('User ({}) already exists, returning '
102+ 'gracefully.'.format(username))
103+ return
104+
105+ perms = '".*" ".*" ".*"'
106+ cmds = ['rabbitmqctl add_user {} {}'.format(username, password),
107+ 'rabbitmqctl set_permissions {} {}'.format(username, perms)]
108+
109+ # Add user via first unit
110+ for cmd in cmds:
111+ output, _ = self.run_cmd_unit(sentry_units[0], cmd)
112+
113+ # Check connection against the other sentry_units
114+ self.log.debug('Checking user connect against units...')
115+ for sentry_unit in sentry_units:
116+ connection = self.connect_amqp_by_unit(sentry_unit, ssl=False,
117+ username=username,
118+ password=password)
119+ connection.close()
120+
121+ def delete_rmq_test_user(self, sentry_units, username="testuser1"):
122+ """Delete a rabbitmq user via the first rmq juju unit.
123+
124+ :param sentry_units: list of sentry unit pointers
125+ :param username: amqp user name, default to testuser1
126+ :param password: amqp user password
127+ :returns: None if successful or no such user.
128+ """
129+ self.log.debug('Deleting rmq user ({})...'.format(username))
130+
131+ # Check that the user exists
132+ cmd_user_list = 'rabbitmqctl list_users'
133+ output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_list)
134+
135+ if username not in output:
136+ self.log.warning('User ({}) does not exist, returning '
137+ 'gracefully.'.format(username))
138+ return
139+
140+ # Delete the user
141+ cmd_user_del = 'rabbitmqctl delete_user {}'.format(username)
142+ output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_del)
143+
144+ def get_rmq_cluster_status(self, sentry_unit):
145+ """Execute rabbitmq cluster status command on a unit and return
146+ the full output.
147+
148+ :param sentry_unit: sentry unit
149+ :returns: String containing console output of cluster status command
150+ """
151+ cmd = 'rabbitmqctl cluster_status'
152+ output, _ = self.run_cmd_unit(sentry_unit, cmd)
153+ self.log.debug('{} cluster_status:\n{}'.format(
154+ sentry_unit.info['unit_name'], output))
155+ return str(output)
156+
157+ def get_rmq_cluster_running_nodes(self, sentry_unit):
158+ """Parse rabbitmqctl cluster_status output string, return list of
159+ running rabbitmq cluster nodes.
160+
161+ :param sentry_unit: sentry unit
162+ :returns: List containing node names of running nodes
163+ """
164+ # NOTE(beisner): rabbitmqctl cluster_status output is not
165+ # json-parsable, do string chop foo, then json.loads that.
166+ str_stat = self.get_rmq_cluster_status(sentry_unit)
167+ if 'running_nodes' in str_stat:
168+ pos_start = str_stat.find("{running_nodes,") + 15
169+ pos_end = str_stat.find("]},", pos_start) + 1
170+ str_run_nodes = str_stat[pos_start:pos_end].replace("'", '"')
171+ run_nodes = json.loads(str_run_nodes)
172+ return run_nodes
173+ else:
174+ return []
175+
176+ def validate_rmq_cluster_running_nodes(self, sentry_units):
177+ """Check that all rmq unit hostnames are represented in the
178+ cluster_status output of all units.
179+
180+ :param host_names: dict of juju unit names to host names
181+ :param units: list of sentry unit pointers (all rmq units)
182+ :returns: None if successful, otherwise return error message
183+ """
184+ host_names = self.get_unit_hostnames(sentry_units)
185+ errors = []
186+
187+ # Query every unit for cluster_status running nodes
188+ for query_unit in sentry_units:
189+ query_unit_name = query_unit.info['unit_name']
190+ running_nodes = self.get_rmq_cluster_running_nodes(query_unit)
191+
192+ # Confirm that every unit is represented in the queried unit's
193+ # cluster_status running nodes output.
194+ for validate_unit in sentry_units:
195+ val_host_name = host_names[validate_unit.info['unit_name']]
196+ val_node_name = 'rabbit@{}'.format(val_host_name)
197+
198+ if val_node_name not in running_nodes:
199+ errors.append('Cluster member check failed on {}: {} not '
200+ 'in {}\n'.format(query_unit_name,
201+ val_node_name,
202+ running_nodes))
203+ if errors:
204+ return ''.join(errors)
205+
206+ def rmq_ssl_is_enabled_on_unit(self, sentry_unit, port=None):
207+ """Check a single juju rmq unit for ssl and port in the config file."""
208+ host = sentry_unit.info['public-address']
209+ unit_name = sentry_unit.info['unit_name']
210+
211+ conf_file = '/etc/rabbitmq/rabbitmq.config'
212+ conf_contents = str(self.file_contents_safe(sentry_unit,
213+ conf_file, max_wait=16))
214+ # Checks
215+ conf_ssl = 'ssl' in conf_contents
216+ conf_port = str(port) in conf_contents
217+
218+ # Port explicitly checked in config
219+ if port and conf_port and conf_ssl:
220+ self.log.debug('SSL is enabled @{}:{} '
221+ '({})'.format(host, port, unit_name))
222+ return True
223+ elif port and not conf_port and conf_ssl:
224+ self.log.debug('SSL is enabled @{} but not on port {} '
225+ '({})'.format(host, port, unit_name))
226+ return False
227+ # Port not checked (useful when checking that ssl is disabled)
228+ elif not port and conf_ssl:
229+ self.log.debug('SSL is enabled @{}:{} '
230+ '({})'.format(host, port, unit_name))
231+ return True
232+ elif not port and not conf_ssl:
233+ self.log.debug('SSL not enabled @{}:{} '
234+ '({})'.format(host, port, unit_name))
235+ return False
236+ else:
237+ msg = ('Unknown condition when checking SSL status @{}:{} '
238+ '({})'.format(host, port, unit_name))
239+ amulet.raise_status(amulet.FAIL, msg)
240+
241+ def validate_rmq_ssl_enabled_units(self, sentry_units, port=None):
242+ """Check that ssl is enabled on rmq juju sentry units.
243+
244+ :param sentry_units: list of all rmq sentry units
245+ :param port: optional ssl port override to validate
246+ :returns: None if successful, otherwise return error message
247+ """
248+ for sentry_unit in sentry_units:
249+ if not self.rmq_ssl_is_enabled_on_unit(sentry_unit, port=port):
250+ return ('Unexpected condition: ssl is disabled on unit '
251+ '({})'.format(sentry_unit.info['unit_name']))
252+ return None
253+
254+ def validate_rmq_ssl_disabled_units(self, sentry_units):
255+ """Check that ssl is disabled on listed rmq juju sentry units.
256+
257+ :param sentry_units: list of all rmq sentry units
258+ :returns: None if successful, otherwise return error message
259+ """
260+ for sentry_unit in sentry_units:
261+ if self.rmq_ssl_is_enabled_on_unit(sentry_unit):
262+ return ('Unexpected condition: ssl is enabled on unit '
263+ '({})'.format(sentry_unit.info['unit_name']))
264+ return None
265+
266+ def configure_rmq_ssl_on(self, sentry_units, deployment,
267+ port=None, max_wait=60):
268+ """Turn ssl charm config option on, with optional non-default
269+ ssl port specification. Confirm that it is enabled on every
270+ unit.
271+
272+ :param sentry_units: list of sentry units
273+ :param deployment: amulet deployment object pointer
274+ :param port: amqp port, use defaults if None
275+ :param max_wait: maximum time to wait in seconds to confirm
276+ :returns: None if successful. Raise on error.
277+ """
278+ self.log.debug('Setting ssl charm config option: on')
279+
280+ # Enable RMQ SSL
281+ config = {'ssl': 'on'}
282+ if port:
283+ config['ssl_port'] = port
284+
285+ deployment.configure('rabbitmq-server', config)
286+
287+ # Confirm
288+ tries = 0
289+ ret = self.validate_rmq_ssl_enabled_units(sentry_units, port=port)
290+ while ret and tries < (max_wait / 4):
291+ time.sleep(4)
292+ self.log.debug('Attempt {}: {}'.format(tries, ret))
293+ ret = self.validate_rmq_ssl_enabled_units(sentry_units, port=port)
294+ tries += 1
295+
296+ if ret:
297+ amulet.raise_status(amulet.FAIL, ret)
298+
299+ def configure_rmq_ssl_off(self, sentry_units, deployment, max_wait=60):
300+ """Turn ssl charm config option off, confirm that it is disabled
301+ on every unit.
302+
303+ :param sentry_units: list of sentry units
304+ :param deployment: amulet deployment object pointer
305+ :param max_wait: maximum time to wait in seconds to confirm
306+ :returns: None if successful. Raise on error.
307+ """
308+ self.log.debug('Setting ssl charm config option: off')
309+
310+ # Disable RMQ SSL
311+ config = {'ssl': 'off'}
312+ deployment.configure('rabbitmq-server', config)
313+
314+ # Confirm
315+ tries = 0
316+ ret = self.validate_rmq_ssl_disabled_units(sentry_units)
317+ while ret and tries < (max_wait / 4):
318+ time.sleep(4)
319+ self.log.debug('Attempt {}: {}'.format(tries, ret))
320+ ret = self.validate_rmq_ssl_disabled_units(sentry_units)
321+ tries += 1
322+
323+ if ret:
324+ amulet.raise_status(amulet.FAIL, ret)
325+
326+ def connect_amqp_by_unit(self, sentry_unit, ssl=False,
327+ port=None, fatal=True,
328+ username="testuser1", password="changeme"):
329+ """Establish and return a pika amqp connection to the rabbitmq service
330+ running on a rmq juju unit.
331+
332+ :param sentry_unit: sentry unit pointer
333+ :param ssl: boolean, default to False
334+ :param port: amqp port, use defaults if None
335+ :param fatal: boolean, default to True (raises on connect error)
336+ :param username: amqp user name, default to testuser1
337+ :param password: amqp user password
338+ :returns: pika amqp connection pointer or None if failed and non-fatal
339+ """
340+ host = sentry_unit.info['public-address']
341+ unit_name = sentry_unit.info['unit_name']
342+
343+ # Default port logic if port is not specified
344+ if ssl and not port:
345+ port = 5671
346+ elif not ssl and not port:
347+ port = 5672
348+
349+ self.log.debug('Connecting to amqp on {}:{} ({}) as '
350+ '{}...'.format(host, port, unit_name, username))
351+
352+ try:
353+ credentials = pika.PlainCredentials(username, password)
354+ parameters = pika.ConnectionParameters(host=host, port=port,
355+ credentials=credentials,
356+ ssl=ssl,
357+ connection_attempts=3,
358+ retry_delay=5,
359+ socket_timeout=1)
360+ connection = pika.BlockingConnection(parameters)
361+ assert connection.server_properties['product'] == 'RabbitMQ'
362+ self.log.debug('Connect OK')
363+ return connection
364+ except Exception as e:
365+ msg = ('amqp connection failed to {}:{} as '
366+ '{} ({})'.format(host, port, username, str(e)))
367+ if fatal:
368+ amulet.raise_status(amulet.FAIL, msg)
369+ else:
370+ self.log.warn(msg)
371+ return None
372+
373+ def publish_amqp_message_by_unit(self, sentry_unit, message,
374+ queue="test", ssl=False,
375+ username="testuser1",
376+ password="changeme",
377+ port=None):
378+ """Publish an amqp message to a rmq juju unit.
379+
380+ :param sentry_unit: sentry unit pointer
381+ :param message: amqp message string
382+ :param queue: message queue, default to test
383+ :param username: amqp user name, default to testuser1
384+ :param password: amqp user password
385+ :param ssl: boolean, default to False
386+ :param port: amqp port, use defaults if None
387+ :returns: None. Raises exception if publish failed.
388+ """
389+ self.log.debug('Publishing message to {} queue:\n{}'.format(queue,
390+ message))
391+ connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl,
392+ port=port,
393+ username=username,
394+ password=password)
395+
396+ # NOTE(beisner): extra debug here re: pika hang potential:
397+ # https://github.com/pika/pika/issues/297
398+ # https://groups.google.com/forum/#!topic/rabbitmq-users/Ja0iyfF0Szw
399+ self.log.debug('Defining channel...')
400+ channel = connection.channel()
401+ self.log.debug('Declaring queue...')
402+ channel.queue_declare(queue=queue, auto_delete=False, durable=True)
403+ self.log.debug('Publishing message...')
404+ channel.basic_publish(exchange='', routing_key=queue, body=message)
405+ self.log.debug('Closing channel...')
406+ channel.close()
407+ self.log.debug('Closing connection...')
408+ connection.close()
409+
410+ def get_amqp_message_by_unit(self, sentry_unit, queue="test",
411+ username="testuser1",
412+ password="changeme",
413+ ssl=False, port=None):
414+ """Get an amqp message from a rmq juju unit.
415+
416+ :param sentry_unit: sentry unit pointer
417+ :param queue: message queue, default to test
418+ :param username: amqp user name, default to testuser1
419+ :param password: amqp user password
420+ :param ssl: boolean, default to False
421+ :param port: amqp port, use defaults if None
422+ :returns: amqp message body as string. Raise if get fails.
423+ """
424+ connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl,
425+ port=port,
426+ username=username,
427+ password=password)
428+ channel = connection.channel()
429+ method_frame, _, body = channel.basic_get(queue)
430+
431+ if method_frame:
432+ self.log.debug('Retreived message from {} queue:\n{}'.format(queue,
433+ body))
434+ channel.basic_ack(method_frame.delivery_tag)
435+ channel.close()
436+ connection.close()
437+ return body
438+ else:
439+ msg = 'No message retrieved.'
440+ amulet.raise_status(amulet.FAIL, msg)
441
442=== modified file 'charmhelpers/contrib/openstack/context.py'
443--- charmhelpers/contrib/openstack/context.py 2015-09-03 09:43:52 +0000
444+++ charmhelpers/contrib/openstack/context.py 2015-09-14 11:48:26 +0000
445@@ -485,13 +485,15 @@
446
447 log('Generating template context for ceph', level=DEBUG)
448 mon_hosts = []
449- auth = None
450- key = None
451- use_syslog = str(config('use-syslog')).lower()
452+ ctxt = {
453+ 'use_syslog': str(config('use-syslog')).lower()
454+ }
455 for rid in relation_ids('ceph'):
456 for unit in related_units(rid):
457- auth = relation_get('auth', rid=rid, unit=unit)
458- key = relation_get('key', rid=rid, unit=unit)
459+ if not ctxt.get('auth'):
460+ ctxt['auth'] = relation_get('auth', rid=rid, unit=unit)
461+ if not ctxt.get('key'):
462+ ctxt['key'] = relation_get('key', rid=rid, unit=unit)
463 ceph_pub_addr = relation_get('ceph-public-address', rid=rid,
464 unit=unit)
465 unit_priv_addr = relation_get('private-address', rid=rid,
466@@ -500,10 +502,7 @@
467 ceph_addr = format_ipv6_addr(ceph_addr) or ceph_addr
468 mon_hosts.append(ceph_addr)
469
470- ctxt = {'mon_hosts': ' '.join(sorted(mon_hosts)),
471- 'auth': auth,
472- 'key': key,
473- 'use_syslog': use_syslog}
474+ ctxt['mon_hosts'] = ' '.join(sorted(mon_hosts))
475
476 if not os.path.isdir('/etc/ceph'):
477 os.mkdir('/etc/ceph')
478
479=== modified file 'charmhelpers/contrib/openstack/utils.py'
480--- charmhelpers/contrib/openstack/utils.py 2015-09-07 09:16:50 +0000
481+++ charmhelpers/contrib/openstack/utils.py 2015-09-14 11:48:26 +0000
482@@ -114,6 +114,7 @@
483 ('2.2.1', 'kilo'),
484 ('2.2.2', 'kilo'),
485 ('2.3.0', 'liberty'),
486+ ('2.4.0', 'liberty'),
487 ])
488
489 # >= Liberty version->codename mapping
490@@ -142,6 +143,9 @@
491 'glance-common': OrderedDict([
492 ('11.0.0', 'liberty'),
493 ]),
494+ 'openstack-dashboard': OrderedDict([
495+ ('8.0.0', 'liberty'),
496+ ]),
497 }
498
499 DEFAULT_LOOPBACK_SIZE = '5G'
500
501=== modified file 'charmhelpers/contrib/storage/linux/ceph.py'
502--- charmhelpers/contrib/storage/linux/ceph.py 2015-08-11 08:17:24 +0000
503+++ charmhelpers/contrib/storage/linux/ceph.py 2015-09-14 11:48:26 +0000
504@@ -28,6 +28,7 @@
505 import shutil
506 import json
507 import time
508+import uuid
509
510 from subprocess import (
511 check_call,
512@@ -35,8 +36,10 @@
513 CalledProcessError,
514 )
515 from charmhelpers.core.hookenv import (
516+ local_unit,
517 relation_get,
518 relation_ids,
519+ relation_set,
520 related_units,
521 log,
522 DEBUG,
523@@ -56,6 +59,8 @@
524 apt_install,
525 )
526
527+from charmhelpers.core.kernel import modprobe
528+
529 KEYRING = '/etc/ceph/ceph.client.{}.keyring'
530 KEYFILE = '/etc/ceph/ceph.client.{}.key'
531
532@@ -288,17 +293,6 @@
533 os.chown(data_src_dst, uid, gid)
534
535
536-# TODO: re-use
537-def modprobe(module):
538- """Load a kernel module and configure for auto-load on reboot."""
539- log('Loading kernel module', level=INFO)
540- cmd = ['modprobe', module]
541- check_call(cmd)
542- with open('/etc/modules', 'r+') as modules:
543- if module not in modules.read():
544- modules.write(module)
545-
546-
547 def copy_files(src, dst, symlinks=False, ignore=None):
548 """Copy files from src to dst."""
549 for item in os.listdir(src):
550@@ -411,17 +405,52 @@
551
552 The API is versioned and defaults to version 1.
553 """
554- def __init__(self, api_version=1):
555+ def __init__(self, api_version=1, request_id=None):
556 self.api_version = api_version
557+ if request_id:
558+ self.request_id = request_id
559+ else:
560+ self.request_id = str(uuid.uuid1())
561 self.ops = []
562
563 def add_op_create_pool(self, name, replica_count=3):
564 self.ops.append({'op': 'create-pool', 'name': name,
565 'replicas': replica_count})
566
567+ def set_ops(self, ops):
568+ """Set request ops to provided value.
569+
570+ Useful for injecting ops that come from a previous request
571+ to allow comparisons to ensure validity.
572+ """
573+ self.ops = ops
574+
575 @property
576 def request(self):
577- return json.dumps({'api-version': self.api_version, 'ops': self.ops})
578+ return json.dumps({'api-version': self.api_version, 'ops': self.ops,
579+ 'request-id': self.request_id})
580+
581+ def _ops_equal(self, other):
582+ if len(self.ops) == len(other.ops):
583+ for req_no in range(0, len(self.ops)):
584+ for key in ['replicas', 'name', 'op']:
585+ if self.ops[req_no][key] != other.ops[req_no][key]:
586+ return False
587+ else:
588+ return False
589+ return True
590+
591+ def __eq__(self, other):
592+ if not isinstance(other, self.__class__):
593+ return False
594+ if self.api_version == other.api_version and \
595+ self._ops_equal(other):
596+ return True
597+ else:
598+ return False
599+
600+ def __ne__(self, other):
601+ return not self.__eq__(other)
602
603
604 class CephBrokerRsp(object):
605@@ -431,14 +460,198 @@
606
607 The API is versioned and defaults to version 1.
608 """
609+
610 def __init__(self, encoded_rsp):
611 self.api_version = None
612 self.rsp = json.loads(encoded_rsp)
613
614 @property
615+ def request_id(self):
616+ return self.rsp.get('request-id')
617+
618+ @property
619 def exit_code(self):
620 return self.rsp.get('exit-code')
621
622 @property
623 def exit_msg(self):
624 return self.rsp.get('stderr')
625+
626+
627+# Ceph Broker Conversation:
628+# If a charm needs an action to be taken by ceph it can create a CephBrokerRq
629+# and send that request to ceph via the ceph relation. The CephBrokerRq has a
630+# unique id so that the client can identify which CephBrokerRsp is associated
631+# with the request. Ceph will also respond to each client unit individually
632+# creating a response key per client unit eg glance/0 will get a CephBrokerRsp
633+# via key broker-rsp-glance-0
634+#
635+# To use this the charm can just do something like:
636+#
637+# from charmhelpers.contrib.storage.linux.ceph import (
638+# send_request_if_needed,
639+# is_request_complete,
640+# CephBrokerRq,
641+# )
642+#
643+# @hooks.hook('ceph-relation-changed')
644+# def ceph_changed():
645+# rq = CephBrokerRq()
646+# rq.add_op_create_pool(name='poolname', replica_count=3)
647+#
648+# if is_request_complete(rq):
649+# <Request complete actions>
650+# else:
651+# send_request_if_needed(rq)
652+#
653+# CephBrokerRq and CephBrokerRsp are serialized into JSON. Below is an example
654+# of glance having sent a request to ceph which ceph has successfully processed
655+# 'ceph:8': {
656+# 'ceph/0': {
657+# 'auth': 'cephx',
658+# 'broker-rsp-glance-0': '{"request-id": "0bc7dc54", "exit-code": 0}',
659+# 'broker_rsp': '{"request-id": "0da543b8", "exit-code": 0}',
660+# 'ceph-public-address': '10.5.44.103',
661+# 'key': 'AQCLDttVuHXINhAAvI144CB09dYchhHyTUY9BQ==',
662+# 'private-address': '10.5.44.103',
663+# },
664+# 'glance/0': {
665+# 'broker_req': ('{"api-version": 1, "request-id": "0bc7dc54", '
666+# '"ops": [{"replicas": 3, "name": "glance", '
667+# '"op": "create-pool"}]}'),
668+# 'private-address': '10.5.44.109',
669+# },
670+# }
671+
672+def get_previous_request(rid):
673+ """Return the last ceph broker request sent on a given relation
674+
675+ @param rid: Relation id to query for request
676+ """
677+ request = None
678+ broker_req = relation_get(attribute='broker_req', rid=rid,
679+ unit=local_unit())
680+ if broker_req:
681+ request_data = json.loads(broker_req)
682+ request = CephBrokerRq(api_version=request_data['api-version'],
683+ request_id=request_data['request-id'])
684+ request.set_ops(request_data['ops'])
685+
686+ return request
687+
688+
689+def get_request_states(request):
690+ """Return a dict of requests per relation id with their corresponding
691+ completion state.
692+
693+ This allows a charm, which has a request for ceph, to see whether there is
694+ an equivalent request already being processed and if so what state that
695+ request is in.
696+
697+ @param request: A CephBrokerRq object
698+ """
699+ complete = []
700+ requests = {}
701+ for rid in relation_ids('ceph'):
702+ complete = False
703+ previous_request = get_previous_request(rid)
704+ if request == previous_request:
705+ sent = True
706+ complete = is_request_complete_for_rid(previous_request, rid)
707+ else:
708+ sent = False
709+ complete = False
710+
711+ requests[rid] = {
712+ 'sent': sent,
713+ 'complete': complete,
714+ }
715+
716+ return requests
717+
718+
719+def is_request_sent(request):
720+ """Check to see if a functionally equivalent request has already been sent
721+
722+ Returns True if a similar request has been sent
723+
724+ @param request: A CephBrokerRq object
725+ """
726+ states = get_request_states(request)
727+ for rid in states.keys():
728+ if not states[rid]['sent']:
729+ return False
730+
731+ return True
732+
733+
734+def is_request_complete(request):
735+ """Check to see if a functionally equivalent request has already been
736+ completed
737+
738+ Returns True if a similar request has been completed
739+
740+ @param request: A CephBrokerRq object
741+ """
742+ states = get_request_states(request)
743+ for rid in states.keys():
744+ if not states[rid]['complete']:
745+ return False
746+
747+ return True
748+
749+
750+def is_request_complete_for_rid(request, rid):
751+ """Check if a given request has been completed on the given relation
752+
753+ @param request: A CephBrokerRq object
754+ @param rid: Relation ID
755+ """
756+ broker_key = get_broker_rsp_key()
757+ for unit in related_units(rid):
758+ rdata = relation_get(rid=rid, unit=unit)
759+ if rdata.get(broker_key):
760+ rsp = CephBrokerRsp(rdata.get(broker_key))
761+ if rsp.request_id == request.request_id:
762+ if not rsp.exit_code:
763+ return True
764+ else:
765+ # The remote unit sent no reply targeted at this unit so either the
766+ # remote ceph cluster does not support unit targeted replies or it
767+ # has not processed our request yet.
768+ if rdata.get('broker_rsp'):
769+ request_data = json.loads(rdata['broker_rsp'])
770+ if request_data.get('request-id'):
771+ log('Ignoring legacy broker_rsp without unit key as remote '
772+ 'service supports unit specific replies', level=DEBUG)
773+ else:
774+ log('Using legacy broker_rsp as remote service does not '
775+ 'supports unit specific replies', level=DEBUG)
776+ rsp = CephBrokerRsp(rdata['broker_rsp'])
777+ if not rsp.exit_code:
778+ return True
779+
780+ return False
781+
782+
783+def get_broker_rsp_key():
784+ """Return broker response key for this unit
785+
786+ This is the key that ceph is going to use to pass request status
787+ information back to this unit
788+ """
789+ return 'broker-rsp-' + local_unit().replace('/', '-')
790+
791+
792+def send_request_if_needed(request):
793+ """Send broker request if an equivalent request has not already been sent
794+
795+ @param request: A CephBrokerRq object
796+ """
797+ if is_request_sent(request):
798+ log('Request already sent but not complete, not sending new request',
799+ level=DEBUG)
800+ else:
801+ for rid in relation_ids('ceph'):
802+ log('Sending request {}'.format(request.request_id), level=DEBUG)
803+ relation_set(relation_id=rid, broker_req=request.request)
804
805=== modified file 'charmhelpers/core/host.py'
806--- charmhelpers/core/host.py 2015-08-19 13:52:16 +0000
807+++ charmhelpers/core/host.py 2015-09-14 11:48:26 +0000
808@@ -63,32 +63,48 @@
809 return service_result
810
811
812-def service_pause(service_name, init_dir=None):
813+def service_pause(service_name, init_dir="/etc/init", initd_dir="/etc/init.d"):
814 """Pause a system service.
815
816 Stop it, and prevent it from starting again at boot."""
817- if init_dir is None:
818- init_dir = "/etc/init"
819 stopped = service_stop(service_name)
820- # XXX: Support systemd too
821- override_path = os.path.join(
822- init_dir, '{}.override'.format(service_name))
823- with open(override_path, 'w') as fh:
824- fh.write("manual\n")
825+ upstart_file = os.path.join(init_dir, "{}.conf".format(service_name))
826+ sysv_file = os.path.join(initd_dir, service_name)
827+ if os.path.exists(upstart_file):
828+ override_path = os.path.join(
829+ init_dir, '{}.override'.format(service_name))
830+ with open(override_path, 'w') as fh:
831+ fh.write("manual\n")
832+ elif os.path.exists(sysv_file):
833+ subprocess.check_call(["update-rc.d", service_name, "disable"])
834+ else:
835+ # XXX: Support SystemD too
836+ raise ValueError(
837+ "Unable to detect {0} as either Upstart {1} or SysV {2}".format(
838+ service_name, upstart_file, sysv_file))
839 return stopped
840
841
842-def service_resume(service_name, init_dir=None):
843+def service_resume(service_name, init_dir="/etc/init",
844+ initd_dir="/etc/init.d"):
845 """Resume a system service.
846
847 Reenable starting again at boot. Start the service"""
848- # XXX: Support systemd too
849- if init_dir is None:
850- init_dir = "/etc/init"
851- override_path = os.path.join(
852- init_dir, '{}.override'.format(service_name))
853- if os.path.exists(override_path):
854- os.unlink(override_path)
855+ upstart_file = os.path.join(init_dir, "{}.conf".format(service_name))
856+ sysv_file = os.path.join(initd_dir, service_name)
857+ if os.path.exists(upstart_file):
858+ override_path = os.path.join(
859+ init_dir, '{}.override'.format(service_name))
860+ if os.path.exists(override_path):
861+ os.unlink(override_path)
862+ elif os.path.exists(sysv_file):
863+ subprocess.check_call(["update-rc.d", service_name, "enable"])
864+ else:
865+ # XXX: Support SystemD too
866+ raise ValueError(
867+ "Unable to detect {0} as either Upstart {1} or SysV {2}".format(
868+ service_name, upstart_file, sysv_file))
869+
870 started = service_start(service_name)
871 return started
872
873
874=== added file 'charmhelpers/core/kernel.py'
875--- charmhelpers/core/kernel.py 1970-01-01 00:00:00 +0000
876+++ charmhelpers/core/kernel.py 2015-09-14 11:48:26 +0000
877@@ -0,0 +1,68 @@
878+#!/usr/bin/env python
879+# -*- coding: utf-8 -*-
880+
881+# Copyright 2014-2015 Canonical Limited.
882+#
883+# This file is part of charm-helpers.
884+#
885+# charm-helpers is free software: you can redistribute it and/or modify
886+# it under the terms of the GNU Lesser General Public License version 3 as
887+# published by the Free Software Foundation.
888+#
889+# charm-helpers is distributed in the hope that it will be useful,
890+# but WITHOUT ANY WARRANTY; without even the implied warranty of
891+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
892+# GNU Lesser General Public License for more details.
893+#
894+# You should have received a copy of the GNU Lesser General Public License
895+# along with charm-helpers. If not, see <http://www.gnu.org/licenses/>.
896+
897+__author__ = "Jorge Niedbalski <jorge.niedbalski@canonical.com>"
898+
899+from charmhelpers.core.hookenv import (
900+ log,
901+ INFO
902+)
903+
904+from subprocess import check_call, check_output
905+import re
906+
907+
908+def modprobe(module, persist=True):
909+ """Load a kernel module and configure for auto-load on reboot."""
910+ cmd = ['modprobe', module]
911+
912+ log('Loading kernel module %s' % module, level=INFO)
913+
914+ check_call(cmd)
915+ if persist:
916+ with open('/etc/modules', 'r+') as modules:
917+ if module not in modules.read():
918+ modules.write(module)
919+
920+
921+def rmmod(module, force=False):
922+ """Remove a module from the linux kernel"""
923+ cmd = ['rmmod']
924+ if force:
925+ cmd.append('-f')
926+ cmd.append(module)
927+ log('Removing kernel module %s' % module, level=INFO)
928+ return check_call(cmd)
929+
930+
931+def lsmod():
932+ """Shows what kernel modules are currently loaded"""
933+ return check_output(['lsmod'],
934+ universal_newlines=True)
935+
936+
937+def is_module_loaded(module):
938+ """Checks if a kernel module is already loaded"""
939+ matches = re.findall('^%s[ ]+' % module, lsmod(), re.M)
940+ return len(matches) > 0
941+
942+
943+def update_initramfs(version='all'):
944+ """Updates an initramfs image"""
945+ return check_call(["update-initramfs", "-k", version, "-u"])
946
947=== modified file 'tests/charmhelpers/contrib/amulet/utils.py'
948--- tests/charmhelpers/contrib/amulet/utils.py 2015-08-17 13:44:42 +0000
949+++ tests/charmhelpers/contrib/amulet/utils.py 2015-09-14 11:48:26 +0000
950@@ -19,9 +19,11 @@
951 import logging
952 import os
953 import re
954+import socket
955 import subprocess
956 import sys
957 import time
958+import uuid
959
960 import amulet
961 import distro_info
962@@ -114,7 +116,7 @@
963 # /!\ DEPRECATION WARNING (beisner):
964 # New and existing tests should be rewritten to use
965 # validate_services_by_name() as it is aware of init systems.
966- self.log.warn('/!\\ DEPRECATION WARNING: use '
967+ self.log.warn('DEPRECATION WARNING: use '
968 'validate_services_by_name instead of validate_services '
969 'due to init system differences.')
970
971@@ -269,33 +271,52 @@
972 """Get last modification time of directory."""
973 return sentry_unit.directory_stat(directory)['mtime']
974
975- def _get_proc_start_time(self, sentry_unit, service, pgrep_full=False):
976- """Get process' start time.
977-
978- Determine start time of the process based on the last modification
979- time of the /proc/pid directory. If pgrep_full is True, the process
980- name is matched against the full command line.
981- """
982- if pgrep_full:
983- cmd = 'pgrep -o -f {}'.format(service)
984- else:
985- cmd = 'pgrep -o {}'.format(service)
986- cmd = cmd + ' | grep -v pgrep || exit 0'
987- cmd_out = sentry_unit.run(cmd)
988- self.log.debug('CMDout: ' + str(cmd_out))
989- if cmd_out[0]:
990- self.log.debug('Pid for %s %s' % (service, str(cmd_out[0])))
991- proc_dir = '/proc/{}'.format(cmd_out[0].strip())
992- return self._get_dir_mtime(sentry_unit, proc_dir)
993+ def _get_proc_start_time(self, sentry_unit, service, pgrep_full=None):
994+ """Get start time of a process based on the last modification time
995+ of the /proc/pid directory.
996+
997+ :sentry_unit: The sentry unit to check for the service on
998+ :service: service name to look for in process table
999+ :pgrep_full: [Deprecated] Use full command line search mode with pgrep
1000+ :returns: epoch time of service process start
1001+ :param commands: list of bash commands
1002+ :param sentry_units: list of sentry unit pointers
1003+ :returns: None if successful; Failure message otherwise
1004+ """
1005+ if pgrep_full is not None:
1006+ # /!\ DEPRECATION WARNING (beisner):
1007+ # No longer implemented, as pidof is now used instead of pgrep.
1008+ # https://bugs.launchpad.net/charm-helpers/+bug/1474030
1009+ self.log.warn('DEPRECATION WARNING: pgrep_full bool is no '
1010+ 'longer implemented re: lp 1474030.')
1011+
1012+ pid_list = self.get_process_id_list(sentry_unit, service)
1013+ pid = pid_list[0]
1014+ proc_dir = '/proc/{}'.format(pid)
1015+ self.log.debug('Pid for {} on {}: {}'.format(
1016+ service, sentry_unit.info['unit_name'], pid))
1017+
1018+ return self._get_dir_mtime(sentry_unit, proc_dir)
1019
1020 def service_restarted(self, sentry_unit, service, filename,
1021- pgrep_full=False, sleep_time=20):
1022+ pgrep_full=None, sleep_time=20):
1023 """Check if service was restarted.
1024
1025 Compare a service's start time vs a file's last modification time
1026 (such as a config file for that service) to determine if the service
1027 has been restarted.
1028 """
1029+ # /!\ DEPRECATION WARNING (beisner):
1030+ # This method is prone to races in that no before-time is known.
1031+ # Use validate_service_config_changed instead.
1032+
1033+ # NOTE(beisner) pgrep_full is no longer implemented, as pidof is now
1034+ # used instead of pgrep. pgrep_full is still passed through to ensure
1035+ # deprecation WARNS. lp1474030
1036+ self.log.warn('DEPRECATION WARNING: use '
1037+ 'validate_service_config_changed instead of '
1038+ 'service_restarted due to known races.')
1039+
1040 time.sleep(sleep_time)
1041 if (self._get_proc_start_time(sentry_unit, service, pgrep_full) >=
1042 self._get_file_mtime(sentry_unit, filename)):
1043@@ -304,15 +325,15 @@
1044 return False
1045
1046 def service_restarted_since(self, sentry_unit, mtime, service,
1047- pgrep_full=False, sleep_time=20,
1048- retry_count=2):
1049+ pgrep_full=None, sleep_time=20,
1050+ retry_count=2, retry_sleep_time=30):
1051 """Check if service has been started after a given time.
1052
1053 Args:
1054 sentry_unit (sentry): The sentry unit to check for the service on
1055 mtime (float): The epoch time to check against
1056 service (string): service name to look for in process table
1057- pgrep_full (boolean): Use full command line search mode with pgrep
1058+ pgrep_full: [Deprecated] Use full command line search mode with pgrep
1059 sleep_time (int): Seconds to sleep before looking for process
1060 retry_count (int): If service is not found, how many times to retry
1061
1062@@ -321,30 +342,44 @@
1063 False if service is older than mtime or if service was
1064 not found.
1065 """
1066- self.log.debug('Checking %s restarted since %s' % (service, mtime))
1067+ # NOTE(beisner) pgrep_full is no longer implemented, as pidof is now
1068+ # used instead of pgrep. pgrep_full is still passed through to ensure
1069+ # deprecation WARNS. lp1474030
1070+
1071+ unit_name = sentry_unit.info['unit_name']
1072+ self.log.debug('Checking that %s service restarted since %s on '
1073+ '%s' % (service, mtime, unit_name))
1074 time.sleep(sleep_time)
1075- proc_start_time = self._get_proc_start_time(sentry_unit, service,
1076- pgrep_full)
1077- while retry_count > 0 and not proc_start_time:
1078- self.log.debug('No pid file found for service %s, will retry %i '
1079- 'more times' % (service, retry_count))
1080- time.sleep(30)
1081- proc_start_time = self._get_proc_start_time(sentry_unit, service,
1082- pgrep_full)
1083- retry_count = retry_count - 1
1084+ proc_start_time = None
1085+ tries = 0
1086+ while tries <= retry_count and not proc_start_time:
1087+ try:
1088+ proc_start_time = self._get_proc_start_time(sentry_unit,
1089+ service,
1090+ pgrep_full)
1091+ self.log.debug('Attempt {} to get {} proc start time on {} '
1092+ 'OK'.format(tries, service, unit_name))
1093+ except IOError:
1094+ # NOTE(beisner) - race avoidance, proc may not exist yet.
1095+ # https://bugs.launchpad.net/charm-helpers/+bug/1474030
1096+ self.log.debug('Attempt {} to get {} proc start time on {} '
1097+ 'failed'.format(tries, service, unit_name))
1098+ time.sleep(retry_sleep_time)
1099+ tries += 1
1100
1101 if not proc_start_time:
1102 self.log.warn('No proc start time found, assuming service did '
1103 'not start')
1104 return False
1105 if proc_start_time >= mtime:
1106- self.log.debug('proc start time is newer than provided mtime'
1107- '(%s >= %s)' % (proc_start_time, mtime))
1108+ self.log.debug('Proc start time is newer than provided mtime'
1109+ '(%s >= %s) on %s (OK)' % (proc_start_time,
1110+ mtime, unit_name))
1111 return True
1112 else:
1113- self.log.warn('proc start time (%s) is older than provided mtime '
1114- '(%s), service did not restart' % (proc_start_time,
1115- mtime))
1116+ self.log.warn('Proc start time (%s) is older than provided mtime '
1117+ '(%s) on %s, service did not '
1118+ 'restart' % (proc_start_time, mtime, unit_name))
1119 return False
1120
1121 def config_updated_since(self, sentry_unit, filename, mtime,
1122@@ -374,8 +409,9 @@
1123 return False
1124
1125 def validate_service_config_changed(self, sentry_unit, mtime, service,
1126- filename, pgrep_full=False,
1127- sleep_time=20, retry_count=2):
1128+ filename, pgrep_full=None,
1129+ sleep_time=20, retry_count=2,
1130+ retry_sleep_time=30):
1131 """Check service and file were updated after mtime
1132
1133 Args:
1134@@ -383,9 +419,10 @@
1135 mtime (float): The epoch time to check against
1136 service (string): service name to look for in process table
1137 filename (string): The file to check mtime of
1138- pgrep_full (boolean): Use full command line search mode with pgrep
1139- sleep_time (int): Seconds to sleep before looking for process
1140+ pgrep_full: [Deprecated] Use full command line search mode with pgrep
1141+ sleep_time (int): Initial sleep in seconds to pass to test helpers
1142 retry_count (int): If service is not found, how many times to retry
1143+ retry_sleep_time (int): Time in seconds to wait between retries
1144
1145 Typical Usage:
1146 u = OpenStackAmuletUtils(ERROR)
1147@@ -402,15 +439,25 @@
1148 mtime, False if service is older than mtime or if service was
1149 not found or if filename was modified before mtime.
1150 """
1151- self.log.debug('Checking %s restarted since %s' % (service, mtime))
1152- time.sleep(sleep_time)
1153- service_restart = self.service_restarted_since(sentry_unit, mtime,
1154- service,
1155- pgrep_full=pgrep_full,
1156- sleep_time=0,
1157- retry_count=retry_count)
1158- config_update = self.config_updated_since(sentry_unit, filename, mtime,
1159- sleep_time=0)
1160+
1161+ # NOTE(beisner) pgrep_full is no longer implemented, as pidof is now
1162+ # used instead of pgrep. pgrep_full is still passed through to ensure
1163+ # deprecation WARNS. lp1474030
1164+
1165+ service_restart = self.service_restarted_since(
1166+ sentry_unit, mtime,
1167+ service,
1168+ pgrep_full=pgrep_full,
1169+ sleep_time=sleep_time,
1170+ retry_count=retry_count,
1171+ retry_sleep_time=retry_sleep_time)
1172+
1173+ config_update = self.config_updated_since(
1174+ sentry_unit,
1175+ filename,
1176+ mtime,
1177+ sleep_time=0)
1178+
1179 return service_restart and config_update
1180
1181 def get_sentry_time(self, sentry_unit):
1182@@ -428,7 +475,6 @@
1183 """Return a list of all Ubuntu releases in order of release."""
1184 _d = distro_info.UbuntuDistroInfo()
1185 _release_list = _d.all
1186- self.log.debug('Ubuntu release list: {}'.format(_release_list))
1187 return _release_list
1188
1189 def file_to_url(self, file_rel_path):
1190@@ -568,6 +614,142 @@
1191
1192 return None
1193
1194+ def validate_sectionless_conf(self, file_contents, expected):
1195+ """A crude conf parser. Useful to inspect configuration files which
1196+ do not have section headers (as would be necessary in order to use
1197+ the configparser). Such as openstack-dashboard or rabbitmq confs."""
1198+ for line in file_contents.split('\n'):
1199+ if '=' in line:
1200+ args = line.split('=')
1201+ if len(args) <= 1:
1202+ continue
1203+ key = args[0].strip()
1204+ value = args[1].strip()
1205+ if key in expected.keys():
1206+ if expected[key] != value:
1207+ msg = ('Config mismatch. Expected, actual: {}, '
1208+ '{}'.format(expected[key], value))
1209+ amulet.raise_status(amulet.FAIL, msg=msg)
1210+
1211+ def get_unit_hostnames(self, units):
1212+ """Return a dict of juju unit names to hostnames."""
1213+ host_names = {}
1214+ for unit in units:
1215+ host_names[unit.info['unit_name']] = \
1216+ str(unit.file_contents('/etc/hostname').strip())
1217+ self.log.debug('Unit host names: {}'.format(host_names))
1218+ return host_names
1219+
1220+ def run_cmd_unit(self, sentry_unit, cmd):
1221+ """Run a command on a unit, return the output and exit code."""
1222+ output, code = sentry_unit.run(cmd)
1223+ if code == 0:
1224+ self.log.debug('{} `{}` command returned {} '
1225+ '(OK)'.format(sentry_unit.info['unit_name'],
1226+ cmd, code))
1227+ else:
1228+ msg = ('{} `{}` command returned {} '
1229+ '{}'.format(sentry_unit.info['unit_name'],
1230+ cmd, code, output))
1231+ amulet.raise_status(amulet.FAIL, msg=msg)
1232+ return str(output), code
1233+
1234+ def file_exists_on_unit(self, sentry_unit, file_name):
1235+ """Check if a file exists on a unit."""
1236+ try:
1237+ sentry_unit.file_stat(file_name)
1238+ return True
1239+ except IOError:
1240+ return False
1241+ except Exception as e:
1242+ msg = 'Error checking file {}: {}'.format(file_name, e)
1243+ amulet.raise_status(amulet.FAIL, msg=msg)
1244+
1245+ def file_contents_safe(self, sentry_unit, file_name,
1246+ max_wait=60, fatal=False):
1247+ """Get file contents from a sentry unit. Wrap amulet file_contents
1248+ with retry logic to address races where a file checks as existing,
1249+ but no longer exists by the time file_contents is called.
1250+ Return None if file not found. Optionally raise if fatal is True."""
1251+ unit_name = sentry_unit.info['unit_name']
1252+ file_contents = False
1253+ tries = 0
1254+ while not file_contents and tries < (max_wait / 4):
1255+ try:
1256+ file_contents = sentry_unit.file_contents(file_name)
1257+ except IOError:
1258+ self.log.debug('Attempt {} to open file {} from {} '
1259+ 'failed'.format(tries, file_name,
1260+ unit_name))
1261+ time.sleep(4)
1262+ tries += 1
1263+
1264+ if file_contents:
1265+ return file_contents
1266+ elif not fatal:
1267+ return None
1268+ elif fatal:
1269+ msg = 'Failed to get file contents from unit.'
1270+ amulet.raise_status(amulet.FAIL, msg)
1271+
1272+ def port_knock_tcp(self, host="localhost", port=22, timeout=15):
1273+ """Open a TCP socket to check for a listening service on a host.
1274+
1275+ :param host: host name or IP address, default to localhost
1276+ :param port: TCP port number, default to 22
1277+ :param timeout: Connect timeout, default to 15 seconds
1278+ :returns: True if successful, False if connect failed
1279+ """
1280+
1281+ # Resolve host name if possible
1282+ try:
1283+ connect_host = socket.gethostbyname(host)
1284+ host_human = "{} ({})".format(connect_host, host)
1285+ except socket.error as e:
1286+ self.log.warn('Unable to resolve address: '
1287+ '{} ({}) Trying anyway!'.format(host, e))
1288+ connect_host = host
1289+ host_human = connect_host
1290+
1291+ # Attempt socket connection
1292+ try:
1293+ knock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1294+ knock.settimeout(timeout)
1295+ knock.connect((connect_host, port))
1296+ knock.close()
1297+ self.log.debug('Socket connect OK for host '
1298+ '{} on port {}.'.format(host_human, port))
1299+ return True
1300+ except socket.error as e:
1301+ self.log.debug('Socket connect FAIL for'
1302+ ' {} port {} ({})'.format(host_human, port, e))
1303+ return False
1304+
1305+ def port_knock_units(self, sentry_units, port=22,
1306+ timeout=15, expect_success=True):
1307+ """Open a TCP socket to check for a listening service on each
1308+ listed juju unit.
1309+
1310+ :param sentry_units: list of sentry unit pointers
1311+ :param port: TCP port number, default to 22
1312+ :param timeout: Connect timeout, default to 15 seconds
1313+ :param expect_success: True by default, set False to invert logic
1314+ :returns: None if successful, Failure message otherwise
1315+ """
1316+ for unit in sentry_units:
1317+ host = unit.info['public-address']
1318+ connected = self.port_knock_tcp(host, port, timeout)
1319+ if not connected and expect_success:
1320+ return 'Socket connect failed.'
1321+ elif connected and not expect_success:
1322+ return 'Socket connected unexpectedly.'
1323+
1324+ def get_uuid_epoch_stamp(self):
1325+ """Returns a stamp string based on uuid4 and epoch time. Useful in
1326+ generating test messages which need to be unique-ish."""
1327+ return '[{}-{}]'.format(uuid.uuid4(), time.time())
1328+
1329+# amulet juju action helpers:
1330 def run_action(self, unit_sentry, action,
1331 _check_output=subprocess.check_output):
1332 """Run the named action on a given unit sentry.
1333@@ -594,3 +776,12 @@
1334 output = _check_output(command, universal_newlines=True)
1335 data = json.loads(output)
1336 return data.get(u"status") == "completed"
1337+
1338+ def status_get(self, unit):
1339+ """Return the current service status of this unit."""
1340+ raw_status, return_code = unit.run(
1341+ "status-get --format=json --include-data")
1342+ if return_code != 0:
1343+ return ("unknown", "")
1344+ status = json.loads(raw_status)
1345+ return (status["status"], status["message"])
1346
1347=== modified file 'tests/charmhelpers/contrib/openstack/amulet/deployment.py'
1348--- tests/charmhelpers/contrib/openstack/amulet/deployment.py 2015-08-17 13:44:42 +0000
1349+++ tests/charmhelpers/contrib/openstack/amulet/deployment.py 2015-09-14 11:48:26 +0000
1350@@ -44,8 +44,15 @@
1351 Determine if the local branch being tested is derived from its
1352 stable or next (dev) branch, and based on this, use the corresponding
1353 stable or next branches for the other_services."""
1354+
1355+ # Charms outside the lp:~openstack-charmers namespace
1356 base_charms = ['mysql', 'mongodb', 'nrpe']
1357
1358+ # Force these charms to current series even when using an older series.
1359+ # ie. Use trusty/nrpe even when series is precise, as the P charm
1360+ # does not possess the necessary external master config and hooks.
1361+ force_series_current = ['nrpe']
1362+
1363 if self.series in ['precise', 'trusty']:
1364 base_series = self.series
1365 else:
1366@@ -53,11 +60,17 @@
1367
1368 if self.stable:
1369 for svc in other_services:
1370+ if svc['name'] in force_series_current:
1371+ base_series = self.current_next
1372+
1373 temp = 'lp:charms/{}/{}'
1374 svc['location'] = temp.format(base_series,
1375 svc['name'])
1376 else:
1377 for svc in other_services:
1378+ if svc['name'] in force_series_current:
1379+ base_series = self.current_next
1380+
1381 if svc['name'] in base_charms:
1382 temp = 'lp:charms/{}/{}'
1383 svc['location'] = temp.format(base_series,
1384@@ -77,21 +90,23 @@
1385
1386 services = other_services
1387 services.append(this_service)
1388+
1389+ # Charms which should use the source config option
1390 use_source = ['mysql', 'mongodb', 'rabbitmq-server', 'ceph',
1391 'ceph-osd', 'ceph-radosgw']
1392- # Most OpenStack subordinate charms do not expose an origin option
1393- # as that is controlled by the principle.
1394- ignore = ['cinder-ceph', 'hacluster', 'neutron-openvswitch', 'nrpe']
1395+
1396+ # Charms which can not use openstack-origin, ie. many subordinates
1397+ no_origin = ['cinder-ceph', 'hacluster', 'neutron-openvswitch', 'nrpe']
1398
1399 if self.openstack:
1400 for svc in services:
1401- if svc['name'] not in use_source + ignore:
1402+ if svc['name'] not in use_source + no_origin:
1403 config = {'openstack-origin': self.openstack}
1404 self.d.configure(svc['name'], config)
1405
1406 if self.source:
1407 for svc in services:
1408- if svc['name'] in use_source and svc['name'] not in ignore:
1409+ if svc['name'] in use_source and svc['name'] not in no_origin:
1410 config = {'source': self.source}
1411 self.d.configure(svc['name'], config)
1412
1413
1414=== modified file 'tests/charmhelpers/contrib/openstack/amulet/utils.py'
1415--- tests/charmhelpers/contrib/openstack/amulet/utils.py 2015-08-11 08:17:24 +0000
1416+++ tests/charmhelpers/contrib/openstack/amulet/utils.py 2015-09-14 11:48:26 +0000
1417@@ -27,6 +27,7 @@
1418 import heatclient.v1.client as heat_client
1419 import keystoneclient.v2_0 as keystone_client
1420 import novaclient.v1_1.client as nova_client
1421+import pika
1422 import swiftclient
1423
1424 from charmhelpers.contrib.amulet.utils import (
1425@@ -602,3 +603,361 @@
1426 self.log.debug('Ceph {} samples (OK): '
1427 '{}'.format(sample_type, samples))
1428 return None
1429+
1430+# rabbitmq/amqp specific helpers:
1431+ def add_rmq_test_user(self, sentry_units,
1432+ username="testuser1", password="changeme"):
1433+ """Add a test user via the first rmq juju unit, check connection as
1434+ the new user against all sentry units.
1435+
1436+ :param sentry_units: list of sentry unit pointers
1437+ :param username: amqp user name, default to testuser1
1438+ :param password: amqp user password
1439+ :returns: None if successful. Raise on error.
1440+ """
1441+ self.log.debug('Adding rmq user ({})...'.format(username))
1442+
1443+ # Check that user does not already exist
1444+ cmd_user_list = 'rabbitmqctl list_users'
1445+ output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_list)
1446+ if username in output:
1447+ self.log.warning('User ({}) already exists, returning '
1448+ 'gracefully.'.format(username))
1449+ return
1450+
1451+ perms = '".*" ".*" ".*"'
1452+ cmds = ['rabbitmqctl add_user {} {}'.format(username, password),
1453+ 'rabbitmqctl set_permissions {} {}'.format(username, perms)]
1454+
1455+ # Add user via first unit
1456+ for cmd in cmds:
1457+ output, _ = self.run_cmd_unit(sentry_units[0], cmd)
1458+
1459+ # Check connection against the other sentry_units
1460+ self.log.debug('Checking user connect against units...')
1461+ for sentry_unit in sentry_units:
1462+ connection = self.connect_amqp_by_unit(sentry_unit, ssl=False,
1463+ username=username,
1464+ password=password)
1465+ connection.close()
1466+
1467+ def delete_rmq_test_user(self, sentry_units, username="testuser1"):
1468+ """Delete a rabbitmq user via the first rmq juju unit.
1469+
1470+ :param sentry_units: list of sentry unit pointers
1471+ :param username: amqp user name, default to testuser1
1472+ :param password: amqp user password
1473+ :returns: None if successful or no such user.
1474+ """
1475+ self.log.debug('Deleting rmq user ({})...'.format(username))
1476+
1477+ # Check that the user exists
1478+ cmd_user_list = 'rabbitmqctl list_users'
1479+ output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_list)
1480+
1481+ if username not in output:
1482+ self.log.warning('User ({}) does not exist, returning '
1483+ 'gracefully.'.format(username))
1484+ return
1485+
1486+ # Delete the user
1487+ cmd_user_del = 'rabbitmqctl delete_user {}'.format(username)
1488+ output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_del)
1489+
1490+ def get_rmq_cluster_status(self, sentry_unit):
1491+ """Execute rabbitmq cluster status command on a unit and return
1492+ the full output.
1493+
1494+ :param sentry_unit: sentry unit
1495+ :returns: String containing console output of cluster status command
1496+ """
1497+ cmd = 'rabbitmqctl cluster_status'
1498+ output, _ = self.run_cmd_unit(sentry_unit, cmd)
1499+ self.log.debug('{} cluster_status:\n{}'.format(
1500+ sentry_unit.info['unit_name'], output))
1501+ return str(output)
1502+
1503+ def get_rmq_cluster_running_nodes(self, sentry_unit):
1504+ """Parse rabbitmqctl cluster_status output string, return list of
1505+ running rabbitmq cluster nodes.
1506+
1507+ :param sentry_unit: sentry unit
1508+ :returns: List containing node names of running nodes
1509+ """
1510+ # NOTE(beisner): rabbitmqctl cluster_status output is not
1511+ # json-parsable, do string chop foo, then json.loads that.
1512+ str_stat = self.get_rmq_cluster_status(sentry_unit)
1513+ if 'running_nodes' in str_stat:
1514+ pos_start = str_stat.find("{running_nodes,") + 15
1515+ pos_end = str_stat.find("]},", pos_start) + 1
1516+ str_run_nodes = str_stat[pos_start:pos_end].replace("'", '"')
1517+ run_nodes = json.loads(str_run_nodes)
1518+ return run_nodes
1519+ else:
1520+ return []
1521+
1522+ def validate_rmq_cluster_running_nodes(self, sentry_units):
1523+ """Check that all rmq unit hostnames are represented in the
1524+ cluster_status output of all units.
1525+
1526+ :param host_names: dict of juju unit names to host names
1527+ :param sentry_units: list of sentry unit pointers (all rmq units)
1528+ :returns: None if successful, otherwise return error message
1529+ """
1530+ host_names = self.get_unit_hostnames(sentry_units)
1531+ errors = []
1532+
1533+ # Query every unit for cluster_status running nodes
1534+ for query_unit in sentry_units:
1535+ query_unit_name = query_unit.info['unit_name']
1536+ running_nodes = self.get_rmq_cluster_running_nodes(query_unit)
1537+
1538+ # Confirm that every unit is represented in the queried unit's
1539+ # cluster_status running nodes output.
1540+ for validate_unit in sentry_units:
1541+ val_host_name = host_names[validate_unit.info['unit_name']]
1542+ val_node_name = 'rabbit@{}'.format(val_host_name)
1543+
1544+ if val_node_name not in running_nodes:
1545+ errors.append('Cluster member check failed on {}: {} not '
1546+ 'in {}\n'.format(query_unit_name,
1547+ val_node_name,
1548+ running_nodes))
1549+ if errors:
1550+ return ''.join(errors)
1551+
1552+ def rmq_ssl_is_enabled_on_unit(self, sentry_unit, port=None):
1553+ """Check a single juju rmq unit for ssl and port in the config file."""
1554+ host = sentry_unit.info['public-address']
1555+ unit_name = sentry_unit.info['unit_name']
1556+
1557+ conf_file = '/etc/rabbitmq/rabbitmq.config'
1558+ conf_contents = str(self.file_contents_safe(sentry_unit,
1559+ conf_file, max_wait=16))
1560+ # Checks
1561+ conf_ssl = 'ssl' in conf_contents
1562+ conf_port = str(port) in conf_contents
1563+
1564+ # Port explicitly checked in config
1565+ if port and conf_port and conf_ssl:
1566+ self.log.debug('SSL is enabled @{}:{} '
1567+ '({})'.format(host, port, unit_name))
1568+ return True
1569+ elif port and not conf_port and conf_ssl:
1570+ self.log.debug('SSL is enabled @{} but not on port {} '
1571+ '({})'.format(host, port, unit_name))
1572+ return False
1573+ # Port not checked (useful when checking that ssl is disabled)
1574+ elif not port and conf_ssl:
1575+ self.log.debug('SSL is enabled @{}:{} '
1576+ '({})'.format(host, port, unit_name))
1577+ return True
1578+ elif not port and not conf_ssl:
1579+ self.log.debug('SSL not enabled @{}:{} '
1580+ '({})'.format(host, port, unit_name))
1581+ return False
1582+ else:
1583+ msg = ('Unknown condition when checking SSL status @{}:{} '
1584+ '({})'.format(host, port, unit_name))
1585+ amulet.raise_status(amulet.FAIL, msg)
1586+
1587+ def validate_rmq_ssl_enabled_units(self, sentry_units, port=None):
1588+ """Check that ssl is enabled on rmq juju sentry units.
1589+
1590+ :param sentry_units: list of all rmq sentry units
1591+ :param port: optional ssl port override to validate
1592+ :returns: None if successful, otherwise return error message
1593+ """
1594+ for sentry_unit in sentry_units:
1595+ if not self.rmq_ssl_is_enabled_on_unit(sentry_unit, port=port):
1596+ return ('Unexpected condition: ssl is disabled on unit '
1597+ '({})'.format(sentry_unit.info['unit_name']))
1598+ return None
1599+
1600+ def validate_rmq_ssl_disabled_units(self, sentry_units):
1601+ """Check that ssl is disabled on listed rmq juju sentry units.
1602+
1603+ :param sentry_units: list of all rmq sentry units
1604+ :returns: None if successful, otherwise return error message
1605+ """
1606+ for sentry_unit in sentry_units:
1607+ if self.rmq_ssl_is_enabled_on_unit(sentry_unit):
1608+ return ('Unexpected condition: ssl is enabled on unit '
1609+ '({})'.format(sentry_unit.info['unit_name']))
1610+ return None
1611+
1612+ def configure_rmq_ssl_on(self, sentry_units, deployment,
1613+ port=None, max_wait=60):
1614+ """Turn ssl charm config option on, with optional non-default
1615+ ssl port specification. Confirm that it is enabled on every
1616+ unit.
1617+
1618+ :param sentry_units: list of sentry units
1619+ :param deployment: amulet deployment object pointer
1620+ :param port: amqp port, use defaults if None
1621+ :param max_wait: maximum time to wait in seconds to confirm
1622+ :returns: None if successful. Raise on error.
1623+ """
1624+ self.log.debug('Setting ssl charm config option: on')
1625+
1626+ # Enable RMQ SSL
1627+ config = {'ssl': 'on'}
1628+ if port:
1629+ config['ssl_port'] = port
1630+
1631+ deployment.configure('rabbitmq-server', config)
1632+
1633+ # Confirm
1634+ tries = 0
1635+ ret = self.validate_rmq_ssl_enabled_units(sentry_units, port=port)
1636+ while ret and tries < (max_wait / 4):
1637+ time.sleep(4)
1638+ self.log.debug('Attempt {}: {}'.format(tries, ret))
1639+ ret = self.validate_rmq_ssl_enabled_units(sentry_units, port=port)
1640+ tries += 1
1641+
1642+ if ret:
1643+ amulet.raise_status(amulet.FAIL, ret)
1644+
1645+ def configure_rmq_ssl_off(self, sentry_units, deployment, max_wait=60):
1646+ """Turn ssl charm config option off, confirm that it is disabled
1647+ on every unit.
1648+
1649+ :param sentry_units: list of sentry units
1650+ :param deployment: amulet deployment object pointer
1651+ :param max_wait: maximum time to wait in seconds to confirm
1652+ :returns: None if successful. Raise on error.
1653+ """
1654+ self.log.debug('Setting ssl charm config option: off')
1655+
1656+ # Disable RMQ SSL
1657+ config = {'ssl': 'off'}
1658+ deployment.configure('rabbitmq-server', config)
1659+
1660+ # Confirm
1661+ tries = 0
1662+ ret = self.validate_rmq_ssl_disabled_units(sentry_units)
1663+ while ret and tries < (max_wait / 4):
1664+ time.sleep(4)
1665+ self.log.debug('Attempt {}: {}'.format(tries, ret))
1666+ ret = self.validate_rmq_ssl_disabled_units(sentry_units)
1667+ tries += 1
1668+
1669+ if ret:
1670+ amulet.raise_status(amulet.FAIL, ret)
1671+
def connect_amqp_by_unit(self, sentry_unit, ssl=False,
                         port=None, fatal=True,
                         username="testuser1", password="changeme"):
    """Establish and return a pika amqp connection to the rabbitmq service
    running on a rmq juju unit.

    The connection is only returned once the server identifies itself
    as RabbitMQ; any failure while connecting or validating is reported
    through ``amulet.raise_status`` (fatal) or logged as a warning
    (non-fatal).

    :param sentry_unit: sentry unit pointer
    :param ssl: boolean, default to False
    :param port: amqp port, use defaults if None (5671 with ssl,
                 5672 without)
    :param fatal: boolean, default to True (raises on connect error)
    :param username: amqp user name, default to testuser1
    :param password: amqp user password
    :returns: pika amqp connection pointer or None if failed and non-fatal
    """
    host = sentry_unit.info['public-address']
    unit_name = sentry_unit.info['unit_name']

    # Default port logic if port is not specified
    if not port:
        port = 5671 if ssl else 5672

    self.log.debug('Connecting to amqp on {}:{} ({}) as '
                   '{}...'.format(host, port, unit_name, username))

    connection = None
    try:
        credentials = pika.PlainCredentials(username, password)
        parameters = pika.ConnectionParameters(host=host, port=port,
                                               credentials=credentials,
                                               ssl=ssl,
                                               connection_attempts=3,
                                               retry_delay=5,
                                               socket_timeout=1)
        connection = pika.BlockingConnection(parameters)

        # Explicit check rather than `assert`: assert statements are
        # stripped when Python runs with -O, which would silently skip
        # this validation.
        product = connection.server_properties['product']
        if product != 'RabbitMQ':
            raise ValueError(
                'unexpected server product: {!r}'.format(product))

        self.log.debug('Connect OK')
        return connection
    except Exception as e:
        # Deliberately broad: any connect/validation failure is turned
        # into a test failure (fatal) or a warning (non-fatal).
        if connection is not None:
            # The connection was opened but failed validation; close it
            # so the socket is not leaked. Best-effort only: the original
            # failure is what gets reported below.
            try:
                connection.close()
            except Exception:
                pass

        msg = ('amqp connection failed to {}:{} as '
               '{} ({})'.format(host, port, username, str(e)))
        if fatal:
            amulet.raise_status(amulet.FAIL, msg)
        else:
            # Logger.warn is a deprecated alias of Logger.warning.
            self.log.warning(msg)
            return None
1718+
def publish_amqp_message_by_unit(self, sentry_unit, message,
                                 queue="test", ssl=False,
                                 username="testuser1",
                                 password="changeme",
                                 port=None):
    """Publish an amqp message to a rmq juju unit.

    Opens a connection via ``connect_amqp_by_unit``, declares the queue
    as durable and non-auto-delete, publishes the message to the default
    exchange with the queue name as routing key, then closes the channel
    and the connection. The connection is closed even when one of the
    channel operations raises.

    :param sentry_unit: sentry unit pointer
    :param message: amqp message string
    :param queue: message queue, default to test
    :param username: amqp user name, default to testuser1
    :param password: amqp user password
    :param ssl: boolean, default to False
    :param port: amqp port, use defaults if None
    :returns: None. Raises exception if publish failed.
    """
    self.log.debug('Publishing message to {} queue:\n{}'.format(queue,
                                                                message))
    connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl,
                                           port=port,
                                           username=username,
                                           password=password)

    # NOTE(beisner): extra debug here re: pika hang potential:
    # https://github.com/pika/pika/issues/297
    # https://groups.google.com/forum/#!topic/rabbitmq-users/Ja0iyfF0Szw
    try:
        self.log.debug('Defining channel...')
        channel = connection.channel()
        self.log.debug('Declaring queue...')
        channel.queue_declare(queue=queue, auto_delete=False, durable=True)
        self.log.debug('Publishing message...')
        channel.basic_publish(exchange='', routing_key=queue, body=message)
        self.log.debug('Closing channel...')
        channel.close()
    finally:
        # Always release the connection, including when a channel
        # operation above raised, so the socket is not leaked.
        self.log.debug('Closing connection...')
        connection.close()
1755+
def get_amqp_message_by_unit(self, sentry_unit, queue="test",
                             username="testuser1",
                             password="changeme",
                             ssl=False, port=None):
    """Get an amqp message from a rmq juju unit.

    Opens a connection via ``connect_amqp_by_unit``, fetches a single
    message from the queue with ``basic_get`` and acknowledges it. The
    connection is closed on every path, including when no message is
    available or a channel operation raises.

    :param sentry_unit: sentry unit pointer
    :param queue: message queue, default to test
    :param username: amqp user name, default to testuser1
    :param password: amqp user password
    :param ssl: boolean, default to False
    :param port: amqp port, use defaults if None
    :returns: amqp message body as string. Raise if get fails.
    """
    connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl,
                                           port=port,
                                           username=username,
                                           password=password)
    try:
        channel = connection.channel()
        method_frame, _, body = channel.basic_get(queue)

        if method_frame:
            self.log.debug('Retreived message from {} queue:\n{}'.format(queue,
                                                                         body))
            # Ack so the broker does not redeliver the message.
            channel.basic_ack(method_frame.delivery_tag)
            channel.close()
            return body
    finally:
        # Runs before the return above and on the empty-queue / error
        # paths, so the connection is never left open.
        connection.close()

    # Only reached when basic_get returned no method frame.
    msg = 'No message retrieved.'
    amulet.raise_status(amulet.FAIL, msg)

Subscribers

People subscribed via source and target branches