Merge lp:~lucio.torre/txstatsd/add-distinct-db-plugin into lp:~txstatsd-dev/txstatsd/distinct-plugin

Proposed by Lucio Torre
Status: Merged
Approved by: Sidnei da Silva
Approved revision: 6
Merged at revision: 2
Proposed branch: lp:~lucio.torre/txstatsd/add-distinct-db-plugin
Merge into: lp:~txstatsd-dev/txstatsd/distinct-plugin
Diff against target: 489 lines (+353/-55)
10 files modified
README (+10/-0)
bin/schema.sql (+3/-0)
bin/start-database.sh (+63/-0)
bin/stop-database.sh (+22/-0)
distinctdb/distinctmetric.py (+73/-33)
distinctdb/tests/test_distinct.py (+147/-0)
distinctdb/version.py (+1/-1)
setup.py (+1/-1)
twisted/plugins/distinct_plugin.py (+0/-20)
twisted/plugins/distinctdbplugin.py (+33/-0)
To merge this branch: bzr merge lp:~lucio.torre/txstatsd/add-distinct-db-plugin
Reviewer: Sidnei da Silva (Status: Approve)
Review via email: mp+85031@code.launchpad.net

Description of the change

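Talk to the database to save the buckets of info: each metric now counts how often each distinct item is seen within a time bucket (one day by default), and when a flush crosses into a new bucket the finished one is written to PostgreSQL.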
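As a rough illustration of the bucketing described here (a sketch, not the branch's own code), the Python 3 class below counts sightings per distinct item inside epoch-aligned buckets of `bucket_size` seconds and hands back the finished bucket when a flush lands in a different bucket. `DistinctBuckets` and its `now` parameter are hypothetical; in the branch the same logic is spread across `DistinctMetricReporter.update()`, `get_bucket_no()` and `flush()`, and the finished bucket goes to `save_bucket()`. Unlike the diff, it computes `max` on demand instead of tracking it in `update()`.

```python
from collections import Counter

ONEDAY = 60 * 60 * 24  # same default bucket size as the branch


class DistinctBuckets:
    """Hypothetical stand-alone model of the per-bucket distinct counting."""

    def __init__(self, bucket_size=ONEDAY, now=0):
        self.bucket_size = bucket_size
        self.bucket_no = self.bucket_for(now)
        self.counts = Counter()

    def bucket_for(self, timestamp):
        # Epoch-aligned buckets: [0, size) -> 0, [size, 2 * size) -> 1, ...
        return int(timestamp // self.bucket_size)

    def update(self, item):
        # One more sighting of `item` in the current bucket.
        self.counts[item] += 1

    def flush(self, timestamp):
        """Return (finished, stats).

        finished is (bucket_no, {item: count}) when `timestamp` falls in a
        different bucket than the current one (which is then reset), else
        None. stats describes the bucket that is current after the check.
        """
        finished = None
        current = self.bucket_for(timestamp)
        if current != self.bucket_no:
            finished = (self.bucket_no, dict(self.counts))
            self.bucket_no = current
            self.counts = Counter()
        stats = {"count": len(self.counts),  # number of distinct items
                 "max": max(self.counts.values(), default=0)}  # top count
        return finished, stats
```

For example, after `update("one")` twice and `update("two")` once, `flush(10)` returns `(None, {"count": 2, "max": 2})`, and `flush(ONEDAY + 1)` returns `((0, {"one": 2, "two": 1}), {"count": 0, "max": 0})`, mirroring the `.count`/`.max` values the branch reports.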

Sidnei da Silva (sidnei) wrote :

My biggest concern here is that if this fails to connect to postgres, it will go into an infinite loop with no logging at all and no sleep() between retries, so it can be a very tight loop consuming CPU with no visibility.

It should limit the number of retries and at least log something if postgres cannot be reached.

Sidnei da Silva (sidnei) :
review: Needs Fixing
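One way to meet this request, sketched under assumptions: wrap the connection attempt in a helper that tries a bounded number of times, logs every failure, and backs off between attempts. `connect_with_retries`, its parameters and the `"distinctdb"` logger name are hypothetical. With psycopg2 you would pass `psycopg2.connect` as `connect` and `(psycopg2.OperationalError,)` as `retry_on`; `sleep` is injectable so the helper can be tested without waiting.

```python
import logging
import time

log = logging.getLogger("distinctdb")  # hypothetical logger name


def connect_with_retries(connect, dsn, attempts=5, delay=1.0,
                         retry_on=(Exception,), sleep=time.sleep):
    """Call connect(dsn), trying at most `attempts` times in total.

    Each failure is logged; between attempts we sleep `delay` seconds,
    doubling the delay each time. The last error is re-raised once every
    attempt has failed.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return connect(dsn)
        except retry_on as error:
            log.warning("database connection attempt %d/%d failed: %s",
                        attempt, attempts, error)
            if attempt == attempts:
                raise  # out of retries: let the caller see the real error
            sleep(delay)
            delay *= 2
```

Because `_save_bucket` runs through `deferToThread`, a blocking `time.sleep` here ties up a thread-pool thread rather than the reactor itself; a fully asynchronous variant could schedule retries with Twisted's `task.deferLater` instead.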
5. By Lucio Torre

remove while, use INSERT .. RETURNING

6. By Lucio Torre

added test files
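Revision 5 removes the `while` loop and switches to `INSERT .. RETURNING`. The get-or-create step this enables can be sketched as follows, assuming a DB-API cursor with psycopg2's `%s` placeholder style; the table and column names come from `bin/schema.sql`, while `get_or_create_path_id` is a hypothetical helper. The new id comes back from the INSERT itself, so no second SELECT (and no loop) is needed.

```python
def get_or_create_path_id(cursor, path):
    """Return the id of `path` in the paths table, inserting it if missing.

    PostgreSQL's INSERT ... RETURNING hands back the generated SERIAL id
    directly. The caller is responsible for committing.
    """
    cursor.execute("SELECT id FROM paths WHERE path = %s", (path,))
    row = cursor.fetchone()
    if row is None:
        cursor.execute("INSERT INTO paths (path) VALUES (%s) RETURNING id",
                       (path,))
        row = cursor.fetchone()
    return row[0]
```

Since `paths.path` is `UNIQUE`, two processes racing on the same new path make one INSERT fail with a unique-violation error instead of creating a duplicate row; the sketch does not handle that case. `INSERT ... ON CONFLICT` only arrived in PostgreSQL 9.5, after the 8.3/8.4 versions the scripts look for.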

Sidnei da Silva (sidnei) wrote :

Looks good now. My last wish is a Makefile with a 'test' target that performs the steps outlined in the README, so that I can add this as a distinct (no pun intended) project in tarmac and run the tests before merging. You can choose to ignore me, though.

review: Approve
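The request is for a Makefile; the sequence such a `test` target would run is sketched here in Python instead, as a hypothetical `run_tests` helper that follows the README's steps in order. The key design point is the `try`/`finally`: the throwaway PostgreSQL instance is stopped even when `trial` fails. `run` defaults to `subprocess.check_call`, which raises on a non-zero exit, and `root` is assumed to be the branch root (the README relies on `pwd` for the same thing).

```python
import os
import subprocess


def run_tests(run=subprocess.check_call, root=None):
    """Run the README's test steps in order and always stop the database.

    `run` executes one command given as an argument list and raises on
    failure; `root` defaults to the current directory.
    """
    root = root or os.getcwd()
    db_host = os.path.join(root, "tmp", "db1")  # socket dir the scripts use
    try:
        run(["./bin/start-database.sh"])
        # psql -f reads the schema file, like the README's `< bin/schema.sql`
        run(["psql", "-h", db_host, "-d", "distinct", "-f", "bin/schema.sql"])
        run(["trial", "distinctdb"])
    finally:
        run(["./bin/stop-database.sh"])
```

Calling `run_tests()` from the branch root would then play the role of `make test`.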

Preview Diff

=== added file 'README'
--- README 1970-01-01 00:00:00 +0000
+++ README 2011-12-09 20:08:25 +0000
@@ -0,0 +1,10 @@
+To test:
+./bin/start-database.sh
+psql -h `pwd`/tmp/db1 -d distinct < bin/schema.sql
+
+Then:
+trial distinctdb
+
+When done:
+./bin/stop-database.sh
+

=== added directory 'bin'
=== added file 'bin/schema.sql'
--- bin/schema.sql 1970-01-01 00:00:00 +0000
+++ bin/schema.sql 2011-12-09 20:08:25 +0000
@@ -0,0 +1,3 @@
+CREATE TABLE paths (id SERIAL PRIMARY KEY NOT NULL, path TEXT NOT NULL UNIQUE);
+CREATE TABLE points (path_id INTEGER, bucket INTEGER, value TEXT, count INTEGER);
+CREATE INDEX points_idx ON points (path_id, bucket);

=== added file 'bin/start-database.sh'
--- bin/start-database.sh 1970-01-01 00:00:00 +0000
+++ bin/start-database.sh 2011-12-09 20:08:25 +0000
@@ -0,0 +1,63 @@
+#! /bin/bash
+
+ROOTDIR=${ROOTDIR:-`bzr root`}
+if [ ! -d "$ROOTDIR" ]; then
+    echo "ROOTDIR '$ROOTDIR' doesn't exist" >&2
+    exit 1
+fi
+
+DATABASES="
+distinct
+"
+
+function setup_database() {
+    local TESTDIR=$1
+
+    echo "## Starting postgres in $TESTDIR ##"
+    mkdir -p "$TESTDIR/data"
+    chmod 700 "$TESTDIR/data"
+
+    export PGHOST="$TESTDIR"
+    export PGDATA="$TESTDIR/data"
+    if [ -d /usr/lib/postgresql/8.4 ]; then
+        export PGBINDIR=/usr/lib/postgresql/8.4/bin
+    elif [ -d /usr/lib/postgresql/8.3 ]; then
+        export PGBINDIR=/usr/lib/postgresql/8.3/bin
+    else
+        echo "Cannot find valid parent for PGBINDIR"
+    fi
+    $PGBINDIR/initdb -E UNICODE -D $PGDATA
+    # set up the database options file
+    if [ ! -e $PGDATA/postgresql.conf ]; then
+        echo "PostgreSQL data directory apparently didn't init"
+    else
+        (
+            cat <<EOF
+search_path='\$user,public,ts2'
+add_missing_from=false
+log_statement='all'
+log_line_prefix='[%m] %q%u@%d %c '
+fsync = off
+EOF
+        ) > $PGDATA/postgresql.conf
+    fi
+    $PGBINDIR/initdb -A trust &>/dev/null
+    $PGBINDIR/pg_ctl start -w -D $TESTDIR/data -l $TESTDIR/postgres.log -o "-F -k $TESTDIR -h ''"
+    for db in $DATABASES; do
+        $PGBINDIR/createdb --encoding UNICODE "$db" &>/dev/null
+        $PGBINDIR/createlang plpgsql "$db"
+    done
+    $PGBINDIR/createuser --superuser --createdb "postgres" &>/dev/null
+    # create the additional users we need via a psql script
+    $PGBINDIR/psql -U postgres template1 <<EOF
+CREATE ROLE client INHERIT;
+
+CREATE USER client IN ROLE client;
+EOF
+    echo "To set your environment so psql will connect to this DB instance type:"
+    echo " export PGHOST=$TESTDIR"
+    echo "## Done. ##"
+    echo -n host=$TESTDIR dbname=distinct > $ROOTDIR/tmp/pg.dsn
+}
+
+setup_database $ROOTDIR/tmp/db1

=== added file 'bin/stop-database.sh'
--- bin/stop-database.sh 1970-01-01 00:00:00 +0000
+++ bin/stop-database.sh 2011-12-09 20:08:25 +0000
@@ -0,0 +1,22 @@
+#! /bin/bash
+
+ROOTDIR=${ROOTDIR:-`bzr root`}
+if [ ! -d "$ROOTDIR" ]; then
+    echo "ROOTDIR '$ROOTDIR' doesn't exist" >&2
+    exit 1
+fi
+
+if [ -d /usr/lib/postgresql/8.4 ]; then
+    PGBINDIR=/usr/lib/postgresql/8.4/bin
+elif [ -d /usr/lib/postgresql/8.3 ]; then
+    PGBINDIR=/usr/lib/postgresql/8.3/bin
+else
+    echo "Cannot find valid parent for PGBINDIR"
+fi
+
+# setting PGDATA tells pg_ctl which DB to talk to
+export PGDATA=$ROOTDIR/tmp/db1/data/
+$PGBINDIR/pg_ctl status > /dev/null
+if [ $? = 0 ]; then
+    $PGBINDIR/pg_ctl stop -t 60 -w -m fast
+fi

=== renamed directory 'distinctplugin' => 'distinctdb'
=== modified file 'distinctdb/distinctmetric.py'
--- distinctplugin/distinctmetric.py 2011-12-06 19:39:21 +0000
+++ distinctdb/distinctmetric.py 2011-12-09 20:08:25 +0000
@@ -1,17 +1,23 @@
+import time
+
+import psycopg2
+
+from zope.interface import implements
+from twisted.internet.threads import deferToThread
+from txstatsd.itxstatsd import IMetric
+
+ONEDAY = 60 * 60 * 24
+
+
 class DistinctMetricReporter(object):
     """
-    Keeps an estimate of the distinct numbers of items seen on various
-    sliding windows of time.
+    Keeps an mesurement of the distinct numbers of items seen and the times
+    it has seen each one.
     """
     implements(IMetric)

-    MESSAGE = (
-        "$prefix%(key)s.count_1min %(count_1min)s %(timestamp)s\n"
-        "$prefix%(key)s.count_1hour %(count_1hour)s %(timestamp)s\n"
-        "$prefix%(key)s.count_1day %(count_1day)s %(timestamp)s\n"
-        "$prefix%(key)s.count %(count)s %(timestamp)s\n")
-
-    def __init__(self, name, wall_time_func=time.time, prefix=""):
+    def __init__(self, name, wall_time_func=time.time, prefix="",
+                 bucket_size=ONEDAY, dsn=None):
         """Construct a metric we expect to be periodically updated.

         @param name: Indicates what is being instrumented.
@@ -21,36 +27,70 @@
         """
         self.name = name
         self.wall_time_func = wall_time_func
-        self.counter = SlidingDistinctCounter(32, 32)
         if prefix:
             prefix += '.'
-        self.message = Template(DistinctMetricReporter.MESSAGE).substitute(
-            prefix=prefix)
-
-    def count(self):
-        return self.counter.distinct()
-
-    def count_1min(self, now):
-        return self.counter.distinct(now - 60)
-
-    def count_1hour(self, now):
-        return self.counter.distinct(now - 60 * 60)
-
-    def count_1day(self, now):
-        return self.counter.distinct(now - 60 * 60 * 24)
+        self.prefix = prefix
+        self.bucket_size = bucket_size
+        self.dsn = dsn
+        self.metric_id = None
+        self.build_bucket()
+
+    def build_bucket(self, timestamp=None):
+        self.max = 0
+        self.bucket = {}
+        self.bucket_no = self.get_bucket_no(timestamp)
+
+    def get_bucket_no(self, timestamp=None):
+        if timestamp is None:
+            timestamp = self.wall_time_func()
+        return int(timestamp / (self.bucket_size))

     def process(self, fields):
         self.update(fields[0])

     def update(self, item):
-        self.counter.add(self.wall_time_func(), item)
+        value = self.bucket.get(item, 0) + 1
+
+        self.bucket[item] = value
+        if value > self.max:
+            self.max = value
+
+    def _save_bucket(self, bucket, bucket_no):
+        path = self.prefix + self.name
+        if self.metric_id is None:
+            c = psycopg2.connect(self.dsn)
+            cr = c.cursor()
+            cr.execute("SELECT * FROM paths WHERE path = %s", (path,))
+            row = cr.fetchone()
+            if row is None:
+                cr.execute("INSERT INTO paths (path) VALUES (%s) "
+                           "RETURNING (id)", (path,))
+                row = cr.fetchone()
+                cr.execute("commit")
+
+            self.metric_id = row[0]
+
+        for i, (k, v) in enumerate(bucket.iteritems()):
+            cr.execute("INSERT INTO points (path_id, bucket, value, count) "
+                       "VALUES (%s, %s, %s, %s)", (self.metric_id, bucket_no,
+                                                   k, v))
+            if i % 1000 == 0:
+                cr.execute("commit")
+        cr.execute("commit")
+
+    def save_bucket(self, bucket, bucket_no):
+        if self.dsn is not None:
+            deferToThread(self._save_bucket, bucket, bucket_no)

     def flush(self, interval, timestamp):
-        now = self.wall_time_func()
-        return self.message % {
-            "key": self.name,
-            "count": self.count(),
-            "count_1min": self.count_1min(now),
-            "count_1hour": self.count_1hour(now),
-            "count_1day": self.count_1day(now),
-            "timestamp": timestamp}
+        current_bucket = self.get_bucket_no(timestamp)
+        if current_bucket != self.bucket_no:
+            self.save_bucket(self.bucket, self.bucket_no)
+            self.build_bucket(timestamp)
+
+        metrics = []
+        items = {".count": len(self.bucket),
+                 ".max": self.max}
+        for item, value in items.iteritems():
+            metrics.append((self.prefix + self.name + item, value, timestamp))
+        return metrics

=== added directory 'distinctdb/tests'
=== added file 'distinctdb/tests/__init__.py'
=== added file 'distinctdb/tests/test_distinct.py'
--- distinctdb/tests/test_distinct.py 1970-01-01 00:00:00 +0000
+++ distinctdb/tests/test_distinct.py 2011-12-09 20:08:25 +0000
@@ -0,0 +1,147 @@
+# Copyright (C) 2011 Canonical
+# All Rights Reserved
+
+import ConfigParser
+from cStringIO import StringIO
+import os
+import time
+import subprocess
+
+import psycopg2
+
+from twisted.trial.unittest import TestCase
+from twisted.plugin import getPlugins
+from twisted.plugins import distinctdbplugin
+from txstatsd.itxstatsd import IMetricFactory
+from txstatsd import service
+
+from distinctdb import distinctmetric as distinct
+
+
+class TestDistinctMetricReporter(TestCase):
+
+    def test_get_bucket_no(self):
+        _wall_time = [0]
+
+        def _time():
+            return _wall_time[0]
+
+        dmr = distinct.DistinctMetricReporter("test", wall_time_func=_time)
+        self.assertEquals(dmr.get_bucket_no(), 0)
+        _wall_time = [60 * 60 * 24 + 1]
+        dmr.update("one")
+        self.assertEquals(dmr.get_bucket_no(), 1)
+
+    def test_max(self):
+        _wall_time = [0]
+
+        def _time():
+            return _wall_time[0]
+
+        dmr = distinct.DistinctMetricReporter("test", wall_time_func=_time)
+        self.assertEquals(dmr.get_bucket_no(), 0)
+        self.assertEquals(dmr.max, 0)
+        dmr.update("one")
+        dmr.update("one")
+        dmr.update("two")
+        self.assertEquals(dmr.max, 2)
+        dmr.flush(1, 60 * 60 * 24 + 1)
+        dmr.update("one")
+        self.assertEquals(dmr.max, 1)
+
+    def test_reports(self):
+        _wall_time = [0]
+
+        def _time():
+            return _wall_time[0]
+
+        result = {}
+
+        dmr = distinct.DistinctMetricReporter("test", wall_time_func=_time)
+
+        def save(b, b_no):
+            result["bucket"] = b
+            result["bucket_no"] = b_no
+        dmr.save_bucket = save
+        dmr.update("one")
+        dmr.update("one")
+        dmr.update("two")
+        day = 60 * 60 * 24 + 1
+        dmr.flush(1, day)
+        dmr.update("three")
+        self.assertEquals(result,
+                          {"bucket": {"one": 2, "two": 1}, "bucket_no": 0})
+        self.assertEquals(dmr.flush(1, day),
+                          [("test.max", 1, day), ("test.count", 1, day)])
+
+    def test_configure(self):
+        class TestOptions(service.OptionsGlue):
+            optParameters = [["test", "t", "default", "help"]]
+            config_section = "statsd"
+
+        o = TestOptions()
+        config_file = ConfigParser.RawConfigParser()
+        config_file.readfp(StringIO("[statsd]\n\n[plugin_distinctdb]\n"
+                                    "dsn = dbdsn\nbucket_size = 100"))
+        o.configure(config_file)
+        dmf = distinctdbplugin.DistinctMetricFactory()
+        dmf.configure(o)
+        dmr = dmf.build_metric("foo", "bar", time.time)
+        self.assertEquals(dmr.bucket_size, 100)
+        self.assertEquals(dmr.dsn, "dbdsn")
+
+
+class TestPlugin(TestCase):
+
+    def test_factory(self):
+        self.assertTrue(distinctdbplugin.distinct_metric_factory in \
+                        list(getPlugins(IMetricFactory)))
+
+
+class TestDatabase(TestCase):
+
+    def setUp(self):
+        rootdir = subprocess.check_output(["bzr", "root"]).strip()
+        dsn_file = os.path.join(rootdir, "tmp", "pg.dsn")
+        self.dsn = open(dsn_file).read()
+        self.conn = psycopg2.connect(self.dsn)
+
+    def tearDown(self):
+        cr = self.conn.cursor()
+        cr.execute("rollback")
+        cr.execute("DELETE FROM paths")
+        cr.execute("DELETE FROM points")
+        cr.execute("commit")
+
+    def test_connect(self):
+        cr = self.conn.cursor()
+        cr.execute("SELECT 0")
+        result = cr.fetchall()
+        self.assertTrue(result, [(0,)])
+
+    def test_create_metric_id(self):
+        dmr = distinct.DistinctMetricReporter("test", dsn=self.dsn)
+        dmr._save_bucket({}, 0)
+        cr = self.conn.cursor()
+        cr.execute("SELECT * FROM paths WHERE path = 'test'")
+        cr.execute("SELECT * FROM paths")
+        self.assertEquals(len(cr.fetchall()), 1)
+
+    def test_find_saved_data(self):
+        dmr = distinct.DistinctMetricReporter("test", dsn=self.dsn)
+        dmr.update("one")
+        dmr.update("one")
+        dmr.update("two")
+        dmr._save_bucket(dmr.bucket, 0)
+        cr = self.conn.cursor()
+        cr.execute("SELECT * FROM points ORDER BY value")
+        rows = cr.fetchall()
+        self.assertEquals(rows, [(dmr.metric_id, 0, "one", 2),
+                                 (dmr.metric_id, 0, "two", 1)])
+
+    def test_load_metric_id(self):
+        dmr = distinct.DistinctMetricReporter("test", dsn=self.dsn)
+        dmr._save_bucket({}, 0)
+        dmr2 = distinct.DistinctMetricReporter("test", dsn=self.dsn)
+        dmr2._save_bucket({}, 0)
+        self.assertEquals(dmr.metric_id, dmr2.metric_id)

=== modified file 'distinctdb/version.py'
--- distinctplugin/version.py 2011-12-06 19:39:21 +0000
+++ distinctdb/version.py 2011-12-09 20:08:25 +0000
@@ -1,1 +1,1 @@
-distinctplugin = "0.0.1"
\ No newline at end of file
+distinctplugin = "0.0.1"

=== modified file 'setup.py'
--- setup.py 2011-12-06 19:39:21 +0000
+++ setup.py 2011-12-09 20:08:25 +0000
@@ -41,7 +41,7 @@
         list(getPlugins(IPlugin))

 setup(
-    cmdclass = {'install': TxPluginInstaller},
+    cmdclass={'install': TxPluginInstaller},
     name="distinctplugin",
     version=version.distinctplugin,
     description="A txstatsd plugin for distinct counts",

=== removed file 'twisted/plugins/distinct_plugin.py'
--- twisted/plugins/distinct_plugin.py 2011-12-06 19:39:21 +0000
+++ twisted/plugins/distinct_plugin.py 1970-01-01 00:00:00 +0000
@@ -1,20 +0,0 @@
-from zope.interface import implements
-
-from twisted.plugin import IPlugin
-from txstatsd.itxstatsd import IMetricFactory
-from distinctcount.distinctmetric import DistinctMetricReporter
-
-class DistinctMetricFactory(object):
-    implements(IMetricFactory, IPlugin)
-
-    name = "distinct"
-    metric_type = "d"
-
-    def build_metric(self, prefix, name, wall_time_func=None):
-        return DistinctMetricReporter(name, prefix=prefix,
-                                      wall_time_func=wall_time_func)
-
-    def configure(self, options):
-        pass
-
-distinct_metric_factory = DistinctMetricFactory()

=== added file 'twisted/plugins/distinctdbplugin.py'
--- twisted/plugins/distinctdbplugin.py 1970-01-01 00:00:00 +0000
+++ twisted/plugins/distinctdbplugin.py 2011-12-09 20:08:25 +0000
@@ -0,0 +1,33 @@
+from zope.interface import implements
+
+from twisted.plugin import IPlugin
+from txstatsd.itxstatsd import IMetricFactory
+from distinctdb.distinctmetric import DistinctMetricReporter, ONEDAY
+
+
+class DistinctMetricFactory(object):
+    implements(IMetricFactory, IPlugin)
+
+    name = "distinct"
+    metric_type = "d"
+
+    bucket_size = None
+    dsn = None
+    metric_ids = None
+
+    def build_metric(self, prefix, name, wall_time_func=None):
+        return DistinctMetricReporter(name, prefix=prefix,
+                                      wall_time_func=wall_time_func,
+                                      bucket_size=self.bucket_size,
+                                      dsn=self.dsn)
+
+    def configure(self, options):
+        self.section = dict(options.get("plugin_distinctdb", {}))
+        try:
+            self.bucket_size = int(self.section.get("bucket_size", ONEDAY))
+        except ValueError:
+            self.bucket_size = ONEDAY
+
+        self.dsn = self.section.get("dsn", None)
+
+distinct_metric_factory = DistinctMetricFactory()
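As the diff reads, `_save_bucket` assigns `cr` only inside `if self.metric_id is None:`, so once the path id is cached, the next call reaches `cr.execute(...)` with `cr` unbound and raises `UnboundLocalError`. The connection it opens is also never explicitly closed, and `i % 1000 == 0` commits right after the very first row. The tests don't catch the first problem because each reporter in `TestDatabase` calls `_save_bucket` only once. A possible rework, sketched under assumptions as a free function for a DB-API connection (the name `save_bucket`, the `metric_id` and `batch_size` parameters, and opening/closing the connection in the caller are all hypothetical), creates the cursor on every call:

```python
def save_bucket(conn, path, bucket, bucket_no, metric_id=None, batch_size=1000):
    """Write one finished bucket to `points` and return the path's id.

    `conn` is any DB-API connection (e.g. from psycopg2.connect(dsn)).
    Pass the id returned by an earlier call as `metric_id` to skip the lookup.
    """
    cursor = conn.cursor()  # created on every call, not only the first one
    try:
        if metric_id is None:
            cursor.execute("SELECT id FROM paths WHERE path = %s", (path,))
            row = cursor.fetchone()
            if row is None:
                cursor.execute(
                    "INSERT INTO paths (path) VALUES (%s) RETURNING id",
                    (path,))
                row = cursor.fetchone()
            metric_id = row[0]

        rows = [(metric_id, bucket_no, value, count)
                for value, count in bucket.items()]
        # Commit once per batch of up to batch_size rows.
        for start in range(0, len(rows), batch_size):
            cursor.executemany(
                "INSERT INTO points (path_id, bucket, value, count) "
                "VALUES (%s, %s, %s, %s)",
                rows[start:start + batch_size])
            conn.commit()
        conn.commit()  # also covers the paths INSERT when the bucket is empty
        return metric_id
    finally:
        cursor.close()
```

A reporter could keep the returned id in `self.metric_id` and pass it back on later flushes, which is the caching the original method was aiming for.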
