Merge lp:~lifeless/python-oops-amqp/0.0.4 into lp:python-oops-amqp

Proposed by Robert Collins
Status: Merged
Merged at revision: 6
Proposed branch: lp:~lifeless/python-oops-amqp/0.0.4
Merge into: lp:python-oops-amqp
Diff against target: 181 lines (+39/-12)
7 files modified
NEWS (+10/-0)
oops_amqp/__init__.py (+1/-1)
oops_amqp/publisher.py (+6/-4)
oops_amqp/receiver.py (+4/-3)
oops_amqp/tests/test_receiver.py (+2/-1)
oops_amqp/utils.py (+15/-2)
setup.py (+1/-1)
To merge this branch: bzr merge lp:~lifeless/python-oops-amqp/0.0.4
Reviewer Review Type Date Requested Status
Steve Kowalik (community) code Approve
Review via email: mp+80865@code.launchpad.net

Description of the change

More robustness fixes, discovered by observing repeated stop-start cycles of amqp2disk on carob. I've filed a bug upstream about basic_cancel blowing up, but the guard isn't wrong, so there is no need to wait or fudge things.

The basic_cancel change is tested (see the stub change), but the new exception isn't, and it is still nasty to try to test.

To post a comment you must log in.
Revision history for this message
Steve Kowalik (stevenk) wrote :

I'm not sure about the wisdom of importing tuples of exceptions to catch -- it strikes me as messy. However, I can't think of a better solution.

review: Approve (code)
Revision history for this message
Robert Collins (lifeless) wrote :

Thanks. It's a reasonable idiom, I think, and there isn't a better way that I know of.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'NEWS'
--- NEWS 2011-10-27 03:36:02 +0000
+++ NEWS 2011-11-01 04:14:23 +0000
@@ -6,6 +6,16 @@
 NEXT
 ----
 
+0.0.4
+-----
+
+* Do not attempt operations on closed amqplib channels.
+  (Robert Collins, #884539)
+
+* Catch AMQPConnectionException in addition to IOError and socket.error as that
+  is raised by amqplib when it has the chance to read the broker shutdown
+  warning in wait(). (Robert Collins, #884540)
+
 0.0.3
 -----
 

=== modified file 'oops_amqp/__init__.py'
--- oops_amqp/__init__.py 2011-10-27 03:36:02 +0000
+++ oops_amqp/__init__.py 2011-11-01 04:14:23 +0000
@@ -97,7 +97,7 @@
 # established at this point, and setup.py will use a version of next-$(revno).
 # If the releaselevel is 'final', then the tarball will be major.minor.micro.
 # Otherwise it is major.minor.micro~$(revno).
-__version__ = (0, 0, 3, 'beta', 0)
+__version__ = (0, 0, 4, 'beta', 0)
 
 __all__ = [
     'Publisher',

=== modified file 'oops_amqp/publisher.py'
--- oops_amqp/publisher.py 2011-10-27 03:36:02 +0000
+++ oops_amqp/publisher.py 2011-11-01 04:14:23 +0000
@@ -19,13 +19,15 @@
19__metaclass__ = type19__metaclass__ = type
2020
21from hashlib import md521from hashlib import md5
22import socket
23from threading import local22from threading import local
2423
25from amqplib import client_0_8 as amqp24from amqplib import client_0_8 as amqp
26from bson import dumps25from bson import dumps
2726
28from utils import is_amqplib_connection_error27from utils import (
28 amqplib_error_types,
29 is_amqplib_connection_error,
30 )
2931
30__all__ = [32__all__ = [
31 'Publisher',33 'Publisher',
@@ -63,7 +65,7 @@
63 if getattr(self.channels, 'channel', None) is None:65 if getattr(self.channels, 'channel', None) is None:
64 try:66 try:
65 self.channels.channel = self.connection_factory().channel()67 self.channels.channel = self.connection_factory().channel()
66 except (socket.error, IOError), e:68 except amqplib_error_types, e:
67 if is_amqplib_connection_error(e):69 if is_amqplib_connection_error(e):
68 # Could not connect70 # Could not connect
69 return None71 return None
@@ -91,7 +93,7 @@
91 try:93 try:
92 channel.basic_publish(94 channel.basic_publish(
93 message, self.exchange_name, routing_key=self.routing_key)95 message, self.exchange_name, routing_key=self.routing_key)
94 except (socket.error, IOError), e:96 except amqplib_error_types, e:
95 self.channels.channel = None97 self.channels.channel = None
96 if is_amqplib_connection_error(e):98 if is_amqplib_connection_error(e):
97 # Could not connect / interrupted connection99 # Could not connect / interrupted connection
98100
=== modified file 'oops_amqp/receiver.py'
--- oops_amqp/receiver.py 2011-10-27 03:36:02 +0000
+++ oops_amqp/receiver.py 2011-11-01 04:14:23 +0000
@@ -18,12 +18,12 @@
1818
19__metaclass__ = type19__metaclass__ = type
2020
21import socket
22import time21import time
2322
24import bson23import bson
2524
26from utils import (25from utils import (
26 amqplib_error_types,
27 close_ignoring_EPIPE,27 close_ignoring_EPIPE,
28 is_amqplib_connection_error,28 is_amqplib_connection_error,
29 )29 )
@@ -84,7 +84,7 @@
84 (not self.went_bad or time.time() < self.went_bad + 120)):84 (not self.went_bad or time.time() < self.went_bad + 120)):
85 try:85 try:
86 self._run_forever()86 self._run_forever()
87 except (socket.error, IOError), e:87 except amqplib_error_types, e:
88 if not is_amqplib_connection_error(e):88 if not is_amqplib_connection_error(e):
89 # Something unknown went wrong.89 # Something unknown went wrong.
90 raise90 raise
@@ -109,7 +109,8 @@
109 if self.stopping:109 if self.stopping:
110 break110 break
111 finally:111 finally:
112 self.channel.basic_cancel(self.consume_tag)112 if self.channel.is_open:
113 self.channel.basic_cancel(self.consume_tag)
113 finally:114 finally:
114 close_ignoring_EPIPE(self.channel)115 close_ignoring_EPIPE(self.channel)
115 finally:116 finally:
116117
=== modified file 'oops_amqp/tests/test_receiver.py'
--- oops_amqp/tests/test_receiver.py 2011-10-18 03:29:27 +0000
+++ oops_amqp/tests/test_receiver.py 2011-11-01 04:14:23 +0000
@@ -98,6 +98,7 @@
98 class FakeChannel:98 class FakeChannel:
99 def __init__(self, calls):99 def __init__(self, calls):
100 self.calls = calls100 self.calls = calls
101 self.is_open = True
101 def basic_consume(self, queue_name, callback=None):102 def basic_consume(self, queue_name, callback=None):
102 self.calls.append(('basic_consume', queue_name, callback))103 self.calls.append(('basic_consume', queue_name, callback))
103 return 'tag'104 return 'tag'
@@ -108,7 +109,7 @@
108 def basic_cancel(self, tag):109 def basic_cancel(self, tag):
109 self.calls.append(('basic_cancel', tag))110 self.calls.append(('basic_cancel', tag))
110 def close(self):111 def close(self):
111 pass112 self.is_open = False
112 class FakeConnection:113 class FakeConnection:
113 def channel(self):114 def channel(self):
114 return FakeChannel(calls)115 return FakeChannel(calls)
115116
=== modified file 'oops_amqp/utils.py'
--- oops_amqp/utils.py 2011-10-27 03:36:02 +0000
+++ oops_amqp/utils.py 2011-11-01 04:14:23 +0000
@@ -19,12 +19,25 @@
 import errno
 import socket
 
+from amqplib.client_0_8.exceptions import AMQPConnectionException
+
 __all__ = [
+    'amqplib_error_types',
     'close_ignoring_EPIPE',
+    'is_amqplib_connection_error',
     'is_amqplib_ioerror',
-    'is_amqplib_connection_error',
     ]
 
+# These exception types always indicate an AMQP connection error/closure.
+# However you should catch amqplib_error_types and post-filter with
+# is_amqplib_connection_error.
+amqplib_connection_errors = (socket.error, AMQPConnectionException)
+# A tuple to reduce duplication in different code paths. Lists the types of
+# exceptions legitimately raised by amqplib when the AMQP server goes down.
+# Not all exceptions *will* be such errors - use is_amqplib_connection_error to
+# do a second-stage filter after catching the exception.
+amqplib_error_types = amqplib_connection_errors + (IOError,)
+
 
 def close_ignoring_EPIPE(closable):
     try:
@@ -42,4 +55,4 @@
 
 def is_amqplib_connection_error(e):
     """Return True if e was (probably) raised due to a connection issue."""
-    return isinstance(e, socket.error) or is_amqplib_ioerror(e)
+    return isinstance(e, amqplib_connection_errors) or is_amqplib_ioerror(e)

=== modified file 'setup.py'
--- setup.py 2011-10-27 03:36:02 +0000
+++ setup.py 2011-11-01 04:14:23 +0000
@@ -23,7 +23,7 @@
23 os.path.join(os.path.dirname(__file__), 'README'), 'rb').read()23 os.path.join(os.path.dirname(__file__), 'README'), 'rb').read()
2424
25setup(name="oops_amqp",25setup(name="oops_amqp",
26 version="0.0.3",26 version="0.0.4",
27 description=\27 description=\
28 "OOPS AMQP transport.",28 "OOPS AMQP transport.",
29 long_description=description,29 long_description=description,

Subscribers

People subscribed via source and target branches

to all changes: