Merge lp:~lucio.torre/txstatsd/add-plugins into lp:txstatsd

Proposed by Lucio Torre
Status: Merged
Approved by: Sidnei da Silva
Approved revision: 54
Merged at revision: 51
Proposed branch: lp:~lucio.torre/txstatsd/add-plugins
Merge into: lp:txstatsd
Diff against target: 862 lines (+308/-98)
17 files modified
twisted/plugins/distinct_plugin.py (+20/-0)
txstatsd.conf-example (+3/-0)
txstatsd/itxstatsd.py (+54/-0)
txstatsd/metrics/distinctmetric.py (+26/-22)
txstatsd/metrics/metrics.py (+25/-1)
txstatsd/process.py (+2/-1)
txstatsd/server/configurableprocessor.py (+5/-2)
txstatsd/server/loggingprocessor.py (+10/-3)
txstatsd/server/processor.py (+35/-24)
txstatsd/service.py (+18/-7)
txstatsd/tests/metrics/test_distinct.py (+23/-12)
txstatsd/tests/test_configurableprocessor.py (+16/-2)
txstatsd/tests/test_loggingprocessor.py (+23/-9)
txstatsd/tests/test_metrics.py (+6/-0)
txstatsd/tests/test_processor.py (+29/-14)
txstatsd/tests/test_service.py (+12/-0)
txstatsd/version.py (+1/-1)
To merge this branch: bzr merge lp:~lucio.torre/txstatsd/add-plugins
Reviewer                     Review Type   Date Requested   Status
Sidnei da Silva                                             Approve
Ian Wilkinson (community)                                   Approve
Review via email: mp+82712@code.launchpad.net

Commit message

Add pluggable metric kind support to txstatsd.

Description of the change

Add pluggable metric kind support to txstatsd.

If you add an IMetricFactory provider to a twisted/plugins/ directory of your project, and that directory is on the Python path, the provider will be picked up and made available for reporting (see the probabilistic distinct count for an example).
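
As a rough illustration of what such a provider looks like from the server's point of view, the sketch below mimics the factory shape used in this branch (name, metric_type, build_metric, configure) with plain Python classes. It leaves out zope.interface and the twisted.plugin discovery step on purpose, and CountingMetric, CountingMetricFactory, index_by_type and the "cnt" type are hypothetical names invented for the example, not part of txstatsd.

```python
class CountingMetric(object):
    """Hypothetical metric: counts how many samples it has seen."""

    def __init__(self, name, prefix=""):
        self.name = name
        self.prefix = prefix
        self.seen = 0

    def process(self, fields):
        # fields is the split message payload, e.g. ["17", "cnt"].
        self.seen += 1

    def flush(self, interval, timestamp):
        # One "<key> <value> <timestamp>" line per reported value.
        return "%s.%s.count %s %s\n" % (
            self.prefix, self.name, self.seen, timestamp)


class CountingMetricFactory(object):
    """Hypothetical factory with the attributes IMetricFactory names."""

    name = "counting"
    metric_type = "cnt"

    def build_metric(self, prefix, name, wall_time_func=None):
        return CountingMetric(name, prefix=prefix)

    def configure(self, options):
        pass


def index_by_type(factories):
    """Map each factory's metric_type to the factory itself."""
    return dict((factory.metric_type, factory) for factory in factories)


plugins = index_by_type([CountingMetricFactory()])
metric = plugins["cnt"].build_metric("stats.counting", "gorets")
metric.process(["17", "cnt"])
report = metric.flush(10, 42)
```

In the real branch the list of factories comes from getPlugins(IMetricFactory) rather than being built by hand.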

Ian Wilkinson (theiw) wrote :

+1

Some quick comments:

* version.py needs a bump
* including a plugin_ section in txstatsd.conf-example might be useful
* rather than metric plugins, I would be tempted by processor plugins. This would involve:
- processor.py becoming an abstract processor, instantiated with a registry of metrics
- statsdprocessor.py specialising the abstract processor with its metrics registry populated by the supported StatsD metrics
- pluggableprocessor.py a specialised processor, with a metrics processor and functionality for handling processor plugins
- processorplugin.py replaces the metrics-oriented extensions introduced with the distinct_plugin.py
- twisted/plugins/codahale_plugin.py would then allow the inclusion of the Coda Hale metrics
- the pluggableprocessor.py could allow aggregation of processor plugins, for example, we would be able to support Coda Hale metrics, and logging capability
* itxstatsd.py could be txstatsd/metrics/imetric.py
* introduce txstatsd/config.py with the accumulateClassList and OptionsGlue from service.py. Have an extended OptionsGlue that includes the plugin support
* best stop there!
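
A rough sketch of the processor-plugin layout suggested in the list above, under the assumption that a "registry" is simply a mapping from metric type to a handler. The class names follow the proposed file names, but the bodies, the handler signature and the (metric_type, handler) plugin tuples are hypothetical; nothing like this exists in the branch.

```python
class AbstractProcessor(object):
    """Dispatches "<key>:<value>|<type>" messages through a registry."""

    def __init__(self, registry):
        # registry maps a metric type string to a callable(key, fields).
        self.registry = dict(registry)
        self.failures = []

    def process(self, message):
        if ":" not in message:
            self.failures.append(message)
            return
        key, data = message.split(":", 1)
        fields = data.split("|")
        if len(fields) < 2 or fields[1] not in self.registry:
            self.failures.append(message)
            return
        self.registry[fields[1]](key, fields)


class StatsDProcessor(AbstractProcessor):
    """Registry pre-populated with a (simplified) StatsD counter."""

    def __init__(self):
        self.counters = {}
        super(StatsDProcessor, self).__init__({"c": self.on_counter})

    def on_counter(self, key, fields):
        self.counters[key] = self.counters.get(key, 0) + float(fields[0])


class PluggableProcessor(StatsDProcessor):
    """Adds handlers contributed by processor plugins to the registry."""

    def __init__(self, plugins=()):
        super(PluggableProcessor, self).__init__()
        for metric_type, handler in plugins:
            self.registry[metric_type] = handler


seen = []


def record_distinct(key, fields):
    # Stand-in for a plugin handler: remember the key and the item.
    seen.append((key, fields[0]))


processor = PluggableProcessor(plugins=[("pd", record_distinct)])
processor.process("gorets:1|c")
processor.process("users:pepe|pd")
processor.process("bogus")
```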

review: Approve
Ian Wilkinson (theiw) wrote :

In the earlier comment, the following:

"- pluggableprocessor.py a specialised processor, with a metrics processor and functionality for handling processor plugins"

should have read:

"- pluggableprocessor.py a specialised processor, with a metrics registry and functionality for handling processor plugins"

Sidnei da Silva (sidnei) wrote :

1. Please rename itxstatsd.py to interfaces.py

2. Please fix this docstring:

+ def test_generic(self):
+ """Test the timing operation."""

3. The IMetricFactory attribute is named 'metric_kind_key' but elsewhere in the code 'metric_type' is used. Maybe rename the attribute to 'metric_type'?

4. Should this variable be named 'generic_metric'?

   gauge_metric = GenericMetric

5. I couldn't figure out what GenericMetric is used for. Maybe a refactoring leftover?

6. Please keep at least a line between class and first function:

+class TestPlugin(TestCase):
+ def test_factory(self):

review: Needs Fixing
lp:~lucio.torre/txstatsd/add-plugins updated
52. By Lucio Torre

fixed per review comments

53. By Lucio Torre

unify naming

Lucio Torre (lucio.torre) wrote :

ian,
- done
- done
- too much refactoring there to add features and rewrite everything on one branch.
- "A convention for defining interfaces is do so in a file named like ProjectName/projectname/iprojectname.py" http://twistedmatrix.com/documents/current/core/howto/plugin.html
- left for the big refactoring.

sidnei,
1. "A convention for defining interfaces is do so in a file named like ProjectName/projectname/iprojectname.py" http://twistedmatrix.com/documents/current/core/howto/plugin.html

2. done

3. done (and changed other locations to use metric_type too as discussed on irc)

4. done

5. It is for sending metrics that have no specific interface support on the client side, without adding dependencies on plugins there too.

6. fixed
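
To make point 5 concrete, here is a minimal sketch of the wire format a generic metric produces and how the server side splits it back into fields. The "<name>:<value>|<metric_type>" shape matches the test added in this branch; the helper names format_generic and parse_generic are hypothetical and only serve the illustration.

```python
def format_generic(name, value, metric_type):
    """Build the "<name>:<value>|<metric_type>" payload."""
    return "%s:%s|%s" % (name, value, metric_type)


def parse_generic(message):
    """Split a payload into the metric name and its list of fields."""
    name, data = message.split(":", 1)
    fields = data.split("|")
    return name, fields


message = format_generic("txstatsd.tests.users", "pepe", "pd")
name, fields = parse_generic(message)
```

The client never needs to know what "pd" means; only the server-side plugin registered for that metric_type interprets the fields.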

lp:~lucio.torre/txstatsd/add-plugins updated
54. By Lucio Torre

adding tests for passing arguments on flush
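
The argument passing exercised by that revision can be sketched roughly as follows: the flush interval arrives in milliseconds, is converted to seconds, and is handed to each plugin metric together with the current timestamp. This is a standalone approximation, not the processor code itself; flush_plugin_metrics as a free function and the use of integer division are assumptions made for the example.

```python
class FakeMetric(object):
    """Records the arguments it was flushed with."""

    def flush(self, interval, timestamp):
        self.data = (interval, timestamp)
        return ""


def flush_plugin_metrics(plugin_metrics, interval_ms, time_function):
    """Flush every plugin metric with (interval in seconds, timestamp)."""
    timestamp = int(time_function())
    # Assumption: integer division, so 41000 ms becomes 41 s.
    interval = interval_ms // 1000
    messages = []
    for metric in plugin_metrics.values():
        messages.append(metric.flush(interval, timestamp))
    return messages


plugin_metrics = {"somemetric": FakeMetric()}
messages = flush_plugin_metrics(plugin_metrics, 41000, lambda: 42)
```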

Sidnei da Silva (sidnei) wrote :

Looks good. +1!

review: Approve

Preview Diff

1=== added file 'twisted/plugins/distinct_plugin.py'
2--- twisted/plugins/distinct_plugin.py 1970-01-01 00:00:00 +0000
3+++ twisted/plugins/distinct_plugin.py 2011-11-18 20:11:23 +0000
4@@ -0,0 +1,20 @@
5+from zope.interface import implements
6+
7+from twisted.plugin import IPlugin
8+from txstatsd.itxstatsd import IMetricFactory
9+from txstatsd.metrics.distinctmetric import DistinctMetricReporter
10+
11+class DistinctMetricFactory(object):
12+ implements(IMetricFactory, IPlugin)
13+
14+ name = "pdistinct"
15+ metric_type = "pd"
16+
17+ def build_metric(self, prefix, name, wall_time_func=None):
18+ return DistinctMetricReporter(name, prefix=prefix,
19+ wall_time_func=wall_time_func)
20+
21+ def configure(self, options):
22+ pass
23+
24+distinct_metric_factory = DistinctMetricFactory()
25
26=== modified file 'txstatsd.conf-example'
27--- txstatsd.conf-example 2011-09-02 10:11:11 +0000
28+++ txstatsd.conf-example 2011-11-18 20:11:23 +0000
29@@ -21,3 +21,6 @@
30 # configured monitor-response.
31 monitor-message: txstatsd ping
32 monitor-response: txstatsd pong
33+
34+[plugin_sample]
35+sample-key: sample-value
36\ No newline at end of file
37
38=== added file 'txstatsd/itxstatsd.py'
39--- txstatsd/itxstatsd.py 1970-01-01 00:00:00 +0000
40+++ txstatsd/itxstatsd.py 2011-11-18 20:11:23 +0000
41@@ -0,0 +1,54 @@
42+from zope.interface import Interface, Attribute
43+
44+
45+class IMetricFactory(Interface):
46+ name = Attribute("""
47+ @type name: C{str}
48+ @ivar name: The name of this kind of metric
49+ """)
50+
51+ metric_type = Attribute("""
52+ @type metric_type: C{str}
53+ @ivar metric_type: The string that will be used by clients to send
54+ metrics of this kind to the server.
55+ """)
56+
57+ def build_metric(prefix, name, wall_time_func=None):
58+ """
59+ Returns an object that implements the C{IMetric} interface for name.
60+
61+ @type prefix: C{str}
62+ @param prefix: The prefix used for reporting this metric.
63+ @type name: C{str}
64+ @param name: The name used for reporting this metric.
65+ """
66+
67+ def configure(options):
68+ """
69+ Configures the factory. Will be called at startup by the service
70+ factory.
71+
72+ @type options: C{twisted.python.usage.Options}
73+ @param options: The configuration options.
74+ """
75+
76+
77+class IMetric(Interface):
78+ def process(fields):
79+ """
80+ Process new data for this metric.
81+
82+ @type fields: C{list}
83+ @param fields: The list of message parts. Usually in the form of
84+ (value, metric_type, [sample_rate])
85+ """
86+
87+ def flush(interval, timestamp):
88+ """
89+ Returns a string with new line separated list of metrics to report.
90+
91+ @type interval: C{float}
92+ @param interval: The time since last flush.
93+ @type timestamp: C{float}
94+ @param timestamp: The timestamp for now.
95+ """
96
97=== modified file 'txstatsd/metrics/distinctmetric.py'
98--- txstatsd/metrics/distinctmetric.py 2011-10-26 16:48:42 +0000
99+++ txstatsd/metrics/distinctmetric.py 2011-11-18 20:11:23 +0000
100@@ -13,21 +13,23 @@
101
102 from string import Template
103
104+from zope.interface import implements
105 from txstatsd.metrics.metric import Metric
106-
107-
108+from txstatsd.itxstatsd import IMetric
109+
110+
111 class SBoxHash(object):
112 """A very fast hash.
113-
114+
115 This class create a random hash function that is very fast.
116 Based on SBOXes. Not Crypto Strong.
117-
118+
119 Two instances of this class will hash differently.
120 """
121-
122+
123 def __init__(self):
124 self.table = [random.randint(0, 0xFFFFFFFF - 1) for i in range(256)]
125-
126+
127 def hash(self, data):
128 value = 0
129 for c in data:
130@@ -35,14 +37,14 @@
131 value = value * 3
132 value = value & 0xFFFFFFFF
133 return value
134-
135-
136+
137+
138 def hash(data):
139 """Hash data using a random hasher."""
140 p = SBoxHash()
141 return p.hash(data)
142-
143-
144+
145+
146 def zeros(n):
147 """Count the zeros to the right of the binary representation of n."""
148 count = 0
149@@ -60,19 +62,19 @@
150
151 class SlidingDistinctCounter(object):
152 """A probabilistic distinct counter with sliding windows."""
153-
154+
155 def __init__(self, n_hashes, n_buckets):
156 self.n_hashes = n_hashes
157 self.n_buckets = n_buckets
158-
159+
160 self.hashes = [SBoxHash() for i in range(n_hashes)]
161 self.buckets = [[0] * n_buckets for i in range(n_hashes)]
162-
163+
164 def add(self, when, item):
165 hashes = (h.hash(item) for h in self.hashes)
166 for i, value in enumerate(hashes):
167 self.buckets[i][min(self.n_buckets - 1, zeros(value))] = when
168-
169+
170 def distinct(self, since=0):
171 total = 0.0
172 for i in range(self.n_hashes):
173@@ -84,7 +86,7 @@
174 total += least0
175 v = total / self.n_hashes
176 return int((2 ** v) / 0.77351)
177-
178+
179
180 class DistinctMetric(Metric):
181 """
182@@ -102,13 +104,13 @@
183 Keeps an estimate of the distinct numbers of items seen on various
184 sliding windows of time.
185 """
186+ implements(IMetric)
187
188 MESSAGE = (
189 "$prefix%(key)s.count_1min %(count_1min)s %(timestamp)s\n"
190 "$prefix%(key)s.count_1hour %(count_1hour)s %(timestamp)s\n"
191 "$prefix%(key)s.count_1day %(count_1day)s %(timestamp)s\n"
192- "$prefix%(key)s.count %(count)s %(timestamp)s\n"
193- )
194+ "$prefix%(key)s.count %(count)s %(timestamp)s\n")
195
196 def __init__(self, name, wall_time_func=time.time, prefix=""):
197 """Construct a metric we expect to be periodically updated.
198@@ -131,18 +133,20 @@
199
200 def count_1min(self, now):
201 return self.counter.distinct(now - 60)
202-
203+
204 def count_1hour(self, now):
205 return self.counter.distinct(now - 60 * 60)
206-
207+
208 def count_1day(self, now):
209 return self.counter.distinct(now - 60 * 60 * 24)
210
211+ def process(self, fields):
212+ self.update(fields[0])
213+
214 def update(self, item):
215- """Adds a seen item."""
216 self.counter.add(self.wall_time_func(), item)
217-
218- def report(self, timestamp):
219+
220+ def flush(self, interval, timestamp):
221 now = self.wall_time_func()
222 return self.message % {
223 "key": self.name,
224
225=== modified file 'txstatsd/metrics/metrics.py'
226--- txstatsd/metrics/metrics.py 2011-10-25 20:22:10 +0000
227+++ txstatsd/metrics/metrics.py 2011-11-18 20:11:23 +0000
228@@ -5,6 +5,16 @@
229 from txstatsd.metrics.metric import Metric
230
231
232+class GenericMetric(Metric):
233+ def __init__(self, connection, key, name, sample_rate=1):
234+ super(GenericMetric, self).__init__(connection, name,
235+ sample_rate=sample_rate)
236+ self.key = key
237+
238+ def mark(self, value):
239+ self.send("%s|%s" % (value, self.key))
240+
241+
242 class Metrics(object):
243 def __init__(self, connection=None, namespace=""):
244 """A convenience class for reporting metric samples
245@@ -20,6 +30,20 @@
246 self.namespace = namespace
247 self._metrics = {}
248
249+ def report(self, name, value, metric_type, sample_rate=1):
250+ """Report a generic metric.
251+
252+ Used for server side plugins without client support.
253+ """
254+ name = self.fully_qualify_name(name)
255+ if not name in self._metrics:
256+ metric = GenericMetric(self.connection,
257+ metric_type,
258+ name,
259+ sample_rate)
260+ self._metrics[name] = metric
261+ self._metrics[name].mark(value)
262+
263 def gauge(self, name, value, sample_rate=1):
264 """Report an instantaneous reading of a particular value."""
265 name = self.fully_qualify_name(name)
266@@ -76,7 +100,7 @@
267 metric = DistinctMetric(self.connection, name)
268 self._metrics[name] = metric
269 self._metrics[name].mark(item)
270-
271+
272 def clear(self, name):
273 """Allow the metric to re-initialize its internal state."""
274 name = self.fully_qualify_name(name)
275
276=== modified file 'txstatsd/process.py'
277--- txstatsd/process.py 2011-08-08 09:28:45 +0000
278+++ txstatsd/process.py 2011-11-18 20:11:23 +0000
279@@ -114,7 +114,8 @@
280 result = {prefix + ".cpu.percent": self.process.get_cpu_percent(),
281 prefix + ".cpu.user": utime,
282 prefix + ".cpu.system": stime,
283- prefix + ".memory.percent": self.process.get_memory_percent(),
284+ prefix + ".memory.percent":
285+ self.process.get_memory_percent(),
286 prefix + ".memory.vsize": vsize,
287 prefix + ".memory.rss": rss}
288 if getattr(self.process, "get_num_threads", None) is not None:
289
290=== modified file 'txstatsd/server/configurableprocessor.py'
291--- txstatsd/server/configurableprocessor.py 2011-10-25 20:22:10 +0000
292+++ txstatsd/server/configurableprocessor.py 2011-11-18 20:11:23 +0000
293@@ -24,9 +24,9 @@
294
295 METRICS_SUMMARY = "statsd.numStats %s %s\n"
296
297- def __init__(self, time_function=time.time, message_prefix=""):
298+ def __init__(self, time_function=time.time, message_prefix="", plugins=None):
299 super(ConfigurableMessageProcessor, self).__init__(
300- time_function=time_function)
301+ time_function=time_function, plugins=plugins)
302
303 if message_prefix:
304 self.metrics_summary = message_prefix + '.' + \
305@@ -38,6 +38,9 @@
306 self.message_prefix = message_prefix
307 self.gauge_metrics = {}
308
309+ def get_message_prefix(self, kind):
310+ return self.message_prefix
311+
312 def compose_timer_metric(self, key, duration):
313 if not key in self.timer_metrics:
314 metric = TimerMetricReporter(key, prefix=self.message_prefix)
315
316=== modified file 'txstatsd/server/loggingprocessor.py'
317--- txstatsd/server/loggingprocessor.py 2011-10-04 16:22:19 +0000
318+++ txstatsd/server/loggingprocessor.py 2011-11-18 20:11:23 +0000
319@@ -11,18 +11,20 @@
320 attribute.)
321 """
322
323- def __init__(self, logger, time_function=time.time, message_prefix=""):
324+ def __init__(self, logger, time_function=time.time, message_prefix="", plugins=None):
325 super(LoggingMessageProcessor, self).__init__(
326- time_function=time_function, message_prefix=message_prefix)
327+ time_function=time_function, message_prefix=message_prefix,
328+ plugins=plugins)
329
330 logger_info = getattr(logger, 'info', None)
331 if logger_info is None or not callable(logger_info):
332 raise TypeError()
333 self.logger = logger
334
335- def flush(self):
336+ def flush(self, interval=10000, percent=90):
337 """Log all received metric samples to the supplied log file."""
338 timestamp = int(self.time_function())
339+ interval = interval / 1000
340
341 def log_metrics(metrics):
342 for metric in metrics.itervalues():
343@@ -34,4 +36,9 @@
344 log_metrics(self.gauge_metrics)
345 log_metrics(self.meter_metrics)
346 log_metrics(self.timer_metrics)
347+
348+ for metric in self.plugin_metrics.itervalues():
349+ report = metric.flush(interval, timestamp)
350+ for measurement in report.splitlines():
351+ self.logger.info(measurement)
352
353
354=== modified file 'txstatsd/server/processor.py'
355--- txstatsd/server/processor.py 2011-10-26 16:48:42 +0000
356+++ txstatsd/server/processor.py 2011-11-18 20:11:23 +0000
357@@ -50,7 +50,7 @@
358 GAUGE_METRIC_MESSAGE = (
359 "stats.gauge.%(key)s.value %(value)s %(timestamp)s\n")
360
361- def __init__(self, time_function=time.time):
362+ def __init__(self, time_function=time.time, plugins=None):
363 self.time_function = time_function
364
365 self.counters_message = MessageProcessor.COUNTERS_MESSAGE
366@@ -63,6 +63,13 @@
367 self.meter_metrics = {}
368 self.distinct_metrics = {}
369
370+ self.plugins = {}
371+ self.plugin_metrics = {}
372+
373+ if plugins is not None:
374+ for plugin in plugins:
375+ self.plugins[plugin.metric_type] = plugin
376+
377 def fail(self, message):
378 """Log and discard malformed message."""
379 log.msg("Bad line: %r" % message, logLevel=logging.DEBUG)
380@@ -84,30 +91,33 @@
381 return self.fail(message)
382
383 key = normalize_key(key)
384+ metric_type = fields[1]
385
386- if fields[1] == "c":
387+ if metric_type == "c":
388 self.process_counter_metric(key, fields, message)
389- elif fields[1] == "ms":
390+ elif metric_type == "ms":
391 self.process_timer_metric(key, fields[0], message)
392- elif fields[1] == "g":
393+ elif metric_type == "g":
394 self.process_gauge_metric(key, fields[0], message)
395- elif fields[1] == "m":
396+ elif metric_type == "m":
397 self.process_meter_metric(key, fields[0], message)
398- elif fields[1] == "d":
399- self.process_distinct_metric(key, fields[0], message)
400+ elif metric_type in self.plugins:
401+ self.process_plugin_metric(metric_type, key, fields, message)
402 else:
403 return self.fail(message)
404
405- def process_distinct_metric(self, key, item, message):
406- self.compose_distinct_metric(key, str(item))
407-
408- def compose_distinct_metric(self, key, item):
409- if not key in self.distinct_metrics:
410- metric = DistinctMetricReporter(key, self.time_function,
411- prefix="stats.distinct")
412- self.distinct_metrics[key] = metric
413- self.distinct_metrics[key].update(item)
414-
415+ def get_message_prefix(self, kind):
416+ return "stats." + kind
417+
418+ def process_plugin_metric(self, metric_type, key, items, message):
419+ if not key in self.plugin_metrics:
420+ factory = self.plugins[metric_type]
421+ metric = factory.build_metric(
422+ self.get_message_prefix(factory.name),
423+ name=key, wall_time_func=self.time_function)
424+ self.plugin_metrics[key] = metric
425+ self.plugin_metrics[key].process(items)
426+
427 def process_timer_metric(self, key, duration, message):
428 try:
429 duration = float(duration)
430@@ -206,11 +216,11 @@
431 messages.extend(meter_metrics)
432 num_stats += events
433
434- distinct_metrics, events = self.flush_distinct_metrics(timestamp)
435+ plugin_metrics, events = self.flush_plugin_metrics(interval, timestamp)
436 if events > 0:
437- messages.extend(distinct_metrics)
438+ messages.extend(plugin_metrics)
439 num_stats += events
440-
441+
442 self.flush_metrics_summary(messages, num_stats, timestamp)
443 return messages
444
445@@ -297,16 +307,17 @@
446
447 return (metrics, events)
448
449- def flush_distinct_metrics(self, timestamp):
450+ def flush_plugin_metrics(self, interval, timestamp):
451 metrics = []
452 events = 0
453- for metric in self.distinct_metrics.itervalues():
454- message = metric.report(timestamp)
455+
456+ for metric in self.plugin_metrics.itervalues():
457+ message = metric.flush(interval, timestamp)
458 metrics.append(message)
459 events += 1
460
461 return (metrics, events)
462-
463+
464 def flush_metrics_summary(self, messages, num_stats, timestamp):
465 messages.append("statsd.numStats %s %s\n" % (num_stats, timestamp))
466
467
468=== modified file 'txstatsd/service.py'
469--- txstatsd/service.py 2011-10-25 14:33:14 +0000
470+++ txstatsd/service.py 2011-11-18 20:11:23 +0000
471@@ -7,6 +7,7 @@
472 from twisted.application.internet import TCPClient, UDPServer
473 from twisted.application.service import MultiService
474 from twisted.python import usage
475+from twisted.plugin import getPlugins
476
477 from txstatsd.client import InternalClient
478 from txstatsd.metrics.metrics import Metrics
479@@ -15,11 +16,12 @@
480 from txstatsd.server.protocol import (
481 GraphiteClientFactory, StatsDServerProtocol)
482 from txstatsd.report import ReportingService
483+from txstatsd.itxstatsd import IMetricFactory
484
485
486 def accumulateClassList(classObj, attr, listObj,
487 baseClass=None, excludeClass=None):
488- """Accumulate all attributes of a given name in a class hierarchy
489+ """Accumulate all attributes of a given name in a class hierarchy
490 into a single list.
491
492 Assuming all class attributes of this name are lists.
493@@ -35,8 +37,7 @@
494 """Extends usage.Options to also read parameters from a config file."""
495
496 optParameters = [
497- ["config", "c", None, "Config file to use."]
498- ]
499+ ["config", "c", None, "Config file to use."]]
500
501 def __init__(self):
502 parameters = []
503@@ -99,6 +100,10 @@
504 value = self._dispatch[name].coerce(value)
505 self[name] = value
506
507+ for section in config_file.sections():
508+ if section.startswith("plugin_"):
509+ self[section] = config_file.items(section)
510+
511
512 class StatsDOptions(OptionsGlue):
513 """
514@@ -122,8 +127,7 @@
515 ["monitor-response", "o", "txstatsd pong",
516 "Response we should send monitoring agent.", str],
517 ["statsd-compliance", "s", 1,
518- "Produce StatsD-compliant messages.", int]
519- ]
520+ "Produce StatsD-compliant messages.", int]]
521
522 def __init__(self):
523 self.config_section = 'statsd'
524@@ -140,12 +144,19 @@
525 if prefix is None:
526 prefix = socket.gethostname() + ".statsd"
527
528+ # initialize plugins
529+ plugin_metrics = []
530+ for plugin in getPlugins(IMetricFactory):
531+ plugin.configure(options)
532+ plugin_metrics.append(plugin)
533+
534 if options["statsd-compliance"]:
535- processor = MessageProcessor()
536+ processor = MessageProcessor(plugins=plugin_metrics)
537 connection = InternalClient(processor)
538 metrics = Metrics(connection, namespace=prefix)
539 else:
540- processor = ConfigurableMessageProcessor(message_prefix=prefix)
541+ processor = ConfigurableMessageProcessor(message_prefix=prefix,
542+ plugins=plugin_metrics)
543 connection = InternalClient(processor)
544 metrics = Metrics(connection)
545
546
547=== modified file 'txstatsd/tests/metrics/test_distinct.py'
548--- txstatsd/tests/metrics/test_distinct.py 2011-10-26 03:26:21 +0000
549+++ txstatsd/tests/metrics/test_distinct.py 2011-11-18 20:11:23 +0000
550@@ -5,11 +5,13 @@
551 from scipy.stats import chi2
552
553 from twisted.trial.unittest import TestCase
554-
555+from twisted.plugin import getPlugins
556+from twisted.plugins import distinct_plugin
557 import txstatsd.metrics.distinctmetric as distinct
558-
559-
560-class TestHash(TestCase):
561+from txstatsd.itxstatsd import IMetricFactory
562+
563+
564+class TestHash(TestCase):
565 def test_hash_chars(self):
566 "For one table, all chars map to different chars"
567 results = set()
568@@ -18,10 +20,10 @@
569 h = distinct.hash(chr(c))
570 results.add(h)
571 self.assertEquals(len(results), 256)
572-
573+
574 def test_chi_square(self):
575 N = 10000
576-
577+
578 for (bits, buckets) in [(-1, 1024), (24, 256),
579 (16, 256), (8, 256), (0, 256)]:
580 bins = [0] * buckets
581@@ -48,26 +50,27 @@
582 self.assertEquals(distinct.zeros(5), 0)
583 self.assertEquals(distinct.zeros(8), 3)
584 self.assertEquals(distinct.zeros(9), 0)
585-
586+
587+
588 class TestDistinct(TestCase):
589 def test_all(self):
590 random.seed(1)
591-
592+
593 for r in [1000, 10000]:
594 cd = distinct.SlidingDistinctCounter(32, 32)
595 for i in range(r):
596 cd.add(1, str(i))
597 error = abs(cd.distinct() - r)
598 self.assertTrue(error < 0.15 * r)
599-
600-
601+
602+
603 class TestDistinctMetricReporter(TestCase):
604 def test_reports(self):
605 random.seed(1)
606 _wall_time = [0]
607 def _time():
608 return _wall_time[0]
609-
610+
611 dmr = distinct.DistinctMetricReporter("test", wall_time_func=_time)
612 for i in range(3000):
613 _wall_time[0] = i * 50
614@@ -77,4 +80,12 @@
615 self.assertTrue(abs(dmr.count_1min(now) - 1) < 2)
616 self.assertTrue(abs(dmr.count_1hour(now) - 72) < 15)
617 self.assertTrue(abs(dmr.count_1day(now) - 1728) < 500)
618- self.assertTrue("count_1hour" in dmr.report(now))
619+ self.assertTrue("count_1hour" in dmr.flush(1, now))
620+
621+
622+class TestPlugin(TestCase):
623+
624+ def test_factory(self):
625+ self.assertTrue(distinct_plugin.distinct_metric_factory in \
626+ list(getPlugins(IMetricFactory)))
627+
628
629=== modified file 'txstatsd/tests/test_configurableprocessor.py'
630--- txstatsd/tests/test_configurableprocessor.py 2011-10-10 16:08:12 +0000
631+++ txstatsd/tests/test_configurableprocessor.py 2011-11-18 20:11:23 +0000
632@@ -2,9 +2,10 @@
633
634 from unittest import TestCase
635
636+from twisted.plugins.distinct_plugin import distinct_metric_factory
637+
638 from txstatsd.server.configurableprocessor import ConfigurableMessageProcessor
639
640-
641 class FlushMessagesTest(TestCase):
642
643 def test_flush_counter_with_empty_prefix(self):
644@@ -36,7 +37,20 @@
645 self.assertEqual("test.metric.gorets.count 17 42", counters[0])
646 self.assertEqual("test.metric.statsd.numStats 1 42",
647 messages[1].splitlines()[0])
648-
649+
650+ def test_flush_plugin(self):
651+ """
652+ Ensure the prefix features if one is supplied.
653+ """
654+ configurable_processor = ConfigurableMessageProcessor(
655+ time_function=lambda: 42, message_prefix="test.metric",
656+ plugins=[distinct_metric_factory])
657+ configurable_processor.process("gorets:17|pd")
658+ messages = configurable_processor.flush()
659+ self.assertEqual(2, len(messages))
660+ self.assertTrue("test.metric.gorets" in messages[0])
661+
662+
663 def test_flush_single_timer_single_time(self):
664 """
665 If a single timer with a single data point is present, all
666
667=== modified file 'txstatsd/tests/test_loggingprocessor.py'
668--- txstatsd/tests/test_loggingprocessor.py 2011-10-05 15:55:01 +0000
669+++ txstatsd/tests/test_loggingprocessor.py 2011-11-18 20:11:23 +0000
670@@ -1,6 +1,8 @@
671
672 from unittest import TestCase
673
674+from twisted.plugins.distinct_plugin import distinct_metric_factory
675+
676 from txstatsd.server.loggingprocessor import LoggingMessageProcessor
677
678
679@@ -8,6 +10,13 @@
680 def report(self, *args):
681 return 'Sample report'
682
683+class TestLogger(object):
684+ def __init__(self):
685+ self.log = ''
686+
687+ def info(self, measurement):
688+ self.log += measurement + "\n"
689+
690
691 class TestLoggingMessageProcessor(TestCase):
692
693@@ -30,16 +39,21 @@
694 self.assertRaises(TypeError, invoker)
695
696 def test_logger(self):
697- class Logger(object):
698- def __init__(self):
699- self.log = ''
700-
701- def info(self, measurement):
702- self.log += measurement
703-
704- logger = Logger()
705+ logger = TestLogger()
706 processor = LoggingMessageProcessor(logger)
707 metric = FakeMeterMetric()
708 processor.meter_metrics['test'] = metric
709 processor.flush()
710- self.assertEqual(metric.report(), logger.log)
711+ self.assertEqual(metric.report() + "\n", logger.log)
712+
713+ def test_logger_plugin(self):
714+ logger = TestLogger()
715+ processor = LoggingMessageProcessor(
716+ logger, plugins=[distinct_metric_factory],
717+ time_function=lambda: 42)
718+ processor.process("gorets:17|pd")
719+ processor.flush()
720+ self.assertEqual(processor.plugin_metrics['gorets'].flush(
721+ 10, processor.time_function()),
722+ logger.log)
723+
724
725=== modified file 'txstatsd/tests/test_metrics.py'
726--- txstatsd/tests/test_metrics.py 2011-08-26 12:16:45 +0000
727+++ txstatsd/tests/test_metrics.py 2011-11-18 20:11:23 +0000
728@@ -53,6 +53,12 @@
729 self.assertEqual(self.connection.data,
730 'txstatsd.tests.timing:101123|ms')
731
732+ def test_generic(self):
733+ """Test the GenericMetric class."""
734+ self.metrics.report('users', "pepe", "pd")
735+ self.assertEqual(self.connection.data,
736+ 'txstatsd.tests.users:pepe|pd')
737+
738 def test_empty_namespace(self):
739 """Test reporting of an empty namespace."""
740 self.metrics.namespace = None
741
742=== modified file 'txstatsd/tests/test_processor.py'
743--- txstatsd/tests/test_processor.py 2011-10-26 16:48:42 +0000
744+++ txstatsd/tests/test_processor.py 2011-11-18 20:11:23 +0000
745@@ -1,14 +1,16 @@
746 import time
747
748 from unittest import TestCase
749+from twisted.plugin import getPlugins
750
751 from txstatsd.server.processor import MessageProcessor
752-
753+from txstatsd.itxstatsd import IMetricFactory
754
755 class TestMessageProcessor(MessageProcessor):
756
757 def __init__(self):
758- super(TestMessageProcessor, self).__init__()
759+ super(TestMessageProcessor, self).__init__(
760+ plugins=getPlugins(IMetricFactory))
761 self.failures = []
762
763 def fail(self, message):
764@@ -66,12 +68,12 @@
765 def test_receive_distinct_metric(self):
766 """
767 A distinct metric message takes the form:
768- '<name>:<item>|d'.
769- 'distinct' indicates this is a distinct metric message.
770+ '<name>:<item>|pd'.
771+ 'pd' indicates this is a probabilistic distinct metric message.
772 """
773- self.processor.process("gorets:one|d")
774- self.assertEqual(1, len(self.processor.distinct_metrics))
775- self.assertTrue(self.processor.distinct_metrics["gorets"].count() > 0)
776+ self.processor.process("gorets:one|pd")
777+ self.assertEqual(1, len(self.processor.plugin_metrics))
778+ self.assertTrue(self.processor.plugin_metrics["gorets"].count() > 0)
779
780 def test_receive_message_no_fields(self):
781 """
782@@ -120,7 +122,8 @@
783 class FlushMessagesTest(TestCase):
784
785 def setUp(self):
786- self.processor = MessageProcessor(time_function=lambda: 42)
787+ self.processor = MessageProcessor(time_function=lambda: 42,
788+ plugins=getPlugins(IMetricFactory))
789
790 def test_flush_no_stats(self):
791 """
792@@ -243,15 +246,27 @@
793 a distinct metric.
794 """
795
796- self.processor.process("gorets:item|d")
797-
798+ self.processor.process("gorets:item|pd")
799+
800 messages = self.processor.flush()
801 self.assertEqual(2, len(messages))
802 metrics = messages[0]
803- self.assertTrue("stats.distinct.gorets.count " in metrics)
804- self.assertTrue("stats.distinct.gorets.count_1hour" in metrics)
805- self.assertTrue("stats.distinct.gorets.count_1min" in metrics)
806- self.assertTrue("stats.distinct.gorets.count_1day" in metrics)
807+ self.assertTrue("stats.pdistinct.gorets.count " in metrics)
808+ self.assertTrue("stats.pdistinct.gorets.count_1hour" in metrics)
809+ self.assertTrue("stats.pdistinct.gorets.count_1min" in metrics)
810+ self.assertTrue("stats.pdistinct.gorets.count_1day" in metrics)
811+
812+ def test_flush_plugin_arguments(self):
813+ """Test the passing of arguments for flush."""
814+
815+ class FakeMetric(object):
816+ def flush(self, interval, timestamp):
817+ self.data = interval, timestamp
818+
819+ self.processor.plugin_metrics["somemetric"] = FakeMetric()
820+ self.processor.flush(41000)
821+ self.assertEquals((41,42),
822+ self.processor.plugin_metrics["somemetric"].data)
823
824
825 class FlushMeterMetricMessagesTest(TestCase):
826
827=== modified file 'txstatsd/tests/test_service.py'
828--- txstatsd/tests/test_service.py 2011-09-29 08:50:43 +0000
829+++ txstatsd/tests/test_service.py 2011-11-18 20:11:23 +0000
830@@ -2,6 +2,7 @@
831 import ConfigParser
832 import tempfile
833 from unittest import TestCase
834+from StringIO import StringIO
835
836 from twisted.internet.defer import inlineCallbacks, Deferred
837 from twisted.internet.protocol import DatagramProtocol
838@@ -104,6 +105,17 @@
839 f, o = self.get_file_parser([["number", "n", 5, "help", int]])
840 o.parseOptions(["--config", f.name])
841 self.assertEquals(5, o["number"])
842+
843+ def test_support_plugin_sections(self):
844+ class TestOptions(service.OptionsGlue):
845+ optParameters = [["test", "t", "default", "help"]]
846+ config_section = "statsd"
847+
848+ o = TestOptions()
849+ config_file = ConfigParser.RawConfigParser()
850+ config_file.readfp(StringIO("[statsd]\n\n[plugin_test]\nfoo = bar\n"))
851+ o.configure(config_file)
852+ self.assertEquals(o["plugin_test"], config_file.items("plugin_test"))
853
854
855 class Agent(DatagramProtocol):
856
857=== modified file 'txstatsd/version.py'
858--- txstatsd/version.py 2011-11-02 10:50:24 +0000
859+++ txstatsd/version.py 2011-11-18 20:11:23 +0000
860@@ -1,1 +1,1 @@
861-txstatsd = "0.6.4"
862+txstatsd = "0.7.0"
