Merge lp:~mk-fraggod/txstatsd/txstatsd into lp:txstatsd

Proposed by Mike Kazantsev
Status: Merged
Merged at revision: 92
Proposed branch: lp:~mk-fraggod/txstatsd/txstatsd
Merge into: lp:txstatsd
Diff against target: 6367 lines (+6061/-0) (has conflicts)
56 files modified
.bzrignore (+2/-0)
LICENSE (+20/-0)
MANIFEST.in (+5/-0)
README (+21/-0)
example-stats-client.tac (+52/-0)
requirements.txt (+7/-0)
setup.py (+67/-0)
statsd.tac (+8/-0)
twisted/plugins/derive_plugin.py (+69/-0)
twisted/plugins/distinct_plugin.py (+20/-0)
twisted/plugins/txstatsd_plugin.py (+22/-0)
txstatsd.conf-example (+26/-0)
txstatsd/client.py (+165/-0)
txstatsd/itxstatsd.py (+54/-0)
txstatsd/metrics/countermetric.py (+67/-0)
txstatsd/metrics/distinctmetric.py (+150/-0)
txstatsd/metrics/extendedmetrics.py (+53/-0)
txstatsd/metrics/gaugemetric.py (+43/-0)
txstatsd/metrics/histogrammetric.py (+209/-0)
txstatsd/metrics/imetrics.py (+18/-0)
txstatsd/metrics/metermetric.py (+103/-0)
txstatsd/metrics/metric.py (+47/-0)
txstatsd/metrics/metrics.py (+140/-0)
txstatsd/metrics/timermetric.py (+161/-0)
txstatsd/process.py (+217/-0)
txstatsd/report.py (+64/-0)
txstatsd/server/configurableprocessor.py (+108/-0)
txstatsd/server/httpinfo.py (+60/-0)
txstatsd/server/loggingprocessor.py (+36/-0)
txstatsd/server/processor.py (+371/-0)
txstatsd/server/protocol.py (+63/-0)
txstatsd/server/router.py (+308/-0)
txstatsd/service.py (+340/-0)
txstatsd/stats/ewma.py (+70/-0)
txstatsd/stats/exponentiallydecayingsample.py (+124/-0)
txstatsd/stats/uniformsample.py (+45/-0)
txstatsd/tests/helper.py (+18/-0)
txstatsd/tests/metrics/__init__.py (+2/-0)
txstatsd/tests/metrics/test_derive.py (+77/-0)
txstatsd/tests/metrics/test_distinct.py (+94/-0)
txstatsd/tests/metrics/test_histogrammetric.py (+91/-0)
txstatsd/tests/metrics/test_timermetric.py (+139/-0)
txstatsd/tests/stats/test_ewma.py (+315/-0)
txstatsd/tests/stats/test_exponentiallydecayingsample.py (+84/-0)
txstatsd/tests/stats/test_uniformsample.py (+32/-0)
txstatsd/tests/test_client.py (+93/-0)
txstatsd/tests/test_configurableprocessor.py (+152/-0)
txstatsd/tests/test_httpinfo.py (+128/-0)
txstatsd/tests/test_loggingprocessor.py (+64/-0)
txstatsd/tests/test_metrics.py (+120/-0)
txstatsd/tests/test_process.py (+322/-0)
txstatsd/tests/test_processor.py (+406/-0)
txstatsd/tests/test_report.py (+84/-0)
txstatsd/tests/test_router.py (+218/-0)
txstatsd/tests/test_service.py (+286/-0)
txstatsd/version.py (+1/-0)
Conflict adding file .bzrignore.  Moved existing file to .bzrignore.moved.
Conflict adding file LICENSE.  Moved existing file to LICENSE.moved.
Conflict adding file MANIFEST.in.  Moved existing file to MANIFEST.in.moved.
Conflict adding file README.  Moved existing file to README.moved.
Conflict adding file example-stats-client.tac.  Moved existing file to example-stats-client.tac.moved.
Conflict adding file requirements.txt.  Moved existing file to requirements.txt.moved.
Conflict adding file setup.py.  Moved existing file to setup.py.moved.
Conflict adding file statsd.tac.  Moved existing file to statsd.tac.moved.
Conflict adding file twisted.  Moved existing file to twisted.moved.
Conflict adding file txstatsd.conf-example.  Moved existing file to txstatsd.conf-example.moved.
Conflict adding file txstatsd.  Moved existing file to txstatsd.moved.
To merge this branch: bzr merge lp:~mk-fraggod/txstatsd/txstatsd
Reviewer: txStatsD Developers (status: Pending)
Review via email: mp+105687@code.launchpad.net

This proposal supersedes a proposal from 2012-04-19.

Description of the change

Adds a --dump-mode option that also passes data on to graphite (as suggested on IRC), plus a derive_plugin for instant-rate meters; tests included.

I've squashed the commits per feature; the original history can be found in the github fork (github.com/mk-fg/txstatsd, in the upstream_merge branch there).
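For context, a client would feed the new derive metric with plain statsd-style lines tagged with the "dx" type defined by the plugin (a minimal sketch; the metric name and endpoint here are made up):

```python
import socket

def derive_message(name, value):
    """Format a derive-metric sample in statsd wire format (metric type "dx")."""
    return "%s:%s|dx" % (name, value)

def send_sample(sock, host, port, name, value):
    """Fire-and-forget a single sample over UDP, as a statsd client would."""
    sock.sendto(derive_message(name, value).encode(), (host, port))

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
send_sample(sock, "127.0.0.1", 8125, "frontend.requests", 1)
```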

Revision history for this message
Lucio Torre (lucio.torre) wrote : Posted in a previous version of this proposal

There's already a plugin using the "d" key that counts distinct values exactly, using redis for real-time reports and postgres for long-term storage. Maybe we could change this metric to "dx" or something like that?

see https://code.launchpad.net/~txstatsd-dev/txstatsd/distinct-plugin

Is there any reason to do this and not use graphite to calculate the derivative?

Revision history for this message
Mike Kazantsev (mk-fraggod) wrote : Posted in a previous version of this proposal

Will update to use "dx", sure; I just failed to notice the conflict.

About graphite-side derivative calculation: I assume you're questioning the need for a "rate" value when "count" is already present.

1. From a usability standpoint, I find it harder to make graphite calculate the derivative every time the value is needed than to just pre-calculate it once. This is especially true for wildcard graphs, dashboards, and any custom data interface.

2. graphite's "derivative()" is incapable of handling overflow/restart correctly, producing a huge spike whenever that happens and totally screwing up any graph, while a "rate" sent from statsd does not suffer from that issue at all.

3. A "count" offset (say, from a point where the absolute level was a known baseline) can be calculated from the rate by summing all the rate values since then, and the damage from an overflow/restart with this approach is minimal, compared to a "reset to a totally arbitrary value".

4. The "count" value makes little sense in nearly all cases: it's a completely arbitrary value, offset from some point in time. All the real-world use cases I have for this metric type need "rate" only.

Initially I left "count" there just for consistency with the "meter" metric, which can be used if you need just "count", but at this point I think that was a mistake.

So what do you think about leaving only the "rate" value there?

It would definitely be better for my purposes: one nesting level fewer, and less load and disk usage.

If you believe it'd make sense to have only ".count" in mainline, the existing "meter" metric should probably be used instead, and the plugin isn't needed there at all, I guess.
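The overflow/restart argument in points 2 and 3 can be shown numerically. This is a toy sketch, not txstatsd code: a raw counter that restarts produces a bogus spike under naive differentiation, while per-interval rates do not, and the count can still be rebuilt from them.

```python
# A monotonic counter, where the process restarted after the 3rd sample,
# versus the per-interval increments that actually happened.
counter = [100, 110, 120, 5, 15]
increments = [10, 10, 10, 10]

# Naive differentiation of the counter: the restart shows up as a huge
# negative spike that would wreck any graph drawn from it.
derivative = [b - a for a, b in zip(counter, counter[1:])]
print(derivative)        # [10, 10, -115, 10]

# Rate-style reporting sends the increments themselves; a restart only
# loses the in-flight interval, and a count can be rebuilt from a known
# baseline by summing the rates.
rebuilt_count = [sum(increments[:i + 1]) for i in range(len(increments))]
print(rebuilt_count)     # [10, 20, 30, 40]
```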

Preview Diff

1=== added file '.bzrignore'
2--- .bzrignore 1970-01-01 00:00:00 +0000
3+++ .bzrignore 2012-05-14 16:57:19 +0000
4@@ -0,0 +1,2 @@
5+_trial_temp*
6+twisted/plugins/dropin.cache
7
8=== renamed file '.bzrignore' => '.bzrignore.moved'
9=== added file 'LICENSE'
10--- LICENSE 1970-01-01 00:00:00 +0000
11+++ LICENSE 2012-05-14 16:57:19 +0000
12@@ -0,0 +1,20 @@
13+Copyright (C) 2011 Canonical Services Ltd
14+
15+Permission is hereby granted, free of charge, to any person obtaining
16+a copy of this software and associated documentation files (the
17+"Software"), to deal in the Software without restriction, including
18+without limitation the rights to use, copy, modify, merge, publish,
19+distribute, sublicense, and/or sell copies of the Software, and to
20+permit persons to whom the Software is furnished to do so, subject to
21+the following conditions:
22+
23+The above copyright notice and this permission notice shall be
24+included in all copies or substantial portions of the Software.
25+
26+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
27+EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
28+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
29+IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
30+CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
31+TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
32+SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
33
34=== renamed file 'LICENSE' => 'LICENSE.moved'
35=== added file 'MANIFEST.in'
36--- MANIFEST.in 1970-01-01 00:00:00 +0000
37+++ MANIFEST.in 2012-05-14 16:57:19 +0000
38@@ -0,0 +1,5 @@
39+include README
40+include LICENSE
41+include setup.py
42+recursive-include txstatsd *.py
43+graft twisted
44
45=== renamed file 'MANIFEST.in' => 'MANIFEST.in.moved'
46=== added file 'README'
47--- README 1970-01-01 00:00:00 +0000
48+++ README 2012-05-14 16:57:19 +0000
49@@ -0,0 +1,21 @@
50+Dependencies
51+------------
52+
53+* Python
54+
55+* The twisted python package (python-twisted on Debian or similar systems)
56+
57+Things present here
58+-------------------
59+
60+* The txstatsd python package, containing a server and a client implementation
61+ for the statsd protocol.
62+
63+* The metrics support borrows from Coda Hale's Metrics project
64+ https://github.com/codahale/metrics.
65+
66+License
67+-------
68+
69+txStatsD is open source software, MIT License. See the LICENSE file for more
70+details.
71
72=== renamed file 'README' => 'README.moved'
73=== added file 'example-stats-client.tac'
74--- example-stats-client.tac 1970-01-01 00:00:00 +0000
75+++ example-stats-client.tac 2012-05-14 16:57:19 +0000
76@@ -0,0 +1,52 @@
77+import socket
78+import random
79+
80+from twisted.internet import reactor
81+from twisted.internet import task
82+
83+from twisted.application.service import Application
84+
85+from txstatsd.client import (
86+ TwistedStatsDClient, StatsDClientProtocol)
87+from txstatsd.metrics.metrics import Metrics
88+from txstatsd.process import PROCESS_STATS
89+from txstatsd.report import ReportingService
90+
91+
92+STATSD_HOST = "127.0.0.1"
93+STATSD_PORT = 8125
94+
95+application = Application("example-stats-client")
96+statsd_client = TwistedStatsDClient(STATSD_HOST, STATSD_PORT)
97+metrics = Metrics(connection=statsd_client,
98+ namespace=socket.gethostname() + ".example-client")
99+
100+reporting = ReportingService()
101+reporting.setServiceParent(application)
102+
103+for report in PROCESS_STATS:
104+ reporting.schedule(report, 10, metrics.increment)
105+
106+def random_walker(name):
107+ """Meters a random walk."""
108+ if random.random() > 0.5:
109+ metrics.increment(name)
110+ else:
111+ metrics.decrement(name)
112+
113+def random_normal(name):
114+ """Meters samples from a normal distribution."""
115+ metrics.timing(name, random.normalvariate(10, 3))
116+
117+
118+for n in range(5):
119+ t = task.LoopingCall(random_walker, name="walker%i" % n)
120+ t.start(0.5, now=False)
121+
122+for n in range(5):
123+ t = task.LoopingCall(random_normal, name="normal%i" % n)
124+ t.start(0.5, now=False)
125+
126+
127+protocol = StatsDClientProtocol(statsd_client)
128+reactor.listenUDP(0, protocol)
129
130=== renamed file 'example-stats-client.tac' => 'example-stats-client.tac.moved'
131=== added file 'requirements.txt'
132--- requirements.txt 1970-01-01 00:00:00 +0000
133+++ requirements.txt 2012-05-14 16:57:19 +0000
134@@ -0,0 +1,7 @@
135+Twisted==11.0.0
136+mocker==1.1
137+numpy==1.6.1
138+psutil==0.4.0
139+scipy==0.9.0
140+wsgiref==0.1.2
141+zope.interface==3.8.0
142
143=== renamed file 'requirements.txt' => 'requirements.txt.moved'
144=== added file 'setup.py'
145--- setup.py 1970-01-01 00:00:00 +0000
146+++ setup.py 2012-05-14 16:57:19 +0000
147@@ -0,0 +1,67 @@
148+from distutils.core import setup
149+from distutils.command.install import install
150+from glob import glob
151+import os
152+
153+from twisted.plugin import IPlugin, getPlugins
154+
155+from txstatsd import version
156+
157+# If setuptools is present, use it to find_packages(), and also
158+# declare our dependency on epsilon.
159+extra_setup_args = {}
160+try:
161+ import setuptools
162+ from setuptools import find_packages
163+except ImportError:
164+ def find_packages():
165+ """
166+ Compatibility wrapper.
167+
168+ Taken from storm setup.py.
169+ """
170+ packages = []
171+ for directory, subdirectories, files in os.walk("txstatsd"):
172+ if '__init__.py' in files:
173+ packages.append(directory.replace(os.sep, '.'))
174+ return packages
175+
176+long_description = """
177+Twisted-based implementation of a statsd-compatible server and client.
178+"""
179+
180+
181+class TxPluginInstaller(install):
182+ def run(self):
183+ install.run(self)
184+ # Make sure we refresh the plugin list when installing, so we know
185+ # we have enough write permissions.
186+ # see http://twistedmatrix.com/documents/current/core/howto/plugin.html
187+ # "when installing or removing software which provides Twisted plugins,
188+ # the site administrator should be sure the cache is regenerated"
189+ list(getPlugins(IPlugin))
190+
191+setup(
192+ cmdclass = {'install': TxPluginInstaller},
193+ name="txStatsD",
194+ version=version.txstatsd,
195+ description="A network daemon for aggregating statistics",
196+ author="txStatsD Developers",
197+ url="https://launchpad.net/txstatsd",
198+ license="MIT",
199+ packages=find_packages() + ["twisted.plugins"],
200+ scripts=glob("./bin/*"),
201+ long_description=long_description,
202+ classifiers=[
203+ "Development Status :: 4 - Beta",
204+ "Intended Audience :: Developers",
205+ "Intended Audience :: System Administrators",
206+ "Intended Audience :: Information Technology",
207+ "Programming Language :: Python",
208+ "Topic :: Database",
209+ "Topic :: Internet :: WWW/HTTP",
210+ "License :: OSI Approved :: MIT License",
211+ ],
212+ **extra_setup_args
213+ )
214+
215
216=== renamed file 'setup.py' => 'setup.py.moved'
217=== added file 'statsd.tac'
218--- statsd.tac 1970-01-01 00:00:00 +0000
219+++ statsd.tac 2012-05-14 16:57:19 +0000
220@@ -0,0 +1,8 @@
221+from twisted.application.service import Application
222+
223+from txstatsd import service
224+
225+application = Application("statsd")
226+
227+statsd_service = service.createService(service.StatsDOptions())
228+statsd_service.setServiceParent(application)
229
230=== renamed file 'statsd.tac' => 'statsd.tac.moved'
231=== added directory 'twisted'
232=== renamed directory 'twisted' => 'twisted.moved'
233=== added directory 'twisted/plugins'
234=== added file 'twisted/plugins/derive_plugin.py'
235--- twisted/plugins/derive_plugin.py 1970-01-01 00:00:00 +0000
236+++ twisted/plugins/derive_plugin.py 2012-05-14 16:57:19 +0000
237@@ -0,0 +1,69 @@
238+from zope.interface import implements
239+
240+from twisted.plugin import IPlugin
241+from txstatsd.itxstatsd import IMetric, IMetricFactory
242+from txstatsd.metrics.metric import Metric
243+
244+import logging, time
245+
246+
247+class DeriveMetricReporter(object):
248+ """
249+ A simpler meter metric which measures the instantaneous throughput rate for each interval.
250+ """
251+ implements(IMetric)
252+
253+ def __init__(self, name, wall_time_func=time.time, prefix=""):
254+ """Construct a metric we expect to be periodically updated.
255+
256+ @param name: Indicates what is being instrumented.
257+ @param wall_time_func: Function for obtaining wall time.
258+ """
259+ self.name = name
260+ self.wall_time_func = wall_time_func
261+ if prefix: prefix += "."
262+ self.prefix = prefix
263+ self.update = self.count = 0
264+ self.poll_time = self.wall_time_func()
265+
266+ def process(self, fields):
267+ """
268+ Process new data for this metric.
269+
270+ @type fields: C{list}
271+ @param fields: The list of message parts. Usually in the form of
272+ (value, metric_type, [sample_rate])
273+ """
274+ val, k = fields
275+ self.update += float(val)
276+
277+ def flush(self, interval, timestamp):
278+ """
279+ Returns a string with new line separated list of metrics to report.
280+
281+ @type interval: C{float}
282+ @param interval: The time since last flush.
283+ @type timestamp: C{float}
284+ @param timestamp: The timestamp for now.
285+ """
286+ poll_prev, self.poll_time = self.poll_time, self.wall_time_func()
287+ if self.poll_time == poll_prev: return list()
288+ rate = float(self.update) / (self.poll_time - poll_prev)
289+ self.count, self.update = self.count + self.update, 0
290+ return list(
291+ (self.prefix + self.name + "." + item, round(value, 6), timestamp)
292+ for item, value in [("count", self.count), ("rate", rate)] )
293+
294+
295+class DeriveMetricFactory(object):
296+ implements(IMetricFactory, IPlugin)
297+
298+ name = "derive"
299+ metric_type = "dx"
300+
301+ def build_metric(self, prefix, name, wall_time_func=None):
302+ return DeriveMetricReporter(name, prefix=prefix, wall_time_func=wall_time_func)
303+
304+ def configure(self, options): pass
305+
306+derive_metric_factory = DeriveMetricFactory()
307
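The flush() logic in the plugin above boils down to dividing the accumulated updates by the wall-clock time since the previous flush. A standalone sketch of the same arithmetic, outside of Twisted and with a fake clock and hypothetical names:

```python
import itertools

class RateSketch:
    """Mirrors DeriveMetricReporter's flush math in isolation."""

    def __init__(self, clock):
        self.clock = clock
        self.update = self.count = 0
        self.poll_time = clock()

    def process(self, value):
        # Accumulate raw increments between flushes.
        self.update += float(value)

    def flush(self):
        prev, self.poll_time = self.poll_time, self.clock()
        if self.poll_time == prev:
            return None  # no time elapsed, nothing to report
        rate = self.update / (self.poll_time - prev)
        self.count, self.update = self.count + self.update, 0
        return self.count, rate

fake_clock = itertools.count(step=10).__next__  # ticks 0, 10, 20, ...
r = RateSketch(fake_clock)
r.process(5)
r.process(15)
count, rate = r.flush()
print(count, rate)  # 20.0 2.0 -> 20 events over a 10-unit interval
```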
308=== added file 'twisted/plugins/distinct_plugin.py'
309--- twisted/plugins/distinct_plugin.py 1970-01-01 00:00:00 +0000
310+++ twisted/plugins/distinct_plugin.py 2012-05-14 16:57:19 +0000
311@@ -0,0 +1,20 @@
312+from zope.interface import implements
313+
314+from twisted.plugin import IPlugin
315+from txstatsd.itxstatsd import IMetricFactory
316+from txstatsd.metrics.distinctmetric import DistinctMetricReporter
317+
318+class DistinctMetricFactory(object):
319+ implements(IMetricFactory, IPlugin)
320+
321+ name = "pdistinct"
322+ metric_type = "pd"
323+
324+ def build_metric(self, prefix, name, wall_time_func=None):
325+ return DistinctMetricReporter(name, prefix=prefix,
326+ wall_time_func=wall_time_func)
327+
328+ def configure(self, options):
329+ pass
330+
331+distinct_metric_factory = DistinctMetricFactory()
332
333=== added file 'twisted/plugins/txstatsd_plugin.py'
334--- twisted/plugins/txstatsd_plugin.py 1970-01-01 00:00:00 +0000
335+++ twisted/plugins/txstatsd_plugin.py 2012-05-14 16:57:19 +0000
336@@ -0,0 +1,22 @@
337+from zope.interface import implements
338+
339+from twisted.plugin import IPlugin
340+from twisted.application.service import IServiceMaker
341+
342+from txstatsd import service
343+
344+
345+class StatsDServiceMaker(object):
346+ implements(IServiceMaker, IPlugin)
347+ tapname = "statsd"
348+ description = "Collect and aggregate stats for graphite."
349+ options = service.StatsDOptions
350+
351+ def makeService(self, options):
352+ """
353+ Construct a StatsD service.
354+ """
355+ return service.createService(options)
356+
357+# Now construct an object which *provides* the relevant interfaces
358+serviceMaker = StatsDServiceMaker()
359
360=== added directory 'txstatsd'
361=== added file 'txstatsd.conf-example'
362--- txstatsd.conf-example 1970-01-01 00:00:00 +0000
363+++ txstatsd.conf-example 2012-05-14 16:57:19 +0000
364@@ -0,0 +1,26 @@
365+[statsd]
366+# The host where carbon cache is listening.
367+carbon-cache-host: localhost
368+# The port where carbon cache is listening.
369+carbon-cache-port: 2003
370+# The UDP port where we will listen.
371+listen-port: 8125
372+
373+# The number of milliseconds between each flush.
374+flush-interval: 60000
375+
376+# Which additional stats to report {process|net|io|system}.
377+report:
378+# Prefix to use when reporting stats.
379+prefix:
380+# Produce StatsD-compliant messages.
381+statsd-compliance: 1
382+
383+# Support application monitoring. UDP echo is initially supported.
384+# Should we receive the monitor-message, we respond with the
385+# configured monitor-response.
386+monitor-message: txstatsd ping
387+monitor-response: txstatsd pong
388+
389+[plugin_sample]
390+sample-key: sample-value
391\ No newline at end of file
392
393=== renamed file 'txstatsd.conf-example' => 'txstatsd.conf-example.moved'
394=== renamed directory 'txstatsd' => 'txstatsd.moved'
395=== added file 'txstatsd/__init__.py'
396=== added file 'txstatsd/client.py'
397--- txstatsd/client.py 1970-01-01 00:00:00 +0000
398+++ txstatsd/client.py 2012-05-14 16:57:19 +0000
399@@ -0,0 +1,165 @@
400+import socket
401+
402+from twisted.internet.defer import inlineCallbacks, returnValue
403+from twisted.internet.protocol import DatagramProtocol
404+from twisted.python import log
405+
406+
407+class StatsDClientProtocol(DatagramProtocol):
408+ """A Twisted-based implementation of the StatsD client protocol.
409+
410+ Data is sent via UDP to a StatsD server for aggregation.
411+ """
412+
413+ def __init__(self, client):
414+ self.client = client
415+
416+ def startProtocol(self):
417+ """Connect to destination host."""
418+ self.client.connect(self.transport)
419+
420+ def stopProtocol(self):
421+ """Connection was lost."""
422+ self.client.disconnect()
423+
424+
425+class TwistedStatsDClient(object):
426+
427+ def __init__(self, host, port,
428+ connect_callback=None,
429+ disconnect_callback=None,
430+ resolver_errback=None):
431+ """
432+ Build a connection that reports to the endpoint (on C{host} and
433+ C{port}) using UDP.
434+
435+ @param host: The StatsD server host.
436+ @param port: The StatsD server port.
437+ @param resolver_errback: The errback to invoke should
438+ issues occur resolving the supplied C{host}.
439+ @param connect_callback: The callback to invoke on connection.
440+ @param disconnect_callback: The callback to invoke on disconnection.
441+ """
442+ from twisted.internet import reactor
443+
444+ self.reactor = reactor
445+
446+ @inlineCallbacks
447+ def resolve(host):
448+ self.host = yield reactor.resolve(host)
449+ returnValue(self.host)
450+
451+ self.host = None
452+ self.resolver = resolve(host)
453+ if resolver_errback is None:
454+ self.resolver.addErrback(log.err)
455+ else:
456+ self.resolver.addErrback(resolver_errback)
457+
458+ self.port = port
459+ self.connect_callback = connect_callback
460+ self.disconnect_callback = disconnect_callback
461+
462+ self.transport = None
463+
464+ @inlineCallbacks
465+ def connect(self, transport=None):
466+ """Connect to the StatsD server."""
467+ host = yield self.resolver
468+ if host is not None:
469+ self.transport = transport
470+ if self.transport is not None:
471+ if self.connect_callback is not None:
472+ self.connect_callback()
473+
474+ def disconnect(self):
475+ """Disconnect from the StatsD server."""
476+ if self.disconnect_callback is not None:
477+ self.disconnect_callback()
478+ self.transport = None
479+
480+ def write(self, data, callback=None):
481+ """Send the metric to the StatsD server.
482+
483+ @param data: The data to be sent.
484+ @param callback: The callback to which the result should be sent.
485+ B{Note}: The C{callback} will be called in the C{reactor}
486+ thread, and not in the thread of the original caller.
487+ """
488+ self.reactor.callFromThread(self._write, data, callback)
489+
490+ def _write(self, data, callback):
491+ """Send the metric to the StatsD server.
492+
493+ @param data: The data to be sent.
494+ @param callback: The callback to which the result should be sent.
495+ @raise twisted.internet.error.MessageLengthError: If the size of data
496+ is too large.
497+ """
498+ if self.host is not None and self.transport is not None:
499+ try:
500+ bytes_sent = self.transport.write(data, (self.host, self.port))
501+ if callback is not None:
502+ callback(bytes_sent)
503+ except (OverflowError, TypeError, socket.error, socket.gaierror):
504+ if callback is not None:
505+ callback(None)
506+
507+
508+class UdpStatsDClient(object):
509+
510+ def __init__(self, host=None, port=None):
511+ """Build a connection that reports to C{host} and C{port})
512+ using UDP.
513+
514+ @param host: The StatsD host.
515+ @param port: The StatsD port.
516+ @raise ValueError: If the C{host} and C{port} cannot be
517+ resolved (for the case where they are not C{None}).
518+ """
519+ self.host = host
520+ self.port = port
521+
522+ if host is not None and port is not None:
523+ try:
524+ self.host, self.port = socket.getaddrinfo(
525+ host, port, socket.AF_INET,
526+ socket.SOCK_DGRAM, socket.SOL_UDP)[0][4]
527+ except (TypeError, IndexError, socket.error, socket.gaierror):
528+ raise ValueError("The address cannot be resolved.")
529+
530+ self.socket = None
531+
532+ def connect(self):
533+ """Connect to the StatsD server."""
534+ self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
535+ self.socket.setblocking(0)
536+
537+ def disconnect(self):
538+ """Disconnect from the StatsD server."""
539+ if self.socket is not None:
540+ self.socket.close()
541+ self.socket = None
542+
543+ def write(self, data):
544+ """Send the metric to the StatsD server."""
545+ if self.host is None or self.port is None or self.socket is None:
546+ return
547+ try:
548+ return self.socket.sendto(data, (self.host, self.port))
549+ except (socket.error, socket.gaierror):
550+ return None
551+
552+
553+class InternalClient(object):
554+ """A connection that can be used inside the C{StatsD} daemon itself."""
555+
556+ def __init__(self, processor):
557+ """
558+ A connection that writes directly to the C{MessageProcessor}.
559+ """
560+ self._processor = processor
561+
562+ def write(self, data):
563+ """Write directly to the C{MessageProcessor}."""
564+ self._processor.process(data)
565
566=== added file 'txstatsd/itxstatsd.py'
567--- txstatsd/itxstatsd.py 1970-01-01 00:00:00 +0000
568+++ txstatsd/itxstatsd.py 2012-05-14 16:57:19 +0000
569@@ -0,0 +1,54 @@
570+from zope.interface import Interface, Attribute
571+
572+
573+class IMetricFactory(Interface):
574+ name = Attribute("""
575+ @type name C{str}
576+ @ivar name: The name of this kind of metric
577+ """)
578+
579+ metric_type = Attribute("""
580+ @type metric_type: C{str}
581+ @ivar metric_type: The string that will be used by clients to send
582+ metrics of this kind to the server.
583+ """)
584+
585+ def build_metric(prefix, name, wall_time_func=None):
586+ """
587+ Returns an object that implements the C{IMetric} interface for name.
588+
589+ @type prefix: C{str}
590+ @param prefix: The prefix used for reporting this metric.
591+ @type name: C{str}
592+ @param name: The name used for reporting this metric.
593+ """
594+
595+ def configure(options):
596+ """
597+ Configures the factory. Will be called at startup by the service
598+ factory.
599+
600+ @type options: C{twisted.python.usage.Options}
601+ @param options: The configuration options.
602+ """
603+
604+
605+class IMetric(Interface):
606+ def process(fields):
607+ """
608+ Process new data for this metric.
609+
610+ @type fields: C{list}
611+ @param fields: The list of message parts. Usually in the form of
612+ (value, metric_type, [sample_rate])
613+ """
614+
615+ def flush(interval, timestamp):
616+ """
617+ Returns a string with new line separated list of metrics to report.
618+
619+ @type interval: C{float}
620+ @param interval: The time since last flush.
621+ @type timestamp: C{float}
622+ @param timestamp: The timestamp for now.
623+ """
624
625=== added directory 'txstatsd/metrics'
626=== added file 'txstatsd/metrics/__init__.py'
627=== added file 'txstatsd/metrics/countermetric.py'
628--- txstatsd/metrics/countermetric.py 1970-01-01 00:00:00 +0000
629+++ txstatsd/metrics/countermetric.py 2012-05-14 16:57:19 +0000
630@@ -0,0 +1,67 @@
631+import math
632+
633+from txstatsd.metrics.metric import Metric
634+
635+
636+class CounterMetric(Metric):
637+ """An incrementing and decrementing counter metric."""
638+
639+ def __init__(self, connection, name, sample_rate=1):
640+ """Construct a metric that reports samples to the supplied
641+ C{connection}.
642+
643+ @param connection: The connection endpoint representing
644+ the StatsD server.
645+ @param name: Indicates what is being instrumented.
646+ @param sample_rate: Restrict the number of samples sent
647+ to the StatsD server based on the supplied C{sample_rate}.
648+ """
649+ Metric.__init__(self, connection, name, sample_rate=sample_rate)
650+
651+ self._count = 0
652+
653+ def increment(self, value):
654+ """Increment the counter by C{value}"""
655+ self._count += value
656+ self._update(self._count)
657+
658+ def decrement(self, value):
659+ """Decrement the counter by C{value}"""
660+ self._count -= value
661+ self._update(self._count)
662+
663+ def count(self):
664+ """Returns the counter's current value."""
665+ return self._count
666+
667+ def clear(self):
668+ """Resets the counter to 0."""
669+ self._count = 0
670+ self._update(self._count)
671+
672+ def _update(self, value):
673+ """Report the counter."""
674+ self.send("%s|c" % value)
675+
676+
677+class CounterMetricReporter(object):
678+ """An incrementing and decrementing counter metric."""
679+
680+ def __init__(self, name, prefix=""):
681+ """Construct a metric we expect to be periodically updated.
682+
683+ @param name: Indicates what is being instrumented.
684+ """
685+ self.name = name
686+
687+ if prefix:
688+ prefix += "."
689+ self.prefix = prefix
690+ self.count = 0
691+
692+ def mark(self, value):
693+ self.count = value
694+
695+ def report(self, timestamp):
696+ return [(self.prefix + self.name + ".count",
697+ math.trunc(self.count), timestamp)]
698
699=== added file 'txstatsd/metrics/distinctmetric.py'
700--- txstatsd/metrics/distinctmetric.py 1970-01-01 00:00:00 +0000
701+++ txstatsd/metrics/distinctmetric.py 2012-05-14 16:57:19 +0000
702@@ -0,0 +1,150 @@
703+# Copyright (C) 2011 Canonical
704+# All Rights Reserved
705+"""
706+Implements a probabilistic distinct counter with sliding windows.
707+
708+Based on:
709+http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.12.7100
710+
711+And extended for sliding windows.
712+"""
713+import random
714+import time
715+
716+from zope.interface import implements
717+
718+from txstatsd.metrics.metric import Metric
719+from txstatsd.itxstatsd import IMetric
720+
721+
722+class SBoxHash(object):
723+ """A very fast hash.
724+
725+ This class create a random hash function that is very fast.
726+ Based on SBOXes. Not Crypto Strong.
727+
728+ Two instances of this class will hash differently.
729+ """
730+
731+ def __init__(self):
732+ self.table = [random.randint(0, 0xFFFFFFFF - 1) for i in range(256)]
733+
734+ def hash(self, data):
735+ value = 0
736+ for c in data:
737+ value = value ^ self.table[ord(c)]
738+ value = value * 3
739+ value = value & 0xFFFFFFFF
740+ return value
741+
742+
743+def hash(data):
744+ """Hash data using a random hasher."""
745+ p = SBoxHash()
746+ return p.hash(data)
747+
748+
749+def zeros(n):
750+ """Count the zeros to the right of the binary representation of n."""
751+ count = 0
752+ i = 0
753+ while True:
754+ v = (n >> i)
755+ if v <= 0:
756+ return count
757+ if v & 1:
758+ return count
759+ count += 1
760+ i += 1
761+ return count
762+
763+
764+class SlidingDistinctCounter(object):
765+ """A probabilistic distinct counter with sliding windows."""
766+
767+ def __init__(self, n_hashes, n_buckets):
768+ self.n_hashes = n_hashes
769+ self.n_buckets = n_buckets
770+
771+ self.hashes = [SBoxHash() for i in range(n_hashes)]
772+ self.buckets = [[0] * n_buckets for i in range(n_hashes)]
773+
774+ def add(self, when, item):
775+ hashes = (h.hash(item) for h in self.hashes)
776+ for i, value in enumerate(hashes):
777+ self.buckets[i][min(self.n_buckets - 1, zeros(value))] = when
778+
779+ def distinct(self, since=0):
780+ total = 0.0
781+ for i in range(self.n_hashes):
782+ least0 = 0
783+ for b in range(self.n_buckets):
784+ if self.buckets[i][b] <= since:
785+ break
786+ least0 += 1
787+ total += least0
788+ v = total / self.n_hashes
789+ return int((2 ** v) / 0.77351)
790+
791+
792+class DistinctMetric(Metric):
793+ """
794+ Keeps an estimate of the distinct numbers of items seen on various
795+ sliding windows of time.
796+ """
797+
798+ def mark(self, item):
799+ """Report this item was seen."""
800+ self.send("%s|d" % item)
801+
802+
803+class DistinctMetricReporter(object):
804+ """
805+ Keeps an estimate of the distinct numbers of items seen on various
806+ sliding windows of time.
807+ """
808+ implements(IMetric)
809+
810+ def __init__(self, name, wall_time_func=time.time, prefix=""):
811+ """Construct a metric we expect to be periodically updated.
812+
813+ @param name: Indicates what is being instrumented.
814+ @param wall_time_func: Function for obtaining wall time.
815+ @param prefix: If present, a string to prepend to the message
816+ composed when C{report} is called.
817+ """
818+ self.name = name
819+ self.wall_time_func = wall_time_func
820+ self.counter = SlidingDistinctCounter(32, 32)
821+ if prefix:
822+ prefix += "."
823+ self.prefix = prefix
824+
825+ def count(self):
826+ return self.counter.distinct()
827+
828+ def count_1min(self, now):
829+ return self.counter.distinct(now - 60)
830+
831+ def count_1hour(self, now):
832+ return self.counter.distinct(now - 60 * 60)
833+
834+ def count_1day(self, now):
835+ return self.counter.distinct(now - 60 * 60 * 24)
836+
837+ def process(self, fields):
838+ self.update(fields[0])
839+
840+ def update(self, item):
841+ self.counter.add(self.wall_time_func(), item)
842+
843+ def flush(self, interval, timestamp):
844+ now = self.wall_time_func()
845+ metrics = []
846+ items = {".count": self.count(),
847+ ".count_1min": self.count_1min(now),
848+ ".count_1hour": self.count_1hour(now),
849+ ".count_1day": self.count_1day(now)}
850+ for item, value in items.iteritems():
851+ metrics.append((self.prefix + self.name + item, value, timestamp))
852+ return metrics
853
854=== added file 'txstatsd/metrics/extendedmetrics.py'
855--- txstatsd/metrics/extendedmetrics.py 1970-01-01 00:00:00 +0000
856+++ txstatsd/metrics/extendedmetrics.py 2012-05-14 16:57:19 +0000
857@@ -0,0 +1,53 @@
858+
859+from txstatsd.metrics.countermetric import CounterMetric
860+from txstatsd.metrics.timermetric import TimerMetric
861+from txstatsd.metrics.metrics import Metrics
862+
863+
864+class ExtendedMetrics(Metrics):
865+
866+ def __init__(self, connection=None, namespace=""):
867+ """A convenience class for reporting metric samples
868+ to a C{txstatsd} server configured with the
869+ L{ConfigurableProcessor<txstatsd.server.configurableprocessor>}
870+ processor.
871+
872+ @param connection: The connection endpoint representing
873+ the C{txstatsd} server.
874+ @param namespace: The top-level namespace identifying the
875+ origin of the samples.
876+ """
877+
878+ super(ExtendedMetrics, self).__init__(connection, namespace)
879+
880+ def increment(self, name, value=1, sample_rate=1):
881+ """Report and increase in name by count."""
882+ name = self.fully_qualify_name(name)
883+ if not name in self._metrics:
884+ metric = CounterMetric(self.connection,
885+ name,
886+ sample_rate)
887+ self._metrics[name] = metric
888+ self._metrics[name].increment(value)
889+
890+ def decrement(self, name, value=1, sample_rate=1):
891+ """Report and decrease in name by count."""
892+ name = self.fully_qualify_name(name)
893+ if not name in self._metrics:
894+ metric = CounterMetric(self.connection,
895+ name,
896+ sample_rate)
897+ self._metrics[name] = metric
898+ self._metrics[name].decrement(value)
899+
900+ def timing(self, name, duration=None, sample_rate=1):
901+ """Report this sample performed in duration seconds."""
902+ if duration is None:
903+ duration = self.calculate_duration()
904+ name = self.fully_qualify_name(name)
905+ if not name in self._metrics:
906+ metric = TimerMetric(self.connection,
907+ name,
908+ sample_rate)
909+ self._metrics[name] = metric
910+ self._metrics[name].mark(duration)
911
912=== added file 'txstatsd/metrics/gaugemetric.py'
913--- txstatsd/metrics/gaugemetric.py 1970-01-01 00:00:00 +0000
914+++ txstatsd/metrics/gaugemetric.py 2012-05-14 16:57:19 +0000
915@@ -0,0 +1,43 @@
916+from txstatsd.metrics.metric import Metric
917+
918+
919+class GaugeMetric(Metric):
920+ """A gauge metric is an instantaneous reading of a particular value."""
921+
922+ def __init__(self, connection, name, sample_rate=1):
923+ """Construct a metric that reports samples to the supplied
924+ C{connection}.
925+
926+ @param connection: The connection endpoint representing
927+ the StatsD server.
928+ @param name: Indicates what is being instrumented.
929+ @param sample_rate: Restrict the number of samples sent
930+ to the StatsD server based on the supplied C{sample_rate}.
931+ """
932+ Metric.__init__(self, connection, name, sample_rate=sample_rate)
933+
934+ def mark(self, value):
935+ """Report the C{value} for this gauge."""
936+ self.send("%s|g" % value)
937+
938+
939+class GaugeMetricReporter(object):
940+ """A gauge metric is an instantaneous reading of a particular value."""
941+
942+ def __init__(self, name, prefix=""):
943+ """Construct a metric we expect to be periodically updated.
944+
945+ @param name: Indicates what is being instrumented.
946+ """
947+ self.name = name
948+
949+ if prefix:
950+ prefix += "."
951+ self.prefix = prefix
952+ self.value = 0
953+
954+ def mark(self, value):
955+ self.value = value
956+
957+ def report(self, timestamp):
958+ return [(self.prefix + self.name + ".value", self.value, timestamp)]
959
960=== added file 'txstatsd/metrics/histogrammetric.py'
961--- txstatsd/metrics/histogrammetric.py 1970-01-01 00:00:00 +0000
962+++ txstatsd/metrics/histogrammetric.py 2012-05-14 16:57:19 +0000
963@@ -0,0 +1,209 @@
964+# -*- coding: utf-8 *-*
965+import math
966+
967+from txstatsd.stats.exponentiallydecayingsample \
968+ import ExponentiallyDecayingSample
969+from txstatsd.stats.uniformsample import UniformSample
970+
971+
972+class HistogramMetricReporter(object):
973+ """
974+ A metric which calculates the distribution of a value.
975+
976+ See:
977+ - U{Accurately computing running variance
978+ <http://www.johndcook.com/standard_deviation.html>}
979+ """
980+
981+ @classmethod
982+ def using_uniform_sample(cls, prefix=""):
983+ """
984+ Uses a uniform sample of 1028 elements, which offers a 99.9%
985+ confidence level with a 5% margin of error assuming a normal
986+ distribution.
987+ """
988+ sample = UniformSample(1028)
989+ return HistogramMetricReporter(sample, prefix=prefix)
990+
991+ @classmethod
992+ def using_exponentially_decaying_sample(cls, prefix=""):
993+ """
994+ Uses an exponentially decaying sample of 1028 elements, which offers
995+ a 99.9% confidence level with a 5% margin of error assuming a normal
996+ distribution, and an alpha factor of 0.015, which heavily biases
997+ the sample to the past 5 minutes of measurements.
998+ """
999+ sample = ExponentiallyDecayingSample(1028, 0.015)
1000+ return HistogramMetricReporter(sample, prefix=prefix)
1001+
1002+ def __init__(self, sample, prefix="", name=""):
1003+ """Creates a new HistogramMetricReporter with the given sample.
1004+
1005+ @param sample: The sample to create a histogram from.
1006+ """
1007+ self.sample = sample
1007+ self.name = name
1008+
1009+ if prefix:
1010+ prefix += "."
1011+ self.prefix = prefix
1012+
1013+ self._min = 0
1014+ self._max = 0
1015+ self._sum = 0
1016+
1017+ # These are for the Welford algorithm for calculating running
1018+ # variance without floating-point doom.
1019+ self.variance = [-1.0, 0.0] # M, S
1020+ self.count = 0
1021+
1022+ self.clear()
1023+
1024+ def clear(self):
1025+ """Clears all recorded values."""
1026+ self.sample.clear()
1027+ self.count = 0
1028+ self._max = None
1029+ self._min = None
1030+ self._sum = 0
1031+ self.variance = [-1.0, 0.0]
1032+
1033+ def update(self, value, name=""):
1034+ """Adds a recorded value.
1035+
1036+ @param value: The value to record.
1037+ """
1038+ self.count += 1
1039+ self.sample.update(value)
1040+ self.set_max(value)
1041+ self.set_min(value)
1042+ self._sum += value
1043+ self.update_variance(value)
1044+
1045+ def report(self, timestamp):
1046+ # median, 75, 95, 98, 99, 99.9 percentile
1047+ percentiles = self.percentiles(0.5, 0.75, 0.95, 0.98, 0.99, 0.999)
1048+ metrics = []
1049+ items = {
1050+ ".min": self.min(),
1051+ ".max": self.max(),
1052+ ".mean": self.mean(),
1053+ ".stddev": self.std_dev(),
1054+ ".median": percentiles[0],
1055+ ".75percentile": percentiles[1],
1056+ ".95percentile": percentiles[2],
1057+ ".98percentile": percentiles[3],
1058+ ".99percentile": percentiles[4],
1059+ ".999percentile": percentiles[5]}
1060+
1061+ for item, value in items.iteritems():
1062+ metrics.append((self.prefix + self.name + item, value, timestamp))
1063+ return metrics
1064+
1065+ def min(self):
1066+ """Returns the smallest recorded value."""
1067+ return self._min if self.count > 0 else 0.0
1068+
1069+ def max(self):
1070+ """Returns the largest recorded value."""
1071+ return self._max if self.count > 0 else 0.0
1072+
1073+ def mean(self):
1074+ """Returns the arithmetic mean of all recorded values."""
1075+ return float(self._sum) / self.count if self.count > 0 else 0.0
1076+
1077+ def std_dev(self):
1078+ """Returns the standard deviation of all recorded values."""
1079+ return math.sqrt(self.get_variance()) if self.count > 0 else 0.0
1080+
1081+ def percentiles(self, *percentiles):
1082+ """Returns a list of values at the given percentiles.
1083+
1084+ @param percentiles: One or more percentiles.
1085+ """
1086+
1087+ scores = [0.0] * len(percentiles)
1088+ if self.count > 0:
1089+ values = self.sample.get_values()
1090+ values.sort()
1091+
1092+ for i in range(len(percentiles)):
1093+ p = percentiles[i]
1094+ pos = p * (len(values) + 1)
1095+ if pos < 1:
1096+ scores[i] = values[0]
1097+ elif pos >= len(values):
1098+ scores[i] = values[-1]
1099+ else:
1100+ lower = values[int(pos) - 1]
1101+ upper = values[int(pos)]
1102+ scores[i] = lower + (pos - math.floor(pos)) * (
1103+ upper - lower)
1104+
1105+ return scores
1106+
1107+ def histogram(self):
1108+ """Returns an histogram of the sample.
1109+ """
1110+
1111+ # If we don't have any data, return an empty histogram.
1112+ if not self.count > 0:
1113+ return [0.0] * 10
1114+
1115+ # Sturges' rule for selecting the number of bins:
1116+ # Sturges, H. A. (1926) The choice of a class interval.
1117+ # Journal of the American Statistical Association 21, 65–66.
1118+ n_bins = int(math.ceil(1 + math.log(self.count, 2)))
1119+
1120+ scores = [0.0] * n_bins
1121+
1122+ values = self.sample.get_values()
1123+ max_value = float(max(values))
1124+ min_value = float(min(values))
1125+ value_range = max_value - min_value
1126+
1127+ for value in values:
1128+ pos = int(((value - min_value) / value_range) * n_bins)
1129+ if pos == n_bins:
1130+ pos -= 1
1131+
1132+ scores[pos] += 1
1133+ return scores
1134+
1135+ def get_values(self):
1136+ """Returns a list of all values in the histogram's sample."""
1137+ return self.sample.get_values()
1138+
1139+ def get_variance(self):
1140+ if self.count <= 1:
1141+ return 0.0
1142+ return self.variance[1] / (self.count - 1)
1143+
1144+ def set_max(self, potential_max):
1145+ if self._max is None:
1146+ self._max = potential_max
1147+ else:
1148+ self._max = max(self.max(), potential_max)
1149+
1150+ def set_min(self, potential_min):
1151+ if self._min is None:
1152+ self._min = potential_min
1153+ else:
1154+ self._min = min(self.min(), potential_min)
1155+
1156+ def update_variance(self, value):
1157+ old_values = self.variance
1158+ new_values = [0.0, 0.0]
1159+ if old_values[0] == -1:
1160+ new_values[0] = value
1161+ new_values[1] = 0.0
1162+ else:
1163+ old_m = old_values[0]
1164+ old_s = old_values[1]
1165+
1166+ new_m = old_m + (float(value - old_m) / self.count)
1167+ new_s = old_s + (float(value - old_m) * (value - new_m))
1168+
1169+ new_values[0] = new_m
1170+ new_values[1] = new_s
1171+
1172+ self.variance = new_values
1173
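The `update_variance` method above is Welford's online algorithm: it keeps a running mean M and a running sum of squared deviations S, so the sample variance can be computed in a single pass without the catastrophic cancellation of the naive sum-of-squares approach. A standalone sketch of the same recurrence (the class name is illustrative):

```python
class RunningVariance(object):
    """Welford's online algorithm, mirroring update_variance() above."""

    def __init__(self):
        self.count = 0
        self.m = 0.0  # running mean (M)
        self.s = 0.0  # running sum of squared deviations (S)

    def update(self, value):
        self.count += 1
        if self.count == 1:
            # First sample primes the mean, as the old_values[0] == -1
            # branch does above.
            self.m = float(value)
            self.s = 0.0
        else:
            old_m = self.m
            self.m = old_m + (value - old_m) / self.count
            self.s += (value - old_m) * (value - self.m)

    def variance(self):
        # Sample variance with Bessel's correction, as in get_variance().
        return self.s / (self.count - 1) if self.count > 1 else 0.0

rv = RunningVariance()
for v in [2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0]:
    rv.update(v)
```

For this data set the mean is 5.0 and the sample variance is 32/7, which the one-pass recurrence reproduces exactly.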
1174=== added file 'txstatsd/metrics/imetrics.py'
1175--- txstatsd/metrics/imetrics.py 1970-01-01 00:00:00 +0000
1176+++ txstatsd/metrics/imetrics.py 2012-05-14 16:57:19 +0000
1177@@ -0,0 +1,18 @@
1178+from zope.interface import Interface
1179+
1180+class IMetrics(Interface):
1181+ """
1182+ Provides a global utility for sending out metric information.
1183+ """
1184+
1185+ def gauge(name, value, sample_rate=1):
1186+ """Record an absolute reading for C{name} with C{value}."""
1187+
1188+ def increment(name, value=1, sample_rate=1):
1189+ """Increment counter C{name} by C{count}."""
1190+
1191+ def decrement(name, value=1, sample_rate=1):
1192+ """Decrement counter C{name} by C{count}."""
1193+
1194+ def timing(name, duration=None, sample_rate=1):
1195+ """Report that C{name} took C{duration} seconds."""
1196
1197=== added file 'txstatsd/metrics/metermetric.py'
1198--- txstatsd/metrics/metermetric.py 1970-01-01 00:00:00 +0000
1199+++ txstatsd/metrics/metermetric.py 2012-05-14 16:57:19 +0000
1200@@ -0,0 +1,103 @@
1201+import time
1202+
1203+from txstatsd.metrics.metric import Metric
1204+from txstatsd.stats.ewma import Ewma
1205+
1206+
1207+class MeterMetric(Metric):
1208+ """
1209+ A meter metric which measures mean throughput and one-, five-, and
1210+ fifteen-minute exponentially-weighted moving average throughputs.
1211+
1212+ See:
1213+ - U{EMA
1214+ <http://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average>}
1215+ """
1216+
1217+ def __init__(self, connection, name, sample_rate=1):
1218+ """Construct a metric that reports samples to the supplied
1219+ C{connection}.
1220+
1221+ @param connection: The connection endpoint representing
1222+ the StatsD server.
1223+ @param name: Indicates what is being instrumented.
1224+ @param sample_rate: Restrict the number of samples sent
1225+ to the StatsD server based on the supplied C{sample_rate}.
1226+ """
1227+ Metric.__init__(self, connection, name, sample_rate=sample_rate)
1228+
1229+ def mark(self, value=1):
1230+ """Mark the occurrence of a given number (C{value}) of events."""
1231+ self.send("%s|m" % value)
1232+
1233+
1234+class MeterMetricReporter(object):
1235+ """
1236+ A meter metric which measures mean throughput and one-, five-, and
1237+ fifteen-minute exponentially-weighted moving average throughputs.
1238+
1239+ See:
1240+ - U{EMA
1241+ <http://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average>}
1242+ """
1243+
1244+ def __init__(self, name, wall_time_func=time.time, prefix=""):
1245+ """Construct a metric we expect to be periodically updated.
1246+
1247+ @param name: Indicates what is being instrumented.
1248+ @param wall_time_func: Function for obtaining wall time.
1249+ """
1250+ self.name = name
1251+ self.wall_time_func = wall_time_func
1252+
1253+ if prefix:
1254+ prefix += "."
1255+ self.prefix = prefix
1256+
1257+ self.m1_rate = Ewma.one_minute_ewma()
1258+ self.m5_rate = Ewma.five_minute_ewma()
1259+ self.m15_rate = Ewma.fifteen_minute_ewma()
1260+ self.count = 0
1261+ self.start_time = self.wall_time_func()
1262+
1263+ def mark(self, value=1):
1264+ """Mark the occurrence of a given number of events."""
1265+ self.count += value
1266+ self.m1_rate.update(value)
1267+ self.m5_rate.update(value)
1268+ self.m15_rate.update(value)
1269+
1270+ def tick(self):
1271+ """Updates the moving averages."""
1272+ self.m1_rate.tick()
1273+ self.m5_rate.tick()
1274+ self.m15_rate.tick()
1275+
1276+ def report(self, timestamp):
1277+ metrics = []
1278+ items = {".count": self.count,
1279+ ".mean_rate": self.mean_rate(),
1280+ ".1min_rate": self.one_minute_rate(),
1281+ ".5min_rate": self.five_minute_rate(),
1282+ ".15min_rate": self.fifteen_minute_rate()}
1283+
1284+ for item, value in items.iteritems():
1285+ metrics.append((self.prefix + self.name + item,
1286+ round(value, 6), timestamp))
1287+ return metrics
1288+
1289+ def fifteen_minute_rate(self):
1290+ return self.m15_rate.rate
1291+
1292+ def five_minute_rate(self):
1293+ return self.m5_rate.rate
1294+
1295+ def one_minute_rate(self):
1296+ return self.m1_rate.rate
1297+
1298+ def mean_rate(self):
1299+ if self.count == 0:
1300+ return 0.0
1301+ else:
1302+ elapsed = self.wall_time_func() - self.start_time
1303+ return float(self.count) / elapsed
1304
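The `Ewma` objects used by `MeterMetricReporter` live in txstatsd/stats/ewma.py, which is not part of this hunk. The usual scheme, sketched below under assumed constants (a 5-second tick interval and smoothing factor alpha = 1 - exp(-interval/window)), accumulates marks between ticks and folds each interval's instantaneous rate into the moving average:

```python
import math

class MiniEwma(object):
    """Sketch of an exponentially weighted moving average rate.
    The real Ewma class and its constants may differ; these are
    assumptions for illustration."""

    def __init__(self, window_seconds=60.0, interval=5.0):
        self.interval = interval
        self.alpha = 1 - math.exp(-interval / window_seconds)
        self.uncounted = 0
        self.rate = None  # events per second

    def update(self, value=1):
        """Accumulate events between ticks (cf. MeterMetricReporter.mark)."""
        self.uncounted += value

    def tick(self):
        """Fold the last interval's rate into the moving average."""
        instant_rate = self.uncounted / self.interval
        self.uncounted = 0
        if self.rate is None:
            self.rate = instant_rate  # prime with the first reading
        else:
            self.rate += self.alpha * (instant_rate - self.rate)

ewma = MiniEwma()
for _ in range(12):   # one minute of ticks...
    ewma.update(10)   # ...at 10 events per 5-second interval
    ewma.tick()
```

At a steady 10 events per 5-second interval the rate converges to 2.0 events per second, which is what the 1min_rate figure reported above would show.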
1305=== added file 'txstatsd/metrics/metric.py'
1306--- txstatsd/metrics/metric.py 1970-01-01 00:00:00 +0000
1307+++ txstatsd/metrics/metric.py 2012-05-14 16:57:19 +0000
1308@@ -0,0 +1,47 @@
1309+
1310+import random
1311+
1312+
1313+class Metric(object):
1314+ """
1315+ The foundation metric from which the specialized
1316+ metrics are derived.
1317+ """
1318+
1319+ def __init__(self, connection, name, sample_rate=1):
1320+ """Construct a metric that reports samples to the supplied
1321+ C{connection}.
1322+
1323+ @param connection: The connection endpoint representing
1324+ the StatsD server.
1325+ @param name: Specific description for this metric.
1326+ @param sample_rate: Restrict the number of samples sent
1327+ to the StatsD server based on the supplied C{sample_rate}.
1328+ """
1329+ self.connection = connection
1330+ self.name = name
1331+ self.sample_rate = sample_rate
1332+
1333+ def clear(self):
1334+ """Responsibility of the specialized metrics."""
1335+ pass
1336+
1337+ def send(self, data):
1338+ """
1339+ Send the C{data} to the C{StatsD} server according to the
1340+ C{sample_rate}.
1341+ """
1342+
1343+ if self.sample_rate < 1:
1344+ if random.random() > self.sample_rate:
1345+ return
1346+ data += "|@%s" % (self.sample_rate,)
1347+
1348+ data = self.name + ":" + data
1349+
1350+ self.write(data)
1351+
1352+ def write(self, data):
1353+ """Message the C{data} to the C{StatsD} server."""
1354+ if self.connection is not None:
1355+ self.connection.write(data)
1356
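`Metric.send` composes the statsd wire format: `<name>:<value>|<type>`, with a `|@<rate>` suffix when client-side sampling drops a fraction of the messages. A standalone sketch of that encoding (the function name and the injectable `rand` parameter are illustrative, added so the sampling branch can be exercised deterministically):

```python
import random

def encode_sample(name, data, sample_rate=1.0, rand=random.random):
    """Build the statsd wire message the way Metric.send() does.
    Returns None when the sample is dropped."""
    if sample_rate < 1:
        if rand() > sample_rate:
            return None  # dropped by client-side sampling
        data += "|@%s" % (sample_rate,)
    return name + ":" + data

msg = encode_sample("gorets", "1|c")
sampled = encode_sample("gorets", "1|c", 0.5, rand=lambda: 0.1)
dropped = encode_sample("gorets", "1|c", 0.5, rand=lambda: 0.9)
```

The `|@0.5` annotation lets the server scale the counter back up (dividing by the rate), so sampled counters still report approximately correct totals.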
1357=== added file 'txstatsd/metrics/metrics.py'
1358--- txstatsd/metrics/metrics.py 1970-01-01 00:00:00 +0000
1359+++ txstatsd/metrics/metrics.py 2012-05-14 16:57:19 +0000
1360@@ -0,0 +1,140 @@
1361+
1362+import time
1363+from txstatsd.metrics.gaugemetric import GaugeMetric
1364+from txstatsd.metrics.metermetric import MeterMetric
1365+from txstatsd.metrics.distinctmetric import DistinctMetric
1366+from txstatsd.metrics.metric import Metric
1367+
1368+
1369+class GenericMetric(Metric):
1370+ def __init__(self, connection, key, name, sample_rate=1):
1371+ super(GenericMetric, self).__init__(connection, name,
1372+ sample_rate=sample_rate)
1373+ self.key = key
1374+
1375+ def mark(self, value):
1376+ self.send("%s|%s" % (value, self.key))
1377+
1378+
1379+class Metrics(object):
1380+ def __init__(self, connection=None, namespace=""):
1381+ """A convenience class for reporting metric samples
1382+ to a StatsD server (C{connection}).
1383+
1384+ @param connection: The connection endpoint representing
1385+ the StatsD server.
1386+ @param namespace: The top-level namespace identifying the
1387+ origin of the samples.
1388+ """
1389+
1390+ self.connection = connection
1391+ self.namespace = namespace
1392+ self._metrics = {}
1393+ self.last_time = 0
1394+
1395+ def report(self, name, value, metric_type, sample_rate=1):
1396+ """Report a generic metric.
1397+
1398+ Used for server side plugins without client support.
1399+ """
1400+ name = self.fully_qualify_name(name)
1401+ if not name in self._metrics:
1402+ metric = GenericMetric(self.connection,
1403+ metric_type,
1404+ name,
1405+ sample_rate)
1406+ self._metrics[name] = metric
1407+ self._metrics[name].mark(value)
1408+
1409+ def gauge(self, name, value, sample_rate=1):
1410+ """Report an instantaneous reading of a particular value."""
1411+ name = self.fully_qualify_name(name)
1412+ if not name in self._metrics:
1413+ gauge_metric = GaugeMetric(self.connection,
1414+ name,
1415+ sample_rate)
1416+ self._metrics[name] = gauge_metric
1417+ self._metrics[name].mark(value)
1418+
1419+ def meter(self, name, value=1, sample_rate=1):
1420+ """Mark the occurrence of a given number of events."""
1421+ name = self.fully_qualify_name(name)
1422+ if not name in self._metrics:
1423+ meter_metric = MeterMetric(self.connection,
1424+ name,
1425+ sample_rate)
1426+ self._metrics[name] = meter_metric
1427+ self._metrics[name].mark(value)
1428+
1429+ def increment(self, name, value=1, sample_rate=1):
1430+ """Report and increase in name by count."""
1431+ name = self.fully_qualify_name(name)
1432+ if not name in self._metrics:
1433+ metric = Metric(self.connection,
1434+ name,
1435+ sample_rate)
1436+ self._metrics[name] = metric
1437+ self._metrics[name].send("%s|c" % value)
1438+
1439+ def decrement(self, name, value=1, sample_rate=1):
1440+ """Report and decrease in name by count."""
1441+ name = self.fully_qualify_name(name)
1442+ if not name in self._metrics:
1443+ metric = Metric(self.connection,
1444+ name,
1445+ sample_rate)
1446+ self._metrics[name] = metric
1447+ self._metrics[name].send("%s|c" % -value)
1448+
1449+ def reset_timing(self):
1450+ """Resets the duration timer for the next call to timing()"""
1451+ self.last_time = time.time()
1452+
1453+ def calculate_duration(self):
1454+ """Resets the duration timer and returns the elapsed duration"""
1455+ current_time = time.time()
1456+ duration = current_time - self.last_time
1457+ self.last_time = current_time
1458+ return duration
1459+
1460+ def timing(self, name, duration=None, sample_rate=1):
1461+ """Report that this sample performed in duration seconds.
1462+ Default duration is the actual elapsed time since
1463+ the last call to this method or reset_timing()"""
1464+ if duration is None:
1465+ duration = self.calculate_duration()
1466+ name = self.fully_qualify_name(name)
1467+ if not name in self._metrics:
1468+ metric = Metric(self.connection,
1469+ name,
1470+ sample_rate)
1471+ self._metrics[name] = metric
1472+ self._metrics[name].send("%s|ms" % (duration * 1000))
1473+
1474+ def distinct(self, name, item):
1475+ name = self.fully_qualify_name(name)
1476+ if not name in self._metrics:
1477+ metric = DistinctMetric(self.connection, name)
1478+ self._metrics[name] = metric
1479+ self._metrics[name].mark(item)
1480+
1481+ def clear(self, name):
1482+ """Allow the metric to re-initialize its internal state."""
1483+ name = self.fully_qualify_name(name)
1484+ if name in self._metrics:
1485+ metric = self._metrics[name]
1486+ if getattr(metric, 'clear', None) is not None:
1487+ metric.clear()
1488+
1489+ def fully_qualify_name(self, name):
1490+ """Compose the fully-qualified name: namespace and name."""
1491+ fully_qualified_name = ""
1492+ if self.namespace is not None:
1493+ fully_qualified_name = self.namespace
1494+ if name is not None:
1495+ # prepend the separator should we have a namespace
1496+ if self.namespace is not None and len(self.namespace) > 0:
1497+ fully_qualified_name += "." + name
1498+ else:
1499+ fully_qualified_name = name
1500+ return fully_qualified_name
1501
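`fully_qualify_name` joins the namespace and metric name with a dot when both are present. A simplified standalone sketch of the same joining rule (it treats an empty name as absent, which the original does not special-case):

```python
def fully_qualify_name(namespace, name):
    """Sketch of Metrics.fully_qualify_name(): join namespace and
    metric name with a dot, tolerating an empty namespace."""
    if namespace:
        return namespace + "." + name if name else namespace
    return name or ""

full = fully_qualify_name("myapp.web", "requests")
bare = fully_qualify_name("", "requests")
```

So a Metrics instance constructed with `namespace="myapp.web"` reports `myapp.web.requests`, while an empty namespace leaves the name untouched.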
1502=== added file 'txstatsd/metrics/timermetric.py'
1503--- txstatsd/metrics/timermetric.py 1970-01-01 00:00:00 +0000
1504+++ txstatsd/metrics/timermetric.py 2012-05-14 16:57:19 +0000
1505@@ -0,0 +1,161 @@
1506+import json
1507+import time
1508+
1509+from twisted.web import resource
1510+
1511+from txstatsd.metrics.histogrammetric import HistogramMetricReporter
1512+from txstatsd.metrics.metermetric import MeterMetricReporter
1513+from txstatsd.metrics.metric import Metric
1514+from txstatsd.stats.exponentiallydecayingsample \
1515+ import ExponentiallyDecayingSample
1516+
1517+
1518+class TimerMetric(Metric):
1519+ """
1520+ A timer metric which aggregates timing durations and provides duration
1521+ statistics, plus throughput statistics via L{MeterMetric}.
1522+ """
1523+
1524+ def __init__(self, connection, name, sample_rate=1):
1525+ """Construct a metric that reports samples to the supplied
1526+ C{connection}.
1527+
1528+ @param connection: The connection endpoint representing
1529+ the StatsD server.
1530+ @param name: Indicates what is being instrumented.
1531+ @param sample_rate: Restrict the number of samples sent
1532+ to the StatsD server based on the supplied C{sample_rate}.
1533+ """
1534+ Metric.__init__(self, connection, name, sample_rate=sample_rate)
1535+
1536+ def mark(self, duration):
1537+ """Report this sample performed in duration (measured in seconds)."""
1538+ self.send("%s|ms" % (duration * 1000))
1539+
1540+
1541+class TimerResource(resource.Resource):
1542+ isLeaf = True
1543+
1544+ def __init__(self, reporter):
1545+ resource.Resource.__init__(self)
1546+ self.reporter = reporter
1547+
1548+ def render_GET(self, request):
1549+ result = dict(
1550+ histogram=self.reporter.histogram.histogram(),
1551+ max_value=self.reporter.max(),
1552+ min_value=self.reporter.min())
1553+ return json.dumps(result)
1554+
1555+
1556+class TimerMetricReporter(object):
1557+ """
1558+ A timer metric which aggregates timing durations and provides duration
1559+ statistics, plus throughput statistics via L{MeterMetricReporter}.
1560+ """
1561+
1562+ def __init__(self, name, wall_time_func=time.time, prefix=""):
1563+ """Construct a metric we expect to be periodically updated.
1564+
1565+ @param name: Indicates what is being instrumented.
1566+ @param wall_time_func: Function for obtaining wall time.
1567+ @param prefix: If present, a string to prepend to the message
1568+ composed when C{report} is called.
1569+ """
1570+ self.name = name
1571+ self.wall_time_func = wall_time_func
1572+
1573+ if prefix:
1574+ prefix += "."
1575+ self.prefix = prefix
1576+
1577+ sample = ExponentiallyDecayingSample(1028, 0.015)
1578+ self.histogram = HistogramMetricReporter(sample)
1579+ self.meter = MeterMetricReporter(
1580+ "calls", wall_time_func=self.wall_time_func)
1581+ self.clear()
1582+
1583+ def clear(self):
1584+ """Clears all recorded durations."""
1585+ self.histogram.clear()
1586+
1587+ def count(self):
1588+ return self.histogram.count
1589+
1590+ def fifteen_minute_rate(self):
1591+ return self.meter.fifteen_minute_rate()
1592+
1593+ def five_minute_rate(self):
1594+ return self.meter.five_minute_rate()
1595+
1596+ def mean_rate(self):
1597+ return self.meter.mean_rate()
1598+
1599+ def one_minute_rate(self):
1600+ return self.meter.one_minute_rate()
1601+
1602+ def max(self):
1603+ """Returns the longest recorded duration."""
1604+ return self.histogram.max()
1605+
1606+ def min(self):
1607+ """Returns the shortest recorded duration."""
1608+ return self.histogram.min()
1609+
1610+ def mean(self):
1611+ """Returns the arithmetic mean of all recorded durations."""
1612+ return self.histogram.mean()
1613+
1614+ def std_dev(self):
1615+ """Returns the standard deviation of all recorded durations."""
1616+ return self.histogram.std_dev()
1617+
1618+ def getResource(self):
1619+ """Return an http resource to represent this."""
1620+ return TimerResource(self)
1621+
1622+ def percentiles(self, *percentiles):
1623+ """
1624+ Returns an array of durations at the given percentiles.
1625+
1626+ @param percentiles: One or more percentiles.
1627+ """
1628+ return [percentile for percentile in
1629+ self.histogram.percentiles(*percentiles)]
1630+
1631+ def get_values(self):
1632+ """Returns a list of all recorded durations in the timer's sample."""
1633+ return [value for value in self.histogram.get_values()]
1634+
1635+ def update(self, duration):
1636+ """Adds a recorded duration.
1637+
1638+ @param duration: The length of the duration in seconds.
1639+ """
1640+ if duration >= 0:
1641+ self.histogram.update(duration)
1642+ self.meter.mark()
1643+
1644+ def tick(self):
1645+ """Updates the moving averages."""
1646+ self.meter.tick()
1647+
1648+ def report(self, timestamp):
1649+ # median, 75, 95, 98, 99, 99.9 percentile
1650+ percentiles = self.percentiles(0.5, 0.75, 0.95, 0.98, 0.99, 0.999)
1651+ metrics = []
1652+ items = {".min": self.min(),
1653+ ".max": self.max(),
1654+ ".mean": self.mean(),
1655+ ".stddev": self.std_dev(),
1656+ ".99percentile": percentiles[4],
1657+ ".999percentile": percentiles[5],
1658+ ".count": self.meter.count,
1659+ ".1min_rate": self.meter.one_minute_rate(),
1660+ ".5min_rate": self.meter.five_minute_rate(),
1661+ ".15min_rate": self.meter.fifteen_minute_rate()}
1662+
1663+ for item, value in items.iteritems():
1664+ metrics.append((self.prefix + self.name + item,
1665+ round(value, 6), timestamp))
1666+ return metrics
1667
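The percentile figures reported above come from `HistogramMetricReporter.percentiles`, which linearly interpolates between the two closest ranks in the sorted sample. A standalone sketch of that interpolation:

```python
import math

def percentiles(values, *ps):
    """Sketch of HistogramMetricReporter.percentiles(): linear
    interpolation between closest ranks over a sorted sample."""
    values = sorted(values)
    scores = []
    for p in ps:
        pos = p * (len(values) + 1)
        if pos < 1:
            scores.append(values[0])           # below the first rank
        elif pos >= len(values):
            scores.append(values[-1])          # beyond the last rank
        else:
            lower = values[int(pos) - 1]
            upper = values[int(pos)]
            # Interpolate by the fractional part of the rank position.
            scores.append(lower + (pos - math.floor(pos)) * (upper - lower))
    return scores

median, p95 = percentiles(range(1, 101), 0.5, 0.95)
```

For the sample 1..100 this yields a median of 50.5 and a 95th percentile of 95.95, matching the `(n + 1)`-based rank convention used in the histogram code.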
1668=== added file 'txstatsd/process.py'
1669--- txstatsd/process.py 1970-01-01 00:00:00 +0000
1670+++ txstatsd/process.py 2012-05-14 16:57:19 +0000
1671@@ -0,0 +1,217 @@
1672+import os
1673+import socket
1674+import psutil
1675+
1676+from functools import update_wrapper
1677+
1678+
1679+MEMINFO_KEYS = ("MemTotal:", "MemFree:", "Buffers:",
1680+ "Cached:", "SwapCached:", "SwapTotal:",
1681+ "SwapFree:")
1682+
1683+MULTIPLIERS = {"kB": 1024, "mB": 1024 * 1024}
1684+
1685+
1686+def load_file(filename):
1687+ """Load a file into memory."""
1688+ with open(filename, "r") as f:
1689+ return f.read()
1690+
1691+
1692+def parse_meminfo(data, prefix="sys.mem"):
1693+ """Parse data from /proc/meminfo."""
1694+ result = {}
1695+
1696+ for line in data.split("\n"):
1697+ if not line:
1698+ continue
1699+ parts = [x for x in line.split(" ") if x]
1700+ if not parts[0] in MEMINFO_KEYS:
1701+ continue
1702+
1703+ multiple = 1
1704+
1705+ if len(parts) == 3:
1706+ multiple = MULTIPLIERS[parts[2]]
1707+
1708+ # remove ':'
1709+ label = parts[0][:-1]
1710+ amount = int(parts[1]) * multiple
1711+ result[prefix + "." + label] = amount
1712+
1713+ return result
1714+
1715+
1716+def parse_loadavg(data, prefix="sys.loadavg"):
1717+ """Parse data from /proc/loadavg."""
1718+ return dict(zip(
1719+ (prefix + ".oneminute",
1720+ prefix + ".fiveminutes",
1721+ prefix + ".fifthteenminutes"),
1722+ [float(x) for x in data.split()[:3]]))
1723+
1724+
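`parse_loadavg` expects the raw contents of /proc/loadavg, whose first three whitespace-separated fields are the 1-, 5- and 15-minute load averages. For example (the parser is copied from above, keeping its original key spellings; the sample line is made up):

```python
def parse_loadavg(data, prefix="sys.loadavg"):
    # Copy of the parser above: /proc/loadavg starts with the 1-, 5-
    # and 15-minute load averages; the remaining fields are ignored.
    return dict(zip(
        (prefix + ".oneminute",
         prefix + ".fiveminutes",
         prefix + ".fifthteenminutes"),
        [float(x) for x in data.split()[:3]]))

metrics = parse_loadavg("0.41 0.29 0.24 1/437 21592\n")
```

Each call produces a flat dict of metric-name/value pairs, the shape every reporter in this module emits.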
1725+def parse_netdev(data, prefix="sys.net"):
1726+ """Parse data from /proc/net/dev."""
1727+ lines = data.splitlines()
1728+ # Parse out the column headers as keys.
1729+ _, receive_columns, transmit_columns = lines[1].split("|")
1730+ columns = ["recv_%s" % column for column in receive_columns.split()]
1731+ columns.extend(["send_%s" % column for column in transmit_columns.split()])
1732+
1733+ # Parse out the network devices.
1734+ result = {}
1735+ for line in lines[2:]:
1736+ if not ":" in line:
1737+ continue
1738+ device, data = line.split(":")
1739+ device = device.strip()
1740+ data = dict(zip(columns, map(int, data.split())))
1741+ result.update({
1742+ "%s.%s.bytes.received" % (prefix, device): data["recv_bytes"],
1743+ "%s.%s.bytes.sent" % (prefix, device): data["send_bytes"],
1744+ "%s.%s.packets.received" % (prefix, device): data["recv_packets"],
1745+ "%s.%s.packets.sent" % (prefix, device): data["send_packets"]})
1746+ return result
1747+
1748+
1749+class ProcessReport(object):
1750+
1751+ def __init__(self, process=None):
1752+ self._process = process
1753+
1754+ @property
1755+ def process(self):
1756+ """Override property with current process on first access."""
1757+ if self._process is None:
1758+ self._process = psutil.Process(os.getpid())
1759+ return self._process
1760+
1761+ def get_memory_and_cpu(self, prefix="proc"):
1762+ """Report memory and CPU stats for C{process}."""
1763+ vsize, rss = self.process.get_memory_info()
1764+ result = {prefix + ".cpu.percent": self.process.get_cpu_percent(),
1765+ prefix + ".memory.percent":
1766+ self.process.get_memory_percent(),
1767+ prefix + ".memory.vsize": vsize,
1768+ prefix + ".memory.rss": rss}
1769+ if getattr(self.process, "get_num_threads", None) is not None:
1770+ result[prefix + ".threads"] = self.process.get_num_threads()
1771+ return result
1772+
1773+ def get_cpu_counters(self, prefix="proc"):
1774+ """Report memory and CPU counters for C{process}."""
1775+ utime, stime = self.process.get_cpu_times()
1776+ result = {prefix + ".cpu.user": utime,
1777+ prefix + ".cpu.system": stime}
1778+ return result
1779+
1780+ def get_io_counters(self, prefix="proc.io"):
1781+ """Report IO statistics for C{process}."""
1782+ result = {}
1783+ if getattr(self.process, "get_io_counters", None) is not None:
1784+ (read_count, write_count,
1785+ read_bytes, write_bytes) = self.process.get_io_counters()
1786+ result.update({
1787+ prefix + ".read.count": read_count,
1788+ prefix + ".write.count": write_count,
1789+ prefix + ".read.bytes": read_bytes,
1790+ prefix + ".write.bytes": write_bytes})
1791+ return result
1792+
1793+ def get_net_stats(self, prefix="proc.net"):
1794+ """Report active connection statistics for C{process}."""
1795+ result = {}
1796+ if getattr(self.process, "get_connections", None) is not None:
1797+ for connection in self.process.get_connections():
1798+ fd, family, _type, laddr, raddr, status = connection
1799+ if _type == socket.SOCK_STREAM:
1800+ key = prefix + ".status.%s" % status.lower()
1801+ if not key in result:
1802+ result[key] = 1
1803+ else:
1804+ result[key] += 1
1805+ return result
1806+
1807+
1808+def report_counters(report_function):
1809+ """
1810+ Report the difference between the previous and current values of the
1811+ wrapped function.
1812+ """
1813+ def generate():
1814+ last_value = None
1815+ while True:
1816+ result = {}
1817+ new_value = report_function()
1818+ if last_value is None:
1819+ last_value = new_value
1820+ else:
1821+ for key, value in new_value.iteritems():
1822+ result[key] = value - last_value[key]
1823+ last_value = new_value
1824+ yield result
1825+ generator = generate()
1826+ def report():
1827+ return generator.next()
1828+ update_wrapper(report, report_function)
1829+ return report
1830+
1831+
1832+process_report = ProcessReport()
1833+report_process_memory_and_cpu = process_report.get_memory_and_cpu
1834+report_process_cpu_counters = report_counters(process_report.get_cpu_counters)
1835+report_process_io_counters = report_counters(process_report.get_io_counters)
1836+report_process_net_stats = process_report.get_net_stats
1837+
1838+
1839+def report_system_stats(prefix="sys"):
1840+ cpu_times = psutil.cpu_times()._asdict()
1841+ system_stats = {}
1842+ for mode, time in cpu_times.iteritems():
1843+ system_stats[prefix + ".cpu." + mode] = time
1844+ return system_stats
1845+
1846+
1847+def report_threadpool_stats(threadpool, prefix="threadpool"):
1848+ """Report stats about a given threadpool."""
1849+ def report():
1850+ return {prefix + ".working": len(threadpool.working),
1851+ prefix + ".queue": threadpool.q.qsize(),
1852+ prefix + ".waiters": len(threadpool.waiters),
1853+ prefix + ".threads": len(threadpool.threads)}
1854+ update_wrapper(report, report_threadpool_stats)
1855+ return report
1856+
1857+
1858+def report_reactor_stats(reactor, prefix="reactor"):
1859+ """Report statistics about a twisted reactor."""
1860+ def report():
1861+ return {prefix + ".readers": len(reactor.getReaders()),
1862+ prefix + ".writers": len(reactor.getWriters())}
1863+
1864+ update_wrapper(report, report_reactor_stats)
1865+ return report
1866+
1867+
1868+def report_file_stats(filename, parser):
1869+ """Read statistics from a file and report them."""
1870+ def report():
1871+ return parser(load_file(filename))
1872+ update_wrapper(report, report_file_stats)
1873+ return report
1874+
1875+
1876+PROCESS_STATS = (report_process_memory_and_cpu,)
1877+
1878+COUNTER_STATS = (report_process_cpu_counters,)
1879+
1880+IO_STATS = (report_process_io_counters,)
1881+
1882+NET_STATS = (report_process_net_stats,)
1883+
1884+SYSTEM_STATS = (report_file_stats("/proc/meminfo", parse_meminfo),
1885+ report_file_stats("/proc/loadavg", parse_loadavg),
1886+ report_counters(report_file_stats("/proc/net/dev", parse_netdev)),
1887+ report_counters(report_system_stats),
1888+ )
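The report_counters wrapper above keeps the previous sample inside a generator closure and yields per-interval deltas, so monotonically increasing counters (CPU time, IO bytes) come out as rates per flush. A standalone sketch of that behaviour (Python 3 syntax; the sample source here is hypothetical):

```python
from functools import update_wrapper

def report_counters(report_function):
    """Report the delta between successive calls of report_function."""
    def generate():
        last_value = None
        while True:
            result = {}
            new_value = report_function()
            if last_value is not None:
                # Diff each counter against the previous sample.
                for key, value in new_value.items():
                    result[key] = value - last_value[key]
            last_value = new_value
            yield result
    generator = generate()
    def report():
        return next(generator)
    update_wrapper(report, report_function)
    return report

# Hypothetical monotonically increasing counter source.
samples = iter([{"proc.cpu.user": 10.0}, {"proc.cpu.user": 12.5}])
counter = report_counters(lambda: next(samples))
first = counter()   # first call has nothing to diff against
second = counter()
```

The first call yields an empty dict by design, so the first flush after startup reports no counter values rather than absolute totals.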
1889
1890=== added file 'txstatsd/report.py'
1891--- txstatsd/report.py 1970-01-01 00:00:00 +0000
1892+++ txstatsd/report.py 2012-05-14 16:57:19 +0000
1893@@ -0,0 +1,64 @@
1894+from twisted.internet.defer import maybeDeferred
1895+from twisted.internet.task import LoopingCall
1896+from twisted.python import log
1897+
1898+from functools import wraps
1899+
1900+from twisted.application.service import Service
1901+from twisted.internet import threads
1902+
1903+
1904+class ReportingService(Service):
1905+
1906+ def __init__(self, instance_name="", clock=None):
1907+ self.tasks = []
1908+ self.clock = clock
1909+ self.instance_name = instance_name
1910+
1911+ def schedule(self, function, interval, report_function):
1912+ """
1913+ Schedule C{function} to be called every C{interval} seconds and then
1914+ report gathered metrics to C{Graphite} using C{report_function}.
1915+
1916+ If C{report_function} is C{None}, it just calls the function without
1917+ reporting the metrics.
1918+ """
1919+ if report_function is not None:
1920+ call = self.wrapped(function, report_function)
1921+ else:
1922+ call = function
1923+ task = LoopingCall(call)
1924+ if self.clock is not None:
1925+ task.clock = self.clock
1926+ self.tasks.append((task, interval))
1927+ if self.running:
1928+ task.start(interval, now=True)
1929+
1930+ def wrapped(self, function, report_function):
1931+ def report_metrics(metrics):
1932+ """For each metric returned, call C{report_function} with it."""
1933+ for name, value in metrics.items():
1934+ if self.instance_name:
1935+ name = self.instance_name + "." + name
1936+ report_function(name, value)
1937+ return metrics
1938+
1939+ @wraps(function)
1940+ def wrapper():
1941+ """Wrap C{function} to report metrics or log a failure."""
1942+ deferred = maybeDeferred(function)
1943+ deferred.addCallback(report_metrics)
1944+ deferred.addErrback(lambda failure: log.err(
1945+ failure, "Error while processing %s" % function.func_name))
1946+ return deferred
1947+ return wrapper
1948+
1949+ def startService(self):
1950+ Service.startService(self)
1951+ for task, interval in self.tasks:
1952+ task.start(interval, now=False)
1953+
1954+ def stopService(self):
1955+ for task, interval in self.tasks:
1956+ task.stop()
1957+ Service.stopService(self)
1958
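ReportingService.wrapped prefixes every metric name with the instance name before handing it to the report function. That prefixing step can be sketched without Twisted (make_reporter and the "worker1" name are illustrative, not part of the API):

```python
def make_reporter(instance_name, report_function):
    # Mirrors the report_metrics closure in ReportingService.wrapped
    # (sketch): each metric name is prefixed with the instance name,
    # then passed to the report function.
    def report_metrics(metrics):
        for name, value in metrics.items():
            if instance_name:
                name = instance_name + "." + name
            report_function(name, value)
        return metrics
    return report_metrics

seen = []
reporter = make_reporter("worker1", lambda name, value: seen.append((name, value)))
reporter({"proc.threads": 4})
```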
1959=== added directory 'txstatsd/server'
1960=== added file 'txstatsd/server/__init__.py'
1961=== added file 'txstatsd/server/configurableprocessor.py'
1962--- txstatsd/server/configurableprocessor.py 1970-01-01 00:00:00 +0000
1963+++ txstatsd/server/configurableprocessor.py 2012-05-14 16:57:19 +0000
1964@@ -0,0 +1,108 @@
1965+
1966+import time
1967+
1968+from txstatsd.metrics.countermetric import CounterMetricReporter
1969+from txstatsd.metrics.gaugemetric import GaugeMetricReporter
1970+from txstatsd.metrics.metermetric import MeterMetricReporter
1971+from txstatsd.metrics.timermetric import TimerMetricReporter
1972+from txstatsd.server.processor import MessageProcessor
1973+
1974+
1975+class ConfigurableMessageProcessor(MessageProcessor):
1976+ """
1977+ This specialised C{MessageProcessor} supports behaviour
1978+ that is not StatsD-compliant.
1979+
1980+ Currently, this extends to:
1981+ - Allow a prefix to be added to the composed messages sent
1982+ to the Graphite server.
1983+ - Report an instantaneous reading of a particular value.
1984+ - Report an incrementing and decrementing counter metric.
1985+ - Report a timer metric which aggregates timing durations and provides
1986+ duration statistics, plus throughput statistics.
1987+ """
1988+
1989+ def __init__(self, time_function=time.time, message_prefix="",
1990+ internal_metrics_prefix="", plugins=None):
1991+ super(ConfigurableMessageProcessor, self).__init__(
1992+ time_function=time_function, plugins=plugins)
1993+
1994+ if not internal_metrics_prefix and not message_prefix:
1995+ internal_metrics_prefix = "statsd."
1996+ elif message_prefix and not internal_metrics_prefix:
1997+ internal_metrics_prefix = message_prefix + "." + "statsd."
1998+ self.internal_metrics_prefix = internal_metrics_prefix
1999+ self.message_prefix = message_prefix
2000+ self.gauge_metrics = {}
2001+
2002+ def get_message_prefix(self, kind):
2003+ return self.message_prefix
2004+
2005+ def compose_timer_metric(self, key, duration):
2006+ if key not in self.timer_metrics:
2007+ metric = TimerMetricReporter(key, prefix=self.message_prefix)
2008+ self.timer_metrics[key] = metric
2009+ self.timer_metrics[key].update(duration)
2010+
2011+ def process_counter_metric(self, key, composite, message):
2012+ try:
2013+ value = float(composite[0])
2014+ except (TypeError, ValueError):
2015+ return self.fail(message)
2016+
2017+ self.compose_counter_metric(key, value)
2018+
2019+ def compose_counter_metric(self, key, value):
2020+ if key not in self.counter_metrics:
2021+ metric = CounterMetricReporter(key, prefix=self.message_prefix)
2022+ self.counter_metrics[key] = metric
2023+ self.counter_metrics[key].mark(value)
2024+
2025+ def compose_gauge_metric(self, key, value):
2026+ if key not in self.gauge_metrics:
2027+ metric = GaugeMetricReporter(key, prefix=self.message_prefix)
2028+ self.gauge_metrics[key] = metric
2029+ self.gauge_metrics[key].mark(value)
2030+
2031+ def compose_meter_metric(self, key, value):
2032+ if key not in self.meter_metrics:
2033+ metric = MeterMetricReporter(key, self.time_function,
2034+ prefix=self.message_prefix)
2035+ self.meter_metrics[key] = metric
2036+ self.meter_metrics[key].mark(value)
2037+
2038+ def flush_counter_metrics(self, interval, timestamp):
2039+ metrics = []
2040+ events = 0
2041+ for metric in self.counter_metrics.itervalues():
2042+ messages = metric.report(timestamp)
2043+ metrics.extend(messages)
2044+ events += 1
2045+
2046+ return (metrics, events)
2047+
2048+ def flush_gauge_metrics(self, timestamp):
2049+ metrics = []
2050+ events = 0
2051+ for metric in self.gauge_metrics.itervalues():
2052+ messages = metric.report(timestamp)
2053+ metrics.extend(messages)
2054+ events += 1
2055+
2056+ return (metrics, events)
2057+
2058+ def flush_timer_metrics(self, percent, timestamp):
2059+ metrics = []
2060+ events = 0
2061+ for metric in self.timer_metrics.itervalues():
2062+ messages = metric.report(timestamp)
2063+ metrics.extend(messages)
2064+ events += 1
2065+
2066+ return (metrics, events)
2067+
2068+ def update_metrics(self):
2069+ super(ConfigurableMessageProcessor, self).update_metrics()
2070+
2071+ for metric in self.timer_metrics.itervalues():
2072+ metric.tick()
2073
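The constructor above derives the prefix for txstatsd's self-reported metrics from the message prefix when no explicit internal prefix is given. That fallback, extracted as a standalone function for illustration (internal_prefix is a hypothetical name):

```python
def internal_prefix(message_prefix="", internal_metrics_prefix=""):
    # Mirrors the prefix fallback in ConfigurableMessageProcessor.__init__
    # (sketch): self-reported "statsd." metrics inherit the message prefix
    # unless an explicit internal prefix is supplied.
    if not internal_metrics_prefix and not message_prefix:
        internal_metrics_prefix = "statsd."
    elif message_prefix and not internal_metrics_prefix:
        internal_metrics_prefix = message_prefix + "." + "statsd."
    return internal_metrics_prefix
```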
2074=== added file 'txstatsd/server/httpinfo.py'
2075--- txstatsd/server/httpinfo.py 1970-01-01 00:00:00 +0000
2076+++ txstatsd/server/httpinfo.py 2012-05-14 16:57:19 +0000
2077@@ -0,0 +1,60 @@
2078+# -*- coding: utf-8 *-*
2079+import json
2080+
2081+from twisted.application import service, internet
2082+from twisted.web import server, resource, http
2083+
2084+
2085+class Status(resource.Resource):
2086+ isLeaf = True
2087+ time_high_water = 0.7
2088+
2089+ def __init__(self, processor, statsd_service):
2090+ resource.Resource.__init__(self)
2091+ self.processor = processor
2092+ self.statsd_service = statsd_service
2093+
2094+ def render_GET(self, request):
2095+ data = dict(flush_time=self.processor.last_flush_duration,
2096+ process_time=self.processor.last_process_duration,
2097+ flush_interval=self.statsd_service.flush_interval)
2098+ if data["flush_interval"] * self.time_high_water < (
2099+ data["process_time"] + data["flush_time"]):
2100+ data["status"] = "ERROR"
2101+ request.setResponseCode(http.INTERNAL_SERVER_ERROR)
2102+ return json.dumps(data)
2103+ data["status"] = "OK"
2104+ return json.dumps(data)
2105+
2106+
2107+class Metrics(resource.Resource):
2108+
2109+ def __init__(self, processor):
2110+ resource.Resource.__init__(self)
2111+ self.processor = processor
2112+
2113+ def getChild(self, name, request):
2114+ metric = self.processor.timer_metrics.get(name, None) or \
2115+ self.processor.plugin_metrics.get(name, None)
2116+ if metric is None:
2117+ return resource.NoResource()
2118+
2119+ meth = getattr(metric, "getResource", None)
2120+
2121+ if meth is None:
2122+ return resource.NoResource()
2123+
2124+ return meth()
2125+
2126+
2127+def makeService(options, processor, statsd_service):
2128+
2129+ if options["http-port"] is None:
2130+ return service.MultiService()
2131+
2132+ root = resource.Resource()
2133+ root.putChild("status", Status(processor, statsd_service))
2134+ root.putChild("metrics", Metrics(processor))
2135+ site = server.Site(root)
2136+ s = internet.TCPServer(int(options["http-port"]), site)
2137+ return s
2138
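The /status resource above declares the server unhealthy when processing plus flushing consumes more than 70% of the flush interval (the time_high_water factor). The check, reduced to a pure function (sketch; all timings in seconds):

```python
def status(flush_time, process_time, flush_interval, time_high_water=0.7):
    # Mirrors the health check in Status.render_GET (sketch): ERROR once
    # process_time + flush_time exceeds 70% of the flush interval.
    if flush_interval * time_high_water < process_time + flush_time:
        return "ERROR"
    return "OK"
```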
2139=== added file 'txstatsd/server/loggingprocessor.py'
2140--- txstatsd/server/loggingprocessor.py 1970-01-01 00:00:00 +0000
2141+++ txstatsd/server/loggingprocessor.py 2012-05-14 16:57:19 +0000
2142@@ -0,0 +1,36 @@
2143+
2144+import time
2145+
2146+from txstatsd.server.configurableprocessor import ConfigurableMessageProcessor
2147+
2148+
2149+class LoggingMessageProcessor(ConfigurableMessageProcessor):
2150+ """
2151+ This specialised C{MessageProcessor} logs the received metrics
2152+ using the supplied logger (which should have a callable C{info}
2153+ attribute).
2154+ """
2155+
2156+ def __init__(self, logger, time_function=time.time,
2157+ message_prefix="", plugins=None, **kwz):
2158+ super(LoggingMessageProcessor, self).__init__(
2159+ time_function=time_function, message_prefix=message_prefix,
2160+ plugins=plugins, **kwz)
2161+
2162+ logger_info = getattr(logger, "info", None)
2163+ if logger_info is None or not callable(logger_info):
2164+ raise TypeError("logger must provide a callable 'info' attribute")
2165+ self.logger = logger
2166+
2167+ def process_message(self, message, metric_type, key, fields):
2168+ self.logger.info("In: %s" % message)
2169+ return super(LoggingMessageProcessor, self)\
2170+ .process_message(message, metric_type, key, fields)
2171+
2172+ def flush(self, interval=10000, percent=90):
2173+ """Log all received metric samples to the supplied logger."""
2174+ messages = list(super(LoggingMessageProcessor, self)\
2175+ .flush(interval=interval, percent=percent))
2176+ for msg in messages:
2177+ self.logger.info("Out: %s %s %s" % msg)
2178+ return messages
2179
2180=== added file 'txstatsd/server/processor.py'
2181--- txstatsd/server/processor.py 1970-01-01 00:00:00 +0000
2182+++ txstatsd/server/processor.py 2012-05-14 16:57:19 +0000
2183@@ -0,0 +1,371 @@
2184+from collections import deque
2185+import re
2186+import time
2187+import logging
2188+
2189+from twisted.python import log
2190+
2191+from txstatsd.metrics.metermetric import MeterMetricReporter
2192+
2193+
2194+SPACES = re.compile(r"\s+")
2195+SLASHES = re.compile(r"/+")
2196+NON_ALNUM = re.compile(r"[^a-zA-Z_\-0-9\.]")
2197+RATE = re.compile(r"^@([\d\.]+)")
2198+
2199+
2200+def normalize_key(key):
2201+ """
2202+ Normalize a key that might contain spaces, forward-slashes and other
2203+ special characters into something that is acceptable by graphite.
2204+ """
2205+ key = SPACES.sub("_", key)
2206+ key = SLASHES.sub("-", key)
2207+ key = NON_ALNUM.sub("", key)
2208+ return key
2209+
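For example, normalize_key turns spaces into underscores, slash runs into hyphens, and then strips any remaining characters Graphite would reject. A self-contained restatement of the function with a sample input:

```python
import re

SPACES = re.compile(r"\s+")
SLASHES = re.compile(r"/+")
NON_ALNUM = re.compile(r"[^a-zA-Z_\-0-9.]")

def normalize_key(key):
    # Same normalization as above: spaces -> "_", slashes -> "-",
    # then drop anything outside Graphite's accepted character set.
    key = SPACES.sub("_", key)
    key = SLASHES.sub("-", key)
    key = NON_ALNUM.sub("", key)
    return key
```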
2210+
2211+class BaseMessageProcessor(object):
2212+
2213+ def process(self, message):
2214+ """
2215+ """
2216+ if ":" not in message:
2217+ return self.fail(message)
2218+
2219+ key, data = message.strip().split(":", 1)
2220+ if "|" not in data:
2221+ return self.fail(message)
2222+
2223+ fields = data.split("|")
2224+ if len(fields) < 2 or len(fields) > 3:
2225+ return self.fail(message)
2226+
2227+ key = normalize_key(key)
2228+ metric_type = fields[1]
2229+ return self.process_message(message, metric_type, key, fields)
2230+
2231+ def rebuild_message(self, metric_type, key, fields):
2232+ return key + ":" + "|".join(fields)
2233+
2234+ def fail(self, message):
2235+ """Log and discard malformed message."""
2236+ log.msg("Bad line: %r" % message, logLevel=logging.DEBUG)
2237+
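The splitting logic in process can be exercised on its own: a valid line is key:value|type with an optional |@rate field, and anything else is rejected. A sketch (parse is a hypothetical name; the real method dispatches rather than returning a tuple):

```python
def parse(message):
    # Mirrors the validation in BaseMessageProcessor.process (sketch):
    # "key:value|type[|@rate]" -> (key, fields), or None for a bad line.
    if ":" not in message:
        return None
    key, data = message.strip().split(":", 1)
    if "|" not in data:
        return None
    fields = data.split("|")
    if len(fields) < 2 or len(fields) > 3:
        return None
    return key, fields
```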
2238+
2239+class MessageProcessor(BaseMessageProcessor):
2240+ """
2241+ This C{MessageProcessor} produces StatsD-compliant messages
2242+ for publishing to a Graphite server.
2243+ Metrics behaviour that varies from StatsD should be placed in
2244+ some specialised C{MessageProcessor} (see L{ConfigurableMessageProcessor
2245+ <txstatsd.server.configurableprocessor.ConfigurableMessageProcessor>}).
2246+ """
2247+
2248+ def __init__(self, time_function=time.time, plugins=None):
2249+ self.time_function = time_function
2250+
2251+ self.stats_prefix = "stats."
2252+ self.internal_metrics_prefix = "statsd."
2253+ self.count_prefix = "stats_counts."
2254+ self.timer_prefix = self.stats_prefix + "timers."
2255+ self.gauge_prefix = self.stats_prefix + "gauge."
2256+
2257+ self.process_timings = {}
2258+ self.by_type = {}
2259+ self.last_flush_duration = 0
2260+ self.last_process_duration = 0
2261+
2262+ self.timer_metrics = {}
2263+ self.counter_metrics = {}
2264+ self.gauge_metrics = deque()
2265+ self.meter_metrics = {}
2266+
2267+ self.plugins = {}
2268+ self.plugin_metrics = {}
2269+
2270+ if plugins is not None:
2271+ for plugin in plugins:
2272+ self.plugins[plugin.metric_type] = plugin
2273+
2274+ def process_message(self, message, metric_type, key, fields):
2275+ """
2276+ Process a single entry, adding it to either C{counters}, C{timers},
2277+ or C{gauge_metrics} depending on which kind of message it is.
2278+ """
2279+ start = self.time_function()
2280+ if metric_type == "c":
2281+ self.process_counter_metric(key, fields, message)
2282+ elif metric_type == "ms":
2283+ self.process_timer_metric(key, fields[0], message)
2284+ elif metric_type == "g":
2285+ self.process_gauge_metric(key, fields[0], message)
2286+ elif metric_type == "m":
2287+ self.process_meter_metric(key, fields[0], message)
2288+ elif metric_type in self.plugins:
2289+ self.process_plugin_metric(metric_type, key, fields, message)
2290+ else:
2291+ return self.fail(message)
2292+ self.process_timings.setdefault(metric_type, 0)
2293+ self.process_timings[metric_type] += self.time_function() - start
2294+ self.by_type.setdefault(metric_type, 0)
2295+ self.by_type[metric_type] += 1
2296+
2297+ def get_message_prefix(self, kind):
2298+ return "stats." + kind
2299+
2300+ def process_plugin_metric(self, metric_type, key, items, message):
2301+ if key not in self.plugin_metrics:
2302+ factory = self.plugins[metric_type]
2303+ metric = factory.build_metric(
2304+ self.get_message_prefix(factory.name),
2305+ name=key, wall_time_func=self.time_function)
2306+ self.plugin_metrics[key] = metric
2307+ self.plugin_metrics[key].process(items)
2308+
2309+ def process_timer_metric(self, key, duration, message):
2310+ try:
2311+ duration = float(duration)
2312+ except (TypeError, ValueError):
2313+ return self.fail(message)
2314+
2315+ self.compose_timer_metric(key, duration)
2316+
2317+ def compose_timer_metric(self, key, duration):
2318+ if key not in self.timer_metrics:
2319+ self.timer_metrics[key] = []
2320+ self.timer_metrics[key].append(duration)
2321+
2322+ def process_counter_metric(self, key, composite, message):
2323+ try:
2324+ value = float(composite[0])
2325+ except (TypeError, ValueError):
2326+ return self.fail(message)
2327+ rate = 1
2328+ if len(composite) == 3:
2329+ match = RATE.match(composite[2])
2330+ if match is None:
2331+ return self.fail(message)
2332+ rate = match.group(1)
2333+
2334+ self.compose_counter_metric(key, value, rate)
2335+
2336+ def compose_counter_metric(self, key, value, rate):
2337+ if key not in self.counter_metrics:
2338+ self.counter_metrics[key] = 0
2339+ self.counter_metrics[key] += value * (1 / float(rate))
2340+
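Counter samples arrive pre-sampled on the client side: a value reported with @0.1 stands for ten times as many real events, hence the 1/rate scaling above. A minimal sketch of that accumulation:

```python
counters = {}

def compose_counter_metric(key, value, rate):
    # Mirrors compose_counter_metric above (sketch): scale each sampled
    # value back up by the inverse of its sample rate before summing.
    counters.setdefault(key, 0)
    counters[key] += value * (1 / float(rate))

# One sample at a 10% sample rate counts as ten events.
compose_counter_metric("gorets", 1, "0.1")
```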
2341+ def process_gauge_metric(self, key, composite, message):
2342+ values = composite.split(":")
2343+ if len(values) != 1:
2344+ return self.fail(message)
2345+
2346+ try:
2347+ value = float(values[0])
2348+ except (TypeError, ValueError):
2349+ return self.fail(message)
2350+
2351+ self.compose_gauge_metric(key, value)
2352+
2353+ def compose_gauge_metric(self, key, value):
2354+ metric = [value, key]
2355+ self.gauge_metrics.append(metric)
2356+
2357+ def process_meter_metric(self, key, composite, message):
2358+ values = composite.split(":")
2359+ if len(values) != 1:
2360+ return self.fail(message)
2361+
2362+ try:
2363+ value = float(values[0])
2364+ except (TypeError, ValueError):
2365+ return self.fail(message)
2366+
2367+ self.compose_meter_metric(key, value)
2368+
2369+ def compose_meter_metric(self, key, value):
2370+ if key not in self.meter_metrics:
2371+ metric = MeterMetricReporter(key, self.time_function,
2372+ prefix="stats.meter")
2373+ self.meter_metrics[key] = metric
2374+ self.meter_metrics[key].mark(value)
2375+
2376+ def flush(self, interval=10000, percent=90):
2377+ """
2378+ Flush all queued stats, computing a normalized count based on
2379+ C{interval} and percentile timings based on C{percent}.
2380+ """
2381+ messages = []
2382+ per_metric = {}
2383+ num_stats = 0
2384+ interval = interval / 1000
2385+ timestamp = int(self.time_function())
2386+
2387+ start = self.time_function()
2388+ counter_metrics, events = self.flush_counter_metrics(interval,
2389+ timestamp)
2390+ duration = self.time_function() - start
2391+ if events > 0:
2392+ messages.extend(sorted(counter_metrics))
2393+ num_stats += events
2394+ per_metric["counter"] = (events, duration)
2395+
2396+ start = self.time_function()
2397+ timer_metrics, events = self.flush_timer_metrics(percent, timestamp)
2398+ duration = self.time_function() - start
2399+ if events > 0:
2400+ messages.extend(sorted(timer_metrics))
2401+ num_stats += events
2402+ per_metric["timer"] = (events, duration)
2403+
2404+ start = self.time_function()
2405+ gauge_metrics, events = self.flush_gauge_metrics(timestamp)
2406+ duration = self.time_function() - start
2407+ if events > 0:
2408+ messages.extend(sorted(gauge_metrics))
2409+ num_stats += events
2410+ per_metric["gauge"] = (events, duration)
2411+
2412+ start = self.time_function()
2413+ meter_metrics, events = self.flush_meter_metrics(timestamp)
2414+ duration = self.time_function() - start
2415+ if events > 0:
2416+ messages.extend(sorted(meter_metrics))
2417+ num_stats += events
2418+ per_metric["meter"] = (events, duration)
2419+
2420+ start = self.time_function()
2421+ plugin_metrics, events = self.flush_plugin_metrics(interval, timestamp)
2422+ duration = self.time_function() - start
2423+ if events > 0:
2424+ messages.extend(sorted(plugin_metrics))
2425+ num_stats += events
2426+ per_metric["plugin"] = (events, duration)
2427+
2428+ self.flush_metrics_summary(messages, num_stats, per_metric, timestamp)
2429+ return messages
2430+
2431+ def flush_counter_metrics(self, interval, timestamp):
2432+ metrics = []
2433+ events = 0
2434+ for key, count in self.counter_metrics.iteritems():
2435+ self.counter_metrics[key] = 0
2436+
2437+ value = count / interval
2438+ metrics.append((self.stats_prefix + key, value, timestamp))
2439+ metrics.append((self.count_prefix + key, count, timestamp))
2440+ events += 1
2441+
2442+ return (metrics, events)
2443+
2444+ def flush_timer_metrics(self, percent, timestamp):
2445+ metrics = []
2446+ events = 0
2447+
2448+ threshold_value = ((100 - percent) / 100.0)
2449+ for key, timers in self.timer_metrics.iteritems():
2450+ count = len(timers)
2451+ if count > 0:
2452+ self.timer_metrics[key] = []
2453+
2454+ timers.sort()
2455+ lower = timers[0]
2456+ upper = timers[-1]
2457+ count = len(timers)
2458+
2459+ mean = lower
2460+ threshold_upper = upper
2461+
2462+ if count > 1:
2463+ index = count - int(round(threshold_value * count))
2464+ timers = timers[:index]
2465+ threshold_upper = timers[-1]
2466+ mean = sum(timers) / index
2467+
2468+ items = {".mean": mean,
2469+ ".upper": upper,
2470+ ".upper_%s" % percent: threshold_upper,
2471+ ".lower": lower,
2472+ ".count": count}
2473+ for item, value in items.iteritems():
2474+ metrics.append((self.timer_prefix + key + item,
2475+ value, timestamp))
2476+ events += 1
2477+
2478+ return (metrics, events)
2479+
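The percentile computation above trims the top (100 - percent)% of sorted samples before taking the truncated mean and upper bound. Restated as a pure function for illustration (timer_summary is a hypothetical name):

```python
def timer_summary(timers, percent=90):
    # Mirrors the math in flush_timer_metrics (sketch): drop the top
    # (100 - percent)% of sorted samples, then compute the mean and the
    # percentile upper bound over what remains.
    timers = sorted(timers)
    count = len(timers)
    lower, upper = timers[0], timers[-1]
    mean, threshold_upper = lower, upper
    if count > 1:
        index = count - int(round((100 - percent) / 100.0 * count))
        kept = timers[:index]
        threshold_upper = kept[-1]
        mean = sum(kept) / index
    return {"mean": mean, "upper": upper,
            "upper_%s" % percent: threshold_upper,
            "lower": lower, "count": count}

summary = timer_summary([1.0, 2.0, 3.0, 4.0, 5.0,
                         6.0, 7.0, 8.0, 9.0, 10.0])
```

With ten samples and percent=90, the single largest sample is dropped, so the 90th-percentile upper bound is 9.0 while the overall upper stays 10.0.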
2480+ def flush_gauge_metrics(self, timestamp):
2481+ metrics = []
2482+ events = 0
2483+ for metric in self.gauge_metrics:
2484+ value = metric[0]
2485+ key = metric[1]
2486+
2487+ metrics.append((self.gauge_prefix + key + ".value",
2488+ value, timestamp))
2489+ events += 1
2490+
2491+ self.gauge_metrics.clear()
2492+
2493+ return (metrics, events)
2494+
2495+ def flush_meter_metrics(self, timestamp):
2496+ metrics = []
2497+ events = 0
2498+ for metric in self.meter_metrics.itervalues():
2499+ messages = metric.report(timestamp)
2500+ metrics.extend(messages)
2501+ events += 1
2502+
2503+ return (metrics, events)
2504+
2505+ def flush_plugin_metrics(self, interval, timestamp):
2506+ metrics = []
2507+ events = 0
2508+
2509+ for metric in self.plugin_metrics.itervalues():
2510+ messages = metric.flush(interval, timestamp)
2511+ metrics.extend(messages)
2512+ events += 1
2513+
2514+ return (metrics, events)
2515+
2516+ def flush_metrics_summary(self, messages, num_stats,
2517+ per_metric, timestamp):
2518+
2519+ messages.append((self.internal_metrics_prefix + "numStats",
2520+ num_stats, timestamp))
2521+
2522+ self.last_flush_duration = 0
2523+ for name, (value, duration) in per_metric.iteritems():
2524+ messages.extend([
2525+ (self.internal_metrics_prefix +
2526+ "flush.%s.count" % name,
2527+ value, timestamp),
2528+ (self.internal_metrics_prefix +
2529+ "flush.%s.duration" % name,
2530+ duration * 1000, timestamp)])
2531+ log.msg("Flushed %d %s metrics in %.6f" %
2532+ (value, name, duration))
2533+ self.last_flush_duration += duration
2534+
2535+ self.last_process_duration = 0
2536+ for metric_type, duration in self.process_timings.iteritems():
2537+ messages.extend([
2538+ (self.internal_metrics_prefix +
2539+ "receive.%s.count" %
2540+ metric_type, self.by_type[metric_type], timestamp),
2541+ (self.internal_metrics_prefix +
2542+ "receive.%s.duration" %
2543+ metric_type, duration * 1000, timestamp)
2544+ ])
2545+ log.msg("Processing %d %s metrics took %.6f" %
2546+ (self.by_type[metric_type], metric_type, duration))
2547+ self.last_process_duration += duration
2548+
2549+ self.process_timings.clear()
2550+ self.by_type.clear()
2551+
2552+ def update_metrics(self):
2553+ for metric in self.meter_metrics.itervalues():
2554+ metric.tick()
2555
2556=== added file 'txstatsd/server/protocol.py'
2557--- txstatsd/server/protocol.py 1970-01-01 00:00:00 +0000
2558+++ txstatsd/server/protocol.py 2012-05-14 16:57:19 +0000
2559@@ -0,0 +1,63 @@
2560+from twisted.internet.protocol import (
2561+ DatagramProtocol, Protocol, Factory)
2562+from twisted.protocols.basic import LineReceiver
2563+
2564+
2565+class StatsDServerProtocol(DatagramProtocol):
2566+ """A Twisted-based implementation of the StatsD server.
2567+
2568+ Data is received via UDP for local aggregation and then sent to a Graphite
2569+ server via TCP.
2570+ """
2571+
2572+ def __init__(self, processor,
2573+ monitor_message=None, monitor_response=None):
2574+ self.processor = processor
2575+ self.monitor_message = monitor_message
2576+ self.monitor_response = monitor_response
2577+
2578+ def datagramReceived(self, data, (host, port)):
2579+ """Process received data and store it locally."""
2580+ if data == self.monitor_message:
2581+ # Send the expected response to the
2582+ # monitoring agent.
2583+ self.transport.write(self.monitor_response, (host, port))
2584+ else:
2585+ self.processor.process(data)
2586+
2587+
2588+class StatsDTCPServerProtocol(LineReceiver):
2589+ """A Twisted-based implementation of the StatsD server over TCP.
2590+
2591+ Data is received via TCP for local aggregation and then sent to a Graphite
2592+ server via TCP.
2593+ """
2594+
2595+ def __init__(self, processor,
2596+ monitor_message=None, monitor_response=None):
2597+ self.processor = processor
2598+ self.monitor_message = monitor_message
2599+ self.monitor_response = monitor_response
2600+
2601+ def lineReceived(self, data):
2602+ """Process received data and store it locally."""
2603+ if data == self.monitor_message:
2604+ # Send the expected response to the
2605+ # monitoring agent.
2606+ self.transport.write(self.monitor_response)
2607+ else:
2608+ self.processor.process(data)
2609+
2610+
2611+class StatsDTCPServerFactory(Factory):
2612+
2613+ def __init__(self, processor,
2614+ monitor_message=None, monitor_response=None):
2615+ self.processor = processor
2616+ self.monitor_message = monitor_message
2617+ self.monitor_response = monitor_response
2618+
2619+ def buildProtocol(self, addr):
2620+ return StatsDTCPServerProtocol(self.processor,
2621+ self.monitor_message, self.monitor_response)
2622+
2623
2624=== added file 'txstatsd/server/router.py'
2625--- txstatsd/server/router.py 1970-01-01 00:00:00 +0000
2626+++ txstatsd/server/router.py 2012-05-14 16:57:19 +0000
2627@@ -0,0 +1,308 @@
2628+"""
2629+Routes messages to different processors.
2630+
2631+Rules are of the form:
2632+ condition => target
2633+Where each of condition and target is of the form:
2634+ name [arguments]*
2635+and the arguments are dependent on the name.
2636+
2637+Each rule is applied to the message in the order they are specified.
2638+
2639+Conditions supported:
2640+ any: will match all messages
2641+ metric_type [type]+: will match a metric of any of the types specified
2642+ path_like fnmatch_exp: will match the path against the expression with
2643+ fnmatch.
2644+ not [rule..]: will return the negation of the result of rule
2645+
2646+
2647+Targets supported:
2648+ drop: will drop the message, stopping any further processing.
2649+ redirect_udp host port: will send to (host, port) by udp
2650+ redirect_tcp host port: will send to (host, port) by tcp
2651+ rewrite pattern repl: will rewrite the path like re.sub
2652+ set_metric_type metric_type: will make the metric of type metric_type
2653+
2654+"""
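As a concrete illustration of the grammar above, a hypothetical rules_config (the hosts, ports, and metric paths are made up):

```text
path_like *.internal.* => drop
metric_type d => redirect_udp 10.1.2.3 8126
any => rewrite ^devel\. staging.
```

The first matching drop stops further processing; the remaining rules are applied in order to every surviving message.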
2655+import fnmatch
2656+import re
2657+import time
2658+
2659+from zope.interface import implements
2660+
2661+from twisted.application.internet import UDPServer
2662+from twisted.application.service import Service
2663+from twisted.internet import interfaces
2664+from twisted.internet.protocol import (
2665+ DatagramProtocol, ReconnectingClientFactory, Protocol)
2666+from twisted.internet import defer
2667+from twisted.python import log
2668+
2669+from txstatsd.server.processor import BaseMessageProcessor
2670+
2671+
2672+class StopProcessingException(Exception):
2673+
2674+ pass
2675+
2676+
2677+class TCPRedirectService(Service):
2678+
2679+ def __init__(self, host, port, factory):
2680+ self.host = host
2681+ self.port = port
2682+ self.factory = factory
2683+
2684+ def startService(self):
2685+ from twisted.internet import reactor
2686+
2687+ reactor.connectTCP(self.host, self.port, self.factory)
2688+ return Service.startService(self)
2689+
2690+ def stopService(self):
2691+ self.factory.stopTrying()
2692+ if self.factory.protocol:
2693+ self.factory.protocol.transport.loseConnection()
2694+ return Service.stopService(self)
2695+
2696+
2697+class TCPRedirectClientFactory(ReconnectingClientFactory):
2698+
2699+ def __init__(self, callback=None):
2700+ self.callback = callback
2701+ self.protocol = None
2702+
2703+ def buildProtocol(self, addr):
2704+ from twisted.internet import reactor
2705+
2706+ self.resetDelay()
2707+ self.protocol = TCPRedirectProtocol()
2708+ if self.callback:
2709+ reactor.callLater(0, self.callback)
2710+ self.callback = None
2711+
2712+ return self.protocol
2713+
2714+ def write(self, data):
2715+ if self.protocol:
2716+ self.protocol.write(data)
2717+
2718+
2719+class TCPRedirectProtocol(Protocol):
2720+ """A client protocol for redirecting messages over TCP.
2721+ """
2722+
2723+ implements(interfaces.IPushProducer)
2724+
2725+ def __init__(self):
2726+ self.paused = False
2727+ self.last_paused = None
2728+ self.dropped = 0
2729+
2730+ def connectionMade(self):
2731+ """
2732+ A connection has been made, register ourselves as a producer for the
2733+ bound transport.
2734+ """
2735+ self.transport.registerProducer(self, True)
2736+
2737+ def pauseProducing(self):
2738+ """Pause producing messages, since the buffer is full."""
2739+ self.last_paused = int(time.time())
2740+ self.paused = True
2741+
2742+ stopProducing = pauseProducing
2743+
2744+ def resumeProducing(self):
2745+ """We can write to the transport again."""
2746+ time_now = int(time.time())
2747+ log.msg("Resumed TCP redirect. "
2748+ "Dropped %s messages during %s seconds" %
2749+ (self.dropped, time_now - self.last_paused))
2750+ self.paused = False
2751+ self.dropped = 0
2752+ self.last_paused = None
2753+
2754+ def write(self, line):
2755+ if self.paused:
2756+ self.dropped += 1
2757+ return
2758+
2759+ if line[-2:] != "\r\n":
2760+ if line[-1] == "\r":
2761+ line += "\n"
2762+ else:
2763+ line += "\r\n"
2764+ self.transport.write(line)
2765+
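The write method above normalises line endings so every redirected message reaches the downstream TCP peer terminated by exactly one "\r\n", even if the caller already appended a partial or full terminator. The framing step as a pure function (terminate is a hypothetical name):

```python
def terminate(line):
    # Mirrors the framing in TCPRedirectProtocol.write (sketch): ensure a
    # single trailing "\r\n" without doubling an existing terminator.
    if line[-2:] != "\r\n":
        if line[-1] == "\r":
            line += "\n"
        else:
            line += "\r\n"
    return line
```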
2766+
2767+class UDPRedirectProtocol(DatagramProtocol):
2768+
2769+ def __init__(self, host, port, callback):
2770+ self.host = host
2771+ self.port = port
2772+ self.callback = callback
2773+
2774+ def startProtocol(self):
2775+ self.transport.connect(self.host, self.port)
2776+ self.callback()
2777+
2778+ def write(self, data):
2779+ if self.transport is not None:
2780+ self.transport.write(data)
2781+
2782+
2783+class Router(BaseMessageProcessor):
2784+
2785+ def __init__(self, message_processor, rules_config, service=None):
2786+ """Configure a router with rules_config.
2787+
2788+ rules_config is a newline-separated list of rules.
2789+ """
2790+ self.rules_config = rules_config
2791+ self.message_processor = message_processor
2792+ self.flush = message_processor.flush
2793+ self.ready = defer.succeed(None)
2794+ self.service = service
2795+ self.rules = self.build_rules(rules_config)
2796+
2797+ def build_condition(self, condition):
2798+ condition_parts = [
2799+ p.strip() for p in condition.split(" ") if p]
2800+ condition_factory = getattr(self,
2801+ "build_condition_" + condition_parts[0], None)
2802+ if condition_factory is None:
2803+ raise ValueError("unknown condition %s" %
2804+ (condition_parts[0],))
2805+ condition_function = condition_factory(*condition_parts[1:])
2806+ return condition_function
2807+
2808+ def build_rules(self, rules_config):
2809+ rules = []
2810+ for line in rules_config.split("\n"):
2811+ if not line:
2812+ continue
2813+
2814+ condition, target = line.split("=>")
2815+ condition_function = self.build_condition(condition)
2816+
2817+ target_parts = [
2818+ p.strip() for p in target.split(" ") if p]
2819+
2820+ target_factory = getattr(self,
2821+ "build_target_" + target_parts[0], None)
2822+
2823+ if target_factory is None:
2824+ raise ValueError("unknown target %s" %
2825+ (target_parts[0],))
2826+
2827+ rules.append((
2828+ condition_function,
2829+ target_factory(*target_parts[1:])))
2830+ return rules
2831+
2832+ def build_condition_any(self):
2833+ """Returns a condition that always matches."""
2834+ return lambda *args: True
2835+
2836+ def build_condition_not(self, *args):
2837+ """
2838+ Returns a condition that negates the condition from its arguments.
2839+ """
2840+ other_condition = self.build_condition(" ".join(args))
2841+
2842+ def not_condition(metric_type, key, fields):
2843+ return not other_condition(metric_type, key, fields)
2844+ return not_condition
2845+
2846+ def build_condition_metric_type(self, *metric_types):
2847+ """Returns a condition that matched on metric kind."""
2848+ def metric_type_condition(metric_type, key, fields):
2849+ return (metric_type in metric_types)
2850+ return metric_type_condition
2851+
2852+ def build_condition_path_like(self, pattern):
2853+ def path_like_condition(metric_type, key, fields):
2854+ return fnmatch.fnmatch(key, pattern)
2855+ return path_like_condition
2856+
2857+ def build_target_drop(self):
2858+ """Returns a target that stops the processing of a message."""
2859+ def drop(*args):
2860+ return
2861+ return drop
2862+
2863+ def build_target_rewrite(self, pattern, repl, dup="no-dup"):
2864+ rexp = re.compile(pattern)
2865+
2866+ def rewrite_target(metric_type, key, fields):
2867+ if dup == "dup" and rexp.match(key) is not None:
2868+ yield metric_type, key, fields
2869+ key = rexp.sub(repl, key)
2870+ yield metric_type, key, fields
2871+
2872+ return rewrite_target
2873+
2874+ def build_target_set_metric_type(self, metric_type, dup="no-dup"):
2875+ def set_metric_type(_, key, fields):
2876+ if dup == "dup":
2877+ yield _, key, fields
2878+ yield metric_type, key, fields
2879+ return set_metric_type
2880+
2881+ def build_target_redirect_udp(self, host, port):
2882+ if self.service is None:
2883+ return lambda *args: True
2884+
2885+ port = int(port)
2886+ d = defer.Deferred()
2887+ self.ready.addCallback(lambda _: d)
2888+ protocol = UDPRedirectProtocol(host, port, lambda: d.callback(None))
2889+
2890+ client = UDPServer(0, protocol)
2891+ client.setServiceParent(self.service)
2892+
2893+ def redirect_udp_target(metric_type, key, fields):
2894+ message = self.rebuild_message(metric_type, key, fields)
2895+ protocol.write(message)
2896+ yield metric_type, key, fields
2897+ return redirect_udp_target
2898+
2899+ def build_target_redirect_tcp(self, host, port):
2900+ if self.service is None:
2901+ return lambda *args: True
2902+
2903+ port = int(port)
2904+ d = defer.Deferred()
2905+ self.ready.addCallback(lambda _: d)
2906+ factory = TCPRedirectClientFactory(lambda: d.callback(None))
2907+
2908+ redirect_service = TCPRedirectService(host, port, factory)
2909+ redirect_service.setServiceParent(self.service)
2910+
2911+ def redirect_tcp_target(metric_type, key, fields):
2912+ message = self.rebuild_message(metric_type, key, fields)
2913+ factory.write(message)
2914+ yield metric_type, key, fields
2915+ return redirect_tcp_target
2916+
2917+ def process_message(self, message, metric_type, key, fields):
2918+ metrics = [(metric_type, key, fields)]
2919+ if self.rules:
2920+ for condition, target in self.rules:
2921+ pending, metrics = metrics, []
2922+ if not pending:
2923+ return
2924+ for metric_type, key, fields in pending:
2925+ if not condition(metric_type, key, fields):
2926+ metrics.append((metric_type, key, fields))
2927+ continue
2928+ result = target(metric_type, key, fields)
2929+ if result is not None:
2930+ metrics.extend(result)
2931+
2932+ for (metric_type, key, fields) in metrics:
2933+ message = self.rebuild_message(metric_type, key, fields)
2934+ self.message_processor.process_message(message, metric_type,
2935+ key, fields)
2936
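The `Router` above parses one `condition => target` rule per line, dispatching conditions through `build_condition_*` and targets through `build_target_*`. A self-contained sketch of that evaluation loop, with a deliberately simplified dispatch (the helper names here are illustrative, not the txstatsd API):

```python
import fnmatch

# Toy version of the Router pipeline: parse "condition => target"
# lines, then run each metric through any matching targets.
# Illustrative only -- not the txstatsd classes.
def build_rules(rules_config):
    rules = []
    for line in rules_config.split("\n"):
        if not line.strip():
            continue
        condition, target = [s.strip() for s in line.split("=>")]
        rules.append((condition, target))
    return rules

def matches(condition, metric_type, key):
    parts = condition.split()
    if parts[0] == "any":
        return True
    if parts[0] == "not":
        return not matches(" ".join(parts[1:]), metric_type, key)
    if parts[0] == "metric_type":
        return metric_type in parts[1:]
    if parts[0] == "path_like":
        return fnmatch.fnmatch(key, parts[1])
    raise ValueError("unknown condition %s" % parts[0])

def route(rules, metric_type, key):
    """Return the metric unchanged, or None if a rule dropped it."""
    for condition, target in rules:
        if matches(condition, metric_type, key) and target == "drop":
            return None
    return (metric_type, key)

rules = build_rules("path_like internal.* => drop")
assert route(rules, "c", "internal.debug") is None
assert route(rules, "c", "site.hits") == ("c", "site.hits")
```

The real `process_message` is richer than this sketch: targets are generators, and each rule's yielded metrics are re-fed through the remaining rules, which is what lets a target like `rewrite ... dup` emit both the original and the rewritten key.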
2937=== added file 'txstatsd/service.py'
2938--- txstatsd/service.py 1970-01-01 00:00:00 +0000
2939+++ txstatsd/service.py 2012-05-14 16:57:19 +0000
2940@@ -0,0 +1,340 @@
2941+
2942+import getopt
2943+import sys
2944+import time
2945+import ConfigParser
2946+import platform
2947+import functools
2948+
2949+from twisted.application.internet import UDPServer, TCPServer
2950+from twisted.application.service import MultiService
2951+from twisted.python import usage, log
2952+from twisted.plugin import getPlugins
2953+
2954+from txstatsd.client import InternalClient
2955+from txstatsd.metrics.metrics import Metrics
2956+from txstatsd.metrics.extendedmetrics import ExtendedMetrics
2957+from txstatsd.server.processor import MessageProcessor
2958+from txstatsd.server.configurableprocessor import ConfigurableMessageProcessor
2959+from txstatsd.server.loggingprocessor import LoggingMessageProcessor
2960+from txstatsd.server.protocol import (
2961+ StatsDServerProtocol, StatsDTCPServerFactory)
2962+from txstatsd.server.router import Router
2963+from txstatsd.server import httpinfo
2964+from txstatsd.report import ReportingService
2965+from txstatsd.itxstatsd import IMetricFactory
2966+from twisted.application.service import Service
2967+from twisted.internet import task
2968+
2969+
2970+def accumulateClassList(classObj, attr, listObj,
2971+ baseClass=None, excludeClass=None):
2972+ """Accumulate all attributes of a given name in a class hierarchy
2973+ into a single list.
2974+
2975+ Assumes all class attributes of this name are lists.
2976+ """
2977+ for base in classObj.__bases__:
2978+ accumulateClassList(base, attr, listObj, excludeClass=excludeClass)
2979+ if excludeClass != classObj:
2980+ if baseClass is None or baseClass in classObj.__bases__:
2981+ listObj.extend(classObj.__dict__.get(attr, []))
2982+
2983+
2984+class OptionsGlue(usage.Options):
2985+ """Extends usage.Options to also read parameters from a config file."""
2986+
2987+ optParameters = [
2988+ ["config", "c", None, "Config file to use."]]
2989+
2990+ def __init__(self):
2991+ parameters = []
2992+ accumulateClassList(self.__class__, 'optParameters',
2993+ parameters, excludeClass=OptionsGlue)
2994+ for parameter in parameters:
2995+ if parameter[0] == "config" or parameter[1] == "c":
2996+ raise ValueError("the --config/-c parameter is reserved.")
2997+
2998+ self.overridden_options = []
2999+
3000+ super(OptionsGlue, self).__init__()
3001+
3002+ def opt_config(self, config_path):
3003+ self['config'] = config_path
3004+
3005+ opt_c = opt_config
3006+
3007+ def parseOptions(self, options=None):
3008+ """Obtain overridden options."""
3009+
3010+ if options is None:
3011+ options = sys.argv[1:]
3012+ try:
3013+ opts, args = getopt.getopt(options,
3014+ self.shortOpt, self.longOpt)
3015+ except getopt.error, e:
3016+ raise usage.UsageError(str(e))
3017+
3018+ for opt, arg in opts:
3019+ if opt[1] == '-':
3020+ opt = opt[2:]
3021+ else:
3022+ opt = opt[1:]
3023+ self.overridden_options.append(opt)
3024+
3025+ super(OptionsGlue, self).parseOptions(options=options)
3026+
3027+ def postOptions(self):
3028+ """Read the configuration file if one is provided."""
3029+ if self['config'] is not None:
3030+ config_file = ConfigParser.RawConfigParser()
3031+ config_file.read(self['config'])
3032+
3033+ self.configure(config_file)
3034+
3035+ def overridden_option(self, opt):
3036+ """Return whether this option was overridden."""
3037+ return opt in self.overridden_options
3038+
3039+ def configure(self, config_file):
3040+ """Read the configuration items, coercing types as required."""
3041+ for name, value in config_file.items(self.config_section):
3042+ self._coerce_option(name, value)
3043+
3044+ for section in sorted(config_file.sections()):
3045+ if section.startswith("plugin_"):
3046+ self[section] = config_file.items(section)
3047+ if section.startswith("carbon-cache"):
3048+ for name, value in config_file.items(section):
3049+ self._coerce_option(name, value)
3050+
3051+ def _coerce_option(self, name, value):
3052+ """Coerce a single option, checking for overriden options."""
3053+ # Overridden options have precedence
3054+ if not self.overridden_option(name):
3055+ # Options appends '=' when gathering the parameters
3056+ if (name + '=') in self.longOpt:
3057+ # Coerce the type if required
3058+ if name in self._dispatch:
3059+ if isinstance(self._dispatch[name], usage.CoerceParameter):
3060+ value = self._dispatch[name].coerce(value)
3061+ else:
3062+ self._dispatch[name](name, value)
3063+ return
3064+ self[name] = value
3065+
3066+
3067+class StatsDOptions(OptionsGlue):
3068+ """
3069+ The set of configuration settings for txStatsD.
3070+ """
3071+
3072+ optParameters = [
3073+ ["carbon-cache-host", "h", None,
3074+ "The host where carbon cache is listening.", str],
3075+ ["carbon-cache-port", "p", None,
3076+ "The port where carbon cache is listening.", int],
3077+ ["carbon-cache-name", "n", None,
3078+ "An identifier for the carbon-cache instance."],
3079+ ["listen-port", "l", 8125,
3080+ "The UDP port where we will listen.", int],
3081+ ["flush-interval", "i", 60000,
3082+ "The number of milliseconds between each flush.", int],
3083+ ["prefix", "x", None,
3084+ "Prefix to use when reporting stats.", str],
3085+ ["instance-name", "N", None,
3086+ "Instance name for our own stats reporting.", str],
3087+ ["report", "r", None,
3088+ "Which additional stats to report {process|net|io|system}.", str],
3089+ ["monitor-message", "m", "txstatsd ping",
3090+ "Message we expect from monitoring agent.", str],
3091+ ["monitor-response", "o", "txstatsd pong",
3092+ "Response we should send monitoring agent.", str],
3093+ ["statsd-compliance", "s", 1,
3094+ "Produce StatsD-compliant messages.", int],
3095+ ["dump-mode", "d", 0,
3096+ "Dump received and aggregated metrics"
3097+ " before passing them to carbon.", int],
3098+ ["routing", "g", "",
3099+ "Routing rules", str],
3100+ ["listen-tcp-port", "t", None,
3101+ "The TCP port where we will listen.", int],
3102+ ["max-queue-size", "Q", 20000,
3103+ "Maximum send queue size per destination.", int],
3104+ ["max-datapoints-per-message", "M", 1000,
3105+ "Maximum datapoints per message to carbon-cache.", int],
3106+ ["http-port", "P", None,
3107+ "The httpinfo port.", int],
3108+ ]
3109+
3110+ def __init__(self):
3111+ self.config_section = 'statsd'
3112+ super(StatsDOptions, self).__init__()
3113+ self["carbon-cache-host"] = []
3114+ self["carbon-cache-port"] = []
3115+ self["carbon-cache-name"] = []
3116+
3117+ def opt_carbon_cache_host(self, host):
3118+ self["carbon-cache-host"].append(host)
3119+
3120+ def opt_carbon_cache_port(self, port):
3121+ self["carbon-cache-port"].append(usage.portCoerce(port))
3122+
3123+ def opt_carbon_cache_name(self, name):
3124+ self["carbon-cache-name"].append(name)
3125+
3126+
3127+class StatsDService(Service):
3128+
3129+ def __init__(self, carbon_client, processor, flush_interval, clock=None):
3130+ self.carbon_client = carbon_client
3131+ self.processor = processor
3132+ self.flush_interval = flush_interval
3133+ self.flush_task = task.LoopingCall(self.flushProcessor)
3134+ if clock is not None:
3135+ self.flush_task.clock = clock
3136+
3137+ def flushProcessor(self):
3138+ """Flush messages queued in the processor to Graphite."""
3139+ flushed = 0
3140+ start = time.time()
3141+ for metric, value, timestamp in self.processor.flush(
3142+ interval=self.flush_interval):
3143+ self.carbon_client.sendDatapoint(metric, (timestamp, value))
3144+ flushed += 1
3145+ log.msg("Flushed total %d metrics in %.6f" %
3146+ (flushed, time.time() - start))
3147+
3148+ def startService(self):
3149+ self.flush_task.start(self.flush_interval / 1000, False)
3150+
3151+ def stopService(self):
3152+ if self.flush_task.running:
3153+ self.flush_task.stop()
3154+
3155+
3156+def report_client_manager_stats():
3157+ from carbon.instrumentation import stats
3158+
3159+ current_stats = stats.copy()
3160+ for name, value in list(current_stats.items()):
3161+ del current_stats[name]
3162+ if name.startswith("destinations"):
3163+ current_stats[name.replace(":", "_")] = value
3164+ stats[name] = 0
3165+ return current_stats
3166+
3167+
3168+def createService(options):
3169+ """Create a txStatsD service."""
3170+ from carbon.routers import ConsistentHashingRouter
3171+ from carbon.client import CarbonClientManager
3172+ from carbon.conf import settings
3173+
3174+ settings.MAX_QUEUE_SIZE = options["max-queue-size"]
3175+ settings.MAX_DATAPOINTS_PER_MESSAGE = options["max-datapoints-per-message"]
3176+
3177+ root_service = MultiService()
3178+ root_service.setName("statsd")
3179+
3180+ prefix = options["prefix"]
3181+ if prefix is None:
3182+ prefix = "statsd"
3183+
3184+ instance_name = options["instance-name"]
3185+ if not instance_name:
3186+ instance_name = platform.node()
3187+
3188+ # initialize plugins
3189+ plugin_metrics = []
3190+ for plugin in getPlugins(IMetricFactory):
3191+ plugin.configure(options)
3192+ plugin_metrics.append(plugin)
3193+
3194+ processor = None
3195+ if options["dump-mode"]:
3196+ # LoggingMessageProcessor supersedes
3197+ # any other processor class in "dump-mode"
3198+ assert not hasattr(log, 'info')
3199+ log.info = log.msg # for compatibility with LMP logger interface
3200+ processor = functools.partial(LoggingMessageProcessor, logger=log)
3201+
3202+ if options["statsd-compliance"]:
3203+ processor = (processor or MessageProcessor)(plugins=plugin_metrics)
3204+ input_router = Router(processor, options['routing'], root_service)
3205+ connection = InternalClient(input_router)
3206+ metrics = Metrics(connection)
3207+ else:
3208+ processor = (processor or ConfigurableMessageProcessor)(
3209+ message_prefix=prefix,
3210+ internal_metrics_prefix=prefix + "." + instance_name + ".",
3211+ plugins=plugin_metrics)
3212+ input_router = Router(processor, options['routing'], root_service)
3213+ connection = InternalClient(input_router)
3214+ metrics = ExtendedMetrics(connection)
3215+
3216+ if not options["carbon-cache-host"]:
3217+ options["carbon-cache-host"].append("127.0.0.1")
3218+ if not options["carbon-cache-port"]:
3219+ options["carbon-cache-port"].append(2004)
3220+ if not options["carbon-cache-name"]:
3221+ options["carbon-cache-name"].append(None)
3222+
3223+ reporting = ReportingService(instance_name)
3224+ reporting.setServiceParent(root_service)
3225+
3226+ # Schedule updates for those metrics expecting to be
3227+ # periodically updated, for example the meter metric.
3228+ reporting.schedule(processor.update_metrics, 10, None)
3229+ reporting.schedule(report_client_manager_stats,
3230+ options["flush-interval"] / 1000,
3231+ metrics.gauge)
3232+
3233+ if options["report"] is not None:
3234+ from txstatsd import process
3235+ from twisted.internet import reactor
3236+
3237+ reporting.schedule(
3238+ process.report_reactor_stats(reactor), 60, metrics.gauge)
3239+ reports = [name.strip() for name in options["report"].split(",")]
3240+ for report_name in reports:
3241+ for reporter in getattr(process, "%s_STATS" %
3242+ report_name.upper(), ()):
3243+ reporting.schedule(reporter, 60, metrics.gauge)
3244+
3245+ # XXX Make this configurable.
3246+ router = ConsistentHashingRouter()
3247+ carbon_client = CarbonClientManager(router)
3248+ carbon_client.setServiceParent(root_service)
3249+
3250+ for host, port, name in zip(options["carbon-cache-host"],
3251+ options["carbon-cache-port"],
3252+ options["carbon-cache-name"]):
3253+ carbon_client.startClient((host, port, name))
3254+
3255+ statsd_service = StatsDService(carbon_client, input_router,
3256+ options["flush-interval"])
3257+ statsd_service.setServiceParent(root_service)
3258+
3259+ statsd_server_protocol = StatsDServerProtocol(
3260+ input_router,
3261+ monitor_message=options["monitor-message"],
3262+ monitor_response=options["monitor-response"])
3263+
3264+ listener = UDPServer(options["listen-port"], statsd_server_protocol)
3265+ listener.setServiceParent(root_service)
3266+
3267+ if options["listen-tcp-port"] is not None:
3268+ statsd_tcp_server_factory = StatsDTCPServerFactory(
3269+ input_router,
3270+ monitor_message=options["monitor-message"],
3271+ monitor_response=options["monitor-response"])
3272+
3273+ listener = TCPServer(options["listen-tcp-port"],
3274+ statsd_tcp_server_factory)
3275+ listener.setServiceParent(root_service)
3276+
3277+ httpinfo_service = httpinfo.makeService(options, processor, statsd_service)
3278+ httpinfo_service.setServiceParent(root_service)
3279+
3280+ return root_service
3281
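`OptionsGlue` records every flag seen during `parseOptions` in `overridden_options`, and `_coerce_option` skips those names when reading the config file, so command-line values always win over the `[statsd]` section. The net precedence, as a hypothetical stand-alone sketch (not the twisted `usage.Options` machinery):

```python
# Precedence implemented by OptionsGlue, boiled down: defaults are
# overridden by the config file, which is overridden by CLI flags.
# Hypothetical stand-in function, not part of txstatsd.
def merge_options(defaults, config_values, cli_values):
    merged = dict(defaults)
    merged.update(config_values)   # config file beats built-in defaults
    merged.update(cli_values)      # CLI flags beat everything
    return merged

merged = merge_options(
    {"listen-port": 8125, "flush-interval": 60000},
    {"listen-port": 9125},         # e.g. from the [statsd] section
    {"flush-interval": 10000})     # e.g. from --flush-interval
assert merged["listen-port"] == 9125
assert merged["flush-interval"] == 10000
```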
3282=== added directory 'txstatsd/stats'
3283=== added file 'txstatsd/stats/__init__.py'
3284=== added file 'txstatsd/stats/ewma.py'
3285--- txstatsd/stats/ewma.py 1970-01-01 00:00:00 +0000
3286+++ txstatsd/stats/ewma.py 2012-05-14 16:57:19 +0000
3287@@ -0,0 +1,70 @@
3288+
3289+"""
3290+An exponentially-weighted moving average.
3291+
3292+See:
3293+- U{UNIX Load Average Part 1: How It Works
3294+ <http://www.teamquest.com/pdfs/whitepaper/ldavg1.pdf>}
3295+- U{UNIX Load Average Part 2: Not Your Average Average
3296+ <http://www.teamquest.com/pdfs/whitepaper/ldavg2.pdf>}
3297+"""
3298+
3299+import math
3300+
3301+
3302+class Ewma(object):
3303+ M1_ALPHA = 1 - math.exp(-5 / 60.0)
3304+ M5_ALPHA = 1 - math.exp(-5 / 60.0 / 5)
3305+ M15_ALPHA = 1 - math.exp(-5 / 60.0 / 15)
3306+
3307+ @classmethod
3308+ def one_minute_ewma(cls):
3309+ """
3310+ Creates a new C{Ewma} which is equivalent to the UNIX one minute
3311+ load average and which expects to be ticked every 5 seconds.
3312+ """
3313+ return Ewma(Ewma.M1_ALPHA, 5)
3314+
3315+ @classmethod
3316+ def five_minute_ewma(cls):
3317+ """
3318+ Creates a new C{Ewma} which is equivalent to the UNIX five minute
3319+ load average and which expects to be ticked every 5 seconds.
3320+ """
3321+ return Ewma(Ewma.M5_ALPHA, 5)
3322+
3323+ @classmethod
3324+ def fifteen_minute_ewma(cls):
3325+ """
3326+ Creates a new C{Ewma} which is equivalent to the UNIX fifteen
3327+ minute load average and which expects to be ticked every 5 seconds.
3328+ """
3329+ return Ewma(Ewma.M15_ALPHA, 5)
3330+
3331+ def __init__(self, alpha, interval):
3332+ """Create a new C{Ewma} with a specific smoothing constant.
3333+
3334+ @param alpha: The smoothing constant.
3335+ @param interval: The expected tick interval in seconds.
3336+ """
3337+ self.interval = interval
3338+ self.alpha = float(alpha)
3339+
3340+ self.initialized = False
3341+ self.rate = 0.0
3342+ self.uncounted = 0
3343+
3344+ def update(self, n):
3345+ """Update the moving average with a new value."""
3346+ self.uncounted += n
3347+
3348+ def tick(self):
3349+ """Mark the passage of time and decay the current rate accordingly."""
3350+ count = self.uncounted
3351+ self.uncounted = 0
3352+ instant_rate = float(count) / self.interval
3353+ if self.initialized:
3354+ self.rate += (self.alpha * (instant_rate - self.rate))
3355+ else:
3356+ self.rate = instant_rate
3357+ self.initialized = True
3358
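The alpha constants above come from `1 - exp(-tick_interval / window)`, and each `tick()` moves the rate toward the instantaneous rate by that fraction. A tiny stand-alone check of the recurrence (plain variables, not the `Ewma` class): under a constant input rate, the smoothed rate converges on the true rate.

```python
import math

# Re-derive the one-minute smoothing constant and apply the tick
# recurrence directly.  With a 5s tick and 50 events per tick, the
# instantaneous rate is a constant 10 events/sec, so the EWMA should
# converge on (and stay at) 10.0.
interval = 5.0                            # seconds between ticks
alpha = 1 - math.exp(-interval / 60.0)    # one-minute window

rate = None
for _ in range(200):                      # ~1000s of constant traffic
    count = 50                            # events seen this interval
    instant = count / interval            # 10 events/sec
    if rate is None:
        rate = instant                    # first tick seeds the rate
    else:
        rate += alpha * (instant - rate)  # decay toward instant rate

assert abs(rate - 10.0) < 1e-6
```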
3359=== added file 'txstatsd/stats/exponentiallydecayingsample.py'
3360--- txstatsd/stats/exponentiallydecayingsample.py 1970-01-01 00:00:00 +0000
3361+++ txstatsd/stats/exponentiallydecayingsample.py 2012-05-14 16:57:19 +0000
3362@@ -0,0 +1,124 @@
3363+import bisect
3364+import math
3365+import random
3366+import time
3367+
3368+
3369+class ExponentiallyDecayingSample(object):
3370+ """
3371+ An exponentially-decaying random sample of values. Uses Cormode et
3372+ al's forward-decaying priority reservoir sampling method to produce a
3373+ statistically representative sample, exponentially biased towards newer
3374+ entries.
3375+
3376+ See:
3377+ - U{Cormode et al. Forward Decay: A Practical Time Decay Model for
3378+ Streaming Systems. ICDE '09: Proceedings of the 2009 IEEE International
3379+ Conference on Data Engineering (2009)
3380+ <http://www.research.att.com/people/Cormode_Graham/
3381+ library/publications/CormodeShkapenyukSrivastavaXu09.pdf>}
3382+ """
3383+
3384+ # 10 minutes (in seconds)
3385+ RESCALE_THRESHOLD = 60 * 10
3386+
3387+ def __init__(self, reservoir_size, alpha, wall_time=None):
3388+ """Creates a new C{ExponentiallyDecayingSample}.
3389+
3390+ @param reservoir_size: The number of samples to keep in the sampling
3391+ reservoir.
3392+ @param alpha: The exponential decay factor; the higher this is,
3393+ the more biased the sample will be towards newer values.
3394+ """
3395+ self._values = []
3396+ self.alpha = alpha
3397+ self.reservoir_size = reservoir_size
3398+
3399+ self.count = 0
3400+ self.start_time = 0
3401+ self.next_scale_time = 0
3402+
3403+ if wall_time is None:
3404+ wall_time = time.time
3405+ self._wall_time = wall_time
3406+ self.clear()
3407+
3408+ def clear(self):
3409+ self._values = []
3410+ self.count = 0
3411+ self.start_time = self.tick()
3412+ self.next_scale_time = (
3413+ self._wall_time() + self.RESCALE_THRESHOLD)
3414+
3415+ def size(self):
3416+ return min(self.reservoir_size, self.count)
3417+
3418+ def update(self, value, timestamp=None):
3419+ """Adds an old value with a fixed timestamp to the sample.
3420+
3421+ @param value: The value to be added.
3422+ @param timestamp: The epoch timestamp of *value* in seconds.
3423+ """
3424+
3425+ now = self._wall_time()
3426+ next = self.next_scale_time
3427+ if now >= next:
3428+ self.rescale(now, next)
3429+
3430+ if timestamp is None:
3431+ timestamp = self.tick()
3432+
3433+ priority = self.weight(timestamp - self.start_time) / random.random()
3434+ self.count += 1
3435+ new_count = self.count
3436+ if new_count <= self.reservoir_size:
3437+ bisect.insort(self._values, (priority, value))
3438+ else:
3439+ first = self._values[0][0]
3440+
3441+ if first < priority:
3442+ bisect.insort(self._values, (priority, value))
3443+ self._values = self._values[1:]
3444+
3445+ def get_values(self):
3446+ return [v for (k, v) in self._values]
3447+
3448+ def tick(self):
3449+ return self._wall_time()
3450+
3451+ def weight(self, t):
3452+ return math.exp(self.alpha * t)
3453+
3454+ def rescale(self, now, next):
3455+ """
3456+ A common feature of the above techniques - indeed, the key technique
3457+ that allows us to track the decayed weights efficiently - is that they
3458+ maintain counts and other quantities based on g(ti - L), and only
3459+ scale by g(t - L) at query time. But while g(ti - L)/g(t-L) is
3460+ guaranteed to lie between zero and one, the intermediate values of
3461+ g(ti - L) could become very large. For polynomial functions, these
3462+ values should not grow too large, and should be effectively
3463+ represented in practice by floating point values without loss of
3464+ precision. For exponential functions, these values could grow quite
3465+ large as new values of (ti - L) become large, and potentially exceed
3466+ the capacity of common floating point types. However, since the values
3467+ stored by the algorithms are linear combinations of g values (scaled
3468+ sums), they can be rescaled relative to a new landmark. That is, by
3469+ the analysis of exponential decay in Section III-A, the choice of L
3470+ does not affect the final result. We can therefore multiply each value
3471+ based on L by a factor of exp(-alpha(L' - L)), and obtain the correct
3472+ value as if we had instead computed relative to a new landmark L' (and
3473+ then use this new L' at query time). This can be done with a linear
3474+ pass over whatever data structure is being used.
3475+ """
3476+
3477+ self.next_scale_time = (
3478+ now + self.RESCALE_THRESHOLD)
3479+ old_start_time = self.start_time
3480+ self.start_time = self.tick()
3481+
3482+ new_values = []
3483+ for k, v in self._values:
3484+ nk = k * math.exp(-self.alpha * (self.start_time - old_start_time))
3485+ new_values.append((nk, v))
3486+ self._values = new_values
3487
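The bias in `update()` comes from the priority `weight(t - L) / random()`: newer timestamps get exponentially larger weights, so they tend to displace older reservoir entries. `rescale()` keeps those weights bounded by multiplying every priority by the same `exp(-alpha * (L' - L))`, which cannot change their ordering. A stand-alone sketch of both facts (variable names here are illustrative, not the class above):

```python
import math
import random

# Forward-decay priorities: a later timestamp yields an exponentially
# larger weight relative to the landmark `start`.
random.seed(42)
alpha = 0.015
start = 0.0

def priority(timestamp):
    return math.exp(alpha * (timestamp - start)) / random.random()

old = [(priority(t), t) for t in (10.0, 500.0, 990.0)]

# Rescaling against a new landmark multiplies every priority by the
# same positive factor, so the relative ordering is preserved while
# the magnitudes shrink back into a safe floating-point range.
new_landmark = 600.0
factor = math.exp(-alpha * (new_landmark - start))
rescaled = [(p * factor, t) for p, t in old]

def order(pairs):
    return [t for _, t in sorted(pairs)]

assert order(old) == order(rescaled)
assert all(p > 0 for p, _ in rescaled)
```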
3488=== added file 'txstatsd/stats/uniformsample.py'
3489--- txstatsd/stats/uniformsample.py 1970-01-01 00:00:00 +0000
3490+++ txstatsd/stats/uniformsample.py 2012-05-14 16:57:19 +0000
3491@@ -0,0 +1,45 @@
3492+
3493+import random
3494+import sys
3495+
3496+
3497+class UniformSample(object):
3498+ """
3499+ A random sample of a stream of values. Uses Vitter's Algorithm R to
3500+ produce a statistically representative sample.
3501+
3502+ See:
3503+ - U{Random Sampling with a Reservoir
3504+ <http://www.cs.umd.edu/~samir/498/vitter.pdf>}
3505+ """
3506+
3507+ def __init__(self, reservoir_size):
3508+ """Creates a new C{UniformSample}.
3509+
3510+ @param reservoir_size: The number of samples to keep in the sampling
3511+ reservoir.
3512+ """
3513+ self._values = [0 for i in range(reservoir_size)]
3514+ self._count = 0
3515+ self.clear()
3516+
3517+ def clear(self):
3518+ self._values = [0 for i in range(len(self._values))]
3519+ self._count = 0
3520+
3521+ def size(self):
3522+ c = self._count
3523+ return len(self._values) if c > len(self._values) else c
3524+
3525+ def update(self, value):
3526+ self._count += 1
3527+ if self._count <= len(self._values):
3528+ self._values[self._count - 1] = value
3529+ else:
3530+ r = random.randint(1, sys.maxint) % self._count
3531+ if r < len(self._values):
3532+ self._values[r] = value
3533+
3534+ def get_values(self):
3535+ s = self.size()
3536+ return [self._values[i] for i in range(0, s)]
3537
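`UniformSample.update()` is Vitter's Algorithm R: the first k values fill the reservoir, after which the i-th value replaces a random slot with probability k/i, giving every value in the stream the same k/n chance of surviving. A stand-alone simulation of that invariant (not the class above):

```python
import random

# Algorithm R in miniature: over many trials, early and late items
# should be retained at the same average rate, k/n per item.
random.seed(7)
k, n, trials = 10, 1000, 500
hits = [0] * n
for _ in range(trials):
    reservoir = list(range(k))        # first k items fill the slots
    for i in range(k, n):
        r = random.randint(0, i)      # uniform over items seen so far
        if r < k:
            reservoir[r] = i          # displace a random slot
    for v in reservoir:
        hits[v] += 1

assert sum(hits) == trials * k        # reservoir always holds k items
early = sum(hits[:100]) / 100.0       # avg hits for the oldest items
late = sum(hits[-100:]) / 100.0       # avg hits for the newest items
expected = trials * k / float(n)      # = 5 hits per item on average
assert abs(early - expected) < 2.0 and abs(late - expected) < 2.0
```

The production code draws `random.randint(1, sys.maxint) % count` instead of `randint(0, i)`; both pick a near-uniform slot index over the items seen so far.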
3538=== added directory 'txstatsd/tests'
3539=== added file 'txstatsd/tests/__init__.py'
3540=== added file 'txstatsd/tests/helper.py'
3541--- txstatsd/tests/helper.py 1970-01-01 00:00:00 +0000
3542+++ txstatsd/tests/helper.py 2012-05-14 16:57:19 +0000
3543@@ -0,0 +1,18 @@
3544+from txstatsd.metrics.metrics import Metrics
3545+
3546+class FakeStatsDClient(object):
3547+ """A fake C{StatsDClient} that simply appends to metrics.data on write."""
3548+
3549+ def __init__(self, metrics):
3550+ self.metrics = metrics
3551+
3552+ def write(self, data):
3553+ self.metrics.data.append(data)
3554+
3555+
3556+class FakeMetrics(Metrics):
3557+ """A fake C{IMeter} that simply stores metrics locally."""
3558+
3559+ def __init__(self, namespace=""):
3560+ Metrics.__init__(self, FakeStatsDClient(self), namespace=namespace)
3561+ self.data = []
3562
3563=== added directory 'txstatsd/tests/metrics'
3564=== added file 'txstatsd/tests/metrics/__init__.py'
3565--- txstatsd/tests/metrics/__init__.py 1970-01-01 00:00:00 +0000
3566+++ txstatsd/tests/metrics/__init__.py 2012-05-14 16:57:19 +0000
3567@@ -0,0 +1,2 @@
3568+
3569+
3570
3571=== added file 'txstatsd/tests/metrics/test_derive.py'
3572--- txstatsd/tests/metrics/test_derive.py 1970-01-01 00:00:00 +0000
3573+++ txstatsd/tests/metrics/test_derive.py 2012-05-14 16:57:19 +0000
3574@@ -0,0 +1,77 @@
3575+import itertools as it, operator as op, functools as ft
3576+
3577+import random
3578+
3579+from twisted.trial.unittest import TestCase
3580+from twisted.plugin import getPlugins
3581+from twisted.plugins import derive_plugin
3582+from txstatsd.itxstatsd import IMetricFactory
3583+
3584+
3585+class TestDeriveMetricReporter(TestCase):
3586+
3587+ mt = derive_plugin.DeriveMetricFactory.metric_type
3588+
3589+
3590+ def test_fastpoll(self):
3591+ random.seed(1)
3592+
3593+ wall_time = random.randint(1, 1000)
3594+ reporter = derive_plugin.DeriveMetricReporter(
3595+ 'test', wall_time_func=lambda: wall_time )
3596+ self.assertEquals(reporter.flush(random.randint(1, 1000), wall_time), list())
3597+
3598+ for j in xrange(random.randint(1, 100)):
3599+ reporter.process([random.random() * 1e3, self.mt])
3600+ self.assertEquals(reporter.flush(random.randint(1, 1000), wall_time), list())
3601+
3602+
3603+ def test_interface(self):
3604+ random.seed(1)
3605+
3606+ wall_time = random.random() * 1e6
3607+ reporter = derive_plugin.DeriveMetricReporter(
3608+ 'test', prefix='some.prefix', wall_time_func=lambda: wall_time )
3609+
3610+ count = random.randint(1, 100)
3611+ for j in xrange(count):
3612+ wall_time += 1
3613+ reporter.process([random.random() * 1e3, self.mt])
3614+
3615+ ts = random.random() * 1e6
3616+ values = reporter.flush(count, ts)
3617+ self.assertEquals(len(values), 2)
3618+ self.assertEquals(set(it.imap(len, values)), {3})
3619+ self.assertEquals(set(it.imap(op.itemgetter(2), values)), {ts})
3620+ self.assertEquals(
3621+ set(it.imap(op.itemgetter(0), values)),
3622+ {'some.prefix.test.count', 'some.prefix.test.rate'} )
3623+
3624+
3625+ def test_values(self):
3626+ random.seed(1)
3627+
3628+ wall_time = 0
3629+ reporter = derive_plugin.DeriveMetricReporter(
3630+ 'test', wall_time_func=lambda: wall_time )
3631+
3632+ val_count = 0
3633+ for i in xrange(100):
3634+ rate = (random.random() - 0.5) * 1e3
3635+ count = random.randint(2, 100)
3636+
3637+ for j in xrange(1, count):
3638+ wall_time += 1
3639+ reporter.process([rate, self.mt])
3640+ val_count += rate
3641+
3642+ values = dict(it.imap(op.itemgetter(0, 1), reporter.flush(count, wall_time)))
3643+ self.assertTrue(abs(values['test.count'] - val_count) < count**-1)
3644+ self.assertTrue(abs(values['test.rate'] - rate) < count**-1)
3645+
3646+
3647+class TestPlugin(TestCase):
3648+
3649+ def test_factory(self):
3650+ self.assertTrue(derive_plugin.derive_metric_factory in \
3651+ list(getPlugins(IMetricFactory)))
3652
3653=== added file 'txstatsd/tests/metrics/test_distinct.py'
3654--- txstatsd/tests/metrics/test_distinct.py 1970-01-01 00:00:00 +0000
3655+++ txstatsd/tests/metrics/test_distinct.py 2012-05-14 16:57:19 +0000
3656@@ -0,0 +1,94 @@
3657+# Copyright (C) 2011 Canonical
3658+# All Rights Reserved
3659+import random
3660+
3661+from scipy.stats import chi2
3662+
3663+from twisted.trial.unittest import TestCase
3664+from twisted.plugin import getPlugins
3665+from twisted.plugins import distinct_plugin
3666+import txstatsd.metrics.distinctmetric as distinct
3667+from txstatsd.itxstatsd import IMetricFactory
3668+
3669+
3670+class TestHash(TestCase):
3671+
3672+ def test_hash_chars(self):
3673+ "For one table, all chars map to different chars"
3674+ results = set()
3675+ for c in range(256):
3676+ random.seed(1)
3677+ h = distinct.hash(chr(c))
3678+ results.add(h)
3679+ self.assertEquals(len(results), 256)
3680+
3681+ def test_chi_square(self):
3682+ N = 10000
3683+
3684+ for (bits, buckets) in [(-1, 1024), (24, 256),
3685+ (16, 256), (8, 256), (0, 256)]:
3686+ bins = [0] * buckets
3687+ for i in range(N):
3688+ v = distinct.hash(str(i))
3689+ if bits < 0:
3690+ bin = v / (0xFFFFFFFF / buckets)
3691+ else:
3692+ bin = (v >> bits) & 0xFF
3693+ bins[bin] += 1
3694+ value = sum(((x - N / buckets) ** 2) / (N / buckets) for x in bins)
3695+ pval = chi2.cdf(value, N)
3696+ if pval > 0.5:
3697+ print bins, pval
3698+ self.assertTrue(pval < 0.5, "bits %s, pval == %s" % (bits, pval))
3699+ test_chi_square.skip = "Takes too long to run every time."
3700+
3701+
3702+class TestZeros(TestCase):
3703+
3704+ def test_zeros(self):
3705+ self.assertEquals(distinct.zeros(1), 0)
3706+ self.assertEquals(distinct.zeros(2), 1)
3707+ self.assertEquals(distinct.zeros(4), 2)
3708+ self.assertEquals(distinct.zeros(5), 0)
3709+ self.assertEquals(distinct.zeros(8), 3)
3710+ self.assertEquals(distinct.zeros(9), 0)
3711+
3712+
3713+class TestDistinct(TestCase):
3714+
3715+ def test_all(self):
3716+ random.seed(1)
3717+
3718+ for r in [1000, 10000]:
3719+ cd = distinct.SlidingDistinctCounter(32, 32)
3720+ for i in range(r):
3721+ cd.add(1, str(i))
3722+ error = abs(cd.distinct() - r)
3723+ self.assertTrue(error < 0.15 * r)
3724+
3725+
3726+class TestDistinctMetricReporter(TestCase):
3727+
3728+ def test_reports(self):
3729+ random.seed(1)
3730+ _wall_time = [0]
3731+ def _time():
3732+ return _wall_time[0]
3733+
3734+ dmr = distinct.DistinctMetricReporter("test", wall_time_func=_time)
3735+ for i in range(3000):
3736+ _wall_time[0] = i * 50
3737+ dmr.update(str(i))
3738+ now = _time()
3739+ self.assertTrue(abs(dmr.count() - 3000) < 600)
3740+ self.assertTrue(abs(dmr.count_1min(now) - 1) < 2)
3741+ self.assertTrue(abs(dmr.count_1hour(now) - 72) < 15)
3742+ self.assertTrue(abs(dmr.count_1day(now) - 1728) < 500)
3743+
3744+
3745+class TestPlugin(TestCase):
3746+
3747+ def test_factory(self):
3748+        self.assertTrue(distinct_plugin.distinct_metric_factory in
3749+                        list(getPlugins(IMetricFactory)))
3750+
3751
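The `zeros` helper exercised by `TestZeros` above counts trailing zero bits, which is the building block of Flajolet-Martin-style probabilistic distinct counting used by `SlidingDistinctCounter`. A minimal sketch of such a helper (plain Python, independent of txstatsd's actual implementation):

```python
def zeros(v):
    """Count trailing zero bits of v; 0 for odd v or v == 0."""
    if v == 0:
        return 0
    n = 0
    while v & 1 == 0:
        v >>= 1
        n += 1
    return n

# A Flajolet-Martin estimator tracks, per bucket, the maximum trailing-zero
# count seen over hashed items; 2 ** (max + 1) then approximates the number
# of distinct items, which is why TestDistinct tolerates a ~15% error.
```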
3752=== added file 'txstatsd/tests/metrics/test_histogrammetric.py'
3753--- txstatsd/tests/metrics/test_histogrammetric.py 1970-01-01 00:00:00 +0000
3754+++ txstatsd/tests/metrics/test_histogrammetric.py 2012-05-14 16:57:19 +0000
3755@@ -0,0 +1,91 @@
3756+
3757+import math
3758+from unittest import TestCase
3759+
3760+from txstatsd.metrics.histogrammetric import HistogramMetricReporter
3761+from txstatsd.stats.uniformsample import UniformSample
3762+
3763+
3764+class TestHistogramReporterMetric(TestCase):
3765+
3766+ def test_histogram_with_zero_recorded_values(self):
3767+ sample = UniformSample(100)
3768+ histogram = HistogramMetricReporter(sample)
3769+
3770+ self.assertEqual(histogram.count, 0, 'Should have a count of 0')
3771+ self.assertEqual(histogram.max(), 0,
3772+ 'Should have a max of 0')
3773+ self.assertEqual(histogram.min(), 0,
3774+ 'Should have a min of 0')
3775+
3776+ self.assertEqual(histogram.mean(), 0,
3777+ 'Should have a mean of 0')
3778+
3779+ self.assertEqual(histogram.std_dev(), 0,
3780+ 'Should have a standard deviation of 0')
3781+
3782+ percentiles = histogram.percentiles(0.5, 0.75, 0.99)
3783+ self.assertTrue(
3784+ (math.fabs(percentiles[0] - 0) < 0.01),
3785+ 'Should calculate percentiles')
3786+ self.assertTrue(
3787+ (math.fabs(percentiles[1] - 0) < 0.01),
3788+ 'Should calculate percentiles')
3789+ self.assertTrue(
3790+ (math.fabs(percentiles[2] - 0) < 0.01),
3791+ 'Should calculate percentiles')
3792+
3793+ self.assertEqual(len(histogram.get_values()), 0,
3794+ 'Should have no values')
3795+
3796+ def test_histogram_of_numbers_1_through_10000(self):
3797+ sample = UniformSample(100000)
3798+ histogram = HistogramMetricReporter(sample)
3799+ for i in range(1, 10001):
3800+ histogram.update(i)
3801+
3802+ self.assertEqual(histogram.count, 10000,
3803+ 'Should have a count of 10000')
3804+
3805+ self.assertEqual(histogram.max(), 10000,
3806+ 'Should have a max of 10000')
3807+ self.assertEqual(histogram.min(), 1,
3808+ 'Should have a min of 1')
3809+
3810+ self.assertTrue(
3811+ (math.fabs(histogram.mean() - 5000.5) < 0.01),
3812+ 'Should have a mean value of 5000.5')
3813+
3814+ self.assertTrue(
3815+ (math.fabs(histogram.std_dev() - 2886.89) < 0.01),
3816+            'Should have a standard deviation of 2886.89')
3817+
3818+ percentiles = histogram.percentiles(0.5, 0.75, 0.99)
3819+ self.assertTrue(
3820+ (math.fabs(percentiles[0] - 5000.5) < 0.01),
3821+ 'Should calculate percentiles')
3822+ self.assertTrue(
3823+ (math.fabs(percentiles[1] - 7500.75) < 0.01),
3824+ 'Should calculate percentiles')
3825+ self.assertTrue(
3826+ (math.fabs(percentiles[2] - 9900.99) < 0.01),
3827+ 'Should calculate percentiles')
3828+
3829+ values = [i for i in range(1, 10001)]
3830+ self.assertEqual(histogram.get_values(), values,
3831+ 'Should have 10000 values')
3832+
3833+ def test_histogram_histogram(self):
3834+ sample = UniformSample(100000)
3835+ histogram = HistogramMetricReporter(sample)
3836+ for i in range(1001, 11001):
3837+ histogram.update(i)
3838+
3839+ hist = histogram.histogram()
3840+ self.assertEquals(sum(hist), 10000)
3841+
3842+ total = sum(hist)
3843+ binsize = int(total / len(hist))
3844+ for i in hist:
3845+ self.assertTrue(abs(i - binsize) <= 1)
3846+
3847
3848=== added file 'txstatsd/tests/metrics/test_timermetric.py'
3849--- txstatsd/tests/metrics/test_timermetric.py 1970-01-01 00:00:00 +0000
3850+++ txstatsd/tests/metrics/test_timermetric.py 2012-05-14 16:57:19 +0000
3851@@ -0,0 +1,139 @@
3852+
3853+import math
3854+
3855+from twisted.trial.unittest import TestCase
3856+
3857+from txstatsd.metrics.timermetric import TimerMetricReporter
3858+
3859+
3860+class TestBlankTimerMetric(TestCase):
3861+ def setUp(self):
3862+ self.timer = TimerMetricReporter('test')
3863+ self.timer.tick()
3864+
3865+ def test_max(self):
3866+ self.assertEqual(
3867+ self.timer.max(), 0,
3868+ 'Should have a max of zero')
3869+
3870+ def test_min(self):
3871+ self.assertEqual(
3872+ self.timer.min(), 0,
3873+ 'Should have a min of zero')
3874+
3875+ def test_mean(self):
3876+ self.assertEqual(
3877+            self.timer.mean(), 0,
3878+ 'Should have a mean of zero')
3879+
3880+ def test_count(self):
3881+ self.assertEqual(
3882+ self.timer.count(), 0,
3883+ 'Should have a count of zero')
3884+
3885+ def test_std_dev(self):
3886+ self.assertEqual(
3887+ self.timer.std_dev(), 0,
3888+ 'Should have a standard deviation of zero')
3889+
3890+ def test_percentiles(self):
3891+ percentiles = self.timer.percentiles(0.5, 0.95, 0.98, 0.99, 0.999)
3892+ self.assertEqual(
3893+ percentiles[0], 0,
3894+ 'Should have median of zero')
3895+ self.assertEqual(
3896+ percentiles[1], 0,
3897+ 'Should have p95 of zero')
3898+ self.assertEqual(
3899+ percentiles[2], 0,
3900+ 'Should have p98 of zero')
3901+ self.assertEqual(
3902+ percentiles[3], 0,
3903+ 'Should have p99 of zero')
3904+ self.assertEqual(
3905+ percentiles[4], 0,
3906+ 'Should have p99.9 of zero')
3907+
3908+ def test_mean_rate(self):
3909+ self.assertEqual(
3910+ self.timer.mean_rate(), 0,
3911+ 'Should have a mean rate of zero')
3912+
3913+ def test_one_minute_rate(self):
3914+ self.assertEqual(
3915+ self.timer.one_minute_rate(), 0,
3916+            'Should have a one-minute rate of zero')
3917+
3918+ def test_five_minute_rate(self):
3919+ self.assertEqual(
3920+ self.timer.five_minute_rate(), 0,
3921+ 'Should have a five-minute rate of zero')
3922+
3923+ def test_fifteen_minute_rate(self):
3924+ self.assertEqual(
3925+ self.timer.fifteen_minute_rate(), 0,
3926+ 'Should have a fifteen-minute rate of zero')
3927+
3928+ def test_no_values(self):
3929+ self.assertEqual(
3930+ len(self.timer.get_values()), 0,
3931+ 'Should have no values')
3932+
3933+
3934+class TestTimingSeriesEvents(TestCase):
3935+ def setUp(self):
3936+ self.timer = TimerMetricReporter('test')
3937+ self.timer.tick()
3938+ self.timer.update(10)
3939+ self.timer.update(20)
3940+ self.timer.update(20)
3941+ self.timer.update(30)
3942+ self.timer.update(40)
3943+
3944+ def test_count(self):
3945+ self.assertEqual(
3946+ self.timer.count(), 5,
3947+ 'Should record the count')
3948+
3949+ def test_min(self):
3950+ self.assertTrue(
3951+ (math.fabs(self.timer.min() - 10.0) < 0.001),
3952+ 'Should calculate the minimum duration')
3953+
3954+ def test_max(self):
3955+ self.assertTrue(
3956+ (math.fabs(self.timer.max() - 40.0) < 0.001),
3957+ 'Should calculate the maximum duration')
3958+
3959+ def test_mean(self):
3960+ self.assertTrue(
3961+ (math.fabs(self.timer.mean() - 24.0) < 0.001),
3962+ 'Should calculate the mean duration')
3963+
3964+ def test_std_dev(self):
3965+ self.assertTrue(
3966+ (math.fabs(self.timer.std_dev() - 11.401) < 0.001),
3967+ 'Should calculate the standard deviation')
3968+
3969+ def test_percentiles(self):
3970+ percentiles = self.timer.percentiles(0.5, 0.95, 0.98, 0.99, 0.999)
3971+ self.assertTrue(
3972+ (math.fabs(percentiles[0] - 20.0) < 0.001),
3973+ 'Should calculate the median')
3974+ self.assertTrue(
3975+ (math.fabs(percentiles[1] - 40.0) < 0.001),
3976+ 'Should calculate the p95')
3977+ self.assertTrue(
3978+ (math.fabs(percentiles[2] - 40.0) < 0.001),
3979+ 'Should calculate the p98')
3980+ self.assertTrue(
3981+ (math.fabs(percentiles[3] - 40.0) < 0.001),
3982+ 'Should calculate the p99')
3983+ self.assertTrue(
3984+ (math.fabs(percentiles[4] - 40.0) < 0.001),
3985+ 'Should calculate the p999')
3986+
3987+ def test_values(self):
3988+ self.assertEqual(
3989+ set(self.timer.get_values()), set([10, 20, 20, 30, 40]),
3990+ 'Should have a series of values')
3991
3992=== added directory 'txstatsd/tests/stats'
3993=== added file 'txstatsd/tests/stats/__init__.py'
3994=== added file 'txstatsd/tests/stats/test_ewma.py'
3995--- txstatsd/tests/stats/test_ewma.py 1970-01-01 00:00:00 +0000
3996+++ txstatsd/tests/stats/test_ewma.py 2012-05-14 16:57:19 +0000
3997@@ -0,0 +1,315 @@
3998+
3999+import math
4000+from unittest import TestCase
4001+
4002+from txstatsd.stats.ewma import Ewma
4003+
4004+
4005+def mark_minutes(minutes, ewma):
4006+ for i in range(1, minutes * 60, 5):
4007+ ewma.tick()
4008+
4009+class TestEwmaOneMinute(TestCase):
4010+ def setUp(self):
4011+ self.ewma = Ewma.one_minute_ewma()
4012+ self.ewma.update(3)
4013+ self.ewma.tick()
4014+
4015+ def test_first_tick(self):
4016+ self.assertTrue(
4017+ (math.fabs(self.ewma.rate - 0.6) < 0.000001),
4018+ 'Should have a rate of 0.6 events/sec after the first tick')
4019+
4020+ def test_one_minute(self):
4021+ mark_minutes(1, self.ewma)
4022+ self.assertTrue(
4023+ (math.fabs(self.ewma.rate - 0.22072766) < 0.00000001),
4024+ 'Should have a rate of 0.22072766 events/sec after 1 minute')
4025+
4026+ def test_two_minutes(self):
4027+ mark_minutes(2, self.ewma)
4028+ self.assertTrue(
4029+ (math.fabs(self.ewma.rate - 0.08120117) < 0.00000001),
4030+ 'Should have a rate of 0.08120117 events/sec after 2 minutes')
4031+
4032+ def test_three_minutes(self):
4033+ mark_minutes(3, self.ewma)
4034+ self.assertTrue(
4035+ (math.fabs(self.ewma.rate - 0.02987224) < 0.00000001),
4036+ 'Should have a rate of 0.02987224 events/sec after 3 minutes')
4037+
4038+ def test_four_minutes(self):
4039+ mark_minutes(4, self.ewma)
4040+ self.assertTrue(
4041+ (math.fabs(self.ewma.rate - 0.01098938) < 0.00000001),
4042+ 'Should have a rate of 0.01098938 events/sec after 4 minutes')
4043+
4044+ def test_five_minutes(self):
4045+ mark_minutes(5, self.ewma)
4046+ self.assertTrue(
4047+ (math.fabs(self.ewma.rate - 0.00404277) < 0.00000001),
4048+ 'Should have a rate of 0.00404277 events/sec after 5 minutes')
4049+
4050+ def test_six_minutes(self):
4051+ mark_minutes(6, self.ewma)
4052+ self.assertTrue(
4053+ (math.fabs(self.ewma.rate - 0.00148725) < 0.00000001),
4054+ 'Should have a rate of 0.00148725 events/sec after 6 minutes')
4055+
4056+ def test_seven_minutes(self):
4057+ mark_minutes(7, self.ewma)
4058+ self.assertTrue(
4059+ (math.fabs(self.ewma.rate - 0.00054713) < 0.00000001),
4060+ 'Should have a rate of 0.00054713 events/sec after 7 minutes')
4061+
4062+ def test_eight_minutes(self):
4063+ mark_minutes(8, self.ewma)
4064+ self.assertTrue(
4065+ (math.fabs(self.ewma.rate - 0.00020128) < 0.00000001),
4066+ 'Should have a rate of 0.00020128 events/sec after 8 minutes')
4067+
4068+ def test_nine_minutes(self):
4069+ mark_minutes(9, self.ewma)
4070+ self.assertTrue(
4071+ (math.fabs(self.ewma.rate - 0.00007405) < 0.00000001),
4072+ 'Should have a rate of 0.00007405 events/sec after 9 minutes')
4073+
4074+ def test_ten_minutes(self):
4075+ mark_minutes(10, self.ewma)
4076+ self.assertTrue(
4077+ (math.fabs(self.ewma.rate - 0.00002724) < 0.00000001),
4078+ 'Should have a rate of 0.00002724 events/sec after 10 minutes')
4079+
4080+ def test_eleven_minutes(self):
4081+ mark_minutes(11, self.ewma)
4082+ self.assertTrue(
4083+ (math.fabs(self.ewma.rate - 0.00001002) < 0.00000001),
4084+ 'Should have a rate of 0.00001002 events/sec after 11 minutes')
4085+
4086+ def test_twelve_minutes(self):
4087+ mark_minutes(12, self.ewma)
4088+ self.assertTrue(
4089+ (math.fabs(self.ewma.rate - 0.00000369) < 0.00000001),
4090+ 'Should have a rate of 0.00000369 events/sec after 12 minutes')
4091+
4092+ def test_thirteen_minutes(self):
4093+ mark_minutes(13, self.ewma)
4094+ self.assertTrue(
4095+ (math.fabs(self.ewma.rate - 0.00000136) < 0.00000001),
4096+ 'Should have a rate of 0.00000136 events/sec after 13 minutes')
4097+
4098+ def test_fourteen_minutes(self):
4099+ mark_minutes(14, self.ewma)
4100+ self.assertTrue(
4101+ (math.fabs(self.ewma.rate - 0.00000050) < 0.00000001),
4102+ 'Should have a rate of 0.00000050 events/sec after 14 minutes')
4103+
4104+ def test_fifteen_minutes(self):
4105+ mark_minutes(15, self.ewma)
4106+ self.assertTrue(
4107+ (math.fabs(self.ewma.rate - 0.00000018) < 0.00000001),
4108+ 'Should have a rate of 0.00000018 events/sec after 15 minutes')
4109+
4110+
4111+class TestEwmaFiveMinute(TestCase):
4112+ def setUp(self):
4113+ self.ewma = Ewma.five_minute_ewma()
4114+ self.ewma.update(3)
4115+ self.ewma.tick()
4116+
4117+ def test_first_tick(self):
4118+ self.assertTrue(
4119+ (math.fabs(self.ewma.rate - 0.6) < 0.000001),
4120+ 'Should have a rate of 0.6 events/sec after the first tick')
4121+
4122+ def test_one_minute(self):
4123+ mark_minutes(1, self.ewma)
4124+ self.assertTrue(
4125+ (math.fabs(self.ewma.rate - 0.49123845) < 0.00000001),
4126+ 'Should have a rate of 0.49123845 events/sec after 1 minute')
4127+
4128+ def test_two_minutes(self):
4129+ mark_minutes(2, self.ewma)
4130+ self.assertTrue(
4131+ (math.fabs(self.ewma.rate - 0.40219203) < 0.00000001),
4132+ 'Should have a rate of 0.40219203 events/sec after 2 minutes')
4133+
4134+ def test_three_minutes(self):
4135+ mark_minutes(3, self.ewma)
4136+ self.assertTrue(
4137+ (math.fabs(self.ewma.rate - 0.32928698) < 0.00000001),
4138+ 'Should have a rate of 0.32928698 events/sec after 3 minutes')
4139+
4140+ def test_four_minutes(self):
4141+ mark_minutes(4, self.ewma)
4142+ self.assertTrue(
4143+ (math.fabs(self.ewma.rate - 0.26959738) < 0.00000001),
4144+ 'Should have a rate of 0.26959738 events/sec after 4 minutes')
4145+
4146+ def test_five_minutes(self):
4147+ mark_minutes(5, self.ewma)
4148+ self.assertTrue(
4149+ (math.fabs(self.ewma.rate - 0.22072766) < 0.00000001),
4150+ 'Should have a rate of 0.22072766 events/sec after 5 minutes')
4151+
4152+ def test_six_minutes(self):
4153+ mark_minutes(6, self.ewma)
4154+ self.assertTrue(
4155+ (math.fabs(self.ewma.rate - 0.18071653) < 0.00000001),
4156+ 'Should have a rate of 0.18071653 events/sec after 6 minutes')
4157+
4158+ def test_seven_minutes(self):
4159+ mark_minutes(7, self.ewma)
4160+ self.assertTrue(
4161+ (math.fabs(self.ewma.rate - 0.14795818) < 0.00000001),
4162+ 'Should have a rate of 0.14795818 events/sec after 7 minutes')
4163+
4164+ def test_eight_minutes(self):
4165+ mark_minutes(8, self.ewma)
4166+ self.assertTrue(
4167+ (math.fabs(self.ewma.rate - 0.12113791) < 0.00000001),
4168+ 'Should have a rate of 0.12113791 events/sec after 8 minutes')
4169+
4170+ def test_nine_minutes(self):
4171+ mark_minutes(9, self.ewma)
4172+ self.assertTrue(
4173+ (math.fabs(self.ewma.rate - 0.09917933) < 0.00000001),
4174+ 'Should have a rate of 0.09917933 events/sec after 9 minutes')
4175+
4176+ def test_ten_minutes(self):
4177+ mark_minutes(10, self.ewma)
4178+ self.assertTrue(
4179+ (math.fabs(self.ewma.rate - 0.08120117) < 0.00000001),
4180+ 'Should have a rate of 0.08120117 events/sec after 10 minutes')
4181+
4182+ def test_eleven_minutes(self):
4183+ mark_minutes(11, self.ewma)
4184+ self.assertTrue(
4185+ (math.fabs(self.ewma.rate - 0.06648190) < 0.00000001),
4186+ 'Should have a rate of 0.06648190 events/sec after 11 minutes')
4187+
4188+ def test_twelve_minutes(self):
4189+ mark_minutes(12, self.ewma)
4190+ self.assertTrue(
4191+ (math.fabs(self.ewma.rate - 0.05443077) < 0.00000001),
4192+ 'Should have a rate of 0.05443077 events/sec after 12 minutes')
4193+
4194+ def test_thirteen_minutes(self):
4195+ mark_minutes(13, self.ewma)
4196+ self.assertTrue(
4197+ (math.fabs(self.ewma.rate - 0.04456415) < 0.00000001),
4198+ 'Should have a rate of 0.04456415 events/sec after 13 minutes')
4199+
4200+ def test_fourteen_minutes(self):
4201+ mark_minutes(14, self.ewma)
4202+ self.assertTrue(
4203+ (math.fabs(self.ewma.rate - 0.03648604) < 0.00000001),
4204+ 'Should have a rate of 0.03648604 events/sec after 14 minutes')
4205+
4206+ def test_fifteen_minutes(self):
4207+ mark_minutes(15, self.ewma)
4208+ self.assertTrue(
4209+ (math.fabs(self.ewma.rate - 0.02987224) < 0.00000001),
4210+ 'Should have a rate of 0.02987224 events/sec after 15 minutes')
4211+
4212+
4213+class TestEwmaFifteenMinute(TestCase):
4214+ def setUp(self):
4215+ self.ewma = Ewma.fifteen_minute_ewma()
4216+ self.ewma.update(3)
4217+ self.ewma.tick()
4218+
4219+ def test_first_tick(self):
4220+ self.assertTrue(
4221+ (math.fabs(self.ewma.rate - 0.6) < 0.000001),
4222+ 'Should have a rate of 0.6 events/sec after the first tick')
4223+
4224+ def test_one_minute(self):
4225+ mark_minutes(1, self.ewma)
4226+ self.assertTrue(
4227+ (math.fabs(self.ewma.rate - 0.56130419) < 0.00000001),
4228+ 'Should have a rate of 0.56130419 events/sec after 1 minute')
4229+
4230+ def test_two_minutes(self):
4231+ mark_minutes(2, self.ewma)
4232+ self.assertTrue(
4233+ (math.fabs(self.ewma.rate - 0.52510399) < 0.00000001),
4234+ 'Should have a rate of 0.52510399 events/sec after 2 minutes')
4235+
4236+ def test_three_minutes(self):
4237+ mark_minutes(3, self.ewma)
4238+ self.assertTrue(
4239+ (math.fabs(self.ewma.rate - 0.49123845) < 0.00000001),
4240+ 'Should have a rate of 0.49123845 events/sec after 3 minutes')
4241+
4242+ def test_four_minutes(self):
4243+ mark_minutes(4, self.ewma)
4244+ self.assertTrue(
4245+ (math.fabs(self.ewma.rate - 0.45955700) < 0.00000001),
4246+ 'Should have a rate of 0.45955700 events/sec after 4 minutes')
4247+
4248+ def test_five_minutes(self):
4249+ mark_minutes(5, self.ewma)
4250+ self.assertTrue(
4251+ (math.fabs(self.ewma.rate - 0.42991879) < 0.00000001),
4252+ 'Should have a rate of 0.42991879 events/sec after 5 minutes')
4253+
4254+ def test_six_minutes(self):
4255+ mark_minutes(6, self.ewma)
4256+ self.assertTrue(
4257+ (math.fabs(self.ewma.rate - 0.40219203) < 0.00000001),
4258+ 'Should have a rate of 0.40219203 events/sec after 6 minutes')
4259+
4260+ def test_seven_minutes(self):
4261+ mark_minutes(7, self.ewma)
4262+ self.assertTrue(
4263+ (math.fabs(self.ewma.rate - 0.37625345) < 0.00000001),
4264+ 'Should have a rate of 0.37625345 events/sec after 7 minutes')
4265+
4266+ def test_eight_minutes(self):
4267+ mark_minutes(8, self.ewma)
4268+ self.assertTrue(
4269+ (math.fabs(self.ewma.rate - 0.35198773) < 0.00000001),
4270+ 'Should have a rate of 0.35198773 events/sec after 8 minutes')
4271+
4272+ def test_nine_minutes(self):
4273+ mark_minutes(9, self.ewma)
4274+ self.assertTrue(
4275+ (math.fabs(self.ewma.rate - 0.32928698) < 0.00000001),
4276+ 'Should have a rate of 0.32928698 events/sec after 9 minutes')
4277+
4278+ def test_ten_minutes(self):
4279+ mark_minutes(10, self.ewma)
4280+ self.assertTrue(
4281+ (math.fabs(self.ewma.rate - 0.30805027) < 0.00000001),
4282+ 'Should have a rate of 0.30805027 events/sec after 10 minutes')
4283+
4284+ def test_eleven_minutes(self):
4285+ mark_minutes(11, self.ewma)
4286+ self.assertTrue(
4287+ (math.fabs(self.ewma.rate - 0.28818318) < 0.00000001),
4288+ 'Should have a rate of 0.28818318 events/sec after 11 minutes')
4289+
4290+ def test_twelve_minutes(self):
4291+ mark_minutes(12, self.ewma)
4292+ self.assertTrue(
4293+ (math.fabs(self.ewma.rate - 0.26959738) < 0.00000001),
4294+ 'Should have a rate of 0.26959738 events/sec after 12 minutes')
4295+
4296+ def test_thirteen_minutes(self):
4297+ mark_minutes(13, self.ewma)
4298+ self.assertTrue(
4299+ (math.fabs(self.ewma.rate - 0.25221023) < 0.00000001),
4300+ 'Should have a rate of 0.25221023 events/sec after 13 minutes')
4301+
4302+ def test_fourteen_minutes(self):
4303+ mark_minutes(14, self.ewma)
4304+ self.assertTrue(
4305+ (math.fabs(self.ewma.rate - 0.23594443) < 0.00000001),
4306+ 'Should have a rate of 0.23594443 events/sec after 14 minutes')
4307+
4308+ def test_fifteen_minutes(self):
4309+ mark_minutes(15, self.ewma)
4310+ self.assertTrue(
4311+ (math.fabs(self.ewma.rate - 0.22072766) < 0.00000001),
4312+ 'Should have a rate of 0.22072766 events/sec after 15 minutes')
4313
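The expected rates in these tests follow from the standard EWMA recurrence with a 5-second tick interval (the same interval `mark_minutes` uses). A minimal sketch, assuming `alpha = 1 - exp(-interval / 60 / minutes)` as in the Metrics-style EWMA; names here are illustrative, the real class is `txstatsd.stats.ewma.Ewma`:

```python
import math

INTERVAL = 5.0  # seconds between ticks, matching mark_minutes() above


class SketchEwma(object):
    """Exponentially weighted moving average of an event rate."""

    def __init__(self, minutes):
        self.alpha = 1 - math.exp(-INTERVAL / 60.0 / minutes)
        self.rate = None
        self.uncounted = 0

    def update(self, n):
        self.uncounted += n

    def tick(self):
        instant_rate = self.uncounted / INTERVAL
        self.uncounted = 0
        if self.rate is None:
            self.rate = instant_rate  # first tick primes the average
        else:
            self.rate += self.alpha * (instant_rate - self.rate)


e = SketchEwma(1)
e.update(3)
e.tick()                 # 3 events / 5 s -> rate == 0.6, the "first tick" case
for _ in range(12):      # one simulated minute of empty ticks
    e.tick()
# rate decays by exp(-1): 0.6 * e**-1 ~= 0.22072766, matching test_one_minute
```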
4314=== added file 'txstatsd/tests/stats/test_exponentiallydecayingsample.py'
4315--- txstatsd/tests/stats/test_exponentiallydecayingsample.py 1970-01-01 00:00:00 +0000
4316+++ txstatsd/tests/stats/test_exponentiallydecayingsample.py 2012-05-14 16:57:19 +0000
4317@@ -0,0 +1,84 @@
4318+import random
4319+
4320+from twisted.trial.unittest import TestCase
4321+
4322+from txstatsd.stats.exponentiallydecayingsample import (
4323+ ExponentiallyDecayingSample)
4324+
4325+
4326+class TestExponentiallyDecayingSample(TestCase):
4327+
4328+ def test_100_out_of_1000_elements(self):
4329+ population = [i for i in range(0, 100)]
4330+ sample = ExponentiallyDecayingSample(1000, 0.99)
4331+ for i in population:
4332+ sample.update(i)
4333+
4334+ self.assertEqual(sample.size(), 100, 'Should have 100 elements')
4335+ self.assertEqual(len(sample.get_values()), 100,
4336+ 'Should have 100 elements')
4337+ self.assertEqual(
4338+ len(set(sample.get_values()).difference(set(population))), 0,
4339+ 'Should only have elements from the population')
4340+
4341+ def test_100_out_of_10_elements(self):
4342+ population = [i for i in range(0, 10)]
4343+ sample = ExponentiallyDecayingSample(100, 0.99)
4344+ for i in population:
4345+ sample.update(i)
4346+
4347+ self.assertEqual(sample.size(), 10)
4348+ self.assertEqual(len(sample.get_values()), 10,
4349+ 'Should have 10 elements')
4350+ self.assertEqual(
4351+ len(set(sample.get_values()).difference(set(population))), 0,
4352+ 'Should only have elements from the population')
4353+
4354+ def test_heavily_biased_100_out_of_1000_elements(self):
4355+ population = [i for i in range(0, 100)]
4356+ sample = ExponentiallyDecayingSample(1000, 0.01)
4357+ for i in population:
4358+ sample.update(i)
4359+
4360+ self.assertEqual(sample.size(), 100, 'Should have 100 elements')
4361+ self.assertEqual(len(sample.get_values()), 100,
4362+ 'Should have 100 elements')
4363+
4364+ self.assertEqual(
4365+ len(set(sample.get_values()).difference(set(population))), 0,
4366+ 'Should only have elements from the population')
4367+
4368+ def test_ewma_sample_load(self):
4369+
4370+ _time = [10000]
4371+
4372+ def wtime():
4373+ return _time[0]
4374+
4375+ sample = ExponentiallyDecayingSample(100, 0.99, wall_time=wtime)
4376+ sample.RESCALE_THRESHOLD = 100
4377+ sample.clear()
4378+ for i in xrange(10000000):
4379+ sample.update(random.normalvariate(0, 10))
4380+ _time[0] += 1
4381+
4382+ self.assertEqual(sample.size(), 100)
4383+ self.assertEqual(len(sample.get_values()), 100,
4384+ 'Should have 100 elements')
4385+ test_ewma_sample_load.skip = "takes too long to run"
4386+
4387+ def test_ewma_overflow(self):
4388+ """Long pauses on metric input should not overflow weight."""
4389+ _time = [10000]
4390+
4391+ def wtime():
4392+ return _time[0]
4393+
4394+ sample = ExponentiallyDecayingSample(100, 0.99, wall_time=wtime)
4395+ for i in xrange(100):
4396+ sample.update(random.normalvariate(0, 10))
4397+ _time[0] += 10000
4398+
4399+ self.assertEqual(sample.size(), 100)
4400+ self.assertEqual(len(sample.get_values()), 100,
4401+ 'Should have 100 elements')
4402
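The overflow scenario in `test_ewma_overflow` comes from the forward-decay weight `exp(alpha * age)`, which exceeds float range once the exponent passes roughly 709. A sketch of the failure mode and the rescaling idea behind `RESCALE_THRESHOLD` (the constant `ALPHA` and the landmark arithmetic here are illustrative, not the module's exact internals):

```python
import math

ALPHA = 0.99  # decay parameter used by the tests above

# Long pauses make the age term huge; the raw weight overflows a float:
overflowed = False
try:
    math.exp(ALPHA * 10000)
except OverflowError:
    overflowed = True

# Rescaling periodically moves the landmark time to "now" and divides every
# stored weight by exp(ALPHA * (new_landmark - old_landmark)), which keeps
# the exponent near zero regardless of how long the input was idle:
rescaled_weight = math.exp(ALPHA * (10000 - 10000))  # back to 1.0
```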
4403=== added file 'txstatsd/tests/stats/test_uniformsample.py'
4404--- txstatsd/tests/stats/test_uniformsample.py 1970-01-01 00:00:00 +0000
4405+++ txstatsd/tests/stats/test_uniformsample.py 2012-05-14 16:57:19 +0000
4406@@ -0,0 +1,32 @@
4407+
4408+from unittest import TestCase
4409+
4410+from txstatsd.stats.uniformsample import UniformSample
4411+
4412+
4413+class TestUniformSample(TestCase):
4414+ def test_100_out_of_1000_elements(self):
4415+ population = [i for i in range(0, 1000)]
4416+ sample = UniformSample(100)
4417+ for i in population:
4418+ sample.update(i)
4419+
4420+ self.assertEqual(sample.size(), 100, 'Should have 100 elements')
4421+ self.assertEqual(len(sample.get_values()), 100,
4422+ 'Should have 100 elements')
4423+ self.assertEqual(
4424+ len(set(sample.get_values()).difference(set(population))), 0,
4425+ 'Should only have elements from the population')
4426+
4427+ def test_100_out_of_10_elements(self):
4428+ population = [i for i in range(0, 10)]
4429+ sample = UniformSample(100)
4430+ for i in population:
4431+ sample.update(i)
4432+
4433+ self.assertEqual(sample.size(), 10, 'Should have 10 elements')
4434+ self.assertEqual(len(sample.get_values()), 10,
4435+ 'Should have 10 elements')
4436+ self.assertEqual(
4437+ len(set(sample.get_values()).difference(set(population))), 0,
4438+ 'Should only have elements from the population')
4439
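The behaviour checked above — at most `k` retained elements, all drawn from the population — is classic single-pass reservoir sampling. A minimal sketch (Vitter's Algorithm R; this is the standard technique, not necessarily `UniformSample`'s exact code):

```python
import random


def reservoir_sample(population, k):
    """Keep a uniform random sample of at most k items in one pass."""
    sample = []
    for i, item in enumerate(population):
        if i < k:
            sample.append(item)        # fill the reservoir first
        else:
            j = random.randint(0, i)   # inclusive on both ends
            if j < k:
                sample[j] = item       # replace with probability k / (i + 1)
    return sample
```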
4440=== added file 'txstatsd/tests/test_client.py'
4441--- txstatsd/tests/test_client.py 1970-01-01 00:00:00 +0000
4442+++ txstatsd/tests/test_client.py 2012-05-14 16:57:19 +0000
4443@@ -0,0 +1,93 @@
4444+"""Tests for the various client classes."""
4445+
4446+from twisted.internet import reactor
4447+from twisted.internet.defer import inlineCallbacks, Deferred
4448+from twisted.python import log
4449+from twisted.trial.unittest import TestCase
4450+
4451+from txstatsd.client import (
4452+ StatsDClientProtocol, TwistedStatsDClient, UdpStatsDClient)
4453+
4454+
4455+class TestClient(TestCase):
4456+
4457+ def setUp(self):
4458+ super(TestClient, self).setUp()
4459+ self.client = None
4460+ self.exception = None
4461+
4462+ def tearDown(self):
4463+ if self.client:
4464+ self.client.transport.stopListening()
4465+ super(TestClient, self).tearDown()
4466+
4467+ def test_twistedstatsd_write_with_wellformed_address(self):
4468+ self.client = TwistedStatsDClient('127.0.0.1', 8000)
4469+ protocol = StatsDClientProtocol(self.client)
4470+ reactor.listenUDP(0, protocol)
4471+
4472+ def ensure_bytes_sent(bytes_sent):
4473+ self.assertEqual(bytes_sent, len('message'))
4474+
4475+ def exercise(callback):
4476+ self.client.write('message', callback=callback)
4477+
4478+ d = Deferred()
4479+ d.addCallback(ensure_bytes_sent)
4480+ reactor.callWhenRunning(exercise, d.callback)
4481+ return d
4482+
4483+ @inlineCallbacks
4484+ def test_twistedstatsd_with_malformed_address_and_errback(self):
4485+ def ensure_exception_raised(ignore):
4486+ self.assertTrue(self.exception.startswith("DNS lookup failed"))
4487+
4488+ def capture_exception_raised(failure):
4489+ self.exception = failure.getErrorMessage()
4490+
4491+ yield TwistedStatsDClient(
4492+ '256.0.0.0', 1,
4493+ resolver_errback=capture_exception_raised)
4494+
4495+ d = Deferred()
4496+ d.addCallback(ensure_exception_raised)
4497+ reactor.callLater(.5, d.callback, None)
4498+ yield d
4499+
4500+ @inlineCallbacks
4501+ def test_twistedstatsd_with_malformed_address_and_no_errback(self):
4502+ def ensure_exception_raised(ignore):
4503+ self.assertTrue(self.exception.startswith("DNS lookup failed"))
4504+
4505+ def capture_exception_raised(failure):
4506+ self.exception = failure.getErrorMessage()
4507+
4508+ self.patch(log, "err", capture_exception_raised)
4509+
4510+ yield TwistedStatsDClient('256.0.0.0', 1)
4511+
4512+ d = Deferred()
4513+ d.addCallback(ensure_exception_raised)
4514+ reactor.callLater(.5, d.callback, None)
4515+ yield d
4516+
4517+ def test_udpstatsd_wellformed_address(self):
4518+ client = UdpStatsDClient('localhost', 8000)
4519+ self.assertEqual(client.host, '127.0.0.1')
4520+ client = UdpStatsDClient(None, None)
4521+ self.assertEqual(client.host, None)
4522+
4523+ def test_udpstatsd_malformed_address(self):
4524+ self.assertRaises(ValueError,
4525+ UdpStatsDClient, 'localhost', -1)
4526+ self.assertRaises(ValueError,
4527+ UdpStatsDClient, 'localhost', 'malformed')
4528+ self.assertRaises(ValueError,
4529+ UdpStatsDClient, 0, 8000)
4530+
4531+ def test_udpstatsd_socket_nonblocking(self):
4532+ client = UdpStatsDClient('localhost', 8000)
4533+ client.connect()
4534+ # According to the python docs (and the source, I've checked)
4535+ # setblocking(0) is the same as settimeout(0.0).
4536+ self.assertEqual(client.socket.gettimeout(), 0.0)
4537
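The equivalence relied on by `test_udpstatsd_socket_nonblocking` can be checked directly against the stdlib, independent of txstatsd:

```python
import socket

s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.setblocking(False)        # documented as equivalent to s.settimeout(0.0)
timeout = s.gettimeout()    # reads back as 0.0 for a non-blocking socket
s.close()
```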
4538=== added file 'txstatsd/tests/test_configurableprocessor.py'
4539--- txstatsd/tests/test_configurableprocessor.py 1970-01-01 00:00:00 +0000
4540+++ txstatsd/tests/test_configurableprocessor.py 2012-05-14 16:57:19 +0000
4541@@ -0,0 +1,152 @@
4542+import time
4543+
4544+from unittest import TestCase
4545+
4546+from twisted.plugins.distinct_plugin import distinct_metric_factory
4547+
4548+from txstatsd.server.configurableprocessor import ConfigurableMessageProcessor
4549+
4550+class FlushMessagesTest(TestCase):
4551+
4552+ def test_flush_counter_with_empty_prefix(self):
4553+ """
4554+ Ensure no prefix features if none is supplied.
4555+ B{Note}: The C{ConfigurableMessageProcessor} reports
4556+ the counter value, and not the normalized version as
4557+ seen in the StatsD-compliant C{Processor}.
4558+ """
4559+ configurable_processor = ConfigurableMessageProcessor(
4560+ time_function=lambda: 42)
4561+ configurable_processor.process("gorets:17|c")
4562+ messages = configurable_processor.flush()
4563+ self.assertEqual(("gorets.count", 17, 42), messages[0])
4564+ self.assertEqual(("statsd.numStats", 1, 42), messages[1])
4565+
4566+ def test_flush_counter_with_prefix(self):
4567+ """
4568+ Ensure the prefix features if one is supplied.
4569+ """
4570+ configurable_processor = ConfigurableMessageProcessor(
4571+ time_function=lambda: 42, message_prefix="test.metric")
4572+ configurable_processor.process("gorets:17|c")
4573+ messages = configurable_processor.flush()
4574+ self.assertEqual(("test.metric.gorets.count", 17, 42), messages[0])
4575+ self.assertEqual(("test.metric.statsd.numStats", 1, 42),
4576+ messages[1])
4577+
4578+ def test_flush_counter_with_internal_prefix(self):
4579+ """
4580+ Ensure the prefix features if one is supplied.
4581+ """
4582+ configurable_processor = ConfigurableMessageProcessor(
4583+ time_function=lambda: 42, message_prefix="test.metric",
4584+ internal_metrics_prefix="statsd.foo.")
4585+ configurable_processor.process("gorets:17|c")
4586+ messages = configurable_processor.flush()
4587+ self.assertEqual(("test.metric.gorets.count", 17, 42), messages[0])
4588+ self.assertEqual(("statsd.foo.numStats", 1, 42),
4589+ messages[1])
4590+
4591+ def test_flush_plugin(self):
4592+ """
4593+ Ensure the prefix features if one is supplied.
4594+ """
4595+ configurable_processor = ConfigurableMessageProcessor(
4596+ time_function=lambda: 42, message_prefix="test.metric",
4597+ plugins=[distinct_metric_factory])
4598+ configurable_processor.process("gorets:17|pd")
4599+ messages = configurable_processor.flush()
4600+ self.assertEquals(("test.metric.gorets.count", 1, 42), messages[0])
4601+
4602+ def test_flush_single_timer_single_time(self):
4603+ """
4604+ If a single timer with a single data point is present, all
4605+ percentiles will be set to the same value.
4606+ """
4607+ configurable_processor = ConfigurableMessageProcessor(
4608+ time_function=lambda: 42)
4609+
4610+ configurable_processor.process("glork:24|ms")
4611+ messages = configurable_processor.flush()
4612+
4613+ self.assertEqual(('glork.15min_rate', 0.0, 42), messages[0])
4614+ self.assertEqual(('glork.1min_rate', 0.0, 42), messages[1])
4615+ self.assertEqual(('glork.5min_rate', 0.0, 42), messages[2])
4616+ self.assertEqual(("glork.999percentile", 24.0, 42), messages[3])
4617+ self.assertEqual(("glork.99percentile", 24.0, 42), messages[4])
4618+ self.assertEqual(("glork.count", 1., 42), messages[5])
4619+ self.assertEqual(("glork.max", 24.0, 42), messages[6])
4620+ self.assertEqual(("glork.mean", 24.0, 42), messages[7])
4621+ self.assertEqual(("glork.min", 24.0, 42), messages[8])
4622+ self.assertEqual(("glork.stddev", 0.0, 42), messages[9])
4623+
4624+ def test_flush_single_timer_multiple_times(self):
4625+ """
4626+ Test reporting of multiple timer metric samples.
4627+ """
4628+ configurable_processor = ConfigurableMessageProcessor(
4629+ time_function=lambda: 42)
4630+
4631+ configurable_processor.process("glork:4|ms")
4632+ configurable_processor.update_metrics()
4633+ configurable_processor.process("glork:8|ms")
4634+ configurable_processor.update_metrics()
4635+ configurable_processor.process("glork:15|ms")
4636+ configurable_processor.update_metrics()
4637+ configurable_processor.process("glork:16|ms")
4638+ configurable_processor.update_metrics()
4639+ configurable_processor.process("glork:23|ms")
4640+ configurable_processor.update_metrics()
4641+ configurable_processor.process("glork:42|ms")
4642+ configurable_processor.update_metrics()
4643+
4644+ messages = configurable_processor.flush()
4645+
4646+ self.assertEqual(('glork.15min_rate', 0.20000000000000001, 42),
4647+ messages[0])
4648+ self.assertEqual(('glork.1min_rate', 0.20000000000000001, 42),
4649+ messages[1])
4650+ self.assertEqual(('glork.5min_rate', 0.20000000000000001, 42),
4651+ messages[2])
4652+ self.assertEqual(("glork.999percentile", 42.0, 42), messages[3])
4653+ self.assertEqual(("glork.99percentile", 42.0, 42), messages[4])
4654+ self.assertEqual(('glork.count', 6.0, 42), messages[5])
4655+ self.assertEqual(("glork.max", 42.0, 42), messages[6])
4656+ self.assertEqual(("glork.mean", 18.0, 42), messages[7])
4657+ self.assertEqual(("glork.min", 4.0, 42), messages[8])
4658+ self.assertEqual(("glork.stddev", 13.490738, 42), messages[9])
4659+
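The exact figures asserted in test_flush_single_timer_multiple_times can be checked by hand; this standalone sketch (not txstatsd code) recomputes them for the six samples fed to the timer, using the sample standard deviation (n - 1 divisor) that matches the asserted 13.490738:

```python
import math

# Samples sent as "glork:N|ms" in the test above.
samples = [4, 8, 15, 16, 23, 42]

# Mean: 108 / 6 == 18.0, matching the "glork.mean" assertion.
mean = sum(samples) / float(len(samples))

# Sample variance uses the n - 1 divisor: 910 / 5 == 182.
variance = sum((x - mean) ** 2 for x in samples) / (len(samples) - 1)

# sqrt(182) rounds to 13.490738, matching the "glork.stddev" assertion.
stddev = math.sqrt(variance)
```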
4660+
4661+class FlushMeterMetricMessagesTest(TestCase):
4662+
4663+ def setUp(self):
4664+ self.configurable_processor = ConfigurableMessageProcessor(
4665+ time_function=self.wall_clock_time, message_prefix="test.metric")
4666+ self.time_now = int(time.time())
4667+
4668+ def wall_clock_time(self):
4669+ return self.time_now
4670+
4671+ def mark_minutes(self, minutes):
4672+ for i in range(1, minutes * 60, 5):
4673+ self.configurable_processor.update_metrics()
4674+
4675+ def test_flush_meter_metric_with_prefix(self):
4676+ """
4677+ Test the correct rendering of the Graphite report for
4678+ a meter metric when a prefix is supplied.
4679+ """
4680+ self.configurable_processor.process("gorets:3.0|m")
4681+
4682+ self.time_now += 1
4683+ messages = self.configurable_processor.flush()
4684+ self.assertEqual(("test.metric.gorets.15min_rate", 0.0, self.time_now),
4685+ messages[0])
4686+ self.assertEqual(("test.metric.gorets.1min_rate", 0.0, self.time_now),
4687+ messages[1])
4688+ self.assertEqual(("test.metric.gorets.5min_rate", 0.0, self.time_now),
4689+ messages[2])
4690+ self.assertEqual(("test.metric.gorets.count", 3.0, self.time_now),
4691+ messages[3])
4692+ self.assertEqual(("test.metric.gorets.mean_rate", 3.0, self.time_now),
4693+ messages[4])
4694
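The tests above drive the processor with raw statsd datagrams such as "gorets:17|c" and "glork:24|ms". As a minimal sketch (illustrative only, not the txstatsd parser), each message decomposes into a metric name, a value, and a type code:

```python
def parse_statsd_message(message):
    """Split a 'name:value|type' statsd message into its three parts.

    Type codes seen in the tests above: 'c' (counter), 'ms' (timer),
    'm' (meter), 'g' (gauge), 'pd' (plugin-provided distinct metric).
    """
    name, rest = message.split(":", 1)
    value, metric_type = rest.split("|", 1)
    return name, value, metric_type

# parse_statsd_message("glork:24|ms") -> ("glork", "24", "ms")
```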
4695=== added file 'txstatsd/tests/test_httpinfo.py'
4696--- txstatsd/tests/test_httpinfo.py 1970-01-01 00:00:00 +0000
4697+++ txstatsd/tests/test_httpinfo.py 2012-05-14 16:57:19 +0000
4698@@ -0,0 +1,128 @@
4699+# -*- coding: utf-8 -*-
4700+import json
4701+
4702+from twisted.trial.unittest import TestCase
4703+
4704+from twisted.internet import reactor, defer, protocol
4705+from twisted.application.service import IService
4706+from twisted.web.client import Agent
4707+
4708+from txstatsd.metrics.timermetric import TimerMetricReporter
4709+from txstatsd.server import httpinfo
4710+from txstatsd import service
4711+
4712+
4713+class Dummy:
4714+ flush_interval = 10
4715+ last_flush_duration = 3
4716+ last_process_duration = 2
4717+
4718+
4719+class ResponseCollector(protocol.Protocol):
4720+
4721+ def __init__(self, finished):
4722+ self.finished = finished
4723+ self.data = []
4724+
4725+ def dataReceived(self, bytes):
4726+ self.data.append(bytes)
4727+
4728+ def connectionLost(self, reason):
4729+ self.finished.callback("".join(self.data))
4730+
4731+
4732+def collect_response(response):
4733+ d = defer.Deferred()
4734+ c = ResponseCollector(d)
4735+ response.deliverBody(c)
4736+ return d
4737+
4738+
4739+class HttpException(Exception):
4740+
4741+ def __init__(self, response):
4742+ super(HttpException, self).__init__(response.phrase)
4743+ self.response = response
4744+
4745+
4746+class ServiceTestsBuilder(TestCase):
4747+
4748+ def setUp(self):
4749+ self.service = None
4750+
4751+ @defer.inlineCallbacks
4752+ def get_results(self, path, **kwargs):
4753+ webport = 12323
4754+ o = service.StatsDOptions()
4755+ o["http-port"] = webport
4756+ d = Dummy()
4757+ d.__dict__.update(kwargs)
4758+ self.service = s = httpinfo.makeService(o, d, d)
4759+ s.startService()
4760+ agent = Agent(reactor)
4761+
4762+ result = yield agent.request('GET',
4763+ 'http://localhost:%s/%s' % (webport, path))
4764+ if result.code != 200:
4765+ raise HttpException(result)
4766+ data = yield collect_response(result)
4767+ defer.returnValue(data)
4768+
4769+ def tearDown(self):
4770+ if self.service is not None:
4771+ self.service.stopService()
4772+
4773+ @defer.inlineCallbacks
4774+ def test_httpinfo_ok(self):
4775+ data = yield self.get_results("status")
4776+ self.assertEquals(json.loads(data)["status"], "OK")
4777+
4778+ @defer.inlineCallbacks
4779+ def test_httpinfo_error(self):
4780+ try:
4781+ data = yield self.get_results("status", last_flush_duration=30)
4782+ except HttpException, e:
4783+ self.assertEquals(e.response.code, 500)
4784+ else:
4785+ self.fail("Not 500")
4786+
4787+ @defer.inlineCallbacks
4788+ def test_httpinfo_timer(self):
4789+ try:
4790+ data = yield self.get_results("metrics/gorets",
4791+ timer_metrics={'gorets': 100})
4792+ except HttpException, e:
4793+ self.assertEquals(e.response.code, 404)
4794+ else:
4795+ self.fail("Not 404")
4796+
4797+ @defer.inlineCallbacks
4798+ def test_httpinfo_timer2(self):
4799+ """Returns the canonical empty histogram without data."""
4800+ tmr = TimerMetricReporter('gorets')
4801+ data = yield self.get_results("metrics/gorets",
4802+ timer_metrics={'gorets': tmr})
4803+ self.assertEquals(json.loads(data)["histogram"], [0.] * 10)
4804+
4805+ @defer.inlineCallbacks
4806+ def test_httpinfo_timer3(self):
4807+ """Returns a valid histogram with data."""
4808+
4809+ tmr = TimerMetricReporter('gorets')
4810+ for i in range(1, 1001):
4811+ tmr.histogram.update(i)
4812+ data = yield self.get_results("metrics/gorets",
4813+ timer_metrics={'gorets': tmr})
4814+ hist = json.loads(data)
4815+ self.assertTrue(isinstance(hist, dict))
4816+ self.assertEquals(sum(hist["histogram"]), 1000)
4817+
4818+ @defer.inlineCallbacks
4819+ def test_httpinfo_fake_plugin(self):
4820+ """Also works for plugins."""
4821+
4822+ tmr = TimerMetricReporter('gorets')
4823+ data = yield self.get_results("metrics/gorets",
4824+ timer_metrics={}, plugin_metrics={'gorets': tmr})
4825+ hist = json.loads(data)
4826+ self.assertTrue(isinstance(hist, dict))
4827
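test_httpinfo_ok and test_httpinfo_error imply a health rule: the Dummy with last_flush_duration=3 against flush_interval=10 yields status "OK", while last_flush_duration=30 yields a 500. This hypothetical sketch (names are illustrative, not the httpinfo implementation) captures that inferred rule:

```python
import json

def status_payload(flush_interval, last_flush_duration):
    """Inferred health check: healthy while the last flush finished
    within one flush interval; otherwise report an error with HTTP 500."""
    healthy = last_flush_duration < flush_interval
    body = json.dumps({"status": "OK" if healthy else "ERROR"})
    return body, (200 if healthy else 500)

# status_payload(10, 3) mirrors test_httpinfo_ok;
# status_payload(10, 30) mirrors test_httpinfo_error.
```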
4828=== added file 'txstatsd/tests/test_loggingprocessor.py'
4829--- txstatsd/tests/test_loggingprocessor.py 1970-01-01 00:00:00 +0000
4830+++ txstatsd/tests/test_loggingprocessor.py 2012-05-14 16:57:19 +0000
4831@@ -0,0 +1,64 @@
4832+
4833+from unittest import TestCase
4834+
4835+from twisted.plugins.distinct_plugin import distinct_metric_factory
4836+
4837+from txstatsd.server.loggingprocessor import LoggingMessageProcessor
4838+
4839+
4840+class FakeMeterMetric(object):
4841+ def report(self, *args):
4842+ return [('Sample report', 1, 2)]
4843+
4844+class TestLogger(object):
4845+ def __init__(self):
4846+ self.log = ''
4847+
4848+ def info(self, measurement):
4849+ self.log += measurement + "\n"
4850+
4851+
4852+class TestLoggingMessageProcessor(TestCase):
4853+
4854+ def test_logger_with_no_info(self):
4855+ def invoker():
4856+ logger = 'logger'
4857+ LoggingMessageProcessor(logger)
4858+
4859+ self.assertRaises(TypeError, invoker)
4860+
4861+ def test_logger_with_non_callable_info(self):
4862+ def invoker():
4863+ class Logger(object):
4864+ def __init__(self):
4865+ self.info = 'logger'
4866+
4867+ logger = Logger()
4868+ LoggingMessageProcessor(logger)
4869+
4870+ self.assertRaises(TypeError, invoker)
4871+
4872+ def test_logger(self):
4873+ logger = TestLogger()
4874+ processor = LoggingMessageProcessor(logger)
4875+ metric = FakeMeterMetric()
4876+ processor.meter_metrics['test'] = metric
4877+ processor.flush()
4878+ expected = ["Out: %s %s %s" % message
4879+ for message in metric.report()]
4880+ self.assertFalse(set(expected).difference(logger.log.splitlines()))
4881+
4882+ def test_logger_plugin(self):
4883+ logger = TestLogger()
4884+ processor = LoggingMessageProcessor(
4885+ logger, plugins=[distinct_metric_factory],
4886+ time_function=lambda: 42)
4887+ msg_in = "gorets:17|pd"
4888+ processor.process(msg_in)
4889+ processor.flush()
4890+ messages = processor.plugin_metrics['gorets'].flush(
4891+ 10, processor.time_function())
4892+ expected = ["In: %s" % msg_in]\
4893+ + ["Out: %s %s %s" % message
4894+ for message in messages]
4895+ self.assertFalse(set(expected).difference(logger.log.splitlines()))
4896
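The assertions in test_logger above expect each flushed (name, value, timestamp) tuple to be logged as "Out: name value timestamp". A one-line sketch of that formatting, mirroring the test's expectation rather than the LoggingMessageProcessor source itself:

```python
def render_out_line(message):
    """Render a flushed (name, value, timestamp) tuple as a log line,
    in the "Out: %s %s %s" shape the test asserts."""
    return "Out: %s %s %s" % message

# render_out_line(("Sample report", 1, 2)) -> "Out: Sample report 1 2"
```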
4897=== added file 'txstatsd/tests/test_metrics.py'
4898--- txstatsd/tests/test_metrics.py 1970-01-01 00:00:00 +0000
4899+++ txstatsd/tests/test_metrics.py 2012-05-14 16:57:19 +0000
4900@@ -0,0 +1,120 @@
4901+"""Tests for the Metrics convenience class."""
4902+
4903+import re
4904+import time
4905+from unittest import TestCase
4906+from txstatsd.metrics.extendedmetrics import ExtendedMetrics
4907+from txstatsd.metrics.metrics import Metrics
4908+
4909+
4910+class FakeStatsDClient(object):
4911+
4912+ def connect(self):
4913+ """Connect to the StatsD server."""
4914+ pass
4915+
4916+ def disconnect(self):
4917+ """Disconnect from the StatsD server."""
4918+ pass
4919+
4920+ def write(self, data):
4921+ """Send the metric to the StatsD server."""
4922+ self.data = data
4923+
4924+
4925+class TestMetrics(TestCase):
4926+
4927+ def setUp(self):
4928+ self.connection = FakeStatsDClient()
4929+ self.metrics = Metrics(self.connection, 'txstatsd.tests')
4930+
4931+ def test_gauge(self):
4932+ """Test reporting of a gauge metric sample."""
4933+ self.metrics.gauge('gauge', 102)
4934+ self.assertEqual(self.connection.data,
4935+ 'txstatsd.tests.gauge:102|g')
4936+
4937+ def test_meter(self):
4938+ """Test reporting of a meter metric sample."""
4939+ self.metrics.meter('meter', 3)
4940+ self.assertEqual(self.connection.data,
4941+ 'txstatsd.tests.meter:3|m')
4942+
4943+ def test_counter(self):
4944+ """Test the increment and decrement operations."""
4945+ self.metrics.increment('counter', 18)
4946+ self.assertEqual(self.connection.data,
4947+ 'txstatsd.tests.counter:18|c')
4948+ self.metrics.decrement('counter', 9)
4949+ self.assertEqual(self.connection.data,
4950+ 'txstatsd.tests.counter:-9|c')
4951+
4952+ def test_timing(self):
4953+ """Test the timing operation."""
4954+ self.metrics.timing('timing', 101.1234)
4955+ self.assertEqual(self.connection.data,
4956+ 'txstatsd.tests.timing:101123.4|ms')
4957+
4958+ def test_timing_automatic(self):
4959+ """Test the automatic timing operation with explicit reset"""
4960+ start_time = time.time()
4961+
4962+ self.metrics.reset_timing()
4963+ time.sleep(.1)
4964+ self.metrics.timing('timing')
4965+
4966+ elapsed = time.time() - start_time
4967+
4968+ label, val, units = re.split(":|\|", self.connection.data)
4969+ self.assertEqual(label, 'txstatsd.tests.timing')
4970+ self.assertEqual(units, 'ms')
4971+ self.assertTrue(100 <= float(val) <= elapsed * 1000)
4972+
4973+ def test_timing_automatic_implicit_reset(self):
4974+ """Test the automatic timing operation with implicit reset"""
4975+ start_time = time.time()
4976+
4977+ self.metrics.timing('something_else')
4978+ time.sleep(.1)
4979+ self.metrics.timing('timing')
4980+
4981+ elapsed = time.time() - start_time
4982+
4983+ label, val, units = re.split(":|\|", self.connection.data)
4984+ self.assertEqual(label, 'txstatsd.tests.timing')
4985+ self.assertEqual(units, 'ms')
4986+ self.assertTrue(100 <= float(val) <= elapsed * 1000)
4987+
4988+ def test_generic(self):
4989+ """Test the GenericMetric class."""
4990+ self.metrics.report('users', "pepe", "pd")
4991+ self.assertEqual(self.connection.data,
4992+ 'txstatsd.tests.users:pepe|pd')
4993+
4994+ def test_empty_namespace(self):
4995+ """Test reporting of an empty namespace."""
4996+ self.metrics.namespace = None
4997+ self.metrics.gauge('gauge', 213)
4998+ self.assertEqual(self.connection.data,
4999+ 'gauge:213|g')
5000+
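The Metrics tests above exercise namespace prefixing: metric names are prefixed with "txstatsd.tests." when a namespace is set, and sent bare when it is empty (test_empty_namespace). A minimal sketch of that joining rule, assuming only what the assertions show:

```python
def fully_qualify(namespace, name):
    """Join namespace and metric name with a dot; omit the prefix
    entirely when the namespace is unset or empty."""
    return "%s.%s" % (namespace, name) if namespace else name

# fully_qualify("txstatsd.tests", "gauge") -> "txstatsd.tests.gauge"
# fully_qualify(None, "gauge") -> "gauge"
```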
The diff has been truncated for viewing.
