Merge lp:~lifeless/python-oops-amqp/0.0.4 into lp:python-oops-amqp

Proposed by Robert Collins
Status: Merged
Merged at revision: 6
Proposed branch: lp:~lifeless/python-oops-amqp/0.0.4
Merge into: lp:python-oops-amqp
Diff against target: 181 lines (+39/-12)
7 files modified
NEWS (+10/-0)
oops_amqp/__init__.py (+1/-1)
oops_amqp/publisher.py (+6/-4)
oops_amqp/receiver.py (+4/-3)
oops_amqp/tests/test_receiver.py (+2/-1)
oops_amqp/utils.py (+15/-2)
setup.py (+1/-1)
To merge this branch: bzr merge lp:~lifeless/python-oops-amqp/0.0.4
Reviewer Review Type Date Requested Status
Steve Kowalik (community) code Approve
Review via email: mp+80865@code.launchpad.net

Description of the change

More robustness discovered by observing repeated stop-start cycles of amqp2disk on carob. I've filed a bug about the basic_cancel blowing up upstream, but the guard isn't wrong, so there is no need to wait or fudge things.

The basic_cancel change is tested (see the stub change) but the new exception isn't, and is still nasty to try to do so.

To post a comment you must log in.
Revision history for this message
Steve Kowalik (stevenk) wrote :

I'm not sure about the wisdom of importing tuples of exceptions to catch -- it strikes me as messy. However, I can't think of a better solution.

review: Approve (code)
Revision history for this message
Robert Collins (lifeless) wrote :

Thanks. Its a reasonable idiom I think, and there isn't a better way that I know of.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'NEWS'
2--- NEWS 2011-10-27 03:36:02 +0000
3+++ NEWS 2011-11-01 04:14:23 +0000
4@@ -6,6 +6,16 @@
5 NEXT
6 ----
7
8+0.0.4
9+-----
10+
11+* Do not attempt operations on closed amqplib channels.
12+ (Robert Collins, #884539)
13+
14+* Catch AMQPConnectionException in addition to IOError and socket.error as that
15+ is raised by amqplib when it has the chance to read the broker shutdown
16+ warning in wait(). (Robert Collins, #884540)
17+
18 0.0.3
19 -----
20
21
22=== modified file 'oops_amqp/__init__.py'
23--- oops_amqp/__init__.py 2011-10-27 03:36:02 +0000
24+++ oops_amqp/__init__.py 2011-11-01 04:14:23 +0000
25@@ -97,7 +97,7 @@
26 # established at this point, and setup.py will use a version of next-$(revno).
27 # If the releaselevel is 'final', then the tarball will be major.minor.micro.
28 # Otherwise it is major.minor.micro~$(revno).
29-__version__ = (0, 0, 3, 'beta', 0)
30+__version__ = (0, 0, 4, 'beta', 0)
31
32 __all__ = [
33 'Publisher',
34
35=== modified file 'oops_amqp/publisher.py'
36--- oops_amqp/publisher.py 2011-10-27 03:36:02 +0000
37+++ oops_amqp/publisher.py 2011-11-01 04:14:23 +0000
38@@ -19,13 +19,15 @@
39 __metaclass__ = type
40
41 from hashlib import md5
42-import socket
43 from threading import local
44
45 from amqplib import client_0_8 as amqp
46 from bson import dumps
47
48-from utils import is_amqplib_connection_error
49+from utils import (
50+ amqplib_error_types,
51+ is_amqplib_connection_error,
52+ )
53
54 __all__ = [
55 'Publisher',
56@@ -63,7 +65,7 @@
57 if getattr(self.channels, 'channel', None) is None:
58 try:
59 self.channels.channel = self.connection_factory().channel()
60- except (socket.error, IOError), e:
61+ except amqplib_error_types, e:
62 if is_amqplib_connection_error(e):
63 # Could not connect
64 return None
65@@ -91,7 +93,7 @@
66 try:
67 channel.basic_publish(
68 message, self.exchange_name, routing_key=self.routing_key)
69- except (socket.error, IOError), e:
70+ except amqplib_error_types, e:
71 self.channels.channel = None
72 if is_amqplib_connection_error(e):
73 # Could not connect / interrupted connection
74
75=== modified file 'oops_amqp/receiver.py'
76--- oops_amqp/receiver.py 2011-10-27 03:36:02 +0000
77+++ oops_amqp/receiver.py 2011-11-01 04:14:23 +0000
78@@ -18,12 +18,12 @@
79
80 __metaclass__ = type
81
82-import socket
83 import time
84
85 import bson
86
87 from utils import (
88+ amqplib_error_types,
89 close_ignoring_EPIPE,
90 is_amqplib_connection_error,
91 )
92@@ -84,7 +84,7 @@
93 (not self.went_bad or time.time() < self.went_bad + 120)):
94 try:
95 self._run_forever()
96- except (socket.error, IOError), e:
97+ except amqplib_error_types, e:
98 if not is_amqplib_connection_error(e):
99 # Something unknown went wrong.
100 raise
101@@ -109,7 +109,8 @@
102 if self.stopping:
103 break
104 finally:
105- self.channel.basic_cancel(self.consume_tag)
106+ if self.channel.is_open:
107+ self.channel.basic_cancel(self.consume_tag)
108 finally:
109 close_ignoring_EPIPE(self.channel)
110 finally:
111
112=== modified file 'oops_amqp/tests/test_receiver.py'
113--- oops_amqp/tests/test_receiver.py 2011-10-18 03:29:27 +0000
114+++ oops_amqp/tests/test_receiver.py 2011-11-01 04:14:23 +0000
115@@ -98,6 +98,7 @@
116 class FakeChannel:
117 def __init__(self, calls):
118 self.calls = calls
119+ self.is_open = True
120 def basic_consume(self, queue_name, callback=None):
121 self.calls.append(('basic_consume', queue_name, callback))
122 return 'tag'
123@@ -108,7 +109,7 @@
124 def basic_cancel(self, tag):
125 self.calls.append(('basic_cancel', tag))
126 def close(self):
127- pass
128+ self.is_open = False
129 class FakeConnection:
130 def channel(self):
131 return FakeChannel(calls)
132
133=== modified file 'oops_amqp/utils.py'
134--- oops_amqp/utils.py 2011-10-27 03:36:02 +0000
135+++ oops_amqp/utils.py 2011-11-01 04:14:23 +0000
136@@ -19,12 +19,25 @@
137 import errno
138 import socket
139
140+from amqplib.client_0_8.exceptions import AMQPConnectionException
141+
142 __all__ = [
143+ 'amqplib_error_types',
144 'close_ignoring_EPIPE',
145+ 'is_amqplib_connection_error',
146 'is_amqplib_ioerror',
147- 'is_amqplib_connection_error',
148 ]
149
150+# These exception types always indicate an AMQP connection error/closure.
151+# However you should catch amqplib_error_types and post-filter with
152+# is_amqplib_connection_error.
153+amqplib_connection_errors = (socket.error, AMQPConnectionException)
154+# A tuple to reduce duplication in different code paths. Lists the types of
155+# exceptions legitimately raised by amqplib when the AMQP server goes down.
156+# Not all exceptions *will* be such errors - use is_amqplib_connection_error to
157+# do a second-stage filter after catching the exception.
158+amqplib_error_types = amqplib_connection_errors + (IOError,)
159+
160
161 def close_ignoring_EPIPE(closable):
162 try:
163@@ -42,4 +55,4 @@
164
165 def is_amqplib_connection_error(e):
166 """Return True if e was (probably) raised due to a connection issue."""
167- return isinstance(e, socket.error) or is_amqplib_ioerror(e)
168+ return isinstance(e, amqplib_connection_errors) or is_amqplib_ioerror(e)
169
170=== modified file 'setup.py'
171--- setup.py 2011-10-27 03:36:02 +0000
172+++ setup.py 2011-11-01 04:14:23 +0000
173@@ -23,7 +23,7 @@
174 os.path.join(os.path.dirname(__file__), 'README'), 'rb').read()
175
176 setup(name="oops_amqp",
177- version="0.0.3",
178+ version="0.0.4",
179 description=\
180 "OOPS AMQP transport.",
181 long_description=description,

Subscribers

People subscribed via source and target branches

to all changes: