Merge lp:~gholt/swift/lobjects3 into lp:~hudson-openstack/swift/trunk

Proposed by gholt
Status: Rejected
Rejected by: gholt
Proposed branch: lp:~gholt/swift/lobjects3
Merge into: lp:~hudson-openstack/swift/trunk
Diff against target: 4643 lines (+3054/-575)
29 files modified
bin/swift-init (+12/-6)
bin/swift-object-janitor (+28/-0)
doc/source/deployment_guide.rst (+26/-0)
doc/source/development_saio.rst (+9/-0)
doc/source/index.rst (+1/-0)
doc/source/object.rst (+19/-0)
doc/source/overview_very_large_objects.rst (+144/-0)
etc/object-server.conf-sample (+15/-0)
etc/proxy-server.conf-sample (+3/-0)
setup.py (+16/-23)
swift/common/constraints.py (+11/-6)
swift/common/db.py (+1/-2)
swift/obj/auditor.py (+9/-15)
swift/obj/diskfile.py (+507/-0)
swift/obj/janitor.py (+516/-0)
swift/obj/replicator.py (+32/-176)
swift/obj/server.py (+109/-236)
swift/obj/updater.py (+16/-2)
swift/proxy/server.py (+391/-26)
test/functional/sample.conf (+8/-1)
test/functional/tests.py (+1/-1)
test/probe/common.py (+1/-1)
test/probe/test_object_async_update.py (+1/-1)
test/unit/common/test_constraints.py (+6/-8)
test/unit/obj/test_diskfile.py (+203/-0)
test/unit/obj/test_janitor.py (+433/-0)
test/unit/obj/test_server.py (+179/-62)
test/unit/obj/test_updater.py (+8/-4)
test/unit/proxy/test_server.py (+349/-5)
To merge this branch: bzr merge lp:~gholt/swift/lobjects3
Reviewer Review Type Date Requested Status
gholt (community) Disapprove
Review via email: mp+39792@code.launchpad.net

Commit message

Very large object support.

Description of the change

Very large object support.

Please read doc/source/overview_very_large_objects.rst to start.

To post a comment you must log in.
lp:~gholt/swift/lobjects3 updated
120. By gholt

Make poor Hudson happier

121. By gholt

Added missing SkipTest import

122. By gholt

Merged from trunk

123. By gholt

Merged from trunk

124. By gholt

Merge from trunk

Revision history for this message
gholt (gholt) wrote :

After a thorough discussion, this idea of transparent large object support is going to be dropped in favor of a combination client and cluster side solution.

review: Disapprove

Unmerged revisions

124. By gholt

Merge from trunk

123. By gholt

Merged from trunk

122. By gholt

Merged from trunk

121. By gholt

Added missing SkipTest import

120. By gholt

Make poor Hudson happier

119. By gholt

Documentation

118. By gholt

Tests for proxy server

117. By gholt

More object server tests

116. By gholt

More tests for the janitor

115. By gholt

Working on tests and bugfixes

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'bin/swift-init'
--- bin/swift-init 2010-08-19 20:01:44 +0000
+++ bin/swift-init 2010-11-08 18:51:48 +0000
@@ -23,10 +23,11 @@
23import sys23import sys
24import time24import time
2525
26ALL_SERVERS = ['account-auditor', 'account-server', 'container-auditor',26ALL_SERVERS = ['account-auditor', 'account-reaper', 'account-replicator',
27 'account-server', 'auth-server', 'container-auditor',
27 'container-replicator', 'container-server', 'container-updater',28 'container-replicator', 'container-server', 'container-updater',
28 'object-auditor', 'object-server', 'object-replicator', 'object-updater',29 'object-auditor', 'object-janitor', 'object-replicator', 'object-server',
29 'proxy-server', 'account-replicator', 'auth-server', 'account-reaper']30 'object-updater', 'proxy-server']
30GRACEFUL_SHUTDOWN_SERVERS = ['account-server', 'container-server',31GRACEFUL_SHUTDOWN_SERVERS = ['account-server', 'container-server',
31 'object-server', 'proxy-server', 'auth-server']32 'object-server', 'proxy-server', 'auth-server']
32MAX_DESCRIPTORS = 3276833MAX_DESCRIPTORS = 32768
@@ -41,6 +42,7 @@
41 servers = [server]42 servers = [server]
42command = command.lower()43command = command.lower()
4344
45
44def pid_files(server):46def pid_files(server):
45 if os.path.exists('/var/run/swift/%s.pid' % server):47 if os.path.exists('/var/run/swift/%s.pid' % server):
46 pid_files = ['/var/run/swift/%s.pid' % server]48 pid_files = ['/var/run/swift/%s.pid' % server]
@@ -50,6 +52,7 @@
50 pid = int(open(pid_file).read().strip())52 pid = int(open(pid_file).read().strip())
51 yield pid_file, pid53 yield pid_file, pid
5254
55
53def do_start(server, once=False):56def do_start(server, once=False):
54 server_type = '-'.join(server.split('-')[:-1])57 server_type = '-'.join(server.split('-')[:-1])
5558
@@ -77,7 +80,7 @@
77 os.makedirs(dir)80 os.makedirs(dir)
78 except OSError, err:81 except OSError, err:
79 if err.errno == errno.EACCES:82 if err.errno == errno.EACCES:
80 sys.exit('Unable to create %s. Running as non-root?' % dir)83 sys.exit('Unable to create %s. Running as non-root?' % dir)
81 fp = open(pid_file, 'w')84 fp = open(pid_file, 'w')
82 fp.write('%d\n' % pid)85 fp.write('%d\n' % pid)
83 fp.close()86 fp.close()
@@ -120,18 +123,21 @@
120 elif os.path.exists('/etc/swift/%s-server/' % server_type):123 elif os.path.exists('/etc/swift/%s-server/' % server_type):
121 # found config directory, searching for config file(s)124 # found config directory, searching for config file(s)
122 launch_args = []125 launch_args = []
123 for num, ini_file in enumerate(glob.glob('/etc/swift/%s-server/*.conf' % server_type)):126 for num, ini_file in \
127 enumerate(glob.glob('/etc/swift/%s-server/*.conf' % server_type)):
124 pid_file = '/var/run/swift/%s/%d.pid' % (server, num)128 pid_file = '/var/run/swift/%s/%d.pid' % (server, num)
125 # start a server for each ini_file found129 # start a server for each ini_file found
126 launch_args.append((ini_file, pid_file))130 launch_args.append((ini_file, pid_file))
127 else:131 else:
128 # maybe there's a config file(s) out there, but I couldn't find it!132 # maybe there's a config file(s) out there, but I couldn't find it!
129 sys.exit('Unable to locate config file for %s. %s does not exist?' % (server, ini_file))133 sys.exit('Unable to locate config file for %s. %s does not exist?' %
134 (server, ini_file))
130135
131 # start all servers136 # start all servers
132 for ini_file, pid_file in launch_args:137 for ini_file, pid_file in launch_args:
133 launch(ini_file, pid_file)138 launch(ini_file, pid_file)
134139
140
135def do_stop(server, graceful=False):141def do_stop(server, graceful=False):
136 if graceful and server in GRACEFUL_SHUTDOWN_SERVERS:142 if graceful and server in GRACEFUL_SHUTDOWN_SERVERS:
137 sig = signal.SIGHUP143 sig = signal.SIGHUP
138144
=== added file 'bin/swift-object-janitor'
--- bin/swift-object-janitor 1970-01-01 00:00:00 +0000
+++ bin/swift-object-janitor 2010-11-08 18:51:48 +0000
@@ -0,0 +1,28 @@
1#!/usr/bin/python
2# Copyright (c) 2010 OpenStack, LLC.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13# implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17import sys
18
19from swift.obj.janitor import ObjectJanitor
20from swift.common import utils
21
22if __name__ == '__main__':
23 if len(sys.argv) < 2:
24 print "Usage: swift-object-janitor CONFIG_FILE [once]"
25 sys.exit(1)
26 once = len(sys.argv) > 2 and sys.argv[2] == 'once'
27 conf = utils.readconf(sys.argv[1], 'object-janitor')
28 ObjectJanitor(conf).run(once)
029
=== modified file 'doc/source/deployment_guide.rst'
--- doc/source/deployment_guide.rst 2010-10-21 18:59:43 +0000
+++ doc/source/deployment_guide.rst 2010-11-08 18:51:48 +0000
@@ -206,6 +206,25 @@
206 object can be reclaimed206 object can be reclaimed
207================== ================= =======================================207================== ================= =======================================
208208
209[object-janitor]
210
211=================== ============== ==========================================
212Option Default Description
213------------------- -------------- ------------------------------------------
214log_name object-janitor Label used when logging
215log_facility LOG_LOCAL0 Syslog log facility
216log_level INFO Logging level
217interval 300 Minimum time for a pass to take
218concurrency 1 Number of updater workers to spawn
219node_timeout 10 Request timeout to external services
220conn_timeout 0.5 Connection timeout to external services
221slowdown 0.01 Time in seconds to wait between operations
222segment_reclaim_age 604800 Number of seconds before removing orphaned
223 object segments
224segments_per_pass 10 Maximum segments per object to remove per
225 pass
226=================== ============== ==========================================
227
209[object-updater]228[object-updater]
210229
211================== ============== ==========================================230================== ============== ==========================================
@@ -438,6 +457,13 @@
438log_level INFO Log level457log_level INFO Log level
439log_headers True If True, log headers in each458log_headers True If True, log headers in each
440 request459 request
460max_object_size 107374182400 Maximum size of any object in
461 the cluster. Note: The
462 segment_size value will cause
463 objects larger than that to be
464 split into segments
465segment_size 2147483647 Objects will be split into file
466 segments no larger than this
441recheck_account_existence 60 Cache timeout in seconds to467recheck_account_existence 60 Cache timeout in seconds to
442 send memcached for account468 send memcached for account
443 existence469 existence
444470
=== modified file 'doc/source/development_saio.rst'
--- doc/source/development_saio.rst 2010-11-02 14:17:25 +0000
+++ doc/source/development_saio.rst 2010-11-08 18:51:48 +0000
@@ -440,6 +440,8 @@
440 [object-replicator]440 [object-replicator]
441 vm_test_mode = yes441 vm_test_mode = yes
442442
443 [object-janitor]
444
443 [object-updater]445 [object-updater]
444446
445 [object-auditor]447 [object-auditor]
@@ -461,6 +463,8 @@
461 [object-replicator]463 [object-replicator]
462 vm_test_mode = yes464 vm_test_mode = yes
463465
466 [object-janitor]
467
464 [object-updater]468 [object-updater]
465469
466 [object-auditor]470 [object-auditor]
@@ -482,6 +486,8 @@
482 [object-replicator]486 [object-replicator]
483 vm_test_mode = yes487 vm_test_mode = yes
484488
489 [object-janitor]
490
485 [object-updater]491 [object-updater]
486492
487 [object-auditor]493 [object-auditor]
@@ -503,6 +509,8 @@
503 [object-replicator]509 [object-replicator]
504 vm_test_mode = yes510 vm_test_mode = yes
505511
512 [object-janitor]
513
506 [object-updater]514 [object-updater]
507515
508 [object-auditor]516 [object-auditor]
@@ -571,6 +579,7 @@
571 # Replace devauth with whatever your super_admin key is (recorded in579 # Replace devauth with whatever your super_admin key is (recorded in
572 # /etc/swift/auth-server.conf).580 # /etc/swift/auth-server.conf).
573 swift-auth-recreate-accounts -K devauth581 swift-auth-recreate-accounts -K devauth
582 swift-init object-janitor start
574 swift-init object-updater start583 swift-init object-updater start
575 swift-init container-updater start584 swift-init container-updater start
576 swift-init object-replicator start585 swift-init object-replicator start
577586
=== modified file 'doc/source/index.rst'
--- doc/source/index.rst 2010-11-04 19:25:23 +0000
+++ doc/source/index.rst 2010-11-08 18:51:48 +0000
@@ -26,6 +26,7 @@
26 overview_replication26 overview_replication
27 overview_stats27 overview_stats
28 ratelimit28 ratelimit
29 overview_very_large_objects
2930
30Development:31Development:
3132
3233
=== modified file 'doc/source/object.rst'
--- doc/source/object.rst 2010-07-19 16:25:18 +0000
+++ doc/source/object.rst 2010-11-08 18:51:48 +0000
@@ -24,6 +24,16 @@
24 :undoc-members:24 :undoc-members:
25 :show-inheritance:25 :show-inheritance:
2626
27.. _object-janitor:
28
29Object Janitor
30==============
31
32.. automodule:: swift.obj.janitor
33 :members:
34 :undoc-members:
35 :show-inheritance:
36
27.. _object-updater:37.. _object-updater:
2838
29Object Updater39Object Updater
@@ -44,3 +54,12 @@
44 :undoc-members:54 :undoc-members:
45 :show-inheritance:55 :show-inheritance:
4656
57.. _object-diskfile:
58
59Disk File
60=========
61
62.. automodule:: swift.obj.diskfile
63 :members:
64 :undoc-members:
65 :show-inheritance:
4766
=== added file 'doc/source/overview_very_large_objects.rst'
--- doc/source/overview_very_large_objects.rst 1970-01-01 00:00:00 +0000
+++ doc/source/overview_very_large_objects.rst 2010-11-08 18:51:48 +0000
@@ -0,0 +1,144 @@
1=========================
2Very Large Object Support
3=========================
4
5-----
6Intro
7-----
8
9Supporting very large objects in Swift presented quite a challenge. The main
10problem is storage balance; if you just have a few very large objects in the
11cluster some of the storage nodes will have significantly more data than the
12others. The basic answer is to break these objects up into segments and
13distribute the segments across the cluster evenly.
14
15The user could do this themselves, breaking up their very large objects into
16smaller objects and uploading those. But then they'd have to reassemble them
17themselves as well on download. What we do in Swift is essentially emulate this
18behavior for the user, transparently to the user. In this way, we can pick the
19optimal segment size for our cluster, something the user probably wouldn't
20know. This also allows the user to easily do ranged requests on the object
21without having to know how it's split up behind the scenes.
22
23-----------------------
24The Proxy is in Control
25-----------------------
26
27In Swift's implementation, the proxy server is in control of the object
28segmentation. As the user uploads a very large object, the proxy automatically
29closes the current segment when it reaches a configurable segment size and
30opens a new segment for more data. Once all the data is uploaded and stored,
31the proxy server then creates a manifest object indicating how the object was
32segmented so that it can be retrieved later. The proxy does not spool any data
33to disk, it sends all data directly to the object servers.
34
35Because of these segment switchovers occurring in the proxy server, a very
36large object operation will have a higher chance of failure than a normal sized
37object. If at any point the proxy can't communicate with at least a majority of
38the nodes for each and every segment of the object, the whole object operation
39will fail. For instance, a regular 1m object in a 3 replica cluster just needs
40to make at least 2 object server requests to succeed. If the segment size is
411G, a 10G object would need 2 object server requests to succeed 11 times during
42the operation (10 segments, 1 manifest).
43
44The upside of the proxy being in control is that the client does not need to
45know anything about the segmentation; it's done for them. The downside is that
46failure to upload means starting the whole thing over again. Later we plan to
47implement what we term 'user manifests' where the user can upload several
48objects but download them as if they were one. This would allow the user to
49just reupload a smaller part of the whole in the case of upload failures, but
50would require them to manage the segments themselves.
51
52-------------------------
53How They're Really Stored
54-------------------------
55
56If you have a working knowledge of how Swift stores and locates regular objects
57you'll know that the object name is hashed and that that hash determines the
58storage nodes and disk location for the object's data. Segmented objects work
59much the same way, but they have as many names as they do segments, plus one
60more for the manifest.
61
62The manifest is stored at the same location as the object would be if it were a
63normal object. The manifest is the last item stored once all the segments are
64in place, so that only one version of an object is available at any given time.
65This is important because you don't want to be able to retrieve a manifest and
66not be able to find the segments yet.
67
68The segments are given the same name as the object with the operation's
69timestamp and the segment number appended. The reason the timestamp is used is
70to keep the segments of multiple uploads of the same very large object from
71colliding. During an overwrite of a very large object, you don't want a
72download of that object to have some new data and some old. Since the segments
73for each version of the object are stored independently, a download before the
74new manifest file is in place will get the old data and a download afterwards
75will get the new data. Also, in the case where the new upload fails in transit,
76the old version will still be available.
77
78The storage nodes for each segment are determined the familiar way of hashing
79the segment name (object name + timestamp + segment number as described above)
80and then picking the storage nodes based on the hash. However, there is an
81exception for the first segment. This segment will always be stored on the same
82storage node and disk partition as a regular object of the same name would be,
83though the hash of the segment name determines the exact on disk location. This
84is because with chunked transfer encoded uploads we don't know the object size
85until the upload is complete. We won't realize we need to segment such an
86object until we've already uploaded the first segment's worth to the regular
87object storage nodes.
88
89To help keep directory structures smaller, object segments are stored in an
90'object_segments' directory alongside the usual 'objects' directory. Though all
91these files could be stored in the same directory structure, making this split
92should make replication, reclamation, and other scan-type operations less of a
93jar to the systems.
94
95.. note::
96
97 Because the first segment is always stored on the same storage nodes and in
98 the same partition as a regular file would, you can often see hashes in a
99 partition where they don't seem to belong. For instance, a first segment
100 name that hashes to 25d40100fd07c8c5fd1a4e31875593e1 might be found in
101 partition 568363 (hex 8ac2b) instead of the expected 154944 (hex 25d40)
102 because the segment's true object name hashed to
103 8ac2bf59556b61bb5cc521ccb51c200a. You should only see these anomalies in
 104 the object_segments directory, however.
105
106-----------
107Cleaning Up
108-----------
109
110Another challenge very large object support brought was in how to keep
111everything cleaned up and reduce the chance of completely orphaned segments
112that would waste a large amount of storage.
113
114One such scenario is the common DELETE operation. We don't want to make the
115client wait while we go out and delete all the segments and the manifest from
116all over the cluster. Instead, we write out a file that will start a background
117operation to do the cleanup and we just remove the manifest file. An object
118overwrite is much like a delete in that just before placing the new manifest
119file we create the background job.
120
121Another scenario is failed uploads. If a user disconnects after uploading 100
122segments before finishing, we have to clean those up. Also, a proxy could crash
123due to hardware failure right in the middle of such an operation, so we
124wouldn't even be able to make a cleanup job at that point. So, we make a
125pending cleanup job just before starting any segmented upload. That pending
126cleanup job will wait up to a configurable amount of time (1 week by default)
127for the associated manifest to appear. If the manifest appears, the job is
128canceled and deleted. If the manifest never appears, the job goes about
129removing the orphaned segments from the system.
130
131Prior to very large object support, we had a background process called object
132async. Object async would send object metadata updates to the container servers
133in the event the container servers couldn't be contacted right away. These
134update jobs were uncommon and quick to complete, so durability wasn't too much
135of a concern. However, these segment cleanup jobs can hang around for a while
136(1 week in the above example of a proxy crash), so durable background jobs
137became important.
138
139With this, the object janitor was born. He accomplishes what object async did
140and the segment cleanup operations as well. The janitor jobs are stored just
141like regular object data is, except that they're stored in the object_janitor
142directory structure. Because they're stored in the same way, they can use the
143same object replicator code to keep durability with replicas on multiple
144storage nodes.
0145
=== modified file 'etc/object-server.conf-sample'
--- etc/object-server.conf-sample 2010-10-19 15:02:36 +0000
+++ etc/object-server.conf-sample 2010-11-08 18:51:48 +0000
@@ -44,6 +44,21 @@
44# The replicator also performs reclamation44# The replicator also performs reclamation
45# reclaim_age = 60480045# reclaim_age = 604800
4646
47[object-janitor]
48# log_name = object-janitor
49# interval = 300
50# concurrency = 1
51# node_timeout = 10
52# conn_timeout = 0.5
53# slowdown will sleep that amount between janitor operations
54# slowdown = 0.01
55# Number of seconds before assuming a segmented put will never succeed and
56# therefore clean up any orphaned segments from the operation.
57# segment_reclaim_age = 604800
58# Number of segments to remove per pass when cleaning up after superseded or
59# orphaned segmented put operations.
60# segments_per_pass = 10
61
47[object-updater]62[object-updater]
48# log_name = object-updater63# log_name = object-updater
49# interval = 30064# interval = 300
5065
=== modified file 'etc/proxy-server.conf-sample'
--- etc/proxy-server.conf-sample 2010-11-03 20:17:27 +0000
+++ etc/proxy-server.conf-sample 2010-11-08 18:51:48 +0000
@@ -17,6 +17,9 @@
17# log_facility = LOG_LOCAL017# log_facility = LOG_LOCAL0
18# log_level = INFO18# log_level = INFO
19# log_headers = False19# log_headers = False
20# max_object_size = 107374182400
21# Files will be split into segments no larger than segment_size
22# segment_size = 2147483647
20# recheck_account_existence = 6023# recheck_account_existence = 60
21# recheck_container_existence = 6024# recheck_container_existence = 60
22# object_chunk_size = 819225# object_chunk_size = 8192
2326
=== modified file 'setup.py'
--- setup.py 2010-11-03 19:50:35 +0000
+++ setup.py 2010-11-08 18:51:48 +0000
@@ -21,6 +21,7 @@
2121
22from swift import __version__ as version22from swift import __version__ as version
2323
24
24class local_sdist(sdist):25class local_sdist(sdist):
25 """Customized sdist hook - builds the ChangeLog file from VC first"""26 """Customized sdist hook - builds the ChangeLog file from VC first"""
2627
@@ -57,29 +58,21 @@
57 'Environment :: No Input/Output (Daemon)',58 'Environment :: No Input/Output (Daemon)',
58 ],59 ],
59 install_requires=[], # removed for better compat60 install_requires=[], # removed for better compat
60 scripts=[61 scripts=['bin/st', 'bin/swift-account-audit', 'bin/swift-account-auditor',
61 'bin/st', 'bin/swift-account-auditor',62 'bin/swift-account-reaper', 'bin/swift-account-replicator',
62 'bin/swift-account-audit', 'bin/swift-account-reaper',63 'bin/swift-account-server', 'bin/swift-account-stats-logger',
63 'bin/swift-account-replicator', 'bin/swift-account-server',64 'bin/swift-auth-add-user', 'bin/swift-auth-recreate-accounts',
64 'bin/swift-auth-add-user',65 'bin/swift-auth-server', 'bin/swift-auth-update-reseller-prefixes',
65 'bin/swift-auth-recreate-accounts', 'bin/swift-auth-server',66 'bin/swift-bench', 'bin/swift-container-auditor',
66 'bin/swift-auth-update-reseller-prefixes',67 'bin/swift-container-replicator', 'bin/swift-container-server',
67 'bin/swift-container-auditor',68 'bin/swift-container-updater', 'bin/swift-drive-audit',
68 'bin/swift-container-replicator',69 'bin/swift-get-nodes', 'bin/swift-init',
69 'bin/swift-container-server', 'bin/swift-container-updater',70 'bin/swift-log-stats-collector', 'bin/swift-log-uploader',
70 'bin/swift-drive-audit', 'bin/swift-get-nodes',71 'bin/swift-object-auditor', 'bin/swift-object-info',
71 'bin/swift-init', 'bin/swift-object-auditor',72 'bin/swift-object-janitor', 'bin/swift-object-replicator',
72 'bin/swift-object-info',73 'bin/swift-object-server', 'bin/swift-object-updater',
73 'bin/swift-object-replicator',74 'bin/swift-proxy-server', 'bin/swift-ring-builder',
74 'bin/swift-object-server',75 'bin/swift-stats-populate', 'bin/swift-stats-report'],
75 'bin/swift-object-updater', 'bin/swift-proxy-server',
76 'bin/swift-ring-builder', 'bin/swift-stats-populate',
77 'bin/swift-stats-report',
78 'bin/swift-bench',
79 'bin/swift-log-uploader',
80 'bin/swift-log-stats-collector',
81 'bin/swift-account-stats-logger',
82 ],
83 entry_points={76 entry_points={
84 'paste.app_factory': [77 'paste.app_factory': [
85 'proxy=swift.proxy.server:app_factory',78 'proxy=swift.proxy.server:app_factory',
8679
=== modified file 'swift/common/constraints.py'
--- swift/common/constraints.py 2010-10-26 15:13:14 +0000
+++ swift/common/constraints.py 2010-11-08 18:51:48 +0000
@@ -19,8 +19,6 @@
19 HTTPRequestEntityTooLarge19 HTTPRequestEntityTooLarge
2020
2121
22#: Max file size allowed for objects
23MAX_FILE_SIZE = 5 * 1024 * 1024 * 1024 + 2
24#: Max length of the name of a key for metadata22#: Max length of the name of a key for metadata
25MAX_META_NAME_LENGTH = 12823MAX_META_NAME_LENGTH = 128
26#: Max length of the value of a key for metadata24#: Max length of the value of a key for metadata
@@ -29,14 +27,18 @@
29MAX_META_COUNT = 9027MAX_META_COUNT = 90
30#: Max overall size of metadata28#: Max overall size of metadata
31MAX_META_OVERALL_SIZE = 409629MAX_META_OVERALL_SIZE = 4096
30#: Max account name length
31MAX_ACCOUNT_NAME_LENGTH = 256
32#: Max container name length
33MAX_CONTAINER_NAME_LENGTH = 256
32#: Max object name length34#: Max object name length
33MAX_OBJECT_NAME_LENGTH = 102435MAX_OBJECT_NAME_LENGTH = 1024
34#: Max object list length of a get request for a container36#: Max object list length of a get request for a container
35CONTAINER_LISTING_LIMIT = 1000037CONTAINER_LISTING_LIMIT = 10000
36#: Max container list length of a get request for an account38#: Max container list length of a get request for an account
37ACCOUNT_LISTING_LIMIT = 1000039ACCOUNT_LISTING_LIMIT = 10000
38MAX_ACCOUNT_NAME_LENGTH = 25640#: Default pickle protocol number used for Swift pickles
39MAX_CONTAINER_NAME_LENGTH = 25641PICKLE_PROTOCOL = 2
4042
4143
42def check_metadata(req, target_type):44def check_metadata(req, target_type):
@@ -82,19 +84,22 @@
82 return None84 return None
8385
8486
85def check_object_creation(req, object_name):87def check_object_creation(req, object_name, max_object_size=0):
86 """88 """
87 Check to ensure that everything is alright about an object to be created.89 Check to ensure that everything is alright about an object to be created.
8890
89 :param req: HTTP request object91 :param req: HTTP request object
90 :param object_name: name of object to be created92 :param object_name: name of object to be created
93 :param max_object_size: the maximum object size to check against; 0 if no
94 object size checking should be done
91 :raises HTTPRequestEntityTooLarge: the object is too large95 :raises HTTPRequestEntityTooLarge: the object is too large
92 :raises HTTPLengthRequired: missing content-length header and not96
93 a chunked request97 a chunked request
94 :raises HTTPBadRequest: missing or bad content-type header, or98 :raises HTTPBadRequest: missing or bad content-type header, or
95 bad metadata99 bad metadata
96 """100 """
97 if req.content_length and req.content_length > MAX_FILE_SIZE:101 if max_object_size > 0 and req.content_length and \
102 req.content_length > max_object_size:
98 return HTTPRequestEntityTooLarge(body='Your request is too large.',103 return HTTPRequestEntityTooLarge(body='Your request is too large.',
99 request=req, content_type='text/plain')104 request=req, content_type='text/plain')
100 if req.content_length is None and \105 if req.content_length is None and \
101106
=== modified file 'swift/common/db.py'
--- swift/common/db.py 2010-08-16 22:30:27 +0000
+++ swift/common/db.py 2010-11-08 18:51:48 +0000
@@ -32,6 +32,7 @@
32import simplejson as json32import simplejson as json
33import sqlite333import sqlite3
3434
35from swift.common.constraints import PICKLE_PROTOCOL
35from swift.common.utils import normalize_timestamp, renamer, \36from swift.common.utils import normalize_timestamp, renamer, \
36 mkdirs, lock_parent_directory, fallocate37 mkdirs, lock_parent_directory, fallocate
37from swift.common.exceptions import LockTimeout38from swift.common.exceptions import LockTimeout
@@ -39,8 +40,6 @@
3940
40#: Timeout for trying to connect to a DB41#: Timeout for trying to connect to a DB
41BROKER_TIMEOUT = 2542BROKER_TIMEOUT = 25
42#: Pickle protocol to use
43PICKLE_PROTOCOL = 2
44#: Max number of pending entries43#: Max number of pending entries
45PENDING_CAP = 13107244PENDING_CAP = 131072
4645
4746
=== modified file 'swift/obj/auditor.py'
--- swift/obj/auditor.py 2010-10-21 18:32:10 +0000
+++ swift/obj/auditor.py 2010-11-08 18:51:48 +0000
@@ -18,8 +18,8 @@
18from hashlib import md518from hashlib import md5
19from random import random19from random import random
2020
21from swift.obj import server as object_server21from swift.obj.diskfile import DATADIR, DiskFile, invalidate_hash, \
22from swift.obj.replicator import invalidate_hash22 read_metadata
23from swift.common.utils import get_logger, renamer, audit_location_generator23from swift.common.utils import get_logger, renamer, audit_location_generator
24from swift.common.exceptions import AuditException24from swift.common.exceptions import AuditException
25from swift.common.daemon import Daemon25from swift.common.daemon import Daemon
@@ -45,10 +45,8 @@
45 time.sleep(random() * self.interval)45 time.sleep(random() * self.interval)
46 while True:46 while True:
47 begin = time.time()47 begin = time.time()
48 all_locs = audit_location_generator(self.devices,48 all_locs = audit_location_generator(self.devices, DATADIR,
49 object_server.DATADIR,49 mount_check=self.mount_check, logger=self.logger)
50 mount_check=self.mount_check,
51 logger=self.logger)
52 for path, device, partition in all_locs:50 for path, device, partition in all_locs:
53 self.object_audit(path, device, partition)51 self.object_audit(path, device, partition)
54 if time.time() - reported >= 3600: # once an hour52 if time.time() - reported >= 3600: # once an hour
@@ -68,10 +66,8 @@
68 """Run the object audit once."""66 """Run the object audit once."""
69 self.logger.info('Begin object audit "once" mode')67 self.logger.info('Begin object audit "once" mode')
70 begin = reported = time.time()68 begin = reported = time.time()
71 all_locs = audit_location_generator(self.devices,69 all_locs = audit_location_generator(self.devices, DATADIR,
72 object_server.DATADIR,70 mount_check=self.mount_check, logger=self.logger)
73 mount_check=self.mount_check,
74 logger=self.logger)
75 for path, device, partition in all_locs:71 for path, device, partition in all_locs:
76 self.object_audit(path, device, partition)72 self.object_audit(path, device, partition)
77 if time.time() - reported >= 3600: # once an hour73 if time.time() - reported >= 3600: # once an hour
@@ -99,14 +95,12 @@
99 if not path.endswith('.data'):95 if not path.endswith('.data'):
100 return96 return
101 try:97 try:
102 name = object_server.read_metadata(path)['name']98 name = read_metadata(path)['name']
103 except Exception, exc:99 except Exception, exc:
104 raise AuditException('Error when reading metadata: %s' % exc)100 raise AuditException('Error when reading metadata: %s' % exc)
105 _, account, container, obj = name.split('/', 3)101 _, account, container, obj = name.split('/', 3)
106 df = object_server.DiskFile(self.devices, device,102 df = DiskFile(self.devices, device, partition, account, container,
107 partition, account,103 obj, keep_data_fp=True)
108 container, obj,
109 keep_data_fp=True)
110 if df.data_file is None:104 if df.data_file is None:
111 # file is deleted, we found the tombstone105 # file is deleted, we found the tombstone
112 return106 return
113107
=== added file 'swift/obj/diskfile.py'
--- swift/obj/diskfile.py 1970-01-01 00:00:00 +0000
+++ swift/obj/diskfile.py 2010-11-08 18:51:48 +0000
@@ -0,0 +1,507 @@
1# Copyright (c) 2010 OpenStack, LLC.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12# implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
from __future__ import with_statement
import cPickle as pickle
import errno
import hashlib
import logging
import os
from contextlib import contextmanager
from os.path import basename, dirname, isdir, join, splitext
from tempfile import mkstemp
from time import time

from eventlet import tpool, sleep
from xattr import getxattr, setxattr

from swift.common.constraints import PICKLE_PROTOCOL
from swift.common.utils import drop_buffer_cache, hash_path, lock_path, \
    mkdirs, normalize_timestamp, renamer, split_path, storage_directory
32
33
# Root directory (per device) for whole-object .data/.meta/.ts files.
DATADIR = 'objects'
# Root directory (per device) for the segments of very large objects.
SEGMENTSDIR = 'object_segments'
# Root directory (per device) for queued object-janitor operations.
JANITORDIR = 'object_janitor'
# Per-partition pickle cache of suffix-directory hashes.
HASH_FILE = 'hashes.pkl'
# xattr key (prefix) under which pickled object metadata is stored.
METADATA_KEY = 'user.swift.metadata'
# Seconds in one week; default age for reclaiming tombstones.
ONE_WEEK = 604800
40
41
def read_metadata(fd):
    """
    Helper function to read the pickled metadata from an object file.

    :param fd: file descriptor to load the metadata from

    :returns: dictionary of metadata
    """
    # Metadata may be split across several xattr keys: the first chunk is
    # stored under METADATA_KEY, the rest under METADATA_KEY1, METADATA_KEY2,
    # ... Keep reading until the next key is missing (getxattr raises
    # IOError), then unpickle the reassembled string.
    pieces = []
    index = 0
    while True:
        try:
            pieces.append(getxattr(fd, '%s%s' % (METADATA_KEY, index or '')))
        except IOError:
            break
        index += 1
    return pickle.loads(''.join(pieces))
59
60
def hash_suffix(path, reclaim_age):
    """
    Performs reclamation and returns an md5 of all (remaining) files.

    :param path: absolute path to the suffix directory to hash
    :param reclaim_age: age in seconds at which to remove tombstones

    :returns: hex digest of the md5 over the remaining file names
    """
    md5 = hashlib.md5()
    for hsh in sorted(os.listdir(path)):
        hsh_path = join(path, hsh)
        files = os.listdir(hsh_path)
        if len(files) == 1:
            if files[0].endswith('.ts'):
                # remove tombstones older than reclaim_age
                ts = files[0].rsplit('.', 1)[0]
                if (time() - float(ts)) > reclaim_age:
                    os.unlink(join(hsh_path, files[0]))
                    files.remove(files[0])
        elif files:
            files.sort(reverse=True)
            meta = data = tomb = None
            # BUGFIX: iterate over a copy. The original looped over `files`
            # while calling files.remove() on it, which skips the element
            # following each removal, leaving reclaimable files behind and
            # producing inconsistent suffix hashes between passes.
            for filename in list(files):
                if not meta and filename.endswith('.meta'):
                    meta = filename
                if not data and filename.endswith('.data'):
                    data = filename
                if not tomb and filename.endswith('.ts'):
                    tomb = filename
                if (filename < tomb or  # any file older than tomb
                        filename < data or  # any file older than data
                        (filename.endswith('.meta') and
                         filename < meta)):  # old meta
                    if filename.endswith('.data'):
                        # A superseded manifest leaves its segments orphaned;
                        # queue a janitor segment-cleanup operation before
                        # unlinking it. BUGFIX: close fp even if reading the
                        # metadata/manifest raises.
                        fp = open(join(hsh_path, filename), 'rb')
                        try:
                            metadata = read_metadata(fp)
                            if metadata.get('X-Object-Type') == 'manifest':
                                manifest = pickle.load(fp)
                                partition = dirname(path)
                                device = dirname(dirname(partition))
                                devices = dirname(device)
                                partition = basename(partition)
                                device = basename(device)
                                account, container, obj = \
                                    split_path(metadata['name'], 3,
                                               rest_with_last=True)
                                df = DiskFile(devices, device, partition,
                                    'Segment-Cleanup',
                                    manifest['x-timestamp'],
                                    '%s/%s/%s' % (account, container, obj),
                                    datadir=JANITORDIR)
                                df.store_janitor_segment_cleanup(account,
                                    container, obj,
                                    segment_count=(
                                        manifest['content-length'] /
                                        manifest['x-segment-size'] + 1),
                                    segment_last_deleted=None)
                        finally:
                            fp.close()
                    os.unlink(join(hsh_path, filename))
                    files.remove(filename)
        if not files:
            os.rmdir(hsh_path)
        for filename in files:
            md5.update(filename)
    try:
        # Remove the suffix dir itself if it is now empty; rmdir fails
        # harmlessly when files remain.
        os.rmdir(path)
    except OSError:
        pass
    return md5.hexdigest()
126
127
def recalculate_hashes(partition_dir, suffixes, reclaim_age=ONE_WEEK):
    """
    Recalculates hashes for the given suffixes in the partition and updates
    them in the partition's hashes file.

    :param partition_dir: directory of the partition in which to recalculate
    :param suffixes: list of suffixes to recalculate
    :param reclaim_age: age in seconds at which tombstones should be removed
    """

    def tpool_listdir(partition_dir):
        # Every three-character suffix directory maps to None ("unknown").
        return dict((suff, None) for suff in os.listdir(partition_dir)
                    if len(suff) == 3 and isdir(join(partition_dir, suff)))

    hashes_file = join(partition_dir, HASH_FILE)
    with lock_path(partition_dir):
        try:
            with open(hashes_file, 'rb') as fp:
                hashes = pickle.load(fp)
        except Exception:
            # Missing or unreadable hashes file: rebuild the suffix map from
            # disk (in the thread pool so the listdir doesn't block the hub).
            hashes = tpool.execute(tpool_listdir, partition_dir)
        for suffix in suffixes:
            suffix_path = join(partition_dir, suffix)
            if os.path.exists(suffix_path):
                hashes[suffix] = hash_suffix(suffix_path, reclaim_age)
            elif suffix in hashes:
                del hashes[suffix]
        # Write-then-rename so readers never see a partial hashes file.
        tmp_file = hashes_file + '.tmp'
        with open(tmp_file, 'wb') as fp:
            pickle.dump(hashes, fp, PICKLE_PROTOCOL)
        renamer(tmp_file, hashes_file)
157
158
def invalidate_hash(suffix_dir):
    """
    Invalidates the hash for a suffix_dir in the partition's hashes file.

    :param suffix_dir: absolute path to suffix dir whose hash needs
                       invalidating
    """
    suffix = basename(suffix_dir)
    partition_dir = dirname(suffix_dir)
    hashes_file = join(partition_dir, HASH_FILE)
    with lock_path(partition_dir):
        try:
            with open(hashes_file, 'rb') as fp:
                hashes = pickle.load(fp)
        except Exception:
            # No readable hashes file: nothing to invalidate — the next
            # get_hashes() call will rebuild from a full listdir anyway.
            return
        if suffix in hashes and not hashes[suffix]:
            # Already marked invalid; skip the rewrite.
            return
        hashes[suffix] = None
        # Write-then-rename keeps the hashes file atomic for readers.
        tmp_file = hashes_file + '.tmp'
        with open(tmp_file, 'wb') as fp:
            pickle.dump(hashes, fp, PICKLE_PROTOCOL)
        renamer(tmp_file, hashes_file)
182
183
def get_hashes(partition_dir, do_listdir=True, reclaim_age=ONE_WEEK):
    """
    Get a list of hashes for the suffix dir.  do_listdir causes it to mistrust
    the hash cache for suffix existence at the (unexpectedly high) cost of a
    listdir.  reclaim_age is just passed on to hash_suffix.

    :param partition_dir: absolute path of partition to get hashes for
    :param do_listdir: force existence check for all hashes in the partition
    :param reclaim_age: age at which to remove tombstones

    :returns: tuple of (number of suffix dirs hashed, dictionary of hashes)
    """

    def tpool_listdir(hashes, partition_dir):
        # Merge the on-disk suffix list with any cached hashes; unknown
        # suffixes default to None, meaning "needs rehashing".
        return dict(((suff, hashes.get(suff, None))
                     for suff in os.listdir(partition_dir)
                     if len(suff) == 3 and isdir(join(partition_dir, suff))))
    hashed = 0
    hashes_file = join(partition_dir, HASH_FILE)
    with lock_path(partition_dir):
        modified = False
        hashes = {}
        try:
            with open(hashes_file, 'rb') as fp:
                hashes = pickle.load(fp)
        except Exception:
            # Missing/corrupt cache: fall back to a full listdir.
            do_listdir = True
        if do_listdir:
            hashes = tpool.execute(tpool_listdir, hashes, partition_dir)
            modified = True
        for suffix, hash_ in hashes.items():
            if not hash_:
                suffix_dir = join(partition_dir, suffix)
                if os.path.exists(suffix_dir):
                    try:
                        hashes[suffix] = hash_suffix(suffix_dir, reclaim_age)
                        hashed += 1
                    except OSError:
                        # BUGFIX: `logging` was never imported by this
                        # module, so this handler raised NameError instead
                        # of logging; fixed via the module import block.
                        logging.exception('Error hashing suffix')
                        hashes[suffix] = None
                else:
                    del hashes[suffix]
                modified = True
            # Yield to other greenthreads between suffixes.
            sleep()
        if modified:
            # Write-then-rename so readers never see a partial hashes file.
            with open(hashes_file + '.tmp', 'wb') as fp:
                pickle.dump(hashes, fp, PICKLE_PROTOCOL)
            renamer(hashes_file + '.tmp', hashes_file)
    return hashed, hashes
233
234
class DiskFile(object):
    """
    Manage files on disk for a single object.

    :param path: path to devices on the node
    :param device: device name
    :param partition: partition on the device the object lives in
    :param account: account name for the object
    :param container: container name for the object
    :param obj: object name for the object
    :param keep_data_fp: if True, don't close the fp, otherwise close it
    :param disk_chunk_size: size of chunks on file reads
    :param disk_chunk_size: int
    :param datadir: Sets which directory the root of the data structure is
                    named (default: DATADIR)
    :param segment: If set to not None, indicates which segment of an object
                    this file represents
    :param segment_timestamp: X-Timestamp of the object's segments (set on the
                              PUT, not changed on POSTs), required if segment
                              is set to not None
    """

    def __init__(self, path, device, partition, account, container, obj,
                 keep_data_fp=False, disk_chunk_size=65536, datadir=DATADIR,
                 segment=None, segment_timestamp=None):
        self.account = account
        self.container = container
        self.obj = obj
        self.fp = None
        self.disk_chunk_size = disk_chunk_size
        self.name = '/' + '/'.join((account, container, obj))
        if segment is not None:
            # Segments live under SEGMENTSDIR, hashed on
            # obj/segment_timestamp/segment so each segment gets its own
            # directory. Also remember where the file would live if it turns
            # out not to be a segment after all (see put()'s
            # no_longer_segment flag).
            segment_name = '%s/%s/%s' % (obj, segment_timestamp, segment)
            segment_hash = hash_path(account, container, segment_name)
            self.datadir = os.path.join(path, device,
                storage_directory(SEGMENTSDIR, partition, segment_hash))
            name_hash = hash_path(account, container, obj)
            self.no_longer_segment_datadir = os.path.join(path, device,
                storage_directory(datadir, partition, name_hash))
        else:
            name_hash = hash_path(account, container, obj)
            self.datadir = self.no_longer_segment_datadir = os.path.join(path,
                device, storage_directory(datadir, partition, name_hash))
        self.tmpdir = os.path.join(path, device, 'tmp')
        self.metadata = {}
        self.meta_file = None
        self.data_file = None
        if not os.path.exists(self.datadir):
            return
        # Scan newest-first: a tombstone newer than everything else means
        # deleted; otherwise remember the newest .meta and stop at the
        # newest .data.
        files = sorted(os.listdir(self.datadir), reverse=True)
        for file in files:
            if file.endswith('.ts'):
                self.data_file = self.meta_file = None
                self.metadata = {'deleted': True}
                return
            if file.endswith('.meta') and not self.meta_file:
                self.meta_file = os.path.join(self.datadir, file)
            if file.endswith('.data') and not self.data_file:
                self.data_file = os.path.join(self.datadir, file)
                break
        if not self.data_file:
            return
        self.fp = open(self.data_file, 'rb')
        self.metadata = read_metadata(self.fp)
        if not keep_data_fp:
            self.close()
        if self.meta_file:
            # A .meta file (from a POST) overrides everything from the .data
            # file's metadata except the immutable keys kept below.
            with open(self.meta_file) as mfp:
                for key in self.metadata.keys():
                    if key.lower() not in ('content-type', 'content-encoding',
                                'deleted', 'content-length', 'etag'):
                        del self.metadata[key]
                self.metadata.update(read_metadata(mfp))

    def __iter__(self):
        """Returns an iterator over the data file."""
        try:
            dropped_cache = 0
            read = 0
            while True:
                chunk = self.fp.read(self.disk_chunk_size)
                if chunk:
                    read += len(chunk)
                    # Drop the page cache roughly every MiB so streaming a
                    # large object doesn't evict more useful OS cache.
                    if read - dropped_cache > (1024 * 1024):
                        drop_buffer_cache(self.fp.fileno(), dropped_cache,
                            read - dropped_cache)
                        dropped_cache = read
                    yield chunk
                else:
                    drop_buffer_cache(self.fp.fileno(), dropped_cache,
                        read - dropped_cache)
                    break
        finally:
            self.close()

    def app_iter_range(self, start, stop):
        """Returns an iterator over the data file for range (start, stop)"""
        if start:
            self.fp.seek(start)
        if stop is not None:
            length = stop - start
        else:
            length = None
        for chunk in self:
            if length is not None:
                length -= len(chunk)
                if length < 0:
                    # Chop off the extra:
                    yield chunk[:length]
                    break
            yield chunk

    def close(self):
        """Close the file."""
        if self.fp:
            self.fp.close()
            self.fp = None

    def is_deleted(self):
        """
        Check if the file is deleted.

        :returns: True if the file doesn't exist or has been flagged as
                  deleted.
        """
        return not self.data_file or 'deleted' in self.metadata

    @contextmanager
    def mkstemp(self):
        """Context manager to make a temporary file."""
        if not os.path.exists(self.tmpdir):
            mkdirs(self.tmpdir)
        fd, tmppath = mkstemp(dir=self.tmpdir)
        try:
            yield fd, tmppath
        finally:
            # Best-effort cleanup: put() will already have closed/renamed on
            # success, making these raise OSError, which is ignored.
            try:
                os.close(fd)
            except OSError:
                pass
            try:
                os.unlink(tmppath)
            except OSError:
                pass

    def put(self, fd, tmppath, metadata, extension='.data',
            no_longer_segment=False):
        """
        Finalize writing the file on disk, and renames it from the temp file
        to the real location. This should be called after the data has been
        written to the temp file.

        :param fd: file descriptor of the temp file
        :param tmppath: path to the temporary file being used
        :param metadata: dictionary of metadata to be written
        :param extension: extension to be used when making the file
        :param no_longer_segment: Set to True if this was originally an object
            segment but no longer is (case with chunked transfer encoding when
            the object ends up less than the segment size)
        """
        metadata['name'] = self.name
        timestamp = normalize_timestamp(metadata['X-Timestamp'])
        metastr = pickle.dumps(metadata, PICKLE_PROTOCOL)
        key = 0
        # The pickled metadata is split across METADATA_KEY, METADATA_KEY1,
        # ... in 254-byte chunks; read_metadata() reassembles them.
        while metastr:
            setxattr(fd, '%s%s' % (METADATA_KEY, key or ''), metastr[:254])
            metastr = metastr[254:]
            key += 1
        if 'Content-Length' in metadata:
            drop_buffer_cache(fd, 0, int(metadata['Content-Length']))
        # fsync before rename so the data is durable at its final name.
        os.fsync(fd)
        if no_longer_segment:
            # Retarget from the segments directory to the normal datadir.
            self.datadir = self.no_longer_segment_datadir
        invalidate_hash(os.path.dirname(self.datadir))
        renamer(tmppath, os.path.join(self.datadir, timestamp + extension))
        self.metadata = metadata

    def unlinkold(self, timestamp):
        """
        Remove any older versions of the object file. Any file that has an
        older timestamp than timestamp will be deleted.

        :param timestamp: timestamp to compare with each file
        """
        # String comparison on filenames works because timestamps are
        # normalized to fixed-width strings.
        timestamp = normalize_timestamp(timestamp)
        for fname in os.listdir(self.datadir):
            if fname < timestamp:
                try:
                    os.unlink(os.path.join(self.datadir, fname))
                except OSError, err:  # pragma: no cover
                    if err.errno != errno.ENOENT:
                        raise
0508
=== added file 'swift/obj/janitor.py'
--- swift/obj/janitor.py 1970-01-01 00:00:00 +0000
+++ swift/obj/janitor.py 2010-11-08 18:51:48 +0000
@@ -0,0 +1,516 @@
1# Copyright (c) 2010 OpenStack, LLC.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12# implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16from __future__ import with_statement
17import cPickle as pickle
18import os
19import signal
20import sys
21from random import random
22from time import sleep, time
23
24from eventlet import patcher, Timeout
25
26from swift.common.bufferedhttp import http_connect
27from swift.common.exceptions import ConnectionTimeout
28from swift.common.ring import Ring
29from swift.common.utils import get_logger, normalize_timestamp, whataremyips
30from swift.common.daemon import Daemon
31from swift.obj.diskfile import DiskFile, JANITORDIR, read_metadata
32
33
34class ObjectJanitor(Daemon):
35 """
36 Run background operations for the object server, such as updating container
37 servers with new object metadata and cleaning up discarded segmented
38 objects.
39 """
40
    def __init__(self, conf):
        """
        :param conf: dict-like configuration section for the object-janitor
                     (typically parsed from object-server.conf)
        """
        self.conf = conf
        self.logger = get_logger(conf, 'object-janitor')
        self.devices = conf.get('devices', '/srv/node')
        self.port = int(conf.get('bind_port', 6000))
        # Object-ring device ids belonging to this host; filled in by
        # full_sweep() from whataremyips().
        self.my_node_ids = []
        self.mount_check = conf.get('mount_check', 'true').lower() in \
            ('true', 't', '1', 'on', 'yes', 'y')
        swift_dir = conf.get('swift_dir', '/etc/swift')
        self.interval = int(conf.get('interval', 300))
        # Rings are lazy-loaded via get_object_ring()/get_container_ring().
        self.object_ring_path = os.path.join(swift_dir, 'object.ring.gz')
        self.object_ring = None
        self.container_ring_path = os.path.join(swift_dir, 'container.ring.gz')
        self.container_ring = None
        # Max forked sweep subprocesses at once.
        self.concurrency = int(conf.get('concurrency', 1))
        # Pause between janitor operations within one device sweep (seconds).
        self.slowdown = float(conf.get('slowdown', 0.01))
        self.node_timeout = int(conf.get('node_timeout', 10))
        self.conn_timeout = float(conf.get('conn_timeout', 0.5))
        # Orphaned segments are only reclaimed after this many seconds.
        self.segment_reclaim_age = int(conf.get('segment_reclaim_age', 604800))
        self.segments_per_pass = int(conf.get('segments_per_pass', 10))
        # Per-device sweep statistics, reset by object_sweep().
        self.container_update_successes = 0
        self.container_update_failures = 0
        self.segment_cleanup_completions = 0
        self.segment_cleanup_segments = 0
        self.segment_cleanup_failures = 0
66
    def get_object_ring(self):
        """Get the object ring. Load it, if it hasn't been yet."""
        # Lazy-load so construction stays cheap and the ring file is only
        # read when actually needed.
        if not self.object_ring:
            self.logger.debug(
                'Loading object ring from %s' % self.object_ring_path)
            self.object_ring = Ring(self.object_ring_path)
        return self.object_ring
74
    def get_container_ring(self):
        """Get the container ring. Load it, if it hasn't been yet."""
        # Lazy-load so construction stays cheap and the ring file is only
        # read when actually needed.
        if not self.container_ring:
            self.logger.debug(
                'Loading container ring from %s' % self.container_ring_path)
            self.container_ring = Ring(self.container_ring_path)
        return self.container_ring
82
83 def run_forever(self):
84 """Run the janitor continuously."""
85 sleep(random() * self.interval)
86 while True:
87 begin = time()
88 self.full_sweep()
89 elapsed = time() - begin
90 if elapsed < self.interval:
91 sleep(self.interval - elapsed)
92
    def run_once(self):
        """Run the janitor once."""
        # fork=False: sweep every device serially in this process.
        self.full_sweep(fork=False)
96
    def full_sweep(self, fork=True):
        """
        Run a full sweep of the server for object janitor operations.

        :param fork: If True, subprocesses will be forked for each device up
                     to self.concurrency at any given time.
        """
        self.logger.info('Begin object janitor sweep')
        begin = time()
        my_ips = whataremyips()
        # Device ids in the object ring that belong to this host/port; used
        # by process_object_janitor() to decide segment-cleanup ownership.
        self.my_node_ids = [n['id'] for n in self.get_object_ring().devs
            if n and n['ip'] in my_ips and n['port'] == self.port]
        pids = []
        for device in os.listdir(self.devices):
            if self.mount_check and not \
                    os.path.ismount(os.path.join(self.devices, device)):
                self.logger.warn('Skipping %s as it is not mounted' % device)
                continue
            if fork:
                # Reap children until we're below the concurrency cap.
                while len(pids) >= self.concurrency:
                    pids.remove(os.wait()[0])
            # read from rings to ensure they're fresh
            self.get_object_ring().get_nodes('')
            self.get_container_ring().get_nodes('')
            if fork:
                pid = os.fork()
                if pid:
                    pids.append(pid)
                else:
                    # Child: restore default SIGTERM, green the sockets, do
                    # one device, and exit without returning to the loop.
                    signal.signal(signal.SIGTERM, signal.SIG_DFL)
                    patcher.monkey_patch(all=False, socket=True)
                    self.object_sweep(os.path.join(self.devices, device))
                    sys.exit()
            else:
                self.object_sweep(os.path.join(self.devices, device))
        if fork:
            # Wait for all remaining children before reporting.
            while pids:
                pids.remove(os.wait()[0])
        elapsed = time() - begin
        self.logger.info('Object janitor sweep completed: %.02fs' % elapsed)
137
138 def object_sweep(self, device):
139 """
140 If there are janitor pendings on the device, walk each one and update.
141
142 :param device: path to device
143 """
144 self.container_update_successes = 0
145 self.container_update_failures = 0
146 self.segment_cleanup_completions = 0
147 self.segment_cleanup_segments = 0
148 self.segment_cleanup_failures = 0
149 begin = time()
150 janitordir = os.path.join(device, JANITORDIR)
151 try:
152 if not os.path.isdir(janitordir):
153 return
154 for partition in os.listdir(janitordir):
155 partition_path = os.path.join(janitordir, partition)
156 for suffix in os.listdir(partition_path):
157 suffix_path = os.path.join(partition_path, suffix)
158 for janitor in os.listdir(suffix_path):
159 janitor_path = os.path.join(suffix_path, janitor)
160 self.process_object_janitor(device, partition,
161 janitor_path)
162 sleep(self.slowdown)
163 finally:
164 elapsed = time() - begin
165 self.logger.info('Object janitor sweep of %s completed in %.02fs: '
166 'container updates: %s successes, %s failures; segment '
167 'cleanups: %s completions, %s segments, %s failures' %
168 (device, elapsed,
169 self.container_update_successes,
170 self.container_update_failures,
171 self.segment_cleanup_completions,
172 self.segment_cleanup_segments,
173 self.segment_cleanup_failures))
174
175 def process_object_janitor(self, device, partition, janitor_path):
176 """
177 Process the object janitor operation.
178
179 :param device: path to device
180 :param partition: partition for the object
181 :param janitor_path: path to DiskFile for the janitor operation
182 """
183 data_file = None
184 files = sorted(os.listdir(janitor_path), reverse=True)
185 for file_ in files:
186 if file_.endswith('.ts'):
187 break
188 if file_.endswith('.data'):
189 data_file = os.path.join(janitor_path, file_)
190 break
191 if not data_file:
192 return
193 metadata = read_metadata(data_file)
194 _, account, container, obj = metadata['name'].split('/', 3)
195 if metadata['X-Op'] == 'Container-Update':
196 self.process_container_update(device, partition, account,
197 container, obj)
198 elif metadata['X-Op'] == 'Segment-Cleanup':
199 if self.get_object_ring().get_part_nodes(int(partition))[0]['id'] \
200 in self.my_node_ids:
201 self.process_segment_cleanup(device, partition, account,
202 container, obj)
203 else:
204 self.logger.error('ERROR: Unknown X-Op: %s' % metadata['X-Op'])
205 return
206
    def process_container_update(self, device, partition, account, container,
                                 obj):
        """
        Process the container update operation, sending the object information
        to the container server.

        :param device: path to device
        :param partition: partition for the object
        :param account: account for the object
        :param container: container for the object
        :param obj: name for the object
        """
        disk_file = DiskFile(self.devices, device, partition, account,
            container, obj, keep_data_fp=True, datadir=JANITORDIR)
        if disk_file.is_deleted():
            return
        # The operation's payload is pickled into the .data file body (see
        # DiskFile.store_janitor_container_update).
        update = pickle.loads(''.join(iter(disk_file)))
        op = update['op']
        account = update['account']
        container = update['container']
        obj = update['obj']
        headers = update['headers']
        # Container node ids already updated on a previous pass; skipped
        # below so retries only hit nodes that still need the update.
        successes = update.get('successes', [])
        part, nodes = self.get_container_ring().get_nodes(account, container)
        path = '/%s/%s/%s' % (account, container, obj)
        success = True
        for node in nodes:
            if node['id'] not in successes:
                status = self.container_update(node, part, op, path, headers)
                # 404 is treated as success here (container gone).
                if not (200 <= status < 300) and status != 404:
                    success = False
                else:
                    successes.append(node['id'])
        if success:
            # All nodes updated: tombstone the janitor op so it's reclaimed.
            self.container_update_successes += 1
            self.logger.debug('Update sent for %s %s' %
                (path, disk_file.datadir))
            disk_file.tombstone(normalize_timestamp(time()))
        else:
            self.container_update_failures += 1
            self.logger.debug('Update failed for %s %s' %
                (path, disk_file.datadir))
            # Only rewrite the op if we made progress (new successes), so
            # the next pass can skip the nodes that already succeeded.
            if len(update.get('successes', [])) != len(successes):
                disk_file.store_janitor_container_update(op, account,
                    container, obj, headers, successes)
252
    def container_update(self, node, part, op, obj, headers):
        """
        Perform the actual container update network operation, sending the
        object information to the container server.

        :param node: node dictionary from the container ring
        :param part: partition that holds the container
        :param op: operation performed (ex: 'POST' or 'DELETE')
        :param obj: object name being updated
        :param headers: headers to send with the update
        :returns: HTTP status code from the container server, or 500 when
                  any error occurs
        """
        try:
            with ConnectionTimeout(self.conn_timeout):
                conn = http_connect(node['ip'], node['port'], node['device'],
                    part, op, obj, headers)
            with Timeout(self.node_timeout):
                resp = conn.getresponse()
                resp.read()
                return resp.status
        # NOTE(review): bare except intentionally treats any failure
        # (connect/timeout/read) as a 500 so the caller retries later, but
        # it also swallows unexpected exceptions -- consider narrowing.
        except:
            self.logger.exception('ERROR with remote server '
                '%(ip)s:%(port)s/%(device)s' % node)
            return 500
276
    def process_segment_cleanup(self, device, partition, account, container,
                                obj):
        """
        Process the segment cleanup operation, checking to see if the
        operation should have completed long ago but still has no manifest and
        therefore should have the orphaned segments removed.

        :param device: path to device
        :param partition: partition for the operation
        :param account: account for the operation (should always be
                        "Segment-Cleanup")
        :param container: container for the operation (actually the timestamp
                          of the original PUT)
        :param obj: name for the object (actually the full path
                    account/container/object for the original object PUT)
        """
        disk_file = DiskFile(self.devices, device, partition, account,
            container, obj, keep_data_fp=True, datadir=JANITORDIR)
        if disk_file.is_deleted():
            return
        # The operation's payload is pickled into the .data file body (see
        # DiskFile.store_janitor_segment_cleanup).
        disk_file_data = pickle.loads(''.join(iter(disk_file)))
        segment_timestamp = container
        # If not None, indicates we have started cleaning up these segments
        # already and should just continue the operation.
        segment_last_deleted = disk_file_data.get('segment_last_deleted', None)
        # Sometimes we'll have no clue how many segments there are, which is
        # what None means.
        segment_count = disk_file_data.get('segment_count', None)
        account = disk_file_data['account']
        container = disk_file_data['container']
        obj = disk_file_data['obj']
        path = '/%s/%s/%s' % (account, container, obj)
        if segment_last_deleted is not None:
            self.logger.debug('Continue cleaning up segments for %s/%s/%s %s '
                '@%s' % (device, partition, path, segment_timestamp,
                segment_last_deleted))
            self.cleanup_segments(disk_file, account, container, obj,
                segment_timestamp, segment_count, segment_last_deleted)
            return
        # Ask every primary object node whether a manifest (or a plain
        # object) newer than, equal to, or older than our segments exists.
        part, nodes = self.get_object_ring().get_nodes(account, container, obj)
        newest_object = None
        newest_manifest = None
        responses = 0
        for node in nodes:
            try:
                with ConnectionTimeout(self.conn_timeout):
                    conn = http_connect(node['ip'], node['port'],
                        node['device'], part, 'GET', path)
                with Timeout(self.node_timeout):
                    resp = conn.getresponse()
                    responses += 1
                    if resp.status // 100 != 2:
                        conn.close()
                        continue
                    # NOTE(review): timestamps are compared as strings;
                    # presumably safe because they are normalized to
                    # fixed-width form -- confirm against the writers.
                    if resp.getheader('x-object-type', '') != 'manifest':
                        if resp.getheader('x-timestamp') > newest_object:
                            newest_object = resp.getheader('x-timestamp')
                        conn.close()
                        if newest_object > segment_timestamp:
                            # We don't have to talk to the other nodes if we
                            # already know we have a newer object.
                            break
                        continue
                with Timeout(self.node_timeout):
                    body = resp.read()
                manifest = pickle.loads(body)
                if manifest['x-timestamp'] > newest_manifest:
                    newest_manifest = manifest['x-timestamp']
                conn.close()
                if newest_manifest > segment_timestamp:
                    # We don't have to talk to the other nodes if we already
                    # know we have a newer manifest.
                    break
            except:
                self.logger.exception('ERROR with remote server '
                    '%(ip)s:%(port)s/%(device)s' % node)
        if newest_object > segment_timestamp or \
                newest_manifest > segment_timestamp:
            # Something newer superseded these segments; delete them.
            self.logger.debug('Newer object/manifest; discarding old '
                'segments for %s/%s/%s %s' % (device, partition, path,
                segment_timestamp))
            self.cleanup_segments(disk_file, account, container, obj,
                segment_timestamp, segment_count, segment_last_deleted)
        elif newest_manifest == segment_timestamp:
            # The manifest for exactly these segments exists: the PUT
            # completed, so this cleanup operation can be discarded.
            self.logger.debug('Exact manifest confirmed; discarding janitor '
                'cleanup operation for %s/%s/%s %s' % (device, partition,
                path, segment_timestamp))
            disk_file.tombstone(normalize_timestamp(time()))
        elif responses == len(nodes) and \
                time() - float(segment_timestamp) > \
                self.segment_reclaim_age:
            # Every node answered, none has the manifest, and enough time
            # has passed: the segments are orphaned; reclaim them.
            self.logger.debug('All nodes agree the manifest still does not '
                'exist after %ss; discarding orphaned segments for '
                '%s/%s/%s %s' % (self.segment_reclaim_age, device,
                partition, path, segment_timestamp))
            self.cleanup_segments(disk_file, account, container, obj,
                segment_timestamp, segment_count, segment_last_deleted)
374
375 def cleanup_segments(self, disk_file, account, container, obj,
376 segment_timestamp, segment_count, segment_last_deleted):
377 """
378 Issues DELETEs to the segments of the object, up to
379 self.segments_per_pass. If all the segments are not deleted before this
380 function returns, it will update the disk_file with the latest point
381 completed.
382
383 :param disk_file: DiskFile for the Segment Cleanup operation.
384 :param account: Account name for the manifest object to delete segments
385 for.
386 :param container: Container name for the manifest object to delete
387 segments for.
388 :param obj: Object name for the manifest object to delete segments for.
389 :param segment_timestamp: The timestamp for the segments themselves.
390 :param segment_count: The number of segments for the object; may be
391 None if not yet known and this function should
392 try to determine the count.
393 :param segment_last_deleted: The segment number that was last deleted;
394 None of none have yet been deleted. This
395 allows continuation of previous deletion
396 attempts.
397 """
398 path = '/%s/%s/%s' % (account, container, obj)
399 if segment_last_deleted is None:
400 segment_last_deleted = -1
401 if segment_count is None:
402 # We need to determine how many segments there are so that if we
403 # crash this run, next run we'll know what range we'll need to work
404 # within. Example: Determine we have 10 segments; delete the first
405 # five and crash; next run we now know we need to delete no more
406 # than 10 segments. With the approach of just deleting segments
407 # until a 404 and not predetermining the count, with such a crash
408 # we'd never delete the last five segments.
409 segment_count = 0
410 assumptions = 0
411 while True:
412 if segment_count:
413 ring_obj = \
414 '%s/%s/%s' % (obj, segment_timestamp, segment_count)
415 else:
416 ring_obj = obj
417 part, nodes = self.get_object_ring().get_nodes(account,
418 container, ring_obj)
419 headers = {'X-Object-Segment': str(segment_count),
420 'X-Object-Segment-Timestamp': segment_timestamp}
421 status = 0
422 for node in nodes:
423 try:
424 with ConnectionTimeout(self.conn_timeout):
425 conn = http_connect(node['ip'], node['port'],
426 node['device'], part, 'HEAD', path,
427 headers=headers)
428 with Timeout(self.node_timeout):
429 resp = conn.getresponse()
430 resp.read()
431 if resp.status == 404:
432 status = 404
433 elif resp.status // 100 == 2:
434 status = 200
435 break
436 except:
437 self.logger.exception('ERROR with remote server '
438 '%(ip)s:%(port)s/%(device)s' % node)
439 if status == 404:
440 break
441 segment_count += 1
442 if not status:
443 # We couldn't determine if a segment existed, so we'll just
444 # assume it did and move to the next one; but we'll only do
445 # this for five consecutive segments before just giving up
446 # completely and trying the next run.
447 assumptions += 1
448 if assumptions > 5:
449 self.segment_cleanup_failures += 1
450 return
451 assumptions = 0
452 disk_file.store_janitor_segment_cleanup(account, container, obj,
453 segment_count, segment_last_deleted)
454 starting_segment_last_deleted = segment_last_deleted
455 try:
456 segments_this_pass = self.segments_per_pass
457 while True:
458 segment = segment_last_deleted + 1
459 if segment:
460 ring_obj = '%s/%s/%s' % (obj, segment_timestamp, segment)
461 else:
462 ring_obj = obj
463 part, nodes = self.get_object_ring().get_nodes(account,
464 container, ring_obj)
465 headers = {'X-Object-Segment': str(segment),
466 'X-Object-Segment-Timestamp': segment_timestamp,
467 'X-Timestamp': normalize_timestamp(time())}
468 not_found_count = 0
469 success = False
470 for node in nodes:
471 try:
472 with ConnectionTimeout(self.conn_timeout):
473 conn = http_connect(node['ip'], node['port'],
474 node['device'], part, 'DELETE', path,
475 headers=headers)
476 with Timeout(self.node_timeout):
477 resp = conn.getresponse()
478 resp.read()
479 if resp.status == 404:
480 not_found_count += 1
481 elif resp.status // 100 == 2:
482 # For the sake of this clean up operation, we're
483 # going to consider even one success complete
484 # success.
485 success = True
486 except:
487 self.logger.exception('ERROR with remote server '
488 '%(ip)s:%(port)s/%(device)s' % node)
489 if not_found_count == len(nodes):
490 success = True
491 if not success:
492 # If we didn't even have one success, we'll have to leave
493 # the janitor operation to retry later.
494 disk_file.store_janitor_segment_cleanup(account, container,
495 obj, segment_count, segment_last_deleted)
496 self.segment_cleanup_failures += 1
497 return
498 segment_last_deleted = segment
499 self.segment_cleanup_segments += 1
500 if segment >= segment_count - 1:
501 disk_file.tombstone(normalize_timestamp(time()))
502 self.segment_cleanup_completions += 1
503 return
504 segments_this_pass -= 1
505 if segments_this_pass <= 0:
506 disk_file.store_janitor_segment_cleanup(account, container,
507 obj, segment_count, segment_last_deleted)
508 return
509 except:
510 # If we get an unexpected exception, log it and try to update the
511 # disk file to indicate the segment last deleted.
512 self.logger.exception('ERROR unexpected exception with %s:' % path)
513 self.segment_cleanup_failures += 1
514 if segment_last_deleted != starting_segment_last_deleted:
515 disk_file.store_janitor_segment_cleanup(account, container,
516 obj, segment_count, segment_last_deleted)
0517
=== modified file 'swift/obj/replicator.py'
--- swift/obj/replicator.py 2010-10-19 01:05:54 +0000
+++ swift/obj/replicator.py 2010-11-08 18:51:48 +0000
@@ -19,7 +19,6 @@
19import shutil19import shutil
20import time20import time
21import logging21import logging
22import hashlib
23import itertools22import itertools
24import cPickle as pickle23import cPickle as pickle
2524
@@ -29,168 +28,16 @@
29from eventlet.support.greenlets import GreenletExit28from eventlet.support.greenlets import GreenletExit
3029
31from swift.common.ring import Ring30from swift.common.ring import Ring
32from swift.common.utils import whataremyips, unlink_older_than, lock_path, \31from swift.common.utils import compute_eta, get_logger, unlink_older_than, \
33 renamer, compute_eta, get_logger32 whataremyips
34from swift.common.bufferedhttp import http_connect33from swift.common.bufferedhttp import http_connect
35from swift.common.daemon import Daemon34from swift.common.daemon import Daemon
35from swift.obj.diskfile import DATADIR, get_hashes, JANITORDIR, \
36 recalculate_hashes, SEGMENTSDIR
37
3638
37hubs.use_hub('poll')39hubs.use_hub('poll')
3840
39PICKLE_PROTOCOL = 2
40ONE_WEEK = 604800
41HASH_FILE = 'hashes.pkl'
42
43
44def hash_suffix(path, reclaim_age):
45 """
46 Performs reclamation and returns an md5 of all (remaining) files.
47
48 :param reclaim_age: age in seconds at which to remove tombstones
49 """
50 md5 = hashlib.md5()
51 for hsh in sorted(os.listdir(path)):
52 hsh_path = join(path, hsh)
53 files = os.listdir(hsh_path)
54 if len(files) == 1:
55 if files[0].endswith('.ts'):
56 # remove tombstones older than reclaim_age
57 ts = files[0].rsplit('.', 1)[0]
58 if (time.time() - float(ts)) > reclaim_age:
59 os.unlink(join(hsh_path, files[0]))
60 files.remove(files[0])
61 elif files:
62 files.sort(reverse=True)
63 meta = data = tomb = None
64 for filename in files:
65 if not meta and filename.endswith('.meta'):
66 meta = filename
67 if not data and filename.endswith('.data'):
68 data = filename
69 if not tomb and filename.endswith('.ts'):
70 tomb = filename
71 if (filename < tomb or # any file older than tomb
72 filename < data or # any file older than data
73 (filename.endswith('.meta') and
74 filename < meta)): # old meta
75 os.unlink(join(hsh_path, filename))
76 files.remove(filename)
77 if not files:
78 os.rmdir(hsh_path)
79 for filename in files:
80 md5.update(filename)
81 try:
82 os.rmdir(path)
83 except OSError:
84 pass
85 return md5.hexdigest()
86
87
88def recalculate_hashes(partition_dir, suffixes, reclaim_age=ONE_WEEK):
89 """
90 Recalculates hashes for the given suffixes in the partition and updates
91 them in the partition's hashes file.
92
93 :param partition_dir: directory of the partition in which to recalculate
94 :param suffixes: list of suffixes to recalculate
95 :param reclaim_age: age in seconds at which tombstones should be removed
96 """
97
98 def tpool_listdir(partition_dir):
99 return dict(((suff, None) for suff in os.listdir(partition_dir)
100 if len(suff) == 3 and isdir(join(partition_dir, suff))))
101 hashes_file = join(partition_dir, HASH_FILE)
102 with lock_path(partition_dir):
103 try:
104 with open(hashes_file, 'rb') as fp:
105 hashes = pickle.load(fp)
106 except Exception:
107 hashes = tpool.execute(tpool_listdir, partition_dir)
108 for suffix in suffixes:
109 suffix_dir = join(partition_dir, suffix)
110 if os.path.exists(suffix_dir):
111 hashes[suffix] = hash_suffix(suffix_dir, reclaim_age)
112 elif suffix in hashes:
113 del hashes[suffix]
114 with open(hashes_file + '.tmp', 'wb') as fp:
115 pickle.dump(hashes, fp, PICKLE_PROTOCOL)
116 renamer(hashes_file + '.tmp', hashes_file)
117
118
119def invalidate_hash(suffix_dir):
120 """
121 Invalidates the hash for a suffix_dir in the partition's hashes file.
122
123 :param suffix_dir: absolute path to suffix dir whose hash needs
124 invalidating
125 """
126
127 suffix = os.path.basename(suffix_dir)
128 partition_dir = os.path.dirname(suffix_dir)
129 hashes_file = join(partition_dir, HASH_FILE)
130 with lock_path(partition_dir):
131 try:
132 with open(hashes_file, 'rb') as fp:
133 hashes = pickle.load(fp)
134 if suffix in hashes and not hashes[suffix]:
135 return
136 except Exception:
137 return
138 hashes[suffix] = None
139 with open(hashes_file + '.tmp', 'wb') as fp:
140 pickle.dump(hashes, fp, PICKLE_PROTOCOL)
141 renamer(hashes_file + '.tmp', hashes_file)
142
143
144def get_hashes(partition_dir, do_listdir=True, reclaim_age=ONE_WEEK):
145 """
146 Get a list of hashes for the suffix dir. do_listdir causes it to mistrust
147 the hash cache for suffix existence at the (unexpectedly high) cost of a
148 listdir. reclaim_age is just passed on to hash_suffix.
149
150 :param partition_dir: absolute path of partition to get hashes for
151 :param do_listdir: force existence check for all hashes in the partition
152 :param reclaim_age: age at which to remove tombstones
153
154 :returns: tuple of (number of suffix dirs hashed, dictionary of hashes)
155 """
156
157 def tpool_listdir(hashes, partition_dir):
158 return dict(((suff, hashes.get(suff, None))
159 for suff in os.listdir(partition_dir)
160 if len(suff) == 3 and isdir(join(partition_dir, suff))))
161 hashed = 0
162 hashes_file = join(partition_dir, HASH_FILE)
163 with lock_path(partition_dir):
164 modified = False
165 hashes = {}
166 try:
167 with open(hashes_file, 'rb') as fp:
168 hashes = pickle.load(fp)
169 except Exception:
170 do_listdir = True
171 if do_listdir:
172 hashes = tpool.execute(tpool_listdir, hashes, partition_dir)
173 modified = True
174 for suffix, hash_ in hashes.items():
175 if not hash_:
176 suffix_dir = join(partition_dir, suffix)
177 if os.path.exists(suffix_dir):
178 try:
179 hashes[suffix] = hash_suffix(suffix_dir, reclaim_age)
180 hashed += 1
181 except OSError:
182 logging.exception('Error hashing suffix')
183 hashes[suffix] = None
184 else:
185 del hashes[suffix]
186 modified = True
187 sleep()
188 if modified:
189 with open(hashes_file + '.tmp', 'wb') as fp:
190 pickle.dump(hashes, fp, PICKLE_PROTOCOL)
191 renamer(hashes_file + '.tmp', hashes_file)
192 return hashed, hashes
193
19441
195class ObjectReplicator(Daemon):42class ObjectReplicator(Daemon):
196 """43 """
@@ -302,7 +149,7 @@
302 if not had_any:149 if not had_any:
303 return False150 return False
304 args.append(join(rsync_module, node['device'],151 args.append(join(rsync_module, node['device'],
305 'objects', job['partition']))152 job.get('datadir', DATADIR), job['partition']))
306 return self._rsync(args) == 0153 return self._rsync(args) == 0
307154
308 def check_ring(self):155 def check_ring(self):
@@ -337,12 +184,14 @@
337 for node in job['nodes']:184 for node in job['nodes']:
338 success = self.rsync(node, job, suffixes)185 success = self.rsync(node, job, suffixes)
339 if success:186 if success:
187 headers = {'Content-Length': '0',
188 'X-Data-Dir': job.get('datadir', DATADIR)}
340 with Timeout(self.http_timeout):189 with Timeout(self.http_timeout):
341 http_connect(node['ip'],190 http_connect(node['ip'],
342 node['port'],191 node['port'],
343 node['device'], job['partition'], 'REPLICATE',192 node['device'], job['partition'], 'REPLICATE',
344 '/' + '-'.join(suffixes),193 '/' + '-'.join(suffixes),
345 headers={'Content-Length': '0'}).getresponse().read()194 headers=headers).getresponse().read()
346 responses.append(success)195 responses.append(success)
347 if not suffixes or (len(responses) == \196 if not suffixes or (len(responses) == \
348 self.object_ring.replica_count and all(responses)):197 self.object_ring.replica_count and all(responses)):
@@ -374,10 +223,12 @@
374 node = next(nodes)223 node = next(nodes)
375 attempts_left -= 1224 attempts_left -= 1
376 try:225 try:
226 headers = {'Content-Length': '0',
227 'X-Data-Dir': job.get('datadir', DATADIR)}
377 with Timeout(self.http_timeout):228 with Timeout(self.http_timeout):
378 resp = http_connect(node['ip'], node['port'],229 resp = http_connect(node['ip'], node['port'],
379 node['device'], job['partition'], 'REPLICATE',230 node['device'], job['partition'], 'REPLICATE',
380 '', headers={'Content-Length': '0'}).getresponse()231 '', headers=headers).getresponse()
381 if resp.status == 507:232 if resp.status == 507:
382 self.logger.error('%s/%s responded as unmounted' %233 self.logger.error('%s/%s responded as unmounted' %
383 (node['ip'], node['device']))234 (node['ip'], node['device']))
@@ -397,11 +248,13 @@
397 self.rsync(node, job, suffixes)248 self.rsync(node, job, suffixes)
398 recalculate_hashes(job['path'], suffixes,249 recalculate_hashes(job['path'], suffixes,
399 reclaim_age=self.reclaim_age)250 reclaim_age=self.reclaim_age)
251 headers = {'Content-Length': '0',
252 'X-Data-Dir': job.get('datadir', DATADIR)}
400 with Timeout(self.http_timeout):253 with Timeout(self.http_timeout):
401 conn = http_connect(node['ip'], node['port'],254 conn = http_connect(node['ip'], node['port'],
402 node['device'], job['partition'], 'REPLICATE',255 node['device'], job['partition'], 'REPLICATE',
403 '/' + '-'.join(suffixes),256 '/' + '-'.join(suffixes),
404 headers={'Content-Length': '0'})257 headers=headers)
405 conn.getresponse().read()258 conn.getresponse().read()
406 self.suffix_sync += len(suffixes)259 self.suffix_sync += len(suffixes)
407 except (Exception, Timeout):260 except (Exception, Timeout):
@@ -489,24 +342,27 @@
489 dev for dev in self.object_ring.devs342 dev for dev in self.object_ring.devs
490 if dev and dev['ip'] in ips and dev['port'] == self.port]:343 if dev and dev['ip'] in ips and dev['port'] == self.port]:
491 dev_path = join(self.devices_dir, local_dev['device'])344 dev_path = join(self.devices_dir, local_dev['device'])
492 obj_path = join(dev_path, 'objects')
493 tmp_path = join(dev_path, 'tmp')
494 if self.mount_check and not os.path.ismount(dev_path):345 if self.mount_check and not os.path.ismount(dev_path):
495 self.logger.warn('%s is not mounted' % local_dev['device'])346 self.logger.warn('%s is not mounted' % local_dev['device'])
496 continue347 continue
348 tmp_path = join(dev_path, 'tmp')
497 unlink_older_than(tmp_path, time.time() - self.reclaim_age)349 unlink_older_than(tmp_path, time.time() - self.reclaim_age)
498 if not os.path.exists(obj_path):350 for datadir in (DATADIR, JANITORDIR, SEGMENTSDIR):
499 continue351 obj_path = join(dev_path, datadir)
500 for partition in os.listdir(obj_path):352 if os.path.exists(obj_path):
501 try:353 for partition in os.listdir(obj_path):
502 nodes = [node for node in354 try:
503 self.object_ring.get_part_nodes(int(partition))355 nodes = [node for node in
504 if node['id'] != local_dev['id']]356 self.object_ring.get_part_nodes(
505 jobs.append(dict(path=join(obj_path, partition),357 int(partition))
506 nodes=nodes, delete=len(nodes) > 2,358 if node['id'] != local_dev['id']]
507 partition=partition))359 jobs.append(dict(
508 except ValueError:360 path=join(obj_path, partition),
509 continue361 nodes=nodes, delete=len(nodes) > 2,
362 partition=partition,
363 datadir=datadir))
364 except ValueError:
365 continue
510 random.shuffle(jobs)366 random.shuffle(jobs)
511 # Partititons that need to be deleted take priority367 # Partititons that need to be deleted take priority
512 jobs.sort(key=lambda job: not job['delete'])368 jobs.sort(key=lambda job: not job['delete'])
513369
=== modified file 'swift/obj/server.py'
--- swift/obj/server.py 2010-11-01 21:47:48 +0000
+++ swift/obj/server.py 2010-11-08 18:51:48 +0000
@@ -17,225 +17,28 @@
1717
18from __future__ import with_statement18from __future__ import with_statement
19import cPickle as pickle19import cPickle as pickle
20import errno
21import os20import os
22import time21import time
23import traceback22import traceback
24from datetime import datetime23from datetime import datetime
25from hashlib import md524from hashlib import md5
26from tempfile import mkstemp
27from urllib import unquote25from urllib import unquote
28from contextlib import contextmanager
2926
30from webob import Request, Response, UTC27from webob import Request, Response, UTC
31from webob.exc import HTTPAccepted, HTTPBadRequest, HTTPCreated, \28from webob.exc import HTTPAccepted, HTTPBadRequest, HTTPCreated, \
32 HTTPInternalServerError, HTTPNoContent, HTTPNotFound, \29 HTTPInternalServerError, HTTPNoContent, HTTPNotFound, \
33 HTTPNotModified, HTTPPreconditionFailed, \30 HTTPNotModified, HTTPPreconditionFailed, \
34 HTTPRequestTimeout, HTTPUnprocessableEntity, HTTPMethodNotAllowed31 HTTPRequestTimeout, HTTPUnprocessableEntity, HTTPMethodNotAllowed
35from xattr import getxattr, setxattr
36from eventlet import sleep, Timeout32from eventlet import sleep, Timeout
3733
38from swift.common.utils import mkdirs, normalize_timestamp, \34from swift.common.utils import drop_buffer_cache, fallocate, get_logger, \
39 storage_directory, hash_path, renamer, fallocate, \35 mkdirs, normalize_timestamp, split_path
40 split_path, drop_buffer_cache, get_logger, write_pickle
41from swift.common.bufferedhttp import http_connect36from swift.common.bufferedhttp import http_connect
42from swift.common.constraints import check_object_creation, check_mount, \37from swift.common.constraints import check_object_creation, check_mount, \
43 check_float, check_utf838 check_float, check_utf8
44from swift.common.exceptions import ConnectionTimeout39from swift.common.exceptions import ConnectionTimeout
45from swift.obj.replicator import get_hashes, invalidate_hash, \40from swift.obj.diskfile import DATADIR, DiskFile, get_hashes, JANITORDIR, \
46 recalculate_hashes41 recalculate_hashes
47
48
49DATADIR = 'objects'
50ASYNCDIR = 'async_pending'
51PICKLE_PROTOCOL = 2
52METADATA_KEY = 'user.swift.metadata'
53MAX_OBJECT_NAME_LENGTH = 1024
54
55
56def read_metadata(fd):
57 """
58 Helper function to read the pickled metadata from an object file.
59
60 :param fd: file descriptor to load the metadata from
61
62 :returns: dictionary of metadata
63 """
64 metadata = ''
65 key = 0
66 try:
67 while True:
68 metadata += getxattr(fd, '%s%s' % (METADATA_KEY, (key or '')))
69 key += 1
70 except IOError:
71 pass
72 return pickle.loads(metadata)
73
74
75class DiskFile(object):
76 """
77 Manage object files on disk.
78
79 :param path: path to devices on the node
80 :param device: device name
81 :param partition: partition on the device the object lives in
82 :param account: account name for the object
83 :param container: container name for the object
84 :param obj: object name for the object
85 :param keep_data_fp: if True, don't close the fp, otherwise close it
86 :param disk_chunk_Size: size of chunks on file reads
87 """
88
89 def __init__(self, path, device, partition, account, container, obj,
90 keep_data_fp=False, disk_chunk_size=65536):
91 self.disk_chunk_size = disk_chunk_size
92 self.name = '/' + '/'.join((account, container, obj))
93 name_hash = hash_path(account, container, obj)
94 self.datadir = os.path.join(path, device,
95 storage_directory(DATADIR, partition, name_hash))
96 self.tmpdir = os.path.join(path, device, 'tmp')
97 self.metadata = {}
98 self.meta_file = None
99 self.data_file = None
100 if not os.path.exists(self.datadir):
101 return
102 files = sorted(os.listdir(self.datadir), reverse=True)
103 for file in files:
104 if file.endswith('.ts'):
105 self.data_file = self.meta_file = None
106 self.metadata = {'deleted': True}
107 return
108 if file.endswith('.meta') and not self.meta_file:
109 self.meta_file = os.path.join(self.datadir, file)
110 if file.endswith('.data') and not self.data_file:
111 self.data_file = os.path.join(self.datadir, file)
112 break
113 if not self.data_file:
114 return
115 self.fp = open(self.data_file, 'rb')
116 self.metadata = read_metadata(self.fp)
117 if not keep_data_fp:
118 self.close()
119 if self.meta_file:
120 with open(self.meta_file) as mfp:
121 for key in self.metadata.keys():
122 if key.lower() not in ('content-type', 'content-encoding',
123 'deleted', 'content-length', 'etag'):
124 del self.metadata[key]
125 self.metadata.update(read_metadata(mfp))
126
127 def __iter__(self):
128 """Returns an iterator over the data file."""
129 try:
130 dropped_cache = 0
131 read = 0
132 while True:
133 chunk = self.fp.read(self.disk_chunk_size)
134 if chunk:
135 read += len(chunk)
136 if read - dropped_cache > (1024 * 1024):
137 drop_buffer_cache(self.fp.fileno(), dropped_cache,
138 read - dropped_cache)
139 dropped_cache = read
140 yield chunk
141 else:
142 drop_buffer_cache(self.fp.fileno(), dropped_cache,
143 read - dropped_cache)
144 break
145 finally:
146 self.close()
147
148 def app_iter_range(self, start, stop):
149 """Returns an iterator over the data file for range (start, stop)"""
150 if start:
151 self.fp.seek(start)
152 if stop is not None:
153 length = stop - start
154 else:
155 length = None
156 for chunk in self:
157 if length is not None:
158 length -= len(chunk)
159 if length < 0:
160 # Chop off the extra:
161 yield chunk[:length]
162 break
163 yield chunk
164
165 def close(self):
166 """Close the file."""
167 if self.fp:
168 self.fp.close()
169 self.fp = None
170
171 def is_deleted(self):
172 """
173 Check if the file is deleted.
174
175 :returns: True if the file doesn't exist or has been flagged as
176 deleted.
177 """
178 return not self.data_file or 'deleted' in self.metadata
179
180 @contextmanager
181 def mkstemp(self):
182 """Contextmanager to make a temporary file."""
183 if not os.path.exists(self.tmpdir):
184 mkdirs(self.tmpdir)
185 fd, tmppath = mkstemp(dir=self.tmpdir)
186 try:
187 yield fd, tmppath
188 finally:
189 try:
190 os.close(fd)
191 except OSError:
192 pass
193 try:
194 os.unlink(tmppath)
195 except OSError:
196 pass
197
198 def put(self, fd, tmppath, metadata, extension='.data'):
199 """
200 Finalize writing the file on disk, and renames it from the temp file to
201 the real location. This should be called after the data has been
202 written to the temp file.
203
204 :params fd: file descriptor of the temp file
205 :param tmppath: path to the temporary file being used
206 :param metadata: dictionary of metada to be written
207 :param extention: extension to be used when making the file
208 """
209 metadata['name'] = self.name
210 timestamp = normalize_timestamp(metadata['X-Timestamp'])
211 metastr = pickle.dumps(metadata, PICKLE_PROTOCOL)
212 key = 0
213 while metastr:
214 setxattr(fd, '%s%s' % (METADATA_KEY, key or ''), metastr[:254])
215 metastr = metastr[254:]
216 key += 1
217 if 'Content-Length' in metadata:
218 drop_buffer_cache(fd, 0, int(metadata['Content-Length']))
219 os.fsync(fd)
220 invalidate_hash(os.path.dirname(self.datadir))
221 renamer(tmppath, os.path.join(self.datadir, timestamp + extension))
222 self.metadata = metadata
223
224 def unlinkold(self, timestamp):
225 """
226 Remove any older versions of the object file. Any file that has an
227 older timestamp than timestamp will be deleted.
228
229 :param timestamp: timestamp to compare with each file
230 """
231 timestamp = normalize_timestamp(timestamp)
232 for fname in os.listdir(self.datadir):
233 if fname < timestamp:
234 try:
235 os.unlink(os.path.join(self.datadir, fname))
236 except OSError, err: # pragma: no cover
237 if err.errno != errno.ENOENT:
238 raise
23942
24043
241class ObjectController(object):44class ObjectController(object):
@@ -262,7 +65,7 @@
262 self.bytes_per_sync = int(conf.get('mb_per_sync', 512)) * 1024 * 102465 self.bytes_per_sync = int(conf.get('mb_per_sync', 512)) * 1024 * 1024
26366
264 def container_update(self, op, account, container, obj, headers_in,67 def container_update(self, op, account, container, obj, headers_in,
265 headers_out, objdevice):68 headers_out, objdevice, objpartition):
266 """69 """
267 Update the container when objects are updated.70 Update the container when objects are updated.
26871
@@ -274,6 +77,7 @@
274 :param headers_out: dictionary of headers to send in the container77 :param headers_out: dictionary of headers to send in the container
275 request78 request
276 :param objdevice: device name that the object is in79 :param objdevice: device name that the object is in
80 :param objpartition: partition that the object is in
277 """81 """
278 host = headers_in.get('X-Container-Host', None)82 host = headers_in.get('X-Container-Host', None)
279 partition = headers_in.get('X-Container-Partition', None)83 partition = headers_in.get('X-Container-Partition', None)
@@ -293,20 +97,16 @@
293 return97 return
294 else:98 else:
295 self.logger.error('ERROR Container update failed (saving '99 self.logger.error('ERROR Container update failed (saving '
296 'for async update later): %d response from %s:%s/%s' %100 'for janitor update later): %d response from %s:%s/%s'
297 (response.status, ip, port, contdevice))101 % (response.status, ip, port, contdevice))
298 except:102 except:
299 self.logger.exception('ERROR container update failed with '103 self.logger.exception('ERROR container update failed with '
300 '%s:%s/%s transaction %s (saving for async update later)' %104 '%s:%s/%s transaction %s (saving for janitor update later)' %
301 (ip, port, contdevice, headers_in.get('x-cf-trans-id', '-')))105 (ip, port, contdevice, headers_in.get('x-cf-trans-id', '-')))
302 async_dir = os.path.join(self.devices, objdevice, ASYNCDIR)106 df = DiskFile(self.devices, objdevice, objpartition, account,
303 ohash = hash_path(account, container, obj)107 container, obj, datadir=JANITORDIR)
304 write_pickle(108 df.store_janitor_container_update(op, account, container, obj,
305 {'op': op, 'account': account, 'container': container,109 headers_out, [])
306 'obj': obj, 'headers': headers_out},
307 os.path.join(async_dir, ohash[-3:], ohash + '-' +
308 normalize_timestamp(headers_out['x-timestamp'])),
309 os.path.join(self.devices, objdevice, 'tmp'))
310110
311 def POST(self, request):111 def POST(self, request):
312 """Handle HTTP POST requests for the Swift Object Server."""112 """Handle HTTP POST requests for the Swift Object Server."""
@@ -355,7 +155,15 @@
355 if error_response:155 if error_response:
356 return error_response156 return error_response
357 file = DiskFile(self.devices, device, partition, account, container,157 file = DiskFile(self.devices, device, partition, account, container,
358 obj, disk_chunk_size=self.disk_chunk_size)158 obj, disk_chunk_size=self.disk_chunk_size, keep_data_fp=True,
159 segment=request.headers.get('x-object-segment'),
160 segment_timestamp=request.headers['x-timestamp'])
161 overwritten_manifest = False
162 if not file.is_deleted() and \
163 file.metadata.get('X-Object-Type') == 'manifest' and \
164 'x-object-segment' not in request.headers:
165 overwritten_manifest = pickle.loads(''.join(iter(file)))
166 file.close()
359 upload_expiration = time.time() + self.max_upload_time167 upload_expiration = time.time() + self.max_upload_time
360 etag = md5()168 etag = md5()
361 upload_size = 0169 upload_size = 0
@@ -397,17 +205,51 @@
397 if 'content-encoding' in request.headers:205 if 'content-encoding' in request.headers:
398 metadata['Content-Encoding'] = \206 metadata['Content-Encoding'] = \
399 request.headers['Content-Encoding']207 request.headers['Content-Encoding']
400 file.put(fd, tmppath, metadata)208 if 'x-object-type' in request.headers:
209 metadata['X-Object-Type'] = request.headers['x-object-type']
210 if 'x-object-segment' in request.headers:
211 metadata['X-Object-Segment'] = \
212 request.headers['x-object-segment']
213 no_longer_segment = False
214 if 'x-object-segment-if-length' in request.headers and \
215 int(request.headers['x-object-segment-if-length']) != \
216 os.fstat(fd).st_size:
217 del metadata['X-Object-Type']
218 del metadata['X-Object-Segment']
219 no_longer_segment = True
220 elif int(request.headers.get('x-object-segment', -1)) == 0:
221 # Write out a janitor operation to clean up this multi-segment
222 # PUT in the future if it fails.
223 df = DiskFile(self.devices, device, partition,
224 'Segment-Cleanup', request.headers['x-timestamp'],
225 '%s/%s/%s' % (account, container, obj),
226 datadir=JANITORDIR)
227 df.store_janitor_segment_cleanup(account, container, obj,
228 segment_count=None, segment_last_deleted=None)
229 if overwritten_manifest:
230 # Write out a janitor operation to clean up the overwritten
231 # multi-segment object.
232 df = DiskFile(self.devices, device, partition,
233 'Segment-Cleanup', overwritten_manifest['x-timestamp'],
234 '%s/%s/%s' % (account, container, obj), datadir=JANITORDIR)
235 df.store_janitor_segment_cleanup(account, container, obj,
236 segment_count=(overwritten_manifest['content-length'] /
237 overwritten_manifest['x-segment-size'] + 1),
238 segment_last_deleted=None)
239 file.put(fd, tmppath, metadata,
240 no_longer_segment=no_longer_segment)
401 file.unlinkold(metadata['X-Timestamp'])241 file.unlinkold(metadata['X-Timestamp'])
402 self.container_update('PUT', account, container, obj, request.headers,242 if 'X-Object-Segment' not in file.metadata:
403 {'x-size': file.metadata['Content-Length'],243 self.container_update('PUT', account, container, obj,
404 'x-content-type': file.metadata['Content-Type'],244 request.headers,
405 'x-timestamp': file.metadata['X-Timestamp'],245 {'x-size': request.headers.get('x-object-length',
406 'x-etag': file.metadata['ETag'],246 file.metadata['Content-Length']),
407 'x-cf-trans-id': request.headers.get('x-cf-trans-id', '-')},247 'x-content-type': file.metadata['Content-Type'],
408 device)248 'x-timestamp': file.metadata['X-Timestamp'],
409 resp = HTTPCreated(request=request, etag=etag)249 'x-etag': file.metadata['ETag'],
410 return resp250 'x-cf-trans-id': request.headers.get('x-cf-trans-id', '-')},
251 device, partition)
252 return HTTPCreated(request=request, etag=etag)
411253
412 def GET(self, request):254 def GET(self, request):
413 """Handle HTTP GET requests for the Swift Object Server."""255 """Handle HTTP GET requests for the Swift Object Server."""
@@ -420,7 +262,9 @@
420 if self.mount_check and not check_mount(self.devices, device):262 if self.mount_check and not check_mount(self.devices, device):
421 return Response(status='507 %s is not mounted' % device)263 return Response(status='507 %s is not mounted' % device)
422 file = DiskFile(self.devices, device, partition, account, container,264 file = DiskFile(self.devices, device, partition, account, container,
423 obj, keep_data_fp=True, disk_chunk_size=self.disk_chunk_size)265 obj, keep_data_fp=True, disk_chunk_size=self.disk_chunk_size,
266 segment=request.headers.get('x-object-segment'),
267 segment_timestamp=request.headers.get('x-object-segment-timestamp'))
424 if file.is_deleted():268 if file.is_deleted():
425 if request.headers.get('if-match') == '*':269 if request.headers.get('if-match') == '*':
426 return HTTPPreconditionFailed(request=request)270 return HTTPPreconditionFailed(request=request)
@@ -460,7 +304,9 @@
460 'application/octet-stream'), app_iter=file,304 'application/octet-stream'), app_iter=file,
461 request=request, conditional_response=True)305 request=request, conditional_response=True)
462 for key, value in file.metadata.iteritems():306 for key, value in file.metadata.iteritems():
463 if key.lower().startswith('x-object-meta-'):307 if key.lower().startswith('x-object-meta-') or \
308 key.lower() in ('x-timestamp', 'x-object-type',
309 'x-object-segment'):
464 response.headers[key] = value310 response.headers[key] = value
465 response.etag = file.metadata['ETag']311 response.etag = file.metadata['ETag']
466 response.last_modified = float(file.metadata['X-Timestamp'])312 response.last_modified = float(file.metadata['X-Timestamp'])
@@ -482,13 +328,17 @@
482 if self.mount_check and not check_mount(self.devices, device):328 if self.mount_check and not check_mount(self.devices, device):
483 return Response(status='507 %s is not mounted' % device)329 return Response(status='507 %s is not mounted' % device)
484 file = DiskFile(self.devices, device, partition, account, container,330 file = DiskFile(self.devices, device, partition, account, container,
485 obj, disk_chunk_size=self.disk_chunk_size)331 obj, disk_chunk_size=self.disk_chunk_size,
332 segment=request.headers.get('x-object-segment'),
333 segment_timestamp=request.headers.get('x-object-segment-timestamp'))
486 if file.is_deleted():334 if file.is_deleted():
487 return HTTPNotFound(request=request)335 return HTTPNotFound(request=request)
488 response = Response(content_type=file.metadata['Content-Type'],336 response = Response(content_type=file.metadata['Content-Type'],
489 request=request, conditional_response=True)337 request=request, conditional_response=True)
490 for key, value in file.metadata.iteritems():338 for key, value in file.metadata.iteritems():
491 if key.lower().startswith('x-object-meta-'):339 if key.lower().startswith('x-object-meta-') or \
340 key.lower() in ('x-timestamp', 'x-object-type',
341 'x-object-segment'):
492 response.headers[key] = value342 response.headers[key] = value
493 response.etag = file.metadata['ETag']343 response.etag = file.metadata['ETag']
494 response.last_modified = float(file.metadata['X-Timestamp'])344 response.last_modified = float(file.metadata['X-Timestamp'])
@@ -513,21 +363,43 @@
513 return Response(status='507 %s is not mounted' % device)363 return Response(status='507 %s is not mounted' % device)
514 response_class = HTTPNoContent364 response_class = HTTPNoContent
515 file = DiskFile(self.devices, device, partition, account, container,365 file = DiskFile(self.devices, device, partition, account, container,
516 obj, disk_chunk_size=self.disk_chunk_size)366 obj, disk_chunk_size=self.disk_chunk_size, keep_data_fp=True,
367 segment=request.headers.get('x-object-segment'),
368 segment_timestamp=request.headers.get('x-object-segment-timestamp'))
369 deleted_manifest = False
517 if file.is_deleted():370 if file.is_deleted():
518 response_class = HTTPNotFound371 response_class = HTTPNotFound
372 elif 'x-object-segment' not in request.headers and \
373 file.metadata.get('X-Object-Type') == 'manifest':
374 deleted_manifest = pickle.loads(''.join(iter(file)))
375 file.close()
519 metadata = {376 metadata = {
520 'X-Timestamp': request.headers['X-Timestamp'], 'deleted': True,377 'X-Timestamp': request.headers['X-Timestamp'], 'deleted': True,
521 }378 }
522 with file.mkstemp() as (fd, tmppath):379 with file.mkstemp() as (fd, tmppath):
380 if deleted_manifest:
381 # Write out a janitor operation to clean up the deleted
382 # multi-segment object. Note that setting the
383 # segment_last_deleted = -1 will cause the object-janitor to
384 # start removing the segments immediately rather than waiting
385 # segment_reclaim_age (otherwise it can't tell the difference
386 # between a deleted manifest and a manifest that just hasn't
387 # appeared yet).
388 df = DiskFile(self.devices, device, partition,
389 'Segment-Cleanup', deleted_manifest['x-timestamp'],
390 '%s/%s/%s' % (account, container, obj), datadir=JANITORDIR)
391 df.store_janitor_segment_cleanup(account, container, obj,
392 segment_count=(deleted_manifest['content-length'] /
393 deleted_manifest['x-segment-size']) + 1,
394 segment_last_deleted=-1)
523 file.put(fd, tmppath, metadata, extension='.ts')395 file.put(fd, tmppath, metadata, extension='.ts')
524 file.unlinkold(metadata['X-Timestamp'])396 file.unlinkold(metadata['X-Timestamp'])
525 self.container_update('DELETE', account, container, obj,397 if 'x-object-segment' not in request.headers:
526 request.headers, {'x-timestamp': metadata['X-Timestamp'],398 self.container_update('DELETE', account, container, obj,
527 'x-cf-trans-id': request.headers.get('x-cf-trans-id', '-')},399 request.headers, {'x-timestamp': metadata['X-Timestamp'],
528 device)400 'x-cf-trans-id': request.headers.get('x-cf-trans-id', '-')},
529 resp = response_class(request=request)401 device, partition)
530 return resp402 return response_class(request=request)
531403
532 def REPLICATE(self, request):404 def REPLICATE(self, request):
533 """405 """
@@ -542,7 +414,8 @@
542 content_type='text/plain')414 content_type='text/plain')
543 if self.mount_check and not check_mount(self.devices, device):415 if self.mount_check and not check_mount(self.devices, device):
544 return Response(status='507 %s is not mounted' % device)416 return Response(status='507 %s is not mounted' % device)
545 path = os.path.join(self.devices, device, DATADIR, partition)417 path = os.path.join(self.devices, device,
418 request.headers.get('x-data-dir', DATADIR), partition)
546 if not os.path.exists(path):419 if not os.path.exists(path):
547 mkdirs(path)420 mkdirs(path)
548 if suffix:421 if suffix:
549422
=== modified file 'swift/obj/updater.py'
--- swift/obj/updater.py 2010-09-23 16:09:30 +0000
+++ swift/obj/updater.py 2010-11-08 18:51:48 +0000
@@ -27,11 +27,25 @@
27from swift.common.ring import Ring27from swift.common.ring import Ring
28from swift.common.utils import get_logger, renamer, write_pickle28from swift.common.utils import get_logger, renamer, write_pickle
29from swift.common.daemon import Daemon29from swift.common.daemon import Daemon
30from swift.obj.server import ASYNCDIR30
31
32# Old-style async pending directory
33ASYNCDIR = 'async_pending'
3134
3235
33class ObjectUpdater(Daemon):36class ObjectUpdater(Daemon):
34 """Update object information in container listings."""37 """
38 Update object information in container listings based on postponed
39 operations stored in the old-style async pending directory.
40
41 After upgrade, no new operations will be stored in this old-style async
42 pending directory. Once this daemon empties those directories of all
43 operations, the daemon may be disabled and the directories removed.
44
45 In a future release of Swift, this daemon will be removed.
46
47 The new functionality is in swift.obj.janitor.
48 """
3549
36 def __init__(self, conf):50 def __init__(self, conf):
37 self.conf = conf51 self.conf = conf
3852
=== modified file 'swift/proxy/server.py'
--- swift/proxy/server.py 2010-11-05 14:47:43 +0000
+++ swift/proxy/server.py 2010-11-08 18:51:48 +0000
@@ -14,21 +14,22 @@
14# limitations under the License.14# limitations under the License.
1515
16from __future__ import with_statement16from __future__ import with_statement
17import cPickle as pickle
17import mimetypes18import mimetypes
18import os19import os
19import time20import time
20import traceback21import traceback
21from ConfigParser import ConfigParser22from ConfigParser import ConfigParser
23from hashlib import md5
22from urllib import unquote, quote24from urllib import unquote, quote
23import uuid25import uuid
24import functools26import functools
2527
26from eventlet.timeout import Timeout28from eventlet.timeout import Timeout
27from webob.exc import HTTPBadRequest, HTTPMethodNotAllowed, \29from webob.exc import HTTPBadRequest, HTTPCreated, HTTPInternalServerError, \
28 HTTPNotFound, HTTPPreconditionFailed, \30 HTTPMethodNotAllowed, HTTPNotFound, HTTPPreconditionFailed, \
29 HTTPRequestTimeout, HTTPServiceUnavailable, \31 HTTPRequestEntityTooLarge, HTTPRequestTimeout, HTTPServerError, \
30 HTTPUnprocessableEntity, HTTPRequestEntityTooLarge, HTTPServerError, \32 HTTPServiceUnavailable, HTTPUnprocessableEntity, status_map
31 status_map
32from webob import Request, Response33from webob import Request, Response
3334
34from swift.common.ring import Ring35from swift.common.ring import Ring
@@ -37,7 +38,7 @@
37from swift.common.bufferedhttp import http_connect38from swift.common.bufferedhttp import http_connect
38from swift.common.constraints import check_metadata, check_object_creation, \39from swift.common.constraints import check_metadata, check_object_creation, \
39 check_utf8, MAX_ACCOUNT_NAME_LENGTH, MAX_CONTAINER_NAME_LENGTH, \40 check_utf8, MAX_ACCOUNT_NAME_LENGTH, MAX_CONTAINER_NAME_LENGTH, \
40 MAX_FILE_SIZE41 PICKLE_PROTOCOL
41from swift.common.exceptions import ChunkReadTimeout, \42from swift.common.exceptions import ChunkReadTimeout, \
42 ChunkWriteTimeout, ConnectionTimeout43 ChunkWriteTimeout, ConnectionTimeout
4344
@@ -89,6 +90,144 @@
89 return wrapped90 return wrapped
9091
9192
class SegmentedIterable(object):
    """
    Iterable that returns the object contents for a segmented object in Swift.

    In addition to these params, you can also set the `response` attr just
    after creating the SegmentedIterable and it will update the response's
    `bytes_transferred` value (used to log the size of the request).

    :param controller: The ObjectController instance to work with.
    :param content_length: The total length of the object.
    :param segment_size: The length of each segment (except perhaps the last)
                         of the object.
    :param timestamp: The X-Timestamp of the object's segments (set on the
                      PUT, not changed on the POSTs).
    """

    def __init__(self, controller, content_length, segment_size, timestamp):
        self.controller = controller
        self.content_length = content_length
        self.segment_size = segment_size
        self.timestamp = timestamp
        # Offset into the whole object of the next byte to be yielded.
        self.position = 0
        # Index of the currently loaded segment; -1 means none loaded yet.
        self.segment = -1
        self.segment_iter = None
        # May be set (after construction) to the webob.Response being served
        # so bytes_transferred is kept up to date for request logging.
        self.response = None

    def load_next_segment(self):
        """Loads the self.segment_iter with the next segment's contents."""
        self.segment += 1
        # Segment 0 is stored under the object's own ring name; later
        # segments hash under '<object>/<timestamp>/<segment>'.
        if self.segment:
            ring_object_name = '%s/%s/%s' % (self.controller.object_name,
                self.timestamp, self.segment)
        else:
            ring_object_name = self.controller.object_name
        partition, nodes = self.controller.app.object_ring.get_nodes(
            self.controller.account_name, self.controller.container_name,
            ring_object_name)
        path = '/%s/%s/%s' % (self.controller.account_name,
            self.controller.container_name, self.controller.object_name)
        req = Request.blank(path, headers={'X-Object-Segment': self.segment,
            'X-Object-Segment-Timestamp': self.timestamp})
        resp = self.controller.GETorHEAD_base(req, 'Object',
            partition, self.controller.iter_nodes(partition, nodes,
            self.controller.app.object_ring), path,
            self.controller.app.object_ring.replica_count)
        if resp.status_int // 100 != 2:
            raise Exception(
                'Could not load segment %s of %s' % (self.segment, path))
        self.segment_iter = resp.app_iter

    def __iter__(self):
        """ Standard iterator function that returns the object's contents. """
        while self.position < self.content_length:
            if not self.segment_iter:
                self.load_next_segment()
            while True:
                with ChunkReadTimeout(self.controller.app.node_timeout):
                    try:
                        chunk = self.segment_iter.next()
                        break
                    except StopIteration:
                        # Current segment exhausted; move on to the next one.
                        self.load_next_segment()
            if self.position + len(chunk) > self.content_length:
                # Never yield past the manifest's stated content length.
                chunk = chunk[:self.content_length - self.position]
            self.position += len(chunk)
            if self.response:
                self.response.bytes_transferred = \
                    getattr(self.response, 'bytes_transferred', 0) + len(chunk)
            yield chunk

    def app_iter_range(self, start, stop):
        """
        Non-standard iterator function for use with Webob in serving Range
        requests more quickly.

        .. note::

            This currently helps on speed by jumping to the proper segment to
            start with (and ending without reading the trailing segments, but
            that already happened technically with __iter__).

            But, what it does not do yet is issue a Range request with the
            first segment to allow the object server to seek to the segment
            start point.

            Instead, it just reads and throws away all leading segment data.
            Since segments are 2G by default, it'll have to transfer the whole
            2G from the object server to the proxy server even if it only
            needs the last byte. In practice, this should happen fairly
            quickly relative to how long requests take for these very large
            files; but it's still wasteful.

        :param start: The first byte (zero-based) to return. None for 0.
        :param stop: The last byte (zero-based) to return. None for end.
        """
        if start is None:
            start = 0
        if start:
            # Jump straight to the segment containing the start offset
            # (Python 2 integer division).
            self.segment = (start / self.segment_size) - 1
            self.load_next_segment()
            self.position = self.segment * self.segment_size
            segment_start = start - (self.segment * self.segment_size)
            # Read and discard leading bytes within the first segment.
            while segment_start:
                with ChunkReadTimeout(self.controller.app.node_timeout):
                    chunk = self.segment_iter.next()
                self.position += len(chunk)
                if len(chunk) > segment_start:
                    chunk = chunk[segment_start:]
                    if self.response:
                        self.response.bytes_transferred = \
                            getattr(self.response, 'bytes_transferred', 0) + \
                            len(chunk)
                    yield chunk
                    segment_start = 0
                else:
                    segment_start -= len(chunk)
        if stop is not None:
            length = stop - start
        else:
            length = None
        for chunk in self:
            if length is not None:
                length -= len(chunk)
                if length < 0:
                    # bytes_transferred had len(chunk) added by __iter__ so we
                    # need to subtract what we aren't going to use of the
                    # chunk. (Bug fix: the default here was `length`, a
                    # negative number, instead of 0 like every other
                    # bytes_transferred site.)
                    if self.response:
                        self.response.bytes_transferred = \
                            getattr(self.response, 'bytes_transferred',
                                    0) + length
                    yield chunk[:length]
                    break
            yield chunk
