Merge lp:~greglange/swift/lp725215 into lp:~hudson-openstack/swift/trunk

Proposed by Greg Lange
Status: Merged
Approved by: David Goetz
Approved revision: 256
Merged at revision: 264
Proposed branch: lp:~greglange/swift/lp725215
Merge into: lp:~hudson-openstack/swift/trunk
Diff against target: 820 lines (+641/-76)
2 files modified
swift/stats/log_processor.py (+231/-73)
test/unit/stats/test_log_processor.py (+410/-3)
To merge this branch: bzr merge lp:~greglange/swift/lp725215
Reviewers:
John Dickinson: Approve
Greg Lange: Pending
Review via email: mp+55637@code.launchpad.net

This proposal supersedes a proposal from 2011-03-23.

Description of the change

Refactored the log processing daemon to make it more testable.

Added tests for that.

This refactor should not change the daemon's behavior at all.

This needs to be tested on staging extensively before being pushed to production.
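
For reference, the refactor exposes pieces like get_lookback_interval() and a module-level `now` alias that a test can swap out. A rough, standalone sketch of that pattern (not part of this proposal; StubDaemon is a made-up stand-in that skips __init__ the same way the branch's unit tests do):

    import datetime
    from swift.stats import log_processor

    class StubDaemon(log_processor.LogProcessorDaemon):
        def __init__(self, lookback_hours, lookback_window):
            # skip the real __init__; get_lookback_interval only needs these two values
            self.lookback_hours = lookback_hours
            self.lookback_window = lookback_window

    try:
        # pin "now" so the computed interval is deterministic
        log_processor.now = lambda: datetime.datetime(2011, 1, 1)
        print StubDaemon(120, 24).get_lookback_interval()
        # expected: ('2010122700', '2010122800')
    finally:
        log_processor.now = datetime.datetime.now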

Revision history for this message
John Dickinson (notmyname) wrote : Posted in a previous version of this proposal

pep8 line 35, log_processor.py

test_get_output seems broken. Perhaps you forgot "in" on the self.assert_() calls (lines 645, 648)? Another option is to sort the expected output and the given output and compare them for equality.
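
For illustration, the sort-and-compare variant suggested here could look roughly like this (a standalone sketch with made-up rows, not code from the branch):

    import unittest

    class ExampleTest(unittest.TestCase):
        def test_rows_match_ignoring_order(self):
            expected_data_out = [['data_ts', 'account'], ['b', '2'], ['a', '1']]
            data_out = [['data_ts', 'account'], ['a', '1'], ['b', '2']]
            # header row must match exactly; remaining rows may come in any order
            self.assertEquals(expected_data_out[0], data_out[0])
            self.assertEquals(sorted(expected_data_out[1:]), sorted(data_out[1:]))

    if __name__ == '__main__':
        unittest.main()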

review: Needs Fixing
Revision history for this message
Greg Lange (greglange) : Posted in a previous version of this proposal
review: Needs Resubmitting
lp:~greglange/swift/lp725215 updated
255. By Greg Lange <email address hidden>

fixed merge conflict

Revision history for this message
John Dickinson (notmyname) wrote :

good

review: Approve
lp:~greglange/swift/lp725215 updated
256. By Greg Lange <email address hidden>

added doc strings to new methods

Preview Diff

=== modified file 'swift/stats/log_processor.py'
--- swift/stats/log_processor.py 2011-03-30 21:10:27 +0000
+++ swift/stats/log_processor.py 2011-04-11 21:52:38 +0000
@@ -30,6 +30,8 @@
 from swift.common.utils import get_logger, readconf
 from swift.common.daemon import Daemon
 
+now = datetime.datetime.now
+
 
 class BadFileDownload(Exception):
     def __init__(self, status_code=None):
@@ -234,31 +236,46 @@
         self.log_processor_container = c.get('container_name',
                                              'log_processing_data')
         self.worker_count = int(c.get('worker_count', '1'))
-
-    def run_once(self, *args, **kwargs):
-        for k in 'lookback_hours lookback_window'.split():
-            if kwargs[k] is not None:
-                setattr(self, k, kwargs[k])
-
-        self.logger.info(_("Beginning log processing"))
-        start = time.time()
+        self._keylist_mapping = None
+        self.processed_files_filename = 'processed_files.pickle.gz'
+
+    def get_lookback_interval(self):
+        """
+        :returns: lookback_start, lookback_end.
+
+        Both or just lookback_end can be None. Otherwise, returns strings
+        of the form 'YYYYMMDDHH'. The interval returned is used as bounds
+        when looking for logs to process.
+
+        A returned None means don't limit the log files examined on that
+        side of the interval.
+        """
+
         if self.lookback_hours == 0:
             lookback_start = None
             lookback_end = None
         else:
             delta_hours = datetime.timedelta(hours=self.lookback_hours)
-            lookback_start = datetime.datetime.now() - delta_hours
+            lookback_start = now() - delta_hours
             lookback_start = lookback_start.strftime('%Y%m%d%H')
             if self.lookback_window == 0:
                 lookback_end = None
             else:
                 delta_window = datetime.timedelta(hours=self.lookback_window)
-                lookback_end = datetime.datetime.now() - \
+                lookback_end = now() - \
                                delta_hours + \
                                delta_window
                 lookback_end = lookback_end.strftime('%Y%m%d%H')
-        self.logger.debug('lookback_start: %s' % lookback_start)
-        self.logger.debug('lookback_end: %s' % lookback_end)
+        return lookback_start, lookback_end
+
+    def get_processed_files_list(self):
+        """
+        :returns: a set of files that have already been processed or returns
+        None on error.
+
+        Downloads the set from the stats account. Creates an empty set if
+        an existing file cannot be found.
+        """
         try:
             # Note: this file (or data set) will grow without bound.
             # In practice, if it becomes a problem (say, after many months of
@@ -266,44 +283,52 @@
             # entries. Automatically pruning on each run could be dangerous.
             # There is not a good way to determine when an old entry should be
             # pruned (lookback_hours could be set to anything and could change)
-            processed_files_stream = self.log_processor.get_object_data(
+            stream = self.log_processor.get_object_data(
                 self.log_processor_account,
                 self.log_processor_container,
-                'processed_files.pickle.gz',
+                self.processed_files_filename,
                 compressed=True)
-            buf = '\n'.join(x for x in processed_files_stream)
+            buf = '\n'.join(x for x in stream)
             if buf:
-                already_processed_files = cPickle.loads(buf)
+                files = cPickle.loads(buf)
             else:
-                already_processed_files = set()
+                return None
         except BadFileDownload, err:
             if err.status_code == 404:
-                already_processed_files = set()
+                files = set()
             else:
-                self.logger.error(_('Log processing unable to load list of '
-                                    'already processed log files'))
-                return
-        self.logger.debug(_('found %d processed files') % \
-                          len(already_processed_files))
-        logs_to_process = self.log_processor.get_data_list(lookback_start,
-                                                           lookback_end,
-                                                           already_processed_files)
-        self.logger.info(_('loaded %d files to process') %
-                         len(logs_to_process))
-        if not logs_to_process:
-            self.logger.info(_("Log processing done (%0.2f minutes)") %
-                             ((time.time() - start) / 60))
-            return
-
-        # map
-        processor_args = (self.total_conf, self.logger)
-        results = multiprocess_collate(processor_args, logs_to_process,
-                                       self.worker_count)
-
-        #reduce
+                return None
+        return files
+
+    def get_aggregate_data(self, processed_files, input_data):
+        """
+        Aggregates stats data by account/hour, summing as needed.
+
+        :param processed_files: set of processed files
+        :param input_data: the output from multiprocess_collate/the plugins.
+
+        :returns: A dict containing data aggregated from the input_data
+        passed in.
+
+        The dict returned has tuple keys of the form:
+        (account, year, month, day, hour)
+        The dict returned has values that are dicts with items of this
+        form:
+        key:field_value
+        - key corresponds to something in one of the plugin's keylist
+        mapping, something like the tuple (source, level, verb, code)
+        - field_value is the sum of the field_values for the
+        corresponding values in the input
+
+        Both input_data and the dict returned are hourly aggregations of
+        stats.
+
+        Multiple values for the same (account, hour, tuple key) found in
+        input_data are summed in the dict returned.
+        """
+
         aggr_data = {}
-        processed_files = already_processed_files
-        for item, data in results:
+        for item, data in input_data:
             # since item contains the plugin and the log name, new plugins will
             # "reprocess" the file and the results will be in the final csv.
             processed_files.add(item)
@@ -315,14 +340,30 @@
                     # processing plugins need to realize this
                     existing_data[i] = current + j
                 aggr_data[k] = existing_data
-
-        # group
-        # reduce a large number of keys in aggr_data[k] to a small number of
-        # output keys
-        keylist_mapping = self.log_processor.generate_keylist_mapping()
+        return aggr_data
+
+    def get_final_info(self, aggr_data):
+        """
+        Aggregates data from aggr_data based on the keylist mapping.
+
+        :param aggr_data: The results of the get_aggregate_data function.
+        :returns: a dict of further aggregated data
+
+        The dict returned has keys of the form:
+        (account, year, month, day, hour)
+        The dict returned has values that are dicts with items of this
+        form:
+        'field_name': field_value (int)
+
+        Data is aggregated as specified by the keylist mapping. The
+        keylist mapping specifies which keys to combine in aggr_data
+        and the final field_names for these combined keys in the dict
+        returned. Fields combined are summed.
+        """
+
         final_info = collections.defaultdict(dict)
         for account, data in aggr_data.items():
-            for key, mapping in keylist_mapping.items():
+            for key, mapping in self.keylist_mapping.items():
                 if isinstance(mapping, (list, set)):
                     value = 0
                     for k in mapping:
@@ -336,37 +377,154 @@
                     except KeyError:
                         value = 0
                 final_info[account][key] = value
-
-        # output
-        sorted_keylist_mapping = sorted(keylist_mapping)
-        columns = 'data_ts,account,' + ','.join(sorted_keylist_mapping)
-        out_buf = [columns]
+        return final_info
+
+    def store_processed_files_list(self, processed_files):
+        """
+        Stores the processed files list in the stats account.
+
+        :param processed_files: set of processed files
+        """
+
+        s = cPickle.dumps(processed_files, cPickle.HIGHEST_PROTOCOL)
+        f = cStringIO.StringIO(s)
+        self.log_processor.internal_proxy.upload_file(f,
+            self.log_processor_account,
+            self.log_processor_container,
+            self.processed_files_filename)
+
+    def get_output(self, final_info):
+        """
+        :returns: a list of rows to appear in the csv file.
+
+        The first row contains the column headers for the rest of the
+        rows in the returned list.
+
+        Each row after the first row corresponds to an account's data
+        for that hour.
+        """
+
+        sorted_keylist_mapping = sorted(self.keylist_mapping)
+        columns = ['data_ts', 'account'] + sorted_keylist_mapping
+        output = [columns]
         for (account, year, month, day, hour), d in final_info.items():
-            data_ts = '%s/%s/%s %s:00:00' % (year, month, day, hour)
-            row = [data_ts]
-            row.append('%s' % account)
+            data_ts = '%04d/%02d/%02d %02d:00:00' % \
+                (int(year), int(month), int(day), int(hour))
+            row = [data_ts, '%s' % (account)]
             for k in sorted_keylist_mapping:
-                row.append('%s' % d[k])
-            out_buf.append(','.join(row))
-        out_buf = '\n'.join(out_buf)
+                row.append(str(d[k]))
+            output.append(row)
+        return output
+
+    def store_output(self, output):
+        """
+        Takes a list of rows and stores a csv file of the values in the
+        stats account.
+
+        :param output: list of rows to appear in the csv file
+
+        This csv file is the final product of this script.
+        """
+
+        out_buf = '\n'.join([','.join(row) for row in output])
         h = hashlib.md5(out_buf).hexdigest()
         upload_name = time.strftime('%Y/%m/%d/%H/') + '%s.csv.gz' % h
         f = cStringIO.StringIO(out_buf)
         self.log_processor.internal_proxy.upload_file(f,
             self.log_processor_account,
             self.log_processor_container,
             upload_name)
 
-        # cleanup
-        s = cPickle.dumps(processed_files, cPickle.HIGHEST_PROTOCOL)
-        f = cStringIO.StringIO(s)
-        self.log_processor.internal_proxy.upload_file(f,
-            self.log_processor_account,
-            self.log_processor_container,
-            'processed_files.pickle.gz')
+    @property
+    def keylist_mapping(self):
+        """
+        :returns: the keylist mapping.
+
+        The keylist mapping determines how the stats fields are aggregated in
+        the final aggregation step.
+        """
+
+        if self._keylist_mapping == None:
+            self._keylist_mapping = \
+                self.log_processor.generate_keylist_mapping()
+        return self._keylist_mapping
+
+    def process_logs(self, logs_to_process, processed_files):
+        """
+        :param logs_to_process: list of logs to process
+        :param processed_files: set of processed files
+
+        :returns: a list of rows of processed data.
+
+        The first row is the column headers. The rest of the rows contain
+        hourly aggregate data for the account specified in the row.
+
+        Files processed are added to the processed_files set.
+
+        When a large data structure is no longer needed, it is deleted in
+        an effort to conserve memory.
+        """
+
+        # map
+        processor_args = (self.total_conf, self.logger)
+        results = multiprocess_collate(processor_args, logs_to_process,
+                                       self.worker_count)
+
+        # reduce
+        aggr_data = self.get_aggregate_data(processed_files, results)
+        del results
+
+        # group
+        # reduce a large number of keys in aggr_data[k] to a small
+        # number of output keys
+        final_info = self.get_final_info(aggr_data)
+        del aggr_data
+
+        # output
+        return self.get_output(final_info)
+
+    def run_once(self, *args, **kwargs):
+        """
+        Process log files that fall within the lookback interval.
+
+        Upload resulting csv file to stats account.
+
+        Update processed files list and upload to stats account.
+        """
+
+        for k in 'lookback_hours lookback_window'.split():
+            if k in kwargs and kwargs[k] is not None:
+                setattr(self, k, kwargs[k])
+
+        start = time.time()
+        self.logger.info(_("Beginning log processing"))
+
+        lookback_start, lookback_end = self.get_lookback_interval()
+        self.logger.debug('lookback_start: %s' % lookback_start)
+        self.logger.debug('lookback_end: %s' % lookback_end)
+
+        processed_files = self.get_processed_files_list()
+        if processed_files == None:
+            self.logger.error(_('Log processing unable to load list of '
+                'already processed log files'))
+            return
+        self.logger.debug(_('found %d processed files') %
+            len(processed_files))
+
+        logs_to_process = self.log_processor.get_data_list(lookback_start,
+            lookback_end, processed_files)
+        self.logger.info(_('loaded %d files to process') %
+            len(logs_to_process))
+
+        if logs_to_process:
+            output = self.process_logs(logs_to_process, processed_files)
+            self.store_output(output)
+            del output
+
+        self.store_processed_files_list(processed_files)
 
         self.logger.info(_("Log processing done (%0.2f minutes)") %
                          ((time.time() - start) / 60))
 
 
 def multiprocess_collate(processor_args, logs_to_process, worker_count):
=== modified file 'test/unit/stats/test_log_processor.py'
--- test/unit/stats/test_log_processor.py 2011-03-17 22:22:43 +0000
+++ test/unit/stats/test_log_processor.py 2011-04-11 21:52:38 +0000
@@ -16,6 +16,10 @@
 import unittest
 from test.unit import tmpfile
 import Queue
+import datetime
+import hashlib
+import pickle
+import time
 
 from swift.common import internal_proxy
 from swift.stats import log_processor
@@ -26,7 +30,6 @@
     def __init__(self, *args, **kwargs):
         pass
 
-
 class DumbLogger(object):
     def __getattr__(self, n):
         return self.foo
@@ -77,7 +80,7 @@
         return self.code, data()
 
 class TestLogProcessor(unittest.TestCase):
-    
+
     access_test_line = 'Jul 9 04:14:30 saio proxy-server 1.2.3.4 4.5.6.7 '\
         '09/Jul/2010/04/14/30 GET '\
         '/v1/acct/foo/bar?format=json&foo HTTP/1.0 200 - '\
@@ -85,7 +88,7 @@
         '6 95 - txfa431231-7f07-42fd-8fc7-7da9d8cc1f90 - 0.0262'
     stats_test_line = 'account,1,2,3'
     proxy_config = {'log-processor': {
-        
+
         }
     }
 
@@ -426,3 +429,407 @@
         finally:
             log_processor.LogProcessor._internal_proxy = None
             log_processor.LogProcessor.get_object_data = orig_get_object_data
+
+class TestLogProcessorDaemon(unittest.TestCase):
+
+    def test_get_lookback_interval(self):
+        class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
+            def __init__(self, lookback_hours, lookback_window):
+                self.lookback_hours = lookback_hours
+                self.lookback_window = lookback_window
+
+        try:
+            d = datetime.datetime
+
+            for x in [
+                [d(2011, 1, 1), 0, 0, None, None],
+                [d(2011, 1, 1), 120, 0, '2010122700', None],
+                [d(2011, 1, 1), 120, 24, '2010122700', '2010122800'],
+                [d(2010, 1, 2, 3, 4), 120, 48, '2009122803', '2009123003'],
+                [d(2009, 5, 6, 7, 8), 1200, 100, '2009031707', '2009032111'],
+                [d(2008, 9, 10, 11, 12), 3000, 1000, '2008050811', '2008061903'],
+                ]:
+
+                log_processor.now = lambda: x[0]
+
+                d = MockLogProcessorDaemon(x[1], x[2])
+                self.assertEquals((x[3], x[4]), d.get_lookback_interval())
+        finally:
+            log_processor.now = datetime.datetime.now
+
+    def test_get_processed_files_list(self):
+        class MockLogProcessor():
+            def __init__(self, stream):
+                self.stream = stream
+
+            def get_object_data(self, *args, **kwargs):
+                return self.stream
+
+        class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
+            def __init__(self, stream):
+                self.log_processor = MockLogProcessor(stream)
+                self.log_processor_account = 'account'
+                self.log_processor_container = 'container'
+                self.processed_files_filename = 'filename'
+
+        file_list = set(['a', 'b', 'c'])
+
+        for s, l in [['', None],
+                [pickle.dumps(set()).split('\n'), set()],
+                [pickle.dumps(file_list).split('\n'), file_list],
+                ]:
+
+            self.assertEquals(l,
+                MockLogProcessorDaemon(s).get_processed_files_list())
+
+    def test_get_processed_files_list_bad_file_downloads(self):
+        class MockLogProcessor():
+            def __init__(self, status_code):
+                self.err = log_processor.BadFileDownload(status_code)
+
+            def get_object_data(self, *a, **k):
+                raise self.err
+
+        class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
+            def __init__(self, status_code):
+                self.log_processor = MockLogProcessor(status_code)
+                self.log_processor_account = 'account'
+                self.log_processor_container = 'container'
+                self.processed_files_filename = 'filename'
+
+        for c, l in [[404, set()], [503, None], [None, None]]:
+            self.assertEquals(l,
+                MockLogProcessorDaemon(c).get_processed_files_list())
+
+    def test_get_aggregate_data(self):
+        # when run "for real"
+        # the various keys/values in the input and output
+        # dictionaries are often not simple strings
+        # for testing we can use keys that are easier to work with
+
+        processed_files = set()
+
+        data_in = [
+            ['file1', {
+                'acct1_time1': {'field1': 1, 'field2': 2, 'field3': 3},
+                'acct1_time2': {'field1': 4, 'field2': 5},
+                'acct2_time1': {'field1': 6, 'field2': 7},
+                'acct3_time3': {'field1': 8, 'field2': 9},
+                }
+            ],
+            ['file2', {'acct1_time1': {'field1': 10}}],
+        ]
+
+        expected_data_out = {
+            'acct1_time1': {'field1': 11, 'field2': 2, 'field3': 3},
+            'acct1_time2': {'field1': 4, 'field2': 5},
+            'acct2_time1': {'field1': 6, 'field2': 7},
+            'acct3_time3': {'field1': 8, 'field2': 9},
+        }
+
+        class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
+            def __init__(self):
+                pass
+
+        d = MockLogProcessorDaemon()
+        data_out = d.get_aggregate_data(processed_files, data_in)
+
+        for k, v in expected_data_out.items():
+            self.assertEquals(v, data_out[k])
+
+        self.assertEquals(set(['file1', 'file2']), processed_files)
+
+    def test_get_final_info(self):
+        # when run "for real"
+        # the various keys/values in the input and output
+        # dictionaries are often not simple strings
+        # for testing we can use keys/values that are easier to work with
+
+        class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
+            def __init__(self):
+                self._keylist_mapping = {
+                    'out_field1':['field1', 'field2', 'field3'],
+                    'out_field2':['field2', 'field3'],
+                    'out_field3':['field3'],
+                    'out_field4':'field4',
+                    'out_field5':['field6', 'field7', 'field8'],
+                    'out_field6':['field6'],
+                    'out_field7':'field7',
+                }
+
+        data_in = {
+            'acct1_time1': {'field1': 11, 'field2': 2, 'field3': 3,
+                'field4': 8, 'field5': 11},
+            'acct1_time2': {'field1': 4, 'field2': 5},
+            'acct2_time1': {'field1': 6, 'field2': 7},
+            'acct3_time3': {'field1': 8, 'field2': 9},
+        }
+
+        expected_data_out = {
+            'acct1_time1': {'out_field1': 16, 'out_field2': 5,
+                'out_field3': 3, 'out_field4': 8, 'out_field5': 0,
+                'out_field6': 0, 'out_field7': 0,},
+            'acct1_time2': {'out_field1': 9, 'out_field2': 5,
+                'out_field3': 0, 'out_field4': 0, 'out_field5': 0,
+                'out_field6': 0, 'out_field7': 0,},
+            'acct2_time1': {'out_field1': 13, 'out_field2': 7,
+                'out_field3': 0, 'out_field4': 0, 'out_field5': 0,
+                'out_field6': 0, 'out_field7': 0,},
+            'acct3_time3': {'out_field1': 17, 'out_field2': 9,
+                'out_field3': 0, 'out_field4': 0, 'out_field5': 0,
+                'out_field6': 0, 'out_field7': 0,},
+        }
+
+        self.assertEquals(expected_data_out,
+            MockLogProcessorDaemon().get_final_info(data_in))
+
+    def test_store_processed_files_list(self):
+        class MockInternalProxy:
+            def __init__(self, test, daemon, processed_files):
+                self.test = test
+                self.daemon = daemon
+                self.processed_files = processed_files
+
+            def upload_file(self, f, account, container, filename):
+                self.test.assertEquals(self.processed_files,
+                    pickle.loads(f.getvalue()))
+                self.test.assertEquals(self.daemon.log_processor_account,
+                    account)
+                self.test.assertEquals(self.daemon.log_processor_container,
+                    container)
+                self.test.assertEquals(self.daemon.processed_files_filename,
+                    filename)
+
+        class MockLogProcessor:
+            def __init__(self, test, daemon, processed_files):
+                self.internal_proxy = MockInternalProxy(test, daemon,
+                    processed_files)
+
+        class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
+            def __init__(self, test, processed_files):
+                self.log_processor = \
+                    MockLogProcessor(test, self, processed_files)
+                self.log_processor_account = 'account'
+                self.log_processor_container = 'container'
+                self.processed_files_filename = 'filename'
+
+        processed_files = set(['a', 'b', 'c'])
+        MockLogProcessorDaemon(self, processed_files).\
+            store_processed_files_list(processed_files)
+
+    def test_get_output(self):
+        class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
+            def __init__(self):
+                self._keylist_mapping = {'a':None, 'b':None, 'c':None}
+
+        data_in = {
+            ('acct1', 2010, 1, 1, 0): {'a':1, 'b':2, 'c':3},
+            ('acct1', 2010, 10, 10, 10): {'a':10, 'b':20, 'c':30},
+            ('acct2', 2008, 3, 6, 9): {'a':8, 'b':9, 'c':12},
+            ('acct3', 2005, 4, 8, 16): {'a':1, 'b':5, 'c':25},
+        }
+
+        expected_data_out = [
+            ['data_ts', 'account', 'a', 'b', 'c'],
+            ['2010/01/01 00:00:00', 'acct1', '1', '2', '3'],
+            ['2010/10/10 10:00:00', 'acct1', '10', '20', '30'],
+            ['2008/03/06 09:00:00', 'acct2', '8', '9', '12'],
+            ['2005/04/08 16:00:00', 'acct3', '1', '5', '25'],
+        ]
+
+        data_out = MockLogProcessorDaemon().get_output(data_in)
+        self.assertEquals(expected_data_out[0], data_out[0])
+
+        for row in data_out[1:]:
+            self.assert_(row in expected_data_out)
+
+        for row in expected_data_out[1:]:
+            self.assert_(row in data_out)
+
+    def test_store_output(self):
+        try:
+            real_strftime = time.strftime
+            mock_strftime_return = '2010/03/02/01/'
+            def mock_strftime(format):
+                self.assertEquals('%Y/%m/%d/%H/', format)
+                return mock_strftime_return
+            log_processor.time.strftime = mock_strftime
+
+            data_in = [
+                ['data_ts', 'account', 'a', 'b', 'c'],
+                ['2010/10/10 10:00:00', 'acct1', '1', '2', '3'],
+                ['2010/10/10 10:00:00', 'acct1', '10', '20', '30'],
+                ['2008/03/06 09:00:00', 'acct2', '8', '9', '12'],
+                ['2005/04/08 16:00:00', 'acct3', '1', '5', '25'],
+            ]
+
+            expected_output = '\n'.join([','.join(row) for row in data_in])
+            h = hashlib.md5(expected_output).hexdigest()
+            expected_filename = '%s%s.csv.gz' % (mock_strftime_return, h)
+
+            class MockInternalProxy:
+                def __init__(self, test, daemon, expected_filename,
+                        expected_output):
+                    self.test = test
+                    self.daemon = daemon
+                    self.expected_filename = expected_filename
+                    self.expected_output = expected_output
+
+                def upload_file(self, f, account, container, filename):
+                    self.test.assertEquals(self.daemon.log_processor_account,
+                        account)
+                    self.test.assertEquals(self.daemon.log_processor_container,
+                        container)
+                    self.test.assertEquals(self.expected_filename, filename)
+                    self.test.assertEquals(self.expected_output, f.getvalue())
+
+            class MockLogProcessor:
+                def __init__(self, test, daemon, expected_filename,
+                        expected_output):
+                    self.internal_proxy = MockInternalProxy(test, daemon,
+                        expected_filename, expected_output)
+
+            class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
+                def __init__(self, test, expected_filename, expected_output):
+                    self.log_processor = MockLogProcessor(test, self,
+                        expected_filename, expected_output)
+                    self.log_processor_account = 'account'
+                    self.log_processor_container = 'container'
+                    self.processed_files_filename = 'filename'
+
+            MockLogProcessorDaemon(self, expected_filename, expected_output).\
+                store_output(data_in)
+        finally:
+            log_processor.time.strftime = real_strftime
+
+    def test_keylist_mapping(self):
+        # Kind of lame test to see if the property is both
+        # generated by a particular method and cached properly.
+        # The method that actually generates the mapping is
+        # tested elsewhere.
+
+        value_return = 'keylist_mapping'
+        class MockLogProcessor:
+            def __init__(self):
+                self.call_count = 0
+
+            def generate_keylist_mapping(self):
+                self.call_count += 1
+                return value_return
+
+        class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
+            def __init__(self):
+                self.log_processor = MockLogProcessor()
+                self._keylist_mapping = None
+
+        d = MockLogProcessorDaemon()
+        self.assertEquals(value_return, d.keylist_mapping)
+        self.assertEquals(value_return, d.keylist_mapping)
+        self.assertEquals(1, d.log_processor.call_count)
+
+    def test_process_logs(self):
+        try:
+            mock_logs_to_process = 'logs_to_process'
+            mock_processed_files = 'processed_files'
+
+            real_multiprocess_collate = log_processor.multiprocess_collate
+            multiprocess_collate_return = 'multiprocess_collate_return'
+
+            get_aggregate_data_return = 'get_aggregate_data_return'
+            get_final_info_return = 'get_final_info_return'
+            get_output_return = 'get_output_return'
+
+            class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
+                def __init__(self, test):
+                    self.test = test
+                    self.total_conf = 'total_conf'
+                    self.logger = 'logger'
+                    self.worker_count = 'worker_count'
+
+                def get_aggregate_data(self, processed_files, results):
+                    self.test.assertEquals(mock_processed_files, processed_files)
+                    self.test.assertEquals(multiprocess_collate_return, results)
+                    return get_aggregate_data_return
+
+                def get_final_info(self, aggr_data):
+                    self.test.assertEquals(get_aggregate_data_return, aggr_data)
+                    return get_final_info_return
+
+                def get_output(self, final_info):
+                    self.test.assertEquals(get_final_info_return, final_info)
+                    return get_output_return
+
+            d = MockLogProcessorDaemon(self)
+
+            def mock_multiprocess_collate(processor_args, logs_to_process,
+                    worker_count):
+                self.assertEquals(d.total_conf, processor_args[0])
+                self.assertEquals(d.logger, processor_args[1])
+
+                self.assertEquals(mock_logs_to_process, logs_to_process)
+                self.assertEquals(d.worker_count, worker_count)
+
+                return multiprocess_collate_return
+
+            log_processor.multiprocess_collate = mock_multiprocess_collate
+
+            output = d.process_logs(mock_logs_to_process, mock_processed_files)
+            self.assertEquals(get_output_return, output)
+        finally:
+            log_processor.multiprocess_collate = real_multiprocess_collate
+
+    def test_run_once_get_processed_files_list_returns_none(self):
+        class MockLogProcessor:
+            def get_data_list(self, lookback_start, lookback_end,
+                    processed_files):
+                raise unittest.TestCase.failureException, \
+                    'Method should not be called'
+
+        class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
+            def __init__(self):
+                self.logger = DumbLogger()
+                self.log_processor = MockLogProcessor()
+
+            def get_lookback_interval(self):
+                return None, None
+
+            def get_processed_files_list(self):
+                return None
+
+        MockLogProcessorDaemon().run_once()
+
+    def test_run_once_no_logs_to_process(self):
+        class MockLogProcessor():
+            def __init__(self, daemon, test):
+                self.daemon = daemon
+                self.test = test
+
+            def get_data_list(self, lookback_start, lookback_end,
+                    processed_files):
+                self.test.assertEquals(self.daemon.lookback_start,
+                    lookback_start)
+                self.test.assertEquals(self.daemon.lookback_end,
+                    lookback_end)
+                self.test.assertEquals(self.daemon.processed_files,
+                    processed_files)
+                return []
+
+        class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
+            def __init__(self, test):
+                self.logger = DumbLogger()
+                self.log_processor = MockLogProcessor(self, test)
+                self.lookback_start = 'lookback_start'
+                self.lookback_end = 'lookback_end'
+                self.processed_files = ['a', 'b', 'c']
+
+            def get_lookback_interval(self):
+                return self.lookback_start, self.lookback_end
+
+            def get_processed_files_list(self):
+                return self.processed_files
+
+            def process_logs(logs_to_process, processed_files):
+                raise unittest.TestCase.failureException, \
+                    'Method should not be called'
+
+        MockLogProcessorDaemon(self).run_once()