Merge lp:~cjwatson/python-oops-amqp/publisher-handle-channel-errors into lp:python-oops-amqp

Proposed by Colin Watson
Status: Needs review
Proposed branch: lp:~cjwatson/python-oops-amqp/publisher-handle-channel-errors
Merge into: lp:python-oops-amqp
Diff against target: 85 lines (+29/-2)
4 files modified
NEWS (+5/-0)
oops_amqp/publisher.py (+11/-0)
oops_amqp/tests/test_publisher.py (+8/-0)
oops_amqp/utils.py (+5/-2)
To merge this branch: bzr merge lp:~cjwatson/python-oops-amqp/publisher-handle-channel-errors
Reviewer: Launchpad code reviewers
Review status: Pending
Review via email: mp+367748@code.launchpad.net

Commit message

Handle AMQP channel errors (particularly NotFound) in the publisher.

Description of the change

amqp 2.4.0 included a change to drain events before publishing. This means that if we try to publish an OOPS to a nonexistent exchange, then some future publishing attempt will raise a NotFound exception, which is a channel error rather than a connection error and so wasn't previously handled.

To try to minimise confusion resulting from this (which can be considerable - it took me several days to track down what was happening in Launchpad's test suite), spend a short time waiting for a response from the broker after publishing an OOPS. This will typically allow us to detect channel errors immediately, which we now handle; even if we don't manage to handle them immediately, they'll be handled the next time we try to publish something on the same channel.
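
As a rough illustration of the approach described above (not the branch's actual code, which is in the preview diff), here is a self-contained Python sketch that uses stand-in classes instead of a real broker. FakeConnection, FakeChannel, publish_once, and the local ChannelError and NotFound classes are all hypothetical names introduced for this example; only the control flow is meant to mirror the change.

```python
import socket


class ChannelError(Exception):
    """Stand-in for amqp.exceptions.ChannelError (no real broker is used)."""


class NotFound(ChannelError):
    """Stand-in for the error a broker reports for a nonexistent exchange."""


class FakeConnection:
    """Hypothetical connection that holds a pending broker reply."""

    def __init__(self, known_exchanges):
        self.known_exchanges = set(known_exchanges)
        self.pending_error = None

    def drain_events(self, timeout=None):
        # Raise any error the fake broker has queued; otherwise behave as
        # if nothing arrived within the timeout.
        if self.pending_error is not None:
            error, self.pending_error = self.pending_error, None
            raise error
        raise socket.timeout()


class FakeChannel:
    """Hypothetical channel whose publish errors arrive asynchronously."""

    def __init__(self, connection):
        self.connection = connection
        self.is_open = True

    def basic_publish(self, message, exchange, routing_key=""):
        # Publishing never fails directly; a bad exchange only queues an
        # error that a later drain_events call will raise.
        if exchange not in self.connection.known_exchanges:
            self.connection.pending_error = NotFound(exchange)

    def close(self):
        self.is_open = False


def publish_once(channel, message, exchange):
    """Return True if the publish went through, False on a channel error."""
    try:
        channel.basic_publish(message, exchange)
        try:
            # Wait briefly for the broker to report a problem.
            channel.connection.drain_events(timeout=1)
        except socket.timeout:
            # Nothing came back in time: drop the channel so that the next
            # publish starts from a clean state.
            channel.close()
    except ChannelError:
        return False
    return True
```

With these stand-ins, publishing to a known exchange returns True after the simulated timeout, while publishing to an unknown exchange returns False because the queued NotFound is raised by the drain step rather than by some later, unrelated publish.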

It would be possible to handle channel errors more economically by just reopening a channel on the same connection rather than reopening the entire connection, but reopening the connection seems to work well enough for now.
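
The cheaper alternative mentioned above could look roughly like the following sketch, which discards only the failed channel and keeps the connection. This is not what the branch does; ChannelReopeningPublisher, FakeConnection, FakeChannel and the local ChannelError class are hypothetical stand-ins so that the example runs without a broker.

```python
class ChannelError(Exception):
    """Stand-in for amqp.exceptions.ChannelError (no real broker is used)."""


class FakeChannel:
    """Hypothetical channel that rejects unknown exchanges immediately."""

    def __init__(self, connection):
        self.connection = connection

    def basic_publish(self, message, exchange, routing_key=""):
        if exchange not in self.connection.known_exchanges:
            raise ChannelError("NOT_FOUND: %s" % exchange)


class FakeConnection:
    """Hypothetical connection that counts how many channels it has opened."""

    def __init__(self, known_exchanges):
        self.known_exchanges = set(known_exchanges)
        self.channels_opened = 0

    def channel(self):
        self.channels_opened += 1
        return FakeChannel(self)


class ChannelReopeningPublisher:
    """Reopen just the channel after a channel error, reusing the connection."""

    def __init__(self, connection_factory):
        self.connection_factory = connection_factory
        self.connection = None
        self.channel = None
        self.connections_opened = 0

    def get_channel(self):
        if self.connection is None:
            self.connection = self.connection_factory()
            self.connections_opened += 1
        if self.channel is None:
            self.channel = self.connection.channel()
        return self.channel

    def publish(self, message, exchange):
        channel = self.get_channel()
        try:
            channel.basic_publish(message, exchange)
        except ChannelError:
            # Only the channel is discarded; the connection stays cached.
            self.channel = None
            return False
        return True
```

In this sketch a failed publish followed by a successful one opens two channels but only one connection, which is the saving the paragraph above refers to.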

Colin Watson (cjwatson) wrote :

I do have some concerns about what would happen if the broker was up but slow and there was an OOPS storm: we'd incur an extra second in each case. I added this mainly because it was difficult to reproduce the problem in a test case otherwise. On the other hand, an extra second isn't terrible in that sort of situation, and might even allow the broker to recover more quickly by slowing down the input slightly.

Colin Watson (cjwatson) wrote :

I've run the full Launchpad test suite over this successfully.

Unmerged revisions

24. By Colin Watson

Handle AMQP channel errors (particularly NotFound) in the publisher.

amqp 2.4.0 included a change to drain events before publishing. This
means that if we try to publish an OOPS to a nonexistent exchange, then
some future publishing attempt will raise a NotFound exception, which is
a channel error rather than a connection error and so wasn't previously
handled.

To try to minimise confusion resulting from this (which can be
considerable - it took me several days to track down what was happening
in Launchpad's test suite), spend a short time waiting for a response
from the broker after publishing an OOPS. This will typically allow us
to detect channel errors immediately, which we now handle; even if we
don't manage to handle them immediately, they'll be handled the next
time we try to publish something on the same channel.

It would be possible to handle channel errors more economically by just
reopening a channel on the same connection rather than reopening the
entire connection, but reopening the connection seems to work well
enough for now.

Preview Diff

=== modified file 'NEWS'
--- NEWS 2018-05-09 12:31:40 +0000
+++ NEWS 2019-05-22 09:39:41 +0000
@@ -3,6 +3,11 @@
 
 Changes and improvements to oops-amqp, grouped by release.
 
+0.1.1
+-----
+
+* Handle AMQP channel errors (particularly NotFound) in the publisher.
+
 0.1.0
 -----
 

=== modified file 'oops_amqp/publisher.py'
--- oops_amqp/publisher.py 2018-03-12 11:48:55 +0000
+++ oops_amqp/publisher.py 2019-05-22 09:39:41 +0000
@@ -20,6 +20,7 @@
 __metaclass__ = type
 
 from hashlib import md5
+import socket
 from threading import local
 
 import amqp
@@ -96,6 +97,16 @@
         try:
             channel.basic_publish(
                 message, self.exchange_name, routing_key=self.routing_key)
+            # See if we get a NotFound error back straight away, to avoid
+            # the event still being on the queue and causing confusion when
+            # publishing future reports. Don't hang around too long waiting
+            # for it; if we don't hear anything back quickly, just close the
+            # connection and let the next call open a new one.
+            try:
+                channel.connection.drain_events(timeout=1)
+            except socket.timeout:
+                channel.close()
+                self.channels.channel = None
         except amqplib_error_types as e:
             self.channels.channel = None
             if is_amqplib_connection_error(e):

=== modified file 'oops_amqp/tests/test_publisher.py'
--- oops_amqp/tests/test_publisher.py 2018-03-12 11:48:55 +0000
+++ oops_amqp/tests/test_publisher.py 2019-05-22 09:39:41 +0000
@@ -137,3 +137,11 @@
         queue.channel = connection.channel()
         self.assertNotEqual([], publisher(oops))
 
+    def test_publish_amqp_exchange_not_found(self):
+        channel = self.useFixture(
+            ChannelFixture(self.connection_factory)).channel
+        self.useFixture(QueueFixture(channel, self.getUniqueString))
+        publisher = Publisher(
+            self.connection_factory, self.getUniqueString(), "")
+        oops = {'akey': 42}
+        self.assertEqual([], publisher(oops))

=== modified file 'oops_amqp/utils.py'
--- oops_amqp/utils.py 2018-03-12 11:48:55 +0000
+++ oops_amqp/utils.py 2019-05-22 09:39:41 +0000
@@ -19,7 +19,10 @@
 
 import socket
 
-from amqp.exceptions import ConnectionError
+from amqp.exceptions import (
+    ChannelError,
+    ConnectionError,
+    )
 
 __all__ = [
     'amqplib_error_types',
@@ -31,7 +34,7 @@
 # These exception types always indicate an AMQP connection error/closure.
 # However you should catch amqplib_error_types and post-filter with
 # is_amqplib_connection_error.
-amqplib_connection_errors = (socket.error, ConnectionError)
+amqplib_connection_errors = (socket.error, ConnectionError, ChannelError)
 # A tuple to reduce duplication in different code paths. Lists the types of
 # exceptions legitimately raised by amqplib when the AMQP server goes down.
 # Not all exceptions *will* be such errors - use is_amqplib_connection_error to
