Merge lp:~gholt/swift/largefiles into lp:~hudson-openstack/swift/trunk

Proposed by gholt
Status: Rejected
Rejected by: gholt
Proposed branch: lp:~gholt/swift/largefiles
Merge into: lp:~hudson-openstack/swift/trunk
Diff against target: 1491 lines (+930/-231)
10 files modified
swift/common/constraints.py (+8/-2)
swift/obj/auditor.py (+1/-1)
swift/obj/hashes.py (+179/-0)
swift/obj/hashes.py.moved (+179/-0)
swift/obj/replicator.py (+37/-177)
swift/obj/server.py (+81/-30)
swift/proxy/server.py (+387/-21)
test/unit/__init__.py (+2/-0)
test/unit/obj/test_hashes.py (+28/-0)
test/unit/obj/test_hashes.py.moved (+28/-0)
To merge this branch: bzr merge lp:~gholt/swift/largefiles
Reviewer: gholt (community)
Review status: Disapprove
Review via email: mp+35493@code.launchpad.net

Description of the change

Support for very large files by breaking them into segments and distributing the segments across the cluster.
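
In short (a minimal illustrative sketch, not code from the branch): the proxy splits any PUT larger than SEGMENT_SIZE, or any chunked-transfer upload, into SEGMENT_SIZE pieces, places each piece on the object ring under a per-segment name, and finally stores a small JSON manifest under the object's own name. The helper names below are hypothetical; the naming scheme and manifest fields mirror the swift/proxy/server.py changes in the preview diff.

    import json

    SEGMENT_SIZE = 1024  # value from the diff; noted there as a silly-low test value

    def segment_ring_name(object_name, put_timestamp, segment_number):
        # Segment 0 keeps the object's own name; later segments mix in the
        # PUT timestamp and segment number so each piece hashes to its own
        # ring partition and the segments spread across the cluster.
        if segment_number:
            return '%s/%s/%s' % (object_name, put_timestamp, segment_number)
        return object_name

    def build_manifest(put_timestamp, total_length, content_type, etag_hexdigest):
        # JSON body stored under the original object name (with
        # X-Object-Type: manifest) once every segment has been PUT.
        return json.dumps({'x-timestamp': put_timestamp,
                           'content-length': total_length,
                           'content-type': content_type,
                           'x-segment-size': SEGMENT_SIZE,
                           'etag': etag_hexdigest})

    # Example: a 3000-byte PUT of 'myobj' at timestamp '1287514829.00000'
    # yields segments placed under 'myobj', 'myobj/1287514829.00000/1' and
    # 'myobj/1287514829.00000/2', plus the manifest stored as 'myobj'.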

lp:~gholt/swift/largefiles updated
72. By gholt

Fixed typo

73. By gholt

Merged changes from trunk

74. By gholt

very-large-files: now distributes segments across the cluster and will not overwrite existing segments during the upload

75. By gholt

Updating object auditor to clean up old orphaned large file segments

76. By gholt

Merged changes from trunk

77. By gholt

Merged changes from trunk

78. By gholt

Merged from trunk

79. By gholt

Renovation to store object segments separate from whole objects

80. By gholt

Just a bit of DRY

81. By gholt

Support very large chunked transfer encoding uploads

82. By gholt

Update import and new test stub for moved obj hashes code

83. By gholt

Merge from trunk

84. By gholt

Merged from trunk

85. By gholt

Merged from trunk

86. By gholt

Merged with trunk

87. By gholt

Merged from trunk

88. By gholt

Refactor of container_updater calling; removed outdated imports

89. By gholt

Merged with refactorhashes

Revision history for this message
gholt (gholt) wrote :

I'm going to have to redo this puppy. It got too unwieldy, so I'm splitting it into separate branches.

review: Disapprove
lp:~gholt/swift/largefiles updated
90. By gholt

Merge from trunk

91. By gholt

Merged from refactorobjasync

Unmerged revisions

91. By gholt

Merged from refactorobjasync

90. By gholt

Merge from trunk

89. By gholt

Merged with refactorhashes

88. By gholt

Refactor of container_updater calling; removed outdated imports

87. By gholt

Merged from trunk

86. By gholt

Merged with trunk

85. By gholt

Merged from trunk

84. By gholt

Merged from trunk

83. By gholt

Merge from trunk

82. By gholt

Update import and new test stub for moved obj hashes code

Preview Diff

1=== modified file 'swift/common/constraints.py'
2--- swift/common/constraints.py 2010-09-22 19:53:38 +0000
3+++ swift/common/constraints.py 2010-10-19 18:43:49 +0000
4@@ -19,8 +19,12 @@
5 HTTPRequestEntityTooLarge
6
7
8+# FIXME: This will get bumped way up once very large file support is added.
9 #: Max file size allowed for objects
10 MAX_FILE_SIZE = 5 * 1024 * 1024 * 1024 + 2
11+# FIXME: Yeah, I know this is a silly low value; just for testing right now.
12+#: File size for segments of large objects
13+SEGMENT_SIZE = 1024
14 #: Max length of the name of a key for metadata
15 MAX_META_NAME_LENGTH = 128
16 #: Max length of the value of a key for metadata
17@@ -29,14 +33,16 @@
18 MAX_META_COUNT = 90
19 #: Max overall size of metadata
20 MAX_META_OVERALL_SIZE = 4096
21+#: Max account name length
22+MAX_ACCOUNT_NAME_LENGTH = 256
23+#: Max container name length
24+MAX_CONTAINER_NAME_LENGTH = 256
25 #: Max object name length
26 MAX_OBJECT_NAME_LENGTH = 1024
27 #: Max object list length of a get request for a container
28 CONTAINER_LISTING_LIMIT = 10000
29 #: Max container list length of a get request for an account
30 ACCOUNT_LISTING_LIMIT = 10000
31-MAX_ACCOUNT_NAME_LENGTH = 256
32-MAX_CONTAINER_NAME_LENGTH = 256
33
34
35 def check_metadata(req, target_type):
36
37=== modified file 'swift/obj/auditor.py'
38--- swift/obj/auditor.py 2010-10-18 22:30:26 +0000
39+++ swift/obj/auditor.py 2010-10-19 18:43:49 +0000
40@@ -19,7 +19,7 @@
41 from random import random
42
43 from swift.obj import server as object_server
44-from swift.obj.replicator import invalidate_hash
45+from swift.obj.hashes import invalidate_hash
46 from swift.common.utils import get_logger, renamer, audit_location_generator
47 from swift.common.exceptions import AuditException
48 from swift.common.daemon import Daemon
49
50=== added file 'swift/obj/hashes.py'
51--- swift/obj/hashes.py 1970-01-01 00:00:00 +0000
52+++ swift/obj/hashes.py 2010-10-19 18:43:49 +0000
53@@ -0,0 +1,179 @@
54+# Copyright (c) 2010 OpenStack, LLC.
55+#
56+# Licensed under the Apache License, Version 2.0 (the "License");
57+# you may not use this file except in compliance with the License.
58+# You may obtain a copy of the License at
59+#
60+# http://www.apache.org/licenses/LICENSE-2.0
61+#
62+# Unless required by applicable law or agreed to in writing, software
63+# distributed under the License is distributed on an "AS IS" BASIS,
64+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
65+# implied.
66+# See the License for the specific language governing permissions and
67+# limitations under the License.
68+
69+import cPickle as pickle
70+import hashlib
71+import os
72+from os.path import isdir, join
73+
74+from eventlet import tpool, sleep
75+
76+from swift.common.utils import lock_path, renamer
77+
78+
79+PICKLE_PROTOCOL = 2
80+ONE_WEEK = 604800
81+HASH_FILE = 'hashes.pkl'
82+
83+
84+def hash_suffix(path, reclaim_age):
85+ """
86+ Performs reclamation and returns an md5 of all (remaining) files.
87+
88+ :param reclaim_age: age in seconds at which to remove tombstones
89+ """
90+ md5 = hashlib.md5()
91+ for hsh in sorted(os.listdir(path)):
92+ hsh_path = join(path, hsh)
93+ files = os.listdir(hsh_path)
94+ if len(files) == 1:
95+ if files[0].endswith('.ts'):
96+ # remove tombstones older than reclaim_age
97+ ts = files[0].rsplit('.', 1)[0]
98+ if (time.time() - float(ts)) > reclaim_age:
99+ os.unlink(join(hsh_path, files[0]))
100+ files.remove(files[0])
101+ elif files:
102+ files.sort(reverse=True)
103+ meta = data = tomb = None
104+ for filename in files:
105+ if not meta and filename.endswith('.meta'):
106+ meta = filename
107+ if not data and filename.endswith('.data'):
108+ data = filename
109+ if not tomb and filename.endswith('.ts'):
110+ tomb = filename
111+ if (filename < tomb or # any file older than tomb
112+ filename < data or # any file older than data
113+ (filename.endswith('.meta') and
114+ filename < meta)): # old meta
115+ os.unlink(join(hsh_path, filename))
116+ files.remove(filename)
117+ if not files:
118+ os.rmdir(hsh_path)
119+ for filename in files:
120+ md5.update(filename)
121+ try:
122+ os.rmdir(path)
123+ except OSError:
124+ pass
125+ return md5.hexdigest()
126+
127+
128+def recalculate_hashes(partition_dir, suffixes, reclaim_age=ONE_WEEK):
129+ """
130+ Recalculates hashes for the given suffixes in the partition and updates
131+ them in the partition's hashes file.
132+
133+ :param partition_dir: directory of the partition in which to recalculate
134+ :param suffixes: list of suffixes to recalculate
135+ :param reclaim_age: age in seconds at which tombstones should be removed
136+ """
137+
138+ def tpool_listdir(partition_dir):
139+ return dict(((suff, None) for suff in os.listdir(partition_dir)
140+ if len(suff) == 3 and isdir(join(partition_dir, suff))))
141+ hashes_file = join(partition_dir, HASH_FILE)
142+ with lock_path(partition_dir):
143+ try:
144+ with open(hashes_file, 'rb') as fp:
145+ hashes = pickle.load(fp)
146+ except Exception:
147+ hashes = tpool.execute(tpool_listdir, partition_dir)
148+ for suffix in suffixes:
149+ suffix_dir = join(partition_dir, suffix)
150+ if os.path.exists(suffix_dir):
151+ hashes[suffix] = hash_suffix(suffix_dir, reclaim_age)
152+ elif suffix in hashes:
153+ del hashes[suffix]
154+ with open(hashes_file + '.tmp', 'wb') as fp:
155+ pickle.dump(hashes, fp, PICKLE_PROTOCOL)
156+ renamer(hashes_file + '.tmp', hashes_file)
157+
158+
159+def invalidate_hash(suffix_dir):
160+ """
161+ Invalidates the hash for a suffix_dir in the partition's hashes file.
162+
163+ :param suffix_dir: absolute path to suffix dir whose hash needs
164+ invalidating
165+ """
166+
167+ suffix = os.path.basename(suffix_dir)
168+ partition_dir = os.path.dirname(suffix_dir)
169+ hashes_file = join(partition_dir, HASH_FILE)
170+ with lock_path(partition_dir):
171+ try:
172+ with open(hashes_file, 'rb') as fp:
173+ hashes = pickle.load(fp)
174+ if suffix in hashes and not hashes[suffix]:
175+ return
176+ except Exception:
177+ return
178+ hashes[suffix] = None
179+ with open(hashes_file + '.tmp', 'wb') as fp:
180+ pickle.dump(hashes, fp, PICKLE_PROTOCOL)
181+ renamer(hashes_file + '.tmp', hashes_file)
182+
183+
184+def get_hashes(partition_dir, do_listdir=True, reclaim_age=ONE_WEEK):
185+ """
186+ Get a list of hashes for the suffix dir. do_listdir causes it to mistrust
187+ the hash cache for suffix existence at the (unexpectedly high) cost of a
188+ listdir. reclaim_age is just passed on to hash_suffix.
189+
190+ :param partition_dir: absolute path of partition to get hashes for
191+ :param do_listdir: force existence check for all hashes in the partition
192+ :param reclaim_age: age at which to remove tombstones
193+
194+ :returns: tuple of (number of suffix dirs hashed, dictionary of hashes)
195+ """
196+
197+ def tpool_listdir(hashes, partition_dir):
198+ return dict(((suff, hashes.get(suff, None))
199+ for suff in os.listdir(partition_dir)
200+ if len(suff) == 3 and isdir(join(partition_dir, suff))))
201+ hashed = 0
202+ hashes_file = join(partition_dir, HASH_FILE)
203+ with lock_path(partition_dir):
204+ modified = False
205+ hashes = {}
206+ try:
207+ with open(hashes_file, 'rb') as fp:
208+ hashes = pickle.load(fp)
209+ except Exception:
210+ do_listdir = True
211+ if do_listdir:
212+ hashes = tpool.execute(tpool_listdir, hashes, partition_dir)
213+ modified = True
214+ for suffix, hash_ in hashes.items():
215+ if not hash_:
216+ suffix_dir = join(partition_dir, suffix)
217+ if os.path.exists(suffix_dir):
218+ try:
219+ hashes[suffix] = hash_suffix(suffix_dir, reclaim_age)
220+ hashed += 1
221+ except OSError:
222+ logging.exception('Error hashing suffix')
223+ hashes[suffix] = None
224+ else:
225+ del hashes[suffix]
226+ modified = True
227+ sleep()
228+ if modified:
229+ with open(hashes_file + '.tmp', 'wb') as fp:
230+ pickle.dump(hashes, fp, PICKLE_PROTOCOL)
231+ renamer(hashes_file + '.tmp', hashes_file)
232+ return hashed, hashes
233
234=== added file 'swift/obj/hashes.py.moved'
235--- swift/obj/hashes.py.moved 1970-01-01 00:00:00 +0000
236+++ swift/obj/hashes.py.moved 2010-10-19 18:43:49 +0000
237@@ -0,0 +1,179 @@
238+# Copyright (c) 2010 OpenStack, LLC.
239+#
240+# Licensed under the Apache License, Version 2.0 (the "License");
241+# you may not use this file except in compliance with the License.
242+# You may obtain a copy of the License at
243+#
244+# http://www.apache.org/licenses/LICENSE-2.0
245+#
246+# Unless required by applicable law or agreed to in writing, software
247+# distributed under the License is distributed on an "AS IS" BASIS,
248+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
249+# implied.
250+# See the License for the specific language governing permissions and
251+# limitations under the License.
252+
253+import cPickle as pickle
254+import hashlib
255+import os
256+from os.path import isdir, join
257+
258+from eventlet import tpool, sleep
259+
260+from swift.common.utils import lock_path, renamer
261+
262+
263+PICKLE_PROTOCOL = 2
264+ONE_WEEK = 604800
265+HASH_FILE = 'hashes.pkl'
266+
267+
268+def hash_suffix(path, reclaim_age):
269+ """
270+ Performs reclamation and returns an md5 of all (remaining) files.
271+
272+ :param reclaim_age: age in seconds at which to remove tombstones
273+ """
274+ md5 = hashlib.md5()
275+ for hsh in sorted(os.listdir(path)):
276+ hsh_path = join(path, hsh)
277+ files = os.listdir(hsh_path)
278+ if len(files) == 1:
279+ if files[0].endswith('.ts'):
280+ # remove tombstones older than reclaim_age
281+ ts = files[0].rsplit('.', 1)[0]
282+ if (time.time() - float(ts)) > reclaim_age:
283+ os.unlink(join(hsh_path, files[0]))
284+ files.remove(files[0])
285+ elif files:
286+ files.sort(reverse=True)
287+ meta = data = tomb = None
288+ for filename in files:
289+ if not meta and filename.endswith('.meta'):
290+ meta = filename
291+ if not data and filename.endswith('.data'):
292+ data = filename
293+ if not tomb and filename.endswith('.ts'):
294+ tomb = filename
295+ if (filename < tomb or # any file older than tomb
296+ filename < data or # any file older than data
297+ (filename.endswith('.meta') and
298+ filename < meta)): # old meta
299+ os.unlink(join(hsh_path, filename))
300+ files.remove(filename)
301+ if not files:
302+ os.rmdir(hsh_path)
303+ for filename in files:
304+ md5.update(filename)
305+ try:
306+ os.rmdir(path)
307+ except OSError:
308+ pass
309+ return md5.hexdigest()
310+
311+
312+def recalculate_hashes(partition_dir, suffixes, reclaim_age=ONE_WEEK):
313+ """
314+ Recalculates hashes for the given suffixes in the partition and updates
315+ them in the partition's hashes file.
316+
317+ :param partition_dir: directory of the partition in which to recalculate
318+ :param suffixes: list of suffixes to recalculate
319+ :param reclaim_age: age in seconds at which tombstones should be removed
320+ """
321+
322+ def tpool_listdir(partition_dir):
323+ return dict(((suff, None) for suff in os.listdir(partition_dir)
324+ if len(suff) == 3 and isdir(join(partition_dir, suff))))
325+ hashes_file = join(partition_dir, HASH_FILE)
326+ with lock_path(partition_dir):
327+ try:
328+ with open(hashes_file, 'rb') as fp:
329+ hashes = pickle.load(fp)
330+ except Exception:
331+ hashes = tpool.execute(tpool_listdir, partition_dir)
332+ for suffix in suffixes:
333+ suffix_dir = join(partition_dir, suffix)
334+ if os.path.exists(suffix_dir):
335+ hashes[suffix] = hash_suffix(suffix_dir, reclaim_age)
336+ elif suffix in hashes:
337+ del hashes[suffix]
338+ with open(hashes_file + '.tmp', 'wb') as fp:
339+ pickle.dump(hashes, fp, PICKLE_PROTOCOL)
340+ renamer(hashes_file + '.tmp', hashes_file)
341+
342+
343+def invalidate_hash(suffix_dir):
344+ """
345+ Invalidates the hash for a suffix_dir in the partition's hashes file.
346+
347+ :param suffix_dir: absolute path to suffix dir whose hash needs
348+ invalidating
349+ """
350+
351+ suffix = os.path.basename(suffix_dir)
352+ partition_dir = os.path.dirname(suffix_dir)
353+ hashes_file = join(partition_dir, HASH_FILE)
354+ with lock_path(partition_dir):
355+ try:
356+ with open(hashes_file, 'rb') as fp:
357+ hashes = pickle.load(fp)
358+ if suffix in hashes and not hashes[suffix]:
359+ return
360+ except Exception:
361+ return
362+ hashes[suffix] = None
363+ with open(hashes_file + '.tmp', 'wb') as fp:
364+ pickle.dump(hashes, fp, PICKLE_PROTOCOL)
365+ renamer(hashes_file + '.tmp', hashes_file)
366+
367+
368+def get_hashes(partition_dir, do_listdir=True, reclaim_age=ONE_WEEK):
369+ """
370+ Get a list of hashes for the suffix dir. do_listdir causes it to mistrust
371+ the hash cache for suffix existence at the (unexpectedly high) cost of a
372+ listdir. reclaim_age is just passed on to hash_suffix.
373+
374+ :param partition_dir: absolute path of partition to get hashes for
375+ :param do_listdir: force existence check for all hashes in the partition
376+ :param reclaim_age: age at which to remove tombstones
377+
378+ :returns: tuple of (number of suffix dirs hashed, dictionary of hashes)
379+ """
380+
381+ def tpool_listdir(hashes, partition_dir):
382+ return dict(((suff, hashes.get(suff, None))
383+ for suff in os.listdir(partition_dir)
384+ if len(suff) == 3 and isdir(join(partition_dir, suff))))
385+ hashed = 0
386+ hashes_file = join(partition_dir, HASH_FILE)
387+ with lock_path(partition_dir):
388+ modified = False
389+ hashes = {}
390+ try:
391+ with open(hashes_file, 'rb') as fp:
392+ hashes = pickle.load(fp)
393+ except Exception:
394+ do_listdir = True
395+ if do_listdir:
396+ hashes = tpool.execute(tpool_listdir, hashes, partition_dir)
397+ modified = True
398+ for suffix, hash_ in hashes.items():
399+ if not hash_:
400+ suffix_dir = join(partition_dir, suffix)
401+ if os.path.exists(suffix_dir):
402+ try:
403+ hashes[suffix] = hash_suffix(suffix_dir, reclaim_age)
404+ hashed += 1
405+ except OSError:
406+ logging.exception('Error hashing suffix')
407+ hashes[suffix] = None
408+ else:
409+ del hashes[suffix]
410+ modified = True
411+ sleep()
412+ if modified:
413+ with open(hashes_file + '.tmp', 'wb') as fp:
414+ pickle.dump(hashes, fp, PICKLE_PROTOCOL)
415+ renamer(hashes_file + '.tmp', hashes_file)
416+ return hashed, hashes
417
418=== modified file 'swift/obj/replicator.py'
419--- swift/obj/replicator.py 2010-10-19 01:05:54 +0000
420+++ swift/obj/replicator.py 2010-10-19 18:43:49 +0000
421@@ -19,7 +19,6 @@
422 import shutil
423 import time
424 import logging
425-import hashlib
426 import itertools
427 import cPickle as pickle
428
429@@ -29,168 +28,16 @@
430 from eventlet.support.greenlets import GreenletExit
431
432 from swift.common.ring import Ring
433-from swift.common.utils import whataremyips, unlink_older_than, lock_path, \
434- renamer, compute_eta, get_logger
435+from swift.common.utils import compute_eta, get_logger, unlink_older_than, \
436+ whataremyips
437 from swift.common.bufferedhttp import http_connect
438 from swift.common.daemon import Daemon
439+from swift.obj.hashes import get_hashes, recalculate_hashes
440+from swift.obj.server import DATADIR
441+
442
443 hubs.use_hub('poll')
444
445-PICKLE_PROTOCOL = 2
446-ONE_WEEK = 604800
447-HASH_FILE = 'hashes.pkl'
448-
449-
450-def hash_suffix(path, reclaim_age):
451- """
452- Performs reclamation and returns an md5 of all (remaining) files.
453-
454- :param reclaim_age: age in seconds at which to remove tombstones
455- """
456- md5 = hashlib.md5()
457- for hsh in sorted(os.listdir(path)):
458- hsh_path = join(path, hsh)
459- files = os.listdir(hsh_path)
460- if len(files) == 1:
461- if files[0].endswith('.ts'):
462- # remove tombstones older than reclaim_age
463- ts = files[0].rsplit('.', 1)[0]
464- if (time.time() - float(ts)) > reclaim_age:
465- os.unlink(join(hsh_path, files[0]))
466- files.remove(files[0])
467- elif files:
468- files.sort(reverse=True)
469- meta = data = tomb = None
470- for filename in files:
471- if not meta and filename.endswith('.meta'):
472- meta = filename
473- if not data and filename.endswith('.data'):
474- data = filename
475- if not tomb and filename.endswith('.ts'):
476- tomb = filename
477- if (filename < tomb or # any file older than tomb
478- filename < data or # any file older than data
479- (filename.endswith('.meta') and
480- filename < meta)): # old meta
481- os.unlink(join(hsh_path, filename))
482- files.remove(filename)
483- if not files:
484- os.rmdir(hsh_path)
485- for filename in files:
486- md5.update(filename)
487- try:
488- os.rmdir(path)
489- except OSError:
490- pass
491- return md5.hexdigest()
492-
493-
494-def recalculate_hashes(partition_dir, suffixes, reclaim_age=ONE_WEEK):
495- """
496- Recalculates hashes for the given suffixes in the partition and updates
497- them in the partition's hashes file.
498-
499- :param partition_dir: directory of the partition in which to recalculate
500- :param suffixes: list of suffixes to recalculate
501- :param reclaim_age: age in seconds at which tombstones should be removed
502- """
503-
504- def tpool_listdir(partition_dir):
505- return dict(((suff, None) for suff in os.listdir(partition_dir)
506- if len(suff) == 3 and isdir(join(partition_dir, suff))))
507- hashes_file = join(partition_dir, HASH_FILE)
508- with lock_path(partition_dir):
509- try:
510- with open(hashes_file, 'rb') as fp:
511- hashes = pickle.load(fp)
512- except Exception:
513- hashes = tpool.execute(tpool_listdir, partition_dir)
514- for suffix in suffixes:
515- suffix_dir = join(partition_dir, suffix)
516- if os.path.exists(suffix_dir):
517- hashes[suffix] = hash_suffix(suffix_dir, reclaim_age)
518- elif suffix in hashes:
519- del hashes[suffix]
520- with open(hashes_file + '.tmp', 'wb') as fp:
521- pickle.dump(hashes, fp, PICKLE_PROTOCOL)
522- renamer(hashes_file + '.tmp', hashes_file)
523-
524-
525-def invalidate_hash(suffix_dir):
526- """
527- Invalidates the hash for a suffix_dir in the partition's hashes file.
528-
529- :param suffix_dir: absolute path to suffix dir whose hash needs
530- invalidating
531- """
532-
533- suffix = os.path.basename(suffix_dir)
534- partition_dir = os.path.dirname(suffix_dir)
535- hashes_file = join(partition_dir, HASH_FILE)
536- with lock_path(partition_dir):
537- try:
538- with open(hashes_file, 'rb') as fp:
539- hashes = pickle.load(fp)
540- if suffix in hashes and not hashes[suffix]:
541- return
542- except Exception:
543- return
544- hashes[suffix] = None
545- with open(hashes_file + '.tmp', 'wb') as fp:
546- pickle.dump(hashes, fp, PICKLE_PROTOCOL)
547- renamer(hashes_file + '.tmp', hashes_file)
548-
549-
550-def get_hashes(partition_dir, do_listdir=True, reclaim_age=ONE_WEEK):
551- """
552- Get a list of hashes for the suffix dir. do_listdir causes it to mistrust
553- the hash cache for suffix existence at the (unexpectedly high) cost of a
554- listdir. reclaim_age is just passed on to hash_suffix.
555-
556- :param partition_dir: absolute path of partition to get hashes for
557- :param do_listdir: force existence check for all hashes in the partition
558- :param reclaim_age: age at which to remove tombstones
559-
560- :returns: tuple of (number of suffix dirs hashed, dictionary of hashes)
561- """
562-
563- def tpool_listdir(hashes, partition_dir):
564- return dict(((suff, hashes.get(suff, None))
565- for suff in os.listdir(partition_dir)
566- if len(suff) == 3 and isdir(join(partition_dir, suff))))
567- hashed = 0
568- hashes_file = join(partition_dir, HASH_FILE)
569- with lock_path(partition_dir):
570- modified = False
571- hashes = {}
572- try:
573- with open(hashes_file, 'rb') as fp:
574- hashes = pickle.load(fp)
575- except Exception:
576- do_listdir = True
577- if do_listdir:
578- hashes = tpool.execute(tpool_listdir, hashes, partition_dir)
579- modified = True
580- for suffix, hash_ in hashes.items():
581- if not hash_:
582- suffix_dir = join(partition_dir, suffix)
583- if os.path.exists(suffix_dir):
584- try:
585- hashes[suffix] = hash_suffix(suffix_dir, reclaim_age)
586- hashed += 1
587- except OSError:
588- logging.exception('Error hashing suffix')
589- hashes[suffix] = None
590- else:
591- del hashes[suffix]
592- modified = True
593- sleep()
594- if modified:
595- with open(hashes_file + '.tmp', 'wb') as fp:
596- pickle.dump(hashes, fp, PICKLE_PROTOCOL)
597- renamer(hashes_file + '.tmp', hashes_file)
598- return hashed, hashes
599-
600
601 class ObjectReplicator(Daemon):
602 """
603@@ -301,8 +148,9 @@
604 had_any = True
605 if not had_any:
606 return False
607- args.append(join(rsync_module, node['device'],
608- 'objects', job['partition']))
609+ objdir = job.get('segments') and SEGMENTSDIR or DATADIR
610+ args.append(join(rsync_module, node['device'], objdir,
611+ job['partition']))
612 return self._rsync(args) == 0
613
614 def check_ring(self):
615@@ -337,12 +185,15 @@
616 for node in job['nodes']:
617 success = self.rsync(node, job, suffixes)
618 if success:
619+ headers = {'Content-Length': '0'}
620+ if job.get('segments'):
621+ headers['X-Object-Type'] = 'segment'
622 with Timeout(self.http_timeout):
623 http_connect(node['ip'],
624 node['port'],
625 node['device'], job['partition'], 'REPLICATE',
626 '/' + '-'.join(suffixes),
627- headers={'Content-Length': '0'}).getresponse().read()
628+ headers=headers).getresponse().read()
629 responses.append(success)
630 if not suffixes or (len(responses) == \
631 self.object_ring.replica_count and all(responses)):
632@@ -374,10 +225,13 @@
633 node = next(nodes)
634 attempts_left -= 1
635 try:
636+ headers = {'Content-Length': '0'}
637+ if job.get('segments'):
638+ headers['X-Object-Type'] = 'segment'
639 with Timeout(self.http_timeout):
640 resp = http_connect(node['ip'], node['port'],
641 node['device'], job['partition'], 'REPLICATE',
642- '', headers={'Content-Length': '0'}).getresponse()
643+ '', headers=headers).getresponse()
644 if resp.status == 507:
645 self.logger.error('%s/%s responded as unmounted' %
646 (node['ip'], node['device']))
647@@ -397,11 +251,14 @@
648 self.rsync(node, job, suffixes)
649 recalculate_hashes(job['path'], suffixes,
650 reclaim_age=self.reclaim_age)
651+ headers = {'Content-Length': '0'}
652+ if job.get('segments'):
653+ headers['X-Object-Type'] = 'segment'
654 with Timeout(self.http_timeout):
655 conn = http_connect(node['ip'], node['port'],
656 node['device'], job['partition'], 'REPLICATE',
657 '/' + '-'.join(suffixes),
658- headers={'Content-Length': '0'})
659+ headers=headers)
660 conn.getresponse().read()
661 self.suffix_sync += len(suffixes)
662 except (Exception, Timeout):
663@@ -489,24 +346,27 @@
664 dev for dev in self.object_ring.devs
665 if dev and dev['ip'] in ips and dev['port'] == self.port]:
666 dev_path = join(self.devices_dir, local_dev['device'])
667- obj_path = join(dev_path, 'objects')
668- tmp_path = join(dev_path, 'tmp')
669 if self.mount_check and not os.path.ismount(dev_path):
670 self.logger.warn('%s is not mounted' % local_dev['device'])
671 continue
672+ tmp_path = join(dev_path, 'tmp')
673 unlink_older_than(tmp_path, time.time() - self.reclaim_age)
674- if not os.path.exists(obj_path):
675- continue
676- for partition in os.listdir(obj_path):
677- try:
678- nodes = [node for node in
679- self.object_ring.get_part_nodes(int(partition))
680- if node['id'] != local_dev['id']]
681- jobs.append(dict(path=join(obj_path, partition),
682- nodes=nodes, delete=len(nodes) > 2,
683- partition=partition))
684- except ValueError:
685- continue
686+ for objdir in (DATADIR, SEGMENTSDIR):
687+ obj_path = join(dev_path, objdir)
688+ if os.path.exists(obj_path):
689+ for partition in os.listdir(obj_path):
690+ try:
691+ nodes = [node for node in
692+ self.object_ring.get_part_nodes(
693+ int(partition))
694+ if node['id'] != local_dev['id']]
695+ jobs.append(dict(
696+ path=join(obj_path, partition),
697+ nodes=nodes, delete=len(nodes) > 2,
698+ partition=partition,
699+ segments=(objdir == SEGMENTSDIR)))
700+ except ValueError:
701+ continue
702 random.shuffle(jobs)
703 # Partititons that need to be deleted take priority
704 jobs.sort(key=lambda job: not job['delete'])
705
706=== modified file 'swift/obj/server.py'
707--- swift/obj/server.py 2010-10-19 01:05:54 +0000
708+++ swift/obj/server.py 2010-10-19 18:43:49 +0000
709@@ -42,12 +42,12 @@
710 from swift.common.constraints import check_object_creation, check_mount, \
711 check_float, check_utf8
712 from swift.common.exceptions import ConnectionTimeout
713-from swift.obj.replicator import get_hashes, invalidate_hash, \
714- recalculate_hashes
715+from swift.obj.hashes import get_hashes, invalidate_hash, recalculate_hashes
716
717
718 DATADIR = 'objects'
719-ASYNCDIR = 'async_pending'
720+SEGMENTSDIR = 'object_segments'
721+ASYNCDIR = 'object_async'
722 PICKLE_PROTOCOL = 2
723 METADATA_KEY = 'user.swift.metadata'
724 MAX_OBJECT_NAME_LENGTH = 1024
725@@ -84,15 +84,31 @@
726 :param obj: object name for the object
727 :param keep_data_fp: if True, don't close the fp, otherwise close it
728 :param disk_chunk_Size: size of chunks on file reads
729+ :param segment: If set to not None, indicates which segment of an object
730+ this file represents
731+ :param segment_timestamp: X-Timestamp of the object's segments (set on the
732+ PUT, not changed on POSTs), required if segment
733+ is set to not None
734 """
735
736 def __init__(self, path, device, partition, account, container, obj,
737- keep_data_fp=False, disk_chunk_size=65536):
738+ keep_data_fp=False, disk_chunk_size=65536, segment=None,
739+ segment_timestamp=None):
740 self.disk_chunk_size = disk_chunk_size
741 self.name = '/' + '/'.join((account, container, obj))
742- name_hash = hash_path(account, container, obj)
743- self.datadir = os.path.join(path, device,
744- storage_directory(DATADIR, partition, name_hash))
745+ if segment and int(segment):
746+ ring_obj = '%s/%s/%s' % (obj, segment_timestamp, segment)
747+ else:
748+ ring_obj = obj
749+ name_hash = hash_path(account, container, ring_obj)
750+ if segment is not None:
751+ self.datadir = os.path.join(path, device,
752+ storage_directory(SEGMENTSDIR, partition, name_hash))
753+ self.no_longer_segment_datadir = os.path.join(path, device,
754+ storage_directory(DATADIR, partition, name_hash))
755+ else:
756+ self.datadir = os.path.join(path, device,
757+ storage_directory(DATADIR, partition, name_hash))
758 self.tmpdir = os.path.join(path, device, 'tmp')
759 self.metadata = {}
760 self.meta_file = None
761@@ -195,7 +211,8 @@
762 except OSError:
763 pass
764
765- def put(self, fd, tmppath, metadata, extension='.data'):
766+ def put(self, fd, tmppath, metadata, extension='.data',
767+ no_longer_segment=False):
768 """
769 Finalize writing the file on disk, and renames it from the temp file to
770 the real location. This should be called after the data has been
771@@ -204,7 +221,10 @@
772 :params fd: file descriptor of the temp file
773 :param tmppath: path to the temporary file being used
774 :param metadata: dictionary of metada to be written
775- :param extention: extension to be used when making the file
776+ :param extension: extension to be used when making the file
777+ :param no_longer_segment: Set to True if this was originally an object
778+ segment but no longer is (case with chunked transfer encoding when
779+ the object ends up less than the segment size)
780 """
781 metadata['name'] = self.name
782 timestamp = normalize_timestamp(metadata['X-Timestamp'])
783@@ -217,6 +237,8 @@
784 if 'Content-Length' in metadata:
785 drop_buffer_cache(fd, 0, int(metadata['Content-Length']))
786 os.fsync(fd)
787+ if no_longer_segment:
788+ self.datadir = self.no_longer_segment_datadir
789 invalidate_hash(os.path.dirname(self.datadir))
790 renamer(tmppath, os.path.join(self.datadir, timestamp + extension))
791 self.metadata = metadata
792@@ -355,7 +377,9 @@
793 if error_response:
794 return error_response
795 file = DiskFile(self.devices, device, partition, account, container,
796- obj, disk_chunk_size=self.disk_chunk_size)
797+ obj, disk_chunk_size=self.disk_chunk_size,
798+ segment=request.headers.get('x-object-segment'),
799+ segment_timestamp=request.headers['x-timestamp'])
800 upload_expiration = time.time() + self.max_upload_time
801 etag = md5()
802 upload_size = 0
803@@ -397,17 +421,32 @@
804 if 'content-encoding' in request.headers:
805 metadata['Content-Encoding'] = \
806 request.headers['Content-Encoding']
807- file.put(fd, tmppath, metadata)
808+ if 'x-object-type' in request.headers:
809+ metadata['X-Object-Type'] = request.headers['x-object-type']
810+ if 'x-object-segment' in request.headers:
811+ metadata['X-Object-Segment'] = \
812+ request.headers['x-object-segment']
813+ no_longer_segment = False
814+ if 'x-object-segment-if-length' in request.headers and \
815+ int(request.headers['x-object-segment-if-length']) != \
816+ os.fstat(fd).st_size:
817+ del metadata['X-Object-Type']
818+ del metadata['X-Object-Segment']
819+ no_longer_segment = True
820+ file.put(fd, tmppath, metadata,
821+ no_longer_segment=no_longer_segment)
822 file.unlinkold(metadata['X-Timestamp'])
823- self.container_update('PUT', account, container, obj, request.headers,
824- {'x-size': file.metadata['Content-Length'],
825- 'x-content-type': file.metadata['Content-Type'],
826- 'x-timestamp': file.metadata['X-Timestamp'],
827- 'x-etag': file.metadata['ETag'],
828- 'x-cf-trans-id': request.headers.get('x-cf-trans-id', '-')},
829- device)
830- resp = HTTPCreated(request=request, etag=etag)
831- return resp
832+ if 'X-Object-Segment' not in file.metadata:
833+ self.container_update('PUT', account, container, obj,
834+ request.headers,
835+ {'x-size': request.headers.get('x-object-length',
836+ file.metadata['Content-Length']),
837+ 'x-content-type': file.metadata['Content-Type'],
838+ 'x-timestamp': file.metadata['X-Timestamp'],
839+ 'x-etag': file.metadata['ETag'],
840+ 'x-cf-trans-id': request.headers.get('x-cf-trans-id', '-')},
841+ device)
842+ return HTTPCreated(request=request, etag=etag)
843
844 def GET(self, request):
845 """Handle HTTP GET requests for the Swift Object Server."""
846@@ -420,7 +459,9 @@
847 if self.mount_check and not check_mount(self.devices, device):
848 return Response(status='507 %s is not mounted' % device)
849 file = DiskFile(self.devices, device, partition, account, container,
850- obj, keep_data_fp=True, disk_chunk_size=self.disk_chunk_size)
851+ obj, keep_data_fp=True, disk_chunk_size=self.disk_chunk_size,
852+ segment=request.headers.get('x-object-segment'),
853+ segment_timestamp=request.headers.get('x-object-segment-timestamp'))
854 if file.is_deleted():
855 if request.headers.get('if-match') == '*':
856 return HTTPPreconditionFailed(request=request)
857@@ -460,7 +501,8 @@
858 'application/octet-stream'), app_iter=file,
859 request=request, conditional_response=True)
860 for key, value in file.metadata.iteritems():
861- if key.lower().startswith('x-object-meta-'):
862+ if key.lower().startswith('x-object-meta-') or \
863+ key.lower() in ('x-object-type', 'x-object-segment'):
864 response.headers[key] = value
865 response.etag = file.metadata['ETag']
866 response.last_modified = float(file.metadata['X-Timestamp'])
867@@ -482,13 +524,16 @@
868 if self.mount_check and not check_mount(self.devices, device):
869 return Response(status='507 %s is not mounted' % device)
870 file = DiskFile(self.devices, device, partition, account, container,
871- obj, disk_chunk_size=self.disk_chunk_size)
872+ obj, disk_chunk_size=self.disk_chunk_size,
873+ segment=request.headers.get('x-object-segment'),
874+ segment_timestamp=request.headers.get('x-object-segment-timestamp'))
875 if file.is_deleted():
876 return HTTPNotFound(request=request)
877 response = Response(content_type=file.metadata['Content-Type'],
878 request=request, conditional_response=True)
879 for key, value in file.metadata.iteritems():
880- if key.lower().startswith('x-object-meta-'):
881+ if key.lower().startswith('x-object-meta-') or \
882+ key.lower() in ('x-object-type', 'x-object-segment'):
883 response.headers[key] = value
884 response.etag = file.metadata['ETag']
885 response.last_modified = float(file.metadata['X-Timestamp'])
886@@ -513,7 +558,9 @@
887 return Response(status='507 %s is not mounted' % device)
888 response_class = HTTPNoContent
889 file = DiskFile(self.devices, device, partition, account, container,
890- obj, disk_chunk_size=self.disk_chunk_size)
891+ obj, disk_chunk_size=self.disk_chunk_size,
892+ segment=request.headers.get('x-object-segment'),
893+ segment_timestamp=request.headers.get('x-object-segment-timestamp'))
894 if file.is_deleted():
895 response_class = HTTPNotFound
896 metadata = {
897@@ -522,10 +569,11 @@
898 with file.mkstemp() as (fd, tmppath):
899 file.put(fd, tmppath, metadata, extension='.ts')
900 file.unlinkold(metadata['X-Timestamp'])
901- self.container_update('DELETE', account, container, obj,
902- request.headers, {'x-timestamp': metadata['X-Timestamp'],
903- 'x-cf-trans-id': request.headers.get('x-cf-trans-id', '-')},
904- device)
905+ if 'x-object-segment' not in request.headers:
906+ self.container_update('DELETE', account, container, obj,
907+ request.headers, {'x-timestamp': metadata['X-Timestamp'],
908+ 'x-cf-trans-id': request.headers.get('x-cf-trans-id', '-')},
909+ device)
910 resp = response_class(request=request)
911 return resp
912
913@@ -538,7 +586,10 @@
914 unquote(request.path), 2, 3, True)
915 if self.mount_check and not check_mount(self.devices, device):
916 return Response(status='507 %s is not mounted' % device)
917- path = os.path.join(self.devices, device, DATADIR, partition)
918+ if request.headers.get('x-object-type') == 'segment':
919+ path = os.path.join(self.devices, device, SEGMENTSDIR, partition)
920+ else:
921+ path = os.path.join(self.devices, device, DATADIR, partition)
922 if not os.path.exists(path):
923 mkdirs(path)
924 if suffix:
925
926=== modified file 'swift/proxy/server.py'
927--- swift/proxy/server.py 2010-10-15 15:07:19 +0000
928+++ swift/proxy/server.py 2010-10-19 18:43:49 +0000
929@@ -14,21 +14,25 @@
930 # limitations under the License.
931
932 from __future__ import with_statement
933+try:
934+ import simplejson as json
935+except ImportError:
936+ import json
937 import mimetypes
938 import os
939 import time
940 import traceback
941 from ConfigParser import ConfigParser
942+from hashlib import md5
943 from urllib import unquote, quote
944 import uuid
945 import functools
946
947 from eventlet.timeout import Timeout
948-from webob.exc import HTTPBadRequest, HTTPMethodNotAllowed, \
949- HTTPNotFound, HTTPPreconditionFailed, \
950- HTTPRequestTimeout, HTTPServiceUnavailable, \
951- HTTPUnprocessableEntity, HTTPRequestEntityTooLarge, HTTPServerError, \
952- status_map
953+from webob.exc import HTTPBadRequest, HTTPCreated, HTTPInternalServerError, \
954+ HTTPMethodNotAllowed, HTTPNotFound, HTTPPreconditionFailed, \
955+ HTTPRequestEntityTooLarge, HTTPRequestTimeout, HTTPServerError, \
956+ HTTPServiceUnavailable, HTTPUnprocessableEntity, status_map
957 from webob import Request, Response
958
959 from swift.common.ring import Ring
960@@ -37,7 +41,7 @@
961 from swift.common.bufferedhttp import http_connect
962 from swift.common.constraints import check_metadata, check_object_creation, \
963 check_utf8, MAX_ACCOUNT_NAME_LENGTH, MAX_CONTAINER_NAME_LENGTH, \
964- MAX_FILE_SIZE
965+ MAX_FILE_SIZE, SEGMENT_SIZE
966 from swift.common.exceptions import ChunkReadTimeout, \
967 ChunkWriteTimeout, ConnectionTimeout
968
969@@ -89,6 +93,135 @@
970 return wrapped
971
972
973+class SegmentedIterable(object):
974+ """
975+ Iterable that returns the object contents for a segmented object in Swift.
976+
977+ In addition to these params, you can also set the `response` attr just
978+ after creating the SegmentedIterable and it will update the response's
979+ `bytes_transferred` value. Be sure to set the `bytes_transferred` value to
980+ 0 beforehand.
981+
982+ :param controller: The ObjectController instance to work with.
983+ :param content_length: The total length of the object.
984+ :param segment_size: The length of each segment (except perhaps the last)
985+ of the object.
986+ :param timestamp: The X-Timestamp of the object's segments (set on the PUT,
987+ not changed on the POSTs).
988+ """
989+
990+ def __init__(self, controller, content_length, segment_size, timestamp):
991+ self.controller = controller
992+ self.content_length = content_length
993+ self.segment_size = segment_size
994+ self.timestamp = timestamp
995+ self.position = 0
996+ self.segment = -1
997+ self.segment_iter = None
998+ self.response = None
999+
1000+ def load_next_segment(self):
1001+ """ Loads the self.segment_iter with the next segment's contents. """
1002+ self.segment += 1
1003+ if self.segment:
1004+ ring_object_name = '%s/%s/%s' % (self.controller.object_name,
1005+ self.timestamp, self.segment)
1006+ else:
1007+ ring_object_name = self.controller.object_name
1008+ partition, nodes = self.controller.app.object_ring.get_nodes(
1009+ self.controller.account_name, self.controller.container_name,
1010+ ring_object_name)
1011+ path = '/%s/%s/%s' % (self.controller.account_name,
1012+ self.controller.container_name, self.controller.object_name)
1013+ req = Request.blank(path, headers={'X-Object-Segment': self.segment,
1014+ 'X-Object-Segment-Timestamp': self.timestamp})
1015+ resp = self.controller.GETorHEAD_base(req, 'Object',
1016+ partition, self.controller.iter_nodes(partition, nodes,
1017+ self.controller.app.object_ring), path,
1018+ self.controller.app.object_ring.replica_count)
1019+ if resp.status_int // 100 != 2:
1020+ raise Exception(
1021+ 'Could not load segment %s of %s' % (self.segment, path))
1022+ self.segment_iter = resp.app_iter
1023+
1024+ def __iter__(self):
1025+ """ Standard iterator function that returns the object's contents. """
1026+ while self.position < self.content_length:
1027+ if not self.segment_iter:
1028+ self.load_next_segment()
1029+ while True:
1030+ with ChunkReadTimeout(self.controller.app.node_timeout):
1031+ try:
1032+ chunk = self.segment_iter.next()
1033+ break
1034+ except StopIteration:
1035+ self.load_next_segment()
1036+ if self.position + len(chunk) > self.content_length:
1037+ chunk = chunk[:self.content_length - self.position]
1038+ self.position += len(chunk)
1039+ if self.response:
1040+ self.response.bytes_transferred += len(chunk)
1041+ yield chunk
1042+
1043+ def app_iter_range(self, start, stop):
1044+ """
1045+ Non-standard iterator function for use with Webob in serving Range
1046+ requests more quickly.
1047+
1048+ TODO:
1049+
1050+ This currently helps on speed by jumping to the proper segment to start
1051+ with (and ending without reading the trailing segments, but that
1052+ already happened technically with __iter__).
1053+
1054+ But, what it does not do yet is issue a Range request with the first
1055+ segment to allow the object server to seek to the segment start point.
1056+
1057+ Instead, it just reads and throws away all leading segment data. Since
1058+ segments are 5G by default, it'll have to transfer the whole 5G from
1059+ the object server to the proxy server even if it only needs the last
1060+ byte. In practice, this should happen fairly quickly relative to how
1061+ long requests take for these very large files; but it's still wasteful.
1062+
1063+ Anyway, it shouldn't be too hard to implement, I just have other things
1064+ to work out first.
1065+
1066+ :param start: The first byte (zero-based) to return.
1067+ :param stop: The last byte (zero-based) to return.
1068+ """
1069+ if start:
1070+ self.segment = (start / self.segment_size) - 1
1071+ self.load_next_segment()
1072+ self.position = self.segment * self.segment_size
1073+ segment_start = start - (self.segment * self.segment_size)
1074+ while segment_start:
1075+ with ChunkReadTimeout(self.controller.app.node_timeout):
1076+ chunk = self.segment_iter.next()
1077+ self.position += len(chunk)
1078+ if len(chunk) > segment_start:
1079+ chunk = chunk[segment_start:]
1080+ if self.response:
1081+ self.response.bytes_transferred += len(chunk)
1082+ yield chunk
1083+ segment_start = 0
1084+ else:
1085+ segment_start -= len(chunk)
1086+ if stop is not None:
1087+ length = stop - start
1088+ else:
1089+ length = None
1090+ for chunk in self:
1091+ if length is not None:
1092+ length -= len(chunk)
1093+ if length < 0:
1094+ # Chop off the extra:
1095+ if self.response:
1096+ self.response.bytes_transferred -= length
1097+ yield chunk[:length]
1098+ break
1099+ yield chunk
1100+
1101+
1102 def get_container_memcache_key(account, container):
1103 path = '/%s/%s' % (account, container)
1104 return 'container%s' % path
1105@@ -518,11 +651,56 @@
1106 aresp = req.environ['swift.authorize'](req)
1107 if aresp:
1108 return aresp
1109+ # This is bit confusing, so an explanation:
1110+ # * First we attempt the GET/HEAD normally, as this is the usual case.
1111+ # * If the request was a Range request and gave us a 416 Unsatisfiable
1112+ # response, we might be trying to do an invalid Range on a manifest
1113+ # object, so we try again with no Range.
1114+ # * If it turns out we have a manifest object, and we had a Range
1115+ # request originally that actually succeeded or we had a HEAD
1116+ # request, we have to do the request again as a full GET because
1117+ # we'll need the whole manifest.
1118+ # * Finally, if we had a manifest object, we pass it and the request
1119+ # off to GETorHEAD_segmented; otherwise we just return the response.
1120 partition, nodes = self.app.object_ring.get_nodes(
1121 self.account_name, self.container_name, self.object_name)
1122- return self.GETorHEAD_base(req, 'Object', partition,
1123+ resp = mresp = self.GETorHEAD_base(req, 'Object', partition,
1124+ self.iter_nodes(partition, nodes, self.app.object_ring),
1125+ req.path_info, self.app.object_ring.replica_count)
1126+ range_value = None
1127+ if mresp.status_int == 416:
1128+ range_value = req.range
1129+ req.range = None
1130+ mresp = self.GETorHEAD_base(req, 'Object', partition,
1131 self.iter_nodes(partition, nodes, self.app.object_ring),
1132 req.path_info, self.app.object_ring.replica_count)
1133+ if mresp.status_int // 100 != 2:
1134+ return resp
1135+ if 'x-object-type' in mresp.headers:
1136+ if mresp.headers['x-object-type'] == 'manifest':
1137+ if req.method == 'HEAD':
1138+ req.method = 'GET'
1139+ mresp = self.GETorHEAD_base(req, 'Object', partition,
1140+ self.iter_nodes(partition, nodes,
1141+ self.app.object_ring), req.path_info,
1142+ self.app.object_ring.replica_count)
1143+ if mresp.status_int // 100 != 2:
1144+ return mresp
1145+ req.method = 'HEAD'
1146+ elif req.range:
1147+ range_value = req.range
1148+ req.range = None
1149+ mresp = self.GETorHEAD_base(req, 'Object', partition,
1150+ self.iter_nodes(partition, nodes,
1151+ self.app.object_ring), req.path_info,
1152+ self.app.object_ring.replica_count)
1153+ if mresp.status_int // 100 != 2:
1154+ return mresp
1155+ if range_value:
1156+ req.range = range_value
1157+ return self.GETorHEAD_segmented(req, mresp)
1158+ return HTTPNotFound(request=req)
1159+ return resp
1160
1161 @public
1162 @delay_denial
1163@@ -536,6 +714,48 @@
1164 """Handler for HTTP HEAD requests."""
1165 return self.GETorHEAD(req)
1166
1167+ def GETorHEAD_segmented(self, req, mresp):
1168+ """
1169+ Performs a GET for a segmented object.
1170+
1171+ :param req: The webob.Request to process.
1172+ :param mresp: The webob.Response for the original manifest request.
1173+ :returns: webob.Response object.
1174+ """
1175+ manifest = json.loads(''.join(mresp.app_iter))
1176+ # Ah, the fun of JSONing strs and getting unicodes back. We
1177+ # reencode to UTF8 to ensure crap doesn't blow up everywhere
1178+ # else.
1179+ keys_to_encode = []
1180+ for k, v in manifest.iteritems():
1181+ if isinstance(k, unicode):
1182+ keys_to_encode.append(k)
1183+ if isinstance(v, unicode):
1184+ manifest[k] = v.encode('utf8')
1185+ for k in keys_to_encode:
1186+ v = manifest[k]
1187+ del manifest[k]
1188+ manifest[k.encode('utf8')] = v
1189+ content_length = int(manifest['content-length'])
1190+ segment_size = int(manifest['x-segment-size'])
1191+ headers = dict(mresp.headers)
1192+ headers.update(manifest)
1193+ del headers['x-segment-size']
1194+ resp = Response(app_iter=SegmentedIterable(self, content_length,
1195+ segment_size, manifest['x-timestamp']), headers=headers,
1196+ request=req, conditional_response=True)
1197+ resp.headers['etag'] = manifest['etag'].strip('"')
1198+ resp.last_modified = mresp.last_modified
1199+ resp.content_length = int(manifest['content-length'])
1200+ resp.content_type = manifest['content-type']
1201+ if 'content-encoding' in manifest:
1202+ resp.content_encoding = manifest['content-encoding']
1203+ cresp = req.get_response(resp)
1204+ # Needed for SegmentedIterable to update bytes_transferred
1205+ cresp.bytes_transferred = 0
1206+ resp.app_iter.response = cresp
1207+ return cresp
1208+
1209 @public
1210 @delay_denial
1211 def POST(self, req):
1212@@ -652,11 +872,47 @@
1213 if k.lower().startswith('x-object-meta-'):
1214 new_req.headers[k] = v
1215 req = new_req
1216+ if req.headers.get('transfer-encoding') == 'chunked' or \
1217+ req.content_length > SEGMENT_SIZE:
1218+ resp = self.PUT_segmented_object(req, data_source, partition,
1219+ nodes, container_partition, containers)
1220+ else:
1221+ resp = self.PUT_whole_object(req, data_source, partition, nodes,
1222+ container_partition, containers)
1223+ if 'x-copy-from' in req.headers:
1224+ resp.headers['X-Copied-From'] = req.headers['x-copy-from']
1225+ for k, v in req.headers.items():
1226+ if k.lower().startswith('x-object-meta-'):
1227+ resp.headers[k] = v
1228+ resp.last_modified = float(req.headers['X-Timestamp'])
1229+ return resp
1230+
1231+ def PUT_whole_object(self, req, data_source, partition, nodes,
1232+ container_partition=None, containers=None):
1233+ """
1234+ Performs a PUT for a whole object (one with a content-length <=
1235+ SEGMENT_SIZE).
1236+
1237+ :param req: The webob.Request to process.
1238+ :param data_source: An iterator providing the data to store.
1239+ :param partition: The object ring partition the object falls on.
1240+ :param nodes: The object ring nodes the object falls on.
1241+ :param container_partition: The container ring partition the container
1242+ for the object falls on, None if the
1243+ container is not to be updated.
1244+ :param containers: The container ring nodes the container for the
1245+ object falls on, None if the container is not to be
1246+ updated.
1247+ :returns: webob.Response object.
1248+ """
1249+ conns = []
1250+ update_containers = containers is not None
1251 for node in self.iter_nodes(partition, nodes, self.app.object_ring):
1252- container = containers.pop()
1253- req.headers['X-Container-Host'] = '%(ip)s:%(port)s' % container
1254- req.headers['X-Container-Partition'] = container_partition
1255- req.headers['X-Container-Device'] = container['device']
1256+ if update_containers:
1257+ container = containers.pop()
1258+ req.headers['X-Container-Host'] = '%(ip)s:%(port)s' % container
1259+ req.headers['X-Container-Partition'] = container_partition
1260+ req.headers['X-Container-Device'] = container['device']
1261 req.headers['Expect'] = '100-continue'
1262 resp = conn = None
1263 if not self.error_limited(node):
1264@@ -674,12 +930,14 @@
1265 if conn and resp:
1266 if resp.status == 100:
1267 conns.append(conn)
1268- if not containers:
1269+ if (update_containers and not containers) or \
1270+ len(conns) == len(nodes):
1271 break
1272 continue
1273 elif resp.status == 507:
1274 self.error_limit(node)
1275- containers.insert(0, container)
1276+ if update_containers:
1277+ containers.insert(0, container)
1278 if len(conns) <= len(nodes) / 2:
1279 self.app.logger.error(
1280 'Object PUT returning 503, %s/%s required connections, '
1281@@ -765,15 +1023,123 @@
1282 statuses.append(503)
1283 reasons.append('')
1284 bodies.append('')
1285- resp = self.best_response(req, statuses, reasons, bodies, 'Object PUT',
1286+ return self.best_response(req, statuses, reasons, bodies, 'Object PUT',
1287 etag=etag)
1288- if 'x-copy-from' in req.headers:
1289- resp.headers['X-Copied-From'] = req.headers['x-copy-from']
1290- for k, v in req.headers.items():
1291- if k.lower().startswith('x-object-meta-'):
1292- resp.headers[k] = v
1293- resp.last_modified = float(req.headers['X-Timestamp'])
1294- return resp
1295+
1296+ def PUT_segmented_object(self, req, data_source, partition, nodes,
1297+ container_partition, containers):
1298+ """
1299+ Performs a PUT for a segmented object (one with a content-length >
1300+ SEGMENT_SIZE).
1301+
1302+ :param req: The webob.Request to process.
1303+ :param data_source: An iterator providing the data to store.
1304+ :param partition: The object ring partition the object falls on.
1305+ :param nodes: The object ring nodes the object falls on.
1306+ :param container_partition: The container ring partition the container
1307+ for the object falls on.
1308+ :param containers: The container ring nodes the container for the
1309+ object falls on.
1310+ :returns: webob.Response object.
1311+ """
1312+ req.bytes_transferred = 0
1313+ leftover_chunk = [None]
1314+ etag = md5()
1315+ def segment_iter():
1316+ amount_given = 0
1317+ while amount_given < SEGMENT_SIZE:
1318+ if leftover_chunk[0]:
1319+ chunk = leftover_chunk[0]
1320+ leftover_chunk[0] = None
1321+ else:
1322+ with ChunkReadTimeout(self.app.client_timeout):
1323+ chunk = data_source.next()
1324+ req.bytes_transferred += len(chunk)
1325+ etag.update(chunk)
1326+ if amount_given + len(chunk) > SEGMENT_SIZE:
1327+ yield chunk[:SEGMENT_SIZE - amount_given]
1328+ leftover_chunk[0] = chunk[SEGMENT_SIZE - amount_given:]
1329+ amount_given = SEGMENT_SIZE
1330+ else:
1331+ yield chunk
1332+ amount_given += len(chunk)
1333+ def segment_iter_iter():
1334+ while True:
1335+ if not leftover_chunk[0]:
1336+ with ChunkReadTimeout(self.app.client_timeout):
1337+ leftover_chunk[0] = data_source.next()
1338+ req.bytes_transferred += len(leftover_chunk[0])
1339+ etag.update(leftover_chunk[0])
1340+ yield segment_iter()
1341+ segment_number = 0
1342+ chunked = req.headers.get('transfer-encoding') == 'chunked'
1343+ if not chunked:
1344+ amount_left = req.content_length
1345+ headers = {'X-Timestamp': req.headers['X-Timestamp'],
1346+ 'Content-Type': req.headers['content-type'],
1347+ 'X-Object-Type': 'segment'}
1348+ for segment_source in segment_iter_iter():
1349+ if chunked:
1350+ headers['Transfer-Encoding'] = 'chunked'
1351+ if segment_number == 0:
1352+ headers['X-Object-Segment-If-Length'] = SEGMENT_SIZE
1353+ elif amount_left > SEGMENT_SIZE:
1354+ headers['Content-Length'] = SEGMENT_SIZE
1355+ else:
1356+ headers['Content-Length'] = amount_left
1357+ headers['X-Object-Segment'] = segment_number
1358+ segment_req = Request.blank(req.path_info,
1359+ environ={'REQUEST_METHOD': 'PUT'}, headers=headers)
1360+ if 'X-Object-Segment-If-Length' in headers:
1361+ del headers['X-Object-Segment-If-Length']
1362+ if segment_number:
1363+ ring_object_name = '%s/%s/%s' % (self.object_name,
1364+ req.headers['x-timestamp'], segment_number)
1365+ else:
1366+ ring_object_name = self.object_name
1367+ segment_partition, segment_nodes = self.app.object_ring.get_nodes(
1368+ self.account_name, self.container_name, ring_object_name)
1369+ resp = self.PUT_whole_object(segment_req, segment_source,
1370+ segment_partition, segment_nodes)
1371+ if resp.status_int // 100 == 4:
1372+ return resp
1373+ elif resp.status_int // 100 != 2:
1374+ return HTTPServiceUnavailable(request=req,
1375+ body='Unable to complete very large file operation.')
1376+ if segment_number == 0 and req.bytes_transferred < SEGMENT_SIZE:
1377+ return HTTPCreated(request=req, etag=etag.hexdigest())
1378+ if not chunked:
1379+ amount_left -= SEGMENT_SIZE
1380+ segment_number += 1
1381+ etag = etag.hexdigest()
1382+ if 'etag' in req.headers and req.headers['etag'].lower() != etag:
1383+ return HTTPUnprocessableEntity(request=req)
1384+ manifest = {'x-timestamp': req.headers['x-timestamp'],
1385+ 'content-length': req.bytes_transferred,
1386+ 'content-type': req.headers['content-type'],
1387+ 'x-segment-size': SEGMENT_SIZE,
1388+ 'etag': etag}
1389+ if 'content-encoding' in req.headers:
1390+ manifest['content-encoding'] = req.headers['content-encoding']
1391+ manifest = json.dumps(manifest)
1392+ headers = {'X-Timestamp': req.headers['X-Timestamp'],
1393+ 'Content-Type': req.headers['content-type'],
1394+ 'Content-Length': len(manifest),
1395+ 'X-Object-Type': 'manifest',
1396+ 'X-Object-Length': req.bytes_transferred}
1397+ headers.update(i for i in req.headers.iteritems()
1398+ if i[0].lower().startswith('x-object-meta-') and len(i[0]) > 14)
1399+ manifest_req = Request.blank(req.path_info,
1400+ environ={'REQUEST_METHOD': 'PUT'}, body=manifest, headers=headers)
1401+ manifest_source = iter(lambda:
1402+ manifest_req.body_file.read(self.app.client_chunk_size), '')
1403+ resp = self.PUT_whole_object(manifest_req, manifest_source, partition,
1404+ nodes, container_partition=container_partition,
1405+ containers=containers)
1406+ if resp.status_int // 100 != 2:
1407+ return HTTPServiceUnavailable(request=req,
1408+ body='Unable to complete very large file operation.')
1409+ return HTTPCreated(request=req, etag=etag)
1410
1411 @public
1412 @delay_denial
1413
1414=== modified file 'test/unit/__init__.py'
1415--- test/unit/__init__.py 2010-07-29 20:06:01 +0000
1416+++ test/unit/__init__.py 2010-10-19 18:43:49 +0000
1417@@ -9,6 +9,8 @@
1418 crlfs = 0
1419 while crlfs < 2:
1420 c = fd.read(1)
1421+ if not len(c):
1422+ raise Exception('Never read 2crlfs; got %s' % repr(rv))
1423 rv = rv + c
1424 if c == '\r' and lc != '\n':
1425 crlfs = 0
1426
1427=== added file 'test/unit/obj/test_hashes.py'
1428--- test/unit/obj/test_hashes.py 1970-01-01 00:00:00 +0000
1429+++ test/unit/obj/test_hashes.py 2010-10-19 18:43:49 +0000
1430@@ -0,0 +1,28 @@
1431+# Copyright (c) 2010 OpenStack, LLC.
1432+#
1433+# Licensed under the Apache License, Version 2.0 (the "License");
1434+# you may not use this file except in compliance with the License.
1435+# You may obtain a copy of the License at
1436+#
1437+# http://www.apache.org/licenses/LICENSE-2.0
1438+#
1439+# Unless required by applicable law or agreed to in writing, software
1440+# distributed under the License is distributed on an "AS IS" BASIS,
1441+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
1442+# implied.
1443+# See the License for the specific language governing permissions and
1444+# limitations under the License.
1445+
1446+# TODO: Tests
1447+
1448+import unittest
1449+from swift.obj import hashes
1450+
1451+class TestHashes(unittest.TestCase):
1452+
1453+ def test_placeholder(self):
1454+ pass
1455+
1456+
1457+if __name__ == '__main__':
1458+ unittest.main()
1459
1460=== added file 'test/unit/obj/test_hashes.py.moved'
1461--- test/unit/obj/test_hashes.py.moved 1970-01-01 00:00:00 +0000
1462+++ test/unit/obj/test_hashes.py.moved 2010-10-19 18:43:49 +0000
1463@@ -0,0 +1,28 @@
1464+# Copyright (c) 2010 OpenStack, LLC.
1465+#
1466+# Licensed under the Apache License, Version 2.0 (the "License");
1467+# you may not use this file except in compliance with the License.
1468+# You may obtain a copy of the License at
1469+#
1470+# http://www.apache.org/licenses/LICENSE-2.0
1471+#
1472+# Unless required by applicable law or agreed to in writing, software
1473+# distributed under the License is distributed on an "AS IS" BASIS,
1474+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
1475+# implied.
1476+# See the License for the specific language governing permissions and
1477+# limitations under the License.
1478+
1479+# TODO: Tests
1480+
1481+import unittest
1482+from swift.obj import hashes
1483+
1484+class TestHashes(unittest.TestCase):
1485+
1486+ def test_placeholder(self):
1487+ pass
1488+
1489+
1490+if __name__ == '__main__':
1491+ unittest.main()