Merge lp:~lucio.torre/txstatsd/add-distinct-count into lp:txstatsd

Proposed by Lucio Torre
Status: Merged
Approved by: Sidnei da Silva
Approved revision: 40
Merged at revision: 48
Proposed branch: lp:~lucio.torre/txstatsd/add-distinct-count
Merge into: lp:txstatsd
Diff against target: 400 lines (+303/-0)
6 files modified
txstatsd/metrics/distinctmetric.py (+153/-0)
txstatsd/metrics/metrics.py (+8/-0)
txstatsd/server/configurableprocessor.py (+7/-0)
txstatsd/server/processor.py (+29/-0)
txstatsd/tests/metrics/test_distinct.py (+80/-0)
txstatsd/tests/test_processor.py (+26/-0)
To merge this branch: bzr merge lp:~lucio.torre/txstatsd/add-distinct-count
Reviewer: Sidnei da Silva
Review status: Approve
Review via email: mp+80391@code.launchpad.net

Commit message

add probabilistic distinct counter

Description of the change

Implements a probabilistic distinct counter with sliding windows.

See: http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.12.7100
for an idea on the algorithm.

It uses custom hash functions. A test checks them for uniform distribution, but it is currently skipped because it takes too long to run.
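The idea described above can be sketched roughly as follows. This is an illustrative approximation, not the code in the branch: it is written for Python 3, the class name WindowedDistinctSketch and the salted-MD5 hash are assumptions standing in for the branch's SBoxHash, and only the 0.77351 constant and the 32x32 layout are taken from the diff. Each bucket stores the last time a hash value with that many trailing zero bits was seen, so a query for a window only counts buckets refreshed inside it.

```python
import hashlib


def trailing_zeros(n, limit=32):
    """Count trailing zero bits of n, capped at limit (n == 0 gives limit)."""
    if n == 0:
        return limit
    count = 0
    while (n & 1) == 0 and count < limit:
        n >>= 1
        count += 1
    return count


class WindowedDistinctSketch(object):
    """Flajolet-Martin style estimator whose buckets hold timestamps."""

    PHI = 0.77351  # correction constant, as used in the diff

    def __init__(self, n_hashes=32, n_buckets=32):
        self.n_hashes = n_hashes
        self.n_buckets = n_buckets
        # last_seen[i][b]: latest time hash i produced b trailing zeros.
        self.last_seen = [[0] * n_buckets for _ in range(n_hashes)]

    def _hash(self, index, item):
        # Assumption: salted MD5 truncated to 32 bits, not the branch's hash.
        data = ("%d:%s" % (index, item)).encode("utf-8")
        return int.from_bytes(hashlib.md5(data).digest()[:4], "big")

    def add(self, when, item):
        for i in range(self.n_hashes):
            bucket = min(self.n_buckets - 1,
                         trailing_zeros(self._hash(i, item)))
            self.last_seen[i][bucket] = when

    def distinct(self, since=0):
        total = 0
        for row in self.last_seen:
            rank = 0
            # Index of the first bucket not refreshed after `since`.
            while rank < self.n_buckets and row[rank] > since:
                rank += 1
            total += rank
        return int(2 ** (total / float(self.n_hashes)) / self.PHI)


sketch = WindowedDistinctSketch()
for i in range(1000):
    sketch.add(when=i + 1, item="user-%d" % i)
overall = sketch.distinct()          # estimate over everything seen
recent = sketch.distinct(since=940)  # estimate over the last 60 ticks
```

The estimates are probabilistic, and the windowed query relies on timestamps never decreasing, so that a later sighting always overwrites an earlier one.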

Sidnei da Silva (sidnei) wrote :

1. Why not use something like 'd' for the postfix instead of 'distinct'?

2. There are some copy-and-paste issues (gauge -> distinct in docstrings)

3. TestDistinc -> TestDistinct

4. Maybe 'add' can be simplified a bit? Instead of looping 2x256 (list comp + enumerate) do a generator expression inside enumerate?

5. Missing blank line between docstring and __init__

6. (unrelated to the branch) it seems pretty obvious that adding a new type of metric needs to touch a lot of code. I particularly dislike constructs like this:

175 + if not name in self._metrics:
176 + metric = DistinctMetric(self.connection, name)
177 + self._metrics[name] = metric

Maybe something can be done to simplify it in the future.

review: Needs Fixing
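Point 4 amounts to passing a generator expression to enumerate, so each hash is computed lazily in a single pass and no intermediate list is built. A minimal Python 3 sketch of that shape, using hypothetical stand-in hash functions rather than the branch's SBoxHash:

```python
def trailing_zeros(n):
    """Number of zero bits below the lowest set bit (0 for n == 0)."""
    count = 0
    while n > 0 and (n & 1) == 0:
        n >>= 1
        count += 1
    return count


def add(buckets, hashers, when, item):
    """Stamp one bucket per hash function in a single pass over hashers."""
    n_buckets = len(buckets[0])
    # The generator yields each hash on demand; enumerate pairs it with
    # the index of the row of buckets that belongs to that hash function.
    for i, value in enumerate(h(item) for h in hashers):
        buckets[i][min(n_buckets - 1, trailing_zeros(value))] = when


# Hypothetical stand-in hash functions, for illustration only.
hashers = [lambda s: len(s), lambda s: len(s) * 4]
buckets = [[0] * 8 for _ in hashers]
add(buckets, hashers, when=42, item="abc")
```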
Lucio Torre (lucio.torre) wrote :

1- Because it won't matter much for the packet size and it's clearer. I can change it if you want; it just seemed better. I don't like it when we start accumulating code letters that are repeated in various parts of the code and carry no meaning.

2- fixed

3- fixed

4- Turned the hash loop into a generator, so it's just one pass of size n_hashes.

5- fixed.

6- There is a lot to fix there. I don't think it is that bad, as the metric will be created in the entry point for those measurements. Still, all the changes that need to be added for process, flush and the configurable processor seem a bit repetitive. But yes, it needs some work there; we should discuss it. I always saw this work as part of leaving behind statsd compatibility.
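One possible direction for point 6, offered only as a sketch: factor the repeated "create on first use" check into a single helper. MetricRegistry and FakeDistinctMetric below are hypothetical names invented for illustration and are not part of txstatsd; the example uses Python 3 syntax.

```python
class MetricRegistry(object):
    """Hypothetical get-or-create helper; not part of txstatsd."""

    def __init__(self):
        self._metrics = {}

    def get_or_create(self, name, factory):
        """Return the metric stored under name, building it on first use."""
        if name not in self._metrics:
            self._metrics[name] = factory(name)
        return self._metrics[name]


class FakeDistinctMetric(object):
    """Stand-in metric that records marked items in memory."""

    def __init__(self, name):
        self.name = name
        self.items = []

    def mark(self, item):
        self.items.append(item)


registry = MetricRegistry()
registry.get_or_create("gorets", FakeDistinctMetric).mark("one")
registry.get_or_create("gorets", FakeDistinctMetric).mark("two")
```

A real metric needs more constructor arguments (the diff passes a connection as well as a name), so the factory would likely be a closure or functools.partial rather than the bare class.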

Sidnei da Silva (sidnei) wrote :

Re: 1, please change it for the sake of consistency. While I agree it's not a lot relative to the whole packet, I think it's worth saving every byte we can.

The remaining changes look good. +1

Note you can set a commit message and flip the MP to approved, and tarmac will pick it up.

review: Approve
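To make the postfix discussion in point 1 concrete, here is a small Python 3 sketch of the '<name>:<item>|d' wire format that the tests in the diff exercise. The helper names distinct_message and parse_message are hypothetical, and the parser is a simplification rather than the processor's actual parsing code.

```python
def distinct_message(name, item, suffix="d"):
    """Format a distinct-metric payload as '<name>:<item>|<suffix>'."""
    return "%s:%s|%s" % (name, item, suffix)


def parse_message(message):
    """Split '<name>:<item>|<type>' into (name, item, type)."""
    name, rest = message.split(":", 1)
    item, metric_type = rest.split("|", 1)
    return name, item, metric_type


short = distinct_message("gorets", "one")
verbose = distinct_message("gorets", "one", suffix="distinct")
bytes_saved = len(verbose) - len(short)
```

With the one-letter postfix, each message is 7 bytes shorter than it would be with 'distinct'.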
Ubuntu One Server Tarmac Bot (ubuntuone-server-tarmac) wrote :

There are additional revisions which have not been approved in review. Please seek review and approval of these new revisions.

Sidnei da Silva (sidnei) :
review: Approve
Ubuntu One Server Tarmac Bot (ubuntuone-server-tarmac) wrote :

The attempt to merge lp:~lucio.torre/txstatsd/add-distinct-count into lp:txstatsd failed. Below is the output from the failed tests.

                                                                        [ERROR]
txstatsd.tests.metrics.test_histogrammetric
  TestHistogramReporterMetric
    test_histogram_of_numbers_1_through_10000 ... [OK]
    test_histogram_with_zero_recorded_values ... [OK]
txstatsd.tests.metrics.test_timermetric
  TestBlankTimerMetric
    test_count ... [OK]
    test_fifteen_minute_rate ... [OK]
    test_five_minute_rate ... [OK]
    test_max ... [OK]
    test_mean ... [OK]
    test_mean_rate ... [OK]
    test_min ... [OK]
    test_no_values ... [OK]
    test_one_minute_rate ... [OK]
    test_percentiles ... [OK]
    test_std_dev ... [OK]
  TestTimingSeriesEvents
    test_count ... [OK]
    test_max ... [OK]
    test_mean ... [OK]
    test_min ... [OK]
    test_percentiles ... [OK]
    test_std_dev ... [OK]
    test_values ... [OK]
txstatsd.tests.stats.test_ewma
  TestEwmaFifteenMinute
    test_eight_minutes ... [OK]
    test_eleven_minutes ... [OK]
    test_fifteen_minutes ... [OK]
    test_first_tick ... [OK]
    test_five_minutes ... [OK]
    test_four_minutes ... [OK]
    test_fourteen_minutes ... [OK]
    test_nine_minutes ... [OK]
    test_one_minute ... [OK]
    test_seven_minutes ... [OK]
    test_six_minutes ... [OK]
    test_ten_minutes ... [OK]
    test_thirteen_minutes ... [OK]
    test_three_minutes ... [OK]
    test_twelve_minutes ... ...

Preview Diff

1=== added file 'txstatsd/metrics/distinctmetric.py'
2--- txstatsd/metrics/distinctmetric.py 1970-01-01 00:00:00 +0000
3+++ txstatsd/metrics/distinctmetric.py 2011-10-26 16:55:26 +0000
4@@ -0,0 +1,153 @@
5+# Copyright (C) 2011 Canonical
6+# All Rights Reserved
7+"""
8+Implements a probabilistic distinct counter with sliding windows.
9+
10+Based on:
11+http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.12.7100
12+
13+And extended for sliding windows.
14+"""
15+import random
16+import time
17+
18+from string import Template
19+
20+from txstatsd.metrics.metric import Metric
21+
22+
23+class SBoxHash(object):
24+ """A very fast hash.
25+
26+ This class creates a random hash function that is very fast.
27+ Based on SBOXes. Not Crypto Strong.
28+
29+ Two instances of this class will hash differently.
30+ """
31+
32+ def __init__(self):
33+ self.table = [random.randint(0, 0xFFFFFFFF - 1) for i in range(256)]
34+
35+ def hash(self, data):
36+ value = 0
37+ for c in data:
38+ value = value ^ self.table[ord(c)]
39+ value = value * 3
40+ value = value & 0xFFFFFFFF
41+ return value
42+
43+
44+def hash(data):
45+ """Hash data using a random hasher."""
46+ p = SBoxHash()
47+ return p.hash(data)
48+
49+
50+def zeros(n):
51+ """Count the zeros to the right of the binary representation of n."""
52+ count = 0
53+ i = 0
54+ while True:
55+ v = (n >> i)
56+ if v <= 0:
57+ return count
58+ if v & 1:
59+ return count
60+ count += 1
61+ i += 1
62+ return count
63+
64+
65+class SlidingDistinctCounter(object):
66+ """A probabilistic distinct counter with sliding windows."""
67+
68+ def __init__(self, n_hashes, n_buckets):
69+ self.n_hashes = n_hashes
70+ self.n_buckets = n_buckets
71+
72+ self.hashes = [SBoxHash() for i in range(n_hashes)]
73+ self.buckets = [[0] * n_buckets for i in range(n_hashes)]
74+
75+ def add(self, when, item):
76+ hashes = (h.hash(item) for h in self.hashes)
77+ for i, value in enumerate(hashes):
78+ self.buckets[i][min(self.n_buckets - 1, zeros(value))] = when
79+
80+ def distinct(self, since=0):
81+ total = 0.0
82+ for i in range(self.n_hashes):
83+ least0 = 0
84+ for b in range(self.n_buckets):
85+ if self.buckets[i][b] <= since:
86+ break
87+ least0 += 1
88+ total += least0
89+ v = total / self.n_hashes
90+ return int((2 ** v) / 0.77351)
91+
92+
93+class DistinctMetric(Metric):
94+ """
95+ Keeps an estimate of the distinct numbers of items seen on various
96+ sliding windows of time.
97+ """
98+
99+ def mark(self, item):
100+ """Report this item was seen."""
101+ self.send("%s|d" % item)
102+
103+
104+class DistinctMetricReporter(object):
105+ """
106+ Keeps an estimate of the distinct numbers of items seen on various
107+ sliding windows of time.
108+ """
109+
110+ MESSAGE = (
111+ "$prefix%(key)s.count_1min %(count_1min)s %(timestamp)s\n"
112+ "$prefix%(key)s.count_1hour %(count_1hour)s %(timestamp)s\n"
113+ "$prefix%(key)s.count_1day %(count_1day)s %(timestamp)s\n"
114+ "$prefix%(key)s.count %(count)s %(timestamp)s\n"
115+ )
116+
117+ def __init__(self, name, wall_time_func=time.time, prefix=""):
118+ """Construct a metric we expect to be periodically updated.
119+
120+ @param name: Indicates what is being instrumented.
121+ @param wall_time_func: Function for obtaining wall time.
122+ @param prefix: If present, a string to prepend to the message
123+ composed when C{report} is called.
124+ """
125+ self.name = name
126+ self.wall_time_func = wall_time_func
127+ self.counter = SlidingDistinctCounter(32, 32)
128+ if prefix:
129+ prefix += '.'
130+ self.message = Template(DistinctMetricReporter.MESSAGE).substitute(
131+ prefix=prefix)
132+
133+ def count(self):
134+ return self.counter.distinct()
135+
136+ def count_1min(self, now):
137+ return self.counter.distinct(now - 60)
138+
139+ def count_1hour(self, now):
140+ return self.counter.distinct(now - 60 * 60)
141+
142+ def count_1day(self, now):
143+ return self.counter.distinct(now - 60 * 60 * 24)
144+
145+ def update(self, item):
146+ """Adds a seen item."""
147+ self.counter.add(self.wall_time_func(), item)
148+
149+ def report(self, timestamp):
150+ now = self.wall_time_func()
151+ return self.message % {
152+ "key": self.name,
153+ "count": self.count(),
154+ "count_1min": self.count_1min(now),
155+ "count_1hour": self.count_1hour(now),
156+ "count_1day": self.count_1day(now),
157+ "timestamp": timestamp}
158
159=== modified file 'txstatsd/metrics/metrics.py'
160--- txstatsd/metrics/metrics.py 2011-08-25 17:41:37 +0000
161+++ txstatsd/metrics/metrics.py 2011-10-26 16:55:26 +0000
162@@ -1,6 +1,7 @@
163
164 from txstatsd.metrics.gaugemetric import GaugeMetric
165 from txstatsd.metrics.metermetric import MeterMetric
166+from txstatsd.metrics.distinctmetric import DistinctMetric
167 from txstatsd.metrics.metric import Metric
168
169
170@@ -69,6 +70,13 @@
171 self._metrics[name] = metric
172 self._metrics[name].send("%s|ms" % duration)
173
174+ def distinct(self, name, item):
175+ name = self.fully_qualify_name(name)
176+ if not name in self._metrics:
177+ metric = DistinctMetric(self.connection, name)
178+ self._metrics[name] = metric
179+ self._metrics[name].mark(item)
180+
181 def clear(self, name):
182 """Allow the metric to re-initialize its internal state."""
183 name = self.fully_qualify_name(name)
184
185=== modified file 'txstatsd/server/configurableprocessor.py'
186--- txstatsd/server/configurableprocessor.py 2011-10-10 16:08:12 +0000
187+++ txstatsd/server/configurableprocessor.py 2011-10-26 16:55:26 +0000
188@@ -71,6 +71,13 @@
189 self.meter_metrics[key] = metric
190 self.meter_metrics[key].mark(value)
191
192+ def compose_distinct_metric(self, key, item):
193+ if not key in self.distinct_metrics:
194+ metric = DistinctMetricReporter(key, self.time_function,
195+ prefix=self.message_prefix)
196+ self.distinct_metrics[key] = metric
197+ self.distinct_metrics[key].update(item)
198+
199 def flush_counter_metrics(self, interval, timestamp):
200 metrics = []
201 events = 0
202
203=== modified file 'txstatsd/server/processor.py'
204--- txstatsd/server/processor.py 2011-10-10 16:08:12 +0000
205+++ txstatsd/server/processor.py 2011-10-26 16:55:26 +0000
206@@ -6,6 +6,7 @@
207 from twisted.python import log
208
209 from txstatsd.metrics.metermetric import MeterMetricReporter
210+from txstatsd.metrics.distinctmetric import DistinctMetricReporter
211
212
213 SPACES = re.compile("\s+")
214@@ -60,6 +61,7 @@
215 self.counter_metrics = {}
216 self.gauge_metrics = deque()
217 self.meter_metrics = {}
218+ self.distinct_metrics = {}
219
220 def fail(self, message):
221 """Log and discard malformed message."""
222@@ -91,9 +93,21 @@
223 self.process_gauge_metric(key, fields[0], message)
224 elif fields[1] == "m":
225 self.process_meter_metric(key, fields[0], message)
226+ elif fields[1] == "d":
227+ self.process_distinct_metric(key, fields[0], message)
228 else:
229 return self.fail(message)
230
231+ def process_distinct_metric(self, key, item, message):
232+ self.compose_distinct_metric(key, str(item))
233+
234+ def compose_distinct_metric(self, key, item):
235+ if not key in self.distinct_metrics:
236+ metric = DistinctMetricReporter(key, self.time_function,
237+ prefix="stats.distinct")
238+ self.distinct_metrics[key] = metric
239+ self.distinct_metrics[key].update(item)
240+
241 def process_timer_metric(self, key, duration, message):
242 try:
243 duration = float(duration)
244@@ -192,6 +206,11 @@
245 messages.extend(meter_metrics)
246 num_stats += events
247
248+ distinct_metrics, events = self.flush_distinct_metrics(timestamp)
249+ if events > 0:
250+ messages.extend(distinct_metrics)
251+ num_stats += events
252+
253 self.flush_metrics_summary(messages, num_stats, timestamp)
254 return messages
255
256@@ -278,6 +297,16 @@
257
258 return (metrics, events)
259
260+ def flush_distinct_metrics(self, timestamp):
261+ metrics = []
262+ events = 0
263+ for metric in self.distinct_metrics.itervalues():
264+ message = metric.report(timestamp)
265+ metrics.append(message)
266+ events += 1
267+
268+ return (metrics, events)
269+
270 def flush_metrics_summary(self, messages, num_stats, timestamp):
271 messages.append("statsd.numStats %s %s\n" % (num_stats, timestamp))
272
273
274=== added file 'txstatsd/tests/metrics/test_distinct.py'
275--- txstatsd/tests/metrics/test_distinct.py 1970-01-01 00:00:00 +0000
276+++ txstatsd/tests/metrics/test_distinct.py 2011-10-26 16:55:26 +0000
277@@ -0,0 +1,80 @@
278+# Copyright (C) 2011 Canonical
279+# All Rights Reserved
280+import random
281+
282+from scipy.stats import chi2
283+
284+from twisted.trial.unittest import TestCase
285+
286+import txstatsd.metrics.distinctmetric as distinct
287+
288+
289+class TestHash(TestCase):
290+ def test_hash_chars(self):
291+ "For one table, all chars map to different chars"
292+ results = set()
293+ for c in range(256):
294+ random.seed(1)
295+ h = distinct.hash(chr(c))
296+ results.add(h)
297+ self.assertEquals(len(results), 256)
298+
299+ def test_chi_square(self):
300+ N = 10000
301+
302+ for (bits, buckets) in [(-1, 1024), (24, 256),
303+ (16, 256), (8, 256), (0, 256)]:
304+ bins = [0] * buckets
305+ for i in range(N):
306+ v = distinct.hash(str(i))
307+ if bits < 0:
308+ bin = v / (0xFFFFFFFF / buckets)
309+ else:
310+ bin = (v >> bits) & 0xFF
311+ bins[bin] += 1
312+ value = sum(((x - N / buckets) ** 2) / (N / buckets) for x in bins)
313+ pval = chi2.cdf(value, N)
314+ if pval > 0.5:
315+ print bins, pval
316+ self.assertTrue(pval < 0.5, "bits %s, pval == %s" % (bits, pval))
317+ test_chi_square.skip = "Takes too long to run every time."
318+
319+
320+class TestZeros(TestCase):
321+ def test_zeros(self):
322+ self.assertEquals(distinct.zeros(1), 0)
323+ self.assertEquals(distinct.zeros(2), 1)
324+ self.assertEquals(distinct.zeros(4), 2)
325+ self.assertEquals(distinct.zeros(5), 0)
326+ self.assertEquals(distinct.zeros(8), 3)
327+ self.assertEquals(distinct.zeros(9), 0)
328+
329+class TestDistinct(TestCase):
330+ def test_all(self):
331+ random.seed(1)
332+
333+ for r in [1000, 10000]:
334+ cd = distinct.SlidingDistinctCounter(32, 32)
335+ for i in range(r):
336+ cd.add(1, str(i))
337+ error = abs(cd.distinct() - r)
338+ self.assertTrue(error < 0.15 * r)
339+
340+
341+class TestDistinctMetricReporter(TestCase):
342+ def test_reports(self):
343+ random.seed(1)
344+ _wall_time = [0]
345+ def _time():
346+ return _wall_time[0]
347+
348+ dmr = distinct.DistinctMetricReporter("test", wall_time_func=_time)
349+ for i in range(3000):
350+ _wall_time[0] = i * 50
351+ dmr.update(str(i))
352+ now = _time()
353+ self.assertTrue(abs(dmr.count() - 3000) < 600)
354+ self.assertTrue(abs(dmr.count_1min(now) - 1) < 2)
355+ self.assertTrue(abs(dmr.count_1hour(now) - 72) < 15)
356+ self.assertTrue(abs(dmr.count_1day(now) - 1728) < 500)
357+ self.assertTrue("count_1hour" in dmr.report(now))
358
359=== modified file 'txstatsd/tests/test_processor.py'
360--- txstatsd/tests/test_processor.py 2011-10-07 19:49:25 +0000
361+++ txstatsd/tests/test_processor.py 2011-10-26 16:55:26 +0000
362@@ -63,6 +63,16 @@
363 [9.6, 'gorets'],
364 self.processor.gauge_metrics.pop())
365
366+ def test_receive_distinct_metric(self):
367+ """
368+ A distinct metric message takes the form:
369+ '<name>:<item>|d'.
370+ 'd' indicates this is a distinct metric message.
371+ """
372+ self.processor.process("gorets:one|d")
373+ self.assertEqual(1, len(self.processor.distinct_metrics))
374+ self.assertTrue(self.processor.distinct_metrics["gorets"].count() > 0)
375+
376 def test_receive_message_no_fields(self):
377 """
378 If a timer message has no fields, it is logged and discarded.
379@@ -227,6 +237,22 @@
380 "statsd.numStats 1 42", messages[1].splitlines()[0])
381 self.assertEqual(0, len(self.processor.gauge_metrics))
382
383+ def test_flush_distinct_metric(self):
384+ """
385+ Test the correct rendering of the Graphite report for
386+ a distinct metric.
387+ """
388+
389+ self.processor.process("gorets:item|d")
390+
391+ messages = self.processor.flush()
392+ self.assertEqual(2, len(messages))
393+ metrics = messages[0]
394+ self.assertTrue("stats.distinct.gorets.count " in metrics)
395+ self.assertTrue("stats.distinct.gorets.count_1hour" in metrics)
396+ self.assertTrue("stats.distinct.gorets.count_1min" in metrics)
397+ self.assertTrue("stats.distinct.gorets.count_1day" in metrics)
398+
399
400 class FlushMeterMetricMessagesTest(TestCase):
401
