Merge lp:~hopem/charms/trusty/neutron-openvswitch/lp1497517 into lp:~openstack-charmers-archive/charms/trusty/neutron-openvswitch/next

Proposed by Edward Hope-Morley
Status: Merged
Merged at revision: 88
Proposed branch: lp:~hopem/charms/trusty/neutron-openvswitch/lp1497517
Merge into: lp:~openstack-charmers-archive/charms/trusty/neutron-openvswitch/next
Diff against target: 2494 lines (+1748/-136)
16 files modified
charm-helpers-hooks.yaml (+1/-1)
hooks/charmhelpers/contrib/network/ip.py (+5/-3)
hooks/charmhelpers/contrib/openstack/amulet/deployment.py (+23/-9)
hooks/charmhelpers/contrib/openstack/amulet/utils.py (+359/-0)
hooks/charmhelpers/contrib/openstack/context.py (+72/-17)
hooks/charmhelpers/contrib/openstack/templating.py (+30/-2)
hooks/charmhelpers/contrib/openstack/utils.py (+224/-1)
hooks/charmhelpers/contrib/storage/linux/ceph.py (+226/-13)
hooks/charmhelpers/core/host.py (+32/-16)
hooks/charmhelpers/core/hugepage.py (+8/-1)
hooks/charmhelpers/core/kernel.py (+68/-0)
hooks/charmhelpers/core/strutils.py (+30/-0)
tests/charmhelpers/contrib/amulet/deployment.py (+4/-2)
tests/charmhelpers/contrib/amulet/utils.py (+284/-62)
tests/charmhelpers/contrib/openstack/amulet/deployment.py (+23/-9)
tests/charmhelpers/contrib/openstack/amulet/utils.py (+359/-0)
To merge this branch: bzr merge lp:~hopem/charms/trusty/neutron-openvswitch/lp1497517
Reviewer Review Type Date Requested Status
Liam Young (community) Approve
Review via email: mp+271888@code.launchpad.net
To post a comment you must log in.
Revision history for this message
uosci-testing-bot (uosci-testing-bot) wrote :

charm_unit_test #9612 neutron-openvswitch-next for hopem mp271888
    UNIT OK: passed

Build: http://10.245.162.77:8080/job/charm_unit_test/9612/

Revision history for this message
uosci-testing-bot (uosci-testing-bot) wrote :

charm_lint_check #10425 neutron-openvswitch-next for hopem mp271888
    LINT OK: passed

Build: http://10.245.162.77:8080/job/charm_lint_check/10425/

Revision history for this message
uosci-testing-bot (uosci-testing-bot) wrote :

charm_amulet_test #6563 neutron-openvswitch-next for hopem mp271888
    AMULET FAIL: amulet-test failed

AMULET Results (max last 2 lines):
make: *** [test] Error 1
ERROR:root:Make target returned non-zero.

Full amulet test output: http://paste.ubuntu.com/12519382/
Build: http://10.245.162.77:8080/job/charm_amulet_test/6563/

88. By Edward Hope-Morley

sync charmhelpers

Revision history for this message
uosci-testing-bot (uosci-testing-bot) wrote :

charm_lint_check #10774 neutron-openvswitch-next for hopem mp271888
    LINT OK: passed

Build: http://10.245.162.77:8080/job/charm_lint_check/10774/

Revision history for this message
uosci-testing-bot (uosci-testing-bot) wrote :

charm_unit_test #9953 neutron-openvswitch-next for hopem mp271888
    UNIT OK: passed

Build: http://10.245.162.77:8080/job/charm_unit_test/9953/

Revision history for this message
uosci-testing-bot (uosci-testing-bot) wrote :

charm_amulet_test #6779 neutron-openvswitch-next for hopem mp271888
    AMULET FAIL: amulet-test failed

AMULET Results (max last 2 lines):
make: *** [test] Error 1
ERROR:root:Make target returned non-zero.

Full amulet test output: http://paste.ubuntu.com/12556955/
Build: http://10.245.162.77:8080/job/charm_amulet_test/6779/

Revision history for this message
Liam Young (gnuoy) wrote :

