Merge lp:~thomir-deactivatedaccount/uservice-utils/trunk-add-exception-logging into lp:uservice-utils

Proposed by Thomi Richards
Status: Merged
Approved by: Thomi Richards
Approved revision: 13
Merged at revision: 13
Proposed branch: lp:~thomir-deactivatedaccount/uservice-utils/trunk-add-exception-logging
Merge into: lp:uservice-utils
Diff against target: 176 lines (+68/-46)
2 files modified
uservice_utils/queue.py (+12/-3)
uservice_utils/tests/test_queue.py (+56/-43)
To merge this branch: bzr merge lp:~thomir-deactivatedaccount/uservice-utils/trunk-add-exception-logging
Reviewer Review Type Date Requested Status
Celso Providelo (community) Approve
Review via email: mp+255448@code.launchpad.net

Commit message

Log full exception tracebacks on uncaught exceptions in the queue worker.

Description of the change

Format uncaught exceptions better.

To post a comment you must log in.
Revision history for this message
Celso Providelo (cprov) wrote :

Thomi,

Thanks for including traceback on error messages, they will be useful in logstash.

I don't really get the reason beyond the tests redesign apart from incorporating the retry policy mechanism. Maybe I am missing something ? Either way I am happy if tarmac like them.

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'uservice_utils/queue.py'
2--- uservice_utils/queue.py 2015-04-02 19:28:58 +0000
3+++ uservice_utils/queue.py 2015-04-07 23:22:42 +0000
4@@ -21,6 +21,15 @@
5 import logging
6 from pdb import bdb
7
8+# Traceback2 contains back-ported functionality that will appear in python
9+# version 3.5. Specifically, tracebacks will have stack-locals printed in
10+# them. This is useful, but we can't rely on it being installed. Try and
11+# import the new version, but fall back to the old if it's not present.
12+try:
13+ import traceback2 as traceback
14+except ImportError:
15+ import traceback
16+
17 import kombu
18 from kombu.mixins import ConsumerMixin
19
20@@ -119,10 +128,10 @@
21 # Make it possible to use a debugger inside the worker callback:
22 except bdb.BdbQuit:
23 raise
24- except Exception as e:
25+ except Exception:
26 logger.error(
27- "Caught unhandled exception while processing message: %s",
28- e,
29+ "Caught unhandled exception while processing message:\n%s",
30+ traceback.format_exc(),
31 )
32 self.retry_policy.retry(self.connection, message)
33
34
35=== modified file 'uservice_utils/tests/test_queue.py'
36--- uservice_utils/tests/test_queue.py 2015-04-02 19:28:58 +0000
37+++ uservice_utils/tests/test_queue.py 2015-04-07 23:22:42 +0000
38@@ -17,10 +17,10 @@
39
40 """Integration tests for the kombu queue code."""
41
42+from fixtures import FakeLogger
43 import kombu
44 import kombu.simple
45 from testtools import TestCase
46-
47 from testtools.matchers import (
48 Contains,
49 Equals,
50@@ -36,63 +36,74 @@
51
52 class KombuQueueIntegrationTests(TestCase):
53
54+ def setUp(self):
55+ super().setUp()
56+ self.message = {'test': self.getUniqueString()}
57+
58+ def create_worker(self, consumer, retry_policy=None):
59+ conn = kombu.Connection('memory:///')
60+ queue_name = self.getUniqueString()
61+ queue_message(conn, queue_name, self.message)
62+ retry_policy = retry_policy or LoggingRetryPolicy()
63+
64+ return SimpleRabbitQueueWorker(
65+ conn,
66+ queue_name,
67+ consumer,
68+ retry_policy,
69+ )
70+
71 def test_can_read_and_accept_message(self):
72- conn = kombu.Connection('memory:///')
73- queue_name = self.getUniqueString()
74- queue_message(conn, queue_name, {'test': 'value'})
75- retry_policy = LoggingRetryPolicy()
76-
77 consumer = LoggingConsumer(MessageActions.Acknowledge)
78- q = SimpleRabbitQueueWorker(
79- conn,
80- queue_name,
81- consumer,
82- retry_policy,
83- )
84+ retry_policy = LoggingRetryPolicy()
85+ q = self.create_worker(consumer, retry_policy)
86+
87 # pump the queue to get the enqueued message:
88 next(q.consume(limit=1, timeout=5.0))
89
90- self.assertEqual(consumer.got_messages, [dict(test='value')])
91+ self.assertEqual(consumer.got_messages, [self.message])
92 self.assertEqual(retry_policy.payloads_retried, [])
93
94 def test_uncaught_exceptions_cause_message_retry(self):
95- conn = kombu.Connection('memory:///')
96- queue_name = self.getUniqueString()
97- message = {'test': 'value'}
98- queue_message(conn, queue_name, message)
99- retry_policy = LoggingRetryPolicy()
100-
101 def consumer_with_bug(message):
102 raise RuntimeError("I am a bug, all up in ur bizniz!")
103- queue = SimpleRabbitQueueWorker(
104- conn,
105- queue_name,
106- consumer_with_bug,
107- retry_policy
108- )
109- next(queue.consume(limit=1, timeout=5.0))
110-
111- self.assertEqual(retry_policy.payloads_retried, [message])
112+ retry_policy = LoggingRetryPolicy()
113+
114+ queue = self.create_worker(consumer_with_bug, retry_policy)
115+ # supress the logged message:
116+ with FakeLogger():
117+ next(queue.consume(limit=1, timeout=5.0))
118+
119+ self.assertEqual(retry_policy.payloads_retried, [self.message])
120
121 def test_retry_message_works(self):
122- conn = kombu.Connection('memory:///')
123- queue_name = self.getUniqueString()
124- message = {'test': 'value'}
125- queue_message(conn, queue_name, message)
126- retry_policy = LoggingRetryPolicy()
127-
128 consumer = LoggingConsumer(MessageActions.Retry)
129- q = SimpleRabbitQueueWorker(
130- conn,
131- queue_name,
132- consumer,
133- retry_policy,
134- )
135+ retry_policy = LoggingRetryPolicy()
136+
137+ q = self.create_worker(consumer, retry_policy)
138 # pump the queue to get the enqueued message:
139 next(q.consume(limit=1, timeout=5.0))
140
141- self.assertEqual(consumer.got_messages, [message])
142- self.assertEqual(retry_policy.payloads_retried, [message])
143+ self.assertEqual(consumer.got_messages, [self.message])
144+ self.assertEqual(retry_policy.payloads_retried, [self.message])
145+
146+ def test_unhandled_exceptions_cause_full_traceback(self):
147+ def consumer_with_bug(message):
148+ raise RuntimeError("I am a bug, all up in ur bizniz!")
149+ retry_policy = LoggingRetryPolicy()
150+ queue = self.create_worker(consumer_with_bug, retry_policy)
151+ # supress the logged message:
152+ with FakeLogger() as f:
153+ next(queue.consume(limit=1, timeout=5.0))
154+ log_contents = f.output
155+ # must contain the exception line...
156+ self.assertThat(log_contents, Contains("I am a bug, all up in ur bizniz!"))
157+ # and also the traceback (search for this file, since it will be
158+ # the top of the tracback)
159+ self.assertThat(log_contents, Contains(__file__))
160+
161+ self.assertEqual(retry_policy.payloads_retried, [self.message])
162+
163
164
165 class DefaultRetryPolicyTests(TestCase):
166@@ -143,7 +154,9 @@
167 )
168 policy = DefaultRetryPolicy(max_retries=0, dead_queue=dead_queue)
169
170- policy.retry(conn, message)
171+ # supress the logged message:
172+ with FakeLogger():
173+ policy.retry(conn, message)
174
175 # make sure the input message was dealt with:
176 self.assertTrue(message.acknowledged)

Subscribers

People subscribed via source and target branches

to all changes: