Merge lp:~gnuoy/charms/trusty/ceph/1453940 into lp:~openstack-charmers-archive/charms/trusty/ceph/next

Proposed by Liam Young
Status: Merged
Merged at revision: 117
Proposed branch: lp:~gnuoy/charms/trusty/ceph/1453940
Merge into: lp:~openstack-charmers-archive/charms/trusty/ceph/next
Diff against target: 1195 lines (+885/-62)
7 files modified
hooks/ceph_broker.py (+12/-2)
hooks/charmhelpers/contrib/storage/linux/ceph.py (+224/-2)
hooks/hooks.py (+9/-1)
tests/charmhelpers/contrib/amulet/utils.py (+234/-52)
tests/charmhelpers/contrib/openstack/amulet/deployment.py (+20/-5)
tests/charmhelpers/contrib/openstack/amulet/utils.py (+359/-0)
unit_tests/test_ceph_broker.py (+27/-0)
To merge this branch: bzr merge lp:~gnuoy/charms/trusty/ceph/1453940
Reviewer Review Type Date Requested Status
Edward Hope-Morley Approve
Review via email: mp+268614@code.launchpad.net
To post a comment you must log in.
Revision history for this message
Edward Hope-Morley (hopem) :
Revision history for this message
Edward Hope-Morley (hopem) wrote :

I've tested this out and it seems good. Tried deploying then scaling for
ceph, cinder, glance and nova-compute and all had updated ceph.conf on the
client side. The use of request-id combined with unit-name scoped responses
seems to nicely avoid any collisions as well. I have a few comments on some
minor fixups inline.

review: Needs Fixing
Revision history for this message
Liam Young (gnuoy) :
Revision history for this message
Edward Hope-Morley (hopem) :
Revision history for this message
uosci-testing-bot (uosci-testing-bot) wrote :

charm_unit_test #8678 ceph-next for gnuoy mp268614
    UNIT OK: passed

Build: http://10.245.162.77:8080/job/charm_unit_test/8678/

Revision history for this message
uosci-testing-bot (uosci-testing-bot) wrote :

charm_lint_check #9384 ceph-next for gnuoy mp268614
    LINT OK: passed

Build: http://10.245.162.77:8080/job/charm_lint_check/9384/

Revision history for this message
uosci-testing-bot (uosci-testing-bot) wrote :

charm_amulet_test #6247 ceph-next for gnuoy mp268614
    AMULET OK: passed

Build: http://10.245.162.77:8080/job/charm_amulet_test/6247/

lp:~gnuoy/charms/trusty/ceph/1453940 updated
138. By Liam Young

Charm helper sync

Revision history for this message
uosci-testing-bot (uosci-testing-bot) wrote :

charm_unit_test #8778 ceph-next for gnuoy mp268614
    UNIT OK: passed

Build: http://10.245.162.77:8080/job/charm_unit_test/8778/

Revision history for this message
uosci-testing-bot (uosci-testing-bot) wrote :

charm_lint_check #9538 ceph-next for gnuoy mp268614
    LINT OK: passed

Build: http://10.245.162.77:8080/job/charm_lint_check/9538/

Revision history for this message
uosci-testing-bot (uosci-testing-bot) wrote :

charm_amulet_test #6308 ceph-next for gnuoy mp268614
    AMULET OK: passed

Build: http://10.245.162.77:8080/job/charm_amulet_test/6308/

lp:~gnuoy/charms/trusty/ceph/1453940 updated
139. By Liam Young

Charm helper sync

Revision history for this message
uosci-testing-bot (uosci-testing-bot) wrote :

charm_lint_check #9710 ceph-next for gnuoy mp268614
    LINT OK: passed

Build: http://10.245.162.77:8080/job/charm_lint_check/9710/

Revision history for this message
uosci-testing-bot (uosci-testing-bot) wrote :

charm_unit_test #8943 ceph-next for gnuoy mp268614
    UNIT OK: passed

Build: http://10.245.162.77:8080/job/charm_unit_test/8943/

Revision history for this message
uosci-testing-bot (uosci-testing-bot) wrote :

charm_amulet_test #6344 ceph-next for gnuoy mp268614
    AMULET OK: passed

Build: http://10.245.162.77:8080/job/charm_amulet_test/6344/

Revision history for this message
Edward Hope-Morley (hopem) wrote :

