Merge lp:~greglange/swift/lp725215 into lp:~hudson-openstack/swift/trunk
Proposed by: | Greg Lange |
---|---|
Status: | Merged |
Approved by: | David Goetz |
Approved revision: | 256 |
Merged at revision: | 264 |
Proposed branch: | lp:~greglange/swift/lp725215 |
Merge into: | lp:~hudson-openstack/swift/trunk |
Diff against target: | 820 lines (+641/-76), 2 files modified: swift/stats/log_processor.py (+231/-73), test/unit/stats/test_log_processor.py (+410/-3) |
To merge this branch: | bzr merge lp:~greglange/swift/lp725215 |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
John Dickinson | Approve | ||
Greg Lange | Pending | ||
Review via email: mp+55637@code.launchpad.net
This proposal supersedes a proposal from 2011-03-23.
Commit message
Description of the change
Refactored the log processing daemon to make it more testable, and added tests for the refactored code.
The behavior of the daemon should be unchanged.
This needs to be tested extensively on staging before being pushed to production.
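To illustrate the testability angle, here is a minimal sketch (not part of this branch; StubDaemon and check_lookback are made-up names) of how one of the extracted methods can be exercised in isolation by stubbing __init__ and pinning the module-level now alias; the expected interval mirrors one of the cases in the new unit tests in the diff below.

```python
# A minimal sketch of the testing pattern this refactoring enables, assuming
# the branch is applied. StubDaemon and check_lookback are illustrative names.
import datetime

from swift.stats import log_processor


class StubDaemon(log_processor.LogProcessorDaemon):
    # Skip the real __init__ (config parsing, proxy setup) and set only the
    # attributes that get_lookback_interval() reads.
    def __init__(self, lookback_hours, lookback_window):
        self.lookback_hours = lookback_hours
        self.lookback_window = lookback_window


def check_lookback():
    orig_now = log_processor.now
    try:
        # Pin the module-level `now` alias so the interval is deterministic.
        log_processor.now = lambda: datetime.datetime(2011, 1, 1)
        # 120 hours back, 24 hour window; values mirror the new unit tests.
        start, end = StubDaemon(120, 24).get_lookback_interval()
        assert (start, end) == ('2010122700', '2010122800')
    finally:
        log_processor.now = orig_now
```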
Revision history for this message
John Dickinson (notmyname) wrote: Posted in a previous version of this proposal
review: Needs Fixing
Revision history for this message
Greg Lange (greglange): Posted in a previous version of this proposal
review: Needs Resubmitting
lp:~greglange/swift/lp725215 updated
- 255. By Greg Lange <email address hidden>
  fixed merge conflict
lp:~greglange/swift/lp725215 updated
- 256. By Greg Lange <email address hidden>
  added doc strings to new methods
Preview Diff
1 | === modified file 'swift/stats/log_processor.py' |
2 | --- swift/stats/log_processor.py 2011-03-30 21:10:27 +0000 |
3 | +++ swift/stats/log_processor.py 2011-04-11 21:52:38 +0000 |
4 | @@ -30,6 +30,8 @@ |
5 | from swift.common.utils import get_logger, readconf |
6 | from swift.common.daemon import Daemon |
7 | |
8 | +now = datetime.datetime.now |
9 | + |
10 | |
11 | class BadFileDownload(Exception): |
12 | def __init__(self, status_code=None): |
13 | @@ -234,31 +236,46 @@ |
14 | self.log_processor_container = c.get('container_name', |
15 | 'log_processing_data') |
16 | self.worker_count = int(c.get('worker_count', '1')) |
17 | - |
18 | - def run_once(self, *args, **kwargs): |
19 | - for k in 'lookback_hours lookback_window'.split(): |
20 | - if kwargs[k] is not None: |
21 | - setattr(self, k, kwargs[k]) |
22 | - |
23 | - self.logger.info(_("Beginning log processing")) |
24 | - start = time.time() |
25 | + self._keylist_mapping = None |
26 | + self.processed_files_filename = 'processed_files.pickle.gz' |
27 | + |
28 | + def get_lookback_interval(self): |
29 | + """ |
30 | + :returns: lookback_start, lookback_end. |
31 | + |
32 | + Both or just lookback_end can be None. Otherwise, returns strings |
33 | + of the form 'YYYYMMDDHH'. The interval returned is used as bounds |
34 | + when looking for logs to processes. |
35 | + |
36 | + A returned None means don't limit the log files examined on that |
37 | + side of the interval. |
38 | + """ |
39 | + |
40 | if self.lookback_hours == 0: |
41 | lookback_start = None |
42 | lookback_end = None |
43 | else: |
44 | delta_hours = datetime.timedelta(hours=self.lookback_hours) |
45 | - lookback_start = datetime.datetime.now() - delta_hours |
46 | + lookback_start = now() - delta_hours |
47 | lookback_start = lookback_start.strftime('%Y%m%d%H') |
48 | if self.lookback_window == 0: |
49 | lookback_end = None |
50 | else: |
51 | delta_window = datetime.timedelta(hours=self.lookback_window) |
52 | - lookback_end = datetime.datetime.now() - \ |
53 | + lookback_end = now() - \ |
54 | delta_hours + \ |
55 | delta_window |
56 | lookback_end = lookback_end.strftime('%Y%m%d%H') |
57 | - self.logger.debug('lookback_start: %s' % lookback_start) |
58 | - self.logger.debug('lookback_end: %s' % lookback_end) |
59 | + return lookback_start, lookback_end |
60 | + |
61 | + def get_processed_files_list(self): |
62 | + """ |
63 | + :returns: a set of files that have already been processed or returns |
64 | + None on error. |
65 | + |
66 | + Downloads the set from the stats account. Creates an empty set if |
67 | + the an existing file cannot be found. |
68 | + """ |
69 | try: |
70 | # Note: this file (or data set) will grow without bound. |
71 | # In practice, if it becomes a problem (say, after many months of |
72 | @@ -266,44 +283,52 @@ |
73 | # entries. Automatically pruning on each run could be dangerous. |
74 | # There is not a good way to determine when an old entry should be |
75 | # pruned (lookback_hours could be set to anything and could change) |
76 | - processed_files_stream = self.log_processor.get_object_data( |
77 | - self.log_processor_account, |
78 | - self.log_processor_container, |
79 | - 'processed_files.pickle.gz', |
80 | - compressed=True) |
81 | - buf = '\n'.join(x for x in processed_files_stream) |
82 | + stream = self.log_processor.get_object_data( |
83 | + self.log_processor_account, |
84 | + self.log_processor_container, |
85 | + self.processed_files_filename, |
86 | + compressed=True) |
87 | + buf = '\n'.join(x for x in stream) |
88 | if buf: |
89 | - already_processed_files = cPickle.loads(buf) |
90 | + files = cPickle.loads(buf) |
91 | else: |
92 | - already_processed_files = set() |
93 | + return None |
94 | except BadFileDownload, err: |
95 | if err.status_code == 404: |
96 | - already_processed_files = set() |
97 | + files = set() |
98 | else: |
99 | - self.logger.error(_('Log processing unable to load list of ' |
100 | - 'already processed log files')) |
101 | - return |
102 | - self.logger.debug(_('found %d processed files') % \ |
103 | - len(already_processed_files)) |
104 | - logs_to_process = self.log_processor.get_data_list(lookback_start, |
105 | - lookback_end, |
106 | - already_processed_files) |
107 | - self.logger.info(_('loaded %d files to process') % |
108 | - len(logs_to_process)) |
109 | - if not logs_to_process: |
110 | - self.logger.info(_("Log processing done (%0.2f minutes)") % |
111 | - ((time.time() - start) / 60)) |
112 | - return |
113 | - |
114 | - # map |
115 | - processor_args = (self.total_conf, self.logger) |
116 | - results = multiprocess_collate(processor_args, logs_to_process, |
117 | - self.worker_count) |
118 | - |
119 | - #reduce |
120 | + return None |
121 | + return files |
122 | + |
123 | + def get_aggregate_data(self, processed_files, input_data): |
124 | + """ |
125 | + Aggregates stats data by account/hour, summing as needed. |
126 | + |
127 | + :param processed_files: set of processed files |
128 | + :param input_data: is the output from multiprocess_collate/the plugins. |
129 | + |
130 | + :returns: A dict containing data aggregated from the input_data |
131 | + passed in. |
132 | + |
133 | + The dict returned has tuple keys of the form: |
134 | + (account, year, month, day, hour) |
135 | + The dict returned has values that are dicts with items of this |
136 | + form: |
137 | + key:field_value |
138 | + - key corresponds to something in one of the plugin's keylist |
139 | + mapping, something like the tuple (source, level, verb, code) |
140 | + - field_value is the sum of the field_values for the |
141 | + corresponding values in the input |
142 | + |
143 | + Both input_data and the dict returned are hourly aggregations of |
144 | + stats. |
145 | + |
146 | + Multiple values for the same (account, hour, tuple key) found in |
147 | + input_data are summed in the dict returned. |
148 | + """ |
149 | + |
150 | aggr_data = {} |
151 | - processed_files = already_processed_files |
152 | - for item, data in results: |
153 | + for item, data in input_data: |
154 | # since item contains the plugin and the log name, new plugins will |
155 | # "reprocess" the file and the results will be in the final csv. |
156 | processed_files.add(item) |
157 | @@ -315,14 +340,30 @@ |
158 | # processing plugins need to realize this |
159 | existing_data[i] = current + j |
160 | aggr_data[k] = existing_data |
161 | - |
162 | - # group |
163 | - # reduce a large number of keys in aggr_data[k] to a small number of |
164 | - # output keys |
165 | - keylist_mapping = self.log_processor.generate_keylist_mapping() |
166 | + return aggr_data |
167 | + |
168 | + def get_final_info(self, aggr_data): |
169 | + """ |
170 | + Aggregates data from aggr_data based on the keylist mapping. |
171 | + |
172 | + :param aggr_data: The results of the get_aggregate_data function. |
173 | + :returns: a dict of further aggregated data |
174 | + |
175 | + The dict returned has keys of the form: |
176 | + (account, year, month, day, hour) |
177 | + The dict returned has values that are dicts with items of this |
178 | + form: |
179 | + 'field_name': field_value (int) |
180 | + |
181 | + Data is aggregated as specified by the keylist mapping. The |
182 | + keylist mapping specifies which keys to combine in aggr_data |
183 | + and the final field_names for these combined keys in the dict |
184 | + returned. Fields combined are summed. |
185 | + """ |
186 | + |
187 | final_info = collections.defaultdict(dict) |
188 | for account, data in aggr_data.items(): |
189 | - for key, mapping in keylist_mapping.items(): |
190 | + for key, mapping in self.keylist_mapping.items(): |
191 | if isinstance(mapping, (list, set)): |
192 | value = 0 |
193 | for k in mapping: |
194 | @@ -336,37 +377,154 @@ |
195 | except KeyError: |
196 | value = 0 |
197 | final_info[account][key] = value |
198 | - |
199 | - # output |
200 | - sorted_keylist_mapping = sorted(keylist_mapping) |
201 | - columns = 'data_ts,account,' + ','.join(sorted_keylist_mapping) |
202 | - out_buf = [columns] |
203 | + return final_info |
204 | + |
205 | + def store_processed_files_list(self, processed_files): |
206 | + """ |
207 | + Stores the proccessed files list in the stats account. |
208 | + |
209 | + :param processed_files: set of processed files |
210 | + """ |
211 | + |
212 | + s = cPickle.dumps(processed_files, cPickle.HIGHEST_PROTOCOL) |
213 | + f = cStringIO.StringIO(s) |
214 | + self.log_processor.internal_proxy.upload_file(f, |
215 | + self.log_processor_account, |
216 | + self.log_processor_container, |
217 | + self.processed_files_filename) |
218 | + |
219 | + def get_output(self, final_info): |
220 | + """ |
221 | + :returns: a list of rows to appear in the csv file. |
222 | + |
223 | + The first row contains the column headers for the rest of the |
224 | + rows in the returned list. |
225 | + |
226 | + Each row after the first row corresponds to an account's data |
227 | + for that hour. |
228 | + """ |
229 | + |
230 | + sorted_keylist_mapping = sorted(self.keylist_mapping) |
231 | + columns = ['data_ts', 'account'] + sorted_keylist_mapping |
232 | + output = [columns] |
233 | for (account, year, month, day, hour), d in final_info.items(): |
234 | - data_ts = '%s/%s/%s %s:00:00' % (year, month, day, hour) |
235 | - row = [data_ts] |
236 | - row.append('%s' % account) |
237 | + data_ts = '%04d/%02d/%02d %02d:00:00' % \ |
238 | + (int(year), int(month), int(day), int(hour)) |
239 | + row = [data_ts, '%s' % (account)] |
240 | for k in sorted_keylist_mapping: |
241 | - row.append('%s' % d[k]) |
242 | - out_buf.append(','.join(row)) |
243 | - out_buf = '\n'.join(out_buf) |
244 | + row.append(str(d[k])) |
245 | + output.append(row) |
246 | + return output |
247 | + |
248 | + def store_output(self, output): |
249 | + """ |
250 | + Takes the a list of rows and stores a csv file of the values in the |
251 | + stats account. |
252 | + |
253 | + :param output: list of rows to appear in the csv file |
254 | + |
255 | + This csv file is final product of this script. |
256 | + """ |
257 | + |
258 | + out_buf = '\n'.join([','.join(row) for row in output]) |
259 | h = hashlib.md5(out_buf).hexdigest() |
260 | upload_name = time.strftime('%Y/%m/%d/%H/') + '%s.csv.gz' % h |
261 | f = cStringIO.StringIO(out_buf) |
262 | self.log_processor.internal_proxy.upload_file(f, |
263 | - self.log_processor_account, |
264 | - self.log_processor_container, |
265 | - upload_name) |
266 | - |
267 | - # cleanup |
268 | - s = cPickle.dumps(processed_files, cPickle.HIGHEST_PROTOCOL) |
269 | - f = cStringIO.StringIO(s) |
270 | - self.log_processor.internal_proxy.upload_file(f, |
271 | - self.log_processor_account, |
272 | - self.log_processor_container, |
273 | - 'processed_files.pickle.gz') |
274 | + self.log_processor_account, |
275 | + self.log_processor_container, |
276 | + upload_name) |
277 | + |
278 | + @property |
279 | + def keylist_mapping(self): |
280 | + """ |
281 | + :returns: the keylist mapping. |
282 | + |
283 | + The keylist mapping determines how the stats fields are aggregated in |
284 | + the final aggregation step. |
285 | + """ |
286 | + |
287 | + if self._keylist_mapping == None: |
288 | + self._keylist_mapping = \ |
289 | + self.log_processor.generate_keylist_mapping() |
290 | + return self._keylist_mapping |
291 | + |
292 | + def process_logs(self, logs_to_process, processed_files): |
293 | + """ |
294 | + :param logs_to_process: list of logs to process |
295 | + :param processed_files: set of processed files |
296 | + |
297 | + :returns: returns a list of rows of processed data. |
298 | + |
299 | + The first row is the column headers. The rest of the rows contain |
300 | + hourly aggregate data for the account specified in the row. |
301 | + |
302 | + Files processed are added to the processed_files set. |
303 | + |
304 | + When a large data structure is no longer needed, it is deleted in |
305 | + an effort to conserve memory. |
306 | + """ |
307 | + |
308 | + # map |
309 | + processor_args = (self.total_conf, self.logger) |
310 | + results = multiprocess_collate(processor_args, logs_to_process, |
311 | + self.worker_count) |
312 | + |
313 | + # reduce |
314 | + aggr_data = self.get_aggregate_data(processed_files, results) |
315 | + del results |
316 | + |
317 | + # group |
318 | + # reduce a large number of keys in aggr_data[k] to a small |
319 | + # number of output keys |
320 | + final_info = self.get_final_info(aggr_data) |
321 | + del aggr_data |
322 | + |
323 | + # output |
324 | + return self.get_output(final_info) |
325 | + |
326 | + def run_once(self, *args, **kwargs): |
327 | + """ |
328 | + Process log files that fall within the lookback interval. |
329 | + |
330 | + Upload resulting csv file to stats account. |
331 | + |
332 | + Update processed files list and upload to stats account. |
333 | + """ |
334 | + |
335 | + for k in 'lookback_hours lookback_window'.split(): |
336 | + if k in kwargs and kwargs[k] is not None: |
337 | + setattr(self, k, kwargs[k]) |
338 | + |
339 | + start = time.time() |
340 | + self.logger.info(_("Beginning log processing")) |
341 | + |
342 | + lookback_start, lookback_end = self.get_lookback_interval() |
343 | + self.logger.debug('lookback_start: %s' % lookback_start) |
344 | + self.logger.debug('lookback_end: %s' % lookback_end) |
345 | + |
346 | + processed_files = self.get_processed_files_list() |
347 | + if processed_files == None: |
348 | + self.logger.error(_('Log processing unable to load list of ' |
349 | + 'already processed log files')) |
350 | + return |
351 | + self.logger.debug(_('found %d processed files') % |
352 | + len(processed_files)) |
353 | + |
354 | + logs_to_process = self.log_processor.get_data_list(lookback_start, |
355 | + lookback_end, processed_files) |
356 | + self.logger.info(_('loaded %d files to process') % |
357 | + len(logs_to_process)) |
358 | + |
359 | + if logs_to_process: |
360 | + output = self.process_logs(logs_to_process, processed_files) |
361 | + self.store_output(output) |
362 | + del output |
363 | + |
364 | + self.store_processed_files_list(processed_files) |
365 | |
366 | self.logger.info(_("Log processing done (%0.2f minutes)") % |
367 | - ((time.time() - start) / 60)) |
368 | + ((time.time() - start) / 60)) |
369 | |
370 | |
371 | def multiprocess_collate(processor_args, logs_to_process, worker_count): |
372 | |
373 | === modified file 'test/unit/stats/test_log_processor.py' |
374 | --- test/unit/stats/test_log_processor.py 2011-03-17 22:22:43 +0000 |
375 | +++ test/unit/stats/test_log_processor.py 2011-04-11 21:52:38 +0000 |
376 | @@ -16,6 +16,10 @@ |
377 | import unittest |
378 | from test.unit import tmpfile |
379 | import Queue |
380 | +import datetime |
381 | +import hashlib |
382 | +import pickle |
383 | +import time |
384 | |
385 | from swift.common import internal_proxy |
386 | from swift.stats import log_processor |
387 | @@ -26,7 +30,6 @@ |
388 | def __init__(self, *args, **kwargs): |
389 | pass |
390 | |
391 | - |
392 | class DumbLogger(object): |
393 | def __getattr__(self, n): |
394 | return self.foo |
395 | @@ -77,7 +80,7 @@ |
396 | return self.code, data() |
397 | |
398 | class TestLogProcessor(unittest.TestCase): |
399 | - |
400 | + |
401 | access_test_line = 'Jul 9 04:14:30 saio proxy-server 1.2.3.4 4.5.6.7 '\ |
402 | '09/Jul/2010/04/14/30 GET '\ |
403 | '/v1/acct/foo/bar?format=json&foo HTTP/1.0 200 - '\ |
404 | @@ -85,7 +88,7 @@ |
405 | '6 95 - txfa431231-7f07-42fd-8fc7-7da9d8cc1f90 - 0.0262' |
406 | stats_test_line = 'account,1,2,3' |
407 | proxy_config = {'log-processor': { |
408 | - |
409 | + |
410 | } |
411 | } |
412 | |
413 | @@ -426,3 +429,407 @@ |
414 | finally: |
415 | log_processor.LogProcessor._internal_proxy = None |
416 | log_processor.LogProcessor.get_object_data = orig_get_object_data |
417 | + |
418 | +class TestLogProcessorDaemon(unittest.TestCase): |
419 | + |
420 | + def test_get_lookback_interval(self): |
421 | + class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): |
422 | + def __init__(self, lookback_hours, lookback_window): |
423 | + self.lookback_hours = lookback_hours |
424 | + self.lookback_window = lookback_window |
425 | + |
426 | + try: |
427 | + d = datetime.datetime |
428 | + |
429 | + for x in [ |
430 | + [d(2011, 1, 1), 0, 0, None, None], |
431 | + [d(2011, 1, 1), 120, 0, '2010122700', None], |
432 | + [d(2011, 1, 1), 120, 24, '2010122700', '2010122800'], |
433 | + [d(2010, 1, 2, 3, 4), 120, 48, '2009122803', '2009123003'], |
434 | + [d(2009, 5, 6, 7, 8), 1200, 100, '2009031707', '2009032111'], |
435 | + [d(2008, 9, 10, 11, 12), 3000, 1000, '2008050811', '2008061903'], |
436 | + ]: |
437 | + |
438 | + log_processor.now = lambda: x[0] |
439 | + |
440 | + d = MockLogProcessorDaemon(x[1], x[2]) |
441 | + self.assertEquals((x[3], x[4]), d.get_lookback_interval()) |
442 | + finally: |
443 | + log_processor.now = datetime.datetime.now |
444 | + |
445 | + def test_get_processed_files_list(self): |
446 | + class MockLogProcessor(): |
447 | + def __init__(self, stream): |
448 | + self.stream = stream |
449 | + |
450 | + def get_object_data(self, *args, **kwargs): |
451 | + return self.stream |
452 | + |
453 | + class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): |
454 | + def __init__(self, stream): |
455 | + self.log_processor = MockLogProcessor(stream) |
456 | + self.log_processor_account = 'account' |
457 | + self.log_processor_container = 'container' |
458 | + self.processed_files_filename = 'filename' |
459 | + |
460 | + file_list = set(['a', 'b', 'c']) |
461 | + |
462 | + for s, l in [['', None], |
463 | + [pickle.dumps(set()).split('\n'), set()], |
464 | + [pickle.dumps(file_list).split('\n'), file_list], |
465 | + ]: |
466 | + |
467 | + self.assertEquals(l, |
468 | + MockLogProcessorDaemon(s).get_processed_files_list()) |
469 | + |
470 | + def test_get_processed_files_list_bad_file_downloads(self): |
471 | + class MockLogProcessor(): |
472 | + def __init__(self, status_code): |
473 | + self.err = log_processor.BadFileDownload(status_code) |
474 | + |
475 | + def get_object_data(self, *a, **k): |
476 | + raise self.err |
477 | + |
478 | + class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): |
479 | + def __init__(self, status_code): |
480 | + self.log_processor = MockLogProcessor(status_code) |
481 | + self.log_processor_account = 'account' |
482 | + self.log_processor_container = 'container' |
483 | + self.processed_files_filename = 'filename' |
484 | + |
485 | + for c, l in [[404, set()], [503, None], [None, None]]: |
486 | + self.assertEquals(l, |
487 | + MockLogProcessorDaemon(c).get_processed_files_list()) |
488 | + |
489 | + def test_get_aggregate_data(self): |
490 | + # when run "for real" |
491 | + # the various keys/values in the input and output |
492 | + # dictionaries are often not simple strings |
493 | + # for testing we can use keys that are easier to work with |
494 | + |
495 | + processed_files = set() |
496 | + |
497 | + data_in = [ |
498 | + ['file1', { |
499 | + 'acct1_time1': {'field1': 1, 'field2': 2, 'field3': 3}, |
500 | + 'acct1_time2': {'field1': 4, 'field2': 5}, |
501 | + 'acct2_time1': {'field1': 6, 'field2': 7}, |
502 | + 'acct3_time3': {'field1': 8, 'field2': 9}, |
503 | + } |
504 | + ], |
505 | + ['file2', {'acct1_time1': {'field1': 10}}], |
506 | + ] |
507 | + |
508 | + expected_data_out = { |
509 | + 'acct1_time1': {'field1': 11, 'field2': 2, 'field3': 3}, |
510 | + 'acct1_time2': {'field1': 4, 'field2': 5}, |
511 | + 'acct2_time1': {'field1': 6, 'field2': 7}, |
512 | + 'acct3_time3': {'field1': 8, 'field2': 9}, |
513 | + } |
514 | + |
515 | + class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): |
516 | + def __init__(self): |
517 | + pass |
518 | + |
519 | + d = MockLogProcessorDaemon() |
520 | + data_out = d.get_aggregate_data(processed_files, data_in) |
521 | + |
522 | + for k, v in expected_data_out.items(): |
523 | + self.assertEquals(v, data_out[k]) |
524 | + |
525 | + self.assertEquals(set(['file1', 'file2']), processed_files) |
526 | + |
527 | + def test_get_final_info(self): |
528 | + # when run "for real" |
529 | + # the various keys/values in the input and output |
530 | + # dictionaries are often not simple strings |
531 | + # for testing we can use keys/values that are easier to work with |
532 | + |
533 | + class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): |
534 | + def __init__(self): |
535 | + self._keylist_mapping = { |
536 | + 'out_field1':['field1', 'field2', 'field3'], |
537 | + 'out_field2':['field2', 'field3'], |
538 | + 'out_field3':['field3'], |
539 | + 'out_field4':'field4', |
540 | + 'out_field5':['field6', 'field7', 'field8'], |
541 | + 'out_field6':['field6'], |
542 | + 'out_field7':'field7', |
543 | + } |
544 | + |
545 | + data_in = { |
546 | + 'acct1_time1': {'field1': 11, 'field2': 2, 'field3': 3, |
547 | + 'field4': 8, 'field5': 11}, |
548 | + 'acct1_time2': {'field1': 4, 'field2': 5}, |
549 | + 'acct2_time1': {'field1': 6, 'field2': 7}, |
550 | + 'acct3_time3': {'field1': 8, 'field2': 9}, |
551 | + } |
552 | + |
553 | + expected_data_out = { |
554 | + 'acct1_time1': {'out_field1': 16, 'out_field2': 5, |
555 | + 'out_field3': 3, 'out_field4': 8, 'out_field5': 0, |
556 | + 'out_field6': 0, 'out_field7': 0,}, |
557 | + 'acct1_time2': {'out_field1': 9, 'out_field2': 5, |
558 | + 'out_field3': 0, 'out_field4': 0, 'out_field5': 0, |
559 | + 'out_field6': 0, 'out_field7': 0,}, |
560 | + 'acct2_time1': {'out_field1': 13, 'out_field2': 7, |
561 | + 'out_field3': 0, 'out_field4': 0, 'out_field5': 0, |
562 | + 'out_field6': 0, 'out_field7': 0,}, |
563 | + 'acct3_time3': {'out_field1': 17, 'out_field2': 9, |
564 | + 'out_field3': 0, 'out_field4': 0, 'out_field5': 0, |
565 | + 'out_field6': 0, 'out_field7': 0,}, |
566 | + } |
567 | + |
568 | + self.assertEquals(expected_data_out, |
569 | + MockLogProcessorDaemon().get_final_info(data_in)) |
570 | + |
571 | + def test_store_processed_files_list(self): |
572 | + class MockInternalProxy: |
573 | + def __init__(self, test, daemon, processed_files): |
574 | + self.test = test |
575 | + self.daemon = daemon |
576 | + self.processed_files = processed_files |
577 | + |
578 | + def upload_file(self, f, account, container, filename): |
579 | + self.test.assertEquals(self.processed_files, |
580 | + pickle.loads(f.getvalue())) |
581 | + self.test.assertEquals(self.daemon.log_processor_account, |
582 | + account) |
583 | + self.test.assertEquals(self.daemon.log_processor_container, |
584 | + container) |
585 | + self.test.assertEquals(self.daemon.processed_files_filename, |
586 | + filename) |
587 | + |
588 | + class MockLogProcessor: |
589 | + def __init__(self, test, daemon, processed_files): |
590 | + self.internal_proxy = MockInternalProxy(test, daemon, |
591 | + processed_files) |
592 | + |
593 | + class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): |
594 | + def __init__(self, test, processed_files): |
595 | + self.log_processor = \ |
596 | + MockLogProcessor(test, self, processed_files) |
597 | + self.log_processor_account = 'account' |
598 | + self.log_processor_container = 'container' |
599 | + self.processed_files_filename = 'filename' |
600 | + |
601 | + processed_files = set(['a', 'b', 'c']) |
602 | + MockLogProcessorDaemon(self, processed_files).\ |
603 | + store_processed_files_list(processed_files) |
604 | + |
605 | + def test_get_output(self): |
606 | + class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): |
607 | + def __init__(self): |
608 | + self._keylist_mapping = {'a':None, 'b':None, 'c':None} |
609 | + |
610 | + data_in = { |
611 | + ('acct1', 2010, 1, 1, 0): {'a':1, 'b':2, 'c':3}, |
612 | + ('acct1', 2010, 10, 10, 10): {'a':10, 'b':20, 'c':30}, |
613 | + ('acct2', 2008, 3, 6, 9): {'a':8, 'b':9, 'c':12}, |
614 | + ('acct3', 2005, 4, 8, 16): {'a':1, 'b':5, 'c':25}, |
615 | + } |
616 | + |
617 | + expected_data_out = [ |
618 | + ['data_ts', 'account', 'a', 'b', 'c'], |
619 | + ['2010/01/01 00:00:00', 'acct1', '1', '2', '3'], |
620 | + ['2010/10/10 10:00:00', 'acct1', '10', '20', '30'], |
621 | + ['2008/03/06 09:00:00', 'acct2', '8', '9', '12'], |
622 | + ['2005/04/08 16:00:00', 'acct3', '1', '5', '25'], |
623 | + ] |
624 | + |
625 | + data_out = MockLogProcessorDaemon().get_output(data_in) |
626 | + self.assertEquals(expected_data_out[0], data_out[0]) |
627 | + |
628 | + for row in data_out[1:]: |
629 | + self.assert_(row in expected_data_out) |
630 | + |
631 | + for row in expected_data_out[1:]: |
632 | + self.assert_(row in data_out) |
633 | + |
634 | + def test_store_output(self): |
635 | + try: |
636 | + real_strftime = time.strftime |
637 | + mock_strftime_return = '2010/03/02/01/' |
638 | + def mock_strftime(format): |
639 | + self.assertEquals('%Y/%m/%d/%H/', format) |
640 | + return mock_strftime_return |
641 | + log_processor.time.strftime = mock_strftime |
642 | + |
643 | + data_in = [ |
644 | + ['data_ts', 'account', 'a', 'b', 'c'], |
645 | + ['2010/10/10 10:00:00', 'acct1', '1', '2', '3'], |
646 | + ['2010/10/10 10:00:00', 'acct1', '10', '20', '30'], |
647 | + ['2008/03/06 09:00:00', 'acct2', '8', '9', '12'], |
648 | + ['2005/04/08 16:00:00', 'acct3', '1', '5', '25'], |
649 | + ] |
650 | + |
651 | + expected_output = '\n'.join([','.join(row) for row in data_in]) |
652 | + h = hashlib.md5(expected_output).hexdigest() |
653 | + expected_filename = '%s%s.csv.gz' % (mock_strftime_return, h) |
654 | + |
655 | + class MockInternalProxy: |
656 | + def __init__(self, test, daemon, expected_filename, |
657 | + expected_output): |
658 | + self.test = test |
659 | + self.daemon = daemon |
660 | + self.expected_filename = expected_filename |
661 | + self.expected_output = expected_output |
662 | + |
663 | + def upload_file(self, f, account, container, filename): |
664 | + self.test.assertEquals(self.daemon.log_processor_account, |
665 | + account) |
666 | + self.test.assertEquals(self.daemon.log_processor_container, |
667 | + container) |
668 | + self.test.assertEquals(self.expected_filename, filename) |
669 | + self.test.assertEquals(self.expected_output, f.getvalue()) |
670 | + |
671 | + class MockLogProcessor: |
672 | + def __init__(self, test, daemon, expected_filename, |
673 | + expected_output): |
674 | + self.internal_proxy = MockInternalProxy(test, daemon, |
675 | + expected_filename, expected_output) |
676 | + |
677 | + class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): |
678 | + def __init__(self, test, expected_filename, expected_output): |
679 | + self.log_processor = MockLogProcessor(test, self, |
680 | + expected_filename, expected_output) |
681 | + self.log_processor_account = 'account' |
682 | + self.log_processor_container = 'container' |
683 | + self.processed_files_filename = 'filename' |
684 | + |
685 | + MockLogProcessorDaemon(self, expected_filename, expected_output).\ |
686 | + store_output(data_in) |
687 | + finally: |
688 | + log_processor.time.strftime = real_strftime |
689 | + |
690 | + def test_keylist_mapping(self): |
691 | + # Kind of lame test to see if the propery is both |
692 | + # generated by a particular method and cached properly. |
693 | + # The method that actually generates the mapping is |
694 | + # tested elsewhere. |
695 | + |
696 | + value_return = 'keylist_mapping' |
697 | + class MockLogProcessor: |
698 | + def __init__(self): |
699 | + self.call_count = 0 |
700 | + |
701 | + def generate_keylist_mapping(self): |
702 | + self.call_count += 1 |
703 | + return value_return |
704 | + |
705 | + class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): |
706 | + def __init__(self): |
707 | + self.log_processor = MockLogProcessor() |
708 | + self._keylist_mapping = None |
709 | + |
710 | + d = MockLogProcessorDaemon() |
711 | + self.assertEquals(value_return, d.keylist_mapping) |
712 | + self.assertEquals(value_return, d.keylist_mapping) |
713 | + self.assertEquals(1, d.log_processor.call_count) |
714 | + |
715 | + def test_process_logs(self): |
716 | + try: |
717 | + mock_logs_to_process = 'logs_to_process' |
718 | + mock_processed_files = 'processed_files' |
719 | + |
720 | + real_multiprocess_collate = log_processor.multiprocess_collate |
721 | + multiprocess_collate_return = 'multiprocess_collate_return' |
722 | + |
723 | + get_aggregate_data_return = 'get_aggregate_data_return' |
724 | + get_final_info_return = 'get_final_info_return' |
725 | + get_output_return = 'get_output_return' |
726 | + |
727 | + class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): |
728 | + def __init__(self, test): |
729 | + self.test = test |
730 | + self.total_conf = 'total_conf' |
731 | + self.logger = 'logger' |
732 | + self.worker_count = 'worker_count' |
733 | + |
734 | + def get_aggregate_data(self, processed_files, results): |
735 | + self.test.assertEquals(mock_processed_files, processed_files) |
736 | + self.test.assertEquals(multiprocess_collate_return, results) |
737 | + return get_aggregate_data_return |
738 | + |
739 | + def get_final_info(self, aggr_data): |
740 | + self.test.assertEquals(get_aggregate_data_return, aggr_data) |
741 | + return get_final_info_return |
742 | + |
743 | + def get_output(self, final_info): |
744 | + self.test.assertEquals(get_final_info_return, final_info) |
745 | + return get_output_return |
746 | + |
747 | + d = MockLogProcessorDaemon(self) |
748 | + |
749 | + def mock_multiprocess_collate(processor_args, logs_to_process, |
750 | + worker_count): |
751 | + self.assertEquals(d.total_conf, processor_args[0]) |
752 | + self.assertEquals(d.logger, processor_args[1]) |
753 | + |
754 | + self.assertEquals(mock_logs_to_process, logs_to_process) |
755 | + self.assertEquals(d.worker_count, worker_count) |
756 | + |
757 | + return multiprocess_collate_return |
758 | + |
759 | + log_processor.multiprocess_collate = mock_multiprocess_collate |
760 | + |
761 | + output = d.process_logs(mock_logs_to_process, mock_processed_files) |
762 | + self.assertEquals(get_output_return, output) |
763 | + finally: |
764 | + log_processor.multiprocess_collate = real_multiprocess_collate |
765 | + |
766 | + def test_run_once_get_processed_files_list_returns_none(self): |
767 | + class MockLogProcessor: |
768 | + def get_data_list(self, lookback_start, lookback_end, |
769 | + processed_files): |
770 | + raise unittest.TestCase.failureException, \ |
771 | + 'Method should not be called' |
772 | + |
773 | + class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): |
774 | + def __init__(self): |
775 | + self.logger = DumbLogger() |
776 | + self.log_processor = MockLogProcessor() |
777 | + |
778 | + def get_lookback_interval(self): |
779 | + return None, None |
780 | + |
781 | + def get_processed_files_list(self): |
782 | + return None |
783 | + |
784 | + MockLogProcessorDaemon().run_once() |
785 | + |
786 | + def test_run_once_no_logs_to_process(self): |
787 | + class MockLogProcessor(): |
788 | + def __init__(self, daemon, test): |
789 | + self.daemon = daemon |
790 | + self.test = test |
791 | + |
792 | + def get_data_list(self, lookback_start, lookback_end, |
793 | + processed_files): |
794 | + self.test.assertEquals(self.daemon.lookback_start, |
795 | + lookback_start) |
796 | + self.test.assertEquals(self.daemon.lookback_end, |
797 | + lookback_end) |
798 | + self.test.assertEquals(self.daemon.processed_files, |
799 | + processed_files) |
800 | + return [] |
801 | + |
802 | + class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): |
803 | + def __init__(self, test): |
804 | + self.logger = DumbLogger() |
805 | + self.log_processor = MockLogProcessor(self, test) |
806 | + self.lookback_start = 'lookback_start' |
807 | + self.lookback_end = 'lookback_end' |
808 | + self.processed_files = ['a', 'b', 'c'] |
809 | + |
810 | + def get_lookback_interval(self): |
811 | + return self.lookback_start, self.lookback_end |
812 | + |
813 | + def get_processed_files_list(self): |
814 | + return self.processed_files |
815 | + |
816 | + def process_logs(logs_to_process, processed_files): |
817 | + raise unittest.TestCase.failureException, \ |
818 | + 'Method should not be called' |
819 | + |
820 | + MockLogProcessorDaemon(self).run_once() |
pep8 line 35, log_processor.py
test_get_output seems broken. Perhaps you forgot "in" on the self.assert_() calls (lines 645, 648)? Another option is to sort both the expected output and the actual output and compare them for equality.
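A standalone sketch of the sort-and-compare option follows (illustrative only, not part of this branch); the row values are made up, and data_out stands in for the value get_output() would return.

```python
# Illustrative only: the sort-and-compare variant of the test_get_output
# check. The header row is compared directly; the data rows are sorted on
# both sides so the comparison is order-insensitive.
import unittest


class SortedRowsExample(unittest.TestCase):

    def test_rows_match_ignoring_order(self):
        expected = [
            ['data_ts', 'account', 'a', 'b', 'c'],
            ['2010/01/01 00:00:00', 'acct1', '1', '2', '3'],
            ['2008/03/06 09:00:00', 'acct2', '8', '9', '12'],
        ]
        # data_out stands in for the value returned by get_output(); here it
        # is simply the same rows in a different order.
        data_out = [expected[0], expected[2], expected[1]]
        self.assertEquals(expected[0], data_out[0])
        self.assertEquals(sorted(expected[1:]), sorted(data_out[1:]))


if __name__ == '__main__':
    unittest.main()
```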