Merge lp:~david-goetz/swift/zero_byte_audit_dos into lp:~hudson-openstack/swift/trunk

Proposed by David Goetz
Status: Merged
Approved by: gholt
Approved revision: 226
Merged at revision: 232
Proposed branch: lp:~david-goetz/swift/zero_byte_audit_dos
Merge into: lp:~hudson-openstack/swift/trunk
Diff against target: 794 lines (+282/-86)
19 files modified
bin/swift-log-uploader (+4/-3)
bin/swift-object-auditor (+5/-1)
doc/source/admin_guide.rst (+12/-0)
etc/object-server.conf-sample (+2/-0)
swift/account/auditor.py (+2/-2)
swift/account/reaper.py (+2/-2)
swift/common/daemon.py (+7/-9)
swift/common/db_replicator.py (+2/-2)
swift/common/utils.py (+7/-4)
swift/container/auditor.py (+2/-2)
swift/container/updater.py (+2/-2)
swift/obj/auditor.py (+67/-23)
swift/obj/replicator.py (+2/-2)
swift/obj/updater.py (+2/-2)
swift/stats/account_stats.py (+1/-1)
swift/stats/log_processor.py (+1/-1)
swift/stats/log_uploader.py (+1/-1)
test/unit/common/test_utils.py (+6/-16)
test/unit/obj/test_auditor.py (+155/-13)
To merge this branch: bzr merge lp:~david-goetz/swift/zero_byte_audit_dos
Reviewer Review Type Date Requested Status
gholt (community) Approve
Review via email: mp+49706@code.launchpad.net

Description of the change

Ability to fast-track auditing of zero-byte files.

To post a comment you must log in.
Revision history for this message
gholt (gholt) wrote :

doc/source/admin_guide.rst
Typo/confusing phrase: "to a 1000 from the config file, ..."

Seems there are new conf variables but not documented and not in etc/-sample?

What's the difference between fasttrack_zero_byte_files, zero_byte_file_worker, and zero_byte_fps, and how do they interact? I think I can mostly make it out.

I guess my thought on this initially was that there would be an extra process forked that would simply scan for zero-byte files exclusively, and the other processes would run as they always have. Then the command-line override would run only the zbf fork and not the normal one.

224. By David Goetz

peer review edits

225. By David Goetz

get rid of logger

226. By David Goetz

merge to trunk

