Merge lp:~lucio.torre/txstatsd/add-distinct-count into lp:txstatsd
Status: | Merged |
---|---|
Approved by: | Sidnei da Silva |
Approved revision: | 40 |
Merged at revision: | 48 |
Proposed branch: | lp:~lucio.torre/txstatsd/add-distinct-count |
Merge into: | lp:txstatsd |
Diff against target: | 400 lines (+303/-0), 6 files modified: txstatsd/metrics/distinctmetric.py (+153/-0), txstatsd/metrics/metrics.py (+8/-0), txstatsd/server/configurableprocessor.py (+7/-0), txstatsd/server/processor.py (+29/-0), txstatsd/tests/metrics/test_distinct.py (+80/-0), txstatsd/tests/test_processor.py (+26/-0) |
To merge this branch: | bzr merge lp:~lucio.torre/txstatsd/add-distinct-count |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Sidnei da Silva | Approve | ||
Review via email: mp+80391@code.launchpad.net |
Commit message
add probabilistic distinct counter
Description of the change
Implements a probabilistic distinct counter with sliding windows.
See: http://
for an idea on the algorithm.
It uses custom hash functions that come with a test for uniform distribution (the test is currently skipped because it takes too long to run).
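A minimal sketch of the idea, assuming an estimator in the style of Flajolet-Martin probabilistic counting (the 0.77351 correction constant in the diff suggests this), where each bucket stores the last time its trailing-zero count was observed instead of a single bit. The names `SlidingDistinctSketch` and `trailing_zeros` and the salted MD5 hash are illustrative stand-ins, not the branch's code; the branch uses its own S-box hash.

```python
import hashlib


def trailing_zeros(n, limit):
    """Count trailing zero bits of n, capped at limit."""
    count = 0
    while count < limit and not (n >> count) & 1:
        count += 1
    return count


class SlidingDistinctSketch:
    """Hypothetical sketch: buckets hold 'last seen' timestamps."""

    def __init__(self, n_hashes=32, n_buckets=32):
        self.n_hashes = n_hashes
        self.n_buckets = n_buckets
        self.last_seen = [[0] * n_buckets for _ in range(n_hashes)]

    def _hash(self, index, item):
        # Assumption: a salted MD5 stands in for the branch's S-box hash.
        data = ("%d:%s" % (index, item)).encode("utf-8")
        return int.from_bytes(hashlib.md5(data).digest()[:4], "big")

    def add(self, when, item):
        # One bucket per hash function is stamped with the arrival time.
        for i in range(self.n_hashes):
            bucket = trailing_zeros(self._hash(i, item), self.n_buckets - 1)
            self.last_seen[i][bucket] = when

    def distinct(self, since=0):
        # For each hash, find the lowest bucket not touched after `since`,
        # average those indexes, and apply the correction constant.
        total = 0
        for row in self.last_seen:
            lowest_unset = 0
            for stamp in row:
                if stamp <= since:
                    break
                lowest_unset += 1
            total += lowest_unset
        return int(2 ** (total / self.n_hashes) / 0.77351)
```

Passing a later `since` ignores buckets that were last touched before that time, which is what lets one structure answer the 1-minute, 1-hour and 1-day questions without keeping separate counters.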
Lucio Torre (lucio.torre) wrote : | # |
1- Because it won't matter much for the packet size and it's clearer. I can change it if you want; it just seemed better. I don't like it when we start accumulating single-letter codes that are repeated in various parts of the code and carry no meaning.
2- fixed
3- fixed
4- Turned the hash loop into a generator, so it's just one pass of size n_hashes.
5- fixed.
6- There is a lot to fix there. I don't think it is that bad, as the metric will be created at the entry point for those measurements. Still, all the changes that need to be added for process, flush and the configurable processor seem a bit repetitive. But yes, it needs some work there; we should discuss it. I always saw this work as part of leaving statsd compatibility behind.
Sidnei da Silva (sidnei) wrote : | # |
Re: 1, please change it for the sake of consistency. While I agree it's not a lot relative to the whole packet, I think it's worth saving every byte we can.
The remaining changes look good. +1
Note you can set a commit message and flip the MP to approved, and tarmac will pick it up.
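To make the trade-off in point 1 concrete, here is a small sketch of the `<name>:<item>|d` message form described in the processor test. The function `parse_distinct` is hypothetical and only illustrates the wire format; it is not the processor's actual parsing code.

```python
def parse_distinct(message):
    """Split '<name>:<item>|d' into (name, item); return None otherwise."""
    name, sep, rest = message.partition(":")
    if not sep:
        return None
    fields = rest.split("|")
    if len(fields) != 2 or fields[1] != "d":
        return None
    return name, fields[0]


short_form = "gorets:one|d"
long_form = "gorets:one|distinct"
# Bytes saved per message by using the one-letter type code.
saved = len(long_form) - len(short_form)
```

With these example strings the short code saves 7 bytes per message, which matches the "every byte" argument above while keeping the type codes consistent with the other one- and two-letter codes in the processor.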
Ubuntu One Server Tarmac Bot (ubuntuone-server-tarmac) wrote : | # |
There are additional revisions which have not been approved in review. Please seek review and approval of these new revisions.
Sidnei da Silva (sidnei) : | # |
Ubuntu One Server Tarmac Bot (ubuntuone-server-tarmac) wrote : | # |
The attempt to merge lp:~lucio.torre/txstatsd/add-distinct-count into lp:txstatsd failed. Below is the output from the failed tests.
txstatsd.
TestHistogram
test_
test_
txstatsd.
TestBlankTime
test_count ... [OK]
test_
test_
test_max ... [OK]
test_mean ... [OK]
test_mean_rate ... [OK]
test_min ... [OK]
test_no_values ... [OK]
test_
test_
test_std_dev ... [OK]
TestTimingSer
test_count ... [OK]
test_max ... [OK]
test_mean ... [OK]
test_min ... [OK]
test_
test_std_dev ... [OK]
test_values ... [OK]
txstatsd.
TestEwmaFifte
test_
test_
test_
test_first_tick ... [OK]
test_
test_
test_
test_
test_one_minute ... [OK]
test_
test_
test_
test_
test_
test_
Preview Diff
1 | === added file 'txstatsd/metrics/distinctmetric.py' |
2 | --- txstatsd/metrics/distinctmetric.py 1970-01-01 00:00:00 +0000 |
3 | +++ txstatsd/metrics/distinctmetric.py 2011-10-26 16:55:26 +0000 |
4 | @@ -0,0 +1,153 @@ |
5 | +# Copyright (C) 2011 Canonical |
6 | +# All Rights Reserved |
7 | +""" |
8 | +Implements a probabilistic distinct counter with sliding windows. |
9 | + |
10 | +Based on: |
11 | +http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.12.7100 |
12 | + |
13 | +And extended for sliding windows. |
14 | +""" |
15 | +import random |
16 | +import time |
17 | + |
18 | +from string import Template |
19 | + |
20 | +from txstatsd.metrics.metric import Metric |
21 | + |
22 | + |
23 | +class SBoxHash(object): |
24 | + """A very fast hash. |
25 | + |
26 | + This class create a random hash function that is very fast. |
27 | + Based on SBOXes. Not Crypto Strong. |
28 | + |
29 | + Two instances of this class will hash differently. |
30 | + """ |
31 | + |
32 | + def __init__(self): |
33 | + self.table = [random.randint(0, 0xFFFFFFFF - 1) for i in range(256)] |
34 | + |
35 | + def hash(self, data): |
36 | + value = 0 |
37 | + for c in data: |
38 | + value = value ^ self.table[ord(c)] |
39 | + value = value * 3 |
40 | + value = value & 0xFFFFFFFF |
41 | + return value |
42 | + |
43 | + |
44 | +def hash(data): |
45 | + """Hash data using a random hasher.""" |
46 | + p = SBoxHash() |
47 | + return p.hash(data) |
48 | + |
49 | + |
50 | +def zeros(n): |
51 | + """Count the zeros to the right of the binary representation of n.""" |
52 | + count = 0 |
53 | + i = 0 |
54 | + while True: |
55 | + v = (n >> i) |
56 | + if v <= 0: |
57 | + return count |
58 | + if v & 1: |
59 | + return count |
60 | + count += 1 |
61 | + i += 1 |
62 | + return count |
63 | + |
64 | + |
65 | +class SlidingDistinctCounter(object): |
66 | + """A probabilistic distinct counter with sliding windows.""" |
67 | + |
68 | + def __init__(self, n_hashes, n_buckets): |
69 | + self.n_hashes = n_hashes |
70 | + self.n_buckets = n_buckets |
71 | + |
72 | + self.hashes = [SBoxHash() for i in range(n_hashes)] |
73 | + self.buckets = [[0] * n_buckets for i in range(n_hashes)] |
74 | + |
75 | + def add(self, when, item): |
76 | + hashes = (h.hash(item) for h in self.hashes) |
77 | + for i, value in enumerate(hashes): |
78 | + self.buckets[i][min(self.n_buckets - 1, zeros(value))] = when |
79 | + |
80 | + def distinct(self, since=0): |
81 | + total = 0.0 |
82 | + for i in range(self.n_hashes): |
83 | + least0 = 0 |
84 | + for b in range(self.n_buckets): |
85 | + if self.buckets[i][b] <= since: |
86 | + break |
87 | + least0 += 1 |
88 | + total += least0 |
89 | + v = total / self.n_hashes |
90 | + return int((2 ** v) / 0.77351) |
91 | + |
92 | + |
93 | +class DistinctMetric(Metric): |
94 | + """ |
95 | + Keeps an estimate of the distinct numbers of items seen on various |
96 | + sliding windows of time. |
97 | + """ |
98 | + |
99 | + def mark(self, item): |
100 | + """Report this item was seen.""" |
101 | + self.send("%s|d" % item) |
102 | + |
103 | + |
104 | +class DistinctMetricReporter(object): |
105 | + """ |
106 | + Keeps an estimate of the distinct numbers of items seen on various |
107 | + sliding windows of time. |
108 | + """ |
109 | + |
110 | + MESSAGE = ( |
111 | + "$prefix%(key)s.count_1min %(count_1min)s %(timestamp)s\n" |
112 | + "$prefix%(key)s.count_1hour %(count_1hour)s %(timestamp)s\n" |
113 | + "$prefix%(key)s.count_1day %(count_1day)s %(timestamp)s\n" |
114 | + "$prefix%(key)s.count %(count)s %(timestamp)s\n" |
115 | + ) |
116 | + |
117 | + def __init__(self, name, wall_time_func=time.time, prefix=""): |
118 | + """Construct a metric we expect to be periodically updated. |
119 | + |
120 | + @param name: Indicates what is being instrumented. |
121 | + @param wall_time_func: Function for obtaining wall time. |
122 | + @param prefix: If present, a string to prepend to the message |
123 | + composed when C{report} is called. |
124 | + """ |
125 | + self.name = name |
126 | + self.wall_time_func = wall_time_func |
127 | + self.counter = SlidingDistinctCounter(32, 32) |
128 | + if prefix: |
129 | + prefix += '.' |
130 | + self.message = Template(DistinctMetricReporter.MESSAGE).substitute( |
131 | + prefix=prefix) |
132 | + |
133 | + def count(self): |
134 | + return self.counter.distinct() |
135 | + |
136 | + def count_1min(self, now): |
137 | + return self.counter.distinct(now - 60) |
138 | + |
139 | + def count_1hour(self, now): |
140 | + return self.counter.distinct(now - 60 * 60) |
141 | + |
142 | + def count_1day(self, now): |
143 | + return self.counter.distinct(now - 60 * 60 * 24) |
144 | + |
145 | + def update(self, item): |
146 | + """Adds a seen item.""" |
147 | + self.counter.add(self.wall_time_func(), item) |
148 | + |
149 | + def report(self, timestamp): |
150 | + now = self.wall_time_func() |
151 | + return self.message % { |
152 | + "key": self.name, |
153 | + "count": self.count(), |
154 | + "count_1min": self.count_1min(now), |
155 | + "count_1hour": self.count_1hour(now), |
156 | + "count_1day": self.count_1day(now), |
157 | + "timestamp": timestamp} |
158 | |
159 | === modified file 'txstatsd/metrics/metrics.py' |
160 | --- txstatsd/metrics/metrics.py 2011-08-25 17:41:37 +0000 |
161 | +++ txstatsd/metrics/metrics.py 2011-10-26 16:55:26 +0000 |
162 | @@ -1,6 +1,7 @@ |
163 | |
164 | from txstatsd.metrics.gaugemetric import GaugeMetric |
165 | from txstatsd.metrics.metermetric import MeterMetric |
166 | +from txstatsd.metrics.distinctmetric import DistinctMetric |
167 | from txstatsd.metrics.metric import Metric |
168 | |
169 | |
170 | @@ -69,6 +70,13 @@ |
171 | self._metrics[name] = metric |
172 | self._metrics[name].send("%s|ms" % duration) |
173 | |
174 | + def distinct(self, name, item): |
175 | + name = self.fully_qualify_name(name) |
176 | + if not name in self._metrics: |
177 | + metric = DistinctMetric(self.connection, name) |
178 | + self._metrics[name] = metric |
179 | + self._metrics[name].mark(item) |
180 | + |
181 | def clear(self, name): |
182 | """Allow the metric to re-initialize its internal state.""" |
183 | name = self.fully_qualify_name(name) |
184 | |
185 | === modified file 'txstatsd/server/configurableprocessor.py' |
186 | --- txstatsd/server/configurableprocessor.py 2011-10-10 16:08:12 +0000 |
187 | +++ txstatsd/server/configurableprocessor.py 2011-10-26 16:55:26 +0000 |
188 | @@ -71,6 +71,13 @@ |
189 | self.meter_metrics[key] = metric |
190 | self.meter_metrics[key].mark(value) |
191 | |
192 | + def compose_distinct_metric(self, key, item): |
193 | + if not key in self.distinct_metrics: |
194 | + metric = DistinctMetricReporter(key, self.time_function, |
195 | + prefix=self.message_prefix) |
196 | + self.distinct_metrics[key] = metric |
197 | + self.distinct_metrics[key].update(item) |
198 | + |
199 | def flush_counter_metrics(self, interval, timestamp): |
200 | metrics = [] |
201 | events = 0 |
202 | |
203 | === modified file 'txstatsd/server/processor.py' |
204 | --- txstatsd/server/processor.py 2011-10-10 16:08:12 +0000 |
205 | +++ txstatsd/server/processor.py 2011-10-26 16:55:26 +0000 |
206 | @@ -6,6 +6,7 @@ |
207 | from twisted.python import log |
208 | |
209 | from txstatsd.metrics.metermetric import MeterMetricReporter |
210 | +from txstatsd.metrics.distinctmetric import DistinctMetricReporter |
211 | |
212 | |
213 | SPACES = re.compile("\s+") |
214 | @@ -60,6 +61,7 @@ |
215 | self.counter_metrics = {} |
216 | self.gauge_metrics = deque() |
217 | self.meter_metrics = {} |
218 | + self.distinct_metrics = {} |
219 | |
220 | def fail(self, message): |
221 | """Log and discard malformed message.""" |
222 | @@ -91,9 +93,21 @@ |
223 | self.process_gauge_metric(key, fields[0], message) |
224 | elif fields[1] == "m": |
225 | self.process_meter_metric(key, fields[0], message) |
226 | + elif fields[1] == "d": |
227 | + self.process_distinct_metric(key, fields[0], message) |
228 | else: |
229 | return self.fail(message) |
230 | |
231 | + def process_distinct_metric(self, key, item, message): |
232 | + self.compose_distinct_metric(key, str(item)) |
233 | + |
234 | + def compose_distinct_metric(self, key, item): |
235 | + if not key in self.distinct_metrics: |
236 | + metric = DistinctMetricReporter(key, self.time_function, |
237 | + prefix="stats.distinct") |
238 | + self.distinct_metrics[key] = metric |
239 | + self.distinct_metrics[key].update(item) |
240 | + |
241 | def process_timer_metric(self, key, duration, message): |
242 | try: |
243 | duration = float(duration) |
244 | @@ -192,6 +206,11 @@ |
245 | messages.extend(meter_metrics) |
246 | num_stats += events |
247 | |
248 | + distinct_metrics, events = self.flush_distinct_metrics(timestamp) |
249 | + if events > 0: |
250 | + messages.extend(distinct_metrics) |
251 | + num_stats += events |
252 | + |
253 | self.flush_metrics_summary(messages, num_stats, timestamp) |
254 | return messages |
255 | |
256 | @@ -278,6 +297,16 @@ |
257 | |
258 | return (metrics, events) |
259 | |
260 | + def flush_distinct_metrics(self, timestamp): |
261 | + metrics = [] |
262 | + events = 0 |
263 | + for metric in self.distinct_metrics.itervalues(): |
264 | + message = metric.report(timestamp) |
265 | + metrics.append(message) |
266 | + events += 1 |
267 | + |
268 | + return (metrics, events) |
269 | + |
270 | def flush_metrics_summary(self, messages, num_stats, timestamp): |
271 | messages.append("statsd.numStats %s %s\n" % (num_stats, timestamp)) |
272 | |
273 | |
274 | === added file 'txstatsd/tests/metrics/test_distinct.py' |
275 | --- txstatsd/tests/metrics/test_distinct.py 1970-01-01 00:00:00 +0000 |
276 | +++ txstatsd/tests/metrics/test_distinct.py 2011-10-26 16:55:26 +0000 |
277 | @@ -0,0 +1,80 @@ |
278 | +# Copyright (C) 2011 Canonical |
279 | +# All Rights Reserved |
280 | +import random |
281 | + |
282 | +from scipy.stats import chi2 |
283 | + |
284 | +from twisted.trial.unittest import TestCase |
285 | + |
286 | +import txstatsd.metrics.distinctmetric as distinct |
287 | + |
288 | + |
289 | +class TestHash(TestCase): |
290 | + def test_hash_chars(self): |
291 | + "For one table, all chars map to different chars" |
292 | + results = set() |
293 | + for c in range(256): |
294 | + random.seed(1) |
295 | + h = distinct.hash(chr(c)) |
296 | + results.add(h) |
297 | + self.assertEquals(len(results), 256) |
298 | + |
299 | + def test_chi_square(self): |
300 | + N = 10000 |
301 | + |
302 | + for (bits, buckets) in [(-1, 1024), (24, 256), |
303 | + (16, 256), (8, 256), (0, 256)]: |
304 | + bins = [0] * buckets |
305 | + for i in range(N): |
306 | + v = distinct.hash(str(i)) |
307 | + if bits < 0: |
308 | + bin = v / (0xFFFFFFFF / buckets) |
309 | + else: |
310 | + bin = (v >> bits) & 0xFF |
311 | + bins[bin] += 1 |
312 | + value = sum(((x - N / buckets) ** 2) / (N / buckets) for x in bins) |
313 | + pval = chi2.cdf(value, N) |
314 | + if pval > 0.5: |
315 | + print bins, pval |
316 | + self.assertTrue(pval < 0.5, "bits %s, pval == %s" % (bits, pval)) |
317 | + test_chi_square.skip = "Takes too long to run every time." |
318 | + |
319 | + |
320 | +class TestZeros(TestCase): |
321 | + def test_zeros(self): |
322 | + self.assertEquals(distinct.zeros(1), 0) |
323 | + self.assertEquals(distinct.zeros(2), 1) |
324 | + self.assertEquals(distinct.zeros(4), 2) |
325 | + self.assertEquals(distinct.zeros(5), 0) |
326 | + self.assertEquals(distinct.zeros(8), 3) |
327 | + self.assertEquals(distinct.zeros(9), 0) |
328 | + |
329 | +class TestDistinct(TestCase): |
330 | + def test_all(self): |
331 | + random.seed(1) |
332 | + |
333 | + for r in [1000, 10000]: |
334 | + cd = distinct.SlidingDistinctCounter(32, 32) |
335 | + for i in range(r): |
336 | + cd.add(1, str(i)) |
337 | + error = abs(cd.distinct() - r) |
338 | + self.assertTrue(error < 0.15 * r) |
339 | + |
340 | + |
341 | +class TestDistinctMetricReporter(TestCase): |
342 | + def test_reports(self): |
343 | + random.seed(1) |
344 | + _wall_time = [0] |
345 | + def _time(): |
346 | + return _wall_time[0] |
347 | + |
348 | + dmr = distinct.DistinctMetricReporter("test", wall_time_func=_time) |
349 | + for i in range(3000): |
350 | + _wall_time[0] = i * 50 |
351 | + dmr.update(str(i)) |
352 | + now = _time() |
353 | + self.assertTrue(abs(dmr.count() - 3000) < 600) |
354 | + self.assertTrue(abs(dmr.count_1min(now) - 1) < 2) |
355 | + self.assertTrue(abs(dmr.count_1hour(now) - 72) < 15) |
356 | + self.assertTrue(abs(dmr.count_1day(now) - 1728) < 500) |
357 | + self.assertTrue("count_1hour" in dmr.report(now)) |
358 | |
359 | === modified file 'txstatsd/tests/test_processor.py' |
360 | --- txstatsd/tests/test_processor.py 2011-10-07 19:49:25 +0000 |
361 | +++ txstatsd/tests/test_processor.py 2011-10-26 16:55:26 +0000 |
362 | @@ -63,6 +63,16 @@ |
363 | [9.6, 'gorets'], |
364 | self.processor.gauge_metrics.pop()) |
365 | |
366 | + def test_receive_distinct_metric(self): |
367 | + """ |
368 | + A distinct metric message takes the form: |
369 | + '<name>:<item>|d'. |
370 | + 'distinct' indicates this is a distinct metric message. |
371 | + """ |
372 | + self.processor.process("gorets:one|d") |
373 | + self.assertEqual(1, len(self.processor.distinct_metrics)) |
374 | + self.assertTrue(self.processor.distinct_metrics["gorets"].count() > 0) |
375 | + |
376 | def test_receive_message_no_fields(self): |
377 | """ |
378 | If a timer message has no fields, it is logged and discarded. |
379 | @@ -227,6 +237,22 @@ |
380 | "statsd.numStats 1 42", messages[1].splitlines()[0]) |
381 | self.assertEqual(0, len(self.processor.gauge_metrics)) |
382 | |
383 | + def test_flush_distinct_metric(self): |
384 | + """ |
385 | + Test the correct rendering of the Graphite report for |
386 | + a distinct metric. |
387 | + """ |
388 | + |
389 | + self.processor.process("gorets:item|d") |
390 | + |
391 | + messages = self.processor.flush() |
392 | + self.assertEqual(2, len(messages)) |
393 | + metrics = messages[0] |
394 | + self.assertTrue("stats.distinct.gorets.count " in metrics) |
395 | + self.assertTrue("stats.distinct.gorets.count_1hour" in metrics) |
396 | + self.assertTrue("stats.distinct.gorets.count_1min" in metrics) |
397 | + self.assertTrue("stats.distinct.gorets.count_1day" in metrics) |
398 | + |
399 | |
400 | class FlushMeterMetricMessagesTest(TestCase): |
401 |
1. Why not use something like 'd' for the postfix instead of 'distinct'?
2. There are some copy-and-paste issues (gauge -> distinct in docstrings)
3. TestDistinc -> TestDistinct
4. Maybe 'add' can be simplified a bit? Instead of looping 2x256 (list comp + enumerate) do a generator expression inside enumerate?
5. Missing blank line between docstring and __init__
6. (unrelated to the branch) it seems pretty obvious that adding a new type of metric needs to touch a lot of code. I particularly dislike constructs like this:
175 +        if not name in self._metrics:
176 +            metric = DistinctMetric(self.connection, name)
177 +            self._metrics[name] = metric
Maybe something can be done to simplify it in the future.
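One possible direction for point 6, sketched under the assumption that every metric type can be built from its name by a small factory. `MetricRegistry` and `get_or_create` are hypothetical names and do not exist in txstatsd.

```python
class MetricRegistry:
    """Hypothetical helper: create a metric on first use, then reuse it."""

    def __init__(self):
        self._metrics = {}

    def get_or_create(self, name, factory):
        # factory is any callable that takes the name and returns a metric.
        metric = self._metrics.get(name)
        if metric is None:
            metric = self._metrics[name] = factory(name)
        return metric
```

With a helper like this, the three-line "if not in dict, create, store" construct would collapse to a single call at each entry point, for example `registry.get_or_create(name, lambda n: DistinctMetric(connection, n)).mark(item)`.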