Merge lp:~theiw/txstatsd/deprecate-meter into lp:txstatsd

Proposed by Ian Wilkinson
Status: Merged
Approved by: Sidnei da Silva
Approved revision: 18
Merged at revision: 15
Proposed branch: lp:~theiw/txstatsd/deprecate-meter
Merge into: lp:txstatsd
Diff against target: 1736 lines (+666/-655)
15 files modified
example-stats-client.tac (+14/-7)
txstatsd/client.py (+114/-0)
txstatsd/metrics.py (+0/-154)
txstatsd/metrics/gaugemetric.py (+22/-0)
txstatsd/metrics/metric.py (+47/-0)
txstatsd/metrics/metrics.py (+80/-0)
txstatsd/processor.py (+0/-140)
txstatsd/protocol.py (+0/-82)
txstatsd/report.py (+0/-46)
txstatsd/server/processor.py (+213/-0)
txstatsd/server/protocol.py (+61/-0)
txstatsd/service.py (+0/-128)
txstatsd/tests/test_metrics.py (+60/-72)
txstatsd/tests/test_processor.py (+54/-25)
txstatsd/version.py (+1/-1)
To merge this branch: bzr merge lp:~theiw/txstatsd/deprecate-meter
Reviewer Review Type Date Requested Status
Sidnei da Silva Approve
Lucio Torre (community) Approve
Review via email: mp+68892@code.launchpad.net

Description of the change

[1] service.py placed at top-level.

[2] client.py placed at top-level

[3] Renamed StatsDClient to TwistedStatsDClient.

[4] report.py to top-level.

[5] processor.py refactored following the style introduced with gauge metric handling.

[6] meter.py to be removed.

To post a comment you must log in.
Revision history for this message
Lucio Torre (lucio.torre) wrote :

 946 + metric = [float(v) for v in values]
 947 + metric.append(key)

 that code seems to imply that the contents of metric could be *values, key
 when in reality it has to be [value, key]
 so maybe it can be replaced with a one liner: metric = [float(values[0]), key]

review: Approve
Revision history for this message
Sidnei da Silva (sidnei) wrote :

Looks good, although there's a few problems that need fixing before merge. I'll approve it and trust that you'll fix them. :)

[1] Some pep8 warnings:

txstatsd/server/processor.py:124:80: E501 line too long (81 characters)
txstatsd/client.py:51:39: W291 trailing whitespace
txstatsd/client.py:115:1: W391 blank line at end of file

[2] Some pyflakes warnings:

txstatsd/service.py:113: undefined name 'meter'

[3] A test failure:

===============================================================================
[ERROR]
Traceback (most recent call last):
  File "/usr/lib/python2.7/dist-packages/twisted/trial/runner.py", line 575, in loadPackage
    module = modinfo.load()
  File "/usr/lib/python2.7/dist-packages/twisted/python/modules.py", line 383, in load
    return self.pathEntry.pythonPath.moduleLoader(self.name)
  File "/usr/lib/python2.7/dist-packages/twisted/python/reflect.py", line 464, in namedAny
    topLevelPackage = _importAndCheckStack(trialname)
  File "/src/txstatsd/deprecate-meter/txstatsd/tests/test_service.py", line 5, in <module>
    from txstatsd import service
  File "/src/txstatsd/deprecate-meter/txstatsd/service.py", line 12, in <module>
    from txstatsd.processor import MessageProcessor
exceptions.ImportError: No module named processor

txstatsd.tests.test_service

[4] Seems like some files were removed and added again. This not only breaks history but it can also cause conflicts for anyone that has pending branches which get merged after yours. I suggest starting fresh from trunk and reapplying your changes. You could even just take a patch from this branch and apply it on top of a clean branch.

review: Approve
lp:~theiw/txstatsd/deprecate-meter updated
19. By Ian Wilkinson

