Merge lp:~gholt/swift/lobjects3 into lp:~hudson-openstack/swift/trunk
Proposed by: gholt
Status: Rejected
Rejected by: gholt
Proposed branch: lp:~gholt/swift/lobjects3
Merge into: lp:~hudson-openstack/swift/trunk
Diff against target: 4643 lines (+3054/-575), 29 files modified:
  bin/swift-init (+12/-6)
  bin/swift-object-janitor (+28/-0)
  doc/source/deployment_guide.rst (+26/-0)
  doc/source/development_saio.rst (+9/-0)
  doc/source/index.rst (+1/-0)
  doc/source/object.rst (+19/-0)
  doc/source/overview_very_large_objects.rst (+144/-0)
  etc/object-server.conf-sample (+15/-0)
  etc/proxy-server.conf-sample (+3/-0)
  setup.py (+16/-23)
  swift/common/constraints.py (+11/-6)
  swift/common/db.py (+1/-2)
  swift/obj/auditor.py (+9/-15)
  swift/obj/diskfile.py (+507/-0)
  swift/obj/janitor.py (+516/-0)
  swift/obj/replicator.py (+32/-176)
  swift/obj/server.py (+109/-236)
  swift/obj/updater.py (+16/-2)
  swift/proxy/server.py (+391/-26)
  test/functional/sample.conf (+8/-1)
  test/functional/tests.py (+1/-1)
  test/probe/common.py (+1/-1)
  test/probe/test_object_async_update.py (+1/-1)
  test/unit/common/test_constraints.py (+6/-8)
  test/unit/obj/test_diskfile.py (+203/-0)
  test/unit/obj/test_janitor.py (+433/-0)
  test/unit/obj/test_server.py (+179/-62)
  test/unit/obj/test_updater.py (+8/-4)
  test/unit/proxy/test_server.py (+349/-5)
To merge this branch: bzr merge lp:~gholt/swift/lobjects3
Related bugs: None
Related blueprints: None
Reviewer: gholt (community) - Disapprove
Review via email: mp+39792@code.launchpad.net
Commit message
Very large object support.
Description of the change
Very large object support.
Please read doc/source/
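
As a quick illustration of the segment naming scheme described in the new doc/source/overview_very_large_objects.rst (each segment is named object name + PUT timestamp + segment number, and each name hashes to its own partition), here is a minimal sketch. It is not code from this branch: segment_names and segment_hash are hypothetical helpers, the bare MD5 merely stands in for swift.common.utils.hash_path, and the first-segment placement exception the doc covers is ignored.

    import hashlib

    def segment_names(obj, timestamp, content_length, segment_size):
        # One name per segment: object name + PUT timestamp + segment number.
        # The timestamp keeps the segments of two overwrites of the same
        # object from colliding on disk.
        segments = (content_length + segment_size - 1) // segment_size
        for segment in range(segments):
            yield '%s/%s/%s' % (obj, timestamp, segment)

    def segment_hash(account, container, segment_name):
        # Stand-in for swift.common.utils.hash_path(); the point is only
        # that each segment name hashes independently, spreading segments
        # evenly across partitions and therefore across storage nodes.
        path = '/%s/%s/%s' % (account, container, segment_name)
        return hashlib.md5(path.encode('utf-8')).hexdigest()

    # A 10 GiB object with 1 GiB segments: 10 segments plus 1 manifest, so
    # the proxy must get a majority of object servers to succeed 11 times.
    for name in segment_names('o', '1289242308.00000',
                              10 * 1024 ** 3, 1024 ** 3):
        print('%s -> %s' % (name, segment_hash('AUTH_test', 'c', name)))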
Unmerged revisions
- 124. By gholt: Merge from trunk
- 123. By gholt: Merged from trunk
- 122. By gholt: Merged from trunk
- 121. By gholt: Added missing SkipTest import
- 120. By gholt: Make poor Hudson happier
- 119. By gholt: Documentation
- 118. By gholt: Tests for proxy server
- 117. By gholt: More object server tests
- 116. By gholt: More tests for the janitor
- 115. By gholt: Working on tests and bugfixes
Preview Diff
1 | === modified file 'bin/swift-init' |
2 | --- bin/swift-init 2010-08-19 20:01:44 +0000 |
3 | +++ bin/swift-init 2010-11-08 18:51:48 +0000 |
4 | @@ -23,10 +23,11 @@ |
5 | import sys |
6 | import time |
7 | |
8 | -ALL_SERVERS = ['account-auditor', 'account-server', 'container-auditor', |
9 | +ALL_SERVERS = ['account-auditor', 'account-reaper', 'account-replicator', |
10 | + 'account-server', 'auth-server', 'container-auditor', |
11 | 'container-replicator', 'container-server', 'container-updater', |
12 | - 'object-auditor', 'object-server', 'object-replicator', 'object-updater', |
13 | - 'proxy-server', 'account-replicator', 'auth-server', 'account-reaper'] |
14 | + 'object-auditor', 'object-janitor', 'object-replicator', 'object-server', |
15 | + 'object-updater', 'proxy-server'] |
16 | GRACEFUL_SHUTDOWN_SERVERS = ['account-server', 'container-server', |
17 | 'object-server', 'proxy-server', 'auth-server'] |
18 | MAX_DESCRIPTORS = 32768 |
19 | @@ -41,6 +42,7 @@ |
20 | servers = [server] |
21 | command = command.lower() |
22 | |
23 | + |
24 | def pid_files(server): |
25 | if os.path.exists('/var/run/swift/%s.pid' % server): |
26 | pid_files = ['/var/run/swift/%s.pid' % server] |
27 | @@ -50,6 +52,7 @@ |
28 | pid = int(open(pid_file).read().strip()) |
29 | yield pid_file, pid |
30 | |
31 | + |
32 | def do_start(server, once=False): |
33 | server_type = '-'.join(server.split('-')[:-1]) |
34 | |
35 | @@ -77,7 +80,7 @@ |
36 | os.makedirs(dir) |
37 | except OSError, err: |
38 | if err.errno == errno.EACCES: |
39 | - sys.exit('Unable to create %s. Running as non-root?' % dir) |
40 | + sys.exit('Unable to create %s. Running as non-root?' % dir) |
41 | fp = open(pid_file, 'w') |
42 | fp.write('%d\n' % pid) |
43 | fp.close() |
44 | @@ -120,18 +123,21 @@ |
45 | elif os.path.exists('/etc/swift/%s-server/' % server_type): |
46 | # found config directory, searching for config file(s) |
47 | launch_args = [] |
48 | - for num, ini_file in enumerate(glob.glob('/etc/swift/%s-server/*.conf' % server_type)): |
49 | + for num, ini_file in \ |
50 | + enumerate(glob.glob('/etc/swift/%s-server/*.conf' % server_type)): |
51 | pid_file = '/var/run/swift/%s/%d.pid' % (server, num) |
52 | # start a server for each ini_file found |
53 | launch_args.append((ini_file, pid_file)) |
54 | else: |
55 | # maybe there's a config file(s) out there, but I couldn't find it! |
56 | - sys.exit('Unable to locate config file for %s. %s does not exist?' % (server, ini_file)) |
57 | + sys.exit('Unable to locate config file for %s. %s does not exist?' % |
58 | + (server, ini_file)) |
59 | |
60 | # start all servers |
61 | for ini_file, pid_file in launch_args: |
62 | launch(ini_file, pid_file) |
63 | |
64 | + |
65 | def do_stop(server, graceful=False): |
66 | if graceful and server in GRACEFUL_SHUTDOWN_SERVERS: |
67 | sig = signal.SIGHUP |
68 | |
69 | === added file 'bin/swift-object-janitor' |
70 | --- bin/swift-object-janitor 1970-01-01 00:00:00 +0000 |
71 | +++ bin/swift-object-janitor 2010-11-08 18:51:48 +0000 |
72 | @@ -0,0 +1,28 @@ |
73 | +#!/usr/bin/python |
74 | +# Copyright (c) 2010 OpenStack, LLC. |
75 | +# |
76 | +# Licensed under the Apache License, Version 2.0 (the "License"); |
77 | +# you may not use this file except in compliance with the License. |
78 | +# You may obtain a copy of the License at |
79 | +# |
80 | +# http://www.apache.org/licenses/LICENSE-2.0 |
81 | +# |
82 | +# Unless required by applicable law or agreed to in writing, software |
83 | +# distributed under the License is distributed on an "AS IS" BASIS, |
84 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
85 | +# implied. |
86 | +# See the License for the specific language governing permissions and |
87 | +# limitations under the License. |
88 | + |
89 | +import sys |
90 | + |
91 | +from swift.obj.janitor import ObjectJanitor |
92 | +from swift.common import utils |
93 | + |
94 | +if __name__ == '__main__': |
95 | + if len(sys.argv) < 2: |
96 | + print "Usage: swift-object-janitor CONFIG_FILE [once]" |
97 | + sys.exit(1) |
98 | + once = len(sys.argv) > 2 and sys.argv[2] == 'once' |
99 | + conf = utils.readconf(sys.argv[1], 'object-janitor') |
100 | + ObjectJanitor(conf).run(once) |
101 | |
102 | === modified file 'doc/source/deployment_guide.rst' |
103 | --- doc/source/deployment_guide.rst 2010-10-21 18:59:43 +0000 |
104 | +++ doc/source/deployment_guide.rst 2010-11-08 18:51:48 +0000 |
105 | @@ -206,6 +206,25 @@ |
106 | object can be reclaimed |
107 | ================== ================= ======================================= |
108 | |
109 | +[object-janitor] |
110 | + |
111 | +=================== ============== ========================================== |
112 | +Option Default Description |
113 | +------------------- -------------- ------------------------------------------ |
114 | +log_name object-janitor Label used when logging |
115 | +log_facility LOG_LOCAL0 Syslog log facility |
116 | +log_level INFO Logging level |
117 | +interval 300 Minimum time for a pass to take |
118 | concurrency 1 Number of janitor workers to spawn |
119 | +node_timeout 10 Request timeout to external services |
120 | +conn_timeout 0.5 Connection timeout to external services |
121 | +slowdown 0.01 Time in seconds to wait between operations |
122 | +segment_reclaim_age 604800 Number of seconds before removing orphaned |
123 | + object segments |
124 | +segments_per_pass 10 Maximum segments per object to remove per |
125 | + pass |
126 | +=================== ============== ========================================== |
127 | + |
128 | [object-updater] |
129 | |
130 | ================== ============== ========================================== |
131 | @@ -438,6 +457,13 @@ |
132 | log_level INFO Log level |
133 | log_headers True If True, log headers in each |
134 | request |
135 | +max_object_size 107374182400 Maximum size of any object in |
136 | + the cluster. Note: The |
137 | + segment_size value will cause |
138 | + objects larger than that to be |
139 | + split into segments |
140 | +segment_size 2147483647 Objects will be split into file |
141 | + segments no larger than this |
142 | recheck_account_existence 60 Cache timeout in seconds to |
143 | send memcached for account |
144 | existence |
145 | |
146 | === modified file 'doc/source/development_saio.rst' |
147 | --- doc/source/development_saio.rst 2010-11-02 14:17:25 +0000 |
148 | +++ doc/source/development_saio.rst 2010-11-08 18:51:48 +0000 |
149 | @@ -440,6 +440,8 @@ |
150 | [object-replicator] |
151 | vm_test_mode = yes |
152 | |
153 | + [object-janitor] |
154 | + |
155 | [object-updater] |
156 | |
157 | [object-auditor] |
158 | @@ -461,6 +463,8 @@ |
159 | [object-replicator] |
160 | vm_test_mode = yes |
161 | |
162 | + [object-janitor] |
163 | + |
164 | [object-updater] |
165 | |
166 | [object-auditor] |
167 | @@ -482,6 +486,8 @@ |
168 | [object-replicator] |
169 | vm_test_mode = yes |
170 | |
171 | + [object-janitor] |
172 | + |
173 | [object-updater] |
174 | |
175 | [object-auditor] |
176 | @@ -503,6 +509,8 @@ |
177 | [object-replicator] |
178 | vm_test_mode = yes |
179 | |
180 | + [object-janitor] |
181 | + |
182 | [object-updater] |
183 | |
184 | [object-auditor] |
185 | @@ -571,6 +579,7 @@ |
186 | # Replace devauth with whatever your super_admin key is (recorded in |
187 | # /etc/swift/auth-server.conf). |
188 | swift-auth-recreate-accounts -K devauth |
189 | + swift-init object-janitor start |
190 | swift-init object-updater start |
191 | swift-init container-updater start |
192 | swift-init object-replicator start |
193 | |
194 | === modified file 'doc/source/index.rst' |
195 | --- doc/source/index.rst 2010-11-04 19:25:23 +0000 |
196 | +++ doc/source/index.rst 2010-11-08 18:51:48 +0000 |
197 | @@ -26,6 +26,7 @@ |
198 | overview_replication |
199 | overview_stats |
200 | ratelimit |
201 | + overview_very_large_objects |
202 | |
203 | Development: |
204 | |
205 | |
206 | === modified file 'doc/source/object.rst' |
207 | --- doc/source/object.rst 2010-07-19 16:25:18 +0000 |
208 | +++ doc/source/object.rst 2010-11-08 18:51:48 +0000 |
209 | @@ -24,6 +24,16 @@ |
210 | :undoc-members: |
211 | :show-inheritance: |
212 | |
213 | +.. _object-janitor: |
214 | + |
215 | +Object Janitor |
216 | +============== |
217 | + |
218 | +.. automodule:: swift.obj.janitor |
219 | + :members: |
220 | + :undoc-members: |
221 | + :show-inheritance: |
222 | + |
223 | .. _object-updater: |
224 | |
225 | Object Updater |
226 | @@ -44,3 +54,12 @@ |
227 | :undoc-members: |
228 | :show-inheritance: |
229 | |
230 | +.. _object-diskfile: |
231 | + |
232 | +Disk File |
233 | +========= |
234 | + |
235 | +.. automodule:: swift.obj.diskfile |
236 | + :members: |
237 | + :undoc-members: |
238 | + :show-inheritance: |
239 | |
240 | === added file 'doc/source/overview_very_large_objects.rst' |
241 | --- doc/source/overview_very_large_objects.rst 1970-01-01 00:00:00 +0000 |
242 | +++ doc/source/overview_very_large_objects.rst 2010-11-08 18:51:48 +0000 |
243 | @@ -0,0 +1,144 @@ |
244 | +========================= |
245 | +Very Large Object Support |
246 | +========================= |
247 | + |
248 | +----- |
249 | +Intro |
250 | +----- |
251 | + |
252 | +Supporting very large objects in Swift presented quite a challenge. The main |
253 | +problem is storage balance; if you just have a few very large objects in the |
254 | cluster, some of the storage nodes will have significantly more data than the |
255 | +others. The basic answer is to break these objects up into segments and |
256 | +distribute the segments across the cluster evenly. |
257 | + |
258 | +The user could do this themselves, breaking up their very large objects into |
259 | +smaller objects and uploading those. But then they'd have to reassemble them |
260 | +themselves as well on download. What we do in Swift is essentially emulate this |
261 | behavior transparently for the user. In this way, we can pick the |
262 | +optimal segment size for our cluster, something the user probably wouldn't |
263 | +know. This also allows the user to easily do ranged requests on the object |
264 | +without having to know how it's split up behind the scenes. |
265 | + |
266 | +----------------------- |
267 | +The Proxy is in Control |
268 | +----------------------- |
269 | + |
270 | +In Swift's implementation, the proxy server is in control of the object |
271 | segmentation. As the user uploads a very large object, the proxy automatically |
272 | +closes the current segment when it reaches a configurable segment size and |
273 | +opens a new segment for more data. Once all the data is uploaded and stored, |
274 | +the proxy server then creates a manifest object indicating how the object was |
275 | segmented so that it can be retrieved later. The proxy does not spool any data |
276 | to disk; it sends all data directly to the object servers. |
277 | + |
278 | +Because of these segment switchovers occurring in the proxy server, a very |
279 | large object operation will have a higher chance of failure than a normal-sized |
280 | +object. If at any point the proxy can't communicate with at least a majority of |
281 | +the nodes for each and every segment of the object, the whole object operation |
282 | +will fail. For instance, a regular 1m object in a 3 replica cluster just needs |
283 | +to make at least 2 object server requests to succeed. If the segment size is |
284 | +1G, a 10G object would need 2 object server requests to succeed 11 times during |
285 | +the operation (10 segments, 1 manifest). |
286 | + |
287 | +The upside of the proxy being in control is that the client does not need to |
288 | +know anything about the segmentation; it's done for them. The downside is that |
289 | +failure to upload means starting the whole thing over again. Later we plan to |
290 | +implement what we term 'user manifests' where the user can upload several |
291 | +objects but download them as if they were one. This would allow the user to |
292 | +just reupload a smaller part of the whole in the case of upload failures, but |
293 | +would require them to manage the segments themselves. |
294 | + |
295 | +------------------------- |
296 | +How They're Really Stored |
297 | +------------------------- |
298 | + |
299 | +If you have a working knowledge of how Swift stores and locates regular objects |
300 | +you'll know that the object name is hashed and that that hash determines the |
301 | +storage nodes and disk location for the object's data. Segmented objects work |
302 | +much the same way, but they have as many names as they do segments, plus one |
303 | +more for the manifest. |
304 | + |
305 | +The manifest is stored at the same location as the object would be if it were a |
306 | +normal object. The manifest is the last item stored once all the segments are |
307 | +in place, so that only one version of an object is available at any given time. |
308 | +This is important because you don't want to be able to retrieve a manifest and |
309 | +not be able to find the segments yet. |
310 | + |
311 | +The segments are given the same name as the object with the operation's |
312 | +timestamp and the segment number appended. The reason the timestamp is used is |
313 | +to keep the segments of multiple uploads of the same very large object from |
314 | +colliding. During an overwrite of a very large object, you don't want a |
315 | +download of that object to have some new data and some old. Since the segments |
316 | +for each version of the object are stored independently, a download before the |
317 | +new manifest file is in place will get the old data and a download afterwards |
318 | +will get the new data. Also, in the case where the new upload fails in transit, |
319 | +the old version will still be available. |
320 | + |
321 | +The storage nodes for each segment are determined in the familiar way of hashing |
322 | +the segment name (object name + timestamp + segment number as described above) |
323 | +and then picking the storage nodes based on the hash. However, there is an |
324 | +exception for the first segment. This segment will always be stored on the same |
325 | +storage node and disk partition as a regular object of the same name would be, |
326 | +though the hash of the segment name determines the exact on disk location. This |
327 | +is because with chunked transfer encoded uploads we don't know the object size |
328 | +until the upload is complete. We won't realize we need to segment such an |
329 | +object until we've already uploaded the first segment's worth to the regular |
330 | +object storage nodes. |
331 | + |
332 | +To help keep directory structures smaller, object segments are stored in an |
333 | +'object_segments' directory alongside the usual 'objects' directory. Though all |
334 | +these files could be stored in the same directory structure, making this split |
335 | +should make replication, reclamation, and other scan-type operations less of a |
336 | +jar to the systems. |
337 | + |
338 | +.. note:: |
339 | + |
340 | + Because the first segment is always stored on the same storage nodes and in |
341 | + the same partition as a regular file would, you can often see hashes in a |
342 | + partition where they don't seem to belong. For instance, a first segment |
343 | + name that hashes to 25d40100fd07c8c5fd1a4e31875593e1 might be found in |
344 | + partition 568363 (hex 8ac2b) instead of the expected 154944 (hex 25d40) |
345 | + because the segment's true object name hashed to |
346 | + 8ac2bf59556b61bb5cc521ccb51c200a. You should only see these anomalies in |
347 | the object_segments directory, however. |
348 | + |
349 | +----------- |
350 | +Cleaning Up |
351 | +----------- |
352 | + |
353 | +Another challenge very large object support brought was in how to keep |
354 | +everything cleaned up and reduce the chance of completely orphaned segments |
355 | +that would waste a large amount of storage. |
356 | + |
357 | +One such scenario is the common DELETE operation. We don't want to make the |
358 | +client wait while we go out and delete all the segments and the manifest from |
359 | +all over the cluster. Instead, we write out a file that will start a background |
360 | operation to do the cleanup, and we just remove the manifest file. An object |
361 | +overwrite is much like a delete in that just before placing the new manifest |
362 | +file we create the background job. |
363 | + |
364 | +Another scenario is failed uploads. If a user disconnects after uploading 100 |
365 | +segments before finishing, we have to clean those up. Also, a proxy could crash |
366 | +due to hardware failure right in the middle of such an operation, so we |
367 | +wouldn't even be able to make a cleanup job at that point. So, we make a |
368 | +pending cleanup job just before starting any segmented upload. That pending |
369 | +cleanup job will wait up to a configurable amount of time (1 week by default) |
370 | +for the associated manifest to appear. If the manifest appears, the job is |
371 | +canceled and deleted. If the manifest never appears, the job goes about |
372 | +removing the orphaned segments from the system. |
373 | + |
374 | +Prior to very large object support, we had a background process called object |
375 | +async. Object async would send object metadata updates to the container servers |
376 | +in the event the container servers couldn't be contacted right away. These |
377 | +update jobs were uncommon and quick to complete, so durability wasn't too much |
378 | +of a concern. However, these segment cleanup jobs can hang around for a while |
379 | +(1 week in the above example of a proxy crash), so durable background jobs |
380 | +became important. |
381 | + |
382 | +With this, the object janitor was born. He accomplishes what object async did |
383 | +and handles the segment cleanup operations as well. The janitor jobs are stored just |
384 | +like regular object data is, except that they're stored in the object_janitor |
385 | +directory structure. Because they're stored in the same way, they can use the |
386 | +same object replicator code to keep durability with replicas on multiple |
387 | +storage nodes. |
388 | |
389 | === modified file 'etc/object-server.conf-sample' |
390 | --- etc/object-server.conf-sample 2010-10-19 15:02:36 +0000 |
391 | +++ etc/object-server.conf-sample 2010-11-08 18:51:48 +0000 |
392 | @@ -44,6 +44,21 @@ |
393 | # The replicator also performs reclamation |
394 | # reclaim_age = 604800 |
395 | |
396 | +[object-janitor] |
397 | +# log_name = object-janitor |
398 | +# interval = 300 |
399 | +# concurrency = 1 |
400 | +# node_timeout = 10 |
401 | +# conn_timeout = 0.5 |
402 | +# slowdown will sleep that amount between janitor operations |
403 | +# slowdown = 0.01 |
404 | +# Number of seconds before assuming a segmented put will never succeed and |
405 | +# therefore cleaning up any orphaned segments from the operation. |
406 | +# segment_reclaim_age = 604800 |
407 | +# Number of segments to remove per pass when cleaning up after superseded or |
408 | +# orphaned segmented put operations. |
409 | +# segments_per_pass = 10 |
410 | + |
411 | [object-updater] |
412 | # log_name = object-updater |
413 | # interval = 300 |
414 | |
415 | === modified file 'etc/proxy-server.conf-sample' |
416 | --- etc/proxy-server.conf-sample 2010-11-03 20:17:27 +0000 |
417 | +++ etc/proxy-server.conf-sample 2010-11-08 18:51:48 +0000 |
418 | @@ -17,6 +17,9 @@ |
419 | # log_facility = LOG_LOCAL0 |
420 | # log_level = INFO |
421 | # log_headers = False |
422 | +# max_object_size = 107374182400 |
423 | +# Files will be split into segments no larger than segment_size |
424 | +# segment_size = 2147483647 |
425 | # recheck_account_existence = 60 |
426 | # recheck_container_existence = 60 |
427 | # object_chunk_size = 8192 |
428 | |
429 | === modified file 'setup.py' |
430 | --- setup.py 2010-11-03 19:50:35 +0000 |
431 | +++ setup.py 2010-11-08 18:51:48 +0000 |
432 | @@ -21,6 +21,7 @@ |
433 | |
434 | from swift import __version__ as version |
435 | |
436 | + |
437 | class local_sdist(sdist): |
438 | """Customized sdist hook - builds the ChangeLog file from VC first""" |
439 | |
440 | @@ -57,29 +58,21 @@ |
441 | 'Environment :: No Input/Output (Daemon)', |
442 | ], |
443 | install_requires=[], # removed for better compat |
444 | - scripts=[ |
445 | - 'bin/st', 'bin/swift-account-auditor', |
446 | - 'bin/swift-account-audit', 'bin/swift-account-reaper', |
447 | - 'bin/swift-account-replicator', 'bin/swift-account-server', |
448 | - 'bin/swift-auth-add-user', |
449 | - 'bin/swift-auth-recreate-accounts', 'bin/swift-auth-server', |
450 | - 'bin/swift-auth-update-reseller-prefixes', |
451 | - 'bin/swift-container-auditor', |
452 | - 'bin/swift-container-replicator', |
453 | - 'bin/swift-container-server', 'bin/swift-container-updater', |
454 | - 'bin/swift-drive-audit', 'bin/swift-get-nodes', |
455 | - 'bin/swift-init', 'bin/swift-object-auditor', |
456 | - 'bin/swift-object-info', |
457 | - 'bin/swift-object-replicator', |
458 | - 'bin/swift-object-server', |
459 | - 'bin/swift-object-updater', 'bin/swift-proxy-server', |
460 | - 'bin/swift-ring-builder', 'bin/swift-stats-populate', |
461 | - 'bin/swift-stats-report', |
462 | - 'bin/swift-bench', |
463 | - 'bin/swift-log-uploader', |
464 | - 'bin/swift-log-stats-collector', |
465 | - 'bin/swift-account-stats-logger', |
466 | - ], |
467 | + scripts=['bin/st', 'bin/swift-account-audit', 'bin/swift-account-auditor', |
468 | + 'bin/swift-account-reaper', 'bin/swift-account-replicator', |
469 | + 'bin/swift-account-server', 'bin/swift-account-stats-logger', |
470 | + 'bin/swift-auth-add-user', 'bin/swift-auth-recreate-accounts', |
471 | + 'bin/swift-auth-server', 'bin/swift-auth-update-reseller-prefixes', |
472 | + 'bin/swift-bench', 'bin/swift-container-auditor', |
473 | + 'bin/swift-container-replicator', 'bin/swift-container-server', |
474 | + 'bin/swift-container-updater', 'bin/swift-drive-audit', |
475 | + 'bin/swift-get-nodes', 'bin/swift-init', |
476 | + 'bin/swift-log-stats-collector', 'bin/swift-log-uploader', |
477 | + 'bin/swift-object-auditor', 'bin/swift-object-info', |
478 | + 'bin/swift-object-janitor', 'bin/swift-object-replicator', |
479 | + 'bin/swift-object-server', 'bin/swift-object-updater', |
480 | + 'bin/swift-proxy-server', 'bin/swift-ring-builder', |
481 | + 'bin/swift-stats-populate', 'bin/swift-stats-report'], |
482 | entry_points={ |
483 | 'paste.app_factory': [ |
484 | 'proxy=swift.proxy.server:app_factory', |
485 | |
486 | === modified file 'swift/common/constraints.py' |
487 | --- swift/common/constraints.py 2010-10-26 15:13:14 +0000 |
488 | +++ swift/common/constraints.py 2010-11-08 18:51:48 +0000 |
489 | @@ -19,8 +19,6 @@ |
490 | HTTPRequestEntityTooLarge |
491 | |
492 | |
493 | -#: Max file size allowed for objects |
494 | -MAX_FILE_SIZE = 5 * 1024 * 1024 * 1024 + 2 |
495 | #: Max length of the name of a key for metadata |
496 | MAX_META_NAME_LENGTH = 128 |
497 | #: Max length of the value of a key for metadata |
498 | @@ -29,14 +27,18 @@ |
499 | MAX_META_COUNT = 90 |
500 | #: Max overall size of metadata |
501 | MAX_META_OVERALL_SIZE = 4096 |
502 | +#: Max account name length |
503 | +MAX_ACCOUNT_NAME_LENGTH = 256 |
504 | +#: Max container name length |
505 | +MAX_CONTAINER_NAME_LENGTH = 256 |
506 | #: Max object name length |
507 | MAX_OBJECT_NAME_LENGTH = 1024 |
508 | #: Max object list length of a get request for a container |
509 | CONTAINER_LISTING_LIMIT = 10000 |
510 | #: Max container list length of a get request for an account |
511 | ACCOUNT_LISTING_LIMIT = 10000 |
512 | -MAX_ACCOUNT_NAME_LENGTH = 256 |
513 | -MAX_CONTAINER_NAME_LENGTH = 256 |
514 | +#: Default pickle protocol number used for Swift pickles |
515 | +PICKLE_PROTOCOL = 2 |
516 | |
517 | |
518 | def check_metadata(req, target_type): |
519 | @@ -82,19 +84,22 @@ |
520 | return None |
521 | |
522 | |
523 | -def check_object_creation(req, object_name): |
524 | +def check_object_creation(req, object_name, max_object_size=0): |
525 | """ |
526 | Check to ensure that everything is alright about an object to be created. |
527 | |
528 | :param req: HTTP request object |
529 | :param object_name: name of object to be created |
530 | + :param max_object_size: the maximum object size to check against; 0 if no |
531 | + object size checking should be done |
532 | :raises HTTPRequestEntityTooLarge: the object is too large |
533 | :raises HTTPLengthRequired: missing content-length header and not |
534 | a chunked request |
535 | :raises HTTPBadRequest: missing or bad content-type header, or |
536 | bad metadata |
537 | """ |
538 | - if req.content_length and req.content_length > MAX_FILE_SIZE: |
539 | + if max_object_size > 0 and req.content_length and \ |
540 | + req.content_length > max_object_size: |
541 | return HTTPRequestEntityTooLarge(body='Your request is too large.', |
542 | request=req, content_type='text/plain') |
543 | if req.content_length is None and \ |
544 | |
545 | === modified file 'swift/common/db.py' |
546 | --- swift/common/db.py 2010-08-16 22:30:27 +0000 |
547 | +++ swift/common/db.py 2010-11-08 18:51:48 +0000 |
548 | @@ -32,6 +32,7 @@ |
549 | import simplejson as json |
550 | import sqlite3 |
551 | |
552 | +from swift.common.constraints import PICKLE_PROTOCOL |
553 | from swift.common.utils import normalize_timestamp, renamer, \ |
554 | mkdirs, lock_parent_directory, fallocate |
555 | from swift.common.exceptions import LockTimeout |
556 | @@ -39,8 +40,6 @@ |
557 | |
558 | #: Timeout for trying to connect to a DB |
559 | BROKER_TIMEOUT = 25 |
560 | -#: Pickle protocol to use |
561 | -PICKLE_PROTOCOL = 2 |
562 | #: Max number of pending entries |
563 | PENDING_CAP = 131072 |
564 | |
565 | |
566 | === modified file 'swift/obj/auditor.py' |
567 | --- swift/obj/auditor.py 2010-10-21 18:32:10 +0000 |
568 | +++ swift/obj/auditor.py 2010-11-08 18:51:48 +0000 |
569 | @@ -18,8 +18,8 @@ |
570 | from hashlib import md5 |
571 | from random import random |
572 | |
573 | -from swift.obj import server as object_server |
574 | -from swift.obj.replicator import invalidate_hash |
575 | +from swift.obj.diskfile import DATADIR, DiskFile, invalidate_hash, \ |
576 | + read_metadata |
577 | from swift.common.utils import get_logger, renamer, audit_location_generator |
578 | from swift.common.exceptions import AuditException |
579 | from swift.common.daemon import Daemon |
580 | @@ -45,10 +45,8 @@ |
581 | time.sleep(random() * self.interval) |
582 | while True: |
583 | begin = time.time() |
584 | - all_locs = audit_location_generator(self.devices, |
585 | - object_server.DATADIR, |
586 | - mount_check=self.mount_check, |
587 | - logger=self.logger) |
588 | + all_locs = audit_location_generator(self.devices, DATADIR, |
589 | + mount_check=self.mount_check, logger=self.logger) |
590 | for path, device, partition in all_locs: |
591 | self.object_audit(path, device, partition) |
592 | if time.time() - reported >= 3600: # once an hour |
593 | @@ -68,10 +66,8 @@ |
594 | """Run the object audit once.""" |
595 | self.logger.info('Begin object audit "once" mode') |
596 | begin = reported = time.time() |
597 | - all_locs = audit_location_generator(self.devices, |
598 | - object_server.DATADIR, |
599 | - mount_check=self.mount_check, |
600 | - logger=self.logger) |
601 | + all_locs = audit_location_generator(self.devices, DATADIR, |
602 | + mount_check=self.mount_check, logger=self.logger) |
603 | for path, device, partition in all_locs: |
604 | self.object_audit(path, device, partition) |
605 | if time.time() - reported >= 3600: # once an hour |
606 | @@ -99,14 +95,12 @@ |
607 | if not path.endswith('.data'): |
608 | return |
609 | try: |
610 | - name = object_server.read_metadata(path)['name'] |
611 | + name = read_metadata(path)['name'] |
612 | except Exception, exc: |
613 | raise AuditException('Error when reading metadata: %s' % exc) |
614 | _, account, container, obj = name.split('/', 3) |
615 | - df = object_server.DiskFile(self.devices, device, |
616 | - partition, account, |
617 | - container, obj, |
618 | - keep_data_fp=True) |
619 | + df = DiskFile(self.devices, device, partition, account, container, |
620 | + obj, keep_data_fp=True) |
621 | if df.data_file is None: |
622 | # file is deleted, we found the tombstone |
623 | return |
624 | |
625 | === added file 'swift/obj/diskfile.py' |
626 | --- swift/obj/diskfile.py 1970-01-01 00:00:00 +0000 |
627 | +++ swift/obj/diskfile.py 2010-11-08 18:51:48 +0000 |
628 | @@ -0,0 +1,507 @@ |
629 | +# Copyright (c) 2010 OpenStack, LLC. |
630 | +# |
631 | +# Licensed under the Apache License, Version 2.0 (the "License"); |
632 | +# you may not use this file except in compliance with the License. |
633 | +# You may obtain a copy of the License at |
634 | +# |
635 | +# http://www.apache.org/licenses/LICENSE-2.0 |
636 | +# |
637 | +# Unless required by applicable law or agreed to in writing, software |
638 | +# distributed under the License is distributed on an "AS IS" BASIS, |
639 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
640 | +# implied. |
641 | +# See the License for the specific language governing permissions and |
642 | +# limitations under the License. |
643 | + |
644 | +from __future__ import with_statement |
645 | +import cPickle as pickle |
646 | +import errno |
647 | +import hashlib |
648 | +import os |
649 | +from contextlib import contextmanager |
650 | +from os.path import basename, dirname, isdir, join, splitext |
651 | +from tempfile import mkstemp |
652 | +from time import time |
653 | + |
654 | +from eventlet import tpool, sleep |
655 | +from xattr import getxattr, setxattr |
656 | + |
657 | +from swift.common.constraints import PICKLE_PROTOCOL |
658 | +from swift.common.utils import drop_buffer_cache, hash_path, lock_path, \ |
659 | + mkdirs, normalize_timestamp, renamer, split_path, storage_directory |
660 | + |
661 | + |
662 | +DATADIR = 'objects' |
663 | +SEGMENTSDIR = 'object_segments' |
664 | +JANITORDIR = 'object_janitor' |
665 | +HASH_FILE = 'hashes.pkl' |
666 | +METADATA_KEY = 'user.swift.metadata' |
667 | +ONE_WEEK = 604800 |
668 | + |
669 | + |
670 | +def read_metadata(fd): |
671 | + """ |
672 | + Helper function to read the pickled metadata from an object file. |
673 | + |
674 | + :param fd: file descriptor to load the metadata from |
675 | + |
676 | + :returns: dictionary of metadata |
677 | + """ |
678 | + metadata = '' |
679 | + key = 0 |
680 | + try: |
681 | + while True: |
682 | + metadata += getxattr(fd, '%s%s' % (METADATA_KEY, (key or ''))) |
683 | + key += 1 |
684 | + except IOError: |
685 | + pass |
686 | + return pickle.loads(metadata) |
687 | + |
688 | + |
689 | +def hash_suffix(path, reclaim_age): |
690 | + """ |
691 | + Performs reclamation and returns an md5 of all (remaining) files. |
692 | + |
693 | + :param reclaim_age: age in seconds at which to remove tombstones |
694 | + """ |
695 | + md5 = hashlib.md5() |
696 | + for hsh in sorted(os.listdir(path)): |
697 | + hsh_path = join(path, hsh) |
698 | + files = os.listdir(hsh_path) |
699 | + if len(files) == 1: |
700 | + if files[0].endswith('.ts'): |
701 | + # remove tombstones older than reclaim_age |
702 | + ts = files[0].rsplit('.', 1)[0] |
703 | + if (time() - float(ts)) > reclaim_age: |
704 | + os.unlink(join(hsh_path, files[0])) |
705 | + files.remove(files[0]) |
706 | + elif files: |
707 | + files.sort(reverse=True) |
708 | + meta = data = tomb = None |
709 | + for filename in files: |
710 | + if not meta and filename.endswith('.meta'): |
711 | + meta = filename |
712 | + if not data and filename.endswith('.data'): |
713 | + data = filename |
714 | + if not tomb and filename.endswith('.ts'): |
715 | + tomb = filename |
716 | + if (filename < tomb or # any file older than tomb |
717 | + filename < data or # any file older than data |
718 | + (filename.endswith('.meta') and |
719 | + filename < meta)): # old meta |
720 | + if filename.endswith('.data'): |
721 | + fp = open(join(hsh_path, filename), 'rb') |
722 | + metadata = read_metadata(fp) |
723 | + if metadata.get('X-Object-Type') == 'manifest': |
724 | + manifest = pickle.load(fp) |
725 | + partition = dirname(path) |
726 | + device = dirname(dirname(partition)) |
727 | + devices = dirname(device) |
728 | + partition = basename(partition) |
729 | + device = basename(device) |
730 | + account, container, obj = \ |
731 | + split_path(metadata['name'], 3, |
732 | + rest_with_last=True) |
733 | + df = DiskFile(devices, device, partition, |
734 | + 'Segment-Cleanup', manifest['x-timestamp'], |
735 | + '%s/%s/%s' % (account, container, obj), |
736 | + datadir=JANITORDIR) |
737 | + df.store_janitor_segment_cleanup(account, |
738 | + container, obj, |
739 | + segment_count=(manifest['content-length'] / |
740 | + manifest['x-segment-size'] + 1), |
741 | + segment_last_deleted=None) |
742 | + fp.close() |
743 | + os.unlink(join(hsh_path, filename)) |
744 | + files.remove(filename) |
745 | + if not files: |
746 | + os.rmdir(hsh_path) |
747 | + for filename in files: |
748 | + md5.update(filename) |
749 | + try: |
750 | + os.rmdir(path) |
751 | + except OSError: |
752 | + pass |
753 | + return md5.hexdigest() |
754 | + |
755 | + |
756 | +def recalculate_hashes(partition_dir, suffixes, reclaim_age=ONE_WEEK): |
757 | + """ |
758 | + Recalculates hashes for the given suffixes in the partition and updates |
759 | + them in the partition's hashes file. |
760 | + |
761 | + :param partition_dir: directory of the partition in which to recalculate |
762 | + :param suffixes: list of suffixes to recalculate |
763 | + :param reclaim_age: age in seconds at which tombstones should be removed |
764 | + """ |
765 | + |
766 | + def tpool_listdir(partition_dir): |
767 | + return dict(((suff, None) for suff in os.listdir(partition_dir) |
768 | + if len(suff) == 3 and isdir(join(partition_dir, suff)))) |
769 | + hashes_file = join(partition_dir, HASH_FILE) |
770 | + with lock_path(partition_dir): |
771 | + try: |
772 | + with open(hashes_file, 'rb') as fp: |
773 | + hashes = pickle.load(fp) |
774 | + except Exception: |
775 | + hashes = tpool.execute(tpool_listdir, partition_dir) |
776 | + for suffix in suffixes: |
777 | + suffix_dir = join(partition_dir, suffix) |
778 | + if os.path.exists(suffix_dir): |
779 | + hashes[suffix] = hash_suffix(suffix_dir, reclaim_age) |
780 | + elif suffix in hashes: |
781 | + del hashes[suffix] |
782 | + with open(hashes_file + '.tmp', 'wb') as fp: |
783 | + pickle.dump(hashes, fp, PICKLE_PROTOCOL) |
784 | + renamer(hashes_file + '.tmp', hashes_file) |
785 | + |
786 | + |
787 | +def invalidate_hash(suffix_dir): |
788 | + """ |
789 | + Invalidates the hash for a suffix_dir in the partition's hashes file. |
790 | + |
791 | + :param suffix_dir: absolute path to suffix dir whose hash needs |
792 | + invalidating |
793 | + """ |
794 | + |
795 | + suffix = os.path.basename(suffix_dir) |
796 | + partition_dir = os.path.dirname(suffix_dir) |
797 | + hashes_file = join(partition_dir, HASH_FILE) |
798 | + with lock_path(partition_dir): |
799 | + try: |
800 | + with open(hashes_file, 'rb') as fp: |
801 | + hashes = pickle.load(fp) |
802 | + if suffix in hashes and not hashes[suffix]: |
803 | + return |
804 | + except Exception: |
805 | + return |
806 | + hashes[suffix] = None |
807 | + with open(hashes_file + '.tmp', 'wb') as fp: |
808 | + pickle.dump(hashes, fp, PICKLE_PROTOCOL) |
809 | + renamer(hashes_file + '.tmp', hashes_file) |
810 | + |
811 | + |
812 | +def get_hashes(partition_dir, do_listdir=True, reclaim_age=ONE_WEEK): |
813 | + """ |
814 | + Get a list of hashes for the suffix dir. do_listdir causes it to mistrust |
815 | + the hash cache for suffix existence at the (unexpectedly high) cost of a |
816 | + listdir. reclaim_age is just passed on to hash_suffix. |
817 | + |
818 | + :param partition_dir: absolute path of partition to get hashes for |
819 | + :param do_listdir: force existence check for all hashes in the partition |
820 | + :param reclaim_age: age at which to remove tombstones |
821 | + |
822 | + :returns: tuple of (number of suffix dirs hashed, dictionary of hashes) |
823 | + """ |
824 | + |
825 | + def tpool_listdir(hashes, partition_dir): |
826 | + return dict(((suff, hashes.get(suff, None)) |
827 | + for suff in os.listdir(partition_dir) |
828 | + if len(suff) == 3 and isdir(join(partition_dir, suff)))) |
829 | + hashed = 0 |
830 | + hashes_file = join(partition_dir, HASH_FILE) |
831 | + with lock_path(partition_dir): |
832 | + modified = False |
833 | + hashes = {} |
834 | + try: |
835 | + with open(hashes_file, 'rb') as fp: |
836 | + hashes = pickle.load(fp) |
837 | + except Exception: |
838 | + do_listdir = True |
839 | + if do_listdir: |
840 | + hashes = tpool.execute(tpool_listdir, hashes, partition_dir) |
841 | + modified = True |
842 | + for suffix, hash_ in hashes.items(): |
843 | + if not hash_: |
844 | + suffix_dir = join(partition_dir, suffix) |
845 | + if os.path.exists(suffix_dir): |
846 | + try: |
847 | + hashes[suffix] = hash_suffix(suffix_dir, reclaim_age) |
848 | + hashed += 1 |
849 | + except OSError: |
850 | + logging.exception('Error hashing suffix') |
851 | + hashes[suffix] = None |
852 | + else: |
853 | + del hashes[suffix] |
854 | + modified = True |
855 | + sleep() |
856 | + if modified: |
857 | + with open(hashes_file + '.tmp', 'wb') as fp: |
858 | + pickle.dump(hashes, fp, PICKLE_PROTOCOL) |
859 | + renamer(hashes_file + '.tmp', hashes_file) |
860 | + return hashed, hashes |
861 | + |
862 | + |
863 | +class DiskFile(object): |
864 | + """ |
865 | + Manage files on disk for a single object. |
866 | + |
867 | + :param path: path to devices on the node |
868 | + :param device: device name |
869 | + :param partition: partition on the device the object lives in |
870 | + :param account: account name for the object |
871 | + :param container: container name for the object |
872 | + :param obj: object name for the object |
873 | + :param keep_data_fp: if True, don't close the fp, otherwise close it |
874 | + :param disk_chunk_size: size of chunks on file reads |
875 | + :param datadir: The directory name for the root of the data |
876 | + structure (default: DATADIR) |
877 | + :param segment: If not None, indicates which segment of an object |
878 | + this file represents |
879 | + :param segment_timestamp: X-Timestamp of the object's segments (set on the |
880 | + PUT, not changed on POSTs), required if segment |
881 | + is not None |
882 | + """ |
883 | + |
884 | + def __init__(self, path, device, partition, account, container, obj, |
885 | + keep_data_fp=False, disk_chunk_size=65536, datadir=DATADIR, |
886 | + segment=None, segment_timestamp=None): |
887 | + self.account = account |
888 | + self.container = container |
889 | + self.obj = obj |
890 | + self.fp = None |
891 | + self.disk_chunk_size = disk_chunk_size |
892 | + self.name = '/' + '/'.join((account, container, obj)) |
893 | + if segment is not None: |
894 | + segment_name = '%s/%s/%s' % (obj, segment_timestamp, segment) |
895 | + segment_hash = hash_path(account, container, segment_name) |
896 | + self.datadir = os.path.join(path, device, |
897 | + storage_directory(SEGMENTSDIR, partition, segment_hash)) |
898 | + name_hash = hash_path(account, container, obj) |
899 | + self.no_longer_segment_datadir = os.path.join(path, device, |
900 | + storage_directory(datadir, partition, name_hash)) |
901 | + else: |
902 | + name_hash = hash_path(account, container, obj) |
903 | + self.datadir = self.no_longer_segment_datadir = os.path.join(path, |
904 | + device, storage_directory(datadir, partition, name_hash)) |
905 | + self.tmpdir = os.path.join(path, device, 'tmp') |
906 | + self.metadata = {} |
907 | + self.meta_file = None |
908 | + self.data_file = None |
909 | + if not os.path.exists(self.datadir): |
910 | + return |
911 | + files = sorted(os.listdir(self.datadir), reverse=True) |
912 | + for file in files: |
913 | + if file.endswith('.ts'): |
914 | + self.data_file = self.meta_file = None |
915 | + self.metadata = {'deleted': True} |
916 | + return |
917 | + if file.endswith('.meta') and not self.meta_file: |
918 | + self.meta_file = os.path.join(self.datadir, file) |
919 | + if file.endswith('.data') and not self.data_file: |
920 | + self.data_file = os.path.join(self.datadir, file) |
921 | + break |
922 | + if not self.data_file: |
923 | + return |
924 | + self.fp = open(self.data_file, 'rb') |
925 | + self.metadata = read_metadata(self.fp) |
926 | + if not keep_data_fp: |
927 | + self.close() |
928 | + if self.meta_file: |
929 | + with open(self.meta_file) as mfp: |
930 | + for key in self.metadata.keys(): |
931 | + if key.lower() not in ('content-type', 'content-encoding', |
932 | + 'deleted', 'content-length', 'etag'): |
933 | + del self.metadata[key] |
934 | + self.metadata.update(read_metadata(mfp)) |
935 | + |
936 | + def __iter__(self): |
937 | + """Returns an iterator over the data file.""" |
938 | + try: |
939 | + dropped_cache = 0 |
940 | + read = 0 |
941 | + while True: |
942 | + chunk = self.fp.read(self.disk_chunk_size) |
943 | + if chunk: |
944 | + read += len(chunk) |
945 | + if read - dropped_cache > (1024 * 1024): |
946 | + drop_buffer_cache(self.fp.fileno(), dropped_cache, |
947 | + read - dropped_cache) |
948 | + dropped_cache = read |
949 | + yield chunk |
950 | + else: |
951 | + drop_buffer_cache(self.fp.fileno(), dropped_cache, |
952 | + read - dropped_cache) |
953 | + break |
954 | + finally: |
955 | + self.close() |
956 | + |
957 | + def app_iter_range(self, start, stop): |
958 | + """Returns an iterator over the data file for range (start, stop)""" |
959 | + if start: |
960 | + self.fp.seek(start) |
961 | + if stop is not None: |
962 | + length = stop - start |
963 | + else: |
964 | + length = None |
965 | + for chunk in self: |
966 | + if length is not None: |
967 | + length -= len(chunk) |
968 | + if length < 0: |
969 | + # Chop off the extra: |
970 | + yield chunk[:length] |
971 | + break |
972 | + yield chunk |
973 | + |
974 | + def close(self): |
975 | + """Close the file.""" |
976 | + if self.fp: |
977 | + self.fp.close() |
978 | + self.fp = None |
979 | + |
980 | + def is_deleted(self): |
981 | + """ |
982 | + Check if the file is deleted. |
983 | + |
984 | + :returns: True if the file doesn't exist or has been flagged as |
985 | + deleted. |
986 | + """ |
987 | + return not self.data_file or 'deleted' in self.metadata |
988 | + |
989 | + @contextmanager |
990 | + def mkstemp(self): |
991 | + """Context manager to make a temporary file.""" |
992 | + if not os.path.exists(self.tmpdir): |
993 | + mkdirs(self.tmpdir) |
994 | + fd, tmppath = mkstemp(dir=self.tmpdir) |
995 | + try: |
996 | + yield fd, tmppath |
997 | + finally: |
998 | + try: |
999 | + os.close(fd) |
1000 | + except OSError: |
1001 | + pass |
1002 | + try: |
1003 | + os.unlink(tmppath) |
1004 | + except OSError: |
1005 | + pass |
1006 | + |
1007 | + def put(self, fd, tmppath, metadata, extension='.data', |
1008 | + no_longer_segment=False): |
1009 | + """ |
1010 | + Finalize writing the file on disk, and renames it from the temp file to |
1011 | + the real location. This should be called after the data has been |
1012 | + written to the temp file. |
1013 | + |
1014 | + :param fd: file descriptor of the temp file |
1015 | + :param tmppath: path to the temporary file being used |
1016 | + :param metadata: dictionary of metadata to be written |
1017 | + :param extension: extension to be used when making the file |
1018 | + :param no_longer_segment: Set to True if this was originally an object |
1019 | + segment but no longer is (case with chunked transfer encoding when |
1020 | + the object ends up less than the segment size) |
1021 | + """ |
1022 | + metadata['name'] = self.name |
1023 | + timestamp = normalize_timestamp(metadata['X-Timestamp']) |
1024 | + metastr = pickle.dumps(metadata, PICKLE_PROTOCOL) |
1025 | + key = 0 |
1026 | + while metastr: |
1027 | + setxattr(fd, '%s%s' % (METADATA_KEY, key or ''), metastr[:254]) |
1028 | + metastr = metastr[254:] |
1029 | + key += 1 |
1030 | + if 'Content-Length' in metadata: |
1031 | + drop_buffer_cache(fd, 0, int(metadata['Content-Length'])) |
1032 | + os.fsync(fd) |
1033 | + if no_longer_segment: |
1034 | + self.datadir = self.no_longer_segment_datadir |
1035 | + invalidate_hash(os.path.dirname(self.datadir)) |
1036 | + renamer(tmppath, os.path.join(self.datadir, timestamp + extension)) |
1037 | + self.metadata = metadata |
1038 | + |
1039 | + def unlinkold(self, timestamp): |
1040 | + """ |
1041 | + Remove any older versions of the object file. Any file that has an |
1042 | + older timestamp than timestamp will be deleted. |
1043 | + |
1044 | + :param timestamp: timestamp to compare with each file |
1045 | + """ |
1046 | + timestamp = normalize_timestamp(timestamp) |
1047 | + for fname in os.listdir(self.datadir): |
1048 | + if fname < timestamp: |
1049 | + try: |
1050 | + os.unlink(os.path.join(self.datadir, fname)) |
1051 | + except OSError, err: # pragma: no cover |
1052 | + if err.errno != errno.ENOENT: |
1053 | + raise |
1054 | + |
1055 | + def tombstone(self, timestamp): |
1056 | + """ |
1057 | + Creates a tombstone for the DiskFile, indicating any versions older |
1058 | + than `timestamp` should be removed. |
1059 | + |
1060 | + :param timestamp: normalized timestamp of the tombstone |
1061 | + """ |
1062 | + with self.mkstemp() as (fd, tmppath): |
1063 | + self.put(fd, tmppath, {'X-Timestamp': timestamp, 'deleted': True}, |
1064 | + extension='.ts') |
1065 | + self.unlinkold(timestamp) |
1066 | + |
1067 | + def store_janitor_container_update(self, op, account, container, obj, |
1068 | + headers, successes): |
1069 | + """ |
1070 | + Creates a .data file whose contents contain an operation for the |
1071 | + object-janitor to send object metadata to the container servers. |
1072 | + |
1073 | + :param op: The operation to send to the container server (usually PUT |
1074 | + or DELETE). |
1075 | + :param account: The account name for the object. |
1076 | + :param container: The container name for the object. |
1077 | + :param obj: The object name for the object. |
1078 | + :param headers: The headers to include in the requests to the container |
1079 | + servers. Should at least contain X-Timestamp indicating |
1080 | + the version of the object metadata. |
1081 | + :param successes: An array of container node ids that have already |
1082 | + received the object metadata update. |
1083 | + """ |
1084 | + timestamp = normalize_timestamp(time()) |
1085 | + with self.mkstemp() as (fd, tmppath): |
1086 | + os.write(fd, pickle.dumps({'op': op, 'account': account, |
1087 | + 'container': container, 'obj': obj, 'headers': headers, |
1088 | + 'successes': successes}, PICKLE_PROTOCOL)) |
1089 | + self.put(fd, tmppath, |
1090 | + {'X-Op': 'Container-Update', 'X-Timestamp': timestamp}) |
1091 | + self.unlinkold(timestamp) |
1092 | + |
1093 | + def store_janitor_segment_cleanup(self, account, container, obj, |
1094 | + segment_count, segment_last_deleted): |
1095 | + """ |
1096 | + Creates a .data file whose contents contain an operation for the |
1097 | + object-janitor to clean up object segments for an object. |
1098 | + |
1099 | + Note that the DiskFile created for this operation is a bit different |
1100 | + than most DiskFiles. The DiskFile account name should be |
1101 | + 'Segment-Cleanup', the container name should be the segments' |
1102 | + timestamp, and the object name should be the full account/container/obj |
1103 | + path (no leading /). However, the account, container, obj given in this |
1104 | + `store_janitor_segment_cleanup` call should be the usual as related to |
1105 | + the actual object. This complexity is so that each set of segments are |
1106 | + treated independently. |
1107 | + |
1108 | + For example:: |
1109 | + |
1110 | + df = DiskFile(devices, device, partition, |
1111 | + 'Segment-Cleanup', manifest['x-timestamp'], |
1112 | + '%s/%s/%s' % (account, container, obj), datadir=JANITORDIR) |
1113 | + df.store_janitor_segment_cleanup(account, container, obj, None, |
1114 | + None) |
1115 | + |
1116 | + :param account: The account name for the object. |
1117 | + :param container: The container name for the object. |
1118 | + :param obj: The object name for the object. |
1119 | + :param segment_count: The number of segments the object has or None if |
1120 | + the number is not known. |
1121 | + :param segment_last_deleted: The segment that was last deleted so the |
1122 | + next pass of this operation can continue |
1123 | + where it left off. Set to None if no |
1124 | + segments have been deleted yet. |
1125 | + """ |
1126 | + timestamp = normalize_timestamp(time()) |
1127 | + with self.mkstemp() as (fd, tmppath): |
1128 | + os.write(fd, pickle.dumps({'account': account, |
1129 | + 'container': container, 'obj': obj, |
1130 | + 'segment_count': segment_count, |
1131 | + 'segment_last_deleted': segment_last_deleted}, |
1132 | + PICKLE_PROTOCOL)) |
1133 | + self.put(fd, tmppath, |
1134 | + {'X-Op': 'Segment-Cleanup', 'X-Timestamp': timestamp}) |
1135 | + self.unlinkold(timestamp) |
1136 | |
1137 | === added file 'swift/obj/janitor.py' |
1138 | --- swift/obj/janitor.py 1970-01-01 00:00:00 +0000 |
1139 | +++ swift/obj/janitor.py 2010-11-08 18:51:48 +0000 |
1140 | @@ -0,0 +1,516 @@ |
1141 | +# Copyright (c) 2010 OpenStack, LLC. |
1142 | +# |
1143 | +# Licensed under the Apache License, Version 2.0 (the "License"); |
1144 | +# you may not use this file except in compliance with the License. |
1145 | +# You may obtain a copy of the License at |
1146 | +# |
1147 | +# http://www.apache.org/licenses/LICENSE-2.0 |
1148 | +# |
1149 | +# Unless required by applicable law or agreed to in writing, software |
1150 | +# distributed under the License is distributed on an "AS IS" BASIS, |
1151 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
1152 | +# implied. |
1153 | +# See the License for the specific language governing permissions and |
1154 | +# limitations under the License. |
1155 | + |
1156 | +from __future__ import with_statement |
1157 | +import cPickle as pickle |
1158 | +import os |
1159 | +import signal |
1160 | +import sys |
1161 | +from random import random |
1162 | +from time import sleep, time |
1163 | + |
1164 | +from eventlet import patcher, Timeout |
1165 | + |
1166 | +from swift.common.bufferedhttp import http_connect |
1167 | +from swift.common.exceptions import ConnectionTimeout |
1168 | +from swift.common.ring import Ring |
1169 | +from swift.common.utils import get_logger, normalize_timestamp, whataremyips |
1170 | +from swift.common.daemon import Daemon |
1171 | +from swift.obj.diskfile import DiskFile, JANITORDIR, read_metadata |
1172 | + |
1173 | + |
1174 | +class ObjectJanitor(Daemon): |
1175 | + """ |
1176 | + Run background operations for the object server, such as updating container |
1177 | + servers with new object metadata and cleaning up discarded segmented |
1178 | + objects. |
1179 | + """ |
1180 | + |
1181 | + def __init__(self, conf): |
1182 | + self.conf = conf |
1183 | + self.logger = get_logger(conf, 'object-janitor') |
1184 | + self.devices = conf.get('devices', '/srv/node') |
1185 | + self.port = int(conf.get('bind_port', 6000)) |
1186 | + self.my_node_ids = [] |
1187 | + self.mount_check = conf.get('mount_check', 'true').lower() in \ |
1188 | + ('true', 't', '1', 'on', 'yes', 'y') |
1189 | + swift_dir = conf.get('swift_dir', '/etc/swift') |
1190 | + self.interval = int(conf.get('interval', 300)) |
1191 | + self.object_ring_path = os.path.join(swift_dir, 'object.ring.gz') |
1192 | + self.object_ring = None |
1193 | + self.container_ring_path = os.path.join(swift_dir, 'container.ring.gz') |
1194 | + self.container_ring = None |
1195 | + self.concurrency = int(conf.get('concurrency', 1)) |
1196 | + self.slowdown = float(conf.get('slowdown', 0.01)) |
1197 | + self.node_timeout = int(conf.get('node_timeout', 10)) |
1198 | + self.conn_timeout = float(conf.get('conn_timeout', 0.5)) |
1199 | + self.segment_reclaim_age = int(conf.get('segment_reclaim_age', 604800)) |
1200 | + self.segments_per_pass = int(conf.get('segments_per_pass', 10)) |
1201 | + self.container_update_successes = 0 |
1202 | + self.container_update_failures = 0 |
1203 | + self.segment_cleanup_completions = 0 |
1204 | + self.segment_cleanup_segments = 0 |
1205 | + self.segment_cleanup_failures = 0 |
1206 | + |
1207 | + def get_object_ring(self): |
1208 | + """Get the object ring. Load it, if it hasn't been yet.""" |
1209 | + if not self.object_ring: |
1210 | + self.logger.debug( |
1211 | + 'Loading object ring from %s' % self.object_ring_path) |
1212 | + self.object_ring = Ring(self.object_ring_path) |
1213 | + return self.object_ring |
1214 | + |
1215 | + def get_container_ring(self): |
1216 | + """Get the container ring. Load it, if it hasn't been yet.""" |
1217 | + if not self.container_ring: |
1218 | + self.logger.debug( |
1219 | + 'Loading container ring from %s' % self.container_ring_path) |
1220 | + self.container_ring = Ring(self.container_ring_path) |
1221 | + return self.container_ring |
1222 | + |
1223 | + def run_forever(self): |
1224 | + """Run the janitor continuously.""" |
1225 | + sleep(random() * self.interval) |
1226 | + while True: |
1227 | + begin = time() |
1228 | + self.full_sweep() |
1229 | + elapsed = time() - begin |
1230 | + if elapsed < self.interval: |
1231 | + sleep(self.interval - elapsed) |
1232 | + |
1233 | + def run_once(self): |
1234 | + """Run the janitor once.""" |
1235 | + self.full_sweep(fork=False) |
1236 | + |
1237 | + def full_sweep(self, fork=True): |
1238 | + """ |
1239 | + Run a full sweep of the server for object janitor operations. |
1240 | + |
1241 | + :param fork: If True, subprocesses will be forked for each device up to |
1242 | + self.concurrency at any given time. |
1243 | + """ |
1244 | + self.logger.info('Begin object janitor sweep') |
1245 | + begin = time() |
1246 | + my_ips = whataremyips() |
1247 | + self.my_node_ids = [n['id'] for n in self.get_object_ring().devs |
1248 | + if n and n['ip'] in my_ips and n['port'] == self.port] |
1249 | + pids = [] |
1250 | + for device in os.listdir(self.devices): |
1251 | + if self.mount_check and not \ |
1252 | + os.path.ismount(os.path.join(self.devices, device)): |
1253 | + self.logger.warn('Skipping %s as it is not mounted' % device) |
1254 | + continue |
1255 | + if fork: |
1256 | + while len(pids) >= self.concurrency: |
1257 | + pids.remove(os.wait()[0]) |
1258 | + # read from rings to ensure they're fresh |
1259 | + self.get_object_ring().get_nodes('') |
1260 | + self.get_container_ring().get_nodes('') |
1261 | + if fork: |
1262 | + pid = os.fork() |
1263 | + if pid: |
1264 | + pids.append(pid) |
1265 | + else: |
1266 | + signal.signal(signal.SIGTERM, signal.SIG_DFL) |
1267 | + patcher.monkey_patch(all=False, socket=True) |
1268 | + self.object_sweep(os.path.join(self.devices, device)) |
1269 | + sys.exit() |
1270 | + else: |
1271 | + self.object_sweep(os.path.join(self.devices, device)) |
1272 | + if fork: |
1273 | + while pids: |
1274 | + pids.remove(os.wait()[0]) |
1275 | + elapsed = time() - begin |
1276 | + self.logger.info('Object janitor sweep completed: %.02fs' % elapsed) |
1277 | + |
1278 | + def object_sweep(self, device): |
1279 | + """ |
1280 | + If there are janitor pendings on the device, walk each one and process it.
1281 | + |
1282 | + :param device: path to device |
1283 | + """ |
1284 | + self.container_update_successes = 0 |
1285 | + self.container_update_failures = 0 |
1286 | + self.segment_cleanup_completions = 0 |
1287 | + self.segment_cleanup_segments = 0 |
1288 | + self.segment_cleanup_failures = 0 |
1289 | + begin = time() |
1290 | + janitordir = os.path.join(device, JANITORDIR) |
1291 | + try: |
1292 | + if not os.path.isdir(janitordir): |
1293 | + return |
1294 | + for partition in os.listdir(janitordir): |
1295 | + partition_path = os.path.join(janitordir, partition) |
1296 | + for suffix in os.listdir(partition_path): |
1297 | + suffix_path = os.path.join(partition_path, suffix) |
1298 | + for janitor in os.listdir(suffix_path): |
1299 | + janitor_path = os.path.join(suffix_path, janitor) |
1300 | + self.process_object_janitor(device, partition, |
1301 | + janitor_path) |
1302 | + sleep(self.slowdown) |
1303 | + finally: |
1304 | + elapsed = time() - begin |
1305 | + self.logger.info('Object janitor sweep of %s completed in %.02fs: ' |
1306 | + 'container updates: %s successes, %s failures; segment ' |
1307 | + 'cleanups: %s completions, %s segments, %s failures' % |
1308 | + (device, elapsed, |
1309 | + self.container_update_successes, |
1310 | + self.container_update_failures, |
1311 | + self.segment_cleanup_completions, |
1312 | + self.segment_cleanup_segments, |
1313 | + self.segment_cleanup_failures)) |
1314 | + |
1315 | + def process_object_janitor(self, device, partition, janitor_path): |
1316 | + """ |
1317 | + Process the object janitor operation. |
1318 | + |
1319 | + :param device: path to device |
1320 | + :param partition: partition for the object |
1321 | + :param janitor_path: path to DiskFile for the janitor operation |
1322 | + """ |
1323 | + data_file = None |
1324 | + files = sorted(os.listdir(janitor_path), reverse=True) |
1325 | + for file_ in files: |
1326 | + if file_.endswith('.ts'): |
1327 | + break |
1328 | + if file_.endswith('.data'): |
1329 | + data_file = os.path.join(janitor_path, file_) |
1330 | + break |
1331 | + if not data_file: |
1332 | + return |
1333 | + metadata = read_metadata(data_file) |
1334 | + _, account, container, obj = metadata['name'].split('/', 3) |
1335 | + if metadata['X-Op'] == 'Container-Update': |
1336 | + self.process_container_update(device, partition, account, |
1337 | + container, obj) |
1338 | + elif metadata['X-Op'] == 'Segment-Cleanup': |
1339 | + if self.get_object_ring().get_part_nodes(int(partition))[0]['id'] \ |
1340 | + in self.my_node_ids: |
1341 | + self.process_segment_cleanup(device, partition, account, |
1342 | + container, obj) |
1343 | + else: |
1344 | + self.logger.error('ERROR: Unknown X-Op: %s' % metadata['X-Op']) |
1345 | + return |
1346 | + |
1347 | + def process_container_update(self, device, partition, account, container, |
1348 | + obj): |
1349 | + """ |
1350 | + Process the container update operation, sending the object information |
1351 | + to the container server. |
1352 | + |
1353 | + :param device: path to device |
1354 | + :param partition: partition for the object |
1355 | + :param account: account for the object |
1356 | + :param container: container for the object |
1357 | + :param obj: name for the object |
1358 | + """ |
1359 | + disk_file = DiskFile(self.devices, device, partition, account, |
1360 | + container, obj, keep_data_fp=True, datadir=JANITORDIR) |
1361 | + if disk_file.is_deleted(): |
1362 | + return |
1363 | + update = pickle.loads(''.join(iter(disk_file))) |
1364 | + op = update['op'] |
1365 | + account = update['account'] |
1366 | + container = update['container'] |
1367 | + obj = update['obj'] |
1368 | + headers = update['headers'] |
1369 | + successes = list(update.get('successes', []))  # copy; see length check below
1370 | + part, nodes = self.get_container_ring().get_nodes(account, container) |
1371 | + path = '/%s/%s/%s' % (account, container, obj) |
1372 | + success = True |
1373 | + for node in nodes: |
1374 | + if node['id'] not in successes: |
1375 | + status = self.container_update(node, part, op, path, headers) |
1376 | + if not (200 <= status < 300) and status != 404: |
1377 | + success = False |
1378 | + else: |
1379 | + successes.append(node['id']) |
1380 | + if success: |
1381 | + self.container_update_successes += 1 |
1382 | + self.logger.debug('Update sent for %s %s' % |
1383 | + (path, disk_file.datadir)) |
1384 | + disk_file.tombstone(normalize_timestamp(time())) |
1385 | + else: |
1386 | + self.container_update_failures += 1 |
1387 | + self.logger.debug('Update failed for %s %s' % |
1388 | + (path, disk_file.datadir)) |
1389 | + if len(update.get('successes', [])) != len(successes): |
1390 | + disk_file.store_janitor_container_update(op, account, |
1391 | + container, obj, headers, successes) |
1392 | + |
1393 | + def container_update(self, node, part, op, obj, headers): |
1394 | + """ |
1395 | + Perform the actual container update network operation, sending the |
1396 | + object information to the container server. |
1397 | + |
1398 | + :param node: node dictionary from the container ring |
1399 | + :param part: partition that holds the container |
1400 | + :param op: operation performed (ex: 'POST' or 'DELETE') |
1401 | + :param obj: full /account/container/object path being updated
1402 | + :param headers: headers to send with the update |
1403 | + """ |
1404 | + try: |
1405 | + with ConnectionTimeout(self.conn_timeout): |
1406 | + conn = http_connect(node['ip'], node['port'], node['device'], |
1407 | + part, op, obj, headers) |
1408 | + with Timeout(self.node_timeout): |
1409 | + resp = conn.getresponse() |
1410 | + resp.read() |
1411 | + return resp.status |
1412 | + except: |
1413 | + self.logger.exception('ERROR with remote server ' |
1414 | + '%(ip)s:%(port)s/%(device)s' % node) |
1415 | + return 500 |
1416 | + |
1417 | + def process_segment_cleanup(self, device, partition, account, container, |
1418 | + obj): |
1419 | + """ |
1420 | + Process the segment cleanup operation, checking to see if the |
1421 | + operation should have completed long ago but still has no manifest and |
1422 | + therefore should have the orphaned segments removed. |
1423 | + |
1424 | + :param device: path to device |
1425 | + :param partition: partition for the operation |
1426 | + :param account: account for the operation (should always be |
1427 | + "Segment-Cleanup") |
1428 | + :param container: container for the operation (actually the timestamp |
1429 | + of the original PUT) |
1430 | + :param obj: name for the object (actually the full path |
1431 | + account/container/object for the original object PUT) |
1432 | + """ |
1433 | + disk_file = DiskFile(self.devices, device, partition, account, |
1434 | + container, obj, keep_data_fp=True, datadir=JANITORDIR) |
1435 | + if disk_file.is_deleted(): |
1436 | + return |
1437 | + disk_file_data = pickle.loads(''.join(iter(disk_file))) |
1438 | + segment_timestamp = container |
1439 | + # If not None, indicates we have started cleaning up these segments |
1440 | + # already and should just continue the operation. |
1441 | + segment_last_deleted = disk_file_data.get('segment_last_deleted', None) |
1442 | + # A segment_count of None means we don't yet know how many
1443 | + # segments there are.
1444 | + segment_count = disk_file_data.get('segment_count', None) |
1445 | + account = disk_file_data['account'] |
1446 | + container = disk_file_data['container'] |
1447 | + obj = disk_file_data['obj'] |
1448 | + path = '/%s/%s/%s' % (account, container, obj) |
1449 | + if segment_last_deleted is not None: |
1450 | + self.logger.debug('Continue cleaning up segments for %s/%s/%s %s ' |
1451 | + '@%s' % (device, partition, path, segment_timestamp, |
1452 | + segment_last_deleted)) |
1453 | + self.cleanup_segments(disk_file, account, container, obj, |
1454 | + segment_timestamp, segment_count, segment_last_deleted) |
1455 | + return |
1456 | + part, nodes = self.get_object_ring().get_nodes(account, container, obj) |
1457 | + newest_object = None |
1458 | + newest_manifest = None |
1459 | + responses = 0 |
1460 | + for node in nodes: |
1461 | + try: |
1462 | + with ConnectionTimeout(self.conn_timeout): |
1463 | + conn = http_connect(node['ip'], node['port'], |
1464 | + node['device'], part, 'GET', path) |
1465 | + with Timeout(self.node_timeout): |
1466 | + resp = conn.getresponse() |
1467 | + responses += 1 |
1468 | + if resp.status // 100 != 2: |
1469 | + conn.close() |
1470 | + continue |
1471 | + if resp.getheader('x-object-type', '') != 'manifest': |
1472 | + if resp.getheader('x-timestamp') > newest_object: |
1473 | + newest_object = resp.getheader('x-timestamp') |
1474 | + conn.close() |
1475 | + if newest_object > segment_timestamp: |
1476 | + # We don't have to talk to the other nodes if we |
1477 | + # already know we have a newer object. |
1478 | + break |
1479 | + continue |
1480 | + with Timeout(self.node_timeout): |
1481 | + body = resp.read() |
1482 | + manifest = pickle.loads(body) |
1483 | + if manifest['x-timestamp'] > newest_manifest: |
1484 | + newest_manifest = manifest['x-timestamp'] |
1485 | + conn.close() |
1486 | + if newest_manifest > segment_timestamp: |
1487 | + # We don't have to talk to the other nodes if we already |
1488 | + # know we have a newer manifest. |
1489 | + break |
1490 | + except: |
1491 | + self.logger.exception('ERROR with remote server ' |
1492 | + '%(ip)s:%(port)s/%(device)s' % node) |
1493 | + if newest_object > segment_timestamp or \ |
1494 | + newest_manifest > segment_timestamp: |
1495 | + self.logger.debug('Newer object/manifest; discarding old ' |
1496 | + 'segments for %s/%s/%s %s' % (device, partition, path, |
1497 | + segment_timestamp)) |
1498 | + self.cleanup_segments(disk_file, account, container, obj, |
1499 | + segment_timestamp, segment_count, segment_last_deleted) |
1500 | + elif newest_manifest == segment_timestamp: |
1501 | + self.logger.debug('Exact manifest confirmed; discarding janitor ' |
1502 | + 'cleanup operation for %s/%s/%s %s' % (device, partition, |
1503 | + path, segment_timestamp)) |
1504 | + disk_file.tombstone(normalize_timestamp(time())) |
1505 | + elif responses == len(nodes) and \ |
1506 | + time() - float(segment_timestamp) > \ |
1507 | + self.segment_reclaim_age: |
1508 | + self.logger.debug('All nodes agree the manifest still does not ' |
1509 | + 'exist after %ss; discarding orphaned segments for ' |
1510 | + '%s/%s/%s %s' % (self.segment_reclaim_age, device, |
1511 | + partition, path, segment_timestamp)) |
1512 | + self.cleanup_segments(disk_file, account, container, obj, |
1513 | + segment_timestamp, segment_count, segment_last_deleted) |
1514 | + |
1515 | + def cleanup_segments(self, disk_file, account, container, obj, |
1516 | + segment_timestamp, segment_count, segment_last_deleted): |
1517 | + """ |
1518 | + Issues DELETEs to the segments of the object, up to |
1519 | + self.segments_per_pass per call. If not all the segments are deleted
1520 | + before this function returns, it updates the disk_file with the last
1521 | + point completed.
1522 | + |
1523 | + :param disk_file: DiskFile for the Segment Cleanup operation. |
1524 | + :param account: Account name for the manifest object to delete segments |
1525 | + for. |
1526 | + :param container: Container name for the manifest object to delete |
1527 | + segments for. |
1528 | + :param obj: Object name for the manifest object to delete segments for. |
1529 | + :param segment_timestamp: The timestamp for the segments themselves. |
1530 | + :param segment_count: The number of segments for the object; may be |
1531 | + None if not yet known and this function should |
1532 | + try to determine the count. |
1533 | + :param segment_last_deleted: The segment number that was last deleted; |
1534 | + None if none have yet been deleted. This
1535 | + allows continuation of previous deletion |
1536 | + attempts. |
1537 | + """ |
1538 | + path = '/%s/%s/%s' % (account, container, obj) |
1539 | + if segment_last_deleted is None: |
1540 | + segment_last_deleted = -1 |
1541 | + if segment_count is None: |
1542 | + # We need to determine how many segments there are so that if we |
1543 | + # crash this run, next run we'll know what range we'll need to work |
1544 | + # within. Example: Determine we have 10 segments; delete the first |
1545 | + # five and crash; next run we now know we need to delete no more |
1546 | + # than 10 segments. If we instead just deleted segments until
1547 | + # hitting a 404, without predetermining the count, such a crash
1548 | + # would leave the last five segments undeleted forever.
1549 | + segment_count = 0 |
1550 | + assumptions = 0 |
1551 | + while True: |
1552 | + if segment_count: |
1553 | + ring_obj = \ |
1554 | + '%s/%s/%s' % (obj, segment_timestamp, segment_count) |
1555 | + else: |
1556 | + ring_obj = obj |
1557 | + part, nodes = self.get_object_ring().get_nodes(account, |
1558 | + container, ring_obj) |
1559 | + headers = {'X-Object-Segment': str(segment_count), |
1560 | + 'X-Object-Segment-Timestamp': segment_timestamp} |
1561 | + status = 0 |
1562 | + for node in nodes: |
1563 | + try: |
1564 | + with ConnectionTimeout(self.conn_timeout): |
1565 | + conn = http_connect(node['ip'], node['port'], |
1566 | + node['device'], part, 'HEAD', path, |
1567 | + headers=headers) |
1568 | + with Timeout(self.node_timeout): |
1569 | + resp = conn.getresponse() |
1570 | + resp.read() |
1571 | + if resp.status == 404: |
1572 | + status = 404 |
1573 | + elif resp.status // 100 == 2: |
1574 | + status = 200 |
1575 | + break |
1576 | + except: |
1577 | + self.logger.exception('ERROR with remote server ' |
1578 | + '%(ip)s:%(port)s/%(device)s' % node) |
1579 | + if status == 404: |
1580 | + break |
1581 | + segment_count += 1 |
1582 | + if not status: |
1583 | + # We couldn't determine if a segment existed, so we'll just |
1584 | + # assume it did and move to the next one; but we'll only do |
1585 | + # this for five consecutive segments before giving up
1586 | + # entirely and retrying on the next run.
1587 | + assumptions += 1 |
1588 | + if assumptions > 5: |
1589 | + self.segment_cleanup_failures += 1 |
1590 | + return |
1591 | + assumptions = 0 |
1592 | + disk_file.store_janitor_segment_cleanup(account, container, obj, |
1593 | + segment_count, segment_last_deleted) |
1594 | + starting_segment_last_deleted = segment_last_deleted |
1595 | + try: |
1596 | + segments_this_pass = self.segments_per_pass |
1597 | + while True: |
1598 | + segment = segment_last_deleted + 1 |
1599 | + if segment: |
1600 | + ring_obj = '%s/%s/%s' % (obj, segment_timestamp, segment) |
1601 | + else: |
1602 | + ring_obj = obj |
1603 | + part, nodes = self.get_object_ring().get_nodes(account, |
1604 | + container, ring_obj) |
1605 | + headers = {'X-Object-Segment': str(segment), |
1606 | + 'X-Object-Segment-Timestamp': segment_timestamp, |
1607 | + 'X-Timestamp': normalize_timestamp(time())} |
1608 | + not_found_count = 0 |
1609 | + success = False |
1610 | + for node in nodes: |
1611 | + try: |
1612 | + with ConnectionTimeout(self.conn_timeout): |
1613 | + conn = http_connect(node['ip'], node['port'], |
1614 | + node['device'], part, 'DELETE', path, |
1615 | + headers=headers) |
1616 | + with Timeout(self.node_timeout): |
1617 | + resp = conn.getresponse() |
1618 | + resp.read() |
1619 | + if resp.status == 404: |
1620 | + not_found_count += 1 |
1621 | + elif resp.status // 100 == 2: |
1622 | + # For the sake of this cleanup operation, we're
1623 | + # going to consider even one success a complete
1624 | + # success.
1625 | + success = True |
1626 | + except: |
1627 | + self.logger.exception('ERROR with remote server ' |
1628 | + '%(ip)s:%(port)s/%(device)s' % node) |
1629 | + if not_found_count == len(nodes): |
1630 | + success = True |
1631 | + if not success: |
1632 | + # If we didn't even have one success, we'll have to leave |
1633 | + # the janitor operation to retry later. |
1634 | + disk_file.store_janitor_segment_cleanup(account, container, |
1635 | + obj, segment_count, segment_last_deleted) |
1636 | + self.segment_cleanup_failures += 1 |
1637 | + return |
1638 | + segment_last_deleted = segment |
1639 | + self.segment_cleanup_segments += 1 |
1640 | + if segment >= segment_count - 1: |
1641 | + disk_file.tombstone(normalize_timestamp(time())) |
1642 | + self.segment_cleanup_completions += 1 |
1643 | + return |
1644 | + segments_this_pass -= 1 |
1645 | + if segments_this_pass <= 0: |
1646 | + disk_file.store_janitor_segment_cleanup(account, container, |
1647 | + obj, segment_count, segment_last_deleted) |
1648 | + return |
1649 | + except: |
1650 | + # If we get an unexpected exception, log it and try to update the |
1651 | + # disk file to indicate the segment last deleted. |
1652 | + self.logger.exception('ERROR unexpected exception with %s:' % path) |
1653 | + self.segment_cleanup_failures += 1 |
1654 | + if segment_last_deleted != starting_segment_last_deleted: |
1655 | + disk_file.store_janitor_segment_cleanup(account, container, |
1656 | + obj, segment_count, segment_last_deleted) |
1657 | |
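The janitor's counting and deletion loops above all hinge on one naming convention: segment 0 is stored under the object's own name, while every later segment hashes to its own ring location derived from the timestamp of the original PUT and the segment number (the proxy's SegmentedIterable, further down in this diff, uses the same rule). A minimal standalone sketch; the helper name segment_ring_name is illustrative, not part of the patch:

    def segment_ring_name(obj, segment_timestamp, segment):
        """Return the name a segment is stored under in the object ring.

        Segment 0 lives under the object's own name; each later segment
        hashes to a distinct ring location by mixing in the timestamp of
        the original PUT and the segment number.
        """
        if segment:
            return '%s/%s/%s' % (obj, segment_timestamp, segment)
        return obj

    # A three-segment object PUT at timestamp '1289243508.00000':
    assert segment_ring_name('o', '1289243508.00000', 0) == 'o'
    assert segment_ring_name('o', '1289243508.00000', 1) == \
        'o/1289243508.00000/1'
    assert segment_ring_name('o', '1289243508.00000', 2) == \
        'o/1289243508.00000/2'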
1658 | === modified file 'swift/obj/replicator.py' |
1659 | --- swift/obj/replicator.py 2010-10-19 01:05:54 +0000 |
1660 | +++ swift/obj/replicator.py 2010-11-08 18:51:48 +0000 |
1661 | @@ -19,7 +19,6 @@ |
1662 | import shutil |
1663 | import time |
1664 | import logging |
1665 | -import hashlib |
1666 | import itertools |
1667 | import cPickle as pickle |
1668 | |
1669 | @@ -29,168 +28,16 @@ |
1670 | from eventlet.support.greenlets import GreenletExit |
1671 | |
1672 | from swift.common.ring import Ring |
1673 | -from swift.common.utils import whataremyips, unlink_older_than, lock_path, \ |
1674 | - renamer, compute_eta, get_logger |
1675 | +from swift.common.utils import compute_eta, get_logger, unlink_older_than, \ |
1676 | + whataremyips |
1677 | from swift.common.bufferedhttp import http_connect |
1678 | from swift.common.daemon import Daemon |
1679 | +from swift.obj.diskfile import DATADIR, get_hashes, JANITORDIR, \ |
1680 | + recalculate_hashes, SEGMENTSDIR |
1681 | + |
1682 | |
1683 | hubs.use_hub('poll') |
1684 | |
1685 | -PICKLE_PROTOCOL = 2 |
1686 | -ONE_WEEK = 604800 |
1687 | -HASH_FILE = 'hashes.pkl' |
1688 | - |
1689 | - |
1690 | -def hash_suffix(path, reclaim_age): |
1691 | - """ |
1692 | - Performs reclamation and returns an md5 of all (remaining) files. |
1693 | - |
1694 | - :param reclaim_age: age in seconds at which to remove tombstones |
1695 | - """ |
1696 | - md5 = hashlib.md5() |
1697 | - for hsh in sorted(os.listdir(path)): |
1698 | - hsh_path = join(path, hsh) |
1699 | - files = os.listdir(hsh_path) |
1700 | - if len(files) == 1: |
1701 | - if files[0].endswith('.ts'): |
1702 | - # remove tombstones older than reclaim_age |
1703 | - ts = files[0].rsplit('.', 1)[0] |
1704 | - if (time.time() - float(ts)) > reclaim_age: |
1705 | - os.unlink(join(hsh_path, files[0])) |
1706 | - files.remove(files[0]) |
1707 | - elif files: |
1708 | - files.sort(reverse=True) |
1709 | - meta = data = tomb = None |
1710 | - for filename in files: |
1711 | - if not meta and filename.endswith('.meta'): |
1712 | - meta = filename |
1713 | - if not data and filename.endswith('.data'): |
1714 | - data = filename |
1715 | - if not tomb and filename.endswith('.ts'): |
1716 | - tomb = filename |
1717 | - if (filename < tomb or # any file older than tomb |
1718 | - filename < data or # any file older than data |
1719 | - (filename.endswith('.meta') and |
1720 | - filename < meta)): # old meta |
1721 | - os.unlink(join(hsh_path, filename)) |
1722 | - files.remove(filename) |
1723 | - if not files: |
1724 | - os.rmdir(hsh_path) |
1725 | - for filename in files: |
1726 | - md5.update(filename) |
1727 | - try: |
1728 | - os.rmdir(path) |
1729 | - except OSError: |
1730 | - pass |
1731 | - return md5.hexdigest() |
1732 | - |
1733 | - |
1734 | -def recalculate_hashes(partition_dir, suffixes, reclaim_age=ONE_WEEK): |
1735 | - """ |
1736 | - Recalculates hashes for the given suffixes in the partition and updates |
1737 | - them in the partition's hashes file. |
1738 | - |
1739 | - :param partition_dir: directory of the partition in which to recalculate |
1740 | - :param suffixes: list of suffixes to recalculate |
1741 | - :param reclaim_age: age in seconds at which tombstones should be removed |
1742 | - """ |
1743 | - |
1744 | - def tpool_listdir(partition_dir): |
1745 | - return dict(((suff, None) for suff in os.listdir(partition_dir) |
1746 | - if len(suff) == 3 and isdir(join(partition_dir, suff)))) |
1747 | - hashes_file = join(partition_dir, HASH_FILE) |
1748 | - with lock_path(partition_dir): |
1749 | - try: |
1750 | - with open(hashes_file, 'rb') as fp: |
1751 | - hashes = pickle.load(fp) |
1752 | - except Exception: |
1753 | - hashes = tpool.execute(tpool_listdir, partition_dir) |
1754 | - for suffix in suffixes: |
1755 | - suffix_dir = join(partition_dir, suffix) |
1756 | - if os.path.exists(suffix_dir): |
1757 | - hashes[suffix] = hash_suffix(suffix_dir, reclaim_age) |
1758 | - elif suffix in hashes: |
1759 | - del hashes[suffix] |
1760 | - with open(hashes_file + '.tmp', 'wb') as fp: |
1761 | - pickle.dump(hashes, fp, PICKLE_PROTOCOL) |
1762 | - renamer(hashes_file + '.tmp', hashes_file) |
1763 | - |
1764 | - |
1765 | -def invalidate_hash(suffix_dir): |
1766 | - """ |
1767 | - Invalidates the hash for a suffix_dir in the partition's hashes file. |
1768 | - |
1769 | - :param suffix_dir: absolute path to suffix dir whose hash needs |
1770 | - invalidating |
1771 | - """ |
1772 | - |
1773 | - suffix = os.path.basename(suffix_dir) |
1774 | - partition_dir = os.path.dirname(suffix_dir) |
1775 | - hashes_file = join(partition_dir, HASH_FILE) |
1776 | - with lock_path(partition_dir): |
1777 | - try: |
1778 | - with open(hashes_file, 'rb') as fp: |
1779 | - hashes = pickle.load(fp) |
1780 | - if suffix in hashes and not hashes[suffix]: |
1781 | - return |
1782 | - except Exception: |
1783 | - return |
1784 | - hashes[suffix] = None |
1785 | - with open(hashes_file + '.tmp', 'wb') as fp: |
1786 | - pickle.dump(hashes, fp, PICKLE_PROTOCOL) |
1787 | - renamer(hashes_file + '.tmp', hashes_file) |
1788 | - |
1789 | - |
1790 | -def get_hashes(partition_dir, do_listdir=True, reclaim_age=ONE_WEEK): |
1791 | - """ |
1792 | - Get a list of hashes for the suffix dir. do_listdir causes it to mistrust |
1793 | - the hash cache for suffix existence at the (unexpectedly high) cost of a |
1794 | - listdir. reclaim_age is just passed on to hash_suffix. |
1795 | - |
1796 | - :param partition_dir: absolute path of partition to get hashes for |
1797 | - :param do_listdir: force existence check for all hashes in the partition |
1798 | - :param reclaim_age: age at which to remove tombstones |
1799 | - |
1800 | - :returns: tuple of (number of suffix dirs hashed, dictionary of hashes) |
1801 | - """ |
1802 | - |
1803 | - def tpool_listdir(hashes, partition_dir): |
1804 | - return dict(((suff, hashes.get(suff, None)) |
1805 | - for suff in os.listdir(partition_dir) |
1806 | - if len(suff) == 3 and isdir(join(partition_dir, suff)))) |
1807 | - hashed = 0 |
1808 | - hashes_file = join(partition_dir, HASH_FILE) |
1809 | - with lock_path(partition_dir): |
1810 | - modified = False |
1811 | - hashes = {} |
1812 | - try: |
1813 | - with open(hashes_file, 'rb') as fp: |
1814 | - hashes = pickle.load(fp) |
1815 | - except Exception: |
1816 | - do_listdir = True |
1817 | - if do_listdir: |
1818 | - hashes = tpool.execute(tpool_listdir, hashes, partition_dir) |
1819 | - modified = True |
1820 | - for suffix, hash_ in hashes.items(): |
1821 | - if not hash_: |
1822 | - suffix_dir = join(partition_dir, suffix) |
1823 | - if os.path.exists(suffix_dir): |
1824 | - try: |
1825 | - hashes[suffix] = hash_suffix(suffix_dir, reclaim_age) |
1826 | - hashed += 1 |
1827 | - except OSError: |
1828 | - logging.exception('Error hashing suffix') |
1829 | - hashes[suffix] = None |
1830 | - else: |
1831 | - del hashes[suffix] |
1832 | - modified = True |
1833 | - sleep() |
1834 | - if modified: |
1835 | - with open(hashes_file + '.tmp', 'wb') as fp: |
1836 | - pickle.dump(hashes, fp, PICKLE_PROTOCOL) |
1837 | - renamer(hashes_file + '.tmp', hashes_file) |
1838 | - return hashed, hashes |
1839 | - |
1840 | |
1841 | class ObjectReplicator(Daemon): |
1842 | """ |
1843 | @@ -302,7 +149,7 @@ |
1844 | if not had_any: |
1845 | return False |
1846 | args.append(join(rsync_module, node['device'], |
1847 | - 'objects', job['partition'])) |
1848 | + job.get('datadir', DATADIR), job['partition'])) |
1849 | return self._rsync(args) == 0 |
1850 | |
1851 | def check_ring(self): |
1852 | @@ -337,12 +184,14 @@ |
1853 | for node in job['nodes']: |
1854 | success = self.rsync(node, job, suffixes) |
1855 | if success: |
1856 | + headers = {'Content-Length': '0', |
1857 | + 'X-Data-Dir': job.get('datadir', DATADIR)} |
1858 | with Timeout(self.http_timeout): |
1859 | http_connect(node['ip'], |
1860 | node['port'], |
1861 | node['device'], job['partition'], 'REPLICATE', |
1862 | '/' + '-'.join(suffixes), |
1863 | - headers={'Content-Length': '0'}).getresponse().read() |
1864 | + headers=headers).getresponse().read() |
1865 | responses.append(success) |
1866 | if not suffixes or (len(responses) == \ |
1867 | self.object_ring.replica_count and all(responses)): |
1868 | @@ -374,10 +223,12 @@ |
1869 | node = next(nodes) |
1870 | attempts_left -= 1 |
1871 | try: |
1872 | + headers = {'Content-Length': '0', |
1873 | + 'X-Data-Dir': job.get('datadir', DATADIR)} |
1874 | with Timeout(self.http_timeout): |
1875 | resp = http_connect(node['ip'], node['port'], |
1876 | node['device'], job['partition'], 'REPLICATE', |
1877 | - '', headers={'Content-Length': '0'}).getresponse() |
1878 | + '', headers=headers).getresponse() |
1879 | if resp.status == 507: |
1880 | self.logger.error('%s/%s responded as unmounted' % |
1881 | (node['ip'], node['device'])) |
1882 | @@ -397,11 +248,13 @@ |
1883 | self.rsync(node, job, suffixes) |
1884 | recalculate_hashes(job['path'], suffixes, |
1885 | reclaim_age=self.reclaim_age) |
1886 | + headers = {'Content-Length': '0', |
1887 | + 'X-Data-Dir': job.get('datadir', DATADIR)} |
1888 | with Timeout(self.http_timeout): |
1889 | conn = http_connect(node['ip'], node['port'], |
1890 | node['device'], job['partition'], 'REPLICATE', |
1891 | '/' + '-'.join(suffixes), |
1892 | - headers={'Content-Length': '0'}) |
1893 | + headers=headers) |
1894 | conn.getresponse().read() |
1895 | self.suffix_sync += len(suffixes) |
1896 | except (Exception, Timeout): |
1897 | @@ -489,24 +342,27 @@ |
1898 | dev for dev in self.object_ring.devs |
1899 | if dev and dev['ip'] in ips and dev['port'] == self.port]: |
1900 | dev_path = join(self.devices_dir, local_dev['device']) |
1901 | - obj_path = join(dev_path, 'objects') |
1902 | - tmp_path = join(dev_path, 'tmp') |
1903 | if self.mount_check and not os.path.ismount(dev_path): |
1904 | self.logger.warn('%s is not mounted' % local_dev['device']) |
1905 | continue |
1906 | + tmp_path = join(dev_path, 'tmp') |
1907 | unlink_older_than(tmp_path, time.time() - self.reclaim_age) |
1908 | - if not os.path.exists(obj_path): |
1909 | - continue |
1910 | - for partition in os.listdir(obj_path): |
1911 | - try: |
1912 | - nodes = [node for node in |
1913 | - self.object_ring.get_part_nodes(int(partition)) |
1914 | - if node['id'] != local_dev['id']] |
1915 | - jobs.append(dict(path=join(obj_path, partition), |
1916 | - nodes=nodes, delete=len(nodes) > 2, |
1917 | - partition=partition)) |
1918 | - except ValueError: |
1919 | - continue |
1920 | + for datadir in (DATADIR, JANITORDIR, SEGMENTSDIR): |
1921 | + obj_path = join(dev_path, datadir) |
1922 | + if os.path.exists(obj_path): |
1923 | + for partition in os.listdir(obj_path): |
1924 | + try: |
1925 | + nodes = [node for node in |
1926 | + self.object_ring.get_part_nodes( |
1927 | + int(partition)) |
1928 | + if node['id'] != local_dev['id']] |
1929 | + jobs.append(dict( |
1930 | + path=join(obj_path, partition), |
1931 | + nodes=nodes, delete=len(nodes) > 2, |
1932 | + partition=partition, |
1933 | + datadir=datadir)) |
1934 | + except ValueError: |
1935 | + continue |
1936 | random.shuffle(jobs) |
1937 | # Partitions that need to be deleted take priority
1938 | jobs.sort(key=lambda job: not job['delete']) |
1939 | |
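The replicator changes above boil down to carrying a datadir in each job and advertising it with an X-Data-Dir header on REPLICATE requests, so the same rsync machinery now covers the objects, janitor, and segments trees. A rough sketch of that shape; the JANITORDIR and SEGMENTSDIR values are assumptions here, since swift/obj/diskfile.py's constants are not shown in this part of the diff:

    import os

    DATADIR = 'objects'
    JANITORDIR = 'janitor'      # assumed value; defined in swift/obj/diskfile.py
    SEGMENTSDIR = 'segments'    # assumed value; defined in swift/obj/diskfile.py

    def collect_jobs(dev_path):
        """Build one replication job per (datadir, partition) pair."""
        jobs = []
        for datadir in (DATADIR, JANITORDIR, SEGMENTSDIR):
            obj_path = os.path.join(dev_path, datadir)
            if not os.path.isdir(obj_path):
                continue
            for partition in os.listdir(obj_path):
                jobs.append({'path': os.path.join(obj_path, partition),
                             'partition': partition,
                             'datadir': datadir})
        return jobs

    def replicate_headers(job):
        """Headers for a REPLICATE request; X-Data-Dir tells the remote
        object server which tree to hash, defaulting to 'objects'."""
        return {'Content-Length': '0',
                'X-Data-Dir': job.get('datadir', DATADIR)}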
1940 | === modified file 'swift/obj/server.py' |
1941 | --- swift/obj/server.py 2010-11-01 21:47:48 +0000 |
1942 | +++ swift/obj/server.py 2010-11-08 18:51:48 +0000 |
1943 | @@ -17,225 +17,28 @@ |
1944 | |
1945 | from __future__ import with_statement |
1946 | import cPickle as pickle |
1947 | -import errno |
1948 | import os |
1949 | import time |
1950 | import traceback |
1951 | from datetime import datetime |
1952 | from hashlib import md5 |
1953 | -from tempfile import mkstemp |
1954 | from urllib import unquote |
1955 | -from contextlib import contextmanager |
1956 | |
1957 | from webob import Request, Response, UTC |
1958 | from webob.exc import HTTPAccepted, HTTPBadRequest, HTTPCreated, \ |
1959 | HTTPInternalServerError, HTTPNoContent, HTTPNotFound, \ |
1960 | HTTPNotModified, HTTPPreconditionFailed, \ |
1961 | HTTPRequestTimeout, HTTPUnprocessableEntity, HTTPMethodNotAllowed |
1962 | -from xattr import getxattr, setxattr |
1963 | from eventlet import sleep, Timeout |
1964 | |
1965 | -from swift.common.utils import mkdirs, normalize_timestamp, \ |
1966 | - storage_directory, hash_path, renamer, fallocate, \ |
1967 | - split_path, drop_buffer_cache, get_logger, write_pickle |
1968 | +from swift.common.utils import drop_buffer_cache, fallocate, get_logger, \ |
1969 | + mkdirs, normalize_timestamp, split_path |
1970 | from swift.common.bufferedhttp import http_connect |
1971 | from swift.common.constraints import check_object_creation, check_mount, \ |
1972 | check_float, check_utf8 |
1973 | from swift.common.exceptions import ConnectionTimeout |
1974 | -from swift.obj.replicator import get_hashes, invalidate_hash, \ |
1975 | - recalculate_hashes |
1976 | - |
1977 | - |
1978 | -DATADIR = 'objects' |
1979 | -ASYNCDIR = 'async_pending' |
1980 | -PICKLE_PROTOCOL = 2 |
1981 | -METADATA_KEY = 'user.swift.metadata' |
1982 | -MAX_OBJECT_NAME_LENGTH = 1024 |
1983 | - |
1984 | - |
1985 | -def read_metadata(fd): |
1986 | - """ |
1987 | - Helper function to read the pickled metadata from an object file. |
1988 | - |
1989 | - :param fd: file descriptor to load the metadata from |
1990 | - |
1991 | - :returns: dictionary of metadata |
1992 | - """ |
1993 | - metadata = '' |
1994 | - key = 0 |
1995 | - try: |
1996 | - while True: |
1997 | - metadata += getxattr(fd, '%s%s' % (METADATA_KEY, (key or ''))) |
1998 | - key += 1 |
1999 | - except IOError: |
2000 | - pass |
2001 | - return pickle.loads(metadata) |
2002 | - |
2003 | - |
2004 | -class DiskFile(object): |
2005 | - """ |
2006 | - Manage object files on disk. |
2007 | - |
2008 | - :param path: path to devices on the node |
2009 | - :param device: device name |
2010 | - :param partition: partition on the device the object lives in |
2011 | - :param account: account name for the object |
2012 | - :param container: container name for the object |
2013 | - :param obj: object name for the object |
2014 | - :param keep_data_fp: if True, don't close the fp, otherwise close it |
2015 | - :param disk_chunk_Size: size of chunks on file reads |
2016 | - """ |
2017 | - |
2018 | - def __init__(self, path, device, partition, account, container, obj, |
2019 | - keep_data_fp=False, disk_chunk_size=65536): |
2020 | - self.disk_chunk_size = disk_chunk_size |
2021 | - self.name = '/' + '/'.join((account, container, obj)) |
2022 | - name_hash = hash_path(account, container, obj) |
2023 | - self.datadir = os.path.join(path, device, |
2024 | - storage_directory(DATADIR, partition, name_hash)) |
2025 | - self.tmpdir = os.path.join(path, device, 'tmp') |
2026 | - self.metadata = {} |
2027 | - self.meta_file = None |
2028 | - self.data_file = None |
2029 | - if not os.path.exists(self.datadir): |
2030 | - return |
2031 | - files = sorted(os.listdir(self.datadir), reverse=True) |
2032 | - for file in files: |
2033 | - if file.endswith('.ts'): |
2034 | - self.data_file = self.meta_file = None |
2035 | - self.metadata = {'deleted': True} |
2036 | - return |
2037 | - if file.endswith('.meta') and not self.meta_file: |
2038 | - self.meta_file = os.path.join(self.datadir, file) |
2039 | - if file.endswith('.data') and not self.data_file: |
2040 | - self.data_file = os.path.join(self.datadir, file) |
2041 | - break |
2042 | - if not self.data_file: |
2043 | - return |
2044 | - self.fp = open(self.data_file, 'rb') |
2045 | - self.metadata = read_metadata(self.fp) |
2046 | - if not keep_data_fp: |
2047 | - self.close() |
2048 | - if self.meta_file: |
2049 | - with open(self.meta_file) as mfp: |
2050 | - for key in self.metadata.keys(): |
2051 | - if key.lower() not in ('content-type', 'content-encoding', |
2052 | - 'deleted', 'content-length', 'etag'): |
2053 | - del self.metadata[key] |
2054 | - self.metadata.update(read_metadata(mfp)) |
2055 | - |
2056 | - def __iter__(self): |
2057 | - """Returns an iterator over the data file.""" |
2058 | - try: |
2059 | - dropped_cache = 0 |
2060 | - read = 0 |
2061 | - while True: |
2062 | - chunk = self.fp.read(self.disk_chunk_size) |
2063 | - if chunk: |
2064 | - read += len(chunk) |
2065 | - if read - dropped_cache > (1024 * 1024): |
2066 | - drop_buffer_cache(self.fp.fileno(), dropped_cache, |
2067 | - read - dropped_cache) |
2068 | - dropped_cache = read |
2069 | - yield chunk |
2070 | - else: |
2071 | - drop_buffer_cache(self.fp.fileno(), dropped_cache, |
2072 | - read - dropped_cache) |
2073 | - break |
2074 | - finally: |
2075 | - self.close() |
2076 | - |
2077 | - def app_iter_range(self, start, stop): |
2078 | - """Returns an iterator over the data file for range (start, stop)""" |
2079 | - if start: |
2080 | - self.fp.seek(start) |
2081 | - if stop is not None: |
2082 | - length = stop - start |
2083 | - else: |
2084 | - length = None |
2085 | - for chunk in self: |
2086 | - if length is not None: |
2087 | - length -= len(chunk) |
2088 | - if length < 0: |
2089 | - # Chop off the extra: |
2090 | - yield chunk[:length] |
2091 | - break |
2092 | - yield chunk |
2093 | - |
2094 | - def close(self): |
2095 | - """Close the file.""" |
2096 | - if self.fp: |
2097 | - self.fp.close() |
2098 | - self.fp = None |
2099 | - |
2100 | - def is_deleted(self): |
2101 | - """ |
2102 | - Check if the file is deleted. |
2103 | - |
2104 | - :returns: True if the file doesn't exist or has been flagged as |
2105 | - deleted. |
2106 | - """ |
2107 | - return not self.data_file or 'deleted' in self.metadata |
2108 | - |
2109 | - @contextmanager |
2110 | - def mkstemp(self): |
2111 | - """Contextmanager to make a temporary file.""" |
2112 | - if not os.path.exists(self.tmpdir): |
2113 | - mkdirs(self.tmpdir) |
2114 | - fd, tmppath = mkstemp(dir=self.tmpdir) |
2115 | - try: |
2116 | - yield fd, tmppath |
2117 | - finally: |
2118 | - try: |
2119 | - os.close(fd) |
2120 | - except OSError: |
2121 | - pass |
2122 | - try: |
2123 | - os.unlink(tmppath) |
2124 | - except OSError: |
2125 | - pass |
2126 | - |
2127 | - def put(self, fd, tmppath, metadata, extension='.data'): |
2128 | - """ |
2129 | - Finalize writing the file on disk, and renames it from the temp file to |
2130 | - the real location. This should be called after the data has been |
2131 | - written to the temp file. |
2132 | - |
2133 | - :params fd: file descriptor of the temp file |
2134 | - :param tmppath: path to the temporary file being used |
2135 | - :param metadata: dictionary of metada to be written |
2136 | - :param extention: extension to be used when making the file |
2137 | - """ |
2138 | - metadata['name'] = self.name |
2139 | - timestamp = normalize_timestamp(metadata['X-Timestamp']) |
2140 | - metastr = pickle.dumps(metadata, PICKLE_PROTOCOL) |
2141 | - key = 0 |
2142 | - while metastr: |
2143 | - setxattr(fd, '%s%s' % (METADATA_KEY, key or ''), metastr[:254]) |
2144 | - metastr = metastr[254:] |
2145 | - key += 1 |
2146 | - if 'Content-Length' in metadata: |
2147 | - drop_buffer_cache(fd, 0, int(metadata['Content-Length'])) |
2148 | - os.fsync(fd) |
2149 | - invalidate_hash(os.path.dirname(self.datadir)) |
2150 | - renamer(tmppath, os.path.join(self.datadir, timestamp + extension)) |
2151 | - self.metadata = metadata |
2152 | - |
2153 | - def unlinkold(self, timestamp): |
2154 | - """ |
2155 | - Remove any older versions of the object file. Any file that has an |
2156 | - older timestamp than timestamp will be deleted. |
2157 | - |
2158 | - :param timestamp: timestamp to compare with each file |
2159 | - """ |
2160 | - timestamp = normalize_timestamp(timestamp) |
2161 | - for fname in os.listdir(self.datadir): |
2162 | - if fname < timestamp: |
2163 | - try: |
2164 | - os.unlink(os.path.join(self.datadir, fname)) |
2165 | - except OSError, err: # pragma: no cover |
2166 | - if err.errno != errno.ENOENT: |
2167 | - raise |
2168 | +from swift.obj.diskfile import DATADIR, DiskFile, get_hashes, JANITORDIR, \ |
2169 | + recalculate_hashes |
2170 | |
2171 | |
2172 | class ObjectController(object): |
2173 | @@ -262,7 +65,7 @@ |
2174 | self.bytes_per_sync = int(conf.get('mb_per_sync', 512)) * 1024 * 1024 |
2175 | |
2176 | def container_update(self, op, account, container, obj, headers_in, |
2177 | - headers_out, objdevice): |
2178 | + headers_out, objdevice, objpartition): |
2179 | """ |
2180 | Update the container when objects are updated. |
2181 | |
2182 | @@ -274,6 +77,7 @@ |
2183 | :param headers_out: dictionary of headers to send in the container |
2184 | request |
2185 | :param objdevice: device name that the object is in |
2186 | + :param objpartition: partition that the object is in |
2187 | """ |
2188 | host = headers_in.get('X-Container-Host', None) |
2189 | partition = headers_in.get('X-Container-Partition', None) |
2190 | @@ -293,20 +97,16 @@ |
2191 | return |
2192 | else: |
2193 | self.logger.error('ERROR Container update failed (saving ' |
2194 | - 'for async update later): %d response from %s:%s/%s' % |
2195 | - (response.status, ip, port, contdevice)) |
2196 | + 'for janitor update later): %d response from %s:%s/%s' |
2197 | + % (response.status, ip, port, contdevice)) |
2198 | except: |
2199 | self.logger.exception('ERROR container update failed with ' |
2200 | - '%s:%s/%s transaction %s (saving for async update later)' % |
2201 | + '%s:%s/%s transaction %s (saving for janitor update later)' % |
2202 | (ip, port, contdevice, headers_in.get('x-cf-trans-id', '-'))) |
2203 | - async_dir = os.path.join(self.devices, objdevice, ASYNCDIR) |
2204 | - ohash = hash_path(account, container, obj) |
2205 | - write_pickle( |
2206 | - {'op': op, 'account': account, 'container': container, |
2207 | - 'obj': obj, 'headers': headers_out}, |
2208 | - os.path.join(async_dir, ohash[-3:], ohash + '-' + |
2209 | - normalize_timestamp(headers_out['x-timestamp'])), |
2210 | - os.path.join(self.devices, objdevice, 'tmp')) |
2211 | + df = DiskFile(self.devices, objdevice, objpartition, account, |
2212 | + container, obj, datadir=JANITORDIR) |
2213 | + df.store_janitor_container_update(op, account, container, obj, |
2214 | + headers_out, []) |
2215 | |
2216 | def POST(self, request): |
2217 | """Handle HTTP POST requests for the Swift Object Server.""" |
2218 | @@ -355,7 +155,15 @@ |
2219 | if error_response: |
2220 | return error_response |
2221 | file = DiskFile(self.devices, device, partition, account, container, |
2222 | - obj, disk_chunk_size=self.disk_chunk_size) |
2223 | + obj, disk_chunk_size=self.disk_chunk_size, keep_data_fp=True, |
2224 | + segment=request.headers.get('x-object-segment'), |
2225 | + segment_timestamp=request.headers['x-timestamp']) |
2226 | + overwritten_manifest = False |
2227 | + if not file.is_deleted() and \ |
2228 | + file.metadata.get('X-Object-Type') == 'manifest' and \ |
2229 | + 'x-object-segment' not in request.headers: |
2230 | + overwritten_manifest = pickle.loads(''.join(iter(file))) |
2231 | + file.close() |
2232 | upload_expiration = time.time() + self.max_upload_time |
2233 | etag = md5() |
2234 | upload_size = 0 |
2235 | @@ -397,17 +205,51 @@ |
2236 | if 'content-encoding' in request.headers: |
2237 | metadata['Content-Encoding'] = \ |
2238 | request.headers['Content-Encoding'] |
2239 | - file.put(fd, tmppath, metadata) |
2240 | + if 'x-object-type' in request.headers: |
2241 | + metadata['X-Object-Type'] = request.headers['x-object-type'] |
2242 | + if 'x-object-segment' in request.headers: |
2243 | + metadata['X-Object-Segment'] = \ |
2244 | + request.headers['x-object-segment'] |
2245 | + no_longer_segment = False |
2246 | + if 'x-object-segment-if-length' in request.headers and \ |
2247 | + int(request.headers['x-object-segment-if-length']) != \ |
2248 | + os.fstat(fd).st_size: |
2249 | + del metadata['X-Object-Type'] |
2250 | + del metadata['X-Object-Segment'] |
2251 | + no_longer_segment = True |
2252 | + elif int(request.headers.get('x-object-segment', -1)) == 0: |
2253 | + # Write out a janitor operation to clean up this multi-segment |
2254 | + # PUT in the future if it fails. |
2255 | + df = DiskFile(self.devices, device, partition, |
2256 | + 'Segment-Cleanup', request.headers['x-timestamp'], |
2257 | + '%s/%s/%s' % (account, container, obj), |
2258 | + datadir=JANITORDIR) |
2259 | + df.store_janitor_segment_cleanup(account, container, obj, |
2260 | + segment_count=None, segment_last_deleted=None) |
2261 | + if overwritten_manifest: |
2262 | + # Write out a janitor operation to clean up the overwritten |
2263 | + # multi-segment object. |
2264 | + df = DiskFile(self.devices, device, partition, |
2265 | + 'Segment-Cleanup', overwritten_manifest['x-timestamp'], |
2266 | + '%s/%s/%s' % (account, container, obj), datadir=JANITORDIR) |
2267 | + df.store_janitor_segment_cleanup(account, container, obj, |
2268 | + segment_count=(overwritten_manifest['content-length'] / |
2269 | + overwritten_manifest['x-segment-size'] + 1), |
2270 | + segment_last_deleted=None) |
2271 | + file.put(fd, tmppath, metadata, |
2272 | + no_longer_segment=no_longer_segment) |
2273 | file.unlinkold(metadata['X-Timestamp']) |
2274 | - self.container_update('PUT', account, container, obj, request.headers, |
2275 | - {'x-size': file.metadata['Content-Length'], |
2276 | - 'x-content-type': file.metadata['Content-Type'], |
2277 | - 'x-timestamp': file.metadata['X-Timestamp'], |
2278 | - 'x-etag': file.metadata['ETag'], |
2279 | - 'x-cf-trans-id': request.headers.get('x-cf-trans-id', '-')}, |
2280 | - device) |
2281 | - resp = HTTPCreated(request=request, etag=etag) |
2282 | - return resp |
2283 | + if 'X-Object-Segment' not in file.metadata: |
2284 | + self.container_update('PUT', account, container, obj, |
2285 | + request.headers, |
2286 | + {'x-size': request.headers.get('x-object-length', |
2287 | + file.metadata['Content-Length']), |
2288 | + 'x-content-type': file.metadata['Content-Type'], |
2289 | + 'x-timestamp': file.metadata['X-Timestamp'], |
2290 | + 'x-etag': file.metadata['ETag'], |
2291 | + 'x-cf-trans-id': request.headers.get('x-cf-trans-id', '-')}, |
2292 | + device, partition) |
2293 | + return HTTPCreated(request=request, etag=etag) |
2294 | |
2295 | def GET(self, request): |
2296 | """Handle HTTP GET requests for the Swift Object Server.""" |
2297 | @@ -420,7 +262,9 @@ |
2298 | if self.mount_check and not check_mount(self.devices, device): |
2299 | return Response(status='507 %s is not mounted' % device) |
2300 | file = DiskFile(self.devices, device, partition, account, container, |
2301 | - obj, keep_data_fp=True, disk_chunk_size=self.disk_chunk_size) |
2302 | + obj, keep_data_fp=True, disk_chunk_size=self.disk_chunk_size, |
2303 | + segment=request.headers.get('x-object-segment'), |
2304 | + segment_timestamp=request.headers.get('x-object-segment-timestamp')) |
2305 | if file.is_deleted(): |
2306 | if request.headers.get('if-match') == '*': |
2307 | return HTTPPreconditionFailed(request=request) |
2308 | @@ -460,7 +304,9 @@ |
2309 | 'application/octet-stream'), app_iter=file, |
2310 | request=request, conditional_response=True) |
2311 | for key, value in file.metadata.iteritems(): |
2312 | - if key.lower().startswith('x-object-meta-'): |
2313 | + if key.lower().startswith('x-object-meta-') or \ |
2314 | + key.lower() in ('x-timestamp', 'x-object-type', |
2315 | + 'x-object-segment'): |
2316 | response.headers[key] = value |
2317 | response.etag = file.metadata['ETag'] |
2318 | response.last_modified = float(file.metadata['X-Timestamp']) |
2319 | @@ -482,13 +328,17 @@ |
2320 | if self.mount_check and not check_mount(self.devices, device): |
2321 | return Response(status='507 %s is not mounted' % device) |
2322 | file = DiskFile(self.devices, device, partition, account, container, |
2323 | - obj, disk_chunk_size=self.disk_chunk_size) |
2324 | + obj, disk_chunk_size=self.disk_chunk_size, |
2325 | + segment=request.headers.get('x-object-segment'), |
2326 | + segment_timestamp=request.headers.get('x-object-segment-timestamp')) |
2327 | if file.is_deleted(): |
2328 | return HTTPNotFound(request=request) |
2329 | response = Response(content_type=file.metadata['Content-Type'], |
2330 | request=request, conditional_response=True) |
2331 | for key, value in file.metadata.iteritems(): |
2332 | - if key.lower().startswith('x-object-meta-'): |
2333 | + if key.lower().startswith('x-object-meta-') or \ |
2334 | + key.lower() in ('x-timestamp', 'x-object-type', |
2335 | + 'x-object-segment'): |
2336 | response.headers[key] = value |
2337 | response.etag = file.metadata['ETag'] |
2338 | response.last_modified = float(file.metadata['X-Timestamp']) |
2339 | @@ -513,21 +363,43 @@ |
2340 | return Response(status='507 %s is not mounted' % device) |
2341 | response_class = HTTPNoContent |
2342 | file = DiskFile(self.devices, device, partition, account, container, |
2343 | - obj, disk_chunk_size=self.disk_chunk_size) |
2344 | + obj, disk_chunk_size=self.disk_chunk_size, keep_data_fp=True, |
2345 | + segment=request.headers.get('x-object-segment'), |
2346 | + segment_timestamp=request.headers.get('x-object-segment-timestamp')) |
2347 | + deleted_manifest = False |
2348 | if file.is_deleted(): |
2349 | response_class = HTTPNotFound |
2350 | + elif 'x-object-segment' not in request.headers and \ |
2351 | + file.metadata.get('X-Object-Type') == 'manifest': |
2352 | + deleted_manifest = pickle.loads(''.join(iter(file))) |
2353 | + file.close() |
2354 | metadata = { |
2355 | 'X-Timestamp': request.headers['X-Timestamp'], 'deleted': True, |
2356 | } |
2357 | with file.mkstemp() as (fd, tmppath): |
2358 | + if deleted_manifest: |
2359 | + # Write out a janitor operation to clean up the deleted |
2360 | + # multi-segment object. Note that setting the |
2361 | + # segment_last_deleted = -1 will cause the object-janitor to |
2362 | + # start removing the segments immediately rather than waiting
2363 | + # for segment_reclaim_age (otherwise it can't tell the difference
2364 | + # between a deleted manifest and a manifest that just hasn't
2365 | + # appeared yet). |
2366 | + df = DiskFile(self.devices, device, partition, |
2367 | + 'Segment-Cleanup', deleted_manifest['x-timestamp'], |
2368 | + '%s/%s/%s' % (account, container, obj), datadir=JANITORDIR) |
2369 | + df.store_janitor_segment_cleanup(account, container, obj, |
2370 | + segment_count=(deleted_manifest['content-length'] / |
2371 | + deleted_manifest['x-segment-size']) + 1, |
2372 | + segment_last_deleted=-1) |
2373 | file.put(fd, tmppath, metadata, extension='.ts') |
2374 | file.unlinkold(metadata['X-Timestamp']) |
2375 | - self.container_update('DELETE', account, container, obj, |
2376 | - request.headers, {'x-timestamp': metadata['X-Timestamp'], |
2377 | - 'x-cf-trans-id': request.headers.get('x-cf-trans-id', '-')}, |
2378 | - device) |
2379 | - resp = response_class(request=request) |
2380 | - return resp |
2381 | + if 'x-object-segment' not in request.headers: |
2382 | + self.container_update('DELETE', account, container, obj, |
2383 | + request.headers, {'x-timestamp': metadata['X-Timestamp'], |
2384 | + 'x-cf-trans-id': request.headers.get('x-cf-trans-id', '-')}, |
2385 | + device, partition) |
2386 | + return response_class(request=request) |
2387 | |
2388 | def REPLICATE(self, request): |
2389 | """ |
2390 | @@ -542,7 +414,8 @@ |
2391 | content_type='text/plain') |
2392 | if self.mount_check and not check_mount(self.devices, device): |
2393 | return Response(status='507 %s is not mounted' % device) |
2394 | - path = os.path.join(self.devices, device, DATADIR, partition) |
2395 | + path = os.path.join(self.devices, device, |
2396 | + request.headers.get('x-data-dir', DATADIR), partition) |
2397 | if not os.path.exists(path): |
2398 | mkdirs(path) |
2399 | if suffix: |
2400 | |
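Both the manifest-overwrite path in PUT and the DELETE handler above derive the queued Segment-Cleanup's segment count from the manifest's recorded content length and segment size. A worked sketch of that arithmetic (Python 2 integer division, matching the handlers):

    def cleanup_segment_count(content_length, segment_size):
        """Segments to sweep for a manifest of the given recorded length.

        Mirrors (content-length / x-segment-size) + 1 in the handlers
        above, using floor division as Python 2's / does for ints.
        """
        return content_length // segment_size + 1

    GB = 2 ** 30
    # A 5 GB object with the default 2 GB segments spans 3 segments:
    assert cleanup_segment_count(5 * GB, 2 * GB) == 3
    # An exact multiple still yields one extra trailing index; the
    # janitor's DELETE of that nonexistent segment gets 404s from all
    # nodes, which the cleanup loop above counts as success.
    assert cleanup_segment_count(4 * GB, 2 * GB) == 3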
2401 | === modified file 'swift/obj/updater.py' |
2402 | --- swift/obj/updater.py 2010-09-23 16:09:30 +0000 |
2403 | +++ swift/obj/updater.py 2010-11-08 18:51:48 +0000 |
2404 | @@ -27,11 +27,25 @@ |
2405 | from swift.common.ring import Ring |
2406 | from swift.common.utils import get_logger, renamer, write_pickle |
2407 | from swift.common.daemon import Daemon |
2408 | -from swift.obj.server import ASYNCDIR |
2409 | + |
2410 | + |
2411 | +# Old-style async pending directory |
2412 | +ASYNCDIR = 'async_pending' |
2413 | |
2414 | |
2415 | class ObjectUpdater(Daemon): |
2416 | - """Update object information in container listings.""" |
2417 | + """ |
2418 | + Update object information in container listings based on postponed |
2419 | + operations stored in the old-style async pending directory. |
2420 | + |
2421 | + After upgrade, no new operations will be stored in this old-style async |
2422 | + pending directory. Once this daemon empties those directories of all |
2423 | + operations, the daemon may be disabled and the directories removed. |
2424 | + |
2425 | + In a future release of Swift, this daemon will be removed. |
2426 | + |
2427 | + The new functionality is in swift.obj.janitor. |
2428 | + """ |
2429 | |
2430 | def __init__(self, conf): |
2431 | self.conf = conf |
2432 | |
2433 | === modified file 'swift/proxy/server.py' |
2434 | --- swift/proxy/server.py 2010-11-05 14:47:43 +0000 |
2435 | +++ swift/proxy/server.py 2010-11-08 18:51:48 +0000 |
2436 | @@ -14,21 +14,22 @@ |
2437 | # limitations under the License. |
2438 | |
2439 | from __future__ import with_statement |
2440 | +import cPickle as pickle |
2441 | import mimetypes |
2442 | import os |
2443 | import time |
2444 | import traceback |
2445 | from ConfigParser import ConfigParser |
2446 | +from hashlib import md5 |
2447 | from urllib import unquote, quote |
2448 | import uuid |
2449 | import functools |
2450 | |
2451 | from eventlet.timeout import Timeout |
2452 | -from webob.exc import HTTPBadRequest, HTTPMethodNotAllowed, \ |
2453 | - HTTPNotFound, HTTPPreconditionFailed, \ |
2454 | - HTTPRequestTimeout, HTTPServiceUnavailable, \ |
2455 | - HTTPUnprocessableEntity, HTTPRequestEntityTooLarge, HTTPServerError, \ |
2456 | - status_map |
2457 | +from webob.exc import HTTPBadRequest, HTTPCreated, HTTPInternalServerError, \ |
2458 | + HTTPMethodNotAllowed, HTTPNotFound, HTTPPreconditionFailed, \ |
2459 | + HTTPRequestEntityTooLarge, HTTPRequestTimeout, HTTPServerError, \ |
2460 | + HTTPServiceUnavailable, HTTPUnprocessableEntity, status_map |
2461 | from webob import Request, Response |
2462 | |
2463 | from swift.common.ring import Ring |
2464 | @@ -37,7 +38,7 @@ |
2465 | from swift.common.bufferedhttp import http_connect |
2466 | from swift.common.constraints import check_metadata, check_object_creation, \ |
2467 | check_utf8, MAX_ACCOUNT_NAME_LENGTH, MAX_CONTAINER_NAME_LENGTH, \ |
2468 | - MAX_FILE_SIZE |
2469 | + PICKLE_PROTOCOL |
2470 | from swift.common.exceptions import ChunkReadTimeout, \ |
2471 | ChunkWriteTimeout, ConnectionTimeout |
2472 | |
2473 | @@ -89,6 +90,144 @@ |
2474 | return wrapped |
2475 | |
2476 | |
2477 | +class SegmentedIterable(object): |
2478 | + """ |
2479 | + Iterable that returns the object contents for a segmented object in Swift. |
2480 | + |
2481 | + In addition to these params, you can also set the `response` attr just |
2482 | + after creating the SegmentedIterable and it will update the response's |
2483 | + `bytes_transferred` value (used to log the size of the request). |
2484 | + |
2485 | + :param controller: The ObjectController instance to work with. |
2486 | + :param content_length: The total length of the object. |
2487 | + :param segment_size: The length of each segment (except perhaps the last) |
2488 | + of the object. |
2489 | + :param timestamp: The X-Timestamp of the object's segments (set on the PUT, |
2490 | + not changed on the POSTs). |
2491 | + """ |
2492 | + |
2493 | + def __init__(self, controller, content_length, segment_size, timestamp): |
2494 | + self.controller = controller |
2495 | + self.content_length = content_length |
2496 | + self.segment_size = segment_size |
2497 | + self.timestamp = timestamp |
2498 | + self.position = 0 |
2499 | + self.segment = -1 |
2500 | + self.segment_iter = None |
2501 | + self.response = None |
2502 | + |
2503 | + def load_next_segment(self): |
2504 | + """ Loads the self.segment_iter with the next segment's contents. """ |
2505 | + self.segment += 1 |
2506 | + if self.segment: |
2507 | + ring_object_name = '%s/%s/%s' % (self.controller.object_name, |
2508 | + self.timestamp, self.segment) |
2509 | + else: |
2510 | + ring_object_name = self.controller.object_name |
2511 | + partition, nodes = self.controller.app.object_ring.get_nodes( |
2512 | + self.controller.account_name, self.controller.container_name, |
2513 | + ring_object_name) |
2514 | + path = '/%s/%s/%s' % (self.controller.account_name, |
2515 | + self.controller.container_name, self.controller.object_name) |
2516 | + req = Request.blank(path, headers={'X-Object-Segment': self.segment, |
2517 | + 'X-Object-Segment-Timestamp': self.timestamp}) |
2518 | + resp = self.controller.GETorHEAD_base(req, 'Object', |
2519 | + partition, self.controller.iter_nodes(partition, nodes, |
2520 | + self.controller.app.object_ring), path, |
2521 | + self.controller.app.object_ring.replica_count) |
2522 | + if resp.status_int // 100 != 2: |
2523 | + raise Exception( |
2524 | + 'Could not load segment %s of %s' % (self.segment, path)) |
2525 | + self.segment_iter = resp.app_iter |
2526 | + |
2527 | + def __iter__(self): |
2528 | + """ Standard iterator function that returns the object's contents. """ |
2529 | + while self.position < self.content_length: |
2530 | + if not self.segment_iter: |
2531 | + self.load_next_segment() |
2532 | + while True: |
2533 | + with ChunkReadTimeout(self.controller.app.node_timeout): |
2534 | + try: |
2535 | + chunk = self.segment_iter.next() |
2536 | + break |
2537 | + except StopIteration: |
2538 | + self.load_next_segment() |
2539 | + if self.position + len(chunk) > self.content_length: |
2540 | + chunk = chunk[:self.content_length - self.position] |
2541 | + self.position += len(chunk) |
2542 | + if self.response: |
2543 | + self.response.bytes_transferred = \ |
2544 | + getattr(self.response, 'bytes_transferred', 0) + len(chunk) |
2545 | + yield chunk |
2546 | + |
2547 | + def app_iter_range(self, start, stop): |
2548 | + """ |
2549 | + Non-standard iterator function for use with Webob in serving Range |
2550 | + requests more quickly. |
2551 | + |
2552 | + .. note:: |
2553 | + |
2554 | + This currently helps on speed by jumping to the proper segment to |
2555 | + start with (and ending without reading the trailing segments, but |
2556 | + that already happened technically with __iter__). |
2557 | + |
2558 | + But what it does not do yet is issue a Range request with the
2559 | + first segment to allow the object server to seek to the segment |
2560 | + start point. |
2561 | + |
2562 | + Instead, it just reads and throws away all leading segment data. |
2563 | + Since segments are 2G by default, it'll have to transfer the whole |
2564 | + 2G from the object server to the proxy server even if it only needs |
2565 | + the last byte. In practice, this should happen fairly quickly |
2566 | + relative to how long requests take for these very large files; but |
2567 | + it's still wasteful. |
2568 | + |
2569 | +        Anyway, it shouldn't be too hard to implement; I just want to
2570 | +        keep the complexity down for now.
2571 | + |
2572 | + :param start: The first byte (zero-based) to return. None for 0. |
2573 | + :param stop: The last byte (zero-based) to return. None for end. |
2574 | + """ |
2575 | + if start is None: |
2576 | + start = 0 |
2577 | + if start: |
2578 | + self.segment = (start / self.segment_size) - 1 |
2579 | + self.load_next_segment() |
2580 | + self.position = self.segment * self.segment_size |
2581 | + segment_start = start - (self.segment * self.segment_size) |
2582 | + while segment_start: |
2583 | + with ChunkReadTimeout(self.controller.app.node_timeout): |
2584 | + chunk = self.segment_iter.next() |
2585 | + self.position += len(chunk) |
2586 | + if len(chunk) > segment_start: |
2587 | + chunk = chunk[segment_start:] |
2588 | + if self.response: |
2589 | + self.response.bytes_transferred = \ |
2590 | + getattr(self.response, 'bytes_transferred', 0) + \ |
2591 | + len(chunk) |
2592 | + yield chunk |
2593 | + segment_start = 0 |
2594 | + else: |
2595 | + segment_start -= len(chunk) |
2596 | + if stop is not None: |
2597 | + length = stop - start |
2598 | + else: |
2599 | + length = None |
2600 | + for chunk in self: |
2601 | + if length is not None: |
2602 | + length -= len(chunk) |
2603 | + if length < 0: |
2604 | +                    # bytes_transferred had len(chunk) added by __iter__,
2605 | +                    # so subtract the part of the chunk we won't use
2606 | + if self.response: |
2607 | + self.response.bytes_transferred = \ |
2608 | + getattr(self.response, 'bytes_transferred', |
2609 | + length) + length |
2610 | + yield chunk[:length] |
2611 | + break |
2612 | + yield chunk |
2613 | + |
2614 | + |
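
The seek logic in app_iter_range boils down to integer arithmetic on the fixed segment size; a small sketch of the mapping it performs (pure illustrative helper, not branch code):

    # Sketch: map a starting byte offset to the segment that holds it
    # and the bytes to discard within that segment.
    def locate(start, segment_size):
        segment = start // segment_size
        skip = start - segment * segment_size
        return segment, skip

    assert locate(0, 2147483647) == (0, 0)
    assert locate(2147483647, 2147483647) == (1, 0)
    assert locate(5000000000, 2147483647) == (2, 705032706)
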
2615 | def get_container_memcache_key(account, container): |
2616 | path = '/%s/%s' % (account, container) |
2617 | return 'container%s' % path |
2618 | @@ -518,11 +657,56 @@ |
2619 | aresp = req.environ['swift.authorize'](req) |
2620 | if aresp: |
2621 | return aresp |
2622 | +        # This is a bit confusing, so an explanation:
2623 | + # * First we attempt the GET/HEAD normally, as this is the usual case. |
2624 | + # * If the request was a Range request and gave us a 416 Unsatisfiable |
2625 | + # response, we might be trying to do an invalid Range on a manifest |
2626 | + # object, so we try again with no Range. |
2627 | + # * If it turns out we have a manifest object, and we had a Range |
2628 | + # request originally that actually succeeded or we had a HEAD |
2629 | + # request, we have to do the request again as a full GET because |
2630 | + # we'll need the whole manifest. |
2631 | + # * Finally, if we had a manifest object, we pass it and the request |
2632 | + # off to GETorHEAD_segmented; otherwise we just return the response. |
2633 | partition, nodes = self.app.object_ring.get_nodes( |
2634 | self.account_name, self.container_name, self.object_name) |
2635 | - return self.GETorHEAD_base(req, 'Object', partition, |
2636 | + resp = mresp = self.GETorHEAD_base(req, 'Object', partition, |
2637 | + self.iter_nodes(partition, nodes, self.app.object_ring), |
2638 | + req.path_info, self.app.object_ring.replica_count) |
2639 | + range_value = None |
2640 | + if mresp.status_int == 416: |
2641 | + range_value = req.range |
2642 | + req.range = None |
2643 | + mresp = self.GETorHEAD_base(req, 'Object', partition, |
2644 | self.iter_nodes(partition, nodes, self.app.object_ring), |
2645 | req.path_info, self.app.object_ring.replica_count) |
2646 | + if mresp.status_int // 100 != 2: |
2647 | + return resp |
2648 | + if 'x-object-type' in mresp.headers: |
2649 | + if mresp.headers['x-object-type'] == 'manifest': |
2650 | + if req.method == 'HEAD': |
2651 | + req.method = 'GET' |
2652 | + mresp = self.GETorHEAD_base(req, 'Object', partition, |
2653 | + self.iter_nodes(partition, nodes, |
2654 | + self.app.object_ring), req.path_info, |
2655 | + self.app.object_ring.replica_count) |
2656 | + if mresp.status_int // 100 != 2: |
2657 | + return mresp |
2658 | + req.method = 'HEAD' |
2659 | + elif req.range: |
2660 | + range_value = req.range |
2661 | + req.range = None |
2662 | + mresp = self.GETorHEAD_base(req, 'Object', partition, |
2663 | + self.iter_nodes(partition, nodes, |
2664 | + self.app.object_ring), req.path_info, |
2665 | + self.app.object_ring.replica_count) |
2666 | + if mresp.status_int // 100 != 2: |
2667 | + return mresp |
2668 | + if range_value: |
2669 | + req.range = range_value |
2670 | + return self.GETorHEAD_segmented(req, mresp) |
2671 | + return HTTPNotFound(request=req) |
2672 | + return resp |
2673 | |
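
Condensed, the retry policy that comment describes looks like the sketch below; fetch, not_found, and segmented are hypothetical stand-ins for GETorHEAD_base, HTTPNotFound, and GETorHEAD_segmented, and the Range save/restore is elided:

    # Sketch of the GETorHEAD manifest retry policy (stand-in helpers).
    def get_or_head(fetch, not_found, segmented, req):
        resp = mresp = fetch(req)                # usual case: one request
        if mresp.status_int == 416:              # Range may be invalid only
            mresp = fetch(req, use_range=False)  # against the manifest size
        if mresp.status_int // 100 != 2:
            return resp
        if 'x-object-type' not in mresp.headers:
            return resp                          # ordinary whole object
        if mresp.headers['x-object-type'] != 'manifest':
            return not_found(req)                # e.g. a bare segment
        if req.method == 'HEAD' or req.range:    # need the full manifest,
            mresp = fetch(req, method='GET',     # so redo as a plain GET
                          use_range=False)
        return segmented(req, mresp)
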
2674 | @public |
2675 | @delay_denial |
2676 | @@ -536,6 +720,32 @@ |
2677 | """Handler for HTTP HEAD requests.""" |
2678 | return self.GETorHEAD(req) |
2679 | |
2680 | + def GETorHEAD_segmented(self, req, mresp): |
2681 | + """ |
2682 | +        Performs a GET or HEAD for a segmented object.
2683 | + |
2684 | + :param req: The webob.Request to process. |
2685 | + :param mresp: The webob.Response for the original manifest request. |
2686 | + :returns: webob.Response object. |
2687 | + """ |
2688 | + manifest = pickle.loads(''.join(mresp.app_iter)) |
2689 | + content_length = int(manifest['content-length']) |
2690 | + segment_size = int(manifest['x-segment-size']) |
2691 | + headers = dict(mresp.headers) |
2692 | + headers.update(manifest) |
2693 | + del headers['x-segment-size'] |
2694 | + resp = Response(app_iter=SegmentedIterable(self, content_length, |
2695 | + segment_size, manifest['x-timestamp']), headers=headers, |
2696 | + request=req, conditional_response=True) |
2697 | + resp.headers['etag'] = manifest['etag'].strip('"') |
2698 | + resp.last_modified = mresp.last_modified |
2699 | + resp.content_length = int(manifest['content-length']) |
2700 | + resp.content_type = manifest['content-type'] |
2701 | + if 'content-encoding' in manifest: |
2702 | + resp.content_encoding = manifest['content-encoding'] |
2703 | + resp.app_iter.response = req.get_response(resp) |
2704 | + return resp.app_iter.response |
2705 | + |
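
The manifest GETorHEAD_segmented consumes is nothing more than a pickled dict; a round-trip sketch of the fields involved (values illustrative; protocol 2 mirrors PICKLE_PROTOCOL):

    import cPickle as pickle

    # Sketch: the manifest body is a pickled dict of these fields.
    manifest = {'x-timestamp': '0000000001.00000',
                'content-length': 5000000000,      # whole-object length
                'content-type': 'application/octet-stream',
                'x-segment-size': 2147483647,      # cluster segment size
                'etag': 'd41d8cd98f00b204e9800998ecf8427e'}
    body = pickle.dumps(manifest, protocol=2)

    loaded = pickle.loads(body)
    segment_size = int(loaded['x-segment-size'])   # drives SegmentedIterable
    content_length = int(loaded['content-length'])
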
2706 | @public |
2707 | @delay_denial |
2708 | def POST(self, req): |
2709 | @@ -609,7 +819,8 @@ |
2710 | req.headers['Content-Type'] = 'application/octet-stream' |
2711 | else: |
2712 | req.headers['Content-Type'] = guessed_type |
2713 | - error_response = check_object_creation(req, self.object_name) |
2714 | + error_response = check_object_creation(req, self.object_name, |
2715 | + self.app.max_object_size) |
2716 | if error_response: |
2717 | return error_response |
2718 | conns = [] |
2719 | @@ -654,11 +865,50 @@ |
2720 | if k.lower().startswith('x-object-meta-'): |
2721 | new_req.headers[k] = v |
2722 | req = new_req |
2723 | + if req.headers.get('transfer-encoding') == 'chunked' or \ |
2724 | + req.content_length > self.app.segment_size: |
2725 | + resp = self.PUT_segmented_object(req, data_source, partition, |
2726 | + nodes, container_partition, containers) |
2727 | + else: |
2728 | + resp = self.PUT_whole_object(req, data_source, partition, nodes, |
2729 | + container_partition, containers) |
2730 | + if source_header: |
2731 | + resp.headers['X-Copied-From'] = quote( |
2732 | + source_header.split('/', 2)[2]) |
2733 | + for k, v in req.headers.items(): |
2734 | + if k.lower().startswith('x-object-meta-'): |
2735 | + resp.headers[k] = v |
2736 | + # reset the bytes, since the user didn't actually send anything |
2737 | + req.bytes_transferred = 0 |
2738 | + resp.last_modified = float(req.headers['X-Timestamp']) |
2739 | + return resp |
2740 | + |
2741 | + def PUT_whole_object(self, req, data_source, partition, nodes, |
2742 | + container_partition=None, containers=None): |
2743 | + """ |
2744 | + Performs a PUT for a whole object (one with a content-length <= |
2745 | + self.app.segment_size). |
2746 | + |
2747 | + :param req: The webob.Request to process. |
2748 | + :param data_source: An iterator providing the data to store. |
2749 | + :param partition: The object ring partition the object falls on. |
2750 | + :param nodes: The object ring nodes the object falls on. |
2751 | + :param container_partition: The container ring partition the container |
2752 | + for the object falls on, None if the |
2753 | + container is not to be updated. |
2754 | + :param containers: The container ring nodes the container for the |
2755 | + object falls on, None if the container is not to be |
2756 | + updated. |
2757 | + :returns: webob.Response object. |
2758 | + """ |
2759 | + conns = [] |
2760 | + update_containers = containers is not None |
2761 | for node in self.iter_nodes(partition, nodes, self.app.object_ring): |
2762 | - container = containers.pop() |
2763 | - req.headers['X-Container-Host'] = '%(ip)s:%(port)s' % container |
2764 | - req.headers['X-Container-Partition'] = container_partition |
2765 | - req.headers['X-Container-Device'] = container['device'] |
2766 | + if update_containers: |
2767 | + container = containers.pop() |
2768 | + req.headers['X-Container-Host'] = '%(ip)s:%(port)s' % container |
2769 | + req.headers['X-Container-Partition'] = container_partition |
2770 | + req.headers['X-Container-Device'] = container['device'] |
2771 | req.headers['Expect'] = '100-continue' |
2772 | resp = conn = None |
2773 | if not self.error_limited(node): |
2774 | @@ -676,12 +926,14 @@ |
2775 | if conn and resp: |
2776 | if resp.status == 100: |
2777 | conns.append(conn) |
2778 | - if not containers: |
2779 | + if (update_containers and not containers) or \ |
2780 | + len(conns) == len(nodes): |
2781 | break |
2782 | continue |
2783 | elif resp.status == 507: |
2784 | self.error_limit(node) |
2785 | - containers.insert(0, container) |
2786 | + if update_containers: |
2787 | + containers.insert(0, container) |
2788 | if len(conns) <= len(nodes) / 2: |
2789 | self.app.logger.error( |
2790 | 'Object PUT returning 503, %s/%s required connections, ' |
2791 | @@ -701,7 +953,7 @@ |
2792 | break |
2793 | len_chunk = len(chunk) |
2794 | req.bytes_transferred += len_chunk |
2795 | - if req.bytes_transferred > MAX_FILE_SIZE: |
2796 | + if req.bytes_transferred > self.app.max_object_size: |
2797 | return HTTPRequestEntityTooLarge(request=req) |
2798 | for conn in list(conns): |
2799 | try: |
2800 | @@ -767,18 +1019,129 @@ |
2801 | statuses.append(503) |
2802 | reasons.append('') |
2803 | bodies.append('') |
2804 | - resp = self.best_response(req, statuses, reasons, bodies, 'Object PUT', |
2805 | + return self.best_response(req, statuses, reasons, bodies, 'Object PUT', |
2806 | etag=etag) |
2807 | - if source_header: |
2808 | - resp.headers['X-Copied-From'] = quote( |
2809 | - source_header.split('/', 2)[2]) |
2810 | - for k, v in req.headers.items(): |
2811 | - if k.lower().startswith('x-object-meta-'): |
2812 | - resp.headers[k] = v |
2813 | - # reset the bytes, since the user didn't actually send anything |
2814 | - req.bytes_transferred = 0 |
2815 | - resp.last_modified = float(req.headers['X-Timestamp']) |
2816 | - return resp |
2817 | + |
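
The connection bookkeeping above enforces a strict majority of object servers before streaming data; in sketch form:

    # Sketch: a PUT keeps going only with a strict majority of replicas.
    def have_quorum(connected, replica_count):
        return connected > replica_count // 2

    assert have_quorum(2, 3)      # two of three object servers suffice
    assert not have_quorum(1, 3)  # one of three: the proxy returns 503
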
2818 | + def PUT_segmented_object(self, req, data_source, partition, nodes, |
2819 | + container_partition, containers): |
2820 | + """ |
2821 | + Performs a PUT for a segmented object (one with a content-length > |
2822 | + self.app.segment_size). |
2823 | + |
2824 | + :param req: The webob.Request to process. |
2825 | + :param data_source: An iterator providing the data to store. |
2826 | + :param partition: The object ring partition the object falls on. |
2827 | + :param nodes: The object ring nodes the object falls on. |
2828 | + :param container_partition: The container ring partition the container |
2829 | + for the object falls on. |
2830 | + :param containers: The container ring nodes the container for the |
2831 | + object falls on. |
2832 | + :returns: webob.Response object. |
2833 | + """ |
2834 | + req.bytes_transferred = 0 |
2835 | + leftover_chunk = [None] |
2836 | + etag = md5() |
2837 | + |
2838 | + def segment_iter(): |
2839 | + amount_given = 0 |
2840 | + while amount_given < self.app.segment_size: |
2841 | + if leftover_chunk[0]: |
2842 | + chunk = leftover_chunk[0] |
2843 | + leftover_chunk[0] = None |
2844 | + else: |
2845 | + with ChunkReadTimeout(self.app.client_timeout): |
2846 | + chunk = data_source.next() |
2847 | + req.bytes_transferred += len(chunk) |
2848 | + etag.update(chunk) |
2849 | + if amount_given + len(chunk) > self.app.segment_size: |
2850 | + yield chunk[:self.app.segment_size - amount_given] |
2851 | + leftover_chunk[0] = \ |
2852 | + chunk[self.app.segment_size - amount_given:] |
2853 | + amount_given = self.app.segment_size |
2854 | + else: |
2855 | + yield chunk |
2856 | + amount_given += len(chunk) |
2857 | + |
2858 | + def segment_iter_iter(): |
2859 | + while True: |
2860 | + if not leftover_chunk[0]: |
2861 | + with ChunkReadTimeout(self.app.client_timeout): |
2862 | + leftover_chunk[0] = data_source.next() |
2863 | + req.bytes_transferred += len(leftover_chunk[0]) |
2864 | + etag.update(leftover_chunk[0]) |
2865 | + yield segment_iter() |
2866 | + |
2867 | + segment_number = 0 |
2868 | + chunked = req.headers.get('transfer-encoding') == 'chunked' |
2869 | + if not chunked: |
2870 | + amount_left = req.content_length |
2871 | + headers = {'X-Timestamp': req.headers['X-Timestamp'], |
2872 | + 'Content-Type': req.headers['content-type'], |
2873 | + 'X-Object-Type': 'segment'} |
2874 | + for segment_source in segment_iter_iter(): |
2875 | + if chunked: |
2876 | + headers['Transfer-Encoding'] = 'chunked' |
2877 | + if segment_number == 0: |
2878 | + headers['X-Object-Segment-If-Length'] = \ |
2879 | + self.app.segment_size |
2880 | + elif amount_left > self.app.segment_size: |
2881 | + headers['Content-Length'] = self.app.segment_size |
2882 | + else: |
2883 | + headers['Content-Length'] = amount_left |
2884 | + headers['X-Object-Segment'] = segment_number |
2885 | + segment_req = Request.blank(req.path_info, |
2886 | + environ={'REQUEST_METHOD': 'PUT'}, headers=headers) |
2887 | + if 'X-Object-Segment-If-Length' in headers: |
2888 | + del headers['X-Object-Segment-If-Length'] |
2889 | + if segment_number: |
2890 | + ring_object_name = '%s/%s/%s' % (self.object_name, |
2891 | + req.headers['x-timestamp'], segment_number) |
2892 | + else: |
2893 | + ring_object_name = self.object_name |
2894 | + segment_partition, segment_nodes = self.app.object_ring.get_nodes( |
2895 | + self.account_name, self.container_name, ring_object_name) |
2896 | + resp = self.PUT_whole_object(segment_req, segment_source, |
2897 | + segment_partition, segment_nodes) |
2898 | + if resp.status_int // 100 == 4: |
2899 | + return resp |
2900 | + elif resp.status_int // 100 != 2: |
2901 | + return HTTPServiceUnavailable(request=req, |
2902 | + body='Unable to complete very large file operation.') |
2903 | + if segment_number == 0 and \ |
2904 | + req.bytes_transferred < self.app.segment_size: |
2905 | + return HTTPCreated(request=req, etag=etag.hexdigest()) |
2906 | + if not chunked: |
2907 | + amount_left -= self.app.segment_size |
2908 | + segment_number += 1 |
2909 | + etag = etag.hexdigest() |
2910 | + if 'etag' in req.headers and req.headers['etag'].lower() != etag: |
2911 | + return HTTPUnprocessableEntity(request=req) |
2912 | + manifest = {'x-timestamp': req.headers['x-timestamp'], |
2913 | + 'content-length': req.bytes_transferred, |
2914 | + 'content-type': req.headers['content-type'], |
2915 | + 'x-segment-size': self.app.segment_size, |
2916 | + 'etag': etag} |
2917 | + if 'content-encoding' in req.headers: |
2918 | + manifest['content-encoding'] = req.headers['content-encoding'] |
2919 | + manifest = pickle.dumps(manifest, protocol=PICKLE_PROTOCOL) |
2920 | + headers = {'X-Timestamp': req.headers['X-Timestamp'], |
2921 | + 'Content-Type': req.headers['content-type'], |
2922 | + 'Content-Length': len(manifest), |
2923 | + 'X-Object-Type': 'manifest', |
2924 | + 'X-Object-Length': req.bytes_transferred} |
2925 | + headers.update(i for i in req.headers.iteritems() |
2926 | + if i[0].lower().startswith('x-object-meta-') and len(i[0]) > 14) |
2927 | + manifest_req = Request.blank(req.path_info, |
2928 | + environ={'REQUEST_METHOD': 'PUT'}, body=manifest, headers=headers) |
2929 | + manifest_source = iter(lambda: |
2930 | + manifest_req.body_file.read(self.app.client_chunk_size), '') |
2931 | + resp = self.PUT_whole_object(manifest_req, manifest_source, partition, |
2932 | + nodes, container_partition=container_partition, |
2933 | + containers=containers) |
2934 | + if resp.status_int // 100 != 2: |
2935 | + return HTTPServiceUnavailable(request=req, |
2936 | + body='Unable to complete very large file operation.') |
2937 | + return HTTPCreated(request=req, etag=etag) |
2938 | |
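
The segment_iter/segment_iter_iter pair in PUT_segmented_object splits one client stream into consecutive fixed-size segment streams; the same buffering idea as a standalone sketch (illustrative helper, not the branch's code):

    # Sketch: carve a chunk iterator into fixed-size segment iterators,
    # carrying any partial chunk over to the next segment. Each segment
    # must be fully consumed before advancing to the next.
    def split_segments(chunks, segment_size):
        chunks = iter(chunks)
        leftover = ['']

        def one_segment():
            given = 0
            while given < segment_size:
                if leftover[0]:
                    piece, leftover[0] = leftover[0], ''
                else:
                    try:
                        piece = next(chunks)
                    except StopIteration:
                        return        # source exhausted: short segment
                if given + len(piece) > segment_size:
                    keep = segment_size - given
                    leftover[0] = piece[keep:]
                    piece = piece[:keep]
                given += len(piece)
                yield piece

        while True:
            if not leftover[0]:
                try:
                    leftover[0] = next(chunks)
                except StopIteration:
                    return            # no more data, no more segments
            yield one_segment()

    assert [''.join(s) for s in split_segments(['abcd', 'ef', 'ghij'], 4)] \
        == ['abcd', 'efgh', 'ij']
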
2939 | @public |
2940 | @delay_denial |
2941 | @@ -1233,6 +1596,8 @@ |
2942 | if conf is None: |
2943 | conf = {} |
2944 | swift_dir = conf.get('swift_dir', '/etc/swift') |
2945 | + self.max_object_size = int(conf.get('max_object_size', 107374182400)) |
2946 | + self.segment_size = int(conf.get('segment_size', 2147483647)) |
2947 | self.node_timeout = int(conf.get('node_timeout', 10)) |
2948 | self.conn_timeout = float(conf.get('conn_timeout', 0.5)) |
2949 | self.client_timeout = int(conf.get('client_timeout', 60)) |
2950 | |
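
With these defaults, a maximum-size object splits into 51 segments: 50 full segments of 2 GiB - 1 bytes each, plus a 50-byte remainder. The arithmetic:

    # Sketch: segment count for an object under the default limits.
    max_object_size = 107374182400          # 100 GiB
    segment_size = 2147483647               # 2 GiB - 1 byte

    segments = (max_object_size + segment_size - 1) // segment_size  # ceil
    assert segments == 51                   # 50 full segments + 50 bytes
    assert max_object_size - 50 * segment_size == 50
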
2951 | === modified file 'test/functional/sample.conf' |
2952 | --- test/functional/sample.conf 2010-09-09 17:24:25 +0000 |
2953 | +++ test/functional/sample.conf 2010-11-08 18:51:48 +0000 |
2954 | @@ -1,8 +1,15 @@ |
2955 | -# sample config |
2956 | auth_host = 127.0.0.1 |
2957 | auth_port = 11000 |
2958 | auth_ssl = no |
2959 | |
2960 | +# The maximum object size the cluster allows (set in the proxy server's conf) |
2961 | +max_object_size = 107374182400 |
2962 | + |
2963 | +# The file segment size for the cluster (set in the proxy server's conf) |
2964 | +# Set to 0 for no segment size testing (recommended if the segment size is |
2965 | +# quite large and you don't want to spend the time testing it) |
2966 | +segment_size = 0 |
2967 | + |
2968 | # Primary functional test account (needs admin access to the account) |
2969 | account = test |
2970 | username = tester |
2971 | |
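
A functional test honoring this convention would skip itself when segment_size is 0; a minimal sketch (the config accessor shown is an assumption, not the suite's real plumbing):

    from nose import SkipTest

    def setup_segment_tests(config):
        # Sketch: honor the sample.conf convention above.
        segment_size = int(config.get('segment_size', 0))
        if not segment_size:
            raise SkipTest('segment_size testing disabled in config')
        return segment_size
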
2972 | === modified file 'test/functional/tests.py' |
2973 | --- test/functional/tests.py 2010-10-29 20:30:34 +0000 |
2974 | +++ test/functional/tests.py 2010-11-08 18:51:48 +0000 |
2975 | @@ -1092,7 +1092,7 @@ |
2976 | self.assert_(file.read(hdrs={'Range': r}) == data[0:1000]) |
2977 | |
2978 | def testFileSizeLimit(self): |
2979 | - limit = 5*2**30 + 2 |
2980 | + limit = int(config.get('max_object_size', 107374182400)) |
2981 | tsecs = 3 |
2982 | |
2983 | for i in (limit-100, limit-10, limit-1, limit, limit+1, limit+10, |
2984 | |
2985 | === modified file 'test/probe/common.py' |
2986 | --- test/probe/common.py 2010-09-12 00:03:09 +0000 |
2987 | +++ test/probe/common.py 2010-11-08 18:51:48 +0000 |
2988 | @@ -88,7 +88,7 @@ |
2989 | for p in ps: |
2990 | p.wait() |
2991 | ps = [] |
2992 | - for job in ('container-updater', 'object-updater'): |
2993 | + for job in ('container-updater', 'object-janitor'): |
2994 | for n in xrange(1, 5): |
2995 | ps.append(Popen(['swift-%s' % job, |
2996 | '/etc/swift/%s-server/%d.conf' % |
2997 | |
2998 | === modified file 'test/probe/test_object_async_update.py' |
2999 | --- test/probe/test_object_async_update.py 2010-09-06 04:06:16 +0000 |
3000 | +++ test/probe/test_object_async_update.py 2010-11-08 18:51:48 +0000 |
3001 | @@ -55,7 +55,7 @@ |
3002 | self.account, container)[1]) |
3003 | ps = [] |
3004 | for n in xrange(1, 5): |
3005 | - ps.append(Popen(['swift-object-updater', |
3006 | + ps.append(Popen(['swift-object-janitor', |
3007 | '/etc/swift/object-server/%d.conf' % n, 'once'])) |
3008 | for p in ps: |
3009 | p.wait() |
3010 | |
3011 | === modified file 'test/unit/common/test_constraints.py' |
3012 | --- test/unit/common/test_constraints.py 2010-08-16 22:30:27 +0000 |
3013 | +++ test/unit/common/test_constraints.py 2010-11-08 18:51:48 +0000 |
3014 | @@ -90,22 +90,20 @@ |
3015 | headers=headers), 'object'), HTTPBadRequest)) |
3016 | |
3017 | def test_check_object_creation_content_length(self): |
3018 | - headers = {'Content-Length': str(constraints.MAX_FILE_SIZE), |
3019 | - 'Content-Type': 'text/plain'} |
3020 | + headers = {'Content-Length': '1024', 'Content-Type': 'text/plain'} |
3021 | self.assertEquals(constraints.check_object_creation(Request.blank('/', |
3022 | - headers=headers), 'object_name'), None) |
3023 | - headers = {'Content-Length': str(constraints.MAX_FILE_SIZE + 1), |
3024 | - 'Content-Type': 'text/plain'} |
3025 | + headers=headers), 'object_name', 1024), None) |
3026 | + headers = {'Content-Length': '1025', 'Content-Type': 'text/plain'} |
3027 | self.assert_(isinstance(constraints.check_object_creation( |
3028 | - Request.blank('/', headers=headers), 'object_name'), |
3029 | + Request.blank('/', headers=headers), 'object_name', 1024), |
3030 | HTTPRequestEntityTooLarge)) |
3031 | headers = {'Transfer-Encoding': 'chunked', |
3032 | 'Content-Type': 'text/plain'} |
3033 | self.assertEquals(constraints.check_object_creation(Request.blank('/', |
3034 | - headers=headers), 'object_name'), None) |
3035 | + headers=headers), 'object_name', 1024), None) |
3036 | headers = {'Content-Type': 'text/plain'} |
3037 | self.assert_(isinstance(constraints.check_object_creation( |
3038 | - Request.blank('/', headers=headers), 'object_name'), |
3039 | + Request.blank('/', headers=headers), 'object_name', 1024), |
3040 | HTTPLengthRequired)) |
3041 | |
3042 | def test_check_object_creation_name_length(self): |
3043 | |
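
These tests pin down the new three-argument contract for check_object_creation, which now takes the cluster's maximum object size instead of reading a module constant; in sketch form:

    from webob import Request
    from swift.common.constraints import check_object_creation

    # Sketch of the contract the tests above exercise.
    req = Request.blank('/', headers={'Content-Length': '1025',
                                      'Content-Type': 'text/plain'})
    resp = check_object_creation(req, 'object_name', 1024)
    # resp is an HTTPRequestEntityTooLarge here; it is None when the
    # request fits, and HTTPLengthRequired when no length is given and
    # the transfer is not chunked.
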
3044 | === added file 'test/unit/obj/test_diskfile.py' |
3045 | --- test/unit/obj/test_diskfile.py 1970-01-01 00:00:00 +0000 |
3046 | +++ test/unit/obj/test_diskfile.py 2010-11-08 18:51:48 +0000 |
3047 | @@ -0,0 +1,203 @@ |
3048 | +# Copyright (c) 2010 OpenStack, LLC. |
3049 | +# |
3050 | +# Licensed under the Apache License, Version 2.0 (the "License"); |
3051 | +# you may not use this file except in compliance with the License. |
3052 | +# You may obtain a copy of the License at |
3053 | +# |
3054 | +# http://www.apache.org/licenses/LICENSE-2.0 |
3055 | +# |
3056 | +# Unless required by applicable law or agreed to in writing, software |
3057 | +# distributed under the License is distributed on an "AS IS" BASIS, |
3058 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
3059 | +# implied. |
3060 | +# See the License for the specific language governing permissions and |
3061 | +# limitations under the License. |
3062 | + |
3063 | +import cPickle as pickle |
3064 | +import os |
3065 | +import sys |
3066 | +import unittest |
3067 | +from nose import SkipTest |
3068 | +from shutil import rmtree |
3069 | +from StringIO import StringIO |
3070 | +from time import time |
3071 | + |
3072 | +from xattr import setxattr |
3073 | + |
3074 | +from swift.common.constraints import PICKLE_PROTOCOL |
3075 | +from swift.common.utils import mkdirs, normalize_timestamp |
3076 | +from swift.obj.diskfile import DiskFile, hash_suffix, JANITORDIR, \ |
3077 | + METADATA_KEY |
3078 | + |
3079 | + |
3080 | +class TestDiskFile(unittest.TestCase): |
3081 | + """ Test swift.obj.diskfile """ |
3082 | + |
3083 | + def setUp(self): |
3084 | + """ Set up for testing swift.obj.diskfile """ |
3085 | + self.path_to_test_xfs = os.environ.get('PATH_TO_TEST_XFS') |
3086 | + if not self.path_to_test_xfs or \ |
3087 | + not os.path.exists(self.path_to_test_xfs): |
3088 | + print >> sys.stderr, 'WARNING: PATH_TO_TEST_XFS not set or not ' \ |
3089 | + 'pointing to a valid directory.\n' \ |
3090 | + 'Please set PATH_TO_TEST_XFS to a directory on an XFS file ' \ |
3091 | + 'system for testing.' |
3092 | + self.testdir = '/tmp/SWIFTUNITTEST' |
3093 | + else: |
3094 | + self.testdir = os.path.join(self.path_to_test_xfs, |
3095 | + 'tmp_test_obj_diskfile') |
3096 | + mkdirs(self.testdir) |
3097 | + rmtree(self.testdir) |
3098 | + mkdirs(os.path.join(self.testdir, 'sda1')) |
3099 | + mkdirs(os.path.join(self.testdir, 'sda1', 'tmp')) |
3100 | + |
3101 | + def tearDown(self): |
3102 | + """ Tear down for testing swift.obj.diskfile """ |
3103 | + rmtree(self.testdir) |
3104 | + |
3105 | + def test_disk_file_app_iter_corners(self): |
3106 | + if not self.path_to_test_xfs: |
3107 | + raise SkipTest |
3108 | + df = DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o') |
3109 | + mkdirs(df.datadir) |
3110 | + f = open(os.path.join(df.datadir, |
3111 | + normalize_timestamp(time()) + '.data'), 'wb') |
3112 | + f.write('1234567890') |
3113 | + setxattr(f.fileno(), METADATA_KEY, pickle.dumps({}, PICKLE_PROTOCOL)) |
3114 | + f.close() |
3115 | + df = DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o', |
3116 | + keep_data_fp=True) |
3117 | + it = df.app_iter_range(0, None) |
3118 | + sio = StringIO() |
3119 | + for chunk in it: |
3120 | + sio.write(chunk) |
3121 | + self.assertEquals(sio.getvalue(), '1234567890') |
3122 | + |
3123 | + df = DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o', |
3124 | + keep_data_fp=True) |
3125 | + it = df.app_iter_range(5, None) |
3126 | + sio = StringIO() |
3127 | + for chunk in it: |
3128 | + sio.write(chunk) |
3129 | + self.assertEquals(sio.getvalue(), '67890') |
3130 | + |
3131 | + def test_disk_file_mkstemp_creates_dir(self): |
3132 | + tmpdir = os.path.join(self.testdir, 'sda1', 'tmp') |
3133 | + os.rmdir(tmpdir) |
3134 | + with DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o').mkstemp(): |
3135 | + self.assert_(os.path.exists(tmpdir)) |
3136 | + |
3137 | + def test_hash_suffix_creates_janitor_jobs(self): |
3138 | + if not self.path_to_test_xfs: |
3139 | + raise SkipTest |
3140 | +        # Ensure the janitor job we expect to be created is not there yet.
3141 | + df = DiskFile(self.testdir, 'sda1', '0', 'Segment-Cleanup', |
3142 | + normalize_timestamp(1), 'a/c/o', datadir=JANITORDIR) |
3143 | + self.assert_(df.is_deleted()) |
3144 | + # Set up manifest file to be tombstoned |
3145 | + df = DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o') |
3146 | + with df.mkstemp() as (fd, tmppath): |
3147 | + os.write(fd, pickle.dumps({'x-timestamp': normalize_timestamp(1), |
3148 | + 'content-length': 1234, 'content-type': 'text/plain', |
3149 | + 'x-segment-size': 123, |
3150 | + 'etag': 'd41d8cd98f00b204e9800998ecf8427e'}, |
3151 | + protocol=PICKLE_PROTOCOL)) |
3152 | + df.put(fd, tmppath, {'X-Timestamp': normalize_timestamp(2), |
3153 | + 'X-Object-Type': 'manifest'}) |
3154 | +        # Make a tombstone DiskFile didn't create, e.g. one rsynced over.
3155 | + open(os.path.join(df.datadir, normalize_timestamp(3) + '.ts'), 'wb') |
3156 | + # Finally, we call what we want to test. |
3157 | + hash_suffix(os.path.dirname(df.datadir), 604800) |
3158 | + # Ensure the janitor job got created. |
3159 | + df = DiskFile(self.testdir, 'sda1', '0', 'Segment-Cleanup', |
3160 | + normalize_timestamp(1), 'a/c/o', datadir=JANITORDIR) |
3161 | + self.assert_(not df.is_deleted()) |
3162 | + |
3163 | + def test_segment_info_overrides_datadir(self): |
3164 | + if not self.path_to_test_xfs: |
3165 | + raise SkipTest |
3166 | + df = DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o') |
3167 | + datadir1 = df.datadir.split('/')[6] |
3168 | + df = DiskFile(self.testdir, 'sda1', '1', 'a', 'c', 'o') |
3169 | + datadir2 = df.datadir.split('/')[6] |
3170 | + df = DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o', segment=0) |
3171 | + datadir3 = df.datadir.split('/')[6] |
3172 | + self.assertEquals(datadir1, datadir2) |
3173 | + self.assertNotEquals(datadir1, datadir3) |
3174 | + |
3175 | + def test_no_longer_segment(self): |
3176 | + if not self.path_to_test_xfs: |
3177 | + raise SkipTest |
3178 | + # Normal case |
3179 | + df = DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o') |
3180 | + datadir1 = df.datadir |
3181 | + with df.mkstemp() as (fd, tmppath): |
3182 | + df.put(fd, tmppath, {'X-Timestamp': normalize_timestamp(1)}) |
3183 | + self.assert_(os.path.exists(datadir1)) |
3184 | + # Normal case with no_longer_segment (doesn't make sense, but shouldn't |
3185 | + # blow up) |
3186 | + df = DiskFile(self.testdir, 'sda1', '1', 'a', 'c', 'o') |
3187 | + datadir1 = df.datadir |
3188 | + with df.mkstemp() as (fd, tmppath): |
3189 | + df.put(fd, tmppath, {'X-Timestamp': normalize_timestamp(1)}, |
3190 | + no_longer_segment=True) |
3191 | + self.assert_(os.path.exists(datadir1)) |
3192 | + # Segment case |
3193 | + df = DiskFile(self.testdir, 'sda1', '2', 'a', 'c', 'o', segment=0) |
3194 | + datadir1 = df.datadir |
3195 | + with df.mkstemp() as (fd, tmppath): |
3196 | + df.put(fd, tmppath, {'X-Timestamp': normalize_timestamp(1)}) |
3197 | + self.assert_(os.path.exists(datadir1)) |
3198 | + # Segment case with no_longer_segment |
3199 | + df = DiskFile(self.testdir, 'sda1', '3', 'a', 'c', 'o') |
3200 | + normal_datadir1 = df.datadir |
3201 | + self.assert_(not os.path.exists(normal_datadir1)) |
3202 | + df = DiskFile(self.testdir, 'sda1', '3', 'a', 'c', 'o', segment=0) |
3203 | + datadir1 = df.datadir |
3204 | + with df.mkstemp() as (fd, tmppath): |
3205 | + df.put(fd, tmppath, {'X-Timestamp': normalize_timestamp(1)}, |
3206 | + no_longer_segment=True) |
3207 | + self.assert_(not os.path.exists(datadir1)) |
3208 | + self.assert_(os.path.exists(normal_datadir1)) |
3209 | + |
3210 | + def test_tombstone(self): |
3211 | + if not self.path_to_test_xfs: |
3212 | + raise SkipTest |
3213 | + df = DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o') |
3214 | + with df.mkstemp() as (fd, tmppath): |
3215 | + df.put(fd, tmppath, {'X-Timestamp': normalize_timestamp(1)}) |
3216 | + df = DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o') |
3217 | + self.assert_(not df.is_deleted()) |
3218 | + df = DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o') |
3219 | + df.tombstone(normalize_timestamp(2)) |
3220 | + df = DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o') |
3221 | + self.assert_(df.is_deleted()) |
3222 | + |
3223 | + def test_store_janitor_container_update(self): |
3224 | + if not self.path_to_test_xfs: |
3225 | + raise SkipTest |
3226 | + df = DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o', |
3227 | + datadir=JANITORDIR) |
3228 | + df.store_janitor_container_update('PUT', 'a', 'c', 'o', |
3229 | + {'X-Timestamp': normalize_timestamp(1)}, []) |
3230 | + df = DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o', |
3231 | + datadir=JANITORDIR) |
3232 | + self.assertEquals(pickle.load(open(df.data_file, 'rb')), |
3233 | + {'op': 'PUT', 'account': 'a', 'container': 'c', 'obj': 'o', |
3234 | + 'headers': {'X-Timestamp': '0000000001.00000'}, 'successes': []}) |
3235 | + |
3236 | + def test_store_janitor_segment_cleanup(self): |
3237 | + if not self.path_to_test_xfs: |
3238 | + raise SkipTest |
3239 | + df = DiskFile(self.testdir, 'sda1', '0', 'Segment-Cleanup', |
3240 | + normalize_timestamp(1), 'a/c/o', datadir=JANITORDIR) |
3241 | + df.store_janitor_segment_cleanup('a', 'c', 'o', 123, 45) |
3242 | + df = DiskFile(self.testdir, 'sda1', '0', 'Segment-Cleanup', |
3243 | + normalize_timestamp(1), 'a/c/o', datadir=JANITORDIR) |
3244 | + self.assertEquals(pickle.load(open(df.data_file, 'rb')), |
3245 | + {'account': 'a', 'container': 'c', 'obj': 'o', |
3246 | + 'segment_count': 123, 'segment_last_deleted': 45}) |
3247 | + |
3248 | + |
3249 | +if __name__ == '__main__': |
3250 | + unittest.main() |
3251 | |
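
As the last two tests above assert, the two kinds of janitor jobs are stored as plain pickled dicts; side by side (values illustrative):

    # Sketch: the on-disk payloads the two store_janitor_* helpers
    # write, as pinned down by the tests above.
    container_update_job = {
        'op': 'PUT', 'account': 'a', 'container': 'c', 'obj': 'o',
        'headers': {'X-Timestamp': '0000000001.00000'},
        'successes': []}              # container nodes already updated

    segment_cleanup_job = {
        'account': 'a', 'container': 'c', 'obj': 'o',
        'segment_count': 123,         # None when the count is unknown
        'segment_last_deleted': 45}   # resume point across passes
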
3252 | === added file 'test/unit/obj/test_janitor.py' |
3253 | --- test/unit/obj/test_janitor.py 1970-01-01 00:00:00 +0000 |
3254 | +++ test/unit/obj/test_janitor.py 2010-11-08 18:51:48 +0000 |
3255 | @@ -0,0 +1,433 @@ |
3256 | +# Copyright (c) 2010 OpenStack, LLC. |
3257 | +# |
3258 | +# Licensed under the Apache License, Version 2.0 (the "License"); |
3259 | +# you may not use this file except in compliance with the License. |
3260 | +# You may obtain a copy of the License at |
3261 | +# |
3262 | +# http://www.apache.org/licenses/LICENSE-2.0 |
3263 | +# |
3264 | +# Unless required by applicable law or agreed to in writing, software |
3265 | +# distributed under the License is distributed on an "AS IS" BASIS, |
3266 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
3267 | +# implied. |
3268 | +# See the License for the specific language governing permissions and |
3269 | +# limitations under the License. |
3270 | + |
3271 | +import cPickle as pickle |
3272 | +import os |
3273 | +import sys |
3274 | +import traceback |
3275 | +import unittest |
3276 | +from gzip import GzipFile |
3277 | +from nose import SkipTest |
3278 | +from shutil import rmtree |
3279 | +from time import time |
3280 | + |
3281 | +from eventlet import spawn, TimeoutError, listen |
3282 | +from eventlet.timeout import Timeout |
3283 | + |
3284 | +from swift.common.constraints import PICKLE_PROTOCOL |
3285 | +from swift.common.ring import Ring, RingData |
3286 | +from swift.common import utils |
3287 | +from swift.common.utils import hash_path, normalize_timestamp, mkdirs |
3288 | +from swift.obj import janitor as object_janitor |
3289 | +from swift.obj.diskfile import DiskFile, JANITORDIR |
3290 | + |
3291 | + |
3292 | +class TestObjectJanitor(unittest.TestCase): |
3293 | + |
3294 | + def setUp(self): |
3295 | + utils.HASH_PATH_SUFFIX = 'endcap' |
3296 | + self.path_to_test_xfs = os.environ.get('PATH_TO_TEST_XFS') |
3297 | + if not self.path_to_test_xfs or \ |
3298 | + not os.path.exists(self.path_to_test_xfs): |
3299 | + print >> sys.stderr, 'WARNING: PATH_TO_TEST_XFS not set or not ' \ |
3300 | + 'pointing to a valid directory.\n' \ |
3301 | + 'Please set PATH_TO_TEST_XFS to a directory on an XFS file ' \ |
3302 | + 'system for testing.' |
3303 | + self.testdir = '/tmp/SWIFTUNITTEST' |
3304 | + else: |
3305 | + self.testdir = os.path.join(self.path_to_test_xfs, |
3306 | + 'tmp_test_object_server_ObjectController') |
3307 | + rmtree(self.testdir, ignore_errors=1) |
3308 | + os.mkdir(self.testdir) |
3309 | + pickle.dump(RingData([[0, 1, 0, 1], [1, 0, 1, 0]], |
3310 | + [{'id': 0, 'ip': '127.0.0.1', 'port': 1, 'device': 'sda1', |
3311 | + 'zone': 0}, |
3312 | + {'id': 1, 'ip': '127.0.0.1', 'port': 1, 'device': 'sda1', |
3313 | + 'zone': 2}], 30), |
3314 | + GzipFile(os.path.join(self.testdir, 'object.ring.gz'), 'wb')) |
3315 | + self.object_ring = Ring(os.path.join(self.testdir, 'object.ring.gz')) |
3316 | + pickle.dump(RingData([[0, 1, 0, 1], [1, 0, 1, 0]], |
3317 | + [{'id': 0, 'ip': '127.0.0.1', 'port': 1, 'device': 'sda1', |
3318 | + 'zone': 0}, |
3319 | + {'id': 1, 'ip': '127.0.0.1', 'port': 1, 'device': 'sda1', |
3320 | + 'zone': 2}], 30), |
3321 | + GzipFile(os.path.join(self.testdir, 'container.ring.gz'), 'wb')) |
3322 | + self.container_ring = \ |
3323 | + Ring(os.path.join(self.testdir, 'container.ring.gz')) |
3324 | + self.devices_dir = os.path.join(self.testdir, 'devices') |
3325 | + os.mkdir(self.devices_dir) |
3326 | + self.sda1 = os.path.join(self.devices_dir, 'sda1') |
3327 | + os.mkdir(self.sda1) |
3328 | + os.mkdir(os.path.join(self.sda1, 'tmp')) |
3329 | + |
3330 | + def tearDown(self): |
3331 | + rmtree(self.testdir, ignore_errors=1) |
3332 | + |
3333 | + def test_creation(self): |
3334 | + if not self.path_to_test_xfs: |
3335 | + raise SkipTest |
3336 | + janitor = object_janitor.ObjectJanitor({ |
3337 | + 'devices': self.devices_dir, |
3338 | + 'mount_check': 'false', |
3339 | + 'swift_dir': self.testdir, |
3340 | + 'interval': '1', |
3341 | + 'concurrency': '2', |
3342 | + 'node_timeout': '5', |
3343 | + }) |
3344 | + self.assert_(hasattr(janitor, 'logger')) |
3345 | + self.assert_(janitor.logger is not None) |
3346 | + self.assertEquals(janitor.devices, self.devices_dir) |
3347 | + self.assertEquals(janitor.interval, 1) |
3348 | + self.assertEquals(janitor.concurrency, 2) |
3349 | + self.assertEquals(janitor.node_timeout, 5) |
3350 | + self.assert_(janitor.get_container_ring() is not None) |
3351 | + |
3352 | + def test_run_once_container_update(self): |
3353 | + if not self.path_to_test_xfs: |
3354 | + raise SkipTest |
3355 | + janitor = object_janitor.ObjectJanitor({ |
3356 | + 'devices': self.devices_dir, |
3357 | + 'mount_check': 'false', |
3358 | + 'swift_dir': self.testdir, |
3359 | + 'interval': '1', |
3360 | + 'concurrency': '1', |
3361 | + 'node_timeout': '15', |
3362 | + }) |
3363 | + janitor.run_once() |
3364 | + janitor_dir = os.path.join(self.sda1, JANITORDIR) |
3365 | + os.mkdir(janitor_dir) |
3366 | + janitor.run_once() |
3367 | + self.assert_(os.path.exists(janitor_dir)) |
3368 | + |
3369 | + disk_file = DiskFile(self.devices_dir, 'sda1', |
3370 | + str(self.container_ring.get_nodes('a', 'c', 'o')[0]), 'a', 'c', |
3371 | + 'o', datadir=JANITORDIR) |
3372 | + ts = normalize_timestamp(1) |
3373 | + with disk_file.mkstemp() as (fd, tmppath): |
3374 | + os.write(fd, pickle.dumps({'op': 'PUT', |
3375 | + 'account': 'a', 'container': 'c', 'obj': 'o', |
3376 | + 'headers': {'X-Container-Timestamp': ts}}, |
3377 | + PICKLE_PROTOCOL)) |
3378 | + disk_file.put(fd, tmppath, |
3379 | + {'X-Op': 'Container-Update', 'X-Timestamp': ts}) |
3380 | + janitor.run_once() |
3381 | + self.assert_(os.path.exists(os.path.join(disk_file.datadir, |
3382 | + ts + '.data'))) |
3383 | + |
3384 | + bindsock = listen(('127.0.0.1', 0)) |
3385 | + |
3386 | + def accepter(sock, return_code): |
3387 | + try: |
3388 | + with Timeout(3): |
3389 | + inc = sock.makefile('rb') |
3390 | + out = sock.makefile('wb') |
3391 | + out.write('HTTP/1.1 %d OK\r\nContent-Length: 0\r\n\r\n' % |
3392 | + return_code) |
3393 | + out.flush() |
3394 | + self.assertEquals(inc.readline(), |
3395 | + 'PUT /sda1/0/a/c/o HTTP/1.1\r\n') |
3396 | + headers = {} |
3397 | + line = inc.readline() |
3398 | + while line and line != '\r\n': |
3399 | + headers[line.split(':')[0].lower()] = \ |
3400 | + line.split(':')[1].strip() |
3401 | + line = inc.readline() |
3402 | + self.assert_('x-container-timestamp' in headers) |
3403 | + except BaseException, err: |
3404 | + return err |
3405 | + return None |
3406 | + |
3407 | + def accept(return_codes): |
3408 | + codes = iter(return_codes) |
3409 | + try: |
3410 | + events = [] |
3411 | + for x in xrange(len(return_codes)): |
3412 | + with Timeout(3): |
3413 | + sock, addr = bindsock.accept() |
3414 | + events.append( |
3415 | + spawn(accepter, sock, codes.next())) |
3416 | + for event in events: |
3417 | + err = event.wait() |
3418 | + if err: |
3419 | + raise err |
3420 | + except BaseException, err: |
3421 | + return err |
3422 | + return None |
3423 | + |
3424 | + event = spawn(accept, [201, 500]) |
3425 | + for dev in janitor.get_container_ring().devs: |
3426 | + if dev is not None: |
3427 | + dev['port'] = bindsock.getsockname()[1] |
3428 | + janitor.run_once() |
3429 | + err = event.wait() |
3430 | + if err: |
3431 | + raise err |
3432 | + disk_file = DiskFile(self.devices_dir, 'sda1', |
3433 | + str(self.container_ring.get_nodes('a', 'c', 'o')[0]), 'a', 'c', |
3434 | + 'o', datadir=JANITORDIR) |
3435 | + self.assert_(not disk_file.is_deleted()) |
3436 | + event = spawn(accept, [201]) |
3437 | + janitor.run_once() |
3438 | + err = event.wait() |
3439 | + if err: |
3440 | + raise err |
3441 | + disk_file = DiskFile(self.devices_dir, 'sda1', |
3442 | + str(self.container_ring.get_nodes('a', 'c', 'o')[0]), 'a', 'c', |
3443 | + 'o', datadir=JANITORDIR) |
3444 | + self.assert_(disk_file.is_deleted()) |
3445 | + |
3446 | + def _segment_cleanup_in_progress_helper(self, statuses, expect_success): |
3447 | + if not self.path_to_test_xfs: |
3448 | + raise SkipTest |
3449 | + janitor = object_janitor.ObjectJanitor({'devices': self.devices_dir, |
3450 | + 'mount_check': 'false', 'swift_dir': self.testdir, 'interval': '1', |
3451 | + 'concurrency': '1', 'node_timeout': '15'}) |
3452 | + |
3453 | + # Quick test of connection refusals |
3454 | + df = DiskFile(self.devices_dir, 'sda1', |
3455 | + str(self.object_ring.get_nodes('a', 'c', 'o')[0]), |
3456 | + 'Segment-Cleanup', normalize_timestamp(1), 'a/c/o', |
3457 | + datadir=JANITORDIR) |
3458 | + df.store_janitor_segment_cleanup('a', 'c', 'o', 2, 0) |
3459 | + janitor.run_once() |
3460 | + df = DiskFile(self.devices_dir, 'sda1', |
3461 | + str(self.object_ring.get_nodes('a', 'c', 'o')[0]), |
3462 | + 'Segment-Cleanup', normalize_timestamp(1), 'a/c/o', |
3463 | + datadir=JANITORDIR) |
3464 | + self.assert_(not df.is_deleted()) |
3465 | + |
3466 | + bindsock = listen(('127.0.0.1', 0)) |
3467 | + janitor.port = bindsock.getsockname()[1] |
3468 | + |
3469 | + def accepter(sock, return_code): |
3470 | + try: |
3471 | + with Timeout(3): |
3472 | + inc = sock.makefile('rb') |
3473 | + out = sock.makefile('wb') |
3474 | + out.write('HTTP/1.1 %d OK\r\nContent-Length: 0\r\n\r\n' % |
3475 | + return_code) |
3476 | + out.flush() |
3477 | + self.assertEquals(inc.readline(), |
3478 | + 'DELETE /sda1/2/a/c/o HTTP/1.1\r\n') |
3479 | + headers = {} |
3480 | + line = inc.readline() |
3481 | + while line and line != '\r\n': |
3482 | + headers[line.split(':')[0].lower()] = \ |
3483 | + line.split(':')[1].strip() |
3484 | + line = inc.readline() |
3485 | + self.assert_('x-object-segment-timestamp' in headers) |
3486 | + self.assertEquals(headers.get('x-object-segment'), '1') |
3487 | + except BaseException, err: |
3488 | + return err |
3489 | + return None |
3490 | + |
3491 | + def accept(return_codes): |
3492 | + codes = iter(return_codes) |
3493 | + try: |
3494 | + events = [] |
3495 | + for x in xrange(len(return_codes)): |
3496 | + with Timeout(3): |
3497 | + sock, addr = bindsock.accept() |
3498 | + events.append( |
3499 | + spawn(accepter, sock, codes.next())) |
3500 | + for event in events: |
3501 | + err = event.wait() |
3502 | + if err: |
3503 | + raise err |
3504 | + except BaseException, err: |
3505 | + return err |
3506 | + return None |
3507 | + |
3508 | + event = spawn(accept, statuses) |
3509 | + for dev in janitor.get_object_ring().devs: |
3510 | + if dev is not None: |
3511 | + dev['port'] = bindsock.getsockname()[1] |
3512 | + janitor.run_once() |
3513 | + err = event.wait() |
3514 | + if err: |
3515 | + raise err |
3516 | + df = DiskFile(self.devices_dir, 'sda1', |
3517 | + str(self.object_ring.get_nodes('a', 'c', 'o')[0]), |
3518 | + 'Segment-Cleanup', normalize_timestamp(1), 'a/c/o', |
3519 | + datadir=JANITORDIR) |
3520 | + self.assertEquals(df.is_deleted(), expect_success) |
3521 | + |
3522 | + def test_segment_cleanup_in_progress_happy_path(self): |
3523 | + self._segment_cleanup_in_progress_helper([204, 204], True) |
3524 | + |
3525 | + def test_segment_cleanup_in_progress_one_failure(self): |
3526 | + self._segment_cleanup_in_progress_helper([204, 500], True) |
3527 | + |
3528 | + def test_segment_cleanup_in_progress_all_failures(self): |
3529 | + self._segment_cleanup_in_progress_helper([500, 500], False) |
3530 | + |
3531 | + def test_segment_cleanup_in_progress_all_not_found(self): |
3532 | + self._segment_cleanup_in_progress_helper([404, 404], True) |
3533 | + |
3534 | + def test_segment_cleanup_in_progress_one_not_found_one_success(self): |
3535 | + self._segment_cleanup_in_progress_helper([404, 204], True) |
3536 | + |
3537 | + def test_segment_cleanup_in_progress_one_not_found_one_failure(self): |
3538 | + self._segment_cleanup_in_progress_helper([404, 500], False) |
3539 | + |
3540 | + def _segment_cleanup_fresh_start_helper(self, cleanup_timestamp, |
3541 | + existing_timestamp, statuses, expect_success): |
3542 | + if not self.path_to_test_xfs: |
3543 | + raise SkipTest |
3544 | + janitor = object_janitor.ObjectJanitor({'devices': self.devices_dir, |
3545 | + 'mount_check': 'false', 'swift_dir': self.testdir, 'interval': '1', |
3546 | + 'concurrency': '1', 'node_timeout': '15', 'segments_per_pass': 2}) |
3547 | + |
3548 | + # Quick test of connection refusals |
3549 | + df = DiskFile(self.devices_dir, 'sda1', |
3550 | + str(self.object_ring.get_nodes('a', 'c', 'o')[0]), |
3551 | + 'Segment-Cleanup', normalize_timestamp(cleanup_timestamp), 'a/c/o', |
3552 | + datadir=JANITORDIR) |
3553 | + df.store_janitor_segment_cleanup('a', 'c', 'o', None, None) |
3554 | + janitor.run_once() |
3555 | + df = DiskFile(self.devices_dir, 'sda1', |
3556 | + str(self.object_ring.get_nodes('a', 'c', 'o')[0]), |
3557 | + 'Segment-Cleanup', normalize_timestamp(cleanup_timestamp), 'a/c/o', |
3558 | + datadir=JANITORDIR) |
3559 | + self.assert_(not df.is_deleted()) |
3560 | + |
3561 | + bindsock = listen(('127.0.0.1', 0)) |
3562 | + janitor.port = bindsock.getsockname()[1] |
3563 | + |
3564 | + def accepter(sock, return_code): |
3565 | + try: |
3566 | + with Timeout(3): |
3567 | + inc = sock.makefile('rb') |
3568 | + out = sock.makefile('wb') |
3569 | + request = inc.readline() |
3570 | + if request.startswith('GET '): |
3571 | +                            self.assertEquals(request, 'GET /sda1/0/a/c/o HTTP/1.1\r\n')
3572 | + if return_code == 200: |
3573 | + pickl = pickle.dumps({'x-timestamp': |
3574 | + normalize_timestamp(existing_timestamp)}) |
3575 | + out.write('HTTP/1.1 %d OK\r\nContent-Length: ' |
3576 | + '%s\r\nX-Object-Type: manifest\r\n\r\n' % |
3577 | + (return_code, len(pickl))) |
3578 | + out.write(pickl) |
3579 | + out.flush() |
3580 | + else: |
3581 | + out.write('HTTP/1.1 %d OK\r\nContent-Length: ' |
3582 | + '0\r\n\r\n' % return_code) |
3583 | + out.flush() |
3584 | + return None |
3585 | + if request.startswith('HEAD '): |
3586 | +                        self.assertEquals(request,
3587 | +                            'HEAD /sda1/2/a/c/o HTTP/1.1\r\n')
3588 | + out.write('HTTP/1.1 %d OK\r\nContent-Length: ' |
3589 | + '0\r\n\r\n' % return_code) |
3590 | + out.flush() |
3591 | + return None |
3592 | + else: |
3593 | +                        self.assertEquals(request,
3594 | +                            'DELETE /sda1/2/a/c/o HTTP/1.1\r\n')
3595 | + out.write( |
3596 | + 'HTTP/1.1 %d OK\r\nContent-Length: 0\r\n\r\n' % |
3597 | + return_code) |
3598 | + out.flush() |
3599 | + headers = {} |
3600 | + line = inc.readline() |
3601 | + while line and line != '\r\n': |
3602 | + headers[line.split(':')[0].lower()] = \ |
3603 | + line.split(':')[1].strip() |
3604 | + line = inc.readline() |
3605 | + self.assert_('x-object-segment' in headers) |
3606 | + self.assert_('x-object-segment-timestamp' in headers) |
3607 | + except BaseException, err: |
3608 | + traceback.print_exc() |
3609 | + return err |
3610 | + return None |
3611 | + |
3612 | + def accept(return_codes): |
3613 | + codes = iter(return_codes) |
3614 | + try: |
3615 | + events = [] |
3616 | + for x in xrange(len(return_codes)): |
3617 | + with Timeout(3): |
3618 | + sock, addr = bindsock.accept() |
3619 | + events.append( |
3620 | + spawn(accepter, sock, codes.next())) |
3621 | + for event in events: |
3622 | + err = event.wait() |
3623 | + if err: |
3624 | + raise err |
3625 | + except BaseException, err: |
3626 | + return err |
3627 | + return None |
3628 | + |
3629 | + event = spawn(accept, statuses) |
3630 | + for dev in janitor.get_object_ring().devs: |
3631 | + if dev is not None: |
3632 | + dev['port'] = bindsock.getsockname()[1] |
3633 | + janitor.run_once() |
3634 | + err = event.wait() |
3635 | + if err: |
3636 | + raise err |
3637 | + df = DiskFile(self.devices_dir, 'sda1', |
3638 | + str(self.object_ring.get_nodes('a', 'c', 'o')[0]), |
3639 | + 'Segment-Cleanup', normalize_timestamp(cleanup_timestamp), 'a/c/o', |
3640 | + datadir=JANITORDIR) |
3641 | + self.assertEquals(df.is_deleted(), expect_success) |
3642 | + |
3643 | + def test_segment_cleanup_fresh_start_happy_path(self): |
3644 | + self._segment_cleanup_fresh_start_helper(normalize_timestamp(1), |
3645 | + normalize_timestamp(1), |
3646 | + [404, 404, # Check for manifest |
3647 | + 200, 200, 404, 404, # Check for segments (2 total) |
3648 | + 204, 204, 204, 204], # Delete segments |
3649 | + True) |
3650 | + |
3651 | + def test_segment_cleanup_fresh_start_manifest_exists(self): |
3652 | + t = time() |
3653 | + self._segment_cleanup_fresh_start_helper(normalize_timestamp(t), |
3654 | + normalize_timestamp(t), [200, 200], True) |
3655 | + |
3656 | + def test_segment_cleanup_fresh_start_old_manifest_exists(self): |
3657 | + self._segment_cleanup_fresh_start_helper(normalize_timestamp(2), |
3658 | + normalize_timestamp(1), |
3659 | + [200, 200, # Check for manifest |
3660 | + 200, 200, 404, 404, # Check for segments (2 total) |
3661 | + 204, 204, 204, 204], # Delete segments |
3662 | + True) |
3663 | + |
3664 | + def test_segment_cleanup_fresh_start_old_manifest_exists2(self): |
3665 | + self._segment_cleanup_fresh_start_helper(normalize_timestamp(time()), |
3666 | + normalize_timestamp(1), [200, 200], False) |
3667 | + |
3668 | + def test_segment_cleanup_fresh_start_new_manifest_exists(self): |
3669 | + t = time() |
3670 | + self._segment_cleanup_fresh_start_helper(normalize_timestamp(t - 1), |
3671 | + normalize_timestamp(t), |
3672 | + [200, # Check for manifest |
3673 | + 200, 200, 404, 404, # Check for segments (2 total) |
3674 | + 204, 204, 204, 204], # Delete segments |
3675 | + True) |
3676 | + |
3677 | + def test_segment_cleanup_fresh_start_stops_at_segments_per_pass(self): |
3678 | + self._segment_cleanup_fresh_start_helper(normalize_timestamp(1), |
3679 | + normalize_timestamp(1), |
3680 | + [404, 404, # Check for manifest |
3681 | + 200, 200, 200, 404, 404, # Check for segments (3 total) |
3682 | + 204, 204, 204, 204], # Delete segments (only 2 expected) |
3683 | + False) |
3684 | + |
3685 | + |
3686 | + |
3687 | +if __name__ == '__main__': |
3688 | + unittest.main() |
3689 | |
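
Read together, the status sequences in the fresh-start tests trace a probe-then-delete flow; one plausible reading as a sketch (segment_exists and delete_segment are hypothetical stand-ins, and the manifest timestamp checks that gate the flow are omitted):

    # Sketch of the probe-and-delete phases the status sequences trace.
    def cleanup_segments(segment_exists, delete_segment,
                         segments_per_pass):
        count = 0
        while segment_exists(count):  # one HEAD per segment: 200, 200,
            count += 1                # then 404 (confirmed on a replica)
        for segment in range(min(count, segments_per_pass)):
            delete_segment(segment)   # a 204 per replica, per segment
        # True deletes the job file; False keeps it for a later pass.
        return count <= segments_per_pass
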
3690 | === modified file 'test/unit/obj/test_server.py' |
3691 | --- test/unit/obj/test_server.py 2010-10-13 21:26:43 +0000 |
3692 | +++ test/unit/obj/test_server.py 2010-11-08 18:51:48 +0000 |
3693 | @@ -13,7 +13,7 @@ |
3694 | # See the License for the specific language governing permissions and |
3695 | # limitations under the License. |
3696 | |
3697 | -""" Tests for swift.object_server """ |
3698 | +""" Tests for swift.obj.server """ |
3699 | |
3700 | import cPickle as pickle |
3701 | import os |
3702 | @@ -22,23 +22,25 @@ |
3703 | from nose import SkipTest |
3704 | from shutil import rmtree |
3705 | from StringIO import StringIO |
3706 | -from time import gmtime, sleep, strftime, time |
3707 | +from time import gmtime, strftime, time |
3708 | |
3709 | from eventlet import sleep, spawn, wsgi, listen |
3710 | from webob import Request |
3711 | -from xattr import getxattr, setxattr |
3712 | +from xattr import getxattr |
3713 | |
3714 | from test.unit import connect_tcp, readuntil2crlfs |
3715 | +from swift.common.constraints import PICKLE_PROTOCOL |
3716 | from swift.obj import server as object_server |
3717 | +from swift.obj.diskfile import DATADIR, DiskFile, JANITORDIR, METADATA_KEY |
3718 | from swift.common.utils import hash_path, mkdirs, normalize_timestamp, \ |
3719 | NullLogger, storage_directory |
3720 | |
3721 | |
3722 | class TestObjectController(unittest.TestCase): |
3723 | - """ Test swift.object_server.ObjectController """ |
3724 | + """ Test swift.obj.server.ObjectController """ |
3725 | |
3726 | def setUp(self): |
3727 | - """ Set up for testing swift.object_server.ObjectController """ |
3728 | + """ Set up for testing swift.obj.server.ObjectController """ |
3729 | self.path_to_test_xfs = os.environ.get('PATH_TO_TEST_XFS') |
3730 | if not self.path_to_test_xfs or \ |
3731 | not os.path.exists(self.path_to_test_xfs): |
3732 | @@ -49,7 +51,7 @@ |
3733 | self.testdir = '/tmp/SWIFTUNITTEST' |
3734 | else: |
3735 | self.testdir = os.path.join(self.path_to_test_xfs, |
3736 | - 'tmp_test_object_server_ObjectController') |
3737 | + 'tmp_test_obj_server_ObjectController') |
3738 | mkdirs(self.testdir) |
3739 | rmtree(self.testdir) |
3740 | mkdirs(os.path.join(self.testdir, 'sda1')) |
3741 | @@ -59,11 +61,11 @@ |
3742 | self.object_controller.bytes_per_sync = 1 |
3743 | |
3744 | def tearDown(self): |
3745 | - """ Tear down for testing swift.object_server.ObjectController """ |
3746 | + """ Tear down for testing swift.obj.server.ObjectController """ |
3747 | rmtree(self.testdir) |
3748 | |
3749 | def test_POST_update_meta(self): |
3750 | - """ Test swift.object_server.ObjectController.POST """ |
3751 | + """ Test swift.obj.server.ObjectController.POST """ |
3752 | if not self.path_to_test_xfs: |
3753 | raise SkipTest |
3754 | timestamp = normalize_timestamp(time()) |
3755 | @@ -221,12 +223,11 @@ |
3756 | resp = self.object_controller.PUT(req) |
3757 | self.assertEquals(resp.status_int, 201) |
3758 | objfile = os.path.join(self.testdir, 'sda1', |
3759 | - storage_directory(object_server.DATADIR, 'p', |
3760 | - hash_path('a', 'c', 'o')), |
3761 | + storage_directory(DATADIR, 'p', hash_path('a', 'c', 'o')), |
3762 | timestamp + '.data') |
3763 | self.assert_(os.path.isfile(objfile)) |
3764 | self.assertEquals(open(objfile).read(), 'VERIFY') |
3765 | - self.assertEquals(pickle.loads(getxattr(objfile, object_server.METADATA_KEY)), |
3766 | + self.assertEquals(pickle.loads(getxattr(objfile, METADATA_KEY)), |
3767 | {'X-Timestamp': timestamp, |
3768 | 'Content-Length': '6', |
3769 | 'ETag': '0b4c12d7e0a73840c1c4f148fda3b037', |
3770 | @@ -253,12 +254,11 @@ |
3771 | resp = self.object_controller.PUT(req) |
3772 | self.assertEquals(resp.status_int, 201) |
3773 | objfile = os.path.join(self.testdir, 'sda1', |
3774 | - storage_directory(object_server.DATADIR, 'p', |
3775 | - hash_path('a', 'c', 'o')), |
3776 | + storage_directory(DATADIR, 'p', hash_path('a', 'c', 'o')), |
3777 | timestamp + '.data') |
3778 | self.assert_(os.path.isfile(objfile)) |
3779 | self.assertEquals(open(objfile).read(), 'VERIFY TWO') |
3780 | - self.assertEquals(pickle.loads(getxattr(objfile, object_server.METADATA_KEY)), |
3781 | + self.assertEquals(pickle.loads(getxattr(objfile, METADATA_KEY)), |
3782 | {'X-Timestamp': timestamp, |
3783 | 'Content-Length': '10', |
3784 | 'ETag': 'b381a4c5dab1eaa1eb9711fa647cd039', |
3785 | @@ -299,12 +299,11 @@ |
3786 | resp = self.object_controller.PUT(req) |
3787 | self.assertEquals(resp.status_int, 201) |
3788 | objfile = os.path.join(self.testdir, 'sda1', |
3789 | - storage_directory(object_server.DATADIR, 'p', |
3790 | - hash_path('a', 'c', 'o')), |
3791 | + storage_directory(DATADIR, 'p', hash_path('a', 'c', 'o')), |
3792 | timestamp + '.data') |
3793 | self.assert_(os.path.isfile(objfile)) |
3794 | self.assertEquals(open(objfile).read(), 'VERIFY THREE') |
3795 | - self.assertEquals(pickle.loads(getxattr(objfile, object_server.METADATA_KEY)), |
3796 | + self.assertEquals(pickle.loads(getxattr(objfile, METADATA_KEY)), |
3797 | {'X-Timestamp': timestamp, |
3798 | 'Content-Length': '12', |
3799 | 'ETag': 'b114ab7b90d9ccac4bd5d99cc7ebb568', |
3800 | @@ -375,7 +374,7 @@ |
3801 | object_server.http_connect = old_http_connect |
3802 | |
3803 | def test_HEAD(self): |
3804 | - """ Test swift.object_server.ObjectController.HEAD """ |
3805 | + """ Test swift.obj.server.ObjectController.HEAD """ |
3806 | if not self.path_to_test_xfs: |
3807 | raise SkipTest |
3808 | req = Request.blank('/sda1/p/a/c') |
3809 | @@ -410,8 +409,7 @@ |
3810 | self.assertEquals(resp.headers['x-object-meta-two'], 'Two') |
3811 | |
3812 | objfile = os.path.join(self.testdir, 'sda1', |
3813 | - storage_directory(object_server.DATADIR, 'p', |
3814 | - hash_path('a', 'c', 'o')), |
3815 | + storage_directory(DATADIR, 'p', hash_path('a', 'c', 'o')), |
3816 | timestamp + '.data') |
3817 | os.unlink(objfile) |
3818 | req = Request.blank('/sda1/p/a/c/o') |
3819 | @@ -442,7 +440,7 @@ |
3820 | self.assertEquals(resp.status_int, 404) |
3821 | |
3822 | def test_GET(self): |
3823 | - """ Test swift.object_server.ObjectController.GET """ |
3824 | + """ Test swift.obj.server.ObjectController.GET """ |
3825 | if not self.path_to_test_xfs: |
3826 | raise SkipTest |
3827 | req = Request.blank('/sda1/p/a/c') |
3828 | @@ -500,8 +498,7 @@ |
3829 | self.assertEquals(resp.headers['content-length'], '2') |
3830 | |
3831 | objfile = os.path.join(self.testdir, 'sda1', |
3832 | - storage_directory(object_server.DATADIR, 'p', |
3833 | - hash_path('a', 'c', 'o')), |
3834 | + storage_directory(DATADIR, 'p', hash_path('a', 'c', 'o')), |
3835 | timestamp + '.data') |
3836 | os.unlink(objfile) |
3837 | req = Request.blank('/sda1/p/a/c/o') |
3838 | @@ -712,7 +709,7 @@ |
3839 | self.assertEquals(resp.status_int, 200) |
3840 | |
3841 | def test_DELETE(self): |
3842 | - """ Test swift.object_server.ObjectController.DELETE """ |
3843 | + """ Test swift.obj.server.ObjectController.DELETE """ |
3844 | if not self.path_to_test_xfs: |
3845 | raise SkipTest |
3846 | req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'DELETE'}) |
3847 | @@ -751,8 +748,7 @@ |
3848 | resp = self.object_controller.DELETE(req) |
3849 | self.assertEquals(resp.status_int, 204) |
3850 | objfile = os.path.join(self.testdir, 'sda1', |
3851 | - storage_directory(object_server.DATADIR, 'p', |
3852 | - hash_path('a', 'c', 'o')), |
3853 | + storage_directory(DATADIR, 'p', hash_path('a', 'c', 'o')), |
3854 | timestamp + '.ts') |
3855 | self.assert_(os.path.isfile(objfile)) |
3856 | |
3857 | @@ -764,13 +760,12 @@ |
3858 | resp = self.object_controller.DELETE(req) |
3859 | self.assertEquals(resp.status_int, 204) |
3860 | objfile = os.path.join(self.testdir, 'sda1', |
3861 | - storage_directory(object_server.DATADIR, 'p', |
3862 | - hash_path('a', 'c', 'o')), |
3863 | + storage_directory(DATADIR, 'p', hash_path('a', 'c', 'o')), |
3864 | timestamp + '.ts') |
3865 | self.assert_(os.path.isfile(objfile)) |
3866 | |
3867 | def test_call(self): |
3868 | - """ Test swift.object_server.ObjectController.__call__ """ |
3869 | + """ Test swift.obj.server.ObjectController.__call__ """ |
3870 | inbuf = StringIO() |
3871 | errbuf = StringIO() |
3872 | outbuf = StringIO() |
3873 | @@ -886,39 +881,6 @@ |
3874 | resp = self.object_controller.PUT(req) |
3875 | self.assertEquals(resp.status_int, 400) |
3876 | |
3877 | - def test_disk_file_app_iter_corners(self): |
3878 | - if not self.path_to_test_xfs: |
3879 | - raise SkipTest |
3880 | - df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o') |
3881 | - mkdirs(df.datadir) |
3882 | - f = open(os.path.join(df.datadir, |
3883 | - normalize_timestamp(time()) + '.data'), 'wb') |
3884 | - f.write('1234567890') |
3885 | - setxattr(f.fileno(), object_server.METADATA_KEY, |
3886 | - pickle.dumps({}, object_server.PICKLE_PROTOCOL)) |
3887 | - f.close() |
3888 | - df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o', |
3889 | - keep_data_fp=True) |
3890 | - it = df.app_iter_range(0, None) |
3891 | - sio = StringIO() |
3892 | - for chunk in it: |
3893 | - sio.write(chunk) |
3894 | - self.assertEquals(sio.getvalue(), '1234567890') |
3895 | - |
3896 | - df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o', |
3897 | - keep_data_fp=True) |
3898 | - it = df.app_iter_range(5, None) |
3899 | - sio = StringIO() |
3900 | - for chunk in it: |
3901 | - sio.write(chunk) |
3902 | - self.assertEquals(sio.getvalue(), '67890') |
3903 | - |
3904 | - def test_disk_file_mkstemp_creates_dir(self): |
3905 | - tmpdir = os.path.join(self.testdir, 'sda1', 'tmp') |
3906 | - os.rmdir(tmpdir) |
3907 | - with object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o').mkstemp(): |
3908 | - self.assert_(os.path.exists(tmpdir)) |
3909 | - |
3910 | def test_max_upload_time(self): |
3911 | if not self.path_to_test_xfs: |
3912 | raise SkipTest |
3913 | @@ -1006,6 +968,161 @@ |
3914 | self.assertEquals(resp.status_int, 200) |
3915 | self.assertEquals(resp.headers['content-encoding'], 'gzip') |
3916 | |
3917 | + def test_overwritten_manifest(self): |
3918 | + """ |
3919 | + Ensures a janitor segment cleanup job is created for an overwritten |
3920 | + manifest. |
3921 | + """ |
3922 | + if not self.path_to_test_xfs: |
3923 | + raise SkipTest |
3924 | + df = DiskFile(self.testdir, 'sda1', '0', 'Segment-Cleanup', |
3925 | + normalize_timestamp(1), 'a/c/o', datadir=JANITORDIR) |
3926 | + self.assert_(df.is_deleted()) |
3927 | + |
3928 | + manifest = {'x-timestamp': normalize_timestamp(1), |
3929 | + 'content-length': 100, 'content-type': 'text/plain', |
3930 | + 'x-segment-size': 10, 'etag': 'd41d8cd98f00b204e9800998ecf8427e'} |
3931 | + manifest = pickle.dumps(manifest, protocol=PICKLE_PROTOCOL) |
3932 | + req = Request.blank('/sda1/0/a/c/o', environ={'REQUEST_METHOD': 'PUT'}, |
3933 | + headers={'X-Timestamp': normalize_timestamp(1), |
3934 | + 'Content-Length': str(len(manifest)), 'Content-Type': 'text/plain', |
3935 | + 'X-Object-Type': 'manifest', 'X-Object-Length': '100'}, |
3936 | + body=manifest) |
3937 | + resp = self.object_controller.PUT(req) |
3938 | + self.assertEquals(resp.status_int, 201) |
3939 | + |
3940 | + req = Request.blank('/sda1/0/a/c/o', environ={'REQUEST_METHOD': 'PUT'}, |
3941 | + headers={'X-Timestamp': normalize_timestamp(2), |
3942 | + 'Content-Length': '1', 'Content-Type': 'text/plain'}, body=' ') |
3943 | + resp = self.object_controller.PUT(req) |
3944 | + self.assertEquals(resp.status_int, 201) |
3945 | + |
3946 | + df = DiskFile(self.testdir, 'sda1', '0', 'Segment-Cleanup', |
3947 | + normalize_timestamp(1), 'a/c/o', datadir=JANITORDIR) |
3948 | + self.assert_(not df.is_deleted()) |
3949 | + |
3950 | + def test_segmented_put(self): |
3951 | + if not self.path_to_test_xfs: |
3952 | + raise SkipTest |
3953 | + |
3954 | + # Ensure the janitor cleanup job doesn't exist yet |
3955 | + df = DiskFile(self.testdir, 'sda1', '0', 'Segment-Cleanup', |
3956 | + normalize_timestamp(1), 'a/c/o', datadir=JANITORDIR) |
3957 | + self.assert_(df.is_deleted()) |
3958 | + |
3959 | + # Put the first segment |
3960 | + req = Request.blank('/sda1/0/a/c/o', environ={'REQUEST_METHOD': 'PUT'}, |
3961 | + headers={'X-Timestamp': normalize_timestamp(1), |
3962 | + 'Content-Length': '1', 'Content-Type': 'text/plain', |
3963 | + 'X-Object-Type': 'segment', 'X-Object-Segment': '0', |
3964 | + 'X-Object-Segment-If-Length': '1'}, body='1') |
3965 | + resp = self.object_controller.PUT(req) |
3966 | + self.assertEquals(resp.status_int, 201) |
3967 | + |
3968 | + # Ensure the janitor cleanup job now exists |
3969 | + df = DiskFile(self.testdir, 'sda1', '0', 'Segment-Cleanup', |
3970 | + normalize_timestamp(1), 'a/c/o', datadir=JANITORDIR) |
3971 | + self.assert_(not df.is_deleted()) |
3972 | + |
3973 | + # Second segment would go to a different node |
3974 | + |
3975 | + # Put the manifest |
3976 | + manifest = {'x-timestamp': normalize_timestamp(1), |
3977 | + 'content-length': 2, 'content-type': 'text/plain', |
3978 | + 'x-segment-size': 1, 'etag': 'c20ad4d76fe97759aa27a0c99bff6710'} |
3979 | + manifest = pickle.dumps(manifest, protocol=PICKLE_PROTOCOL) |
3980 | + req = Request.blank('/sda1/0/a/c/o', environ={'REQUEST_METHOD': 'PUT'}, |
3981 | + headers={'X-Timestamp': normalize_timestamp(1), |
3982 | + 'Content-Length': str(len(manifest)), 'Content-Type': 'text/plain', |
3983 | + 'X-Object-Type': 'manifest', 'X-Object-Length': '2'}, |
3984 | + body=manifest) |
3985 | + resp = self.object_controller.PUT(req) |
3986 | + self.assertEquals(resp.status_int, 201) |
3987 | + |
3988 | + # The janitor cleanup job should still exist (only the janitor will |
3989 | + # verify the manifest is in place and remove the job).
3990 | + df = DiskFile(self.testdir, 'sda1', '0', 'Segment-Cleanup', |
3991 | + normalize_timestamp(1), 'a/c/o', datadir=JANITORDIR) |
3992 | + self.assert_(not df.is_deleted()) |
3993 | + |
3994 | + req = Request.blank('/sda1/0/a/c/o', environ={'REQUEST_METHOD': 'GET'}) |
3995 | + resp = self.object_controller.GET(req) |
3996 | + self.assertEquals(resp.status_int, 200) |
3997 | + self.assertEquals(resp.body, manifest) |
3998 | + |
3999 | + req = Request.blank('/sda1/0/a/c/o', environ={'REQUEST_METHOD': 'GET'}, |
4000 | + headers={'X-Object-Segment': '0', |
4001 | + 'X-Object-Segment-Timestamp': normalize_timestamp(1)}) |
4002 | + resp = self.object_controller.GET(req) |
4003 | + self.assertEquals(resp.status_int, 200) |
4004 | + self.assertEquals(resp.body, '1') |
4005 | + |
4006 | + def test_segmented_put_no_longer(self): |
4007 | + if not self.path_to_test_xfs: |
4008 | + raise SkipTest |
4009 | + |
4010 | + # Ensure the janitor cleanup job doesn't exist to begin with |
4011 | + df = DiskFile(self.testdir, 'sda1', '0', 'Segment-Cleanup', |
4012 | + normalize_timestamp(1), 'a/c/o', datadir=JANITORDIR) |
4013 | + self.assert_(df.is_deleted()) |
4014 | + |
4015 | + # Put the first segment, which really ends up as a whole object
4016 | + req = Request.blank('/sda1/0/a/c/o', environ={'REQUEST_METHOD': 'PUT'}, |
4017 | + headers={'X-Timestamp': normalize_timestamp(1), |
4018 | + 'Content-Length': '1', 'Content-Type': 'text/plain', |
4019 | + 'X-Object-Type': 'segment', 'X-Object-Segment': '0', |
4020 | + 'X-Object-Segment-If-Length': '2'}, body='1') |
4021 | + resp = self.object_controller.PUT(req) |
4022 | + self.assertEquals(resp.status_int, 201) |
4023 | + |
4024 | + # Ensure the janitor cleanup job doesn't exist since we put a whole |
4025 | + # file, not a segment (due to X-Object-Segment-If-Length). |
4026 | + df = DiskFile(self.testdir, 'sda1', '0', 'Segment-Cleanup', |
4027 | + normalize_timestamp(1), 'a/c/o', datadir=JANITORDIR) |
4028 | + self.assert_(df.is_deleted()) |
4029 | + |
4030 | + req = Request.blank('/sda1/0/a/c/o', environ={'REQUEST_METHOD': 'GET'}) |
4031 | + resp = self.object_controller.GET(req) |
4032 | + self.assertEquals(resp.status_int, 200) |
4033 | + self.assertEquals(resp.body, '1') |
4034 | + |
4035 | + def test_deleted_manifest(self): |
4036 | + """ |
4037 | + Ensures a janitor segment cleanup job is created for a deleted |
4038 | + manifest. |
4039 | + """ |
4040 | + if not self.path_to_test_xfs: |
4041 | + raise SkipTest |
4042 | + df = DiskFile(self.testdir, 'sda1', '0', 'Segment-Cleanup', |
4043 | + normalize_timestamp(1), 'a/c/o', datadir=JANITORDIR) |
4044 | + self.assert_(df.is_deleted()) |
4045 | + |
4046 | + manifest = {'x-timestamp': normalize_timestamp(1), |
4047 | + 'content-length': 100, 'content-type': 'text/plain', |
4048 | + 'x-segment-size': 10, 'etag': 'd41d8cd98f00b204e9800998ecf8427e'} |
4049 | + manifest = pickle.dumps(manifest, protocol=PICKLE_PROTOCOL) |
4050 | + req = Request.blank('/sda1/0/a/c/o', environ={'REQUEST_METHOD': 'PUT'}, |
4051 | + headers={'X-Timestamp': normalize_timestamp(1), |
4052 | + 'Content-Length': str(len(manifest)), 'Content-Type': 'text/plain', |
4053 | + 'X-Object-Type': 'manifest', 'X-Object-Length': '100'}, |
4054 | + body=manifest) |
4055 | + resp = self.object_controller.PUT(req) |
4056 | + self.assertEquals(resp.status_int, 201) |
4057 | + |
4058 | + req = Request.blank('/sda1/0/a/c/o', |
4059 | + environ={'REQUEST_METHOD': 'DELETE'}, |
4060 | + headers={'X-Timestamp': normalize_timestamp(2), |
4061 | + 'Content-Length': '0', 'Content-Type': 'text/plain'}, body='') |
4062 | + resp = self.object_controller.DELETE(req) |
4063 | + self.assertEquals(resp.status_int, 204) |
4064 | + |
4065 | + df = DiskFile(self.testdir, 'sda1', '0', 'Segment-Cleanup', |
4066 | + normalize_timestamp(1), 'a/c/o', datadir=JANITORDIR, |
4067 | + keep_data_fp=True) |
4068 | + self.assert_(not df.is_deleted()) |
4069 | + job = pickle.loads(''.join(iter(df))) |
4070 | + self.assertEquals(job['segment_last_deleted'], -1) |
4071 | + |
4072 | |
4073 | if __name__ == '__main__': |
4074 | unittest.main() |
4075 | |
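The object-server tests above pin down when a Segment-Cleanup janitor job should exist for 'a/c/o'. The summary below is a minimal sketch drawn only from those assertions; the event labels are invented for illustration and are not identifiers from the branch:

    # Invented event labels; each maps to what the tests above assert
    # about the existence of a Segment-Cleanup janitor job for 'a/c/o'.
    JOB_EXPECTED = {
        'segment_put': True,           # test_segmented_put
        'manifest_put': True,          # job stays until the janitor itself
                                       # verifies the manifest and removes it
        'whole_put_if_length': False,  # test_segmented_put_no_longer: the
                                       # X-Object-Segment-If-Length header
                                       # turned the upload into a whole file
        'manifest_overwrite': True,    # test_overwritten_manifest
        'manifest_delete': True,       # test_deleted_manifest; the job also
                                       # records segment_last_deleted = -1
    }

In short, any operation that leaves segments at risk of being orphaned creates or keeps a job, and only the janitor, never the object server, retires it.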
4076 | === modified file 'test/unit/obj/test_updater.py' |
4077 | --- test/unit/obj/test_updater.py 2010-09-23 16:09:30 +0000 |
4078 | +++ test/unit/obj/test_updater.py 2010-11-08 18:51:48 +0000 |
4079 | @@ -24,7 +24,7 @@ |
4080 | from eventlet import spawn, TimeoutError, listen |
4081 | from eventlet.timeout import Timeout |
4082 | |
4083 | -from swift.obj import updater as object_updater, server as object_server |
4084 | +from swift.obj import updater as object_updater |
4085 | from swift.common.ring import RingData |
4086 | from swift.common import utils |
4087 | from swift.common.utils import hash_path, normalize_timestamp, mkdirs |
4088 | @@ -48,7 +48,7 @@ |
4089 | os.mkdir(self.devices_dir) |
4090 | self.sda1 = os.path.join(self.devices_dir, 'sda1') |
4091 | os.mkdir(self.sda1) |
4092 | - os.mkdir(os.path.join(self.sda1,'tmp')) |
4093 | + os.mkdir(os.path.join(self.sda1, 'tmp')) |
4094 | |
4095 | def tearDown(self): |
4096 | rmtree(self.testdir, ignore_errors=1) |
4097 | @@ -80,7 +80,7 @@ |
4098 | 'node_timeout': '15', |
4099 | }) |
4100 | cu.run_once() |
4101 | - async_dir = os.path.join(self.sda1, object_server.ASYNCDIR) |
4102 | + async_dir = os.path.join(self.sda1, object_updater.ASYNCDIR) |
4103 | os.mkdir(async_dir) |
4104 | cu.run_once() |
4105 | self.assert_(os.path.exists(async_dir)) |
4106 | @@ -103,6 +103,7 @@ |
4107 | self.assert_(os.path.exists(op_path)) |
4108 | |
4109 | bindsock = listen(('127.0.0.1', 0)) |
4110 | + |
4111 | def accepter(sock, return_code): |
4112 | try: |
4113 | with Timeout(3): |
4114 | @@ -123,6 +124,7 @@ |
4115 | except BaseException, err: |
4116 | return err |
4117 | return None |
4118 | + |
4119 | def accept(return_codes): |
4120 | codes = iter(return_codes) |
4121 | try: |
4122 | @@ -139,7 +141,8 @@ |
4123 | except BaseException, err: |
4124 | return err |
4125 | return None |
4126 | - event = spawn(accept, [201,500]) |
4127 | + |
4128 | + event = spawn(accept, [201, 500]) |
4129 | for dev in cu.get_container_ring().devs: |
4130 | if dev is not None: |
4131 | dev['port'] = bindsock.getsockname()[1] |
4132 | @@ -155,5 +158,6 @@ |
4133 | raise err |
4134 | self.assert_(not os.path.exists(op_path)) |
4135 | |
4136 | + |
4137 | if __name__ == '__main__': |
4138 | unittest.main() |
4139 | |
4140 | === modified file 'test/unit/proxy/test_server.py' |
4141 | --- test/unit/proxy/test_server.py 2010-11-05 14:47:43 +0000 |
4142 | +++ test/unit/proxy/test_server.py 2010-11-08 18:51:48 +0000 |
4143 | @@ -34,7 +34,7 @@ |
4144 | from eventlet import sleep, spawn, TimeoutError, util, wsgi, listen |
4145 | from eventlet.timeout import Timeout |
4146 | import simplejson |
4147 | -from webob import Request |
4148 | +from webob import Request, Response |
4149 | from webob.exc import HTTPUnauthorized |
4150 | |
4151 | from test.unit import connect_tcp, readuntil2crlfs |
4152 | @@ -44,7 +44,7 @@ |
4153 | from swift.obj import server as object_server |
4154 | from swift.common import ring |
4155 | from swift.common.constraints import MAX_META_NAME_LENGTH, \ |
4156 | - MAX_META_VALUE_LENGTH, MAX_META_COUNT, MAX_META_OVERALL_SIZE, MAX_FILE_SIZE |
4157 | + MAX_META_VALUE_LENGTH, MAX_META_COUNT, MAX_META_OVERALL_SIZE |
4158 | from swift.common.utils import mkdirs, normalize_timestamp, NullLogger |
4159 | |
4160 | |
4161 | @@ -53,7 +53,9 @@ |
4162 | |
4163 | |
4164 | def fake_http_connect(*code_iter, **kwargs): |
4165 | + |
4166 | class FakeConn(object): |
4167 | + |
4168 | def __init__(self, status, etag=None, body=''): |
4169 | self.status = status |
4170 | self.reason = 'Fake' |
4171 | @@ -158,6 +160,7 @@ |
4172 | |
4173 | |
4174 | class FakeMemcache(object): |
4175 | + |
4176 | def __init__(self): |
4177 | self.store = {} |
4178 | |
4179 | @@ -212,9 +215,12 @@ |
4180 | class TestProxyServer(unittest.TestCase): |
4181 | |
4182 | def test_unhandled_exception(self): |
4183 | + |
4184 | class MyApp(proxy_server.Application): |
4185 | + |
4186 | def get_controller(self, path): |
4187 | raise Exception('this shouldnt be caught') |
4188 | + |
4189 | app = MyApp(None, FakeMemcache(), account_ring=FakeRing(), |
4190 | container_ring=FakeRing(), object_ring=FakeRing()) |
4191 | req = Request.blank('/account', environ={'REQUEST_METHOD': 'HEAD'}) |
4192 | @@ -323,8 +329,11 @@ |
4193 | test_status_map((200, 200, 204, 500, 404), 503) |
4194 | |
4195 | def test_PUT_connect_exceptions(self): |
4196 | + |
4197 | def mock_http_connect(*code_iter, **kwargs): |
4198 | + |
4199 | class FakeConn(object): |
4200 | + |
4201 | def __init__(self, status): |
4202 | self.status = status |
4203 | self.reason = 'Fake' |
4204 | @@ -372,8 +381,11 @@ |
4205 | test_status_map((200, 200, 503, 503, -1), 503) |
4206 | |
4207 | def test_PUT_send_exceptions(self): |
4208 | + |
4209 | def mock_http_connect(*code_iter, **kwargs): |
4210 | + |
4211 | class FakeConn(object): |
4212 | + |
4213 | def __init__(self, status): |
4214 | self.status = status |
4215 | self.reason = 'Fake' |
4216 | @@ -430,15 +442,18 @@ |
4217 | controller = proxy_server.ObjectController(self.app, 'account', |
4218 | 'container', 'object') |
4219 | req = Request.blank('/a/c/o', {}, headers={ |
4220 | - 'Content-Length': str(MAX_FILE_SIZE + 1), |
4221 | + 'Content-Length': str(self.app.max_object_size + 1), |
4222 | 'Content-Type': 'foo/bar'}) |
4223 | self.app.update_request(req) |
4224 | res = controller.PUT(req) |
4225 | self.assertEquals(res.status_int, 413) |
4226 | |
4227 | def test_PUT_getresponse_exceptions(self): |
4228 | + |
4229 | def mock_http_connect(*code_iter, **kwargs): |
4230 | + |
4231 | class FakeConn(object): |
4232 | + |
4233 | def __init__(self, status): |
4234 | self.status = status |
4235 | self.reason = 'Fake' |
4236 | @@ -633,6 +648,7 @@ |
4237 | dev['port'] = 1 |
4238 | |
4239 | class SlowBody(): |
4240 | + |
4241 | def __init__(self): |
4242 | self.sent = 0 |
4243 | |
4244 | @@ -680,6 +696,7 @@ |
4245 | dev['port'] = 1 |
4246 | |
4247 | class SlowBody(): |
4248 | + |
4249 | def __init__(self): |
4250 | self.sent = 0 |
4251 | |
4252 | @@ -1334,7 +1351,9 @@ |
4253 | |
4254 | def test_chunked_put(self): |
4255 | # quick test of chunked put w/o PATH_TO_TEST_XFS |
4256 | + |
4257 | class ChunkedFile(): |
4258 | + |
4259 | def __init__(self, bytes): |
4260 | self.bytes = bytes |
4261 | self.read_bytes = 0 |
4262 | @@ -1375,12 +1394,13 @@ |
4263 | req.body_file = ChunkedFile(11) |
4264 | self.app.memcache.store = {} |
4265 | self.app.update_request(req) |
4266 | + orig_max_object_size = self.app.max_object_size |
4267 | try: |
4268 | - server.MAX_FILE_SIZE = 10 |
4269 | + self.app.max_object_size = 10 |
4270 | res = controller.PUT(req) |
4271 | self.assertEquals(res.status_int, 413) |
4272 | finally: |
4273 | - server.MAX_FILE_SIZE = MAX_FILE_SIZE |
4274 | + self.app.max_object_size = orig_max_object_size |
4275 | |
4276 | def test_chunked_put_and_a_bit_more(self): |
4277 | # Since we're starting up a lot here, we're going to test more than |
4278 | @@ -1495,6 +1515,7 @@ |
4279 | self.assertEquals(headers[:len(exp)], exp) |
4280 | # Check unhandled exception |
4281 | orig_update_request = prosrv.update_request |
4282 | + |
4283 | def broken_update_request(env, req): |
4284 | raise Exception('fake') |
4285 | prosrv.update_request = broken_update_request |
4286 | @@ -1545,6 +1566,7 @@ |
4287 | # in a test for logging x-forwarded-for (first entry only). |
4288 | |
4289 | class Logger(object): |
4290 | + |
4291 | def info(self, msg): |
4292 | self.msg = msg |
4293 | orig_logger = prosrv.logger |
4294 | @@ -1568,6 +1590,7 @@ |
4295 | # Turn on header logging. |
4296 | |
4297 | class Logger(object): |
4298 | + |
4299 | def info(self, msg): |
4300 | self.msg = msg |
4301 | orig_logger = prosrv.logger |
4302 | @@ -1919,6 +1942,188 @@ |
4303 | res = controller.PUT(req) |
4304 | self.assert_(called[0]) |
4305 | |
4306 | + def test_GETorHEAD1(self): |
4307 | + """ |
4308 | + Ensures we call GETorHEAD_base again without a range if we do a range |
4309 | + request and get a 416 Requested Range Not Satisfiable, just in case the |
4310 | + primary object is a manifest. |
4311 | + """ |
4312 | + called_without_range = [False] |
4313 | + controller = proxy_server.ObjectController(self.app, 'account', |
4314 | + 'container', 'object') |
4315 | + |
4316 | + def local_GETorHEAD_base(req, server_type, partition, nodes, path, |
4317 | + attempts): |
4318 | + if req.range: |
4319 | + return Response(status=416) |
4320 | + called_without_range[0] = True |
4321 | + return Response() |
4322 | + |
4323 | + controller.GETorHEAD_base = local_GETorHEAD_base |
4324 | + req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'GET'}, |
4325 | + headers={'Range': 'bytes=0-1'}) |
4326 | + controller.GETorHEAD(req) |
4327 | + self.assert_(called_without_range[0]) |
4328 | + |
4329 | + def test_GETorHEAD2(self): |
4330 | + """ |
4331 | + Ensures we call GETorHEAD_base again if the first request was a HEAD |
4332 | + and the primary object is a manifest. |
4333 | + """ |
4334 | + called_get = [False] |
4335 | + controller = proxy_server.ObjectController(self.app, 'account', |
4336 | + 'container', 'object') |
4337 | + |
4338 | + def local_GETorHEAD_base(req, server_type, partition, nodes, path, |
4339 | + attempts): |
4340 | + if req.method == 'HEAD': |
4341 | + return Response(headers={'x-object-type': 'manifest'}) |
4342 | + elif req.method == 'GET': |
4343 | + called_get[0] = True |
4344 | + return Response(headers={'x-object-type': 'manifest'}, |
4345 | + body=pickle.dumps({'content-length': 0, 'x-segment-size': 1, |
4346 | + 'x-timestamp': normalize_timestamp(2), |
4347 | + 'etag': 'd41d8cd98f00b204e9800998ecf8427e', |
4348 | + 'content-type': 'text/plain'})) |
4349 | + else: |
4350 | + raise Exception('Unexpected method %s' % req.method) |
4351 | + |
4352 | + controller.GETorHEAD_base = local_GETorHEAD_base |
4353 | + req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'HEAD'}) |
4354 | + controller.GETorHEAD(req) |
4355 | + self.assert_(called_get[0]) |
4356 | + |
4357 | + def test_GETorHEAD3(self): |
4358 | + """ |
4359 | + Ensures we call GETorHEAD_base again without a range if the first |
4360 | + request was a ranged GET that succeeded and the primary object is a
4361 | + manifest. |
4362 | + """ |
4363 | + called_without_range = [False] |
4364 | + controller = proxy_server.ObjectController(self.app, 'account', |
4365 | + 'container', 'object') |
4366 | + |
4367 | + def local_GETorHEAD_base(req, server_type, partition, nodes, path, |
4368 | + attempts): |
4369 | + if not req.range: |
4370 | + called_without_range[0] = True |
4371 | + return Response(headers={'x-object-type': 'manifest'}, |
4372 | + body=pickle.dumps({'content-length': 0, 'x-segment-size': 1, |
4373 | + 'x-timestamp': normalize_timestamp(2), |
4374 | + 'etag': 'd41d8cd98f00b204e9800998ecf8427e', |
4375 | + 'content-type': 'text/plain'})) |
4376 | + |
4377 | + controller.GETorHEAD_base = local_GETorHEAD_base |
4378 | + req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'GET'}, |
4379 | + headers={'Range': 'bytes=0-1'}) |
4380 | + controller.GETorHEAD(req) |
4381 | + self.assert_(called_without_range[0]) |
4382 | + |
4383 | + def test_PUT_segmented_object1(self): |
4384 | + with save_globals(): |
4385 | + proxy_server.http_connect = \ |
4386 | + fake_http_connect(200, 200, # account, container checks |
4387 | + 201, 201, 201, # segment 0 |
4388 | + 201, 201, 201, # segment 1 |
4389 | + 201, 201, 201, # segment 2 |
4390 | + 201, 201, 201) # manifest |
4391 | + controller = proxy_server.ObjectController(self.app, 'account', |
4392 | + 'container', 'object') |
4393 | + self.app.segment_size = 2 |
4394 | + req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT'}, |
4395 | + body='12345') |
4396 | + resp = controller.PUT(req) |
4397 | + self.assertEquals(resp.status_int, 201) |
4398 | + self.assertEquals(req.bytes_transferred, 5) |
4399 | + |
4400 | + def test_PUT_segmented_object2(self): |
4401 | + """ Same as 1, just with a chunky data source. """ |
4402 | + with save_globals(): |
4403 | + proxy_server.http_connect = \ |
4404 | + fake_http_connect(200, 200, # account, container checks |
4405 | + 201, 201, 201, # segment 0 |
4406 | + 201, 201, 201, # segment 1 |
4407 | + 201, 201, 201, # segment 2 |
4408 | + 201, 201, 201) # manifest |
4409 | + controller = proxy_server.ObjectController(self.app, 'account', |
4410 | + 'container', 'object') |
4411 | + self.app.segment_size = 2 |
4412 | + |
4413 | + class ChunkedReader(object): |
4414 | + |
4415 | + def __init__(self): |
4416 | + self.chunk = 0 |
4417 | + |
4418 | + def read(self, size): |
4419 | + self.chunk += 1 |
4420 | + if self.chunk == 1: |
4421 | + return '123' |
4422 | + elif self.chunk == 2: |
4423 | + return '45' |
4424 | + else: |
4425 | + return '' |
4426 | + |
4427 | + req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT', |
4428 | + 'wsgi.input': ChunkedReader()}, headers={'Content-Length': '5'}) |
4429 | + resp = controller.PUT(req) |
4430 | + self.assertEquals(resp.status_int, 201) |
4431 | + self.assertEquals(req.bytes_transferred, 5) |
4432 | + |
4433 | + def test_PUT_segmented_object3(self): |
4434 | + """ Failed segment PUT. """ |
4435 | + with save_globals(): |
4436 | + proxy_server.http_connect = \ |
4437 | + fake_http_connect(200, 200, # account, container checks |
4438 | + 201, 201, 201, # segment 0 |
4439 | + 201, 500, 500, # segment 1 |
4440 | + 201, 201, 201, # segment 2 |
4441 | + 201, 201, 201) # manifest |
4442 | + controller = proxy_server.ObjectController(self.app, 'account', |
4443 | + 'container', 'object') |
4444 | + self.app.segment_size = 2 |
4445 | + req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT'}, |
4446 | + body='12345') |
4447 | + resp = controller.PUT(req) |
4448 | + self.assertEquals(resp.status_int, 503) |
4449 | + self.assertEquals(resp.body.strip(), |
4450 | + 'Unable to complete very large file operation.') |
4451 | + |
4452 | + def test_PUT_segmented_object4(self): |
4453 | + """ Non-matching etag sent. """ |
4454 | + with save_globals(): |
4455 | + proxy_server.http_connect = \ |
4456 | + fake_http_connect(200, 200, # account, container checks |
4457 | + 201, 201, 201, # segment 0 |
4458 | + 201, 201, 201, # segment 1 |
4459 | + 201, 201, 201, # segment 2 |
4460 | + 201, 201, 201) # manifest |
4461 | + controller = proxy_server.ObjectController(self.app, 'account', |
4462 | + 'container', 'object') |
4463 | + self.app.segment_size = 2 |
4464 | + req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT'}, |
4465 | + body='12345', headers={'ETag': 'abc'}) |
4466 | + resp = controller.PUT(req) |
4467 | + self.assertEquals(resp.status_int, 422) |
4468 | + |
4469 | + def test_PUT_segmented_object5(self): |
4470 | + """ Failed manifest PUT. """ |
4471 | + with save_globals(): |
4472 | + proxy_server.http_connect = \ |
4473 | + fake_http_connect(200, 200, # account, container checks |
4474 | + 201, 201, 201, # segment 0 |
4475 | + 201, 201, 201, # segment 1 |
4476 | + 201, 201, 201, # segment 2 |
4477 | + 201, 500, 500) # manifest |
4478 | + controller = proxy_server.ObjectController(self.app, 'account', |
4479 | + 'container', 'object') |
4480 | + self.app.segment_size = 2 |
4481 | + req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT'}, |
4482 | + body='12345') |
4483 | + resp = controller.PUT(req) |
4484 | + self.assertEquals(resp.status_int, 503) |
4485 | + self.assertEquals(resp.body.strip(), |
4486 | + 'Unable to complete very large file operation.') |
4487 | + |
4488 | def test_COPY_calls_authorize(self): |
4489 | called = [False] |
4490 | |
4491 | @@ -2080,7 +2285,9 @@ |
4492 | self.assertEquals(resp.status_int, 404) |
4493 | |
4494 | def test_put_locking(self): |
4495 | + |
4496 | class MockMemcache(FakeMemcache): |
4497 | + |
4498 | def __init__(self, allow_lock=None): |
4499 | self.allow_lock = allow_lock |
4500 | super(MockMemcache, self).__init__() |
4501 | @@ -2669,5 +2876,142 @@ |
4502 | self.assertEquals(resp.status_int, 400) |
4503 | |
4504 | |
4505 | +class TestSegmentedIterable(unittest.TestCase): |
4506 | + |
4507 | + def setUp(self): |
4508 | + self.app = proxy_server.Application(None, FakeMemcache(), |
4509 | + account_ring=FakeRing(), container_ring=FakeRing(), |
4510 | + object_ring=FakeRing()) |
4511 | + self.controller = proxy_server.ObjectController(self.app, 'account', |
4512 | + 'container', 'object') |
4513 | + |
4514 | + def test_zero_bytes(self): |
4515 | + si = proxy_server.SegmentedIterable(self.controller, 0, 2, |
4516 | + normalize_timestamp(1)) |
4517 | + self.assertEquals(''.join(iter(si)), '') |
4518 | + |
4519 | + def test_happy_path(self): |
4520 | + segment = [0] |
4521 | + |
4522 | + def give_connect(*args, **kwargs): |
4523 | + self.assertEquals(int(kwargs['headers']['X-Object-Segment']), |
4524 | + segment[0]) |
4525 | + segment[0] += 1 |
4526 | + |
4527 | + with save_globals(): |
4528 | + proxy_server.http_connect = fake_http_connect(200, 200, 200, |
4529 | + body='12', give_connect=give_connect) |
4530 | + si = proxy_server.SegmentedIterable(self.controller, 5, 2, |
4531 | + normalize_timestamp(1)) |
4532 | + self.assertEquals(''.join(iter(si)), '12121') |
4533 | + self.assertEquals(segment[0], 3) |
4534 | + |
4535 | + def test_not_found_start(self): |
4536 | + with save_globals(): |
4537 | + proxy_server.http_connect = \ |
4538 | + fake_http_connect(404, 404, 404, 200, 200, 200, body='12') |
4539 | + si = proxy_server.SegmentedIterable(self.controller, 5, 2, |
4540 | + normalize_timestamp(1)) |
4541 | + exc = None |
4542 | + try: |
4543 | + for chunk in si: |
4544 | + raise Exception('Got data when we should not have.') |
4545 | + except Exception, err: |
4546 | + exc = err |
4547 | + self.assertEquals(str(exc), |
4548 | + 'Could not load segment 0 of /account/container/object') |
4549 | + |
4550 | + def test_not_found_after_start(self): |
4551 | + with save_globals(): |
4552 | + proxy_server.http_connect = \ |
4553 | + fake_http_connect(200, 404, 404, 404, 200, 200, body='12') |
4554 | + si = proxy_server.SegmentedIterable(self.controller, 5, 2, |
4555 | + normalize_timestamp(1)) |
4556 | + exc = None |
4557 | + try: |
4558 | + for chunk in si: |
4559 | + self.assertEquals(chunk, '12') |
4560 | + except Exception, err: |
4561 | + exc = err |
4562 | + self.assertEquals(str(exc), |
4563 | + 'Could not load segment 1 of /account/container/object') |
4564 | + |
4565 | + def test_partial_not_found(self): |
4566 | + with save_globals(): |
4567 | + proxy_server.http_connect = \ |
4568 | + fake_http_connect(404, 200, 404, 404, 200, 200, body='12') |
4569 | + si = proxy_server.SegmentedIterable(self.controller, 5, 2, |
4570 | + normalize_timestamp(1)) |
4571 | + self.assertEquals(''.join(iter(si)), '12121') |
4572 | + |
4573 | + def test_bytes_transferred(self): |
4574 | + with save_globals(): |
4575 | + proxy_server.http_connect = \ |
4576 | + fake_http_connect(200, 200, 200, body='12') |
4577 | + si = proxy_server.SegmentedIterable(self.controller, 5, 2, |
4578 | + normalize_timestamp(1)) |
4579 | + |
4580 | + class Stub(object): |
4581 | + pass |
4582 | + |
4583 | + si.response = Stub() |
4584 | + self.assertEquals(''.join(iter(si)), '12121') |
4585 | + self.assertEquals(si.response.bytes_transferred, 5) |
4586 | + |
4587 | + def test_bytes_transferred_app_iter_range(self): |
4588 | + with save_globals(): |
4589 | + proxy_server.http_connect = \ |
4590 | + fake_http_connect(200, 200, 200, body='12') |
4591 | + si = proxy_server.SegmentedIterable(self.controller, 5, 2, |
4592 | + normalize_timestamp(1)) |
4593 | + |
4594 | + class Stub(object): |
4595 | + pass |
4596 | + |
4597 | + si.response = Stub() |
4598 | + self.assertEquals(''.join(si.app_iter_range(1, 3)), '212') |
4599 | + self.assertEquals(si.response.bytes_transferred, 3) |
4600 | + |
4601 | + def test_app_iter_range_past_end(self): |
4602 | + with save_globals(): |
4603 | + proxy_server.http_connect = \ |
4604 | + fake_http_connect(200, 200, 200, body='12') |
4605 | + si = proxy_server.SegmentedIterable(self.controller, 5, 2, |
4606 | + normalize_timestamp(1)) |
4607 | + self.assertEquals(''.join(si.app_iter_range(1, 30)), '2121') |
4608 | + |
4609 | + def test_app_iter_range_start_past_end(self): |
4610 | + with save_globals(): |
4611 | + proxy_server.http_connect = \ |
4612 | + fake_http_connect(200, 200, 200, body='12') |
4613 | + si = proxy_server.SegmentedIterable(self.controller, 5, 2, |
4614 | + normalize_timestamp(1)) |
4615 | + self.assertEquals(''.join(si.app_iter_range(30, 31)), '') |
4616 | + |
4617 | + def test_app_iter_range_to_end(self): |
4618 | + with save_globals(): |
4619 | + proxy_server.http_connect = \ |
4620 | + fake_http_connect(200, 200, 200, body='12') |
4621 | + si = proxy_server.SegmentedIterable(self.controller, 5, 2, |
4622 | + normalize_timestamp(1)) |
4623 | + self.assertEquals(''.join(si.app_iter_range(3, None)), '21') |
4624 | + |
4625 | + def test_app_iter_range_to_an_end(self): |
4626 | + with save_globals(): |
4627 | + proxy_server.http_connect = \ |
4628 | + fake_http_connect(200, 200, 200, body='12') |
4629 | + si = proxy_server.SegmentedIterable(self.controller, 5, 2, |
4630 | + normalize_timestamp(1)) |
4631 | + self.assertEquals(''.join(si.app_iter_range(None, 3)), '121') |
4632 | + |
4633 | + def test_app_iter_range_full(self): |
4634 | + with save_globals(): |
4635 | + proxy_server.http_connect = \ |
4636 | + fake_http_connect(200, 200, 200, body='12') |
4637 | + si = proxy_server.SegmentedIterable(self.controller, 5, 2, |
4638 | + normalize_timestamp(1)) |
4639 | + self.assertEquals(''.join(si.app_iter_range(None, None)), '12121') |
4640 | + |
4641 | + |
4642 | if __name__ == '__main__': |
4643 | unittest.main() |
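The SegmentedIterable tests above all lean on one piece of arithmetic: an object of content length L stored with segment size S occupies ceil(L / S) segments, fetched in order as X-Object-Segment 0, 1, 2, and so on. A standalone sketch of that layout, illustrative only and not code from the branch:

    import math

    def segment_plan(content_length, segment_size):
        # Yields (segment_number, offset, length) for each segment.
        count = int(math.ceil(content_length / float(segment_size)))
        for seg in range(count):
            offset = seg * segment_size
            yield seg, offset, min(segment_size, content_length - offset)

    # Matches test_happy_path: 5 bytes at segment size 2 -> three fetches,
    # which is why ''.join(iter(si)) yields '12', '12', then only the first
    # byte of the final two-byte fake body, giving '12121'.
    assert list(segment_plan(5, 2)) == [(0, 0, 2), (1, 2, 2), (2, 4, 1)]
    # Matches test_zero_bytes: a zero-length object needs no segment GETs.
    assert list(segment_plan(0, 2)) == []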
After a thorough discussion, this idea of transparent large object support will be dropped in favor of a combined client- and cluster-side solution.
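For context on that decision: a client-driven design needs no special object types on the server at all. The sketch below is purely hypothetical; the function, names, segment size, and JSON manifest layout are invented for illustration and are not from this branch or any later Swift API:

    import json

    SEGMENT_SIZE = 5 * 1024 * 1024  # hypothetical 5 MiB segment size

    def upload_large_object(put, name, fileobj):
        # put(name, data) stands in for an ordinary object PUT.
        segment_names = []
        index = 0
        while True:
            chunk = fileobj.read(SEGMENT_SIZE)
            if not chunk:
                break
            segment_name = '%s/%08d' % (name, index)
            put(segment_name, chunk)
            segment_names.append(segment_name)
            index += 1
        # The manifest is itself an ordinary (small) object; a reader
        # fetches it first, then streams the named segments in order.
        put(name, json.dumps({'segments': segment_names}))

Each segment is a plain object, so the existing replication, auditing, and deletion paths apply to it unchanged, which is the main appeal over the server-transparent approach in this branch.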