Merge lp:~sidnei/txstatsd/cooperate into lp:txstatsd

Proposed by Sidnei da Silva
Status: Merged
Approved by: Guillermo Gonzalez
Approved revision: 113
Merged at revision: 108
Proposed branch: lp:~sidnei/txstatsd/cooperate
Merge into: lp:txstatsd
Diff against target: 1162 lines (+489/-176)
15 files modified
txstatsd/metrics/distinctmetric.py (+1/-1)
txstatsd/metrics/timermetric.py (+3/-3)
txstatsd/protocol.py (+4/-1)
txstatsd/report.py (+118/-0)
txstatsd/server/configurableprocessor.py (+6/-20)
txstatsd/server/loggingprocessor.py (+3/-4)
txstatsd/server/processor.py (+58/-81)
txstatsd/server/protocol.py (+16/-15)
txstatsd/service.py (+25/-13)
txstatsd/tests/test_client.py (+11/-3)
txstatsd/tests/test_configurableprocessor.py (+7/-7)
txstatsd/tests/test_inspector.py (+204/-0)
txstatsd/tests/test_loggingprocessor.py (+3/-2)
txstatsd/tests/test_processor.py (+30/-25)
txstatsd/tests/test_service.py (+0/-1)
To merge this branch: bzr merge lp:~sidnei/txstatsd/cooperate
Reviewer Review Type Date Requested Status
Guillermo Gonzalez Approve
Review via email: mp+168197@code.launchpad.net

Commit message

Use cooperator to yield more often and avoid blocking the reactor for too long.

Description of the change

Use cooperator to yield more often and avoid blocking the reactor for too long.

To post a comment you must log in.
lp:~sidnei/txstatsd/cooperate updated
113. By Sidnei da Silva

- Add reactor inspector tests

Revision history for this message
Guillermo Gonzalez (verterok) wrote :

