Merge lp:~thomir-deactivatedaccount/core-image-publisher/trunk-use-uservice-queue into lp:core-image-publisher

Proposed by Thomi Richards on 2015-04-08
Status: Merged
Approved by: Thomi Richards on 2015-04-08
Approved revision: 28
Merged at revision: 27
Proposed branch: lp:~thomir-deactivatedaccount/core-image-publisher/trunk-use-uservice-queue
Merge into: lp:core-image-publisher
Diff against target: 271 lines (+14/-195)
4 files modified
core_image_publisher/__init__.py (+12/-3)
core_image_publisher/queue.py (+0/-118)
core_image_publisher/tests/test_queue_integration.py (+0/-73)
core_image_publisher/worker.py (+2/-1)
To merge this branch: bzr merge lp:~thomir-deactivatedaccount/core-image-publisher/trunk-use-uservice-queue
Reviewer Review Type Date Requested Status
Joe Talbott (community) 2015-04-08 Approve on 2015-04-08
Review via email: mp+255575@code.launchpad.net

Commit message

Use the queue classes from uservice_utils.

Description of the change

Use the queue classes from uservice_utils, rather than the local copy. The version in uservice-utils has better debugging facilities and is better tested.

yay for deleting code!

To post a comment you must log in.
Joe Talbott (joetalbott) wrote :

Doesn't the version of the uservice_utils library need to be updated someplace?

review: Needs Information
Joe Talbott (joetalbott) wrote :

