Merge lp:~sidnei/txstatsd/cooperate into lp:txstatsd
- cooperate
- Merge into trunk
Proposed by
Sidnei da Silva
Status: | Merged |
---|---|
Approved by: | Guillermo Gonzalez |
Approved revision: | 113 |
Merged at revision: | 108 |
Proposed branch: | lp:~sidnei/txstatsd/cooperate |
Merge into: | lp:txstatsd |
Diff against target: |
1162 lines (+489/-176) 15 files modified
txstatsd/metrics/distinctmetric.py (+1/-1) txstatsd/metrics/timermetric.py (+3/-3) txstatsd/protocol.py (+4/-1) txstatsd/report.py (+118/-0) txstatsd/server/configurableprocessor.py (+6/-20) txstatsd/server/loggingprocessor.py (+3/-4) txstatsd/server/processor.py (+58/-81) txstatsd/server/protocol.py (+16/-15) txstatsd/service.py (+25/-13) txstatsd/tests/test_client.py (+11/-3) txstatsd/tests/test_configurableprocessor.py (+7/-7) txstatsd/tests/test_inspector.py (+204/-0) txstatsd/tests/test_loggingprocessor.py (+3/-2) txstatsd/tests/test_processor.py (+30/-25) txstatsd/tests/test_service.py (+0/-1) |
To merge this branch: | bzr merge lp:~sidnei/txstatsd/cooperate |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Guillermo Gonzalez | Approve | ||
Review via email: mp+168197@code.launchpad.net |
Commit message
Use cooperator to yield more often and avoid blocking the reactor for too long.
Description of the change
Use cooperator to yield more often and avoid blocking the reactor for too long.
To post a comment you must log in.
lp:~sidnei/txstatsd/cooperate
updated
- 113. By Sidnei da Silva
-
- Add reactor inspector tests
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === modified file 'txstatsd/metrics/distinctmetric.py' |
2 | --- txstatsd/metrics/distinctmetric.py 2013-03-08 13:27:16 +0000 |
3 | +++ txstatsd/metrics/distinctmetric.py 2013-06-10 14:01:47 +0000 |
4 | @@ -165,7 +165,7 @@ |
5 | ".count_1min": self.count_1min(now), |
6 | ".count_1hour": self.count_1hour(now), |
7 | ".count_1day": self.count_1day(now)} |
8 | - for item, value in items.iteritems(): |
9 | + for item, value in sorted(items.iteritems()): |
10 | metrics.append((self.prefix + self.name + item, value, timestamp)) |
11 | return metrics |
12 | |
13 | |
14 | === modified file 'txstatsd/metrics/timermetric.py' |
15 | --- txstatsd/metrics/timermetric.py 2012-11-20 11:56:07 +0000 |
16 | +++ txstatsd/metrics/timermetric.py 2013-06-10 14:01:47 +0000 |
17 | @@ -118,7 +118,7 @@ |
18 | @param percentiles: One or more percentiles. |
19 | """ |
20 | return [percentile for percentile in |
21 | - self.histogram.percentiles(*percentiles)] |
22 | + self.histogram.percentiles(*percentiles)] |
23 | |
24 | def get_values(self): |
25 | """Returns a list of all recorded durations in the timer's sample.""" |
26 | @@ -145,8 +145,8 @@ |
27 | ".999percentile": percentiles[5], |
28 | ".count": self.count, |
29 | ".rate": self.rate(timestamp), |
30 | - } |
31 | - for item, value in items.iteritems(): |
32 | + } |
33 | + for item, value in sorted(items.iteritems()): |
34 | metrics.append((self.prefix + self.name + item, |
35 | round(value, 6), timestamp)) |
36 | self.clear(timestamp) |
37 | |
38 | === modified file 'txstatsd/protocol.py' |
39 | --- txstatsd/protocol.py 2012-12-27 20:49:06 +0000 |
40 | +++ txstatsd/protocol.py 2013-06-10 14:01:47 +0000 |
41 | @@ -21,7 +21,7 @@ |
42 | |
43 | import socket |
44 | |
45 | -from twisted.internet.defer import inlineCallbacks, returnValue, Deferred |
46 | +from twisted.internet import abstract |
47 | from twisted.internet.protocol import DatagramProtocol |
48 | from twisted.python import log |
49 | |
50 | @@ -139,6 +139,9 @@ |
51 | self.transport = None |
52 | self.transport_gateway = None |
53 | |
54 | + if abstract.isIPAddress(host): |
55 | + self.host_resolved(host) |
56 | + |
57 | def __str__(self): |
58 | return "%s:%d" % (self.host, self.port) |
59 | |
60 | |
61 | === modified file 'txstatsd/report.py' |
62 | --- txstatsd/report.py 2012-06-28 17:29:26 +0000 |
63 | +++ txstatsd/report.py 2013-06-10 14:01:47 +0000 |
64 | @@ -19,6 +19,14 @@ |
65 | # TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE |
66 | # SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. |
67 | |
68 | +import os |
69 | +import sys |
70 | +import time |
71 | +import logging |
72 | +import threading |
73 | +import traceback |
74 | +import Queue |
75 | + |
76 | from twisted.internet.defer import maybeDeferred |
77 | from twisted.internet.task import LoopingCall |
78 | from twisted.python import log |
79 | @@ -82,3 +90,113 @@ |
80 | for task, interval in self.tasks: |
81 | task.stop() |
82 | Service.stopService(self) |
83 | + |
84 | + |
85 | +class ReactorInspector(threading.Thread): |
86 | + """Log message with a time delta from the last call.""" |
87 | + |
88 | + def __init__(self, reactor_call, metrics, loop_time=3, log=log.msg): |
89 | + self.running = False |
90 | + self.stopped = False |
91 | + self.queue = Queue.Queue() |
92 | + self.reactor_call = reactor_call |
93 | + self.loop_time = loop_time |
94 | + self.last_responsive_ts = 0 |
95 | + self.reactor_thread = None |
96 | + self.metrics = metrics |
97 | + super(ReactorInspector, self).__init__() |
98 | + self.daemon = True |
99 | + self.log = log |
100 | + |
101 | + def start(self): |
102 | + """Start the thread. Should be called from the reactor main thread.""" |
103 | + self.reactor_thread = threading.currentThread().ident |
104 | + if not self.running: |
105 | + self.running = True |
106 | + super(ReactorInspector, self).start() |
107 | + |
108 | + def stop(self): |
109 | + """Stop the thread.""" |
110 | + self.stopped = True |
111 | + self.log("ReactorInspector: stopped") |
112 | + |
113 | + def dump_frames(self): |
114 | + """Dump frames info to log file.""" |
115 | + current = threading.currentThread().ident |
116 | + frames = sys._current_frames() |
117 | + for frame_id, frame in frames.iteritems(): |
118 | + if frame_id == current: |
119 | + continue |
120 | + |
121 | + stack = ''.join(traceback.format_stack(frame)) |
122 | + |
123 | + if frame_id == self.reactor_thread: |
124 | + title = "Dumping Python frame for reactor main thread" |
125 | + else: |
126 | + title = "Dumping Python frame" |
127 | + self.log("%s %s (pid: %d):\n%s" % |
128 | + (title, frame_id, os.getpid(), stack), |
129 | + logLevel=logging.DEBUG) |
130 | + |
131 | + def run(self): |
132 | + """Start running the thread.""" |
133 | + self.log("ReactorInspector: started") |
134 | + msg_id = 0 |
135 | + oldest_pending_request_ts = time.time() |
136 | + while not self.stopped: |
137 | + def task(msg_id=msg_id, tini=time.time()): |
138 | + """Put result in queue with initial and completed times.""" |
139 | + self.queue.put((msg_id, tini, time.time())) |
140 | + self.reactor_call(task) |
141 | + time.sleep(self.loop_time) |
142 | + try: |
143 | + id_sent, tini, tsent = self.queue.get_nowait() |
144 | + except Queue.Empty: |
145 | + # Oldest pending request is still out there |
146 | + delay = time.time() - oldest_pending_request_ts |
147 | + self.metrics.gauge("delay", delay) |
148 | + self.log("ReactorInspector: detected unresponsive!" |
149 | + " (current: %d, pid: %d) delay: %.3f" % ( |
150 | + msg_id, os.getpid(), delay), |
151 | + logLevel=logging.CRITICAL) |
152 | + self.dump_frames() |
153 | + else: |
154 | + delay = tsent - tini |
155 | + self.metrics.gauge("delay", delay) |
156 | + if msg_id > id_sent: |
157 | + self.log("ReactorInspector: late (current: %d, " |
158 | + "got: %d, pid: %d, cleaning queue) " |
159 | + "delay: %.3f" % (msg_id, id_sent, |
160 | + os.getpid(), delay), |
161 | + logLevel=logging.WARNING) |
162 | + while not self.queue.empty(): |
163 | + self.queue.get_nowait() |
164 | + # About to start a new request with nothing pending |
165 | + oldest_pending_request_ts = time.time() |
166 | + else: |
167 | + assert msg_id == id_sent |
168 | + # About to start a new request with nothing pending |
169 | + self.last_responsive_ts = time.time() |
170 | + oldest_pending_request_ts = self.last_responsive_ts |
171 | + self.log("ReactorInspector: ok (msg: %d, " |
172 | + "pid: %d) delay: %.3f" % ( |
173 | + msg_id, os.getpid(), delay), |
174 | + logLevel=logging.DEBUG) |
175 | + finally: |
176 | + msg_id += 1 |
177 | + |
178 | + |
179 | +class ReactorInspectorService(Service): |
180 | + """Start/stop the reactor inspector service.""" |
181 | + |
182 | + def __init__(self, reactor, metrics, loop_time=3): |
183 | + self.inspector = ReactorInspector( |
184 | + reactor.callFromThread, metrics, loop_time) |
185 | + |
186 | + def startService(self): |
187 | + Service.startService(self) |
188 | + self.inspector.start() |
189 | + |
190 | + def stopService(self): |
191 | + self.inspector.stop() |
192 | + Service.stopService(self) |
193 | |
194 | === modified file 'txstatsd/server/configurableprocessor.py' |
195 | --- txstatsd/server/configurableprocessor.py 2012-06-28 17:29:26 +0000 |
196 | +++ txstatsd/server/configurableprocessor.py 2013-06-10 14:01:47 +0000 |
197 | @@ -60,8 +60,9 @@ |
198 | |
199 | def compose_timer_metric(self, key, duration): |
200 | if not key in self.timer_metrics: |
201 | - metric = TimerMetricReporter(key, |
202 | - wall_time_func=self.time_function, prefix=self.message_prefix) |
203 | + metric = TimerMetricReporter( |
204 | + key, wall_time_func=self.time_function, |
205 | + prefix=self.message_prefix) |
206 | self.timer_metrics[key] = metric |
207 | self.timer_metrics[key].update(duration) |
208 | |
209 | @@ -93,31 +94,16 @@ |
210 | self.meter_metrics[key].mark(value) |
211 | |
212 | def flush_counter_metrics(self, interval, timestamp): |
213 | - metrics = [] |
214 | - events = 0 |
215 | for metric in self.counter_metrics.itervalues(): |
216 | messages = metric.report(timestamp) |
217 | - metrics.extend(messages) |
218 | - events += 1 |
219 | - |
220 | - return (metrics, events) |
221 | + yield messages |
222 | |
223 | def flush_gauge_metrics(self, timestamp): |
224 | - metrics = [] |
225 | - events = 0 |
226 | for metric in self.gauge_metrics.itervalues(): |
227 | messages = metric.report(timestamp) |
228 | - metrics.extend(messages) |
229 | - events += 1 |
230 | - |
231 | - return (metrics, events) |
232 | + yield messages |
233 | |
234 | def flush_timer_metrics(self, percent, timestamp): |
235 | - metrics = [] |
236 | - events = 0 |
237 | for metric in self.timer_metrics.itervalues(): |
238 | messages = metric.report(timestamp) |
239 | - metrics.extend(messages) |
240 | - events += 1 |
241 | - |
242 | - return (metrics, events) |
243 | + yield messages |
244 | |
245 | === modified file 'txstatsd/server/loggingprocessor.py' |
246 | --- txstatsd/server/loggingprocessor.py 2012-06-28 17:29:26 +0000 |
247 | +++ txstatsd/server/loggingprocessor.py 2013-06-10 14:01:47 +0000 |
248 | @@ -48,8 +48,7 @@ |
249 | |
250 | def flush(self, interval=10000, percent=90): |
251 | """Log all received metric samples to the supplied logger.""" |
252 | - messages = list(super(LoggingMessageProcessor, self).flush( |
253 | - interval=interval, percent=percent)) |
254 | - for msg in messages: |
255 | + parent = super(LoggingMessageProcessor, self) |
256 | + for msg in parent.flush(interval=interval, percent=percent): |
257 | self.logger.info("Out: %s %s %s" % msg) |
258 | - return messages |
259 | + yield msg |
260 | |
261 | === modified file 'txstatsd/server/processor.py' |
262 | --- txstatsd/server/processor.py 2012-06-28 17:29:26 +0000 |
263 | +++ txstatsd/server/processor.py 2013-06-10 14:01:47 +0000 |
264 | @@ -25,6 +25,7 @@ |
265 | import logging |
266 | |
267 | from twisted.python import log |
268 | +from twisted.internet.task import Cooperator |
269 | |
270 | from txstatsd.metrics.metermetric import MeterMetricReporter |
271 | |
272 | @@ -226,73 +227,75 @@ |
273 | Flush all queued stats, computing a normalized count based on |
274 | C{interval} and mean timings based on C{threshold}. |
275 | """ |
276 | - messages = [] |
277 | per_metric = {} |
278 | num_stats = 0 |
279 | interval = interval / 1000 |
280 | timestamp = int(self.time_function()) |
281 | |
282 | start = self.time_function() |
283 | - counter_metrics, events = self.flush_counter_metrics(interval, |
284 | - timestamp) |
285 | + events = 0 |
286 | + for metrics in self.flush_counter_metrics(interval, timestamp): |
287 | + for metric in metrics: |
288 | + yield metric |
289 | + events += 1 |
290 | duration = self.time_function() - start |
291 | - if events > 0: |
292 | - messages.extend(sorted(counter_metrics)) |
293 | - num_stats += events |
294 | + num_stats += events |
295 | per_metric["counter"] = (events, duration) |
296 | |
297 | start = self.time_function() |
298 | - timer_metrics, events = self.flush_timer_metrics(percent, timestamp) |
299 | + events = 0 |
300 | + for metrics in self.flush_timer_metrics(percent, timestamp): |
301 | + for metric in metrics: |
302 | + yield metric |
303 | + events += 1 |
304 | duration = self.time_function() - start |
305 | - if events > 0: |
306 | - messages.extend(sorted(timer_metrics)) |
307 | - num_stats += events |
308 | + num_stats += events |
309 | per_metric["timer"] = (events, duration) |
310 | |
311 | start = self.time_function() |
312 | - gauge_metrics, events = self.flush_gauge_metrics(timestamp) |
313 | + events = 0 |
314 | + for metrics in self.flush_gauge_metrics(timestamp): |
315 | + for metric in metrics: |
316 | + yield metric |
317 | + events += 1 |
318 | duration = self.time_function() - start |
319 | - if events > 0: |
320 | - messages.extend(sorted(gauge_metrics)) |
321 | - num_stats += events |
322 | + num_stats += events |
323 | per_metric["gauge"] = (events, duration) |
324 | |
325 | start = self.time_function() |
326 | - meter_metrics, events = self.flush_meter_metrics(timestamp) |
327 | + events = 0 |
328 | + for metrics in self.flush_meter_metrics(timestamp): |
329 | + for metric in metrics: |
330 | + yield metric |
331 | + events += 1 |
332 | duration = self.time_function() - start |
333 | - if events > 0: |
334 | - messages.extend(sorted(meter_metrics)) |
335 | - num_stats += events |
336 | + num_stats += events |
337 | per_metric["meter"] = (events, duration) |
338 | |
339 | start = self.time_function() |
340 | - plugin_metrics, events = self.flush_plugin_metrics(interval, timestamp) |
341 | + events = 0 |
342 | + for metrics in self.flush_plugin_metrics(interval, timestamp): |
343 | + for metric in metrics: |
344 | + yield metric |
345 | + events += 1 |
346 | duration = self.time_function() - start |
347 | - if events > 0: |
348 | - messages.extend(sorted(plugin_metrics)) |
349 | - num_stats += events |
350 | + num_stats += events |
351 | per_metric["plugin"] = (events, duration) |
352 | |
353 | - self.flush_metrics_summary(messages, num_stats, per_metric, timestamp) |
354 | - return messages |
355 | + for metrics in self.flush_metrics_summary(num_stats, per_metric, |
356 | + timestamp): |
357 | + for metric in metrics: |
358 | + yield metric |
359 | |
360 | def flush_counter_metrics(self, interval, timestamp): |
361 | - metrics = [] |
362 | - events = 0 |
363 | for key, count in self.counter_metrics.iteritems(): |
364 | self.counter_metrics[key] = 0 |
365 | |
366 | value = count / interval |
367 | - metrics.append((self.stats_prefix + key, value, timestamp)) |
368 | - metrics.append((self.count_prefix + key, count, timestamp)) |
369 | - events += 1 |
370 | - |
371 | - return (metrics, events) |
372 | + yield ((self.stats_prefix + key, value, timestamp), |
373 | + (self.count_prefix + key, count, timestamp)) |
374 | |
375 | def flush_timer_metrics(self, percent, timestamp): |
376 | - metrics = [] |
377 | - events = 0 |
378 | - |
379 | threshold_value = ((100 - percent) / 100.0) |
380 | for key, timers in self.timer_metrics.iteritems(): |
381 | count = len(timers) |
382 | @@ -318,78 +321,52 @@ |
383 | ".upper_%s" % percent: threshold_upper, |
384 | ".lower": lower, |
385 | ".count": count} |
386 | - for item, value in items.iteritems(): |
387 | - metrics.append((self.timer_prefix + key + item, |
388 | - value, timestamp)) |
389 | - events += 1 |
390 | - |
391 | - return (metrics, events) |
392 | + yield sorted((self.timer_prefix + key + item, value, timestamp) |
393 | + for item, value in items.iteritems()) |
394 | |
395 | def flush_gauge_metrics(self, timestamp): |
396 | - metrics = [] |
397 | - events = 0 |
398 | for metric in self.gauge_metrics: |
399 | value = metric[0] |
400 | key = metric[1] |
401 | |
402 | - metrics.append((self.gauge_prefix + key + ".value", |
403 | - value, timestamp)) |
404 | - events += 1 |
405 | + yield ((self.gauge_prefix + key + ".value", value, timestamp),) |
406 | |
407 | self.gauge_metrics.clear() |
408 | |
409 | - return (metrics, events) |
410 | - |
411 | def flush_meter_metrics(self, timestamp): |
412 | - metrics = [] |
413 | - events = 0 |
414 | for metric in self.meter_metrics.itervalues(): |
415 | messages = metric.report(timestamp) |
416 | - metrics.extend(messages) |
417 | - events += 1 |
418 | - |
419 | - return (metrics, events) |
420 | + yield messages |
421 | |
422 | def flush_plugin_metrics(self, interval, timestamp): |
423 | - metrics = [] |
424 | - events = 0 |
425 | - |
426 | for metric in self.plugin_metrics.itervalues(): |
427 | messages = metric.flush(interval, timestamp) |
428 | - metrics.extend(messages) |
429 | - events += 1 |
430 | - |
431 | - return (metrics, events) |
432 | - |
433 | - def flush_metrics_summary(self, messages, num_stats, |
434 | - per_metric, timestamp): |
435 | - |
436 | - messages.append((self.internal_metrics_prefix + "numStats", |
437 | - num_stats, timestamp)) |
438 | + yield messages |
439 | + |
440 | + def flush_metrics_summary(self, num_stats, per_metric, timestamp): |
441 | + yield ((self.internal_metrics_prefix + "numStats", |
442 | + num_stats, timestamp),) |
443 | |
444 | self.last_flush_duration = 0 |
445 | for name, (value, duration) in per_metric.iteritems(): |
446 | - messages.extend([ |
447 | - (self.internal_metrics_prefix + |
448 | - "flush.%s.count" % name, |
449 | - value, timestamp), |
450 | - (self.internal_metrics_prefix + |
451 | - "flush.%s.duration" % name, |
452 | - duration * 1000, timestamp)]) |
453 | + yield ((self.internal_metrics_prefix + |
454 | + "flush.%s.count" % name, |
455 | + value, timestamp), |
456 | + (self.internal_metrics_prefix + |
457 | + "flush.%s.duration" % name, |
458 | + duration * 1000, timestamp)) |
459 | log.msg("Flushed %d %s metrics in %.6f" % |
460 | (value, name, duration)) |
461 | self.last_flush_duration += duration |
462 | |
463 | self.last_process_duration = 0 |
464 | for metric_type, duration in self.process_timings.iteritems(): |
465 | - messages.extend([ |
466 | - (self.internal_metrics_prefix + |
467 | - "receive.%s.count" % |
468 | - metric_type, self.by_type[metric_type], timestamp), |
469 | - (self.internal_metrics_prefix + |
470 | - "receive.%s.duration" % |
471 | - metric_type, duration * 1000, timestamp) |
472 | - ]) |
473 | + yield ((self.internal_metrics_prefix + |
474 | + "receive.%s.count" % |
475 | + metric_type, self.by_type[metric_type], timestamp), |
476 | + (self.internal_metrics_prefix + |
477 | + "receive.%s.duration" % |
478 | + metric_type, duration * 1000, timestamp)) |
479 | log.msg("Processing %d %s metrics took %.6f" % |
480 | (self.by_type[metric_type], metric_type, duration)) |
481 | self.last_process_duration += duration |
482 | |
483 | === modified file 'txstatsd/server/protocol.py' |
484 | --- txstatsd/server/protocol.py 2012-06-28 17:29:26 +0000 |
485 | +++ txstatsd/server/protocol.py 2013-06-10 14:01:47 +0000 |
486 | @@ -31,8 +31,8 @@ |
487 | server via TCP. |
488 | """ |
489 | |
490 | - def __init__(self, processor, |
491 | - monitor_message=None, monitor_response=None): |
492 | + def __init__(self, processor, monitor_message=None, |
493 | + monitor_response=None): |
494 | self.processor = processor |
495 | self.monitor_message = monitor_message |
496 | self.monitor_response = monitor_response |
497 | @@ -42,9 +42,10 @@ |
498 | if data == self.monitor_message: |
499 | # Send the expected response to the |
500 | # monitoring agent. |
501 | - self.transport.write(self.monitor_response, (host, port)) |
502 | - else: |
503 | - self.processor.process(data) |
504 | + return self.transport.write( |
505 | + self.monitor_response, (host, port)) |
506 | + return self.transport.reactor.callLater( |
507 | + 0, self.processor.process, data) |
508 | |
509 | |
510 | class StatsDTCPServerProtocol(LineReceiver): |
511 | @@ -54,8 +55,8 @@ |
512 | server via TCP. |
513 | """ |
514 | |
515 | - def __init__(self, processor, |
516 | - monitor_message=None, monitor_response=None): |
517 | + def __init__(self, processor, monitor_message=None, |
518 | + monitor_response=None): |
519 | self.processor = processor |
520 | self.monitor_message = monitor_message |
521 | self.monitor_response = monitor_response |
522 | @@ -65,20 +66,20 @@ |
523 | if data == self.monitor_message: |
524 | # Send the expected response to the |
525 | # monitoring agent. |
526 | - self.transport.write(self.monitor_response) |
527 | - else: |
528 | - self.processor.process(data) |
529 | + return self.transport.write(self.monitor_response) |
530 | + return self.transport.reactor.callLater( |
531 | + 0, self.processor.process, data) |
532 | |
533 | |
534 | class StatsDTCPServerFactory(Factory): |
535 | |
536 | - def __init__(self, processor, |
537 | - monitor_message=None, monitor_response=None): |
538 | + def __init__(self, processor, monitor_message=None, |
539 | + monitor_response=None): |
540 | self.processor = processor |
541 | self.monitor_message = monitor_message |
542 | self.monitor_response = monitor_response |
543 | |
544 | def buildProtocol(self, addr): |
545 | - return StatsDTCPServerProtocol(self.processor, |
546 | - self.monitor_message, self.monitor_response) |
547 | - |
548 | + return StatsDTCPServerProtocol( |
549 | + self.processor, self.monitor_message, |
550 | + self.monitor_response) |
551 | |
552 | === modified file 'txstatsd/service.py' |
553 | --- txstatsd/service.py 2012-06-28 17:29:26 +0000 |
554 | +++ txstatsd/service.py 2013-06-10 14:01:47 +0000 |
555 | @@ -41,7 +41,7 @@ |
556 | StatsDServerProtocol, StatsDTCPServerFactory) |
557 | from txstatsd.server.router import Router |
558 | from txstatsd.server import httpinfo |
559 | -from txstatsd.report import ReportingService |
560 | +from txstatsd.report import ReportingService, ReactorInspectorService |
561 | from txstatsd.itxstatsd import IMetricFactory |
562 | from twisted.application.service import Service |
563 | from twisted.internet import task |
564 | @@ -211,19 +211,26 @@ |
565 | self.processor = processor |
566 | self.flush_interval = flush_interval |
567 | self.flush_task = task.LoopingCall(self.flushProcessor) |
568 | + self.coop = task.Cooperator() |
569 | if clock is not None: |
570 | self.flush_task.clock = clock |
571 | |
572 | def flushProcessor(self): |
573 | """Flush messages queued in the processor to Graphite.""" |
574 | - flushed = 0 |
575 | start = time.time() |
576 | - for metric, value, timestamp in self.processor.flush( |
577 | - interval=self.flush_interval): |
578 | - self.carbon_client.sendDatapoint(metric, (timestamp, value)) |
579 | - flushed += 1 |
580 | - log.msg("Flushed total %d metrics in %.6f" % |
581 | - (flushed, time.time() - start)) |
582 | + interval = self.flush_interval |
583 | + flush = self.processor.flush |
584 | + |
585 | + def doWork(): |
586 | + flushed = 0 |
587 | + for metric, value, timestamp in flush(interval=interval): |
588 | + yield self.carbon_client.sendDatapoint( |
589 | + metric, (timestamp, value)) |
590 | + flushed += 1 |
591 | + log.msg("Flushed total %d metrics in %.6f" % |
592 | + (flushed, time.time() - start)) |
593 | + |
594 | + self.coop.coiterate(doWork()) |
595 | |
596 | def startService(self): |
597 | self.flush_task.start(self.flush_interval / 1000, False) |
598 | @@ -276,7 +283,7 @@ |
599 | # LoggingMessageProcessor supersedes |
600 | # any other processor class in "dump-mode" |
601 | assert not hasattr(log, 'info') |
602 | - log.info = log.msg # for compatibility with LMP logger interface |
603 | + log.info = log.msg # for compatibility with LMP logger interface |
604 | processor = functools.partial(LoggingMessageProcessor, logger=log) |
605 | |
606 | if options["statsd-compliance"]: |
607 | @@ -315,6 +322,11 @@ |
608 | process.report_reactor_stats(reactor), 60, metrics.gauge) |
609 | reports = [name.strip() for name in options["report"].split(",")] |
610 | for report_name in reports: |
611 | + if report_name == "reactor": |
612 | + inspector = ReactorInspectorService(reactor, metrics, |
613 | + loop_time=0.05) |
614 | + inspector.setServiceParent(root_service) |
615 | + |
616 | for reporter in getattr(process, "%s_STATS" % |
617 | report_name.upper(), ()): |
618 | reporting.schedule(reporter, 60, metrics.gauge) |
619 | @@ -343,12 +355,12 @@ |
620 | |
621 | if options["listen-tcp-port"] is not None: |
622 | statsd_tcp_server_factory = StatsDTCPServerFactory( |
623 | - input_router, |
624 | - monitor_message=options["monitor-message"], |
625 | - monitor_response=options["monitor-response"]) |
626 | + input_router, |
627 | + monitor_message=options["monitor-message"], |
628 | + monitor_response=options["monitor-response"]) |
629 | |
630 | listener = TCPServer(options["listen-tcp-port"], |
631 | - statsd_tcp_server_factory) |
632 | + statsd_tcp_server_factory) |
633 | listener.setServiceParent(root_service) |
634 | |
635 | httpinfo_service = httpinfo.makeService(options, processor, statsd_service) |
636 | |
637 | === modified file 'txstatsd/tests/test_client.py' |
638 | --- txstatsd/tests/test_client.py 2013-03-07 16:33:45 +0000 |
639 | +++ txstatsd/tests/test_client.py 2013-06-10 14:01:47 +0000 |
640 | @@ -198,11 +198,18 @@ |
641 | |
642 | self.assertIsInstance(self.client.data_queue, DataQueue) |
643 | |
644 | - def test_starts_without_transport_gateway(self): |
645 | + def test_starts_with_transport_gateway_if_ip(self): |
646 | """The client starts without a TransportGateway.""" |
647 | self.client = TwistedStatsDClient('127.0.0.1', 8000) |
648 | self.build_protocol() |
649 | |
650 | + self.assertTrue(self.client.transport_gateway is not None) |
651 | + |
652 | + def test_starts_without_transport_gateway_if_not_ip(self): |
653 | + """The client starts without a TransportGateway.""" |
654 | + self.client = TwistedStatsDClient('localhost', 8000) |
655 | + self.build_protocol() |
656 | + |
657 | self.assertTrue(self.client.transport_gateway is None) |
658 | |
659 | def test_passes_transport_to_gateway(self): |
660 | @@ -269,7 +276,8 @@ |
661 | callback = Mock() |
662 | self.client.transport_gateway.write.return_value = bytes_sent |
663 | self.assertEqual(self.client.write(message, callback), bytes_sent) |
664 | - self.client.transport_gateway.write.assert_called_once_with(message, callback) |
665 | + self.client.transport_gateway.write.assert_called_once_with( |
666 | + message, callback) |
667 | |
668 | def test_sends_messages_to_queue_before_host_resolves(self): |
669 | """Before the host is resolved, send messages to the DataQueue.""" |
670 | @@ -277,7 +285,7 @@ |
671 | self.build_protocol() |
672 | |
673 | message = 'some data' |
674 | - self.client.data_queue = Mock(spec=DataQueue) |
675 | + self.client.data_queue = Mock(spec=DataQueue) |
676 | callback = Mock() |
677 | self.client.data_queue.write.return_value = None |
678 | result = self.client.write(message, callback) |
679 | |
680 | === modified file 'txstatsd/tests/test_configurableprocessor.py' |
681 | --- txstatsd/tests/test_configurableprocessor.py 2012-06-28 17:29:26 +0000 |
682 | +++ txstatsd/tests/test_configurableprocessor.py 2013-06-10 14:01:47 +0000 |
683 | @@ -40,7 +40,7 @@ |
684 | configurable_processor = ConfigurableMessageProcessor( |
685 | time_function=lambda: 42) |
686 | configurable_processor.process("gorets:17|c") |
687 | - messages = configurable_processor.flush() |
688 | + messages = list(configurable_processor.flush()) |
689 | self.assertEqual(("gorets.count", 17, 42), messages[0]) |
690 | self.assertEqual(("statsd.numStats", 1, 42), messages[1]) |
691 | |
692 | @@ -51,7 +51,7 @@ |
693 | configurable_processor = ConfigurableMessageProcessor( |
694 | time_function=lambda: 42, message_prefix="test.metric") |
695 | configurable_processor.process("gorets:17|c") |
696 | - messages = configurable_processor.flush() |
697 | + messages = list(configurable_processor.flush()) |
698 | self.assertEqual(("test.metric.gorets.count", 17, 42), messages[0]) |
699 | self.assertEqual(("test.metric.statsd.numStats", 1, 42), |
700 | messages[1]) |
701 | @@ -64,7 +64,7 @@ |
702 | time_function=lambda: 42, message_prefix="test.metric", |
703 | internal_metrics_prefix="statsd.foo.") |
704 | configurable_processor.process("gorets:17|c") |
705 | - messages = configurable_processor.flush() |
706 | + messages = list(configurable_processor.flush()) |
707 | self.assertEqual(("test.metric.gorets.count", 17, 42), messages[0]) |
708 | self.assertEqual(("statsd.foo.numStats", 1, 42), |
709 | messages[1]) |
710 | @@ -77,7 +77,7 @@ |
711 | time_function=lambda: 42, message_prefix="test.metric", |
712 | plugins=[distinct_metric_factory]) |
713 | configurable_processor.process("gorets:17|pd") |
714 | - messages = configurable_processor.flush() |
715 | + messages = list(configurable_processor.flush()) |
716 | self.assertEquals(("test.metric.gorets.count", 1, 42), messages[0]) |
717 | |
718 | def test_flush_single_timer_single_time(self): |
719 | @@ -94,7 +94,7 @@ |
720 | configurable_processor.process("glork:24|ms") |
721 | _now = 42 |
722 | |
723 | - messages = configurable_processor.flush() |
724 | + messages = list(configurable_processor.flush()) |
725 | messages.sort() |
726 | |
727 | expected = [ |
728 | @@ -128,7 +128,7 @@ |
729 | configurable_processor.process("glork:42|ms") |
730 | |
731 | _now = 42 |
732 | - messages = configurable_processor.flush() |
733 | + messages = list(configurable_processor.flush()) |
734 | messages.sort() |
735 | |
736 | expected = [ |
737 | @@ -169,7 +169,7 @@ |
738 | self.configurable_processor.process("gorets:3.0|m") |
739 | |
740 | self.time_now += 1 |
741 | - messages = self.configurable_processor.flush() |
742 | + messages = list(self.configurable_processor.flush()) |
743 | self.assertEqual(("test.metric.gorets.count", 3.0, self.time_now), |
744 | messages[0]) |
745 | self.assertEqual(("test.metric.gorets.rate", 3.0, self.time_now), |
746 | |
747 | === added file 'txstatsd/tests/test_inspector.py' |
748 | --- txstatsd/tests/test_inspector.py 1970-01-01 00:00:00 +0000 |
749 | +++ txstatsd/tests/test_inspector.py 2013-06-10 14:01:47 +0000 |
750 | @@ -0,0 +1,204 @@ |
751 | +# Copyright (C) 2011-2012 Canonical Services Ltd |
752 | +# |
753 | +# Permission is hereby granted, free of charge, to any person obtaining |
754 | +# a copy of this software and associated documentation files (the |
755 | +# "Software"), to deal in the Software without restriction, including |
756 | +# without limitation the rights to use, copy, modify, merge, publish, |
757 | +# distribute, sublicense, and/or sell copies of the Software, and to |
758 | +# permit persons to whom the Software is furnished to do so, subject to |
759 | +# the following conditions: |
760 | +# |
761 | +# The above copyright notice and this permission notice shall be |
762 | +# included in all copies or substantial portions of the Software. |
763 | +# |
764 | +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, |
765 | +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF |
766 | +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. |
767 | +# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY |
768 | +# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, |
769 | +# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE |
770 | +# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. |
771 | +"""Tests for the ReactorInspector.""" |
772 | + |
773 | +import re |
774 | +import time |
775 | +import logging |
776 | +import threading |
777 | + |
778 | +from twisted.trial.unittest import TestCase as TwistedTestCase |
779 | +from twisted.internet import reactor, defer |
780 | + |
781 | +from txstatsd.report import ReactorInspector |
782 | + |
783 | + |
784 | +def parse_delay(msg): |
785 | + return float(re.search("delay: (\d+.\d{1,3})", msg).group(1)) |
786 | + |
787 | + |
788 | +class ReactorInspectorTestCase(TwistedTestCase): |
789 | + """Test the ReactorInspector class.""" |
790 | + |
791 | + def setUp(self): |
792 | + """Set up.""" |
793 | + |
794 | + class Helper(object): |
795 | + """Fake object with a controllable call.""" |
796 | + def __init__(self): |
797 | + self.call_count = 1 |
798 | + self.calls = [] |
799 | + self.ri = None |
800 | + |
801 | + def call(self, func): |
802 | + """Call function when counter is 0, then stop running.""" |
803 | + self.call_count -= 1 |
804 | + self.calls.append(func) |
805 | + if self.call_count == 0: |
806 | + for f in self.calls: |
807 | + f() |
808 | + if self.call_count <= 0: |
809 | + self.ri.stop() |
810 | + |
811 | + class FakeMetrics(object): |
812 | + """Fake Metrics object that records calls.""" |
813 | + def __init__(self): |
814 | + """Initialize calls.""" |
815 | + self.calls = [] |
816 | + |
817 | + def meter(self, name, count): |
818 | + """Record call to meter().""" |
819 | + self.calls.append(("meter", name, count)) |
820 | + |
821 | + def gauge(self, name, val): |
822 | + """Record call to gauge().""" |
823 | + self.calls.append(("gauge", name, round(val, 3))) |
824 | + |
825 | + def log(msg, logLevel=None): |
826 | + self.logged.append((msg, logLevel)) |
827 | + |
828 | + self.helper = Helper() |
829 | + self.fake_metrics = FakeMetrics() |
830 | + self.ri = ReactorInspector(self.helper.call, self.fake_metrics, |
831 | + loop_time=.1, log=log) |
832 | + self.helper.ri = self.ri |
833 | + self.logged = [] |
834 | + |
835 | + def check_log(self, *expected, **kw): |
836 | + logLevel = kw.get("logLevel", None) |
837 | + for (msg, level) in self.logged: |
838 | + if level == logLevel and all(m in msg for m in expected): |
839 | + return msg |
840 | + return False |
841 | + |
842 | + def run_ri(self, call_count=None, join=True): |
843 | + """Set the call count and then run the ReactorInspector.""" |
844 | + if call_count is not None: |
845 | + self.helper.call_count = call_count |
846 | + # pylint: disable=W0201 |
847 | + self.start_ts = time.time() |
848 | + self.ri.start() |
849 | + # Reactor will stop after call_count calls, thanks to helper |
850 | + if join: |
851 | + self.ri.join() |
852 | + |
853 | + def test_stop(self): |
854 | + """It stops.""" |
855 | + self.run_ri(1000, join=False) |
856 | + assert self.ri.is_alive() |
857 | + self.ri.stop() |
858 | + self.ri.join() |
859 | + self.assertFalse(self.ri.is_alive()) |
860 | + |
861 | + @defer.inlineCallbacks |
862 | + def test_dump_frames(self): |
863 | + """Test how frames are dumped. |
864 | + |
865 | + Rules: |
866 | + - own frame must not be logged |
867 | + - must log all other threads |
868 | + - main reactor thread must have special title |
869 | + """ |
870 | + # other thread, whose frame must be logged |
871 | + waitingd = defer.Deferred() |
872 | + |
873 | + def waiting_function(): |
874 | + """Function with funny name to be checked later.""" |
875 | + reactor.callFromThread(waitingd.callback, True) |
876 | + # wait have a default value; pylint: disable=E1120 |
877 | + event.wait() |
878 | + |
879 | + event = threading.Event() |
880 | + threading.Thread(target=waiting_function).start() |
881 | + # Make sure the thread has entered the waiting_function |
882 | + yield waitingd |
883 | + |
884 | + # Set reactor_thread since we're not starting the ReactorInspector |
885 | + # thread here. |
886 | + self.ri.reactor_thread = threading.currentThread().ident |
887 | + |
888 | + # dump frames in other thread, also |
889 | + def dumping_function(): |
890 | + """Function with funny name to be checked later.""" |
891 | + time.sleep(.1) |
892 | + self.ri.dump_frames() |
893 | + reactor.callFromThread(d.callback, True) |
894 | + |
895 | + d = defer.Deferred() |
896 | + threading.Thread(target=dumping_function).start() |
897 | + yield d |
898 | + event.set() |
899 | + |
900 | + # check |
901 | + self.assertFalse(self.check_log("dumping_function", |
902 | + logLevel=logging.DEBUG)) |
903 | + self.assertTrue(self.check_log("Dumping Python frame", |
904 | + "waiting_function", |
905 | + logLevel=logging.DEBUG)) |
906 | + self.assertTrue(self.check_log("Dumping Python frame", |
907 | + "reactor main thread", |
908 | + logLevel=logging.DEBUG)) |
909 | + |
910 | + def test_reactor_ok(self): |
911 | + """Reactor working fast.""" |
912 | + self.run_ri() |
913 | + ok_line = self.assertTrue(self.check_log("ReactorInspector: ok", |
914 | + logLevel=logging.DEBUG)) |
915 | + # Check the metrics |
916 | + delay = parse_delay(ok_line) |
917 | + expected_metric = ("gauge", "delay", delay) |
918 | + self.assertEqual([expected_metric], self.fake_metrics.calls) |
919 | + self.assertTrue(self.ri.last_responsive_ts >= self.start_ts) |
920 | + |
921 | + @defer.inlineCallbacks |
922 | + def test_reactor_blocked(self): |
923 | + """Reactor not working fast.""" |
924 | + dump_frames_called = defer.Deferred() |
925 | + self.ri.dump_frames = lambda: dump_frames_called.callback(True) |
926 | + self.run_ri(0) |
927 | + yield dump_frames_called |
928 | + log_line = self.check_log("ReactorInspector", |
929 | + "detected unresponsive", |
930 | + logLevel=logging.CRITICAL) |
931 | + self.assertTrue(log_line) |
932 | + delay = parse_delay(log_line) |
933 | + self.assertTrue(delay >= .1) # waited for entire loop time |
934 | + # Check the metrics |
935 | + expected_metric = ("gauge", "delay", delay) |
936 | + self.assertEqual([expected_metric], self.fake_metrics.calls) |
937 | + |
938 | + self.assertTrue(self.ri.last_responsive_ts < self.start_ts) |
939 | + |
940 | + def test_reactor_back_alive(self): |
941 | + """Reactor resurrects after some loops.""" |
942 | + self.run_ri(3) |
943 | + late_line = self.check_log("ReactorInspector: late", |
944 | + "got: 0", logLevel=logging.WARNING) |
945 | + self.assertTrue(late_line) |
946 | + delay = parse_delay(late_line) |
947 | + self.assertTrue(delay >= .2) # At least 2 cycles of delay |
948 | + # Check the metrics |
949 | + expected_metric = ("gauge", "delay", delay) |
950 | + self.assertEqual(expected_metric, self.fake_metrics.calls[-1]) |
951 | + |
952 | + self.assertTrue(self.ri.queue.empty()) |
953 | + # A late reactor is not considered responsive (until a successful loop) |
954 | + self.assertTrue(self.ri.last_responsive_ts < self.start_ts) |
955 | |
956 | === modified file 'txstatsd/tests/test_loggingprocessor.py' |
957 | --- txstatsd/tests/test_loggingprocessor.py 2012-06-28 17:29:26 +0000 |
958 | +++ txstatsd/tests/test_loggingprocessor.py 2013-06-10 14:01:47 +0000 |
959 | @@ -30,6 +30,7 @@ |
960 | def report(self, *args): |
961 | return [('Sample report', 1, 2)] |
962 | |
963 | + |
964 | class TestLogger(object): |
965 | def __init__(self): |
966 | self.log = '' |
967 | @@ -63,7 +64,7 @@ |
968 | processor = LoggingMessageProcessor(logger) |
969 | metric = FakeMeterMetric() |
970 | processor.meter_metrics['test'] = metric |
971 | - processor.flush() |
972 | + list(processor.flush()) |
973 | expected = ["Out: %s %s %s" % message |
974 | for message in metric.report()] |
975 | self.assertFalse(set(expected).difference(logger.log.splitlines())) |
976 | @@ -75,7 +76,7 @@ |
977 | time_function=lambda: 42) |
978 | msg_in = "gorets:17|pd" |
979 | processor.process(msg_in) |
980 | - processor.flush() |
981 | + list(processor.flush()) |
982 | messages = processor.plugin_metrics['gorets'].flush( |
983 | 10, processor.time_function()) |
984 | expected = ["In: %s" % msg_in] + ["Out: %s %s %s" % message |
985 | |
986 | === modified file 'txstatsd/tests/test_processor.py' |
987 | --- txstatsd/tests/test_processor.py 2012-06-28 17:29:26 +0000 |
988 | +++ txstatsd/tests/test_processor.py 2013-06-10 14:01:47 +0000 |
989 | @@ -189,22 +189,25 @@ |
990 | flushed. |
991 | """ |
992 | self.timer.set([0, |
993 | - 0, 1, # counter |
994 | - 1, 3, # timer |
995 | - 3, 6, # gauge |
996 | - 6, 10, # meter |
997 | - 10, 15, # plugin |
998 | + 0, 1, # counter |
999 | + 1, 3, # timer |
1000 | + 3, 6, # gauge |
1001 | + 6, 10, # meter |
1002 | + 10, 15, # plugin |
1003 | ]) |
1004 | - def flush_metrics_summary(messages, num_stats, per_metric, timestamp): |
1005 | + |
1006 | + def flush_metrics_summary(num_stats, per_metric, timestamp): |
1007 | self.assertEqual((0, 1), per_metric["counter"]) |
1008 | self.assertEqual((0, 2), per_metric["timer"]) |
1009 | self.assertEqual((0, 3), per_metric["gauge"]) |
1010 | self.assertEqual((0, 4), per_metric["meter"]) |
1011 | self.assertEqual((0, 5), per_metric["plugin"]) |
1012 | + yield () |
1013 | + |
1014 | self.addCleanup(setattr, self.processor, "flush_metrics_summary", |
1015 | self.processor.flush_metrics_summary) |
1016 | self.processor.flush_metrics_summary = flush_metrics_summary |
1017 | - self.processor.flush() |
1018 | + list(self.processor.flush()) |
1019 | |
1020 | def test_flush_metrics_summary(self): |
1021 | """ |
1022 | @@ -215,14 +218,15 @@ |
1023 | self.processor.process_timings = {"c": 1} |
1024 | self.processor.by_type = {"c": 42} |
1025 | messages = [] |
1026 | - self.processor.flush_metrics_summary(messages, 1, per_metric, 42) |
1027 | + map(messages.extend, self.processor.flush_metrics_summary( |
1028 | + 1, per_metric, 42)) |
1029 | self.assertEqual(5, len(messages)) |
1030 | self.assertEqual([('statsd.numStats', 1, 42), |
1031 | ('statsd.flush.counter.count', 10, 42), |
1032 | ('statsd.flush.counter.duration', 1000, 42), |
1033 | ('statsd.receive.c.count', 42, 42), |
1034 | ('statsd.receive.c.duration', 1000, 42)], |
1035 | - messages) |
1036 | + messages) |
1037 | self.assertEquals({}, self.processor.process_timings) |
1038 | self.assertEquals({}, self.processor.by_type) |
1039 | |
1040 | @@ -238,7 +242,8 @@ |
1041 | Flushing the message processor when there are no stats available should |
1042 | still produce one message where C{statsd.numStats} is set to zero. |
1043 | """ |
1044 | - self.assertEqual(("statsd.numStats", 0, 42), self.processor.flush()[0]) |
1045 | + self.assertEqual(("statsd.numStats", 0, 42), |
1046 | + list(self.processor.flush())[0]) |
1047 | |
1048 | def test_flush_counter(self): |
1049 | """ |
1050 | @@ -246,7 +251,7 @@ |
1051 | normalized to the default interval. |
1052 | """ |
1053 | self.processor.counter_metrics["gorets"] = 42 |
1054 | - messages = self.processor.flush() |
1055 | + messages = list(self.processor.flush()) |
1056 | self.assertEqual(("stats.gorets", 4, 42), messages[0]) |
1057 | self.assertEqual(("stats_counts.gorets", 42, 42), messages[1]) |
1058 | self.assertEqual(("statsd.numStats", 1, 42), messages[2]) |
1059 | @@ -258,7 +263,7 @@ |
1060 | case the counter value will be unchanged. |
1061 | """ |
1062 | self.processor.counter_metrics["gorets"] = 42 |
1063 | - messages = self.processor.flush(interval=1000) |
1064 | + messages = list(self.processor.flush(interval=1000)) |
1065 | self.assertEqual(("stats.gorets", 42, 42), messages[0]) |
1066 | self.assertEqual(("stats_counts.gorets", 42, 42), messages[1]) |
1067 | self.assertEqual(("statsd.numStats", 1, 42), messages[2]) |
1068 | @@ -271,7 +276,7 @@ |
1069 | reset after flush is called. |
1070 | """ |
1071 | self.processor.timer_metrics["glork"] = [24] |
1072 | - messages = self.processor.flush() |
1073 | + messages = list(self.processor.flush()) |
1074 | self.assertEqual(("stats.timers.glork.count", 1, 42), messages[0]) |
1075 | self.assertEqual(("stats.timers.glork.lower", 24, 42), messages[1]) |
1076 | self.assertEqual(("stats.timers.glork.mean", 24, 42), messages[2]) |
1077 | @@ -290,7 +295,7 @@ |
1078 | - mean will be the mean value within the 90th percentile |
1079 | """ |
1080 | self.processor.timer_metrics["glork"] = [4, 8, 15, 16, 23, 42] |
1081 | - messages = self.processor.flush() |
1082 | + messages = list(self.processor.flush()) |
1083 | self.assertEqual(("stats.timers.glork.count", 6, 42), messages[0]) |
1084 | self.assertEqual(("stats.timers.glork.lower", 4, 42), messages[1]) |
1085 | self.assertEqual(("stats.timers.glork.mean", 13, 42), messages[2]) |
1086 | @@ -312,7 +317,7 @@ |
1087 | - mean will be the mean value within the 50th percentile |
1088 | """ |
1089 | self.processor.timer_metrics["glork"] = [4, 8, 15, 16, 23, 42] |
1090 | - messages = self.processor.flush(percent=50) |
1091 | + messages = list(self.processor.flush(percent=50)) |
1092 | self.assertEqual(("stats.timers.glork.count", 6, 42), messages[0]) |
1093 | self.assertEqual(("stats.timers.glork.lower", 4, 42), messages[1]) |
1094 | self.assertEqual(("stats.timers.glork.mean", 9, 42), messages[2]) |
1095 | @@ -329,7 +334,7 @@ |
1096 | |
1097 | self.processor.process("gorets:9.6|g") |
1098 | |
1099 | - messages = self.processor.flush() |
1100 | + messages = list(self.processor.flush()) |
1101 | self.assertEqual( |
1102 | ("stats.gauge.gorets.value", 9.6, 42), messages[0]) |
1103 | self.assertEqual( |
1104 | @@ -344,14 +349,14 @@ |
1105 | |
1106 | self.processor.process("gorets:item|pd") |
1107 | |
1108 | - messages = self.processor.flush() |
1109 | + messages = list(self.processor.flush()) |
1110 | self.assertEqual(("stats.pdistinct.gorets.count", 1, 42), messages[0]) |
1111 | self.assertEqual(("stats.pdistinct.gorets.count_1day", |
1112 | - 5552568545, 42), messages[1]) |
1113 | + 5552568545, 42), messages[1]) |
1114 | self.assertEqual(("stats.pdistinct.gorets.count_1hour", |
1115 | - 5552568545, 42), messages[2]) |
1116 | + 5552568545, 42), messages[2]) |
1117 | self.assertEqual(("stats.pdistinct.gorets.count_1min", |
1118 | - 5552568545, 42), messages[3]) |
1119 | + 5552568545, 42), messages[3]) |
1120 | |
1121 | def test_flush_plugin_arguments(self): |
1122 | """Test the passing of arguments for flush.""" |
1123 | @@ -362,9 +367,9 @@ |
1124 | return [] |
1125 | |
1126 | self.processor.plugin_metrics["somemetric"] = FakeMetric() |
1127 | - self.processor.flush(41000) |
1128 | - self.assertEquals((41, 42), |
1129 | - self.processor.plugin_metrics["somemetric"].data) |
1130 | + list(self.processor.flush(41000)) |
1131 | + self.assertEquals( |
1132 | + (41, 42), self.processor.plugin_metrics["somemetric"].data) |
1133 | |
1134 | |
1135 | class FlushMeterMetricMessagesTest(TestCase): |
1136 | @@ -384,7 +389,7 @@ |
1137 | self.processor.process("gorets:3.0|m") |
1138 | |
1139 | self.time_now += 1 |
1140 | - messages = self.processor.flush() |
1141 | + messages = list(self.processor.flush()) |
1142 | self.assertEqual( |
1143 | ("stats.meter.gorets.count", 3.0, self.time_now), |
1144 | messages[0]) |
1145 | @@ -396,7 +401,7 @@ |
1146 | messages[2]) |
1147 | |
1148 | self.time_now += 60 |
1149 | - messages = self.processor.flush() |
1150 | + messages = list(self.processor.flush()) |
1151 | self.assertEqual( |
1152 | ("stats.meter.gorets.count", 3.0, self.time_now), |
1153 | messages[0]) |
1154 | |
1155 | === modified file 'txstatsd/tests/test_service.py' |
1156 | --- txstatsd/tests/test_service.py 2013-03-06 21:50:26 +0000 |
1157 | +++ txstatsd/tests/test_service.py 2013-06-10 14:01:47 +0000 |
1158 | @@ -307,4 +307,3 @@ |
1159 | |
1160 | reactor.callWhenRunning(exercise) |
1161 | reactor.run() |
1162 | - |
nice!