Merge lp:~thomir-deactivatedaccount/uservice-utils/trunk-add-queue into lp:uservice-utils

Proposed by Thomi Richards
Status: Merged
Approved by: Joe Talbott
Approved revision: 10
Merged at revision: 9
Proposed branch: lp:~thomir-deactivatedaccount/uservice-utils/trunk-add-queue
Merge into: lp:uservice-utils
Diff against target: 434 lines (+393/-0)
5 files modified
CHANGELOG (+6/-0)
README.rst (+6/-0)
setup.py (+1/-0)
uservice_utils/queue.py (+195/-0)
uservice_utils/tests/test_queue.py (+185/-0)
To merge this branch: bzr merge lp:~thomir-deactivatedaccount/uservice-utils/trunk-add-queue
Reviewer Review Type Date Requested Status
Joe Talbott (community) Approve
Review via email: mp+255044@code.launchpad.net

Commit message

Add the queue module, containing kombu abstractions.

Description of the change

Move the queue code, with a few new improvements, to uservice_utils.

To post a comment you must log in.
Revision history for this message
Joe Talbott (joetalbott) wrote :

Some in-line nits. Otherwise nice work!

review: Approve
10. By Thomi Richards

Fix code from merge proposal.

Revision history for this message
Joe Talbott (joetalbott) :

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'CHANGELOG'
2--- CHANGELOG 2015-04-02 04:07:12 +0000
3+++ CHANGELOG 2015-04-02 19:29:13 +0000
4@@ -1,6 +1,12 @@
5 Release History:
6 ################
7
8+NEXT:
9+=====
10+
11+* Added the `uservice_utils.queue` module, which contains several useful ways
12+ to interact with kombu.
13+
14 1.0.1
15 =====
16
17
18=== modified file 'README.rst'
19--- README.rst 2015-04-01 02:42:16 +0000
20+++ README.rst 2015-04-02 19:29:13 +0000
21@@ -23,5 +23,11 @@
22
23 $ python setup.py test
24
25+Dependencies:
26+=============
27
28+This library contains many different parts, and we don't want to force users to
29+install all the dependencies for all the parts in order to use any one piece.
30+For that reason, setup.py does not list any install-time dependencies, and users
31+of this library must ensure they have the required dependencies configured.
32
33
34=== modified file 'setup.py'
35--- setup.py 2015-04-02 02:43:04 +0000
36+++ setup.py 2015-04-02 19:29:13 +0000
37@@ -51,6 +51,7 @@
38 tests_require=[
39 'fixtures',
40 'testtools',
41+ 'kombu',
42 ],
43 classifiers=[
44 'Development Status :: 5 - Production/Stable',
45
46=== added file 'uservice_utils/queue.py'
47--- uservice_utils/queue.py 1970-01-01 00:00:00 +0000
48+++ uservice_utils/queue.py 2015-04-02 19:29:13 +0000
49@@ -0,0 +1,195 @@
50+# uservice-utils
51+# Copyright (C) 2015 Canonical
52+#
53+# This program is free software: you can redistribute it and/or modify
54+# it under the terms of the GNU General Public License as published by
55+# the Free Software Foundation, either version 3 of the License, or
56+# (at your option) any later version.
57+#
58+# This program is distributed in the hope that it will be useful,
59+# but WITHOUT ANY WARRANTY; without even the implied warranty of
60+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
61+# GNU General Public License for more details.
62+#
63+# You should have received a copy of the GNU General Public License
64+# along with this program. If not, see <http://www.gnu.org/licenses/>.
65+#
66+
67+"""Code to abstract away rabbit message queues."""
68+
69+import enum
70+import logging
71+from pdb import bdb
72+
73+import kombu
74+from kombu.mixins import ConsumerMixin
75+
76+logger = logging.getLogger(__name__)
77+
78+__all__ = [
79+ 'MessageActions',
80+ 'SimpleRabbitQueueWorker',
81+ 'RetryPolicy',
82+ 'DefaultRetryPolicy',
83+]
84+
85+
86+class MessageActions(enum.Enum):
87+
88+ """Actions that the message processing callback can request.
89+
90+ These enum values can be returned from the message processing callback,
91+ and the SimpleRabbitQueueWorker will take appropriate action.
92+
93+ """
94+
95+ # The message needs to be retried. This might cause the message to be
96+ # retried, or it might reject the message if its retry count has been
97+ # exceeded.
98+ Retry = 1
99+
100+ # The message was processed properly, and should be acknowledged.
101+ Acknowledge = 2
102+
103+
104+class SimpleRabbitQueueWorker(ConsumerMixin):
105+
106+ """A class that watches for incoming messages from a single queue, and calls
107+ a callback to process them.
108+
109+ Note that this class does not deal with rabbitMQ exchanges - it only works
110+ with single, direct queues.
111+
112+ """
113+
114+ def __init__(self, connection, queue, on_message_cb, retry_policy):
115+ """Construct a new SimpleRabbitQueueWorker.
116+
117+ The connection must be a kombu.Connection instance, but otherwise may
118+ be constructed to point at any backend (including a memory backend).
119+
120+ The queue should be a single string, pointing to the queue you wish to
121+ monitor.
122+
123+ The callback can be any callable, including a class with __call__
124+ defined. The callable must accept a single parameter, which will be
125+ the decoded message payload. The callable must also return one of the
126+ MessageActions attributes, and must not raise any uncaught exceptions.
127+
128+ The retry policy should be an instance of RetryPolicy, or one of its
129+ subclasses. The 'DefaultRetryPolicy' is a sensible policy that's been
130+ created for you already.
131+
132+ :param connection: A kombu.Connection instance.
133+ :param queue: The name of the rabbitMQ queue to monitor.
134+ :param on_message_cb: A callable that will be called for every new
135+ message from the rabbitMQ queue.
136+ :param retry_policy: An instance of RetryPolicy, or one of its subclasses.
137+
138+ """
139+ self.connection = connection
140+ self._queue = queue
141+ self.on_message_cb = on_message_cb
142+ self.retry_policy = retry_policy
143+
144+ def get_consumers(self, Consumer, channel):
145+ """Return consumer instances for the configured queue."""
146+ queues = [ kombu.Queue(self._queue) ]
147+ return [Consumer(queues=queues, callbacks=[self.process])]
148+
149+ def process(self, body, message):
150+ """Handle incoming message.
151+
152+ We hand off processing to the worker callback, and manage calling the
153+ retry policy or ack()ing the message depending on the returned value
154+ from the worker.
155+
156+ """
157+ try:
158+ requested_action = self.on_message_cb(body)
159+ if requested_action == MessageActions.Retry:
160+ # delegate to the retry policy:
161+ self.retry_policy.retry(self.connection, message)
162+ elif requested_action == MessageActions.Acknowledge:
163+ message.ack()
164+ elif requested_action is None:
165+ # treat this as a silent ack, but maybe we should change this
166+ # to print a warning in the future?
167+ message.ack()
168+ # Make it possible to use a debugger inside the worker callback:
169+ except bdb.BdbQuit:
170+ raise
171+ except Exception as e:
172+ logger.error(
173+ "Caught unhandled exception while processing message: %s",
174+ e,
175+ )
176+ self.retry_policy.retry(self.connection, message)
177+
178+
179+class RetryPolicy(object):
180+
181+ """Encapsulate a retry policy.
182+
183+ This is a base class for all retry policies. A retry policy dictates what
184+ happens to a message when the consumer callback returns
185+ MessageActions.Retry.
186+
187+ The work is done in the 'retry' method. The policy can do anything it
188+ likes with the message, including:
189+
190+ * ack()ing the message and putting a new message on another queue.
191+ * reject()ing or requeue()ing the message,
192+ * ...or anything else.
193+
194+ The only requirement is that the policy *must* handle the message given
195+ to it. No further processing will be done on the message, so one of
196+ ack(), reject() or requeue() must be called.
197+
198+ """
199+
200+ def retry(self, connection, message):
201+ """Determine what to do with 'message'."""
202+ pass
203+
204+
205+class DefaultRetryPolicy(RetryPolicy):
206+
207+ """A sensible 'N-strikes' retry policy.
208+
209+ This policy will retry a message a certain number of times by re-inserting
210+ the message into the last queue it was in. When its retry limit has been
211+ exceeded, the message will instead be inserted into a dead-letter queue.
212+
213+ """
214+
215+ def __init__(self, max_retries, dead_queue):
216+ """Create a new DefaultRetryPolicy.
217+
218+ 'max_retries' specifies the number of times a message will be retried.
219+ 'dead_queue' specifies the name of the dead-letter queue to use.
220+
221+ """
222+ self.max_retries = max_retries
223+ self.dead_queue = dead_queue
224+
225+ def retry(self, connection, message):
226+ last_queue_name = message.delivery_info['routing_key']
227+ retry_key = "{}_retry_count".format(last_queue_name)
228+ retry_count = int(message.payload.get(retry_key, '0'))
229+
230+ if retry_count < self.max_retries:
231+ message.payload[retry_key] = retry_count + 1
232+ queue = connection.SimpleQueue(last_queue_name)
233+ queue.put(message.payload)
234+ queue.close()
235+ message.ack()
236+ else:
237+ logger.error(
238+ "Rejecting message due to retry count exceeding maximum. "
239+ "Message will reside on the dead letter queue",
240+ )
241+ queue = connection.SimpleQueue(self.dead_queue)
242+ queue.put(message.payload)
243+ queue.close()
244+ message.ack()
245
246=== added file 'uservice_utils/tests/test_queue.py'
247--- uservice_utils/tests/test_queue.py 1970-01-01 00:00:00 +0000
248+++ uservice_utils/tests/test_queue.py 2015-04-02 19:29:13 +0000
249@@ -0,0 +1,185 @@
250+# uservice-utils
251+# Copyright (C) 2015 Canonical
252+#
253+# This program is free software: you can redistribute it and/or modify
254+# it under the terms of the GNU General Public License as published by
255+# the Free Software Foundation, either version 3 of the License, or
256+# (at your option) any later version.
257+#
258+# This program is distributed in the hope that it will be useful,
259+# but WITHOUT ANY WARRANTY; without even the implied warranty of
260+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
261+# GNU General Public License for more details.
262+#
263+# You should have received a copy of the GNU General Public License
264+# along with this program. If not, see <http://www.gnu.org/licenses/>.
265+#
266+
267+"""Integration tests for the kombu queue code."""
268+
269+import kombu
270+import kombu.simple
271+from testtools import TestCase
272+
273+from testtools.matchers import (
274+ Contains,
275+ Equals,
276+)
277+
278+
279+from uservice_utils.queue import (
280+ DefaultRetryPolicy,
281+ MessageActions,
282+ SimpleRabbitQueueWorker,
283+)
284+
285+
286+class KombuQueueIntegrationTests(TestCase):
287+
288+ def test_can_read_and_accept_message(self):
289+ conn = kombu.Connection('memory:///')
290+ queue_name = self.getUniqueString()
291+ queue_message(conn, queue_name, {'test': 'value'})
292+ retry_policy = LoggingRetryPolicy()
293+
294+ consumer = LoggingConsumer(MessageActions.Acknowledge)
295+ q = SimpleRabbitQueueWorker(
296+ conn,
297+ queue_name,
298+ consumer,
299+ retry_policy,
300+ )
301+ # pump the queue to get the enqueued message:
302+ next(q.consume(limit=1, timeout=5.0))
303+
304+ self.assertEqual(consumer.got_messages, [dict(test='value')])
305+ self.assertEqual(retry_policy.payloads_retried, [])
306+
307+ def test_uncaught_exceptions_cause_message_retry(self):
308+ conn = kombu.Connection('memory:///')
309+ queue_name = self.getUniqueString()
310+ message = {'test': 'value'}
311+ queue_message(conn, queue_name, message)
312+ retry_policy = LoggingRetryPolicy()
313+
314+ def consumer_with_bug(message):
315+ raise RuntimeError("I am a bug, all up in ur bizniz!")
316+ queue = SimpleRabbitQueueWorker(
317+ conn,
318+ queue_name,
319+ consumer_with_bug,
320+ retry_policy
321+ )
322+ next(queue.consume(limit=1, timeout=5.0))
323+
324+ self.assertEqual(retry_policy.payloads_retried, [message])
325+
326+ def test_retry_message_works(self):
327+ conn = kombu.Connection('memory:///')
328+ queue_name = self.getUniqueString()
329+ message = {'test': 'value'}
330+ queue_message(conn, queue_name, message)
331+ retry_policy = LoggingRetryPolicy()
332+
333+ consumer = LoggingConsumer(MessageActions.Retry)
334+ q = SimpleRabbitQueueWorker(
335+ conn,
336+ queue_name,
337+ consumer,
338+ retry_policy,
339+ )
340+ # pump the queue to get the enqueued message:
341+ next(q.consume(limit=1, timeout=5.0))
342+
343+ self.assertEqual(consumer.got_messages, [message])
344+ self.assertEqual(retry_policy.payloads_retried, [message])
345+
346+
347+class DefaultRetryPolicyTests(TestCase):
348+ """Tests for the default retry policy."""
349+
350+ def route_message(self, connection, queue_name, message):
351+ """Route a message in an input queue, so we get a proper message object.
352+
353+ Since we rely on kombu having actually made us a proper message object,
354+ we take 'message' and put it on the input queue, then retrieve it again.
355+
356+ """
357+ q = kombu.simple.SimpleQueue(connection, queue_name)
358+ q.put(message)
359+ return q.get(timeout=5.0)
360+
361+ def test_retry_message(self):
362+ input_queue, dead_queue = 'input.queue', 'dead.queue'
363+ conn = kombu.Connection('memory:///')
364+ message = self.route_message(
365+ conn,
366+ input_queue,
367+ dict(test='value')
368+ )
369+ policy = DefaultRetryPolicy(max_retries=1, dead_queue=dead_queue)
370+
371+ policy.retry(conn, message)
372+
373+ # make sure the input message was dealt with:
374+ self.assertTrue(message.acknowledged)
375+
376+ # look in our input queue and make sure our message is there with a new
377+ # retry count:
378+ new_message = conn.SimpleQueue(input_queue).get(timeout=5.0)
379+ new_payload = new_message.payload
380+
381+ expected_retry_key = input_queue + '_retry_count'
382+ self.assertThat(new_payload, Contains(expected_retry_key))
383+ self.assertThat(new_payload[expected_retry_key], Equals(1))
384+
385+ def test_kill_message(self):
386+ input_queue, dead_queue = 'input.queue', 'dead.queue'
387+ conn = kombu.Connection('memory:///')
388+ message = self.route_message(
389+ conn,
390+ input_queue,
391+ dict(test='value')
392+ )
393+ policy = DefaultRetryPolicy(max_retries=0, dead_queue=dead_queue)
394+
395+ policy.retry(conn, message)
396+
397+ # make sure the input message was dealt with:
398+ self.assertTrue(message.acknowledged)
399+
400+ # look in the dead letter queue and make sure our message is there
401+ new_message = conn.SimpleQueue(dead_queue).get(timeout=5.0)
402+ new_payload = new_message.payload
403+
404+ # The policy shouldn't alter the payload when moving to the dead letter queue
405+ self.assertThat(new_payload, Equals(dict(test='value')))
406+
407+
408+class LoggingConsumer(object):
409+
410+ """A consumer callback that logs payloads and returns a preset action."""
411+
412+ def __init__(self, return_action):
413+ self.got_messages = []
414+ self.return_action = return_action
415+
416+ def __call__(self, payload):
417+ self.got_messages.append(payload)
418+ return self.return_action
419+
420+
421+class LoggingRetryPolicy(object):
422+
423+ """A fake retry policy that ack()s all messages, but logs their payloads."""
424+ def __init__(self):
425+ self.payloads_retried = []
426+
427+ def retry(self, connection, message):
428+ self.payloads_retried.append(message.payload)
429+ message.ack()
430+
431+
432+def queue_message(conn, queue, message):
433+ q = kombu.simple.SimpleQueue(conn, queue)
434+ q.put(message)

Subscribers

People subscribed via source and target branches

to all changes: