Merge lp:~greglange/swift/lp725215 into lp:~hudson-openstack/swift/trunk
Proposed by Greg Lange
Status: Superseded
Proposed branch: lp:~greglange/swift/lp725215
Merge into: lp:~hudson-openstack/swift/trunk
Diff against target: 691 lines (+517/-69), 2 files modified
  swift/stats/log_processor.py (+107/-66)
  test/unit/stats/test_log_processor.py (+410/-3)
To merge this branch: bzr merge lp:~greglange/swift/lp725215
Related bugs:
Reviewer | Review Type | Date Requested | Status
---|---|---|---
Greg Lange (community) | | | Needs Resubmitting
John Dickinson | | | Needs Fixing

Review via email: mp+54549@code.launchpad.net
This proposal has been superseded by a proposal from 2011-03-30.
Commit message
Description of the change
Refactored the log processing daemon to make it more testable, and added tests for the refactored code.
The refactoring should not have changed how the daemon works at all.
This needs to be tested extensively on staging before being pushed to production.
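As a rough sketch of the testability approach (a module-level now alias that tests can replace, mirroring the pattern in the diff below; the class and helper names here are simplified, illustrative assumptions, not the actual daemon code):

    import datetime

    # Module-level alias so a test can pin the clock without patching datetime itself.
    now = datetime.datetime.now

    class LookbackExample(object):
        """Simplified stand-in for the daemon's lookback calculation."""

        def __init__(self, lookback_hours):
            self.lookback_hours = lookback_hours

        def get_lookback_start(self):
            # Uses the module-level now() so a test can substitute a fixed time.
            delta = datetime.timedelta(hours=self.lookback_hours)
            return (now() - delta).strftime('%Y%m%d%H')

    # In a test, the alias is swapped for a fixed time and restored afterwards:
    #     module.now = lambda: datetime.datetime(2011, 1, 1)
    #     assert LookbackExample(120).get_lookback_start() == '2010122700'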
lp:~greglange/swift/lp725215 updated
254. By Greg Lange <email address hidden>
     fixed test and pep8 problem
Revision history for this message
Greg Lange (greglange):
review: Needs Resubmitting
lp:~greglange/swift/lp725215 updated
Unmerged revisions
Preview Diff
1 | === modified file 'swift/stats/log_processor.py' |
2 | --- swift/stats/log_processor.py 2011-03-15 15:32:27 +0000 |
3 | +++ swift/stats/log_processor.py 2011-03-24 17:14:22 +0000 |
4 | @@ -30,6 +30,8 @@ |
5 | from swift.common.utils import get_logger, readconf |
6 | from swift.common.daemon import Daemon |
7 | |
8 | +now = datetime.datetime.now |
9 | + |
10 | |
11 | class BadFileDownload(Exception): |
12 | def __init__(self, status_code=None): |
13 | @@ -234,27 +236,28 @@ |
14 | self.log_processor_container = c.get('container_name', |
15 | 'log_processing_data') |
16 | self.worker_count = int(c.get('worker_count', '1')) |
17 | + self._keylist_mapping = None |
18 | + self.processed_files_filename = 'processed_files.pickle.gz' |
19 | |
20 | - def run_once(self, *args, **kwargs): |
21 | - self.logger.info(_("Beginning log processing")) |
22 | - start = time.time() |
23 | + def get_lookback_interval(self): |
24 | if self.lookback_hours == 0: |
25 | lookback_start = None |
26 | lookback_end = None |
27 | else: |
28 | delta_hours = datetime.timedelta(hours=self.lookback_hours) |
29 | - lookback_start = datetime.datetime.now() - delta_hours |
30 | + lookback_start = now() - delta_hours |
31 | lookback_start = lookback_start.strftime('%Y%m%d%H') |
32 | if self.lookback_window == 0: |
33 | lookback_end = None |
34 | else: |
35 | delta_window = datetime.timedelta(hours=self.lookback_window) |
36 | - lookback_end = datetime.datetime.now() - \ |
37 | + lookback_end = now() - \ |
38 | delta_hours + \ |
39 | delta_window |
40 | lookback_end = lookback_end.strftime('%Y%m%d%H') |
41 | - self.logger.debug('lookback_start: %s' % lookback_start) |
42 | - self.logger.debug('lookback_end: %s' % lookback_end) |
43 | + return lookback_start, lookback_end |
44 | + |
45 | + def get_processed_files_list(self): |
46 | try: |
47 | # Note: this file (or data set) will grow without bound. |
48 | # In practice, if it becomes a problem (say, after many months of |
49 | @@ -262,43 +265,25 @@ |
50 | # entries. Automatically pruning on each run could be dangerous. |
51 | # There is not a good way to determine when an old entry should be |
52 | # pruned (lookback_hours could be set to anything and could change) |
53 | - processed_files_stream = self.log_processor.get_object_data( |
54 | - self.log_processor_account, |
55 | - self.log_processor_container, |
56 | - 'processed_files.pickle.gz', |
57 | - compressed=True) |
58 | - buf = '\n'.join(x for x in processed_files_stream) |
59 | + stream = self.log_processor.get_object_data( |
60 | + self.log_processor_account, |
61 | + self.log_processor_container, |
62 | + self.processed_files_filename, |
63 | + compressed=True) |
64 | + buf = '\n'.join(x for x in stream) |
65 | if buf: |
66 | - already_processed_files = cPickle.loads(buf) |
67 | + files = cPickle.loads(buf) |
68 | else: |
69 | - already_processed_files = set() |
70 | + return None |
71 | except BadFileDownload, err: |
72 | if err.status_code == 404: |
73 | - already_processed_files = set() |
74 | + files = set() |
75 | else: |
76 | - self.logger.error(_('Log processing unable to load list of ' |
77 | - 'already processed log files')) |
78 | - return |
79 | - self.logger.debug(_('found %d processed files') % \ |
80 | - len(already_processed_files)) |
81 | - logs_to_process = self.log_processor.get_data_list(lookback_start, |
82 | - lookback_end, |
83 | - already_processed_files) |
84 | - self.logger.info(_('loaded %d files to process') % |
85 | - len(logs_to_process)) |
86 | - if not logs_to_process: |
87 | - self.logger.info(_("Log processing done (%0.2f minutes)") % |
88 | - ((time.time() - start) / 60)) |
89 | - return |
90 | - |
91 | - # map |
92 | - processor_args = (self.total_conf, self.logger) |
93 | - results = multiprocess_collate(processor_args, logs_to_process, |
94 | - self.worker_count) |
95 | - |
96 | - #reduce |
97 | + return None |
98 | + return files |
99 | + |
100 | + def get_aggregate_data(self, processed_files, results): |
101 | aggr_data = {} |
102 | - processed_files = already_processed_files |
103 | for item, data in results: |
104 | # since item contains the plugin and the log name, new plugins will |
105 | # "reprocess" the file and the results will be in the final csv. |
106 | @@ -311,14 +296,12 @@ |
107 | # processing plugins need to realize this |
108 | existing_data[i] = current + j |
109 | aggr_data[k] = existing_data |
110 | + return aggr_data |
111 | |
112 | - # group |
113 | - # reduce a large number of keys in aggr_data[k] to a small number of |
114 | - # output keys |
115 | - keylist_mapping = self.log_processor.generate_keylist_mapping() |
116 | + def get_final_info(self, aggr_data): |
117 | final_info = collections.defaultdict(dict) |
118 | for account, data in aggr_data.items(): |
119 | - for key, mapping in keylist_mapping.items(): |
120 | + for key, mapping in self.keylist_mapping.items(): |
121 | if isinstance(mapping, (list, set)): |
122 | value = 0 |
123 | for k in mapping: |
124 | @@ -332,37 +315,95 @@ |
125 | except KeyError: |
126 | value = 0 |
127 | final_info[account][key] = value |
128 | - |
129 | - # output |
130 | - sorted_keylist_mapping = sorted(keylist_mapping) |
131 | - columns = 'data_ts,account,' + ','.join(sorted_keylist_mapping) |
132 | - out_buf = [columns] |
133 | + return final_info |
134 | + |
135 | + def store_processed_files_list(self, processed_files): |
136 | + s = cPickle.dumps(processed_files, cPickle.HIGHEST_PROTOCOL) |
137 | + f = cStringIO.StringIO(s) |
138 | + self.log_processor.internal_proxy.upload_file(f, |
139 | + self.log_processor_account, |
140 | + self.log_processor_container, |
141 | + self.processed_files_filename) |
142 | + |
143 | + def get_output(self, final_info): |
144 | + sorted_keylist_mapping = sorted(self.keylist_mapping) |
145 | + columns = ['data_ts', 'account'] + sorted_keylist_mapping |
146 | + output = [columns] |
147 | for (account, year, month, day, hour), d in final_info.items(): |
148 | - data_ts = '%s/%s/%s %s:00:00' % (year, month, day, hour) |
149 | - row = [data_ts] |
150 | - row.append('%s' % account) |
151 | + data_ts = '%04d/%02d/%02d %02d:00:00' % \ |
152 | + (int(year), int(month), int(day), int(hour)) |
153 | + row = [data_ts, '%s' % (account)] |
154 | for k in sorted_keylist_mapping: |
155 | - row.append('%s' % d[k]) |
156 | - out_buf.append(','.join(row)) |
157 | - out_buf = '\n'.join(out_buf) |
158 | + row.append(str(d[k])) |
159 | + output.append(row) |
160 | + return output |
161 | + |
162 | + def store_output(self, output): |
163 | + out_buf = '\n'.join([','.join(row) for row in output]) |
164 | h = hashlib.md5(out_buf).hexdigest() |
165 | upload_name = time.strftime('%Y/%m/%d/%H/') + '%s.csv.gz' % h |
166 | f = cStringIO.StringIO(out_buf) |
167 | self.log_processor.internal_proxy.upload_file(f, |
168 | - self.log_processor_account, |
169 | - self.log_processor_container, |
170 | - upload_name) |
171 | - |
172 | - # cleanup |
173 | - s = cPickle.dumps(processed_files, cPickle.HIGHEST_PROTOCOL) |
174 | - f = cStringIO.StringIO(s) |
175 | - self.log_processor.internal_proxy.upload_file(f, |
176 | - self.log_processor_account, |
177 | - self.log_processor_container, |
178 | - 'processed_files.pickle.gz') |
179 | + self.log_processor_account, |
180 | + self.log_processor_container, |
181 | + upload_name) |
182 | + |
183 | + @property |
184 | + def keylist_mapping(self): |
185 | + if self._keylist_mapping == None: |
186 | + self._keylist_mapping = \ |
187 | + self.log_processor.generate_keylist_mapping() |
188 | + return self._keylist_mapping |
189 | + |
190 | + def process_logs(self, logs_to_process, processed_files): |
191 | + # map |
192 | + processor_args = (self.total_conf, self.logger) |
193 | + results = multiprocess_collate(processor_args, logs_to_process, |
194 | + self.worker_count) |
195 | + |
196 | + # reduce |
197 | + aggr_data = self.get_aggregate_data(processed_files, results) |
198 | + del results |
199 | + |
200 | + # group |
201 | + # reduce a large number of keys in aggr_data[k] to a small |
202 | + # number of output keys |
203 | + final_info = self.get_final_info(aggr_data) |
204 | + del aggr_data |
205 | + |
206 | + # output |
207 | + return self.get_output(final_info) |
208 | + |
209 | + def run_once(self, *args, **kwargs): |
210 | + start = time.time() |
211 | + self.logger.info(_("Beginning log processing")) |
212 | + |
213 | + lookback_start, lookback_end = self.get_lookback_interval() |
214 | + self.logger.debug('lookback_start: %s' % lookback_start) |
215 | + self.logger.debug('lookback_end: %s' % lookback_end) |
216 | + |
217 | + processed_files = self.get_processed_files_list() |
218 | + if processed_files == None: |
219 | + self.logger.error(_('Log processing unable to load list of ' |
220 | + 'already processed log files')) |
221 | + return |
222 | + self.logger.debug(_('found %d processed files') % |
223 | + len(processed_files)) |
224 | + |
225 | + logs_to_process = self.log_processor.get_data_list(lookback_start, |
226 | + lookback_end, processed_files) |
227 | + self.logger.info(_('loaded %d files to process') % |
228 | + len(logs_to_process)) |
229 | + |
230 | + if logs_to_process: |
231 | + output = self.process_logs(logs_to_process, processed_files) |
232 | + self.store_output(output) |
233 | + del output |
234 | + |
235 | + self.store_processed_files_list(processed_files) |
236 | |
237 | self.logger.info(_("Log processing done (%0.2f minutes)") % |
238 | - ((time.time() - start) / 60)) |
239 | + ((time.time() - start) / 60)) |
240 | |
241 | |
242 | def multiprocess_collate(processor_args, logs_to_process, worker_count): |
243 | |
244 | === modified file 'test/unit/stats/test_log_processor.py' |
245 | --- test/unit/stats/test_log_processor.py 2011-03-17 22:22:43 +0000 |
246 | +++ test/unit/stats/test_log_processor.py 2011-03-24 17:14:22 +0000 |
247 | @@ -16,6 +16,10 @@ |
248 | import unittest |
249 | from test.unit import tmpfile |
250 | import Queue |
251 | +import datetime |
252 | +import hashlib |
253 | +import pickle |
254 | +import time |
255 | |
256 | from swift.common import internal_proxy |
257 | from swift.stats import log_processor |
258 | @@ -26,7 +30,6 @@ |
259 | def __init__(self, *args, **kwargs): |
260 | pass |
261 | |
262 | - |
263 | class DumbLogger(object): |
264 | def __getattr__(self, n): |
265 | return self.foo |
266 | @@ -77,7 +80,7 @@ |
267 | return self.code, data() |
268 | |
269 | class TestLogProcessor(unittest.TestCase): |
270 | - |
271 | + |
272 | access_test_line = 'Jul 9 04:14:30 saio proxy-server 1.2.3.4 4.5.6.7 '\ |
273 | '09/Jul/2010/04/14/30 GET '\ |
274 | '/v1/acct/foo/bar?format=json&foo HTTP/1.0 200 - '\ |
275 | @@ -85,7 +88,7 @@ |
276 | '6 95 - txfa431231-7f07-42fd-8fc7-7da9d8cc1f90 - 0.0262' |
277 | stats_test_line = 'account,1,2,3' |
278 | proxy_config = {'log-processor': { |
279 | - |
280 | + |
281 | } |
282 | } |
283 | |
284 | @@ -426,3 +429,407 @@ |
285 | finally: |
286 | log_processor.LogProcessor._internal_proxy = None |
287 | log_processor.LogProcessor.get_object_data = orig_get_object_data |
288 | + |
289 | +class TestLogProcessorDaemon(unittest.TestCase): |
290 | + |
291 | + def test_get_lookback_interval(self): |
292 | + class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): |
293 | + def __init__(self, lookback_hours, lookback_window): |
294 | + self.lookback_hours = lookback_hours |
295 | + self.lookback_window = lookback_window |
296 | + |
297 | + try: |
298 | + d = datetime.datetime |
299 | + |
300 | + for x in [ |
301 | + [d(2011, 1, 1), 0, 0, None, None], |
302 | + [d(2011, 1, 1), 120, 0, '2010122700', None], |
303 | + [d(2011, 1, 1), 120, 24, '2010122700', '2010122800'], |
304 | + [d(2010, 1, 2, 3, 4), 120, 48, '2009122803', '2009123003'], |
305 | + [d(2009, 5, 6, 7, 8), 1200, 100, '2009031707', '2009032111'], |
306 | + [d(2008, 9, 10, 11, 12), 3000, 1000, '2008050811', '2008061903'], |
307 | + ]: |
308 | + |
309 | + log_processor.now = lambda: x[0] |
310 | + |
311 | + d = MockLogProcessorDaemon(x[1], x[2]) |
312 | + self.assertEquals((x[3], x[4]), d.get_lookback_interval()) |
313 | + finally: |
314 | + log_processor.now = datetime.datetime.now |
315 | + |
316 | + def test_get_processed_files_list(self): |
317 | + class MockLogProcessor(): |
318 | + def __init__(self, stream): |
319 | + self.stream = stream |
320 | + |
321 | + def get_object_data(self, *args, **kwargs): |
322 | + return self.stream |
323 | + |
324 | + class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): |
325 | + def __init__(self, stream): |
326 | + self.log_processor = MockLogProcessor(stream) |
327 | + self.log_processor_account = 'account' |
328 | + self.log_processor_container = 'container' |
329 | + self.processed_files_filename = 'filename' |
330 | + |
331 | + file_list = set(['a', 'b', 'c']) |
332 | + |
333 | + for s, l in [['', None], |
334 | + [pickle.dumps(set()).split('\n'), set()], |
335 | + [pickle.dumps(file_list).split('\n'), file_list], |
336 | + ]: |
337 | + |
338 | + self.assertEquals(l, |
339 | + MockLogProcessorDaemon(s).get_processed_files_list()) |
340 | + |
341 | + def test_get_processed_files_list_bad_file_downloads(self): |
342 | + class MockLogProcessor(): |
343 | + def __init__(self, status_code): |
344 | + self.err = log_processor.BadFileDownload(status_code) |
345 | + |
346 | + def get_object_data(self, *a, **k): |
347 | + raise self.err |
348 | + |
349 | + class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): |
350 | + def __init__(self, status_code): |
351 | + self.log_processor = MockLogProcessor(status_code) |
352 | + self.log_processor_account = 'account' |
353 | + self.log_processor_container = 'container' |
354 | + self.processed_files_filename = 'filename' |
355 | + |
356 | + for c, l in [[404, set()], [503, None], [None, None]]: |
357 | + self.assertEquals(l, |
358 | + MockLogProcessorDaemon(c).get_processed_files_list()) |
359 | + |
360 | + def test_get_aggregate_data(self): |
361 | + # when run "for real" |
362 | + # the various keys/values in the input and output |
363 | + # dictionaries are often not simple strings |
364 | + # for testing we can use keys that are easier to work with |
365 | + |
366 | + processed_files = set() |
367 | + |
368 | + data_in = [ |
369 | + ['file1', { |
370 | + 'acct1_time1': {'field1': 1, 'field2': 2, 'field3': 3}, |
371 | + 'acct1_time2': {'field1': 4, 'field2': 5}, |
372 | + 'acct2_time1': {'field1': 6, 'field2': 7}, |
373 | + 'acct3_time3': {'field1': 8, 'field2': 9}, |
374 | + } |
375 | + ], |
376 | + ['file2', {'acct1_time1': {'field1': 10}}], |
377 | + ] |
378 | + |
379 | + expected_data_out = { |
380 | + 'acct1_time1': {'field1': 11, 'field2': 2, 'field3': 3}, |
381 | + 'acct1_time2': {'field1': 4, 'field2': 5}, |
382 | + 'acct2_time1': {'field1': 6, 'field2': 7}, |
383 | + 'acct3_time3': {'field1': 8, 'field2': 9}, |
384 | + } |
385 | + |
386 | + class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): |
387 | + def __init__(self): |
388 | + pass |
389 | + |
390 | + d = MockLogProcessorDaemon() |
391 | + data_out = d.get_aggregate_data(processed_files, data_in) |
392 | + |
393 | + for k, v in expected_data_out.items(): |
394 | + self.assertEquals(v, data_out[k]) |
395 | + |
396 | + self.assertEquals(set(['file1', 'file2']), processed_files) |
397 | + |
398 | + def test_get_final_info(self): |
399 | + # when run "for real" |
400 | + # the various keys/values in the input and output |
401 | + # dictionaries are often not simple strings |
402 | + # for testing we can use keys/values that are easier to work with |
403 | + |
404 | + class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): |
405 | + def __init__(self): |
406 | + self._keylist_mapping = { |
407 | + 'out_field1':['field1', 'field2', 'field3'], |
408 | + 'out_field2':['field2', 'field3'], |
409 | + 'out_field3':['field3'], |
410 | + 'out_field4':'field4', |
411 | + 'out_field5':['field6', 'field7', 'field8'], |
412 | + 'out_field6':['field6'], |
413 | + 'out_field7':'field7', |
414 | + } |
415 | + |
416 | + data_in = { |
417 | + 'acct1_time1': {'field1': 11, 'field2': 2, 'field3': 3, |
418 | + 'field4': 8, 'field5': 11}, |
419 | + 'acct1_time2': {'field1': 4, 'field2': 5}, |
420 | + 'acct2_time1': {'field1': 6, 'field2': 7}, |
421 | + 'acct3_time3': {'field1': 8, 'field2': 9}, |
422 | + } |
423 | + |
424 | + expected_data_out = { |
425 | + 'acct1_time1': {'out_field1': 16, 'out_field2': 5, |
426 | + 'out_field3': 3, 'out_field4': 8, 'out_field5': 0, |
427 | + 'out_field6': 0, 'out_field7': 0,}, |
428 | + 'acct1_time2': {'out_field1': 9, 'out_field2': 5, |
429 | + 'out_field3': 0, 'out_field4': 0, 'out_field5': 0, |
430 | + 'out_field6': 0, 'out_field7': 0,}, |
431 | + 'acct2_time1': {'out_field1': 13, 'out_field2': 7, |
432 | + 'out_field3': 0, 'out_field4': 0, 'out_field5': 0, |
433 | + 'out_field6': 0, 'out_field7': 0,}, |
434 | + 'acct3_time3': {'out_field1': 17, 'out_field2': 9, |
435 | + 'out_field3': 0, 'out_field4': 0, 'out_field5': 0, |
436 | + 'out_field6': 0, 'out_field7': 0,}, |
437 | + } |
438 | + |
439 | + self.assertEquals(expected_data_out, |
440 | + MockLogProcessorDaemon().get_final_info(data_in)) |
441 | + |
442 | + def test_store_processed_files_list(self): |
443 | + class MockInternalProxy: |
444 | + def __init__(self, test, daemon, processed_files): |
445 | + self.test = test |
446 | + self.daemon = daemon |
447 | + self.processed_files = processed_files |
448 | + |
449 | + def upload_file(self, f, account, container, filename): |
450 | + self.test.assertEquals(self.processed_files, |
451 | + pickle.loads(f.getvalue())) |
452 | + self.test.assertEquals(self.daemon.log_processor_account, |
453 | + account) |
454 | + self.test.assertEquals(self.daemon.log_processor_container, |
455 | + container) |
456 | + self.test.assertEquals(self.daemon.processed_files_filename, |
457 | + filename) |
458 | + |
459 | + class MockLogProcessor: |
460 | + def __init__(self, test, daemon, processed_files): |
461 | + self.internal_proxy = MockInternalProxy(test, daemon, |
462 | + processed_files) |
463 | + |
464 | + class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): |
465 | + def __init__(self, test, processed_files): |
466 | + self.log_processor = \ |
467 | + MockLogProcessor(test, self, processed_files) |
468 | + self.log_processor_account = 'account' |
469 | + self.log_processor_container = 'container' |
470 | + self.processed_files_filename = 'filename' |
471 | + |
472 | + processed_files = set(['a', 'b', 'c']) |
473 | + MockLogProcessorDaemon(self, processed_files).\ |
474 | + store_processed_files_list(processed_files) |
475 | + |
476 | + def test_get_output(self): |
477 | + class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): |
478 | + def __init__(self): |
479 | + self._keylist_mapping = {'a':None, 'b':None, 'c':None} |
480 | + |
481 | + data_in = { |
482 | + ('acct1', 2010, 1, 1, 0): {'a':1, 'b':2, 'c':3}, |
483 | + ('acct1', 2010, 10, 10, 10): {'a':10, 'b':20, 'c':30}, |
484 | + ('acct2', 2008, 3, 6, 9): {'a':8, 'b':9, 'c':12}, |
485 | + ('acct3', 2005, 4, 8, 16): {'a':1, 'b':5, 'c':25}, |
486 | + } |
487 | + |
488 | + expected_data_out = [ |
489 | + ['data_ts', 'account', 'a', 'b', 'c'], |
490 | + ['2010/01/01 00:00:00', 'acct1', '1', '2', '3'], |
491 | + ['2010/10/10 10:00:00', 'acct1', '10', '20', '30'], |
492 | + ['2008/03/06 09:00:00', 'acct2', '8', '9', '12'], |
493 | + ['2005/04/08 16:00:00', 'acct3', '1', '5', '25'], |
494 | + ] |
495 | + |
496 | + data_out = MockLogProcessorDaemon().get_output(data_in) |
497 | + self.assertEquals(expected_data_out[0], data_out[0]) |
498 | + |
499 | + for row in data_out[1:]: |
500 | + self.assert_(row in expected_data_out) |
501 | + |
502 | + for row in expected_data_out[1:]: |
503 | + self.assert_(row in data_out) |
504 | + |
505 | + def test_store_output(self): |
506 | + try: |
507 | + real_strftime = time.strftime |
508 | + mock_strftime_return = '2010/03/02/01/' |
509 | + def mock_strftime(format): |
510 | + self.assertEquals('%Y/%m/%d/%H/', format) |
511 | + return mock_strftime_return |
512 | + log_processor.time.strftime = mock_strftime |
513 | + |
514 | + data_in = [ |
515 | + ['data_ts', 'account', 'a', 'b', 'c'], |
516 | + ['2010/10/10 10:00:00', 'acct1', '1', '2', '3'], |
517 | + ['2010/10/10 10:00:00', 'acct1', '10', '20', '30'], |
518 | + ['2008/03/06 09:00:00', 'acct2', '8', '9', '12'], |
519 | + ['2005/04/08 16:00:00', 'acct3', '1', '5', '25'], |
520 | + ] |
521 | + |
522 | + expected_output = '\n'.join([','.join(row) for row in data_in]) |
523 | + h = hashlib.md5(expected_output).hexdigest() |
524 | + expected_filename = '%s%s.csv.gz' % (mock_strftime_return, h) |
525 | + |
526 | + class MockInternalProxy: |
527 | + def __init__(self, test, daemon, expected_filename, |
528 | + expected_output): |
529 | + self.test = test |
530 | + self.daemon = daemon |
531 | + self.expected_filename = expected_filename |
532 | + self.expected_output = expected_output |
533 | + |
534 | + def upload_file(self, f, account, container, filename): |
535 | + self.test.assertEquals(self.daemon.log_processor_account, |
536 | + account) |
537 | + self.test.assertEquals(self.daemon.log_processor_container, |
538 | + container) |
539 | + self.test.assertEquals(self.expected_filename, filename) |
540 | + self.test.assertEquals(self.expected_output, f.getvalue()) |
541 | + |
542 | + class MockLogProcessor: |
543 | + def __init__(self, test, daemon, expected_filename, |
544 | + expected_output): |
545 | + self.internal_proxy = MockInternalProxy(test, daemon, |
546 | + expected_filename, expected_output) |
547 | + |
548 | + class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): |
549 | + def __init__(self, test, expected_filename, expected_output): |
550 | + self.log_processor = MockLogProcessor(test, self, |
551 | + expected_filename, expected_output) |
552 | + self.log_processor_account = 'account' |
553 | + self.log_processor_container = 'container' |
554 | + self.processed_files_filename = 'filename' |
555 | + |
556 | + MockLogProcessorDaemon(self, expected_filename, expected_output).\ |
557 | + store_output(data_in) |
558 | + finally: |
559 | + log_processor.time.strftime = real_strftime |
560 | + |
561 | + def test_keylist_mapping(self): |
562 | + # Kind of lame test to see if the propery is both |
563 | + # generated by a particular method and cached properly. |
564 | + # The method that actually generates the mapping is |
565 | + # tested elsewhere. |
566 | + |
567 | + value_return = 'keylist_mapping' |
568 | + class MockLogProcessor: |
569 | + def __init__(self): |
570 | + self.call_count = 0 |
571 | + |
572 | + def generate_keylist_mapping(self): |
573 | + self.call_count += 1 |
574 | + return value_return |
575 | + |
576 | + class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): |
577 | + def __init__(self): |
578 | + self.log_processor = MockLogProcessor() |
579 | + self._keylist_mapping = None |
580 | + |
581 | + d = MockLogProcessorDaemon() |
582 | + self.assertEquals(value_return, d.keylist_mapping) |
583 | + self.assertEquals(value_return, d.keylist_mapping) |
584 | + self.assertEquals(1, d.log_processor.call_count) |
585 | + |
586 | + def test_process_logs(self): |
587 | + try: |
588 | + mock_logs_to_process = 'logs_to_process' |
589 | + mock_processed_files = 'processed_files' |
590 | + |
591 | + real_multiprocess_collate = log_processor.multiprocess_collate |
592 | + multiprocess_collate_return = 'multiprocess_collate_return' |
593 | + |
594 | + get_aggregate_data_return = 'get_aggregate_data_return' |
595 | + get_final_info_return = 'get_final_info_return' |
596 | + get_output_return = 'get_output_return' |
597 | + |
598 | + class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): |
599 | + def __init__(self, test): |
600 | + self.test = test |
601 | + self.total_conf = 'total_conf' |
602 | + self.logger = 'logger' |
603 | + self.worker_count = 'worker_count' |
604 | + |
605 | + def get_aggregate_data(self, processed_files, results): |
606 | + self.test.assertEquals(mock_processed_files, processed_files) |
607 | + self.test.assertEquals(multiprocess_collate_return, results) |
608 | + return get_aggregate_data_return |
609 | + |
610 | + def get_final_info(self, aggr_data): |
611 | + self.test.assertEquals(get_aggregate_data_return, aggr_data) |
612 | + return get_final_info_return |
613 | + |
614 | + def get_output(self, final_info): |
615 | + self.test.assertEquals(get_final_info_return, final_info) |
616 | + return get_output_return |
617 | + |
618 | + d = MockLogProcessorDaemon(self) |
619 | + |
620 | + def mock_multiprocess_collate(processor_args, logs_to_process, |
621 | + worker_count): |
622 | + self.assertEquals(d.total_conf, processor_args[0]) |
623 | + self.assertEquals(d.logger, processor_args[1]) |
624 | + |
625 | + self.assertEquals(mock_logs_to_process, logs_to_process) |
626 | + self.assertEquals(d.worker_count, worker_count) |
627 | + |
628 | + return multiprocess_collate_return |
629 | + |
630 | + log_processor.multiprocess_collate = mock_multiprocess_collate |
631 | + |
632 | + output = d.process_logs(mock_logs_to_process, mock_processed_files) |
633 | + self.assertEquals(get_output_return, output) |
634 | + finally: |
635 | + log_processor.multiprocess_collate = real_multiprocess_collate |
636 | + |
637 | + def test_run_once_get_processed_files_list_returns_none(self): |
638 | + class MockLogProcessor: |
639 | + def get_data_list(self, lookback_start, lookback_end, |
640 | + processed_files): |
641 | + raise unittest.TestCase.failureException, \ |
642 | + 'Method should not be called' |
643 | + |
644 | + class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): |
645 | + def __init__(self): |
646 | + self.logger = DumbLogger() |
647 | + self.log_processor = MockLogProcessor() |
648 | + |
649 | + def get_lookback_interval(self): |
650 | + return None, None |
651 | + |
652 | + def get_processed_files_list(self): |
653 | + return None |
654 | + |
655 | + MockLogProcessorDaemon().run_once() |
656 | + |
657 | + def test_run_once_no_logs_to_process(self): |
658 | + class MockLogProcessor(): |
659 | + def __init__(self, daemon, test): |
660 | + self.daemon = daemon |
661 | + self.test = test |
662 | + |
663 | + def get_data_list(self, lookback_start, lookback_end, |
664 | + processed_files): |
665 | + self.test.assertEquals(self.daemon.lookback_start, |
666 | + lookback_start) |
667 | + self.test.assertEquals(self.daemon.lookback_end, |
668 | + lookback_end) |
669 | + self.test.assertEquals(self.daemon.processed_files, |
670 | + processed_files) |
671 | + return [] |
672 | + |
673 | + class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): |
674 | + def __init__(self, test): |
675 | + self.logger = DumbLogger() |
676 | + self.log_processor = MockLogProcessor(self, test) |
677 | + self.lookback_start = 'lookback_start' |
678 | + self.lookback_end = 'lookback_end' |
679 | + self.processed_files = ['a', 'b', 'c'] |
680 | + |
681 | + def get_lookback_interval(self): |
682 | + return self.lookback_start, self.lookback_end |
683 | + |
684 | + def get_processed_files_list(self): |
685 | + return self.processed_files |
686 | + |
687 | + def process_logs(logs_to_process, processed_files): |
688 | + raise unittest.TestCase.failureException, \ |
689 | + 'Method should not be called' |
690 | + |
691 | + MockLogProcessorDaemon(self).run_once() |
pep8 error on line 35, log_processor.py.
test_get_output seems broken. Perhaps you forgot "in" on the self.assert_() calls (lines 645, 648)? Another option is to sort the expected output and the actual output and compare them for equality.
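A minimal sketch of the sort-and-compare option, assuming data_out and expected_data_out are the lists of rows used in test_get_output (header row first; names taken from that test):

    # Header row must match exactly; the remaining rows may come back in any
    # order, so sort both sides before comparing for equality.
    self.assertEquals(expected_data_out[0], data_out[0])
    self.assertEquals(sorted(expected_data_out[1:]), sorted(data_out[1:]))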