pep8 and correct failing test.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'example-stats-client.tac'
2--- example-stats-client.tac 2011-07-12 04:54:49 +0000
3+++ example-stats-client.tac 2011-07-25 16:11:30 +0000
4@@ -6,14 +6,20 @@
5
6 from twisted.application.service import Application
7
8-from txstatsd.protocol import StatsDClientProtocol
9-from txstatsd.metrics import TransportMeter
10+from txstatsd.client import (
11+ TwistedStatsDClient, StatsDClientProtocol)
12+from txstatsd.metrics.metrics import Metrics
13 from txstatsd.process import PROCESS_STATS
14 from txstatsd.report import ReportingService
15
16
17+STATSD_HOST = "127.0.0.1"
18+STATSD_PORT = 8125
19+
20 application = Application("example-stats-client")
21-meter = TransportMeter(prefix=socket.gethostname() + ".example-client")
22+statsd_client = TwistedStatsDClient(STATSD_HOST, STATSD_PORT)
23+metrics = Metrics(connection=statsd_client,
24+ namespace=socket.gethostname() + ".example-client")
25
26 reporting = ReportingService()
27 reporting.setServiceParent(application)
28@@ -24,13 +30,13 @@
29 def random_walker(name):
30 """Meters a random walk."""
31 if random.random() > 0.5:
32- meter.increment(name)
33+ metrics.increment(name)
34 else:
35- meter.decrement(name)
36+ metrics.decrement(name)
37
38 def random_normal(name):
39 """Meters samples from a normal distribution."""
40- meter.timing(name, random.normalvariate(10, 3))
41+ metrics.timing(name, random.normalvariate(10, 3))
42
43
44 for n in range(5):
45@@ -42,5 +48,6 @@
46 t.start(0.5, now=False)
47
48
49-protocol = StatsDClientProtocol("127.0.0.1", 8125, meter, 6000)
50+protocol = StatsDClientProtocol(STATSD_HOST, STATSD_PORT,
51+ statsd_client, 6000)
52 reactor.listenUDP(0, protocol)
53
54=== added file 'txstatsd/client.py'
55--- txstatsd/client.py 1970-01-01 00:00:00 +0000
56+++ txstatsd/client.py 2011-07-25 16:11:30 +0000
57@@ -0,0 +1,114 @@
58+
59+import socket
60+
61+from twisted.internet.protocol import DatagramProtocol
62+
63+
64+class StatsDClientProtocol(DatagramProtocol):
65+ """A Twisted-based implementation of the StatsD client protocol.
66+
67+ Data is sent via UDP to a StatsD server for aggregation.
68+ """
69+
70+ def __init__(self, host, port, client, interval=None):
71+ self.host = host
72+ self.port = port
73+ self.client = client
74+ self.interval = interval
75+
76+ def startProtocol(self):
77+ """Connect to destination host."""
78+ self.client.connect(self.transport)
79+
80+ def stopProtocol(self):
81+ """Connection was lost."""
82+ self.client.disconnect()
83+
84+
85+class TwistedStatsDClient(object):
86+
87+ def __init__(self, host, port,
88+ connect_callback=None, disconnect_callback=None):
89+ """Build a connection that reports to the endpoint (on
90+ C{host} and C{port}) using UDP.
91+
92+ @param host: The StatsD server host.
93+ @param port: The StatsD server port.
94+ @param connect_callback: The callback to invoke on connection.
95+ @param disconnect_callback: The callback to invoke on disconnection.
96+ """
97+
98+ self.host = host
99+ self.port = port
100+ self.connect_callback = connect_callback
101+ self.disconnect_callback = disconnect_callback
102+
103+ self.transport = None
104+
105+ def connect(self, transport=None):
106+ """Connect to the StatsD server."""
107+ self.transport = transport
108+ if self.transport is not None:
109+ if self.connect_callback is not None:
110+ self.connect_callback()
111+
112+ def disconnect(self):
113+ """Disconnect from the StatsD server."""
114+ if self.disconnect_callback is not None:
115+ self.disconnect_callback()
116+ self.transport = self.host = self.port = None
117+
118+ def write(self, data):
119+ """Send the metric to the StatsD server."""
120+ if self.transport is not None:
121+ self.transport.write(data, (self.host, self.port))
122+
123+
124+class UdpStatsDClient(object):
125+
126+ def __init__(self, host=None, port=None):
127+ """Build a connection that reports to C{host} and C{port})
128+ using UDP.
129+
130+ @param host: The StatsD host.
131+ @param port: The StatsD port.
132+ """
133+ self.host = host
134+ self.port = port
135+
136+ def connect(self):
137+ """Connect to the StatsD server."""
138+ self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
139+ self.addr = (self.host, self.port)
140+
141+ def disconnect(self):
142+ """Disconnect from the StatsD server."""
143+ if self.socket is not None:
144+ self.socket.close()
145+ self.socket = None
146+
147+ def write(self, data):
148+ """Send the metric to the StatsD server."""
149+ if self.addr is None or self.socket is None:
150+ return
151+ self.socket.sendto(data, self.addr)
152+
153+
154+class InternalClient(object):
155+ """A connection that can be used inside the C{StatsD} daemon itself."""
156+
157+ def __init__(self, processor):
158+ """
159+ A connection that writes directly to the C{MessageProcessor}.
160+ """
161+ self._processor = processor
162+
163+ def connect(self):
164+ pass
165+
166+ def disconnect(self):
167+ pass
168+
169+ def write(self, data):
170+ """Write directly to the C{MessageProcessor}."""
171+ self._processor.process(data)
172
173=== added directory 'txstatsd/metrics'
174=== removed file 'txstatsd/metrics.py'
175--- txstatsd/metrics.py 2011-07-12 04:54:49 +0000
176+++ txstatsd/metrics.py 1970-01-01 00:00:00 +0000
177@@ -1,154 +0,0 @@
178-import time
179-import socket
180-import random
181-
182-
183-class BaseMeter(object):
184-
185- def __init__(self, prefix="", sample_rate=1):
186- """Base implementation for a C{StatsD} client utility.
187-
188- All networking logic is left for the subclasses.
189-
190- prefix: a string to prepend to all metrics. useful to insert stuff like
191- server name.
192- sample_rate: only write C{sample_rate} percent of messages to the wire.
193- """
194- self.prefix = prefix
195- self.sample_rate = sample_rate
196-
197- def increment(self, name, count=1):
198- """Report and increase in name by count."""
199- self.send("%s:%s|c" % (name, count))
200-
201- def decrement(self, name, count=1):
202- """Report and decrease in name by count."""
203- self.increment(name, count * -1)
204-
205- def done(self, name):
206- """Report that name completed successfully."""
207- self.increment(name + ".done")
208-
209- def error(self, name):
210- """Report that name failed."""
211- self.increment(name + ".error")
212-
213- def timing(self, name, duration):
214- """Report that name took duration ms."""
215- self.send("%s:%s|ms" % (name, duration))
216-
217- def send(self, data):
218- """Send out C{data} to C{StatsD} if over C{sample_rate}."""
219-
220- if self.sample_rate < 1:
221- if random.random() > self.sample_rate:
222- return
223- data += "|@%s" % (self.sample_rate,)
224-
225- if self.prefix:
226- data = self.prefix + "." + data
227-
228- self.write(data)
229-
230- def write(self, data):
231- """Write out C{data} to the network."""
232- raise NotImplementedError()
233-
234-
235-class InProcessMeter(BaseMeter):
236- """A meter that can be used inside the C{StatsD} daemon itself."""
237-
238- def __init__(self, processor, prefix="", sample_rate=1):
239- self.processor = processor
240- BaseMeter.__init__(self, prefix=prefix, sample_rate=sample_rate)
241-
242- def write(self, data):
243- """Pass the data along directly to the C{Processor}."""
244- self.processor.process(data)
245-
246-
247-class Meter(BaseMeter):
248- """A trivial, non-Twisted-dependent meter."""
249-
250- def __init__(self, host=None, port=None, prefix="", sample_rate=1):
251- """Build a meter that reports to host:port over udp.
252-
253- host: statsd host
254- port: statsd port
255- prefix: a string to prepend to all metrics. useful to insert stuff like
256- server name.
257- sample_rate: only write C{sample_rate} percent of messages to the wire.
258- """
259- BaseMeter.__init__(self, prefix=prefix, sample_rate=sample_rate)
260- self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
261- self.addr = (host, port)
262-
263- def write(self, data):
264- """Write out C{data} to the network."""
265- if self.addr is None or self.socket is None:
266- return
267- self.socket.sendto(data, self.addr)
268-
269- def close(self):
270- """Close the socket."""
271- self.socket.close()
272- self.socket = None
273-
274-
275-class Measure(object):
276- """Context Manager for generic measuring."""
277-
278- def __init__(self, prefix, operation_name, meter=None):
279- if meter is None:
280- meter = Meter(prefix)
281- self.meter = meter
282- self.operation_name = operation_name
283- self.before = None
284-
285- def __enter__(self):
286- """Increase operation meter."""
287- self.before = time.time()
288- self.meter.increment(self.operation_name)
289-
290- def __exit__(self, exception_type, value, traceback):
291- """
292- Record time since __enter__, increase either the .done or the
293- .error meter, and decrease the operation meter.
294-
295- """
296- if exception_type is None:
297- self.meter.timing(self.operation_name, time.time() - self.before)
298- self.meter.increment(self.operation_name + '.done')
299- else:
300- self.meter.increment(self.operation_name + '.error')
301- self.meter.decrement(self.operation_name)
302-
303-
304-class TransportMeter(BaseMeter):
305-
306- transport = None
307- host = None
308- port = None
309-
310- def __init__(self, prefix="", sample_rate=1,
311- connect_callback=None, disconnect_callback=None):
312- self.connect_callback = connect_callback
313- self.disconnect_callback = disconnect_callback
314- BaseMeter.__init__(self, prefix=prefix, sample_rate=sample_rate)
315-
316- def connect(self, transport, host, port):
317- self.transport = transport
318- self.host = host
319- self.port = port
320- if self.connect_callback is not None:
321- self.connect_callback()
322-
323- def disconnect(self):
324- if self.disconnect_callback is not None:
325- self.disconnect_callback()
326- self.transport = self.host = self.port = None
327-
328- def write(self, data):
329- """Send metrics to the StatsD server using the transport."""
330- if self.transport is not None:
331- self.transport.write(data, (self.host, self.port))
332
333=== added file 'txstatsd/metrics/__init__.py'
334=== added file 'txstatsd/metrics/gaugemetric.py'
335--- txstatsd/metrics/gaugemetric.py 1970-01-01 00:00:00 +0000
336+++ txstatsd/metrics/gaugemetric.py 2011-07-25 16:11:30 +0000
337@@ -0,0 +1,22 @@
338+
339+from txstatsd.metrics.metric import Metric
340+
341+
342+class GaugeMetric(Metric):
343+ """A gauge metric is an instantaneous reading of a particular value."""
344+
345+ def __init__(self, connection, name, sample_rate=1):
346+ """Construct a metric that reports samples to the supplied
347+ C{connection}.
348+
349+ @param connection: The connection endpoint representing
350+ the StatsD server.
351+ @param name: Indicates what is being instrumented.
352+ @param sample_rate: Restrict the number of samples sent
353+ to the StatsD server based on the supplied C{sample_rate}.
354+ """
355+ Metric.__init__(self, connection, name, sample_rate=sample_rate)
356+
357+ def mark(self, value):
358+ """Report the C{value} for this gauge."""
359+ self.send("%s|g" % value)
360
361=== added file 'txstatsd/metrics/metric.py'
362--- txstatsd/metrics/metric.py 1970-01-01 00:00:00 +0000
363+++ txstatsd/metrics/metric.py 2011-07-25 16:11:30 +0000
364@@ -0,0 +1,47 @@
365+
366+import random
367+
368+
369+class Metric(object):
370+ """
371+ The foundation metric from which the specialized
372+ metrics are derived.
373+ """
374+
375+ def __init__(self, connection, name, sample_rate=1):
376+ """Construct a metric that reports samples to the supplied
377+ C{connection}.
378+
379+ @param connection: The connection endpoint representing
380+ the StatsD server.
381+ @param name: Specific description for this metric.
382+ @param sample_rate: Restrict the number of samples sent
383+ to the StatsD server based on the supplied C{sample_rate}.
384+ """
385+ self.connection = connection
386+ self.name = name
387+ self.sample_rate = sample_rate
388+
389+ def clear(self):
390+ """Responsibility of the specialized metrics."""
391+ pass
392+
393+ def send(self, data):
394+ """
395+ Message the C{data} to the C{StatsD} server according to the
396+ C{sample_rate}.
397+ """
398+
399+ if self.sample_rate < 1:
400+ if random.random() > self.sample_rate:
401+ return
402+ data += "|@%s" % (self.sample_rate,)
403+
404+ data = self.name + ":" + data
405+
406+ self.write(data)
407+
408+ def write(self, data):
409+ """Message the C{data} to the C{StatsD} server."""
410+ if self.connection is not None:
411+ self.connection.write(data)
412
413=== added file 'txstatsd/metrics/metrics.py'
414--- txstatsd/metrics/metrics.py 1970-01-01 00:00:00 +0000
415+++ txstatsd/metrics/metrics.py 2011-07-25 16:11:30 +0000
416@@ -0,0 +1,80 @@
417+
418+from txstatsd.metrics.gaugemetric import GaugeMetric
419+from txstatsd.metrics.metric import Metric
420+
421+
422+class Metrics(object):
423+ def __init__(self, connection=None, namespace=""):
424+ """A convenience class for reporting metric samples
425+ to a StatsD server (C{connection}).
426+
427+ @param connection: The connection endpoint representing
428+ the StatsD server.
429+ @param namespace: The top-level namespace identifying the
430+ origin of the samples.
431+ """
432+
433+ self.connection = connection
434+ self.namespace = namespace
435+ self._metrics = {}
436+
437+ def gauge(self, name, value, sample_rate=1):
438+ """Report an instantaneous reading of a particular value."""
439+ name = self.fully_qualify_name(name)
440+ if not name in self._metrics:
441+ gauge_metric = GaugeMetric(self.connection,
442+ name,
443+ sample_rate)
444+ self._metrics[name] = gauge_metric
445+ self._metrics[name].mark(value)
446+
447+ def increment(self, name, value, sample_rate=1):
448+ """Report and increase in name by count."""
449+ name = self.fully_qualify_name(name)
450+ if not name in self._metrics:
451+ metric = Metric(self.connection,
452+ name,
453+ sample_rate)
454+ self._metrics[name] = metric
455+ self._metrics[name].send("%s|c" % value)
456+
457+ def decrement(self, name, value, sample_rate=1):
458+ """Report and decrease in name by count."""
459+ name = self.fully_qualify_name(name)
460+ if not name in self._metrics:
461+ metric = Metric(self.connection,
462+ name,
463+ sample_rate)
464+ self._metrics[name] = metric
465+ self._metrics[name].send("%s|c" % -value)
466+
467+ def timing(self, name, duration, sample_rate=1):
468+ """Report this sample performed in duration ms."""
469+ name = self.fully_qualify_name(name)
470+ if not name in self._metrics:
471+ metric = Metric(self.connection,
472+ name,
473+ sample_rate)
474+ self._metrics[name] = metric
475+ self._metrics[name].send("%s|ms" % duration)
476+
477+ def clear(self, name):
478+ """Allow the metric to re-initialize its internal state."""
479+ name = self.fully_qualify_name(name)
480+ if name in self._metrics:
481+ metric = self._metrics[name]
482+ if getattr(metric, 'clear', None) is not None:
483+ metric.clear()
484+
485+ def fully_qualify_name(self, name):
486+ """Compose the fully-qualified name: namespace and name."""
487+ fully_qualified_name = ""
488+ if self.namespace is not None:
489+ fully_qualified_name = self.namespace
490+ if name is not None:
491+ # prepend the separator should we have a namespace
492+ if self.namespace is not None and len(self.namespace) > 0:
493+ fully_qualified_name += "." + name
494+ else:
495+ fully_qualified_name = name
496+ return fully_qualified_name
497
498=== removed file 'txstatsd/processor.py'
499--- txstatsd/processor.py 2011-07-05 05:27:22 +0000
500+++ txstatsd/processor.py 1970-01-01 00:00:00 +0000
501@@ -1,140 +0,0 @@
502-import re
503-import time
504-import logging
505-
506-from twisted.python import log
507-
508-
509-SPACES = re.compile("\s+")
510-SLASHES = re.compile("\/+")
511-NON_ALNUM = re.compile("[^a-zA-Z_\-0-9\.]")
512-RATE = re.compile("^@([\d\.]+)")
513-COUNTERS_MESSAGE = (
514- "stats.%(key)s %(value)s %(timestamp)s\n"
515- "stats_counts.%(key)s %(count)s %(timestamp)s\n")
516-TIMERS_MESSAGE = (
517- "stats.timers.%(key)s.mean %(mean)s %(timestamp)s\n"
518- "stats.timers.%(key)s.upper %(upper)s %(timestamp)s\n"
519- "stats.timers.%(key)s.upper_%(percent)s %(threshold_upper)s"
520- " %(timestamp)s\n"
521- "stats.timers.%(key)s.lower %(lower)s %(timestamp)s\n"
522- "stats.timers.%(key)s.count %(count)s %(timestamp)s\n")
523-
524-
525-def normalize_key(key):
526- """
527- Normalize a key that might contain spaces, forward-slashes and other
528- special characters into something that is acceptable by graphite.
529- """
530- key = SPACES.sub("_", key)
531- key = SLASHES.sub("-", key)
532- key = NON_ALNUM.sub("", key)
533- return key
534-
535-
536-class MessageProcessor(object):
537-
538- def __init__(self, time_function=time.time):
539- self.time_function = time_function
540- self.timers = {}
541- self.counters = {}
542-
543- def fail(self, message):
544- """Log and discard malformed message."""
545- log.msg("Bad line: %r" % message, logLevel=logging.DEBUG)
546-
547- def process(self, message):
548- """
549- Process a single entry, adding it to either C{counters} or C{timers}
550- depending on which kind of message it is.
551- """
552- if not ":" in message:
553- return self.fail(message)
554-
555- key, data = message.strip().split(":", 1)
556- if not "|" in data:
557- return self.fail(message)
558-
559- fields = data.split("|")
560- if len(fields) < 2 or len(fields) > 3:
561- return self.fail(message)
562-
563- try:
564- value = float(fields[0])
565- except (TypeError, ValueError):
566- return self.fail(message)
567-
568- key = normalize_key(key)
569-
570- if fields[1] == "c":
571- rate = 1
572- if len(fields) == 3:
573- match = RATE.match(fields[2])
574- if match is None:
575- return self.fail(message)
576- rate = match.group(1)
577- if key not in self.counters:
578- self.counters[key] = 0
579- self.counters[key] += value * (1 / float(rate))
580- elif fields[1] == "ms":
581- if key not in self.timers:
582- self.timers[key] = []
583- self.timers[key].append(value)
584- else:
585- return self.fail(message)
586-
587- def flush(self, interval=10000, percent=90):
588- """
589- Flush all queued stats, computing a normalized count based on
590- C{interval} and mean timings based on C{threshold}.
591- """
592- messages = []
593- num_stats = 0
594- interval = interval / 1000
595- timestamp = int(self.time_function())
596-
597- for key, count in self.counters.iteritems():
598- self.counters[key] = 0
599-
600- value = count / interval
601- messages.append(COUNTERS_MESSAGE % {
602- "key": key,
603- "value": value,
604- "count": count,
605- "timestamp": timestamp})
606- num_stats += 1
607-
608- threshold_value = ((100 - percent) / 100.0)
609- for key, timers in self.timers.iteritems():
610- count = len(timers)
611- if count > 0:
612- self.timers[key] = []
613-
614- timers.sort()
615- lower = timers[0]
616- upper = timers[-1]
617- count = len(timers)
618-
619- mean = lower
620- threshold_upper = upper
621-
622- if count > 1:
623- index = count - int(round(threshold_value * count))
624- timers = timers[:index]
625- threshold_upper = timers[-1]
626- mean = sum(timers) / index
627-
628- num_stats += 1
629-
630- messages.append(TIMERS_MESSAGE % {
631- "key": key,
632- "mean": mean,
633- "upper": upper,
634- "percent": percent,
635- "threshold_upper": threshold_upper,
636- "lower": lower,
637- "count": count,
638- "timestamp": timestamp})
639-
640- messages.append("statsd.numStats %s %s" % (num_stats, timestamp))
641- return messages
642
643=== removed file 'txstatsd/protocol.py'
644--- txstatsd/protocol.py 2011-07-13 16:58:42 +0000
645+++ txstatsd/protocol.py 1970-01-01 00:00:00 +0000
646@@ -1,82 +0,0 @@
647-from twisted.internet import task, defer
648-from twisted.protocols.basic import LineOnlyReceiver
649-from twisted.internet.protocol import (
650- DatagramProtocol, ReconnectingClientFactory)
651-
652-
653-class StatsDServerProtocol(DatagramProtocol):
654- """A Twisted-based implementation of the StatsD server.
655-
656- Data is received via UDP for local aggregation and then sent to a Graphite
657- server via TCP.
658- """
659-
660- def __init__(self, processor):
661- self.processor = processor
662-
663- def datagramReceived(self, data, (host, port)):
664- """Process received data and store it locally."""
665- self.processor.process(data)
666-
667-
668-class StatsDClientProtocol(DatagramProtocol):
669- """A Twisted-based implementation of the StatsD client protocol.
670-
671- Data is sent via UDP to a StatsD server for aggregation.
672- """
673-
674- def __init__(self, host, port, meter, interval=None):
675- self.host = host
676- self.port = port
677- self.meter = meter
678- self.interval = interval
679-
680- def startProtocol(self):
681- """Connect to destination host."""
682- self.meter.connect(self.transport, self.host, self.port)
683-
684- def stopProtocol(self):
685- """Connection was lost."""
686- self.meter.disconnect()
687-
688-
689-class GraphiteProtocol(LineOnlyReceiver):
690- """A client protocol for talking to Graphite.
691-
692- Messages to Graphite are line-based and C{\n}-separated.
693- """
694-
695- delimiter = "\n"
696-
697- def __init__(self, processor, interval):
698- self.processor = processor
699- self.interval = interval
700- self.flush_task = task.LoopingCall(self.flushProcessor)
701- self.flush_task.start(self.interval / 1000, False)
702-
703- @defer.inlineCallbacks
704- def flushProcessor(self):
705- """Flush messages queued in the processor to Graphite."""
706- for message in self.processor.flush(interval=self.interval):
707- for line in message.splitlines():
708- if self.connected:
709- self.sendLine(line)
710- yield
711-
712-
713-class GraphiteClientFactory(ReconnectingClientFactory):
714- """A reconnecting Graphite client."""
715-
716- def __init__(self, processor, interval):
717- self.processor = processor
718- self.interval = interval
719-
720- def buildProtocol(self, addr):
721- """
722- Build a new instance of the L{Graphite} protocol, bound to the
723- L{MessageProcessor}.
724- """
725- self.resetDelay()
726- protocol = GraphiteProtocol(self.processor, self.interval)
727- protocol.factory = self
728- return protocol
729
730=== added file 'txstatsd/report.py'
731--- txstatsd/report.py 1970-01-01 00:00:00 +0000
732+++ txstatsd/report.py 2011-07-25 16:11:30 +0000
733@@ -0,0 +1,46 @@
734+from twisted.internet.defer import maybeDeferred
735+from twisted.internet.task import LoopingCall
736+from twisted.python import log
737+
738+from functools import wraps
739+
740+from twisted.application.service import Service
741+
742+
743+class ReportingService(Service):
744+
745+ def __init__(self):
746+ self.tasks = []
747+
748+ def schedule(self, function, interval, report_function):
749+ """
750+ Schedule C{function} to be called every C{interval} seconds and then
751+ report gathered metrics to C{Graphite} using C{report_function}.
752+ """
753+ task = LoopingCall(self.wrapped(function, report_function))
754+ self.tasks.append((task, interval))
755+
756+ def wrapped(self, function, report_function):
757+ def report_metrics(metrics):
758+ """For each metric returned, call C{report_function} with it."""
759+ for name, value in metrics.items():
760+ report_function(name, value)
761+ return metrics
762+
763+ @wraps(function)
764+ def wrapper():
765+ """Wrap C{function} to report metrics or log a failure."""
766+ deferred = maybeDeferred(function)
767+ deferred.addCallback(report_metrics)
768+ deferred.addErrback(lambda failure: log.err(
769+ failure, "Error while processing %s" % function.func_name))
770+ return deferred
771+ return wrapper
772+
773+ def startService(self):
774+ for task, interval in self.tasks:
775+ task.start(interval, now=False)
776+
777+ def stopService(self):
778+ for task, interval in self.tasks:
779+ task.stop()
780
781=== removed file 'txstatsd/report.py'
782--- txstatsd/report.py 2011-07-12 13:54:16 +0000
783+++ txstatsd/report.py 1970-01-01 00:00:00 +0000
784@@ -1,46 +0,0 @@
785-from twisted.internet.defer import maybeDeferred
786-from twisted.internet.task import LoopingCall
787-from twisted.python import log
788-
789-from functools import wraps
790-
791-from twisted.application.service import Service
792-
793-
794-class ReportingService(Service):
795-
796- def __init__(self):
797- self.tasks = []
798-
799- def schedule(self, function, interval, report_function):
800- """
801- Schedule C{function} to be called every C{interval} seconds and then
802- report gathered metrics to C{Graphite} using C{report_function}.
803- """
804- task = LoopingCall(self.wrapped(function, report_function))
805- self.tasks.append((task, interval))
806-
807- def wrapped(self, function, report_function):
808- def report_metrics(metrics):
809- """For each metric returned, call C{report_function} with it."""
810- for name, value in metrics.items():
811- report_function(name, value)
812- return metrics
813-
814- @wraps(function)
815- def wrapper():
816- """Wrap C{function} to report metrics or log a failure."""
817- deferred = maybeDeferred(function)
818- deferred.addCallback(report_metrics)
819- deferred.addErrback(lambda failure: log.err(
820- failure, "Error while processing %s" % function.func_name))
821- return deferred
822- return wrapper
823-
824- def startService(self):
825- for task, interval in self.tasks:
826- task.start(interval, now=False)
827-
828- def stopService(self):
829- for task, interval in self.tasks:
830- task.stop()
831
832=== added directory 'txstatsd/server'
833=== added file 'txstatsd/server/__init__.py'
834=== added file 'txstatsd/server/processor.py'
835--- txstatsd/server/processor.py 1970-01-01 00:00:00 +0000
836+++ txstatsd/server/processor.py 2011-07-25 16:11:30 +0000
837@@ -0,0 +1,213 @@
838+from collections import deque
839+import re
840+import time
841+import logging
842+
843+from twisted.python import log
844+
845+
846+SPACES = re.compile("\s+")
847+SLASHES = re.compile("\/+")
848+NON_ALNUM = re.compile("[^a-zA-Z_\-0-9\.]")
849+RATE = re.compile("^@([\d\.]+)")
850+COUNTERS_MESSAGE = (
851+ "stats.%(key)s %(value)s %(timestamp)s\n"
852+ "stats_counts.%(key)s %(count)s %(timestamp)s\n")
853+TIMERS_MESSAGE = (
854+ "stats.timers.%(key)s.mean %(mean)s %(timestamp)s\n"
855+ "stats.timers.%(key)s.upper %(upper)s %(timestamp)s\n"
856+ "stats.timers.%(key)s.upper_%(percent)s %(threshold_upper)s"
857+ " %(timestamp)s\n"
858+ "stats.timers.%(key)s.lower %(lower)s %(timestamp)s\n"
859+ "stats.timers.%(key)s.count %(count)s %(timestamp)s\n")
860+
861+GAUGE_METRIC_MESSAGE = (
862+ "stats.gauge.%(key)s.value %(value)s %(timestamp)s\n")
863+
864+
865+def normalize_key(key):
866+ """
867+ Normalize a key that might contain spaces, forward-slashes and other
868+ special characters into something that is acceptable by graphite.
869+ """
870+ key = SPACES.sub("_", key)
871+ key = SLASHES.sub("-", key)
872+ key = NON_ALNUM.sub("", key)
873+ return key
874+
875+
class MessageProcessor(object):
    """Aggregate StatsD messages and render them as Graphite reports.

    Messages parsed by C{process} accumulate in C{counter_metrics},
    C{timer_metrics} and C{gauge_metrics} until C{flush} renders them
    as Graphite plaintext messages and resets the accumulators.
    """

    def __init__(self, time_function=time.time):
        # time_function returns the current time in seconds; it is
        # injectable so tests can use a fixed clock.
        self.time_function = time_function
        self.timer_metrics = {}
        self.counter_metrics = {}
        self.gauge_metrics = deque()

    def fail(self, message):
        """Log and discard malformed message."""
        log.msg("Bad line: %r" % message, logLevel=logging.DEBUG)

    def process(self, message):
        """
        Process a single entry, adding it to either C{counters}, C{timers},
        or C{gauge_metrics} depending on which kind of message it is.
        """
        if not ":" in message:
            return self.fail(message)

        key, data = message.strip().split(":", 1)
        if not "|" in data:
            return self.fail(message)

        fields = data.split("|")
        if len(fields) < 2 or len(fields) > 3:
            return self.fail(message)

        key = normalize_key(key)

        # fields[1] discriminates the metric type:
        # "c" counter, "ms" timer, "g" gauge.
        if fields[1] == "c":
            self.process_counter_metric(key, fields, message)
        elif fields[1] == "ms":
            self.process_timer_metric(key, fields[0], message)
        elif fields[1] == "g":
            self.process_gauge_metric(key, fields[0], message)
        else:
            return self.fail(message)

    def process_timer_metric(self, key, value, message):
        """Record a single timing sample (milliseconds) under C{key}."""
        try:
            value = float(value)
        except (TypeError, ValueError):
            return self.fail(message)
        self.timer_metrics.setdefault(key, []).append(value)

    def process_counter_metric(self, key, composite, message):
        """Accumulate a counter increment.

        An optional third field of the form C{@<rate>} gives the sample
        rate; the recorded value is scaled back up accordingly.
        """
        try:
            value = float(composite[0])
        except (TypeError, ValueError):
            return self.fail(message)
        rate = 1
        if len(composite) == 3:
            match = RATE.match(composite[2])
            if match is None:
                return self.fail(message)
            rate = match.group(1)
        if key not in self.counter_metrics:
            self.counter_metrics[key] = 0
        # A rate of 0.1 means only 1 in 10 events was reported, so
        # multiply by the reciprocal to estimate the true count.
        self.counter_metrics[key] += value * (1 / float(rate))

    def process_gauge_metric(self, key, composite, message):
        """Queue a gauge reading as a C{[value, key]} pair."""
        values = composite.split(":")
        if not len(values) == 1:
            return self.fail(message)

        try:
            self.gauge_metrics.append([float(values[0]), key])
        except (TypeError, ValueError):
            self.fail(message)

    def flush(self, interval=10000, percent=90):
        """
        Flush all queued stats, computing a normalized count based on
        C{interval} and mean timings based on C{threshold}.

        @param interval: Reporting interval in milliseconds, used to
            normalize counters to per-second rates.
        @param percent: Percentile used for the timer threshold values.
        @return: A list of Graphite plaintext messages, terminated by
            a C{statsd.numStats} summary line.
        """
        messages = []
        num_stats = 0
        interval = interval / 1000  # milliseconds to seconds
        timestamp = int(self.time_function())

        counter_metrics, events = self.flush_counter_metrics(interval,
                                                             timestamp)
        if events > 0:
            messages.extend(counter_metrics)
            num_stats += events

        timer_metrics, events = self.flush_timer_metrics(percent, timestamp)
        if events > 0:
            messages.extend(timer_metrics)
            num_stats += events

        gauge_metrics, events = self.flush_gauge_metrics(timestamp)
        if events > 0:
            messages.extend(gauge_metrics)
            num_stats += events

        messages.append("statsd.numStats %s %s" % (num_stats, timestamp))
        return messages

    def flush_counter_metrics(self, interval, timestamp):
        """Render all counters (normalized per C{interval}) and reset them."""
        metrics = []
        events = 0
        for key, count in self.counter_metrics.iteritems():
            self.counter_metrics[key] = 0

            value = count / interval
            message = COUNTERS_MESSAGE % {
                "key": key,
                "value": value,
                "count": count,
                "timestamp": timestamp}
            metrics.append(message)
            events += 1

        return (metrics, events)

    def flush_timer_metrics(self, percent, timestamp):
        """Render per-key timer statistics and reset the samples.

        Reports mean, upper, lower, count, and the upper bound within
        the C{percent}th percentile.
        """
        metrics = []
        events = 0

        threshold_value = ((100 - percent) / 100.0)
        for key, timers in self.timer_metrics.iteritems():
            # Note: count was previously recomputed redundantly inside
            # the branch; it cannot change between here and there.
            count = len(timers)
            if count > 0:
                self.timer_metrics[key] = []

                timers.sort()
                lower = timers[0]
                upper = timers[-1]

                mean = lower
                threshold_upper = upper

                if count > 1:
                    # Drop the samples above the requested percentile
                    # before computing threshold_upper and the mean.
                    index = count - int(round(threshold_value * count))
                    timers = timers[:index]
                    threshold_upper = timers[-1]
                    mean = sum(timers) / index

                message = TIMERS_MESSAGE % {
                    "key": key,
                    "mean": mean,
                    "upper": upper,
                    "percent": percent,
                    "threshold_upper": threshold_upper,
                    "lower": lower,
                    "count": count,
                    "timestamp": timestamp}
                metrics.append(message)
                events += 1

        return (metrics, events)

    def flush_gauge_metrics(self, timestamp):
        """Render all queued gauge readings and clear the queue."""
        metrics = []
        events = 0
        # Each queued entry is a [value, key] pair (see
        # process_gauge_metric).
        for value, key in self.gauge_metrics:
            message = GAUGE_METRIC_MESSAGE % {
                "key": key,
                "value": value,
                "timestamp": timestamp}
            metrics.append(message)
            events += 1

        self.gauge_metrics.clear()

        return (metrics, events)
1051
1052=== added file 'txstatsd/server/protocol.py'
1053--- txstatsd/server/protocol.py 1970-01-01 00:00:00 +0000
1054+++ txstatsd/server/protocol.py 2011-07-25 16:11:30 +0000
1055@@ -0,0 +1,61 @@
1056+from twisted.internet import task, defer
1057+from twisted.protocols.basic import LineOnlyReceiver
1058+from twisted.internet.protocol import (
1059+ DatagramProtocol, ReconnectingClientFactory)
1060+
1061+
class StatsDServerProtocol(DatagramProtocol):
    """A Twisted-based implementation of the StatsD server.

    Data is received via UDP for local aggregation and then sent to a Graphite
    server via TCP.
    """

    def __init__(self, processor):
        # The MessageProcessor that accumulates received metric messages.
        self.processor = processor

    def datagramReceived(self, data, (host, port)):
        """Process received data and store it locally."""
        # The sender's (host, port) is unpacked but not used here.
        self.processor.process(data)
1075+
1076+
class GraphiteProtocol(LineOnlyReceiver):
    """A client protocol for talking to Graphite.

    Messages to Graphite are line-based and C{\n}-separated.
    """

    delimiter = "\n"

    def __init__(self, processor, interval):
        # interval is in milliseconds; LoopingCall.start takes seconds,
        # hence the division below.
        self.processor = processor
        self.interval = interval
        self.flush_task = task.LoopingCall(self.flushProcessor)
        self.flush_task.start(self.interval / 1000, False)

    @defer.inlineCallbacks
    def flushProcessor(self):
        """Flush messages queued in the processor to Graphite."""
        for message in self.processor.flush(interval=self.interval):
            for line in message.splitlines():
                if self.connected:
                    self.sendLine(line)
                # Yield control back to the reactor after every line so
                # a large flush does not monopolize the event loop.
                yield
1099+
1100+
class GraphiteClientFactory(ReconnectingClientFactory):
    """A reconnecting Graphite client."""

    def __init__(self, processor, interval):
        # Held so each freshly built protocol shares the same message
        # processor and flush interval (milliseconds).
        self.processor = processor
        self.interval = interval

    def buildProtocol(self, addr):
        """
        Build a new instance of the L{Graphite} protocol, bound to the
        L{MessageProcessor}.
        """
        # A successful connection resets the reconnect backoff delay.
        self.resetDelay()
        graphite = GraphiteProtocol(self.processor, self.interval)
        graphite.factory = self
        return graphite
1117
1118=== added file 'txstatsd/service.py'
1119--- txstatsd/service.py 1970-01-01 00:00:00 +0000
1120+++ txstatsd/service.py 2011-07-25 16:11:30 +0000
1121@@ -0,0 +1,130 @@
1122+import socket
1123+import ConfigParser
1124+
1125+from twisted.application.internet import TCPClient, UDPServer
1126+from twisted.application.service import MultiService
1127+from twisted.internet import reactor
1128+from twisted.python import usage, util
1129+
1130+from txstatsd import process
1131+from txstatsd.client import InternalClient
1132+from txstatsd.metrics.metrics import Metrics
1133+from txstatsd.server.processor import MessageProcessor
1134+from txstatsd.server.protocol import GraphiteClientFactory, StatsDServerProtocol
1135+from txstatsd.report import ReportingService
1136+
1137+_unset = object()
1138+
1139+
class OptionsGlue(usage.Options):
    """
    Extends usage.Options to also read parameters from a config file.
    """

    optParameters = []

    def __init__(self):
        # Declared defaults from glue_parameters, keyed by long name.
        self.glue_defaults = {}
        self._config_file = None
        config = ["config", "c", None, "Config file to use."]

        new_params = []

        def process_parameter(parameter):
            # padTo(5) lets declarations omit trailing fields
            # (default, doc, type).
            long, short, default, doc, paramType = util.padTo(5, parameter)
            self.glue_defaults[long] = default
            # Replace the declared default with the _unset sentinel so
            # __getitem__ can distinguish "not given on the command line"
            # from "given as None".
            new_params.append(
                [long, short, _unset, doc, paramType])

        for parameter in self.glue_parameters:
            if parameter[0] == "config" or parameter[1] == "c":
                raise ValueError("the --config/-c parameter is reserved.")
            process_parameter(parameter)
        process_parameter(config)

        # we need to change self.__class__.optParameters as usage.Options
        # will collect the config from there, not self.optParameters:
        # reflect.accumulateClassList(self.__class__, 'optParameters',
        # parameters)
        self.__class__.optParameters = new_params

        super(OptionsGlue, self).__init__()

    def __getitem__(self, item):
        """Resolve an option value: command line first, then the config
        file's [statsd] section, finally the declared default."""
        result = super(OptionsGlue, self).__getitem__(item)
        if result is not _unset:
            return result

        fname = super(OptionsGlue, self).__getitem__("config")
        if fname is not _unset:
            self._config_file = ConfigParser.RawConfigParser()
            self._config_file.read(fname)

        if self._config_file is not None:
            try:
                result = self._config_file.get("statsd", item)
            except ConfigParser.NoOptionError:
                pass
            else:
                # Config files yield strings; coerce with the option's
                # declared type handler when one exists.
                if item in self._dispatch:
                    result = self._dispatch[item].coerce(result)
                return result

        return self.glue_defaults[item]
1195+
1196+
class StatsDOptions(OptionsGlue):
    """
    The set of configuration settings for txStatsD.

    Short flags must be unique: "p" was previously declared for both
    carbon-cache-port and prefix, making "-p" ambiguous, so prefix is
    now reachable via its long option only.
    """
    glue_parameters = [
        ["carbon-cache-host", "h", "localhost",
         "The host where carbon cache is listening."],
        ["carbon-cache-port", "p", 2003,
         "The port where carbon cache is listening.", int],
        ["listen-port", "l", 8125,
         "The UDP port where we will listen.", int],
        ["flush-interval", "i", 10000,
         "The number of milliseconds between each flush.", int],
        # No short flag: "p" is already taken by carbon-cache-port.
        ["prefix", None, None,
         "Prefix to use when reporting stats.", str],
        ["report", "r", None,
         "Which additional stats to report {process|net|io|system}.", str],
        ]
1215+
1216+
def createService(options):
    """Create a txStatsD service."""

    top_service = MultiService()
    top_service.setName("statsd")

    processor = MessageProcessor()

    # Default the reporting prefix to "<hostname>.statsd".
    prefix = options["prefix"]
    if prefix is None:
        prefix = "%s.statsd" % (socket.gethostname(),)

    metrics = Metrics(InternalClient(processor), namespace=prefix)

    # Optionally schedule periodic reporters for the requested
    # stat groups (process, net, io, system).
    if options["report"] is not None:
        reporting = ReportingService()
        reporting.setServiceParent(top_service)
        reporting.schedule(
            process.report_reactor_stats(reactor), 10, metrics.increment)
        for report_name in options["report"].split(","):
            stats = getattr(
                process, "%s_STATS" % report_name.strip().upper(), ())
            for reporter in stats:
                reporting.schedule(reporter, 10, metrics.increment)

    # Push aggregated metrics to carbon cache over TCP.
    graphite_client = TCPClient(
        options["carbon-cache-host"],
        options["carbon-cache-port"],
        GraphiteClientFactory(processor, options["flush-interval"]))
    graphite_client.setServiceParent(top_service)

    # Accept StatsD metric messages over UDP.
    statsd_listener = UDPServer(
        options["listen-port"], StatsDServerProtocol(processor))
    statsd_listener.setServiceParent(top_service)

    return top_service
1252
1253=== removed file 'txstatsd/service.py'
1254--- txstatsd/service.py 2011-07-12 04:54:49 +0000
1255+++ txstatsd/service.py 1970-01-01 00:00:00 +0000
1256@@ -1,128 +0,0 @@
1257-import socket
1258-import ConfigParser
1259-
1260-from twisted.application.internet import TCPClient, UDPServer
1261-from twisted.application.service import MultiService
1262-from twisted.internet import reactor
1263-from twisted.python import usage, util
1264-
1265-from txstatsd import process
1266-from txstatsd.metrics import InProcessMeter
1267-from txstatsd.processor import MessageProcessor
1268-from txstatsd.protocol import GraphiteClientFactory, StatsDServerProtocol
1269-from txstatsd.report import ReportingService
1270-
1271-_unset = object()
1272-
1273-
1274-class OptionsGlue(usage.Options):
1275- """
1276- Extends usage.Options to also read parameters from a config file.
1277- """
1278-
1279- optParameters = []
1280-
1281- def __init__(self):
1282- self.glue_defaults = {}
1283- self._config_file = None
1284- config = ["config", "c", None, "Config file to use."]
1285-
1286- new_params = []
1287-
1288- def process_parameter(parameter):
1289- long, short, default, doc, paramType = util.padTo(5, parameter)
1290- self.glue_defaults[long] = default
1291- new_params.append(
1292- [long, short, _unset, doc, paramType])
1293-
1294- for parameter in self.glue_parameters:
1295- if parameter[0] == "config" or parameter[1] == "c":
1296- raise ValueError("the --config/-c parameter is reserved.")
1297- process_parameter(parameter)
1298- process_parameter(config)
1299-
1300- # we need to change self.__class__.optParameters as usage.Options
1301- # will collect the config from there, not self.optParameters:
1302- # reflect.accumulateClassList(self.__class__, 'optParameters',
1303- # parameters)
1304- self.__class__.optParameters = new_params
1305-
1306- super(OptionsGlue, self).__init__()
1307-
1308- def __getitem__(self, item):
1309- result = super(OptionsGlue, self).__getitem__(item)
1310- if result is not _unset:
1311- return result
1312-
1313- fname = super(OptionsGlue, self).__getitem__("config")
1314- if fname is not _unset:
1315- self._config_file = ConfigParser.RawConfigParser()
1316- self._config_file.read(fname)
1317-
1318- if self._config_file is not None:
1319- try:
1320- result = self._config_file.get("statsd", item)
1321- except ConfigParser.NoOptionError:
1322- pass
1323- else:
1324- if item in self._dispatch:
1325- result = self._dispatch[item].coerce(result)
1326- return result
1327-
1328- return self.glue_defaults[item]
1329-
1330-
1331-class StatsDOptions(OptionsGlue):
1332- """
1333- The set of configuration settings for txStatsD.
1334- """
1335- glue_parameters = [
1336- ["carbon-cache-host", "h", "localhost",
1337- "The host where carbon cache is listening."],
1338- ["carbon-cache-port", "p", 2003,
1339- "The port where carbon cache is listening.", int],
1340- ["listen-port", "l", 8125,
1341- "The UDP port where we will listen.", int],
1342- ["flush-interval", "i", 10000,
1343- "The number of milliseconds between each flush.", int],
1344- ["prefix", "p", None,
1345- "Prefix to use when reporting stats.", str],
1346- ["report", "r", None,
1347- "Which additional stats to report {process|net|io|system}.", str],
1348- ]
1349-
1350-
1351-def createService(options):
1352- """Create a txStatsD service."""
1353-
1354- service = MultiService()
1355- service.setName("statsd")
1356- processor = MessageProcessor()
1357- prefix = options["prefix"]
1358- if prefix is None:
1359- prefix = socket.gethostname() + ".statsd"
1360-
1361- meter = InProcessMeter(processor, prefix=prefix)
1362-
1363- if options["report"] is not None:
1364- reporting = ReportingService()
1365- reporting.setServiceParent(service)
1366- reporting.schedule(
1367- process.report_reactor_stats(reactor), 10, meter.increment)
1368- reports = [name.strip() for name in options["report"].split(",")]
1369- for report_name in reports:
1370- for reporter in getattr(process, "%s_STATS" %
1371- report_name.upper(), ()):
1372- reporting.schedule(reporter, 10, meter.increment)
1373-
1374- factory = GraphiteClientFactory(processor, options["flush-interval"])
1375- client = TCPClient(options["carbon-cache-host"],
1376- options["carbon-cache-port"],
1377- factory)
1378- client.setServiceParent(service)
1379-
1380- listener = UDPServer(options["listen-port"],
1381- StatsDServerProtocol(processor))
1382- listener.setServiceParent(service)
1383-
1384- return service
1385
1386=== modified file 'txstatsd/tests/test_metrics.py'
1387--- txstatsd/tests/test_metrics.py 2011-06-21 17:14:01 +0000
1388+++ txstatsd/tests/test_metrics.py 2011-07-25 16:11:30 +0000
1389@@ -1,72 +1,60 @@
1390-"""Tests for metrics context manager."""
1391-
1392-from mocker import ANY, MockerTestCase
1393-from txstatsd.metrics import Measure
1394-
1395-
1396-class MeasureTest(MockerTestCase):
1397- """Test case for the L{Measure} context manager."""
1398-
1399- def test_measure(self):
1400- """Basic test."""
1401- operation_name = 'fake_function'
1402- meter = self.mocker.mock()
1403- meter.increment(operation_name)
1404- meter.timing(operation_name, ANY)
1405- meter.increment(operation_name + '.done')
1406- meter.decrement(operation_name)
1407- self.mocker.replay()
1408- result = []
1409-
1410- def fake_function():
1411- """Useles method."""
1412- result.append(0)
1413-
1414- with Measure('test_prefix', 'fake_function', meter):
1415- fake_function()
1416- self.assertEqual([0], result)
1417-
1418- def test_measure_timing(self):
1419- """Test the timing works."""
1420- operation_name = 'fake_function'
1421- MockTime = self.mocker.replace('time.time') # pylint: disable=C0103
1422- self.expect(MockTime()).result(10)
1423- self.expect(MockTime()).result(15)
1424- meter = self.mocker.mock()
1425- meter.increment(operation_name)
1426- meter.timing(operation_name, 5)
1427- meter.increment(operation_name + '.done')
1428- meter.decrement(operation_name)
1429- self.mocker.replay()
1430-
1431- def fake_function():
1432- """Useless method."""
1433-
1434- with Measure('test_prefix', 'fake_function', meter):
1435- fake_function()
1436-
1437- def test_measure_handle_exceptions(self):
1438- """Test exceptions."""
1439-
1440- class TestException(Exception):
1441- """Exception used to test the wrapper."""
1442- pass
1443-
1444- operation_name = 'fake_function'
1445- meter = self.mocker.mock()
1446- meter.increment(operation_name)
1447- meter.increment(operation_name + '.error')
1448- meter.decrement(operation_name)
1449- self.mocker.replay()
1450-
1451- def fake_function():
1452- """Fake Method that raises an exception."""
1453- raise TestException()
1454-
1455- try:
1456- with Measure('test_prefix', 'fake_function', meter):
1457- fake_function()
1458- except TestException:
1459- self.assertTrue(True)
1460- else:
1461- self.fail('measure context manager did not reraise exception.')
1462+"""Tests for the Metrics convenience class."""
1463+
1464+from unittest import TestCase
1465+
1466+from txstatsd.metrics.metrics import Metrics
1467+
1468+
class FakeStatsDClient(object):
    """A stand-in StatsD client that records the last datum written."""

    def connect(self):
        """Connect to the StatsD server (a no-op for tests)."""

    def disconnect(self):
        """Disconnect from the StatsD server (a no-op for tests)."""

    def write(self, data):
        """Remember C{data} as the most recently sent metric message."""
        self.data = data
1483+
class TestMetrics(TestCase):
    """Exercise the Metrics convenience wrapper against a fake client."""

    def setUp(self):
        self.connection = FakeStatsDClient()
        self.metrics = Metrics(self.connection, 'txstatsd.tests')

    def test_gauge(self):
        """A gauge sample is rendered with the '|g' suffix."""
        self.metrics.gauge('gauge', 102)
        self.assertEqual('txstatsd.tests.gauge:102|g',
                         self.connection.data)

    def test_counter(self):
        """Increments render as positive, decrements as negative counts."""
        self.metrics.increment('counter', 18)
        self.assertEqual('txstatsd.tests.counter:18|c',
                         self.connection.data)
        self.metrics.decrement('counter', 9)
        self.assertEqual('txstatsd.tests.counter:-9|c',
                         self.connection.data)

    def test_timing(self):
        """A timing sample is rendered with the '|ms' suffix."""
        self.metrics.timing('timing', 101123)
        self.assertEqual('txstatsd.tests.timing:101123|ms',
                         self.connection.data)

    def test_empty_namespace(self):
        """A None or empty namespace adds no prefix to the metric name."""
        self.metrics.namespace = None
        self.metrics.gauge('gauge', 213)
        self.assertEqual('gauge:213|g',
                         self.connection.data)

        self.metrics.namespace = ''
        self.metrics.gauge('gauge', 413)
        self.assertEqual('gauge:413|g',
                         self.connection.data)
1522
1523=== modified file 'txstatsd/tests/test_processor.py'
1524--- txstatsd/tests/test_processor.py 2011-06-23 14:24:04 +0000
1525+++ txstatsd/tests/test_processor.py 2011-07-25 16:11:30 +0000
1526@@ -1,6 +1,6 @@
1527 from unittest import TestCase
1528
1529-from txstatsd.processor import MessageProcessor
1530+from txstatsd.server.processor import MessageProcessor
1531
1532
1533 class TestMessageProcessor(MessageProcessor):
1534@@ -25,8 +25,8 @@
1535 unit. 'c' is simply used to signal that this is a counter message.
1536 """
1537 self.processor.process("gorets:1|c")
1538- self.assertEqual(1, len(self.processor.counters))
1539- self.assertEqual(1.0, self.processor.counters["gorets"])
1540+ self.assertEqual(1, len(self.processor.counter_metrics))
1541+ self.assertEqual(1.0, self.processor.counter_metrics["gorets"])
1542
1543 def test_receive_counter_rate(self):
1544 """
1545@@ -37,8 +37,8 @@
1546 counter value.
1547 """
1548 self.processor.process("gorets:1|c|@0.1")
1549- self.assertEqual(1, len(self.processor.counters))
1550- self.assertEqual(10.0, self.processor.counters["gorets"])
1551+ self.assertEqual(1, len(self.processor.counter_metrics))
1552+ self.assertEqual(10.0, self.processor.counter_metrics["gorets"])
1553
1554 def test_receive_timer(self):
1555 """
1556@@ -46,16 +46,28 @@
1557 identifier and '320' is the time in milliseconds.
1558 """
1559 self.processor.process("glork:320|ms")
1560- self.assertEqual(1, len(self.processor.timers))
1561- self.assertEqual([320], self.processor.timers["glork"])
1562+ self.assertEqual(1, len(self.processor.timer_metrics))
1563+ self.assertEqual([320], self.processor.timer_metrics["glork"])
1564+
    def test_receive_gauge_metric(self):
        """
        A gauge metric message takes the form:
        '<name>:<count>|g'.
        'g' indicates this is a gauge metric message.
        """
        self.processor.process("gorets:9.6|g")
        # The parsed sample is queued as a [value, key] pair.
        self.assertEqual(1, len(self.processor.gauge_metrics))
        self.assertEqual(
            [9.6, 'gorets'],
            self.processor.gauge_metrics.pop())
1576
1577 def test_receive_message_no_fields(self):
1578 """
1579 If a timer message has no fields, it is logged and discarded.
1580 """
1581 self.processor.process("glork")
1582- self.assertEqual(0, len(self.processor.timers))
1583- self.assertEqual(0, len(self.processor.counters))
1584+ self.assertEqual(0, len(self.processor.timer_metrics))
1585+ self.assertEqual(0, len(self.processor.counter_metrics))
1586 self.assertEqual(["glork"], self.processor.failures)
1587
1588 def test_receive_counter_no_value(self):
1589@@ -63,7 +75,7 @@
1590 If a counter message has no value, it is logged and discarded.
1591 """
1592 self.processor.process("gorets:|c")
1593- self.assertEqual(0, len(self.processor.counters))
1594+ self.assertEqual(0, len(self.processor.counter_metrics))
1595 self.assertEqual(["gorets:|c"], self.processor.failures)
1596
1597 def test_receive_timer_no_value(self):
1598@@ -71,7 +83,7 @@
1599 If a timer message has no value, it is logged and discarded.
1600 """
1601 self.processor.process("glork:|ms")
1602- self.assertEqual(0, len(self.processor.timers))
1603+ self.assertEqual(0, len(self.processor.timer_metrics))
1604 self.assertEqual(["glork:|ms"], self.processor.failures)
1605
1606 def test_receive_not_enough_fields(self):
1607@@ -79,8 +91,8 @@
1608 If a timer message has not enough fields, it is logged and discarded.
1609 """
1610 self.processor.process("glork:1")
1611- self.assertEqual(0, len(self.processor.timers))
1612- self.assertEqual(0, len(self.processor.counters))
1613+ self.assertEqual(0, len(self.processor.timer_metrics))
1614+ self.assertEqual(0, len(self.processor.counter_metrics))
1615 self.assertEqual(["glork:1"], self.processor.failures)
1616
1617 def test_receive_too_many_fields(self):
1618@@ -88,8 +100,8 @@
1619 If a timer message has too many fields, it is logged and discarded.
1620 """
1621 self.processor.process("gorets:1|c|@0.1|yay")
1622- self.assertEqual(0, len(self.processor.timers))
1623- self.assertEqual(0, len(self.processor.counters))
1624+ self.assertEqual(0, len(self.processor.timer_metrics))
1625+ self.assertEqual(0, len(self.processor.counter_metrics))
1626 self.assertEqual(["gorets:1|c|@0.1|yay"], self.processor.failures)
1627
1628
1629@@ -110,28 +122,28 @@
1630 If a counter is present, flushing it will generate a counter message
1631 normalized to the default interval.
1632 """
1633- self.processor.counters["gorets"] = 42
1634+ self.processor.counter_metrics["gorets"] = 42
1635 messages = self.processor.flush()
1636 self.assertEqual(2, len(messages))
1637 counters = messages[0].splitlines()
1638 self.assertEqual("stats.gorets 4 42", counters[0])
1639 self.assertEqual("stats_counts.gorets 42 42", counters[1])
1640 self.assertEqual("statsd.numStats 1 42", messages[1])
1641- self.assertEqual(0, self.processor.counters["gorets"])
1642+ self.assertEqual(0, self.processor.counter_metrics["gorets"])
1643
1644 def test_flush_counter_one_second_interval(self):
1645 """
1646 It is possible to flush counters with a one-second interval, in which
1647 case the counter value will be unchanged.
1648 """
1649- self.processor.counters["gorets"] = 42
1650+ self.processor.counter_metrics["gorets"] = 42
1651 messages = self.processor.flush(interval=1000)
1652 self.assertEqual(2, len(messages))
1653 counters = messages[0].splitlines()
1654 self.assertEqual("stats.gorets 42 42", counters[0])
1655 self.assertEqual("stats_counts.gorets 42 42", counters[1])
1656 self.assertEqual("statsd.numStats 1 42", messages[1])
1657- self.assertEqual(0, self.processor.counters["gorets"])
1658+ self.assertEqual(0, self.processor.counter_metrics["gorets"])
1659
1660 def test_flush_single_timer_single_time(self):
1661 """
1662@@ -139,7 +151,7 @@
1663 threshold_upper, lower, mean will be set to the same value. Timer is
1664 reset after flush is called.
1665 """
1666- self.processor.timers["glork"] = [24]
1667+ self.processor.timer_metrics["glork"] = [24]
1668 messages = self.processor.flush()
1669 self.assertEqual(2, len(messages))
1670 timers = messages[0].splitlines()
1671@@ -149,7 +161,7 @@
1672 self.assertEqual("stats.timers.glork.lower 24 42", timers[3])
1673 self.assertEqual("stats.timers.glork.count 1 42", timers[4])
1674 self.assertEqual("statsd.numStats 1 42", messages[1])
1675- self.assertEqual([], self.processor.timers["glork"])
1676+ self.assertEqual([], self.processor.timer_metrics["glork"])
1677
1678 def test_flush_single_timer_multiple_times(self):
1679 """
1680@@ -160,7 +172,7 @@
1681 - count will be the count of data points
1682 - mean will be the mean value within the 90th percentile
1683 """
1684- self.processor.timers["glork"] = [4, 8, 15, 16, 23, 42]
1685+ self.processor.timer_metrics["glork"] = [4, 8, 15, 16, 23, 42]
1686 messages = self.processor.flush()
1687 self.assertEqual(2, len(messages))
1688 timers = messages[0].splitlines()
1689@@ -170,7 +182,7 @@
1690 self.assertEqual("stats.timers.glork.lower 4 42", timers[3])
1691 self.assertEqual("stats.timers.glork.count 6 42", timers[4])
1692 self.assertEqual("statsd.numStats 1 42", messages[1])
1693- self.assertEqual([], self.processor.timers["glork"])
1694+ self.assertEqual([], self.processor.timer_metrics["glork"])
1695
1696 def test_flush_single_timer_50th_percentile(self):
1697 """
1698@@ -184,7 +196,7 @@
1699 - count will be the count of data points
1700 - mean will be the mean value within the 50th percentile
1701 """
1702- self.processor.timers["glork"] = [4, 8, 15, 16, 23, 42]
1703+ self.processor.timer_metrics["glork"] = [4, 8, 15, 16, 23, 42]
1704 messages = self.processor.flush(percent=50)
1705 self.assertEqual(2, len(messages))
1706 timers = messages[0].splitlines()
1707@@ -194,4 +206,21 @@
1708 self.assertEqual("stats.timers.glork.lower 4 42", timers[3])
1709 self.assertEqual("stats.timers.glork.count 6 42", timers[4])
1710 self.assertEqual("statsd.numStats 1 42", messages[1])
1711- self.assertEqual([], self.processor.timers["glork"])
1712+ self.assertEqual([], self.processor.timer_metrics["glork"])
1713+
    def test_flush_gauge_metric(self):
        """
        Test the correct rendering of the Graphite report for
        a gauge metric.
        """

        self.processor.process("gorets:9.6|g")

        messages = self.processor.flush()
        # One gauge report plus the trailing numStats summary line.
        self.assertEqual(2, len(messages))
        gauge_metric = messages[0].splitlines()
        self.assertEqual(
            "stats.gauge.gorets.value 9.6 42", gauge_metric[0])
        self.assertEqual(
            "statsd.numStats 1 42", messages[1])
        # Gauge readings are cleared once flushed.
        self.assertEqual(0, len(self.processor.gauge_metrics))
1730
1731=== modified file 'txstatsd/version.py'
1732--- txstatsd/version.py 2011-06-20 19:11:22 +0000
1733+++ txstatsd/version.py 2011-07-25 16:11:30 +0000
1734@@ -1,1 +1,1 @@
1735-txstatsd = "0.1.0"
1736+txstatsd = "0.1.1"

Subscribers

People subscribed via source and target branches