Merge lp:~elachuni/txamqp/heartbeat-loopingcalls into lp:txamqp

Proposed by Anthony Lenton
Status: Merged
Merged at revision: not available
Proposed branch: lp:~elachuni/txamqp/heartbeat-loopingcalls
Merge into: lp:txamqp
Diff against target: None lines
To merge this branch: bzr merge lp:~elachuni/txamqp/heartbeat-loopingcalls
Reviewer Review Type Date Requested Status
txAMQP Team Pending
Review via email: mp+9503@code.launchpad.net
To post a comment you must log in.
Revision history for this message
Anthony Lenton (elachuni) wrote :

This is a branch that implements heartbeat frames using LoopingCall, per Esteve's suggestion.

Revision history for this message
Esteve Fernandez (esteve) wrote :

Hi Anthony, sorry for the delay. Thank you very much for your patch.

> This is a branch that implements heartbeat frames using LoopingCall, per
> Esteve's suggestion.

I modified it a bit (lp:~esteve/txamqp/heartbeat-loopingcalls):

- added configurable heartbeats and client classes to the test cases, so overriding connect and setUp is no longer needed, which makes HeartbeatTests a bit shorter
- re-added lastSent and lastReceived from your original patch. I didn't realize it at first, sorry, but now I think it's a great idea for accounting purposes

What do you think of it?

