Merge lp:~hopem/charms/trusty/neutron-gateway/lp1497517 into lp:~openstack-charmers-archive/charms/trusty/neutron-gateway/next

Proposed by Edward Hope-Morley
Status: Merged
Merged at revision: 145
Proposed branch: lp:~hopem/charms/trusty/neutron-gateway/lp1497517
Merge into: lp:~openstack-charmers-archive/charms/trusty/neutron-gateway/next
Diff against target: 2584 lines (+1793/-186) (has conflicts)
17 files modified
charm-helpers-hooks.yaml (+1/-1)
hooks/charmhelpers/contrib/network/ip.py (+5/-3)
hooks/charmhelpers/contrib/network/ufw.py (+5/-6)
hooks/charmhelpers/contrib/openstack/amulet/deployment.py (+23/-9)
hooks/charmhelpers/contrib/openstack/amulet/utils.py (+359/-0)
hooks/charmhelpers/contrib/openstack/context.py (+72/-17)
hooks/charmhelpers/contrib/openstack/templating.py (+30/-2)
hooks/charmhelpers/contrib/openstack/utils.py (+264/-45)
hooks/charmhelpers/contrib/storage/linux/ceph.py (+226/-13)
hooks/charmhelpers/core/host.py (+32/-16)
hooks/charmhelpers/core/hugepage.py (+8/-1)
hooks/charmhelpers/core/kernel.py (+68/-0)
hooks/charmhelpers/core/strutils.py (+30/-0)
tests/charmhelpers/contrib/amulet/deployment.py (+4/-2)
tests/charmhelpers/contrib/amulet/utils.py (+284/-62)
tests/charmhelpers/contrib/openstack/amulet/deployment.py (+23/-9)
tests/charmhelpers/contrib/openstack/amulet/utils.py (+359/-0)
Text conflict in hooks/charmhelpers/contrib/openstack/utils.py
To merge this branch: bzr merge lp:~hopem/charms/trusty/neutron-gateway/lp1497517
Reviewer: OpenStack Charmers (status: Pending)
Review via email: mp+271887@code.launchpad.net
Revision history for this message
uosci-testing-bot (uosci-testing-bot) wrote :

charm_lint_check #10422 neutron-gateway-next for hopem mp271887
    LINT OK: passed

Build: http://10.245.162.77:8080/job/charm_lint_check/10422/

Revision history for this message
uosci-testing-bot (uosci-testing-bot) wrote :

charm_unit_test #9618 neutron-gateway-next for hopem mp271887
    UNIT OK: passed

Build: http://10.245.162.77:8080/job/charm_unit_test/9618/

Revision history for this message
uosci-testing-bot (uosci-testing-bot) wrote :

charm_amulet_test #6567 neutron-gateway-next for hopem mp271887
    AMULET OK: passed

Build: http://10.245.162.77:8080/job/charm_amulet_test/6567/

141. By Edward Hope-Morley

sync charmhelpers

Revision history for this message
uosci-testing-bot (uosci-testing-bot) wrote :

charm_lint_check #10771 neutron-gateway-next for hopem mp271887
    LINT OK: passed

Build: http://10.245.162.77:8080/job/charm_lint_check/10771/

Revision history for this message
uosci-testing-bot (uosci-testing-bot) wrote :

charm_unit_test #9951 neutron-gateway-next for hopem mp271887
    UNIT OK: passed

Build: http://10.245.162.77:8080/job/charm_unit_test/9951/

Revision history for this message
uosci-testing-bot (uosci-testing-bot) wrote :

charm_amulet_test #6776 neutron-gateway-next for hopem mp271887
    AMULET OK: passed

Build: http://10.245.162.77:8080/job/charm_amulet_test/6776/

Preview Diff

1=== modified file 'charm-helpers-hooks.yaml'
2--- charm-helpers-hooks.yaml 2015-08-20 09:24:04 +0000
3+++ charm-helpers-hooks.yaml 2015-09-25 16:13:14 +0000
4@@ -1,4 +1,4 @@
5-branch: lp:charm-helpers
6+branch: lp:~hopem/charm-helpers/lp1497517
7 destination: hooks/charmhelpers
8 include:
9 - core
10
11=== modified file 'hooks/charmhelpers/contrib/network/ip.py'
12--- hooks/charmhelpers/contrib/network/ip.py 2015-09-03 09:43:15 +0000
13+++ hooks/charmhelpers/contrib/network/ip.py 2015-09-25 16:13:14 +0000
14@@ -23,7 +23,7 @@
15 from functools import partial
16
17 from charmhelpers.core.hookenv import unit_get
18-from charmhelpers.fetch import apt_install
19+from charmhelpers.fetch import apt_install, apt_update
20 from charmhelpers.core.hookenv import (
21 log,
22 WARNING,
23@@ -32,13 +32,15 @@
24 try:
25 import netifaces
26 except ImportError:
27- apt_install('python-netifaces')
28+ apt_update(fatal=True)
29+ apt_install('python-netifaces', fatal=True)
30 import netifaces
31
32 try:
33 import netaddr
34 except ImportError:
35- apt_install('python-netaddr')
36+ apt_update(fatal=True)
37+ apt_install('python-netaddr', fatal=True)
38 import netaddr
39
40
41
42=== modified file 'hooks/charmhelpers/contrib/network/ufw.py'
43--- hooks/charmhelpers/contrib/network/ufw.py 2015-07-16 20:18:08 +0000
44+++ hooks/charmhelpers/contrib/network/ufw.py 2015-09-25 16:13:14 +0000
45@@ -40,7 +40,9 @@
46 import re
47 import os
48 import subprocess
49+
50 from charmhelpers.core import hookenv
51+from charmhelpers.core.kernel import modprobe, is_module_loaded
52
53 __author__ = "Felipe Reyes <felipe.reyes@canonical.com>"
54
55@@ -82,14 +84,11 @@
56 # do we have IPv6 in the machine?
57 if os.path.isdir('/proc/sys/net/ipv6'):
58 # is ip6tables kernel module loaded?
59- lsmod = subprocess.check_output(['lsmod'], universal_newlines=True)
60- matches = re.findall('^ip6_tables[ ]+', lsmod, re.M)
61- if len(matches) == 0:
62+ if not is_module_loaded('ip6_tables'):
63 # ip6tables support isn't complete, let's try to load it
64 try:
65- subprocess.check_output(['modprobe', 'ip6_tables'],
66- universal_newlines=True)
67- # great, we could load the module
68+ modprobe('ip6_tables')
69+ # great, we can load the module
70 return True
71 except subprocess.CalledProcessError as ex:
72 hookenv.log("Couldn't load ip6_tables module: %s" % ex.output,
73
74=== modified file 'hooks/charmhelpers/contrib/openstack/amulet/deployment.py'
75--- hooks/charmhelpers/contrib/openstack/amulet/deployment.py 2015-08-18 21:16:23 +0000
76+++ hooks/charmhelpers/contrib/openstack/amulet/deployment.py 2015-09-25 16:13:14 +0000
77@@ -44,20 +44,31 @@
78 Determine if the local branch being tested is derived from its
79 stable or next (dev) branch, and based on this, use the corresonding
80 stable or next branches for the other_services."""
81+
82+ # Charms outside the lp:~openstack-charmers namespace
83 base_charms = ['mysql', 'mongodb', 'nrpe']
84
85+ # Force these charms to current series even when using an older series.
86+ # ie. Use trusty/nrpe even when series is precise, as the P charm
87+ # does not possess the necessary external master config and hooks.
88+ force_series_current = ['nrpe']
89+
90 if self.series in ['precise', 'trusty']:
91 base_series = self.series
92 else:
93 base_series = self.current_next
94
95- if self.stable:
96- for svc in other_services:
97+ for svc in other_services:
98+ if svc['name'] in force_series_current:
99+ base_series = self.current_next
100+ # If a location has been explicitly set, use it
101+ if svc.get('location'):
102+ continue
103+ if self.stable:
104 temp = 'lp:charms/{}/{}'
105 svc['location'] = temp.format(base_series,
106 svc['name'])
107- else:
108- for svc in other_services:
109+ else:
110 if svc['name'] in base_charms:
111 temp = 'lp:charms/{}/{}'
112 svc['location'] = temp.format(base_series,
113@@ -66,6 +77,7 @@
114 temp = 'lp:~openstack-charmers/charms/{}/{}/next'
115 svc['location'] = temp.format(self.current_next,
116 svc['name'])
117+
118 return other_services
119
120 def _add_services(self, this_service, other_services):
121@@ -77,21 +89,23 @@
122
123 services = other_services
124 services.append(this_service)
125+
126+ # Charms which should use the source config option
127 use_source = ['mysql', 'mongodb', 'rabbitmq-server', 'ceph',
128 'ceph-osd', 'ceph-radosgw']
129- # Most OpenStack subordinate charms do not expose an origin option
130- # as that is controlled by the principle.
131- ignore = ['cinder-ceph', 'hacluster', 'neutron-openvswitch', 'nrpe']
132+
133+ # Charms which can not use openstack-origin, ie. many subordinates
134+ no_origin = ['cinder-ceph', 'hacluster', 'neutron-openvswitch', 'nrpe']
135
136 if self.openstack:
137 for svc in services:
138- if svc['name'] not in use_source + ignore:
139+ if svc['name'] not in use_source + no_origin:
140 config = {'openstack-origin': self.openstack}
141 self.d.configure(svc['name'], config)
142
143 if self.source:
144 for svc in services:
145- if svc['name'] in use_source and svc['name'] not in ignore:
146+ if svc['name'] in use_source and svc['name'] not in no_origin:
147 config = {'source': self.source}
148 self.d.configure(svc['name'], config)
149
150
151=== modified file 'hooks/charmhelpers/contrib/openstack/amulet/utils.py'
152--- hooks/charmhelpers/contrib/openstack/amulet/utils.py 2015-07-16 20:18:08 +0000
153+++ hooks/charmhelpers/contrib/openstack/amulet/utils.py 2015-09-25 16:13:14 +0000
154@@ -27,6 +27,7 @@
155 import heatclient.v1.client as heat_client
156 import keystoneclient.v2_0 as keystone_client
157 import novaclient.v1_1.client as nova_client
158+import pika
159 import swiftclient
160
161 from charmhelpers.contrib.amulet.utils import (
162@@ -602,3 +603,361 @@
163 self.log.debug('Ceph {} samples (OK): '
164 '{}'.format(sample_type, samples))
165 return None
166+
167+# rabbitmq/amqp specific helpers:
168+ def add_rmq_test_user(self, sentry_units,
169+ username="testuser1", password="changeme"):
170+ """Add a test user via the first rmq juju unit, check connection as
171+ the new user against all sentry units.
172+
173+ :param sentry_units: list of sentry unit pointers
174+ :param username: amqp user name, default to testuser1
175+ :param password: amqp user password
176+ :returns: None if successful. Raise on error.
177+ """
178+ self.log.debug('Adding rmq user ({})...'.format(username))
179+
180+ # Check that user does not already exist
181+ cmd_user_list = 'rabbitmqctl list_users'
182+ output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_list)
183+ if username in output:
184+ self.log.warning('User ({}) already exists, returning '
185+ 'gracefully.'.format(username))
186+ return
187+
188+ perms = '".*" ".*" ".*"'
189+ cmds = ['rabbitmqctl add_user {} {}'.format(username, password),
190+ 'rabbitmqctl set_permissions {} {}'.format(username, perms)]
191+
192+ # Add user via first unit
193+ for cmd in cmds:
194+ output, _ = self.run_cmd_unit(sentry_units[0], cmd)
195+
196+ # Check connection against the other sentry_units
197+ self.log.debug('Checking user connect against units...')
198+ for sentry_unit in sentry_units:
199+ connection = self.connect_amqp_by_unit(sentry_unit, ssl=False,
200+ username=username,
201+ password=password)
202+ connection.close()
203+
204+ def delete_rmq_test_user(self, sentry_units, username="testuser1"):
205+ """Delete a rabbitmq user via the first rmq juju unit.
206+
207+ :param sentry_units: list of sentry unit pointers
208+ :param username: amqp user name, default to testuser1
209+ :param password: amqp user password
210+ :returns: None if successful or no such user.
211+ """
212+ self.log.debug('Deleting rmq user ({})...'.format(username))
213+
214+ # Check that the user exists
215+ cmd_user_list = 'rabbitmqctl list_users'
216+ output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_list)
217+
218+ if username not in output:
219+ self.log.warning('User ({}) does not exist, returning '
220+ 'gracefully.'.format(username))
221+ return
222+
223+ # Delete the user
224+ cmd_user_del = 'rabbitmqctl delete_user {}'.format(username)
225+ output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_del)
226+
227+ def get_rmq_cluster_status(self, sentry_unit):
228+ """Execute rabbitmq cluster status command on a unit and return
229+ the full output.
230+
231+ :param unit: sentry unit
232+ :returns: String containing console output of cluster status command
233+ """
234+ cmd = 'rabbitmqctl cluster_status'
235+ output, _ = self.run_cmd_unit(sentry_unit, cmd)
236+ self.log.debug('{} cluster_status:\n{}'.format(
237+ sentry_unit.info['unit_name'], output))
238+ return str(output)
239+
240+ def get_rmq_cluster_running_nodes(self, sentry_unit):
241+ """Parse rabbitmqctl cluster_status output string, return list of
242+ running rabbitmq cluster nodes.
243+
244+ :param unit: sentry unit
245+ :returns: List containing node names of running nodes
246+ """
247+ # NOTE(beisner): rabbitmqctl cluster_status output is not
248+ # json-parsable, do string chop foo, then json.loads that.
249+ str_stat = self.get_rmq_cluster_status(sentry_unit)
250+ if 'running_nodes' in str_stat:
251+ pos_start = str_stat.find("{running_nodes,") + 15
252+ pos_end = str_stat.find("]},", pos_start) + 1
253+ str_run_nodes = str_stat[pos_start:pos_end].replace("'", '"')
254+ run_nodes = json.loads(str_run_nodes)
255+ return run_nodes
256+ else:
257+ return []
258+
259+ def validate_rmq_cluster_running_nodes(self, sentry_units):
260+ """Check that all rmq unit hostnames are represented in the
261+ cluster_status output of all units.
262+
263+ :param host_names: dict of juju unit names to host names
264+ :param units: list of sentry unit pointers (all rmq units)
265+ :returns: None if successful, otherwise return error message
266+ """
267+ host_names = self.get_unit_hostnames(sentry_units)
268+ errors = []
269+
270+ # Query every unit for cluster_status running nodes
271+ for query_unit in sentry_units:
272+ query_unit_name = query_unit.info['unit_name']
273+ running_nodes = self.get_rmq_cluster_running_nodes(query_unit)
274+
275+ # Confirm that every unit is represented in the queried unit's
276+ # cluster_status running nodes output.
277+ for validate_unit in sentry_units:
278+ val_host_name = host_names[validate_unit.info['unit_name']]
279+ val_node_name = 'rabbit@{}'.format(val_host_name)
280+
281+ if val_node_name not in running_nodes:
282+ errors.append('Cluster member check failed on {}: {} not '
283+ 'in {}\n'.format(query_unit_name,
284+ val_node_name,
285+ running_nodes))
286+ if errors:
287+ return ''.join(errors)
288+
289+ def rmq_ssl_is_enabled_on_unit(self, sentry_unit, port=None):
290+ """Check a single juju rmq unit for ssl and port in the config file."""
291+ host = sentry_unit.info['public-address']
292+ unit_name = sentry_unit.info['unit_name']
293+
294+ conf_file = '/etc/rabbitmq/rabbitmq.config'
295+ conf_contents = str(self.file_contents_safe(sentry_unit,
296+ conf_file, max_wait=16))
297+ # Checks
298+ conf_ssl = 'ssl' in conf_contents
299+ conf_port = str(port) in conf_contents
300+
301+ # Port explicitly checked in config
302+ if port and conf_port and conf_ssl:
303+ self.log.debug('SSL is enabled @{}:{} '
304+ '({})'.format(host, port, unit_name))
305+ return True
306+ elif port and not conf_port and conf_ssl:
307+ self.log.debug('SSL is enabled @{} but not on port {} '
308+ '({})'.format(host, port, unit_name))
309+ return False
310+ # Port not checked (useful when checking that ssl is disabled)
311+ elif not port and conf_ssl:
312+ self.log.debug('SSL is enabled @{}:{} '
313+ '({})'.format(host, port, unit_name))
314+ return True
315+ elif not port and not conf_ssl:
316+ self.log.debug('SSL not enabled @{}:{} '
317+ '({})'.format(host, port, unit_name))
318+ return False
319+ else:
320+ msg = ('Unknown condition when checking SSL status @{}:{} '
321+ '({})'.format(host, port, unit_name))
322+ amulet.raise_status(amulet.FAIL, msg)
323+
324+ def validate_rmq_ssl_enabled_units(self, sentry_units, port=None):
325+ """Check that ssl is enabled on rmq juju sentry units.
326+
327+ :param sentry_units: list of all rmq sentry units
328+ :param port: optional ssl port override to validate
329+ :returns: None if successful, otherwise return error message
330+ """
331+ for sentry_unit in sentry_units:
332+ if not self.rmq_ssl_is_enabled_on_unit(sentry_unit, port=port):
333+ return ('Unexpected condition: ssl is disabled on unit '
334+ '({})'.format(sentry_unit.info['unit_name']))
335+ return None
336+
337+ def validate_rmq_ssl_disabled_units(self, sentry_units):
338+ """Check that ssl is enabled on listed rmq juju sentry units.
339+
340+ :param sentry_units: list of all rmq sentry units
341+ :returns: True if successful. Raise on error.
342+ """
343+ for sentry_unit in sentry_units:
344+ if self.rmq_ssl_is_enabled_on_unit(sentry_unit):
345+ return ('Unexpected condition: ssl is enabled on unit '
346+ '({})'.format(sentry_unit.info['unit_name']))
347+ return None
348+
349+ def configure_rmq_ssl_on(self, sentry_units, deployment,
350+ port=None, max_wait=60):
351+ """Turn ssl charm config option on, with optional non-default
352+ ssl port specification. Confirm that it is enabled on every
353+ unit.
354+
355+ :param sentry_units: list of sentry units
356+ :param deployment: amulet deployment object pointer
357+ :param port: amqp port, use defaults if None
358+ :param max_wait: maximum time to wait in seconds to confirm
359+ :returns: None if successful. Raise on error.
360+ """
361+ self.log.debug('Setting ssl charm config option: on')
362+
363+ # Enable RMQ SSL
364+ config = {'ssl': 'on'}
365+ if port:
366+ config['ssl_port'] = port
367+
368+ deployment.configure('rabbitmq-server', config)
369+
370+ # Confirm
371+ tries = 0
372+ ret = self.validate_rmq_ssl_enabled_units(sentry_units, port=port)
373+ while ret and tries < (max_wait / 4):
374+ time.sleep(4)
375+ self.log.debug('Attempt {}: {}'.format(tries, ret))
376+ ret = self.validate_rmq_ssl_enabled_units(sentry_units, port=port)
377+ tries += 1
378+
379+ if ret:
380+ amulet.raise_status(amulet.FAIL, ret)
381+
382+ def configure_rmq_ssl_off(self, sentry_units, deployment, max_wait=60):
383+ """Turn ssl charm config option off, confirm that it is disabled
384+ on every unit.
385+
386+ :param sentry_units: list of sentry units
387+ :param deployment: amulet deployment object pointer
388+ :param max_wait: maximum time to wait in seconds to confirm
389+ :returns: None if successful. Raise on error.
390+ """
391+ self.log.debug('Setting ssl charm config option: off')
392+
393+ # Disable RMQ SSL
394+ config = {'ssl': 'off'}
395+ deployment.configure('rabbitmq-server', config)
396+
397+ # Confirm
398+ tries = 0
399+ ret = self.validate_rmq_ssl_disabled_units(sentry_units)
400+ while ret and tries < (max_wait / 4):
401+ time.sleep(4)
402+ self.log.debug('Attempt {}: {}'.format(tries, ret))
403+ ret = self.validate_rmq_ssl_disabled_units(sentry_units)
404+ tries += 1
405+
406+ if ret:
407+ amulet.raise_status(amulet.FAIL, ret)
408+
409+ def connect_amqp_by_unit(self, sentry_unit, ssl=False,
410+ port=None, fatal=True,
411+ username="testuser1", password="changeme"):
412+ """Establish and return a pika amqp connection to the rabbitmq service
413+ running on a rmq juju unit.
414+
415+ :param sentry_unit: sentry unit pointer
416+ :param ssl: boolean, default to False
417+ :param port: amqp port, use defaults if None
418+ :param fatal: boolean, default to True (raises on connect error)
419+ :param username: amqp user name, default to testuser1
420+ :param password: amqp user password
421+ :returns: pika amqp connection pointer or None if failed and non-fatal
422+ """
423+ host = sentry_unit.info['public-address']
424+ unit_name = sentry_unit.info['unit_name']
425+
426+ # Default port logic if port is not specified
427+ if ssl and not port:
428+ port = 5671
429+ elif not ssl and not port:
430+ port = 5672
431+
432+ self.log.debug('Connecting to amqp on {}:{} ({}) as '
433+ '{}...'.format(host, port, unit_name, username))
434+
435+ try:
436+ credentials = pika.PlainCredentials(username, password)
437+ parameters = pika.ConnectionParameters(host=host, port=port,
438+ credentials=credentials,
439+ ssl=ssl,
440+ connection_attempts=3,
441+ retry_delay=5,
442+ socket_timeout=1)
443+ connection = pika.BlockingConnection(parameters)
444+ assert connection.server_properties['product'] == 'RabbitMQ'
445+ self.log.debug('Connect OK')
446+ return connection
447+ except Exception as e:
448+ msg = ('amqp connection failed to {}:{} as '
449+ '{} ({})'.format(host, port, username, str(e)))
450+ if fatal:
451+ amulet.raise_status(amulet.FAIL, msg)
452+ else:
453+ self.log.warn(msg)
454+ return None
455+
456+ def publish_amqp_message_by_unit(self, sentry_unit, message,
457+ queue="test", ssl=False,
458+ username="testuser1",
459+ password="changeme",
460+ port=None):
461+ """Publish an amqp message to a rmq juju unit.
462+
463+ :param sentry_unit: sentry unit pointer
464+ :param message: amqp message string
465+ :param queue: message queue, default to test
466+ :param username: amqp user name, default to testuser1
467+ :param password: amqp user password
468+ :param ssl: boolean, default to False
469+ :param port: amqp port, use defaults if None
470+ :returns: None. Raises exception if publish failed.
471+ """
472+ self.log.debug('Publishing message to {} queue:\n{}'.format(queue,
473+ message))
474+ connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl,
475+ port=port,
476+ username=username,
477+ password=password)
478+
479+ # NOTE(beisner): extra debug here re: pika hang potential:
480+ # https://github.com/pika/pika/issues/297
481+ # https://groups.google.com/forum/#!topic/rabbitmq-users/Ja0iyfF0Szw
482+ self.log.debug('Defining channel...')
483+ channel = connection.channel()
484+ self.log.debug('Declaring queue...')
485+ channel.queue_declare(queue=queue, auto_delete=False, durable=True)
486+ self.log.debug('Publishing message...')
487+ channel.basic_publish(exchange='', routing_key=queue, body=message)
488+ self.log.debug('Closing channel...')
489+ channel.close()
490+ self.log.debug('Closing connection...')
491+ connection.close()
492+
493+ def get_amqp_message_by_unit(self, sentry_unit, queue="test",
494+ username="testuser1",
495+ password="changeme",
496+ ssl=False, port=None):
497+ """Get an amqp message from a rmq juju unit.
498+
499+ :param sentry_unit: sentry unit pointer
500+ :param queue: message queue, default to test
501+ :param username: amqp user name, default to testuser1
502+ :param password: amqp user password
503+ :param ssl: boolean, default to False
504+ :param port: amqp port, use defaults if None
505+ :returns: amqp message body as string. Raise if get fails.
506+ """
507+ connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl,
508+ port=port,
509+ username=username,
510+ password=password)
511+ channel = connection.channel()
512+ method_frame, _, body = channel.basic_get(queue)
513+
514+ if method_frame:
515+ self.log.debug('Retrieved message from {} queue:\n{}'.format(queue,
516+ body))
517+ channel.basic_ack(method_frame.delivery_tag)
518+ channel.close()
519+ connection.close()
520+ return body
521+ else:
522+ msg = 'No message retrieved.'
523+ amulet.raise_status(amulet.FAIL, msg)
524
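
For context on how the new rmq/amqp helpers above are meant to be driven, a minimal amulet-test sketch follows. It assumes an OpenStackAmuletUtils instance 'u' and an amulet deployment 'd' with rabbitmq-server units; the unit names and message are illustrative and not taken from this charm's tests:

    # Gather rabbitmq-server sentry units from the deployment
    sentries = [d.sentry.unit['rabbitmq-server/0'],
                d.sentry.unit['rabbitmq-server/1']]

    # Add a throwaway amqp user and verify connectivity against every unit
    u.add_rmq_test_user(sentries, username='testuser1', password='changeme')

    # Round-trip a message through the first unit
    u.publish_amqp_message_by_unit(sentries[0], 'hello world', queue='test')
    body = u.get_amqp_message_by_unit(sentries[0], queue='test')
    assert body == 'hello world'

    # Enable ssl via charm config, confirm on all units, then clean up
    u.configure_rmq_ssl_on(sentries, d, port=5671, max_wait=60)
    u.configure_rmq_ssl_off(sentries, d)
    u.delete_rmq_test_user(sentries)
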
525=== modified file 'hooks/charmhelpers/contrib/openstack/context.py'
526--- hooks/charmhelpers/contrib/openstack/context.py 2015-09-03 09:43:15 +0000
527+++ hooks/charmhelpers/contrib/openstack/context.py 2015-09-25 16:13:14 +0000
528@@ -14,6 +14,7 @@
529 # You should have received a copy of the GNU Lesser General Public License
530 # along with charm-helpers. If not, see <http://www.gnu.org/licenses/>.
531
532+import glob
533 import json
534 import os
535 import re
536@@ -194,10 +195,50 @@
537 class OSContextGenerator(object):
538 """Base class for all context generators."""
539 interfaces = []
540+ related = False
541+ complete = False
542+ missing_data = []
543
544 def __call__(self):
545 raise NotImplementedError
546
547+ def context_complete(self, ctxt):
548+ """Check for missing data for the required context data.
549+ Set self.missing_data if it exists and return False.
550+ Set self.complete if no missing data and return True.
551+ """
552+ # Fresh start
553+ self.complete = False
554+ self.missing_data = []
555+ for k, v in six.iteritems(ctxt):
556+ if v is None or v == '':
557+ if k not in self.missing_data:
558+ self.missing_data.append(k)
559+
560+ if self.missing_data:
561+ self.complete = False
562+ log('Missing required data: %s' % ' '.join(self.missing_data), level=INFO)
563+ else:
564+ self.complete = True
565+ return self.complete
566+
567+ def get_related(self):
568+ """Check if any of the context interfaces have relation ids.
569+ Set self.related and return True if one of the interfaces
570+ has relation ids.
571+ """
572+ # Fresh start
573+ self.related = False
574+ try:
575+ for interface in self.interfaces:
576+ if relation_ids(interface):
577+ self.related = True
578+ return self.related
579+ except AttributeError as e:
580+ log("{} {}"
581+ "".format(self, e), 'INFO')
582+ return self.related
583+
584
585 class SharedDBContext(OSContextGenerator):
586 interfaces = ['shared-db']
587@@ -213,6 +254,7 @@
588 self.database = database
589 self.user = user
590 self.ssl_dir = ssl_dir
591+ self.rel_name = self.interfaces[0]
592
593 def __call__(self):
594 self.database = self.database or config('database')
595@@ -246,6 +288,7 @@
596 password_setting = self.relation_prefix + '_password'
597
598 for rid in relation_ids(self.interfaces[0]):
599+ self.related = True
600 for unit in related_units(rid):
601 rdata = relation_get(rid=rid, unit=unit)
602 host = rdata.get('db_host')
603@@ -257,7 +300,7 @@
604 'database_password': rdata.get(password_setting),
605 'database_type': 'mysql'
606 }
607- if context_complete(ctxt):
608+ if self.context_complete(ctxt):
609 db_ssl(rdata, ctxt, self.ssl_dir)
610 return ctxt
611 return {}
612@@ -278,6 +321,7 @@
613
614 ctxt = {}
615 for rid in relation_ids(self.interfaces[0]):
616+ self.related = True
617 for unit in related_units(rid):
618 rel_host = relation_get('host', rid=rid, unit=unit)
619 rel_user = relation_get('user', rid=rid, unit=unit)
620@@ -287,7 +331,7 @@
621 'database_user': rel_user,
622 'database_password': rel_passwd,
623 'database_type': 'postgresql'}
624- if context_complete(ctxt):
625+ if self.context_complete(ctxt):
626 return ctxt
627
628 return {}
629@@ -348,6 +392,7 @@
630 ctxt['signing_dir'] = cachedir
631
632 for rid in relation_ids(self.rel_name):
633+ self.related = True
634 for unit in related_units(rid):
635 rdata = relation_get(rid=rid, unit=unit)
636 serv_host = rdata.get('service_host')
637@@ -366,7 +411,7 @@
638 'service_protocol': svc_protocol,
639 'auth_protocol': auth_protocol})
640
641- if context_complete(ctxt):
642+ if self.context_complete(ctxt):
643 # NOTE(jamespage) this is required for >= icehouse
644 # so a missing value just indicates keystone needs
645 # upgrading
646@@ -405,6 +450,7 @@
647 ctxt = {}
648 for rid in relation_ids(self.rel_name):
649 ha_vip_only = False
650+ self.related = True
651 for unit in related_units(rid):
652 if relation_get('clustered', rid=rid, unit=unit):
653 ctxt['clustered'] = True
654@@ -437,7 +483,7 @@
655 ha_vip_only = relation_get('ha-vip-only',
656 rid=rid, unit=unit) is not None
657
658- if context_complete(ctxt):
659+ if self.context_complete(ctxt):
660 if 'rabbit_ssl_ca' in ctxt:
661 if not self.ssl_dir:
662 log("Charm not setup for ssl support but ssl ca "
663@@ -469,7 +515,7 @@
664 ctxt['oslo_messaging_flags'] = config_flags_parser(
665 oslo_messaging_flags)
666
667- if not context_complete(ctxt):
668+ if not self.complete:
669 return {}
670
671 return ctxt
672@@ -485,13 +531,15 @@
673
674 log('Generating template context for ceph', level=DEBUG)
675 mon_hosts = []
676- auth = None
677- key = None
678- use_syslog = str(config('use-syslog')).lower()
679+ ctxt = {
680+ 'use_syslog': str(config('use-syslog')).lower()
681+ }
682 for rid in relation_ids('ceph'):
683 for unit in related_units(rid):
684- auth = relation_get('auth', rid=rid, unit=unit)
685- key = relation_get('key', rid=rid, unit=unit)
686+ if not ctxt.get('auth'):
687+ ctxt['auth'] = relation_get('auth', rid=rid, unit=unit)
688+ if not ctxt.get('key'):
689+ ctxt['key'] = relation_get('key', rid=rid, unit=unit)
690 ceph_pub_addr = relation_get('ceph-public-address', rid=rid,
691 unit=unit)
692 unit_priv_addr = relation_get('private-address', rid=rid,
693@@ -500,15 +548,12 @@
694 ceph_addr = format_ipv6_addr(ceph_addr) or ceph_addr
695 mon_hosts.append(ceph_addr)
696
697- ctxt = {'mon_hosts': ' '.join(sorted(mon_hosts)),
698- 'auth': auth,
699- 'key': key,
700- 'use_syslog': use_syslog}
701+ ctxt['mon_hosts'] = ' '.join(sorted(mon_hosts))
702
703 if not os.path.isdir('/etc/ceph'):
704 os.mkdir('/etc/ceph')
705
706- if not context_complete(ctxt):
707+ if not self.context_complete(ctxt):
708 return {}
709
710 ensure_packages(['ceph-common'])
711@@ -1334,8 +1379,18 @@
712 ports = mappings.values()
713 napi_settings = NeutronAPIContext()()
714 mtu = napi_settings.get('network_device_mtu')
715+ all_ports = set()
716+ # If any of ports is a vlan device, its underlying device must have
717+ # mtu applied first.
718+ for port in ports:
719+ for lport in glob.glob("/sys/class/net/%s/lower_*" % port):
720+ lport = os.path.basename(lport)
721+ all_ports.add(lport.split('_')[1])
722+
723+ all_ports = list(all_ports)
724+ all_ports.extend(ports)
725 if mtu:
726- ctxt["devs"] = '\\n'.join(ports)
727+ ctxt["devs"] = '\\n'.join(all_ports)
728 ctxt['mtu'] = mtu
729
730 return ctxt
731@@ -1367,6 +1422,6 @@
732 'auth_protocol':
733 rdata.get('auth_protocol') or 'http',
734 }
735- if context_complete(ctxt):
736+ if self.context_complete(ctxt):
737 return ctxt
738 return {}
739
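
The net effect of the context.py changes is that every OSContextGenerator now tracks 'related', 'complete' and 'missing_data' itself. A charm-specific context written against the new base class follows the same pattern as the contexts above; in this sketch the relation name and data keys are hypothetical:

    from charmhelpers.core.hookenv import (
        relation_ids,
        related_units,
        relation_get,
    )
    from charmhelpers.contrib.openstack.context import OSContextGenerator

    class ExampleServiceContext(OSContextGenerator):
        interfaces = ['example-service']

        def __call__(self):
            for rid in relation_ids(self.interfaces[0]):
                self.related = True
                for unit in related_units(rid):
                    ctxt = {
                        'service_host': relation_get('host', rid=rid, unit=unit),
                        'service_password': relation_get('password', rid=rid,
                                                         unit=unit),
                    }
                    # Unlike the old module-level context_complete(), this
                    # records empty keys in self.missing_data and sets
                    # self.complete, which feeds the workload status helpers.
                    if self.context_complete(ctxt):
                        return ctxt
            return {}
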
740=== modified file 'hooks/charmhelpers/contrib/openstack/templating.py'
741--- hooks/charmhelpers/contrib/openstack/templating.py 2015-07-23 15:43:01 +0000
742+++ hooks/charmhelpers/contrib/openstack/templating.py 2015-09-25 16:13:14 +0000
743@@ -18,7 +18,7 @@
744
745 import six
746
747-from charmhelpers.fetch import apt_install
748+from charmhelpers.fetch import apt_install, apt_update
749 from charmhelpers.core.hookenv import (
750 log,
751 ERROR,
752@@ -29,6 +29,7 @@
753 try:
754 from jinja2 import FileSystemLoader, ChoiceLoader, Environment, exceptions
755 except ImportError:
756+ apt_update(fatal=True)
757 apt_install('python-jinja2', fatal=True)
758 from jinja2 import FileSystemLoader, ChoiceLoader, Environment, exceptions
759
760@@ -112,7 +113,7 @@
761
762 def complete_contexts(self):
763 '''
764- Return a list of interfaces that have atisfied contexts.
765+ Return a list of interfaces that have satisfied contexts.
766 '''
767 if self._complete_contexts:
768 return self._complete_contexts
769@@ -293,3 +294,30 @@
770 [interfaces.extend(i.complete_contexts())
771 for i in six.itervalues(self.templates)]
772 return interfaces
773+
774+ def get_incomplete_context_data(self, interfaces):
775+ '''
776+ Return dictionary of relation status of interfaces and any missing
777+ required context data. Example:
778+ {'amqp': {'missing_data': ['rabbitmq_password'], 'related': True},
779+ 'zeromq-configuration': {'related': False}}
780+ '''
781+ incomplete_context_data = {}
782+
783+ for i in six.itervalues(self.templates):
784+ for context in i.contexts:
785+ for interface in interfaces:
786+ related = False
787+ if interface in context.interfaces:
788+ related = context.get_related()
789+ missing_data = context.missing_data
790+ if missing_data:
791+ incomplete_context_data[interface] = {'missing_data': missing_data}
792+ if related:
793+ if incomplete_context_data.get(interface):
794+ incomplete_context_data[interface].update({'related': True})
795+ else:
796+ incomplete_context_data[interface] = {'related': True}
797+ else:
798+ incomplete_context_data[interface] = {'related': False}
799+ return incomplete_context_data
800
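
get_incomplete_context_data() is what the workload-status helpers further down consume. Queried directly against a renderer it might look like the following sketch, where the config file, registered context and interface names are only examples:

    from charmhelpers.contrib.openstack import context
    from charmhelpers.contrib.openstack.templating import OSConfigRenderer

    configs = OSConfigRenderer(templates_dir='templates/',
                               openstack_release='kilo')
    configs.register('/etc/neutron/neutron.conf', [context.AMQPContext()])

    configs.get_incomplete_context_data(['amqp', 'zeromq-configuration'])
    # e.g. {'amqp': {'related': True, 'missing_data': ['rabbitmq_password']},
    #       'zeromq-configuration': {'related': False}}
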
801=== modified file 'hooks/charmhelpers/contrib/openstack/utils.py'
802--- hooks/charmhelpers/contrib/openstack/utils.py 2015-09-14 20:33:41 +0000
803+++ hooks/charmhelpers/contrib/openstack/utils.py 2015-09-25 16:13:14 +0000
804@@ -42,7 +42,9 @@
805 charm_dir,
806 INFO,
807 relation_ids,
808- relation_set
809+ relation_set,
810+ status_set,
811+ hook_name
812 )
813
814 from charmhelpers.contrib.storage.linux.lvm import (
815@@ -752,47 +754,264 @@
816 return projects[key]
817
818 return None
819-
820-
821-def do_action_openstack_upgrade(package, upgrade_callback, configs):
822- """Perform action-managed OpenStack upgrade.
823-
824- Upgrades packages to the configured openstack-origin version and sets
825- the corresponding action status as a result.
826-
827- If the charm was installed from source we cannot upgrade it.
828- For backwards compatibility a config flag (action-managed-upgrade) must
829- be set for this code to run, otherwise a full service level upgrade will
830- fire on config-changed.
831-
832- @param package: package name for determining if upgrade available
833- @param upgrade_callback: function callback to charm's upgrade function
834- @param configs: templating object derived from OSConfigRenderer class
835-
836- @return: True if upgrade successful; False if upgrade failed or skipped
837- """
838- ret = False
839-
840- if git_install_requested():
841- action_set({'outcome': 'installed from source, skipped upgrade.'})
842- else:
843- if openstack_upgrade_available(package):
844- if config('action-managed-upgrade'):
845- juju_log('Upgrading OpenStack release')
846-
847- try:
848- upgrade_callback(configs=configs)
849- action_set({'outcome': 'success, upgrade completed.'})
850- ret = True
851- except:
852- action_set({'outcome': 'upgrade failed, see traceback.'})
853- action_set({'traceback': traceback.format_exc()})
854- action_fail('do_openstack_upgrade resulted in an '
855- 'unexpected error')
856- else:
857- action_set({'outcome': 'action-managed-upgrade config is '
858- 'False, skipped upgrade.'})
859- else:
860- action_set({'outcome': 'no upgrade available.'})
861-
862- return ret
863+<<<<<<< TREE
864+
865+
866+def do_action_openstack_upgrade(package, upgrade_callback, configs):
867+ """Perform action-managed OpenStack upgrade.
868+
869+ Upgrades packages to the configured openstack-origin version and sets
870+ the corresponding action status as a result.
871+
872+ If the charm was installed from source we cannot upgrade it.
873+ For backwards compatibility a config flag (action-managed-upgrade) must
874+ be set for this code to run, otherwise a full service level upgrade will
875+ fire on config-changed.
876+
877+ @param package: package name for determining if upgrade available
878+ @param upgrade_callback: function callback to charm's upgrade function
879+ @param configs: templating object derived from OSConfigRenderer class
880+
881+ @return: True if upgrade successful; False if upgrade failed or skipped
882+ """
883+ ret = False
884+
885+ if git_install_requested():
886+ action_set({'outcome': 'installed from source, skipped upgrade.'})
887+ else:
888+ if openstack_upgrade_available(package):
889+ if config('action-managed-upgrade'):
890+ juju_log('Upgrading OpenStack release')
891+
892+ try:
893+ upgrade_callback(configs=configs)
894+ action_set({'outcome': 'success, upgrade completed.'})
895+ ret = True
896+ except:
897+ action_set({'outcome': 'upgrade failed, see traceback.'})
898+ action_set({'traceback': traceback.format_exc()})
899+ action_fail('do_openstack_upgrade resulted in an '
900+ 'unexpected error')
901+ else:
902+ action_set({'outcome': 'action-managed-upgrade config is '
903+ 'False, skipped upgrade.'})
904+ else:
905+ action_set({'outcome': 'no upgrade available.'})
906+
907+ return ret
908+=======
909+
910+
911+def os_workload_status(configs, required_interfaces, charm_func=None):
912+ """
913+ Decorator to set workload status based on complete contexts
914+ """
915+ def wrap(f):
916+ @wraps(f)
917+ def wrapped_f(*args, **kwargs):
918+ # Run the original function first
919+ f(*args, **kwargs)
920+ # Set workload status now that contexts have been
921+ # acted on
922+ set_os_workload_status(configs, required_interfaces, charm_func)
923+ return wrapped_f
924+ return wrap
925+
926+
927+def set_os_workload_status(configs, required_interfaces, charm_func=None):
928+ """
929+ Set workload status based on complete contexts.
930+ status-set missing or incomplete contexts
931+ and juju-log details of missing required data.
932+ charm_func is a charm specific function to run checking
933+ for charm specific requirements such as a VIP setting.
934+ """
935+ incomplete_rel_data = incomplete_relation_data(configs, required_interfaces)
936+ state = 'active'
937+ missing_relations = []
938+ incomplete_relations = []
939+ message = None
940+ charm_state = None
941+ charm_message = None
942+
943+ for generic_interface in incomplete_rel_data.keys():
944+ related_interface = None
945+ missing_data = {}
946+ # Related or not?
947+ for interface in incomplete_rel_data[generic_interface]:
948+ if incomplete_rel_data[generic_interface][interface].get('related'):
949+ related_interface = interface
950+ missing_data = incomplete_rel_data[generic_interface][interface].get('missing_data')
951+ # No relation ID for the generic_interface
952+ if not related_interface:
953+ juju_log("{} relation is missing and must be related for "
954+ "functionality. ".format(generic_interface), 'WARN')
955+ state = 'blocked'
956+ if generic_interface not in missing_relations:
957+ missing_relations.append(generic_interface)
958+ else:
959+ # Relation ID exists but no related unit
960+ if not missing_data:
961+ # Edge case relation ID exists but departing
962+ if ('departed' in hook_name() or 'broken' in hook_name()) \
963+ and related_interface in hook_name():
964+ state = 'blocked'
965+ if generic_interface not in missing_relations:
966+ missing_relations.append(generic_interface)
967+ juju_log("{} relation's interface, {}, "
968+ "relationship is departed or broken "
969+ "and is required for functionality."
970+ "".format(generic_interface, related_interface), "WARN")
971+ # Normal case relation ID exists but no related unit
972+ # (joining)
973+ else:
974+ juju_log("{} relations's interface, {}, is related but has "
975+ "no units in the relation."
976+ "".format(generic_interface, related_interface), "INFO")
977+ # Related unit exists and data missing on the relation
978+ else:
979+ juju_log("{} relation's interface, {}, is related awaiting "
980+ "the following data from the relationship: {}. "
981+ "".format(generic_interface, related_interface,
982+ ", ".join(missing_data)), "INFO")
983+ if state != 'blocked':
984+ state = 'waiting'
985+ if generic_interface not in incomplete_relations \
986+ and generic_interface not in missing_relations:
987+ incomplete_relations.append(generic_interface)
988+
989+ if missing_relations:
990+ message = "Missing relations: {}".format(", ".join(missing_relations))
991+ if incomplete_relations:
992+ message += "; incomplete relations: {}" \
993+ "".format(", ".join(incomplete_relations))
994+ state = 'blocked'
995+ elif incomplete_relations:
996+ message = "Incomplete relations: {}" \
997+ "".format(", ".join(incomplete_relations))
998+ state = 'waiting'
999+
1000+ # Run charm specific checks
1001+ if charm_func:
1002+ charm_state, charm_message = charm_func(configs)
1003+ if charm_state != 'active' and charm_state != 'unknown':
1004+ state = workload_state_compare(state, charm_state)
1005+ if message:
1006+ message = "{} {}".format(message, charm_message)
1007+ else:
1008+ message = charm_message
1009+
1010+ # Set to active if all requirements have been met
1011+ if state == 'active':
1012+ message = "Unit is ready"
1013+ juju_log(message, "INFO")
1014+
1015+ status_set(state, message)
1016+
1017+
1018+def workload_state_compare(current_workload_state, workload_state):
1019+ """ Return highest priority of two states"""
1020+ hierarchy = {'unknown': -1,
1021+ 'active': 0,
1022+ 'maintenance': 1,
1023+ 'waiting': 2,
1024+ 'blocked': 3,
1025+ }
1026+
1027+ if hierarchy.get(workload_state) is None:
1028+ workload_state = 'unknown'
1029+ if hierarchy.get(current_workload_state) is None:
1030+ current_workload_state = 'unknown'
1031+
1032+ # Set workload_state based on hierarchy of statuses
1033+ if hierarchy.get(current_workload_state) > hierarchy.get(workload_state):
1034+ return current_workload_state
1035+ else:
1036+ return workload_state
1037+
1038+
1039+def incomplete_relation_data(configs, required_interfaces):
1040+ """
1041+ Check complete contexts against required_interfaces
1042+ Return dictionary of incomplete relation data.
1043+
1044+ configs is an OSConfigRenderer object with configs registered
1045+
1046+ required_interfaces is a dictionary of required general interfaces
1047+ with dictionary values of possible specific interfaces.
1048+ Example:
1049+ required_interfaces = {'database': ['shared-db', 'pgsql-db']}
1050+
1051+ The interface is said to be satisfied if any one of the interfaces in the
1052+ list has a complete context.
1053+
1054+ Return dictionary of incomplete or missing required contexts with relation
1055+ status of interfaces and any missing data points. Example:
1056+ {'message':
1057+ {'amqp': {'missing_data': ['rabbitmq_password'], 'related': True},
1058+ 'zeromq-configuration': {'related': False}},
1059+ 'identity':
1060+ {'identity-service': {'related': False}},
1061+ 'database':
1062+ {'pgsql-db': {'related': False},
1063+ 'shared-db': {'related': True}}}
1064+ """
1065+ complete_ctxts = configs.complete_contexts()
1066+ incomplete_relations = []
1067+ for svc_type in required_interfaces.keys():
1068+ # Avoid duplicates
1069+ found_ctxt = False
1070+ for interface in required_interfaces[svc_type]:
1071+ if interface in complete_ctxts:
1072+ found_ctxt = True
1073+ if not found_ctxt:
1074+ incomplete_relations.append(svc_type)
1075+ incomplete_context_data = {}
1076+ for i in incomplete_relations:
1077+ incomplete_context_data[i] = configs.get_incomplete_context_data(required_interfaces[i])
1078+ return incomplete_context_data
1079+
1080+
1081+def do_action_openstack_upgrade(package, upgrade_callback, configs):
1082+ """Perform action-managed OpenStack upgrade.
1083+
1084+ Upgrades packages to the configured openstack-origin version and sets
1085+ the corresponding action status as a result.
1086+
1087+ If the charm was installed from source we cannot upgrade it.
1088+ For backwards compatibility a config flag (action-managed-upgrade) must
1089+ be set for this code to run, otherwise a full service level upgrade will
1090+ fire on config-changed.
1091+
1092+ @param package: package name for determining if upgrade available
1093+ @param upgrade_callback: function callback to charm's upgrade function
1094+ @param configs: templating object derived from OSConfigRenderer class
1095+
1096+ @return: True if upgrade successful; False if upgrade failed or skipped
1097+ """
1098+ ret = False
1099+
1100+ if git_install_requested():
1101+ action_set({'outcome': 'installed from source, skipped upgrade.'})
1102+ else:
1103+ if openstack_upgrade_available(package):
1104+ if config('action-managed-upgrade'):
1105+ juju_log('Upgrading OpenStack release')
1106+
1107+ try:
1108+ upgrade_callback(configs=configs)
1109+ action_set({'outcome': 'success, upgrade completed.'})
1110+ ret = True
1111+ except:
1112+ action_set({'outcome': 'upgrade failed, see traceback.'})
1113+ action_set({'traceback': traceback.format_exc()})
1114+ action_fail('do_openstack_upgrade resulted in an '
1115+ 'unexpected error')
1116+ else:
1117+ action_set({'outcome': 'action-managed-upgrade config is '
1118+ 'False, skipped upgrade.'})
1119+ else:
1120+ action_set({'outcome': 'no upgrade available.'})
1121+
1122+ return ret
1123+>>>>>>> MERGE-SOURCE
1124
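
In charm code the merge-source side of this hunk is consumed through the os_workload_status decorator. A rough sketch of the intended wiring follows, where hooks, CONFIGS, the interface map and check_optional_relations are stand-ins rather than names taken from this charm:

    from charmhelpers.contrib.openstack.utils import os_workload_status

    required_interfaces = {
        'messaging': ['amqp', 'zeromq-configuration'],
        'neutron-plugin-api': ['neutron-plugin-api'],
    }

    @hooks.hook('config-changed')
    @os_workload_status(CONFIGS, required_interfaces,
                        charm_func=check_optional_relations)
    def config_changed():
        # Render configs first; the decorator then calls
        # set_os_workload_status() to report blocked/waiting/active.
        CONFIGS.write_all()

The optional charm_func must return a (state, message) pair, which set_os_workload_status() folds into the final status via workload_state_compare().
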
1125=== modified file 'hooks/charmhelpers/contrib/storage/linux/ceph.py'
1126--- hooks/charmhelpers/contrib/storage/linux/ceph.py 2015-07-16 20:18:08 +0000
1127+++ hooks/charmhelpers/contrib/storage/linux/ceph.py 2015-09-25 16:13:14 +0000
1128@@ -28,6 +28,7 @@
1129 import shutil
1130 import json
1131 import time
1132+import uuid
1133
1134 from subprocess import (
1135 check_call,
1136@@ -35,8 +36,10 @@
1137 CalledProcessError,
1138 )
1139 from charmhelpers.core.hookenv import (
1140+ local_unit,
1141 relation_get,
1142 relation_ids,
1143+ relation_set,
1144 related_units,
1145 log,
1146 DEBUG,
1147@@ -56,6 +59,8 @@
1148 apt_install,
1149 )
1150
1151+from charmhelpers.core.kernel import modprobe
1152+
1153 KEYRING = '/etc/ceph/ceph.client.{}.keyring'
1154 KEYFILE = '/etc/ceph/ceph.client.{}.key'
1155
1156@@ -288,17 +293,6 @@
1157 os.chown(data_src_dst, uid, gid)
1158
1159
1160-# TODO: re-use
1161-def modprobe(module):
1162- """Load a kernel module and configure for auto-load on reboot."""
1163- log('Loading kernel module', level=INFO)
1164- cmd = ['modprobe', module]
1165- check_call(cmd)
1166- with open('/etc/modules', 'r+') as modules:
1167- if module not in modules.read():
1168- modules.write(module)
1169-
1170-
1171 def copy_files(src, dst, symlinks=False, ignore=None):
1172 """Copy files from src to dst."""
1173 for item in os.listdir(src):
1174@@ -411,17 +405,52 @@
1175
1176 The API is versioned and defaults to version 1.
1177 """
1178- def __init__(self, api_version=1):
1179+ def __init__(self, api_version=1, request_id=None):
1180 self.api_version = api_version
1181+ if request_id:
1182+ self.request_id = request_id
1183+ else:
1184+ self.request_id = str(uuid.uuid1())
1185 self.ops = []
1186
1187 def add_op_create_pool(self, name, replica_count=3):
1188 self.ops.append({'op': 'create-pool', 'name': name,
1189 'replicas': replica_count})
1190
1191+ def set_ops(self, ops):
1192+ """Set request ops to provided value.
1193+
1194+ Useful for injecting ops that come from a previous request
1195+ to allow comparisons to ensure validity.
1196+ """
1197+ self.ops = ops
1198+
1199 @property
1200 def request(self):
1201- return json.dumps({'api-version': self.api_version, 'ops': self.ops})
1202+ return json.dumps({'api-version': self.api_version, 'ops': self.ops,
1203+ 'request-id': self.request_id})
1204+
1205+ def _ops_equal(self, other):
1206+ if len(self.ops) == len(other.ops):
1207+ for req_no in range(0, len(self.ops)):
1208+ for key in ['replicas', 'name', 'op']:
1209+ if self.ops[req_no][key] != other.ops[req_no][key]:
1210+ return False
1211+ else:
1212+ return False
1213+ return True
1214+
1215+ def __eq__(self, other):
1216+ if not isinstance(other, self.__class__):
1217+ return False
1218+ if self.api_version == other.api_version and \
1219+ self._ops_equal(other):
1220+ return True
1221+ else:
1222+ return False
1223+
1224+ def __ne__(self, other):
1225+ return not self.__eq__(other)
1226
1227
1228 class CephBrokerRsp(object):
1229@@ -431,14 +460,198 @@
1230
1231 The API is versioned and defaults to version 1.
1232 """
1233+
1234 def __init__(self, encoded_rsp):
1235 self.api_version = None
1236 self.rsp = json.loads(encoded_rsp)
1237
1238 @property
1239+ def request_id(self):
1240+ return self.rsp.get('request-id')
1241+
1242+ @property
1243 def exit_code(self):
1244 return self.rsp.get('exit-code')
1245
1246 @property
1247 def exit_msg(self):
1248 return self.rsp.get('stderr')
1249+
1250+
1251+# Ceph Broker Conversation:
1252+# If a charm needs an action to be taken by ceph it can create a CephBrokerRq
1253+# and send that request to ceph via the ceph relation. The CephBrokerRq has a
1254+# unique id so that the client can identify which CephBrokerRsp is associated
1255+# with the request. Ceph will also respond to each client unit individually
1256+# creating a response key per client unit eg glance/0 will get a CephBrokerRsp
1257+# via key broker-rsp-glance-0
1258+#
1259+# To use this the charm can just do something like:
1260+#
1261+# from charmhelpers.contrib.storage.linux.ceph import (
1262+# send_request_if_needed,
1263+# is_request_complete,
1264+# CephBrokerRq,
1265+# )
1266+#
1267+# @hooks.hook('ceph-relation-changed')
1268+# def ceph_changed():
1269+# rq = CephBrokerRq()
1270+# rq.add_op_create_pool(name='poolname', replica_count=3)
1271+#
1272+# if is_request_complete(rq):
1273+# <Request complete actions>
1274+# else:
1275+# send_request_if_needed(get_ceph_request())
1276+#
1277+# CephBrokerRq and CephBrokerRsp are serialized into JSON. Below is an example
1278+# of glance having sent a request to ceph which ceph has successfully processed
1279+# 'ceph:8': {
1280+# 'ceph/0': {
1281+# 'auth': 'cephx',
1282+# 'broker-rsp-glance-0': '{"request-id": "0bc7dc54", "exit-code": 0}',
1283+# 'broker_rsp': '{"request-id": "0da543b8", "exit-code": 0}',
1284+# 'ceph-public-address': '10.5.44.103',
1285+# 'key': 'AQCLDttVuHXINhAAvI144CB09dYchhHyTUY9BQ==',
1286+# 'private-address': '10.5.44.103',
1287+# },
1288+# 'glance/0': {
1289+# 'broker_req': ('{"api-version": 1, "request-id": "0bc7dc54", '
1290+# '"ops": [{"replicas": 3, "name": "glance", '
1291+# '"op": "create-pool"}]}'),
1292+# 'private-address': '10.5.44.109',
1293+# },
1294+# }
1295+
1296+def get_previous_request(rid):
1297+ """Return the last ceph broker request sent on a given relation
1298+
1299+ @param rid: Relation id to query for request
1300+ """
1301+ request = None
1302+ broker_req = relation_get(attribute='broker_req', rid=rid,
1303+ unit=local_unit())
1304+ if broker_req:
1305+ request_data = json.loads(broker_req)
1306+ request = CephBrokerRq(api_version=request_data['api-version'],
1307+ request_id=request_data['request-id'])
1308+ request.set_ops(request_data['ops'])
1309+
1310+ return request
1311+
1312+
1313+def get_request_states(request):
1314+ """Return a dict of requests per relation id with their corresponding
1315+ completion state.
1316+
1317+ This allows a charm, which has a request for ceph, to see whether there is
1318+ an equivalent request already being processed and if so what state that
1319+ request is in.
1320+
1321+ @param request: A CephBrokerRq object
1322+ """
1323+ complete = []
1324+ requests = {}
1325+ for rid in relation_ids('ceph'):
1326+ complete = False
1327+ previous_request = get_previous_request(rid)
1328+ if request == previous_request:
1329+ sent = True
1330+ complete = is_request_complete_for_rid(previous_request, rid)
1331+ else:
1332+ sent = False
1333+ complete = False
1334+
1335+ requests[rid] = {
1336+ 'sent': sent,
1337+ 'complete': complete,
1338+ }
1339+
1340+ return requests
1341+
1342+
1343+def is_request_sent(request):
1344+ """Check to see if a functionally equivalent request has already been sent
1345+
1346+ Returns True if a similar request has been sent
1347+
1348+ @param request: A CephBrokerRq object
1349+ """
1350+ states = get_request_states(request)
1351+ for rid in states.keys():
1352+ if not states[rid]['sent']:
1353+ return False
1354+
1355+ return True
1356+
1357+
1358+def is_request_complete(request):
1359+ """Check to see if a functionally equivalent request has already been
1360+ completed
1361+
1362+ Returns True if a similar request has been completed
1363+
1364+ @param request: A CephBrokerRq object
1365+ """
1366+ states = get_request_states(request)
1367+ for rid in states.keys():
1368+ if not states[rid]['complete']:
1369+ return False
1370+
1371+ return True
1372+
1373+
1374+def is_request_complete_for_rid(request, rid):
1375+ """Check if a given request has been completed on the given relation
1376+
1377+ @param request: A CephBrokerRq object
1378+ @param rid: Relation ID
1379+ """
1380+ broker_key = get_broker_rsp_key()
1381+ for unit in related_units(rid):
1382+ rdata = relation_get(rid=rid, unit=unit)
1383+ if rdata.get(broker_key):
1384+ rsp = CephBrokerRsp(rdata.get(broker_key))
1385+ if rsp.request_id == request.request_id:
1386+ if not rsp.exit_code:
1387+ return True
1388+ else:
1389+ # The remote unit sent no reply targeted at this unit so either the
1390+ # remote ceph cluster does not support unit targeted replies or it
1391+ # has not processed our request yet.
1392+ if rdata.get('broker_rsp'):
1393+ request_data = json.loads(rdata['broker_rsp'])
1394+ if request_data.get('request-id'):
1395+ log('Ignoring legacy broker_rsp without unit key as remote '
1396+ 'service supports unit specific replies', level=DEBUG)
1397+ else:
1398+ log('Using legacy broker_rsp as remote service does not '
1399+ 'support unit specific replies', level=DEBUG)
1400+ rsp = CephBrokerRsp(rdata['broker_rsp'])
1401+ if not rsp.exit_code:
1402+ return True
1403+
1404+ return False
1405+
1406+
1407+def get_broker_rsp_key():
1408+ """Return broker response key for this unit
1409+
1410+ This is the key that ceph is going to use to pass request status
1411+ information back to this unit
1412+ """
1413+ return 'broker-rsp-' + local_unit().replace('/', '-')
1414+
1415+
1416+def send_request_if_needed(request):
1417+ """Send broker request if an equivalent request has not already been sent
1418+
1419+ @param request: A CephBrokerRq object
1420+ """
1421+ if is_request_sent(request):
1422+ log('Request already sent but not complete, not sending new request',
1423+ level=DEBUG)
1424+ else:
1425+ for rid in relation_ids('ceph'):
1426+ log('Sending request {}'.format(request.request_id), level=DEBUG)
1427+ relation_set(relation_id=rid, broker_req=request.request)
1428
1429=== modified file 'hooks/charmhelpers/core/host.py'
1430--- hooks/charmhelpers/core/host.py 2015-08-20 09:17:04 +0000
1431+++ hooks/charmhelpers/core/host.py 2015-09-25 16:13:14 +0000
1432@@ -63,32 +63,48 @@
1433 return service_result
1434
1435
1436-def service_pause(service_name, init_dir=None):
1437+def service_pause(service_name, init_dir="/etc/init", initd_dir="/etc/init.d"):
1438 """Pause a system service.
1439
1440 Stop it, and prevent it from starting again at boot."""
1441- if init_dir is None:
1442- init_dir = "/etc/init"
1443 stopped = service_stop(service_name)
1444- # XXX: Support systemd too
1445- override_path = os.path.join(
1446- init_dir, '{}.override'.format(service_name))
1447- with open(override_path, 'w') as fh:
1448- fh.write("manual\n")
1449+ upstart_file = os.path.join(init_dir, "{}.conf".format(service_name))
1450+ sysv_file = os.path.join(initd_dir, service_name)
1451+ if os.path.exists(upstart_file):
1452+ override_path = os.path.join(
1453+ init_dir, '{}.override'.format(service_name))
1454+ with open(override_path, 'w') as fh:
1455+ fh.write("manual\n")
1456+ elif os.path.exists(sysv_file):
1457+ subprocess.check_call(["update-rc.d", service_name, "disable"])
1458+ else:
1459+ # XXX: Support SystemD too
1460+ raise ValueError(
1461+ "Unable to detect {0} as either Upstart {1} or SysV {2}".format(
1462+ service_name, upstart_file, sysv_file))
1463 return stopped
1464
1465
1466-def service_resume(service_name, init_dir=None):
1467+def service_resume(service_name, init_dir="/etc/init",
1468+ initd_dir="/etc/init.d"):
1469 """Resume a system service.
1470
1471 Reenable starting again at boot. Start the service"""
1472- # XXX: Support systemd too
1473- if init_dir is None:
1474- init_dir = "/etc/init"
1475- override_path = os.path.join(
1476- init_dir, '{}.override'.format(service_name))
1477- if os.path.exists(override_path):
1478- os.unlink(override_path)
1479+ upstart_file = os.path.join(init_dir, "{}.conf".format(service_name))
1480+ sysv_file = os.path.join(initd_dir, service_name)
1481+ if os.path.exists(upstart_file):
1482+ override_path = os.path.join(
1483+ init_dir, '{}.override'.format(service_name))
1484+ if os.path.exists(override_path):
1485+ os.unlink(override_path)
1486+ elif os.path.exists(sysv_file):
1487+ subprocess.check_call(["update-rc.d", service_name, "enable"])
1488+ else:
1489+ # XXX: Support SystemD too
1490+ raise ValueError(
1491+ "Unable to detect {0} as either Upstart {1} or SysV {2}".format(
1492+ service_name, upstart_file, sysv_file))
1493+
1494 started = service_start(service_name)
1495 return started
1496
1497
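
The service_pause/service_resume rework boils down to picking the right mechanism per init system. A short sketch of the behaviour, using neutron-dhcp-agent purely as an example service name:

    from charmhelpers.core.host import service_pause, service_resume

    # Upstart job present:  writes /etc/init/<name>.override containing "manual"
    # SysV script present:  runs "update-rc.d <name> disable"
    # Neither present:      raises ValueError (systemd remains unsupported)
    service_pause('neutron-dhcp-agent')

    # Mirrors the above: removes the override or runs "update-rc.d ... enable",
    # then starts the service again.
    service_resume('neutron-dhcp-agent')
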
1498=== modified file 'hooks/charmhelpers/core/hugepage.py'
1499--- hooks/charmhelpers/core/hugepage.py 2015-08-19 13:51:39 +0000
1500+++ hooks/charmhelpers/core/hugepage.py 2015-09-25 16:13:14 +0000
1501@@ -25,11 +25,13 @@
1502 fstab_mount,
1503 mkdir,
1504 )
1505+from charmhelpers.core.strutils import bytes_from_string
1506+from subprocess import check_output
1507
1508
1509 def hugepage_support(user, group='hugetlb', nr_hugepages=256,
1510 max_map_count=65536, mnt_point='/run/hugepages/kvm',
1511- pagesize='2MB', mount=True):
1512+ pagesize='2MB', mount=True, set_shmmax=False):
1513 """Enable hugepages on system.
1514
1515 Args:
1516@@ -49,6 +51,11 @@
1517 'vm.max_map_count': max_map_count,
1518 'vm.hugetlb_shm_group': gid,
1519 }
1520+ if set_shmmax:
1521+ shmmax_current = int(check_output(['sysctl', '-n', 'kernel.shmmax']))
1522+ shmmax_minsize = bytes_from_string(pagesize) * nr_hugepages
1523+ if shmmax_minsize > shmmax_current:
1524+ sysctl_settings['kernel.shmmax'] = shmmax_minsize
1525 sysctl.create(yaml.dump(sysctl_settings), '/etc/sysctl.d/10-hugepage.conf')
1526 mkdir(mnt_point, owner='root', group='root', perms=0o755, force=False)
1527 lfstab = fstab.Fstab()
1528
1529=== added file 'hooks/charmhelpers/core/kernel.py'
1530--- hooks/charmhelpers/core/kernel.py 1970-01-01 00:00:00 +0000
1531+++ hooks/charmhelpers/core/kernel.py 2015-09-25 16:13:14 +0000
1532@@ -0,0 +1,68 @@
1533+#!/usr/bin/env python
1534+# -*- coding: utf-8 -*-
1535+
1536+# Copyright 2014-2015 Canonical Limited.
1537+#
1538+# This file is part of charm-helpers.
1539+#
1540+# charm-helpers is free software: you can redistribute it and/or modify
1541+# it under the terms of the GNU Lesser General Public License version 3 as
1542+# published by the Free Software Foundation.
1543+#
1544+# charm-helpers is distributed in the hope that it will be useful,
1545+# but WITHOUT ANY WARRANTY; without even the implied warranty of
1546+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
1547+# GNU Lesser General Public License for more details.
1548+#
1549+# You should have received a copy of the GNU Lesser General Public License
1550+# along with charm-helpers. If not, see <http://www.gnu.org/licenses/>.
1551+
1552+__author__ = "Jorge Niedbalski <jorge.niedbalski@canonical.com>"
1553+
1554+from charmhelpers.core.hookenv import (
1555+ log,
1556+ INFO
1557+)
1558+
1559+from subprocess import check_call, check_output
1560+import re
1561+
1562+
1563+def modprobe(module, persist=True):
1564+ """Load a kernel module and configure for auto-load on reboot."""
1565+ cmd = ['modprobe', module]
1566+
1567+ log('Loading kernel module %s' % module, level=INFO)
1568+
1569+ check_call(cmd)
1570+ if persist:
1571+ with open('/etc/modules', 'r+') as modules:
1572+ if module not in modules.read():
1573+ modules.write(module)
1574+
1575+
1576+def rmmod(module, force=False):
1577+ """Remove a module from the linux kernel"""
1578+ cmd = ['rmmod']
1579+ if force:
1580+ cmd.append('-f')
1581+ cmd.append(module)
1582+ log('Removing kernel module %s' % module, level=INFO)
1583+ return check_call(cmd)
1584+
1585+
1586+def lsmod():
1587+ """Shows what kernel modules are currently loaded"""
1588+ return check_output(['lsmod'],
1589+ universal_newlines=True)
1590+
1591+
1592+def is_module_loaded(module):
1593+ """Checks if a kernel module is already loaded"""
1594+ matches = re.findall('^%s[ ]+' % module, lsmod(), re.M)
1595+ return len(matches) > 0
1596+
1597+
1598+def update_initramfs(version='all'):
1599+ """Updates an initramfs image"""
1600+ return check_call(["update-initramfs", "-k", version, "-u"])
1601
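A brief sketch of how the new kernel helpers might be used from a charm hook (illustrative only):

    from charmhelpers.core.kernel import (
        modprobe,
        is_module_loaded,
        update_initramfs,
    )

    if not is_module_loaded('8021q'):
        modprobe('8021q')     # loads now and appends to /etc/modules
        update_initramfs()    # runs: update-initramfs -k all -u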
1602=== modified file 'hooks/charmhelpers/core/strutils.py'
1603--- hooks/charmhelpers/core/strutils.py 2015-04-16 20:07:38 +0000
1604+++ hooks/charmhelpers/core/strutils.py 2015-09-25 16:13:14 +0000
1605@@ -18,6 +18,7 @@
1606 # along with charm-helpers. If not, see <http://www.gnu.org/licenses/>.
1607
1608 import six
1609+import re
1610
1611
1612 def bool_from_string(value):
1613@@ -40,3 +41,32 @@
1614
1615 msg = "Unable to interpret string value '%s' as boolean" % (value)
1616 raise ValueError(msg)
1617+
1618+
1619+def bytes_from_string(value):
1620+ """Interpret human readable string value as bytes.
1621+
1622+ Returns int
1623+ """
1624+ BYTE_POWER = {
1625+ 'K': 1,
1626+ 'KB': 1,
1627+ 'M': 2,
1628+ 'MB': 2,
1629+ 'G': 3,
1630+ 'GB': 3,
1631+ 'T': 4,
1632+ 'TB': 4,
1633+ 'P': 5,
1634+ 'PB': 5,
1635+ }
1636+ if isinstance(value, six.string_types):
1637+ value = six.text_type(value)
1638+ else:
1639+        msg = "Unable to interpret non-string value '%s' as bytes" % (value)
1640+ raise ValueError(msg)
1641+ matches = re.match("([0-9]+)([a-zA-Z]+)", value)
1642+ if not matches:
1643+ msg = "Unable to interpret string value '%s' as bytes" % (value)
1644+ raise ValueError(msg)
1645+ return int(matches.group(1)) * (1024 ** BYTE_POWER[matches.group(2)])
1646
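Expected behaviour of bytes_from_string, for reference (the values follow directly from the BYTE_POWER table above):

    from charmhelpers.core.strutils import bytes_from_string

    bytes_from_string('2MB')    # 2097152  (2 * 1024 ** 2)
    bytes_from_string('1G')     # 1073741824
    # bytes_from_string(42)     # ValueError: non-string input
    # bytes_from_string('1X')   # ValueError: unrecognised suffix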
1647=== modified file 'tests/charmhelpers/contrib/amulet/deployment.py'
1648--- tests/charmhelpers/contrib/amulet/deployment.py 2015-01-23 11:08:26 +0000
1649+++ tests/charmhelpers/contrib/amulet/deployment.py 2015-09-25 16:13:14 +0000
1650@@ -51,7 +51,8 @@
1651 if 'units' not in this_service:
1652 this_service['units'] = 1
1653
1654- self.d.add(this_service['name'], units=this_service['units'])
1655+ self.d.add(this_service['name'], units=this_service['units'],
1656+ constraints=this_service.get('constraints'))
1657
1658 for svc in other_services:
1659 if 'location' in svc:
1660@@ -64,7 +65,8 @@
1661 if 'units' not in svc:
1662 svc['units'] = 1
1663
1664- self.d.add(svc['name'], charm=branch_location, units=svc['units'])
1665+ self.d.add(svc['name'], charm=branch_location, units=svc['units'],
1666+ constraints=svc.get('constraints'))
1667
1668 def _add_relations(self, relations):
1669 """Add all of the relations for the services."""
1670
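The new optional 'constraints' key is passed straight through to deployment.add(); a hypothetical service dict from an amulet test could look like the following (the dict form of the constraints value is an assumption about what amulet accepts):

    this_service = {'name': 'neutron-gateway', 'units': 1,
                    'constraints': {'mem': '4G'}}
    other_services = [{'name': 'rabbitmq-server'},
                      {'name': 'ceph', 'units': 3,
                       'constraints': {'mem': '2G'}}]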
1671=== modified file 'tests/charmhelpers/contrib/amulet/utils.py'
1672--- tests/charmhelpers/contrib/amulet/utils.py 2015-08-18 21:16:23 +0000
1673+++ tests/charmhelpers/contrib/amulet/utils.py 2015-09-25 16:13:14 +0000
1674@@ -19,9 +19,11 @@
1675 import logging
1676 import os
1677 import re
1678+import socket
1679 import subprocess
1680 import sys
1681 import time
1682+import uuid
1683
1684 import amulet
1685 import distro_info
1686@@ -114,7 +116,7 @@
1687 # /!\ DEPRECATION WARNING (beisner):
1688 # New and existing tests should be rewritten to use
1689 # validate_services_by_name() as it is aware of init systems.
1690- self.log.warn('/!\\ DEPRECATION WARNING: use '
1691+ self.log.warn('DEPRECATION WARNING: use '
1692 'validate_services_by_name instead of validate_services '
1693 'due to init system differences.')
1694
1695@@ -269,33 +271,52 @@
1696 """Get last modification time of directory."""
1697 return sentry_unit.directory_stat(directory)['mtime']
1698
1699- def _get_proc_start_time(self, sentry_unit, service, pgrep_full=False):
1700- """Get process' start time.
1701-
1702- Determine start time of the process based on the last modification
1703- time of the /proc/pid directory. If pgrep_full is True, the process
1704- name is matched against the full command line.
1705- """
1706- if pgrep_full:
1707- cmd = 'pgrep -o -f {}'.format(service)
1708- else:
1709- cmd = 'pgrep -o {}'.format(service)
1710- cmd = cmd + ' | grep -v pgrep || exit 0'
1711- cmd_out = sentry_unit.run(cmd)
1712- self.log.debug('CMDout: ' + str(cmd_out))
1713- if cmd_out[0]:
1714- self.log.debug('Pid for %s %s' % (service, str(cmd_out[0])))
1715- proc_dir = '/proc/{}'.format(cmd_out[0].strip())
1716- return self._get_dir_mtime(sentry_unit, proc_dir)
1717+ def _get_proc_start_time(self, sentry_unit, service, pgrep_full=None):
1718+ """Get start time of a process based on the last modification time
1719+ of the /proc/pid directory.
1720+
1721+ :sentry_unit: The sentry unit to check for the service on
1722+ :service: service name to look for in process table
1723+ :pgrep_full: [Deprecated] Use full command line search mode with pgrep
1724+ :returns: epoch time of service process start
1728+ """
1729+ if pgrep_full is not None:
1730+ # /!\ DEPRECATION WARNING (beisner):
1731+ # No longer implemented, as pidof is now used instead of pgrep.
1732+ # https://bugs.launchpad.net/charm-helpers/+bug/1474030
1733+ self.log.warn('DEPRECATION WARNING: pgrep_full bool is no '
1734+ 'longer implemented re: lp 1474030.')
1735+
1736+ pid_list = self.get_process_id_list(sentry_unit, service)
1737+ pid = pid_list[0]
1738+ proc_dir = '/proc/{}'.format(pid)
1739+ self.log.debug('Pid for {} on {}: {}'.format(
1740+ service, sentry_unit.info['unit_name'], pid))
1741+
1742+ return self._get_dir_mtime(sentry_unit, proc_dir)
1743
1744 def service_restarted(self, sentry_unit, service, filename,
1745- pgrep_full=False, sleep_time=20):
1746+ pgrep_full=None, sleep_time=20):
1747 """Check if service was restarted.
1748
1749 Compare a service's start time vs a file's last modification time
1750 (such as a config file for that service) to determine if the service
1751 has been restarted.
1752 """
1753+ # /!\ DEPRECATION WARNING (beisner):
1754+ # This method is prone to races in that no before-time is known.
1755+ # Use validate_service_config_changed instead.
1756+
1757+ # NOTE(beisner) pgrep_full is no longer implemented, as pidof is now
1758+ # used instead of pgrep. pgrep_full is still passed through to ensure
1759+ # deprecation WARNS. lp1474030
1760+ self.log.warn('DEPRECATION WARNING: use '
1761+ 'validate_service_config_changed instead of '
1762+ 'service_restarted due to known races.')
1763+
1764 time.sleep(sleep_time)
1765 if (self._get_proc_start_time(sentry_unit, service, pgrep_full) >=
1766 self._get_file_mtime(sentry_unit, filename)):
1767@@ -304,78 +325,122 @@
1768 return False
1769
1770 def service_restarted_since(self, sentry_unit, mtime, service,
1771- pgrep_full=False, sleep_time=20,
1772- retry_count=2):
1773+ pgrep_full=None, sleep_time=20,
1774+ retry_count=30, retry_sleep_time=10):
1775         """Check if service was started after a given time.
1776
1777 Args:
1778 sentry_unit (sentry): The sentry unit to check for the service on
1779 mtime (float): The epoch time to check against
1780 service (string): service name to look for in process table
1781- pgrep_full (boolean): Use full command line search mode with pgrep
1782- sleep_time (int): Seconds to sleep before looking for process
1783- retry_count (int): If service is not found, how many times to retry
1784+ pgrep_full: [Deprecated] Use full command line search mode with pgrep
1785+            sleep_time (int): Initial sleep time (s) before looking for process
1786+            retry_sleep_time (int): Time (s) to sleep between retries
1787+            retry_count (int): If process is not found, how many times to retry
1788
1789 Returns:
1790 bool: True if service found and its start time it newer than mtime,
1791 False if service is older than mtime or if service was
1792 not found.
1793 """
1794- self.log.debug('Checking %s restarted since %s' % (service, mtime))
1795+ # NOTE(beisner) pgrep_full is no longer implemented, as pidof is now
1796+ # used instead of pgrep. pgrep_full is still passed through to ensure
1797+ # deprecation WARNS. lp1474030
1798+
1799+ unit_name = sentry_unit.info['unit_name']
1800+ self.log.debug('Checking that %s service restarted since %s on '
1801+ '%s' % (service, mtime, unit_name))
1802 time.sleep(sleep_time)
1803- proc_start_time = self._get_proc_start_time(sentry_unit, service,
1804- pgrep_full)
1805- while retry_count > 0 and not proc_start_time:
1806- self.log.debug('No pid file found for service %s, will retry %i '
1807- 'more times' % (service, retry_count))
1808- time.sleep(30)
1809- proc_start_time = self._get_proc_start_time(sentry_unit, service,
1810- pgrep_full)
1811- retry_count = retry_count - 1
1812+ proc_start_time = None
1813+ tries = 0
1814+ while tries <= retry_count and not proc_start_time:
1815+ try:
1816+ proc_start_time = self._get_proc_start_time(sentry_unit,
1817+ service,
1818+ pgrep_full)
1819+ self.log.debug('Attempt {} to get {} proc start time on {} '
1820+ 'OK'.format(tries, service, unit_name))
1821+ except IOError as e:
1822+ # NOTE(beisner) - race avoidance, proc may not exist yet.
1823+ # https://bugs.launchpad.net/charm-helpers/+bug/1474030
1824+ self.log.debug('Attempt {} to get {} proc start time on {} '
1825+ 'failed\n{}'.format(tries, service,
1826+ unit_name, e))
1827+ time.sleep(retry_sleep_time)
1828+ tries += 1
1829
1830 if not proc_start_time:
1831 self.log.warn('No proc start time found, assuming service did '
1832 'not start')
1833 return False
1834 if proc_start_time >= mtime:
1835- self.log.debug('proc start time is newer than provided mtime'
1836- '(%s >= %s)' % (proc_start_time, mtime))
1837+            self.log.debug('Proc start time is newer than provided mtime '
1838+ '(%s >= %s) on %s (OK)' % (proc_start_time,
1839+ mtime, unit_name))
1840 return True
1841 else:
1842- self.log.warn('proc start time (%s) is older than provided mtime '
1843- '(%s), service did not restart' % (proc_start_time,
1844- mtime))
1845+ self.log.warn('Proc start time (%s) is older than provided mtime '
1846+ '(%s) on %s, service did not '
1847+ 'restart' % (proc_start_time, mtime, unit_name))
1848 return False
1849
1850 def config_updated_since(self, sentry_unit, filename, mtime,
1851- sleep_time=20):
1852+ sleep_time=20, retry_count=30,
1853+ retry_sleep_time=10):
1854 """Check if file was modified after a given time.
1855
1856 Args:
1857 sentry_unit (sentry): The sentry unit to check the file mtime on
1858 filename (string): The file to check mtime of
1859 mtime (float): The epoch time to check against
1860- sleep_time (int): Seconds to sleep before looking for process
1861+ sleep_time (int): Initial sleep time (s) before looking for file
1862+ retry_sleep_time (int): Time (s) to sleep between retries
1863+ retry_count (int): If file is not found, how many times to retry
1864
1865 Returns:
1866 bool: True if file was modified more recently than mtime, False if
1867- file was modified before mtime,
1868+ file was modified before mtime, or if file not found.
1869 """
1870- self.log.debug('Checking %s updated since %s' % (filename, mtime))
1871+ unit_name = sentry_unit.info['unit_name']
1872+ self.log.debug('Checking that %s updated since %s on '
1873+ '%s' % (filename, mtime, unit_name))
1874 time.sleep(sleep_time)
1875- file_mtime = self._get_file_mtime(sentry_unit, filename)
1876+ file_mtime = None
1877+ tries = 0
1878+ while tries <= retry_count and not file_mtime:
1879+ try:
1880+ file_mtime = self._get_file_mtime(sentry_unit, filename)
1881+ self.log.debug('Attempt {} to get {} file mtime on {} '
1882+ 'OK'.format(tries, filename, unit_name))
1883+ except IOError as e:
1884+ # NOTE(beisner) - race avoidance, file may not exist yet.
1885+ # https://bugs.launchpad.net/charm-helpers/+bug/1474030
1886+ self.log.debug('Attempt {} to get {} file mtime on {} '
1887+ 'failed\n{}'.format(tries, filename,
1888+ unit_name, e))
1889+ time.sleep(retry_sleep_time)
1890+ tries += 1
1891+
1892+ if not file_mtime:
1893+ self.log.warn('Could not determine file mtime, assuming '
1894+ 'file does not exist')
1895+ return False
1896+
1897 if file_mtime >= mtime:
1898 self.log.debug('File mtime is newer than provided mtime '
1899- '(%s >= %s)' % (file_mtime, mtime))
1900+ '(%s >= %s) on %s (OK)' % (file_mtime,
1901+ mtime, unit_name))
1902 return True
1903 else:
1904- self.log.warn('File mtime %s is older than provided mtime %s'
1905- % (file_mtime, mtime))
1906+            self.log.warn('File mtime is older than provided mtime '
1907+                          '(%s < %s) on %s' % (file_mtime,
1908+ mtime, unit_name))
1909 return False
1910
1911 def validate_service_config_changed(self, sentry_unit, mtime, service,
1912- filename, pgrep_full=False,
1913- sleep_time=20, retry_count=2):
1914+ filename, pgrep_full=None,
1915+ sleep_time=20, retry_count=30,
1916+ retry_sleep_time=10):
1917 """Check service and file were updated after mtime
1918
1919 Args:
1920@@ -383,9 +448,10 @@
1921 mtime (float): The epoch time to check against
1922 service (string): service name to look for in process table
1923 filename (string): The file to check mtime of
1924- pgrep_full (boolean): Use full command line search mode with pgrep
1925- sleep_time (int): Seconds to sleep before looking for process
1926+ pgrep_full: [Deprecated] Use full command line search mode with pgrep
1927+ sleep_time (int): Initial sleep in seconds to pass to test helpers
1928 retry_count (int): If service is not found, how many times to retry
1929+ retry_sleep_time (int): Time in seconds to wait between retries
1930
1931 Typical Usage:
1932 u = OpenStackAmuletUtils(ERROR)
1933@@ -402,15 +468,27 @@
1934 mtime, False if service is older than mtime or if service was
1935 not found or if filename was modified before mtime.
1936 """
1937- self.log.debug('Checking %s restarted since %s' % (service, mtime))
1938- time.sleep(sleep_time)
1939- service_restart = self.service_restarted_since(sentry_unit, mtime,
1940- service,
1941- pgrep_full=pgrep_full,
1942- sleep_time=0,
1943- retry_count=retry_count)
1944- config_update = self.config_updated_since(sentry_unit, filename, mtime,
1945- sleep_time=0)
1946+
1947+ # NOTE(beisner) pgrep_full is no longer implemented, as pidof is now
1948+ # used instead of pgrep. pgrep_full is still passed through to ensure
1949+ # deprecation WARNS. lp1474030
1950+
1951+ service_restart = self.service_restarted_since(
1952+ sentry_unit, mtime,
1953+ service,
1954+ pgrep_full=pgrep_full,
1955+ sleep_time=sleep_time,
1956+ retry_count=retry_count,
1957+ retry_sleep_time=retry_sleep_time)
1958+
1959+ config_update = self.config_updated_since(
1960+ sentry_unit,
1961+ filename,
1962+ mtime,
1963+ sleep_time=sleep_time,
1964+ retry_count=retry_count,
1965+ retry_sleep_time=retry_sleep_time)
1966+
1967 return service_restart and config_update
1968
1969 def get_sentry_time(self, sentry_unit):
1970@@ -428,7 +506,6 @@
1971 """Return a list of all Ubuntu releases in order of release."""
1972 _d = distro_info.UbuntuDistroInfo()
1973 _release_list = _d.all
1974- self.log.debug('Ubuntu release list: {}'.format(_release_list))
1975 return _release_list
1976
1977 def file_to_url(self, file_rel_path):
1978@@ -568,6 +645,142 @@
1979
1980 return None
1981
1982+ def validate_sectionless_conf(self, file_contents, expected):
1983+ """A crude conf parser. Useful to inspect configuration files which
1984+        do not have section headers (as would be necessary in order to use
1985+        the configparser), such as openstack-dashboard or rabbitmq confs."""
1986+ for line in file_contents.split('\n'):
1987+ if '=' in line:
1988+ args = line.split('=')
1989+ if len(args) <= 1:
1990+ continue
1991+ key = args[0].strip()
1992+ value = args[1].strip()
1993+ if key in expected.keys():
1994+ if expected[key] != value:
1995+ msg = ('Config mismatch. Expected, actual: {}, '
1996+ '{}'.format(expected[key], value))
1997+ amulet.raise_status(amulet.FAIL, msg=msg)
1998+
1999+ def get_unit_hostnames(self, units):
2000+ """Return a dict of juju unit names to hostnames."""
2001+ host_names = {}
2002+ for unit in units:
2003+ host_names[unit.info['unit_name']] = \
2004+ str(unit.file_contents('/etc/hostname').strip())
2005+ self.log.debug('Unit host names: {}'.format(host_names))
2006+ return host_names
2007+
2008+ def run_cmd_unit(self, sentry_unit, cmd):
2009+ """Run a command on a unit, return the output and exit code."""
2010+ output, code = sentry_unit.run(cmd)
2011+ if code == 0:
2012+ self.log.debug('{} `{}` command returned {} '
2013+ '(OK)'.format(sentry_unit.info['unit_name'],
2014+ cmd, code))
2015+ else:
2016+ msg = ('{} `{}` command returned {} '
2017+ '{}'.format(sentry_unit.info['unit_name'],
2018+ cmd, code, output))
2019+ amulet.raise_status(amulet.FAIL, msg=msg)
2020+ return str(output), code
2021+
2022+ def file_exists_on_unit(self, sentry_unit, file_name):
2023+ """Check if a file exists on a unit."""
2024+ try:
2025+ sentry_unit.file_stat(file_name)
2026+ return True
2027+ except IOError:
2028+ return False
2029+ except Exception as e:
2030+ msg = 'Error checking file {}: {}'.format(file_name, e)
2031+ amulet.raise_status(amulet.FAIL, msg=msg)
2032+
2033+ def file_contents_safe(self, sentry_unit, file_name,
2034+ max_wait=60, fatal=False):
2035+ """Get file contents from a sentry unit. Wrap amulet file_contents
2036+ with retry logic to address races where a file checks as existing,
2037+ but no longer exists by the time file_contents is called.
2038+ Return None if file not found. Optionally raise if fatal is True."""
2039+ unit_name = sentry_unit.info['unit_name']
2040+ file_contents = False
2041+ tries = 0
2042+ while not file_contents and tries < (max_wait / 4):
2043+ try:
2044+ file_contents = sentry_unit.file_contents(file_name)
2045+ except IOError:
2046+ self.log.debug('Attempt {} to open file {} from {} '
2047+ 'failed'.format(tries, file_name,
2048+ unit_name))
2049+ time.sleep(4)
2050+ tries += 1
2051+
2052+ if file_contents:
2053+ return file_contents
2054+ elif not fatal:
2055+ return None
2056+ elif fatal:
2057+ msg = 'Failed to get file contents from unit.'
2058+ amulet.raise_status(amulet.FAIL, msg)
2059+
2060+ def port_knock_tcp(self, host="localhost", port=22, timeout=15):
2061+        """Open a TCP socket to check for a listening service on a host.
2062+
2063+ :param host: host name or IP address, default to localhost
2064+ :param port: TCP port number, default to 22
2065+ :param timeout: Connect timeout, default to 15 seconds
2066+ :returns: True if successful, False if connect failed
2067+ """
2068+
2069+ # Resolve host name if possible
2070+ try:
2071+ connect_host = socket.gethostbyname(host)
2072+ host_human = "{} ({})".format(connect_host, host)
2073+ except socket.error as e:
2074+ self.log.warn('Unable to resolve address: '
2075+ '{} ({}) Trying anyway!'.format(host, e))
2076+ connect_host = host
2077+ host_human = connect_host
2078+
2079+ # Attempt socket connection
2080+ try:
2081+ knock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
2082+ knock.settimeout(timeout)
2083+ knock.connect((connect_host, port))
2084+ knock.close()
2085+ self.log.debug('Socket connect OK for host '
2086+ '{} on port {}.'.format(host_human, port))
2087+ return True
2088+ except socket.error as e:
2089+ self.log.debug('Socket connect FAIL for'
2090+ ' {} port {} ({})'.format(host_human, port, e))
2091+ return False
2092+
2093+ def port_knock_units(self, sentry_units, port=22,
2094+ timeout=15, expect_success=True):
2095+        """Open a TCP socket to check for a listening service on each
2096+ listed juju unit.
2097+
2098+ :param sentry_units: list of sentry unit pointers
2099+ :param port: TCP port number, default to 22
2100+ :param timeout: Connect timeout, default to 15 seconds
2101+        :param expect_success: True by default, set False to invert logic
2102+ :returns: None if successful, Failure message otherwise
2103+ """
2104+ for unit in sentry_units:
2105+ host = unit.info['public-address']
2106+ connected = self.port_knock_tcp(host, port, timeout)
2107+ if not connected and expect_success:
2108+ return 'Socket connect failed.'
2109+ elif connected and not expect_success:
2110+ return 'Socket connected unexpectedly.'
2111+
2112+ def get_uuid_epoch_stamp(self):
2113+ """Returns a stamp string based on uuid4 and epoch time. Useful in
2114+ generating test messages which need to be unique-ish."""
2115+ return '[{}-{}]'.format(uuid.uuid4(), time.time())
2116+
2117+# amulet juju action helpers:
2118 def run_action(self, unit_sentry, action,
2119 _check_output=subprocess.check_output):
2120 """Run the named action on a given unit sentry.
2121@@ -594,3 +807,12 @@
2122 output = _check_output(command, universal_newlines=True)
2123 data = json.loads(output)
2124 return data.get(u"status") == "completed"
2125+
2126+ def status_get(self, unit):
2127+ """Return the current service status of this unit."""
2128+ raw_status, return_code = unit.run(
2129+ "status-get --format=json --include-data")
2130+ if return_code != 0:
2131+ return ("unknown", "")
2132+ status = json.loads(raw_status)
2133+ return (status["status"], status["message"])
2134
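An illustrative test fragment exercising a couple of the helpers added above (u, sentry_unit and sentry_units are hypothetical: an AmuletUtils instance, one sentry unit and a list of them):

    state, message = u.status_get(sentry_unit)
    assert state in ('active', 'unknown'), message

    # Returns None on success, otherwise an error message string.
    error = u.port_knock_units(sentry_units, port=22)
    assert error is None, error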
2135=== modified file 'tests/charmhelpers/contrib/openstack/amulet/deployment.py'
2136--- tests/charmhelpers/contrib/openstack/amulet/deployment.py 2015-08-18 21:16:23 +0000
2137+++ tests/charmhelpers/contrib/openstack/amulet/deployment.py 2015-09-25 16:13:14 +0000
2138@@ -44,20 +44,31 @@
2139 Determine if the local branch being tested is derived from its
2140         stable or next (dev) branch, and based on this, use the corresponding
2141 stable or next branches for the other_services."""
2142+
2143+ # Charms outside the lp:~openstack-charmers namespace
2144 base_charms = ['mysql', 'mongodb', 'nrpe']
2145
2146+ # Force these charms to current series even when using an older series.
2147+ # ie. Use trusty/nrpe even when series is precise, as the P charm
2148+ # does not possess the necessary external master config and hooks.
2149+ force_series_current = ['nrpe']
2150+
2151 if self.series in ['precise', 'trusty']:
2152 base_series = self.series
2153 else:
2154 base_series = self.current_next
2155
2156- if self.stable:
2157- for svc in other_services:
2158+ for svc in other_services:
2159+ if svc['name'] in force_series_current:
2160+ base_series = self.current_next
2161+ # If a location has been explicitly set, use it
2162+ if svc.get('location'):
2163+ continue
2164+ if self.stable:
2165 temp = 'lp:charms/{}/{}'
2166 svc['location'] = temp.format(base_series,
2167 svc['name'])
2168- else:
2169- for svc in other_services:
2170+ else:
2171 if svc['name'] in base_charms:
2172 temp = 'lp:charms/{}/{}'
2173 svc['location'] = temp.format(base_series,
2174@@ -66,6 +77,7 @@
2175 temp = 'lp:~openstack-charmers/charms/{}/{}/next'
2176 svc['location'] = temp.format(self.current_next,
2177 svc['name'])
2178+
2179 return other_services
2180
2181 def _add_services(self, this_service, other_services):
2182@@ -77,21 +89,23 @@
2183
2184 services = other_services
2185 services.append(this_service)
2186+
2187+ # Charms which should use the source config option
2188 use_source = ['mysql', 'mongodb', 'rabbitmq-server', 'ceph',
2189 'ceph-osd', 'ceph-radosgw']
2190- # Most OpenStack subordinate charms do not expose an origin option
2191- # as that is controlled by the principle.
2192- ignore = ['cinder-ceph', 'hacluster', 'neutron-openvswitch', 'nrpe']
2193+
2194+ # Charms which can not use openstack-origin, ie. many subordinates
2195+ no_origin = ['cinder-ceph', 'hacluster', 'neutron-openvswitch', 'nrpe']
2196
2197 if self.openstack:
2198 for svc in services:
2199- if svc['name'] not in use_source + ignore:
2200+ if svc['name'] not in use_source + no_origin:
2201 config = {'openstack-origin': self.openstack}
2202 self.d.configure(svc['name'], config)
2203
2204 if self.source:
2205 for svc in services:
2206- if svc['name'] in use_source and svc['name'] not in ignore:
2207+ if svc['name'] in use_source and svc['name'] not in no_origin:
2208 config = {'source': self.source}
2209 self.d.configure(svc['name'], config)
2210
2211
2212=== modified file 'tests/charmhelpers/contrib/openstack/amulet/utils.py'
2213--- tests/charmhelpers/contrib/openstack/amulet/utils.py 2015-07-16 20:18:08 +0000
2214+++ tests/charmhelpers/contrib/openstack/amulet/utils.py 2015-09-25 16:13:14 +0000
2215@@ -27,6 +27,7 @@
2216 import heatclient.v1.client as heat_client
2217 import keystoneclient.v2_0 as keystone_client
2218 import novaclient.v1_1.client as nova_client
2219+import pika
2220 import swiftclient
2221
2222 from charmhelpers.contrib.amulet.utils import (
2223@@ -602,3 +603,361 @@
2224 self.log.debug('Ceph {} samples (OK): '
2225 '{}'.format(sample_type, samples))
2226 return None
2227+
2228+# rabbitmq/amqp specific helpers:
2229+ def add_rmq_test_user(self, sentry_units,
2230+ username="testuser1", password="changeme"):
2231+ """Add a test user via the first rmq juju unit, check connection as
2232+ the new user against all sentry units.
2233+
2234+ :param sentry_units: list of sentry unit pointers
2235+ :param username: amqp user name, default to testuser1
2236+ :param password: amqp user password
2237+ :returns: None if successful. Raise on error.
2238+ """
2239+ self.log.debug('Adding rmq user ({})...'.format(username))
2240+
2241+ # Check that user does not already exist
2242+ cmd_user_list = 'rabbitmqctl list_users'
2243+ output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_list)
2244+ if username in output:
2245+ self.log.warning('User ({}) already exists, returning '
2246+ 'gracefully.'.format(username))
2247+ return
2248+
2249+ perms = '".*" ".*" ".*"'
2250+ cmds = ['rabbitmqctl add_user {} {}'.format(username, password),
2251+ 'rabbitmqctl set_permissions {} {}'.format(username, perms)]
2252+
2253+ # Add user via first unit
2254+ for cmd in cmds:
2255+ output, _ = self.run_cmd_unit(sentry_units[0], cmd)
2256+
2257+ # Check connection against the other sentry_units
2258+ self.log.debug('Checking user connect against units...')
2259+ for sentry_unit in sentry_units:
2260+ connection = self.connect_amqp_by_unit(sentry_unit, ssl=False,
2261+ username=username,
2262+ password=password)
2263+ connection.close()
2264+
2265+ def delete_rmq_test_user(self, sentry_units, username="testuser1"):
2266+ """Delete a rabbitmq user via the first rmq juju unit.
2267+
2268+ :param sentry_units: list of sentry unit pointers
2269+ :param username: amqp user name, default to testuser1
2271+ :returns: None if successful or no such user.
2272+ """
2273+ self.log.debug('Deleting rmq user ({})...'.format(username))
2274+
2275+ # Check that the user exists
2276+ cmd_user_list = 'rabbitmqctl list_users'
2277+ output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_list)
2278+
2279+ if username not in output:
2280+ self.log.warning('User ({}) does not exist, returning '
2281+ 'gracefully.'.format(username))
2282+ return
2283+
2284+ # Delete the user
2285+ cmd_user_del = 'rabbitmqctl delete_user {}'.format(username)
2286+ output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_del)
2287+
2288+ def get_rmq_cluster_status(self, sentry_unit):
2289+ """Execute rabbitmq cluster status command on a unit and return
2290+ the full output.
2291+
2292+        :param sentry_unit: sentry unit
2293+ :returns: String containing console output of cluster status command
2294+ """
2295+ cmd = 'rabbitmqctl cluster_status'
2296+ output, _ = self.run_cmd_unit(sentry_unit, cmd)
2297+ self.log.debug('{} cluster_status:\n{}'.format(
2298+ sentry_unit.info['unit_name'], output))
2299+ return str(output)
2300+
2301+ def get_rmq_cluster_running_nodes(self, sentry_unit):
2302+ """Parse rabbitmqctl cluster_status output string, return list of
2303+ running rabbitmq cluster nodes.
2304+
2305+        :param sentry_unit: sentry unit
2306+ :returns: List containing node names of running nodes
2307+ """
2308+ # NOTE(beisner): rabbitmqctl cluster_status output is not
2309+ # json-parsable, do string chop foo, then json.loads that.
2310+ str_stat = self.get_rmq_cluster_status(sentry_unit)
2311+ if 'running_nodes' in str_stat:
2312+ pos_start = str_stat.find("{running_nodes,") + 15
2313+ pos_end = str_stat.find("]},", pos_start) + 1
2314+ str_run_nodes = str_stat[pos_start:pos_end].replace("'", '"')
2315+ run_nodes = json.loads(str_run_nodes)
2316+ return run_nodes
2317+ else:
2318+ return []
2319+
2320+ def validate_rmq_cluster_running_nodes(self, sentry_units):
2321+ """Check that all rmq unit hostnames are represented in the
2322+ cluster_status output of all units.
2323+
2324+        :param sentry_units: list of sentry unit pointers (all rmq units)
2326+ :returns: None if successful, otherwise return error message
2327+ """
2328+ host_names = self.get_unit_hostnames(sentry_units)
2329+ errors = []
2330+
2331+ # Query every unit for cluster_status running nodes
2332+ for query_unit in sentry_units:
2333+ query_unit_name = query_unit.info['unit_name']
2334+ running_nodes = self.get_rmq_cluster_running_nodes(query_unit)
2335+
2336+ # Confirm that every unit is represented in the queried unit's
2337+ # cluster_status running nodes output.
2338+ for validate_unit in sentry_units:
2339+ val_host_name = host_names[validate_unit.info['unit_name']]
2340+ val_node_name = 'rabbit@{}'.format(val_host_name)
2341+
2342+ if val_node_name not in running_nodes:
2343+ errors.append('Cluster member check failed on {}: {} not '
2344+ 'in {}\n'.format(query_unit_name,
2345+ val_node_name,
2346+ running_nodes))
2347+ if errors:
2348+ return ''.join(errors)
2349+
2350+ def rmq_ssl_is_enabled_on_unit(self, sentry_unit, port=None):
2351+ """Check a single juju rmq unit for ssl and port in the config file."""
2352+ host = sentry_unit.info['public-address']
2353+ unit_name = sentry_unit.info['unit_name']
2354+
2355+ conf_file = '/etc/rabbitmq/rabbitmq.config'
2356+ conf_contents = str(self.file_contents_safe(sentry_unit,
2357+ conf_file, max_wait=16))
2358+ # Checks
2359+ conf_ssl = 'ssl' in conf_contents
2360+ conf_port = str(port) in conf_contents
2361+
2362+ # Port explicitly checked in config
2363+ if port and conf_port and conf_ssl:
2364+ self.log.debug('SSL is enabled @{}:{} '
2365+ '({})'.format(host, port, unit_name))
2366+ return True
2367+ elif port and not conf_port and conf_ssl:
2368+ self.log.debug('SSL is enabled @{} but not on port {} '
2369+ '({})'.format(host, port, unit_name))
2370+ return False
2371+ # Port not checked (useful when checking that ssl is disabled)
2372+ elif not port and conf_ssl:
2373+ self.log.debug('SSL is enabled @{}:{} '
2374+ '({})'.format(host, port, unit_name))
2375+ return True
2376+ elif not port and not conf_ssl:
2377+ self.log.debug('SSL not enabled @{}:{} '
2378+ '({})'.format(host, port, unit_name))
2379+ return False
2380+ else:
2381+ msg = ('Unknown condition when checking SSL status @{}:{} '
2382+ '({})'.format(host, port, unit_name))
2383+ amulet.raise_status(amulet.FAIL, msg)
2384+
2385+ def validate_rmq_ssl_enabled_units(self, sentry_units, port=None):
2386+ """Check that ssl is enabled on rmq juju sentry units.
2387+
2388+ :param sentry_units: list of all rmq sentry units
2389+ :param port: optional ssl port override to validate
2390+ :returns: None if successful, otherwise return error message
2391+ """
2392+ for sentry_unit in sentry_units:
2393+ if not self.rmq_ssl_is_enabled_on_unit(sentry_unit, port=port):
2394+ return ('Unexpected condition: ssl is disabled on unit '
2395+ '({})'.format(sentry_unit.info['unit_name']))
2396+ return None
2397+
2398+ def validate_rmq_ssl_disabled_units(self, sentry_units):
2399+        """Check that ssl is disabled on listed rmq juju sentry units.
2400+
2401+        :param sentry_units: list of all rmq sentry units
2402+        :returns: None if successful, otherwise return error message
2403+ """
2404+ for sentry_unit in sentry_units:
2405+ if self.rmq_ssl_is_enabled_on_unit(sentry_unit):
2406+ return ('Unexpected condition: ssl is enabled on unit '
2407+ '({})'.format(sentry_unit.info['unit_name']))
2408+ return None
2409+
2410+ def configure_rmq_ssl_on(self, sentry_units, deployment,
2411+ port=None, max_wait=60):
2412+ """Turn ssl charm config option on, with optional non-default
2413+ ssl port specification. Confirm that it is enabled on every
2414+ unit.
2415+
2416+ :param sentry_units: list of sentry units
2417+ :param deployment: amulet deployment object pointer
2418+ :param port: amqp port, use defaults if None
2419+ :param max_wait: maximum time to wait in seconds to confirm
2420+ :returns: None if successful. Raise on error.
2421+ """
2422+ self.log.debug('Setting ssl charm config option: on')
2423+
2424+ # Enable RMQ SSL
2425+ config = {'ssl': 'on'}
2426+ if port:
2427+ config['ssl_port'] = port
2428+
2429+ deployment.configure('rabbitmq-server', config)
2430+
2431+ # Confirm
2432+ tries = 0
2433+ ret = self.validate_rmq_ssl_enabled_units(sentry_units, port=port)
2434+ while ret and tries < (max_wait / 4):
2435+ time.sleep(4)
2436+ self.log.debug('Attempt {}: {}'.format(tries, ret))
2437+ ret = self.validate_rmq_ssl_enabled_units(sentry_units, port=port)
2438+ tries += 1
2439+
2440+ if ret:
2441+ amulet.raise_status(amulet.FAIL, ret)
2442+
2443+ def configure_rmq_ssl_off(self, sentry_units, deployment, max_wait=60):
2444+ """Turn ssl charm config option off, confirm that it is disabled
2445+ on every unit.
2446+
2447+ :param sentry_units: list of sentry units
2448+ :param deployment: amulet deployment object pointer
2449+ :param max_wait: maximum time to wait in seconds to confirm
2450+ :returns: None if successful. Raise on error.
2451+ """
2452+ self.log.debug('Setting ssl charm config option: off')
2453+
2454+ # Disable RMQ SSL
2455+ config = {'ssl': 'off'}
2456+ deployment.configure('rabbitmq-server', config)
2457+
2458+ # Confirm
2459+ tries = 0
2460+ ret = self.validate_rmq_ssl_disabled_units(sentry_units)
2461+ while ret and tries < (max_wait / 4):
2462+ time.sleep(4)
2463+ self.log.debug('Attempt {}: {}'.format(tries, ret))
2464+ ret = self.validate_rmq_ssl_disabled_units(sentry_units)
2465+ tries += 1
2466+
2467+ if ret:
2468+ amulet.raise_status(amulet.FAIL, ret)
2469+
2470+ def connect_amqp_by_unit(self, sentry_unit, ssl=False,
2471+ port=None, fatal=True,
2472+ username="testuser1", password="changeme"):
2473+ """Establish and return a pika amqp connection to the rabbitmq service
2474+ running on a rmq juju unit.
2475+
2476+ :param sentry_unit: sentry unit pointer
2477+ :param ssl: boolean, default to False
2478+ :param port: amqp port, use defaults if None
2479+ :param fatal: boolean, default to True (raises on connect error)
2480+ :param username: amqp user name, default to testuser1
2481+ :param password: amqp user password
2482+ :returns: pika amqp connection pointer or None if failed and non-fatal
2483+ """
2484+ host = sentry_unit.info['public-address']
2485+ unit_name = sentry_unit.info['unit_name']
2486+
2487+ # Default port logic if port is not specified
2488+ if ssl and not port:
2489+ port = 5671
2490+ elif not ssl and not port:
2491+ port = 5672
2492+
2493+ self.log.debug('Connecting to amqp on {}:{} ({}) as '
2494+ '{}...'.format(host, port, unit_name, username))
2495+
2496+ try:
2497+ credentials = pika.PlainCredentials(username, password)
2498+ parameters = pika.ConnectionParameters(host=host, port=port,
2499+ credentials=credentials,
2500+ ssl=ssl,
2501+ connection_attempts=3,
2502+ retry_delay=5,
2503+ socket_timeout=1)
2504+ connection = pika.BlockingConnection(parameters)
2505+ assert connection.server_properties['product'] == 'RabbitMQ'
2506+ self.log.debug('Connect OK')
2507+ return connection
2508+ except Exception as e:
2509+ msg = ('amqp connection failed to {}:{} as '
2510+ '{} ({})'.format(host, port, username, str(e)))
2511+ if fatal:
2512+ amulet.raise_status(amulet.FAIL, msg)
2513+ else:
2514+ self.log.warn(msg)
2515+ return None
2516+
2517+ def publish_amqp_message_by_unit(self, sentry_unit, message,
2518+ queue="test", ssl=False,
2519+ username="testuser1",
2520+ password="changeme",
2521+ port=None):
2522+ """Publish an amqp message to a rmq juju unit.
2523+
2524+ :param sentry_unit: sentry unit pointer
2525+ :param message: amqp message string
2526+ :param queue: message queue, default to test
2527+ :param username: amqp user name, default to testuser1
2528+ :param password: amqp user password
2529+ :param ssl: boolean, default to False
2530+ :param port: amqp port, use defaults if None
2531+ :returns: None. Raises exception if publish failed.
2532+ """
2533+ self.log.debug('Publishing message to {} queue:\n{}'.format(queue,
2534+ message))
2535+ connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl,
2536+ port=port,
2537+ username=username,
2538+ password=password)
2539+
2540+ # NOTE(beisner): extra debug here re: pika hang potential:
2541+ # https://github.com/pika/pika/issues/297
2542+ # https://groups.google.com/forum/#!topic/rabbitmq-users/Ja0iyfF0Szw
2543+ self.log.debug('Defining channel...')
2544+ channel = connection.channel()
2545+ self.log.debug('Declaring queue...')
2546+ channel.queue_declare(queue=queue, auto_delete=False, durable=True)
2547+ self.log.debug('Publishing message...')
2548+ channel.basic_publish(exchange='', routing_key=queue, body=message)
2549+ self.log.debug('Closing channel...')
2550+ channel.close()
2551+ self.log.debug('Closing connection...')
2552+ connection.close()
2553+
2554+ def get_amqp_message_by_unit(self, sentry_unit, queue="test",
2555+ username="testuser1",
2556+ password="changeme",
2557+ ssl=False, port=None):
2558+ """Get an amqp message from a rmq juju unit.
2559+
2560+ :param sentry_unit: sentry unit pointer
2561+ :param queue: message queue, default to test
2562+ :param username: amqp user name, default to testuser1
2563+ :param password: amqp user password
2564+ :param ssl: boolean, default to False
2565+ :param port: amqp port, use defaults if None
2566+ :returns: amqp message body as string. Raise if get fails.
2567+ """
2568+ connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl,
2569+ port=port,
2570+ username=username,
2571+ password=password)
2572+ channel = connection.channel()
2573+ method_frame, _, body = channel.basic_get(queue)
2574+
2575+ if method_frame:
2576+            self.log.debug('Retrieved message from {} queue:\n{}'.format(queue,
2577+ body))
2578+ channel.basic_ack(method_frame.delivery_tag)
2579+ channel.close()
2580+ connection.close()
2581+ return body
2582+ else:
2583+ msg = 'No message retrieved.'
2584+ amulet.raise_status(amulet.FAIL, msg)
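An illustrative round-trip using the rabbitmq/amqp helpers above (u is a hypothetical OpenStackAmuletUtils instance and rmq_units a list of rabbitmq-server sentry units):

    u.add_rmq_test_user(rmq_units)
    stamp = u.get_uuid_epoch_stamp()
    u.publish_amqp_message_by_unit(rmq_units[0], 'test message ' + stamp)
    body = u.get_amqp_message_by_unit(rmq_units[0])
    assert stamp in body
    u.delete_rmq_test_user(rmq_units)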
