Merge lp:~pedronis/ubuntuone-client/transfer-progress-events into lp:ubuntuone-client
- transfer-progress-events
- Merge into trunk
Status: | Merged | ||||
---|---|---|---|---|---|
Approved by: | Stuart Colville | ||||
Approved revision: | 528 | ||||
Merged at revision: | 520 | ||||
Proposed branch: | lp:~pedronis/ubuntuone-client/transfer-progress-events | ||||
Merge into: | lp:ubuntuone-client | ||||
Diff against target: |
660 lines (+425/-13) 5 files modified
tests/syncdaemon/test_action_queue.py (+246/-3) tests/syncdaemon/test_dbus.py (+48/-0) ubuntuone/syncdaemon/action_queue.py (+58/-9) ubuntuone/syncdaemon/dbus_interface.py (+69/-1) ubuntuone/syncdaemon/event_queue.py (+4/-0) |
||||
To merge this branch: | bzr merge lp:~pedronis/ubuntuone-client/transfer-progress-events | ||||
Related bugs: |
|
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Facundo Batista (community) | Approve | ||
Stuart Colville (community) | Approve | ||
Rick McBride (community) | Abstain | ||
Samuele Pedroni | Abstain | ||
Natalia Bidart (community) | Approve | ||
Review via email:
|
Commit message
Addressing the demands of Bug #570747, introduces internal events AQ_DOWNLOAD/UPLOAD_FILE_PROGRESS and the corresponding D-Bus signals DownloadFileProgress/UploadFileProgress.
Description of the change
Addressing the demands of Bug #570747, introduces internal events AQ_DOWNLOAD/UPLOAD_FILE_PROGRESS and the corresponding D-Bus signals DownloadFileProgress/UploadFileProgress.
![](/+icing/build/overlay/assets/skins/sam/images/close.gif)
Samuele Pedroni (pedronis) wrote : | # |
All style issues should now have been addressed; the progress hooks have also been simplified to take only the byte count.
![](/+icing/build/overlay/assets/skins/sam/images/close.gif)
Rodrigo Moya (rodrigo-moya) wrote : | # |
As for the libubuntuone needs, this is ok, even a bigger threshold would be ok
![](/+icing/build/overlay/assets/skins/sam/images/close.gif)
Natalia Bidart (nataliabidart) wrote : | # |
Thank you!
![](/+icing/build/overlay/assets/skins/sam/images/close.gif)
Samuele Pedroni (pedronis) wrote : | # |
There's a bug that was not there before, about resetting the n_bytes_*_last counters, that needs fixing.
![](/+icing/build/overlay/assets/skins/sam/images/close.gif)
Samuele Pedroni (pedronis) wrote : | # |
I pushed fixes for the issue, plus testing
![](/+icing/build/overlay/assets/skins/sam/images/close.gif)
Rick McBride (rmcbride) : | # |
![](/+icing/build/overlay/assets/skins/sam/images/close.gif)
Stuart Colville (muffinresearch) wrote : | # |
Tests pass - looks good.
Preview Diff
1 | === modified file 'tests/syncdaemon/test_action_queue.py' |
2 | --- tests/syncdaemon/test_action_queue.py 2010-05-03 23:40:25 +0000 |
3 | +++ tests/syncdaemon/test_action_queue.py 2010-05-05 18:49:23 +0000 |
4 | @@ -40,6 +40,7 @@ |
5 | from contrib.testing.testcase import ( |
6 | BaseTwistedTestCase, MementoHandler, DummyClass |
7 | ) |
8 | +from contrib.mocker import Mocker |
9 | |
10 | from ubuntuone.storageprotocol import client, errors, protocol_pb2, volumes |
11 | from ubuntuone.syncdaemon import states |
12 | @@ -47,8 +48,10 @@ |
13 | from ubuntuone.syncdaemon.main import Main |
14 | from ubuntuone.syncdaemon.action_queue import ( |
15 | ActionQueue, ActionQueueCommand, ChangePublicAccess, CreateUDF, |
16 | - DeleteVolume, ListDir, ListVolumes, NoisyRequestQueue, RequestQueue, |
17 | - Upload, CreateShare, GetPublicFiles, |
18 | + DeleteVolume, Download, GetContentMixin, ListDir, ListVolumes, |
19 | + NoisyRequestQueue, RequestQueue, UploadProgressWrapper, Upload, |
20 | + CreateShare, GetPublicFiles, |
21 | + TRANSFER_PROGRESS_THRESHOLD |
22 | ) |
23 | from ubuntuone.syncdaemon.event_queue import EventQueue, EVENTS |
24 | from ubuntuone.syncdaemon.volume_manager import UDF |
25 | @@ -1262,6 +1265,49 @@ |
26 | self.assertTrue(res is None) |
27 | |
28 | |
29 | +class GetContentMixinTestCase(ConnectedBaseTestCase): |
30 | + """Test for GetContentMixin ActionQueueCommand subclass.""" |
31 | + |
32 | + def setUp(self): |
33 | + """Init.""" |
34 | + res = super(GetContentMixinTestCase, self).setUp() |
35 | + |
36 | + self.rq = RequestQueue(name='FOO', action_queue=self.action_queue) |
37 | + return res |
38 | + |
39 | + def test_progress(self): |
40 | + """Test progress_hook invocation.""" |
41 | + mocker = Mocker() |
42 | + gunzip = mocker.mock() |
43 | + gunzip.decompress('some compressed bytes') |
44 | + mocker.result('some bytes') |
45 | + mocker.replay() |
46 | + |
47 | + calls = [] |
48 | + class MyGetContent(GetContentMixin): |
49 | + """Subclass to check progress_hook invocation.""" |
50 | + def progress_hook(innerself, n_bytes): |
51 | + calls.append((innerself.fileobj.getvalue(), |
52 | + n_bytes)) |
53 | + # check just that is there |
54 | + super(MyGetContent, innerself).progress_hook(n_bytes) |
55 | + |
56 | + command = MyGetContent(self.rq, share_id='a_share_id', |
57 | + node_id='a_node_id', server_hash='a_server_hash', |
58 | + fileobj_factory=lambda: None) |
59 | + command.gunzip = gunzip |
60 | + command.fileobj = StringIO() |
61 | + dloading = { |
62 | + 'n_bytes_read': 0 |
63 | + } |
64 | + command.action_queue.downloading['a_share_id', 'a_node_id'] = dloading |
65 | + command.cb('some compressed bytes') |
66 | + expected = len('some compressed bytes') |
67 | + self.assertEqual(calls, [('some bytes', expected)]) |
68 | + mocker.restore() |
69 | + mocker.verify() |
70 | + |
71 | + |
72 | class ListDirTestCase(ConnectedBaseTestCase): |
73 | """Test for ListDir ActionQueueCommand.""" |
74 | |
75 | @@ -1322,12 +1368,177 @@ |
76 | self.assertTrue(res is None) |
77 | |
78 | def test_AQ_DOWNLOAD_DOES_NOT_EXIST_is_valid_event(self): |
79 | - """AQ_DOWNLOAD_DOES_NOT_EXIST is a valdi event.""" |
80 | + """AQ_DOWNLOAD_DOES_NOT_EXIST is a valid event.""" |
81 | event = 'AQ_DOWNLOAD_DOES_NOT_EXIST' |
82 | self.assertTrue(event in EVENTS) |
83 | self.assertEquals(('share_id', 'node_id'), EVENTS[event]) |
84 | |
85 | |
86 | +class DownloadUnconnectedTestCase(FactoryBaseTestCase): |
87 | + """Test for Download ActionQueueCommand, no connection""" |
88 | + |
89 | + def setUp(self): |
90 | + """Init.""" |
91 | + super(DownloadUnconnectedTestCase, self).setUp() |
92 | + |
93 | + self.rq = request_queue = RequestQueue(name='FOO', |
94 | + action_queue=self.action_queue) |
95 | + self.command = Download(request_queue, share_id='a_share_id', |
96 | + node_id='a_node_id', server_hash='a_server_hash', |
97 | + fileobj_factory=lambda: None) |
98 | + self.command.start_unqueued() # create the logger |
99 | + |
100 | + def test_progress_information_setup(self): |
101 | + """Test the setting up of the progress information in ._run().""" |
102 | + calls = [] |
103 | + d = defer.Deferred() |
104 | + class FakedRequest(): |
105 | + """Fake Request.""" |
106 | + pass |
107 | + class FakedClient(object): |
108 | + """Fake Client.""" |
109 | + def get_content_request(innerself, *args, **kwds): |
110 | + """Fake a get content request with its deferred.""" |
111 | + calls.append(kwds.get('offset')) |
112 | + req = FakedRequest() |
113 | + req.deferred = d |
114 | + return req |
115 | + |
116 | + self.command.action_queue.connect_in_progress = False |
117 | + self.command.action_queue.client = FakedClient() |
118 | + self.command._run() |
119 | + |
120 | + self.assertEqual(self.command.n_bytes_read_last, 0) |
121 | + self.assertEqual(calls, [0]) |
122 | + |
123 | + calls = [] |
124 | + dloading = self.command.action_queue.downloading['a_share_id', |
125 | + 'a_node_id'] |
126 | + dloading['n_bytes_read'] = 20 |
127 | + self.command._run() |
128 | + |
129 | + self.assertEqual(self.command.n_bytes_read_last, 20) |
130 | + self.assertEqual(calls, [20]) |
131 | + |
132 | +class DownloadTestCase(ConnectedBaseTestCase): |
133 | + """Test for Download ActionQueueCommand.""" |
134 | + |
135 | + def setUp(self): |
136 | + """Init.""" |
137 | + res = super(DownloadTestCase, self).setUp() |
138 | + |
139 | + request_queue = RequestQueue(name='FOO', action_queue=self.action_queue) |
140 | + self.command = Download(request_queue, share_id='a_share_id', |
141 | + node_id='a_node_id', server_hash='a_server_hash', |
142 | + fileobj_factory=lambda: None) |
143 | + self.command.start_unqueued() # create the logger |
144 | + |
145 | + return res |
146 | + |
147 | + def test_AQ_DOWNLOAD_FILE_PROGRESS_is_valid_event(self): |
148 | + """AQ_DOWNLOAD_FILE_PROGRESS is a valid event.""" |
149 | + event = 'AQ_DOWNLOAD_FILE_PROGRESS' |
150 | + self.assertTrue(event in EVENTS) |
151 | + self.assertEqual(('share_id', 'node_id', 'n_bytes_read', |
152 | + 'deflated_size'), EVENTS[event]) |
153 | + |
154 | + def test_progress_hook(self): |
155 | + # would first get the node attribute including this |
156 | + self.command.action_queue.downloading['a_share_id', 'a_node_id'] = {} |
157 | + self.command.nacb(deflated_size = 2*TRANSFER_PROGRESS_THRESHOLD) |
158 | + self.command.progress_start(0) |
159 | + |
160 | + self.command.progress_hook(5) |
161 | + self.assertEqual([], self.command.action_queue.event_queue.events) |
162 | + self.assertEqual(self.command.n_bytes_read_last, 5) |
163 | + |
164 | + self.command.progress_hook(TRANSFER_PROGRESS_THRESHOLD) |
165 | + self.assertEqual([], self.command.action_queue.event_queue.events) |
166 | + self.assertEqual(self.command.n_bytes_read_last, |
167 | + TRANSFER_PROGRESS_THRESHOLD) |
168 | + |
169 | + self.command.progress_start(5) |
170 | + self.command.progress_hook(TRANSFER_PROGRESS_THRESHOLD+5) |
171 | + kwargs = {'share_id': 'a_share_id', 'node_id': 'a_node_id', |
172 | + 'deflated_size': 2*TRANSFER_PROGRESS_THRESHOLD, |
173 | + 'n_bytes_read': 5+TRANSFER_PROGRESS_THRESHOLD} |
174 | + events = [('AQ_DOWNLOAD_FILE_PROGRESS', (), kwargs)] |
175 | + self.assertEqual(events, self.command.action_queue.event_queue.events) |
176 | + self.assertEqual(self.command.n_bytes_read_last, |
177 | + 5+TRANSFER_PROGRESS_THRESHOLD) |
178 | + |
179 | + |
180 | +class UploadUnconnectedTestCase(FactoryBaseTestCase): |
181 | + """Test for Upload ActionQueueCommand, no connection""" |
182 | + |
183 | + def setUp(self): |
184 | + """Init.""" |
185 | + super(UploadUnconnectedTestCase, self).setUp() |
186 | + |
187 | + self.rq = request_queue = RequestQueue(name='FOO', |
188 | + action_queue=self.action_queue) |
189 | + self.command = Upload(request_queue, share_id='a_share_id', |
190 | + node_id='a_node_id', previous_hash='prev_hash', |
191 | + hash='yadda', crc32=0, size=0, |
192 | + fileobj_factory=lambda: None, |
193 | + tempfile_factory=lambda: None) |
194 | + self.command.start_unqueued() # create the logger |
195 | + |
196 | + def test_upload_progress_wrapper_setup(self): |
197 | + """Test the setting up of the progress wrapper in ._run().""" |
198 | + calls = [] |
199 | + d = defer.Deferred() |
200 | + class FakedRequest(): |
201 | + """Fake Request.""" |
202 | + pass |
203 | + class FakedClient(object): |
204 | + """Fake Client.""" |
205 | + def put_content_request(innerself, *args): |
206 | + """Fake a put content request with its deferred.""" |
207 | + calls.append(args) |
208 | + req = FakedRequest() |
209 | + req.deferred = d |
210 | + return req |
211 | + |
212 | + self.command.action_queue.connect_in_progress = False |
213 | + self.command.action_queue.client = FakedClient() |
214 | + self.command._run() |
215 | + |
216 | + self.assertEqual(len(calls), 1) |
217 | + upload_wrapper = calls[0][-1] |
218 | + uploading = self.action_queue.uploading['a_share_id', 'a_node_id'] |
219 | + self.assertTrue(upload_wrapper.data_dict is uploading) |
220 | + self.assertEqual(upload_wrapper.progress_hook, |
221 | + self.command.progress_hook) |
222 | + self.assertEqual(self.command.n_bytes_written_last, 0) |
223 | + |
224 | + self.command.n_bytes_written_last = 20 |
225 | + self.command._run() |
226 | + self.assertEqual(self.command.n_bytes_written_last, 0) |
227 | + |
228 | +class UploadProgressWrapperTestCase(BaseTwistedTestCase): |
229 | + """Test for the UploadProgressWrapper helper class.""" |
230 | + |
231 | + def test_read(self): |
232 | + """Test the read method.""" |
233 | + progress = [] |
234 | + |
235 | + def progress_hook(n_bytes_written): |
236 | + progress.append(n_bytes_written) |
237 | + |
238 | + info = {'n_bytes_written': 0} |
239 | + f = StringIO("x"*10+"y"*5) |
240 | + upw = UploadProgressWrapper(f, info, progress_hook) |
241 | + |
242 | + res = upw.read(10) |
243 | + self.assertEqual(res, "x"*10) |
244 | + self.assertEqual(progress, [0]) |
245 | + |
246 | + res = upw.read(5) |
247 | + self.assertEqual(res, "y"*5) |
248 | + self.assertEqual(progress, [0, 10]) |
249 | + |
250 | + |
251 | class UploadTestCase(ConnectedBaseTestCase): |
252 | """Test for Upload ActionQueueCommand.""" |
253 | |
254 | @@ -1411,6 +1622,38 @@ |
255 | events = [('AQ_UPLOAD_ERROR', (), kwargs)] |
256 | self.assertEquals(events, self.command.action_queue.event_queue.events) |
257 | |
258 | + def test_AQ_UPLOAD_FILE_PROGRESS_is_valid_event(self): |
259 | + """AQ_UPLOAD_FILE_PROGRESS is a valid event.""" |
260 | + event = 'AQ_UPLOAD_FILE_PROGRESS' |
261 | + self.assertTrue(event in EVENTS) |
262 | + self.assertEqual(('share_id', 'node_id', 'n_bytes_written', |
263 | + 'deflated_size'), EVENTS[event]) |
264 | + |
265 | + def test_progress_hook(self): |
266 | + """Test the progress hook.""" |
267 | + self.command.deflated_size = 2*TRANSFER_PROGRESS_THRESHOLD |
268 | + self.command.n_bytes_written_last = 0 |
269 | + |
270 | + self.command.progress_hook(5) |
271 | + self.assertEqual([], self.command.action_queue.event_queue.events) |
272 | + self.assertEqual(self.command.n_bytes_written_last, 5) |
273 | + |
274 | + self.command.progress_hook(TRANSFER_PROGRESS_THRESHOLD) |
275 | + self.assertEqual([], self.command.action_queue.event_queue.events) |
276 | + self.assertEqual(self.command.n_bytes_written_last, |
277 | + TRANSFER_PROGRESS_THRESHOLD) |
278 | + |
279 | + self.command.n_bytes_written_last = 5 |
280 | + self.command.progress_hook(TRANSFER_PROGRESS_THRESHOLD+14) |
281 | + |
282 | + kwargs = {'share_id': 'a_share_id', 'node_id': 'a_node_id', |
283 | + 'deflated_size': 2*TRANSFER_PROGRESS_THRESHOLD, |
284 | + 'n_bytes_written': 14+TRANSFER_PROGRESS_THRESHOLD } |
285 | + events = [('AQ_UPLOAD_FILE_PROGRESS', (), kwargs)] |
286 | + self.assertEqual(events, self.command.action_queue.event_queue.events) |
287 | + self.assertEqual(self.command.n_bytes_written_last, |
288 | + 14+TRANSFER_PROGRESS_THRESHOLD) |
289 | + |
290 | |
291 | class CreateShareTestCase(ConnectedBaseTestCase): |
292 | """Test for CreateShare ActionQueueCommand.""" |
293 | |
294 | === modified file 'tests/syncdaemon/test_dbus.py' |
295 | --- tests/syncdaemon/test_dbus.py 2010-04-09 18:21:13 +0000 |
296 | +++ tests/syncdaemon/test_dbus.py 2010-05-05 18:49:23 +0000 |
297 | @@ -981,6 +981,30 @@ |
298 | self.main.event_q.push('AQ_DOWNLOAD_STARTED', '', 'node_id', '') |
299 | return d |
300 | |
301 | + def test_download_file_progress(self): |
302 | + """Test the DBus signals in Status.""" |
303 | + a_dir = os.path.join(self.root_dir, u'ñoño'.encode('utf-8')) |
304 | + self.fs_manager.create(a_dir, "", is_dir=False) |
305 | + self.fs_manager.set_node_id(a_dir, "node_id") |
306 | + |
307 | + d = defer.Deferred() |
308 | + def download_handler(path, info): |
309 | + """Handler for DownloadFileProgress signal.""" |
310 | + self.assertEqual(a_dir, path.encode('utf-8')) |
311 | + self.assertEqual(info, {'n_bytes_read': '10', |
312 | + 'deflated_size': '20'}) |
313 | + d.callback(True) |
314 | + |
315 | + match = self.bus.add_signal_receiver(download_handler, |
316 | + signal_name='DownloadFileProgress') |
317 | + self.signal_receivers.add(match) |
318 | + self.main.event_q.push('AQ_DOWNLOAD_FILE_PROGRESS', |
319 | + share_id='', |
320 | + node_id='node_id', |
321 | + n_bytes_read=10, |
322 | + deflated_size=20) |
323 | + return d |
324 | + |
325 | def test_download_finished(self): |
326 | """ Test the DBus signals in Status """ |
327 | a_dir = os.path.join(self.root_dir, u'ñoño'.encode('utf-8')) |
328 | @@ -1056,6 +1080,30 @@ |
329 | self.main.event_q.push('AQ_UPLOAD_STARTED', '', 'node_id', '') |
330 | return d |
331 | |
332 | + def test_upload_file_progress(self): |
333 | + """Test the DBus signals in Status.""" |
334 | + a_dir = os.path.join(self.root_dir, u'ñoño'.encode('utf-8')) |
335 | + self.fs_manager.create(a_dir, "", is_dir=False) |
336 | + self.fs_manager.set_node_id(a_dir, "node_id") |
337 | + |
338 | + d = defer.Deferred() |
339 | + def upload_handler(path, info): |
340 | + """Handler for UploadFileProgress signal.""" |
341 | + self.assertEqual(a_dir, path.encode('utf-8')) |
342 | + self.assertEqual(info, {'n_bytes_written': '10', |
343 | + 'deflated_size': '20'}) |
344 | + d.callback(True) |
345 | + |
346 | + match = self.bus.add_signal_receiver(upload_handler, |
347 | + signal_name='UploadFileProgress') |
348 | + self.signal_receivers.add(match) |
349 | + self.main.event_q.push('AQ_UPLOAD_FILE_PROGRESS', |
350 | + share_id='', |
351 | + node_id='node_id', |
352 | + n_bytes_written=10, |
353 | + deflated_size=20) |
354 | + return d |
355 | + |
356 | def test_upload_finished(self): |
357 | """ Test the DBus signals in Status """ |
358 | a_dir = os.path.join(self.root_dir, u'ñoño'.encode('utf-8')) |
359 | |
360 | === modified file 'ubuntuone/syncdaemon/action_queue.py' |
361 | --- ubuntuone/syncdaemon/action_queue.py 2010-05-03 23:40:25 +0000 |
362 | +++ ubuntuone/syncdaemon/action_queue.py 2010-05-05 18:49:23 +0000 |
363 | @@ -66,6 +66,8 @@ |
364 | # Regular expression to validate an e-mail address |
365 | EREGEX = "^.+\\@(\\[?)[a-zA-Z0-9\\-\\.]+\\.([a-zA-Z]{2,3}|[0-9]{1,3})(\\]?)$" |
366 | |
367 | +# progress threshold to emit a download/upload progress event: 64Kb |
368 | +TRANSFER_PROGRESS_THRESHOLD = 64*1024*1024 |
369 | |
370 | def passit(func): |
371 | """Pass the value on for the next deferred, while calling func with it.""" |
372 | @@ -598,13 +600,13 @@ |
373 | """A wrapper around the file-like object used for Uploads. |
374 | |
375 | It can be used to keep track of the number of bytes that have been |
376 | - written to the store. |
377 | + written to the store and invokes a hook on progress. |
378 | |
379 | """ |
380 | |
381 | - __slots__ = ('fd', 'data_dict', 'n_bytes_read') |
382 | + __slots__ = ('fd', 'data_dict', 'n_bytes_read', 'progress_hook') |
383 | |
384 | - def __init__(self, fd, data_dict): |
385 | + def __init__(self, fd, data_dict, progress_hook): |
386 | """ |
387 | fd is the file-like object used for uploads. data_dict is the |
388 | entry in the uploading dictionary. |
389 | @@ -612,6 +614,7 @@ |
390 | self.fd = fd |
391 | self.data_dict = data_dict |
392 | self.n_bytes_read = 0 |
393 | + self.progress_hook = progress_hook |
394 | |
395 | def read(self, size=None): |
396 | """ |
397 | @@ -623,6 +626,8 @@ |
398 | latter is done directly in the data_dict. |
399 | """ |
400 | self.data_dict['n_bytes_written'] = self.n_bytes_read |
401 | + self.progress_hook(self.n_bytes_read) |
402 | + |
403 | data = self.fd.read(size) |
404 | self.n_bytes_read += len(data) |
405 | return data |
406 | @@ -2192,7 +2197,7 @@ |
407 | |
408 | __slots__ = ('share_id', 'node_id', 'server_hash', 'fileobj_factory', |
409 | 'fileobj', 'gunzip', 'marker_maybe', 'cancelled', |
410 | - 'download_req') |
411 | + 'download_req', 'deflated_size') |
412 | logged_attrs = ('share_id', 'node_id', 'server_hash', 'fileobj_factory') |
413 | |
414 | def __init__(self, request_queue, share_id, node_id, server_hash, |
415 | @@ -2243,7 +2248,7 @@ |
416 | 'command': self} |
417 | assert downloading[self.share_id, self.node_id]['command'] is self |
418 | offset = downloading[self.share_id, self.node_id]['n_bytes_read'] |
419 | - |
420 | + self.progress_start(offset) |
421 | self.action_queue.event_queue.push('AQ_DOWNLOAD_STARTED', |
422 | share_id=self.share_id, |
423 | node_id=self.node_id, |
424 | @@ -2318,9 +2323,19 @@ |
425 | self.fileobj.write(self.gunzip.decompress(bytes)) |
426 | self.fileobj.flush() # not strictly necessary but nice to |
427 | # see the downloaded size |
428 | - |
429 | + self.progress_hook(dloading['n_bytes_read']) |
430 | + |
431 | + def progress_start(self, n_bytes_read_already): |
432 | + """Hook to start tracking progress.""" |
433 | + pass |
434 | + |
435 | + def progress_hook(self, n_bytes_read): |
436 | + """Hook to track progress.""" |
437 | + pass |
438 | + |
439 | def nacb(self, **kwargs): |
440 | """Set the node attrs in the 'currently downloading' dict.""" |
441 | + self.deflated_size = kwargs['deflated_size'] |
442 | self.action_queue.downloading[self.share_id, |
443 | self.node_id].update(kwargs) |
444 | |
445 | @@ -2353,10 +2368,29 @@ |
446 | class Download(GetContentMixin, ActionQueueCommand): |
447 | """Get the contents of a file.""" |
448 | |
449 | - __slots__ = () |
450 | + __slots__ = ('n_bytes_read_last') |
451 | |
452 | is_dir = False |
453 | |
454 | + def progress_start(self, n_bytes_read_already): |
455 | + """Start tracking progress. |
456 | + |
457 | + Consider that n_bytes_read_already have been already read. |
458 | + """ |
459 | + self.n_bytes_read_last = n_bytes_read_already |
460 | + |
461 | + def progress_hook(self, n_bytes_read): |
462 | + """Convert downloading progress into an event.""" |
463 | + n_bytes_read_last = self.n_bytes_read_last |
464 | + self.n_bytes_read_last = n_bytes_read |
465 | + # produce an event only if there has been a threshold-sized progress |
466 | + if n_bytes_read - n_bytes_read_last < TRANSFER_PROGRESS_THRESHOLD: |
467 | + return |
468 | + self.action_queue.event_queue.push('AQ_DOWNLOAD_FILE_PROGRESS', |
469 | + share_id=self.share_id, |
470 | + node_id=self.node_id, |
471 | + n_bytes_read=n_bytes_read, |
472 | + deflated_size=self.deflated_size) |
473 | |
474 | class Upload(ActionQueueCommand): |
475 | """Upload stuff to a file.""" |
476 | @@ -2364,7 +2398,7 @@ |
477 | __slots__ = ('share_id', 'node_id', 'previous_hash', 'hash', 'crc32', |
478 | 'size', 'fileobj_factory', 'tempfile_factory', |
479 | 'deflated_size', 'tempfile', 'cancelled', 'upload_req', |
480 | - 'marker_maybe') |
481 | + 'marker_maybe', 'n_bytes_written_last') |
482 | |
483 | logged_attrs = ('share_id', 'node_id', 'previous_hash', 'hash', 'crc32', |
484 | 'size', 'fileobj_factory') |
485 | @@ -2389,6 +2423,7 @@ |
486 | self.marker_maybe = None |
487 | if (self.share_id, self.node_id) in self.action_queue.uploading: |
488 | self.action_queue.cancel_upload(self.share_id, self.node_id) |
489 | + self.n_bytes_written_last = None # set by _run |
490 | |
491 | def _is_runnable(self): |
492 | """Returns True if there is sufficient space available to complete |
493 | @@ -2451,7 +2486,8 @@ |
494 | |
495 | if getattr(self.tempfile, 'name', None) is not None: |
496 | self.tempfile = open(self.tempfile.name) |
497 | - f = UploadProgressWrapper(self.tempfile, uploading) |
498 | + self.n_bytes_written_last = 0 |
499 | + f = UploadProgressWrapper(self.tempfile, uploading, self.progress_hook) |
500 | req = self.action_queue.client.put_content_request( |
501 | self.share_id, self.node_id, self.previous_hash, self.hash, |
502 | self.crc32, self.size, self.deflated_size, f) |
503 | @@ -2465,6 +2501,19 @@ |
504 | d.addBoth(passit(lambda _: self.tempfile.close())) |
505 | return d |
506 | |
507 | + def progress_hook(self, n_bytes_written): |
508 | + """Convert uploading progress into an event.""" |
509 | + n_bytes_written_last = self.n_bytes_written_last |
510 | + self.n_bytes_written_last = n_bytes_written |
511 | + # produce an event only if there has been a threshold-sized progress |
512 | + if n_bytes_written - n_bytes_written_last < TRANSFER_PROGRESS_THRESHOLD: |
513 | + return |
514 | + self.action_queue.event_queue.push('AQ_UPLOAD_FILE_PROGRESS', |
515 | + share_id=self.share_id, |
516 | + node_id=self.node_id, |
517 | + n_bytes_written=n_bytes_written, |
518 | + deflated_size=self.deflated_size) |
519 | + |
520 | def handle_success(self, _): |
521 | """ |
522 | It worked! Push the event. |
523 | |
524 | === modified file 'ubuntuone/syncdaemon/dbus_interface.py' |
525 | --- ubuntuone/syncdaemon/dbus_interface.py 2010-04-29 20:56:27 +0000 |
526 | +++ ubuntuone/syncdaemon/dbus_interface.py 2010-05-05 18:49:23 +0000 |
527 | @@ -276,6 +276,12 @@ |
528 | |
529 | @dbus.service.signal(DBUS_IFACE_STATUS_NAME, |
530 | signature='sa{ss}') |
531 | + def DownloadFileProgress(self, path, info): |
532 | + """Fire a D-BUS signal, notifying about a download progress.""" |
533 | + pass |
534 | + |
535 | + @dbus.service.signal(DBUS_IFACE_STATUS_NAME, |
536 | + signature='sa{ss}') |
537 | def DownloadFinished(self, path, info): |
538 | """ Fire a D-BUS signal, notifying a download has finished. """ |
539 | pass |
540 | @@ -287,6 +293,12 @@ |
541 | |
542 | @dbus.service.signal(DBUS_IFACE_STATUS_NAME, |
543 | signature='sa{ss}') |
544 | + def UploadFileProgress(self, path, info): |
545 | + """Fire a D-BUS signal, notifying about an upload progress.""" |
546 | + pass |
547 | + |
548 | + @dbus.service.signal(DBUS_IFACE_STATUS_NAME, |
549 | + signature='sa{ss}') |
550 | def UploadFinished(self, path, info): |
551 | """ Fire a D-BUS signal, notifying an upload has finished. """ |
552 | pass |
553 | @@ -356,8 +368,14 @@ |
554 | """ Emits the signal """ |
555 | self.DownloadStarted(download) |
556 | |
557 | + def emit_download_file_progress(self, download, **info): |
558 | + """Emits the signal.""" |
559 | + for k, v in info.copy().items(): |
560 | + info[str(k)] = str(v) |
561 | + self.DownloadFileProgress(download, info) |
562 | + |
563 | def emit_download_finished(self, download, **info): |
564 | - """ Emits the signal """ |
565 | + """Emits the signal.""" |
566 | for k, v in info.copy().items(): |
567 | info[str(k)] = str(v) |
568 | self.DownloadFinished(download, info) |
569 | @@ -366,6 +384,12 @@ |
570 | """ Emits the signal """ |
571 | self.UploadStarted(upload) |
572 | |
573 | + def emit_upload_file_progress(self, upload, **info): |
574 | + """Emits the signal.""" |
575 | + for k, v in info.copy().items(): |
576 | + info[str(k)] = str(v) |
577 | + self.UploadFileProgress(upload, info) |
578 | + |
579 | def emit_upload_finished(self, upload, **info): |
580 | """ Emits the signal """ |
581 | for k, v in info.copy().items(): |
582 | @@ -457,6 +481,28 @@ |
583 | node_id=str(node_id)) |
584 | self.dbus_iface.status.emit_signal_error('DownloadStarted', args) |
585 | |
586 | + def handle_AQ_DOWNLOAD_FILE_PROGRESS(self, share_id, node_id, |
587 | + n_bytes_read, deflated_size): |
588 | + """Handle AQ_DOWNLOAD_FILE_PROGRESS.""" |
589 | + self.handle_default('AQ_DOWNLOAD_FILE_PROGRESS', share_id, node_id, |
590 | + n_bytes_read, deflated_size) |
591 | + try: |
592 | + mdobj = self.dbus_iface.fs_manager.get_by_node_id(share_id, node_id) |
593 | + except KeyError, e: |
594 | + args = dict(message='The md is gone before sending ' |
595 | + 'DownloadFileProgress signal', |
596 | + error=str(e), |
597 | + share_id=str(share_id), |
598 | + node_id=str(node_id)) |
599 | + self.dbus_iface.status.emit_signal_error('DownloadFileProgress', |
600 | + args) |
601 | + else: |
602 | + path = self.dbus_iface.fs_manager.get_abspath(share_id, mdobj.path) |
603 | + self.dbus_iface.status.emit_download_file_progress(path, |
604 | + n_bytes_read=n_bytes_read, |
605 | + deflated_size=deflated_size |
606 | + ) |
607 | + |
608 | def handle_AQ_DOWNLOAD_FINISHED(self, share_id, node_id, server_hash): |
609 | """ handle AQ_DOWNLOAD_FINISHED """ |
610 | self.handle_default('AQ_DOWNLOAD_FINISHED', share_id, |
611 | @@ -519,6 +565,28 @@ |
612 | node_id=str(node_id)) |
613 | self.dbus_iface.status.emit_signal_error('UploadStarted', args) |
614 | |
615 | + def handle_AQ_UPLOAD_FILE_PROGRESS(self, share_id, node_id, |
616 | + n_bytes_written, deflated_size): |
617 | + """Handle AQ_UPLOAD_FILE_PROGRESS.""" |
618 | + self.handle_default('AQ_UPLOAD_FILE_PROGRESS', share_id, node_id, |
619 | + n_bytes_written, deflated_size) |
620 | + try: |
621 | + mdobj = self.dbus_iface.fs_manager.get_by_node_id(share_id, node_id) |
622 | + except KeyError, e: |
623 | + args = dict(message='The md is gone before sending ' |
624 | + 'UploadFileProgress signal', |
625 | + error=str(e), |
626 | + share_id=str(share_id), |
627 | + node_id=str(node_id)) |
628 | + self.dbus_iface.status.emit_signal_error('UploadFileProgress', |
629 | + args) |
630 | + else: |
631 | + path = self.dbus_iface.fs_manager.get_abspath(share_id, mdobj.path) |
632 | + self.dbus_iface.status.emit_upload_file_progress(path, |
633 | + n_bytes_written=n_bytes_written, |
634 | + deflated_size=deflated_size |
635 | + ) |
636 | + |
637 | def handle_AQ_UPLOAD_FINISHED(self, share_id, node_id, hash): |
638 | """ handle AQ_UPLOAD_FINISHED """ |
639 | self.handle_default('AQ_UPLOAD_FINISHED', share_id, node_id, hash) |
640 | |
641 | === modified file 'ubuntuone/syncdaemon/event_queue.py' |
642 | --- ubuntuone/syncdaemon/event_queue.py 2010-04-12 17:58:20 +0000 |
643 | +++ ubuntuone/syncdaemon/event_queue.py 2010-05-05 18:49:23 +0000 |
644 | @@ -53,12 +53,16 @@ |
645 | 'AQ_UNLINK_OK': ('share_id', 'parent_id', 'node_id'), |
646 | 'AQ_UNLINK_ERROR': ('share_id', 'parent_id', 'node_id', 'error'), |
647 | 'AQ_DOWNLOAD_STARTED': ('share_id', 'node_id', 'server_hash'), |
648 | + 'AQ_DOWNLOAD_FILE_PROGRESS': ('share_id', 'node_id', |
649 | + 'n_bytes_read', 'deflated_size'), |
650 | 'AQ_DOWNLOAD_COMMIT': ('share_id', 'node_id', 'server_hash'), |
651 | 'AQ_DOWNLOAD_FINISHED': ('share_id', 'node_id', 'server_hash'), |
652 | 'AQ_DOWNLOAD_ERROR': ('share_id', 'node_id', 'server_hash', 'error'), |
653 | 'AQ_DOWNLOAD_CANCELLED': ('share_id', 'node_id', 'server_hash'), |
654 | 'AQ_DOWNLOAD_DOES_NOT_EXIST': ('share_id', 'node_id'), |
655 | 'AQ_UPLOAD_STARTED' : ('share_id', 'node_id', 'hash'), |
656 | + 'AQ_UPLOAD_FILE_PROGRESS': ('share_id', 'node_id', |
657 | + 'n_bytes_written', 'deflated_size'), |
658 | 'AQ_UPLOAD_FINISHED': ('share_id', 'node_id', 'hash'), |
659 | 'AQ_UPLOAD_ERROR': ('share_id', 'node_id', 'error', 'hash'), |
660 | 'AQ_SHARES_LIST': ('shares_list',), |
* This docstring seems incorrect:
+class DownloadTestCase(ConnectedBaseTestCase):
+ """Test for ListDir ActionQueueCommand."""
* class MyGetContent(GetContentMixin) (and all the other auxiliary classes) needs a docstring, and its self shadows the self from the outer class.
* most of the (new) tests need docstrings.
* use assertEqual instead of assertEquals (the latter will be deprecated with unittest2).
* PEP-8 indicates 2 blank lines between classes.
* the adding of new events to the EVENTS collection should be tested