Merge lp:~thomir-deactivatedaccount/core-image-publisher/trunk-use-uservice-queue into lp:core-image-publisher

Proposed by Thomi Richards
Status: Merged
Approved by: Thomi Richards
Approved revision: 28
Merged at revision: 27
Proposed branch: lp:~thomir-deactivatedaccount/core-image-publisher/trunk-use-uservice-queue
Merge into: lp:core-image-publisher
Diff against target: 271 lines (+14/-195)
4 files modified
core_image_publisher/__init__.py (+12/-3)
core_image_publisher/queue.py (+0/-118)
core_image_publisher/tests/test_queue_integration.py (+0/-73)
core_image_publisher/worker.py (+2/-1)
To merge this branch: bzr merge lp:~thomir-deactivatedaccount/core-image-publisher/trunk-use-uservice-queue
Reviewer Review Type Date Requested Status
Joe Talbott (community) Approve
Review via email: mp+255575@code.launchpad.net

Commit message

Use the queue classes from uservice_utils.

Description of the change

Use the queue classes from uservice_utils, rather than the local copy. The version in uservice-utils has better debugging facilities and is better tested.

yay for deleting code!

To post a comment you must log in.
Revision history for this message
Joe Talbott (joetalbott) wrote :

Doesn't the version of the uservice_utils library need to be updated someplace?

review: Needs Information
Revision history for this message
Joe Talbott (joetalbott) wrote :

