Merge lp:~greglange/swift/lp725215 into lp:~hudson-openstack/swift/trunk
Proposed by: Greg Lange
Status: Merged
Approved by: David Goetz
Approved revision: 256
Merged at revision: 264
Proposed branch: lp:~greglange/swift/lp725215
Merge into: lp:~hudson-openstack/swift/trunk
Diff against target: 820 lines (+641/-76), 2 files modified
  swift/stats/log_processor.py (+231/-73)
  test/unit/stats/test_log_processor.py (+410/-3)
To merge this branch: bzr merge lp:~greglange/swift/lp725215
Related bugs:

Reviewer | Review Type | Date Requested | Status
---|---|---|---
John Dickinson | Approve | |
Greg Lange | Pending | |

Review via email: mp+55637@code.launchpad.net
This proposal supersedes a proposal from 2011-03-23.
Commit message
Description of the change
Refactored the log processing daemon to make it more testable.
Added tests for that.
The behavior should not have changed at all.
This needs to be tested on staging extensively before being pushed to production.
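For reviewers skimming the diff below: the main change is that run_once() is decomposed into small, individually testable methods. The following is a simplified outline of the new flow, condensed from the diff (logging and the lookback option handling are omitted):

```python
# Condensed outline of LogProcessorDaemon.run_once() after the refactor.
# See the full diff below for the real implementation.
def run_once(self, *args, **kwargs):
    lookback_start, lookback_end = self.get_lookback_interval()

    processed_files = self.get_processed_files_list()
    if processed_files is None:
        # the already-processed list could not be loaded; bail out
        return

    logs_to_process = self.log_processor.get_data_list(
        lookback_start, lookback_end, processed_files)

    if logs_to_process:
        # map / reduce / group / output all happen inside process_logs()
        output = self.process_logs(logs_to_process, processed_files)
        self.store_output(output)

    self.store_processed_files_list(processed_files)
```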
Revision history for this message
John Dickinson (notmyname) wrote: Posted in a previous version of this proposal
review: Needs Fixing
Revision history for this message
Greg Lange (greglange): Posted in a previous version of this proposal
review: Needs Resubmitting
lp:~greglange/swift/lp725215 updated
- 255. By Greg Lange <email address hidden>
  fixed merge conflict
lp:~greglange/swift/lp725215 updated
- 256. By Greg Lange <email address hidden>
  added doc strings to new methods
Preview Diff
1 | === modified file 'swift/stats/log_processor.py' | |||
2 | --- swift/stats/log_processor.py 2011-03-30 21:10:27 +0000 | |||
3 | +++ swift/stats/log_processor.py 2011-04-11 21:52:38 +0000 | |||
4 | @@ -30,6 +30,8 @@ | |||
5 | 30 | from swift.common.utils import get_logger, readconf | 30 | from swift.common.utils import get_logger, readconf |
6 | 31 | from swift.common.daemon import Daemon | 31 | from swift.common.daemon import Daemon |
7 | 32 | 32 | ||
8 | 33 | now = datetime.datetime.now | ||
9 | 34 | |||
10 | 33 | 35 | ||
11 | 34 | class BadFileDownload(Exception): | 36 | class BadFileDownload(Exception): |
12 | 35 | def __init__(self, status_code=None): | 37 | def __init__(self, status_code=None): |
13 | @@ -234,31 +236,46 @@ | |||
14 | 234 | self.log_processor_container = c.get('container_name', | 236 | self.log_processor_container = c.get('container_name', |
15 | 235 | 'log_processing_data') | 237 | 'log_processing_data') |
16 | 236 | self.worker_count = int(c.get('worker_count', '1')) | 238 | self.worker_count = int(c.get('worker_count', '1')) |
25 | 237 | 239 | self._keylist_mapping = None | |
26 | 238 | def run_once(self, *args, **kwargs): | 240 | self.processed_files_filename = 'processed_files.pickle.gz' |
27 | 239 | for k in 'lookback_hours lookback_window'.split(): | 241 | |
28 | 240 | if kwargs[k] is not None: | 242 | def get_lookback_interval(self): |
29 | 241 | setattr(self, k, kwargs[k]) | 243 | """ |
30 | 242 | 244 | :returns: lookback_start, lookback_end. | |
31 | 243 | self.logger.info(_("Beginning log processing")) | 245 | |
32 | 244 | start = time.time() | 246 | Both or just lookback_end can be None. Otherwise, returns strings |
33 | 247 | of the form 'YYYYMMDDHH'. The interval returned is used as bounds | ||
34 | 248 | when looking for logs to processes. | ||
35 | 249 | |||
36 | 250 | A returned None means don't limit the log files examined on that | ||
37 | 251 | side of the interval. | ||
38 | 252 | """ | ||
39 | 253 | |||
40 | 245 | if self.lookback_hours == 0: | 254 | if self.lookback_hours == 0: |
41 | 246 | lookback_start = None | 255 | lookback_start = None |
42 | 247 | lookback_end = None | 256 | lookback_end = None |
43 | 248 | else: | 257 | else: |
44 | 249 | delta_hours = datetime.timedelta(hours=self.lookback_hours) | 258 | delta_hours = datetime.timedelta(hours=self.lookback_hours) |
46 | 250 | lookback_start = datetime.datetime.now() - delta_hours | 259 | lookback_start = now() - delta_hours |
47 | 251 | lookback_start = lookback_start.strftime('%Y%m%d%H') | 260 | lookback_start = lookback_start.strftime('%Y%m%d%H') |
48 | 252 | if self.lookback_window == 0: | 261 | if self.lookback_window == 0: |
49 | 253 | lookback_end = None | 262 | lookback_end = None |
50 | 254 | else: | 263 | else: |
51 | 255 | delta_window = datetime.timedelta(hours=self.lookback_window) | 264 | delta_window = datetime.timedelta(hours=self.lookback_window) |
53 | 256 | lookback_end = datetime.datetime.now() - \ | 265 | lookback_end = now() - \ |
54 | 257 | delta_hours + \ | 266 | delta_hours + \ |
55 | 258 | delta_window | 267 | delta_window |
56 | 259 | lookback_end = lookback_end.strftime('%Y%m%d%H') | 268 | lookback_end = lookback_end.strftime('%Y%m%d%H') |
59 | 260 | self.logger.debug('lookback_start: %s' % lookback_start) | 269 | return lookback_start, lookback_end |
60 | 261 | self.logger.debug('lookback_end: %s' % lookback_end) | 270 | |
61 | 271 | def get_processed_files_list(self): | ||
62 | 272 | """ | ||
63 | 273 | :returns: a set of files that have already been processed or returns | ||
64 | 274 | None on error. | ||
65 | 275 | |||
66 | 276 | Downloads the set from the stats account. Creates an empty set if | ||
67 | 277 | the an existing file cannot be found. | ||
68 | 278 | """ | ||
69 | 262 | try: | 279 | try: |
70 | 263 | # Note: this file (or data set) will grow without bound. | 280 | # Note: this file (or data set) will grow without bound. |
71 | 264 | # In practice, if it becomes a problem (say, after many months of | 281 | # In practice, if it becomes a problem (say, after many months of |
72 | @@ -266,44 +283,52 @@ | |||
73 | 266 | # entries. Automatically pruning on each run could be dangerous. | 283 | # entries. Automatically pruning on each run could be dangerous. |
74 | 267 | # There is not a good way to determine when an old entry should be | 284 | # There is not a good way to determine when an old entry should be |
75 | 268 | # pruned (lookback_hours could be set to anything and could change) | 285 | # pruned (lookback_hours could be set to anything and could change) |
82 | 269 | processed_files_stream = self.log_processor.get_object_data( | 286 | stream = self.log_processor.get_object_data( |
83 | 270 | self.log_processor_account, | 287 | self.log_processor_account, |
84 | 271 | self.log_processor_container, | 288 | self.log_processor_container, |
85 | 272 | 'processed_files.pickle.gz', | 289 | self.processed_files_filename, |
86 | 273 | compressed=True) | 290 | compressed=True) |
87 | 274 | buf = '\n'.join(x for x in processed_files_stream) | 291 | buf = '\n'.join(x for x in stream) |
88 | 275 | if buf: | 292 | if buf: |
90 | 276 | already_processed_files = cPickle.loads(buf) | 293 | files = cPickle.loads(buf) |
91 | 277 | else: | 294 | else: |
93 | 278 | already_processed_files = set() | 295 | return None |
94 | 279 | except BadFileDownload, err: | 296 | except BadFileDownload, err: |
95 | 280 | if err.status_code == 404: | 297 | if err.status_code == 404: |
97 | 281 | already_processed_files = set() | 298 | files = set() |
98 | 282 | else: | 299 | else: |
120 | 283 | self.logger.error(_('Log processing unable to load list of ' | 300 | return None |
121 | 284 | 'already processed log files')) | 301 | return files |
122 | 285 | return | 302 | |
123 | 286 | self.logger.debug(_('found %d processed files') % \ | 303 | def get_aggregate_data(self, processed_files, input_data): |
124 | 287 | len(already_processed_files)) | 304 | """ |
125 | 288 | logs_to_process = self.log_processor.get_data_list(lookback_start, | 305 | Aggregates stats data by account/hour, summing as needed. |
126 | 289 | lookback_end, | 306 | |
127 | 290 | already_processed_files) | 307 | :param processed_files: set of processed files |
128 | 291 | self.logger.info(_('loaded %d files to process') % | 308 | :param input_data: is the output from multiprocess_collate/the plugins. |
129 | 292 | len(logs_to_process)) | 309 | |
130 | 293 | if not logs_to_process: | 310 | :returns: A dict containing data aggregated from the input_data |
131 | 294 | self.logger.info(_("Log processing done (%0.2f minutes)") % | 311 | passed in. |
132 | 295 | ((time.time() - start) / 60)) | 312 | |
133 | 296 | return | 313 | The dict returned has tuple keys of the form: |
134 | 297 | 314 | (account, year, month, day, hour) | |
135 | 298 | # map | 315 | The dict returned has values that are dicts with items of this |
136 | 299 | processor_args = (self.total_conf, self.logger) | 316 | form: |
137 | 300 | results = multiprocess_collate(processor_args, logs_to_process, | 317 | key:field_value |
138 | 301 | self.worker_count) | 318 | - key corresponds to something in one of the plugin's keylist |
139 | 302 | 319 | mapping, something like the tuple (source, level, verb, code) | |
140 | 303 | #reduce | 320 | - field_value is the sum of the field_values for the |
141 | 321 | corresponding values in the input | ||
142 | 322 | |||
143 | 323 | Both input_data and the dict returned are hourly aggregations of | ||
144 | 324 | stats. | ||
145 | 325 | |||
146 | 326 | Multiple values for the same (account, hour, tuple key) found in | ||
147 | 327 | input_data are summed in the dict returned. | ||
148 | 328 | """ | ||
149 | 329 | |||
150 | 304 | aggr_data = {} | 330 | aggr_data = {} |
153 | 305 | processed_files = already_processed_files | 331 | for item, data in input_data: |
152 | 306 | for item, data in results: | ||
154 | 307 | # since item contains the plugin and the log name, new plugins will | 332 | # since item contains the plugin and the log name, new plugins will |
155 | 308 | # "reprocess" the file and the results will be in the final csv. | 333 | # "reprocess" the file and the results will be in the final csv. |
156 | 309 | processed_files.add(item) | 334 | processed_files.add(item) |
157 | @@ -315,14 +340,30 @@ | |||
158 | 315 | # processing plugins need to realize this | 340 | # processing plugins need to realize this |
159 | 316 | existing_data[i] = current + j | 341 | existing_data[i] = current + j |
160 | 317 | aggr_data[k] = existing_data | 342 | aggr_data[k] = existing_data |
166 | 318 | 343 | return aggr_data | |
167 | 319 | # group | 344 | |
168 | 320 | # reduce a large number of keys in aggr_data[k] to a small number of | 345 | def get_final_info(self, aggr_data): |
169 | 321 | # output keys | 346 | """ |
170 | 322 | keylist_mapping = self.log_processor.generate_keylist_mapping() | 347 | Aggregates data from aggr_data based on the keylist mapping. |
171 | 348 | |||
172 | 349 | :param aggr_data: The results of the get_aggregate_data function. | ||
173 | 350 | :returns: a dict of further aggregated data | ||
174 | 351 | |||
175 | 352 | The dict returned has keys of the form: | ||
176 | 353 | (account, year, month, day, hour) | ||
177 | 354 | The dict returned has values that are dicts with items of this | ||
178 | 355 | form: | ||
179 | 356 | 'field_name': field_value (int) | ||
180 | 357 | |||
181 | 358 | Data is aggregated as specified by the keylist mapping. The | ||
182 | 359 | keylist mapping specifies which keys to combine in aggr_data | ||
183 | 360 | and the final field_names for these combined keys in the dict | ||
184 | 361 | returned. Fields combined are summed. | ||
185 | 362 | """ | ||
186 | 363 | |||
187 | 323 | final_info = collections.defaultdict(dict) | 364 | final_info = collections.defaultdict(dict) |
188 | 324 | for account, data in aggr_data.items(): | 365 | for account, data in aggr_data.items(): |
190 | 325 | for key, mapping in keylist_mapping.items(): | 366 | for key, mapping in self.keylist_mapping.items(): |
191 | 326 | if isinstance(mapping, (list, set)): | 367 | if isinstance(mapping, (list, set)): |
192 | 327 | value = 0 | 368 | value = 0 |
193 | 328 | for k in mapping: | 369 | for k in mapping: |
194 | @@ -336,37 +377,154 @@ | |||
195 | 336 | except KeyError: | 377 | except KeyError: |
196 | 337 | value = 0 | 378 | value = 0 |
197 | 338 | final_info[account][key] = value | 379 | final_info[account][key] = value |
203 | 339 | 380 | return final_info | |
204 | 340 | # output | 381 | |
205 | 341 | sorted_keylist_mapping = sorted(keylist_mapping) | 382 | def store_processed_files_list(self, processed_files): |
206 | 342 | columns = 'data_ts,account,' + ','.join(sorted_keylist_mapping) | 383 | """ |
207 | 343 | out_buf = [columns] | 384 | Stores the proccessed files list in the stats account. |
208 | 385 | |||
209 | 386 | :param processed_files: set of processed files | ||
210 | 387 | """ | ||
211 | 388 | |||
212 | 389 | s = cPickle.dumps(processed_files, cPickle.HIGHEST_PROTOCOL) | ||
213 | 390 | f = cStringIO.StringIO(s) | ||
214 | 391 | self.log_processor.internal_proxy.upload_file(f, | ||
215 | 392 | self.log_processor_account, | ||
216 | 393 | self.log_processor_container, | ||
217 | 394 | self.processed_files_filename) | ||
218 | 395 | |||
219 | 396 | def get_output(self, final_info): | ||
220 | 397 | """ | ||
221 | 398 | :returns: a list of rows to appear in the csv file. | ||
222 | 399 | |||
223 | 400 | The first row contains the column headers for the rest of the | ||
224 | 401 | rows in the returned list. | ||
225 | 402 | |||
226 | 403 | Each row after the first row corresponds to an account's data | ||
227 | 404 | for that hour. | ||
228 | 405 | """ | ||
229 | 406 | |||
230 | 407 | sorted_keylist_mapping = sorted(self.keylist_mapping) | ||
231 | 408 | columns = ['data_ts', 'account'] + sorted_keylist_mapping | ||
232 | 409 | output = [columns] | ||
233 | 344 | for (account, year, month, day, hour), d in final_info.items(): | 410 | for (account, year, month, day, hour), d in final_info.items(): |
237 | 345 | data_ts = '%s/%s/%s %s:00:00' % (year, month, day, hour) | 411 | data_ts = '%04d/%02d/%02d %02d:00:00' % \ |
238 | 346 | row = [data_ts] | 412 | (int(year), int(month), int(day), int(hour)) |
239 | 347 | row.append('%s' % account) | 413 | row = [data_ts, '%s' % (account)] |
240 | 348 | for k in sorted_keylist_mapping: | 414 | for k in sorted_keylist_mapping: |
244 | 349 | row.append('%s' % d[k]) | 415 | row.append(str(d[k])) |
245 | 350 | out_buf.append(','.join(row)) | 416 | output.append(row) |
246 | 351 | out_buf = '\n'.join(out_buf) | 417 | return output |
247 | 418 | |||
248 | 419 | def store_output(self, output): | ||
249 | 420 | """ | ||
250 | 421 | Takes the a list of rows and stores a csv file of the values in the | ||
251 | 422 | stats account. | ||
252 | 423 | |||
253 | 424 | :param output: list of rows to appear in the csv file | ||
254 | 425 | |||
255 | 426 | This csv file is final product of this script. | ||
256 | 427 | """ | ||
257 | 428 | |||
258 | 429 | out_buf = '\n'.join([','.join(row) for row in output]) | ||
259 | 352 | h = hashlib.md5(out_buf).hexdigest() | 430 | h = hashlib.md5(out_buf).hexdigest() |
260 | 353 | upload_name = time.strftime('%Y/%m/%d/%H/') + '%s.csv.gz' % h | 431 | upload_name = time.strftime('%Y/%m/%d/%H/') + '%s.csv.gz' % h |
261 | 354 | f = cStringIO.StringIO(out_buf) | 432 | f = cStringIO.StringIO(out_buf) |
262 | 355 | self.log_processor.internal_proxy.upload_file(f, | 433 | self.log_processor.internal_proxy.upload_file(f, |
274 | 356 | self.log_processor_account, | 434 | self.log_processor_account, |
275 | 357 | self.log_processor_container, | 435 | self.log_processor_container, |
276 | 358 | upload_name) | 436 | upload_name) |
277 | 359 | 437 | ||
278 | 360 | # cleanup | 438 | @property |
279 | 361 | s = cPickle.dumps(processed_files, cPickle.HIGHEST_PROTOCOL) | 439 | def keylist_mapping(self): |
280 | 362 | f = cStringIO.StringIO(s) | 440 | """ |
281 | 363 | self.log_processor.internal_proxy.upload_file(f, | 441 | :returns: the keylist mapping. |
282 | 364 | self.log_processor_account, | 442 | |
283 | 365 | self.log_processor_container, | 443 | The keylist mapping determines how the stats fields are aggregated in |
284 | 366 | 'processed_files.pickle.gz') | 444 | the final aggregation step. |
285 | 445 | """ | ||
286 | 446 | |||
287 | 447 | if self._keylist_mapping == None: | ||
288 | 448 | self._keylist_mapping = \ | ||
289 | 449 | self.log_processor.generate_keylist_mapping() | ||
290 | 450 | return self._keylist_mapping | ||
291 | 451 | |||
292 | 452 | def process_logs(self, logs_to_process, processed_files): | ||
293 | 453 | """ | ||
294 | 454 | :param logs_to_process: list of logs to process | ||
295 | 455 | :param processed_files: set of processed files | ||
296 | 456 | |||
297 | 457 | :returns: returns a list of rows of processed data. | ||
298 | 458 | |||
299 | 459 | The first row is the column headers. The rest of the rows contain | ||
300 | 460 | hourly aggregate data for the account specified in the row. | ||
301 | 461 | |||
302 | 462 | Files processed are added to the processed_files set. | ||
303 | 463 | |||
304 | 464 | When a large data structure is no longer needed, it is deleted in | ||
305 | 465 | an effort to conserve memory. | ||
306 | 466 | """ | ||
307 | 467 | |||
308 | 468 | # map | ||
309 | 469 | processor_args = (self.total_conf, self.logger) | ||
310 | 470 | results = multiprocess_collate(processor_args, logs_to_process, | ||
311 | 471 | self.worker_count) | ||
312 | 472 | |||
313 | 473 | # reduce | ||
314 | 474 | aggr_data = self.get_aggregate_data(processed_files, results) | ||
315 | 475 | del results | ||
316 | 476 | |||
317 | 477 | # group | ||
318 | 478 | # reduce a large number of keys in aggr_data[k] to a small | ||
319 | 479 | # number of output keys | ||
320 | 480 | final_info = self.get_final_info(aggr_data) | ||
321 | 481 | del aggr_data | ||
322 | 482 | |||
323 | 483 | # output | ||
324 | 484 | return self.get_output(final_info) | ||
325 | 485 | |||
326 | 486 | def run_once(self, *args, **kwargs): | ||
327 | 487 | """ | ||
328 | 488 | Process log files that fall within the lookback interval. | ||
329 | 489 | |||
330 | 490 | Upload resulting csv file to stats account. | ||
331 | 491 | |||
332 | 492 | Update processed files list and upload to stats account. | ||
333 | 493 | """ | ||
334 | 494 | |||
335 | 495 | for k in 'lookback_hours lookback_window'.split(): | ||
336 | 496 | if k in kwargs and kwargs[k] is not None: | ||
337 | 497 | setattr(self, k, kwargs[k]) | ||
338 | 498 | |||
339 | 499 | start = time.time() | ||
340 | 500 | self.logger.info(_("Beginning log processing")) | ||
341 | 501 | |||
342 | 502 | lookback_start, lookback_end = self.get_lookback_interval() | ||
343 | 503 | self.logger.debug('lookback_start: %s' % lookback_start) | ||
344 | 504 | self.logger.debug('lookback_end: %s' % lookback_end) | ||
345 | 505 | |||
346 | 506 | processed_files = self.get_processed_files_list() | ||
347 | 507 | if processed_files == None: | ||
348 | 508 | self.logger.error(_('Log processing unable to load list of ' | ||
349 | 509 | 'already processed log files')) | ||
350 | 510 | return | ||
351 | 511 | self.logger.debug(_('found %d processed files') % | ||
352 | 512 | len(processed_files)) | ||
353 | 513 | |||
354 | 514 | logs_to_process = self.log_processor.get_data_list(lookback_start, | ||
355 | 515 | lookback_end, processed_files) | ||
356 | 516 | self.logger.info(_('loaded %d files to process') % | ||
357 | 517 | len(logs_to_process)) | ||
358 | 518 | |||
359 | 519 | if logs_to_process: | ||
360 | 520 | output = self.process_logs(logs_to_process, processed_files) | ||
361 | 521 | self.store_output(output) | ||
362 | 522 | del output | ||
363 | 523 | |||
364 | 524 | self.store_processed_files_list(processed_files) | ||
365 | 367 | 525 | ||
366 | 368 | self.logger.info(_("Log processing done (%0.2f minutes)") % | 526 | self.logger.info(_("Log processing done (%0.2f minutes)") % |
368 | 369 | ((time.time() - start) / 60)) | 527 | ((time.time() - start) / 60)) |
369 | 370 | 528 | ||
370 | 371 | 529 | ||
371 | 372 | def multiprocess_collate(processor_args, logs_to_process, worker_count): | 530 | def multiprocess_collate(processor_args, logs_to_process, worker_count): |
372 | 373 | 531 | ||
373 | === modified file 'test/unit/stats/test_log_processor.py' | |||
374 | --- test/unit/stats/test_log_processor.py 2011-03-17 22:22:43 +0000 | |||
375 | +++ test/unit/stats/test_log_processor.py 2011-04-11 21:52:38 +0000 | |||
376 | @@ -16,6 +16,10 @@ | |||
377 | 16 | import unittest | 16 | import unittest |
378 | 17 | from test.unit import tmpfile | 17 | from test.unit import tmpfile |
379 | 18 | import Queue | 18 | import Queue |
380 | 19 | import datetime | ||
381 | 20 | import hashlib | ||
382 | 21 | import pickle | ||
383 | 22 | import time | ||
384 | 19 | 23 | ||
385 | 20 | from swift.common import internal_proxy | 24 | from swift.common import internal_proxy |
386 | 21 | from swift.stats import log_processor | 25 | from swift.stats import log_processor |
387 | @@ -26,7 +30,6 @@ | |||
388 | 26 | def __init__(self, *args, **kwargs): | 30 | def __init__(self, *args, **kwargs): |
389 | 27 | pass | 31 | pass |
390 | 28 | 32 | ||
391 | 29 | |||
392 | 30 | class DumbLogger(object): | 33 | class DumbLogger(object): |
393 | 31 | def __getattr__(self, n): | 34 | def __getattr__(self, n): |
394 | 32 | return self.foo | 35 | return self.foo |
395 | @@ -77,7 +80,7 @@ | |||
396 | 77 | return self.code, data() | 80 | return self.code, data() |
397 | 78 | 81 | ||
398 | 79 | class TestLogProcessor(unittest.TestCase): | 82 | class TestLogProcessor(unittest.TestCase): |
400 | 80 | 83 | ||
401 | 81 | access_test_line = 'Jul 9 04:14:30 saio proxy-server 1.2.3.4 4.5.6.7 '\ | 84 | access_test_line = 'Jul 9 04:14:30 saio proxy-server 1.2.3.4 4.5.6.7 '\ |
402 | 82 | '09/Jul/2010/04/14/30 GET '\ | 85 | '09/Jul/2010/04/14/30 GET '\ |
403 | 83 | '/v1/acct/foo/bar?format=json&foo HTTP/1.0 200 - '\ | 86 | '/v1/acct/foo/bar?format=json&foo HTTP/1.0 200 - '\ |
404 | @@ -85,7 +88,7 @@ | |||
405 | 85 | '6 95 - txfa431231-7f07-42fd-8fc7-7da9d8cc1f90 - 0.0262' | 88 | '6 95 - txfa431231-7f07-42fd-8fc7-7da9d8cc1f90 - 0.0262' |
406 | 86 | stats_test_line = 'account,1,2,3' | 89 | stats_test_line = 'account,1,2,3' |
407 | 87 | proxy_config = {'log-processor': { | 90 | proxy_config = {'log-processor': { |
409 | 88 | 91 | ||
410 | 89 | } | 92 | } |
411 | 90 | } | 93 | } |
412 | 91 | 94 | ||
413 | @@ -426,3 +429,407 @@ | |||
414 | 426 | finally: | 429 | finally: |
415 | 427 | log_processor.LogProcessor._internal_proxy = None | 430 | log_processor.LogProcessor._internal_proxy = None |
416 | 428 | log_processor.LogProcessor.get_object_data = orig_get_object_data | 431 | log_processor.LogProcessor.get_object_data = orig_get_object_data |
417 | 432 | |||
418 | 433 | class TestLogProcessorDaemon(unittest.TestCase): | ||
419 | 434 | |||
420 | 435 | def test_get_lookback_interval(self): | ||
421 | 436 | class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): | ||
422 | 437 | def __init__(self, lookback_hours, lookback_window): | ||
423 | 438 | self.lookback_hours = lookback_hours | ||
424 | 439 | self.lookback_window = lookback_window | ||
425 | 440 | |||
426 | 441 | try: | ||
427 | 442 | d = datetime.datetime | ||
428 | 443 | |||
429 | 444 | for x in [ | ||
430 | 445 | [d(2011, 1, 1), 0, 0, None, None], | ||
431 | 446 | [d(2011, 1, 1), 120, 0, '2010122700', None], | ||
432 | 447 | [d(2011, 1, 1), 120, 24, '2010122700', '2010122800'], | ||
433 | 448 | [d(2010, 1, 2, 3, 4), 120, 48, '2009122803', '2009123003'], | ||
434 | 449 | [d(2009, 5, 6, 7, 8), 1200, 100, '2009031707', '2009032111'], | ||
435 | 450 | [d(2008, 9, 10, 11, 12), 3000, 1000, '2008050811', '2008061903'], | ||
436 | 451 | ]: | ||
437 | 452 | |||
438 | 453 | log_processor.now = lambda: x[0] | ||
439 | 454 | |||
440 | 455 | d = MockLogProcessorDaemon(x[1], x[2]) | ||
441 | 456 | self.assertEquals((x[3], x[4]), d.get_lookback_interval()) | ||
442 | 457 | finally: | ||
443 | 458 | log_processor.now = datetime.datetime.now | ||
444 | 459 | |||
445 | 460 | def test_get_processed_files_list(self): | ||
446 | 461 | class MockLogProcessor(): | ||
447 | 462 | def __init__(self, stream): | ||
448 | 463 | self.stream = stream | ||
449 | 464 | |||
450 | 465 | def get_object_data(self, *args, **kwargs): | ||
451 | 466 | return self.stream | ||
452 | 467 | |||
453 | 468 | class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): | ||
454 | 469 | def __init__(self, stream): | ||
455 | 470 | self.log_processor = MockLogProcessor(stream) | ||
456 | 471 | self.log_processor_account = 'account' | ||
457 | 472 | self.log_processor_container = 'container' | ||
458 | 473 | self.processed_files_filename = 'filename' | ||
459 | 474 | |||
460 | 475 | file_list = set(['a', 'b', 'c']) | ||
461 | 476 | |||
462 | 477 | for s, l in [['', None], | ||
463 | 478 | [pickle.dumps(set()).split('\n'), set()], | ||
464 | 479 | [pickle.dumps(file_list).split('\n'), file_list], | ||
465 | 480 | ]: | ||
466 | 481 | |||
467 | 482 | self.assertEquals(l, | ||
468 | 483 | MockLogProcessorDaemon(s).get_processed_files_list()) | ||
469 | 484 | |||
470 | 485 | def test_get_processed_files_list_bad_file_downloads(self): | ||
471 | 486 | class MockLogProcessor(): | ||
472 | 487 | def __init__(self, status_code): | ||
473 | 488 | self.err = log_processor.BadFileDownload(status_code) | ||
474 | 489 | |||
475 | 490 | def get_object_data(self, *a, **k): | ||
476 | 491 | raise self.err | ||
477 | 492 | |||
478 | 493 | class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): | ||
479 | 494 | def __init__(self, status_code): | ||
480 | 495 | self.log_processor = MockLogProcessor(status_code) | ||
481 | 496 | self.log_processor_account = 'account' | ||
482 | 497 | self.log_processor_container = 'container' | ||
483 | 498 | self.processed_files_filename = 'filename' | ||
484 | 499 | |||
485 | 500 | for c, l in [[404, set()], [503, None], [None, None]]: | ||
486 | 501 | self.assertEquals(l, | ||
487 | 502 | MockLogProcessorDaemon(c).get_processed_files_list()) | ||
488 | 503 | |||
489 | 504 | def test_get_aggregate_data(self): | ||
490 | 505 | # when run "for real" | ||
491 | 506 | # the various keys/values in the input and output | ||
492 | 507 | # dictionaries are often not simple strings | ||
493 | 508 | # for testing we can use keys that are easier to work with | ||
494 | 509 | |||
495 | 510 | processed_files = set() | ||
496 | 511 | |||
497 | 512 | data_in = [ | ||
498 | 513 | ['file1', { | ||
499 | 514 | 'acct1_time1': {'field1': 1, 'field2': 2, 'field3': 3}, | ||
500 | 515 | 'acct1_time2': {'field1': 4, 'field2': 5}, | ||
501 | 516 | 'acct2_time1': {'field1': 6, 'field2': 7}, | ||
502 | 517 | 'acct3_time3': {'field1': 8, 'field2': 9}, | ||
503 | 518 | } | ||
504 | 519 | ], | ||
505 | 520 | ['file2', {'acct1_time1': {'field1': 10}}], | ||
506 | 521 | ] | ||
507 | 522 | |||
508 | 523 | expected_data_out = { | ||
509 | 524 | 'acct1_time1': {'field1': 11, 'field2': 2, 'field3': 3}, | ||
510 | 525 | 'acct1_time2': {'field1': 4, 'field2': 5}, | ||
511 | 526 | 'acct2_time1': {'field1': 6, 'field2': 7}, | ||
512 | 527 | 'acct3_time3': {'field1': 8, 'field2': 9}, | ||
513 | 528 | } | ||
514 | 529 | |||
515 | 530 | class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): | ||
516 | 531 | def __init__(self): | ||
517 | 532 | pass | ||
518 | 533 | |||
519 | 534 | d = MockLogProcessorDaemon() | ||
520 | 535 | data_out = d.get_aggregate_data(processed_files, data_in) | ||
521 | 536 | |||
522 | 537 | for k, v in expected_data_out.items(): | ||
523 | 538 | self.assertEquals(v, data_out[k]) | ||
524 | 539 | |||
525 | 540 | self.assertEquals(set(['file1', 'file2']), processed_files) | ||
526 | 541 | |||
527 | 542 | def test_get_final_info(self): | ||
528 | 543 | # when run "for real" | ||
529 | 544 | # the various keys/values in the input and output | ||
530 | 545 | # dictionaries are often not simple strings | ||
531 | 546 | # for testing we can use keys/values that are easier to work with | ||
532 | 547 | |||
533 | 548 | class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): | ||
534 | 549 | def __init__(self): | ||
535 | 550 | self._keylist_mapping = { | ||
536 | 551 | 'out_field1':['field1', 'field2', 'field3'], | ||
537 | 552 | 'out_field2':['field2', 'field3'], | ||
538 | 553 | 'out_field3':['field3'], | ||
539 | 554 | 'out_field4':'field4', | ||
540 | 555 | 'out_field5':['field6', 'field7', 'field8'], | ||
541 | 556 | 'out_field6':['field6'], | ||
542 | 557 | 'out_field7':'field7', | ||
543 | 558 | } | ||
544 | 559 | |||
545 | 560 | data_in = { | ||
546 | 561 | 'acct1_time1': {'field1': 11, 'field2': 2, 'field3': 3, | ||
547 | 562 | 'field4': 8, 'field5': 11}, | ||
548 | 563 | 'acct1_time2': {'field1': 4, 'field2': 5}, | ||
549 | 564 | 'acct2_time1': {'field1': 6, 'field2': 7}, | ||
550 | 565 | 'acct3_time3': {'field1': 8, 'field2': 9}, | ||
551 | 566 | } | ||
552 | 567 | |||
553 | 568 | expected_data_out = { | ||
554 | 569 | 'acct1_time1': {'out_field1': 16, 'out_field2': 5, | ||
555 | 570 | 'out_field3': 3, 'out_field4': 8, 'out_field5': 0, | ||
556 | 571 | 'out_field6': 0, 'out_field7': 0,}, | ||
557 | 572 | 'acct1_time2': {'out_field1': 9, 'out_field2': 5, | ||
558 | 573 | 'out_field3': 0, 'out_field4': 0, 'out_field5': 0, | ||
559 | 574 | 'out_field6': 0, 'out_field7': 0,}, | ||
560 | 575 | 'acct2_time1': {'out_field1': 13, 'out_field2': 7, | ||
561 | 576 | 'out_field3': 0, 'out_field4': 0, 'out_field5': 0, | ||
562 | 577 | 'out_field6': 0, 'out_field7': 0,}, | ||
563 | 578 | 'acct3_time3': {'out_field1': 17, 'out_field2': 9, | ||
564 | 579 | 'out_field3': 0, 'out_field4': 0, 'out_field5': 0, | ||
565 | 580 | 'out_field6': 0, 'out_field7': 0,}, | ||
566 | 581 | } | ||
567 | 582 | |||
568 | 583 | self.assertEquals(expected_data_out, | ||
569 | 584 | MockLogProcessorDaemon().get_final_info(data_in)) | ||
570 | 585 | |||
571 | 586 | def test_store_processed_files_list(self): | ||
572 | 587 | class MockInternalProxy: | ||
573 | 588 | def __init__(self, test, daemon, processed_files): | ||
574 | 589 | self.test = test | ||
575 | 590 | self.daemon = daemon | ||
576 | 591 | self.processed_files = processed_files | ||
577 | 592 | |||
578 | 593 | def upload_file(self, f, account, container, filename): | ||
579 | 594 | self.test.assertEquals(self.processed_files, | ||
580 | 595 | pickle.loads(f.getvalue())) | ||
581 | 596 | self.test.assertEquals(self.daemon.log_processor_account, | ||
582 | 597 | account) | ||
583 | 598 | self.test.assertEquals(self.daemon.log_processor_container, | ||
584 | 599 | container) | ||
585 | 600 | self.test.assertEquals(self.daemon.processed_files_filename, | ||
586 | 601 | filename) | ||
587 | 602 | |||
588 | 603 | class MockLogProcessor: | ||
589 | 604 | def __init__(self, test, daemon, processed_files): | ||
590 | 605 | self.internal_proxy = MockInternalProxy(test, daemon, | ||
591 | 606 | processed_files) | ||
592 | 607 | |||
593 | 608 | class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): | ||
594 | 609 | def __init__(self, test, processed_files): | ||
595 | 610 | self.log_processor = \ | ||
596 | 611 | MockLogProcessor(test, self, processed_files) | ||
597 | 612 | self.log_processor_account = 'account' | ||
598 | 613 | self.log_processor_container = 'container' | ||
599 | 614 | self.processed_files_filename = 'filename' | ||
600 | 615 | |||
601 | 616 | processed_files = set(['a', 'b', 'c']) | ||
602 | 617 | MockLogProcessorDaemon(self, processed_files).\ | ||
603 | 618 | store_processed_files_list(processed_files) | ||
604 | 619 | |||
605 | 620 | def test_get_output(self): | ||
606 | 621 | class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): | ||
607 | 622 | def __init__(self): | ||
608 | 623 | self._keylist_mapping = {'a':None, 'b':None, 'c':None} | ||
609 | 624 | |||
610 | 625 | data_in = { | ||
611 | 626 | ('acct1', 2010, 1, 1, 0): {'a':1, 'b':2, 'c':3}, | ||
612 | 627 | ('acct1', 2010, 10, 10, 10): {'a':10, 'b':20, 'c':30}, | ||
613 | 628 | ('acct2', 2008, 3, 6, 9): {'a':8, 'b':9, 'c':12}, | ||
614 | 629 | ('acct3', 2005, 4, 8, 16): {'a':1, 'b':5, 'c':25}, | ||
615 | 630 | } | ||
616 | 631 | |||
617 | 632 | expected_data_out = [ | ||
618 | 633 | ['data_ts', 'account', 'a', 'b', 'c'], | ||
619 | 634 | ['2010/01/01 00:00:00', 'acct1', '1', '2', '3'], | ||
620 | 635 | ['2010/10/10 10:00:00', 'acct1', '10', '20', '30'], | ||
621 | 636 | ['2008/03/06 09:00:00', 'acct2', '8', '9', '12'], | ||
622 | 637 | ['2005/04/08 16:00:00', 'acct3', '1', '5', '25'], | ||
623 | 638 | ] | ||
624 | 639 | |||
625 | 640 | data_out = MockLogProcessorDaemon().get_output(data_in) | ||
626 | 641 | self.assertEquals(expected_data_out[0], data_out[0]) | ||
627 | 642 | |||
628 | 643 | for row in data_out[1:]: | ||
629 | 644 | self.assert_(row in expected_data_out) | ||
630 | 645 | |||
631 | 646 | for row in expected_data_out[1:]: | ||
632 | 647 | self.assert_(row in data_out) | ||
633 | 648 | |||
634 | 649 | def test_store_output(self): | ||
635 | 650 | try: | ||
636 | 651 | real_strftime = time.strftime | ||
637 | 652 | mock_strftime_return = '2010/03/02/01/' | ||
638 | 653 | def mock_strftime(format): | ||
639 | 654 | self.assertEquals('%Y/%m/%d/%H/', format) | ||
640 | 655 | return mock_strftime_return | ||
641 | 656 | log_processor.time.strftime = mock_strftime | ||
642 | 657 | |||
643 | 658 | data_in = [ | ||
644 | 659 | ['data_ts', 'account', 'a', 'b', 'c'], | ||
645 | 660 | ['2010/10/10 10:00:00', 'acct1', '1', '2', '3'], | ||
646 | 661 | ['2010/10/10 10:00:00', 'acct1', '10', '20', '30'], | ||
647 | 662 | ['2008/03/06 09:00:00', 'acct2', '8', '9', '12'], | ||
648 | 663 | ['2005/04/08 16:00:00', 'acct3', '1', '5', '25'], | ||
649 | 664 | ] | ||
650 | 665 | |||
651 | 666 | expected_output = '\n'.join([','.join(row) for row in data_in]) | ||
652 | 667 | h = hashlib.md5(expected_output).hexdigest() | ||
653 | 668 | expected_filename = '%s%s.csv.gz' % (mock_strftime_return, h) | ||
654 | 669 | |||
655 | 670 | class MockInternalProxy: | ||
656 | 671 | def __init__(self, test, daemon, expected_filename, | ||
657 | 672 | expected_output): | ||
658 | 673 | self.test = test | ||
659 | 674 | self.daemon = daemon | ||
660 | 675 | self.expected_filename = expected_filename | ||
661 | 676 | self.expected_output = expected_output | ||
662 | 677 | |||
663 | 678 | def upload_file(self, f, account, container, filename): | ||
664 | 679 | self.test.assertEquals(self.daemon.log_processor_account, | ||
665 | 680 | account) | ||
666 | 681 | self.test.assertEquals(self.daemon.log_processor_container, | ||
667 | 682 | container) | ||
668 | 683 | self.test.assertEquals(self.expected_filename, filename) | ||
669 | 684 | self.test.assertEquals(self.expected_output, f.getvalue()) | ||
670 | 685 | |||
671 | 686 | class MockLogProcessor: | ||
672 | 687 | def __init__(self, test, daemon, expected_filename, | ||
673 | 688 | expected_output): | ||
674 | 689 | self.internal_proxy = MockInternalProxy(test, daemon, | ||
675 | 690 | expected_filename, expected_output) | ||
676 | 691 | |||
677 | 692 | class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): | ||
678 | 693 | def __init__(self, test, expected_filename, expected_output): | ||
679 | 694 | self.log_processor = MockLogProcessor(test, self, | ||
680 | 695 | expected_filename, expected_output) | ||
681 | 696 | self.log_processor_account = 'account' | ||
682 | 697 | self.log_processor_container = 'container' | ||
683 | 698 | self.processed_files_filename = 'filename' | ||
684 | 699 | |||
685 | 700 | MockLogProcessorDaemon(self, expected_filename, expected_output).\ | ||
686 | 701 | store_output(data_in) | ||
687 | 702 | finally: | ||
688 | 703 | log_processor.time.strftime = real_strftime | ||
689 | 704 | |||
690 | 705 | def test_keylist_mapping(self): | ||
691 | 706 | # Kind of lame test to see if the propery is both | ||
692 | 707 | # generated by a particular method and cached properly. | ||
693 | 708 | # The method that actually generates the mapping is | ||
694 | 709 | # tested elsewhere. | ||
695 | 710 | |||
696 | 711 | value_return = 'keylist_mapping' | ||
697 | 712 | class MockLogProcessor: | ||
698 | 713 | def __init__(self): | ||
699 | 714 | self.call_count = 0 | ||
700 | 715 | |||
701 | 716 | def generate_keylist_mapping(self): | ||
702 | 717 | self.call_count += 1 | ||
703 | 718 | return value_return | ||
704 | 719 | |||
705 | 720 | class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): | ||
706 | 721 | def __init__(self): | ||
707 | 722 | self.log_processor = MockLogProcessor() | ||
708 | 723 | self._keylist_mapping = None | ||
709 | 724 | |||
710 | 725 | d = MockLogProcessorDaemon() | ||
711 | 726 | self.assertEquals(value_return, d.keylist_mapping) | ||
712 | 727 | self.assertEquals(value_return, d.keylist_mapping) | ||
713 | 728 | self.assertEquals(1, d.log_processor.call_count) | ||
714 | 729 | |||
715 | 730 | def test_process_logs(self): | ||
716 | 731 | try: | ||
717 | 732 | mock_logs_to_process = 'logs_to_process' | ||
718 | 733 | mock_processed_files = 'processed_files' | ||
719 | 734 | |||
720 | 735 | real_multiprocess_collate = log_processor.multiprocess_collate | ||
721 | 736 | multiprocess_collate_return = 'multiprocess_collate_return' | ||
722 | 737 | |||
723 | 738 | get_aggregate_data_return = 'get_aggregate_data_return' | ||
724 | 739 | get_final_info_return = 'get_final_info_return' | ||
725 | 740 | get_output_return = 'get_output_return' | ||
726 | 741 | |||
727 | 742 | class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): | ||
728 | 743 | def __init__(self, test): | ||
729 | 744 | self.test = test | ||
730 | 745 | self.total_conf = 'total_conf' | ||
731 | 746 | self.logger = 'logger' | ||
732 | 747 | self.worker_count = 'worker_count' | ||
733 | 748 | |||
734 | 749 | def get_aggregate_data(self, processed_files, results): | ||
735 | 750 | self.test.assertEquals(mock_processed_files, processed_files) | ||
736 | 751 | self.test.assertEquals(multiprocess_collate_return, results) | ||
737 | 752 | return get_aggregate_data_return | ||
738 | 753 | |||
739 | 754 | def get_final_info(self, aggr_data): | ||
740 | 755 | self.test.assertEquals(get_aggregate_data_return, aggr_data) | ||
741 | 756 | return get_final_info_return | ||
742 | 757 | |||
743 | 758 | def get_output(self, final_info): | ||
744 | 759 | self.test.assertEquals(get_final_info_return, final_info) | ||
745 | 760 | return get_output_return | ||
746 | 761 | |||
747 | 762 | d = MockLogProcessorDaemon(self) | ||
748 | 763 | |||
749 | 764 | def mock_multiprocess_collate(processor_args, logs_to_process, | ||
750 | 765 | worker_count): | ||
751 | 766 | self.assertEquals(d.total_conf, processor_args[0]) | ||
752 | 767 | self.assertEquals(d.logger, processor_args[1]) | ||
753 | 768 | |||
754 | 769 | self.assertEquals(mock_logs_to_process, logs_to_process) | ||
755 | 770 | self.assertEquals(d.worker_count, worker_count) | ||
756 | 771 | |||
757 | 772 | return multiprocess_collate_return | ||
758 | 773 | |||
759 | 774 | log_processor.multiprocess_collate = mock_multiprocess_collate | ||
760 | 775 | |||
761 | 776 | output = d.process_logs(mock_logs_to_process, mock_processed_files) | ||
762 | 777 | self.assertEquals(get_output_return, output) | ||
763 | 778 | finally: | ||
764 | 779 | log_processor.multiprocess_collate = real_multiprocess_collate | ||
765 | 780 | |||
766 | 781 | def test_run_once_get_processed_files_list_returns_none(self): | ||
767 | 782 | class MockLogProcessor: | ||
768 | 783 | def get_data_list(self, lookback_start, lookback_end, | ||
769 | 784 | processed_files): | ||
770 | 785 | raise unittest.TestCase.failureException, \ | ||
771 | 786 | 'Method should not be called' | ||
772 | 787 | |||
773 | 788 | class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): | ||
774 | 789 | def __init__(self): | ||
775 | 790 | self.logger = DumbLogger() | ||
776 | 791 | self.log_processor = MockLogProcessor() | ||
777 | 792 | |||
778 | 793 | def get_lookback_interval(self): | ||
779 | 794 | return None, None | ||
780 | 795 | |||
781 | 796 | def get_processed_files_list(self): | ||
782 | 797 | return None | ||
783 | 798 | |||
784 | 799 | MockLogProcessorDaemon().run_once() | ||
785 | 800 | |||
786 | 801 | def test_run_once_no_logs_to_process(self): | ||
787 | 802 | class MockLogProcessor(): | ||
788 | 803 | def __init__(self, daemon, test): | ||
789 | 804 | self.daemon = daemon | ||
790 | 805 | self.test = test | ||
791 | 806 | |||
792 | 807 | def get_data_list(self, lookback_start, lookback_end, | ||
793 | 808 | processed_files): | ||
794 | 809 | self.test.assertEquals(self.daemon.lookback_start, | ||
795 | 810 | lookback_start) | ||
796 | 811 | self.test.assertEquals(self.daemon.lookback_end, | ||
797 | 812 | lookback_end) | ||
798 | 813 | self.test.assertEquals(self.daemon.processed_files, | ||
799 | 814 | processed_files) | ||
800 | 815 | return [] | ||
801 | 816 | |||
802 | 817 | class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): | ||
803 | 818 | def __init__(self, test): | ||
804 | 819 | self.logger = DumbLogger() | ||
805 | 820 | self.log_processor = MockLogProcessor(self, test) | ||
806 | 821 | self.lookback_start = 'lookback_start' | ||
807 | 822 | self.lookback_end = 'lookback_end' | ||
808 | 823 | self.processed_files = ['a', 'b', 'c'] | ||
809 | 824 | |||
810 | 825 | def get_lookback_interval(self): | ||
811 | 826 | return self.lookback_start, self.lookback_end | ||
812 | 827 | |||
813 | 828 | def get_processed_files_list(self): | ||
814 | 829 | return self.processed_files | ||
815 | 830 | |||
816 | 831 | def process_logs(logs_to_process, processed_files): | ||
817 | 832 | raise unittest.TestCase.failureException, \ | ||
818 | 833 | 'Method should not be called' | ||
819 | 834 | |||
820 | 835 | MockLogProcessorDaemon(self).run_once() |
pep8 line 35, log_processor.py
test_get_output seems broken. Perhaps you forgot "in" on the self.assert_() calls (lines 645, 648)? Another option is to sort the expected output and the given output and compare them for equality.
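If the order-insensitive membership checks stay awkward, the sort-and-compare alternative suggested above could look like this inside test_get_output (a minimal sketch, reusing the test's existing data_out and expected_data_out names):

```python
# Header row must match exactly; the remaining rows are compared
# order-insensitively by sorting both lists before asserting equality.
self.assertEquals(expected_data_out[0], data_out[0])
self.assertEquals(sorted(expected_data_out[1:]), sorted(data_out[1:]))
```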