Merge lp:~ubuntuone-control-tower/ubuntuone-client/generations into lp:ubuntuone-client
- generations
- Merge into trunk
Proposed by
Facundo Batista
Status: | Merged |
---|---|
Approved by: | John Lenton |
Approved revision: | 591 |
Merged at revision: | 590 |
Proposed branch: | lp:~ubuntuone-control-tower/ubuntuone-client/generations |
Merge into: | lp:ubuntuone-client |
Diff against target: |
2461 lines (+906/-577) 11 files modified
contrib/testing/testcase.py (+8/-6) tests/syncdaemon/test_action_queue.py (+16/-189) tests/syncdaemon/test_dbus.py (+2/-2) tests/syncdaemon/test_sync.py (+323/-18) tests/syncdaemon/test_vm.py (+310/-77) ubuntuone/syncdaemon/action_queue.py (+31/-77) ubuntuone/syncdaemon/event_queue.py (+2/-7) ubuntuone/syncdaemon/main.py (+1/-3) ubuntuone/syncdaemon/sync.py (+127/-154) ubuntuone/syncdaemon/u1fsfsm.py (+14/-14) ubuntuone/syncdaemon/volume_manager.py (+72/-30) |
To merge this branch: | bzr merge lp:~ubuntuone-control-tower/ubuntuone-client/generations |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
John Lenton (community) | Approve | ||
Lucio Torre (community) | Approve | ||
Review via email: mp+30651@code.launchpad.net |
Commit message
Merge a lot of 'generations' changes into trunk.
Description of the change
Merge a lot of 'generations' changes into trunk.
This branch is *big*, but note we've been reviewing and testing the different branches when merged into it.
To post a comment you must log in.
Revision history for this message
John Lenton (chipaca) wrote : | # |
This breaks trunk, and that's OK.
<cue lumberjack sketch>
review:
Approve
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === modified file 'contrib/testing/testcase.py' |
2 | --- contrib/testing/testcase.py 2010-07-07 18:51:07 +0000 |
3 | +++ contrib/testing/testcase.py 2010-07-22 13:39:48 +0000 |
4 | @@ -201,8 +201,13 @@ |
5 | """Stub implementation.""" |
6 | |
7 | def query_volumes(self): |
8 | - """Stub implementation""" |
9 | - |
10 | + """Stub implementation.""" |
11 | + |
12 | + def get_delta(self, volume_id, generation): |
13 | + """Stub implementation.""" |
14 | + |
15 | + def rescan_from_scratch(self, volume_id): |
16 | + """Stub implementation.""" |
17 | |
18 | class FakeMain(main.Main): |
19 | """ A fake Main class to setup the tests """ |
20 | @@ -558,9 +563,6 @@ |
21 | """Add udf to the udfs dict.""" |
22 | self.udfs[udf.id] = udf |
23 | |
24 | - def handle_SYS_ROOT_RECEIVED(self, root_id): |
25 | - """Do nothing.""" |
26 | - |
27 | def share_deleted(self, _): |
28 | """Do nothing.""" |
29 | |
30 | @@ -683,7 +685,7 @@ |
31 | class DummyClass(object): |
32 | """Dummy class, does nothing.""" |
33 | |
34 | - def __getattribute__(self, name): |
35 | + def __getattr__(self, name): |
36 | """Any attribute is a no-op.""" |
37 | return lambda *args, **kwargs: None |
38 | |
39 | |
40 | === modified file 'tests/syncdaemon/test_action_queue.py' |
41 | --- tests/syncdaemon/test_action_queue.py 2010-07-19 20:15:06 +0000 |
42 | +++ tests/syncdaemon/test_action_queue.py 2010-07-22 13:39:48 +0000 |
43 | @@ -43,18 +43,17 @@ |
44 | from contrib.mocker import Mocker |
45 | |
46 | from ubuntuone.storageprotocol import ( |
47 | - client, errors, protocol_pb2, volumes, request |
48 | + client, errors, protocol_pb2, request |
49 | ) |
50 | from ubuntuone.syncdaemon import states |
51 | from ubuntuone.syncdaemon.action_queue import ( |
52 | ActionQueue, ActionQueueCommand, ChangePublicAccess, CreateUDF, |
53 | DeleteVolume, Download, GetContentMixin, ListDir, ListVolumes, |
54 | NoisyRequestQueue, RequestQueue, UploadProgressWrapper, Upload, |
55 | - CreateShare, GetPublicFiles, VolumesQuery, GetDelta, |
56 | + CreateShare, GetPublicFiles, GetDelta, |
57 | TRANSFER_PROGRESS_THRESHOLD, Unlink, Move, MakeFile, MakeDir, |
58 | ) |
59 | from ubuntuone.syncdaemon.event_queue import EventQueue, EVENTS |
60 | -from ubuntuone.syncdaemon.volume_manager import UDF |
61 | |
62 | |
63 | PATH = u'~/Documents/pdfs/moño/' |
64 | @@ -789,8 +788,6 @@ |
65 | # callbacks are connected |
66 | # pylint: disable-msg=W0212 |
67 | aq = self.action_queue |
68 | - self.assertEqual(aq.client._node_state_callback, |
69 | - aq._node_state_callback) |
70 | self.assertEqual(aq.client._share_change_callback, |
71 | aq._share_change_callback) |
72 | self.assertEqual(aq.client._share_answer_callback, |
73 | @@ -1279,60 +1276,6 @@ |
74 | self.assertTrue(res is None) |
75 | |
76 | |
77 | -class VolumesQueryTestCase(ConnectedBaseTestCase): |
78 | - """Test for VolumesQuery ActionQueueCommand.""" |
79 | - |
80 | - def setUp(self): |
81 | - """Init.""" |
82 | - res = super(VolumesQueryTestCase, self).setUp() |
83 | - |
84 | - request_queue = RequestQueue(name='fu', action_queue=self.action_queue) |
85 | - self.command = VolumesQuery(request_queue) |
86 | - |
87 | - return res |
88 | - |
89 | - def test_is_action_queue_command(self): |
90 | - """Test proper inheritance.""" |
91 | - self.assertTrue(isinstance(self.command, ActionQueueCommand)) |
92 | - self.assertTrue(isinstance(self.command, ListVolumes)) |
93 | - |
94 | - def test_run_returns_a_deferred(self): |
95 | - """Test a deferred is returned.""" |
96 | - res = self.command._run() |
97 | - self.assertTrue(isinstance(res, defer.Deferred), 'deferred returned') |
98 | - |
99 | - def test_run_calls_protocol(self): |
100 | - """Test protocol's list_volumes is called.""" |
101 | - original = self.command.action_queue.client.list_volumes |
102 | - self.called = False |
103 | - |
104 | - def check(): |
105 | - """Take control over client's feature.""" |
106 | - self.called = True |
107 | - |
108 | - self.command.action_queue.client.list_volumes = check |
109 | - |
110 | - self.command._run() |
111 | - |
112 | - self.assertTrue(self.called, 'command was called') |
113 | - |
114 | - self.command.action_queue.client.list_volumes = original |
115 | - |
116 | - def test_handle_success(self): |
117 | - """Test AQ_LIST_VOLUMES is pushed on success.""" |
118 | - request = client.ListVolumes(self.action_queue.client) |
119 | - request.volumes = [FakedVolume(), FakedVolume()] |
120 | - res = self.command.handle_success(success=request) |
121 | - self.assertEqual(request.volumes, res) |
122 | - |
123 | - def test_handle_failure(self): |
124 | - """Test AQ_LIST_VOLUMES_ERROR is pushed on failure.""" |
125 | - msg = 'Something went wrong' |
126 | - failure = Failure(DefaultException(msg)) |
127 | - res = self.command.handle_failure(failure=failure) |
128 | - self.assertEqual(res, failure) |
129 | - |
130 | - |
131 | class DeleteVolumeTestCase(ConnectedBaseTestCase): |
132 | """Test for DeleteVolume ActionQueueCommand.""" |
133 | |
134 | @@ -1419,52 +1362,6 @@ |
135 | |
136 | BasicTestCase.tearDown(self) |
137 | |
138 | - @defer.inlineCallbacks |
139 | - def test_SV_HASH_NEW_is_pushed_for_subscrined_volume(self): |
140 | - """SV_HASH_NEW is filtered when the volume is unsubscribed.""" |
141 | - udf_id = 'udf_id' |
142 | - udf_volume = volumes.UDFVolume(udf_id, 'udf_node', None, |
143 | - 10, u'~/ñoño') |
144 | - path = self.vm._build_udf_path(udf_volume.suggested_path) |
145 | - udf = UDF.from_udf_volume(udf_volume, path) |
146 | - yield self.vm.add_udf(udf) |
147 | - yield self.vm.subscribe_udf(udf_id) |
148 | - assert self.vm.udfs[udf_id].subscribed |
149 | - self.action_queue.event_queue.events = [] # reset events |
150 | - |
151 | - kwargs = dict(share_id=udf_id, node_id=NODE, hash=None) |
152 | - self.action_queue._node_state_callback(**kwargs) |
153 | - self.assertEqual([('SV_HASH_NEW', (), kwargs)], |
154 | - self.action_queue.event_queue.events) |
155 | - |
156 | - @defer.inlineCallbacks |
157 | - def test_SV_HASH_NEW_is_filtered_for_unsubscribed_volume(self): |
158 | - """SV_HASH_NEW is filtered when the volume is unsubscribed.""" |
159 | - # build a VM and add it an UDF with subscribed to False |
160 | - udf_id = 'udf_id' |
161 | - udf_volume = volumes.UDFVolume(udf_id, 'udf_node', None, 10, u'~/ñoño') |
162 | - path = self.vm._build_udf_path(udf_volume.suggested_path) |
163 | - udf = UDF.from_udf_volume(udf_volume, path) |
164 | - yield self.vm.add_udf(udf) |
165 | - yield self.vm.unsubscribe_udf(udf_id) |
166 | - assert not self.vm.udfs[udf_id].subscribed |
167 | - self.action_queue.event_queue.events = [] # reset events |
168 | - |
169 | - self.action_queue._node_state_callback(share_id=udf_id, |
170 | - node_id=None, hash=None) |
171 | - self.assertEqual([], self.action_queue.event_queue.events) |
172 | - |
173 | - def test_SV_HASH_NEW_doesnt_fail_for_non_udf(self): |
174 | - """SV_HASH_NEW keeps working like before for non-udfs.""" |
175 | - other_id = 'not in udfs' |
176 | - assert other_id not in self.vm.udfs |
177 | - self.action_queue.event_queue.events = [] # reset events |
178 | - |
179 | - kwargs = dict(share_id=other_id, node_id=NODE, hash=None) |
180 | - self.action_queue._node_state_callback(**kwargs) |
181 | - self.assertEqual([('SV_HASH_NEW', (), kwargs)], |
182 | - self.action_queue.event_queue.events) |
183 | - |
184 | |
185 | class ChangePublicAccessTests(ConnectedBaseTestCase): |
186 | """Tests for the ChangePublicAccess ActionQueueCommand.""" |
187 | @@ -2222,19 +2119,23 @@ |
188 | class SimpleAQTestCase(BasicTestCase): |
189 | """Simple tests for AQ API.""" |
190 | |
191 | - def test_aq_server_rescan(self): |
192 | - """Check the API of AQ.server_rescan.""" |
193 | + def test_aq_query_volumes(self): |
194 | + """Check the API of AQ.query_volumes.""" |
195 | self.main.start() |
196 | d = defer.Deferred() |
197 | - def get_root(mdid): |
198 | - """Fake get_root.""" |
199 | - d.callback(mdid) |
200 | + def list_volumes(): |
201 | + """Fake list_volumes.""" |
202 | + result = DummyClass() |
203 | + result.volumes = ['foo', 'bar'] |
204 | + return defer.succeed(result) |
205 | |
206 | self.action_queue.client = DummyClass() |
207 | - self.action_queue.get_root = get_root |
208 | - self.action_queue.server_rescan('foo', lambda: list()) |
209 | + self.action_queue.client.list_volumes = list_volumes |
210 | + d = self.action_queue.query_volumes() |
211 | def check(result): |
212 | - self.assertEqual('foo', result) |
213 | + self.assertIn('foo', result) |
214 | + self.assertIn('bar', result) |
215 | + return result |
216 | d.addCallback(check) |
217 | return d |
218 | |
219 | @@ -2783,81 +2684,6 @@ |
220 | # assert internal deferred was fired |
221 | self.assertTrue(self.action_queue.deferred.called) |
222 | |
223 | - @defer.inlineCallbacks |
224 | - def test_server_rescan_as_a_whole(self): |
225 | - """Test error handling after server_rescan with no error.""" |
226 | - |
227 | - def faked_get_root(marker): |
228 | - """Fake the action_queue.get_root.""" |
229 | - root_id=object() |
230 | - self.action_queue.event_queue.push('SYS_ROOT_RECEIVED', |
231 | - root_id=root_id) |
232 | - return root_id |
233 | - |
234 | - self.patch(self.action_queue, 'get_root', faked_get_root) |
235 | - |
236 | - self.action_queue.client.query = \ |
237 | - self.succeed_please(result=self.action_queue.client) |
238 | - yield self.action_queue.server_rescan(root_mdid=object(), |
239 | - data_gen=list) |
240 | - event = ('SYS_SERVER_RESCAN_DONE', (), {}) |
241 | - self.assertEqual(event, self.action_queue.event_queue.events[-1]) |
242 | - |
243 | - # assert internal deferred wasn't fired |
244 | - self.assertFalse(self.action_queue.deferred.called) |
245 | - |
246 | - @defer.inlineCallbacks |
247 | - def test_server_rescan_when_get_root_fails(self): |
248 | - """Test error handling after server_rescan when get_root fails.""" |
249 | - |
250 | - msg = protocol_pb2.Message() |
251 | - msg.type = protocol_pb2.Message.ERROR |
252 | - msg.error.type = protocol_pb2.Error.PROTOCOL_ERROR |
253 | - msg.error.comment = 'get_root failed' |
254 | - exc = errors.StorageRequestError(request=None, message=msg) |
255 | - self.patch(self.action_queue, 'get_root', self.fail_please(exc)) |
256 | - |
257 | - self.action_queue.client.query = self.fail_please( |
258 | - NotImplementedError()) |
259 | - |
260 | - yield self.action_queue.server_rescan(root_mdid=object(), |
261 | - data_gen=list) |
262 | - |
263 | - event = ('SYS_SERVER_RESCAN_DONE', (), {}) |
264 | - self.assertNotIn(event, self.action_queue.event_queue.events) |
265 | - |
266 | - event = ('SYS_SERVER_ERROR', (), {'error': str(exc)}) |
267 | - self.assertEqual(event, self.action_queue.event_queue.events[-1]) |
268 | - |
269 | - # assert internal deferred wasn't fired |
270 | - self.assertFalse(self.action_queue.deferred.called) |
271 | - |
272 | - @defer.inlineCallbacks |
273 | - def test_server_rescan_when_query_fails(self): |
274 | - """Test error handling after server_rescan when query fails.""" |
275 | - |
276 | - self.patch(self.action_queue, 'get_root', |
277 | - self.succeed_please(result=object())) |
278 | - |
279 | - msg = protocol_pb2.Message() |
280 | - msg.type = protocol_pb2.Message.ERROR |
281 | - msg.error.type = protocol_pb2.Error.PROTOCOL_ERROR |
282 | - msg.error.comment = 'query failed' |
283 | - exc = errors.StorageRequestError(request=None, message=msg) |
284 | - self.action_queue.client.query = self.fail_please(exc) |
285 | - |
286 | - yield self.action_queue.server_rescan(root_mdid=object(), |
287 | - data_gen=list) |
288 | - |
289 | - event = ('SYS_SERVER_RESCAN_DONE', (), {}) |
290 | - self.assertNotIn(event, self.action_queue.event_queue.events) |
291 | - |
292 | - event = ('SYS_SERVER_ERROR', (), {'error': str(exc)}) |
293 | - self.assertEqual(event, self.action_queue.event_queue.events[-1]) |
294 | - |
295 | - # assert internal deferred wasn't fired |
296 | - self.assertFalse(self.action_queue.deferred.called) |
297 | - |
298 | |
299 | class GetDeltaTestCase(ConnectedBaseTestCase): |
300 | """Test for GetDelta ActionQueueCommand.""" |
301 | @@ -2912,7 +2738,8 @@ |
302 | |
303 | # check for successful event |
304 | received = self.action_queue.event_queue.events[0] |
305 | - delta_info = dict(delta_content=['foo', 'bar'], end_generation=76, |
306 | + delta_info = dict(volume_id=VOLUME, delta_content=['foo', 'bar'], |
307 | + end_generation=76, |
308 | full=True, free_bytes=1231234) |
309 | self.assertEqual(received, ('AQ_DELTA_OK', (), delta_info)) |
310 | |
311 | |
312 | === modified file 'tests/syncdaemon/test_dbus.py' |
313 | --- tests/syncdaemon/test_dbus.py 2010-07-08 21:52:23 +0000 |
314 | +++ tests/syncdaemon/test_dbus.py 2010-07-22 13:39:48 +0000 |
315 | @@ -1508,8 +1508,8 @@ |
316 | self.assertEquals('root_id', root_id) |
317 | self.assertEquals('another_root_id', new_root_id) |
318 | d.addCallback(check) |
319 | - self.event_q.push('SYS_ROOT_RECEIVED', 'root_id') |
320 | - self.event_q.push('SYS_ROOT_RECEIVED', 'another_root_id') |
321 | + self.main.vm._got_root('root_id') |
322 | + self.main.vm._got_root('another_root_id') |
323 | return d |
324 | |
325 | def test_public_files_list(self): |
326 | |
327 | === modified file 'tests/syncdaemon/test_sync.py' |
328 | --- tests/syncdaemon/test_sync.py 2010-07-20 16:17:58 +0000 |
329 | +++ tests/syncdaemon/test_sync.py 2010-07-22 13:39:48 +0000 |
330 | @@ -19,6 +19,7 @@ |
331 | |
332 | from __future__ import with_statement |
333 | |
334 | +import copy |
335 | import logging |
336 | import os |
337 | import shutil |
338 | @@ -43,6 +44,8 @@ |
339 | from ubuntuone.syncdaemon.volume_manager import Share |
340 | from ubuntuone.syncdaemon.event_queue import EventQueue |
341 | from ubuntuone.storageprotocol.request import ROOT |
342 | +from ubuntuone.storageprotocol import delta |
343 | +from ubuntuone.syncdaemon.marker import MDMarker |
344 | |
345 | DBusInterface.test = True |
346 | |
347 | @@ -312,7 +315,7 @@ |
348 | |
349 | # send the event and check |
350 | mdobj = fsm.get_by_mdid(mdid) |
351 | - sync.handle_SV_HASH_NEW(mdobj.share_id, mdobj.node_id, '') # no content |
352 | + sync._handle_SV_HASH_NEW(mdobj.share_id, mdobj.node_id, '') # no content |
353 | self.assertTrue(self.called) |
354 | |
355 | |
356 | @@ -464,81 +467,81 @@ |
357 | def test_handle_AQ_UNLINK_OK(self): |
358 | """Test that AQ_UNLINK_OK calls the generation handler.""" |
359 | called = [] |
360 | - self.patch(SyncStateMachineRunner, 'check_new_volume_generation', |
361 | + self.patch(SyncStateMachineRunner, 'update_generation', |
362 | lambda s, *a: called.append(a)) |
363 | |
364 | d = dict(share_id='volume_id', node_id='node_id', parent_id='parent', |
365 | new_generation=77) |
366 | self.sync.handle_AQ_UNLINK_OK(**d) |
367 | - self.assertEqual(called, [('volume_id', 77)]) |
368 | + self.assertEqual(called, [('volume_id', "node_id", 77)]) |
369 | |
370 | def test_handle_AQ_MOVE_OK(self): |
371 | """Test that AQ_MOVE_OK calls the generation handler.""" |
372 | called = [] |
373 | - self.patch(SyncStateMachineRunner, 'check_new_volume_generation', |
374 | + self.patch(SyncStateMachineRunner, 'update_generation', |
375 | lambda s, *a: called.append(a)) |
376 | |
377 | d = dict(share_id='volume_id', node_id='node_id', new_generation=32) |
378 | self.sync.handle_AQ_MOVE_OK(**d) |
379 | - self.assertEqual(called, [('volume_id', 32)]) |
380 | + self.assertEqual(called, [('volume_id', "node_id", 32)]) |
381 | |
382 | def test_handle_AQ_UPLOAD_FINISHED(self): |
383 | """Test that AQ_UPLOAD_FINISHED calls the generation handler.""" |
384 | called = [] |
385 | - self.patch(SyncStateMachineRunner, 'check_new_volume_generation', |
386 | + self.patch(SyncStateMachineRunner, 'update_generation', |
387 | lambda s, *a: called.append(a)) |
388 | |
389 | d = dict(share_id='volume_id', node_id='node_id', |
390 | hash='hash', new_generation=15) |
391 | self.sync.handle_AQ_UPLOAD_FINISHED(**d) |
392 | - self.assertEqual(called, [('volume_id', 15)]) |
393 | + self.assertEqual(called, [('volume_id', "node_id", 15)]) |
394 | |
395 | def test_handle_AQ_FILE_NEW_OK(self): |
396 | """Test that AQ_FILE_NEW_OK calls the generation handler.""" |
397 | called = [] |
398 | - self.patch(SyncStateMachineRunner, 'check_new_volume_generation', |
399 | + self.patch(SyncStateMachineRunner, 'update_generation', |
400 | lambda s, *a: called.append(a)) |
401 | |
402 | d = dict(marker='mdid', new_id='new_id', new_generation=12, |
403 | volume_id=ROOT) |
404 | self.sync.handle_AQ_FILE_NEW_OK(**d) |
405 | - self.assertEqual(called, [(ROOT, 12)]) |
406 | + self.assertEqual(called, [(ROOT, "new_id", 12)]) |
407 | |
408 | def test_handle_AQ_DIR_NEW_OK(self): |
409 | """Test that AQ_DIR_NEW_OK calls the generation handler.""" |
410 | called = [] |
411 | - self.patch(SyncStateMachineRunner, 'check_new_volume_generation', |
412 | + self.patch(SyncStateMachineRunner, 'update_generation', |
413 | lambda s, *a: called.append(a)) |
414 | |
415 | d = dict(marker='mdid', new_id='new_id', new_generation=17, |
416 | volume_id=ROOT) |
417 | self.sync.handle_AQ_DIR_NEW_OK(**d) |
418 | - self.assertEqual(called, [(ROOT, 17)]) |
419 | + self.assertEqual(called, [(ROOT, "new_id", 17)]) |
420 | |
421 | def test_checknewvol_no_volume(self): |
422 | """Log warning if volume does not exist.""" |
423 | not_existant_vol = str(uuid.uuid4()) |
424 | - self.ssmr.check_new_volume_generation(not_existant_vol, 77) |
425 | + self.ssmr.update_generation(not_existant_vol, "node_id", 77) |
426 | self.assertTrue(self.handler.check_warning('Volume not found')) |
427 | |
428 | def test_checknewvol_smaller_gen(self): |
429 | """Only log debug if new generation smaller than current.""" |
430 | self.vm.update_generation(ROOT, 15) |
431 | - self.ssmr.check_new_volume_generation(ROOT, 14) |
432 | + self.ssmr.update_generation(ROOT, "node_id", 14) |
433 | self.assertTrue(self.handler.check_info( |
434 | 'Got smaller or equal generation')) |
435 | |
436 | def test_checknewvol_same_gen(self): |
437 | """Only log debug if new generation equal than current.""" |
438 | self.vm.update_generation(ROOT, 15) |
439 | - self.ssmr.check_new_volume_generation(ROOT, 15) |
440 | + self.ssmr.update_generation(ROOT, "node_id", 15) |
441 | self.assertTrue(self.handler.check_info( |
442 | 'Got smaller or equal generation')) |
443 | |
444 | def test_checknewvol_gen_current_plus_one(self): |
445 | """Set new volume generation if current plus one.""" |
446 | self.vm.update_generation(ROOT, 15) |
447 | - self.ssmr.check_new_volume_generation(ROOT, 16) |
448 | + self.ssmr.update_generation(ROOT, "node_id", 16) |
449 | self.assertEqual(self.vm.get_volume(ROOT).generation, 16) |
450 | self.assertTrue(self.handler.check_info('Updating current generation')) |
451 | |
452 | @@ -550,7 +553,7 @@ |
453 | self.vm.update_generation(ROOT, 15) |
454 | |
455 | # call the method |
456 | - self.ssmr.check_new_volume_generation(ROOT, 17) |
457 | + self.ssmr.update_generation(ROOT, "node_id", 17) |
458 | |
459 | # check that generation didn't change, we asked for delta, and logged |
460 | self.assertEqual(self.vm.get_volume(ROOT).generation, 15) |
461 | @@ -560,13 +563,315 @@ |
462 | def test_checknewvol_new_gen_is_None(self): |
463 | """Log warning if volume does not exist.""" |
464 | self.vm.update_generation(ROOT, 1) |
465 | - self.ssmr.check_new_volume_generation(ROOT, None) |
466 | + self.ssmr.update_generation(ROOT, "node_id", None) |
467 | self.assertTrue(self.handler.check_debug( |
468 | 'Client not ready for generations')) |
469 | |
470 | def test_checknewvol_volume_gen_is_None(self): |
471 | """Log warning if volume does not exist.""" |
472 | assert self.vm.get_volume(ROOT).generation is None |
473 | - self.ssmr.check_new_volume_generation(ROOT, 15) |
474 | + self.ssmr.update_generation(ROOT, "node_id", 15) |
475 | self.assertTrue(self.handler.check_debug( |
476 | 'Client not ready for generations')) |
477 | + |
478 | + def test_check_generation_on_node_set(self): |
479 | + """Check that we update the generation of the node.""" |
480 | + # create the fake file |
481 | + self.main.vm._got_root("parent_id") |
482 | + self.sync._handle_SV_FILE_NEW(ROOT, "node_id", "parent_id", "file") |
483 | + |
484 | + # update generation |
485 | + self.ssmr.update_generation(ROOT, "node_id", 15) |
486 | + |
487 | + # test |
488 | + node = self.main.fs.get_by_node_id(ROOT, "node_id") |
489 | + self.assertEqual(node.generation, 15) |
490 | + |
491 | + def test_check_generation_on_node_set_wont_fail(self): |
492 | + """Check that if there is no node we dont fail.""" |
493 | + # update generation |
494 | + self.ssmr.update_generation(ROOT, "node_id", 15) |
495 | + |
496 | + def test_save_generation_after_seting_node_id(self): |
497 | + """Test that we call update_generation after the ssmr handler.""" |
498 | + root_id = uuid.uuid4() |
499 | + self.main.vm._got_root(root_id) |
500 | + mdobj = self.main.fs.get_by_node_id(ROOT, root_id) |
501 | + path = os.path.join( |
502 | + self.main.fs.get_abspath(ROOT, mdobj.path), "file") |
503 | + self.main.fs.create(path=path, share_id=ROOT, is_dir=False) |
504 | + node = self.main.fs.get_by_path(path) |
505 | + d = dict(marker=MDMarker(node.mdid), |
506 | + new_id='new_id', new_generation=12, |
507 | + volume_id=ROOT) |
508 | + self.sync.handle_AQ_FILE_NEW_OK(**d) |
509 | + |
510 | + # test |
511 | + node = self.main.fs.get_by_node_id(ROOT, "new_id") |
512 | + self.assertEqual(node.generation, 12) |
513 | + |
514 | +class TestHandleAqDeltaOk(BaseSync): |
515 | + """Sync.handle_AQ_DELTA_OK handles the recepcion of a new delta and applies |
516 | + all the changes that came from it.""" |
517 | + |
518 | + def setUp(self): |
519 | + """Do the setUp.""" |
520 | + |
521 | + super(TestHandleAqDeltaOk, self).setUp() |
522 | + self.sync = Sync(main=self.main) |
523 | + root_id = uuid.uuid4() |
524 | + self.main.vm._got_root(root_id) |
525 | + |
526 | + self.filetxtdelta = delta.FileInfoDelta( |
527 | + generation=5, is_live=True, file_type=delta.FILE, |
528 | + parent_id=root_id, share_id=ROOT, node_id=uuid.uuid4(), |
529 | + name="file.txt", is_public=False, content_hash="hash", |
530 | + crc32=1, size=10, last_modified=0) |
531 | + |
532 | + self.dirdelta = delta.FileInfoDelta( |
533 | + generation=6, is_live=True, file_type=delta.DIRECTORY, |
534 | + parent_id=root_id, share_id=ROOT, node_id=uuid.uuid4(), |
535 | + name="directory", is_public=False, content_hash="hash", |
536 | + crc32=1, size=10, last_modified=0) |
537 | + |
538 | + def create_filetxt(self): |
539 | + """Create a file based on self.filetxtdelta.""" |
540 | + |
541 | + dt = self.filetxtdelta |
542 | + mdobj = self.main.fs.get_by_node_id(dt.share_id, dt.parent_id) |
543 | + path = os.path.join( |
544 | + self.main.fs.get_abspath(dt.share_id, mdobj.path), dt.name) |
545 | + self.main.fs.create( |
546 | + path=path, share_id=dt.share_id, node_id=dt.node_id, |
547 | + is_dir=False) |
548 | + node = self.main.fs.get_by_node_id(dt.share_id, dt.node_id) |
549 | + self.main.fs.set_by_mdid(node.mdid, generation=dt.generation) |
550 | + |
551 | + def create_dir(self): |
552 | + """Create a directory based on self.dirdelta.""" |
553 | + |
554 | + dt = self.dirdelta |
555 | + mdobj = self.main.fs.get_by_node_id(dt.share_id, dt.parent_id) |
556 | + path = os.path.join( |
557 | + self.main.fs.get_abspath(dt.share_id, mdobj.path), dt.name) |
558 | + self.main.fs.create( |
559 | + path=path, share_id=dt.share_id, node_id=dt.node_id, |
560 | + is_dir=True) |
561 | + node = self.main.fs.get_by_node_id(dt.share_id, dt.node_id) |
562 | + self.main.fs.set_by_mdid(node.mdid, generation=dt.generation) |
563 | + |
564 | + def test_not_full(self): |
565 | + """If we dont have a full delta, we need to ask for another one.""" |
566 | + sync = Sync(main=self.main) |
567 | + called = [] |
568 | + self.main.action_q.get_delta = lambda *a: called.append(a) |
569 | + |
570 | + kwargs = dict(volume_id=ROOT, delta_content=[], end_generation=11, |
571 | + full=False, free_bytes=0) |
572 | + sync.handle_AQ_DELTA_OK(**kwargs) |
573 | + |
574 | + self.assertEqual(called, [(ROOT, 11)]) |
575 | + |
576 | + def test_free_bytes_set(self): |
577 | + """The volume gets the free bytes set.""" |
578 | + sync = Sync(main=self.main) |
579 | + |
580 | + kwargs = dict(volume_id=ROOT, delta_content=[], end_generation=11, |
581 | + full=True, free_bytes=10) |
582 | + sync.handle_AQ_DELTA_OK(**kwargs) |
583 | + |
584 | + self.assertEqual(self.main.vm.get_volume(ROOT).free_bytes, 10) |
585 | + |
586 | + def test_end_generation_set(self): |
587 | + """The volume gets the end generation set.""" |
588 | + sync = Sync(main=self.main) |
589 | + |
590 | + kwargs = dict(volume_id=ROOT, delta_content=[], end_generation=11, |
591 | + full=True, free_bytes=10) |
592 | + sync.handle_AQ_DELTA_OK(**kwargs) |
593 | + |
594 | + self.assertEqual(self.main.vm.get_volume(ROOT).generation, 11) |
595 | + |
596 | + def test_node_generation_older_skip(self): |
597 | + """The node does not get the new generation set.""" |
598 | + self.create_filetxt() |
599 | + |
600 | + dt2 = copy.copy(self.filetxtdelta) |
601 | + dt2.generation = self.filetxtdelta.generation - 1 |
602 | + kwargs = dict(volume_id=ROOT, delta_content=[dt2], end_generation=11, |
603 | + full=True, free_bytes=10) |
604 | + self.sync.handle_AQ_DELTA_OK(**kwargs) |
605 | + |
606 | + node = self.main.fs.get_by_node_id(ROOT, self.filetxtdelta.node_id) |
607 | + self.assertEqual(node.generation, self.filetxtdelta.generation) |
608 | + |
609 | + def test_new_file(self): |
610 | + """Make sure a live file in the delta is in fs after executed.""" |
611 | + deltas = [ self.filetxtdelta ] |
612 | + |
613 | + kwargs = dict(volume_id=ROOT, delta_content=deltas, end_generation=11, |
614 | + full=True, free_bytes=10) |
615 | + self.sync.handle_AQ_DELTA_OK(**kwargs) |
616 | + |
617 | + # check that the file is created |
618 | + node = self.main.fs.get_by_node_id(ROOT, self.filetxtdelta.node_id) |
619 | + self.assertEqual(node.path, self.filetxtdelta.name) |
620 | + self.assertEqual(node.is_dir, False) |
621 | + self.assertEqual(node.generation, self.filetxtdelta.generation) |
622 | + |
623 | + def test_existing_file_still_there(self): |
624 | + """A file will still exist after a delta arrives.""" |
625 | + self.create_filetxt() |
626 | + |
627 | + # send a new delta |
628 | + dt2 = copy.copy(self.filetxtdelta) |
629 | + dt2.generation = 8 |
630 | + kwargs = dict(volume_id=ROOT, delta_content=[dt2], end_generation=11, |
631 | + full=True, free_bytes=10) |
632 | + self.sync.handle_AQ_DELTA_OK(**kwargs) |
633 | + |
634 | + # check that the file is created |
635 | + node = self.main.fs.get_by_node_id(ROOT, self.filetxtdelta.node_id) |
636 | + self.assertEqual(node.generation, dt2.generation) |
637 | + |
638 | + def test_existing_file_dead(self): |
639 | + """The handler for SV_FILE_DELETED is called""" |
640 | + # send a new delta |
641 | + dt2 = copy.copy(self.filetxtdelta) |
642 | + dt2.is_live = False |
643 | + kwargs = dict(volume_id=ROOT, delta_content=[dt2], end_generation=11, |
644 | + full=True, free_bytes=10) |
645 | + called = [] |
646 | + self.sync._handle_SV_FILE_DELETED = \ |
647 | + lambda *args, **kwargs: called.append((args, kwargs)) |
648 | + self.sync.handle_AQ_DELTA_OK(**kwargs) |
649 | + |
650 | + # check that the handler is created |
651 | + self.assertTrue(called) |
652 | + |
653 | + def test_new_dir(self): |
654 | + """Make sure a live dir in the delta is in fs after executed.""" |
655 | + |
656 | + deltas = [ self.dirdelta ] |
657 | + |
658 | + kwargs = dict(volume_id=ROOT, delta_content=deltas, end_generation=11, |
659 | + full=True, free_bytes=10) |
660 | + self.sync.handle_AQ_DELTA_OK(**kwargs) |
661 | + |
662 | + # check that the dir is created |
663 | + node = self.main.fs.get_by_node_id(ROOT, self.dirdelta.node_id) |
664 | + self.assertEqual(node.path, self.dirdelta.name) |
665 | + self.assertEqual(node.is_dir, True) |
666 | + self.assertEqual(node.generation, self.dirdelta.generation) |
667 | + |
668 | + def test_sv_hash_new_called_for_file(self): |
669 | + """The handler for SV_HASH_NEW is called""" |
670 | + self.create_filetxt() |
671 | + |
672 | + # send a new delta |
673 | + dt2 = copy.copy(self.filetxtdelta) |
674 | + dt2.generation = self.filetxtdelta.generation + 1 |
675 | + kwargs = dict(volume_id=ROOT, delta_content=[dt2], end_generation=11, |
676 | + full=True, free_bytes=10) |
677 | + called = [] |
678 | + self.sync._handle_SV_HASH_NEW = \ |
679 | + lambda *args, **kwargs: called.append((args, kwargs)) |
680 | + self.sync.handle_AQ_DELTA_OK(**kwargs) |
681 | + |
682 | + # check that the handler is created |
683 | + self.assertTrue(called) |
684 | + |
685 | + def test_sv_hash_new_not_called_for_dir(self): |
686 | + """The handler for SV_HASH_NEW is not called""" |
687 | + self.create_dir() |
688 | + |
689 | + # send a new delta |
690 | + dt2 = copy.copy(self.dirdelta) |
691 | + dt2.generation = self.dirdelta.generation + 1 |
692 | + kwargs = dict(volume_id=ROOT, delta_content=[dt2], end_generation=11, |
693 | + full=True, free_bytes=10) |
694 | + called = [] |
695 | + self.sync._handle_SV_HASH_NEW = \ |
696 | + lambda *args, **kwargs: called.append((args, kwargs)) |
697 | + self.sync.handle_AQ_DELTA_OK(**kwargs) |
698 | + |
699 | + # check that the handler is created |
700 | + self.assertFalse(called) |
701 | + |
702 | + def test_sv_moved_called(self): |
703 | + """The handler for SV_MOVED is called""" |
704 | + self.create_dir() |
705 | + self.create_filetxt() |
706 | + |
707 | + # send a new delta |
708 | + dt2 = copy.copy(self.filetxtdelta) |
709 | + dt2.generation = self.dirdelta.generation + 1 |
710 | + dt2.parent_id = self.dirdelta.node_id |
711 | + kwargs = dict(volume_id=ROOT, delta_content=[dt2], end_generation=11, |
712 | + full=True, free_bytes=10) |
713 | + called = [] |
714 | + self.sync._handle_SV_MOVED = \ |
715 | + lambda *args, **kwargs: called.append((args, kwargs)) |
716 | + self.sync.handle_AQ_DELTA_OK(**kwargs) |
717 | + |
718 | + # check that the handler is created |
719 | + self.assertTrue(called) |
720 | + |
721 | + def test_sv_moved_called_name(self): |
722 | + """The handler for SV_MOVED is called""" |
723 | + self.create_dir() |
724 | + self.create_filetxt() |
725 | + |
726 | + # send a new delta |
727 | + dt2 = copy.copy(self.filetxtdelta) |
728 | + dt2.generation = self.dirdelta.generation + 1 |
729 | + dt2.name = "newname" |
730 | + kwargs = dict(volume_id=ROOT, delta_content=[dt2], end_generation=11, |
731 | + full=True, free_bytes=10) |
732 | + called = [] |
733 | + self.sync._handle_SV_MOVED = \ |
734 | + lambda *args, **kwargs: called.append((args, kwargs)) |
735 | + self.sync.handle_AQ_DELTA_OK(**kwargs) |
736 | + |
737 | +        # check that the handler was called
738 | + self.assertTrue(called) |
739 | + |
740 | + def test_sv_moved_not_called(self): |
741 | + """The handler for SV_MOVED is not called""" |
742 | + self.create_dir() |
743 | + self.create_filetxt() |
744 | + |
745 | + # send a new delta |
746 | + dt2 = copy.copy(self.filetxtdelta) |
747 | + dt2.generation = self.dirdelta.generation + 1 |
748 | + kwargs = dict(volume_id=ROOT, delta_content=[dt2], end_generation=11, |
749 | + full=True, free_bytes=10) |
750 | + called = [] |
751 | + self.sync._handle_SV_MOVED = \ |
752 | + lambda *args, **kwargs: called.append((args, kwargs)) |
753 | + self.sync.handle_AQ_DELTA_OK(**kwargs) |
754 | + |
755 | +        # check that the handler was not called
756 | + self.assertFalse(called) |
757 | + |
758 | + def test_exception_logged(self): |
759 | + """We call self.logger.exception on error.""" |
760 | + # send a new delta |
761 | + dt2 = copy.copy(self.filetxtdelta) |
762 | + dt2.is_live = False |
763 | + kwargs = dict(volume_id=ROOT, delta_content=[dt2], end_generation=11, |
764 | + full=True, free_bytes=10) |
765 | + self.sync._handle_SV_FILE_DELETED = \ |
766 | + lambda *args, **kwargs: 1/0 |
767 | + handler = testcase.MementoHandler() |
768 | + handler.setLevel(logging.ERROR) |
769 | + self.sync.logger.addHandler(handler) |
770 | + |
771 | + self.sync.handle_AQ_DELTA_OK(**kwargs) |
772 | + |
773 | + # check log |
774 | + self.assertEqual(len(handler.records), 1) |
775 | + log_msg = handler.records[0].message |
776 | + self.assertTrue("can't be applied." in log_msg) |
777 | + self.sync.logger.removeHandler(handler) |
778 | + self.handler.records = [] |
779 | |
780 | === modified file 'tests/syncdaemon/test_vm.py' |
781 | --- tests/syncdaemon/test_vm.py 2010-07-12 15:47:38 +0000 |
782 | +++ tests/syncdaemon/test_vm.py 2010-07-22 13:39:48 +0000 |
783 | @@ -32,6 +32,7 @@ |
784 | FakeMain, |
785 | BaseTwistedTestCase, |
786 | environ, |
787 | + MementoHandler, |
788 | ) |
789 | from ubuntuone.syncdaemon import config |
790 | from ubuntuone.syncdaemon.volume_manager import ( |
791 | @@ -115,22 +116,27 @@ |
792 | class VolumeManagerTests(BaseVolumeManagerTests): |
793 | """ Tests for Volume Manager internal API. """ |
794 | |
795 | - def test_handle_SYS_ROOT_RECEIVED(self): |
796 | - """Check that list_shares is called in handle_SYS_ROOT_RECEIVED """ |
797 | - d = defer.Deferred() |
798 | - # helper method, pylint: disable-msg=C0111 |
799 | - def list_shares(): |
800 | - mdobj = self.main.fs.get_by_path(self.root_dir) |
801 | - self.assertEquals('root_uuid', mdobj.node_id) |
802 | - d.callback(mdobj) |
803 | - self.main.action_q.list_shares = list_shares |
804 | - self.vm.handle_SYS_ROOT_RECEIVED('root_uuid') |
805 | - return d |
806 | + @defer.inlineCallbacks |
807 | + def test__got_root_ok(self): |
808 | + """Test _got_root method.""" |
809 | + d = defer.Deferred() |
810 | + self._listen_for('SYS_ROOT_RECEIVED', d.callback) |
811 | + self.vm._got_root('root_uuid') |
812 | + yield d |
813 | + |
814 | + @defer.inlineCallbacks |
815 | + def test__got_root_mismatch(self): |
816 | + """Test for _got_root with different root node_id.""" |
817 | + self.vm._got_root('root_uuid') |
818 | + d = defer.Deferred() |
819 | + self._listen_for('SYS_ROOT_MISMATCH', d.callback) |
820 | + self.vm._got_root('other_root_uuid') |
821 | + yield d |
822 | |
823 | def test_add_share(self): |
824 | """ test the add_share method. """ |
825 | # initialize the the root |
826 | - self.vm.handle_SYS_ROOT_RECEIVED('root_uuid') |
827 | + self.vm._got_root('root_uuid') |
828 | share_path = os.path.join(self.shares_dir, 'fake_share') |
829 | share = Share(path=share_path, volume_id='share_id') |
830 | self.vm.add_share(share) |
831 | @@ -139,7 +145,7 @@ |
832 | def test_share_deleted(self): |
833 | """ Check that a share is deleted from the share mapping. """ |
834 | # initialize the the root |
835 | - self.vm.handle_SYS_ROOT_RECEIVED('root_uuid') |
836 | + self.vm._got_root('root_uuid') |
837 | share_path = os.path.join(self.shares_dir, 'fake_share') |
838 | share = Share(path=share_path, volume_id='share_id') |
839 | self.vm.add_share(share) |
840 | @@ -150,7 +156,7 @@ |
841 | def test_share_deleted_with_content(self): |
842 | """ Check that a share is deleted from the share mapping. """ |
843 | # initialize the the root |
844 | - self.vm.handle_SYS_ROOT_RECEIVED('root_uuid') |
845 | + self.vm._got_root('root_uuid') |
846 | share_path = os.path.join(self.shares_dir, 'fake_share') |
847 | share = Share(path=share_path, volume_id='share_id', |
848 | node_id='share_node_id', access_level='Modify', |
849 | @@ -192,7 +198,7 @@ |
850 | 'test_username', |
851 | 'visible_name', 'Modify') |
852 | # initialize the the root |
853 | - self.vm.handle_SYS_ROOT_RECEIVED('root_uuid') |
854 | + self.vm._got_root('root_uuid') |
855 | share_path = os.path.join(self.shares_dir, share_holder.share_name) |
856 | share = Share(path=share_path, volume_id=share_holder.share_id, |
857 | access_level='View') |
858 | @@ -209,7 +215,7 @@ |
859 | 'visible_username', 'yes', |
860 | 'View') |
861 | # initialize the the root |
862 | - self.vm.handle_SYS_ROOT_RECEIVED('root_uuid') |
863 | + self.vm._got_root('root_uuid') |
864 | response = ListShares(None) |
865 | response.shares = [share_response] |
866 | self.vm.handle_AQ_SHARES_LIST(response) |
867 | @@ -228,7 +234,7 @@ |
868 | 'test_username', |
869 | 'visible_name', 'Modify') |
870 | # initialize the the root |
871 | - self.vm.handle_SYS_ROOT_RECEIVED('root_uuid') |
872 | + self.vm._got_root('root_uuid') |
873 | # create a share |
874 | share_path = os.path.join(self.shares_dir, share_holder.share_name) |
875 | share = Share(path=share_path, volume_id=str(share_holder.share_id), |
876 | @@ -265,7 +271,7 @@ |
877 | 'my_visible_name', 'yes', |
878 | 'Modify') |
879 | # initialize the the root |
880 | - self.vm.handle_SYS_ROOT_RECEIVED('root_uuid') |
881 | + self.vm._got_root('root_uuid') |
882 | shared_dir = os.path.join(self.root_dir, 'shared_dir') |
883 | self.main.fs.create(path=shared_dir, share_id="", is_dir=True) |
884 | self.main.fs.set_node_id(shared_dir, shared_response.subtree) |
885 | @@ -374,7 +380,7 @@ |
886 | def test_accept_share(self): |
887 | """ Test the accept_share method. """ |
888 | d = defer.Deferred() |
889 | - self.vm.handle_SYS_ROOT_RECEIVED('root_uuid') |
890 | + self.vm._got_root('root_uuid') |
891 | share_path = os.path.join(self.shares_dir, 'fake_share') |
892 | share = Share(path=share_path, volume_id='share_id', node_id="node_id") |
893 | self.vm.add_share(share) |
894 | @@ -403,7 +409,7 @@ |
895 | 'my_visible_name', 'yes', |
896 | 'Modify') |
897 | # initialize the the root |
898 | - self.vm.handle_SYS_ROOT_RECEIVED('root_uuid') |
899 | + self.vm._got_root('root_uuid') |
900 | response = ListShares(None) |
901 | response.shares = [shared_response] |
902 | self.vm.handle_AQ_SHARES_LIST(response) |
903 | @@ -421,7 +427,7 @@ |
904 | self.main.fs.set_node_id(path, 'node_id') |
905 | self.main.fs.get_by_node_id("", 'node_id') |
906 | # initialize the the root |
907 | - self.vm.handle_SYS_ROOT_RECEIVED('root_uuid') |
908 | + self.vm._got_root('root_uuid') |
909 | # add the shared folder |
910 | share = Share(path=path, volume_id='share_id', access_level='View') |
911 | self.vm.add_shared(share) |
912 | @@ -437,7 +443,7 @@ |
913 | self.main.fs.set_node_id(path, 'node_id') |
914 | self.main.fs.get_by_node_id("", 'node_id') |
915 | # initialize the the root |
916 | - self.vm.handle_SYS_ROOT_RECEIVED('root_uuid') |
917 | + self.vm._got_root('root_uuid') |
918 | self.assertNotIn('share_id', self.vm.shared) |
919 | # check that a answer notify of a missing share don't blowup |
920 | self.vm.handle_SV_SHARE_ANSWERED('share_id', 'Yes') |
921 | @@ -459,7 +465,7 @@ |
922 | 'visible_username', False, |
923 | 'View') |
924 | # initialize the the root |
925 | - self.vm.handle_SYS_ROOT_RECEIVED('root_uuid') |
926 | + self.vm._got_root('root_uuid') |
927 | response = ListShares(None) |
928 | response.shares = [share_response, share_response_1] |
929 | self.vm.handle_AQ_SHARES_LIST(response) |
930 | @@ -554,14 +560,43 @@ |
931 | @defer.inlineCallbacks |
932 | def test_root_mismatch(self): |
933 | """Test that SYS_ROOT_MISMATCH is pushed.""" |
934 | - self.vm.handle_SYS_ROOT_RECEIVED('root_node_id') |
935 | + self.vm._got_root('root_node_id') |
936 | d = defer.Deferred() |
937 | self._listen_for('SYS_ROOT_MISMATCH', d.callback) |
938 | - self.vm.handle_SYS_ROOT_RECEIVED('root_id') |
939 | + self.vm._got_root('root_id') |
940 | current_id, new_id = yield d |
941 | self.assertEquals('root_node_id', current_id) |
942 | self.assertEquals('root_id', new_id) |
943 | |
944 | + @defer.inlineCallbacks |
945 | + def test_handle_AQ_ANSWER_SHARE_OK(self): |
946 | + """Test for handle_AQ_ANSWER_SHARE_OK.""" |
947 | + share_id = uuid.uuid4() |
948 | + share_volume = volumes.ShareVolume(share_id, 'fake_share_uuid', None, |
949 | + 10, 'to_me', 'fake_share', 'username', |
950 | + 'visible_username', False, 'View') |
951 | + dir_name = self.vm._build_share_path(share_volume.share_name, |
952 | + share_volume.other_visible_name) |
953 | + share_path = os.path.join(self.main.shares_dir, dir_name) |
954 | + share = Share.from_share_volume(share_volume, share_path) |
955 | + |
956 | + get_delta_d = defer.Deferred() |
957 | + def fake_get_delta(volume_id, generation): |
958 | +        def fake_get_delta(volume_id, generation):
959 | +            """A fake get_delta that checks the arguments."""
960 | + self.assertEquals(share.generation, generation) |
961 | + get_delta_d.callback(None) |
962 | + self.main.action_q.get_delta = fake_get_delta |
963 | + |
964 | + self.vm.add_share(share) |
965 | + self.vm.handle_AQ_ANSWER_SHARE_OK(share.volume_id, 'Yes') |
966 | + yield get_delta_d |
967 | + share = self.vm.get_volume(share.volume_id) |
968 | + self.assertTrue(share.accepted, 'accepted != True') |
969 | + self.assertTrue(self.main.fs.get_by_path(share.path), |
970 | + 'No metadata for share root node.') |
971 | + self.assertTrue(os.path.exists(share.path), 'share path missing on disk!') |
972 | + |
973 | |
974 | class VolumeManagerUnicodeTests(BaseVolumeManagerTests): |
975 | """Tests for Volume Manager unicode capabilities.""" |
976 | @@ -574,7 +609,7 @@ |
977 | 'visible', 'yes', |
978 | 'View') |
979 | # initialize the the root |
980 | - self.vm.handle_SYS_ROOT_RECEIVED('root_uuid') |
981 | + self.vm._got_root('root_uuid') |
982 | response = ListShares(None) |
983 | response.shares = [share_response] |
984 | self.vm.handle_AQ_SHARES_LIST(response) |
985 | @@ -593,7 +628,7 @@ |
986 | u'Darío Toño', 'yes', |
987 | 'View') |
988 | # initialize the the root |
989 | - self.vm.handle_SYS_ROOT_RECEIVED('root_uuid') |
990 | + self.vm._got_root('root_uuid') |
991 | response = ListShares(None) |
992 | response.shares = [share_response] |
993 | self.vm.handle_AQ_SHARES_LIST(response) |
994 | @@ -610,7 +645,7 @@ |
995 | u'año', |
996 | 'test_username', |
997 | 'visible', 'Modify') |
998 | - self.vm.handle_SYS_ROOT_RECEIVED('root_uuid') |
999 | + self.vm._got_root('root_uuid') |
1000 | self.vm.handle_SV_SHARE_CHANGED(info=share_holder) |
1001 | shouldbe_dir = os.path.join(self.shares_dir, |
1002 | u"año".encode("utf8") + " from visible") |
1003 | @@ -622,7 +657,7 @@ |
1004 | 'share', |
1005 | 'test_username', |
1006 | u'Ramón', 'Modify') |
1007 | - self.vm.handle_SYS_ROOT_RECEIVED('root_uuid') |
1008 | + self.vm._got_root('root_uuid') |
1009 | self.vm.handle_SV_SHARE_CHANGED(info=share_holder) |
1010 | shouldbe_dir = os.path.join(self.shares_dir, |
1011 | "share from " + u"Ramón".encode("utf8")) |
1012 | @@ -654,6 +689,7 @@ |
1013 | free_bytes, suggested_path) |
1014 | udf = UDF.from_udf_volume(volume, path) |
1015 | udf.subscribed = subscribed |
1016 | + udf.generation = generation |
1017 | return udf, volume |
1018 | |
1019 | def test_build_udf_path(self): |
1020 | @@ -715,20 +751,16 @@ |
1021 | suggested_path = "suggested_path" |
1022 | udf, volume = self._create_udf(uuid.uuid4(), 'udf_node_id', |
1023 | '~/' + suggested_path, subscribed=True) |
1024 | - query_d = defer.Deferred() |
1025 | - space_d = defer.Deferred() |
1026 | - query_d.addCallback(lambda _: space_d) |
1027 | - def fake_query(data): |
1028 | - self.assertEquals([(udf.volume_id, udf.node_id, '')], data) |
1029 | - query_d.callback(None) |
1030 | - def fake_inquire_free_space(volume_id): |
1031 | - self.assertEquals(volume_id, request.ROOT) |
1032 | - space_d.callback(None) |
1033 | - self.main.action_q.query = fake_query |
1034 | - self.main.action_q.inquire_free_space = fake_inquire_free_space |
1035 | + get_delta_d = defer.Deferred() |
1036 | + def fake_get_delta(volume_id, generation): |
1037 | +        def fake_get_delta(volume_id, generation):
1038 | +            """A fake get_delta that checks the arguments."""
1039 | + self.assertEquals(udf.generation, generation) |
1040 | + get_delta_d.callback(None) |
1041 | + self.main.action_q.get_delta = fake_get_delta |
1042 | |
1043 | yield self.vm.add_udf(udf) |
1044 | - yield query_d |
1045 | + yield get_delta_d |
1046 | self.assertEquals(os.path.join(self.home_dir, suggested_path), |
1047 | udf.path) |
1048 | self.assertEquals(1, len(self.vm.udfs)) |
1049 | @@ -849,8 +881,9 @@ |
1050 | self.assertEquals('udf_uuid', udf.node_id) |
1051 | self.assertEquals(os.path.join(self.home_dir, 'UDF'), udf.path) |
1052 | # check that the root it's there |
1053 | - self.assertTrue('' in self.vm.shares) |
1054 | - self.assertEquals(self.vm.shares[''].node_id, str(root_volume.node_id)) |
1055 | + self.assertTrue(request.ROOT in self.vm.shares) |
1056 | + self.assertEquals(self.vm.shares[request.ROOT].node_id, |
1057 | + str(root_volume.node_id)) |
1058 | # now send the same list again and check |
1059 | # use a custom home |
1060 | with environ('HOME', self.home_dir): |
1061 | @@ -896,7 +929,7 @@ |
1062 | udf_id = uuid.uuid4() |
1063 | udf_volume = volumes.UDFVolume(udf_id, 'udf_uuid', None, 10, u'~/ñoño') |
1064 | # initialize the the root |
1065 | - self.vm.handle_SYS_ROOT_RECEIVED('root_uuid') |
1066 | + self.vm._got_root('root_uuid') |
1067 | response = [share_volume, udf_volume] |
1068 | d = defer.Deferred() |
1069 | self._listen_for('VM_UDF_CREATED', d.callback) |
1070 | @@ -927,8 +960,9 @@ |
1071 | self.vm.handle_AQ_LIST_VOLUMES(response) |
1072 | self.assertEquals(1, len(self.vm.shares)) # the new share and root |
1073 | # check that the root is in the shares dict |
1074 | - self.assertTrue('' in self.vm.shares) |
1075 | - self.assertEquals(self.vm.shares[''].node_id, str(root_volume.node_id)) |
1076 | + self.assertTrue(request.ROOT in self.vm.shares) |
1077 | + self.assertEquals(self.vm.shares[request.ROOT].node_id, |
1078 | + str(root_volume.node_id)) |
1079 | |
1080 | def test_get_udf_path_name(self): |
1081 | """Test for _get_udf_path_name.""" |
1082 | @@ -1413,7 +1447,7 @@ |
1083 | 'username', 'visible_username', |
1084 | True, 'View') |
1085 | # initialize the the root |
1086 | - self.vm.handle_SYS_ROOT_RECEIVED('root_uuid') |
1087 | + self.vm._got_root('root_uuid') |
1088 | self._listen_for('VM_UDF_CREATED', d.callback) |
1089 | def check_udf(info): |
1090 | """The udf creation callback""" |
1091 | @@ -1475,7 +1509,7 @@ |
1092 | yield self.vm.add_udf(udf) |
1093 | self.vm.add_share(share) |
1094 | # initialize the the root |
1095 | - self.vm.handle_SYS_ROOT_RECEIVED('root_uuid') |
1096 | + self.vm._got_root('root_uuid') |
1097 | d = defer.Deferred() |
1098 | self._listen_for('VM_VOLUME_DELETED', d.callback) |
1099 | with environ('HOME', self.home_dir): |
1100 | @@ -1585,7 +1619,7 @@ |
1101 | def test_no_UDFs_inside_root(self): |
1102 | """Test that a UDF can't be created inside the root""" |
1103 | # initialize the root |
1104 | - self.vm.handle_SYS_ROOT_RECEIVED('root_uuid') |
1105 | + self.vm._got_root('root_uuid') |
1106 | udf_path = os.path.join(self.root_dir, 'udf_inside_root') |
1107 | # patch FakeAQ |
1108 | def create_udf(path, name, marker): |
1109 | @@ -1695,8 +1729,8 @@ |
1110 | share_path = os.path.join(self.main.shares_dir, dir_name) |
1111 | share = Share.from_share_volume(share_volume, share_path) |
1112 | # get the root |
1113 | - root = self.vm.get_volume('') |
1114 | - self.vm.handle_SYS_ROOT_RECEIVED('root_uuid') |
1115 | + root = self.vm.get_volume(request.ROOT) |
1116 | + self.vm._got_root('root_uuid') |
1117 | # create a UDF |
1118 | udf_id = uuid.uuid4() |
1119 | udf, volume = self._create_udf(udf_id, 'udf_node_id', '~/UDF') |
1120 | @@ -1738,8 +1772,8 @@ |
1121 | share_path = os.path.join(self.main.shares_dir, dir_name) |
1122 | share = Share.from_share_volume(share_volume, share_path) |
1123 | # get the root |
1124 | - root = self.vm.get_volume('') |
1125 | - self.vm.handle_SYS_ROOT_RECEIVED('root_node_id') |
1126 | + root = self.vm.get_volume(request.ROOT) |
1127 | + self.vm._got_root('root_node_id') |
1128 | # create a UDF |
1129 | udf_id = uuid.uuid4() |
1130 | udf, volume = self._create_udf(udf_id, 'udf_node_id', '~/UDF') |
1131 | @@ -1763,7 +1797,7 @@ |
1132 | def test_UDF_cant_be_a_symlink(self): |
1133 | """Test that a UDF can't be a symlink.""" |
1134 | # initialize the root |
1135 | - self.vm.handle_SYS_ROOT_RECEIVED('root_uuid') |
1136 | + self.vm._got_root('root_uuid') |
1137 | real_udf_path = os.path.join(self.home_dir, "my_udf") |
1138 | udf_path = os.path.join(self.home_dir, "MyUDF") |
1139 | # patch FakeAQ |
1140 | @@ -1789,7 +1823,7 @@ |
1141 | def test_UDF_cant_be_inside_symlink(self): |
1142 | """Test that a UDF can't be inside a symlink.""" |
1143 | # initialize the root |
1144 | - self.vm.handle_SYS_ROOT_RECEIVED('root_uuid') |
1145 | + self.vm._got_root('root_uuid') |
1146 | real_udf_path = os.path.join(self.home_dir, "udf_parent", "my_udf") |
1147 | udf_path = os.path.join(self.home_dir, "MyUDF") |
1148 | # patch FakeAQ |
1149 | @@ -1839,10 +1873,10 @@ |
1150 | for event in events) |
1151 | self.assertIn(str(share_id), events_dict) |
1152 | self.assertIn(str(udf_id), events_dict) |
1153 | - self.assertIn('', events_dict) |
1154 | + self.assertIn(request.ROOT, events_dict) |
1155 | self.assertEquals(1, events_dict[str(share_id)]) |
1156 | self.assertEquals(1, events_dict[str(udf_id)]) |
1157 | - self.assertEquals(1, events_dict['']) |
1158 | + self.assertEquals(1, events_dict[request.ROOT]) |
1159 | |
1160 | @defer.inlineCallbacks |
1161 | def test_server_rescan_error(self): |
1162 | @@ -1888,10 +1922,10 @@ |
1163 | for event in events) |
1164 | self.assertIn(str(share_id), events_dict) |
1165 | self.assertIn(str(udf_id), events_dict) |
1166 | - self.assertIn('', events_dict) |
1167 | + self.assertIn(request.ROOT, events_dict) |
1168 | self.assertEquals(1, events_dict[str(share_id)]) |
1169 | self.assertEquals(1, events_dict[str(udf_id)]) |
1170 | - self.assertEquals(1, events_dict['']) |
1171 | + self.assertEquals(1, events_dict[request.ROOT]) |
1172 | # set the local metadata generation to new value |
1173 | share = self.vm.shares[str(share_id)] |
1174 | share.generation = share_volume.generation |
1175 | @@ -1901,7 +1935,7 @@ |
1176 | self.vm.udfs[str(udf_id)] = udf |
1177 | root = self.vm.root |
1178 | root.generation = root_volume.generation |
1179 | - self.vm.shares[''] = root |
1180 | + self.vm.shares[request.ROOT] = root |
1181 | # now that we have the volumes in metadata, try with a higher value. |
1182 | share_volume.generation = 10 |
1183 | udf_volume.generation = 5 |
1184 | @@ -1916,7 +1950,7 @@ |
1185 | for event in events) |
1186 | self.assertIn(str(share_id), events_dict) |
1187 | self.assertIn(str(udf_id), events_dict) |
1188 | - self.assertNotIn('', events_dict) # same generartion as metadata |
1189 | +        self.assertNotIn(request.ROOT, events_dict) # same generation as metadata
1190 | self.assertEquals(10, events_dict[str(share_id)]) |
1191 | self.assertEquals(5, events_dict[str(udf_id)]) |
1192 | # now only change the root volume generation |
1193 | @@ -1933,8 +1967,8 @@ |
1194 | for event in events) |
1195 | self.assertNotIn(str(share_id), events_dict) |
1196 | self.assertNotIn(str(udf_id), events_dict) |
1197 | - self.assertIn('', events_dict) # same generartion as metadata |
1198 | - self.assertEquals(100, events_dict['']) |
1199 | +        self.assertIn(request.ROOT, events_dict) # same generation as metadata
1200 | + self.assertEquals(100, events_dict[request.ROOT]) |
1201 | |
1202 | @defer.inlineCallbacks |
1203 | def test_update_generation(self): |
1204 | @@ -1948,8 +1982,8 @@ |
1205 | share_path = os.path.join(self.main.shares_dir, dir_name) |
1206 | share = Share.from_share_volume(share_volume, share_path) |
1207 | # get the root |
1208 | - root = self.vm.get_volume('') |
1209 | - self.vm.handle_SYS_ROOT_RECEIVED('root_node_id') |
1210 | + root = self.vm.get_volume(request.ROOT) |
1211 | + self.vm._got_root('root_node_id') |
1212 | # create a UDF |
1213 | udf_id = uuid.uuid4() |
1214 | udf, _ = self._create_udf(udf_id, 'udf_node_id', '~/UDF') |
1215 | @@ -1971,6 +2005,205 @@ |
1216 | self.assertRaises(VolumeDoesNotExist, self.vm.update_generation, |
1217 | str(uuid.uuid4()), 1) |
1218 | |
1219 | + @defer.inlineCallbacks |
1220 | + def test_handle_SV_VOLUME_NEW_GENERATION_udf(self): |
1221 | + """Test handle_SV_VOLUME_NEW_GENERATION for udf.""" |
1222 | + # create a UDF |
1223 | + udf_id = uuid.uuid4() |
1224 | + udf, udf_volume = self._create_udf(udf_id, 'udf_node_id', '~/UDF') |
1225 | + yield self.vm.add_udf(udf) |
1226 | + self.vm.update_generation(udf.volume_id, 10) |
1227 | + d = defer.Deferred() |
1228 | + self.patch(self.main.action_q, 'get_delta', lambda v, g: d.callback((v, g))) |
1229 | + self.main.event_q.push('SV_VOLUME_NEW_GENERATION', udf.volume_id, 100) |
1230 | + vol_id, gen = yield d |
1231 | + vol = self.vm.get_volume(vol_id) |
1232 | + self.assertEqual(vol_id, vol.volume_id) |
1233 | + self.assertEqual(gen, vol.generation) |
1234 | + |
1235 | + @defer.inlineCallbacks |
1236 | + def test_handle_SV_VOLUME_NEW_GENERATION_udf_from_scratch(self): |
1237 | + """Test handle_SV_VOLUME_NEW_GENERATION for udf.""" |
1238 | + # create a UDF |
1239 | + udf_id = uuid.uuid4() |
1240 | + udf, udf_volume = self._create_udf(udf_id, 'udf_node_id', '~/UDF') |
1241 | + yield self.vm.add_udf(udf) |
1242 | + self.vm.update_generation(udf.volume_id, None) |
1243 | + d = defer.Deferred() |
1244 | + self.patch(self.main.action_q, 'rescan_from_scratch', d.callback) |
1245 | + self.main.event_q.push('SV_VOLUME_NEW_GENERATION', udf.volume_id, 100) |
1246 | + vol_id = yield d |
1247 | + self.assertEquals(vol_id, udf.volume_id) |
1248 | + |
1249 | + @defer.inlineCallbacks |
1250 | + def test_handle_SV_VOLUME_NEW_GENERATION_udf_eq(self): |
1251 | + """Test handle_SV_VOLUME_NEW_GENERATION for udf.""" |
1252 | + # get the root |
1253 | + udf_id = uuid.uuid4() |
1254 | + udf, udf_volume = self._create_udf(udf_id, 'udf_node_id', '~/UDF') |
1255 | + yield self.vm.add_udf(udf) |
1256 | + self.vm.update_generation(udf.volume_id, 100) |
1257 | + handler = MementoHandler() |
1258 | + handler.setLevel(logging.INFO) |
1259 | + self.vm.log.addHandler(handler) |
1260 | + self.main.event_q.push('SV_VOLUME_NEW_GENERATION', udf.volume_id, 100) |
1261 | + self.assertEqual(1, len(handler.records)) |
1262 | + msg = 'Got SV_VOLUME_NEW_GENERATION(%r, %r) but volume' + \ |
1263 | + ' is at generation: %r' |
1264 | + self.assertEqual(msg % (udf.volume_id, 100, 100), |
1265 | + handler.records[0].message) |
1266 | + self.vm.log.removeHandler(handler) |
1267 | + |
1268 | + @defer.inlineCallbacks |
1269 | + def test_handle_SV_VOLUME_NEW_GENERATION_root(self): |
1270 | + """Test handle_SV_VOLUME_NEW_GENERATION for root share.""" |
1271 | + # get the root |
1272 | + root = self.vm.get_volume(request.ROOT) |
1273 | + self.vm._got_root('root_node_id') |
1274 | + self.vm.update_generation(root.volume_id, 10) |
1275 | + d = defer.Deferred() |
1276 | + self.patch(self.main.action_q, 'get_delta', lambda v, g: d.callback((v, g))) |
1277 | + self.main.event_q.push('SV_VOLUME_NEW_GENERATION', root.volume_id, 100) |
1278 | + vol_id, gen = yield d |
1279 | + vol = self.vm.get_volume(vol_id) |
1280 | + self.assertEqual(vol_id, vol.volume_id) |
1281 | + self.assertEqual(gen, vol.generation) |
1282 | + |
1283 | + @defer.inlineCallbacks |
1284 | + def test_handle_SV_VOLUME_NEW_GENERATION_root_from_scratch(self): |
1285 | + """Test handle_SV_VOLUME_NEW_GENERATION for root share.""" |
1286 | + # get the root |
1287 | + root = self.vm.get_volume(request.ROOT) |
1288 | + self.vm._got_root('root_node_id') |
1289 | + self.vm.update_generation(root.volume_id, None) |
1290 | + d = defer.Deferred() |
1291 | + self.patch(self.main.action_q, 'rescan_from_scratch', d.callback) |
1292 | + self.main.event_q.push('SV_VOLUME_NEW_GENERATION', root.volume_id, 100) |
1293 | + vol_id = yield d |
1294 | + self.assertEquals(vol_id, root.volume_id) |
1295 | + |
1296 | + def test_handle_SV_VOLUME_NEW_GENERATION_root_eq(self): |
1297 | + """Test handle_SV_VOLUME_NEW_GENERATION for root share.""" |
1298 | + # get the root |
1299 | + root = self.vm.get_volume(request.ROOT) |
1300 | + self.vm._got_root('root_node_id') |
1301 | + self.vm.update_generation(root.volume_id, 100) |
1302 | + handler = MementoHandler() |
1303 | + handler.setLevel(logging.INFO) |
1304 | + self.vm.log.addHandler(handler) |
1305 | + self.main.event_q.push('SV_VOLUME_NEW_GENERATION', root.volume_id, 100) |
1306 | + self.assertEqual(1, len(handler.records)) |
1307 | + msg = 'Got SV_VOLUME_NEW_GENERATION(%r, %r) but volume' + \ |
1308 | + ' is at generation: %r' |
1309 | + self.assertEqual(msg % (root.volume_id, 100, 100), |
1310 | + handler.records[0].message) |
1311 | + self.vm.log.removeHandler(handler) |
1312 | + |
1313 | + @defer.inlineCallbacks |
1314 | + def test_handle_SV_VOLUME_NEW_GENERATION_share(self): |
1315 | + """Test handle_SV_VOLUME_NEW_GENERATION for a share.""" |
1316 | + share_id = uuid.uuid4() |
1317 | + share_volume = volumes.ShareVolume(share_id, 'fake_share_uuid', None, |
1318 | + 10, 'to_me', 'fake_share', 'username', |
1319 | + 'visible_username', True, 'View') |
1320 | + dir_name = self.vm._build_share_path(share_volume.share_name, |
1321 | + share_volume.other_visible_name) |
1322 | + share_path = os.path.join(self.main.shares_dir, dir_name) |
1323 | + share = Share.from_share_volume(share_volume, share_path) |
1324 | + self.vm.add_share(share) |
1325 | + self.vm.update_generation(share.volume_id, 10) |
1326 | + d = defer.Deferred() |
1327 | + self.patch(self.main.action_q, 'get_delta', lambda v, g: d.callback((v, g))) |
1328 | + self.main.event_q.push('SV_VOLUME_NEW_GENERATION', share.volume_id, 100) |
1329 | + vol_id, gen = yield d |
1330 | + vol = self.vm.get_volume(vol_id) |
1331 | + self.assertEqual(vol_id, vol.volume_id) |
1332 | + self.assertEqual(gen, vol.generation) |
1333 | + |
1334 | + @defer.inlineCallbacks |
1335 | + def test_handle_SV_VOLUME_NEW_GENERATION_share_from_scratch(self): |
1336 | + """Test handle_SV_VOLUME_NEW_GENERATION for a share.""" |
1337 | + share_id = uuid.uuid4() |
1338 | + share_volume = volumes.ShareVolume(share_id, 'fake_share_uuid', None, |
1339 | + 10, 'to_me', 'fake_share', 'username', |
1340 | + 'visible_username', True, 'View') |
1341 | + dir_name = self.vm._build_share_path(share_volume.share_name, |
1342 | + share_volume.other_visible_name) |
1343 | + share_path = os.path.join(self.main.shares_dir, dir_name) |
1344 | + share = Share.from_share_volume(share_volume, share_path) |
1345 | + self.vm.add_share(share) |
1346 | + self.vm.update_generation(share.volume_id, None) |
1347 | + d = defer.Deferred() |
1348 | + self.patch(self.main.action_q, 'rescan_from_scratch', d.callback) |
1349 | + self.main.event_q.push('SV_VOLUME_NEW_GENERATION', share.volume_id, 100) |
1350 | + vol_id = yield d |
1351 | + self.assertEquals(vol_id, share.volume_id) |
1352 | + |
1353 | + def test_handle_SV_VOLUME_NEW_GENERATION_share_eq(self): |
1354 | + """Test handle_SV_VOLUME_NEW_GENERATION for a share.""" |
1355 | + share_id = uuid.uuid4() |
1356 | + share_volume = volumes.ShareVolume(share_id, 'fake_share_uuid', None, |
1357 | + 10, 'to_me', 'fake_share', 'username', |
1358 | + 'visible_username', True, 'View') |
1359 | + dir_name = self.vm._build_share_path(share_volume.share_name, |
1360 | + share_volume.other_visible_name) |
1361 | + share_path = os.path.join(self.main.shares_dir, dir_name) |
1362 | + share = Share.from_share_volume(share_volume, share_path) |
1363 | + self.vm.add_share(share) |
1364 | + self.vm.update_generation(share.volume_id, 100) |
1365 | + handler = MementoHandler() |
1366 | + handler.setLevel(logging.INFO) |
1367 | + self.vm.log.addHandler(handler) |
1368 | + self.main.event_q.push('SV_VOLUME_NEW_GENERATION', share.volume_id, 100) |
1369 | + self.assertEqual(1, len(handler.records)) |
1370 | + msg = 'Got SV_VOLUME_NEW_GENERATION(%r, %r) but volume' + \ |
1371 | + ' is at generation: %r' |
1372 | + self.assertEqual(msg % (share.volume_id, 100, 100), |
1373 | + handler.records[0].message) |
1374 | + self.vm.log.removeHandler(handler) |
1375 | + |
1376 | + @defer.inlineCallbacks |
1377 | + def test_handle_AQ_DELTA_NOT_POSSIBLE(self): |
1378 | + """Test for handle_AQ_DELTA_NOT_POSSIBLE.""" |
1379 | + share_id = uuid.uuid4() |
1380 | + share_volume = volumes.ShareVolume(share_id, 'fake_share_uuid', None, |
1381 | + 10, 'to_me', 'fake_share', 'username', |
1382 | + 'visible_username', True, 'View') |
1383 | + dir_name = self.vm._build_share_path(share_volume.share_name, |
1384 | + share_volume.other_visible_name) |
1385 | + share_path = os.path.join(self.main.shares_dir, dir_name) |
1386 | + share = Share.from_share_volume(share_volume, share_path) |
1387 | + # get the root |
1388 | + root = self.vm.get_volume(request.ROOT) |
1389 | + self.vm._got_root('root_node_id') |
1390 | + # create a UDF |
1391 | + udf_id = uuid.uuid4() |
1392 | + udf, _ = self._create_udf(udf_id, 'udf_node_id', '~/UDF') |
1393 | + self.vm.add_share(share) |
1394 | + yield self.vm.add_udf(udf) |
1395 | + # patch AQ.rescan_from_scratch |
1396 | + calls = [] |
1397 | + self.patch(self.main.action_q, 'rescan_from_scratch', calls.append) |
1398 | + self.vm.handle_AQ_DELTA_NOT_POSSIBLE(udf_id) |
1399 | + self.vm.handle_AQ_DELTA_NOT_POSSIBLE(share_id) |
1400 | + self.vm.handle_AQ_DELTA_NOT_POSSIBLE(root.volume_id) |
1401 | + for i, vol in enumerate([udf, share, root]): |
1402 | + self.assertEquals(calls[i], str(vol.volume_id)) |
1403 | + |
1404 | + def test_handle_AQ_DELTA_NOT_POSSIBLE_missing_volume(self): |
1405 | +        """Test for handle_AQ_DELTA_NOT_POSSIBLE with a missing volume."""
1406 | + handler = MementoHandler() |
1407 | + handler.setLevel(logging.WARNING) |
1408 | + self.vm.log.addHandler(handler) |
1409 | + vol_id = uuid.uuid4() |
1410 | + try: |
1411 | + self.vm.handle_AQ_DELTA_NOT_POSSIBLE(vol_id) |
1412 | + finally: |
1413 | + self.vm.log.removeHandler(handler) |
1414 | + self.assertEqual(1, len(handler.records)) |
1415 | + msg = 'Got a AQ_DELTA_NOT_POSSIBLE for a missing volume: %r' |
1416 | + self.assertEqual(msg % str(vol_id), handler.records[0].message) |
1417 | + |
1418 | |
1419 | class MetadataTestCase(BaseTwistedTestCase): |
1420 | md_version_None = False |
1421 | @@ -2062,7 +2295,7 @@ |
1422 | # add the root_uuid key |
1423 | root_share = _Share(path=self.root_dir) |
1424 | root_share.access_level = 'Modify' |
1425 | - old_shelf[''] = root_share |
1426 | + old_shelf[request.ROOT] = root_share |
1427 | for idx in range(1, 10): |
1428 | sid = str(uuid.uuid4()) |
1429 | old_shelf[sid] = _Share(path=os.path.join(self.shares_dir, str(idx)), |
1430 | @@ -2129,7 +2362,7 @@ |
1431 | self.data_dir, self.partials_dir) |
1432 | new_keys = [new_key for new_key in self.main.vm.shares.keys()] |
1433 | self.assertEquals(2, len(new_keys)) # the fake share plus root |
1434 | - for key in ['', share.id]: |
1435 | + for key in [request.ROOT, share.id]: |
1436 | self.assertIn(key, new_keys) |
1437 | self.check_version() |
1438 | |
1439 | @@ -2322,7 +2555,7 @@ |
1440 | # add the root_uuid key |
1441 | root_share = _Share(path=self.root_dir) |
1442 | root_share.access_level = 'Modify' |
1443 | - maybe_old_shelf[''] = root_share |
1444 | + maybe_old_shelf[request.ROOT] = root_share |
1445 | for idx in range(1, 10): |
1446 | share_id = str(uuid.uuid4()) |
1447 | maybe_old_shelf[share_id] = \ |
1448 | @@ -2483,7 +2716,7 @@ |
1449 | root_share.access_level = 'Modify' |
1450 | # set None to the share path |
1451 | root_share.path = None |
1452 | - shares[''] = root_share |
1453 | + shares[request.ROOT] = root_share |
1454 | |
1455 | if self.md_version_None: |
1456 | self.set_md_version('') |
1457 | @@ -2521,9 +2754,9 @@ |
1458 | self.set_md_version('5') |
1459 | # create some old shares and shared metadata |
1460 | legacy_shares = LegacyShareFileShelf(self.share_md_dir) |
1461 | - root_share = _Share(path=self.root_dir, share_id='', |
1462 | + root_share = _Share(path=self.root_dir, share_id=request.ROOT, |
1463 | access_level='Modify') |
1464 | - legacy_shares[''] = root_share |
1465 | + legacy_shares[request.ROOT] = root_share |
1466 | for idx, name in enumerate(['share'] * 10): |
1467 | sid = str(uuid.uuid4()) |
1468 | share_name = name + '_' + str(idx) |
1469 | @@ -2595,9 +2828,9 @@ |
1470 | self.udfs_md_dir = os.path.join(self.vm_data_dir, 'udfs') |
1471 | # create some old shares and shared metadata |
1472 | legacy_shares = LegacyShareFileShelf(self.share_md_dir) |
1473 | - root_share = _Share(path=self.root_dir, share_id='', |
1474 | + root_share = _Share(path=self.root_dir, share_id=request.ROOT, |
1475 | access_level='Modify') |
1476 | - legacy_shares[''] = root_share |
1477 | + legacy_shares[request.ROOT] = root_share |
1478 | for idx, name in enumerate(['share'] * 10): |
1479 | sid = str(uuid.uuid4()) |
1480 | share_name = name + '_' + str(idx) |
1481 | @@ -2695,9 +2928,9 @@ |
1482 | self.udfs_md_dir = os.path.join(self.vm_data_dir, 'udfs') |
1483 | # create some old shares and shared metadata |
1484 | legacy_shares = LegacyShareFileShelf(self.share_md_dir) |
1485 | - root_share = _Share(path=self.root_dir, share_id='', |
1486 | + root_share = _Share(path=self.root_dir, share_id=request.ROOT, |
1487 | access_level='Modify', node_id=str(uuid.uuid4())) |
1488 | - legacy_shares[''] = root_share |
1489 | + legacy_shares[request.ROOT] = root_share |
1490 | for idx, name in enumerate(['share'] * 3): |
1491 | sid = str(uuid.uuid4()) |
1492 | share_name = name + '_' + str(idx) |
1493 | @@ -2802,9 +3035,9 @@ |
1494 | self.set_md_version('5') |
1495 | # create some old shares and shared metadata |
1496 | legacy_shares = LegacyShareFileShelf(self.share_md_dir) |
1497 | - root_share = _Share(path=self.root_dir, share_id='', |
1498 | + root_share = _Share(path=self.root_dir, share_id=request.ROOT, |
1499 | access_level='Modify') |
1500 | - legacy_shares[''] = root_share |
1501 | + legacy_shares[request.ROOT] = root_share |
1502 | for idx, name in enumerate(['share'] * 10): |
1503 | sid = str(uuid.uuid4()) |
1504 | share_name = name + '_' + str(idx) |
1505 | |
1506 | === modified file 'ubuntuone/syncdaemon/action_queue.py' |
1507 | --- ubuntuone/syncdaemon/action_queue.py 2010-07-19 20:15:06 +0000 |
1508 | +++ ubuntuone/syncdaemon/action_queue.py 2010-07-22 13:39:48 +0000 |
1509 | @@ -703,14 +703,6 @@ |
1510 | self.connector = None |
1511 | self.connect_in_progress = False |
1512 | |
1513 | - def _node_state_callback(self, share_id, node_id, hash): |
1514 | - """Called by the client when notified that node changed.""" |
1515 | - volume = self.main.vm.udfs.get(share_id, None) |
1516 | - if volume is not None and not volume.subscribed: |
1517 | - return |
1518 | - self.event_queue.push('SV_HASH_NEW', |
1519 | - share_id=share_id, node_id=node_id, hash=hash) |
1520 | - |
1521 | def _share_change_callback(self, info): |
1522 | """Called by the client when notified that a share changed.""" |
1523 | self.event_queue.push('SV_SHARE_CHANGED', info=info) |
1524 | @@ -827,7 +819,6 @@ |
1525 | # does nothing (safely). |
1526 | self.client = ThrottlingStorageClientFactory.buildProtocol(self, addr) |
1527 | |
1528 | - self.client.set_node_state_callback(self._node_state_callback) |
1529 | self.client.set_share_change_callback(self._share_change_callback) |
1530 | self.client.set_share_answer_callback(self._share_answer_callback) |
1531 | self.client.set_free_space_callback(self._free_space_callback) |
1532 | @@ -878,6 +869,7 @@ |
1533 | def _send_request_and_handle_errors(self, request, request_error, |
1534 | event_error, event_ok, |
1535 | fire_deferred=True, |
1536 | + handle_exception=True, |
1537 | args=(), kwargs={}): |
1538 | """Send 'request' to the server, using params 'args' and 'kwargs'. |
1539 | |
1540 | @@ -919,8 +911,11 @@ |
1541 | event = 'SYS_SERVER_ERROR' |
1542 | self.event_queue.push(event, error=str(failure)) |
1543 | except Exception, failure: |
1544 | - event = 'SYS_UNKNOWN_ERROR' |
1545 | - self.event_queue.push(event) |
1546 | + if handle_exception: |
1547 | + event = 'SYS_UNKNOWN_ERROR' |
1548 | + self.event_queue.push(event) |
1549 | + else: |
1550 | + raise |
1551 | else: |
1552 | logger.info("The request '%s' finished OK.", req_name) |
1553 | if event_ok is not None: |
1554 | @@ -997,50 +992,19 @@ |
1555 | # callback the deferred if everything went ok |
1556 | self.deferred.callback(self.client) |
1557 | |
1558 | - def server_rescan(self, root_mdid, data_gen): |
1559 | - """Do the server rescan.""" |
1560 | - |
1561 | - @defer.inlineCallbacks |
1562 | - def _get_root_and_query(root_mdid, data_gen): |
1563 | - """Get user's root and then query each element in data_gen().""" |
1564 | - yield self.get_root(root_mdid) |
1565 | - data = data_gen() |
1566 | - logger.info("Server rescan: will query %d objects", len(data)) |
1567 | - # check we're going to actually log, because this could be expensive |
1568 | - if logger.isEnabledFor(TRACE): |
1569 | - for share, node, hash in data: |
1570 | - logger.trace("Server rescan: share: %r, node: %r, hash: %s", |
1571 | - share or '/root/', node, hash) |
1572 | - logger.trace("Server rescan: all data shown") |
1573 | - yield self.client.query(data) |
1574 | - |
1575 | - get_root_and_query_d = self._send_request_and_handle_errors( |
1576 | - request=_get_root_and_query, |
1577 | + @defer.inlineCallbacks |
1578 | + def query_volumes(self): |
1579 | + """Get the list of volumes. |
1580 | + |
1581 | + This method will *not* queue a command, the request will be |
1582 | + executed right away. |
1583 | + """ |
1584 | + result = yield self._send_request_and_handle_errors( |
1585 | + request=self.client.list_volumes, |
1586 | request_error=None, event_error=None, |
1587 | - event_ok='SYS_SERVER_RESCAN_DONE', fire_deferred=False, |
1588 | - args=(root_mdid, data_gen) |
1589 | - ) |
1590 | - return get_root_and_query_d |
1591 | - |
1592 | - def get_root(self, marker): |
1593 | - """Get the user's root uuid. |
1594 | - |
1595 | - Use the uuid_map, so the caller can use the marker in followup |
1596 | - operations. |
1597 | - |
1598 | - """ |
1599 | - log = mklog(logger, 'get_root', '', marker, marker=marker) |
1600 | - log.debug('starting') |
1601 | - d = self.client.get_root() |
1602 | - d.addCallbacks(*log.callbacks()) |
1603 | - d.addCallbacks(passit(lambda root: self.uuid_map.set(marker, root)), |
1604 | - passit(lambda f: self.uuid_map.err(marker, f))) |
1605 | - def handle_root(root_id): |
1606 | - """Push SYS_ROOT_RECEIVED event""" |
1607 | - self.event_queue.push('SYS_ROOT_RECEIVED', root_id=root_id) |
1608 | - return root_id |
1609 | - d.addCallback(handle_root) |
1610 | - return d |
1611 | + event_ok=None, fire_deferred=False, |
1612 | + handle_exception=False) |
1613 | + defer.returnValue(result.volumes) |
1614 | |
1615 | def make_file(self, share_id, parent_id, name, marker): |
1616 | """ |
1617 | @@ -1134,17 +1098,6 @@ |
1618 | """ |
1619 | return ListVolumes(self.meta_queue).start() |
1620 | |
1621 | - def query_volumes(self): |
1622 | - """List all the volumes the user has. |
1623 | - |
1624 | - This includes the volumes: |
1625 | - - all the user's UDFs. |
1626 | - - all the shares the user has accepted. |
1627 | - - the root-root volume. |
1628 | - |
1629 | - """ |
1630 | - return VolumesQuery(self.meta_queue).start() |
1631 | - |
1632 | def delete_volume(self, volume_id): |
1633 | """Delete a volume on the server, removing the associated tree. |
1634 | |
1635 | @@ -1256,6 +1209,18 @@ |
1636 | """ |
1637 | return GetDelta(self.meta_queue, volume_id, generation).start() |
1638 | |
1639 | + def rescan_from_scratch(self, volume_id): |
1640 | + """Get a delta from scratch for the volume. |
1641 | + |
1642 | + @param volume_id: the id of the volume to get the delta. |
1643 | + |
1644 | + Result will be signaled using events: |
1645 | +          - AQ_RESCAN_FROM_SCRATCH_OK on success.
1646 | + - AQ_RESCAN_FROM_SCRATCH_ERROR on generic failure. |
1647 | + """ |
1648 | + # TODO: implement rescan from scratch |
1649 | + return None |
1650 | + |
1651 | |
1652 | SKIP_THIS_ITEM = object() |
1653 | |
1654 | @@ -1989,18 +1954,6 @@ |
1655 | error=failure.getErrorMessage()) |
1656 | |
1657 | |
1658 | -class VolumesQuery(ListVolumes): |
1659 | - """List all the volumes for a given user, but doesn't push a event.""" |
1660 | - |
1661 | - def handle_success(self, success): |
1662 | - """It worked!, just forward the value""" |
1663 | - return success.volumes |
1664 | - |
1665 | - def handle_failure(self, failure): |
1666 | - """It didn't work! just forward the failure.""" |
1667 | - return failure |
1668 | - |
1669 | - |
1670 | class DeleteVolume(ActionQueueCommand): |
1671 | """Delete an exsistent volume.""" |
1672 | |
1673 | @@ -2065,6 +2018,7 @@ |
1674 | def handle_success(self, request): |
1675 | """It worked! Push the success event.""" |
1676 | data = dict( |
1677 | + volume_id=self.volume_id, |
1678 | delta_content=request.response, |
1679 | end_generation=request.end_generation, |
1680 | full=request.full, |
1681 | |
1682 | === modified file 'ubuntuone/syncdaemon/event_queue.py' |
1683 | --- ubuntuone/syncdaemon/event_queue.py 2010-07-19 20:15:06 +0000 |
1684 | +++ ubuntuone/syncdaemon/event_queue.py 2010-07-22 13:39:48 +0000 |
1685 | @@ -85,19 +85,14 @@ |
1686 | 'AQ_CHANGE_PUBLIC_ACCESS_ERROR': ('share_id', 'node_id', 'error'), |
1687 | 'AQ_PUBLIC_FILES_LIST_OK': ('public_files',), |
1688 | 'AQ_PUBLIC_FILES_LIST_ERROR': ('error',), |
1689 | - 'AQ_DELTA_OK': ('delta_content', 'end_generation', 'full', 'free_bytes'), |
1690 | + 'AQ_DELTA_OK': ('volume_id', 'delta_content', 'end_generation', |
1691 | + 'full', 'free_bytes'), |
1692 | 'AQ_DELTA_ERROR': ('volume_id', 'error'), |
1693 | 'AQ_DELTA_NOT_POSSIBLE': ('volume_id',), |
1694 | |
1695 | 'SV_SHARE_CHANGED': ('info',), |
1696 | 'SV_SHARE_DELETED': ('share_id',), |
1697 | 'SV_SHARE_ANSWERED': ('share_id', 'answer'), |
1698 | - 'SV_HASH_NEW': ('share_id', 'node_id', 'hash'), |
1699 | - 'SV_FILE_NEW': ('share_id', 'node_id', 'parent_id', 'name'), |
1700 | - 'SV_DIR_NEW': ('share_id', 'node_id', 'parent_id', 'name'), |
1701 | - 'SV_FILE_DELETED': ('share_id', 'node_id'), |
1702 | - 'SV_MOVED': ('share_id', 'node_id', 'new_share_id', 'new_parent_id', |
1703 | - 'new_name'), |
1704 | 'SV_FREE_SPACE': ('share_id', 'free_bytes'), |
1705 | 'SV_ACCOUNT_CHANGED': ('account_info',), |
1706 | 'SV_VOLUME_CREATED': ('volume',), |
1707 | |
1708 | === modified file 'ubuntuone/syncdaemon/main.py' |
1709 | --- ubuntuone/syncdaemon/main.py 2010-03-19 14:28:40 +0000 |
1710 | +++ ubuntuone/syncdaemon/main.py 2010-07-22 13:39:48 +0000 |
1711 | @@ -233,9 +233,7 @@ |
1712 | |
1713 | def server_rescan(self): |
1714 | """Do the server rescan.""" |
1715 | - mdobj = self.fs.get_by_path(self.root_dir) |
1716 | - self.action_q.server_rescan(mdobj.mdid, |
1717 | - self.fs.get_data_for_server_rescan) |
1718 | + return self.vm.server_rescan() |
1719 | |
1720 | def set_oauth_token(self, key, secret): |
1721 | """ Sets the oauth token """ |
1722 | |
1723 | === modified file 'ubuntuone/syncdaemon/sync.py' |
1724 | --- ubuntuone/syncdaemon/sync.py 2010-07-20 16:17:58 +0000 |
1725 | +++ ubuntuone/syncdaemon/sync.py 2010-07-22 13:39:48 +0000 |
1726 | @@ -24,11 +24,9 @@ |
1727 | import sys |
1728 | |
1729 | from ubuntuone.syncdaemon.marker import MDMarker |
1730 | -from ubuntuone.storageprotocol.dircontent_pb2 import DIRECTORY |
1731 | -from ubuntuone.storageprotocol import dircontent |
1732 | +from ubuntuone.storageprotocol import delta |
1733 | from ubuntuone.syncdaemon.fsm.fsm import \ |
1734 | StateMachineRunner, StateMachine |
1735 | -from ubuntuone.syncdaemon.interfaces import IMarker |
1736 | from ubuntuone.syncdaemon import u1fsfsm |
1737 | from ubuntuone.syncdaemon.logger import DebugCapture |
1738 | from ubuntuone.syncdaemon.filesystem_manager import InconsistencyError |
1739 | @@ -309,6 +307,7 @@ |
1740 | with DebugCapture(self.log.logger): |
1741 | func_name = super(SyncStateMachineRunner, self).on_event(*args, |
1742 | **kwargs) |
1743 | + |
1744 | if not is_debug: |
1745 | self.log.info("Called %s (In: %s)" % (func_name, in_state)) |
1746 | |
1747 | @@ -362,8 +361,18 @@ |
1748 | is_directory=self.key.is_directory(), |
1749 | ) |
1750 | |
1751 | - def check_new_volume_generation(self, volume_id, new_generation): |
1752 | - """Check the new generation for the volume and take proper action.""" |
1753 | + def update_generation(self, volume_id, node_id, new_generation): |
1754 | + """Update the generation for the node and volume.""" |
1755 | + |
1756 | + # update the file |
1757 | + try: |
1758 | + node = self.m.fs.get_by_node_id(volume_id, node_id) |
1759 | + except KeyError: |
1760 | + pass |
1761 | + else: |
1762 | + self.m.fs.set_by_mdid(node.mdid, generation=new_generation) |
1763 | + |
1764 | + # update the volume |
1765 | try: |
1766 | volume = self.m.vm.get_volume(volume_id) |
1767 | except VolumeDoesNotExist: |
1768 | @@ -405,7 +414,6 @@ |
1769 | path = os.path.join(self.m.fs.get_abspath(share_id, mdobj.path), name) |
1770 | self.m.fs.create(path=path, share_id=share_id, node_id=node_id, |
1771 | is_dir=True) |
1772 | - self.m.action_q.query([(share_id, node_id, "")]) |
1773 | # pylint: disable-msg=W0704 |
1774 | # this should be provided by FSM, fix!! |
1775 | try: |
1776 | @@ -439,30 +447,6 @@ |
1777 | self.m.fs.set_node_id(self.key['path'], node_id) |
1778 | self.reget_dir(event, params) |
1779 | |
1780 | - def reget_dir(self, event, params, hash=None): |
1781 | - """Reget the directory.""" |
1782 | - self.m.action_q.cancel_download(share_id=self.key['share_id'], |
1783 | - node_id=self.key['node_id']) |
1784 | - self.key.remove_partial() |
1785 | - self.m.action_q.query([(self.key['share_id'], |
1786 | - self.key['node_id'], |
1787 | - self.key['local_hash'] or "")]) |
1788 | - self.key.set(server_hash=self.key['local_hash']) |
1789 | - self.key.sync() |
1790 | - |
1791 | - def get_dir(self, event, params, hash): |
1792 | - """Get the directory.""" |
1793 | - self.key.set(server_hash=hash) |
1794 | - self.key.sync() |
1795 | - self.m.fs.create_partial(node_id=self.key['node_id'], |
1796 | - share_id=self.key['share_id']) |
1797 | - self.m.action_q.listdir( |
1798 | - self.key['share_id'], self.key['node_id'], hash, |
1799 | - lambda : self.m.fs.get_partial_for_writing( |
1800 | - node_id=self.key['node_id'], |
1801 | - share_id=self.key['share_id']) |
1802 | - ) |
1803 | - |
1804 | def file_conflict(self, event, params, hash, crc32, size, stat): |
1805 | """This file is in conflict.""" |
1806 | self.key.move_to_conflict() |
1807 | @@ -474,92 +458,6 @@ |
1808 | node_id=self.key['node_id']) |
1809 | self.get_file(event, params, hash) |
1810 | |
1811 | - def merge_directory(self, event, params, hash): |
1812 | - """Merge the server directory with the local one.""" |
1813 | - new_files = [] |
1814 | - new_dirs = [] |
1815 | - deleted_filedirs = [] |
1816 | - moved = set() |
1817 | - |
1818 | - try: |
1819 | - fd = self.m.fs.get_partial(node_id=self.key['node_id'], |
1820 | - share_id=self.key['share_id']) |
1821 | - except InconsistencyError: |
1822 | - self.key.remove_partial() |
1823 | - self.key.set(server_hash=self.key['local_hash']) |
1824 | - self.key.sync() |
1825 | - self.m.action_q.query([ |
1826 | - (self.key["share_id"], self.key["node_id"], "")]) |
1827 | - # we dont perform the merge, we try to re get it |
1828 | - return |
1829 | - |
1830 | - |
1831 | - items = dircontent.parse_dir_content(fd) |
1832 | - server_dir = [ (o.utf8_name, o.node_type == DIRECTORY, o.uuid) |
1833 | - for o in items ] |
1834 | - client_dir = self.m.fs.dir_content(self.key['path']) |
1835 | - # XXX: lucio.torre: with huge dirs, this could take a while |
1836 | - |
1837 | - share = self.key['share_id'] |
1838 | - for name, isdir, uuid in server_dir: |
1839 | - # we took the name as bytes already encoded in utf8 |
1840 | - # directly from dircontent! |
1841 | - try: |
1842 | - md = self.m.fs.get_by_node_id(share, uuid) |
1843 | - except KeyError: |
1844 | - # not there, a new thing |
1845 | - if isdir: |
1846 | - new_dirs.append((share, uuid, name)) |
1847 | - else: |
1848 | - new_files.append((share, uuid, name)) |
1849 | - continue |
1850 | - mdpath = self.m.fs.get_abspath(md.share_id, md.path) |
1851 | - if mdpath != os.path.join(self.key['path'], name): |
1852 | - # this was moved, or maybe the server still didn't receive |
1853 | - # the move that happened here |
1854 | - if not self.m.action_q.node_is_with_queued_move(share, uuid): |
1855 | - # mark as moved |
1856 | - moved.add(uuid) |
1857 | - # signal moved |
1858 | - self.m.event_q.push("SV_MOVED", |
1859 | - share_id=md.share_id, node_id=uuid, |
1860 | - new_share_id=share, new_parent_id=self.key['node_id'], |
1861 | - new_name=name) |
1862 | - |
1863 | - |
1864 | - for name, isdir, uuid in client_dir: |
1865 | - if uuid is None: |
1866 | - continue |
1867 | - |
1868 | - if not (name, isdir, uuid) in server_dir: |
1869 | - # not there, a its gone on the server |
1870 | - if uuid in moved: |
1871 | - # this was a move, dont delete |
1872 | - continue |
1873 | - deleted_filedirs.append((share, uuid)) |
1874 | - |
1875 | - # these nodes are in process of being deleted from the server, so they |
1876 | - # are not exactly new locally (were already deleted here) |
1877 | - trash = set((share_id, node_id) for (share_id, node_id, _) |
1878 | - in self.m.fs.get_iter_trash()) |
1879 | - |
1880 | - parent_uuid = self.key['node_id'] |
1881 | - for share, uuid in deleted_filedirs: |
1882 | - self.m.event_q.push("SV_FILE_DELETED", |
1883 | - node_id=uuid, share_id=share) |
1884 | - for share, uuid, name in new_files: |
1885 | - if (share, uuid) not in trash: |
1886 | - self.m.event_q.push("SV_FILE_NEW", parent_id=parent_uuid, |
1887 | - node_id=uuid, share_id=share, name=name) |
1888 | - for share, uuid, name in new_dirs: |
1889 | - if (share, uuid) not in trash: |
1890 | - self.m.event_q.push("SV_DIR_NEW", parent_id=parent_uuid, |
1891 | - node_id=uuid, share_id=share, name=name) |
1892 | - |
1893 | - self.key.remove_partial() |
1894 | - self.key.set(local_hash=hash) |
1895 | - self.key.sync() |
1896 | - |
1897 | def new_file(self, event, params, share_id, node_id, parent_id, name): |
1898 | """create a local file.""" |
1899 | mdobj = self.m.fs.get_by_node_id(share_id, parent_id) |
1900 | @@ -569,7 +467,6 @@ |
1901 | self.key.set(server_hash="") |
1902 | self.key.set(local_hash="") |
1903 | self.key.sync() |
1904 | - self.m.action_q.query([(share_id, node_id, "")]) |
1905 | |
1906 | def new_file_on_server_with_local(self, event, params, share_id, |
1907 | node_id, parent_id, name): |
1908 | @@ -616,15 +513,6 @@ |
1909 | new_parent_id=new_parent_id, new_name=new_name) |
1910 | self.key.moved(self.key['share_id'], path_to) |
1911 | |
1912 | - # this is cheating, we change the state of another node |
1913 | - if not IMarker.providedBy(old_parent_id): |
1914 | - share_id = self.key['share_id'] |
1915 | - self.m.action_q.cancel_download(share_id, old_parent_id) |
1916 | - old_parent.remove_partial() |
1917 | - self.m.fs.set_by_node_id(old_parent_id, share_id, |
1918 | - server_hash="", local_hash="") |
1919 | - self.m.action_q.query([(share_id, old_parent_id, "")]) |
1920 | - |
1921 | # we only hash it if we're a file, not a directory |
1922 | if not self.key.is_dir(): |
1923 | self.m.hash_q.insert(self.key['path'], self.key['mdid']) |
1924 | @@ -649,8 +537,7 @@ |
1925 | self.key.remove_partial() |
1926 | self.key.set(server_hash=self.key['local_hash']) |
1927 | self.key.sync() |
1928 | - self.m.action_q.query([ |
1929 | - (self.key["share_id"], self.key["node_id"], "")]) |
1930 | + |
1931 | |
1932 | def new_local_file(self, event, parms, path): |
1933 | """a new local file was created""" |
1934 | @@ -690,10 +577,6 @@ |
1935 | """Server answered that dir creation was ok.""" |
1936 | self.m.fs.set_node_id(self.key['path'], new_id) |
1937 | |
1938 | - # query to get any updates in case dir was already there in server |
1939 | - self.m.action_q.query([(self.key['share_id'], self.key['node_id'], |
1940 | - self.key['local_hash'])]) |
1941 | - |
1942 | def calculate_hash(self, event, params): |
1943 | """calculate the hash of this.""" |
1944 | self.m.hash_q.insert(self.key['path'], self.key['mdid']) |
1945 | @@ -809,12 +692,8 @@ |
1946 | def dir_not_created_in_server(self, event, params, error): |
1947 | """Re-get the dir if it was because already exist.""" |
1948 | if error == "ALREADY_EXISTS": |
1949 | - # delete metadata and query the father to converge |
1950 | - parent = self.m.fs.get_by_node_id(self.key['share_id'], |
1951 | - self.key['parent_id']) |
1952 | + # delete metadata |
1953 | self.key.delete_metadata() |
1954 | - self.m.action_q.query([(parent.share_id, |
1955 | - parent.node_id, parent.local_hash)]) |
1956 | else: |
1957 | self.key.move_to_conflict() |
1958 | self.key.delete_metadata() |
1959 | @@ -890,9 +769,6 @@ |
1960 | self.key.remove_partial() |
1961 | self.key.set(server_hash=self.key['local_hash']) |
1962 | self.key.sync() |
1963 | - self.m.action_q.query([(self.key['share_id'], |
1964 | - self.key['node_id'], |
1965 | - self.key['local_hash'] or "")]) |
1966 | |
1967 | # pylint: disable-msg=C0103 |
1968 | def DESPAIR(self, event, params, *args, **kwargs): |
1969 | @@ -927,14 +803,14 @@ |
1970 | self.m = main |
1971 | self.m.event_q.subscribe(self) |
1972 | |
1973 | - def handle_SV_HASH_NEW(self, share_id, node_id, hash): |
1974 | - """on SV_HASH_NEW""" |
1975 | + def _handle_SV_HASH_NEW(self, share_id, node_id, hash): |
1976 | + """on SV_HASH_NEW. No longer called by EQ, only internally.""" |
1977 | key = FSKey(self.m.fs, share_id=share_id, node_id=node_id) |
1978 | log = FileLogger(self.logger, key) |
1979 | ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log) |
1980 | ssmr.signal_event_with_hash("SV_HASH_NEW", hash) |
1981 | |
1982 | - def handle_SV_FILE_NEW(self, share_id, node_id, parent_id, name): |
1983 | + def _handle_SV_FILE_NEW(self, share_id, node_id, parent_id, name): |
1984 | """on SV_FILE_NEW""" |
1985 | parent = FSKey(self.m.fs, share_id=share_id, node_id=parent_id) |
1986 | path = os.path.join(parent["path"], name) |
1987 | @@ -943,7 +819,7 @@ |
1988 | ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log) |
1989 | ssmr.on_event("SV_FILE_NEW", {}, share_id, node_id, parent_id, name) |
1990 | |
1991 | - def handle_SV_DIR_NEW(self, share_id, node_id, parent_id, name): |
1992 | + def _handle_SV_DIR_NEW(self, share_id, node_id, parent_id, name): |
1993 | """on SV_DIR_NEW""" |
1994 | parent = FSKey(self.m.fs, share_id=share_id, node_id=parent_id) |
1995 | path = os.path.join(parent["path"], name) |
1996 | @@ -952,8 +828,8 @@ |
1997 | ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log) |
1998 | ssmr.on_event("SV_DIR_NEW", {}, share_id, node_id, parent_id, name) |
1999 | |
2000 | - def handle_SV_FILE_DELETED(self, share_id, node_id): |
2001 | - """on SV_FILE_DELETED""" |
2002 | + def _handle_SV_FILE_DELETED(self, share_id, node_id): |
2003 | + """on SV_FILE_DELETED. Not called by EQ anymore.""" |
2004 | key = FSKey(self.m.fs, share_id=share_id, node_id=node_id) |
2005 | log = FileLogger(self.logger, key) |
2006 | ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log) |
2007 | @@ -1035,8 +911,8 @@ |
2008 | key = FSKey(self.m.fs, mdid=marker) |
2009 | log = FileLogger(self.logger, key) |
2010 | ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log) |
2011 | - ssmr.check_new_volume_generation(volume_id, new_generation) |
2012 | ssmr.on_event("AQ_FILE_NEW_OK", {}, new_id) |
2013 | + ssmr.update_generation(volume_id, new_id, new_generation) |
2014 | |
2015 | def handle_AQ_FILE_NEW_ERROR(self, marker, error): |
2016 | """on AQ_FILE_NEW_ERROR""" |
2017 | @@ -1057,8 +933,8 @@ |
2018 | key = FSKey(self.m.fs, mdid=marker) |
2019 | log = FileLogger(self.logger, key) |
2020 | ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log) |
2021 | - ssmr.check_new_volume_generation(volume_id, new_generation) |
2022 | ssmr.on_event("AQ_DIR_NEW_OK", {}, new_id) |
2023 | + ssmr.update_generation(volume_id, new_id, new_generation) |
2024 | |
2025 | def handle_FS_FILE_CLOSE_WRITE(self, path): |
2026 | """on FS_FILE_CLOSE_WRITE""" |
2027 | @@ -1082,7 +958,7 @@ |
2028 | ssmr.on_event('HQ_HASH_ERROR', {}) |
2029 | |
2030 | def handle_HQ_HASH_NEW(self, path, hash, crc32, size, stat): |
2031 | - """on HQ_HASH_NEW""" |
2032 | + """on HQ_HASH_NEW.""" |
2033 | key = FSKey(self.m.fs, path=path) |
2034 | log = FileLogger(self.logger, key) |
2035 | ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log) |
2036 | @@ -1104,8 +980,8 @@ |
2037 | key = FSKey(self.m.fs, share_id=share_id, node_id=node_id) |
2038 | log = FileLogger(self.logger, key) |
2039 | ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log) |
2040 | - ssmr.check_new_volume_generation(share_id, new_generation) |
2041 | ssmr.signal_event_with_hash("AQ_UPLOAD_FINISHED", hash) |
2042 | + ssmr.update_generation(share_id, node_id, new_generation) |
2043 | |
2044 | def handle_AQ_UPLOAD_ERROR(self, share_id, node_id, error, hash): |
2045 | """on AQ_UPLOAD_ERROR""" |
2046 | @@ -1114,7 +990,7 @@ |
2047 | ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log) |
2048 | ssmr.signal_event_with_error_and_hash("AQ_UPLOAD_ERROR", error, hash) |
2049 | |
2050 | - def handle_SV_MOVED(self, share_id, node_id, new_share_id, new_parent_id, |
2051 | + def _handle_SV_MOVED(self, share_id, node_id, new_share_id, new_parent_id, |
2052 | new_name): |
2053 | """on SV_MOVED""" |
2054 | key = FSKey(self.m.fs, share_id=share_id, node_id=node_id) |
2055 | @@ -1129,8 +1005,8 @@ |
2056 | key = FSKey(self.m.fs, share_id=share_id, node_id=node_id) |
2057 | log = FileLogger(self.logger, key) |
2058 | ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log) |
2059 | - ssmr.check_new_volume_generation(share_id, new_generation) |
2060 | ssmr.on_event("AQ_UNLINK_OK", {}, share_id, node_id) |
2061 | + ssmr.update_generation(share_id, node_id, new_generation) |
2062 | |
2063 | def handle_AQ_UNLINK_ERROR(self, share_id, parent_id, node_id, error): |
2064 | """on AQ_UNLINK_ERROR""" |
2065 | @@ -1144,5 +1020,102 @@ |
2066 | key = FSKey(self.m.fs, share_id=share_id, node_id=node_id) |
2067 | log = FileLogger(self.logger, key) |
2068 | ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log) |
2069 | - ssmr.check_new_volume_generation(share_id, new_generation) |
2070 | - |
2071 | + ssmr.update_generation(share_id, node_id, new_generation) |
2072 | + |
2073 | + def handle_AQ_DELTA_OK(self, volume_id, delta_content, end_generation, |
2074 | + full, free_bytes): |
2075 | + """We got a new delta. Apply item by item of the delta. |
2076 | + |
2077 | + Unlike other Sync methods that just defer to Sync State Machine Runner, |
2078 | + delta operations sync state in a different way. So this method has a |
2079 | + lot of logic that you will not see in the other event handlers. |
2080 | + """ |
2081 | + |
2082 | + for dt in delta_content: |
2083 | + # we only know how to update files |
2084 | + try: |
2085 | + if not isinstance(dt, delta.FileInfoDelta): |
2086 | + continue |
2087 | + |
2088 | + # we only support files and directories |
2089 | + if dt.file_type == delta.DIRECTORY: |
2090 | + is_dir = True |
2091 | + elif dt.file_type == delta.FILE: |
2092 | + is_dir = False |
2093 | + else: |
2094 | + self.logger.warn("Unknown file type: %r" , dt.file_type) |
2095 | + continue |
2096 | + |
2097 | + # if the node is dead, call the remove handler and forget |
2098 | + # about it |
2099 | + if not dt.is_live: |
2100 | + self._handle_SV_FILE_DELETED(dt.share_id, dt.node_id) |
2101 | + # nothing else should happen with this |
2102 | + continue |
2103 | + |
2104 | + # here we must call handlers for SV_HASH_NEW, SV_MOVED |
2105 | + |
2106 | + try: |
2107 | + node = self.m.fs.get_by_node_id(dt.share_id, dt.node_id) |
2108 | + except KeyError: |
2109 | + # node not there, we must create it |
2110 | + if is_dir: |
2111 | + self._handle_SV_DIR_NEW(dt.share_id, dt.node_id, |
2112 | + dt.parent_id, dt.name) |
2113 | + else: |
2114 | + self._handle_SV_FILE_NEW(dt.share_id, dt.node_id, |
2115 | + dt.parent_id, dt.name) |
2116 | + node = self.m.fs.get_by_node_id(dt.share_id, dt.node_id) |
2117 | + |
2118 | + # if the delta is older than the node, skip! |
2119 | + if node.generation > dt.generation: |
2120 | + continue |
2121 | + |
2122 | + # if the path changed, we have a move, notify it |
2123 | + node_parent_id = self.m.fs.get_by_path( |
2124 | + self.m.fs.get_abspath(node.share_id, |
2125 | + os.path.dirname(node.path)) |
2126 | + ).node_id |
2127 | + node_name = os.path.basename(node.path) |
2128 | + if dt.parent_id != node_parent_id or dt.name != node_name: |
2129 | + # this was moved, or maybe the server still didn't receive |
2130 | + # the move that happened here |
2131 | + if not self.m.action_q.node_is_with_queued_move( |
2132 | + dt.share_id, dt.node_id): |
2133 | + # signal moved |
2134 | + self._handle_SV_MOVED( |
2135 | + share_id=node.share_id, node_id=node.node_id, |
2136 | + new_share_id=dt.share_id, |
2137 | + new_parent_id=dt.parent_id, new_name=dt.name) |
2138 | + else: |
2139 | + self.logger.info("Not calling _handle_SV_MOVED for " |
2140 | + "%s:%s due to pending move. " |
2141 | + "(old parent = %s, new_parent = %s " |
2142 | + "old_name = %s, new_name = %s)", |
2143 | + node.share_id, node.node_id, |
2144 | + node_parent_id, dt.parent_id, |
2145 | + node_name, dt.name) |
2146 | + |
2147 | + |
2148 | +                # if it's a dir, there's nothing else that we do with them except
2149 | + # creating them. |
2150 | + # if its a file, we only care about the hash |
2151 | + if not is_dir: |
2152 | + self._handle_SV_HASH_NEW(dt.share_id, dt.node_id, |
2153 | + dt.content_hash) |
2154 | + |
2155 | + # node updated, update generation |
2156 | + self.m.fs.set_by_mdid(node.mdid, generation=dt.generation) |
2157 | + except Exception: |
2158 | + # we trap all exceptions so we can continue processing deltas |
2159 | + # even if something fails for one file. We just make sure |
2160 | + # we log. |
2161 | + self.logger.exception( |
2162 | + "Node delta for %s:%s can't be applied.""", |
2163 | + dt.share_id, dt.node_id) |
2164 | + |
2165 | + self.m.vm.update_free_space(volume_id, free_bytes) |
2166 | + self.m.vm.update_generation(volume_id, end_generation) |
2167 | + |
2168 | + if not full: |
2169 | + self.m.action_q.get_delta(volume_id, end_generation) |
2170 | |
2171 | === modified file 'ubuntuone/syncdaemon/u1fsfsm.ods' |
2172 | Binary files ubuntuone/syncdaemon/u1fsfsm.ods 2010-05-06 18:07:50 +0000 and ubuntuone/syncdaemon/u1fsfsm.ods 2010-07-22 13:39:48 +0000 differ |
2173 | === modified file 'ubuntuone/syncdaemon/u1fsfsm.py' |
2174 | --- ubuntuone/syncdaemon/u1fsfsm.py 2010-05-06 18:07:50 +0000 |
2175 | +++ ubuntuone/syncdaemon/u1fsfsm.py 2010-07-22 13:39:48 +0000 |
2176 | @@ -704,8 +704,8 @@ |
2177 | u'has_metadata': u'=', |
2178 | u'is_directory': u'='}}, |
2179 | {'ACTION': u'pass', |
2180 | - 'ACTION_FUNC': u'nothing', |
2181 | - 'COMMENTS': u'spurius download finished, ignore', |
2182 | + 'ACTION_FUNC': u'DESPAIR', |
2183 | + 'COMMENTS': u'we dont download directories anymore', |
2184 | 'PARAMETERS': {u'hash_eq_local_hash': u'!NA', |
2185 | u'hash_eq_server_hash': u'!NA', |
2186 | u'not_authorized': u'NA', |
2187 | @@ -717,8 +717,8 @@ |
2188 | u'has_metadata': u'=', |
2189 | u'is_directory': u'='}}, |
2190 | {'ACTION': u'merge_from_partial(uuid)', |
2191 | - 'ACTION_FUNC': u'merge_directory', |
2192 | - 'COMMENTS': u'this is the vanilla case, call merge directories', |
2193 | + 'ACTION_FUNC': u'DESPAIR', |
2194 | + 'COMMENTS': u'we dont download directories anymore', |
2195 | 'PARAMETERS': {u'hash_eq_local_hash': u'!NA', |
2196 | u'hash_eq_server_hash': u'T', |
2197 | u'not_authorized': u'NA', |
2198 | @@ -730,8 +730,8 @@ |
2199 | u'has_metadata': u'=', |
2200 | u'is_directory': u'='}}, |
2201 | {'ACTION': u'pass', |
2202 | - 'ACTION_FUNC': u'nothing', |
2203 | - 'COMMENTS': u'download we are not interested in', |
2204 | + 'ACTION_FUNC': u'DESPAIR', |
2205 | + 'COMMENTS': u'we dont download directories anymore', |
2206 | 'PARAMETERS': {u'hash_eq_local_hash': u'!NA', |
2207 | u'hash_eq_server_hash': u'F', |
2208 | u'not_authorized': u'NA', |
2209 | @@ -2485,8 +2485,8 @@ |
2210 | u'has_metadata': u'T', |
2211 | u'is_directory': u'='}}, |
2212 | {'ACTION': u'pass', |
2213 | - 'ACTION_FUNC': u'nothing', |
2214 | - 'COMMENTS': u'no news is good news', |
2215 | + 'ACTION_FUNC': u'DESPAIR', |
2216 | + 'COMMENTS': u'cant set hash on directories', |
2217 | 'PARAMETERS': {u'hash_eq_local_hash': u'!NA', |
2218 | u'hash_eq_server_hash': u'T', |
2219 | u'not_authorized': u'NA', |
2220 | @@ -2498,8 +2498,8 @@ |
2221 | u'has_metadata': u'=', |
2222 | u'is_directory': u'='}}, |
2223 | {'ACTION': u'md.set(uuid, server_hash=hash)\npartial = md.create_partial(uuid)\naq.getcontent(*partial)', |
2224 | - 'ACTION_FUNC': u'get_dir', |
2225 | - 'COMMENTS': u'normal case', |
2226 | + 'ACTION_FUNC': u'DESPAIR', |
2227 | + 'COMMENTS': u'cant set hash on directories', |
2228 | 'PARAMETERS': {u'hash_eq_local_hash': u'!NA', |
2229 | u'hash_eq_server_hash': u'F', |
2230 | u'not_authorized': u'NA', |
2231 | @@ -2511,8 +2511,8 @@ |
2232 | u'has_metadata': u'=', |
2233 | u'is_directory': u'='}}, |
2234 | {'ACTION': u'aq.cancel_download(uuid) \nmd.set(uuid, server_hash=hash)', |
2235 | - 'ACTION_FUNC': u'nothing', |
2236 | - 'COMMENTS': u'A download for a content object with the same hash is already in progress', |
2237 | + 'ACTION_FUNC': u'DESPAIR', |
2238 | + 'COMMENTS': u'cant set hash on directories', |
2239 | 'PARAMETERS': {u'hash_eq_local_hash': u'!NA', |
2240 | u'hash_eq_server_hash': u'T', |
2241 | u'not_authorized': u'NA', |
2242 | @@ -2524,8 +2524,8 @@ |
2243 | u'has_metadata': u'=', |
2244 | u'is_directory': u'='}}, |
2245 | {'ACTION': u'aq.cancel_download(uuid)\nmd.set(uuid, server_hash=hash)\npartial = md.get_partial(uuid)\naq.getcontent(*partial)', |
2246 | - 'ACTION_FUNC': u'reget_dir', |
2247 | - 'COMMENTS': u'a download was in progress but the server changed again. Note that this makes it important for AQ_DOWNLOAD_FINISHED to check the server hash.', |
2248 | + 'ACTION_FUNC': u'DESPAIR', |
2249 | + 'COMMENTS': u'cant set hash on directories', |
2250 | 'PARAMETERS': {u'hash_eq_local_hash': u'!NA', |
2251 | u'hash_eq_server_hash': u'F', |
2252 | u'not_authorized': u'NA', |
2253 | |
2254 | === modified file 'ubuntuone/syncdaemon/volume_manager.py' |
2255 | --- ubuntuone/syncdaemon/volume_manager.py 2010-07-19 16:03:28 +0000 |
2256 | +++ ubuntuone/syncdaemon/volume_manager.py 2010-07-22 13:39:48 +0000 |
2257 | @@ -110,6 +110,10 @@ |
2258 | self.node_id == other.node_id) |
2259 | return result |
2260 | |
2261 | + def __repr__(self): |
2262 | + return "<Volume id %r, node_id %r, generation %r>" % (self.volume_id, |
2263 | + self.node_id, |
2264 | + self.generation) |
2265 | |
2266 | class Share(Volume): |
2267 | """A volume representing a Share.""" |
2268 | @@ -251,7 +255,7 @@ |
2269 | self.subscribed = subscribed |
2270 | |
2271 | def __repr__(self): |
2272 | - return "<UDF id %r, real path %r>" % (self.id, self.path) |
2273 | + return "<UDF id %r, generation %r, real path %r>" % (self.id, self.generation, self.path) |
2274 | |
2275 | @property |
2276 | def ancestors(self): |
2277 | @@ -393,15 +397,6 @@ |
2278 | self.m.fs.create(path=root.path, |
2279 | share_id=root.volume_id, is_dir=True) |
2280 | |
2281 | - def handle_SYS_ROOT_RECEIVED(self, root_id): |
2282 | - """Got the root, map it to the root share.""" |
2283 | - self.log.debug('handle_SYS_ROOT_RECEIVED(%s)', root_id) |
2284 | - self._got_root(root_id) |
2285 | - self.m.action_q.inquire_account_info() |
2286 | - self.m.action_q.inquire_free_space(request.ROOT) |
2287 | - self.refresh_volumes() |
2288 | - self.refresh_shares() |
2289 | - |
2290 | def _got_root(self, node_id, free_bytes=None): |
2291 | """Set the root node_id to the root share and mdobj.""" |
2292 | # only set the root if we don't have it |
2293 | @@ -414,8 +409,12 @@ |
2294 | self.m.fs.set_node_id(root.path, node_id) |
2295 | root.node_id = node_id |
2296 | self.shares[request.ROOT] = root |
2297 | + self.m.event_q.push('SYS_ROOT_RECEIVED', root.node_id) |
2298 | elif root.node_id != node_id: |
2299 | self.m.event_q.push('SYS_ROOT_MISMATCH', root.node_id, node_id) |
2300 | + else: |
2301 | + # the root node_id match and we already have it |
2302 | + self.m.event_q.push('SYS_ROOT_RECEIVED', root.node_id) |
2303 | |
2304 | def refresh_shares(self): |
2305 | """Request the list of shares to the server.""" |
2306 | @@ -439,6 +438,43 @@ |
2307 | udfs.append(vol.volume_id) |
2308 | self._cleanup_volumes(shares=shares, udfs=udfs) |
2309 | |
2310 | + def handle_SV_VOLUME_NEW_GENERATION(self, volume_id, generation): |
2311 | + """Handle SV_VOLUME_NEW_GENERATION.""" |
2312 | + self.log.debug('handle_SV_VOLUME_NEW_GENERATION(%r, %r)', |
2313 | + volume_id, generation) |
2314 | + volume_id = str(volume_id) # be safe and use a str |
2315 | + try: |
2316 | + volume = self.get_volume(volume_id) |
2317 | + except VolumeDoesNotExist: |
2318 | + self.log.warning('Got a SV_VOLUME_NEW_GENERATION for a missing ' |
2319 | + 'volume: %r with %r', volume_id, generation) |
2320 | + else: |
2321 | + current_gen = volume.generation |
2322 | + if current_gen is None: |
2323 | + self.m.action_q.rescan_from_scratch(volume_id) |
2324 | + elif current_gen < generation: |
2325 | + # XXX: check if we want to impose a hard limit in the size of |
2326 | + # the delta and do a rescan from scratch if it's too big |
2327 | + self.m.action_q.get_delta(volume_id, current_gen) |
2328 | + # TODO/XXX: should we (VM) handle AQ_DELTA_ERROR? |
2329 | + elif current_gen >= generation: |
2330 | + self.log.info('Got SV_VOLUME_NEW_GENERATION(%r, %r) but volume' |
2331 | + ' is at generation: %r', |
2332 | + volume_id, generation, current_gen) |
2333 | + |
2334 | + def handle_AQ_DELTA_NOT_POSSIBLE(self, volume_id): |
2335 | + """Handle AQ_DELTA_NOT_POSSIBLE.""" |
2336 | + self.log.debug('handle_AQ_DELTA_NOT_POSSIBLE(%r)', volume_id) |
2337 | + volume_id = str(volume_id) |
2338 | + try: |
2339 | + volume = self.get_volume(volume_id) |
2340 | + except VolumeDoesNotExist: |
2341 | + self.log.warning('Got a AQ_DELTA_NOT_POSSIBLE for a missing ' |
2342 | + 'volume: %r', volume_id) |
2343 | + else: |
2344 | + self.log.info('Requesting a rescan from scratch for: %s', volume) |
2345 | + self.m.action_q.rescan_from_scratch(volume_id) |
2346 | + |
2347 | def server_rescan(self): |
2348 | """Do the 'server rescan'""" |
2349 | d = self.m.action_q.query_volumes() |
2350 | @@ -453,7 +489,7 @@ |
2351 | |
2352 | def _volumes_rescan_cb(self, volumes): |
2353 | """Handle the volumes list for server rescan""" |
2354 | - self.log.debug('handling volumes rescan') |
2355 | + self.log.debug('handling volumes rescan: %r', volumes) |
2356 | events = [] |
2357 | for new_volume in volumes: |
2358 | # TODO: all this block might need to be inside a try:except |
2359 | @@ -465,14 +501,13 @@ |
2360 | # A new volume! |
2361 | self.log.debug('New volume! id: %r', volume_id) |
2362 | volume = self._handle_new_volume(new_volume) |
2363 | - # we don't have the volume, use the initial generation |
2364 | - current_generation = 0 |
2365 | - else: |
2366 | - current_generation = volume.generation or 0 # None case |
2367 | + finally: |
2368 | + current_generation = volume.generation |
2369 | new_generation = new_volume.generation |
2370 | - # update the free_bytes on the volume |
2371 | - self.update_free_space(volume_id, new_volume.free_bytes) |
2372 | - if current_generation < new_generation: |
2373 | + self.log.debug('%s current gen: %s vs new gen: %s', volume, |
2374 | + current_generation, new_generation) |
2375 | + if current_generation is None or \ |
2376 | + current_generation < new_generation: |
2377 | # add the event |
2378 | events.append(('SV_VOLUME_NEW_GENERATION', |
2379 | dict(volume_id=volume_id, |
2380 | @@ -668,31 +703,36 @@ |
2381 | self.shared[share_id] = share |
2382 | |
2383 | def handle_AQ_ANSWER_SHARE_OK(self, share_id, answer): |
2384 | - """ Handle successfully accepting a share """ |
2385 | + """Handle successfully accepting a share.""" |
2386 | if answer == 'Yes': |
2387 | share = self.shares[share_id] |
2388 | + share.accepted = True |
2389 | + self.shares[share.volume_id] = share |
2390 | self._create_fsm_object(share.path, share.volume_id, share.node_id) |
2391 | self._create_share_dir(share) |
2392 | - self.m.action_q.query([(share.volume_id, str(share.node_id), "")]) |
2393 | - self.m.action_q.inquire_free_space(share.volume_id) |
2394 | + # request the delta from the last known generation (should be None) |
2395 | + self.m.action_q.get_delta(share.volume_id, share.generation) |
2396 | |
2397 | def add_share(self, a_share): |
2398 | """ Add a share to the share list, and creates the fs mdobj. """ |
2399 | self.log.info('Adding new share with id: %s - path: %r', |
2400 | a_share.volume_id, a_share.path) |
2401 | - # if the share is there, do nothing and return it |
2402 | share = self.shares.get(a_share.volume_id) |
2403 | is_new_share = share is None |
2404 | if share is not None: |
2405 | share.accepted = a_share.accepted |
2406 | self.shares[share.volume_id] = share |
2407 | else: |
2408 | - share = self.shares[share.volume_id] = a_share |
2409 | + share = a_share |
2410 | + if is_new_share or not share.accepted: |
2411 | + # if it's a new share or isn't accepted set the generation to None |
2412 | + # to force a rescan |
2413 | + share.generation = None |
2414 | + # store the share |
2415 | + self.shares[share.volume_id] = share |
2416 | if share.accepted: |
2417 | self._create_fsm_object(share.path, share.volume_id, share.node_id) |
2418 | self._create_share_dir(share) |
2419 | - self.m.action_q.query([(share.volume_id, str(share.node_id), "")]) |
2420 | - self.m.action_q.inquire_free_space(share.volume_id) |
2421 | if is_new_share: |
2422 | # push the event only if it's a new share |
2423 | self.m.event_q.push('VM_SHARE_CREATED', share.volume_id) |
2424 | @@ -852,12 +892,15 @@ |
2425 | """Add the udf to the VM metadata if isn't there. |
2426 | |
2427 | If it's a new udf, create the directory, hook inotify |
2428 | - and execute a query. |
2429 | + and request the full delta. |
2430 | |
2431 | """ |
2432 | self.log.debug('add_udf: %s', udf) |
2433 | if self.udfs.get(udf.volume_id, None) is None: |
2434 | self.log.debug('udf not in metadata, adding it!') |
2435 | + # if it's a new UDF set the generation to None to force a |
2436 | + # rescan |
2437 | + udf.generation = None |
2438 | self.udfs[udf.volume_id] = udf |
2439 | self._create_fsm_object(udf.path, udf.volume_id, udf.node_id) |
2440 | # local and server rescan, this will add the inotify hooks |
2441 | @@ -1063,10 +1106,8 @@ |
2442 | mdobj = self.m.fs.get_by_path(udf.path) |
2443 | d = self.m.lr.scan_dir(mdobj.mdid, udf.path, udfmode=True) |
2444 | def server_rescan(_): |
2445 | - """Do a query over all known nodes.""" |
2446 | - data = self.m.fs.get_for_server_rescan_by_path(udf.path) |
2447 | - self.m.action_q.query(data) |
2448 | - self.m.action_q.inquire_free_space(request.ROOT) |
2449 | + """Request the delta from the last known generation.""" |
2450 | + self.m.action_q.get_delta(udf.id, udf.generation) |
2451 | d.addCallback(server_rescan) |
2452 | return d |
2453 | |
2454 | @@ -1140,6 +1181,7 @@ |
2455 | |
2456 | def update_generation(self, volume_id, generation): |
2457 | """Update the generation of the specified volume.""" |
2458 | + self.log.debug('update_generation: %r, %r', volume_id, generation) |
2459 | vol = self.get_volume(volume_id) |
2460 | vol.generation = generation |
2461 | if isinstance(vol, (Share, Root)): |
this is correct.