Merge lp:~free.ekanayaka/txamqp/distinguish-channel-and-connection-errors into lp:txamqp
- distinguish-channel-and-connection-errors
- Merge into trunk
Proposed by
Free Ekanayaka
Status: | Merged |
---|---|
Merged at revision: | 77 |
Proposed branch: | lp:~free.ekanayaka/txamqp/distinguish-channel-and-connection-errors |
Merge into: | lp:txamqp |
Diff against target: |
450 lines (+129/-34) 8 files modified
setup.py (+1/-1) src/txamqp/client.py (+22/-1) src/txamqp/protocol.py (+13/-4) src/txamqp/test/test_basic.py (+23/-7) src/txamqp/test/test_broker.py (+3/-3) src/txamqp/test/test_exchange.py (+26/-5) src/txamqp/test/test_protocol.py (+29/-1) src/txamqp/test/test_queue.py (+12/-12) |
To merge this branch: | bzr merge lp:~free.ekanayaka/txamqp/distinguish-channel-and-connection-errors |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Alberto Donato (community) | Approve | ||
Review via email: mp+311633@code.launchpad.net |
Commit message
Description of the change
Sub-class the generic txamqp.
Note that these are protocol-level errors, non transport-level ones (i.e. the TCP connection is still open after they happen).
While at it, some try/except blocks were migrated to py3-compatible syntax ("Exception, e:" vs "Exception as e:").
To post a comment you must log in.
- 90. By Free Ekanayaka
-
Use py3-compatible try/except syntax
Revision history for this message
Free Ekanayaka (free.ekanayaka) : | # |
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === modified file 'setup.py' |
2 | --- setup.py 2013-03-03 13:40:48 +0000 |
3 | +++ setup.py 2016-11-23 16:38:57 +0000 |
4 | @@ -1,6 +1,6 @@ |
5 | setupdict= { |
6 | 'name': 'txAMQP', |
7 | - 'version': '0.6.2', |
8 | + 'version': '0.7.0', |
9 | 'author': 'Esteve Fernandez', |
10 | 'author_email': 'esteve@fluidinfo.com', |
11 | 'url': 'https://launchpad.net/txamqp', |
12 | |
13 | === modified file 'src/txamqp/client.py' |
14 | --- src/txamqp/client.py 2016-06-06 07:09:04 +0000 |
15 | +++ src/txamqp/client.py 2016-11-23 16:38:57 +0000 |
16 | @@ -2,12 +2,33 @@ |
17 | from twisted.internet import defer |
18 | from txamqp.delegate import Delegate |
19 | |
20 | + |
21 | class Closed(Exception): |
22 | - pass |
23 | + """Raised if either a channel or the whole connection got closed.""" |
24 | + |
25 | + |
26 | +class ChannelClosed(Closed): |
27 | + """Raised if a channel got closed because of a channel error. |
28 | + |
29 | + This happens when the broker sends us a 'channel-close' method. |
30 | + |
31 | + @see: The AMQP specification for possible channel error codes. |
32 | + """ |
33 | + |
34 | + |
35 | +class ConnectionClosed(Closed): |
36 | + """Raised if the connection got closed because of a connection error. |
37 | + |
38 | + This happens when the broker sends us a 'connection-close' method. |
39 | + |
40 | + @see: The AMQP specification for possible connection error codes. |
41 | + """ |
42 | + |
43 | |
44 | class AlreadyFiredError(Exception): |
45 | pass |
46 | |
47 | + |
48 | class TwistedEvent(object): |
49 | """ |
50 | An asynchronous event that is in one of three states: |
51 | |
52 | === modified file 'src/txamqp/protocol.py' |
53 | --- src/txamqp/protocol.py 2016-11-22 13:31:30 +0000 |
54 | +++ src/txamqp/protocol.py 2016-11-23 16:38:57 +0000 |
55 | @@ -10,7 +10,7 @@ |
56 | from txamqp.message import Message |
57 | from txamqp.content import Content |
58 | from txamqp.queue import TimeoutDeferredQueue, Closed as QueueClosed |
59 | -from txamqp.client import TwistedEvent, Closed |
60 | +from txamqp.client import TwistedEvent, Closed, ConnectionClosed, ChannelClosed |
61 | from cStringIO import StringIO |
62 | import struct |
63 | from time import time |
64 | @@ -66,7 +66,7 @@ |
65 | @defer.inlineCallbacks |
66 | def invoke(self, method, args, content=None): |
67 | if self.closed: |
68 | - raise Closed(self.reason) |
69 | + self._raiseClosed(self.reason) |
70 | frame = Frame(self.id, Method(method, *args)) |
71 | self.outgoing.put(frame) |
72 | |
73 | @@ -96,7 +96,7 @@ |
74 | raise ValueError(resp) |
75 | except QueueClosed, e: |
76 | if self.closed: |
77 | - raise Closed(self.reason) |
78 | + self._raiseClosed(self.reason) |
79 | else: |
80 | raise e |
81 | |
82 | @@ -112,6 +112,15 @@ |
83 | chunk = content.body[i:i + maxChunkSize] |
84 | queue.put(Frame(self.id, Body(chunk))) |
85 | |
86 | + def _raiseClosed(self, reason): |
87 | + """Raise the appropriate Closed-based error for the given reason.""" |
88 | + if isinstance(reason, Message): |
89 | + if reason.method.klass.name == "channel": |
90 | + raise ChannelClosed(reason) |
91 | + elif reason.method.klass.name == "connection": |
92 | + raise ConnectionClosed(reason) |
93 | + raise Closed(reason) |
94 | + |
95 | |
96 | class FrameReceiver(protocol.Protocol, basic._PauseableMixin): |
97 | |
98 | @@ -318,7 +327,7 @@ |
99 | cleanly, by sending a "close" method and waiting for "close-ok". If |
100 | no reply is received within the given amount of seconds, the |
101 | transport will be forcely shutdown. |
102 | - """ |
103 | + """ |
104 | if self.closed: |
105 | return |
106 | |
107 | |
108 | === modified file 'src/txamqp/test/test_basic.py' |
109 | --- src/txamqp/test/test_basic.py 2016-06-08 13:10:58 +0000 |
110 | +++ src/txamqp/test/test_basic.py 2016-11-23 16:38:57 +0000 |
111 | @@ -16,10 +16,11 @@ |
112 | # specific language governing permissions and limitations |
113 | # under the License. |
114 | # |
115 | -from txamqp.client import Closed |
116 | +from txamqp.client import ConnectionClosed, ChannelClosed |
117 | from txamqp.queue import Empty |
118 | from txamqp.content import Content |
119 | -from txamqp.testlib import TestBase, supportedBrokers, QPID, OPENAMQ |
120 | +from txamqp.testlib import ( |
121 | + TestBase, supportedBrokers, QPID, OPENAMQ, RABBITMQ) |
122 | |
123 | from twisted.internet.defer import inlineCallbacks |
124 | |
125 | @@ -68,7 +69,7 @@ |
126 | try: |
127 | yield channel.basic_consume(consumer_tag="second", queue="test-queue-2") |
128 | self.fail("Expected consume request to fail due to previous exclusive consumer") |
129 | - except Closed, e: |
130 | + except ChannelClosed as e: |
131 | self.assertChannelException(403, e.args[0]) |
132 | |
133 | #open new channel and cleanup last consumer: |
134 | @@ -80,7 +81,7 @@ |
135 | try: |
136 | yield channel.basic_consume(consumer_tag="second", queue="test-queue-2", exclusive=True) |
137 | self.fail("Expected exclusive consume request to fail due to previous consumer") |
138 | - except Closed, e: |
139 | + except ChannelClosed as e: |
140 | self.assertChannelException(403, e.args[0]) |
141 | |
142 | @inlineCallbacks |
143 | @@ -94,7 +95,7 @@ |
144 | #queue specified but doesn't exist: |
145 | yield channel.basic_consume(queue="invalid-queue") |
146 | self.fail("Expected failure when consuming from non-existent queue") |
147 | - except Closed, e: |
148 | + except ChannelClosed as e: |
149 | self.assertChannelException(404, e.args[0]) |
150 | |
151 | @supportedBrokers(QPID, OPENAMQ) |
152 | @@ -109,9 +110,24 @@ |
153 | #queue not specified and none previously declared for channel: |
154 | yield channel.basic_consume(queue="") |
155 | self.fail("Expected failure when consuming from unspecified queue") |
156 | - except Closed, e: |
157 | + except ConnectionClosed, e: |
158 | self.assertConnectionException(530, e.args[0]) |
159 | |
160 | + @supportedBrokers(RABBITMQ) |
161 | + @inlineCallbacks |
162 | + def test_consume_queue_unspecified_rabbit(self): |
163 | + """ |
164 | + C{basic_consume} fails with a channel exception with a C{404} code |
165 | + when no queue is specified. |
166 | + """ |
167 | + channel = self.channel |
168 | + try: |
169 | + #queue not specified and none previously declared for channel: |
170 | + yield channel.basic_consume(queue="") |
171 | + self.fail("Expected failure when consuming from unspecified queue") |
172 | + except ChannelClosed as e: |
173 | + self.assertChannelException(404, e.args[0]) |
174 | + |
175 | @inlineCallbacks |
176 | def test_consume_unique_consumers(self): |
177 | """ |
178 | @@ -126,7 +142,7 @@ |
179 | try: |
180 | yield channel.basic_consume(consumer_tag="first", queue="test-queue-3") |
181 | self.fail("Expected consume request to fail due to non-unique tag") |
182 | - except Closed, e: |
183 | + except ConnectionClosed, e: |
184 | self.assertConnectionException(530, e.args[0]) |
185 | |
186 | @inlineCallbacks |
187 | |
188 | === modified file 'src/txamqp/test/test_broker.py' |
189 | --- src/txamqp/test/test_broker.py 2016-06-06 07:09:04 +0000 |
190 | +++ src/txamqp/test/test_broker.py 2016-11-23 16:38:57 +0000 |
191 | @@ -16,7 +16,7 @@ |
192 | # specific language governing permissions and limitations |
193 | # under the License. |
194 | # |
195 | -from txamqp.client import Closed |
196 | +from txamqp.client import ConnectionClosed |
197 | from txamqp.queue import Empty |
198 | from txamqp.content import Content |
199 | from txamqp.testlib import TestBase, supportedBrokers, QPID, OPENAMQ |
200 | @@ -132,7 +132,7 @@ |
201 | try: |
202 | yield channel.queue_declare(exclusive=True) |
203 | self.fail("Expected error on queue_declare for invalid channel") |
204 | - except Closed, e: |
205 | + except ConnectionClosed as e: |
206 | self.assertConnectionException(504, e.args[0]) |
207 | |
208 | @inlineCallbacks |
209 | @@ -143,7 +143,7 @@ |
210 | try: |
211 | yield channel.queue_declare(exclusive=True) |
212 | self.fail("Expected error on queue_declare for closed channel") |
213 | - except Closed, e: |
214 | + except ConnectionClosed as e: |
215 | self.assertConnectionException(504, e.args[0]) |
216 | |
217 | @supportedBrokers(QPID, OPENAMQ) |
218 | |
219 | === modified file 'src/txamqp/test/test_exchange.py' |
220 | --- src/txamqp/test/test_exchange.py 2011-02-08 12:34:54 +0000 |
221 | +++ src/txamqp/test/test_exchange.py 2016-11-23 16:38:57 +0000 |
222 | @@ -24,9 +24,9 @@ |
223 | """ |
224 | |
225 | from txamqp.queue import Empty |
226 | -from txamqp.testlib import TestBase, supportedBrokers, QPID, OPENAMQ |
227 | +from txamqp.testlib import TestBase, supportedBrokers, QPID, OPENAMQ, RABBITMQ |
228 | from txamqp.content import Content |
229 | -from txamqp.client import Closed |
230 | +from txamqp.client import ChannelClosed, ConnectionClosed |
231 | |
232 | from twisted.internet.defer import inlineCallbacks |
233 | |
234 | @@ -238,7 +238,7 @@ |
235 | try: |
236 | yield self.channel.exchange_declare(exchange="humpty_dumpty", passive=True) |
237 | self.fail("Expected 404 for passive declaration of unknown exchange.") |
238 | - except Closed, e: |
239 | + except ChannelClosed as e: |
240 | self.assertChannelException(404, e.args[0]) |
241 | |
242 | |
243 | @@ -334,7 +334,7 @@ |
244 | try: |
245 | yield self.channel.exchange_declare(exchange="test_type_not_known_exchange", type="invalid_type") |
246 | self.fail("Expected 503 for declaration of unknown exchange type.") |
247 | - except Closed, e: |
248 | + except ConnectionClosed as e: |
249 | self.assertConnectionException(503, e.args[0]) |
250 | |
251 | @supportedBrokers(QPID, OPENAMQ) |
252 | @@ -344,7 +344,7 @@ |
253 | try: |
254 | yield self.channel.exchange_declare(exchange="test_different_declared_type_exchange", type="topic") |
255 | self.fail("Expected 530 for redeclaration of exchange with different type.") |
256 | - except Closed, e: |
257 | + except ConnectionClosed as e: |
258 | self.assertConnectionException(530, e.args[0]) |
259 | #cleanup |
260 | other = yield self.connect() |
261 | @@ -352,3 +352,24 @@ |
262 | yield c2.channel_open() |
263 | yield c2.exchange_delete(exchange="test_different_declared_type_exchange") |
264 | |
265 | + @supportedBrokers(RABBITMQ) |
266 | + @inlineCallbacks |
267 | + def testDifferentDeclaredTypeRabbit(self): |
268 | + """Test redeclaration of exchange with different type on RabbitMQ.""" |
269 | + yield self.channel.exchange_declare( |
270 | + exchange="test_different_declared_type_exchange", type="direct") |
271 | + try: |
272 | + yield self.channel.exchange_declare( |
273 | + exchange="test_different_declared_type_exchange", type="topic") |
274 | + self.fail( |
275 | + "Expected 406 for redeclaration of exchange with " |
276 | + "different type.") |
277 | + except ChannelClosed as e: |
278 | + self.assertChannelException(406, e.args[0]) |
279 | + finally: |
280 | + # cleanup |
281 | + other = yield self.connect() |
282 | + c2 = yield other.channel(1) |
283 | + yield c2.channel_open() |
284 | + yield c2.exchange_delete( |
285 | + exchange="test_different_declared_type_exchange") |
286 | |
287 | === modified file 'src/txamqp/test/test_protocol.py' |
288 | --- src/txamqp/test/test_protocol.py 2016-06-06 07:09:04 +0000 |
289 | +++ src/txamqp/test/test_protocol.py 2016-11-23 16:38:57 +0000 |
290 | @@ -4,7 +4,8 @@ |
291 | from twisted.logger import Logger |
292 | |
293 | from txamqp.protocol import AMQClient |
294 | -from txamqp.client import TwistedDelegate, Closed |
295 | +from txamqp.client import ( |
296 | + TwistedDelegate, Closed, ConnectionClosed, ChannelClosed) |
297 | from txamqp.testing import AMQPump |
298 | from txamqp.spec import DEFAULT_SPEC, load |
299 | from txamqp.queue import Closed as QueueClosed |
300 | @@ -34,6 +35,14 @@ |
301 | channel0 = self.successResultOf(self.protocol.channel(0)) |
302 | self.assertTrue(channel0.closed) |
303 | |
304 | + def test_connection_close_raises_error(self): |
305 | + """Test receiving a connection-close method raises ConnectionClosed.""" |
306 | + channel = self.successResultOf(self.protocol.channel(0)) |
307 | + d = channel.basic_consume(queue="test-queue") |
308 | + self.transport.channel(0).connection_close(reply_code=320) |
309 | + failure = self.failureResultOf(d) |
310 | + self.assertIsInstance(failure.value, ConnectionClosed) |
311 | + |
312 | def test_close(self): |
313 | """Test explicitely closing a client.""" |
314 | d = self.protocol.close() |
315 | @@ -106,6 +115,25 @@ |
316 | self.assertIsInstance(failure.value, Closed) |
317 | self.assertIsInstance(failure.value.args[0].value, ConnectionLost) |
318 | |
319 | + def test_channel_close(self): |
320 | + """Test receiving a channel-close method raises ChannelClosed.""" |
321 | + channel = self.successResultOf(self.protocol.channel(0)) |
322 | + d = channel.basic_consume(queue="non-existing-queue") |
323 | + self.transport.channel(0).channel_close(reply_code=404) |
324 | + failure = self.failureResultOf(d) |
325 | + self.assertIsInstance(failure.value, ChannelClosed) |
326 | + |
327 | + def test_sending_method_on_closed_channel(self): |
328 | + """Sending a method on a closed channel fails immediately.""" |
329 | + channel = self.successResultOf(self.protocol.channel(0)) |
330 | + self.transport.channel(0).connection_close(reply_code=320) |
331 | + self.transport.outgoing.clear() |
332 | + d = channel.basic_consume(queue="test-queue") |
333 | + # No frames were sent |
334 | + self.assertEqual({}, self.transport.outgoing) |
335 | + failure = self.failureResultOf(d) |
336 | + self.assertIsInstance(failure.value, ConnectionClosed) |
337 | + |
338 | def test_disconnected_event(self): |
339 | """Test disconnected event fired after the connection is lost.""" |
340 | deferred = self.protocol.disconnected.wait() |
341 | |
342 | === modified file 'src/txamqp/test/test_queue.py' |
343 | --- src/txamqp/test/test_queue.py 2016-05-23 14:41:22 +0000 |
344 | +++ src/txamqp/test/test_queue.py 2016-11-23 16:38:57 +0000 |
345 | @@ -64,7 +64,7 @@ |
346 | #queue specified but doesn't exist: |
347 | yield channel.queue_purge(queue="invalid-queue") |
348 | self.fail("Expected failure when purging non-existent queue") |
349 | - except Closed, e: |
350 | + except Closed as e: |
351 | self.assertChannelException(404, e.args[0]) |
352 | |
353 | channel = yield self.client.channel(3) |
354 | @@ -73,7 +73,7 @@ |
355 | #queue not specified and none previously declared for channel: |
356 | yield channel.queue_purge() |
357 | self.fail("Expected failure when purging unspecified queue") |
358 | - except Closed, e: |
359 | + except Closed as e: |
360 | self.assertConnectionException(530, e.args[0]) |
361 | |
362 | #cleanup |
363 | @@ -100,7 +100,7 @@ |
364 | #other connection should not be allowed to declare this: |
365 | yield c2.queue_declare(queue="exclusive-queue", exclusive="True") |
366 | self.fail("Expected second exclusive queue_declare to raise a channel exception") |
367 | - except Closed, e: |
368 | + except Closed as e: |
369 | self.assertChannelException(405, e.args[0]) |
370 | |
371 | @inlineCallbacks |
372 | @@ -116,7 +116,7 @@ |
373 | #other connection should not be allowed to declare this: |
374 | yield channel.queue_declare(queue="passive-queue-2", passive="True") |
375 | self.fail("Expected passive declaration of non-existant queue to raise a channel exception") |
376 | - except Closed, e: |
377 | + except Closed as e: |
378 | self.assertChannelException(404, e.args[0]) |
379 | |
380 | @inlineCallbacks |
381 | @@ -140,7 +140,7 @@ |
382 | try: |
383 | yield channel.queue_bind(queue="queue-1", exchange="an-invalid-exchange", routing_key="key1") |
384 | self.fail("Expected bind to non-existant exchange to fail") |
385 | - except Closed, e: |
386 | + except Closed as e: |
387 | self.assertChannelException(404, e.args[0]) |
388 | |
389 | #need to reopen a channel: |
390 | @@ -151,7 +151,7 @@ |
391 | try: |
392 | yield channel.queue_bind(queue="queue-2", exchange="amq.direct", routing_key="key1") |
393 | self.fail("Expected bind of non-existant queue to fail") |
394 | - except Closed, e: |
395 | + except Closed as e: |
396 | self.assertChannelException(404, e.args[0]) |
397 | |
398 | @inlineCallbacks |
399 | @@ -172,7 +172,7 @@ |
400 | try: |
401 | yield channel.queue_declare(queue="delete-me", passive="True") |
402 | self.fail("Queue has not been deleted") |
403 | - except Closed, e: |
404 | + except Closed as e: |
405 | self.assertChannelException(404, e.args[0]) |
406 | |
407 | @inlineCallbacks |
408 | @@ -200,7 +200,7 @@ |
409 | result = yield channel.queue_delete(queue="i-dont-exist", if_empty="True") |
410 | print result |
411 | self.fail("Expected delete of non-existant queue to fail") |
412 | - except Closed, e: |
413 | + except Closed as e: |
414 | self.assertChannelException(404, e.args[0]) |
415 | |
416 | @inlineCallbacks |
417 | @@ -219,7 +219,7 @@ |
418 | try: |
419 | yield channel.queue_delete(queue="delete-me-2", if_empty="True") |
420 | self.fail("Expected delete if_empty to fail for non-empty queue") |
421 | - except Closed, e: |
422 | + except Closed as e: |
423 | self.assertChannelException(406, e.args[0]) |
424 | |
425 | #need new channel now: |
426 | @@ -240,7 +240,7 @@ |
427 | try: |
428 | yield channel.queue_declare(queue="delete-me-2", passive="True") |
429 | self.fail("Queue has not been deleted") |
430 | - except Closed, e: |
431 | + except Closed as e: |
432 | self.assertChannelException(404, e.args[0]) |
433 | |
434 | @inlineCallbacks |
435 | @@ -262,7 +262,7 @@ |
436 | try: |
437 | yield channel2.queue_delete(queue="delete-me-3", if_unused="True") |
438 | self.fail("Expected delete if_unused to fail for queue with existing consumer") |
439 | - except Closed, e: |
440 | + except Closed as e: |
441 | self.assertChannelException(406, e.args[0]) |
442 | |
443 | |
444 | @@ -272,7 +272,7 @@ |
445 | try: |
446 | yield channel.queue_declare(queue="delete-me-3", passive="True") |
447 | self.fail("Queue has not been deleted") |
448 | - except Closed, e: |
449 | + except Closed as e: |
450 | self.assertChannelException(404, e.args[0]) |
451 | |
452 |
nice, +1
just a few nits inline