Merge lp:~gholt/swift/lobjects3 into lp:~hudson-openstack/swift/trunk

Proposed by gholt
Status: Rejected
Rejected by: gholt
Proposed branch: lp:~gholt/swift/lobjects3
Merge into: lp:~hudson-openstack/swift/trunk
Diff against target: 4643 lines (+3054/-575)
29 files modified
bin/swift-init (+12/-6)
bin/swift-object-janitor (+28/-0)
doc/source/deployment_guide.rst (+26/-0)
doc/source/development_saio.rst (+9/-0)
doc/source/index.rst (+1/-0)
doc/source/object.rst (+19/-0)
doc/source/overview_very_large_objects.rst (+144/-0)
etc/object-server.conf-sample (+15/-0)
etc/proxy-server.conf-sample (+3/-0)
setup.py (+16/-23)
swift/common/constraints.py (+11/-6)
swift/common/db.py (+1/-2)
swift/obj/auditor.py (+9/-15)
swift/obj/diskfile.py (+507/-0)
swift/obj/janitor.py (+516/-0)
swift/obj/replicator.py (+32/-176)
swift/obj/server.py (+109/-236)
swift/obj/updater.py (+16/-2)
swift/proxy/server.py (+391/-26)
test/functional/sample.conf (+8/-1)
test/functional/tests.py (+1/-1)
test/probe/common.py (+1/-1)
test/probe/test_object_async_update.py (+1/-1)
test/unit/common/test_constraints.py (+6/-8)
test/unit/obj/test_diskfile.py (+203/-0)
test/unit/obj/test_janitor.py (+433/-0)
test/unit/obj/test_server.py (+179/-62)
test/unit/obj/test_updater.py (+8/-4)
test/unit/proxy/test_server.py (+349/-5)
To merge this branch: bzr merge lp:~gholt/swift/lobjects3
Reviewer: gholt (community)
Review status: Disapprove
Review via email: mp+39792@code.launchpad.net

Commit message

Very large object support.

Description of the change

Very large object support.

Please read doc/source/overview_very_large_objects.rst to start.

Revision history for this message
gholt (gholt) wrote :

After a thorough discussion, this idea of transparent large object support is going to be dropped in favor of a combined client-side and cluster-side solution.

review: Disapprove

Unmerged revisions

124. By gholt

Merge from trunk

123. By gholt

Merged from trunk

122. By gholt

Merged from trunk

121. By gholt

Added missing SkipTest import

120. By gholt

Make poor Hudson happier

119. By gholt

Documentation

118. By gholt

Tests for proxy server

117. By gholt

More object server tests

116. By gholt

More tests for the janitor

115. By gholt

Working on tests and bugfixes

Preview Diff

1=== modified file 'bin/swift-init'
2--- bin/swift-init 2010-08-19 20:01:44 +0000
3+++ bin/swift-init 2010-11-08 18:51:48 +0000
4@@ -23,10 +23,11 @@
5 import sys
6 import time
7
8-ALL_SERVERS = ['account-auditor', 'account-server', 'container-auditor',
9+ALL_SERVERS = ['account-auditor', 'account-reaper', 'account-replicator',
10+ 'account-server', 'auth-server', 'container-auditor',
11 'container-replicator', 'container-server', 'container-updater',
12- 'object-auditor', 'object-server', 'object-replicator', 'object-updater',
13- 'proxy-server', 'account-replicator', 'auth-server', 'account-reaper']
14+ 'object-auditor', 'object-janitor', 'object-replicator', 'object-server',
15+ 'object-updater', 'proxy-server']
16 GRACEFUL_SHUTDOWN_SERVERS = ['account-server', 'container-server',
17 'object-server', 'proxy-server', 'auth-server']
18 MAX_DESCRIPTORS = 32768
19@@ -41,6 +42,7 @@
20 servers = [server]
21 command = command.lower()
22
23+
24 def pid_files(server):
25 if os.path.exists('/var/run/swift/%s.pid' % server):
26 pid_files = ['/var/run/swift/%s.pid' % server]
27@@ -50,6 +52,7 @@
28 pid = int(open(pid_file).read().strip())
29 yield pid_file, pid
30
31+
32 def do_start(server, once=False):
33 server_type = '-'.join(server.split('-')[:-1])
34
35@@ -77,7 +80,7 @@
36 os.makedirs(dir)
37 except OSError, err:
38 if err.errno == errno.EACCES:
39- sys.exit('Unable to create %s. Running as non-root?' % dir)
40+ sys.exit('Unable to create %s. Running as non-root?' % dir)
41 fp = open(pid_file, 'w')
42 fp.write('%d\n' % pid)
43 fp.close()
44@@ -120,18 +123,21 @@
45 elif os.path.exists('/etc/swift/%s-server/' % server_type):
46 # found config directory, searching for config file(s)
47 launch_args = []
48- for num, ini_file in enumerate(glob.glob('/etc/swift/%s-server/*.conf' % server_type)):
49+ for num, ini_file in \
50+ enumerate(glob.glob('/etc/swift/%s-server/*.conf' % server_type)):
51 pid_file = '/var/run/swift/%s/%d.pid' % (server, num)
52 # start a server for each ini_file found
53 launch_args.append((ini_file, pid_file))
54 else:
55 # maybe there's a config file(s) out there, but I couldn't find it!
56- sys.exit('Unable to locate config file for %s. %s does not exist?' % (server, ini_file))
57+ sys.exit('Unable to locate config file for %s. %s does not exist?' %
58+ (server, ini_file))
59
60 # start all servers
61 for ini_file, pid_file in launch_args:
62 launch(ini_file, pid_file)
63
64+
65 def do_stop(server, graceful=False):
66 if graceful and server in GRACEFUL_SHUTDOWN_SERVERS:
67 sig = signal.SIGHUP
68
69=== added file 'bin/swift-object-janitor'
70--- bin/swift-object-janitor 1970-01-01 00:00:00 +0000
71+++ bin/swift-object-janitor 2010-11-08 18:51:48 +0000
72@@ -0,0 +1,28 @@
73+#!/usr/bin/python
74+# Copyright (c) 2010 OpenStack, LLC.
75+#
76+# Licensed under the Apache License, Version 2.0 (the "License");
77+# you may not use this file except in compliance with the License.
78+# You may obtain a copy of the License at
79+#
80+# http://www.apache.org/licenses/LICENSE-2.0
81+#
82+# Unless required by applicable law or agreed to in writing, software
83+# distributed under the License is distributed on an "AS IS" BASIS,
84+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
85+# implied.
86+# See the License for the specific language governing permissions and
87+# limitations under the License.
88+
89+import sys
90+
91+from swift.obj.janitor import ObjectJanitor
92+from swift.common import utils
93+
94+if __name__ == '__main__':
95+ if len(sys.argv) < 2:
96+ print "Usage: swift-object-janitor CONFIG_FILE [once]"
97+ sys.exit(1)
98+ once = len(sys.argv) > 2 and sys.argv[2] == 'once'
99+ conf = utils.readconf(sys.argv[1], 'object-janitor')
100+ ObjectJanitor(conf).run(once)
101
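
A quick usage note: the janitor reads its settings from the [object-janitor]
section of the object server config (see the etc/object-server.conf-sample
changes below), so it can be run by hand with, for example:

    swift-object-janitor /etc/swift/object-server.conf once

or managed alongside the other daemons with "swift-init object-janitor start".
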
102=== modified file 'doc/source/deployment_guide.rst'
103--- doc/source/deployment_guide.rst 2010-10-21 18:59:43 +0000
104+++ doc/source/deployment_guide.rst 2010-11-08 18:51:48 +0000
105@@ -206,6 +206,25 @@
106 object can be reclaimed
107 ================== ================= =======================================
108
109+[object-janitor]
110+
111+=================== ============== ==========================================
112+Option Default Description
113+------------------- -------------- ------------------------------------------
114+log_name object-janitor Label used when logging
115+log_facility LOG_LOCAL0 Syslog log facility
116+log_level INFO Logging level
117+interval 300 Minimum time for a pass to take
concurrency 1 Number of janitor workers to spawn
119+node_timeout 10 Request timeout to external services
120+conn_timeout 0.5 Connection timeout to external services
121+slowdown 0.01 Time in seconds to wait between operations
122+segment_reclaim_age 604800 Number of seconds before removing orphaned
123+ object segments
124+segments_per_pass 10 Maximum segments per object to remove per
125+ pass
126+=================== ============== ==========================================
127+
128 [object-updater]
129
130 ================== ============== ==========================================
131@@ -438,6 +457,13 @@
132 log_level INFO Log level
133 log_headers True If True, log headers in each
134 request
135+max_object_size 107374182400 Maximum size of any object in
136+ the cluster. Note: The
137+ segment_size value will cause
138+ objects larger than that to be
139+ split into segments
140+segment_size 2147483647 Objects will be split into file
141+ segments no larger than this
142 recheck_account_existence 60 Cache timeout in seconds to
143 send memcached for account
144 existence
145
146=== modified file 'doc/source/development_saio.rst'
147--- doc/source/development_saio.rst 2010-11-02 14:17:25 +0000
148+++ doc/source/development_saio.rst 2010-11-08 18:51:48 +0000
149@@ -440,6 +440,8 @@
150 [object-replicator]
151 vm_test_mode = yes
152
153+ [object-janitor]
154+
155 [object-updater]
156
157 [object-auditor]
158@@ -461,6 +463,8 @@
159 [object-replicator]
160 vm_test_mode = yes
161
162+ [object-janitor]
163+
164 [object-updater]
165
166 [object-auditor]
167@@ -482,6 +486,8 @@
168 [object-replicator]
169 vm_test_mode = yes
170
171+ [object-janitor]
172+
173 [object-updater]
174
175 [object-auditor]
176@@ -503,6 +509,8 @@
177 [object-replicator]
178 vm_test_mode = yes
179
180+ [object-janitor]
181+
182 [object-updater]
183
184 [object-auditor]
185@@ -571,6 +579,7 @@
186 # Replace devauth with whatever your super_admin key is (recorded in
187 # /etc/swift/auth-server.conf).
188 swift-auth-recreate-accounts -K devauth
189+ swift-init object-janitor start
190 swift-init object-updater start
191 swift-init container-updater start
192 swift-init object-replicator start
193
194=== modified file 'doc/source/index.rst'
195--- doc/source/index.rst 2010-11-04 19:25:23 +0000
196+++ doc/source/index.rst 2010-11-08 18:51:48 +0000
197@@ -26,6 +26,7 @@
198 overview_replication
199 overview_stats
200 ratelimit
201+ overview_very_large_objects
202
203 Development:
204
205
206=== modified file 'doc/source/object.rst'
207--- doc/source/object.rst 2010-07-19 16:25:18 +0000
208+++ doc/source/object.rst 2010-11-08 18:51:48 +0000
209@@ -24,6 +24,16 @@
210 :undoc-members:
211 :show-inheritance:
212
213+.. _object-janitor:
214+
215+Object Janitor
216+==============
217+
218+.. automodule:: swift.obj.janitor
219+ :members:
220+ :undoc-members:
221+ :show-inheritance:
222+
223 .. _object-updater:
224
225 Object Updater
226@@ -44,3 +54,12 @@
227 :undoc-members:
228 :show-inheritance:
229
230+.. _object-diskfile:
231+
232+Disk File
233+=========
234+
235+.. automodule:: swift.obj.diskfile
236+ :members:
237+ :undoc-members:
238+ :show-inheritance:
239
240=== added file 'doc/source/overview_very_large_objects.rst'
241--- doc/source/overview_very_large_objects.rst 1970-01-01 00:00:00 +0000
242+++ doc/source/overview_very_large_objects.rst 2010-11-08 18:51:48 +0000
243@@ -0,0 +1,144 @@
244+=========================
245+Very Large Object Support
246+=========================
247+
248+-----
249+Intro
250+-----
251+
252+Supporting very large objects in Swift presented quite a challenge. The main
253+problem is storage balance; if you just have a few very large objects in the
254+cluster some of the storage nodes will have significantly more data than the
255+others. The basic answer is to break these objects up into segments and
256+distribute the segments across the cluster evenly.
257+
258+The user could do this themselves, breaking up their very large objects into
259+smaller objects and uploading those. But then they'd also have to reassemble
260+them themselves on download. What we do in Swift is essentially emulate this
261+behavior transparently for the user. In this way, we can pick the
262+optimal segment size for our cluster, something the user probably wouldn't
263+know. This also allows the user to easily do ranged requests on the object
264+without having to know how it's split up behind the scenes.
265+
266+-----------------------
267+The Proxy is in Control
268+-----------------------
269+
270+In Swift's implementation, the proxy server is in control of the object
271+segmentation. As the user uploads a very large object, the proxy automatically
272+closes the current segment when it reaches a configurable segment size and
273+opens a new segment for more data. Once all the data is uploaded and stored,
274+the proxy server then creates a manifest object indicating how the object was
275+segmented so that it can be retrieved later. The proxy does not spool any data
276+to disk; it sends all data directly to the object servers.
277+
278+Because of these segment switchovers occurring in the proxy server, a very
279+large object operation will have a higher chance of failure than a normal-sized
280+object. If at any point the proxy can't communicate with at least a majority of
281+the nodes for each and every segment of the object, the whole object operation
282+will fail. For instance, a regular 1MB object in a 3-replica cluster just needs
283+to make at least 2 object server requests to succeed. If the segment size is
284+1GB, a 10GB object would need 2 object server requests to succeed 11 times
285+during the operation (10 segments, 1 manifest).
286+
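As a back-of-the-envelope check of that arithmetic, here is a small sketch
(the function name is illustrative, not from the Swift code):

    SEGMENT_SIZE = 2147483647  # the proxy's default segment_size, ~2GB

    def put_operations(object_size, segment_size=SEGMENT_SIZE):
        # one PUT per segment, rounding up, plus one for the manifest
        segments = (object_size + segment_size - 1) // segment_size
        return segments + 1

    # 10GB object with 1GB segments: 10 segments + 1 manifest = 11
    print put_operations(10 * 1024 ** 3, 1024 ** 3)
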
287+The upside of the proxy being in control is that the client does not need to
288+know anything about the segmentation; it's done for them. The downside is that
289+failure to upload means starting the whole thing over again. Later we plan to
290+implement what we term 'user manifests' where the user can upload several
291+objects but download them as if they were one. This would allow the user to
292+just reupload a smaller part of the whole in the case of upload failures, but
293+would require them to manage the segments themselves.
294+
295+-------------------------
296+How They're Really Stored
297+-------------------------
298+
299+If you have a working knowledge of how Swift stores and locates regular objects
300+you'll know that the object name is hashed and that that hash determines the
301+storage nodes and disk location for the object's data. Segmented objects work
302+much the same way, but they have as many names as they do segments, plus one
303+more for the manifest.
304+
305+The manifest is stored at the same location as the object would be if it were a
306+normal object. The manifest is the last item stored once all the segments are
307+in place, so that only one version of an object is available at any given time.
308+This is important because you don't want to be able to retrieve a manifest and
309+not be able to find the segments yet.
310+
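In rough pseudocode, the ordering the proxy enforces looks like this (a
sketch only; the helper names are illustrative stand-ins, not actual proxy
methods):

    def put_segment(name, timestamp, segment_number):
        print 'PUT segment %s/%s/%s' % (name, timestamp, segment_number)

    def put_manifest(name, timestamp):
        print 'PUT manifest for %s' % name

    def store_large_object(name, timestamp, segment_count):
        for segment_number in xrange(segment_count):
            # each segment must land on a majority of its replicas
            put_segment(name, timestamp, segment_number)
        # the manifest goes last, so a GET never sees a half-stored version
        put_manifest(name, timestamp)
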
311+The segments are given the same name as the object with the operation's
312+timestamp and the segment number appended. The reason the timestamp is used is
313+to keep the segments of multiple uploads of the same very large object from
314+colliding. During an overwrite of a very large object, you don't want a
315+download of that object to have some new data and some old. Since the segments
316+for each version of the object are stored independently, a download before the
317+new manifest file is in place will get the old data and a download afterwards
318+will get the new data. Also, in the case where the new upload fails in transit,
319+the old version will still be available.
320+
321+The storage nodes for each segment are determined in the familiar way: hashing
322+the segment name (object name + timestamp + segment number as described above)
323+and then picking the storage nodes based on the hash. However, there is an
324+exception for the first segment. This segment will always be stored on the same
325+storage node and disk partition as a regular object of the same name would be,
326+though the hash of the segment name determines the exact on disk location. This
327+is because with chunked transfer encoded uploads we don't know the object size
328+until the upload is complete. We won't realize we need to segment such an
329+object until we've already uploaded the first segment's worth to the regular
330+object storage nodes.
331+
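The naming scheme can be seen directly in swift/obj/diskfile.py later in
this diff; condensed, with made-up values for everything but the calls:

    from swift.common.utils import hash_path, storage_directory

    account, container, obj = 'AUTH_test', 'photos', 'puppy.jpg'
    segment_timestamp, segment = '1289243968.00000', 3
    partition = 154944  # looked up from the object ring

    segment_name = '%s/%s/%s' % (obj, segment_timestamp, segment)
    segment_hash = hash_path(account, container, segment_name)
    # lands under object_segments/<partition>/<suffix>/<segment_hash>/
    datadir = storage_directory('object_segments', partition, segment_hash)
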
332+To help keep directory structures smaller, object segments are stored in an
333+'object_segments' directory alongside the usual 'objects' directory. Though all
334+these files could be stored in the same directory structure, making this split
335+should make replication, reclamation, and other scan-type operations less
336+disruptive to the systems.
337+
338+.. note::
339+
340+ Because the first segment is always stored on the same storage nodes and in
341+ the same partition as a regular file would, you can often see hashes in a
342+ partition where they don't seem to belong. For instance, a first segment
343+ name that hashes to 25d40100fd07c8c5fd1a4e31875593e1 might be found in
344+ partition 568363 (hex 8ac2b) instead of the expected 154944 (hex 25d40)
345+ because the segment's true object name hashed to
346+ 8ac2bf59556b61bb5cc521ccb51c200a. You should only see these anomalies in
347+the object_segments directory, however.
348+
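The arithmetic behind those example numbers is equivalent to the following
(not the ring's actual implementation; it assumes a ring partition power of
20, i.e. the partition is the top 20 bits of the 128-bit md5):

    def partition_of(name_hash, part_power=20):
        return int(name_hash, 16) >> (128 - part_power)

    partition_of('25d40100fd07c8c5fd1a4e31875593e1')  # 154944 (hex 25d40)
    partition_of('8ac2bf59556b61bb5cc521ccb51c200a')  # 568363 (hex 8ac2b)
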
349+-----------
350+Cleaning Up
351+-----------
352+
353+Another challenge very large object support brought was how to keep
354+everything cleaned up and reduce the chance of completely orphaned segments
355+that would waste a large amount of storage.
356+
357+One such scenario is the common DELETE operation. We don't want to make the
358+client wait while we go out and delete all the segments and the manifest from
359+all over the cluster. Instead, we write out a file that will start a background
360+operation to do the cleanup, and we just remove the manifest file. An object
361+overwrite is much like a delete in that just before placing the new manifest
362+file we create the background job.
363+
364+Another scenario is failed uploads. If a user disconnects after uploading 100
365+segments before finishing, we have to clean those up. Also, a proxy could crash
366+due to hardware failure right in the middle of such an operation, so we
367+wouldn't even be able to make a cleanup job at that point. So, we make a
368+pending cleanup job just before starting any segmented upload. That pending
369+cleanup job will wait up to a configurable amount of time (1 week by default)
370+for the associated manifest to appear. If the manifest appears, the job is
371+canceled and deleted. If the manifest never appears, the job goes about
372+removing the orphaned segments from the system.
373+
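Per pending job, the janitor's decision boils down to something like this
(illustrative only; the real logic is in swift/obj/janitor.py below):

    from time import time

    SEGMENT_RECLAIM_AGE = 604800  # one week, the default

    def pending_cleanup_action(job_timestamp, manifest_found):
        if manifest_found:
            return 'cancel'  # upload completed; the segments are live
        if time() - float(job_timestamp) > SEGMENT_RECLAIM_AGE:
            return 'reap'    # upload abandoned; remove orphaned segments
        return 'wait'        # check again on a later pass
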
374+Prior to very large object support, we had a background process called object
375+async. Object async would send object metadata updates to the container servers
376+in the event the container servers couldn't be contacted right away. These
377+update jobs were uncommon and quick to complete, so durability wasn't too much
378+of a concern. However, these segment cleanup jobs can hang around for a while
379+(1 week in the above example of a proxy crash), so durable background jobs
380+became important.
381+
382+With this, the object janitor was born. He accomplishes what object async did
383+and the segment cleanup operations as well. The janitor jobs are stored just
384+like regular object data is, except that they're stored in the object_janitor
385+directory structure. Because they're stored in the same way, they can use the
386+same object replicator code to keep durability with replicas on multiple
387+storage nodes.
388
389=== modified file 'etc/object-server.conf-sample'
390--- etc/object-server.conf-sample 2010-10-19 15:02:36 +0000
391+++ etc/object-server.conf-sample 2010-11-08 18:51:48 +0000
392@@ -44,6 +44,21 @@
393 # The replicator also performs reclamation
394 # reclaim_age = 604800
395
396+[object-janitor]
397+# log_name = object-janitor
398+# interval = 300
399+# concurrency = 1
400+# node_timeout = 10
401+# conn_timeout = 0.5
402+# slowdown will sleep that amount between janitor operations
403+# slowdown = 0.01
404+# Number of seconds before assuming a segmented put will never succeed and
405+# therefore clean up any orphaned segments from the operation.
406+# segment_reclaim_age = 604800
407+# Number of segments to remove per pass when cleaning up after superseded or
408+# orphaned segmented put operations.
409+# segments_per_pass = 10
410+
411 [object-updater]
412 # log_name = object-updater
413 # interval = 300
414
415=== modified file 'etc/proxy-server.conf-sample'
416--- etc/proxy-server.conf-sample 2010-11-03 20:17:27 +0000
417+++ etc/proxy-server.conf-sample 2010-11-08 18:51:48 +0000
418@@ -17,6 +17,9 @@
419 # log_facility = LOG_LOCAL0
420 # log_level = INFO
421 # log_headers = False
422+# max_object_size = 107374182400
423+# Files will be split into segments no larger than segment_size
424+# segment_size = 2147483647
425 # recheck_account_existence = 60
426 # recheck_container_existence = 60
427 # object_chunk_size = 8192
428
429=== modified file 'setup.py'
430--- setup.py 2010-11-03 19:50:35 +0000
431+++ setup.py 2010-11-08 18:51:48 +0000
432@@ -21,6 +21,7 @@
433
434 from swift import __version__ as version
435
436+
437 class local_sdist(sdist):
438 """Customized sdist hook - builds the ChangeLog file from VC first"""
439
440@@ -57,29 +58,21 @@
441 'Environment :: No Input/Output (Daemon)',
442 ],
443 install_requires=[], # removed for better compat
444- scripts=[
445- 'bin/st', 'bin/swift-account-auditor',
446- 'bin/swift-account-audit', 'bin/swift-account-reaper',
447- 'bin/swift-account-replicator', 'bin/swift-account-server',
448- 'bin/swift-auth-add-user',
449- 'bin/swift-auth-recreate-accounts', 'bin/swift-auth-server',
450- 'bin/swift-auth-update-reseller-prefixes',
451- 'bin/swift-container-auditor',
452- 'bin/swift-container-replicator',
453- 'bin/swift-container-server', 'bin/swift-container-updater',
454- 'bin/swift-drive-audit', 'bin/swift-get-nodes',
455- 'bin/swift-init', 'bin/swift-object-auditor',
456- 'bin/swift-object-info',
457- 'bin/swift-object-replicator',
458- 'bin/swift-object-server',
459- 'bin/swift-object-updater', 'bin/swift-proxy-server',
460- 'bin/swift-ring-builder', 'bin/swift-stats-populate',
461- 'bin/swift-stats-report',
462- 'bin/swift-bench',
463- 'bin/swift-log-uploader',
464- 'bin/swift-log-stats-collector',
465- 'bin/swift-account-stats-logger',
466- ],
467+ scripts=['bin/st', 'bin/swift-account-audit', 'bin/swift-account-auditor',
468+ 'bin/swift-account-reaper', 'bin/swift-account-replicator',
469+ 'bin/swift-account-server', 'bin/swift-account-stats-logger',
470+ 'bin/swift-auth-add-user', 'bin/swift-auth-recreate-accounts',
471+ 'bin/swift-auth-server', 'bin/swift-auth-update-reseller-prefixes',
472+ 'bin/swift-bench', 'bin/swift-container-auditor',
473+ 'bin/swift-container-replicator', 'bin/swift-container-server',
474+ 'bin/swift-container-updater', 'bin/swift-drive-audit',
475+ 'bin/swift-get-nodes', 'bin/swift-init',
476+ 'bin/swift-log-stats-collector', 'bin/swift-log-uploader',
477+ 'bin/swift-object-auditor', 'bin/swift-object-info',
478+ 'bin/swift-object-janitor', 'bin/swift-object-replicator',
479+ 'bin/swift-object-server', 'bin/swift-object-updater',
480+ 'bin/swift-proxy-server', 'bin/swift-ring-builder',
481+ 'bin/swift-stats-populate', 'bin/swift-stats-report'],
482 entry_points={
483 'paste.app_factory': [
484 'proxy=swift.proxy.server:app_factory',
485
486=== modified file 'swift/common/constraints.py'
487--- swift/common/constraints.py 2010-10-26 15:13:14 +0000
488+++ swift/common/constraints.py 2010-11-08 18:51:48 +0000
489@@ -19,8 +19,6 @@
490 HTTPRequestEntityTooLarge
491
492
493-#: Max file size allowed for objects
494-MAX_FILE_SIZE = 5 * 1024 * 1024 * 1024 + 2
495 #: Max length of the name of a key for metadata
496 MAX_META_NAME_LENGTH = 128
497 #: Max length of the value of a key for metadata
498@@ -29,14 +27,18 @@
499 MAX_META_COUNT = 90
500 #: Max overall size of metadata
501 MAX_META_OVERALL_SIZE = 4096
502+#: Max account name length
503+MAX_ACCOUNT_NAME_LENGTH = 256
504+#: Max container name length
505+MAX_CONTAINER_NAME_LENGTH = 256
506 #: Max object name length
507 MAX_OBJECT_NAME_LENGTH = 1024
508 #: Max object list length of a get request for a container
509 CONTAINER_LISTING_LIMIT = 10000
510 #: Max container list length of a get request for an account
511 ACCOUNT_LISTING_LIMIT = 10000
512-MAX_ACCOUNT_NAME_LENGTH = 256
513-MAX_CONTAINER_NAME_LENGTH = 256
514+#: Default pickle protocol number used for Swift pickles
515+PICKLE_PROTOCOL = 2
516
517
518 def check_metadata(req, target_type):
519@@ -82,19 +84,22 @@
520 return None
521
522
523-def check_object_creation(req, object_name):
524+def check_object_creation(req, object_name, max_object_size=0):
525 """
526 Check to ensure that everything is alright about an object to be created.
527
528 :param req: HTTP request object
529 :param object_name: name of object to be created
530+ :param max_object_size: the maximum object size to check against; 0 if no
531+ object size checking should be done
532 :raises HTTPRequestEntityTooLarge: the object is too large
533+ :raises HTTPLengthRequired: missing content-length header and not
534 a chunked request
535 :raises HTTPBadRequest: missing or bad content-type header, or
536 bad metadata
537 """
538- if req.content_length and req.content_length > MAX_FILE_SIZE:
539+ if max_object_size > 0 and req.content_length and \
540+ req.content_length > max_object_size:
541 return HTTPRequestEntityTooLarge(body='Your request is too large.',
542 request=req, content_type='text/plain')
543 if req.content_length is None and \
544
545=== modified file 'swift/common/db.py'
546--- swift/common/db.py 2010-08-16 22:30:27 +0000
547+++ swift/common/db.py 2010-11-08 18:51:48 +0000
548@@ -32,6 +32,7 @@
549 import simplejson as json
550 import sqlite3
551
552+from swift.common.constraints import PICKLE_PROTOCOL
553 from swift.common.utils import normalize_timestamp, renamer, \
554 mkdirs, lock_parent_directory, fallocate
555 from swift.common.exceptions import LockTimeout
556@@ -39,8 +40,6 @@
557
558 #: Timeout for trying to connect to a DB
559 BROKER_TIMEOUT = 25
560-#: Pickle protocol to use
561-PICKLE_PROTOCOL = 2
562 #: Max number of pending entries
563 PENDING_CAP = 131072
564
565
566=== modified file 'swift/obj/auditor.py'
567--- swift/obj/auditor.py 2010-10-21 18:32:10 +0000
568+++ swift/obj/auditor.py 2010-11-08 18:51:48 +0000
569@@ -18,8 +18,8 @@
570 from hashlib import md5
571 from random import random
572
573-from swift.obj import server as object_server
574-from swift.obj.replicator import invalidate_hash
575+from swift.obj.diskfile import DATADIR, DiskFile, invalidate_hash, \
576+ read_metadata
577 from swift.common.utils import get_logger, renamer, audit_location_generator
578 from swift.common.exceptions import AuditException
579 from swift.common.daemon import Daemon
580@@ -45,10 +45,8 @@
581 time.sleep(random() * self.interval)
582 while True:
583 begin = time.time()
584- all_locs = audit_location_generator(self.devices,
585- object_server.DATADIR,
586- mount_check=self.mount_check,
587- logger=self.logger)
588+ all_locs = audit_location_generator(self.devices, DATADIR,
589+ mount_check=self.mount_check, logger=self.logger)
590 for path, device, partition in all_locs:
591 self.object_audit(path, device, partition)
592 if time.time() - reported >= 3600: # once an hour
593@@ -68,10 +66,8 @@
594 """Run the object audit once."""
595 self.logger.info('Begin object audit "once" mode')
596 begin = reported = time.time()
597- all_locs = audit_location_generator(self.devices,
598- object_server.DATADIR,
599- mount_check=self.mount_check,
600- logger=self.logger)
601+ all_locs = audit_location_generator(self.devices, DATADIR,
602+ mount_check=self.mount_check, logger=self.logger)
603 for path, device, partition in all_locs:
604 self.object_audit(path, device, partition)
605 if time.time() - reported >= 3600: # once an hour
606@@ -99,14 +95,12 @@
607 if not path.endswith('.data'):
608 return
609 try:
610- name = object_server.read_metadata(path)['name']
611+ name = read_metadata(path)['name']
612 except Exception, exc:
613 raise AuditException('Error when reading metadata: %s' % exc)
614 _, account, container, obj = name.split('/', 3)
615- df = object_server.DiskFile(self.devices, device,
616- partition, account,
617- container, obj,
618- keep_data_fp=True)
619+ df = DiskFile(self.devices, device, partition, account, container,
620+ obj, keep_data_fp=True)
621 if df.data_file is None:
622 # file is deleted, we found the tombstone
623 return
624
625=== added file 'swift/obj/diskfile.py'
626--- swift/obj/diskfile.py 1970-01-01 00:00:00 +0000
627+++ swift/obj/diskfile.py 2010-11-08 18:51:48 +0000
628@@ -0,0 +1,507 @@
629+# Copyright (c) 2010 OpenStack, LLC.
630+#
631+# Licensed under the Apache License, Version 2.0 (the "License");
632+# you may not use this file except in compliance with the License.
633+# You may obtain a copy of the License at
634+#
635+# http://www.apache.org/licenses/LICENSE-2.0
636+#
637+# Unless required by applicable law or agreed to in writing, software
638+# distributed under the License is distributed on an "AS IS" BASIS,
639+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
640+# implied.
641+# See the License for the specific language governing permissions and
642+# limitations under the License.
643+
644+from __future__ import with_statement
645+import cPickle as pickle
646+import errno
647+import hashlib
+import logging
648+import os
649+from contextlib import contextmanager
650+from os.path import basename, dirname, isdir, join, splitext
651+from tempfile import mkstemp
652+from time import time
653+
654+from eventlet import tpool, sleep
655+from xattr import getxattr, setxattr
656+
657+from swift.common.constraints import PICKLE_PROTOCOL
658+from swift.common.utils import drop_buffer_cache, hash_path, lock_path, \
659+ mkdirs, normalize_timestamp, renamer, split_path, storage_directory
660+
661+
662+DATADIR = 'objects'
663+SEGMENTSDIR = 'object_segments'
664+JANITORDIR = 'object_janitor'
665+HASH_FILE = 'hashes.pkl'
666+METADATA_KEY = 'user.swift.metadata'
667+ONE_WEEK = 604800
668+
669+
670+def read_metadata(fd):
671+ """
672+ Helper function to read the pickled metadata from an object file.
673+
674+ :param fd: file descriptor to load the metadata from
675+
676+ :returns: dictionary of metadata
677+ """
678+ metadata = ''
679+ key = 0
680+ try:
681+ while True:
682+ metadata += getxattr(fd, '%s%s' % (METADATA_KEY, (key or '')))
683+ key += 1
684+ except IOError:
685+ pass
686+ return pickle.loads(metadata)
687+
688+
689+def hash_suffix(path, reclaim_age):
690+ """
691+ Performs reclamation and returns an md5 of all (remaining) files.
692+
693+ :param reclaim_age: age in seconds at which to remove tombstones
694+ """
695+ md5 = hashlib.md5()
696+ for hsh in sorted(os.listdir(path)):
697+ hsh_path = join(path, hsh)
698+ files = os.listdir(hsh_path)
699+ if len(files) == 1:
700+ if files[0].endswith('.ts'):
701+ # remove tombstones older than reclaim_age
702+ ts = files[0].rsplit('.', 1)[0]
703+ if (time() - float(ts)) > reclaim_age:
704+ os.unlink(join(hsh_path, files[0]))
705+ files.remove(files[0])
706+ elif files:
707+ files.sort(reverse=True)
708+ meta = data = tomb = None
709+ for filename in files:
710+ if not meta and filename.endswith('.meta'):
711+ meta = filename
712+ if not data and filename.endswith('.data'):
713+ data = filename
714+ if not tomb and filename.endswith('.ts'):
715+ tomb = filename
716+ if (filename < tomb or # any file older than tomb
717+ filename < data or # any file older than data
718+ (filename.endswith('.meta') and
719+ filename < meta)): # old meta
720+ if filename.endswith('.data'):
721+ fp = open(join(hsh_path, filename), 'rb')
722+ metadata = read_metadata(fp)
723+ if metadata.get('X-Object-Type') == 'manifest':
724+ manifest = pickle.load(fp)
725+ partition = dirname(path)
726+ device = dirname(dirname(partition))
727+ devices = dirname(device)
728+ partition = basename(partition)
729+ device = basename(device)
730+ account, container, obj = \
731+ split_path(metadata['name'], 3,
732+ rest_with_last=True)
733+ df = DiskFile(devices, device, partition,
734+ 'Segment-Cleanup', manifest['x-timestamp'],
735+ '%s/%s/%s' % (account, container, obj),
736+ datadir=JANITORDIR)
737+ df.store_janitor_segment_cleanup(account,
738+ container, obj,
739+ segment_count=(manifest['content-length'] /
740+ manifest['x-segment-size'] + 1),
741+ segment_last_deleted=None)
742+ fp.close()
743+ os.unlink(join(hsh_path, filename))
744+ files.remove(filename)
745+ if not files:
746+ os.rmdir(hsh_path)
747+ for filename in files:
748+ md5.update(filename)
749+ try:
750+ os.rmdir(path)
751+ except OSError:
752+ pass
753+ return md5.hexdigest()
754+
755+
756+def recalculate_hashes(partition_dir, suffixes, reclaim_age=ONE_WEEK):
757+ """
758+ Recalculates hashes for the given suffixes in the partition and updates
759+ them in the partition's hashes file.
760+
761+ :param partition_dir: directory of the partition in which to recalculate
762+ :param suffixes: list of suffixes to recalculate
763+ :param reclaim_age: age in seconds at which tombstones should be removed
764+ """
765+
766+ def tpool_listdir(partition_dir):
767+ return dict(((suff, None) for suff in os.listdir(partition_dir)
768+ if len(suff) == 3 and isdir(join(partition_dir, suff))))
769+ hashes_file = join(partition_dir, HASH_FILE)
770+ with lock_path(partition_dir):
771+ try:
772+ with open(hashes_file, 'rb') as fp:
773+ hashes = pickle.load(fp)
774+ except Exception:
775+ hashes = tpool.execute(tpool_listdir, partition_dir)
776+ for suffix in suffixes:
777+ suffix_dir = join(partition_dir, suffix)
778+ if os.path.exists(suffix_dir):
779+ hashes[suffix] = hash_suffix(suffix_dir, reclaim_age)
780+ elif suffix in hashes:
781+ del hashes[suffix]
782+ with open(hashes_file + '.tmp', 'wb') as fp:
783+ pickle.dump(hashes, fp, PICKLE_PROTOCOL)
784+ renamer(hashes_file + '.tmp', hashes_file)
785+
786+
787+def invalidate_hash(suffix_dir):
788+ """
789+ Invalidates the hash for a suffix_dir in the partition's hashes file.
790+
791+ :param suffix_dir: absolute path to suffix dir whose hash needs
792+ invalidating
793+ """
794+
795+ suffix = os.path.basename(suffix_dir)
796+ partition_dir = os.path.dirname(suffix_dir)
797+ hashes_file = join(partition_dir, HASH_FILE)
798+ with lock_path(partition_dir):
799+ try:
800+ with open(hashes_file, 'rb') as fp:
801+ hashes = pickle.load(fp)
802+ if suffix in hashes and not hashes[suffix]:
803+ return
804+ except Exception:
805+ return
806+ hashes[suffix] = None
807+ with open(hashes_file + '.tmp', 'wb') as fp:
808+ pickle.dump(hashes, fp, PICKLE_PROTOCOL)
809+ renamer(hashes_file + '.tmp', hashes_file)
810+
811+
812+def get_hashes(partition_dir, do_listdir=True, reclaim_age=ONE_WEEK):
813+ """
814+ Get a list of hashes for the suffix dir. do_listdir causes it to mistrust
815+ the hash cache for suffix existence at the (unexpectedly high) cost of a
816+ listdir. reclaim_age is just passed on to hash_suffix.
817+
818+ :param partition_dir: absolute path of partition to get hashes for
819+ :param do_listdir: force existence check for all hashes in the partition
820+ :param reclaim_age: age at which to remove tombstones
821+
822+ :returns: tuple of (number of suffix dirs hashed, dictionary of hashes)
823+ """
824+
825+ def tpool_listdir(hashes, partition_dir):
826+ return dict(((suff, hashes.get(suff, None))
827+ for suff in os.listdir(partition_dir)
828+ if len(suff) == 3 and isdir(join(partition_dir, suff))))
829+ hashed = 0
830+ hashes_file = join(partition_dir, HASH_FILE)
831+ with lock_path(partition_dir):
832+ modified = False
833+ hashes = {}
834+ try:
835+ with open(hashes_file, 'rb') as fp:
836+ hashes = pickle.load(fp)
837+ except Exception:
838+ do_listdir = True
839+ if do_listdir:
840+ hashes = tpool.execute(tpool_listdir, hashes, partition_dir)
841+ modified = True
842+ for suffix, hash_ in hashes.items():
843+ if not hash_:
844+ suffix_dir = join(partition_dir, suffix)
845+ if os.path.exists(suffix_dir):
846+ try:
847+ hashes[suffix] = hash_suffix(suffix_dir, reclaim_age)
848+ hashed += 1
849+ except OSError:
850+ logging.exception('Error hashing suffix')
851+ hashes[suffix] = None
852+ else:
853+ del hashes[suffix]
854+ modified = True
855+ sleep()
856+ if modified:
857+ with open(hashes_file + '.tmp', 'wb') as fp:
858+ pickle.dump(hashes, fp, PICKLE_PROTOCOL)
859+ renamer(hashes_file + '.tmp', hashes_file)
860+ return hashed, hashes
861+
862+
863+class DiskFile(object):
864+ """
865+ Manage files on disk for a single object.
866+
867+ :param path: path to devices on the node
868+ :param device: device name
869+ :param partition: partition on the device the object lives in
870+ :param account: account name for the object
871+ :param container: container name for the object
872+ :param obj: object name for the object
873+ :param keep_data_fp: if True, don't close the fp, otherwise close it
874+ :param disk_chunk_size: size of chunks on file reads
875+ :param datadir: Sets which directory the root of the data structure is
876+ named (default: DATADIR)
877+ :param segment: If set to not None, indicates which segment of an object
878+ this file represents
879+ :param segment_timestamp: X-Timestamp of the object's segments (set on the
880+ PUT, not changed on POSTs), required if segment
881+ is set to not None
882+ """
883+
884+ def __init__(self, path, device, partition, account, container, obj,
885+ keep_data_fp=False, disk_chunk_size=65536, datadir=DATADIR,
886+ segment=None, segment_timestamp=None):
887+ self.account = account
888+ self.container = container
889+ self.obj = obj
890+ self.fp = None
891+ self.disk_chunk_size = disk_chunk_size
892+ self.name = '/' + '/'.join((account, container, obj))
893+ if segment is not None:
894+ segment_name = '%s/%s/%s' % (obj, segment_timestamp, segment)
895+ segment_hash = hash_path(account, container, segment_name)
896+ self.datadir = os.path.join(path, device,
897+ storage_directory(SEGMENTSDIR, partition, segment_hash))
898+ name_hash = hash_path(account, container, obj)
899+ self.no_longer_segment_datadir = os.path.join(path, device,
900+ storage_directory(datadir, partition, name_hash))
901+ else:
902+ name_hash = hash_path(account, container, obj)
903+ self.datadir = self.no_longer_segment_datadir = os.path.join(path,
904+ device, storage_directory(datadir, partition, name_hash))
905+ self.tmpdir = os.path.join(path, device, 'tmp')
906+ self.metadata = {}
907+ self.meta_file = None
908+ self.data_file = None
909+ if not os.path.exists(self.datadir):
910+ return
911+ files = sorted(os.listdir(self.datadir), reverse=True)
912+ for file in files:
913+ if file.endswith('.ts'):
914+ self.data_file = self.meta_file = None
915+ self.metadata = {'deleted': True}
916+ return
917+ if file.endswith('.meta') and not self.meta_file:
918+ self.meta_file = os.path.join(self.datadir, file)
919+ if file.endswith('.data') and not self.data_file:
920+ self.data_file = os.path.join(self.datadir, file)
921+ break
922+ if not self.data_file:
923+ return
924+ self.fp = open(self.data_file, 'rb')
925+ self.metadata = read_metadata(self.fp)
926+ if not keep_data_fp:
927+ self.close()
928+ if self.meta_file:
929+ with open(self.meta_file) as mfp:
930+ for key in self.metadata.keys():
931+ if key.lower() not in ('content-type', 'content-encoding',
932+ 'deleted', 'content-length', 'etag'):
933+ del self.metadata[key]
934+ self.metadata.update(read_metadata(mfp))
935+
936+ def __iter__(self):
937+ """Returns an iterator over the data file."""
938+ try:
939+ dropped_cache = 0
940+ read = 0
941+ while True:
942+ chunk = self.fp.read(self.disk_chunk_size)
943+ if chunk:
944+ read += len(chunk)
945+ if read - dropped_cache > (1024 * 1024):
946+ drop_buffer_cache(self.fp.fileno(), dropped_cache,
947+ read - dropped_cache)
948+ dropped_cache = read
949+ yield chunk
950+ else:
951+ drop_buffer_cache(self.fp.fileno(), dropped_cache,
952+ read - dropped_cache)
953+ break
954+ finally:
955+ self.close()
956+
957+ def app_iter_range(self, start, stop):
958+ """Returns an iterator over the data file for range (start, stop)"""
959+ if start:
960+ self.fp.seek(start)
961+ if stop is not None:
962+ length = stop - start
963+ else:
964+ length = None
965+ for chunk in self:
966+ if length is not None:
967+ length -= len(chunk)
968+ if length < 0:
969+ # Chop off the extra:
970+ yield chunk[:length]
971+ break
972+ yield chunk
973+
974+ def close(self):
975+ """Close the file."""
976+ if self.fp:
977+ self.fp.close()
978+ self.fp = None
979+
980+ def is_deleted(self):
981+ """
982+ Check if the file is deleted.
983+
984+ :returns: True if the file doesn't exist or has been flagged as
985+ deleted.
986+ """
987+ return not self.data_file or 'deleted' in self.metadata
988+
989+ @contextmanager
990+ def mkstemp(self):
991+ """Context manager to make a temporary file."""
992+ if not os.path.exists(self.tmpdir):
993+ mkdirs(self.tmpdir)
994+ fd, tmppath = mkstemp(dir=self.tmpdir)
995+ try:
996+ yield fd, tmppath
997+ finally:
998+ try:
999+ os.close(fd)
1000+ except OSError:
1001+ pass
1002+ try:
1003+ os.unlink(tmppath)
1004+ except OSError:
1005+ pass
1006+
1007+ def put(self, fd, tmppath, metadata, extension='.data',
1008+ no_longer_segment=False):
1009+ """
1010+ Finalizes writing the file to disk, renaming it from the temp file to
1011+ the real location. This should be called after the data has been
1012+ written to the temp file.
1013+
1014+ :param fd: file descriptor of the temp file
1015+ :param tmppath: path to the temporary file being used
1016+ :param metadata: dictionary of metadata to be written
1017+ :param extension: extension to be used when making the file
1018+ :param no_longer_segment: Set to True if this was originally an object
1019+ segment but no longer is (case with chunked transfer encoding when
1020+ the object ends up less than the segment size)
1021+ """
1022+ metadata['name'] = self.name
1023+ timestamp = normalize_timestamp(metadata['X-Timestamp'])
1024+ metastr = pickle.dumps(metadata, PICKLE_PROTOCOL)
1025+ key = 0
1026+ while metastr:
1027+ setxattr(fd, '%s%s' % (METADATA_KEY, key or ''), metastr[:254])
1028+ metastr = metastr[254:]
1029+ key += 1
1030+ if 'Content-Length' in metadata:
1031+ drop_buffer_cache(fd, 0, int(metadata['Content-Length']))
1032+ os.fsync(fd)
1033+ if no_longer_segment:
1034+ self.datadir = self.no_longer_segment_datadir
1035+ invalidate_hash(os.path.dirname(self.datadir))
1036+ renamer(tmppath, os.path.join(self.datadir, timestamp + extension))
1037+ self.metadata = metadata
1038+
1039+ def unlinkold(self, timestamp):
1040+ """
1041+ Remove any older versions of the object file. Any file that has an
1042+ older timestamp than timestamp will be deleted.
1043+
1044+ :param timestamp: timestamp to compare with each file
1045+ """
1046+ timestamp = normalize_timestamp(timestamp)
1047+ for fname in os.listdir(self.datadir):
1048+ if fname < timestamp:
1049+ try:
1050+ os.unlink(os.path.join(self.datadir, fname))
1051+ except OSError, err: # pragma: no cover
1052+ if err.errno != errno.ENOENT:
1053+ raise
1054+
1055+ def tombstone(self, timestamp):
1056+ """
1057+ Creates a tombstone for the DiskFile, indicating any versions older
1058+ than `timestamp` should be removed.
1059+
1060+ :param timestamp: normalized timestamp of the tombstone
1061+ """
1062+ with self.mkstemp() as (fd, tmppath):
1063+ self.put(fd, tmppath, {'X-Timestamp': timestamp, 'deleted': True},
1064+ extension='.ts')
1065+ self.unlinkold(timestamp)
1066+
1067+ def store_janitor_container_update(self, op, account, container, obj,
1068+ headers, successes):
1069+ """
1070+ Creates a .data file whose contents contain an operation for the
1071+ object-janitor to send object metadata to the container servers.
1072+
1073+ :param op: The operation to send to the container server (usually PUT
1074+ or DELETE).
1075+ :param account: The account name for the object.
1076+ :param container: The container name for the object.
1077+ :param obj: The object name for the object.
1078+ :param headers: The headers to include in the requests to the container
1079+ servers. Should at least contain X-Timestamp indicating
1080+ the version of the object metadata.
1081+ :param successes: An array of container node ids that have already
1082+ received the object metadata update.
1083+ """
1084+ timestamp = normalize_timestamp(time())
1085+ with self.mkstemp() as (fd, tmppath):
1086+ os.write(fd, pickle.dumps({'op': op, 'account': account,
1087+ 'container': container, 'obj': obj, 'headers': headers,
1088+ 'successes': successes}, PICKLE_PROTOCOL))
1089+ self.put(fd, tmppath,
1090+ {'X-Op': 'Container-Update', 'X-Timestamp': timestamp})
1091+ self.unlinkold(timestamp)
1092+
1093+ def store_janitor_segment_cleanup(self, account, container, obj,
1094+ segment_count, segment_last_deleted):
1095+ """
1096+ Creates a .data file whose contents contain an operation for the
1097+ object-janitor to clean up object segments for an object.
1098+
1099+ Note that the DiskFile created for this operation is a bit different
1100+ than most DiskFiles. The DiskFile account name should be
1101+ 'Segment-Cleanup', the container name should be the segments'
1102+ timestamp, and the object name should be the full account/container/obj
1103+ path (no leading /). However, the account, container, obj given in this
1104+ `store_janitor_segment_cleanup` call should be the usual as related to
1105+ the actual object. This complexity is so that each set of segments are
1106+ treated independently.
1107+
1108+ For example::
1109+
1110+ df = DiskFile(devices, device, partition,
1111+ 'Segment-Cleanup', manifest['x-timestamp'],
1112+ '%s/%s/%s' % (account, container, obj), datadir=JANITORDIR)
1113+ df.store_janitor_segment_cleanup(account, container, obj, None,
1114+ None)
1115+
1116+ :param account: The account name for the object.
1117+ :param container: The container name for the object.
1118+ :param obj: The object name for the object.
1119+ :param segment_count: The number of segments the object has or None if
1120+ the number is not known.
1121+ :param segment_last_deleted: The segment that was last deleted so the
1122+ next pass of this operation can continue
1123+ where it left off. Set to None if no
1124+ segments have been deleted yet.
1125+ """
1126+ timestamp = normalize_timestamp(time())
1127+ with self.mkstemp() as (fd, tmppath):
1128+ os.write(fd, pickle.dumps({'account': account,
1129+ 'container': container, 'obj': obj,
1130+ 'segment_count': segment_count,
1131+ 'segment_last_deleted': segment_last_deleted},
1132+ PICKLE_PROTOCOL))
1133+ self.put(fd, tmppath,
1134+ {'X-Op': 'Segment-Cleanup', 'X-Timestamp': timestamp})
1135+ self.unlinkold(timestamp)
1136
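
Putting the DiskFile API above together, the basic write path looks roughly
like this (a sketch of how a caller such as the object server would use it;
the device, partition, and names are made-up values):

    import os
    from time import time

    from swift.common.utils import normalize_timestamp
    from swift.obj.diskfile import DiskFile

    body = 'example object body'
    df = DiskFile('/srv/node', 'sdb1', '154944', 'AUTH_test', 'photos',
                  'puppy.jpg')
    with df.mkstemp() as (fd, tmppath):
        os.write(fd, body)
        metadata = {'X-Timestamp': normalize_timestamp(time()),
                    'Content-Length': str(len(body))}
        df.put(fd, tmppath, metadata)
    df.unlinkold(metadata['X-Timestamp'])
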
1137=== added file 'swift/obj/janitor.py'
1138--- swift/obj/janitor.py 1970-01-01 00:00:00 +0000
1139+++ swift/obj/janitor.py 2010-11-08 18:51:48 +0000
1140@@ -0,0 +1,516 @@
1141+# Copyright (c) 2010 OpenStack, LLC.
1142+#
1143+# Licensed under the Apache License, Version 2.0 (the "License");
1144+# you may not use this file except in compliance with the License.
1145+# You may obtain a copy of the License at
1146+#
1147+# http://www.apache.org/licenses/LICENSE-2.0
1148+#
1149+# Unless required by applicable law or agreed to in writing, software
1150+# distributed under the License is distributed on an "AS IS" BASIS,
1151+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
1152+# implied.
1153+# See the License for the specific language governing permissions and
1154+# limitations under the License.
1155+
1156+from __future__ import with_statement
1157+import cPickle as pickle
1158+import os
1159+import signal
1160+import sys
1161+from random import random
1162+from time import sleep, time
1163+
1164+from eventlet import patcher, Timeout
1165+
1166+from swift.common.bufferedhttp import http_connect
1167+from swift.common.exceptions import ConnectionTimeout
1168+from swift.common.ring import Ring
1169+from swift.common.utils import get_logger, normalize_timestamp, whataremyips
1170+from swift.common.daemon import Daemon
1171+from swift.obj.diskfile import DiskFile, JANITORDIR, read_metadata
1172+
1173+
1174+class ObjectJanitor(Daemon):
1175+ """
1176+ Run background operations for the object server, such as updating container
1177+ servers with new object metadata and cleaning up discarded segmented
1178+ objects.
1179+ """
1180+
1181+ def __init__(self, conf):
1182+ self.conf = conf
1183+ self.logger = get_logger(conf, 'object-janitor')
1184+ self.devices = conf.get('devices', '/srv/node')
1185+ self.port = int(conf.get('bind_port', 6000))
1186+ self.my_node_ids = []
1187+ self.mount_check = conf.get('mount_check', 'true').lower() in \
1188+ ('true', 't', '1', 'on', 'yes', 'y')
1189+ swift_dir = conf.get('swift_dir', '/etc/swift')
1190+ self.interval = int(conf.get('interval', 300))
1191+ self.object_ring_path = os.path.join(swift_dir, 'object.ring.gz')
1192+ self.object_ring = None
1193+ self.container_ring_path = os.path.join(swift_dir, 'container.ring.gz')
1194+ self.container_ring = None
1195+ self.concurrency = int(conf.get('concurrency', 1))
1196+ self.slowdown = float(conf.get('slowdown', 0.01))
1197+ self.node_timeout = int(conf.get('node_timeout', 10))
1198+ self.conn_timeout = float(conf.get('conn_timeout', 0.5))
1199+ self.segment_reclaim_age = int(conf.get('segment_reclaim_age', 604800))
1200+ self.segments_per_pass = int(conf.get('segments_per_pass', 10))
1201+ self.container_update_successes = 0
1202+ self.container_update_failures = 0
1203+ self.segment_cleanup_completions = 0
1204+ self.segment_cleanup_segments = 0
1205+ self.segment_cleanup_failures = 0
1206+
1207+ def get_object_ring(self):
1208+ """Get the object ring. Load it, if it hasn't been yet."""
1209+ if not self.object_ring:
1210+ self.logger.debug(
1211+ 'Loading object ring from %s' % self.object_ring_path)
1212+ self.object_ring = Ring(self.object_ring_path)
1213+ return self.object_ring
1214+
1215+ def get_container_ring(self):
1216+ """Get the container ring. Load it, if it hasn't been yet."""
1217+ if not self.container_ring:
1218+ self.logger.debug(
1219+ 'Loading container ring from %s' % self.container_ring_path)
1220+ self.container_ring = Ring(self.container_ring_path)
1221+ return self.container_ring
1222+
1223+ def run_forever(self):
1224+ """Run the janitor continuously."""
1225+ sleep(random() * self.interval)
1226+ while True:
1227+ begin = time()
1228+ self.full_sweep()
1229+ elapsed = time() - begin
1230+ if elapsed < self.interval:
1231+ sleep(self.interval - elapsed)
1232+
1233+ def run_once(self):
1234+ """Run the janitor once."""
1235+ self.full_sweep(fork=False)
1236+
1237+ def full_sweep(self, fork=True):
1238+ """
1239+ Run a full sweep of the server for object janitor operations.
1240+
1241+ :param fork: If True, subprocesses will be forked for each device up to
1242+ self.concurrency at any given time.
1243+ """
1244+ self.logger.info('Begin object janitor sweep')
1245+ begin = time()
1246+ my_ips = whataremyips()
1247+ self.my_node_ids = [n['id'] for n in self.get_object_ring().devs
1248+ if n and n['ip'] in my_ips and n['port'] == self.port]
1249+ pids = []
1250+ for device in os.listdir(self.devices):
1251+ if self.mount_check and not \
1252+ os.path.ismount(os.path.join(self.devices, device)):
1253+ self.logger.warn('Skipping %s as it is not mounted' % device)
1254+ continue
1255+ if fork:
1256+ while len(pids) >= self.concurrency:
1257+ pids.remove(os.wait()[0])
1258+ # read from rings to ensure they're fresh
1259+ self.get_object_ring().get_nodes('')
1260+ self.get_container_ring().get_nodes('')
1261+ if fork:
1262+ pid = os.fork()
1263+ if pid:
1264+ pids.append(pid)
1265+ else:
1266+ signal.signal(signal.SIGTERM, signal.SIG_DFL)
1267+ patcher.monkey_patch(all=False, socket=True)
1268+ self.object_sweep(os.path.join(self.devices, device))
1269+ sys.exit()
1270+ else:
1271+ self.object_sweep(os.path.join(self.devices, device))
1272+ if fork:
1273+ while pids:
1274+ pids.remove(os.wait()[0])
1275+ elapsed = time() - begin
1276+ self.logger.info('Object janitor sweep completed: %.02fs' % elapsed)
1277+
1278+ def object_sweep(self, device):
1279+ """
1280+ If there are pending janitor operations on the device, process each one.
1281+
1282+ :param device: path to device
1283+ """
1284+ self.container_update_successes = 0
1285+ self.container_update_failures = 0
1286+ self.segment_cleanup_completions = 0
1287+ self.segment_cleanup_segments = 0
1288+ self.segment_cleanup_failures = 0
1289+ begin = time()
1290+ janitordir = os.path.join(device, JANITORDIR)
1291+ try:
1292+ if not os.path.isdir(janitordir):
1293+ return
1294+ for partition in os.listdir(janitordir):
1295+ partition_path = os.path.join(janitordir, partition)
1296+ for suffix in os.listdir(partition_path):
1297+ suffix_path = os.path.join(partition_path, suffix)
1298+ for janitor in os.listdir(suffix_path):
1299+ janitor_path = os.path.join(suffix_path, janitor)
1300+ self.process_object_janitor(device, partition,
1301+ janitor_path)
1302+ sleep(self.slowdown)
1303+ finally:
1304+ elapsed = time() - begin
1305+ self.logger.info('Object janitor sweep of %s completed in %.02fs: '
1306+ 'container updates: %s successes, %s failures; segment '
1307+ 'cleanups: %s completions, %s segments, %s failures' %
1308+ (device, elapsed,
1309+ self.container_update_successes,
1310+ self.container_update_failures,
1311+ self.segment_cleanup_completions,
1312+ self.segment_cleanup_segments,
1313+ self.segment_cleanup_failures))
1314+
1315+ def process_object_janitor(self, device, partition, janitor_path):
1316+ """
1317+ Process the object janitor operation.
1318+
1319+ :param device: path to device
1320+ :param partition: partition for the object
1321+ :param janitor_path: path to DiskFile for the janitor operation
1322+ """
1323+ data_file = None
1324+ files = sorted(os.listdir(janitor_path), reverse=True)
1325+ for file_ in files:
1326+ if file_.endswith('.ts'):
1327+ break
1328+ if file_.endswith('.data'):
1329+ data_file = os.path.join(janitor_path, file_)
1330+ break
1331+ if not data_file:
1332+ return
1333+ metadata = read_metadata(data_file)
1334+ _, account, container, obj = metadata['name'].split('/', 3)
1335+ if metadata['X-Op'] == 'Container-Update':
1336+ self.process_container_update(device, partition, account,
1337+ container, obj)
1338+ elif metadata['X-Op'] == 'Segment-Cleanup':
1339+ if self.get_object_ring().get_part_nodes(int(partition))[0]['id'] \
1340+ in self.my_node_ids:
1341+ self.process_segment_cleanup(device, partition, account,
1342+ container, obj)
1343+ else:
1344+ self.logger.error('ERROR: Unknown X-Op: %s' % metadata['X-Op'])
1345+ return
1346+
1347+ def process_container_update(self, device, partition, account, container,
1348+ obj):
1349+ """
1350+ Process the container update operation, sending the object information
1351+ to the container server.
1352+
1353+ :param device: path to device
1354+ :param partition: partition for the object
1355+ :param account: account for the object
1356+ :param container: container for the object
1357+ :param obj: name for the object
1358+ """
1359+ disk_file = DiskFile(self.devices, device, partition, account,
1360+ container, obj, keep_data_fp=True, datadir=JANITORDIR)
1361+ if disk_file.is_deleted():
1362+ return
1363+ update = pickle.loads(''.join(iter(disk_file)))
1364+ op = update['op']
1365+ account = update['account']
1366+ container = update['container']
1367+ obj = update['obj']
1368+ headers = update['headers']
1369+ successes = update.get('successes', [])
1370+ part, nodes = self.get_container_ring().get_nodes(account, container)
1371+ path = '/%s/%s/%s' % (account, container, obj)
1372+ success = True
1373+ for node in nodes:
1374+ if node['id'] not in successes:
1375+ status = self.container_update(node, part, op, path, headers)
1376+ if not (200 <= status < 300) and status != 404:
1377+ success = False
1378+ else:
1379+ successes.append(node['id'])
1380+ if success:
1381+ self.container_update_successes += 1
1382+ self.logger.debug('Update sent for %s %s' %
1383+ (path, disk_file.datadir))
1384+ disk_file.tombstone(normalize_timestamp(time()))
1385+ else:
1386+ self.container_update_failures += 1
1387+ self.logger.debug('Update failed for %s %s' %
1388+ (path, disk_file.datadir))
1389+ if len(update.get('successes', [])) != len(successes):
1390+ disk_file.store_janitor_container_update(op, account,
1391+ container, obj, headers, successes)
1392+
1393+ def container_update(self, node, part, op, obj, headers):
1394+ """
1395+ Perform the actual container update network operation, sending the
1396+ object information to the container server.
1397+
1398+ :param node: node dictionary from the container ring
1399+ :param part: partition that holds the container
1400+ :param op: operation performed (ex: 'POST' or 'DELETE')
1401+ :param obj: object name being updated
1402+ :param headers: headers to send with the update
1403+ """
1404+ try:
1405+ with ConnectionTimeout(self.conn_timeout):
1406+ conn = http_connect(node['ip'], node['port'], node['device'],
1407+ part, op, obj, headers)
1408+ with Timeout(self.node_timeout):
1409+ resp = conn.getresponse()
1410+ resp.read()
1411+ return resp.status
1412+ except:
1413+ self.logger.exception('ERROR with remote server '
1414+ '%(ip)s:%(port)s/%(device)s' % node)
1415+ return 500
1416+
1417+ def process_segment_cleanup(self, device, partition, account, container,
1418+ obj):
1419+ """
1420+ Process the segment cleanup operation, checking to see if the
1421+ operation should have completed long ago but still has no manifest and
1422+ therefore should have the orphaned segments removed.
1423+
1424+ :param device: path to device
1425+ :param partition: partition for the operation
1426+ :param account: account for the operation (should always be
1427+ "Segment-Cleanup")
1428+ :param container: container for the operation (actually the timestamp
1429+ of the original PUT)
1430+ :param obj: name for the object (actually the full path
1431+ account/container/object for the original object PUT)
1432+ """
1433+ disk_file = DiskFile(self.devices, device, partition, account,
1434+ container, obj, keep_data_fp=True, datadir=JANITORDIR)
1435+ if disk_file.is_deleted():
1436+ return
1437+ disk_file_data = pickle.loads(''.join(iter(disk_file)))
1438+ segment_timestamp = container
1439+ # If not None, indicates we have started cleaning up these segments
1440+ # already and should just continue the operation.
1441+ segment_last_deleted = disk_file_data.get('segment_last_deleted', None)
1442+ # Sometimes we'll have no clue how many segments there are, which is
1443+ # what None means.
1444+ segment_count = disk_file_data.get('segment_count', None)
1445+ account = disk_file_data['account']
1446+ container = disk_file_data['container']
1447+ obj = disk_file_data['obj']
1448+ path = '/%s/%s/%s' % (account, container, obj)
1449+ if segment_last_deleted is not None:
1450+ self.logger.debug('Continue cleaning up segments for %s/%s/%s %s '
1451+ '@%s' % (device, partition, path, segment_timestamp,
1452+ segment_last_deleted))
1453+ self.cleanup_segments(disk_file, account, container, obj,
1454+ segment_timestamp, segment_count, segment_last_deleted)
1455+ return
1456+ part, nodes = self.get_object_ring().get_nodes(account, container, obj)
1457+ newest_object = None
1458+ newest_manifest = None
1459+ responses = 0
1460+ for node in nodes:
1461+ try:
1462+ with ConnectionTimeout(self.conn_timeout):
1463+ conn = http_connect(node['ip'], node['port'],
1464+ node['device'], part, 'GET', path)
1465+ with Timeout(self.node_timeout):
1466+ resp = conn.getresponse()
1467+ responses += 1
1468+ if resp.status // 100 != 2:
1469+ conn.close()
1470+ continue
1471+ if resp.getheader('x-object-type', '') != 'manifest':
1472+ if resp.getheader('x-timestamp') > newest_object:
1473+ newest_object = resp.getheader('x-timestamp')
1474+ conn.close()
1475+ if newest_object > segment_timestamp:
1476+ # We don't have to talk to the other nodes if we
1477+ # already know we have a newer object.
1478+ break
1479+ continue
1480+ with Timeout(self.node_timeout):
1481+ body = resp.read()
1482+ manifest = pickle.loads(body)
1483+ if manifest['x-timestamp'] > newest_manifest:
1484+ newest_manifest = manifest['x-timestamp']
1485+ conn.close()
1486+ if newest_manifest > segment_timestamp:
1487+ # We don't have to talk to the other nodes if we already
1488+ # know we have a newer manifest.
1489+ break
1490+ except:
1491+ self.logger.exception('ERROR with remote server '
1492+ '%(ip)s:%(port)s/%(device)s' % node)
1493+ if newest_object > segment_timestamp or \
1494+ newest_manifest > segment_timestamp:
1495+ self.logger.debug('Newer object/manifest; discarding old '
1496+ 'segments for %s/%s/%s %s' % (device, partition, path,
1497+ segment_timestamp))
1498+ self.cleanup_segments(disk_file, account, container, obj,
1499+ segment_timestamp, segment_count, segment_last_deleted)
1500+ elif newest_manifest == segment_timestamp:
1501+ self.logger.debug('Exact manifest confirmed; discarding janitor '
1502+ 'cleanup operation for %s/%s/%s %s' % (device, partition,
1503+ path, segment_timestamp))
1504+ disk_file.tombstone(normalize_timestamp(time()))
1505+ elif responses == len(nodes) and \
1506+ time() - float(segment_timestamp) > \
1507+ self.segment_reclaim_age:
1508+ self.logger.debug('All nodes agree the manifest still does not '
1509+ 'exist after %ss; discarding orphaned segments for '
1510+ '%s/%s/%s %s' % (self.segment_reclaim_age, device,
1511+ partition, path, segment_timestamp))
1512+ self.cleanup_segments(disk_file, account, container, obj,
1513+ segment_timestamp, segment_count, segment_last_deleted)
1514+
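Condensed, the checks above give four outcomes (a hedged restatement, not new behavior):

    # newer object or manifest found -> segments were orphaned by a
    #                                   later write: clean them up
    # exact manifest found           -> the PUT completed: tombstone
    #                                   this janitor operation
    # all nodes answered, no manifest, and the PUT is older than
    # segment_reclaim_age            -> segments presumed orphaned:
    #                                   clean them up
    # anything else                  -> leave the operation for a
    #                                   later pass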
1515+ def cleanup_segments(self, disk_file, account, container, obj,
1516+ segment_timestamp, segment_count, segment_last_deleted):
1517+ """
1518+ Issues DELETEs to the segments of the object, up to
1519+        self.segments_per_pass. If not all segments are deleted before this
1520+        function returns, it updates the disk_file with the last point
1521+        completed.
1522+
1523+ :param disk_file: DiskFile for the Segment Cleanup operation.
1524+ :param account: Account name for the manifest object to delete segments
1525+ for.
1526+ :param container: Container name for the manifest object to delete
1527+ segments for.
1528+ :param obj: Object name for the manifest object to delete segments for.
1529+ :param segment_timestamp: The timestamp for the segments themselves.
1530+ :param segment_count: The number of segments for the object; may be
1531+ None if not yet known and this function should
1532+ try to determine the count.
1533+ :param segment_last_deleted: The segment number that was last deleted;
1534+                                     None if none have yet been deleted. This
1535+ allows continuation of previous deletion
1536+ attempts.
1537+ """
1538+ path = '/%s/%s/%s' % (account, container, obj)
1539+ if segment_last_deleted is None:
1540+ segment_last_deleted = -1
1541+ if segment_count is None:
1542+ # We need to determine how many segments there are so that if we
1543+ # crash this run, next run we'll know what range we'll need to work
1544+ # within. Example: Determine we have 10 segments; delete the first
1545+ # five and crash; next run we now know we need to delete no more
1546+ # than 10 segments. With the approach of just deleting segments
1547+ # until a 404 and not predetermining the count, with such a crash
1548+ # we'd never delete the last five segments.
1549+ segment_count = 0
1550+ assumptions = 0
1551+ while True:
1552+ if segment_count:
1553+ ring_obj = \
1554+ '%s/%s/%s' % (obj, segment_timestamp, segment_count)
1555+ else:
1556+ ring_obj = obj
1557+ part, nodes = self.get_object_ring().get_nodes(account,
1558+ container, ring_obj)
1559+ headers = {'X-Object-Segment': str(segment_count),
1560+ 'X-Object-Segment-Timestamp': segment_timestamp}
1561+ status = 0
1562+ for node in nodes:
1563+ try:
1564+ with ConnectionTimeout(self.conn_timeout):
1565+ conn = http_connect(node['ip'], node['port'],
1566+ node['device'], part, 'HEAD', path,
1567+ headers=headers)
1568+ with Timeout(self.node_timeout):
1569+ resp = conn.getresponse()
1570+ resp.read()
1571+ if resp.status == 404:
1572+ status = 404
1573+ elif resp.status // 100 == 2:
1574+ status = 200
1575+ break
1576+ except:
1577+ self.logger.exception('ERROR with remote server '
1578+ '%(ip)s:%(port)s/%(device)s' % node)
1579+ if status == 404:
1580+ break
1581+ segment_count += 1
1582+ if not status:
1583+ # We couldn't determine if a segment existed, so we'll just
1584+ # assume it did and move to the next one; but we'll only do
1585+                # this for five consecutive segments before giving up
1586+                # entirely and leaving the rest for the next run.
1587+ assumptions += 1
1588+ if assumptions > 5:
1589+ self.segment_cleanup_failures += 1
1590+ return
1591+ assumptions = 0
1592+ disk_file.store_janitor_segment_cleanup(account, container, obj,
1593+ segment_count, segment_last_deleted)
1594+ starting_segment_last_deleted = segment_last_deleted
1595+ try:
1596+ segments_this_pass = self.segments_per_pass
1597+ while True:
1598+ segment = segment_last_deleted + 1
1599+ if segment:
1600+ ring_obj = '%s/%s/%s' % (obj, segment_timestamp, segment)
1601+ else:
1602+ ring_obj = obj
1603+ part, nodes = self.get_object_ring().get_nodes(account,
1604+ container, ring_obj)
1605+ headers = {'X-Object-Segment': str(segment),
1606+ 'X-Object-Segment-Timestamp': segment_timestamp,
1607+ 'X-Timestamp': normalize_timestamp(time())}
1608+ not_found_count = 0
1609+ success = False
1610+ for node in nodes:
1611+ try:
1612+ with ConnectionTimeout(self.conn_timeout):
1613+ conn = http_connect(node['ip'], node['port'],
1614+ node['device'], part, 'DELETE', path,
1615+ headers=headers)
1616+ with Timeout(self.node_timeout):
1617+ resp = conn.getresponse()
1618+ resp.read()
1619+ if resp.status == 404:
1620+ not_found_count += 1
1621+ elif resp.status // 100 == 2:
1622+                            # For the sake of this cleanup operation, even
1623+                            # one successful DELETE counts as a complete
1624+                            # success.
1625+ success = True
1626+ except:
1627+ self.logger.exception('ERROR with remote server '
1628+ '%(ip)s:%(port)s/%(device)s' % node)
1629+ if not_found_count == len(nodes):
1630+ success = True
1631+ if not success:
1632+ # If we didn't even have one success, we'll have to leave
1633+ # the janitor operation to retry later.
1634+ disk_file.store_janitor_segment_cleanup(account, container,
1635+ obj, segment_count, segment_last_deleted)
1636+ self.segment_cleanup_failures += 1
1637+ return
1638+ segment_last_deleted = segment
1639+ self.segment_cleanup_segments += 1
1640+ if segment >= segment_count - 1:
1641+ disk_file.tombstone(normalize_timestamp(time()))
1642+ self.segment_cleanup_completions += 1
1643+ return
1644+ segments_this_pass -= 1
1645+ if segments_this_pass <= 0:
1646+ disk_file.store_janitor_segment_cleanup(account, container,
1647+ obj, segment_count, segment_last_deleted)
1648+ return
1649+ except:
1650+ # If we get an unexpected exception, log it and try to update the
1651+ # disk file to indicate the segment last deleted.
1652+ self.logger.exception('ERROR unexpected exception with %s:' % path)
1653+ self.segment_cleanup_failures += 1
1654+ if segment_last_deleted != starting_segment_last_deleted:
1655+ disk_file.store_janitor_segment_cleanup(account, container,
1656+ obj, segment_count, segment_last_deleted)
1657
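A compact sketch of the count-pinning step in cleanup_segments, with a hypothetical head_segment() standing in for the per-node HEAD loop above. The count is persisted before any DELETE is issued, so a crash mid-pass still leaves a known upper bound to resume against.

    def pin_segment_count(disk_file, head_segment, account, container, obj):
        # head_segment(n) is a stand-in returning an HTTP status;
        # 404 means segment n does not exist.
        segment_count = 0
        while head_segment(segment_count) != 404:
            segment_count += 1
        # Persist before deleting anything so a crash leaves an upper bound.
        disk_file.store_janitor_segment_cleanup(account, container, obj,
                                                segment_count, None)
        return segment_count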
1658=== modified file 'swift/obj/replicator.py'
1659--- swift/obj/replicator.py 2010-10-19 01:05:54 +0000
1660+++ swift/obj/replicator.py 2010-11-08 18:51:48 +0000
1661@@ -19,7 +19,6 @@
1662 import shutil
1663 import time
1664 import logging
1665-import hashlib
1666 import itertools
1667 import cPickle as pickle
1668
1669@@ -29,168 +28,16 @@
1670 from eventlet.support.greenlets import GreenletExit
1671
1672 from swift.common.ring import Ring
1673-from swift.common.utils import whataremyips, unlink_older_than, lock_path, \
1674- renamer, compute_eta, get_logger
1675+from swift.common.utils import compute_eta, get_logger, unlink_older_than, \
1676+ whataremyips
1677 from swift.common.bufferedhttp import http_connect
1678 from swift.common.daemon import Daemon
1679+from swift.obj.diskfile import DATADIR, get_hashes, JANITORDIR, \
1680+ recalculate_hashes, SEGMENTSDIR
1681+
1682
1683 hubs.use_hub('poll')
1684
1685-PICKLE_PROTOCOL = 2
1686-ONE_WEEK = 604800
1687-HASH_FILE = 'hashes.pkl'
1688-
1689-
1690-def hash_suffix(path, reclaim_age):
1691- """
1692- Performs reclamation and returns an md5 of all (remaining) files.
1693-
1694- :param reclaim_age: age in seconds at which to remove tombstones
1695- """
1696- md5 = hashlib.md5()
1697- for hsh in sorted(os.listdir(path)):
1698- hsh_path = join(path, hsh)
1699- files = os.listdir(hsh_path)
1700- if len(files) == 1:
1701- if files[0].endswith('.ts'):
1702- # remove tombstones older than reclaim_age
1703- ts = files[0].rsplit('.', 1)[0]
1704- if (time.time() - float(ts)) > reclaim_age:
1705- os.unlink(join(hsh_path, files[0]))
1706- files.remove(files[0])
1707- elif files:
1708- files.sort(reverse=True)
1709- meta = data = tomb = None
1710- for filename in files:
1711- if not meta and filename.endswith('.meta'):
1712- meta = filename
1713- if not data and filename.endswith('.data'):
1714- data = filename
1715- if not tomb and filename.endswith('.ts'):
1716- tomb = filename
1717- if (filename < tomb or # any file older than tomb
1718- filename < data or # any file older than data
1719- (filename.endswith('.meta') and
1720- filename < meta)): # old meta
1721- os.unlink(join(hsh_path, filename))
1722- files.remove(filename)
1723- if not files:
1724- os.rmdir(hsh_path)
1725- for filename in files:
1726- md5.update(filename)
1727- try:
1728- os.rmdir(path)
1729- except OSError:
1730- pass
1731- return md5.hexdigest()
1732-
1733-
1734-def recalculate_hashes(partition_dir, suffixes, reclaim_age=ONE_WEEK):
1735- """
1736- Recalculates hashes for the given suffixes in the partition and updates
1737- them in the partition's hashes file.
1738-
1739- :param partition_dir: directory of the partition in which to recalculate
1740- :param suffixes: list of suffixes to recalculate
1741- :param reclaim_age: age in seconds at which tombstones should be removed
1742- """
1743-
1744- def tpool_listdir(partition_dir):
1745- return dict(((suff, None) for suff in os.listdir(partition_dir)
1746- if len(suff) == 3 and isdir(join(partition_dir, suff))))
1747- hashes_file = join(partition_dir, HASH_FILE)
1748- with lock_path(partition_dir):
1749- try:
1750- with open(hashes_file, 'rb') as fp:
1751- hashes = pickle.load(fp)
1752- except Exception:
1753- hashes = tpool.execute(tpool_listdir, partition_dir)
1754- for suffix in suffixes:
1755- suffix_dir = join(partition_dir, suffix)
1756- if os.path.exists(suffix_dir):
1757- hashes[suffix] = hash_suffix(suffix_dir, reclaim_age)
1758- elif suffix in hashes:
1759- del hashes[suffix]
1760- with open(hashes_file + '.tmp', 'wb') as fp:
1761- pickle.dump(hashes, fp, PICKLE_PROTOCOL)
1762- renamer(hashes_file + '.tmp', hashes_file)
1763-
1764-
1765-def invalidate_hash(suffix_dir):
1766- """
1767- Invalidates the hash for a suffix_dir in the partition's hashes file.
1768-
1769- :param suffix_dir: absolute path to suffix dir whose hash needs
1770- invalidating
1771- """
1772-
1773- suffix = os.path.basename(suffix_dir)
1774- partition_dir = os.path.dirname(suffix_dir)
1775- hashes_file = join(partition_dir, HASH_FILE)
1776- with lock_path(partition_dir):
1777- try:
1778- with open(hashes_file, 'rb') as fp:
1779- hashes = pickle.load(fp)
1780- if suffix in hashes and not hashes[suffix]:
1781- return
1782- except Exception:
1783- return
1784- hashes[suffix] = None
1785- with open(hashes_file + '.tmp', 'wb') as fp:
1786- pickle.dump(hashes, fp, PICKLE_PROTOCOL)
1787- renamer(hashes_file + '.tmp', hashes_file)
1788-
1789-
1790-def get_hashes(partition_dir, do_listdir=True, reclaim_age=ONE_WEEK):
1791- """
1792- Get a list of hashes for the suffix dir. do_listdir causes it to mistrust
1793- the hash cache for suffix existence at the (unexpectedly high) cost of a
1794- listdir. reclaim_age is just passed on to hash_suffix.
1795-
1796- :param partition_dir: absolute path of partition to get hashes for
1797- :param do_listdir: force existence check for all hashes in the partition
1798- :param reclaim_age: age at which to remove tombstones
1799-
1800- :returns: tuple of (number of suffix dirs hashed, dictionary of hashes)
1801- """
1802-
1803- def tpool_listdir(hashes, partition_dir):
1804- return dict(((suff, hashes.get(suff, None))
1805- for suff in os.listdir(partition_dir)
1806- if len(suff) == 3 and isdir(join(partition_dir, suff))))
1807- hashed = 0
1808- hashes_file = join(partition_dir, HASH_FILE)
1809- with lock_path(partition_dir):
1810- modified = False
1811- hashes = {}
1812- try:
1813- with open(hashes_file, 'rb') as fp:
1814- hashes = pickle.load(fp)
1815- except Exception:
1816- do_listdir = True
1817- if do_listdir:
1818- hashes = tpool.execute(tpool_listdir, hashes, partition_dir)
1819- modified = True
1820- for suffix, hash_ in hashes.items():
1821- if not hash_:
1822- suffix_dir = join(partition_dir, suffix)
1823- if os.path.exists(suffix_dir):
1824- try:
1825- hashes[suffix] = hash_suffix(suffix_dir, reclaim_age)
1826- hashed += 1
1827- except OSError:
1828- logging.exception('Error hashing suffix')
1829- hashes[suffix] = None
1830- else:
1831- del hashes[suffix]
1832- modified = True
1833- sleep()
1834- if modified:
1835- with open(hashes_file + '.tmp', 'wb') as fp:
1836- pickle.dump(hashes, fp, PICKLE_PROTOCOL)
1837- renamer(hashes_file + '.tmp', hashes_file)
1838- return hashed, hashes
1839-
1840
1841 class ObjectReplicator(Daemon):
1842 """
1843@@ -302,7 +149,7 @@
1844 if not had_any:
1845 return False
1846 args.append(join(rsync_module, node['device'],
1847- 'objects', job['partition']))
1848+ job.get('datadir', DATADIR), job['partition']))
1849 return self._rsync(args) == 0
1850
1851 def check_ring(self):
1852@@ -337,12 +184,14 @@
1853 for node in job['nodes']:
1854 success = self.rsync(node, job, suffixes)
1855 if success:
1856+ headers = {'Content-Length': '0',
1857+ 'X-Data-Dir': job.get('datadir', DATADIR)}
1858 with Timeout(self.http_timeout):
1859 http_connect(node['ip'],
1860 node['port'],
1861 node['device'], job['partition'], 'REPLICATE',
1862 '/' + '-'.join(suffixes),
1863- headers={'Content-Length': '0'}).getresponse().read()
1864+ headers=headers).getresponse().read()
1865 responses.append(success)
1866 if not suffixes or (len(responses) == \
1867 self.object_ring.replica_count and all(responses)):
1868@@ -374,10 +223,12 @@
1869 node = next(nodes)
1870 attempts_left -= 1
1871 try:
1872+ headers = {'Content-Length': '0',
1873+ 'X-Data-Dir': job.get('datadir', DATADIR)}
1874 with Timeout(self.http_timeout):
1875 resp = http_connect(node['ip'], node['port'],
1876 node['device'], job['partition'], 'REPLICATE',
1877- '', headers={'Content-Length': '0'}).getresponse()
1878+ '', headers=headers).getresponse()
1879 if resp.status == 507:
1880 self.logger.error('%s/%s responded as unmounted' %
1881 (node['ip'], node['device']))
1882@@ -397,11 +248,13 @@
1883 self.rsync(node, job, suffixes)
1884 recalculate_hashes(job['path'], suffixes,
1885 reclaim_age=self.reclaim_age)
1886+ headers = {'Content-Length': '0',
1887+ 'X-Data-Dir': job.get('datadir', DATADIR)}
1888 with Timeout(self.http_timeout):
1889 conn = http_connect(node['ip'], node['port'],
1890 node['device'], job['partition'], 'REPLICATE',
1891 '/' + '-'.join(suffixes),
1892- headers={'Content-Length': '0'})
1893+ headers=headers)
1894 conn.getresponse().read()
1895 self.suffix_sync += len(suffixes)
1896 except (Exception, Timeout):
1897@@ -489,24 +342,27 @@
1898 dev for dev in self.object_ring.devs
1899 if dev and dev['ip'] in ips and dev['port'] == self.port]:
1900 dev_path = join(self.devices_dir, local_dev['device'])
1901- obj_path = join(dev_path, 'objects')
1902- tmp_path = join(dev_path, 'tmp')
1903 if self.mount_check and not os.path.ismount(dev_path):
1904 self.logger.warn('%s is not mounted' % local_dev['device'])
1905 continue
1906+ tmp_path = join(dev_path, 'tmp')
1907 unlink_older_than(tmp_path, time.time() - self.reclaim_age)
1908- if not os.path.exists(obj_path):
1909- continue
1910- for partition in os.listdir(obj_path):
1911- try:
1912- nodes = [node for node in
1913- self.object_ring.get_part_nodes(int(partition))
1914- if node['id'] != local_dev['id']]
1915- jobs.append(dict(path=join(obj_path, partition),
1916- nodes=nodes, delete=len(nodes) > 2,
1917- partition=partition))
1918- except ValueError:
1919- continue
1920+ for datadir in (DATADIR, JANITORDIR, SEGMENTSDIR):
1921+ obj_path = join(dev_path, datadir)
1922+ if os.path.exists(obj_path):
1923+ for partition in os.listdir(obj_path):
1924+ try:
1925+ nodes = [node for node in
1926+ self.object_ring.get_part_nodes(
1927+ int(partition))
1928+ if node['id'] != local_dev['id']]
1929+ jobs.append(dict(
1930+ path=join(obj_path, partition),
1931+ nodes=nodes, delete=len(nodes) > 2,
1932+ partition=partition,
1933+ datadir=datadir))
1934+ except ValueError:
1935+ continue
1936 random.shuffle(jobs)
1937 # Partititons that need to be deleted take priority
1938 jobs.sort(key=lambda job: not job['delete'])
1939
1940=== modified file 'swift/obj/server.py'
1941--- swift/obj/server.py 2010-11-01 21:47:48 +0000
1942+++ swift/obj/server.py 2010-11-08 18:51:48 +0000
1943@@ -17,225 +17,28 @@
1944
1945 from __future__ import with_statement
1946 import cPickle as pickle
1947-import errno
1948 import os
1949 import time
1950 import traceback
1951 from datetime import datetime
1952 from hashlib import md5
1953-from tempfile import mkstemp
1954 from urllib import unquote
1955-from contextlib import contextmanager
1956
1957 from webob import Request, Response, UTC
1958 from webob.exc import HTTPAccepted, HTTPBadRequest, HTTPCreated, \
1959 HTTPInternalServerError, HTTPNoContent, HTTPNotFound, \
1960 HTTPNotModified, HTTPPreconditionFailed, \
1961 HTTPRequestTimeout, HTTPUnprocessableEntity, HTTPMethodNotAllowed
1962-from xattr import getxattr, setxattr
1963 from eventlet import sleep, Timeout
1964
1965-from swift.common.utils import mkdirs, normalize_timestamp, \
1966- storage_directory, hash_path, renamer, fallocate, \
1967- split_path, drop_buffer_cache, get_logger, write_pickle
1968+from swift.common.utils import drop_buffer_cache, fallocate, get_logger, \
1969+ mkdirs, normalize_timestamp, split_path
1970 from swift.common.bufferedhttp import http_connect
1971 from swift.common.constraints import check_object_creation, check_mount, \
1972 check_float, check_utf8
1973 from swift.common.exceptions import ConnectionTimeout
1974-from swift.obj.replicator import get_hashes, invalidate_hash, \
1975- recalculate_hashes
1976-
1977-
1978-DATADIR = 'objects'
1979-ASYNCDIR = 'async_pending'
1980-PICKLE_PROTOCOL = 2
1981-METADATA_KEY = 'user.swift.metadata'
1982-MAX_OBJECT_NAME_LENGTH = 1024
1983-
1984-
1985-def read_metadata(fd):
1986- """
1987- Helper function to read the pickled metadata from an object file.
1988-
1989- :param fd: file descriptor to load the metadata from
1990-
1991- :returns: dictionary of metadata
1992- """
1993- metadata = ''
1994- key = 0
1995- try:
1996- while True:
1997- metadata += getxattr(fd, '%s%s' % (METADATA_KEY, (key or '')))
1998- key += 1
1999- except IOError:
2000- pass
2001- return pickle.loads(metadata)
2002-
2003-
2004-class DiskFile(object):
2005- """
2006- Manage object files on disk.
2007-
2008- :param path: path to devices on the node
2009- :param device: device name
2010- :param partition: partition on the device the object lives in
2011- :param account: account name for the object
2012- :param container: container name for the object
2013- :param obj: object name for the object
2014- :param keep_data_fp: if True, don't close the fp, otherwise close it
2015- :param disk_chunk_Size: size of chunks on file reads
2016- """
2017-
2018- def __init__(self, path, device, partition, account, container, obj,
2019- keep_data_fp=False, disk_chunk_size=65536):
2020- self.disk_chunk_size = disk_chunk_size
2021- self.name = '/' + '/'.join((account, container, obj))
2022- name_hash = hash_path(account, container, obj)
2023- self.datadir = os.path.join(path, device,
2024- storage_directory(DATADIR, partition, name_hash))
2025- self.tmpdir = os.path.join(path, device, 'tmp')
2026- self.metadata = {}
2027- self.meta_file = None
2028- self.data_file = None
2029- if not os.path.exists(self.datadir):
2030- return
2031- files = sorted(os.listdir(self.datadir), reverse=True)
2032- for file in files:
2033- if file.endswith('.ts'):
2034- self.data_file = self.meta_file = None
2035- self.metadata = {'deleted': True}
2036- return
2037- if file.endswith('.meta') and not self.meta_file:
2038- self.meta_file = os.path.join(self.datadir, file)
2039- if file.endswith('.data') and not self.data_file:
2040- self.data_file = os.path.join(self.datadir, file)
2041- break
2042- if not self.data_file:
2043- return
2044- self.fp = open(self.data_file, 'rb')
2045- self.metadata = read_metadata(self.fp)
2046- if not keep_data_fp:
2047- self.close()
2048- if self.meta_file:
2049- with open(self.meta_file) as mfp:
2050- for key in self.metadata.keys():
2051- if key.lower() not in ('content-type', 'content-encoding',
2052- 'deleted', 'content-length', 'etag'):
2053- del self.metadata[key]
2054- self.metadata.update(read_metadata(mfp))
2055-
2056- def __iter__(self):
2057- """Returns an iterator over the data file."""
2058- try:
2059- dropped_cache = 0
2060- read = 0
2061- while True:
2062- chunk = self.fp.read(self.disk_chunk_size)
2063- if chunk:
2064- read += len(chunk)
2065- if read - dropped_cache > (1024 * 1024):
2066- drop_buffer_cache(self.fp.fileno(), dropped_cache,
2067- read - dropped_cache)
2068- dropped_cache = read
2069- yield chunk
2070- else:
2071- drop_buffer_cache(self.fp.fileno(), dropped_cache,
2072- read - dropped_cache)
2073- break
2074- finally:
2075- self.close()
2076-
2077- def app_iter_range(self, start, stop):
2078- """Returns an iterator over the data file for range (start, stop)"""
2079- if start:
2080- self.fp.seek(start)
2081- if stop is not None:
2082- length = stop - start
2083- else:
2084- length = None
2085- for chunk in self:
2086- if length is not None:
2087- length -= len(chunk)
2088- if length < 0:
2089- # Chop off the extra:
2090- yield chunk[:length]
2091- break
2092- yield chunk
2093-
2094- def close(self):
2095- """Close the file."""
2096- if self.fp:
2097- self.fp.close()
2098- self.fp = None
2099-
2100- def is_deleted(self):
2101- """
2102- Check if the file is deleted.
2103-
2104- :returns: True if the file doesn't exist or has been flagged as
2105- deleted.
2106- """
2107- return not self.data_file or 'deleted' in self.metadata
2108-
2109- @contextmanager
2110- def mkstemp(self):
2111- """Contextmanager to make a temporary file."""
2112- if not os.path.exists(self.tmpdir):
2113- mkdirs(self.tmpdir)
2114- fd, tmppath = mkstemp(dir=self.tmpdir)
2115- try:
2116- yield fd, tmppath
2117- finally:
2118- try:
2119- os.close(fd)
2120- except OSError:
2121- pass
2122- try:
2123- os.unlink(tmppath)
2124- except OSError:
2125- pass
2126-
2127- def put(self, fd, tmppath, metadata, extension='.data'):
2128- """
2129- Finalize writing the file on disk, and renames it from the temp file to
2130- the real location. This should be called after the data has been
2131- written to the temp file.
2132-
2133- :params fd: file descriptor of the temp file
2134- :param tmppath: path to the temporary file being used
2135- :param metadata: dictionary of metada to be written
2136- :param extention: extension to be used when making the file
2137- """
2138- metadata['name'] = self.name
2139- timestamp = normalize_timestamp(metadata['X-Timestamp'])
2140- metastr = pickle.dumps(metadata, PICKLE_PROTOCOL)
2141- key = 0
2142- while metastr:
2143- setxattr(fd, '%s%s' % (METADATA_KEY, key or ''), metastr[:254])
2144- metastr = metastr[254:]
2145- key += 1
2146- if 'Content-Length' in metadata:
2147- drop_buffer_cache(fd, 0, int(metadata['Content-Length']))
2148- os.fsync(fd)
2149- invalidate_hash(os.path.dirname(self.datadir))
2150- renamer(tmppath, os.path.join(self.datadir, timestamp + extension))
2151- self.metadata = metadata
2152-
2153- def unlinkold(self, timestamp):
2154- """
2155- Remove any older versions of the object file. Any file that has an
2156- older timestamp than timestamp will be deleted.
2157-
2158- :param timestamp: timestamp to compare with each file
2159- """
2160- timestamp = normalize_timestamp(timestamp)
2161- for fname in os.listdir(self.datadir):
2162- if fname < timestamp:
2163- try:
2164- os.unlink(os.path.join(self.datadir, fname))
2165- except OSError, err: # pragma: no cover
2166- if err.errno != errno.ENOENT:
2167- raise
2168+from swift.obj.diskfile import DATADIR, DiskFile, get_hashes, JANITORDIR, \
2169+ recalculate_hashes
2170
2171
2172 class ObjectController(object):
2173@@ -262,7 +65,7 @@
2174 self.bytes_per_sync = int(conf.get('mb_per_sync', 512)) * 1024 * 1024
2175
2176 def container_update(self, op, account, container, obj, headers_in,
2177- headers_out, objdevice):
2178+ headers_out, objdevice, objpartition):
2179 """
2180 Update the container when objects are updated.
2181
2182@@ -274,6 +77,7 @@
2183 :param headers_out: dictionary of headers to send in the container
2184 request
2185 :param objdevice: device name that the object is in
2186+ :param objpartition: partition that the object is in
2187 """
2188 host = headers_in.get('X-Container-Host', None)
2189 partition = headers_in.get('X-Container-Partition', None)
2190@@ -293,20 +97,16 @@
2191 return
2192 else:
2193 self.logger.error('ERROR Container update failed (saving '
2194- 'for async update later): %d response from %s:%s/%s' %
2195- (response.status, ip, port, contdevice))
2196+ 'for janitor update later): %d response from %s:%s/%s'
2197+ % (response.status, ip, port, contdevice))
2198 except:
2199 self.logger.exception('ERROR container update failed with '
2200- '%s:%s/%s transaction %s (saving for async update later)' %
2201+ '%s:%s/%s transaction %s (saving for janitor update later)' %
2202 (ip, port, contdevice, headers_in.get('x-cf-trans-id', '-')))
2203- async_dir = os.path.join(self.devices, objdevice, ASYNCDIR)
2204- ohash = hash_path(account, container, obj)
2205- write_pickle(
2206- {'op': op, 'account': account, 'container': container,
2207- 'obj': obj, 'headers': headers_out},
2208- os.path.join(async_dir, ohash[-3:], ohash + '-' +
2209- normalize_timestamp(headers_out['x-timestamp'])),
2210- os.path.join(self.devices, objdevice, 'tmp'))
2211+ df = DiskFile(self.devices, objdevice, objpartition, account,
2212+ container, obj, datadir=JANITORDIR)
2213+ df.store_janitor_container_update(op, account, container, obj,
2214+ headers_out, [])
2215
2216 def POST(self, request):
2217 """Handle HTTP POST requests for the Swift Object Server."""
2218@@ -355,7 +155,15 @@
2219 if error_response:
2220 return error_response
2221 file = DiskFile(self.devices, device, partition, account, container,
2222- obj, disk_chunk_size=self.disk_chunk_size)
2223+ obj, disk_chunk_size=self.disk_chunk_size, keep_data_fp=True,
2224+ segment=request.headers.get('x-object-segment'),
2225+ segment_timestamp=request.headers['x-timestamp'])
2226+ overwritten_manifest = False
2227+ if not file.is_deleted() and \
2228+ file.metadata.get('X-Object-Type') == 'manifest' and \
2229+ 'x-object-segment' not in request.headers:
2230+ overwritten_manifest = pickle.loads(''.join(iter(file)))
2231+ file.close()
2232 upload_expiration = time.time() + self.max_upload_time
2233 etag = md5()
2234 upload_size = 0
2235@@ -397,17 +205,51 @@
2236 if 'content-encoding' in request.headers:
2237 metadata['Content-Encoding'] = \
2238 request.headers['Content-Encoding']
2239- file.put(fd, tmppath, metadata)
2240+ if 'x-object-type' in request.headers:
2241+ metadata['X-Object-Type'] = request.headers['x-object-type']
2242+ if 'x-object-segment' in request.headers:
2243+ metadata['X-Object-Segment'] = \
2244+ request.headers['x-object-segment']
2245+ no_longer_segment = False
2246+ if 'x-object-segment-if-length' in request.headers and \
2247+ int(request.headers['x-object-segment-if-length']) != \
2248+ os.fstat(fd).st_size:
2249+ del metadata['X-Object-Type']
2250+ del metadata['X-Object-Segment']
2251+ no_longer_segment = True
2252+ elif int(request.headers.get('x-object-segment', -1)) == 0:
2253+ # Write out a janitor operation to clean up this multi-segment
2254+ # PUT in the future if it fails.
2255+ df = DiskFile(self.devices, device, partition,
2256+ 'Segment-Cleanup', request.headers['x-timestamp'],
2257+ '%s/%s/%s' % (account, container, obj),
2258+ datadir=JANITORDIR)
2259+ df.store_janitor_segment_cleanup(account, container, obj,
2260+ segment_count=None, segment_last_deleted=None)
2261+ if overwritten_manifest:
2262+ # Write out a janitor operation to clean up the overwritten
2263+ # multi-segment object.
2264+ df = DiskFile(self.devices, device, partition,
2265+ 'Segment-Cleanup', overwritten_manifest['x-timestamp'],
2266+ '%s/%s/%s' % (account, container, obj), datadir=JANITORDIR)
2267+ df.store_janitor_segment_cleanup(account, container, obj,
2268+ segment_count=(overwritten_manifest['content-length'] /
2269+ overwritten_manifest['x-segment-size'] + 1),
2270+ segment_last_deleted=None)
2271+ file.put(fd, tmppath, metadata,
2272+ no_longer_segment=no_longer_segment)
2273 file.unlinkold(metadata['X-Timestamp'])
2274- self.container_update('PUT', account, container, obj, request.headers,
2275- {'x-size': file.metadata['Content-Length'],
2276- 'x-content-type': file.metadata['Content-Type'],
2277- 'x-timestamp': file.metadata['X-Timestamp'],
2278- 'x-etag': file.metadata['ETag'],
2279- 'x-cf-trans-id': request.headers.get('x-cf-trans-id', '-')},
2280- device)
2281- resp = HTTPCreated(request=request, etag=etag)
2282- return resp
2283+ if 'X-Object-Segment' not in file.metadata:
2284+ self.container_update('PUT', account, container, obj,
2285+ request.headers,
2286+ {'x-size': request.headers.get('x-object-length',
2287+ file.metadata['Content-Length']),
2288+ 'x-content-type': file.metadata['Content-Type'],
2289+ 'x-timestamp': file.metadata['X-Timestamp'],
2290+ 'x-etag': file.metadata['ETag'],
2291+ 'x-cf-trans-id': request.headers.get('x-cf-trans-id', '-')},
2292+ device, partition)
2293+ return HTTPCreated(request=request, etag=etag)
2294
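A hedged restatement of the X-Object-Segment-If-Length rule above: with a chunked upload the proxy cannot know in advance whether the body will exceed one segment, so it writes the first segment conditionally, and the object server demotes it to a plain whole object if it arrives short.

    # Sketch of the object-server side of the rule, as in the PUT above:
    expected = int(request.headers['x-object-segment-if-length'])
    if expected != os.fstat(fd).st_size:
        # The upload fit in one segment after all; store it whole.
        del metadata['X-Object-Type']
        del metadata['X-Object-Segment']
        no_longer_segment = True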
2295 def GET(self, request):
2296 """Handle HTTP GET requests for the Swift Object Server."""
2297@@ -420,7 +262,9 @@
2298 if self.mount_check and not check_mount(self.devices, device):
2299 return Response(status='507 %s is not mounted' % device)
2300 file = DiskFile(self.devices, device, partition, account, container,
2301- obj, keep_data_fp=True, disk_chunk_size=self.disk_chunk_size)
2302+ obj, keep_data_fp=True, disk_chunk_size=self.disk_chunk_size,
2303+ segment=request.headers.get('x-object-segment'),
2304+ segment_timestamp=request.headers.get('x-object-segment-timestamp'))
2305 if file.is_deleted():
2306 if request.headers.get('if-match') == '*':
2307 return HTTPPreconditionFailed(request=request)
2308@@ -460,7 +304,9 @@
2309 'application/octet-stream'), app_iter=file,
2310 request=request, conditional_response=True)
2311 for key, value in file.metadata.iteritems():
2312- if key.lower().startswith('x-object-meta-'):
2313+ if key.lower().startswith('x-object-meta-') or \
2314+ key.lower() in ('x-timestamp', 'x-object-type',
2315+ 'x-object-segment'):
2316 response.headers[key] = value
2317 response.etag = file.metadata['ETag']
2318 response.last_modified = float(file.metadata['X-Timestamp'])
2319@@ -482,13 +328,17 @@
2320 if self.mount_check and not check_mount(self.devices, device):
2321 return Response(status='507 %s is not mounted' % device)
2322 file = DiskFile(self.devices, device, partition, account, container,
2323- obj, disk_chunk_size=self.disk_chunk_size)
2324+ obj, disk_chunk_size=self.disk_chunk_size,
2325+ segment=request.headers.get('x-object-segment'),
2326+ segment_timestamp=request.headers.get('x-object-segment-timestamp'))
2327 if file.is_deleted():
2328 return HTTPNotFound(request=request)
2329 response = Response(content_type=file.metadata['Content-Type'],
2330 request=request, conditional_response=True)
2331 for key, value in file.metadata.iteritems():
2332- if key.lower().startswith('x-object-meta-'):
2333+ if key.lower().startswith('x-object-meta-') or \
2334+ key.lower() in ('x-timestamp', 'x-object-type',
2335+ 'x-object-segment'):
2336 response.headers[key] = value
2337 response.etag = file.metadata['ETag']
2338 response.last_modified = float(file.metadata['X-Timestamp'])
2339@@ -513,21 +363,43 @@
2340 return Response(status='507 %s is not mounted' % device)
2341 response_class = HTTPNoContent
2342 file = DiskFile(self.devices, device, partition, account, container,
2343- obj, disk_chunk_size=self.disk_chunk_size)
2344+ obj, disk_chunk_size=self.disk_chunk_size, keep_data_fp=True,
2345+ segment=request.headers.get('x-object-segment'),
2346+ segment_timestamp=request.headers.get('x-object-segment-timestamp'))
2347+ deleted_manifest = False
2348 if file.is_deleted():
2349 response_class = HTTPNotFound
2350+ elif 'x-object-segment' not in request.headers and \
2351+ file.metadata.get('X-Object-Type') == 'manifest':
2352+ deleted_manifest = pickle.loads(''.join(iter(file)))
2353+ file.close()
2354 metadata = {
2355 'X-Timestamp': request.headers['X-Timestamp'], 'deleted': True,
2356 }
2357 with file.mkstemp() as (fd, tmppath):
2358+ if deleted_manifest:
2359+ # Write out a janitor operation to clean up the deleted
2360+ # multi-segment object. Note that setting the
2361+ # segment_last_deleted = -1 will cause the object-janitor to
2362+ # start removing the segments immediately rather than waiting
2363+ # segment_reclaim_age (otherwise it can't tell the difference
2364+ # between a deleted manifest and manifest that just hasn't
2365+            # between a deleted manifest and a manifest that just hasn't
2366+ df = DiskFile(self.devices, device, partition,
2367+ 'Segment-Cleanup', deleted_manifest['x-timestamp'],
2368+ '%s/%s/%s' % (account, container, obj), datadir=JANITORDIR)
2369+ df.store_janitor_segment_cleanup(account, container, obj,
2370+ segment_count=(deleted_manifest['content-length'] /
2371+ deleted_manifest['x-segment-size']) + 1,
2372+ segment_last_deleted=-1)
2373 file.put(fd, tmppath, metadata, extension='.ts')
2374 file.unlinkold(metadata['X-Timestamp'])
2375- self.container_update('DELETE', account, container, obj,
2376- request.headers, {'x-timestamp': metadata['X-Timestamp'],
2377- 'x-cf-trans-id': request.headers.get('x-cf-trans-id', '-')},
2378- device)
2379- resp = response_class(request=request)
2380- return resp
2381+ if 'x-object-segment' not in request.headers:
2382+ self.container_update('DELETE', account, container, obj,
2383+ request.headers, {'x-timestamp': metadata['X-Timestamp'],
2384+ 'x-cf-trans-id': request.headers.get('x-cf-trans-id', '-')},
2385+ device, partition)
2386+ return response_class(request=request)
2387
2388 def REPLICATE(self, request):
2389 """
2390@@ -542,7 +414,8 @@
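The segment count above comes straight from the manifest; a worked example using the defaults (Python 2 integer division):

    # A 5 GiB object with 2 GiB segments:
    segment_count = 5368709120 / 2147483647 + 1   # == 3

For exact multiples of the segment size this overcounts by one, which is harmless: DELETEs for a segment that never existed return 404 from every node, and the cleanup loop above already counts that as success.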
2391 content_type='text/plain')
2392 if self.mount_check and not check_mount(self.devices, device):
2393 return Response(status='507 %s is not mounted' % device)
2394- path = os.path.join(self.devices, device, DATADIR, partition)
2395+ path = os.path.join(self.devices, device,
2396+ request.headers.get('x-data-dir', DATADIR), partition)
2397 if not os.path.exists(path):
2398 mkdirs(path)
2399 if suffix:
2400
2401=== modified file 'swift/obj/updater.py'
2402--- swift/obj/updater.py 2010-09-23 16:09:30 +0000
2403+++ swift/obj/updater.py 2010-11-08 18:51:48 +0000
2404@@ -27,11 +27,25 @@
2405 from swift.common.ring import Ring
2406 from swift.common.utils import get_logger, renamer, write_pickle
2407 from swift.common.daemon import Daemon
2408-from swift.obj.server import ASYNCDIR
2409+
2410+
2411+# Old-style async pending directory
2412+ASYNCDIR = 'async_pending'
2413
2414
2415 class ObjectUpdater(Daemon):
2416- """Update object information in container listings."""
2417+ """
2418+ Update object information in container listings based on postponed
2419+ operations stored in the old-style async pending directory.
2420+
2421+ After upgrade, no new operations will be stored in this old-style async
2422+ pending directory. Once this daemon empties those directories of all
2423+ operations, the daemon may be disabled and the directories removed.
2424+
2425+ In a future release of Swift, this daemon will be removed.
2426+
2427+ The new functionality is in swift.obj.janitor.
2428+ """
2429
2430 def __init__(self, conf):
2431 self.conf = conf
2432
2433=== modified file 'swift/proxy/server.py'
2434--- swift/proxy/server.py 2010-11-05 14:47:43 +0000
2435+++ swift/proxy/server.py 2010-11-08 18:51:48 +0000
2436@@ -14,21 +14,22 @@
2437 # limitations under the License.
2438
2439 from __future__ import with_statement
2440+import cPickle as pickle
2441 import mimetypes
2442 import os
2443 import time
2444 import traceback
2445 from ConfigParser import ConfigParser
2446+from hashlib import md5
2447 from urllib import unquote, quote
2448 import uuid
2449 import functools
2450
2451 from eventlet.timeout import Timeout
2452-from webob.exc import HTTPBadRequest, HTTPMethodNotAllowed, \
2453- HTTPNotFound, HTTPPreconditionFailed, \
2454- HTTPRequestTimeout, HTTPServiceUnavailable, \
2455- HTTPUnprocessableEntity, HTTPRequestEntityTooLarge, HTTPServerError, \
2456- status_map
2457+from webob.exc import HTTPBadRequest, HTTPCreated, HTTPInternalServerError, \
2458+ HTTPMethodNotAllowed, HTTPNotFound, HTTPPreconditionFailed, \
2459+ HTTPRequestEntityTooLarge, HTTPRequestTimeout, HTTPServerError, \
2460+ HTTPServiceUnavailable, HTTPUnprocessableEntity, status_map
2461 from webob import Request, Response
2462
2463 from swift.common.ring import Ring
2464@@ -37,7 +38,7 @@
2465 from swift.common.bufferedhttp import http_connect
2466 from swift.common.constraints import check_metadata, check_object_creation, \
2467 check_utf8, MAX_ACCOUNT_NAME_LENGTH, MAX_CONTAINER_NAME_LENGTH, \
2468- MAX_FILE_SIZE
2469+ PICKLE_PROTOCOL
2470 from swift.common.exceptions import ChunkReadTimeout, \
2471 ChunkWriteTimeout, ConnectionTimeout
2472
2473@@ -89,6 +90,144 @@
2474 return wrapped
2475
2476
2477+class SegmentedIterable(object):
2478+ """
2479+ Iterable that returns the object contents for a segmented object in Swift.
2480+
2481+ In addition to these params, you can also set the `response` attr just
2482+ after creating the SegmentedIterable and it will update the response's
2483+ `bytes_transferred` value (used to log the size of the request).
2484+
2485+ :param controller: The ObjectController instance to work with.
2486+ :param content_length: The total length of the object.
2487+ :param segment_size: The length of each segment (except perhaps the last)
2488+ of the object.
2489+ :param timestamp: The X-Timestamp of the object's segments (set on the PUT,
2490+ not changed on the POSTs).
2491+ """
2492+
2493+ def __init__(self, controller, content_length, segment_size, timestamp):
2494+ self.controller = controller
2495+ self.content_length = content_length
2496+ self.segment_size = segment_size
2497+ self.timestamp = timestamp
2498+ self.position = 0
2499+ self.segment = -1
2500+ self.segment_iter = None
2501+ self.response = None
2502+
2503+ def load_next_segment(self):
2504+ """ Loads the self.segment_iter with the next segment's contents. """
2505+ self.segment += 1
2506+ if self.segment:
2507+ ring_object_name = '%s/%s/%s' % (self.controller.object_name,
2508+ self.timestamp, self.segment)
2509+ else:
2510+ ring_object_name = self.controller.object_name
2511+ partition, nodes = self.controller.app.object_ring.get_nodes(
2512+ self.controller.account_name, self.controller.container_name,
2513+ ring_object_name)
2514+ path = '/%s/%s/%s' % (self.controller.account_name,
2515+ self.controller.container_name, self.controller.object_name)
2516+ req = Request.blank(path, headers={'X-Object-Segment': self.segment,
2517+ 'X-Object-Segment-Timestamp': self.timestamp})
2518+ resp = self.controller.GETorHEAD_base(req, 'Object',
2519+ partition, self.controller.iter_nodes(partition, nodes,
2520+ self.controller.app.object_ring), path,
2521+ self.controller.app.object_ring.replica_count)
2522+ if resp.status_int // 100 != 2:
2523+ raise Exception(
2524+ 'Could not load segment %s of %s' % (self.segment, path))
2525+ self.segment_iter = resp.app_iter
2526+
2527+ def __iter__(self):
2528+ """ Standard iterator function that returns the object's contents. """
2529+ while self.position < self.content_length:
2530+ if not self.segment_iter:
2531+ self.load_next_segment()
2532+ while True:
2533+ with ChunkReadTimeout(self.controller.app.node_timeout):
2534+ try:
2535+ chunk = self.segment_iter.next()
2536+ break
2537+ except StopIteration:
2538+ self.load_next_segment()
2539+ if self.position + len(chunk) > self.content_length:
2540+ chunk = chunk[:self.content_length - self.position]
2541+ self.position += len(chunk)
2542+ if self.response:
2543+ self.response.bytes_transferred = \
2544+ getattr(self.response, 'bytes_transferred', 0) + len(chunk)
2545+ yield chunk
2546+
2547+ def app_iter_range(self, start, stop):
2548+ """
2549+ Non-standard iterator function for use with Webob in serving Range
2550+ requests more quickly.
2551+
2552+ .. note::
2553+
2554+ This currently helps on speed by jumping to the proper segment to
2555+ start with (and ending without reading the trailing segments, but
2556+ that already happened technically with __iter__).
2557+
2558+ But, what it does not do yet is issue a Range request with the
2559+ first segment to allow the object server to seek to the segment
2560+ start point.
2561+
2562+ Instead, it just reads and throws away all leading segment data.
2563+ Since segments are 2G by default, it'll have to transfer the whole
2564+ 2G from the object server to the proxy server even if it only needs
2565+ the last byte. In practice, this should happen fairly quickly
2566+ relative to how long requests take for these very large files; but
2567+ it's still wasteful.
2568+
2569+ Anyway, it shouldn't be too hard to implement, I just want to keep
2570+ the complexity down for now.
2571+
2572+ :param start: The first byte (zero-based) to return. None for 0.
2573+ :param stop: The last byte (zero-based) to return. None for end.
2574+ """
2575+ if start is None:
2576+ start = 0
2577+ if start:
2578+ self.segment = (start / self.segment_size) - 1
2579+ self.load_next_segment()
2580+ self.position = self.segment * self.segment_size
2581+ segment_start = start - (self.segment * self.segment_size)
2582+ while segment_start:
2583+ with ChunkReadTimeout(self.controller.app.node_timeout):
2584+ chunk = self.segment_iter.next()
2585+ self.position += len(chunk)
2586+ if len(chunk) > segment_start:
2587+ chunk = chunk[segment_start:]
2588+ if self.response:
2589+ self.response.bytes_transferred = \
2590+ getattr(self.response, 'bytes_transferred', 0) + \
2591+ len(chunk)
2592+ yield chunk
2593+ segment_start = 0
2594+ else:
2595+ segment_start -= len(chunk)
2596+ if stop is not None:
2597+ length = stop - start
2598+ else:
2599+ length = None
2600+ for chunk in self:
2601+ if length is not None:
2602+ length -= len(chunk)
2603+ if length < 0:
2604+ # bytes_transferred had len(chunk) added by __iter__ so we
2605+ # need to subtract what we aren't going to use of the chunk
2606+ if self.response:
2607+ self.response.bytes_transferred = \
2608+ getattr(self.response, 'bytes_transferred',
2609+ length) + length
2610+ yield chunk[:length]
2611+ break
2612+ yield chunk
2613+
2614+
2615 def get_container_memcache_key(account, container):
2616 path = '/%s/%s' % (account, container)
2617 return 'container%s' % path
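A minimal sketch of the segment naming convention used by both the proxy and the janitor in this branch: segment 0 keeps the object's own ring name, so a whole object and the first segment of a segmented one hash to the same partition, while later segments hash independently.

    def segment_ring_name(obj, put_timestamp, segment):
        # Mirrors the '%s/%s/%s' naming in the code above.
        if segment == 0:
            return obj
        return '%s/%s/%s' % (obj, put_timestamp, segment)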
2618@@ -518,11 +657,56 @@
2619 aresp = req.environ['swift.authorize'](req)
2620 if aresp:
2621 return aresp
2622+        # This is a bit confusing, so an explanation:
2623+ # * First we attempt the GET/HEAD normally, as this is the usual case.
2624+ # * If the request was a Range request and gave us a 416 Unsatisfiable
2625+ # response, we might be trying to do an invalid Range on a manifest
2626+ # object, so we try again with no Range.
2627+ # * If it turns out we have a manifest object, and we had a Range
2628+ # request originally that actually succeeded or we had a HEAD
2629+ # request, we have to do the request again as a full GET because
2630+ # we'll need the whole manifest.
2631+ # * Finally, if we had a manifest object, we pass it and the request
2632+ # off to GETorHEAD_segmented; otherwise we just return the response.
2633 partition, nodes = self.app.object_ring.get_nodes(
2634 self.account_name, self.container_name, self.object_name)
2635- return self.GETorHEAD_base(req, 'Object', partition,
2636+ resp = mresp = self.GETorHEAD_base(req, 'Object', partition,
2637+ self.iter_nodes(partition, nodes, self.app.object_ring),
2638+ req.path_info, self.app.object_ring.replica_count)
2639+ range_value = None
2640+ if mresp.status_int == 416:
2641+ range_value = req.range
2642+ req.range = None
2643+ mresp = self.GETorHEAD_base(req, 'Object', partition,
2644 self.iter_nodes(partition, nodes, self.app.object_ring),
2645 req.path_info, self.app.object_ring.replica_count)
2646+ if mresp.status_int // 100 != 2:
2647+ return resp
2648+ if 'x-object-type' in mresp.headers:
2649+ if mresp.headers['x-object-type'] == 'manifest':
2650+ if req.method == 'HEAD':
2651+ req.method = 'GET'
2652+ mresp = self.GETorHEAD_base(req, 'Object', partition,
2653+ self.iter_nodes(partition, nodes,
2654+ self.app.object_ring), req.path_info,
2655+ self.app.object_ring.replica_count)
2656+ if mresp.status_int // 100 != 2:
2657+ return mresp
2658+ req.method = 'HEAD'
2659+ elif req.range:
2660+ range_value = req.range
2661+ req.range = None
2662+ mresp = self.GETorHEAD_base(req, 'Object', partition,
2663+ self.iter_nodes(partition, nodes,
2664+ self.app.object_ring), req.path_info,
2665+ self.app.object_ring.replica_count)
2666+ if mresp.status_int // 100 != 2:
2667+ return mresp
2668+ if range_value:
2669+ req.range = range_value
2670+ return self.GETorHEAD_segmented(req, mresp)
2671+ return HTTPNotFound(request=req)
2672+ return resp
2673
2674 @public
2675 @delay_denial
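Condensed, the retry dance described in the comment above (a hedged restatement of the control flow, not new behavior):

    # 1. Try the GET/HEAD as-is.
    # 2. On 416 with a Range: retry once without the Range, since the
    #    Range may be valid for the full object but not the manifest.
    # 3. If the winning response is a manifest: re-issue as a plain GET
    #    when the original was a HEAD or a successful ranged GET (the
    #    whole manifest body is needed), then hand the response to
    #    GETorHEAD_segmented.
    # 4. Otherwise return the first response unchanged.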
2676@@ -536,6 +720,32 @@
2677 """Handler for HTTP HEAD requests."""
2678 return self.GETorHEAD(req)
2679
2680+ def GETorHEAD_segmented(self, req, mresp):
2681+ """
2682+        Performs a GET or HEAD for a segmented object, using its manifest.
2683+
2684+ :param req: The webob.Request to process.
2685+ :param mresp: The webob.Response for the original manifest request.
2686+ :returns: webob.Response object.
2687+ """
2688+ manifest = pickle.loads(''.join(mresp.app_iter))
2689+ content_length = int(manifest['content-length'])
2690+ segment_size = int(manifest['x-segment-size'])
2691+ headers = dict(mresp.headers)
2692+ headers.update(manifest)
2693+ del headers['x-segment-size']
2694+ resp = Response(app_iter=SegmentedIterable(self, content_length,
2695+ segment_size, manifest['x-timestamp']), headers=headers,
2696+ request=req, conditional_response=True)
2697+ resp.headers['etag'] = manifest['etag'].strip('"')
2698+ resp.last_modified = mresp.last_modified
2699+ resp.content_length = int(manifest['content-length'])
2700+ resp.content_type = manifest['content-type']
2701+ if 'content-encoding' in manifest:
2702+ resp.content_encoding = manifest['content-encoding']
2703+ resp.app_iter.response = req.get_response(resp)
2704+ return resp.app_iter.response
2705+
2706 @public
2707 @delay_denial
2708 def POST(self, req):
2709@@ -609,7 +819,8 @@
2710 req.headers['Content-Type'] = 'application/octet-stream'
2711 else:
2712 req.headers['Content-Type'] = guessed_type
2713- error_response = check_object_creation(req, self.object_name)
2714+ error_response = check_object_creation(req, self.object_name,
2715+ self.app.max_object_size)
2716 if error_response:
2717 return error_response
2718 conns = []
2719@@ -654,11 +865,50 @@
2720 if k.lower().startswith('x-object-meta-'):
2721 new_req.headers[k] = v
2722 req = new_req
2723+ if req.headers.get('transfer-encoding') == 'chunked' or \
2724+ req.content_length > self.app.segment_size:
2725+ resp = self.PUT_segmented_object(req, data_source, partition,
2726+ nodes, container_partition, containers)
2727+ else:
2728+ resp = self.PUT_whole_object(req, data_source, partition, nodes,
2729+ container_partition, containers)
2730+ if source_header:
2731+ resp.headers['X-Copied-From'] = quote(
2732+ source_header.split('/', 2)[2])
2733+ for k, v in req.headers.items():
2734+ if k.lower().startswith('x-object-meta-'):
2735+ resp.headers[k] = v
2736+ # reset the bytes, since the user didn't actually send anything
2737+ req.bytes_transferred = 0
2738+ resp.last_modified = float(req.headers['X-Timestamp'])
2739+ return resp
2740+
2741+ def PUT_whole_object(self, req, data_source, partition, nodes,
2742+ container_partition=None, containers=None):
2743+ """
2744+ Performs a PUT for a whole object (one with a content-length <=
2745+ self.app.segment_size).
2746+
2747+ :param req: The webob.Request to process.
2748+ :param data_source: An iterator providing the data to store.
2749+ :param partition: The object ring partition the object falls on.
2750+ :param nodes: The object ring nodes the object falls on.
2751+ :param container_partition: The container ring partition the container
2752+ for the object falls on, None if the
2753+ container is not to be updated.
2754+ :param containers: The container ring nodes the container for the
2755+ object falls on, None if the container is not to be
2756+ updated.
2757+ :returns: webob.Response object.
2758+ """
2759+ conns = []
2760+ update_containers = containers is not None
2761 for node in self.iter_nodes(partition, nodes, self.app.object_ring):
2762- container = containers.pop()
2763- req.headers['X-Container-Host'] = '%(ip)s:%(port)s' % container
2764- req.headers['X-Container-Partition'] = container_partition
2765- req.headers['X-Container-Device'] = container['device']
2766+ if update_containers:
2767+ container = containers.pop()
2768+ req.headers['X-Container-Host'] = '%(ip)s:%(port)s' % container
2769+ req.headers['X-Container-Partition'] = container_partition
2770+ req.headers['X-Container-Device'] = container['device']
2771 req.headers['Expect'] = '100-continue'
2772 resp = conn = None
2773 if not self.error_limited(node):
2774@@ -676,12 +926,14 @@
2775 if conn and resp:
2776 if resp.status == 100:
2777 conns.append(conn)
2778- if not containers:
2779+ if (update_containers and not containers) or \
2780+ len(conns) == len(nodes):
2781 break
2782 continue
2783 elif resp.status == 507:
2784 self.error_limit(node)
2785- containers.insert(0, container)
2786+ if update_containers:
2787+ containers.insert(0, container)
2788 if len(conns) <= len(nodes) / 2:
2789 self.app.logger.error(
2790 'Object PUT returning 503, %s/%s required connections, '
2791@@ -701,7 +953,7 @@
2792 break
2793 len_chunk = len(chunk)
2794 req.bytes_transferred += len_chunk
2795- if req.bytes_transferred > MAX_FILE_SIZE:
2796+ if req.bytes_transferred > self.app.max_object_size:
2797 return HTTPRequestEntityTooLarge(request=req)
2798 for conn in list(conns):
2799 try:
2800@@ -767,18 +1019,129 @@
2801 statuses.append(503)
2802 reasons.append('')
2803 bodies.append('')
2804- resp = self.best_response(req, statuses, reasons, bodies, 'Object PUT',
2805+ return self.best_response(req, statuses, reasons, bodies, 'Object PUT',
2806 etag=etag)
2807- if source_header:
2808- resp.headers['X-Copied-From'] = quote(
2809- source_header.split('/', 2)[2])
2810- for k, v in req.headers.items():
2811- if k.lower().startswith('x-object-meta-'):
2812- resp.headers[k] = v
2813- # reset the bytes, since the user didn't actually send anything
2814- req.bytes_transferred = 0
2815- resp.last_modified = float(req.headers['X-Timestamp'])
2816- return resp
2817+
2818+ def PUT_segmented_object(self, req, data_source, partition, nodes,
2819+ container_partition, containers):
2820+ """
2821+        Performs a PUT for a segmented object (one sent with chunked
2822+        transfer-encoding or with a content-length > self.app.segment_size).
2823+
2824+ :param req: The webob.Request to process.
2825+ :param data_source: An iterator providing the data to store.
2826+ :param partition: The object ring partition the object falls on.
2827+ :param nodes: The object ring nodes the object falls on.
2828+ :param container_partition: The container ring partition the container
2829+ for the object falls on.
2830+ :param containers: The container ring nodes the container for the
2831+ object falls on.
2832+ :returns: webob.Response object.
2833+ """
2834+ req.bytes_transferred = 0
2835+ leftover_chunk = [None]
2836+ etag = md5()
2837+
2838+ def segment_iter():
2839+ amount_given = 0
2840+ while amount_given < self.app.segment_size:
2841+ if leftover_chunk[0]:
2842+ chunk = leftover_chunk[0]
2843+ leftover_chunk[0] = None
2844+ else:
2845+ with ChunkReadTimeout(self.app.client_timeout):
2846+ chunk = data_source.next()
2847+ req.bytes_transferred += len(chunk)
2848+ etag.update(chunk)
2849+ if amount_given + len(chunk) > self.app.segment_size:
2850+ yield chunk[:self.app.segment_size - amount_given]
2851+ leftover_chunk[0] = \
2852+ chunk[self.app.segment_size - amount_given:]
2853+ amount_given = self.app.segment_size
2854+ else:
2855+ yield chunk
2856+ amount_given += len(chunk)
2857+
2858+ def segment_iter_iter():
2859+ while True:
2860+ if not leftover_chunk[0]:
2861+ with ChunkReadTimeout(self.app.client_timeout):
2862+ leftover_chunk[0] = data_source.next()
2863+ req.bytes_transferred += len(leftover_chunk[0])
2864+ etag.update(leftover_chunk[0])
2865+ yield segment_iter()
2866+
2867+ segment_number = 0
2868+ chunked = req.headers.get('transfer-encoding') == 'chunked'
2869+ if not chunked:
2870+ amount_left = req.content_length
2871+ headers = {'X-Timestamp': req.headers['X-Timestamp'],
2872+ 'Content-Type': req.headers['content-type'],
2873+ 'X-Object-Type': 'segment'}
2874+ for segment_source in segment_iter_iter():
2875+ if chunked:
2876+ headers['Transfer-Encoding'] = 'chunked'
2877+ if segment_number == 0:
2878+ headers['X-Object-Segment-If-Length'] = \
2879+ self.app.segment_size
2880+ elif amount_left > self.app.segment_size:
2881+ headers['Content-Length'] = self.app.segment_size
2882+ else:
2883+ headers['Content-Length'] = amount_left
2884+ headers['X-Object-Segment'] = segment_number
2885+ segment_req = Request.blank(req.path_info,
2886+ environ={'REQUEST_METHOD': 'PUT'}, headers=headers)
2887+ if 'X-Object-Segment-If-Length' in headers:
2888+ del headers['X-Object-Segment-If-Length']
2889+ if segment_number:
2890+ ring_object_name = '%s/%s/%s' % (self.object_name,
2891+ req.headers['x-timestamp'], segment_number)
2892+ else:
2893+ ring_object_name = self.object_name
2894+ segment_partition, segment_nodes = self.app.object_ring.get_nodes(
2895+ self.account_name, self.container_name, ring_object_name)
2896+ resp = self.PUT_whole_object(segment_req, segment_source,
2897+ segment_partition, segment_nodes)
2898+ if resp.status_int // 100 == 4:
2899+ return resp
2900+ elif resp.status_int // 100 != 2:
2901+ return HTTPServiceUnavailable(request=req,
2902+ body='Unable to complete very large file operation.')
2903+ if segment_number == 0 and \
2904+ req.bytes_transferred < self.app.segment_size:
2905+ return HTTPCreated(request=req, etag=etag.hexdigest())
2906+ if not chunked:
2907+ amount_left -= self.app.segment_size
2908+ segment_number += 1
2909+ etag = etag.hexdigest()
2910+ if 'etag' in req.headers and req.headers['etag'].lower() != etag:
2911+ return HTTPUnprocessableEntity(request=req)
2912+ manifest = {'x-timestamp': req.headers['x-timestamp'],
2913+ 'content-length': req.bytes_transferred,
2914+ 'content-type': req.headers['content-type'],
2915+ 'x-segment-size': self.app.segment_size,
2916+ 'etag': etag}
2917+ if 'content-encoding' in req.headers:
2918+ manifest['content-encoding'] = req.headers['content-encoding']
2919+ manifest = pickle.dumps(manifest, protocol=PICKLE_PROTOCOL)
2920+ headers = {'X-Timestamp': req.headers['X-Timestamp'],
2921+ 'Content-Type': req.headers['content-type'],
2922+ 'Content-Length': len(manifest),
2923+ 'X-Object-Type': 'manifest',
2924+ 'X-Object-Length': req.bytes_transferred}
2925+ headers.update(i for i in req.headers.iteritems()
2926+ if i[0].lower().startswith('x-object-meta-') and len(i[0]) > 14)
2927+ manifest_req = Request.blank(req.path_info,
2928+ environ={'REQUEST_METHOD': 'PUT'}, body=manifest, headers=headers)
2929+ manifest_source = iter(lambda:
2930+ manifest_req.body_file.read(self.app.client_chunk_size), '')
2931+ resp = self.PUT_whole_object(manifest_req, manifest_source, partition,
2932+ nodes, container_partition=container_partition,
2933+ containers=containers)
2934+ if resp.status_int // 100 != 2:
2935+ return HTTPServiceUnavailable(request=req,
2936+ body='Unable to complete very large file operation.')
2937+ return HTTPCreated(request=req, etag=etag)
2938
2939 @public
2940 @delay_denial
2941@@ -1233,6 +1596,8 @@
2942 if conf is None:
2943 conf = {}
2944 swift_dir = conf.get('swift_dir', '/etc/swift')
2945+ self.max_object_size = int(conf.get('max_object_size', 107374182400))
2946+ self.segment_size = int(conf.get('segment_size', 2147483647))
2947 self.node_timeout = int(conf.get('node_timeout', 10))
2948 self.conn_timeout = float(conf.get('conn_timeout', 0.5))
2949 self.client_timeout = int(conf.get('client_timeout', 60))
2950
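The segment-splitting is the subtle part of PUT_segmented_object above: two nested generators share a one-element leftover_chunk list, so a client chunk that straddles a segment boundary is split and the remainder is carried into the next segment. A minimal standalone sketch of that pattern (names are illustrative, not the proxy's actual API; the real code also feeds an md5 and enforces ChunkReadTimeout):

    SEGMENT_SIZE = 4

    def split_into_segments(data_source, segment_size=SEGMENT_SIZE):
        leftover = [None]  # shared cell: holds data that overran a boundary

        def one_segment():
            given = 0
            while given < segment_size:
                if leftover[0]:
                    chunk, leftover[0] = leftover[0], None
                else:
                    try:
                        chunk = next(data_source)
                    except StopIteration:
                        return
                if given + len(chunk) > segment_size:
                    # Split the chunk; the tail starts the next segment.
                    yield chunk[:segment_size - given]
                    leftover[0] = chunk[segment_size - given:]
                    given = segment_size
                else:
                    yield chunk
                    given += len(chunk)

        while True:
            if not leftover[0]:
                try:
                    leftover[0] = next(data_source)
                except StopIteration:
                    return  # no more data means no more segments
            yield one_segment()

    print([''.join(s) for s in split_into_segments(iter(['123', '45678', '9']))])
    # -> ['1234', '5678', '9']

Note also how segments after the first get their own ring placement: the ring name is '<object>/<put timestamp>/<segment number>' (the ring_object_name logic above), so a very large object's segments spread across the cluster. The new Application defaults work out to 100 GiB for max_object_size and 2**31 - 1 bytes for segment_size.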
2951=== modified file 'test/functional/sample.conf'
2952--- test/functional/sample.conf 2010-09-09 17:24:25 +0000
2953+++ test/functional/sample.conf 2010-11-08 18:51:48 +0000
2954@@ -1,8 +1,15 @@
2955-# sample config
2956 auth_host = 127.0.0.1
2957 auth_port = 11000
2958 auth_ssl = no
2959
2960+# The maximum object size the cluster allows (set in the proxy server's conf)
2961+max_object_size = 107374182400
2962+
2963+# The file segment size for the cluster (set in the proxy server's conf)
2964+# Set to 0 for no segment size testing (recommended if the segment size is
2965+# quite large and you don't want to spend the time testing it)
2966+segment_size = 0
2967+
2968 # Primary functional test account (needs admin access to the account)
2969 account = test
2970 username = tester
2971
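These two settings let the functional tests adapt to the cluster under test; tests.py reads them with plain dict-style lookups and falls back to the defaults documented above. A hedged sketch of that consumption (config here is a stand-in for however sample.conf gets parsed):

    config = {'max_object_size': '107374182400', 'segment_size': '0'}

    max_object_size = int(config.get('max_object_size', 107374182400))
    segment_size = int(config.get('segment_size', 0))
    if not segment_size:
        print('segment_size is 0: skipping segmented-upload tests')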
2972=== modified file 'test/functional/tests.py'
2973--- test/functional/tests.py 2010-10-29 20:30:34 +0000
2974+++ test/functional/tests.py 2010-11-08 18:51:48 +0000
2975@@ -1092,7 +1092,7 @@
2976 self.assert_(file.read(hdrs={'Range': r}) == data[0:1000])
2977
2978 def testFileSizeLimit(self):
2979- limit = 5*2**30 + 2
2980+ limit = int(config.get('max_object_size', 107374182400))
2981 tsecs = 3
2982
2983 for i in (limit-100, limit-10, limit-1, limit, limit+1, limit+10,
2984
2985=== modified file 'test/probe/common.py'
2986--- test/probe/common.py 2010-09-12 00:03:09 +0000
2987+++ test/probe/common.py 2010-11-08 18:51:48 +0000
2988@@ -88,7 +88,7 @@
2989 for p in ps:
2990 p.wait()
2991 ps = []
2992- for job in ('container-updater', 'object-updater'):
2993+ for job in ('container-updater', 'object-janitor'):
2994 for n in xrange(1, 5):
2995 ps.append(Popen(['swift-%s' % job,
2996 '/etc/swift/%s-server/%d.conf' %
2997
2998=== modified file 'test/probe/test_object_async_update.py'
2999--- test/probe/test_object_async_update.py 2010-09-06 04:06:16 +0000
3000+++ test/probe/test_object_async_update.py 2010-11-08 18:51:48 +0000
3001@@ -55,7 +55,7 @@
3002 self.account, container)[1])
3003 ps = []
3004 for n in xrange(1, 5):
3005- ps.append(Popen(['swift-object-updater',
3006+ ps.append(Popen(['swift-object-janitor',
3007 '/etc/swift/object-server/%d.conf' % n, 'once']))
3008 for p in ps:
3009 p.wait()
3010
3011=== modified file 'test/unit/common/test_constraints.py'
3012--- test/unit/common/test_constraints.py 2010-08-16 22:30:27 +0000
3013+++ test/unit/common/test_constraints.py 2010-11-08 18:51:48 +0000
3014@@ -90,22 +90,20 @@
3015 headers=headers), 'object'), HTTPBadRequest))
3016
3017 def test_check_object_creation_content_length(self):
3018- headers = {'Content-Length': str(constraints.MAX_FILE_SIZE),
3019- 'Content-Type': 'text/plain'}
3020+ headers = {'Content-Length': '1024', 'Content-Type': 'text/plain'}
3021 self.assertEquals(constraints.check_object_creation(Request.blank('/',
3022- headers=headers), 'object_name'), None)
3023- headers = {'Content-Length': str(constraints.MAX_FILE_SIZE + 1),
3024- 'Content-Type': 'text/plain'}
3025+ headers=headers), 'object_name', 1024), None)
3026+ headers = {'Content-Length': '1025', 'Content-Type': 'text/plain'}
3027 self.assert_(isinstance(constraints.check_object_creation(
3028- Request.blank('/', headers=headers), 'object_name'),
3029+ Request.blank('/', headers=headers), 'object_name', 1024),
3030 HTTPRequestEntityTooLarge))
3031 headers = {'Transfer-Encoding': 'chunked',
3032 'Content-Type': 'text/plain'}
3033 self.assertEquals(constraints.check_object_creation(Request.blank('/',
3034- headers=headers), 'object_name'), None)
3035+ headers=headers), 'object_name', 1024), None)
3036 headers = {'Content-Type': 'text/plain'}
3037 self.assert_(isinstance(constraints.check_object_creation(
3038- Request.blank('/', headers=headers), 'object_name'),
3039+ Request.blank('/', headers=headers), 'object_name', 1024),
3040 HTTPLengthRequired))
3041
3042 def test_check_object_creation_name_length(self):
3043
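The constraint change these tests exercise: check_object_creation now takes the cluster's maximum object size as a third argument (the proxy would pass its configured max_object_size) instead of consulting the old module-level MAX_FILE_SIZE. A sketch of the new calling convention, assuming this branch is on the Python path:

    from webob import Request
    from swift.common import constraints

    req = Request.blank('/', headers={'Content-Length': '1025',
                                      'Content-Type': 'text/plain'})
    # Returns None when the request is acceptable, or an error response
    # (here HTTPRequestEntityTooLarge, since 1025 > 1024) to send back.
    error_response = constraints.check_object_creation(req, 'object_name', 1024)
    if error_response is not None:
        print(error_response.status)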
3044=== added file 'test/unit/obj/test_diskfile.py'
3045--- test/unit/obj/test_diskfile.py 1970-01-01 00:00:00 +0000
3046+++ test/unit/obj/test_diskfile.py 2010-11-08 18:51:48 +0000
3047@@ -0,0 +1,203 @@
3048+# Copyright (c) 2010 OpenStack, LLC.
3049+#
3050+# Licensed under the Apache License, Version 2.0 (the "License");
3051+# you may not use this file except in compliance with the License.
3052+# You may obtain a copy of the License at
3053+#
3054+# http://www.apache.org/licenses/LICENSE-2.0
3055+#
3056+# Unless required by applicable law or agreed to in writing, software
3057+# distributed under the License is distributed on an "AS IS" BASIS,
3058+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
3059+# implied.
3060+# See the License for the specific language governing permissions and
3061+# limitations under the License.
3062+
3063+import cPickle as pickle
3064+import os
3065+import sys
3066+import unittest
3067+from nose import SkipTest
3068+from shutil import rmtree
3069+from StringIO import StringIO
3070+from time import time
3071+
3072+from xattr import setxattr
3073+
3074+from swift.common.constraints import PICKLE_PROTOCOL
3075+from swift.common.utils import mkdirs, normalize_timestamp
3076+from swift.obj.diskfile import DiskFile, hash_suffix, JANITORDIR, \
3077+ METADATA_KEY
3078+
3079+
3080+class TestDiskFile(unittest.TestCase):
3081+ """ Test swift.obj.diskfile """
3082+
3083+ def setUp(self):
3084+ """ Set up for testing swift.obj.diskfile """
3085+ self.path_to_test_xfs = os.environ.get('PATH_TO_TEST_XFS')
3086+ if not self.path_to_test_xfs or \
3087+ not os.path.exists(self.path_to_test_xfs):
3088+ print >> sys.stderr, 'WARNING: PATH_TO_TEST_XFS not set or not ' \
3089+ 'pointing to a valid directory.\n' \
3090+ 'Please set PATH_TO_TEST_XFS to a directory on an XFS file ' \
3091+ 'system for testing.'
3092+ self.testdir = '/tmp/SWIFTUNITTEST'
3093+ else:
3094+ self.testdir = os.path.join(self.path_to_test_xfs,
3095+ 'tmp_test_obj_diskfile')
3096+ mkdirs(self.testdir)
3097+ rmtree(self.testdir)
3098+ mkdirs(os.path.join(self.testdir, 'sda1'))
3099+ mkdirs(os.path.join(self.testdir, 'sda1', 'tmp'))
3100+
3101+ def tearDown(self):
3102+ """ Tear down for testing swift.obj.diskfile """
3103+ rmtree(self.testdir)
3104+
3105+ def test_disk_file_app_iter_corners(self):
3106+ if not self.path_to_test_xfs:
3107+ raise SkipTest
3108+ df = DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o')
3109+ mkdirs(df.datadir)
3110+ f = open(os.path.join(df.datadir,
3111+ normalize_timestamp(time()) + '.data'), 'wb')
3112+ f.write('1234567890')
3113+ setxattr(f.fileno(), METADATA_KEY, pickle.dumps({}, PICKLE_PROTOCOL))
3114+ f.close()
3115+ df = DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
3116+ keep_data_fp=True)
3117+ it = df.app_iter_range(0, None)
3118+ sio = StringIO()
3119+ for chunk in it:
3120+ sio.write(chunk)
3121+ self.assertEquals(sio.getvalue(), '1234567890')
3122+
3123+ df = DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
3124+ keep_data_fp=True)
3125+ it = df.app_iter_range(5, None)
3126+ sio = StringIO()
3127+ for chunk in it:
3128+ sio.write(chunk)
3129+ self.assertEquals(sio.getvalue(), '67890')
3130+
3131+ def test_disk_file_mkstemp_creates_dir(self):
3132+ tmpdir = os.path.join(self.testdir, 'sda1', 'tmp')
3133+ os.rmdir(tmpdir)
3134+ with DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o').mkstemp():
3135+ self.assert_(os.path.exists(tmpdir))
3136+
3137+ def test_hash_suffix_creates_janitor_jobs(self):
3138+ if not self.path_to_test_xfs:
3139+ raise SkipTest
3140+ # Ensure the janitor job we expect to create is not there right now.
3141+ df = DiskFile(self.testdir, 'sda1', '0', 'Segment-Cleanup',
3142+ normalize_timestamp(1), 'a/c/o', datadir=JANITORDIR)
3143+ self.assert_(df.is_deleted())
3144+ # Set up manifest file to be tombstoned
3145+ df = DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o')
3146+ with df.mkstemp() as (fd, tmppath):
3147+ os.write(fd, pickle.dumps({'x-timestamp': normalize_timestamp(1),
3148+ 'content-length': 1234, 'content-type': 'text/plain',
3149+ 'x-segment-size': 123,
3150+ 'etag': 'd41d8cd98f00b204e9800998ecf8427e'},
3151+ protocol=PICKLE_PROTOCOL))
3152+ df.put(fd, tmppath, {'X-Timestamp': normalize_timestamp(2),
3153+ 'X-Object-Type': 'manifest'})
3154+ # Make a tombstone DiskFile didn't create, such as one rsynced over.
3155+ open(os.path.join(df.datadir, normalize_timestamp(3) + '.ts'), 'wb')
3156+ # Finally, we call what we want to test.
3157+ hash_suffix(os.path.dirname(df.datadir), 604800)
3158+ # Ensure the janitor job got created.
3159+ df = DiskFile(self.testdir, 'sda1', '0', 'Segment-Cleanup',
3160+ normalize_timestamp(1), 'a/c/o', datadir=JANITORDIR)
3161+ self.assert_(not df.is_deleted())
3162+
3163+ def test_segment_info_overrides_datadir(self):
3164+ if not self.path_to_test_xfs:
3165+ raise SkipTest
3166+ df = DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o')
3167+ datadir1 = df.datadir.split('/')[6]
3168+ df = DiskFile(self.testdir, 'sda1', '1', 'a', 'c', 'o')
3169+ datadir2 = df.datadir.split('/')[6]
3170+ df = DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o', segment=0)
3171+ datadir3 = df.datadir.split('/')[6]
3172+ self.assertEquals(datadir1, datadir2)
3173+ self.assertNotEquals(datadir1, datadir3)
3174+
3175+ def test_no_longer_segment(self):
3176+ if not self.path_to_test_xfs:
3177+ raise SkipTest
3178+ # Normal case
3179+ df = DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o')
3180+ datadir1 = df.datadir
3181+ with df.mkstemp() as (fd, tmppath):
3182+ df.put(fd, tmppath, {'X-Timestamp': normalize_timestamp(1)})
3183+ self.assert_(os.path.exists(datadir1))
3184+ # Normal case with no_longer_segment (doesn't make sense, but shouldn't
3185+ # blow up)
3186+ df = DiskFile(self.testdir, 'sda1', '1', 'a', 'c', 'o')
3187+ datadir1 = df.datadir
3188+ with df.mkstemp() as (fd, tmppath):
3189+ df.put(fd, tmppath, {'X-Timestamp': normalize_timestamp(1)},
3190+ no_longer_segment=True)
3191+ self.assert_(os.path.exists(datadir1))
3192+ # Segment case
3193+ df = DiskFile(self.testdir, 'sda1', '2', 'a', 'c', 'o', segment=0)
3194+ datadir1 = df.datadir
3195+ with df.mkstemp() as (fd, tmppath):
3196+ df.put(fd, tmppath, {'X-Timestamp': normalize_timestamp(1)})
3197+ self.assert_(os.path.exists(datadir1))
3198+ # Segment case with no_longer_segment
3199+ df = DiskFile(self.testdir, 'sda1', '3', 'a', 'c', 'o')
3200+ normal_datadir1 = df.datadir
3201+ self.assert_(not os.path.exists(normal_datadir1))
3202+ df = DiskFile(self.testdir, 'sda1', '3', 'a', 'c', 'o', segment=0)
3203+ datadir1 = df.datadir
3204+ with df.mkstemp() as (fd, tmppath):
3205+ df.put(fd, tmppath, {'X-Timestamp': normalize_timestamp(1)},
3206+ no_longer_segment=True)
3207+ self.assert_(not os.path.exists(datadir1))
3208+ self.assert_(os.path.exists(normal_datadir1))
3209+
3210+ def test_tombstone(self):
3211+ if not self.path_to_test_xfs:
3212+ raise SkipTest
3213+ df = DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o')
3214+ with df.mkstemp() as (fd, tmppath):
3215+ df.put(fd, tmppath, {'X-Timestamp': normalize_timestamp(1)})
3216+ df = DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o')
3217+ self.assert_(not df.is_deleted())
3218+ df = DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o')
3219+ df.tombstone(normalize_timestamp(2))
3220+ df = DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o')
3221+ self.assert_(df.is_deleted())
3222+
3223+ def test_store_janitor_container_update(self):
3224+ if not self.path_to_test_xfs:
3225+ raise SkipTest
3226+ df = DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
3227+ datadir=JANITORDIR)
3228+ df.store_janitor_container_update('PUT', 'a', 'c', 'o',
3229+ {'X-Timestamp': normalize_timestamp(1)}, [])
3230+ df = DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
3231+ datadir=JANITORDIR)
3232+ self.assertEquals(pickle.load(open(df.data_file, 'rb')),
3233+ {'op': 'PUT', 'account': 'a', 'container': 'c', 'obj': 'o',
3234+ 'headers': {'X-Timestamp': '0000000001.00000'}, 'successes': []})
3235+
3236+ def test_store_janitor_segment_cleanup(self):
3237+ if not self.path_to_test_xfs:
3238+ raise SkipTest
3239+ df = DiskFile(self.testdir, 'sda1', '0', 'Segment-Cleanup',
3240+ normalize_timestamp(1), 'a/c/o', datadir=JANITORDIR)
3241+ df.store_janitor_segment_cleanup('a', 'c', 'o', 123, 45)
3242+ df = DiskFile(self.testdir, 'sda1', '0', 'Segment-Cleanup',
3243+ normalize_timestamp(1), 'a/c/o', datadir=JANITORDIR)
3244+ self.assertEquals(pickle.load(open(df.data_file, 'rb')),
3245+ {'account': 'a', 'container': 'c', 'obj': 'o',
3246+ 'segment_count': 123, 'segment_last_deleted': 45})
3247+
3248+
3249+if __name__ == '__main__':
3250+ unittest.main()
3251
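For readers new to the on-disk format these tests poke at: an object's metadata travels with its .data file as a pickled dict in an extended attribute, which is why the tests insist on an XFS mount (PATH_TO_TEST_XFS). A standalone sketch of that storage pattern; the real tests import METADATA_KEY and PICKLE_PROTOCOL from the modules above, so treat the literals here as illustrative assumptions:

    import cPickle as pickle
    from xattr import getxattr, setxattr

    METADATA_KEY = 'user.swift.metadata'   # assumed value, for illustration
    PICKLE_PROTOCOL = 2                    # assumed value, for illustration

    path = '/tmp/example.data'             # needs an xattr-capable filesystem
    with open(path, 'wb') as f:
        f.write('1234567890')
        setxattr(f.fileno(), METADATA_KEY,
                 pickle.dumps({'X-Timestamp': '0000000001.00000'},
                              PICKLE_PROTOCOL))

    print(pickle.loads(getxattr(path, METADATA_KEY)))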
3252=== added file 'test/unit/obj/test_janitor.py'
3253--- test/unit/obj/test_janitor.py 1970-01-01 00:00:00 +0000
3254+++ test/unit/obj/test_janitor.py 2010-11-08 18:51:48 +0000
3255@@ -0,0 +1,433 @@
3256+# Copyright (c) 2010 OpenStack, LLC.
3257+#
3258+# Licensed under the Apache License, Version 2.0 (the "License");
3259+# you may not use this file except in compliance with the License.
3260+# You may obtain a copy of the License at
3261+#
3262+# http://www.apache.org/licenses/LICENSE-2.0
3263+#
3264+# Unless required by applicable law or agreed to in writing, software
3265+# distributed under the License is distributed on an "AS IS" BASIS,
3266+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
3267+# implied.
3268+# See the License for the specific language governing permissions and
3269+# limitations under the License.
3270+
3271+import cPickle as pickle
3272+import os
3273+import sys
3274+import traceback
3275+import unittest
3276+from gzip import GzipFile
3277+from nose import SkipTest
3278+from shutil import rmtree
3279+from time import time
3280+
3281+from eventlet import spawn, TimeoutError, listen
3282+from eventlet.timeout import Timeout
3283+
3284+from swift.common.constraints import PICKLE_PROTOCOL
3285+from swift.common.ring import Ring, RingData
3286+from swift.common import utils
3287+from swift.common.utils import hash_path, normalize_timestamp, mkdirs
3288+from swift.obj import janitor as object_janitor
3289+from swift.obj.diskfile import DiskFile, JANITORDIR
3290+
3291+
3292+class TestObjectJanitor(unittest.TestCase):
3293+
3294+ def setUp(self):
3295+ utils.HASH_PATH_SUFFIX = 'endcap'
3296+ self.path_to_test_xfs = os.environ.get('PATH_TO_TEST_XFS')
3297+ if not self.path_to_test_xfs or \
3298+ not os.path.exists(self.path_to_test_xfs):
3299+ print >> sys.stderr, 'WARNING: PATH_TO_TEST_XFS not set or not ' \
3300+ 'pointing to a valid directory.\n' \
3301+ 'Please set PATH_TO_TEST_XFS to a directory on an XFS file ' \
3302+ 'system for testing.'
3303+ self.testdir = '/tmp/SWIFTUNITTEST'
3304+ else:
3305+ self.testdir = os.path.join(self.path_to_test_xfs,
3306+ 'tmp_test_object_server_ObjectController')
3307+ rmtree(self.testdir, ignore_errors=1)
3308+ os.mkdir(self.testdir)
3309+ pickle.dump(RingData([[0, 1, 0, 1], [1, 0, 1, 0]],
3310+ [{'id': 0, 'ip': '127.0.0.1', 'port': 1, 'device': 'sda1',
3311+ 'zone': 0},
3312+ {'id': 1, 'ip': '127.0.0.1', 'port': 1, 'device': 'sda1',
3313+ 'zone': 2}], 30),
3314+ GzipFile(os.path.join(self.testdir, 'object.ring.gz'), 'wb'))
3315+ self.object_ring = Ring(os.path.join(self.testdir, 'object.ring.gz'))
3316+ pickle.dump(RingData([[0, 1, 0, 1], [1, 0, 1, 0]],
3317+ [{'id': 0, 'ip': '127.0.0.1', 'port': 1, 'device': 'sda1',
3318+ 'zone': 0},
3319+ {'id': 1, 'ip': '127.0.0.1', 'port': 1, 'device': 'sda1',
3320+ 'zone': 2}], 30),
3321+ GzipFile(os.path.join(self.testdir, 'container.ring.gz'), 'wb'))
3322+ self.container_ring = \
3323+ Ring(os.path.join(self.testdir, 'container.ring.gz'))
3324+ self.devices_dir = os.path.join(self.testdir, 'devices')
3325+ os.mkdir(self.devices_dir)
3326+ self.sda1 = os.path.join(self.devices_dir, 'sda1')
3327+ os.mkdir(self.sda1)
3328+ os.mkdir(os.path.join(self.sda1, 'tmp'))
3329+
3330+ def tearDown(self):
3331+ rmtree(self.testdir, ignore_errors=1)
3332+
3333+ def test_creation(self):
3334+ if not self.path_to_test_xfs:
3335+ raise SkipTest
3336+ janitor = object_janitor.ObjectJanitor({
3337+ 'devices': self.devices_dir,
3338+ 'mount_check': 'false',
3339+ 'swift_dir': self.testdir,
3340+ 'interval': '1',
3341+ 'concurrency': '2',
3342+ 'node_timeout': '5',
3343+ })
3344+ self.assert_(hasattr(janitor, 'logger'))
3345+ self.assert_(janitor.logger is not None)
3346+ self.assertEquals(janitor.devices, self.devices_dir)
3347+ self.assertEquals(janitor.interval, 1)
3348+ self.assertEquals(janitor.concurrency, 2)
3349+ self.assertEquals(janitor.node_timeout, 5)
3350+ self.assert_(janitor.get_container_ring() is not None)
3351+
3352+ def test_run_once_container_update(self):
3353+ if not self.path_to_test_xfs:
3354+ raise SkipTest
3355+ janitor = object_janitor.ObjectJanitor({
3356+ 'devices': self.devices_dir,
3357+ 'mount_check': 'false',
3358+ 'swift_dir': self.testdir,
3359+ 'interval': '1',
3360+ 'concurrency': '1',
3361+ 'node_timeout': '15',
3362+ })
3363+ janitor.run_once()
3364+ janitor_dir = os.path.join(self.sda1, JANITORDIR)
3365+ os.mkdir(janitor_dir)
3366+ janitor.run_once()
3367+ self.assert_(os.path.exists(janitor_dir))
3368+
3369+ disk_file = DiskFile(self.devices_dir, 'sda1',
3370+ str(self.container_ring.get_nodes('a', 'c', 'o')[0]), 'a', 'c',
3371+ 'o', datadir=JANITORDIR)
3372+ ts = normalize_timestamp(1)
3373+ with disk_file.mkstemp() as (fd, tmppath):
3374+ os.write(fd, pickle.dumps({'op': 'PUT',
3375+ 'account': 'a', 'container': 'c', 'obj': 'o',
3376+ 'headers': {'X-Container-Timestamp': ts}},
3377+ PICKLE_PROTOCOL))
3378+ disk_file.put(fd, tmppath,
3379+ {'X-Op': 'Container-Update', 'X-Timestamp': ts})
3380+ janitor.run_once()
3381+ self.assert_(os.path.exists(os.path.join(disk_file.datadir,
3382+ ts + '.data')))
3383+
3384+ bindsock = listen(('127.0.0.1', 0))
3385+
3386+ def accepter(sock, return_code):
3387+ try:
3388+ with Timeout(3):
3389+ inc = sock.makefile('rb')
3390+ out = sock.makefile('wb')
3391+ out.write('HTTP/1.1 %d OK\r\nContent-Length: 0\r\n\r\n' %
3392+ return_code)
3393+ out.flush()
3394+ self.assertEquals(inc.readline(),
3395+ 'PUT /sda1/0/a/c/o HTTP/1.1\r\n')
3396+ headers = {}
3397+ line = inc.readline()
3398+ while line and line != '\r\n':
3399+ headers[line.split(':')[0].lower()] = \
3400+ line.split(':')[1].strip()
3401+ line = inc.readline()
3402+ self.assert_('x-container-timestamp' in headers)
3403+ except BaseException, err:
3404+ return err
3405+ return None
3406+
3407+ def accept(return_codes):
3408+ codes = iter(return_codes)
3409+ try:
3410+ events = []
3411+ for x in xrange(len(return_codes)):
3412+ with Timeout(3):
3413+ sock, addr = bindsock.accept()
3414+ events.append(
3415+ spawn(accepter, sock, codes.next()))
3416+ for event in events:
3417+ err = event.wait()
3418+ if err:
3419+ raise err
3420+ except BaseException, err:
3421+ return err
3422+ return None
3423+
3424+ event = spawn(accept, [201, 500])
3425+ for dev in janitor.get_container_ring().devs:
3426+ if dev is not None:
3427+ dev['port'] = bindsock.getsockname()[1]
3428+ janitor.run_once()
3429+ err = event.wait()
3430+ if err:
3431+ raise err
3432+ disk_file = DiskFile(self.devices_dir, 'sda1',
3433+ str(self.container_ring.get_nodes('a', 'c', 'o')[0]), 'a', 'c',
3434+ 'o', datadir=JANITORDIR)
3435+ self.assert_(not disk_file.is_deleted())
3436+ event = spawn(accept, [201])
3437+ janitor.run_once()
3438+ err = event.wait()
3439+ if err:
3440+ raise err
3441+ disk_file = DiskFile(self.devices_dir, 'sda1',
3442+ str(self.container_ring.get_nodes('a', 'c', 'o')[0]), 'a', 'c',
3443+ 'o', datadir=JANITORDIR)
3444+ self.assert_(disk_file.is_deleted())
3445+
3446+ def _segment_cleanup_in_progress_helper(self, statuses, expect_success):
3447+ if not self.path_to_test_xfs:
3448+ raise SkipTest
3449+ janitor = object_janitor.ObjectJanitor({'devices': self.devices_dir,
3450+ 'mount_check': 'false', 'swift_dir': self.testdir, 'interval': '1',
3451+ 'concurrency': '1', 'node_timeout': '15'})
3452+
3453+ # Quick test of connection refusals
3454+ df = DiskFile(self.devices_dir, 'sda1',
3455+ str(self.object_ring.get_nodes('a', 'c', 'o')[0]),
3456+ 'Segment-Cleanup', normalize_timestamp(1), 'a/c/o',
3457+ datadir=JANITORDIR)
3458+ df.store_janitor_segment_cleanup('a', 'c', 'o', 2, 0)
3459+ janitor.run_once()
3460+ df = DiskFile(self.devices_dir, 'sda1',
3461+ str(self.object_ring.get_nodes('a', 'c', 'o')[0]),
3462+ 'Segment-Cleanup', normalize_timestamp(1), 'a/c/o',
3463+ datadir=JANITORDIR)
3464+ self.assert_(not df.is_deleted())
3465+
3466+ bindsock = listen(('127.0.0.1', 0))
3467+ janitor.port = bindsock.getsockname()[1]
3468+
3469+ def accepter(sock, return_code):
3470+ try:
3471+ with Timeout(3):
3472+ inc = sock.makefile('rb')
3473+ out = sock.makefile('wb')
3474+ out.write('HTTP/1.1 %d OK\r\nContent-Length: 0\r\n\r\n' %
3475+ return_code)
3476+ out.flush()
3477+ self.assertEquals(inc.readline(),
3478+ 'DELETE /sda1/2/a/c/o HTTP/1.1\r\n')
3479+ headers = {}
3480+ line = inc.readline()
3481+ while line and line != '\r\n':
3482+ headers[line.split(':')[0].lower()] = \
3483+ line.split(':')[1].strip()
3484+ line = inc.readline()
3485+ self.assert_('x-object-segment-timestamp' in headers)
3486+ self.assertEquals(headers.get('x-object-segment'), '1')
3487+ except BaseException, err:
3488+ return err
3489+ return None
3490+
3491+ def accept(return_codes):
3492+ codes = iter(return_codes)
3493+ try:
3494+ events = []
3495+ for x in xrange(len(return_codes)):
3496+ with Timeout(3):
3497+ sock, addr = bindsock.accept()
3498+ events.append(
3499+ spawn(accepter, sock, codes.next()))
3500+ for event in events:
3501+ err = event.wait()
3502+ if err:
3503+ raise err
3504+ except BaseException, err:
3505+ return err
3506+ return None
3507+
3508+ event = spawn(accept, statuses)
3509+ for dev in janitor.get_object_ring().devs:
3510+ if dev is not None:
3511+ dev['port'] = bindsock.getsockname()[1]
3512+ janitor.run_once()
3513+ err = event.wait()
3514+ if err:
3515+ raise err
3516+ df = DiskFile(self.devices_dir, 'sda1',
3517+ str(self.object_ring.get_nodes('a', 'c', 'o')[0]),
3518+ 'Segment-Cleanup', normalize_timestamp(1), 'a/c/o',
3519+ datadir=JANITORDIR)
3520+ self.assertEquals(df.is_deleted(), expect_success)
3521+
3522+ def test_segment_cleanup_in_progress_happy_path(self):
3523+ self._segment_cleanup_in_progress_helper([204, 204], True)
3524+
3525+ def test_segment_cleanup_in_progress_one_failure(self):
3526+ self._segment_cleanup_in_progress_helper([204, 500], True)
3527+
3528+ def test_segment_cleanup_in_progress_all_failures(self):
3529+ self._segment_cleanup_in_progress_helper([500, 500], False)
3530+
3531+ def test_segment_cleanup_in_progress_all_not_found(self):
3532+ self._segment_cleanup_in_progress_helper([404, 404], True)
3533+
3534+ def test_segment_cleanup_in_progress_one_not_found_one_success(self):
3535+ self._segment_cleanup_in_progress_helper([404, 204], True)
3536+
3537+ def test_segment_cleanup_in_progress_one_not_found_one_failure(self):
3538+ self._segment_cleanup_in_progress_helper([404, 500], False)
3539+
3540+ def _segment_cleanup_fresh_start_helper(self, cleanup_timestamp,
3541+ existing_timestamp, statuses, expect_success):
3542+ if not self.path_to_test_xfs:
3543+ raise SkipTest
3544+ janitor = object_janitor.ObjectJanitor({'devices': self.devices_dir,
3545+ 'mount_check': 'false', 'swift_dir': self.testdir, 'interval': '1',
3546+ 'concurrency': '1', 'node_timeout': '15', 'segments_per_pass': 2})
3547+
3548+ # Quick test of connection refusals
3549+ df = DiskFile(self.devices_dir, 'sda1',
3550+ str(self.object_ring.get_nodes('a', 'c', 'o')[0]),
3551+ 'Segment-Cleanup', normalize_timestamp(cleanup_timestamp), 'a/c/o',
3552+ datadir=JANITORDIR)
3553+ df.store_janitor_segment_cleanup('a', 'c', 'o', None, None)
3554+ janitor.run_once()
3555+ df = DiskFile(self.devices_dir, 'sda1',
3556+ str(self.object_ring.get_nodes('a', 'c', 'o')[0]),
3557+ 'Segment-Cleanup', normalize_timestamp(cleanup_timestamp), 'a/c/o',
3558+ datadir=JANITORDIR)
3559+ self.assert_(not df.is_deleted())
3560+
3561+ bindsock = listen(('127.0.0.1', 0))
3562+ janitor.port = bindsock.getsockname()[1]
3563+
3564+ def accepter(sock, return_code):
3565+ try:
3566+ with Timeout(3):
3567+ inc = sock.makefile('rb')
3568+ out = sock.makefile('wb')
3569+ request = inc.readline()
3570+ if request.startswith('GET '):
3571+ self.assertEquals(request, 'GET /sda1/0/a/c/o HTTP/1.1\r\n')
3572+ if return_code == 200:
3573+ pickl = pickle.dumps({'x-timestamp':
3574+ normalize_timestamp(existing_timestamp)})
3575+ out.write('HTTP/1.1 %d OK\r\nContent-Length: '
3576+ '%s\r\nX-Object-Type: manifest\r\n\r\n' %
3577+ (return_code, len(pickl)))
3578+ out.write(pickl)
3579+ out.flush()
3580+ else:
3581+ out.write('HTTP/1.1 %d OK\r\nContent-Length: '
3582+ '0\r\n\r\n' % return_code)
3583+ out.flush()
3584+ return None
3585+ if request.startswith('HEAD '):
3586+ self.assertEquals(request,
3587+ 'HEAD /sda1/2/a/c/o HTTP/1.1\r\n')
3588+ out.write('HTTP/1.1 %d OK\r\nContent-Length: '
3589+ '0\r\n\r\n' % return_code)
3590+ out.flush()
3591+ return None
3592+ else:
3593+ self.assertEquals(request,
3594+ 'DELETE /sda1/2/a/c/o HTTP/1.1\r\n')
3595+ out.write(
3596+ 'HTTP/1.1 %d OK\r\nContent-Length: 0\r\n\r\n' %
3597+ return_code)
3598+ out.flush()
3599+ headers = {}
3600+ line = inc.readline()
3601+ while line and line != '\r\n':
3602+ headers[line.split(':')[0].lower()] = \
3603+ line.split(':')[1].strip()
3604+ line = inc.readline()
3605+ self.assert_('x-object-segment' in headers)
3606+ self.assert_('x-object-segment-timestamp' in headers)
3607+ except BaseException, err:
3608+ traceback.print_exc()
3609+ return err
3610+ return None
3611+
3612+ def accept(return_codes):
3613+ codes = iter(return_codes)
3614+ try:
3615+ events = []
3616+ for x in xrange(len(return_codes)):
3617+ with Timeout(3):
3618+ sock, addr = bindsock.accept()
3619+ events.append(
3620+ spawn(accepter, sock, codes.next()))
3621+ for event in events:
3622+ err = event.wait()
3623+ if err:
3624+ raise err
3625+ except BaseException, err:
3626+ return err
3627+ return None
3628+
3629+ event = spawn(accept, statuses)
3630+ for dev in janitor.get_object_ring().devs:
3631+ if dev is not None:
3632+ dev['port'] = bindsock.getsockname()[1]
3633+ janitor.run_once()
3634+ err = event.wait()
3635+ if err:
3636+ raise err
3637+ df = DiskFile(self.devices_dir, 'sda1',
3638+ str(self.object_ring.get_nodes('a', 'c', 'o')[0]),
3639+ 'Segment-Cleanup', normalize_timestamp(cleanup_timestamp), 'a/c/o',
3640+ datadir=JANITORDIR)
3641+ self.assertEquals(df.is_deleted(), expect_success)
3642+
3643+ def test_segment_cleanup_fresh_start_happy_path(self):
3644+ self._segment_cleanup_fresh_start_helper(normalize_timestamp(1),
3645+ normalize_timestamp(1),
3646+ [404, 404, # Check for manifest
3647+ 200, 200, 404, 404, # Check for segments (2 total)
3648+ 204, 204, 204, 204], # Delete segments
3649+ True)
3650+
3651+ def test_segment_cleanup_fresh_start_manifest_exists(self):
3652+ t = time()
3653+ self._segment_cleanup_fresh_start_helper(normalize_timestamp(t),
3654+ normalize_timestamp(t), [200, 200], True)
3655+
3656+ def test_segment_cleanup_fresh_start_old_manifest_exists(self):
3657+ self._segment_cleanup_fresh_start_helper(normalize_timestamp(2),
3658+ normalize_timestamp(1),
3659+ [200, 200, # Check for manifest
3660+ 200, 200, 404, 404, # Check for segments (2 total)
3661+ 204, 204, 204, 204], # Delete segments
3662+ True)
3663+
3664+ def test_segment_cleanup_fresh_start_old_manifest_exists2(self):
3665+ self._segment_cleanup_fresh_start_helper(normalize_timestamp(time()),
3666+ normalize_timestamp(1), [200, 200], False)
3667+
3668+ def test_segment_cleanup_fresh_start_new_manifest_exists(self):
3669+ t = time()
3670+ self._segment_cleanup_fresh_start_helper(normalize_timestamp(t - 1),
3671+ normalize_timestamp(t),
3672+ [200, # Check for manifest
3673+ 200, 200, 404, 404, # Check for segments (2 total)
3674+ 204, 204, 204, 204], # Delete segments
3675+ True)
3676+
3677+ def test_segment_cleanup_fresh_start_stops_at_segments_per_pass(self):
3678+ self._segment_cleanup_fresh_start_helper(normalize_timestamp(1),
3679+ normalize_timestamp(1),
3680+ [404, 404, # Check for manifest
3681+ 200, 200, 200, 404, 404, # Check for segments (3 total)
3682+ 204, 204, 204, 204], # Delete segments (only 2 expected)
3683+ False)
3684+
3685+
3687+if __name__ == '__main__':
3688+ unittest.main()
3689
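The harness these janitor tests share is worth calling out: they bind a real socket with eventlet, script one HTTP status per expected request, and point every ring device's port at the bound socket, so the janitor talks to "the cluster" without any real servers running. A condensed, self-contained sketch of the same idea (illustrative only, not the test code itself):

    from eventlet import connect, listen, spawn
    from eventlet.timeout import Timeout

    def serve_scripted_statuses(bindsock, statuses):
        # Answer each incoming connection with the next scripted status code.
        for status in statuses:
            with Timeout(3):
                sock, addr = bindsock.accept()
            out = sock.makefile('wb')
            out.write('HTTP/1.1 %d OK\r\nContent-Length: 0\r\n\r\n' % status)
            out.flush()

    bindsock = listen(('127.0.0.1', 0))
    server = spawn(serve_scripted_statuses, bindsock, [204, 500])
    # A test would instead point ring devs at bindsock.getsockname()[1]
    # and call janitor.run_once(); here we just connect directly.
    for expected in (204, 500):
        client = connect(bindsock.getsockname())
        print(client.recv(64).split(' ')[1])  # prints 204, then 500
    server.wait()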
3690=== modified file 'test/unit/obj/test_server.py'
3691--- test/unit/obj/test_server.py 2010-10-13 21:26:43 +0000
3692+++ test/unit/obj/test_server.py 2010-11-08 18:51:48 +0000
3693@@ -13,7 +13,7 @@
3694 # See the License for the specific language governing permissions and
3695 # limitations under the License.
3696
3697-""" Tests for swift.object_server """
3698+""" Tests for swift.obj.server """
3699
3700 import cPickle as pickle
3701 import os
3702@@ -22,23 +22,25 @@
3703 from nose import SkipTest
3704 from shutil import rmtree
3705 from StringIO import StringIO
3706-from time import gmtime, sleep, strftime, time
3707+from time import gmtime, strftime, time
3708
3709 from eventlet import sleep, spawn, wsgi, listen
3710 from webob import Request
3711-from xattr import getxattr, setxattr
3712+from xattr import getxattr
3713
3714 from test.unit import connect_tcp, readuntil2crlfs
3715+from swift.common.constraints import PICKLE_PROTOCOL
3716 from swift.obj import server as object_server
3717+from swift.obj.diskfile import DATADIR, DiskFile, JANITORDIR, METADATA_KEY
3718 from swift.common.utils import hash_path, mkdirs, normalize_timestamp, \
3719 NullLogger, storage_directory
3720
3721
3722 class TestObjectController(unittest.TestCase):
3723- """ Test swift.object_server.ObjectController """
3724+ """ Test swift.obj.server.ObjectController """
3725
3726 def setUp(self):
3727- """ Set up for testing swift.object_server.ObjectController """
3728+ """ Set up for testing swift.obj.server.ObjectController """
3729 self.path_to_test_xfs = os.environ.get('PATH_TO_TEST_XFS')
3730 if not self.path_to_test_xfs or \
3731 not os.path.exists(self.path_to_test_xfs):
3732@@ -49,7 +51,7 @@
3733 self.testdir = '/tmp/SWIFTUNITTEST'
3734 else:
3735 self.testdir = os.path.join(self.path_to_test_xfs,
3736- 'tmp_test_object_server_ObjectController')
3737+ 'tmp_test_obj_server_ObjectController')
3738 mkdirs(self.testdir)
3739 rmtree(self.testdir)
3740 mkdirs(os.path.join(self.testdir, 'sda1'))
3741@@ -59,11 +61,11 @@
3742 self.object_controller.bytes_per_sync = 1
3743
3744 def tearDown(self):
3745- """ Tear down for testing swift.object_server.ObjectController """
3746+ """ Tear down for testing swift.obj.server.ObjectController """
3747 rmtree(self.testdir)
3748
3749 def test_POST_update_meta(self):
3750- """ Test swift.object_server.ObjectController.POST """
3751+ """ Test swift.obj.server.ObjectController.POST """
3752 if not self.path_to_test_xfs:
3753 raise SkipTest
3754 timestamp = normalize_timestamp(time())
3755@@ -221,12 +223,11 @@
3756 resp = self.object_controller.PUT(req)
3757 self.assertEquals(resp.status_int, 201)
3758 objfile = os.path.join(self.testdir, 'sda1',
3759- storage_directory(object_server.DATADIR, 'p',
3760- hash_path('a', 'c', 'o')),
3761+ storage_directory(DATADIR, 'p', hash_path('a', 'c', 'o')),
3762 timestamp + '.data')
3763 self.assert_(os.path.isfile(objfile))
3764 self.assertEquals(open(objfile).read(), 'VERIFY')
3765- self.assertEquals(pickle.loads(getxattr(objfile, object_server.METADATA_KEY)),
3766+ self.assertEquals(pickle.loads(getxattr(objfile, METADATA_KEY)),
3767 {'X-Timestamp': timestamp,
3768 'Content-Length': '6',
3769 'ETag': '0b4c12d7e0a73840c1c4f148fda3b037',
3770@@ -253,12 +254,11 @@
3771 resp = self.object_controller.PUT(req)
3772 self.assertEquals(resp.status_int, 201)
3773 objfile = os.path.join(self.testdir, 'sda1',
3774- storage_directory(object_server.DATADIR, 'p',
3775- hash_path('a', 'c', 'o')),
3776+ storage_directory(DATADIR, 'p', hash_path('a', 'c', 'o')),
3777 timestamp + '.data')
3778 self.assert_(os.path.isfile(objfile))
3779 self.assertEquals(open(objfile).read(), 'VERIFY TWO')
3780- self.assertEquals(pickle.loads(getxattr(objfile, object_server.METADATA_KEY)),
3781+ self.assertEquals(pickle.loads(getxattr(objfile, METADATA_KEY)),
3782 {'X-Timestamp': timestamp,
3783 'Content-Length': '10',
3784 'ETag': 'b381a4c5dab1eaa1eb9711fa647cd039',
3785@@ -299,12 +299,11 @@
3786 resp = self.object_controller.PUT(req)
3787 self.assertEquals(resp.status_int, 201)
3788 objfile = os.path.join(self.testdir, 'sda1',
3789- storage_directory(object_server.DATADIR, 'p',
3790- hash_path('a', 'c', 'o')),
3791+ storage_directory(DATADIR, 'p', hash_path('a', 'c', 'o')),
3792 timestamp + '.data')
3793 self.assert_(os.path.isfile(objfile))
3794 self.assertEquals(open(objfile).read(), 'VERIFY THREE')
3795- self.assertEquals(pickle.loads(getxattr(objfile, object_server.METADATA_KEY)),
3796+ self.assertEquals(pickle.loads(getxattr(objfile, METADATA_KEY)),
3797 {'X-Timestamp': timestamp,
3798 'Content-Length': '12',
3799 'ETag': 'b114ab7b90d9ccac4bd5d99cc7ebb568',
3800@@ -375,7 +374,7 @@
3801 object_server.http_connect = old_http_connect
3802
3803 def test_HEAD(self):
3804- """ Test swift.object_server.ObjectController.HEAD """
3805+ """ Test swift.obj.server.ObjectController.HEAD """
3806 if not self.path_to_test_xfs:
3807 raise SkipTest
3808 req = Request.blank('/sda1/p/a/c')
3809@@ -410,8 +409,7 @@
3810 self.assertEquals(resp.headers['x-object-meta-two'], 'Two')
3811
3812 objfile = os.path.join(self.testdir, 'sda1',
3813- storage_directory(object_server.DATADIR, 'p',
3814- hash_path('a', 'c', 'o')),
3815+ storage_directory(DATADIR, 'p', hash_path('a', 'c', 'o')),
3816 timestamp + '.data')
3817 os.unlink(objfile)
3818 req = Request.blank('/sda1/p/a/c/o')
3819@@ -442,7 +440,7 @@
3820 self.assertEquals(resp.status_int, 404)
3821
3822 def test_GET(self):
3823- """ Test swift.object_server.ObjectController.GET """
3824+ """ Test swift.obj.server.ObjectController.GET """
3825 if not self.path_to_test_xfs:
3826 raise SkipTest
3827 req = Request.blank('/sda1/p/a/c')
3828@@ -500,8 +498,7 @@
3829 self.assertEquals(resp.headers['content-length'], '2')
3830
3831 objfile = os.path.join(self.testdir, 'sda1',
3832- storage_directory(object_server.DATADIR, 'p',
3833- hash_path('a', 'c', 'o')),
3834+ storage_directory(DATADIR, 'p', hash_path('a', 'c', 'o')),
3835 timestamp + '.data')
3836 os.unlink(objfile)
3837 req = Request.blank('/sda1/p/a/c/o')
3838@@ -712,7 +709,7 @@
3839 self.assertEquals(resp.status_int, 200)
3840
3841 def test_DELETE(self):
3842- """ Test swift.object_server.ObjectController.DELETE """
3843+ """ Test swift.obj.server.ObjectController.DELETE """
3844 if not self.path_to_test_xfs:
3845 raise SkipTest
3846 req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'DELETE'})
3847@@ -751,8 +748,7 @@
3848 resp = self.object_controller.DELETE(req)
3849 self.assertEquals(resp.status_int, 204)
3850 objfile = os.path.join(self.testdir, 'sda1',
3851- storage_directory(object_server.DATADIR, 'p',
3852- hash_path('a', 'c', 'o')),
3853+ storage_directory(DATADIR, 'p', hash_path('a', 'c', 'o')),
3854 timestamp + '.ts')
3855 self.assert_(os.path.isfile(objfile))
3856
3857@@ -764,13 +760,12 @@
3858 resp = self.object_controller.DELETE(req)
3859 self.assertEquals(resp.status_int, 204)
3860 objfile = os.path.join(self.testdir, 'sda1',
3861- storage_directory(object_server.DATADIR, 'p',
3862- hash_path('a', 'c', 'o')),
3863+ storage_directory(DATADIR, 'p', hash_path('a', 'c', 'o')),
3864 timestamp + '.ts')
3865 self.assert_(os.path.isfile(objfile))
3866
3867 def test_call(self):
3868- """ Test swift.object_server.ObjectController.__call__ """
3869+ """ Test swift.obj.server.ObjectController.__call__ """
3870 inbuf = StringIO()
3871 errbuf = StringIO()
3872 outbuf = StringIO()
3873@@ -886,39 +881,6 @@
3874 resp = self.object_controller.PUT(req)
3875 self.assertEquals(resp.status_int, 400)
3876
3877- def test_disk_file_app_iter_corners(self):
3878- if not self.path_to_test_xfs:
3879- raise SkipTest
3880- df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o')
3881- mkdirs(df.datadir)
3882- f = open(os.path.join(df.datadir,
3883- normalize_timestamp(time()) + '.data'), 'wb')
3884- f.write('1234567890')
3885- setxattr(f.fileno(), object_server.METADATA_KEY,
3886- pickle.dumps({}, object_server.PICKLE_PROTOCOL))
3887- f.close()
3888- df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
3889- keep_data_fp=True)
3890- it = df.app_iter_range(0, None)
3891- sio = StringIO()
3892- for chunk in it:
3893- sio.write(chunk)
3894- self.assertEquals(sio.getvalue(), '1234567890')
3895-
3896- df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
3897- keep_data_fp=True)
3898- it = df.app_iter_range(5, None)
3899- sio = StringIO()
3900- for chunk in it:
3901- sio.write(chunk)
3902- self.assertEquals(sio.getvalue(), '67890')
3903-
3904- def test_disk_file_mkstemp_creates_dir(self):
3905- tmpdir = os.path.join(self.testdir, 'sda1', 'tmp')
3906- os.rmdir(tmpdir)
3907- with object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o').mkstemp():
3908- self.assert_(os.path.exists(tmpdir))
3909-
3910 def test_max_upload_time(self):
3911 if not self.path_to_test_xfs:
3912 raise SkipTest
3913@@ -1006,6 +968,161 @@
3914 self.assertEquals(resp.status_int, 200)
3915 self.assertEquals(resp.headers['content-encoding'], 'gzip')
3916
3917+ def test_overwritten_manifest(self):
3918+ """
3919+ Ensures a janitor segment cleanup job is created for an overwritten
3920+ manifest.
3921+ """
3922+ if not self.path_to_test_xfs:
3923+ raise SkipTest
3924+ df = DiskFile(self.testdir, 'sda1', '0', 'Segment-Cleanup',
3925+ normalize_timestamp(1), 'a/c/o', datadir=JANITORDIR)
3926+ self.assert_(df.is_deleted())
3927+
3928+ manifest = {'x-timestamp': normalize_timestamp(1),
3929+ 'content-length': 100, 'content-type': 'text/plain',
3930+ 'x-segment-size': 10, 'etag': 'd41d8cd98f00b204e9800998ecf8427e'}
3931+ manifest = pickle.dumps(manifest, protocol=PICKLE_PROTOCOL)
3932+ req = Request.blank('/sda1/0/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
3933+ headers={'X-Timestamp': normalize_timestamp(1),
3934+ 'Content-Length': str(len(manifest)), 'Content-Type': 'text/plain',
3935+ 'X-Object-Type': 'manifest', 'X-Object-Length': '100'},
3936+ body=manifest)
3937+ resp = self.object_controller.PUT(req)
3938+ self.assertEquals(resp.status_int, 201)
3939+
3940+ req = Request.blank('/sda1/0/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
3941+ headers={'X-Timestamp': normalize_timestamp(2),
3942+ 'Content-Length': '1', 'Content-Type': 'text/plain'}, body=' ')
3943+ resp = self.object_controller.PUT(req)
3944+ self.assertEquals(resp.status_int, 201)
3945+
3946+ df = DiskFile(self.testdir, 'sda1', '0', 'Segment-Cleanup',
3947+ normalize_timestamp(1), 'a/c/o', datadir=JANITORDIR)
3948+ self.assert_(not df.is_deleted())
3949+
3950+ def test_segmented_put(self):
3951+ if not self.path_to_test_xfs:
3952+ raise SkipTest
3953+
3954+ # Ensure the janitor cleanup job doesn't exist yet
3955+ df = DiskFile(self.testdir, 'sda1', '0', 'Segment-Cleanup',
3956+ normalize_timestamp(1), 'a/c/o', datadir=JANITORDIR)
3957+ self.assert_(df.is_deleted())
3958+
3959+ # Put the first segment
3960+ req = Request.blank('/sda1/0/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
3961+ headers={'X-Timestamp': normalize_timestamp(1),
3962+ 'Content-Length': '1', 'Content-Type': 'text/plain',
3963+ 'X-Object-Type': 'segment', 'X-Object-Segment': '0',
3964+ 'X-Object-Segment-If-Length': '1'}, body='1')
3965+ resp = self.object_controller.PUT(req)
3966+ self.assertEquals(resp.status_int, 201)
3967+
3968+ # Ensure the janitor cleanup job now exists
3969+ df = DiskFile(self.testdir, 'sda1', '0', 'Segment-Cleanup',
3970+ normalize_timestamp(1), 'a/c/o', datadir=JANITORDIR)
3971+ self.assert_(not df.is_deleted())
3972+
3973+ # Second segment would go to a different node
3974+
3975+ # Put the manifest
3976+ manifest = {'x-timestamp': normalize_timestamp(1),
3977+ 'content-length': 2, 'content-type': 'text/plain',
3978+ 'x-segment-size': 1, 'etag': 'c20ad4d76fe97759aa27a0c99bff6710'}
3979+ manifest = pickle.dumps(manifest, protocol=PICKLE_PROTOCOL)
3980+ req = Request.blank('/sda1/0/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
3981+ headers={'X-Timestamp': normalize_timestamp(1),
3982+ 'Content-Length': str(len(manifest)), 'Content-Type': 'text/plain',
3983+ 'X-Object-Type': 'manifest', 'X-Object-Length': '2'},
3984+ body=manifest)
3985+ resp = self.object_controller.PUT(req)
3986+ self.assertEquals(resp.status_int, 201)
3987+
3988+ # The janitor cleanup job should still exist (only the janitor will
3989+ # verify the manifest is in place and remove the job).
3990+ df = DiskFile(self.testdir, 'sda1', '0', 'Segment-Cleanup',
3991+ normalize_timestamp(1), 'a/c/o', datadir=JANITORDIR)
3992+ self.assert_(not df.is_deleted())
3993+
3994+ req = Request.blank('/sda1/0/a/c/o', environ={'REQUEST_METHOD': 'GET'})
3995+ resp = self.object_controller.GET(req)
3996+ self.assertEquals(resp.status_int, 200)
3997+ self.assertEquals(resp.body, manifest)
3998+
3999+ req = Request.blank('/sda1/0/a/c/o', environ={'REQUEST_METHOD': 'GET'},
4000+ headers={'X-Object-Segment': '0',
4001+ 'X-Object-Segment-Timestamp': normalize_timestamp(1)})
4002+ resp = self.object_controller.GET(req)
4003+ self.assertEquals(resp.status_int, 200)
4004+ self.assertEquals(resp.body, '1')
4005+
4006+ def test_segmented_put_no_longer(self):
4007+ if not self.path_to_test_xfs:
4008+ raise SkipTest
4009+
4010+ # Ensure the janitor cleanup job doesn't exist to begin with
4011+ df = DiskFile(self.testdir, 'sda1', '0', 'Segment-Cleanup',
4012+ normalize_timestamp(1), 'a/c/o', datadir=JANITORDIR)
4013+ self.assert_(df.is_deleted())
4014+
4015+ # Put the first segment, which really ends up a whole object
4016+ req = Request.blank('/sda1/0/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
4017+ headers={'X-Timestamp': normalize_timestamp(1),
4018+ 'Content-Length': '1', 'Content-Type': 'text/plain',
4019+ 'X-Object-Type': 'segment', 'X-Object-Segment': '0',
4020+ 'X-Object-Segment-If-Length': '2'}, body='1')
4021+ resp = self.object_controller.PUT(req)
4022+ self.assertEquals(resp.status_int, 201)
4023+
4024+ # Ensure the janitor cleanup job doesn't exist since we put a whole
4025+ # file, not a segment (due to X-Object-Segment-If-Length).
4026+ df = DiskFile(self.testdir, 'sda1', '0', 'Segment-Cleanup',
4027+ normalize_timestamp(1), 'a/c/o', datadir=JANITORDIR)
4028+ self.assert_(df.is_deleted())
4029+
4030+ req = Request.blank('/sda1/0/a/c/o', environ={'REQUEST_METHOD': 'GET'})
4031+ resp = self.object_controller.GET(req)
4032+ self.assertEquals(resp.status_int, 200)
4033+ self.assertEquals(resp.body, '1')
4034+
4035+ def test_deleted_manifest(self):
4036+ """
4037+ Ensures a janitor segment cleanup job is created for a deleted
4038+ manifest.
4039+ """
4040+ if not self.path_to_test_xfs:
4041+ raise SkipTest
4042+ df = DiskFile(self.testdir, 'sda1', '0', 'Segment-Cleanup',
4043+ normalize_timestamp(1), 'a/c/o', datadir=JANITORDIR)
4044+ self.assert_(df.is_deleted())
4045+
4046+ manifest = {'x-timestamp': normalize_timestamp(1),
4047+ 'content-length': 100, 'content-type': 'text/plain',
4048+ 'x-segment-size': 10, 'etag': 'd41d8cd98f00b204e9800998ecf8427e'}
4049+ manifest = pickle.dumps(manifest, protocol=PICKLE_PROTOCOL)
4050+ req = Request.blank('/sda1/0/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
4051+ headers={'X-Timestamp': normalize_timestamp(1),
4052+ 'Content-Length': str(len(manifest)), 'Content-Type': 'text/plain',
4053+ 'X-Object-Type': 'manifest', 'X-Object-Length': '100'},
4054+ body=manifest)
4055+ resp = self.object_controller.PUT(req)
4056+ self.assertEquals(resp.status_int, 201)
4057+
4058+ req = Request.blank('/sda1/0/a/c/o',
4059+ environ={'REQUEST_METHOD': 'DELETE'},
4060+ headers={'X-Timestamp': normalize_timestamp(2),
4061+ 'Content-Length': '0', 'Content-Type': 'text/plain'}, body='')
4062+ resp = self.object_controller.DELETE(req)
4063+ self.assertEquals(resp.status_int, 204)
4064+
4065+ df = DiskFile(self.testdir, 'sda1', '0', 'Segment-Cleanup',
4066+ normalize_timestamp(1), 'a/c/o', datadir=JANITORDIR,
4067+ keep_data_fp=True)
4068+ self.assert_(not df.is_deleted())
4069+ job = pickle.loads(''.join(iter(df)))
4070+ self.assertEquals(job['segment_last_deleted'], -1)
4071+
4072
4073 if __name__ == '__main__':
4074 unittest.main()
4075
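A quick reference for the header protocol these object-server tests exercise, since it is easy to lose in the diff: segment PUTs are marked with X-Object-Type: segment plus their X-Object-Segment number; X-Object-Segment-If-Length lets the server store a lone undersized "segment 0" as a plain whole object; and the manifest PUT carries X-Object-Type: manifest with the user-visible length in X-Object-Length. A sketch building the two request shapes the same way the tests do (values copied from the tests; the pickle protocol literal is an assumption standing in for PICKLE_PROTOCOL):

    import cPickle as pickle
    from webob import Request

    segment_put = Request.blank('/sda1/0/a/c/o',
        environ={'REQUEST_METHOD': 'PUT'},
        headers={'X-Timestamp': '0000000001.00000',
                 'Content-Length': '1', 'Content-Type': 'text/plain',
                 'X-Object-Type': 'segment',
                 'X-Object-Segment': '0',
                 'X-Object-Segment-If-Length': '2'},
        body='1')

    manifest = pickle.dumps({'x-timestamp': '0000000001.00000',
                             'content-length': 2,
                             'content-type': 'text/plain',
                             'x-segment-size': 1,
                             'etag': 'c20ad4d76fe97759aa27a0c99bff6710'},
                            protocol=2)
    manifest_put = Request.blank('/sda1/0/a/c/o',
        environ={'REQUEST_METHOD': 'PUT'},
        headers={'X-Timestamp': '0000000001.00000',
                 'Content-Length': str(len(manifest)),
                 'Content-Type': 'text/plain',
                 'X-Object-Type': 'manifest',
                 'X-Object-Length': '2'},
        body=manifest)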
4076=== modified file 'test/unit/obj/test_updater.py'
4077--- test/unit/obj/test_updater.py 2010-09-23 16:09:30 +0000
4078+++ test/unit/obj/test_updater.py 2010-11-08 18:51:48 +0000
4079@@ -24,7 +24,7 @@
4080 from eventlet import spawn, TimeoutError, listen
4081 from eventlet.timeout import Timeout
4082
4083-from swift.obj import updater as object_updater, server as object_server
4084+from swift.obj import updater as object_updater
4085 from swift.common.ring import RingData
4086 from swift.common import utils
4087 from swift.common.utils import hash_path, normalize_timestamp, mkdirs
4088@@ -48,7 +48,7 @@
4089 os.mkdir(self.devices_dir)
4090 self.sda1 = os.path.join(self.devices_dir, 'sda1')
4091 os.mkdir(self.sda1)
4092- os.mkdir(os.path.join(self.sda1,'tmp'))
4093+ os.mkdir(os.path.join(self.sda1, 'tmp'))
4094
4095 def tearDown(self):
4096 rmtree(self.testdir, ignore_errors=1)
4097@@ -80,7 +80,7 @@
4098 'node_timeout': '15',
4099 })
4100 cu.run_once()
4101- async_dir = os.path.join(self.sda1, object_server.ASYNCDIR)
4102+ async_dir = os.path.join(self.sda1, object_updater.ASYNCDIR)
4103 os.mkdir(async_dir)
4104 cu.run_once()
4105 self.assert_(os.path.exists(async_dir))
4106@@ -103,6 +103,7 @@
4107 self.assert_(os.path.exists(op_path))
4108
4109 bindsock = listen(('127.0.0.1', 0))
4110+
4111 def accepter(sock, return_code):
4112 try:
4113 with Timeout(3):
4114@@ -123,6 +124,7 @@
4115 except BaseException, err:
4116 return err
4117 return None
4118+
4119 def accept(return_codes):
4120 codes = iter(return_codes)
4121 try:
4122@@ -139,7 +141,8 @@
4123 except BaseException, err:
4124 return err
4125 return None
4126- event = spawn(accept, [201,500])
4127+
4128+ event = spawn(accept, [201, 500])
4129 for dev in cu.get_container_ring().devs:
4130 if dev is not None:
4131 dev['port'] = bindsock.getsockname()[1]
4132@@ -155,5 +158,6 @@
4133 raise err
4134 self.assert_(not os.path.exists(op_path))
4135
4136+
4137 if __name__ == '__main__':
4138 unittest.main()
4139
4140=== modified file 'test/unit/proxy/test_server.py'
4141--- test/unit/proxy/test_server.py 2010-11-05 14:47:43 +0000
4142+++ test/unit/proxy/test_server.py 2010-11-08 18:51:48 +0000
4143@@ -34,7 +34,7 @@
4144 from eventlet import sleep, spawn, TimeoutError, util, wsgi, listen
4145 from eventlet.timeout import Timeout
4146 import simplejson
4147-from webob import Request
4148+from webob import Request, Response
4149 from webob.exc import HTTPUnauthorized
4150
4151 from test.unit import connect_tcp, readuntil2crlfs
4152@@ -44,7 +44,7 @@
4153 from swift.obj import server as object_server
4154 from swift.common import ring
4155 from swift.common.constraints import MAX_META_NAME_LENGTH, \
4156- MAX_META_VALUE_LENGTH, MAX_META_COUNT, MAX_META_OVERALL_SIZE, MAX_FILE_SIZE
4157+ MAX_META_VALUE_LENGTH, MAX_META_COUNT, MAX_META_OVERALL_SIZE
4158 from swift.common.utils import mkdirs, normalize_timestamp, NullLogger
4159
4160
4161@@ -53,7 +53,9 @@
4162
4163
4164 def fake_http_connect(*code_iter, **kwargs):
4165+
4166 class FakeConn(object):
4167+
4168 def __init__(self, status, etag=None, body=''):
4169 self.status = status
4170 self.reason = 'Fake'
4171@@ -158,6 +160,7 @@
4172
4173
4174 class FakeMemcache(object):
4175+
4176 def __init__(self):
4177 self.store = {}
4178
4179@@ -212,9 +215,12 @@
4180 class TestProxyServer(unittest.TestCase):
4181
4182 def test_unhandled_exception(self):
4183+
4184 class MyApp(proxy_server.Application):
4185+
4186 def get_controller(self, path):
4187 raise Exception('this shouldnt be caught')
4188+
4189 app = MyApp(None, FakeMemcache(), account_ring=FakeRing(),
4190 container_ring=FakeRing(), object_ring=FakeRing())
4191 req = Request.blank('/account', environ={'REQUEST_METHOD': 'HEAD'})
4192@@ -323,8 +329,11 @@
4193 test_status_map((200, 200, 204, 500, 404), 503)
4194
4195 def test_PUT_connect_exceptions(self):
4196+
4197 def mock_http_connect(*code_iter, **kwargs):
4198+
4199 class FakeConn(object):
4200+
4201 def __init__(self, status):
4202 self.status = status
4203 self.reason = 'Fake'
4204@@ -372,8 +381,11 @@
4205 test_status_map((200, 200, 503, 503, -1), 503)
4206
4207 def test_PUT_send_exceptions(self):
4208+
4209 def mock_http_connect(*code_iter, **kwargs):
4210+
4211 class FakeConn(object):
4212+
4213 def __init__(self, status):
4214 self.status = status
4215 self.reason = 'Fake'
4216@@ -430,15 +442,18 @@
4217 controller = proxy_server.ObjectController(self.app, 'account',
4218 'container', 'object')
4219 req = Request.blank('/a/c/o', {}, headers={
4220- 'Content-Length': str(MAX_FILE_SIZE + 1),
4221+ 'Content-Length': str(self.app.max_object_size + 1),
4222 'Content-Type': 'foo/bar'})
4223 self.app.update_request(req)
4224 res = controller.PUT(req)
4225 self.assertEquals(res.status_int, 413)
4226
4227 def test_PUT_getresponse_exceptions(self):
4228+
4229 def mock_http_connect(*code_iter, **kwargs):
4230+
4231 class FakeConn(object):
4232+
4233 def __init__(self, status):
4234 self.status = status
4235 self.reason = 'Fake'
4236@@ -633,6 +648,7 @@
4237 dev['port'] = 1
4238
4239 class SlowBody():
4240+
4241 def __init__(self):
4242 self.sent = 0
4243
4244@@ -680,6 +696,7 @@
4245 dev['port'] = 1
4246
4247 class SlowBody():
4248+
4249 def __init__(self):
4250 self.sent = 0
4251
4252@@ -1334,7 +1351,9 @@
4253
4254 def test_chunked_put(self):
4255 # quick test of chunked put w/o PATH_TO_TEST_XFS
4256+
4257 class ChunkedFile():
4258+
4259 def __init__(self, bytes):
4260 self.bytes = bytes
4261 self.read_bytes = 0
4262@@ -1375,12 +1394,13 @@
4263 req.body_file = ChunkedFile(11)
4264 self.app.memcache.store = {}
4265 self.app.update_request(req)
4266+ orig_max_object_size = self.app.max_object_size
4267 try:
4268- server.MAX_FILE_SIZE = 10
4269+ self.app.max_object_size = 10
4270 res = controller.PUT(req)
4271 self.assertEquals(res.status_int, 413)
4272 finally:
4273- server.MAX_FILE_SIZE = MAX_FILE_SIZE
4274+ self.app.max_object_size = orig_max_object_size
4275
4276 def test_chunked_put_and_a_bit_more(self):
4277 # Since we're starting up a lot here, we're going to test more than
4278@@ -1495,6 +1515,7 @@
4279 self.assertEquals(headers[:len(exp)], exp)
4280 # Check unhandled exception
4281 orig_update_request = prosrv.update_request
4282+
4283 def broken_update_request(env, req):
4284 raise Exception('fake')
4285 prosrv.update_request = broken_update_request
4286@@ -1545,6 +1566,7 @@
4287 # in a test for logging x-forwarded-for (first entry only).
4288
4289 class Logger(object):
4290+
4291 def info(self, msg):
4292 self.msg = msg
4293 orig_logger = prosrv.logger
4294@@ -1568,6 +1590,7 @@
4295 # Turn on header logging.
4296
4297 class Logger(object):
4298+
4299 def info(self, msg):
4300 self.msg = msg
4301 orig_logger = prosrv.logger
4302@@ -1919,6 +1942,188 @@
4303 res = controller.PUT(req)
4304 self.assert_(called[0])
4305
4306+ def test_GETorHEAD1(self):
4307+ """
4308+ Ensures we call GETorHEAD_base again without a range if we do a range
4309+ request and get a 416 Requested Range Not Satisfiable, just in case the
4310+ primary object is a manifest.
4311+ """
4312+ called_without_range = [False]
4313+ controller = proxy_server.ObjectController(self.app, 'account',
4314+ 'container', 'object')
4315+
4316+ def local_GETorHEAD_base(req, server_type, partition, nodes, path,
4317+ attempts):
4318+ if req.range:
4319+ return Response(status=416)
4320+ called_without_range[0] = True
4321+ return Response()
4322+
4323+ controller.GETorHEAD_base = local_GETorHEAD_base
4324+ req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'GET'},
4325+ headers={'Range': 'bytes=0-1'})
4326+ controller.GETorHEAD(req)
4327+ self.assert_(called_without_range[0])
4328+
4329+ def test_GETorHEAD2(self):
4330+ """
4331+ Ensures we call GETorHEAD_base again if the first request was a HEAD
4332+ and the primary object is a manifest.
4333+ """
4334+ called_get = [False]
4335+ controller = proxy_server.ObjectController(self.app, 'account',
4336+ 'container', 'object')
4337+
4338+ def local_GETorHEAD_base(req, server_type, partition, nodes, path,
4339+ attempts):
4340+ if req.method == 'HEAD':
4341+ return Response(headers={'x-object-type': 'manifest'})
4342+ elif req.method == 'GET':
4343+ called_get[0] = True
4344+ return Response(headers={'x-object-type': 'manifest'},
4345+ body=pickle.dumps({'content-length': 0, 'x-segment-size': 1,
4346+ 'x-timestamp': normalize_timestamp(2),
4347+ 'etag': 'd41d8cd98f00b204e9800998ecf8427e',
4348+ 'content-type': 'text/plain'}))
4349+ else:
4350+ raise Exception('Unexpected method %s' % req.method)
4351+
4352+ controller.GETorHEAD_base = local_GETorHEAD_base
4353+ req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'HEAD'})
4354+ controller.GETorHEAD(req)
4355+ self.assert_(called_get[0])
4356+
4357+ def test_GETorHEAD3(self):
4358+ """
4359+ Ensures we call GETorHEAD_base again without a range if the first
4360+ request was a ranged GET that succeeded but the primary object is a
4361+ manifest.
4362+ """
4363+ called_without_range = [False]
4364+ controller = proxy_server.ObjectController(self.app, 'account',
4365+ 'container', 'object')
4366+
4367+ def local_GETorHEAD_base(req, server_type, partition, nodes, path,
4368+ attempts):
4369+ if not req.range:
4370+ called_without_range[0] = True
4371+ return Response(headers={'x-object-type': 'manifest'},
4372+ body=pickle.dumps({'content-length': 0, 'x-segment-size': 1,
4373+ 'x-timestamp': normalize_timestamp(2),
4374+ 'etag': 'd41d8cd98f00b204e9800998ecf8427e',
4375+ 'content-type': 'text/plain'}))
4376+
4377+ controller.GETorHEAD_base = local_GETorHEAD_base
4378+ req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'GET'},
4379+ headers={'Range': 'bytes=0-1'})
4380+ controller.GETorHEAD(req)
4381+ self.assert_(called_without_range[0])
4382+
4383+ def test_PUT_segmented_object1(self):
4384+ with save_globals():
4385+ proxy_server.http_connect = \
4386+ fake_http_connect(200, 200, # account, container checks
4387+ 201, 201, 201, # segment 0
4388+ 201, 201, 201, # segment 1
4389+ 201, 201, 201, # segment 2
4390+ 201, 201, 201) # manifest
4391+ controller = proxy_server.ObjectController(self.app, 'account',
4392+ 'container', 'object')
4393+ self.app.segment_size = 2
4394+ req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
4395+ body='12345')
4396+ resp = controller.PUT(req)
4397+ self.assertEquals(resp.status_int, 201)
4398+ self.assertEquals(req.bytes_transferred, 5)
4399+
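Why the fake_http_connect call above lists fourteen statuses: after the account and container checks (2), the five-byte body at segment_size = 2 is cut into three segments, each written to three replicas (9), followed by a three-replica manifest PUT (3). The slicing itself is just:

    # Illustrative only: how a 5-byte body becomes segments of size 2.
    def split_segments(body, segment_size):
        return [body[i:i + segment_size]
                for i in range(0, len(body), segment_size)]

    assert split_segments('12345', 2) == ['12', '34', '5']
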
4400+ def test_PUT_segmented_object2(self):
4401+ """ Same as 1, just with a chunky data source. """
4402+ with save_globals():
4403+ proxy_server.http_connect = \
4404+ fake_http_connect(200, 200, # account, container checks
4405+ 201, 201, 201, # segment 0
4406+ 201, 201, 201, # segment 1
4407+ 201, 201, 201, # segment 2
4408+ 201, 201, 201) # manifest
4409+ controller = proxy_server.ObjectController(self.app, 'account',
4410+ 'container', 'object')
4411+ self.app.segment_size = 2
4412+
4413+ class ChunkedReader(object):
4414+
4415+ def __init__(self):
4416+ self.chunk = 0
4417+
4418+ def read(self, size):
4419+ self.chunk += 1
4420+ if self.chunk == 1:
4421+ return '123'
4422+ elif self.chunk == 2:
4423+ return '45'
4424+ else:
4425+ return ''
4426+
4427+ req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT',
4428+ 'wsgi.input': ChunkedReader()}, headers={'Content-Length': '5'})
4429+ resp = controller.PUT(req)
4430+ self.assertEquals(resp.status_int, 201)
4431+ self.assertEquals(req.bytes_transferred, 5)
4432+
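The ChunkedReader above deliberately returns '123' then '45', so the source's chunk boundaries never line up with the 2-byte segments and the proxy has to re-buffer. A minimal re-chunking sketch (an assumed flow, not the branch's code):

    def iter_segments(reader, segment_size, read_size=65536):
        # Re-buffer arbitrary read() chunks into fixed-size segments.
        buf = ''
        while True:
            data = reader.read(read_size)
            if not data:
                break
            buf += data
            while len(buf) >= segment_size:
                yield buf[:segment_size]
                buf = buf[segment_size:]
        if buf:
            yield buf

    # With the ChunkedReader above, list(iter_segments(ChunkedReader(), 2))
    # would yield ['12', '34', '5'].
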
4433+ def test_PUT_segmented_object3(self):
4434+ """ Failed segment PUT. """
4435+ with save_globals():
4436+ proxy_server.http_connect = \
4437+ fake_http_connect(200, 200, # account, container checks
4438+ 201, 201, 201, # segment 0
4439+ 201, 500, 500, # segment 1
4440+ 201, 201, 201, # segment 2
4441+ 201, 201, 201) # manifest
4442+ controller = proxy_server.ObjectController(self.app, 'account',
4443+ 'container', 'object')
4444+ self.app.segment_size = 2
4445+ req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
4446+ body='12345')
4447+ resp = controller.PUT(req)
4448+ self.assertEquals(resp.status_int, 503)
4449+ self.assertEquals(resp.body.strip(),
4450+ 'Unable to complete very large file operation.')
4451+
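The 503 here falls out of per-segment quorum: segment 1 gets one 201 and two 500s, which (assuming the proxy's usual majority rule) is not enough successes, so the whole upload is abandoned. Roughly:

    def have_quorum(statuses, replica_count):
        # A majority of replicas must report success for the segment.
        successes = sum(1 for s in statuses if 200 <= s < 300)
        return successes > replica_count // 2

    assert have_quorum([201, 201, 201], 3)
    assert not have_quorum([201, 500, 500], 3)
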
4452+ def test_PUT_segmented_object4(self):
4453+ """ Non-matching etag sent. """
4454+ with save_globals():
4455+ proxy_server.http_connect = \
4456+ fake_http_connect(200, 200, # account, container checks
4457+ 201, 201, 201, # segment 0
4458+ 201, 201, 201, # segment 1
4459+ 201, 201, 201, # segment 2
4460+ 201, 201, 201) # manifest
4461+ controller = proxy_server.ObjectController(self.app, 'account',
4462+ 'container', 'object')
4463+ self.app.segment_size = 2
4464+ req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
4465+ body='12345', headers={'ETag': 'abc'})
4466+ resp = controller.PUT(req)
4467+ self.assertEquals(resp.status_int, 422)
4468+
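The 422 comes from a whole-object checksum: the proxy hashes every byte as the segments stream out and compares the digest with the client-supplied ETag at the end. Sketch (hashlib is real; the surrounding flow is assumed):

    import hashlib

    def etag_matches(segments, client_etag):
        md5 = hashlib.md5()
        for segment in segments:
            md5.update(segment)
        return client_etag is None or md5.hexdigest() == client_etag

    assert not etag_matches(['12', '34', '5'], 'abc')
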
4469+ def test_PUT_segmented_object5(self):
4470+ """ Failed manifest PUT. """
4471+ with save_globals():
4472+ proxy_server.http_connect = \
4473+ fake_http_connect(200, 200, # account, container checks
4474+ 201, 201, 201, # segment 0
4475+ 201, 201, 201, # segment 1
4476+ 201, 201, 201, # segment 2
4477+ 201, 500, 500) # manifest
4478+ controller = proxy_server.ObjectController(self.app, 'account',
4479+ 'container', 'object')
4480+ self.app.segment_size = 2
4481+ req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
4482+ body='12345')
4483+ resp = controller.PUT(req)
4484+ self.assertEquals(resp.status_int, 503)
4485+ self.assertEquals(resp.body.strip(),
4486+ 'Unable to complete very large file operation.')
4487+
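Same quorum failure as test 3, but on the final manifest PUT. The status layout also shows the ordering the proxy appears to rely on: all segments are written before the manifest, so a failed manifest leaves nothing a reader could resolve (cleanup of the orphaned segments presumably falling to the new janitor). In outline:

    def put_large_object(put_segment, put_manifest, segments):
        # Segments first, manifest last: a reader can never resolve a
        # manifest whose segments are not yet in place.
        for index, data in enumerate(segments):
            if not put_segment(index, data):
                return 503
        return 201 if put_manifest() else 503

    # E.g. a manifest PUT that fails quorum:
    assert put_large_object(lambda i, d: True, lambda: False,
                            ['12', '34', '5']) == 503
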
4488 def test_COPY_calls_authorize(self):
4489 called = [False]
4490
4491@@ -2080,7 +2285,9 @@
4492 self.assertEquals(resp.status_int, 404)
4493
4494 def test_put_locking(self):
4495+
4496 class MockMemcache(FakeMemcache):
4497+
4498 def __init__(self, allow_lock=None):
4499 self.allow_lock = allow_lock
4500 super(MockMemcache, self).__init__()
4501@@ -2669,5 +2876,142 @@
4502 self.assertEquals(resp.status_int, 400)
4503
4504
4505+class TestSegmentedIterable(unittest.TestCase):
4506+
4507+ def setUp(self):
4508+ self.app = proxy_server.Application(None, FakeMemcache(),
4509+ account_ring=FakeRing(), container_ring=FakeRing(),
4510+ object_ring=FakeRing())
4511+ self.controller = proxy_server.ObjectController(self.app, 'account',
4512+ 'container', 'object')
4513+
4514+ def test_zero_bytes(self):
4515+ si = proxy_server.SegmentedIterable(self.controller, 0, 2,
4516+ normalize_timestamp(1))
4517+ self.assertEquals(''.join(iter(si)), '')
4518+
4519+ def test_happy_path(self):
4520+ segment = [0]
4521+
4522+ def give_connect(*args, **kwargs):
4523+ self.assertEquals(int(kwargs['headers']['X-Object-Segment']),
4524+ segment[0])
4525+ segment[0] += 1
4526+
4527+ with save_globals():
4528+ proxy_server.http_connect = fake_http_connect(200, 200, 200,
4529+ body='12', give_connect=give_connect)
4530+ si = proxy_server.SegmentedIterable(self.controller, 5, 2,
4531+ normalize_timestamp(1))
4532+ self.assertEquals(''.join(iter(si)), '12121')
4533+ self.assertEquals(segment[0], 3)
4534+
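test_happy_path pins down the read side: five total bytes at segment_size 2 means three backend GETs, each tagged with an incrementing X-Object-Segment header, with the final segment trimmed to the remaining length. A self-contained sketch (names illustrative):

    def iter_object(fetch_segment, total_bytes):
        segment, served = 0, 0
        while served < total_bytes:
            body = fetch_segment(segment)        # GET with X-Object-Segment
            body = body[:total_bytes - served]   # trim the last segment
            served += len(body)
            yield body
            segment += 1

    # Each fake segment GET returns '12'; five bytes -> '12121'. The
    # zero-byte case above falls out for free: no bytes, no backend GETs.
    assert ''.join(iter_object(lambda seg: '12', 5)) == '12121'
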
4535+ def test_not_found_start(self):
4536+ with save_globals():
4537+ proxy_server.http_connect = \
4538+ fake_http_connect(404, 404, 404, 200, 200, 200, body='12')
4539+ si = proxy_server.SegmentedIterable(self.controller, 5, 2,
4540+ normalize_timestamp(1))
4541+ exc = None
4542+ try:
4543+ for chunk in si:
4544+ raise Exception('Got data when we should not have.')
4545+ except Exception, err:
4546+ exc = err
4547+ self.assertEquals(str(exc),
4548+ 'Could not load segment 0 of /account/container/object')
4549+
4550+ def test_not_found_after_start(self):
4551+ with save_globals():
4552+ proxy_server.http_connect = \
4553+ fake_http_connect(200, 404, 404, 404, 200, 200, body='12')
4554+ si = proxy_server.SegmentedIterable(self.controller, 5, 2,
4555+ normalize_timestamp(1))
4556+ exc = None
4557+ try:
4558+ for chunk in si:
4559+ self.assertEquals(chunk, '12')
4560+ except Exception, err:
4561+ exc = err
4562+ self.assertEquals(str(exc),
4563+ 'Could not load segment 1 of /account/container/object')
4564+
4565+ def test_partial_not_found(self):
4566+ with save_globals():
4567+ proxy_server.http_connect = \
4568+ fake_http_connect(404, 200, 404, 404, 200, 200, body='12')
4569+ si = proxy_server.SegmentedIterable(self.controller, 5, 2,
4570+ normalize_timestamp(1))
4571+ self.assertEquals(''.join(iter(si)), '12121')
4572+
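The three tests above triangulate the failover rule: a 404 from one replica just moves the request to the next node, and a segment only fails when every replica misses, which surfaces as the 'Could not load segment N' exception. Sketch:

    def fetch_with_failover(nodes, segment):
        for node in nodes:
            status, body = node(segment)
            if status == 200:
                return body
        raise Exception('Could not load segment %d' % segment)

    ok = lambda seg: (200, '12')
    missing = lambda seg: (404, '')
    assert fetch_with_failover([missing, ok], 0) == '12'  # partial 404s pass
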
4573+ def test_bytes_transferred(self):
4574+ with save_globals():
4575+ proxy_server.http_connect = \
4576+ fake_http_connect(200, 200, 200, body='12')
4577+ si = proxy_server.SegmentedIterable(self.controller, 5, 2,
4578+ normalize_timestamp(1))
4579+
4580+ class Stub(object):
4581+ pass
4582+
4583+ si.response = Stub()
4584+ self.assertEquals(''.join(iter(si)), '12121')
4585+ self.assertEquals(si.response.bytes_transferred, 5)
4586+
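bytes_transferred is accumulated on whatever response object the iterable has been handed (hence the bare Stub), which is what the proxy's transfer logging later reads. The accounting amounts to a counting wrapper around the chunk iterator (illustrative, not the branch's code):

    class CountingIterable(object):
        # Tally yielded bytes onto an attached response object.
        def __init__(self, chunks, response):
            self.chunks, self.response = chunks, response
            self.response.bytes_transferred = 0

        def __iter__(self):
            for chunk in self.chunks:
                self.response.bytes_transferred += len(chunk)
                yield chunk

    class Stub(object):
        pass

    stub = Stub()
    assert ''.join(CountingIterable(['12', '12', '1'], stub)) == '12121'
    assert stub.bytes_transferred == 5
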
4587+ def test_bytes_transferred_app_iter_range(self):
4588+ with save_globals():
4589+ proxy_server.http_connect = \
4590+ fake_http_connect(200, 200, 200, body='12')
4591+ si = proxy_server.SegmentedIterable(self.controller, 5, 2,
4592+ normalize_timestamp(1))
4593+
4594+ class Stub(object):
4595+ pass
4596+
4597+ si.response = Stub()
4598+ self.assertEquals(''.join(si.app_iter_range(1, 3)), '212')
4599+ self.assertEquals(si.response.bytes_transferred, 3)
4600+
4601+ def test_app_iter_range_past_end(self):
4602+ with save_globals():
4603+ proxy_server.http_connect = \
4604+ fake_http_connect(200, 200, 200, body='12')
4605+ si = proxy_server.SegmentedIterable(self.controller, 5, 2,
4606+ normalize_timestamp(1))
4607+ self.assertEquals(''.join(si.app_iter_range(1, 30)), '2121')
4608+
4609+ def test_app_iter_range_start_past_end(self):
4610+ with save_globals():
4611+ proxy_server.http_connect = \
4612+ fake_http_connect(200, 200, 200, body='12')
4613+ si = proxy_server.SegmentedIterable(self.controller, 5, 2,
4614+ normalize_timestamp(1))
4615+ self.assertEquals(''.join(si.app_iter_range(30, 31)), '')
4616+
4617+ def test_app_iter_range_to_end(self):
4618+ with save_globals():
4619+ proxy_server.http_connect = \
4620+ fake_http_connect(200, 200, 200, body='12')
4621+ si = proxy_server.SegmentedIterable(self.controller, 5, 2,
4622+ normalize_timestamp(1))
4623+ self.assertEquals(''.join(si.app_iter_range(3, None)), '21')
4624+
4625+ def test_app_iter_range_to_an_end(self):
4626+ with save_globals():
4627+ proxy_server.http_connect = \
4628+ fake_http_connect(200, 200, 200, body='12')
4629+ si = proxy_server.SegmentedIterable(self.controller, 5, 2,
4630+ normalize_timestamp(1))
4631+ self.assertEquals(''.join(si.app_iter_range(None, 3)), '121')
4632+
4633+ def test_app_iter_range_full(self):
4634+ with save_globals():
4635+ proxy_server.http_connect = \
4636+ fake_http_connect(200, 200, 200, body='12')
4637+ si = proxy_server.SegmentedIterable(self.controller, 5, 2,
4638+ normalize_timestamp(1))
4639+ self.assertEquals(''.join(si.app_iter_range(None, None)), '12121')
4640+
4641+
4642 if __name__ == '__main__':
4643 unittest.main()