Merge lp:~me-smira/txamqp/patches into lp:txamqp

Proposed by Andrey Smirnov
Status: Merged
Merge reported by: Esteve Fernandez
Merged at revision: not available
Proposed branch: lp:~me-smira/txamqp/patches
Merge into: lp:txamqp
Diff against target: 44 lines (+22/-0)
2 files modified
src/txamqp/client.py (+4/-0)
src/txamqp/protocol.py (+18/-0)
To merge this branch: bzr merge lp:~me-smira/txamqp/patches
Reviewer Review Type Date Requested Status
Esteve Fernandez Approve
Review via email: mp+19466@code.launchpad.net
To post a comment you must log in.
Revision history for this message
Andrey Smirnov (me-smira) wrote :

Some small patches I had to apply to txAMQP to keep it working.

I'm using it heavily at qik.com, it receives good load (5M+ messages per day pushed through queues per day).

One of the changes fixes bug with channel-flow messages not being implemented. Other are related to scenario when channel is closed by server due to some error (like "queue doesn't exist") and client should reopen the channel to continue operations.

Thanks for great library!

Revision history for this message
Esteve Fernandez (esteve) :
review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'src/txamqp/client.py'
--- src/txamqp/client.py 2009-08-08 17:54:09 +0000
+++ src/txamqp/client.py 2010-02-17 06:06:16 +0000
@@ -40,7 +40,11 @@
40 def basic_return_(self, ch, msg):40 def basic_return_(self, ch, msg):
41 self.client.basic_return_queue.put(msg)41 self.client.basic_return_queue.put(msg)
4242
43 def channel_flow(self, ch, msg):
44 ch.channel_flow_ok(active=msg.active)
45
43 def channel_close(self, ch, msg):46 def channel_close(self, ch, msg):
47 ch.channel_close_ok()
44 ch.close(msg)48 ch.close(msg)
4549
46 def connection_close(self, ch, msg):50 def connection_close(self, ch, msg):
4751
=== modified file 'src/txamqp/protocol.py'
--- src/txamqp/protocol.py 2009-11-13 18:50:13 +0000
+++ src/txamqp/protocol.py 2010-02-17 06:06:16 +0000
@@ -284,6 +284,24 @@
284 self.queueLock.release()284 self.queueLock.release()
285 defer.returnValue(q)285 defer.returnValue(q)
286286
287 @defer.inlineCallbacks
288 def closeChannel(self, channel):
289 yield self.channelLock.acquire()
290 try:
291 channel.close(None)
292 del self.channels[channel.id]
293 finally:
294 self.channelLock.release()
295
296 @defer.inlineCallbacks
297 def closeQueue(self, key, queue):
298 yield self.queueLock.acquire()
299 try:
300 queue.close()
301 del self.queues[key]
302 finally:
303 self.queueLock.release()
304
287 def close(self, reason):305 def close(self, reason):
288 for ch in self.channels.values():306 for ch in self.channels.values():
289 ch.close(reason)307 ch.close(reason)

Subscribers

People subscribed via source and target branches

to status/vote changes: