Merge lp:~xianghui/charms/trusty/nova-cloud-controller/live_migration into lp:~openstack-charmers-archive/charms/trusty/nova-cloud-controller/next

Proposed by Xiang Hui
Status: Merged
Merged at revision: 189
Proposed branch: lp:~xianghui/charms/trusty/nova-cloud-controller/live_migration
Merge into: lp:~openstack-charmers-archive/charms/trusty/nova-cloud-controller/next
Diff against target: 2135 lines (+1496/-110)
15 files modified
hooks/charmhelpers/contrib/network/ip.py (+5/-3)
hooks/charmhelpers/contrib/openstack/amulet/deployment.py (+13/-6)
hooks/charmhelpers/contrib/openstack/amulet/utils.py (+359/-0)
hooks/charmhelpers/contrib/openstack/context.py (+60/-16)
hooks/charmhelpers/contrib/openstack/templating.py (+29/-2)
hooks/charmhelpers/contrib/openstack/utils.py (+221/-1)
hooks/charmhelpers/contrib/storage/linux/ceph.py (+226/-13)
hooks/charmhelpers/core/host.py (+3/-4)
hooks/charmhelpers/core/kernel.py (+68/-0)
hooks/charmhelpers/core/strutils.py (+30/-0)
hooks/nova_cc_utils.py (+17/-5)
tests/charmhelpers/contrib/amulet/deployment.py (+4/-2)
tests/charmhelpers/contrib/amulet/utils.py (+96/-52)
tests/charmhelpers/contrib/openstack/amulet/utils.py (+359/-0)
unit_tests/test_nova_cc_utils.py (+6/-6)
To merge this branch: bzr merge lp:~xianghui/charms/trusty/nova-cloud-controller/live_migration
Reviewer Review Type Date Requested Status
Billy Olsen Approve
Review via email: mp+267132@code.launchpad.net

Description of the change

Fix live-migration failed when re-add nova-compute unit.

To post a comment you must log in.
Revision history for this message
uosci-testing-bot (uosci-testing-bot) wrote :

charm_lint_check #7609 nova-cloud-controller-next for xianghui mp267132
    LINT OK: passed

Build: http://10.245.162.77:8080/job/charm_lint_check/7609/

Revision history for this message
uosci-testing-bot (uosci-testing-bot) wrote :

charm_unit_test #7047 nova-cloud-controller-next for xianghui mp267132
    UNIT OK: passed

Build: http://10.245.162.77:8080/job/charm_unit_test/7047/

Revision history for this message
uosci-testing-bot (uosci-testing-bot) wrote :

charm_amulet_test #5646 nova-cloud-controller-next for xianghui mp267132
    AMULET FAIL: amulet-test failed

AMULET Results (max last 2 lines):
make: *** [test] Error 1
ERROR:root:Make target returned non-zero.

Full amulet test output: http://paste.ubuntu.com/12011951/
Build: http://10.245.162.77:8080/job/charm_amulet_test/5646/

183. By Xiang Hui

Merge lp:charm-helpers.

184. By Xiang Hui

Fix conflicts.

Revision history for this message
uosci-testing-bot (uosci-testing-bot) wrote :

charm_lint_check #10420 nova-cloud-controller-next for xianghui mp267132
    LINT FAIL: lint-test failed

LINT Results (max last 2 lines):
make: *** [lint] Error 1
ERROR:root:Make target returned non-zero.

Full lint test output: http://paste.ubuntu.com/12519228/
Build: http://10.245.162.77:8080/job/charm_lint_check/10420/

Revision history for this message
uosci-testing-bot (uosci-testing-bot) wrote :

charm_unit_test #9617 nova-cloud-controller-next for xianghui mp267132
    UNIT OK: passed

Build: http://10.245.162.77:8080/job/charm_unit_test/9617/

Revision history for this message
uosci-testing-bot (uosci-testing-bot) wrote :

charm_amulet_test #6564 nova-cloud-controller-next for xianghui mp267132
    AMULET FAIL: amulet-test failed

AMULET Results (max last 2 lines):
make: *** [test] Error 1
ERROR:root:Make target returned non-zero.

Full amulet test output: http://paste.ubuntu.com/12519253/
Build: http://10.245.162.77:8080/job/charm_amulet_test/6564/

185. By Xiang Hui

Fix lint error

Revision history for this message
uosci-testing-bot (uosci-testing-bot) wrote :

charm_lint_check #10428 nova-cloud-controller-next for xianghui mp267132
    LINT OK: passed

Build: http://10.245.162.77:8080/job/charm_lint_check/10428/

Revision history for this message
uosci-testing-bot (uosci-testing-bot) wrote :

charm_unit_test #9619 nova-cloud-controller-next for xianghui mp267132
    UNIT OK: passed

Build: http://10.245.162.77:8080/job/charm_unit_test/9619/

Revision history for this message
uosci-testing-bot (uosci-testing-bot) wrote :

charm_amulet_test #6571 nova-cloud-controller-next for xianghui mp267132
    AMULET OK: passed

Build: http://10.245.162.77:8080/job/charm_amulet_test/6571/

Revision history for this message
Billy Olsen (billy-olsen) wrote :