Was done previously (Thomi informed me via IRC).

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'core_image_publisher/__init__.py'
2--- core_image_publisher/__init__.py 2015-04-03 02:06:10 +0000
3+++ core_image_publisher/__init__.py 2015-04-08 20:17:07 +0000
4@@ -21,10 +21,13 @@
5 import os
6
7 from uservice_utils.logging import configure_service_logging
8+from uservice_utils.queue import (
9+ SimpleRabbitQueueWorker,
10+ DefaultRetryPolicy,
11+)
12
13 from core_image_publisher.queue import (
14 create_connection_from_config,
15- CoreImagePublisherQueueMonitor,
16 CoreImageResultPublisher,
17 )
18 from core_image_publisher import (
19@@ -65,9 +68,15 @@
20 try:
21 with connection:
22 publisher = CoreImageResultPublisher(connection)
23- monitor = CoreImagePublisherQueueMonitor(
24+ retry_policy = DefaultRetryPolicy(
25+ max_retries=3,
26+ dead_queue='core.deadletters.{}'.format(constants.API_VERSION)
27+ )
28+ monitor = SimpleRabbitQueueWorker(
29 connection,
30- worker.ImagePublisherWorker(config, publisher)
31+ 'core.image.{}'.format(constants.API_VERSION),
32+ worker.ImagePublisherWorker(config, publisher),
33+ retry_policy,
34 )
35 monitor.run()
36 except KeyboardInterrupt:
37
38=== modified file 'core_image_publisher/queue.py'
39--- core_image_publisher/queue.py 2015-03-31 16:45:10 +0000
40+++ core_image_publisher/queue.py 2015-04-08 20:17:07 +0000
41@@ -15,138 +15,20 @@
42 # along with this program. If not, see <http://www.gnu.org/licenses/>.
43 #
44
45-import enum
46 import logging
47-from pdb import bdb
48
49 import kombu
50-from kombu.mixins import ConsumerMixin
51
52 from core_image_publisher import constants
53
54 logger = logging.getLogger(__name__)
55
56 __all__ = [
57- 'CoreImagePublisherQueueMonitor',
58 'CoreImageResultPublisher',
59 'create_connection_from_config',
60 ]
61
62
63-class MessageActions(enum.Enum):
64-
65- """Actions that the message processing CB can request.
66-
67- These enum values can be returned from the message processing callback,
68- and the CoreImagePublisherQueueMonitor will take appropriate action.
69-
70- """
71-
72- # The message needs to be retried. This might cause the message to be
73- # retried, or it might reject the message if it's retry count has been
74- # exceeded.
75- Retry = 1
76-
77- # The message was processed properly, and should be acknowledged.
78- Acknowledge = 2
79-
80-
81-class CoreImagePublisherQueueMonitor(ConsumerMixin):
82-
83- """A class that watches for incoming messages from a queue, and calls
84- a callback to process them.
85-
86- By using an external callback we can separate the stateful logic of
87- connecting to amqp from our business logic of dealing with the messages.
88-
89- """
90-
91- def __init__(self, connection, on_message_cb):
92- """Construct a new CoreImagePublisherQueueMonitor.
93-
94- :param connection: A kombu.Connection instance.
95- :param on_message_db: A callable that will be called for every new
96- messages from the rabbitMQ queue(s). It must accept a single
97- parameter: the message object. The callback MUST either acknowledge
98- or reject or requeue the messsage, and MUST NOT raise any unhandled
99- exceptions.
100-
101- """
102- self.connection = connection
103- self.on_message_cb = on_message_cb
104-
105- def get_consumers(self, Consumer, channel):
106- """Return consumers instances for all configured queues."""
107- queues = [
108- kombu.Queue('core.image.{}'.format(constants.API_VERSION))
109- ]
110- return [Consumer(queues=queues, callbacks=[self.process])]
111-
112- def process(self, body, message):
113- """Process incomming test request.
114-
115- Run requested tests and posts results to the 'adt_results' queue
116- for later checking.
117- """
118- extra = constants.LOGGING_EXTRA.copy()
119- extra.update(body)
120- logger.info('Got: %r', body, extra=extra)
121- try:
122- requested_action = self.on_message_cb(body)
123- if requested_action == MessageActions.Retry:
124- self.maybe_requeue_message(message)
125- elif requested_action == MessageActions.Acknowledge:
126- message.ack()
127- # Make it possible to use a debugger inside the worker callback:
128- except bdb.BdbQuit:
129- raise
130- except Exception as e:
131- logger.error(
132- "Caught unhandled exception while processing message: %s",
133- e,
134- extra=extra
135- )
136- message.requeue()
137-
138- def maybe_requeue_message(self, message):
139- """Maybe requeue the message, based on it's retry count.
140-
141- Note: kombu (or maybe rabbitmq) does not allow you to alter a message's
142- payload and then call requeue(), which would be super helpful here.
143- Instead we explicitly queue it (using kombu.SimpleQueue) to the last
144- queue the message was on.
145-
146- The key we use to store the retry count is based on the message's
147- routing key, which allows us to have a separate retry count for each
148- stage of the system.
149-
150- """
151- extra = constants.LOGGING_EXTRA.copy()
152- extra.update(message.payload)
153- retry_key = "{}_retry_count".format(
154- message.delivery_info.get('routing_key', '')
155- )
156- retry_count = int(message.payload.get(retry_key, '0'))
157- if retry_count < constants.RETRY_COUNT:
158- message.payload[retry_key] = retry_count + 1
159- queue = self.connection.SimpleQueue(
160- 'core.image.{}'.format(constants.API_VERSION))
161- queue.put(message.payload)
162- queue.close()
163- message.ack()
164- else:
165- logger.error(
166- "Rejecting message due to retry count exceeding maximum. "
167- "Message will reside on the dead letter queue",
168- extra=extra
169- )
170- queue = self.connection.SimpleQueue(
171- 'core.deadletters.{}'.format(constants.API_VERSION))
172- queue.put(message.payload)
173- queue.close()
174- message.ack()
175-
176-
177 def create_connection_from_config(config):
178 """Create a connection given a config object.
179
180
181=== removed file 'core_image_publisher/tests/test_queue_integration.py'
182--- core_image_publisher/tests/test_queue_integration.py 2015-03-30 06:17:13 +0000
183+++ core_image_publisher/tests/test_queue_integration.py 1970-01-01 00:00:00 +0000
184@@ -1,73 +0,0 @@
185-# core-image-publisher
186-# Copyright (C) 2015 Canonical
187-#
188-# This program is free software: you can redistribute it and/or modify
189-# it under the terms of the GNU General Public License as published by
190-# the Free Software Foundation, either version 3 of the License, or
191-# (at your option) any later version.
192-#
193-# This program is distributed in the hope that it will be useful,
194-# but WITHOUT ANY WARRANTY; without even the implied warranty of
195-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
196-# GNU General Public License for more details.
197-#
198-# You should have received a copy of the GNU General Public License
199-# along with this program. If not, see <http://www.gnu.org/licenses/>.
200-#
201-
202-"""Integration tests for the kombu queue code."""
203-
204-import kombu
205-import kombu.simple
206-from testtools import TestCase
207-
208-
209-from core_image_publisher.queue import CoreImagePublisherQueueMonitor
210-
211-
212-class KombuQueueIntegrationTests(TestCase):
213-
214- def test_can_read_message(self):
215- conn = kombu.Connection('memory:///')
216- queue_message(conn, {'test': 'value'})
217-
218- consumer = LoggingConsumer()
219- q = CoreImagePublisherQueueMonitor(conn, consumer)
220- # pump the queue to get the enqueued message:
221- next(q.consume(limit=1, timeout=5.0))
222-
223- self.assertEqual(consumer.got_messages, [dict(test='value')])
224-
225- def test_uncaught_exceptions_cause_message_requeue(self):
226- conn = kombu.Connection('memory:///')
227- queue_message(conn, {'test': 'value'})
228-
229- def consumer_with_bug(message):
230- raise RuntimeError("I am a bug, all up in ur bizniz!")
231-
232- logging_consumer = LoggingConsumer()
233-
234- queue_bugged = CoreImagePublisherQueueMonitor(conn, consumer_with_bug)
235- queue_ok = CoreImagePublisherQueueMonitor(conn, logging_consumer)
236-
237- # pump the queue to get the enqueued message:
238- next(queue_bugged.consume(limit=1, timeout=5.0))
239- next(queue_ok.consume(limit=1, timeout=5.0))
240-
241- self.assertEqual(logging_consumer.got_messages, [dict(test='value')])
242-
243-
244-class LoggingConsumer(object):
245-
246- """A consumer callback object that acks and logs all received payloads."""
247-
248- def __init__(self):
249- self.got_messages = []
250-
251- def __call__(self, payload):
252- self.got_messages.append(payload)
253-
254-
255-def queue_message(conn, message):
256- q = kombu.simple.SimpleQueue(conn, 'core.image.v1')
257- q.put(message)
258
259=== modified file 'core_image_publisher/worker.py'
260--- core_image_publisher/worker.py 2015-04-02 14:10:24 +0000
261+++ core_image_publisher/worker.py 2015-04-08 20:17:07 +0000
262@@ -22,9 +22,10 @@
263 import subprocess
264 import tempfile
265
266+from uservice_utils.queue import MessageActions
267+
268 from core_image_publisher.constants import LOGGING_EXTRA
269 from core_image_publisher.cloud import get_glance_client
270-from core_image_publisher.queue import MessageActions
271 from core_image_publisher.utils import check_call
272
273

Subscribers

People subscribed via source and target branches

to all changes: