Merge lp:~gholt/swift/largefiles into lp:~hudson-openstack/swift/trunk
Status: Rejected
Rejected by: gholt
Proposed branch: lp:~gholt/swift/largefiles
Merge into: lp:~hudson-openstack/swift/trunk
Diff against target: 1491 lines (+930/-231), 10 files modified
  swift/common/constraints.py (+8/-2)
  swift/obj/auditor.py (+1/-1)
  swift/obj/hashes.py (+179/-0)
  swift/obj/hashes.py.moved (+179/-0)
  swift/obj/replicator.py (+37/-177)
  swift/obj/server.py (+81/-30)
  swift/proxy/server.py (+387/-21)
  test/unit/__init__.py (+2/-0)
  test/unit/obj/test_hashes.py (+28/-0)
  test/unit/obj/test_hashes.py.moved (+28/-0)
To merge this branch: bzr merge lp:~gholt/swift/largefiles
Related bugs: none

Reviewer | Review Type | Date Requested | Status
---|---|---|---
gholt (community) | Disapprove | |

Review via email: mp+35493@code.launchpad.net
Commit message
Description of the change
Support for very large files by breaking them into segments and distributing the segments across the cluster.
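The placement scheme, as implemented in the preview diff below (`DiskFile` in swift/obj/server.py, `SegmentedIterable.load_next_segment` and `PUT_segmented_object` in swift/proxy/server.py): segment 0 keeps the object's own ring name, while each later segment is hashed under a name built from the object name, the PUT's X-Timestamp, and the segment number, so segments scatter across the cluster and a re-upload cannot collide with an older upload's segments. Here is a minimal sketch of that naming rule for the non-chunked case (Content-Length known up front); the helper `segment_ring_names`, the object name `myobj`, and the tiny 1024-byte `SEGMENT_SIZE` (the testing value from swift/common/constraints.py) are illustrative, not code from the branch:

```python
# Illustrative sketch only -- segment_ring_names is a hypothetical helper,
# not part of the branch. It mirrors the naming the diff uses in DiskFile
# and SegmentedIterable: '%s/%s/%s' % (obj, timestamp, segment) when
# segment > 0, and the plain object name for segment 0.
SEGMENT_SIZE = 1024  # deliberately small testing value from the diff


def segment_ring_names(obj, put_timestamp, content_length,
                       segment_size=SEGMENT_SIZE):
    """Yield the ring object name for each segment of a segmented PUT."""
    # Round up so a trailing partial segment still gets a name.
    segments = max(1, (content_length + segment_size - 1) // segment_size)
    for segment in xrange(segments):
        if segment:
            # Later segments hash differently, so they land on other
            # partitions; the timestamp in the name keeps them from
            # overwriting an earlier upload's segments.
            yield '%s/%s/%s' % (obj, put_timestamp, segment)
        else:
            # Segment 0 keeps the object's own name and partition.
            yield obj


# A 3000-byte PUT at X-Timestamp 1287513829.00000 yields three names:
#   myobj
#   myobj/1287513829.00000/1
#   myobj/1287513829.00000/2
for name in segment_ring_names('myobj', '1287513829.00000', 3000):
    print name
```

On GET, the proxy recognizes a manifest by its X-Object-Type header and, in `GETorHEAD_segmented`, reads the manifest's JSON body (content-length, x-segment-size, x-timestamp, etag, content-type) to stream the segments back in order through `SegmentedIterable`.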
- 72. By gholt: Fixed typo
- 73. By gholt: Merged changes from trunk
- 74. By gholt: very-large-files: now distributes segments across cluster and will not overwrite existing segments during the upload
- 75. By gholt: Updating object auditor to clean up old orphaned large file segments
- 76. By gholt: Merged changes from trunk
- 77. By gholt: Merged changes from trunk
- 78. By gholt: Merged from trunk
- 79. By gholt: Renovation to store object segments separate from whole objects
- 80. By gholt: Just a bit of DRY
- 81. By gholt: Support very large chunked transfer encoding uploads
- 82. By gholt: Update import and new test stub for moved obj hashes code
- 83. By gholt: Merge from trunk
- 84. By gholt: Merged from trunk
- 85. By gholt: Merged from trunk
- 86. By gholt: Merged with trunk
- 87. By gholt: Merged from trunk
- 88. By gholt: Refactor of container_updater calling; removed outdated imports
- 89. By gholt: Merged with refactorhashes
Unmerged revisions
- 91. By gholt: Merged from refactorobjasync
- 90. By gholt: Merge from trunk
- 89. By gholt: Merged with refactorhashes
- 88. By gholt: Refactor of container_updater calling; removed outdated imports
- 87. By gholt: Merged from trunk
- 86. By gholt: Merged with trunk
- 85. By gholt: Merged from trunk
- 84. By gholt: Merged from trunk
- 83. By gholt: Merge from trunk
- 82. By gholt: Update import and new test stub for moved obj hashes code
Preview Diff
1 | === modified file 'swift/common/constraints.py' |
2 | --- swift/common/constraints.py 2010-09-22 19:53:38 +0000 |
3 | +++ swift/common/constraints.py 2010-10-19 18:43:49 +0000 |
4 | @@ -19,8 +19,12 @@ |
5 | HTTPRequestEntityTooLarge |
6 | |
7 | |
8 | +# FIXME: This will get bumped way up once very large file support is added. |
9 | #: Max file size allowed for objects |
10 | MAX_FILE_SIZE = 5 * 1024 * 1024 * 1024 + 2 |
11 | +# FIXME: Yeah, I know this is a silly low value; just for testing right now. |
12 | +#: File size for segments of large objects |
13 | +SEGMENT_SIZE = 1024 |
14 | #: Max length of the name of a key for metadata |
15 | MAX_META_NAME_LENGTH = 128 |
16 | #: Max length of the value of a key for metadata |
17 | @@ -29,14 +33,16 @@ |
18 | MAX_META_COUNT = 90 |
19 | #: Max overall size of metadata |
20 | MAX_META_OVERALL_SIZE = 4096 |
21 | +#: Max account name length |
22 | +MAX_ACCOUNT_NAME_LENGTH = 256 |
23 | +#: Max container name length |
24 | +MAX_CONTAINER_NAME_LENGTH = 256 |
25 | #: Max object name length |
26 | MAX_OBJECT_NAME_LENGTH = 1024 |
27 | #: Max object list length of a get request for a container |
28 | CONTAINER_LISTING_LIMIT = 10000 |
29 | #: Max container list length of a get request for an account |
30 | ACCOUNT_LISTING_LIMIT = 10000 |
31 | -MAX_ACCOUNT_NAME_LENGTH = 256 |
32 | -MAX_CONTAINER_NAME_LENGTH = 256 |
33 | |
34 | |
35 | def check_metadata(req, target_type): |
36 | |
37 | === modified file 'swift/obj/auditor.py' |
38 | --- swift/obj/auditor.py 2010-10-18 22:30:26 +0000 |
39 | +++ swift/obj/auditor.py 2010-10-19 18:43:49 +0000 |
40 | @@ -19,7 +19,7 @@ |
41 | from random import random |
42 | |
43 | from swift.obj import server as object_server |
44 | -from swift.obj.replicator import invalidate_hash |
45 | +from swift.obj.hashes import invalidate_hash |
46 | from swift.common.utils import get_logger, renamer, audit_location_generator |
47 | from swift.common.exceptions import AuditException |
48 | from swift.common.daemon import Daemon |
49 | |
50 | === added file 'swift/obj/hashes.py' |
51 | --- swift/obj/hashes.py 1970-01-01 00:00:00 +0000 |
52 | +++ swift/obj/hashes.py 2010-10-19 18:43:49 +0000 |
53 | @@ -0,0 +1,179 @@ |
54 | +# Copyright (c) 2010 OpenStack, LLC. |
55 | +# |
56 | +# Licensed under the Apache License, Version 2.0 (the "License"); |
57 | +# you may not use this file except in compliance with the License. |
58 | +# You may obtain a copy of the License at |
59 | +# |
60 | +# http://www.apache.org/licenses/LICENSE-2.0 |
61 | +# |
62 | +# Unless required by applicable law or agreed to in writing, software |
63 | +# distributed under the License is distributed on an "AS IS" BASIS, |
64 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
65 | +# implied. |
66 | +# See the License for the specific language governing permissions and |
67 | +# limitations under the License. |
68 | + |
69 | +import cPickle as pickle |
70 | +import hashlib |
71 | +import logging, os, time |
72 | +from os.path import isdir, join |
73 | + |
74 | +from eventlet import tpool, sleep |
75 | + |
76 | +from swift.common.utils import lock_path, renamer |
77 | + |
78 | + |
79 | +PICKLE_PROTOCOL = 2 |
80 | +ONE_WEEK = 604800 |
81 | +HASH_FILE = 'hashes.pkl' |
82 | + |
83 | + |
84 | +def hash_suffix(path, reclaim_age): |
85 | + """ |
86 | + Performs reclamation and returns an md5 of all (remaining) files. |
87 | + |
88 | + :param reclaim_age: age in seconds at which to remove tombstones |
89 | + """ |
90 | + md5 = hashlib.md5() |
91 | + for hsh in sorted(os.listdir(path)): |
92 | + hsh_path = join(path, hsh) |
93 | + files = os.listdir(hsh_path) |
94 | + if len(files) == 1: |
95 | + if files[0].endswith('.ts'): |
96 | + # remove tombstones older than reclaim_age |
97 | + ts = files[0].rsplit('.', 1)[0] |
98 | + if (time.time() - float(ts)) > reclaim_age: |
99 | + os.unlink(join(hsh_path, files[0])) |
100 | + files.remove(files[0]) |
101 | + elif files: |
102 | + files.sort(reverse=True) |
103 | + meta = data = tomb = None |
104 | + for filename in files: |
105 | + if not meta and filename.endswith('.meta'): |
106 | + meta = filename |
107 | + if not data and filename.endswith('.data'): |
108 | + data = filename |
109 | + if not tomb and filename.endswith('.ts'): |
110 | + tomb = filename |
111 | + if (filename < tomb or # any file older than tomb |
112 | + filename < data or # any file older than data |
113 | + (filename.endswith('.meta') and |
114 | + filename < meta)): # old meta |
115 | + os.unlink(join(hsh_path, filename)) |
116 | + files.remove(filename) |
117 | + if not files: |
118 | + os.rmdir(hsh_path) |
119 | + for filename in files: |
120 | + md5.update(filename) |
121 | + try: |
122 | + os.rmdir(path) |
123 | + except OSError: |
124 | + pass |
125 | + return md5.hexdigest() |
126 | + |
127 | + |
128 | +def recalculate_hashes(partition_dir, suffixes, reclaim_age=ONE_WEEK): |
129 | + """ |
130 | + Recalculates hashes for the given suffixes in the partition and updates |
131 | + them in the partition's hashes file. |
132 | + |
133 | + :param partition_dir: directory of the partition in which to recalculate |
134 | + :param suffixes: list of suffixes to recalculate |
135 | + :param reclaim_age: age in seconds at which tombstones should be removed |
136 | + """ |
137 | + |
138 | + def tpool_listdir(partition_dir): |
139 | + return dict(((suff, None) for suff in os.listdir(partition_dir) |
140 | + if len(suff) == 3 and isdir(join(partition_dir, suff)))) |
141 | + hashes_file = join(partition_dir, HASH_FILE) |
142 | + with lock_path(partition_dir): |
143 | + try: |
144 | + with open(hashes_file, 'rb') as fp: |
145 | + hashes = pickle.load(fp) |
146 | + except Exception: |
147 | + hashes = tpool.execute(tpool_listdir, partition_dir) |
148 | + for suffix in suffixes: |
149 | + suffix_dir = join(partition_dir, suffix) |
150 | + if os.path.exists(suffix_dir): |
151 | + hashes[suffix] = hash_suffix(suffix_dir, reclaim_age) |
152 | + elif suffix in hashes: |
153 | + del hashes[suffix] |
154 | + with open(hashes_file + '.tmp', 'wb') as fp: |
155 | + pickle.dump(hashes, fp, PICKLE_PROTOCOL) |
156 | + renamer(hashes_file + '.tmp', hashes_file) |
157 | + |
158 | + |
159 | +def invalidate_hash(suffix_dir): |
160 | + """ |
161 | + Invalidates the hash for a suffix_dir in the partition's hashes file. |
162 | + |
163 | + :param suffix_dir: absolute path to suffix dir whose hash needs |
164 | + invalidating |
165 | + """ |
166 | + |
167 | + suffix = os.path.basename(suffix_dir) |
168 | + partition_dir = os.path.dirname(suffix_dir) |
169 | + hashes_file = join(partition_dir, HASH_FILE) |
170 | + with lock_path(partition_dir): |
171 | + try: |
172 | + with open(hashes_file, 'rb') as fp: |
173 | + hashes = pickle.load(fp) |
174 | + if suffix in hashes and not hashes[suffix]: |
175 | + return |
176 | + except Exception: |
177 | + return |
178 | + hashes[suffix] = None |
179 | + with open(hashes_file + '.tmp', 'wb') as fp: |
180 | + pickle.dump(hashes, fp, PICKLE_PROTOCOL) |
181 | + renamer(hashes_file + '.tmp', hashes_file) |
182 | + |
183 | + |
184 | +def get_hashes(partition_dir, do_listdir=True, reclaim_age=ONE_WEEK): |
185 | + """ |
186 | + Get a list of hashes for the suffix dir. do_listdir causes it to mistrust |
187 | + the hash cache for suffix existence at the (unexpectedly high) cost of a |
188 | + listdir. reclaim_age is just passed on to hash_suffix. |
189 | + |
190 | + :param partition_dir: absolute path of partition to get hashes for |
191 | + :param do_listdir: force existence check for all hashes in the partition |
192 | + :param reclaim_age: age at which to remove tombstones |
193 | + |
194 | + :returns: tuple of (number of suffix dirs hashed, dictionary of hashes) |
195 | + """ |
196 | + |
197 | + def tpool_listdir(hashes, partition_dir): |
198 | + return dict(((suff, hashes.get(suff, None)) |
199 | + for suff in os.listdir(partition_dir) |
200 | + if len(suff) == 3 and isdir(join(partition_dir, suff)))) |
201 | + hashed = 0 |
202 | + hashes_file = join(partition_dir, HASH_FILE) |
203 | + with lock_path(partition_dir): |
204 | + modified = False |
205 | + hashes = {} |
206 | + try: |
207 | + with open(hashes_file, 'rb') as fp: |
208 | + hashes = pickle.load(fp) |
209 | + except Exception: |
210 | + do_listdir = True |
211 | + if do_listdir: |
212 | + hashes = tpool.execute(tpool_listdir, hashes, partition_dir) |
213 | + modified = True |
214 | + for suffix, hash_ in hashes.items(): |
215 | + if not hash_: |
216 | + suffix_dir = join(partition_dir, suffix) |
217 | + if os.path.exists(suffix_dir): |
218 | + try: |
219 | + hashes[suffix] = hash_suffix(suffix_dir, reclaim_age) |
220 | + hashed += 1 |
221 | + except OSError: |
222 | + logging.exception('Error hashing suffix') |
223 | + hashes[suffix] = None |
224 | + else: |
225 | + del hashes[suffix] |
226 | + modified = True |
227 | + sleep() |
228 | + if modified: |
229 | + with open(hashes_file + '.tmp', 'wb') as fp: |
230 | + pickle.dump(hashes, fp, PICKLE_PROTOCOL) |
231 | + renamer(hashes_file + '.tmp', hashes_file) |
232 | + return hashed, hashes |
233 | |
234 | === added file 'swift/obj/hashes.py.moved' |
235 | --- swift/obj/hashes.py.moved 1970-01-01 00:00:00 +0000 |
236 | +++ swift/obj/hashes.py.moved 2010-10-19 18:43:49 +0000 |
237 | @@ -0,0 +1,179 @@ |
238 | +# Copyright (c) 2010 OpenStack, LLC. |
239 | +# |
240 | +# Licensed under the Apache License, Version 2.0 (the "License"); |
241 | +# you may not use this file except in compliance with the License. |
242 | +# You may obtain a copy of the License at |
243 | +# |
244 | +# http://www.apache.org/licenses/LICENSE-2.0 |
245 | +# |
246 | +# Unless required by applicable law or agreed to in writing, software |
247 | +# distributed under the License is distributed on an "AS IS" BASIS, |
248 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
249 | +# implied. |
250 | +# See the License for the specific language governing permissions and |
251 | +# limitations under the License. |
252 | + |
253 | +import cPickle as pickle |
254 | +import hashlib |
255 | +import logging, os, time |
256 | +from os.path import isdir, join |
257 | + |
258 | +from eventlet import tpool, sleep |
259 | + |
260 | +from swift.common.utils import lock_path, renamer |
261 | + |
262 | + |
263 | +PICKLE_PROTOCOL = 2 |
264 | +ONE_WEEK = 604800 |
265 | +HASH_FILE = 'hashes.pkl' |
266 | + |
267 | + |
268 | +def hash_suffix(path, reclaim_age): |
269 | + """ |
270 | + Performs reclamation and returns an md5 of all (remaining) files. |
271 | + |
272 | + :param reclaim_age: age in seconds at which to remove tombstones |
273 | + """ |
274 | + md5 = hashlib.md5() |
275 | + for hsh in sorted(os.listdir(path)): |
276 | + hsh_path = join(path, hsh) |
277 | + files = os.listdir(hsh_path) |
278 | + if len(files) == 1: |
279 | + if files[0].endswith('.ts'): |
280 | + # remove tombstones older than reclaim_age |
281 | + ts = files[0].rsplit('.', 1)[0] |
282 | + if (time.time() - float(ts)) > reclaim_age: |
283 | + os.unlink(join(hsh_path, files[0])) |
284 | + files.remove(files[0]) |
285 | + elif files: |
286 | + files.sort(reverse=True) |
287 | + meta = data = tomb = None |
288 | + for filename in files: |
289 | + if not meta and filename.endswith('.meta'): |
290 | + meta = filename |
291 | + if not data and filename.endswith('.data'): |
292 | + data = filename |
293 | + if not tomb and filename.endswith('.ts'): |
294 | + tomb = filename |
295 | + if (filename < tomb or # any file older than tomb |
296 | + filename < data or # any file older than data |
297 | + (filename.endswith('.meta') and |
298 | + filename < meta)): # old meta |
299 | + os.unlink(join(hsh_path, filename)) |
300 | + files.remove(filename) |
301 | + if not files: |
302 | + os.rmdir(hsh_path) |
303 | + for filename in files: |
304 | + md5.update(filename) |
305 | + try: |
306 | + os.rmdir(path) |
307 | + except OSError: |
308 | + pass |
309 | + return md5.hexdigest() |
310 | + |
311 | + |
312 | +def recalculate_hashes(partition_dir, suffixes, reclaim_age=ONE_WEEK): |
313 | + """ |
314 | + Recalculates hashes for the given suffixes in the partition and updates |
315 | + them in the partition's hashes file. |
316 | + |
317 | + :param partition_dir: directory of the partition in which to recalculate |
318 | + :param suffixes: list of suffixes to recalculate |
319 | + :param reclaim_age: age in seconds at which tombstones should be removed |
320 | + """ |
321 | + |
322 | + def tpool_listdir(partition_dir): |
323 | + return dict(((suff, None) for suff in os.listdir(partition_dir) |
324 | + if len(suff) == 3 and isdir(join(partition_dir, suff)))) |
325 | + hashes_file = join(partition_dir, HASH_FILE) |
326 | + with lock_path(partition_dir): |
327 | + try: |
328 | + with open(hashes_file, 'rb') as fp: |
329 | + hashes = pickle.load(fp) |
330 | + except Exception: |
331 | + hashes = tpool.execute(tpool_listdir, partition_dir) |
332 | + for suffix in suffixes: |
333 | + suffix_dir = join(partition_dir, suffix) |
334 | + if os.path.exists(suffix_dir): |
335 | + hashes[suffix] = hash_suffix(suffix_dir, reclaim_age) |
336 | + elif suffix in hashes: |
337 | + del hashes[suffix] |
338 | + with open(hashes_file + '.tmp', 'wb') as fp: |
339 | + pickle.dump(hashes, fp, PICKLE_PROTOCOL) |
340 | + renamer(hashes_file + '.tmp', hashes_file) |
341 | + |
342 | + |
343 | +def invalidate_hash(suffix_dir): |
344 | + """ |
345 | + Invalidates the hash for a suffix_dir in the partition's hashes file. |
346 | + |
347 | + :param suffix_dir: absolute path to suffix dir whose hash needs |
348 | + invalidating |
349 | + """ |
350 | + |
351 | + suffix = os.path.basename(suffix_dir) |
352 | + partition_dir = os.path.dirname(suffix_dir) |
353 | + hashes_file = join(partition_dir, HASH_FILE) |
354 | + with lock_path(partition_dir): |
355 | + try: |
356 | + with open(hashes_file, 'rb') as fp: |
357 | + hashes = pickle.load(fp) |
358 | + if suffix in hashes and not hashes[suffix]: |
359 | + return |
360 | + except Exception: |
361 | + return |
362 | + hashes[suffix] = None |
363 | + with open(hashes_file + '.tmp', 'wb') as fp: |
364 | + pickle.dump(hashes, fp, PICKLE_PROTOCOL) |
365 | + renamer(hashes_file + '.tmp', hashes_file) |
366 | + |
367 | + |
368 | +def get_hashes(partition_dir, do_listdir=True, reclaim_age=ONE_WEEK): |
369 | + """ |
370 | + Get a list of hashes for the suffix dir. do_listdir causes it to mistrust |
371 | + the hash cache for suffix existence at the (unexpectedly high) cost of a |
372 | + listdir. reclaim_age is just passed on to hash_suffix. |
373 | + |
374 | + :param partition_dir: absolute path of partition to get hashes for |
375 | + :param do_listdir: force existence check for all hashes in the partition |
376 | + :param reclaim_age: age at which to remove tombstones |
377 | + |
378 | + :returns: tuple of (number of suffix dirs hashed, dictionary of hashes) |
379 | + """ |
380 | + |
381 | + def tpool_listdir(hashes, partition_dir): |
382 | + return dict(((suff, hashes.get(suff, None)) |
383 | + for suff in os.listdir(partition_dir) |
384 | + if len(suff) == 3 and isdir(join(partition_dir, suff)))) |
385 | + hashed = 0 |
386 | + hashes_file = join(partition_dir, HASH_FILE) |
387 | + with lock_path(partition_dir): |
388 | + modified = False |
389 | + hashes = {} |
390 | + try: |
391 | + with open(hashes_file, 'rb') as fp: |
392 | + hashes = pickle.load(fp) |
393 | + except Exception: |
394 | + do_listdir = True |
395 | + if do_listdir: |
396 | + hashes = tpool.execute(tpool_listdir, hashes, partition_dir) |
397 | + modified = True |
398 | + for suffix, hash_ in hashes.items(): |
399 | + if not hash_: |
400 | + suffix_dir = join(partition_dir, suffix) |
401 | + if os.path.exists(suffix_dir): |
402 | + try: |
403 | + hashes[suffix] = hash_suffix(suffix_dir, reclaim_age) |
404 | + hashed += 1 |
405 | + except OSError: |
406 | + logging.exception('Error hashing suffix') |
407 | + hashes[suffix] = None |
408 | + else: |
409 | + del hashes[suffix] |
410 | + modified = True |
411 | + sleep() |
412 | + if modified: |
413 | + with open(hashes_file + '.tmp', 'wb') as fp: |
414 | + pickle.dump(hashes, fp, PICKLE_PROTOCOL) |
415 | + renamer(hashes_file + '.tmp', hashes_file) |
416 | + return hashed, hashes |
417 | |
418 | === modified file 'swift/obj/replicator.py' |
419 | --- swift/obj/replicator.py 2010-10-19 01:05:54 +0000 |
420 | +++ swift/obj/replicator.py 2010-10-19 18:43:49 +0000 |
421 | @@ -19,7 +19,6 @@ |
422 | import shutil |
423 | import time |
424 | import logging |
425 | -import hashlib |
426 | import itertools |
427 | import cPickle as pickle |
428 | |
429 | @@ -29,168 +28,16 @@ |
430 | from eventlet.support.greenlets import GreenletExit |
431 | |
432 | from swift.common.ring import Ring |
433 | -from swift.common.utils import whataremyips, unlink_older_than, lock_path, \ |
434 | - renamer, compute_eta, get_logger |
435 | +from swift.common.utils import compute_eta, get_logger, unlink_older_than, \ |
436 | + whataremyips |
437 | from swift.common.bufferedhttp import http_connect |
438 | from swift.common.daemon import Daemon |
439 | +from swift.obj.hashes import get_hashes, recalculate_hashes |
440 | +from swift.obj.server import DATADIR, SEGMENTSDIR |
441 | + |
442 | |
443 | hubs.use_hub('poll') |
444 | |
445 | -PICKLE_PROTOCOL = 2 |
446 | -ONE_WEEK = 604800 |
447 | -HASH_FILE = 'hashes.pkl' |
448 | - |
449 | - |
450 | -def hash_suffix(path, reclaim_age): |
451 | - """ |
452 | - Performs reclamation and returns an md5 of all (remaining) files. |
453 | - |
454 | - :param reclaim_age: age in seconds at which to remove tombstones |
455 | - """ |
456 | - md5 = hashlib.md5() |
457 | - for hsh in sorted(os.listdir(path)): |
458 | - hsh_path = join(path, hsh) |
459 | - files = os.listdir(hsh_path) |
460 | - if len(files) == 1: |
461 | - if files[0].endswith('.ts'): |
462 | - # remove tombstones older than reclaim_age |
463 | - ts = files[0].rsplit('.', 1)[0] |
464 | - if (time.time() - float(ts)) > reclaim_age: |
465 | - os.unlink(join(hsh_path, files[0])) |
466 | - files.remove(files[0]) |
467 | - elif files: |
468 | - files.sort(reverse=True) |
469 | - meta = data = tomb = None |
470 | - for filename in files: |
471 | - if not meta and filename.endswith('.meta'): |
472 | - meta = filename |
473 | - if not data and filename.endswith('.data'): |
474 | - data = filename |
475 | - if not tomb and filename.endswith('.ts'): |
476 | - tomb = filename |
477 | - if (filename < tomb or # any file older than tomb |
478 | - filename < data or # any file older than data |
479 | - (filename.endswith('.meta') and |
480 | - filename < meta)): # old meta |
481 | - os.unlink(join(hsh_path, filename)) |
482 | - files.remove(filename) |
483 | - if not files: |
484 | - os.rmdir(hsh_path) |
485 | - for filename in files: |
486 | - md5.update(filename) |
487 | - try: |
488 | - os.rmdir(path) |
489 | - except OSError: |
490 | - pass |
491 | - return md5.hexdigest() |
492 | - |
493 | - |
494 | -def recalculate_hashes(partition_dir, suffixes, reclaim_age=ONE_WEEK): |
495 | - """ |
496 | - Recalculates hashes for the given suffixes in the partition and updates |
497 | - them in the partition's hashes file. |
498 | - |
499 | - :param partition_dir: directory of the partition in which to recalculate |
500 | - :param suffixes: list of suffixes to recalculate |
501 | - :param reclaim_age: age in seconds at which tombstones should be removed |
502 | - """ |
503 | - |
504 | - def tpool_listdir(partition_dir): |
505 | - return dict(((suff, None) for suff in os.listdir(partition_dir) |
506 | - if len(suff) == 3 and isdir(join(partition_dir, suff)))) |
507 | - hashes_file = join(partition_dir, HASH_FILE) |
508 | - with lock_path(partition_dir): |
509 | - try: |
510 | - with open(hashes_file, 'rb') as fp: |
511 | - hashes = pickle.load(fp) |
512 | - except Exception: |
513 | - hashes = tpool.execute(tpool_listdir, partition_dir) |
514 | - for suffix in suffixes: |
515 | - suffix_dir = join(partition_dir, suffix) |
516 | - if os.path.exists(suffix_dir): |
517 | - hashes[suffix] = hash_suffix(suffix_dir, reclaim_age) |
518 | - elif suffix in hashes: |
519 | - del hashes[suffix] |
520 | - with open(hashes_file + '.tmp', 'wb') as fp: |
521 | - pickle.dump(hashes, fp, PICKLE_PROTOCOL) |
522 | - renamer(hashes_file + '.tmp', hashes_file) |
523 | - |
524 | - |
525 | -def invalidate_hash(suffix_dir): |
526 | - """ |
527 | - Invalidates the hash for a suffix_dir in the partition's hashes file. |
528 | - |
529 | - :param suffix_dir: absolute path to suffix dir whose hash needs |
530 | - invalidating |
531 | - """ |
532 | - |
533 | - suffix = os.path.basename(suffix_dir) |
534 | - partition_dir = os.path.dirname(suffix_dir) |
535 | - hashes_file = join(partition_dir, HASH_FILE) |
536 | - with lock_path(partition_dir): |
537 | - try: |
538 | - with open(hashes_file, 'rb') as fp: |
539 | - hashes = pickle.load(fp) |
540 | - if suffix in hashes and not hashes[suffix]: |
541 | - return |
542 | - except Exception: |
543 | - return |
544 | - hashes[suffix] = None |
545 | - with open(hashes_file + '.tmp', 'wb') as fp: |
546 | - pickle.dump(hashes, fp, PICKLE_PROTOCOL) |
547 | - renamer(hashes_file + '.tmp', hashes_file) |
548 | - |
549 | - |
550 | -def get_hashes(partition_dir, do_listdir=True, reclaim_age=ONE_WEEK): |
551 | - """ |
552 | - Get a list of hashes for the suffix dir. do_listdir causes it to mistrust |
553 | - the hash cache for suffix existence at the (unexpectedly high) cost of a |
554 | - listdir. reclaim_age is just passed on to hash_suffix. |
555 | - |
556 | - :param partition_dir: absolute path of partition to get hashes for |
557 | - :param do_listdir: force existence check for all hashes in the partition |
558 | - :param reclaim_age: age at which to remove tombstones |
559 | - |
560 | - :returns: tuple of (number of suffix dirs hashed, dictionary of hashes) |
561 | - """ |
562 | - |
563 | - def tpool_listdir(hashes, partition_dir): |
564 | - return dict(((suff, hashes.get(suff, None)) |
565 | - for suff in os.listdir(partition_dir) |
566 | - if len(suff) == 3 and isdir(join(partition_dir, suff)))) |
567 | - hashed = 0 |
568 | - hashes_file = join(partition_dir, HASH_FILE) |
569 | - with lock_path(partition_dir): |
570 | - modified = False |
571 | - hashes = {} |
572 | - try: |
573 | - with open(hashes_file, 'rb') as fp: |
574 | - hashes = pickle.load(fp) |
575 | - except Exception: |
576 | - do_listdir = True |
577 | - if do_listdir: |
578 | - hashes = tpool.execute(tpool_listdir, hashes, partition_dir) |
579 | - modified = True |
580 | - for suffix, hash_ in hashes.items(): |
581 | - if not hash_: |
582 | - suffix_dir = join(partition_dir, suffix) |
583 | - if os.path.exists(suffix_dir): |
584 | - try: |
585 | - hashes[suffix] = hash_suffix(suffix_dir, reclaim_age) |
586 | - hashed += 1 |
587 | - except OSError: |
588 | - logging.exception('Error hashing suffix') |
589 | - hashes[suffix] = None |
590 | - else: |
591 | - del hashes[suffix] |
592 | - modified = True |
593 | - sleep() |
594 | - if modified: |
595 | - with open(hashes_file + '.tmp', 'wb') as fp: |
596 | - pickle.dump(hashes, fp, PICKLE_PROTOCOL) |
597 | - renamer(hashes_file + '.tmp', hashes_file) |
598 | - return hashed, hashes |
599 | - |
600 | |
601 | class ObjectReplicator(Daemon): |
602 | """ |
603 | @@ -301,8 +148,9 @@ |
604 | had_any = True |
605 | if not had_any: |
606 | return False |
607 | - args.append(join(rsync_module, node['device'], |
608 | - 'objects', job['partition'])) |
609 | + objdir = job.get('segments') and SEGMENTSDIR or DATADIR |
610 | + args.append(join(rsync_module, node['device'], objdir, |
611 | + job['partition'])) |
612 | return self._rsync(args) == 0 |
613 | |
614 | def check_ring(self): |
615 | @@ -337,12 +185,15 @@ |
616 | for node in job['nodes']: |
617 | success = self.rsync(node, job, suffixes) |
618 | if success: |
619 | + headers = {'Content-Length': '0'} |
620 | + if job.get('segments'): |
621 | + headers['X-Object-Type'] = 'segment' |
622 | with Timeout(self.http_timeout): |
623 | http_connect(node['ip'], |
624 | node['port'], |
625 | node['device'], job['partition'], 'REPLICATE', |
626 | '/' + '-'.join(suffixes), |
627 | - headers={'Content-Length': '0'}).getresponse().read() |
628 | + headers=headers).getresponse().read() |
629 | responses.append(success) |
630 | if not suffixes or (len(responses) == \ |
631 | self.object_ring.replica_count and all(responses)): |
632 | @@ -374,10 +225,13 @@ |
633 | node = next(nodes) |
634 | attempts_left -= 1 |
635 | try: |
636 | + headers = {'Content-Length': '0'} |
637 | + if job.get('segments'): |
638 | + headers['X-Object-Type'] = 'segment' |
639 | with Timeout(self.http_timeout): |
640 | resp = http_connect(node['ip'], node['port'], |
641 | node['device'], job['partition'], 'REPLICATE', |
642 | - '', headers={'Content-Length': '0'}).getresponse() |
643 | + '', headers=headers).getresponse() |
644 | if resp.status == 507: |
645 | self.logger.error('%s/%s responded as unmounted' % |
646 | (node['ip'], node['device'])) |
647 | @@ -397,11 +251,14 @@ |
648 | self.rsync(node, job, suffixes) |
649 | recalculate_hashes(job['path'], suffixes, |
650 | reclaim_age=self.reclaim_age) |
651 | + headers = {'Content-Length': '0'} |
652 | + if job.get('segments'): |
653 | + headers['X-Object-Type'] = 'segment' |
654 | with Timeout(self.http_timeout): |
655 | conn = http_connect(node['ip'], node['port'], |
656 | node['device'], job['partition'], 'REPLICATE', |
657 | '/' + '-'.join(suffixes), |
658 | - headers={'Content-Length': '0'}) |
659 | + headers=headers) |
660 | conn.getresponse().read() |
661 | self.suffix_sync += len(suffixes) |
662 | except (Exception, Timeout): |
663 | @@ -489,24 +346,27 @@ |
664 | dev for dev in self.object_ring.devs |
665 | if dev and dev['ip'] in ips and dev['port'] == self.port]: |
666 | dev_path = join(self.devices_dir, local_dev['device']) |
667 | - obj_path = join(dev_path, 'objects') |
668 | - tmp_path = join(dev_path, 'tmp') |
669 | if self.mount_check and not os.path.ismount(dev_path): |
670 | self.logger.warn('%s is not mounted' % local_dev['device']) |
671 | continue |
672 | + tmp_path = join(dev_path, 'tmp') |
673 | unlink_older_than(tmp_path, time.time() - self.reclaim_age) |
674 | - if not os.path.exists(obj_path): |
675 | - continue |
676 | - for partition in os.listdir(obj_path): |
677 | - try: |
678 | - nodes = [node for node in |
679 | - self.object_ring.get_part_nodes(int(partition)) |
680 | - if node['id'] != local_dev['id']] |
681 | - jobs.append(dict(path=join(obj_path, partition), |
682 | - nodes=nodes, delete=len(nodes) > 2, |
683 | - partition=partition)) |
684 | - except ValueError: |
685 | - continue |
686 | + for objdir in (DATADIR, SEGMENTSDIR): |
687 | + obj_path = join(dev_path, objdir) |
688 | + if os.path.exists(obj_path): |
689 | + for partition in os.listdir(obj_path): |
690 | + try: |
691 | + nodes = [node for node in |
692 | + self.object_ring.get_part_nodes( |
693 | + int(partition)) |
694 | + if node['id'] != local_dev['id']] |
695 | + jobs.append(dict( |
696 | + path=join(obj_path, partition), |
697 | + nodes=nodes, delete=len(nodes) > 2, |
698 | + partition=partition, |
699 | + segments=(objdir == SEGMENTSDIR))) |
700 | + except ValueError: |
701 | + continue |
702 | random.shuffle(jobs) |
703 | # Partitions that need to be deleted take priority |
704 | jobs.sort(key=lambda job: not job['delete']) |
705 | |
706 | === modified file 'swift/obj/server.py' |
707 | --- swift/obj/server.py 2010-10-19 01:05:54 +0000 |
708 | +++ swift/obj/server.py 2010-10-19 18:43:49 +0000 |
709 | @@ -42,12 +42,12 @@ |
710 | from swift.common.constraints import check_object_creation, check_mount, \ |
711 | check_float, check_utf8 |
712 | from swift.common.exceptions import ConnectionTimeout |
713 | -from swift.obj.replicator import get_hashes, invalidate_hash, \ |
714 | - recalculate_hashes |
715 | +from swift.obj.hashes import get_hashes, invalidate_hash, recalculate_hashes |
716 | |
717 | |
718 | DATADIR = 'objects' |
719 | -ASYNCDIR = 'async_pending' |
720 | +SEGMENTSDIR = 'object_segments' |
721 | +ASYNCDIR = 'object_async' |
722 | PICKLE_PROTOCOL = 2 |
723 | METADATA_KEY = 'user.swift.metadata' |
724 | MAX_OBJECT_NAME_LENGTH = 1024 |
725 | @@ -84,15 +84,31 @@ |
726 | :param obj: object name for the object |
727 | :param keep_data_fp: if True, don't close the fp, otherwise close it |
728 | :param disk_chunk_size: size of chunks on file reads |
729 | + :param segment: if not None, indicates which segment of an object |
730 | + this file represents |
731 | + :param segment_timestamp: X-Timestamp of the object's segments (set on the |
732 | + PUT, not changed on POSTs); required if segment |
733 | + is not None |
734 | """ |
735 | |
736 | def __init__(self, path, device, partition, account, container, obj, |
737 | - keep_data_fp=False, disk_chunk_size=65536): |
738 | + keep_data_fp=False, disk_chunk_size=65536, segment=None, |
739 | + segment_timestamp=None): |
740 | self.disk_chunk_size = disk_chunk_size |
741 | self.name = '/' + '/'.join((account, container, obj)) |
742 | - name_hash = hash_path(account, container, obj) |
743 | - self.datadir = os.path.join(path, device, |
744 | - storage_directory(DATADIR, partition, name_hash)) |
745 | + if segment and int(segment): |
746 | + ring_obj = '%s/%s/%s' % (obj, segment_timestamp, segment) |
747 | + else: |
748 | + ring_obj = obj |
749 | + name_hash = hash_path(account, container, ring_obj) |
750 | + if segment is not None: |
751 | + self.datadir = os.path.join(path, device, |
752 | + storage_directory(SEGMENTSDIR, partition, name_hash)) |
753 | + self.no_longer_segment_datadir = os.path.join(path, device, |
754 | + storage_directory(DATADIR, partition, name_hash)) |
755 | + else: |
756 | + self.datadir = os.path.join(path, device, |
757 | + storage_directory(DATADIR, partition, name_hash)) |
758 | self.tmpdir = os.path.join(path, device, 'tmp') |
759 | self.metadata = {} |
760 | self.meta_file = None |
761 | @@ -195,7 +211,8 @@ |
762 | except OSError: |
763 | pass |
764 | |
765 | - def put(self, fd, tmppath, metadata, extension='.data'): |
766 | + def put(self, fd, tmppath, metadata, extension='.data', |
767 | + no_longer_segment=False): |
768 | """ |
769 | Finalize writing the file on disk, and renames it from the temp file to |
770 | the real location. This should be called after the data has been |
771 | @@ -204,7 +221,10 @@ |
772 | :params fd: file descriptor of the temp file |
773 | :param tmppath: path to the temporary file being used |
774 | :param metadata: dictionary of metada to be written |
775 | - :param extention: extension to be used when making the file |
776 | + :param extension: extension to be used when making the file |
777 | + :param no_longer_segment: Set to True if this was originally an object |
778 | + segment but no longer is (case with chunked transfer encoding when |
779 | + the object ends up less than the segment size) |
780 | """ |
781 | metadata['name'] = self.name |
782 | timestamp = normalize_timestamp(metadata['X-Timestamp']) |
783 | @@ -217,6 +237,8 @@ |
784 | if 'Content-Length' in metadata: |
785 | drop_buffer_cache(fd, 0, int(metadata['Content-Length'])) |
786 | os.fsync(fd) |
787 | + if no_longer_segment: |
788 | + self.datadir = self.no_longer_segment_datadir |
789 | invalidate_hash(os.path.dirname(self.datadir)) |
790 | renamer(tmppath, os.path.join(self.datadir, timestamp + extension)) |
791 | self.metadata = metadata |
792 | @@ -355,7 +377,9 @@ |
793 | if error_response: |
794 | return error_response |
795 | file = DiskFile(self.devices, device, partition, account, container, |
796 | - obj, disk_chunk_size=self.disk_chunk_size) |
797 | + obj, disk_chunk_size=self.disk_chunk_size, |
798 | + segment=request.headers.get('x-object-segment'), |
799 | + segment_timestamp=request.headers['x-timestamp']) |
800 | upload_expiration = time.time() + self.max_upload_time |
801 | etag = md5() |
802 | upload_size = 0 |
803 | @@ -397,17 +421,32 @@ |
804 | if 'content-encoding' in request.headers: |
805 | metadata['Content-Encoding'] = \ |
806 | request.headers['Content-Encoding'] |
807 | - file.put(fd, tmppath, metadata) |
808 | + if 'x-object-type' in request.headers: |
809 | + metadata['X-Object-Type'] = request.headers['x-object-type'] |
810 | + if 'x-object-segment' in request.headers: |
811 | + metadata['X-Object-Segment'] = \ |
812 | + request.headers['x-object-segment'] |
813 | + no_longer_segment = False |
814 | + if 'x-object-segment-if-length' in request.headers and \ |
815 | + int(request.headers['x-object-segment-if-length']) != \ |
816 | + os.fstat(fd).st_size: |
817 | + del metadata['X-Object-Type'] |
818 | + del metadata['X-Object-Segment'] |
819 | + no_longer_segment = True |
820 | + file.put(fd, tmppath, metadata, |
821 | + no_longer_segment=no_longer_segment) |
822 | file.unlinkold(metadata['X-Timestamp']) |
823 | - self.container_update('PUT', account, container, obj, request.headers, |
824 | - {'x-size': file.metadata['Content-Length'], |
825 | - 'x-content-type': file.metadata['Content-Type'], |
826 | - 'x-timestamp': file.metadata['X-Timestamp'], |
827 | - 'x-etag': file.metadata['ETag'], |
828 | - 'x-cf-trans-id': request.headers.get('x-cf-trans-id', '-')}, |
829 | - device) |
830 | - resp = HTTPCreated(request=request, etag=etag) |
831 | - return resp |
832 | + if 'X-Object-Segment' not in file.metadata: |
833 | + self.container_update('PUT', account, container, obj, |
834 | + request.headers, |
835 | + {'x-size': request.headers.get('x-object-length', |
836 | + file.metadata['Content-Length']), |
837 | + 'x-content-type': file.metadata['Content-Type'], |
838 | + 'x-timestamp': file.metadata['X-Timestamp'], |
839 | + 'x-etag': file.metadata['ETag'], |
840 | + 'x-cf-trans-id': request.headers.get('x-cf-trans-id', '-')}, |
841 | + device) |
842 | + return HTTPCreated(request=request, etag=etag) |
843 | |
844 | def GET(self, request): |
845 | """Handle HTTP GET requests for the Swift Object Server.""" |
846 | @@ -420,7 +459,9 @@ |
847 | if self.mount_check and not check_mount(self.devices, device): |
848 | return Response(status='507 %s is not mounted' % device) |
849 | file = DiskFile(self.devices, device, partition, account, container, |
850 | - obj, keep_data_fp=True, disk_chunk_size=self.disk_chunk_size) |
851 | + obj, keep_data_fp=True, disk_chunk_size=self.disk_chunk_size, |
852 | + segment=request.headers.get('x-object-segment'), |
853 | + segment_timestamp=request.headers.get('x-object-segment-timestamp')) |
854 | if file.is_deleted(): |
855 | if request.headers.get('if-match') == '*': |
856 | return HTTPPreconditionFailed(request=request) |
857 | @@ -460,7 +501,8 @@ |
858 | 'application/octet-stream'), app_iter=file, |
859 | request=request, conditional_response=True) |
860 | for key, value in file.metadata.iteritems(): |
861 | - if key.lower().startswith('x-object-meta-'): |
862 | + if key.lower().startswith('x-object-meta-') or \ |
863 | + key.lower() in ('x-object-type', 'x-object-segment'): |
864 | response.headers[key] = value |
865 | response.etag = file.metadata['ETag'] |
866 | response.last_modified = float(file.metadata['X-Timestamp']) |
867 | @@ -482,13 +524,16 @@ |
868 | if self.mount_check and not check_mount(self.devices, device): |
869 | return Response(status='507 %s is not mounted' % device) |
870 | file = DiskFile(self.devices, device, partition, account, container, |
871 | - obj, disk_chunk_size=self.disk_chunk_size) |
872 | + obj, disk_chunk_size=self.disk_chunk_size, |
873 | + segment=request.headers.get('x-object-segment'), |
874 | + segment_timestamp=request.headers.get('x-object-segment-timestamp')) |
875 | if file.is_deleted(): |
876 | return HTTPNotFound(request=request) |
877 | response = Response(content_type=file.metadata['Content-Type'], |
878 | request=request, conditional_response=True) |
879 | for key, value in file.metadata.iteritems(): |
880 | - if key.lower().startswith('x-object-meta-'): |
881 | + if key.lower().startswith('x-object-meta-') or \ |
882 | + key.lower() in ('x-object-type', 'x-object-segment'): |
883 | response.headers[key] = value |
884 | response.etag = file.metadata['ETag'] |
885 | response.last_modified = float(file.metadata['X-Timestamp']) |
886 | @@ -513,7 +558,9 @@ |
887 | return Response(status='507 %s is not mounted' % device) |
888 | response_class = HTTPNoContent |
889 | file = DiskFile(self.devices, device, partition, account, container, |
890 | - obj, disk_chunk_size=self.disk_chunk_size) |
891 | + obj, disk_chunk_size=self.disk_chunk_size, |
892 | + segment=request.headers.get('x-object-segment'), |
893 | + segment_timestamp=request.headers.get('x-object-segment-timestamp')) |
894 | if file.is_deleted(): |
895 | response_class = HTTPNotFound |
896 | metadata = { |
897 | @@ -522,10 +569,11 @@ |
898 | with file.mkstemp() as (fd, tmppath): |
899 | file.put(fd, tmppath, metadata, extension='.ts') |
900 | file.unlinkold(metadata['X-Timestamp']) |
901 | - self.container_update('DELETE', account, container, obj, |
902 | - request.headers, {'x-timestamp': metadata['X-Timestamp'], |
903 | - 'x-cf-trans-id': request.headers.get('x-cf-trans-id', '-')}, |
904 | - device) |
905 | + if 'x-object-segment' not in request.headers: |
906 | + self.container_update('DELETE', account, container, obj, |
907 | + request.headers, {'x-timestamp': metadata['X-Timestamp'], |
908 | + 'x-cf-trans-id': request.headers.get('x-cf-trans-id', '-')}, |
909 | + device) |
910 | resp = response_class(request=request) |
911 | return resp |
912 | |
913 | @@ -538,7 +586,10 @@ |
914 | unquote(request.path), 2, 3, True) |
915 | if self.mount_check and not check_mount(self.devices, device): |
916 | return Response(status='507 %s is not mounted' % device) |
917 | - path = os.path.join(self.devices, device, DATADIR, partition) |
918 | + if request.headers.get('x-object-type') == 'segment': |
919 | + path = os.path.join(self.devices, device, SEGMENTSDIR, partition) |
920 | + else: |
921 | + path = os.path.join(self.devices, device, DATADIR, partition) |
922 | if not os.path.exists(path): |
923 | mkdirs(path) |
924 | if suffix: |
925 | |
926 | === modified file 'swift/proxy/server.py' |
927 | --- swift/proxy/server.py 2010-10-15 15:07:19 +0000 |
928 | +++ swift/proxy/server.py 2010-10-19 18:43:49 +0000 |
929 | @@ -14,21 +14,25 @@ |
930 | # limitations under the License. |
931 | |
932 | from __future__ import with_statement |
933 | +try: |
934 | + import simplejson as json |
935 | +except ImportError: |
936 | + import json |
937 | import mimetypes |
938 | import os |
939 | import time |
940 | import traceback |
941 | from ConfigParser import ConfigParser |
942 | +from hashlib import md5 |
943 | from urllib import unquote, quote |
944 | import uuid |
945 | import functools |
946 | |
947 | from eventlet.timeout import Timeout |
948 | -from webob.exc import HTTPBadRequest, HTTPMethodNotAllowed, \ |
949 | - HTTPNotFound, HTTPPreconditionFailed, \ |
950 | - HTTPRequestTimeout, HTTPServiceUnavailable, \ |
951 | - HTTPUnprocessableEntity, HTTPRequestEntityTooLarge, HTTPServerError, \ |
952 | - status_map |
953 | +from webob.exc import HTTPBadRequest, HTTPCreated, HTTPInternalServerError, \ |
954 | + HTTPMethodNotAllowed, HTTPNotFound, HTTPPreconditionFailed, \ |
955 | + HTTPRequestEntityTooLarge, HTTPRequestTimeout, HTTPServerError, \ |
956 | + HTTPServiceUnavailable, HTTPUnprocessableEntity, status_map |
957 | from webob import Request, Response |
958 | |
959 | from swift.common.ring import Ring |
960 | @@ -37,7 +41,7 @@ |
961 | from swift.common.bufferedhttp import http_connect |
962 | from swift.common.constraints import check_metadata, check_object_creation, \ |
963 | check_utf8, MAX_ACCOUNT_NAME_LENGTH, MAX_CONTAINER_NAME_LENGTH, \ |
964 | - MAX_FILE_SIZE |
965 | + MAX_FILE_SIZE, SEGMENT_SIZE |
966 | from swift.common.exceptions import ChunkReadTimeout, \ |
967 | ChunkWriteTimeout, ConnectionTimeout |
968 | |
969 | @@ -89,6 +93,135 @@ |
970 | return wrapped |
971 | |
972 | |
973 | +class SegmentedIterable(object): |
974 | + """ |
975 | + Iterable that returns the object contents for a segmented object in Swift. |
976 | + |
977 | + In addition to these params, you can also set the `response` attr just |
978 | + after creating the SegmentedIterable and it will update the response's |
979 | + `bytes_transferred` value. Be sure to set the `bytes_transferred` value to |
980 | + 0 beforehand. |
981 | + |
982 | + :param controller: The ObjectController instance to work with. |
983 | + :param content_length: The total length of the object. |
984 | + :param segment_size: The length of each segment (except perhaps the last) |
985 | + of the object. |
986 | + :param timestamp: The X-Timestamp of the object's segments (set on the PUT, |
987 | + not changed on the POSTs). |
988 | + """ |
989 | + |
990 | + def __init__(self, controller, content_length, segment_size, timestamp): |
991 | + self.controller = controller |
992 | + self.content_length = content_length |
993 | + self.segment_size = segment_size |
994 | + self.timestamp = timestamp |
995 | + self.position = 0 |
996 | + self.segment = -1 |
997 | + self.segment_iter = None |
998 | + self.response = None |
999 | + |
1000 | + def load_next_segment(self): |
1001 | + """ Loads the self.segment_iter with the next segment's contents. """ |
1002 | + self.segment += 1 |
1003 | + if self.segment: |
1004 | + ring_object_name = '%s/%s/%s' % (self.controller.object_name, |
1005 | + self.timestamp, self.segment) |
1006 | + else: |
1007 | + ring_object_name = self.controller.object_name |
1008 | + partition, nodes = self.controller.app.object_ring.get_nodes( |
1009 | + self.controller.account_name, self.controller.container_name, |
1010 | + ring_object_name) |
1011 | + path = '/%s/%s/%s' % (self.controller.account_name, |
1012 | + self.controller.container_name, self.controller.object_name) |
1013 | + req = Request.blank(path, headers={'X-Object-Segment': self.segment, |
1014 | + 'X-Object-Segment-Timestamp': self.timestamp}) |
1015 | + resp = self.controller.GETorHEAD_base(req, 'Object', |
1016 | + partition, self.controller.iter_nodes(partition, nodes, |
1017 | + self.controller.app.object_ring), path, |
1018 | + self.controller.app.object_ring.replica_count) |
1019 | + if resp.status_int // 100 != 2: |
1020 | + raise Exception( |
1021 | + 'Could not load segment %s of %s' % (self.segment, path)) |
1022 | + self.segment_iter = resp.app_iter |
1023 | + |
1024 | + def __iter__(self): |
1025 | + """ Standard iterator function that returns the object's contents. """ |
1026 | + while self.position < self.content_length: |
1027 | + if not self.segment_iter: |
1028 | + self.load_next_segment() |
1029 | + while True: |
1030 | + with ChunkReadTimeout(self.controller.app.node_timeout): |
1031 | + try: |
1032 | + chunk = self.segment_iter.next() |
1033 | + break |
1034 | + except StopIteration: |
1035 | + self.load_next_segment() |
1036 | + if self.position + len(chunk) > self.content_length: |
1037 | + chunk = chunk[:self.content_length - self.position] |
1038 | + self.position += len(chunk) |
1039 | + if self.response: |
1040 | + self.response.bytes_transferred += len(chunk) |
1041 | + yield chunk |
1042 | + |
1043 | + def app_iter_range(self, start, stop): |
1044 | + """ |
1045 | + Non-standard iterator function for use with Webob in serving Range |
1046 | + requests more quickly. |
1047 | + |
1048 | + TODO: |
1049 | + |
1050 | + This currently helps on speed by jumping to the proper segment to start |
1051 | + with (and ending without reading the trailing segments, but that |
1052 | + already happened technically with __iter__). |
1053 | + |
1054 | + But, what it does not do yet is issue a Range request with the first |
1055 | + segment to allow the object server to seek to the segment start point. |
1056 | + |
1057 | + Instead, it just reads and throws away all leading segment data. Since |
1058 | + segments are 5G by default, it'll have to transfer the whole 5G from |
1059 | + the object server to the proxy server even if it only needs the last |
1060 | + byte. In practice, this should happen fairly quickly relative to how |
1061 | + long requests take for these very large files; but it's still wasteful. |
1062 | + |
1063 | + Anyway, it shouldn't be too hard to implement, I just have other things |
1064 | + to work out first. |
1065 | + |
1066 | + :param start: The first byte (zero-based) to return. |
1067 | + :param stop: The last byte (zero-based) to return. |
1068 | + """ |
1069 | + if start: |
1070 | + self.segment = (start / self.segment_size) - 1 |
1071 | + self.load_next_segment() |
1072 | + self.position = self.segment * self.segment_size |
1073 | + segment_start = start - (self.segment * self.segment_size) |
1074 | + while segment_start: |
1075 | + with ChunkReadTimeout(self.controller.app.node_timeout): |
1076 | + chunk = self.segment_iter.next() |
1077 | + self.position += len(chunk) |
1078 | + if len(chunk) > segment_start: |
1079 | + chunk = chunk[segment_start:] |
1080 | + if self.response: |
1081 | + self.response.bytes_transferred += len(chunk) |
1082 | + yield chunk |
1083 | + segment_start = 0 |
1084 | + else: |
1085 | + segment_start -= len(chunk) |
1086 | + if stop is not None: |
1087 | + length = stop - start |
1088 | + else: |
1089 | + length = None |
1090 | + for chunk in self: |
1091 | + if length is not None: |
1092 | + length -= len(chunk) |
1093 | + if length < 0: |
1094 | + # Chop off the extra: |
1095 | + if self.response: |
1096 | + self.response.bytes_transferred += length |
1097 | + yield chunk[:length] |
1098 | + break |
1099 | + yield chunk |
1100 | + |
1101 | + |
1102 | def get_container_memcache_key(account, container): |
1103 | path = '/%s/%s' % (account, container) |
1104 | return 'container%s' % path |
1105 | @@ -518,11 +651,56 @@ |
1106 | aresp = req.environ['swift.authorize'](req) |
1107 | if aresp: |
1108 | return aresp |
1109 | + # This is a bit confusing, so an explanation: |
1110 | + # * First we attempt the GET/HEAD normally, as this is the usual case. |
1111 | + # * If the request was a Range request and gave us a 416 Unsatisfiable |
1112 | + # response, we might be trying to do an invalid Range on a manifest |
1113 | + # object, so we try again with no Range. |
1114 | + # * If it turns out we have a manifest object, and we had a Range |
1115 | + # request originally that actually succeeded or we had a HEAD |
1116 | + # request, we have to do the request again as a full GET because |
1117 | + # we'll need the whole manifest. |
1118 | + # * Finally, if we had a manifest object, we pass it and the request |
1119 | + # off to GETorHEAD_segmented; otherwise we just return the response. |
1120 | partition, nodes = self.app.object_ring.get_nodes( |
1121 | self.account_name, self.container_name, self.object_name) |
1122 | - return self.GETorHEAD_base(req, 'Object', partition, |
1123 | + resp = mresp = self.GETorHEAD_base(req, 'Object', partition, |
1124 | + self.iter_nodes(partition, nodes, self.app.object_ring), |
1125 | + req.path_info, self.app.object_ring.replica_count) |
1126 | + range_value = None |
1127 | + if mresp.status_int == 416: |
1128 | + range_value = req.range |
1129 | + req.range = None |
1130 | + mresp = self.GETorHEAD_base(req, 'Object', partition, |
1131 | self.iter_nodes(partition, nodes, self.app.object_ring), |
1132 | req.path_info, self.app.object_ring.replica_count) |
1133 | + if mresp.status_int // 100 != 2: |
1134 | + return resp |
1135 | + if 'x-object-type' in mresp.headers: |
1136 | + if mresp.headers['x-object-type'] == 'manifest': |
1137 | + if req.method == 'HEAD': |
1138 | + req.method = 'GET' |
1139 | + mresp = self.GETorHEAD_base(req, 'Object', partition, |
1140 | + self.iter_nodes(partition, nodes, |
1141 | + self.app.object_ring), req.path_info, |
1142 | + self.app.object_ring.replica_count) |
1143 | + if mresp.status_int // 100 != 2: |
1144 | + return mresp |
1145 | + req.method = 'HEAD' |
1146 | + elif req.range: |
1147 | + range_value = req.range |
1148 | + req.range = None |
1149 | + mresp = self.GETorHEAD_base(req, 'Object', partition, |
1150 | + self.iter_nodes(partition, nodes, |
1151 | + self.app.object_ring), req.path_info, |
1152 | + self.app.object_ring.replica_count) |
1153 | + if mresp.status_int // 100 != 2: |
1154 | + return mresp |
1155 | + if range_value: |
1156 | + req.range = range_value |
1157 | + return self.GETorHEAD_segmented(req, mresp) |
1158 | + return HTTPNotFound(request=req) |
1159 | + return resp |
1160 | |
1161 | @public |
1162 | @delay_denial |
1163 | @@ -536,6 +714,48 @@ |
1164 | """Handler for HTTP HEAD requests.""" |
1165 | return self.GETorHEAD(req) |
1166 | |
1167 | + def GETorHEAD_segmented(self, req, mresp): |
1168 | + """ |
1169 | + Performs a GET for a segmented object. |
1170 | + |
1171 | + :param req: The webob.Request to process. |
1172 | + :param mresp: The webob.Response for the original manifest request. |
1173 | + :returns: webob.Response object. |
1174 | + """ |
1175 | + manifest = json.loads(''.join(mresp.app_iter)) |
1176 | + # Ah, the fun of JSONing strs and getting unicodes back. We |
1177 | + # reencode to UTF8 to ensure crap doesn't blow up everywhere |
1178 | + # else. |
1179 | + keys_to_encode = [] |
1180 | + for k, v in manifest.iteritems(): |
1181 | + if isinstance(k, unicode): |
1182 | + keys_to_encode.append(k) |
1183 | + if isinstance(v, unicode): |
1184 | + manifest[k] = v.encode('utf8') |
1185 | + for k in keys_to_encode: |
1186 | + v = manifest[k] |
1187 | + del manifest[k] |
1188 | + manifest[k.encode('utf8')] = v |
1189 | + content_length = int(manifest['content-length']) |
1190 | + segment_size = int(manifest['x-segment-size']) |
1191 | + headers = dict(mresp.headers) |
1192 | + headers.update(manifest) |
1193 | + del headers['x-segment-size'] |
1194 | + resp = Response(app_iter=SegmentedIterable(self, content_length, |
1195 | + segment_size, manifest['x-timestamp']), headers=headers, |
1196 | + request=req, conditional_response=True) |
1197 | + resp.headers['etag'] = manifest['etag'].strip('"') |
1198 | + resp.last_modified = mresp.last_modified |
1199 | + resp.content_length = int(manifest['content-length']) |
1200 | + resp.content_type = manifest['content-type'] |
1201 | + if 'content-encoding' in manifest: |
1202 | + resp.content_encoding = manifest['content-encoding'] |
1203 | + cresp = req.get_response(resp) |
1204 | + # Needed for SegmentedIterable to update bytes_transferred |
1205 | + cresp.bytes_transferred = 0 |
1206 | + resp.app_iter.response = cresp |
1207 | + return cresp |
1208 | + |
1209 | @public |
1210 | @delay_denial |
1211 | def POST(self, req): |
1212 | @@ -652,11 +872,47 @@ |
1213 | if k.lower().startswith('x-object-meta-'): |
1214 | new_req.headers[k] = v |
1215 | req = new_req |
1216 | + if req.headers.get('transfer-encoding') == 'chunked' or \ |
1217 | + req.content_length > SEGMENT_SIZE: |
1218 | + resp = self.PUT_segmented_object(req, data_source, partition, |
1219 | + nodes, container_partition, containers) |
1220 | + else: |
1221 | + resp = self.PUT_whole_object(req, data_source, partition, nodes, |
1222 | + container_partition, containers) |
1223 | + if 'x-copy-from' in req.headers: |
1224 | + resp.headers['X-Copied-From'] = req.headers['x-copy-from'] |
1225 | + for k, v in req.headers.items(): |
1226 | + if k.lower().startswith('x-object-meta-'): |
1227 | + resp.headers[k] = v |
1228 | + resp.last_modified = float(req.headers['X-Timestamp']) |
1229 | + return resp |
1230 | + |
1231 | + def PUT_whole_object(self, req, data_source, partition, nodes, |
1232 | + container_partition=None, containers=None): |
1233 | + """ |
1234 | + Performs a PUT for a whole object (one with a content-length <= |
1235 | + SEGMENT_SIZE). |
1236 | + |
1237 | + :param req: The webob.Request to process. |
1238 | + :param data_source: An iterator providing the data to store. |
1239 | + :param partition: The object ring partition the object falls on. |
1240 | + :param nodes: The object ring nodes the object falls on. |
1241 | + :param container_partition: The container ring partition the container |
1242 | + for the object falls on, None if the |
1243 | + container is not to be updated. |
1244 | + :param containers: The container ring nodes the container for the |
1245 | + object falls on, None if the container is not to be |
1246 | + updated. |
1247 | + :returns: webob.Response object. |
1248 | + """ |
1249 | + conns = [] |
1250 | + update_containers = containers is not None |
1251 | for node in self.iter_nodes(partition, nodes, self.app.object_ring): |
1252 | - container = containers.pop() |
1253 | - req.headers['X-Container-Host'] = '%(ip)s:%(port)s' % container |
1254 | - req.headers['X-Container-Partition'] = container_partition |
1255 | - req.headers['X-Container-Device'] = container['device'] |
1256 | + if update_containers: |
1257 | + container = containers.pop() |
1258 | + req.headers['X-Container-Host'] = '%(ip)s:%(port)s' % container |
1259 | + req.headers['X-Container-Partition'] = container_partition |
1260 | + req.headers['X-Container-Device'] = container['device'] |
1261 | req.headers['Expect'] = '100-continue' |
1262 | resp = conn = None |
1263 | if not self.error_limited(node): |
1264 | @@ -674,12 +930,14 @@ |
1265 | if conn and resp: |
1266 | if resp.status == 100: |
1267 | conns.append(conn) |
1268 | - if not containers: |
1269 | + if (update_containers and not containers) or \ |
1270 | + len(conns) == len(nodes): |
1271 | break |
1272 | continue |
1273 | elif resp.status == 507: |
1274 | self.error_limit(node) |
1275 | - containers.insert(0, container) |
1276 | + if update_containers: |
1277 | + containers.insert(0, container) |
1278 | if len(conns) <= len(nodes) / 2: |
1279 | self.app.logger.error( |
1280 | 'Object PUT returning 503, %s/%s required connections, ' |
1281 | @@ -765,15 +1023,123 @@ |
1282 | statuses.append(503) |
1283 | reasons.append('') |
1284 | bodies.append('') |
1285 | - resp = self.best_response(req, statuses, reasons, bodies, 'Object PUT', |
1286 | + return self.best_response(req, statuses, reasons, bodies, 'Object PUT', |
1287 | etag=etag) |
1288 | - if 'x-copy-from' in req.headers: |
1289 | - resp.headers['X-Copied-From'] = req.headers['x-copy-from'] |
1290 | - for k, v in req.headers.items(): |
1291 | - if k.lower().startswith('x-object-meta-'): |
1292 | - resp.headers[k] = v |
1293 | - resp.last_modified = float(req.headers['X-Timestamp']) |
1294 | - return resp |
1295 | + |
1296 | + def PUT_segmented_object(self, req, data_source, partition, nodes, |
1297 | + container_partition, containers): |
1298 | + """ |
1299 | + Performs a PUT for a segmented object (one with a content-length > |
1300 | + SEGMENT_SIZE). |
1301 | + |
1302 | + :param req: The webob.Request to process. |
1303 | + :param data_source: An iterator providing the data to store. |
1304 | + :param partition: The object ring partition the object falls on. |
1305 | + :param nodes: The object ring nodes the object falls on. |
1306 | + :param container_partition: The container ring partition the container |
1307 | + for the object falls on. |
1308 | + :param containers: The container ring nodes the container for the |
1309 | + object falls on. |
1310 | + :returns: webob.Response object. |
1311 | + """ |
1312 | + req.bytes_transferred = 0 |
1313 | + leftover_chunk = [None] |
1314 | + etag = md5() |
1315 | + def segment_iter(): |
1316 | + amount_given = 0 |
1317 | + while amount_given < SEGMENT_SIZE: |
1318 | + if leftover_chunk[0]: |
1319 | + chunk = leftover_chunk[0] |
1320 | + leftover_chunk[0] = None |
1321 | + else: |
1322 | + with ChunkReadTimeout(self.app.client_timeout): |
1323 | + chunk = data_source.next() |
1324 | + req.bytes_transferred += len(chunk) |
1325 | + etag.update(chunk) |
1326 | + if amount_given + len(chunk) > SEGMENT_SIZE: |
1327 | + yield chunk[:SEGMENT_SIZE - amount_given] |
1328 | + leftover_chunk[0] = chunk[SEGMENT_SIZE - amount_given:] |
1329 | + amount_given = SEGMENT_SIZE |
1330 | + else: |
1331 | + yield chunk |
1332 | + amount_given += len(chunk) |
1333 | + def segment_iter_iter(): |
1334 | + while True: |
1335 | + if not leftover_chunk[0]: |
1336 | + with ChunkReadTimeout(self.app.client_timeout): |
1337 | + leftover_chunk[0] = data_source.next() |
1338 | + req.bytes_transferred += len(leftover_chunk[0]) |
1339 | + etag.update(leftover_chunk[0]) |
1340 | + yield segment_iter() |
1341 | + segment_number = 0 |
1342 | + chunked = req.headers.get('transfer-encoding') == 'chunked' |
1343 | + if not chunked: |
1344 | + amount_left = req.content_length |
1345 | + headers = {'X-Timestamp': req.headers['X-Timestamp'], |
1346 | + 'Content-Type': req.headers['content-type'], |
1347 | + 'X-Object-Type': 'segment'} |
1348 | + for segment_source in segment_iter_iter(): |
1349 | + if chunked: |
1350 | + headers['Transfer-Encoding'] = 'chunked' |
1351 | + if segment_number == 0: |
1352 | + headers['X-Object-Segment-If-Length'] = SEGMENT_SIZE |
1353 | + elif amount_left > SEGMENT_SIZE: |
1354 | + headers['Content-Length'] = SEGMENT_SIZE |
1355 | + else: |
1356 | + headers['Content-Length'] = amount_left |
1357 | + headers['X-Object-Segment'] = segment_number |
1358 | + segment_req = Request.blank(req.path_info, |
1359 | + environ={'REQUEST_METHOD': 'PUT'}, headers=headers) |
1360 | + if 'X-Object-Segment-If-Length' in headers: |
1361 | + del headers['X-Object-Segment-If-Length'] |
1362 | + if segment_number: |
1363 | + ring_object_name = '%s/%s/%s' % (self.object_name, |
1364 | + req.headers['x-timestamp'], segment_number) |
1365 | + else: |
1366 | + ring_object_name = self.object_name |
1367 | + segment_partition, segment_nodes = self.app.object_ring.get_nodes( |
1368 | + self.account_name, self.container_name, ring_object_name) |
1369 | + resp = self.PUT_whole_object(segment_req, segment_source, |
1370 | + segment_partition, segment_nodes) |
1371 | + if resp.status_int // 100 == 4: |
1372 | + return resp |
1373 | + elif resp.status_int // 100 != 2: |
1374 | + return HTTPServiceUnavailable(request=req, |
1375 | + body='Unable to complete very large file operation.') |
1376 | + if segment_number == 0 and req.bytes_transferred < SEGMENT_SIZE: |
1377 | + return HTTPCreated(request=req, etag=etag.hexdigest()) |
1378 | + if not chunked: |
1379 | + amount_left -= SEGMENT_SIZE |
1380 | + segment_number += 1 |
1381 | + etag = etag.hexdigest() |
1382 | + if 'etag' in req.headers and req.headers['etag'].lower() != etag: |
1383 | + return HTTPUnprocessableEntity(request=req) |
1384 | + manifest = {'x-timestamp': req.headers['x-timestamp'], |
1385 | + 'content-length': req.bytes_transferred, |
1386 | + 'content-type': req.headers['content-type'], |
1387 | + 'x-segment-size': SEGMENT_SIZE, |
1388 | + 'etag': etag} |
1389 | + if 'content-encoding' in req.headers: |
1390 | + manifest['content-encoding'] = req.headers['content-encoding'] |
1391 | + manifest = json.dumps(manifest) |
1392 | + headers = {'X-Timestamp': req.headers['X-Timestamp'], |
1393 | + 'Content-Type': req.headers['content-type'], |
1394 | + 'Content-Length': len(manifest), |
1395 | + 'X-Object-Type': 'manifest', |
1396 | + 'X-Object-Length': req.bytes_transferred} |
1397 | + headers.update(i for i in req.headers.iteritems() |
1398 | + if i[0].lower().startswith('x-object-meta-') and len(i[0]) > 14) |
1399 | + manifest_req = Request.blank(req.path_info, |
1400 | + environ={'REQUEST_METHOD': 'PUT'}, body=manifest, headers=headers) |
1401 | + manifest_source = iter(lambda: |
1402 | + manifest_req.body_file.read(self.app.client_chunk_size), '') |
1403 | + resp = self.PUT_whole_object(manifest_req, manifest_source, partition, |
1404 | + nodes, container_partition=container_partition, |
1405 | + containers=containers) |
1406 | + if resp.status_int // 100 != 2: |
1407 | + return HTTPServiceUnavailable(request=req, |
1408 | + body='Unable to complete very large file operation.') |
1409 | + return HTTPCreated(request=req, etag=etag) |
1410 | |
1411 | @public |
1412 | @delay_denial |
1413 | |
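The slicing scheme in PUT_segmented_object above is easiest to see in isolation: an outer generator yields one inner generator per segment, and each inner generator yields at most SEGMENT_SIZE bytes, carrying any excess from the last chunk of one segment over into the next. Below is a minimal standalone sketch of that scheme — the names, the tiny segment size, and the demo stream are illustrative only, and the real code additionally tracks req.bytes_transferred, the running MD5 etag, and read timeouts:

    # Minimal sketch of the per-segment slicing used by
    # PUT_segmented_object (illustrative, not the branch's API).
    def iter_segments(chunks, segment_size):
        """Yield one iterator per segment over a stream of arbitrary chunks.

        Each inner iterator must be fully consumed before advancing the
        outer one (the proxy does this, since it streams each segment
        PUT to completion before starting the next).
        """
        leftover = [None]  # one-element list so the closure can rebind it

        def one_segment():
            given = 0
            while given < segment_size:
                if leftover[0]:
                    chunk, leftover[0] = leftover[0], None
                else:
                    try:
                        chunk = next(chunks)
                    except StopIteration:
                        return  # stream exhausted mid-segment
                if given + len(chunk) > segment_size:
                    # Chunk straddles the boundary: emit what fits and
                    # carry the remainder into the next segment.
                    yield chunk[:segment_size - given]
                    leftover[0] = chunk[segment_size - given:]
                    given = segment_size
                else:
                    yield chunk
                    given += len(chunk)

        while True:
            if not leftover[0]:
                try:
                    leftover[0] = next(chunks)
                except StopIteration:
                    return  # no more data, so no further segments
            yield one_segment()

    if __name__ == '__main__':
        # With a 4-byte segment size, a 10-byte stream yields the
        # segments b'abcd', b'efgh', b'ij'.
        stream = iter([b'abc', b'defg', b'hij'])
        for segment in iter_segments(stream, segment_size=4):
            print(b''.join(segment))

Note how chunk boundaries and segment boundaries are independent: a client chunk that straddles a segment boundary is split, with the tail becoming the first data of the next segment.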
1414 | === modified file 'test/unit/__init__.py' |
1415 | --- test/unit/__init__.py 2010-07-29 20:06:01 +0000 |
1416 | +++ test/unit/__init__.py 2010-10-19 18:43:49 +0000 |
1417 | @@ -9,6 +9,8 @@ |
1418 | crlfs = 0 |
1419 | while crlfs < 2: |
1420 | c = fd.read(1) |
1421 | + if not len(c): |
1422 | + raise Exception('Never read 2crlfs; got %s' % repr(rv)) |
1423 | rv = rv + c |
1424 | if c == '\r' and lc != '\n': |
1425 | crlfs = 0 |
1426 | |
1427 | === added file 'test/unit/obj/test_hashes.py' |
1428 | --- test/unit/obj/test_hashes.py 1970-01-01 00:00:00 +0000 |
1429 | +++ test/unit/obj/test_hashes.py 2010-10-19 18:43:49 +0000 |
1430 | @@ -0,0 +1,28 @@ |
1431 | +# Copyright (c) 2010 OpenStack, LLC. |
1432 | +# |
1433 | +# Licensed under the Apache License, Version 2.0 (the "License"); |
1434 | +# you may not use this file except in compliance with the License. |
1435 | +# You may obtain a copy of the License at |
1436 | +# |
1437 | +# http://www.apache.org/licenses/LICENSE-2.0 |
1438 | +# |
1439 | +# Unless required by applicable law or agreed to in writing, software |
1440 | +# distributed under the License is distributed on an "AS IS" BASIS, |
1441 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
1442 | +# implied. |
1443 | +# See the License for the specific language governing permissions and |
1444 | +# limitations under the License. |
1445 | + |
1446 | +# TODO: Tests |
1447 | + |
1448 | +import unittest |
1449 | +from swift.obj import hashes |
1450 | + |
1451 | +class TestHashes(unittest.TestCase): |
1452 | + |
1453 | + def test_placeholder(self): |
1454 | + pass |
1455 | + |
1456 | + |
1457 | +if __name__ == '__main__': |
1458 | + unittest.main() |
1459 | |
1460 | === added file 'test/unit/obj/test_hashes.py.moved' |
1461 | --- test/unit/obj/test_hashes.py.moved 1970-01-01 00:00:00 +0000 |
1462 | +++ test/unit/obj/test_hashes.py.moved 2010-10-19 18:43:49 +0000 |
1463 | @@ -0,0 +1,28 @@ |
1464 | +# Copyright (c) 2010 OpenStack, LLC. |
1465 | +# |
1466 | +# Licensed under the Apache License, Version 2.0 (the "License"); |
1467 | +# you may not use this file except in compliance with the License. |
1468 | +# You may obtain a copy of the License at |
1469 | +# |
1470 | +# http://www.apache.org/licenses/LICENSE-2.0 |
1471 | +# |
1472 | +# Unless required by applicable law or agreed to in writing, software |
1473 | +# distributed under the License is distributed on an "AS IS" BASIS, |
1474 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
1475 | +# implied. |
1476 | +# See the License for the specific language governing permissions and |
1477 | +# limitations under the License. |
1478 | + |
1479 | +# TODO: Tests |
1480 | + |
1481 | +import unittest |
1482 | +from swift.obj import hashes |
1483 | + |
1484 | +class TestHashes(unittest.TestCase): |
1485 | + |
1486 | + def test_placeholder(self): |
1487 | + pass |
1488 | + |
1489 | + |
1490 | +if __name__ == '__main__': |
1491 | + unittest.main() |
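For reference, once every segment PUT has succeeded, the proxy stores a small JSON manifest as the object itself (see the proxy/server.py hunk above), plus a content-encoding key when the client sent one. A hedged reconstruction of the manifest shape and of the per-segment ring naming that spreads segments across the cluster — all values below are made up; only the key names and the '%s/%s/%s' naming scheme come from the diff:

    import json

    SEGMENT_SIZE = 1024  # the diff's deliberately small test value

    def ring_object_name(object_name, timestamp, segment_number):
        # Segment 0 is stored under the object's own name, so placement
        # of the first segment matches a whole object; every later
        # segment hashes to its own ring partition, which is what
        # distributes a very large upload across the cluster.
        if segment_number:
            return '%s/%s/%s' % (object_name, timestamp, segment_number)
        return object_name

    # The manifest written at the object's own name (illustrative values).
    manifest = json.dumps({
        'x-timestamp': '1287513829.04',
        'content-length': 3000,          # total bytes uploaded
        'content-type': 'application/octet-stream',
        'x-segment-size': SEGMENT_SIZE,  # needed to find segments later
        'etag': 'd41d8cd98f00b204e9800998ecf8427e',  # MD5 of whole body
    })

Presumably a GET reverses this: read the manifest, recompute each segment's ring name from the stored timestamp and x-segment-size, and stream the segments back in order — though the GET path is not part of the hunks shown here.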
I'm going to have to redo this puppy. It got too unwieldy, so I'm splitting it into separate branches.