Merge lp:~nataliabidart/ubuntuone-client/share-subscription-ops into lp:ubuntuone-client
- share-subscription-ops
- Merge into trunk
Proposed by
Natalia Bidart
Status: Merged
Approved by: Natalia Bidart
Approved revision: 837
Merged at revision: 835
Proposed branch: lp:~nataliabidart/ubuntuone-client/share-subscription-ops
Merge into: lp:ubuntuone-client
Prerequisite: lp:~nataliabidart/ubuntuone-client/add-subscribed-to-shares
Diff against target: |
437 lines (+331/-34) 3 files modified
tests/syncdaemon/test_vm.py (+226/-0) ubuntuone/syncdaemon/event_queue.py (+4/-0) ubuntuone/syncdaemon/volume_manager.py (+101/-34) |
||||
To merge this branch: | bzr merge lp:~nataliabidart/ubuntuone-client/share-subscription-ops | ||||
Related bugs: |
|
Reviewer | Review Type | Date Requested | Status
---|---|---|---
dobey (community) | | | Approve
Roberto Alsina (community) | | | Approve
Review via email: mp+48170@code.launchpad.net
Commit message
- Shares can be subscribed to/unsubscribed from (part of LP: #708335).
Description of the change
To post a comment you must log in.
- 833. By Natalia Bidart
-
Changes from dependency branch.
- 834. By Natalia Bidart
-
Merged trunk in.
- 835. By Natalia Bidart
-
Merged trunk in.
- 836. By Natalia Bidart
-
Changes from dependency branch.
Revision history for this message
Roberto Alsina (ralsina) wrote : | # |
Revision history for this message
Roberto Alsina (ralsina) : | # |
review:
Approve
- 837. By Natalia Bidart
-
Merged trunk in.
Revision history for this message
dobey (dobey) : | # |
review:
Approve
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === modified file 'tests/syncdaemon/test_vm.py' | |||
2 | --- tests/syncdaemon/test_vm.py 2011-02-01 14:24:41 +0000 | |||
3 | +++ tests/syncdaemon/test_vm.py 2011-02-03 12:52:26 +0000 | |||
4 | @@ -983,6 +983,232 @@ | |||
5 | 983 | self.assertEqual(set(defined_args[1:]), set(evtargs)) | 983 | self.assertEqual(set(defined_args[1:]), set(evtargs)) |
6 | 984 | 984 | ||
7 | 985 | 985 | ||
8 | 986 | class ViewSharesSubscriptionTests(BaseVolumeManagerTests): | ||
9 | 987 | """Test Shares subscription operations when access_level is View.""" | ||
10 | 988 | |||
11 | 989 | access_level = 'View' | ||
12 | 990 | |||
13 | 991 | @defer.inlineCallbacks | ||
14 | 992 | def test_subscribe_share(self): | ||
15 | 993 | """Test subscribe_share method.""" | ||
16 | 994 | share = self._create_share(access_level=self.access_level, | ||
17 | 995 | subscribed=False) | ||
18 | 996 | yield self.vm.add_share(share) | ||
19 | 997 | self.assertFalse(self.vm.shares[share.volume_id].subscribed) | ||
20 | 998 | # subscribe to it | ||
21 | 999 | yield self.vm.subscribe_share(share.volume_id) | ||
22 | 1000 | self.assertTrue(self.vm.shares[share.volume_id].subscribed) | ||
23 | 1001 | |||
24 | 1002 | @defer.inlineCallbacks | ||
25 | 1003 | def test_subscribe_share_missing_path(self): | ||
26 | 1004 | """Test subscribe_share with a missing path """ | ||
27 | 1005 | share = self._create_share(access_level=self.access_level, | ||
28 | 1006 | subscribed=False) | ||
29 | 1007 | yield self.vm.add_share(share) | ||
30 | 1008 | self.assertFalse(os.path.exists(share.path)) | ||
31 | 1009 | self.assertFalse(self.vm.shares[share.id].subscribed) | ||
32 | 1010 | # subscribe to it | ||
33 | 1011 | yield self.vm.subscribe_share(share.id) | ||
34 | 1012 | self.assertTrue(self.vm.shares[share.id].subscribed) | ||
35 | 1013 | self.assertTrue(os.path.exists(share.path)) | ||
36 | 1014 | |||
37 | 1015 | @defer.inlineCallbacks | ||
38 | 1016 | def test_subscribe_share_missing_volume(self): | ||
39 | 1017 | """Test subscribe_share with a invalid volume_id.""" | ||
40 | 1018 | try: | ||
41 | 1019 | yield self.vm.subscribe_share('invalid_share_id') | ||
42 | 1020 | except VolumeDoesNotExist, e: | ||
43 | 1021 | self.assertEquals('DOES_NOT_EXIST', e.args[0]) | ||
44 | 1022 | self.assertEquals('invalid_share_id', e.args[1]) | ||
45 | 1023 | else: | ||
46 | 1024 | self.fail('Must get a VolumeDoesNotExist!') | ||
47 | 1025 | |||
48 | 1026 | @defer.inlineCallbacks | ||
49 | 1027 | def test_unsubscribe_share(self): | ||
50 | 1028 | """Test unsubscribe_share method.""" | ||
51 | 1029 | share = self._create_share(access_level=self.access_level, | ||
52 | 1030 | subscribed=True) | ||
53 | 1031 | yield self.vm.add_share(share) | ||
54 | 1032 | self.assertTrue(self.vm.shares[share.volume_id].subscribed) | ||
55 | 1033 | # unsubscribe from it | ||
56 | 1034 | self.vm.unsubscribe_share(share.volume_id) | ||
57 | 1035 | self.assertFalse(self.vm.shares[share.volume_id].subscribed) | ||
58 | 1036 | |||
59 | 1037 | @defer.inlineCallbacks | ||
60 | 1038 | def test_unsubscribe_share_with_content(self): | ||
61 | 1039 | """Test unsubscribe_share method in a share with content.""" | ||
62 | 1040 | share = self._create_share(access_level=self.access_level, | ||
63 | 1041 | subscribed=True) | ||
64 | 1042 | yield self.vm.add_share(share) | ||
65 | 1043 | |||
66 | 1044 | self.assertTrue(self.vm.shares[share.volume_id].subscribed) | ||
67 | 1045 | # create a few files and directories | ||
68 | 1046 | dirs = ['dir', 'dir/subdir', 'dir/empty_dir'] | ||
69 | 1047 | for i, dir in enumerate(dirs): | ||
70 | 1048 | path = os.path.join(share.path, dir) | ||
71 | 1049 | with allow_writes(os.path.split(share.path)[0]): | ||
72 | 1050 | with allow_writes(share.path): | ||
73 | 1051 | if not os.path.exists(path): | ||
74 | 1052 | os.makedirs(path) | ||
75 | 1053 | self.main.fs.create(path, share.volume_id, is_dir=True) | ||
76 | 1054 | self.main.fs.set_node_id(path, 'dir_node_id'+str(i)) | ||
77 | 1055 | # add a inotify watch to the dir | ||
78 | 1056 | self.main.event_q.add_watch(path) | ||
79 | 1057 | files = ['a_file', 'dir/file', 'dir/subdir/file'] | ||
80 | 1058 | for i, file in enumerate(files): | ||
81 | 1059 | path = os.path.join(share.path, file) | ||
82 | 1060 | with allow_writes(os.path.split(share.path)[0]): | ||
83 | 1061 | with allow_writes(share.path): | ||
84 | 1062 | open(path, 'w').close() | ||
85 | 1063 | self.main.fs.create(path, share.volume_id) | ||
86 | 1064 | self.main.fs.set_node_id(path, 'file_node_id'+str(i)) | ||
87 | 1065 | paths = self.main.fs.get_paths_starting_with(share.path) | ||
88 | 1066 | self.assertEquals(len(paths), len(dirs+files)+1) | ||
89 | 1067 | |||
90 | 1068 | # unsubscribe from it | ||
91 | 1069 | self.vm.unsubscribe_share(share.volume_id) | ||
92 | 1070 | |||
93 | 1071 | self.assertEquals(2, len(self.vm.shares)) # share and root | ||
94 | 1072 | self.assertFalse(self.vm.shares[share.volume_id].subscribed) | ||
95 | 1073 | # check that the share is in the fsm metadata | ||
96 | 1074 | self.assertTrue(self.main.fs.get_by_path(share.path)) | ||
97 | 1075 | # get the childs (should be an empty list) | ||
98 | 1076 | paths = list(self.main.fs.get_paths_starting_with(share.path)) | ||
99 | 1077 | self.assertEquals(len(dirs+files)+1, len(paths)) | ||
100 | 1078 | # check that there isn't a watch in the share | ||
101 | 1079 | self.assertFalse(self.main.event_q.has_watch(share.path)) | ||
102 | 1080 | # check that the childs don't have a watch | ||
103 | 1081 | for path, is_dir in paths: | ||
104 | 1082 | if is_dir: | ||
105 | 1083 | self.assertFalse(self.main.event_q.has_watch(path)) | ||
106 | 1084 | |||
107 | 1085 | |||
108 | 1086 | class ModifySharesSubscriptionTests(ViewSharesSubscriptionTests): | ||
109 | 1087 | """Test Shares subscription operations when access_level is Modify.""" | ||
110 | 1088 | |||
111 | 1089 | access_level = 'Modify' | ||
112 | 1090 | |||
113 | 1091 | @defer.inlineCallbacks | ||
114 | 1092 | def test_subscribe_share_missing_fsm_md(self): | ||
115 | 1093 | """Test subscribe_share with a missing node in fsm.""" | ||
116 | 1094 | share = self._create_share(access_level=self.access_level, | ||
117 | 1095 | subscribed=False) | ||
118 | 1096 | yield self.vm.add_share(share) | ||
119 | 1097 | self.assertFalse(os.path.exists(share.path)) | ||
120 | 1098 | self.assertFalse(self.vm.shares[share.id].subscribed) | ||
121 | 1099 | yield self.vm.subscribe_share(share.id) | ||
122 | 1100 | yield self.vm.unsubscribe_share(share.id) | ||
123 | 1101 | # delete the fsm metadata | ||
124 | 1102 | self.main.fs.delete_metadata(share.path) | ||
125 | 1103 | # subscribe to it and fail! | ||
126 | 1104 | try: | ||
127 | 1105 | yield self.vm.subscribe_share(share.id) | ||
128 | 1106 | except KeyError, e: | ||
129 | 1107 | self.assertIn(share.path, e.args[0]) | ||
130 | 1108 | else: | ||
131 | 1109 | self.fail('Must get a KeyError!') | ||
132 | 1110 | |||
133 | 1111 | @defer.inlineCallbacks | ||
134 | 1112 | def _test_subscribe_share_generations(self, share): | ||
135 | 1113 | """Test subscribe_share with a generation.""" | ||
136 | 1114 | scratch_d = defer.Deferred() | ||
137 | 1115 | def fake_rescan_from_scratch(volume_id): | ||
138 | 1116 | """A fake rescan_from_scratch that check the arguments.""" | ||
139 | 1117 | self.assertEquals(share.volume_id, volume_id) | ||
140 | 1118 | scratch_d.callback(None) | ||
141 | 1119 | self.main.action_q.rescan_from_scratch = fake_rescan_from_scratch | ||
142 | 1120 | # subscribe to it | ||
143 | 1121 | yield self.vm.subscribe_share(share.volume_id) | ||
144 | 1122 | yield scratch_d | ||
145 | 1123 | self.assertTrue(self.vm.shares[share.volume_id].subscribed) | ||
146 | 1124 | |||
147 | 1125 | @defer.inlineCallbacks | ||
148 | 1126 | def test_subscribe_share_valid_generation(self): | ||
149 | 1127 | """Test subscribe_share with a valid generation.""" | ||
150 | 1128 | share = self._create_share(access_level=self.access_level, | ||
151 | 1129 | subscribed=False) | ||
152 | 1130 | yield self.vm.add_share(share) | ||
153 | 1131 | self.assertFalse(self.vm.shares[share.volume_id].subscribed) | ||
154 | 1132 | # update share generation | ||
155 | 1133 | self.vm.update_generation(share.volume_id, 0) | ||
156 | 1134 | yield self._test_subscribe_share_generations(share) | ||
157 | 1135 | |||
158 | 1136 | @defer.inlineCallbacks | ||
159 | 1137 | def test_subscribe_share_without_generation(self): | ||
160 | 1138 | """Test subscribe_share without a valid generation.""" | ||
161 | 1139 | share = self._create_share(access_level=self.access_level, | ||
162 | 1140 | subscribed=False) | ||
163 | 1141 | yield self.vm.add_share(share) | ||
164 | 1142 | self.assertFalse(self.vm.shares[share.volume_id].subscribed) | ||
165 | 1143 | # update share generation | ||
166 | 1144 | self.vm.update_generation(share.volume_id, None) | ||
167 | 1145 | yield self._test_subscribe_share_generations(share) | ||
168 | 1146 | |||
169 | 1147 | @defer.inlineCallbacks | ||
170 | 1148 | def test_unsubscribe_subscribe_share_with_content(self): | ||
171 | 1149 | """Test for re-subscribing to a share.""" | ||
172 | 1150 | share = self._create_share(access_level=self.access_level, | ||
173 | 1151 | subscribed=True) | ||
174 | 1152 | yield self.vm.add_share(share) | ||
175 | 1153 | self.main.event_q.rm_watch(share.path) | ||
176 | 1154 | self.assertTrue(self.vm.shares[share.volume_id].subscribed) | ||
177 | 1155 | # create a few files and directories | ||
178 | 1156 | dirs = ['dir', 'dir/subdir', 'dir/empty_dir'] | ||
179 | 1157 | for i, dir in enumerate(dirs): | ||
180 | 1158 | path = os.path.join(share.path, dir) | ||
181 | 1159 | with allow_writes(os.path.split(share.path)[0]): | ||
182 | 1160 | with allow_writes(share.path): | ||
183 | 1161 | if not os.path.exists(path): | ||
184 | 1162 | os.makedirs(path) | ||
185 | 1163 | self.main.fs.create(path, share.volume_id, is_dir=True) | ||
186 | 1164 | self.main.fs.set_node_id(path, 'dir_node_id'+str(i)) | ||
187 | 1165 | files = ['a_file', 'dir/file', 'dir/subdir/file'] | ||
188 | 1166 | for i, path in enumerate(files): | ||
189 | 1167 | path = os.path.join(share.path, path) | ||
190 | 1168 | with allow_writes(os.path.split(share.path)[0]): | ||
191 | 1169 | with allow_writes(share.path): | ||
192 | 1170 | open(path, 'w').close() | ||
193 | 1171 | self.main.fs.create(path, share.volume_id) | ||
194 | 1172 | self.main.fs.set_node_id(path, 'file_node_id'+str(i)) | ||
195 | 1173 | paths = list(self.main.fs.get_paths_starting_with(share.path)) | ||
196 | 1174 | # add a inotify watch to the dirs | ||
197 | 1175 | for path, is_dir in paths: | ||
198 | 1176 | if is_dir: | ||
199 | 1177 | self.main.event_q.add_watch(path) | ||
200 | 1178 | self.assertEquals(len(paths), len(dirs+files)+1, paths) | ||
201 | 1179 | |||
202 | 1180 | # unsubscribe from it | ||
203 | 1181 | self.vm.unsubscribe_share(share.volume_id) | ||
204 | 1182 | |||
205 | 1183 | self.assertEquals(2, len(self.vm.shares)) # share and root | ||
206 | 1184 | self.assertFalse(self.vm.shares[share.volume_id].subscribed) | ||
207 | 1185 | # check that the share is in the fsm metadata | ||
208 | 1186 | self.assertTrue(self.main.fs.get_by_path(share.path)) | ||
209 | 1187 | # check that there isn't a watch in the share | ||
210 | 1188 | self.assertFalse(self.main.event_q.has_watch(share.path)) | ||
211 | 1189 | # check that the childs don't have a watch | ||
212 | 1190 | for path, is_dir in paths: | ||
213 | 1191 | if is_dir: | ||
214 | 1192 | self.assertFalse(self.main.event_q.has_watch(path)) | ||
215 | 1193 | # check the childs | ||
216 | 1194 | paths = self.main.fs.get_paths_starting_with(share.path) | ||
217 | 1195 | self.assertEquals(len(dirs+files)+1, len(paths)) | ||
218 | 1196 | # resubscribe to it | ||
219 | 1197 | yield self.vm.subscribe_share(share.volume_id) | ||
220 | 1198 | paths = list(self.main.fs.get_paths_starting_with(share.path)) | ||
221 | 1199 | # we should only have the dirs, as the files metadata is | ||
222 | 1200 | # delete by local rescan (both hashes are '') | ||
223 | 1201 | self.assertEquals(len(dirs)+1, len(paths)) | ||
224 | 1202 | # check that there is a watch in the share | ||
225 | 1203 | self.assertTrue(self.main.event_q.has_watch(share.path)) | ||
226 | 1204 | # check that the child dirs have a watch | ||
227 | 1205 | for path, is_dir in paths: | ||
228 | 1206 | if is_dir: | ||
229 | 1207 | self.assertTrue(self.main.event_q.has_watch(path), | ||
230 | 1208 | '%s has a watch' % path) | ||
231 | 1209 | self.vm._remove_watch(path) | ||
232 | 1210 | |||
233 | 1211 | |||
234 | 986 | class VolumeManagerUnicodeTests(BaseVolumeManagerTests): | 1212 | class VolumeManagerUnicodeTests(BaseVolumeManagerTests): |
235 | 987 | """Tests for Volume Manager unicode capabilities.""" | 1213 | """Tests for Volume Manager unicode capabilities.""" |
236 | 988 | 1214 | ||
237 | 989 | 1215 | ||
238 | === modified file 'ubuntuone/syncdaemon/event_queue.py' | |||
239 | --- ubuntuone/syncdaemon/event_queue.py 2011-01-21 15:45:16 +0000 | |||
240 | +++ ubuntuone/syncdaemon/event_queue.py 2011-02-03 12:52:26 +0000 | |||
241 | @@ -151,6 +151,10 @@ | |||
242 | 151 | 'VM_UDF_UNSUBSCRIBE_ERROR': ('udf_id', 'error'), | 151 | 'VM_UDF_UNSUBSCRIBE_ERROR': ('udf_id', 'error'), |
243 | 152 | 'VM_UDF_CREATED': ('udf',), | 152 | 'VM_UDF_CREATED': ('udf',), |
244 | 153 | 'VM_UDF_CREATE_ERROR': ('path', 'error'), | 153 | 'VM_UDF_CREATE_ERROR': ('path', 'error'), |
245 | 154 | 'VM_SHARE_SUBSCRIBED': ('share',), | ||
246 | 155 | 'VM_SHARE_SUBSCRIBE_ERROR': ('share_id', 'error'), | ||
247 | 156 | 'VM_SHARE_UNSUBSCRIBED': ('share',), | ||
248 | 157 | 'VM_SHARE_UNSUBSCRIBE_ERROR': ('share_id', 'error'), | ||
249 | 154 | 'VM_SHARE_CREATED': ('share_id',), | 158 | 'VM_SHARE_CREATED': ('share_id',), |
250 | 155 | 'VM_SHARE_DELETED': ('share',), | 159 | 'VM_SHARE_DELETED': ('share',), |
251 | 156 | 'VM_SHARE_DELETE_ERROR': ('share_id', 'error'), | 160 | 'VM_SHARE_DELETE_ERROR': ('share_id', 'error'), |
252 | 157 | 161 | ||
253 | === modified file 'ubuntuone/syncdaemon/volume_manager.py' | |||
254 | --- ubuntuone/syncdaemon/volume_manager.py 2011-02-02 18:33:45 +0000 | |||
255 | +++ ubuntuone/syncdaemon/volume_manager.py 2011-02-03 12:52:26 +0000 | |||
256 | @@ -945,6 +945,19 @@ | |||
257 | 945 | if not share.can_write(): | 945 | if not share.can_write(): |
258 | 946 | set_dir_readonly(share.path) | 946 | set_dir_readonly(share.path) |
259 | 947 | 947 | ||
260 | 948 | def _create_udf_dir(self, udf): | ||
261 | 949 | """Create the udf dir if does not exist.""" | ||
262 | 950 | if not path_exists(udf.path): | ||
263 | 951 | # the udf path isn't there, create it! | ||
264 | 952 | make_dir(udf.path, recursive=True) | ||
265 | 953 | |||
266 | 954 | def _create_volume_dir(self, volume): | ||
267 | 955 | """Create the volume dir if does not exist, set perms for shares.""" | ||
268 | 956 | if isinstance(volume, (Share, Root)): | ||
269 | 957 | self._create_share_dir(volume) | ||
270 | 958 | elif isinstance(volume, UDF): | ||
271 | 959 | self._create_udf_dir(volume) | ||
272 | 960 | |||
273 | 948 | def _create_fsm_object(self, path, volume_id, node_id): | 961 | def _create_fsm_object(self, path, volume_id, node_id): |
274 | 949 | """ Creates the mdobj for this share in fs manager. """ | 962 | """ Creates the mdobj for this share in fs manager. """ |
275 | 950 | try: | 963 | try: |
276 | @@ -1103,6 +1116,13 @@ | |||
277 | 1103 | if all_volumes or volume.active: | 1116 | if all_volumes or volume.active: |
278 | 1104 | yield volume | 1117 | yield volume |
279 | 1105 | 1118 | ||
280 | 1119 | def store_volume(self, volume): | ||
281 | 1120 | """Store 'volume'.""" | ||
282 | 1121 | if isinstance(volume, (Share, Root)): | ||
283 | 1122 | self.shares[volume.volume_id] = volume | ||
284 | 1123 | elif isinstance(volume, UDF): | ||
285 | 1124 | self.udfs[volume.volume_id] = volume | ||
286 | 1125 | |||
287 | 1106 | def _is_nested_udf(self, path): | 1126 | def _is_nested_udf(self, path): |
288 | 1107 | """Check if it's ok to create a UDF in 'path'. | 1127 | """Check if it's ok to create a UDF in 'path'. |
289 | 1108 | 1128 | ||
290 | @@ -1168,44 +1188,74 @@ | |||
291 | 1168 | volume = self.get_volume(volume_id) | 1188 | volume = self.get_volume(volume_id) |
292 | 1169 | self.m.action_q.delete_volume(volume.id, volume.path) | 1189 | self.m.action_q.delete_volume(volume.id, volume.path) |
293 | 1170 | 1190 | ||
294 | 1191 | def subscribe_share(self, share_id): | ||
295 | 1192 | """Mark the Share with 'share_id' as subscribed. | ||
296 | 1193 | |||
297 | 1194 | Also fire a local and server rescan. | ||
298 | 1195 | |||
299 | 1196 | """ | ||
300 | 1197 | push_error = functools.partial(self.m.event_q.push, | ||
301 | 1198 | 'VM_SHARE_SUBSCRIBE_ERROR', share_id=share_id) | ||
302 | 1199 | push_success = lambda volume: \ | ||
303 | 1200 | self.m.event_q.push('VM_SHARE_SUBSCRIBED', share=volume) | ||
304 | 1201 | self.log.info('subscribe_share: %r', share_id) | ||
305 | 1202 | d = self._subscribe_volume(share_id, push_success, push_error) | ||
306 | 1203 | return d | ||
307 | 1204 | |||
308 | 1171 | def subscribe_udf(self, udf_id): | 1205 | def subscribe_udf(self, udf_id): |
310 | 1172 | """Mark the UDF with id as subscribed. | 1206 | """Mark the UDF with 'udf_id' as subscribed. |
311 | 1173 | 1207 | ||
312 | 1174 | Also fire a local and server rescan. | 1208 | Also fire a local and server rescan. |
313 | 1175 | 1209 | ||
314 | 1176 | """ | 1210 | """ |
315 | 1177 | push_error = functools.partial(self.m.event_q.push, | 1211 | push_error = functools.partial(self.m.event_q.push, |
316 | 1178 | 'VM_UDF_SUBSCRIBE_ERROR', udf_id=udf_id) | 1212 | 'VM_UDF_SUBSCRIBE_ERROR', udf_id=udf_id) |
317 | 1213 | push_success = lambda volume: self.m.event_q.push('VM_UDF_SUBSCRIBED', | ||
318 | 1214 | udf=volume) | ||
319 | 1179 | self.log.info('subscribe_udf: %r', udf_id) | 1215 | self.log.info('subscribe_udf: %r', udf_id) |
320 | 1216 | d = self._subscribe_volume(udf_id, push_success, push_error) | ||
321 | 1217 | return d | ||
322 | 1218 | |||
323 | 1219 | def _subscribe_volume(self, volume_id, push_success, push_error): | ||
324 | 1220 | """Mark the volume with 'volume_id' as subscribed. | ||
325 | 1221 | |||
326 | 1222 | If can_write(), fire a local and server rescan while temporary | ||
327 | 1223 | unsubscribing from it. | ||
328 | 1224 | |||
329 | 1225 | """ | ||
330 | 1226 | self.log.debug('_subscribe_volume: %r', volume_id) | ||
331 | 1180 | try: | 1227 | try: |
334 | 1181 | udf = self.udfs[udf_id] | 1228 | volume = self.get_volume(volume_id) |
335 | 1182 | except KeyError: | 1229 | except VolumeDoesNotExist, e: |
336 | 1183 | push_error(error="DOES_NOT_EXIST") | 1230 | push_error(error="DOES_NOT_EXIST") |
352 | 1184 | d = defer.fail(VolumeDoesNotExist(udf_id)) | 1231 | return defer.fail(e) |
353 | 1185 | else: | 1232 | |
354 | 1186 | if not path_exists(udf.path): | 1233 | self._create_volume_dir(volume) |
355 | 1187 | # the udf path isn't there, create it! | 1234 | |
356 | 1188 | make_dir(udf.path, recursive=True) | 1235 | def subscribe(result): |
357 | 1189 | def subscribe(result): | 1236 | """Subscribe the volume after the local rescan. |
358 | 1190 | """Subscribe the UDF after the local rescan. | 1237 | |
359 | 1191 | 1238 | As we don't wait for server rescan to finish, the volume is | |
360 | 1192 | As we don't wait for server rescan to finish, the udf is | 1239 | subscribed just after the local rescan it's done. |
361 | 1193 | subscribed just after the local rescan it's done. | 1240 | |
362 | 1194 | 1241 | """ | |
363 | 1195 | """ | 1242 | volume.subscribed = True |
364 | 1196 | udf.subscribed = True | 1243 | self.store_volume(volume) |
365 | 1197 | self.udfs[udf_id] = udf | 1244 | return result |
366 | 1198 | return result | 1245 | |
367 | 1246 | if volume.can_write(): | ||
368 | 1199 | try: | 1247 | try: |
370 | 1200 | d = self._scan_udf(udf) | 1248 | d = self._scan_volume(volume) |
371 | 1201 | except KeyError, e: | 1249 | except KeyError, e: |
372 | 1202 | push_error(error="METADATA_DOES_NOT_EXIST") | 1250 | push_error(error="METADATA_DOES_NOT_EXIST") |
379 | 1203 | d = defer.fail(e) | 1251 | return defer.fail(e) |
380 | 1204 | else: | 1252 | else: |
381 | 1205 | d.addCallback(subscribe) | 1253 | # avoid local and server rescan for read-only volumes |
382 | 1206 | d.addCallbacks( | 1254 | d = defer.succeed(None) |
383 | 1207 | lambda _: self.m.event_q.push('VM_UDF_SUBSCRIBED', udf=udf), | 1255 | |
384 | 1208 | lambda f: push_error(error=f.getErrorMessage())) | 1256 | d.addCallback(subscribe) |
385 | 1257 | d.addCallbacks(lambda _: push_success(volume), | ||
386 | 1258 | lambda f: push_error(error=f.getErrorMessage())) | ||
387 | 1209 | return d | 1259 | return d |
388 | 1210 | 1260 | ||
389 | 1211 | def _scan_udf(self, udf): | 1261 | def _scan_udf(self, udf): |
390 | @@ -1230,21 +1280,38 @@ | |||
391 | 1230 | d.addCallback(server_rescan) | 1280 | d.addCallback(server_rescan) |
392 | 1231 | return d | 1281 | return d |
393 | 1232 | 1282 | ||
394 | 1283 | def unsubscribe_share(self, share_id): | ||
395 | 1284 | """Mark the share with share_id as unsubscribed.""" | ||
396 | 1285 | self.log.info('unsubscribe_share: %r', share_id) | ||
397 | 1286 | push_error = functools.partial(self.m.event_q.push, | ||
398 | 1287 | 'VM_SHARE_UNSUBSCRIBE_ERROR', share_id=share_id) | ||
399 | 1288 | push_success = lambda volume: \ | ||
400 | 1289 | self.m.event_q.push('VM_SHARE_UNSUBSCRIBED', share=volume) | ||
401 | 1290 | self._unsubscribe_volume(share_id, push_success, push_error) | ||
402 | 1291 | |||
403 | 1233 | def unsubscribe_udf(self, udf_id): | 1292 | def unsubscribe_udf(self, udf_id): |
404 | 1234 | """Mark the UDF with udf_id as unsubscribed.""" | 1293 | """Mark the UDF with udf_id as unsubscribed.""" |
405 | 1235 | self.log.info('unsubscribe_udf: %r', udf_id) | 1294 | self.log.info('unsubscribe_udf: %r', udf_id) |
406 | 1295 | push_error = functools.partial(self.m.event_q.push, | ||
407 | 1296 | 'VM_UDF_UNSUBSCRIBE_ERROR', udf_id=udf_id) | ||
408 | 1297 | push_success = lambda volume: \ | ||
409 | 1298 | self.m.event_q.push('VM_UDF_UNSUBSCRIBED', udf=volume) | ||
410 | 1299 | self._unsubscribe_volume(udf_id, push_success, push_error) | ||
411 | 1300 | |||
412 | 1301 | def _unsubscribe_volume(self, volume_id, push_success, push_error): | ||
413 | 1302 | """Mark the volume with volume_id as unsubscribed.""" | ||
414 | 1303 | self.log.debug('unsubscribe_volume: %r', volume_id) | ||
415 | 1236 | try: | 1304 | try: |
420 | 1237 | udf = self.udfs[udf_id] | 1305 | volume = self.get_volume(volume_id) |
421 | 1238 | except KeyError: | 1306 | except VolumeDoesNotExist: |
422 | 1239 | self.m.event_q.push('VM_UDF_UNSUBSCRIBE_ERROR', udf_id=udf_id, | 1307 | push_error(error="DOES_NOT_EXIST") |
419 | 1240 | error="DOES_NOT_EXIST") | ||
423 | 1241 | else: | 1308 | else: |
424 | 1242 | # remove the inotify watches, but don't delete the metadata | 1309 | # remove the inotify watches, but don't delete the metadata |
430 | 1243 | self._remove_watches(udf.path) | 1310 | self._remove_watches(volume.path) |
431 | 1244 | # makr the udf as unsubscribed | 1311 | # mark the volume as unsubscribed |
432 | 1245 | udf.subscribed = False | 1312 | volume.subscribed = False |
433 | 1246 | self.udfs[udf_id] = udf | 1313 | self.store_volume(volume) |
434 | 1247 | self.m.event_q.push('VM_UDF_UNSUBSCRIBED', udf=udf) | 1314 | push_success(volume) |
435 | 1248 | 1315 | ||
436 | 1249 | def handle_AQ_CREATE_UDF_OK(self, marker, volume_id, node_id): | 1316 | def handle_AQ_CREATE_UDF_OK(self, marker, volume_id, node_id): |
437 | 1250 | """Handle AQ_CREATE_UDF_OK.""" | 1317 | """Handle AQ_CREATE_UDF_OK.""" |
+1 looks good to me.