Merge lp:~larry-e-works/uci-engine/amqp-to-kombu into lp:uci-engine

Proposed by Larry E Works
Status: Work in progress
Proposed branch: lp:~larry-e-works/uci-engine/amqp-to-kombu
Merge into: lp:uci-engine
Diff against target: 120 lines (+32/-17)
2 files modified
ci-utils/ci_utils/amqp_utils.py (+30/-16)
testing/venv.py (+2/-1)
To merge this branch: bzr merge lp:~larry-e-works/uci-engine/amqp-to-kombu
Reviewer | Review Type | Date Requested | Status
Canonical CI Engineering | | | Pending
Review via email: mp+236614@code.launchpad.net

Commit message

Changed ci-utils/ci_utils/amqp_utils.py to use kombu instead of amqplib. This has not been fully tested yet, because I get an error when attempting to run run-tests on a bootstack deployment. I will wipe that deployment out and try a local LXC deployment.

Description of the change

Merged from trunk to gather in all of the changes since work started on this; no conflicts.

Changed ci-utils/ci_utils/amqp_utils.py to use kombu instead of amqplib. This has not been fully tested yet, because I get an error when attempting to run run-tests on a bootstack deployment. I will wipe that deployment out and try a local LXC deployment.

770. By Larry E Works

Seems I missed a comma at the end of the password=rabbit_config.AMQP_PASSWORD line. Have added and will re-run.

771. By Larry E Works

Merged from trunk while I was at it; no conflicts.

Larry E Works (larry-e-works) wrote :

I was thinking about this earlier, along with the traceback I received when attempting to run the tests: the problem originates with a missing comma where the kombu connection is set up. I have fixed that and will try again.
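One way to make this kind of slip less likely is to build the broker URL in one place and hand kombu a single string. The sketch below is only an illustration under stated assumptions: `build_amqp_url` and `FakeConfig` are hypothetical names, the `AMQP_*` attributes are the ones visible in the diff, and it targets Python 3 and the plain `amqp://` scheme rather than the `librabbitmq://` transport used in this branch.

```python
from collections import namedtuple
from urllib.parse import quote


def build_amqp_url(config, scheme="amqp"):
    """Return a broker URL such as amqp://user:pass@host/vhost.

    Hypothetical helper: `config` only needs the AMQP_* attributes that
    rabbit_config exposes in the diff.
    """
    # Percent-encode each part so characters like '@' or '/' cannot
    # change the structure of the URL.
    user = quote(config.AMQP_USER, safe="")
    password = quote(config.AMQP_PASSWORD, safe="")
    vhost = quote(config.AMQP_VHOST, safe="")
    return "%s://%s:%s@%s/%s" % (
        scheme, user, password, config.AMQP_HOST, vhost)


# Stand-in for the charm-generated rabbit config (values are made up).
FakeConfig = namedtuple(
    "FakeConfig", "AMQP_USER AMQP_PASSWORD AMQP_HOST AMQP_VHOST")
example = FakeConfig("guest", "p@ss", "10.0.0.5", "uci")
print(build_amqp_url(example))
```

The resulting string could then be passed as the first argument to `kombu.Connection`, which accepts a broker URL, instead of mixing a partial URL with separate keyword arguments.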

772. By Larry E Works

Modified send() in amqp_utils.py to call kombu in the correct manner. This worked with the test in the sense that I can ssh into the rabbit server and see the test queue get created and populated. The rest of the test still fails, since the test itself (tests.test_rabbit.TestRabbit.test_acked_message) is still written to use amqplib functionality that does not directly translate to kombu functionality.

Modified venv.py to add the needed kombu dependencies for testing. The deps have been downloaded locally, added and pushed to lp:~canonical-ci-engineering/uci-engine/deps/

Unmerged revisions

772. By Larry E Works

Modified send() in amqp_utils.py to call kombu in the correct manner. This worked with the test in the sense that I can ssh into the rabbit server and see the test queue get created and populated. The rest of the test still fails, since the test itself (tests.test_rabbit.TestRabbit.test_acked_message) is still written to use amqplib functionality that does not directly translate to kombu functionality.

Modified venv.py to add the needed kombu dependencies for testing. The deps have been downloaded locally, added and pushed to lp:~canonical-ci-engineering/uci-engine/deps/

771. By Larry E Works

Merged from trunk while I was at it; no conflicts.

770. By Larry E Works

Seems I missed a comma at the end of the password=rabbit_config.AMQP_PASSWORD line. Have added and will re-run.

769. By Larry E Works

Massive merge from trunk, no conflicts.

768. By Larry E Works

Initial try at replacing amqp with kombu.

Preview Diff

=== modified file 'ci-utils/ci_utils/amqp_utils.py'
--- ci-utils/ci_utils/amqp_utils.py 2014-08-19 15:10:19 +0000
+++ ci-utils/ci_utils/amqp_utils.py 2014-10-02 20:36:00 +0000
@@ -19,7 +19,9 @@
 import socket
 import time

-from amqplib import client_0_8 as amqp
+##TOAD##from amqplib import client_0_8 as amqp
+from kombu import Connection, Producer, Exchange, Queue
+from kombu.mixins import ConsumerMixin

 BSBUILDER_QUEUE = 'bsbuilder'
 IMAGE_BUILDER_QUEUE = 'imagebuilder'
@@ -46,6 +48,17 @@
 HERE = os.path.abspath(os.path.dirname(__file__))


+class Worker(ConsumerMixin):
+
+    def __init__(self, connection, queue, callback):
+        self.connection = connection
+        self.queue = queue
+        return
+
+    def get_consumers(self, Consumer, channel):
+        return [Consumer(self.queue, callbacks=callback)]
+
+
 def get_config():
     '''Load the rabbit config created by the charm'''
     config = None
@@ -65,11 +78,12 @@


 def connection(rabbit_config):
-    return amqp.Connection(
+    return Connection(
+        'librabbitmq://' + rabbit_config.AMQP_HOST + '//',
         userid=rabbit_config.AMQP_USER,
+        password=rabbit_config.AMQP_PASSWORD,
         virtual_host=rabbit_config.AMQP_VHOST,
-        host=rabbit_config.AMQP_HOST,
-        password=rabbit_config.AMQP_PASSWORD
+        failover_strategy='round-robin'
     )


@@ -79,12 +93,12 @@
     conn = channel = None
     try:
         conn = connection(config)
-        channel = conn.channel()
-        channel.queue_declare(queue=queue_name, durable=True,
-                              auto_delete=False)
+        conn.ensure_connection()
+        print " Connected: " + str(conn.connection)
+        queue = Queue(queue_name, durable=True, auto_delete=False)
+        channel = queue(conn)
+        channel.declare()
     except Exception as e:
-        if channel:
-            channel.close()
         if conn:
             conn.close()
         raise e
@@ -104,17 +118,17 @@
     conn = channel = None
     try:
         conn, channel = declare_queue(queue_name, config)
-        body = amqp.Message(msg)
-        body.properties['delivery_mode'] = 2 # Persistent
-        channel.basic_publish(body, exchange='', routing_key=queue_name)
+        producer = Producer(conn)
+        producer.publish(msg, routing_key=queue_name, delivery_mode=2)
+##TOAD## body = amqp.Message(msg)
+##TOAD## body.properties['delivery_mode'] = 2 # Persistent
+##TOAD## channel.basic_publish(body, exchange='', routing_key=queue_name)
     except Exception as e:
         if raise_errors:
             raise
         logging.exception('unable to queue up message: %s', e)
         return str(e)
     finally:
-        if channel:
-            channel.close()
         if conn:
             conn.close()
     return None
@@ -122,14 +136,14 @@

 def _run_forever(channel, queue, callback, retry_period=120):
     logging.info('Waiting for messages. ^C to exit.')
-    tag = channel.basic_consume(callback=callback, queue=queue)
+    tag = channel.ConsumerMixin.Consumer(queue, callbacks=callback)
     try:
         timeout = time.time()
         while channel.callbacks and time.time() < timeout + retry_period:
             try:
                 channel.wait()
                 timeout = time.time()
-            except (amqp.AMQPConnectionException, socket.error):
+            except (kombu.ConnectionError, socket.error):
                 logging.error('lost connection to Rabbit')
                 # TODO metrics.meter('lost_rabbit_connection')
                 # Don't probe immediately, give the network/process

=== modified file 'testing/venv.py'
--- testing/venv.py 2014-09-04 17:02:00 +0000
+++ testing/venv.py 2014-10-02 20:36:00 +0000
@@ -107,7 +107,8 @@

 def additional_deps():
     '''Install addtional dependencies.'''
-    additional_deps = ['cheetah', 'python-apt', 'jinja2']
+    additional_deps = ['cheetah', 'python-apt', 'jinja2', 'kombu', 'amqp',
+                       'librabbitmq', 'anyjson']
     for dep in additional_deps:
         cache_file = 'file://%s' % os.environ['PIP_CACHE']
         cmd = ['pip', 'install', '--no-index', '-f', cache_file, dep]
