Merge lp:~david-goetz/swift/slow_audit into lp:~hudson-openstack/swift/trunk

Proposed by David Goetz
Status: Merged
Approved by: gholt
Approved revision: 153
Merged at revision: 160
Proposed branch: lp:~david-goetz/swift/slow_audit
Merge into: lp:~hudson-openstack/swift/trunk
Diff against target: 635 lines (+380/-60)
7 files modified
doc/source/deployment_guide.rst (+6/-1)
etc/object-server.conf-sample (+2/-2)
swift/common/utils.py (+36/-6)
swift/obj/auditor.py (+50/-36)
swift/obj/server.py (+18/-7)
test/unit/common/test_utils.py (+45/-5)
test/unit/obj/test_auditor.py (+223/-3)
To merge this branch: bzr merge lp:~david-goetz/swift/slow_audit
Reviewer | Review Type | Date Requested | Status
gholt (community) | | | Approve
Review via email: mp+44861@code.launchpad.net

Description of the change

Rate limit the object auditor, add unit tests, and fix a bug in the location generator.

To post a comment you must log in.
Revision history for this message
gholt (gholt) wrote :

Looks good. You'll have to manually merge trunk into this though before final merge approval, as there's a small conflict at the moment.

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'doc/source/deployment_guide.rst'
2--- doc/source/deployment_guide.rst 2010-12-02 01:08:49 +0000
3+++ doc/source/deployment_guide.rst 2011-01-11 17:02:10 +0000
4@@ -229,7 +229,12 @@
5 log_name object-auditor Label used when logging
6 log_facility LOG_LOCAL0 Syslog log facility
7 log_level INFO Logging level
8-interval 1800 Minimum time for a pass to take
9+files_per_second 20 Maximum files audited per second. Should
10+ be tuned according to individual system
11+ specs. 0 is unlimited.
12+bytes_per_second 10000000 Maximum bytes audited per second. Should
13+ be tuned according to individual system
14+ specs. 0 is unlimited.
15 ================== ============== ==========================================
16
17 ------------------------------
18
19=== modified file 'etc/object-server.conf-sample'
20--- etc/object-server.conf-sample 2010-10-19 15:02:36 +0000
21+++ etc/object-server.conf-sample 2011-01-11 17:02:10 +0000
22@@ -55,5 +55,5 @@
23
24 [object-auditor]
25 # log_name = object-auditor
26-# Will audit, at most, 1 object per device per interval
27-# interval = 1800
28+# files_per_second = 20
29+# bytes_per_second = 10000000
30
31=== modified file 'swift/common/utils.py'
32--- swift/common/utils.py 2011-01-10 20:37:49 +0000
33+++ swift/common/utils.py 2011-01-11 17:02:10 +0000
34@@ -779,19 +779,22 @@
35 on devices
36 :param logger: a logger object
37 '''
38- for device in os.listdir(devices):
39- if mount_check and not\
40+ device_dir = os.listdir(devices)
41+ # randomize devices in case of process restart before sweep completed
42+ shuffle(device_dir)
43+ for device in device_dir:
44+ if mount_check and not \
45 os.path.ismount(os.path.join(devices, device)):
46 if logger:
47 logger.debug(
48 _('Skipping %s as it is not mounted'), device)
49 continue
50- datadir = os.path.join(devices, device, datadir)
51- if not os.path.exists(datadir):
52+ datadir_path = os.path.join(devices, device, datadir)
53+ if not os.path.exists(datadir_path):
54 continue
55- partitions = os.listdir(datadir)
56+ partitions = os.listdir(datadir_path)
57 for partition in partitions:
58- part_path = os.path.join(datadir, partition)
59+ part_path = os.path.join(datadir_path, partition)
60 if not os.path.isdir(part_path):
61 continue
62 suffixes = os.listdir(part_path)
63@@ -808,3 +811,30 @@
64 reverse=True):
65 path = os.path.join(hash_path, fname)
66 yield path, device, partition
67+
68+
69+def ratelimit_sleep(running_time, max_rate, incr_by=1):
70+ '''
71+ Will eventlet.sleep() for the appropriate time so that the max_rate
72+ is never exceeded. If max_rate is 0, will not ratelimit. The
73+ maximum recommended rate should not exceed (1000 * incr_by) a second
74+ as eventlet.sleep() does involve some overhead. Returns running_time
75+ that should be used for subsequent calls.
76+
77+ :param running_time: the running time of the next allowable request. Best
78+ to start at zero.
79+ :param max_rate: The maximum rate per second allowed for the process.
80+ :param incr_by: How much to increment the counter. Useful if you want
81+ to ratelimit 1024 bytes/sec and have differing sizes
82+ of requests. Must be >= 0.
83+ '''
84+ if not max_rate or incr_by <= 0:
85+ return running_time
86+ clock_accuracy = 1000.0
87+ now = time.time() * clock_accuracy
88+ time_per_request = clock_accuracy * (float(incr_by) / max_rate)
89+ if running_time < now:
90+ running_time = now
91+ elif running_time - now > time_per_request:
92+ eventlet.sleep((running_time - now) / clock_accuracy)
93+ return running_time + time_per_request
94
95=== modified file 'swift/obj/auditor.py'
96--- swift/obj/auditor.py 2011-01-04 23:34:43 +0000
97+++ swift/obj/auditor.py 2011-01-11 17:02:10 +0000
98@@ -20,7 +20,8 @@
99
100 from swift.obj import server as object_server
101 from swift.obj.replicator import invalidate_hash
102-from swift.common.utils import get_logger, renamer, audit_location_generator
103+from swift.common.utils import get_logger, renamer, audit_location_generator, \
104+ ratelimit_sleep
105 from swift.common.exceptions import AuditException
106 from swift.common.daemon import Daemon
107
108@@ -34,39 +35,30 @@
109 self.devices = conf.get('devices', '/srv/node')
110 self.mount_check = conf.get('mount_check', 'true').lower() in \
111 ('true', 't', '1', 'on', 'yes', 'y')
112- self.interval = int(conf.get('interval', 1800))
113+ self.max_files_per_second = float(conf.get('files_per_second', 20))
114+ self.max_bytes_per_second = float(conf.get('bytes_per_second',
115+ 10000000))
116+ self.files_running_time = 0
117+ self.bytes_running_time = 0
118+ self.bytes_processed = 0
119+ self.total_bytes_processed = 0
120+ self.total_files_processed = 0
121 self.passes = 0
122 self.quarantines = 0
123 self.errors = 0
124+ self.log_time = 3600 # once an hour
125
126- def run_forever(self): # pragma: no cover
127+ def run_forever(self):
128 """Run the object audit until stopped."""
129- reported = time.time()
130- time.sleep(random() * self.interval)
131 while True:
132- begin = time.time()
133- all_locs = audit_location_generator(self.devices,
134- object_server.DATADIR,
135- mount_check=self.mount_check,
136- logger=self.logger)
137- for path, device, partition in all_locs:
138- self.object_audit(path, device, partition)
139- if time.time() - reported >= 3600: # once an hour
140- self.logger.info(_('Since %(time)s: Locally: %(pass)d '
141- 'passed audit, %(quar)d quarantined, %(error)d errors'),
142- {'time': time.ctime(reported), 'pass': self.passes,
143- 'quar': self.quarantines, 'error': self.errors})
144- reported = time.time()
145- self.passes = 0
146- self.quarantines = 0
147- self.errors = 0
148- elapsed = time.time() - begin
149- if elapsed < self.interval:
150- time.sleep(self.interval - elapsed)
151+ self.run_once('forever')
152+ self.total_bytes_processed = 0
153+ self.total_files_processed = 0
154+ time.sleep(30)
155
156- def run_once(self):
157+ def run_once(self, mode='once'):
158 """Run the object audit once."""
159- self.logger.info(_('Begin object audit "once" mode'))
160+ self.logger.info(_('Begin object audit "%s" mode' % mode))
161 begin = reported = time.time()
162 all_locs = audit_location_generator(self.devices,
163 object_server.DATADIR,
164@@ -74,18 +66,35 @@
165 logger=self.logger)
166 for path, device, partition in all_locs:
167 self.object_audit(path, device, partition)
168- if time.time() - reported >= 3600: # once an hour
169- self.logger.info(_('Since %(time)s: Locally: %(pass)d '
170- 'passed audit, %(quar)d quarantined, %(error)d errors'),
171- {'time': time.ctime(reported), 'pass': self.passes,
172- 'quar': self.quarantines, 'error': self.errors})
173+ self.files_running_time = ratelimit_sleep(
174+ self.files_running_time, self.max_files_per_second)
175+ self.total_files_processed += 1
176+ if time.time() - reported >= self.log_time:
177+ self.logger.info(_(
178+ 'Since %(start_time)s: Locally: %(passes)d passed audit, '
179+ '%(quars)d quarantined, %(errors)d errors '
180+ 'files/sec: %(frate).2f , bytes/sec: %(brate).2f') % {
181+ 'start_time': time.ctime(reported),
182+ 'passes': self.passes,
183+ 'quars': self.quarantines,
184+ 'errors': self.errors,
185+ 'frate': self.passes / (time.time() - reported),
186+ 'brate': self.bytes_processed /
187+ (time.time() - reported)})
188 reported = time.time()
189 self.passes = 0
190 self.quarantines = 0
191 self.errors = 0
192+ self.bytes_processed = 0
193 elapsed = time.time() - begin
194- self.logger.info(
195- _('Object audit "once" mode completed: %.02fs'), elapsed)
196+ self.logger.info(_(
197+ 'Object audit "%(mode)s" mode completed: %(elapsed).02fs. '
198+ 'Total files/sec: %(frate).2f , '
199+ 'Total bytes/sec: %(brate).2f ') % {
200+ 'mode': mode,
201+ 'elapsed': elapsed,
202+ 'frate': self.total_files_processed / elapsed,
203+ 'brate': self.total_bytes_processed / elapsed})
204
205 def object_audit(self, path, device, partition):
206 """
207@@ -102,7 +111,7 @@
208 name = object_server.read_metadata(path)['name']
209 except Exception, exc:
210 raise AuditException('Error when reading metadata: %s' % exc)
211- _, account, container, obj = name.split('/', 3)
212+ _junk, account, container, obj = name.split('/', 3)
213 df = object_server.DiskFile(self.devices, device,
214 partition, account,
215 container, obj,
216@@ -117,15 +126,20 @@
217 os.path.getsize(df.data_file)))
218 etag = md5()
219 for chunk in df:
220+ self.bytes_running_time = ratelimit_sleep(
221+ self.bytes_running_time, self.max_bytes_per_second,
222+ incr_by=len(chunk))
223 etag.update(chunk)
224+ self.bytes_processed += len(chunk)
225+ self.total_bytes_processed += len(chunk)
226 etag = etag.hexdigest()
227 if etag != df.metadata['ETag']:
228 raise AuditException("ETag of %s does not match file's md5 of "
229 "%s" % (df.metadata['ETag'], etag))
230 except AuditException, err:
231 self.quarantines += 1
232- self.logger.error(_('ERROR Object %(obj)s failed audit and will be '
233- 'quarantined: %(err)s'), {'obj': path, 'err': err})
234+ self.logger.error(_('ERROR Object %(obj)s failed audit and will '
235+ 'be quarantined: %(err)s'), {'obj': path, 'err': err})
236 invalidate_hash(os.path.dirname(path))
237 renamer_path = os.path.dirname(path)
238 renamer(renamer_path, os.path.join(self.devices, device,
239
240=== modified file 'swift/obj/server.py'
241--- swift/obj/server.py 2011-01-07 21:17:29 +0000
242+++ swift/obj/server.py 2011-01-11 17:02:10 +0000
243@@ -72,6 +72,21 @@
244 return pickle.loads(metadata)
245
246
247+def write_metadata(fd, metadata):
248+ """
249+ Helper function to write pickled metadata for an object file.
250+
251+ :param fd: file descriptor to write the metadata
252+ :param metadata: metadata to write
253+ """
254+ metastr = pickle.dumps(metadata, PICKLE_PROTOCOL)
255+ key = 0
256+ while metastr:
257+ setxattr(fd, '%s%s' % (METADATA_KEY, key or ''), metastr[:254])
258+ metastr = metastr[254:]
259+ key += 1
260+
261+
262 class DiskFile(object):
263 """
264 Manage object files on disk.
265@@ -97,6 +112,7 @@
266 self.metadata = {}
267 self.meta_file = None
268 self.data_file = None
269+ self.fp = None
270 if not os.path.exists(self.datadir):
271 return
272 files = sorted(os.listdir(self.datadir), reverse=True)
273@@ -203,17 +219,12 @@
274
275 :params fd: file descriptor of the temp file
276 :param tmppath: path to the temporary file being used
277- :param metadata: dictionary of metada to be written
278+ :param metadata: dictionary of metadata to be written
279 :param extention: extension to be used when making the file
280 """
281 metadata['name'] = self.name
282 timestamp = normalize_timestamp(metadata['X-Timestamp'])
283- metastr = pickle.dumps(metadata, PICKLE_PROTOCOL)
284- key = 0
285- while metastr:
286- setxattr(fd, '%s%s' % (METADATA_KEY, key or ''), metastr[:254])
287- metastr = metastr[254:]
288- key += 1
289+ write_metadata(fd, metadata)
290 if 'Content-Length' in metadata:
291 drop_buffer_cache(fd, 0, int(metadata['Content-Length']))
292 os.fsync(fd)
293
294=== modified file 'test/unit/common/test_utils.py'
295--- test/unit/common/test_utils.py 2011-01-07 21:17:29 +0000
296+++ test/unit/common/test_utils.py 2011-01-11 17:02:10 +0000
297@@ -21,6 +21,7 @@
298 import os
299 import socket
300 import sys
301+import time
302 import unittest
303 from getpass import getuser
304 from shutil import rmtree
305@@ -34,6 +35,7 @@
306
307
308 class MockOs():
309+
310 def __init__(self, pass_funcs=[], called_funcs=[], raise_funcs=[]):
311 self.closed_fds = []
312 for func in pass_funcs:
313@@ -183,12 +185,12 @@
314 print 'test2'
315 self.assertEquals(sio.getvalue(), 'STDOUT: test2\n')
316 sys.stderr = lfo
317- print >>sys.stderr, 'test4'
318+ print >> sys.stderr, 'test4'
319 self.assertEquals(sio.getvalue(), 'STDOUT: test2\nSTDOUT: test4\n')
320 sys.stdout = orig_stdout
321 print 'test5'
322 self.assertEquals(sio.getvalue(), 'STDOUT: test2\nSTDOUT: test4\n')
323- print >>sys.stderr, 'test6'
324+ print >> sys.stderr, 'test6'
325 self.assertEquals(sio.getvalue(), 'STDOUT: test2\nSTDOUT: test4\n'
326 'STDOUT: test6\n')
327 sys.stderr = orig_stderr
328@@ -316,7 +318,7 @@
329
330 def test_hash_path(self):
331 # Yes, these tests are deliberately very fragile. We want to make sure
332- # that if someones changes the results hash_path produces, they know it.
333+ # that if someones changes the results hash_path produces, they know it
334 self.assertEquals(utils.hash_path('a'),
335 '1c84525acb02107ea475dcd3d09c2c58')
336 self.assertEquals(utils.hash_path('a', 'c'),
337@@ -355,10 +357,12 @@
338 result = utils.readconf('/tmp/test', 'section2').get('log_name')
339 expected = 'yarr'
340 self.assertEquals(result, expected)
341- result = utils.readconf('/tmp/test', 'section1', log_name='foo').get('log_name')
342+ result = utils.readconf('/tmp/test', 'section1',
343+ log_name='foo').get('log_name')
344 expected = 'foo'
345 self.assertEquals(result, expected)
346- result = utils.readconf('/tmp/test', 'section1', defaults={'bar': 'baz'})
347+ result = utils.readconf('/tmp/test', 'section1',
348+ defaults={'bar': 'baz'})
349 expected = {'log_name': 'section1', 'foo': 'bar', 'bar': 'baz'}
350 self.assertEquals(result, expected)
351 os.unlink('/tmp/test')
352@@ -438,5 +442,41 @@
353 self.assertNotEquals(utils.get_logger.console, old_handler)
354 logger.logger.removeHandler(utils.get_logger.console)
355
356+ def test_ratelimit_sleep(self):
357+ running_time = 0
358+ start = time.time()
359+ for i in range(100):
360+ running_time = utils.ratelimit_sleep(running_time, 0)
361+ self.assertTrue(abs((time.time() - start) * 100) < 1)
362+
363+ running_time = 0
364+ start = time.time()
365+ for i in range(50):
366+ running_time = utils.ratelimit_sleep(running_time, 200)
367+ # make sure its accurate to 10th of a second
368+ self.assertTrue(abs(25 - (time.time() - start) * 100) < 10)
369+
370+ def test_ratelimit_sleep_with_sleep(self):
371+ running_time = 0
372+ start = time.time()
373+ for i in range(25):
374+ running_time = utils.ratelimit_sleep(running_time, 50)
375+ time.sleep(1.0 / 75)
376+ # make sure its accurate to 10th of a second
377+ self.assertTrue(abs(50 - (time.time() - start) * 100) < 10)
378+
379+ def test_ratelimit_sleep_with_incr(self):
380+ running_time = 0
381+ start = time.time()
382+ vals = [5, 17, 0, 3, 11, 30,
383+ 40, 4, 13, 2, -1] * 2 # adds up to 250 (with no -1)
384+ total = 0
385+ for i in vals:
386+ running_time = utils.ratelimit_sleep(running_time,
387+ 500, incr_by=i)
388+ total += i
389+ self.assertTrue(abs(50 - (time.time() - start) * 100) < 10)
390+
391+
392 if __name__ == '__main__':
393 unittest.main()
394
395=== modified file 'test/unit/obj/test_auditor.py'
396--- test/unit/obj/test_auditor.py 2011-01-04 23:34:43 +0000
397+++ test/unit/obj/test_auditor.py 2011-01-11 17:02:10 +0000
398@@ -14,14 +14,234 @@
399 # limitations under the License.
400
401 # TODO: Tests
402-
403 import unittest
404+import tempfile
405+import os
406+import time
407+from shutil import rmtree
408+from hashlib import md5
409 from swift.obj import auditor
410+from swift.obj.server import DiskFile, write_metadata
411+from swift.common.utils import hash_path, mkdirs, normalize_timestamp, renamer
412+from swift.obj.replicator import invalidate_hash
413+from swift.common.exceptions import AuditException
414+
415
416 class TestAuditor(unittest.TestCase):
417
418- def test_placeholder(self):
419- pass
420+ def setUp(self):
421+ self.path_to_test_xfs = os.environ.get('PATH_TO_TEST_XFS')
422+ if not self.path_to_test_xfs or \
423+ not os.path.exists(self.path_to_test_xfs):
424+ print >> sys.stderr, 'WARNING: PATH_TO_TEST_XFS not set or not ' \
425+ 'pointing to a valid directory.\n' \
426+ 'Please set PATH_TO_TEST_XFS to a directory on an XFS file ' \
427+ 'system for testing.'
428+ self.testdir = '/tmp/SWIFTUNITTEST'
429+ else:
430+ self.testdir = os.path.join(self.path_to_test_xfs,
431+ 'tmp_test_object_auditor')
432+
433+ self.devices = os.path.join(self.testdir, 'node')
434+ rmtree(self.testdir, ignore_errors=1)
435+ os.mkdir(self.testdir)
436+ os.mkdir(self.devices)
437+ os.mkdir(os.path.join(self.devices, 'sda'))
438+ self.objects = os.path.join(self.devices, 'sda', 'objects')
439+
440+ os.mkdir(os.path.join(self.devices, 'sdb'))
441+ self.objects_2 = os.path.join(self.devices, 'sdb', 'objects')
442+
443+ os.mkdir(self.objects)
444+ self.parts = {}
445+ for part in ['0', '1', '2', '3']:
446+ self.parts[part] = os.path.join(self.objects, part)
447+ os.mkdir(os.path.join(self.objects, part))
448+
449+ self.conf = dict(
450+ devices=self.devices,
451+ mount_check='false')
452+
453+ def tearDown(self):
454+ rmtree(self.testdir, ignore_errors=1)
455+
456+ def test_object_audit_extra_data(self):
457+ self.auditor = auditor.ObjectAuditor(self.conf)
458+ cur_part = '0'
459+ disk_file = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o')
460+ data = '0' * 1024
461+ etag = md5()
462+ with disk_file.mkstemp() as (fd, tmppath):
463+ os.write(fd, data)
464+ etag.update(data)
465+ etag = etag.hexdigest()
466+ timestamp = str(normalize_timestamp(time.time()))
467+ metadata = {
468+ 'ETag': etag,
469+ 'X-Timestamp': timestamp,
470+ 'Content-Length': str(os.fstat(fd).st_size),
471+ }
472+ disk_file.put(fd, tmppath, metadata)
473+ pre_quarantines = self.auditor.quarantines
474+
475+ self.auditor.object_audit(
476+ os.path.join(disk_file.datadir, timestamp + '.data'),
477+ 'sda', cur_part)
478+ self.assertEquals(self.auditor.quarantines, pre_quarantines)
479+
480+ os.write(fd, 'extra_data')
481+ self.auditor.object_audit(
482+ os.path.join(disk_file.datadir, timestamp + '.data'),
483+ 'sda', cur_part)
484+ self.assertEquals(self.auditor.quarantines, pre_quarantines + 1)
485+
486+ def test_object_audit_diff_data(self):
487+ self.auditor = auditor.ObjectAuditor(self.conf)
488+ cur_part = '0'
489+ disk_file = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o')
490+ data = '0' * 1024
491+ etag = md5()
492+ timestamp = str(normalize_timestamp(time.time()))
493+ with disk_file.mkstemp() as (fd, tmppath):
494+ os.write(fd, data)
495+ etag.update(data)
496+ etag = etag.hexdigest()
497+ metadata = {
498+ 'ETag': etag,
499+ 'X-Timestamp': timestamp,
500+ 'Content-Length': str(os.fstat(fd).st_size),
501+ }
502+ disk_file.put(fd, tmppath, metadata)
503+ pre_quarantines = self.auditor.quarantines
504+
505+ self.auditor.object_audit(
506+ os.path.join(disk_file.datadir, timestamp + '.data'),
507+ 'sda', cur_part)
508+ self.assertEquals(self.auditor.quarantines, pre_quarantines)
509+ etag = md5()
510+ etag.update('1' + '0' * 1023)
511+ etag = etag.hexdigest()
512+ metadata['ETag'] = etag
513+ write_metadata(fd, metadata)
514+
515+ self.auditor.object_audit(
516+ os.path.join(disk_file.datadir, timestamp + '.data'),
517+ 'sda', cur_part)
518+ self.assertEquals(self.auditor.quarantines, pre_quarantines + 1)
519+
520+ def test_object_audit_no_meta(self):
521+ self.auditor = auditor.ObjectAuditor(self.conf)
522+ cur_part = '0'
523+ disk_file = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o')
524+ data = '0' * 1024
525+ etag = md5()
526+ pre_quarantines = self.auditor.quarantines
527+ with disk_file.mkstemp() as (fd, tmppath):
528+ os.write(fd, data)
529+ etag.update(data)
530+ etag = etag.hexdigest()
531+ timestamp = str(normalize_timestamp(time.time()))
532+ os.fsync(fd)
533+ invalidate_hash(os.path.dirname(disk_file.datadir))
534+ renamer(tmppath, os.path.join(disk_file.datadir,
535+ timestamp + '.data'))
536+ self.auditor.object_audit(
537+ os.path.join(disk_file.datadir, timestamp + '.data'),
538+ 'sda', cur_part)
539+ self.assertEquals(self.auditor.quarantines, pre_quarantines + 1)
540+
541+ def test_object_audit_bad_args(self):
542+ self.auditor = auditor.ObjectAuditor(self.conf)
543+ pre_errors = self.auditor.errors
544+ self.auditor.object_audit(5, 'sda', '0')
545+ self.assertEquals(self.auditor.errors, pre_errors + 1)
546+ pre_errors = self.auditor.errors
547+ self.auditor.object_audit('badpath', 'sda', '0')
548+ self.assertEquals(self.auditor.errors, pre_errors) # just returns
549+
550+ def test_object_run_once_pass(self):
551+ self.auditor = auditor.ObjectAuditor(self.conf)
552+ self.auditor.log_time = 0
553+ cur_part = '0'
554+ timestamp = str(normalize_timestamp(time.time()))
555+ pre_quarantines = self.auditor.quarantines
556+ disk_file = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o')
557+ data = '0' * 1024
558+ etag = md5()
559+ with disk_file.mkstemp() as (fd, tmppath):
560+ os.write(fd, data)
561+ etag.update(data)
562+ etag = etag.hexdigest()
563+ metadata = {
564+ 'ETag': etag,
565+ 'X-Timestamp': timestamp,
566+ 'Content-Length': str(os.fstat(fd).st_size),
567+ }
568+ disk_file.put(fd, tmppath, metadata)
569+ disk_file.close()
570+ self.auditor.run_once()
571+ self.assertEquals(self.auditor.quarantines, pre_quarantines)
572+
573+ def test_object_run_once_no_sda(self):
574+ self.auditor = auditor.ObjectAuditor(self.conf)
575+ cur_part = '0'
576+ timestamp = str(normalize_timestamp(time.time()))
577+ pre_quarantines = self.auditor.quarantines
578+ disk_file = DiskFile(self.devices, 'sdb', cur_part, 'a', 'c', 'o')
579+ data = '0' * 1024
580+ etag = md5()
581+ with disk_file.mkstemp() as (fd, tmppath):
582+ os.write(fd, data)
583+ etag.update(data)
584+ etag = etag.hexdigest()
585+ metadata = {
586+ 'ETag': etag,
587+ 'X-Timestamp': timestamp,
588+ 'Content-Length': str(os.fstat(fd).st_size),
589+ }
590+ disk_file.put(fd, tmppath, metadata)
591+ disk_file.close()
592+ os.write(fd, 'extra_data')
593+ self.auditor.run_once()
594+ self.assertEquals(self.auditor.quarantines, pre_quarantines + 1)
595+
596+ def test_object_run_once_multi_devices(self):
597+ self.auditor = auditor.ObjectAuditor(self.conf)
598+ cur_part = '0'
599+ timestamp = str(normalize_timestamp(time.time()))
600+ pre_quarantines = self.auditor.quarantines
601+ disk_file = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o')
602+ data = '0' * 10
603+ etag = md5()
604+ with disk_file.mkstemp() as (fd, tmppath):
605+ os.write(fd, data)
606+ etag.update(data)
607+ etag = etag.hexdigest()
608+ metadata = {
609+ 'ETag': etag,
610+ 'X-Timestamp': timestamp,
611+ 'Content-Length': str(os.fstat(fd).st_size),
612+ }
613+ disk_file.put(fd, tmppath, metadata)
614+ disk_file.close()
615+ self.auditor.run_once()
616+ disk_file = DiskFile(self.devices, 'sdb', cur_part, 'a', 'c', 'ob')
617+ data = '1' * 10
618+ etag = md5()
619+ with disk_file.mkstemp() as (fd, tmppath):
620+ os.write(fd, data)
621+ etag.update(data)
622+ etag = etag.hexdigest()
623+ metadata = {
624+ 'ETag': etag,
625+ 'X-Timestamp': timestamp,
626+ 'Content-Length': str(os.fstat(fd).st_size),
627+ }
628+ disk_file.put(fd, tmppath, metadata)
629+ disk_file.close()
630+ os.write(fd, 'extra_data')
631+ self.auditor.run_once()
632+ self.assertEquals(self.auditor.quarantines, pre_quarantines + 1)
633
634
635 if __name__ == '__main__':