Merge lp:~theiw/txstatsd/txstatsd-coda-hale-counters into lp:txstatsd

Proposed by Ian Wilkinson
Status: Merged
Approved by: Sidnei da Silva
Approved revision: 34
Merged at revision: 34
Proposed branch: lp:~theiw/txstatsd/txstatsd-coda-hale-counters
Merge into: lp:txstatsd
Diff against target: 284 lines (+156/-20)
7 files modified
txstatsd/metrics/countermetric.py (+76/-0)
txstatsd/metrics/extendedmetrics.py (+39/-0)
txstatsd/metrics/metermetric.py (+3/-1)
txstatsd/server/configurableprocessor.py (+26/-8)
txstatsd/server/processor.py (+4/-0)
txstatsd/tests/test_processor.py (+7/-10)
txstatsd/version.py (+1/-1)
To merge this branch: bzr merge lp:~theiw/txstatsd/txstatsd-coda-hale-counters
Reviewer Review Type Date Requested Status
Sidnei da Silva Approve
Lucio Torre (community) Approve
Review via email: mp+74816@code.launchpad.net

Commit message

Include support for reporting counter metrics, and fix an issue with the prefix for meter metrics.

Description of the change

Include support for reporting counter metrics.

The specialised ExtendedMetrics class was introduced to allow for an interface that diverges from that of StatsD.

(This also fixes an issue with the prefix for meter metrics.)

To post a comment you must log in.
34. By Ian Wilkinson

Correctly update the counter reporter.