LGTM, thanks Xiang Hui!

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'hooks/charmhelpers/contrib/network/ip.py'
2--- hooks/charmhelpers/contrib/network/ip.py 2015-09-03 09:39:34 +0000
3+++ hooks/charmhelpers/contrib/network/ip.py 2015-09-22 06:12:36 +0000
4@@ -23,7 +23,7 @@
5 from functools import partial
6
7 from charmhelpers.core.hookenv import unit_get
8-from charmhelpers.fetch import apt_install
9+from charmhelpers.fetch import apt_install, apt_update
10 from charmhelpers.core.hookenv import (
11 log,
12 WARNING,
13@@ -32,13 +32,15 @@
14 try:
15 import netifaces
16 except ImportError:
17- apt_install('python-netifaces')
18+ apt_update(fatal=True)
19+ apt_install('python-netifaces', fatal=True)
20 import netifaces
21
22 try:
23 import netaddr
24 except ImportError:
25- apt_install('python-netaddr')
26+ apt_update(fatal=True)
27+ apt_install('python-netaddr', fatal=True)
28 import netaddr
29
30
31
32=== modified file 'hooks/charmhelpers/contrib/openstack/amulet/deployment.py'
33--- hooks/charmhelpers/contrib/openstack/amulet/deployment.py 2015-08-18 17:34:35 +0000
34+++ hooks/charmhelpers/contrib/openstack/amulet/deployment.py 2015-09-22 06:12:36 +0000
35@@ -51,13 +51,17 @@
36 else:
37 base_series = self.current_next
38
39- if self.stable:
40- for svc in other_services:
41+ for svc in other_services:
42+ if svc['name'] in force_series_current:
43+ base_series = self.current_next
44+ # If a location has been explicitly set, use it
45+ if svc.get('location'):
46+ continue
47+ if self.stable:
48 temp = 'lp:charms/{}/{}'
49 svc['location'] = temp.format(base_series,
50 svc['name'])
51- else:
52- for svc in other_services:
53+ else:
54 if svc['name'] in base_charms:
55 temp = 'lp:charms/{}/{}'
56 svc['location'] = temp.format(base_series,
57@@ -66,6 +70,7 @@
58 temp = 'lp:~openstack-charmers/charms/{}/{}/next'
59 svc['location'] = temp.format(self.current_next,
60 svc['name'])
61+
62 return other_services
63
64 def _add_services(self, this_service, other_services):
65@@ -77,6 +82,8 @@
66
67 services = other_services
68 services.append(this_service)
69+
70+ # Charms which should use the source config option
71 use_source = ['mysql', 'mongodb', 'rabbitmq-server', 'ceph',
72 'ceph-osd', 'ceph-radosgw']
73 # Most OpenStack subordinate charms do not expose an origin option
74@@ -85,13 +92,13 @@
75
76 if self.openstack:
77 for svc in services:
78- if svc['name'] not in use_source + ignore:
79+ if svc['name'] not in use_source + no_origin:
80 config = {'openstack-origin': self.openstack}
81 self.d.configure(svc['name'], config)
82
83 if self.source:
84 for svc in services:
85- if svc['name'] in use_source and svc['name'] not in ignore:
86+ if svc['name'] in use_source and svc['name'] not in no_origin:
87 config = {'source': self.source}
88 self.d.configure(svc['name'], config)
89
90
91=== modified file 'hooks/charmhelpers/contrib/openstack/amulet/utils.py'
92--- hooks/charmhelpers/contrib/openstack/amulet/utils.py 2015-07-16 20:18:38 +0000
93+++ hooks/charmhelpers/contrib/openstack/amulet/utils.py 2015-09-22 06:12:36 +0000
94@@ -27,6 +27,7 @@
95 import heatclient.v1.client as heat_client
96 import keystoneclient.v2_0 as keystone_client
97 import novaclient.v1_1.client as nova_client
98+import pika
99 import swiftclient
100
101 from charmhelpers.contrib.amulet.utils import (
102@@ -602,3 +603,361 @@
103 self.log.debug('Ceph {} samples (OK): '
104 '{}'.format(sample_type, samples))
105 return None
106+
107+# rabbitmq/amqp specific helpers:
108+ def add_rmq_test_user(self, sentry_units,
109+ username="testuser1", password="changeme"):
110+ """Add a test user via the first rmq juju unit, check connection as
111+ the new user against all sentry units.
112+
113+ :param sentry_units: list of sentry unit pointers
114+ :param username: amqp user name, default to testuser1
115+ :param password: amqp user password
116+ :returns: None if successful. Raise on error.
117+ """
118+ self.log.debug('Adding rmq user ({})...'.format(username))
119+
120+ # Check that user does not already exist
121+ cmd_user_list = 'rabbitmqctl list_users'
122+ output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_list)
123+ if username in output:
124+ self.log.warning('User ({}) already exists, returning '
125+ 'gracefully.'.format(username))
126+ return
127+
128+ perms = '".*" ".*" ".*"'
129+ cmds = ['rabbitmqctl add_user {} {}'.format(username, password),
130+ 'rabbitmqctl set_permissions {} {}'.format(username, perms)]
131+
132+ # Add user via first unit
133+ for cmd in cmds:
134+ output, _ = self.run_cmd_unit(sentry_units[0], cmd)
135+
136+ # Check connection against the other sentry_units
137+ self.log.debug('Checking user connect against units...')
138+ for sentry_unit in sentry_units:
139+ connection = self.connect_amqp_by_unit(sentry_unit, ssl=False,
140+ username=username,
141+ password=password)
142+ connection.close()
143+
144+ def delete_rmq_test_user(self, sentry_units, username="testuser1"):
145+ """Delete a rabbitmq user via the first rmq juju unit.
146+
147+ :param sentry_units: list of sentry unit pointers
148+ :param username: amqp user name, default to testuser1
149+ :param password: amqp user password
150+ :returns: None if successful or no such user.
151+ """
152+ self.log.debug('Deleting rmq user ({})...'.format(username))
153+
154+ # Check that the user exists
155+ cmd_user_list = 'rabbitmqctl list_users'
156+ output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_list)
157+
158+ if username not in output:
159+ self.log.warning('User ({}) does not exist, returning '
160+ 'gracefully.'.format(username))
161+ return
162+
163+ # Delete the user
164+ cmd_user_del = 'rabbitmqctl delete_user {}'.format(username)
165+ output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_del)
166+
167+ def get_rmq_cluster_status(self, sentry_unit):
168+ """Execute rabbitmq cluster status command on a unit and return
169+ the full output.
170+
171+ :param unit: sentry unit
172+ :returns: String containing console output of cluster status command
173+ """
174+ cmd = 'rabbitmqctl cluster_status'
175+ output, _ = self.run_cmd_unit(sentry_unit, cmd)
176+ self.log.debug('{} cluster_status:\n{}'.format(
177+ sentry_unit.info['unit_name'], output))
178+ return str(output)
179+
180+ def get_rmq_cluster_running_nodes(self, sentry_unit):
181+ """Parse rabbitmqctl cluster_status output string, return list of
182+ running rabbitmq cluster nodes.
183+
184+ :param unit: sentry unit
185+ :returns: List containing node names of running nodes
186+ """
187+ # NOTE(beisner): rabbitmqctl cluster_status output is not
188+ # json-parsable, do string chop foo, then json.loads that.
189+ str_stat = self.get_rmq_cluster_status(sentry_unit)
190+ if 'running_nodes' in str_stat:
191+ pos_start = str_stat.find("{running_nodes,") + 15
192+ pos_end = str_stat.find("]},", pos_start) + 1
193+ str_run_nodes = str_stat[pos_start:pos_end].replace("'", '"')
194+ run_nodes = json.loads(str_run_nodes)
195+ return run_nodes
196+ else:
197+ return []
198+
199+ def validate_rmq_cluster_running_nodes(self, sentry_units):
200+ """Check that all rmq unit hostnames are represented in the
201+ cluster_status output of all units.
202+
203+ :param host_names: dict of juju unit names to host names
204+ :param units: list of sentry unit pointers (all rmq units)
205+ :returns: None if successful, otherwise return error message
206+ """
207+ host_names = self.get_unit_hostnames(sentry_units)
208+ errors = []
209+
210+ # Query every unit for cluster_status running nodes
211+ for query_unit in sentry_units:
212+ query_unit_name = query_unit.info['unit_name']
213+ running_nodes = self.get_rmq_cluster_running_nodes(query_unit)
214+
215+ # Confirm that every unit is represented in the queried unit's
216+ # cluster_status running nodes output.
217+ for validate_unit in sentry_units:
218+ val_host_name = host_names[validate_unit.info['unit_name']]
219+ val_node_name = 'rabbit@{}'.format(val_host_name)
220+
221+ if val_node_name not in running_nodes:
222+ errors.append('Cluster member check failed on {}: {} not '
223+ 'in {}\n'.format(query_unit_name,
224+ val_node_name,
225+ running_nodes))
226+ if errors:
227+ return ''.join(errors)
228+
229+ def rmq_ssl_is_enabled_on_unit(self, sentry_unit, port=None):
230+ """Check a single juju rmq unit for ssl and port in the config file."""
231+ host = sentry_unit.info['public-address']
232+ unit_name = sentry_unit.info['unit_name']
233+
234+ conf_file = '/etc/rabbitmq/rabbitmq.config'
235+ conf_contents = str(self.file_contents_safe(sentry_unit,
236+ conf_file, max_wait=16))
237+ # Checks
238+ conf_ssl = 'ssl' in conf_contents
239+ conf_port = str(port) in conf_contents
240+
241+ # Port explicitly checked in config
242+ if port and conf_port and conf_ssl:
243+ self.log.debug('SSL is enabled @{}:{} '
244+ '({})'.format(host, port, unit_name))
245+ return True
246+ elif port and not conf_port and conf_ssl:
247+ self.log.debug('SSL is enabled @{} but not on port {} '
248+ '({})'.format(host, port, unit_name))
249+ return False
250+ # Port not checked (useful when checking that ssl is disabled)
251+ elif not port and conf_ssl:
252+ self.log.debug('SSL is enabled @{}:{} '
253+ '({})'.format(host, port, unit_name))
254+ return True
255+ elif not port and not conf_ssl:
256+ self.log.debug('SSL not enabled @{}:{} '
257+ '({})'.format(host, port, unit_name))
258+ return False
259+ else:
260+ msg = ('Unknown condition when checking SSL status @{}:{} '
261+ '({})'.format(host, port, unit_name))
262+ amulet.raise_status(amulet.FAIL, msg)
263+
264+ def validate_rmq_ssl_enabled_units(self, sentry_units, port=None):
265+ """Check that ssl is enabled on rmq juju sentry units.
266+
267+ :param sentry_units: list of all rmq sentry units
268+ :param port: optional ssl port override to validate
269+ :returns: None if successful, otherwise return error message
270+ """
271+ for sentry_unit in sentry_units:
272+ if not self.rmq_ssl_is_enabled_on_unit(sentry_unit, port=port):
273+ return ('Unexpected condition: ssl is disabled on unit '
274+ '({})'.format(sentry_unit.info['unit_name']))
275+ return None
276+
277+ def validate_rmq_ssl_disabled_units(self, sentry_units):
278+ """Check that ssl is enabled on listed rmq juju sentry units.
279+
280+ :param sentry_units: list of all rmq sentry units
281+ :returns: True if successful. Raise on error.
282+ """
283+ for sentry_unit in sentry_units:
284+ if self.rmq_ssl_is_enabled_on_unit(sentry_unit):
285+ return ('Unexpected condition: ssl is enabled on unit '
286+ '({})'.format(sentry_unit.info['unit_name']))
287+ return None
288+
289+ def configure_rmq_ssl_on(self, sentry_units, deployment,
290+ port=None, max_wait=60):
291+ """Turn ssl charm config option on, with optional non-default
292+ ssl port specification. Confirm that it is enabled on every
293+ unit.
294+
295+ :param sentry_units: list of sentry units
296+ :param deployment: amulet deployment object pointer
297+ :param port: amqp port, use defaults if None
298+ :param max_wait: maximum time to wait in seconds to confirm
299+ :returns: None if successful. Raise on error.
300+ """
301+ self.log.debug('Setting ssl charm config option: on')
302+
303+ # Enable RMQ SSL
304+ config = {'ssl': 'on'}
305+ if port:
306+ config['ssl_port'] = port
307+
308+ deployment.configure('rabbitmq-server', config)
309+
310+ # Confirm
311+ tries = 0
312+ ret = self.validate_rmq_ssl_enabled_units(sentry_units, port=port)
313+ while ret and tries < (max_wait / 4):
314+ time.sleep(4)
315+ self.log.debug('Attempt {}: {}'.format(tries, ret))
316+ ret = self.validate_rmq_ssl_enabled_units(sentry_units, port=port)
317+ tries += 1
318+
319+ if ret:
320+ amulet.raise_status(amulet.FAIL, ret)
321+
322+ def configure_rmq_ssl_off(self, sentry_units, deployment, max_wait=60):
323+ """Turn ssl charm config option off, confirm that it is disabled
324+ on every unit.
325+
326+ :param sentry_units: list of sentry units
327+ :param deployment: amulet deployment object pointer
328+ :param max_wait: maximum time to wait in seconds to confirm
329+ :returns: None if successful. Raise on error.
330+ """
331+ self.log.debug('Setting ssl charm config option: off')
332+
333+ # Disable RMQ SSL
334+ config = {'ssl': 'off'}
335+ deployment.configure('rabbitmq-server', config)
336+
337+ # Confirm
338+ tries = 0
339+ ret = self.validate_rmq_ssl_disabled_units(sentry_units)
340+ while ret and tries < (max_wait / 4):
341+ time.sleep(4)
342+ self.log.debug('Attempt {}: {}'.format(tries, ret))
343+ ret = self.validate_rmq_ssl_disabled_units(sentry_units)
344+ tries += 1
345+
346+ if ret:
347+ amulet.raise_status(amulet.FAIL, ret)
348+
349+ def connect_amqp_by_unit(self, sentry_unit, ssl=False,
350+ port=None, fatal=True,
351+ username="testuser1", password="changeme"):
352+ """Establish and return a pika amqp connection to the rabbitmq service
353+ running on a rmq juju unit.
354+
355+ :param sentry_unit: sentry unit pointer
356+ :param ssl: boolean, default to False
357+ :param port: amqp port, use defaults if None
358+ :param fatal: boolean, default to True (raises on connect error)
359+ :param username: amqp user name, default to testuser1
360+ :param password: amqp user password
361+ :returns: pika amqp connection pointer or None if failed and non-fatal
362+ """
363+ host = sentry_unit.info['public-address']
364+ unit_name = sentry_unit.info['unit_name']
365+
366+ # Default port logic if port is not specified
367+ if ssl and not port:
368+ port = 5671
369+ elif not ssl and not port:
370+ port = 5672
371+
372+ self.log.debug('Connecting to amqp on {}:{} ({}) as '
373+ '{}...'.format(host, port, unit_name, username))
374+
375+ try:
376+ credentials = pika.PlainCredentials(username, password)
377+ parameters = pika.ConnectionParameters(host=host, port=port,
378+ credentials=credentials,
379+ ssl=ssl,
380+ connection_attempts=3,
381+ retry_delay=5,
382+ socket_timeout=1)
383+ connection = pika.BlockingConnection(parameters)
384+ assert connection.server_properties['product'] == 'RabbitMQ'
385+ self.log.debug('Connect OK')
386+ return connection
387+ except Exception as e:
388+ msg = ('amqp connection failed to {}:{} as '
389+ '{} ({})'.format(host, port, username, str(e)))
390+ if fatal:
391+ amulet.raise_status(amulet.FAIL, msg)
392+ else:
393+ self.log.warn(msg)
394+ return None
395+
396+ def publish_amqp_message_by_unit(self, sentry_unit, message,
397+ queue="test", ssl=False,
398+ username="testuser1",
399+ password="changeme",
400+ port=None):
401+ """Publish an amqp message to a rmq juju unit.
402+
403+ :param sentry_unit: sentry unit pointer
404+ :param message: amqp message string
405+ :param queue: message queue, default to test
406+ :param username: amqp user name, default to testuser1
407+ :param password: amqp user password
408+ :param ssl: boolean, default to False
409+ :param port: amqp port, use defaults if None
410+ :returns: None. Raises exception if publish failed.
411+ """
412+ self.log.debug('Publishing message to {} queue:\n{}'.format(queue,
413+ message))
414+ connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl,
415+ port=port,
416+ username=username,
417+ password=password)
418+
419+ # NOTE(beisner): extra debug here re: pika hang potential:
420+ # https://github.com/pika/pika/issues/297
421+ # https://groups.google.com/forum/#!topic/rabbitmq-users/Ja0iyfF0Szw
422+ self.log.debug('Defining channel...')
423+ channel = connection.channel()
424+ self.log.debug('Declaring queue...')
425+ channel.queue_declare(queue=queue, auto_delete=False, durable=True)
426+ self.log.debug('Publishing message...')
427+ channel.basic_publish(exchange='', routing_key=queue, body=message)
428+ self.log.debug('Closing channel...')
429+ channel.close()
430+ self.log.debug('Closing connection...')
431+ connection.close()
432+
433+ def get_amqp_message_by_unit(self, sentry_unit, queue="test",
434+ username="testuser1",
435+ password="changeme",
436+ ssl=False, port=None):
437+ """Get an amqp message from a rmq juju unit.
438+
439+ :param sentry_unit: sentry unit pointer
440+ :param queue: message queue, default to test
441+ :param username: amqp user name, default to testuser1
442+ :param password: amqp user password
443+ :param ssl: boolean, default to False
444+ :param port: amqp port, use defaults if None
445+ :returns: amqp message body as string. Raise if get fails.
446+ """
447+ connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl,
448+ port=port,
449+ username=username,
450+ password=password)
451+ channel = connection.channel()
452+ method_frame, _, body = channel.basic_get(queue)
453+
454+ if method_frame:
455+ self.log.debug('Retreived message from {} queue:\n{}'.format(queue,
456+ body))
457+ channel.basic_ack(method_frame.delivery_tag)
458+ channel.close()
459+ connection.close()
460+ return body
461+ else:
462+ msg = 'No message retrieved.'
463+ amulet.raise_status(amulet.FAIL, msg)
464
465=== modified file 'hooks/charmhelpers/contrib/openstack/context.py'
466--- hooks/charmhelpers/contrib/openstack/context.py 2015-09-03 09:39:34 +0000
467+++ hooks/charmhelpers/contrib/openstack/context.py 2015-09-22 06:12:36 +0000
468@@ -194,10 +194,50 @@
469 class OSContextGenerator(object):
470 """Base class for all context generators."""
471 interfaces = []
472+ related = False
473+ complete = False
474+ missing_data = []
475
476 def __call__(self):
477 raise NotImplementedError
478
479+ def context_complete(self, ctxt):
480+ """Check for missing data for the required context data.
481+ Set self.missing_data if it exists and return False.
482+ Set self.complete if no missing data and return True.
483+ """
484+ # Fresh start
485+ self.complete = False
486+ self.missing_data = []
487+ for k, v in six.iteritems(ctxt):
488+ if v is None or v == '':
489+ if k not in self.missing_data:
490+ self.missing_data.append(k)
491+
492+ if self.missing_data:
493+ self.complete = False
494+ log('Missing required data: %s' % ' '.join(self.missing_data), level=INFO)
495+ else:
496+ self.complete = True
497+ return self.complete
498+
499+ def get_related(self):
500+ """Check if any of the context interfaces have relation ids.
501+ Set self.related and return True if one of the interfaces
502+ has relation ids.
503+ """
504+ # Fresh start
505+ self.related = False
506+ try:
507+ for interface in self.interfaces:
508+ if relation_ids(interface):
509+ self.related = True
510+ return self.related
511+ except AttributeError as e:
512+ log("{} {}"
513+ "".format(self, e), 'INFO')
514+ return self.related
515+
516
517 class SharedDBContext(OSContextGenerator):
518 interfaces = ['shared-db']
519@@ -213,6 +253,7 @@
520 self.database = database
521 self.user = user
522 self.ssl_dir = ssl_dir
523+ self.rel_name = self.interfaces[0]
524
525 def __call__(self):
526 self.database = self.database or config('database')
527@@ -246,6 +287,7 @@
528 password_setting = self.relation_prefix + '_password'
529
530 for rid in relation_ids(self.interfaces[0]):
531+ self.related = True
532 for unit in related_units(rid):
533 rdata = relation_get(rid=rid, unit=unit)
534 host = rdata.get('db_host')
535@@ -257,7 +299,7 @@
536 'database_password': rdata.get(password_setting),
537 'database_type': 'mysql'
538 }
539- if context_complete(ctxt):
540+ if self.context_complete(ctxt):
541 db_ssl(rdata, ctxt, self.ssl_dir)
542 return ctxt
543 return {}
544@@ -278,6 +320,7 @@
545
546 ctxt = {}
547 for rid in relation_ids(self.interfaces[0]):
548+ self.related = True
549 for unit in related_units(rid):
550 rel_host = relation_get('host', rid=rid, unit=unit)
551 rel_user = relation_get('user', rid=rid, unit=unit)
552@@ -287,7 +330,7 @@
553 'database_user': rel_user,
554 'database_password': rel_passwd,
555 'database_type': 'postgresql'}
556- if context_complete(ctxt):
557+ if self.context_complete(ctxt):
558 return ctxt
559
560 return {}
561@@ -348,6 +391,7 @@
562 ctxt['signing_dir'] = cachedir
563
564 for rid in relation_ids(self.rel_name):
565+ self.related = True
566 for unit in related_units(rid):
567 rdata = relation_get(rid=rid, unit=unit)
568 serv_host = rdata.get('service_host')
569@@ -366,7 +410,7 @@
570 'service_protocol': svc_protocol,
571 'auth_protocol': auth_protocol})
572
573- if context_complete(ctxt):
574+ if self.context_complete(ctxt):
575 # NOTE(jamespage) this is required for >= icehouse
576 # so a missing value just indicates keystone needs
577 # upgrading
578@@ -405,6 +449,7 @@
579 ctxt = {}
580 for rid in relation_ids(self.rel_name):
581 ha_vip_only = False
582+ self.related = True
583 for unit in related_units(rid):
584 if relation_get('clustered', rid=rid, unit=unit):
585 ctxt['clustered'] = True
586@@ -437,7 +482,7 @@
587 ha_vip_only = relation_get('ha-vip-only',
588 rid=rid, unit=unit) is not None
589
590- if context_complete(ctxt):
591+ if self.context_complete(ctxt):
592 if 'rabbit_ssl_ca' in ctxt:
593 if not self.ssl_dir:
594 log("Charm not setup for ssl support but ssl ca "
595@@ -469,7 +514,7 @@
596 ctxt['oslo_messaging_flags'] = config_flags_parser(
597 oslo_messaging_flags)
598
599- if not context_complete(ctxt):
600+ if not self.complete:
601 return {}
602
603 return ctxt
604@@ -485,13 +530,15 @@
605
606 log('Generating template context for ceph', level=DEBUG)
607 mon_hosts = []
608- auth = None
609- key = None
610- use_syslog = str(config('use-syslog')).lower()
611+ ctxt = {
612+ 'use_syslog': str(config('use-syslog')).lower()
613+ }
614 for rid in relation_ids('ceph'):
615 for unit in related_units(rid):
616- auth = relation_get('auth', rid=rid, unit=unit)
617- key = relation_get('key', rid=rid, unit=unit)
618+ if not ctxt.get('auth'):
619+ ctxt['auth'] = relation_get('auth', rid=rid, unit=unit)
620+ if not ctxt.get('key'):
621+ ctxt['key'] = relation_get('key', rid=rid, unit=unit)
622 ceph_pub_addr = relation_get('ceph-public-address', rid=rid,
623 unit=unit)
624 unit_priv_addr = relation_get('private-address', rid=rid,
625@@ -500,15 +547,12 @@
626 ceph_addr = format_ipv6_addr(ceph_addr) or ceph_addr
627 mon_hosts.append(ceph_addr)
628
629- ctxt = {'mon_hosts': ' '.join(sorted(mon_hosts)),
630- 'auth': auth,
631- 'key': key,
632- 'use_syslog': use_syslog}
633+ ctxt['mon_hosts'] = ' '.join(sorted(mon_hosts))
634
635 if not os.path.isdir('/etc/ceph'):
636 os.mkdir('/etc/ceph')
637
638- if not context_complete(ctxt):
639+ if not self.context_complete(ctxt):
640 return {}
641
642 ensure_packages(['ceph-common'])
643@@ -1367,6 +1411,6 @@
644 'auth_protocol':
645 rdata.get('auth_protocol') or 'http',
646 }
647- if context_complete(ctxt):
648+ if self.context_complete(ctxt):
649 return ctxt
650 return {}
651
652=== modified file 'hooks/charmhelpers/contrib/openstack/templating.py'
653--- hooks/charmhelpers/contrib/openstack/templating.py 2015-07-29 10:46:43 +0000
654+++ hooks/charmhelpers/contrib/openstack/templating.py 2015-09-22 06:12:36 +0000
655@@ -18,7 +18,7 @@
656
657 import six
658
659-from charmhelpers.fetch import apt_install
660+from charmhelpers.fetch import apt_install, apt_update
661 from charmhelpers.core.hookenv import (
662 log,
663 ERROR,
664@@ -112,7 +112,7 @@
665
666 def complete_contexts(self):
667 '''
668- Return a list of interfaces that have atisfied contexts.
669+ Return a list of interfaces that have satisfied contexts.
670 '''
671 if self._complete_contexts:
672 return self._complete_contexts
673@@ -293,3 +293,30 @@
674 [interfaces.extend(i.complete_contexts())
675 for i in six.itervalues(self.templates)]
676 return interfaces
677+
678+ def get_incomplete_context_data(self, interfaces):
679+ '''
680+ Return dictionary of relation status of interfaces and any missing
681+ required context data. Example:
682+ {'amqp': {'missing_data': ['rabbitmq_password'], 'related': True},
683+ 'zeromq-configuration': {'related': False}}
684+ '''
685+ incomplete_context_data = {}
686+
687+ for i in six.itervalues(self.templates):
688+ for context in i.contexts:
689+ for interface in interfaces:
690+ related = False
691+ if interface in context.interfaces:
692+ related = context.get_related()
693+ missing_data = context.missing_data
694+ if missing_data:
695+ incomplete_context_data[interface] = {'missing_data': missing_data}
696+ if related:
697+ if incomplete_context_data.get(interface):
698+ incomplete_context_data[interface].update({'related': True})
699+ else:
700+ incomplete_context_data[interface] = {'related': True}
701+ else:
702+ incomplete_context_data[interface] = {'related': False}
703+ return incomplete_context_data
704
705=== modified file 'hooks/charmhelpers/contrib/openstack/utils.py'
706--- hooks/charmhelpers/contrib/openstack/utils.py 2015-09-03 09:39:34 +0000
707+++ hooks/charmhelpers/contrib/openstack/utils.py 2015-09-22 06:12:36 +0000
708@@ -25,6 +25,7 @@
709 import re
710
711 import six
712+import traceback
713 import yaml
714
715 from charmhelpers.contrib.network import ip
716@@ -34,12 +35,16 @@
717 )
718
719 from charmhelpers.core.hookenv import (
720+ action_fail,
721+ action_set,
722 config,
723 log as juju_log,
724 charm_dir,
725 INFO,
726 relation_ids,
727- relation_set
728+ relation_set,
729+ status_set,
730+ hook_name
731 )
732
733 from charmhelpers.contrib.storage.linux.lvm import (
734@@ -114,6 +119,7 @@
735 ('2.2.1', 'kilo'),
736 ('2.2.2', 'kilo'),
737 ('2.3.0', 'liberty'),
738+ ('2.4.0', 'liberty'),
739 ])
740
741 # >= Liberty version->codename mapping
742@@ -745,3 +751,217 @@
743 return projects[key]
744
745 return None
746+
747+
748+def os_workload_status(configs, required_interfaces, charm_func=None):
749+ """
750+ Decorator to set workload status based on complete contexts
751+ """
752+ def wrap(f):
753+ @wraps(f)
754+ def wrapped_f(*args, **kwargs):
755+ # Run the original function first
756+ f(*args, **kwargs)
757+ # Set workload status now that contexts have been
758+ # acted on
759+ set_os_workload_status(configs, required_interfaces, charm_func)
760+ return wrapped_f
761+ return wrap
762+
763+
764+def set_os_workload_status(configs, required_interfaces, charm_func=None):
765+ """
766+ Set workload status based on complete contexts.
767+ status-set missing or incomplete contexts
768+ and juju-log details of missing required data.
769+ charm_func is a charm specific function to run checking
770+ for charm specific requirements such as a VIP setting.
771+ """
772+ incomplete_rel_data = incomplete_relation_data(configs, required_interfaces)
773+ state = 'active'
774+ missing_relations = []
775+ incomplete_relations = []
776+ message = None
777+ charm_state = None
778+ charm_message = None
779+
780+ for generic_interface in incomplete_rel_data.keys():
781+ related_interface = None
782+ missing_data = {}
783+ # Related or not?
784+ for interface in incomplete_rel_data[generic_interface]:
785+ if incomplete_rel_data[generic_interface][interface].get('related'):
786+ related_interface = interface
787+ missing_data = incomplete_rel_data[generic_interface][interface].get('missing_data')
788+ # No relation ID for the generic_interface
789+ if not related_interface:
790+ juju_log("{} relation is missing and must be related for "
791+ "functionality. ".format(generic_interface), 'WARN')
792+ state = 'blocked'
793+ if generic_interface not in missing_relations:
794+ missing_relations.append(generic_interface)
795+ else:
796+ # Relation ID exists but no related unit
797+ if not missing_data:
798+ # Edge case relation ID exists but departing
799+ if ('departed' in hook_name() or 'broken' in hook_name()) \
800+ and related_interface in hook_name():
801+ state = 'blocked'
802+ if generic_interface not in missing_relations:
803+ missing_relations.append(generic_interface)
804+ juju_log("{} relation's interface, {}, "
805+ "relationship is departed or broken "
806+ "and is required for functionality."
807+ "".format(generic_interface, related_interface), "WARN")
808+ # Normal case relation ID exists but no related unit
809+ # (joining)
810+ else:
811+ juju_log("{} relations's interface, {}, is related but has "
812+ "no units in the relation."
813+ "".format(generic_interface, related_interface), "INFO")
814+ # Related unit exists and data missing on the relation
815+ else:
816+ juju_log("{} relation's interface, {}, is related awaiting "
817+ "the following data from the relationship: {}. "
818+ "".format(generic_interface, related_interface,
819+ ", ".join(missing_data)), "INFO")
820+ if state != 'blocked':
821+ state = 'waiting'
822+ if generic_interface not in incomplete_relations \
823+ and generic_interface not in missing_relations:
824+ incomplete_relations.append(generic_interface)
825+
826+ if missing_relations:
827+ message = "Missing relations: {}".format(", ".join(missing_relations))
828+ if incomplete_relations:
829+ message += "; incomplete relations: {}" \
830+ "".format(", ".join(incomplete_relations))
831+ state = 'blocked'
832+ elif incomplete_relations:
833+ message = "Incomplete relations: {}" \
834+ "".format(", ".join(incomplete_relations))
835+ state = 'waiting'
836+
837+ # Run charm specific checks
838+ if charm_func:
839+ charm_state, charm_message = charm_func(configs)
840+ if charm_state != 'active' and charm_state != 'unknown':
841+ state = workload_state_compare(state, charm_state)
842+ if message:
843+ message = "{} {}".format(message, charm_message)
844+ else:
845+ message = charm_message
846+
847+ # Set to active if all requirements have been met
848+ if state == 'active':
849+ message = "Unit is ready"
850+ juju_log(message, "INFO")
851+
852+ status_set(state, message)
853+
854+
855+def workload_state_compare(current_workload_state, workload_state):
856+ """ Return highest priority of two states"""
857+ hierarchy = {'unknown': -1,
858+ 'active': 0,
859+ 'maintenance': 1,
860+ 'waiting': 2,
861+ 'blocked': 3,
862+ }
863+
864+ if hierarchy.get(workload_state) is None:
865+ workload_state = 'unknown'
866+ if hierarchy.get(current_workload_state) is None:
867+ current_workload_state = 'unknown'
868+
869+ # Set workload_state based on hierarchy of statuses
870+ if hierarchy.get(current_workload_state) > hierarchy.get(workload_state):
871+ return current_workload_state
872+ else:
873+ return workload_state
874+
875+
876+def incomplete_relation_data(configs, required_interfaces):
877+ """
878+ Check complete contexts against required_interfaces
879+ Return dictionary of incomplete relation data.
880+
881+ configs is an OSConfigRenderer object with configs registered
882+
883+ required_interfaces is a dictionary of required general interfaces
884+ with dictionary values of possible specific interfaces.
885+ Example:
886+ required_interfaces = {'database': ['shared-db', 'pgsql-db']}
887+
888+ The interface is said to be satisfied if anyone of the interfaces in the
889+ list has a complete context.
890+
891+ Return dictionary of incomplete or missing required contexts with relation
892+ status of interfaces and any missing data points. Example:
893+ {'message':
894+ {'amqp': {'missing_data': ['rabbitmq_password'], 'related': True},
895+ 'zeromq-configuration': {'related': False}},
896+ 'identity':
897+ {'identity-service': {'related': False}},
898+ 'database':
899+ {'pgsql-db': {'related': False},
900+ 'shared-db': {'related': True}}}
901+ """
902+ complete_ctxts = configs.complete_contexts()
903+ incomplete_relations = []
904+ for svc_type in required_interfaces.keys():
905+ # Avoid duplicates
906+ found_ctxt = False
907+ for interface in required_interfaces[svc_type]:
908+ if interface in complete_ctxts:
909+ found_ctxt = True
910+ if not found_ctxt:
911+ incomplete_relations.append(svc_type)
912+ incomplete_context_data = {}
913+ for i in incomplete_relations:
914+ incomplete_context_data[i] = configs.get_incomplete_context_data(required_interfaces[i])
915+ return incomplete_context_data
916+
917+
918+def do_action_openstack_upgrade(package, upgrade_callback, configs):
919+ """Perform action-managed OpenStack upgrade.
920+
921+ Upgrades packages to the configured openstack-origin version and sets
922+ the corresponding action status as a result.
923+
924+ If the charm was installed from source we cannot upgrade it.
925+ For backwards compatibility a config flag (action-managed-upgrade) must
926+ be set for this code to run, otherwise a full service level upgrade will
927+ fire on config-changed.
928+
929+ @param package: package name for determining if upgrade available
930+ @param upgrade_callback: function callback to charm's upgrade function
931+ @param configs: templating object derived from OSConfigRenderer class
932+
933+ @return: True if upgrade successful; False if upgrade failed or skipped
934+ """
935+ ret = False
936+
937+ if git_install_requested():
938+ action_set({'outcome': 'installed from source, skipped upgrade.'})
939+ else:
940+ if openstack_upgrade_available(package):
941+ if config('action-managed-upgrade'):
942+ juju_log('Upgrading OpenStack release')
943+
944+ try:
945+ upgrade_callback(configs=configs)
946+ action_set({'outcome': 'success, upgrade completed.'})
947+ ret = True
948+ except:
949+ action_set({'outcome': 'upgrade failed, see traceback.'})
950+ action_set({'traceback': traceback.format_exc()})
951+ action_fail('do_openstack_upgrade resulted in an '
952+ 'unexpected error')
953+ else:
954+ action_set({'outcome': 'action-managed-upgrade config is '
955+ 'False, skipped upgrade.'})
956+ else:
957+ action_set({'outcome': 'no upgrade available.'})
958+
959+ return ret
960
961=== modified file 'hooks/charmhelpers/contrib/storage/linux/ceph.py'
962--- hooks/charmhelpers/contrib/storage/linux/ceph.py 2015-07-16 20:18:38 +0000
963+++ hooks/charmhelpers/contrib/storage/linux/ceph.py 2015-09-22 06:12:36 +0000
964@@ -28,6 +28,7 @@
965 import shutil
966 import json
967 import time
968+import uuid
969
970 from subprocess import (
971 check_call,
972@@ -35,8 +36,10 @@
973 CalledProcessError,
974 )
975 from charmhelpers.core.hookenv import (
976+ local_unit,
977 relation_get,
978 relation_ids,
979+ relation_set,
980 related_units,
981 log,
982 DEBUG,
983@@ -56,6 +59,8 @@
984 apt_install,
985 )
986
987+from charmhelpers.core.kernel import modprobe
988+
989 KEYRING = '/etc/ceph/ceph.client.{}.keyring'
990 KEYFILE = '/etc/ceph/ceph.client.{}.key'
991
992@@ -288,17 +293,6 @@
993 os.chown(data_src_dst, uid, gid)
994
995
996-# TODO: re-use
997-def modprobe(module):
998- """Load a kernel module and configure for auto-load on reboot."""
999- log('Loading kernel module', level=INFO)
1000- cmd = ['modprobe', module]
1001- check_call(cmd)
1002- with open('/etc/modules', 'r+') as modules:
1003- if module not in modules.read():
1004- modules.write(module)
1005-
1006-
1007 def copy_files(src, dst, symlinks=False, ignore=None):
1008 """Copy files from src to dst."""
1009 for item in os.listdir(src):
1010@@ -411,17 +405,52 @@
1011
1012 The API is versioned and defaults to version 1.
1013 """
1014- def __init__(self, api_version=1):
1015+ def __init__(self, api_version=1, request_id=None):
1016 self.api_version = api_version
1017+ if request_id:
1018+ self.request_id = request_id
1019+ else:
1020+ self.request_id = str(uuid.uuid1())
1021 self.ops = []
1022
1023 def add_op_create_pool(self, name, replica_count=3):
1024 self.ops.append({'op': 'create-pool', 'name': name,
1025 'replicas': replica_count})
1026
1027+ def set_ops(self, ops):
1028+ """Set request ops to provided value.
1029+
1030+ Useful for injecting ops that come from a previous request
1031+ to allow comparisons to ensure validity.
1032+ """
1033+ self.ops = ops
1034+
1035 @property
1036 def request(self):
1037- return json.dumps({'api-version': self.api_version, 'ops': self.ops})
1038+ return json.dumps({'api-version': self.api_version, 'ops': self.ops,
1039+ 'request-id': self.request_id})
1040+
1041+ def _ops_equal(self, other):
1042+ if len(self.ops) == len(other.ops):
1043+ for req_no in range(0, len(self.ops)):
1044+ for key in ['replicas', 'name', 'op']:
1045+ if self.ops[req_no][key] != other.ops[req_no][key]:
1046+ return False
1047+ else:
1048+ return False
1049+ return True
1050+
1051+ def __eq__(self, other):
1052+ if not isinstance(other, self.__class__):
1053+ return False
1054+ if self.api_version == other.api_version and \
1055+ self._ops_equal(other):
1056+ return True
1057+ else:
1058+ return False
1059+
1060+ def __ne__(self, other):
1061+ return not self.__eq__(other)
1062
1063
1064 class CephBrokerRsp(object):
1065@@ -431,14 +460,198 @@
1066
1067 The API is versioned and defaults to version 1.
1068 """
1069+
1070 def __init__(self, encoded_rsp):
1071 self.api_version = None
1072 self.rsp = json.loads(encoded_rsp)
1073
1074 @property
1075+ def request_id(self):
1076+ return self.rsp.get('request-id')
1077+
1078+ @property
1079 def exit_code(self):
1080 return self.rsp.get('exit-code')
1081
1082 @property
1083 def exit_msg(self):
1084 return self.rsp.get('stderr')
1085+
1086+
1087+# Ceph Broker Conversation:
1088+# If a charm needs an action to be taken by ceph it can create a CephBrokerRq
1089+# and send that request to ceph via the ceph relation. The CephBrokerRq has a
1090+# unique id so that the client can identity which CephBrokerRsp is associated
1091+# with the request. Ceph will also respond to each client unit individually
1092+# creating a response key per client unit eg glance/0 will get a CephBrokerRsp
1093+# via key broker-rsp-glance-0
1094+#
1095+# To use this the charm can just do something like:
1096+#
1097+# from charmhelpers.contrib.storage.linux.ceph import (
1098+# send_request_if_needed,
1099+# is_request_complete,
1100+# CephBrokerRq,
1101+# )
1102+#
1103+# @hooks.hook('ceph-relation-changed')
1104+# def ceph_changed():
1105+# rq = CephBrokerRq()
1106+# rq.add_op_create_pool(name='poolname', replica_count=3)
1107+#
1108+# if is_request_complete(rq):
1109+# <Request complete actions>
1110+# else:
1111+# send_request_if_needed(get_ceph_request())
1112+#
1113+# CephBrokerRq and CephBrokerRsp are serialized into JSON. Below is an example
1114+# of glance having sent a request to ceph which ceph has successfully processed
1115+# 'ceph:8': {
1116+# 'ceph/0': {
1117+# 'auth': 'cephx',
1118+# 'broker-rsp-glance-0': '{"request-id": "0bc7dc54", "exit-code": 0}',
1119+# 'broker_rsp': '{"request-id": "0da543b8", "exit-code": 0}',
1120+# 'ceph-public-address': '10.5.44.103',
1121+# 'key': 'AQCLDttVuHXINhAAvI144CB09dYchhHyTUY9BQ==',
1122+# 'private-address': '10.5.44.103',
1123+# },
1124+# 'glance/0': {
1125+# 'broker_req': ('{"api-version": 1, "request-id": "0bc7dc54", '
1126+# '"ops": [{"replicas": 3, "name": "glance", '
1127+# '"op": "create-pool"}]}'),
1128+# 'private-address': '10.5.44.109',
1129+# },
1130+# }
1131+
1132+def get_previous_request(rid):
1133+ """Return the last ceph broker request sent on a given relation
1134+
1135+ @param rid: Relation id to query for request
1136+ """
1137+ request = None
1138+ broker_req = relation_get(attribute='broker_req', rid=rid,
1139+ unit=local_unit())
1140+ if broker_req:
1141+ request_data = json.loads(broker_req)
1142+ request = CephBrokerRq(api_version=request_data['api-version'],
1143+ request_id=request_data['request-id'])
1144+ request.set_ops(request_data['ops'])
1145+
1146+ return request
1147+
1148+
1149+def get_request_states(request):
1150+ """Return a dict of requests per relation id with their corresponding
1151+ completion state.
1152+
1153+ This allows a charm, which has a request for ceph, to see whether there is
1154+ an equivalent request already being processed and if so what state that
1155+ request is in.
1156+
1157+ @param request: A CephBrokerRq object
1158+ """
1159+ complete = []
1160+ requests = {}
1161+ for rid in relation_ids('ceph'):
1162+ complete = False
1163+ previous_request = get_previous_request(rid)
1164+ if request == previous_request:
1165+ sent = True
1166+ complete = is_request_complete_for_rid(previous_request, rid)
1167+ else:
1168+ sent = False
1169+ complete = False
1170+
1171+ requests[rid] = {
1172+ 'sent': sent,
1173+ 'complete': complete,
1174+ }
1175+
1176+ return requests
1177+
1178+
1179+def is_request_sent(request):
1180+ """Check to see if a functionally equivalent request has already been sent
1181+
1182+ Returns True if a similair request has been sent
1183+
1184+ @param request: A CephBrokerRq object
1185+ """
1186+ states = get_request_states(request)
1187+ for rid in states.keys():
1188+ if not states[rid]['sent']:
1189+ return False
1190+
1191+ return True
1192+
1193+
1194+def is_request_complete(request):
1195+ """Check to see if a functionally equivalent request has already been
1196+ completed
1197+
1198+ Returns True if a similair request has been completed
1199+
1200+ @param request: A CephBrokerRq object
1201+ """
1202+ states = get_request_states(request)
1203+ for rid in states.keys():
1204+ if not states[rid]['complete']:
1205+ return False
1206+
1207+ return True
1208+
1209+
1210+def is_request_complete_for_rid(request, rid):
1211+ """Check if a given request has been completed on the given relation
1212+
1213+ @param request: A CephBrokerRq object
1214+ @param rid: Relation ID
1215+ """
1216+ broker_key = get_broker_rsp_key()
1217+ for unit in related_units(rid):
1218+ rdata = relation_get(rid=rid, unit=unit)
1219+ if rdata.get(broker_key):
1220+ rsp = CephBrokerRsp(rdata.get(broker_key))
1221+ if rsp.request_id == request.request_id:
1222+ if not rsp.exit_code:
1223+ return True
1224+ else:
1225+ # The remote unit sent no reply targeted at this unit so either the
1226+ # remote ceph cluster does not support unit targeted replies or it
1227+ # has not processed our request yet.
1228+ if rdata.get('broker_rsp'):
1229+ request_data = json.loads(rdata['broker_rsp'])
1230+ if request_data.get('request-id'):
1231+ log('Ignoring legacy broker_rsp without unit key as remote '
1232+ 'service supports unit specific replies', level=DEBUG)
1233+ else:
1234+ log('Using legacy broker_rsp as remote service does not '
1235+ 'supports unit specific replies', level=DEBUG)
1236+ rsp = CephBrokerRsp(rdata['broker_rsp'])
1237+ if not rsp.exit_code:
1238+ return True
1239+
1240+ return False
1241+
1242+
1243+def get_broker_rsp_key():
1244+ """Return broker response key for this unit
1245+
1246+ This is the key that ceph is going to use to pass request status
1247+ information back to this unit
1248+ """
1249+ return 'broker-rsp-' + local_unit().replace('/', '-')
1250+
1251+
1252+def send_request_if_needed(request):
1253+ """Send broker request if an equivalent request has not already been sent
1254+
1255+ @param request: A CephBrokerRq object
1256+ """
1257+ if is_request_sent(request):
1258+ log('Request already sent but not complete, not sending new request',
1259+ level=DEBUG)
1260+ else:
1261+ for rid in relation_ids('ceph'):
1262+ log('Sending request {}'.format(request.request_id), level=DEBUG)
1263+ relation_set(relation_id=rid, broker_req=request.request)
1264
1265=== modified file 'hooks/charmhelpers/core/host.py'
1266--- hooks/charmhelpers/core/host.py 2015-08-19 13:48:32 +0000
1267+++ hooks/charmhelpers/core/host.py 2015-09-22 06:12:36 +0000
1268@@ -63,12 +63,10 @@
1269 return service_result
1270
1271
1272-def service_pause(service_name, init_dir=None):
1273+def service_pause(service_name, init_dir="/etc/init", initd_dir="/etc/init.d"):
1274 """Pause a system service.
1275
1276 Stop it, and prevent it from starting again at boot."""
1277- if init_dir is None:
1278- init_dir = "/etc/init"
1279 stopped = service_stop(service_name)
1280 # XXX: Support systemd too
1281 override_path = os.path.join(
1282@@ -78,7 +76,8 @@
1283 return stopped
1284
1285
1286-def service_resume(service_name, init_dir=None):
1287+def service_resume(service_name, init_dir="/etc/init",
1288+ initd_dir="/etc/init.d"):
1289 """Resume a system service.
1290
1291 Reenable starting again at boot. Start the service"""
1292
1293=== added file 'hooks/charmhelpers/core/kernel.py'
1294--- hooks/charmhelpers/core/kernel.py 1970-01-01 00:00:00 +0000
1295+++ hooks/charmhelpers/core/kernel.py 2015-09-22 06:12:36 +0000
1296@@ -0,0 +1,68 @@
1297+#!/usr/bin/env python
1298+# -*- coding: utf-8 -*-
1299+
1300+# Copyright 2014-2015 Canonical Limited.
1301+#
1302+# This file is part of charm-helpers.
1303+#
1304+# charm-helpers is free software: you can redistribute it and/or modify
1305+# it under the terms of the GNU Lesser General Public License version 3 as
1306+# published by the Free Software Foundation.
1307+#
1308+# charm-helpers is distributed in the hope that it will be useful,
1309+# but WITHOUT ANY WARRANTY; without even the implied warranty of
1310+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
1311+# GNU Lesser General Public License for more details.
1312+#
1313+# You should have received a copy of the GNU Lesser General Public License
1314+# along with charm-helpers. If not, see <http://www.gnu.org/licenses/>.
1315+
1316+__author__ = "Jorge Niedbalski <jorge.niedbalski@canonical.com>"
1317+
1318+from charmhelpers.core.hookenv import (
1319+ log,
1320+ INFO
1321+)
1322+
1323+from subprocess import check_call, check_output
1324+import re
1325+
1326+
1327+def modprobe(module, persist=True):
1328+ """Load a kernel module and configure for auto-load on reboot."""
1329+ cmd = ['modprobe', module]
1330+
1331+ log('Loading kernel module %s' % module, level=INFO)
1332+
1333+ check_call(cmd)
1334+ if persist:
1335+ with open('/etc/modules', 'r+') as modules:
1336+ if module not in modules.read():
1337+ modules.write(module)
1338+
1339+
1340+def rmmod(module, force=False):
1341+ """Remove a module from the linux kernel"""
1342+ cmd = ['rmmod']
1343+ if force:
1344+ cmd.append('-f')
1345+ cmd.append(module)
1346+ log('Removing kernel module %s' % module, level=INFO)
1347+ return check_call(cmd)
1348+
1349+
1350+def lsmod():
1351+ """Shows what kernel modules are currently loaded"""
1352+ return check_output(['lsmod'],
1353+ universal_newlines=True)
1354+
1355+
1356+def is_module_loaded(module):
1357+ """Checks if a kernel module is already loaded"""
1358+ matches = re.findall('^%s[ ]+' % module, lsmod(), re.M)
1359+ return len(matches) > 0
1360+
1361+
1362+def update_initramfs(version='all'):
1363+ """Updates an initramfs image"""
1364+ return check_call(["update-initramfs", "-k", version, "-u"])
1365
1366=== modified file 'hooks/charmhelpers/core/strutils.py'
1367--- hooks/charmhelpers/core/strutils.py 2015-04-14 01:15:40 +0000
1368+++ hooks/charmhelpers/core/strutils.py 2015-09-22 06:12:36 +0000
1369@@ -18,6 +18,7 @@
1370 # along with charm-helpers. If not, see <http://www.gnu.org/licenses/>.
1371
1372 import six
1373+import re
1374
1375
1376 def bool_from_string(value):
1377@@ -40,3 +41,32 @@
1378
1379 msg = "Unable to interpret string value '%s' as boolean" % (value)
1380 raise ValueError(msg)
1381+
1382+
1383+def bytes_from_string(value):
1384+ """Interpret human readable string value as bytes.
1385+
1386+ Returns int
1387+ """
1388+ BYTE_POWER = {
1389+ 'K': 1,
1390+ 'KB': 1,
1391+ 'M': 2,
1392+ 'MB': 2,
1393+ 'G': 3,
1394+ 'GB': 3,
1395+ 'T': 4,
1396+ 'TB': 4,
1397+ 'P': 5,
1398+ 'PB': 5,
1399+ }
1400+ if isinstance(value, six.string_types):
1401+ value = six.text_type(value)
1402+ else:
1403+ msg = "Unable to interpret non-string value '%s' as boolean" % (value)
1404+ raise ValueError(msg)
1405+ matches = re.match("([0-9]+)([a-zA-Z]+)", value)
1406+ if not matches:
1407+ msg = "Unable to interpret string value '%s' as bytes" % (value)
1408+ raise ValueError(msg)
1409+ return int(matches.group(1)) * (1024 ** BYTE_POWER[matches.group(2)])
1410
1411=== modified file 'hooks/nova_cc_utils.py'
1412--- hooks/nova_cc_utils.py 2015-07-22 12:01:24 +0000
1413+++ hooks/nova_cc_utils.py 2015-09-22 06:12:36 +0000
1414@@ -724,7 +724,10 @@
1415 def ssh_known_host_key(host, unit=None, user=None):
1416 cmd = ['ssh-keygen', '-f', known_hosts(unit, user), '-H', '-F', host]
1417 try:
1418- return subprocess.check_output(cmd).strip()
1419+ # The first line of output is like '# Host xx found: line 1 type RSA',
1420+ # which should be excluded.
1421+ output = subprocess.check_output(cmd).strip()
1422+ return output.split('\n')[1]
1423 except subprocess.CalledProcessError:
1424 return None
1425
1426@@ -735,6 +738,16 @@
1427 subprocess.check_call(cmd)
1428
1429
1430+def is_same_key(key_1, key_2):
1431+ # The key format get will be like '|1|2rUumCavEXWVaVyB5uMl6m85pZo=|Cp'
1432+ # 'EL6l7VTY37T/fg/ihhNb/GPgs= ssh-rsa AAAAB', we only need to compare
1433+ # the part start with 'ssh-rsa' followed with '= ', because the hash
1434+ # value in the beginning will change each time.
1435+ k_1 = key_1.split('= ')[1]
1436+ k_2 = key_2.split('= ')[1]
1437+ return k_1 == k_2
1438+
1439+
1440 def add_known_host(host, unit=None, user=None):
1441 '''Add variations of host to a known hosts file.'''
1442 cmd = ['ssh-keyscan', '-H', '-t', 'rsa', host]
1443@@ -745,8 +758,8 @@
1444 raise e
1445
1446 current_key = ssh_known_host_key(host, unit, user)
1447- if current_key:
1448- if remote_key == current_key:
1449+ if current_key and remote_key:
1450+ if is_same_key(remote_key, current_key):
1451 log('Known host key for compute host %s up to date.' % host)
1452 return
1453 else:
1454@@ -787,8 +800,7 @@
1455 hosts.append(hn.split('.')[0])
1456
1457 for host in list(set(hosts)):
1458- if not ssh_known_host_key(host, unit, user):
1459- add_known_host(host, unit, user)
1460+ add_known_host(host, unit, user)
1461
1462 if not ssh_authorized_key_exists(public_key, unit, user):
1463 log('Saving SSH authorized key for compute host at %s.' %
1464
1465=== modified file 'tests/charmhelpers/contrib/amulet/deployment.py'
1466--- tests/charmhelpers/contrib/amulet/deployment.py 2015-03-31 11:39:19 +0000
1467+++ tests/charmhelpers/contrib/amulet/deployment.py 2015-09-22 06:12:36 +0000
1468@@ -51,7 +51,8 @@
1469 if 'units' not in this_service:
1470 this_service['units'] = 1
1471
1472- self.d.add(this_service['name'], units=this_service['units'])
1473+ self.d.add(this_service['name'], units=this_service['units'],
1474+ constraints=this_service.get('constraints'))
1475
1476 for svc in other_services:
1477 if 'location' in svc:
1478@@ -64,7 +65,8 @@
1479 if 'units' not in svc:
1480 svc['units'] = 1
1481
1482- self.d.add(svc['name'], charm=branch_location, units=svc['units'])
1483+ self.d.add(svc['name'], charm=branch_location, units=svc['units'],
1484+ constraints=svc.get('constraints'))
1485
1486 def _add_relations(self, relations):
1487 """Add all of the relations for the services."""
1488
1489=== modified file 'tests/charmhelpers/contrib/amulet/utils.py'
1490--- tests/charmhelpers/contrib/amulet/utils.py 2015-08-18 17:34:35 +0000
1491+++ tests/charmhelpers/contrib/amulet/utils.py 2015-09-22 06:12:36 +0000
1492@@ -114,7 +114,7 @@
1493 # /!\ DEPRECATION WARNING (beisner):
1494 # New and existing tests should be rewritten to use
1495 # validate_services_by_name() as it is aware of init systems.
1496- self.log.warn('/!\\ DEPRECATION WARNING: use '
1497+ self.log.warn('DEPRECATION WARNING: use '
1498 'validate_services_by_name instead of validate_services '
1499 'due to init system differences.')
1500
1501@@ -269,33 +269,52 @@
1502 """Get last modification time of directory."""
1503 return sentry_unit.directory_stat(directory)['mtime']
1504
1505- def _get_proc_start_time(self, sentry_unit, service, pgrep_full=False):
1506- """Get process' start time.
1507-
1508- Determine start time of the process based on the last modification
1509- time of the /proc/pid directory. If pgrep_full is True, the process
1510- name is matched against the full command line.
1511- """
1512- if pgrep_full:
1513- cmd = 'pgrep -o -f {}'.format(service)
1514- else:
1515- cmd = 'pgrep -o {}'.format(service)
1516- cmd = cmd + ' | grep -v pgrep || exit 0'
1517- cmd_out = sentry_unit.run(cmd)
1518- self.log.debug('CMDout: ' + str(cmd_out))
1519- if cmd_out[0]:
1520- self.log.debug('Pid for %s %s' % (service, str(cmd_out[0])))
1521- proc_dir = '/proc/{}'.format(cmd_out[0].strip())
1522- return self._get_dir_mtime(sentry_unit, proc_dir)
1523+ def _get_proc_start_time(self, sentry_unit, service, pgrep_full=None):
1524+ """Get start time of a process based on the last modification time
1525+ of the /proc/pid directory.
1526+
1527+ :sentry_unit: The sentry unit to check for the service on
1528+ :service: service name to look for in process table
1529+ :pgrep_full: [Deprecated] Use full command line search mode with pgrep
1530+ :returns: epoch time of service process start
1531+ :param commands: list of bash commands
1532+ :param sentry_units: list of sentry unit pointers
1533+ :returns: None if successful; Failure message otherwise
1534+ """
1535+ if pgrep_full is not None:
1536+ # /!\ DEPRECATION WARNING (beisner):
1537+ # No longer implemented, as pidof is now used instead of pgrep.
1538+ # https://bugs.launchpad.net/charm-helpers/+bug/1474030
1539+ self.log.warn('DEPRECATION WARNING: pgrep_full bool is no '
1540+ 'longer implemented re: lp 1474030.')
1541+
1542+ pid_list = self.get_process_id_list(sentry_unit, service)
1543+ pid = pid_list[0]
1544+ proc_dir = '/proc/{}'.format(pid)
1545+ self.log.debug('Pid for {} on {}: {}'.format(
1546+ service, sentry_unit.info['unit_name'], pid))
1547+
1548+ return self._get_dir_mtime(sentry_unit, proc_dir)
1549
1550 def service_restarted(self, sentry_unit, service, filename,
1551- pgrep_full=False, sleep_time=20):
1552+ pgrep_full=None, sleep_time=20):
1553 """Check if service was restarted.
1554
1555 Compare a service's start time vs a file's last modification time
1556 (such as a config file for that service) to determine if the service
1557 has been restarted.
1558 """
1559+ # /!\ DEPRECATION WARNING (beisner):
1560+ # This method is prone to races in that no before-time is known.
1561+ # Use validate_service_config_changed instead.
1562+
1563+ # NOTE(beisner) pgrep_full is no longer implemented, as pidof is now
1564+ # used instead of pgrep. pgrep_full is still passed through to ensure
1565+ # deprecation WARNS. lp1474030
1566+ self.log.warn('DEPRECATION WARNING: use '
1567+ 'validate_service_config_changed instead of '
1568+ 'service_restarted due to known races.')
1569+
1570 time.sleep(sleep_time)
1571 if (self._get_proc_start_time(sentry_unit, service, pgrep_full) >=
1572 self._get_file_mtime(sentry_unit, filename)):
1573@@ -304,15 +323,15 @@
1574 return False
1575
1576 def service_restarted_since(self, sentry_unit, mtime, service,
1577- pgrep_full=False, sleep_time=20,
1578- retry_count=2):
1579+ pgrep_full=None, sleep_time=20,
1580+ retry_count=2, retry_sleep_time=30):
1581 """Check if service was been started after a given time.
1582
1583 Args:
1584 sentry_unit (sentry): The sentry unit to check for the service on
1585 mtime (float): The epoch time to check against
1586 service (string): service name to look for in process table
1587- pgrep_full (boolean): Use full command line search mode with pgrep
1588+ pgrep_full: [Deprecated] Use full command line search mode with pgrep
1589 sleep_time (int): Seconds to sleep before looking for process
1590 retry_count (int): If service is not found, how many times to retry
1591
1592@@ -321,30 +340,44 @@
1593 False if service is older than mtime or if service was
1594 not found.
1595 """
1596- self.log.debug('Checking %s restarted since %s' % (service, mtime))
1597+ # NOTE(beisner) pgrep_full is no longer implemented, as pidof is now
1598+ # used instead of pgrep. pgrep_full is still passed through to ensure
1599+ # deprecation WARNS. lp1474030
1600+
1601+ unit_name = sentry_unit.info['unit_name']
1602+ self.log.debug('Checking that %s service restarted since %s on '
1603+ '%s' % (service, mtime, unit_name))
1604 time.sleep(sleep_time)
1605- proc_start_time = self._get_proc_start_time(sentry_unit, service,
1606- pgrep_full)
1607- while retry_count > 0 and not proc_start_time:
1608- self.log.debug('No pid file found for service %s, will retry %i '
1609- 'more times' % (service, retry_count))
1610- time.sleep(30)
1611- proc_start_time = self._get_proc_start_time(sentry_unit, service,
1612- pgrep_full)
1613- retry_count = retry_count - 1
1614+ proc_start_time = None
1615+ tries = 0
1616+ while tries <= retry_count and not proc_start_time:
1617+ try:
1618+ proc_start_time = self._get_proc_start_time(sentry_unit,
1619+ service,
1620+ pgrep_full)
1621+ self.log.debug('Attempt {} to get {} proc start time on {} '
1622+ 'OK'.format(tries, service, unit_name))
1623+ except IOError:
1624+ # NOTE(beisner) - race avoidance, proc may not exist yet.
1625+ # https://bugs.launchpad.net/charm-helpers/+bug/1474030
1626+ self.log.debug('Attempt {} to get {} proc start time on {} '
1627+ 'failed'.format(tries, service, unit_name))
1628+ time.sleep(retry_sleep_time)
1629+ tries += 1
1630
1631 if not proc_start_time:
1632 self.log.warn('No proc start time found, assuming service did '
1633 'not start')
1634 return False
1635 if proc_start_time >= mtime:
1636- self.log.debug('proc start time is newer than provided mtime'
1637- '(%s >= %s)' % (proc_start_time, mtime))
1638+ self.log.debug('Proc start time is newer than provided mtime'
1639+ '(%s >= %s) on %s (OK)' % (proc_start_time,
1640+ mtime, unit_name))
1641 return True
1642 else:
1643- self.log.warn('proc start time (%s) is older than provided mtime '
1644- '(%s), service did not restart' % (proc_start_time,
1645- mtime))
1646+ self.log.warn('Proc start time (%s) is older than provided mtime '
1647+ '(%s) on %s, service did not '
1648+ 'restart' % (proc_start_time, mtime, unit_name))
1649 return False
1650
1651 def config_updated_since(self, sentry_unit, filename, mtime,
1652@@ -374,8 +407,9 @@
1653 return False
1654
1655 def validate_service_config_changed(self, sentry_unit, mtime, service,
1656- filename, pgrep_full=False,
1657- sleep_time=20, retry_count=2):
1658+ filename, pgrep_full=None,
1659+ sleep_time=20, retry_count=2,
1660+ retry_sleep_time=30):
1661 """Check service and file were updated after mtime
1662
1663 Args:
1664@@ -383,9 +417,10 @@
1665 mtime (float): The epoch time to check against
1666 service (string): service name to look for in process table
1667 filename (string): The file to check mtime of
1668- pgrep_full (boolean): Use full command line search mode with pgrep
1669- sleep_time (int): Seconds to sleep before looking for process
1670+ pgrep_full: [Deprecated] Use full command line search mode with pgrep
1671+ sleep_time (int): Initial sleep in seconds to pass to test helpers
1672 retry_count (int): If service is not found, how many times to retry
1673+ retry_sleep_time (int): Time in seconds to wait between retries
1674
1675 Typical Usage:
1676 u = OpenStackAmuletUtils(ERROR)
1677@@ -402,15 +437,25 @@
1678 mtime, False if service is older than mtime or if service was
1679 not found or if filename was modified before mtime.
1680 """
1681- self.log.debug('Checking %s restarted since %s' % (service, mtime))
1682- time.sleep(sleep_time)
1683- service_restart = self.service_restarted_since(sentry_unit, mtime,
1684- service,
1685- pgrep_full=pgrep_full,
1686- sleep_time=0,
1687- retry_count=retry_count)
1688- config_update = self.config_updated_since(sentry_unit, filename, mtime,
1689- sleep_time=0)
1690+
1691+ # NOTE(beisner) pgrep_full is no longer implemented, as pidof is now
1692+ # used instead of pgrep. pgrep_full is still passed through to ensure
1693+ # deprecation WARNS. lp1474030
1694+
1695+ service_restart = self.service_restarted_since(
1696+ sentry_unit, mtime,
1697+ service,
1698+ pgrep_full=pgrep_full,
1699+ sleep_time=sleep_time,
1700+ retry_count=retry_count,
1701+ retry_sleep_time=retry_sleep_time)
1702+
1703+ config_update = self.config_updated_since(
1704+ sentry_unit,
1705+ filename,
1706+ mtime,
1707+ sleep_time=0)
1708+
1709 return service_restart and config_update
1710
1711 def get_sentry_time(self, sentry_unit):
1712@@ -428,7 +473,6 @@
1713 """Return a list of all Ubuntu releases in order of release."""
1714 _d = distro_info.UbuntuDistroInfo()
1715 _release_list = _d.all
1716- self.log.debug('Ubuntu release list: {}'.format(_release_list))
1717 return _release_list
1718
1719 def file_to_url(self, file_rel_path):
1720
1721=== modified file 'tests/charmhelpers/contrib/openstack/amulet/utils.py'
1722--- tests/charmhelpers/contrib/openstack/amulet/utils.py 2015-07-16 20:18:38 +0000
1723+++ tests/charmhelpers/contrib/openstack/amulet/utils.py 2015-09-22 06:12:36 +0000
1724@@ -27,6 +27,7 @@
1725 import heatclient.v1.client as heat_client
1726 import keystoneclient.v2_0 as keystone_client
1727 import novaclient.v1_1.client as nova_client
1728+import pika
1729 import swiftclient
1730
1731 from charmhelpers.contrib.amulet.utils import (
1732@@ -602,3 +603,361 @@
1733 self.log.debug('Ceph {} samples (OK): '
1734 '{}'.format(sample_type, samples))
1735 return None
1736+
1737+# rabbitmq/amqp specific helpers:
1738+ def add_rmq_test_user(self, sentry_units,
1739+ username="testuser1", password="changeme"):
1740+ """Add a test user via the first rmq juju unit, check connection as
1741+ the new user against all sentry units.
1742+
1743+ :param sentry_units: list of sentry unit pointers
1744+ :param username: amqp user name, default to testuser1
1745+ :param password: amqp user password
1746+ :returns: None if successful. Raise on error.
1747+ """
1748+ self.log.debug('Adding rmq user ({})...'.format(username))
1749+
1750+ # Check that user does not already exist
1751+ cmd_user_list = 'rabbitmqctl list_users'
1752+ output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_list)
1753+ if username in output:
1754+ self.log.warning('User ({}) already exists, returning '
1755+ 'gracefully.'.format(username))
1756+ return
1757+
1758+ perms = '".*" ".*" ".*"'
1759+ cmds = ['rabbitmqctl add_user {} {}'.format(username, password),
1760+ 'rabbitmqctl set_permissions {} {}'.format(username, perms)]
1761+
1762+ # Add user via first unit
1763+ for cmd in cmds:
1764+ output, _ = self.run_cmd_unit(sentry_units[0], cmd)
1765+
1766+ # Check connection against the other sentry_units
1767+ self.log.debug('Checking user connect against units...')
1768+ for sentry_unit in sentry_units:
1769+ connection = self.connect_amqp_by_unit(sentry_unit, ssl=False,
1770+ username=username,
1771+ password=password)
1772+ connection.close()
1773+
1774+ def delete_rmq_test_user(self, sentry_units, username="testuser1"):
1775+ """Delete a rabbitmq user via the first rmq juju unit.
1776+
1777+ :param sentry_units: list of sentry unit pointers
1778+ :param username: amqp user name, default to testuser1
1779+ :param password: amqp user password
1780+ :returns: None if successful or no such user.
1781+ """
1782+ self.log.debug('Deleting rmq user ({})...'.format(username))
1783+
1784+ # Check that the user exists
1785+ cmd_user_list = 'rabbitmqctl list_users'
1786+ output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_list)
1787+
1788+ if username not in output:
1789+ self.log.warning('User ({}) does not exist, returning '
1790+ 'gracefully.'.format(username))
1791+ return
1792+
1793+ # Delete the user
1794+ cmd_user_del = 'rabbitmqctl delete_user {}'.format(username)
1795+ output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_del)
1796+
1797+ def get_rmq_cluster_status(self, sentry_unit):
1798+ """Execute rabbitmq cluster status command on a unit and return
1799+ the full output.
1800+
1801+ :param unit: sentry unit
1802+ :returns: String containing console output of cluster status command
1803+ """
1804+ cmd = 'rabbitmqctl cluster_status'
1805+ output, _ = self.run_cmd_unit(sentry_unit, cmd)
1806+ self.log.debug('{} cluster_status:\n{}'.format(
1807+ sentry_unit.info['unit_name'], output))
1808+ return str(output)
1809+
1810+ def get_rmq_cluster_running_nodes(self, sentry_unit):
1811+ """Parse rabbitmqctl cluster_status output string, return list of
1812+ running rabbitmq cluster nodes.
1813+
1814+ :param unit: sentry unit
1815+ :returns: List containing node names of running nodes
1816+ """
1817+ # NOTE(beisner): rabbitmqctl cluster_status output is not
1818+ # json-parsable, do string chop foo, then json.loads that.
1819+ str_stat = self.get_rmq_cluster_status(sentry_unit)
1820+ if 'running_nodes' in str_stat:
1821+ pos_start = str_stat.find("{running_nodes,") + 15
1822+ pos_end = str_stat.find("]},", pos_start) + 1
1823+ str_run_nodes = str_stat[pos_start:pos_end].replace("'", '"')
1824+ run_nodes = json.loads(str_run_nodes)
1825+ return run_nodes
1826+ else:
1827+ return []
1828+
1829+ def validate_rmq_cluster_running_nodes(self, sentry_units):
1830+ """Check that all rmq unit hostnames are represented in the
1831+ cluster_status output of all units.
1832+
1833+ :param host_names: dict of juju unit names to host names
1834+ :param units: list of sentry unit pointers (all rmq units)
1835+ :returns: None if successful, otherwise return error message
1836+ """
1837+ host_names = self.get_unit_hostnames(sentry_units)
1838+ errors = []
1839+
1840+ # Query every unit for cluster_status running nodes
1841+ for query_unit in sentry_units:
1842+ query_unit_name = query_unit.info['unit_name']
1843+ running_nodes = self.get_rmq_cluster_running_nodes(query_unit)
1844+
1845+ # Confirm that every unit is represented in the queried unit's
1846+ # cluster_status running nodes output.
1847+ for validate_unit in sentry_units:
1848+ val_host_name = host_names[validate_unit.info['unit_name']]
1849+ val_node_name = 'rabbit@{}'.format(val_host_name)
1850+
1851+ if val_node_name not in running_nodes:
1852+ errors.append('Cluster member check failed on {}: {} not '
1853+ 'in {}\n'.format(query_unit_name,
1854+ val_node_name,
1855+ running_nodes))
1856+ if errors:
1857+ return ''.join(errors)
1858+
1859+ def rmq_ssl_is_enabled_on_unit(self, sentry_unit, port=None):
1860+ """Check a single juju rmq unit for ssl and port in the config file."""
1861+ host = sentry_unit.info['public-address']
1862+ unit_name = sentry_unit.info['unit_name']
1863+
1864+ conf_file = '/etc/rabbitmq/rabbitmq.config'
1865+ conf_contents = str(self.file_contents_safe(sentry_unit,
1866+ conf_file, max_wait=16))
1867+ # Checks
1868+ conf_ssl = 'ssl' in conf_contents
1869+ conf_port = str(port) in conf_contents
1870+
1871+ # Port explicitly checked in config
1872+ if port and conf_port and conf_ssl:
1873+ self.log.debug('SSL is enabled @{}:{} '
1874+ '({})'.format(host, port, unit_name))
1875+ return True
1876+ elif port and not conf_port and conf_ssl:
1877+ self.log.debug('SSL is enabled @{} but not on port {} '
1878+ '({})'.format(host, port, unit_name))
1879+ return False
1880+ # Port not checked (useful when checking that ssl is disabled)
1881+ elif not port and conf_ssl:
1882+ self.log.debug('SSL is enabled @{}:{} '
1883+ '({})'.format(host, port, unit_name))
1884+ return True
1885+ elif not port and not conf_ssl:
1886+ self.log.debug('SSL not enabled @{}:{} '
1887+ '({})'.format(host, port, unit_name))
1888+ return False
1889+ else:
1890+ msg = ('Unknown condition when checking SSL status @{}:{} '
1891+ '({})'.format(host, port, unit_name))
1892+ amulet.raise_status(amulet.FAIL, msg)
1893+
1894+ def validate_rmq_ssl_enabled_units(self, sentry_units, port=None):
1895+ """Check that ssl is enabled on rmq juju sentry units.
1896+
1897+ :param sentry_units: list of all rmq sentry units
1898+ :param port: optional ssl port override to validate
1899+ :returns: None if successful, otherwise return error message
1900+ """
1901+ for sentry_unit in sentry_units:
1902+ if not self.rmq_ssl_is_enabled_on_unit(sentry_unit, port=port):
1903+ return ('Unexpected condition: ssl is disabled on unit '
1904+ '({})'.format(sentry_unit.info['unit_name']))
1905+ return None
1906+
1907+ def validate_rmq_ssl_disabled_units(self, sentry_units):
1908+ """Check that ssl is enabled on listed rmq juju sentry units.
1909+
1910+ :param sentry_units: list of all rmq sentry units
1911+ :returns: True if successful. Raise on error.
1912+ """
1913+ for sentry_unit in sentry_units:
1914+ if self.rmq_ssl_is_enabled_on_unit(sentry_unit):
1915+ return ('Unexpected condition: ssl is enabled on unit '
1916+ '({})'.format(sentry_unit.info['unit_name']))
1917+ return None
1918+
1919+ def configure_rmq_ssl_on(self, sentry_units, deployment,
1920+ port=None, max_wait=60):
1921+ """Turn ssl charm config option on, with optional non-default
1922+ ssl port specification. Confirm that it is enabled on every
1923+ unit.
1924+
1925+ :param sentry_units: list of sentry units
1926+ :param deployment: amulet deployment object pointer
1927+ :param port: amqp port, use defaults if None
1928+ :param max_wait: maximum time to wait in seconds to confirm
1929+ :returns: None if successful. Raise on error.
1930+ """
1931+ self.log.debug('Setting ssl charm config option: on')
1932+
1933+ # Enable RMQ SSL
1934+ config = {'ssl': 'on'}
1935+ if port:
1936+ config['ssl_port'] = port
1937+
1938+ deployment.configure('rabbitmq-server', config)
1939+
1940+ # Confirm
1941+ tries = 0
1942+ ret = self.validate_rmq_ssl_enabled_units(sentry_units, port=port)
1943+ while ret and tries < (max_wait / 4):
1944+ time.sleep(4)
1945+ self.log.debug('Attempt {}: {}'.format(tries, ret))
1946+ ret = self.validate_rmq_ssl_enabled_units(sentry_units, port=port)
1947+ tries += 1
1948+
1949+ if ret:
1950+ amulet.raise_status(amulet.FAIL, ret)
1951+
1952+ def configure_rmq_ssl_off(self, sentry_units, deployment, max_wait=60):
1953+ """Turn ssl charm config option off, confirm that it is disabled
1954+ on every unit.
1955+
1956+ :param sentry_units: list of sentry units
1957+ :param deployment: amulet deployment object pointer
1958+ :param max_wait: maximum time to wait in seconds to confirm
1959+ :returns: None if successful. Raise on error.
1960+ """
1961+ self.log.debug('Setting ssl charm config option: off')
1962+
1963+ # Disable RMQ SSL
1964+ config = {'ssl': 'off'}
1965+ deployment.configure('rabbitmq-server', config)
1966+
1967+ # Confirm
1968+ tries = 0
1969+ ret = self.validate_rmq_ssl_disabled_units(sentry_units)
1970+ while ret and tries < (max_wait / 4):
1971+ time.sleep(4)
1972+ self.log.debug('Attempt {}: {}'.format(tries, ret))
1973+ ret = self.validate_rmq_ssl_disabled_units(sentry_units)
1974+ tries += 1
1975+
1976+ if ret:
1977+ amulet.raise_status(amulet.FAIL, ret)
1978+
1979+ def connect_amqp_by_unit(self, sentry_unit, ssl=False,
1980+ port=None, fatal=True,
1981+ username="testuser1", password="changeme"):
1982+ """Establish and return a pika amqp connection to the rabbitmq service
1983+ running on a rmq juju unit.
1984+
1985+ :param sentry_unit: sentry unit pointer
1986+ :param ssl: boolean, default to False
1987+ :param port: amqp port, use defaults if None
1988+ :param fatal: boolean, default to True (raises on connect error)
1989+ :param username: amqp user name, default to testuser1
1990+ :param password: amqp user password
1991+ :returns: pika amqp connection pointer or None if failed and non-fatal
1992+ """
1993+ host = sentry_unit.info['public-address']
1994+ unit_name = sentry_unit.info['unit_name']
1995+
1996+ # Default port logic if port is not specified
1997+ if ssl and not port:
1998+ port = 5671
1999+ elif not ssl and not port:
2000+ port = 5672
2001+
2002+ self.log.debug('Connecting to amqp on {}:{} ({}) as '
2003+ '{}...'.format(host, port, unit_name, username))
2004+
2005+ try:
2006+ credentials = pika.PlainCredentials(username, password)
2007+ parameters = pika.ConnectionParameters(host=host, port=port,
2008+ credentials=credentials,
2009+ ssl=ssl,
2010+ connection_attempts=3,
2011+ retry_delay=5,
2012+ socket_timeout=1)
2013+ connection = pika.BlockingConnection(parameters)
2014+ assert connection.server_properties['product'] == 'RabbitMQ'
2015+ self.log.debug('Connect OK')
2016+ return connection
2017+ except Exception as e:
2018+ msg = ('amqp connection failed to {}:{} as '
2019+ '{} ({})'.format(host, port, username, str(e)))
2020+ if fatal:
2021+ amulet.raise_status(amulet.FAIL, msg)
2022+ else:
2023+ self.log.warn(msg)
2024+ return None
2025+
2026+ def publish_amqp_message_by_unit(self, sentry_unit, message,
2027+ queue="test", ssl=False,
2028+ username="testuser1",
2029+ password="changeme",
2030+ port=None):
2031+ """Publish an amqp message to a rmq juju unit.
2032+
2033+ :param sentry_unit: sentry unit pointer
2034+ :param message: amqp message string
2035+ :param queue: message queue, default to test
2036+ :param username: amqp user name, default to testuser1
2037+ :param password: amqp user password
2038+ :param ssl: boolean, default to False
2039+ :param port: amqp port, use defaults if None
2040+ :returns: None. Raises exception if publish failed.
2041+ """
2042+ self.log.debug('Publishing message to {} queue:\n{}'.format(queue,
2043+ message))
2044+ connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl,
2045+ port=port,
2046+ username=username,
2047+ password=password)
2048+
2049+ # NOTE(beisner): extra debug here re: pika hang potential:
2050+ # https://github.com/pika/pika/issues/297
2051+ # https://groups.google.com/forum/#!topic/rabbitmq-users/Ja0iyfF0Szw
2052+ self.log.debug('Defining channel...')
2053+ channel = connection.channel()
2054+ self.log.debug('Declaring queue...')
2055+ channel.queue_declare(queue=queue, auto_delete=False, durable=True)
2056+ self.log.debug('Publishing message...')
2057+ channel.basic_publish(exchange='', routing_key=queue, body=message)
2058+ self.log.debug('Closing channel...')
2059+ channel.close()
2060+ self.log.debug('Closing connection...')
2061+ connection.close()
2062+
2063+ def get_amqp_message_by_unit(self, sentry_unit, queue="test",
2064+ username="testuser1",
2065+ password="changeme",
2066+ ssl=False, port=None):
2067+ """Get an amqp message from a rmq juju unit.
2068+
2069+ :param sentry_unit: sentry unit pointer
2070+ :param queue: message queue, default to test
2071+ :param username: amqp user name, default to testuser1
2072+ :param password: amqp user password
2073+ :param ssl: boolean, default to False
2074+ :param port: amqp port, use defaults if None
2075+ :returns: amqp message body as string. Raise if get fails.
2076+ """
2077+ connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl,
2078+ port=port,
2079+ username=username,
2080+ password=password)
2081+ channel = connection.channel()
2082+ method_frame, _, body = channel.basic_get(queue)
2083+
2084+ if method_frame:
2085+ self.log.debug('Retreived message from {} queue:\n{}'.format(queue,
2086+ body))
2087+ channel.basic_ack(method_frame.delivery_tag)
2088+ channel.close()
2089+ connection.close()
2090+ return body
2091+ else:
2092+ msg = 'No message retrieved.'
2093+ amulet.raise_status(amulet.FAIL, msg)
2094
2095=== modified file 'unit_tests/test_nova_cc_utils.py'
2096--- unit_tests/test_nova_cc_utils.py 2015-06-10 15:48:34 +0000
2097+++ unit_tests/test_nova_cc_utils.py 2015-09-22 06:12:36 +0000
2098@@ -416,8 +416,8 @@
2099 @patch.object(utils, 'ssh_known_host_key')
2100 @patch('subprocess.check_output')
2101 def test_add_known_host_exists(self, check_output, host_key, rm):
2102- check_output.return_value = 'fookey'
2103- host_key.return_value = 'fookey'
2104+ check_output.return_value = '|1|= fookey'
2105+ host_key.return_value = '|1|= fookey'
2106 with patch_open() as (_open, _file):
2107 utils.add_known_host('foohost')
2108 self.assertFalse(rm.called)
2109@@ -429,8 +429,8 @@
2110 @patch('subprocess.check_output')
2111 def test_add_known_host_exists_outdated(
2112 self, check_output, host_key, rm, known_hosts):
2113- check_output.return_value = 'fookey'
2114- host_key.return_value = 'fookey_old'
2115+ check_output.return_value = '|1|= fookey'
2116+ host_key.return_value = '|1|= fookey_old'
2117 with patch_open() as (_open, _file):
2118 utils.add_known_host('foohost', None, None)
2119 rm.assert_called_with('foohost', None, None)
2120@@ -441,13 +441,13 @@
2121 @patch('subprocess.check_output')
2122 def test_add_known_host_exists_added(
2123 self, check_output, host_key, rm, known_hosts):
2124- check_output.return_value = 'fookey'
2125+ check_output.return_value = '|1|= fookey'
2126 host_key.return_value = None
2127 with patch_open() as (_open, _file):
2128 _file.write = MagicMock()
2129 utils.add_known_host('foohost')
2130 self.assertFalse(rm.called)
2131- _file.write.assert_called_with('fookey\n')
2132+ _file.write.assert_called_with('|1|= fookey\n')
2133
2134 @patch('__builtin__.open')
2135 @patch('os.mkdir')

Subscribers

People subscribed via source and target branches