Merge lp:~facundo/ubuntuone-client/stable-ping-it-baby into lp:ubuntuone-client/stable-1-4

Proposed by Facundo Batista
Status: Merged
Approved by: Facundo Batista
Approved revision: 720
Merged at revision: 725
Proposed branch: lp:~facundo/ubuntuone-client/stable-ping-it-baby
Merge into: lp:ubuntuone-client/stable-1-4
Diff against target: 233 lines (+123/-12)
3 files modified
contrib/testing/testcase.py (+5/-0)
tests/syncdaemon/test_action_queue.py (+93/-1)
ubuntuone/syncdaemon/action_queue.py (+25/-11)
To merge this branch: bzr merge lp:~facundo/ubuntuone-client/stable-ping-it-baby
Reviewer Review Type Date Requested Status
Lucio Torre (community) Approve
Guillermo Gonzalez Approve
Review via email: mp+36574@code.launchpad.net

Commit message

Client periodically pings the server to check connection.

Description of the change

Client periodically pings the server to check connection.

Tests included. Also added a small helper in the MementoHandler that is
useful for debugging while coding.

Note that I also had to fix an ugly monkeypatching that didn't let me log in debug.

Also note that I hardcoded the ping period. Maybe in the future we would want for it to be configurable, but I wanted to reduce the footprint to ease the SRU. I chose 10 minutes as a balance between time between pings and server load (having 500k connected clients to 16 servers, one ping each 10 mins will be 52 pings per second per server).

To post a comment you must log in.
Revision history for this message
Guillermo Gonzalez (verterok) wrote :

looks good.

review: Approve
Revision history for this message
Lucio Torre (lucio.torre) wrote :