Revision history for this message
gholt (gholt) :
review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'bin/swift-log-uploader'
2--- bin/swift-log-uploader 2011-02-10 17:57:51 +0000
3+++ bin/swift-log-uploader 2011-03-03 21:53:01 +0000
4@@ -15,16 +15,17 @@
5 # limitations under the License.
6
7 import sys
8-
9+from optparse import OptionParser
10 from swift.stats.log_uploader import LogUploader
11 from swift.common.utils import parse_options
12 from swift.common import utils
13
14 if __name__ == '__main__':
15- conf_file, options = parse_options(usage="Usage: %prog CONFIG_FILE PLUGIN")
16+ parser = OptionParser("Usage: %prog CONFIG_FILE PLUGIN")
17+ conf_file, options = parse_options(parser=parser)
18 try:
19 plugin = options['extra_args'][0]
20- except IndexError:
21+ except (IndexError, KeyError):
22 print "Error: missing plugin name"
23 sys.exit(1)
24
25
26=== modified file 'bin/swift-object-auditor'
27--- bin/swift-object-auditor 2011-01-04 23:34:43 +0000
28+++ bin/swift-object-auditor 2011-03-03 21:53:01 +0000
29@@ -17,7 +17,11 @@
30 from swift.obj.auditor import ObjectAuditor
31 from swift.common.utils import parse_options
32 from swift.common.daemon import run_daemon
33+from optparse import OptionParser
34
35 if __name__ == '__main__':
36- conf_file, options = parse_options(once=True)
37+ parser = OptionParser("%prog CONFIG [options]")
38+ parser.add_option('-z', '--zero_byte_fps',
39+ help='Audit only zero byte files at specified files/sec')
40+ conf_file, options = parse_options(parser=parser, once=True)
41 run_daemon(ObjectAuditor, conf_file, **options)
42
43=== modified file 'doc/source/admin_guide.rst'
44--- doc/source/admin_guide.rst 2010-12-10 01:57:26 +0000
45+++ doc/source/admin_guide.rst 2011-03-03 21:53:01 +0000
46@@ -288,3 +288,15 @@
47 completely stopping the old service. There is also a special case of
48 `swift-init all <command>`, which will run the command for all swift services.
49
50+--------------
51+Object Auditor
52+--------------
53+
54+On system failures, the XFS file system can sometimes truncate files it's
55+trying to write and produce zero byte files. The object-auditor will catch
56+these problems but in the case of a system crash it would be advisable to run
57+an extra, less rate limited sweep to check for these specific files. You can
58+run this command as follows:
59+`swift-object-auditor /path/to/object-server/config/file.conf once -z 1000`
60+"-z" means to only check for zero-byte files at 1000 files per second.
61+
62
63=== modified file 'etc/object-server.conf-sample'
64--- etc/object-server.conf-sample 2011-01-27 21:02:53 +0000
65+++ etc/object-server.conf-sample 2011-03-03 21:53:01 +0000
66@@ -72,3 +72,5 @@
67 # files_per_second = 20
68 # bytes_per_second = 10000000
69 # log_time = 3600
70+# zero_byte_files_per_second = 50
71+
72
73=== modified file 'swift/account/auditor.py'
74--- swift/account/auditor.py 2011-02-02 21:39:08 +0000
75+++ swift/account/auditor.py 2011-03-03 21:53:01 +0000
76@@ -36,7 +36,7 @@
77 self.account_passes = 0
78 self.account_failures = 0
79
80- def run_forever(self): # pragma: no cover
81+ def run_forever(self, *args, **kwargs):
82 """Run the account audit until stopped."""
83 reported = time.time()
84 time.sleep(random() * self.interval)
85@@ -61,7 +61,7 @@
86 if elapsed < self.interval:
87 time.sleep(self.interval - elapsed)
88
89- def run_once(self):
90+ def run_once(self, *args, **kwargs):
91 """Run the account audit once."""
92 self.logger.info('Begin account audit "once" mode')
93 begin = reported = time.time()
94
95=== modified file 'swift/account/reaper.py'
96--- swift/account/reaper.py 2011-02-02 21:39:08 +0000
97+++ swift/account/reaper.py 2011-03-03 21:53:01 +0000
98@@ -97,7 +97,7 @@
99 self.object_ring = Ring(self.object_ring_path)
100 return self.object_ring
101
102- def run_forever(self):
103+ def run_forever(self, *args, **kwargs):
104 """
105 Main entry point when running the reaper in its normal daemon mode.
106 This repeatedly calls :func:`reap_once` no quicker than the
107@@ -112,7 +112,7 @@
108 if elapsed < self.interval:
109 sleep(self.interval - elapsed)
110
111- def run_once(self):
112+ def run_once(self, *args, **kwargs):
113 """
114 Main entry point when running the reaper in 'once' mode, where it will
115 do a single pass over all accounts on the server. This is called
116
117=== modified file 'swift/common/daemon.py'
118--- swift/common/daemon.py 2011-02-14 23:02:08 +0000
119+++ swift/common/daemon.py 2011-03-03 21:53:01 +0000
120@@ -28,11 +28,11 @@
121 self.conf = conf
122 self.logger = utils.get_logger(conf, log_route='daemon')
123
124- def run_once(self):
125+ def run_once(self, *args, **kwargs):
126 """Override this to run the script once"""
127 raise NotImplementedError('run_once not implemented')
128
129- def run_forever(self):
130+ def run_forever(self, *args, **kwargs):
131 """Override this to run forever"""
132 raise NotImplementedError('run_forever not implemented')
133
134@@ -48,15 +48,13 @@
135 sys.exit()
136
137 signal.signal(signal.SIGTERM, kill_children)
138-
139 if once:
140- self.run_once()
141+ self.run_once(**kwargs)
142 else:
143- self.run_forever()
144-
145-
146-def run_daemon(klass, conf_file, section_name='',
147- once=False, **kwargs):
148+ self.run_forever(**kwargs)
149+
150+
151+def run_daemon(klass, conf_file, section_name='', once=False, **kwargs):
152 """
153 Loads settings from conf, then instantiates daemon "klass" and runs the
154 daemon with the specified once kwarg. The section_name will be derived
155
156=== modified file 'swift/common/db_replicator.py'
157--- swift/common/db_replicator.py 2011-02-17 17:30:41 +0000
158+++ swift/common/db_replicator.py 2011-03-03 21:53:01 +0000
159@@ -396,7 +396,7 @@
160 except StopIteration:
161 its.remove(it)
162
163- def run_once(self):
164+ def run_once(self, *args, **kwargs):
165 """Run a replication pass once."""
166 self._zero_stats()
167 dirs = []
168@@ -425,7 +425,7 @@
169 self.logger.info(_('Replication run OVER'))
170 self._report_stats()
171
172- def run_forever(self):
173+ def run_forever(self, *args, **kwargs):
174 """
175 Replicate dbs under the given root in an infinite loop.
176 """
177
178=== modified file 'swift/common/utils.py'
179--- swift/common/utils.py 2011-02-24 07:43:05 +0000
180+++ swift/common/utils.py 2011-03-03 21:53:01 +0000
181@@ -370,6 +370,7 @@
182 Custom logging.Formatter will append txn_id to a log message if the record
183 has one and the message does not.
184 """
185+
186 def format(self, record):
187 msg = logging.Formatter.format(self, record)
188 if (record.txn_id and record.levelno != logging.INFO and
189@@ -492,11 +493,11 @@
190 sys.stderr = LoggerFileObject(logger)
191
192
193-def parse_options(usage="%prog CONFIG [options]", once=False, test_args=None):
194+def parse_options(parser=None, once=False, test_args=None):
195 """
196 Parse standard swift server/daemon options with optparse.OptionParser.
197
198- :param usage: String describing usage
199+ :param parser: OptionParser to use. If not sent one will be created.
200 :param once: Boolean indicating the "once" option is available
201 :param test_args: Override sys.argv; used in testing
202
203@@ -505,7 +506,8 @@
204
205 :raises SystemExit: First arg (CONFIG) is required, file must exist
206 """
207- parser = OptionParser(usage)
208+ if not parser:
209+ parser = OptionParser(usage="%prog CONFIG [options]")
210 parser.add_option("-v", "--verbose", default=False, action="store_true",
211 help="log to console")
212 if once:
213@@ -534,7 +536,8 @@
214 extra_args.append(arg)
215
216 options = vars(options)
217- options['extra_args'] = extra_args
218+ if extra_args:
219+ options['extra_args'] = extra_args
220 return config, options
221
222
223
224=== modified file 'swift/container/auditor.py'
225--- swift/container/auditor.py 2011-02-02 21:39:08 +0000
226+++ swift/container/auditor.py 2011-03-03 21:53:01 +0000
227@@ -37,7 +37,7 @@
228 self.container_passes = 0
229 self.container_failures = 0
230
231- def run_forever(self): # pragma: no cover
232+ def run_forever(self, *args, **kwargs):
233 """Run the container audit until stopped."""
234 reported = time.time()
235 time.sleep(random() * self.interval)
236@@ -63,7 +63,7 @@
237 if elapsed < self.interval:
238 time.sleep(self.interval - elapsed)
239
240- def run_once(self):
241+ def run_once(self, *args, **kwargs):
242 """Run the container audit once."""
243 self.logger.info(_('Begin container audit "once" mode'))
244 begin = reported = time.time()
245
246=== modified file 'swift/container/updater.py'
247--- swift/container/updater.py 2011-02-02 21:39:08 +0000
248+++ swift/container/updater.py 2011-03-03 21:53:01 +0000
249@@ -98,7 +98,7 @@
250 finally:
251 os.unlink(filename)
252
253- def run_forever(self): # pragma: no cover
254+ def run_forever(self, *args, **kwargs):
255 """
256 Run the updator continuously.
257 """
258@@ -156,7 +156,7 @@
259 if elapsed < self.interval:
260 time.sleep(self.interval - elapsed)
261
262- def run_once(self):
263+ def run_once(self, *args, **kwargs):
264 """
265 Run the updater once.
266 """
267
268=== modified file 'swift/obj/auditor.py'
269--- swift/obj/auditor.py 2011-02-02 21:39:08 +0000
270+++ swift/obj/auditor.py 2011-03-03 21:53:01 +0000
271@@ -21,23 +21,30 @@
272 from swift.obj import server as object_server
273 from swift.obj.replicator import invalidate_hash
274 from swift.common.utils import get_logger, renamer, audit_location_generator, \
275- ratelimit_sleep
276+ ratelimit_sleep, TRUE_VALUES
277 from swift.common.exceptions import AuditException
278 from swift.common.daemon import Daemon
279
280-
281-class ObjectAuditor(Daemon):
282- """Audit objects."""
283-
284- def __init__(self, conf):
285+SLEEP_BETWEEN_AUDITS = 30
286+
287+
288+class AuditorWorker(object):
289+ """Walk through file system to audit object"""
290+
291+ def __init__(self, conf, zero_byte_only_at_fps=0):
292 self.conf = conf
293 self.logger = get_logger(conf, log_route='object-auditor')
294 self.devices = conf.get('devices', '/srv/node')
295 self.mount_check = conf.get('mount_check', 'true').lower() in \
296- ('true', 't', '1', 'on', 'yes', 'y')
297+ TRUE_VALUES
298 self.max_files_per_second = float(conf.get('files_per_second', 20))
299 self.max_bytes_per_second = float(conf.get('bytes_per_second',
300 10000000))
301+ self.auditor_type = 'ALL'
302+ self.zero_byte_only_at_fps = zero_byte_only_at_fps
303+ if self.zero_byte_only_at_fps:
304+ self.max_files_per_second = float(self.zero_byte_only_at_fps)
305+ self.auditor_type = 'ZBF'
306 self.log_time = int(conf.get('log_time', 3600))
307 self.files_running_time = 0
308 self.bytes_running_time = 0
309@@ -48,18 +55,13 @@
310 self.quarantines = 0
311 self.errors = 0
312
313- def run_forever(self):
314- """Run the object audit until stopped."""
315- while True:
316- self.run_once('forever')
317- self.total_bytes_processed = 0
318- self.total_files_processed = 0
319- time.sleep(30)
320-
321- def run_once(self, mode='once'):
322- """Run the object audit once."""
323- self.logger.info(_('Begin object audit "%s" mode' % mode))
324+ def audit_all_objects(self, mode='once'):
325+ self.logger.info(_('Begin object audit "%s" mode (%s)' %
326+ (mode, self.auditor_type)))
327 begin = reported = time.time()
328+ self.total_bytes_processed = 0
329+ self.total_files_processed = 0
330+ files_running_time = 0
331 all_locs = audit_location_generator(self.devices,
332 object_server.DATADIR,
333 mount_check=self.mount_check,
334@@ -71,9 +73,11 @@
335 self.total_files_processed += 1
336 if time.time() - reported >= self.log_time:
337 self.logger.info(_(
338- 'Since %(start_time)s: Locally: %(passes)d passed audit, '
339+ 'Object audit (%(type)s). '
340+ 'Since %(start_time)s: Locally: %(passes)d passed, '
341 '%(quars)d quarantined, %(errors)d errors '
342 'files/sec: %(frate).2f , bytes/sec: %(brate).2f') % {
343+ 'type': self.auditor_type,
344 'start_time': time.ctime(reported),
345 'passes': self.passes,
346 'quars': self.quarantines,
347@@ -88,9 +92,11 @@
348 self.bytes_processed = 0
349 elapsed = time.time() - begin
350 self.logger.info(_(
351- 'Object audit "%(mode)s" mode completed: %(elapsed).02fs. '
352+ 'Object audit (%(type)s) "%(mode)s" mode '
353+ 'completed: %(elapsed).02fs. '
354 'Total files/sec: %(frate).2f , '
355 'Total bytes/sec: %(brate).2f ') % {
356+ 'type': self.auditor_type,
357 'mode': mode,
358 'elapsed': elapsed,
359 'frate': self.total_files_processed / elapsed,
360@@ -98,7 +104,7 @@
361
362 def object_audit(self, path, device, partition):
363 """
364- Audits the given object path
365+ Audits the given object path.
366
367 :param path: a path to an object
368 :param device: the device the path is on
369@@ -119,11 +125,13 @@
370 if df.data_file is None:
371 # file is deleted, we found the tombstone
372 return
373- if os.path.getsize(df.data_file) != \
374- int(df.metadata['Content-Length']):
375+ obj_size = os.path.getsize(df.data_file)
376+ if obj_size != int(df.metadata['Content-Length']):
377 raise AuditException('Content-Length of %s does not match '
378 'file size of %s' % (int(df.metadata['Content-Length']),
379 os.path.getsize(df.data_file)))
380+ if self.zero_byte_only_at_fps and obj_size:
381+ return
382 etag = md5()
383 for chunk in df:
384 self.bytes_running_time = ratelimit_sleep(
385@@ -150,3 +158,39 @@
386 self.logger.exception(_('ERROR Trying to audit %s'), path)
387 return
388 self.passes += 1
389+
390+
391+class ObjectAuditor(Daemon):
392+ """Audit objects."""
393+
394+ def __init__(self, conf, **options):
395+ self.conf = conf
396+ self.conf_zero_byte_fps = int(conf.get(
397+ 'zero_byte_files_per_second', 50))
398+
399+ def _sleep(self):
400+ time.sleep(SLEEP_BETWEEN_AUDITS)
401+
402+ def run_forever(self, *args, **kwargs):
403+ """Run the object audit until stopped."""
404+ # zero byte only command line option
405+ zbo_fps = kwargs.get('zero_byte_fps', 0)
406+ if zbo_fps:
407+ # only start parent
408+ parent = True
409+ else:
410+ parent = os.fork() # child gets parent = 0
411+ kwargs = {'mode': 'forever'}
412+ if parent:
413+ kwargs['zero_byte_fps'] = zbo_fps or self.conf_zero_byte_fps
414+ while True:
415+ self.run_once(**kwargs)
416+ self._sleep()
417+
418+ def run_once(self, *args, **kwargs):
419+ """Run the object audit once."""
420+ mode = kwargs.get('mode', 'once')
421+ zero_byte_only_at_fps = kwargs.get('zero_byte_fps', 0)
422+ worker = AuditorWorker(self.conf,
423+ zero_byte_only_at_fps=zero_byte_only_at_fps)
424+ worker.audit_all_objects(mode=mode)
425
426=== modified file 'swift/obj/replicator.py'
427--- swift/obj/replicator.py 2011-02-02 21:39:08 +0000
428+++ swift/obj/replicator.py 2011-03-03 21:53:01 +0000
429@@ -547,7 +547,7 @@
430 lockup_detector.kill()
431 self.stats_line()
432
433- def run_once(self):
434+ def run_once(self, *args, **kwargs):
435 start = time.time()
436 self.logger.info(_("Running object replicator in script mode."))
437 self.replicate()
438@@ -555,7 +555,7 @@
439 self.logger.info(
440 _("Object replication complete. (%.02f minutes)"), total)
441
442- def run_forever(self):
443+ def run_forever(self, *args, **kwargs):
444 self.logger.info("Starting object replicator in daemon mode.")
445 # Run the replicator continually
446 while True:
447
448=== modified file 'swift/obj/updater.py'
449--- swift/obj/updater.py 2011-02-02 21:39:08 +0000
450+++ swift/obj/updater.py 2011-03-03 21:53:01 +0000
451@@ -58,7 +58,7 @@
452 self.container_ring = Ring(self.container_ring_path)
453 return self.container_ring
454
455- def run_forever(self): # pragma: no cover
456+ def run_forever(self, *args, **kwargs):
457 """Run the updater continuously."""
458 time.sleep(random() * self.interval)
459 while True:
460@@ -100,7 +100,7 @@
461 if elapsed < self.interval:
462 time.sleep(self.interval - elapsed)
463
464- def run_once(self):
465+ def run_once(self, *args, **kwargs):
466 """Run the updater once"""
467 self.logger.info(_('Begin object update single threaded sweep'))
468 begin = time.time()
469
470=== modified file 'swift/stats/account_stats.py'
471--- swift/stats/account_stats.py 2011-02-02 21:39:08 +0000
472+++ swift/stats/account_stats.py 2011-03-03 21:53:01 +0000
473@@ -51,7 +51,7 @@
474 self.logger = \
475 get_logger(stats_conf, log_route='account-stats')
476
477- def run_once(self):
478+ def run_once(self, *args, **kwargs):
479 self.logger.info(_("Gathering account stats"))
480 start = time.time()
481 self.find_and_process()
482
483=== modified file 'swift/stats/log_processor.py'
484--- swift/stats/log_processor.py 2011-03-01 18:27:19 +0000
485+++ swift/stats/log_processor.py 2011-03-03 21:53:01 +0000
486@@ -235,7 +235,7 @@
487 'log_processing_data')
488 self.worker_count = int(c.get('worker_count', '1'))
489
490- def run_once(self):
491+ def run_once(self, *args, **kwargs):
492 self.logger.info(_("Beginning log processing"))
493 start = time.time()
494 if self.lookback_hours == 0:
495
496=== modified file 'swift/stats/log_uploader.py'
497--- swift/stats/log_uploader.py 2011-02-17 20:42:14 +0000
498+++ swift/stats/log_uploader.py 2011-03-03 21:53:01 +0000
499@@ -68,7 +68,7 @@
500 self.logger = utils.get_logger(uploader_conf, log_name,
501 log_route=plugin_name)
502
503- def run_once(self):
504+ def run_once(self, *args, **kwargs):
505 self.logger.info(_("Uploading logs"))
506 start = time.time()
507 self.upload_all_logs()
508
509=== modified file 'test/unit/common/test_utils.py'
510--- test/unit/common/test_utils.py 2011-02-12 07:55:57 +0000
511+++ test/unit/common/test_utils.py 2011-03-03 21:53:01 +0000
512@@ -275,26 +275,16 @@
513 stde = StringIO()
514 utils.sys.stdout = stdo
515 utils.sys.stderr = stde
516- err_msg = """Usage: test usage
517-
518-Error: missing config file argument
519-"""
520- test_args = []
521- self.assertRaises(SystemExit, utils.parse_options, 'test usage', True,
522- test_args)
523- self.assertEquals(stdo.getvalue(), err_msg)
524+ self.assertRaises(SystemExit, utils.parse_options, once=True,
525+ test_args=[])
526+ self.assert_('missing config file' in stdo.getvalue())
527
528 # verify conf file must exist, context manager will delete temp file
529 with NamedTemporaryFile() as f:
530 conf_file = f.name
531- err_msg += """Usage: test usage
532-
533-Error: unable to locate %s
534-""" % conf_file
535- test_args = [conf_file]
536- self.assertRaises(SystemExit, utils.parse_options, 'test usage', True,
537- test_args)
538- self.assertEquals(stdo.getvalue(), err_msg)
539+ self.assertRaises(SystemExit, utils.parse_options, once=True,
540+ test_args=[conf_file])
541+ self.assert_('unable to locate' in stdo.getvalue())
542
543 # reset stdio
544 utils.sys.stdout = orig_stdout
545
546=== modified file 'test/unit/obj/test_auditor.py'
547--- test/unit/obj/test_auditor.py 2011-02-10 17:57:51 +0000
548+++ test/unit/obj/test_auditor.py 2011-03-03 21:53:01 +0000
549@@ -24,8 +24,9 @@
550 from tempfile import mkdtemp
551 from swift.obj import auditor
552 from swift.obj import server as object_server
553-from swift.obj.server import DiskFile, write_metadata
554-from swift.common.utils import hash_path, mkdirs, normalize_timestamp, renamer
555+from swift.obj.server import DiskFile, write_metadata, DATADIR
556+from swift.common.utils import hash_path, mkdirs, normalize_timestamp, \
557+ renamer, storage_directory
558 from swift.obj.replicator import invalidate_hash
559 from swift.common.exceptions import AuditException
560
561@@ -60,7 +61,7 @@
562 unit.xattr_data = {}
563
564 def test_object_audit_extra_data(self):
565- self.auditor = auditor.ObjectAuditor(self.conf)
566+ self.auditor = auditor.AuditorWorker(self.conf)
567 cur_part = '0'
568 disk_file = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o')
569 data = '0' * 1024
570@@ -90,7 +91,7 @@
571 self.assertEquals(self.auditor.quarantines, pre_quarantines + 1)
572
573 def test_object_audit_diff_data(self):
574- self.auditor = auditor.ObjectAuditor(self.conf)
575+ self.auditor = auditor.AuditorWorker(self.conf)
576 cur_part = '0'
577 disk_file = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o')
578 data = '0' * 1024
579@@ -133,7 +134,7 @@
580 fp.write('0' * 1024)
581 fp.close()
582 invalidate_hash(os.path.dirname(disk_file.datadir))
583- self.auditor = auditor.ObjectAuditor(self.conf)
584+ self.auditor = auditor.AuditorWorker(self.conf)
585 pre_quarantines = self.auditor.quarantines
586 self.auditor.object_audit(
587 os.path.join(disk_file.datadir, timestamp + '.data'),
588@@ -141,7 +142,7 @@
589 self.assertEquals(self.auditor.quarantines, pre_quarantines + 1)
590
591 def test_object_audit_bad_args(self):
592- self.auditor = auditor.ObjectAuditor(self.conf)
593+ self.auditor = auditor.AuditorWorker(self.conf)
594 pre_errors = self.auditor.errors
595 self.auditor.object_audit(5, 'sda', '0')
596 self.assertEquals(self.auditor.errors, pre_errors + 1)
597@@ -150,7 +151,7 @@
598 self.assertEquals(self.auditor.errors, pre_errors) # just returns
599
600 def test_object_run_once_pass(self):
601- self.auditor = auditor.ObjectAuditor(self.conf)
602+ self.auditor = auditor.AuditorWorker(self.conf)
603 self.auditor.log_time = 0
604 cur_part = '0'
605 timestamp = str(normalize_timestamp(time.time()))
606@@ -169,11 +170,11 @@
607 }
608 disk_file.put(fd, tmppath, metadata)
609 disk_file.close()
610- self.auditor.run_once()
611+ self.auditor.audit_all_objects()
612 self.assertEquals(self.auditor.quarantines, pre_quarantines)
613
614 def test_object_run_once_no_sda(self):
615- self.auditor = auditor.ObjectAuditor(self.conf)
616+ self.auditor = auditor.AuditorWorker(self.conf)
617 cur_part = '0'
618 timestamp = str(normalize_timestamp(time.time()))
619 pre_quarantines = self.auditor.quarantines
620@@ -192,11 +193,11 @@
621 disk_file.put(fd, tmppath, metadata)
622 disk_file.close()
623 os.write(fd, 'extra_data')
624- self.auditor.run_once()
625+ self.auditor.audit_all_objects()
626 self.assertEquals(self.auditor.quarantines, pre_quarantines + 1)
627
628 def test_object_run_once_multi_devices(self):
629- self.auditor = auditor.ObjectAuditor(self.conf)
630+ self.auditor = auditor.AuditorWorker(self.conf)
631 cur_part = '0'
632 timestamp = str(normalize_timestamp(time.time()))
633 pre_quarantines = self.auditor.quarantines
634@@ -214,7 +215,7 @@
635 }
636 disk_file.put(fd, tmppath, metadata)
637 disk_file.close()
638- self.auditor.run_once()
639+ self.auditor.audit_all_objects()
640 disk_file = DiskFile(self.devices, 'sdb', cur_part, 'a', 'c', 'ob')
641 data = '1' * 10
642 etag = md5()
643@@ -230,9 +231,150 @@
644 disk_file.put(fd, tmppath, metadata)
645 disk_file.close()
646 os.write(fd, 'extra_data')
647- self.auditor.run_once()
648+ self.auditor.audit_all_objects()
649 self.assertEquals(self.auditor.quarantines, pre_quarantines + 1)
650
651+ def test_object_run_fast_track_non_zero(self):
652+ self.auditor = auditor.ObjectAuditor(self.conf)
653+ self.auditor.log_time = 0
654+ cur_part = '0'
655+ disk_file = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o')
656+ data = '0' * 1024
657+ etag = md5()
658+ with disk_file.mkstemp() as (fd, tmppath):
659+ os.write(fd, data)
660+ etag.update(data)
661+ etag = etag.hexdigest()
662+ metadata = {
663+ 'ETag': etag,
664+ 'X-Timestamp': str(normalize_timestamp(time.time())),
665+ 'Content-Length': str(os.fstat(fd).st_size),
666+ }
667+ disk_file.put(fd, tmppath, metadata)
668+ etag = md5()
669+ etag.update('1' + '0' * 1023)
670+ etag = etag.hexdigest()
671+ metadata['ETag'] = etag
672+ write_metadata(fd, metadata)
673+
674+ quarantine_path = os.path.join(self.devices,
675+ 'sda', 'quarantined', 'objects')
676+ self.auditor.run_once(zero_byte_fps=50)
677+ self.assertFalse(os.path.isdir(quarantine_path))
678+ self.auditor.run_once()
679+ self.assertTrue(os.path.isdir(quarantine_path))
680+
681+ def setup_bad_zero_byte(self, with_ts=False):
682+ self.auditor = auditor.ObjectAuditor(self.conf)
683+ self.auditor.log_time = 0
684+ cur_part = '0'
685+ ts_file_path = ''
686+ if with_ts:
687+
688+ name_hash = hash_path('a', 'c', 'o')
689+ dir_path = os.path.join(self.devices, 'sda',
690+ storage_directory(DATADIR, cur_part, name_hash))
691+ ts_file_path = os.path.join(dir_path, '99999.ts')
692+ if not os.path.exists(dir_path):
693+ mkdirs(dir_path)
694+ fp = open(ts_file_path, 'w')
695+ fp.close()
696+
697+
698+ disk_file = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o')
699+ etag = md5()
700+ with disk_file.mkstemp() as (fd, tmppath):
701+ etag = etag.hexdigest()
702+ metadata = {
703+ 'ETag': etag,
704+ 'X-Timestamp': str(normalize_timestamp(time.time())),
705+ 'Content-Length': 10,
706+ }
707+ disk_file.put(fd, tmppath, metadata)
708+ etag = md5()
709+ etag = etag.hexdigest()
710+ metadata['ETag'] = etag
711+ write_metadata(fd, metadata)
712+ if disk_file.data_file:
713+ return disk_file.data_file
714+ return ts_file_path
715+
716+ def test_object_run_fast_track_all(self):
717+ self.setup_bad_zero_byte()
718+ self.auditor.run_once()
719+ quarantine_path = os.path.join(self.devices,
720+ 'sda', 'quarantined', 'objects')
721+ self.assertTrue(os.path.isdir(quarantine_path))
722+
723+ def test_object_run_fast_track_zero(self):
724+ self.setup_bad_zero_byte()
725+ self.auditor.run_once(zero_byte_fps=50)
726+ quarantine_path = os.path.join(self.devices,
727+ 'sda', 'quarantined', 'objects')
728+ self.assertTrue(os.path.isdir(quarantine_path))
729+
730+ def test_with_tombstone(self):
731+ ts_file_path = self.setup_bad_zero_byte(with_ts=True)
732+ self.auditor.run_once()
733+ quarantine_path = os.path.join(self.devices,
734+ 'sda', 'quarantined', 'objects')
735+ self.assertTrue(ts_file_path.endswith('ts'))
736+ self.assertTrue(os.path.exists(ts_file_path))
737+
738+ def test_sleeper(self):
739+ auditor.SLEEP_BETWEEN_AUDITS = 0.01
740+ my_auditor = auditor.ObjectAuditor(self.conf)
741+ start = time.time()
742+ my_auditor._sleep()
743+ self.assertEquals(round(time.time() - start, 2), 0.01)
744+
745+ def test_run_forever(self):
746+
747+ class StopForever(Exception):
748+ pass
749+
750+ class ObjectAuditorMock(object):
751+ check_args = ()
752+ check_kwargs = {}
753+ fork_called = 0
754+ fork_res = 0
755+
756+ def mock_run(self, *args, **kwargs):
757+ self.check_args = args
758+ self.check_kwargs = kwargs
759+
760+ def mock_sleep(self):
761+ raise StopForever('stop')
762+
763+ def mock_fork(self):
764+ self.fork_called += 1
765+ return self.fork_res
766+
767+ my_auditor = auditor.ObjectAuditor(dict(devices=self.devices,
768+ mount_check='false',
769+ zero_byte_files_per_second=89))
770+ mocker = ObjectAuditorMock()
771+ my_auditor.run_once = mocker.mock_run
772+ my_auditor._sleep = mocker.mock_sleep
773+ was_fork = os.fork
774+ try:
775+ os.fork = mocker.mock_fork
776+ self.assertRaises(StopForever,
777+ my_auditor.run_forever, zero_byte_fps=50)
778+ self.assertEquals(mocker.check_kwargs['zero_byte_fps'], 50)
779+ self.assertEquals(mocker.fork_called, 0)
780+
781+ self.assertRaises(StopForever, my_auditor.run_forever)
782+ self.assertEquals(mocker.fork_called, 1)
783+ self.assertEquals(mocker.check_args, ())
784+
785+ mocker.fork_res = 1
786+ self.assertRaises(StopForever, my_auditor.run_forever)
787+ self.assertEquals(mocker.fork_called, 2)
788+ self.assertEquals(mocker.check_kwargs['zero_byte_fps'], 89)
789+
790+ finally:
791+ os.fork = was_fork
792
793 if __name__ == '__main__':
794 unittest.main()