LGTM, amulet fail looks unconnected

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'charm-helpers-hooks.yaml'
2--- charm-helpers-hooks.yaml 2015-08-20 09:27:14 +0000
3+++ charm-helpers-hooks.yaml 2015-09-25 16:48:59 +0000
4@@ -1,4 +1,4 @@
5-branch: lp:charm-helpers
6+branch: lp:~hopem/charm-helpers/lp1497517
7 destination: hooks/charmhelpers
8 include:
9 - core
10
11=== modified file 'hooks/charmhelpers/contrib/network/ip.py'
12--- hooks/charmhelpers/contrib/network/ip.py 2015-09-03 09:45:37 +0000
13+++ hooks/charmhelpers/contrib/network/ip.py 2015-09-25 16:48:59 +0000
14@@ -23,7 +23,7 @@
15 from functools import partial
16
17 from charmhelpers.core.hookenv import unit_get
18-from charmhelpers.fetch import apt_install
19+from charmhelpers.fetch import apt_install, apt_update
20 from charmhelpers.core.hookenv import (
21 log,
22 WARNING,
23@@ -32,13 +32,15 @@
24 try:
25 import netifaces
26 except ImportError:
27- apt_install('python-netifaces')
28+ apt_update(fatal=True)
29+ apt_install('python-netifaces', fatal=True)
30 import netifaces
31
32 try:
33 import netaddr
34 except ImportError:
35- apt_install('python-netaddr')
36+ apt_update(fatal=True)
37+ apt_install('python-netaddr', fatal=True)
38 import netaddr
39
40
41
42=== modified file 'hooks/charmhelpers/contrib/openstack/amulet/deployment.py'
43--- hooks/charmhelpers/contrib/openstack/amulet/deployment.py 2015-08-18 21:19:31 +0000
44+++ hooks/charmhelpers/contrib/openstack/amulet/deployment.py 2015-09-25 16:48:59 +0000
45@@ -44,20 +44,31 @@
46 Determine if the local branch being tested is derived from its
47 stable or next (dev) branch, and based on this, use the corresonding
48 stable or next branches for the other_services."""
49+
50+ # Charms outside the lp:~openstack-charmers namespace
51 base_charms = ['mysql', 'mongodb', 'nrpe']
52
53+ # Force these charms to current series even when using an older series.
54+ # ie. Use trusty/nrpe even when series is precise, as the P charm
55+ # does not possess the necessary external master config and hooks.
56+ force_series_current = ['nrpe']
57+
58 if self.series in ['precise', 'trusty']:
59 base_series = self.series
60 else:
61 base_series = self.current_next
62
63- if self.stable:
64- for svc in other_services:
65+ for svc in other_services:
66+ if svc['name'] in force_series_current:
67+ base_series = self.current_next
68+ # If a location has been explicitly set, use it
69+ if svc.get('location'):
70+ continue
71+ if self.stable:
72 temp = 'lp:charms/{}/{}'
73 svc['location'] = temp.format(base_series,
74 svc['name'])
75- else:
76- for svc in other_services:
77+ else:
78 if svc['name'] in base_charms:
79 temp = 'lp:charms/{}/{}'
80 svc['location'] = temp.format(base_series,
81@@ -66,6 +77,7 @@
82 temp = 'lp:~openstack-charmers/charms/{}/{}/next'
83 svc['location'] = temp.format(self.current_next,
84 svc['name'])
85+
86 return other_services
87
88 def _add_services(self, this_service, other_services):
89@@ -77,21 +89,23 @@
90
91 services = other_services
92 services.append(this_service)
93+
94+ # Charms which should use the source config option
95 use_source = ['mysql', 'mongodb', 'rabbitmq-server', 'ceph',
96 'ceph-osd', 'ceph-radosgw']
97- # Most OpenStack subordinate charms do not expose an origin option
98- # as that is controlled by the principle.
99- ignore = ['cinder-ceph', 'hacluster', 'neutron-openvswitch', 'nrpe']
100+
101+ # Charms which can not use openstack-origin, ie. many subordinates
102+ no_origin = ['cinder-ceph', 'hacluster', 'neutron-openvswitch', 'nrpe']
103
104 if self.openstack:
105 for svc in services:
106- if svc['name'] not in use_source + ignore:
107+ if svc['name'] not in use_source + no_origin:
108 config = {'openstack-origin': self.openstack}
109 self.d.configure(svc['name'], config)
110
111 if self.source:
112 for svc in services:
113- if svc['name'] in use_source and svc['name'] not in ignore:
114+ if svc['name'] in use_source and svc['name'] not in no_origin:
115 config = {'source': self.source}
116 self.d.configure(svc['name'], config)
117
118
119=== modified file 'hooks/charmhelpers/contrib/openstack/amulet/utils.py'
120--- hooks/charmhelpers/contrib/openstack/amulet/utils.py 2015-07-16 20:18:23 +0000
121+++ hooks/charmhelpers/contrib/openstack/amulet/utils.py 2015-09-25 16:48:59 +0000
122@@ -27,6 +27,7 @@
123 import heatclient.v1.client as heat_client
124 import keystoneclient.v2_0 as keystone_client
125 import novaclient.v1_1.client as nova_client
126+import pika
127 import swiftclient
128
129 from charmhelpers.contrib.amulet.utils import (
130@@ -602,3 +603,361 @@
131 self.log.debug('Ceph {} samples (OK): '
132 '{}'.format(sample_type, samples))
133 return None
134+
135+# rabbitmq/amqp specific helpers:
136+ def add_rmq_test_user(self, sentry_units,
137+ username="testuser1", password="changeme"):
138+ """Add a test user via the first rmq juju unit, check connection as
139+ the new user against all sentry units.
140+
141+ :param sentry_units: list of sentry unit pointers
142+ :param username: amqp user name, default to testuser1
143+ :param password: amqp user password
144+ :returns: None if successful. Raise on error.
145+ """
146+ self.log.debug('Adding rmq user ({})...'.format(username))
147+
148+ # Check that user does not already exist
149+ cmd_user_list = 'rabbitmqctl list_users'
150+ output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_list)
151+ if username in output:
152+ self.log.warning('User ({}) already exists, returning '
153+ 'gracefully.'.format(username))
154+ return
155+
156+ perms = '".*" ".*" ".*"'
157+ cmds = ['rabbitmqctl add_user {} {}'.format(username, password),
158+ 'rabbitmqctl set_permissions {} {}'.format(username, perms)]
159+
160+ # Add user via first unit
161+ for cmd in cmds:
162+ output, _ = self.run_cmd_unit(sentry_units[0], cmd)
163+
164+ # Check connection against the other sentry_units
165+ self.log.debug('Checking user connect against units...')
166+ for sentry_unit in sentry_units:
167+ connection = self.connect_amqp_by_unit(sentry_unit, ssl=False,
168+ username=username,
169+ password=password)
170+ connection.close()
171+
172+ def delete_rmq_test_user(self, sentry_units, username="testuser1"):
173+ """Delete a rabbitmq user via the first rmq juju unit.
174+
175+ :param sentry_units: list of sentry unit pointers
176+ :param username: amqp user name, default to testuser1
177+ :param password: amqp user password
178+ :returns: None if successful or no such user.
179+ """
180+ self.log.debug('Deleting rmq user ({})...'.format(username))
181+
182+ # Check that the user exists
183+ cmd_user_list = 'rabbitmqctl list_users'
184+ output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_list)
185+
186+ if username not in output:
187+ self.log.warning('User ({}) does not exist, returning '
188+ 'gracefully.'.format(username))
189+ return
190+
191+ # Delete the user
192+ cmd_user_del = 'rabbitmqctl delete_user {}'.format(username)
193+ output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_del)
194+
195+ def get_rmq_cluster_status(self, sentry_unit):
196+ """Execute rabbitmq cluster status command on a unit and return
197+ the full output.
198+
199+ :param unit: sentry unit
200+ :returns: String containing console output of cluster status command
201+ """
202+ cmd = 'rabbitmqctl cluster_status'
203+ output, _ = self.run_cmd_unit(sentry_unit, cmd)
204+ self.log.debug('{} cluster_status:\n{}'.format(
205+ sentry_unit.info['unit_name'], output))
206+ return str(output)
207+
208+ def get_rmq_cluster_running_nodes(self, sentry_unit):
209+ """Parse rabbitmqctl cluster_status output string, return list of
210+ running rabbitmq cluster nodes.
211+
212+ :param unit: sentry unit
213+ :returns: List containing node names of running nodes
214+ """
215+ # NOTE(beisner): rabbitmqctl cluster_status output is not
216+ # json-parsable, do string chop foo, then json.loads that.
217+ str_stat = self.get_rmq_cluster_status(sentry_unit)
218+ if 'running_nodes' in str_stat:
219+ pos_start = str_stat.find("{running_nodes,") + 15
220+ pos_end = str_stat.find("]},", pos_start) + 1
221+ str_run_nodes = str_stat[pos_start:pos_end].replace("'", '"')
222+ run_nodes = json.loads(str_run_nodes)
223+ return run_nodes
224+ else:
225+ return []
226+
227+ def validate_rmq_cluster_running_nodes(self, sentry_units):
228+ """Check that all rmq unit hostnames are represented in the
229+ cluster_status output of all units.
230+
231+ :param host_names: dict of juju unit names to host names
232+ :param units: list of sentry unit pointers (all rmq units)
233+ :returns: None if successful, otherwise return error message
234+ """
235+ host_names = self.get_unit_hostnames(sentry_units)
236+ errors = []
237+
238+ # Query every unit for cluster_status running nodes
239+ for query_unit in sentry_units:
240+ query_unit_name = query_unit.info['unit_name']
241+ running_nodes = self.get_rmq_cluster_running_nodes(query_unit)
242+
243+ # Confirm that every unit is represented in the queried unit's
244+ # cluster_status running nodes output.
245+ for validate_unit in sentry_units:
246+ val_host_name = host_names[validate_unit.info['unit_name']]
247+ val_node_name = 'rabbit@{}'.format(val_host_name)
248+
249+ if val_node_name not in running_nodes:
250+ errors.append('Cluster member check failed on {}: {} not '
251+ 'in {}\n'.format(query_unit_name,
252+ val_node_name,
253+ running_nodes))
254+ if errors:
255+ return ''.join(errors)
256+
257+ def rmq_ssl_is_enabled_on_unit(self, sentry_unit, port=None):
258+ """Check a single juju rmq unit for ssl and port in the config file."""
259+ host = sentry_unit.info['public-address']
260+ unit_name = sentry_unit.info['unit_name']
261+
262+ conf_file = '/etc/rabbitmq/rabbitmq.config'
263+ conf_contents = str(self.file_contents_safe(sentry_unit,
264+ conf_file, max_wait=16))
265+ # Checks
266+ conf_ssl = 'ssl' in conf_contents
267+ conf_port = str(port) in conf_contents
268+
269+ # Port explicitly checked in config
270+ if port and conf_port and conf_ssl:
271+ self.log.debug('SSL is enabled @{}:{} '
272+ '({})'.format(host, port, unit_name))
273+ return True
274+ elif port and not conf_port and conf_ssl:
275+ self.log.debug('SSL is enabled @{} but not on port {} '
276+ '({})'.format(host, port, unit_name))
277+ return False
278+ # Port not checked (useful when checking that ssl is disabled)
279+ elif not port and conf_ssl:
280+ self.log.debug('SSL is enabled @{}:{} '
281+ '({})'.format(host, port, unit_name))
282+ return True
283+ elif not port and not conf_ssl:
284+ self.log.debug('SSL not enabled @{}:{} '
285+ '({})'.format(host, port, unit_name))
286+ return False
287+ else:
288+ msg = ('Unknown condition when checking SSL status @{}:{} '
289+ '({})'.format(host, port, unit_name))
290+ amulet.raise_status(amulet.FAIL, msg)
291+
292+ def validate_rmq_ssl_enabled_units(self, sentry_units, port=None):
293+ """Check that ssl is enabled on rmq juju sentry units.
294+
295+ :param sentry_units: list of all rmq sentry units
296+ :param port: optional ssl port override to validate
297+ :returns: None if successful, otherwise return error message
298+ """
299+ for sentry_unit in sentry_units:
300+ if not self.rmq_ssl_is_enabled_on_unit(sentry_unit, port=port):
301+ return ('Unexpected condition: ssl is disabled on unit '
302+ '({})'.format(sentry_unit.info['unit_name']))
303+ return None
304+
305+ def validate_rmq_ssl_disabled_units(self, sentry_units):
306+ """Check that ssl is enabled on listed rmq juju sentry units.
307+
308+ :param sentry_units: list of all rmq sentry units
309+ :returns: True if successful. Raise on error.
310+ """
311+ for sentry_unit in sentry_units:
312+ if self.rmq_ssl_is_enabled_on_unit(sentry_unit):
313+ return ('Unexpected condition: ssl is enabled on unit '
314+ '({})'.format(sentry_unit.info['unit_name']))
315+ return None
316+
317+ def configure_rmq_ssl_on(self, sentry_units, deployment,
318+ port=None, max_wait=60):
319+ """Turn ssl charm config option on, with optional non-default
320+ ssl port specification. Confirm that it is enabled on every
321+ unit.
322+
323+ :param sentry_units: list of sentry units
324+ :param deployment: amulet deployment object pointer
325+ :param port: amqp port, use defaults if None
326+ :param max_wait: maximum time to wait in seconds to confirm
327+ :returns: None if successful. Raise on error.
328+ """
329+ self.log.debug('Setting ssl charm config option: on')
330+
331+ # Enable RMQ SSL
332+ config = {'ssl': 'on'}
333+ if port:
334+ config['ssl_port'] = port
335+
336+ deployment.configure('rabbitmq-server', config)
337+
338+ # Confirm
339+ tries = 0
340+ ret = self.validate_rmq_ssl_enabled_units(sentry_units, port=port)
341+ while ret and tries < (max_wait / 4):
342+ time.sleep(4)
343+ self.log.debug('Attempt {}: {}'.format(tries, ret))
344+ ret = self.validate_rmq_ssl_enabled_units(sentry_units, port=port)
345+ tries += 1
346+
347+ if ret:
348+ amulet.raise_status(amulet.FAIL, ret)
349+
350+ def configure_rmq_ssl_off(self, sentry_units, deployment, max_wait=60):
351+ """Turn ssl charm config option off, confirm that it is disabled
352+ on every unit.
353+
354+ :param sentry_units: list of sentry units
355+ :param deployment: amulet deployment object pointer
356+ :param max_wait: maximum time to wait in seconds to confirm
357+ :returns: None if successful. Raise on error.
358+ """
359+ self.log.debug('Setting ssl charm config option: off')
360+
361+ # Disable RMQ SSL
362+ config = {'ssl': 'off'}
363+ deployment.configure('rabbitmq-server', config)
364+
365+ # Confirm
366+ tries = 0
367+ ret = self.validate_rmq_ssl_disabled_units(sentry_units)
368+ while ret and tries < (max_wait / 4):
369+ time.sleep(4)
370+ self.log.debug('Attempt {}: {}'.format(tries, ret))
371+ ret = self.validate_rmq_ssl_disabled_units(sentry_units)
372+ tries += 1
373+
374+ if ret:
375+ amulet.raise_status(amulet.FAIL, ret)
376+
377+ def connect_amqp_by_unit(self, sentry_unit, ssl=False,
378+ port=None, fatal=True,
379+ username="testuser1", password="changeme"):
380+ """Establish and return a pika amqp connection to the rabbitmq service
381+ running on a rmq juju unit.
382+
383+ :param sentry_unit: sentry unit pointer
384+ :param ssl: boolean, default to False
385+ :param port: amqp port, use defaults if None
386+ :param fatal: boolean, default to True (raises on connect error)
387+ :param username: amqp user name, default to testuser1
388+ :param password: amqp user password
389+ :returns: pika amqp connection pointer or None if failed and non-fatal
390+ """
391+ host = sentry_unit.info['public-address']
392+ unit_name = sentry_unit.info['unit_name']
393+
394+ # Default port logic if port is not specified
395+ if ssl and not port:
396+ port = 5671
397+ elif not ssl and not port:
398+ port = 5672
399+
400+ self.log.debug('Connecting to amqp on {}:{} ({}) as '
401+ '{}...'.format(host, port, unit_name, username))
402+
403+ try:
404+ credentials = pika.PlainCredentials(username, password)
405+ parameters = pika.ConnectionParameters(host=host, port=port,
406+ credentials=credentials,
407+ ssl=ssl,
408+ connection_attempts=3,
409+ retry_delay=5,
410+ socket_timeout=1)
411+ connection = pika.BlockingConnection(parameters)
412+ assert connection.server_properties['product'] == 'RabbitMQ'
413+ self.log.debug('Connect OK')
414+ return connection
415+ except Exception as e:
416+ msg = ('amqp connection failed to {}:{} as '
417+ '{} ({})'.format(host, port, username, str(e)))
418+ if fatal:
419+ amulet.raise_status(amulet.FAIL, msg)
420+ else:
421+ self.log.warn(msg)
422+ return None
423+
424+ def publish_amqp_message_by_unit(self, sentry_unit, message,
425+ queue="test", ssl=False,
426+ username="testuser1",
427+ password="changeme",
428+ port=None):
429+ """Publish an amqp message to a rmq juju unit.
430+
431+ :param sentry_unit: sentry unit pointer
432+ :param message: amqp message string
433+ :param queue: message queue, default to test
434+ :param username: amqp user name, default to testuser1
435+ :param password: amqp user password
436+ :param ssl: boolean, default to False
437+ :param port: amqp port, use defaults if None
438+ :returns: None. Raises exception if publish failed.
439+ """
440+ self.log.debug('Publishing message to {} queue:\n{}'.format(queue,
441+ message))
442+ connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl,
443+ port=port,
444+ username=username,
445+ password=password)
446+
447+ # NOTE(beisner): extra debug here re: pika hang potential:
448+ # https://github.com/pika/pika/issues/297
449+ # https://groups.google.com/forum/#!topic/rabbitmq-users/Ja0iyfF0Szw
450+ self.log.debug('Defining channel...')
451+ channel = connection.channel()
452+ self.log.debug('Declaring queue...')
453+ channel.queue_declare(queue=queue, auto_delete=False, durable=True)
454+ self.log.debug('Publishing message...')
455+ channel.basic_publish(exchange='', routing_key=queue, body=message)
456+ self.log.debug('Closing channel...')
457+ channel.close()
458+ self.log.debug('Closing connection...')
459+ connection.close()
460+
461+ def get_amqp_message_by_unit(self, sentry_unit, queue="test",
462+ username="testuser1",
463+ password="changeme",
464+ ssl=False, port=None):
465+ """Get an amqp message from a rmq juju unit.
466+
467+ :param sentry_unit: sentry unit pointer
468+ :param queue: message queue, default to test
469+ :param username: amqp user name, default to testuser1
470+ :param password: amqp user password
471+ :param ssl: boolean, default to False
472+ :param port: amqp port, use defaults if None
473+ :returns: amqp message body as string. Raise if get fails.
474+ """
475+ connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl,
476+ port=port,
477+ username=username,
478+ password=password)
479+ channel = connection.channel()
480+ method_frame, _, body = channel.basic_get(queue)
481+
482+ if method_frame:
483+ self.log.debug('Retreived message from {} queue:\n{}'.format(queue,
484+ body))
485+ channel.basic_ack(method_frame.delivery_tag)
486+ channel.close()
487+ connection.close()
488+ return body
489+ else:
490+ msg = 'No message retrieved.'
491+ amulet.raise_status(amulet.FAIL, msg)
492
493=== modified file 'hooks/charmhelpers/contrib/openstack/context.py'
494--- hooks/charmhelpers/contrib/openstack/context.py 2015-09-03 09:45:37 +0000
495+++ hooks/charmhelpers/contrib/openstack/context.py 2015-09-25 16:48:59 +0000
496@@ -14,6 +14,7 @@
497 # You should have received a copy of the GNU Lesser General Public License
498 # along with charm-helpers. If not, see <http://www.gnu.org/licenses/>.
499
500+import glob
501 import json
502 import os
503 import re
504@@ -194,10 +195,50 @@
505 class OSContextGenerator(object):
506 """Base class for all context generators."""
507 interfaces = []
508+ related = False
509+ complete = False
510+ missing_data = []
511
512 def __call__(self):
513 raise NotImplementedError
514
515+ def context_complete(self, ctxt):
516+ """Check for missing data for the required context data.
517+ Set self.missing_data if it exists and return False.
518+ Set self.complete if no missing data and return True.
519+ """
520+ # Fresh start
521+ self.complete = False
522+ self.missing_data = []
523+ for k, v in six.iteritems(ctxt):
524+ if v is None or v == '':
525+ if k not in self.missing_data:
526+ self.missing_data.append(k)
527+
528+ if self.missing_data:
529+ self.complete = False
530+ log('Missing required data: %s' % ' '.join(self.missing_data), level=INFO)
531+ else:
532+ self.complete = True
533+ return self.complete
534+
535+ def get_related(self):
536+ """Check if any of the context interfaces have relation ids.
537+ Set self.related and return True if one of the interfaces
538+ has relation ids.
539+ """
540+ # Fresh start
541+ self.related = False
542+ try:
543+ for interface in self.interfaces:
544+ if relation_ids(interface):
545+ self.related = True
546+ return self.related
547+ except AttributeError as e:
548+ log("{} {}"
549+ "".format(self, e), 'INFO')
550+ return self.related
551+
552
553 class SharedDBContext(OSContextGenerator):
554 interfaces = ['shared-db']
555@@ -213,6 +254,7 @@
556 self.database = database
557 self.user = user
558 self.ssl_dir = ssl_dir
559+ self.rel_name = self.interfaces[0]
560
561 def __call__(self):
562 self.database = self.database or config('database')
563@@ -246,6 +288,7 @@
564 password_setting = self.relation_prefix + '_password'
565
566 for rid in relation_ids(self.interfaces[0]):
567+ self.related = True
568 for unit in related_units(rid):
569 rdata = relation_get(rid=rid, unit=unit)
570 host = rdata.get('db_host')
571@@ -257,7 +300,7 @@
572 'database_password': rdata.get(password_setting),
573 'database_type': 'mysql'
574 }
575- if context_complete(ctxt):
576+ if self.context_complete(ctxt):
577 db_ssl(rdata, ctxt, self.ssl_dir)
578 return ctxt
579 return {}
580@@ -278,6 +321,7 @@
581
582 ctxt = {}
583 for rid in relation_ids(self.interfaces[0]):
584+ self.related = True
585 for unit in related_units(rid):
586 rel_host = relation_get('host', rid=rid, unit=unit)
587 rel_user = relation_get('user', rid=rid, unit=unit)
588@@ -287,7 +331,7 @@
589 'database_user': rel_user,
590 'database_password': rel_passwd,
591 'database_type': 'postgresql'}
592- if context_complete(ctxt):
593+ if self.context_complete(ctxt):
594 return ctxt
595
596 return {}
597@@ -348,6 +392,7 @@
598 ctxt['signing_dir'] = cachedir
599
600 for rid in relation_ids(self.rel_name):
601+ self.related = True
602 for unit in related_units(rid):
603 rdata = relation_get(rid=rid, unit=unit)
604 serv_host = rdata.get('service_host')
605@@ -366,7 +411,7 @@
606 'service_protocol': svc_protocol,
607 'auth_protocol': auth_protocol})
608
609- if context_complete(ctxt):
610+ if self.context_complete(ctxt):
611 # NOTE(jamespage) this is required for >= icehouse
612 # so a missing value just indicates keystone needs
613 # upgrading
614@@ -405,6 +450,7 @@
615 ctxt = {}
616 for rid in relation_ids(self.rel_name):
617 ha_vip_only = False
618+ self.related = True
619 for unit in related_units(rid):
620 if relation_get('clustered', rid=rid, unit=unit):
621 ctxt['clustered'] = True
622@@ -437,7 +483,7 @@
623 ha_vip_only = relation_get('ha-vip-only',
624 rid=rid, unit=unit) is not None
625
626- if context_complete(ctxt):
627+ if self.context_complete(ctxt):
628 if 'rabbit_ssl_ca' in ctxt:
629 if not self.ssl_dir:
630 log("Charm not setup for ssl support but ssl ca "
631@@ -469,7 +515,7 @@
632 ctxt['oslo_messaging_flags'] = config_flags_parser(
633 oslo_messaging_flags)
634
635- if not context_complete(ctxt):
636+ if not self.complete:
637 return {}
638
639 return ctxt
640@@ -485,13 +531,15 @@
641
642 log('Generating template context for ceph', level=DEBUG)
643 mon_hosts = []
644- auth = None
645- key = None
646- use_syslog = str(config('use-syslog')).lower()
647+ ctxt = {
648+ 'use_syslog': str(config('use-syslog')).lower()
649+ }
650 for rid in relation_ids('ceph'):
651 for unit in related_units(rid):
652- auth = relation_get('auth', rid=rid, unit=unit)
653- key = relation_get('key', rid=rid, unit=unit)
654+ if not ctxt.get('auth'):
655+ ctxt['auth'] = relation_get('auth', rid=rid, unit=unit)
656+ if not ctxt.get('key'):
657+ ctxt['key'] = relation_get('key', rid=rid, unit=unit)
658 ceph_pub_addr = relation_get('ceph-public-address', rid=rid,
659 unit=unit)
660 unit_priv_addr = relation_get('private-address', rid=rid,
661@@ -500,15 +548,12 @@
662 ceph_addr = format_ipv6_addr(ceph_addr) or ceph_addr
663 mon_hosts.append(ceph_addr)
664
665- ctxt = {'mon_hosts': ' '.join(sorted(mon_hosts)),
666- 'auth': auth,
667- 'key': key,
668- 'use_syslog': use_syslog}
669+ ctxt['mon_hosts'] = ' '.join(sorted(mon_hosts))
670
671 if not os.path.isdir('/etc/ceph'):
672 os.mkdir('/etc/ceph')
673
674- if not context_complete(ctxt):
675+ if not self.context_complete(ctxt):
676 return {}
677
678 ensure_packages(['ceph-common'])
679@@ -1334,8 +1379,18 @@
680 ports = mappings.values()
681 napi_settings = NeutronAPIContext()()
682 mtu = napi_settings.get('network_device_mtu')
683+ all_ports = set()
684+ # If any of ports is a vlan device, its underlying device must have
685+ # mtu applied first.
686+ for port in ports:
687+ for lport in glob.glob("/sys/class/net/%s/lower_*" % port):
688+ lport = os.path.basename(lport)
689+ all_ports.add(lport.split('_')[1])
690+
691+ all_ports = list(all_ports)
692+ all_ports.extend(ports)
693 if mtu:
694- ctxt["devs"] = '\\n'.join(ports)
695+ ctxt["devs"] = '\\n'.join(all_ports)
696 ctxt['mtu'] = mtu
697
698 return ctxt
699@@ -1367,6 +1422,6 @@
700 'auth_protocol':
701 rdata.get('auth_protocol') or 'http',
702 }
703- if context_complete(ctxt):
704+ if self.context_complete(ctxt):
705 return ctxt
706 return {}
707
708=== modified file 'hooks/charmhelpers/contrib/openstack/templating.py'
709--- hooks/charmhelpers/contrib/openstack/templating.py 2015-07-29 10:51:24 +0000
710+++ hooks/charmhelpers/contrib/openstack/templating.py 2015-09-25 16:48:59 +0000
711@@ -18,7 +18,7 @@
712
713 import six
714
715-from charmhelpers.fetch import apt_install
716+from charmhelpers.fetch import apt_install, apt_update
717 from charmhelpers.core.hookenv import (
718 log,
719 ERROR,
720@@ -29,6 +29,7 @@
721 try:
722 from jinja2 import FileSystemLoader, ChoiceLoader, Environment, exceptions
723 except ImportError:
724+ apt_update(fatal=True)
725 apt_install('python-jinja2', fatal=True)
726 from jinja2 import FileSystemLoader, ChoiceLoader, Environment, exceptions
727
728@@ -112,7 +113,7 @@
729
730 def complete_contexts(self):
731 '''
732- Return a list of interfaces that have atisfied contexts.
733+ Return a list of interfaces that have satisfied contexts.
734 '''
735 if self._complete_contexts:
736 return self._complete_contexts
737@@ -293,3 +294,30 @@
738 [interfaces.extend(i.complete_contexts())
739 for i in six.itervalues(self.templates)]
740 return interfaces
741+
742+ def get_incomplete_context_data(self, interfaces):
743+ '''
744+ Return dictionary of relation status of interfaces and any missing
745+ required context data. Example:
746+ {'amqp': {'missing_data': ['rabbitmq_password'], 'related': True},
747+ 'zeromq-configuration': {'related': False}}
748+ '''
749+ incomplete_context_data = {}
750+
751+ for i in six.itervalues(self.templates):
752+ for context in i.contexts:
753+ for interface in interfaces:
754+ related = False
755+ if interface in context.interfaces:
756+ related = context.get_related()
757+ missing_data = context.missing_data
758+ if missing_data:
759+ incomplete_context_data[interface] = {'missing_data': missing_data}
760+ if related:
761+ if incomplete_context_data.get(interface):
762+ incomplete_context_data[interface].update({'related': True})
763+ else:
764+ incomplete_context_data[interface] = {'related': True}
765+ else:
766+ incomplete_context_data[interface] = {'related': False}
767+ return incomplete_context_data
768
769=== modified file 'hooks/charmhelpers/contrib/openstack/utils.py'
770--- hooks/charmhelpers/contrib/openstack/utils.py 2015-09-03 09:45:37 +0000
771+++ hooks/charmhelpers/contrib/openstack/utils.py 2015-09-25 16:48:59 +0000
772@@ -25,6 +25,7 @@
773 import re
774
775 import six
776+import traceback
777 import yaml
778
779 from charmhelpers.contrib.network import ip
780@@ -34,12 +35,16 @@
781 )
782
783 from charmhelpers.core.hookenv import (
784+ action_fail,
785+ action_set,
786 config,
787 log as juju_log,
788 charm_dir,
789 INFO,
790 relation_ids,
791- relation_set
792+ relation_set,
793+ status_set,
794+ hook_name
795 )
796
797 from charmhelpers.contrib.storage.linux.lvm import (
798@@ -114,6 +119,7 @@
799 ('2.2.1', 'kilo'),
800 ('2.2.2', 'kilo'),
801 ('2.3.0', 'liberty'),
802+ ('2.4.0', 'liberty'),
803 ])
804
805 # >= Liberty version->codename mapping
806@@ -142,6 +148,9 @@
807 'glance-common': OrderedDict([
808 ('11.0.0', 'liberty'),
809 ]),
810+ 'openstack-dashboard': OrderedDict([
811+ ('8.0.0', 'liberty'),
812+ ]),
813 }
814
815 DEFAULT_LOOPBACK_SIZE = '5G'
816@@ -745,3 +754,217 @@
817 return projects[key]
818
819 return None
820+
821+
822+def os_workload_status(configs, required_interfaces, charm_func=None):
823+ """
824+ Decorator to set workload status based on complete contexts
825+ """
826+ def wrap(f):
827+ @wraps(f)
828+ def wrapped_f(*args, **kwargs):
829+ # Run the original function first
830+ f(*args, **kwargs)
831+ # Set workload status now that contexts have been
832+ # acted on
833+ set_os_workload_status(configs, required_interfaces, charm_func)
834+ return wrapped_f
835+ return wrap
836+
837+
838+def set_os_workload_status(configs, required_interfaces, charm_func=None):
839+ """
840+ Set workload status based on complete contexts.
841+ status-set missing or incomplete contexts
842+ and juju-log details of missing required data.
843+ charm_func is a charm specific function to run checking
844+ for charm specific requirements such as a VIP setting.
845+ """
846+ incomplete_rel_data = incomplete_relation_data(configs, required_interfaces)
847+ state = 'active'
848+ missing_relations = []
849+ incomplete_relations = []
850+ message = None
851+ charm_state = None
852+ charm_message = None
853+
854+ for generic_interface in incomplete_rel_data.keys():
855+ related_interface = None
856+ missing_data = {}
857+ # Related or not?
858+ for interface in incomplete_rel_data[generic_interface]:
859+ if incomplete_rel_data[generic_interface][interface].get('related'):
860+ related_interface = interface
861+ missing_data = incomplete_rel_data[generic_interface][interface].get('missing_data')
862+ # No relation ID for the generic_interface
863+ if not related_interface:
864+ juju_log("{} relation is missing and must be related for "
865+ "functionality. ".format(generic_interface), 'WARN')
866+ state = 'blocked'
867+ if generic_interface not in missing_relations:
868+ missing_relations.append(generic_interface)
869+ else:
870+ # Relation ID exists but no related unit
871+ if not missing_data:
872+ # Edge case relation ID exists but departing
873+ if ('departed' in hook_name() or 'broken' in hook_name()) \
874+ and related_interface in hook_name():
875+ state = 'blocked'
876+ if generic_interface not in missing_relations:
877+ missing_relations.append(generic_interface)
878+ juju_log("{} relation's interface, {}, "
879+ "relationship is departed or broken "
880+ "and is required for functionality."
881+ "".format(generic_interface, related_interface), "WARN")
882+ # Normal case relation ID exists but no related unit
883+ # (joining)
884+ else:
885+ juju_log("{} relations's interface, {}, is related but has "
886+ "no units in the relation."
887+ "".format(generic_interface, related_interface), "INFO")
888+ # Related unit exists and data missing on the relation
889+ else:
890+ juju_log("{} relation's interface, {}, is related awaiting "
891+ "the following data from the relationship: {}. "
892+ "".format(generic_interface, related_interface,
893+ ", ".join(missing_data)), "INFO")
894+ if state != 'blocked':
895+ state = 'waiting'
896+ if generic_interface not in incomplete_relations \
897+ and generic_interface not in missing_relations:
898+ incomplete_relations.append(generic_interface)
899+
900+ if missing_relations:
901+ message = "Missing relations: {}".format(", ".join(missing_relations))
902+ if incomplete_relations:
903+ message += "; incomplete relations: {}" \
904+ "".format(", ".join(incomplete_relations))
905+ state = 'blocked'
906+ elif incomplete_relations:
907+ message = "Incomplete relations: {}" \
908+ "".format(", ".join(incomplete_relations))
909+ state = 'waiting'
910+
911+ # Run charm specific checks
912+ if charm_func:
913+ charm_state, charm_message = charm_func(configs)
914+ if charm_state != 'active' and charm_state != 'unknown':
915+ state = workload_state_compare(state, charm_state)
916+ if message:
917+ message = "{} {}".format(message, charm_message)
918+ else:
919+ message = charm_message
920+
921+ # Set to active if all requirements have been met
922+ if state == 'active':
923+ message = "Unit is ready"
924+ juju_log(message, "INFO")
925+
926+ status_set(state, message)
927+
928+
929+def workload_state_compare(current_workload_state, workload_state):
930+ """ Return highest priority of two states"""
931+ hierarchy = {'unknown': -1,
932+ 'active': 0,
933+ 'maintenance': 1,
934+ 'waiting': 2,
935+ 'blocked': 3,
936+ }
937+
938+ if hierarchy.get(workload_state) is None:
939+ workload_state = 'unknown'
940+ if hierarchy.get(current_workload_state) is None:
941+ current_workload_state = 'unknown'
942+
943+ # Set workload_state based on hierarchy of statuses
944+ if hierarchy.get(current_workload_state) > hierarchy.get(workload_state):
945+ return current_workload_state
946+ else:
947+ return workload_state
948+
949+
950+def incomplete_relation_data(configs, required_interfaces):
951+ """
952+ Check complete contexts against required_interfaces
953+ Return dictionary of incomplete relation data.
954+
955+ configs is an OSConfigRenderer object with configs registered
956+
957+ required_interfaces is a dictionary of required general interfaces
958+ with dictionary values of possible specific interfaces.
959+ Example:
960+ required_interfaces = {'database': ['shared-db', 'pgsql-db']}
961+
962+ The interface is said to be satisfied if anyone of the interfaces in the
963+ list has a complete context.
964+
965+ Return dictionary of incomplete or missing required contexts with relation
966+ status of interfaces and any missing data points. Example:
967+ {'message':
968+ {'amqp': {'missing_data': ['rabbitmq_password'], 'related': True},
969+ 'zeromq-configuration': {'related': False}},
970+ 'identity':
971+ {'identity-service': {'related': False}},
972+ 'database':
973+ {'pgsql-db': {'related': False},
974+ 'shared-db': {'related': True}}}
975+ """
976+ complete_ctxts = configs.complete_contexts()
977+ incomplete_relations = []
978+ for svc_type in required_interfaces.keys():
979+ # Avoid duplicates
980+ found_ctxt = False
981+ for interface in required_interfaces[svc_type]:
982+ if interface in complete_ctxts:
983+ found_ctxt = True
984+ if not found_ctxt:
985+ incomplete_relations.append(svc_type)
986+ incomplete_context_data = {}
987+ for i in incomplete_relations:
988+ incomplete_context_data[i] = configs.get_incomplete_context_data(required_interfaces[i])
989+ return incomplete_context_data
990+
991+
992+def do_action_openstack_upgrade(package, upgrade_callback, configs):
993+ """Perform action-managed OpenStack upgrade.
994+
995+ Upgrades packages to the configured openstack-origin version and sets
996+ the corresponding action status as a result.
997+
998+ If the charm was installed from source we cannot upgrade it.
999+ For backwards compatibility a config flag (action-managed-upgrade) must
1000+ be set for this code to run, otherwise a full service level upgrade will
1001+ fire on config-changed.
1002+
1003+ @param package: package name for determining if upgrade available
1004+ @param upgrade_callback: function callback to charm's upgrade function
1005+ @param configs: templating object derived from OSConfigRenderer class
1006+
1007+ @return: True if upgrade successful; False if upgrade failed or skipped
1008+ """
1009+ ret = False
1010+
1011+ if git_install_requested():
1012+ action_set({'outcome': 'installed from source, skipped upgrade.'})
1013+ else:
1014+ if openstack_upgrade_available(package):
1015+ if config('action-managed-upgrade'):
1016+ juju_log('Upgrading OpenStack release')
1017+
1018+ try:
1019+ upgrade_callback(configs=configs)
1020+ action_set({'outcome': 'success, upgrade completed.'})
1021+ ret = True
1022+ except:
1023+ action_set({'outcome': 'upgrade failed, see traceback.'})
1024+ action_set({'traceback': traceback.format_exc()})
1025+ action_fail('do_openstack_upgrade resulted in an '
1026+ 'unexpected error')
1027+ else:
1028+ action_set({'outcome': 'action-managed-upgrade config is '
1029+ 'False, skipped upgrade.'})
1030+ else:
1031+ action_set({'outcome': 'no upgrade available.'})
1032+
1033+ return ret
1034
1035=== modified file 'hooks/charmhelpers/contrib/storage/linux/ceph.py'
1036--- hooks/charmhelpers/contrib/storage/linux/ceph.py 2015-07-16 20:18:23 +0000
1037+++ hooks/charmhelpers/contrib/storage/linux/ceph.py 2015-09-25 16:48:59 +0000
1038@@ -28,6 +28,7 @@
1039 import shutil
1040 import json
1041 import time
1042+import uuid
1043
1044 from subprocess import (
1045 check_call,
1046@@ -35,8 +36,10 @@
1047 CalledProcessError,
1048 )
1049 from charmhelpers.core.hookenv import (
1050+ local_unit,
1051 relation_get,
1052 relation_ids,
1053+ relation_set,
1054 related_units,
1055 log,
1056 DEBUG,
1057@@ -56,6 +59,8 @@
1058 apt_install,
1059 )
1060
1061+from charmhelpers.core.kernel import modprobe
1062+
1063 KEYRING = '/etc/ceph/ceph.client.{}.keyring'
1064 KEYFILE = '/etc/ceph/ceph.client.{}.key'
1065
1066@@ -288,17 +293,6 @@
1067 os.chown(data_src_dst, uid, gid)
1068
1069
1070-# TODO: re-use
1071-def modprobe(module):
1072- """Load a kernel module and configure for auto-load on reboot."""
1073- log('Loading kernel module', level=INFO)
1074- cmd = ['modprobe', module]
1075- check_call(cmd)
1076- with open('/etc/modules', 'r+') as modules:
1077- if module not in modules.read():
1078- modules.write(module)
1079-
1080-
1081 def copy_files(src, dst, symlinks=False, ignore=None):
1082 """Copy files from src to dst."""
1083 for item in os.listdir(src):
1084@@ -411,17 +405,52 @@
1085
1086 The API is versioned and defaults to version 1.
1087 """
1088- def __init__(self, api_version=1):
1089+ def __init__(self, api_version=1, request_id=None):
1090 self.api_version = api_version
1091+ if request_id:
1092+ self.request_id = request_id
1093+ else:
1094+ self.request_id = str(uuid.uuid1())
1095 self.ops = []
1096
1097 def add_op_create_pool(self, name, replica_count=3):
1098 self.ops.append({'op': 'create-pool', 'name': name,
1099 'replicas': replica_count})
1100
1101+ def set_ops(self, ops):
1102+ """Set request ops to provided value.
1103+
1104+ Useful for injecting ops that come from a previous request
1105+ to allow comparisons to ensure validity.
1106+ """
1107+ self.ops = ops
1108+
1109 @property
1110 def request(self):
1111- return json.dumps({'api-version': self.api_version, 'ops': self.ops})
1112+ return json.dumps({'api-version': self.api_version, 'ops': self.ops,
1113+ 'request-id': self.request_id})
1114+
1115+ def _ops_equal(self, other):
1116+ if len(self.ops) == len(other.ops):
1117+ for req_no in range(0, len(self.ops)):
1118+ for key in ['replicas', 'name', 'op']:
1119+ if self.ops[req_no][key] != other.ops[req_no][key]:
1120+ return False
1121+ else:
1122+ return False
1123+ return True
1124+
1125+ def __eq__(self, other):
1126+ if not isinstance(other, self.__class__):
1127+ return False
1128+ if self.api_version == other.api_version and \
1129+ self._ops_equal(other):
1130+ return True
1131+ else:
1132+ return False
1133+
1134+ def __ne__(self, other):
1135+ return not self.__eq__(other)
1136
1137
1138 class CephBrokerRsp(object):
1139@@ -431,14 +460,198 @@
1140
1141 The API is versioned and defaults to version 1.
1142 """
1143+
1144 def __init__(self, encoded_rsp):
1145 self.api_version = None
1146 self.rsp = json.loads(encoded_rsp)
1147
1148 @property
1149+ def request_id(self):
1150+ return self.rsp.get('request-id')
1151+
1152+ @property
1153 def exit_code(self):
1154 return self.rsp.get('exit-code')
1155
1156 @property
1157 def exit_msg(self):
1158 return self.rsp.get('stderr')
1159+
1160+
1161+# Ceph Broker Conversation:
1162+# If a charm needs an action to be taken by ceph it can create a CephBrokerRq
1163+# and send that request to ceph via the ceph relation. The CephBrokerRq has a
1164+# unique id so that the client can identity which CephBrokerRsp is associated
1165+# with the request. Ceph will also respond to each client unit individually
1166+# creating a response key per client unit eg glance/0 will get a CephBrokerRsp
1167+# via key broker-rsp-glance-0
1168+#
1169+# To use this the charm can just do something like:
1170+#
1171+# from charmhelpers.contrib.storage.linux.ceph import (
1172+# send_request_if_needed,
1173+# is_request_complete,
1174+# CephBrokerRq,
1175+# )
1176+#
1177+# @hooks.hook('ceph-relation-changed')
1178+# def ceph_changed():
1179+# rq = CephBrokerRq()
1180+# rq.add_op_create_pool(name='poolname', replica_count=3)
1181+#
1182+# if is_request_complete(rq):
1183+# <Request complete actions>
1184+# else:
1185+# send_request_if_needed(get_ceph_request())
1186+#
1187+# CephBrokerRq and CephBrokerRsp are serialized into JSON. Below is an example
1188+# of glance having sent a request to ceph which ceph has successfully processed
1189+# 'ceph:8': {
1190+# 'ceph/0': {
1191+# 'auth': 'cephx',
1192+# 'broker-rsp-glance-0': '{"request-id": "0bc7dc54", "exit-code": 0}',
1193+# 'broker_rsp': '{"request-id": "0da543b8", "exit-code": 0}',
1194+# 'ceph-public-address': '10.5.44.103',
1195+# 'key': 'AQCLDttVuHXINhAAvI144CB09dYchhHyTUY9BQ==',
1196+# 'private-address': '10.5.44.103',
1197+# },
1198+# 'glance/0': {
1199+# 'broker_req': ('{"api-version": 1, "request-id": "0bc7dc54", '
1200+# '"ops": [{"replicas": 3, "name": "glance", '
1201+# '"op": "create-pool"}]}'),
1202+# 'private-address': '10.5.44.109',
1203+# },
1204+# }
1205+
1206+def get_previous_request(rid):
1207+ """Return the last ceph broker request sent on a given relation
1208+
1209+ @param rid: Relation id to query for request
1210+ """
1211+ request = None
1212+ broker_req = relation_get(attribute='broker_req', rid=rid,
1213+ unit=local_unit())
1214+ if broker_req:
1215+ request_data = json.loads(broker_req)
1216+ request = CephBrokerRq(api_version=request_data['api-version'],
1217+ request_id=request_data['request-id'])
1218+ request.set_ops(request_data['ops'])
1219+
1220+ return request
1221+
1222+
1223+def get_request_states(request):
1224+ """Return a dict of requests per relation id with their corresponding
1225+ completion state.
1226+
1227+ This allows a charm, which has a request for ceph, to see whether there is
1228+ an equivalent request already being processed and if so what state that
1229+ request is in.
1230+
1231+ @param request: A CephBrokerRq object
1232+ """
1233+ complete = []
1234+ requests = {}
1235+ for rid in relation_ids('ceph'):
1236+ complete = False
1237+ previous_request = get_previous_request(rid)
1238+ if request == previous_request:
1239+ sent = True
1240+ complete = is_request_complete_for_rid(previous_request, rid)
1241+ else:
1242+ sent = False
1243+ complete = False
1244+
1245+ requests[rid] = {
1246+ 'sent': sent,
1247+ 'complete': complete,
1248+ }
1249+
1250+ return requests
1251+
1252+
1253+def is_request_sent(request):
1254+ """Check to see if a functionally equivalent request has already been sent
1255+
1256+ Returns True if a similair request has been sent
1257+
1258+ @param request: A CephBrokerRq object
1259+ """
1260+ states = get_request_states(request)
1261+ for rid in states.keys():
1262+ if not states[rid]['sent']:
1263+ return False
1264+
1265+ return True
1266+
1267+
1268+def is_request_complete(request):
1269+ """Check to see if a functionally equivalent request has already been
1270+ completed
1271+
1272+ Returns True if a similair request has been completed
1273+
1274+ @param request: A CephBrokerRq object
1275+ """
1276+ states = get_request_states(request)
1277+ for rid in states.keys():
1278+ if not states[rid]['complete']:
1279+ return False
1280+
1281+ return True
1282+
1283+
1284+def is_request_complete_for_rid(request, rid):
1285+ """Check if a given request has been completed on the given relation
1286+
1287+ @param request: A CephBrokerRq object
1288+ @param rid: Relation ID
1289+ """
1290+ broker_key = get_broker_rsp_key()
1291+ for unit in related_units(rid):
1292+ rdata = relation_get(rid=rid, unit=unit)
1293+ if rdata.get(broker_key):
1294+ rsp = CephBrokerRsp(rdata.get(broker_key))
1295+ if rsp.request_id == request.request_id:
1296+ if not rsp.exit_code:
1297+ return True
1298+ else:
1299+ # The remote unit sent no reply targeted at this unit so either the
1300+ # remote ceph cluster does not support unit targeted replies or it
1301+ # has not processed our request yet.
1302+ if rdata.get('broker_rsp'):
1303+ request_data = json.loads(rdata['broker_rsp'])
1304+ if request_data.get('request-id'):
1305+ log('Ignoring legacy broker_rsp without unit key as remote '
1306+ 'service supports unit specific replies', level=DEBUG)
1307+ else:
1308+ log('Using legacy broker_rsp as remote service does not '
1309+ 'supports unit specific replies', level=DEBUG)
1310+ rsp = CephBrokerRsp(rdata['broker_rsp'])
1311+ if not rsp.exit_code:
1312+ return True
1313+
1314+ return False
1315+
1316+
1317+def get_broker_rsp_key():
1318+ """Return broker response key for this unit
1319+
1320+ This is the key that ceph is going to use to pass request status
1321+ information back to this unit
1322+ """
1323+ return 'broker-rsp-' + local_unit().replace('/', '-')
1324+
1325+
1326+def send_request_if_needed(request):
1327+ """Send broker request if an equivalent request has not already been sent
1328+
1329+ @param request: A CephBrokerRq object
1330+ """
1331+ if is_request_sent(request):
1332+ log('Request already sent but not complete, not sending new request',
1333+ level=DEBUG)
1334+ else:
1335+ for rid in relation_ids('ceph'):
1336+ log('Sending request {}'.format(request.request_id), level=DEBUG)
1337+ relation_set(relation_id=rid, broker_req=request.request)
1338
1339=== modified file 'hooks/charmhelpers/core/host.py'
1340--- hooks/charmhelpers/core/host.py 2015-08-20 09:19:13 +0000
1341+++ hooks/charmhelpers/core/host.py 2015-09-25 16:48:59 +0000
1342@@ -63,32 +63,48 @@
1343 return service_result
1344
1345
1346-def service_pause(service_name, init_dir=None):
1347+def service_pause(service_name, init_dir="/etc/init", initd_dir="/etc/init.d"):
1348 """Pause a system service.
1349
1350 Stop it, and prevent it from starting again at boot."""
1351- if init_dir is None:
1352- init_dir = "/etc/init"
1353 stopped = service_stop(service_name)
1354- # XXX: Support systemd too
1355- override_path = os.path.join(
1356- init_dir, '{}.override'.format(service_name))
1357- with open(override_path, 'w') as fh:
1358- fh.write("manual\n")
1359+ upstart_file = os.path.join(init_dir, "{}.conf".format(service_name))
1360+ sysv_file = os.path.join(initd_dir, service_name)
1361+ if os.path.exists(upstart_file):
1362+ override_path = os.path.join(
1363+ init_dir, '{}.override'.format(service_name))
1364+ with open(override_path, 'w') as fh:
1365+ fh.write("manual\n")
1366+ elif os.path.exists(sysv_file):
1367+ subprocess.check_call(["update-rc.d", service_name, "disable"])
1368+ else:
1369+ # XXX: Support SystemD too
1370+ raise ValueError(
1371+ "Unable to detect {0} as either Upstart {1} or SysV {2}".format(
1372+ service_name, upstart_file, sysv_file))
1373 return stopped
1374
1375
1376-def service_resume(service_name, init_dir=None):
1377+def service_resume(service_name, init_dir="/etc/init",
1378+ initd_dir="/etc/init.d"):
1379 """Resume a system service.
1380
1381 Reenable starting again at boot. Start the service"""
1382- # XXX: Support systemd too
1383- if init_dir is None:
1384- init_dir = "/etc/init"
1385- override_path = os.path.join(
1386- init_dir, '{}.override'.format(service_name))
1387- if os.path.exists(override_path):
1388- os.unlink(override_path)
1389+ upstart_file = os.path.join(init_dir, "{}.conf".format(service_name))
1390+ sysv_file = os.path.join(initd_dir, service_name)
1391+ if os.path.exists(upstart_file):
1392+ override_path = os.path.join(
1393+ init_dir, '{}.override'.format(service_name))
1394+ if os.path.exists(override_path):
1395+ os.unlink(override_path)
1396+ elif os.path.exists(sysv_file):
1397+ subprocess.check_call(["update-rc.d", service_name, "enable"])
1398+ else:
1399+ # XXX: Support SystemD too
1400+ raise ValueError(
1401+ "Unable to detect {0} as either Upstart {1} or SysV {2}".format(
1402+ service_name, upstart_file, sysv_file))
1403+
1404 started = service_start(service_name)
1405 return started
1406
1407
1408=== modified file 'hooks/charmhelpers/core/hugepage.py'
1409--- hooks/charmhelpers/core/hugepage.py 2015-08-19 13:53:30 +0000
1410+++ hooks/charmhelpers/core/hugepage.py 2015-09-25 16:48:59 +0000
1411@@ -25,11 +25,13 @@
1412 fstab_mount,
1413 mkdir,
1414 )
1415+from charmhelpers.core.strutils import bytes_from_string
1416+from subprocess import check_output
1417
1418
1419 def hugepage_support(user, group='hugetlb', nr_hugepages=256,
1420 max_map_count=65536, mnt_point='/run/hugepages/kvm',
1421- pagesize='2MB', mount=True):
1422+ pagesize='2MB', mount=True, set_shmmax=False):
1423 """Enable hugepages on system.
1424
1425 Args:
1426@@ -49,6 +51,11 @@
1427 'vm.max_map_count': max_map_count,
1428 'vm.hugetlb_shm_group': gid,
1429 }
1430+ if set_shmmax:
1431+ shmmax_current = int(check_output(['sysctl', '-n', 'kernel.shmmax']))
1432+ shmmax_minsize = bytes_from_string(pagesize) * nr_hugepages
1433+ if shmmax_minsize > shmmax_current:
1434+ sysctl_settings['kernel.shmmax'] = shmmax_minsize
1435 sysctl.create(yaml.dump(sysctl_settings), '/etc/sysctl.d/10-hugepage.conf')
1436 mkdir(mnt_point, owner='root', group='root', perms=0o755, force=False)
1437 lfstab = fstab.Fstab()
1438
1439=== added file 'hooks/charmhelpers/core/kernel.py'
1440--- hooks/charmhelpers/core/kernel.py 1970-01-01 00:00:00 +0000
1441+++ hooks/charmhelpers/core/kernel.py 2015-09-25 16:48:59 +0000
1442@@ -0,0 +1,68 @@
1443+#!/usr/bin/env python
1444+# -*- coding: utf-8 -*-
1445+
1446+# Copyright 2014-2015 Canonical Limited.
1447+#
1448+# This file is part of charm-helpers.
1449+#
1450+# charm-helpers is free software: you can redistribute it and/or modify
1451+# it under the terms of the GNU Lesser General Public License version 3 as
1452+# published by the Free Software Foundation.
1453+#
1454+# charm-helpers is distributed in the hope that it will be useful,
1455+# but WITHOUT ANY WARRANTY; without even the implied warranty of
1456+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
1457+# GNU Lesser General Public License for more details.
1458+#
1459+# You should have received a copy of the GNU Lesser General Public License
1460+# along with charm-helpers. If not, see <http://www.gnu.org/licenses/>.
1461+
1462+__author__ = "Jorge Niedbalski <jorge.niedbalski@canonical.com>"
1463+
1464+from charmhelpers.core.hookenv import (
1465+ log,
1466+ INFO
1467+)
1468+
1469+from subprocess import check_call, check_output
1470+import re
1471+
1472+
1473+def modprobe(module, persist=True):
1474+ """Load a kernel module and configure for auto-load on reboot."""
1475+ cmd = ['modprobe', module]
1476+
1477+ log('Loading kernel module %s' % module, level=INFO)
1478+
1479+ check_call(cmd)
1480+ if persist:
1481+ with open('/etc/modules', 'r+') as modules:
1482+ if module not in modules.read():
1483+ modules.write(module)
1484+
1485+
1486+def rmmod(module, force=False):
1487+ """Remove a module from the linux kernel"""
1488+ cmd = ['rmmod']
1489+ if force:
1490+ cmd.append('-f')
1491+ cmd.append(module)
1492+ log('Removing kernel module %s' % module, level=INFO)
1493+ return check_call(cmd)
1494+
1495+
1496+def lsmod():
1497+ """Shows what kernel modules are currently loaded"""
1498+ return check_output(['lsmod'],
1499+ universal_newlines=True)
1500+
1501+
1502+def is_module_loaded(module):
1503+ """Checks if a kernel module is already loaded"""
1504+ matches = re.findall('^%s[ ]+' % module, lsmod(), re.M)
1505+ return len(matches) > 0
1506+
1507+
1508+def update_initramfs(version='all'):
1509+ """Updates an initramfs image"""
1510+ return check_call(["update-initramfs", "-k", version, "-u"])
1511
1512=== modified file 'hooks/charmhelpers/core/strutils.py'
1513--- hooks/charmhelpers/core/strutils.py 2015-04-16 20:02:01 +0000
1514+++ hooks/charmhelpers/core/strutils.py 2015-09-25 16:48:59 +0000
1515@@ -18,6 +18,7 @@
1516 # along with charm-helpers. If not, see <http://www.gnu.org/licenses/>.
1517
1518 import six
1519+import re
1520
1521
1522 def bool_from_string(value):
1523@@ -40,3 +41,32 @@
1524
1525 msg = "Unable to interpret string value '%s' as boolean" % (value)
1526 raise ValueError(msg)
1527+
1528+
1529+def bytes_from_string(value):
1530+ """Interpret human readable string value as bytes.
1531+
1532+ Returns int
1533+ """
1534+ BYTE_POWER = {
1535+ 'K': 1,
1536+ 'KB': 1,
1537+ 'M': 2,
1538+ 'MB': 2,
1539+ 'G': 3,
1540+ 'GB': 3,
1541+ 'T': 4,
1542+ 'TB': 4,
1543+ 'P': 5,
1544+ 'PB': 5,
1545+ }
1546+ if isinstance(value, six.string_types):
1547+ value = six.text_type(value)
1548+ else:
1549+ msg = "Unable to interpret non-string value '%s' as boolean" % (value)
1550+ raise ValueError(msg)
1551+ matches = re.match("([0-9]+)([a-zA-Z]+)", value)
1552+ if not matches:
1553+ msg = "Unable to interpret string value '%s' as bytes" % (value)
1554+ raise ValueError(msg)
1555+ return int(matches.group(1)) * (1024 ** BYTE_POWER[matches.group(2)])
1556
1557=== modified file 'tests/charmhelpers/contrib/amulet/deployment.py'
1558--- tests/charmhelpers/contrib/amulet/deployment.py 2015-02-13 12:02:13 +0000
1559+++ tests/charmhelpers/contrib/amulet/deployment.py 2015-09-25 16:48:59 +0000
1560@@ -51,7 +51,8 @@
1561 if 'units' not in this_service:
1562 this_service['units'] = 1
1563
1564- self.d.add(this_service['name'], units=this_service['units'])
1565+ self.d.add(this_service['name'], units=this_service['units'],
1566+ constraints=this_service.get('constraints'))
1567
1568 for svc in other_services:
1569 if 'location' in svc:
1570@@ -64,7 +65,8 @@
1571 if 'units' not in svc:
1572 svc['units'] = 1
1573
1574- self.d.add(svc['name'], charm=branch_location, units=svc['units'])
1575+ self.d.add(svc['name'], charm=branch_location, units=svc['units'],
1576+ constraints=svc.get('constraints'))
1577
1578 def _add_relations(self, relations):
1579 """Add all of the relations for the services."""
1580
1581=== modified file 'tests/charmhelpers/contrib/amulet/utils.py'
1582--- tests/charmhelpers/contrib/amulet/utils.py 2015-08-18 21:19:31 +0000
1583+++ tests/charmhelpers/contrib/amulet/utils.py 2015-09-25 16:48:59 +0000
1584@@ -19,9 +19,11 @@
1585 import logging
1586 import os
1587 import re
1588+import socket
1589 import subprocess
1590 import sys
1591 import time
1592+import uuid
1593
1594 import amulet
1595 import distro_info
1596@@ -114,7 +116,7 @@
1597 # /!\ DEPRECATION WARNING (beisner):
1598 # New and existing tests should be rewritten to use
1599 # validate_services_by_name() as it is aware of init systems.
1600- self.log.warn('/!\\ DEPRECATION WARNING: use '
1601+ self.log.warn('DEPRECATION WARNING: use '
1602 'validate_services_by_name instead of validate_services '
1603 'due to init system differences.')
1604
1605@@ -269,33 +271,52 @@
1606 """Get last modification time of directory."""
1607 return sentry_unit.directory_stat(directory)['mtime']
1608
1609- def _get_proc_start_time(self, sentry_unit, service, pgrep_full=False):
1610- """Get process' start time.
1611-
1612- Determine start time of the process based on the last modification
1613- time of the /proc/pid directory. If pgrep_full is True, the process
1614- name is matched against the full command line.
1615- """
1616- if pgrep_full:
1617- cmd = 'pgrep -o -f {}'.format(service)
1618- else:
1619- cmd = 'pgrep -o {}'.format(service)
1620- cmd = cmd + ' | grep -v pgrep || exit 0'
1621- cmd_out = sentry_unit.run(cmd)
1622- self.log.debug('CMDout: ' + str(cmd_out))
1623- if cmd_out[0]:
1624- self.log.debug('Pid for %s %s' % (service, str(cmd_out[0])))
1625- proc_dir = '/proc/{}'.format(cmd_out[0].strip())
1626- return self._get_dir_mtime(sentry_unit, proc_dir)
1627+ def _get_proc_start_time(self, sentry_unit, service, pgrep_full=None):
1628+ """Get start time of a process based on the last modification time
1629+ of the /proc/pid directory.
1630+
1631+ :sentry_unit: The sentry unit to check for the service on
1632+ :service: service name to look for in process table
1633+ :pgrep_full: [Deprecated] Use full command line search mode with pgrep
1634+ :returns: epoch time of service process start
1635+ :param commands: list of bash commands
1636+ :param sentry_units: list of sentry unit pointers
1637+ :returns: None if successful; Failure message otherwise
1638+ """
1639+ if pgrep_full is not None:
1640+ # /!\ DEPRECATION WARNING (beisner):
1641+ # No longer implemented, as pidof is now used instead of pgrep.
1642+ # https://bugs.launchpad.net/charm-helpers/+bug/1474030
1643+ self.log.warn('DEPRECATION WARNING: pgrep_full bool is no '
1644+ 'longer implemented re: lp 1474030.')
1645+
1646+ pid_list = self.get_process_id_list(sentry_unit, service)
1647+ pid = pid_list[0]
1648+ proc_dir = '/proc/{}'.format(pid)
1649+ self.log.debug('Pid for {} on {}: {}'.format(
1650+ service, sentry_unit.info['unit_name'], pid))
1651+
1652+ return self._get_dir_mtime(sentry_unit, proc_dir)
1653
1654 def service_restarted(self, sentry_unit, service, filename,
1655- pgrep_full=False, sleep_time=20):
1656+ pgrep_full=None, sleep_time=20):
1657 """Check if service was restarted.
1658
1659 Compare a service's start time vs a file's last modification time
1660 (such as a config file for that service) to determine if the service
1661 has been restarted.
1662 """
1663+ # /!\ DEPRECATION WARNING (beisner):
1664+ # This method is prone to races in that no before-time is known.
1665+ # Use validate_service_config_changed instead.
1666+
1667+ # NOTE(beisner) pgrep_full is no longer implemented, as pidof is now
1668+ # used instead of pgrep. pgrep_full is still passed through to ensure
1669+ # deprecation WARNS. lp1474030
1670+ self.log.warn('DEPRECATION WARNING: use '
1671+ 'validate_service_config_changed instead of '
1672+ 'service_restarted due to known races.')
1673+
1674 time.sleep(sleep_time)
1675 if (self._get_proc_start_time(sentry_unit, service, pgrep_full) >=
1676 self._get_file_mtime(sentry_unit, filename)):
1677@@ -304,78 +325,122 @@
1678 return False
1679
1680 def service_restarted_since(self, sentry_unit, mtime, service,
1681- pgrep_full=False, sleep_time=20,
1682- retry_count=2):
1683+ pgrep_full=None, sleep_time=20,
1684+ retry_count=30, retry_sleep_time=10):
1685 """Check if service was been started after a given time.
1686
1687 Args:
1688 sentry_unit (sentry): The sentry unit to check for the service on
1689 mtime (float): The epoch time to check against
1690 service (string): service name to look for in process table
1691- pgrep_full (boolean): Use full command line search mode with pgrep
1692- sleep_time (int): Seconds to sleep before looking for process
1693- retry_count (int): If service is not found, how many times to retry
1694+ pgrep_full: [Deprecated] Use full command line search mode with pgrep
1695+ sleep_time (int): Initial sleep time (s) before looking for file
1696+ retry_sleep_time (int): Time (s) to sleep between retries
1697+ retry_count (int): If file is not found, how many times to retry
1698
1699 Returns:
1700 bool: True if service found and its start time it newer than mtime,
1701 False if service is older than mtime or if service was
1702 not found.
1703 """
1704- self.log.debug('Checking %s restarted since %s' % (service, mtime))
1705+ # NOTE(beisner) pgrep_full is no longer implemented, as pidof is now
1706+ # used instead of pgrep. pgrep_full is still passed through to ensure
1707+ # deprecation WARNS. lp1474030
1708+
1709+ unit_name = sentry_unit.info['unit_name']
1710+ self.log.debug('Checking that %s service restarted since %s on '
1711+ '%s' % (service, mtime, unit_name))
1712 time.sleep(sleep_time)
1713- proc_start_time = self._get_proc_start_time(sentry_unit, service,
1714- pgrep_full)
1715- while retry_count > 0 and not proc_start_time:
1716- self.log.debug('No pid file found for service %s, will retry %i '
1717- 'more times' % (service, retry_count))
1718- time.sleep(30)
1719- proc_start_time = self._get_proc_start_time(sentry_unit, service,
1720- pgrep_full)
1721- retry_count = retry_count - 1
1722+ proc_start_time = None
1723+ tries = 0
1724+ while tries <= retry_count and not proc_start_time:
1725+ try:
1726+ proc_start_time = self._get_proc_start_time(sentry_unit,
1727+ service,
1728+ pgrep_full)
1729+ self.log.debug('Attempt {} to get {} proc start time on {} '
1730+ 'OK'.format(tries, service, unit_name))
1731+ except IOError as e:
1732+ # NOTE(beisner) - race avoidance, proc may not exist yet.
1733+ # https://bugs.launchpad.net/charm-helpers/+bug/1474030
1734+ self.log.debug('Attempt {} to get {} proc start time on {} '
1735+ 'failed\n{}'.format(tries, service,
1736+ unit_name, e))
1737+ time.sleep(retry_sleep_time)
1738+ tries += 1
1739
1740 if not proc_start_time:
1741 self.log.warn('No proc start time found, assuming service did '
1742 'not start')
1743 return False
1744 if proc_start_time >= mtime:
1745- self.log.debug('proc start time is newer than provided mtime'
1746- '(%s >= %s)' % (proc_start_time, mtime))
1747+ self.log.debug('Proc start time is newer than provided mtime'
1748+ '(%s >= %s) on %s (OK)' % (proc_start_time,
1749+ mtime, unit_name))
1750 return True
1751 else:
1752- self.log.warn('proc start time (%s) is older than provided mtime '
1753- '(%s), service did not restart' % (proc_start_time,
1754- mtime))
1755+ self.log.warn('Proc start time (%s) is older than provided mtime '
1756+ '(%s) on %s, service did not '
1757+ 'restart' % (proc_start_time, mtime, unit_name))
1758 return False
1759
1760 def config_updated_since(self, sentry_unit, filename, mtime,
1761- sleep_time=20):
1762+ sleep_time=20, retry_count=30,
1763+ retry_sleep_time=10):
1764 """Check if file was modified after a given time.
1765
1766 Args:
1767 sentry_unit (sentry): The sentry unit to check the file mtime on
1768 filename (string): The file to check mtime of
1769 mtime (float): The epoch time to check against
1770- sleep_time (int): Seconds to sleep before looking for process
1771+ sleep_time (int): Initial sleep time (s) before looking for file
1772+ retry_sleep_time (int): Time (s) to sleep between retries
1773+ retry_count (int): If file is not found, how many times to retry
1774
1775 Returns:
1776 bool: True if file was modified more recently than mtime, False if
1777- file was modified before mtime,
1778+ file was modified before mtime, or if file not found.
1779 """
1780- self.log.debug('Checking %s updated since %s' % (filename, mtime))
1781+ unit_name = sentry_unit.info['unit_name']
1782+ self.log.debug('Checking that %s updated since %s on '
1783+ '%s' % (filename, mtime, unit_name))
1784 time.sleep(sleep_time)
1785- file_mtime = self._get_file_mtime(sentry_unit, filename)
1786+ file_mtime = None
1787+ tries = 0
1788+ while tries <= retry_count and not file_mtime:
1789+ try:
1790+ file_mtime = self._get_file_mtime(sentry_unit, filename)
1791+ self.log.debug('Attempt {} to get {} file mtime on {} '
1792+ 'OK'.format(tries, filename, unit_name))
1793+ except IOError as e:
1794+ # NOTE(beisner) - race avoidance, file may not exist yet.
1795+ # https://bugs.launchpad.net/charm-helpers/+bug/1474030
1796+ self.log.debug('Attempt {} to get {} file mtime on {} '
1797+ 'failed\n{}'.format(tries, filename,
1798+ unit_name, e))
1799+ time.sleep(retry_sleep_time)
1800+ tries += 1
1801+
1802+ if not file_mtime:
1803+ self.log.warn('Could not determine file mtime, assuming '
1804+ 'file does not exist')
1805+ return False
1806+
1807 if file_mtime >= mtime:
1808 self.log.debug('File mtime is newer than provided mtime '
1809- '(%s >= %s)' % (file_mtime, mtime))
1810+ '(%s >= %s) on %s (OK)' % (file_mtime,
1811+ mtime, unit_name))
1812 return True
1813 else:
1814- self.log.warn('File mtime %s is older than provided mtime %s'
1815- % (file_mtime, mtime))
1816+ self.log.warn('File mtime is older than provided mtime'
1817+ '(%s < on %s) on %s' % (file_mtime,
1818+ mtime, unit_name))
1819 return False
1820
1821 def validate_service_config_changed(self, sentry_unit, mtime, service,
1822- filename, pgrep_full=False,
1823- sleep_time=20, retry_count=2):
1824+ filename, pgrep_full=None,
1825+ sleep_time=20, retry_count=30,
1826+ retry_sleep_time=10):
1827 """Check service and file were updated after mtime
1828
1829 Args:
1830@@ -383,9 +448,10 @@
1831 mtime (float): The epoch time to check against
1832 service (string): service name to look for in process table
1833 filename (string): The file to check mtime of
1834- pgrep_full (boolean): Use full command line search mode with pgrep
1835- sleep_time (int): Seconds to sleep before looking for process
1836+ pgrep_full: [Deprecated] Use full command line search mode with pgrep
1837+ sleep_time (int): Initial sleep in seconds to pass to test helpers
1838 retry_count (int): If service is not found, how many times to retry
1839+ retry_sleep_time (int): Time in seconds to wait between retries
1840
1841 Typical Usage:
1842 u = OpenStackAmuletUtils(ERROR)
1843@@ -402,15 +468,27 @@
1844 mtime, False if service is older than mtime or if service was
1845 not found or if filename was modified before mtime.
1846 """
1847- self.log.debug('Checking %s restarted since %s' % (service, mtime))
1848- time.sleep(sleep_time)
1849- service_restart = self.service_restarted_since(sentry_unit, mtime,
1850- service,
1851- pgrep_full=pgrep_full,
1852- sleep_time=0,
1853- retry_count=retry_count)
1854- config_update = self.config_updated_since(sentry_unit, filename, mtime,
1855- sleep_time=0)
1856+
1857+ # NOTE(beisner) pgrep_full is no longer implemented, as pidof is now
1858+ # used instead of pgrep. pgrep_full is still passed through to ensure
1859+ # deprecation WARNS. lp1474030
1860+
1861+ service_restart = self.service_restarted_since(
1862+ sentry_unit, mtime,
1863+ service,
1864+ pgrep_full=pgrep_full,
1865+ sleep_time=sleep_time,
1866+ retry_count=retry_count,
1867+ retry_sleep_time=retry_sleep_time)
1868+
1869+ config_update = self.config_updated_since(
1870+ sentry_unit,
1871+ filename,
1872+ mtime,
1873+ sleep_time=sleep_time,
1874+ retry_count=retry_count,
1875+ retry_sleep_time=retry_sleep_time)
1876+
1877 return service_restart and config_update
1878
1879 def get_sentry_time(self, sentry_unit):
1880@@ -428,7 +506,6 @@
1881 """Return a list of all Ubuntu releases in order of release."""
1882 _d = distro_info.UbuntuDistroInfo()
1883 _release_list = _d.all
1884- self.log.debug('Ubuntu release list: {}'.format(_release_list))
1885 return _release_list
1886
1887 def file_to_url(self, file_rel_path):
1888@@ -568,6 +645,142 @@
1889
1890 return None
1891
1892+ def validate_sectionless_conf(self, file_contents, expected):
1893+ """A crude conf parser. Useful to inspect configuration files which
1894+ do not have section headers (as would be necessary in order to use
1895+ the configparser). Such as openstack-dashboard or rabbitmq confs."""
1896+ for line in file_contents.split('\n'):
1897+ if '=' in line:
1898+ args = line.split('=')
1899+ if len(args) <= 1:
1900+ continue
1901+ key = args[0].strip()
1902+ value = args[1].strip()
1903+ if key in expected.keys():
1904+ if expected[key] != value:
1905+ msg = ('Config mismatch. Expected, actual: {}, '
1906+ '{}'.format(expected[key], value))
1907+ amulet.raise_status(amulet.FAIL, msg=msg)
1908+
1909+ def get_unit_hostnames(self, units):
1910+ """Return a dict of juju unit names to hostnames."""
1911+ host_names = {}
1912+ for unit in units:
1913+ host_names[unit.info['unit_name']] = \
1914+ str(unit.file_contents('/etc/hostname').strip())
1915+ self.log.debug('Unit host names: {}'.format(host_names))
1916+ return host_names
1917+
1918+ def run_cmd_unit(self, sentry_unit, cmd):
1919+ """Run a command on a unit, return the output and exit code."""
1920+ output, code = sentry_unit.run(cmd)
1921+ if code == 0:
1922+ self.log.debug('{} `{}` command returned {} '
1923+ '(OK)'.format(sentry_unit.info['unit_name'],
1924+ cmd, code))
1925+ else:
1926+ msg = ('{} `{}` command returned {} '
1927+ '{}'.format(sentry_unit.info['unit_name'],
1928+ cmd, code, output))
1929+ amulet.raise_status(amulet.FAIL, msg=msg)
1930+ return str(output), code
1931+
1932+ def file_exists_on_unit(self, sentry_unit, file_name):
1933+ """Check if a file exists on a unit."""
1934+ try:
1935+ sentry_unit.file_stat(file_name)
1936+ return True
1937+ except IOError:
1938+ return False
1939+ except Exception as e:
1940+ msg = 'Error checking file {}: {}'.format(file_name, e)
1941+ amulet.raise_status(amulet.FAIL, msg=msg)
1942+
1943+ def file_contents_safe(self, sentry_unit, file_name,
1944+ max_wait=60, fatal=False):
1945+ """Get file contents from a sentry unit. Wrap amulet file_contents
1946+ with retry logic to address races where a file checks as existing,
1947+ but no longer exists by the time file_contents is called.
1948+ Return None if file not found. Optionally raise if fatal is True."""
1949+ unit_name = sentry_unit.info['unit_name']
1950+ file_contents = False
1951+ tries = 0
1952+ while not file_contents and tries < (max_wait / 4):
1953+ try:
1954+ file_contents = sentry_unit.file_contents(file_name)
1955+ except IOError:
1956+ self.log.debug('Attempt {} to open file {} from {} '
1957+ 'failed'.format(tries, file_name,
1958+ unit_name))
1959+ time.sleep(4)
1960+ tries += 1
1961+
1962+ if file_contents:
1963+ return file_contents
1964+ elif not fatal:
1965+ return None
1966+ elif fatal:
1967+ msg = 'Failed to get file contents from unit.'
1968+ amulet.raise_status(amulet.FAIL, msg)
1969+
1970+ def port_knock_tcp(self, host="localhost", port=22, timeout=15):
1971+ """Open a TCP socket to check for a listening sevice on a host.
1972+
1973+ :param host: host name or IP address, default to localhost
1974+ :param port: TCP port number, default to 22
1975+ :param timeout: Connect timeout, default to 15 seconds
1976+ :returns: True if successful, False if connect failed
1977+ """
1978+
1979+ # Resolve host name if possible
1980+ try:
1981+ connect_host = socket.gethostbyname(host)
1982+ host_human = "{} ({})".format(connect_host, host)
1983+ except socket.error as e:
1984+ self.log.warn('Unable to resolve address: '
1985+ '{} ({}) Trying anyway!'.format(host, e))
1986+ connect_host = host
1987+ host_human = connect_host
1988+
1989+ # Attempt socket connection
1990+ try:
1991+ knock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1992+ knock.settimeout(timeout)
1993+ knock.connect((connect_host, port))
1994+ knock.close()
1995+ self.log.debug('Socket connect OK for host '
1996+ '{} on port {}.'.format(host_human, port))
1997+ return True
1998+ except socket.error as e:
1999+ self.log.debug('Socket connect FAIL for'
2000+ ' {} port {} ({})'.format(host_human, port, e))
2001+ return False
2002+
2003+ def port_knock_units(self, sentry_units, port=22,
2004+ timeout=15, expect_success=True):
2005+ """Open a TCP socket to check for a listening sevice on each
2006+ listed juju unit.
2007+
2008+ :param sentry_units: list of sentry unit pointers
2009+ :param port: TCP port number, default to 22
2010+ :param timeout: Connect timeout, default to 15 seconds
2011+ :expect_success: True by default, set False to invert logic
2012+ :returns: None if successful, Failure message otherwise
2013+ """
2014+ for unit in sentry_units:
2015+ host = unit.info['public-address']
2016+ connected = self.port_knock_tcp(host, port, timeout)
2017+ if not connected and expect_success:
2018+ return 'Socket connect failed.'
2019+ elif connected and not expect_success:
2020+ return 'Socket connected unexpectedly.'
2021+
2022+ def get_uuid_epoch_stamp(self):
2023+ """Returns a stamp string based on uuid4 and epoch time. Useful in
2024+ generating test messages which need to be unique-ish."""
2025+ return '[{}-{}]'.format(uuid.uuid4(), time.time())
2026+
2027+# amulet juju action helpers:
2028 def run_action(self, unit_sentry, action,
2029 _check_output=subprocess.check_output):
2030 """Run the named action on a given unit sentry.
2031@@ -594,3 +807,12 @@
2032 output = _check_output(command, universal_newlines=True)
2033 data = json.loads(output)
2034 return data.get(u"status") == "completed"
2035+
2036+ def status_get(self, unit):
2037+ """Return the current service status of this unit."""
2038+ raw_status, return_code = unit.run(
2039+ "status-get --format=json --include-data")
2040+ if return_code != 0:
2041+ return ("unknown", "")
2042+ status = json.loads(raw_status)
2043+ return (status["status"], status["message"])
2044
2045=== modified file 'tests/charmhelpers/contrib/openstack/amulet/deployment.py'
2046--- tests/charmhelpers/contrib/openstack/amulet/deployment.py 2015-08-18 21:19:31 +0000
2047+++ tests/charmhelpers/contrib/openstack/amulet/deployment.py 2015-09-25 16:48:59 +0000
2048@@ -44,20 +44,31 @@
2049 Determine if the local branch being tested is derived from its
2050 stable or next (dev) branch, and based on this, use the corresonding
2051 stable or next branches for the other_services."""
2052+
2053+ # Charms outside the lp:~openstack-charmers namespace
2054 base_charms = ['mysql', 'mongodb', 'nrpe']
2055
2056+ # Force these charms to current series even when using an older series.
2057+ # ie. Use trusty/nrpe even when series is precise, as the P charm
2058+ # does not possess the necessary external master config and hooks.
2059+ force_series_current = ['nrpe']
2060+
2061 if self.series in ['precise', 'trusty']:
2062 base_series = self.series
2063 else:
2064 base_series = self.current_next
2065
2066- if self.stable:
2067- for svc in other_services:
2068+ for svc in other_services:
2069+ if svc['name'] in force_series_current:
2070+ base_series = self.current_next
2071+ # If a location has been explicitly set, use it
2072+ if svc.get('location'):
2073+ continue
2074+ if self.stable:
2075 temp = 'lp:charms/{}/{}'
2076 svc['location'] = temp.format(base_series,
2077 svc['name'])
2078- else:
2079- for svc in other_services:
2080+ else:
2081 if svc['name'] in base_charms:
2082 temp = 'lp:charms/{}/{}'
2083 svc['location'] = temp.format(base_series,
2084@@ -66,6 +77,7 @@
2085 temp = 'lp:~openstack-charmers/charms/{}/{}/next'
2086 svc['location'] = temp.format(self.current_next,
2087 svc['name'])
2088+
2089 return other_services
2090
2091 def _add_services(self, this_service, other_services):
2092@@ -77,21 +89,23 @@
2093
2094 services = other_services
2095 services.append(this_service)
2096+
2097+ # Charms which should use the source config option
2098 use_source = ['mysql', 'mongodb', 'rabbitmq-server', 'ceph',
2099 'ceph-osd', 'ceph-radosgw']
2100- # Most OpenStack subordinate charms do not expose an origin option
2101- # as that is controlled by the principle.
2102- ignore = ['cinder-ceph', 'hacluster', 'neutron-openvswitch', 'nrpe']
2103+
2104+ # Charms which can not use openstack-origin, ie. many subordinates
2105+ no_origin = ['cinder-ceph', 'hacluster', 'neutron-openvswitch', 'nrpe']
2106
2107 if self.openstack:
2108 for svc in services:
2109- if svc['name'] not in use_source + ignore:
2110+ if svc['name'] not in use_source + no_origin:
2111 config = {'openstack-origin': self.openstack}
2112 self.d.configure(svc['name'], config)
2113
2114 if self.source:
2115 for svc in services:
2116- if svc['name'] in use_source and svc['name'] not in ignore:
2117+ if svc['name'] in use_source and svc['name'] not in no_origin:
2118 config = {'source': self.source}
2119 self.d.configure(svc['name'], config)
2120
2121
2122=== modified file 'tests/charmhelpers/contrib/openstack/amulet/utils.py'
2123--- tests/charmhelpers/contrib/openstack/amulet/utils.py 2015-07-16 20:18:23 +0000
2124+++ tests/charmhelpers/contrib/openstack/amulet/utils.py 2015-09-25 16:48:59 +0000
2125@@ -27,6 +27,7 @@
2126 import heatclient.v1.client as heat_client
2127 import keystoneclient.v2_0 as keystone_client
2128 import novaclient.v1_1.client as nova_client
2129+import pika
2130 import swiftclient
2131
2132 from charmhelpers.contrib.amulet.utils import (
2133@@ -602,3 +603,361 @@
2134 self.log.debug('Ceph {} samples (OK): '
2135 '{}'.format(sample_type, samples))
2136 return None
2137+
2138+# rabbitmq/amqp specific helpers:
2139+ def add_rmq_test_user(self, sentry_units,
2140+ username="testuser1", password="changeme"):
2141+ """Add a test user via the first rmq juju unit, check connection as
2142+ the new user against all sentry units.
2143+
2144+ :param sentry_units: list of sentry unit pointers
2145+ :param username: amqp user name, default to testuser1
2146+ :param password: amqp user password
2147+ :returns: None if successful. Raise on error.
2148+ """
2149+ self.log.debug('Adding rmq user ({})...'.format(username))
2150+
2151+ # Check that user does not already exist
2152+ cmd_user_list = 'rabbitmqctl list_users'
2153+ output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_list)
2154+ if username in output:
2155+ self.log.warning('User ({}) already exists, returning '
2156+ 'gracefully.'.format(username))
2157+ return
2158+
2159+ perms = '".*" ".*" ".*"'
2160+ cmds = ['rabbitmqctl add_user {} {}'.format(username, password),
2161+ 'rabbitmqctl set_permissions {} {}'.format(username, perms)]
2162+
2163+ # Add user via first unit
2164+ for cmd in cmds:
2165+ output, _ = self.run_cmd_unit(sentry_units[0], cmd)
2166+
2167+ # Check connection against the other sentry_units
2168+ self.log.debug('Checking user connect against units...')
2169+ for sentry_unit in sentry_units:
2170+ connection = self.connect_amqp_by_unit(sentry_unit, ssl=False,
2171+ username=username,
2172+ password=password)
2173+ connection.close()
2174+
2175+ def delete_rmq_test_user(self, sentry_units, username="testuser1"):
2176+ """Delete a rabbitmq user via the first rmq juju unit.
2177+
2178+ :param sentry_units: list of sentry unit pointers
2179+ :param username: amqp user name, default to testuser1
2180+ :param password: amqp user password
2181+ :returns: None if successful or no such user.
2182+ """
2183+ self.log.debug('Deleting rmq user ({})...'.format(username))
2184+
2185+ # Check that the user exists
2186+ cmd_user_list = 'rabbitmqctl list_users'
2187+ output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_list)
2188+
2189+ if username not in output:
2190+ self.log.warning('User ({}) does not exist, returning '
2191+ 'gracefully.'.format(username))
2192+ return
2193+
2194+ # Delete the user
2195+ cmd_user_del = 'rabbitmqctl delete_user {}'.format(username)
2196+ output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_del)
2197+
2198+ def get_rmq_cluster_status(self, sentry_unit):
2199+ """Execute rabbitmq cluster status command on a unit and return
2200+ the full output.
2201+
2202+ :param unit: sentry unit
2203+ :returns: String containing console output of cluster status command
2204+ """
2205+ cmd = 'rabbitmqctl cluster_status'
2206+ output, _ = self.run_cmd_unit(sentry_unit, cmd)
2207+ self.log.debug('{} cluster_status:\n{}'.format(
2208+ sentry_unit.info['unit_name'], output))
2209+ return str(output)
2210+
2211+ def get_rmq_cluster_running_nodes(self, sentry_unit):
2212+ """Parse rabbitmqctl cluster_status output string, return list of
2213+ running rabbitmq cluster nodes.
2214+
2215+ :param unit: sentry unit
2216+ :returns: List containing node names of running nodes
2217+ """
2218+ # NOTE(beisner): rabbitmqctl cluster_status output is not
2219+ # json-parsable, do string chop foo, then json.loads that.
2220+ str_stat = self.get_rmq_cluster_status(sentry_unit)
2221+ if 'running_nodes' in str_stat:
2222+ pos_start = str_stat.find("{running_nodes,") + 15
2223+ pos_end = str_stat.find("]},", pos_start) + 1
2224+ str_run_nodes = str_stat[pos_start:pos_end].replace("'", '"')
2225+ run_nodes = json.loads(str_run_nodes)
2226+ return run_nodes
2227+ else:
2228+ return []
2229+
2230+ def validate_rmq_cluster_running_nodes(self, sentry_units):
2231+ """Check that all rmq unit hostnames are represented in the
2232+ cluster_status output of all units.
2233+
2234+ :param host_names: dict of juju unit names to host names
2235+ :param units: list of sentry unit pointers (all rmq units)
2236+ :returns: None if successful, otherwise return error message
2237+ """
2238+ host_names = self.get_unit_hostnames(sentry_units)
2239+ errors = []
2240+
2241+ # Query every unit for cluster_status running nodes
2242+ for query_unit in sentry_units:
2243+ query_unit_name = query_unit.info['unit_name']
2244+ running_nodes = self.get_rmq_cluster_running_nodes(query_unit)
2245+
2246+ # Confirm that every unit is represented in the queried unit's
2247+ # cluster_status running nodes output.
2248+ for validate_unit in sentry_units:
2249+ val_host_name = host_names[validate_unit.info['unit_name']]
2250+ val_node_name = 'rabbit@{}'.format(val_host_name)
2251+
2252+ if val_node_name not in running_nodes:
2253+ errors.append('Cluster member check failed on {}: {} not '
2254+ 'in {}\n'.format(query_unit_name,
2255+ val_node_name,
2256+ running_nodes))
2257+ if errors:
2258+ return ''.join(errors)
2259+
2260+ def rmq_ssl_is_enabled_on_unit(self, sentry_unit, port=None):
2261+ """Check a single juju rmq unit for ssl and port in the config file."""
2262+ host = sentry_unit.info['public-address']
2263+ unit_name = sentry_unit.info['unit_name']
2264+
2265+ conf_file = '/etc/rabbitmq/rabbitmq.config'
2266+ conf_contents = str(self.file_contents_safe(sentry_unit,
2267+ conf_file, max_wait=16))
2268+ # Checks
2269+ conf_ssl = 'ssl' in conf_contents
2270+ conf_port = str(port) in conf_contents
2271+
2272+ # Port explicitly checked in config
2273+ if port and conf_port and conf_ssl:
2274+ self.log.debug('SSL is enabled @{}:{} '
2275+ '({})'.format(host, port, unit_name))
2276+ return True
2277+ elif port and not conf_port and conf_ssl:
2278+ self.log.debug('SSL is enabled @{} but not on port {} '
2279+ '({})'.format(host, port, unit_name))
2280+ return False
2281+ # Port not checked (useful when checking that ssl is disabled)
2282+ elif not port and conf_ssl:
2283+ self.log.debug('SSL is enabled @{}:{} '
2284+ '({})'.format(host, port, unit_name))
2285+ return True
2286+ elif not port and not conf_ssl:
2287+ self.log.debug('SSL not enabled @{}:{} '
2288+ '({})'.format(host, port, unit_name))
2289+ return False
2290+ else:
2291+ msg = ('Unknown condition when checking SSL status @{}:{} '
2292+ '({})'.format(host, port, unit_name))
2293+ amulet.raise_status(amulet.FAIL, msg)
2294+
2295+ def validate_rmq_ssl_enabled_units(self, sentry_units, port=None):
2296+ """Check that ssl is enabled on rmq juju sentry units.
2297+
2298+ :param sentry_units: list of all rmq sentry units
2299+ :param port: optional ssl port override to validate
2300+ :returns: None if successful, otherwise return error message
2301+ """
2302+ for sentry_unit in sentry_units:
2303+ if not self.rmq_ssl_is_enabled_on_unit(sentry_unit, port=port):
2304+ return ('Unexpected condition: ssl is disabled on unit '
2305+ '({})'.format(sentry_unit.info['unit_name']))
2306+ return None
2307+
2308+ def validate_rmq_ssl_disabled_units(self, sentry_units):
2309+ """Check that ssl is enabled on listed rmq juju sentry units.
2310+
2311+ :param sentry_units: list of all rmq sentry units
2312+ :returns: True if successful. Raise on error.
2313+ """
2314+ for sentry_unit in sentry_units:
2315+ if self.rmq_ssl_is_enabled_on_unit(sentry_unit):
2316+ return ('Unexpected condition: ssl is enabled on unit '
2317+ '({})'.format(sentry_unit.info['unit_name']))
2318+ return None
2319+
2320+ def configure_rmq_ssl_on(self, sentry_units, deployment,
2321+ port=None, max_wait=60):
2322+ """Turn ssl charm config option on, with optional non-default
2323+ ssl port specification. Confirm that it is enabled on every
2324+ unit.
2325+
2326+ :param sentry_units: list of sentry units
2327+ :param deployment: amulet deployment object pointer
2328+ :param port: amqp port, use defaults if None
2329+ :param max_wait: maximum time to wait in seconds to confirm
2330+ :returns: None if successful. Raise on error.
2331+ """
2332+ self.log.debug('Setting ssl charm config option: on')
2333+
2334+ # Enable RMQ SSL
2335+ config = {'ssl': 'on'}
2336+ if port:
2337+ config['ssl_port'] = port
2338+
2339+ deployment.configure('rabbitmq-server', config)
2340+
2341+ # Confirm
2342+ tries = 0
2343+ ret = self.validate_rmq_ssl_enabled_units(sentry_units, port=port)
2344+ while ret and tries < (max_wait / 4):
2345+ time.sleep(4)
2346+ self.log.debug('Attempt {}: {}'.format(tries, ret))
2347+ ret = self.validate_rmq_ssl_enabled_units(sentry_units, port=port)
2348+ tries += 1
2349+
2350+ if ret:
2351+ amulet.raise_status(amulet.FAIL, ret)
2352+
2353+ def configure_rmq_ssl_off(self, sentry_units, deployment, max_wait=60):
2354+ """Turn ssl charm config option off, confirm that it is disabled
2355+ on every unit.
2356+
2357+ :param sentry_units: list of sentry units
2358+ :param deployment: amulet deployment object pointer
2359+ :param max_wait: maximum time to wait in seconds to confirm
2360+ :returns: None if successful. Raise on error.
2361+ """
2362+ self.log.debug('Setting ssl charm config option: off')
2363+
2364+ # Disable RMQ SSL
2365+ config = {'ssl': 'off'}
2366+ deployment.configure('rabbitmq-server', config)
2367+
2368+ # Confirm
2369+ tries = 0
2370+ ret = self.validate_rmq_ssl_disabled_units(sentry_units)
2371+ while ret and tries < (max_wait / 4):
2372+ time.sleep(4)
2373+ self.log.debug('Attempt {}: {}'.format(tries, ret))
2374+ ret = self.validate_rmq_ssl_disabled_units(sentry_units)
2375+ tries += 1
2376+
2377+ if ret:
2378+ amulet.raise_status(amulet.FAIL, ret)
2379+
2380+ def connect_amqp_by_unit(self, sentry_unit, ssl=False,
2381+ port=None, fatal=True,
2382+ username="testuser1", password="changeme"):
2383+ """Establish and return a pika amqp connection to the rabbitmq service
2384+ running on a rmq juju unit.
2385+
2386+ :param sentry_unit: sentry unit pointer
2387+ :param ssl: boolean, default to False
2388+ :param port: amqp port, use defaults if None
2389+ :param fatal: boolean, default to True (raises on connect error)
2390+ :param username: amqp user name, default to testuser1
2391+ :param password: amqp user password
2392+ :returns: pika amqp connection pointer or None if failed and non-fatal
2393+ """
2394+ host = sentry_unit.info['public-address']
2395+ unit_name = sentry_unit.info['unit_name']
2396+
2397+ # Default port logic if port is not specified
2398+ if ssl and not port:
2399+ port = 5671
2400+ elif not ssl and not port:
2401+ port = 5672
2402+
2403+ self.log.debug('Connecting to amqp on {}:{} ({}) as '
2404+ '{}...'.format(host, port, unit_name, username))
2405+
2406+ try:
2407+ credentials = pika.PlainCredentials(username, password)
2408+ parameters = pika.ConnectionParameters(host=host, port=port,
2409+ credentials=credentials,
2410+ ssl=ssl,
2411+ connection_attempts=3,
2412+ retry_delay=5,
2413+ socket_timeout=1)
2414+ connection = pika.BlockingConnection(parameters)
2415+ assert connection.server_properties['product'] == 'RabbitMQ'
2416+ self.log.debug('Connect OK')
2417+ return connection
2418+ except Exception as e:
2419+ msg = ('amqp connection failed to {}:{} as '
2420+ '{} ({})'.format(host, port, username, str(e)))
2421+ if fatal:
2422+ amulet.raise_status(amulet.FAIL, msg)
2423+ else:
2424+ self.log.warn(msg)
2425+ return None
2426+
2427+ def publish_amqp_message_by_unit(self, sentry_unit, message,
2428+ queue="test", ssl=False,
2429+ username="testuser1",
2430+ password="changeme",
2431+ port=None):
2432+ """Publish an amqp message to a rmq juju unit.
2433+
2434+ :param sentry_unit: sentry unit pointer
2435+ :param message: amqp message string
2436+ :param queue: message queue, default to test
2437+ :param username: amqp user name, default to testuser1
2438+ :param password: amqp user password
2439+ :param ssl: boolean, default to False
2440+ :param port: amqp port, use defaults if None
2441+ :returns: None. Raises exception if publish failed.
2442+ """
2443+ self.log.debug('Publishing message to {} queue:\n{}'.format(queue,
2444+ message))
2445+ connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl,
2446+ port=port,
2447+ username=username,
2448+ password=password)
2449+
2450+ # NOTE(beisner): extra debug here re: pika hang potential:
2451+ # https://github.com/pika/pika/issues/297
2452+ # https://groups.google.com/forum/#!topic/rabbitmq-users/Ja0iyfF0Szw
2453+ self.log.debug('Defining channel...')
2454+ channel = connection.channel()
2455+ self.log.debug('Declaring queue...')
2456+ channel.queue_declare(queue=queue, auto_delete=False, durable=True)
2457+ self.log.debug('Publishing message...')
2458+ channel.basic_publish(exchange='', routing_key=queue, body=message)
2459+ self.log.debug('Closing channel...')
2460+ channel.close()
2461+ self.log.debug('Closing connection...')
2462+ connection.close()
2463+
2464+ def get_amqp_message_by_unit(self, sentry_unit, queue="test",
2465+ username="testuser1",
2466+ password="changeme",
2467+ ssl=False, port=None):
2468+ """Get an amqp message from a rmq juju unit.
2469+
2470+ :param sentry_unit: sentry unit pointer
2471+ :param queue: message queue, default to test
2472+ :param username: amqp user name, default to testuser1
2473+ :param password: amqp user password
2474+ :param ssl: boolean, default to False
2475+ :param port: amqp port, use defaults if None
2476+ :returns: amqp message body as string. Raise if get fails.
2477+ """
2478+ connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl,
2479+ port=port,
2480+ username=username,
2481+ password=password)
2482+ channel = connection.channel()
2483+ method_frame, _, body = channel.basic_get(queue)
2484+
2485+ if method_frame:
2486+ self.log.debug('Retreived message from {} queue:\n{}'.format(queue,
2487+ body))
2488+ channel.basic_ack(method_frame.delivery_tag)
2489+ channel.close()
2490+ connection.close()
2491+ return body
2492+ else:
2493+ msg = 'No message retrieved.'
2494+ amulet.raise_status(amulet.FAIL, msg)

Subscribers

People subscribed via source and target branches