Merge lp:~elachuni/txamqp/heartbeat into lp:txamqp

Proposed by Anthony Lenton
Status: Merged
Merged at revision: not available
Proposed branch: lp:~elachuni/txamqp/heartbeat
Merge into: lp:txamqp
Diff against target: None lines
To merge this branch: bzr merge lp:~elachuni/txamqp/heartbeat
Reviewer: Thomas Herve (Status: Approve)
Anthony Lenton (elachuni) wrote :

Added handling for heartbeat frames.
The heartbeat interval is configurable through an AMQClient constructor parameter.
Added a brief test to check that it works.

It has only been tested against RabbitMQ, but it should work with qpid too.

Thomas Herve (therve) wrote :

Thanks a lot for this code Anthony! It's going to be pretty useful. Here are some comments:

[1] There is a small conflict with current trunk.

[2] The connectionLostEvent doesn't seem to be used anywhere, it can probably be removed.

[3] The test is a bit bad: I understand you want to test it live, but a test running for 8 seconds is too long. Also, setTimeout is deprecated.

Thanks!

review: Needs Fixing
lp:~elachuni/txamqp/heartbeat updated
20. By Anthony Lenton

- Merged changes from trunk
- Removed unused connectionLostEvent
- Fixed test.

Anthony Lenton (elachuni) wrote :

Hi Thomas,
>
> [1] There is a small conflict with current trunk.
Merged and resolved.

>
> [2] The connectionLostEvent doesn't seem to be used anywhere, it can probably
> be removed.
Right, it was being used by client code here to easily access connectionLost events, but definitely shouldn't be part of the same patch. Removed.

>
> [3] The test is a bit bad: I understand you want to test it live, but a test
> running for 8 seconds is too long. Also, setTimeout is deprecated.
I've reduced it to 3 seconds. If that's still too long we'll need to test it some other way, as you can't ask for a heartbeat interval of less than one second. And it's not calling setTimeout now.

>
> Thanks!
Thank you! :)

Thomas Herve (therve) wrote :

OK, some last comments:

1) There is a conflict again :). Be careful here: connectionLost is now defined in trunk, so you'll want to move your cleanup code there.

2) The heartbeat test class is called TxTests; it should be named HeartbeatTests. The test file doesn't need the ASF license header either.

Thanks, +1!

review: Approve
lp:~elachuni/txamqp/heartbeat updated
21. By Anthony Lenton

Merged changes from trunk.

Esteve Fernandez (esteve) wrote :

Sorry for the slow response. I think adding heartbeat is a very useful feature, thanks for taking the time to implement it. However, there are a couple of issues:

1 - The virtual host was changed to "/", instead of "localhost". We really need to implement "profiles" or whatever, so we can select some sane defaults for every broker. Until then, I'm not comfortable with introducing a change that's not intrinsic to the problem your branch solves.

2 - Although the code works and is clear, I'd rather use a LoopingCall (http://twistedmatrix.com/documents/8.2.0/api/twisted.internet.task.LoopingCall.html). It's already built into Twisted, so you don't need to implement things like lastSent, lastReceived, etc. I think it would be a bit easier to do something like this (it's just pseudo-python, can't guarantee that it works):

class AMQClient(...):
    # Max unreceived heartbeat frames. The AMQP standard says it's 3.
    MAX_UNSEEN_HEARTBEAT = 3

    def __init__(self, ...):
        ...
        if self.heartbeatInterval > 0:
            self.sendHB = LoopingCall(self.sendHeartbeat)
            self.checkHB = LoopingCall(self.checkHeartbeat)
            d = self.started.wait()
            d.addCallback(lambda _: self.sendHB.start(self.heartbeatInterval, now=False))
            d.addCallback(lambda _: self.checkHB.start(
                self.heartbeatInterval * self.MAX_UNSEEN_HEARTBEAT, now=False))

    def sendHeartbeat(self):
        self.sendFrame(Frame(0, Heartbeat()))

    def checkHeartbeat(self):
        self.checkHB.stop()
        self.transport.loseConnection()

    def processFrame(self, frame):
        ...
        if frame.payload.type != Frame.HEARTBEAT:
            self.sendHB.stop()
            self.sendHB.start(self.heartbeatInterval, now=False)
            ch.dispatch(frame, self.work)
        self.checkHB.stop()
        self.checkHB.start(self.heartbeatInterval * self.MAX_UNSEEN_HEARTBEAT, now=False)

What do you think guys? The changes are minimal and I think they are a bit easier to read.

Esteve Fernandez (esteve) wrote :

I just realized that self.checkHB doesn't need to be a LoopingCall, reactor.callLater is enough.
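
A minimal sketch of that idea, not code from the branch: a one-shot timer that is cancelled and re-armed whenever a frame arrives, so it only fires after MAX_UNSEEN_HEARTBEAT intervals of silence. HeartbeatWatchdog, FakeClock and on_timeout are hypothetical names. FakeClock merely imitates the callLater/cancel/active shape of the Twisted reactor so the example runs without a broker; real code would presumably pass the reactor instead.

```python
import heapq
import itertools


class _Call:
    """Handle returned by FakeClock.callLater (imitates a delayed call)."""

    def __init__(self, when, func, args):
        self.when, self.func, self.args = when, func, args
        self._state = "pending"

    def active(self):
        return self._state == "pending"

    def cancel(self):
        self._state = "cancelled"

    def run(self):
        self._state = "called"
        self.func(*self.args)


class FakeClock:
    """Assumption: a stand-in for the reactor, driven manually via advance()."""

    def __init__(self):
        self.now = 0.0
        self._calls = []
        self._ids = itertools.count()  # tie-breaker so calls are never compared

    def callLater(self, delay, func, *args):
        call = _Call(self.now + delay, func, args)
        heapq.heappush(self._calls, (call.when, next(self._ids), call))
        return call

    def advance(self, seconds):
        deadline = self.now + seconds
        while self._calls and self._calls[0][0] <= deadline:
            when, _, call = heapq.heappop(self._calls)
            self.now = when
            if call.active():
                call.run()
        self.now = deadline


class HeartbeatWatchdog:
    """Calls on_timeout if no frame is seen for interval * MAX_UNSEEN_HEARTBEAT."""

    MAX_UNSEEN_HEARTBEAT = 3

    def __init__(self, clock, interval, on_timeout):
        self.clock = clock
        self.timeout = interval * self.MAX_UNSEEN_HEARTBEAT
        self.on_timeout = on_timeout
        self._pending = None

    def frame_received(self):
        # Any incoming frame (heartbeat or not) postpones the timeout.
        self.stop()
        self._pending = self.clock.callLater(self.timeout, self.on_timeout)

    def stop(self):
        # Cancel the pending call, e.g. from connectionLost.
        if self._pending is not None and self._pending.active():
            self._pending.cancel()
        self._pending = None


clock = FakeClock()
dropped = []
watchdog = HeartbeatWatchdog(clock, interval=1,
                             on_timeout=lambda: dropped.append(clock.now))
watchdog.frame_received()   # armed for t=3
clock.advance(2)
watchdog.frame_received()   # re-armed for t=5
clock.advance(2.5)          # t=4.5, nothing fired yet
clock.advance(1)            # t=5.5, the timeout fired at t=5
```

After the last call `dropped` holds the single timestamp `5.0`. In a client, `on_timeout` would be something like `self.transport.loseConnection`.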

lp:~elachuni/txamqp/heartbeat updated
22. By Anthony Lenton

Putting back 'localhost' vhost.

Anthony Lenton (elachuni) wrote :

> 1 - The virtual host was changed to "/", instead of "localhost".

Yikes, that was unintentionally left in from running the tests :-/
I've put the 'localhost' vhost back in rev.22, sorry for that.

> 2 - Although the code works and is clear, I'd rather use a LoopingCall
> (http://twistedmatrix.com/documents/8.2.0/api/twisted.internet.task.LoopingCall.html).
> It's already built into Twisted, so you don't need to implement things like
> lastSent, lastReceived, etc. I think it would be a bit easier to do something
> like this (it's just pseudo-python, can't guarantee that it works):
>
> (...code...)
>
> What do you think guys? The changes are minimal and I think they are a bit
> easier to read.

I gave your code a try and it works well, with a couple of changes. However I don't really see much benefit in using LoopingCall in this case:
 * checkHB could use a regular callLater call, as you've pointed out.
 * The rescheduling of sendHB should actually be done in sendFrame, not in processFrame, as it's sending frames (not receiving them) that postpones the need to send a HB frame.
 * After changing that it's practically the same to be using a regular callLater instead of a LoopingCall for sendHB; if in sendFrame you reschedule sendHB whatever the payload type (instead of filtering out HBs) it effectively behaves like a LoopingCall.
 * We do get rid of lastSent and lastReceived, but in exchange we'd be using two callbacks instead of one (checkHeartbeat and sendHeartbeat vs. heartbeatHandler), and two 'callback handles' (checkHB and sendHB vs. pendingHeartbeat).
 * The free bonus you get with lastSent and lastReceived is a bit of accountability about how long ago you sent and received frames (including heartbeat frames), which makes it easier to test.

I haven't pushed these changes into this branch, but if you want I can push a working version using LoopingCalls on some other branch so we can compare.
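
The testability point can be illustrated with a small sketch. It restates the two comparisons made by heartbeatHandler in this branch as a pure function of timestamps, so they can be checked without a reactor or a broker. heartbeat_actions and its parameter names are hypothetical and not part of txamqp.

```python
def heartbeat_actions(now, last_sent, last_received, interval, max_unseen=3):
    """Decide what one heartbeat tick should do.

    Returns a (send_heartbeat, drop_connection) pair of booleans:
    - send a heartbeat if nothing was sent during the last interval;
    - drop the connection if nothing was received for max_unseen intervals.
    """
    send_heartbeat = last_sent + interval < now
    drop_connection = last_received + interval * max_unseen < now
    return send_heartbeat, drop_connection


# Recent traffic in both directions: nothing to do.
quiet = heartbeat_actions(now=10.0, last_sent=9.5, last_received=9.8, interval=1)

# Nothing sent for 1.5 s: time to send a heartbeat.
idle_sender = heartbeat_actions(now=10.0, last_sent=8.5, last_received=9.8, interval=1)

# Nothing received for 3.5 s: the peer is considered gone.
dead_peer = heartbeat_actions(now=10.0, last_sent=8.5, last_received=6.5, interval=1)
```

Here `quiet` is `(False, False)`, `idle_sender` is `(True, False)` and `dead_peer` is `(True, True)`.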

lp:~elachuni/txamqp/heartbeat updated
23. By Anthony Lenton

Removed License header and fixed class name, per therve's review.

Preview Diff

=== modified file 'src/txamqp/client.py'
--- src/txamqp/client.py 2009-05-04 11:12:36 +0000
+++ src/txamqp/client.py 2009-05-05 15:41:08 +0000
@@ -29,7 +29,8 @@
 
     def connection_tune(self, ch, msg):
         self.client.MAX_LENGTH = msg.frame_max
-        ch.connection_tune_ok(*msg.fields)
+        args = msg.channel_max, msg.frame_max, self.client.heartbeatInterval
+        ch.connection_tune_ok(*args)
         self.client.started.set()
 
     @defer.inlineCallbacks

=== modified file 'src/txamqp/connection.py'
--- src/txamqp/connection.py 2008-10-29 18:31:04 +0000
+++ src/txamqp/connection.py 2009-04-27 17:00:08 +0000
@@ -196,3 +196,15 @@
 
     def __str__(self):
         return "Body(%r)" % self.content
+
+class Heartbeat(Payload):
+    type = Frame.HEARTBEAT
+    def __str__(self):
+        return "Heartbeat()"
+
+    def encode(self, enc):
+        enc.encode_long(0)
+
+    def decode(spec, dec):
+        dec.decode_long()
+        return Heartbeat()

=== modified file 'src/txamqp/protocol.py'
--- src/txamqp/protocol.py 2009-05-04 11:12:36 +0000
+++ src/txamqp/protocol.py 2009-06-04 14:42:57 +0000
@@ -1,16 +1,17 @@
 # coding: utf-8
 from twisted.python import log
-from twisted.internet import defer, protocol
+from twisted.internet import defer, protocol, reactor
 from twisted.protocols import basic
 from txamqp import spec
 from txamqp.codec import Codec, EOF
-from txamqp.connection import Header, Frame, Method, Body
+from txamqp.connection import Header, Frame, Method, Body, Heartbeat
 from txamqp.message import Message
 from txamqp.content import Content
 from txamqp.queue import TimeoutDeferredQueue, Empty, Closed as QueueClosed
 from txamqp.client import TwistedEvent, TwistedDelegate, Closed
 from cStringIO import StringIO
 import struct
+from time import time
 
 class GarbageException(Exception):
     pass
@@ -200,7 +201,7 @@
 
     channelClass = AMQChannel
 
-    def __init__(self, delegate, vhost, *args, **kwargs):
+    def __init__(self, delegate, vhost, heartbeat=0, *args, **kwargs):
         FrameReceiver.__init__(self, *args, **kwargs)
         self.delegate = delegate
 
@@ -218,6 +219,9 @@
         self.work = defer.DeferredQueue()
 
         self.started = TwistedEvent()
+        self.connectionLostEvent = TwistedEvent()
+        self.lastSent = time()
+        self.lastReceived = time()
 
         self.queueLock = defer.DeferredLock()
 
@@ -225,6 +229,10 @@
 
         self.outgoing.get().addCallback(self.writer)
         self.work.get().addCallback(self.worker)
+        self.heartbeatInterval = heartbeat
+        self.pendingHeartbeat = None
+        if self.heartbeatInterval > 0:
+            self.started.wait().addCallback(self.heartbeatHandler)
 
     def check_0_8(self):
         return (self.spec.minor, self.spec.major) == (0, 8)
@@ -293,13 +301,25 @@
         self.sendInitString()
         self.setFrameMode()
 
+    def connectionLost(self, reason):
+        if self.pendingHeartbeat is not None and self.pendingHeartbeat.active():
+            self.pendingHeartbeat.cancel()
+            self.pendingHeartbeat = None
+        self.connectionLostEvent.set()
+
     def frameReceived(self, frame):
         self.processFrame(frame)
 
+    def sendFrame(self, frame):
+        self.lastSent = time()
+        FrameReceiver.sendFrame(self, frame)
+
     @defer.inlineCallbacks
     def processFrame(self, frame):
+        self.lastReceived = time()
         ch = yield self.channel(frame.channel)
-        ch.dispatch(frame, self.work)
+        if frame.payload.type != Frame.HEARTBEAT:
+            ch.dispatch(frame, self.work)
 
     @defer.inlineCallbacks
     def authenticate(self, username, password, mechanism='AMQPLAIN', locale='en_US'):
@@ -320,3 +340,15 @@
 
         channel0 = yield self.channel(0)
         yield channel0.connection_open(self.vhost)
+
+    def heartbeatHandler (self, dummy=None):
+        now = time()
+        if self.lastSent + self.heartbeatInterval < now:
+            self.sendFrame(Frame(0, Heartbeat()))
+        if self.lastReceived + self.heartbeatInterval * 3 < now:
+            self.transport.loseConnection()
+        tple = None
+        if self.transport.connected:
+            tple = reactor.callLater(self.heartbeatInterval, self.heartbeatHandler)
+        self.pendingHeartbeat = tple
+

=== added file 'src/txamqp/test/test_heartbeat.py'
--- src/txamqp/test/test_heartbeat.py 1970-01-01 00:00:00 +0000
+++ src/txamqp/test/test_heartbeat.py 2009-06-04 16:26:58 +0000
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from time import time
+#from txamqp.client import Closed
+#from txamqp.queue import Empty
+#from txamqp.content import Content
+from txamqp.testlib import TestBase
+
+#from twisted.internet.defer import, returnValue,
+from twisted.internet.defer import Deferred, inlineCallbacks
+
+class TxTests(TestBase):
+    """
+    Tests handling of heartbeat frames
+    """
+
+
+    @inlineCallbacks
+    def setUp(self):
+        """ Set up a heartbeat frame per second """
+        self.client = yield self.connect(heartbeat=1)
+
+        self.channel = yield self.client.channel(1)
+        yield self.channel.channel_open()
+
+
+    @inlineCallbacks
+    def test_heartbeat(self):
+        """
+        Test that heartbeat frames are sent and received
+        """
+        d = Deferred()
+        d.setTimeout(8)
+        d.addBoth(lambda x:True)
+        yield d
+        t = time()
+        self.assertTrue(self.client.lastSent > t - 3,
+                        "A heartbeat frame was recently sent")
+        self.assertTrue(self.client.lastReceived > t - 3,
+                        "A heartbeat frame was recently received")

=== modified file 'src/txamqp/testlib.py'
--- src/txamqp/testlib.py 2009-02-11 22:28:45 +0000
+++ src/txamqp/testlib.py 2009-06-04 16:26:58 +0000
@@ -39,22 +39,26 @@
         self.user = 'guest'
         self.password = 'guest'
         self.vhost = 'localhost'
+        self.heartbeat = 0
         self.queues = []
         self.exchanges = []
         self.connectors = []
 
     @inlineCallbacks
-    def connect(self, host=None, port=None, spec=None, user=None, password=None, vhost=None):
+    def connect(self, host=None, port=None, spec=None, user=None,
+                password=None, vhost=None, heartbeat=None):
         host = host or self.host
         port = port or self.port
         spec = spec or self.spec
         user = user or self.user
         password = password or self.password
         vhost = vhost or self.vhost
+        heartbeat = heartbeat or self.heartbeat
 
         delegate = TwistedDelegate()
         onConn = Deferred()
-        f = protocol._InstanceFactory(reactor, AMQClient(delegate, vhost, txamqp.spec.load(spec)), onConn)
+        p = AMQClient(delegate, vhost, heartbeat=heartbeat, spec=txamqp.spec.load(spec))
+        f = protocol._InstanceFactory(reactor, p, onConn)
         c = reactor.connectTCP(host, port, f)
         self.connectors.append(c)
         client = yield onConn
