Merge lp:~lucio.torre/txstatsd/add-distinct-db-plugin into lp:txstatsd

Proposed by Lucio Torre on 2012-02-07
Status: Rejected
Rejected by: Lucio Torre on 2012-02-07
Proposed branch: lp:~lucio.torre/txstatsd/add-distinct-db-plugin
Merge into: lp:txstatsd
Diff against target: 493 lines (+439/-0) (has conflicts)
9 files modified
README (+10/-0)
bin/schema.sql (+3/-0)
bin/start-database.sh (+63/-0)
bin/stop-database.sh (+22/-0)
distinctdb/distinctmetric.py (+96/-0)
distinctdb/tests/test_distinct.py (+147/-0)
distinctdb/version.py (+1/-0)
setup.py (+64/-0)
twisted/plugins/distinctdbplugin.py (+33/-0)
Conflict adding file README.  Moved existing file to README.moved.
Conflict adding file setup.py.  Moved existing file to setup.py.moved.
Conflict adding file twisted.  Moved existing file to twisted.moved.
To merge this branch: bzr merge lp:~lucio.torre/txstatsd/add-distinct-db-plugin
Reviewer Review Type Date Requested Status
txStatsD Developers 2012-02-07 Pending
Review via email: mp+91811@code.launchpad.net

Description of the change

missing test files

To post a comment you must log in.

Unmerged revisions

6. By Lucio Torre on 2011-12-09

added test files

5. By Lucio Torre on 2011-12-09

remove while, use INSERT .. RETURNING

4. By Lucio Torre on 2011-12-08

linted

3. By Lucio Torre on 2011-12-08

added some files

2. By Lucio Torre on 2011-12-08

almost done

1. By Lucio Torre on 2011-12-06

empty plugin skeleton

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== added file 'README'
2--- README 1970-01-01 00:00:00 +0000
3+++ README 2012-02-07 12:32:37 +0000
4@@ -0,0 +1,10 @@
5+To test:
6+./bin/start-database.sh
7+psql -h `pwd`/tmp/db1 -d distinct < bin/schema.sql
8+
9+Then:
10+trial distinctdb
11+
12+When done:
13+./bin/stop-database.sh
14+
15
16=== renamed file 'README' => 'README.moved'
17=== added directory 'bin'
18=== added file 'bin/schema.sql'
19--- bin/schema.sql 1970-01-01 00:00:00 +0000
20+++ bin/schema.sql 2012-02-07 12:32:37 +0000
21@@ -0,0 +1,3 @@
-- One row per metric path; points.path_id stores the id assigned here.
CREATE TABLE paths (
    id SERIAL PRIMARY KEY NOT NULL,
    path TEXT NOT NULL UNIQUE
);

-- One row per (path, bucket, value) holding the tally for that value.
CREATE TABLE points (
    path_id INTEGER,
    bucket INTEGER,
    value TEXT,
    count INTEGER
);

-- Index on the (path_id, bucket) pair of points.
CREATE INDEX points_idx ON points (path_id, bucket);
25
26=== added file 'bin/start-database.sh'
27--- bin/start-database.sh 1970-01-01 00:00:00 +0000
28+++ bin/start-database.sh 2012-02-07 12:32:37 +0000
29@@ -0,0 +1,63 @@
#! /bin/bash
# Create and start a throwaway PostgreSQL cluster under $ROOTDIR/tmp/db1 and
# write its connection string to $ROOTDIR/tmp/pg.dsn.

ROOTDIR=${ROOTDIR:-$(bzr root)}
if [ ! -d "$ROOTDIR" ]; then
    echo "ROOTDIR '$ROOTDIR' doesn't exist" >&2
    exit 1
fi

# Whitespace-separated names of the databases to create in the new cluster.
DATABASES="
distinct
"

# setup_database TESTDIR
#   Initialise a cluster in TESTDIR/data, start it with its unix socket in
#   TESTDIR, then create the databases, the postgres superuser and the
#   client role.  Returns 1 if no PostgreSQL binaries are found or if
#   initdb did not produce a data directory.
function setup_database() {
    local TESTDIR=$1

    echo "## Starting postgres in $TESTDIR ##"
    mkdir -p "$TESTDIR/data"
    chmod 700 "$TESTDIR/data"

    export PGHOST="$TESTDIR"
    export PGDATA="$TESTDIR/data"
    if [ -d /usr/lib/postgresql/8.4 ]; then
        export PGBINDIR=/usr/lib/postgresql/8.4/bin
    elif [ -d /usr/lib/postgresql/8.3 ]; then
        export PGBINDIR=/usr/lib/postgresql/8.3/bin
    else
        # With PGBINDIR empty every command below would become "/initdb",
        # "/pg_ctl", ... so stop here instead of carrying on.
        echo "Cannot find valid parent for PGBINDIR" >&2
        return 1
    fi

    # "-A trust" used to be passed to a second initdb run, which cannot
    # succeed once the data directory is populated; pass it here instead.
    "$PGBINDIR/initdb" -E UNICODE -A trust -D "$PGDATA"

    # set up the database options file
    if [ ! -e "$PGDATA/postgresql.conf" ]; then
        echo "PostgreSQL data directory apparently didn't init" >&2
        return 1
    fi
    # The heredoc delimiter is unquoted, so \$user is written as $user.
    cat > "$PGDATA/postgresql.conf" <<EOF
search_path='\$user,public,ts2'
add_missing_from=false
log_statement='all'
log_line_prefix='[%m] %q%u@%d %c '
fsync = off
EOF

    "$PGBINDIR/pg_ctl" start -w -D "$TESTDIR/data" \
        -l "$TESTDIR/postgres.log" -o "-F -k $TESTDIR -h ''"

    # $DATABASES is intentionally unquoted: it is split on whitespace.
    for db in $DATABASES; do
        "$PGBINDIR/createdb" --encoding UNICODE "$db" &>/dev/null
        "$PGBINDIR/createlang" plpgsql "$db"
    done
    "$PGBINDIR/createuser" --superuser --createdb "postgres" &>/dev/null

    # create the additional users we need via a psql script
    "$PGBINDIR/psql" -U postgres template1 <<EOF
CREATE ROLE client INHERIT;

CREATE USER client IN ROLE client;
EOF

    echo "To set your environment so psql will connect to this DB instance type:"
    echo " export PGHOST=$TESTDIR"
    echo "## Done. ##"
    # Read by the test suite to connect to this cluster.
    echo -n "host=$TESTDIR dbname=distinct" > "$ROOTDIR/tmp/pg.dsn"
}

setup_database "$ROOTDIR/tmp/db1"
93
94=== added file 'bin/stop-database.sh'
95--- bin/stop-database.sh 1970-01-01 00:00:00 +0000
96+++ bin/stop-database.sh 2012-02-07 12:32:37 +0000
97@@ -0,0 +1,22 @@
#! /bin/bash
# Stop the PostgreSQL cluster living in $ROOTDIR/tmp/db1/data, if it is
# running.

ROOTDIR=${ROOTDIR:-$(bzr root)}
if [ ! -d "$ROOTDIR" ]; then
    echo "ROOTDIR '$ROOTDIR' doesn't exist" >&2
    exit 1
fi

if [ -d /usr/lib/postgresql/8.4 ]; then
    PGBINDIR=/usr/lib/postgresql/8.4/bin
elif [ -d /usr/lib/postgresql/8.3 ]; then
    PGBINDIR=/usr/lib/postgresql/8.3/bin
else
    # With PGBINDIR empty the commands below would become "/pg_ctl".
    echo "Cannot find valid parent for PGBINDIR" >&2
    exit 1
fi

# setting PGDATA tells pg_ctl which DB to talk to
export PGDATA="$ROOTDIR/tmp/db1/data/"
# Only ask for a shutdown when pg_ctl reports the server as running.
if "$PGBINDIR/pg_ctl" status > /dev/null; then
    "$PGBINDIR/pg_ctl" stop -t 60 -w -m fast
fi
120
121=== added directory 'distinctdb'
122=== added file 'distinctdb/__init__.py'
123=== added file 'distinctdb/distinctmetric.py'
124--- distinctdb/distinctmetric.py 1970-01-01 00:00:00 +0000
125+++ distinctdb/distinctmetric.py 2012-02-07 12:32:37 +0000
126@@ -0,0 +1,96 @@
127+import time
128+
129+import psycopg2
130+
131+from zope.interface import implements
132+from twisted.internet.threads import deferToThread
133+from txstatsd.itxstatsd import IMetric
134+
135+ONEDAY = 60 * 60 * 24
136+
137+
class DistinctMetricReporter(object):
    """
    Keeps a measurement of the number of distinct items seen and of how many
    times each one has been seen.

    Items are tallied in an in-memory bucket covering C{bucket_size} seconds
    of wall time.  When C{flush} notices that the bucket number has changed,
    the finished bucket is handed to C{save_bucket} and a fresh one is
    started.
    """
    implements(IMetric)

    # Number of inserted rows between intermediate commits in _save_bucket.
    COMMIT_EVERY = 1000

    def __init__(self, name, wall_time_func=time.time, prefix="",
                 bucket_size=ONEDAY, dsn=None):
        """Construct a metric we expect to be periodically updated.

        @param name: Indicates what is being instrumented.
        @param wall_time_func: Function for obtaining wall time.  C{None}
            selects C{time.time}.
        @param prefix: If present, a string prepended (followed by a dot)
            to the metric names returned by C{flush} and to the path stored
            in the database.
        @param bucket_size: Width of a bucket, in the units returned by
            C{wall_time_func}.  C{None} selects C{ONEDAY}.
        @param dsn: Connection string given to C{psycopg2.connect}.  When
            C{None}, finished buckets are not stored.
        """
        self.name = name
        # Callers such as a factory may forward an explicit None; fall back
        # to the documented defaults instead of failing in build_bucket().
        if wall_time_func is None:
            wall_time_func = time.time
        if bucket_size is None:
            bucket_size = ONEDAY
        self.wall_time_func = wall_time_func
        if prefix:
            prefix += '.'
        self.prefix = prefix
        self.bucket_size = bucket_size
        self.dsn = dsn
        self.metric_id = None
        self.build_bucket()

    def build_bucket(self, timestamp=None):
        """Start an empty bucket for the period containing C{timestamp}."""
        self.max = 0
        self.bucket = {}
        self.bucket_no = self.get_bucket_no(timestamp)

    def get_bucket_no(self, timestamp=None):
        """Return the bucket number for C{timestamp}.

        @param timestamp: Wall time; when C{None} the current value of
            C{wall_time_func} is used.
        """
        if timestamp is None:
            timestamp = self.wall_time_func()
        return int(timestamp / self.bucket_size)

    def process(self, fields):
        """Record the item found in the first element of C{fields}."""
        self.update(fields[0])

    def update(self, item):
        """Count one more occurrence of C{item} in the current bucket."""
        value = self.bucket.get(item, 0) + 1

        self.bucket[item] = value
        if value > self.max:
            self.max = value

    def _fetch_metric_id(self, conn, cursor, path):
        """Return the id of C{path} in the paths table, inserting it if
        it is not there yet."""
        cursor.execute("SELECT id FROM paths WHERE path = %s", (path,))
        row = cursor.fetchone()
        if row is None:
            cursor.execute("INSERT INTO paths (path) VALUES (%s) "
                           "RETURNING (id)", (path,))
            row = cursor.fetchone()
            conn.commit()
        return row[0]

    def _save_bucket(self, bucket, bucket_no):
        """Write every (item, count) pair of C{bucket} to the points table.

        Blocking; C{save_bucket} runs it in a thread.  A connection is
        opened on every call (previously the cursor only existed on the
        first call, so every later save failed) and is always closed.

        @param bucket: Mapping of item to the number of times it was seen.
        @param bucket_no: Number of the bucket the counts belong to.
        """
        path = self.prefix + self.name
        conn = psycopg2.connect(self.dsn)
        try:
            cursor = conn.cursor()
            if self.metric_id is None:
                self.metric_id = self._fetch_metric_id(conn, cursor, path)

            # Count from 1 so that the first intermediate commit happens
            # after COMMIT_EVERY rows rather than after the very first one.
            for done, (item, count) in enumerate(bucket.items(), 1):
                cursor.execute(
                    "INSERT INTO points (path_id, bucket, value, count) "
                    "VALUES (%s, %s, %s, %s)",
                    (self.metric_id, bucket_no, item, count))
                if done % self.COMMIT_EVERY == 0:
                    conn.commit()
            conn.commit()
        finally:
            conn.close()

    def save_bucket(self, bucket, bucket_no):
        """Store C{bucket} in a worker thread when a dsn is configured.

        @return: The C{Deferred} from C{deferToThread}, or C{None} when no
            dsn is configured and nothing is stored.
        """
        if self.dsn is not None:
            return deferToThread(self._save_bucket, bucket, bucket_no)

    def flush(self, interval, timestamp):
        """Roll the bucket over if needed and report the current figures.

        @param interval: Not used by this metric.
        @param timestamp: Wall time of the flush; it selects the bucket and
            is attached to every returned metric.
        @return: A list of C{(name, value, timestamp)} tuples: first the
            highest count of any single item (".max"), then the number of
            distinct items (".count") in the current bucket.
        """
        if self.get_bucket_no(timestamp) != self.bucket_no:
            # The finished dict is handed over as is; build_bucket() binds
            # a new dict, so the saver keeps exclusive use of the old one.
            self.save_bucket(self.bucket, self.bucket_no)
            self.build_bucket(timestamp)

        path = self.prefix + self.name
        # Fixed order instead of dict iteration order.
        return [(path + ".max", self.max, timestamp),
                (path + ".count", len(self.bucket), timestamp)]
223
224=== added directory 'distinctdb/tests'
225=== added file 'distinctdb/tests/__init__.py'
226=== added file 'distinctdb/tests/test_distinct.py'
227--- distinctdb/tests/test_distinct.py 1970-01-01 00:00:00 +0000
228+++ distinctdb/tests/test_distinct.py 2012-02-07 12:32:37 +0000
229@@ -0,0 +1,147 @@
230+# Copyright (C) 2011 Canonical
231+# All Rights Reserved
232+
233+import ConfigParser
234+from cStringIO import StringIO
235+import os
236+import time
237+import subprocess
238+
239+import psycopg2
240+
241+from twisted.trial.unittest import TestCase
242+from twisted.plugin import getPlugins
243+from twisted.plugins import distinctdbplugin
244+from txstatsd.itxstatsd import IMetricFactory
245+from txstatsd import service
246+
247+from distinctdb import distinctmetric as distinct
248+
249+
class TestDistinctMetricReporter(TestCase):
    """Tests for the reporter that need no database (no dsn is given)."""

    def test_get_bucket_no(self):
        """The bucket number follows the wall time function."""
        _wall_time = [0]

        def _time():
            return _wall_time[0]

        dmr = distinct.DistinctMetricReporter("test", wall_time_func=_time)
        self.assertEqual(dmr.get_bucket_no(), 0)
        # Mutate the shared list in place; that is what the one-element
        # list is there for.
        _wall_time[0] = 60 * 60 * 24 + 1
        self.assertEqual(dmr.get_bucket_no(), 1)

    def test_max(self):
        """max tracks the highest count and restarts with a new bucket."""
        _wall_time = [0]

        def _time():
            return _wall_time[0]

        dmr = distinct.DistinctMetricReporter("test", wall_time_func=_time)
        self.assertEqual(dmr.get_bucket_no(), 0)
        self.assertEqual(dmr.max, 0)
        dmr.update("one")
        dmr.update("one")
        dmr.update("two")
        self.assertEqual(dmr.max, 2)
        dmr.flush(1, 60 * 60 * 24 + 1)
        dmr.update("one")
        self.assertEqual(dmr.max, 1)

    def test_reports(self):
        """A bucket change saves the old bucket; flush reports the new."""
        _wall_time = [0]

        def _time():
            return _wall_time[0]

        result = {}

        dmr = distinct.DistinctMetricReporter("test", wall_time_func=_time)

        def save(b, b_no):
            result["bucket"] = b
            result["bucket_no"] = b_no
        dmr.save_bucket = save
        dmr.update("one")
        dmr.update("one")
        dmr.update("two")
        day = 60 * 60 * 24 + 1
        dmr.flush(1, day)
        dmr.update("three")
        self.assertEqual(result,
                         {"bucket": {"one": 2, "two": 1}, "bucket_no": 0})
        # Compare sorted so the test does not depend on the order in which
        # flush() emits the two metrics.
        self.assertEqual(sorted(dmr.flush(1, day)),
                         [("test.count", 1, day), ("test.max", 1, day)])

    def test_configure(self):
        """Factory options from the config file reach the built metric."""
        class TestOptions(service.OptionsGlue):
            optParameters = [["test", "t", "default", "help"]]
            config_section = "statsd"

        o = TestOptions()
        config_file = ConfigParser.RawConfigParser()
        config_file.readfp(StringIO("[statsd]\n\n[plugin_distinctdb]\n"
                                    "dsn = dbdsn\nbucket_size = 100"))
        o.configure(config_file)
        dmf = distinctdbplugin.DistinctMetricFactory()
        dmf.configure(o)
        dmr = dmf.build_metric("foo", "bar", time.time)
        self.assertEqual(dmr.bucket_size, 100)
        self.assertEqual(dmr.dsn, "dbdsn")
321+
322+
class TestPlugin(TestCase):
    """Checks that the factory is discoverable as a Twisted plugin."""

    def test_factory(self):
        """The module-level factory is among the IMetricFactory plugins."""
        # assertIn reports both operands on failure, unlike assertTrue.
        self.assertIn(distinctdbplugin.distinct_metric_factory,
                      list(getPlugins(IMetricFactory)))
328+
329+
class TestDatabase(TestCase):
    """Tests that talk to the database described by tmp/pg.dsn."""

    def setUp(self):
        """Read the dsn file under the bzr root and open a connection."""
        rootdir = subprocess.check_output(["bzr", "root"]).strip()
        dsn_file = os.path.join(rootdir, "tmp", "pg.dsn")
        with open(dsn_file) as f:
            self.dsn = f.read()
        self.conn = psycopg2.connect(self.dsn)

    def tearDown(self):
        """Empty both tables and release the connection."""
        try:
            # Leave whatever transaction the test left open, so the
            # deletes below see the committed rows.
            self.conn.rollback()
            cr = self.conn.cursor()
            cr.execute("DELETE FROM paths")
            cr.execute("DELETE FROM points")
            self.conn.commit()
        finally:
            self.conn.close()

    def test_connect(self):
        """The connection can run a trivial query."""
        cr = self.conn.cursor()
        cr.execute("SELECT 0")
        result = cr.fetchall()
        # assertTrue(result, [(0,)]) only checked truthiness: its second
        # argument is the failure message.
        self.assertEqual(result, [(0,)])

    def test_create_metric_id(self):
        """Saving a bucket registers the metric path exactly once."""
        dmr = distinct.DistinctMetricReporter("test", dsn=self.dsn)
        dmr._save_bucket({}, 0)
        cr = self.conn.cursor()
        cr.execute("SELECT * FROM paths WHERE path = 'test'")
        self.assertEqual(len(cr.fetchall()), 1)
        cr.execute("SELECT * FROM paths")
        self.assertEqual(len(cr.fetchall()), 1)

    def test_find_saved_data(self):
        """Saved counts can be read back from the points table."""
        dmr = distinct.DistinctMetricReporter("test", dsn=self.dsn)
        dmr.update("one")
        dmr.update("one")
        dmr.update("two")
        dmr._save_bucket(dmr.bucket, 0)
        cr = self.conn.cursor()
        cr.execute("SELECT * FROM points ORDER BY value")
        rows = cr.fetchall()
        self.assertEqual(rows, [(dmr.metric_id, 0, "one", 2),
                                (dmr.metric_id, 0, "two", 1)])

    def test_load_metric_id(self):
        """Two reporters with the same path share one metric id."""
        dmr = distinct.DistinctMetricReporter("test", dsn=self.dsn)
        dmr._save_bucket({}, 0)
        dmr2 = distinct.DistinctMetricReporter("test", dsn=self.dsn)
        dmr2._save_bucket({}, 0)
        self.assertEqual(dmr.metric_id, dmr2.metric_id)
377
378=== added file 'distinctdb/version.py'
379--- distinctdb/version.py 1970-01-01 00:00:00 +0000
380+++ distinctdb/version.py 2012-02-07 12:32:37 +0000
381@@ -0,0 +1,1 @@
# Version string of the plugin; setup.py reads it as version.distinctplugin
# and passes it to setup().
distinctplugin = "0.0.1"
383
384=== added file 'setup.py'
385--- setup.py 1970-01-01 00:00:00 +0000
386+++ setup.py 2012-02-07 12:32:37 +0000
387@@ -0,0 +1,64 @@
"""Install script for the txstatsd distinct-count plugin."""
import os
from distutils.command.install import install
from distutils.core import setup

from twisted.plugin import IPlugin, getPlugins

# version.py lives in the distinctdb package; there is no distinctplugin
# package to import it from.
from distinctdb import version

# If setuptools is present, use it to find_packages().
extra_setup_args = {}
try:
    from setuptools import find_packages
except ImportError:
    def find_packages():
        """
        Compatibility wrapper.

        Returns the dotted names of every directory under "distinctdb"
        that contains an __init__.py.

        Taken from storm setup.py.
        """
        packages = []
        for directory, subdirectories, files in os.walk("distinctdb"):
            if '__init__.py' in files:
                packages.append(directory.replace(os.sep, '.'))
        return packages

long_description = """
A plugin for txstatsd to count distinct values using postgres.
"""


class TxPluginInstaller(install):
    """install command that also refreshes the Twisted plugin cache."""

    def run(self):
        """Run the regular install, then enumerate the plugins."""
        install.run(self)
        # Make sure we refresh the plugin list when installing, so we know
        # we have enough write permissions.
        # see http://twistedmatrix.com/documents/current/core/howto/plugin.html
        # "when installing or removing software which provides Twisted plugins,
        # the site administrator should be sure the cache is regenerated"
        list(getPlugins(IPlugin))

setup(
    cmdclass={'install': TxPluginInstaller},
    name="distinctplugin",
    version=version.distinctplugin,
    description="A txstatsd plugin for distinct counts",
    author="txStatsD Developers",
    url="https://launchpad.net/txstatsd",
    license="MIT",
    # twisted/plugins is listed by hand in addition to what find_packages()
    # returns.
    packages=find_packages() + ["twisted.plugins"],
    long_description=long_description,
    classifiers=[
        "Development Status :: 4 - Beta",
        "Intended Audience :: Developers",
        "Intended Audience :: System Administrators",
        "Intended Audience :: Information Technology",
        "Programming Language :: Python",
        "Topic :: Database",
        "Topic :: Internet :: WWW/HTTP",
        "License :: OSI Approved :: MIT License",
        ],
    **extra_setup_args
    )
452
453=== renamed file 'setup.py' => 'setup.py.moved'
454=== added directory 'twisted'
455=== renamed directory 'twisted' => 'twisted.moved'
456=== added directory 'twisted/plugins'
457=== added file 'twisted/plugins/distinctdbplugin.py'
458--- twisted/plugins/distinctdbplugin.py 1970-01-01 00:00:00 +0000
459+++ twisted/plugins/distinctdbplugin.py 2012-02-07 12:32:37 +0000
460@@ -0,0 +1,33 @@
461+from zope.interface import implements
462+
463+from twisted.plugin import IPlugin
464+from txstatsd.itxstatsd import IMetricFactory
465+from distinctdb.distinctmetric import DistinctMetricReporter, ONEDAY
466+
467+
class DistinctMetricFactory(object):
    """Plugin that builds L{DistinctMetricReporter} instances.

    C{configure} reads the C{plugin_distinctdb} options; every metric built
    afterwards uses the bucket size and dsn found there.
    """
    implements(IMetricFactory, IPlugin)

    name = "distinct"
    metric_type = "d"

    # Defaults in effect until configure() is called.  bucket_size must be
    # a number: the reporter divides timestamps by it in its constructor.
    bucket_size = ONEDAY
    dsn = None
    metric_ids = None

    def build_metric(self, prefix, name, wall_time_func=None):
        """Return a reporter for C{name} using the configured settings.

        @param prefix: Prefix for the metric names the reporter emits.
        @param name: Name of the metric.
        @param wall_time_func: Function for obtaining wall time; when
            C{None} the reporter's own default is used.
        """
        kwargs = {"prefix": prefix,
                  "bucket_size": self.bucket_size,
                  "dsn": self.dsn}
        # Only forward a real callable so that None does not replace the
        # reporter's default.
        if wall_time_func is not None:
            kwargs["wall_time_func"] = wall_time_func
        return DistinctMetricReporter(name, **kwargs)

    def configure(self, options):
        """Read C{bucket_size} and C{dsn} from the plugin's options.

        A bucket size that is missing, not an integer or not positive
        falls back to C{ONEDAY}.
        """
        self.section = dict(options.get("plugin_distinctdb", {}))
        try:
            bucket_size = int(self.section.get("bucket_size", ONEDAY))
        except ValueError:
            bucket_size = ONEDAY
        if bucket_size <= 0:
            # Zero would make the reporter divide by zero.
            bucket_size = ONEDAY
        self.bucket_size = bucket_size

        self.dsn = self.section.get("dsn", None)

distinct_metric_factory = DistinctMetricFactory()

Subscribers

People subscribed via source and target branches