Merge lp:~free.ekanayaka/txamqp/distinguish-channel-and-connection-errors into lp:txamqp

Proposed by Free Ekanayaka
Status: Merged
Merged at revision: 77
Proposed branch: lp:~free.ekanayaka/txamqp/distinguish-channel-and-connection-errors
Merge into: lp:txamqp
Diff against target: 450 lines (+129/-34)
8 files modified
setup.py (+1/-1)
src/txamqp/client.py (+22/-1)
src/txamqp/protocol.py (+13/-4)
src/txamqp/test/test_basic.py (+23/-7)
src/txamqp/test/test_broker.py (+3/-3)
src/txamqp/test/test_exchange.py (+26/-5)
src/txamqp/test/test_protocol.py (+29/-1)
src/txamqp/test/test_queue.py (+12/-12)
To merge this branch: bzr merge lp:~free.ekanayaka/txamqp/distinguish-channel-and-connection-errors
Reviewer Review Type Date Requested Status
Alberto Donato (community) Approve
Review via email: mp+311633@code.launchpad.net

Description of the change

Sub-class the generic txamqp.client.Closed error so it's easier to consuming code to distinguish between AMQP "channel" errors and "connection" errors (they have different semantics and happen under different circumstances, so you want to typically handle them differently).

Note that these are protocol-level errors, non transport-level ones (i.e. the TCP connection is still open after they happen).

While at it, some try/except blocks were migrated to py3-compatible syntax ("Exception, e:" vs "Exception as e:").

To post a comment you must log in.
90. By Free Ekanayaka

Use py3-compatible try/except syntax

Revision history for this message
Alberto Donato (ack) wrote :

nice, +1

just a few nits inline

review: Approve
Revision history for this message
Free Ekanayaka (free.ekanayaka) :

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'setup.py'
2--- setup.py 2013-03-03 13:40:48 +0000
3+++ setup.py 2016-11-23 16:38:57 +0000
4@@ -1,6 +1,6 @@
5 setupdict= {
6 'name': 'txAMQP',
7- 'version': '0.6.2',
8+ 'version': '0.7.0',
9 'author': 'Esteve Fernandez',
10 'author_email': 'esteve@fluidinfo.com',
11 'url': 'https://launchpad.net/txamqp',
12
13=== modified file 'src/txamqp/client.py'
14--- src/txamqp/client.py 2016-06-06 07:09:04 +0000
15+++ src/txamqp/client.py 2016-11-23 16:38:57 +0000
16@@ -2,12 +2,33 @@
17 from twisted.internet import defer
18 from txamqp.delegate import Delegate
19
20+
21 class Closed(Exception):
22- pass
23+ """Raised if either a channel or the whole connection got closed."""
24+
25+
26+class ChannelClosed(Closed):
27+ """Raised if a channel got closed because of a channel error.
28+
29+ This happens when the broker sends us a 'channel-close' method.
30+
31+ @see: The AMQP specification for possible channel error codes.
32+ """
33+
34+
35+class ConnectionClosed(Closed):
36+ """Raised if the connection got closed because of a connection error.
37+
38+ This happens when the broker sends us a 'connection-close' method.
39+
40+ @see: The AMQP specification for possible connection error codes.
41+ """
42+
43
44 class AlreadyFiredError(Exception):
45 pass
46
47+
48 class TwistedEvent(object):
49 """
50 An asynchronous event that is in one of three states:
51
52=== modified file 'src/txamqp/protocol.py'
53--- src/txamqp/protocol.py 2016-11-22 13:31:30 +0000
54+++ src/txamqp/protocol.py 2016-11-23 16:38:57 +0000
55@@ -10,7 +10,7 @@
56 from txamqp.message import Message
57 from txamqp.content import Content
58 from txamqp.queue import TimeoutDeferredQueue, Closed as QueueClosed
59-from txamqp.client import TwistedEvent, Closed
60+from txamqp.client import TwistedEvent, Closed, ConnectionClosed, ChannelClosed
61 from cStringIO import StringIO
62 import struct
63 from time import time
64@@ -66,7 +66,7 @@
65 @defer.inlineCallbacks
66 def invoke(self, method, args, content=None):
67 if self.closed:
68- raise Closed(self.reason)
69+ self._raiseClosed(self.reason)
70 frame = Frame(self.id, Method(method, *args))
71 self.outgoing.put(frame)
72
73@@ -96,7 +96,7 @@
74 raise ValueError(resp)
75 except QueueClosed, e:
76 if self.closed:
77- raise Closed(self.reason)
78+ self._raiseClosed(self.reason)
79 else:
80 raise e
81
82@@ -112,6 +112,15 @@
83 chunk = content.body[i:i + maxChunkSize]
84 queue.put(Frame(self.id, Body(chunk)))
85
86+ def _raiseClosed(self, reason):
87+ """Raise the appropriate Closed-based error for the given reason."""
88+ if isinstance(reason, Message):
89+ if reason.method.klass.name == "channel":
90+ raise ChannelClosed(reason)
91+ elif reason.method.klass.name == "connection":
92+ raise ConnectionClosed(reason)
93+ raise Closed(reason)
94+
95
96 class FrameReceiver(protocol.Protocol, basic._PauseableMixin):
97
98@@ -318,7 +327,7 @@
99 cleanly, by sending a "close" method and waiting for "close-ok". If
100 no reply is received within the given amount of seconds, the
101 transport will be forcely shutdown.
102- """
103+ """
104 if self.closed:
105 return
106
107
108=== modified file 'src/txamqp/test/test_basic.py'
109--- src/txamqp/test/test_basic.py 2016-06-08 13:10:58 +0000
110+++ src/txamqp/test/test_basic.py 2016-11-23 16:38:57 +0000
111@@ -16,10 +16,11 @@
112 # specific language governing permissions and limitations
113 # under the License.
114 #
115-from txamqp.client import Closed
116+from txamqp.client import ConnectionClosed, ChannelClosed
117 from txamqp.queue import Empty
118 from txamqp.content import Content
119-from txamqp.testlib import TestBase, supportedBrokers, QPID, OPENAMQ
120+from txamqp.testlib import (
121+ TestBase, supportedBrokers, QPID, OPENAMQ, RABBITMQ)
122
123 from twisted.internet.defer import inlineCallbacks
124
125@@ -68,7 +69,7 @@
126 try:
127 yield channel.basic_consume(consumer_tag="second", queue="test-queue-2")
128 self.fail("Expected consume request to fail due to previous exclusive consumer")
129- except Closed, e:
130+ except ChannelClosed as e:
131 self.assertChannelException(403, e.args[0])
132
133 #open new channel and cleanup last consumer:
134@@ -80,7 +81,7 @@
135 try:
136 yield channel.basic_consume(consumer_tag="second", queue="test-queue-2", exclusive=True)
137 self.fail("Expected exclusive consume request to fail due to previous consumer")
138- except Closed, e:
139+ except ChannelClosed as e:
140 self.assertChannelException(403, e.args[0])
141
142 @inlineCallbacks
143@@ -94,7 +95,7 @@
144 #queue specified but doesn't exist:
145 yield channel.basic_consume(queue="invalid-queue")
146 self.fail("Expected failure when consuming from non-existent queue")
147- except Closed, e:
148+ except ChannelClosed as e:
149 self.assertChannelException(404, e.args[0])
150
151 @supportedBrokers(QPID, OPENAMQ)
152@@ -109,9 +110,24 @@
153 #queue not specified and none previously declared for channel:
154 yield channel.basic_consume(queue="")
155 self.fail("Expected failure when consuming from unspecified queue")
156- except Closed, e:
157+ except ConnectionClosed, e:
158 self.assertConnectionException(530, e.args[0])
159
160+ @supportedBrokers(RABBITMQ)
161+ @inlineCallbacks
162+ def test_consume_queue_unspecified_rabbit(self):
163+ """
164+ C{basic_consume} fails with a channel exception with a C{404} code
165+ when no queue is specified.
166+ """
167+ channel = self.channel
168+ try:
169+ #queue not specified and none previously declared for channel:
170+ yield channel.basic_consume(queue="")
171+ self.fail("Expected failure when consuming from unspecified queue")
172+ except ChannelClosed as e:
173+ self.assertChannelException(404, e.args[0])
174+
175 @inlineCallbacks
176 def test_consume_unique_consumers(self):
177 """
178@@ -126,7 +142,7 @@
179 try:
180 yield channel.basic_consume(consumer_tag="first", queue="test-queue-3")
181 self.fail("Expected consume request to fail due to non-unique tag")
182- except Closed, e:
183+ except ConnectionClosed, e:
184 self.assertConnectionException(530, e.args[0])
185
186 @inlineCallbacks
187
188=== modified file 'src/txamqp/test/test_broker.py'
189--- src/txamqp/test/test_broker.py 2016-06-06 07:09:04 +0000
190+++ src/txamqp/test/test_broker.py 2016-11-23 16:38:57 +0000
191@@ -16,7 +16,7 @@
192 # specific language governing permissions and limitations
193 # under the License.
194 #
195-from txamqp.client import Closed
196+from txamqp.client import ConnectionClosed
197 from txamqp.queue import Empty
198 from txamqp.content import Content
199 from txamqp.testlib import TestBase, supportedBrokers, QPID, OPENAMQ
200@@ -132,7 +132,7 @@
201 try:
202 yield channel.queue_declare(exclusive=True)
203 self.fail("Expected error on queue_declare for invalid channel")
204- except Closed, e:
205+ except ConnectionClosed as e:
206 self.assertConnectionException(504, e.args[0])
207
208 @inlineCallbacks
209@@ -143,7 +143,7 @@
210 try:
211 yield channel.queue_declare(exclusive=True)
212 self.fail("Expected error on queue_declare for closed channel")
213- except Closed, e:
214+ except ConnectionClosed as e:
215 self.assertConnectionException(504, e.args[0])
216
217 @supportedBrokers(QPID, OPENAMQ)
218
219=== modified file 'src/txamqp/test/test_exchange.py'
220--- src/txamqp/test/test_exchange.py 2011-02-08 12:34:54 +0000
221+++ src/txamqp/test/test_exchange.py 2016-11-23 16:38:57 +0000
222@@ -24,9 +24,9 @@
223 """
224
225 from txamqp.queue import Empty
226-from txamqp.testlib import TestBase, supportedBrokers, QPID, OPENAMQ
227+from txamqp.testlib import TestBase, supportedBrokers, QPID, OPENAMQ, RABBITMQ
228 from txamqp.content import Content
229-from txamqp.client import Closed
230+from txamqp.client import ChannelClosed, ConnectionClosed
231
232 from twisted.internet.defer import inlineCallbacks
233
234@@ -238,7 +238,7 @@
235 try:
236 yield self.channel.exchange_declare(exchange="humpty_dumpty", passive=True)
237 self.fail("Expected 404 for passive declaration of unknown exchange.")
238- except Closed, e:
239+ except ChannelClosed as e:
240 self.assertChannelException(404, e.args[0])
241
242
243@@ -334,7 +334,7 @@
244 try:
245 yield self.channel.exchange_declare(exchange="test_type_not_known_exchange", type="invalid_type")
246 self.fail("Expected 503 for declaration of unknown exchange type.")
247- except Closed, e:
248+ except ConnectionClosed as e:
249 self.assertConnectionException(503, e.args[0])
250
251 @supportedBrokers(QPID, OPENAMQ)
252@@ -344,7 +344,7 @@
253 try:
254 yield self.channel.exchange_declare(exchange="test_different_declared_type_exchange", type="topic")
255 self.fail("Expected 530 for redeclaration of exchange with different type.")
256- except Closed, e:
257+ except ConnectionClosed as e:
258 self.assertConnectionException(530, e.args[0])
259 #cleanup
260 other = yield self.connect()
261@@ -352,3 +352,24 @@
262 yield c2.channel_open()
263 yield c2.exchange_delete(exchange="test_different_declared_type_exchange")
264
265+ @supportedBrokers(RABBITMQ)
266+ @inlineCallbacks
267+ def testDifferentDeclaredTypeRabbit(self):
268+ """Test redeclaration of exchange with different type on RabbitMQ."""
269+ yield self.channel.exchange_declare(
270+ exchange="test_different_declared_type_exchange", type="direct")
271+ try:
272+ yield self.channel.exchange_declare(
273+ exchange="test_different_declared_type_exchange", type="topic")
274+ self.fail(
275+ "Expected 406 for redeclaration of exchange with "
276+ "different type.")
277+ except ChannelClosed as e:
278+ self.assertChannelException(406, e.args[0])
279+ finally:
280+ # cleanup
281+ other = yield self.connect()
282+ c2 = yield other.channel(1)
283+ yield c2.channel_open()
284+ yield c2.exchange_delete(
285+ exchange="test_different_declared_type_exchange")
286
287=== modified file 'src/txamqp/test/test_protocol.py'
288--- src/txamqp/test/test_protocol.py 2016-06-06 07:09:04 +0000
289+++ src/txamqp/test/test_protocol.py 2016-11-23 16:38:57 +0000
290@@ -4,7 +4,8 @@
291 from twisted.logger import Logger
292
293 from txamqp.protocol import AMQClient
294-from txamqp.client import TwistedDelegate, Closed
295+from txamqp.client import (
296+ TwistedDelegate, Closed, ConnectionClosed, ChannelClosed)
297 from txamqp.testing import AMQPump
298 from txamqp.spec import DEFAULT_SPEC, load
299 from txamqp.queue import Closed as QueueClosed
300@@ -34,6 +35,14 @@
301 channel0 = self.successResultOf(self.protocol.channel(0))
302 self.assertTrue(channel0.closed)
303
304+ def test_connection_close_raises_error(self):
305+ """Test receiving a connection-close method raises ConnectionClosed."""
306+ channel = self.successResultOf(self.protocol.channel(0))
307+ d = channel.basic_consume(queue="test-queue")
308+ self.transport.channel(0).connection_close(reply_code=320)
309+ failure = self.failureResultOf(d)
310+ self.assertIsInstance(failure.value, ConnectionClosed)
311+
312 def test_close(self):
313 """Test explicitely closing a client."""
314 d = self.protocol.close()
315@@ -106,6 +115,25 @@
316 self.assertIsInstance(failure.value, Closed)
317 self.assertIsInstance(failure.value.args[0].value, ConnectionLost)
318
319+ def test_channel_close(self):
320+ """Test receiving a channel-close method raises ChannelClosed."""
321+ channel = self.successResultOf(self.protocol.channel(0))
322+ d = channel.basic_consume(queue="non-existing-queue")
323+ self.transport.channel(0).channel_close(reply_code=404)
324+ failure = self.failureResultOf(d)
325+ self.assertIsInstance(failure.value, ChannelClosed)
326+
327+ def test_sending_method_on_closed_channel(self):
328+ """Sending a method on a closed channel fails immediately."""
329+ channel = self.successResultOf(self.protocol.channel(0))
330+ self.transport.channel(0).connection_close(reply_code=320)
331+ self.transport.outgoing.clear()
332+ d = channel.basic_consume(queue="test-queue")
333+ # No frames were sent
334+ self.assertEqual({}, self.transport.outgoing)
335+ failure = self.failureResultOf(d)
336+ self.assertIsInstance(failure.value, ConnectionClosed)
337+
338 def test_disconnected_event(self):
339 """Test disconnected event fired after the connection is lost."""
340 deferred = self.protocol.disconnected.wait()
341
342=== modified file 'src/txamqp/test/test_queue.py'
343--- src/txamqp/test/test_queue.py 2016-05-23 14:41:22 +0000
344+++ src/txamqp/test/test_queue.py 2016-11-23 16:38:57 +0000
345@@ -64,7 +64,7 @@
346 #queue specified but doesn't exist:
347 yield channel.queue_purge(queue="invalid-queue")
348 self.fail("Expected failure when purging non-existent queue")
349- except Closed, e:
350+ except Closed as e:
351 self.assertChannelException(404, e.args[0])
352
353 channel = yield self.client.channel(3)
354@@ -73,7 +73,7 @@
355 #queue not specified and none previously declared for channel:
356 yield channel.queue_purge()
357 self.fail("Expected failure when purging unspecified queue")
358- except Closed, e:
359+ except Closed as e:
360 self.assertConnectionException(530, e.args[0])
361
362 #cleanup
363@@ -100,7 +100,7 @@
364 #other connection should not be allowed to declare this:
365 yield c2.queue_declare(queue="exclusive-queue", exclusive="True")
366 self.fail("Expected second exclusive queue_declare to raise a channel exception")
367- except Closed, e:
368+ except Closed as e:
369 self.assertChannelException(405, e.args[0])
370
371 @inlineCallbacks
372@@ -116,7 +116,7 @@
373 #other connection should not be allowed to declare this:
374 yield channel.queue_declare(queue="passive-queue-2", passive="True")
375 self.fail("Expected passive declaration of non-existant queue to raise a channel exception")
376- except Closed, e:
377+ except Closed as e:
378 self.assertChannelException(404, e.args[0])
379
380 @inlineCallbacks
381@@ -140,7 +140,7 @@
382 try:
383 yield channel.queue_bind(queue="queue-1", exchange="an-invalid-exchange", routing_key="key1")
384 self.fail("Expected bind to non-existant exchange to fail")
385- except Closed, e:
386+ except Closed as e:
387 self.assertChannelException(404, e.args[0])
388
389 #need to reopen a channel:
390@@ -151,7 +151,7 @@
391 try:
392 yield channel.queue_bind(queue="queue-2", exchange="amq.direct", routing_key="key1")
393 self.fail("Expected bind of non-existant queue to fail")
394- except Closed, e:
395+ except Closed as e:
396 self.assertChannelException(404, e.args[0])
397
398 @inlineCallbacks
399@@ -172,7 +172,7 @@
400 try:
401 yield channel.queue_declare(queue="delete-me", passive="True")
402 self.fail("Queue has not been deleted")
403- except Closed, e:
404+ except Closed as e:
405 self.assertChannelException(404, e.args[0])
406
407 @inlineCallbacks
408@@ -200,7 +200,7 @@
409 result = yield channel.queue_delete(queue="i-dont-exist", if_empty="True")
410 print result
411 self.fail("Expected delete of non-existant queue to fail")
412- except Closed, e:
413+ except Closed as e:
414 self.assertChannelException(404, e.args[0])
415
416 @inlineCallbacks
417@@ -219,7 +219,7 @@
418 try:
419 yield channel.queue_delete(queue="delete-me-2", if_empty="True")
420 self.fail("Expected delete if_empty to fail for non-empty queue")
421- except Closed, e:
422+ except Closed as e:
423 self.assertChannelException(406, e.args[0])
424
425 #need new channel now:
426@@ -240,7 +240,7 @@
427 try:
428 yield channel.queue_declare(queue="delete-me-2", passive="True")
429 self.fail("Queue has not been deleted")
430- except Closed, e:
431+ except Closed as e:
432 self.assertChannelException(404, e.args[0])
433
434 @inlineCallbacks
435@@ -262,7 +262,7 @@
436 try:
437 yield channel2.queue_delete(queue="delete-me-3", if_unused="True")
438 self.fail("Expected delete if_unused to fail for queue with existing consumer")
439- except Closed, e:
440+ except Closed as e:
441 self.assertChannelException(406, e.args[0])
442
443
444@@ -272,7 +272,7 @@
445 try:
446 yield channel.queue_declare(queue="delete-me-3", passive="True")
447 self.fail("Queue has not been deleted")
448- except Closed, e:
449+ except Closed as e:
450 self.assertChannelException(404, e.args[0])
451
452

Subscribers

People subscribed via source and target branches

to status/vote changes: