Merge lp:~facundo/magicicada-server/dont-always-update-dbupload into lp:magicicada-server

Proposed by Facundo Batista
Status: Merged
Approved by: Facundo Batista
Approved revision: 40
Merged at revision: 39
Proposed branch: lp:~facundo/magicicada-server/dont-always-update-dbupload
Merge into: lp:magicicada-server
Diff against target: 120 lines (+55/-11)
2 files modified
src/server/content.py (+14/-3)
src/server/tests/test_content.py (+41/-8)
To merge this branch: bzr merge lp:~facundo/magicicada-server/dont-always-update-dbupload
Reviewer            Review Type    Date Requested    Status
Facundo Batista                                      Approve
Natalia Bidart                                       Approve
Review via email: mp+277942@code.launchpad.net

Commit message

Only save progress for a resumable upload once every accumulated STORAGE_CHUNK_SIZE bytes, not on every chunk.

Description of the change

Only save progress for a resumable upload once every accumulated STORAGE_CHUNK_SIZE bytes, not on every chunk.
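
In short: instead of hitting the rpc_dal on every received chunk, add_part now accumulates the chunk sizes and only issues an add_part_to_uploadjob call once a full STORAGE_CHUNK_SIZE worth of data is unsaved, carrying any remainder over to the next save. A minimal, standalone sketch of that batching idea follows; ProgressBatcher, the 5 MiB figure and this example are illustrative only, the real logic lives in the upload job's add_part and reads settings.api_server.STORAGE_CHUNK_SIZE (see the diff below).

# A minimal, standalone sketch of the batching idea.  ProgressBatcher and the
# 5 MiB threshold are illustrative only; the real code keeps the counter on
# the upload job and reads settings.api_server.STORAGE_CHUNK_SIZE.

MiB = 1024 * 1024


class ProgressBatcher(object):
    """Accumulate chunk sizes and say when enough has piled up to persist."""

    def __init__(self, threshold=5 * MiB):
        self.threshold = threshold
        self.unsaved_count = 0

    def add_part(self, chunk_size):
        """Return True only when a whole threshold's worth is unsaved."""
        self.unsaved_count += chunk_size
        if self.unsaved_count >= self.threshold:
            # keep the remainder so nothing is lost between saves
            self.unsaved_count -= self.threshold
            return True
        return False


# 2 MiB chunks against a 5 MiB threshold: only chunks 3 and 5 trigger a save.
batcher = ProgressBatcher()
saves = [batcher.add_part(2 * MiB) for _ in range(6)]
assert saves == [False, False, True, False, True, False]

This is also why the new test below expects exactly 2 add_part_to_uploadjob calls for an upload of 2.5 * STORAGE_CHUNK_SIZE: the threshold is crossed twice and the remaining half chunk stays unsaved.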

Revision history for this message
Natalia Bidart (nataliabidart) wrote :

Branch looks good!

I would suggest renaming "accumulated" to "unsaved" or similar, but it's just a nitpick.

review: Approve
40. By Facundo Batista

Better variable name.

Revision history for this message
Facundo Batista (facundo) :
review: Approve

Preview Diff

=== modified file 'src/server/content.py'
--- src/server/content.py 2015-10-18 01:35:37 +0000
+++ src/server/content.py 2015-12-05 15:25:28 +0000
@@ -35,6 +35,7 @@

 from backends.filesync import errors as dataerrors
 from backends.filesync.models import Share
+from magicicada import settings
 from ubuntuone.storage.server import errors, upload
 from ubuntuone.storageprotocol import protocol_pb2

@@ -153,6 +154,9 @@
                  multipart_key, chunk_count, when_last_active):
         self.__dict__ = locals()

+        # only update the DB with parts when enough size has accumulated
+        self.unsaved_count = 0
+
     @classmethod
     def get(cls, user, volume_id, node_id, uploadjob_id, hash_value, crc32):
         """Get a multipart upload job."""
@@ -183,9 +187,16 @@

     def add_part(self, chunk_size):
         """Add a part to an upload job."""
-        kwargs = dict(user_id=self.user.id, volume_id=self.volume_id,
-                      uploadjob_id=self.uploadjob_id, chunk_size=chunk_size)
-        return self.user.rpc_dal.call('add_part_to_uploadjob', **kwargs)
+        self.unsaved_count += chunk_size
+        if self.unsaved_count >= settings.api_server.STORAGE_CHUNK_SIZE:
+            self.unsaved_count -= settings.api_server.STORAGE_CHUNK_SIZE
+            kwargs = dict(user_id=self.user.id, volume_id=self.volume_id,
+                          uploadjob_id=self.uploadjob_id,
+                          chunk_size=chunk_size)
+            d = self.user.rpc_dal.call('add_part_to_uploadjob', **kwargs)
+        else:
+            d = defer.succeed(True)
+        return d

     @defer.inlineCallbacks
     def delete(self):

=== modified file 'src/server/tests/test_content.py'
--- src/server/tests/test_content.py 2015-10-18 01:35:37 +0000
+++ src/server/tests/test_content.py 2015-12-05 15:25:28 +0000
@@ -2117,12 +2117,7 @@
         def auth(client):

             def raise_quota(_):
-
-                # we need a bigger quota for this test...
-                def _raise_quota(new_size):
-                    self.usr0.update(max_storage_bytes=new_size)
-                d = threads.deferToThread(_raise_quota, size * 2)
-                return d
+                self.usr0.update(max_storage_bytes=size * 2)

             def check_content(content):
                 self.assertEqual(content.data, deflated_data)
@@ -2186,6 +2181,43 @@
         self.assertFails(d, 'NOT_AVAILABLE')
         return d

+    def test_deferred_add_part_to_uj(self):
+        """Check that parts are added to upload job only after a limit."""
+        size = int(settings.api_server.STORAGE_CHUNK_SIZE * 2.5)
+        data = os.urandom(size)
+        deflated_data = zlib.compress(data)
+        hash_object = content_hash_factory()
+        hash_object.update(data)
+        hash_value = hash_object.content_hash()
+        crc32_value = crc32(data)
+        deflated_size = len(deflated_data)
+
+        recorded_calls = []
+        orig_call = self.service.rpcdal_client.call
+
+        def recording_call(method, **parameters):
+            if method == 'add_part_to_uploadjob':
+                recorded_calls.append(parameters)
+            return orig_call(method, **parameters)
+
+        self.service.rpcdal_client.call = recording_call
+
+        @defer.inlineCallbacks
+        def test(client):
+            yield client.dummy_authenticate("open sesame")
+            yield self.usr0.update(max_storage_bytes=size * 2)
+            root = yield client.get_root()
+            mkfile_req = yield client.make_file(request.ROOT, root, "hola")
+            putcontent_req = yield client.put_content_request(
+                request.ROOT, mkfile_req.new_id, NO_CONTENT_HASH, hash_value,
+                crc32_value, size, deflated_size, StringIO(deflated_data))
+            yield putcontent_req.deferred
+
+            # check calls; there should be only 2, as size == chunk size * 2.5
+            self.assertEqual(len(recorded_calls), 2)
+
+        return self.callback_test(test, add_default_callbacks=True)
+

 class UserTest(TestWithDatabase):
     """Test User functionality."""
@@ -3058,13 +3090,14 @@
     def test_add_part(self):
         """Test add_part method."""
         dbuj = yield self._make_uj()
-        yield dbuj.add_part('chunk_size')
+        chunk_size = int(settings.api_server.STORAGE_CHUNK_SIZE) + 1
+        yield dbuj.add_part(chunk_size)

         # check it called rpcdal correctly
         method, attribs = self.user.recorded
         self.assertEqual(method, 'add_part_to_uploadjob')
         should = dict(user_id='fake_user_id', uploadjob_id='uploadjob_id',
-                      chunk_size='chunk_size', volume_id='volume_id')
+                      chunk_size=chunk_size, volume_id='volume_id')
         self.assertEqual(attribs, should)

     @defer.inlineCallbacks