looks good

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'contrib/testing/testcase.py'
2--- contrib/testing/testcase.py 2010-09-07 16:02:04 +0000
3+++ contrib/testing/testcase.py 2010-09-24 18:05:58 +0000
4@@ -407,6 +407,7 @@
5 """ Create the instance, and add a records attribute. """
6 logging.Handler.__init__(self, *args, **kwargs)
7 self.records = []
8+ self.debug = False
9
10 def emit(self, record):
11 """ Just add the record to self.records. """
12@@ -417,6 +418,10 @@
13 for rec in self.records:
14 if rec.levelno == level and all(m in rec.message for m in msgs):
15 return True
16+ if self.debug:
17+ recorded = [(logging.getLevelName(r.levelno), r.message)
18+ for r in self.records]
19+ print "Memento messages:", recorded
20 return False
21
22 def check_debug(self, *msgs):
23
24=== modified file 'tests/syncdaemon/test_action_queue.py'
25--- tests/syncdaemon/test_action_queue.py 2010-09-21 14:31:36 +0000
26+++ tests/syncdaemon/test_action_queue.py 2010-09-24 18:05:58 +0000
27@@ -49,7 +49,7 @@
28 from ubuntuone.syncdaemon import states
29 from ubuntuone.syncdaemon.action_queue import (
30 ActionQueue, ActionQueueCommand, ChangePublicAccess, CreateUDF,
31- DeleteVolume, Download, ListVolumes,
32+ DeleteVolume, Download, ListVolumes, ActionQueueProtocol,
33 NoisyRequestQueue, RequestQueue, UploadProgressWrapper, Upload,
34 CreateShare, GetPublicFiles, GetDelta, GetDeltaFromScratch,
35 TRANSFER_PROGRESS_THRESHOLD, Unlink, Move, MakeFile, MakeDir,
36@@ -3677,3 +3677,95 @@
37 self.assertTrue(self.handler.check_note('Session ID: %r' %
38 str(request.session_id)))
39
40+
41+class ActionQueueProtocolTests(TwistedTestCase):
42+ """Test the ACQ class."""
43+
44+ def setUp(self):
45+ """Set up."""
46+ # create an AQP and put a factory to it
47+ self.aqp = ActionQueueProtocol()
48+ obj = Mocker().mock()
49+ obj.event_queue.push('SYS_CONNECTION_MADE')
50+ self.aqp.factory = obj
51+
52+ # set up the logger
53+ self.handler = MementoHandler()
54+ self.handler.setLevel(logging.DEBUG)
55+ self.aqp.log.addHandler(self.handler)
56+
57+ def tearDown(self):
58+ """Tear down."""
59+ self.aqp.log.removeHandler(self.handler)
60+ task = self.aqp._looping_ping
61+ if task is not None and task.running:
62+ task.stop()
63+
64+ def test_connection_made(self):
65+ """Connection is made."""
66+ mocker = Mocker()
67+ obj = mocker.mock()
68+ obj.event_queue.push('SYS_CONNECTION_MADE')
69+ self.aqp.factory = obj
70+
71+ # test
72+ with mocker:
73+ self.aqp.connectionMade()
74+ self.assertTrue(self.handler.check_info('Connection made.'))
75+ self.assertFalse(self.aqp._looping_ping is None)
76+
77+ def test_connection_lost(self):
78+ """Connection is lost."""
79+ self.aqp.connectionLost('foo')
80+ self.assertTrue(self.handler.check_info(
81+ 'Connection lost, reason: foo.'))
82+ self.assertTrue(self.aqp._looping_ping is None)
83+
84+ def test_ping_connection_made_twice(self):
85+ """If connection made is called twice, don't create two tasks."""
86+ self.aqp.connectionMade()
87+ task1 = self.aqp._looping_ping
88+ self.aqp.connectionMade()
89+ task2 = self.aqp._looping_ping
90+ self.assertTrue(task1 is task2)
91+
92+ def test_ping_connection_lost_twice(self):
93+ """If connection lost is called twice, don't stop None."""
94+ self.aqp.connectionMade()
95+ self.assertFalse(self.aqp._looping_ping is None)
96+ self.aqp.connectionLost('reason')
97+ self.assertTrue(self.aqp._looping_ping is None)
98+ self.aqp.connectionLost('reason')
99+ self.assertTrue(self.aqp._looping_ping is None)
100+
101+ @defer.inlineCallbacks
102+ def test_ping_task_calls_ping(self):
103+ """The task will call the _do_ping method."""
104+ self.aqp._ping_delay = .1
105+ deferred = defer.Deferred()
106+
107+ def fake_ping():
108+ """Stop the loop and trigger the deferred test."""
109+ self.aqp._looping_ping.stop()
110+ deferred.callback(True)
111+
112+ self.aqp._do_ping = fake_ping
113+ self.aqp.connectionMade()
114+ yield deferred
115+
116+ def test_ping_do_ping(self):
117+ """Ping and log."""
118+ # mock the request
119+ mocker = Mocker()
120+ req = mocker.mock()
121+ req.rtt
122+ mocker.result(1.123123)
123+ self.aqp.ping = lambda: defer.succeed(req)
124+
125+ # ping will be called, and req accessed, otherwise mocker will complain
126+ with mocker:
127+ self.aqp._do_ping()
128+
129+ # check also the log
130+ self.assertTrue(self.handler.check_debug('Ping! rtt: 1.123 segs'))
131+
132
133=== modified file 'ubuntuone/syncdaemon/action_queue.py'
134--- ubuntuone/syncdaemon/action_queue.py 2010-09-21 14:31:36 +0000
135+++ ubuntuone/syncdaemon/action_queue.py 2010-09-24 18:05:58 +0000
136@@ -43,7 +43,7 @@
137 from urlparse import urljoin
138
139 from zope.interface import implements
140-from twisted.internet import reactor, defer, threads
141+from twisted.internet import reactor, defer, threads, task
142 from twisted.internet import error as twisted_errors
143 from twisted.names import client as dns_client
144 from twisted.python.failure import Failure, DefaultException
145@@ -120,48 +120,48 @@
146 self.log = logging.getLogger('ubuntuone.SyncDaemon.StorageClient')
147 # configure the handler level to be < than DEBUG
148 self.log.setLevel(TRACE)
149- self.log.debug = partial(self.log.log, TRACE)
150+ self.log_trace = partial(self.log.log, TRACE)
151
152 def processMessage(self, message):
153 """Wrapper that logs the message and result."""
154 # don't log the full message if it's of type BYTES
155 if message.type == protocol_pb2.Message.BYTES:
156- self.log.debug('start - processMessage: id: %s, type: %s',
157+ self.log_trace('start - processMessage: id: %s, type: %s',
158 message.id, message.type)
159 else:
160- self.log.debug('start - processMessage: %s',
161+ self.log_trace('start - processMessage: %s',
162 str(message).replace("\n", " "))
163 if message.id in self.requests:
164 req = self.requests[message.id]
165 req.deferred.addCallbacks(self.log_success, self.log_error)
166 result = ThrottlingStorageClient.processMessage(self, message)
167- self.log.debug('end - processMessage: id: %s - result: %s',
168+ self.log_trace('end - processMessage: id: %s - result: %s',
169 message.id, result)
170 return result
171
172 def log_error(self, failure):
173 """Logging errback for requests."""
174- self.log.debug('request error: %s', failure)
175+ self.log_trace('request error: %s', failure)
176 return failure
177
178 def log_success(self, result):
179 """Logging callback for requests."""
180- self.log.debug('request finished: %s', result)
181+ self.log_trace('request finished: %s', result)
182 if getattr(result, '__dict__', None):
183- self.log.debug('result.__dict__: %s', result.__dict__)
184+ self.log_trace('result.__dict__: %s', result.__dict__)
185 return result
186
187 def sendMessage(self, message):
188 """Wrapper that logs the message and result."""
189 # don't log the full message if it's of type BYTES
190 if message.type == protocol_pb2.Message.BYTES:
191- self.log.debug('start - sendMessage: id: %s, type: %s',
192+ self.log_trace('start - sendMessage: id: %s, type: %s',
193 message.id, message.type)
194 else:
195- self.log.debug('start - sendMessage: %s',
196+ self.log_trace('start - sendMessage: %s',
197 str(message).replace("\n", " "))
198 result = ThrottlingStorageClient.sendMessage(self, message)
199- self.log.debug('end - sendMessage: id: %s', message.id)
200+ self.log_trace('end - sendMessage: id: %s', message.id)
201 return result
202
203
204@@ -169,15 +169,29 @@
205 """This is the Action Queue version of the StorageClient protocol."""
206
207 factory = None
208+ _looping_ping = None
209+ _ping_delay = 600 # 10 minutes
210
211 def connectionMade(self):
212 """A new connection was made."""
213 self.log.info('Connection made.')
214 self.factory.event_queue.push('SYS_CONNECTION_MADE')
215+ if self._looping_ping is None:
216+ self._looping_ping = task.LoopingCall(self._do_ping)
217+ self._looping_ping.start(self._ping_delay, now=False)
218
219 def connectionLost(self, reason):
220 """The connection was lost."""
221 self.log.info('Connection lost, reason: %s.', reason)
222+ if self._looping_ping is not None:
223+ self._looping_ping.stop()
224+ self._looping_ping = None
225+
226+ @defer.inlineCallbacks
227+ def _do_ping(self):
228+ """Ping the server just to use the network."""
229+ req = yield self.ping()
230+ self.log.debug("Ping! rtt: %.3f segs", req.rtt)
231
232
233 class Marker(str):

Subscribers

People subscribed via source and target branches