Merge lp:~larry-e-works/uci-engine/amqp-to-kombu into lp:uci-engine

Proposed by Larry E Works
Status: Work in progress
Proposed branch: lp:~larry-e-works/uci-engine/amqp-to-kombu
Merge into: lp:uci-engine
Diff against target: 120 lines (+32/-17)
2 files modified
ci-utils/ci_utils/amqp_utils.py (+30/-16)
testing/venv.py (+2/-1)
To merge this branch: bzr merge lp:~larry-e-works/uci-engine/amqp-to-kombu
Reviewer Review Type Date Requested Status
Canonical CI Engineering Pending
Review via email: mp+236614@code.launchpad.net

Commit message

Changed the bits in ci-utils/ci_utils/amqp_utils.py to use kombu instead of libamqp; has not been fully tested yet as I receive an error when attempting to run run-tests on a bootstack deployment. Will wipe that out and try a local LXC deployment.

Description of the change

Merged from trunk to gather in all of the changes since work started on this; no conflicts.

Changed the bits in ci-utils/ci_utils/amqp_utils.py to use kombu instead of libamqp; has not been fully tested yet as I receive an error when attempting to run run-tests on a bootstack deployment. Will wipe that out and try a local LXC deployment.

To post a comment you must log in.
770. By Larry E Works

Seems I missed a comma at the end of the password=rabbit_config.AMQP_PASSWORD line. Have added and will re-run.

771. By Larry E Works

Merged from trunk while I was at it; no conflicts.

Revision history for this message
Larry E Works (larry-e-works) wrote :

Was thinking about this earlier (and the traceback received when attempting to run the tests) and the problem originates with with a missing comma in setting the kombu connection string. Have fixed that and will try again.

772. By Larry E Works

Modified send() in amqp_utils.pu to call kombu in the correct manner. This worked with the test in the sense that I can ssh into the rabbit server and see the test queue get created and populated. The rest of the test still fails since the test itself (tests.test_rabbit.TestRabbit.test_acked_message) is still written to use libamqp functionality that doesn not directly translate to kombu functionality.

Modified venv.py to add the needed kombu dependencies for testing. The deps have been downloaded locally, added and pushed to lp:~canonical-ci-engineering/uci-engine/deps/

Unmerged revisions

772. By Larry E Works

Modified send() in amqp_utils.pu to call kombu in the correct manner. This worked with the test in the sense that I can ssh into the rabbit server and see the test queue get created and populated. The rest of the test still fails since the test itself (tests.test_rabbit.TestRabbit.test_acked_message) is still written to use libamqp functionality that doesn not directly translate to kombu functionality.

Modified venv.py to add the needed kombu dependencies for testing. The deps have been downloaded locally, added and pushed to lp:~canonical-ci-engineering/uci-engine/deps/

771. By Larry E Works

Merged from trunk while I was at it; no conflicts.

770. By Larry E Works

Seems I missed a comma at the end of the password=rabbit_config.AMQP_PASSWORD line. Have added and will re-run.

769. By Larry E Works

Massive merge from trunk, no conflicts.

768. By Larry E Works

