Merge lp:~gholt/swift/largefiles into lp:~hudson-openstack/swift/trunk

Proposed by gholt
Status: Rejected
Rejected by: gholt
Proposed branch: lp:~gholt/swift/largefiles
Merge into: lp:~hudson-openstack/swift/trunk
Diff against target: 1491 lines (+930/-231)
10 files modified
swift/common/constraints.py (+8/-2)
swift/obj/auditor.py (+1/-1)
swift/obj/hashes.py (+179/-0)
swift/obj/hashes.py.moved (+179/-0)
swift/obj/replicator.py (+37/-177)
swift/obj/server.py (+81/-30)
swift/proxy/server.py (+387/-21)
test/unit/__init__.py (+2/-0)
test/unit/obj/test_hashes.py (+28/-0)
test/unit/obj/test_hashes.py.moved (+28/-0)
To merge this branch: bzr merge lp:~gholt/swift/largefiles
Reviewer: gholt (community)
Review status: Disapprove
Review via email: mp+35493@code.launchpad.net

Description of the change

Support for very large files by breaking them into segments and distributing the segments across the cluster.
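
As a rough sketch of the scheme the diff below implements: uploads larger than SEGMENT_SIZE are cut into segments; segment 0 is stored under the object's own name, and every later segment under a ring name of the form <object>/<PUT X-Timestamp>/<segment number> (see PUT_segmented_object and SegmentedIterable in swift/proxy/server.py), so segment names hash to different partitions. The helpers below, segment_ring_names and hash_to_partition, and the PARTITION_POWER value are illustrative stand-ins for this review, not code from the branch.

import hashlib

SEGMENT_SIZE = 1024     # value used in this branch; deliberately tiny, for testing
PARTITION_POWER = 18    # assumed ring size, purely for the example


def segment_ring_names(obj, put_timestamp, content_length):
    """Yield the ring name the branch would use for each segment of an upload."""
    segments = max(1, (content_length + SEGMENT_SIZE - 1) // SEGMENT_SIZE)
    for segment in range(segments):
        if segment == 0:
            # The first segment keeps the object's own name.
            yield obj
        else:
            # Later segments use "<object>/<PUT timestamp>/<segment number>".
            yield '%s/%s/%s' % (obj, put_timestamp, segment)


def hash_to_partition(account, container, ring_obj):
    """Stand-in for Ring.get_nodes(): md5 the path and keep the top bits."""
    path = '/%s/%s/%s' % (account, container, ring_obj)
    digest = hashlib.md5(path.encode('utf-8')).hexdigest()
    return int(digest, 16) >> (128 - PARTITION_POWER)


if __name__ == '__main__':
    for name in segment_ring_names('big.iso', '1287513829.00000', 5 * SEGMENT_SIZE):
        part = hash_to_partition('AUTH_test', 'movies', name)
        print('%-40s -> partition %d' % (name, part))

With the real ring the mapping differs, but the effect is the same: each segment name hashes independently, so the segments of one large object spread across the cluster instead of all landing on the partition of the whole-object name.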

lp:~gholt/swift/largefiles updated
72. By gholt

Fixed typo

73. By gholt

Merged changes from trunk

74. By gholt

very-large-files: now distributes segments across cluster and will not overwrite existing segments during the upload

75. By gholt

Updating object auditor to clean up old orphaned large file segments

76. By gholt

Merged changes from trunk

77. By gholt

Merged changes from trunk

78. By gholt

Merged from trunk

79. By gholt

Renovation to store object segments separate from whole objects

80. By gholt

Just a bit of DRY

81. By gholt

Support very large chunked transfer encoding uploads

82. By gholt

Update import and new test stub for moved obj hashes code

83. By gholt

Merge from trunk

84. By gholt

Merged from trunk

85. By gholt

Merged from trunk

86. By gholt

Merged with trunk

87. By gholt

Merged from trunk

88. By gholt

Refactor of container_updater calling; removed outdated imports

89. By gholt

Merged with refactorhashes

Revision history for this message
gholt (gholt) wrote:

I'm going to have to redo this puppy. It got too unwieldy, so I'm splitting it into separate branches.

review: Disapprove
lp:~gholt/swift/largefiles updated
90. By gholt

Merge from trunk

91. By gholt

Merged from refactorobjasync

Unmerged revisions

91. By gholt

Merged from refactorobjasync

90. By gholt

Merge from trunk

89. By gholt

Merged with refactorhashes

88. By gholt

Refactor of container_updater calling; removed outdated imports

87. By gholt

Merged from trunk

86. By gholt

Merged with trunk

85. By gholt

Merged from trunk

84. By gholt

Merged from trunk

83. By gholt

Merge from trunk

82. By gholt

Update import and new test stub for moved obj hashes code

Preview Diff

=== modified file 'swift/common/constraints.py'
--- swift/common/constraints.py 2010-09-22 19:53:38 +0000
+++ swift/common/constraints.py 2010-10-19 18:43:49 +0000
@@ -19,8 +19,12 @@
     HTTPRequestEntityTooLarge
 
 
+# FIXME: This will get bumped way up once very large file support is added.
 #: Max file size allowed for objects
 MAX_FILE_SIZE = 5 * 1024 * 1024 * 1024 + 2
+# FIXME: Yeah, I know this is a silly low value; just for testing right now.
+#: File size for segments of large objects
+SEGMENT_SIZE = 1024
 #: Max length of the name of a key for metadata
 MAX_META_NAME_LENGTH = 128
 #: Max length of the value of a key for metadata
@@ -29,14 +33,16 @@
 MAX_META_COUNT = 90
 #: Max overall size of metadata
 MAX_META_OVERALL_SIZE = 4096
+#: Max account name length
+MAX_ACCOUNT_NAME_LENGTH = 256
+#: Max container name length
+MAX_CONTAINER_NAME_LENGTH = 256
 #: Max object name length
 MAX_OBJECT_NAME_LENGTH = 1024
 #: Max object list length of a get request for a container
 CONTAINER_LISTING_LIMIT = 10000
 #: Max container list length of a get request for an account
 ACCOUNT_LISTING_LIMIT = 10000
-MAX_ACCOUNT_NAME_LENGTH = 256
-MAX_CONTAINER_NAME_LENGTH = 256
 
 
 def check_metadata(req, target_type):
 
=== modified file 'swift/obj/auditor.py'
--- swift/obj/auditor.py 2010-10-18 22:30:26 +0000
+++ swift/obj/auditor.py 2010-10-19 18:43:49 +0000
@@ -19,7 +19,7 @@
 from random import random
 
 from swift.obj import server as object_server
-from swift.obj.replicator import invalidate_hash
+from swift.obj.hashes import invalidate_hash
 from swift.common.utils import get_logger, renamer, audit_location_generator
 from swift.common.exceptions import AuditException
 from swift.common.daemon import Daemon
 
=== added file 'swift/obj/hashes.py'
--- swift/obj/hashes.py 1970-01-01 00:00:00 +0000
+++ swift/obj/hashes.py 2010-10-19 18:43:49 +0000
@@ -0,0 +1,179 @@
1# Copyright (c) 2010 OpenStack, LLC.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12# implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16import cPickle as pickle
17import hashlib
18import os
19from os.path import isdir, join
20
21from eventlet import tpool, sleep
22
23from swift.common.utils import lock_path, renamer
24
25
26PICKLE_PROTOCOL = 2
27ONE_WEEK = 604800
28HASH_FILE = 'hashes.pkl'
29
30
31def hash_suffix(path, reclaim_age):
32 """
33 Performs reclamation and returns an md5 of all (remaining) files.
34
35 :param reclaim_age: age in seconds at which to remove tombstones
36 """
37 md5 = hashlib.md5()
38 for hsh in sorted(os.listdir(path)):
39 hsh_path = join(path, hsh)
40 files = os.listdir(hsh_path)
41 if len(files) == 1:
42 if files[0].endswith('.ts'):
43 # remove tombstones older than reclaim_age
44 ts = files[0].rsplit('.', 1)[0]
45 if (time.time() - float(ts)) > reclaim_age:
46 os.unlink(join(hsh_path, files[0]))
47 files.remove(files[0])
48 elif files:
49 files.sort(reverse=True)
50 meta = data = tomb = None
51 for filename in files:
52 if not meta and filename.endswith('.meta'):
53 meta = filename
54 if not data and filename.endswith('.data'):
55 data = filename
56 if not tomb and filename.endswith('.ts'):
57 tomb = filename
58 if (filename < tomb or # any file older than tomb
59 filename < data or # any file older than data
60 (filename.endswith('.meta') and
61 filename < meta)): # old meta
62 os.unlink(join(hsh_path, filename))
63 files.remove(filename)
64 if not files:
65 os.rmdir(hsh_path)
66 for filename in files:
67 md5.update(filename)
68 try:
69 os.rmdir(path)
70 except OSError:
71 pass
72 return md5.hexdigest()
73
74
75def recalculate_hashes(partition_dir, suffixes, reclaim_age=ONE_WEEK):
76 """
77 Recalculates hashes for the given suffixes in the partition and updates
78 them in the partition's hashes file.
79
80 :param partition_dir: directory of the partition in which to recalculate
81 :param suffixes: list of suffixes to recalculate
82 :param reclaim_age: age in seconds at which tombstones should be removed
83 """
84
85 def tpool_listdir(partition_dir):
86 return dict(((suff, None) for suff in os.listdir(partition_dir)
87 if len(suff) == 3 and isdir(join(partition_dir, suff))))
88 hashes_file = join(partition_dir, HASH_FILE)
89 with lock_path(partition_dir):
90 try:
91 with open(hashes_file, 'rb') as fp:
92 hashes = pickle.load(fp)
93 except Exception:
94 hashes = tpool.execute(tpool_listdir, partition_dir)
95 for suffix in suffixes:
96 suffix_dir = join(partition_dir, suffix)
97 if os.path.exists(suffix_dir):
98 hashes[suffix] = hash_suffix(suffix_dir, reclaim_age)
99 elif suffix in hashes:
100 del hashes[suffix]
101 with open(hashes_file + '.tmp', 'wb') as fp:
102 pickle.dump(hashes, fp, PICKLE_PROTOCOL)
103 renamer(hashes_file + '.tmp', hashes_file)
104
105
106def invalidate_hash(suffix_dir):
107 """
108 Invalidates the hash for a suffix_dir in the partition's hashes file.
109
110 :param suffix_dir: absolute path to suffix dir whose hash needs
111 invalidating
112 """
113
114 suffix = os.path.basename(suffix_dir)
115 partition_dir = os.path.dirname(suffix_dir)
116 hashes_file = join(partition_dir, HASH_FILE)
117 with lock_path(partition_dir):
118 try:
119 with open(hashes_file, 'rb') as fp:
120 hashes = pickle.load(fp)
121 if suffix in hashes and not hashes[suffix]:
122 return
123 except Exception:
124 return
125 hashes[suffix] = None
126 with open(hashes_file + '.tmp', 'wb') as fp:
127 pickle.dump(hashes, fp, PICKLE_PROTOCOL)
128 renamer(hashes_file + '.tmp', hashes_file)
129
130
131def get_hashes(partition_dir, do_listdir=True, reclaim_age=ONE_WEEK):
132 """
133 Get a list of hashes for the suffix dir. do_listdir causes it to mistrust
134 the hash cache for suffix existence at the (unexpectedly high) cost of a
135 listdir. reclaim_age is just passed on to hash_suffix.
136
137 :param partition_dir: absolute path of partition to get hashes for
138 :param do_listdir: force existence check for all hashes in the partition
139 :param reclaim_age: age at which to remove tombstones
140
141 :returns: tuple of (number of suffix dirs hashed, dictionary of hashes)
142 """
143
144 def tpool_listdir(hashes, partition_dir):
145 return dict(((suff, hashes.get(suff, None))
146 for suff in os.listdir(partition_dir)
147 if len(suff) == 3 and isdir(join(partition_dir, suff))))
148 hashed = 0
149 hashes_file = join(partition_dir, HASH_FILE)
150 with lock_path(partition_dir):
151 modified = False
152 hashes = {}
153 try:
154 with open(hashes_file, 'rb') as fp:
155 hashes = pickle.load(fp)
156 except Exception:
157 do_listdir = True
158 if do_listdir:
159 hashes = tpool.execute(tpool_listdir, hashes, partition_dir)
160 modified = True
161 for suffix, hash_ in hashes.items():
162 if not hash_:
163 suffix_dir = join(partition_dir, suffix)
164 if os.path.exists(suffix_dir):
165 try:
166 hashes[suffix] = hash_suffix(suffix_dir, reclaim_age)
167 hashed += 1
168 except OSError:
169 logging.exception('Error hashing suffix')
170 hashes[suffix] = None
171 else:
172 del hashes[suffix]
173 modified = True
174 sleep()
175 if modified:
176 with open(hashes_file + '.tmp', 'wb') as fp:
177 pickle.dump(hashes, fp, PICKLE_PROTOCOL)
178 renamer(hashes_file + '.tmp', hashes_file)
179 return hashed, hashes
=== added file 'swift/obj/hashes.py.moved'
--- swift/obj/hashes.py.moved 1970-01-01 00:00:00 +0000
+++ swift/obj/hashes.py.moved 2010-10-19 18:43:49 +0000
@@ -0,0 +1,179 @@
1# Copyright (c) 2010 OpenStack, LLC.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12# implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16import cPickle as pickle
17import hashlib
18import os
19from os.path import isdir, join
20
21from eventlet import tpool, sleep
22
23from swift.common.utils import lock_path, renamer
24
25
26PICKLE_PROTOCOL = 2
27ONE_WEEK = 604800
28HASH_FILE = 'hashes.pkl'
29
30
31def hash_suffix(path, reclaim_age):
32 """
33 Performs reclamation and returns an md5 of all (remaining) files.
34
35 :param reclaim_age: age in seconds at which to remove tombstones
36 """
37 md5 = hashlib.md5()
38 for hsh in sorted(os.listdir(path)):
39 hsh_path = join(path, hsh)
40 files = os.listdir(hsh_path)
41 if len(files) == 1:
42 if files[0].endswith('.ts'):
43 # remove tombstones older than reclaim_age
44 ts = files[0].rsplit('.', 1)[0]
45 if (time.time() - float(ts)) > reclaim_age:
46 os.unlink(join(hsh_path, files[0]))
47 files.remove(files[0])
48 elif files:
49 files.sort(reverse=True)
50 meta = data = tomb = None
51 for filename in files:
52 if not meta and filename.endswith('.meta'):
53 meta = filename
54 if not data and filename.endswith('.data'):
55 data = filename
56 if not tomb and filename.endswith('.ts'):
57 tomb = filename
58 if (filename < tomb or # any file older than tomb
59 filename < data or # any file older than data
60 (filename.endswith('.meta') and
61 filename < meta)): # old meta
62 os.unlink(join(hsh_path, filename))
63 files.remove(filename)
64 if not files:
65 os.rmdir(hsh_path)
66 for filename in files:
67 md5.update(filename)
68 try:
69 os.rmdir(path)
70 except OSError:
71 pass
72 return md5.hexdigest()
73
74
75def recalculate_hashes(partition_dir, suffixes, reclaim_age=ONE_WEEK):
76 """
77 Recalculates hashes for the given suffixes in the partition and updates
78 them in the partition's hashes file.
79
80 :param partition_dir: directory of the partition in which to recalculate
81 :param suffixes: list of suffixes to recalculate
82 :param reclaim_age: age in seconds at which tombstones should be removed
83 """
84
85 def tpool_listdir(partition_dir):
86 return dict(((suff, None) for suff in os.listdir(partition_dir)
87 if len(suff) == 3 and isdir(join(partition_dir, suff))))
88 hashes_file = join(partition_dir, HASH_FILE)
89 with lock_path(partition_dir):
90 try:
91 with open(hashes_file, 'rb') as fp:
92 hashes = pickle.load(fp)
93 except Exception:
94 hashes = tpool.execute(tpool_listdir, partition_dir)
95 for suffix in suffixes:
96 suffix_dir = join(partition_dir, suffix)
97 if os.path.exists(suffix_dir):
98 hashes[suffix] = hash_suffix(suffix_dir, reclaim_age)
99 elif suffix in hashes:
100 del hashes[suffix]
101 with open(hashes_file + '.tmp', 'wb') as fp:
102 pickle.dump(hashes, fp, PICKLE_PROTOCOL)
103 renamer(hashes_file + '.tmp', hashes_file)
104
105
106def invalidate_hash(suffix_dir):
107 """
108 Invalidates the hash for a suffix_dir in the partition's hashes file.
109
110 :param suffix_dir: absolute path to suffix dir whose hash needs
111 invalidating
112 """
113
114 suffix = os.path.basename(suffix_dir)
115 partition_dir = os.path.dirname(suffix_dir)
116 hashes_file = join(partition_dir, HASH_FILE)
117 with lock_path(partition_dir):
118 try:
119 with open(hashes_file, 'rb') as fp:
120 hashes = pickle.load(fp)
121 if suffix in hashes and not hashes[suffix]:
122 return
123 except Exception:
124 return
125 hashes[suffix] = None
126 with open(hashes_file + '.tmp', 'wb') as fp:
127 pickle.dump(hashes, fp, PICKLE_PROTOCOL)
128 renamer(hashes_file + '.tmp', hashes_file)
129
130
131def get_hashes(partition_dir, do_listdir=True, reclaim_age=ONE_WEEK):
132 """
133 Get a list of hashes for the suffix dir. do_listdir causes it to mistrust
134 the hash cache for suffix existence at the (unexpectedly high) cost of a
135 listdir. reclaim_age is just passed on to hash_suffix.
136
137 :param partition_dir: absolute path of partition to get hashes for
138 :param do_listdir: force existence check for all hashes in the partition
139 :param reclaim_age: age at which to remove tombstones
140
141 :returns: tuple of (number of suffix dirs hashed, dictionary of hashes)
142 """
143
144 def tpool_listdir(hashes, partition_dir):
145 return dict(((suff, hashes.get(suff, None))
146 for suff in os.listdir(partition_dir)
147 if len(suff) == 3 and isdir(join(partition_dir, suff))))
148 hashed = 0
149 hashes_file = join(partition_dir, HASH_FILE)
150 with lock_path(partition_dir):
151 modified = False
152 hashes = {}
153 try:
154 with open(hashes_file, 'rb') as fp:
155 hashes = pickle.load(fp)
156 except Exception:
157 do_listdir = True
158 if do_listdir:
159 hashes = tpool.execute(tpool_listdir, hashes, partition_dir)
160 modified = True
161 for suffix, hash_ in hashes.items():
162 if not hash_:
163 suffix_dir = join(partition_dir, suffix)
164 if os.path.exists(suffix_dir):
165 try:
166 hashes[suffix] = hash_suffix(suffix_dir, reclaim_age)
167 hashed += 1
168 except OSError:
169 logging.exception('Error hashing suffix')
170 hashes[suffix] = None
171 else:
172 del hashes[suffix]
173 modified = True
174 sleep()
175 if modified:
176 with open(hashes_file + '.tmp', 'wb') as fp:
177 pickle.dump(hashes, fp, PICKLE_PROTOCOL)
178 renamer(hashes_file + '.tmp', hashes_file)
179 return hashed, hashes
=== modified file 'swift/obj/replicator.py'
--- swift/obj/replicator.py 2010-10-19 01:05:54 +0000
+++ swift/obj/replicator.py 2010-10-19 18:43:49 +0000
@@ -19,7 +19,6 @@
 import shutil
 import time
 import logging
-import hashlib
 import itertools
 import cPickle as pickle
 
@@ -29,168 +28,16 @@
29from eventlet.support.greenlets import GreenletExit28from eventlet.support.greenlets import GreenletExit
3029
31from swift.common.ring import Ring30from swift.common.ring import Ring
32from swift.common.utils import whataremyips, unlink_older_than, lock_path, \31from swift.common.utils import compute_eta, get_logger, unlink_older_than, \
33 renamer, compute_eta, get_logger32 whataremyips
34from swift.common.bufferedhttp import http_connect33from swift.common.bufferedhttp import http_connect
35from swift.common.daemon import Daemon34from swift.common.daemon import Daemon
35from swift.obj.hashes import get_hashes, recalculate_hashes
36from swift.obj.server import DATADIR
37
3638
37hubs.use_hub('poll')39hubs.use_hub('poll')
3840
39PICKLE_PROTOCOL = 2
40ONE_WEEK = 604800
41HASH_FILE = 'hashes.pkl'
42
43
44def hash_suffix(path, reclaim_age):
45 """
46 Performs reclamation and returns an md5 of all (remaining) files.
47
48 :param reclaim_age: age in seconds at which to remove tombstones
49 """
50 md5 = hashlib.md5()
51 for hsh in sorted(os.listdir(path)):
52 hsh_path = join(path, hsh)
53 files = os.listdir(hsh_path)
54 if len(files) == 1:
55 if files[0].endswith('.ts'):
56 # remove tombstones older than reclaim_age
57 ts = files[0].rsplit('.', 1)[0]
58 if (time.time() - float(ts)) > reclaim_age:
59 os.unlink(join(hsh_path, files[0]))
60 files.remove(files[0])
61 elif files:
62 files.sort(reverse=True)
63 meta = data = tomb = None
64 for filename in files:
65 if not meta and filename.endswith('.meta'):
66 meta = filename
67 if not data and filename.endswith('.data'):
68 data = filename
69 if not tomb and filename.endswith('.ts'):
70 tomb = filename
71 if (filename < tomb or # any file older than tomb
72 filename < data or # any file older than data
73 (filename.endswith('.meta') and
74 filename < meta)): # old meta
75 os.unlink(join(hsh_path, filename))
76 files.remove(filename)
77 if not files:
78 os.rmdir(hsh_path)
79 for filename in files:
80 md5.update(filename)
81 try:
82 os.rmdir(path)
83 except OSError:
84 pass
85 return md5.hexdigest()
86
87
88def recalculate_hashes(partition_dir, suffixes, reclaim_age=ONE_WEEK):
89 """
90 Recalculates hashes for the given suffixes in the partition and updates
91 them in the partition's hashes file.
92
93 :param partition_dir: directory of the partition in which to recalculate
94 :param suffixes: list of suffixes to recalculate
95 :param reclaim_age: age in seconds at which tombstones should be removed
96 """
97
98 def tpool_listdir(partition_dir):
99 return dict(((suff, None) for suff in os.listdir(partition_dir)
100 if len(suff) == 3 and isdir(join(partition_dir, suff))))
101 hashes_file = join(partition_dir, HASH_FILE)
102 with lock_path(partition_dir):
103 try:
104 with open(hashes_file, 'rb') as fp:
105 hashes = pickle.load(fp)
106 except Exception:
107 hashes = tpool.execute(tpool_listdir, partition_dir)
108 for suffix in suffixes:
109 suffix_dir = join(partition_dir, suffix)
110 if os.path.exists(suffix_dir):
111 hashes[suffix] = hash_suffix(suffix_dir, reclaim_age)
112 elif suffix in hashes:
113 del hashes[suffix]
114 with open(hashes_file + '.tmp', 'wb') as fp:
115 pickle.dump(hashes, fp, PICKLE_PROTOCOL)
116 renamer(hashes_file + '.tmp', hashes_file)
117
118
119def invalidate_hash(suffix_dir):
120 """
121 Invalidates the hash for a suffix_dir in the partition's hashes file.
122
123 :param suffix_dir: absolute path to suffix dir whose hash needs
124 invalidating
125 """
126
127 suffix = os.path.basename(suffix_dir)
128 partition_dir = os.path.dirname(suffix_dir)
129 hashes_file = join(partition_dir, HASH_FILE)
130 with lock_path(partition_dir):
131 try:
132 with open(hashes_file, 'rb') as fp:
133 hashes = pickle.load(fp)
134 if suffix in hashes and not hashes[suffix]:
135 return
136 except Exception:
137 return
138 hashes[suffix] = None
139 with open(hashes_file + '.tmp', 'wb') as fp:
140 pickle.dump(hashes, fp, PICKLE_PROTOCOL)
141 renamer(hashes_file + '.tmp', hashes_file)
142
143
144def get_hashes(partition_dir, do_listdir=True, reclaim_age=ONE_WEEK):
145 """
146 Get a list of hashes for the suffix dir. do_listdir causes it to mistrust
147 the hash cache for suffix existence at the (unexpectedly high) cost of a
148 listdir. reclaim_age is just passed on to hash_suffix.
149
150 :param partition_dir: absolute path of partition to get hashes for
151 :param do_listdir: force existence check for all hashes in the partition
152 :param reclaim_age: age at which to remove tombstones
153
154 :returns: tuple of (number of suffix dirs hashed, dictionary of hashes)
155 """
156
157 def tpool_listdir(hashes, partition_dir):
158 return dict(((suff, hashes.get(suff, None))
159 for suff in os.listdir(partition_dir)
160 if len(suff) == 3 and isdir(join(partition_dir, suff))))
161 hashed = 0
162 hashes_file = join(partition_dir, HASH_FILE)
163 with lock_path(partition_dir):
164 modified = False
165 hashes = {}
166 try:
167 with open(hashes_file, 'rb') as fp:
168 hashes = pickle.load(fp)
169 except Exception:
170 do_listdir = True
171 if do_listdir:
172 hashes = tpool.execute(tpool_listdir, hashes, partition_dir)
173 modified = True
174 for suffix, hash_ in hashes.items():
175 if not hash_:
176 suffix_dir = join(partition_dir, suffix)
177 if os.path.exists(suffix_dir):
178 try:
179 hashes[suffix] = hash_suffix(suffix_dir, reclaim_age)
180 hashed += 1
181 except OSError:
182 logging.exception('Error hashing suffix')
183 hashes[suffix] = None
184 else:
185 del hashes[suffix]
186 modified = True
187 sleep()
188 if modified:
189 with open(hashes_file + '.tmp', 'wb') as fp:
190 pickle.dump(hashes, fp, PICKLE_PROTOCOL)
191 renamer(hashes_file + '.tmp', hashes_file)
192 return hashed, hashes
193
19441
195class ObjectReplicator(Daemon):42class ObjectReplicator(Daemon):
196 """43 """
@@ -301,8 +148,9 @@
301 had_any = True148 had_any = True
302 if not had_any:149 if not had_any:
303 return False150 return False
304 args.append(join(rsync_module, node['device'],151 objdir = job.get('segments') and SEGMENTSDIR or DATADIR
305 'objects', job['partition']))152 args.append(join(rsync_module, node['device'], objdir,
153 job['partition']))
306 return self._rsync(args) == 0154 return self._rsync(args) == 0
307155
308 def check_ring(self):156 def check_ring(self):
@@ -337,12 +185,15 @@
337 for node in job['nodes']:185 for node in job['nodes']:
338 success = self.rsync(node, job, suffixes)186 success = self.rsync(node, job, suffixes)
339 if success:187 if success:
188 headers = {'Content-Length': '0'}
189 if job.get('segments'):
190 headers['X-Object-Type'] = 'segment'
340 with Timeout(self.http_timeout):191 with Timeout(self.http_timeout):
341 http_connect(node['ip'],192 http_connect(node['ip'],
342 node['port'],193 node['port'],
343 node['device'], job['partition'], 'REPLICATE',194 node['device'], job['partition'], 'REPLICATE',
344 '/' + '-'.join(suffixes),195 '/' + '-'.join(suffixes),
345 headers={'Content-Length': '0'}).getresponse().read()196 headers=headers).getresponse().read()
346 responses.append(success)197 responses.append(success)
347 if not suffixes or (len(responses) == \198 if not suffixes or (len(responses) == \
348 self.object_ring.replica_count and all(responses)):199 self.object_ring.replica_count and all(responses)):
@@ -374,10 +225,13 @@
374 node = next(nodes)225 node = next(nodes)
375 attempts_left -= 1226 attempts_left -= 1
376 try:227 try:
228 headers = {'Content-Length': '0'}
229 if job.get('segments'):
230 headers['X-Object-Type'] = 'segment'
377 with Timeout(self.http_timeout):231 with Timeout(self.http_timeout):
378 resp = http_connect(node['ip'], node['port'],232 resp = http_connect(node['ip'], node['port'],
379 node['device'], job['partition'], 'REPLICATE',233 node['device'], job['partition'], 'REPLICATE',
380 '', headers={'Content-Length': '0'}).getresponse()234 '', headers=headers).getresponse()
381 if resp.status == 507:235 if resp.status == 507:
382 self.logger.error('%s/%s responded as unmounted' %236 self.logger.error('%s/%s responded as unmounted' %
383 (node['ip'], node['device']))237 (node['ip'], node['device']))
@@ -397,11 +251,14 @@
397 self.rsync(node, job, suffixes)251 self.rsync(node, job, suffixes)
398 recalculate_hashes(job['path'], suffixes,252 recalculate_hashes(job['path'], suffixes,
399 reclaim_age=self.reclaim_age)253 reclaim_age=self.reclaim_age)
254 headers = {'Content-Length': '0'}
255 if job.get('segments'):
256 headers['X-Object-Type'] = 'segment'
400 with Timeout(self.http_timeout):257 with Timeout(self.http_timeout):
401 conn = http_connect(node['ip'], node['port'],258 conn = http_connect(node['ip'], node['port'],
402 node['device'], job['partition'], 'REPLICATE',259 node['device'], job['partition'], 'REPLICATE',
403 '/' + '-'.join(suffixes),260 '/' + '-'.join(suffixes),
404 headers={'Content-Length': '0'})261 headers=headers)
405 conn.getresponse().read()262 conn.getresponse().read()
406 self.suffix_sync += len(suffixes)263 self.suffix_sync += len(suffixes)
407 except (Exception, Timeout):264 except (Exception, Timeout):
@@ -489,24 +346,27 @@
489 dev for dev in self.object_ring.devs346 dev for dev in self.object_ring.devs
490 if dev and dev['ip'] in ips and dev['port'] == self.port]:347 if dev and dev['ip'] in ips and dev['port'] == self.port]:
491 dev_path = join(self.devices_dir, local_dev['device'])348 dev_path = join(self.devices_dir, local_dev['device'])
492 obj_path = join(dev_path, 'objects')
493 tmp_path = join(dev_path, 'tmp')
494 if self.mount_check and not os.path.ismount(dev_path):349 if self.mount_check and not os.path.ismount(dev_path):
495 self.logger.warn('%s is not mounted' % local_dev['device'])350 self.logger.warn('%s is not mounted' % local_dev['device'])
496 continue351 continue
352 tmp_path = join(dev_path, 'tmp')
497 unlink_older_than(tmp_path, time.time() - self.reclaim_age)353 unlink_older_than(tmp_path, time.time() - self.reclaim_age)
498 if not os.path.exists(obj_path):354 for objdir in (DATADIR, SEGMENTSDIR):
499 continue355 obj_path = join(dev_path, objdir)
500 for partition in os.listdir(obj_path):356 if os.path.exists(obj_path):
501 try:357 for partition in os.listdir(obj_path):
502 nodes = [node for node in358 try:
503 self.object_ring.get_part_nodes(int(partition))359 nodes = [node for node in
504 if node['id'] != local_dev['id']]360 self.object_ring.get_part_nodes(
505 jobs.append(dict(path=join(obj_path, partition),361 int(partition))
506 nodes=nodes, delete=len(nodes) > 2,362 if node['id'] != local_dev['id']]
507 partition=partition))363 jobs.append(dict(
508 except ValueError:364 path=join(obj_path, partition),
509 continue365 nodes=nodes, delete=len(nodes) > 2,
366 partition=partition,
367 segments=(objdir == SEGMENTSDIR)))
368 except ValueError:
369 continue
510 random.shuffle(jobs)370 random.shuffle(jobs)
511 # Partititons that need to be deleted take priority371 # Partititons that need to be deleted take priority
512 jobs.sort(key=lambda job: not job['delete'])372 jobs.sort(key=lambda job: not job['delete'])
513373
=== modified file 'swift/obj/server.py'
--- swift/obj/server.py 2010-10-19 01:05:54 +0000
+++ swift/obj/server.py 2010-10-19 18:43:49 +0000
@@ -42,12 +42,12 @@
42from swift.common.constraints import check_object_creation, check_mount, \42from swift.common.constraints import check_object_creation, check_mount, \
43 check_float, check_utf843 check_float, check_utf8
44from swift.common.exceptions import ConnectionTimeout44from swift.common.exceptions import ConnectionTimeout
45from swift.obj.replicator import get_hashes, invalidate_hash, \45from swift.obj.hashes import get_hashes, invalidate_hash, recalculate_hashes
46 recalculate_hashes
4746
4847
49DATADIR = 'objects'48DATADIR = 'objects'
50ASYNCDIR = 'async_pending'49SEGMENTSDIR = 'object_segments'
50ASYNCDIR = 'object_async'
51PICKLE_PROTOCOL = 251PICKLE_PROTOCOL = 2
52METADATA_KEY = 'user.swift.metadata'52METADATA_KEY = 'user.swift.metadata'
53MAX_OBJECT_NAME_LENGTH = 102453MAX_OBJECT_NAME_LENGTH = 1024
@@ -84,15 +84,31 @@
84 :param obj: object name for the object84 :param obj: object name for the object
85 :param keep_data_fp: if True, don't close the fp, otherwise close it85 :param keep_data_fp: if True, don't close the fp, otherwise close it
86 :param disk_chunk_Size: size of chunks on file reads86 :param disk_chunk_Size: size of chunks on file reads
87 :param segment: If set to not None, indicates which segment of an object
88 this file represents
89 :param segment_timestamp: X-Timestamp of the object's segments (set on the
90 PUT, not changed on POSTs), required if segment
91 is set to not None
87 """92 """
8893
89 def __init__(self, path, device, partition, account, container, obj,94 def __init__(self, path, device, partition, account, container, obj,
90 keep_data_fp=False, disk_chunk_size=65536):95 keep_data_fp=False, disk_chunk_size=65536, segment=None,
96 segment_timestamp=None):
91 self.disk_chunk_size = disk_chunk_size97 self.disk_chunk_size = disk_chunk_size
92 self.name = '/' + '/'.join((account, container, obj))98 self.name = '/' + '/'.join((account, container, obj))
93 name_hash = hash_path(account, container, obj)99 if segment and int(segment):
94 self.datadir = os.path.join(path, device,100 ring_obj = '%s/%s/%s' % (obj, segment_timestamp, segment)
95 storage_directory(DATADIR, partition, name_hash))101 else:
102 ring_obj = obj
103 name_hash = hash_path(account, container, ring_obj)
104 if segment is not None:
105 self.datadir = os.path.join(path, device,
106 storage_directory(SEGMENTSDIR, partition, name_hash))
107 self.no_longer_segment_datadir = os.path.join(path, device,
108 storage_directory(DATADIR, partition, name_hash))
109 else:
110 self.datadir = os.path.join(path, device,
111 storage_directory(DATADIR, partition, name_hash))
96 self.tmpdir = os.path.join(path, device, 'tmp')112 self.tmpdir = os.path.join(path, device, 'tmp')
97 self.metadata = {}113 self.metadata = {}
98 self.meta_file = None114 self.meta_file = None
@@ -195,7 +211,8 @@
195 except OSError:211 except OSError:
196 pass212 pass
197213
198 def put(self, fd, tmppath, metadata, extension='.data'):214 def put(self, fd, tmppath, metadata, extension='.data',
215 no_longer_segment=False):
199 """216 """
200 Finalize writing the file on disk, and renames it from the temp file to217 Finalize writing the file on disk, and renames it from the temp file to
201 the real location. This should be called after the data has been218 the real location. This should be called after the data has been
@@ -204,7 +221,10 @@
204 :params fd: file descriptor of the temp file221 :params fd: file descriptor of the temp file
205 :param tmppath: path to the temporary file being used222 :param tmppath: path to the temporary file being used
206 :param metadata: dictionary of metada to be written223 :param metadata: dictionary of metada to be written
207 :param extention: extension to be used when making the file224 :param extension: extension to be used when making the file
225 :param no_longer_segment: Set to True if this was originally an object
226 segment but no longer is (case with chunked transfer encoding when
227 the object ends up less than the segment size)
208 """228 """
209 metadata['name'] = self.name229 metadata['name'] = self.name
210 timestamp = normalize_timestamp(metadata['X-Timestamp'])230 timestamp = normalize_timestamp(metadata['X-Timestamp'])
@@ -217,6 +237,8 @@
217 if 'Content-Length' in metadata:237 if 'Content-Length' in metadata:
218 drop_buffer_cache(fd, 0, int(metadata['Content-Length']))238 drop_buffer_cache(fd, 0, int(metadata['Content-Length']))
219 os.fsync(fd)239 os.fsync(fd)
240 if no_longer_segment:
241 self.datadir = self.no_longer_segment_datadir
220 invalidate_hash(os.path.dirname(self.datadir))242 invalidate_hash(os.path.dirname(self.datadir))
221 renamer(tmppath, os.path.join(self.datadir, timestamp + extension))243 renamer(tmppath, os.path.join(self.datadir, timestamp + extension))
222 self.metadata = metadata244 self.metadata = metadata
@@ -355,7 +377,9 @@
355 if error_response:377 if error_response:
356 return error_response378 return error_response
357 file = DiskFile(self.devices, device, partition, account, container,379 file = DiskFile(self.devices, device, partition, account, container,
358 obj, disk_chunk_size=self.disk_chunk_size)380 obj, disk_chunk_size=self.disk_chunk_size,
381 segment=request.headers.get('x-object-segment'),
382 segment_timestamp=request.headers['x-timestamp'])
359 upload_expiration = time.time() + self.max_upload_time383 upload_expiration = time.time() + self.max_upload_time
360 etag = md5()384 etag = md5()
361 upload_size = 0385 upload_size = 0
@@ -397,17 +421,32 @@
397 if 'content-encoding' in request.headers:421 if 'content-encoding' in request.headers:
398 metadata['Content-Encoding'] = \422 metadata['Content-Encoding'] = \
399 request.headers['Content-Encoding']423 request.headers['Content-Encoding']
400 file.put(fd, tmppath, metadata)424 if 'x-object-type' in request.headers:
425 metadata['X-Object-Type'] = request.headers['x-object-type']
426 if 'x-object-segment' in request.headers:
427 metadata['X-Object-Segment'] = \
428 request.headers['x-object-segment']
429 no_longer_segment = False
430 if 'x-object-segment-if-length' in request.headers and \
431 int(request.headers['x-object-segment-if-length']) != \
432 os.fstat(fd).st_size:
433 del metadata['X-Object-Type']
434 del metadata['X-Object-Segment']
435 no_longer_segment = True
436 file.put(fd, tmppath, metadata,
437 no_longer_segment=no_longer_segment)
401 file.unlinkold(metadata['X-Timestamp'])438 file.unlinkold(metadata['X-Timestamp'])
402 self.container_update('PUT', account, container, obj, request.headers,439 if 'X-Object-Segment' not in file.metadata:
403 {'x-size': file.metadata['Content-Length'],440 self.container_update('PUT', account, container, obj,
404 'x-content-type': file.metadata['Content-Type'],441 request.headers,
405 'x-timestamp': file.metadata['X-Timestamp'],442 {'x-size': request.headers.get('x-object-length',
406 'x-etag': file.metadata['ETag'],443 file.metadata['Content-Length']),
407 'x-cf-trans-id': request.headers.get('x-cf-trans-id', '-')},444 'x-content-type': file.metadata['Content-Type'],
408 device)445 'x-timestamp': file.metadata['X-Timestamp'],
409 resp = HTTPCreated(request=request, etag=etag)446 'x-etag': file.metadata['ETag'],
410 return resp447 'x-cf-trans-id': request.headers.get('x-cf-trans-id', '-')},
448 device)
449 return HTTPCreated(request=request, etag=etag)
411450
412 def GET(self, request):451 def GET(self, request):
413 """Handle HTTP GET requests for the Swift Object Server."""452 """Handle HTTP GET requests for the Swift Object Server."""
@@ -420,7 +459,9 @@
420 if self.mount_check and not check_mount(self.devices, device):459 if self.mount_check and not check_mount(self.devices, device):
421 return Response(status='507 %s is not mounted' % device)460 return Response(status='507 %s is not mounted' % device)
422 file = DiskFile(self.devices, device, partition, account, container,461 file = DiskFile(self.devices, device, partition, account, container,
423 obj, keep_data_fp=True, disk_chunk_size=self.disk_chunk_size)462 obj, keep_data_fp=True, disk_chunk_size=self.disk_chunk_size,
463 segment=request.headers.get('x-object-segment'),
464 segment_timestamp=request.headers.get('x-object-segment-timestamp'))
424 if file.is_deleted():465 if file.is_deleted():
425 if request.headers.get('if-match') == '*':466 if request.headers.get('if-match') == '*':
426 return HTTPPreconditionFailed(request=request)467 return HTTPPreconditionFailed(request=request)
@@ -460,7 +501,8 @@
460 'application/octet-stream'), app_iter=file,501 'application/octet-stream'), app_iter=file,
461 request=request, conditional_response=True)502 request=request, conditional_response=True)
462 for key, value in file.metadata.iteritems():503 for key, value in file.metadata.iteritems():
463 if key.lower().startswith('x-object-meta-'):504 if key.lower().startswith('x-object-meta-') or \
505 key.lower() in ('x-object-type', 'x-object-segment'):
464 response.headers[key] = value506 response.headers[key] = value
465 response.etag = file.metadata['ETag']507 response.etag = file.metadata['ETag']
466 response.last_modified = float(file.metadata['X-Timestamp'])508 response.last_modified = float(file.metadata['X-Timestamp'])
@@ -482,13 +524,16 @@
482 if self.mount_check and not check_mount(self.devices, device):524 if self.mount_check and not check_mount(self.devices, device):
483 return Response(status='507 %s is not mounted' % device)525 return Response(status='507 %s is not mounted' % device)
484 file = DiskFile(self.devices, device, partition, account, container,526 file = DiskFile(self.devices, device, partition, account, container,
485 obj, disk_chunk_size=self.disk_chunk_size)527 obj, disk_chunk_size=self.disk_chunk_size,
528 segment=request.headers.get('x-object-segment'),
529 segment_timestamp=request.headers.get('x-object-segment-timestamp'))
486 if file.is_deleted():530 if file.is_deleted():
487 return HTTPNotFound(request=request)531 return HTTPNotFound(request=request)
488 response = Response(content_type=file.metadata['Content-Type'],532 response = Response(content_type=file.metadata['Content-Type'],
489 request=request, conditional_response=True)533 request=request, conditional_response=True)
490 for key, value in file.metadata.iteritems():534 for key, value in file.metadata.iteritems():
491 if key.lower().startswith('x-object-meta-'):535 if key.lower().startswith('x-object-meta-') or \
536 key.lower() in ('x-object-type', 'x-object-segment'):
492 response.headers[key] = value537 response.headers[key] = value
493 response.etag = file.metadata['ETag']538 response.etag = file.metadata['ETag']
494 response.last_modified = float(file.metadata['X-Timestamp'])539 response.last_modified = float(file.metadata['X-Timestamp'])
@@ -513,7 +558,9 @@
513 return Response(status='507 %s is not mounted' % device)558 return Response(status='507 %s is not mounted' % device)
514 response_class = HTTPNoContent559 response_class = HTTPNoContent
515 file = DiskFile(self.devices, device, partition, account, container,560 file = DiskFile(self.devices, device, partition, account, container,
516 obj, disk_chunk_size=self.disk_chunk_size)561 obj, disk_chunk_size=self.disk_chunk_size,
562 segment=request.headers.get('x-object-segment'),
563 segment_timestamp=request.headers.get('x-object-segment-timestamp'))
517 if file.is_deleted():564 if file.is_deleted():
518 response_class = HTTPNotFound565 response_class = HTTPNotFound
519 metadata = {566 metadata = {
@@ -522,10 +569,11 @@
522 with file.mkstemp() as (fd, tmppath):569 with file.mkstemp() as (fd, tmppath):
523 file.put(fd, tmppath, metadata, extension='.ts')570 file.put(fd, tmppath, metadata, extension='.ts')
524 file.unlinkold(metadata['X-Timestamp'])571 file.unlinkold(metadata['X-Timestamp'])
525 self.container_update('DELETE', account, container, obj,572 if 'x-object-segment' not in request.headers:
526 request.headers, {'x-timestamp': metadata['X-Timestamp'],573 self.container_update('DELETE', account, container, obj,
527 'x-cf-trans-id': request.headers.get('x-cf-trans-id', '-')},574 request.headers, {'x-timestamp': metadata['X-Timestamp'],
528 device)575 'x-cf-trans-id': request.headers.get('x-cf-trans-id', '-')},
576 device)
529 resp = response_class(request=request)577 resp = response_class(request=request)
530 return resp578 return resp
531579
@@ -538,7 +586,10 @@
538 unquote(request.path), 2, 3, True)586 unquote(request.path), 2, 3, True)
539 if self.mount_check and not check_mount(self.devices, device):587 if self.mount_check and not check_mount(self.devices, device):
540 return Response(status='507 %s is not mounted' % device)588 return Response(status='507 %s is not mounted' % device)
541 path = os.path.join(self.devices, device, DATADIR, partition)589 if request.headers.get('x-object-type') == 'segment':
590 path = os.path.join(self.devices, device, SEGMENTSDIR, partition)
591 else:
592 path = os.path.join(self.devices, device, DATADIR, partition)
542 if not os.path.exists(path):593 if not os.path.exists(path):
543 mkdirs(path)594 mkdirs(path)
544 if suffix:595 if suffix:
545596
=== modified file 'swift/proxy/server.py'
--- swift/proxy/server.py 2010-10-15 15:07:19 +0000
+++ swift/proxy/server.py 2010-10-19 18:43:49 +0000
@@ -14,21 +14,25 @@
14# limitations under the License.14# limitations under the License.
1515
16from __future__ import with_statement16from __future__ import with_statement
17try:
18 import simplejson as json
19except ImportError:
20 import json
17import mimetypes21import mimetypes
18import os22import os
19import time23import time
20import traceback24import traceback
21from ConfigParser import ConfigParser25from ConfigParser import ConfigParser
26from hashlib import md5
22from urllib import unquote, quote27from urllib import unquote, quote
23import uuid28import uuid
24import functools29import functools
2530
26from eventlet.timeout import Timeout31from eventlet.timeout import Timeout
27from webob.exc import HTTPBadRequest, HTTPMethodNotAllowed, \32from webob.exc import HTTPBadRequest, HTTPCreated, HTTPInternalServerError, \
28 HTTPNotFound, HTTPPreconditionFailed, \33 HTTPMethodNotAllowed, HTTPNotFound, HTTPPreconditionFailed, \
29 HTTPRequestTimeout, HTTPServiceUnavailable, \34 HTTPRequestEntityTooLarge, HTTPRequestTimeout, HTTPServerError, \
30 HTTPUnprocessableEntity, HTTPRequestEntityTooLarge, HTTPServerError, \35 HTTPServiceUnavailable, HTTPUnprocessableEntity, status_map
31 status_map
32from webob import Request, Response36from webob import Request, Response
3337
34from swift.common.ring import Ring38from swift.common.ring import Ring
@@ -37,7 +41,7 @@
37from swift.common.bufferedhttp import http_connect41from swift.common.bufferedhttp import http_connect
38from swift.common.constraints import check_metadata, check_object_creation, \42from swift.common.constraints import check_metadata, check_object_creation, \
39 check_utf8, MAX_ACCOUNT_NAME_LENGTH, MAX_CONTAINER_NAME_LENGTH, \43 check_utf8, MAX_ACCOUNT_NAME_LENGTH, MAX_CONTAINER_NAME_LENGTH, \
40 MAX_FILE_SIZE44 MAX_FILE_SIZE, SEGMENT_SIZE
41from swift.common.exceptions import ChunkReadTimeout, \45from swift.common.exceptions import ChunkReadTimeout, \
42 ChunkWriteTimeout, ConnectionTimeout46 ChunkWriteTimeout, ConnectionTimeout
4347
@@ -89,6 +93,135 @@
89 return wrapped93 return wrapped
9094
9195
96class SegmentedIterable(object):
97 """
98 Iterable that returns the object contents for a segmented object in Swift.
99
100 In addition to these params, you can also set the `response` attr just
101 after creating the SegmentedIterable and it will update the response's
102 `bytes_transferred` value. Be sure to set the `bytes_transferred` value to
103 0 beforehand.
104
105 :param controller: The ObjectController instance to work with.
106 :param content_length: The total length of the object.
107 :param segment_size: The length of each segment (except perhaps the last)
108 of the object.
109 :param timestamp: The X-Timestamp of the object's segments (set on the PUT,
110 not changed on the POSTs).
111 """
112
113 def __init__(self, controller, content_length, segment_size, timestamp):
114 self.controller = controller
115 self.content_length = content_length
116 self.segment_size = segment_size
117 self.timestamp = timestamp
118 self.position = 0
119 self.segment = -1
120 self.segment_iter = None
121 self.response = None
122
123 def load_next_segment(self):
124 """ Loads the self.segment_iter with the next segment's contents. """
125 self.segment += 1
126 if self.segment:
127 ring_object_name = '%s/%s/%s' % (self.controller.object_name,
128 self.timestamp, self.segment)
129 else:
130 ring_object_name = self.controller.object_name
131 partition, nodes = self.controller.app.object_ring.get_nodes(
132 self.controller.account_name, self.controller.container_name,
133 ring_object_name)
134 path = '/%s/%s/%s' % (self.controller.account_name,
135 self.controller.container_name, self.controller.object_name)
136 req = Request.blank(path, headers={'X-Object-Segment': self.segment,
137 'X-Object-Segment-Timestamp': self.timestamp})
138 resp = self.controller.GETorHEAD_base(req, 'Object',
139 partition, self.controller.iter_nodes(partition, nodes,
140 self.controller.app.object_ring), path,
141 self.controller.app.object_ring.replica_count)
142 if resp.status_int // 100 != 2:
143 raise Exception(
144 'Could not load segment %s of %s' % (self.segment, path))
145 self.segment_iter = resp.app_iter
146
147 def __iter__(self):
148 """ Standard iterator function that returns the object's contents. """
149 while self.position < self.content_length:
150 if not self.segment_iter:
151 self.load_next_segment()
152 while True:
153 with ChunkReadTimeout(self.controller.app.node_timeout):
154 try:
155 chunk = self.segment_iter.next()
156 break
157 except StopIteration:
158 self.load_next_segment()
159 if self.position + len(chunk) > self.content_length:
160 chunk = chunk[:self.content_length - self.position]
161 self.position += len(chunk)
162 if self.response:
163 self.response.bytes_transferred += len(chunk)
164 yield chunk
165
166 def app_iter_range(self, start, stop):
167 """
168 Non-standard iterator function for use with Webob in serving Range
169 requests more quickly.
170
171 TODO:
172
173 This currently helps on speed by jumping to the proper segment to start
174 with (and ending without reading the trailing segments, but that
175 already happened technically with __iter__).
176
177 But, what it does not do yet is issue a Range request with the first
178 segment to allow the object server to seek to the segment start point.
179
180 Instead, it just reads and throws away all leading segment data. Since
181 segments are 5G by default, it'll have to transfer the whole 5G from
182 the object server to the proxy server even if it only needs the last
183 byte. In practice, this should happen fairly quickly relative to how
184 long requests take for these very large files; but it's still wasteful.
185
186 Anyway, it shouldn't be too hard to implement, I just have other things
187 to work out first.
188
189 :param start: The first byte (zero-based) to return.
190 :param stop: The last byte (zero-based) to return.
191 """
192 if start:
193 self.segment = (start / self.segment_size) - 1
194 self.load_next_segment()
195 self.position = self.segment * self.segment_size
196 segment_start = start - (self.segment * self.segment_size)
197 while segment_start:
198 with ChunkReadTimeout(self.controller.app.node_timeout):
199 chunk = self.segment_iter.next()
200 self.position += len(chunk)
201 if len(chunk) > segment_start:
202 chunk = chunk[segment_start:]
203 if self.response:
204 self.response.bytes_transferred += len(chunk)
205 yield chunk
206 segment_start = 0
207 else:
208 segment_start -= len(chunk)
209 if stop is not None:
210 length = stop - start
211 else:
212 length = None
213 for chunk in self:
214 if length is not None:
215 length -= len(chunk)
216 if length < 0:
217 # Chop off the extra:
218 if self.response:
219 self.response.bytes_transferred -= length
220 yield chunk[:length]
221 break
222 yield chunk
223
224
92def get_container_memcache_key(account, container):225def get_container_memcache_key(account, container):
93 path = '/%s/%s' % (account, container)226 path = '/%s/%s' % (account, container)
94 return 'container%s' % path227 return 'container%s' % path
@@ -518,11 +651,56 @@
518 aresp = req.environ['swift.authorize'](req)651 aresp = req.environ['swift.authorize'](req)
519 if aresp:652 if aresp:
520 return aresp653 return aresp
654 # This is bit confusing, so an explanation:
655 # * First we attempt the GET/HEAD normally, as this is the usual case.
656 # * If the request was a Range request and gave us a 416 Unsatisfiable
657 # response, we might be trying to do an invalid Range on a manifest
658 # object, so we try again with no Range.
659 # * If it turns out we have a manifest object, and we had a Range
660 # request originally that actually succeeded or we had a HEAD
661 # request, we have to do the request again as a full GET because
662 # we'll need the whole manifest.
663 # * Finally, if we had a manifest object, we pass it and the request
664 # off to GETorHEAD_segmented; otherwise we just return the response.
521 partition, nodes = self.app.object_ring.get_nodes(665 partition, nodes = self.app.object_ring.get_nodes(
522 self.account_name, self.container_name, self.object_name)666 self.account_name, self.container_name, self.object_name)
523 return self.GETorHEAD_base(req, 'Object', partition,667 resp = mresp = self.GETorHEAD_base(req, 'Object', partition,
668 self.iter_nodes(partition, nodes, self.app.object_ring),
669 req.path_info, self.app.object_ring.replica_count)
670 range_value = None
671 if mresp.status_int == 416:
672 range_value = req.range
673 req.range = None
674 mresp = self.GETorHEAD_base(req, 'Object', partition,
524 self.iter_nodes(partition, nodes, self.app.object_ring),675 self.iter_nodes(partition, nodes, self.app.object_ring),
525 req.path_info, self.app.object_ring.replica_count)676 req.path_info, self.app.object_ring.replica_count)
677 if mresp.status_int // 100 != 2:
678 return resp
679 if 'x-object-type' in mresp.headers:
680 if mresp.headers['x-object-type'] == 'manifest':
681 if req.method == 'HEAD':
682 req.method = 'GET'
683 mresp = self.GETorHEAD_base(req, 'Object', partition,
684 self.iter_nodes(partition, nodes,
685 self.app.object_ring), req.path_info,
686 self.app.object_ring.replica_count)
687 if mresp.status_int // 100 != 2:
688 return mresp
689 req.method = 'HEAD'
690 elif req.range:
691 range_value = req.range
692 req.range = None
693 mresp = self.GETorHEAD_base(req, 'Object', partition,
694 self.iter_nodes(partition, nodes,
695 self.app.object_ring), req.path_info,
696 self.app.object_ring.replica_count)
697 if mresp.status_int // 100 != 2:
698 return mresp
699 if range_value:
700 req.range = range_value
701 return self.GETorHEAD_segmented(req, mresp)
702 return HTTPNotFound(request=req)
703 return resp
526704
527 @public705 @public
528 @delay_denial706 @delay_denial
@@ -536,6 +714,48 @@
536 """Handler for HTTP HEAD requests."""714 """Handler for HTTP HEAD requests."""
537 return self.GETorHEAD(req)715 return self.GETorHEAD(req)
538716
717 def GETorHEAD_segmented(self, req, mresp):
718 """
719 Performs a GET for a segmented object.
720
721 :param req: The webob.Request to process.
722 :param mresp: The webob.Response for the original manifest request.
723 :returns: webob.Response object.
724 """
725 manifest = json.loads(''.join(mresp.app_iter))
726 # Ah, the fun of JSONing strs and getting unicodes back. We
727 # reencode to UTF8 to ensure crap doesn't blow up everywhere
728 # else.
729 keys_to_encode = []
730 for k, v in manifest.iteritems():
731 if isinstance(k, unicode):
732 keys_to_encode.append(k)
733 if isinstance(v, unicode):
734 manifest[k] = v.encode('utf8')
735 for k in keys_to_encode:
736 v = manifest[k]
737 del manifest[k]
738 manifest[k.encode('utf8')] = v
739 content_length = int(manifest['content-length'])
740 segment_size = int(manifest['x-segment-size'])
741 headers = dict(mresp.headers)
742 headers.update(manifest)
743 del headers['x-segment-size']
744 resp = Response(app_iter=SegmentedIterable(self, content_length,
745 segment_size, manifest['x-timestamp']), headers=headers,
746 request=req, conditional_response=True)
747 resp.headers['etag'] = manifest['etag'].strip('"')
748 resp.last_modified = mresp.last_modified
749 resp.content_length = int(manifest['content-length'])
750 resp.content_type = manifest['content-type']
751 if 'content-encoding' in manifest:
752 resp.content_encoding = manifest['content-encoding']
753 cresp = req.get_response(resp)
754 # Needed for SegmentedIterable to update bytes_transferred
755 cresp.bytes_transferred = 0
756 resp.app_iter.response = cresp
757 return cresp
758
539 @public759 @public
540 @delay_denial760 @delay_denial
541 def POST(self, req):761 def POST(self, req):
@@ -652,11 +872,47 @@
652 if k.lower().startswith('x-object-meta-'):872 if k.lower().startswith('x-object-meta-'):
653 new_req.headers[k] = v873 new_req.headers[k] = v
654 req = new_req874 req = new_req
875 if req.headers.get('transfer-encoding') == 'chunked' or \
876 req.content_length > SEGMENT_SIZE:
877 resp = self.PUT_segmented_object(req, data_source, partition,
878 nodes, container_partition, containers)
879 else:
880 resp = self.PUT_whole_object(req, data_source, partition, nodes,
881 container_partition, containers)
882 if 'x-copy-from' in req.headers:
883 resp.headers['X-Copied-From'] = req.headers['x-copy-from']
884 for k, v in req.headers.items():
885 if k.lower().startswith('x-object-meta-'):
886 resp.headers[k] = v
887 resp.last_modified = float(req.headers['X-Timestamp'])
888 return resp
889
890 def PUT_whole_object(self, req, data_source, partition, nodes,
891 container_partition=None, containers=None):
892 """
893 Performs a PUT for a whole object (one with a content-length <=
894 SEGMENT_SIZE).
895
896 :param req: The webob.Request to process.
897 :param data_source: An iterator providing the data to store.
898 :param partition: The object ring partition the object falls on.
899 :param nodes: The object ring nodes the object falls on.
900 :param container_partition: The container ring partition the container
901 for the object falls on, None if the
902 container is not to be updated.
903 :param containers: The container ring nodes the container for the
904 object falls on, None if the container is not to be
905 updated.
906 :returns: webob.Response object.
907 """
908 conns = []
909 update_containers = containers is not None
655 for node in self.iter_nodes(partition, nodes, self.app.object_ring):910 for node in self.iter_nodes(partition, nodes, self.app.object_ring):
656 container = containers.pop()911 if update_containers:
657 req.headers['X-Container-Host'] = '%(ip)s:%(port)s' % container912 container = containers.pop()
658 req.headers['X-Container-Partition'] = container_partition913 req.headers['X-Container-Host'] = '%(ip)s:%(port)s' % container
659 req.headers['X-Container-Device'] = container['device']914 req.headers['X-Container-Partition'] = container_partition
915 req.headers['X-Container-Device'] = container['device']
660 req.headers['Expect'] = '100-continue'916 req.headers['Expect'] = '100-continue'
661 resp = conn = None917 resp = conn = None
662 if not self.error_limited(node):918 if not self.error_limited(node):
@@ -674,12 +930,14 @@
             if conn and resp:
                 if resp.status == 100:
                     conns.append(conn)
-                    if not containers:
+                    if (update_containers and not containers) or \
+                            len(conns) == len(nodes):
                         break
                     continue
                 elif resp.status == 507:
                     self.error_limit(node)
-                    containers.insert(0, container)
+                    if update_containers:
+                        containers.insert(0, container)
         if len(conns) <= len(nodes) / 2:
             self.app.logger.error(
                 'Object PUT returning 503, %s/%s required connections, '
@@ -765,15 +1023,123 @@
                 statuses.append(503)
                 reasons.append('')
                 bodies.append('')
-        resp = self.best_response(req, statuses, reasons, bodies, 'Object PUT',
-            etag=etag)
-        if 'x-copy-from' in req.headers:
-            resp.headers['X-Copied-From'] = req.headers['x-copy-from']
-        for k, v in req.headers.items():
-            if k.lower().startswith('x-object-meta-'):
-                resp.headers[k] = v
-        resp.last_modified = float(req.headers['X-Timestamp'])
-        return resp
+        return self.best_response(req, statuses, reasons, bodies, 'Object PUT',
+            etag=etag)
+
+    def PUT_segmented_object(self, req, data_source, partition, nodes,
+            container_partition, containers):
+        """
+        Performs a PUT for a segmented object (one with a content-length >
+        SEGMENT_SIZE).
+
+        :param req: The webob.Request to process.
+        :param data_source: An iterator providing the data to store.
+        :param partition: The object ring partition the object falls on.
+        :param nodes: The object ring nodes the object falls on.
+        :param container_partition: The container ring partition the container
+                                    for the object falls on.
+        :param containers: The container ring nodes the container for the
+                           object falls on.
+        :returns: webob.Response object.
+        """
+        req.bytes_transferred = 0
+        leftover_chunk = [None]
+        etag = md5()
+        def segment_iter():
+            amount_given = 0
+            while amount_given < SEGMENT_SIZE:
+                if leftover_chunk[0]:
+                    chunk = leftover_chunk[0]
+                    leftover_chunk[0] = None
+                else:
+                    with ChunkReadTimeout(self.app.client_timeout):
+                        chunk = data_source.next()
+                    req.bytes_transferred += len(chunk)
+                    etag.update(chunk)
+                if amount_given + len(chunk) > SEGMENT_SIZE:
+                    yield chunk[:SEGMENT_SIZE - amount_given]
+                    leftover_chunk[0] = chunk[SEGMENT_SIZE - amount_given:]
+                    amount_given = SEGMENT_SIZE
+                else:
+                    yield chunk
+                    amount_given += len(chunk)
+        def segment_iter_iter():
+            while True:
+                if not leftover_chunk[0]:
+                    with ChunkReadTimeout(self.app.client_timeout):
+                        leftover_chunk[0] = data_source.next()
+                    req.bytes_transferred += len(leftover_chunk[0])
+                    etag.update(leftover_chunk[0])
+                yield segment_iter()
+        segment_number = 0
+        chunked = req.headers.get('transfer-encoding') == 'chunked'
+        if not chunked:
+            amount_left = req.content_length
+        headers = {'X-Timestamp': req.headers['X-Timestamp'],
+                   'Content-Type': req.headers['content-type'],
+                   'X-Object-Type': 'segment'}
+        for segment_source in segment_iter_iter():
+            if chunked:
+                headers['Transfer-Encoding'] = 'chunked'
+                if segment_number == 0:
+                    headers['X-Object-Segment-If-Length'] = SEGMENT_SIZE
+            elif amount_left > SEGMENT_SIZE:
+                headers['Content-Length'] = SEGMENT_SIZE
+            else:
+                headers['Content-Length'] = amount_left
+            headers['X-Object-Segment'] = segment_number
+            segment_req = Request.blank(req.path_info,
+                environ={'REQUEST_METHOD': 'PUT'}, headers=headers)
+            if 'X-Object-Segment-If-Length' in headers:
+                del headers['X-Object-Segment-If-Length']
+            if segment_number:
+                ring_object_name = '%s/%s/%s' % (self.object_name,
+                    req.headers['x-timestamp'], segment_number)
+            else:
+                ring_object_name = self.object_name
+            segment_partition, segment_nodes = self.app.object_ring.get_nodes(
+                self.account_name, self.container_name, ring_object_name)
+            resp = self.PUT_whole_object(segment_req, segment_source,
+                segment_partition, segment_nodes)
+            if resp.status_int // 100 == 4:
+                return resp
+            elif resp.status_int // 100 != 2:
+                return HTTPServiceUnavailable(request=req,
+                    body='Unable to complete very large file operation.')
+            if segment_number == 0 and req.bytes_transferred < SEGMENT_SIZE:
+                return HTTPCreated(request=req, etag=etag.hexdigest())
+            if not chunked:
+                amount_left -= SEGMENT_SIZE
+            segment_number += 1
+        etag = etag.hexdigest()
+        if 'etag' in req.headers and req.headers['etag'].lower() != etag:
+            return HTTPUnprocessableEntity(request=req)
+        manifest = {'x-timestamp': req.headers['x-timestamp'],
+                    'content-length': req.bytes_transferred,
+                    'content-type': req.headers['content-type'],
+                    'x-segment-size': SEGMENT_SIZE,
+                    'etag': etag}
+        if 'content-encoding' in req.headers:
+            manifest['content-encoding'] = req.headers['content-encoding']
+        manifest = json.dumps(manifest)
+        headers = {'X-Timestamp': req.headers['X-Timestamp'],
+                   'Content-Type': req.headers['content-type'],
+                   'Content-Length': len(manifest),
+                   'X-Object-Type': 'manifest',
+                   'X-Object-Length': req.bytes_transferred}
+        headers.update(i for i in req.headers.iteritems()
+            if i[0].lower().startswith('x-object-meta-') and len(i[0]) > 14)
+        manifest_req = Request.blank(req.path_info,
+            environ={'REQUEST_METHOD': 'PUT'}, body=manifest, headers=headers)
+        manifest_source = iter(lambda:
+            manifest_req.body_file.read(self.app.client_chunk_size), '')
+        resp = self.PUT_whole_object(manifest_req, manifest_source, partition,
+            nodes, container_partition=container_partition,
+            containers=containers)
+        if resp.status_int // 100 != 2:
+            return HTTPServiceUnavailable(request=req,
+                body='Unable to complete very large file operation.')
+        return HTTPCreated(request=req, etag=etag)
 
     @public
     @delay_denial

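[Note, not part of the branch diff] PUT_segmented_object above slices the incoming body into SEGMENT_SIZE pieces with a pair of nested generators (segment_iter fed by segment_iter_iter) while a single md5 runs over the whole body for the final ETag. The standalone sketch below shows the same slicing idea with a hypothetical, simplified buffer-based generator; iter_segments and the example values are invented, and the 1024-byte SEGMENT_SIZE just mirrors the deliberately low test value in this branch.

    # Illustrative sketch only -- not from the branch.
    from hashlib import md5

    SEGMENT_SIZE = 1024  # stand-in for the branch's deliberately low test value


    def iter_segments(data_source, etag, segment_size=SEGMENT_SIZE):
        """Yield (segment_number, segment_bytes); update etag over the whole body."""
        buf = ''
        segment_number = 0
        for chunk in data_source:
            etag.update(chunk)
            buf += chunk
            while len(buf) >= segment_size:
                yield segment_number, buf[:segment_size]
                buf = buf[segment_size:]
                segment_number += 1
        # Final partial segment (or an empty one for a zero-byte upload).
        if buf or segment_number == 0:
            yield segment_number, buf


    if __name__ == '__main__':
        chunks = iter(['a' * 700, 'b' * 700, 'c' * 100])
        etag = md5()
        for num, segment in iter_segments(chunks, etag):
            print 'segment %d: %d bytes' % (num, len(segment))
        print 'whole-body etag:', etag.hexdigest()

As in the diff, the ETag reported to the client is the md5 of the complete upload rather than of any single segment, so existing clients can keep verifying it the usual way.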
=== modified file 'test/unit/__init__.py'
--- test/unit/__init__.py 2010-07-29 20:06:01 +0000
+++ test/unit/__init__.py 2010-10-19 18:43:49 +0000
@@ -9,6 +9,8 @@
     crlfs = 0
     while crlfs < 2:
         c = fd.read(1)
+        if not len(c):
+            raise Exception('Never read 2crlfs; got %s' % repr(rv))
         rv = rv + c
         if c == '\r' and lc != '\n':
             crlfs = 0
 
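[Note, not part of the branch diff] The two added lines make the test helper fail fast when the far end closes the connection before two CRLFs arrive, instead of spinning forever on empty reads. A hypothetical standalone loop (read_until_2crlfs is an invented name, not the helper in test/unit/__init__.py) showing the effect of that guard:

    # Illustrative sketch only; mirrors the guard added in the hunk above.
    from StringIO import StringIO


    def read_until_2crlfs(fd):
        rv, lc, crlfs = '', '', 0
        while crlfs < 2:
            c = fd.read(1)
            if not len(c):
                # Without this check, an exhausted fd returns '' forever and
                # the loop never ends; now the caller sees what was read so far.
                raise Exception('Never read 2crlfs; got %s' % repr(rv))
            rv = rv + c
            if c == '\r' and lc != '\n':
                crlfs = 0
            if lc == '\r' and c == '\n':
                crlfs += 1
            lc = c
        return rv


    if __name__ == '__main__':
        print repr(read_until_2crlfs(StringIO('HTTP/1.1 100 Continue\r\n\r\n')))
        # A truncated response now raises instead of hanging:
        # read_until_2crlfs(StringIO('HTTP/1.1 100 Continue\r\n'))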
=== added file 'test/unit/obj/test_hashes.py'
--- test/unit/obj/test_hashes.py 1970-01-01 00:00:00 +0000
+++ test/unit/obj/test_hashes.py 2010-10-19 18:43:49 +0000
@@ -0,0 +1,28 @@
+# Copyright (c) 2010 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# TODO: Tests
+
+import unittest
+from swift.obj import hashes
+
+class TestHashes(unittest.TestCase):
+
+    def test_placeholder(self):
+        pass
+
+
+if __name__ == '__main__':
+    unittest.main()
=== added file 'test/unit/obj/test_hashes.py.moved'
--- test/unit/obj/test_hashes.py.moved 1970-01-01 00:00:00 +0000
+++ test/unit/obj/test_hashes.py.moved 2010-10-19 18:43:49 +0000
@@ -0,0 +1,28 @@
+# Copyright (c) 2010 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# TODO: Tests
+
+import unittest
+from swift.obj import hashes
+
+class TestHashes(unittest.TestCase):
+
+    def test_placeholder(self):
+        pass
+
+
+if __name__ == '__main__':
+    unittest.main()
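[Note, not part of the branch diff] For reference, a rough sketch of the two artifacts a segmented PUT leaves behind, based only on what PUT_segmented_object in the proxy diff above sets: a small JSON manifest stored under the object's own name (with X-Object-Type: manifest), and the per-segment ring names that spread segments across the cluster. All concrete values below are made up.

    # Illustrative sketch only; field names and the naming pattern come from
    # the proxy diff above, the values are invented.
    import json

    manifest = json.dumps({
        'x-timestamp': '1287512629.44538',
        'content-length': 3000,
        'content-type': 'application/octet-stream',
        'x-segment-size': 1024,
        'etag': 'b21158b1b38b9368e5fe0a4dc0f37b83',
    })
    print manifest

    # Segment 0 keeps the object's own name; later segments fold in the upload
    # timestamp and segment number, so each hashes to its own ring partition.
    object_name = 'myobject'
    timestamp = '1287512629.44538'
    for segment_number in range(3):
        if segment_number:
            ring_object_name = '%s/%s/%s' % (object_name, timestamp,
                                             segment_number)
        else:
            ring_object_name = object_name
        print ring_object_name

Presumably this is also how the GET path finds the pieces: the manifest comes back from the object's normal ring location, and the SegmentedIterable referenced earlier in the proxy diff streams the segments in order.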