Thanks for your continuing contributions!

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'src/txamqp/client.py'
--- src/txamqp/client.py 2009-05-28 09:46:03 +0000
+++ src/txamqp/client.py 2009-06-11 19:45:31 +0000
@@ -29,7 +29,8 @@
2929
30 def connection_tune(self, ch, msg):30 def connection_tune(self, ch, msg):
31 self.client.MAX_LENGTH = msg.frame_max31 self.client.MAX_LENGTH = msg.frame_max
32 ch.connection_tune_ok(*msg.fields)32 args = msg.channel_max, msg.frame_max, self.client.heartbeatInterval
33 ch.connection_tune_ok(*args)
33 self.client.started.reset()34 self.client.started.reset()
3435
35 @defer.inlineCallbacks36 @defer.inlineCallbacks
3637
=== modified file 'src/txamqp/connection.py'
--- src/txamqp/connection.py 2008-10-29 18:31:04 +0000
+++ src/txamqp/connection.py 2009-04-27 17:00:08 +0000
@@ -196,3 +196,15 @@
196196
197 def __str__(self):197 def __str__(self):
198 return "Body(%r)" % self.content198 return "Body(%r)" % self.content
199
200class Heartbeat(Payload):
201 type = Frame.HEARTBEAT
202 def __str__(self):
203 return "Heartbeat()"
204
205 def encode(self, enc):
206 enc.encode_long(0)
207
208 def decode(spec, dec):
209 dec.decode_long()
210 return Heartbeat()
199211
=== modified file 'src/txamqp/protocol.py'
--- src/txamqp/protocol.py 2009-06-18 08:13:49 +0000
+++ src/txamqp/protocol.py 2009-07-31 11:43:51 +0000
@@ -1,16 +1,18 @@
1# coding: utf-81# coding: utf-8
2from twisted.python import log2from twisted.python import log
3from twisted.internet import defer, protocol3from twisted.internet import defer, protocol, reactor
4from twisted.internet.task import LoopingCall
4from twisted.protocols import basic5from twisted.protocols import basic
5from txamqp import spec6from txamqp import spec
6from txamqp.codec import Codec, EOF7from txamqp.codec import Codec, EOF
7from txamqp.connection import Header, Frame, Method, Body8from txamqp.connection import Header, Frame, Method, Body, Heartbeat
8from txamqp.message import Message9from txamqp.message import Message
9from txamqp.content import Content10from txamqp.content import Content
10from txamqp.queue import TimeoutDeferredQueue, Empty, Closed as QueueClosed11from txamqp.queue import TimeoutDeferredQueue, Empty, Closed as QueueClosed
11from txamqp.client import TwistedEvent, TwistedDelegate, Closed12from txamqp.client import TwistedEvent, TwistedDelegate, Closed
12from cStringIO import StringIO13from cStringIO import StringIO
13import struct14import struct
15from time import time
1416
15class GarbageException(Exception):17class GarbageException(Exception):
16 pass18 pass
@@ -27,10 +29,10 @@
27 self.responses = TimeoutDeferredQueue()29 self.responses = TimeoutDeferredQueue()
2830
29 self.queue = None31 self.queue = None
30 32
31 self.closed = False33 self.closed = False
32 self.reason = None34 self.reason = None
33 35
34 def close(self, reason):36 def close(self, reason):
35 if self.closed:37 if self.closed:
36 return38 return
@@ -200,7 +202,10 @@
200202
201 channelClass = AMQChannel203 channelClass = AMQChannel
202204
203 def __init__(self, delegate, vhost, *args, **kwargs):205 # Max unreceived heartbeat frames. The AMQP standard says it's 3.
206 MAX_UNSEEN_HEARTBEAT = 3
207
208 def __init__(self, delegate, vhost, heartbeat=0, *args, **kwargs):
204 FrameReceiver.__init__(self, *args, **kwargs)209 FrameReceiver.__init__(self, *args, **kwargs)
205 self.delegate = delegate210 self.delegate = delegate
206211
@@ -225,6 +230,27 @@
225230
226 self.outgoing.get().addCallback(self.writer)231 self.outgoing.get().addCallback(self.writer)
227 self.work.get().addCallback(self.worker)232 self.work.get().addCallback(self.worker)
233 self.heartbeatInterval = heartbeat
234 self.checkHB = None
235 self.sendHB = None
236 if self.heartbeatInterval > 0:
237 d = self.started.wait()
238 d.addCallback(self.reschedule_sendHB)
239 d.addCallback(self.reschedule_checkHB)
240
241 def reschedule_sendHB(self, dummy=None):
242 if self.heartbeatInterval > 0:
243 if self.sendHB is None:
244 self.sendHB = LoopingCall(self.sendHeartbeat)
245 elif self.sendHB.running:
246 self.sendHB.stop()
247 self.sendHB.start(self.heartbeatInterval, now=False)
248
249 def reschedule_checkHB(self, dummy=None):
250 if self.checkHB is not None and self.checkHB.active():
251 self.checkHB.cancel()
252 self.checkHB = reactor.callLater(self.heartbeatInterval *
253 self.MAX_UNSEEN_HEARTBEAT, self.checkHeartbeat)
228254
229 def check_0_8(self):255 def check_0_8(self):
230 return (self.spec.minor, self.spec.major) == (0, 8)256 return (self.spec.minor, self.spec.major) == (0, 8)
@@ -254,7 +280,7 @@
254 finally:280 finally:
255 self.queueLock.release()281 self.queueLock.release()
256 defer.returnValue(q)282 defer.returnValue(q)
257 283
258 def close(self, reason):284 def close(self, reason):
259 for ch in self.channels.values():285 for ch in self.channels.values():
260 ch.close(reason)286 ch.close(reason)
@@ -294,10 +320,18 @@
294 def frameReceived(self, frame):320 def frameReceived(self, frame):
295 self.processFrame(frame)321 self.processFrame(frame)
296322
323 def sendFrame(self, frame):
324 if frame.payload.type != Frame.HEARTBEAT:
325 self.reschedule_sendHB()
326 FrameReceiver.sendFrame(self, frame)
327
297 @defer.inlineCallbacks328 @defer.inlineCallbacks
298 def processFrame(self, frame):329 def processFrame(self, frame):
299 ch = yield self.channel(frame.channel)330 ch = yield self.channel(frame.channel)
300 ch.dispatch(frame, self.work)331 if frame.payload.type != Frame.HEARTBEAT:
332 ch.dispatch(frame, self.work)
333 if self.heartbeatInterval > 0:
334 self.reschedule_checkHB()
301335
302 @defer.inlineCallbacks336 @defer.inlineCallbacks
303 def authenticate(self, username, password, mechanism='AMQPLAIN', locale='en_US'):337 def authenticate(self, username, password, mechanism='AMQPLAIN', locale='en_US'):
@@ -319,5 +353,19 @@
319 channel0 = yield self.channel(0)353 channel0 = yield self.channel(0)
320 yield channel0.connection_open(self.vhost)354 yield channel0.connection_open(self.vhost)
321355
356 def sendHeartbeat(self):
357 self.sendFrame(Frame(0, Heartbeat()))
358
359 def checkHeartbeat(self):
360 if self.checkHB is not None and self.checkHB.active():
361 self.checkHB.cancel()
362 self.checkHB = None
363 self.transport.loseConnection()
364
322 def connectionLost(self, reason):365 def connectionLost(self, reason):
366 if self.heartbeatInterval > 0 and self.sendHB.running:
367 self.sendHB.stop()
368 if self.checkHB is not None and self.checkHB.active():
369 self.checkHB.cancel()
323 self.close(reason)370 self.close(reason)
371
324372
=== added file 'src/txamqp/test/test_heartbeat.py'
--- src/txamqp/test/test_heartbeat.py 1970-01-01 00:00:00 +0000
+++ src/txamqp/test/test_heartbeat.py 2009-07-31 11:43:51 +0000
@@ -0,0 +1,59 @@
1from time import time
2import txamqp
3from txamqp.testlib import TestBase
4from txamqp.protocol import AMQClient, TwistedDelegate
5from twisted.internet import reactor, protocol
6from twisted.internet.defer import Deferred, inlineCallbacks, returnValue
7
8class SpyAMQClient(AMQClient):
9 called_reschedule_check = 0
10 called_send_hb = 0
11 def reschedule_checkHB(self, dummy=None):
12 AMQClient.reschedule_checkHB(self)
13 self.called_reschedule_check += 1
14 def sendHeartbeat(self):
15 AMQClient.sendHeartbeat(self)
16 self.called_send_hb += 1
17
18class HeartbeatTests(TestBase):
19 """
20 Tests handling of heartbeat frames
21 """
22
23 @inlineCallbacks
24 def connect(self):
25 delegate = TwistedDelegate()
26 onConn = Deferred()
27 p = SpyAMQClient(delegate, self.vhost, heartbeat=1,
28 spec=txamqp.spec.load(self.spec))
29 f = protocol._InstanceFactory(reactor, p, onConn)
30 c = reactor.connectTCP(self.host, self.port, f)
31 self.connectors.append(c)
32 client = yield onConn
33
34 yield client.authenticate(self.user, self.password)
35 returnValue(client)
36
37
38 @inlineCallbacks
39 def setUp(self):
40 """ Set up a heartbeat frame per second """
41 self.client = yield self.connect()
42
43 self.channel = yield self.client.channel(1)
44 yield self.channel.channel_open()
45
46 def test_heartbeat(self):
47 """
48 Test that heartbeat frames are sent and received
49 """
50 d = Deferred()
51 def checkPulse(dummy):
52 t = time()
53 self.assertTrue(self.client.called_send_hb,
54 "A heartbeat frame was recently sent")
55 self.assertTrue(self.client.called_reschedule_check,
56 "A heartbeat frame was recently received")
57 d.addCallback(checkPulse)
58 reactor.callLater(3, d.callback, None)
59 return d
060
=== modified file 'src/txamqp/testlib.py'
--- src/txamqp/testlib.py 2009-06-17 16:01:31 +0000
+++ src/txamqp/testlib.py 2009-07-09 13:26:53 +0000
@@ -6,9 +6,9 @@
6# to you under the Apache License, Version 2.0 (the6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at8# with the License. You may obtain a copy of the License at
9# 9#
10# http://www.apache.org/licenses/LICENSE-2.010# http://www.apache.org/licenses/LICENSE-2.0
11# 11#
12# Unless required by applicable law or agreed to in writing,12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -78,29 +78,33 @@
78 self.user = 'guest'78 self.user = 'guest'
79 self.password = 'guest'79 self.password = 'guest'
80 self.vhost = 'localhost'80 self.vhost = 'localhost'
81 self.heartbeat = 0
81 self.queues = []82 self.queues = []
82 self.exchanges = []83 self.exchanges = []
83 self.connectors = []84 self.connectors = []
8485
85 @inlineCallbacks86 @inlineCallbacks
86 def connect(self, host=None, port=None, spec=None, user=None, password=None, vhost=None):87 def connect(self, host=None, port=None, spec=None, user=None,
88 password=None, vhost=None, heartbeat=None):
87 host = host or self.host89 host = host or self.host
88 port = port or self.port90 port = port or self.port
89 spec = spec or self.spec91 spec = spec or self.spec
90 user = user or self.user92 user = user or self.user
91 password = password or self.password93 password = password or self.password
92 vhost = vhost or self.vhost94 vhost = vhost or self.vhost
95 heartbeat = heartbeat or self.heartbeat
9396
94 delegate = TwistedDelegate()97 delegate = TwistedDelegate()
95 onConn = Deferred()98 onConn = Deferred()
96 f = protocol._InstanceFactory(reactor, AMQClient(delegate, vhost, txamqp.spec.load(spec)), onConn)99 p = AMQClient(delegate, vhost, heartbeat=heartbeat, spec=txamqp.spec.load(spec))
100 f = protocol._InstanceFactory(reactor, p, onConn)
97 c = reactor.connectTCP(host, port, f)101 c = reactor.connectTCP(host, port, f)
98 self.connectors.append(c)102 self.connectors.append(c)
99 client = yield onConn103 client = yield onConn
100104
101 yield client.authenticate(user, password)105 yield client.authenticate(user, password)
102 returnValue(client)106 returnValue(client)
103 107
104 @inlineCallbacks108 @inlineCallbacks
105 def setUp(self):109 def setUp(self):
106 self.client = yield self.connect()110 self.client = yield self.connect()

Subscribers

People subscribed via source and target branches

to status/vote changes: