Merge lp:~sidnei/txstatsd/derive-metric into lp:txstatsd

Proposed by Sidnei da Silva
Status: Merged
Approved by: Sidnei da Silva
Approved revision: 98
Merged at revision: 92
Proposed branch: lp:~sidnei/txstatsd/derive-metric
Merge into: lp:txstatsd
Diff against target: 512 lines (+121/-160)
11 files modified
txstatsd/metrics/metermetric.py (+34/-53)
txstatsd/metrics/timermetric.py (+0/-3)
txstatsd/server/configurableprocessor.py (+0/-6)
txstatsd/server/loggingprocessor.py (+15/-23)
txstatsd/server/processor.py (+0/-4)
txstatsd/service.py (+15/-5)
txstatsd/tests/metrics/test_metermetric.py (+33/-0)
txstatsd/tests/test_configurableprocessor.py (+2/-14)
txstatsd/tests/test_loggingprocessor.py (+8/-9)
txstatsd/tests/test_processor.py (+7/-34)
txstatsd/tests/test_service.py (+7/-9)
To merge this branch: bzr merge lp:~sidnei/txstatsd/derive-metric
Reviewer Review Type Date Requested Status
Lucio Torre (community) Approve
Review via email: mp+107268@code.launchpad.net

Commit message

Replace meter metric by a simpler direct rate metric not using EWMA.

Description of the change

Replace meter metric by a simpler direct rate metric not using EWMA.

To post a comment you must log in.
Revision history for this message
Lucio Torre (lucio.torre) :
review: Approve
lp:~sidnei/txstatsd/derive-metric updated
95. By Sidnei da Silva

- Use provided timestamp, cleanup some formatting.

96. By Sidnei da Silva

- Restore accidentally deleted file

97. By Sidnei da Silva

- More formatting fixes

98. By Mike Kazantsev

Add Mike Kazantsev as author.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'txstatsd/metrics/metermetric.py'
2--- txstatsd/metrics/metermetric.py 2012-02-08 23:30:35 +0000
3+++ txstatsd/metrics/metermetric.py 2012-05-24 19:44:17 +0000
4@@ -1,17 +1,12 @@
5 import time
6
7 from txstatsd.metrics.metric import Metric
8-from txstatsd.stats.ewma import Ewma
9
10
11 class MeterMetric(Metric):
12 """
13- A meter metric which measures mean throughput and one-, five-, and
14- fifteen-minute exponentially-weighted moving average throughputs.
15-
16- See:
17- - U{EMA
18- <http://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average>}
19+ A simplier meter metric which measures instant throughput rate for each
20+ interval.
21 """
22
23 def __init__(self, connection, name, sample_rate=1):
24@@ -33,12 +28,8 @@
25
26 class MeterMetricReporter(object):
27 """
28- A meter metric which measures mean throughput and one-, five-, and
29- fifteen-minute exponentially-weighted moving average throughputs.
30-
31- See:
32- - U{EMA
33- <http://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average>}
34+ A simplier meter metric which measures instant throughput rate for each
35+ interval.
36 """
37
38 def __init__(self, name, wall_time_func=time.time, prefix=""):
39@@ -54,50 +45,40 @@
40 prefix += "."
41 self.prefix = prefix
42
43- self.m1_rate = Ewma.one_minute_ewma()
44- self.m5_rate = Ewma.five_minute_ewma()
45- self.m15_rate = Ewma.fifteen_minute_ewma()
46- self.count = 0
47- self.start_time = self.wall_time_func()
48-
49- def mark(self, value=1):
50- """Mark the occurrence of a given number of events."""
51- self.count += value
52- self.m1_rate.update(value)
53- self.m5_rate.update(value)
54- self.m15_rate.update(value)
55-
56- def tick(self):
57- """Updates the moving averages."""
58- self.m1_rate.tick()
59- self.m5_rate.tick()
60- self.m15_rate.tick()
61+ self.value = self.count = 0
62+ self.poll_time = self.wall_time_func()
63+
64+ def mark(self, value):
65+ """
66+ Process new data for this metric.
67+
68+ @type value: C{float}
69+ @param value: The reported value, to be aggregate into the meter.
70+ """
71+ self.value += value
72
73 def report(self, timestamp):
74+ """
75+ Returns a list of metrics to report.
76+
77+ @type timestamp: C{float}
78+ @param timestamp: The timestamp for now.
79+ """
80+ poll_prev, self.poll_time = self.poll_time, timestamp
81+
82+ if self.poll_time == poll_prev:
83+ return list()
84+
85+ rate = float(self.value) / (self.poll_time - poll_prev)
86+ self.count, self.value = self.count + self.value, 0
87+
88 metrics = []
89- items = {".count": self.count,
90- ".mean_rate": self.mean_rate(),
91- ".1min_rate": self.one_minute_rate(),
92- ".5min_rate": self.five_minute_rate(),
93- ".15min_rate": self.fifteen_minute_rate()}
94+ items = {
95+ ".count": self.count,
96+ ".rate": rate
97+ }
98
99- for item, value in items.iteritems():
100+ for item, value in sorted(items.iteritems()):
101 metrics.append((self.prefix + self.name + item,
102 round(value, 6), timestamp))
103 return metrics
104-
105- def fifteen_minute_rate(self):
106- return self.m15_rate.rate
107-
108- def five_minute_rate(self):
109- return self.m5_rate.rate
110-
111- def one_minute_rate(self):
112- return self.m1_rate.rate
113-
114- def mean_rate(self):
115- if self.count == 0:
116- return 0.0
117- else:
118- elapsed = self.wall_time_func() - self.start_time
119- return float(self.count) / elapsed
120
121=== modified file 'txstatsd/metrics/timermetric.py'
122--- txstatsd/metrics/timermetric.py 2012-05-22 19:12:00 +0000
123+++ txstatsd/metrics/timermetric.py 2012-05-24 19:44:17 +0000
124@@ -131,9 +131,6 @@
125 if duration >= 0:
126 self.histogram.update(duration)
127
128- def tick(self):
129- pass
130-
131 def report(self, timestamp):
132 # median, 75, 95, 98, 99, 99.9 percentile
133 percentiles = self.percentiles(0.5, 0.75, 0.95, 0.98, 0.99, 0.999)
134
135=== modified file 'txstatsd/server/configurableprocessor.py'
136--- txstatsd/server/configurableprocessor.py 2012-05-22 19:12:00 +0000
137+++ txstatsd/server/configurableprocessor.py 2012-05-24 19:44:17 +0000
138@@ -101,9 +101,3 @@
139 events += 1
140
141 return (metrics, events)
142-
143- def update_metrics(self):
144- super(ConfigurableMessageProcessor, self).update_metrics()
145-
146- for metric in self.timer_metrics.itervalues():
147- metric.tick()
148
149=== modified file 'txstatsd/server/loggingprocessor.py'
150--- txstatsd/server/loggingprocessor.py 2011-12-02 17:09:32 +0000
151+++ txstatsd/server/loggingprocessor.py 2012-05-24 19:44:17 +0000
152@@ -1,4 +1,3 @@
153-
154 import time
155
156 from txstatsd.server.configurableprocessor import ConfigurableMessageProcessor
157@@ -11,33 +10,26 @@
158 attribute.)
159 """
160
161- def __init__(self, logger, time_function=time.time, message_prefix="", plugins=None):
162+ def __init__(self, logger, time_function=time.time,
163+ message_prefix="", plugins=None, **kwz):
164 super(LoggingMessageProcessor, self).__init__(
165 time_function=time_function, message_prefix=message_prefix,
166- plugins=plugins)
167+ plugins=plugins, **kwz)
168
169- logger_info = getattr(logger, 'info', None)
170+ logger_info = getattr(logger, "info", None)
171 if logger_info is None or not callable(logger_info):
172 raise TypeError()
173 self.logger = logger
174
175+ def process_message(self, message, metric_type, key, fields):
176+ self.logger.info("In: %s" % message)
177+ return super(LoggingMessageProcessor, self).process_message(
178+ message, metric_type, key, fields)
179+
180 def flush(self, interval=10000, percent=90):
181- """Log all received metric samples to the supplied log file."""
182- timestamp = int(self.time_function())
183- interval = interval / 1000
184-
185- def log_metrics(metrics):
186- for metric in metrics.itervalues():
187- messages = metric.report(timestamp)
188- for measurement in messages:
189- self.logger.info("%s %s %s" % measurement)
190-
191- log_metrics(self.counter_metrics)
192- log_metrics(self.gauge_metrics)
193- log_metrics(self.meter_metrics)
194- log_metrics(self.timer_metrics)
195-
196- for metric in self.plugin_metrics.itervalues():
197- for measurement in metric.flush(interval, timestamp):
198- self.logger.info("%s %s %s" % measurement)
199-
200+ """Log all received metric samples to the supplied logger."""
201+ messages = list(super(LoggingMessageProcessor, self).flush(
202+ interval=interval, percent=percent))
203+ for msg in messages:
204+ self.logger.info("Out: %s %s %s" % msg)
205+ return messages
206
207=== modified file 'txstatsd/server/processor.py'
208--- txstatsd/server/processor.py 2012-04-26 19:21:24 +0000
209+++ txstatsd/server/processor.py 2012-05-24 19:44:17 +0000
210@@ -375,7 +375,3 @@
211
212 self.process_timings.clear()
213 self.by_type.clear()
214-
215- def update_metrics(self):
216- for metric in self.meter_metrics.itervalues():
217- metric.tick()
218
219=== modified file 'txstatsd/service.py'
220--- txstatsd/service.py 2012-02-08 23:30:59 +0000
221+++ txstatsd/service.py 2012-05-24 19:44:17 +0000
222@@ -4,6 +4,7 @@
223 import time
224 import ConfigParser
225 import platform
226+import functools
227
228 from twisted.application.internet import UDPServer, TCPServer
229 from twisted.application.service import MultiService
230@@ -15,6 +16,7 @@
231 from txstatsd.metrics.extendedmetrics import ExtendedMetrics
232 from txstatsd.server.processor import MessageProcessor
233 from txstatsd.server.configurableprocessor import ConfigurableMessageProcessor
234+from txstatsd.server.loggingprocessor import LoggingMessageProcessor
235 from txstatsd.server.protocol import (
236 StatsDServerProtocol, StatsDTCPServerFactory)
237 from txstatsd.server.router import Router
238@@ -150,6 +152,9 @@
239 "Response we should send monitoring agent.", str],
240 ["statsd-compliance", "s", 1,
241 "Produce StatsD-compliant messages.", int],
242+ ["dump-mode", "d", 0,
243+ "Dump received and aggregated metrics"
244+ " before passing them to carbon.", int],
245 ["routing", "g", "",
246 "Routing rules", str],
247 ["listen-tcp-port", "t", None,
248@@ -246,13 +251,21 @@
249 plugin.configure(options)
250 plugin_metrics.append(plugin)
251
252+ processor = None
253+ if options["dump-mode"]:
254+ # LoggingMessageProcessor supersedes
255+ # any other processor class in "dump-mode"
256+ assert not hasattr(log, 'info')
257+ log.info = log.msg # for compatibility with LMP logger interface
258+ processor = functools.partial(LoggingMessageProcessor, logger=log)
259+
260 if options["statsd-compliance"]:
261- processor = MessageProcessor(plugins=plugin_metrics)
262+ processor = (processor or MessageProcessor)(plugins=plugin_metrics)
263 input_router = Router(processor, options['routing'], root_service)
264 connection = InternalClient(input_router)
265 metrics = Metrics(connection)
266 else:
267- processor = ConfigurableMessageProcessor(
268+ processor = (processor or ConfigurableMessageProcessor)(
269 message_prefix=prefix,
270 internal_metrics_prefix=prefix + "." + instance_name + ".",
271 plugins=plugin_metrics)
272@@ -270,9 +283,6 @@
273 reporting = ReportingService(instance_name)
274 reporting.setServiceParent(root_service)
275
276- # Schedule updates for those metrics expecting to be
277- # periodically updated, for example the meter metric.
278- reporting.schedule(processor.update_metrics, 10, None)
279 reporting.schedule(report_client_manager_stats,
280 options["flush-interval"] / 1000,
281 metrics.gauge)
282
283=== added file 'txstatsd/tests/metrics/test_metermetric.py'
284--- txstatsd/tests/metrics/test_metermetric.py 1970-01-01 00:00:00 +0000
285+++ txstatsd/tests/metrics/test_metermetric.py 2012-05-24 19:44:17 +0000
286@@ -0,0 +1,33 @@
287+import random
288+from twisted.trial.unittest import TestCase
289+
290+from txstatsd.metrics.metermetric import MeterMetricReporter
291+
292+
293+class TestDeriveMetricReporter(TestCase):
294+
295+ def test_fastpoll(self):
296+ wall_time = 42
297+ reporter = MeterMetricReporter(
298+ "test", wall_time_func=lambda: wall_time)
299+
300+ self.assertEquals([], reporter.report(wall_time))
301+
302+ def test_interface(self):
303+ random.seed(1)
304+
305+ wall_time = [0]
306+ reporter = MeterMetricReporter("test", prefix="some.prefix",
307+ wall_time_func=lambda: wall_time[0])
308+ reporter.mark(42)
309+ reporter.mark(60)
310+ reporter.mark(38)
311+ wall_time = [10]
312+
313+ reported = reporter.report(10)
314+ self.assertEqual(2, len(reported))
315+ self.assertEqual(140, reported[0][1])
316+ self.assertEqual(14, reported[1][1])
317+ self.assertEquals(
318+ ['some.prefix.test.count', 'some.prefix.test.rate'],
319+ [reported[0][0], reported[1][0]])
320
321=== modified file 'txstatsd/tests/test_configurableprocessor.py'
322--- txstatsd/tests/test_configurableprocessor.py 2012-05-22 19:12:00 +0000
323+++ txstatsd/tests/test_configurableprocessor.py 2012-05-24 19:44:17 +0000
324@@ -100,17 +100,11 @@
325 time_function=lambda: _now)
326
327 configurable_processor.process("glork:4|ms")
328- configurable_processor.update_metrics()
329 configurable_processor.process("glork:8|ms")
330- configurable_processor.update_metrics()
331 configurable_processor.process("glork:15|ms")
332- configurable_processor.update_metrics()
333 configurable_processor.process("glork:16|ms")
334- configurable_processor.update_metrics()
335 configurable_processor.process("glork:23|ms")
336- configurable_processor.update_metrics()
337 configurable_processor.process("glork:42|ms")
338- configurable_processor.update_metrics()
339
340 _now = 42
341 messages = configurable_processor.flush()
342@@ -155,13 +149,7 @@
343
344 self.time_now += 1
345 messages = self.configurable_processor.flush()
346- self.assertEqual(("test.metric.gorets.15min_rate", 0.0, self.time_now),
347+ self.assertEqual(("test.metric.gorets.count", 3.0, self.time_now),
348 messages[0])
349- self.assertEqual(("test.metric.gorets.1min_rate", 0.0, self.time_now),
350+ self.assertEqual(("test.metric.gorets.rate", 3.0, self.time_now),
351 messages[1])
352- self.assertEqual(("test.metric.gorets.5min_rate", 0.0, self.time_now),
353- messages[2])
354- self.assertEqual(("test.metric.gorets.count", 3.0, self.time_now),
355- messages[3])
356- self.assertEqual(("test.metric.gorets.mean_rate", 3.0, self.time_now),
357- messages[4])
358
359=== modified file 'txstatsd/tests/test_loggingprocessor.py'
360--- txstatsd/tests/test_loggingprocessor.py 2011-12-02 17:09:32 +0000
361+++ txstatsd/tests/test_loggingprocessor.py 2012-05-24 19:44:17 +0000
362@@ -1,4 +1,3 @@
363-
364 from unittest import TestCase
365
366 from twisted.plugins.distinct_plugin import distinct_metric_factory
367@@ -44,20 +43,20 @@
368 metric = FakeMeterMetric()
369 processor.meter_metrics['test'] = metric
370 processor.flush()
371- expected = "\n".join(["%s %s %s" % message
372- for message in metric.report()])
373- self.assertEqual(expected + "\n", logger.log)
374+ expected = ["Out: %s %s %s" % message
375+ for message in metric.report()]
376+ self.assertFalse(set(expected).difference(logger.log.splitlines()))
377
378 def test_logger_plugin(self):
379 logger = TestLogger()
380 processor = LoggingMessageProcessor(
381 logger, plugins=[distinct_metric_factory],
382 time_function=lambda: 42)
383- processor.process("gorets:17|pd")
384+ msg_in = "gorets:17|pd"
385+ processor.process(msg_in)
386 processor.flush()
387 messages = processor.plugin_metrics['gorets'].flush(
388 10, processor.time_function())
389- expected = "\n".join(["%s %s %s" % message
390- for message in messages])
391- self.assertEqual(expected + "\n", logger.log)
392-
393+ expected = ["In: %s" % msg_in] + ["Out: %s %s %s" % message
394+ for message in messages]
395+ self.assertFalse(set(expected).difference(logger.log.splitlines()))
396
397=== modified file 'txstatsd/tests/test_processor.py'
398--- txstatsd/tests/test_processor.py 2012-04-26 19:21:24 +0000
399+++ txstatsd/tests/test_processor.py 2012-05-24 19:44:17 +0000
400@@ -355,10 +355,6 @@
401 def wall_clock_time(self):
402 return self.time_now
403
404- def mark_minutes(self, minutes):
405- for i in range(1, minutes * 60, 5):
406- self.processor.update_metrics()
407-
408 def test_flush_meter_metric(self):
409 """
410 Test the correct rendering of the Graphite report for
411@@ -369,45 +365,22 @@
412 self.time_now += 1
413 messages = self.processor.flush()
414 self.assertEqual(
415- ("stats.meter.gorets.15min_rate", 0.0, self.time_now),
416+ ("stats.meter.gorets.count", 3.0, self.time_now),
417 messages[0])
418 self.assertEqual(
419- ("stats.meter.gorets.1min_rate", 0.0, self.time_now),
420+ ("stats.meter.gorets.rate", 3.0, self.time_now),
421 messages[1])
422 self.assertEqual(
423- ("stats.meter.gorets.5min_rate", 0.0, self.time_now),
424- messages[2])
425- self.assertEqual(
426- ("stats.meter.gorets.count", 3.0, self.time_now),
427- messages[3])
428- self.assertEqual(
429- ("stats.meter.gorets.mean_rate", 3.0, self.time_now),
430- messages[4])
431- self.assertEqual(
432 ("statsd.numStats", 1, self.time_now),
433- messages[5])
434-
435- # As we are employing the expected results from test_ewma.py
436- # we perform the initial tick(), before advancing the clock 60sec.
437- self.processor.update_metrics()
438-
439- self.mark_minutes(1)
440+ messages[2])
441+
442 self.time_now += 60
443 messages = self.processor.flush()
444 self.assertEqual(
445- ("stats.meter.gorets.15min_rate", 0.561304, self.time_now),
446+ ("stats.meter.gorets.count", 3.0, self.time_now),
447 messages[0])
448 self.assertEqual(
449- ("stats.meter.gorets.1min_rate", 0.220728, self.time_now),
450+ ("stats.meter.gorets.rate", 0.0, self.time_now),
451 messages[1])
452 self.assertEqual(
453- ("stats.meter.gorets.5min_rate", 0.491238, self.time_now),
454- messages[2])
455- self.assertEqual(
456- ("stats.meter.gorets.count", 3.0, self.time_now),
457- messages[3])
458- self.assertEqual(
459- ("stats.meter.gorets.mean_rate", 0.049180, self.time_now),
460- messages[4])
461- self.assertEqual(
462- ("statsd.numStats", 1, self.time_now), messages[5])
463+ ("statsd.numStats", 1, self.time_now), messages[2])
464
465=== modified file 'txstatsd/tests/test_service.py'
466--- txstatsd/tests/test_service.py 2012-02-07 21:11:42 +0000
467+++ txstatsd/tests/test_service.py 2012-05-24 19:44:17 +0000
468@@ -1,13 +1,13 @@
469+import tempfile
470 import ConfigParser
471-import tempfile
472-from unittest import TestCase
473 from StringIO import StringIO
474
475+from twisted.trial.unittest import TestCase
476+
477 from carbon.client import CarbonClientManager
478
479 from twisted.internet.defer import inlineCallbacks, Deferred
480 from twisted.internet.protocol import DatagramProtocol
481-from twisted.internet.test.reactormixins import ReactorBuilder
482 from twisted.application.internet import UDPServer
483
484 from txstatsd import service
485@@ -186,7 +186,7 @@
486 self.monitor_response = data
487
488
489-class ServiceTestsBuilder(ReactorBuilder):
490+class ServiceTestsBuilder(TestCase):
491
492 def test_service(self):
493 """
494@@ -243,7 +243,7 @@
495 The StatsD service messages the expected response to the
496 monitoring agent.
497 """
498- reactor = self.buildReactor()
499+ from twisted.internet import reactor
500
501 options = service.StatsDOptions()
502 processor = MessageProcessor()
503@@ -280,7 +280,5 @@
504 reactor.stop()
505
506 reactor.callWhenRunning(exercise)
507- self.runReactor(reactor)
508-
509-
510-globals().update(ServiceTestsBuilder.makeTestCaseClasses())
511+ reactor.run()
512+

Subscribers

People subscribed via source and target branches