Merge lp:~txamqpteam/txamqp/support-basic-return into lp:txamqp

Proposed by Terry Jones
Status: Merged
Merged at revision: not available
Proposed branch: lp:~txamqpteam/txamqp/support-basic-return
Merge into: lp:txamqp
Diff against target: None lines
To merge this branch: bzr merge lp:~txamqpteam/txamqp/support-basic-return
Reviewer            Review Type    Date Requested    Status
Dan Di Spaltro      code                             Approve
Esteve Fernandez                                     Approve
Review via email: mp+7561@code.launchpad.net
Terry Jones (terrycojones) wrote :

This is work done with Esteve to add support for the AMQP basic return message. It adds a mandatory argument to outgoing messages, resulting in a basic return if the message is not routable.
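
A toy model of the mandatory flag, as a sketch only. ToyDirectExchange and its methods are hypothetical names for an in-memory stand-in, not part of txAMQP or of any broker. It assumes the AMQP behaviour in which an unroutable message published with mandatory set is handed back to the publisher (the basic return) and is dropped silently otherwise.

```python
class ToyDirectExchange:
    """In-memory stand-in for a direct exchange (hypothetical, not txAMQP)."""

    def __init__(self, on_return):
        self.bindings = {}          # routing key -> list of delivered bodies
        self.on_return = on_return  # called for unroutable mandatory messages

    def bind(self, routing_key):
        """Create (or fetch) the queue bound to routing_key."""
        return self.bindings.setdefault(routing_key, [])

    def publish(self, routing_key, body, mandatory=False):
        queue = self.bindings.get(routing_key)
        if queue is not None:
            queue.append(body)
        elif mandatory:
            # No queue is bound to this key: hand the message back.
            self.on_return(routing_key, body)
        # Without mandatory, an unroutable message is dropped silently.


returned = []
exchange = ToyDirectExchange(lambda key, body: returned.append((key, body)))
calculator_queue = exchange.bind("calculator")

exchange.publish("calculator", b"ping", mandatory=True)     # delivered
exchange.publish("calculatorxxx", b"ping", mandatory=True)  # returned
exchange.publish("calculatorxxx", b"ping")                  # dropped
```

Only the second publish ends up in `returned`, which is the case the branch turns into an errback on the caller's deferred.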

There is an outstanding issue with what, if anything, to do with Thrift oneway methods. They have no client side deferred whose errback can be called if they are unroutable.
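
The oneway problem can be sketched with a plain dictionary of pending calls keyed by sequence id. This is an illustration under assumptions: concurrent.futures.Future stands in for a Twisted deferred, and UnroutableError and fail_pending_request are hypothetical names, not the branch's code.

```python
from concurrent.futures import Future


class UnroutableError(Exception):
    """Hypothetical stand-in for the transport exception used in the branch."""


def fail_pending_request(pending, seqid, routing_key):
    """Fail the pending call for seqid; return False if there is none."""
    try:
        future = pending.pop(seqid)
    except KeyError:
        # Oneway calls register no pending result, so there is
        # nothing on the client side that can be notified.
        return False
    future.set_exception(
        UnroutableError("Unroutable message, routing key = %r" % routing_key))
    return True


pending = {1: Future()}   # seqid 1 is a normal two-way call
two_way = pending[1]

notified_two_way = fail_pending_request(pending, 1, "calculatorxxx")
notified_oneway = fail_pending_request(pending, 2, "calculatorxxx")
```

The two-way call gets its error; the oneway call (seqid 2, never registered) is returned by the broker but has nobody to tell, which is the open question raised above.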

Esteve Fernandez (esteve) :
review: Approve
Esteve Fernandez (esteve) wrote :

Dan, given that you're the only one I know using Thrift and AMQP together (apart from Terry and me), could you review this branch? Thanks!

22. By Terry Jones

Cleaned up handling of KeyError in txamqp/contrib/thrift/client.py (which had a silly logic error - of mine). Added a __repr__ to the Content class in txamqp/content.py. And I now have a reproducible example of 'headers' not being in msg.content when trying to get the thriftClientName in txamqp/contrib/thrift/client.py, though I haven't tried to dig into why it happens.
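
One way the two failure cases could be told apart is sketched below. It assumes the message content behaves like a mapping that raises KeyError for a missing key; thrift_client_name and the log callable are hypothetical names used for illustration, and this is not necessarily the code in revision 22.

```python
def thrift_client_name(content, log):
    """Return the thriftClientName header, or None after logging why not."""
    try:
        headers = content['headers']
    except KeyError:
        # Case 1: the message carries no headers at all.
        log("'headers' not in msg.content: %r" % (content,))
        return None
    try:
        return headers['thriftClientName']
    except KeyError:
        # Case 2: headers are present but the client name is missing.
        log("'thriftClientName' not in msg.content headers: %r" % (headers,))
        return None


logged = []
name = thrift_client_name(
    {'headers': {'thriftClientName': 'Clientcalculator'}}, logged.append)
no_headers = thrift_client_name({}, logged.append)
no_name = thrift_client_name({'headers': {}}, logged.append)
```

Checking each lookup separately means each log message is emitted only for the case it describes.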

Dan Di Spaltro (dan-dispaltro) wrote :

+1

Ran the code works well, very handy to boot, now our services are more reliable. The log stuff could probably be cleaned up a bit and lines shortened but that is just cosmetic.

review: Approve (code)
Esteve Fernandez (esteve) wrote :

> +1
>
> Ran the code works well, very handy to boot, now our services are more
> reliable. The log stuff could probably be cleaned up a bit and lines
> shortened but that is just cosmetic.

Thanks! Just merged it.

Terry Jones (terrycojones) wrote :

Hi Dan

>>>>> "Dan" == Dan Di Spaltro <email address hidden> writes:
Dan> Ran the code works well, very handy to boot, now our services are more
Dan> reliable. The log stuff could probably be cleaned up a bit and lines
Dan> shortened but that is just cosmetic.

The logging in src/txamqp/contrib/thrift/client.py (if that's what you were
referring to) is there due to the very occasional absence of 'headers' in
msg.content. I've seen it happen a couple of times, and had it in
reproducible form (though in the complex setup that is FluidDB, so not easy
to reduce and post for others). But I've not had time to look at it. So I
left the logging there in the hope that someone else might see this pop up
and feel like digging into it :-) I'm sure I will at some point.

It may be benign / harmless. Esteve, I think, came up with a scenario in
which this might happen, but I forget the details.

Terry

Preview Diff

=== modified file 'src/examples/client.py'
--- src/examples/client.py 2009-02-11 22:28:45 +0000
+++ src/examples/client.py 2009-06-02 20:33:36 +0000
@@ -6,7 +6,7 @@
 sys.path.insert(0, os.path.join(os.path.abspath(os.path.split(sys.argv[0])[0]), 'gen-py'))
 import tutorial.Calculator
 from tutorial.ttypes import *
-from thrift.transport import TTwisted
+from thrift.transport import TTwisted, TTransport
 from thrift.protocol import TBinaryProtocol

 from twisted.internet import reactor, defer
@@ -34,9 +34,15 @@
     print results

 def gotCalculateErrors(error):
-    print "Got an error"
+    error.trap(InvalidOperation)
+    print "Got a calculator error"
     print error.value.why

+def gotTransportError(error):
+    error.trap(TTransport.TTransportException)
+    print "Got an AMQP unroutable message error:"
+    print error.value.message
+
 @defer.inlineCallbacks
 def prepareClient(client, username, password):
     yield client.authenticate(username, password)
@@ -48,6 +54,11 @@
     yield channel.exchange_declare(exchange=responsesExchange, type="direct")

     pfactory = TBinaryProtocol.TBinaryProtocolFactory()
+
+    # To trigger an unroutable message error (caught in the above
+    # gotTransportError errback), change the routing key (i.e.,
+    # calculatorKey) in the following to be something invalid, like
+    # calculatorKey + 'xxx'.
     thriftClient = yield client.createThriftClient(responsesExchange,
         servicesExchange, calculatorKey, tutorial.Calculator.Client,
         iprot_factory=pfactory, oprot_factory=pfactory)
@@ -55,35 +66,35 @@
     defer.returnValue(thriftClient)

 def gotClient(client):
-    d1 = client.ping().addCallback(gotPing)
+    d1 = client.ping().addCallback(gotPing).addErrback(gotTransportError)

-    d2 = client.add(1, 2).addCallback(gotAddResults)
+    d2 = client.add(1, 2).addCallback(gotAddResults).addErrback(gotTransportError)

     w = Work(num1=2, num2=3, op=Operation.ADD)

     d3 = client.calculate(1, w).addCallbacks(gotCalculateResults,
-        gotCalculateErrors)
+        gotCalculateErrors).addErrback(gotTransportError)

     w = Work(num1=2, num2=3, op=Operation.SUBTRACT)

     d4 = client.calculate(2, w).addCallbacks(gotCalculateResults,
-        gotCalculateErrors)
+        gotCalculateErrors).addErrback(gotTransportError)

     w = Work(num1=2, num2=3, op=Operation.MULTIPLY)

     d5 = client.calculate(3, w).addCallbacks(gotCalculateResults,
-        gotCalculateErrors)
+        gotCalculateErrors).addErrback(gotTransportError)

     w = Work(num1=2, num2=3, op=Operation.DIVIDE)

     d6 = client.calculate(4, w).addCallbacks(gotCalculateResults,
-        gotCalculateErrors)
+        gotCalculateErrors).addErrback(gotTransportError)

     # This will fire an errback
     w = Work(num1=2, num2=0, op=Operation.DIVIDE)

     d7 = client.calculate(5, w).addCallbacks(gotCalculateResults,
-        gotCalculateErrors)
+        gotCalculateErrors).addErrback(gotTransportError)

     d8 = client.zip()


=== modified file 'src/txamqp/client.py'
--- src/txamqp/client.py 2009-05-28 09:46:03 +0000
+++ src/txamqp/client.py 2009-06-02 20:33:36 +0000
@@ -36,6 +36,9 @@
     def basic_deliver(self, ch, msg):
         (yield self.client.queue(msg.consumer_tag)).put(msg)

+    def basic_return_(self, ch, msg):
+        self.client.basic_return_queue.put(msg)
+
     def channel_close(self, ch, msg):
         ch.close(msg)


=== added file 'src/txamqp/contrib/thrift/client.py'
--- src/txamqp/contrib/thrift/client.py 1970-01-01 00:00:00 +0000
+++ src/txamqp/contrib/thrift/client.py 2009-06-09 13:29:13 +0000
@@ -0,0 +1,21 @@
+from twisted.internet import defer
+
+from txamqp.client import TwistedDelegate
+
+
+class ThriftTwistedDelegate(TwistedDelegate):
+
+    @defer.inlineCallbacks
+    def basic_return_(self, ch, msg):
+        try:
+            thriftClientName = msg.content['headers']['thriftClientName']
+        except KeyError:
+            from twisted.python import log
+            if 'headers' in msg.content:
+                log.msg("'headers' not in msg.content: %r" % msg.content)
+            else:
+                log.msg("'thriftClientName' not in msg.content headers: %r" %
+                        msg.content['headers'])
+        else:
+            (yield self.client.thriftBasicReturnQueue(thriftClientName))\
+                .put(msg)

=== modified file 'src/txamqp/contrib/thrift/protocol.py'
--- src/txamqp/contrib/thrift/protocol.py 2009-05-29 00:20:49 +0000
+++ src/txamqp/contrib/thrift/protocol.py 2009-06-09 13:35:48 +0000
@@ -2,6 +2,7 @@
 from txamqp.protocol import AMQClient
 from txamqp.contrib.thrift.transport import TwistedAMQPTransport
 from txamqp.content import Content
+from txamqp.queue import TimeoutDeferredQueue

 from twisted.internet import defer
 from twisted.python import log
@@ -19,6 +20,22 @@
         else:
             self.replyToField = "reply-to"

+        self.thriftBasicReturnQueueLock = defer.DeferredLock()
+        self.thriftBasicReturnQueues = {}
+
+    @defer.inlineCallbacks
+    def thriftBasicReturnQueue(self, key):
+        yield self.thriftBasicReturnQueueLock.acquire()
+        try:
+            try:
+                q = self.thriftBasicReturnQueues[key]
+            except KeyError:
+                q = TimeoutDeferredQueue()
+                self.thriftBasicReturnQueues[key] = q
+        finally:
+            self.thriftBasicReturnQueueLock.release()
+        defer.returnValue(q)
+
     @defer.inlineCallbacks
     def createThriftClient(self, responsesExchange, serviceExchange,
         routingKey, clientClass, channel=1, responseQueue=None, iprot_factory=None,
@@ -39,8 +56,11 @@

         log.msg("Consuming messages on queue: %s" % responseQueue)

-        amqpTransport = TwistedAMQPTransport(channel, serviceExchange,
-            routingKey, replyTo=responseQueue, replyToField=self.replyToField)
+        thriftClientName = clientClass.__name__ + routingKey
+
+        amqpTransport = TwistedAMQPTransport(
+            channel, serviceExchange, routingKey, clientName=thriftClientName,
+            replyTo=responseQueue, replyToField=self.replyToField)

         if iprot_factory is None:
             iprot_factory = self.factory.iprot_factory
@@ -54,10 +74,16 @@
         queue.get().addCallback(self.parseClientMessage, channel, queue,
             thriftClient, iprot_factory=iprot_factory)

+        basicReturnQueue = yield self.thriftBasicReturnQueue(thriftClientName)
+
+        basicReturnQueue.get().addCallback(
+            self.parseClientUnrouteableMessage, channel, basicReturnQueue,
+            thriftClient, iprot_factory=iprot_factory)
+
         defer.returnValue(thriftClient)

     def parseClientMessage(self, msg, channel, queue, thriftClient,
-        iprot_factory=None):
+        iprot_factory=None):
         deliveryTag = msg.delivery_tag
         tr = TTransport.TMemoryBuffer(msg.content.body)
         if iprot_factory is None:
@@ -66,6 +92,12 @@
             iprot = iprot_factory.getProtocol(tr)
         (fname, mtype, rseqid) = iprot.readMessageBegin()

+        if rseqid in thriftClient._reqs:
+            # log.msg('Got reply: fname = %r, rseqid = %s, mtype = %r, routing key = %r, client = %r, msg.content.body = %r' % (fname, rseqid, mtype, msg.routing_key, thriftClient, msg.content.body))
+            pass
+        else:
+            log.msg('Missing rseqid! fname = %r, rseqid = %s, mtype = %r, routing key = %r, client = %r, msg.content.body = %r' % (fname, rseqid, mtype, msg.routing_key, thriftClient, msg.content.body))
+
         method = getattr(thriftClient, 'recv_' + fname)
         method(iprot, mtype, rseqid)

@@ -73,6 +105,34 @@
         queue.get().addCallback(self.parseClientMessage, channel, queue,
             thriftClient, iprot_factory=iprot_factory)

+    def parseClientUnrouteableMessage(self, msg, channel, queue, thriftClient,
+        iprot_factory=None):
+        tr = TTransport.TMemoryBuffer(msg.content.body)
+        if iprot_factory is None:
+            iprot = self.factory.iprot_factory.getProtocol(tr)
+        else:
+            iprot = iprot_factory.getProtocol(tr)
+        (fname, mtype, rseqid) = iprot.readMessageBegin()
+
+        # log.msg('Got unroutable. fname = %r, rseqid = %s, mtype = %r, routing key = %r, client = %r, msg.content.body = %r' % (fname, rseqid, mtype, msg.routing_key, thriftClient, msg.content.body))
+
+        try:
+            d = thriftClient._reqs.pop(rseqid)
+        except KeyError:
+            # KeyError will occur if the remote Thrift method is oneway,
+            # since there is no outstanding local request deferred for
+            # oneway calls.
+            pass
+        else:
+            d.errback(TTransport.TTransportException(
+                type=TTransport.TTransportException.NOT_OPEN,
+                message='Unrouteable message, routing key = %r calling function %r'
+                % (msg.routing_key, fname)))
+
+        queue.get().addCallback(
+            self.parseClientUnrouteableMessage, channel, queue,
+            thriftClient, iprot_factory=iprot_factory)
+
     def parseServerMessage(self, msg, channel, exchange, queue, processor,
         iprot_factory=None, oprot_factory=None):
         deliveryTag = msg.delivery_tag

=== modified file 'src/txamqp/contrib/thrift/transport.py'
--- src/txamqp/contrib/thrift/transport.py 2009-02-11 22:28:45 +0000
+++ src/txamqp/contrib/thrift/transport.py 2009-06-05 13:34:08 +0000
@@ -2,18 +2,26 @@
 from thrift.transport import TTwisted

 class TwistedAMQPTransport(TTwisted.TMessageSenderTransport):
-    def __init__(self, channel, exchange, routingKey, replyTo=None, replyToField=None):
+    def __init__(self, channel, exchange, routingKey, clientName=None,
+                 replyTo=None, replyToField=None):
         TTwisted.TMessageSenderTransport.__init__(self)
         self.channel = channel
         self.exchange = exchange
         self.routingKey = routingKey
+        # clientName is the name of the client class we are trying to get
+        # the message through to. We need to send it seeing as the message
+        # may be unroutable and we need a basic return that will tell us
+        # who were trying to reach.
+        self.clientName = clientName
         self.replyTo = replyTo
         self.replyToField = replyToField

     def sendMessage(self, message):
         content = Content(body=message)
+        if self.clientName:
+            content['headers'] = { 'thriftClientName' : self.clientName }
         if self.replyTo:
             content[self.replyToField] = self.replyTo

         self.channel.basic_publish(exchange=self.exchange,
-            routing_key=self.routingKey, content=content)
+            routing_key=self.routingKey, content=content, mandatory=True)

=== modified file 'src/txamqp/protocol.py'
--- src/txamqp/protocol.py 2009-06-01 16:35:03 +0000
+++ src/txamqp/protocol.py 2009-06-09 13:35:48 +0000
@@ -220,6 +220,7 @@
         self.started = TwistedEvent()

         self.queueLock = defer.DeferredLock()
+        self.basic_return_queue = TimeoutDeferredQueue()

         self.queues = {}

@@ -254,7 +255,7 @@
         finally:
             self.queueLock.release()
         defer.returnValue(q)
-    
+
     def close(self, reason):
         for ch in self.channels.values():
             ch.close(reason)