nice!

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'txstatsd/metrics/distinctmetric.py'
2--- txstatsd/metrics/distinctmetric.py 2013-03-08 13:27:16 +0000
3+++ txstatsd/metrics/distinctmetric.py 2013-06-10 14:01:47 +0000
4@@ -165,7 +165,7 @@
5 ".count_1min": self.count_1min(now),
6 ".count_1hour": self.count_1hour(now),
7 ".count_1day": self.count_1day(now)}
8- for item, value in items.iteritems():
9+ for item, value in sorted(items.iteritems()):
10 metrics.append((self.prefix + self.name + item, value, timestamp))
11 return metrics
12
13
14=== modified file 'txstatsd/metrics/timermetric.py'
15--- txstatsd/metrics/timermetric.py 2012-11-20 11:56:07 +0000
16+++ txstatsd/metrics/timermetric.py 2013-06-10 14:01:47 +0000
17@@ -118,7 +118,7 @@
18 @param percentiles: One or more percentiles.
19 """
20 return [percentile for percentile in
21- self.histogram.percentiles(*percentiles)]
22+ self.histogram.percentiles(*percentiles)]
23
24 def get_values(self):
25 """Returns a list of all recorded durations in the timer's sample."""
26@@ -145,8 +145,8 @@
27 ".999percentile": percentiles[5],
28 ".count": self.count,
29 ".rate": self.rate(timestamp),
30- }
31- for item, value in items.iteritems():
32+ }
33+ for item, value in sorted(items.iteritems()):
34 metrics.append((self.prefix + self.name + item,
35 round(value, 6), timestamp))
36 self.clear(timestamp)
37
38=== modified file 'txstatsd/protocol.py'
39--- txstatsd/protocol.py 2012-12-27 20:49:06 +0000
40+++ txstatsd/protocol.py 2013-06-10 14:01:47 +0000
41@@ -21,7 +21,7 @@
42
43 import socket
44
45-from twisted.internet.defer import inlineCallbacks, returnValue, Deferred
46+from twisted.internet import abstract
47 from twisted.internet.protocol import DatagramProtocol
48 from twisted.python import log
49
50@@ -139,6 +139,9 @@
51 self.transport = None
52 self.transport_gateway = None
53
54+ if abstract.isIPAddress(host):
55+ self.host_resolved(host)
56+
57 def __str__(self):
58 return "%s:%d" % (self.host, self.port)
59
60
61=== modified file 'txstatsd/report.py'
62--- txstatsd/report.py 2012-06-28 17:29:26 +0000
63+++ txstatsd/report.py 2013-06-10 14:01:47 +0000
64@@ -19,6 +19,14 @@
65 # TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
66 # SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
67
68+import os
69+import sys
70+import time
71+import logging
72+import threading
73+import traceback
74+import Queue
75+
76 from twisted.internet.defer import maybeDeferred
77 from twisted.internet.task import LoopingCall
78 from twisted.python import log
79@@ -82,3 +90,113 @@
80 for task, interval in self.tasks:
81 task.stop()
82 Service.stopService(self)
83+
84+
85+class ReactorInspector(threading.Thread):
86+ """Log message with a time delta from the last call."""
87+
88+ def __init__(self, reactor_call, metrics, loop_time=3, log=log.msg):
89+ self.running = False
90+ self.stopped = False
91+ self.queue = Queue.Queue()
92+ self.reactor_call = reactor_call
93+ self.loop_time = loop_time
94+ self.last_responsive_ts = 0
95+ self.reactor_thread = None
96+ self.metrics = metrics
97+ super(ReactorInspector, self).__init__()
98+ self.daemon = True
99+ self.log = log
100+
101+ def start(self):
102+ """Start the thread. Should be called from the reactor main thread."""
103+ self.reactor_thread = threading.currentThread().ident
104+ if not self.running:
105+ self.running = True
106+ super(ReactorInspector, self).start()
107+
108+ def stop(self):
109+ """Stop the thread."""
110+ self.stopped = True
111+ self.log("ReactorInspector: stopped")
112+
113+ def dump_frames(self):
114+ """Dump frames info to log file."""
115+ current = threading.currentThread().ident
116+ frames = sys._current_frames()
117+ for frame_id, frame in frames.iteritems():
118+ if frame_id == current:
119+ continue
120+
121+ stack = ''.join(traceback.format_stack(frame))
122+
123+ if frame_id == self.reactor_thread:
124+ title = "Dumping Python frame for reactor main thread"
125+ else:
126+ title = "Dumping Python frame"
127+ self.log("%s %s (pid: %d):\n%s" %
128+ (title, frame_id, os.getpid(), stack),
129+ logLevel=logging.DEBUG)
130+
131+ def run(self):
132+ """Start running the thread."""
133+ self.log("ReactorInspector: started")
134+ msg_id = 0
135+ oldest_pending_request_ts = time.time()
136+ while not self.stopped:
137+ def task(msg_id=msg_id, tini=time.time()):
138+ """Put result in queue with initial and completed times."""
139+ self.queue.put((msg_id, tini, time.time()))
140+ self.reactor_call(task)
141+ time.sleep(self.loop_time)
142+ try:
143+ id_sent, tini, tsent = self.queue.get_nowait()
144+ except Queue.Empty:
145+ # Oldest pending request is still out there
146+ delay = time.time() - oldest_pending_request_ts
147+ self.metrics.gauge("delay", delay)
148+ self.log("ReactorInspector: detected unresponsive!"
149+ " (current: %d, pid: %d) delay: %.3f" % (
150+ msg_id, os.getpid(), delay),
151+ logLevel=logging.CRITICAL)
152+ self.dump_frames()
153+ else:
154+ delay = tsent - tini
155+ self.metrics.gauge("delay", delay)
156+ if msg_id > id_sent:
157+ self.log("ReactorInspector: late (current: %d, "
158+ "got: %d, pid: %d, cleaning queue) "
159+ "delay: %.3f" % (msg_id, id_sent,
160+ os.getpid(), delay),
161+ logLevel=logging.WARNING)
162+ while not self.queue.empty():
163+ self.queue.get_nowait()
164+ # About to start a new request with nothing pending
165+ oldest_pending_request_ts = time.time()
166+ else:
167+ assert msg_id == id_sent
168+ # About to start a new request with nothing pending
169+ self.last_responsive_ts = time.time()
170+ oldest_pending_request_ts = self.last_responsive_ts
171+ self.log("ReactorInspector: ok (msg: %d, "
172+ "pid: %d) delay: %.3f" % (
173+ msg_id, os.getpid(), delay),
174+ logLevel=logging.DEBUG)
175+ finally:
176+ msg_id += 1
177+
178+
179+class ReactorInspectorService(Service):
180+ """Start/stop the reactor inspector service."""
181+
182+ def __init__(self, reactor, metrics, loop_time=3):
183+ self.inspector = ReactorInspector(
184+ reactor.callFromThread, metrics, loop_time)
185+
186+ def startService(self):
187+ Service.startService(self)
188+ self.inspector.start()
189+
190+ def stopService(self):
191+ self.inspector.stop()
192+ Service.stopService(self)
193
194=== modified file 'txstatsd/server/configurableprocessor.py'
195--- txstatsd/server/configurableprocessor.py 2012-06-28 17:29:26 +0000
196+++ txstatsd/server/configurableprocessor.py 2013-06-10 14:01:47 +0000
197@@ -60,8 +60,9 @@
198
199 def compose_timer_metric(self, key, duration):
200 if not key in self.timer_metrics:
201- metric = TimerMetricReporter(key,
202- wall_time_func=self.time_function, prefix=self.message_prefix)
203+ metric = TimerMetricReporter(
204+ key, wall_time_func=self.time_function,
205+ prefix=self.message_prefix)
206 self.timer_metrics[key] = metric
207 self.timer_metrics[key].update(duration)
208
209@@ -93,31 +94,16 @@
210 self.meter_metrics[key].mark(value)
211
212 def flush_counter_metrics(self, interval, timestamp):
213- metrics = []
214- events = 0
215 for metric in self.counter_metrics.itervalues():
216 messages = metric.report(timestamp)
217- metrics.extend(messages)
218- events += 1
219-
220- return (metrics, events)
221+ yield messages
222
223 def flush_gauge_metrics(self, timestamp):
224- metrics = []
225- events = 0
226 for metric in self.gauge_metrics.itervalues():
227 messages = metric.report(timestamp)
228- metrics.extend(messages)
229- events += 1
230-
231- return (metrics, events)
232+ yield messages
233
234 def flush_timer_metrics(self, percent, timestamp):
235- metrics = []
236- events = 0
237 for metric in self.timer_metrics.itervalues():
238 messages = metric.report(timestamp)
239- metrics.extend(messages)
240- events += 1
241-
242- return (metrics, events)
243+ yield messages
244
245=== modified file 'txstatsd/server/loggingprocessor.py'
246--- txstatsd/server/loggingprocessor.py 2012-06-28 17:29:26 +0000
247+++ txstatsd/server/loggingprocessor.py 2013-06-10 14:01:47 +0000
248@@ -48,8 +48,7 @@
249
250 def flush(self, interval=10000, percent=90):
251 """Log all received metric samples to the supplied logger."""
252- messages = list(super(LoggingMessageProcessor, self).flush(
253- interval=interval, percent=percent))
254- for msg in messages:
255+ parent = super(LoggingMessageProcessor, self)
256+ for msg in parent.flush(interval=interval, percent=percent):
257 self.logger.info("Out: %s %s %s" % msg)
258- return messages
259+ yield msg
260
261=== modified file 'txstatsd/server/processor.py'
262--- txstatsd/server/processor.py 2012-06-28 17:29:26 +0000
263+++ txstatsd/server/processor.py 2013-06-10 14:01:47 +0000
264@@ -25,6 +25,7 @@
265 import logging
266
267 from twisted.python import log
268+from twisted.internet.task import Cooperator
269
270 from txstatsd.metrics.metermetric import MeterMetricReporter
271
272@@ -226,73 +227,75 @@
273 Flush all queued stats, computing a normalized count based on
274 C{interval} and mean timings based on C{threshold}.
275 """
276- messages = []
277 per_metric = {}
278 num_stats = 0
279 interval = interval / 1000
280 timestamp = int(self.time_function())
281
282 start = self.time_function()
283- counter_metrics, events = self.flush_counter_metrics(interval,
284- timestamp)
285+ events = 0
286+ for metrics in self.flush_counter_metrics(interval, timestamp):
287+ for metric in metrics:
288+ yield metric
289+ events += 1
290 duration = self.time_function() - start
291- if events > 0:
292- messages.extend(sorted(counter_metrics))
293- num_stats += events
294+ num_stats += events
295 per_metric["counter"] = (events, duration)
296
297 start = self.time_function()
298- timer_metrics, events = self.flush_timer_metrics(percent, timestamp)
299+ events = 0
300+ for metrics in self.flush_timer_metrics(percent, timestamp):
301+ for metric in metrics:
302+ yield metric
303+ events += 1
304 duration = self.time_function() - start
305- if events > 0:
306- messages.extend(sorted(timer_metrics))
307- num_stats += events
308+ num_stats += events
309 per_metric["timer"] = (events, duration)
310
311 start = self.time_function()
312- gauge_metrics, events = self.flush_gauge_metrics(timestamp)
313+ events = 0
314+ for metrics in self.flush_gauge_metrics(timestamp):
315+ for metric in metrics:
316+ yield metric
317+ events += 1
318 duration = self.time_function() - start
319- if events > 0:
320- messages.extend(sorted(gauge_metrics))
321- num_stats += events
322+ num_stats += events
323 per_metric["gauge"] = (events, duration)
324
325 start = self.time_function()
326- meter_metrics, events = self.flush_meter_metrics(timestamp)
327+ events = 0
328+ for metrics in self.flush_meter_metrics(timestamp):
329+ for metric in metrics:
330+ yield metric
331+ events += 1
332 duration = self.time_function() - start
333- if events > 0:
334- messages.extend(sorted(meter_metrics))
335- num_stats += events
336+ num_stats += events
337 per_metric["meter"] = (events, duration)
338
339 start = self.time_function()
340- plugin_metrics, events = self.flush_plugin_metrics(interval, timestamp)
341+ events = 0
342+ for metrics in self.flush_plugin_metrics(interval, timestamp):
343+ for metric in metrics:
344+ yield metric
345+ events += 1
346 duration = self.time_function() - start
347- if events > 0:
348- messages.extend(sorted(plugin_metrics))
349- num_stats += events
350+ num_stats += events
351 per_metric["plugin"] = (events, duration)
352
353- self.flush_metrics_summary(messages, num_stats, per_metric, timestamp)
354- return messages
355+ for metrics in self.flush_metrics_summary(num_stats, per_metric,
356+ timestamp):
357+ for metric in metrics:
358+ yield metric
359
360 def flush_counter_metrics(self, interval, timestamp):
361- metrics = []
362- events = 0
363 for key, count in self.counter_metrics.iteritems():
364 self.counter_metrics[key] = 0
365
366 value = count / interval
367- metrics.append((self.stats_prefix + key, value, timestamp))
368- metrics.append((self.count_prefix + key, count, timestamp))
369- events += 1
370-
371- return (metrics, events)
372+ yield ((self.stats_prefix + key, value, timestamp),
373+ (self.count_prefix + key, count, timestamp))
374
375 def flush_timer_metrics(self, percent, timestamp):
376- metrics = []
377- events = 0
378-
379 threshold_value = ((100 - percent) / 100.0)
380 for key, timers in self.timer_metrics.iteritems():
381 count = len(timers)
382@@ -318,78 +321,52 @@
383 ".upper_%s" % percent: threshold_upper,
384 ".lower": lower,
385 ".count": count}
386- for item, value in items.iteritems():
387- metrics.append((self.timer_prefix + key + item,
388- value, timestamp))
389- events += 1
390-
391- return (metrics, events)
392+ yield sorted((self.timer_prefix + key + item, value, timestamp)
393+ for item, value in items.iteritems())
394
395 def flush_gauge_metrics(self, timestamp):
396- metrics = []
397- events = 0
398 for metric in self.gauge_metrics:
399 value = metric[0]
400 key = metric[1]
401
402- metrics.append((self.gauge_prefix + key + ".value",
403- value, timestamp))
404- events += 1
405+ yield ((self.gauge_prefix + key + ".value", value, timestamp),)
406
407 self.gauge_metrics.clear()
408
409- return (metrics, events)
410-
411 def flush_meter_metrics(self, timestamp):
412- metrics = []
413- events = 0
414 for metric in self.meter_metrics.itervalues():
415 messages = metric.report(timestamp)
416- metrics.extend(messages)
417- events += 1
418-
419- return (metrics, events)
420+ yield messages
421
422 def flush_plugin_metrics(self, interval, timestamp):
423- metrics = []
424- events = 0
425-
426 for metric in self.plugin_metrics.itervalues():
427 messages = metric.flush(interval, timestamp)
428- metrics.extend(messages)
429- events += 1
430-
431- return (metrics, events)
432-
433- def flush_metrics_summary(self, messages, num_stats,
434- per_metric, timestamp):
435-
436- messages.append((self.internal_metrics_prefix + "numStats",
437- num_stats, timestamp))
438+ yield messages
439+
440+ def flush_metrics_summary(self, num_stats, per_metric, timestamp):
441+ yield ((self.internal_metrics_prefix + "numStats",
442+ num_stats, timestamp),)
443
444 self.last_flush_duration = 0
445 for name, (value, duration) in per_metric.iteritems():
446- messages.extend([
447- (self.internal_metrics_prefix +
448- "flush.%s.count" % name,
449- value, timestamp),
450- (self.internal_metrics_prefix +
451- "flush.%s.duration" % name,
452- duration * 1000, timestamp)])
453+ yield ((self.internal_metrics_prefix +
454+ "flush.%s.count" % name,
455+ value, timestamp),
456+ (self.internal_metrics_prefix +
457+ "flush.%s.duration" % name,
458+ duration * 1000, timestamp))
459 log.msg("Flushed %d %s metrics in %.6f" %
460 (value, name, duration))
461 self.last_flush_duration += duration
462
463 self.last_process_duration = 0
464 for metric_type, duration in self.process_timings.iteritems():
465- messages.extend([
466- (self.internal_metrics_prefix +
467- "receive.%s.count" %
468- metric_type, self.by_type[metric_type], timestamp),
469- (self.internal_metrics_prefix +
470- "receive.%s.duration" %
471- metric_type, duration * 1000, timestamp)
472- ])
473+ yield ((self.internal_metrics_prefix +
474+ "receive.%s.count" %
475+ metric_type, self.by_type[metric_type], timestamp),
476+ (self.internal_metrics_prefix +
477+ "receive.%s.duration" %
478+ metric_type, duration * 1000, timestamp))
479 log.msg("Processing %d %s metrics took %.6f" %
480 (self.by_type[metric_type], metric_type, duration))
481 self.last_process_duration += duration
482
483=== modified file 'txstatsd/server/protocol.py'
484--- txstatsd/server/protocol.py 2012-06-28 17:29:26 +0000
485+++ txstatsd/server/protocol.py 2013-06-10 14:01:47 +0000
486@@ -31,8 +31,8 @@
487 server via TCP.
488 """
489
490- def __init__(self, processor,
491- monitor_message=None, monitor_response=None):
492+ def __init__(self, processor, monitor_message=None,
493+ monitor_response=None):
494 self.processor = processor
495 self.monitor_message = monitor_message
496 self.monitor_response = monitor_response
497@@ -42,9 +42,10 @@
498 if data == self.monitor_message:
499 # Send the expected response to the
500 # monitoring agent.
501- self.transport.write(self.monitor_response, (host, port))
502- else:
503- self.processor.process(data)
504+ return self.transport.write(
505+ self.monitor_response, (host, port))
506+ return self.transport.reactor.callLater(
507+ 0, self.processor.process, data)
508
509
510 class StatsDTCPServerProtocol(LineReceiver):
511@@ -54,8 +55,8 @@
512 server via TCP.
513 """
514
515- def __init__(self, processor,
516- monitor_message=None, monitor_response=None):
517+ def __init__(self, processor, monitor_message=None,
518+ monitor_response=None):
519 self.processor = processor
520 self.monitor_message = monitor_message
521 self.monitor_response = monitor_response
522@@ -65,20 +66,20 @@
523 if data == self.monitor_message:
524 # Send the expected response to the
525 # monitoring agent.
526- self.transport.write(self.monitor_response)
527- else:
528- self.processor.process(data)
529+ return self.transport.write(self.monitor_response)
530+ return self.transport.reactor.callLater(
531+ 0, self.processor.process, data)
532
533
534 class StatsDTCPServerFactory(Factory):
535
536- def __init__(self, processor,
537- monitor_message=None, monitor_response=None):
538+ def __init__(self, processor, monitor_message=None,
539+ monitor_response=None):
540 self.processor = processor
541 self.monitor_message = monitor_message
542 self.monitor_response = monitor_response
543
544 def buildProtocol(self, addr):
545- return StatsDTCPServerProtocol(self.processor,
546- self.monitor_message, self.monitor_response)
547-
548+ return StatsDTCPServerProtocol(
549+ self.processor, self.monitor_message,
550+ self.monitor_response)
551
552=== modified file 'txstatsd/service.py'
553--- txstatsd/service.py 2012-06-28 17:29:26 +0000
554+++ txstatsd/service.py 2013-06-10 14:01:47 +0000
555@@ -41,7 +41,7 @@
556 StatsDServerProtocol, StatsDTCPServerFactory)
557 from txstatsd.server.router import Router
558 from txstatsd.server import httpinfo
559-from txstatsd.report import ReportingService
560+from txstatsd.report import ReportingService, ReactorInspectorService
561 from txstatsd.itxstatsd import IMetricFactory
562 from twisted.application.service import Service
563 from twisted.internet import task
564@@ -211,19 +211,26 @@
565 self.processor = processor
566 self.flush_interval = flush_interval
567 self.flush_task = task.LoopingCall(self.flushProcessor)
568+ self.coop = task.Cooperator()
569 if clock is not None:
570 self.flush_task.clock = clock
571
572 def flushProcessor(self):
573 """Flush messages queued in the processor to Graphite."""
574- flushed = 0
575 start = time.time()
576- for metric, value, timestamp in self.processor.flush(
577- interval=self.flush_interval):
578- self.carbon_client.sendDatapoint(metric, (timestamp, value))
579- flushed += 1
580- log.msg("Flushed total %d metrics in %.6f" %
581- (flushed, time.time() - start))
582+ interval = self.flush_interval
583+ flush = self.processor.flush
584+
585+ def doWork():
586+ flushed = 0
587+ for metric, value, timestamp in flush(interval=interval):
588+ yield self.carbon_client.sendDatapoint(
589+ metric, (timestamp, value))
590+ flushed += 1
591+ log.msg("Flushed total %d metrics in %.6f" %
592+ (flushed, time.time() - start))
593+
594+ self.coop.coiterate(doWork())
595
596 def startService(self):
597 self.flush_task.start(self.flush_interval / 1000, False)
598@@ -276,7 +283,7 @@
599 # LoggingMessageProcessor supersedes
600 # any other processor class in "dump-mode"
601 assert not hasattr(log, 'info')
602- log.info = log.msg # for compatibility with LMP logger interface
603+ log.info = log.msg # for compatibility with LMP logger interface
604 processor = functools.partial(LoggingMessageProcessor, logger=log)
605
606 if options["statsd-compliance"]:
607@@ -315,6 +322,11 @@
608 process.report_reactor_stats(reactor), 60, metrics.gauge)
609 reports = [name.strip() for name in options["report"].split(",")]
610 for report_name in reports:
611+ if report_name == "reactor":
612+ inspector = ReactorInspectorService(reactor, metrics,
613+ loop_time=0.05)
614+ inspector.setServiceParent(root_service)
615+
616 for reporter in getattr(process, "%s_STATS" %
617 report_name.upper(), ()):
618 reporting.schedule(reporter, 60, metrics.gauge)
619@@ -343,12 +355,12 @@
620
621 if options["listen-tcp-port"] is not None:
622 statsd_tcp_server_factory = StatsDTCPServerFactory(
623- input_router,
624- monitor_message=options["monitor-message"],
625- monitor_response=options["monitor-response"])
626+ input_router,
627+ monitor_message=options["monitor-message"],
628+ monitor_response=options["monitor-response"])
629
630 listener = TCPServer(options["listen-tcp-port"],
631- statsd_tcp_server_factory)
632+ statsd_tcp_server_factory)
633 listener.setServiceParent(root_service)
634
635 httpinfo_service = httpinfo.makeService(options, processor, statsd_service)
636
637=== modified file 'txstatsd/tests/test_client.py'
638--- txstatsd/tests/test_client.py 2013-03-07 16:33:45 +0000
639+++ txstatsd/tests/test_client.py 2013-06-10 14:01:47 +0000
640@@ -198,11 +198,18 @@
641
642 self.assertIsInstance(self.client.data_queue, DataQueue)
643
644- def test_starts_without_transport_gateway(self):
645+ def test_starts_with_transport_gateway_if_ip(self):
646 """The client starts without a TransportGateway."""
647 self.client = TwistedStatsDClient('127.0.0.1', 8000)
648 self.build_protocol()
649
650+ self.assertTrue(self.client.transport_gateway is not None)
651+
652+ def test_starts_without_transport_gateway_if_not_ip(self):
653+ """The client starts without a TransportGateway."""
654+ self.client = TwistedStatsDClient('localhost', 8000)
655+ self.build_protocol()
656+
657 self.assertTrue(self.client.transport_gateway is None)
658
659 def test_passes_transport_to_gateway(self):
660@@ -269,7 +276,8 @@
661 callback = Mock()
662 self.client.transport_gateway.write.return_value = bytes_sent
663 self.assertEqual(self.client.write(message, callback), bytes_sent)
664- self.client.transport_gateway.write.assert_called_once_with(message, callback)
665+ self.client.transport_gateway.write.assert_called_once_with(
666+ message, callback)
667
668 def test_sends_messages_to_queue_before_host_resolves(self):
669 """Before the host is resolved, send messages to the DataQueue."""
670@@ -277,7 +285,7 @@
671 self.build_protocol()
672
673 message = 'some data'
674- self.client.data_queue = Mock(spec=DataQueue)
675+ self.client.data_queue = Mock(spec=DataQueue)
676 callback = Mock()
677 self.client.data_queue.write.return_value = None
678 result = self.client.write(message, callback)
679
680=== modified file 'txstatsd/tests/test_configurableprocessor.py'
681--- txstatsd/tests/test_configurableprocessor.py 2012-06-28 17:29:26 +0000
682+++ txstatsd/tests/test_configurableprocessor.py 2013-06-10 14:01:47 +0000
683@@ -40,7 +40,7 @@
684 configurable_processor = ConfigurableMessageProcessor(
685 time_function=lambda: 42)
686 configurable_processor.process("gorets:17|c")
687- messages = configurable_processor.flush()
688+ messages = list(configurable_processor.flush())
689 self.assertEqual(("gorets.count", 17, 42), messages[0])
690 self.assertEqual(("statsd.numStats", 1, 42), messages[1])
691
692@@ -51,7 +51,7 @@
693 configurable_processor = ConfigurableMessageProcessor(
694 time_function=lambda: 42, message_prefix="test.metric")
695 configurable_processor.process("gorets:17|c")
696- messages = configurable_processor.flush()
697+ messages = list(configurable_processor.flush())
698 self.assertEqual(("test.metric.gorets.count", 17, 42), messages[0])
699 self.assertEqual(("test.metric.statsd.numStats", 1, 42),
700 messages[1])
701@@ -64,7 +64,7 @@
702 time_function=lambda: 42, message_prefix="test.metric",
703 internal_metrics_prefix="statsd.foo.")
704 configurable_processor.process("gorets:17|c")
705- messages = configurable_processor.flush()
706+ messages = list(configurable_processor.flush())
707 self.assertEqual(("test.metric.gorets.count", 17, 42), messages[0])
708 self.assertEqual(("statsd.foo.numStats", 1, 42),
709 messages[1])
710@@ -77,7 +77,7 @@
711 time_function=lambda: 42, message_prefix="test.metric",
712 plugins=[distinct_metric_factory])
713 configurable_processor.process("gorets:17|pd")
714- messages = configurable_processor.flush()
715+ messages = list(configurable_processor.flush())
716 self.assertEquals(("test.metric.gorets.count", 1, 42), messages[0])
717
718 def test_flush_single_timer_single_time(self):
719@@ -94,7 +94,7 @@
720 configurable_processor.process("glork:24|ms")
721 _now = 42
722
723- messages = configurable_processor.flush()
724+ messages = list(configurable_processor.flush())
725 messages.sort()
726
727 expected = [
728@@ -128,7 +128,7 @@
729 configurable_processor.process("glork:42|ms")
730
731 _now = 42
732- messages = configurable_processor.flush()
733+ messages = list(configurable_processor.flush())
734 messages.sort()
735
736 expected = [
737@@ -169,7 +169,7 @@
738 self.configurable_processor.process("gorets:3.0|m")
739
740 self.time_now += 1
741- messages = self.configurable_processor.flush()
742+ messages = list(self.configurable_processor.flush())
743 self.assertEqual(("test.metric.gorets.count", 3.0, self.time_now),
744 messages[0])
745 self.assertEqual(("test.metric.gorets.rate", 3.0, self.time_now),
746
747=== added file 'txstatsd/tests/test_inspector.py'
748--- txstatsd/tests/test_inspector.py 1970-01-01 00:00:00 +0000
749+++ txstatsd/tests/test_inspector.py 2013-06-10 14:01:47 +0000
750@@ -0,0 +1,204 @@
751+# Copyright (C) 2011-2012 Canonical Services Ltd
752+#
753+# Permission is hereby granted, free of charge, to any person obtaining
754+# a copy of this software and associated documentation files (the
755+# "Software"), to deal in the Software without restriction, including
756+# without limitation the rights to use, copy, modify, merge, publish,
757+# distribute, sublicense, and/or sell copies of the Software, and to
758+# permit persons to whom the Software is furnished to do so, subject to
759+# the following conditions:
760+#
761+# The above copyright notice and this permission notice shall be
762+# included in all copies or substantial portions of the Software.
763+#
764+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
765+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
766+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
767+# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
768+# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
769+# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
770+# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
771+"""Tests for the ReactorInspector."""
772+
773+import re
774+import time
775+import logging
776+import threading
777+
778+from twisted.trial.unittest import TestCase as TwistedTestCase
779+from twisted.internet import reactor, defer
780+
781+from txstatsd.report import ReactorInspector
782+
783+
784+def parse_delay(msg):
785+ return float(re.search("delay: (\d+.\d{1,3})", msg).group(1))
786+
787+
788+class ReactorInspectorTestCase(TwistedTestCase):
789+ """Test the ReactorInspector class."""
790+
791+ def setUp(self):
792+ """Set up."""
793+
794+ class Helper(object):
795+ """Fake object with a controllable call."""
796+ def __init__(self):
797+ self.call_count = 1
798+ self.calls = []
799+ self.ri = None
800+
801+ def call(self, func):
802+ """Call function when counter is 0, then stop running."""
803+ self.call_count -= 1
804+ self.calls.append(func)
805+ if self.call_count == 0:
806+ for f in self.calls:
807+ f()
808+ if self.call_count <= 0:
809+ self.ri.stop()
810+
811+ class FakeMetrics(object):
812+ """Fake Metrics object that records calls."""
813+ def __init__(self):
814+ """Initialize calls."""
815+ self.calls = []
816+
817+ def meter(self, name, count):
818+ """Record call to meter()."""
819+ self.calls.append(("meter", name, count))
820+
821+ def gauge(self, name, val):
822+ """Record call to gauge()."""
823+ self.calls.append(("gauge", name, round(val, 3)))
824+
825+ def log(msg, logLevel=None):
826+ self.logged.append((msg, logLevel))
827+
828+ self.helper = Helper()
829+ self.fake_metrics = FakeMetrics()
830+ self.ri = ReactorInspector(self.helper.call, self.fake_metrics,
831+ loop_time=.1, log=log)
832+ self.helper.ri = self.ri
833+ self.logged = []
834+
835+ def check_log(self, *expected, **kw):
836+ logLevel = kw.get("logLevel", None)
837+ for (msg, level) in self.logged:
838+ if level == logLevel and all(m in msg for m in expected):
839+ return msg
840+ return False
841+
842+ def run_ri(self, call_count=None, join=True):
843+ """Set the call count and then run the ReactorInspector."""
844+ if call_count is not None:
845+ self.helper.call_count = call_count
846+ # pylint: disable=W0201
847+ self.start_ts = time.time()
848+ self.ri.start()
849+ # Reactor will stop after call_count calls, thanks to helper
850+ if join:
851+ self.ri.join()
852+
853+ def test_stop(self):
854+ """It stops."""
855+ self.run_ri(1000, join=False)
856+ assert self.ri.is_alive()
857+ self.ri.stop()
858+ self.ri.join()
859+ self.assertFalse(self.ri.is_alive())
860+
861+ @defer.inlineCallbacks
862+ def test_dump_frames(self):
863+ """Test how frames are dumped.
864+
865+ Rules:
866+ - own frame must not be logged
867+ - must log all other threads
868+ - main reactor thread must have special title
869+ """
870+ # other thread, whose frame must be logged
871+ waitingd = defer.Deferred()
872+
873+ def waiting_function():
874+ """Function with funny name to be checked later."""
875+ reactor.callFromThread(waitingd.callback, True)
876+ # wait have a default value; pylint: disable=E1120
877+ event.wait()
878+
879+ event = threading.Event()
880+ threading.Thread(target=waiting_function).start()
881+ # Make sure the thread has entered the waiting_function
882+ yield waitingd
883+
884+ # Set reactor_thread since we're not starting the ReactorInspector
885+ # thread here.
886+ self.ri.reactor_thread = threading.currentThread().ident
887+
888+ # dump frames in other thread, also
889+ def dumping_function():
890+ """Function with funny name to be checked later."""
891+ time.sleep(.1)
892+ self.ri.dump_frames()
893+ reactor.callFromThread(d.callback, True)
894+
895+ d = defer.Deferred()
896+ threading.Thread(target=dumping_function).start()
897+ yield d
898+ event.set()
899+
900+ # check
901+ self.assertFalse(self.check_log("dumping_function",
902+ logLevel=logging.DEBUG))
903+ self.assertTrue(self.check_log("Dumping Python frame",
904+ "waiting_function",
905+ logLevel=logging.DEBUG))
906+ self.assertTrue(self.check_log("Dumping Python frame",
907+ "reactor main thread",
908+ logLevel=logging.DEBUG))
909+
910+ def test_reactor_ok(self):
911+ """Reactor working fast."""
912+ self.run_ri()
913+ ok_line = self.assertTrue(self.check_log("ReactorInspector: ok",
914+ logLevel=logging.DEBUG))
915+ # Check the metrics
916+ delay = parse_delay(ok_line)
917+ expected_metric = ("gauge", "delay", delay)
918+ self.assertEqual([expected_metric], self.fake_metrics.calls)
919+ self.assertTrue(self.ri.last_responsive_ts >= self.start_ts)
920+
921+ @defer.inlineCallbacks
922+ def test_reactor_blocked(self):
923+ """Reactor not working fast."""
924+ dump_frames_called = defer.Deferred()
925+ self.ri.dump_frames = lambda: dump_frames_called.callback(True)
926+ self.run_ri(0)
927+ yield dump_frames_called
928+ log_line = self.check_log("ReactorInspector",
929+ "detected unresponsive",
930+ logLevel=logging.CRITICAL)
931+ self.assertTrue(log_line)
932+ delay = parse_delay(log_line)
933+ self.assertTrue(delay >= .1) # waited for entire loop time
934+ # Check the metrics
935+ expected_metric = ("gauge", "delay", delay)
936+ self.assertEqual([expected_metric], self.fake_metrics.calls)
937+
938+ self.assertTrue(self.ri.last_responsive_ts < self.start_ts)
939+
940+ def test_reactor_back_alive(self):
941+ """Reactor resurrects after some loops."""
942+ self.run_ri(3)
943+ late_line = self.check_log("ReactorInspector: late",
944+ "got: 0", logLevel=logging.WARNING)
945+ self.assertTrue(late_line)
946+ delay = parse_delay(late_line)
947+ self.assertTrue(delay >= .2) # At least 2 cycles of delay
948+ # Check the metrics
949+ expected_metric = ("gauge", "delay", delay)
950+ self.assertEqual(expected_metric, self.fake_metrics.calls[-1])
951+
952+ self.assertTrue(self.ri.queue.empty())
953+ # A late reactor is not considered responsive (until a successful loop)
954+ self.assertTrue(self.ri.last_responsive_ts < self.start_ts)
955
956=== modified file 'txstatsd/tests/test_loggingprocessor.py'
957--- txstatsd/tests/test_loggingprocessor.py 2012-06-28 17:29:26 +0000
958+++ txstatsd/tests/test_loggingprocessor.py 2013-06-10 14:01:47 +0000
959@@ -30,6 +30,7 @@
960 def report(self, *args):
961 return [('Sample report', 1, 2)]
962
963+
964 class TestLogger(object):
965 def __init__(self):
966 self.log = ''
967@@ -63,7 +64,7 @@
968 processor = LoggingMessageProcessor(logger)
969 metric = FakeMeterMetric()
970 processor.meter_metrics['test'] = metric
971- processor.flush()
972+ list(processor.flush())
973 expected = ["Out: %s %s %s" % message
974 for message in metric.report()]
975 self.assertFalse(set(expected).difference(logger.log.splitlines()))
976@@ -75,7 +76,7 @@
977 time_function=lambda: 42)
978 msg_in = "gorets:17|pd"
979 processor.process(msg_in)
980- processor.flush()
981+ list(processor.flush())
982 messages = processor.plugin_metrics['gorets'].flush(
983 10, processor.time_function())
984 expected = ["In: %s" % msg_in] + ["Out: %s %s %s" % message
985
986=== modified file 'txstatsd/tests/test_processor.py'
987--- txstatsd/tests/test_processor.py 2012-06-28 17:29:26 +0000
988+++ txstatsd/tests/test_processor.py 2013-06-10 14:01:47 +0000
989@@ -189,22 +189,25 @@
990 flushed.
991 """
992 self.timer.set([0,
993- 0, 1, # counter
994- 1, 3, # timer
995- 3, 6, # gauge
996- 6, 10, # meter
997- 10, 15, # plugin
998+ 0, 1, # counter
999+ 1, 3, # timer
1000+ 3, 6, # gauge
1001+ 6, 10, # meter
1002+ 10, 15, # plugin
1003 ])
1004- def flush_metrics_summary(messages, num_stats, per_metric, timestamp):
1005+
1006+ def flush_metrics_summary(num_stats, per_metric, timestamp):
1007 self.assertEqual((0, 1), per_metric["counter"])
1008 self.assertEqual((0, 2), per_metric["timer"])
1009 self.assertEqual((0, 3), per_metric["gauge"])
1010 self.assertEqual((0, 4), per_metric["meter"])
1011 self.assertEqual((0, 5), per_metric["plugin"])
1012+ yield ()
1013+
1014 self.addCleanup(setattr, self.processor, "flush_metrics_summary",
1015 self.processor.flush_metrics_summary)
1016 self.processor.flush_metrics_summary = flush_metrics_summary
1017- self.processor.flush()
1018+ list(self.processor.flush())
1019
1020 def test_flush_metrics_summary(self):
1021 """
1022@@ -215,14 +218,15 @@
1023 self.processor.process_timings = {"c": 1}
1024 self.processor.by_type = {"c": 42}
1025 messages = []
1026- self.processor.flush_metrics_summary(messages, 1, per_metric, 42)
1027+ map(messages.extend, self.processor.flush_metrics_summary(
1028+ 1, per_metric, 42))
1029 self.assertEqual(5, len(messages))
1030 self.assertEqual([('statsd.numStats', 1, 42),
1031 ('statsd.flush.counter.count', 10, 42),
1032 ('statsd.flush.counter.duration', 1000, 42),
1033 ('statsd.receive.c.count', 42, 42),
1034 ('statsd.receive.c.duration', 1000, 42)],
1035- messages)
1036+ messages)
1037 self.assertEquals({}, self.processor.process_timings)
1038 self.assertEquals({}, self.processor.by_type)
1039
1040@@ -238,7 +242,8 @@
1041 Flushing the message processor when there are no stats available should
1042 still produce one message where C{statsd.numStats} is set to zero.
1043 """
1044- self.assertEqual(("statsd.numStats", 0, 42), self.processor.flush()[0])
1045+ self.assertEqual(("statsd.numStats", 0, 42),
1046+ list(self.processor.flush())[0])
1047
1048 def test_flush_counter(self):
1049 """
1050@@ -246,7 +251,7 @@
1051 normalized to the default interval.
1052 """
1053 self.processor.counter_metrics["gorets"] = 42
1054- messages = self.processor.flush()
1055+ messages = list(self.processor.flush())
1056 self.assertEqual(("stats.gorets", 4, 42), messages[0])
1057 self.assertEqual(("stats_counts.gorets", 42, 42), messages[1])
1058 self.assertEqual(("statsd.numStats", 1, 42), messages[2])
1059@@ -258,7 +263,7 @@
1060 case the counter value will be unchanged.
1061 """
1062 self.processor.counter_metrics["gorets"] = 42
1063- messages = self.processor.flush(interval=1000)
1064+ messages = list(self.processor.flush(interval=1000))
1065 self.assertEqual(("stats.gorets", 42, 42), messages[0])
1066 self.assertEqual(("stats_counts.gorets", 42, 42), messages[1])
1067 self.assertEqual(("statsd.numStats", 1, 42), messages[2])
1068@@ -271,7 +276,7 @@
1069 reset after flush is called.
1070 """
1071 self.processor.timer_metrics["glork"] = [24]
1072- messages = self.processor.flush()
1073+ messages = list(self.processor.flush())
1074 self.assertEqual(("stats.timers.glork.count", 1, 42), messages[0])
1075 self.assertEqual(("stats.timers.glork.lower", 24, 42), messages[1])
1076 self.assertEqual(("stats.timers.glork.mean", 24, 42), messages[2])
1077@@ -290,7 +295,7 @@
1078 - mean will be the mean value within the 90th percentile
1079 """
1080 self.processor.timer_metrics["glork"] = [4, 8, 15, 16, 23, 42]
1081- messages = self.processor.flush()
1082+ messages = list(self.processor.flush())
1083 self.assertEqual(("stats.timers.glork.count", 6, 42), messages[0])
1084 self.assertEqual(("stats.timers.glork.lower", 4, 42), messages[1])
1085 self.assertEqual(("stats.timers.glork.mean", 13, 42), messages[2])
1086@@ -312,7 +317,7 @@
1087 - mean will be the mean value within the 50th percentile
1088 """
1089 self.processor.timer_metrics["glork"] = [4, 8, 15, 16, 23, 42]
1090- messages = self.processor.flush(percent=50)
1091+ messages = list(self.processor.flush(percent=50))
1092 self.assertEqual(("stats.timers.glork.count", 6, 42), messages[0])
1093 self.assertEqual(("stats.timers.glork.lower", 4, 42), messages[1])
1094 self.assertEqual(("stats.timers.glork.mean", 9, 42), messages[2])
1095@@ -329,7 +334,7 @@
1096
1097 self.processor.process("gorets:9.6|g")
1098
1099- messages = self.processor.flush()
1100+ messages = list(self.processor.flush())
1101 self.assertEqual(
1102 ("stats.gauge.gorets.value", 9.6, 42), messages[0])
1103 self.assertEqual(
1104@@ -344,14 +349,14 @@
1105
1106 self.processor.process("gorets:item|pd")
1107
1108- messages = self.processor.flush()
1109+ messages = list(self.processor.flush())
1110 self.assertEqual(("stats.pdistinct.gorets.count", 1, 42), messages[0])
1111 self.assertEqual(("stats.pdistinct.gorets.count_1day",
1112- 5552568545, 42), messages[1])
1113+ 5552568545, 42), messages[1])
1114 self.assertEqual(("stats.pdistinct.gorets.count_1hour",
1115- 5552568545, 42), messages[2])
1116+ 5552568545, 42), messages[2])
1117 self.assertEqual(("stats.pdistinct.gorets.count_1min",
1118- 5552568545, 42), messages[3])
1119+ 5552568545, 42), messages[3])
1120
1121 def test_flush_plugin_arguments(self):
1122 """Test the passing of arguments for flush."""
1123@@ -362,9 +367,9 @@
1124 return []
1125
1126 self.processor.plugin_metrics["somemetric"] = FakeMetric()
1127- self.processor.flush(41000)
1128- self.assertEquals((41, 42),
1129- self.processor.plugin_metrics["somemetric"].data)
1130+ list(self.processor.flush(41000))
1131+ self.assertEquals(
1132+ (41, 42), self.processor.plugin_metrics["somemetric"].data)
1133
1134
1135 class FlushMeterMetricMessagesTest(TestCase):
1136@@ -384,7 +389,7 @@
1137 self.processor.process("gorets:3.0|m")
1138
1139 self.time_now += 1
1140- messages = self.processor.flush()
1141+ messages = list(self.processor.flush())
1142 self.assertEqual(
1143 ("stats.meter.gorets.count", 3.0, self.time_now),
1144 messages[0])
1145@@ -396,7 +401,7 @@
1146 messages[2])
1147
1148 self.time_now += 60
1149- messages = self.processor.flush()
1150+ messages = list(self.processor.flush())
1151 self.assertEqual(
1152 ("stats.meter.gorets.count", 3.0, self.time_now),
1153 messages[0])
1154
1155=== modified file 'txstatsd/tests/test_service.py'
1156--- txstatsd/tests/test_service.py 2013-03-06 21:50:26 +0000
1157+++ txstatsd/tests/test_service.py 2013-06-10 14:01:47 +0000
1158@@ -307,4 +307,3 @@
1159
1160 reactor.callWhenRunning(exercise)
1161 reactor.run()
1162-

Subscribers

People subscribed via source and target branches