LGTM +1

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'hooks/ceph_broker.py'
--- hooks/ceph_broker.py 2014-11-19 21:33:12 +0000
+++ hooks/ceph_broker.py 2015-09-10 09:35:21 +0000
@@ -31,10 +31,16 @@
31 This is a versioned api. API version must be supplied by the client making31 This is a versioned api. API version must be supplied by the client making
32 the request.32 the request.
33 """33 """
34 request_id = reqs.get('request-id')
34 try:35 try:
35 version = reqs.get('api-version')36 version = reqs.get('api-version')
36 if version == 1:37 if version == 1:
37 return process_requests_v1(reqs['ops'])38 log('Processing request {}'.format(request_id), level=DEBUG)
39 resp = process_requests_v1(reqs['ops'])
40 if request_id:
41 resp['request-id'] = request_id
42
43 return resp
3844
39 except Exception as exc:45 except Exception as exc:
40 log(str(exc), level=ERROR)46 log(str(exc), level=ERROR)
@@ -44,7 +50,11 @@
44 return {'exit-code': 1, 'stderr': msg}50 return {'exit-code': 1, 'stderr': msg}
4551
46 msg = ("Missing or invalid api version (%s)" % (version))52 msg = ("Missing or invalid api version (%s)" % (version))
47 return {'exit-code': 1, 'stderr': msg}53 resp = {'exit-code': 1, 'stderr': msg}
54 if request_id:
55 resp['request-id'] = request_id
56
57 return resp
4858
4959
50def process_requests_v1(reqs):60def process_requests_v1(reqs):
5161
=== modified file 'hooks/charmhelpers/contrib/storage/linux/ceph.py'
--- hooks/charmhelpers/contrib/storage/linux/ceph.py 2015-07-29 10:48:21 +0000
+++ hooks/charmhelpers/contrib/storage/linux/ceph.py 2015-09-10 09:35:21 +0000
@@ -28,6 +28,7 @@
28import shutil28import shutil
29import json29import json
30import time30import time
31import uuid
3132
32from subprocess import (33from subprocess import (
33 check_call,34 check_call,
@@ -35,8 +36,10 @@
35 CalledProcessError,36 CalledProcessError,
36)37)
37from charmhelpers.core.hookenv import (38from charmhelpers.core.hookenv import (
39 local_unit,
38 relation_get,40 relation_get,
39 relation_ids,41 relation_ids,
42 relation_set,
40 related_units,43 related_units,
41 log,44 log,
42 DEBUG,45 DEBUG,
@@ -411,17 +414,52 @@
411414
412 The API is versioned and defaults to version 1.415 The API is versioned and defaults to version 1.
413 """416 """
414 def __init__(self, api_version=1):417 def __init__(self, api_version=1, request_id=None):
415 self.api_version = api_version418 self.api_version = api_version
419 if request_id:
420 self.request_id = request_id
421 else:
422 self.request_id = str(uuid.uuid1())
416 self.ops = []423 self.ops = []
417424
418 def add_op_create_pool(self, name, replica_count=3):425 def add_op_create_pool(self, name, replica_count=3):
419 self.ops.append({'op': 'create-pool', 'name': name,426 self.ops.append({'op': 'create-pool', 'name': name,
420 'replicas': replica_count})427 'replicas': replica_count})
421428
429 def set_ops(self, ops):
430 """Set request ops to provided value.
431
432 Useful for injecting ops that come from a previous request
433 to allow comparisons to ensure validity.
434 """
435 self.ops = ops
436
422 @property437 @property
423 def request(self):438 def request(self):
424 return json.dumps({'api-version': self.api_version, 'ops': self.ops})439 return json.dumps({'api-version': self.api_version, 'ops': self.ops,
440 'request-id': self.request_id})
441
442 def _ops_equal(self, other):
443 if len(self.ops) == len(other.ops):
444 for req_no in range(0, len(self.ops)):
445 for key in ['replicas', 'name', 'op']:
446 if self.ops[req_no][key] != other.ops[req_no][key]:
447 return False
448 else:
449 return False
450 return True
451
452 def __eq__(self, other):
453 if not isinstance(other, self.__class__):
454 return False
455 if self.api_version == other.api_version and \
456 self._ops_equal(other):
457 return True
458 else:
459 return False
460
461 def __ne__(self, other):
462 return not self.__eq__(other)
425463
426464
427class CephBrokerRsp(object):465class CephBrokerRsp(object):
@@ -431,14 +469,198 @@
431469
432 The API is versioned and defaults to version 1.470 The API is versioned and defaults to version 1.
433 """471 """
472
434 def __init__(self, encoded_rsp):473 def __init__(self, encoded_rsp):
435 self.api_version = None474 self.api_version = None
436 self.rsp = json.loads(encoded_rsp)475 self.rsp = json.loads(encoded_rsp)
437476
438 @property477 @property
478 def request_id(self):
479 return self.rsp.get('request-id')
480
481 @property
439 def exit_code(self):482 def exit_code(self):
440 return self.rsp.get('exit-code')483 return self.rsp.get('exit-code')
441484
442 @property485 @property
443 def exit_msg(self):486 def exit_msg(self):
444 return self.rsp.get('stderr')487 return self.rsp.get('stderr')
488
489
490# Ceph Broker Conversation:
491# If a charm needs an action to be taken by ceph it can create a CephBrokerRq
492# and send that request to ceph via the ceph relation. The CephBrokerRq has a
493# unique id so that the client can identity which CephBrokerRsp is associated
494# with the request. Ceph will also respond to each client unit individually
495# creating a response key per client unit eg glance/0 will get a CephBrokerRsp
496# via key broker-rsp-glance-0
497#
498# To use this the charm can just do something like:
499#
500# from charmhelpers.contrib.storage.linux.ceph import (
501# send_request_if_needed,
502# is_request_complete,
503# CephBrokerRq,
504# )
505#
506# @hooks.hook('ceph-relation-changed')
507# def ceph_changed():
508# rq = CephBrokerRq()
509# rq.add_op_create_pool(name='poolname', replica_count=3)
510#
511# if is_request_complete(rq):
512# <Request complete actions>
513# else:
514# send_request_if_needed(get_ceph_request())
515#
516# CephBrokerRq and CephBrokerRsp are serialized into JSON. Below is an example
517# of glance having sent a request to ceph which ceph has successfully processed
518# 'ceph:8': {
519# 'ceph/0': {
520# 'auth': 'cephx',
521# 'broker-rsp-glance-0': '{"request-id": "0bc7dc54", "exit-code": 0}',
522# 'broker_rsp': '{"request-id": "0da543b8", "exit-code": 0}',
523# 'ceph-public-address': '10.5.44.103',
524# 'key': 'AQCLDttVuHXINhAAvI144CB09dYchhHyTUY9BQ==',
525# 'private-address': '10.5.44.103',
526# },
527# 'glance/0': {
528# 'broker_req': ('{"api-version": 1, "request-id": "0bc7dc54", '
529# '"ops": [{"replicas": 3, "name": "glance", '
530# '"op": "create-pool"}]}'),
531# 'private-address': '10.5.44.109',
532# },
533# }
534
535def get_previous_request(rid):
536 """Return the last ceph broker request sent on a given relation
537
538 @param rid: Relation id to query for request
539 """
540 request = None
541 broker_req = relation_get(attribute='broker_req', rid=rid,
542 unit=local_unit())
543 if broker_req:
544 request_data = json.loads(broker_req)
545 request = CephBrokerRq(api_version=request_data['api-version'],
546 request_id=request_data['request-id'])
547 request.set_ops(request_data['ops'])
548
549 return request
550
551
552def get_request_states(request):
553 """Return a dict of requests per relation id with their corresponding
554 completion state.
555
556 This allows a charm, which has a request for ceph, to see whether there is
557 an equivalent request already being processed and if so what state that
558 request is in.
559
560 @param request: A CephBrokerRq object
561 """
562 complete = []
563 requests = {}
564 for rid in relation_ids('ceph'):
565 complete = False
566 previous_request = get_previous_request(rid)
567 if request == previous_request:
568 sent = True
569 complete = is_request_complete_for_rid(previous_request, rid)
570 else:
571 sent = False
572 complete = False
573
574 requests[rid] = {
575 'sent': sent,
576 'complete': complete,
577 }
578
579 return requests
580
581
582def is_request_sent(request):
583 """Check to see if a functionally equivalent request has already been sent
584
585 Returns True if a similair request has been sent
586
587 @param request: A CephBrokerRq object
588 """
589 states = get_request_states(request)
590 for rid in states.keys():
591 if not states[rid]['sent']:
592 return False
593
594 return True
595
596
597def is_request_complete(request):
598 """Check to see if a functionally equivalent request has already been
599 completed
600
601 Returns True if a similair request has been completed
602
603 @param request: A CephBrokerRq object
604 """
605 states = get_request_states(request)
606 for rid in states.keys():
607 if not states[rid]['complete']:
608 return False
609
610 return True
611
612
613def is_request_complete_for_rid(request, rid):
614 """Check if a given request has been completed on the given relation
615
616 @param request: A CephBrokerRq object
617 @param rid: Relation ID
618 """
619 broker_key = get_broker_rsp_key()
620 for unit in related_units(rid):
621 rdata = relation_get(rid=rid, unit=unit)
622 if rdata.get(broker_key):
623 rsp = CephBrokerRsp(rdata.get(broker_key))
624 if rsp.request_id == request.request_id:
625 if not rsp.exit_code:
626 return True
627 else:
628 # The remote unit sent no reply targeted at this unit so either the
629 # remote ceph cluster does not support unit targeted replies or it
630 # has not processed our request yet.
631 if rdata.get('broker_rsp'):
632 request_data = json.loads(rdata['broker_rsp'])
633 if request_data.get('request-id'):
634 log('Ignoring legacy broker_rsp without unit key as remote '
635 'service supports unit specific replies', level=DEBUG)
636 else:
637 log('Using legacy broker_rsp as remote service does not '
638 'supports unit specific replies', level=DEBUG)
639 rsp = CephBrokerRsp(rdata['broker_rsp'])
640 if not rsp.exit_code:
641 return True
642
643 return False
644
645
646def get_broker_rsp_key():
647 """Return broker response key for this unit
648
649 This is the key that ceph is going to use to pass request status
650 information back to this unit
651 """
652 return 'broker-rsp-' + local_unit().replace('/', '-')
653
654
655def send_request_if_needed(request):
656 """Send broker request if an equivalent request has not already been sent
657
658 @param request: A CephBrokerRq object
659 """
660 if is_request_sent(request):
661 log('Request already sent but not complete, not sending new request',
662 level=DEBUG)
663 else:
664 for rid in relation_ids('ceph'):
665 log('Sending request {}'.format(request.request_id), level=DEBUG)
666 relation_set(relation_id=rid, broker_req=request.request)
445667
=== modified file 'hooks/hooks.py'
--- hooks/hooks.py 2015-03-23 17:40:42 +0000
+++ hooks/hooks.py 2015-09-10 09:35:21 +0000
@@ -319,7 +319,15 @@
319 log("Not leader - ignoring broker request", level=DEBUG)319 log("Not leader - ignoring broker request", level=DEBUG)
320 else:320 else:
321 rsp = process_requests(settings['broker_req'])321 rsp = process_requests(settings['broker_req'])
322 relation_set(relation_settings={'broker_rsp': rsp})322 unit_id = remote_unit().replace('/', '-')
323 unit_response_key = 'broker-rsp-' + unit_id
324 # broker_rsp is being left for backward compatibility,
325 # unit_response_key superscedes it
326 data = {
327 'broker_rsp': rsp,
328 unit_response_key: rsp,
329 }
330 relation_set(relation_settings=data)
323 else:331 else:
324 log('mon cluster not in quorum', level=DEBUG)332 log('mon cluster not in quorum', level=DEBUG)
325333
326334
=== modified file 'tests/charmhelpers/contrib/amulet/utils.py'
--- tests/charmhelpers/contrib/amulet/utils.py 2015-08-19 00:51:43 +0000
+++ tests/charmhelpers/contrib/amulet/utils.py 2015-09-10 09:35:21 +0000
@@ -19,9 +19,11 @@
19import logging19import logging
20import os20import os
21import re21import re
22import socket
22import subprocess23import subprocess
23import sys24import sys
24import time25import time
26import uuid
2527
26import amulet28import amulet
27import distro_info29import distro_info
@@ -114,7 +116,7 @@
114 # /!\ DEPRECATION WARNING (beisner):116 # /!\ DEPRECATION WARNING (beisner):
115 # New and existing tests should be rewritten to use117 # New and existing tests should be rewritten to use
116 # validate_services_by_name() as it is aware of init systems.118 # validate_services_by_name() as it is aware of init systems.
117 self.log.warn('/!\\ DEPRECATION WARNING: use '119 self.log.warn('DEPRECATION WARNING: use '
118 'validate_services_by_name instead of validate_services '120 'validate_services_by_name instead of validate_services '
119 'due to init system differences.')121 'due to init system differences.')
120122
@@ -269,33 +271,52 @@
269 """Get last modification time of directory."""271 """Get last modification time of directory."""
270 return sentry_unit.directory_stat(directory)['mtime']272 return sentry_unit.directory_stat(directory)['mtime']
271273
272 def _get_proc_start_time(self, sentry_unit, service, pgrep_full=False):274 def _get_proc_start_time(self, sentry_unit, service, pgrep_full=None):
273 """Get process' start time.275 """Get start time of a process based on the last modification time
274276 of the /proc/pid directory.
275 Determine start time of the process based on the last modification277
276 time of the /proc/pid directory. If pgrep_full is True, the process278 :sentry_unit: The sentry unit to check for the service on
277 name is matched against the full command line.279 :service: service name to look for in process table
278 """280 :pgrep_full: [Deprecated] Use full command line search mode with pgrep
279 if pgrep_full:281 :returns: epoch time of service process start
280 cmd = 'pgrep -o -f {}'.format(service)282 :param commands: list of bash commands
281 else:283 :param sentry_units: list of sentry unit pointers
282 cmd = 'pgrep -o {}'.format(service)284 :returns: None if successful; Failure message otherwise
283 cmd = cmd + ' | grep -v pgrep || exit 0'285 """
284 cmd_out = sentry_unit.run(cmd)286 if pgrep_full is not None:
285 self.log.debug('CMDout: ' + str(cmd_out))287 # /!\ DEPRECATION WARNING (beisner):
286 if cmd_out[0]:288 # No longer implemented, as pidof is now used instead of pgrep.
287 self.log.debug('Pid for %s %s' % (service, str(cmd_out[0])))289 # https://bugs.launchpad.net/charm-helpers/+bug/1474030
288 proc_dir = '/proc/{}'.format(cmd_out[0].strip())290 self.log.warn('DEPRECATION WARNING: pgrep_full bool is no '
289 return self._get_dir_mtime(sentry_unit, proc_dir)291 'longer implemented re: lp 1474030.')
292
293 pid_list = self.get_process_id_list(sentry_unit, service)
294 pid = pid_list[0]
295 proc_dir = '/proc/{}'.format(pid)
296 self.log.debug('Pid for {} on {}: {}'.format(
297 service, sentry_unit.info['unit_name'], pid))
298
299 return self._get_dir_mtime(sentry_unit, proc_dir)
290300
291 def service_restarted(self, sentry_unit, service, filename,301 def service_restarted(self, sentry_unit, service, filename,
292 pgrep_full=False, sleep_time=20):302 pgrep_full=None, sleep_time=20):
293 """Check if service was restarted.303 """Check if service was restarted.
294304
295 Compare a service's start time vs a file's last modification time305 Compare a service's start time vs a file's last modification time
296 (such as a config file for that service) to determine if the service306 (such as a config file for that service) to determine if the service
297 has been restarted.307 has been restarted.
298 """308 """
309 # /!\ DEPRECATION WARNING (beisner):
310 # This method is prone to races in that no before-time is known.
311 # Use validate_service_config_changed instead.
312
313 # NOTE(beisner) pgrep_full is no longer implemented, as pidof is now
314 # used instead of pgrep. pgrep_full is still passed through to ensure
315 # deprecation WARNS. lp1474030
316 self.log.warn('DEPRECATION WARNING: use '
317 'validate_service_config_changed instead of '
318 'service_restarted due to known races.')
319
299 time.sleep(sleep_time)320 time.sleep(sleep_time)
300 if (self._get_proc_start_time(sentry_unit, service, pgrep_full) >=321 if (self._get_proc_start_time(sentry_unit, service, pgrep_full) >=
301 self._get_file_mtime(sentry_unit, filename)):322 self._get_file_mtime(sentry_unit, filename)):
@@ -304,15 +325,15 @@
304 return False325 return False
305326
306 def service_restarted_since(self, sentry_unit, mtime, service,327 def service_restarted_since(self, sentry_unit, mtime, service,
307 pgrep_full=False, sleep_time=20,328 pgrep_full=None, sleep_time=20,
308 retry_count=2):329 retry_count=2, retry_sleep_time=30):
309 """Check if service was been started after a given time.330 """Check if service was been started after a given time.
310331
311 Args:332 Args:
312 sentry_unit (sentry): The sentry unit to check for the service on333 sentry_unit (sentry): The sentry unit to check for the service on
313 mtime (float): The epoch time to check against334 mtime (float): The epoch time to check against
314 service (string): service name to look for in process table335 service (string): service name to look for in process table
315 pgrep_full (boolean): Use full command line search mode with pgrep336 pgrep_full: [Deprecated] Use full command line search mode with pgrep
316 sleep_time (int): Seconds to sleep before looking for process337 sleep_time (int): Seconds to sleep before looking for process
317 retry_count (int): If service is not found, how many times to retry338 retry_count (int): If service is not found, how many times to retry
318339
@@ -321,30 +342,44 @@
321 False if service is older than mtime or if service was342 False if service is older than mtime or if service was
322 not found.343 not found.
323 """344 """
324 self.log.debug('Checking %s restarted since %s' % (service, mtime))345 # NOTE(beisner) pgrep_full is no longer implemented, as pidof is now
346 # used instead of pgrep. pgrep_full is still passed through to ensure
347 # deprecation WARNS. lp1474030
348
349 unit_name = sentry_unit.info['unit_name']
350 self.log.debug('Checking that %s service restarted since %s on '
351 '%s' % (service, mtime, unit_name))
325 time.sleep(sleep_time)352 time.sleep(sleep_time)
326 proc_start_time = self._get_proc_start_time(sentry_unit, service,353 proc_start_time = None
327 pgrep_full)354 tries = 0
328 while retry_count > 0 and not proc_start_time:355 while tries <= retry_count and not proc_start_time:
329 self.log.debug('No pid file found for service %s, will retry %i '356 try:
330 'more times' % (service, retry_count))357 proc_start_time = self._get_proc_start_time(sentry_unit,
331 time.sleep(30)358 service,
332 proc_start_time = self._get_proc_start_time(sentry_unit, service,359 pgrep_full)
333 pgrep_full)360 self.log.debug('Attempt {} to get {} proc start time on {} '
334 retry_count = retry_count - 1361 'OK'.format(tries, service, unit_name))
362 except IOError:
363 # NOTE(beisner) - race avoidance, proc may not exist yet.
364 # https://bugs.launchpad.net/charm-helpers/+bug/1474030
365 self.log.debug('Attempt {} to get {} proc start time on {} '
366 'failed'.format(tries, service, unit_name))
367 time.sleep(retry_sleep_time)
368 tries += 1
335369
336 if not proc_start_time:370 if not proc_start_time:
337 self.log.warn('No proc start time found, assuming service did '371 self.log.warn('No proc start time found, assuming service did '
338 'not start')372 'not start')
339 return False373 return False
340 if proc_start_time >= mtime:374 if proc_start_time >= mtime:
341 self.log.debug('proc start time is newer than provided mtime'375 self.log.debug('Proc start time is newer than provided mtime'
342 '(%s >= %s)' % (proc_start_time, mtime))376 '(%s >= %s) on %s (OK)' % (proc_start_time,
377 mtime, unit_name))
343 return True378 return True
344 else:379 else:
345 self.log.warn('proc start time (%s) is older than provided mtime '380 self.log.warn('Proc start time (%s) is older than provided mtime '
346 '(%s), service did not restart' % (proc_start_time,381 '(%s) on %s, service did not '
347 mtime))382 'restart' % (proc_start_time, mtime, unit_name))
348 return False383 return False
349384
350 def config_updated_since(self, sentry_unit, filename, mtime,385 def config_updated_since(self, sentry_unit, filename, mtime,
@@ -374,8 +409,9 @@
374 return False409 return False
375410
376 def validate_service_config_changed(self, sentry_unit, mtime, service,411 def validate_service_config_changed(self, sentry_unit, mtime, service,
377 filename, pgrep_full=False,412 filename, pgrep_full=None,
378 sleep_time=20, retry_count=2):413 sleep_time=20, retry_count=2,
414 retry_sleep_time=30):
379 """Check service and file were updated after mtime415 """Check service and file were updated after mtime
380416
381 Args:417 Args:
@@ -383,9 +419,10 @@
383 mtime (float): The epoch time to check against419 mtime (float): The epoch time to check against
384 service (string): service name to look for in process table420 service (string): service name to look for in process table
385 filename (string): The file to check mtime of421 filename (string): The file to check mtime of
386 pgrep_full (boolean): Use full command line search mode with pgrep422 pgrep_full: [Deprecated] Use full command line search mode with pgrep
387 sleep_time (int): Seconds to sleep before looking for process423 sleep_time (int): Initial sleep in seconds to pass to test helpers
388 retry_count (int): If service is not found, how many times to retry424 retry_count (int): If service is not found, how many times to retry
425 retry_sleep_time (int): Time in seconds to wait between retries
389426
390 Typical Usage:427 Typical Usage:
391 u = OpenStackAmuletUtils(ERROR)428 u = OpenStackAmuletUtils(ERROR)
@@ -402,15 +439,25 @@
402 mtime, False if service is older than mtime or if service was439 mtime, False if service is older than mtime or if service was
403 not found or if filename was modified before mtime.440 not found or if filename was modified before mtime.
404 """441 """
405 self.log.debug('Checking %s restarted since %s' % (service, mtime))442
406 time.sleep(sleep_time)443 # NOTE(beisner) pgrep_full is no longer implemented, as pidof is now
407 service_restart = self.service_restarted_since(sentry_unit, mtime,444 # used instead of pgrep. pgrep_full is still passed through to ensure
408 service,445 # deprecation WARNS. lp1474030
409 pgrep_full=pgrep_full,446
410 sleep_time=0,447 service_restart = self.service_restarted_since(
411 retry_count=retry_count)448 sentry_unit, mtime,
412 config_update = self.config_updated_since(sentry_unit, filename, mtime,449 service,
413 sleep_time=0)450 pgrep_full=pgrep_full,
451 sleep_time=sleep_time,
452 retry_count=retry_count,
453 retry_sleep_time=retry_sleep_time)
454
455 config_update = self.config_updated_since(
456 sentry_unit,
457 filename,
458 mtime,
459 sleep_time=0)
460
414 return service_restart and config_update461 return service_restart and config_update
415462
416 def get_sentry_time(self, sentry_unit):463 def get_sentry_time(self, sentry_unit):
@@ -428,7 +475,6 @@
428 """Return a list of all Ubuntu releases in order of release."""475 """Return a list of all Ubuntu releases in order of release."""
429 _d = distro_info.UbuntuDistroInfo()476 _d = distro_info.UbuntuDistroInfo()
430 _release_list = _d.all477 _release_list = _d.all
431 self.log.debug('Ubuntu release list: {}'.format(_release_list))
432 return _release_list478 return _release_list
433479
434 def file_to_url(self, file_rel_path):480 def file_to_url(self, file_rel_path):
@@ -568,6 +614,142 @@
568614
569 return None615 return None
570616
617 def validate_sectionless_conf(self, file_contents, expected):
618 """A crude conf parser. Useful to inspect configuration files which
619 do not have section headers (as would be necessary in order to use
620 the configparser). Such as openstack-dashboard or rabbitmq confs."""
621 for line in file_contents.split('\n'):
622 if '=' in line:
623 args = line.split('=')
624 if len(args) <= 1:
625 continue
626 key = args[0].strip()
627 value = args[1].strip()
628 if key in expected.keys():
629 if expected[key] != value:
630 msg = ('Config mismatch. Expected, actual: {}, '
631 '{}'.format(expected[key], value))
632 amulet.raise_status(amulet.FAIL, msg=msg)
633
634 def get_unit_hostnames(self, units):
635 """Return a dict of juju unit names to hostnames."""
636 host_names = {}
637 for unit in units:
638 host_names[unit.info['unit_name']] = \
639 str(unit.file_contents('/etc/hostname').strip())
640 self.log.debug('Unit host names: {}'.format(host_names))
641 return host_names
642
643 def run_cmd_unit(self, sentry_unit, cmd):
644 """Run a command on a unit, return the output and exit code."""
645 output, code = sentry_unit.run(cmd)
646 if code == 0:
647 self.log.debug('{} `{}` command returned {} '
648 '(OK)'.format(sentry_unit.info['unit_name'],
649 cmd, code))
650 else:
651 msg = ('{} `{}` command returned {} '
652 '{}'.format(sentry_unit.info['unit_name'],
653 cmd, code, output))
654 amulet.raise_status(amulet.FAIL, msg=msg)
655 return str(output), code
656
657 def file_exists_on_unit(self, sentry_unit, file_name):
658 """Check if a file exists on a unit."""
659 try:
660 sentry_unit.file_stat(file_name)
661 return True
662 except IOError:
663 return False
664 except Exception as e:
665 msg = 'Error checking file {}: {}'.format(file_name, e)
666 amulet.raise_status(amulet.FAIL, msg=msg)
667
668 def file_contents_safe(self, sentry_unit, file_name,
669 max_wait=60, fatal=False):
670 """Get file contents from a sentry unit. Wrap amulet file_contents
671 with retry logic to address races where a file checks as existing,
672 but no longer exists by the time file_contents is called.
673 Return None if file not found. Optionally raise if fatal is True."""
674 unit_name = sentry_unit.info['unit_name']
675 file_contents = False
676 tries = 0
677 while not file_contents and tries < (max_wait / 4):
678 try:
679 file_contents = sentry_unit.file_contents(file_name)
680 except IOError:
681 self.log.debug('Attempt {} to open file {} from {} '
682 'failed'.format(tries, file_name,
683 unit_name))
684 time.sleep(4)
685 tries += 1
686
687 if file_contents:
688 return file_contents
689 elif not fatal:
690 return None
691 elif fatal:
692 msg = 'Failed to get file contents from unit.'
693 amulet.raise_status(amulet.FAIL, msg)
694
695 def port_knock_tcp(self, host="localhost", port=22, timeout=15):
696 """Open a TCP socket to check for a listening sevice on a host.
697
698 :param host: host name or IP address, default to localhost
699 :param port: TCP port number, default to 22
700 :param timeout: Connect timeout, default to 15 seconds
701 :returns: True if successful, False if connect failed
702 """
703
704 # Resolve host name if possible
705 try:
706 connect_host = socket.gethostbyname(host)
707 host_human = "{} ({})".format(connect_host, host)
708 except socket.error as e:
709 self.log.warn('Unable to resolve address: '
710 '{} ({}) Trying anyway!'.format(host, e))
711 connect_host = host
712 host_human = connect_host
713
714 # Attempt socket connection
715 try:
716 knock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
717 knock.settimeout(timeout)
718 knock.connect((connect_host, port))
719 knock.close()
720 self.log.debug('Socket connect OK for host '
721 '{} on port {}.'.format(host_human, port))
722 return True
723 except socket.error as e:
724 self.log.debug('Socket connect FAIL for'
725 ' {} port {} ({})'.format(host_human, port, e))
726 return False
727
728 def port_knock_units(self, sentry_units, port=22,
729 timeout=15, expect_success=True):
730 """Open a TCP socket to check for a listening sevice on each
731 listed juju unit.
732
733 :param sentry_units: list of sentry unit pointers
734 :param port: TCP port number, default to 22
735 :param timeout: Connect timeout, default to 15 seconds
736 :expect_success: True by default, set False to invert logic
737 :returns: None if successful, Failure message otherwise
738 """
739 for unit in sentry_units:
740 host = unit.info['public-address']
741 connected = self.port_knock_tcp(host, port, timeout)
742 if not connected and expect_success:
743 return 'Socket connect failed.'
744 elif connected and not expect_success:
745 return 'Socket connected unexpectedly.'
746
747 def get_uuid_epoch_stamp(self):
748 """Returns a stamp string based on uuid4 and epoch time. Useful in
749 generating test messages which need to be unique-ish."""
750 return '[{}-{}]'.format(uuid.uuid4(), time.time())
751
752# amulet juju action helpers:
571 def run_action(self, unit_sentry, action,753 def run_action(self, unit_sentry, action,
572 _check_output=subprocess.check_output):754 _check_output=subprocess.check_output):
573 """Run the named action on a given unit sentry.755 """Run the named action on a given unit sentry.
574756
=== modified file 'tests/charmhelpers/contrib/openstack/amulet/deployment.py'
--- tests/charmhelpers/contrib/openstack/amulet/deployment.py 2015-08-19 00:51:43 +0000
+++ tests/charmhelpers/contrib/openstack/amulet/deployment.py 2015-09-10 09:35:21 +0000
@@ -44,8 +44,15 @@
44 Determine if the local branch being tested is derived from its44 Determine if the local branch being tested is derived from its
45 stable or next (dev) branch, and based on this, use the corresonding45 stable or next (dev) branch, and based on this, use the corresonding
46 stable or next branches for the other_services."""46 stable or next branches for the other_services."""
47
48 # Charms outside the lp:~openstack-charmers namespace
47 base_charms = ['mysql', 'mongodb', 'nrpe']49 base_charms = ['mysql', 'mongodb', 'nrpe']
4850
51 # Force these charms to current series even when using an older series.
52 # ie. Use trusty/nrpe even when series is precise, as the P charm
53 # does not possess the necessary external master config and hooks.
54 force_series_current = ['nrpe']
55
49 if self.series in ['precise', 'trusty']:56 if self.series in ['precise', 'trusty']:
50 base_series = self.series57 base_series = self.series
51 else:58 else:
@@ -53,11 +60,17 @@
5360
54 if self.stable:61 if self.stable:
55 for svc in other_services:62 for svc in other_services:
63 if svc['name'] in force_series_current:
64 base_series = self.current_next
65
56 temp = 'lp:charms/{}/{}'66 temp = 'lp:charms/{}/{}'
57 svc['location'] = temp.format(base_series,67 svc['location'] = temp.format(base_series,
58 svc['name'])68 svc['name'])
59 else:69 else:
60 for svc in other_services:70 for svc in other_services:
71 if svc['name'] in force_series_current:
72 base_series = self.current_next
73
61 if svc['name'] in base_charms:74 if svc['name'] in base_charms:
62 temp = 'lp:charms/{}/{}'75 temp = 'lp:charms/{}/{}'
63 svc['location'] = temp.format(base_series,76 svc['location'] = temp.format(base_series,
@@ -77,21 +90,23 @@
7790
78 services = other_services91 services = other_services
79 services.append(this_service)92 services.append(this_service)
93
94 # Charms which should use the source config option
80 use_source = ['mysql', 'mongodb', 'rabbitmq-server', 'ceph',95 use_source = ['mysql', 'mongodb', 'rabbitmq-server', 'ceph',
81 'ceph-osd', 'ceph-radosgw']96 'ceph-osd', 'ceph-radosgw']
82 # Most OpenStack subordinate charms do not expose an origin option97
83 # as that is controlled by the principle.98 # Charms which can not use openstack-origin, ie. many subordinates
84 ignore = ['cinder-ceph', 'hacluster', 'neutron-openvswitch', 'nrpe']99 no_origin = ['cinder-ceph', 'hacluster', 'neutron-openvswitch', 'nrpe']
85100
86 if self.openstack:101 if self.openstack:
87 for svc in services:102 for svc in services:
88 if svc['name'] not in use_source + ignore:103 if svc['name'] not in use_source + no_origin:
89 config = {'openstack-origin': self.openstack}104 config = {'openstack-origin': self.openstack}
90 self.d.configure(svc['name'], config)105 self.d.configure(svc['name'], config)
91106
92 if self.source:107 if self.source:
93 for svc in services:108 for svc in services:
94 if svc['name'] in use_source and svc['name'] not in ignore:109 if svc['name'] in use_source and svc['name'] not in no_origin:
95 config = {'source': self.source}110 config = {'source': self.source}
96 self.d.configure(svc['name'], config)111 self.d.configure(svc['name'], config)
97112
98113
=== modified file 'tests/charmhelpers/contrib/openstack/amulet/utils.py'
--- tests/charmhelpers/contrib/openstack/amulet/utils.py 2015-06-29 14:25:54 +0000
+++ tests/charmhelpers/contrib/openstack/amulet/utils.py 2015-09-10 09:35:21 +0000
@@ -27,6 +27,7 @@
27import heatclient.v1.client as heat_client27import heatclient.v1.client as heat_client
28import keystoneclient.v2_0 as keystone_client28import keystoneclient.v2_0 as keystone_client
29import novaclient.v1_1.client as nova_client29import novaclient.v1_1.client as nova_client
30import pika
30import swiftclient31import swiftclient
3132
32from charmhelpers.contrib.amulet.utils import (33from charmhelpers.contrib.amulet.utils import (
@@ -602,3 +603,361 @@
602 self.log.debug('Ceph {} samples (OK): '603 self.log.debug('Ceph {} samples (OK): '
603 '{}'.format(sample_type, samples))604 '{}'.format(sample_type, samples))
604 return None605 return None
606
607# rabbitmq/amqp specific helpers:
608 def add_rmq_test_user(self, sentry_units,
609 username="testuser1", password="changeme"):
610 """Add a test user via the first rmq juju unit, check connection as
611 the new user against all sentry units.
612
613 :param sentry_units: list of sentry unit pointers
614 :param username: amqp user name, default to testuser1
615 :param password: amqp user password
616 :returns: None if successful. Raise on error.
617 """
618 self.log.debug('Adding rmq user ({})...'.format(username))
619
620 # Check that user does not already exist
621 cmd_user_list = 'rabbitmqctl list_users'
622 output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_list)
623 if username in output:
624 self.log.warning('User ({}) already exists, returning '
625 'gracefully.'.format(username))
626 return
627
628 perms = '".*" ".*" ".*"'
629 cmds = ['rabbitmqctl add_user {} {}'.format(username, password),
630 'rabbitmqctl set_permissions {} {}'.format(username, perms)]
631
632 # Add user via first unit
633 for cmd in cmds:
634 output, _ = self.run_cmd_unit(sentry_units[0], cmd)
635
636 # Check connection against the other sentry_units
637 self.log.debug('Checking user connect against units...')
638 for sentry_unit in sentry_units:
639 connection = self.connect_amqp_by_unit(sentry_unit, ssl=False,
640 username=username,
641 password=password)
642 connection.close()
643
644 def delete_rmq_test_user(self, sentry_units, username="testuser1"):
645 """Delete a rabbitmq user via the first rmq juju unit.
646
647 :param sentry_units: list of sentry unit pointers
648 :param username: amqp user name, default to testuser1
649 :param password: amqp user password
650 :returns: None if successful or no such user.
651 """
652 self.log.debug('Deleting rmq user ({})...'.format(username))
653
654 # Check that the user exists
655 cmd_user_list = 'rabbitmqctl list_users'
656 output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_list)
657
658 if username not in output:
659 self.log.warning('User ({}) does not exist, returning '
660 'gracefully.'.format(username))
661 return
662
663 # Delete the user
664 cmd_user_del = 'rabbitmqctl delete_user {}'.format(username)
665 output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_del)
666
667 def get_rmq_cluster_status(self, sentry_unit):
668 """Execute rabbitmq cluster status command on a unit and return
669 the full output.
670
671 :param unit: sentry unit
672 :returns: String containing console output of cluster status command
673 """
674 cmd = 'rabbitmqctl cluster_status'
675 output, _ = self.run_cmd_unit(sentry_unit, cmd)
676 self.log.debug('{} cluster_status:\n{}'.format(
677 sentry_unit.info['unit_name'], output))
678 return str(output)
679
680 def get_rmq_cluster_running_nodes(self, sentry_unit):
681 """Parse rabbitmqctl cluster_status output string, return list of
682 running rabbitmq cluster nodes.
683
684 :param unit: sentry unit
685 :returns: List containing node names of running nodes
686 """
687 # NOTE(beisner): rabbitmqctl cluster_status output is not
688 # json-parsable, do string chop foo, then json.loads that.
689 str_stat = self.get_rmq_cluster_status(sentry_unit)
690 if 'running_nodes' in str_stat:
691 pos_start = str_stat.find("{running_nodes,") + 15
692 pos_end = str_stat.find("]},", pos_start) + 1
693 str_run_nodes = str_stat[pos_start:pos_end].replace("'", '"')
694 run_nodes = json.loads(str_run_nodes)
695 return run_nodes
696 else:
697 return []
698
699 def validate_rmq_cluster_running_nodes(self, sentry_units):
700 """Check that all rmq unit hostnames are represented in the
701 cluster_status output of all units.
702
703 :param host_names: dict of juju unit names to host names
704 :param units: list of sentry unit pointers (all rmq units)
705 :returns: None if successful, otherwise return error message
706 """
707 host_names = self.get_unit_hostnames(sentry_units)
708 errors = []
709
710 # Query every unit for cluster_status running nodes
711 for query_unit in sentry_units:
712 query_unit_name = query_unit.info['unit_name']
713 running_nodes = self.get_rmq_cluster_running_nodes(query_unit)
714
715 # Confirm that every unit is represented in the queried unit's
716 # cluster_status running nodes output.
717 for validate_unit in sentry_units:
718 val_host_name = host_names[validate_unit.info['unit_name']]
719 val_node_name = 'rabbit@{}'.format(val_host_name)
720
721 if val_node_name not in running_nodes:
722 errors.append('Cluster member check failed on {}: {} not '
723 'in {}\n'.format(query_unit_name,
724 val_node_name,
725 running_nodes))
726 if errors:
727 return ''.join(errors)
728
729 def rmq_ssl_is_enabled_on_unit(self, sentry_unit, port=None):
730 """Check a single juju rmq unit for ssl and port in the config file."""
731 host = sentry_unit.info['public-address']
732 unit_name = sentry_unit.info['unit_name']
733
734 conf_file = '/etc/rabbitmq/rabbitmq.config'
735 conf_contents = str(self.file_contents_safe(sentry_unit,
736 conf_file, max_wait=16))
737 # Checks
738 conf_ssl = 'ssl' in conf_contents
739 conf_port = str(port) in conf_contents
740
741 # Port explicitly checked in config
742 if port and conf_port and conf_ssl:
743 self.log.debug('SSL is enabled @{}:{} '
744 '({})'.format(host, port, unit_name))
745 return True
746 elif port and not conf_port and conf_ssl:
747 self.log.debug('SSL is enabled @{} but not on port {} '
748 '({})'.format(host, port, unit_name))
749 return False
750 # Port not checked (useful when checking that ssl is disabled)
751 elif not port and conf_ssl:
752 self.log.debug('SSL is enabled @{}:{} '
753 '({})'.format(host, port, unit_name))
754 return True
755 elif not port and not conf_ssl:
756 self.log.debug('SSL not enabled @{}:{} '
757 '({})'.format(host, port, unit_name))
758 return False
759 else:
760 msg = ('Unknown condition when checking SSL status @{}:{} '
761 '({})'.format(host, port, unit_name))
762 amulet.raise_status(amulet.FAIL, msg)
763
764 def validate_rmq_ssl_enabled_units(self, sentry_units, port=None):
765 """Check that ssl is enabled on rmq juju sentry units.
766
767 :param sentry_units: list of all rmq sentry units
768 :param port: optional ssl port override to validate
769 :returns: None if successful, otherwise return error message
770 """
771 for sentry_unit in sentry_units:
772 if not self.rmq_ssl_is_enabled_on_unit(sentry_unit, port=port):
773 return ('Unexpected condition: ssl is disabled on unit '
774 '({})'.format(sentry_unit.info['unit_name']))
775 return None
776
777 def validate_rmq_ssl_disabled_units(self, sentry_units):
778 """Check that ssl is enabled on listed rmq juju sentry units.
779
780 :param sentry_units: list of all rmq sentry units
781 :returns: True if successful. Raise on error.
782 """
783 for sentry_unit in sentry_units:
784 if self.rmq_ssl_is_enabled_on_unit(sentry_unit):
785 return ('Unexpected condition: ssl is enabled on unit '
786 '({})'.format(sentry_unit.info['unit_name']))
787 return None
788
789 def configure_rmq_ssl_on(self, sentry_units, deployment,
790 port=None, max_wait=60):
791 """Turn ssl charm config option on, with optional non-default
792 ssl port specification. Confirm that it is enabled on every
793 unit.
794
795 :param sentry_units: list of sentry units
796 :param deployment: amulet deployment object pointer
797 :param port: amqp port, use defaults if None
798 :param max_wait: maximum time to wait in seconds to confirm
799 :returns: None if successful. Raise on error.
800 """
801 self.log.debug('Setting ssl charm config option: on')
802
803 # Enable RMQ SSL
804 config = {'ssl': 'on'}
805 if port:
806 config['ssl_port'] = port
807
808 deployment.configure('rabbitmq-server', config)
809
810 # Confirm
811 tries = 0
812 ret = self.validate_rmq_ssl_enabled_units(sentry_units, port=port)
813 while ret and tries < (max_wait / 4):
814 time.sleep(4)
815 self.log.debug('Attempt {}: {}'.format(tries, ret))
816 ret = self.validate_rmq_ssl_enabled_units(sentry_units, port=port)
817 tries += 1
818
819 if ret:
820 amulet.raise_status(amulet.FAIL, ret)
821
822 def configure_rmq_ssl_off(self, sentry_units, deployment, max_wait=60):
823 """Turn ssl charm config option off, confirm that it is disabled
824 on every unit.
825
826 :param sentry_units: list of sentry units
827 :param deployment: amulet deployment object pointer
828 :param max_wait: maximum time to wait in seconds to confirm
829 :returns: None if successful. Raise on error.
830 """
831 self.log.debug('Setting ssl charm config option: off')
832
833 # Disable RMQ SSL
834 config = {'ssl': 'off'}
835 deployment.configure('rabbitmq-server', config)
836
837 # Confirm
838 tries = 0
839 ret = self.validate_rmq_ssl_disabled_units(sentry_units)
840 while ret and tries < (max_wait / 4):
841 time.sleep(4)
842 self.log.debug('Attempt {}: {}'.format(tries, ret))
843 ret = self.validate_rmq_ssl_disabled_units(sentry_units)
844 tries += 1
845
846 if ret:
847 amulet.raise_status(amulet.FAIL, ret)
848
849 def connect_amqp_by_unit(self, sentry_unit, ssl=False,
850 port=None, fatal=True,
851 username="testuser1", password="changeme"):
852 """Establish and return a pika amqp connection to the rabbitmq service
853 running on a rmq juju unit.
854
855 :param sentry_unit: sentry unit pointer
856 :param ssl: boolean, default to False
857 :param port: amqp port, use defaults if None
858 :param fatal: boolean, default to True (raises on connect error)
859 :param username: amqp user name, default to testuser1
860 :param password: amqp user password
861 :returns: pika amqp connection pointer or None if failed and non-fatal
862 """
863 host = sentry_unit.info['public-address']
864 unit_name = sentry_unit.info['unit_name']
865
866 # Default port logic if port is not specified
867 if ssl and not port:
868 port = 5671
869 elif not ssl and not port:
870 port = 5672
871
872 self.log.debug('Connecting to amqp on {}:{} ({}) as '
873 '{}...'.format(host, port, unit_name, username))
874
875 try:
876 credentials = pika.PlainCredentials(username, password)
877 parameters = pika.ConnectionParameters(host=host, port=port,
878 credentials=credentials,
879 ssl=ssl,
880 connection_attempts=3,
881 retry_delay=5,
882 socket_timeout=1)
883 connection = pika.BlockingConnection(parameters)
884 assert connection.server_properties['product'] == 'RabbitMQ'
885 self.log.debug('Connect OK')
886 return connection
887 except Exception as e:
888 msg = ('amqp connection failed to {}:{} as '
889 '{} ({})'.format(host, port, username, str(e)))
890 if fatal:
891 amulet.raise_status(amulet.FAIL, msg)
892 else:
893 self.log.warn(msg)
894 return None
895
896 def publish_amqp_message_by_unit(self, sentry_unit, message,
897 queue="test", ssl=False,
898 username="testuser1",
899 password="changeme",
900 port=None):
901 """Publish an amqp message to a rmq juju unit.
902
903 :param sentry_unit: sentry unit pointer
904 :param message: amqp message string
905 :param queue: message queue, default to test
906 :param username: amqp user name, default to testuser1
907 :param password: amqp user password
908 :param ssl: boolean, default to False
909 :param port: amqp port, use defaults if None
910 :returns: None. Raises exception if publish failed.
911 """
912 self.log.debug('Publishing message to {} queue:\n{}'.format(queue,
913 message))
914 connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl,
915 port=port,
916 username=username,
917 password=password)
918
919 # NOTE(beisner): extra debug here re: pika hang potential:
920 # https://github.com/pika/pika/issues/297
921 # https://groups.google.com/forum/#!topic/rabbitmq-users/Ja0iyfF0Szw
922 self.log.debug('Defining channel...')
923 channel = connection.channel()
924 self.log.debug('Declaring queue...')
925 channel.queue_declare(queue=queue, auto_delete=False, durable=True)
926 self.log.debug('Publishing message...')
927 channel.basic_publish(exchange='', routing_key=queue, body=message)
928 self.log.debug('Closing channel...')
929 channel.close()
930 self.log.debug('Closing connection...')
931 connection.close()
932
933 def get_amqp_message_by_unit(self, sentry_unit, queue="test",
934 username="testuser1",
935 password="changeme",
936 ssl=False, port=None):
937 """Get an amqp message from a rmq juju unit.
938
939 :param sentry_unit: sentry unit pointer
940 :param queue: message queue, default to test
941 :param username: amqp user name, default to testuser1
942 :param password: amqp user password
943 :param ssl: boolean, default to False
944 :param port: amqp port, use defaults if None
945 :returns: amqp message body as string. Raise if get fails.
946 """
947 connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl,
948 port=port,
949 username=username,
950 password=password)
951 channel = connection.channel()
952 method_frame, _, body = channel.basic_get(queue)
953
954 if method_frame:
955 self.log.debug('Retreived message from {} queue:\n{}'.format(queue,
956 body))
957 channel.basic_ack(method_frame.delivery_tag)
958 channel.close()
959 connection.close()
960 return body
961 else:
962 msg = 'No message retrieved.'
963 amulet.raise_status(amulet.FAIL, msg)
605964
=== modified file 'unit_tests/test_ceph_broker.py'
--- unit_tests/test_ceph_broker.py 2014-11-09 12:58:04 +0000
+++ unit_tests/test_ceph_broker.py 2015-09-10 09:35:21 +0000
@@ -70,3 +70,30 @@
70 mock_pool_exists.assert_called_with(service='admin', name='foo')70 mock_pool_exists.assert_called_with(service='admin', name='foo')
71 self.assertFalse(mock_create_pool.called)71 self.assertFalse(mock_create_pool.called)
72 self.assertEqual(json.loads(rc), {'exit-code': 0})72 self.assertEqual(json.loads(rc), {'exit-code': 0})
73
74 @mock.patch('ceph_broker.create_pool')
75 @mock.patch('ceph_broker.pool_exists')
76 @mock.patch('ceph_broker.log')
77 def test_process_requests_create_pool_rid(self, mock_log, mock_pool_exists,
78 mock_create_pool):
79 mock_pool_exists.return_value = False
80 reqs = json.dumps({'api-version': 1,
81 'request-id': '1ef5aede',
82 'ops': [{'op': 'create-pool', 'name':
83 'foo', 'replicas': 3}]})
84 rc = ceph_broker.process_requests(reqs)
85 mock_pool_exists.assert_called_with(service='admin', name='foo')
86 mock_create_pool.assert_called_with(service='admin', name='foo',
87 replicas=3)
88 self.assertEqual(json.loads(rc)['exit-code'], 0)
89 self.assertEqual(json.loads(rc)['request-id'], '1ef5aede')
90
91 @mock.patch('ceph_broker.log')
92 def test_process_requests_invalid_api_rid(self, mock_log):
93 reqs = json.dumps({'api-version': 0, 'request-id': '1ef5aede',
94 'ops': [{'op': 'create-pool'}]})
95 rc = ceph_broker.process_requests(reqs)
96 self.assertEqual(json.loads(rc)['exit-code'], 1)
97 self.assertEqual(json.loads(rc)['stderr'],
98 "Missing or invalid api version (0)")
99 self.assertEqual(json.loads(rc)['request-id'], '1ef5aede')

Subscribers

People subscribed via source and target branches