Merge lp:~greglange/swift/lp725215 into lp:~hudson-openstack/swift/trunk

Proposed by Greg Lange
Status: Merged
Approved by: David Goetz
Approved revision: 256
Merged at revision: 264
Proposed branch: lp:~greglange/swift/lp725215
Merge into: lp:~hudson-openstack/swift/trunk
Diff against target: 820 lines (+641/-76)
2 files modified
swift/stats/log_processor.py (+231/-73)
test/unit/stats/test_log_processor.py (+410/-3)
To merge this branch: bzr merge lp:~greglange/swift/lp725215
Reviewer           Review Type    Date Requested    Status
John Dickinson                                      Approve
Greg Lange                                          Pending
Review via email: mp+55637@code.launchpad.net

This proposal supersedes a proposal from 2011-03-23.

Description of the change

Refactored the log processing daemon to make it more testable.

Added tests for that.

The daemon's behavior should not have changed at all.

This needs to be tested extensively on staging before being pushed to production.
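
For illustration only (not part of the change), here is a minimal sketch of the testing pattern this refactor enables, modeled on the new unit tests in the branch: subclass LogProcessorDaemon to bypass its __init__, set just the attributes the method under test reads, and freeze the module-level now() hook. The class names LookbackIntervalSketch and StubDaemon are placeholders.

    # Sketch only; mirrors the approach in test/unit/stats/test_log_processor.py
    # rather than adding new code to the branch.
    import datetime
    import unittest

    from swift.stats import log_processor


    class LookbackIntervalSketch(unittest.TestCase):
        def test_lookback_interval(self):
            class StubDaemon(log_processor.LogProcessorDaemon):
                # Skip the real __init__ so no config, proxy, or logger is needed.
                def __init__(self, lookback_hours, lookback_window):
                    self.lookback_hours = lookback_hours
                    self.lookback_window = lookback_window

            orig_now = log_processor.now
            try:
                # Freeze "now" via the module-level hook added in this branch.
                log_processor.now = lambda: datetime.datetime(2011, 1, 1)
                daemon = StubDaemon(lookback_hours=120, lookback_window=24)
                self.assertEquals(('2010122700', '2010122800'),
                                  daemon.get_lookback_interval())
            finally:
                log_processor.now = orig_now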

Revision history for this message
John Dickinson (notmyname) wrote : Posted in a previous version of this proposal

pep8 line 35, log_processor.py

test_get_output seems broken. Perhaps you forgot "in" on the self.assert_() calls (lines 645, 648)? Another option is to sort the expected output and the given output and compare for equality.
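
A minimal sketch of the sort-and-compare alternative mentioned above, using the expected_data_out and data_out names from that test:

    # Compare the header row directly, then sort the remaining rows on both
    # sides and compare for equality instead of per-row membership checks.
    self.assertEquals(expected_data_out[0], data_out[0])
    self.assertEquals(sorted(expected_data_out[1:]), sorted(data_out[1:]))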

review: Needs Fixing
Revision history for this message
Greg Lange (greglange) : Posted in a previous version of this proposal
review: Needs Resubmitting
lp:~greglange/swift/lp725215 updated
255. By Greg Lange <email address hidden>

fixed merge conflict

Revision history for this message
John Dickinson (notmyname) wrote :

good

review: Approve
lp:~greglange/swift/lp725215 updated
256. By Greg Lange <email address hidden>

added doc strings to new methods

Preview Diff

1=== modified file 'swift/stats/log_processor.py'
2--- swift/stats/log_processor.py 2011-03-30 21:10:27 +0000
3+++ swift/stats/log_processor.py 2011-04-11 21:52:38 +0000
4@@ -30,6 +30,8 @@
5 from swift.common.utils import get_logger, readconf
6 from swift.common.daemon import Daemon
7
8+now = datetime.datetime.now
9+
10
11 class BadFileDownload(Exception):
12 def __init__(self, status_code=None):
13@@ -234,31 +236,46 @@
14 self.log_processor_container = c.get('container_name',
15 'log_processing_data')
16 self.worker_count = int(c.get('worker_count', '1'))
17-
18- def run_once(self, *args, **kwargs):
19- for k in 'lookback_hours lookback_window'.split():
20- if kwargs[k] is not None:
21- setattr(self, k, kwargs[k])
22-
23- self.logger.info(_("Beginning log processing"))
24- start = time.time()
25+ self._keylist_mapping = None
26+ self.processed_files_filename = 'processed_files.pickle.gz'
27+
28+ def get_lookback_interval(self):
29+ """
30+ :returns: lookback_start, lookback_end.
31+
32+ Both or just lookback_end can be None. Otherwise, returns strings
33+ of the form 'YYYYMMDDHH'. The interval returned is used as bounds
34+        when looking for logs to process.
35+
36+ A returned None means don't limit the log files examined on that
37+ side of the interval.
38+ """
39+
40 if self.lookback_hours == 0:
41 lookback_start = None
42 lookback_end = None
43 else:
44 delta_hours = datetime.timedelta(hours=self.lookback_hours)
45- lookback_start = datetime.datetime.now() - delta_hours
46+ lookback_start = now() - delta_hours
47 lookback_start = lookback_start.strftime('%Y%m%d%H')
48 if self.lookback_window == 0:
49 lookback_end = None
50 else:
51 delta_window = datetime.timedelta(hours=self.lookback_window)
52- lookback_end = datetime.datetime.now() - \
53+ lookback_end = now() - \
54 delta_hours + \
55 delta_window
56 lookback_end = lookback_end.strftime('%Y%m%d%H')
57- self.logger.debug('lookback_start: %s' % lookback_start)
58- self.logger.debug('lookback_end: %s' % lookback_end)
59+ return lookback_start, lookback_end
60+
61+ def get_processed_files_list(self):
62+ """
63+ :returns: a set of files that have already been processed or returns
64+ None on error.
65+
66+ Downloads the set from the stats account. Creates an empty set if
67+        an existing file cannot be found.
68+ """
69 try:
70 # Note: this file (or data set) will grow without bound.
71 # In practice, if it becomes a problem (say, after many months of
72@@ -266,44 +283,52 @@
73 # entries. Automatically pruning on each run could be dangerous.
74 # There is not a good way to determine when an old entry should be
75 # pruned (lookback_hours could be set to anything and could change)
76- processed_files_stream = self.log_processor.get_object_data(
77- self.log_processor_account,
78- self.log_processor_container,
79- 'processed_files.pickle.gz',
80- compressed=True)
81- buf = '\n'.join(x for x in processed_files_stream)
82+ stream = self.log_processor.get_object_data(
83+ self.log_processor_account,
84+ self.log_processor_container,
85+ self.processed_files_filename,
86+ compressed=True)
87+ buf = '\n'.join(x for x in stream)
88 if buf:
89- already_processed_files = cPickle.loads(buf)
90+ files = cPickle.loads(buf)
91 else:
92- already_processed_files = set()
93+ return None
94 except BadFileDownload, err:
95 if err.status_code == 404:
96- already_processed_files = set()
97+ files = set()
98 else:
99- self.logger.error(_('Log processing unable to load list of '
100- 'already processed log files'))
101- return
102- self.logger.debug(_('found %d processed files') % \
103- len(already_processed_files))
104- logs_to_process = self.log_processor.get_data_list(lookback_start,
105- lookback_end,
106- already_processed_files)
107- self.logger.info(_('loaded %d files to process') %
108- len(logs_to_process))
109- if not logs_to_process:
110- self.logger.info(_("Log processing done (%0.2f minutes)") %
111- ((time.time() - start) / 60))
112- return
113-
114- # map
115- processor_args = (self.total_conf, self.logger)
116- results = multiprocess_collate(processor_args, logs_to_process,
117- self.worker_count)
118-
119- #reduce
120+ return None
121+ return files
122+
123+ def get_aggregate_data(self, processed_files, input_data):
124+ """
125+ Aggregates stats data by account/hour, summing as needed.
126+
127+ :param processed_files: set of processed files
128+ :param input_data: is the output from multiprocess_collate/the plugins.
129+
130+ :returns: A dict containing data aggregated from the input_data
131+ passed in.
132+
133+ The dict returned has tuple keys of the form:
134+ (account, year, month, day, hour)
135+ The dict returned has values that are dicts with items of this
136+ form:
137+ key:field_value
138+ - key corresponds to something in one of the plugin's keylist
139+ mapping, something like the tuple (source, level, verb, code)
140+ - field_value is the sum of the field_values for the
141+ corresponding values in the input
142+
143+ Both input_data and the dict returned are hourly aggregations of
144+ stats.
145+
146+ Multiple values for the same (account, hour, tuple key) found in
147+ input_data are summed in the dict returned.
148+ """
149+
150 aggr_data = {}
151- processed_files = already_processed_files
152- for item, data in results:
153+ for item, data in input_data:
154 # since item contains the plugin and the log name, new plugins will
155 # "reprocess" the file and the results will be in the final csv.
156 processed_files.add(item)
157@@ -315,14 +340,30 @@
158 # processing plugins need to realize this
159 existing_data[i] = current + j
160 aggr_data[k] = existing_data
161-
162- # group
163- # reduce a large number of keys in aggr_data[k] to a small number of
164- # output keys
165- keylist_mapping = self.log_processor.generate_keylist_mapping()
166+ return aggr_data
167+
168+ def get_final_info(self, aggr_data):
169+ """
170+ Aggregates data from aggr_data based on the keylist mapping.
171+
172+ :param aggr_data: The results of the get_aggregate_data function.
173+ :returns: a dict of further aggregated data
174+
175+ The dict returned has keys of the form:
176+ (account, year, month, day, hour)
177+ The dict returned has values that are dicts with items of this
178+ form:
179+ 'field_name': field_value (int)
180+
181+ Data is aggregated as specified by the keylist mapping. The
182+ keylist mapping specifies which keys to combine in aggr_data
183+ and the final field_names for these combined keys in the dict
184+ returned. Fields combined are summed.
185+ """
186+
187 final_info = collections.defaultdict(dict)
188 for account, data in aggr_data.items():
189- for key, mapping in keylist_mapping.items():
190+ for key, mapping in self.keylist_mapping.items():
191 if isinstance(mapping, (list, set)):
192 value = 0
193 for k in mapping:
194@@ -336,37 +377,154 @@
195 except KeyError:
196 value = 0
197 final_info[account][key] = value
198-
199- # output
200- sorted_keylist_mapping = sorted(keylist_mapping)
201- columns = 'data_ts,account,' + ','.join(sorted_keylist_mapping)
202- out_buf = [columns]
203+ return final_info
204+
205+ def store_processed_files_list(self, processed_files):
206+ """
207+        Stores the processed files list in the stats account.
208+
209+ :param processed_files: set of processed files
210+ """
211+
212+ s = cPickle.dumps(processed_files, cPickle.HIGHEST_PROTOCOL)
213+ f = cStringIO.StringIO(s)
214+ self.log_processor.internal_proxy.upload_file(f,
215+ self.log_processor_account,
216+ self.log_processor_container,
217+ self.processed_files_filename)
218+
219+ def get_output(self, final_info):
220+ """
221+ :returns: a list of rows to appear in the csv file.
222+
223+ The first row contains the column headers for the rest of the
224+ rows in the returned list.
225+
226+ Each row after the first row corresponds to an account's data
227+ for that hour.
228+ """
229+
230+ sorted_keylist_mapping = sorted(self.keylist_mapping)
231+ columns = ['data_ts', 'account'] + sorted_keylist_mapping
232+ output = [columns]
233 for (account, year, month, day, hour), d in final_info.items():
234- data_ts = '%s/%s/%s %s:00:00' % (year, month, day, hour)
235- row = [data_ts]
236- row.append('%s' % account)
237+ data_ts = '%04d/%02d/%02d %02d:00:00' % \
238+ (int(year), int(month), int(day), int(hour))
239+ row = [data_ts, '%s' % (account)]
240 for k in sorted_keylist_mapping:
241- row.append('%s' % d[k])
242- out_buf.append(','.join(row))
243- out_buf = '\n'.join(out_buf)
244+ row.append(str(d[k]))
245+ output.append(row)
246+ return output
247+
248+ def store_output(self, output):
249+ """
250+        Takes a list of rows and stores a csv file of the values in the
251+ stats account.
252+
253+ :param output: list of rows to appear in the csv file
254+
255+        This csv file is the final product of this script.
256+ """
257+
258+ out_buf = '\n'.join([','.join(row) for row in output])
259 h = hashlib.md5(out_buf).hexdigest()
260 upload_name = time.strftime('%Y/%m/%d/%H/') + '%s.csv.gz' % h
261 f = cStringIO.StringIO(out_buf)
262 self.log_processor.internal_proxy.upload_file(f,
263- self.log_processor_account,
264- self.log_processor_container,
265- upload_name)
266-
267- # cleanup
268- s = cPickle.dumps(processed_files, cPickle.HIGHEST_PROTOCOL)
269- f = cStringIO.StringIO(s)
270- self.log_processor.internal_proxy.upload_file(f,
271- self.log_processor_account,
272- self.log_processor_container,
273- 'processed_files.pickle.gz')
274+ self.log_processor_account,
275+ self.log_processor_container,
276+ upload_name)
277+
278+ @property
279+ def keylist_mapping(self):
280+ """
281+ :returns: the keylist mapping.
282+
283+ The keylist mapping determines how the stats fields are aggregated in
284+ the final aggregation step.
285+ """
286+
287+ if self._keylist_mapping == None:
288+ self._keylist_mapping = \
289+ self.log_processor.generate_keylist_mapping()
290+ return self._keylist_mapping
291+
292+ def process_logs(self, logs_to_process, processed_files):
293+ """
294+ :param logs_to_process: list of logs to process
295+ :param processed_files: set of processed files
296+
297+        :returns: a list of rows of processed data.
298+
299+ The first row is the column headers. The rest of the rows contain
300+ hourly aggregate data for the account specified in the row.
301+
302+ Files processed are added to the processed_files set.
303+
304+ When a large data structure is no longer needed, it is deleted in
305+ an effort to conserve memory.
306+ """
307+
308+ # map
309+ processor_args = (self.total_conf, self.logger)
310+ results = multiprocess_collate(processor_args, logs_to_process,
311+ self.worker_count)
312+
313+ # reduce
314+ aggr_data = self.get_aggregate_data(processed_files, results)
315+ del results
316+
317+ # group
318+ # reduce a large number of keys in aggr_data[k] to a small
319+ # number of output keys
320+ final_info = self.get_final_info(aggr_data)
321+ del aggr_data
322+
323+ # output
324+ return self.get_output(final_info)
325+
326+ def run_once(self, *args, **kwargs):
327+ """
328+ Process log files that fall within the lookback interval.
329+
330+ Upload resulting csv file to stats account.
331+
332+ Update processed files list and upload to stats account.
333+ """
334+
335+ for k in 'lookback_hours lookback_window'.split():
336+ if k in kwargs and kwargs[k] is not None:
337+ setattr(self, k, kwargs[k])
338+
339+ start = time.time()
340+ self.logger.info(_("Beginning log processing"))
341+
342+ lookback_start, lookback_end = self.get_lookback_interval()
343+ self.logger.debug('lookback_start: %s' % lookback_start)
344+ self.logger.debug('lookback_end: %s' % lookback_end)
345+
346+ processed_files = self.get_processed_files_list()
347+ if processed_files == None:
348+ self.logger.error(_('Log processing unable to load list of '
349+ 'already processed log files'))
350+ return
351+ self.logger.debug(_('found %d processed files') %
352+ len(processed_files))
353+
354+ logs_to_process = self.log_processor.get_data_list(lookback_start,
355+ lookback_end, processed_files)
356+ self.logger.info(_('loaded %d files to process') %
357+ len(logs_to_process))
358+
359+ if logs_to_process:
360+ output = self.process_logs(logs_to_process, processed_files)
361+ self.store_output(output)
362+ del output
363+
364+ self.store_processed_files_list(processed_files)
365
366 self.logger.info(_("Log processing done (%0.2f minutes)") %
367- ((time.time() - start) / 60))
368+ ((time.time() - start) / 60))
369
370
371 def multiprocess_collate(processor_args, logs_to_process, worker_count):
372
373=== modified file 'test/unit/stats/test_log_processor.py'
374--- test/unit/stats/test_log_processor.py 2011-03-17 22:22:43 +0000
375+++ test/unit/stats/test_log_processor.py 2011-04-11 21:52:38 +0000
376@@ -16,6 +16,10 @@
377 import unittest
378 from test.unit import tmpfile
379 import Queue
380+import datetime
381+import hashlib
382+import pickle
383+import time
384
385 from swift.common import internal_proxy
386 from swift.stats import log_processor
387@@ -26,7 +30,6 @@
388 def __init__(self, *args, **kwargs):
389 pass
390
391-
392 class DumbLogger(object):
393 def __getattr__(self, n):
394 return self.foo
395@@ -77,7 +80,7 @@
396 return self.code, data()
397
398 class TestLogProcessor(unittest.TestCase):
399-
400+
401 access_test_line = 'Jul 9 04:14:30 saio proxy-server 1.2.3.4 4.5.6.7 '\
402 '09/Jul/2010/04/14/30 GET '\
403 '/v1/acct/foo/bar?format=json&foo HTTP/1.0 200 - '\
404@@ -85,7 +88,7 @@
405 '6 95 - txfa431231-7f07-42fd-8fc7-7da9d8cc1f90 - 0.0262'
406 stats_test_line = 'account,1,2,3'
407 proxy_config = {'log-processor': {
408-
409+
410 }
411 }
412
413@@ -426,3 +429,407 @@
414 finally:
415 log_processor.LogProcessor._internal_proxy = None
416 log_processor.LogProcessor.get_object_data = orig_get_object_data
417+
418+class TestLogProcessorDaemon(unittest.TestCase):
419+
420+ def test_get_lookback_interval(self):
421+ class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
422+ def __init__(self, lookback_hours, lookback_window):
423+ self.lookback_hours = lookback_hours
424+ self.lookback_window = lookback_window
425+
426+ try:
427+ d = datetime.datetime
428+
429+ for x in [
430+ [d(2011, 1, 1), 0, 0, None, None],
431+ [d(2011, 1, 1), 120, 0, '2010122700', None],
432+ [d(2011, 1, 1), 120, 24, '2010122700', '2010122800'],
433+ [d(2010, 1, 2, 3, 4), 120, 48, '2009122803', '2009123003'],
434+ [d(2009, 5, 6, 7, 8), 1200, 100, '2009031707', '2009032111'],
435+ [d(2008, 9, 10, 11, 12), 3000, 1000, '2008050811', '2008061903'],
436+ ]:
437+
438+ log_processor.now = lambda: x[0]
439+
440+ d = MockLogProcessorDaemon(x[1], x[2])
441+ self.assertEquals((x[3], x[4]), d.get_lookback_interval())
442+ finally:
443+ log_processor.now = datetime.datetime.now
444+
445+ def test_get_processed_files_list(self):
446+ class MockLogProcessor():
447+ def __init__(self, stream):
448+ self.stream = stream
449+
450+ def get_object_data(self, *args, **kwargs):
451+ return self.stream
452+
453+ class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
454+ def __init__(self, stream):
455+ self.log_processor = MockLogProcessor(stream)
456+ self.log_processor_account = 'account'
457+ self.log_processor_container = 'container'
458+ self.processed_files_filename = 'filename'
459+
460+ file_list = set(['a', 'b', 'c'])
461+
462+ for s, l in [['', None],
463+ [pickle.dumps(set()).split('\n'), set()],
464+ [pickle.dumps(file_list).split('\n'), file_list],
465+ ]:
466+
467+ self.assertEquals(l,
468+ MockLogProcessorDaemon(s).get_processed_files_list())
469+
470+ def test_get_processed_files_list_bad_file_downloads(self):
471+ class MockLogProcessor():
472+ def __init__(self, status_code):
473+ self.err = log_processor.BadFileDownload(status_code)
474+
475+ def get_object_data(self, *a, **k):
476+ raise self.err
477+
478+ class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
479+ def __init__(self, status_code):
480+ self.log_processor = MockLogProcessor(status_code)
481+ self.log_processor_account = 'account'
482+ self.log_processor_container = 'container'
483+ self.processed_files_filename = 'filename'
484+
485+ for c, l in [[404, set()], [503, None], [None, None]]:
486+ self.assertEquals(l,
487+ MockLogProcessorDaemon(c).get_processed_files_list())
488+
489+ def test_get_aggregate_data(self):
490+ # when run "for real"
491+ # the various keys/values in the input and output
492+ # dictionaries are often not simple strings
493+ # for testing we can use keys that are easier to work with
494+
495+ processed_files = set()
496+
497+ data_in = [
498+ ['file1', {
499+ 'acct1_time1': {'field1': 1, 'field2': 2, 'field3': 3},
500+ 'acct1_time2': {'field1': 4, 'field2': 5},
501+ 'acct2_time1': {'field1': 6, 'field2': 7},
502+ 'acct3_time3': {'field1': 8, 'field2': 9},
503+ }
504+ ],
505+ ['file2', {'acct1_time1': {'field1': 10}}],
506+ ]
507+
508+ expected_data_out = {
509+ 'acct1_time1': {'field1': 11, 'field2': 2, 'field3': 3},
510+ 'acct1_time2': {'field1': 4, 'field2': 5},
511+ 'acct2_time1': {'field1': 6, 'field2': 7},
512+ 'acct3_time3': {'field1': 8, 'field2': 9},
513+ }
514+
515+ class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
516+ def __init__(self):
517+ pass
518+
519+ d = MockLogProcessorDaemon()
520+ data_out = d.get_aggregate_data(processed_files, data_in)
521+
522+ for k, v in expected_data_out.items():
523+ self.assertEquals(v, data_out[k])
524+
525+ self.assertEquals(set(['file1', 'file2']), processed_files)
526+
527+ def test_get_final_info(self):
528+ # when run "for real"
529+ # the various keys/values in the input and output
530+ # dictionaries are often not simple strings
531+ # for testing we can use keys/values that are easier to work with
532+
533+ class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
534+ def __init__(self):
535+ self._keylist_mapping = {
536+ 'out_field1':['field1', 'field2', 'field3'],
537+ 'out_field2':['field2', 'field3'],
538+ 'out_field3':['field3'],
539+ 'out_field4':'field4',
540+ 'out_field5':['field6', 'field7', 'field8'],
541+ 'out_field6':['field6'],
542+ 'out_field7':'field7',
543+ }
544+
545+ data_in = {
546+ 'acct1_time1': {'field1': 11, 'field2': 2, 'field3': 3,
547+ 'field4': 8, 'field5': 11},
548+ 'acct1_time2': {'field1': 4, 'field2': 5},
549+ 'acct2_time1': {'field1': 6, 'field2': 7},
550+ 'acct3_time3': {'field1': 8, 'field2': 9},
551+ }
552+
553+ expected_data_out = {
554+ 'acct1_time1': {'out_field1': 16, 'out_field2': 5,
555+ 'out_field3': 3, 'out_field4': 8, 'out_field5': 0,
556+ 'out_field6': 0, 'out_field7': 0,},
557+ 'acct1_time2': {'out_field1': 9, 'out_field2': 5,
558+ 'out_field3': 0, 'out_field4': 0, 'out_field5': 0,
559+ 'out_field6': 0, 'out_field7': 0,},
560+ 'acct2_time1': {'out_field1': 13, 'out_field2': 7,
561+ 'out_field3': 0, 'out_field4': 0, 'out_field5': 0,
562+ 'out_field6': 0, 'out_field7': 0,},
563+ 'acct3_time3': {'out_field1': 17, 'out_field2': 9,
564+ 'out_field3': 0, 'out_field4': 0, 'out_field5': 0,
565+ 'out_field6': 0, 'out_field7': 0,},
566+ }
567+
568+ self.assertEquals(expected_data_out,
569+ MockLogProcessorDaemon().get_final_info(data_in))
570+
571+ def test_store_processed_files_list(self):
572+ class MockInternalProxy:
573+ def __init__(self, test, daemon, processed_files):
574+ self.test = test
575+ self.daemon = daemon
576+ self.processed_files = processed_files
577+
578+ def upload_file(self, f, account, container, filename):
579+ self.test.assertEquals(self.processed_files,
580+ pickle.loads(f.getvalue()))
581+ self.test.assertEquals(self.daemon.log_processor_account,
582+ account)
583+ self.test.assertEquals(self.daemon.log_processor_container,
584+ container)
585+ self.test.assertEquals(self.daemon.processed_files_filename,
586+ filename)
587+
588+ class MockLogProcessor:
589+ def __init__(self, test, daemon, processed_files):
590+ self.internal_proxy = MockInternalProxy(test, daemon,
591+ processed_files)
592+
593+ class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
594+ def __init__(self, test, processed_files):
595+ self.log_processor = \
596+ MockLogProcessor(test, self, processed_files)
597+ self.log_processor_account = 'account'
598+ self.log_processor_container = 'container'
599+ self.processed_files_filename = 'filename'
600+
601+ processed_files = set(['a', 'b', 'c'])
602+ MockLogProcessorDaemon(self, processed_files).\
603+ store_processed_files_list(processed_files)
604+
605+ def test_get_output(self):
606+ class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
607+ def __init__(self):
608+ self._keylist_mapping = {'a':None, 'b':None, 'c':None}
609+
610+ data_in = {
611+ ('acct1', 2010, 1, 1, 0): {'a':1, 'b':2, 'c':3},
612+ ('acct1', 2010, 10, 10, 10): {'a':10, 'b':20, 'c':30},
613+ ('acct2', 2008, 3, 6, 9): {'a':8, 'b':9, 'c':12},
614+ ('acct3', 2005, 4, 8, 16): {'a':1, 'b':5, 'c':25},
615+ }
616+
617+ expected_data_out = [
618+ ['data_ts', 'account', 'a', 'b', 'c'],
619+ ['2010/01/01 00:00:00', 'acct1', '1', '2', '3'],
620+ ['2010/10/10 10:00:00', 'acct1', '10', '20', '30'],
621+ ['2008/03/06 09:00:00', 'acct2', '8', '9', '12'],
622+ ['2005/04/08 16:00:00', 'acct3', '1', '5', '25'],
623+ ]
624+
625+ data_out = MockLogProcessorDaemon().get_output(data_in)
626+ self.assertEquals(expected_data_out[0], data_out[0])
627+
628+ for row in data_out[1:]:
629+ self.assert_(row in expected_data_out)
630+
631+ for row in expected_data_out[1:]:
632+ self.assert_(row in data_out)
633+
634+ def test_store_output(self):
635+ try:
636+ real_strftime = time.strftime
637+ mock_strftime_return = '2010/03/02/01/'
638+ def mock_strftime(format):
639+ self.assertEquals('%Y/%m/%d/%H/', format)
640+ return mock_strftime_return
641+ log_processor.time.strftime = mock_strftime
642+
643+ data_in = [
644+ ['data_ts', 'account', 'a', 'b', 'c'],
645+ ['2010/10/10 10:00:00', 'acct1', '1', '2', '3'],
646+ ['2010/10/10 10:00:00', 'acct1', '10', '20', '30'],
647+ ['2008/03/06 09:00:00', 'acct2', '8', '9', '12'],
648+ ['2005/04/08 16:00:00', 'acct3', '1', '5', '25'],
649+ ]
650+
651+ expected_output = '\n'.join([','.join(row) for row in data_in])
652+ h = hashlib.md5(expected_output).hexdigest()
653+ expected_filename = '%s%s.csv.gz' % (mock_strftime_return, h)
654+
655+ class MockInternalProxy:
656+ def __init__(self, test, daemon, expected_filename,
657+ expected_output):
658+ self.test = test
659+ self.daemon = daemon
660+ self.expected_filename = expected_filename
661+ self.expected_output = expected_output
662+
663+ def upload_file(self, f, account, container, filename):
664+ self.test.assertEquals(self.daemon.log_processor_account,
665+ account)
666+ self.test.assertEquals(self.daemon.log_processor_container,
667+ container)
668+ self.test.assertEquals(self.expected_filename, filename)
669+ self.test.assertEquals(self.expected_output, f.getvalue())
670+
671+ class MockLogProcessor:
672+ def __init__(self, test, daemon, expected_filename,
673+ expected_output):
674+ self.internal_proxy = MockInternalProxy(test, daemon,
675+ expected_filename, expected_output)
676+
677+ class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
678+ def __init__(self, test, expected_filename, expected_output):
679+ self.log_processor = MockLogProcessor(test, self,
680+ expected_filename, expected_output)
681+ self.log_processor_account = 'account'
682+ self.log_processor_container = 'container'
683+ self.processed_files_filename = 'filename'
684+
685+ MockLogProcessorDaemon(self, expected_filename, expected_output).\
686+ store_output(data_in)
687+ finally:
688+ log_processor.time.strftime = real_strftime
689+
690+ def test_keylist_mapping(self):
691+        # Kind of lame test to see if the property is both
692+ # generated by a particular method and cached properly.
693+ # The method that actually generates the mapping is
694+ # tested elsewhere.
695+
696+ value_return = 'keylist_mapping'
697+ class MockLogProcessor:
698+ def __init__(self):
699+ self.call_count = 0
700+
701+ def generate_keylist_mapping(self):
702+ self.call_count += 1
703+ return value_return
704+
705+ class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
706+ def __init__(self):
707+ self.log_processor = MockLogProcessor()
708+ self._keylist_mapping = None
709+
710+ d = MockLogProcessorDaemon()
711+ self.assertEquals(value_return, d.keylist_mapping)
712+ self.assertEquals(value_return, d.keylist_mapping)
713+ self.assertEquals(1, d.log_processor.call_count)
714+
715+ def test_process_logs(self):
716+ try:
717+ mock_logs_to_process = 'logs_to_process'
718+ mock_processed_files = 'processed_files'
719+
720+ real_multiprocess_collate = log_processor.multiprocess_collate
721+ multiprocess_collate_return = 'multiprocess_collate_return'
722+
723+ get_aggregate_data_return = 'get_aggregate_data_return'
724+ get_final_info_return = 'get_final_info_return'
725+ get_output_return = 'get_output_return'
726+
727+ class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
728+ def __init__(self, test):
729+ self.test = test
730+ self.total_conf = 'total_conf'
731+ self.logger = 'logger'
732+ self.worker_count = 'worker_count'
733+
734+ def get_aggregate_data(self, processed_files, results):
735+ self.test.assertEquals(mock_processed_files, processed_files)
736+ self.test.assertEquals(multiprocess_collate_return, results)
737+ return get_aggregate_data_return
738+
739+ def get_final_info(self, aggr_data):
740+ self.test.assertEquals(get_aggregate_data_return, aggr_data)
741+ return get_final_info_return
742+
743+ def get_output(self, final_info):
744+ self.test.assertEquals(get_final_info_return, final_info)
745+ return get_output_return
746+
747+ d = MockLogProcessorDaemon(self)
748+
749+ def mock_multiprocess_collate(processor_args, logs_to_process,
750+ worker_count):
751+ self.assertEquals(d.total_conf, processor_args[0])
752+ self.assertEquals(d.logger, processor_args[1])
753+
754+ self.assertEquals(mock_logs_to_process, logs_to_process)
755+ self.assertEquals(d.worker_count, worker_count)
756+
757+ return multiprocess_collate_return
758+
759+ log_processor.multiprocess_collate = mock_multiprocess_collate
760+
761+ output = d.process_logs(mock_logs_to_process, mock_processed_files)
762+ self.assertEquals(get_output_return, output)
763+ finally:
764+ log_processor.multiprocess_collate = real_multiprocess_collate
765+
766+ def test_run_once_get_processed_files_list_returns_none(self):
767+ class MockLogProcessor:
768+ def get_data_list(self, lookback_start, lookback_end,
769+ processed_files):
770+ raise unittest.TestCase.failureException, \
771+ 'Method should not be called'
772+
773+ class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
774+ def __init__(self):
775+ self.logger = DumbLogger()
776+ self.log_processor = MockLogProcessor()
777+
778+ def get_lookback_interval(self):
779+ return None, None
780+
781+ def get_processed_files_list(self):
782+ return None
783+
784+ MockLogProcessorDaemon().run_once()
785+
786+ def test_run_once_no_logs_to_process(self):
787+ class MockLogProcessor():
788+ def __init__(self, daemon, test):
789+ self.daemon = daemon
790+ self.test = test
791+
792+ def get_data_list(self, lookback_start, lookback_end,
793+ processed_files):
794+ self.test.assertEquals(self.daemon.lookback_start,
795+ lookback_start)
796+ self.test.assertEquals(self.daemon.lookback_end,
797+ lookback_end)
798+ self.test.assertEquals(self.daemon.processed_files,
799+ processed_files)
800+ return []
801+
802+ class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
803+ def __init__(self, test):
804+ self.logger = DumbLogger()
805+ self.log_processor = MockLogProcessor(self, test)
806+ self.lookback_start = 'lookback_start'
807+ self.lookback_end = 'lookback_end'
808+ self.processed_files = ['a', 'b', 'c']
809+
810+ def get_lookback_interval(self):
811+ return self.lookback_start, self.lookback_end
812+
813+ def get_processed_files_list(self):
814+ return self.processed_files
815+
816+ def process_logs(logs_to_process, processed_files):
817+ raise unittest.TestCase.failureException, \
818+ 'Method should not be called'
819+
820+ MockLogProcessorDaemon(self).run_once()
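
For readers following the new docstrings, a minimal sketch (not part of the patch) of the data shapes flowing through the extracted methods, using simplified string keys as the unit tests do; real runs key on tuples like (account, year, month, day, hour) and plugin field tuples. SketchDaemon is a placeholder name.

    # Sketch only, assuming the refactored LogProcessorDaemon from this branch.
    from swift.stats import log_processor


    class SketchDaemon(log_processor.LogProcessorDaemon):
        # Skip the real __init__; supply only what these methods read.
        def __init__(self):
            self._keylist_mapping = {'out_field1': ['field1', 'field2']}


    daemon = SketchDaemon()
    processed_files = set()

    # multiprocess_collate yields (item, per-account-hour counters) pairs.
    input_data = [
        ('plugin1/file1', {'acct1_hour1': {'field1': 1, 'field2': 2}}),
        ('plugin1/file2', {'acct1_hour1': {'field1': 10}}),
    ]

    aggr_data = daemon.get_aggregate_data(processed_files, input_data)
    # aggr_data == {'acct1_hour1': {'field1': 11, 'field2': 2}}
    # processed_files == set(['plugin1/file1', 'plugin1/file2'])

    final_info = daemon.get_final_info(aggr_data)
    # final_info == {'acct1_hour1': {'out_field1': 13}}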