Merge lp:~david-goetz/swift/slow_audit into lp:~hudson-openstack/swift/trunk
- slow_audit
- Merge into trunk
Proposed by
David Goetz
Status: Merged
Approved by: gholt
Approved revision: 153
Merged at revision: 160
Proposed branch: lp:~david-goetz/swift/slow_audit
Merge into: lp:~hudson-openstack/swift/trunk
Diff against target: 635 lines (+380/-60), 7 files modified:
doc/source/deployment_guide.rst (+6/-1), etc/object-server.conf-sample (+2/-2), swift/common/utils.py (+36/-6), swift/obj/auditor.py (+50/-36), swift/obj/server.py (+18/-7), test/unit/common/test_utils.py (+45/-5), test/unit/obj/test_auditor.py (+223/-3)
To merge this branch: bzr merge lp:~david-goetz/swift/slow_audit
Related bugs: none

| Reviewer | Review Type | Date Requested | Status |
|---|---|---|---|
| gholt (community) | | | Approve |

Review via email: mp+44861@code.launchpad.net
Commit message
Description of the change
Rate limit the object auditor, add unit tests, and fix a bug in the location generator.
To post a comment you must log in.
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === modified file 'doc/source/deployment_guide.rst' |
2 | --- doc/source/deployment_guide.rst 2010-12-02 01:08:49 +0000 |
3 | +++ doc/source/deployment_guide.rst 2011-01-11 17:02:10 +0000 |
4 | @@ -229,7 +229,12 @@ |
5 | log_name object-auditor Label used when logging |
6 | log_facility LOG_LOCAL0 Syslog log facility |
7 | log_level INFO Logging level |
8 | -interval 1800 Minimum time for a pass to take |
9 | +files_per_second 20 Maximum files audited per second. Should |
10 | + be tuned according to individual system |
11 | + specs. 0 is unlimited. |
12 | +bytes_per_second 10000000 Maximum bytes audited per second. Should |
13 | + be tuned according to individual system |
14 | + specs. 0 is unlimited. |
15 | ================== ============== ========================================== |
16 | |
17 | ------------------------------ |
18 | |
19 | === modified file 'etc/object-server.conf-sample' |
20 | --- etc/object-server.conf-sample 2010-10-19 15:02:36 +0000 |
21 | +++ etc/object-server.conf-sample 2011-01-11 17:02:10 +0000 |
22 | @@ -55,5 +55,5 @@ |
23 | |
24 | [object-auditor] |
25 | # log_name = object-auditor |
26 | -# Will audit, at most, 1 object per device per interval |
27 | -# interval = 1800 |
28 | +# files_per_second = 20 |
29 | +# bytes_per_second = 10000000 |
30 | |
31 | === modified file 'swift/common/utils.py' |
32 | --- swift/common/utils.py 2011-01-10 20:37:49 +0000 |
33 | +++ swift/common/utils.py 2011-01-11 17:02:10 +0000 |
34 | @@ -779,19 +779,22 @@ |
35 | on devices |
36 | :param logger: a logger object |
37 | ''' |
38 | - for device in os.listdir(devices): |
39 | - if mount_check and not\ |
40 | + device_dir = os.listdir(devices) |
41 | + # randomize devices in case of process restart before sweep completed |
42 | + shuffle(device_dir) |
43 | + for device in device_dir: |
44 | + if mount_check and not \ |
45 | os.path.ismount(os.path.join(devices, device)): |
46 | if logger: |
47 | logger.debug( |
48 | _('Skipping %s as it is not mounted'), device) |
49 | continue |
50 | - datadir = os.path.join(devices, device, datadir) |
51 | - if not os.path.exists(datadir): |
52 | + datadir_path = os.path.join(devices, device, datadir) |
53 | + if not os.path.exists(datadir_path): |
54 | continue |
55 | - partitions = os.listdir(datadir) |
56 | + partitions = os.listdir(datadir_path) |
57 | for partition in partitions: |
58 | - part_path = os.path.join(datadir, partition) |
59 | + part_path = os.path.join(datadir_path, partition) |
60 | if not os.path.isdir(part_path): |
61 | continue |
62 | suffixes = os.listdir(part_path) |
63 | @@ -808,3 +811,30 @@ |
64 | reverse=True): |
65 | path = os.path.join(hash_path, fname) |
66 | yield path, device, partition |
67 | + |
68 | + |
69 | +def ratelimit_sleep(running_time, max_rate, incr_by=1): |
70 | + ''' |
71 | + Will eventlet.sleep() for the appropriate time so that the max_rate |
72 | + is never exceeded. If max_rate is 0, will not ratelimit. The |
73 | + maximum recommended rate should not exceed (1000 * incr_by) a second |
74 | + as eventlet.sleep() does involve some overhead. Returns running_time |
75 | + that should be used for subsequent calls. |
76 | + |
77 | + :param running_time: the running time of the next allowable request. Best |
78 | + to start at zero. |
79 | + :param max_rate: The maximum rate per second allowed for the process. |
80 | + :param incr_by: How much to increment the counter. Useful if you want |
81 | + to ratelimit 1024 bytes/sec and have differing sizes |
82 | + of requests. Must be >= 0. |
83 | + ''' |
84 | + if not max_rate or incr_by <= 0: |
85 | + return running_time |
86 | + clock_accuracy = 1000.0 |
87 | + now = time.time() * clock_accuracy |
88 | + time_per_request = clock_accuracy * (float(incr_by) / max_rate) |
89 | + if running_time < now: |
90 | + running_time = now |
91 | + elif running_time - now > time_per_request: |
92 | + eventlet.sleep((running_time - now) / clock_accuracy) |
93 | + return running_time + time_per_request |
94 | |
95 | === modified file 'swift/obj/auditor.py' |
96 | --- swift/obj/auditor.py 2011-01-04 23:34:43 +0000 |
97 | +++ swift/obj/auditor.py 2011-01-11 17:02:10 +0000 |
98 | @@ -20,7 +20,8 @@ |
99 | |
100 | from swift.obj import server as object_server |
101 | from swift.obj.replicator import invalidate_hash |
102 | -from swift.common.utils import get_logger, renamer, audit_location_generator |
103 | +from swift.common.utils import get_logger, renamer, audit_location_generator, \ |
104 | + ratelimit_sleep |
105 | from swift.common.exceptions import AuditException |
106 | from swift.common.daemon import Daemon |
107 | |
108 | @@ -34,39 +35,30 @@ |
109 | self.devices = conf.get('devices', '/srv/node') |
110 | self.mount_check = conf.get('mount_check', 'true').lower() in \ |
111 | ('true', 't', '1', 'on', 'yes', 'y') |
112 | - self.interval = int(conf.get('interval', 1800)) |
113 | + self.max_files_per_second = float(conf.get('files_per_second', 20)) |
114 | + self.max_bytes_per_second = float(conf.get('bytes_per_second', |
115 | + 10000000)) |
116 | + self.files_running_time = 0 |
117 | + self.bytes_running_time = 0 |
118 | + self.bytes_processed = 0 |
119 | + self.total_bytes_processed = 0 |
120 | + self.total_files_processed = 0 |
121 | self.passes = 0 |
122 | self.quarantines = 0 |
123 | self.errors = 0 |
124 | + self.log_time = 3600 # once an hour |
125 | |
126 | - def run_forever(self): # pragma: no cover |
127 | + def run_forever(self): |
128 | """Run the object audit until stopped.""" |
129 | - reported = time.time() |
130 | - time.sleep(random() * self.interval) |
131 | while True: |
132 | - begin = time.time() |
133 | - all_locs = audit_location_generator(self.devices, |
134 | - object_server.DATADIR, |
135 | - mount_check=self.mount_check, |
136 | - logger=self.logger) |
137 | - for path, device, partition in all_locs: |
138 | - self.object_audit(path, device, partition) |
139 | - if time.time() - reported >= 3600: # once an hour |
140 | - self.logger.info(_('Since %(time)s: Locally: %(pass)d ' |
141 | - 'passed audit, %(quar)d quarantined, %(error)d errors'), |
142 | - {'time': time.ctime(reported), 'pass': self.passes, |
143 | - 'quar': self.quarantines, 'error': self.errors}) |
144 | - reported = time.time() |
145 | - self.passes = 0 |
146 | - self.quarantines = 0 |
147 | - self.errors = 0 |
148 | - elapsed = time.time() - begin |
149 | - if elapsed < self.interval: |
150 | - time.sleep(self.interval - elapsed) |
151 | + self.run_once('forever') |
152 | + self.total_bytes_processed = 0 |
153 | + self.total_files_processed = 0 |
154 | + time.sleep(30) |
155 | |
156 | - def run_once(self): |
157 | + def run_once(self, mode='once'): |
158 | """Run the object audit once.""" |
159 | - self.logger.info(_('Begin object audit "once" mode')) |
160 | + self.logger.info(_('Begin object audit "%s" mode' % mode)) |
161 | begin = reported = time.time() |
162 | all_locs = audit_location_generator(self.devices, |
163 | object_server.DATADIR, |
164 | @@ -74,18 +66,35 @@ |
165 | logger=self.logger) |
166 | for path, device, partition in all_locs: |
167 | self.object_audit(path, device, partition) |
168 | - if time.time() - reported >= 3600: # once an hour |
169 | - self.logger.info(_('Since %(time)s: Locally: %(pass)d ' |
170 | - 'passed audit, %(quar)d quarantined, %(error)d errors'), |
171 | - {'time': time.ctime(reported), 'pass': self.passes, |
172 | - 'quar': self.quarantines, 'error': self.errors}) |
173 | + self.files_running_time = ratelimit_sleep( |
174 | + self.files_running_time, self.max_files_per_second) |
175 | + self.total_files_processed += 1 |
176 | + if time.time() - reported >= self.log_time: |
177 | + self.logger.info(_( |
178 | + 'Since %(start_time)s: Locally: %(passes)d passed audit, ' |
179 | + '%(quars)d quarantined, %(errors)d errors ' |
180 | + 'files/sec: %(frate).2f , bytes/sec: %(brate).2f') % { |
181 | + 'start_time': time.ctime(reported), |
182 | + 'passes': self.passes, |
183 | + 'quars': self.quarantines, |
184 | + 'errors': self.errors, |
185 | + 'frate': self.passes / (time.time() - reported), |
186 | + 'brate': self.bytes_processed / |
187 | + (time.time() - reported)}) |
188 | reported = time.time() |
189 | self.passes = 0 |
190 | self.quarantines = 0 |
191 | self.errors = 0 |
192 | + self.bytes_processed = 0 |
193 | elapsed = time.time() - begin |
194 | - self.logger.info( |
195 | - _('Object audit "once" mode completed: %.02fs'), elapsed) |
196 | + self.logger.info(_( |
197 | + 'Object audit "%(mode)s" mode completed: %(elapsed).02fs. ' |
198 | + 'Total files/sec: %(frate).2f , ' |
199 | + 'Total bytes/sec: %(brate).2f ') % { |
200 | + 'mode': mode, |
201 | + 'elapsed': elapsed, |
202 | + 'frate': self.total_files_processed / elapsed, |
203 | + 'brate': self.total_bytes_processed / elapsed}) |
204 | |
205 | def object_audit(self, path, device, partition): |
206 | """ |
207 | @@ -102,7 +111,7 @@ |
208 | name = object_server.read_metadata(path)['name'] |
209 | except Exception, exc: |
210 | raise AuditException('Error when reading metadata: %s' % exc) |
211 | - _, account, container, obj = name.split('/', 3) |
212 | + _junk, account, container, obj = name.split('/', 3) |
213 | df = object_server.DiskFile(self.devices, device, |
214 | partition, account, |
215 | container, obj, |
216 | @@ -117,15 +126,20 @@ |
217 | os.path.getsize(df.data_file))) |
218 | etag = md5() |
219 | for chunk in df: |
220 | + self.bytes_running_time = ratelimit_sleep( |
221 | + self.bytes_running_time, self.max_bytes_per_second, |
222 | + incr_by=len(chunk)) |
223 | etag.update(chunk) |
224 | + self.bytes_processed += len(chunk) |
225 | + self.total_bytes_processed += len(chunk) |
226 | etag = etag.hexdigest() |
227 | if etag != df.metadata['ETag']: |
228 | raise AuditException("ETag of %s does not match file's md5 of " |
229 | "%s" % (df.metadata['ETag'], etag)) |
230 | except AuditException, err: |
231 | self.quarantines += 1 |
232 | - self.logger.error(_('ERROR Object %(obj)s failed audit and will be ' |
233 | - 'quarantined: %(err)s'), {'obj': path, 'err': err}) |
234 | + self.logger.error(_('ERROR Object %(obj)s failed audit and will ' |
235 | + 'be quarantined: %(err)s'), {'obj': path, 'err': err}) |
236 | invalidate_hash(os.path.dirname(path)) |
237 | renamer_path = os.path.dirname(path) |
238 | renamer(renamer_path, os.path.join(self.devices, device, |
239 | |
240 | === modified file 'swift/obj/server.py' |
241 | --- swift/obj/server.py 2011-01-07 21:17:29 +0000 |
242 | +++ swift/obj/server.py 2011-01-11 17:02:10 +0000 |
243 | @@ -72,6 +72,21 @@ |
244 | return pickle.loads(metadata) |
245 | |
246 | |
247 | +def write_metadata(fd, metadata): |
248 | + """ |
249 | + Helper function to write pickled metadata for an object file. |
250 | + |
251 | + :param fd: file descriptor to write the metadata |
252 | + :param metadata: metadata to write |
253 | + """ |
254 | + metastr = pickle.dumps(metadata, PICKLE_PROTOCOL) |
255 | + key = 0 |
256 | + while metastr: |
257 | + setxattr(fd, '%s%s' % (METADATA_KEY, key or ''), metastr[:254]) |
258 | + metastr = metastr[254:] |
259 | + key += 1 |
260 | + |
261 | + |
262 | class DiskFile(object): |
263 | """ |
264 | Manage object files on disk. |
265 | @@ -97,6 +112,7 @@ |
266 | self.metadata = {} |
267 | self.meta_file = None |
268 | self.data_file = None |
269 | + self.fp = None |
270 | if not os.path.exists(self.datadir): |
271 | return |
272 | files = sorted(os.listdir(self.datadir), reverse=True) |
273 | @@ -203,17 +219,12 @@ |
274 | |
275 | :params fd: file descriptor of the temp file |
276 | :param tmppath: path to the temporary file being used |
277 | - :param metadata: dictionary of metada to be written |
278 | + :param metadata: dictionary of metadata to be written |
279 | :param extention: extension to be used when making the file |
280 | """ |
281 | metadata['name'] = self.name |
282 | timestamp = normalize_timestamp(metadata['X-Timestamp']) |
283 | - metastr = pickle.dumps(metadata, PICKLE_PROTOCOL) |
284 | - key = 0 |
285 | - while metastr: |
286 | - setxattr(fd, '%s%s' % (METADATA_KEY, key or ''), metastr[:254]) |
287 | - metastr = metastr[254:] |
288 | - key += 1 |
289 | + write_metadata(fd, metadata) |
290 | if 'Content-Length' in metadata: |
291 | drop_buffer_cache(fd, 0, int(metadata['Content-Length'])) |
292 | os.fsync(fd) |
293 | |
294 | === modified file 'test/unit/common/test_utils.py' |
295 | --- test/unit/common/test_utils.py 2011-01-07 21:17:29 +0000 |
296 | +++ test/unit/common/test_utils.py 2011-01-11 17:02:10 +0000 |
297 | @@ -21,6 +21,7 @@ |
298 | import os |
299 | import socket |
300 | import sys |
301 | +import time |
302 | import unittest |
303 | from getpass import getuser |
304 | from shutil import rmtree |
305 | @@ -34,6 +35,7 @@ |
306 | |
307 | |
308 | class MockOs(): |
309 | + |
310 | def __init__(self, pass_funcs=[], called_funcs=[], raise_funcs=[]): |
311 | self.closed_fds = [] |
312 | for func in pass_funcs: |
313 | @@ -183,12 +185,12 @@ |
314 | print 'test2' |
315 | self.assertEquals(sio.getvalue(), 'STDOUT: test2\n') |
316 | sys.stderr = lfo |
317 | - print >>sys.stderr, 'test4' |
318 | + print >> sys.stderr, 'test4' |
319 | self.assertEquals(sio.getvalue(), 'STDOUT: test2\nSTDOUT: test4\n') |
320 | sys.stdout = orig_stdout |
321 | print 'test5' |
322 | self.assertEquals(sio.getvalue(), 'STDOUT: test2\nSTDOUT: test4\n') |
323 | - print >>sys.stderr, 'test6' |
324 | + print >> sys.stderr, 'test6' |
325 | self.assertEquals(sio.getvalue(), 'STDOUT: test2\nSTDOUT: test4\n' |
326 | 'STDOUT: test6\n') |
327 | sys.stderr = orig_stderr |
328 | @@ -316,7 +318,7 @@ |
329 | |
330 | def test_hash_path(self): |
331 | # Yes, these tests are deliberately very fragile. We want to make sure |
332 | - # that if someones changes the results hash_path produces, they know it. |
333 | + # that if someones changes the results hash_path produces, they know it |
334 | self.assertEquals(utils.hash_path('a'), |
335 | '1c84525acb02107ea475dcd3d09c2c58') |
336 | self.assertEquals(utils.hash_path('a', 'c'), |
337 | @@ -355,10 +357,12 @@ |
338 | result = utils.readconf('/tmp/test', 'section2').get('log_name') |
339 | expected = 'yarr' |
340 | self.assertEquals(result, expected) |
341 | - result = utils.readconf('/tmp/test', 'section1', log_name='foo').get('log_name') |
342 | + result = utils.readconf('/tmp/test', 'section1', |
343 | + log_name='foo').get('log_name') |
344 | expected = 'foo' |
345 | self.assertEquals(result, expected) |
346 | - result = utils.readconf('/tmp/test', 'section1', defaults={'bar': 'baz'}) |
347 | + result = utils.readconf('/tmp/test', 'section1', |
348 | + defaults={'bar': 'baz'}) |
349 | expected = {'log_name': 'section1', 'foo': 'bar', 'bar': 'baz'} |
350 | self.assertEquals(result, expected) |
351 | os.unlink('/tmp/test') |
352 | @@ -438,5 +442,41 @@ |
353 | self.assertNotEquals(utils.get_logger.console, old_handler) |
354 | logger.logger.removeHandler(utils.get_logger.console) |
355 | |
356 | + def test_ratelimit_sleep(self): |
357 | + running_time = 0 |
358 | + start = time.time() |
359 | + for i in range(100): |
360 | + running_time = utils.ratelimit_sleep(running_time, 0) |
361 | + self.assertTrue(abs((time.time() - start) * 100) < 1) |
362 | + |
363 | + running_time = 0 |
364 | + start = time.time() |
365 | + for i in range(50): |
366 | + running_time = utils.ratelimit_sleep(running_time, 200) |
367 | + # make sure its accurate to 10th of a second |
368 | + self.assertTrue(abs(25 - (time.time() - start) * 100) < 10) |
369 | + |
370 | + def test_ratelimit_sleep_with_sleep(self): |
371 | + running_time = 0 |
372 | + start = time.time() |
373 | + for i in range(25): |
374 | + running_time = utils.ratelimit_sleep(running_time, 50) |
375 | + time.sleep(1.0 / 75) |
376 | + # make sure its accurate to 10th of a second |
377 | + self.assertTrue(abs(50 - (time.time() - start) * 100) < 10) |
378 | + |
379 | + def test_ratelimit_sleep_with_incr(self): |
380 | + running_time = 0 |
381 | + start = time.time() |
382 | + vals = [5, 17, 0, 3, 11, 30, |
383 | + 40, 4, 13, 2, -1] * 2 # adds up to 250 (with no -1) |
384 | + total = 0 |
385 | + for i in vals: |
386 | + running_time = utils.ratelimit_sleep(running_time, |
387 | + 500, incr_by=i) |
388 | + total += i |
389 | + self.assertTrue(abs(50 - (time.time() - start) * 100) < 10) |
390 | + |
391 | + |
392 | if __name__ == '__main__': |
393 | unittest.main() |
394 | |
395 | === modified file 'test/unit/obj/test_auditor.py' |
396 | --- test/unit/obj/test_auditor.py 2011-01-04 23:34:43 +0000 |
397 | +++ test/unit/obj/test_auditor.py 2011-01-11 17:02:10 +0000 |
398 | @@ -14,14 +14,234 @@ |
399 | # limitations under the License. |
400 | |
401 | # TODO: Tests |
402 | - |
403 | import unittest |
404 | +import tempfile |
405 | +import os |
406 | +import time |
407 | +from shutil import rmtree |
408 | +from hashlib import md5 |
409 | from swift.obj import auditor |
410 | +from swift.obj.server import DiskFile, write_metadata |
411 | +from swift.common.utils import hash_path, mkdirs, normalize_timestamp, renamer |
412 | +from swift.obj.replicator import invalidate_hash |
413 | +from swift.common.exceptions import AuditException |
414 | + |
415 | |
416 | class TestAuditor(unittest.TestCase): |
417 | |
418 | - def test_placeholder(self): |
419 | - pass |
420 | + def setUp(self): |
421 | + self.path_to_test_xfs = os.environ.get('PATH_TO_TEST_XFS') |
422 | + if not self.path_to_test_xfs or \ |
423 | + not os.path.exists(self.path_to_test_xfs): |
424 | + print >> sys.stderr, 'WARNING: PATH_TO_TEST_XFS not set or not ' \ |
425 | + 'pointing to a valid directory.\n' \ |
426 | + 'Please set PATH_TO_TEST_XFS to a directory on an XFS file ' \ |
427 | + 'system for testing.' |
428 | + self.testdir = '/tmp/SWIFTUNITTEST' |
429 | + else: |
430 | + self.testdir = os.path.join(self.path_to_test_xfs, |
431 | + 'tmp_test_object_auditor') |
432 | + |
433 | + self.devices = os.path.join(self.testdir, 'node') |
434 | + rmtree(self.testdir, ignore_errors=1) |
435 | + os.mkdir(self.testdir) |
436 | + os.mkdir(self.devices) |
437 | + os.mkdir(os.path.join(self.devices, 'sda')) |
438 | + self.objects = os.path.join(self.devices, 'sda', 'objects') |
439 | + |
440 | + os.mkdir(os.path.join(self.devices, 'sdb')) |
441 | + self.objects_2 = os.path.join(self.devices, 'sdb', 'objects') |
442 | + |
443 | + os.mkdir(self.objects) |
444 | + self.parts = {} |
445 | + for part in ['0', '1', '2', '3']: |
446 | + self.parts[part] = os.path.join(self.objects, part) |
447 | + os.mkdir(os.path.join(self.objects, part)) |
448 | + |
449 | + self.conf = dict( |
450 | + devices=self.devices, |
451 | + mount_check='false') |
452 | + |
453 | + def tearDown(self): |
454 | + rmtree(self.testdir, ignore_errors=1) |
455 | + |
456 | + def test_object_audit_extra_data(self): |
457 | + self.auditor = auditor.ObjectAuditor(self.conf) |
458 | + cur_part = '0' |
459 | + disk_file = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o') |
460 | + data = '0' * 1024 |
461 | + etag = md5() |
462 | + with disk_file.mkstemp() as (fd, tmppath): |
463 | + os.write(fd, data) |
464 | + etag.update(data) |
465 | + etag = etag.hexdigest() |
466 | + timestamp = str(normalize_timestamp(time.time())) |
467 | + metadata = { |
468 | + 'ETag': etag, |
469 | + 'X-Timestamp': timestamp, |
470 | + 'Content-Length': str(os.fstat(fd).st_size), |
471 | + } |
472 | + disk_file.put(fd, tmppath, metadata) |
473 | + pre_quarantines = self.auditor.quarantines |
474 | + |
475 | + self.auditor.object_audit( |
476 | + os.path.join(disk_file.datadir, timestamp + '.data'), |
477 | + 'sda', cur_part) |
478 | + self.assertEquals(self.auditor.quarantines, pre_quarantines) |
479 | + |
480 | + os.write(fd, 'extra_data') |
481 | + self.auditor.object_audit( |
482 | + os.path.join(disk_file.datadir, timestamp + '.data'), |
483 | + 'sda', cur_part) |
484 | + self.assertEquals(self.auditor.quarantines, pre_quarantines + 1) |
485 | + |
486 | + def test_object_audit_diff_data(self): |
487 | + self.auditor = auditor.ObjectAuditor(self.conf) |
488 | + cur_part = '0' |
489 | + disk_file = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o') |
490 | + data = '0' * 1024 |
491 | + etag = md5() |
492 | + timestamp = str(normalize_timestamp(time.time())) |
493 | + with disk_file.mkstemp() as (fd, tmppath): |
494 | + os.write(fd, data) |
495 | + etag.update(data) |
496 | + etag = etag.hexdigest() |
497 | + metadata = { |
498 | + 'ETag': etag, |
499 | + 'X-Timestamp': timestamp, |
500 | + 'Content-Length': str(os.fstat(fd).st_size), |
501 | + } |
502 | + disk_file.put(fd, tmppath, metadata) |
503 | + pre_quarantines = self.auditor.quarantines |
504 | + |
505 | + self.auditor.object_audit( |
506 | + os.path.join(disk_file.datadir, timestamp + '.data'), |
507 | + 'sda', cur_part) |
508 | + self.assertEquals(self.auditor.quarantines, pre_quarantines) |
509 | + etag = md5() |
510 | + etag.update('1' + '0' * 1023) |
511 | + etag = etag.hexdigest() |
512 | + metadata['ETag'] = etag |
513 | + write_metadata(fd, metadata) |
514 | + |
515 | + self.auditor.object_audit( |
516 | + os.path.join(disk_file.datadir, timestamp + '.data'), |
517 | + 'sda', cur_part) |
518 | + self.assertEquals(self.auditor.quarantines, pre_quarantines + 1) |
519 | + |
520 | + def test_object_audit_no_meta(self): |
521 | + self.auditor = auditor.ObjectAuditor(self.conf) |
522 | + cur_part = '0' |
523 | + disk_file = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o') |
524 | + data = '0' * 1024 |
525 | + etag = md5() |
526 | + pre_quarantines = self.auditor.quarantines |
527 | + with disk_file.mkstemp() as (fd, tmppath): |
528 | + os.write(fd, data) |
529 | + etag.update(data) |
530 | + etag = etag.hexdigest() |
531 | + timestamp = str(normalize_timestamp(time.time())) |
532 | + os.fsync(fd) |
533 | + invalidate_hash(os.path.dirname(disk_file.datadir)) |
534 | + renamer(tmppath, os.path.join(disk_file.datadir, |
535 | + timestamp + '.data')) |
536 | + self.auditor.object_audit( |
537 | + os.path.join(disk_file.datadir, timestamp + '.data'), |
538 | + 'sda', cur_part) |
539 | + self.assertEquals(self.auditor.quarantines, pre_quarantines + 1) |
540 | + |
541 | + def test_object_audit_bad_args(self): |
542 | + self.auditor = auditor.ObjectAuditor(self.conf) |
543 | + pre_errors = self.auditor.errors |
544 | + self.auditor.object_audit(5, 'sda', '0') |
545 | + self.assertEquals(self.auditor.errors, pre_errors + 1) |
546 | + pre_errors = self.auditor.errors |
547 | + self.auditor.object_audit('badpath', 'sda', '0') |
548 | + self.assertEquals(self.auditor.errors, pre_errors) # just returns |
549 | + |
550 | + def test_object_run_once_pass(self): |
551 | + self.auditor = auditor.ObjectAuditor(self.conf) |
552 | + self.auditor.log_time = 0 |
553 | + cur_part = '0' |
554 | + timestamp = str(normalize_timestamp(time.time())) |
555 | + pre_quarantines = self.auditor.quarantines |
556 | + disk_file = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o') |
557 | + data = '0' * 1024 |
558 | + etag = md5() |
559 | + with disk_file.mkstemp() as (fd, tmppath): |
560 | + os.write(fd, data) |
561 | + etag.update(data) |
562 | + etag = etag.hexdigest() |
563 | + metadata = { |
564 | + 'ETag': etag, |
565 | + 'X-Timestamp': timestamp, |
566 | + 'Content-Length': str(os.fstat(fd).st_size), |
567 | + } |
568 | + disk_file.put(fd, tmppath, metadata) |
569 | + disk_file.close() |
570 | + self.auditor.run_once() |
571 | + self.assertEquals(self.auditor.quarantines, pre_quarantines) |
572 | + |
573 | + def test_object_run_once_no_sda(self): |
574 | + self.auditor = auditor.ObjectAuditor(self.conf) |
575 | + cur_part = '0' |
576 | + timestamp = str(normalize_timestamp(time.time())) |
577 | + pre_quarantines = self.auditor.quarantines |
578 | + disk_file = DiskFile(self.devices, 'sdb', cur_part, 'a', 'c', 'o') |
579 | + data = '0' * 1024 |
580 | + etag = md5() |
581 | + with disk_file.mkstemp() as (fd, tmppath): |
582 | + os.write(fd, data) |
583 | + etag.update(data) |
584 | + etag = etag.hexdigest() |
585 | + metadata = { |
586 | + 'ETag': etag, |
587 | + 'X-Timestamp': timestamp, |
588 | + 'Content-Length': str(os.fstat(fd).st_size), |
589 | + } |
590 | + disk_file.put(fd, tmppath, metadata) |
591 | + disk_file.close() |
592 | + os.write(fd, 'extra_data') |
593 | + self.auditor.run_once() |
594 | + self.assertEquals(self.auditor.quarantines, pre_quarantines + 1) |
595 | + |
596 | + def test_object_run_once_multi_devices(self): |
597 | + self.auditor = auditor.ObjectAuditor(self.conf) |
598 | + cur_part = '0' |
599 | + timestamp = str(normalize_timestamp(time.time())) |
600 | + pre_quarantines = self.auditor.quarantines |
601 | + disk_file = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o') |
602 | + data = '0' * 10 |
603 | + etag = md5() |
604 | + with disk_file.mkstemp() as (fd, tmppath): |
605 | + os.write(fd, data) |
606 | + etag.update(data) |
607 | + etag = etag.hexdigest() |
608 | + metadata = { |
609 | + 'ETag': etag, |
610 | + 'X-Timestamp': timestamp, |
611 | + 'Content-Length': str(os.fstat(fd).st_size), |
612 | + } |
613 | + disk_file.put(fd, tmppath, metadata) |
614 | + disk_file.close() |
615 | + self.auditor.run_once() |
616 | + disk_file = DiskFile(self.devices, 'sdb', cur_part, 'a', 'c', 'ob') |
617 | + data = '1' * 10 |
618 | + etag = md5() |
619 | + with disk_file.mkstemp() as (fd, tmppath): |
620 | + os.write(fd, data) |
621 | + etag.update(data) |
622 | + etag = etag.hexdigest() |
623 | + metadata = { |
624 | + 'ETag': etag, |
625 | + 'X-Timestamp': timestamp, |
626 | + 'Content-Length': str(os.fstat(fd).st_size), |
627 | + } |
628 | + disk_file.put(fd, tmppath, metadata) |
629 | + disk_file.close() |
630 | + os.write(fd, 'extra_data') |
631 | + self.auditor.run_once() |
632 | + self.assertEquals(self.auditor.quarantines, pre_quarantines + 1) |
633 | |
634 | |
635 | if __name__ == '__main__': |
Looks good. You'll have to manually merge trunk into this though before final merge approval, as there's a small conflict at the moment.