Merge lp:~lucio.torre/txstatsd/add-plugins into lp:txstatsd
Status: | Merged |
---|---|
Approved by: | Sidnei da Silva |
Approved revision: | 54 |
Merged at revision: | 51 |
Proposed branch: | lp:~lucio.torre/txstatsd/add-plugins |
Merge into: | lp:txstatsd |
Diff against target: | 862 lines (+308/-98), 17 files modified: twisted/plugins/distinct_plugin.py (+20/-0); txstatsd.conf-example (+3/-0); txstatsd/itxstatsd.py (+54/-0); txstatsd/metrics/distinctmetric.py (+26/-22); txstatsd/metrics/metrics.py (+25/-1); txstatsd/process.py (+2/-1); txstatsd/server/configurableprocessor.py (+5/-2); txstatsd/server/loggingprocessor.py (+10/-3); txstatsd/server/processor.py (+35/-24); txstatsd/service.py (+18/-7); txstatsd/tests/metrics/test_distinct.py (+23/-12); txstatsd/tests/test_configurableprocessor.py (+16/-2); txstatsd/tests/test_loggingprocessor.py (+23/-9); txstatsd/tests/test_metrics.py (+6/-0); txstatsd/tests/test_processor.py (+29/-14); txstatsd/tests/test_service.py (+12/-0); txstatsd/version.py (+1/-1) |
To merge this branch: | bzr merge lp:~lucio.torre/txstatsd/add-plugins |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Sidnei da Silva | Approve | ||
Ian Wilkinson (community) | Approve | ||
Review via email: mp+82712@code.launchpad.net |
Commit message
Add pluggable metric kind support to txstatsd.
Description of the change
Add pluggable metric kind support to txstatsd.
If your project provides an IMetricFactory implementation in a twisted/plugins/ directory that is on the Python path, txstatsd picks it up and makes it available for reporting (see the probabilistic distinct count in twisted/plugins/distinct_plugin.py for an example).
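As a rough illustration of the plugin mechanism, the sketch below mirrors the shape of a metric factory and its metric without importing Twisted or txstatsd, so it runs on its own. The attribute and method names (name, metric_type, build_metric, configure, process, flush) follow the IMetricFactory and IMetric interfaces in this branch; CountingMetric, CountingMetricFactory, the "cnt" type string and the report line format are made up for the example. In a real plugin the factory class would also declare implements(IMetricFactory, IPlugin), and a module-level instance would live in a twisted/plugins/ module, as distinct_plugin.py does.

```python
import time


class CountingMetric(object):
    """Hypothetical metric: counts how many samples were processed."""

    def __init__(self, name, prefix="", wall_time_func=time.time):
        self.name = name
        self.prefix = prefix
        self.wall_time_func = wall_time_func
        self.count = 0

    def process(self, fields):
        # fields is the split message body, e.g. ["17", "cnt"].
        self.count += 1

    def flush(self, interval, timestamp):
        # Return newline-separated report lines, as IMetric.flush describes.
        return "%s.%s.count %s %s\n" % (
            self.prefix, self.name, self.count, timestamp)


class CountingMetricFactory(object):
    # A real plugin would also declare:
    #   implements(IMetricFactory, IPlugin)
    name = "counting"     # used to build the reporting prefix
    metric_type = "cnt"   # hypothetical type string sent by clients

    def build_metric(self, prefix, name, wall_time_func=None):
        return CountingMetric(name, prefix=prefix,
                              wall_time_func=wall_time_func or time.time)

    def configure(self, options):
        pass  # nothing to configure in this sketch


# Twisted's plugin system looks for module-level provider instances.
counting_metric_factory = CountingMetricFactory()

metric = counting_metric_factory.build_metric("stats.counting", "gorets")
metric.process(["17", "cnt"])
report = metric.flush(10, 42)
```

The server keeps one metric object per key, created on first use through build_metric, and calls process for each incoming sample and flush at each reporting interval.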
Ian Wilkinson (theiw) wrote:
In the earlier comment, the following:
"- pluggableproces
should have read:
"- pluggableproces
Sidnei da Silva (sidnei) wrote:
1. Please rename itxstatsd.py to interfaces.py
2. Please fix this docstring:
+ def test_generic(self):
+ """Test the timing operation."""
3. The IMetricFactory attribute is named 'metric_kind_key' but elsewhere in the code 'metric_type' is used. Maybe rename the attribute to 'metric_type'?
4. Should this variable be named 'generic_metric'?
gauge_metric = GenericMetric
5. I couldn't figure out what GenericMetric is used for. Maybe a refactoring leftover?
6. Please keep at least a line between class and first function:
+class TestPlugin(
+ def test_factory(self):
- 52. By Lucio Torre: fixed per review comments
- 53. By Lucio Torre: unify naming
Lucio Torre (lucio.torre) wrote:
ian,
- done
- done
- too much refactoring there to add features and rewrite everything on one branch.
- "A convention for defining interfaces is do so in a file named like ProjectName/
- left for the big refactoring.
sidnei,
1. "A convention for defining interfaces is do so in a file named like ProjectName/
2. done
3. done (and changed other locations to use metric_type too as discussed on irc)
4. done
5. for sending metrics that have no specific interface support on the client side, without adding dependencies on plugins there either.
6. fixed
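The generic reporting path mentioned in point 5 can be pictured with a small standalone sketch. It assumes the wire format exercised by test_generic in this branch, "<name>:<value>|<metric_type>", and uses a hypothetical RecordingConnection and report_generic helper rather than the real txstatsd client classes.

```python
class RecordingConnection(object):
    """Hypothetical stand-in for a client connection; keeps the last payload."""

    def __init__(self):
        self.data = None

    def write(self, data):
        self.data = data


def report_generic(connection, namespace, name, value, metric_type):
    """Send one sample of an arbitrary metric type.

    The client only needs the type string; it does not import the plugin.
    """
    full_name = "%s.%s" % (namespace, name) if namespace else name
    connection.write("%s:%s|%s" % (full_name, value, metric_type))


connection = RecordingConnection()
report_generic(connection, "txstatsd.tests", "users", "pepe", "pd")
```

Because the type string is the only thing the client has to know, a new server-side plugin does not require a matching client release.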
- 54. By Lucio Torre: adding tests for passing arguments on flush
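The behaviour covered by the tests added in revision 54 can be sketched without the real processor. The example assumes, as loggingprocessor.py and test_flush_plugin_arguments in the diff suggest, that flush receives the interval in milliseconds and hands each plugin metric the interval in seconds together with an integer timestamp. MiniProcessor and FakeMetric are illustrative names only.

```python
class FakeMetric(object):
    """Records the arguments it is flushed with."""

    def flush(self, interval, timestamp):
        self.data = (interval, timestamp)
        return ""


class MiniProcessor(object):
    """Illustrative stand-in for the message processor's flush path."""

    def __init__(self, time_function):
        self.time_function = time_function
        self.plugin_metrics = {}

    def flush(self, interval=10000):
        timestamp = int(self.time_function())
        seconds = interval / 1000.0  # milliseconds -> seconds
        messages = []
        for metric in self.plugin_metrics.values():
            messages.append(metric.flush(seconds, timestamp))
        return messages


processor = MiniProcessor(time_function=lambda: 42)
processor.plugin_metrics["somemetric"] = FakeMetric()
processor.flush(41000)
```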
Sidnei da Silva (sidnei) wrote:
Looks good. +1!
Preview Diff
1 | === added file 'twisted/plugins/distinct_plugin.py' |
2 | --- twisted/plugins/distinct_plugin.py 1970-01-01 00:00:00 +0000 |
3 | +++ twisted/plugins/distinct_plugin.py 2011-11-18 20:11:23 +0000 |
4 | @@ -0,0 +1,20 @@ |
5 | +from zope.interface import implements |
6 | + |
7 | +from twisted.plugin import IPlugin |
8 | +from txstatsd.itxstatsd import IMetricFactory |
9 | +from txstatsd.metrics.distinctmetric import DistinctMetricReporter |
10 | + |
11 | +class DistinctMetricFactory(object): |
12 | + implements(IMetricFactory, IPlugin) |
13 | + |
14 | + name = "pdistinct" |
15 | + metric_type = "pd" |
16 | + |
17 | + def build_metric(self, prefix, name, wall_time_func=None): |
18 | + return DistinctMetricReporter(name, prefix=prefix, |
19 | + wall_time_func=wall_time_func) |
20 | + |
21 | + def configure(self, options): |
22 | + pass |
23 | + |
24 | +distinct_metric_factory = DistinctMetricFactory() |
25 | |
26 | === modified file 'txstatsd.conf-example' |
27 | --- txstatsd.conf-example 2011-09-02 10:11:11 +0000 |
28 | +++ txstatsd.conf-example 2011-11-18 20:11:23 +0000 |
29 | @@ -21,3 +21,6 @@ |
30 | # configured monitor-response. |
31 | monitor-message: txstatsd ping |
32 | monitor-response: txstatsd pong |
33 | + |
34 | +[plugin_sample] |
35 | +sample-key: sample-value |
36 | \ No newline at end of file |
37 | |
38 | === added file 'txstatsd/itxstatsd.py' |
39 | --- txstatsd/itxstatsd.py 1970-01-01 00:00:00 +0000 |
40 | +++ txstatsd/itxstatsd.py 2011-11-18 20:11:23 +0000 |
41 | @@ -0,0 +1,54 @@ |
42 | +from zope.interface import Interface, Attribute |
43 | + |
44 | + |
45 | +class IMetricFactory(Interface): |
46 | + name = Attribute(""" |
47 | + @type name C{str} |
48 | + @ivar name: The name of this kind of metric |
49 | + """) |
50 | + |
51 | + metric_type = Attribute(""" |
52 | + @type metric_type: C{str} |
53 | + @ivar metric_type: The string that will be used by clients to send |
54 | + metrics of this kind to the server. |
55 | + """) |
56 | + |
57 | + def build_metric(prefix, name, wall_time_func=None): |
58 | + """ |
59 | + Returns an object that implements the C{IMetric} interface for name. |
60 | + |
61 | + @type prefix: C{str} |
62 | + @param prefix: The prefix used for reporting this metric. |
63 | + @type name: C{str} |
64 | + @param name: The name used for reporting this metric. |
65 | + """ |
66 | + |
67 | + def configure(options): |
68 | + """ |
69 | + Configures the factory. Will be called at startup by the service |
70 | + factory. |
71 | + |
72 | + @type options: C{twisted.python.usage.Options} |
73 | + @param options: The configuration options. |
74 | + """ |
75 | + |
76 | + |
77 | +class IMetric(Interface): |
78 | + def process(fields): |
79 | + """ |
80 | + Process new data for this metric. |
81 | + |
82 | + @type fields: C{list} |
83 | + @param fields: The list of message parts. Usually in the form of |
84 | + (value, metric_type, [sample_rate]) |
85 | + """ |
86 | + |
87 | + def flush(interval, timestamp): |
88 | + """ |
89 | + Returns a string with new line separated list of metrics to report. |
90 | + |
91 | + @type interval: C{float} |
92 | + @param interval: The time since last flush. |
93 | + @type timestamp: C{float} |
94 | + @param timestamp: The timestamp for now. |
95 | + """ |
96 | |
97 | === modified file 'txstatsd/metrics/distinctmetric.py' |
98 | --- txstatsd/metrics/distinctmetric.py 2011-10-26 16:48:42 +0000 |
99 | +++ txstatsd/metrics/distinctmetric.py 2011-11-18 20:11:23 +0000 |
100 | @@ -13,21 +13,23 @@ |
101 | |
102 | from string import Template |
103 | |
104 | +from zope.interface import implements |
105 | from txstatsd.metrics.metric import Metric |
106 | - |
107 | - |
108 | +from txstatsd.itxstatsd import IMetric |
109 | + |
110 | + |
111 | class SBoxHash(object): |
112 | """A very fast hash. |
113 | - |
114 | + |
115 | This class create a random hash function that is very fast. |
116 | Based on SBOXes. Not Crypto Strong. |
117 | - |
118 | + |
119 | Two instances of this class will hash differently. |
120 | """ |
121 | - |
122 | + |
123 | def __init__(self): |
124 | self.table = [random.randint(0, 0xFFFFFFFF - 1) for i in range(256)] |
125 | - |
126 | + |
127 | def hash(self, data): |
128 | value = 0 |
129 | for c in data: |
130 | @@ -35,14 +37,14 @@ |
131 | value = value * 3 |
132 | value = value & 0xFFFFFFFF |
133 | return value |
134 | - |
135 | - |
136 | + |
137 | + |
138 | def hash(data): |
139 | """Hash data using a random hasher.""" |
140 | p = SBoxHash() |
141 | return p.hash(data) |
142 | - |
143 | - |
144 | + |
145 | + |
146 | def zeros(n): |
147 | """Count the zeros to the right of the binary representation of n.""" |
148 | count = 0 |
149 | @@ -60,19 +62,19 @@ |
150 | |
151 | class SlidingDistinctCounter(object): |
152 | """A probabilistic distinct counter with sliding windows.""" |
153 | - |
154 | + |
155 | def __init__(self, n_hashes, n_buckets): |
156 | self.n_hashes = n_hashes |
157 | self.n_buckets = n_buckets |
158 | - |
159 | + |
160 | self.hashes = [SBoxHash() for i in range(n_hashes)] |
161 | self.buckets = [[0] * n_buckets for i in range(n_hashes)] |
162 | - |
163 | + |
164 | def add(self, when, item): |
165 | hashes = (h.hash(item) for h in self.hashes) |
166 | for i, value in enumerate(hashes): |
167 | self.buckets[i][min(self.n_buckets - 1, zeros(value))] = when |
168 | - |
169 | + |
170 | def distinct(self, since=0): |
171 | total = 0.0 |
172 | for i in range(self.n_hashes): |
173 | @@ -84,7 +86,7 @@ |
174 | total += least0 |
175 | v = total / self.n_hashes |
176 | return int((2 ** v) / 0.77351) |
177 | - |
178 | + |
179 | |
180 | class DistinctMetric(Metric): |
181 | """ |
182 | @@ -102,13 +104,13 @@ |
183 | Keeps an estimate of the distinct numbers of items seen on various |
184 | sliding windows of time. |
185 | """ |
186 | + implements(IMetric) |
187 | |
188 | MESSAGE = ( |
189 | "$prefix%(key)s.count_1min %(count_1min)s %(timestamp)s\n" |
190 | "$prefix%(key)s.count_1hour %(count_1hour)s %(timestamp)s\n" |
191 | "$prefix%(key)s.count_1day %(count_1day)s %(timestamp)s\n" |
192 | - "$prefix%(key)s.count %(count)s %(timestamp)s\n" |
193 | - ) |
194 | + "$prefix%(key)s.count %(count)s %(timestamp)s\n") |
195 | |
196 | def __init__(self, name, wall_time_func=time.time, prefix=""): |
197 | """Construct a metric we expect to be periodically updated. |
198 | @@ -131,18 +133,20 @@ |
199 | |
200 | def count_1min(self, now): |
201 | return self.counter.distinct(now - 60) |
202 | - |
203 | + |
204 | def count_1hour(self, now): |
205 | return self.counter.distinct(now - 60 * 60) |
206 | - |
207 | + |
208 | def count_1day(self, now): |
209 | return self.counter.distinct(now - 60 * 60 * 24) |
210 | |
211 | + def process(self, fields): |
212 | + self.update(fields[0]) |
213 | + |
214 | def update(self, item): |
215 | - """Adds a seen item.""" |
216 | self.counter.add(self.wall_time_func(), item) |
217 | - |
218 | - def report(self, timestamp): |
219 | + |
220 | + def flush(self, interval, timestamp): |
221 | now = self.wall_time_func() |
222 | return self.message % { |
223 | "key": self.name, |
224 | |
225 | === modified file 'txstatsd/metrics/metrics.py' |
226 | --- txstatsd/metrics/metrics.py 2011-10-25 20:22:10 +0000 |
227 | +++ txstatsd/metrics/metrics.py 2011-11-18 20:11:23 +0000 |
228 | @@ -5,6 +5,16 @@ |
229 | from txstatsd.metrics.metric import Metric |
230 | |
231 | |
232 | +class GenericMetric(Metric): |
233 | + def __init__(self, connection, key, name, sample_rate=1): |
234 | + super(GenericMetric, self).__init__(connection, name, |
235 | + sample_rate=sample_rate) |
236 | + self.key = key |
237 | + |
238 | + def mark(self, value): |
239 | + self.send("%s|%s" % (value, self.key)) |
240 | + |
241 | + |
242 | class Metrics(object): |
243 | def __init__(self, connection=None, namespace=""): |
244 | """A convenience class for reporting metric samples |
245 | @@ -20,6 +30,20 @@ |
246 | self.namespace = namespace |
247 | self._metrics = {} |
248 | |
249 | + def report(self, name, value, metric_type, sample_rate=1): |
250 | + """Report a generic metric. |
251 | + |
252 | + Used for server side plugins without client support. |
253 | + """ |
254 | + name = self.fully_qualify_name(name) |
255 | + if not name in self._metrics: |
256 | + metric = GenericMetric(self.connection, |
257 | + metric_type, |
258 | + name, |
259 | + sample_rate) |
260 | + self._metrics[name] = metric |
261 | + self._metrics[name].mark(value) |
262 | + |
263 | def gauge(self, name, value, sample_rate=1): |
264 | """Report an instantaneous reading of a particular value.""" |
265 | name = self.fully_qualify_name(name) |
266 | @@ -76,7 +100,7 @@ |
267 | metric = DistinctMetric(self.connection, name) |
268 | self._metrics[name] = metric |
269 | self._metrics[name].mark(item) |
270 | - |
271 | + |
272 | def clear(self, name): |
273 | """Allow the metric to re-initialize its internal state.""" |
274 | name = self.fully_qualify_name(name) |
275 | |
276 | === modified file 'txstatsd/process.py' |
277 | --- txstatsd/process.py 2011-08-08 09:28:45 +0000 |
278 | +++ txstatsd/process.py 2011-11-18 20:11:23 +0000 |
279 | @@ -114,7 +114,8 @@ |
280 | result = {prefix + ".cpu.percent": self.process.get_cpu_percent(), |
281 | prefix + ".cpu.user": utime, |
282 | prefix + ".cpu.system": stime, |
283 | - prefix + ".memory.percent": self.process.get_memory_percent(), |
284 | + prefix + ".memory.percent": |
285 | + self.process.get_memory_percent(), |
286 | prefix + ".memory.vsize": vsize, |
287 | prefix + ".memory.rss": rss} |
288 | if getattr(self.process, "get_num_threads", None) is not None: |
289 | |
290 | === modified file 'txstatsd/server/configurableprocessor.py' |
291 | --- txstatsd/server/configurableprocessor.py 2011-10-25 20:22:10 +0000 |
292 | +++ txstatsd/server/configurableprocessor.py 2011-11-18 20:11:23 +0000 |
293 | @@ -24,9 +24,9 @@ |
294 | |
295 | METRICS_SUMMARY = "statsd.numStats %s %s\n" |
296 | |
297 | - def __init__(self, time_function=time.time, message_prefix=""): |
298 | + def __init__(self, time_function=time.time, message_prefix="", plugins=None): |
299 | super(ConfigurableMessageProcessor, self).__init__( |
300 | - time_function=time_function) |
301 | + time_function=time_function, plugins=plugins) |
302 | |
303 | if message_prefix: |
304 | self.metrics_summary = message_prefix + '.' + \ |
305 | @@ -38,6 +38,9 @@ |
306 | self.message_prefix = message_prefix |
307 | self.gauge_metrics = {} |
308 | |
309 | + def get_message_prefix(self, kind): |
310 | + return self.message_prefix |
311 | + |
312 | def compose_timer_metric(self, key, duration): |
313 | if not key in self.timer_metrics: |
314 | metric = TimerMetricReporter(key, prefix=self.message_prefix) |
315 | |
316 | === modified file 'txstatsd/server/loggingprocessor.py' |
317 | --- txstatsd/server/loggingprocessor.py 2011-10-04 16:22:19 +0000 |
318 | +++ txstatsd/server/loggingprocessor.py 2011-11-18 20:11:23 +0000 |
319 | @@ -11,18 +11,20 @@ |
320 | attribute.) |
321 | """ |
322 | |
323 | - def __init__(self, logger, time_function=time.time, message_prefix=""): |
324 | + def __init__(self, logger, time_function=time.time, message_prefix="", plugins=None): |
325 | super(LoggingMessageProcessor, self).__init__( |
326 | - time_function=time_function, message_prefix=message_prefix) |
327 | + time_function=time_function, message_prefix=message_prefix, |
328 | + plugins=plugins) |
329 | |
330 | logger_info = getattr(logger, 'info', None) |
331 | if logger_info is None or not callable(logger_info): |
332 | raise TypeError() |
333 | self.logger = logger |
334 | |
335 | - def flush(self): |
336 | + def flush(self, interval=10000, percent=90): |
337 | """Log all received metric samples to the supplied log file.""" |
338 | timestamp = int(self.time_function()) |
339 | + interval = interval / 1000 |
340 | |
341 | def log_metrics(metrics): |
342 | for metric in metrics.itervalues(): |
343 | @@ -34,4 +36,9 @@ |
344 | log_metrics(self.gauge_metrics) |
345 | log_metrics(self.meter_metrics) |
346 | log_metrics(self.timer_metrics) |
347 | + |
348 | + for metric in self.plugin_metrics.itervalues(): |
349 | + report = metric.flush(interval, timestamp) |
350 | + for measurement in report.splitlines(): |
351 | + self.logger.info(measurement) |
352 | |
353 | |
354 | === modified file 'txstatsd/server/processor.py' |
355 | --- txstatsd/server/processor.py 2011-10-26 16:48:42 +0000 |
356 | +++ txstatsd/server/processor.py 2011-11-18 20:11:23 +0000 |
357 | @@ -50,7 +50,7 @@ |
358 | GAUGE_METRIC_MESSAGE = ( |
359 | "stats.gauge.%(key)s.value %(value)s %(timestamp)s\n") |
360 | |
361 | - def __init__(self, time_function=time.time): |
362 | + def __init__(self, time_function=time.time, plugins=None): |
363 | self.time_function = time_function |
364 | |
365 | self.counters_message = MessageProcessor.COUNTERS_MESSAGE |
366 | @@ -63,6 +63,13 @@ |
367 | self.meter_metrics = {} |
368 | self.distinct_metrics = {} |
369 | |
370 | + self.plugins = {} |
371 | + self.plugin_metrics = {} |
372 | + |
373 | + if plugins is not None: |
374 | + for plugin in plugins: |
375 | + self.plugins[plugin.metric_type] = plugin |
376 | + |
377 | def fail(self, message): |
378 | """Log and discard malformed message.""" |
379 | log.msg("Bad line: %r" % message, logLevel=logging.DEBUG) |
380 | @@ -84,30 +91,33 @@ |
381 | return self.fail(message) |
382 | |
383 | key = normalize_key(key) |
384 | + metric_type = fields[1] |
385 | |
386 | - if fields[1] == "c": |
387 | + if metric_type == "c": |
388 | self.process_counter_metric(key, fields, message) |
389 | - elif fields[1] == "ms": |
390 | + elif metric_type == "ms": |
391 | self.process_timer_metric(key, fields[0], message) |
392 | - elif fields[1] == "g": |
393 | + elif metric_type == "g": |
394 | self.process_gauge_metric(key, fields[0], message) |
395 | - elif fields[1] == "m": |
396 | + elif metric_type == "m": |
397 | self.process_meter_metric(key, fields[0], message) |
398 | - elif fields[1] == "d": |
399 | - self.process_distinct_metric(key, fields[0], message) |
400 | + elif metric_type in self.plugins: |
401 | + self.process_plugin_metric(metric_type, key, fields, message) |
402 | else: |
403 | return self.fail(message) |
404 | |
405 | - def process_distinct_metric(self, key, item, message): |
406 | - self.compose_distinct_metric(key, str(item)) |
407 | - |
408 | - def compose_distinct_metric(self, key, item): |
409 | - if not key in self.distinct_metrics: |
410 | - metric = DistinctMetricReporter(key, self.time_function, |
411 | - prefix="stats.distinct") |
412 | - self.distinct_metrics[key] = metric |
413 | - self.distinct_metrics[key].update(item) |
414 | - |
415 | + def get_message_prefix(self, kind): |
416 | + return "stats." + kind |
417 | + |
418 | + def process_plugin_metric(self, metric_type, key, items, message): |
419 | + if not key in self.plugin_metrics: |
420 | + factory = self.plugins[metric_type] |
421 | + metric = factory.build_metric( |
422 | + self.get_message_prefix(factory.name), |
423 | + name=key, wall_time_func=self.time_function) |
424 | + self.plugin_metrics[key] = metric |
425 | + self.plugin_metrics[key].process(items) |
426 | + |
427 | def process_timer_metric(self, key, duration, message): |
428 | try: |
429 | duration = float(duration) |
430 | @@ -206,11 +216,11 @@ |
431 | messages.extend(meter_metrics) |
432 | num_stats += events |
433 | |
434 | - distinct_metrics, events = self.flush_distinct_metrics(timestamp) |
435 | + plugin_metrics, events = self.flush_plugin_metrics(interval, timestamp) |
436 | if events > 0: |
437 | - messages.extend(distinct_metrics) |
438 | + messages.extend(plugin_metrics) |
439 | num_stats += events |
440 | - |
441 | + |
442 | self.flush_metrics_summary(messages, num_stats, timestamp) |
443 | return messages |
444 | |
445 | @@ -297,16 +307,17 @@ |
446 | |
447 | return (metrics, events) |
448 | |
449 | - def flush_distinct_metrics(self, timestamp): |
450 | + def flush_plugin_metrics(self, interval, timestamp): |
451 | metrics = [] |
452 | events = 0 |
453 | - for metric in self.distinct_metrics.itervalues(): |
454 | - message = metric.report(timestamp) |
455 | + |
456 | + for metric in self.plugin_metrics.itervalues(): |
457 | + message = metric.flush(interval, timestamp) |
458 | metrics.append(message) |
459 | events += 1 |
460 | |
461 | return (metrics, events) |
462 | - |
463 | + |
464 | def flush_metrics_summary(self, messages, num_stats, timestamp): |
465 | messages.append("statsd.numStats %s %s\n" % (num_stats, timestamp)) |
466 | |
467 | |
468 | === modified file 'txstatsd/service.py' |
469 | --- txstatsd/service.py 2011-10-25 14:33:14 +0000 |
470 | +++ txstatsd/service.py 2011-11-18 20:11:23 +0000 |
471 | @@ -7,6 +7,7 @@ |
472 | from twisted.application.internet import TCPClient, UDPServer |
473 | from twisted.application.service import MultiService |
474 | from twisted.python import usage |
475 | +from twisted.plugin import getPlugins |
476 | |
477 | from txstatsd.client import InternalClient |
478 | from txstatsd.metrics.metrics import Metrics |
479 | @@ -15,11 +16,12 @@ |
480 | from txstatsd.server.protocol import ( |
481 | GraphiteClientFactory, StatsDServerProtocol) |
482 | from txstatsd.report import ReportingService |
483 | +from txstatsd.itxstatsd import IMetricFactory |
484 | |
485 | |
486 | def accumulateClassList(classObj, attr, listObj, |
487 | baseClass=None, excludeClass=None): |
488 | - """Accumulate all attributes of a given name in a class hierarchy |
489 | + """Accumulate all attributes of a given name in a class hierarchy |
490 | into a single list. |
491 | |
492 | Assuming all class attributes of this name are lists. |
493 | @@ -35,8 +37,7 @@ |
494 | """Extends usage.Options to also read parameters from a config file.""" |
495 | |
496 | optParameters = [ |
497 | - ["config", "c", None, "Config file to use."] |
498 | - ] |
499 | + ["config", "c", None, "Config file to use."]] |
500 | |
501 | def __init__(self): |
502 | parameters = [] |
503 | @@ -99,6 +100,10 @@ |
504 | value = self._dispatch[name].coerce(value) |
505 | self[name] = value |
506 | |
507 | + for section in config_file.sections(): |
508 | + if section.startswith("plugin_"): |
509 | + self[section] = config_file.items(section) |
510 | + |
511 | |
512 | class StatsDOptions(OptionsGlue): |
513 | """ |
514 | @@ -122,8 +127,7 @@ |
515 | ["monitor-response", "o", "txstatsd pong", |
516 | "Response we should send monitoring agent.", str], |
517 | ["statsd-compliance", "s", 1, |
518 | - "Produce StatsD-compliant messages.", int] |
519 | - ] |
520 | + "Produce StatsD-compliant messages.", int]] |
521 | |
522 | def __init__(self): |
523 | self.config_section = 'statsd' |
524 | @@ -140,12 +144,19 @@ |
525 | if prefix is None: |
526 | prefix = socket.gethostname() + ".statsd" |
527 | |
528 | + # initialize plugins |
529 | + plugin_metrics = [] |
530 | + for plugin in getPlugins(IMetricFactory): |
531 | + plugin.configure(options) |
532 | + plugin_metrics.append(plugin) |
533 | + |
534 | if options["statsd-compliance"]: |
535 | - processor = MessageProcessor() |
536 | + processor = MessageProcessor(plugins=plugin_metrics) |
537 | connection = InternalClient(processor) |
538 | metrics = Metrics(connection, namespace=prefix) |
539 | else: |
540 | - processor = ConfigurableMessageProcessor(message_prefix=prefix) |
541 | + processor = ConfigurableMessageProcessor(message_prefix=prefix, |
542 | + plugins=plugin_metrics) |
543 | connection = InternalClient(processor) |
544 | metrics = Metrics(connection) |
545 | |
546 | |
547 | === modified file 'txstatsd/tests/metrics/test_distinct.py' |
548 | --- txstatsd/tests/metrics/test_distinct.py 2011-10-26 03:26:21 +0000 |
549 | +++ txstatsd/tests/metrics/test_distinct.py 2011-11-18 20:11:23 +0000 |
550 | @@ -5,11 +5,13 @@ |
551 | from scipy.stats import chi2 |
552 | |
553 | from twisted.trial.unittest import TestCase |
554 | - |
555 | +from twisted.plugin import getPlugins |
556 | +from twisted.plugins import distinct_plugin |
557 | import txstatsd.metrics.distinctmetric as distinct |
558 | - |
559 | - |
560 | -class TestHash(TestCase): |
561 | +from txstatsd.itxstatsd import IMetricFactory |
562 | + |
563 | + |
564 | +class TestHash(TestCase): |
565 | def test_hash_chars(self): |
566 | "For one table, all chars map to different chars" |
567 | results = set() |
568 | @@ -18,10 +20,10 @@ |
569 | h = distinct.hash(chr(c)) |
570 | results.add(h) |
571 | self.assertEquals(len(results), 256) |
572 | - |
573 | + |
574 | def test_chi_square(self): |
575 | N = 10000 |
576 | - |
577 | + |
578 | for (bits, buckets) in [(-1, 1024), (24, 256), |
579 | (16, 256), (8, 256), (0, 256)]: |
580 | bins = [0] * buckets |
581 | @@ -48,26 +50,27 @@ |
582 | self.assertEquals(distinct.zeros(5), 0) |
583 | self.assertEquals(distinct.zeros(8), 3) |
584 | self.assertEquals(distinct.zeros(9), 0) |
585 | - |
586 | + |
587 | + |
588 | class TestDistinct(TestCase): |
589 | def test_all(self): |
590 | random.seed(1) |
591 | - |
592 | + |
593 | for r in [1000, 10000]: |
594 | cd = distinct.SlidingDistinctCounter(32, 32) |
595 | for i in range(r): |
596 | cd.add(1, str(i)) |
597 | error = abs(cd.distinct() - r) |
598 | self.assertTrue(error < 0.15 * r) |
599 | - |
600 | - |
601 | + |
602 | + |
603 | class TestDistinctMetricReporter(TestCase): |
604 | def test_reports(self): |
605 | random.seed(1) |
606 | _wall_time = [0] |
607 | def _time(): |
608 | return _wall_time[0] |
609 | - |
610 | + |
611 | dmr = distinct.DistinctMetricReporter("test", wall_time_func=_time) |
612 | for i in range(3000): |
613 | _wall_time[0] = i * 50 |
614 | @@ -77,4 +80,12 @@ |
615 | self.assertTrue(abs(dmr.count_1min(now) - 1) < 2) |
616 | self.assertTrue(abs(dmr.count_1hour(now) - 72) < 15) |
617 | self.assertTrue(abs(dmr.count_1day(now) - 1728) < 500) |
618 | - self.assertTrue("count_1hour" in dmr.report(now)) |
619 | + self.assertTrue("count_1hour" in dmr.flush(1, now)) |
620 | + |
621 | + |
622 | +class TestPlugin(TestCase): |
623 | + |
624 | + def test_factory(self): |
625 | + self.assertTrue(distinct_plugin.distinct_metric_factory in \ |
626 | + list(getPlugins(IMetricFactory))) |
627 | + |
628 | |
629 | === modified file 'txstatsd/tests/test_configurableprocessor.py' |
630 | --- txstatsd/tests/test_configurableprocessor.py 2011-10-10 16:08:12 +0000 |
631 | +++ txstatsd/tests/test_configurableprocessor.py 2011-11-18 20:11:23 +0000 |
632 | @@ -2,9 +2,10 @@ |
633 | |
634 | from unittest import TestCase |
635 | |
636 | +from twisted.plugins.distinct_plugin import distinct_metric_factory |
637 | + |
638 | from txstatsd.server.configurableprocessor import ConfigurableMessageProcessor |
639 | |
640 | - |
641 | class FlushMessagesTest(TestCase): |
642 | |
643 | def test_flush_counter_with_empty_prefix(self): |
644 | @@ -36,7 +37,20 @@ |
645 | self.assertEqual("test.metric.gorets.count 17 42", counters[0]) |
646 | self.assertEqual("test.metric.statsd.numStats 1 42", |
647 | messages[1].splitlines()[0]) |
648 | - |
649 | + |
650 | + def test_flush_plugin(self): |
651 | + """ |
652 | + Ensure the prefix features if one is supplied. |
653 | + """ |
654 | + configurable_processor = ConfigurableMessageProcessor( |
655 | + time_function=lambda: 42, message_prefix="test.metric", |
656 | + plugins=[distinct_metric_factory]) |
657 | + configurable_processor.process("gorets:17|pd") |
658 | + messages = configurable_processor.flush() |
659 | + self.assertEqual(2, len(messages)) |
660 | + self.assertTrue("test.metric.gorets" in messages[0]) |
661 | + |
662 | + |
663 | def test_flush_single_timer_single_time(self): |
664 | """ |
665 | If a single timer with a single data point is present, all |
666 | |
667 | === modified file 'txstatsd/tests/test_loggingprocessor.py' |
668 | --- txstatsd/tests/test_loggingprocessor.py 2011-10-05 15:55:01 +0000 |
669 | +++ txstatsd/tests/test_loggingprocessor.py 2011-11-18 20:11:23 +0000 |
670 | @@ -1,6 +1,8 @@ |
671 | |
672 | from unittest import TestCase |
673 | |
674 | +from twisted.plugins.distinct_plugin import distinct_metric_factory |
675 | + |
676 | from txstatsd.server.loggingprocessor import LoggingMessageProcessor |
677 | |
678 | |
679 | @@ -8,6 +10,13 @@ |
680 | def report(self, *args): |
681 | return 'Sample report' |
682 | |
683 | +class TestLogger(object): |
684 | + def __init__(self): |
685 | + self.log = '' |
686 | + |
687 | + def info(self, measurement): |
688 | + self.log += measurement + "\n" |
689 | + |
690 | |
691 | class TestLoggingMessageProcessor(TestCase): |
692 | |
693 | @@ -30,16 +39,21 @@ |
694 | self.assertRaises(TypeError, invoker) |
695 | |
696 | def test_logger(self): |
697 | - class Logger(object): |
698 | - def __init__(self): |
699 | - self.log = '' |
700 | - |
701 | - def info(self, measurement): |
702 | - self.log += measurement |
703 | - |
704 | - logger = Logger() |
705 | + logger = TestLogger() |
706 | processor = LoggingMessageProcessor(logger) |
707 | metric = FakeMeterMetric() |
708 | processor.meter_metrics['test'] = metric |
709 | processor.flush() |
710 | - self.assertEqual(metric.report(), logger.log) |
711 | + self.assertEqual(metric.report() + "\n", logger.log) |
712 | + |
713 | + def test_logger_plugin(self): |
714 | + logger = TestLogger() |
715 | + processor = LoggingMessageProcessor( |
716 | + logger, plugins=[distinct_metric_factory], |
717 | + time_function=lambda: 42) |
718 | + processor.process("gorets:17|pd") |
719 | + processor.flush() |
720 | + self.assertEqual(processor.plugin_metrics['gorets'].flush( |
721 | + 10, processor.time_function()), |
722 | + logger.log) |
723 | + |
724 | |
725 | === modified file 'txstatsd/tests/test_metrics.py' |
726 | --- txstatsd/tests/test_metrics.py 2011-08-26 12:16:45 +0000 |
727 | +++ txstatsd/tests/test_metrics.py 2011-11-18 20:11:23 +0000 |
728 | @@ -53,6 +53,12 @@ |
729 | self.assertEqual(self.connection.data, |
730 | 'txstatsd.tests.timing:101123|ms') |
731 | |
732 | + def test_generic(self): |
733 | + """Test the GenericMetric class.""" |
734 | + self.metrics.report('users', "pepe", "pd") |
735 | + self.assertEqual(self.connection.data, |
736 | + 'txstatsd.tests.users:pepe|pd') |
737 | + |
738 | def test_empty_namespace(self): |
739 | """Test reporting of an empty namespace.""" |
740 | self.metrics.namespace = None |
741 | |
742 | === modified file 'txstatsd/tests/test_processor.py' |
743 | --- txstatsd/tests/test_processor.py 2011-10-26 16:48:42 +0000 |
744 | +++ txstatsd/tests/test_processor.py 2011-11-18 20:11:23 +0000 |
745 | @@ -1,14 +1,16 @@ |
746 | import time |
747 | |
748 | from unittest import TestCase |
749 | +from twisted.plugin import getPlugins |
750 | |
751 | from txstatsd.server.processor import MessageProcessor |
752 | - |
753 | +from txstatsd.itxstatsd import IMetricFactory |
754 | |
755 | class TestMessageProcessor(MessageProcessor): |
756 | |
757 | def __init__(self): |
758 | - super(TestMessageProcessor, self).__init__() |
759 | + super(TestMessageProcessor, self).__init__( |
760 | + plugins=getPlugins(IMetricFactory)) |
761 | self.failures = [] |
762 | |
763 | def fail(self, message): |
764 | @@ -66,12 +68,12 @@ |
765 | def test_receive_distinct_metric(self): |
766 | """ |
767 | A distinct metric message takes the form: |
768 | - '<name>:<item>|d'. |
769 | - 'distinct' indicates this is a distinct metric message. |
770 | + '<name>:<item>|pd'. |
771 | + 'pd' indicates this is a probabilistic distinct metric message. |
772 | """ |
773 | - self.processor.process("gorets:one|d") |
774 | - self.assertEqual(1, len(self.processor.distinct_metrics)) |
775 | - self.assertTrue(self.processor.distinct_metrics["gorets"].count() > 0) |
776 | + self.processor.process("gorets:one|pd") |
777 | + self.assertEqual(1, len(self.processor.plugin_metrics)) |
778 | + self.assertTrue(self.processor.plugin_metrics["gorets"].count() > 0) |
779 | |
780 | def test_receive_message_no_fields(self): |
781 | """ |
782 | @@ -120,7 +122,8 @@ |
783 | class FlushMessagesTest(TestCase): |
784 | |
785 | def setUp(self): |
786 | - self.processor = MessageProcessor(time_function=lambda: 42) |
787 | + self.processor = MessageProcessor(time_function=lambda: 42, |
788 | + plugins=getPlugins(IMetricFactory)) |
789 | |
790 | def test_flush_no_stats(self): |
791 | """ |
792 | @@ -243,15 +246,27 @@ |
793 | a distinct metric. |
794 | """ |
795 | |
796 | - self.processor.process("gorets:item|d") |
797 | - |
798 | + self.processor.process("gorets:item|pd") |
799 | + |
800 | messages = self.processor.flush() |
801 | self.assertEqual(2, len(messages)) |
802 | metrics = messages[0] |
803 | - self.assertTrue("stats.distinct.gorets.count " in metrics) |
804 | - self.assertTrue("stats.distinct.gorets.count_1hour" in metrics) |
805 | - self.assertTrue("stats.distinct.gorets.count_1min" in metrics) |
806 | - self.assertTrue("stats.distinct.gorets.count_1day" in metrics) |
807 | + self.assertTrue("stats.pdistinct.gorets.count " in metrics) |
808 | + self.assertTrue("stats.pdistinct.gorets.count_1hour" in metrics) |
809 | + self.assertTrue("stats.pdistinct.gorets.count_1min" in metrics) |
810 | + self.assertTrue("stats.pdistinct.gorets.count_1day" in metrics) |
811 | + |
812 | + def test_flush_plugin_arguments(self): |
813 | + """Test the passing of arguments for flush.""" |
814 | + |
815 | + class FakeMetric(object): |
816 | + def flush(self, interval, timestamp): |
817 | + self.data = interval, timestamp |
818 | + |
819 | + self.processor.plugin_metrics["somemetric"] = FakeMetric() |
820 | + self.processor.flush(41000) |
821 | + self.assertEquals((41,42), |
822 | + self.processor.plugin_metrics["somemetric"].data) |
823 | |
824 | |
825 | class FlushMeterMetricMessagesTest(TestCase): |
826 | |
827 | === modified file 'txstatsd/tests/test_service.py' |
828 | --- txstatsd/tests/test_service.py 2011-09-29 08:50:43 +0000 |
829 | +++ txstatsd/tests/test_service.py 2011-11-18 20:11:23 +0000 |
830 | @@ -2,6 +2,7 @@ |
831 | import ConfigParser |
832 | import tempfile |
833 | from unittest import TestCase |
834 | +from StringIO import StringIO |
835 | |
836 | from twisted.internet.defer import inlineCallbacks, Deferred |
837 | from twisted.internet.protocol import DatagramProtocol |
838 | @@ -104,6 +105,17 @@ |
839 | f, o = self.get_file_parser([["number", "n", 5, "help", int]]) |
840 | o.parseOptions(["--config", f.name]) |
841 | self.assertEquals(5, o["number"]) |
842 | + |
843 | + def test_support_plugin_sections(self): |
844 | + class TestOptions(service.OptionsGlue): |
845 | + optParameters = [["test", "t", "default", "help"]] |
846 | + config_section = "statsd" |
847 | + |
848 | + o = TestOptions() |
849 | + config_file = ConfigParser.RawConfigParser() |
850 | + config_file.readfp(StringIO("[statsd]\n\n[plugin_test]\nfoo = bar\n")) |
851 | + o.configure(config_file) |
852 | + self.assertEquals(o["plugin_test"], config_file.items("plugin_test")) |
853 | |
854 | |
855 | class Agent(DatagramProtocol): |
856 | |
857 | === modified file 'txstatsd/version.py' |
858 | --- txstatsd/version.py 2011-11-02 10:50:24 +0000 |
859 | +++ txstatsd/version.py 2011-11-18 20:11:23 +0000 |
860 | @@ -1,1 +1,1 @@ |
861 | -txstatsd = "0.6.4" |
862 | +txstatsd = "0.7.0" |
+1
Some quick comments:
* version.py needs a bump
* including a plugin_ section in txstatsd.conf-example might be useful
* rather than metric plugins, I would be tempted by processor plugins. This would involve:
- processor.py becoming an abstract processor, instantiated with a registry of metrics
- statsdprocessor.py specialising the abstract processor with its metrics registry populated by the supported StatsD metrics
- pluggableprocessor.py a specialised processor, with a metrics processor and functionality for handling processor plugins
- processorplugin.py replaces the metrics-oriented extensions introduced with the distinct_plugin.py
- twisted/plugins/codahale_plugin.py would then allow the inclusion of the Coda Hale metrics
- the pluggableprocessor.py could allow aggregation of processor plugins, for example, we would be able to support Coda Hale metrics, and logging capability
* itxstatsd.py could be txstatsd/metrics/imetric.py
* introduce txstatsd/config.py with the accumulateClassList and OptionsGlue from service.py. Have an extended OptionsGlue that includes the plugin support
* best stop there!
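On the plugin_ section point: the branch copies every configuration section whose name starts with plugin_ into the options. A minimal sketch of that behaviour follows, written against Python 3's configparser rather than the Python 2 ConfigParser module the project uses, with a plain dict standing in for OptionsGlue and a hypothetical collect_plugin_sections helper.

```python
import configparser

SAMPLE = """\
[statsd]
monitor-message = txstatsd ping

[plugin_sample]
sample-key = sample-value
"""


def collect_plugin_sections(config):
    """Return {section_name: [(key, value), ...]} for plugin_ sections."""
    options = {}
    for section in config.sections():
        if section.startswith("plugin_"):
            options[section] = config.items(section)
    return options


config = configparser.RawConfigParser()
config.read_string(SAMPLE)
plugin_options = collect_plugin_sections(config)
```

A plugin's configure(options) method could then look up its own section by name, for example options["plugin_sample"], and ignore the rest.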