Merge lp:~nataliabidart/ubuntuone-client/share-subscription-ops into lp:ubuntuone-client

Proposed by Natalia Bidart
Status: Merged
Approved by: Natalia Bidart
Approved revision: 837
Merged at revision: 835
Proposed branch: lp:~nataliabidart/ubuntuone-client/share-subscription-ops
Merge into: lp:ubuntuone-client
Prerequisite: lp:~nataliabidart/ubuntuone-client/add-subscribed-to-shares
Diff against target: 437 lines (+331/-34)
3 files modified
tests/syncdaemon/test_vm.py (+226/-0)
ubuntuone/syncdaemon/event_queue.py (+4/-0)
ubuntuone/syncdaemon/volume_manager.py (+101/-34)
To merge this branch: bzr merge lp:~nataliabidart/ubuntuone-client/share-subscription-ops
Reviewer Review Type Date Requested Status
dobey (community) Approve
Roberto Alsina (community) Approve
Review via email: mp+48170@code.launchpad.net

Commit message

- Shares can be subscribed to/unsubscribed from (part of LP: #708335).

To post a comment you must log in.
833. By Natalia Bidart

Changes from dependency branch.

834. By Natalia Bidart

Merged trunk in.

835. By Natalia Bidart

Merged trunk in.

836. By Natalia Bidart

Changes from dependency branch.

Revision history for this message
Roberto Alsina (ralsina) wrote :

+1 looks good to me.

Revision history for this message
Roberto Alsina (ralsina) :
review: Approve
837. By Natalia Bidart

Merged trunk in.

Revision history for this message
dobey (dobey) :
review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'tests/syncdaemon/test_vm.py'
2--- tests/syncdaemon/test_vm.py 2011-02-01 14:24:41 +0000
3+++ tests/syncdaemon/test_vm.py 2011-02-03 12:52:26 +0000
4@@ -983,6 +983,232 @@
5 self.assertEqual(set(defined_args[1:]), set(evtargs))
6
7
8+class ViewSharesSubscriptionTests(BaseVolumeManagerTests):
9+ """Test Shares subscription operations when access_level is View."""
10+
11+ access_level = 'View'
12+
13+ @defer.inlineCallbacks
14+ def test_subscribe_share(self):
15+ """Test subscribe_share method."""
16+ share = self._create_share(access_level=self.access_level,
17+ subscribed=False)
18+ yield self.vm.add_share(share)
19+ self.assertFalse(self.vm.shares[share.volume_id].subscribed)
20+ # subscribe to it
21+ yield self.vm.subscribe_share(share.volume_id)
22+ self.assertTrue(self.vm.shares[share.volume_id].subscribed)
23+
24+ @defer.inlineCallbacks
25+ def test_subscribe_share_missing_path(self):
26+ """Test subscribe_share with a missing path."""
27+ share = self._create_share(access_level=self.access_level,
28+ subscribed=False)
29+ yield self.vm.add_share(share)
30+ self.assertFalse(os.path.exists(share.path))
31+ self.assertFalse(self.vm.shares[share.id].subscribed)
32+ # subscribe to it
33+ yield self.vm.subscribe_share(share.id)
34+ self.assertTrue(self.vm.shares[share.id].subscribed)
35+ self.assertTrue(os.path.exists(share.path))
36+
37+ @defer.inlineCallbacks
38+ def test_subscribe_share_missing_volume(self):
39+ """Test subscribe_share with an invalid volume_id."""
40+ try:
41+ yield self.vm.subscribe_share('invalid_share_id')
42+ except VolumeDoesNotExist, e:
43+ self.assertEquals('DOES_NOT_EXIST', e.args[0])
44+ self.assertEquals('invalid_share_id', e.args[1])
45+ else:
46+ self.fail('Must get a VolumeDoesNotExist!')
47+
48+ @defer.inlineCallbacks
49+ def test_unsubscribe_share(self):
50+ """Test unsubscribe_share method."""
51+ share = self._create_share(access_level=self.access_level,
52+ subscribed=True)
53+ yield self.vm.add_share(share)
54+ self.assertTrue(self.vm.shares[share.volume_id].subscribed)
55+ # unsubscribe from it
56+ self.vm.unsubscribe_share(share.volume_id)
57+ self.assertFalse(self.vm.shares[share.volume_id].subscribed)
58+
59+ @defer.inlineCallbacks
60+ def test_unsubscribe_share_with_content(self):
61+ """Test unsubscribe_share method in a share with content."""
62+ share = self._create_share(access_level=self.access_level,
63+ subscribed=True)
64+ yield self.vm.add_share(share)
65+
66+ self.assertTrue(self.vm.shares[share.volume_id].subscribed)
67+ # create a few files and directories
68+ dirs = ['dir', 'dir/subdir', 'dir/empty_dir']
69+ for i, dir in enumerate(dirs):
70+ path = os.path.join(share.path, dir)
71+ with allow_writes(os.path.split(share.path)[0]):
72+ with allow_writes(share.path):
73+ if not os.path.exists(path):
74+ os.makedirs(path)
75+ self.main.fs.create(path, share.volume_id, is_dir=True)
76+ self.main.fs.set_node_id(path, 'dir_node_id'+str(i))
77+ # add a inotify watch to the dir
78+ self.main.event_q.add_watch(path)
79+ files = ['a_file', 'dir/file', 'dir/subdir/file']
80+ for i, file in enumerate(files):
81+ path = os.path.join(share.path, file)
82+ with allow_writes(os.path.split(share.path)[0]):
83+ with allow_writes(share.path):
84+ open(path, 'w').close()
85+ self.main.fs.create(path, share.volume_id)
86+ self.main.fs.set_node_id(path, 'file_node_id'+str(i))
87+ paths = self.main.fs.get_paths_starting_with(share.path)
88+ self.assertEquals(len(paths), len(dirs+files)+1)
89+
90+ # unsubscribe from it
91+ self.vm.unsubscribe_share(share.volume_id)
92+
93+ self.assertEquals(2, len(self.vm.shares)) # share and root
94+ self.assertFalse(self.vm.shares[share.volume_id].subscribed)
95+ # check that the share is in the fsm metadata
96+ self.assertTrue(self.main.fs.get_by_path(share.path))
97+ # get the children (their metadata must still be there after unsubscribing)
98+ paths = list(self.main.fs.get_paths_starting_with(share.path))
99+ self.assertEquals(len(dirs+files)+1, len(paths))
100+ # check that there isn't a watch in the share
101+ self.assertFalse(self.main.event_q.has_watch(share.path))
102+ # check that the childs don't have a watch
103+ for path, is_dir in paths:
104+ if is_dir:
105+ self.assertFalse(self.main.event_q.has_watch(path))
106+
107+
108+class ModifySharesSubscriptionTests(ViewSharesSubscriptionTests):
109+ """Test Shares subscription operations when access_level is Modify."""
110+
111+ access_level = 'Modify'
112+
113+ @defer.inlineCallbacks
114+ def test_subscribe_share_missing_fsm_md(self):
115+ """Test subscribe_share with a missing node in fsm."""
116+ share = self._create_share(access_level=self.access_level,
117+ subscribed=False)
118+ yield self.vm.add_share(share)
119+ self.assertFalse(os.path.exists(share.path))
120+ self.assertFalse(self.vm.shares[share.id].subscribed)
121+ yield self.vm.subscribe_share(share.id)
122+ yield self.vm.unsubscribe_share(share.id)
123+ # delete the fsm metadata
124+ self.main.fs.delete_metadata(share.path)
125+ # subscribe to it and fail!
126+ try:
127+ yield self.vm.subscribe_share(share.id)
128+ except KeyError, e:
129+ self.assertIn(share.path, e.args[0])
130+ else:
131+ self.fail('Must get a KeyError!')
132+
133+ @defer.inlineCallbacks
134+ def _test_subscribe_share_generations(self, share):
135+ """Test subscribe_share with a generation."""
136+ scratch_d = defer.Deferred()
137+ def fake_rescan_from_scratch(volume_id):
138+ """A fake rescan_from_scratch that check the arguments."""
139+ self.assertEquals(share.volume_id, volume_id)
140+ scratch_d.callback(None)
141+ self.main.action_q.rescan_from_scratch = fake_rescan_from_scratch
142+ # subscribe to it
143+ yield self.vm.subscribe_share(share.volume_id)
144+ yield scratch_d
145+ self.assertTrue(self.vm.shares[share.volume_id].subscribed)
146+
147+ @defer.inlineCallbacks
148+ def test_subscribe_share_valid_generation(self):
149+ """Test subscribe_share with a valid generation."""
150+ share = self._create_share(access_level=self.access_level,
151+ subscribed=False)
152+ yield self.vm.add_share(share)
153+ self.assertFalse(self.vm.shares[share.volume_id].subscribed)
154+ # update share generation
155+ self.vm.update_generation(share.volume_id, 0)
156+ yield self._test_subscribe_share_generations(share)
157+
158+ @defer.inlineCallbacks
159+ def test_subscribe_share_without_generation(self):
160+ """Test subscribe_share without a valid generation."""
161+ share = self._create_share(access_level=self.access_level,
162+ subscribed=False)
163+ yield self.vm.add_share(share)
164+ self.assertFalse(self.vm.shares[share.volume_id].subscribed)
165+ # update share generation
166+ self.vm.update_generation(share.volume_id, None)
167+ yield self._test_subscribe_share_generations(share)
168+
169+ @defer.inlineCallbacks
170+ def test_unsubscribe_subscribe_share_with_content(self):
171+ """Test for re-subscribing to a share."""
172+ share = self._create_share(access_level=self.access_level,
173+ subscribed=True)
174+ yield self.vm.add_share(share)
175+ self.main.event_q.rm_watch(share.path)
176+ self.assertTrue(self.vm.shares[share.volume_id].subscribed)
177+ # create a few files and directories
178+ dirs = ['dir', 'dir/subdir', 'dir/empty_dir']
179+ for i, dir in enumerate(dirs):
180+ path = os.path.join(share.path, dir)
181+ with allow_writes(os.path.split(share.path)[0]):
182+ with allow_writes(share.path):
183+ if not os.path.exists(path):
184+ os.makedirs(path)
185+ self.main.fs.create(path, share.volume_id, is_dir=True)
186+ self.main.fs.set_node_id(path, 'dir_node_id'+str(i))
187+ files = ['a_file', 'dir/file', 'dir/subdir/file']
188+ for i, path in enumerate(files):
189+ path = os.path.join(share.path, path)
190+ with allow_writes(os.path.split(share.path)[0]):
191+ with allow_writes(share.path):
192+ open(path, 'w').close()
193+ self.main.fs.create(path, share.volume_id)
194+ self.main.fs.set_node_id(path, 'file_node_id'+str(i))
195+ paths = list(self.main.fs.get_paths_starting_with(share.path))
196+ # add a inotify watch to the dirs
197+ for path, is_dir in paths:
198+ if is_dir:
199+ self.main.event_q.add_watch(path)
200+ self.assertEquals(len(paths), len(dirs+files)+1, paths)
201+
202+ # unsubscribe from it
203+ self.vm.unsubscribe_share(share.volume_id)
204+
205+ self.assertEquals(2, len(self.vm.shares)) # share and root
206+ self.assertFalse(self.vm.shares[share.volume_id].subscribed)
207+ # check that the share is in the fsm metadata
208+ self.assertTrue(self.main.fs.get_by_path(share.path))
209+ # check that there isn't a watch in the share
210+ self.assertFalse(self.main.event_q.has_watch(share.path))
211+ # check that the childs don't have a watch
212+ for path, is_dir in paths:
213+ if is_dir:
214+ self.assertFalse(self.main.event_q.has_watch(path))
215+ # check the childs
216+ paths = self.main.fs.get_paths_starting_with(share.path)
217+ self.assertEquals(len(dirs+files)+1, len(paths))
218+ # resubscribe to it
219+ yield self.vm.subscribe_share(share.volume_id)
220+ paths = list(self.main.fs.get_paths_starting_with(share.path))
221+ # we should only have the dirs, as the files metadata is
222+ # deleted by local rescan (both hashes are '')
223+ self.assertEquals(len(dirs)+1, len(paths))
224+ # check that there is a watch in the share
225+ self.assertTrue(self.main.event_q.has_watch(share.path))
226+ # check that the child dirs have a watch
227+ for path, is_dir in paths:
228+ if is_dir:
229+ self.assertTrue(self.main.event_q.has_watch(path),
230+ '%s has a watch' % path)
231+ self.vm._remove_watch(path)
232+
233+
234 class VolumeManagerUnicodeTests(BaseVolumeManagerTests):
235 """Tests for Volume Manager unicode capabilities."""
236
237
238=== modified file 'ubuntuone/syncdaemon/event_queue.py'
239--- ubuntuone/syncdaemon/event_queue.py 2011-01-21 15:45:16 +0000
240+++ ubuntuone/syncdaemon/event_queue.py 2011-02-03 12:52:26 +0000
241@@ -151,6 +151,10 @@
242 'VM_UDF_UNSUBSCRIBE_ERROR': ('udf_id', 'error'),
243 'VM_UDF_CREATED': ('udf',),
244 'VM_UDF_CREATE_ERROR': ('path', 'error'),
245+ 'VM_SHARE_SUBSCRIBED': ('share',),
246+ 'VM_SHARE_SUBSCRIBE_ERROR': ('share_id', 'error'),
247+ 'VM_SHARE_UNSUBSCRIBED': ('share',),
248+ 'VM_SHARE_UNSUBSCRIBE_ERROR': ('share_id', 'error'),
249 'VM_SHARE_CREATED': ('share_id',),
250 'VM_SHARE_DELETED': ('share',),
251 'VM_SHARE_DELETE_ERROR': ('share_id', 'error'),
252
253=== modified file 'ubuntuone/syncdaemon/volume_manager.py'
254--- ubuntuone/syncdaemon/volume_manager.py 2011-02-02 18:33:45 +0000
255+++ ubuntuone/syncdaemon/volume_manager.py 2011-02-03 12:52:26 +0000
256@@ -945,6 +945,19 @@
257 if not share.can_write():
258 set_dir_readonly(share.path)
259
260+ def _create_udf_dir(self, udf):
261+ """Create the udf dir if does not exist."""
262+ if not path_exists(udf.path):
263+ # the udf path isn't there, create it!
264+ make_dir(udf.path, recursive=True)
265+
266+ def _create_volume_dir(self, volume):
267+ """Create the volume dir if does not exist, set perms for shares."""
268+ if isinstance(volume, (Share, Root)):
269+ self._create_share_dir(volume)
270+ elif isinstance(volume, UDF):
271+ self._create_udf_dir(volume)
272+
273 def _create_fsm_object(self, path, volume_id, node_id):
274 """ Creates the mdobj for this share in fs manager. """
275 try:
276@@ -1103,6 +1116,13 @@
277 if all_volumes or volume.active:
278 yield volume
279
280+ def store_volume(self, volume):
281+ """Store 'volume'."""
282+ if isinstance(volume, (Share, Root)):
283+ self.shares[volume.volume_id] = volume
284+ elif isinstance(volume, UDF):
285+ self.udfs[volume.volume_id] = volume
286+
287 def _is_nested_udf(self, path):
288 """Check if it's ok to create a UDF in 'path'.
289
290@@ -1168,44 +1188,74 @@
291 volume = self.get_volume(volume_id)
292 self.m.action_q.delete_volume(volume.id, volume.path)
293
294+ def subscribe_share(self, share_id):
295+ """Mark the Share with 'share_id' as subscribed.
296+
297+ Also fire a local and server rescan.
298+
299+ """
300+ push_error = functools.partial(self.m.event_q.push,
301+ 'VM_SHARE_SUBSCRIBE_ERROR', share_id=share_id)
302+ push_success = lambda volume: \
303+ self.m.event_q.push('VM_SHARE_SUBSCRIBED', share=volume)
304+ self.log.info('subscribe_share: %r', share_id)
305+ d = self._subscribe_volume(share_id, push_success, push_error)
306+ return d
307+
308 def subscribe_udf(self, udf_id):
309- """Mark the UDF with id as subscribed.
310+ """Mark the UDF with 'udf_id' as subscribed.
311
312 Also fire a local and server rescan.
313
314 """
315 push_error = functools.partial(self.m.event_q.push,
316 'VM_UDF_SUBSCRIBE_ERROR', udf_id=udf_id)
317+ push_success = lambda volume: self.m.event_q.push('VM_UDF_SUBSCRIBED',
318+ udf=volume)
319 self.log.info('subscribe_udf: %r', udf_id)
320+ d = self._subscribe_volume(udf_id, push_success, push_error)
321+ return d
322+
323+ def _subscribe_volume(self, volume_id, push_success, push_error):
324+ """Mark the volume with 'volume_id' as subscribed.
325+
326+ If can_write(), fire a local and server rescan while temporarily
327+ unsubscribing from it.
328+
329+ """
330+ self.log.debug('_subscribe_volume: %r', volume_id)
331 try:
332- udf = self.udfs[udf_id]
333- except KeyError:
334+ volume = self.get_volume(volume_id)
335+ except VolumeDoesNotExist, e:
336 push_error(error="DOES_NOT_EXIST")
337- d = defer.fail(VolumeDoesNotExist(udf_id))
338- else:
339- if not path_exists(udf.path):
340- # the udf path isn't there, create it!
341- make_dir(udf.path, recursive=True)
342- def subscribe(result):
343- """Subscribe the UDF after the local rescan.
344-
345- As we don't wait for server rescan to finish, the udf is
346- subscribed just after the local rescan it's done.
347-
348- """
349- udf.subscribed = True
350- self.udfs[udf_id] = udf
351- return result
352+ return defer.fail(e)
353+
354+ self._create_volume_dir(volume)
355+
356+ def subscribe(result):
357+ """Subscribe the volume after the local rescan.
358+
359+ As we don't wait for server rescan to finish, the volume is
360+ subscribed just after the local rescan is done.
361+
362+ """
363+ volume.subscribed = True
364+ self.store_volume(volume)
365+ return result
366+
367+ if volume.can_write():
368 try:
369- d = self._scan_udf(udf)
370+ d = self._scan_volume(volume)
371 except KeyError, e:
372 push_error(error="METADATA_DOES_NOT_EXIST")
373- d = defer.fail(e)
374- else:
375- d.addCallback(subscribe)
376- d.addCallbacks(
377- lambda _: self.m.event_q.push('VM_UDF_SUBSCRIBED', udf=udf),
378- lambda f: push_error(error=f.getErrorMessage()))
379+ return defer.fail(e)
380+ else:
381+ # avoid local and server rescan for read-only volumes
382+ d = defer.succeed(None)
383+
384+ d.addCallback(subscribe)
385+ d.addCallbacks(lambda _: push_success(volume),
386+ lambda f: push_error(error=f.getErrorMessage()))
387 return d
388
389 def _scan_udf(self, udf):
390@@ -1230,21 +1280,38 @@
391 d.addCallback(server_rescan)
392 return d
393
394+ def unsubscribe_share(self, share_id):
395+ """Mark the share with share_id as unsubscribed."""
396+ self.log.info('unsubscribe_share: %r', share_id)
397+ push_error = functools.partial(self.m.event_q.push,
398+ 'VM_SHARE_UNSUBSCRIBE_ERROR', share_id=share_id)
399+ push_success = lambda volume: \
400+ self.m.event_q.push('VM_SHARE_UNSUBSCRIBED', share=volume)
401+ self._unsubscribe_volume(share_id, push_success, push_error)
402+
403 def unsubscribe_udf(self, udf_id):
404 """Mark the UDF with udf_id as unsubscribed."""
405 self.log.info('unsubscribe_udf: %r', udf_id)
406+ push_error = functools.partial(self.m.event_q.push,
407+ 'VM_UDF_UNSUBSCRIBE_ERROR', udf_id=udf_id)
408+ push_success = lambda volume: \
409+ self.m.event_q.push('VM_UDF_UNSUBSCRIBED', udf=volume)
410+ self._unsubscribe_volume(udf_id, push_success, push_error)
411+
412+ def _unsubscribe_volume(self, volume_id, push_success, push_error):
413+ """Mark the volume with volume_id as unsubscribed."""
414+ self.log.debug('unsubscribe_volume: %r', volume_id)
415 try:
416- udf = self.udfs[udf_id]
417- except KeyError:
418- self.m.event_q.push('VM_UDF_UNSUBSCRIBE_ERROR', udf_id=udf_id,
419- error="DOES_NOT_EXIST")
420+ volume = self.get_volume(volume_id)
421+ except VolumeDoesNotExist:
422+ push_error(error="DOES_NOT_EXIST")
423 else:
424 # remove the inotify watches, but don't delete the metadata
425- self._remove_watches(udf.path)
426- # makr the udf as unsubscribed
427- udf.subscribed = False
428- self.udfs[udf_id] = udf
429- self.m.event_q.push('VM_UDF_UNSUBSCRIBED', udf=udf)
430+ self._remove_watches(volume.path)
431+ # mark the volume as unsubscribed
432+ volume.subscribed = False
433+ self.store_volume(volume)
434+ push_success(volume)
435
436 def handle_AQ_CREATE_UDF_OK(self, marker, volume_id, node_id):
437 """Handle AQ_CREATE_UDF_OK."""

Subscribers

People subscribed via source and target branches