Revision history for this message
Lucio Torre (lucio.torre) :
review: Approve
Revision history for this message
Sidnei da Silva (sidnei) :
review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== added file 'txstatsd/metrics/countermetric.py'
2--- txstatsd/metrics/countermetric.py 1970-01-01 00:00:00 +0000
3+++ txstatsd/metrics/countermetric.py 2011-09-09 15:51:23 +0000
4@@ -0,0 +1,76 @@
5+
6+import math
7+from string import Template
8+
9+from txstatsd.metrics.metric import Metric
10+
11+
12+class CounterMetric(Metric):
13+ """An incrementing and decrementing counter metric."""
14+
15+ def __init__(self, connection, name, sample_rate=1):
16+ """Construct a metric that reports samples to the supplied
17+ C{connection}.
18+
19+ @param connection: The connection endpoint representing
20+ the StatsD server.
21+ @param name: Indicates what is being instrumented.
22+ @param sample_rate: Restrict the number of samples sent
23+ to the StatsD server based on the supplied C{sample_rate}.
24+ """
25+ Metric.__init__(self, connection, name, sample_rate=sample_rate)
26+
27+ self._count = 0
28+
29+ def increment(self, value):
30+ """Increment the counter by C{value}"""
31+ self._count += value
32+ self._update(self._count)
33+
34+ def decrement(self, value):
35+ """Decrement the counter by C{value}"""
36+ self._count -= value
37+ self._update(self._count)
38+
39+ def count(self):
40+ """Returns the counter's current value."""
41+ return self._count
42+
43+ def clear(self):
44+ """Resets the counter to 0."""
45+ self._count = 0
46+ self._update(self._count)
47+
48+ def _update(self, value):
49+ """Report the counter."""
50+ self.send("%s|c" % value)
51+
52+
53+class CounterMetricReporter(object):
54+ """An incrementing and decrementing counter metric."""
55+
56+ MESSAGE = (
57+ "$prefix%(key)s.count %(count)s %(timestamp)s\n")
58+
59+ def __init__(self, name, prefix=""):
60+ """Construct a metric we expect to be periodically updated.
61+
62+ @param name: Indicates what is being instrumented.
63+ """
64+ self.name = name
65+
66+ if prefix:
67+ prefix += '.'
68+ self.message = Template(CounterMetricReporter.MESSAGE).substitute(
69+ prefix=prefix)
70+
71+ self.count = 0
72+
73+ def mark(self, value):
74+ self.count += value
75+
76+ def report(self, timestamp):
77+ return self.message % {
78+ "key": self.name,
79+ "count": math.trunc(self.count),
80+ "timestamp": timestamp}
81
82=== added file 'txstatsd/metrics/extendedmetrics.py'
83--- txstatsd/metrics/extendedmetrics.py 1970-01-01 00:00:00 +0000
84+++ txstatsd/metrics/extendedmetrics.py 2011-09-09 15:51:23 +0000
85@@ -0,0 +1,39 @@
86+
87+from txstatsd.metrics.countermetric import CounterMetric
88+from txstatsd.metrics.metrics import Metrics
89+
90+
91+class ExtendedMetrics(Metrics):
92+ def __init__(self, connection=None, namespace=""):
93+ """A convenience class for reporting metric samples
94+ to a C{txstatsd} server configured with the
95+ L{ConfigurableProcessor<txstatsd.server.configurableprocessor>}
96+ processor.
97+
98+ @param connection: The connection endpoint representing
99+ the C{txstatsd} server.
100+ @param namespace: The top-level namespace identifying the
101+ origin of the samples.
102+ """
103+
104+ super(ExtendedMetrics, self).__init__(connection, namespace)
105+
106+ def increment(self, name, value=1, sample_rate=1):
107+ """Report and increase in name by count."""
108+ name = self.fully_qualify_name(name)
109+ if not name in self._metrics:
110+ metric = CounterMetric(self.connection,
111+ name,
112+ sample_rate)
113+ self._metrics[name] = metric
114+ self._metrics[name].increment(value)
115+
116+ def decrement(self, name, value=1, sample_rate=1):
117+ """Report and decrease in name by count."""
118+ name = self.fully_qualify_name(name)
119+ if not name in self._metrics:
120+ metric = CounterMetric(self.connection,
121+ name,
122+ sample_rate)
123+ self._metrics[name] = metric
124+ self._metrics[name].decrement(value)
125
126=== modified file 'txstatsd/metrics/metermetric.py'
127--- txstatsd/metrics/metermetric.py 2011-09-02 10:11:11 +0000
128+++ txstatsd/metrics/metermetric.py 2011-09-09 15:51:23 +0000
129@@ -60,8 +60,10 @@
130 self.name = name
131 self.wall_time_func = wall_time_func
132
133+ if prefix:
134+ prefix += '.'
135 self.message = Template(MeterMetricReporter.MESSAGE).substitute(
136- prefix=prefix + '.')
137+ prefix=prefix)
138
139 self.m1_rate = Ewma.one_minute_ewma()
140 self.m5_rate = Ewma.five_minute_ewma()
141
142=== modified file 'txstatsd/server/configurableprocessor.py'
143--- txstatsd/server/configurableprocessor.py 2011-09-02 10:11:11 +0000
144+++ txstatsd/server/configurableprocessor.py 2011-09-09 15:51:23 +0000
145@@ -3,6 +3,7 @@
146
147 import time
148
149+from txstatsd.metrics.countermetric import CounterMetricReporter
150 from txstatsd.metrics.metermetric import MeterMetricReporter
151 from txstatsd.server.processor import MessageProcessor
152
153@@ -14,6 +15,7 @@
154 Currently, this extends to:
155 - Allow a prefix to be added to the composed messages sent
156 to the Graphite server.
157+ - Report an incrementing and decrementing counter metric.
158 """
159
160 # Notice: These messages replicate those seen in the
161@@ -21,10 +23,6 @@
162 # In a future release they will be placed in their
163 # respective metric reporter class.
164 # See MeterMetricReporter.
165- COUNTERS_MESSAGE = (
166- "$prefix%(key)s %(value)s %(timestamp)s\n"
167- "$prefix%(key)s %(count)s %(timestamp)s\n")
168-
169 TIMERS_MESSAGE = (
170 "$prefix%(key)s.mean %(mean)s %(timestamp)s\n"
171 "$prefix%(key)s.upper %(upper)s %(timestamp)s\n"
172@@ -43,10 +41,6 @@
173 if message_prefix:
174 message_prefix += '.'
175
176- message = Template(ConfigurableMessageProcessor.COUNTERS_MESSAGE)
177- self.counters_message = message.substitute(
178- prefix=message_prefix)
179-
180 message = Template(ConfigurableMessageProcessor.TIMERS_MESSAGE)
181 self.timers_message = message.substitute(
182 prefix=message_prefix)
183@@ -55,9 +49,33 @@
184 self.gauge_metric_message = message.substitute(
185 prefix=message_prefix)
186
187+ def process_counter_metric(self, key, composite, message):
188+ try:
189+ value = float(composite[0])
190+ except (TypeError, ValueError):
191+ return self.fail(message)
192+
193+ self.compose_counter_metric(key, value)
194+
195+ def compose_counter_metric(self, key, value):
196+ if not key in self.counter_metrics:
197+ metric = CounterMetricReporter(key, prefix=self.message_prefix)
198+ self.counter_metrics[key] = metric
199+ self.counter_metrics[key].mark(value)
200+
201 def compose_meter_metric(self, key, value):
202 if not key in self.meter_metrics:
203 metric = MeterMetricReporter(key, self.time_function,
204 prefix=self.message_prefix)
205 self.meter_metrics[key] = metric
206 self.meter_metrics[key].mark(value)
207+
208+ def flush_counter_metrics(self, interval, timestamp):
209+ metrics = []
210+ events = 0
211+ for metric in self.counter_metrics.itervalues():
212+ message = metric.report(timestamp)
213+ metrics.append(message)
214+ events += 1
215+
216+ return (metrics, events)
217
218=== modified file 'txstatsd/server/processor.py'
219--- txstatsd/server/processor.py 2011-09-02 10:11:11 +0000
220+++ txstatsd/server/processor.py 2011-09-09 15:51:23 +0000
221@@ -114,6 +114,10 @@
222 if match is None:
223 return self.fail(message)
224 rate = match.group(1)
225+
226+ self.compose_counter_metric(key, value, rate)
227+
228+ def compose_counter_metric(self, key, value, rate):
229 if key not in self.counter_metrics:
230 self.counter_metrics[key] = 0
231 self.counter_metrics[key] += value * (1 / float(rate))
232
233=== modified file 'txstatsd/tests/test_processor.py'
234--- txstatsd/tests/test_processor.py 2011-09-02 10:11:11 +0000
235+++ txstatsd/tests/test_processor.py 2011-09-09 15:51:23 +0000
236@@ -148,18 +148,18 @@
237 def test_flush_counter_with_empty_prefix(self):
238 """
239 Ensure no prefix features if none is supplied.
240+ B{Note}: The C{ConfigurableMessageProcessor} reports
241+ the counter value, and not the normalized version as
242+ seen in the StatsD-compliant C{Processor}.
243 """
244 configurable_processor = ConfigurableMessageProcessor(
245 time_function=lambda: 42)
246- configurable_processor.counter_metrics["gorets"] = 42
247+ configurable_processor.process("gorets:17|c")
248 messages = configurable_processor.flush()
249 self.assertEqual(2, len(messages))
250 counters = messages[0].splitlines()
251- self.assertEqual("gorets 4 42", counters[0])
252- self.assertEqual("gorets 42 42", counters[1])
253+ self.assertEqual("gorets.count 17 42", counters[0])
254 self.assertEqual("statsd.numStats 1 42", messages[1])
255- self.assertEqual(
256- 0, configurable_processor.counter_metrics["gorets"])
257
258 def test_flush_counter_with_prefix(self):
259 """
260@@ -167,15 +167,12 @@
261 """
262 configurable_processor = ConfigurableMessageProcessor(
263 time_function=lambda: 42, message_prefix="test.metric")
264- configurable_processor.counter_metrics["gorets"] = 42
265+ configurable_processor.process("gorets:17|c")
266 messages = configurable_processor.flush()
267 self.assertEqual(2, len(messages))
268 counters = messages[0].splitlines()
269- self.assertEqual("test.metric.gorets 4 42", counters[0])
270- self.assertEqual("test.metric.gorets 42 42", counters[1])
271+ self.assertEqual("test.metric.gorets.count 17 42", counters[0])
272 self.assertEqual("statsd.numStats 1 42", messages[1])
273- self.assertEqual(
274- 0, configurable_processor.counter_metrics["gorets"])
275
276 def test_flush_counter_one_second_interval(self):
277 """
278
279=== modified file 'txstatsd/version.py'
280--- txstatsd/version.py 2011-09-07 10:18:11 +0000
281+++ txstatsd/version.py 2011-09-09 15:51:23 +0000
282@@ -1,1 +1,1 @@
283-txstatsd = "0.3.0"
284+txstatsd = "0.4.0"

Subscribers

People subscribed via source and target branches