229
230
92def get_container_memcache_key(account, container):231def get_container_memcache_key(account, container):
93 path = '/%s/%s' % (account, container)232 path = '/%s/%s' % (account, container)
94 return 'container%s' % path233 return 'container%s' % path
@@ -518,11 +657,56 @@
518 aresp = req.environ['swift.authorize'](req)657 aresp = req.environ['swift.authorize'](req)
519 if aresp:658 if aresp:
520 return aresp659 return aresp
 660 # This is a bit confusing, so an explanation:
661 # * First we attempt the GET/HEAD normally, as this is the usual case.
662 # * If the request was a Range request and gave us a 416 Unsatisfiable
663 # response, we might be trying to do an invalid Range on a manifest
664 # object, so we try again with no Range.
665 # * If it turns out we have a manifest object, and we had a Range
666 # request originally that actually succeeded or we had a HEAD
667 # request, we have to do the request again as a full GET because
668 # we'll need the whole manifest.
669 # * Finally, if we had a manifest object, we pass it and the request
670 # off to GETorHEAD_segmented; otherwise we just return the response.
521 partition, nodes = self.app.object_ring.get_nodes(671 partition, nodes = self.app.object_ring.get_nodes(
522 self.account_name, self.container_name, self.object_name)672 self.account_name, self.container_name, self.object_name)
523 return self.GETorHEAD_base(req, 'Object', partition,673 resp = mresp = self.GETorHEAD_base(req, 'Object', partition,
674 self.iter_nodes(partition, nodes, self.app.object_ring),
675 req.path_info, self.app.object_ring.replica_count)
676 range_value = None
677 if mresp.status_int == 416:
678 range_value = req.range
679 req.range = None
680 mresp = self.GETorHEAD_base(req, 'Object', partition,
524 self.iter_nodes(partition, nodes, self.app.object_ring),681 self.iter_nodes(partition, nodes, self.app.object_ring),
525 req.path_info, self.app.object_ring.replica_count)682 req.path_info, self.app.object_ring.replica_count)
683 if mresp.status_int // 100 != 2:
684 return resp
685 if 'x-object-type' in mresp.headers:
686 if mresp.headers['x-object-type'] == 'manifest':
687 if req.method == 'HEAD':
688 req.method = 'GET'
689 mresp = self.GETorHEAD_base(req, 'Object', partition,
690 self.iter_nodes(partition, nodes,
691 self.app.object_ring), req.path_info,
692 self.app.object_ring.replica_count)
693 if mresp.status_int // 100 != 2:
694 return mresp
695 req.method = 'HEAD'
696 elif req.range:
697 range_value = req.range
698 req.range = None
699 mresp = self.GETorHEAD_base(req, 'Object', partition,
700 self.iter_nodes(partition, nodes,
701 self.app.object_ring), req.path_info,
702 self.app.object_ring.replica_count)
703 if mresp.status_int // 100 != 2:
704 return mresp
705 if range_value:
706 req.range = range_value
707 return self.GETorHEAD_segmented(req, mresp)
708 return HTTPNotFound(request=req)
709 return resp
526710
527 @public711 @public
528 @delay_denial712 @delay_denial
@@ -536,6 +720,32 @@
536 """Handler for HTTP HEAD requests."""720 """Handler for HTTP HEAD requests."""
537 return self.GETorHEAD(req)721 return self.GETorHEAD(req)
538722
723 def GETorHEAD_segmented(self, req, mresp):
724 """
725 Performs a GET for a segmented object.
726
727 :param req: The webob.Request to process.
728 :param mresp: The webob.Response for the original manifest request.
729 :returns: webob.Response object.
730 """
731 manifest = pickle.loads(''.join(mresp.app_iter))
732 content_length = int(manifest['content-length'])
733 segment_size = int(manifest['x-segment-size'])
734 headers = dict(mresp.headers)
735 headers.update(manifest)
736 del headers['x-segment-size']
737 resp = Response(app_iter=SegmentedIterable(self, content_length,
738 segment_size, manifest['x-timestamp']), headers=headers,
739 request=req, conditional_response=True)
740 resp.headers['etag'] = manifest['etag'].strip('"')
741 resp.last_modified = mresp.last_modified
742 resp.content_length = int(manifest['content-length'])
743 resp.content_type = manifest['content-type']
744 if 'content-encoding' in manifest:
745 resp.content_encoding = manifest['content-encoding']
746 resp.app_iter.response = req.get_response(resp)
747 return resp.app_iter.response
748
539 @public749 @public
540 @delay_denial750 @delay_denial
541 def POST(self, req):751 def POST(self, req):
@@ -609,7 +819,8 @@
609 req.headers['Content-Type'] = 'application/octet-stream'819 req.headers['Content-Type'] = 'application/octet-stream'
610 else:820 else:
611 req.headers['Content-Type'] = guessed_type821 req.headers['Content-Type'] = guessed_type
612 error_response = check_object_creation(req, self.object_name)822 error_response = check_object_creation(req, self.object_name,
823 self.app.max_object_size)
613 if error_response:824 if error_response:
614 return error_response825 return error_response
615 conns = []826 conns = []
@@ -654,11 +865,50 @@
654 if k.lower().startswith('x-object-meta-'):865 if k.lower().startswith('x-object-meta-'):
655 new_req.headers[k] = v866 new_req.headers[k] = v
656 req = new_req867 req = new_req
868 if req.headers.get('transfer-encoding') == 'chunked' or \
869 req.content_length > self.app.segment_size:
870 resp = self.PUT_segmented_object(req, data_source, partition,
871 nodes, container_partition, containers)
872 else:
873 resp = self.PUT_whole_object(req, data_source, partition, nodes,
874 container_partition, containers)
875 if source_header:
876 resp.headers['X-Copied-From'] = quote(
877 source_header.split('/', 2)[2])
878 for k, v in req.headers.items():
879 if k.lower().startswith('x-object-meta-'):
880 resp.headers[k] = v
881 # reset the bytes, since the user didn't actually send anything
882 req.bytes_transferred = 0
883 resp.last_modified = float(req.headers['X-Timestamp'])
884 return resp
885
886 def PUT_whole_object(self, req, data_source, partition, nodes,
887 container_partition=None, containers=None):
888 """
889 Performs a PUT for a whole object (one with a content-length <=
890 self.app.segment_size).
891
892 :param req: The webob.Request to process.
893 :param data_source: An iterator providing the data to store.
894 :param partition: The object ring partition the object falls on.
895 :param nodes: The object ring nodes the object falls on.
896 :param container_partition: The container ring partition the container
897 for the object falls on, None if the
898 container is not to be updated.
899 :param containers: The container ring nodes the container for the
900 object falls on, None if the container is not to be
901 updated.
902 :returns: webob.Response object.
903 """
904 conns = []
905 update_containers = containers is not None
657 for node in self.iter_nodes(partition, nodes, self.app.object_ring):906 for node in self.iter_nodes(partition, nodes, self.app.object_ring):
658 container = containers.pop()907 if update_containers:
659 req.headers['X-Container-Host'] = '%(ip)s:%(port)s' % container908 container = containers.pop()
660 req.headers['X-Container-Partition'] = container_partition909 req.headers['X-Container-Host'] = '%(ip)s:%(port)s' % container
661 req.headers['X-Container-Device'] = container['device']910 req.headers['X-Container-Partition'] = container_partition
911 req.headers['X-Container-Device'] = container['device']
662 req.headers['Expect'] = '100-continue'912 req.headers['Expect'] = '100-continue'
663 resp = conn = None913 resp = conn = None
664 if not self.error_limited(node):914 if not self.error_limited(node):
@@ -676,12 +926,14 @@
676 if conn and resp:926 if conn and resp:
677 if resp.status == 100:927 if resp.status == 100:
678 conns.append(conn)928 conns.append(conn)
679 if not containers:929 if (update_containers and not containers) or \
930 len(conns) == len(nodes):
680 break931 break
681 continue932 continue
682 elif resp.status == 507:933 elif resp.status == 507:
683 self.error_limit(node)934 self.error_limit(node)
684 containers.insert(0, container)935 if update_containers:
936 containers.insert(0, container)
685 if len(conns) <= len(nodes) / 2:937 if len(conns) <= len(nodes) / 2:
686 self.app.logger.error(938 self.app.logger.error(
687 'Object PUT returning 503, %s/%s required connections, '939 'Object PUT returning 503, %s/%s required connections, '
@@ -701,7 +953,7 @@
701 break953 break
702 len_chunk = len(chunk)954 len_chunk = len(chunk)
703 req.bytes_transferred += len_chunk955 req.bytes_transferred += len_chunk
704 if req.bytes_transferred > MAX_FILE_SIZE:956 if req.bytes_transferred > self.app.max_object_size:
705 return HTTPRequestEntityTooLarge(request=req)957 return HTTPRequestEntityTooLarge(request=req)
706 for conn in list(conns):958 for conn in list(conns):
707 try:959 try:
@@ -767,18 +1019,129 @@
767 statuses.append(503)1019 statuses.append(503)
768 reasons.append('')1020 reasons.append('')
769 bodies.append('')1021 bodies.append('')
770 resp = self.best_response(req, statuses, reasons, bodies, 'Object PUT',1022 return self.best_response(req, statuses, reasons, bodies, 'Object PUT',
771 etag=etag)1023 etag=etag)
772 if source_header:1024
773 resp.headers['X-Copied-From'] = quote(1025 def PUT_segmented_object(self, req, data_source, partition, nodes,
774 source_header.split('/', 2)[2])1026 container_partition, containers):
775 for k, v in req.headers.items():1027 """
776 if k.lower().startswith('x-object-meta-'):1028 Performs a PUT for a segmented object (one with a content-length >
777 resp.headers[k] = v1029 self.app.segment_size).
778 # reset the bytes, since the user didn't actually send anything1030
779 req.bytes_transferred = 01031 :param req: The webob.Request to process.
780 resp.last_modified = float(req.headers['X-Timestamp'])1032 :param data_source: An iterator providing the data to store.
781 return resp1033 :param partition: The object ring partition the object falls on.
1034 :param nodes: The object ring nodes the object falls on.
1035 :param container_partition: The container ring partition the container
1036 for the object falls on.
1037 :param containers: The container ring nodes the container for the
1038 object falls on.
1039 :returns: webob.Response object.
1040 """
1041 req.bytes_transferred = 0
1042 leftover_chunk = [None]
1043 etag = md5()
1044
1045 def segment_iter():
1046 amount_given = 0
1047 while amount_given < self.app.segment_size:
1048 if leftover_chunk[0]:
1049 chunk = leftover_chunk[0]
1050 leftover_chunk[0] = None
1051 else:
1052 with ChunkReadTimeout(self.app.client_timeout):
1053 chunk = data_source.next()
1054 req.bytes_transferred += len(chunk)
1055 etag.update(chunk)
1056 if amount_given + len(chunk) > self.app.segment_size:
1057 yield chunk[:self.app.segment_size - amount_given]
1058 leftover_chunk[0] = \
1059 chunk[self.app.segment_size - amount_given:]
1060 amount_given = self.app.segment_size
1061 else:
1062 yield chunk
1063 amount_given += len(chunk)
1064
1065 def segment_iter_iter():
1066 while True:
1067 if not leftover_chunk[0]:
1068 with ChunkReadTimeout(self.app.client_timeout):
1069 leftover_chunk[0] = data_source.next()
1070 req.bytes_transferred += len(leftover_chunk[0])
1071 etag.update(leftover_chunk[0])
1072 yield segment_iter()
1073
1074 segment_number = 0
1075 chunked = req.headers.get('transfer-encoding') == 'chunked'
1076 if not chunked:
1077 amount_left = req.content_length
1078 headers = {'X-Timestamp': req.headers['X-Timestamp'],
1079 'Content-Type': req.headers['content-type'],
1080 'X-Object-Type': 'segment'}
1081 for segment_source in segment_iter_iter():
1082 if chunked:
1083 headers['Transfer-Encoding'] = 'chunked'
1084 if segment_number == 0:
1085 headers['X-Object-Segment-If-Length'] = \
1086 self.app.segment_size
1087 elif amount_left > self.app.segment_size:
1088 headers['Content-Length'] = self.app.segment_size
1089 else:
1090 headers['Content-Length'] = amount_left
1091 headers['X-Object-Segment'] = segment_number
1092 segment_req = Request.blank(req.path_info,
1093 environ={'REQUEST_METHOD': 'PUT'}, headers=headers)
1094 if 'X-Object-Segment-If-Length' in headers:
1095 del headers['X-Object-Segment-If-Length']
1096 if segment_number:
1097 ring_object_name = '%s/%s/%s' % (self.object_name,
1098 req.headers['x-timestamp'], segment_number)
1099 else:
1100 ring_object_name = self.object_name
1101 segment_partition, segment_nodes = self.app.object_ring.get_nodes(
1102 self.account_name, self.container_name, ring_object_name)
1103 resp = self.PUT_whole_object(segment_req, segment_source,
1104 segment_partition, segment_nodes)
1105 if resp.status_int // 100 == 4:
1106 return resp
1107 elif resp.status_int // 100 != 2:
1108 return HTTPServiceUnavailable(request=req,
1109 body='Unable to complete very large file operation.')
1110 if segment_number == 0 and \
1111 req.bytes_transferred < self.app.segment_size:
1112 return HTTPCreated(request=req, etag=etag.hexdigest())
1113 if not chunked:
1114 amount_left -= self.app.segment_size
1115 segment_number += 1
1116 etag = etag.hexdigest()
1117 if 'etag' in req.headers and req.headers['etag'].lower() != etag:
1118 return HTTPUnprocessableEntity(request=req)
1119 manifest = {'x-timestamp': req.headers['x-timestamp'],
1120 'content-length': req.bytes_transferred,
1121 'content-type': req.headers['content-type'],
1122 'x-segment-size': self.app.segment_size,
1123 'etag': etag}
1124 if 'content-encoding' in req.headers:
1125 manifest['content-encoding'] = req.headers['content-encoding']
1126 manifest = pickle.dumps(manifest, protocol=PICKLE_PROTOCOL)
1127 headers = {'X-Timestamp': req.headers['X-Timestamp'],
1128 'Content-Type': req.headers['content-type'],
1129 'Content-Length': len(manifest),
1130 'X-Object-Type': 'manifest',
1131 'X-Object-Length': req.bytes_transferred}
1132 headers.update(i for i in req.headers.iteritems()
1133 if i[0].lower().startswith('x-object-meta-') and len(i[0]) > 14)
1134 manifest_req = Request.blank(req.path_info,
1135 environ={'REQUEST_METHOD': 'PUT'}, body=manifest, headers=headers)
1136 manifest_source = iter(lambda:
1137 manifest_req.body_file.read(self.app.client_chunk_size), '')
1138 resp = self.PUT_whole_object(manifest_req, manifest_source, partition,
1139 nodes, container_partition=container_partition,
1140 containers=containers)
1141 if resp.status_int // 100 != 2:
1142 return HTTPServiceUnavailable(request=req,
1143 body='Unable to complete very large file operation.')
1144 return HTTPCreated(request=req, etag=etag)
7821145
783 @public1146 @public
784 @delay_denial1147 @delay_denial
@@ -1233,6 +1596,8 @@
1233 if conf is None:1596 if conf is None:
1234 conf = {}1597 conf = {}
1235 swift_dir = conf.get('swift_dir', '/etc/swift')1598 swift_dir = conf.get('swift_dir', '/etc/swift')
1599 self.max_object_size = int(conf.get('max_object_size', 107374182400))
1600 self.segment_size = int(conf.get('segment_size', 2147483647))
1236 self.node_timeout = int(conf.get('node_timeout', 10))1601 self.node_timeout = int(conf.get('node_timeout', 10))
1237 self.conn_timeout = float(conf.get('conn_timeout', 0.5))1602 self.conn_timeout = float(conf.get('conn_timeout', 0.5))
1238 self.client_timeout = int(conf.get('client_timeout', 60))1603 self.client_timeout = int(conf.get('client_timeout', 60))
12391604
=== modified file 'test/functional/sample.conf'
--- test/functional/sample.conf 2010-09-09 17:24:25 +0000
+++ test/functional/sample.conf 2010-11-08 18:51:48 +0000
@@ -1,8 +1,15 @@
1# sample config
2auth_host = 127.0.0.11auth_host = 127.0.0.1
3auth_port = 110002auth_port = 11000
4auth_ssl = no3auth_ssl = no
54
5# The maximum object size the cluster allows (set in the proxy server's conf)
6max_object_size = 107374182400
7
8# The file segment size for the cluster (set in the proxy server's conf)
9# Set to 0 for no segment size testing (recommended if the segment size is
10# quite large and you don't want to spend the time testing it)
11segment_size = 0
12
6# Primary functional test account (needs admin access to the account)13# Primary functional test account (needs admin access to the account)
7account = test14account = test
8username = tester15username = tester
916
=== modified file 'test/functional/tests.py'
--- test/functional/tests.py 2010-10-29 20:30:34 +0000
+++ test/functional/tests.py 2010-11-08 18:51:48 +0000
@@ -1092,7 +1092,7 @@
1092 self.assert_(file.read(hdrs={'Range': r}) == data[0:1000])1092 self.assert_(file.read(hdrs={'Range': r}) == data[0:1000])
10931093
1094 def testFileSizeLimit(self):1094 def testFileSizeLimit(self):
1095 limit = 5*2**30 + 21095 limit = int(config.get('max_object_size', 107374182400))
1096 tsecs = 31096 tsecs = 3
10971097
1098 for i in (limit-100, limit-10, limit-1, limit, limit+1, limit+10,1098 for i in (limit-100, limit-10, limit-1, limit, limit+1, limit+10,
10991099
=== modified file 'test/probe/common.py'
--- test/probe/common.py 2010-09-12 00:03:09 +0000
+++ test/probe/common.py 2010-11-08 18:51:48 +0000
@@ -88,7 +88,7 @@
88 for p in ps:88 for p in ps:
89 p.wait()89 p.wait()
90 ps = []90 ps = []
91 for job in ('container-updater', 'object-updater'):91 for job in ('container-updater', 'object-janitor'):
92 for n in xrange(1, 5):92 for n in xrange(1, 5):
93 ps.append(Popen(['swift-%s' % job,93 ps.append(Popen(['swift-%s' % job,
94 '/etc/swift/%s-server/%d.conf' %94 '/etc/swift/%s-server/%d.conf' %
9595
=== modified file 'test/probe/test_object_async_update.py'
--- test/probe/test_object_async_update.py 2010-09-06 04:06:16 +0000
+++ test/probe/test_object_async_update.py 2010-11-08 18:51:48 +0000
@@ -55,7 +55,7 @@
55 self.account, container)[1])55 self.account, container)[1])
56 ps = []56 ps = []
57 for n in xrange(1, 5):57 for n in xrange(1, 5):
58 ps.append(Popen(['swift-object-updater',58 ps.append(Popen(['swift-object-janitor',
59 '/etc/swift/object-server/%d.conf' % n, 'once']))59 '/etc/swift/object-server/%d.conf' % n, 'once']))
60 for p in ps:60 for p in ps:
61 p.wait()61 p.wait()
6262
=== modified file 'test/unit/common/test_constraints.py'
--- test/unit/common/test_constraints.py 2010-08-16 22:30:27 +0000
+++ test/unit/common/test_constraints.py 2010-11-08 18:51:48 +0000
@@ -90,22 +90,20 @@
90 headers=headers), 'object'), HTTPBadRequest))90 headers=headers), 'object'), HTTPBadRequest))
9191
92 def test_check_object_creation_content_length(self):92 def test_check_object_creation_content_length(self):
93 headers = {'Content-Length': str(constraints.MAX_FILE_SIZE),93 headers = {'Content-Length': '1024', 'Content-Type': 'text/plain'}
94 'Content-Type': 'text/plain'}
95 self.assertEquals(constraints.check_object_creation(Request.blank('/',94 self.assertEquals(constraints.check_object_creation(Request.blank('/',
96 headers=headers), 'object_name'), None)95 headers=headers), 'object_name', 1024), None)
97 headers = {'Content-Length': str(constraints.MAX_FILE_SIZE + 1),96 headers = {'Content-Length': '1025', 'Content-Type': 'text/plain'}
98 'Content-Type': 'text/plain'}
99 self.assert_(isinstance(constraints.check_object_creation(97 self.assert_(isinstance(constraints.check_object_creation(
100 Request.blank('/', headers=headers), 'object_name'),98 Request.blank('/', headers=headers), 'object_name', 1024),
101 HTTPRequestEntityTooLarge))99 HTTPRequestEntityTooLarge))
102 headers = {'Transfer-Encoding': 'chunked',100 headers = {'Transfer-Encoding': 'chunked',
103 'Content-Type': 'text/plain'}101 'Content-Type': 'text/plain'}
104 self.assertEquals(constraints.check_object_creation(Request.blank('/',102 self.assertEquals(constraints.check_object_creation(Request.blank('/',
105 headers=headers), 'object_name'), None)103 headers=headers), 'object_name', 1024), None)
106 headers = {'Content-Type': 'text/plain'}104 headers = {'Content-Type': 'text/plain'}
107 self.assert_(isinstance(constraints.check_object_creation(105 self.assert_(isinstance(constraints.check_object_creation(
108 Request.blank('/', headers=headers), 'object_name'),106 Request.blank('/', headers=headers), 'object_name', 1024),
109 HTTPLengthRequired))107 HTTPLengthRequired))
110108
111 def test_check_object_creation_name_length(self):109 def test_check_object_creation_name_length(self):
112110
=== added file 'test/unit/obj/test_diskfile.py'
--- test/unit/obj/test_diskfile.py 1970-01-01 00:00:00 +0000
+++ test/unit/obj/test_diskfile.py 2010-11-08 18:51:48 +0000
@@ -0,0 +1,203 @@
1# Copyright (c) 2010 OpenStack, LLC.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12# implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16import cPickle as pickle
17import os
18import sys
19import unittest
20from nose import SkipTest
21from shutil import rmtree
22from StringIO import StringIO
23from time import time
24
25from xattr import setxattr
26
27from swift.common.constraints import PICKLE_PROTOCOL
28from swift.common.utils import mkdirs, normalize_timestamp
29from swift.obj.diskfile import DiskFile, hash_suffix, JANITORDIR, \
30 METADATA_KEY
31
32
33class TestDiskFile(unittest.TestCase):
34 """ Test swift.obj.diskfile """
35
36 def setUp(self):
37 """ Set up for testing swift.obj.diskfile """
38 self.path_to_test_xfs = os.environ.get('PATH_TO_TEST_XFS')
39 if not self.path_to_test_xfs or \
40 not os.path.exists(self.path_to_test_xfs):
41 print >> sys.stderr, 'WARNING: PATH_TO_TEST_XFS not set or not ' \
42 'pointing to a valid directory.\n' \
43 'Please set PATH_TO_TEST_XFS to a directory on an XFS file ' \
44 'system for testing.'
45 self.testdir = '/tmp/SWIFTUNITTEST'
46 else:
47 self.testdir = os.path.join(self.path_to_test_xfs,
48 'tmp_test_obj_diskfile')
49 mkdirs(self.testdir)
50 rmtree(self.testdir)
51 mkdirs(os.path.join(self.testdir, 'sda1'))
52 mkdirs(os.path.join(self.testdir, 'sda1', 'tmp'))
53
54 def tearDown(self):
55 """ Tear down for testing swift.obj.diskfile """
56 rmtree(self.testdir)
57
58 def test_disk_file_app_iter_corners(self):
59 if not self.path_to_test_xfs:
60 raise SkipTest
61 df = DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o')
62 mkdirs(df.datadir)
63 f = open(os.path.join(df.datadir,
64 normalize_timestamp(time()) + '.data'), 'wb')
65 f.write('1234567890')
66 setxattr(f.fileno(), METADATA_KEY, pickle.dumps({}, PICKLE_PROTOCOL))
67 f.close()
68 df = DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
69 keep_data_fp=True)
70 it = df.app_iter_range(0, None)
71 sio = StringIO()
72 for chunk in it:
73 sio.write(chunk)
74 self.assertEquals(sio.getvalue(), '1234567890')
75
76 df = DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
77 keep_data_fp=True)
78 it = df.app_iter_range(5, None)
79 sio = StringIO()
80 for chunk in it:
81 sio.write(chunk)
82 self.assertEquals(sio.getvalue(), '67890')
83
84 def test_disk_file_mkstemp_creates_dir(self):
85 tmpdir = os.path.join(self.testdir, 'sda1', 'tmp')
86 os.rmdir(tmpdir)
87 with DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o').mkstemp():
88 self.assert_(os.path.exists(tmpdir))
89
90 def test_hash_suffix_creates_janitor_jobs(self):
91 if not self.path_to_test_xfs:
92 raise SkipTest
93 # Ensure the janitor job we expect to create is not there right now.
94 df = DiskFile(self.testdir, 'sda1', '0', 'Segment-Cleanup',
95 normalize_timestamp(1), 'a/c/o', datadir=JANITORDIR)
96 self.assert_(df.is_deleted())
97 # Set up manifest file to be tombstoned
98 df = DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o')
99 with df.mkstemp() as (fd, tmppath):
100 os.write(fd, pickle.dumps({'x-timestamp': normalize_timestamp(1),
101 'content-length': 1234, 'content-type': 'text/plain',
102 'x-segment-size': 123,
103 'etag': 'd41d8cd98f00b204e9800998ecf8427e'},
104 protocol=PICKLE_PROTOCOL))
105 df.put(fd, tmppath, {'X-Timestamp': normalize_timestamp(2),
106 'X-Object-Type': 'manifest'})
107 # Make tombstone DiskFile didn't create, such as one rsynced over.
108 open(os.path.join(df.datadir, normalize_timestamp(3) + '.ts'), 'wb')
109 # Finally, we call what we want to test.
110 hash_suffix(os.path.dirname(df.datadir), 604800)
111 # Ensure the janitor job got created.
112 df = DiskFile(self.testdir, 'sda1', '0', 'Segment-Cleanup',
113 normalize_timestamp(1), 'a/c/o', datadir=JANITORDIR)
114 self.assert_(not df.is_deleted())
115
116 def test_segment_info_overrides_datadir(self):
117 if not self.path_to_test_xfs:
118 raise SkipTest
119 df = DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o')
120 datadir1 = df.datadir.split('/')[6]
121 df = DiskFile(self.testdir, 'sda1', '1', 'a', 'c', 'o')
122 datadir2 = df.datadir.split('/')[6]
123 df = DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o', segment=0)
124 datadir3 = df.datadir.split('/')[6]
125 self.assertEquals(datadir1, datadir2)
126 self.assertNotEquals(datadir1, datadir3)
127
128 def test_no_longer_segment(self):
129 if not self.path_to_test_xfs:
130 raise SkipTest
131 # Normal case
132 df = DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o')
133 datadir1 = df.datadir
134 with df.mkstemp() as (fd, tmppath):
135 df.put(fd, tmppath, {'X-Timestamp': normalize_timestamp(1)})
136 self.assert_(os.path.exists(datadir1))
137 # Normal case with no_longer_segment (doesn't make sense, but shouldn't
138 # blow up)
139 df = DiskFile(self.testdir, 'sda1', '1', 'a', 'c', 'o')
140 datadir1 = df.datadir
141 with df.mkstemp() as (fd, tmppath):
142 df.put(fd, tmppath, {'X-Timestamp': normalize_timestamp(1)},
143 no_longer_segment=True)
144 self.assert_(os.path.exists(datadir1))
145 # Segment case
146 df = DiskFile(self.testdir, 'sda1', '2', 'a', 'c', 'o', segment=0)
147 datadir1 = df.datadir
148 with df.mkstemp() as (fd, tmppath):
149 df.put(fd, tmppath, {'X-Timestamp': normalize_timestamp(1)})
150 self.assert_(os.path.exists(datadir1))
151 # Segment case with no_longer_segment
152 df = DiskFile(self.testdir, 'sda1', '3', 'a', 'c', 'o')
153 normal_datadir1 = df.datadir
154 self.assert_(not os.path.exists(normal_datadir1))
155 df = DiskFile(self.testdir, 'sda1', '3', 'a', 'c', 'o', segment=0)
156 datadir1 = df.datadir
157 with df.mkstemp() as (fd, tmppath):
158 df.put(fd, tmppath, {'X-Timestamp': normalize_timestamp(1)},
159 no_longer_segment=True)
160 self.assert_(not os.path.exists(datadir1))
161 self.assert_(os.path.exists(normal_datadir1))
162
163 def test_tombstone(self):
164 if not self.path_to_test_xfs:
165 raise SkipTest
166 df = DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o')
167 with df.mkstemp() as (fd, tmppath):
168 df.put(fd, tmppath, {'X-Timestamp': normalize_timestamp(1)})
169 df = DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o')
170 self.assert_(not df.is_deleted())
171 df = DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o')
172 df.tombstone(normalize_timestamp(2))
173 df = DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o')
174 self.assert_(df.is_deleted())
175
176 def test_store_janitor_container_update(self):
177 if not self.path_to_test_xfs:
178 raise SkipTest
179 df = DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
180 datadir=JANITORDIR)
181 df.store_janitor_container_update('PUT', 'a', 'c', 'o',
182 {'X-Timestamp': normalize_timestamp(1)}, [])
183 df = DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
184 datadir=JANITORDIR)
185 self.assertEquals(pickle.load(open(df.data_file, 'rb')),
186 {'op': 'PUT', 'account': 'a', 'container': 'c', 'obj': 'o',
187 'headers': {'X-Timestamp': '0000000001.00000'}, 'successes': []})
188
189 def test_store_janitor_segment_cleanup(self):
190 if not self.path_to_test_xfs:
191 raise SkipTest
192 df = DiskFile(self.testdir, 'sda1', '0', 'Segment-Cleanup',
193 normalize_timestamp(1), 'a/c/o', datadir=JANITORDIR)
194 df.store_janitor_segment_cleanup('a', 'c', 'o', 123, 45)
195 df = DiskFile(self.testdir, 'sda1', '0', 'Segment-Cleanup',
196 normalize_timestamp(1), 'a/c/o', datadir=JANITORDIR)
197 self.assertEquals(pickle.load(open(df.data_file, 'rb')),
198 {'account': 'a', 'container': 'c', 'obj': 'o',
199 'segment_count': 123, 'segment_last_deleted': 45})
200
201
202if __name__ == '__main__':
203 unittest.main()
0204
=== added file 'test/unit/obj/test_janitor.py'
--- test/unit/obj/test_janitor.py 1970-01-01 00:00:00 +0000
+++ test/unit/obj/test_janitor.py 2010-11-08 18:51:48 +0000
@@ -0,0 +1,433 @@
1# Copyright (c) 2010 OpenStack, LLC.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12# implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16import cPickle as pickle
17import os
18import sys
19import traceback
20import unittest
21from gzip import GzipFile
22from nose import SkipTest
23from shutil import rmtree
24from time import time
25
26from eventlet import spawn, TimeoutError, listen
27from eventlet.timeout import Timeout
28
29from swift.common.constraints import PICKLE_PROTOCOL
30from swift.common.ring import Ring, RingData
31from swift.common import utils
32from swift.common.utils import hash_path, normalize_timestamp, mkdirs
33from swift.obj import janitor as object_janitor
34from swift.obj.diskfile import DiskFile, JANITORDIR
35
36
37class TestObjectJanitor(unittest.TestCase):
38
39 def setUp(self):
40 utils.HASH_PATH_SUFFIX = 'endcap'
41 self.path_to_test_xfs = os.environ.get('PATH_TO_TEST_XFS')
42 if not self.path_to_test_xfs or \
43 not os.path.exists(self.path_to_test_xfs):
44 print >> sys.stderr, 'WARNING: PATH_TO_TEST_XFS not set or not ' \
45 'pointing to a valid directory.\n' \
46 'Please set PATH_TO_TEST_XFS to a directory on an XFS file ' \
47 'system for testing.'
48 self.testdir = '/tmp/SWIFTUNITTEST'
49 else:
50 self.testdir = os.path.join(self.path_to_test_xfs,
51 'tmp_test_object_server_ObjectController')
52 rmtree(self.testdir, ignore_errors=1)
53 os.mkdir(self.testdir)
54 pickle.dump(RingData([[0, 1, 0, 1], [1, 0, 1, 0]],
55 [{'id': 0, 'ip': '127.0.0.1', 'port': 1, 'device': 'sda1',
56 'zone': 0},
57 {'id': 1, 'ip': '127.0.0.1', 'port': 1, 'device': 'sda1',
58 'zone': 2}], 30),
59 GzipFile(os.path.join(self.testdir, 'object.ring.gz'), 'wb'))
60 self.object_ring = Ring(os.path.join(self.testdir, 'object.ring.gz'))
61 pickle.dump(RingData([[0, 1, 0, 1], [1, 0, 1, 0]],
62 [{'id': 0, 'ip': '127.0.0.1', 'port': 1, 'device': 'sda1',
63 'zone': 0},
64 {'id': 1, 'ip': '127.0.0.1', 'port': 1, 'device': 'sda1',
65 'zone': 2}], 30),
66 GzipFile(os.path.join(self.testdir, 'container.ring.gz'), 'wb'))
67 self.container_ring = \
68 Ring(os.path.join(self.testdir, 'container.ring.gz'))
69 self.devices_dir = os.path.join(self.testdir, 'devices')
70 os.mkdir(self.devices_dir)
71 self.sda1 = os.path.join(self.devices_dir, 'sda1')
72 os.mkdir(self.sda1)
73 os.mkdir(os.path.join(self.sda1, 'tmp'))
74
75 def tearDown(self):
76 rmtree(self.testdir, ignore_errors=1)
77
78 def test_creation(self):
79 if not self.path_to_test_xfs:
80 raise SkipTest
81 janitor = object_janitor.ObjectJanitor({
82 'devices': self.devices_dir,
83 'mount_check': 'false',
84 'swift_dir': self.testdir,
85 'interval': '1',
86 'concurrency': '2',
87 'node_timeout': '5',
88 })
89 self.assert_(hasattr(janitor, 'logger'))
90 self.assert_(janitor.logger is not None)
91 self.assertEquals(janitor.devices, self.devices_dir)
92 self.assertEquals(janitor.interval, 1)
93 self.assertEquals(janitor.concurrency, 2)
94 self.assertEquals(janitor.node_timeout, 5)
95 self.assert_(janitor.get_container_ring() is not None)
96
97 def test_run_once_container_update(self):
98 if not self.path_to_test_xfs:
99 raise SkipTest
100 janitor = object_janitor.ObjectJanitor({
101 'devices': self.devices_dir,
102 'mount_check': 'false',
103 'swift_dir': self.testdir,
104 'interval': '1',
105 'concurrency': '1',
106 'node_timeout': '15',
107 })
108 janitor.run_once()
109 janitor_dir = os.path.join(self.sda1, JANITORDIR)
110 os.mkdir(janitor_dir)
111 janitor.run_once()
112 self.assert_(os.path.exists(janitor_dir))
113
114 disk_file = DiskFile(self.devices_dir, 'sda1',
115 str(self.container_ring.get_nodes('a', 'c', 'o')[0]), 'a', 'c',
116 'o', datadir=JANITORDIR)
117 ts = normalize_timestamp(1)
118 with disk_file.mkstemp() as (fd, tmppath):
119 os.write(fd, pickle.dumps({'op': 'PUT',
120 'account': 'a', 'container': 'c', 'obj': 'o',
121 'headers': {'X-Container-Timestamp': ts}},
122 PICKLE_PROTOCOL))
123 disk_file.put(fd, tmppath,
124 {'X-Op': 'Container-Update', 'X-Timestamp': ts})
125 janitor.run_once()
126 self.assert_(os.path.exists(os.path.join(disk_file.datadir,
127 ts + '.data')))
128
129 bindsock = listen(('127.0.0.1', 0))
130
131 def accepter(sock, return_code):
132 try:
133 with Timeout(3):
134 inc = sock.makefile('rb')
135 out = sock.makefile('wb')
136 out.write('HTTP/1.1 %d OK\r\nContent-Length: 0\r\n\r\n' %
137 return_code)
138 out.flush()
139 self.assertEquals(inc.readline(),
140 'PUT /sda1/0/a/c/o HTTP/1.1\r\n')
141 headers = {}
142 line = inc.readline()
143 while line and line != '\r\n':
144 headers[line.split(':')[0].lower()] = \
145 line.split(':')[1].strip()
146 line = inc.readline()
147 self.assert_('x-container-timestamp' in headers)
148 except BaseException, err:
149 return err
150 return None
151
152 def accept(return_codes):
153 codes = iter(return_codes)
154 try:
155 events = []
156 for x in xrange(len(return_codes)):
157 with Timeout(3):
158 sock, addr = bindsock.accept()
159 events.append(
160 spawn(accepter, sock, codes.next()))
161 for event in events:
162 err = event.wait()
163 if err:
164 raise err
165 except BaseException, err:
166 return err
167 return None
168
169 event = spawn(accept, [201, 500])
170 for dev in janitor.get_container_ring().devs:
171 if dev is not None:
172 dev['port'] = bindsock.getsockname()[1]
173 janitor.run_once()
174 err = event.wait()
175 if err:
176 raise err
177 disk_file = DiskFile(self.devices_dir, 'sda1',
178 str(self.container_ring.get_nodes('a', 'c', 'o')[0]), 'a', 'c',
179 'o', datadir=JANITORDIR)
180 self.assert_(not disk_file.is_deleted())
181 event = spawn(accept, [201])
182 janitor.run_once()
183 err = event.wait()
184 if err:
185 raise err
186 disk_file = DiskFile(self.devices_dir, 'sda1',
187 str(self.container_ring.get_nodes('a', 'c', 'o')[0]), 'a', 'c',
188 'o', datadir=JANITORDIR)
189 self.assert_(disk_file.is_deleted())
190
191 def _segment_cleanup_in_progress_helper(self, statuses, expect_success):
192 if not self.path_to_test_xfs:
193 raise SkipTest
194 janitor = object_janitor.ObjectJanitor({'devices': self.devices_dir,
195 'mount_check': 'false', 'swift_dir': self.testdir, 'interval': '1',
196 'concurrency': '1', 'node_timeout': '15'})
197
198 # Quick test of connection refusals
199 df = DiskFile(self.devices_dir, 'sda1',
200 str(self.object_ring.get_nodes('a', 'c', 'o')[0]),
201 'Segment-Cleanup', normalize_timestamp(1), 'a/c/o',
202 datadir=JANITORDIR)
203 df.store_janitor_segment_cleanup('a', 'c', 'o', 2, 0)
204 janitor.run_once()
205 df = DiskFile(self.devices_dir, 'sda1',
206 str(self.object_ring.get_nodes('a', 'c', 'o')[0]),
207 'Segment-Cleanup', normalize_timestamp(1), 'a/c/o',
208 datadir=JANITORDIR)
209 self.assert_(not df.is_deleted())
210
211 bindsock = listen(('127.0.0.1', 0))
212 janitor.port = bindsock.getsockname()[1]
213
214 def accepter(sock, return_code):
215 try:
216 with Timeout(3):
217 inc = sock.makefile('rb')
218 out = sock.makefile('wb')
219 out.write('HTTP/1.1 %d OK\r\nContent-Length: 0\r\n\r\n' %
220 return_code)
221 out.flush()
222 self.assertEquals(inc.readline(),
223 'DELETE /sda1/2/a/c/o HTTP/1.1\r\n')
224 headers = {}
225 line = inc.readline()
226 while line and line != '\r\n':
227 headers[line.split(':')[0].lower()] = \
228 line.split(':')[1].strip()
229 line = inc.readline()
230 self.assert_('x-object-segment-timestamp' in headers)
231 self.assertEquals(headers.get('x-object-segment'), '1')
232 except BaseException, err:
233 return err
234 return None
235
236 def accept(return_codes):
237 codes = iter(return_codes)
238 try:
239 events = []
240 for x in xrange(len(return_codes)):
241 with Timeout(3):
242 sock, addr = bindsock.accept()
243 events.append(
244 spawn(accepter, sock, codes.next()))
245 for event in events:
246 err = event.wait()
247 if err:
248 raise err
249 except BaseException, err:
250 return err
251 return None
252
253 event = spawn(accept, statuses)
254 for dev in janitor.get_object_ring().devs:
255 if dev is not None:
256 dev['port'] = bindsock.getsockname()[1]
257 janitor.run_once()
258 err = event.wait()
259 if err:
260 raise err
261 df = DiskFile(self.devices_dir, 'sda1',
262 str(self.object_ring.get_nodes('a', 'c', 'o')[0]),
263 'Segment-Cleanup', normalize_timestamp(1), 'a/c/o',
264 datadir=JANITORDIR)
265 self.assertEquals(df.is_deleted(), expect_success)
266
267 def test_segment_cleanup_in_progress_happy_path(self):
268 self._segment_cleanup_in_progress_helper([204, 204], True)
269
270 def test_segment_cleanup_in_progress_one_failure(self):
271 self._segment_cleanup_in_progress_helper([204, 500], True)
272
273 def test_segment_cleanup_in_progress_all_failures(self):
274 self._segment_cleanup_in_progress_helper([500, 500], False)
275
276 def test_segment_cleanup_in_progress_all_not_found(self):
277 self._segment_cleanup_in_progress_helper([404, 404], True)
278
279 def test_segment_cleanup_in_progress_one_not_found_one_success(self):
280 self._segment_cleanup_in_progress_helper([404, 204], True)
281
282 def test_segment_cleanup_in_progress_one_not_found_one_failure(self):
283 self._segment_cleanup_in_progress_helper([404, 500], False)
284
285 def _segment_cleanup_fresh_start_helper(self, cleanup_timestamp,
286 existing_timestamp, statuses, expect_success):
287 if not self.path_to_test_xfs:
288 raise SkipTest
289 janitor = object_janitor.ObjectJanitor({'devices': self.devices_dir,
290 'mount_check': 'false', 'swift_dir': self.testdir, 'interval': '1',
291 'concurrency': '1', 'node_timeout': '15', 'segments_per_pass': 2})
292
293 # Quick test of connection refusals
294 df = DiskFile(self.devices_dir, 'sda1',
295 str(self.object_ring.get_nodes('a', 'c', 'o')[0]),
296 'Segment-Cleanup', normalize_timestamp(cleanup_timestamp), 'a/c/o',
297 datadir=JANITORDIR)
298 df.store_janitor_segment_cleanup('a', 'c', 'o', None, None)
299 janitor.run_once()
300 df = DiskFile(self.devices_dir, 'sda1',
301 str(self.object_ring.get_nodes('a', 'c', 'o')[0]),
302 'Segment-Cleanup', normalize_timestamp(cleanup_timestamp), 'a/c/o',
303 datadir=JANITORDIR)
304 self.assert_(not df.is_deleted())
305
306 bindsock = listen(('127.0.0.1', 0))
307 janitor.port = bindsock.getsockname()[1]
308
309 def accepter(sock, return_code):
310 try:
311 with Timeout(3):
312 inc = sock.makefile('rb')
313 out = sock.makefile('wb')
314 request = inc.readline()
315 if request.startswith('GET '):
316 self.assert_(request, 'GET /sda1/0/a/c/o HTTP/1.1\r\n')
317 if return_code == 200:
318 pickl = pickle.dumps({'x-timestamp':
319 normalize_timestamp(existing_timestamp)})
320 out.write('HTTP/1.1 %d OK\r\nContent-Length: '
321 '%s\r\nX-Object-Type: manifest\r\n\r\n' %
322 (return_code, len(pickl)))
323 out.write(pickl)
324 out.flush()
325 else:
326 out.write('HTTP/1.1 %d OK\r\nContent-Length: '
327 '0\r\n\r\n' % return_code)
328 out.flush()
329 return None
330 if request.startswith('HEAD '):
331 self.assert_(request,
332 'HEAD /sda1/2/a/c/o HTTP/1.1\r\n')
333 out.write('HTTP/1.1 %d OK\r\nContent-Length: '
334 '0\r\n\r\n' % return_code)
335 out.flush()
336 return None
337 else:
338 self.assert_(request,
339 'DELETE /sda1/2/a/c/o HTTP/1.1\r\n')
340 out.write(
341 'HTTP/1.1 %d OK\r\nContent-Length: 0\r\n\r\n' %
342 return_code)
343 out.flush()
344 headers = {}
345 line = inc.readline()
346 while line and line != '\r\n':
347 headers[line.split(':')[0].lower()] = \
348 line.split(':')[1].strip()
349 line = inc.readline()
350 self.assert_('x-object-segment' in headers)
351 self.assert_('x-object-segment-timestamp' in headers)
352 except BaseException, err:
353 traceback.print_exc()
354 return err
355 return None
356
357 def accept(return_codes):
358 codes = iter(return_codes)
359 try:
360 events = []
361 for x in xrange(len(return_codes)):
362 with Timeout(3):
363 sock, addr = bindsock.accept()
364 events.append(
365 spawn(accepter, sock, codes.next()))
366 for event in events:
367 err = event.wait()
368 if err:
369 raise err
370 except BaseException, err:
371 return err
372 return None
373
374 event = spawn(accept, statuses)
375 for dev in janitor.get_object_ring().devs:
376 if dev is not None:
377 dev['port'] = bindsock.getsockname()[1]
378 janitor.run_once()
379 err = event.wait()
380 if err:
381 raise err
382 df = DiskFile(self.devices_dir, 'sda1',
383 str(self.object_ring.get_nodes('a', 'c', 'o')[0]),
384 'Segment-Cleanup', normalize_timestamp(cleanup_timestamp), 'a/c/o',
385 datadir=JANITORDIR)
386 self.assertEquals(df.is_deleted(), expect_success)
387
388 def test_segment_cleanup_fresh_start_happy_path(self):
389 self._segment_cleanup_fresh_start_helper(normalize_timestamp(1),
390 normalize_timestamp(1),
391 [404, 404, # Check for manifest
392 200, 200, 404, 404, # Check for segments (2 total)
393 204, 204, 204, 204], # Delete segments
394 True)
395
396 def test_segment_cleanup_fresh_start_manifest_exists(self):
397 t = time()
398 self._segment_cleanup_fresh_start_helper(normalize_timestamp(t),
399 normalize_timestamp(t), [200, 200], True)
400
401 def test_segment_cleanup_fresh_start_old_manifest_exists(self):
402 self._segment_cleanup_fresh_start_helper(normalize_timestamp(2),
403 normalize_timestamp(1),
404 [200, 200, # Check for manifest
405 200, 200, 404, 404, # Check for segments (2 total)
406 204, 204, 204, 204], # Delete segments
407 True)
408
409 def test_segment_cleanup_fresh_start_old_manifest_exists2(self):
410 self._segment_cleanup_fresh_start_helper(normalize_timestamp(time()),
411 normalize_timestamp(1), [200, 200], False)
412
413 def test_segment_cleanup_fresh_start_new_manifest_exists(self):
414 t = time()
415 self._segment_cleanup_fresh_start_helper(normalize_timestamp(t - 1),
416 normalize_timestamp(t),
417 [200, # Check for manifest
418 200, 200, 404, 404, # Check for segments (2 total)
419 204, 204, 204, 204], # Delete segments
420 True)
421
422 def test_segment_cleanup_fresh_start_stops_at_segments_per_pass(self):
423 self._segment_cleanup_fresh_start_helper(normalize_timestamp(1),
424 normalize_timestamp(1),
425 [404, 404, # Check for manifest
426 200, 200, 200, 404, 404, # Check for segments (3 total)
427 204, 204, 204, 204], # Delete segments (only 2 expected)
428 False)
429
430
431
432if __name__ == '__main__':
433 unittest.main()
0434
=== modified file 'test/unit/obj/test_server.py'
--- test/unit/obj/test_server.py 2010-10-13 21:26:43 +0000
+++ test/unit/obj/test_server.py 2010-11-08 18:51:48 +0000
@@ -13,7 +13,7 @@
13# See the License for the specific language governing permissions and13# See the License for the specific language governing permissions and
14# limitations under the License.14# limitations under the License.
1515
16""" Tests for swift.object_server """16""" Tests for swift.obj.server """
1717
18import cPickle as pickle18import cPickle as pickle
19import os19import os
@@ -22,23 +22,25 @@
22from nose import SkipTest22from nose import SkipTest
23from shutil import rmtree23from shutil import rmtree
24from StringIO import StringIO24from StringIO import StringIO
25from time import gmtime, sleep, strftime, time25from time import gmtime, strftime, time
2626
27from eventlet import sleep, spawn, wsgi, listen27from eventlet import sleep, spawn, wsgi, listen
28from webob import Request28from webob import Request
29from xattr import getxattr, setxattr29from xattr import getxattr
3030
31from test.unit import connect_tcp, readuntil2crlfs31from test.unit import connect_tcp, readuntil2crlfs
32from swift.common.constraints import PICKLE_PROTOCOL
32from swift.obj import server as object_server33from swift.obj import server as object_server
34from swift.obj.diskfile import DATADIR, DiskFile, JANITORDIR, METADATA_KEY
33from swift.common.utils import hash_path, mkdirs, normalize_timestamp, \35from swift.common.utils import hash_path, mkdirs, normalize_timestamp, \
34 NullLogger, storage_directory36 NullLogger, storage_directory
3537
3638
37class TestObjectController(unittest.TestCase):39class TestObjectController(unittest.TestCase):
38 """ Test swift.object_server.ObjectController """40 """ Test swift.obj.server.ObjectController """
3941
40 def setUp(self):42 def setUp(self):
41 """ Set up for testing swift.object_server.ObjectController """43 """ Set up for testing swift.obj.server.ObjectController """
42 self.path_to_test_xfs = os.environ.get('PATH_TO_TEST_XFS')44 self.path_to_test_xfs = os.environ.get('PATH_TO_TEST_XFS')
43 if not self.path_to_test_xfs or \45 if not self.path_to_test_xfs or \
44 not os.path.exists(self.path_to_test_xfs):46 not os.path.exists(self.path_to_test_xfs):
@@ -49,7 +51,7 @@
49 self.testdir = '/tmp/SWIFTUNITTEST'51 self.testdir = '/tmp/SWIFTUNITTEST'
50 else:52 else:
51 self.testdir = os.path.join(self.path_to_test_xfs,53 self.testdir = os.path.join(self.path_to_test_xfs,
52 'tmp_test_object_server_ObjectController')54 'tmp_test_obj_server_ObjectController')
53 mkdirs(self.testdir)55 mkdirs(self.testdir)
54 rmtree(self.testdir)56 rmtree(self.testdir)
55 mkdirs(os.path.join(self.testdir, 'sda1'))57 mkdirs(os.path.join(self.testdir, 'sda1'))
@@ -59,11 +61,11 @@
59 self.object_controller.bytes_per_sync = 161 self.object_controller.bytes_per_sync = 1
6062
61 def tearDown(self):63 def tearDown(self):
62 """ Tear down for testing swift.object_server.ObjectController """64 """ Tear down for testing swift.obj.server.ObjectController """
63 rmtree(self.testdir)65 rmtree(self.testdir)
6466
65 def test_POST_update_meta(self):67 def test_POST_update_meta(self):
66 """ Test swift.object_server.ObjectController.POST """68 """ Test swift.obj.server.ObjectController.POST """
67 if not self.path_to_test_xfs:69 if not self.path_to_test_xfs:
68 raise SkipTest70 raise SkipTest
69 timestamp = normalize_timestamp(time())71 timestamp = normalize_timestamp(time())
@@ -221,12 +223,11 @@
221 resp = self.object_controller.PUT(req)223 resp = self.object_controller.PUT(req)
222 self.assertEquals(resp.status_int, 201)224 self.assertEquals(resp.status_int, 201)
223 objfile = os.path.join(self.testdir, 'sda1',225 objfile = os.path.join(self.testdir, 'sda1',
224 storage_directory(object_server.DATADIR, 'p',226 storage_directory(DATADIR, 'p', hash_path('a', 'c', 'o')),
225 hash_path('a', 'c', 'o')),
226 timestamp + '.data')227 timestamp + '.data')
227 self.assert_(os.path.isfile(objfile))228 self.assert_(os.path.isfile(objfile))
228 self.assertEquals(open(objfile).read(), 'VERIFY')229 self.assertEquals(open(objfile).read(), 'VERIFY')
229 self.assertEquals(pickle.loads(getxattr(objfile, object_server.METADATA_KEY)),230 self.assertEquals(pickle.loads(getxattr(objfile, METADATA_KEY)),
230 {'X-Timestamp': timestamp,231 {'X-Timestamp': timestamp,
231 'Content-Length': '6',232 'Content-Length': '6',
232 'ETag': '0b4c12d7e0a73840c1c4f148fda3b037',233 'ETag': '0b4c12d7e0a73840c1c4f148fda3b037',
@@ -253,12 +254,11 @@
253 resp = self.object_controller.PUT(req)254 resp = self.object_controller.PUT(req)
254 self.assertEquals(resp.status_int, 201)255 self.assertEquals(resp.status_int, 201)
255 objfile = os.path.join(self.testdir, 'sda1',256 objfile = os.path.join(self.testdir, 'sda1',
256 storage_directory(object_server.DATADIR, 'p',257 storage_directory(DATADIR, 'p', hash_path('a', 'c', 'o')),
257 hash_path('a', 'c', 'o')),
258 timestamp + '.data')258 timestamp + '.data')
259 self.assert_(os.path.isfile(objfile))259 self.assert_(os.path.isfile(objfile))
260 self.assertEquals(open(objfile).read(), 'VERIFY TWO')260 self.assertEquals(open(objfile).read(), 'VERIFY TWO')
261 self.assertEquals(pickle.loads(getxattr(objfile, object_server.METADATA_KEY)),261 self.assertEquals(pickle.loads(getxattr(objfile, METADATA_KEY)),
262 {'X-Timestamp': timestamp,262 {'X-Timestamp': timestamp,
263 'Content-Length': '10',263 'Content-Length': '10',
264 'ETag': 'b381a4c5dab1eaa1eb9711fa647cd039',264 'ETag': 'b381a4c5dab1eaa1eb9711fa647cd039',
@@ -299,12 +299,11 @@
299 resp = self.object_controller.PUT(req)299 resp = self.object_controller.PUT(req)
300 self.assertEquals(resp.status_int, 201)300 self.assertEquals(resp.status_int, 201)
301 objfile = os.path.join(self.testdir, 'sda1',301 objfile = os.path.join(self.testdir, 'sda1',
302 storage_directory(object_server.DATADIR, 'p',302 storage_directory(DATADIR, 'p', hash_path('a', 'c', 'o')),
303 hash_path('a', 'c', 'o')),
304 timestamp + '.data')303 timestamp + '.data')
305 self.assert_(os.path.isfile(objfile))304 self.assert_(os.path.isfile(objfile))
306 self.assertEquals(open(objfile).read(), 'VERIFY THREE')305 self.assertEquals(open(objfile).read(), 'VERIFY THREE')
307 self.assertEquals(pickle.loads(getxattr(objfile, object_server.METADATA_KEY)),306 self.assertEquals(pickle.loads(getxattr(objfile, METADATA_KEY)),
308 {'X-Timestamp': timestamp,307 {'X-Timestamp': timestamp,
309 'Content-Length': '12',308 'Content-Length': '12',
310 'ETag': 'b114ab7b90d9ccac4bd5d99cc7ebb568',309 'ETag': 'b114ab7b90d9ccac4bd5d99cc7ebb568',
@@ -375,7 +374,7 @@
375 object_server.http_connect = old_http_connect374 object_server.http_connect = old_http_connect
376375
377 def test_HEAD(self):376 def test_HEAD(self):
378 """ Test swift.object_server.ObjectController.HEAD """377 """ Test swift.obj.server.ObjectController.HEAD """
379 if not self.path_to_test_xfs:378 if not self.path_to_test_xfs:
380 raise SkipTest379 raise SkipTest
381 req = Request.blank('/sda1/p/a/c')380 req = Request.blank('/sda1/p/a/c')
@@ -410,8 +409,7 @@
410 self.assertEquals(resp.headers['x-object-meta-two'], 'Two')409 self.assertEquals(resp.headers['x-object-meta-two'], 'Two')
411410
412 objfile = os.path.join(self.testdir, 'sda1',411 objfile = os.path.join(self.testdir, 'sda1',
413 storage_directory(object_server.DATADIR, 'p',412 storage_directory(DATADIR, 'p', hash_path('a', 'c', 'o')),
414 hash_path('a', 'c', 'o')),
415 timestamp + '.data')413 timestamp + '.data')
416 os.unlink(objfile)414 os.unlink(objfile)
417 req = Request.blank('/sda1/p/a/c/o')415 req = Request.blank('/sda1/p/a/c/o')
@@ -442,7 +440,7 @@
442 self.assertEquals(resp.status_int, 404)440 self.assertEquals(resp.status_int, 404)
443441
444 def test_GET(self):442 def test_GET(self):
445 """ Test swift.object_server.ObjectController.GET """443 """ Test swift.obj.server.ObjectController.GET """
446 if not self.path_to_test_xfs:444 if not self.path_to_test_xfs:
447 raise SkipTest445 raise SkipTest
448 req = Request.blank('/sda1/p/a/c')446 req = Request.blank('/sda1/p/a/c')
@@ -500,8 +498,7 @@
500 self.assertEquals(resp.headers['content-length'], '2')498 self.assertEquals(resp.headers['content-length'], '2')
501499
502 objfile = os.path.join(self.testdir, 'sda1',500 objfile = os.path.join(self.testdir, 'sda1',
503 storage_directory(object_server.DATADIR, 'p',501 storage_directory(DATADIR, 'p', hash_path('a', 'c', 'o')),
504 hash_path('a', 'c', 'o')),
505 timestamp + '.data')502 timestamp + '.data')
506 os.unlink(objfile)503 os.unlink(objfile)
507 req = Request.blank('/sda1/p/a/c/o')504 req = Request.blank('/sda1/p/a/c/o')
@@ -712,7 +709,7 @@
712 self.assertEquals(resp.status_int, 200)709 self.assertEquals(resp.status_int, 200)
713710
714 def test_DELETE(self):711 def test_DELETE(self):
715 """ Test swift.object_server.ObjectController.DELETE """712 """ Test swift.obj.server.ObjectController.DELETE """
716 if not self.path_to_test_xfs:713 if not self.path_to_test_xfs:
717 raise SkipTest714 raise SkipTest
718 req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'DELETE'})715 req = Request.blank('/sda1/p/a/c', environ={'REQUEST_METHOD': 'DELETE'})
@@ -751,8 +748,7 @@
751 resp = self.object_controller.DELETE(req)748 resp = self.object_controller.DELETE(req)
752 self.assertEquals(resp.status_int, 204)749 self.assertEquals(resp.status_int, 204)
753 objfile = os.path.join(self.testdir, 'sda1',750 objfile = os.path.join(self.testdir, 'sda1',
754 storage_directory(object_server.DATADIR, 'p',751 storage_directory(DATADIR, 'p', hash_path('a', 'c', 'o')),
755 hash_path('a', 'c', 'o')),
756 timestamp + '.ts')752 timestamp + '.ts')
757 self.assert_(os.path.isfile(objfile))753 self.assert_(os.path.isfile(objfile))
758754
@@ -764,13 +760,12 @@
764 resp = self.object_controller.DELETE(req)760 resp = self.object_controller.DELETE(req)
765 self.assertEquals(resp.status_int, 204)761 self.assertEquals(resp.status_int, 204)
766 objfile = os.path.join(self.testdir, 'sda1',762 objfile = os.path.join(self.testdir, 'sda1',
767 storage_directory(object_server.DATADIR, 'p',763 storage_directory(DATADIR, 'p', hash_path('a', 'c', 'o')),
768 hash_path('a', 'c', 'o')),
769 timestamp + '.ts')764 timestamp + '.ts')
770 self.assert_(os.path.isfile(objfile))765 self.assert_(os.path.isfile(objfile))
771766
772 def test_call(self):767 def test_call(self):
773 """ Test swift.object_server.ObjectController.__call__ """768 """ Test swift.obj.server.ObjectController.__call__ """
774 inbuf = StringIO()769 inbuf = StringIO()
775 errbuf = StringIO()770 errbuf = StringIO()
776 outbuf = StringIO()771 outbuf = StringIO()
@@ -886,39 +881,6 @@
886 resp = self.object_controller.PUT(req)881 resp = self.object_controller.PUT(req)
887 self.assertEquals(resp.status_int, 400)882 self.assertEquals(resp.status_int, 400)
888883
889 def test_disk_file_app_iter_corners(self):
890 if not self.path_to_test_xfs:
891 raise SkipTest
892 df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o')
893 mkdirs(df.datadir)
894 f = open(os.path.join(df.datadir,
895 normalize_timestamp(time()) + '.data'), 'wb')
896 f.write('1234567890')
897 setxattr(f.fileno(), object_server.METADATA_KEY,
898 pickle.dumps({}, object_server.PICKLE_PROTOCOL))
899 f.close()
900 df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
901 keep_data_fp=True)
902 it = df.app_iter_range(0, None)
903 sio = StringIO()
904 for chunk in it:
905 sio.write(chunk)
906 self.assertEquals(sio.getvalue(), '1234567890')
907
908 df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
909 keep_data_fp=True)
910 it = df.app_iter_range(5, None)
911 sio = StringIO()
912 for chunk in it:
913 sio.write(chunk)
914 self.assertEquals(sio.getvalue(), '67890')
915
916 def test_disk_file_mkstemp_creates_dir(self):
917 tmpdir = os.path.join(self.testdir, 'sda1', 'tmp')
918 os.rmdir(tmpdir)
919 with object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o').mkstemp():
920 self.assert_(os.path.exists(tmpdir))
921
922 def test_max_upload_time(self):884 def test_max_upload_time(self):
923 if not self.path_to_test_xfs:885 if not self.path_to_test_xfs:
924 raise SkipTest886 raise SkipTest
@@ -1006,6 +968,161 @@
1006 self.assertEquals(resp.status_int, 200)968 self.assertEquals(resp.status_int, 200)
1007 self.assertEquals(resp.headers['content-encoding'], 'gzip')969 self.assertEquals(resp.headers['content-encoding'], 'gzip')
1008970
971 def test_overwritten_manifest(self):
972 """
973 Ensures a janitor segment cleanup job is created for an overwritten
974 manifest.
975 """
976 if not self.path_to_test_xfs:
977 raise SkipTest
978 df = DiskFile(self.testdir, 'sda1', '0', 'Segment-Cleanup',
979 normalize_timestamp(1), 'a/c/o', datadir=JANITORDIR)
980 self.assert_(df.is_deleted())
981
982 manifest = {'x-timestamp': normalize_timestamp(1),
983 'content-length': 100, 'content-type': 'text/plain',
984 'x-segment-size': 10, 'etag': 'd41d8cd98f00b204e9800998ecf8427e'}
985 manifest = pickle.dumps(manifest, protocol=PICKLE_PROTOCOL)
986 req = Request.blank('/sda1/0/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
987 headers={'X-Timestamp': normalize_timestamp(1),
988 'Content-Length': str(len(manifest)), 'Content-Type': 'text/plain',
989 'X-Object-Type': 'manifest', 'X-Object-Length': '100'},
990 body=manifest)
991 resp = self.object_controller.PUT(req)
992 self.assertEquals(resp.status_int, 201)
993
994 req = Request.blank('/sda1/0/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
995 headers={'X-Timestamp': normalize_timestamp(2),
996 'Content-Length': '1', 'Content-Type': 'text/plain'}, body=' ')
997 resp = self.object_controller.PUT(req)
998 self.assertEquals(resp.status_int, 201)
999
1000 df = DiskFile(self.testdir, 'sda1', '0', 'Segment-Cleanup',
1001 normalize_timestamp(1), 'a/c/o', datadir=JANITORDIR)
1002 self.assert_(not df.is_deleted())
1003
1004 def test_segmented_put(self):
1005 if not self.path_to_test_xfs:
1006 raise SkipTest
1007
1008 # Ensure the janitor cleanup job doesn't exist yet
1009 df = DiskFile(self.testdir, 'sda1', '0', 'Segment-Cleanup',
1010 normalize_timestamp(1), 'a/c/o', datadir=JANITORDIR)
1011 self.assert_(df.is_deleted())
1012
1013 # Put the first segment
1014 req = Request.blank('/sda1/0/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
1015 headers={'X-Timestamp': normalize_timestamp(1),
1016 'Content-Length': '1', 'Content-Type': 'text/plain',
1017 'X-Object-Type': 'segment', 'X-Object-Segment': '0',
1018 'X-Object-Segment-If-Length': '1'}, body='1')
1019 resp = self.object_controller.PUT(req)
1020 self.assertEquals(resp.status_int, 201)
1021
1022 # Ensure the janitor cleanup job now exists
1023 df = DiskFile(self.testdir, 'sda1', '0', 'Segment-Cleanup',
1024 normalize_timestamp(1), 'a/c/o', datadir=JANITORDIR)
1025 self.assert_(not df.is_deleted())
1026
1027 # Second segment would go to a different node
1028
1029 # Put the manifest
1030 manifest = {'x-timestamp': normalize_timestamp(1),
1031 'content-length': 2, 'content-type': 'text/plain',
1032 'x-segment-size': 1, 'etag': 'c20ad4d76fe97759aa27a0c99bff6710'}
1033 manifest = pickle.dumps(manifest, protocol=PICKLE_PROTOCOL)
1034 req = Request.blank('/sda1/0/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
1035 headers={'X-Timestamp': normalize_timestamp(1),
1036 'Content-Length': str(len(manifest)), 'Content-Type': 'text/plain',
1037 'X-Object-Type': 'manifest', 'X-Object-Length': '2'},
1038 body=manifest)
1039 resp = self.object_controller.PUT(req)
1040 self.assertEquals(resp.status_int, 201)
1041
1042 # The janitor cleanup job should still exist (only the janitor will
1043 # verify the manifest is in place an remove the job).
1044 df = DiskFile(self.testdir, 'sda1', '0', 'Segment-Cleanup',
1045 normalize_timestamp(1), 'a/c/o', datadir=JANITORDIR)
1046 self.assert_(not df.is_deleted())
1047
1048 req = Request.blank('/sda1/0/a/c/o', environ={'REQUEST_METHOD': 'GET'})
1049 resp = self.object_controller.GET(req)
1050 self.assertEquals(resp.status_int, 200)
1051 self.assertEquals(resp.body, manifest)
1052
1053 req = Request.blank('/sda1/0/a/c/o', environ={'REQUEST_METHOD': 'GET'},
1054 headers={'X-Object-Segment': '0',
1055 'X-Object-Segment-Timestamp': normalize_timestamp(1)})
1056 resp = self.object_controller.GET(req)
1057 self.assertEquals(resp.status_int, 200)
1058 self.assertEquals(resp.body, '1')
1059
1060 def test_segmented_put_no_longer(self):
1061 if not self.path_to_test_xfs:
1062 raise SkipTest
1063
1064 # Ensure the janitor cleanup job doesn't exist to begin with
1065 df = DiskFile(self.testdir, 'sda1', '0', 'Segment-Cleanup',
1066 normalize_timestamp(1), 'a/c/o', datadir=JANITORDIR)
1067 self.assert_(df.is_deleted())
1068
1069 # Put the first segment, that really ends up a whole object
1070 req = Request.blank('/sda1/0/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
1071 headers={'X-Timestamp': normalize_timestamp(1),
1072 'Content-Length': '1', 'Content-Type': 'text/plain',
1073 'X-Object-Type': 'segment', 'X-Object-Segment': '0',
1074 'X-Object-Segment-If-Length': '2'}, body='1')
1075 resp = self.object_controller.PUT(req)
1076 self.assertEquals(resp.status_int, 201)
1077
1078 # Ensure the janitor cleanup job doesn't exist since we put a whole
1079 # file, not a segment (due to X-Object-Segment-If-Length).
1080 df = DiskFile(self.testdir, 'sda1', '0', 'Segment-Cleanup',
1081 normalize_timestamp(1), 'a/c/o', datadir=JANITORDIR)
1082 self.assert_(df.is_deleted())
1083
1084 req = Request.blank('/sda1/0/a/c/o', environ={'REQUEST_METHOD': 'GET'})
1085 resp = self.object_controller.GET(req)
1086 self.assertEquals(resp.status_int, 200)
1087 self.assertEquals(resp.body, '1')
1088
1089 def test_deleted_manifest(self):
1090 """
1091 Ensures a janitor segment cleanup job is created for a deleted
1092 manifest.
1093 """
1094 if not self.path_to_test_xfs:
1095 raise SkipTest
1096 df = DiskFile(self.testdir, 'sda1', '0', 'Segment-Cleanup',
1097 normalize_timestamp(1), 'a/c/o', datadir=JANITORDIR)
1098 self.assert_(df.is_deleted())
1099
1100 manifest = {'x-timestamp': normalize_timestamp(1),
1101 'content-length': 100, 'content-type': 'text/plain',
1102 'x-segment-size': 10, 'etag': 'd41d8cd98f00b204e9800998ecf8427e'}
1103 manifest = pickle.dumps(manifest, protocol=PICKLE_PROTOCOL)
1104 req = Request.blank('/sda1/0/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
1105 headers={'X-Timestamp': normalize_timestamp(1),
1106 'Content-Length': str(len(manifest)), 'Content-Type': 'text/plain',
1107 'X-Object-Type': 'manifest', 'X-Object-Length': '100'},
1108 body=manifest)
1109 resp = self.object_controller.PUT(req)
1110 self.assertEquals(resp.status_int, 201)
1111
1112 req = Request.blank('/sda1/0/a/c/o',
1113 environ={'REQUEST_METHOD': 'DELETE'},
1114 headers={'X-Timestamp': normalize_timestamp(2),
1115 'Content-Length': '0', 'Content-Type': 'text/plain'}, body='')
1116 resp = self.object_controller.DELETE(req)
1117 self.assertEquals(resp.status_int, 204)
1118
1119 df = DiskFile(self.testdir, 'sda1', '0', 'Segment-Cleanup',
1120 normalize_timestamp(1), 'a/c/o', datadir=JANITORDIR,
1121 keep_data_fp=True)
1122 self.assert_(not df.is_deleted())
1123 job = pickle.loads(''.join(iter(df)))
1124 self.assertEquals(job['segment_last_deleted'], -1)
1125
10091126
1010if __name__ == '__main__':1127if __name__ == '__main__':
1011 unittest.main()1128 unittest.main()
10121129
=== modified file 'test/unit/obj/test_updater.py'
--- test/unit/obj/test_updater.py 2010-09-23 16:09:30 +0000
+++ test/unit/obj/test_updater.py 2010-11-08 18:51:48 +0000
@@ -24,7 +24,7 @@
24from eventlet import spawn, TimeoutError, listen24from eventlet import spawn, TimeoutError, listen
25from eventlet.timeout import Timeout25from eventlet.timeout import Timeout
2626
27from swift.obj import updater as object_updater, server as object_server27from swift.obj import updater as object_updater
28from swift.common.ring import RingData28from swift.common.ring import RingData
29from swift.common import utils29from swift.common import utils
30from swift.common.utils import hash_path, normalize_timestamp, mkdirs30from swift.common.utils import hash_path, normalize_timestamp, mkdirs
@@ -48,7 +48,7 @@
48 os.mkdir(self.devices_dir)48 os.mkdir(self.devices_dir)
49 self.sda1 = os.path.join(self.devices_dir, 'sda1')49 self.sda1 = os.path.join(self.devices_dir, 'sda1')
50 os.mkdir(self.sda1)50 os.mkdir(self.sda1)
51 os.mkdir(os.path.join(self.sda1,'tmp'))51 os.mkdir(os.path.join(self.sda1, 'tmp'))
5252
53 def tearDown(self):53 def tearDown(self):
54 rmtree(self.testdir, ignore_errors=1)54 rmtree(self.testdir, ignore_errors=1)
@@ -80,7 +80,7 @@
80 'node_timeout': '15',80 'node_timeout': '15',
81 })81 })
82 cu.run_once()82 cu.run_once()
83 async_dir = os.path.join(self.sda1, object_server.ASYNCDIR)83 async_dir = os.path.join(self.sda1, object_updater.ASYNCDIR)
84 os.mkdir(async_dir)84 os.mkdir(async_dir)
85 cu.run_once()85 cu.run_once()
86 self.assert_(os.path.exists(async_dir))86 self.assert_(os.path.exists(async_dir))
@@ -103,6 +103,7 @@
103 self.assert_(os.path.exists(op_path))103 self.assert_(os.path.exists(op_path))
104104
105 bindsock = listen(('127.0.0.1', 0))105 bindsock = listen(('127.0.0.1', 0))
106
106 def accepter(sock, return_code):107 def accepter(sock, return_code):
107 try:108 try:
108 with Timeout(3):109 with Timeout(3):
@@ -123,6 +124,7 @@
123 except BaseException, err:124 except BaseException, err:
124 return err125 return err
125 return None126 return None
127
126 def accept(return_codes):128 def accept(return_codes):
127 codes = iter(return_codes)129 codes = iter(return_codes)
128 try:130 try:
@@ -139,7 +141,8 @@
139 except BaseException, err:141 except BaseException, err:
140 return err142 return err
141 return None143 return None
142 event = spawn(accept, [201,500])144
145 event = spawn(accept, [201, 500])
143 for dev in cu.get_container_ring().devs:146 for dev in cu.get_container_ring().devs:
144 if dev is not None:147 if dev is not None:
145 dev['port'] = bindsock.getsockname()[1]148 dev['port'] = bindsock.getsockname()[1]
@@ -155,5 +158,6 @@
155 raise err158 raise err
156 self.assert_(not os.path.exists(op_path))159 self.assert_(not os.path.exists(op_path))
157160
161
158if __name__ == '__main__':162if __name__ == '__main__':
159 unittest.main()163 unittest.main()
160164
=== modified file 'test/unit/proxy/test_server.py'
--- test/unit/proxy/test_server.py 2010-11-05 14:47:43 +0000
+++ test/unit/proxy/test_server.py 2010-11-08 18:51:48 +0000
@@ -34,7 +34,7 @@
34from eventlet import sleep, spawn, TimeoutError, util, wsgi, listen34from eventlet import sleep, spawn, TimeoutError, util, wsgi, listen
35from eventlet.timeout import Timeout35from eventlet.timeout import Timeout
36import simplejson36import simplejson
37from webob import Request37from webob import Request, Response
38from webob.exc import HTTPUnauthorized38from webob.exc import HTTPUnauthorized
3939
40from test.unit import connect_tcp, readuntil2crlfs40from test.unit import connect_tcp, readuntil2crlfs
@@ -44,7 +44,7 @@
44from swift.obj import server as object_server44from swift.obj import server as object_server
45from swift.common import ring45from swift.common import ring
46from swift.common.constraints import MAX_META_NAME_LENGTH, \46from swift.common.constraints import MAX_META_NAME_LENGTH, \
47 MAX_META_VALUE_LENGTH, MAX_META_COUNT, MAX_META_OVERALL_SIZE, MAX_FILE_SIZE47 MAX_META_VALUE_LENGTH, MAX_META_COUNT, MAX_META_OVERALL_SIZE
48from swift.common.utils import mkdirs, normalize_timestamp, NullLogger48from swift.common.utils import mkdirs, normalize_timestamp, NullLogger
4949
5050
@@ -53,7 +53,9 @@
5353
5454
55def fake_http_connect(*code_iter, **kwargs):55def fake_http_connect(*code_iter, **kwargs):
56
56 class FakeConn(object):57 class FakeConn(object):
58
57 def __init__(self, status, etag=None, body=''):59 def __init__(self, status, etag=None, body=''):
58 self.status = status60 self.status = status
59 self.reason = 'Fake'61 self.reason = 'Fake'
@@ -158,6 +160,7 @@
158160
159161
160class FakeMemcache(object):162class FakeMemcache(object):
163
161 def __init__(self):164 def __init__(self):
162 self.store = {}165 self.store = {}
163166
@@ -212,9 +215,12 @@
212class TestProxyServer(unittest.TestCase):215class TestProxyServer(unittest.TestCase):
213216
214 def test_unhandled_exception(self):217 def test_unhandled_exception(self):
218
215 class MyApp(proxy_server.Application):219 class MyApp(proxy_server.Application):
220
216 def get_controller(self, path):221 def get_controller(self, path):
217 raise Exception('this shouldnt be caught')222 raise Exception('this shouldnt be caught')
223
218 app = MyApp(None, FakeMemcache(), account_ring=FakeRing(),224 app = MyApp(None, FakeMemcache(), account_ring=FakeRing(),
219 container_ring=FakeRing(), object_ring=FakeRing())225 container_ring=FakeRing(), object_ring=FakeRing())
220 req = Request.blank('/account', environ={'REQUEST_METHOD': 'HEAD'})226 req = Request.blank('/account', environ={'REQUEST_METHOD': 'HEAD'})
@@ -323,8 +329,11 @@
323 test_status_map((200, 200, 204, 500, 404), 503)329 test_status_map((200, 200, 204, 500, 404), 503)
324330
325 def test_PUT_connect_exceptions(self):331 def test_PUT_connect_exceptions(self):
332
326 def mock_http_connect(*code_iter, **kwargs):333 def mock_http_connect(*code_iter, **kwargs):
334
327 class FakeConn(object):335 class FakeConn(object):
336
328 def __init__(self, status):337 def __init__(self, status):
329 self.status = status338 self.status = status
330 self.reason = 'Fake'339 self.reason = 'Fake'
@@ -372,8 +381,11 @@
372 test_status_map((200, 200, 503, 503, -1), 503)381 test_status_map((200, 200, 503, 503, -1), 503)
373382
374 def test_PUT_send_exceptions(self):383 def test_PUT_send_exceptions(self):
384
375 def mock_http_connect(*code_iter, **kwargs):385 def mock_http_connect(*code_iter, **kwargs):
386
376 class FakeConn(object):387 class FakeConn(object):
388
377 def __init__(self, status):389 def __init__(self, status):
378 self.status = status390 self.status = status
379 self.reason = 'Fake'391 self.reason = 'Fake'
@@ -430,15 +442,18 @@
430 controller = proxy_server.ObjectController(self.app, 'account',442 controller = proxy_server.ObjectController(self.app, 'account',
431 'container', 'object')443 'container', 'object')
432 req = Request.blank('/a/c/o', {}, headers={444 req = Request.blank('/a/c/o', {}, headers={
433 'Content-Length': str(MAX_FILE_SIZE + 1),445 'Content-Length': str(self.app.max_object_size + 1),
434 'Content-Type': 'foo/bar'})446 'Content-Type': 'foo/bar'})
435 self.app.update_request(req)447 self.app.update_request(req)
436 res = controller.PUT(req)448 res = controller.PUT(req)
437 self.assertEquals(res.status_int, 413)449 self.assertEquals(res.status_int, 413)
438450
439 def test_PUT_getresponse_exceptions(self):451 def test_PUT_getresponse_exceptions(self):
452
440 def mock_http_connect(*code_iter, **kwargs):453 def mock_http_connect(*code_iter, **kwargs):
454
441 class FakeConn(object):455 class FakeConn(object):
456
442 def __init__(self, status):457 def __init__(self, status):
443 self.status = status458 self.status = status
444 self.reason = 'Fake'459 self.reason = 'Fake'
@@ -633,6 +648,7 @@
633 dev['port'] = 1648 dev['port'] = 1
634649
635 class SlowBody():650 class SlowBody():
651
636 def __init__(self):652 def __init__(self):
637 self.sent = 0653 self.sent = 0
638654
@@ -680,6 +696,7 @@
680 dev['port'] = 1696 dev['port'] = 1
681697
682 class SlowBody():698 class SlowBody():
699
683 def __init__(self):700 def __init__(self):
684 self.sent = 0701 self.sent = 0
685702
@@ -1334,7 +1351,9 @@
13341351
1335 def test_chunked_put(self):1352 def test_chunked_put(self):
1336 # quick test of chunked put w/o PATH_TO_TEST_XFS1353 # quick test of chunked put w/o PATH_TO_TEST_XFS
1354
1337 class ChunkedFile():1355 class ChunkedFile():
1356
1338 def __init__(self, bytes):1357 def __init__(self, bytes):
1339 self.bytes = bytes1358 self.bytes = bytes
1340 self.read_bytes = 01359 self.read_bytes = 0
@@ -1375,12 +1394,13 @@
1375 req.body_file = ChunkedFile(11)1394 req.body_file = ChunkedFile(11)
1376 self.app.memcache.store = {}1395 self.app.memcache.store = {}
1377 self.app.update_request(req)1396 self.app.update_request(req)
1397 orig_max_object_size = self.app.max_object_size
1378 try:1398 try:
1379 server.MAX_FILE_SIZE = 101399 self.app.max_object_size = 10
1380 res = controller.PUT(req)1400 res = controller.PUT(req)
1381 self.assertEquals(res.status_int, 413)1401 self.assertEquals(res.status_int, 413)
1382 finally:1402 finally:
1383 server.MAX_FILE_SIZE = MAX_FILE_SIZE1403 self.app.max_object_size = orig_max_object_size
13841404
1385 def test_chunked_put_and_a_bit_more(self):1405 def test_chunked_put_and_a_bit_more(self):
1386 # Since we're starting up a lot here, we're going to test more than1406 # Since we're starting up a lot here, we're going to test more than
@@ -1495,6 +1515,7 @@
1495 self.assertEquals(headers[:len(exp)], exp)1515 self.assertEquals(headers[:len(exp)], exp)
1496 # Check unhandled exception1516 # Check unhandled exception
1497 orig_update_request = prosrv.update_request1517 orig_update_request = prosrv.update_request
1518
1498 def broken_update_request(env, req):1519 def broken_update_request(env, req):
1499 raise Exception('fake')1520 raise Exception('fake')
1500 prosrv.update_request = broken_update_request1521 prosrv.update_request = broken_update_request
@@ -1545,6 +1566,7 @@
1545 # in a test for logging x-forwarded-for (first entry only).1566 # in a test for logging x-forwarded-for (first entry only).
15461567
1547 class Logger(object):1568 class Logger(object):
1569
1548 def info(self, msg):1570 def info(self, msg):
1549 self.msg = msg1571 self.msg = msg
1550 orig_logger = prosrv.logger1572 orig_logger = prosrv.logger
@@ -1568,6 +1590,7 @@
1568 # Turn on header logging.1590 # Turn on header logging.
15691591
1570 class Logger(object):1592 class Logger(object):
1593
1571 def info(self, msg):1594 def info(self, msg):
1572 self.msg = msg1595 self.msg = msg
1573 orig_logger = prosrv.logger1596 orig_logger = prosrv.logger
@@ -1919,6 +1942,188 @@
1919 res = controller.PUT(req)1942 res = controller.PUT(req)
1920 self.assert_(called[0])1943 self.assert_(called[0])
19211944
1945 def test_GETorHEAD1(self):
1946 """
1947 Ensures we call GETorHEAD_base again without a range if we do a range
1948 request and get a 416 Requested Range Not Satisfiable, just in case the
1949 primary object is a manifest.
1950 """
1951 called_without_range = [False]
1952 controller = proxy_server.ObjectController(self.app, 'account',
1953 'container', 'object')
1954
1955 def local_GETorHEAD_base(req, server_type, partition, nodes, path,
1956 attempts):
1957 if req.range:
1958 return Response(status=416)
1959 called_without_range[0] = True
1960 return Response()
1961
1962 controller.GETorHEAD_base = local_GETorHEAD_base
1963 req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'GET'},
1964 headers={'Range': 'bytes=0-1'})
1965 controller.GETorHEAD(req)
1966 self.assert_(called_without_range[0])
1967
1968 def test_GETorHEAD2(self):
1969 """
1970 Ensures we call GETorHEAD_base again if the first request was a HEAD
1971 and the primary object is a manifest.
1972 """
1973 called_get = [False]
1974 controller = proxy_server.ObjectController(self.app, 'account',
1975 'container', 'object')
1976
1977 def local_GETorHEAD_base(req, server_type, partition, nodes, path,
1978 attempts):
1979 if req.method == 'HEAD':
1980 return Response(headers={'x-object-type': 'manifest'})
1981 elif req.method == 'GET':
1982 called_get[0] = True
1983 return Response(headers={'x-object-type': 'manifest'},
1984 body=pickle.dumps({'content-length': 0, 'x-segment-size': 1,
1985 'x-timestamp': normalize_timestamp(2),
1986 'etag': 'd41d8cd98f00b204e9800998ecf8427e',
1987 'content-type': 'text/plain'}))
1988 else:
1989 raise Exception('Unexpected method %s' % req.method)
1990
1991 controller.GETorHEAD_base = local_GETorHEAD_base
1992 req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'HEAD'})
1993 controller.GETorHEAD(req)
1994 self.assert_(called_get[0])
1995
1996 def test_GETorHEAD3(self):
1997 """
1998 Ensures we call GETorHEAD_base again without a range if the first
1999 request was a GET with range that succeeded and the primary object is a
2000 manifest.
2001 """
2002 called_without_range = [False]
2003 controller = proxy_server.ObjectController(self.app, 'account',
2004 'container', 'object')
2005
2006 def local_GETorHEAD_base(req, server_type, partition, nodes, path,
2007 attempts):
2008 if not req.range:
2009 called_without_range[0] = True
2010 return Response(headers={'x-object-type': 'manifest'},
2011 body=pickle.dumps({'content-length': 0, 'x-segment-size': 1,
2012 'x-timestamp': normalize_timestamp(2),
2013 'etag': 'd41d8cd98f00b204e9800998ecf8427e',
2014 'content-type': 'text/plain'}))
2015
2016 controller.GETorHEAD_base = local_GETorHEAD_base
2017 req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'GET'},
2018 headers={'Range': 'bytes=0-1'})
2019 controller.GETorHEAD(req)
2020 self.assert_(called_without_range[0])
2021
2022 def test_PUT_segmented_object1(self):
2023 with save_globals():
2024 proxy_server.http_connect = \
2025 fake_http_connect(200, 200, # account, container checks
2026 201, 201, 201, # segment 0
2027 201, 201, 201, # segment 1
2028 201, 201, 201, # segment 2
2029 201, 201, 201) # manifest
2030 controller = proxy_server.ObjectController(self.app, 'account',
2031 'container', 'object')
2032 self.app.segment_size = 2
2033 req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
2034 body='12345')
2035 resp = controller.PUT(req)
2036 self.assertEquals(resp.status_int, 201)
2037 self.assertEquals(req.bytes_transferred, 5)
2038
2039 def test_PUT_segmented_object2(self):
2040 """ Same as 1, just with a chunky data source. """
2041 with save_globals():
2042 proxy_server.http_connect = \
2043 fake_http_connect(200, 200, # account, container checks
2044 201, 201, 201, # segment 0
2045 201, 201, 201, # segment 1
2046 201, 201, 201, # segment 2
2047 201, 201, 201) # manifest
2048 controller = proxy_server.ObjectController(self.app, 'account',
2049 'container', 'object')
2050 self.app.segment_size = 2
2051
2052 class ChunkedReader(object):
2053
2054 def __init__(self):
2055 self.chunk = 0
2056
2057 def read(self, size):
2058 self.chunk += 1
2059 if self.chunk == 1:
2060 return '123'
2061 elif self.chunk == 2:
2062 return '45'
2063 else:
2064 return ''
2065
2066 req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT',
2067 'wsgi.input': ChunkedReader()}, headers={'Content-Length': '5'})
2068 resp = controller.PUT(req)
2069 self.assertEquals(resp.status_int, 201)
2070 self.assertEquals(req.bytes_transferred, 5)
2071
2072 def test_PUT_segmented_object3(self):
2073 """ Failed segment PUT. """
2074 with save_globals():
2075 proxy_server.http_connect = \
2076 fake_http_connect(200, 200, # account, container checks
2077 201, 201, 201, # segment 0
2078 201, 500, 500, # segment 1
2079 201, 201, 201, # segment 2
2080 201, 201, 201) # manifest
2081 controller = proxy_server.ObjectController(self.app, 'account',
2082 'container', 'object')
2083 self.app.segment_size = 2
2084 req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
2085 body='12345')
2086 resp = controller.PUT(req)
2087 self.assertEquals(resp.status_int, 503)
2088 self.assertEquals(resp.body.strip(),
2089 'Unable to complete very large file operation.')
2090
2091 def test_PUT_segmented_object4(self):
2092 """ Non-matching etag sent. """
2093 with save_globals():
2094 proxy_server.http_connect = \
2095 fake_http_connect(200, 200, # account, container checks
2096 201, 201, 201, # segment 0
2097 201, 201, 201, # segment 1
2098 201, 201, 201, # segment 2
2099 201, 201, 201) # manifest
2100 controller = proxy_server.ObjectController(self.app, 'account',
2101 'container', 'object')
2102 self.app.segment_size = 2
2103 req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
2104 body='12345', headers={'ETag': 'abc'})
2105 resp = controller.PUT(req)
2106 self.assertEquals(resp.status_int, 422)
2107
2108 def test_PUT_segmented_object5(self):
2109 """ Failed manifest PUT. """
2110 with save_globals():
2111 proxy_server.http_connect = \
2112 fake_http_connect(200, 200, # account, container checks
2113 201, 201, 201, # segment 0
2114 201, 201, 201, # segment 1
2115 201, 201, 201, # segment 2
2116 201, 500, 500) # manifest
2117 controller = proxy_server.ObjectController(self.app, 'account',
2118 'container', 'object')
2119 self.app.segment_size = 2
2120 req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
2121 body='12345')
2122 resp = controller.PUT(req)
2123 self.assertEquals(resp.status_int, 503)
2124 self.assertEquals(resp.body.strip(),
2125 'Unable to complete very large file operation.')
2126
1922 def test_COPY_calls_authorize(self):2127 def test_COPY_calls_authorize(self):
1923 called = [False]2128 called = [False]
19242129
@@ -2080,7 +2285,9 @@
2080 self.assertEquals(resp.status_int, 404)2285 self.assertEquals(resp.status_int, 404)
20812286
2082 def test_put_locking(self):2287 def test_put_locking(self):
2288
2083 class MockMemcache(FakeMemcache):2289 class MockMemcache(FakeMemcache):
2290
2084 def __init__(self, allow_lock=None):2291 def __init__(self, allow_lock=None):
2085 self.allow_lock = allow_lock2292 self.allow_lock = allow_lock
2086 super(MockMemcache, self).__init__()2293 super(MockMemcache, self).__init__()
@@ -2669,5 +2876,142 @@
2669 self.assertEquals(resp.status_int, 400)2876 self.assertEquals(resp.status_int, 400)
26702877
26712878
class TestSegmentedIterable(unittest.TestCase):
    """Tests for proxy_server.SegmentedIterable.

    Most tests iterate a 5-byte object stored as 2-byte segments where every
    fake segment GET returns the body '12', so the reassembled object reads
    '12121'.
    """

    def setUp(self):
        self.app = proxy_server.Application(None, FakeMemcache(),
            account_ring=FakeRing(), container_ring=FakeRing(),
            object_ring=FakeRing())
        self.controller = proxy_server.ObjectController(self.app, 'account',
            'container', 'object')

    def _iterable(self, content_length=5):
        # Convenience constructor: `content_length` bytes split into 2-byte
        # segments at a fixed timestamp.
        return proxy_server.SegmentedIterable(
            self.controller, content_length, 2, normalize_timestamp(1))

    def test_zero_bytes(self):
        # A zero-length object yields no data at all.
        seg_iter = self._iterable(0)
        self.assertEquals(''.join(iter(seg_iter)), '')

    def test_happy_path(self):
        seen = [0]

        def check_segment_header(*args, **kwargs):
            # Each new connection must request the next segment in order.
            self.assertEquals(int(kwargs['headers']['X-Object-Segment']),
                              seen[0])
            seen[0] += 1

        with save_globals():
            proxy_server.http_connect = fake_http_connect(200, 200, 200,
                body='12', give_connect=check_segment_header)
            seg_iter = self._iterable()
            self.assertEquals(''.join(iter(seg_iter)), '12121')
            self.assertEquals(seen[0], 3)

    def test_not_found_start(self):
        # All replicas of segment 0 are missing: iteration must fail before
        # yielding any data.
        with save_globals():
            proxy_server.http_connect = \
                fake_http_connect(404, 404, 404, 200, 200, 200, body='12')
            seg_iter = self._iterable()
            caught = None
            try:
                for chunk in seg_iter:
                    raise Exception('Got data when we should not have.')
            except Exception as err:
                caught = err
            self.assertEquals(str(caught),
                'Could not load segment 0 of /account/container/object')

    def test_not_found_after_start(self):
        # Segment 0 succeeds but all replicas of segment 1 are missing.
        with save_globals():
            proxy_server.http_connect = \
                fake_http_connect(200, 404, 404, 404, 200, 200, body='12')
            seg_iter = self._iterable()
            caught = None
            try:
                for chunk in seg_iter:
                    self.assertEquals(chunk, '12')
            except Exception as err:
                caught = err
            self.assertEquals(str(caught),
                'Could not load segment 1 of /account/container/object')

    def test_partial_not_found(self):
        # Individual replica 404s are tolerated as long as each segment has
        # at least one good copy.
        with save_globals():
            proxy_server.http_connect = \
                fake_http_connect(404, 200, 404, 404, 200, 200, body='12')
            seg_iter = self._iterable()
            self.assertEquals(''.join(iter(seg_iter)), '12121')

    def test_bytes_transferred(self):
        with save_globals():
            proxy_server.http_connect = \
                fake_http_connect(200, 200, 200, body='12')
            seg_iter = self._iterable()

            class FakeResponse(object):
                pass

            # Full iteration should record the object's total size on the
            # attached response.
            seg_iter.response = FakeResponse()
            self.assertEquals(''.join(iter(seg_iter)), '12121')
            self.assertEquals(seg_iter.response.bytes_transferred, 5)

    def test_bytes_transferred_app_iter_range(self):
        with save_globals():
            proxy_server.http_connect = \
                fake_http_connect(200, 200, 200, body='12')
            seg_iter = self._iterable()

            class FakeResponse(object):
                pass

            # A ranged read should record only the bytes actually sent.
            seg_iter.response = FakeResponse()
            self.assertEquals(''.join(seg_iter.app_iter_range(1, 3)), '212')
            self.assertEquals(seg_iter.response.bytes_transferred, 3)

    def test_app_iter_range_past_end(self):
        # A stop offset beyond the object is clamped to the object's end.
        with save_globals():
            proxy_server.http_connect = \
                fake_http_connect(200, 200, 200, body='12')
            seg_iter = self._iterable()
            self.assertEquals(''.join(seg_iter.app_iter_range(1, 30)), '2121')

    def test_app_iter_range_start_past_end(self):
        # A start offset beyond the object yields nothing.
        with save_globals():
            proxy_server.http_connect = \
                fake_http_connect(200, 200, 200, body='12')
            seg_iter = self._iterable()
            self.assertEquals(''.join(seg_iter.app_iter_range(30, 31)), '')

    def test_app_iter_range_to_end(self):
        # No stop offset: read from the start offset to the object's end.
        with save_globals():
            proxy_server.http_connect = \
                fake_http_connect(200, 200, 200, body='12')
            seg_iter = self._iterable()
            self.assertEquals(''.join(seg_iter.app_iter_range(3, None)), '21')

    def test_app_iter_range_to_an_end(self):
        # No start offset: read from the beginning up to the stop offset.
        with save_globals():
            proxy_server.http_connect = \
                fake_http_connect(200, 200, 200, body='12')
            seg_iter = self._iterable()
            self.assertEquals(''.join(seg_iter.app_iter_range(None, 3)), '121')

    def test_app_iter_range_full(self):
        # No offsets at all: equivalent to reading the whole object.
        with save_globals():
            proxy_server.http_connect = \
                fake_http_connect(200, 200, 200, body='12')
            seg_iter = self._iterable()
            self.assertEquals(''.join(seg_iter.app_iter_range(None, None)),
                              '12121')
3015
# Allow running this test module directly (outside a test runner).
if __name__ == '__main__':
    unittest.main()