Merge lp:~pedronis/ubuntuone-client/transfer-progress-events into lp:ubuntuone-client

Proposed by Samuele Pedroni
Status: Merged
Approved by: Stuart Colville
Approved revision: 528
Merged at revision: 520
Proposed branch: lp:~pedronis/ubuntuone-client/transfer-progress-events
Merge into: lp:ubuntuone-client
Diff against target: 660 lines (+425/-13)
5 files modified
tests/syncdaemon/test_action_queue.py (+246/-3)
tests/syncdaemon/test_dbus.py (+48/-0)
ubuntuone/syncdaemon/action_queue.py (+58/-9)
ubuntuone/syncdaemon/dbus_interface.py (+69/-1)
ubuntuone/syncdaemon/event_queue.py (+4/-0)
To merge this branch: bzr merge lp:~pedronis/ubuntuone-client/transfer-progress-events
Reviewer Review Type Date Requested Status
Facundo Batista (community) Approve
Stuart Colville (community) Approve
Rick McBride (community) Abstain
Samuele Pedroni Abstain
Natalia Bidart (community) Approve
Review via email: mp+24627@code.launchpad.net

Commit message

addressing the demands of Bug #570747, introduces internal events AQ_DOWNLOAD/UPLOAD_FILE_PROGRESS that then get converted into D-Bus signals Download/UploadFileProgress.

Description of the change

addressing the demands of Bug #570747, introduces internal events AQ_DOWNLOAD/UPLOAD_FILE_PROGRESS that then get converted into D-Bus signals Download/UploadFileProgress. The information conveyed is similar to that of the current_downloads method; information about deflated_size and the number of *compressed* bytes transferred is reported. Events are generated only if at least 64Kb of progress have been made.

To post a comment you must log in.
Revision history for this message
Natalia Bidart (nataliabidart) wrote :

 * This docstring seems incorrect:

+class DownloadTestCase(ConnectedBaseTestCase):
+ """Test for ListDir ActionQueueCommand."""

 * class MyGetContent(GetContentMixin) (and all the other auxiliary classes) need docstrings, and the self shadows the self from the outer class.

 * most of the (new) tests need their docstrings.

 * use assertEqual instead of assertEquals (the latter will be deprecated with unittest2).

 * PEP-8 indicates 2 blank lines between classes.

 * the adding of new events to the EVENTS collection should be tested

review: Needs Fixing
Revision history for this message
Samuele Pedroni (pedronis) wrote :

Now all style issues should have been addressed, also the progress hooks have been simplified to take only the byte count.

Revision history for this message
Rodrigo Moya (rodrigo-moya) wrote :

As for the libubuntuone needs, this is ok, even a bigger threshold would be ok

Revision history for this message
Natalia Bidart (nataliabidart) wrote :

Thank you!

review: Approve
Revision history for this message
Rick McBride (rmcbride) wrote :

Looks good!

review: Approve
Revision history for this message
Samuele Pedroni (pedronis) wrote :

there's a bug, which was not there before, about resetting the n_bytes_*_last counters that needs fixing

review: Needs Fixing
Revision history for this message
Samuele Pedroni (pedronis) wrote :

I pushed fixes for the issue, plus testing

review: Abstain
Revision history for this message
Rick McBride (rmcbride) :
review: Abstain
Revision history for this message
Stuart Colville (muffinresearch) wrote :

Tests pass - looks good.

review: Approve
Revision history for this message
Facundo Batista (facundo) wrote :

Looks ok now

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'tests/syncdaemon/test_action_queue.py'
2--- tests/syncdaemon/test_action_queue.py 2010-05-03 23:40:25 +0000
3+++ tests/syncdaemon/test_action_queue.py 2010-05-05 18:49:23 +0000
4@@ -40,6 +40,7 @@
5 from contrib.testing.testcase import (
6 BaseTwistedTestCase, MementoHandler, DummyClass
7 )
8+from contrib.mocker import Mocker
9
10 from ubuntuone.storageprotocol import client, errors, protocol_pb2, volumes
11 from ubuntuone.syncdaemon import states
12@@ -47,8 +48,10 @@
13 from ubuntuone.syncdaemon.main import Main
14 from ubuntuone.syncdaemon.action_queue import (
15 ActionQueue, ActionQueueCommand, ChangePublicAccess, CreateUDF,
16- DeleteVolume, ListDir, ListVolumes, NoisyRequestQueue, RequestQueue,
17- Upload, CreateShare, GetPublicFiles,
18+ DeleteVolume, Download, GetContentMixin, ListDir, ListVolumes,
19+ NoisyRequestQueue, RequestQueue, UploadProgressWrapper, Upload,
20+ CreateShare, GetPublicFiles,
21+ TRANSFER_PROGRESS_THRESHOLD
22 )
23 from ubuntuone.syncdaemon.event_queue import EventQueue, EVENTS
24 from ubuntuone.syncdaemon.volume_manager import UDF
25@@ -1262,6 +1265,49 @@
26 self.assertTrue(res is None)
27
28
29+class GetContentMixinTestCase(ConnectedBaseTestCase):
30+ """Test for GetContentMixin ActionQueueCommand subclass."""
31+
32+ def setUp(self):
33+ """Init."""
34+ res = super(GetContentMixinTestCase, self).setUp()
35+
36+ self.rq = RequestQueue(name='FOO', action_queue=self.action_queue)
37+ return res
38+
39+ def test_progress(self):
40+ """Test progress_hook invocation."""
41+ mocker = Mocker()
42+ gunzip = mocker.mock()
43+ gunzip.decompress('some compressed bytes')
44+ mocker.result('some bytes')
45+ mocker.replay()
46+
47+ calls = []
48+ class MyGetContent(GetContentMixin):
49+ """Subclass to check progress_hook invocation."""
50+ def progress_hook(innerself, n_bytes):
51+ calls.append((innerself.fileobj.getvalue(),
52+ n_bytes))
53+ # check just that is there
54+ super(MyGetContent, innerself).progress_hook(n_bytes)
55+
56+ command = MyGetContent(self.rq, share_id='a_share_id',
57+ node_id='a_node_id', server_hash='a_server_hash',
58+ fileobj_factory=lambda: None)
59+ command.gunzip = gunzip
60+ command.fileobj = StringIO()
61+ dloading = {
62+ 'n_bytes_read': 0
63+ }
64+ command.action_queue.downloading['a_share_id', 'a_node_id'] = dloading
65+ command.cb('some compressed bytes')
66+ expected = len('some compressed bytes')
67+ self.assertEqual(calls, [('some bytes', expected)])
68+ mocker.restore()
69+ mocker.verify()
70+
71+
72 class ListDirTestCase(ConnectedBaseTestCase):
73 """Test for ListDir ActionQueueCommand."""
74
75@@ -1322,12 +1368,177 @@
76 self.assertTrue(res is None)
77
78 def test_AQ_DOWNLOAD_DOES_NOT_EXIST_is_valid_event(self):
79- """AQ_DOWNLOAD_DOES_NOT_EXIST is a valdi event."""
80+ """AQ_DOWNLOAD_DOES_NOT_EXIST is a valid event."""
81 event = 'AQ_DOWNLOAD_DOES_NOT_EXIST'
82 self.assertTrue(event in EVENTS)
83 self.assertEquals(('share_id', 'node_id'), EVENTS[event])
84
85
86+class DownloadUnconnectedTestCase(FactoryBaseTestCase):
87+ """Test for Download ActionQueueCommand, no connection"""
88+
89+ def setUp(self):
90+ """Init."""
91+ super(DownloadUnconnectedTestCase, self).setUp()
92+
93+ self.rq = request_queue = RequestQueue(name='FOO',
94+ action_queue=self.action_queue)
95+ self.command = Download(request_queue, share_id='a_share_id',
96+ node_id='a_node_id', server_hash='a_server_hash',
97+ fileobj_factory=lambda: None)
98+ self.command.start_unqueued() # create the logger
99+
100+ def test_progress_information_setup(self):
101+ """Test the setting up of the progress information in ._run()."""
102+ calls = []
103+ d = defer.Deferred()
104+ class FakedRequest():
105+ """Fake Request."""
106+ pass
107+ class FakedClient(object):
108+ """Fake Client."""
109+ def get_content_request(innerself, *args, **kwds):
110+ """Fake a get content request with its deferred."""
111+ calls.append(kwds.get('offset'))
112+ req = FakedRequest()
113+ req.deferred = d
114+ return req
115+
116+ self.command.action_queue.connect_in_progress = False
117+ self.command.action_queue.client = FakedClient()
118+ self.command._run()
119+
120+ self.assertEqual(self.command.n_bytes_read_last, 0)
121+ self.assertEqual(calls, [0])
122+
123+ calls = []
124+ dloading = self.command.action_queue.downloading['a_share_id',
125+ 'a_node_id']
126+ dloading['n_bytes_read'] = 20
127+ self.command._run()
128+
129+ self.assertEqual(self.command.n_bytes_read_last, 20)
130+ self.assertEqual(calls, [20])
131+
132+class DownloadTestCase(ConnectedBaseTestCase):
133+ """Test for Download ActionQueueCommand."""
134+
135+ def setUp(self):
136+ """Init."""
137+ res = super(DownloadTestCase, self).setUp()
138+
139+ request_queue = RequestQueue(name='FOO', action_queue=self.action_queue)
140+ self.command = Download(request_queue, share_id='a_share_id',
141+ node_id='a_node_id', server_hash='a_server_hash',
142+ fileobj_factory=lambda: None)
143+ self.command.start_unqueued() # create the logger
144+
145+ return res
146+
147+ def test_AQ_DOWNLOAD_FILE_PROGRESS_is_valid_event(self):
148+ """AQ_DOWNLOAD_FILE_PROGRESS is a valid event."""
149+ event = 'AQ_DOWNLOAD_FILE_PROGRESS'
150+ self.assertTrue(event in EVENTS)
151+ self.assertEqual(('share_id', 'node_id', 'n_bytes_read',
152+ 'deflated_size'), EVENTS[event])
153+
154+ def test_progress_hook(self):
155+ # would first get the node attribute including this
156+ self.command.action_queue.downloading['a_share_id', 'a_node_id'] = {}
157+ self.command.nacb(deflated_size = 2*TRANSFER_PROGRESS_THRESHOLD)
158+ self.command.progress_start(0)
159+
160+ self.command.progress_hook(5)
161+ self.assertEqual([], self.command.action_queue.event_queue.events)
162+ self.assertEqual(self.command.n_bytes_read_last, 5)
163+
164+ self.command.progress_hook(TRANSFER_PROGRESS_THRESHOLD)
165+ self.assertEqual([], self.command.action_queue.event_queue.events)
166+ self.assertEqual(self.command.n_bytes_read_last,
167+ TRANSFER_PROGRESS_THRESHOLD)
168+
169+ self.command.progress_start(5)
170+ self.command.progress_hook(TRANSFER_PROGRESS_THRESHOLD+5)
171+ kwargs = {'share_id': 'a_share_id', 'node_id': 'a_node_id',
172+ 'deflated_size': 2*TRANSFER_PROGRESS_THRESHOLD,
173+ 'n_bytes_read': 5+TRANSFER_PROGRESS_THRESHOLD}
174+ events = [('AQ_DOWNLOAD_FILE_PROGRESS', (), kwargs)]
175+ self.assertEqual(events, self.command.action_queue.event_queue.events)
176+ self.assertEqual(self.command.n_bytes_read_last,
177+ 5+TRANSFER_PROGRESS_THRESHOLD)
178+
179+
180+class UploadUnconnectedTestCase(FactoryBaseTestCase):
181+ """Test for Upload ActionQueueCommand, no connection"""
182+
183+ def setUp(self):
184+ """Init."""
185+ super(UploadUnconnectedTestCase, self).setUp()
186+
187+ self.rq = request_queue = RequestQueue(name='FOO',
188+ action_queue=self.action_queue)
189+ self.command = Upload(request_queue, share_id='a_share_id',
190+ node_id='a_node_id', previous_hash='prev_hash',
191+ hash='yadda', crc32=0, size=0,
192+ fileobj_factory=lambda: None,
193+ tempfile_factory=lambda: None)
194+ self.command.start_unqueued() # create the logger
195+
196+ def test_upload_progress_wrapper_setup(self):
197+ """Test the setting up of the progress wrapper in ._run()."""
198+ calls = []
199+ d = defer.Deferred()
200+ class FakedRequest():
201+ """Fake Request."""
202+ pass
203+ class FakedClient(object):
204+ """Fake Client."""
205+ def put_content_request(innerself, *args):
206+ """Fake a put content request with its deferred."""
207+ calls.append(args)
208+ req = FakedRequest()
209+ req.deferred = d
210+ return req
211+
212+ self.command.action_queue.connect_in_progress = False
213+ self.command.action_queue.client = FakedClient()
214+ self.command._run()
215+
216+ self.assertEqual(len(calls), 1)
217+ upload_wrapper = calls[0][-1]
218+ uploading = self.action_queue.uploading['a_share_id', 'a_node_id']
219+ self.assertTrue(upload_wrapper.data_dict is uploading)
220+ self.assertEqual(upload_wrapper.progress_hook,
221+ self.command.progress_hook)
222+ self.assertEqual(self.command.n_bytes_written_last, 0)
223+
224+ self.command.n_bytes_written_last = 20
225+ self.command._run()
226+ self.assertEqual(self.command.n_bytes_written_last, 0)
227+
228+class UploadProgressWrapperTestCase(BaseTwistedTestCase):
229+ """Test for the UploadProgressWrapper helper class."""
230+
231+ def test_read(self):
232+ """Test the read method."""
233+ progress = []
234+
235+ def progress_hook(n_bytes_written):
236+ progress.append(n_bytes_written)
237+
238+ info = {'n_bytes_written': 0}
239+ f = StringIO("x"*10+"y"*5)
240+ upw = UploadProgressWrapper(f, info, progress_hook)
241+
242+ res = upw.read(10)
243+ self.assertEqual(res, "x"*10)
244+ self.assertEqual(progress, [0])
245+
246+ res = upw.read(5)
247+ self.assertEqual(res, "y"*5)
248+ self.assertEqual(progress, [0, 10])
249+
250+
251 class UploadTestCase(ConnectedBaseTestCase):
252 """Test for Upload ActionQueueCommand."""
253
254@@ -1411,6 +1622,38 @@
255 events = [('AQ_UPLOAD_ERROR', (), kwargs)]
256 self.assertEquals(events, self.command.action_queue.event_queue.events)
257
258+ def test_AQ_UPLOAD_FILE_PROGRESS_is_valid_event(self):
259+ """AQ_UPLOAD_FILE_PROGRESS is a valid event."""
260+ event = 'AQ_UPLOAD_FILE_PROGRESS'
261+ self.assertTrue(event in EVENTS)
262+ self.assertEqual(('share_id', 'node_id', 'n_bytes_written',
263+ 'deflated_size'), EVENTS[event])
264+
265+ def test_progress_hook(self):
266+ """Test the progress hook."""
267+ self.command.deflated_size = 2*TRANSFER_PROGRESS_THRESHOLD
268+ self.command.n_bytes_written_last = 0
269+
270+ self.command.progress_hook(5)
271+ self.assertEqual([], self.command.action_queue.event_queue.events)
272+ self.assertEqual(self.command.n_bytes_written_last, 5)
273+
274+ self.command.progress_hook(TRANSFER_PROGRESS_THRESHOLD)
275+ self.assertEqual([], self.command.action_queue.event_queue.events)
276+ self.assertEqual(self.command.n_bytes_written_last,
277+ TRANSFER_PROGRESS_THRESHOLD)
278+
279+ self.command.n_bytes_written_last = 5
280+ self.command.progress_hook(TRANSFER_PROGRESS_THRESHOLD+14)
281+
282+ kwargs = {'share_id': 'a_share_id', 'node_id': 'a_node_id',
283+ 'deflated_size': 2*TRANSFER_PROGRESS_THRESHOLD,
284+ 'n_bytes_written': 14+TRANSFER_PROGRESS_THRESHOLD }
285+ events = [('AQ_UPLOAD_FILE_PROGRESS', (), kwargs)]
286+ self.assertEqual(events, self.command.action_queue.event_queue.events)
287+ self.assertEqual(self.command.n_bytes_written_last,
288+ 14+TRANSFER_PROGRESS_THRESHOLD)
289+
290
291 class CreateShareTestCase(ConnectedBaseTestCase):
292 """Test for CreateShare ActionQueueCommand."""
293
294=== modified file 'tests/syncdaemon/test_dbus.py'
295--- tests/syncdaemon/test_dbus.py 2010-04-09 18:21:13 +0000
296+++ tests/syncdaemon/test_dbus.py 2010-05-05 18:49:23 +0000
297@@ -981,6 +981,30 @@
298 self.main.event_q.push('AQ_DOWNLOAD_STARTED', '', 'node_id', '')
299 return d
300
301+ def test_download_file_progress(self):
302+ """Test the DBus signals in Status."""
303+ a_dir = os.path.join(self.root_dir, u'ñoño'.encode('utf-8'))
304+ self.fs_manager.create(a_dir, "", is_dir=False)
305+ self.fs_manager.set_node_id(a_dir, "node_id")
306+
307+ d = defer.Deferred()
308+ def download_handler(path, info):
309+ """Handler for DownloadFileProgress signal."""
310+ self.assertEqual(a_dir, path.encode('utf-8'))
311+ self.assertEqual(info, {'n_bytes_read': '10',
312+ 'deflated_size': '20'})
313+ d.callback(True)
314+
315+ match = self.bus.add_signal_receiver(download_handler,
316+ signal_name='DownloadFileProgress')
317+ self.signal_receivers.add(match)
318+ self.main.event_q.push('AQ_DOWNLOAD_FILE_PROGRESS',
319+ share_id='',
320+ node_id='node_id',
321+ n_bytes_read=10,
322+ deflated_size=20)
323+ return d
324+
325 def test_download_finished(self):
326 """ Test the DBus signals in Status """
327 a_dir = os.path.join(self.root_dir, u'ñoño'.encode('utf-8'))
328@@ -1056,6 +1080,30 @@
329 self.main.event_q.push('AQ_UPLOAD_STARTED', '', 'node_id', '')
330 return d
331
332+ def test_upload_file_progress(self):
333+ """Test the DBus signals in Status."""
334+ a_dir = os.path.join(self.root_dir, u'ñoño'.encode('utf-8'))
335+ self.fs_manager.create(a_dir, "", is_dir=False)
336+ self.fs_manager.set_node_id(a_dir, "node_id")
337+
338+ d = defer.Deferred()
339+ def upload_handler(path, info):
340+ """Handler for UploadFileProgress signal."""
341+ self.assertEqual(a_dir, path.encode('utf-8'))
342+ self.assertEqual(info, {'n_bytes_written': '10',
343+ 'deflated_size': '20'})
344+ d.callback(True)
345+
346+ match = self.bus.add_signal_receiver(upload_handler,
347+ signal_name='UploadFileProgress')
348+ self.signal_receivers.add(match)
349+ self.main.event_q.push('AQ_UPLOAD_FILE_PROGRESS',
350+ share_id='',
351+ node_id='node_id',
352+ n_bytes_written=10,
353+ deflated_size=20)
354+ return d
355+
356 def test_upload_finished(self):
357 """ Test the DBus signals in Status """
358 a_dir = os.path.join(self.root_dir, u'ñoño'.encode('utf-8'))
359
360=== modified file 'ubuntuone/syncdaemon/action_queue.py'
361--- ubuntuone/syncdaemon/action_queue.py 2010-05-03 23:40:25 +0000
362+++ ubuntuone/syncdaemon/action_queue.py 2010-05-05 18:49:23 +0000
363@@ -66,6 +66,8 @@
364 # Regular expression to validate an e-mail address
365 EREGEX = "^.+\\@(\\[?)[a-zA-Z0-9\\-\\.]+\\.([a-zA-Z]{2,3}|[0-9]{1,3})(\\]?)$"
366
367+# progress threshold to emit a download/upload progress event: 64Kb
368+TRANSFER_PROGRESS_THRESHOLD = 64*1024*1024
369
370 def passit(func):
371 """Pass the value on for the next deferred, while calling func with it."""
372@@ -598,13 +600,13 @@
373 """A wrapper around the file-like object used for Uploads.
374
375 It can be used to keep track of the number of bytes that have been
376- written to the store.
377+ written to the store and invokes a hook on progress.
378
379 """
380
381- __slots__ = ('fd', 'data_dict', 'n_bytes_read')
382+ __slots__ = ('fd', 'data_dict', 'n_bytes_read', 'progress_hook')
383
384- def __init__(self, fd, data_dict):
385+ def __init__(self, fd, data_dict, progress_hook):
386 """
387 fd is the file-like object used for uploads. data_dict is the
388 entry in the uploading dictionary.
389@@ -612,6 +614,7 @@
390 self.fd = fd
391 self.data_dict = data_dict
392 self.n_bytes_read = 0
393+ self.progress_hook = progress_hook
394
395 def read(self, size=None):
396 """
397@@ -623,6 +626,8 @@
398 latter is done directly in the data_dict.
399 """
400 self.data_dict['n_bytes_written'] = self.n_bytes_read
401+ self.progress_hook(self.n_bytes_read)
402+
403 data = self.fd.read(size)
404 self.n_bytes_read += len(data)
405 return data
406@@ -2192,7 +2197,7 @@
407
408 __slots__ = ('share_id', 'node_id', 'server_hash', 'fileobj_factory',
409 'fileobj', 'gunzip', 'marker_maybe', 'cancelled',
410- 'download_req')
411+ 'download_req', 'deflated_size')
412 logged_attrs = ('share_id', 'node_id', 'server_hash', 'fileobj_factory')
413
414 def __init__(self, request_queue, share_id, node_id, server_hash,
415@@ -2243,7 +2248,7 @@
416 'command': self}
417 assert downloading[self.share_id, self.node_id]['command'] is self
418 offset = downloading[self.share_id, self.node_id]['n_bytes_read']
419-
420+ self.progress_start(offset)
421 self.action_queue.event_queue.push('AQ_DOWNLOAD_STARTED',
422 share_id=self.share_id,
423 node_id=self.node_id,
424@@ -2318,9 +2323,19 @@
425 self.fileobj.write(self.gunzip.decompress(bytes))
426 self.fileobj.flush() # not strictly necessary but nice to
427 # see the downloaded size
428-
429+ self.progress_hook(dloading['n_bytes_read'])
430+
431+ def progress_start(self, n_bytes_read_already):
432+ """Hook to start tracking progress."""
433+ pass
434+
435+ def progress_hook(self, n_bytes_read):
436+ """Hook to track progress."""
437+ pass
438+
439 def nacb(self, **kwargs):
440 """Set the node attrs in the 'currently downloading' dict."""
441+ self.deflated_size = kwargs['deflated_size']
442 self.action_queue.downloading[self.share_id,
443 self.node_id].update(kwargs)
444
445@@ -2353,10 +2368,29 @@
446 class Download(GetContentMixin, ActionQueueCommand):
447 """Get the contents of a file."""
448
449- __slots__ = ()
450+ __slots__ = ('n_bytes_read_last')
451
452 is_dir = False
453
454+ def progress_start(self, n_bytes_read_already):
455+ """Start tracking progress.
456+
457+ Consider that n_bytes_read_already have been already read.
458+ """
459+ self.n_bytes_read_last = n_bytes_read_already
460+
461+ def progress_hook(self, n_bytes_read):
462+ """Convert downloading progress into an event."""
463+ n_bytes_read_last = self.n_bytes_read_last
464+ self.n_bytes_read_last = n_bytes_read
465+ # produce an event only if there has been a threshold-sized progress
466+ if n_bytes_read - n_bytes_read_last < TRANSFER_PROGRESS_THRESHOLD:
467+ return
468+ self.action_queue.event_queue.push('AQ_DOWNLOAD_FILE_PROGRESS',
469+ share_id=self.share_id,
470+ node_id=self.node_id,
471+ n_bytes_read=n_bytes_read,
472+ deflated_size=self.deflated_size)
473
474 class Upload(ActionQueueCommand):
475 """Upload stuff to a file."""
476@@ -2364,7 +2398,7 @@
477 __slots__ = ('share_id', 'node_id', 'previous_hash', 'hash', 'crc32',
478 'size', 'fileobj_factory', 'tempfile_factory',
479 'deflated_size', 'tempfile', 'cancelled', 'upload_req',
480- 'marker_maybe')
481+ 'marker_maybe', 'n_bytes_written_last')
482
483 logged_attrs = ('share_id', 'node_id', 'previous_hash', 'hash', 'crc32',
484 'size', 'fileobj_factory')
485@@ -2389,6 +2423,7 @@
486 self.marker_maybe = None
487 if (self.share_id, self.node_id) in self.action_queue.uploading:
488 self.action_queue.cancel_upload(self.share_id, self.node_id)
489+ self.n_bytes_written_last = None # set by _run
490
491 def _is_runnable(self):
492 """Returns True if there is sufficient space available to complete
493@@ -2451,7 +2486,8 @@
494
495 if getattr(self.tempfile, 'name', None) is not None:
496 self.tempfile = open(self.tempfile.name)
497- f = UploadProgressWrapper(self.tempfile, uploading)
498+ self.n_bytes_written_last = 0
499+ f = UploadProgressWrapper(self.tempfile, uploading, self.progress_hook)
500 req = self.action_queue.client.put_content_request(
501 self.share_id, self.node_id, self.previous_hash, self.hash,
502 self.crc32, self.size, self.deflated_size, f)
503@@ -2465,6 +2501,19 @@
504 d.addBoth(passit(lambda _: self.tempfile.close()))
505 return d
506
507+ def progress_hook(self, n_bytes_written):
508+ """Convert uploading progress into an event."""
509+ n_bytes_written_last = self.n_bytes_written_last
510+ self.n_bytes_written_last = n_bytes_written
511+ # produce an event only if there has been a threshold-sized progress
512+ if n_bytes_written - n_bytes_written_last < TRANSFER_PROGRESS_THRESHOLD:
513+ return
514+ self.action_queue.event_queue.push('AQ_UPLOAD_FILE_PROGRESS',
515+ share_id=self.share_id,
516+ node_id=self.node_id,
517+ n_bytes_written=n_bytes_written,
518+ deflated_size=self.deflated_size)
519+
520 def handle_success(self, _):
521 """
522 It worked! Push the event.
523
524=== modified file 'ubuntuone/syncdaemon/dbus_interface.py'
525--- ubuntuone/syncdaemon/dbus_interface.py 2010-04-29 20:56:27 +0000
526+++ ubuntuone/syncdaemon/dbus_interface.py 2010-05-05 18:49:23 +0000
527@@ -276,6 +276,12 @@
528
529 @dbus.service.signal(DBUS_IFACE_STATUS_NAME,
530 signature='sa{ss}')
531+ def DownloadFileProgress(self, path, info):
532+ """Fire a D-BUS signal, notifying about a download progress."""
533+ pass
534+
535+ @dbus.service.signal(DBUS_IFACE_STATUS_NAME,
536+ signature='sa{ss}')
537 def DownloadFinished(self, path, info):
538 """ Fire a D-BUS signal, notifying a download has finished. """
539 pass
540@@ -287,6 +293,12 @@
541
542 @dbus.service.signal(DBUS_IFACE_STATUS_NAME,
543 signature='sa{ss}')
544+ def UploadFileProgress(self, path, info):
545+ """Fire a D-BUS signal, notifying about an upload progress."""
546+ pass
547+
548+ @dbus.service.signal(DBUS_IFACE_STATUS_NAME,
549+ signature='sa{ss}')
550 def UploadFinished(self, path, info):
551 """ Fire a D-BUS signal, notifying an upload has finished. """
552 pass
553@@ -356,8 +368,14 @@
554 """ Emits the signal """
555 self.DownloadStarted(download)
556
557+ def emit_download_file_progress(self, download, **info):
558+ """Emits the signal."""
559+ for k, v in info.copy().items():
560+ info[str(k)] = str(v)
561+ self.DownloadFileProgress(download, info)
562+
563 def emit_download_finished(self, download, **info):
564- """ Emits the signal """
565+ """Emits the signal."""
566 for k, v in info.copy().items():
567 info[str(k)] = str(v)
568 self.DownloadFinished(download, info)
569@@ -366,6 +384,12 @@
570 """ Emits the signal """
571 self.UploadStarted(upload)
572
573+ def emit_upload_file_progress(self, upload, **info):
574+ """Emits the signal."""
575+ for k, v in info.copy().items():
576+ info[str(k)] = str(v)
577+ self.UploadFileProgress(upload, info)
578+
579 def emit_upload_finished(self, upload, **info):
580 """ Emits the signal """
581 for k, v in info.copy().items():
582@@ -457,6 +481,28 @@
583 node_id=str(node_id))
584 self.dbus_iface.status.emit_signal_error('DownloadStarted', args)
585
586+ def handle_AQ_DOWNLOAD_FILE_PROGRESS(self, share_id, node_id,
587+ n_bytes_read, deflated_size):
588+ """Handle AQ_DOWNLOAD_FILE_PROGRESS."""
589+ self.handle_default('AQ_DOWNLOAD_FILE_PROGRESS', share_id, node_id,
590+ n_bytes_read, deflated_size)
591+ try:
592+ mdobj = self.dbus_iface.fs_manager.get_by_node_id(share_id, node_id)
593+ except KeyError, e:
594+ args = dict(message='The md is gone before sending '
595+ 'DownloadFileProgress signal',
596+ error=str(e),
597+ share_id=str(share_id),
598+ node_id=str(node_id))
599+ self.dbus_iface.status.emit_signal_error('DownloadFileProgress',
600+ args)
601+ else:
602+ path = self.dbus_iface.fs_manager.get_abspath(share_id, mdobj.path)
603+ self.dbus_iface.status.emit_download_file_progress(path,
604+ n_bytes_read=n_bytes_read,
605+ deflated_size=deflated_size
606+ )
607+
608 def handle_AQ_DOWNLOAD_FINISHED(self, share_id, node_id, server_hash):
609 """ handle AQ_DOWNLOAD_FINISHED """
610 self.handle_default('AQ_DOWNLOAD_FINISHED', share_id,
611@@ -519,6 +565,28 @@
612 node_id=str(node_id))
613 self.dbus_iface.status.emit_signal_error('UploadStarted', args)
614
615+ def handle_AQ_UPLOAD_FILE_PROGRESS(self, share_id, node_id,
616+ n_bytes_written, deflated_size):
617+ """Handle AQ_UPLOAD_FILE_PROGRESS."""
618+ self.handle_default('AQ_UPLOAD_FILE_PROGRESS', share_id, node_id,
619+ n_bytes_written, deflated_size)
620+ try:
621+ mdobj = self.dbus_iface.fs_manager.get_by_node_id(share_id, node_id)
622+ except KeyError, e:
623+ args = dict(message='The md is gone before sending '
624+ 'UploadFileProgress signal',
625+ error=str(e),
626+ share_id=str(share_id),
627+ node_id=str(node_id))
628+ self.dbus_iface.status.emit_signal_error('UploadFileProgress',
629+ args)
630+ else:
631+ path = self.dbus_iface.fs_manager.get_abspath(share_id, mdobj.path)
632+ self.dbus_iface.status.emit_upload_file_progress(path,
633+ n_bytes_written=n_bytes_written,
634+ deflated_size=deflated_size
635+ )
636+
637 def handle_AQ_UPLOAD_FINISHED(self, share_id, node_id, hash):
638 """ handle AQ_UPLOAD_FINISHED """
639 self.handle_default('AQ_UPLOAD_FINISHED', share_id, node_id, hash)
640
641=== modified file 'ubuntuone/syncdaemon/event_queue.py'
642--- ubuntuone/syncdaemon/event_queue.py 2010-04-12 17:58:20 +0000
643+++ ubuntuone/syncdaemon/event_queue.py 2010-05-05 18:49:23 +0000
644@@ -53,12 +53,16 @@
645 'AQ_UNLINK_OK': ('share_id', 'parent_id', 'node_id'),
646 'AQ_UNLINK_ERROR': ('share_id', 'parent_id', 'node_id', 'error'),
647 'AQ_DOWNLOAD_STARTED': ('share_id', 'node_id', 'server_hash'),
648+ 'AQ_DOWNLOAD_FILE_PROGRESS': ('share_id', 'node_id',
649+ 'n_bytes_read', 'deflated_size'),
650 'AQ_DOWNLOAD_COMMIT': ('share_id', 'node_id', 'server_hash'),
651 'AQ_DOWNLOAD_FINISHED': ('share_id', 'node_id', 'server_hash'),
652 'AQ_DOWNLOAD_ERROR': ('share_id', 'node_id', 'server_hash', 'error'),
653 'AQ_DOWNLOAD_CANCELLED': ('share_id', 'node_id', 'server_hash'),
654 'AQ_DOWNLOAD_DOES_NOT_EXIST': ('share_id', 'node_id'),
655 'AQ_UPLOAD_STARTED' : ('share_id', 'node_id', 'hash'),
656+ 'AQ_UPLOAD_FILE_PROGRESS': ('share_id', 'node_id',
657+ 'n_bytes_written', 'deflated_size'),
658 'AQ_UPLOAD_FINISHED': ('share_id', 'node_id', 'hash'),
659 'AQ_UPLOAD_ERROR': ('share_id', 'node_id', 'error', 'hash'),
660 'AQ_SHARES_LIST': ('shares_list',),

Subscribers

People subscribed via source and target branches