Merge lp:~greglange/swift/lp725215 into lp:~hudson-openstack/swift/trunk

Proposed by Greg Lange
Status: Superseded
Proposed branch: lp:~greglange/swift/lp725215
Merge into: lp:~hudson-openstack/swift/trunk
Diff against target: 691 lines (+517/-69)
2 files modified
swift/stats/log_processor.py (+107/-66)
test/unit/stats/test_log_processor.py (+410/-3)
To merge this branch: bzr merge lp:~greglange/swift/lp725215
Reviewer Review Type Date Requested Status
Greg Lange (community) Needs Resubmitting
John Dickinson Needs Fixing
Review via email: mp+54549@code.launchpad.net

This proposal has been superseded by a proposal from 2011-03-30.

Description of the change

Refactored the log processing daemon to make it more testable.

Added tests for that.

The daemon's behavior should not have changed at all.

This needs to be tested extensively on staging before being pushed to production.
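For illustration, a minimal sketch of the testing pattern this refactor enables (it mirrors test_get_lookback_interval in test/unit/stats/test_log_processor.py; the name StubDaemon is my own, not from the branch): the module-level now = datetime.datetime.now alias added in log_processor.py lets a test pin the clock and assert the lookback window against known values.

    # Sketch only -- the real tests live in test/unit/stats/test_log_processor.py.
    import datetime
    from swift.stats import log_processor

    class StubDaemon(log_processor.LogProcessorDaemon):
        def __init__(self, lookback_hours, lookback_window):
            # Skip the real __init__ (which reads config) and set only the
            # attributes that get_lookback_interval() uses.
            self.lookback_hours = lookback_hours
            self.lookback_window = lookback_window

    saved_now = log_processor.now
    try:
        # Pin "now" so the computed interval is deterministic.
        log_processor.now = lambda: datetime.datetime(2011, 1, 1)
        daemon = StubDaemon(120, 24)
        assert daemon.get_lookback_interval() == ('2010122700', '2010122800')
    finally:
        log_processor.now = saved_now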

Revision history for this message
John Dickinson (notmyname) wrote :

pep8 violation at line 35 of log_processor.py.

test_get_output seems broken. Perhaps you forgot "in" on the self.assert_() calls (lines 645, 648)? Another option is to sort the expected output and the given output and compare them for equality.
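A hypothetical sketch of that second option (not code from the branch): keep the header row in place and compare the remaining rows order-insensitively, since get_output() iterates over a dict and row order is not guaranteed.

    # Hypothetical sketch of the sorted-comparison alternative.
    def assert_same_rows(test, expected, actual):
        # Header row must stay first; data rows may come back in any order.
        test.assertEquals(expected[0], actual[0])
        test.assertEquals(sorted(expected[1:]), sorted(actual[1:]))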

review: Needs Fixing
lp:~greglange/swift/lp725215 updated
254. By Greg Lange <email address hidden>

fixed test and pep8 problem

Revision history for this message
Greg Lange (greglange) :
review: Needs Resubmitting
lp:~greglange/swift/lp725215 updated
255. By Greg Lange <email address hidden>

fixed merge conflict

256. By Greg Lange <email address hidden>

added doc strings to new methods


Preview Diff

1=== modified file 'swift/stats/log_processor.py'
2--- swift/stats/log_processor.py 2011-03-15 15:32:27 +0000
3+++ swift/stats/log_processor.py 2011-03-24 17:14:22 +0000
4@@ -30,6 +30,8 @@
5 from swift.common.utils import get_logger, readconf
6 from swift.common.daemon import Daemon
7
8+now = datetime.datetime.now
9+
10
11 class BadFileDownload(Exception):
12 def __init__(self, status_code=None):
13@@ -234,27 +236,28 @@
14 self.log_processor_container = c.get('container_name',
15 'log_processing_data')
16 self.worker_count = int(c.get('worker_count', '1'))
17+ self._keylist_mapping = None
18+ self.processed_files_filename = 'processed_files.pickle.gz'
19
20- def run_once(self, *args, **kwargs):
21- self.logger.info(_("Beginning log processing"))
22- start = time.time()
23+ def get_lookback_interval(self):
24 if self.lookback_hours == 0:
25 lookback_start = None
26 lookback_end = None
27 else:
28 delta_hours = datetime.timedelta(hours=self.lookback_hours)
29- lookback_start = datetime.datetime.now() - delta_hours
30+ lookback_start = now() - delta_hours
31 lookback_start = lookback_start.strftime('%Y%m%d%H')
32 if self.lookback_window == 0:
33 lookback_end = None
34 else:
35 delta_window = datetime.timedelta(hours=self.lookback_window)
36- lookback_end = datetime.datetime.now() - \
37+ lookback_end = now() - \
38 delta_hours + \
39 delta_window
40 lookback_end = lookback_end.strftime('%Y%m%d%H')
41- self.logger.debug('lookback_start: %s' % lookback_start)
42- self.logger.debug('lookback_end: %s' % lookback_end)
43+ return lookback_start, lookback_end
44+
45+ def get_processed_files_list(self):
46 try:
47 # Note: this file (or data set) will grow without bound.
48 # In practice, if it becomes a problem (say, after many months of
49@@ -262,43 +265,25 @@
50 # entries. Automatically pruning on each run could be dangerous.
51 # There is not a good way to determine when an old entry should be
52 # pruned (lookback_hours could be set to anything and could change)
53- processed_files_stream = self.log_processor.get_object_data(
54- self.log_processor_account,
55- self.log_processor_container,
56- 'processed_files.pickle.gz',
57- compressed=True)
58- buf = '\n'.join(x for x in processed_files_stream)
59+ stream = self.log_processor.get_object_data(
60+ self.log_processor_account,
61+ self.log_processor_container,
62+ self.processed_files_filename,
63+ compressed=True)
64+ buf = '\n'.join(x for x in stream)
65 if buf:
66- already_processed_files = cPickle.loads(buf)
67+ files = cPickle.loads(buf)
68 else:
69- already_processed_files = set()
70+ return None
71 except BadFileDownload, err:
72 if err.status_code == 404:
73- already_processed_files = set()
74+ files = set()
75 else:
76- self.logger.error(_('Log processing unable to load list of '
77- 'already processed log files'))
78- return
79- self.logger.debug(_('found %d processed files') % \
80- len(already_processed_files))
81- logs_to_process = self.log_processor.get_data_list(lookback_start,
82- lookback_end,
83- already_processed_files)
84- self.logger.info(_('loaded %d files to process') %
85- len(logs_to_process))
86- if not logs_to_process:
87- self.logger.info(_("Log processing done (%0.2f minutes)") %
88- ((time.time() - start) / 60))
89- return
90-
91- # map
92- processor_args = (self.total_conf, self.logger)
93- results = multiprocess_collate(processor_args, logs_to_process,
94- self.worker_count)
95-
96- #reduce
97+ return None
98+ return files
99+
100+ def get_aggregate_data(self, processed_files, results):
101 aggr_data = {}
102- processed_files = already_processed_files
103 for item, data in results:
104 # since item contains the plugin and the log name, new plugins will
105 # "reprocess" the file and the results will be in the final csv.
106@@ -311,14 +296,12 @@
107 # processing plugins need to realize this
108 existing_data[i] = current + j
109 aggr_data[k] = existing_data
110+ return aggr_data
111
112- # group
113- # reduce a large number of keys in aggr_data[k] to a small number of
114- # output keys
115- keylist_mapping = self.log_processor.generate_keylist_mapping()
116+ def get_final_info(self, aggr_data):
117 final_info = collections.defaultdict(dict)
118 for account, data in aggr_data.items():
119- for key, mapping in keylist_mapping.items():
120+ for key, mapping in self.keylist_mapping.items():
121 if isinstance(mapping, (list, set)):
122 value = 0
123 for k in mapping:
124@@ -332,37 +315,95 @@
125 except KeyError:
126 value = 0
127 final_info[account][key] = value
128-
129- # output
130- sorted_keylist_mapping = sorted(keylist_mapping)
131- columns = 'data_ts,account,' + ','.join(sorted_keylist_mapping)
132- out_buf = [columns]
133+ return final_info
134+
135+ def store_processed_files_list(self, processed_files):
136+ s = cPickle.dumps(processed_files, cPickle.HIGHEST_PROTOCOL)
137+ f = cStringIO.StringIO(s)
138+ self.log_processor.internal_proxy.upload_file(f,
139+ self.log_processor_account,
140+ self.log_processor_container,
141+ self.processed_files_filename)
142+
143+ def get_output(self, final_info):
144+ sorted_keylist_mapping = sorted(self.keylist_mapping)
145+ columns = ['data_ts', 'account'] + sorted_keylist_mapping
146+ output = [columns]
147 for (account, year, month, day, hour), d in final_info.items():
148- data_ts = '%s/%s/%s %s:00:00' % (year, month, day, hour)
149- row = [data_ts]
150- row.append('%s' % account)
151+ data_ts = '%04d/%02d/%02d %02d:00:00' % \
152+ (int(year), int(month), int(day), int(hour))
153+ row = [data_ts, '%s' % (account)]
154 for k in sorted_keylist_mapping:
155- row.append('%s' % d[k])
156- out_buf.append(','.join(row))
157- out_buf = '\n'.join(out_buf)
158+ row.append(str(d[k]))
159+ output.append(row)
160+ return output
161+
162+ def store_output(self, output):
163+ out_buf = '\n'.join([','.join(row) for row in output])
164 h = hashlib.md5(out_buf).hexdigest()
165 upload_name = time.strftime('%Y/%m/%d/%H/') + '%s.csv.gz' % h
166 f = cStringIO.StringIO(out_buf)
167 self.log_processor.internal_proxy.upload_file(f,
168- self.log_processor_account,
169- self.log_processor_container,
170- upload_name)
171-
172- # cleanup
173- s = cPickle.dumps(processed_files, cPickle.HIGHEST_PROTOCOL)
174- f = cStringIO.StringIO(s)
175- self.log_processor.internal_proxy.upload_file(f,
176- self.log_processor_account,
177- self.log_processor_container,
178- 'processed_files.pickle.gz')
179+ self.log_processor_account,
180+ self.log_processor_container,
181+ upload_name)
182+
183+ @property
184+ def keylist_mapping(self):
185+ if self._keylist_mapping == None:
186+ self._keylist_mapping = \
187+ self.log_processor.generate_keylist_mapping()
188+ return self._keylist_mapping
189+
190+ def process_logs(self, logs_to_process, processed_files):
191+ # map
192+ processor_args = (self.total_conf, self.logger)
193+ results = multiprocess_collate(processor_args, logs_to_process,
194+ self.worker_count)
195+
196+ # reduce
197+ aggr_data = self.get_aggregate_data(processed_files, results)
198+ del results
199+
200+ # group
201+ # reduce a large number of keys in aggr_data[k] to a small
202+ # number of output keys
203+ final_info = self.get_final_info(aggr_data)
204+ del aggr_data
205+
206+ # output
207+ return self.get_output(final_info)
208+
209+ def run_once(self, *args, **kwargs):
210+ start = time.time()
211+ self.logger.info(_("Beginning log processing"))
212+
213+ lookback_start, lookback_end = self.get_lookback_interval()
214+ self.logger.debug('lookback_start: %s' % lookback_start)
215+ self.logger.debug('lookback_end: %s' % lookback_end)
216+
217+ processed_files = self.get_processed_files_list()
218+ if processed_files == None:
219+ self.logger.error(_('Log processing unable to load list of '
220+ 'already processed log files'))
221+ return
222+ self.logger.debug(_('found %d processed files') %
223+ len(processed_files))
224+
225+ logs_to_process = self.log_processor.get_data_list(lookback_start,
226+ lookback_end, processed_files)
227+ self.logger.info(_('loaded %d files to process') %
228+ len(logs_to_process))
229+
230+ if logs_to_process:
231+ output = self.process_logs(logs_to_process, processed_files)
232+ self.store_output(output)
233+ del output
234+
235+ self.store_processed_files_list(processed_files)
236
237 self.logger.info(_("Log processing done (%0.2f minutes)") %
238- ((time.time() - start) / 60))
239+ ((time.time() - start) / 60))
240
241
242 def multiprocess_collate(processor_args, logs_to_process, worker_count):
243
244=== modified file 'test/unit/stats/test_log_processor.py'
245--- test/unit/stats/test_log_processor.py 2011-03-17 22:22:43 +0000
246+++ test/unit/stats/test_log_processor.py 2011-03-24 17:14:22 +0000
247@@ -16,6 +16,10 @@
248 import unittest
249 from test.unit import tmpfile
250 import Queue
251+import datetime
252+import hashlib
253+import pickle
254+import time
255
256 from swift.common import internal_proxy
257 from swift.stats import log_processor
258@@ -26,7 +30,6 @@
259 def __init__(self, *args, **kwargs):
260 pass
261
262-
263 class DumbLogger(object):
264 def __getattr__(self, n):
265 return self.foo
266@@ -77,7 +80,7 @@
267 return self.code, data()
268
269 class TestLogProcessor(unittest.TestCase):
270-
271+
272 access_test_line = 'Jul 9 04:14:30 saio proxy-server 1.2.3.4 4.5.6.7 '\
273 '09/Jul/2010/04/14/30 GET '\
274 '/v1/acct/foo/bar?format=json&foo HTTP/1.0 200 - '\
275@@ -85,7 +88,7 @@
276 '6 95 - txfa431231-7f07-42fd-8fc7-7da9d8cc1f90 - 0.0262'
277 stats_test_line = 'account,1,2,3'
278 proxy_config = {'log-processor': {
279-
280+
281 }
282 }
283
284@@ -426,3 +429,407 @@
285 finally:
286 log_processor.LogProcessor._internal_proxy = None
287 log_processor.LogProcessor.get_object_data = orig_get_object_data
288+
289+class TestLogProcessorDaemon(unittest.TestCase):
290+
291+ def test_get_lookback_interval(self):
292+ class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
293+ def __init__(self, lookback_hours, lookback_window):
294+ self.lookback_hours = lookback_hours
295+ self.lookback_window = lookback_window
296+
297+ try:
298+ d = datetime.datetime
299+
300+ for x in [
301+ [d(2011, 1, 1), 0, 0, None, None],
302+ [d(2011, 1, 1), 120, 0, '2010122700', None],
303+ [d(2011, 1, 1), 120, 24, '2010122700', '2010122800'],
304+ [d(2010, 1, 2, 3, 4), 120, 48, '2009122803', '2009123003'],
305+ [d(2009, 5, 6, 7, 8), 1200, 100, '2009031707', '2009032111'],
306+ [d(2008, 9, 10, 11, 12), 3000, 1000, '2008050811', '2008061903'],
307+ ]:
308+
309+ log_processor.now = lambda: x[0]
310+
311+ d = MockLogProcessorDaemon(x[1], x[2])
312+ self.assertEquals((x[3], x[4]), d.get_lookback_interval())
313+ finally:
314+ log_processor.now = datetime.datetime.now
315+
316+ def test_get_processed_files_list(self):
317+ class MockLogProcessor():
318+ def __init__(self, stream):
319+ self.stream = stream
320+
321+ def get_object_data(self, *args, **kwargs):
322+ return self.stream
323+
324+ class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
325+ def __init__(self, stream):
326+ self.log_processor = MockLogProcessor(stream)
327+ self.log_processor_account = 'account'
328+ self.log_processor_container = 'container'
329+ self.processed_files_filename = 'filename'
330+
331+ file_list = set(['a', 'b', 'c'])
332+
333+ for s, l in [['', None],
334+ [pickle.dumps(set()).split('\n'), set()],
335+ [pickle.dumps(file_list).split('\n'), file_list],
336+ ]:
337+
338+ self.assertEquals(l,
339+ MockLogProcessorDaemon(s).get_processed_files_list())
340+
341+ def test_get_processed_files_list_bad_file_downloads(self):
342+ class MockLogProcessor():
343+ def __init__(self, status_code):
344+ self.err = log_processor.BadFileDownload(status_code)
345+
346+ def get_object_data(self, *a, **k):
347+ raise self.err
348+
349+ class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
350+ def __init__(self, status_code):
351+ self.log_processor = MockLogProcessor(status_code)
352+ self.log_processor_account = 'account'
353+ self.log_processor_container = 'container'
354+ self.processed_files_filename = 'filename'
355+
356+ for c, l in [[404, set()], [503, None], [None, None]]:
357+ self.assertEquals(l,
358+ MockLogProcessorDaemon(c).get_processed_files_list())
359+
360+ def test_get_aggregate_data(self):
361+ # when run "for real"
362+ # the various keys/values in the input and output
363+ # dictionaries are often not simple strings
364+ # for testing we can use keys that are easier to work with
365+
366+ processed_files = set()
367+
368+ data_in = [
369+ ['file1', {
370+ 'acct1_time1': {'field1': 1, 'field2': 2, 'field3': 3},
371+ 'acct1_time2': {'field1': 4, 'field2': 5},
372+ 'acct2_time1': {'field1': 6, 'field2': 7},
373+ 'acct3_time3': {'field1': 8, 'field2': 9},
374+ }
375+ ],
376+ ['file2', {'acct1_time1': {'field1': 10}}],
377+ ]
378+
379+ expected_data_out = {
380+ 'acct1_time1': {'field1': 11, 'field2': 2, 'field3': 3},
381+ 'acct1_time2': {'field1': 4, 'field2': 5},
382+ 'acct2_time1': {'field1': 6, 'field2': 7},
383+ 'acct3_time3': {'field1': 8, 'field2': 9},
384+ }
385+
386+ class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
387+ def __init__(self):
388+ pass
389+
390+ d = MockLogProcessorDaemon()
391+ data_out = d.get_aggregate_data(processed_files, data_in)
392+
393+ for k, v in expected_data_out.items():
394+ self.assertEquals(v, data_out[k])
395+
396+ self.assertEquals(set(['file1', 'file2']), processed_files)
397+
398+ def test_get_final_info(self):
399+ # when run "for real"
400+ # the various keys/values in the input and output
401+ # dictionaries are often not simple strings
402+ # for testing we can use keys/values that are easier to work with
403+
404+ class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
405+ def __init__(self):
406+ self._keylist_mapping = {
407+ 'out_field1':['field1', 'field2', 'field3'],
408+ 'out_field2':['field2', 'field3'],
409+ 'out_field3':['field3'],
410+ 'out_field4':'field4',
411+ 'out_field5':['field6', 'field7', 'field8'],
412+ 'out_field6':['field6'],
413+ 'out_field7':'field7',
414+ }
415+
416+ data_in = {
417+ 'acct1_time1': {'field1': 11, 'field2': 2, 'field3': 3,
418+ 'field4': 8, 'field5': 11},
419+ 'acct1_time2': {'field1': 4, 'field2': 5},
420+ 'acct2_time1': {'field1': 6, 'field2': 7},
421+ 'acct3_time3': {'field1': 8, 'field2': 9},
422+ }
423+
424+ expected_data_out = {
425+ 'acct1_time1': {'out_field1': 16, 'out_field2': 5,
426+ 'out_field3': 3, 'out_field4': 8, 'out_field5': 0,
427+ 'out_field6': 0, 'out_field7': 0,},
428+ 'acct1_time2': {'out_field1': 9, 'out_field2': 5,
429+ 'out_field3': 0, 'out_field4': 0, 'out_field5': 0,
430+ 'out_field6': 0, 'out_field7': 0,},
431+ 'acct2_time1': {'out_field1': 13, 'out_field2': 7,
432+ 'out_field3': 0, 'out_field4': 0, 'out_field5': 0,
433+ 'out_field6': 0, 'out_field7': 0,},
434+ 'acct3_time3': {'out_field1': 17, 'out_field2': 9,
435+ 'out_field3': 0, 'out_field4': 0, 'out_field5': 0,
436+ 'out_field6': 0, 'out_field7': 0,},
437+ }
438+
439+ self.assertEquals(expected_data_out,
440+ MockLogProcessorDaemon().get_final_info(data_in))
441+
442+ def test_store_processed_files_list(self):
443+ class MockInternalProxy:
444+ def __init__(self, test, daemon, processed_files):
445+ self.test = test
446+ self.daemon = daemon
447+ self.processed_files = processed_files
448+
449+ def upload_file(self, f, account, container, filename):
450+ self.test.assertEquals(self.processed_files,
451+ pickle.loads(f.getvalue()))
452+ self.test.assertEquals(self.daemon.log_processor_account,
453+ account)
454+ self.test.assertEquals(self.daemon.log_processor_container,
455+ container)
456+ self.test.assertEquals(self.daemon.processed_files_filename,
457+ filename)
458+
459+ class MockLogProcessor:
460+ def __init__(self, test, daemon, processed_files):
461+ self.internal_proxy = MockInternalProxy(test, daemon,
462+ processed_files)
463+
464+ class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
465+ def __init__(self, test, processed_files):
466+ self.log_processor = \
467+ MockLogProcessor(test, self, processed_files)
468+ self.log_processor_account = 'account'
469+ self.log_processor_container = 'container'
470+ self.processed_files_filename = 'filename'
471+
472+ processed_files = set(['a', 'b', 'c'])
473+ MockLogProcessorDaemon(self, processed_files).\
474+ store_processed_files_list(processed_files)
475+
476+ def test_get_output(self):
477+ class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
478+ def __init__(self):
479+ self._keylist_mapping = {'a':None, 'b':None, 'c':None}
480+
481+ data_in = {
482+ ('acct1', 2010, 1, 1, 0): {'a':1, 'b':2, 'c':3},
483+ ('acct1', 2010, 10, 10, 10): {'a':10, 'b':20, 'c':30},
484+ ('acct2', 2008, 3, 6, 9): {'a':8, 'b':9, 'c':12},
485+ ('acct3', 2005, 4, 8, 16): {'a':1, 'b':5, 'c':25},
486+ }
487+
488+ expected_data_out = [
489+ ['data_ts', 'account', 'a', 'b', 'c'],
490+ ['2010/01/01 00:00:00', 'acct1', '1', '2', '3'],
491+ ['2010/10/10 10:00:00', 'acct1', '10', '20', '30'],
492+ ['2008/03/06 09:00:00', 'acct2', '8', '9', '12'],
493+ ['2005/04/08 16:00:00', 'acct3', '1', '5', '25'],
494+ ]
495+
496+ data_out = MockLogProcessorDaemon().get_output(data_in)
497+ self.assertEquals(expected_data_out[0], data_out[0])
498+
499+ for row in data_out[1:]:
500+ self.assert_(row in expected_data_out)
501+
502+ for row in expected_data_out[1:]:
503+ self.assert_(row in data_out)
504+
505+ def test_store_output(self):
506+ try:
507+ real_strftime = time.strftime
508+ mock_strftime_return = '2010/03/02/01/'
509+ def mock_strftime(format):
510+ self.assertEquals('%Y/%m/%d/%H/', format)
511+ return mock_strftime_return
512+ log_processor.time.strftime = mock_strftime
513+
514+ data_in = [
515+ ['data_ts', 'account', 'a', 'b', 'c'],
516+ ['2010/10/10 10:00:00', 'acct1', '1', '2', '3'],
517+ ['2010/10/10 10:00:00', 'acct1', '10', '20', '30'],
518+ ['2008/03/06 09:00:00', 'acct2', '8', '9', '12'],
519+ ['2005/04/08 16:00:00', 'acct3', '1', '5', '25'],
520+ ]
521+
522+ expected_output = '\n'.join([','.join(row) for row in data_in])
523+ h = hashlib.md5(expected_output).hexdigest()
524+ expected_filename = '%s%s.csv.gz' % (mock_strftime_return, h)
525+
526+ class MockInternalProxy:
527+ def __init__(self, test, daemon, expected_filename,
528+ expected_output):
529+ self.test = test
530+ self.daemon = daemon
531+ self.expected_filename = expected_filename
532+ self.expected_output = expected_output
533+
534+ def upload_file(self, f, account, container, filename):
535+ self.test.assertEquals(self.daemon.log_processor_account,
536+ account)
537+ self.test.assertEquals(self.daemon.log_processor_container,
538+ container)
539+ self.test.assertEquals(self.expected_filename, filename)
540+ self.test.assertEquals(self.expected_output, f.getvalue())
541+
542+ class MockLogProcessor:
543+ def __init__(self, test, daemon, expected_filename,
544+ expected_output):
545+ self.internal_proxy = MockInternalProxy(test, daemon,
546+ expected_filename, expected_output)
547+
548+ class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
549+ def __init__(self, test, expected_filename, expected_output):
550+ self.log_processor = MockLogProcessor(test, self,
551+ expected_filename, expected_output)
552+ self.log_processor_account = 'account'
553+ self.log_processor_container = 'container'
554+ self.processed_files_filename = 'filename'
555+
556+ MockLogProcessorDaemon(self, expected_filename, expected_output).\
557+ store_output(data_in)
558+ finally:
559+ log_processor.time.strftime = real_strftime
560+
561+ def test_keylist_mapping(self):
562+ # Kind of lame test to see if the propery is both
563+ # generated by a particular method and cached properly.
564+ # The method that actually generates the mapping is
565+ # tested elsewhere.
566+
567+ value_return = 'keylist_mapping'
568+ class MockLogProcessor:
569+ def __init__(self):
570+ self.call_count = 0
571+
572+ def generate_keylist_mapping(self):
573+ self.call_count += 1
574+ return value_return
575+
576+ class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
577+ def __init__(self):
578+ self.log_processor = MockLogProcessor()
579+ self._keylist_mapping = None
580+
581+ d = MockLogProcessorDaemon()
582+ self.assertEquals(value_return, d.keylist_mapping)
583+ self.assertEquals(value_return, d.keylist_mapping)
584+ self.assertEquals(1, d.log_processor.call_count)
585+
586+ def test_process_logs(self):
587+ try:
588+ mock_logs_to_process = 'logs_to_process'
589+ mock_processed_files = 'processed_files'
590+
591+ real_multiprocess_collate = log_processor.multiprocess_collate
592+ multiprocess_collate_return = 'multiprocess_collate_return'
593+
594+ get_aggregate_data_return = 'get_aggregate_data_return'
595+ get_final_info_return = 'get_final_info_return'
596+ get_output_return = 'get_output_return'
597+
598+ class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
599+ def __init__(self, test):
600+ self.test = test
601+ self.total_conf = 'total_conf'
602+ self.logger = 'logger'
603+ self.worker_count = 'worker_count'
604+
605+ def get_aggregate_data(self, processed_files, results):
606+ self.test.assertEquals(mock_processed_files, processed_files)
607+ self.test.assertEquals(multiprocess_collate_return, results)
608+ return get_aggregate_data_return
609+
610+ def get_final_info(self, aggr_data):
611+ self.test.assertEquals(get_aggregate_data_return, aggr_data)
612+ return get_final_info_return
613+
614+ def get_output(self, final_info):
615+ self.test.assertEquals(get_final_info_return, final_info)
616+ return get_output_return
617+
618+ d = MockLogProcessorDaemon(self)
619+
620+ def mock_multiprocess_collate(processor_args, logs_to_process,
621+ worker_count):
622+ self.assertEquals(d.total_conf, processor_args[0])
623+ self.assertEquals(d.logger, processor_args[1])
624+
625+ self.assertEquals(mock_logs_to_process, logs_to_process)
626+ self.assertEquals(d.worker_count, worker_count)
627+
628+ return multiprocess_collate_return
629+
630+ log_processor.multiprocess_collate = mock_multiprocess_collate
631+
632+ output = d.process_logs(mock_logs_to_process, mock_processed_files)
633+ self.assertEquals(get_output_return, output)
634+ finally:
635+ log_processor.multiprocess_collate = real_multiprocess_collate
636+
637+ def test_run_once_get_processed_files_list_returns_none(self):
638+ class MockLogProcessor:
639+ def get_data_list(self, lookback_start, lookback_end,
640+ processed_files):
641+ raise unittest.TestCase.failureException, \
642+ 'Method should not be called'
643+
644+ class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
645+ def __init__(self):
646+ self.logger = DumbLogger()
647+ self.log_processor = MockLogProcessor()
648+
649+ def get_lookback_interval(self):
650+ return None, None
651+
652+ def get_processed_files_list(self):
653+ return None
654+
655+ MockLogProcessorDaemon().run_once()
656+
657+ def test_run_once_no_logs_to_process(self):
658+ class MockLogProcessor():
659+ def __init__(self, daemon, test):
660+ self.daemon = daemon
661+ self.test = test
662+
663+ def get_data_list(self, lookback_start, lookback_end,
664+ processed_files):
665+ self.test.assertEquals(self.daemon.lookback_start,
666+ lookback_start)
667+ self.test.assertEquals(self.daemon.lookback_end,
668+ lookback_end)
669+ self.test.assertEquals(self.daemon.processed_files,
670+ processed_files)
671+ return []
672+
673+ class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
674+ def __init__(self, test):
675+ self.logger = DumbLogger()
676+ self.log_processor = MockLogProcessor(self, test)
677+ self.lookback_start = 'lookback_start'
678+ self.lookback_end = 'lookback_end'
679+ self.processed_files = ['a', 'b', 'c']
680+
681+ def get_lookback_interval(self):
682+ return self.lookback_start, self.lookback_end
683+
684+ def get_processed_files_list(self):
685+ return self.processed_files
686+
687+ def process_logs(logs_to_process, processed_files):
688+ raise unittest.TestCase.failureException, \
689+ 'Method should not be called'
690+
691+ MockLogProcessorDaemon(self).run_once()