Initial try at replacing amqp with kombu.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'ci-utils/ci_utils/amqp_utils.py'
--- ci-utils/ci_utils/amqp_utils.py 2014-08-19 15:10:19 +0000
+++ ci-utils/ci_utils/amqp_utils.py 2014-10-02 20:36:00 +0000
@@ -19,7 +19,9 @@
19import socket19import socket
20import time20import time
2121
22from amqplib import client_0_8 as amqp22##TOAD##from amqplib import client_0_8 as amqp
23from kombu import Connection, Producer, Exchange, Queue
24from kombu.mixins import ConsumerMixin
2325
24BSBUILDER_QUEUE = 'bsbuilder'26BSBUILDER_QUEUE = 'bsbuilder'
25IMAGE_BUILDER_QUEUE = 'imagebuilder'27IMAGE_BUILDER_QUEUE = 'imagebuilder'
@@ -46,6 +48,17 @@
46HERE = os.path.abspath(os.path.dirname(__file__))48HERE = os.path.abspath(os.path.dirname(__file__))
4749
4850
51class Worker(ConsumerMixin):
52
53 def __init__(self, connection, queue, callback):
54 self.connection = connection
55 self.queue = queue
56 return
57
58 def get_consumers(self, Consumer, channel):
59 return [Consumer(self.queue, callbacks=callback)]
60
61
49def get_config():62def get_config():
50 '''Load the rabbit config created by the charm'''63 '''Load the rabbit config created by the charm'''
51 config = None64 config = None
@@ -65,11 +78,12 @@
6578
6679
67def connection(rabbit_config):80def connection(rabbit_config):
68 return amqp.Connection(81 return Connection(
82 'librabbitmq://' + rabbit_config.AMQP_HOST + '//',
69 userid=rabbit_config.AMQP_USER,83 userid=rabbit_config.AMQP_USER,
84 password=rabbit_config.AMQP_PASSWORD,
70 virtual_host=rabbit_config.AMQP_VHOST,85 virtual_host=rabbit_config.AMQP_VHOST,
71 host=rabbit_config.AMQP_HOST,86 failover_strategy='round-robin'
72 password=rabbit_config.AMQP_PASSWORD
73 )87 )
7488
7589
@@ -79,12 +93,12 @@
79 conn = channel = None93 conn = channel = None
80 try:94 try:
81 conn = connection(config)95 conn = connection(config)
82 channel = conn.channel()96 conn.ensure_connection()
83 channel.queue_declare(queue=queue_name, durable=True,97 print " Connected: " + str(conn.connection)
84 auto_delete=False)98 queue = Queue(queue_name, durable=True, auto_delete=False)
99 channel = queue(conn)
100 channel.declare()
85 except Exception as e:101 except Exception as e:
86 if channel:
87 channel.close()
88 if conn:102 if conn:
89 conn.close()103 conn.close()
90 raise e104 raise e
@@ -104,17 +118,17 @@
104 conn = channel = None118 conn = channel = None
105 try:119 try:
106 conn, channel = declare_queue(queue_name, config)120 conn, channel = declare_queue(queue_name, config)
107 body = amqp.Message(msg)121 producer = Producer(conn)
108 body.properties['delivery_mode'] = 2 # Persistent122 producer.publish(msg, routing_key=queue_name, delivery_mode=2)
109 channel.basic_publish(body, exchange='', routing_key=queue_name)123##TOAD## body = amqp.Message(msg)
124##TOAD## body.properties['delivery_mode'] = 2 # Persistent
125##TOAD## channel.basic_publish(body, exchange='', routing_key=queue_name)
110 except Exception as e:126 except Exception as e:
111 if raise_errors:127 if raise_errors:
112 raise128 raise
113 logging.exception('unable to queue up message: %s', e)129 logging.exception('unable to queue up message: %s', e)
114 return str(e)130 return str(e)
115 finally:131 finally:
116 if channel:
117 channel.close()
118 if conn:132 if conn:
119 conn.close()133 conn.close()
120 return None134 return None
@@ -122,14 +136,14 @@
122136
123def _run_forever(channel, queue, callback, retry_period=120):137def _run_forever(channel, queue, callback, retry_period=120):
124 logging.info('Waiting for messages. ^C to exit.')138 logging.info('Waiting for messages. ^C to exit.')
125 tag = channel.basic_consume(callback=callback, queue=queue)139 tag = channel.ConsumerMixin.Consumer(queue, callbacks=callback)
126 try:140 try:
127 timeout = time.time()141 timeout = time.time()
128 while channel.callbacks and time.time() < timeout + retry_period:142 while channel.callbacks and time.time() < timeout + retry_period:
129 try:143 try:
130 channel.wait()144 channel.wait()
131 timeout = time.time()145 timeout = time.time()
132 except (amqp.AMQPConnectionException, socket.error):146 except (kombu.ConnectionError, socket.error):
133 logging.error('lost connection to Rabbit')147 logging.error('lost connection to Rabbit')
134 # TODO metrics.meter('lost_rabbit_connection')148 # TODO metrics.meter('lost_rabbit_connection')
135 # Don't probe immediately, give the network/process149 # Don't probe immediately, give the network/process
136150
=== modified file 'testing/venv.py'
--- testing/venv.py 2014-09-04 17:02:00 +0000
+++ testing/venv.py 2014-10-02 20:36:00 +0000
@@ -107,7 +107,8 @@
107107
108def additional_deps():108def additional_deps():
109 '''Install addtional dependencies.'''109 '''Install addtional dependencies.'''
110 additional_deps = ['cheetah', 'python-apt', 'jinja2']110 additional_deps = ['cheetah', 'python-apt', 'jinja2', 'kombu', 'amqp',
111 'librabbitmq', 'anyjson']
111 for dep in additional_deps:112 for dep in additional_deps:
112 cache_file = 'file://%s' % os.environ['PIP_CACHE']113 cache_file = 'file://%s' % os.environ['PIP_CACHE']
113 cmd = ['pip', 'install', '--no-index', '-f', cache_file, dep]114 cmd = ['pip', 'install', '--no-index', '-f', cache_file, dep]

Subscribers

People subscribed via source and target branches