Merge lp:~ubuntuone-control-tower/ubuntuone-client/generations into lp:ubuntuone-client

Proposed by Facundo Batista
Status: Merged
Approved by: John Lenton
Approved revision: 591
Merged at revision: 590
Proposed branch: lp:~ubuntuone-control-tower/ubuntuone-client/generations
Merge into: lp:ubuntuone-client
Diff against target: 2461 lines (+906/-577)
11 files modified
contrib/testing/testcase.py (+8/-6)
tests/syncdaemon/test_action_queue.py (+16/-189)
tests/syncdaemon/test_dbus.py (+2/-2)
tests/syncdaemon/test_sync.py (+323/-18)
tests/syncdaemon/test_vm.py (+310/-77)
ubuntuone/syncdaemon/action_queue.py (+31/-77)
ubuntuone/syncdaemon/event_queue.py (+2/-7)
ubuntuone/syncdaemon/main.py (+1/-3)
ubuntuone/syncdaemon/sync.py (+127/-154)
ubuntuone/syncdaemon/u1fsfsm.py (+14/-14)
ubuntuone/syncdaemon/volume_manager.py (+72/-30)
To merge this branch: bzr merge lp:~ubuntuone-control-tower/ubuntuone-client/generations
Reviewer Review Type Date Requested Status
John Lenton (community) Approve
Lucio Torre (community) Approve
Review via email: mp+30651@code.launchpad.net

Commit message

Merge a lot of 'generations' changes into trunk.

Description of the change

Merge a lot of 'generations' changes into trunk.

This branch is *big*, but note we've been reviewing and testing the different branches when merged into it.

To post a comment you must log in.
Revision history for this message
Lucio Torre (lucio.torre) wrote :

this is correct.

review: Approve
Revision history for this message
John Lenton (chipaca) wrote :

This breaks trunk, and that's OK.
<cue lumberjack sketch>

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'contrib/testing/testcase.py'
2--- contrib/testing/testcase.py 2010-07-07 18:51:07 +0000
3+++ contrib/testing/testcase.py 2010-07-22 13:39:48 +0000
4@@ -201,8 +201,13 @@
5 """Stub implementation."""
6
7 def query_volumes(self):
8- """Stub implementation"""
9-
10+ """Stub implementation."""
11+
12+ def get_delta(self, volume_id, generation):
13+ """Stub implementation."""
14+
15+ def rescan_from_scratch(self, volume_id):
16+ """Stub implementation."""
17
18 class FakeMain(main.Main):
19 """ A fake Main class to setup the tests """
20@@ -558,9 +563,6 @@
21 """Add udf to the udfs dict."""
22 self.udfs[udf.id] = udf
23
24- def handle_SYS_ROOT_RECEIVED(self, root_id):
25- """Do nothing."""
26-
27 def share_deleted(self, _):
28 """Do nothing."""
29
30@@ -683,7 +685,7 @@
31 class DummyClass(object):
32 """Dummy class, does nothing."""
33
34- def __getattribute__(self, name):
35+ def __getattr__(self, name):
36 """Any attribute is a no-op."""
37 return lambda *args, **kwargs: None
38
39
40=== modified file 'tests/syncdaemon/test_action_queue.py'
41--- tests/syncdaemon/test_action_queue.py 2010-07-19 20:15:06 +0000
42+++ tests/syncdaemon/test_action_queue.py 2010-07-22 13:39:48 +0000
43@@ -43,18 +43,17 @@
44 from contrib.mocker import Mocker
45
46 from ubuntuone.storageprotocol import (
47- client, errors, protocol_pb2, volumes, request
48+ client, errors, protocol_pb2, request
49 )
50 from ubuntuone.syncdaemon import states
51 from ubuntuone.syncdaemon.action_queue import (
52 ActionQueue, ActionQueueCommand, ChangePublicAccess, CreateUDF,
53 DeleteVolume, Download, GetContentMixin, ListDir, ListVolumes,
54 NoisyRequestQueue, RequestQueue, UploadProgressWrapper, Upload,
55- CreateShare, GetPublicFiles, VolumesQuery, GetDelta,
56+ CreateShare, GetPublicFiles, GetDelta,
57 TRANSFER_PROGRESS_THRESHOLD, Unlink, Move, MakeFile, MakeDir,
58 )
59 from ubuntuone.syncdaemon.event_queue import EventQueue, EVENTS
60-from ubuntuone.syncdaemon.volume_manager import UDF
61
62
63 PATH = u'~/Documents/pdfs/moño/'
64@@ -789,8 +788,6 @@
65 # callbacks are connected
66 # pylint: disable-msg=W0212
67 aq = self.action_queue
68- self.assertEqual(aq.client._node_state_callback,
69- aq._node_state_callback)
70 self.assertEqual(aq.client._share_change_callback,
71 aq._share_change_callback)
72 self.assertEqual(aq.client._share_answer_callback,
73@@ -1279,60 +1276,6 @@
74 self.assertTrue(res is None)
75
76
77-class VolumesQueryTestCase(ConnectedBaseTestCase):
78- """Test for VolumesQuery ActionQueueCommand."""
79-
80- def setUp(self):
81- """Init."""
82- res = super(VolumesQueryTestCase, self).setUp()
83-
84- request_queue = RequestQueue(name='fu', action_queue=self.action_queue)
85- self.command = VolumesQuery(request_queue)
86-
87- return res
88-
89- def test_is_action_queue_command(self):
90- """Test proper inheritance."""
91- self.assertTrue(isinstance(self.command, ActionQueueCommand))
92- self.assertTrue(isinstance(self.command, ListVolumes))
93-
94- def test_run_returns_a_deferred(self):
95- """Test a deferred is returned."""
96- res = self.command._run()
97- self.assertTrue(isinstance(res, defer.Deferred), 'deferred returned')
98-
99- def test_run_calls_protocol(self):
100- """Test protocol's list_volumes is called."""
101- original = self.command.action_queue.client.list_volumes
102- self.called = False
103-
104- def check():
105- """Take control over client's feature."""
106- self.called = True
107-
108- self.command.action_queue.client.list_volumes = check
109-
110- self.command._run()
111-
112- self.assertTrue(self.called, 'command was called')
113-
114- self.command.action_queue.client.list_volumes = original
115-
116- def test_handle_success(self):
117- """Test AQ_LIST_VOLUMES is pushed on success."""
118- request = client.ListVolumes(self.action_queue.client)
119- request.volumes = [FakedVolume(), FakedVolume()]
120- res = self.command.handle_success(success=request)
121- self.assertEqual(request.volumes, res)
122-
123- def test_handle_failure(self):
124- """Test AQ_LIST_VOLUMES_ERROR is pushed on failure."""
125- msg = 'Something went wrong'
126- failure = Failure(DefaultException(msg))
127- res = self.command.handle_failure(failure=failure)
128- self.assertEqual(res, failure)
129-
130-
131 class DeleteVolumeTestCase(ConnectedBaseTestCase):
132 """Test for DeleteVolume ActionQueueCommand."""
133
134@@ -1419,52 +1362,6 @@
135
136 BasicTestCase.tearDown(self)
137
138- @defer.inlineCallbacks
139- def test_SV_HASH_NEW_is_pushed_for_subscrined_volume(self):
140- """SV_HASH_NEW is filtered when the volume is unsubscribed."""
141- udf_id = 'udf_id'
142- udf_volume = volumes.UDFVolume(udf_id, 'udf_node', None,
143- 10, u'~/ñoño')
144- path = self.vm._build_udf_path(udf_volume.suggested_path)
145- udf = UDF.from_udf_volume(udf_volume, path)
146- yield self.vm.add_udf(udf)
147- yield self.vm.subscribe_udf(udf_id)
148- assert self.vm.udfs[udf_id].subscribed
149- self.action_queue.event_queue.events = [] # reset events
150-
151- kwargs = dict(share_id=udf_id, node_id=NODE, hash=None)
152- self.action_queue._node_state_callback(**kwargs)
153- self.assertEqual([('SV_HASH_NEW', (), kwargs)],
154- self.action_queue.event_queue.events)
155-
156- @defer.inlineCallbacks
157- def test_SV_HASH_NEW_is_filtered_for_unsubscribed_volume(self):
158- """SV_HASH_NEW is filtered when the volume is unsubscribed."""
159- # build a VM and add it an UDF with subscribed to False
160- udf_id = 'udf_id'
161- udf_volume = volumes.UDFVolume(udf_id, 'udf_node', None, 10, u'~/ñoño')
162- path = self.vm._build_udf_path(udf_volume.suggested_path)
163- udf = UDF.from_udf_volume(udf_volume, path)
164- yield self.vm.add_udf(udf)
165- yield self.vm.unsubscribe_udf(udf_id)
166- assert not self.vm.udfs[udf_id].subscribed
167- self.action_queue.event_queue.events = [] # reset events
168-
169- self.action_queue._node_state_callback(share_id=udf_id,
170- node_id=None, hash=None)
171- self.assertEqual([], self.action_queue.event_queue.events)
172-
173- def test_SV_HASH_NEW_doesnt_fail_for_non_udf(self):
174- """SV_HASH_NEW keeps working like before for non-udfs."""
175- other_id = 'not in udfs'
176- assert other_id not in self.vm.udfs
177- self.action_queue.event_queue.events = [] # reset events
178-
179- kwargs = dict(share_id=other_id, node_id=NODE, hash=None)
180- self.action_queue._node_state_callback(**kwargs)
181- self.assertEqual([('SV_HASH_NEW', (), kwargs)],
182- self.action_queue.event_queue.events)
183-
184
185 class ChangePublicAccessTests(ConnectedBaseTestCase):
186 """Tests for the ChangePublicAccess ActionQueueCommand."""
187@@ -2222,19 +2119,23 @@
188 class SimpleAQTestCase(BasicTestCase):
189 """Simple tests for AQ API."""
190
191- def test_aq_server_rescan(self):
192- """Check the API of AQ.server_rescan."""
193+ def test_aq_query_volumes(self):
194+ """Check the API of AQ.query_volumes."""
195 self.main.start()
196 d = defer.Deferred()
197- def get_root(mdid):
198- """Fake get_root."""
199- d.callback(mdid)
200+ def list_volumes():
201+ """Fake list_volumes."""
202+ result = DummyClass()
203+ result.volumes = ['foo', 'bar']
204+ return defer.succeed(result)
205
206 self.action_queue.client = DummyClass()
207- self.action_queue.get_root = get_root
208- self.action_queue.server_rescan('foo', lambda: list())
209+ self.action_queue.client.list_volumes = list_volumes
210+ d = self.action_queue.query_volumes()
211 def check(result):
212- self.assertEqual('foo', result)
213+ self.assertIn('foo', result)
214+ self.assertIn('bar', result)
215+ return result
216 d.addCallback(check)
217 return d
218
219@@ -2783,81 +2684,6 @@
220 # assert internal deferred was fired
221 self.assertTrue(self.action_queue.deferred.called)
222
223- @defer.inlineCallbacks
224- def test_server_rescan_as_a_whole(self):
225- """Test error handling after server_rescan with no error."""
226-
227- def faked_get_root(marker):
228- """Fake the action_queue.get_root."""
229- root_id=object()
230- self.action_queue.event_queue.push('SYS_ROOT_RECEIVED',
231- root_id=root_id)
232- return root_id
233-
234- self.patch(self.action_queue, 'get_root', faked_get_root)
235-
236- self.action_queue.client.query = \
237- self.succeed_please(result=self.action_queue.client)
238- yield self.action_queue.server_rescan(root_mdid=object(),
239- data_gen=list)
240- event = ('SYS_SERVER_RESCAN_DONE', (), {})
241- self.assertEqual(event, self.action_queue.event_queue.events[-1])
242-
243- # assert internal deferred wasn't fired
244- self.assertFalse(self.action_queue.deferred.called)
245-
246- @defer.inlineCallbacks
247- def test_server_rescan_when_get_root_fails(self):
248- """Test error handling after server_rescan when get_root fails."""
249-
250- msg = protocol_pb2.Message()
251- msg.type = protocol_pb2.Message.ERROR
252- msg.error.type = protocol_pb2.Error.PROTOCOL_ERROR
253- msg.error.comment = 'get_root failed'
254- exc = errors.StorageRequestError(request=None, message=msg)
255- self.patch(self.action_queue, 'get_root', self.fail_please(exc))
256-
257- self.action_queue.client.query = self.fail_please(
258- NotImplementedError())
259-
260- yield self.action_queue.server_rescan(root_mdid=object(),
261- data_gen=list)
262-
263- event = ('SYS_SERVER_RESCAN_DONE', (), {})
264- self.assertNotIn(event, self.action_queue.event_queue.events)
265-
266- event = ('SYS_SERVER_ERROR', (), {'error': str(exc)})
267- self.assertEqual(event, self.action_queue.event_queue.events[-1])
268-
269- # assert internal deferred wasn't fired
270- self.assertFalse(self.action_queue.deferred.called)
271-
272- @defer.inlineCallbacks
273- def test_server_rescan_when_query_fails(self):
274- """Test error handling after server_rescan when query fails."""
275-
276- self.patch(self.action_queue, 'get_root',
277- self.succeed_please(result=object()))
278-
279- msg = protocol_pb2.Message()
280- msg.type = protocol_pb2.Message.ERROR
281- msg.error.type = protocol_pb2.Error.PROTOCOL_ERROR
282- msg.error.comment = 'query failed'
283- exc = errors.StorageRequestError(request=None, message=msg)
284- self.action_queue.client.query = self.fail_please(exc)
285-
286- yield self.action_queue.server_rescan(root_mdid=object(),
287- data_gen=list)
288-
289- event = ('SYS_SERVER_RESCAN_DONE', (), {})
290- self.assertNotIn(event, self.action_queue.event_queue.events)
291-
292- event = ('SYS_SERVER_ERROR', (), {'error': str(exc)})
293- self.assertEqual(event, self.action_queue.event_queue.events[-1])
294-
295- # assert internal deferred wasn't fired
296- self.assertFalse(self.action_queue.deferred.called)
297-
298
299 class GetDeltaTestCase(ConnectedBaseTestCase):
300 """Test for GetDelta ActionQueueCommand."""
301@@ -2912,7 +2738,8 @@
302
303 # check for successful event
304 received = self.action_queue.event_queue.events[0]
305- delta_info = dict(delta_content=['foo', 'bar'], end_generation=76,
306+ delta_info = dict(volume_id=VOLUME, delta_content=['foo', 'bar'],
307+ end_generation=76,
308 full=True, free_bytes=1231234)
309 self.assertEqual(received, ('AQ_DELTA_OK', (), delta_info))
310
311
312=== modified file 'tests/syncdaemon/test_dbus.py'
313--- tests/syncdaemon/test_dbus.py 2010-07-08 21:52:23 +0000
314+++ tests/syncdaemon/test_dbus.py 2010-07-22 13:39:48 +0000
315@@ -1508,8 +1508,8 @@
316 self.assertEquals('root_id', root_id)
317 self.assertEquals('another_root_id', new_root_id)
318 d.addCallback(check)
319- self.event_q.push('SYS_ROOT_RECEIVED', 'root_id')
320- self.event_q.push('SYS_ROOT_RECEIVED', 'another_root_id')
321+ self.main.vm._got_root('root_id')
322+ self.main.vm._got_root('another_root_id')
323 return d
324
325 def test_public_files_list(self):
326
327=== modified file 'tests/syncdaemon/test_sync.py'
328--- tests/syncdaemon/test_sync.py 2010-07-20 16:17:58 +0000
329+++ tests/syncdaemon/test_sync.py 2010-07-22 13:39:48 +0000
330@@ -19,6 +19,7 @@
331
332 from __future__ import with_statement
333
334+import copy
335 import logging
336 import os
337 import shutil
338@@ -43,6 +44,8 @@
339 from ubuntuone.syncdaemon.volume_manager import Share
340 from ubuntuone.syncdaemon.event_queue import EventQueue
341 from ubuntuone.storageprotocol.request import ROOT
342+from ubuntuone.storageprotocol import delta
343+from ubuntuone.syncdaemon.marker import MDMarker
344
345 DBusInterface.test = True
346
347@@ -312,7 +315,7 @@
348
349 # send the event and check
350 mdobj = fsm.get_by_mdid(mdid)
351- sync.handle_SV_HASH_NEW(mdobj.share_id, mdobj.node_id, '') # no content
352+ sync._handle_SV_HASH_NEW(mdobj.share_id, mdobj.node_id, '') # no content
353 self.assertTrue(self.called)
354
355
356@@ -464,81 +467,81 @@
357 def test_handle_AQ_UNLINK_OK(self):
358 """Test that AQ_UNLINK_OK calls the generation handler."""
359 called = []
360- self.patch(SyncStateMachineRunner, 'check_new_volume_generation',
361+ self.patch(SyncStateMachineRunner, 'update_generation',
362 lambda s, *a: called.append(a))
363
364 d = dict(share_id='volume_id', node_id='node_id', parent_id='parent',
365 new_generation=77)
366 self.sync.handle_AQ_UNLINK_OK(**d)
367- self.assertEqual(called, [('volume_id', 77)])
368+ self.assertEqual(called, [('volume_id', "node_id", 77)])
369
370 def test_handle_AQ_MOVE_OK(self):
371 """Test that AQ_MOVE_OK calls the generation handler."""
372 called = []
373- self.patch(SyncStateMachineRunner, 'check_new_volume_generation',
374+ self.patch(SyncStateMachineRunner, 'update_generation',
375 lambda s, *a: called.append(a))
376
377 d = dict(share_id='volume_id', node_id='node_id', new_generation=32)
378 self.sync.handle_AQ_MOVE_OK(**d)
379- self.assertEqual(called, [('volume_id', 32)])
380+ self.assertEqual(called, [('volume_id', "node_id", 32)])
381
382 def test_handle_AQ_UPLOAD_FINISHED(self):
383 """Test that AQ_UPLOAD_FINISHED calls the generation handler."""
384 called = []
385- self.patch(SyncStateMachineRunner, 'check_new_volume_generation',
386+ self.patch(SyncStateMachineRunner, 'update_generation',
387 lambda s, *a: called.append(a))
388
389 d = dict(share_id='volume_id', node_id='node_id',
390 hash='hash', new_generation=15)
391 self.sync.handle_AQ_UPLOAD_FINISHED(**d)
392- self.assertEqual(called, [('volume_id', 15)])
393+ self.assertEqual(called, [('volume_id', "node_id", 15)])
394
395 def test_handle_AQ_FILE_NEW_OK(self):
396 """Test that AQ_FILE_NEW_OK calls the generation handler."""
397 called = []
398- self.patch(SyncStateMachineRunner, 'check_new_volume_generation',
399+ self.patch(SyncStateMachineRunner, 'update_generation',
400 lambda s, *a: called.append(a))
401
402 d = dict(marker='mdid', new_id='new_id', new_generation=12,
403 volume_id=ROOT)
404 self.sync.handle_AQ_FILE_NEW_OK(**d)
405- self.assertEqual(called, [(ROOT, 12)])
406+ self.assertEqual(called, [(ROOT, "new_id", 12)])
407
408 def test_handle_AQ_DIR_NEW_OK(self):
409 """Test that AQ_DIR_NEW_OK calls the generation handler."""
410 called = []
411- self.patch(SyncStateMachineRunner, 'check_new_volume_generation',
412+ self.patch(SyncStateMachineRunner, 'update_generation',
413 lambda s, *a: called.append(a))
414
415 d = dict(marker='mdid', new_id='new_id', new_generation=17,
416 volume_id=ROOT)
417 self.sync.handle_AQ_DIR_NEW_OK(**d)
418- self.assertEqual(called, [(ROOT, 17)])
419+ self.assertEqual(called, [(ROOT, "new_id", 17)])
420
421 def test_checknewvol_no_volume(self):
422 """Log warning if volume does not exist."""
423 not_existant_vol = str(uuid.uuid4())
424- self.ssmr.check_new_volume_generation(not_existant_vol, 77)
425+ self.ssmr.update_generation(not_existant_vol, "node_id", 77)
426 self.assertTrue(self.handler.check_warning('Volume not found'))
427
428 def test_checknewvol_smaller_gen(self):
429 """Only log debug if new generation smaller than current."""
430 self.vm.update_generation(ROOT, 15)
431- self.ssmr.check_new_volume_generation(ROOT, 14)
432+ self.ssmr.update_generation(ROOT, "node_id", 14)
433 self.assertTrue(self.handler.check_info(
434 'Got smaller or equal generation'))
435
436 def test_checknewvol_same_gen(self):
437 """Only log debug if new generation equal than current."""
438 self.vm.update_generation(ROOT, 15)
439- self.ssmr.check_new_volume_generation(ROOT, 15)
440+ self.ssmr.update_generation(ROOT, "node_id", 15)
441 self.assertTrue(self.handler.check_info(
442 'Got smaller or equal generation'))
443
444 def test_checknewvol_gen_current_plus_one(self):
445 """Set new volume generation if current plus one."""
446 self.vm.update_generation(ROOT, 15)
447- self.ssmr.check_new_volume_generation(ROOT, 16)
448+ self.ssmr.update_generation(ROOT, "node_id", 16)
449 self.assertEqual(self.vm.get_volume(ROOT).generation, 16)
450 self.assertTrue(self.handler.check_info('Updating current generation'))
451
452@@ -550,7 +553,7 @@
453 self.vm.update_generation(ROOT, 15)
454
455 # call the method
456- self.ssmr.check_new_volume_generation(ROOT, 17)
457+ self.ssmr.update_generation(ROOT, "node_id", 17)
458
459 # check that generation didn't change, we asked for delta, and logged
460 self.assertEqual(self.vm.get_volume(ROOT).generation, 15)
461@@ -560,13 +563,315 @@
462 def test_checknewvol_new_gen_is_None(self):
463 """Log warning if volume does not exist."""
464 self.vm.update_generation(ROOT, 1)
465- self.ssmr.check_new_volume_generation(ROOT, None)
466+ self.ssmr.update_generation(ROOT, "node_id", None)
467 self.assertTrue(self.handler.check_debug(
468 'Client not ready for generations'))
469
470 def test_checknewvol_volume_gen_is_None(self):
471 """Log warning if volume does not exist."""
472 assert self.vm.get_volume(ROOT).generation is None
473- self.ssmr.check_new_volume_generation(ROOT, 15)
474+ self.ssmr.update_generation(ROOT, "node_id", 15)
475 self.assertTrue(self.handler.check_debug(
476 'Client not ready for generations'))
477+
478+ def test_check_generation_on_node_set(self):
479+ """Check that we update the generation of the node."""
480+ # create the fake file
481+ self.main.vm._got_root("parent_id")
482+ self.sync._handle_SV_FILE_NEW(ROOT, "node_id", "parent_id", "file")
483+
484+ # update generation
485+ self.ssmr.update_generation(ROOT, "node_id", 15)
486+
487+ # test
488+ node = self.main.fs.get_by_node_id(ROOT, "node_id")
489+ self.assertEqual(node.generation, 15)
490+
491+ def test_check_generation_on_node_set_wont_fail(self):
492+ """Check that if there is no node we dont fail."""
493+ # update generation
494+ self.ssmr.update_generation(ROOT, "node_id", 15)
495+
496+ def test_save_generation_after_seting_node_id(self):
497+ """Test that we call update_generation after the ssmr handler."""
498+ root_id = uuid.uuid4()
499+ self.main.vm._got_root(root_id)
500+ mdobj = self.main.fs.get_by_node_id(ROOT, root_id)
501+ path = os.path.join(
502+ self.main.fs.get_abspath(ROOT, mdobj.path), "file")
503+ self.main.fs.create(path=path, share_id=ROOT, is_dir=False)
504+ node = self.main.fs.get_by_path(path)
505+ d = dict(marker=MDMarker(node.mdid),
506+ new_id='new_id', new_generation=12,
507+ volume_id=ROOT)
508+ self.sync.handle_AQ_FILE_NEW_OK(**d)
509+
510+ # test
511+ node = self.main.fs.get_by_node_id(ROOT, "new_id")
512+ self.assertEqual(node.generation, 12)
513+
514+class TestHandleAqDeltaOk(BaseSync):
515+ """Sync.handle_AQ_DELTA_OK handles the recepcion of a new delta and applies
516+ all the changes that came from it."""
517+
518+ def setUp(self):
519+ """Do the setUp."""
520+
521+ super(TestHandleAqDeltaOk, self).setUp()
522+ self.sync = Sync(main=self.main)
523+ root_id = uuid.uuid4()
524+ self.main.vm._got_root(root_id)
525+
526+ self.filetxtdelta = delta.FileInfoDelta(
527+ generation=5, is_live=True, file_type=delta.FILE,
528+ parent_id=root_id, share_id=ROOT, node_id=uuid.uuid4(),
529+ name="file.txt", is_public=False, content_hash="hash",
530+ crc32=1, size=10, last_modified=0)
531+
532+ self.dirdelta = delta.FileInfoDelta(
533+ generation=6, is_live=True, file_type=delta.DIRECTORY,
534+ parent_id=root_id, share_id=ROOT, node_id=uuid.uuid4(),
535+ name="directory", is_public=False, content_hash="hash",
536+ crc32=1, size=10, last_modified=0)
537+
538+ def create_filetxt(self):
539+ """Create a file based on self.filetxtdelta."""
540+
541+ dt = self.filetxtdelta
542+ mdobj = self.main.fs.get_by_node_id(dt.share_id, dt.parent_id)
543+ path = os.path.join(
544+ self.main.fs.get_abspath(dt.share_id, mdobj.path), dt.name)
545+ self.main.fs.create(
546+ path=path, share_id=dt.share_id, node_id=dt.node_id,
547+ is_dir=False)
548+ node = self.main.fs.get_by_node_id(dt.share_id, dt.node_id)
549+ self.main.fs.set_by_mdid(node.mdid, generation=dt.generation)
550+
551+ def create_dir(self):
552+ """Create a directory based on self.dirdelta."""
553+
554+ dt = self.dirdelta
555+ mdobj = self.main.fs.get_by_node_id(dt.share_id, dt.parent_id)
556+ path = os.path.join(
557+ self.main.fs.get_abspath(dt.share_id, mdobj.path), dt.name)
558+ self.main.fs.create(
559+ path=path, share_id=dt.share_id, node_id=dt.node_id,
560+ is_dir=True)
561+ node = self.main.fs.get_by_node_id(dt.share_id, dt.node_id)
562+ self.main.fs.set_by_mdid(node.mdid, generation=dt.generation)
563+
564+ def test_not_full(self):
565+ """If we dont have a full delta, we need to ask for another one."""
566+ sync = Sync(main=self.main)
567+ called = []
568+ self.main.action_q.get_delta = lambda *a: called.append(a)
569+
570+ kwargs = dict(volume_id=ROOT, delta_content=[], end_generation=11,
571+ full=False, free_bytes=0)
572+ sync.handle_AQ_DELTA_OK(**kwargs)
573+
574+ self.assertEqual(called, [(ROOT, 11)])
575+
576+ def test_free_bytes_set(self):
577+ """The volume gets the free bytes set."""
578+ sync = Sync(main=self.main)
579+
580+ kwargs = dict(volume_id=ROOT, delta_content=[], end_generation=11,
581+ full=True, free_bytes=10)
582+ sync.handle_AQ_DELTA_OK(**kwargs)
583+
584+ self.assertEqual(self.main.vm.get_volume(ROOT).free_bytes, 10)
585+
586+ def test_end_generation_set(self):
587+ """The volume gets the end generation set."""
588+ sync = Sync(main=self.main)
589+
590+ kwargs = dict(volume_id=ROOT, delta_content=[], end_generation=11,
591+ full=True, free_bytes=10)
592+ sync.handle_AQ_DELTA_OK(**kwargs)
593+
594+ self.assertEqual(self.main.vm.get_volume(ROOT).generation, 11)
595+
596+ def test_node_generation_older_skip(self):
597+ """The node does not get the new generation set."""
598+ self.create_filetxt()
599+
600+ dt2 = copy.copy(self.filetxtdelta)
601+ dt2.generation = self.filetxtdelta.generation - 1
602+ kwargs = dict(volume_id=ROOT, delta_content=[dt2], end_generation=11,
603+ full=True, free_bytes=10)
604+ self.sync.handle_AQ_DELTA_OK(**kwargs)
605+
606+ node = self.main.fs.get_by_node_id(ROOT, self.filetxtdelta.node_id)
607+ self.assertEqual(node.generation, self.filetxtdelta.generation)
608+
609+ def test_new_file(self):
610+ """Make sure a live file in the delta is in fs after executed."""
611+ deltas = [ self.filetxtdelta ]
612+
613+ kwargs = dict(volume_id=ROOT, delta_content=deltas, end_generation=11,
614+ full=True, free_bytes=10)
615+ self.sync.handle_AQ_DELTA_OK(**kwargs)
616+
617+ # check that the file is created
618+ node = self.main.fs.get_by_node_id(ROOT, self.filetxtdelta.node_id)
619+ self.assertEqual(node.path, self.filetxtdelta.name)
620+ self.assertEqual(node.is_dir, False)
621+ self.assertEqual(node.generation, self.filetxtdelta.generation)
622+
623+ def test_existing_file_still_there(self):
624+ """A file will still exist after a delta arrives."""
625+ self.create_filetxt()
626+
627+ # send a new delta
628+ dt2 = copy.copy(self.filetxtdelta)
629+ dt2.generation = 8
630+ kwargs = dict(volume_id=ROOT, delta_content=[dt2], end_generation=11,
631+ full=True, free_bytes=10)
632+ self.sync.handle_AQ_DELTA_OK(**kwargs)
633+
634+ # check that the file is created
635+ node = self.main.fs.get_by_node_id(ROOT, self.filetxtdelta.node_id)
636+ self.assertEqual(node.generation, dt2.generation)
637+
638+ def test_existing_file_dead(self):
639+ """The handler for SV_FILE_DELETED is called"""
640+ # send a new delta
641+ dt2 = copy.copy(self.filetxtdelta)
642+ dt2.is_live = False
643+ kwargs = dict(volume_id=ROOT, delta_content=[dt2], end_generation=11,
644+ full=True, free_bytes=10)
645+ called = []
646+ self.sync._handle_SV_FILE_DELETED = \
647+ lambda *args, **kwargs: called.append((args, kwargs))
648+ self.sync.handle_AQ_DELTA_OK(**kwargs)
649+
650+ # check that the handler is created
651+ self.assertTrue(called)
652+
653+ def test_new_dir(self):
654+ """Make sure a live dir in the delta is in fs after executed."""
655+
656+ deltas = [ self.dirdelta ]
657+
658+ kwargs = dict(volume_id=ROOT, delta_content=deltas, end_generation=11,
659+ full=True, free_bytes=10)
660+ self.sync.handle_AQ_DELTA_OK(**kwargs)
661+
662+ # check that the dir is created
663+ node = self.main.fs.get_by_node_id(ROOT, self.dirdelta.node_id)
664+ self.assertEqual(node.path, self.dirdelta.name)
665+ self.assertEqual(node.is_dir, True)
666+ self.assertEqual(node.generation, self.dirdelta.generation)
667+
668+ def test_sv_hash_new_called_for_file(self):
669+ """The handler for SV_HASH_NEW is called"""
670+ self.create_filetxt()
671+
672+ # send a new delta
673+ dt2 = copy.copy(self.filetxtdelta)
674+ dt2.generation = self.filetxtdelta.generation + 1
675+ kwargs = dict(volume_id=ROOT, delta_content=[dt2], end_generation=11,
676+ full=True, free_bytes=10)
677+ called = []
678+ self.sync._handle_SV_HASH_NEW = \
679+ lambda *args, **kwargs: called.append((args, kwargs))
680+ self.sync.handle_AQ_DELTA_OK(**kwargs)
681+
682+ # check that the handler is created
683+ self.assertTrue(called)
684+
685+ def test_sv_hash_new_not_called_for_dir(self):
686+ """The handler for SV_HASH_NEW is not called"""
687+ self.create_dir()
688+
689+ # send a new delta
690+ dt2 = copy.copy(self.dirdelta)
691+ dt2.generation = self.dirdelta.generation + 1
692+ kwargs = dict(volume_id=ROOT, delta_content=[dt2], end_generation=11,
693+ full=True, free_bytes=10)
694+ called = []
695+ self.sync._handle_SV_HASH_NEW = \
696+ lambda *args, **kwargs: called.append((args, kwargs))
697+ self.sync.handle_AQ_DELTA_OK(**kwargs)
698+
699+ # check that the handler is created
700+ self.assertFalse(called)
701+
702+ def test_sv_moved_called(self):
703+ """The handler for SV_MOVED is called"""
704+ self.create_dir()
705+ self.create_filetxt()
706+
707+ # send a new delta
708+ dt2 = copy.copy(self.filetxtdelta)
709+ dt2.generation = self.dirdelta.generation + 1
710+ dt2.parent_id = self.dirdelta.node_id
711+ kwargs = dict(volume_id=ROOT, delta_content=[dt2], end_generation=11,
712+ full=True, free_bytes=10)
713+ called = []
714+ self.sync._handle_SV_MOVED = \
715+ lambda *args, **kwargs: called.append((args, kwargs))
716+ self.sync.handle_AQ_DELTA_OK(**kwargs)
717+
718+ # check that the handler is created
719+ self.assertTrue(called)
720+
721+ def test_sv_moved_called_name(self):
722+ """The handler for SV_MOVED is called"""
723+ self.create_dir()
724+ self.create_filetxt()
725+
726+ # send a new delta
727+ dt2 = copy.copy(self.filetxtdelta)
728+ dt2.generation = self.dirdelta.generation + 1
729+ dt2.name = "newname"
730+ kwargs = dict(volume_id=ROOT, delta_content=[dt2], end_generation=11,
731+ full=True, free_bytes=10)
732+ called = []
733+ self.sync._handle_SV_MOVED = \
734+ lambda *args, **kwargs: called.append((args, kwargs))
735+ self.sync.handle_AQ_DELTA_OK(**kwargs)
736+
737+ # check that the handler is created
738+ self.assertTrue(called)
739+
740+ def test_sv_moved_not_called(self):
741+ """The handler for SV_MOVED is not called"""
742+ self.create_dir()
743+ self.create_filetxt()
744+
745+ # send a new delta
746+ dt2 = copy.copy(self.filetxtdelta)
747+ dt2.generation = self.dirdelta.generation + 1
748+ kwargs = dict(volume_id=ROOT, delta_content=[dt2], end_generation=11,
749+ full=True, free_bytes=10)
750+ called = []
751+ self.sync._handle_SV_MOVED = \
752+ lambda *args, **kwargs: called.append((args, kwargs))
753+ self.sync.handle_AQ_DELTA_OK(**kwargs)
754+
755+ # check that the handler is created
756+ self.assertFalse(called)
757+
758+ def test_exception_logged(self):
759+ """We call self.logger.exception on error."""
760+ # send a new delta
761+ dt2 = copy.copy(self.filetxtdelta)
762+ dt2.is_live = False
763+ kwargs = dict(volume_id=ROOT, delta_content=[dt2], end_generation=11,
764+ full=True, free_bytes=10)
765+ self.sync._handle_SV_FILE_DELETED = \
766+ lambda *args, **kwargs: 1/0
767+ handler = testcase.MementoHandler()
768+ handler.setLevel(logging.ERROR)
769+ self.sync.logger.addHandler(handler)
770+
771+ self.sync.handle_AQ_DELTA_OK(**kwargs)
772+
773+ # check log
774+ self.assertEqual(len(handler.records), 1)
775+ log_msg = handler.records[0].message
776+ self.assertTrue("can't be applied." in log_msg)
777+ self.sync.logger.removeHandler(handler)
778+ self.handler.records = []
779
780=== modified file 'tests/syncdaemon/test_vm.py'
781--- tests/syncdaemon/test_vm.py 2010-07-12 15:47:38 +0000
782+++ tests/syncdaemon/test_vm.py 2010-07-22 13:39:48 +0000
783@@ -32,6 +32,7 @@
784 FakeMain,
785 BaseTwistedTestCase,
786 environ,
787+ MementoHandler,
788 )
789 from ubuntuone.syncdaemon import config
790 from ubuntuone.syncdaemon.volume_manager import (
791@@ -115,22 +116,27 @@
792 class VolumeManagerTests(BaseVolumeManagerTests):
793 """ Tests for Volume Manager internal API. """
794
795- def test_handle_SYS_ROOT_RECEIVED(self):
796- """Check that list_shares is called in handle_SYS_ROOT_RECEIVED """
797- d = defer.Deferred()
798- # helper method, pylint: disable-msg=C0111
799- def list_shares():
800- mdobj = self.main.fs.get_by_path(self.root_dir)
801- self.assertEquals('root_uuid', mdobj.node_id)
802- d.callback(mdobj)
803- self.main.action_q.list_shares = list_shares
804- self.vm.handle_SYS_ROOT_RECEIVED('root_uuid')
805- return d
806+ @defer.inlineCallbacks
807+ def test__got_root_ok(self):
808+ """Test _got_root method."""
809+ d = defer.Deferred()
810+ self._listen_for('SYS_ROOT_RECEIVED', d.callback)
811+ self.vm._got_root('root_uuid')
812+ yield d
813+
814+ @defer.inlineCallbacks
815+ def test__got_root_mismatch(self):
816+ """Test for _got_root with different root node_id."""
817+ self.vm._got_root('root_uuid')
818+ d = defer.Deferred()
819+ self._listen_for('SYS_ROOT_MISMATCH', d.callback)
820+ self.vm._got_root('other_root_uuid')
821+ yield d
822
823 def test_add_share(self):
824 """ test the add_share method. """
825 # initialize the the root
826- self.vm.handle_SYS_ROOT_RECEIVED('root_uuid')
827+ self.vm._got_root('root_uuid')
828 share_path = os.path.join(self.shares_dir, 'fake_share')
829 share = Share(path=share_path, volume_id='share_id')
830 self.vm.add_share(share)
831@@ -139,7 +145,7 @@
832 def test_share_deleted(self):
833 """ Check that a share is deleted from the share mapping. """
834 # initialize the the root
835- self.vm.handle_SYS_ROOT_RECEIVED('root_uuid')
836+ self.vm._got_root('root_uuid')
837 share_path = os.path.join(self.shares_dir, 'fake_share')
838 share = Share(path=share_path, volume_id='share_id')
839 self.vm.add_share(share)
840@@ -150,7 +156,7 @@
841 def test_share_deleted_with_content(self):
842 """ Check that a share is deleted from the share mapping. """
843 # initialize the the root
844- self.vm.handle_SYS_ROOT_RECEIVED('root_uuid')
845+ self.vm._got_root('root_uuid')
846 share_path = os.path.join(self.shares_dir, 'fake_share')
847 share = Share(path=share_path, volume_id='share_id',
848 node_id='share_node_id', access_level='Modify',
849@@ -192,7 +198,7 @@
850 'test_username',
851 'visible_name', 'Modify')
852 # initialize the the root
853- self.vm.handle_SYS_ROOT_RECEIVED('root_uuid')
854+ self.vm._got_root('root_uuid')
855 share_path = os.path.join(self.shares_dir, share_holder.share_name)
856 share = Share(path=share_path, volume_id=share_holder.share_id,
857 access_level='View')
858@@ -209,7 +215,7 @@
859 'visible_username', 'yes',
860 'View')
861 # initialize the the root
862- self.vm.handle_SYS_ROOT_RECEIVED('root_uuid')
863+ self.vm._got_root('root_uuid')
864 response = ListShares(None)
865 response.shares = [share_response]
866 self.vm.handle_AQ_SHARES_LIST(response)
867@@ -228,7 +234,7 @@
868 'test_username',
869 'visible_name', 'Modify')
870 # initialize the the root
871- self.vm.handle_SYS_ROOT_RECEIVED('root_uuid')
872+ self.vm._got_root('root_uuid')
873 # create a share
874 share_path = os.path.join(self.shares_dir, share_holder.share_name)
875 share = Share(path=share_path, volume_id=str(share_holder.share_id),
876@@ -265,7 +271,7 @@
877 'my_visible_name', 'yes',
878 'Modify')
879 # initialize the the root
880- self.vm.handle_SYS_ROOT_RECEIVED('root_uuid')
881+ self.vm._got_root('root_uuid')
882 shared_dir = os.path.join(self.root_dir, 'shared_dir')
883 self.main.fs.create(path=shared_dir, share_id="", is_dir=True)
884 self.main.fs.set_node_id(shared_dir, shared_response.subtree)
885@@ -374,7 +380,7 @@
886 def test_accept_share(self):
887 """ Test the accept_share method. """
888 d = defer.Deferred()
889- self.vm.handle_SYS_ROOT_RECEIVED('root_uuid')
890+ self.vm._got_root('root_uuid')
891 share_path = os.path.join(self.shares_dir, 'fake_share')
892 share = Share(path=share_path, volume_id='share_id', node_id="node_id")
893 self.vm.add_share(share)
894@@ -403,7 +409,7 @@
895 'my_visible_name', 'yes',
896 'Modify')
897 # initialize the the root
898- self.vm.handle_SYS_ROOT_RECEIVED('root_uuid')
899+ self.vm._got_root('root_uuid')
900 response = ListShares(None)
901 response.shares = [shared_response]
902 self.vm.handle_AQ_SHARES_LIST(response)
903@@ -421,7 +427,7 @@
904 self.main.fs.set_node_id(path, 'node_id')
905 self.main.fs.get_by_node_id("", 'node_id')
906 # initialize the the root
907- self.vm.handle_SYS_ROOT_RECEIVED('root_uuid')
908+ self.vm._got_root('root_uuid')
909 # add the shared folder
910 share = Share(path=path, volume_id='share_id', access_level='View')
911 self.vm.add_shared(share)
912@@ -437,7 +443,7 @@
913 self.main.fs.set_node_id(path, 'node_id')
914 self.main.fs.get_by_node_id("", 'node_id')
915 # initialize the the root
916- self.vm.handle_SYS_ROOT_RECEIVED('root_uuid')
917+ self.vm._got_root('root_uuid')
918 self.assertNotIn('share_id', self.vm.shared)
919 # check that a answer notify of a missing share don't blowup
920 self.vm.handle_SV_SHARE_ANSWERED('share_id', 'Yes')
921@@ -459,7 +465,7 @@
922 'visible_username', False,
923 'View')
924 # initialize the the root
925- self.vm.handle_SYS_ROOT_RECEIVED('root_uuid')
926+ self.vm._got_root('root_uuid')
927 response = ListShares(None)
928 response.shares = [share_response, share_response_1]
929 self.vm.handle_AQ_SHARES_LIST(response)
930@@ -554,14 +560,43 @@
931 @defer.inlineCallbacks
932 def test_root_mismatch(self):
933 """Test that SYS_ROOT_MISMATCH is pushed."""
934- self.vm.handle_SYS_ROOT_RECEIVED('root_node_id')
935+ self.vm._got_root('root_node_id')
936 d = defer.Deferred()
937 self._listen_for('SYS_ROOT_MISMATCH', d.callback)
938- self.vm.handle_SYS_ROOT_RECEIVED('root_id')
939+ self.vm._got_root('root_id')
940 current_id, new_id = yield d
941 self.assertEquals('root_node_id', current_id)
942 self.assertEquals('root_id', new_id)
943
944+ @defer.inlineCallbacks
945+ def test_handle_AQ_ANSWER_SHARE_OK(self):
946+ """Test for handle_AQ_ANSWER_SHARE_OK."""
947+ share_id = uuid.uuid4()
948+ share_volume = volumes.ShareVolume(share_id, 'fake_share_uuid', None,
949+ 10, 'to_me', 'fake_share', 'username',
950+ 'visible_username', False, 'View')
951+ dir_name = self.vm._build_share_path(share_volume.share_name,
952+ share_volume.other_visible_name)
953+ share_path = os.path.join(self.main.shares_dir, dir_name)
954+ share = Share.from_share_volume(share_volume, share_path)
955+
956+ get_delta_d = defer.Deferred()
957+ def fake_get_delta(volume_id, generation):
958+ """A fake get_delta that check the arguments."""
959+ self.assertEquals(share.volume_id, volume_id)
960+ self.assertEquals(share.generation, generation)
961+ get_delta_d.callback(None)
962+ self.main.action_q.get_delta = fake_get_delta
963+
964+ self.vm.add_share(share)
965+ self.vm.handle_AQ_ANSWER_SHARE_OK(share.volume_id, 'Yes')
966+ yield get_delta_d
967+ share = self.vm.get_volume(share.volume_id)
968+ self.assertTrue(share.accepted, 'accepted != True')
969+ self.assertTrue(self.main.fs.get_by_path(share.path),
970+ 'No metadata for share root node.')
971+ self.assertTrue(os.path.exists(share.path), 'share path missing on disk!')
972+
973
974 class VolumeManagerUnicodeTests(BaseVolumeManagerTests):
975 """Tests for Volume Manager unicode capabilities."""
976@@ -574,7 +609,7 @@
977 'visible', 'yes',
978 'View')
979 # initialize the the root
980- self.vm.handle_SYS_ROOT_RECEIVED('root_uuid')
981+ self.vm._got_root('root_uuid')
982 response = ListShares(None)
983 response.shares = [share_response]
984 self.vm.handle_AQ_SHARES_LIST(response)
985@@ -593,7 +628,7 @@
986 u'Darío Toño', 'yes',
987 'View')
988 # initialize the the root
989- self.vm.handle_SYS_ROOT_RECEIVED('root_uuid')
990+ self.vm._got_root('root_uuid')
991 response = ListShares(None)
992 response.shares = [share_response]
993 self.vm.handle_AQ_SHARES_LIST(response)
994@@ -610,7 +645,7 @@
995 u'año',
996 'test_username',
997 'visible', 'Modify')
998- self.vm.handle_SYS_ROOT_RECEIVED('root_uuid')
999+ self.vm._got_root('root_uuid')
1000 self.vm.handle_SV_SHARE_CHANGED(info=share_holder)
1001 shouldbe_dir = os.path.join(self.shares_dir,
1002 u"año".encode("utf8") + " from visible")
1003@@ -622,7 +657,7 @@
1004 'share',
1005 'test_username',
1006 u'Ramón', 'Modify')
1007- self.vm.handle_SYS_ROOT_RECEIVED('root_uuid')
1008+ self.vm._got_root('root_uuid')
1009 self.vm.handle_SV_SHARE_CHANGED(info=share_holder)
1010 shouldbe_dir = os.path.join(self.shares_dir,
1011 "share from " + u"Ramón".encode("utf8"))
1012@@ -654,6 +689,7 @@
1013 free_bytes, suggested_path)
1014 udf = UDF.from_udf_volume(volume, path)
1015 udf.subscribed = subscribed
1016+ udf.generation = generation
1017 return udf, volume
1018
1019 def test_build_udf_path(self):
1020@@ -715,20 +751,16 @@
1021 suggested_path = "suggested_path"
1022 udf, volume = self._create_udf(uuid.uuid4(), 'udf_node_id',
1023 '~/' + suggested_path, subscribed=True)
1024- query_d = defer.Deferred()
1025- space_d = defer.Deferred()
1026- query_d.addCallback(lambda _: space_d)
1027- def fake_query(data):
1028- self.assertEquals([(udf.volume_id, udf.node_id, '')], data)
1029- query_d.callback(None)
1030- def fake_inquire_free_space(volume_id):
1031- self.assertEquals(volume_id, request.ROOT)
1032- space_d.callback(None)
1033- self.main.action_q.query = fake_query
1034- self.main.action_q.inquire_free_space = fake_inquire_free_space
1035+ get_delta_d = defer.Deferred()
1036+ def fake_get_delta(volume_id, generation):
1037+ """A fake get_delta that check the arguments."""
1038+ self.assertEquals(udf.volume_id, volume_id)
1039+ self.assertEquals(udf.generation, generation)
1040+ get_delta_d.callback(None)
1041+ self.main.action_q.get_delta = fake_get_delta
1042
1043 yield self.vm.add_udf(udf)
1044- yield query_d
1045+ yield get_delta_d
1046 self.assertEquals(os.path.join(self.home_dir, suggested_path),
1047 udf.path)
1048 self.assertEquals(1, len(self.vm.udfs))
1049@@ -849,8 +881,9 @@
1050 self.assertEquals('udf_uuid', udf.node_id)
1051 self.assertEquals(os.path.join(self.home_dir, 'UDF'), udf.path)
1052 # check that the root it's there
1053- self.assertTrue('' in self.vm.shares)
1054- self.assertEquals(self.vm.shares[''].node_id, str(root_volume.node_id))
1055+ self.assertTrue(request.ROOT in self.vm.shares)
1056+ self.assertEquals(self.vm.shares[request.ROOT].node_id,
1057+ str(root_volume.node_id))
1058 # now send the same list again and check
1059 # use a custom home
1060 with environ('HOME', self.home_dir):
1061@@ -896,7 +929,7 @@
1062 udf_id = uuid.uuid4()
1063 udf_volume = volumes.UDFVolume(udf_id, 'udf_uuid', None, 10, u'~/ñoño')
1064 # initialize the the root
1065- self.vm.handle_SYS_ROOT_RECEIVED('root_uuid')
1066+ self.vm._got_root('root_uuid')
1067 response = [share_volume, udf_volume]
1068 d = defer.Deferred()
1069 self._listen_for('VM_UDF_CREATED', d.callback)
1070@@ -927,8 +960,9 @@
1071 self.vm.handle_AQ_LIST_VOLUMES(response)
1072 self.assertEquals(1, len(self.vm.shares)) # the new share and root
1073 # check that the root is in the shares dict
1074- self.assertTrue('' in self.vm.shares)
1075- self.assertEquals(self.vm.shares[''].node_id, str(root_volume.node_id))
1076+ self.assertTrue(request.ROOT in self.vm.shares)
1077+ self.assertEquals(self.vm.shares[request.ROOT].node_id,
1078+ str(root_volume.node_id))
1079
1080 def test_get_udf_path_name(self):
1081 """Test for _get_udf_path_name."""
1082@@ -1413,7 +1447,7 @@
1083 'username', 'visible_username',
1084 True, 'View')
1085 # initialize the the root
1086- self.vm.handle_SYS_ROOT_RECEIVED('root_uuid')
1087+ self.vm._got_root('root_uuid')
1088 self._listen_for('VM_UDF_CREATED', d.callback)
1089 def check_udf(info):
1090 """The udf creation callback"""
1091@@ -1475,7 +1509,7 @@
1092 yield self.vm.add_udf(udf)
1093 self.vm.add_share(share)
1094 # initialize the the root
1095- self.vm.handle_SYS_ROOT_RECEIVED('root_uuid')
1096+ self.vm._got_root('root_uuid')
1097 d = defer.Deferred()
1098 self._listen_for('VM_VOLUME_DELETED', d.callback)
1099 with environ('HOME', self.home_dir):
1100@@ -1585,7 +1619,7 @@
1101 def test_no_UDFs_inside_root(self):
1102 """Test that a UDF can't be created inside the root"""
1103 # initialize the root
1104- self.vm.handle_SYS_ROOT_RECEIVED('root_uuid')
1105+ self.vm._got_root('root_uuid')
1106 udf_path = os.path.join(self.root_dir, 'udf_inside_root')
1107 # patch FakeAQ
1108 def create_udf(path, name, marker):
1109@@ -1695,8 +1729,8 @@
1110 share_path = os.path.join(self.main.shares_dir, dir_name)
1111 share = Share.from_share_volume(share_volume, share_path)
1112 # get the root
1113- root = self.vm.get_volume('')
1114- self.vm.handle_SYS_ROOT_RECEIVED('root_uuid')
1115+ root = self.vm.get_volume(request.ROOT)
1116+ self.vm._got_root('root_uuid')
1117 # create a UDF
1118 udf_id = uuid.uuid4()
1119 udf, volume = self._create_udf(udf_id, 'udf_node_id', '~/UDF')
1120@@ -1738,8 +1772,8 @@
1121 share_path = os.path.join(self.main.shares_dir, dir_name)
1122 share = Share.from_share_volume(share_volume, share_path)
1123 # get the root
1124- root = self.vm.get_volume('')
1125- self.vm.handle_SYS_ROOT_RECEIVED('root_node_id')
1126+ root = self.vm.get_volume(request.ROOT)
1127+ self.vm._got_root('root_node_id')
1128 # create a UDF
1129 udf_id = uuid.uuid4()
1130 udf, volume = self._create_udf(udf_id, 'udf_node_id', '~/UDF')
1131@@ -1763,7 +1797,7 @@
1132 def test_UDF_cant_be_a_symlink(self):
1133 """Test that a UDF can't be a symlink."""
1134 # initialize the root
1135- self.vm.handle_SYS_ROOT_RECEIVED('root_uuid')
1136+ self.vm._got_root('root_uuid')
1137 real_udf_path = os.path.join(self.home_dir, "my_udf")
1138 udf_path = os.path.join(self.home_dir, "MyUDF")
1139 # patch FakeAQ
1140@@ -1789,7 +1823,7 @@
1141 def test_UDF_cant_be_inside_symlink(self):
1142 """Test that a UDF can't be inside a symlink."""
1143 # initialize the root
1144- self.vm.handle_SYS_ROOT_RECEIVED('root_uuid')
1145+ self.vm._got_root('root_uuid')
1146 real_udf_path = os.path.join(self.home_dir, "udf_parent", "my_udf")
1147 udf_path = os.path.join(self.home_dir, "MyUDF")
1148 # patch FakeAQ
1149@@ -1839,10 +1873,10 @@
1150 for event in events)
1151 self.assertIn(str(share_id), events_dict)
1152 self.assertIn(str(udf_id), events_dict)
1153- self.assertIn('', events_dict)
1154+ self.assertIn(request.ROOT, events_dict)
1155 self.assertEquals(1, events_dict[str(share_id)])
1156 self.assertEquals(1, events_dict[str(udf_id)])
1157- self.assertEquals(1, events_dict[''])
1158+ self.assertEquals(1, events_dict[request.ROOT])
1159
1160 @defer.inlineCallbacks
1161 def test_server_rescan_error(self):
1162@@ -1888,10 +1922,10 @@
1163 for event in events)
1164 self.assertIn(str(share_id), events_dict)
1165 self.assertIn(str(udf_id), events_dict)
1166- self.assertIn('', events_dict)
1167+ self.assertIn(request.ROOT, events_dict)
1168 self.assertEquals(1, events_dict[str(share_id)])
1169 self.assertEquals(1, events_dict[str(udf_id)])
1170- self.assertEquals(1, events_dict[''])
1171+ self.assertEquals(1, events_dict[request.ROOT])
1172 # set the local metadata generation to new value
1173 share = self.vm.shares[str(share_id)]
1174 share.generation = share_volume.generation
1175@@ -1901,7 +1935,7 @@
1176 self.vm.udfs[str(udf_id)] = udf
1177 root = self.vm.root
1178 root.generation = root_volume.generation
1179- self.vm.shares[''] = root
1180+ self.vm.shares[request.ROOT] = root
1181 # now that we have the volumes in metadata, try with a higher value.
1182 share_volume.generation = 10
1183 udf_volume.generation = 5
1184@@ -1916,7 +1950,7 @@
1185 for event in events)
1186 self.assertIn(str(share_id), events_dict)
1187 self.assertIn(str(udf_id), events_dict)
1188- self.assertNotIn('', events_dict) # same generartion as metadata
1189+ self.assertNotIn(request.ROOT, events_dict) # same generartion as metadata
1190 self.assertEquals(10, events_dict[str(share_id)])
1191 self.assertEquals(5, events_dict[str(udf_id)])
1192 # now only change the root volume generation
1193@@ -1933,8 +1967,8 @@
1194 for event in events)
1195 self.assertNotIn(str(share_id), events_dict)
1196 self.assertNotIn(str(udf_id), events_dict)
1197- self.assertIn('', events_dict) # same generartion as metadata
1198- self.assertEquals(100, events_dict[''])
1199+ self.assertIn(request.ROOT, events_dict) # same generartion as metadata
1200+ self.assertEquals(100, events_dict[request.ROOT])
1201
1202 @defer.inlineCallbacks
1203 def test_update_generation(self):
1204@@ -1948,8 +1982,8 @@
1205 share_path = os.path.join(self.main.shares_dir, dir_name)
1206 share = Share.from_share_volume(share_volume, share_path)
1207 # get the root
1208- root = self.vm.get_volume('')
1209- self.vm.handle_SYS_ROOT_RECEIVED('root_node_id')
1210+ root = self.vm.get_volume(request.ROOT)
1211+ self.vm._got_root('root_node_id')
1212 # create a UDF
1213 udf_id = uuid.uuid4()
1214 udf, _ = self._create_udf(udf_id, 'udf_node_id', '~/UDF')
1215@@ -1971,6 +2005,205 @@
1216 self.assertRaises(VolumeDoesNotExist, self.vm.update_generation,
1217 str(uuid.uuid4()), 1)
1218
1219+ @defer.inlineCallbacks
1220+ def test_handle_SV_VOLUME_NEW_GENERATION_udf(self):
1221+ """Test handle_SV_VOLUME_NEW_GENERATION for udf."""
1222+ # create a UDF
1223+ udf_id = uuid.uuid4()
1224+ udf, udf_volume = self._create_udf(udf_id, 'udf_node_id', '~/UDF')
1225+ yield self.vm.add_udf(udf)
1226+ self.vm.update_generation(udf.volume_id, 10)
1227+ d = defer.Deferred()
1228+ self.patch(self.main.action_q, 'get_delta', lambda v, g: d.callback((v, g)))
1229+ self.main.event_q.push('SV_VOLUME_NEW_GENERATION', udf.volume_id, 100)
1230+ vol_id, gen = yield d
1231+ vol = self.vm.get_volume(vol_id)
1232+ self.assertEqual(vol_id, vol.volume_id)
1233+ self.assertEqual(gen, vol.generation)
1234+
1235+ @defer.inlineCallbacks
1236+ def test_handle_SV_VOLUME_NEW_GENERATION_udf_from_scratch(self):
1237+ """Test handle_SV_VOLUME_NEW_GENERATION for udf."""
1238+ # create a UDF
1239+ udf_id = uuid.uuid4()
1240+ udf, udf_volume = self._create_udf(udf_id, 'udf_node_id', '~/UDF')
1241+ yield self.vm.add_udf(udf)
1242+ self.vm.update_generation(udf.volume_id, None)
1243+ d = defer.Deferred()
1244+ self.patch(self.main.action_q, 'rescan_from_scratch', d.callback)
1245+ self.main.event_q.push('SV_VOLUME_NEW_GENERATION', udf.volume_id, 100)
1246+ vol_id = yield d
1247+ self.assertEquals(vol_id, udf.volume_id)
1248+
1249+ @defer.inlineCallbacks
1250+ def test_handle_SV_VOLUME_NEW_GENERATION_udf_eq(self):
1251+ """Test handle_SV_VOLUME_NEW_GENERATION for udf."""
1252+ # get the root
1253+ udf_id = uuid.uuid4()
1254+ udf, udf_volume = self._create_udf(udf_id, 'udf_node_id', '~/UDF')
1255+ yield self.vm.add_udf(udf)
1256+ self.vm.update_generation(udf.volume_id, 100)
1257+ handler = MementoHandler()
1258+ handler.setLevel(logging.INFO)
1259+ self.vm.log.addHandler(handler)
1260+ self.main.event_q.push('SV_VOLUME_NEW_GENERATION', udf.volume_id, 100)
1261+ self.assertEqual(1, len(handler.records))
1262+ msg = 'Got SV_VOLUME_NEW_GENERATION(%r, %r) but volume' + \
1263+ ' is at generation: %r'
1264+ self.assertEqual(msg % (udf.volume_id, 100, 100),
1265+ handler.records[0].message)
1266+ self.vm.log.removeHandler(handler)
1267+
1268+ @defer.inlineCallbacks
1269+ def test_handle_SV_VOLUME_NEW_GENERATION_root(self):
1270+ """Test handle_SV_VOLUME_NEW_GENERATION for root share."""
1271+ # get the root
1272+ root = self.vm.get_volume(request.ROOT)
1273+ self.vm._got_root('root_node_id')
1274+ self.vm.update_generation(root.volume_id, 10)
1275+ d = defer.Deferred()
1276+ self.patch(self.main.action_q, 'get_delta', lambda v, g: d.callback((v, g)))
1277+ self.main.event_q.push('SV_VOLUME_NEW_GENERATION', root.volume_id, 100)
1278+ vol_id, gen = yield d
1279+ vol = self.vm.get_volume(vol_id)
1280+ self.assertEqual(vol_id, vol.volume_id)
1281+ self.assertEqual(gen, vol.generation)
1282+
1283+ @defer.inlineCallbacks
1284+ def test_handle_SV_VOLUME_NEW_GENERATION_root_from_scratch(self):
1285+ """Test handle_SV_VOLUME_NEW_GENERATION for root share."""
1286+ # get the root
1287+ root = self.vm.get_volume(request.ROOT)
1288+ self.vm._got_root('root_node_id')
1289+ self.vm.update_generation(root.volume_id, None)
1290+ d = defer.Deferred()
1291+ self.patch(self.main.action_q, 'rescan_from_scratch', d.callback)
1292+ self.main.event_q.push('SV_VOLUME_NEW_GENERATION', root.volume_id, 100)
1293+ vol_id = yield d
1294+ self.assertEquals(vol_id, root.volume_id)
1295+
1296+ def test_handle_SV_VOLUME_NEW_GENERATION_root_eq(self):
1297+ """Test handle_SV_VOLUME_NEW_GENERATION for root share."""
1298+ # get the root
1299+ root = self.vm.get_volume(request.ROOT)
1300+ self.vm._got_root('root_node_id')
1301+ self.vm.update_generation(root.volume_id, 100)
1302+ handler = MementoHandler()
1303+ handler.setLevel(logging.INFO)
1304+ self.vm.log.addHandler(handler)
1305+ self.main.event_q.push('SV_VOLUME_NEW_GENERATION', root.volume_id, 100)
1306+ self.assertEqual(1, len(handler.records))
1307+ msg = 'Got SV_VOLUME_NEW_GENERATION(%r, %r) but volume' + \
1308+ ' is at generation: %r'
1309+ self.assertEqual(msg % (root.volume_id, 100, 100),
1310+ handler.records[0].message)
1311+ self.vm.log.removeHandler(handler)
1312+
1313+ @defer.inlineCallbacks
1314+ def test_handle_SV_VOLUME_NEW_GENERATION_share(self):
1315+ """Test handle_SV_VOLUME_NEW_GENERATION for a share."""
1316+ share_id = uuid.uuid4()
1317+ share_volume = volumes.ShareVolume(share_id, 'fake_share_uuid', None,
1318+ 10, 'to_me', 'fake_share', 'username',
1319+ 'visible_username', True, 'View')
1320+ dir_name = self.vm._build_share_path(share_volume.share_name,
1321+ share_volume.other_visible_name)
1322+ share_path = os.path.join(self.main.shares_dir, dir_name)
1323+ share = Share.from_share_volume(share_volume, share_path)
1324+ self.vm.add_share(share)
1325+ self.vm.update_generation(share.volume_id, 10)
1326+ d = defer.Deferred()
1327+ self.patch(self.main.action_q, 'get_delta', lambda v, g: d.callback((v, g)))
1328+ self.main.event_q.push('SV_VOLUME_NEW_GENERATION', share.volume_id, 100)
1329+ vol_id, gen = yield d
1330+ vol = self.vm.get_volume(vol_id)
1331+ self.assertEqual(vol_id, vol.volume_id)
1332+ self.assertEqual(gen, vol.generation)
1333+
1334+ @defer.inlineCallbacks
1335+ def test_handle_SV_VOLUME_NEW_GENERATION_share_from_scratch(self):
1336+ """Test handle_SV_VOLUME_NEW_GENERATION for a share."""
1337+ share_id = uuid.uuid4()
1338+ share_volume = volumes.ShareVolume(share_id, 'fake_share_uuid', None,
1339+ 10, 'to_me', 'fake_share', 'username',
1340+ 'visible_username', True, 'View')
1341+ dir_name = self.vm._build_share_path(share_volume.share_name,
1342+ share_volume.other_visible_name)
1343+ share_path = os.path.join(self.main.shares_dir, dir_name)
1344+ share = Share.from_share_volume(share_volume, share_path)
1345+ self.vm.add_share(share)
1346+ self.vm.update_generation(share.volume_id, None)
1347+ d = defer.Deferred()
1348+ self.patch(self.main.action_q, 'rescan_from_scratch', d.callback)
1349+ self.main.event_q.push('SV_VOLUME_NEW_GENERATION', share.volume_id, 100)
1350+ vol_id = yield d
1351+ self.assertEquals(vol_id, share.volume_id)
1352+
1353+ def test_handle_SV_VOLUME_NEW_GENERATION_share_eq(self):
1354+ """Test handle_SV_VOLUME_NEW_GENERATION for a share."""
1355+ share_id = uuid.uuid4()
1356+ share_volume = volumes.ShareVolume(share_id, 'fake_share_uuid', None,
1357+ 10, 'to_me', 'fake_share', 'username',
1358+ 'visible_username', True, 'View')
1359+ dir_name = self.vm._build_share_path(share_volume.share_name,
1360+ share_volume.other_visible_name)
1361+ share_path = os.path.join(self.main.shares_dir, dir_name)
1362+ share = Share.from_share_volume(share_volume, share_path)
1363+ self.vm.add_share(share)
1364+ self.vm.update_generation(share.volume_id, 100)
1365+ handler = MementoHandler()
1366+ handler.setLevel(logging.INFO)
1367+ self.vm.log.addHandler(handler)
1368+ self.main.event_q.push('SV_VOLUME_NEW_GENERATION', share.volume_id, 100)
1369+ self.assertEqual(1, len(handler.records))
1370+ msg = 'Got SV_VOLUME_NEW_GENERATION(%r, %r) but volume' + \
1371+ ' is at generation: %r'
1372+ self.assertEqual(msg % (share.volume_id, 100, 100),
1373+ handler.records[0].message)
1374+ self.vm.log.removeHandler(handler)
1375+
1376+ @defer.inlineCallbacks
1377+ def test_handle_AQ_DELTA_NOT_POSSIBLE(self):
1378+ """Test for handle_AQ_DELTA_NOT_POSSIBLE."""
1379+ share_id = uuid.uuid4()
1380+ share_volume = volumes.ShareVolume(share_id, 'fake_share_uuid', None,
1381+ 10, 'to_me', 'fake_share', 'username',
1382+ 'visible_username', True, 'View')
1383+ dir_name = self.vm._build_share_path(share_volume.share_name,
1384+ share_volume.other_visible_name)
1385+ share_path = os.path.join(self.main.shares_dir, dir_name)
1386+ share = Share.from_share_volume(share_volume, share_path)
1387+ # get the root
1388+ root = self.vm.get_volume(request.ROOT)
1389+ self.vm._got_root('root_node_id')
1390+ # create a UDF
1391+ udf_id = uuid.uuid4()
1392+ udf, _ = self._create_udf(udf_id, 'udf_node_id', '~/UDF')
1393+ self.vm.add_share(share)
1394+ yield self.vm.add_udf(udf)
1395+ # patch AQ.rescan_from_scratch
1396+ calls = []
1397+ self.patch(self.main.action_q, 'rescan_from_scratch', calls.append)
1398+ self.vm.handle_AQ_DELTA_NOT_POSSIBLE(udf_id)
1399+ self.vm.handle_AQ_DELTA_NOT_POSSIBLE(share_id)
1400+ self.vm.handle_AQ_DELTA_NOT_POSSIBLE(root.volume_id)
1401+ for i, vol in enumerate([udf, share, root]):
1402+ self.assertEquals(calls[i], str(vol.volume_id))
1403+
1404+ def test_handle_AQ_DELTA_NOT_POSSIBLE_missing_volume(self):
1405+ """Test for handle_AQ_DELTA_NOT_POSSIBLE with an missing volume."""
1406+ handler = MementoHandler()
1407+ handler.setLevel(logging.WARNING)
1408+ self.vm.log.addHandler(handler)
1409+ vol_id = uuid.uuid4()
1410+ try:
1411+ self.vm.handle_AQ_DELTA_NOT_POSSIBLE(vol_id)
1412+ finally:
1413+ self.vm.log.removeHandler(handler)
1414+ self.assertEqual(1, len(handler.records))
1415+ msg = 'Got a AQ_DELTA_NOT_POSSIBLE for a missing volume: %r'
1416+ self.assertEqual(msg % str(vol_id), handler.records[0].message)
1417+
1418
1419 class MetadataTestCase(BaseTwistedTestCase):
1420 md_version_None = False
1421@@ -2062,7 +2295,7 @@
1422 # add the root_uuid key
1423 root_share = _Share(path=self.root_dir)
1424 root_share.access_level = 'Modify'
1425- old_shelf[''] = root_share
1426+ old_shelf[request.ROOT] = root_share
1427 for idx in range(1, 10):
1428 sid = str(uuid.uuid4())
1429 old_shelf[sid] = _Share(path=os.path.join(self.shares_dir, str(idx)),
1430@@ -2129,7 +2362,7 @@
1431 self.data_dir, self.partials_dir)
1432 new_keys = [new_key for new_key in self.main.vm.shares.keys()]
1433 self.assertEquals(2, len(new_keys)) # the fake share plus root
1434- for key in ['', share.id]:
1435+ for key in [request.ROOT, share.id]:
1436 self.assertIn(key, new_keys)
1437 self.check_version()
1438
1439@@ -2322,7 +2555,7 @@
1440 # add the root_uuid key
1441 root_share = _Share(path=self.root_dir)
1442 root_share.access_level = 'Modify'
1443- maybe_old_shelf[''] = root_share
1444+ maybe_old_shelf[request.ROOT] = root_share
1445 for idx in range(1, 10):
1446 share_id = str(uuid.uuid4())
1447 maybe_old_shelf[share_id] = \
1448@@ -2483,7 +2716,7 @@
1449 root_share.access_level = 'Modify'
1450 # set None to the share path
1451 root_share.path = None
1452- shares[''] = root_share
1453+ shares[request.ROOT] = root_share
1454
1455 if self.md_version_None:
1456 self.set_md_version('')
1457@@ -2521,9 +2754,9 @@
1458 self.set_md_version('5')
1459 # create some old shares and shared metadata
1460 legacy_shares = LegacyShareFileShelf(self.share_md_dir)
1461- root_share = _Share(path=self.root_dir, share_id='',
1462+ root_share = _Share(path=self.root_dir, share_id=request.ROOT,
1463 access_level='Modify')
1464- legacy_shares[''] = root_share
1465+ legacy_shares[request.ROOT] = root_share
1466 for idx, name in enumerate(['share'] * 10):
1467 sid = str(uuid.uuid4())
1468 share_name = name + '_' + str(idx)
1469@@ -2595,9 +2828,9 @@
1470 self.udfs_md_dir = os.path.join(self.vm_data_dir, 'udfs')
1471 # create some old shares and shared metadata
1472 legacy_shares = LegacyShareFileShelf(self.share_md_dir)
1473- root_share = _Share(path=self.root_dir, share_id='',
1474+ root_share = _Share(path=self.root_dir, share_id=request.ROOT,
1475 access_level='Modify')
1476- legacy_shares[''] = root_share
1477+ legacy_shares[request.ROOT] = root_share
1478 for idx, name in enumerate(['share'] * 10):
1479 sid = str(uuid.uuid4())
1480 share_name = name + '_' + str(idx)
1481@@ -2695,9 +2928,9 @@
1482 self.udfs_md_dir = os.path.join(self.vm_data_dir, 'udfs')
1483 # create some old shares and shared metadata
1484 legacy_shares = LegacyShareFileShelf(self.share_md_dir)
1485- root_share = _Share(path=self.root_dir, share_id='',
1486+ root_share = _Share(path=self.root_dir, share_id=request.ROOT,
1487 access_level='Modify', node_id=str(uuid.uuid4()))
1488- legacy_shares[''] = root_share
1489+ legacy_shares[request.ROOT] = root_share
1490 for idx, name in enumerate(['share'] * 3):
1491 sid = str(uuid.uuid4())
1492 share_name = name + '_' + str(idx)
1493@@ -2802,9 +3035,9 @@
1494 self.set_md_version('5')
1495 # create some old shares and shared metadata
1496 legacy_shares = LegacyShareFileShelf(self.share_md_dir)
1497- root_share = _Share(path=self.root_dir, share_id='',
1498+ root_share = _Share(path=self.root_dir, share_id=request.ROOT,
1499 access_level='Modify')
1500- legacy_shares[''] = root_share
1501+ legacy_shares[request.ROOT] = root_share
1502 for idx, name in enumerate(['share'] * 10):
1503 sid = str(uuid.uuid4())
1504 share_name = name + '_' + str(idx)
1505
1506=== modified file 'ubuntuone/syncdaemon/action_queue.py'
1507--- ubuntuone/syncdaemon/action_queue.py 2010-07-19 20:15:06 +0000
1508+++ ubuntuone/syncdaemon/action_queue.py 2010-07-22 13:39:48 +0000
1509@@ -703,14 +703,6 @@
1510 self.connector = None
1511 self.connect_in_progress = False
1512
1513- def _node_state_callback(self, share_id, node_id, hash):
1514- """Called by the client when notified that node changed."""
1515- volume = self.main.vm.udfs.get(share_id, None)
1516- if volume is not None and not volume.subscribed:
1517- return
1518- self.event_queue.push('SV_HASH_NEW',
1519- share_id=share_id, node_id=node_id, hash=hash)
1520-
1521 def _share_change_callback(self, info):
1522 """Called by the client when notified that a share changed."""
1523 self.event_queue.push('SV_SHARE_CHANGED', info=info)
1524@@ -827,7 +819,6 @@
1525 # does nothing (safely).
1526 self.client = ThrottlingStorageClientFactory.buildProtocol(self, addr)
1527
1528- self.client.set_node_state_callback(self._node_state_callback)
1529 self.client.set_share_change_callback(self._share_change_callback)
1530 self.client.set_share_answer_callback(self._share_answer_callback)
1531 self.client.set_free_space_callback(self._free_space_callback)
1532@@ -878,6 +869,7 @@
1533 def _send_request_and_handle_errors(self, request, request_error,
1534 event_error, event_ok,
1535 fire_deferred=True,
1536+ handle_exception=True,
1537 args=(), kwargs={}):
1538 """Send 'request' to the server, using params 'args' and 'kwargs'.
1539
1540@@ -919,8 +911,11 @@
1541 event = 'SYS_SERVER_ERROR'
1542 self.event_queue.push(event, error=str(failure))
1543 except Exception, failure:
1544- event = 'SYS_UNKNOWN_ERROR'
1545- self.event_queue.push(event)
1546+ if handle_exception:
1547+ event = 'SYS_UNKNOWN_ERROR'
1548+ self.event_queue.push(event)
1549+ else:
1550+ raise
1551 else:
1552 logger.info("The request '%s' finished OK.", req_name)
1553 if event_ok is not None:
1554@@ -997,50 +992,19 @@
1555 # callback the deferred if everything went ok
1556 self.deferred.callback(self.client)
1557
1558- def server_rescan(self, root_mdid, data_gen):
1559- """Do the server rescan."""
1560-
1561- @defer.inlineCallbacks
1562- def _get_root_and_query(root_mdid, data_gen):
1563- """Get user's root and then query each element in data_gen()."""
1564- yield self.get_root(root_mdid)
1565- data = data_gen()
1566- logger.info("Server rescan: will query %d objects", len(data))
1567- # check we're going to actually log, because this could be expensive
1568- if logger.isEnabledFor(TRACE):
1569- for share, node, hash in data:
1570- logger.trace("Server rescan: share: %r, node: %r, hash: %s",
1571- share or '/root/', node, hash)
1572- logger.trace("Server rescan: all data shown")
1573- yield self.client.query(data)
1574-
1575- get_root_and_query_d = self._send_request_and_handle_errors(
1576- request=_get_root_and_query,
1577+ @defer.inlineCallbacks
1578+ def query_volumes(self):
1579+ """Get the list of volumes.
1580+
1581+ This method will *not* queue a command, the request will be
1582+ executed right away.
1583+ """
1584+ result = yield self._send_request_and_handle_errors(
1585+ request=self.client.list_volumes,
1586 request_error=None, event_error=None,
1587- event_ok='SYS_SERVER_RESCAN_DONE', fire_deferred=False,
1588- args=(root_mdid, data_gen)
1589- )
1590- return get_root_and_query_d
1591-
1592- def get_root(self, marker):
1593- """Get the user's root uuid.
1594-
1595- Use the uuid_map, so the caller can use the marker in followup
1596- operations.
1597-
1598- """
1599- log = mklog(logger, 'get_root', '', marker, marker=marker)
1600- log.debug('starting')
1601- d = self.client.get_root()
1602- d.addCallbacks(*log.callbacks())
1603- d.addCallbacks(passit(lambda root: self.uuid_map.set(marker, root)),
1604- passit(lambda f: self.uuid_map.err(marker, f)))
1605- def handle_root(root_id):
1606- """Push SYS_ROOT_RECEIVED event"""
1607- self.event_queue.push('SYS_ROOT_RECEIVED', root_id=root_id)
1608- return root_id
1609- d.addCallback(handle_root)
1610- return d
1611+ event_ok=None, fire_deferred=False,
1612+ handle_exception=False)
1613+ defer.returnValue(result.volumes)
1614
1615 def make_file(self, share_id, parent_id, name, marker):
1616 """
1617@@ -1134,17 +1098,6 @@
1618 """
1619 return ListVolumes(self.meta_queue).start()
1620
1621- def query_volumes(self):
1622- """List all the volumes the user has.
1623-
1624- This includes the volumes:
1625- - all the user's UDFs.
1626- - all the shares the user has accepted.
1627- - the root-root volume.
1628-
1629- """
1630- return VolumesQuery(self.meta_queue).start()
1631-
1632 def delete_volume(self, volume_id):
1633 """Delete a volume on the server, removing the associated tree.
1634
1635@@ -1256,6 +1209,18 @@
1636 """
1637 return GetDelta(self.meta_queue, volume_id, generation).start()
1638
1639+ def rescan_from_scratch(self, volume_id):
1640+ """Get a delta from scratch for the volume.
1641+
1642+ @param volume_id: the id of the volume to get the delta.
1643+
1644+ Result will be signaled using events:
1645+ - AQ_RESCAN_FROM_SCRATCH_OK on succeess.
1646+ - AQ_RESCAN_FROM_SCRATCH_ERROR on generic failure.
1647+ """
1648+ # TODO: implement rescan from scratch
1649+ return None
1650+
1651
1652 SKIP_THIS_ITEM = object()
1653
1654@@ -1989,18 +1954,6 @@
1655 error=failure.getErrorMessage())
1656
1657
1658-class VolumesQuery(ListVolumes):
1659- """List all the volumes for a given user, but doesn't push a event."""
1660-
1661- def handle_success(self, success):
1662- """It worked!, just forward the value"""
1663- return success.volumes
1664-
1665- def handle_failure(self, failure):
1666- """It didn't work! just forward the failure."""
1667- return failure
1668-
1669-
1670 class DeleteVolume(ActionQueueCommand):
1671 """Delete an exsistent volume."""
1672
1673@@ -2065,6 +2018,7 @@
1674 def handle_success(self, request):
1675 """It worked! Push the success event."""
1676 data = dict(
1677+ volume_id=self.volume_id,
1678 delta_content=request.response,
1679 end_generation=request.end_generation,
1680 full=request.full,
1681
1682=== modified file 'ubuntuone/syncdaemon/event_queue.py'
1683--- ubuntuone/syncdaemon/event_queue.py 2010-07-19 20:15:06 +0000
1684+++ ubuntuone/syncdaemon/event_queue.py 2010-07-22 13:39:48 +0000
1685@@ -85,19 +85,14 @@
1686 'AQ_CHANGE_PUBLIC_ACCESS_ERROR': ('share_id', 'node_id', 'error'),
1687 'AQ_PUBLIC_FILES_LIST_OK': ('public_files',),
1688 'AQ_PUBLIC_FILES_LIST_ERROR': ('error',),
1689- 'AQ_DELTA_OK': ('delta_content', 'end_generation', 'full', 'free_bytes'),
1690+ 'AQ_DELTA_OK': ('volume_id', 'delta_content', 'end_generation',
1691+ 'full', 'free_bytes'),
1692 'AQ_DELTA_ERROR': ('volume_id', 'error'),
1693 'AQ_DELTA_NOT_POSSIBLE': ('volume_id',),
1694
1695 'SV_SHARE_CHANGED': ('info',),
1696 'SV_SHARE_DELETED': ('share_id',),
1697 'SV_SHARE_ANSWERED': ('share_id', 'answer'),
1698- 'SV_HASH_NEW': ('share_id', 'node_id', 'hash'),
1699- 'SV_FILE_NEW': ('share_id', 'node_id', 'parent_id', 'name'),
1700- 'SV_DIR_NEW': ('share_id', 'node_id', 'parent_id', 'name'),
1701- 'SV_FILE_DELETED': ('share_id', 'node_id'),
1702- 'SV_MOVED': ('share_id', 'node_id', 'new_share_id', 'new_parent_id',
1703- 'new_name'),
1704 'SV_FREE_SPACE': ('share_id', 'free_bytes'),
1705 'SV_ACCOUNT_CHANGED': ('account_info',),
1706 'SV_VOLUME_CREATED': ('volume',),
1707
1708=== modified file 'ubuntuone/syncdaemon/main.py'
1709--- ubuntuone/syncdaemon/main.py 2010-03-19 14:28:40 +0000
1710+++ ubuntuone/syncdaemon/main.py 2010-07-22 13:39:48 +0000
1711@@ -233,9 +233,7 @@
1712
1713 def server_rescan(self):
1714 """Do the server rescan."""
1715- mdobj = self.fs.get_by_path(self.root_dir)
1716- self.action_q.server_rescan(mdobj.mdid,
1717- self.fs.get_data_for_server_rescan)
1718+ return self.vm.server_rescan()
1719
1720 def set_oauth_token(self, key, secret):
1721 """ Sets the oauth token """
1722
1723=== modified file 'ubuntuone/syncdaemon/sync.py'
1724--- ubuntuone/syncdaemon/sync.py 2010-07-20 16:17:58 +0000
1725+++ ubuntuone/syncdaemon/sync.py 2010-07-22 13:39:48 +0000
1726@@ -24,11 +24,9 @@
1727 import sys
1728
1729 from ubuntuone.syncdaemon.marker import MDMarker
1730-from ubuntuone.storageprotocol.dircontent_pb2 import DIRECTORY
1731-from ubuntuone.storageprotocol import dircontent
1732+from ubuntuone.storageprotocol import delta
1733 from ubuntuone.syncdaemon.fsm.fsm import \
1734 StateMachineRunner, StateMachine
1735-from ubuntuone.syncdaemon.interfaces import IMarker
1736 from ubuntuone.syncdaemon import u1fsfsm
1737 from ubuntuone.syncdaemon.logger import DebugCapture
1738 from ubuntuone.syncdaemon.filesystem_manager import InconsistencyError
1739@@ -309,6 +307,7 @@
1740 with DebugCapture(self.log.logger):
1741 func_name = super(SyncStateMachineRunner, self).on_event(*args,
1742 **kwargs)
1743+
1744 if not is_debug:
1745 self.log.info("Called %s (In: %s)" % (func_name, in_state))
1746
1747@@ -362,8 +361,18 @@
1748 is_directory=self.key.is_directory(),
1749 )
1750
1751- def check_new_volume_generation(self, volume_id, new_generation):
1752- """Check the new generation for the volume and take proper action."""
1753+ def update_generation(self, volume_id, node_id, new_generation):
1754+ """Update the generation for the node and volume."""
1755+
1756+ # update the file
1757+ try:
1758+ node = self.m.fs.get_by_node_id(volume_id, node_id)
1759+ except KeyError:
1760+ pass
1761+ else:
1762+ self.m.fs.set_by_mdid(node.mdid, generation=new_generation)
1763+
1764+ # update the volume
1765 try:
1766 volume = self.m.vm.get_volume(volume_id)
1767 except VolumeDoesNotExist:
1768@@ -405,7 +414,6 @@
1769 path = os.path.join(self.m.fs.get_abspath(share_id, mdobj.path), name)
1770 self.m.fs.create(path=path, share_id=share_id, node_id=node_id,
1771 is_dir=True)
1772- self.m.action_q.query([(share_id, node_id, "")])
1773 # pylint: disable-msg=W0704
1774 # this should be provided by FSM, fix!!
1775 try:
1776@@ -439,30 +447,6 @@
1777 self.m.fs.set_node_id(self.key['path'], node_id)
1778 self.reget_dir(event, params)
1779
1780- def reget_dir(self, event, params, hash=None):
1781- """Reget the directory."""
1782- self.m.action_q.cancel_download(share_id=self.key['share_id'],
1783- node_id=self.key['node_id'])
1784- self.key.remove_partial()
1785- self.m.action_q.query([(self.key['share_id'],
1786- self.key['node_id'],
1787- self.key['local_hash'] or "")])
1788- self.key.set(server_hash=self.key['local_hash'])
1789- self.key.sync()
1790-
1791- def get_dir(self, event, params, hash):
1792- """Get the directory."""
1793- self.key.set(server_hash=hash)
1794- self.key.sync()
1795- self.m.fs.create_partial(node_id=self.key['node_id'],
1796- share_id=self.key['share_id'])
1797- self.m.action_q.listdir(
1798- self.key['share_id'], self.key['node_id'], hash,
1799- lambda : self.m.fs.get_partial_for_writing(
1800- node_id=self.key['node_id'],
1801- share_id=self.key['share_id'])
1802- )
1803-
1804 def file_conflict(self, event, params, hash, crc32, size, stat):
1805 """This file is in conflict."""
1806 self.key.move_to_conflict()
1807@@ -474,92 +458,6 @@
1808 node_id=self.key['node_id'])
1809 self.get_file(event, params, hash)
1810
1811- def merge_directory(self, event, params, hash):
1812- """Merge the server directory with the local one."""
1813- new_files = []
1814- new_dirs = []
1815- deleted_filedirs = []
1816- moved = set()
1817-
1818- try:
1819- fd = self.m.fs.get_partial(node_id=self.key['node_id'],
1820- share_id=self.key['share_id'])
1821- except InconsistencyError:
1822- self.key.remove_partial()
1823- self.key.set(server_hash=self.key['local_hash'])
1824- self.key.sync()
1825- self.m.action_q.query([
1826- (self.key["share_id"], self.key["node_id"], "")])
1827- # we dont perform the merge, we try to re get it
1828- return
1829-
1830-
1831- items = dircontent.parse_dir_content(fd)
1832- server_dir = [ (o.utf8_name, o.node_type == DIRECTORY, o.uuid)
1833- for o in items ]
1834- client_dir = self.m.fs.dir_content(self.key['path'])
1835- # XXX: lucio.torre: with huge dirs, this could take a while
1836-
1837- share = self.key['share_id']
1838- for name, isdir, uuid in server_dir:
1839- # we took the name as bytes already encoded in utf8
1840- # directly from dircontent!
1841- try:
1842- md = self.m.fs.get_by_node_id(share, uuid)
1843- except KeyError:
1844- # not there, a new thing
1845- if isdir:
1846- new_dirs.append((share, uuid, name))
1847- else:
1848- new_files.append((share, uuid, name))
1849- continue
1850- mdpath = self.m.fs.get_abspath(md.share_id, md.path)
1851- if mdpath != os.path.join(self.key['path'], name):
1852- # this was moved, or maybe the server still didn't receive
1853- # the move that happened here
1854- if not self.m.action_q.node_is_with_queued_move(share, uuid):
1855- # mark as moved
1856- moved.add(uuid)
1857- # signal moved
1858- self.m.event_q.push("SV_MOVED",
1859- share_id=md.share_id, node_id=uuid,
1860- new_share_id=share, new_parent_id=self.key['node_id'],
1861- new_name=name)
1862-
1863-
1864- for name, isdir, uuid in client_dir:
1865- if uuid is None:
1866- continue
1867-
1868- if not (name, isdir, uuid) in server_dir:
1869- # not there, a its gone on the server
1870- if uuid in moved:
1871- # this was a move, dont delete
1872- continue
1873- deleted_filedirs.append((share, uuid))
1874-
1875- # these nodes are in process of being deleted from the server, so they
1876- # are not exactly new locally (were already deleted here)
1877- trash = set((share_id, node_id) for (share_id, node_id, _)
1878- in self.m.fs.get_iter_trash())
1879-
1880- parent_uuid = self.key['node_id']
1881- for share, uuid in deleted_filedirs:
1882- self.m.event_q.push("SV_FILE_DELETED",
1883- node_id=uuid, share_id=share)
1884- for share, uuid, name in new_files:
1885- if (share, uuid) not in trash:
1886- self.m.event_q.push("SV_FILE_NEW", parent_id=parent_uuid,
1887- node_id=uuid, share_id=share, name=name)
1888- for share, uuid, name in new_dirs:
1889- if (share, uuid) not in trash:
1890- self.m.event_q.push("SV_DIR_NEW", parent_id=parent_uuid,
1891- node_id=uuid, share_id=share, name=name)
1892-
1893- self.key.remove_partial()
1894- self.key.set(local_hash=hash)
1895- self.key.sync()
1896-
1897 def new_file(self, event, params, share_id, node_id, parent_id, name):
1898 """create a local file."""
1899 mdobj = self.m.fs.get_by_node_id(share_id, parent_id)
1900@@ -569,7 +467,6 @@
1901 self.key.set(server_hash="")
1902 self.key.set(local_hash="")
1903 self.key.sync()
1904- self.m.action_q.query([(share_id, node_id, "")])
1905
1906 def new_file_on_server_with_local(self, event, params, share_id,
1907 node_id, parent_id, name):
1908@@ -616,15 +513,6 @@
1909 new_parent_id=new_parent_id, new_name=new_name)
1910 self.key.moved(self.key['share_id'], path_to)
1911
1912- # this is cheating, we change the state of another node
1913- if not IMarker.providedBy(old_parent_id):
1914- share_id = self.key['share_id']
1915- self.m.action_q.cancel_download(share_id, old_parent_id)
1916- old_parent.remove_partial()
1917- self.m.fs.set_by_node_id(old_parent_id, share_id,
1918- server_hash="", local_hash="")
1919- self.m.action_q.query([(share_id, old_parent_id, "")])
1920-
1921 # we only hash it if we're a file, not a directory
1922 if not self.key.is_dir():
1923 self.m.hash_q.insert(self.key['path'], self.key['mdid'])
1924@@ -649,8 +537,7 @@
1925 self.key.remove_partial()
1926 self.key.set(server_hash=self.key['local_hash'])
1927 self.key.sync()
1928- self.m.action_q.query([
1929- (self.key["share_id"], self.key["node_id"], "")])
1930+
1931
1932 def new_local_file(self, event, parms, path):
1933 """a new local file was created"""
1934@@ -690,10 +577,6 @@
1935 """Server answered that dir creation was ok."""
1936 self.m.fs.set_node_id(self.key['path'], new_id)
1937
1938- # query to get any updates in case dir was already there in server
1939- self.m.action_q.query([(self.key['share_id'], self.key['node_id'],
1940- self.key['local_hash'])])
1941-
1942 def calculate_hash(self, event, params):
1943 """calculate the hash of this."""
1944 self.m.hash_q.insert(self.key['path'], self.key['mdid'])
1945@@ -809,12 +692,8 @@
1946 def dir_not_created_in_server(self, event, params, error):
1947 """Re-get the dir if it was because already exist."""
1948 if error == "ALREADY_EXISTS":
1949- # delete metadata and query the father to converge
1950- parent = self.m.fs.get_by_node_id(self.key['share_id'],
1951- self.key['parent_id'])
1952+ # delete metadata
1953 self.key.delete_metadata()
1954- self.m.action_q.query([(parent.share_id,
1955- parent.node_id, parent.local_hash)])
1956 else:
1957 self.key.move_to_conflict()
1958 self.key.delete_metadata()
1959@@ -890,9 +769,6 @@
1960 self.key.remove_partial()
1961 self.key.set(server_hash=self.key['local_hash'])
1962 self.key.sync()
1963- self.m.action_q.query([(self.key['share_id'],
1964- self.key['node_id'],
1965- self.key['local_hash'] or "")])
1966
1967 # pylint: disable-msg=C0103
1968 def DESPAIR(self, event, params, *args, **kwargs):
1969@@ -927,14 +803,14 @@
1970 self.m = main
1971 self.m.event_q.subscribe(self)
1972
1973- def handle_SV_HASH_NEW(self, share_id, node_id, hash):
1974- """on SV_HASH_NEW"""
1975+ def _handle_SV_HASH_NEW(self, share_id, node_id, hash):
1976+ """on SV_HASH_NEW. No longer called by EQ, only internally."""
1977 key = FSKey(self.m.fs, share_id=share_id, node_id=node_id)
1978 log = FileLogger(self.logger, key)
1979 ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log)
1980 ssmr.signal_event_with_hash("SV_HASH_NEW", hash)
1981
1982- def handle_SV_FILE_NEW(self, share_id, node_id, parent_id, name):
1983+ def _handle_SV_FILE_NEW(self, share_id, node_id, parent_id, name):
1984 """on SV_FILE_NEW"""
1985 parent = FSKey(self.m.fs, share_id=share_id, node_id=parent_id)
1986 path = os.path.join(parent["path"], name)
1987@@ -943,7 +819,7 @@
1988 ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log)
1989 ssmr.on_event("SV_FILE_NEW", {}, share_id, node_id, parent_id, name)
1990
1991- def handle_SV_DIR_NEW(self, share_id, node_id, parent_id, name):
1992+ def _handle_SV_DIR_NEW(self, share_id, node_id, parent_id, name):
1993 """on SV_DIR_NEW"""
1994 parent = FSKey(self.m.fs, share_id=share_id, node_id=parent_id)
1995 path = os.path.join(parent["path"], name)
1996@@ -952,8 +828,8 @@
1997 ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log)
1998 ssmr.on_event("SV_DIR_NEW", {}, share_id, node_id, parent_id, name)
1999
2000- def handle_SV_FILE_DELETED(self, share_id, node_id):
2001- """on SV_FILE_DELETED"""
2002+ def _handle_SV_FILE_DELETED(self, share_id, node_id):
2003+ """on SV_FILE_DELETED. Not called by EQ anymore."""
2004 key = FSKey(self.m.fs, share_id=share_id, node_id=node_id)
2005 log = FileLogger(self.logger, key)
2006 ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log)
2007@@ -1035,8 +911,8 @@
2008 key = FSKey(self.m.fs, mdid=marker)
2009 log = FileLogger(self.logger, key)
2010 ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log)
2011- ssmr.check_new_volume_generation(volume_id, new_generation)
2012 ssmr.on_event("AQ_FILE_NEW_OK", {}, new_id)
2013+ ssmr.update_generation(volume_id, new_id, new_generation)
2014
2015 def handle_AQ_FILE_NEW_ERROR(self, marker, error):
2016 """on AQ_FILE_NEW_ERROR"""
2017@@ -1057,8 +933,8 @@
2018 key = FSKey(self.m.fs, mdid=marker)
2019 log = FileLogger(self.logger, key)
2020 ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log)
2021- ssmr.check_new_volume_generation(volume_id, new_generation)
2022 ssmr.on_event("AQ_DIR_NEW_OK", {}, new_id)
2023+ ssmr.update_generation(volume_id, new_id, new_generation)
2024
2025 def handle_FS_FILE_CLOSE_WRITE(self, path):
2026 """on FS_FILE_CLOSE_WRITE"""
2027@@ -1082,7 +958,7 @@
2028 ssmr.on_event('HQ_HASH_ERROR', {})
2029
2030 def handle_HQ_HASH_NEW(self, path, hash, crc32, size, stat):
2031- """on HQ_HASH_NEW"""
2032+ """on HQ_HASH_NEW."""
2033 key = FSKey(self.m.fs, path=path)
2034 log = FileLogger(self.logger, key)
2035 ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log)
2036@@ -1104,8 +980,8 @@
2037 key = FSKey(self.m.fs, share_id=share_id, node_id=node_id)
2038 log = FileLogger(self.logger, key)
2039 ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log)
2040- ssmr.check_new_volume_generation(share_id, new_generation)
2041 ssmr.signal_event_with_hash("AQ_UPLOAD_FINISHED", hash)
2042+ ssmr.update_generation(share_id, node_id, new_generation)
2043
2044 def handle_AQ_UPLOAD_ERROR(self, share_id, node_id, error, hash):
2045 """on AQ_UPLOAD_ERROR"""
2046@@ -1114,7 +990,7 @@
2047 ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log)
2048 ssmr.signal_event_with_error_and_hash("AQ_UPLOAD_ERROR", error, hash)
2049
2050- def handle_SV_MOVED(self, share_id, node_id, new_share_id, new_parent_id,
2051+ def _handle_SV_MOVED(self, share_id, node_id, new_share_id, new_parent_id,
2052 new_name):
2053 """on SV_MOVED"""
2054 key = FSKey(self.m.fs, share_id=share_id, node_id=node_id)
2055@@ -1129,8 +1005,8 @@
2056 key = FSKey(self.m.fs, share_id=share_id, node_id=node_id)
2057 log = FileLogger(self.logger, key)
2058 ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log)
2059- ssmr.check_new_volume_generation(share_id, new_generation)
2060 ssmr.on_event("AQ_UNLINK_OK", {}, share_id, node_id)
2061+ ssmr.update_generation(share_id, node_id, new_generation)
2062
2063 def handle_AQ_UNLINK_ERROR(self, share_id, parent_id, node_id, error):
2064 """on AQ_UNLINK_ERROR"""
2065@@ -1144,5 +1020,102 @@
2066 key = FSKey(self.m.fs, share_id=share_id, node_id=node_id)
2067 log = FileLogger(self.logger, key)
2068 ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log)
2069- ssmr.check_new_volume_generation(share_id, new_generation)
2070-
2071+ ssmr.update_generation(share_id, node_id, new_generation)
2072+
2073+ def handle_AQ_DELTA_OK(self, volume_id, delta_content, end_generation,
2074+ full, free_bytes):
2075+ """We got a new delta. Apply item by item of the delta.
2076+
2077+ Unlike other Sync methods that just defer to Sync State Machine Runner,
2078+ delta operations sync state in a different way. So this method has a
2079+ lot of logic that you will not see in the other event handlers.
2080+ """
2081+
2082+ for dt in delta_content:
2083+ # we only know how to update files
2084+ try:
2085+ if not isinstance(dt, delta.FileInfoDelta):
2086+ continue
2087+
2088+ # we only support files and directories
2089+ if dt.file_type == delta.DIRECTORY:
2090+ is_dir = True
2091+ elif dt.file_type == delta.FILE:
2092+ is_dir = False
2093+ else:
2094+ self.logger.warn("Unknown file type: %r" , dt.file_type)
2095+ continue
2096+
2097+ # if the node is dead, call the remove handler and forget
2098+ # about it
2099+ if not dt.is_live:
2100+ self._handle_SV_FILE_DELETED(dt.share_id, dt.node_id)
2101+ # nothing else should happen with this
2102+ continue
2103+
2104+ # here we must call handlers for SV_HASH_NEW, SV_MOVED
2105+
2106+ try:
2107+ node = self.m.fs.get_by_node_id(dt.share_id, dt.node_id)
2108+ except KeyError:
2109+ # node not there, we must create it
2110+ if is_dir:
2111+ self._handle_SV_DIR_NEW(dt.share_id, dt.node_id,
2112+ dt.parent_id, dt.name)
2113+ else:
2114+ self._handle_SV_FILE_NEW(dt.share_id, dt.node_id,
2115+ dt.parent_id, dt.name)
2116+ node = self.m.fs.get_by_node_id(dt.share_id, dt.node_id)
2117+
2118+ # if the delta is older than the node, skip!
2119+ if node.generation > dt.generation:
2120+ continue
2121+
2122+ # if the path changed, we have a move, notify it
2123+ node_parent_id = self.m.fs.get_by_path(
2124+ self.m.fs.get_abspath(node.share_id,
2125+ os.path.dirname(node.path))
2126+ ).node_id
2127+ node_name = os.path.basename(node.path)
2128+ if dt.parent_id != node_parent_id or dt.name != node_name:
2129+ # this was moved, or maybe the server still didn't receive
2130+ # the move that happened here
2131+ if not self.m.action_q.node_is_with_queued_move(
2132+ dt.share_id, dt.node_id):
2133+ # signal moved
2134+ self._handle_SV_MOVED(
2135+ share_id=node.share_id, node_id=node.node_id,
2136+ new_share_id=dt.share_id,
2137+ new_parent_id=dt.parent_id, new_name=dt.name)
2138+ else:
2139+ self.logger.info("Not calling _handle_SV_MOVED for "
2140+ "%s:%s due to pending move. "
2141+ "(old parent = %s, new_parent = %s "
2142+ "old_name = %s, new_name = %s)",
2143+ node.share_id, node.node_id,
2144+ node_parent_id, dt.parent_id,
2145+ node_name, dt.name)
2146+
2147+
2148+ # if its a dir, theres nothing else that we do with them except
2149+ # creating them.
2150+ # if its a file, we only care about the hash
2151+ if not is_dir:
2152+ self._handle_SV_HASH_NEW(dt.share_id, dt.node_id,
2153+ dt.content_hash)
2154+
2155+ # node updated, update generation
2156+ self.m.fs.set_by_mdid(node.mdid, generation=dt.generation)
2157+ except Exception:
2158+ # we trap all exceptions so we can continue processing deltas
2159+ # even if something fails for one file. We just make sure
2160+ # we log.
2161+ self.logger.exception(
2162+ "Node delta for %s:%s can't be applied.""",
2163+ dt.share_id, dt.node_id)
2164+
2165+ self.m.vm.update_free_space(volume_id, free_bytes)
2166+ self.m.vm.update_generation(volume_id, end_generation)
2167+
2168+ if not full:
2169+ self.m.action_q.get_delta(volume_id, end_generation)
2170
2171=== modified file 'ubuntuone/syncdaemon/u1fsfsm.ods'
2172Binary files ubuntuone/syncdaemon/u1fsfsm.ods 2010-05-06 18:07:50 +0000 and ubuntuone/syncdaemon/u1fsfsm.ods 2010-07-22 13:39:48 +0000 differ
2173=== modified file 'ubuntuone/syncdaemon/u1fsfsm.py'
2174--- ubuntuone/syncdaemon/u1fsfsm.py 2010-05-06 18:07:50 +0000
2175+++ ubuntuone/syncdaemon/u1fsfsm.py 2010-07-22 13:39:48 +0000
2176@@ -704,8 +704,8 @@
2177 u'has_metadata': u'=',
2178 u'is_directory': u'='}},
2179 {'ACTION': u'pass',
2180- 'ACTION_FUNC': u'nothing',
2181- 'COMMENTS': u'spurius download finished, ignore',
2182+ 'ACTION_FUNC': u'DESPAIR',
2183+ 'COMMENTS': u'we dont download directories anymore',
2184 'PARAMETERS': {u'hash_eq_local_hash': u'!NA',
2185 u'hash_eq_server_hash': u'!NA',
2186 u'not_authorized': u'NA',
2187@@ -717,8 +717,8 @@
2188 u'has_metadata': u'=',
2189 u'is_directory': u'='}},
2190 {'ACTION': u'merge_from_partial(uuid)',
2191- 'ACTION_FUNC': u'merge_directory',
2192- 'COMMENTS': u'this is the vanilla case, call merge directories',
2193+ 'ACTION_FUNC': u'DESPAIR',
2194+ 'COMMENTS': u'we dont download directories anymore',
2195 'PARAMETERS': {u'hash_eq_local_hash': u'!NA',
2196 u'hash_eq_server_hash': u'T',
2197 u'not_authorized': u'NA',
2198@@ -730,8 +730,8 @@
2199 u'has_metadata': u'=',
2200 u'is_directory': u'='}},
2201 {'ACTION': u'pass',
2202- 'ACTION_FUNC': u'nothing',
2203- 'COMMENTS': u'download we are not interested in',
2204+ 'ACTION_FUNC': u'DESPAIR',
2205+ 'COMMENTS': u'we dont download directories anymore',
2206 'PARAMETERS': {u'hash_eq_local_hash': u'!NA',
2207 u'hash_eq_server_hash': u'F',
2208 u'not_authorized': u'NA',
2209@@ -2485,8 +2485,8 @@
2210 u'has_metadata': u'T',
2211 u'is_directory': u'='}},
2212 {'ACTION': u'pass',
2213- 'ACTION_FUNC': u'nothing',
2214- 'COMMENTS': u'no news is good news',
2215+ 'ACTION_FUNC': u'DESPAIR',
2216+ 'COMMENTS': u'cant set hash on directories',
2217 'PARAMETERS': {u'hash_eq_local_hash': u'!NA',
2218 u'hash_eq_server_hash': u'T',
2219 u'not_authorized': u'NA',
2220@@ -2498,8 +2498,8 @@
2221 u'has_metadata': u'=',
2222 u'is_directory': u'='}},
2223 {'ACTION': u'md.set(uuid, server_hash=hash)\npartial = md.create_partial(uuid)\naq.getcontent(*partial)',
2224- 'ACTION_FUNC': u'get_dir',
2225- 'COMMENTS': u'normal case',
2226+ 'ACTION_FUNC': u'DESPAIR',
2227+ 'COMMENTS': u'cant set hash on directories',
2228 'PARAMETERS': {u'hash_eq_local_hash': u'!NA',
2229 u'hash_eq_server_hash': u'F',
2230 u'not_authorized': u'NA',
2231@@ -2511,8 +2511,8 @@
2232 u'has_metadata': u'=',
2233 u'is_directory': u'='}},
2234 {'ACTION': u'aq.cancel_download(uuid) \nmd.set(uuid, server_hash=hash)',
2235- 'ACTION_FUNC': u'nothing',
2236- 'COMMENTS': u'A download for a content object with the same hash is already in progress',
2237+ 'ACTION_FUNC': u'DESPAIR',
2238+ 'COMMENTS': u'cant set hash on directories',
2239 'PARAMETERS': {u'hash_eq_local_hash': u'!NA',
2240 u'hash_eq_server_hash': u'T',
2241 u'not_authorized': u'NA',
2242@@ -2524,8 +2524,8 @@
2243 u'has_metadata': u'=',
2244 u'is_directory': u'='}},
2245 {'ACTION': u'aq.cancel_download(uuid)\nmd.set(uuid, server_hash=hash)\npartial = md.get_partial(uuid)\naq.getcontent(*partial)',
2246- 'ACTION_FUNC': u'reget_dir',
2247- 'COMMENTS': u'a download was in progress but the server changed again. Note that this makes it important for AQ_DOWNLOAD_FINISHED to check the server hash.',
2248+ 'ACTION_FUNC': u'DESPAIR',
2249+ 'COMMENTS': u'cant set hash on directories',
2250 'PARAMETERS': {u'hash_eq_local_hash': u'!NA',
2251 u'hash_eq_server_hash': u'F',
2252 u'not_authorized': u'NA',
2253
2254=== modified file 'ubuntuone/syncdaemon/volume_manager.py'
2255--- ubuntuone/syncdaemon/volume_manager.py 2010-07-19 16:03:28 +0000
2256+++ ubuntuone/syncdaemon/volume_manager.py 2010-07-22 13:39:48 +0000
2257@@ -110,6 +110,10 @@
2258 self.node_id == other.node_id)
2259 return result
2260
2261+ def __repr__(self):
2262+ return "<Volume id %r, node_id %r, generation %r>" % (self.volume_id,
2263+ self.node_id,
2264+ self.generation)
2265
2266 class Share(Volume):
2267 """A volume representing a Share."""
2268@@ -251,7 +255,7 @@
2269 self.subscribed = subscribed
2270
2271 def __repr__(self):
2272- return "<UDF id %r, real path %r>" % (self.id, self.path)
2273+ return "<UDF id %r, generation %r, real path %r>" % (self.id, self.generation, self.path)
2274
2275 @property
2276 def ancestors(self):
2277@@ -393,15 +397,6 @@
2278 self.m.fs.create(path=root.path,
2279 share_id=root.volume_id, is_dir=True)
2280
2281- def handle_SYS_ROOT_RECEIVED(self, root_id):
2282- """Got the root, map it to the root share."""
2283- self.log.debug('handle_SYS_ROOT_RECEIVED(%s)', root_id)
2284- self._got_root(root_id)
2285- self.m.action_q.inquire_account_info()
2286- self.m.action_q.inquire_free_space(request.ROOT)
2287- self.refresh_volumes()
2288- self.refresh_shares()
2289-
2290 def _got_root(self, node_id, free_bytes=None):
2291 """Set the root node_id to the root share and mdobj."""
2292 # only set the root if we don't have it
2293@@ -414,8 +409,12 @@
2294 self.m.fs.set_node_id(root.path, node_id)
2295 root.node_id = node_id
2296 self.shares[request.ROOT] = root
2297+ self.m.event_q.push('SYS_ROOT_RECEIVED', root.node_id)
2298 elif root.node_id != node_id:
2299 self.m.event_q.push('SYS_ROOT_MISMATCH', root.node_id, node_id)
2300+ else:
2301+ # the root node_id match and we already have it
2302+ self.m.event_q.push('SYS_ROOT_RECEIVED', root.node_id)
2303
2304 def refresh_shares(self):
2305 """Request the list of shares to the server."""
2306@@ -439,6 +438,43 @@
2307 udfs.append(vol.volume_id)
2308 self._cleanup_volumes(shares=shares, udfs=udfs)
2309
2310+ def handle_SV_VOLUME_NEW_GENERATION(self, volume_id, generation):
2311+ """Handle SV_VOLUME_NEW_GENERATION."""
2312+ self.log.debug('handle_SV_VOLUME_NEW_GENERATION(%r, %r)',
2313+ volume_id, generation)
2314+ volume_id = str(volume_id) # be safe and use a str
2315+ try:
2316+ volume = self.get_volume(volume_id)
2317+ except VolumeDoesNotExist:
2318+ self.log.warning('Got a SV_VOLUME_NEW_GENERATION for a missing '
2319+ 'volume: %r with %r', volume_id, generation)
2320+ else:
2321+ current_gen = volume.generation
2322+ if current_gen is None:
2323+ self.m.action_q.rescan_from_scratch(volume_id)
2324+ elif current_gen < generation:
2325+ # XXX: check if we want to impose a hard limit in the size of
2326+ # the delta and do a rescan from scratch if it's too big
2327+ self.m.action_q.get_delta(volume_id, current_gen)
2328+ # TODO/XXX: should we (VM) handle AQ_DELTA_ERROR?
2329+ elif current_gen >= generation:
2330+ self.log.info('Got SV_VOLUME_NEW_GENERATION(%r, %r) but volume'
2331+ ' is at generation: %r',
2332+ volume_id, generation, current_gen)
2333+
2334+ def handle_AQ_DELTA_NOT_POSSIBLE(self, volume_id):
2335+ """Handle AQ_DELTA_NOT_POSSIBLE."""
2336+ self.log.debug('handle_AQ_DELTA_NOT_POSSIBLE(%r)', volume_id)
2337+ volume_id = str(volume_id)
2338+ try:
2339+ volume = self.get_volume(volume_id)
2340+ except VolumeDoesNotExist:
2341+ self.log.warning('Got a AQ_DELTA_NOT_POSSIBLE for a missing '
2342+ 'volume: %r', volume_id)
2343+ else:
2344+ self.log.info('Requesting a rescan from scratch for: %s', volume)
2345+ self.m.action_q.rescan_from_scratch(volume_id)
2346+
2347 def server_rescan(self):
2348 """Do the 'server rescan'"""
2349 d = self.m.action_q.query_volumes()
2350@@ -453,7 +489,7 @@
2351
2352 def _volumes_rescan_cb(self, volumes):
2353 """Handle the volumes list for server rescan"""
2354- self.log.debug('handling volumes rescan')
2355+ self.log.debug('handling volumes rescan: %r', volumes)
2356 events = []
2357 for new_volume in volumes:
2358 # TODO: all this block might need to be inside a try:except
2359@@ -465,14 +501,13 @@
2360 # A new volume!
2361 self.log.debug('New volume! id: %r', volume_id)
2362 volume = self._handle_new_volume(new_volume)
2363- # we don't have the volume, use the initial generation
2364- current_generation = 0
2365- else:
2366- current_generation = volume.generation or 0 # None case
2367+ finally:
2368+ current_generation = volume.generation
2369 new_generation = new_volume.generation
2370- # update the free_bytes on the volume
2371- self.update_free_space(volume_id, new_volume.free_bytes)
2372- if current_generation < new_generation:
2373+ self.log.debug('%s current gen: %s vs new gen: %s', volume,
2374+ current_generation, new_generation)
2375+ if current_generation is None or \
2376+ current_generation < new_generation:
2377 # add the event
2378 events.append(('SV_VOLUME_NEW_GENERATION',
2379 dict(volume_id=volume_id,
2380@@ -668,31 +703,36 @@
2381 self.shared[share_id] = share
2382
2383 def handle_AQ_ANSWER_SHARE_OK(self, share_id, answer):
2384- """ Handle successfully accepting a share """
2385+ """Handle successfully accepting a share."""
2386 if answer == 'Yes':
2387 share = self.shares[share_id]
2388+ share.accepted = True
2389+ self.shares[share.volume_id] = share
2390 self._create_fsm_object(share.path, share.volume_id, share.node_id)
2391 self._create_share_dir(share)
2392- self.m.action_q.query([(share.volume_id, str(share.node_id), "")])
2393- self.m.action_q.inquire_free_space(share.volume_id)
2394+ # request the delta from the last known generation (should be None)
2395+ self.m.action_q.get_delta(share.volume_id, share.generation)
2396
2397 def add_share(self, a_share):
2398 """ Add a share to the share list, and creates the fs mdobj. """
2399 self.log.info('Adding new share with id: %s - path: %r',
2400 a_share.volume_id, a_share.path)
2401- # if the share is there, do nothing and return it
2402 share = self.shares.get(a_share.volume_id)
2403 is_new_share = share is None
2404 if share is not None:
2405 share.accepted = a_share.accepted
2406 self.shares[share.volume_id] = share
2407 else:
2408- share = self.shares[share.volume_id] = a_share
2409+ share = a_share
2410+ if is_new_share or not share.accepted:
2411+ # if it's a new share or isn't accepted set the generation to None
2412+ # to force a rescan
2413+ share.generation = None
2414+ # store the share
2415+ self.shares[share.volume_id] = share
2416 if share.accepted:
2417 self._create_fsm_object(share.path, share.volume_id, share.node_id)
2418 self._create_share_dir(share)
2419- self.m.action_q.query([(share.volume_id, str(share.node_id), "")])
2420- self.m.action_q.inquire_free_space(share.volume_id)
2421 if is_new_share:
2422 # push the event only if it's a new share
2423 self.m.event_q.push('VM_SHARE_CREATED', share.volume_id)
2424@@ -852,12 +892,15 @@
2425 """Add the udf to the VM metadata if isn't there.
2426
2427 If it's a new udf, create the directory, hook inotify
2428- and execute a query.
2429+ and request the full delta.
2430
2431 """
2432 self.log.debug('add_udf: %s', udf)
2433 if self.udfs.get(udf.volume_id, None) is None:
2434 self.log.debug('udf not in metadata, adding it!')
2435+ # if it's a new UDF set the generation to None to force a
2436+ # rescan
2437+ udf.generation = None
2438 self.udfs[udf.volume_id] = udf
2439 self._create_fsm_object(udf.path, udf.volume_id, udf.node_id)
2440 # local and server rescan, this will add the inotify hooks
2441@@ -1063,10 +1106,8 @@
2442 mdobj = self.m.fs.get_by_path(udf.path)
2443 d = self.m.lr.scan_dir(mdobj.mdid, udf.path, udfmode=True)
2444 def server_rescan(_):
2445- """Do a query over all known nodes."""
2446- data = self.m.fs.get_for_server_rescan_by_path(udf.path)
2447- self.m.action_q.query(data)
2448- self.m.action_q.inquire_free_space(request.ROOT)
2449+ """Request the delta from the last known generation."""
2450+ self.m.action_q.get_delta(udf.id, udf.generation)
2451 d.addCallback(server_rescan)
2452 return d
2453
2454@@ -1140,6 +1181,7 @@
2455
2456 def update_generation(self, volume_id, generation):
2457 """Update the generation of the specified volume."""
2458+ self.log.debug('update_generation: %r, %r', volume_id, generation)
2459 vol = self.get_volume(volume_id)
2460 vol.generation = generation
2461 if isinstance(vol, (Share, Root)):

Subscribers

People subscribed via source and target branches