Merge lp:~lucio.torre/txstatsd/add-distinct-db-plugin into lp:~txstatsd-dev/txstatsd/distinct-plugin
- add-distinct-db-plugin
- Merge into distinct-plugin
Proposed by
Lucio Torre
Status: | Merged |
---|---|
Approved by: | Sidnei da Silva |
Approved revision: | 6 |
Merged at revision: | 2 |
Proposed branch: | lp:~lucio.torre/txstatsd/add-distinct-db-plugin |
Merge into: | lp:~txstatsd-dev/txstatsd/distinct-plugin |
Diff against target: |
489 lines (+353/-55) 10 files modified
README (+10/-0) bin/schema.sql (+3/-0) bin/start-database.sh (+63/-0) bin/stop-database.sh (+22/-0) distinctdb/distinctmetric.py (+73/-33) distinctdb/tests/test_distinct.py (+147/-0) distinctdb/version.py (+1/-1) setup.py (+1/-1) twisted/plugins/distinct_plugin.py (+0/-20) twisted/plugins/distinctdbplugin.py (+33/-0) |
To merge this branch: | bzr merge lp:~lucio.torre/txstatsd/add-distinct-db-plugin |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Sidnei da Silva | Approve | ||
Review via email: mp+85031@code.launchpad.net |
Commit message
Description of the change
talk to the database to save the buckets of info
To post a comment you must log in.
Revision history for this message
Sidnei da Silva (sidnei) wrote : | # |
Revision history for this message
Sidnei da Silva (sidnei) : | # |
review:
Needs Fixing
- 5. By Lucio Torre
-
remove while, use INSERT .. RETURNING
- 6. By Lucio Torre
-
added test files
Revision history for this message
Sidnei da Silva (sidnei) wrote : | # |
Looks good now. My last wish is a Makefile with a 'test' target that performs the steps outlined in the README. So that I can add this as a distinct (no pun intended) project in tarmac and run the tests before merge. You can choose to ignore me though.
review:
Approve
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === added file 'README' |
2 | --- README 1970-01-01 00:00:00 +0000 |
3 | +++ README 2011-12-09 20:08:25 +0000 |
4 | @@ -0,0 +1,10 @@ |
5 | +To test: |
6 | +./bin/start-database.sh |
7 | +psql -h `pwd`/tmp/db1 -d distinct < bin/schema.sql |
8 | + |
9 | +Then: |
10 | +trial distinctdb |
11 | + |
12 | +When done: |
13 | +./bin/stop-database.sh |
14 | + |
15 | |
16 | === added directory 'bin' |
17 | === added file 'bin/schema.sql' |
18 | --- bin/schema.sql 1970-01-01 00:00:00 +0000 |
19 | +++ bin/schema.sql 2011-12-09 20:08:25 +0000 |
20 | @@ -0,0 +1,3 @@ |
21 | +CREATE TABLE paths (id SERIAL PRIMARY KEY NOT NULL, path TEXT NOT NULL UNIQUE); |
22 | +CREATE TABLE points (path_id INTEGER, bucket INTEGER, value TEXT, count INTEGER); |
23 | +CREATE INDEX points_idx ON points (path_id, bucket); |
24 | |
25 | === added file 'bin/start-database.sh' |
26 | --- bin/start-database.sh 1970-01-01 00:00:00 +0000 |
27 | +++ bin/start-database.sh 2011-12-09 20:08:25 +0000 |
28 | @@ -0,0 +1,63 @@ |
29 | +#! /bin/bash |
30 | + |
31 | +ROOTDIR=${ROOTDIR:-`bzr root`} |
32 | +if [ ! -d "$ROOTDIR" ]; then |
33 | + echo "ROOTDIR '$ROOTDIR' doesn't exist" >&2 |
34 | + exit 1 |
35 | +fi |
36 | + |
37 | +DATABASES=" |
38 | +distinct |
39 | +" |
40 | + |
41 | +function setup_database() { |
42 | + local TESTDIR=$1 |
43 | + |
44 | + echo "## Starting postgres in $TESTDIR ##" |
45 | + mkdir -p "$TESTDIR/data" |
46 | + chmod 700 "$TESTDIR/data" |
47 | + |
48 | + export PGHOST="$TESTDIR" |
49 | + export PGDATA="$TESTDIR/data" |
50 | + if [ -d /usr/lib/postgresql/8.4 ]; then |
51 | + export PGBINDIR=/usr/lib/postgresql/8.4/bin |
52 | + elif [ -d /usr/lib/postgresql/8.3 ]; then |
53 | + export PGBINDIR=/usr/lib/postgresql/8.3/bin |
54 | + else |
55 | + echo "Cannot find valid parent for PGBINDIR" |
56 | + fi |
57 | + $PGBINDIR/initdb -E UNICODE -D $PGDATA |
58 | + # set up the database options file |
59 | + if [ ! -e $PGDATA/postgresql.conf ]; then |
60 | + echo "PostgreSQL data directory apparently didn't init" |
61 | + else |
62 | + ( |
63 | + cat <<EOF |
64 | +search_path='\$user,public,ts2' |
65 | +add_missing_from=false |
66 | +log_statement='all' |
67 | +log_line_prefix='[%m] %q%u@%d %c ' |
68 | +fsync = off |
69 | +EOF |
70 | + ) > $PGDATA/postgresql.conf |
71 | + fi |
72 | + $PGBINDIR/initdb -A trust &>/dev/null |
73 | + $PGBINDIR/pg_ctl start -w -D $TESTDIR/data -l $TESTDIR/postgres.log -o "-F -k $TESTDIR -h ''" |
74 | + for db in $DATABASES; do |
75 | + $PGBINDIR/createdb --encoding UNICODE "$db" &>/dev/null |
76 | + $PGBINDIR/createlang plpgsql "$db" |
77 | + done |
78 | + $PGBINDIR/createuser --superuser --createdb "postgres" &>/dev/null |
79 | + # create the additional users we need via a psql script |
80 | + $PGBINDIR/psql -U postgres template1 <<EOF |
81 | +CREATE ROLE client INHERIT; |
82 | + |
83 | +CREATE USER client IN ROLE client; |
84 | +EOF |
85 | + echo "To set your environment so psql will connect to this DB instance type:" |
86 | + echo " export PGHOST=$TESTDIR" |
87 | + echo "## Done. ##" |
88 | + echo -n host=$TESTDIR dbname=distinct > $ROOTDIR/tmp/pg.dsn |
89 | +} |
90 | + |
91 | +setup_database $ROOTDIR/tmp/db1 |
92 | |
93 | === added file 'bin/stop-database.sh' |
94 | --- bin/stop-database.sh 1970-01-01 00:00:00 +0000 |
95 | +++ bin/stop-database.sh 2011-12-09 20:08:25 +0000 |
96 | @@ -0,0 +1,22 @@ |
97 | +#! /bin/bash |
98 | + |
99 | +ROOTDIR=${ROOTDIR:-`bzr root`} |
100 | +if [ ! -d "$ROOTDIR" ]; then |
101 | + echo "ROOTDIR '$ROOTDIR' doesn't exist" >&2 |
102 | + exit 1 |
103 | +fi |
104 | + |
105 | +if [ -d /usr/lib/postgresql/8.4 ]; then |
106 | + PGBINDIR=/usr/lib/postgresql/8.4/bin |
107 | +elif [ -d /usr/lib/postgresql/8.3 ]; then |
108 | + PGBINDIR=/usr/lib/postgresql/8.3/bin |
109 | +else |
110 | + echo "Cannot find valid parent for PGBINDIR" |
111 | +fi |
112 | + |
113 | +# setting PGDATA tells pg_ctl which DB to talk to |
114 | +export PGDATA=$ROOTDIR/tmp/db1/data/ |
115 | +$PGBINDIR/pg_ctl status > /dev/null |
116 | +if [ $? = 0 ]; then |
117 | + $PGBINDIR/pg_ctl stop -t 60 -w -m fast |
118 | +fi |
119 | |
120 | === renamed directory 'distinctplugin' => 'distinctdb' |
121 | === modified file 'distinctdb/distinctmetric.py' |
122 | --- distinctplugin/distinctmetric.py 2011-12-06 19:39:21 +0000 |
123 | +++ distinctdb/distinctmetric.py 2011-12-09 20:08:25 +0000 |
124 | @@ -1,17 +1,23 @@ |
125 | +import time |
126 | + |
127 | +import psycopg2 |
128 | + |
129 | +from zope.interface import implements |
130 | +from twisted.internet.threads import deferToThread |
131 | +from txstatsd.itxstatsd import IMetric |
132 | + |
133 | +ONEDAY = 60 * 60 * 24 |
134 | + |
135 | + |
136 | class DistinctMetricReporter(object): |
137 | """ |
138 | - Keeps an estimate of the distinct numbers of items seen on various |
139 | - sliding windows of time. |
140 | + Keeps an mesurement of the distinct numbers of items seen and the times |
141 | + it has seen each one. |
142 | """ |
143 | implements(IMetric) |
144 | |
145 | - MESSAGE = ( |
146 | - "$prefix%(key)s.count_1min %(count_1min)s %(timestamp)s\n" |
147 | - "$prefix%(key)s.count_1hour %(count_1hour)s %(timestamp)s\n" |
148 | - "$prefix%(key)s.count_1day %(count_1day)s %(timestamp)s\n" |
149 | - "$prefix%(key)s.count %(count)s %(timestamp)s\n") |
150 | - |
151 | - def __init__(self, name, wall_time_func=time.time, prefix=""): |
152 | + def __init__(self, name, wall_time_func=time.time, prefix="", |
153 | + bucket_size=ONEDAY, dsn=None): |
154 | """Construct a metric we expect to be periodically updated. |
155 | |
156 | @param name: Indicates what is being instrumented. |
157 | @@ -21,36 +27,70 @@ |
158 | """ |
159 | self.name = name |
160 | self.wall_time_func = wall_time_func |
161 | - self.counter = SlidingDistinctCounter(32, 32) |
162 | if prefix: |
163 | prefix += '.' |
164 | - self.message = Template(DistinctMetricReporter.MESSAGE).substitute( |
165 | - prefix=prefix) |
166 | - |
167 | - def count(self): |
168 | - return self.counter.distinct() |
169 | - |
170 | - def count_1min(self, now): |
171 | - return self.counter.distinct(now - 60) |
172 | - |
173 | - def count_1hour(self, now): |
174 | - return self.counter.distinct(now - 60 * 60) |
175 | - |
176 | - def count_1day(self, now): |
177 | - return self.counter.distinct(now - 60 * 60 * 24) |
178 | + self.prefix = prefix |
179 | + self.bucket_size = bucket_size |
180 | + self.dsn = dsn |
181 | + self.metric_id = None |
182 | + self.build_bucket() |
183 | + |
184 | + def build_bucket(self, timestamp=None): |
185 | + self.max = 0 |
186 | + self.bucket = {} |
187 | + self.bucket_no = self.get_bucket_no(timestamp) |
188 | + |
189 | + def get_bucket_no(self, timestamp=None): |
190 | + if timestamp is None: |
191 | + timestamp = self.wall_time_func() |
192 | + return int(timestamp / (self.bucket_size)) |
193 | |
194 | def process(self, fields): |
195 | self.update(fields[0]) |
196 | |
197 | def update(self, item): |
198 | - self.counter.add(self.wall_time_func(), item) |
199 | + value = self.bucket.get(item, 0) + 1 |
200 | + |
201 | + self.bucket[item] = value |
202 | + if value > self.max: |
203 | + self.max = value |
204 | + |
205 | + def _save_bucket(self, bucket, bucket_no): |
206 | + path = self.prefix + self.name |
207 | + if self.metric_id is None: |
208 | + c = psycopg2.connect(self.dsn) |
209 | + cr = c.cursor() |
210 | + cr.execute("SELECT * FROM paths WHERE path = %s", (path,)) |
211 | + row = cr.fetchone() |
212 | + if row is None: |
213 | + cr.execute("INSERT INTO paths (path) VALUES (%s) " |
214 | + "RETURNING (id)", (path,)) |
215 | + row = cr.fetchone() |
216 | + cr.execute("commit") |
217 | + |
218 | + self.metric_id = row[0] |
219 | + |
220 | + for i, (k, v) in enumerate(bucket.iteritems()): |
221 | + cr.execute("INSERT INTO points (path_id, bucket, value, count) " |
222 | + "VALUES (%s, %s, %s, %s)", (self.metric_id, bucket_no, |
223 | + k, v)) |
224 | + if i % 1000 == 0: |
225 | + cr.execute("commit") |
226 | + cr.execute("commit") |
227 | + |
228 | + def save_bucket(self, bucket, bucket_no): |
229 | + if self.dsn is not None: |
230 | + deferToThread(self._save_bucket, bucket, bucket_no) |
231 | |
232 | def flush(self, interval, timestamp): |
233 | - now = self.wall_time_func() |
234 | - return self.message % { |
235 | - "key": self.name, |
236 | - "count": self.count(), |
237 | - "count_1min": self.count_1min(now), |
238 | - "count_1hour": self.count_1hour(now), |
239 | - "count_1day": self.count_1day(now), |
240 | - "timestamp": timestamp} |
241 | + current_bucket = self.get_bucket_no(timestamp) |
242 | + if current_bucket != self.bucket_no: |
243 | + self.save_bucket(self.bucket, self.bucket_no) |
244 | + self.build_bucket(timestamp) |
245 | + |
246 | + metrics = [] |
247 | + items = {".count": len(self.bucket), |
248 | + ".max": self.max} |
249 | + for item, value in items.iteritems(): |
250 | + metrics.append((self.prefix + self.name + item, value, timestamp)) |
251 | + return metrics |
252 | |
253 | === added directory 'distinctdb/tests' |
254 | === added file 'distinctdb/tests/__init__.py' |
255 | === added file 'distinctdb/tests/test_distinct.py' |
256 | --- distinctdb/tests/test_distinct.py 1970-01-01 00:00:00 +0000 |
257 | +++ distinctdb/tests/test_distinct.py 2011-12-09 20:08:25 +0000 |
258 | @@ -0,0 +1,147 @@ |
259 | +# Copyright (C) 2011 Canonical |
260 | +# All Rights Reserved |
261 | + |
262 | +import ConfigParser |
263 | +from cStringIO import StringIO |
264 | +import os |
265 | +import time |
266 | +import subprocess |
267 | + |
268 | +import psycopg2 |
269 | + |
270 | +from twisted.trial.unittest import TestCase |
271 | +from twisted.plugin import getPlugins |
272 | +from twisted.plugins import distinctdbplugin |
273 | +from txstatsd.itxstatsd import IMetricFactory |
274 | +from txstatsd import service |
275 | + |
276 | +from distinctdb import distinctmetric as distinct |
277 | + |
278 | + |
279 | +class TestDistinctMetricReporter(TestCase): |
280 | + |
281 | + def test_get_bucket_no(self): |
282 | + _wall_time = [0] |
283 | + |
284 | + def _time(): |
285 | + return _wall_time[0] |
286 | + |
287 | + dmr = distinct.DistinctMetricReporter("test", wall_time_func=_time) |
288 | + self.assertEquals(dmr.get_bucket_no(), 0) |
289 | + _wall_time = [60 * 60 * 24 + 1] |
290 | + dmr.update("one") |
291 | + self.assertEquals(dmr.get_bucket_no(), 1) |
292 | + |
293 | + def test_max(self): |
294 | + _wall_time = [0] |
295 | + |
296 | + def _time(): |
297 | + return _wall_time[0] |
298 | + |
299 | + dmr = distinct.DistinctMetricReporter("test", wall_time_func=_time) |
300 | + self.assertEquals(dmr.get_bucket_no(), 0) |
301 | + self.assertEquals(dmr.max, 0) |
302 | + dmr.update("one") |
303 | + dmr.update("one") |
304 | + dmr.update("two") |
305 | + self.assertEquals(dmr.max, 2) |
306 | + dmr.flush(1, 60 * 60 * 24 + 1) |
307 | + dmr.update("one") |
308 | + self.assertEquals(dmr.max, 1) |
309 | + |
310 | + def test_reports(self): |
311 | + _wall_time = [0] |
312 | + |
313 | + def _time(): |
314 | + return _wall_time[0] |
315 | + |
316 | + result = {} |
317 | + |
318 | + dmr = distinct.DistinctMetricReporter("test", wall_time_func=_time) |
319 | + |
320 | + def save(b, b_no): |
321 | + result["bucket"] = b |
322 | + result["bucket_no"] = b_no |
323 | + dmr.save_bucket = save |
324 | + dmr.update("one") |
325 | + dmr.update("one") |
326 | + dmr.update("two") |
327 | + day = 60 * 60 * 24 + 1 |
328 | + dmr.flush(1, day) |
329 | + dmr.update("three") |
330 | + self.assertEquals(result, |
331 | + {"bucket": {"one": 2, "two": 1}, "bucket_no": 0}) |
332 | + self.assertEquals(dmr.flush(1, day), |
333 | + [("test.max", 1, day), ("test.count", 1, day)]) |
334 | + |
335 | + def test_configure(self): |
336 | + class TestOptions(service.OptionsGlue): |
337 | + optParameters = [["test", "t", "default", "help"]] |
338 | + config_section = "statsd" |
339 | + |
340 | + o = TestOptions() |
341 | + config_file = ConfigParser.RawConfigParser() |
342 | + config_file.readfp(StringIO("[statsd]\n\n[plugin_distinctdb]\n" |
343 | + "dsn = dbdsn\nbucket_size = 100")) |
344 | + o.configure(config_file) |
345 | + dmf = distinctdbplugin.DistinctMetricFactory() |
346 | + dmf.configure(o) |
347 | + dmr = dmf.build_metric("foo", "bar", time.time) |
348 | + self.assertEquals(dmr.bucket_size, 100) |
349 | + self.assertEquals(dmr.dsn, "dbdsn") |
350 | + |
351 | + |
352 | +class TestPlugin(TestCase): |
353 | + |
354 | + def test_factory(self): |
355 | + self.assertTrue(distinctdbplugin.distinct_metric_factory in \ |
356 | + list(getPlugins(IMetricFactory))) |
357 | + |
358 | + |
359 | +class TestDatabase(TestCase): |
360 | + |
361 | + def setUp(self): |
362 | + rootdir = subprocess.check_output(["bzr", "root"]).strip() |
363 | + dsn_file = os.path.join(rootdir, "tmp", "pg.dsn") |
364 | + self.dsn = open(dsn_file).read() |
365 | + self.conn = psycopg2.connect(self.dsn) |
366 | + |
367 | + def tearDown(self): |
368 | + cr = self.conn.cursor() |
369 | + cr.execute("rollback") |
370 | + cr.execute("DELETE FROM paths") |
371 | + cr.execute("DELETE FROM points") |
372 | + cr.execute("commit") |
373 | + |
374 | + def test_connect(self): |
375 | + cr = self.conn.cursor() |
376 | + cr.execute("SELECT 0") |
377 | + result = cr.fetchall() |
378 | + self.assertTrue(result, [(0,)]) |
379 | + |
380 | + def test_create_metric_id(self): |
381 | + dmr = distinct.DistinctMetricReporter("test", dsn=self.dsn) |
382 | + dmr._save_bucket({}, 0) |
383 | + cr = self.conn.cursor() |
384 | + cr.execute("SELECT * FROM paths WHERE path = 'test'") |
385 | + cr.execute("SELECT * FROM paths") |
386 | + self.assertEquals(len(cr.fetchall()), 1) |
387 | + |
388 | + def test_find_saved_data(self): |
389 | + dmr = distinct.DistinctMetricReporter("test", dsn=self.dsn) |
390 | + dmr.update("one") |
391 | + dmr.update("one") |
392 | + dmr.update("two") |
393 | + dmr._save_bucket(dmr.bucket, 0) |
394 | + cr = self.conn.cursor() |
395 | + cr.execute("SELECT * FROM points ORDER BY value") |
396 | + rows = cr.fetchall() |
397 | + self.assertEquals(rows, [(dmr.metric_id, 0, "one", 2), |
398 | + (dmr.metric_id, 0, "two", 1)]) |
399 | + |
400 | + def test_load_metric_id(self): |
401 | + dmr = distinct.DistinctMetricReporter("test", dsn=self.dsn) |
402 | + dmr._save_bucket({}, 0) |
403 | + dmr2 = distinct.DistinctMetricReporter("test", dsn=self.dsn) |
404 | + dmr2._save_bucket({}, 0) |
405 | + self.assertEquals(dmr.metric_id, dmr2.metric_id) |
406 | |
407 | === modified file 'distinctdb/version.py' |
408 | --- distinctplugin/version.py 2011-12-06 19:39:21 +0000 |
409 | +++ distinctdb/version.py 2011-12-09 20:08:25 +0000 |
410 | @@ -1,1 +1,1 @@ |
411 | -distinctplugin = "0.0.1" |
412 | \ No newline at end of file |
413 | +distinctplugin = "0.0.1" |
414 | |
415 | === modified file 'setup.py' |
416 | --- setup.py 2011-12-06 19:39:21 +0000 |
417 | +++ setup.py 2011-12-09 20:08:25 +0000 |
418 | @@ -41,7 +41,7 @@ |
419 | list(getPlugins(IPlugin)) |
420 | |
421 | setup( |
422 | - cmdclass = {'install': TxPluginInstaller}, |
423 | + cmdclass={'install': TxPluginInstaller}, |
424 | name="distinctplugin", |
425 | version=version.distinctplugin, |
426 | description="A txstatsd plugin for distinct counts", |
427 | |
428 | === removed file 'twisted/plugins/distinct_plugin.py' |
429 | --- twisted/plugins/distinct_plugin.py 2011-12-06 19:39:21 +0000 |
430 | +++ twisted/plugins/distinct_plugin.py 1970-01-01 00:00:00 +0000 |
431 | @@ -1,20 +0,0 @@ |
432 | -from zope.interface import implements |
433 | - |
434 | -from twisted.plugin import IPlugin |
435 | -from txstatsd.itxstatsd import IMetricFactory |
436 | -from distinctcount.distinctmetric import DistinctMetricReporter |
437 | - |
438 | -class DistinctMetricFactory(object): |
439 | - implements(IMetricFactory, IPlugin) |
440 | - |
441 | - name = "distinct" |
442 | - metric_type = "d" |
443 | - |
444 | - def build_metric(self, prefix, name, wall_time_func=None): |
445 | - return DistinctMetricReporter(name, prefix=prefix, |
446 | - wall_time_func=wall_time_func) |
447 | - |
448 | - def configure(self, options): |
449 | - pass |
450 | - |
451 | -distinct_metric_factory = DistinctMetricFactory() |
452 | |
453 | === added file 'twisted/plugins/distinctdbplugin.py' |
454 | --- twisted/plugins/distinctdbplugin.py 1970-01-01 00:00:00 +0000 |
455 | +++ twisted/plugins/distinctdbplugin.py 2011-12-09 20:08:25 +0000 |
456 | @@ -0,0 +1,33 @@ |
457 | +from zope.interface import implements |
458 | + |
459 | +from twisted.plugin import IPlugin |
460 | +from txstatsd.itxstatsd import IMetricFactory |
461 | +from distinctdb.distinctmetric import DistinctMetricReporter, ONEDAY |
462 | + |
463 | + |
464 | +class DistinctMetricFactory(object): |
465 | + implements(IMetricFactory, IPlugin) |
466 | + |
467 | + name = "distinct" |
468 | + metric_type = "d" |
469 | + |
470 | + bucket_size = None |
471 | + dsn = None |
472 | + metric_ids = None |
473 | + |
474 | + def build_metric(self, prefix, name, wall_time_func=None): |
475 | + return DistinctMetricReporter(name, prefix=prefix, |
476 | + wall_time_func=wall_time_func, |
477 | + bucket_size=self.bucket_size, |
478 | + dsn=self.dsn) |
479 | + |
480 | + def configure(self, options): |
481 | + self.section = dict(options.get("plugin_distinctdb", {})) |
482 | + try: |
483 | + self.bucket_size = int(self.section.get("bucket_size", ONEDAY)) |
484 | + except ValueError: |
485 | + self.bucket_size = ONEDAY |
486 | + |
487 | + self.dsn = self.section.get("dsn", None) |
488 | + |
489 | +distinct_metric_factory = DistinctMetricFactory() |
My biggest concern here is that if this fails to connect to postgres, it will go into an infinite loop, with no logging at all and no sleep() between retries, so it can become a very tight loop consuming CPU with no visibility.
It should limit the number of retries and at least have some logging if postgres cannot be reached.