Merge lp:~nataliabidart/ubuntuone-client/share-subscription-ops into lp:ubuntuone-client

Proposed by Natalia Bidart
Status: Merged
Approved by: Natalia Bidart
Approved revision: 837
Merged at revision: 835
Proposed branch: lp:~nataliabidart/ubuntuone-client/share-subscription-ops
Merge into: lp:ubuntuone-client
Prerequisite: lp:~nataliabidart/ubuntuone-client/add-subscribed-to-shares
Diff against target: 437 lines (+331/-34)
3 files modified
tests/syncdaemon/test_vm.py (+226/-0)
ubuntuone/syncdaemon/event_queue.py (+4/-0)
ubuntuone/syncdaemon/volume_manager.py (+101/-34)
To merge this branch: bzr merge lp:~nataliabidart/ubuntuone-client/share-subscription-ops
Reviewer | Review Type | Date Requested | Status
dobey | community | — | Approve
Roberto Alsina | community | — | Approve
Review via email: mp+48170@code.launchpad.net

Commit message

- Shares can be subscribed to/unsubscribed from (part of LP: #708335).

To post a comment you must log in.
833. By Natalia Bidart

Changes from dependency branch.

834. By Natalia Bidart

Merged trunk in.

835. By Natalia Bidart

Merged trunk in.

836. By Natalia Bidart

Changes from dependency branch.

Revision history for this message
Roberto Alsina (ralsina) wrote:

+1 looks good to me.

Revision history for this message
Roberto Alsina (ralsina):
review: Approve
837. By Natalia Bidart

Merged trunk in.

Revision history for this message
dobey (dobey):
review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'tests/syncdaemon/test_vm.py'
--- tests/syncdaemon/test_vm.py 2011-02-01 14:24:41 +0000
+++ tests/syncdaemon/test_vm.py 2011-02-03 12:52:26 +0000
@@ -983,6 +983,232 @@
983 self.assertEqual(set(defined_args[1:]), set(evtargs))983 self.assertEqual(set(defined_args[1:]), set(evtargs))
984984
985985
986class ViewSharesSubscriptionTests(BaseVolumeManagerTests):
987 """Test Shares subscription operations when access_level is View."""
988
989 access_level = 'View'
990
991 @defer.inlineCallbacks
992 def test_subscribe_share(self):
993 """Test subscribe_share method."""
994 share = self._create_share(access_level=self.access_level,
995 subscribed=False)
996 yield self.vm.add_share(share)
997 self.assertFalse(self.vm.shares[share.volume_id].subscribed)
998 # subscribe to it
999 yield self.vm.subscribe_share(share.volume_id)
1000 self.assertTrue(self.vm.shares[share.volume_id].subscribed)
1001
1002 @defer.inlineCallbacks
1003 def test_subscribe_share_missing_path(self):
1004 """Test subscribe_share with a missing path """
1005 share = self._create_share(access_level=self.access_level,
1006 subscribed=False)
1007 yield self.vm.add_share(share)
1008 self.assertFalse(os.path.exists(share.path))
1009 self.assertFalse(self.vm.shares[share.id].subscribed)
1010 # subscribe to it
1011 yield self.vm.subscribe_share(share.id)
1012 self.assertTrue(self.vm.shares[share.id].subscribed)
1013 self.assertTrue(os.path.exists(share.path))
1014
1015 @defer.inlineCallbacks
1016 def test_subscribe_share_missing_volume(self):
1017 """Test subscribe_share with a invalid volume_id."""
1018 try:
1019 yield self.vm.subscribe_share('invalid_share_id')
1020 except VolumeDoesNotExist, e:
1021 self.assertEquals('DOES_NOT_EXIST', e.args[0])
1022 self.assertEquals('invalid_share_id', e.args[1])
1023 else:
1024 self.fail('Must get a VolumeDoesNotExist!')
1025
1026 @defer.inlineCallbacks
1027 def test_unsubscribe_share(self):
1028 """Test unsubscribe_share method."""
1029 share = self._create_share(access_level=self.access_level,
1030 subscribed=True)
1031 yield self.vm.add_share(share)
1032 self.assertTrue(self.vm.shares[share.volume_id].subscribed)
1033 # unsubscribe from it
1034 self.vm.unsubscribe_share(share.volume_id)
1035 self.assertFalse(self.vm.shares[share.volume_id].subscribed)
1036
1037 @defer.inlineCallbacks
1038 def test_unsubscribe_share_with_content(self):
1039 """Test unsubscribe_share method in a share with content."""
1040 share = self._create_share(access_level=self.access_level,
1041 subscribed=True)
1042 yield self.vm.add_share(share)
1043
1044 self.assertTrue(self.vm.shares[share.volume_id].subscribed)
1045 # create a few files and directories
1046 dirs = ['dir', 'dir/subdir', 'dir/empty_dir']
1047 for i, dir in enumerate(dirs):
1048 path = os.path.join(share.path, dir)
1049 with allow_writes(os.path.split(share.path)[0]):
1050 with allow_writes(share.path):
1051 if not os.path.exists(path):
1052 os.makedirs(path)
1053 self.main.fs.create(path, share.volume_id, is_dir=True)
1054 self.main.fs.set_node_id(path, 'dir_node_id'+str(i))
1055 # add a inotify watch to the dir
1056 self.main.event_q.add_watch(path)
1057 files = ['a_file', 'dir/file', 'dir/subdir/file']
1058 for i, file in enumerate(files):
1059 path = os.path.join(share.path, file)
1060 with allow_writes(os.path.split(share.path)[0]):
1061 with allow_writes(share.path):
1062 open(path, 'w').close()
1063 self.main.fs.create(path, share.volume_id)
1064 self.main.fs.set_node_id(path, 'file_node_id'+str(i))
1065 paths = self.main.fs.get_paths_starting_with(share.path)
1066 self.assertEquals(len(paths), len(dirs+files)+1)
1067
1068 # unsubscribe from it
1069 self.vm.unsubscribe_share(share.volume_id)
1070
1071 self.assertEquals(2, len(self.vm.shares)) # share and root
1072 self.assertFalse(self.vm.shares[share.volume_id].subscribed)
1073 # check that the share is in the fsm metadata
1074 self.assertTrue(self.main.fs.get_by_path(share.path))
1075 # get the childs (should be an empty list)
1076 paths = list(self.main.fs.get_paths_starting_with(share.path))
1077 self.assertEquals(len(dirs+files)+1, len(paths))
1078 # check that there isn't a watch in the share
1079 self.assertFalse(self.main.event_q.has_watch(share.path))
1080 # check that the childs don't have a watch
1081 for path, is_dir in paths:
1082 if is_dir:
1083 self.assertFalse(self.main.event_q.has_watch(path))
1084
1085
1086class ModifySharesSubscriptionTests(ViewSharesSubscriptionTests):
1087 """Test Shares subscription operations when access_level is Modify."""
1088
1089 access_level = 'Modify'
1090
1091 @defer.inlineCallbacks
1092 def test_subscribe_share_missing_fsm_md(self):
1093 """Test subscribe_share with a missing node in fsm."""
1094 share = self._create_share(access_level=self.access_level,
1095 subscribed=False)
1096 yield self.vm.add_share(share)
1097 self.assertFalse(os.path.exists(share.path))
1098 self.assertFalse(self.vm.shares[share.id].subscribed)
1099 yield self.vm.subscribe_share(share.id)
1100 yield self.vm.unsubscribe_share(share.id)
1101 # delete the fsm metadata
1102 self.main.fs.delete_metadata(share.path)
1103 # subscribe to it and fail!
1104 try:
1105 yield self.vm.subscribe_share(share.id)
1106 except KeyError, e:
1107 self.assertIn(share.path, e.args[0])
1108 else:
1109 self.fail('Must get a KeyError!')
1110
1111 @defer.inlineCallbacks
1112 def _test_subscribe_share_generations(self, share):
1113 """Test subscribe_share with a generation."""
1114 scratch_d = defer.Deferred()
1115 def fake_rescan_from_scratch(volume_id):
1116 """A fake rescan_from_scratch that check the arguments."""
1117 self.assertEquals(share.volume_id, volume_id)
1118 scratch_d.callback(None)
1119 self.main.action_q.rescan_from_scratch = fake_rescan_from_scratch
1120 # subscribe to it
1121 yield self.vm.subscribe_share(share.volume_id)
1122 yield scratch_d
1123 self.assertTrue(self.vm.shares[share.volume_id].subscribed)
1124
1125 @defer.inlineCallbacks
1126 def test_subscribe_share_valid_generation(self):
1127 """Test subscribe_share with a valid generation."""
1128 share = self._create_share(access_level=self.access_level,
1129 subscribed=False)
1130 yield self.vm.add_share(share)
1131 self.assertFalse(self.vm.shares[share.volume_id].subscribed)
1132 # update share generation
1133 self.vm.update_generation(share.volume_id, 0)
1134 yield self._test_subscribe_share_generations(share)
1135
1136 @defer.inlineCallbacks
1137 def test_subscribe_share_without_generation(self):
1138 """Test subscribe_share without a valid generation."""
1139 share = self._create_share(access_level=self.access_level,
1140 subscribed=False)
1141 yield self.vm.add_share(share)
1142 self.assertFalse(self.vm.shares[share.volume_id].subscribed)
1143 # update share generation
1144 self.vm.update_generation(share.volume_id, None)
1145 yield self._test_subscribe_share_generations(share)
1146
1147 @defer.inlineCallbacks
1148 def test_unsubscribe_subscribe_share_with_content(self):
1149 """Test for re-subscribing to a share."""
1150 share = self._create_share(access_level=self.access_level,
1151 subscribed=True)
1152 yield self.vm.add_share(share)
1153 self.main.event_q.rm_watch(share.path)
1154 self.assertTrue(self.vm.shares[share.volume_id].subscribed)
1155 # create a few files and directories
1156 dirs = ['dir', 'dir/subdir', 'dir/empty_dir']
1157 for i, dir in enumerate(dirs):
1158 path = os.path.join(share.path, dir)
1159 with allow_writes(os.path.split(share.path)[0]):
1160 with allow_writes(share.path):
1161 if not os.path.exists(path):
1162 os.makedirs(path)
1163 self.main.fs.create(path, share.volume_id, is_dir=True)
1164 self.main.fs.set_node_id(path, 'dir_node_id'+str(i))
1165 files = ['a_file', 'dir/file', 'dir/subdir/file']
1166 for i, path in enumerate(files):
1167 path = os.path.join(share.path, path)
1168 with allow_writes(os.path.split(share.path)[0]):
1169 with allow_writes(share.path):
1170 open(path, 'w').close()
1171 self.main.fs.create(path, share.volume_id)
1172 self.main.fs.set_node_id(path, 'file_node_id'+str(i))
1173 paths = list(self.main.fs.get_paths_starting_with(share.path))
1174 # add a inotify watch to the dirs
1175 for path, is_dir in paths:
1176 if is_dir:
1177 self.main.event_q.add_watch(path)
1178 self.assertEquals(len(paths), len(dirs+files)+1, paths)
1179
1180 # unsubscribe from it
1181 self.vm.unsubscribe_share(share.volume_id)
1182
1183 self.assertEquals(2, len(self.vm.shares)) # share and root
1184 self.assertFalse(self.vm.shares[share.volume_id].subscribed)
1185 # check that the share is in the fsm metadata
1186 self.assertTrue(self.main.fs.get_by_path(share.path))
1187 # check that there isn't a watch in the share
1188 self.assertFalse(self.main.event_q.has_watch(share.path))
1189 # check that the childs don't have a watch
1190 for path, is_dir in paths:
1191 if is_dir:
1192 self.assertFalse(self.main.event_q.has_watch(path))
1193 # check the childs
1194 paths = self.main.fs.get_paths_starting_with(share.path)
1195 self.assertEquals(len(dirs+files)+1, len(paths))
1196 # resubscribe to it
1197 yield self.vm.subscribe_share(share.volume_id)
1198 paths = list(self.main.fs.get_paths_starting_with(share.path))
1199 # we should only have the dirs, as the files metadata is
1200 # delete by local rescan (both hashes are '')
1201 self.assertEquals(len(dirs)+1, len(paths))
1202 # check that there is a watch in the share
1203 self.assertTrue(self.main.event_q.has_watch(share.path))
1204 # check that the child dirs have a watch
1205 for path, is_dir in paths:
1206 if is_dir:
1207 self.assertTrue(self.main.event_q.has_watch(path),
1208 '%s has a watch' % path)
1209 self.vm._remove_watch(path)
1210
1211
986class VolumeManagerUnicodeTests(BaseVolumeManagerTests):1212class VolumeManagerUnicodeTests(BaseVolumeManagerTests):
987 """Tests for Volume Manager unicode capabilities."""1213 """Tests for Volume Manager unicode capabilities."""
9881214
9891215
=== modified file 'ubuntuone/syncdaemon/event_queue.py'
--- ubuntuone/syncdaemon/event_queue.py 2011-01-21 15:45:16 +0000
+++ ubuntuone/syncdaemon/event_queue.py 2011-02-03 12:52:26 +0000
@@ -151,6 +151,10 @@
151 'VM_UDF_UNSUBSCRIBE_ERROR': ('udf_id', 'error'),151 'VM_UDF_UNSUBSCRIBE_ERROR': ('udf_id', 'error'),
152 'VM_UDF_CREATED': ('udf',),152 'VM_UDF_CREATED': ('udf',),
153 'VM_UDF_CREATE_ERROR': ('path', 'error'),153 'VM_UDF_CREATE_ERROR': ('path', 'error'),
154 'VM_SHARE_SUBSCRIBED': ('share',),
155 'VM_SHARE_SUBSCRIBE_ERROR': ('share_id', 'error'),
156 'VM_SHARE_UNSUBSCRIBED': ('share',),
157 'VM_SHARE_UNSUBSCRIBE_ERROR': ('share_id', 'error'),
154 'VM_SHARE_CREATED': ('share_id',),158 'VM_SHARE_CREATED': ('share_id',),
155 'VM_SHARE_DELETED': ('share',),159 'VM_SHARE_DELETED': ('share',),
156 'VM_SHARE_DELETE_ERROR': ('share_id', 'error'),160 'VM_SHARE_DELETE_ERROR': ('share_id', 'error'),
157161
=== modified file 'ubuntuone/syncdaemon/volume_manager.py'
--- ubuntuone/syncdaemon/volume_manager.py 2011-02-02 18:33:45 +0000
+++ ubuntuone/syncdaemon/volume_manager.py 2011-02-03 12:52:26 +0000
@@ -945,6 +945,19 @@
945 if not share.can_write():945 if not share.can_write():
946 set_dir_readonly(share.path)946 set_dir_readonly(share.path)
947947
948 def _create_udf_dir(self, udf):
949 """Create the udf dir if does not exist."""
950 if not path_exists(udf.path):
951 # the udf path isn't there, create it!
952 make_dir(udf.path, recursive=True)
953
954 def _create_volume_dir(self, volume):
955 """Create the volume dir if does not exist, set perms for shares."""
956 if isinstance(volume, (Share, Root)):
957 self._create_share_dir(volume)
958 elif isinstance(volume, UDF):
959 self._create_udf_dir(volume)
960
948 def _create_fsm_object(self, path, volume_id, node_id):961 def _create_fsm_object(self, path, volume_id, node_id):
949 """ Creates the mdobj for this share in fs manager. """962 """ Creates the mdobj for this share in fs manager. """
950 try:963 try:
@@ -1103,6 +1116,13 @@
1103 if all_volumes or volume.active:1116 if all_volumes or volume.active:
1104 yield volume1117 yield volume
11051118
1119 def store_volume(self, volume):
1120 """Store 'volume'."""
1121 if isinstance(volume, (Share, Root)):
1122 self.shares[volume.volume_id] = volume
1123 elif isinstance(volume, UDF):
1124 self.udfs[volume.volume_id] = volume
1125
1106 def _is_nested_udf(self, path):1126 def _is_nested_udf(self, path):
1107 """Check if it's ok to create a UDF in 'path'.1127 """Check if it's ok to create a UDF in 'path'.
11081128
@@ -1168,44 +1188,74 @@
1168 volume = self.get_volume(volume_id)1188 volume = self.get_volume(volume_id)
1169 self.m.action_q.delete_volume(volume.id, volume.path)1189 self.m.action_q.delete_volume(volume.id, volume.path)
11701190
1191 def subscribe_share(self, share_id):
1192 """Mark the Share with 'share_id' as subscribed.
1193
1194 Also fire a local and server rescan.
1195
1196 """
1197 push_error = functools.partial(self.m.event_q.push,
1198 'VM_SHARE_SUBSCRIBE_ERROR', share_id=share_id)
1199 push_success = lambda volume: \
1200 self.m.event_q.push('VM_SHARE_SUBSCRIBED', share=volume)
1201 self.log.info('subscribe_share: %r', share_id)
1202 d = self._subscribe_volume(share_id, push_success, push_error)
1203 return d
1204
1171 def subscribe_udf(self, udf_id):1205 def subscribe_udf(self, udf_id):
1172 """Mark the UDF with id as subscribed.1206 """Mark the UDF with 'udf_id' as subscribed.
11731207
1174 Also fire a local and server rescan.1208 Also fire a local and server rescan.
11751209
1176 """1210 """
1177 push_error = functools.partial(self.m.event_q.push,1211 push_error = functools.partial(self.m.event_q.push,
1178 'VM_UDF_SUBSCRIBE_ERROR', udf_id=udf_id)1212 'VM_UDF_SUBSCRIBE_ERROR', udf_id=udf_id)
1213 push_success = lambda volume: self.m.event_q.push('VM_UDF_SUBSCRIBED',
1214 udf=volume)
1179 self.log.info('subscribe_udf: %r', udf_id)1215 self.log.info('subscribe_udf: %r', udf_id)
1216 d = self._subscribe_volume(udf_id, push_success, push_error)
1217 return d
1218
1219 def _subscribe_volume(self, volume_id, push_success, push_error):
1220 """Mark the volume with 'volume_id' as subscribed.
1221
1222 If can_write(), fire a local and server rescan while temporary
1223 unsubscribing from it.
1224
1225 """
1226 self.log.debug('_subscribe_volume: %r', volume_id)
1180 try:1227 try:
1181 udf = self.udfs[udf_id]1228 volume = self.get_volume(volume_id)
1182 except KeyError:1229 except VolumeDoesNotExist, e:
1183 push_error(error="DOES_NOT_EXIST")1230 push_error(error="DOES_NOT_EXIST")
1184 d = defer.fail(VolumeDoesNotExist(udf_id))1231 return defer.fail(e)
1185 else:1232
1186 if not path_exists(udf.path):1233 self._create_volume_dir(volume)
1187 # the udf path isn't there, create it!1234
1188 make_dir(udf.path, recursive=True)1235 def subscribe(result):
1189 def subscribe(result):1236 """Subscribe the volume after the local rescan.
1190 """Subscribe the UDF after the local rescan.1237
11911238 As we don't wait for server rescan to finish, the volume is
1192 As we don't wait for server rescan to finish, the udf is1239 subscribed just after the local rescan it's done.
1193 subscribed just after the local rescan it's done.1240
11941241 """
1195 """1242 volume.subscribed = True
1196 udf.subscribed = True1243 self.store_volume(volume)
1197 self.udfs[udf_id] = udf1244 return result
1198 return result1245
1246 if volume.can_write():
1199 try:1247 try:
1200 d = self._scan_udf(udf)1248 d = self._scan_volume(volume)
1201 except KeyError, e:1249 except KeyError, e:
1202 push_error(error="METADATA_DOES_NOT_EXIST")1250 push_error(error="METADATA_DOES_NOT_EXIST")
1203 d = defer.fail(e)1251 return defer.fail(e)
1204 else:1252 else:
1205 d.addCallback(subscribe)1253 # avoid local and server rescan for read-only volumes
1206 d.addCallbacks(1254 d = defer.succeed(None)
1207 lambda _: self.m.event_q.push('VM_UDF_SUBSCRIBED', udf=udf),1255
1208 lambda f: push_error(error=f.getErrorMessage()))1256 d.addCallback(subscribe)
1257 d.addCallbacks(lambda _: push_success(volume),
1258 lambda f: push_error(error=f.getErrorMessage()))
1209 return d1259 return d
12101260
1211 def _scan_udf(self, udf):1261 def _scan_udf(self, udf):
@@ -1230,21 +1280,38 @@
1230 d.addCallback(server_rescan)1280 d.addCallback(server_rescan)
1231 return d1281 return d
12321282
1283 def unsubscribe_share(self, share_id):
1284 """Mark the share with share_id as unsubscribed."""
1285 self.log.info('unsubscribe_share: %r', share_id)
1286 push_error = functools.partial(self.m.event_q.push,
1287 'VM_SHARE_UNSUBSCRIBE_ERROR', share_id=share_id)
1288 push_success = lambda volume: \
1289 self.m.event_q.push('VM_SHARE_UNSUBSCRIBED', share=volume)
1290 self._unsubscribe_volume(share_id, push_success, push_error)
1291
1233 def unsubscribe_udf(self, udf_id):1292 def unsubscribe_udf(self, udf_id):
1234 """Mark the UDF with udf_id as unsubscribed."""1293 """Mark the UDF with udf_id as unsubscribed."""
1235 self.log.info('unsubscribe_udf: %r', udf_id)1294 self.log.info('unsubscribe_udf: %r', udf_id)
1295 push_error = functools.partial(self.m.event_q.push,
1296 'VM_UDF_UNSUBSCRIBE_ERROR', udf_id=udf_id)
1297 push_success = lambda volume: \
1298 self.m.event_q.push('VM_UDF_UNSUBSCRIBED', udf=volume)
1299 self._unsubscribe_volume(udf_id, push_success, push_error)
1300
1301 def _unsubscribe_volume(self, volume_id, push_success, push_error):
1302 """Mark the volume with volume_id as unsubscribed."""
1303 self.log.debug('unsubscribe_volume: %r', volume_id)
1236 try:1304 try:
1237 udf = self.udfs[udf_id]1305 volume = self.get_volume(volume_id)
1238 except KeyError:1306 except VolumeDoesNotExist:
1239 self.m.event_q.push('VM_UDF_UNSUBSCRIBE_ERROR', udf_id=udf_id,1307 push_error(error="DOES_NOT_EXIST")
1240 error="DOES_NOT_EXIST")
1241 else:1308 else:
1242 # remove the inotify watches, but don't delete the metadata1309 # remove the inotify watches, but don't delete the metadata
1243 self._remove_watches(udf.path)1310 self._remove_watches(volume.path)
1244 # makr the udf as unsubscribed1311 # mark the volume as unsubscribed
1245 udf.subscribed = False1312 volume.subscribed = False
1246 self.udfs[udf_id] = udf1313 self.store_volume(volume)
1247 self.m.event_q.push('VM_UDF_UNSUBSCRIBED', udf=udf)1314 push_success(volume)
12481315
1249 def handle_AQ_CREATE_UDF_OK(self, marker, volume_id, node_id):1316 def handle_AQ_CREATE_UDF_OK(self, marker, volume_id, node_id):
1250 """Handle AQ_CREATE_UDF_OK."""1317 """Handle AQ_CREATE_UDF_OK."""

Subscribers

People subscribed via source and target branches