Was done previously (Thomi informed me via IRC).

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'core_image_publisher/__init__.py'
--- core_image_publisher/__init__.py 2015-04-03 02:06:10 +0000
+++ core_image_publisher/__init__.py 2015-04-08 20:17:07 +0000
@@ -21,10 +21,13 @@
21import os21import os
2222
23from uservice_utils.logging import configure_service_logging23from uservice_utils.logging import configure_service_logging
24from uservice_utils.queue import (
25 SimpleRabbitQueueWorker,
26 DefaultRetryPolicy,
27)
2428
25from core_image_publisher.queue import (29from core_image_publisher.queue import (
26 create_connection_from_config,30 create_connection_from_config,
27 CoreImagePublisherQueueMonitor,
28 CoreImageResultPublisher,31 CoreImageResultPublisher,
29)32)
30from core_image_publisher import (33from core_image_publisher import (
@@ -65,9 +68,15 @@
65 try:68 try:
66 with connection:69 with connection:
67 publisher = CoreImageResultPublisher(connection)70 publisher = CoreImageResultPublisher(connection)
68 monitor = CoreImagePublisherQueueMonitor(71 retry_policy = DefaultRetryPolicy(
72 max_retries=3,
73 dead_queue='core.deadletters.{}'.format(constants.API_VERSION)
74 )
75 monitor = SimpleRabbitQueueWorker(
69 connection,76 connection,
70 worker.ImagePublisherWorker(config, publisher)77 'core.image.{}'.format(constants.API_VERSION),
78 worker.ImagePublisherWorker(config, publisher),
79 retry_policy,
71 )80 )
72 monitor.run()81 monitor.run()
73 except KeyboardInterrupt:82 except KeyboardInterrupt:
7483
=== modified file 'core_image_publisher/queue.py'
--- core_image_publisher/queue.py 2015-03-31 16:45:10 +0000
+++ core_image_publisher/queue.py 2015-04-08 20:17:07 +0000
@@ -15,138 +15,20 @@
15# along with this program. If not, see <http://www.gnu.org/licenses/>.15# along with this program. If not, see <http://www.gnu.org/licenses/>.
16#16#
1717
18import enum
19import logging18import logging
20from pdb import bdb
2119
22import kombu20import kombu
23from kombu.mixins import ConsumerMixin
2421
25from core_image_publisher import constants22from core_image_publisher import constants
2623
27logger = logging.getLogger(__name__)24logger = logging.getLogger(__name__)
2825
29__all__ = [26__all__ = [
30 'CoreImagePublisherQueueMonitor',
31 'CoreImageResultPublisher',27 'CoreImageResultPublisher',
32 'create_connection_from_config',28 'create_connection_from_config',
33]29]
3430
3531
36class MessageActions(enum.Enum):
37
38 """Actions that the message processing CB can request.
39
40 These enum values can be returned from the message processing callback,
41 and the CoreImagePublisherQueueMonitor will take appropriate action.
42
43 """
44
45 # The message needs to be retried. This might cause the message to be
46 # retried, or it might reject the message if it's retry count has been
47 # exceeded.
48 Retry = 1
49
50 # The message was processed properly, and should be acknowledged.
51 Acknowledge = 2
52
53
54class CoreImagePublisherQueueMonitor(ConsumerMixin):
55
56 """A class that watches for incoming messages from a queue, and calls
57 a callback to process them.
58
59 By using an external callback we can separate the stateful logic of
60 connecting to amqp from our business logic of dealing with the messages.
61
62 """
63
64 def __init__(self, connection, on_message_cb):
65 """Construct a new CoreImagePublisherQueueMonitor.
66
67 :param connection: A kombu.Connection instance.
68 :param on_message_db: A callable that will be called for every new
69 messages from the rabbitMQ queue(s). It must accept a single
70 parameter: the message object. The callback MUST either acknowledge
71 or reject or requeue the messsage, and MUST NOT raise any unhandled
72 exceptions.
73
74 """
75 self.connection = connection
76 self.on_message_cb = on_message_cb
77
78 def get_consumers(self, Consumer, channel):
79 """Return consumers instances for all configured queues."""
80 queues = [
81 kombu.Queue('core.image.{}'.format(constants.API_VERSION))
82 ]
83 return [Consumer(queues=queues, callbacks=[self.process])]
84
85 def process(self, body, message):
86 """Process incomming test request.
87
88 Run requested tests and posts results to the 'adt_results' queue
89 for later checking.
90 """
91 extra = constants.LOGGING_EXTRA.copy()
92 extra.update(body)
93 logger.info('Got: %r', body, extra=extra)
94 try:
95 requested_action = self.on_message_cb(body)
96 if requested_action == MessageActions.Retry:
97 self.maybe_requeue_message(message)
98 elif requested_action == MessageActions.Acknowledge:
99 message.ack()
100 # Make it possible to use a debugger inside the worker callback:
101 except bdb.BdbQuit:
102 raise
103 except Exception as e:
104 logger.error(
105 "Caught unhandled exception while processing message: %s",
106 e,
107 extra=extra
108 )
109 message.requeue()
110
111 def maybe_requeue_message(self, message):
112 """Maybe requeue the message, based on it's retry count.
113
114 Note: kombu (or maybe rabbitmq) does not allow you to alter a message's
115 payload and then call requeue(), which would be super helpful here.
116 Instead we explicitly queue it (using kombu.SimpleQueue) to the last
117 queue the message was on.
118
119 The key we use to store the retry count is based on the message's
120 routing key, which allows us to have a separate retry count for each
121 stage of the system.
122
123 """
124 extra = constants.LOGGING_EXTRA.copy()
125 extra.update(message.payload)
126 retry_key = "{}_retry_count".format(
127 message.delivery_info.get('routing_key', '')
128 )
129 retry_count = int(message.payload.get(retry_key, '0'))
130 if retry_count < constants.RETRY_COUNT:
131 message.payload[retry_key] = retry_count + 1
132 queue = self.connection.SimpleQueue(
133 'core.image.{}'.format(constants.API_VERSION))
134 queue.put(message.payload)
135 queue.close()
136 message.ack()
137 else:
138 logger.error(
139 "Rejecting message due to retry count exceeding maximum. "
140 "Message will reside on the dead letter queue",
141 extra=extra
142 )
143 queue = self.connection.SimpleQueue(
144 'core.deadletters.{}'.format(constants.API_VERSION))
145 queue.put(message.payload)
146 queue.close()
147 message.ack()
148
149
150def create_connection_from_config(config):32def create_connection_from_config(config):
151 """Create a connection given a config object.33 """Create a connection given a config object.
15234
15335
=== removed file 'core_image_publisher/tests/test_queue_integration.py'
--- core_image_publisher/tests/test_queue_integration.py 2015-03-30 06:17:13 +0000
+++ core_image_publisher/tests/test_queue_integration.py 1970-01-01 00:00:00 +0000
@@ -1,73 +0,0 @@
1# core-image-publisher
2# Copyright (C) 2015 Canonical
3#
4# This program is free software: you can redistribute it and/or modify
5# it under the terms of the GNU General Public License as published by
6# the Free Software Foundation, either version 3 of the License, or
7# (at your option) any later version.
8#
9# This program is distributed in the hope that it will be useful,
10# but WITHOUT ANY WARRANTY; without even the implied warranty of
11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12# GNU General Public License for more details.
13#
14# You should have received a copy of the GNU General Public License
15# along with this program. If not, see <http://www.gnu.org/licenses/>.
16#
17
18"""Integration tests for the kombu queue code."""
19
20import kombu
21import kombu.simple
22from testtools import TestCase
23
24
25from core_image_publisher.queue import CoreImagePublisherQueueMonitor
26
27
28class KombuQueueIntegrationTests(TestCase):
29
30 def test_can_read_message(self):
31 conn = kombu.Connection('memory:///')
32 queue_message(conn, {'test': 'value'})
33
34 consumer = LoggingConsumer()
35 q = CoreImagePublisherQueueMonitor(conn, consumer)
36 # pump the queue to get the enqueued message:
37 next(q.consume(limit=1, timeout=5.0))
38
39 self.assertEqual(consumer.got_messages, [dict(test='value')])
40
41 def test_uncaught_exceptions_cause_message_requeue(self):
42 conn = kombu.Connection('memory:///')
43 queue_message(conn, {'test': 'value'})
44
45 def consumer_with_bug(message):
46 raise RuntimeError("I am a bug, all up in ur bizniz!")
47
48 logging_consumer = LoggingConsumer()
49
50 queue_bugged = CoreImagePublisherQueueMonitor(conn, consumer_with_bug)
51 queue_ok = CoreImagePublisherQueueMonitor(conn, logging_consumer)
52
53 # pump the queue to get the enqueued message:
54 next(queue_bugged.consume(limit=1, timeout=5.0))
55 next(queue_ok.consume(limit=1, timeout=5.0))
56
57 self.assertEqual(logging_consumer.got_messages, [dict(test='value')])
58
59
60class LoggingConsumer(object):
61
62 """A consumer callback object that acks and logs all received payloads."""
63
64 def __init__(self):
65 self.got_messages = []
66
67 def __call__(self, payload):
68 self.got_messages.append(payload)
69
70
71def queue_message(conn, message):
72 q = kombu.simple.SimpleQueue(conn, 'core.image.v1')
73 q.put(message)
740
=== modified file 'core_image_publisher/worker.py'
--- core_image_publisher/worker.py 2015-04-02 14:10:24 +0000
+++ core_image_publisher/worker.py 2015-04-08 20:17:07 +0000
@@ -22,9 +22,10 @@
22import subprocess22import subprocess
23import tempfile23import tempfile
2424
25from uservice_utils.queue import MessageActions
26
25from core_image_publisher.constants import LOGGING_EXTRA27from core_image_publisher.constants import LOGGING_EXTRA
26from core_image_publisher.cloud import get_glance_client28from core_image_publisher.cloud import get_glance_client
27from core_image_publisher.queue import MessageActions
28from core_image_publisher.utils import check_call29from core_image_publisher.utils import check_call
2930
3031

Subscribers

People subscribed via source and target branches

to all changes: