Merge lp:~thomir-deactivatedaccount/core-image-publisher/trunk-add-queue-listener into lp:core-image-publisher

Proposed by Thomi Richards
Status: Merged
Approved by: Thomi Richards
Approved revision: 3
Merged at revision: 4
Proposed branch: lp:~thomir-deactivatedaccount/core-image-publisher/trunk-add-queue-listener
Merge into: lp:core-image-publisher
Diff against target: 269 lines (+211/-3)
8 files modified
core_image_publisher/__init__.py (+16/-1)
core_image_publisher/constants.py (+20/-0)
core_image_publisher/queue.py (+78/-0)
core_image_publisher/tests/__init__.py (+16/-0)
core_image_publisher/tests/test_queue_integration.py (+76/-0)
requirements.txt (+1/-0)
setup.py (+3/-2)
test_requirements.txt (+1/-0)
To merge this branch: bzr merge lp:~thomir-deactivatedaccount/core-image-publisher/trunk-add-queue-listener
Reviewer Review Type Date Requested Status
Celso Providelo (community) Approve
Review via email: mp+254159@code.launchpad.net

Commit message

Add code to read from amqp.

Description of the change

Add code to connect to, and read from amqp. A few changes from last sprint, in order to support testability.

To post a comment you must log in.
Revision history for this message
Celso Providelo (cprov) wrote :

Thomi,

I suppose we can use the on_message callback pattern for consumers, It kind of adds some additional indirection levels but makes it testable.

Let's got with it till the completion of the publisher, then we can evaluate if it's good to be propagated to the other services.

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'core_image_publisher/__init__.py'
2--- core_image_publisher/__init__.py 2015-03-25 02:46:02 +0000
3+++ core_image_publisher/__init__.py 2015-03-25 21:37:19 +0000
4@@ -1,4 +1,19 @@
5-
6+# core-image-publisher
7+# Copyright (C) 2015 Canonical
8+#
9+# This program is free software: you can redistribute it and/or modify
10+# it under the terms of the GNU General Public License as published by
11+# the Free Software Foundation, either version 3 of the License, or
12+# (at your option) any later version.
13+#
14+# This program is distributed in the hope that it will be useful,
15+# but WITHOUT ANY WARRANTY; without even the implied warranty of
16+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17+# GNU General Public License for more details.
18+#
19+# You should have received a copy of the GNU General Public License
20+# along with this program. If not, see <http://www.gnu.org/licenses/>.
21+#
22
23 import select
24
25
26=== added file 'core_image_publisher/constants.py'
27--- core_image_publisher/constants.py 1970-01-01 00:00:00 +0000
28+++ core_image_publisher/constants.py 2015-03-25 21:37:19 +0000
29@@ -0,0 +1,20 @@
30+# core-image-publisher
31+# Copyright (C) 2015 Canonical
32+#
33+# This program is free software: you can redistribute it and/or modify
34+# it under the terms of the GNU General Public License as published by
35+# the Free Software Foundation, either version 3 of the License, or
36+# (at your option) any later version.
37+#
38+# This program is distributed in the hope that it will be useful,
39+# but WITHOUT ANY WARRANTY; without even the implied warranty of
40+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
41+# GNU General Public License for more details.
42+#
43+# You should have received a copy of the GNU General Public License
44+# along with this program. If not, see <http://www.gnu.org/licenses/>.
45+#
46+
47+"""Constants for this service."""
48+
49+API_VERSION="v1"
50
51=== added file 'core_image_publisher/queue.py'
52--- core_image_publisher/queue.py 1970-01-01 00:00:00 +0000
53+++ core_image_publisher/queue.py 2015-03-25 21:37:19 +0000
54@@ -0,0 +1,78 @@
55+# core-image-publisher
56+# Copyright (C) 2015 Canonical
57+#
58+# This program is free software: you can redistribute it and/or modify
59+# it under the terms of the GNU General Public License as published by
60+# the Free Software Foundation, either version 3 of the License, or
61+# (at your option) any later version.
62+#
63+# This program is distributed in the hope that it will be useful,
64+# but WITHOUT ANY WARRANTY; without even the implied warranty of
65+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
66+# GNU General Public License for more details.
67+#
68+# You should have received a copy of the GNU General Public License
69+# along with this program. If not, see <http://www.gnu.org/licenses/>.
70+#
71+
72+import logging
73+
74+import kombu
75+from kombu.mixins import ConsumerMixin
76+
77+from core_image_publisher import constants
78+
79+logger = logging.getLogger(__name__)
80+
81+__all__ = [
82+ 'CoreImagePublisherQueueMonitor'
83+]
84+
85+
86+class CoreImagePublisherQueueMonitor(ConsumerMixin):
87+
88+ """A class that watches for incoming messages from a queue, and calls
89+ a callback to process them.
90+
91+ By using an external callback we can separate the stateful logic of
92+ connecting to amqp from our business logic of dealing with the messages.
93+
94+ """
95+
96+ def __init__(self, connection, on_message_cb):
97+ """Construct a new CoreImagePublisherQueueMonitor.
98+
99+ :param connection: A kombu.Connection instance.
100+ :param on_message_db: A callable that will be called for every new
101+ messages from the rabbitMQ queue(s). It must accept a single
102+ parameter: the message object. The callback MUST either acknowledge
103+ or reject or requeue the messsage, and MUST NOT raise any unhandled
104+ exceptions.
105+
106+ """
107+ self.connection = connection
108+ self.on_message_cb = on_message_cb
109+
110+ def get_consumers(self, Consumer, channel):
111+ """Return consumers instances for all configured queues."""
112+ queues = [
113+ kombu.Queue('core.image.{}'.format(constants.API_VERSION))
114+ ]
115+ return [Consumer(queues=queues, callbacks=[self.process])]
116+
117+ def process(self, body, message):
118+ """Process incomming test request.
119+
120+ Run requested tests and posts results to the 'adt_results' queue
121+ for later checking.
122+ """
123+ logger.info('Got: {}'.format(body), extra=body)
124+ try:
125+ self.on_message_cb(message)
126+ except Exception as e:
127+ logger.error(
128+ "Caught unhandled exception while processing message: %s",
129+ e,
130+ extra=message.payload
131+ )
132+ message.requeue()
133
134=== added directory 'core_image_publisher/tests'
135=== added file 'core_image_publisher/tests/__init__.py'
136--- core_image_publisher/tests/__init__.py 1970-01-01 00:00:00 +0000
137+++ core_image_publisher/tests/__init__.py 2015-03-25 21:37:19 +0000
138@@ -0,0 +1,16 @@
139+# core-image-publisher
140+# Copyright (C) 2015 Canonical
141+#
142+# This program is free software: you can redistribute it and/or modify
143+# it under the terms of the GNU General Public License as published by
144+# the Free Software Foundation, either version 3 of the License, or
145+# (at your option) any later version.
146+#
147+# This program is distributed in the hope that it will be useful,
148+# but WITHOUT ANY WARRANTY; without even the implied warranty of
149+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
150+# GNU General Public License for more details.
151+#
152+# You should have received a copy of the GNU General Public License
153+# along with this program. If not, see <http://www.gnu.org/licenses/>.
154+#
155
156=== added file 'core_image_publisher/tests/test_queue_integration.py'
157--- core_image_publisher/tests/test_queue_integration.py 1970-01-01 00:00:00 +0000
158+++ core_image_publisher/tests/test_queue_integration.py 2015-03-25 21:37:19 +0000
159@@ -0,0 +1,76 @@
160+# core-image-publisher
161+# Copyright (C) 2015 Canonical
162+#
163+# This program is free software: you can redistribute it and/or modify
164+# it under the terms of the GNU General Public License as published by
165+# the Free Software Foundation, either version 3 of the License, or
166+# (at your option) any later version.
167+#
168+# This program is distributed in the hope that it will be useful,
169+# but WITHOUT ANY WARRANTY; without even the implied warranty of
170+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
171+# GNU General Public License for more details.
172+#
173+# You should have received a copy of the GNU General Public License
174+# along with this program. If not, see <http://www.gnu.org/licenses/>.
175+#
176+
177+"""Integration tests for the kombu queue code."""
178+
179+import kombu
180+import kombu.simple
181+from testtools import TestCase
182+
183+
184+from core_image_publisher.queue import CoreImagePublisherQueueMonitor
185+
186+
187+class KombuQueueIntegrationTests(TestCase):
188+
189+ def test_can_read_message(self):
190+ conn = kombu.Connection('memory:///')
191+ queue_message(conn, {'test': 'value'})
192+
193+ consumer = LoggingConsumer()
194+ q = CoreImagePublisherQueueMonitor(conn, consumer)
195+ # pump the queue to get the enqueued message:
196+ next(q.consume(limit=1, timeout=5.0))
197+
198+ self.assertEqual(consumer.got_messages, [dict(test='value')])
199+
200+ def test_uncaught_exceptions_cause_message_requeue(self):
201+ conn = kombu.Connection('memory:///')
202+ queue_message(conn, {'test': 'value'})
203+
204+ def consumer_with_bug(message):
205+ raise RuntimeError("I am a bug, all up in ur bizniz!")
206+
207+ logging_consumer = LoggingConsumer()
208+
209+ queue_bugged = CoreImagePublisherQueueMonitor(conn, consumer_with_bug)
210+ queue_ok = CoreImagePublisherQueueMonitor(conn, logging_consumer)
211+
212+ # pump the queue to get the enqueued message:
213+ next(queue_bugged.consume(limit=1, timeout=5.0))
214+ next(queue_ok.consume(limit=1, timeout=5.0))
215+
216+ self.assertEqual(logging_consumer.got_messages, [dict(test='value')])
217+
218+
219+
220+class LoggingConsumer(object):
221+
222+ """A consumer callback object that acks and logs all received payloads."""
223+
224+ def __init__(self):
225+ self.got_messages = []
226+
227+ def __call__(self, message):
228+ self.got_messages.append(message.payload)
229+ message.ack()
230+
231+
232+def queue_message(conn, message):
233+ q = kombu.simple.SimpleQueue(conn, 'core.image.v1')
234+ q.put(message)
235+
236
237=== modified file 'requirements.txt'
238--- requirements.txt 2015-03-25 02:46:02 +0000
239+++ requirements.txt 2015-03-25 21:37:19 +0000
240@@ -0,0 +1,1 @@
241+kombu==3.0.24
242\ No newline at end of file
243
244=== modified file 'setup.py'
245--- setup.py 2015-03-25 04:19:49 +0000
246+++ setup.py 2015-03-25 21:37:19 +0000
247@@ -1,6 +1,6 @@
248 #!/usr/bin/env python3
249
250-# core-image-watcher
251+# core-image-publisher
252 # Copyright (C) 2015 Canonical
253 #
254 # This program is free software: you can redistribute it and/or modify
255@@ -36,5 +36,6 @@
256 url='https://launchpad.net/core-image-publisher',
257 license='GPLv3',
258 packages=find_packages(),
259- scripts=['core-image-publisher.py']
260+ scripts=['core-image-publisher.py'],
261+ test_suite='core_image_publisher.tests'
262 )
263
264=== modified file 'test_requirements.txt'
265--- test_requirements.txt 2015-03-25 02:46:02 +0000
266+++ test_requirements.txt 2015-03-25 21:37:19 +0000
267@@ -0,0 +1,1 @@
268+testtools==1.7.1
269\ No newline at end of file

Subscribers

People subscribed via source and target branches

to all changes: