Merge lp:~thomir-deactivatedaccount/uservice-utils/trunk-add-queue into lp:uservice-utils
- trunk-add-queue
- Merge into trunk
Proposed by
Thomi Richards
Status: | Merged |
---|---|
Approved by: | Joe Talbott |
Approved revision: | 10 |
Merged at revision: | 9 |
Proposed branch: | lp:~thomir-deactivatedaccount/uservice-utils/trunk-add-queue |
Merge into: | lp:uservice-utils |
Diff against target: |
434 lines (+393/-0) 5 files modified
CHANGELOG (+6/-0) README.rst (+6/-0) setup.py (+1/-0) uservice_utils/queue.py (+195/-0) uservice_utils/tests/test_queue.py (+185/-0) |
To merge this branch: | bzr merge lp:~thomir-deactivatedaccount/uservice-utils/trunk-add-queue |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Joe Talbott (community) | Approve | ||
Review via email: mp+255044@code.launchpad.net |
Commit message
Add the queue module, containing kombu abstractions.
Description of the change
Move the queue code, with a few new improvements, to uservice_utils.
To post a comment you must log in.
- 10. By Thomi Richards
-
Fix code from merge proposal.
Revision history for this message
Joe Talbott (joetalbott) : | # |
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === modified file 'CHANGELOG' |
2 | --- CHANGELOG 2015-04-02 04:07:12 +0000 |
3 | +++ CHANGELOG 2015-04-02 19:29:13 +0000 |
4 | @@ -1,6 +1,12 @@ |
5 | Release History: |
6 | ################ |
7 | |
8 | +NEXT: |
9 | +===== |
10 | + |
11 | +* Added the `uservice_utils.queue` module, which contains several useful ways |
12 | + to interact with kombu. |
13 | + |
14 | 1.0.1 |
15 | ===== |
16 | |
17 | |
18 | === modified file 'README.rst' |
19 | --- README.rst 2015-04-01 02:42:16 +0000 |
20 | +++ README.rst 2015-04-02 19:29:13 +0000 |
21 | @@ -23,5 +23,11 @@ |
22 | |
23 | $ python setup.py test |
24 | |
25 | +Dependencies: |
26 | +============= |
27 | |
28 | +This library contains many different parts, and we don't want to force users to |
29 | +install all the dependencies for all the parts in order to use any one piece. |
30 | +For that reason, setup.py does not list any install-time dependencies, and users |
31 | +of this library must ensure they have the required dependencies configured. |
32 | |
33 | |
34 | === modified file 'setup.py' |
35 | --- setup.py 2015-04-02 02:43:04 +0000 |
36 | +++ setup.py 2015-04-02 19:29:13 +0000 |
37 | @@ -51,6 +51,7 @@ |
38 | tests_require=[ |
39 | 'fixtures', |
40 | 'testtools', |
41 | + 'kombu', |
42 | ], |
43 | classifiers=[ |
44 | 'Development Status :: 5 - Production/Stable', |
45 | |
46 | === added file 'uservice_utils/queue.py' |
47 | --- uservice_utils/queue.py 1970-01-01 00:00:00 +0000 |
48 | +++ uservice_utils/queue.py 2015-04-02 19:29:13 +0000 |
49 | @@ -0,0 +1,195 @@ |
50 | +# uservice-utils |
51 | +# Copyright (C) 2015 Canonical |
52 | +# |
53 | +# This program is free software: you can redistribute it and/or modify |
54 | +# it under the terms of the GNU General Public License as published by |
55 | +# the Free Software Foundation, either version 3 of the License, or |
56 | +# (at your option) any later version. |
57 | +# |
58 | +# This program is distributed in the hope that it will be useful, |
59 | +# but WITHOUT ANY WARRANTY; without even the implied warranty of |
60 | +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
61 | +# GNU General Public License for more details. |
62 | +# |
63 | +# You should have received a copy of the GNU General Public License |
64 | +# along with this program. If not, see <http://www.gnu.org/licenses/>. |
65 | +# |
66 | + |
67 | +"""Code to abstract away rabbit message queues.""" |
68 | + |
69 | +import enum |
70 | +import logging |
71 | +from pdb import bdb |
72 | + |
73 | +import kombu |
74 | +from kombu.mixins import ConsumerMixin |
75 | + |
76 | +logger = logging.getLogger(__name__) |
77 | + |
78 | +__all__ = [ |
79 | + 'MessageActions', |
80 | + 'SimpleRabbitQueueWorker', |
81 | + 'RetryPolicy', |
82 | + 'DefaultRetryPolicy', |
83 | +] |
84 | + |
85 | + |
86 | +class MessageActions(enum.Enum): |
87 | + |
88 | + """Actions that the message processing callback can request. |
89 | + |
90 | + These enum values can be returned from the message processing callback, |
91 | + and the SimpleRabbitQueueWorker will take appropriate action. |
92 | + |
93 | + """ |
94 | + |
95 | + # The message needs to be retried. This might cause the message to be |
96 | + # retried, or it might reject the message if its retry count has been |
97 | + # exceeded. |
98 | + Retry = 1 |
99 | + |
100 | + # The message was processed properly, and should be acknowledged. |
101 | + Acknowledge = 2 |
102 | + |
103 | + |
104 | +class SimpleRabbitQueueWorker(ConsumerMixin): |
105 | + |
106 | + """A class that watches for incoming messages from a single queue, and calls |
107 | + a callback to process them. |
108 | + |
109 | + Note that this class does not deal with rabbitMQ exchanges - it only works |
110 | + with single, direct queues. |
111 | + |
112 | + """ |
113 | + |
114 | + def __init__(self, connection, queue, on_message_cb, retry_policy): |
115 | + """Construct a new SimpleRabbitQueueWorker. |
116 | + |
117 | + The connection must be a kombu.Connection instance, but otherwise may |
118 | + be constructed to point at any backend (including a memory backend). |
119 | + |
120 | + The queue should be a single string, pointing to the queue you wish to |
121 | + monitor. |
122 | + |
123 | + The callback can be any callable, including a class with __call__ |
124 | + defined. The callable must accept a single parameter, which will be |
125 | + the decoded message payload. The callable must also return one of the |
126 | + MessageActions attributes, and must not raise any uncaught exceptions. |
127 | + |
128 | + The retry policy should be an instance of RetryPolicy, or one of its |
129 | + subclasses. The 'DefaultRetryPolicy' is a sensible policy that's been |
130 | + created for you already. |
131 | + |
132 | + :param connection: A kombu.Connection instance. |
133 | + :param queue: The name of the rabbitMQ queue to monitor. |
134 | + :param on_message_cb: A callable that will be called for every new |
135 | + message from the rabbitMQ queue. |
136 | + :param retry_policy: An instance of RetryPolicy, or one of its subclasses. |
137 | + |
138 | + """ |
139 | + self.connection = connection |
140 | + self._queue = queue |
141 | + self.on_message_cb = on_message_cb |
142 | + self.retry_policy = retry_policy |
143 | + |
144 | + def get_consumers(self, Consumer, channel): |
145 | + """Return consumers instances for all configured queues.""" |
146 | + queues = [ kombu.Queue(self._queue) ] |
147 | + return [Consumer(queues=queues, callbacks=[self.process])] |
148 | + |
149 | + def process(self, body, message): |
150 | + """Handle incoming message. |
151 | + |
152 | + We hand off processing to the worker callback, and manage calling the |
153 | + retry policy or ack()ing the message depending on the returned value |
154 | + from the worker. |
155 | + |
156 | + """ |
157 | + try: |
158 | + requested_action = self.on_message_cb(body) |
159 | + if requested_action == MessageActions.Retry: |
160 | + # delegate to the retry policy: |
161 | + self.retry_policy.retry(self.connection, message) |
162 | + elif requested_action == MessageActions.Acknowledge: |
163 | + message.ack() |
164 | + elif requested_action is None: |
165 | + # treat this as a silent ack, but maybe we should change this |
166 | + # to print a warning in the future? |
167 | + message.ack() |
168 | + # Make it possible to use a debugger inside the worker callback: |
169 | + except bdb.BdbQuit: |
170 | + raise |
171 | + except Exception as e: |
172 | + logger.error( |
173 | + "Caught unhandled exception while processing message: %s", |
174 | + e, |
175 | + ) |
176 | + self.retry_policy.retry(self.connection, message) |
177 | + |
178 | + |
179 | +class RetryPolicy(object): |
180 | + |
181 | + """Encapsulate a retry policy. |
182 | + |
183 | + This is a base class for all retry policies. A retry policy dictates what |
184 | + happens to a message when the consumer callback returns |
185 | + MessageActions.Retry. |
186 | + |
187 | + The work is done in the 'retry' method. The policy can do anything it |
188 | + likes with the message, including: |
189 | + |
190 | + * ack()ing the message and putting a new message on another queue. |
191 | + * reject()ing or requeue()ing the message, |
192 | + * ...or anything else. |
193 | + |
194 | + The only requirement is that the policy *must* handle the message given |
195 | + to it. No further processing will be done on the message, so one of |
196 | + ack(), reject() or requeue() must be called. |
197 | + |
198 | + """ |
199 | + |
200 | + def retry(self, connection, message): |
201 | + """Determine what to do with 'message'.""" |
202 | + pass |
203 | + |
204 | + |
205 | +class DefaultRetryPolicy(RetryPolicy): |
206 | + |
207 | + """A sensible 'N-strikes' retry policy. |
208 | + |
209 | + This policy will retry a message a certain number of times by re-inserting |
210 | + the message into the last queue it was in. When its retry limit has been |
211 | + exceeded, the message will instead be inserted into a dead-letter queue. |
212 | + |
213 | + """ |
214 | + |
215 | + def __init__(self, max_retries, dead_queue): |
216 | + """Create a new DefaultRetryPolicy. |
217 | + |
218 | + 'max_retries' specifies the number of times a message will be retried. |
219 | + 'dead_queue' specifies the name of the dead-letter queue to use. |
220 | + |
221 | + """ |
222 | + self.max_retries = max_retries |
223 | + self.dead_queue = dead_queue |
224 | + |
225 | + def retry(self, connection, message): |
226 | + last_queue_name = message.delivery_info['routing_key'] |
227 | + retry_key = "{}_retry_count".format(last_queue_name) |
228 | + retry_count = int(message.payload.get(retry_key, '0')) |
229 | + |
230 | + if retry_count < self.max_retries: |
231 | + message.payload[retry_key] = retry_count + 1 |
232 | + queue = connection.SimpleQueue(last_queue_name) |
233 | + queue.put(message.payload) |
234 | + queue.close() |
235 | + message.ack() |
236 | + else: |
237 | + logger.error( |
238 | + "Rejecting message due to retry count exceeding maximum. " |
239 | + "Message will reside on the dead letter queue", |
240 | + ) |
241 | + queue = connection.SimpleQueue(self.dead_queue) |
242 | + queue.put(message.payload) |
243 | + queue.close() |
244 | + message.ack() |
245 | |
246 | === added file 'uservice_utils/tests/test_queue.py' |
247 | --- uservice_utils/tests/test_queue.py 1970-01-01 00:00:00 +0000 |
248 | +++ uservice_utils/tests/test_queue.py 2015-04-02 19:29:13 +0000 |
249 | @@ -0,0 +1,185 @@ |
250 | +# uservice-utils |
251 | +# Copyright (C) 2015 Canonical |
252 | +# |
253 | +# This program is free software: you can redistribute it and/or modify |
254 | +# it under the terms of the GNU General Public License as published by |
255 | +# the Free Software Foundation, either version 3 of the License, or |
256 | +# (at your option) any later version. |
257 | +# |
258 | +# This program is distributed in the hope that it will be useful, |
259 | +# but WITHOUT ANY WARRANTY; without even the implied warranty of |
260 | +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
261 | +# GNU General Public License for more details. |
262 | +# |
263 | +# You should have received a copy of the GNU General Public License |
264 | +# along with this program. If not, see <http://www.gnu.org/licenses/>. |
265 | +# |
266 | + |
267 | +"""Integration tests for the kombu queue code.""" |
268 | + |
269 | +import kombu |
270 | +import kombu.simple |
271 | +from testtools import TestCase |
272 | + |
273 | +from testtools.matchers import ( |
274 | + Contains, |
275 | + Equals, |
276 | +) |
277 | + |
278 | + |
279 | +from uservice_utils.queue import ( |
280 | + DefaultRetryPolicy, |
281 | + MessageActions, |
282 | + SimpleRabbitQueueWorker, |
283 | +) |
284 | + |
285 | + |
286 | +class KombuQueueIntegrationTests(TestCase): |
287 | + |
288 | + def test_can_read_and_accept_message(self): |
289 | + conn = kombu.Connection('memory:///') |
290 | + queue_name = self.getUniqueString() |
291 | + queue_message(conn, queue_name, {'test': 'value'}) |
292 | + retry_policy = LoggingRetryPolicy() |
293 | + |
294 | + consumer = LoggingConsumer(MessageActions.Acknowledge) |
295 | + q = SimpleRabbitQueueWorker( |
296 | + conn, |
297 | + queue_name, |
298 | + consumer, |
299 | + retry_policy, |
300 | + ) |
301 | + # pump the queue to get the enqueued message: |
302 | + next(q.consume(limit=1, timeout=5.0)) |
303 | + |
304 | + self.assertEqual(consumer.got_messages, [dict(test='value')]) |
305 | + self.assertEqual(retry_policy.payloads_retried, []) |
306 | + |
307 | + def test_uncaught_exceptions_cause_message_retry(self): |
308 | + conn = kombu.Connection('memory:///') |
309 | + queue_name = self.getUniqueString() |
310 | + message = {'test': 'value'} |
311 | + queue_message(conn, queue_name, message) |
312 | + retry_policy = LoggingRetryPolicy() |
313 | + |
314 | + def consumer_with_bug(message): |
315 | + raise RuntimeError("I am a bug, all up in ur bizniz!") |
316 | + queue = SimpleRabbitQueueWorker( |
317 | + conn, |
318 | + queue_name, |
319 | + consumer_with_bug, |
320 | + retry_policy |
321 | + ) |
322 | + next(queue.consume(limit=1, timeout=5.0)) |
323 | + |
324 | + self.assertEqual(retry_policy.payloads_retried, [message]) |
325 | + |
326 | + def test_retry_message_works(self): |
327 | + conn = kombu.Connection('memory:///') |
328 | + queue_name = self.getUniqueString() |
329 | + message = {'test': 'value'} |
330 | + queue_message(conn, queue_name, message) |
331 | + retry_policy = LoggingRetryPolicy() |
332 | + |
333 | + consumer = LoggingConsumer(MessageActions.Retry) |
334 | + q = SimpleRabbitQueueWorker( |
335 | + conn, |
336 | + queue_name, |
337 | + consumer, |
338 | + retry_policy, |
339 | + ) |
340 | + # pump the queue to get the enqueued message: |
341 | + next(q.consume(limit=1, timeout=5.0)) |
342 | + |
343 | + self.assertEqual(consumer.got_messages, [message]) |
344 | + self.assertEqual(retry_policy.payloads_retried, [message]) |
345 | + |
346 | + |
347 | +class DefaultRetryPolicyTests(TestCase): |
348 | + """Tests for the default retry policy.""" |
349 | + |
350 | + def route_message(self, connection, queue_name, message): |
351 | + """Route a message in an input queue, so we get a proper message object. |
352 | + |
353 | + Since we rely on kombu having actually made us a proper message object, |
354 | + we take 'message' and put it on the input queue, then retrieve it again. |
355 | + |
356 | + """ |
357 | + q = kombu.simple.SimpleQueue(connection, queue_name) |
358 | + q.put(message) |
359 | + return q.get(timeout=5.0) |
360 | + |
361 | + def test_retry_message(self): |
362 | + input_queue, dead_queue = 'input.queue', 'dead.queue' |
363 | + conn = kombu.Connection('memory:///') |
364 | + message = self.route_message( |
365 | + conn, |
366 | + input_queue, |
367 | + dict(test='value') |
368 | + ) |
369 | + policy = DefaultRetryPolicy(max_retries=1, dead_queue=dead_queue) |
370 | + |
371 | + policy.retry(conn, message) |
372 | + |
373 | + # make sure the input message was dealt with: |
374 | + self.assertTrue(message.acknowledged) |
375 | + |
376 | + # look in our input queue and make sure our message is there with a new |
377 | + # retry count: |
378 | + new_message = conn.SimpleQueue(input_queue).get(timeout=5.0) |
379 | + new_payload = new_message.payload |
380 | + |
381 | + expected_retry_key = input_queue + '_retry_count' |
382 | + self.assertThat(new_payload, Contains(expected_retry_key)) |
383 | + self.assertThat(new_payload[expected_retry_key], Equals(1)) |
384 | + |
385 | + def test_kill_message(self): |
386 | + input_queue, dead_queue = 'input.queue', 'dead.queue' |
387 | + conn = kombu.Connection('memory:///') |
388 | + message = self.route_message( |
389 | + conn, |
390 | + input_queue, |
391 | + dict(test='value') |
392 | + ) |
393 | + policy = DefaultRetryPolicy(max_retries=0, dead_queue=dead_queue) |
394 | + |
395 | + policy.retry(conn, message) |
396 | + |
397 | + # make sure the input message was dealt with: |
398 | + self.assertTrue(message.acknowledged) |
399 | + |
400 | + # look in the dead letter queue and make sure our message is there |
401 | + new_message = conn.SimpleQueue(dead_queue).get(timeout=5.0) |
402 | + new_payload = new_message.payload |
403 | + |
404 | + # The policy shouldn't alter the payload when moving to the dead letter queue |
405 | + self.assertThat(new_payload, Equals(dict(test='value'))) |
406 | + |
407 | + |
408 | +class LoggingConsumer(object): |
409 | + |
410 | + """A consumer callback object that acks and logs all received payloads.""" |
411 | + |
412 | + def __init__(self, return_action): |
413 | + self.got_messages = [] |
414 | + self.return_action = return_action |
415 | + |
416 | + def __call__(self, payload): |
417 | + self.got_messages.append(payload) |
418 | + return self.return_action |
419 | + |
420 | + |
421 | +class LoggingRetryPolicy(object): |
422 | + |
423 | + """A fake retry policy that ack()s all messages, but logs their payloads.""" |
424 | + def __init__(self): |
425 | + self.payloads_retried = [] |
426 | + |
427 | + def retry(self, connection, message): |
428 | + self.payloads_retried.append(message.payload) |
429 | + message.ack() |
430 | + |
431 | + |
432 | +def queue_message(conn, queue, message): |
433 | + q = kombu.simple.SimpleQueue(conn, queue) |
434 | + q.put(message) |
Some in-line nits. Otherwise nice work!