Merge lp:~mk-fraggod/txstatsd/txstatsd into lp:txstatsd
- txstatsd
- Merge into trunk
Status: | Merged |
---|---|
Merged at revision: | 92 |
Proposed branch: | lp:~mk-fraggod/txstatsd/txstatsd |
Merge into: | lp:txstatsd |
Diff against target: |
6367 lines (+6061/-0) (has conflicts) 56 files modified
.bzrignore (+2/-0)
LICENSE (+20/-0)
MANIFEST.in (+5/-0)
README (+21/-0)
example-stats-client.tac (+52/-0)
requirements.txt (+7/-0)
setup.py (+67/-0)
statsd.tac (+8/-0)
twisted/plugins/derive_plugin.py (+69/-0)
twisted/plugins/distinct_plugin.py (+20/-0)
twisted/plugins/txstatsd_plugin.py (+22/-0)
txstatsd.conf-example (+26/-0)
txstatsd/client.py (+165/-0)
txstatsd/itxstatsd.py (+54/-0)
txstatsd/metrics/countermetric.py (+67/-0)
txstatsd/metrics/distinctmetric.py (+150/-0)
txstatsd/metrics/extendedmetrics.py (+53/-0)
txstatsd/metrics/gaugemetric.py (+43/-0)
txstatsd/metrics/histogrammetric.py (+209/-0)
txstatsd/metrics/imetrics.py (+18/-0)
txstatsd/metrics/metermetric.py (+103/-0)
txstatsd/metrics/metric.py (+47/-0)
txstatsd/metrics/metrics.py (+140/-0)
txstatsd/metrics/timermetric.py (+161/-0)
txstatsd/process.py (+217/-0)
txstatsd/report.py (+64/-0)
txstatsd/server/configurableprocessor.py (+108/-0)
txstatsd/server/httpinfo.py (+60/-0)
txstatsd/server/loggingprocessor.py (+36/-0)
txstatsd/server/processor.py (+371/-0)
txstatsd/server/protocol.py (+63/-0)
txstatsd/server/router.py (+308/-0)
txstatsd/service.py (+340/-0)
txstatsd/stats/ewma.py (+70/-0)
txstatsd/stats/exponentiallydecayingsample.py (+124/-0)
txstatsd/stats/uniformsample.py (+45/-0)
txstatsd/tests/helper.py (+18/-0)
txstatsd/tests/metrics/__init__.py (+2/-0)
txstatsd/tests/metrics/test_derive.py (+77/-0)
txstatsd/tests/metrics/test_distinct.py (+94/-0)
txstatsd/tests/metrics/test_histogrammetric.py (+91/-0)
txstatsd/tests/metrics/test_timermetric.py (+139/-0)
txstatsd/tests/stats/test_ewma.py (+315/-0)
txstatsd/tests/stats/test_exponentiallydecayingsample.py (+84/-0)
txstatsd/tests/stats/test_uniformsample.py (+32/-0)
txstatsd/tests/test_client.py (+93/-0)
txstatsd/tests/test_configurableprocessor.py (+152/-0)
txstatsd/tests/test_httpinfo.py (+128/-0)
txstatsd/tests/test_loggingprocessor.py (+64/-0)
txstatsd/tests/test_metrics.py (+120/-0)
txstatsd/tests/test_process.py (+322/-0)
txstatsd/tests/test_processor.py (+406/-0)
txstatsd/tests/test_report.py (+84/-0)
txstatsd/tests/test_router.py (+218/-0)
txstatsd/tests/test_service.py (+286/-0)
txstatsd/version.py (+1/-0)
Conflict adding file .bzrignore. Moved existing file to .bzrignore.moved.
Conflict adding file LICENSE. Moved existing file to LICENSE.moved.
Conflict adding file MANIFEST.in. Moved existing file to MANIFEST.in.moved.
Conflict adding file README. Moved existing file to README.moved.
Conflict adding file example-stats-client.tac. Moved existing file to example-stats-client.tac.moved.
Conflict adding file requirements.txt. Moved existing file to requirements.txt.moved.
Conflict adding file setup.py. Moved existing file to setup.py.moved.
Conflict adding file statsd.tac. Moved existing file to statsd.tac.moved.
Conflict adding file twisted. Moved existing file to twisted.moved.
Conflict adding file txstatsd.conf-example. Moved existing file to txstatsd.conf-example.moved.
Conflict adding file txstatsd. Moved existing file to txstatsd.moved.
To merge this branch: | bzr merge lp:~mk-fraggod/txstatsd/txstatsd |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
txStatsD Developers | Pending | ||
Review via email: mp+105687@code.launchpad.net |
This proposal supersedes a proposal from 2012-04-19.
Commit message
Description of the change
Adds a --dump-mode option that passes the data on to graphite (as per a suggestion on IRC), and a derive_plugin for instant-rate meters; tests included.
I've squashed the commits per feature; the original history can be found in the github fork (github.
Lucio Torre (lucio.torre) wrote : Posted in a previous version of this proposal
Mike Kazantsev (mk-fraggod) wrote : Posted in a previous version of this proposal
Sure, I'll update it to use "dx"; I simply failed to notice the conflict.
About graphite-side derivative calculation: I assume you're questioning the need for a "rate" value when "count" is already present.
1. From a usability standpoint, I find it harder to have graphite calculate the derivative every time the value is needed than to simply pre-calculate it once. This is especially true for wildcard graphs, dashboards, and any custom data interface.
2. graphite's "derivative()" cannot handle overflow/restart correctly: it produces a huge spike whenever that happens, completely ruining any graph, while a "rate" sent from statsd does not suffer from that issue at all.
3. A "count" offset (say, from a point where the absolute level was a known baseline) can be calculated from the rate by summing all the rate values since then, and the damage from an overflow/restart with this approach is minimal compared to a reset to a totally arbitrary value.
4. The "count" value makes no sense in nearly all cases: it is a completely arbitrary offset from some point in time. All the real-world use cases I have for this metric type need "rate" only.
Initially I left "count" in just for consistency with the "meter" metric, which can be used when you need only "count", but at this point I think that was a mistake.
So what do you think about leaving only the "rate" value there?
It would definitely be better for my purposes: one less nesting level, and less load and disk usage.
If you believe mainline should have only ".count", the existing "meter" metric should probably be used instead, and the plugin isn't needed at all.
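A toy sketch (ours, not part of the proposal) of point 2 above: a graphite-style derivative() over a raw "count" series produces a large negative spike when the counter restarts, while per-interval "rate" values computed on the statsd side never see the reset.

```python
# Why a pre-computed "rate" survives counter resets while a
# graphite-side derivative() does not.

def derivative(samples):
    """Graphite-style derivative: difference between adjacent samples."""
    return [b - a for a, b in zip(samples, samples[1:])]

def rates(updates, interval):
    """statsd-side rate: per-interval update sum divided by interval length."""
    return [u / interval for u in updates]

# A counter whose process restarts between the 3rd and 4th sample:
counts = [100, 110, 120, 5, 15]
print(derivative(counts))            # [10, 10, -115, 10]  <- huge spike

# The same traffic expressed as per-interval updates never sees the reset:
updates = [10, 10, 10, 10]
print(rates(updates, interval=1.0))  # [10.0, 10.0, 10.0, 10.0]
```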
Preview Diff
1 | === added file '.bzrignore' |
2 | --- .bzrignore 1970-01-01 00:00:00 +0000 |
3 | +++ .bzrignore 2012-05-14 16:57:19 +0000 |
4 | @@ -0,0 +1,2 @@ |
5 | +_trial_temp* |
6 | +twisted/plugins/dropin.cache |
7 | |
8 | === renamed file '.bzrignore' => '.bzrignore.moved' |
9 | === added file 'LICENSE' |
10 | --- LICENSE 1970-01-01 00:00:00 +0000 |
11 | +++ LICENSE 2012-05-14 16:57:19 +0000 |
12 | @@ -0,0 +1,20 @@ |
13 | +Copyright (C) 2011 Canonical Services Ltd |
14 | + |
15 | +Permission is hereby granted, free of charge, to any person obtaining |
16 | +a copy of this software and associated documentation files (the |
17 | +"Software"), to deal in the Software without restriction, including |
18 | +without limitation the rights to use, copy, modify, merge, publish, |
19 | +distribute, sublicense, and/or sell copies of the Software, and to |
20 | +permit persons to whom the Software is furnished to do so, subject to |
21 | +the following conditions: |
22 | + |
23 | +The above copyright notice and this permission notice shall be |
24 | +included in all copies or substantial portions of the Software. |
25 | + |
26 | +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, |
27 | +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF |
28 | +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. |
29 | +IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY |
30 | +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, |
31 | +TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE |
32 | +SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. |
33 | |
34 | === renamed file 'LICENSE' => 'LICENSE.moved' |
35 | === added file 'MANIFEST.in' |
36 | --- MANIFEST.in 1970-01-01 00:00:00 +0000 |
37 | +++ MANIFEST.in 2012-05-14 16:57:19 +0000 |
38 | @@ -0,0 +1,5 @@ |
39 | +include README |
40 | +include LICENSE |
41 | +include setup.py |
42 | +recursive-include txstatsd *.py |
43 | +graft twisted |
44 | |
45 | === renamed file 'MANIFEST.in' => 'MANIFEST.in.moved' |
46 | === added file 'README' |
47 | --- README 1970-01-01 00:00:00 +0000 |
48 | +++ README 2012-05-14 16:57:19 +0000 |
49 | @@ -0,0 +1,21 @@ |
50 | +Dependencies |
51 | +------------ |
52 | + |
53 | +* Python |
54 | + |
55 | +* The twisted python package (python-twisted on Debian or similar systems) |
56 | + |
57 | +Things present here |
58 | +------------------- |
59 | + |
60 | +* The txstatsd python package, containing a server and a client implementation |
61 | + for the statsd protocol. |
62 | + |
63 | +* The metrics support borrows from Coda Hale's Metrics project |
64 | + https://github.com/codahale/metrics. |
65 | + |
66 | +License |
67 | +------- |
68 | + |
69 | +txStatsD is open source software, MIT License. See the LICENSE file for more |
70 | +details. |
71 | |
72 | === renamed file 'README' => 'README.moved' |
73 | === added file 'example-stats-client.tac' |
74 | --- example-stats-client.tac 1970-01-01 00:00:00 +0000 |
75 | +++ example-stats-client.tac 2012-05-14 16:57:19 +0000 |
76 | @@ -0,0 +1,52 @@ |
77 | +import socket |
78 | +import random |
79 | + |
80 | +from twisted.internet import reactor |
81 | +from twisted.internet import task |
82 | + |
83 | +from twisted.application.service import Application |
84 | + |
85 | +from txstatsd.client import ( |
86 | + TwistedStatsDClient, StatsDClientProtocol) |
87 | +from txstatsd.metrics.metrics import Metrics |
88 | +from txstatsd.process import PROCESS_STATS |
89 | +from txstatsd.report import ReportingService |
90 | + |
91 | + |
92 | +STATSD_HOST = "127.0.0.1" |
93 | +STATSD_PORT = 8125 |
94 | + |
95 | +application = Application("example-stats-client") |
96 | +statsd_client = TwistedStatsDClient(STATSD_HOST, STATSD_PORT) |
97 | +metrics = Metrics(connection=statsd_client, |
98 | + namespace=socket.gethostname() + ".example-client") |
99 | + |
100 | +reporting = ReportingService() |
101 | +reporting.setServiceParent(application) |
102 | + |
103 | +for report in PROCESS_STATS: |
104 | + reporting.schedule(report, 10, metrics.increment) |
105 | + |
106 | +def random_walker(name): |
107 | + """Meters a random walk.""" |
108 | + if random.random() > 0.5: |
109 | + metrics.increment(name) |
110 | + else: |
111 | + metrics.decrement(name) |
112 | + |
113 | +def random_normal(name): |
114 | + """Meters samples from a normal distribution.""" |
115 | + metrics.timing(name, random.normalvariate(10, 3)) |
116 | + |
117 | + |
118 | +for n in range(5): |
119 | + t = task.LoopingCall(random_walker, name="walker%i" % n) |
120 | + t.start(0.5, now=False) |
121 | + |
122 | +for n in range(5): |
123 | + t = task.LoopingCall(random_normal, name="normal%i" % n) |
124 | + t.start(0.5, now=False) |
125 | + |
126 | + |
127 | +protocol = StatsDClientProtocol(statsd_client) |
128 | +reactor.listenUDP(0, protocol) |
129 | |
130 | === renamed file 'example-stats-client.tac' => 'example-stats-client.tac.moved' |
131 | === added file 'requirements.txt' |
132 | --- requirements.txt 1970-01-01 00:00:00 +0000 |
133 | +++ requirements.txt 2012-05-14 16:57:19 +0000 |
134 | @@ -0,0 +1,7 @@ |
135 | +Twisted==11.0.0 |
136 | +mocker==1.1 |
137 | +numpy==1.6.1 |
138 | +psutil==0.4.0 |
139 | +scipy==0.9.0 |
140 | +wsgiref==0.1.2 |
141 | +zope.interface==3.8.0 |
142 | |
143 | === renamed file 'requirements.txt' => 'requirements.txt.moved' |
144 | === added file 'setup.py' |
145 | --- setup.py 1970-01-01 00:00:00 +0000 |
146 | +++ setup.py 2012-05-14 16:57:19 +0000 |
147 | @@ -0,0 +1,67 @@ |
148 | +from distutils.core import setup |
149 | +from distutils.command.install import install |
150 | +from glob import glob |
151 | +import os |
152 | + |
153 | +from twisted.plugin import IPlugin, getPlugins |
154 | + |
155 | +from txstatsd import version |
156 | + |
157 | +# If setuptools is present, use it to find_packages(), and also |
158 | +# declare our dependency on epsilon. |
159 | +extra_setup_args = {} |
160 | +try: |
161 | + import setuptools |
162 | + from setuptools import find_packages |
163 | +except ImportError: |
164 | + def find_packages(): |
165 | + """ |
166 | + Compatibility wrapper. |
167 | + |
168 | + Taken from storm setup.py. |
169 | + """ |
170 | + packages = [] |
171 | + for directory, subdirectories, files in os.walk("txstatsd"): |
172 | + if '__init__.py' in files: |
173 | + packages.append(directory.replace(os.sep, '.')) |
174 | + return packages |
175 | + |
176 | +long_description = """ |
177 | +Twisted-based implementation of a statsd-compatible server and client. |
178 | +""" |
179 | + |
180 | + |
181 | +class TxPluginInstaller(install): |
182 | + def run(self): |
183 | + install.run(self) |
184 | + # Make sure we refresh the plugin list when installing, so we know |
185 | + # we have enough write permissions. |
186 | + # see http://twistedmatrix.com/documents/current/core/howto/plugin.html |
187 | + # "when installing or removing software which provides Twisted plugins, |
188 | + # the site administrator should be sure the cache is regenerated" |
189 | + list(getPlugins(IPlugin)) |
190 | + |
191 | +setup( |
192 | + cmdclass = {'install': TxPluginInstaller}, |
193 | + name="txStatsD", |
194 | + version=version.txstatsd, |
195 | + description="A network daemon for aggregating statistics", |
196 | + author="txStatsD Developers", |
197 | + url="https://launchpad.net/txstatsd", |
198 | + license="MIT", |
199 | + packages=find_packages() + ["twisted.plugins"], |
200 | + scripts=glob("./bin/*"), |
201 | + long_description=long_description, |
202 | + classifiers=[ |
203 | + "Development Status :: 4 - Beta", |
204 | + "Intended Audience :: Developers", |
205 | + "Intended Audience :: System Administrators", |
206 | + "Intended Audience :: Information Technology", |
207 | + "Programming Language :: Python", |
208 | + "Topic :: Database", |
209 | + "Topic :: Internet :: WWW/HTTP", |
210 | + "License :: OSI Approved :: MIT License", |
211 | + ], |
212 | + **extra_setup_args |
213 | + ) |
214 | + |
215 | |
216 | === renamed file 'setup.py' => 'setup.py.moved' |
217 | === added file 'statsd.tac' |
218 | --- statsd.tac 1970-01-01 00:00:00 +0000 |
219 | +++ statsd.tac 2012-05-14 16:57:19 +0000 |
220 | @@ -0,0 +1,8 @@ |
221 | +from twisted.application.service import Application |
222 | + |
223 | +from txstatsd import service |
224 | + |
225 | +application = Application("statsd") |
226 | + |
227 | +statsd_service = service.createService(service.StatsDOptions()) |
228 | +statsd_service.setServiceParent(application) |
229 | |
230 | === renamed file 'statsd.tac' => 'statsd.tac.moved' |
231 | === added directory 'twisted' |
232 | === renamed directory 'twisted' => 'twisted.moved' |
233 | === added directory 'twisted/plugins' |
234 | === added file 'twisted/plugins/derive_plugin.py' |
235 | --- twisted/plugins/derive_plugin.py 1970-01-01 00:00:00 +0000 |
236 | +++ twisted/plugins/derive_plugin.py 2012-05-14 16:57:19 +0000 |
237 | @@ -0,0 +1,69 @@ |
238 | +from zope.interface import implements |
239 | + |
240 | +from twisted.plugin import IPlugin |
241 | +from txstatsd.itxstatsd import IMetric, IMetricFactory |
242 | +from txstatsd.metrics.metric import Metric |
243 | + |
244 | +import logging, time |
245 | + |
246 | + |
247 | +class DeriveMetricReporter(object): |
248 | + """ |
249 | + A simpler meter metric which measures instant throughput rate for each interval. |
250 | + """ |
251 | + implements(IMetric) |
252 | + |
253 | + def __init__(self, name, wall_time_func=time.time, prefix=""): |
254 | + """Construct a metric we expect to be periodically updated. |
255 | + |
256 | + @param name: Indicates what is being instrumented. |
257 | + @param wall_time_func: Function for obtaining wall time. |
258 | + """ |
259 | + self.name = name |
260 | + self.wall_time_func = wall_time_func |
261 | + if prefix: prefix += "." |
262 | + self.prefix = prefix |
263 | + self.update = self.count = 0 |
264 | + self.poll_time = self.wall_time_func() |
265 | + |
266 | + def process(self, fields): |
267 | + """ |
268 | + Process new data for this metric. |
269 | + |
270 | + @type fields: C{list} |
271 | + @param fields: The list of message parts. Usually in the form of |
272 | + (value, metric_type, [sample_rate]) |
273 | + """ |
274 | + val, k = fields |
275 | + self.update += float(val) |
276 | + |
277 | + def flush(self, interval, timestamp): |
278 | + """ |
279 | + Returns a string with new line separated list of metrics to report. |
280 | + |
281 | + @type interval: C{float} |
282 | + @param interval: The time since last flush. |
283 | + @type timestamp: C{float} |
284 | + @param timestamp: The timestamp for now. |
285 | + """ |
286 | + poll_prev, self.poll_time = self.poll_time, self.wall_time_func() |
287 | + if self.poll_time == poll_prev: return list() |
288 | + rate = float(self.update) / (self.poll_time - poll_prev) |
289 | + self.count, self.update = self.count + self.update, 0 |
290 | + return list( |
291 | + (self.prefix + self.name + "." + item, round(value, 6), timestamp) |
292 | + for item, value in [("count", self.count), ("rate", rate)] ) |
293 | + |
294 | + |
295 | +class DeriveMetricFactory(object): |
296 | + implements(IMetricFactory, IPlugin) |
297 | + |
298 | + name = "derive" |
299 | + metric_type = "dx" |
300 | + |
301 | + def build_metric(self, prefix, name, wall_time_func=None): |
302 | + return DeriveMetricReporter(name, prefix=prefix, wall_time_func=wall_time_func) |
303 | + |
304 | + def configure(self, options): pass |
305 | + |
306 | +derive_metric_factory = DeriveMetricFactory() |
307 | |
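To see what the plugin above reports, here is a minimal standalone replay of DeriveMetricReporter's flush() arithmetic using a fake clock so the elapsed time is deterministic (FakeClock and the variable names are ours, not part of the diff):

```python
# Replay of the derive plugin's rate calculation: accumulate updates,
# then on flush divide by the elapsed wall time and fold into the count.

class FakeClock:
    """A callable stand-in for time.time() that we advance by hand."""
    def __init__(self):
        self.now = 0.0
    def __call__(self):
        return self.now

clock = FakeClock()

# Mirror the reporter's state: pending update sum, running count, last poll.
update, count, poll_time = 0.0, 0.0, clock()

# Two "gorets:5|dx" messages arrive during the interval:
update += 5.0
update += 5.0

# Ten seconds later, flush: rate = update / elapsed, count += update.
clock.now = 10.0
poll_prev, poll_time = poll_time, clock()
rate = update / (poll_time - poll_prev)
count, update = count + update, 0.0

print(count, rate)   # 10.0 1.0
```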
308 | === added file 'twisted/plugins/distinct_plugin.py' |
309 | --- twisted/plugins/distinct_plugin.py 1970-01-01 00:00:00 +0000 |
310 | +++ twisted/plugins/distinct_plugin.py 2012-05-14 16:57:19 +0000 |
311 | @@ -0,0 +1,20 @@ |
312 | +from zope.interface import implements |
313 | + |
314 | +from twisted.plugin import IPlugin |
315 | +from txstatsd.itxstatsd import IMetricFactory |
316 | +from txstatsd.metrics.distinctmetric import DistinctMetricReporter |
317 | + |
318 | +class DistinctMetricFactory(object): |
319 | + implements(IMetricFactory, IPlugin) |
320 | + |
321 | + name = "pdistinct" |
322 | + metric_type = "pd" |
323 | + |
324 | + def build_metric(self, prefix, name, wall_time_func=None): |
325 | + return DistinctMetricReporter(name, prefix=prefix, |
326 | + wall_time_func=wall_time_func) |
327 | + |
328 | + def configure(self, options): |
329 | + pass |
330 | + |
331 | +distinct_metric_factory = DistinctMetricFactory() |
332 | |
333 | === added file 'twisted/plugins/txstatsd_plugin.py' |
334 | --- twisted/plugins/txstatsd_plugin.py 1970-01-01 00:00:00 +0000 |
335 | +++ twisted/plugins/txstatsd_plugin.py 2012-05-14 16:57:19 +0000 |
336 | @@ -0,0 +1,22 @@ |
337 | +from zope.interface import implements |
338 | + |
339 | +from twisted.plugin import IPlugin |
340 | +from twisted.application.service import IServiceMaker |
341 | + |
342 | +from txstatsd import service |
343 | + |
344 | + |
345 | +class StatsDServiceMaker(object): |
346 | + implements(IServiceMaker, IPlugin) |
347 | + tapname = "statsd" |
348 | + description = "Collect and aggregate stats for graphite." |
349 | + options = service.StatsDOptions |
350 | + |
351 | + def makeService(self, options): |
352 | + """ |
353 | + Construct a StatsD service. |
354 | + """ |
355 | + return service.createService(options) |
356 | + |
357 | +# Now construct an object which *provides* the relevant interfaces |
358 | +serviceMaker = StatsDServiceMaker() |
359 | |
360 | === added directory 'txstatsd' |
361 | === added file 'txstatsd.conf-example' |
362 | --- txstatsd.conf-example 1970-01-01 00:00:00 +0000 |
363 | +++ txstatsd.conf-example 2012-05-14 16:57:19 +0000 |
364 | @@ -0,0 +1,26 @@ |
365 | +[statsd] |
366 | +# The host where carbon cache is listening. |
367 | +carbon-cache-host: localhost |
368 | +# The port where carbon cache is listening. |
369 | +carbon-cache-port: 2003 |
370 | +# The UDP port where we will listen. |
371 | +listen-port: 8125 |
372 | + |
373 | +# The number of milliseconds between each flush. |
374 | +flush-interval: 60000 |
375 | + |
376 | +# Which additional stats to report {process|net|io|system}. |
377 | +report: |
378 | +# Prefix to use when reporting stats. |
379 | +prefix: |
380 | +# Produce StatsD-compliant messages. |
381 | +statsd-compliance: 1 |
382 | + |
383 | +# Support application monitoring. UDP echo is initially supported. |
384 | +# Should we receive the monitor-message, we respond with the |
385 | +# configured monitor-response. |
386 | +monitor-message: txstatsd ping |
387 | +monitor-response: txstatsd pong |
388 | + |
389 | +[plugin_sample] |
390 | +sample-key: sample-value |
391 | \ No newline at end of file |
392 | |
393 | === renamed file 'txstatsd.conf-example' => 'txstatsd.conf-example.moved' |
394 | === renamed directory 'txstatsd' => 'txstatsd.moved' |
395 | === added file 'txstatsd/__init__.py' |
396 | === added file 'txstatsd/client.py' |
397 | --- txstatsd/client.py 1970-01-01 00:00:00 +0000 |
398 | +++ txstatsd/client.py 2012-05-14 16:57:19 +0000 |
399 | @@ -0,0 +1,165 @@ |
400 | +import socket |
401 | + |
402 | +from twisted.internet.defer import inlineCallbacks, returnValue |
403 | +from twisted.internet.protocol import DatagramProtocol |
404 | +from twisted.python import log |
405 | + |
406 | + |
407 | +class StatsDClientProtocol(DatagramProtocol): |
408 | + """A Twisted-based implementation of the StatsD client protocol. |
409 | + |
410 | + Data is sent via UDP to a StatsD server for aggregation. |
411 | + """ |
412 | + |
413 | + def __init__(self, client): |
414 | + self.client = client |
415 | + |
416 | + def startProtocol(self): |
417 | + """Connect to destination host.""" |
418 | + self.client.connect(self.transport) |
419 | + |
420 | + def stopProtocol(self): |
421 | + """Connection was lost.""" |
422 | + self.client.disconnect() |
423 | + |
424 | + |
425 | +class TwistedStatsDClient(object): |
426 | + |
427 | + def __init__(self, host, port, |
428 | + connect_callback=None, |
429 | + disconnect_callback=None, |
430 | + resolver_errback=None): |
431 | + """ |
432 | + Build a connection that reports to the endpoint (on C{host} and |
433 | + C{port}) using UDP. |
434 | + |
435 | + @param host: The StatsD server host. |
436 | + @param port: The StatsD server port. |
437 | + @param resolver_errback: The errback to invoke should |
438 | + issues occur resolving the supplied C{host}. |
439 | + @param connect_callback: The callback to invoke on connection. |
440 | + @param disconnect_callback: The callback to invoke on disconnection. |
441 | + """ |
442 | + from twisted.internet import reactor |
443 | + |
444 | + self.reactor = reactor |
445 | + |
446 | + @inlineCallbacks |
447 | + def resolve(host): |
448 | + self.host = yield reactor.resolve(host) |
449 | + returnValue(self.host) |
450 | + |
451 | + self.host = None |
452 | + self.resolver = resolve(host) |
453 | + if resolver_errback is None: |
454 | + self.resolver.addErrback(log.err) |
455 | + else: |
456 | + self.resolver.addErrback(resolver_errback) |
457 | + |
458 | + self.port = port |
459 | + self.connect_callback = connect_callback |
460 | + self.disconnect_callback = disconnect_callback |
461 | + |
462 | + self.transport = None |
463 | + |
464 | + @inlineCallbacks |
465 | + def connect(self, transport=None): |
466 | + """Connect to the StatsD server.""" |
467 | + host = yield self.resolver |
468 | + if host is not None: |
469 | + self.transport = transport |
470 | + if self.transport is not None: |
471 | + if self.connect_callback is not None: |
472 | + self.connect_callback() |
473 | + |
474 | + def disconnect(self): |
475 | + """Disconnect from the StatsD server.""" |
476 | + if self.disconnect_callback is not None: |
477 | + self.disconnect_callback() |
478 | + self.transport = None |
479 | + |
480 | + def write(self, data, callback=None): |
481 | + """Send the metric to the StatsD server. |
482 | + |
483 | + @param data: The data to be sent. |
484 | + @param callback: The callback to which the result should be sent. |
485 | + B{Note}: The C{callback} will be called in the C{reactor} |
486 | + thread, and not in the thread of the original caller. |
487 | + """ |
488 | + self.reactor.callFromThread(self._write, data, callback) |
489 | + |
490 | + def _write(self, data, callback): |
491 | + """Send the metric to the StatsD server. |
492 | + |
493 | + @param data: The data to be sent. |
494 | + @param callback: The callback to which the result should be sent. |
495 | + @raise twisted.internet.error.MessageLengthError: If the size of data |
496 | + is too large. |
497 | + """ |
498 | + if self.host is not None and self.transport is not None: |
499 | + try: |
500 | + bytes_sent = self.transport.write(data, (self.host, self.port)) |
501 | + if callback is not None: |
502 | + callback(bytes_sent) |
503 | + except (OverflowError, TypeError, socket.error, socket.gaierror): |
504 | + if callback is not None: |
505 | + callback(None) |
506 | + |
507 | + |
508 | +class UdpStatsDClient(object): |
509 | + |
510 | + def __init__(self, host=None, port=None): |
511 | + """Build a connection that reports to C{host} and C{port}) |
512 | + using UDP. |
513 | + |
514 | + @param host: The StatsD host. |
515 | + @param port: The StatsD port. |
516 | + @raise ValueError: If the C{host} and C{port} cannot be |
517 | + resolved (for the case where they are not C{None}). |
518 | + """ |
519 | + self.host = host |
520 | + self.port = port |
521 | + |
522 | + if host is not None and port is not None: |
523 | + try: |
524 | + self.host, self.port = socket.getaddrinfo( |
525 | + host, port, socket.AF_INET, |
526 | + socket.SOCK_DGRAM, socket.SOL_UDP)[0][4] |
527 | + except (TypeError, IndexError, socket.error, socket.gaierror): |
528 | + raise ValueError("The address cannot be resolved.") |
529 | + |
530 | + self.socket = None |
531 | + |
532 | + def connect(self): |
533 | + """Connect to the StatsD server.""" |
534 | + self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) |
535 | + self.socket.setblocking(0) |
536 | + |
537 | + def disconnect(self): |
538 | + """Disconnect from the StatsD server.""" |
539 | + if self.socket is not None: |
540 | + self.socket.close() |
541 | + self.socket = None |
542 | + |
543 | + def write(self, data): |
544 | + """Send the metric to the StatsD server.""" |
545 | + if self.host is None or self.port is None or self.socket is None: |
546 | + return |
547 | + try: |
548 | + return self.socket.sendto(data, (self.host, self.port)) |
549 | + except (socket.error, socket.gaierror): |
550 | + return None |
551 | + |
552 | + |
553 | +class InternalClient(object): |
554 | + """A connection that can be used inside the C{StatsD} daemon itself.""" |
555 | + |
556 | + def __init__(self, processor): |
557 | + """ |
558 | + A connection that writes directly to the C{MessageProcessor}. |
559 | + """ |
560 | + self._processor = processor |
561 | + |
562 | + def write(self, data): |
563 | + """Write directly to the C{MessageProcessor}.""" |
564 | + self._processor.process(data) |
565 | |
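For context, UdpStatsDClient.write() above boils down to a non-blocking UDP sendto that swallows socket errors; a stdlib-only sketch of that behavior (send_counter and the host/port defaults are ours, not part of the diff):

```python
# Send a statsd counter increment, e.g. "gorets:1|c", over UDP,
# mirroring the client's non-blocking socket and error handling.
import socket

def send_counter(name, value, host="127.0.0.1", port=8125):
    """Fire one counter message; return bytes sent, or None on error."""
    message = "%s:%s|c" % (name, value)
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.setblocking(0)          # match UdpStatsDClient.connect()
    try:
        return sock.sendto(message.encode("ascii"), (host, port))
    except (socket.error, socket.gaierror):
        return None              # the client swallows send errors the same way
    finally:
        sock.close()

print(send_counter("gorets", 1))  # bytes sent (or None on error)
```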
566 | === added file 'txstatsd/itxstatsd.py' |
567 | --- txstatsd/itxstatsd.py 1970-01-01 00:00:00 +0000 |
568 | +++ txstatsd/itxstatsd.py 2012-05-14 16:57:19 +0000 |
569 | @@ -0,0 +1,54 @@ |
570 | +from zope.interface import Interface, Attribute |
571 | + |
572 | + |
573 | +class IMetricFactory(Interface): |
574 | + name = Attribute(""" |
575 | + @type name C{str} |
576 | + @ivar name: The name of this kind of metric |
577 | + """) |
578 | + |
579 | + metric_type = Attribute(""" |
580 | + @type metric_type: C{str} |
581 | + @ivar metric_type: The string that will be used by clients to send |
582 | + metrics of this kind to the server. |
583 | + """) |
584 | + |
585 | + def build_metric(prefix, name, wall_time_func=None): |
586 | + """ |
587 | + Returns an object that implements the C{IMetric} interface for name. |
588 | + |
589 | + @type prefix: C{str} |
590 | + @param prefix: The prefix used for reporting this metric. |
591 | + @type name: C{str} |
592 | + @param name: The name used for reporting this metric. |
593 | + """ |
594 | + |
595 | + def configure(options): |
596 | + """ |
597 | + Configures the factory. Will be called at startup by the service |
598 | + factory. |
599 | + |
600 | + @type options: C{twisted.python.usage.Options} |
601 | + @param options: The configuration options. |
602 | + """ |
603 | + |
604 | + |
605 | +class IMetric(Interface): |
606 | + def process(fields): |
607 | + """ |
608 | + Process new data for this metric. |
609 | + |
610 | + @type fields: C{list} |
611 | + @param fields: The list of message parts. Usually in the form of |
612 | + (value, metric_type, [sample_rate]) |
613 | + """ |
614 | + |
615 | + def flush(interval, timestamp): |
616 | + """ |
617 | + Returns a string with new line separated list of metrics to report. |
618 | + |
619 | + @type interval: C{float} |
620 | + @param interval: The time since last flush. |
621 | + @type timestamp: C{float} |
622 | + @param timestamp: The timestamp for now. |
623 | + """ |
624 | |
625 | === added directory 'txstatsd/metrics' |
626 | === added file 'txstatsd/metrics/__init__.py' |
627 | === added file 'txstatsd/metrics/countermetric.py' |
628 | --- txstatsd/metrics/countermetric.py 1970-01-01 00:00:00 +0000 |
629 | +++ txstatsd/metrics/countermetric.py 2012-05-14 16:57:19 +0000 |
630 | @@ -0,0 +1,67 @@ |
631 | +import math |
632 | + |
633 | +from txstatsd.metrics.metric import Metric |
634 | + |
635 | + |
636 | +class CounterMetric(Metric): |
637 | + """An incrementing and decrementing counter metric.""" |
638 | + |
639 | + def __init__(self, connection, name, sample_rate=1): |
640 | + """Construct a metric that reports samples to the supplied |
641 | + C{connection}. |
642 | + |
643 | + @param connection: The connection endpoint representing |
644 | + the StatsD server. |
645 | + @param name: Indicates what is being instrumented. |
646 | + @param sample_rate: Restrict the number of samples sent |
647 | + to the StatsD server based on the supplied C{sample_rate}. |
648 | + """ |
649 | + Metric.__init__(self, connection, name, sample_rate=sample_rate) |
650 | + |
651 | + self._count = 0 |
652 | + |
653 | + def increment(self, value): |
654 | + """Increment the counter by C{value}""" |
655 | + self._count += value |
656 | + self._update(self._count) |
657 | + |
658 | + def decrement(self, value): |
659 | + """Decrement the counter by C{value}""" |
660 | + self._count -= value |
661 | + self._update(self._count) |
662 | + |
663 | + def count(self): |
664 | + """Returns the counter's current value.""" |
665 | + return self._count |
666 | + |
667 | + def clear(self): |
668 | + """Resets the counter to 0.""" |
669 | + self._count = 0 |
670 | + self._update(self._count) |
671 | + |
672 | + def _update(self, value): |
673 | + """Report the counter.""" |
674 | + self.send("%s|c" % value) |
675 | + |
676 | + |
677 | +class CounterMetricReporter(object): |
678 | + """An incrementing and decrementing counter metric.""" |
679 | + |
680 | + def __init__(self, name, prefix=""): |
681 | + """Construct a metric we expect to be periodically updated. |
682 | + |
683 | + @param name: Indicates what is being instrumented. |
684 | + """ |
685 | + self.name = name |
686 | + |
687 | + if prefix: |
688 | + prefix += "." |
689 | + self.prefix = prefix |
690 | + self.count = 0 |
691 | + |
692 | + def mark(self, value): |
693 | + self.count = value |
694 | + |
695 | + def report(self, timestamp): |
696 | + return [(self.prefix + self.name + ".count", |
697 | + math.trunc(self.count), timestamp)] |
698 | |
699 | === added file 'txstatsd/metrics/distinctmetric.py' |
700 | --- txstatsd/metrics/distinctmetric.py 1970-01-01 00:00:00 +0000 |
701 | +++ txstatsd/metrics/distinctmetric.py 2012-05-14 16:57:19 +0000 |
702 | @@ -0,0 +1,150 @@ |
703 | +# Copyright (C) 2011 Canonical |
704 | +# All Rights Reserved |
705 | +""" |
706 | +Implements a probabilistic distinct counter with sliding windows. |
707 | + |
708 | +Based on: |
709 | +http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.12.7100 |
710 | + |
711 | +And extended for sliding windows. |
712 | +""" |
713 | +import random |
714 | +import time |
715 | + |
716 | +from zope.interface import implements |
717 | + |
718 | +from txstatsd.metrics.metric import Metric |
719 | +from txstatsd.itxstatsd import IMetric |
720 | + |
721 | + |
722 | +class SBoxHash(object): |
723 | + """A very fast hash. |
724 | + |
725 | + This class creates a random hash function that is very fast, |
726 | + based on SBoxes. It is not cryptographically strong. |
727 | + |
728 | + Two instances of this class will hash differently. |
729 | + """ |
730 | + |
731 | + def __init__(self): |
732 | + self.table = [random.randint(0, 0xFFFFFFFF - 1) for i in range(256)] |
733 | + |
734 | + def hash(self, data): |
735 | + value = 0 |
736 | + for c in data: |
737 | + value = value ^ self.table[ord(c)] |
738 | + value = value * 3 |
739 | + value = value & 0xFFFFFFFF |
740 | + return value |
741 | + |
742 | + |
743 | +def hash(data): |
744 | + """Hash data using a random hasher.""" |
745 | + p = SBoxHash() |
746 | + return p.hash(data) |
747 | + |
748 | + |
749 | +def zeros(n): |
750 | + """Count the zeros to the right of the binary representation of n.""" |
751 | + count = 0 |
752 | + i = 0 |
753 | + while True: |
754 | + v = (n >> i) |
755 | + if v <= 0: |
756 | + return count |
757 | + if v & 1: |
758 | + return count |
759 | + count += 1 |
760 | + i += 1 |
761 | + return count |
762 | + |
763 | + |
764 | +class SlidingDistinctCounter(object): |
765 | + """A probabilistic distinct counter with sliding windows.""" |
766 | + |
767 | + def __init__(self, n_hashes, n_buckets): |
768 | + self.n_hashes = n_hashes |
769 | + self.n_buckets = n_buckets |
770 | + |
771 | + self.hashes = [SBoxHash() for i in range(n_hashes)] |
772 | + self.buckets = [[0] * n_buckets for i in range(n_hashes)] |
773 | + |
774 | + def add(self, when, item): |
775 | + hashes = (h.hash(item) for h in self.hashes) |
776 | + for i, value in enumerate(hashes): |
777 | + self.buckets[i][min(self.n_buckets - 1, zeros(value))] = when |
778 | + |
779 | + def distinct(self, since=0): |
780 | + total = 0.0 |
781 | + for i in range(self.n_hashes): |
782 | + least0 = 0 |
783 | + for b in range(self.n_buckets): |
784 | + if self.buckets[i][b] <= since: |
785 | + break |
786 | + least0 += 1 |
787 | + total += least0 |
788 | + v = total / self.n_hashes |
789 | + return int((2 ** v) / 0.77351) |
790 | + |
791 | + |
792 | +class DistinctMetric(Metric): |
793 | + """ |
794 | + Keeps an estimate of the distinct numbers of items seen on various |
795 | + sliding windows of time. |
796 | + """ |
797 | + |
798 | + def mark(self, item): |
799 | + """Report this item was seen.""" |
800 | + self.send("%s|d" % item) |
801 | + |
802 | + |
803 | +class DistinctMetricReporter(object): |
804 | + """ |
805 | + Keeps an estimate of the distinct numbers of items seen on various |
806 | + sliding windows of time. |
807 | + """ |
808 | + implements(IMetric) |
809 | + |
810 | + def __init__(self, name, wall_time_func=time.time, prefix=""): |
811 | + """Construct a metric we expect to be periodically updated. |
812 | + |
813 | + @param name: Indicates what is being instrumented. |
814 | + @param wall_time_func: Function for obtaining wall time. |
815 | + @param prefix: If present, a string to prepend to the message |
816 | + composed when C{report} is called. |
817 | + """ |
818 | + self.name = name |
819 | + self.wall_time_func = wall_time_func |
820 | + self.counter = SlidingDistinctCounter(32, 32) |
821 | + if prefix: |
822 | + prefix += "." |
823 | + self.prefix = prefix |
824 | + |
825 | + def count(self): |
826 | + return self.counter.distinct() |
827 | + |
828 | + def count_1min(self, now): |
829 | + return self.counter.distinct(now - 60) |
830 | + |
831 | + def count_1hour(self, now): |
832 | + return self.counter.distinct(now - 60 * 60) |
833 | + |
834 | + def count_1day(self, now): |
835 | + return self.counter.distinct(now - 60 * 60 * 24) |
836 | + |
837 | + def process(self, fields): |
838 | + self.update(fields[0]) |
839 | + |
840 | + def update(self, item): |
841 | + self.counter.add(self.wall_time_func(), item) |
842 | + |
843 | + def flush(self, interval, timestamp): |
844 | + now = self.wall_time_func() |
845 | + metrics = [] |
846 | + items = {".count": self.count(), |
847 | + ".count_1min": self.count_1min(now), |
848 | + ".count_1hour": self.count_1hour(now), |
849 | + ".count_1day": self.count_1day(now)} |
850 | + for item, value in items.iteritems(): |
851 | + metrics.append((self.prefix + self.name + item, value, timestamp)) |
852 | + return metrics |
853 | |
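The estimator above is the Flajolet–Martin trailing-zeros trick: each hash of an item lands in bucket `zeros(hash)`, and the run of consecutively filled low buckets R gives distinct ≈ 2^R / 0.77351. A Python 3 sketch of the same idea without the sliding-window timestamps (function names and constants here are illustrative, not from the branch):

```python
import random

def zeros(n):
    # Trailing zero bits of n (0 when n is 0 or odd), as in the diff above.
    count = 0
    while n > 0 and not (n & 1):
        count += 1
        n >>= 1
    return count

def estimate_distinct(items, n_hashes=32):
    # One SBox-style table per hash function, seeded for reproducibility.
    rng = random.Random(42)
    tables = [[rng.randint(0, 0xFFFFFFFE) for _ in range(256)]
              for _ in range(n_hashes)]

    def sbox_hash(table, data):
        value = 0
        for c in data:
            value = ((value ^ table[ord(c)]) * 3) & 0xFFFFFFFF
        return value

    total = 0
    for table in tables:
        hit = set(zeros(sbox_hash(table, item)) for item in items)
        least0 = 0                 # index of the first bucket never hit
        while least0 in hit:
            least0 += 1
        total += least0
    return int(2 ** (total / float(n_hashes)) / 0.77351)
```

Averaging the bucket depth over many independent hashes is what keeps the variance of the estimate manageable; the sliding-window version in the diff simply stores a timestamp per bucket instead of a flag.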
854 | === added file 'txstatsd/metrics/extendedmetrics.py' |
855 | --- txstatsd/metrics/extendedmetrics.py 1970-01-01 00:00:00 +0000 |
856 | +++ txstatsd/metrics/extendedmetrics.py 2012-05-14 16:57:19 +0000 |
857 | @@ -0,0 +1,53 @@ |
858 | + |
859 | +from txstatsd.metrics.countermetric import CounterMetric |
860 | +from txstatsd.metrics.timermetric import TimerMetric |
861 | +from txstatsd.metrics.metrics import Metrics |
862 | + |
863 | + |
864 | +class ExtendedMetrics(Metrics): |
865 | + |
866 | + def __init__(self, connection=None, namespace=""): |
867 | + """A convenience class for reporting metric samples |
868 | + to a C{txstatsd} server configured with the |
869 | + L{ConfigurableProcessor<txstatsd.server.configurableprocessor>} |
870 | + processor. |
871 | + |
872 | + @param connection: The connection endpoint representing |
873 | + the C{txstatsd} server. |
874 | + @param namespace: The top-level namespace identifying the |
875 | + origin of the samples. |
876 | + """ |
877 | + |
878 | + super(ExtendedMetrics, self).__init__(connection, namespace) |
879 | + |
880 | + def increment(self, name, value=1, sample_rate=1): |
881 | + """Report an increase in C{name} by C{value}.""" |
882 | + name = self.fully_qualify_name(name) |
883 | + if not name in self._metrics: |
884 | + metric = CounterMetric(self.connection, |
885 | + name, |
886 | + sample_rate) |
887 | + self._metrics[name] = metric |
888 | + self._metrics[name].increment(value) |
889 | + |
890 | + def decrement(self, name, value=1, sample_rate=1): |
891 | + """Report a decrease in C{name} by C{value}.""" |
892 | + name = self.fully_qualify_name(name) |
893 | + if not name in self._metrics: |
894 | + metric = CounterMetric(self.connection, |
895 | + name, |
896 | + sample_rate) |
897 | + self._metrics[name] = metric |
898 | + self._metrics[name].decrement(value) |
899 | + |
900 | + def timing(self, name, duration=None, sample_rate=1): |
902 | + """Report that this sample took C{duration} seconds.""" |
902 | + if duration is None: |
903 | + duration = self.calculate_duration() |
904 | + name = self.fully_qualify_name(name) |
905 | + if not name in self._metrics: |
906 | + metric = TimerMetric(self.connection, |
907 | + name, |
908 | + sample_rate) |
909 | + self._metrics[name] = metric |
910 | + self._metrics[name].mark(duration) |
911 | |
912 | === added file 'txstatsd/metrics/gaugemetric.py' |
913 | --- txstatsd/metrics/gaugemetric.py 1970-01-01 00:00:00 +0000 |
914 | +++ txstatsd/metrics/gaugemetric.py 2012-05-14 16:57:19 +0000 |
915 | @@ -0,0 +1,43 @@ |
916 | +from txstatsd.metrics.metric import Metric |
917 | + |
918 | + |
919 | +class GaugeMetric(Metric): |
920 | + """A gauge metric is an instantaneous reading of a particular value.""" |
921 | + |
922 | + def __init__(self, connection, name, sample_rate=1): |
923 | + """Construct a metric that reports samples to the supplied |
924 | + C{connection}. |
925 | + |
926 | + @param connection: The connection endpoint representing |
927 | + the StatsD server. |
928 | + @param name: Indicates what is being instrumented. |
929 | + @param sample_rate: Restrict the number of samples sent |
930 | + to the StatsD server based on the supplied C{sample_rate}. |
931 | + """ |
932 | + Metric.__init__(self, connection, name, sample_rate=sample_rate) |
933 | + |
934 | + def mark(self, value): |
935 | + """Report the C{value} for this gauge.""" |
936 | + self.send("%s|g" % value) |
937 | + |
938 | + |
939 | +class GaugeMetricReporter(object): |
940 | + """A gauge metric is an instantaneous reading of a particular value.""" |
941 | + |
942 | + def __init__(self, name, prefix=""): |
943 | + """Construct a metric we expect to be periodically updated. |
944 | + |
945 | + @param name: Indicates what is being instrumented. |
946 | + """ |
947 | + self.name = name |
948 | + |
949 | + if prefix: |
950 | + prefix += "." |
951 | + self.prefix = prefix |
952 | + self.value = 0 |
953 | + |
954 | + def mark(self, value): |
955 | + self.value = value |
956 | + |
957 | + def report(self, timestamp): |
958 | + return [(self.prefix + self.name + ".value", self.value, timestamp)] |
959 | |
960 | === added file 'txstatsd/metrics/histogrammetric.py' |
961 | --- txstatsd/metrics/histogrammetric.py 1970-01-01 00:00:00 +0000 |
962 | +++ txstatsd/metrics/histogrammetric.py 2012-05-14 16:57:19 +0000 |
963 | @@ -0,0 +1,209 @@ |
964 | +# -*- coding: utf-8 *-* |
965 | +import math |
966 | + |
967 | +from txstatsd.stats.exponentiallydecayingsample \ |
968 | + import ExponentiallyDecayingSample |
969 | +from txstatsd.stats.uniformsample import UniformSample |
970 | + |
971 | + |
972 | +class HistogramMetricReporter(object): |
973 | + """ |
974 | + A metric which calculates the distribution of a value. |
975 | + |
976 | + See: |
977 | + - U{Accurately computing running variance |
978 | + <http://www.johndcook.com/standard_deviation.html>} |
979 | + """ |
980 | + |
981 | + @classmethod |
982 | + def using_uniform_sample(cls, prefix=""): |
983 | + """ |
984 | + Uses a uniform sample of 1028 elements, which offers a 99.9% |
985 | + confidence level with a 5% margin of error assuming a normal |
986 | + distribution. |
987 | + """ |
988 | + sample = UniformSample(1028) |
989 | + return HistogramMetricReporter(sample, prefix=prefix) |
990 | + |
991 | + @classmethod |
992 | + def using_exponentially_decaying_sample(cls, prefix=""): |
993 | + """ |
994 | + Uses an exponentially decaying sample of 1028 elements, which offers |
995 | + a 99.9% confidence level with a 5% margin of error assuming a normal |
996 | + distribution, and an alpha factor of 0.015, which heavily biases |
997 | + the sample to the past 5 minutes of measurements. |
998 | + """ |
999 | + sample = ExponentiallyDecayingSample(1028, 0.015) |
1000 | + return HistogramMetricReporter(sample, prefix=prefix) |
1001 | + |
1002 | + def __init__(self, sample, prefix=""): |
1003 | + """Creates a new HistogramMetric with the given sample. |
1004 | + |
1005 | + @param sample: The sample to create a histogram from. |
1006 | + """ |
1007 | + self.sample = sample |
1008 | + |
1009 | + if prefix: |
1010 | + prefix += "." |
1011 | + self.prefix = prefix |
1012 | + |
1013 | + self._min = 0 |
1014 | + self._max = 0 |
1015 | + self._sum = 0 |
1016 | + |
1017 | + # These are for the Welford algorithm for calculating running |
1018 | + # variance without floating-point doom. |
1019 | + self.variance = [-1.0, 0.0] # M, S |
1020 | + self.count = 0 |
1021 | + |
1022 | + self.clear() |
1023 | + |
1024 | + def clear(self): |
1025 | + """Clears all recorded values.""" |
1026 | + self.sample.clear() |
1027 | + self.count = 0 |
1028 | + self._max = None |
1029 | + self._min = None |
1030 | + self._sum = 0 |
1031 | + self.variance = [-1.0, 0.0] |
1032 | + |
1033 | + def update(self, value, name=""): |
1034 | + """Adds a recorded value. |
1035 | + |
1036 | + @param value: The value to record. |
1037 | + """ |
1038 | + self.count += 1 |
1039 | + self.sample.update(value) |
1040 | + self.set_max(value) |
1041 | + self.set_min(value) |
1042 | + self._sum += value |
1043 | + self.update_variance(value) |
1044 | + |
1045 | + def report(self, timestamp): |
1046 | + # median, 75, 95, 98, 99, 99.9 percentile |
1047 | + percentiles = self.percentiles(0.5, 0.75, 0.95, 0.98, 0.99, 0.999) |
1048 | + metrics = [] |
1049 | + items = { |
1050 | + ".min": self.min(), |
1051 | + ".max": self.max(), |
1052 | + ".mean": self.mean(), |
1053 | + ".stddev": self.std_dev(), |
1054 | + ".median": percentiles[0], |
1055 | + ".75percentile": percentiles[1], |
1056 | + ".95percentile": percentiles[2], |
1057 | + ".98percentile": percentiles[3], |
1058 | + ".99percentile": percentiles[4], |
1059 | + ".999percentile": percentiles[5]} |
1060 | + |
1061 | + for item, value in items.iteritems(): |
1062 | + metrics.append((self.prefix + self.name + item, value, timestamp)) |
1063 | + return metrics |
1064 | + |
1065 | + def min(self): |
1066 | + """Returns the smallest recorded value.""" |
1067 | + return self._min if self.count > 0 else 0.0 |
1068 | + |
1069 | + def max(self): |
1070 | + """Returns the largest recorded value.""" |
1071 | + return self._max if self.count > 0 else 0.0 |
1072 | + |
1073 | + def mean(self): |
1074 | + """Returns the arithmetic mean of all recorded values.""" |
1075 | + return float(self._sum) / self.count if self.count > 0 else 0.0 |
1076 | + |
1077 | + def std_dev(self): |
1078 | + """Returns the standard deviation of all recorded values.""" |
1079 | + return math.sqrt(self.get_variance()) if self.count > 0 else 0.0 |
1080 | + |
1081 | + def percentiles(self, *percentiles): |
1082 | + """Returns a list of values at the given percentiles. |
1083 | + |
1084 | + @param percentiles: One or more percentiles. |
1085 | + """ |
1086 | + |
1087 | + scores = [0.0] * len(percentiles) |
1088 | + if self.count > 0: |
1089 | + values = self.sample.get_values() |
1090 | + values.sort() |
1091 | + |
1092 | + for i in range(len(percentiles)): |
1093 | + p = percentiles[i] |
1094 | + pos = p * (len(values) + 1) |
1095 | + if pos < 1: |
1096 | + scores[i] = values[0] |
1097 | + elif pos >= len(values): |
1098 | + scores[i] = values[-1] |
1099 | + else: |
1100 | + lower = values[int(pos) - 1] |
1101 | + upper = values[int(pos)] |
1102 | + scores[i] = lower + (pos - math.floor(pos)) * ( |
1103 | + upper - lower) |
1104 | + |
1105 | + return scores |
1106 | + |
1107 | + def histogram(self): |
1108 | + """Returns a histogram of the sample. |
1109 | + """ |
1110 | + |
1111 | + # If we don't have data, build an empty histogram. |
1112 | + if not self.count > 0: |
1113 | + return [0.0] * 10 |
1114 | + |
1115 | + # Sturges Rule for selecting the number of bins |
1116 | + # Sturges, H. A. (1926) The choice of a class interval. |
1117 | + # Journal of the American Statistical Association 21, 65–66. |
1118 | + n_bins = int(math.ceil(1 + math.log(self.count, 2))) |
1119 | + |
1120 | + scores = [0.0] * n_bins |
1121 | + |
1122 | + values = self.sample.get_values() |
1123 | + max_value = float(max(values)) |
1124 | + min_value = float(min(values)) |
1125 | + value_range = max_value - min_value |
1126 | + |
1127 | + for value in values: |
1128 | + pos = (int(((value - min_value) / value_range) * n_bins) if value_range else 0) |
1129 | + if pos == n_bins: |
1130 | + pos -= 1 |
1131 | + |
1132 | + scores[pos] += 1 |
1133 | + return scores |
1134 | + |
1135 | + def get_values(self): |
1136 | + """Returns a list of all values in the histogram's sample.""" |
1137 | + return self.sample.get_values() |
1138 | + |
1139 | + def get_variance(self): |
1140 | + if self.count <= 1: |
1141 | + return 0.0 |
1142 | + return self.variance[1] / (self.count - 1) |
1143 | + |
1144 | + def set_max(self, potential_max): |
1145 | + if self._max is None: |
1146 | + self._max = potential_max |
1147 | + else: |
1148 | + self._max = max(self.max(), potential_max) |
1149 | + |
1150 | + def set_min(self, potential_min): |
1151 | + if self._min is None: |
1152 | + self._min = potential_min |
1153 | + else: |
1154 | + self._min = min(self.min(), potential_min) |
1155 | + |
1156 | + def update_variance(self, value): |
1157 | + old_values = self.variance |
1158 | + new_values = [0.0, 0.0] |
1159 | + if old_values[0] == -1: |
1160 | + new_values[0] = value |
1161 | + new_values[1] = 0.0 |
1162 | + else: |
1163 | + old_m = old_values[0] |
1164 | + old_s = old_values[1] |
1165 | + |
1166 | + new_m = old_m + (float(value - old_m) / self.count) |
1167 | + new_s = old_s + (float(value - old_m) * (value - new_m)) |
1168 | + |
1169 | + new_values[0] = new_m |
1170 | + new_values[1] = new_s |
1171 | + |
1172 | + self.variance = new_values |
1173 | |
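`update_variance()` above is Welford's online algorithm: M tracks the running mean, S the running sum of squared deviations, and `get_variance()` divides S by n - 1. A standalone sketch of the same recurrence, which can be checked against the two-pass formula:

```python
import math

def running_stats(values):
    # Welford's online mean/variance, matching update_variance() and
    # get_variance() above: m is the running mean, s the running sum of
    # squared deviations from it.
    m, s, count = 0.0, 0.0, 0
    for value in values:
        count += 1
        if count == 1:
            m, s = float(value), 0.0
        else:
            new_m = m + (value - m) / count
            s += (value - m) * (value - new_m)
            m = new_m
    variance = s / (count - 1) if count > 1 else 0.0
    return m, math.sqrt(variance)

mean, std = running_stats([2, 4, 4, 4, 5, 5, 7, 9])
```

For [2, 4, 4, 4, 5, 5, 7, 9] this yields a mean of 5.0 and a sample standard deviation of sqrt(32/7), without ever holding all the values or risking catastrophic cancellation the naive sum-of-squares formula suffers from.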
1174 | === added file 'txstatsd/metrics/imetrics.py' |
1175 | --- txstatsd/metrics/imetrics.py 1970-01-01 00:00:00 +0000 |
1176 | +++ txstatsd/metrics/imetrics.py 2012-05-14 16:57:19 +0000 |
1177 | @@ -0,0 +1,18 @@ |
1178 | +from zope.interface import Interface |
1179 | + |
1180 | +class IMetrics(Interface): |
1181 | + """ |
1182 | + Provides a global utility for sending out metric information. |
1183 | + """ |
1184 | + |
1185 | + def gauge(name, value, sample_rate=1): |
1186 | + """Record an absolute reading for C{name} with C{value}.""" |
1187 | + |
1188 | + def increment(name, value=1, sample_rate=1): |
1189 | + """Increment counter C{name} by C{value}.""" |
1190 | + |
1191 | + def decrement(name, value=1, sample_rate=1): |
1192 | + """Decrement counter C{name} by C{value}.""" |
1193 | + |
1194 | + def timing(name, duration=None, sample_rate=1): |
1195 | + """Report that C{name} took C{duration} seconds.""" |
1196 | |
1197 | === added file 'txstatsd/metrics/metermetric.py' |
1198 | --- txstatsd/metrics/metermetric.py 1970-01-01 00:00:00 +0000 |
1199 | +++ txstatsd/metrics/metermetric.py 2012-05-14 16:57:19 +0000 |
1200 | @@ -0,0 +1,103 @@ |
1201 | +import time |
1202 | + |
1203 | +from txstatsd.metrics.metric import Metric |
1204 | +from txstatsd.stats.ewma import Ewma |
1205 | + |
1206 | + |
1207 | +class MeterMetric(Metric): |
1208 | + """ |
1209 | + A meter metric which measures mean throughput and one-, five-, and |
1210 | + fifteen-minute exponentially-weighted moving average throughputs. |
1211 | + |
1212 | + See: |
1213 | + - U{EMA |
1214 | + <http://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average>} |
1215 | + """ |
1216 | + |
1217 | + def __init__(self, connection, name, sample_rate=1): |
1218 | + """Construct a metric that reports samples to the supplied |
1219 | + C{connection}. |
1220 | + |
1221 | + @param connection: The connection endpoint representing |
1222 | + the StatsD server. |
1223 | + @param name: Indicates what is being instrumented. |
1224 | + @param sample_rate: Restrict the number of samples sent |
1225 | + to the StatsD server based on the supplied C{sample_rate}. |
1226 | + """ |
1227 | + Metric.__init__(self, connection, name, sample_rate=sample_rate) |
1228 | + |
1229 | + def mark(self, value=1): |
1230 | + """Mark the occurrence of a given number (C{value}) of events.""" |
1231 | + self.send("%s|m" % value) |
1232 | + |
1233 | + |
1234 | +class MeterMetricReporter(object): |
1235 | + """ |
1236 | + A meter metric which measures mean throughput and one-, five-, and |
1237 | + fifteen-minute exponentially-weighted moving average throughputs. |
1238 | + |
1239 | + See: |
1240 | + - U{EMA |
1241 | + <http://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average>} |
1242 | + """ |
1243 | + |
1244 | + def __init__(self, name, wall_time_func=time.time, prefix=""): |
1245 | + """Construct a metric we expect to be periodically updated. |
1246 | + |
1247 | + @param name: Indicates what is being instrumented. |
1248 | + @param wall_time_func: Function for obtaining wall time. |
1249 | + """ |
1250 | + self.name = name |
1251 | + self.wall_time_func = wall_time_func |
1252 | + |
1253 | + if prefix: |
1254 | + prefix += "." |
1255 | + self.prefix = prefix |
1256 | + |
1257 | + self.m1_rate = Ewma.one_minute_ewma() |
1258 | + self.m5_rate = Ewma.five_minute_ewma() |
1259 | + self.m15_rate = Ewma.fifteen_minute_ewma() |
1260 | + self.count = 0 |
1261 | + self.start_time = self.wall_time_func() |
1262 | + |
1263 | + def mark(self, value=1): |
1264 | + """Mark the occurrence of a given number of events.""" |
1265 | + self.count += value |
1266 | + self.m1_rate.update(value) |
1267 | + self.m5_rate.update(value) |
1268 | + self.m15_rate.update(value) |
1269 | + |
1270 | + def tick(self): |
1271 | + """Updates the moving averages.""" |
1272 | + self.m1_rate.tick() |
1273 | + self.m5_rate.tick() |
1274 | + self.m15_rate.tick() |
1275 | + |
1276 | + def report(self, timestamp): |
1277 | + metrics = [] |
1278 | + items = {".count": self.count, |
1279 | + ".mean_rate": self.mean_rate(), |
1280 | + ".1min_rate": self.one_minute_rate(), |
1281 | + ".5min_rate": self.five_minute_rate(), |
1282 | + ".15min_rate": self.fifteen_minute_rate()} |
1283 | + |
1284 | + for item, value in items.iteritems(): |
1285 | + metrics.append((self.prefix + self.name + item, |
1286 | + round(value, 6), timestamp)) |
1287 | + return metrics |
1288 | + |
1289 | + def fifteen_minute_rate(self): |
1290 | + return self.m15_rate.rate |
1291 | + |
1292 | + def five_minute_rate(self): |
1293 | + return self.m5_rate.rate |
1294 | + |
1295 | + def one_minute_rate(self): |
1296 | + return self.m1_rate.rate |
1297 | + |
1298 | + def mean_rate(self): |
1299 | + if self.count == 0: |
1300 | + return 0.0 |
1301 | + else: |
1302 | + elapsed = self.wall_time_func() - self.start_time |
1303 | + return float(self.count) / elapsed |
1304 | |
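`MeterMetricReporter` delegates the rate math to the `Ewma` class (`txstatsd/stats/ewma.py`, elsewhere in this diff). The standard formulation such meters follow — accumulate events, then on each fixed-interval tick fold the instantaneous rate into the average with alpha = 1 - exp(-interval / (60 * minutes)) — can be sketched as follows; the class name and interval constant here are illustrative:

```python
import math

class SimpleEwma:
    # Illustrative one-minute EWMA of event rate, ticked every 5 seconds;
    # the real implementation lives in txstatsd/stats/ewma.py.
    INTERVAL = 5.0  # seconds between tick() calls

    def __init__(self, minutes=1.0):
        self.alpha = 1 - math.exp(-self.INTERVAL / (60.0 * minutes))
        self.uncounted = 0
        self.rate = None  # events per second

    def update(self, n=1):
        self.uncounted += n

    def tick(self):
        instant_rate = self.uncounted / self.INTERVAL
        self.uncounted = 0
        if self.rate is None:
            self.rate = instant_rate  # seed from the first interval
        else:
            self.rate += self.alpha * (instant_rate - self.rate)
```

A steady 10 events per 5-second tick converges on 2 events/second; a larger `minutes` value makes the average decay more slowly, which is all that distinguishes the 1-, 5-, and 15-minute rates reported above.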
1305 | === added file 'txstatsd/metrics/metric.py' |
1306 | --- txstatsd/metrics/metric.py 1970-01-01 00:00:00 +0000 |
1307 | +++ txstatsd/metrics/metric.py 2012-05-14 16:57:19 +0000 |
1308 | @@ -0,0 +1,47 @@ |
1309 | + |
1310 | +import random |
1311 | + |
1312 | + |
1313 | +class Metric(object): |
1314 | + """ |
1315 | + The foundation metric from which the specialized |
1316 | + metrics are derived. |
1317 | + """ |
1318 | + |
1319 | + def __init__(self, connection, name, sample_rate=1): |
1320 | + """Construct a metric that reports samples to the supplied |
1321 | + C{connection}. |
1322 | + |
1323 | + @param connection: The connection endpoint representing |
1324 | + the StatsD server. |
1325 | + @param name: Specific description for this metric. |
1326 | + @param sample_rate: Restrict the number of samples sent |
1327 | + to the StatsD server based on the supplied C{sample_rate}. |
1328 | + """ |
1329 | + self.connection = connection |
1330 | + self.name = name |
1331 | + self.sample_rate = sample_rate |
1332 | + |
1333 | + def clear(self): |
1334 | + """Responsibility of the specialized metrics.""" |
1335 | + pass |
1336 | + |
1337 | + def send(self, data): |
1338 | + """ |
1339 | + Send the C{data} to the C{StatsD} server according to the |
1340 | + C{sample_rate}. |
1341 | + """ |
1342 | + |
1343 | + if self.sample_rate < 1: |
1344 | + if random.random() > self.sample_rate: |
1345 | + return |
1346 | + data += "|@%s" % (self.sample_rate,) |
1347 | + |
1348 | + data = self.name + ":" + data |
1349 | + |
1350 | + self.write(data) |
1351 | + |
1352 | + def write(self, data): |
1353 | + """Send the C{data} to the C{StatsD} server.""" |
1354 | + if self.connection is not None: |
1355 | + self.connection.write(data) |
1356 | |
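`Metric.send()` drops samples probabilistically and tags survivors with `|@rate` so the server can scale the counts back up. A quick simulation of that behavior; `FakeConnection` is a hypothetical stand-in for the UDP client, not part of the branch:

```python
import random

class FakeConnection:
    # Hypothetical test double: collects datagrams instead of sending UDP.
    def __init__(self):
        self.messages = []

    def write(self, data):
        self.messages.append(data)

def send(connection, name, data, sample_rate, rng):
    # Mirrors Metric.send(): drop with probability 1 - sample_rate and
    # tag anything that survives with the rate that was used.
    if sample_rate < 1:
        if rng.random() > sample_rate:
            return
        data += "|@%s" % sample_rate
    connection.write(name + ":" + data)

conn = FakeConnection()
rng = random.Random(0)
for _ in range(10000):
    send(conn, "hits", "1|c", 0.25, rng)
```

Roughly a quarter of the 10000 sends reach the connection, each formatted as `hits:1|c|@0.25`.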
1357 | === added file 'txstatsd/metrics/metrics.py' |
1358 | --- txstatsd/metrics/metrics.py 1970-01-01 00:00:00 +0000 |
1359 | +++ txstatsd/metrics/metrics.py 2012-05-14 16:57:19 +0000 |
1360 | @@ -0,0 +1,140 @@ |
1361 | + |
1362 | +import time |
1363 | +from txstatsd.metrics.gaugemetric import GaugeMetric |
1364 | +from txstatsd.metrics.metermetric import MeterMetric |
1365 | +from txstatsd.metrics.distinctmetric import DistinctMetric |
1366 | +from txstatsd.metrics.metric import Metric |
1367 | + |
1368 | + |
1369 | +class GenericMetric(Metric): |
1370 | + def __init__(self, connection, key, name, sample_rate=1): |
1371 | + super(GenericMetric, self).__init__(connection, name, |
1372 | + sample_rate=sample_rate) |
1373 | + self.key = key |
1374 | + |
1375 | + def mark(self, value): |
1376 | + self.send("%s|%s" % (value, self.key)) |
1377 | + |
1378 | + |
1379 | +class Metrics(object): |
1380 | + def __init__(self, connection=None, namespace=""): |
1381 | + """A convenience class for reporting metric samples |
1382 | + to a StatsD server (C{connection}). |
1383 | + |
1384 | + @param connection: The connection endpoint representing |
1385 | + the StatsD server. |
1386 | + @param namespace: The top-level namespace identifying the |
1387 | + origin of the samples. |
1388 | + """ |
1389 | + |
1390 | + self.connection = connection |
1391 | + self.namespace = namespace |
1392 | + self._metrics = {} |
1393 | + self.last_time = 0 |
1394 | + |
1395 | + def report(self, name, value, metric_type, sample_rate=1): |
1396 | + """Report a generic metric. |
1397 | + |
1398 | + Used for server-side plugins without client support. |
1399 | + """ |
1400 | + name = self.fully_qualify_name(name) |
1401 | + if not name in self._metrics: |
1402 | + metric = GenericMetric(self.connection, |
1403 | + metric_type, |
1404 | + name, |
1405 | + sample_rate) |
1406 | + self._metrics[name] = metric |
1407 | + self._metrics[name].mark(value) |
1408 | + |
1409 | + def gauge(self, name, value, sample_rate=1): |
1410 | + """Report an instantaneous reading of a particular value.""" |
1411 | + name = self.fully_qualify_name(name) |
1412 | + if not name in self._metrics: |
1413 | + gauge_metric = GaugeMetric(self.connection, |
1414 | + name, |
1415 | + sample_rate) |
1416 | + self._metrics[name] = gauge_metric |
1417 | + self._metrics[name].mark(value) |
1418 | + |
1419 | + def meter(self, name, value=1, sample_rate=1): |
1420 | + """Mark the occurrence of a given number of events.""" |
1421 | + name = self.fully_qualify_name(name) |
1422 | + if not name in self._metrics: |
1423 | + meter_metric = MeterMetric(self.connection, |
1424 | + name, |
1425 | + sample_rate) |
1426 | + self._metrics[name] = meter_metric |
1427 | + self._metrics[name].mark(value) |
1428 | + |
1429 | + def increment(self, name, value=1, sample_rate=1): |
1430 | + """Report an increase in C{name} by C{value}.""" |
1431 | + name = self.fully_qualify_name(name) |
1432 | + if not name in self._metrics: |
1433 | + metric = Metric(self.connection, |
1434 | + name, |
1435 | + sample_rate) |
1436 | + self._metrics[name] = metric |
1437 | + self._metrics[name].send("%s|c" % value) |
1438 | + |
1439 | + def decrement(self, name, value=1, sample_rate=1): |
1440 | + """Report a decrease in C{name} by C{value}.""" |
1441 | + name = self.fully_qualify_name(name) |
1442 | + if not name in self._metrics: |
1443 | + metric = Metric(self.connection, |
1444 | + name, |
1445 | + sample_rate) |
1446 | + self._metrics[name] = metric |
1447 | + self._metrics[name].send("%s|c" % -value) |
1448 | + |
1449 | + def reset_timing(self): |
1450 | + """Resets the duration timer for the next call to timing()""" |
1451 | + self.last_time = time.time() |
1452 | + |
1453 | + def calculate_duration(self): |
1454 | + """Resets the duration timer and returns the elapsed duration""" |
1455 | + current_time = time.time() |
1456 | + duration = current_time - self.last_time |
1457 | + self.last_time = current_time |
1458 | + return duration |
1459 | + |
1460 | + def timing(self, name, duration=None, sample_rate=1): |
1461 | + """Report that this sample took C{duration} seconds. |
1462 | + The default duration is the elapsed time since |
1463 | + the last call to this method or to reset_timing().""" |
1464 | + if duration is None: |
1465 | + duration = self.calculate_duration() |
1466 | + name = self.fully_qualify_name(name) |
1467 | + if not name in self._metrics: |
1468 | + metric = Metric(self.connection, |
1469 | + name, |
1470 | + sample_rate) |
1471 | + self._metrics[name] = metric |
1472 | + self._metrics[name].send("%s|ms" % (duration * 1000)) |
1473 | + |
1474 | + def distinct(self, name, item): |
1475 | + name = self.fully_qualify_name(name) |
1476 | + if not name in self._metrics: |
1477 | + metric = DistinctMetric(self.connection, name) |
1478 | + self._metrics[name] = metric |
1479 | + self._metrics[name].mark(item) |
1480 | + |
1481 | + def clear(self, name): |
1482 | + """Allow the metric to re-initialize its internal state.""" |
1483 | + name = self.fully_qualify_name(name) |
1484 | + if name in self._metrics: |
1485 | + metric = self._metrics[name] |
1486 | + if getattr(metric, 'clear', None) is not None: |
1487 | + metric.clear() |
1488 | + |
1489 | + def fully_qualify_name(self, name): |
1490 | + """Compose the fully-qualified name: namespace and name.""" |
1491 | + fully_qualified_name = "" |
1492 | + if self.namespace is not None: |
1493 | + fully_qualified_name = self.namespace |
1494 | + if name is not None: |
1495 | + # prepend the separator should we have a namespace |
1496 | + if self.namespace is not None and len(self.namespace) > 0: |
1497 | + fully_qualified_name += "." + name |
1498 | + else: |
1499 | + fully_qualified_name = name |
1500 | + return fully_qualified_name |
1501 | |
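`timing()` measures in seconds but statsd timers travel in milliseconds, hence the `duration * 1000` above. A one-line sketch of the resulting datagram:

```python
def timing_message(name, duration_seconds):
    # Compose the timer datagram Metrics.timing() sends: seconds in,
    # milliseconds on the wire.
    return "%s:%s|ms" % (name, duration_seconds * 1000)

print(timing_message("db.query", 0.25))  # db.query:250.0|ms
```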
1502 | === added file 'txstatsd/metrics/timermetric.py' |
1503 | --- txstatsd/metrics/timermetric.py 1970-01-01 00:00:00 +0000 |
1504 | +++ txstatsd/metrics/timermetric.py 2012-05-14 16:57:19 +0000 |
1505 | @@ -0,0 +1,161 @@ |
1506 | +import json |
1507 | +import time |
1508 | + |
1509 | +from twisted.web import resource |
1510 | + |
1511 | +from txstatsd.metrics.histogrammetric import HistogramMetricReporter |
1512 | +from txstatsd.metrics.metermetric import MeterMetricReporter |
1513 | +from txstatsd.metrics.metric import Metric |
1514 | +from txstatsd.stats.exponentiallydecayingsample \ |
1515 | + import ExponentiallyDecayingSample |
1516 | + |
1517 | + |
1518 | +class TimerMetric(Metric): |
1519 | + """ |
1520 | + A timer metric which aggregates timing durations and provides duration |
1521 | + statistics, plus throughput statistics via L{MeterMetric}. |
1522 | + """ |
1523 | + |
1524 | + def __init__(self, connection, name, sample_rate=1): |
1525 | + """Construct a metric that reports samples to the supplied |
1526 | + C{connection}. |
1527 | + |
1528 | + @param connection: The connection endpoint representing |
1529 | + the StatsD server. |
1530 | + @param name: Indicates what is being instrumented. |
1531 | + @param sample_rate: Restrict the number of samples sent |
1532 | + to the StatsD server based on the supplied C{sample_rate}. |
1533 | + """ |
1534 | + Metric.__init__(self, connection, name, sample_rate=sample_rate) |
1535 | + |
1536 | + def mark(self, duration): |
1537 | + """Report that this sample took C{duration} (measured in seconds).""" |
1538 | + self.send("%s|ms" % (duration * 1000)) |
1539 | + |
1540 | + |
1541 | +class TimerResource(resource.Resource): |
1542 | + isLeaf = True |
1543 | + |
1544 | + def __init__(self, reporter): |
1545 | + resource.Resource.__init__(self) |
1546 | + self.reporter = reporter |
1547 | + |
1548 | + def render_GET(self, request): |
1549 | + result = dict( |
1550 | + histogram=self.reporter.histogram.histogram(), |
1551 | + max_value=self.reporter.max(), |
1552 | + min_value=self.reporter.min()) |
1553 | + return json.dumps(result) |
1554 | + |
1555 | + |
1556 | +class TimerMetricReporter(object): |
1557 | + """ |
1558 | + A timer metric which aggregates timing durations and provides duration |
1559 | + statistics, plus throughput statistics via L{MeterMetricReporter}. |
1560 | + """ |
1561 | + |
1562 | + def __init__(self, name, wall_time_func=time.time, prefix=""): |
1563 | + """Construct a metric we expect to be periodically updated. |
1564 | + |
1565 | + @param name: Indicates what is being instrumented. |
1566 | + @param wall_time_func: Function for obtaining wall time. |
1567 | + @param prefix: If present, a string to prepend to the message |
1568 | + composed when C{report} is called. |
1569 | + """ |
1570 | + self.name = name |
1571 | + self.wall_time_func = wall_time_func |
1572 | + |
1573 | + if prefix: |
1574 | + prefix += "." |
1575 | + self.prefix = prefix |
1576 | + |
1577 | + sample = ExponentiallyDecayingSample(1028, 0.015) |
1578 | + self.histogram = HistogramMetricReporter(sample) |
1579 | + self.meter = MeterMetricReporter( |
1580 | + "calls", wall_time_func=self.wall_time_func) |
1581 | + self.clear() |
1582 | + |
1583 | + def clear(self): |
1584 | + """Clears all recorded durations.""" |
1585 | + self.histogram.clear() |
1586 | + |
1587 | + def count(self): |
1588 | + return self.histogram.count |
1589 | + |
1590 | + def fifteen_minute_rate(self): |
1591 | + return self.meter.fifteen_minute_rate() |
1592 | + |
1593 | + def five_minute_rate(self): |
1594 | + return self.meter.five_minute_rate() |
1595 | + |
1596 | + def mean_rate(self): |
1597 | + return self.meter.mean_rate() |
1598 | + |
1599 | + def one_minute_rate(self): |
1600 | + return self.meter.one_minute_rate() |
1601 | + |
1602 | + def max(self): |
1603 | + """Returns the longest recorded duration.""" |
1604 | + return self.histogram.max() |
1605 | + |
1606 | + def min(self): |
1607 | + """Returns the shortest recorded duration.""" |
1608 | + return self.histogram.min() |
1609 | + |
1610 | + def mean(self): |
1611 | + """Returns the arithmetic mean of all recorded durations.""" |
1612 | + return self.histogram.mean() |
1613 | + |
1614 | + def std_dev(self): |
1615 | + """Returns the standard deviation of all recorded durations.""" |
1616 | + return self.histogram.std_dev() |
1617 | + |
1618 | + def getResource(self): |
1619 | + """Return an http resource to represent this.""" |
1620 | + return TimerResource(self) |
1621 | + |
1622 | + def percentiles(self, *percentiles): |
1623 | + """ |
1624 | + Returns an array of durations at the given percentiles. |
1625 | + |
1626 | + @param percentiles: One or more percentiles. |
1627 | + """ |
1628 | + return [percentile for percentile in |
1629 | + self.histogram.percentiles(*percentiles)] |
1630 | + |
1631 | + def get_values(self): |
1632 | + """Returns a list of all recorded durations in the timer's sample.""" |
1633 | + return [value for value in self.histogram.get_values()] |
1634 | + |
1635 | + def update(self, duration): |
1636 | + """Adds a recorded duration. |
1637 | + |
1638 | + @param duration: The length of the duration in seconds. |
1639 | + """ |
1640 | + if duration >= 0: |
1641 | + self.histogram.update(duration) |
1642 | + self.meter.mark() |
1643 | + |
1644 | + def tick(self): |
1645 | + """Updates the moving averages.""" |
1646 | + self.meter.tick() |
1647 | + |
1648 | + def report(self, timestamp): |
1649 | + # median, 75, 95, 98, 99, 99.9 percentile |
1650 | + percentiles = self.percentiles(0.5, 0.75, 0.95, 0.98, 0.99, 0.999) |
1651 | + metrics = [] |
1652 | + items = {".min": self.min(), |
1653 | + ".max": self.max(), |
1654 | + ".mean": self.mean(), |
1655 | + ".stddev": self.std_dev(), |
1656 | + ".99percentile": percentiles[4], |
1657 | + ".999percentile": percentiles[5], |
1658 | + ".count": self.meter.count, |
1659 | + ".1min_rate": self.meter.one_minute_rate(), |
1660 | + ".5min_rate": self.meter.five_minute_rate(), |
1661 | + ".15min_rate": self.meter.fifteen_minute_rate()} |
1662 | + |
1663 | + for item, value in items.iteritems(): |
1664 | + metrics.append((self.prefix + self.name + item, |
1665 | + round(value, 6), timestamp)) |
1666 | + return metrics |
1667 | |
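For reference, the `Timer.mark` method above converts a duration measured in seconds into the StatsD wire format, which carries timings in milliseconds. A minimal sketch of the full message composition (the `encode_timing` helper and the metric names are illustrative, not part of this branch):

```python
def encode_timing(name, duration_seconds):
    """Compose a StatsD timing message: "<name>:<ms>|ms"."""
    return "%s:%s|ms" % (name, duration_seconds * 1000)

# A 25 ms request, measured in seconds:
message = encode_timing("web.request", 0.025)
```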
1668 | === added file 'txstatsd/process.py' |
1669 | --- txstatsd/process.py 1970-01-01 00:00:00 +0000 |
1670 | +++ txstatsd/process.py 2012-05-14 16:57:19 +0000 |
1671 | @@ -0,0 +1,217 @@ |
1672 | +import os |
1673 | +import socket |
1674 | +import psutil |
1675 | + |
1676 | +from functools import update_wrapper |
1677 | + |
1678 | + |
1679 | +MEMINFO_KEYS = ("MemTotal:", "MemFree:", "Buffers:", |
1680 | + "Cached:", "SwapCached:", "SwapTotal:", |
1681 | + "SwapFree:") |
1682 | + |
1683 | +MULTIPLIERS = {"kB": 1024, "mB": 1024 * 1024} |
1684 | + |
1685 | + |
1686 | +def load_file(filename): |
1687 | + """Load a file into memory.""" |
1688 | + with open(filename, "r") as f: |
1689 | + return f.read() |
1690 | + |
1691 | + |
1692 | +def parse_meminfo(data, prefix="sys.mem"): |
1693 | + """Parse data from /proc/meminfo.""" |
1694 | + result = {} |
1695 | + |
1696 | + for line in data.split("\n"): |
1697 | + if not line: |
1698 | + continue |
1699 | + parts = [x for x in line.split(" ") if x] |
1700 | + if parts[0] not in MEMINFO_KEYS: |
1701 | + continue |
1702 | + |
1703 | + multiple = 1 |
1704 | + |
1705 | + if len(parts) == 3: |
1706 | + multiple = MULTIPLIERS[parts[2]] |
1707 | + |
1708 | + # remove ':' |
1709 | + label = parts[0][:-1] |
1710 | + amount = int(parts[1]) * multiple |
1711 | + result[prefix + "." + label] = amount |
1712 | + |
1713 | + return result |
1714 | + |
1715 | + |
1716 | +def parse_loadavg(data, prefix="sys.loadavg"): |
1717 | + """Parse data from /proc/loadavg.""" |
1718 | + return dict(zip( |
1719 | + (prefix + ".oneminute", |
1720 | + prefix + ".fiveminutes", |
1721 | + prefix + ".fifteenminutes"), |
1722 | + [float(x) for x in data.split()[:3]])) |
1723 | + |
1724 | + |
1725 | +def parse_netdev(data, prefix="sys.net"): |
1726 | + """Parse data from /proc/net/dev.""" |
1727 | + lines = data.splitlines() |
1728 | + # Parse out the column headers as keys. |
1729 | + _, receive_columns, transmit_columns = lines[1].split("|") |
1730 | + columns = ["recv_%s" % column for column in receive_columns.split()] |
1731 | + columns.extend(["send_%s" % column for column in transmit_columns.split()]) |
1732 | + |
1733 | + # Parse out the network devices. |
1734 | + result = {} |
1735 | + for line in lines[2:]: |
1736 | + if ":" not in line: |
1737 | + continue |
1738 | + device, data = line.split(":") |
1739 | + device = device.strip() |
1740 | + data = dict(zip(columns, map(int, data.split()))) |
1741 | + result.update({ |
1742 | + "%s.%s.bytes.received" % (prefix, device): data["recv_bytes"], |
1743 | + "%s.%s.bytes.sent" % (prefix, device): data["send_bytes"], |
1744 | + "%s.%s.packets.received" % (prefix, device): data["recv_packets"], |
1745 | + "%s.%s.packets.sent" % (prefix, device): data["send_packets"]}) |
1746 | + return result |
1747 | + |
1748 | + |
1749 | +class ProcessReport(object): |
1750 | + |
1751 | + def __init__(self, process=None): |
1752 | + self._process = process |
1753 | + |
1754 | + @property |
1755 | + def process(self): |
1756 | + """Lazily obtain the current process on first access.""" |
1757 | + if self._process is None: |
1758 | + self._process = psutil.Process(os.getpid()) |
1759 | + return self._process |
1760 | + |
1761 | + def get_memory_and_cpu(self, prefix="proc"): |
1762 | + """Report memory and CPU stats for C{process}.""" |
1763 | + vsize, rss = self.process.get_memory_info() |
1764 | + result = {prefix + ".cpu.percent": self.process.get_cpu_percent(), |
1765 | + prefix + ".memory.percent": |
1766 | + self.process.get_memory_percent(), |
1767 | + prefix + ".memory.vsize": vsize, |
1768 | + prefix + ".memory.rss": rss} |
1769 | + if getattr(self.process, "get_num_threads", None) is not None: |
1770 | + result[prefix + ".threads"] = self.process.get_num_threads() |
1771 | + return result |
1772 | + |
1773 | + def get_cpu_counters(self, prefix="proc"): |
1774 | + """Report CPU time counters for C{process}.""" |
1775 | + utime, stime = self.process.get_cpu_times() |
1776 | + result = {prefix + ".cpu.user": utime, |
1777 | + prefix + ".cpu.system": stime} |
1778 | + return result |
1779 | + |
1780 | + def get_io_counters(self, prefix="proc.io"): |
1781 | + """Report IO statistics for C{process}.""" |
1782 | + result = {} |
1783 | + if getattr(self.process, "get_io_counters", None) is not None: |
1784 | + (read_count, write_count, |
1785 | + read_bytes, write_bytes) = self.process.get_io_counters() |
1786 | + result.update({ |
1787 | + prefix + ".read.count": read_count, |
1788 | + prefix + ".write.count": write_count, |
1789 | + prefix + ".read.bytes": read_bytes, |
1790 | + prefix + ".write.bytes": write_bytes}) |
1791 | + return result |
1792 | + |
1793 | + def get_net_stats(self, prefix="proc.net"): |
1794 | + """Report active connection statistics for C{process}.""" |
1795 | + result = {} |
1796 | + if getattr(self.process, "get_connections", None) is not None: |
1797 | + for connection in self.process.get_connections(): |
1798 | + fd, family, _type, laddr, raddr, status = connection |
1799 | + if _type == socket.SOCK_STREAM: |
1800 | + key = prefix + ".status.%s" % status.lower() |
1801 | + if key not in result: |
1802 | + result[key] = 1 |
1803 | + else: |
1804 | + result[key] += 1 |
1805 | + return result |
1806 | + |
1807 | + |
1808 | +def report_counters(report_function): |
1809 | + """ |
1810 | + Report difference between last value and current value for wrapped |
1811 | + function. |
1812 | + """ |
1813 | + def generate(): |
1814 | + last_value = None |
1815 | + while True: |
1816 | + result = {} |
1817 | + new_value = report_function() |
1818 | + if last_value is None: |
1819 | + last_value = new_value |
1820 | + else: |
1821 | + for key, value in new_value.iteritems(): |
1822 | + result[key] = value - last_value[key] |
1823 | + last_value = new_value |
1824 | + yield result |
1825 | + generator = generate() |
1826 | + def report(): |
1827 | + return generator.next() |
1828 | + update_wrapper(report, report_function) |
1829 | + return report |
1830 | + |
1831 | + |
1832 | +process_report = ProcessReport() |
1833 | +report_process_memory_and_cpu = process_report.get_memory_and_cpu |
1834 | +report_process_cpu_counters = report_counters(process_report.get_cpu_counters) |
1835 | +report_process_io_counters = report_counters(process_report.get_io_counters) |
1836 | +report_process_net_stats = process_report.get_net_stats |
1837 | + |
1838 | + |
1839 | +def report_system_stats(prefix="sys"): |
1840 | + cpu_times = psutil.cpu_times()._asdict() |
1841 | + system_stats = {} |
1842 | + for mode, time in cpu_times.iteritems(): |
1843 | + system_stats[prefix + ".cpu." + mode] = time |
1844 | + return system_stats |
1845 | + |
1846 | + |
1847 | +def report_threadpool_stats(threadpool, prefix="threadpool"): |
1848 | + """Report stats about a given threadpool.""" |
1849 | + def report(): |
1850 | + return {prefix + ".working": len(threadpool.working), |
1851 | + prefix + ".queue": threadpool.q.qsize(), |
1852 | + prefix + ".waiters": len(threadpool.waiters), |
1853 | + prefix + ".threads": len(threadpool.threads)} |
1854 | + update_wrapper(report, report_threadpool_stats) |
1855 | + return report |
1856 | + |
1857 | + |
1858 | +def report_reactor_stats(reactor, prefix="reactor"): |
1859 | + """Report statistics about a twisted reactor.""" |
1860 | + def report(): |
1861 | + return {prefix + ".readers": len(reactor.getReaders()), |
1862 | + prefix + ".writers": len(reactor.getWriters())} |
1863 | + |
1864 | + update_wrapper(report, report_reactor_stats) |
1865 | + return report |
1866 | + |
1867 | + |
1868 | +def report_file_stats(filename, parser): |
1869 | + """Read statistics from a file and report them.""" |
1870 | + def report(): |
1871 | + return parser(load_file(filename)) |
1872 | + update_wrapper(report, report_file_stats) |
1873 | + return report |
1874 | + |
1875 | + |
1876 | +PROCESS_STATS = (report_process_memory_and_cpu,) |
1877 | + |
1878 | +COUNTER_STATS = (report_process_cpu_counters,) |
1879 | + |
1880 | +IO_STATS = (report_process_io_counters,) |
1881 | + |
1882 | +NET_STATS = (report_process_net_stats,) |
1883 | + |
1884 | +SYSTEM_STATS = (report_file_stats("/proc/meminfo", parse_meminfo), |
1885 | + report_file_stats("/proc/loadavg", parse_loadavg), |
1886 | + report_counters(report_file_stats("/proc/net/dev", parse_netdev)), |
1887 | + report_counters(report_system_stats), |
1888 | + ) |
1889 | |
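The `report_counters` decorator above turns a function that returns absolute counters into one that reports per-interval deltas, by closing over a generator that remembers the previous reading. A self-contained Python 3 sketch of the same idea:

```python
from functools import update_wrapper

def report_counters(report_function):
    """Wrap a function returning absolute counters so that each call
    reports the delta since the previous call (as in process.py above)."""
    def generate():
        last_value = None
        while True:
            result = {}
            new_value = report_function()
            if last_value is not None:
                for key, value in new_value.items():
                    result[key] = value - last_value[key]
            last_value = new_value
            yield result
    generator = generate()
    def report():
        return next(generator)
    update_wrapper(report, report_function)
    return report
```

The first call primes the baseline and reports an empty dict; each subsequent call reports the difference from the previous reading.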
1890 | === added file 'txstatsd/report.py' |
1891 | --- txstatsd/report.py 1970-01-01 00:00:00 +0000 |
1892 | +++ txstatsd/report.py 2012-05-14 16:57:19 +0000 |
1893 | @@ -0,0 +1,64 @@ |
1894 | +from twisted.internet.defer import maybeDeferred |
1895 | +from twisted.internet.task import LoopingCall |
1896 | +from twisted.python import log |
1897 | + |
1898 | +from functools import wraps |
1899 | + |
1900 | +from twisted.application.service import Service |
1901 | +from twisted.internet import threads |
1902 | + |
1903 | + |
1904 | +class ReportingService(Service): |
1905 | + |
1906 | + def __init__(self, instance_name="", clock=None): |
1907 | + self.tasks = [] |
1908 | + self.clock = clock |
1909 | + self.instance_name = instance_name |
1910 | + |
1911 | + def schedule(self, function, interval, report_function): |
1912 | + """ |
1913 | + Schedule C{function} to be called every C{interval} seconds and then |
1914 | + report gathered metrics to C{Graphite} using C{report_function}. |
1915 | + |
1916 | + If C{report_function} is C{None}, it just calls the function without |
1917 | + reporting the metrics. |
1918 | + """ |
1919 | + if report_function is not None: |
1920 | + call = self.wrapped(function, report_function) |
1921 | + else: |
1922 | + call = function |
1923 | + task = LoopingCall(call) |
1924 | + if self.clock is not None: |
1925 | + task.clock = self.clock |
1926 | + self.tasks.append((task, interval)) |
1927 | + if self.running: |
1928 | + task.start(interval, now=True) |
1929 | + |
1930 | + def wrapped(self, function, report_function): |
1931 | + def report_metrics(metrics): |
1932 | + """For each metric returned, call C{report_function} with it.""" |
1933 | + for name, value in metrics.items(): |
1934 | + if self.instance_name: |
1935 | + name = self.instance_name + "." + name |
1936 | + report_function(name, value) |
1937 | + return metrics |
1938 | + |
1939 | + @wraps(function) |
1940 | + def wrapper(): |
1941 | + """Wrap C{function} to report metrics or log a failure.""" |
1942 | + deferred = maybeDeferred(function) |
1943 | + deferred.addCallback(report_metrics) |
1944 | + deferred.addErrback(lambda failure: log.err( |
1945 | + failure, "Error while processing %s" % function.func_name)) |
1946 | + return deferred |
1947 | + return wrapper |
1948 | + |
1949 | + def startService(self): |
1950 | + Service.startService(self) |
1951 | + for task, interval in self.tasks: |
1952 | + task.start(interval, now=False) |
1953 | + |
1954 | + def stopService(self): |
1955 | + for task, interval in self.tasks: |
1956 | + task.stop() |
1957 | + Service.stopService(self) |
1958 | |
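The `wrapped` helper above prefixes every gathered metric name with the service's instance name before handing it to `report_function`. The prefixing logic in isolation (the standalone function and names here are illustrative):

```python
def report_metrics(metrics, report_function, instance_name=""):
    """Report each gathered metric, applying the instance-name prefix."""
    for name, value in metrics.items():
        if instance_name:
            name = instance_name + "." + name
        report_function(name, value)
    return metrics

# Capture reported (name, value) pairs in a list for inspection:
sent = []
report_metrics({"cpu": 0.5, "mem": 100},
               lambda n, v: sent.append((n, v)), "worker1")
```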
1959 | === added directory 'txstatsd/server' |
1960 | === added file 'txstatsd/server/__init__.py' |
1961 | === added file 'txstatsd/server/configurableprocessor.py' |
1962 | --- txstatsd/server/configurableprocessor.py 1970-01-01 00:00:00 +0000 |
1963 | +++ txstatsd/server/configurableprocessor.py 2012-05-14 16:57:19 +0000 |
1964 | @@ -0,0 +1,108 @@ |
1965 | + |
1966 | +import time |
1967 | + |
1968 | +from txstatsd.metrics.countermetric import CounterMetricReporter |
1969 | +from txstatsd.metrics.gaugemetric import GaugeMetricReporter |
1970 | +from txstatsd.metrics.metermetric import MeterMetricReporter |
1971 | +from txstatsd.metrics.timermetric import TimerMetricReporter |
1972 | +from txstatsd.server.processor import MessageProcessor |
1973 | + |
1974 | + |
1975 | +class ConfigurableMessageProcessor(MessageProcessor): |
1976 | + """ |
1977 | + This specialised C{MessageProcessor} supports behaviour |
1978 | + that is not StatsD-compliant. |
1979 | + |
1980 | + Currently, this extends to: |
1981 | + - Allow a prefix to be added to the composed messages sent |
1982 | + to the Graphite server. |
1983 | + - Report an instantaneous reading of a particular value. |
1984 | + - Report an incrementing and decrementing counter metric. |
1985 | + - Report a timer metric which aggregates timing durations and provides |
1986 | + duration statistics, plus throughput statistics. |
1987 | + """ |
1988 | + |
1989 | + def __init__(self, time_function=time.time, message_prefix="", |
1990 | + internal_metrics_prefix="", plugins=None): |
1991 | + super(ConfigurableMessageProcessor, self).__init__( |
1992 | + time_function=time_function, plugins=plugins) |
1993 | + |
1994 | + if not internal_metrics_prefix and not message_prefix: |
1995 | + internal_metrics_prefix = "statsd." |
1996 | + elif message_prefix and not internal_metrics_prefix: |
1997 | + internal_metrics_prefix = message_prefix + "." + "statsd." |
1998 | + self.internal_metrics_prefix = internal_metrics_prefix |
1999 | + self.message_prefix = message_prefix |
2000 | + self.gauge_metrics = {} |
2001 | + |
2002 | + def get_message_prefix(self, kind): |
2003 | + return self.message_prefix |
2004 | + |
2005 | + def compose_timer_metric(self, key, duration): |
2006 | + if key not in self.timer_metrics: |
2007 | + metric = TimerMetricReporter(key, prefix=self.message_prefix) |
2008 | + self.timer_metrics[key] = metric |
2009 | + self.timer_metrics[key].update(duration) |
2010 | + |
2011 | + def process_counter_metric(self, key, composite, message): |
2012 | + try: |
2013 | + value = float(composite[0]) |
2014 | + except (TypeError, ValueError): |
2015 | + return self.fail(message) |
2016 | + |
2017 | + self.compose_counter_metric(key, value) |
2018 | + |
2019 | + def compose_counter_metric(self, key, value): |
2020 | + if key not in self.counter_metrics: |
2021 | + metric = CounterMetricReporter(key, prefix=self.message_prefix) |
2022 | + self.counter_metrics[key] = metric |
2023 | + self.counter_metrics[key].mark(value) |
2024 | + |
2025 | + def compose_gauge_metric(self, key, value): |
2026 | + if key not in self.gauge_metrics: |
2027 | + metric = GaugeMetricReporter(key, prefix=self.message_prefix) |
2028 | + self.gauge_metrics[key] = metric |
2029 | + self.gauge_metrics[key].mark(value) |
2030 | + |
2031 | + def compose_meter_metric(self, key, value): |
2032 | + if key not in self.meter_metrics: |
2033 | + metric = MeterMetricReporter(key, self.time_function, |
2034 | + prefix=self.message_prefix) |
2035 | + self.meter_metrics[key] = metric |
2036 | + self.meter_metrics[key].mark(value) |
2037 | + |
2038 | + def flush_counter_metrics(self, interval, timestamp): |
2039 | + metrics = [] |
2040 | + events = 0 |
2041 | + for metric in self.counter_metrics.itervalues(): |
2042 | + messages = metric.report(timestamp) |
2043 | + metrics.extend(messages) |
2044 | + events += 1 |
2045 | + |
2046 | + return (metrics, events) |
2047 | + |
2048 | + def flush_gauge_metrics(self, timestamp): |
2049 | + metrics = [] |
2050 | + events = 0 |
2051 | + for metric in self.gauge_metrics.itervalues(): |
2052 | + messages = metric.report(timestamp) |
2053 | + metrics.extend(messages) |
2054 | + events += 1 |
2055 | + |
2056 | + return (metrics, events) |
2057 | + |
2058 | + def flush_timer_metrics(self, percent, timestamp): |
2059 | + metrics = [] |
2060 | + events = 0 |
2061 | + for metric in self.timer_metrics.itervalues(): |
2062 | + messages = metric.report(timestamp) |
2063 | + metrics.extend(messages) |
2064 | + events += 1 |
2065 | + |
2066 | + return (metrics, events) |
2067 | + |
2068 | + def update_metrics(self): |
2069 | + super(ConfigurableMessageProcessor, self).update_metrics() |
2070 | + |
2071 | + for metric in self.timer_metrics.itervalues(): |
2072 | + metric.tick() |
2073 | |
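The constructor above derives the prefix used for txstatsd's own internal metrics from the message prefix. That defaulting rule, extracted for clarity (the standalone function is illustrative):

```python
def internal_prefix(message_prefix="", internal_metrics_prefix=""):
    """Default the internal-metrics prefix the way
    ConfigurableMessageProcessor.__init__ does."""
    if not internal_metrics_prefix and not message_prefix:
        internal_metrics_prefix = "statsd."
    elif message_prefix and not internal_metrics_prefix:
        internal_metrics_prefix = message_prefix + ".statsd."
    return internal_metrics_prefix
```

So with no prefixes configured, internal metrics land under `statsd.`; with only a message prefix, they land under `<prefix>.statsd.`; an explicit internal prefix always wins.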
2074 | === added file 'txstatsd/server/httpinfo.py' |
2075 | --- txstatsd/server/httpinfo.py 1970-01-01 00:00:00 +0000 |
2076 | +++ txstatsd/server/httpinfo.py 2012-05-14 16:57:19 +0000 |
2077 | @@ -0,0 +1,60 @@ |
2078 | +# -*- coding: utf-8 *-* |
2079 | +import json |
2080 | + |
2081 | +from twisted.application import service, internet |
2082 | +from twisted.web import server, resource, http |
2083 | + |
2084 | + |
2085 | +class Status(resource.Resource): |
2086 | + isLeaf = True |
2087 | + time_high_water = 0.7 |
2088 | + |
2089 | + def __init__(self, processor, statsd_service): |
2090 | + resource.Resource.__init__(self) |
2091 | + self.processor = processor |
2092 | + self.statsd_service = statsd_service |
2093 | + |
2094 | + def render_GET(self, request): |
2095 | + data = dict(flush_time=self.processor.last_flush_duration, |
2096 | + process_time=self.processor.last_process_duration, |
2097 | + flush_interval=self.statsd_service.flush_interval) |
2098 | + if data["flush_interval"] * self.time_high_water < ( |
2099 | + data["process_time"] + data["flush_time"]): |
2100 | + data["status"] = "ERROR" |
2101 | + request.setResponseCode(http.INTERNAL_SERVER_ERROR) |
2102 | + return json.dumps(data) |
2103 | + data["status"] = "OK" |
2104 | + return json.dumps(data) |
2105 | + |
2106 | + |
2107 | +class Metrics(resource.Resource): |
2108 | + |
2109 | + def __init__(self, processor): |
2110 | + resource.Resource.__init__(self) |
2111 | + self.processor = processor |
2112 | + |
2113 | + def getChild(self, name, request): |
2114 | + metric = self.processor.timer_metrics.get(name, None) or \ |
2115 | + self.processor.plugin_metrics.get(name, None) |
2116 | + if metric is None: |
2117 | + return resource.NoResource() |
2118 | + |
2119 | + meth = getattr(metric, "getResource", None) |
2120 | + |
2121 | + if meth is None: |
2122 | + return resource.NoResource() |
2123 | + |
2124 | + return meth() |
2125 | + |
2126 | + |
2127 | +def makeService(options, processor, statsd_service): |
2128 | + |
2129 | + if options["http-port"] is None: |
2130 | + return service.MultiService() |
2131 | + |
2132 | + root = resource.Resource() |
2133 | + root.putChild("status", Status(processor, statsd_service)) |
2134 | + root.putChild("metrics", Metrics(processor)) |
2135 | + site = server.Site(root) |
2136 | + s = internet.TCPServer(int(options["http-port"]), site) |
2137 | + return s |
2138 | |
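The `/status` endpoint above reports `ERROR` (with a 500 response) once processing plus flushing consumes more than `time_high_water` (70%) of the flush interval. The threshold check on its own (the function name is illustrative):

```python
def is_overloaded(flush_interval, process_time, flush_time, high_water=0.7):
    """True when the processor spends more than the high-water fraction
    of each flush interval doing work, mirroring Status.render_GET."""
    return flush_interval * high_water < process_time + flush_time
```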
2139 | === added file 'txstatsd/server/loggingprocessor.py' |
2140 | --- txstatsd/server/loggingprocessor.py 1970-01-01 00:00:00 +0000 |
2141 | +++ txstatsd/server/loggingprocessor.py 2012-05-14 16:57:19 +0000 |
2142 | @@ -0,0 +1,36 @@ |
2143 | + |
2144 | +import time |
2145 | + |
2146 | +from txstatsd.server.configurableprocessor import ConfigurableMessageProcessor |
2147 | + |
2148 | + |
2149 | +class LoggingMessageProcessor(ConfigurableMessageProcessor): |
2150 | + """ |
2151 | + This specialised C{MessageProcessor} logs the received metrics |
2152 | + using the supplied logger (which should have a callable C{info} |
2153 | + attribute). |
2154 | + """ |
2155 | + |
2156 | + def __init__(self, logger, time_function=time.time, |
2157 | + message_prefix="", plugins=None, **kwz): |
2158 | + super(LoggingMessageProcessor, self).__init__( |
2159 | + time_function=time_function, message_prefix=message_prefix, |
2160 | + plugins=plugins, **kwz) |
2161 | + |
2162 | + logger_info = getattr(logger, "info", None) |
2163 | + if logger_info is None or not callable(logger_info): |
2164 | + raise TypeError("logger must have a callable 'info' attribute") |
2165 | + self.logger = logger |
2166 | + |
2167 | + def process_message(self, message, metric_type, key, fields): |
2168 | + self.logger.info("In: %s" % message) |
2169 | + return super(LoggingMessageProcessor, self)\ |
2170 | + .process_message(message, metric_type, key, fields) |
2171 | + |
2172 | + def flush(self, interval=10000, percent=90): |
2173 | + """Log all received metric samples to the supplied logger.""" |
2174 | + messages = list(super(LoggingMessageProcessor, self) |
2175 | + .flush(interval=interval, percent=percent)) |
2176 | + for msg in messages: |
2177 | + self.logger.info("Out: %s %s %s" % msg) |
2178 | + return messages |
2179 | |
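`LoggingMessageProcessor` only requires that its logger expose a callable `info` attribute. That duck-typing check as a standalone sketch, with a minimal capturing logger (both names are illustrative):

```python
def validate_logger(logger):
    """Raise TypeError unless `logger` has a callable `info` attribute,
    mirroring the constructor check in LoggingMessageProcessor."""
    info = getattr(logger, "info", None)
    if info is None or not callable(info):
        raise TypeError("logger must have a callable 'info' attribute")
    return logger

class ListLogger(object):
    """A minimal logger capturing messages in a list (for tests)."""
    def __init__(self):
        self.lines = []
    def info(self, msg):
        self.lines.append(msg)
```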
2180 | === added file 'txstatsd/server/processor.py' |
2181 | --- txstatsd/server/processor.py 1970-01-01 00:00:00 +0000 |
2182 | +++ txstatsd/server/processor.py 2012-05-14 16:57:19 +0000 |
2183 | @@ -0,0 +1,371 @@ |
2184 | +from collections import deque |
2185 | +import re |
2186 | +import time |
2187 | +import logging |
2188 | + |
2189 | +from twisted.python import log |
2190 | + |
2191 | +from txstatsd.metrics.metermetric import MeterMetricReporter |
2192 | + |
2193 | + |
2194 | +SPACES = re.compile(r"\s+") |
2195 | +SLASHES = re.compile(r"\/+") |
2196 | +NON_ALNUM = re.compile(r"[^a-zA-Z_\-0-9\.]") |
2197 | +RATE = re.compile(r"^@([\d\.]+)") |
2198 | + |
2199 | + |
2200 | +def normalize_key(key): |
2201 | + """ |
2202 | + Normalize a key that might contain spaces, forward-slashes and other |
2203 | + special characters into something that is acceptable by graphite. |
2204 | + """ |
2205 | + key = SPACES.sub("_", key) |
2206 | + key = SLASHES.sub("-", key) |
2207 | + key = NON_ALNUM.sub("", key) |
2208 | + return key |
2209 | + |
2210 | + |
2211 | +class BaseMessageProcessor(object): |
2212 | + |
2213 | + def process(self, message): |
2214 | + """Parse a raw "<key>:<value>|<type>" message and |
2215 | + dispatch it to the matching handler.""" |
2216 | + if ":" not in message: |
2217 | + return self.fail(message) |
2218 | + |
2219 | + key, data = message.strip().split(":", 1) |
2220 | + if "|" not in data: |
2221 | + return self.fail(message) |
2222 | + |
2223 | + fields = data.split("|") |
2224 | + if len(fields) < 2 or len(fields) > 3: |
2225 | + return self.fail(message) |
2226 | + |
2227 | + key = normalize_key(key) |
2228 | + metric_type = fields[1] |
2229 | + return self.process_message(message, metric_type, key, fields) |
2230 | + |
2231 | + def rebuild_message(self, metric_type, key, fields): |
2232 | + return key + ":" + "|".join(fields) |
2233 | + |
2234 | + def fail(self, message): |
2235 | + """Log and discard malformed message.""" |
2236 | + log.msg("Bad line: %r" % message, logLevel=logging.DEBUG) |
2237 | + |
2238 | + |
2239 | +class MessageProcessor(BaseMessageProcessor): |
2240 | + """ |
2241 | + This C{MessageProcessor} produces StatsD-compliant messages |
2242 | + for publishing to a Graphite server. |
2243 | + Metrics behaviour that varies from StatsD should be placed in |
2244 | + some specialised C{MessageProcessor} (see L{ConfigurableMessageProcessor |
2245 | + <txstatsd.server.configurableprocessor.ConfigurableMessageProcessor>}). |
2246 | + """ |
2247 | + |
2248 | + def __init__(self, time_function=time.time, plugins=None): |
2249 | + self.time_function = time_function |
2250 | + |
2251 | + self.stats_prefix = "stats." |
2252 | + self.internal_metrics_prefix = "statsd." |
2253 | + self.count_prefix = "stats_counts." |
2254 | + self.timer_prefix = self.stats_prefix + "timers." |
2255 | + self.gauge_prefix = self.stats_prefix + "gauge." |
2256 | + |
2257 | + self.process_timings = {} |
2258 | + self.by_type = {} |
2259 | + self.last_flush_duration = 0 |
2260 | + self.last_process_duration = 0 |
2261 | + |
2262 | + self.timer_metrics = {} |
2263 | + self.counter_metrics = {} |
2264 | + self.gauge_metrics = deque() |
2265 | + self.meter_metrics = {} |
2266 | + |
2267 | + self.plugins = {} |
2268 | + self.plugin_metrics = {} |
2269 | + |
2270 | + if plugins is not None: |
2271 | + for plugin in plugins: |
2272 | + self.plugins[plugin.metric_type] = plugin |
2273 | + |
2274 | + def process_message(self, message, metric_type, key, fields): |
2275 | + """ |
2276 | + Process a single entry, adding it to either C{counters}, C{timers}, |
2277 | + or C{gauge_metrics} depending on which kind of message it is. |
2278 | + """ |
2279 | + start = self.time_function() |
2280 | + if metric_type == "c": |
2281 | + self.process_counter_metric(key, fields, message) |
2282 | + elif metric_type == "ms": |
2283 | + self.process_timer_metric(key, fields[0], message) |
2284 | + elif metric_type == "g": |
2285 | + self.process_gauge_metric(key, fields[0], message) |
2286 | + elif metric_type == "m": |
2287 | + self.process_meter_metric(key, fields[0], message) |
2288 | + elif metric_type in self.plugins: |
2289 | + self.process_plugin_metric(metric_type, key, fields, message) |
2290 | + else: |
2291 | + return self.fail(message) |
2292 | + self.process_timings.setdefault(metric_type, 0) |
2293 | + self.process_timings[metric_type] += self.time_function() - start |
2294 | + self.by_type.setdefault(metric_type, 0) |
2295 | + self.by_type[metric_type] += 1 |
2296 | + |
2297 | + def get_message_prefix(self, kind): |
2298 | + return "stats." + kind |
2299 | + |
2300 | + def process_plugin_metric(self, metric_type, key, items, message): |
2301 | + if key not in self.plugin_metrics: |
2302 | + factory = self.plugins[metric_type] |
2303 | + metric = factory.build_metric( |
2304 | + self.get_message_prefix(factory.name), |
2305 | + name=key, wall_time_func=self.time_function) |
2306 | + self.plugin_metrics[key] = metric |
2307 | + self.plugin_metrics[key].process(items) |
2308 | + |
2309 | + def process_timer_metric(self, key, duration, message): |
2310 | + try: |
2311 | + duration = float(duration) |
2312 | + except (TypeError, ValueError): |
2313 | + return self.fail(message) |
2314 | + |
2315 | + self.compose_timer_metric(key, duration) |
2316 | + |
2317 | + def compose_timer_metric(self, key, duration): |
2318 | + if key not in self.timer_metrics: |
2319 | + self.timer_metrics[key] = [] |
2320 | + self.timer_metrics[key].append(duration) |
2321 | + |
2322 | + def process_counter_metric(self, key, composite, message): |
2323 | + try: |
2324 | + value = float(composite[0]) |
2325 | + except (TypeError, ValueError): |
2326 | + return self.fail(message) |
2327 | + rate = 1 |
2328 | + if len(composite) == 3: |
2329 | + match = RATE.match(composite[2]) |
2330 | + if match is None: |
2331 | + return self.fail(message) |
2332 | + rate = match.group(1) |
2333 | + |
2334 | + self.compose_counter_metric(key, value, rate) |
2335 | + |
2336 | + def compose_counter_metric(self, key, value, rate): |
2337 | + if key not in self.counter_metrics: |
2338 | + self.counter_metrics[key] = 0 |
2339 | + self.counter_metrics[key] += value * (1 / float(rate)) |
2340 | + |
2341 | + def process_gauge_metric(self, key, composite, message): |
2342 | + values = composite.split(":") |
2343 | + if len(values) != 1: |
2344 | + return self.fail(message) |
2345 | + |
2346 | + try: |
2347 | + value = float(values[0]) |
2348 | + except (TypeError, ValueError): |
2349 | + return self.fail(message) |
2350 | + |
2351 | + self.compose_gauge_metric(key, value) |
2352 | + |
2353 | + def compose_gauge_metric(self, key, value): |
2354 | + metric = [value, key] |
2355 | + self.gauge_metrics.append(metric) |
2356 | + |
2357 | + def process_meter_metric(self, key, composite, message): |
2358 | + values = composite.split(":") |
2359 | + if len(values) != 1: |
2360 | + return self.fail(message) |
2361 | + |
2362 | + try: |
2363 | + value = float(values[0]) |
2364 | + except (TypeError, ValueError): |
2365 | + return self.fail(message) |
2366 | + |
2367 | + self.compose_meter_metric(key, value) |
2368 | + |
2369 | + def compose_meter_metric(self, key, value): |
2370 | + if key not in self.meter_metrics: |
2371 | + metric = MeterMetricReporter(key, self.time_function, |
2372 | + prefix="stats.meter") |
2373 | + self.meter_metrics[key] = metric |
2374 | + self.meter_metrics[key].mark(value) |
2375 | + |
2376 | + def flush(self, interval=10000, percent=90): |
2377 | + """ |
2378 | + Flush all queued stats, computing a normalized count based on |
2379 | + C{interval} and timer statistics based on C{percent}. |
2380 | + """ |
2381 | + messages = [] |
2382 | + per_metric = {} |
2383 | + num_stats = 0 |
2384 | + interval = interval / 1000 |
2385 | + timestamp = int(self.time_function()) |
2386 | + |
2387 | + start = self.time_function() |
2388 | + counter_metrics, events = self.flush_counter_metrics(interval, |
2389 | + timestamp) |
2390 | + duration = self.time_function() - start |
2391 | + if events > 0: |
2392 | + messages.extend(sorted(counter_metrics)) |
2393 | + num_stats += events |
2394 | + per_metric["counter"] = (events, duration) |
2395 | + |
2396 | + start = self.time_function() |
2397 | + timer_metrics, events = self.flush_timer_metrics(percent, timestamp) |
2398 | + duration = self.time_function() - start |
2399 | + if events > 0: |
2400 | + messages.extend(sorted(timer_metrics)) |
2401 | + num_stats += events |
2402 | + per_metric["timer"] = (events, duration) |
2403 | + |
2404 | + start = self.time_function() |
2405 | + gauge_metrics, events = self.flush_gauge_metrics(timestamp) |
2406 | + duration = self.time_function() - start |
2407 | + if events > 0: |
2408 | + messages.extend(sorted(gauge_metrics)) |
2409 | + num_stats += events |
2410 | + per_metric["gauge"] = (events, duration) |
2411 | + |
2412 | + start = self.time_function() |
2413 | + meter_metrics, events = self.flush_meter_metrics(timestamp) |
2414 | + duration = self.time_function() - start |
2415 | + if events > 0: |
2416 | + messages.extend(sorted(meter_metrics)) |
2417 | + num_stats += events |
2418 | + per_metric["meter"] = (events, duration) |
2419 | + |
2420 | + start = self.time_function() |
2421 | + plugin_metrics, events = self.flush_plugin_metrics(interval, timestamp) |
2422 | + duration = self.time_function() - start |
2423 | + if events > 0: |
2424 | + messages.extend(sorted(plugin_metrics)) |
2425 | + num_stats += events |
2426 | + per_metric["plugin"] = (events, duration) |
2427 | + |
2428 | + self.flush_metrics_summary(messages, num_stats, per_metric, timestamp) |
2429 | + return messages |
2430 | + |
2431 | + def flush_counter_metrics(self, interval, timestamp): |
2432 | + metrics = [] |
2433 | + events = 0 |
2434 | + for key, count in self.counter_metrics.iteritems(): |
2435 | + self.counter_metrics[key] = 0 |
2436 | + |
2437 | + value = count / interval |
2438 | + metrics.append((self.stats_prefix + key, value, timestamp)) |
2439 | + metrics.append((self.count_prefix + key, count, timestamp)) |
2440 | + events += 1 |
2441 | + |
2442 | + return (metrics, events) |
2443 | + |
2444 | + def flush_timer_metrics(self, percent, timestamp): |
2445 | + metrics = [] |
2446 | + events = 0 |
2447 | + |
2448 | + threshold_value = ((100 - percent) / 100.0) |
2449 | + for key, timers in self.timer_metrics.iteritems(): |
2450 | + count = len(timers) |
2451 | + if count > 0: |
2452 | + self.timer_metrics[key] = [] |
2453 | + |
2454 | + timers.sort() |
2455 | + lower = timers[0] |
2456 | + upper = timers[-1] |
2457 | + count = len(timers) |
2458 | + |
2459 | + mean = lower |
2460 | + threshold_upper = upper |
2461 | + |
2462 | + if count > 1: |
2463 | + index = count - int(round(threshold_value * count)) |
2464 | + timers = timers[:index] |
2465 | + threshold_upper = timers[-1] |
2466 | + mean = sum(timers) / index |
2467 | + |
2468 | + items = {".mean": mean, |
2469 | + ".upper": upper, |
2470 | + ".upper_%s" % percent: threshold_upper, |
2471 | + ".lower": lower, |
2472 | + ".count": count} |
2473 | + for item, value in items.iteritems(): |
2474 | + metrics.append((self.timer_prefix + key + item, |
2475 | + value, timestamp)) |
2476 | + events += 1 |
2477 | + |
2478 | + return (metrics, events) |
2479 | + |
2480 | + def flush_gauge_metrics(self, timestamp): |
2481 | + metrics = [] |
2482 | + events = 0 |
2483 | + for metric in self.gauge_metrics: |
2484 | + value = metric[0] |
2485 | + key = metric[1] |
2486 | + |
2487 | + metrics.append((self.gauge_prefix + key + ".value", |
2488 | + value, timestamp)) |
2489 | + events += 1 |
2490 | + |
2491 | + self.gauge_metrics.clear() |
2492 | + |
2493 | + return (metrics, events) |
2494 | + |
2495 | + def flush_meter_metrics(self, timestamp): |
2496 | + metrics = [] |
2497 | + events = 0 |
2498 | + for metric in self.meter_metrics.itervalues(): |
2499 | + messages = metric.report(timestamp) |
2500 | + metrics.extend(messages) |
2501 | + events += 1 |
2502 | + |
2503 | + return (metrics, events) |
2504 | + |
2505 | + def flush_plugin_metrics(self, interval, timestamp): |
2506 | + metrics = [] |
2507 | + events = 0 |
2508 | + |
2509 | + for metric in self.plugin_metrics.itervalues(): |
2510 | + messages = metric.flush(interval, timestamp) |
2511 | + metrics.extend(messages) |
2512 | + events += 1 |
2513 | + |
2514 | + return (metrics, events) |
2515 | + |
2516 | + def flush_metrics_summary(self, messages, num_stats, |
2517 | + per_metric, timestamp): |
2518 | + |
2519 | + messages.append((self.internal_metrics_prefix + "numStats", |
2520 | + num_stats, timestamp)) |
2521 | + |
2522 | + self.last_flush_duration = 0 |
2523 | + for name, (value, duration) in per_metric.iteritems(): |
2524 | + messages.extend([ |
2525 | + (self.internal_metrics_prefix + |
2526 | + "flush.%s.count" % name, |
2527 | + value, timestamp), |
2528 | + (self.internal_metrics_prefix + |
2529 | + "flush.%s.duration" % name, |
2530 | + duration * 1000, timestamp)]) |
2531 | + log.msg("Flushed %d %s metrics in %.6f" % |
2532 | + (value, name, duration)) |
2533 | + self.last_flush_duration += duration |
2534 | + |
2535 | + self.last_process_duration = 0 |
2536 | + for metric_type, duration in self.process_timings.iteritems(): |
2537 | + messages.extend([ |
2538 | + (self.internal_metrics_prefix + |
2539 | + "receive.%s.count" % |
2540 | + metric_type, self.by_type[metric_type], timestamp), |
2541 | + (self.internal_metrics_prefix + |
2542 | + "receive.%s.duration" % |
2543 | + metric_type, duration * 1000, timestamp) |
2544 | + ]) |
2545 | + log.msg("Processing %d %s metrics took %.6f" % |
2546 | + (self.by_type[metric_type], metric_type, duration)) |
2547 | + self.last_process_duration += duration |
2548 | + |
2549 | + self.process_timings.clear() |
2550 | + self.by_type.clear() |
2551 | + |
2552 | + def update_metrics(self): |
2553 | + for metric in self.meter_metrics.itervalues(): |
2554 | + metric.tick() |
2555 | |
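The percentile trimming in `flush_timer_metrics` above can be sketched as a standalone function (this is an illustrative re-statement of the logic in the diff, not part of txstatsd itself; the function name and the float mean are choices made here for clarity):

```python
def flush_timer_values(timers, percent=90):
    """Summary statistics flushed for one timer key.

    A standalone sketch of the flush_timer_metrics logic above:
    drop the slowest (100 - percent)% of samples, then report the
    mean and upper bound of the remaining ones alongside the raw
    lower/upper/count.
    """
    timers = sorted(timers)
    count = len(timers)
    lower, upper = timers[0], timers[-1]
    mean = lower
    threshold_upper = upper
    if count > 1:
        threshold = (100 - percent) / 100.0
        index = count - int(round(threshold * count))
        trimmed = timers[:index]
        threshold_upper = trimmed[-1]
        mean = sum(trimmed) / float(index)
    return {"mean": mean, "upper": upper,
            "upper_%s" % percent: threshold_upper,
            "lower": lower, "count": count}
```

For ten samples 1..10 at the default 90th percentile, the slowest sample is trimmed, so `upper` stays 10 while `upper_90` is 9 and the mean is taken over the first nine values.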
2556 | === added file 'txstatsd/server/protocol.py' |
2557 | --- txstatsd/server/protocol.py 1970-01-01 00:00:00 +0000 |
2558 | +++ txstatsd/server/protocol.py 2012-05-14 16:57:19 +0000 |
2559 | @@ -0,0 +1,63 @@ |
2560 | +from twisted.internet.protocol import ( |
2561 | + DatagramProtocol, Protocol, Factory) |
2562 | +from twisted.protocols.basic import LineReceiver |
2563 | + |
2564 | + |
2565 | +class StatsDServerProtocol(DatagramProtocol): |
2566 | + """A Twisted-based implementation of the StatsD server. |
2567 | + |
2568 | + Data is received via UDP for local aggregation and then sent to a Graphite |
2569 | + server via TCP. |
2570 | + """ |
2571 | + |
2572 | + def __init__(self, processor, |
2573 | + monitor_message=None, monitor_response=None): |
2574 | + self.processor = processor |
2575 | + self.monitor_message = monitor_message |
2576 | + self.monitor_response = monitor_response |
2577 | + |
2578 | + def datagramReceived(self, data, (host, port)): |
2579 | + """Process received data and store it locally.""" |
2580 | + if data == self.monitor_message: |
2581 | + # Send the expected response to the |
2582 | + # monitoring agent. |
2583 | + self.transport.write(self.monitor_response, (host, port)) |
2584 | + else: |
2585 | + self.processor.process(data) |
2586 | + |
2587 | + |
2588 | +class StatsDTCPServerProtocol(LineReceiver): |
2589 | + """A Twisted-based implementation of the StatsD server over TCP. |
2590 | + |
2591 | + Data is received via TCP for local aggregation and then sent to a Graphite |
2592 | + server via TCP. |
2593 | + """ |
2594 | + |
2595 | + def __init__(self, processor, |
2596 | + monitor_message=None, monitor_response=None): |
2597 | + self.processor = processor |
2598 | + self.monitor_message = monitor_message |
2599 | + self.monitor_response = monitor_response |
2600 | + |
2601 | + def lineReceived(self, data): |
2602 | + """Process received data and store it locally.""" |
2603 | + if data == self.monitor_message: |
2604 | + # Send the expected response to the |
2605 | + # monitoring agent. |
2606 | + self.transport.write(self.monitor_response) |
2607 | + else: |
2608 | + self.processor.process(data) |
2609 | + |
2610 | + |
2611 | +class StatsDTCPServerFactory(Factory): |
2612 | + |
2613 | + def __init__(self, processor, |
2614 | + monitor_message=None, monitor_response=None): |
2615 | + self.processor = processor |
2616 | + self.monitor_message = monitor_message |
2617 | + self.monitor_response = monitor_response |
2618 | + |
2619 | + def buildProtocol(self, addr): |
2620 | + return StatsDTCPServerProtocol(self.processor, |
2621 | + self.monitor_message, self.monitor_response) |
2622 | + |
2623 | |
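The monitor-message/monitor-response handshake in `StatsDServerProtocol` above lets an external agent probe liveness over plain UDP. A minimal sketch of such a probe, assuming the default `txstatsd ping`/`txstatsd pong` strings from the service options (the function name, host, and timeout here are illustrative, not part of the branch):

```python
import socket

def check_statsd(host="127.0.0.1", port=8125,
                 ping=b"txstatsd ping", pong=b"txstatsd pong", timeout=2.0):
    """Send the monitor message over UDP and verify the expected response."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.settimeout(timeout)
    try:
        sock.sendto(ping, (host, port))
        data, _ = sock.recvfrom(1024)
        return data == pong
    except socket.timeout:
        return False
    finally:
        sock.close()
```

Any datagram other than the configured monitor message falls through to `processor.process`, so the probe traffic never pollutes the metrics.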
2624 | === added file 'txstatsd/server/router.py' |
2625 | --- txstatsd/server/router.py 1970-01-01 00:00:00 +0000 |
2626 | +++ txstatsd/server/router.py 2012-05-14 16:57:19 +0000 |
2627 | @@ -0,0 +1,308 @@ |
2628 | +""" |
2629 | +Routes messages to different processors. |
2630 | + |
2631 | +Rules are of the form: |
2632 | + condition => target |
2633 | +Where condition and target each take the form: |
2634 | + name [arguments]* |
2635 | +and the arguments are dependent on the name. |
2636 | + |
2637 | +Each rule is applied to the message in the order the rules are specified. |
2638 | + |
2639 | +Conditions supported: |
2640 | + any: will match all messages |
2641 | + metric_type [type]+: will match a metric of any of the types specified |
2642 | + path_like fnmatch_exp: will match the path against the expression with |
2643 | + fnmatch. |
2644 | + not [rule..]: will return the negation of the result of rule |
2645 | + |
2646 | + |
2647 | +Targets supported: |
2648 | + drop: will drop the message, stopping any further processing. |
2649 | + redirect_udp host port: will send to (host, port) by udp |
2650 | + redirect_tcp host port: will send to (host, port) by tcp |
2651 | + rewrite pattern repl: will rewrite the path like re.sub |
2652 | + set_metric_type metric_type: will make the metric of type metric_type |
2653 | + |
2654 | +""" |
2655 | +import fnmatch |
2656 | +import re |
2657 | +import time |
2658 | + |
2659 | +from zope.interface import implements |
2660 | + |
2661 | +from twisted.application.internet import UDPServer |
2662 | +from twisted.application.service import Service |
2663 | +from twisted.internet import interfaces |
2664 | +from twisted.internet.protocol import ( |
2665 | + DatagramProtocol, ReconnectingClientFactory, Protocol) |
2666 | +from twisted.internet import defer |
2667 | +from twisted.python import log |
2668 | + |
2669 | +from txstatsd.server.processor import BaseMessageProcessor |
2670 | + |
2671 | + |
2672 | +class StopProcessingException(Exception): |
2673 | + |
2674 | + pass |
2675 | + |
2676 | + |
2677 | +class TCPRedirectService(Service): |
2678 | + |
2679 | + def __init__(self, host, port, factory): |
2680 | + self.host = host |
2681 | + self.port = port |
2682 | + self.factory = factory |
2683 | + |
2684 | + def startService(self): |
2685 | + from twisted.internet import reactor |
2686 | + |
2687 | + reactor.connectTCP(self.host, self.port, self.factory) |
2688 | + return Service.startService(self) |
2689 | + |
2690 | + def stopService(self): |
2691 | + self.factory.stopTrying() |
2692 | + if self.factory.protocol: |
2693 | + self.factory.protocol.transport.loseConnection() |
2694 | + return Service.stopService(self) |
2695 | + |
2696 | + |
2697 | +class TCPRedirectClientFactory(ReconnectingClientFactory): |
2698 | + |
2699 | + def __init__(self, callback=None): |
2700 | + self.callback = callback |
2701 | + self.protocol = None |
2702 | + |
2703 | + def buildProtocol(self, addr): |
2704 | + from twisted.internet import reactor |
2705 | + |
2706 | + self.resetDelay() |
2707 | + self.protocol = TCPRedirectProtocol() |
2708 | + if self.callback: |
2709 | + reactor.callLater(0, self.callback) |
2710 | + self.callback = None |
2711 | + |
2712 | + return self.protocol |
2713 | + |
2714 | + def write(self, data): |
2715 | + if self.protocol: |
2716 | + self.protocol.write(data) |
2717 | + |
2718 | + |
2719 | +class TCPRedirectProtocol(Protocol): |
2720 | + """A client protocol for redirecting messages over TCP. |
2721 | + """ |
2722 | + |
2723 | + implements(interfaces.IPushProducer) |
2724 | + |
2725 | + def __init__(self): |
2726 | + self.paused = False |
2727 | + self.last_paused = None |
2728 | + self.dropped = 0 |
2729 | + |
2730 | + def connectionMade(self): |
2731 | + """ |
2732 | + A connection has been made, register ourselves as a producer for the |
2733 | + bound transport. |
2734 | + """ |
2735 | + self.transport.registerProducer(self, True) |
2736 | + |
2737 | + def pauseProducing(self): |
2738 | + """Pause producing messages, since the buffer is full.""" |
2739 | + self.last_paused = int(time.time()) |
2740 | + self.paused = True |
2741 | + |
2742 | + stopProducing = pauseProducing |
2743 | + |
2744 | + def resumeProducing(self): |
2745 | + """We can write to the transport again. Yay!""" |
2746 | + time_now = int(time.time()) |
2747 | + log.msg("Resumed TCP redirect. " |
2748 | + "Dropped %s messages during %s seconds." % |
2749 | + (self.dropped, time_now - self.last_paused)) |
2750 | + self.paused = False |
2751 | + self.dropped = 0 |
2752 | + self.last_paused = None |
2753 | + |
2754 | + def write(self, line): |
2755 | + if self.paused: |
2756 | + self.dropped += 1 |
2757 | + return |
2758 | + |
2759 | + if line[-2:] != "\r\n": |
2760 | + if line[-1] == "\r": |
2761 | + line += "\n" |
2762 | + else: |
2763 | + line += "\r\n" |
2764 | + self.transport.write(line) |
2765 | + |
2766 | + |
2767 | +class UDPRedirectProtocol(DatagramProtocol): |
2768 | + |
2769 | + def __init__(self, host, port, callback): |
2770 | + self.host = host |
2771 | + self.port = port |
2772 | + self.callback = callback |
2773 | + |
2774 | + def startProtocol(self): |
2775 | + self.transport.connect(self.host, self.port) |
2776 | + self.callback() |
2777 | + |
2778 | + def write(self, data): |
2779 | + if self.transport is not None: |
2780 | + self.transport.write(data) |
2781 | + |
2782 | + |
2783 | +class Router(BaseMessageProcessor): |
2784 | + |
2785 | + def __init__(self, message_processor, rules_config, service=None): |
2786 | + """Configure a router with rules_config. |
2787 | + |
2788 | + rules_config is a newline-separated list of rules. |
2789 | + """ |
2790 | + self.rules_config = rules_config |
2791 | + self.message_processor = message_processor |
2792 | + self.flush = message_processor.flush |
2793 | + self.ready = defer.succeed(None) |
2794 | + self.service = service |
2795 | + self.rules = self.build_rules(rules_config) |
2796 | + |
2797 | + def build_condition(self, condition): |
2798 | + condition_parts = [ |
2799 | + p.strip() for p in condition.split(" ") if p] |
2800 | + condition_factory = getattr(self, |
2801 | + "build_condition_" + condition_parts[0], None) |
2802 | + if condition_factory is None: |
2803 | + raise ValueError("unknown condition %s" % |
2804 | + (condition_parts[0],)) |
2805 | + condition_function = condition_factory(*condition_parts[1:]) |
2806 | + return condition_function |
2807 | + |
2808 | + def build_rules(self, rules_config): |
2809 | + rules = [] |
2810 | + for line in rules_config.split("\n"): |
2811 | + if not line: |
2812 | + continue |
2813 | + |
2814 | + condition, target = line.split("=>") |
2815 | + condition_function = self.build_condition(condition) |
2816 | + |
2817 | + target_parts = [ |
2818 | + p.strip() for p in target.split(" ") if p] |
2819 | + |
2820 | + target_factory = getattr(self, |
2821 | + "build_target_" + target_parts[0], None) |
2822 | + |
2823 | + if target_factory is None: |
2824 | + raise ValueError("unknown target %s" % |
2825 | + (target_parts[0],)) |
2826 | + |
2827 | + rules.append(( |
2828 | + condition_function, |
2829 | + target_factory(*target_parts[1:]))) |
2830 | + return rules |
2831 | + |
2832 | + def build_condition_any(self): |
2833 | + """Returns a condition that always matches.""" |
2834 | + return lambda *args: True |
2835 | + |
2836 | + def build_condition_not(self, *args): |
2837 | + """ |
2838 | + Returns a condition that negates the condition from its arguments. |
2839 | + """ |
2840 | + other_condition = self.build_condition(" ".join(args)) |
2841 | + |
2842 | + def not_condition(metric_type, key, fields): |
2843 | + return not other_condition(metric_type, key, fields) |
2844 | + return not_condition |
2845 | + |
2846 | + def build_condition_metric_type(self, *metric_types): |
2847 | + """Returns a condition that matches on metric type.""" |
2848 | + def metric_type_condition(metric_type, key, fields): |
2849 | + return (metric_type in metric_types) |
2850 | + return metric_type_condition |
2851 | + |
2852 | + def build_condition_path_like(self, pattern): |
2853 | + def path_like_condition(metric_type, key, fields): |
2854 | + return fnmatch.fnmatch(key, pattern) |
2855 | + return path_like_condition |
2856 | + |
2857 | + def build_target_drop(self): |
2858 | + """Returns a target that stops the processing of a message.""" |
2859 | + def drop(*args): |
2860 | + return |
2861 | + return drop |
2862 | + |
2863 | + def build_target_rewrite(self, pattern, repl, dup="no-dup"): |
2864 | + rexp = re.compile(pattern) |
2865 | + |
2866 | + def rewrite_target(metric_type, key, fields): |
2867 | + if dup == "dup" and rexp.match(key) is not None: |
2868 | + yield metric_type, key, fields |
2869 | + key = rexp.sub(repl, key) |
2870 | + yield metric_type, key, fields |
2871 | + |
2872 | + return rewrite_target |
2873 | + |
2874 | + def build_target_set_metric_type(self, metric_type, dup="no-dup"): |
2875 | + def set_metric_type(_, key, fields): |
2876 | + if dup == "dup": |
2877 | + yield _, key, fields |
2878 | + yield metric_type, key, fields |
2879 | + return set_metric_type |
2880 | + |
2881 | + def build_target_redirect_udp(self, host, port): |
2882 | + if self.service is None: |
2883 | + return lambda *args: True |
2884 | + |
2885 | + port = int(port) |
2886 | + d = defer.Deferred() |
2887 | + self.ready.addCallback(lambda _: d) |
2888 | + protocol = UDPRedirectProtocol(host, port, lambda: d.callback(None)) |
2889 | + |
2890 | + client = UDPServer(0, protocol) |
2891 | + client.setServiceParent(self.service) |
2892 | + |
2893 | + def redirect_udp_target(metric_type, key, fields): |
2894 | + message = self.rebuild_message(metric_type, key, fields) |
2895 | + protocol.write(message) |
2896 | + yield metric_type, key, fields |
2897 | + return redirect_udp_target |
2898 | + |
2899 | + def build_target_redirect_tcp(self, host, port): |
2900 | + if self.service is None: |
2901 | + return lambda *args: True |
2902 | + |
2903 | + port = int(port) |
2904 | + d = defer.Deferred() |
2905 | + self.ready.addCallback(lambda _: d) |
2906 | + factory = TCPRedirectClientFactory(lambda: d.callback(None)) |
2907 | + |
2908 | + redirect_service = TCPRedirectService(host, port, factory) |
2909 | + redirect_service.setServiceParent(self.service) |
2910 | + |
2911 | + def redirect_tcp_target(metric_type, key, fields): |
2912 | + message = self.rebuild_message(metric_type, key, fields) |
2913 | + factory.write(message) |
2914 | + yield metric_type, key, fields |
2915 | + return redirect_tcp_target |
2916 | + |
2917 | + def process_message(self, message, metric_type, key, fields): |
2918 | + metrics = [(metric_type, key, fields)] |
2919 | + if self.rules: |
2920 | + for condition, target in self.rules: |
2921 | + pending, metrics = metrics, [] |
2922 | + if not pending: |
2923 | + return |
2924 | + for metric_type, key, fields in pending: |
2925 | + if not condition(metric_type, key, fields): |
2926 | + metrics.append((metric_type, key, fields)) |
2927 | + continue |
2928 | + result = target(metric_type, key, fields) |
2929 | + if result is not None: |
2930 | + metrics.extend(result) |
2931 | + |
2932 | + for (metric_type, key, fields) in metrics: |
2933 | + message = self.rebuild_message(metric_type, key, fields) |
2934 | + self.message_processor.process_message(message, metric_type, |
2935 | + key, fields) |
2936 | |
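The rule cascade in `Router.process_message` above can be illustrated with a toy re-implementation covering just the `path_like` condition and the `drop`/`rewrite` targets as plain functions (the rules text and metric keys below are made up for the example; this is not the Router class itself):

```python
import fnmatch
import re

# Hypothetical rules in the "condition => target" DSL described above.
RULES = """\
path_like devel.* => drop
path_like *.login => rewrite (.*)\\.login \\1.auth
"""

def route(key):
    """Apply each rule in order; return the final key, or None if dropped."""
    for line in RULES.splitlines():
        if not line.strip():
            continue
        condition, target = [part.strip() for part in line.split("=>")]
        cond_name, pattern = condition.split(" ", 1)
        assert cond_name == "path_like"  # only condition in this sketch
        if not fnmatch.fnmatch(key, pattern.strip()):
            continue
        target_parts = target.split()
        if target_parts[0] == "drop":
            return None  # stop processing this metric
        if target_parts[0] == "rewrite":
            key = re.sub(target_parts[1], target_parts[2], key)
    return key
```

With these rules, anything under `devel.` is silenced, `*.login` paths are renamed to `*.auth`, and everything else passes through untouched.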
2937 | === added file 'txstatsd/service.py' |
2938 | --- txstatsd/service.py 1970-01-01 00:00:00 +0000 |
2939 | +++ txstatsd/service.py 2012-05-14 16:57:19 +0000 |
2940 | @@ -0,0 +1,340 @@ |
2941 | + |
2942 | +import getopt |
2943 | +import sys |
2944 | +import time |
2945 | +import ConfigParser |
2946 | +import platform |
2947 | +import functools |
2948 | + |
2949 | +from twisted.application.internet import UDPServer, TCPServer |
2950 | +from twisted.application.service import MultiService |
2951 | +from twisted.python import usage, log |
2952 | +from twisted.plugin import getPlugins |
2953 | + |
2954 | +from txstatsd.client import InternalClient |
2955 | +from txstatsd.metrics.metrics import Metrics |
2956 | +from txstatsd.metrics.extendedmetrics import ExtendedMetrics |
2957 | +from txstatsd.server.processor import MessageProcessor |
2958 | +from txstatsd.server.configurableprocessor import ConfigurableMessageProcessor |
2959 | +from txstatsd.server.loggingprocessor import LoggingMessageProcessor |
2960 | +from txstatsd.server.protocol import ( |
2961 | + StatsDServerProtocol, StatsDTCPServerFactory) |
2962 | +from txstatsd.server.router import Router |
2963 | +from txstatsd.server import httpinfo |
2964 | +from txstatsd.report import ReportingService |
2965 | +from txstatsd.itxstatsd import IMetricFactory |
2966 | +from twisted.application.service import Service |
2967 | +from twisted.internet import task |
2968 | + |
2969 | + |
2970 | +def accumulateClassList(classObj, attr, listObj, |
2971 | + baseClass=None, excludeClass=None): |
2972 | + """Accumulate all attributes of a given name in a class hierarchy |
2973 | + into a single list. |
2974 | + |
2975 | + This assumes all class attributes of this name are lists. |
2976 | + """ |
2977 | + for base in classObj.__bases__: |
2978 | + accumulateClassList(base, attr, listObj, excludeClass=excludeClass) |
2979 | + if excludeClass != classObj: |
2980 | + if baseClass is None or baseClass in classObj.__bases__: |
2981 | + listObj.extend(classObj.__dict__.get(attr, [])) |
2982 | + |
2983 | + |
2984 | +class OptionsGlue(usage.Options): |
2985 | + """Extends usage.Options to also read parameters from a config file.""" |
2986 | + |
2987 | + optParameters = [ |
2988 | + ["config", "c", None, "Config file to use."]] |
2989 | + |
2990 | + def __init__(self): |
2991 | + parameters = [] |
2992 | + accumulateClassList(self.__class__, 'optParameters', |
2993 | + parameters, excludeClass=OptionsGlue) |
2994 | + for parameter in parameters: |
2995 | + if parameter[0] == "config" or parameter[1] == "c": |
2996 | + raise ValueError("the --config/-c parameter is reserved.") |
2997 | + |
2998 | + self.overridden_options = [] |
2999 | + |
3000 | + super(OptionsGlue, self).__init__() |
3001 | + |
3002 | + def opt_config(self, config_path): |
3003 | + self['config'] = config_path |
3004 | + |
3005 | + opt_c = opt_config |
3006 | + |
3007 | + def parseOptions(self, options=None): |
3008 | + """Obtain overridden options.""" |
3009 | + |
3010 | + if options is None: |
3011 | + options = sys.argv[1:] |
3012 | + try: |
3013 | + opts, args = getopt.getopt(options, |
3014 | + self.shortOpt, self.longOpt) |
3015 | + except getopt.error, e: |
3016 | + raise usage.UsageError(str(e)) |
3017 | + |
3018 | + for opt, arg in opts: |
3019 | + if opt[1] == '-': |
3020 | + opt = opt[2:] |
3021 | + else: |
3022 | + opt = opt[1:] |
3023 | + self.overridden_options.append(opt) |
3024 | + |
3025 | + super(OptionsGlue, self).parseOptions(options=options) |
3026 | + |
3027 | + def postOptions(self): |
3028 | + """Read the configuration file if one is provided.""" |
3029 | + if self['config'] is not None: |
3030 | + config_file = ConfigParser.RawConfigParser() |
3031 | + config_file.read(self['config']) |
3032 | + |
3033 | + self.configure(config_file) |
3034 | + |
3035 | + def overridden_option(self, opt): |
3036 | + """Return whether this option was overridden.""" |
3037 | + return opt in self.overridden_options |
3038 | + |
3039 | + def configure(self, config_file): |
3040 | + """Read the configuration items, coercing types as required.""" |
3041 | + for name, value in config_file.items(self.config_section): |
3042 | + self._coerce_option(name, value) |
3043 | + |
3044 | + for section in sorted(config_file.sections()): |
3045 | + if section.startswith("plugin_"): |
3046 | + self[section] = config_file.items(section) |
3047 | + if section.startswith("carbon-cache"): |
3048 | + for name, value in config_file.items(section): |
3049 | + self._coerce_option(name, value) |
3050 | + |
3051 | + def _coerce_option(self, name, value): |
3052 | + """Coerce a single option, checking for overridden options.""" |
3053 | + # Overridden options have precedence |
3054 | + if not self.overridden_option(name): |
3055 | + # Options appends '=' when gathering the parameters |
3056 | + if (name + '=') in self.longOpt: |
3057 | + # Coerce the type if required |
3058 | + if name in self._dispatch: |
3059 | + if isinstance(self._dispatch[name], usage.CoerceParameter): |
3060 | + value = self._dispatch[name].coerce(value) |
3061 | + else: |
3062 | + self._dispatch[name](name, value) |
3063 | + return |
3064 | + self[name] = value |
3065 | + |
3066 | + |
3067 | +class StatsDOptions(OptionsGlue): |
3068 | + """ |
3069 | + The set of configuration settings for txStatsD. |
3070 | + """ |
3071 | + |
3072 | + optParameters = [ |
3073 | + ["carbon-cache-host", "h", None, |
3074 | + "The host where carbon cache is listening.", str], |
3075 | + ["carbon-cache-port", "p", None, |
3076 | + "The port where carbon cache is listening.", int], |
3077 | + ["carbon-cache-name", "n", None, |
3078 | + "An identifier for the carbon-cache instance."], |
3079 | + ["listen-port", "l", 8125, |
3080 | + "The UDP port where we will listen.", int], |
3081 | + ["flush-interval", "i", 60000, |
3082 | + "The number of milliseconds between each flush.", int], |
3083 | + ["prefix", "x", None, |
3084 | + "Prefix to use when reporting stats.", str], |
3085 | + ["instance-name", "N", None, |
3086 | + "Instance name for our own stats reporting.", str], |
3087 | + ["report", "r", None, |
3088 | + "Which additional stats to report {process|net|io|system}.", str], |
3089 | + ["monitor-message", "m", "txstatsd ping", |
3090 | + "Message we expect from monitoring agent.", str], |
3091 | + ["monitor-response", "o", "txstatsd pong", |
3092 | + "Response we should send monitoring agent.", str], |
3093 | + ["statsd-compliance", "s", 1, |
3094 | + "Produce StatsD-compliant messages.", int], |
3095 | + ["dump-mode", "d", 0, |
3096 | + "Dump received and aggregated metrics" |
3097 | + " before passing them to carbon.", int], |
3098 | + ["routing", "g", "", |
3099 | + "Routing rules", str], |
3100 | + ["listen-tcp-port", "t", None, |
3101 | + "The TCP port where we will listen.", int], |
3102 | + ["max-queue-size", "Q", 20000, |
3103 | + "Maximum send queue size per destination.", int], |
3104 | + ["max-datapoints-per-message", "M", 1000, |
3105 | + "Maximum datapoints per message to carbon-cache.", int], |
3106 | + ["http-port", "P", None, |
3107 | + "The httpinfo port.", int], |
3108 | + ] |
3109 | + |
3110 | + def __init__(self): |
3111 | + self.config_section = 'statsd' |
3112 | + super(StatsDOptions, self).__init__() |
3113 | + self["carbon-cache-host"] = [] |
3114 | + self["carbon-cache-port"] = [] |
3115 | + self["carbon-cache-name"] = [] |
3116 | + |
3117 | + def opt_carbon_cache_host(self, host): |
3118 | + self["carbon-cache-host"].append(host) |
3119 | + |
3120 | + def opt_carbon_cache_port(self, port): |
3121 | + self["carbon-cache-port"].append(usage.portCoerce(port)) |
3122 | + |
3123 | + def opt_carbon_cache_name(self, name): |
3124 | + self["carbon-cache-name"].append(name) |
3125 | + |
3126 | + |
3127 | +class StatsDService(Service): |
3128 | + |
3129 | + def __init__(self, carbon_client, processor, flush_interval, clock=None): |
3130 | + self.carbon_client = carbon_client |
3131 | + self.processor = processor |
3132 | + self.flush_interval = flush_interval |
3133 | + self.flush_task = task.LoopingCall(self.flushProcessor) |
3134 | + if clock is not None: |
3135 | + self.flush_task.clock = clock |
3136 | + |
3137 | + def flushProcessor(self): |
3138 | + """Flush messages queued in the processor to Graphite.""" |
3139 | + flushed = 0 |
3140 | + start = time.time() |
3141 | + for metric, value, timestamp in self.processor.flush( |
3142 | + interval=self.flush_interval): |
3143 | + self.carbon_client.sendDatapoint(metric, (timestamp, value)) |
3144 | + flushed += 1 |
3145 | + log.msg("Flushed total %d metrics in %.6f" % |
3146 | + (flushed, time.time() - start)) |
3147 | + |
3148 | + def startService(self): |
3149 | + self.flush_task.start(self.flush_interval / 1000, False) |
3150 | + |
3151 | + def stopService(self): |
3152 | + if self.flush_task.running: |
3153 | + self.flush_task.stop() |
3154 | + |
3155 | + |
3156 | +def report_client_manager_stats(): |
3157 | + from carbon.instrumentation import stats |
3158 | + |
3159 | + current_stats = stats.copy() |
3160 | + for name, value in list(current_stats.items()): |
3161 | + del current_stats[name] |
3162 | + if name.startswith("destinations"): |
3163 | + current_stats[name.replace(":", "_")] = value |
3164 | + stats[name] = 0 |
3165 | + return current_stats |
3166 | + |
3167 | + |
3168 | +def createService(options): |
3169 | + """Create a txStatsD service.""" |
3170 | + from carbon.routers import ConsistentHashingRouter |
3171 | + from carbon.client import CarbonClientManager |
3172 | + from carbon.conf import settings |
3173 | + |
3174 | + settings.MAX_QUEUE_SIZE = options["max-queue-size"] |
3175 | + settings.MAX_DATAPOINTS_PER_MESSAGE = options["max-datapoints-per-message"] |
3176 | + |
3177 | + root_service = MultiService() |
3178 | + root_service.setName("statsd") |
3179 | + |
3180 | + prefix = options["prefix"] |
3181 | + if prefix is None: |
3182 | + prefix = "statsd" |
3183 | + |
3184 | + instance_name = options["instance-name"] |
3185 | + if not instance_name: |
3186 | + instance_name = platform.node() |
3187 | + |
3188 | + # initialize plugins |
3189 | + plugin_metrics = [] |
3190 | + for plugin in getPlugins(IMetricFactory): |
3191 | + plugin.configure(options) |
3192 | + plugin_metrics.append(plugin) |
3193 | + |
3194 | + processor = None |
3195 | + if options["dump-mode"]: |
3196 | + # LoggingMessageProcessor supersedes |
3197 | + # any other processor class in "dump-mode" |
3198 | + assert not hasattr(log, 'info') |
3199 | + log.info = log.msg # for compatibility with LMP logger interface |
3200 | + processor = functools.partial(LoggingMessageProcessor, logger=log) |
3201 | + |
3202 | + if options["statsd-compliance"]: |
3203 | + processor = (processor or MessageProcessor)(plugins=plugin_metrics) |
3204 | + input_router = Router(processor, options['routing'], root_service) |
3205 | + connection = InternalClient(input_router) |
3206 | + metrics = Metrics(connection) |
3207 | + else: |
3208 | + processor = (processor or ConfigurableMessageProcessor)( |
3209 | + message_prefix=prefix, |
3210 | + internal_metrics_prefix=prefix + "." + instance_name + ".", |
3211 | + plugins=plugin_metrics) |
3212 | + input_router = Router(processor, options['routing'], root_service) |
3213 | + connection = InternalClient(input_router) |
3214 | + metrics = ExtendedMetrics(connection) |
3215 | + |
3216 | + if not options["carbon-cache-host"]: |
3217 | + options["carbon-cache-host"].append("127.0.0.1") |
3218 | + if not options["carbon-cache-port"]: |
3219 | + options["carbon-cache-port"].append(2004) |
3220 | + if not options["carbon-cache-name"]: |
3221 | + options["carbon-cache-name"].append(None) |
3222 | + |
3223 | + reporting = ReportingService(instance_name) |
3224 | + reporting.setServiceParent(root_service) |
3225 | + |
3226 | + # Schedule updates for those metrics expecting to be |
3227 | + # periodically updated, for example the meter metric. |
3228 | + reporting.schedule(processor.update_metrics, 10, None) |
3229 | + reporting.schedule(report_client_manager_stats, |
3230 | + options["flush-interval"] / 1000, |
3231 | + metrics.gauge) |
3232 | + |
3233 | + if options["report"] is not None: |
3234 | + from txstatsd import process |
3235 | + from twisted.internet import reactor |
3236 | + |
3237 | + reporting.schedule( |
3238 | + process.report_reactor_stats(reactor), 60, metrics.gauge) |
3239 | + reports = [name.strip() for name in options["report"].split(",")] |
3240 | + for report_name in reports: |
3241 | + for reporter in getattr(process, "%s_STATS" % |
3242 | + report_name.upper(), ()): |
3243 | + reporting.schedule(reporter, 60, metrics.gauge) |
3244 | + |
3245 | + # XXX Make this configurable. |
3246 | + router = ConsistentHashingRouter() |
3247 | + carbon_client = CarbonClientManager(router) |
3248 | + carbon_client.setServiceParent(root_service) |
3249 | + |
3250 | + for host, port, name in zip(options["carbon-cache-host"], |
3251 | + options["carbon-cache-port"], |
3252 | + options["carbon-cache-name"]): |
3253 | + carbon_client.startClient((host, port, name)) |
3254 | + |
3255 | + statsd_service = StatsDService(carbon_client, input_router, |
3256 | + options["flush-interval"]) |
3257 | + statsd_service.setServiceParent(root_service) |
3258 | + |
3259 | + statsd_server_protocol = StatsDServerProtocol( |
3260 | + input_router, |
3261 | + monitor_message=options["monitor-message"], |
3262 | + monitor_response=options["monitor-response"]) |
3263 | + |
3264 | + listener = UDPServer(options["listen-port"], statsd_server_protocol) |
3265 | + listener.setServiceParent(root_service) |
3266 | + |
3267 | + if options["listen-tcp-port"] is not None: |
3268 | + statsd_tcp_server_factory = StatsDTCPServerFactory( |
3269 | + input_router, |
3270 | + monitor_message=options["monitor-message"], |
3271 | + monitor_response=options["monitor-response"]) |
3272 | + |
3273 | + listener = TCPServer(options["listen-tcp-port"], |
3274 | + statsd_tcp_server_factory) |
3275 | + listener.setServiceParent(root_service) |
3276 | + |
3277 | + httpinfo_service = httpinfo.makeService(options, processor, statsd_service) |
3278 | + httpinfo_service.setServiceParent(root_service) |
3279 | + |
3280 | + return root_service |
3281 | |
3282 | === added directory 'txstatsd/stats' |
3283 | === added file 'txstatsd/stats/__init__.py' |
3284 | === added file 'txstatsd/stats/ewma.py' |
3285 | --- txstatsd/stats/ewma.py 1970-01-01 00:00:00 +0000 |
3286 | +++ txstatsd/stats/ewma.py 2012-05-14 16:57:19 +0000 |
3287 | @@ -0,0 +1,70 @@ |
3288 | + |
3289 | +""" |
3290 | +An exponentially-weighted moving average. |
3291 | + |
3292 | +See: |
3293 | +- U{UNIX Load Average Part 1: How It Works |
3294 | + <http://www.teamquest.com/pdfs/whitepaper/ldavg1.pdf>} |
3295 | +- U{UNIX Load Average Part 2: Not Your Average Average |
3296 | + <http://www.teamquest.com/pdfs/whitepaper/ldavg2.pdf>} |
3297 | +""" |
3298 | + |
3299 | +import math |
3300 | + |
3301 | + |
3302 | +class Ewma(object): |
3303 | + M1_ALPHA = 1 - math.exp(-5 / 60.0) |
3304 | + M5_ALPHA = 1 - math.exp(-5 / 60.0 / 5) |
3305 | + M15_ALPHA = 1 - math.exp(-5 / 60.0 / 15) |
3306 | + |
3307 | + @classmethod |
3308 | + def one_minute_ewma(cls): |
3309 | + """ |
3310 | + Creates a new C{Ewma} which is equivalent to the UNIX one minute |
3311 | + load average and which expects to be ticked every 5 seconds. |
3312 | + """ |
3313 | + return Ewma(Ewma.M1_ALPHA, 5) |
3314 | + |
3315 | + @classmethod |
3316 | + def five_minute_ewma(cls): |
3317 | + """ |
3318 | + Creates a new C{Ewma} which is equivalent to the UNIX five minute |
3319 | + load average and which expects to be ticked every 5 seconds. |
3320 | + """ |
3321 | + return Ewma(Ewma.M5_ALPHA, 5) |
3322 | + |
3323 | + @classmethod |
3324 | + def fifteen_minute_ewma(cls): |
3325 | + """ |
3326 | + Creates a new C{Ewma} which is equivalent to the UNIX fifteen |
3327 | + minute load average and which expects to be ticked every 5 seconds. |
3328 | + """ |
3329 | + return Ewma(Ewma.M15_ALPHA, 5) |
3330 | + |
3331 | + def __init__(self, alpha, interval): |
3332 | + """Create a new C{Ewma} with a specific smoothing constant. |
3333 | + |
3334 | + @param alpha: The smoothing constant. |
3335 | + @param interval: The expected tick interval in seconds. |
3336 | + """ |
3337 | + self.interval = interval |
3338 | + self.alpha = float(alpha) |
3339 | + |
3340 | + self.initialized = False |
3341 | + self.rate = 0.0 |
3342 | + self.uncounted = 0 |
3343 | + |
3344 | + def update(self, n): |
3345 | + """Update the moving average with a new value.""" |
3346 | + self.uncounted += n |
3347 | + |
3348 | + def tick(self): |
3349 | + """Mark the passage of time and decay the current rate accordingly.""" |
3350 | + count = self.uncounted |
3351 | + self.uncounted = 0 |
3352 | + instant_rate = float(count) / self.interval |
3353 | + if self.initialized: |
3354 | + self.rate += (self.alpha * (instant_rate - self.rate)) |
3355 | + else: |
3356 | + self.rate = instant_rate |
3357 | + self.initialized = True |
3358 | |
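A quick way to see the decay behaviour is to drive the EWMA the same way the one-minute tests later in this diff do. The following is a condensed, self-contained mirror of the `Ewma` class above (duplicated here so it runs standalone; it is not the module itself):

```python
import math


class Ewma(object):
    """Condensed standalone mirror of txstatsd.stats.ewma.Ewma."""

    M1_ALPHA = 1 - math.exp(-5 / 60.0)  # one-minute smoothing constant

    def __init__(self, alpha, interval):
        self.alpha = float(alpha)
        self.interval = interval  # expected tick interval, in seconds
        self.initialized = False
        self.rate = 0.0
        self.uncounted = 0

    def update(self, n):
        self.uncounted += n

    def tick(self):
        # Convert the events accumulated since the last tick into an
        # instantaneous rate, then fold it into the moving average.
        instant_rate = float(self.uncounted) / self.interval
        self.uncounted = 0
        if self.initialized:
            self.rate += self.alpha * (instant_rate - self.rate)
        else:
            self.rate = instant_rate
            self.initialized = True


ewma = Ewma(Ewma.M1_ALPHA, 5)
ewma.update(3)
ewma.tick()                 # first tick seeds the rate: 3 events / 5 s = 0.6/s
first_rate = ewma.rate
for _ in range(12):         # one minute of empty 5-second ticks
    ewma.tick()
decayed_rate = ewma.rate    # 0.6 * e**-1, about 0.2207 events/sec
```

After one minute of silence the rate has decayed by exactly a factor of e, which is what `TestEwmaOneMinute.test_one_minute` below asserts.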
3359 | === added file 'txstatsd/stats/exponentiallydecayingsample.py' |
3360 | --- txstatsd/stats/exponentiallydecayingsample.py 1970-01-01 00:00:00 +0000 |
3361 | +++ txstatsd/stats/exponentiallydecayingsample.py 2012-05-14 16:57:19 +0000 |
3362 | @@ -0,0 +1,124 @@ |
3363 | +import bisect |
3364 | +import math |
3365 | +import random |
3366 | +import time |
3367 | + |
3368 | + |
3369 | +class ExponentiallyDecayingSample(object): |
3370 | + """ |
3371 | + An exponentially-decaying random sample of values. Uses Cormode et |
3372 | + al's forward-decaying priority reservoir sampling method to produce a |
3373 | + statistically representative sample, exponentially biased towards newer |
3374 | + entries. |
3375 | + |
3376 | + See: |
3377 | + - U{Cormode et al. Forward Decay: A Practical Time Decay Model for |
3378 | + Streaming Systems. ICDE '09: Proceedings of the 2009 IEEE International |
3379 | + Conference on Data Engineering (2009) |
3380 | + <http://www.research.att.com/people/Cormode_Graham/ |
3381 | + library/publications/CormodeShkapenyukSrivastavaXu09.pdf>} |
3382 | + """ |
3383 | + |
3384 | + # 10 minutes (in seconds) |
3385 | + RESCALE_THRESHOLD = 60 * 10 |
3386 | + |
3387 | + def __init__(self, reservoir_size, alpha, wall_time=None): |
3388 | + """Creates a new C{ExponentiallyDecayingSample}. |
3389 | + |
3390 | + @param reservoir_size: The number of samples to keep in the sampling |
3391 | + reservoir. |
3392 | + @param alpha: The exponential decay factor; the higher this is, |
3393 | + the more biased the sample will be towards newer values. |
3394 | + """ |
3395 | + self._values = [] |
3396 | + self.alpha = alpha |
3397 | + self.reservoir_size = reservoir_size |
3398 | + |
3399 | + self.count = 0 |
3400 | + self.start_time = 0 |
3401 | + self.next_scale_time = 0 |
3402 | + |
3403 | + if wall_time is None: |
3404 | + wall_time = time.time |
3405 | + self._wall_time = wall_time |
3406 | + self.clear() |
3407 | + |
3408 | + def clear(self): |
3409 | + self._values = [] |
3410 | + self.count = 0 |
3411 | + self.start_time = self.tick() |
3412 | + self.next_scale_time = ( |
3413 | + self._wall_time() + self.RESCALE_THRESHOLD) |
3414 | + |
3415 | + def size(self): |
3416 | + return min(self.reservoir_size, self.count) |
3417 | + |
3418 | + def update(self, value, timestamp=None): |
3419 | + """Adds a value to the sample, optionally with a timestamp. |
3420 | + |
3421 | + @param value: The value to be added. |
3422 | + @param timestamp: The epoch timestamp of *value* in seconds |
3423 | + (optional; defaults to the current wall time). |
3424 | + |
3425 | + now = self._wall_time() |
3426 | + next = self.next_scale_time |
3427 | + if now >= next: |
3428 | + self.rescale(now, next) |
3429 | + |
3430 | + if timestamp is None: |
3431 | + timestamp = self.tick() |
3432 | + |
3433 | + priority = self.weight(timestamp - self.start_time) / random.random() |
3434 | + self.count += 1 |
3435 | + new_count = self.count |
3436 | + if new_count <= self.reservoir_size: |
3437 | + bisect.insort(self._values, (priority, value)) |
3438 | + else: |
3439 | + first = self._values[0][0] |
3440 | + |
3441 | + if first < priority: |
3442 | + bisect.insort(self._values, (priority, value)) |
3443 | + self._values = self._values[1:] |
3444 | + |
3445 | + def get_values(self): |
3446 | + return [v for (k, v) in self._values] |
3447 | + |
3448 | + def tick(self): |
3449 | + return self._wall_time() |
3450 | + |
3451 | + def weight(self, t): |
3452 | + return math.exp(self.alpha * t) |
3453 | + |
3454 | + def rescale(self, now, next): |
3455 | + """ |
3456 | + A common feature of the above techniques - indeed, the key technique |
3457 | + that allows us to track the decayed weights efficiently - is that they |
3458 | + maintain counts and other quantities based on g(ti - L), and only |
3459 | + scale by g(t - L) at query time. But while g(ti - L)/g(t-L) is |
3460 | + guaranteed to lie between zero and one, the intermediate values of |
3461 | + g(ti - L) could become very large. For polynomial functions, these |
3462 | + values should not grow too large, and should be effectively |
3463 | + represented in practice by floating point values without loss of |
3464 | + precision. For exponential functions, these values could grow quite |
3465 | + large as new values of (ti - L) become large, and potentially exceed |
3466 | + the capacity of common floating point types. However, since the values |
3467 | + stored by the algorithms are linear combinations of g values (scaled |
3468 | + sums), they can be rescaled relative to a new landmark. That is, by |
3469 | + the analysis of exponential decay in Section III-A, the choice of L |
3470 | + does not affect the final result. We can therefore multiply each value |
3471 | + based on L by a factor of exp(-alpha(L' - L)), and obtain the correct |
3472 | + value as if we had instead computed relative to a new landmark L' (and |
3473 | + then use this new L' at query time). This can be done with a linear |
3474 | + pass over whatever data structure is being used. |
3475 | + """ |
3476 | + |
3477 | + self.next_scale_time = ( |
3478 | + now + self.RESCALE_THRESHOLD) |
3479 | + old_start_time = self.start_time |
3480 | + self.start_time = self.tick() |
3481 | + |
3482 | + new_values = [] |
3483 | + for k, v in self._values: |
3484 | + nk = k * math.exp(-self.alpha * (self.start_time - old_start_time)) |
3485 | + new_values.append((nk, v)) |
3486 | + self._values = new_values |
3487 | |
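The `update()` path above boils down to forward-decaying priority sampling: each incoming value gets priority `exp(alpha * (t - L)) / u` with `u` uniform in (0, 1], and only the `reservoir_size` highest-priority entries survive. A minimal standalone sketch of that idea (illustrative names, no rescaling; not the module itself):

```python
import bisect
import math
import random


def forward_decay_sample(stream, size, alpha):
    """Keep the `size` items with the highest forward-decay priority.

    `stream` yields (timestamp, value) pairs; newer timestamps get
    exponentially larger weights, so the sample is biased towards
    recent entries (Cormode et al.).
    """
    values = []  # sorted list of (priority, value) pairs
    for t, v in stream:
        priority = math.exp(alpha * t) / random.random()
        if len(values) < size:
            bisect.insort(values, (priority, v))
        elif values[0][0] < priority:
            # New entry outranks the current minimum: swap it in.
            bisect.insort(values, (priority, v))
            values.pop(0)
    return [v for (_, v) in values]


random.seed(1)
# 1000 values arriving one second apart; with alpha=0.05 the weight of
# the newest entry is e**50 times that of the oldest, so the surviving
# sample is dominated by recent values.
sample = forward_decay_sample(((t, t) for t in range(1000)), 100, 0.05)
```

Without the `rescale()` step, `exp(alpha * t)` eventually overflows floats, which is exactly the problem the quoted passage in `rescale()`'s docstring addresses.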
3488 | === added file 'txstatsd/stats/uniformsample.py' |
3489 | --- txstatsd/stats/uniformsample.py 1970-01-01 00:00:00 +0000 |
3490 | +++ txstatsd/stats/uniformsample.py 2012-05-14 16:57:19 +0000 |
3491 | @@ -0,0 +1,45 @@ |
3492 | + |
3493 | +import random |
3494 | +import sys |
3495 | + |
3496 | + |
3497 | +class UniformSample(object): |
3498 | + """ |
3499 | + A random sample of a stream of values. Uses Vitter's Algorithm R to |
3500 | + produce a statistically representative sample. |
3501 | + |
3502 | + See: |
3503 | + - U{Random Sampling with a Reservoir |
3504 | + <http://www.cs.umd.edu/~samir/498/vitter.pdf>} |
3505 | + """ |
3506 | + |
3507 | + def __init__(self, reservoir_size): |
3508 | + """Creates a new C{UniformSample}. |
3509 | + |
3510 | + @param reservoir_size: The number of samples to keep in the sampling |
3511 | + reservoir. |
3512 | + """ |
3513 | + self._values = [0 for i in range(reservoir_size)] |
3514 | + self._count = 0 |
3515 | + self.clear() |
3516 | + |
3517 | + def clear(self): |
3518 | + self._values = [0 for i in range(len(self._values))] |
3519 | + self._count = 0 |
3520 | + |
3521 | + def size(self): |
3522 | + c = self._count |
3523 | + return len(self._values) if c > len(self._values) else c |
3524 | + |
3525 | + def update(self, value): |
3526 | + self._count += 1 |
3527 | + if self._count <= len(self._values): |
3528 | + self._values[self._count - 1] = value |
3529 | + else: |
3530 | + r = random.randint(1, sys.maxint) % self._count |
3531 | + if r < len(self._values): |
3532 | + self._values[r] = value |
3533 | + |
3534 | + def get_values(self): |
3535 | + s = self.size() |
3536 | + return [self._values[i] for i in range(0, s)] |
3537 | |
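`UniformSample.update()` above is Vitter's Algorithm R. A standalone sketch of the same scheme (using `random.randrange` rather than the `randint`-modulo trick in the module, purely for illustration):

```python
import random


def reservoir_sample(stream, size):
    """Algorithm R: every stream item ends up in the reservoir with
    equal probability size/n, using only O(size) memory."""
    reservoir = []
    for i, value in enumerate(stream, 1):
        if i <= size:
            reservoir.append(value)       # fill phase
        else:
            r = random.randrange(i)       # uniform in [0, i)
            if r < size:
                reservoir[r] = value      # replace a random slot
    return reservoir


random.seed(1)
sample = reservoir_sample(range(10000), 100)
```

The i-th item is kept with probability size/i, and surviving items are later displaced with just the right probability, so the final 100-element sample is uniform over all 10000 inputs.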
3538 | === added directory 'txstatsd/tests' |
3539 | === added file 'txstatsd/tests/__init__.py' |
3540 | === added file 'txstatsd/tests/helper.py' |
3541 | --- txstatsd/tests/helper.py 1970-01-01 00:00:00 +0000 |
3542 | +++ txstatsd/tests/helper.py 2012-05-14 16:57:19 +0000 |
3543 | @@ -0,0 +1,18 @@ |
3544 | +from txstatsd.metrics.metrics import Metrics |
3545 | + |
3546 | +class FakeStatsDClient(object): |
3547 | + """A fake C{StatsDClient} that simply appends to metrics.data on write.""" |
3548 | + |
3549 | + def __init__(self, metrics): |
3550 | + self.metrics = metrics |
3551 | + |
3552 | + def write(self, data): |
3553 | + self.metrics.data.append(data) |
3554 | + |
3555 | + |
3556 | +class FakeMetrics(Metrics): |
3557 | + """A fake C{Metrics} that simply stores metrics locally.""" |
3558 | + |
3559 | + def __init__(self, namespace=""): |
3560 | + Metrics.__init__(self, FakeStatsDClient(self), namespace=namespace) |
3561 | + self.data = [] |
3562 | |
3563 | === added directory 'txstatsd/tests/metrics' |
3564 | === added file 'txstatsd/tests/metrics/__init__.py' |
3565 | --- txstatsd/tests/metrics/__init__.py 1970-01-01 00:00:00 +0000 |
3566 | +++ txstatsd/tests/metrics/__init__.py 2012-05-14 16:57:19 +0000 |
3567 | @@ -0,0 +1,2 @@ |
3568 | + |
3569 | + |
3570 | |
3571 | === added file 'txstatsd/tests/metrics/test_derive.py' |
3572 | --- txstatsd/tests/metrics/test_derive.py 1970-01-01 00:00:00 +0000 |
3573 | +++ txstatsd/tests/metrics/test_derive.py 2012-05-14 16:57:19 +0000 |
3574 | @@ -0,0 +1,77 @@ |
3575 | +import itertools as it, operator as op, functools as ft |
3576 | + |
3577 | +import random |
3578 | + |
3579 | +from twisted.trial.unittest import TestCase |
3580 | +from twisted.plugin import getPlugins |
3581 | +from twisted.plugins import derive_plugin |
3582 | +from txstatsd.itxstatsd import IMetricFactory |
3583 | + |
3584 | + |
3585 | +class TestDeriveMetricReporter(TestCase): |
3586 | + |
3587 | + mt = derive_plugin.DeriveMetricFactory.metric_type |
3588 | + |
3589 | + |
3590 | + def test_fastpoll(self): |
3591 | + random.seed(1) |
3592 | + |
3593 | + wall_time = random.randint(1, 1000) |
3594 | + reporter = derive_plugin.DeriveMetricReporter( |
3595 | + 'test', wall_time_func=lambda: wall_time ) |
3596 | + self.assertEquals(reporter.flush(random.randint(1, 1000), wall_time), list()) |
3597 | + |
3598 | + for j in xrange(random.randint(1, 100)): |
3599 | + reporter.process([random.random() * 1e3, self.mt]) |
3600 | + self.assertEquals(reporter.flush(random.randint(1, 1000), wall_time), list()) |
3601 | + |
3602 | + |
3603 | + def test_interface(self): |
3604 | + random.seed(1) |
3605 | + |
3606 | + wall_time = random.random() * 1e6 |
3607 | + reporter = derive_plugin.DeriveMetricReporter( |
3608 | + 'test', prefix='some.prefix', wall_time_func=lambda: wall_time ) |
3609 | + |
3610 | + count = random.randint(1, 100) |
3611 | + for j in xrange(count): |
3612 | + wall_time += 1 |
3613 | + reporter.process([random.random() * 1e3, self.mt]) |
3614 | + |
3615 | + ts = random.random() * 1e6 |
3616 | + values = reporter.flush(count, ts) |
3617 | + self.assertEquals(len(values), 2) |
3618 | + self.assertEquals(set(it.imap(len, values)), {3}) |
3619 | + self.assertEquals(set(it.imap(op.itemgetter(2), values)), {ts}) |
3620 | + self.assertEquals( |
3621 | + set(it.imap(op.itemgetter(0), values)), |
3622 | + {'some.prefix.test.count', 'some.prefix.test.rate'} ) |
3623 | + |
3624 | + |
3625 | + def test_values(self): |
3626 | + random.seed(1) |
3627 | + |
3628 | + wall_time = 0 |
3629 | + reporter = derive_plugin.DeriveMetricReporter( |
3630 | + 'test', wall_time_func=lambda: wall_time ) |
3631 | + |
3632 | + val_count = 0 |
3633 | + for i in xrange(100): |
3634 | + rate = (random.random() - 0.5) * 1e3 |
3635 | + count = random.randint(2, 100) |
3636 | + |
3637 | + for j in xrange(1, count): |
3638 | + wall_time += 1 |
3639 | + reporter.process([rate, self.mt]) |
3640 | + val_count += rate |
3641 | + |
3642 | + values = dict(it.imap(op.itemgetter(0, 1), reporter.flush(count, wall_time))) |
3643 | + self.assertTrue(abs(values['test.count'] - val_count) < count**-1) |
3644 | + self.assertTrue(abs(values['test.rate'] - rate) < count**-1) |
3645 | + |
3646 | + |
3647 | +class TestPlugin(TestCase): |
3648 | + |
3649 | + def test_factory(self): |
3650 | + self.assertTrue(derive_plugin.derive_metric_factory in |
3651 | + list(getPlugins(IMetricFactory))) |
3652 | |
3653 | === added file 'txstatsd/tests/metrics/test_distinct.py' |
3654 | --- txstatsd/tests/metrics/test_distinct.py 1970-01-01 00:00:00 +0000 |
3655 | +++ txstatsd/tests/metrics/test_distinct.py 2012-05-14 16:57:19 +0000 |
3656 | @@ -0,0 +1,94 @@ |
3657 | +# Copyright (C) 2011 Canonical |
3658 | +# All Rights Reserved |
3659 | +import random |
3660 | + |
3661 | +from scipy.stats import chi2 |
3662 | + |
3663 | +from twisted.trial.unittest import TestCase |
3664 | +from twisted.plugin import getPlugins |
3665 | +from twisted.plugins import distinct_plugin |
3666 | +import txstatsd.metrics.distinctmetric as distinct |
3667 | +from txstatsd.itxstatsd import IMetricFactory |
3668 | + |
3669 | + |
3670 | +class TestHash(TestCase): |
3671 | + |
3672 | + def test_hash_chars(self): |
3673 | + "For one table, all chars map to different chars" |
3674 | + results = set() |
3675 | + for c in range(256): |
3676 | + random.seed(1) |
3677 | + h = distinct.hash(chr(c)) |
3678 | + results.add(h) |
3679 | + self.assertEquals(len(results), 256) |
3680 | + |
3681 | + def test_chi_square(self): |
3682 | + N = 10000 |
3683 | + |
3684 | + for (bits, buckets) in [(-1, 1024), (24, 256), |
3685 | + (16, 256), (8, 256), (0, 256)]: |
3686 | + bins = [0] * buckets |
3687 | + for i in range(N): |
3688 | + v = distinct.hash(str(i)) |
3689 | + if bits < 0: |
3690 | + bin = v / (0xFFFFFFFF / buckets) |
3691 | + else: |
3692 | + bin = (v >> bits) & 0xFF |
3693 | + bins[bin] += 1 |
3694 | + value = sum(((x - N / buckets) ** 2) / (N / buckets) for x in bins) |
3695 | + pval = chi2.cdf(value, N) |
3696 | + if pval > 0.5: |
3697 | + print bins, pval |
3698 | + self.assertTrue(pval < 0.5, "bits %s, pval == %s" % (bits, pval)) |
3699 | + test_chi_square.skip = "Takes too long to run every time." |
3700 | + |
3701 | + |
3702 | +class TestZeros(TestCase): |
3703 | + |
3704 | + def test_zeros(self): |
3705 | + self.assertEquals(distinct.zeros(1), 0) |
3706 | + self.assertEquals(distinct.zeros(2), 1) |
3707 | + self.assertEquals(distinct.zeros(4), 2) |
3708 | + self.assertEquals(distinct.zeros(5), 0) |
3709 | + self.assertEquals(distinct.zeros(8), 3) |
3710 | + self.assertEquals(distinct.zeros(9), 0) |
3711 | + |
3712 | + |
3713 | +class TestDistinct(TestCase): |
3714 | + |
3715 | + def test_all(self): |
3716 | + random.seed(1) |
3717 | + |
3718 | + for r in [1000, 10000]: |
3719 | + cd = distinct.SlidingDistinctCounter(32, 32) |
3720 | + for i in range(r): |
3721 | + cd.add(1, str(i)) |
3722 | + error = abs(cd.distinct() - r) |
3723 | + self.assertTrue(error < 0.15 * r) |
3724 | + |
3725 | + |
3726 | +class TestDistinctMetricReporter(TestCase): |
3727 | + |
3728 | + def test_reports(self): |
3729 | + random.seed(1) |
3730 | + _wall_time = [0] |
3731 | + def _time(): |
3732 | + return _wall_time[0] |
3733 | + |
3734 | + dmr = distinct.DistinctMetricReporter("test", wall_time_func=_time) |
3735 | + for i in range(3000): |
3736 | + _wall_time[0] = i * 50 |
3737 | + dmr.update(str(i)) |
3738 | + now = _time() |
3739 | + self.assertTrue(abs(dmr.count() - 3000) < 600) |
3740 | + self.assertTrue(abs(dmr.count_1min(now) - 1) < 2) |
3741 | + self.assertTrue(abs(dmr.count_1hour(now) - 72) < 15) |
3742 | + self.assertTrue(abs(dmr.count_1day(now) - 1728) < 500) |
3743 | + |
3744 | + |
3745 | +class TestPlugin(TestCase): |
3746 | + |
3747 | + def test_factory(self): |
3748 | + self.assertTrue(distinct_plugin.distinct_metric_factory in |
3749 | + list(getPlugins(IMetricFactory))) |
3750 | + |
3751 | |
3752 | === added file 'txstatsd/tests/metrics/test_histogrammetric.py' |
3753 | --- txstatsd/tests/metrics/test_histogrammetric.py 1970-01-01 00:00:00 +0000 |
3754 | +++ txstatsd/tests/metrics/test_histogrammetric.py 2012-05-14 16:57:19 +0000 |
3755 | @@ -0,0 +1,91 @@ |
3756 | + |
3757 | +import math |
3758 | +from unittest import TestCase |
3759 | + |
3760 | +from txstatsd.metrics.histogrammetric import HistogramMetricReporter |
3761 | +from txstatsd.stats.uniformsample import UniformSample |
3762 | + |
3763 | + |
3764 | +class TestHistogramReporterMetric(TestCase): |
3765 | + |
3766 | + def test_histogram_with_zero_recorded_values(self): |
3767 | + sample = UniformSample(100) |
3768 | + histogram = HistogramMetricReporter(sample) |
3769 | + |
3770 | + self.assertEqual(histogram.count, 0, 'Should have a count of 0') |
3771 | + self.assertEqual(histogram.max(), 0, |
3772 | + 'Should have a max of 0') |
3773 | + self.assertEqual(histogram.min(), 0, |
3774 | + 'Should have a min of 0') |
3775 | + |
3776 | + self.assertEqual(histogram.mean(), 0, |
3777 | + 'Should have a mean of 0') |
3778 | + |
3779 | + self.assertEqual(histogram.std_dev(), 0, |
3780 | + 'Should have a standard deviation of 0') |
3781 | + |
3782 | + percentiles = histogram.percentiles(0.5, 0.75, 0.99) |
3783 | + self.assertTrue( |
3784 | + (math.fabs(percentiles[0] - 0) < 0.01), |
3785 | + 'Should calculate percentiles') |
3786 | + self.assertTrue( |
3787 | + (math.fabs(percentiles[1] - 0) < 0.01), |
3788 | + 'Should calculate percentiles') |
3789 | + self.assertTrue( |
3790 | + (math.fabs(percentiles[2] - 0) < 0.01), |
3791 | + 'Should calculate percentiles') |
3792 | + |
3793 | + self.assertEqual(len(histogram.get_values()), 0, |
3794 | + 'Should have no values') |
3795 | + |
3796 | + def test_histogram_of_numbers_1_through_10000(self): |
3797 | + sample = UniformSample(100000) |
3798 | + histogram = HistogramMetricReporter(sample) |
3799 | + for i in range(1, 10001): |
3800 | + histogram.update(i) |
3801 | + |
3802 | + self.assertEqual(histogram.count, 10000, |
3803 | + 'Should have a count of 10000') |
3804 | + |
3805 | + self.assertEqual(histogram.max(), 10000, |
3806 | + 'Should have a max of 10000') |
3807 | + self.assertEqual(histogram.min(), 1, |
3808 | + 'Should have a min of 1') |
3809 | + |
3810 | + self.assertTrue( |
3811 | + (math.fabs(histogram.mean() - 5000.5) < 0.01), |
3812 | + 'Should have a mean value of 5000.5') |
3813 | + |
3814 | + self.assertTrue( |
3815 | + (math.fabs(histogram.std_dev() - 2886.89) < 0.01), |
3816 | + 'Should have a standard deviation of 2886.89') |
3817 | + |
3818 | + percentiles = histogram.percentiles(0.5, 0.75, 0.99) |
3819 | + self.assertTrue( |
3820 | + (math.fabs(percentiles[0] - 5000.5) < 0.01), |
3821 | + 'Should calculate percentiles') |
3822 | + self.assertTrue( |
3823 | + (math.fabs(percentiles[1] - 7500.75) < 0.01), |
3824 | + 'Should calculate percentiles') |
3825 | + self.assertTrue( |
3826 | + (math.fabs(percentiles[2] - 9900.99) < 0.01), |
3827 | + 'Should calculate percentiles') |
3828 | + |
3829 | + values = [i for i in range(1, 10001)] |
3830 | + self.assertEqual(histogram.get_values(), values, |
3831 | + 'Should have 10000 values') |
3832 | + |
3833 | + def test_histogram_histogram(self): |
3834 | + sample = UniformSample(100000) |
3835 | + histogram = HistogramMetricReporter(sample) |
3836 | + for i in range(1001, 11001): |
3837 | + histogram.update(i) |
3838 | + |
3839 | + hist = histogram.histogram() |
3840 | + self.assertEquals(sum(hist), 10000) |
3841 | + |
3842 | + total = sum(hist) |
3843 | + binsize = int(total / len(hist)) |
3844 | + for i in hist: |
3845 | + self.assertTrue(abs(i - binsize) <= 1) |
3846 | + |
3847 | |
3848 | === added file 'txstatsd/tests/metrics/test_timermetric.py' |
3849 | --- txstatsd/tests/metrics/test_timermetric.py 1970-01-01 00:00:00 +0000 |
3850 | +++ txstatsd/tests/metrics/test_timermetric.py 2012-05-14 16:57:19 +0000 |
3851 | @@ -0,0 +1,139 @@ |
3852 | + |
3853 | +import math |
3854 | + |
3855 | +from twisted.trial.unittest import TestCase |
3856 | + |
3857 | +from txstatsd.metrics.timermetric import TimerMetricReporter |
3858 | + |
3859 | + |
3860 | +class TestBlankTimerMetric(TestCase): |
3861 | + def setUp(self): |
3862 | + self.timer = TimerMetricReporter('test') |
3863 | + self.timer.tick() |
3864 | + |
3865 | + def test_max(self): |
3866 | + self.assertEqual( |
3867 | + self.timer.max(), 0, |
3868 | + 'Should have a max of zero') |
3869 | + |
3870 | + def test_min(self): |
3871 | + self.assertEqual( |
3872 | + self.timer.min(), 0, |
3873 | + 'Should have a min of zero') |
3874 | + |
3875 | + def test_mean(self): |
3876 | + self.assertEqual( |
3877 | + self.timer.mean(), 0, |
3878 | + 'Should have a mean of zero') |
3879 | + |
3880 | + def test_count(self): |
3881 | + self.assertEqual( |
3882 | + self.timer.count(), 0, |
3883 | + 'Should have a count of zero') |
3884 | + |
3885 | + def test_std_dev(self): |
3886 | + self.assertEqual( |
3887 | + self.timer.std_dev(), 0, |
3888 | + 'Should have a standard deviation of zero') |
3889 | + |
3890 | + def test_percentiles(self): |
3891 | + percentiles = self.timer.percentiles(0.5, 0.95, 0.98, 0.99, 0.999) |
3892 | + self.assertEqual( |
3893 | + percentiles[0], 0, |
3894 | + 'Should have median of zero') |
3895 | + self.assertEqual( |
3896 | + percentiles[1], 0, |
3897 | + 'Should have p95 of zero') |
3898 | + self.assertEqual( |
3899 | + percentiles[2], 0, |
3900 | + 'Should have p98 of zero') |
3901 | + self.assertEqual( |
3902 | + percentiles[3], 0, |
3903 | + 'Should have p99 of zero') |
3904 | + self.assertEqual( |
3905 | + percentiles[4], 0, |
3906 | + 'Should have p99.9 of zero') |
3907 | + |
3908 | + def test_mean_rate(self): |
3909 | + self.assertEqual( |
3910 | + self.timer.mean_rate(), 0, |
3911 | + 'Should have a mean rate of zero') |
3912 | + |
3913 | + def test_one_minute_rate(self): |
3914 | + self.assertEqual( |
3915 | + self.timer.one_minute_rate(), 0, |
3916 | + 'Should have a one-minute rate of zero') |
3917 | + |
3918 | + def test_five_minute_rate(self): |
3919 | + self.assertEqual( |
3920 | + self.timer.five_minute_rate(), 0, |
3921 | + 'Should have a five-minute rate of zero') |
3922 | + |
3923 | + def test_fifteen_minute_rate(self): |
3924 | + self.assertEqual( |
3925 | + self.timer.fifteen_minute_rate(), 0, |
3926 | + 'Should have a fifteen-minute rate of zero') |
3927 | + |
3928 | + def test_no_values(self): |
3929 | + self.assertEqual( |
3930 | + len(self.timer.get_values()), 0, |
3931 | + 'Should have no values') |
3932 | + |
3933 | + |
3934 | +class TestTimingSeriesEvents(TestCase): |
3935 | + def setUp(self): |
3936 | + self.timer = TimerMetricReporter('test') |
3937 | + self.timer.tick() |
3938 | + self.timer.update(10) |
3939 | + self.timer.update(20) |
3940 | + self.timer.update(20) |
3941 | + self.timer.update(30) |
3942 | + self.timer.update(40) |
3943 | + |
3944 | + def test_count(self): |
3945 | + self.assertEqual( |
3946 | + self.timer.count(), 5, |
3947 | + 'Should record the count') |
3948 | + |
3949 | + def test_min(self): |
3950 | + self.assertTrue( |
3951 | + (math.fabs(self.timer.min() - 10.0) < 0.001), |
3952 | + 'Should calculate the minimum duration') |
3953 | + |
3954 | + def test_max(self): |
3955 | + self.assertTrue( |
3956 | + (math.fabs(self.timer.max() - 40.0) < 0.001), |
3957 | + 'Should calculate the maximum duration') |
3958 | + |
3959 | + def test_mean(self): |
3960 | + self.assertTrue( |
3961 | + (math.fabs(self.timer.mean() - 24.0) < 0.001), |
3962 | + 'Should calculate the mean duration') |
3963 | + |
3964 | + def test_std_dev(self): |
3965 | + self.assertTrue( |
3966 | + (math.fabs(self.timer.std_dev() - 11.401) < 0.001), |
3967 | + 'Should calculate the standard deviation') |
3968 | + |
3969 | + def test_percentiles(self): |
3970 | + percentiles = self.timer.percentiles(0.5, 0.95, 0.98, 0.99, 0.999) |
3971 | + self.assertTrue( |
3972 | + (math.fabs(percentiles[0] - 20.0) < 0.001), |
3973 | + 'Should calculate the median') |
3974 | + self.assertTrue( |
3975 | + (math.fabs(percentiles[1] - 40.0) < 0.001), |
3976 | + 'Should calculate the p95') |
3977 | + self.assertTrue( |
3978 | + (math.fabs(percentiles[2] - 40.0) < 0.001), |
3979 | + 'Should calculate the p98') |
3980 | + self.assertTrue( |
3981 | + (math.fabs(percentiles[3] - 40.0) < 0.001), |
3982 | + 'Should calculate the p99') |
3983 | + self.assertTrue( |
3984 | + (math.fabs(percentiles[4] - 40.0) < 0.001), |
3985 | + 'Should calculate the p999') |
3986 | + |
3987 | + def test_values(self): |
3988 | + self.assertEqual( |
3989 | + set(self.timer.get_values()), set([10, 20, 20, 30, 40]), |
3990 | + 'Should have a series of values') |
3991 | |
3992 | === added directory 'txstatsd/tests/stats' |
3993 | === added file 'txstatsd/tests/stats/__init__.py' |
3994 | === added file 'txstatsd/tests/stats/test_ewma.py' |
3995 | --- txstatsd/tests/stats/test_ewma.py 1970-01-01 00:00:00 +0000 |
3996 | +++ txstatsd/tests/stats/test_ewma.py 2012-05-14 16:57:19 +0000 |
3997 | @@ -0,0 +1,315 @@ |
3998 | + |
3999 | +import math |
4000 | +from unittest import TestCase |
4001 | + |
4002 | +from txstatsd.stats.ewma import Ewma |
4003 | + |
4004 | + |
4005 | +def mark_minutes(minutes, ewma): |
4006 | + for i in range(1, minutes * 60, 5): |
4007 | + ewma.tick() |
4008 | + |
4009 | +class TestEwmaOneMinute(TestCase): |
4010 | + def setUp(self): |
4011 | + self.ewma = Ewma.one_minute_ewma() |
4012 | + self.ewma.update(3) |
4013 | + self.ewma.tick() |
4014 | + |
4015 | + def test_first_tick(self): |
4016 | + self.assertTrue( |
4017 | + (math.fabs(self.ewma.rate - 0.6) < 0.000001), |
4018 | + 'Should have a rate of 0.6 events/sec after the first tick') |
4019 | + |
4020 | + def test_one_minute(self): |
4021 | + mark_minutes(1, self.ewma) |
4022 | + self.assertTrue( |
4023 | + (math.fabs(self.ewma.rate - 0.22072766) < 0.00000001), |
4024 | + 'Should have a rate of 0.22072766 events/sec after 1 minute') |
4025 | + |
4026 | + def test_two_minutes(self): |
4027 | + mark_minutes(2, self.ewma) |
4028 | + self.assertTrue( |
4029 | + (math.fabs(self.ewma.rate - 0.08120117) < 0.00000001), |
4030 | + 'Should have a rate of 0.08120117 events/sec after 2 minutes') |
4031 | + |
4032 | + def test_three_minutes(self): |
4033 | + mark_minutes(3, self.ewma) |
4034 | + self.assertTrue( |
4035 | + (math.fabs(self.ewma.rate - 0.02987224) < 0.00000001), |
4036 | + 'Should have a rate of 0.02987224 events/sec after 3 minutes') |
4037 | + |
4038 | + def test_four_minutes(self): |
4039 | + mark_minutes(4, self.ewma) |
4040 | + self.assertTrue( |
4041 | + (math.fabs(self.ewma.rate - 0.01098938) < 0.00000001), |
4042 | + 'Should have a rate of 0.01098938 events/sec after 4 minutes') |
4043 | + |
4044 | + def test_five_minutes(self): |
4045 | + mark_minutes(5, self.ewma) |
4046 | + self.assertTrue( |
4047 | + (math.fabs(self.ewma.rate - 0.00404277) < 0.00000001), |
4048 | + 'Should have a rate of 0.00404277 events/sec after 5 minutes') |
4049 | + |
4050 | + def test_six_minutes(self): |
4051 | + mark_minutes(6, self.ewma) |
4052 | + self.assertTrue( |
4053 | + (math.fabs(self.ewma.rate - 0.00148725) < 0.00000001), |
4054 | + 'Should have a rate of 0.00148725 events/sec after 6 minutes') |
4055 | + |
4056 | + def test_seven_minutes(self): |
4057 | + mark_minutes(7, self.ewma) |
4058 | + self.assertTrue( |
4059 | + (math.fabs(self.ewma.rate - 0.00054713) < 0.00000001), |
4060 | + 'Should have a rate of 0.00054713 events/sec after 7 minutes') |
4061 | + |
4062 | + def test_eight_minutes(self): |
4063 | + mark_minutes(8, self.ewma) |
4064 | + self.assertTrue( |
4065 | + (math.fabs(self.ewma.rate - 0.00020128) < 0.00000001), |
4066 | + 'Should have a rate of 0.00020128 events/sec after 8 minutes') |
4067 | + |
4068 | + def test_nine_minutes(self): |
4069 | + mark_minutes(9, self.ewma) |
4070 | + self.assertTrue( |
4071 | + (math.fabs(self.ewma.rate - 0.00007405) < 0.00000001), |
4072 | + 'Should have a rate of 0.00007405 events/sec after 9 minutes') |
4073 | + |
4074 | + def test_ten_minutes(self): |
4075 | + mark_minutes(10, self.ewma) |
4076 | + self.assertTrue( |
4077 | + (math.fabs(self.ewma.rate - 0.00002724) < 0.00000001), |
4078 | + 'Should have a rate of 0.00002724 events/sec after 10 minutes') |
4079 | + |
4080 | + def test_eleven_minutes(self): |
4081 | + mark_minutes(11, self.ewma) |
4082 | + self.assertTrue( |
4083 | + (math.fabs(self.ewma.rate - 0.00001002) < 0.00000001), |
4084 | + 'Should have a rate of 0.00001002 events/sec after 11 minutes') |
4085 | + |
4086 | + def test_twelve_minutes(self): |
4087 | + mark_minutes(12, self.ewma) |
4088 | + self.assertTrue( |
4089 | + (math.fabs(self.ewma.rate - 0.00000369) < 0.00000001), |
4090 | + 'Should have a rate of 0.00000369 events/sec after 12 minutes') |
4091 | + |
4092 | + def test_thirteen_minutes(self): |
4093 | + mark_minutes(13, self.ewma) |
4094 | + self.assertTrue( |
4095 | + (math.fabs(self.ewma.rate - 0.00000136) < 0.00000001), |
4096 | + 'Should have a rate of 0.00000136 events/sec after 13 minutes') |
4097 | + |
4098 | + def test_fourteen_minutes(self): |
4099 | + mark_minutes(14, self.ewma) |
4100 | + self.assertTrue( |
4101 | + (math.fabs(self.ewma.rate - 0.00000050) < 0.00000001), |
4102 | + 'Should have a rate of 0.00000050 events/sec after 14 minutes') |
4103 | + |
4104 | + def test_fifteen_minutes(self): |
4105 | + mark_minutes(15, self.ewma) |
4106 | + self.assertTrue( |
4107 | + (math.fabs(self.ewma.rate - 0.00000018) < 0.00000001), |
4108 | + 'Should have a rate of 0.00000018 events/sec after 15 minutes') |
4109 | + |
4110 | + |
4111 | +class TestEwmaFiveMinute(TestCase): |
4112 | + def setUp(self): |
4113 | + self.ewma = Ewma.five_minute_ewma() |
4114 | + self.ewma.update(3) |
4115 | + self.ewma.tick() |
4116 | + |
4117 | + def test_first_tick(self): |
4118 | + self.assertTrue( |
4119 | + (math.fabs(self.ewma.rate - 0.6) < 0.000001), |
4120 | + 'Should have a rate of 0.6 events/sec after the first tick') |
4121 | + |
4122 | + def test_one_minute(self): |
4123 | + mark_minutes(1, self.ewma) |
4124 | + self.assertTrue( |
4125 | + (math.fabs(self.ewma.rate - 0.49123845) < 0.00000001), |
4126 | + 'Should have a rate of 0.49123845 events/sec after 1 minute') |
4127 | + |
4128 | + def test_two_minutes(self): |
4129 | + mark_minutes(2, self.ewma) |
4130 | + self.assertTrue( |
4131 | + (math.fabs(self.ewma.rate - 0.40219203) < 0.00000001), |
4132 | + 'Should have a rate of 0.40219203 events/sec after 2 minutes') |
4133 | + |
4134 | + def test_three_minutes(self): |
4135 | + mark_minutes(3, self.ewma) |
4136 | + self.assertTrue( |
4137 | + (math.fabs(self.ewma.rate - 0.32928698) < 0.00000001), |
4138 | + 'Should have a rate of 0.32928698 events/sec after 3 minutes') |
4139 | + |
4140 | + def test_four_minutes(self): |
4141 | + mark_minutes(4, self.ewma) |
4142 | + self.assertTrue( |
4143 | + (math.fabs(self.ewma.rate - 0.26959738) < 0.00000001), |
4144 | + 'Should have a rate of 0.26959738 events/sec after 4 minutes') |
4145 | + |
4146 | + def test_five_minutes(self): |
4147 | + mark_minutes(5, self.ewma) |
4148 | + self.assertTrue( |
4149 | + (math.fabs(self.ewma.rate - 0.22072766) < 0.00000001), |
4150 | + 'Should have a rate of 0.22072766 events/sec after 5 minutes') |
4151 | + |
4152 | + def test_six_minutes(self): |
4153 | + mark_minutes(6, self.ewma) |
4154 | + self.assertTrue( |
4155 | + (math.fabs(self.ewma.rate - 0.18071653) < 0.00000001), |
4156 | + 'Should have a rate of 0.18071653 events/sec after 6 minutes') |
4157 | + |
4158 | + def test_seven_minutes(self): |
4159 | + mark_minutes(7, self.ewma) |
4160 | + self.assertTrue( |
4161 | + (math.fabs(self.ewma.rate - 0.14795818) < 0.00000001), |
4162 | + 'Should have a rate of 0.14795818 events/sec after 7 minutes') |
4163 | + |
4164 | + def test_eight_minutes(self): |
4165 | + mark_minutes(8, self.ewma) |
4166 | + self.assertTrue( |
4167 | + (math.fabs(self.ewma.rate - 0.12113791) < 0.00000001), |
4168 | + 'Should have a rate of 0.12113791 events/sec after 8 minutes') |
4169 | + |
4170 | + def test_nine_minutes(self): |
4171 | + mark_minutes(9, self.ewma) |
4172 | + self.assertTrue( |
4173 | + (math.fabs(self.ewma.rate - 0.09917933) < 0.00000001), |
4174 | + 'Should have a rate of 0.09917933 events/sec after 9 minutes') |
4175 | + |
4176 | + def test_ten_minutes(self): |
4177 | + mark_minutes(10, self.ewma) |
4178 | + self.assertTrue( |
4179 | + (math.fabs(self.ewma.rate - 0.08120117) < 0.00000001), |
4180 | + 'Should have a rate of 0.08120117 events/sec after 10 minutes') |
4181 | + |
4182 | + def test_eleven_minutes(self): |
4183 | + mark_minutes(11, self.ewma) |
4184 | + self.assertTrue( |
4185 | + (math.fabs(self.ewma.rate - 0.06648190) < 0.00000001), |
4186 | + 'Should have a rate of 0.06648190 events/sec after 11 minutes') |
4187 | + |
4188 | + def test_twelve_minutes(self): |
4189 | + mark_minutes(12, self.ewma) |
4190 | + self.assertTrue( |
4191 | + (math.fabs(self.ewma.rate - 0.05443077) < 0.00000001), |
4192 | + 'Should have a rate of 0.05443077 events/sec after 12 minutes') |
4193 | + |
4194 | + def test_thirteen_minutes(self): |
4195 | + mark_minutes(13, self.ewma) |
4196 | + self.assertTrue( |
4197 | + (math.fabs(self.ewma.rate - 0.04456415) < 0.00000001), |
4198 | + 'Should have a rate of 0.04456415 events/sec after 13 minutes') |
4199 | + |
4200 | + def test_fourteen_minutes(self): |
4201 | + mark_minutes(14, self.ewma) |
4202 | + self.assertTrue( |
4203 | + (math.fabs(self.ewma.rate - 0.03648604) < 0.00000001), |
4204 | + 'Should have a rate of 0.03648604 events/sec after 14 minutes') |
4205 | + |
4206 | + def test_fifteen_minutes(self): |
4207 | + mark_minutes(15, self.ewma) |
4208 | + self.assertTrue( |
4209 | + (math.fabs(self.ewma.rate - 0.02987224) < 0.00000001), |
4210 | + 'Should have a rate of 0.02987224 events/sec after 15 minutes') |
4211 | + |
4212 | + |
4213 | +class TestEwmaFifteenMinute(TestCase): |
4214 | + def setUp(self): |
4215 | + self.ewma = Ewma.fifteen_minute_ewma() |
4216 | + self.ewma.update(3) |
4217 | + self.ewma.tick() |
4218 | + |
4219 | + def test_first_tick(self): |
4220 | + self.assertTrue( |
4221 | + (math.fabs(self.ewma.rate - 0.6) < 0.000001), |
4222 | + 'Should have a rate of 0.6 events/sec after the first tick') |
4223 | + |
4224 | + def test_one_minute(self): |
4225 | + mark_minutes(1, self.ewma) |
4226 | + self.assertTrue( |
4227 | + (math.fabs(self.ewma.rate - 0.56130419) < 0.00000001), |
4228 | + 'Should have a rate of 0.56130419 events/sec after 1 minute') |
4229 | + |
4230 | + def test_two_minutes(self): |
4231 | + mark_minutes(2, self.ewma) |
4232 | + self.assertTrue( |
4233 | + (math.fabs(self.ewma.rate - 0.52510399) < 0.00000001), |
4234 | + 'Should have a rate of 0.52510399 events/sec after 2 minutes') |
4235 | + |
4236 | + def test_three_minutes(self): |
4237 | + mark_minutes(3, self.ewma) |
4238 | + self.assertTrue( |
4239 | + (math.fabs(self.ewma.rate - 0.49123845) < 0.00000001), |
4240 | + 'Should have a rate of 0.49123845 events/sec after 3 minutes') |
4241 | + |
4242 | + def test_four_minutes(self): |
4243 | + mark_minutes(4, self.ewma) |
4244 | + self.assertTrue( |
4245 | + (math.fabs(self.ewma.rate - 0.45955700) < 0.00000001), |
4246 | + 'Should have a rate of 0.45955700 events/sec after 4 minutes') |
4247 | + |
4248 | + def test_five_minutes(self): |
4249 | + mark_minutes(5, self.ewma) |
4250 | + self.assertTrue( |
4251 | + (math.fabs(self.ewma.rate - 0.42991879) < 0.00000001), |
4252 | + 'Should have a rate of 0.42991879 events/sec after 5 minutes') |
4253 | + |
4254 | + def test_six_minutes(self): |
4255 | + mark_minutes(6, self.ewma) |
4256 | + self.assertTrue( |
4257 | + (math.fabs(self.ewma.rate - 0.40219203) < 0.00000001), |
4258 | + 'Should have a rate of 0.40219203 events/sec after 6 minutes') |
4259 | + |
4260 | + def test_seven_minutes(self): |
4261 | + mark_minutes(7, self.ewma) |
4262 | + self.assertTrue( |
4263 | + (math.fabs(self.ewma.rate - 0.37625345) < 0.00000001), |
4264 | + 'Should have a rate of 0.37625345 events/sec after 7 minutes') |
4265 | + |
4266 | + def test_eight_minutes(self): |
4267 | + mark_minutes(8, self.ewma) |
4268 | + self.assertTrue( |
4269 | + (math.fabs(self.ewma.rate - 0.35198773) < 0.00000001), |
4270 | + 'Should have a rate of 0.35198773 events/sec after 8 minutes') |
4271 | + |
4272 | + def test_nine_minutes(self): |
4273 | + mark_minutes(9, self.ewma) |
4274 | + self.assertTrue( |
4275 | + (math.fabs(self.ewma.rate - 0.32928698) < 0.00000001), |
4276 | + 'Should have a rate of 0.32928698 events/sec after 9 minutes') |
4277 | + |
4278 | + def test_ten_minutes(self): |
4279 | + mark_minutes(10, self.ewma) |
4280 | + self.assertTrue( |
4281 | + (math.fabs(self.ewma.rate - 0.30805027) < 0.00000001), |
4282 | + 'Should have a rate of 0.30805027 events/sec after 10 minutes') |
4283 | + |
4284 | + def test_eleven_minutes(self): |
4285 | + mark_minutes(11, self.ewma) |
4286 | + self.assertTrue( |
4287 | + (math.fabs(self.ewma.rate - 0.28818318) < 0.00000001), |
4288 | + 'Should have a rate of 0.28818318 events/sec after 11 minutes') |
4289 | + |
4290 | + def test_twelve_minutes(self): |
4291 | + mark_minutes(12, self.ewma) |
4292 | + self.assertTrue( |
4293 | + (math.fabs(self.ewma.rate - 0.26959738) < 0.00000001), |
4294 | + 'Should have a rate of 0.26959738 events/sec after 12 minutes') |
4295 | + |
4296 | + def test_thirteen_minutes(self): |
4297 | + mark_minutes(13, self.ewma) |
4298 | + self.assertTrue( |
4299 | + (math.fabs(self.ewma.rate - 0.25221023) < 0.00000001), |
4300 | + 'Should have a rate of 0.25221023 events/sec after 13 minutes') |
4301 | + |
4302 | + def test_fourteen_minutes(self): |
4303 | + mark_minutes(14, self.ewma) |
4304 | + self.assertTrue( |
4305 | + (math.fabs(self.ewma.rate - 0.23594443) < 0.00000001), |
4306 | + 'Should have a rate of 0.23594443 events/sec after 14 minutes') |
4307 | + |
4308 | + def test_fifteen_minutes(self): |
4309 | + mark_minutes(15, self.ewma) |
4310 | + self.assertTrue( |
4311 | + (math.fabs(self.ewma.rate - 0.22072766) < 0.00000001), |
4312 | + 'Should have a rate of 0.22072766 events/sec after 15 minutes') |
4313 | |
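All of the expected values in the EWMA tests above follow from one closed form: the setup marks 3 events in the first 5-second interval (an initial rate of 0.6 events/sec), and with no further events an N-minute EWMA decays as rate(t) = 0.6 · exp(-t / N). A minimal sketch of that relationship (the function name is ours, not part of txstatsd):

```python
import math

def expected_rate(rate0, minutes, window_minutes):
    # With no further events, an EWMA averaged over `window_minutes`
    # decays exponentially from its initial rate:
    #   rate(t) = rate0 * exp(-t / window_minutes)
    return rate0 * math.exp(-float(minutes) / window_minutes)
```

For example, this reproduces the one-minute EWMA after 5 minutes (0.00404277) and the five- and fifteen-minute EWMAs after 15 minutes (0.02987224 and 0.22072766) asserted above.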
4314 | === added file 'txstatsd/tests/stats/test_exponentiallydecayingsample.py' |
4315 | --- txstatsd/tests/stats/test_exponentiallydecayingsample.py 1970-01-01 00:00:00 +0000 |
4316 | +++ txstatsd/tests/stats/test_exponentiallydecayingsample.py 2012-05-14 16:57:19 +0000 |
4317 | @@ -0,0 +1,84 @@ |
4318 | +import random |
4319 | + |
4320 | +from twisted.trial.unittest import TestCase |
4321 | + |
4322 | +from txstatsd.stats.exponentiallydecayingsample import ( |
4323 | + ExponentiallyDecayingSample) |
4324 | + |
4325 | + |
4326 | +class TestExponentiallyDecayingSample(TestCase): |
4327 | + |
4328 | + def test_100_out_of_1000_elements(self): |
4329 | + population = [i for i in range(0, 100)] |
4330 | + sample = ExponentiallyDecayingSample(1000, 0.99) |
4331 | + for i in population: |
4332 | + sample.update(i) |
4333 | + |
4334 | + self.assertEqual(sample.size(), 100, 'Should have 100 elements') |
4335 | + self.assertEqual(len(sample.get_values()), 100, |
4336 | + 'Should have 100 elements') |
4337 | + self.assertEqual( |
4338 | + len(set(sample.get_values()).difference(set(population))), 0, |
4339 | + 'Should only have elements from the population') |
4340 | + |
4341 | + def test_100_out_of_10_elements(self): |
4342 | + population = [i for i in range(0, 10)] |
4343 | + sample = ExponentiallyDecayingSample(100, 0.99) |
4344 | + for i in population: |
4345 | + sample.update(i) |
4346 | + |
4347 | + self.assertEqual(sample.size(), 10) |
4348 | + self.assertEqual(len(sample.get_values()), 10, |
4349 | + 'Should have 10 elements') |
4350 | + self.assertEqual( |
4351 | + len(set(sample.get_values()).difference(set(population))), 0, |
4352 | + 'Should only have elements from the population') |
4353 | + |
4354 | + def test_heavily_biased_100_out_of_1000_elements(self): |
4355 | + population = [i for i in range(0, 100)] |
4356 | + sample = ExponentiallyDecayingSample(1000, 0.01) |
4357 | + for i in population: |
4358 | + sample.update(i) |
4359 | + |
4360 | + self.assertEqual(sample.size(), 100, 'Should have 100 elements') |
4361 | + self.assertEqual(len(sample.get_values()), 100, |
4362 | + 'Should have 100 elements') |
4363 | + |
4364 | + self.assertEqual( |
4365 | + len(set(sample.get_values()).difference(set(population))), 0, |
4366 | + 'Should only have elements from the population') |
4367 | + |
4368 | + def test_ewma_sample_load(self): |
4369 | + |
4370 | + _time = [10000] |
4371 | + |
4372 | + def wtime(): |
4373 | + return _time[0] |
4374 | + |
4375 | + sample = ExponentiallyDecayingSample(100, 0.99, wall_time=wtime) |
4376 | + sample.RESCALE_THRESHOLD = 100 |
4377 | + sample.clear() |
4378 | + for i in xrange(10000000): |
4379 | + sample.update(random.normalvariate(0, 10)) |
4380 | + _time[0] += 1 |
4381 | + |
4382 | + self.assertEqual(sample.size(), 100) |
4383 | + self.assertEqual(len(sample.get_values()), 100, |
4384 | + 'Should have 100 elements') |
4385 | + test_ewma_sample_load.skip = "takes too long to run" |
4386 | + |
4387 | + def test_ewma_overflow(self): |
4388 | + """Long pauses on metric input should not overflow weight.""" |
4389 | + _time = [10000] |
4390 | + |
4391 | + def wtime(): |
4392 | + return _time[0] |
4393 | + |
4394 | + sample = ExponentiallyDecayingSample(100, 0.99, wall_time=wtime) |
4395 | + for i in xrange(100): |
4396 | + sample.update(random.normalvariate(0, 10)) |
4397 | + _time[0] += 10000 |
4398 | + |
4399 | + self.assertEqual(sample.size(), 100) |
4400 | + self.assertEqual(len(sample.get_values()), 100, |
4401 | + 'Should have 100 elements') |
4402 | |
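The sampler these tests exercise keeps a fixed-size, recency-biased reservoir: each arriving value gets a priority of exp(alpha · (t − start)) / u for a uniform random u, and only the `size` highest-priority values survive. A self-contained sketch of that forward-decay scheme, assuming the standard formulation rather than txstatsd's exact implementation (names are ours):

```python
import heapq
import math
import random

def exp_decay_sample(stream, size, alpha, start=0.0):
    """Forward-decay reservoir sketch: keep the `size` items with the
    largest priority exp(alpha * (t - start)) / u, u uniform in (0, 1]."""
    heap = []  # min-heap of (priority, value); the root is cheapest to evict
    for t, value in stream:
        # 1.0 - random.random() lies in (0, 1], avoiding division by zero.
        priority = math.exp(alpha * (t - start)) / (1.0 - random.random())
        if len(heap) < size:
            heapq.heappush(heap, (priority, value))
        elif priority > heap[0][0]:
            heapq.heapreplace(heap, (priority, value))
    return [value for _, value in heap]
```

As in the tests above, feeding fewer elements than the reservoir size retains every element, and the sample is always drawn from the population.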
4403 | === added file 'txstatsd/tests/stats/test_uniformsample.py' |
4404 | --- txstatsd/tests/stats/test_uniformsample.py 1970-01-01 00:00:00 +0000 |
4405 | +++ txstatsd/tests/stats/test_uniformsample.py 2012-05-14 16:57:19 +0000 |
4406 | @@ -0,0 +1,32 @@ |
4407 | + |
4408 | +from unittest import TestCase |
4409 | + |
4410 | +from txstatsd.stats.uniformsample import UniformSample |
4411 | + |
4412 | + |
4413 | +class TestUniformSample(TestCase): |
4414 | + def test_100_out_of_1000_elements(self): |
4415 | + population = [i for i in range(0, 1000)] |
4416 | + sample = UniformSample(100) |
4417 | + for i in population: |
4418 | + sample.update(i) |
4419 | + |
4420 | + self.assertEqual(sample.size(), 100, 'Should have 100 elements') |
4421 | + self.assertEqual(len(sample.get_values()), 100, |
4422 | + 'Should have 100 elements') |
4423 | + self.assertEqual( |
4424 | + len(set(sample.get_values()).difference(set(population))), 0, |
4425 | + 'Should only have elements from the population') |
4426 | + |
4427 | + def test_100_out_of_10_elements(self): |
4428 | + population = [i for i in range(0, 10)] |
4429 | + sample = UniformSample(100) |
4430 | + for i in population: |
4431 | + sample.update(i) |
4432 | + |
4433 | + self.assertEqual(sample.size(), 10, 'Should have 10 elements') |
4434 | + self.assertEqual(len(sample.get_values()), 10, |
4435 | + 'Should have 10 elements') |
4436 | + self.assertEqual( |
4437 | + len(set(sample.get_values()).difference(set(population))), 0, |
4438 | + 'Should only have elements from the population') |
4439 | |
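A `UniformSample` of capacity k keeps every element while fewer than k have arrived, then gives each later element a k/n chance of replacing a random slot, which is exactly the behaviour these two tests pin down. A sketch of that classic reservoir technique (Vitter's Algorithm R; the function name is ours):

```python
import random

def reservoir_sample(stream, size):
    """Algorithm R: after n inputs, each input is retained with
    probability size / n (all inputs retained while n <= size)."""
    sample = []
    for n, value in enumerate(stream):
        if n < size:
            sample.append(value)
        else:
            j = random.randrange(n + 1)
            if j < size:
                sample[j] = value
    return sample
```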
4440 | === added file 'txstatsd/tests/test_client.py' |
4441 | --- txstatsd/tests/test_client.py 1970-01-01 00:00:00 +0000 |
4442 | +++ txstatsd/tests/test_client.py 2012-05-14 16:57:19 +0000 |
4443 | @@ -0,0 +1,93 @@ |
4444 | +"""Tests for the various client classes.""" |
4445 | + |
4446 | +from twisted.internet import reactor |
4447 | +from twisted.internet.defer import inlineCallbacks, Deferred |
4448 | +from twisted.python import log |
4449 | +from twisted.trial.unittest import TestCase |
4450 | + |
4451 | +from txstatsd.client import ( |
4452 | + StatsDClientProtocol, TwistedStatsDClient, UdpStatsDClient) |
4453 | + |
4454 | + |
4455 | +class TestClient(TestCase): |
4456 | + |
4457 | + def setUp(self): |
4458 | + super(TestClient, self).setUp() |
4459 | + self.client = None |
4460 | + self.exception = None |
4461 | + |
4462 | + def tearDown(self): |
4463 | + if self.client: |
4464 | + self.client.transport.stopListening() |
4465 | + super(TestClient, self).tearDown() |
4466 | + |
4467 | + def test_twistedstatsd_write_with_wellformed_address(self): |
4468 | + self.client = TwistedStatsDClient('127.0.0.1', 8000) |
4469 | + protocol = StatsDClientProtocol(self.client) |
4470 | + reactor.listenUDP(0, protocol) |
4471 | + |
4472 | + def ensure_bytes_sent(bytes_sent): |
4473 | + self.assertEqual(bytes_sent, len('message')) |
4474 | + |
4475 | + def exercise(callback): |
4476 | + self.client.write('message', callback=callback) |
4477 | + |
4478 | + d = Deferred() |
4479 | + d.addCallback(ensure_bytes_sent) |
4480 | + reactor.callWhenRunning(exercise, d.callback) |
4481 | + return d |
4482 | + |
4483 | + @inlineCallbacks |
4484 | + def test_twistedstatsd_with_malformed_address_and_errback(self): |
4485 | + def ensure_exception_raised(ignore): |
4486 | + self.assertTrue(self.exception.startswith("DNS lookup failed")) |
4487 | + |
4488 | + def capture_exception_raised(failure): |
4489 | + self.exception = failure.getErrorMessage() |
4490 | + |
4491 | + yield TwistedStatsDClient( |
4492 | + '256.0.0.0', 1, |
4493 | + resolver_errback=capture_exception_raised) |
4494 | + |
4495 | + d = Deferred() |
4496 | + d.addCallback(ensure_exception_raised) |
4497 | + reactor.callLater(.5, d.callback, None) |
4498 | + yield d |
4499 | + |
4500 | + @inlineCallbacks |
4501 | + def test_twistedstatsd_with_malformed_address_and_no_errback(self): |
4502 | + def ensure_exception_raised(ignore): |
4503 | + self.assertTrue(self.exception.startswith("DNS lookup failed")) |
4504 | + |
4505 | + def capture_exception_raised(failure): |
4506 | + self.exception = failure.getErrorMessage() |
4507 | + |
4508 | + self.patch(log, "err", capture_exception_raised) |
4509 | + |
4510 | + yield TwistedStatsDClient('256.0.0.0', 1) |
4511 | + |
4512 | + d = Deferred() |
4513 | + d.addCallback(ensure_exception_raised) |
4514 | + reactor.callLater(.5, d.callback, None) |
4515 | + yield d |
4516 | + |
4517 | + def test_udpstatsd_wellformed_address(self): |
4518 | + client = UdpStatsDClient('localhost', 8000) |
4519 | + self.assertEqual(client.host, '127.0.0.1') |
4520 | + client = UdpStatsDClient(None, None) |
4521 | + self.assertEqual(client.host, None) |
4522 | + |
4523 | + def test_udpstatsd_malformed_address(self): |
4524 | + self.assertRaises(ValueError, |
4525 | + UdpStatsDClient, 'localhost', -1) |
4526 | + self.assertRaises(ValueError, |
4527 | + UdpStatsDClient, 'localhost', 'malformed') |
4528 | + self.assertRaises(ValueError, |
4529 | + UdpStatsDClient, 0, 8000) |
4530 | + |
4531 | + def test_udpstatsd_socket_nonblocking(self): |
4532 | + client = UdpStatsDClient('localhost', 8000) |
4533 | + client.connect() |
4534 | + # According to the python docs (and the source, I've checked) |
4535 | + # setblocking(0) is the same as settimeout(0.0). |
4536 | + self.assertEqual(client.socket.gettimeout(), 0.0) |
4537 | |
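The clients under test all end up writing the plain statsd wire format, `<name>:<value>|<type>`, over a non-blocking UDP socket. A minimal stdlib sketch of that send path (the helper name is ours and is independent of txstatsd's client classes):

```python
import socket

def send_statsd(sock, host, port, name, value, metric_type="c"):
    """Send one metric in statsd wire format: "<name>:<value>|<type>"."""
    message = "%s:%s|%s" % (name, value, metric_type)
    return sock.sendto(message.encode("ascii"), (host, port))

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setblocking(0)  # equivalent to settimeout(0.0), as the last test notes
```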
4538 | === added file 'txstatsd/tests/test_configurableprocessor.py' |
4539 | --- txstatsd/tests/test_configurableprocessor.py 1970-01-01 00:00:00 +0000 |
4540 | +++ txstatsd/tests/test_configurableprocessor.py 2012-05-14 16:57:19 +0000 |
4541 | @@ -0,0 +1,152 @@ |
4542 | +import time |
4543 | + |
4544 | +from unittest import TestCase |
4545 | + |
4546 | +from twisted.plugins.distinct_plugin import distinct_metric_factory |
4547 | + |
4548 | +from txstatsd.server.configurableprocessor import ConfigurableMessageProcessor |
4549 | + |
4550 | +class FlushMessagesTest(TestCase): |
4551 | + |
4552 | + def test_flush_counter_with_empty_prefix(self): |
4553 | + """ |
4554 | + Ensure no prefix features if none is supplied. |
4555 | + B{Note}: The C{ConfigurableMessageProcessor} reports |
4556 | + the counter value, and not the normalized version as |
4557 | + seen in the StatsD-compliant C{Processor}. |
4558 | + """ |
4559 | + configurable_processor = ConfigurableMessageProcessor( |
4560 | + time_function=lambda: 42) |
4561 | + configurable_processor.process("gorets:17|c") |
4562 | + messages = configurable_processor.flush() |
4563 | + self.assertEqual(("gorets.count", 17, 42), messages[0]) |
4564 | + self.assertEqual(("statsd.numStats", 1, 42), messages[1]) |
4565 | + |
4566 | + def test_flush_counter_with_prefix(self): |
4567 | + """ |
4568 | + Ensure the prefix features if one is supplied. |
4569 | + """ |
4570 | + configurable_processor = ConfigurableMessageProcessor( |
4571 | + time_function=lambda: 42, message_prefix="test.metric") |
4572 | + configurable_processor.process("gorets:17|c") |
4573 | + messages = configurable_processor.flush() |
4574 | + self.assertEqual(("test.metric.gorets.count", 17, 42), messages[0]) |
4575 | + self.assertEqual(("test.metric.statsd.numStats", 1, 42), |
4576 | + messages[1]) |
4577 | + |
4578 | + def test_flush_counter_with_internal_prefix(self): |
4579 | + """ |
4580 | + Ensure the internal metrics prefix features if one is supplied. |
4581 | + """ |
4582 | + configurable_processor = ConfigurableMessageProcessor( |
4583 | + time_function=lambda: 42, message_prefix="test.metric", |
4584 | + internal_metrics_prefix="statsd.foo.") |
4585 | + configurable_processor.process("gorets:17|c") |
4586 | + messages = configurable_processor.flush() |
4587 | + self.assertEqual(("test.metric.gorets.count", 17, 42), messages[0]) |
4588 | + self.assertEqual(("statsd.foo.numStats", 1, 42), |
4589 | + messages[1]) |
4590 | + |
4591 | + def test_flush_plugin(self): |
4592 | + """ |
4593 | + Ensure plugin-supplied metrics feature in the flushed messages. |
4594 | + """ |
4595 | + configurable_processor = ConfigurableMessageProcessor( |
4596 | + time_function=lambda: 42, message_prefix="test.metric", |
4597 | + plugins=[distinct_metric_factory]) |
4598 | + configurable_processor.process("gorets:17|pd") |
4599 | + messages = configurable_processor.flush() |
4600 | + self.assertEquals(("test.metric.gorets.count", 1, 42), messages[0]) |
4601 | + |
4602 | + def test_flush_single_timer_single_time(self): |
4603 | + """ |
4604 | + If a single timer with a single data point is present, all |
4605 | + percentiles will be set to the same value. |
4606 | + """ |
4607 | + configurable_processor = ConfigurableMessageProcessor( |
4608 | + time_function=lambda: 42) |
4609 | + |
4610 | + configurable_processor.process("glork:24|ms") |
4611 | + messages = configurable_processor.flush() |
4612 | + |
4613 | + self.assertEqual(('glork.15min_rate', 0.0, 42), messages[0]) |
4614 | + self.assertEqual(('glork.1min_rate', 0.0, 42), messages[1]) |
4615 | + self.assertEqual(('glork.5min_rate', 0.0, 42), messages[2]) |
4616 | + self.assertEqual(("glork.999percentile", 24.0, 42), messages[3]) |
4617 | + self.assertEqual(("glork.99percentile", 24.0, 42), messages[4]) |
4618 | + self.assertEqual(("glork.count", 1., 42), messages[5]) |
4619 | + self.assertEqual(("glork.max", 24.0, 42), messages[6]) |
4620 | + self.assertEqual(("glork.mean", 24.0, 42), messages[7]) |
4621 | + self.assertEqual(("glork.min", 24.0, 42), messages[8]) |
4622 | + self.assertEqual(("glork.stddev", 0.0, 42), messages[9]) |
4623 | + |
4624 | + def test_flush_single_timer_multiple_times(self): |
4625 | + """ |
4626 | + Test reporting of multiple timer metric samples. |
4627 | + """ |
4628 | + configurable_processor = ConfigurableMessageProcessor( |
4629 | + time_function=lambda: 42) |
4630 | + |
4631 | + configurable_processor.process("glork:4|ms") |
4632 | + configurable_processor.update_metrics() |
4633 | + configurable_processor.process("glork:8|ms") |
4634 | + configurable_processor.update_metrics() |
4635 | + configurable_processor.process("glork:15|ms") |
4636 | + configurable_processor.update_metrics() |
4637 | + configurable_processor.process("glork:16|ms") |
4638 | + configurable_processor.update_metrics() |
4639 | + configurable_processor.process("glork:23|ms") |
4640 | + configurable_processor.update_metrics() |
4641 | + configurable_processor.process("glork:42|ms") |
4642 | + configurable_processor.update_metrics() |
4643 | + |
4644 | + messages = configurable_processor.flush() |
4645 | + |
4646 | + self.assertEqual(('glork.15min_rate', 0.20000000000000001, 42), |
4647 | + messages[0]) |
4648 | + self.assertEqual(('glork.1min_rate', 0.20000000000000001, 42), |
4649 | + messages[1]) |
4650 | + self.assertEqual(('glork.5min_rate', 0.20000000000000001, 42), |
4651 | + messages[2]) |
4652 | + self.assertEqual(("glork.999percentile", 42.0, 42), messages[3]) |
4653 | + self.assertEqual(("glork.99percentile", 42.0, 42), messages[4]) |
4654 | + self.assertEqual(('glork.count', 6.0, 42), messages[5]) |
4655 | + self.assertEqual(("glork.max", 42.0, 42), messages[6]) |
4656 | + self.assertEqual(("glork.mean", 18.0, 42), messages[7]) |
4657 | + self.assertEqual(("glork.min", 4.0, 42), messages[8]) |
4658 | + self.assertEqual(("glork.stddev", 13.490738, 42), messages[9]) |
4659 | + |
4660 | + |
4661 | +class FlushMeterMetricMessagesTest(TestCase): |
4662 | + |
4663 | + def setUp(self): |
4664 | + self.configurable_processor = ConfigurableMessageProcessor( |
4665 | + time_function=self.wall_clock_time, message_prefix="test.metric") |
4666 | + self.time_now = int(time.time()) |
4667 | + |
4668 | + def wall_clock_time(self): |
4669 | + return self.time_now |
4670 | + |
4671 | + def mark_minutes(self, minutes): |
4672 | + for i in range(1, minutes * 60, 5): |
4673 | + self.configurable_processor.update_metrics() |
4674 | + |
4675 | + def test_flush_meter_metric_with_prefix(self): |
4676 | + """ |
4677 | + Test the correct rendering of the Graphite report for |
4678 | + a meter metric when a prefix is supplied. |
4679 | + """ |
4680 | + self.configurable_processor.process("gorets:3.0|m") |
4681 | + |
4682 | + self.time_now += 1 |
4683 | + messages = self.configurable_processor.flush() |
4684 | + self.assertEqual(("test.metric.gorets.15min_rate", 0.0, self.time_now), |
4685 | + messages[0]) |
4686 | + self.assertEqual(("test.metric.gorets.1min_rate", 0.0, self.time_now), |
4687 | + messages[1]) |
4688 | + self.assertEqual(("test.metric.gorets.5min_rate", 0.0, self.time_now), |
4689 | + messages[2]) |
4690 | + self.assertEqual(("test.metric.gorets.count", 3.0, self.time_now), |
4691 | + messages[3]) |
4692 | + self.assertEqual(("test.metric.gorets.mean_rate", 3.0, self.time_now), |
4693 | + messages[4]) |
4694 | |
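The counter expectations in these tests reduce to string parsing plus prefixing: `"gorets:17|c"` becomes `("<prefix.>gorets.count", 17, <flush time>)`. A toy version of that flush step (names are ours; the real processor also normalizes by flush interval in the StatsD-compliant mode and tracks `statsd.numStats`):

```python
def flush_counter(message, time_function, message_prefix=""):
    """Parse one statsd counter message into the (name, value, time)
    triple the tests above assert on."""
    name, rest = message.split(":", 1)
    value, metric_type = rest.split("|", 1)
    if metric_type != "c":
        raise ValueError("not a counter: %r" % message)
    if message_prefix:
        name = message_prefix + "." + name
    return (name + ".count", int(value), time_function())
```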
4695 | === added file 'txstatsd/tests/test_httpinfo.py' |
4696 | --- txstatsd/tests/test_httpinfo.py 1970-01-01 00:00:00 +0000 |
4697 | +++ txstatsd/tests/test_httpinfo.py 2012-05-14 16:57:19 +0000 |
4698 | @@ -0,0 +1,128 @@ |
4699 | +# -*- coding: utf-8 *-* |
4700 | +import json |
4701 | + |
4702 | +from twisted.trial.unittest import TestCase |
4703 | + |
4704 | +from twisted.internet import reactor, defer, protocol |
4705 | +from twisted.application.service import IService |
4706 | +from twisted.web.client import Agent |
4707 | + |
4708 | +from txstatsd.metrics.timermetric import TimerMetricReporter |
4709 | +from txstatsd.server import httpinfo |
4710 | +from txstatsd import service |
4711 | + |
4712 | + |
4713 | +class Dummy: |
4714 | + flush_interval = 10 |
4715 | + last_flush_duration = 3 |
4716 | + last_process_duration = 2 |
4717 | + |
4718 | + |
4719 | +class ResponseCollector(protocol.Protocol): |
4720 | + |
4721 | + def __init__(self, finished): |
4722 | + self.finished = finished |
4723 | + self.data = [] |
4724 | + |
4725 | + def dataReceived(self, bytes): |
4726 | + self.data.append(bytes) |
4727 | + |
4728 | + def connectionLost(self, reason): |
4729 | + self.finished.callback("".join(self.data)) |
4730 | + |
4731 | + |
4732 | +def collect_response(response): |
4733 | + d = defer.Deferred() |
4734 | + c = ResponseCollector(d) |
4735 | + response.deliverBody(c) |
4736 | + return d |
4737 | + |
4738 | + |
4739 | +class HttpException(Exception): |
4740 | + |
4741 | + def __init__(self, response): |
4742 | + super(HttpException, self).__init__(response.phrase) |
4743 | + self.response = response |
4744 | + |
4745 | + |
4746 | +class ServiceTestsBuilder(TestCase): |
4747 | + |
4748 | + def setUp(self): |
4749 | + self.service = None |
4750 | + |
4751 | + @defer.inlineCallbacks |
4752 | + def get_results(self, path, **kwargs): |
4753 | + webport = 12323 |
4754 | + o = service.StatsDOptions() |
4755 | + o["http-port"] = webport |
4756 | + d = Dummy() |
4757 | + d.__dict__.update(kwargs) |
4758 | + self.service = s = httpinfo.makeService(o, d, d) |
4759 | + s.startService() |
4760 | + agent = Agent(reactor) |
4761 | + |
4762 | + result = yield agent.request('GET', |
4763 | + 'http://localhost:%s/%s' % (webport, path)) |
4764 | + if result.code != 200: |
4765 | + raise HttpException(result) |
4766 | + data = yield collect_response(result) |
4767 | + defer.returnValue(data) |
4768 | + |
4769 | + def tearDown(self): |
4770 | + if self.service is not None: |
4771 | + self.service.stopService() |
4772 | + |
4773 | + @defer.inlineCallbacks |
4774 | + def test_httpinfo_ok(self): |
4775 | + data = yield self.get_results("status") |
4776 | + self.assertEquals(json.loads(data)["status"], "OK") |
4777 | + |
4778 | + @defer.inlineCallbacks |
4779 | + def test_httpinfo_error(self): |
4780 | + try: |
4781 | + data = yield self.get_results("status", last_flush_duration=30) |
4782 | + except HttpException, e: |
4783 | + self.assertEquals(e.response.code, 500) |
4784 | + else: |
4785 | + self.fail("Not 500") |
4786 | + |
4787 | + @defer.inlineCallbacks |
4788 | + def test_httpinfo_timer(self): |
4789 | + try: |
4790 | + data = yield self.get_results("metrics/gorets", |
4791 | + timer_metrics={'gorets': 100}) |
4792 | + except HttpException, e: |
4793 | + self.assertEquals(e.response.code, 404) |
4794 | + else: |
4795 | + self.fail("Not 404") |
4796 | + |
4797 | + @defer.inlineCallbacks |
4798 | + def test_httpinfo_timer2(self): |
4799 | + """Returns the canonical empty histogram without data.""" |
4800 | + tmr = TimerMetricReporter('gorets') |
4801 | + data = yield self.get_results("metrics/gorets", |
4802 | + timer_metrics={'gorets': tmr}) |
4803 | + self.assertEquals(json.loads(data)["histogram"], [0.] * 10) |
4804 | + |
4805 | + @defer.inlineCallbacks |
4806 | + def test_httpinfo_timer3(self): |
4807 | + """Returns a valid histogram with data.""" |
4808 | + |
4809 | + tmr = TimerMetricReporter('gorets') |
4810 | + for i in range(1, 1001): |
4811 | + tmr.histogram.update(i) |
4812 | + data = yield self.get_results("metrics/gorets", |
4813 | + timer_metrics={'gorets': tmr}) |
4814 | + hist = json.loads(data) |
4815 | + self.assertTrue(isinstance(hist, dict)) |
4816 | + self.assertEquals(sum(hist["histogram"]), 1000) |
4817 | + |
4818 | + @defer.inlineCallbacks |
4819 | + def test_httpinfo_fake_plugin(self): |
4820 | + """Also works for plugins.""" |
4821 | + |
4822 | + tmr = TimerMetricReporter('gorets') |
4823 | + data = yield self.get_results("metrics/gorets", |
4824 | + timer_metrics={}, plugin_metrics={'gorets': tmr}) |
4825 | + hist = json.loads(data) |
4826 | + self.assertTrue(isinstance(hist, dict)) |
4827 | |
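Reading `test_httpinfo_ok` and `test_httpinfo_error` together, the `/status` endpoint appears to report OK only while the last flush completed within the flush interval (the `Dummy` override of a 30-second flush against a 10-second interval yields a 500). A guess at that health rule, not the actual httpinfo code:

```python
import json

def status_response(flush_interval, last_flush_duration):
    # Assumed health rule: a flush that takes longer than the flush
    # interval means the daemon is falling behind.
    if last_flush_duration > flush_interval:
        return 500, json.dumps({"status": "ERROR"})
    return 200, json.dumps({"status": "OK"})
```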
4828 | === added file 'txstatsd/tests/test_loggingprocessor.py' |
4829 | --- txstatsd/tests/test_loggingprocessor.py 1970-01-01 00:00:00 +0000 |
4830 | +++ txstatsd/tests/test_loggingprocessor.py 2012-05-14 16:57:19 +0000 |
4831 | @@ -0,0 +1,64 @@ |
4832 | + |
4833 | +from unittest import TestCase |
4834 | + |
4835 | +from twisted.plugins.distinct_plugin import distinct_metric_factory |
4836 | + |
4837 | +from txstatsd.server.loggingprocessor import LoggingMessageProcessor |
4838 | + |
4839 | + |
4840 | +class FakeMeterMetric(object): |
4841 | + def report(self, *args): |
4842 | + return [('Sample report', 1, 2)] |
4843 | + |
4844 | +class TestLogger(object): |
4845 | + def __init__(self): |
4846 | + self.log = '' |
4847 | + |
4848 | + def info(self, measurement): |
4849 | + self.log += measurement + "\n" |
4850 | + |
4851 | + |
4852 | +class TestLoggingMessageProcessor(TestCase): |
4853 | + |
4854 | + def test_logger_with_no_info(self): |
4855 | + def invoker(): |
4856 | + logger = 'logger' |
4857 | + LoggingMessageProcessor(logger) |
4858 | + |
4859 | + self.assertRaises(TypeError, invoker) |
4860 | + |
4861 | + def test_logger_with_non_callable_info(self): |
4862 | + def invoker(): |
4863 | + class Logger(object): |
4864 | + def __init__(self): |
4865 | + self.info = 'logger' |
4866 | + |
4867 | + logger = Logger() |
4868 | + LoggingMessageProcessor(logger) |
4869 | + |
4870 | + self.assertRaises(TypeError, invoker) |
4871 | + |
4872 | + def test_logger(self): |
4873 | + logger = TestLogger() |
4874 | + processor = LoggingMessageProcessor(logger) |
4875 | + metric = FakeMeterMetric() |
4876 | + processor.meter_metrics['test'] = metric |
4877 | + processor.flush() |
4878 | + expected = ["Out: %s %s %s" % message |
4879 | + for message in metric.report()] |
4880 | + self.assertFalse(set(expected).difference(logger.log.splitlines())) |
4881 | + |
4882 | + def test_logger_plugin(self): |
4883 | + logger = TestLogger() |
4884 | + processor = LoggingMessageProcessor( |
4885 | + logger, plugins=[distinct_metric_factory], |
4886 | + time_function=lambda: 42) |
4887 | + msg_in = "gorets:17|pd" |
4888 | + processor.process(msg_in) |
4889 | + processor.flush() |
4890 | + messages = processor.plugin_metrics['gorets'].flush( |
4891 | + 10, processor.time_function()) |
4892 | + expected = ["In: %s" % msg_in]\ |
4893 | + + ["Out: %s %s %s" % message |
4894 | + for message in messages] |
4895 | + self.assertFalse(set(expected).difference(logger.log.splitlines())) |
4896 | |
4897 | === added file 'txstatsd/tests/test_metrics.py' |
4898 | --- txstatsd/tests/test_metrics.py 1970-01-01 00:00:00 +0000 |
4899 | +++ txstatsd/tests/test_metrics.py 2012-05-14 16:57:19 +0000 |
4900 | @@ -0,0 +1,120 @@ |
4901 | +"""Tests for the Metrics convenience class.""" |
4902 | + |
4903 | +import re |
4904 | +import time |
4905 | +from unittest import TestCase |
4906 | +from txstatsd.metrics.extendedmetrics import ExtendedMetrics |
4907 | +from txstatsd.metrics.metrics import Metrics |
4908 | + |
4909 | + |
4910 | +class FakeStatsDClient(object): |
4911 | + |
4912 | + def connect(self): |
4913 | + """Connect to the StatsD server.""" |
4914 | + pass |
4915 | + |
4916 | + def disconnect(self): |
4917 | + """Disconnect from the StatsD server.""" |
4918 | + pass |
4919 | + |
4920 | + def write(self, data): |
4921 | + """Send the metric to the StatsD server.""" |
4922 | + self.data = data |
4923 | + |
4924 | + |
4925 | +class TestMetrics(TestCase): |
4926 | + |
4927 | + def setUp(self): |
4928 | + self.connection = FakeStatsDClient() |
4929 | + self.metrics = Metrics(self.connection, 'txstatsd.tests') |
4930 | + |
4931 | + def test_gauge(self): |
4932 | + """Test reporting of a gauge metric sample.""" |
4933 | + self.metrics.gauge('gauge', 102) |
4934 | + self.assertEqual(self.connection.data, |
4935 | + 'txstatsd.tests.gauge:102|g') |
4936 | + |
4937 | + def test_meter(self): |
4938 | + """Test reporting of a meter metric sample.""" |
4939 | + self.metrics.meter('meter', 3) |
4940 | + self.assertEqual(self.connection.data, |
4941 | + 'txstatsd.tests.meter:3|m') |
4942 | + |
4943 | + def test_counter(self): |
4944 | + """Test the increment and decrement operations.""" |
4945 | + self.metrics.increment('counter', 18) |
4946 | + self.assertEqual(self.connection.data, |
4947 | + 'txstatsd.tests.counter:18|c') |
4948 | + self.metrics.decrement('counter', 9) |
4949 | + self.assertEqual(self.connection.data, |
4950 | + 'txstatsd.tests.counter:-9|c') |
4951 | + |
4952 | + def test_timing(self): |
4953 | + """Test the timing operation.""" |
4954 | + self.metrics.timing('timing', 101.1234) |
4955 | + self.assertEqual(self.connection.data, |
4956 | + 'txstatsd.tests.timing:101123.4|ms') |
4957 | + |
4958 | + def test_timing_automatic(self): |
4959 | + """Test the automatic timing operation with explicit reset""" |
4960 | + start_time = time.time() |
4961 | + |
4962 | + self.metrics.reset_timing() |
4963 | + time.sleep(.1) |
4964 | + self.metrics.timing('timing') |
4965 | + |
4966 | + elapsed = time.time() - start_time |
4967 | + |
4968 | + label, val, units = re.split(":|\|", self.connection.data) |
4969 | + self.assertEqual(label, 'txstatsd.tests.timing') |
4970 | + self.assertEqual(units, 'ms') |
4971 | + self.assertTrue(100 <= float(val) <= elapsed * 1000) |
4972 | + |
4973 | + def test_timing_automatic_implicit_reset(self): |
4974 | + """Test the automatic timing operation with implicit reset""" |
4975 | + start_time = time.time() |
4976 | + |
4977 | + self.metrics.timing('something_else') |
4978 | + time.sleep(.1) |
4979 | + self.metrics.timing('timing') |
4980 | + |
4981 | + elapsed = time.time() - start_time |
4982 | + |
4983 | + label, val, units = re.split(":|\|", self.connection.data) |
4984 | + self.assertEqual(label, 'txstatsd.tests.timing') |
4985 | + self.assertEqual(units, 'ms') |
4986 | + self.assertTrue(100 <= float(val) <= elapsed * 1000) |
4987 | + |
4988 | + def test_generic(self): |
4989 | + """Test the GenericMetric class.""" |
4990 | + self.metrics.report('users', "pepe", "pd") |
4991 | + self.assertEqual(self.connection.data, |
4992 | + 'txstatsd.tests.users:pepe|pd') |
4993 | + |
4994 | + def test_empty_namespace(self): |
4995 | + """Test reporting of an empty namespace.""" |
4996 | + self.metrics.namespace = None |
4997 | + self.metrics.gauge('gauge', 213) |
4998 | + self.assertEqual(self.connection.data, |
4999 | + 'gauge:213|g') |
5000 | + |
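The test_metrics assertions above all check the same statsd wire layout: `namespace.name:value|type`, with the namespace prefix dropped when it is empty. A minimal standalone sketch of that formatting (the helper name `format_message` is hypothetical; the real work is done by the `Metric` classes in this branch):

```python
# Sketch of the statsd message layout exercised by the tests above:
# "namespace.name:value|type", e.g. "txstatsd.tests.gauge:102|g".

def format_message(namespace, name, value, metric_type):
    """Build a statsd message string; skip the prefix if namespace is empty."""
    label = "%s.%s" % (namespace, name) if namespace else name
    return "%s:%s|%s" % (label, value, metric_type)

print(format_message("txstatsd.tests", "gauge", 102, "g"))
# txstatsd.tests.gauge:102|g
print(format_message(None, "counter", -9, "c"))
# counter:-9|c
```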
There's already a plugin using the "d" key, which counts distinct values exactly, using Redis for real-time reports and Postgres for long-term storage. Maybe we could change this metric to "dx" or something like that?
see https://code.launchpad.net/~txstatsd-dev/txstatsd/distinct-plugin
Is there any reason to do this and not use graphite to calculate the derivative?