Merge lp:~lucio.torre/txstatsd/add-distinct-db-plugin into lp:txstatsd

Proposed by Lucio Torre
Status: Rejected
Rejected by: Lucio Torre
Proposed branch: lp:~lucio.torre/txstatsd/add-distinct-db-plugin
Merge into: lp:txstatsd
Diff against target: 493 lines (+439/-0) (has conflicts)
9 files modified
README (+10/-0)
bin/schema.sql (+3/-0)
bin/start-database.sh (+63/-0)
bin/stop-database.sh (+22/-0)
distinctdb/distinctmetric.py (+96/-0)
distinctdb/tests/test_distinct.py (+147/-0)
distinctdb/version.py (+1/-0)
setup.py (+64/-0)
twisted/plugins/distinctdbplugin.py (+33/-0)
Conflict adding file README.  Moved existing file to README.moved.
Conflict adding file setup.py.  Moved existing file to setup.py.moved.
Conflict adding file twisted.  Moved existing file to twisted.moved.
To merge this branch: bzr merge lp:~lucio.torre/txstatsd/add-distinct-db-plugin
Reviewer Review Type Date Requested Status
txStatsD Developers Pending
Review via email: mp+91811@code.launchpad.net

Description of the change

missing test files

To post a comment you must log in.

Unmerged revisions

6. By Lucio Torre

added test files

5. By Lucio Torre

remove while, use INSERT .. RETURNING

4. By Lucio Torre

linted

3. By Lucio Torre

added some files

2. By Lucio Torre

almost done

1. By Lucio Torre

empty plugin skeleton

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== added file 'README'
--- README 1970-01-01 00:00:00 +0000
+++ README 2012-02-07 12:32:37 +0000
@@ -0,0 +1,10 @@
1To test:
2./bin/start-database.sh
3psql -h `pwd`/tmp/db1 -d distinct < bin/schema.sql
4
5Then:
6trial distinctdb
7
8When done:
9./bin/stop-database.sh
10
011
=== renamed file 'README' => 'README.moved'
=== added directory 'bin'
=== added file 'bin/schema.sql'
--- bin/schema.sql 1970-01-01 00:00:00 +0000
+++ bin/schema.sql 2012-02-07 12:32:37 +0000
@@ -0,0 +1,3 @@
-- One row per metric path; the SERIAL id is what points.path_id stores.
CREATE TABLE paths (
    id SERIAL PRIMARY KEY NOT NULL,
    path TEXT NOT NULL UNIQUE
);

-- One row per distinct value per saved bucket, holding how many times the
-- value was seen in that bucket (written by DistinctMetricReporter).
CREATE TABLE points (
    path_id INTEGER,
    bucket INTEGER,
    value TEXT,
    count INTEGER
);

-- Lookups are by metric and bucket.
CREATE INDEX points_idx ON points (path_id, bucket);
04
=== added file 'bin/start-database.sh'
--- bin/start-database.sh 1970-01-01 00:00:00 +0000
+++ bin/start-database.sh 2012-02-07 12:32:37 +0000
@@ -0,0 +1,63 @@
#! /bin/bash
# Start a throwaway PostgreSQL instance under $ROOTDIR/tmp/db1, create the
# databases listed in DATABASES, and write a DSN for the "distinct" database
# to $ROOTDIR/tmp/pg.dsn (read by distinctdb/tests/test_distinct.py).

ROOTDIR=${ROOTDIR:-`bzr root`}
if [ ! -d "$ROOTDIR" ]; then
    echo "ROOTDIR '$ROOTDIR' doesn't exist" >&2
    exit 1
fi

DATABASES="
distinct
"

function setup_database() {
    local TESTDIR=$1

    echo "## Starting postgres in $TESTDIR ##"
    mkdir -p "$TESTDIR/data"
    chmod 700 "$TESTDIR/data"

    export PGHOST="$TESTDIR"
    export PGDATA="$TESTDIR/data"
    if [ -d /usr/lib/postgresql/8.4 ]; then
        export PGBINDIR=/usr/lib/postgresql/8.4/bin
    elif [ -d /usr/lib/postgresql/8.3 ]; then
        export PGBINDIR=/usr/lib/postgresql/8.3/bin
    else
        # Without a binary directory every command below would run as
        # "/initdb", "/pg_ctl", ... and fail confusingly; stop here.
        echo "Cannot find valid parent for PGBINDIR" >&2
        exit 1
    fi

    # Initialise the cluster once, choosing trust auth here: running initdb
    # a second time on the now-populated PGDATA would just fail.
    "$PGBINDIR/initdb" -E UNICODE -A trust -D "$PGDATA"

    # Replace the generated options file with a minimal test configuration.
    if [ ! -e "$PGDATA/postgresql.conf" ]; then
        echo "PostgreSQL data directory apparently didn't init" >&2
        exit 1
    fi
    (
        cat <<EOF
search_path='\$user,public,ts2'
add_missing_from=false
log_statement='all'
log_line_prefix='[%m] %q%u@%d %c '
fsync = off
EOF
    ) > "$PGDATA/postgresql.conf"

    # Listen only on the unix socket in $TESTDIR (-h '' disables TCP).
    "$PGBINDIR/pg_ctl" start -w -D "$TESTDIR/data" -l "$TESTDIR/postgres.log" \
        -o "-F -k $TESTDIR -h ''"

    # DATABASES is intentionally unquoted so it word-splits into names.
    for db in $DATABASES; do
        "$PGBINDIR/createdb" --encoding UNICODE "$db" &>/dev/null
        "$PGBINDIR/createlang" plpgsql "$db"
    done
    "$PGBINDIR/createuser" --superuser --createdb "postgres" &>/dev/null

    # Create the additional users we need via a psql script.  CREATE USER is
    # CREATE ROLE ... LOGIN, so creating "client" twice (once as a role, once
    # as a user) fails with "role already exists"; do it in one statement.
    "$PGBINDIR/psql" -U postgres template1 <<EOF
CREATE ROLE client LOGIN INHERIT;
EOF

    echo "To set your environment so psql will connect to this DB instance type:"
    echo " export PGHOST=$TESTDIR"
    echo "## Done. ##"
    echo -n "host=$TESTDIR dbname=distinct" > "$ROOTDIR/tmp/pg.dsn"
}

setup_database "$ROOTDIR/tmp/db1"
064
=== added file 'bin/stop-database.sh'
--- bin/stop-database.sh 1970-01-01 00:00:00 +0000
+++ bin/stop-database.sh 2012-02-07 12:32:37 +0000
@@ -0,0 +1,22 @@
#! /bin/bash
# Stop the PostgreSQL instance started by bin/start-database.sh, if it is
# running.

ROOTDIR=${ROOTDIR:-`bzr root`}
if [ ! -d "$ROOTDIR" ]; then
    echo "ROOTDIR '$ROOTDIR' doesn't exist" >&2
    exit 1
fi

if [ -d /usr/lib/postgresql/8.4 ]; then
    PGBINDIR=/usr/lib/postgresql/8.4/bin
elif [ -d /usr/lib/postgresql/8.3 ]; then
    PGBINDIR=/usr/lib/postgresql/8.3/bin
else
    # Continuing would run "/pg_ctl"; fail clearly instead.
    echo "Cannot find valid parent for PGBINDIR" >&2
    exit 1
fi

# setting PGDATA tells pg_ctl which DB to talk to
export PGDATA="$ROOTDIR/tmp/db1/data/"

# Only stop the server if pg_ctl reports it as running.
if "$PGBINDIR/pg_ctl" status > /dev/null; then
    "$PGBINDIR/pg_ctl" stop -t 60 -w -m fast
fi
023
=== added directory 'distinctdb'
=== added file 'distinctdb/__init__.py'
=== added file 'distinctdb/distinctmetric.py'
--- distinctdb/distinctmetric.py 1970-01-01 00:00:00 +0000
+++ distinctdb/distinctmetric.py 2012-02-07 12:32:37 +0000
@@ -0,0 +1,96 @@
1import time
2
3import psycopg2
4
5from zope.interface import implements
6from twisted.internet.threads import deferToThread
7from txstatsd.itxstatsd import IMetric
8
ONEDAY = 60 * 60 * 24


class DistinctMetricReporter(object):
    """
    Keeps a measurement of the distinct items seen and the number of times
    each one has been seen, grouped into fixed-width time buckets.

    Counts for the current bucket live in memory.  When C{flush} sees that
    the timestamp has moved into a different bucket, the finished bucket is
    handed to C{save_bucket} (which writes it to PostgreSQL in a thread when
    a DSN is configured) and a fresh bucket is started.
    """
    implements(IMetric)

    def __init__(self, name, wall_time_func=time.time, prefix="",
                 bucket_size=ONEDAY, dsn=None):
        """Construct a metric we expect to be periodically updated.

        @param name: Indicates what is being instrumented.
        @param wall_time_func: Function for obtaining wall time.  C{None}
            falls back to C{time.time}.
        @param prefix: If present, a string to prepend to the message
            composed when C{report} is called.
        @param bucket_size: Width of a bucket, in the same units as
            C{wall_time_func} returns.  C{None} falls back to C{ONEDAY}.
        @param dsn: psycopg2 connection string.  When C{None}, finished
            buckets are not persisted.
        """
        self.name = name
        if wall_time_func is None:
            wall_time_func = time.time
        self.wall_time_func = wall_time_func
        if prefix:
            prefix += '.'
        self.prefix = prefix
        if bucket_size is None:
            bucket_size = ONEDAY
        self.bucket_size = bucket_size
        self.dsn = dsn
        # Cached paths.id for this metric; looked up lazily on first save.
        self.metric_id = None
        self.build_bucket()

    def build_bucket(self, timestamp=None):
        """Start an empty bucket for C{timestamp} (default: now)."""
        self.max = 0
        self.bucket = {}
        self.bucket_no = self.get_bucket_no(timestamp)

    def get_bucket_no(self, timestamp=None):
        """Return the bucket index for C{timestamp} (default: now)."""
        if timestamp is None:
            timestamp = self.wall_time_func()
        return int(timestamp / self.bucket_size)

    def process(self, fields):
        """Count the first field of an incoming message as one sighting."""
        self.update(fields[0])

    def update(self, item):
        """Record one sighting of C{item} and keep C{self.max} current."""
        value = self.bucket.get(item, 0) + 1
        self.bucket[item] = value
        if value > self.max:
            self.max = value

    def _get_metric_id(self, connection, cursor, path):
        """Return the C{paths.id} for C{path}, inserting a row if needed.

        If another writer inserts the same path between our SELECT and our
        INSERT, the UNIQUE constraint on C{paths.path} raises
        C{IntegrityError}; roll back and read the id that writer created.
        """
        cursor.execute("SELECT id FROM paths WHERE path = %s", (path,))
        row = cursor.fetchone()
        if row is None:
            try:
                cursor.execute("INSERT INTO paths (path) VALUES (%s) "
                               "RETURNING (id)", (path,))
            except psycopg2.IntegrityError:
                connection.rollback()
                cursor.execute("SELECT id FROM paths WHERE path = %s",
                               (path,))
            row = cursor.fetchone()
        return row[0]

    def _save_bucket(self, bucket, bucket_no):
        """Write every (value, count) of C{bucket} into C{points}.

        Blocking; intended to run in a worker thread (see C{save_bucket}).
        A new connection is opened for each call and always closed.
        """
        path = self.prefix + self.name
        connection = psycopg2.connect(self.dsn)
        try:
            cursor = connection.cursor()
            if self.metric_id is None:
                self.metric_id = self._get_metric_id(connection, cursor, path)
                connection.commit()

            for i, (value, count) in enumerate(bucket.iteritems()):
                cursor.execute(
                    "INSERT INTO points (path_id, bucket, value, count) "
                    "VALUES (%s, %s, %s, %s)",
                    (self.metric_id, bucket_no, value, count))
                # Commit in batches so a large bucket is not one huge
                # transaction.
                if (i + 1) % 1000 == 0:
                    connection.commit()
            connection.commit()
        finally:
            connection.close()

    def save_bucket(self, bucket, bucket_no):
        """Persist C{bucket} in a thread if a DSN is configured.

        @return: the C{Deferred} from C{deferToThread} so callers can wait
            for or observe the write, or C{None} when there is no DSN.
        """
        if self.dsn is not None:
            return deferToThread(self._save_bucket, bucket, bucket_no)
        return None

    def flush(self, interval, timestamp):
        """Rotate the bucket if C{timestamp} is in a new one, then report.

        @return: C{[(path.max, max, timestamp), (path.count, n, timestamp)]}
            where C{n} is the number of distinct items in the current bucket.
        """
        current_bucket = self.get_bucket_no(timestamp)
        if current_bucket != self.bucket_no:
            self.save_bucket(self.bucket, self.bucket_no)
            self.build_bucket(timestamp)

        # Fixed order (max, then count) so the output does not depend on
        # dict iteration order.
        path = self.prefix + self.name
        return [(path + ".max", self.max, timestamp),
                (path + ".count", len(self.bucket), timestamp)]
097
=== added directory 'distinctdb/tests'
=== added file 'distinctdb/tests/__init__.py'
=== added file 'distinctdb/tests/test_distinct.py'
--- distinctdb/tests/test_distinct.py 1970-01-01 00:00:00 +0000
+++ distinctdb/tests/test_distinct.py 2012-02-07 12:32:37 +0000
@@ -0,0 +1,147 @@
1# Copyright (C) 2011 Canonical
2# All Rights Reserved
3
4import ConfigParser
5from cStringIO import StringIO
6import os
7import time
8import subprocess
9
10import psycopg2
11
12from twisted.trial.unittest import TestCase
13from twisted.plugin import getPlugins
14from twisted.plugins import distinctdbplugin
15from txstatsd.itxstatsd import IMetricFactory
16from txstatsd import service
17
18from distinctdb import distinctmetric as distinct
19
20
class TestDistinctMetricReporter(TestCase):
    """In-memory behaviour of DistinctMetricReporter (no database needed)."""

    def test_get_bucket_no(self):
        clock = [0]
        reporter = distinct.DistinctMetricReporter(
            "test", wall_time_func=lambda: clock[0])
        self.assertEqual(reporter.get_bucket_no(), 0)

        # Advance the fake clock one second into the second one-day bucket.
        clock[0] = 60 * 60 * 24 + 1
        reporter.update("one")
        self.assertEqual(reporter.get_bucket_no(), 1)

    def test_max(self):
        clock = [0]
        reporter = distinct.DistinctMetricReporter(
            "test", wall_time_func=lambda: clock[0])
        self.assertEqual(reporter.get_bucket_no(), 0)
        self.assertEqual(reporter.max, 0)

        for item in ("one", "one", "two"):
            reporter.update(item)
        self.assertEqual(reporter.max, 2)

        # Flushing at a timestamp in the next bucket starts a fresh one.
        reporter.flush(1, 60 * 60 * 24 + 1)
        reporter.update("one")
        self.assertEqual(reporter.max, 1)

    def test_reports(self):
        clock = [0]
        saved = {}
        reporter = distinct.DistinctMetricReporter(
            "test", wall_time_func=lambda: clock[0])

        def capture(bucket, bucket_no):
            saved["bucket"] = bucket
            saved["bucket_no"] = bucket_no
        reporter.save_bucket = capture

        for item in ("one", "one", "two"):
            reporter.update(item)
        next_day = 60 * 60 * 24 + 1
        reporter.flush(1, next_day)
        reporter.update("three")

        expected_saved = {"bucket": {"one": 2, "two": 1}, "bucket_no": 0}
        self.assertEqual(saved, expected_saved)
        expected_report = [("test.max", 1, next_day),
                           ("test.count", 1, next_day)]
        self.assertEqual(reporter.flush(1, next_day), expected_report)

    def test_configure(self):
        class TestOptions(service.OptionsGlue):
            optParameters = [["test", "t", "default", "help"]]
            config_section = "statsd"

        config_text = ("[statsd]\n\n[plugin_distinctdb]\n"
                       "dsn = dbdsn\nbucket_size = 100")
        parser = ConfigParser.RawConfigParser()
        parser.readfp(StringIO(config_text))

        options = TestOptions()
        options.configure(parser)

        factory = distinctdbplugin.DistinctMetricFactory()
        factory.configure(options)
        reporter = factory.build_metric("foo", "bar", time.time)
        self.assertEqual(reporter.bucket_size, 100)
        self.assertEqual(reporter.dsn, "dbdsn")
92
93
class TestPlugin(TestCase):
    """The distinct factory must be discoverable through Twisted plugins."""

    def test_factory(self):
        discovered = list(getPlugins(IMetricFactory))
        factory = distinctdbplugin.distinct_metric_factory
        self.assertTrue(factory in discovered)
99
100
class TestDatabase(TestCase):
    """Integration tests against the PostgreSQL instance started by
    bin/start-database.sh, whose DSN is read from <bzr root>/tmp/pg.dsn."""

    def setUp(self):
        rootdir = subprocess.check_output(["bzr", "root"]).strip()
        dsn_file = os.path.join(rootdir, "tmp", "pg.dsn")
        # Close the DSN file instead of leaking the handle.
        with open(dsn_file) as f:
            self.dsn = f.read().strip()
        self.conn = psycopg2.connect(self.dsn)

    def tearDown(self):
        # Abort anything the test left open, wipe both tables so tests stay
        # independent, and always release the connection.
        try:
            self.conn.rollback()
            cr = self.conn.cursor()
            cr.execute("DELETE FROM points")
            cr.execute("DELETE FROM paths")
            self.conn.commit()
        finally:
            self.conn.close()

    def test_connect(self):
        cr = self.conn.cursor()
        cr.execute("SELECT 0")
        # assertTrue(result, msg) always passed; compare the rows instead.
        self.assertEqual(cr.fetchall(), [(0,)])

    def test_create_metric_id(self):
        dmr = distinct.DistinctMetricReporter("test", dsn=self.dsn)
        dmr._save_bucket({}, 0)
        cr = self.conn.cursor()
        cr.execute("SELECT id, path FROM paths")
        # Exactly one path row, for this metric, with the cached id.
        self.assertEqual(cr.fetchall(), [(dmr.metric_id, "test")])

    def test_find_saved_data(self):
        dmr = distinct.DistinctMetricReporter("test", dsn=self.dsn)
        dmr.update("one")
        dmr.update("one")
        dmr.update("two")
        dmr._save_bucket(dmr.bucket, 0)
        cr = self.conn.cursor()
        cr.execute("SELECT * FROM points ORDER BY value")
        rows = cr.fetchall()
        self.assertEqual(rows, [(dmr.metric_id, 0, "one", 2),
                                (dmr.metric_id, 0, "two", 1)])

    def test_load_metric_id(self):
        dmr = distinct.DistinctMetricReporter("test", dsn=self.dsn)
        dmr._save_bucket({}, 0)
        dmr2 = distinct.DistinctMetricReporter("test", dsn=self.dsn)
        dmr2._save_bucket({}, 0)
        self.assertEqual(dmr.metric_id, dmr2.metric_id)
0148
=== added file 'distinctdb/version.py'
--- distinctdb/version.py 1970-01-01 00:00:00 +0000
+++ distinctdb/version.py 2012-02-07 12:32:37 +0000
@@ -0,0 +1,1 @@
# Release version of the distinct txstatsd plugin; setup.py passes it to
# setup(version=...).
distinctplugin = '0.0.1'
02
=== added file 'setup.py'
--- setup.py 1970-01-01 00:00:00 +0000
+++ setup.py 2012-02-07 12:32:37 +0000
@@ -0,0 +1,64 @@
from distutils.core import setup
from distutils.command.install import install
import os

from twisted.plugin import IPlugin, getPlugins

# The package shipped here is "distinctdb" (see distinctdb/version.py);
# there is no "distinctplugin" package to import from.
from distinctdb import version

# If setuptools is present, use its find_packages(); otherwise fall back to
# a minimal os.walk() based implementation.
extra_setup_args = {}
try:
    import setuptools
    from setuptools import find_packages
except ImportError:
    def find_packages():
        """
        Compatibility wrapper.

        Taken from storm setup.py.  Returns every directory under
        distinctdb that contains an __init__.py, as a dotted package name.
        """
        packages = []
        for directory, subdirectories, files in os.walk("distinctdb"):
            if '__init__.py' in files:
                packages.append(directory.replace(os.sep, '.'))
        return packages

long_description = """
A plugin for txstatsd to count distinct values, persisting them to postgres.
"""


class TxPluginInstaller(install):
    """distutils install command that also refreshes Twisted's plugin cache."""

    def run(self):
        install.run(self)
        # Make sure we refresh the plugin list when installing, so we know
        # we have enough write permissions.
        # see http://twistedmatrix.com/documents/current/core/howto/plugin.html
        # "when installing or removing software which provides Twisted plugins,
        # the site administrator should be sure the cache is regenerated"
        list(getPlugins(IPlugin))

setup(
    cmdclass={'install': TxPluginInstaller},
    name="distinctplugin",
    version=version.distinctplugin,
    description="A txstatsd plugin for distinct counts",
    author="txStatsD Developers",
    url="https://launchpad.net/txstatsd",
    license="MIT",
    packages=find_packages() + ["twisted.plugins"],
    long_description=long_description,
    classifiers=[
        "Development Status :: 4 - Beta",
        "Intended Audience :: Developers",
        "Intended Audience :: System Administrators",
        "Intended Audience :: Information Technology",
        "Programming Language :: Python",
        "Topic :: Database",
        "Topic :: Internet :: WWW/HTTP",
        "License :: OSI Approved :: MIT License",
        ],
    **extra_setup_args
    )
065
=== renamed file 'setup.py' => 'setup.py.moved'
=== added directory 'twisted'
=== renamed directory 'twisted' => 'twisted.moved'
=== added directory 'twisted/plugins'
=== added file 'twisted/plugins/distinctdbplugin.py'
--- twisted/plugins/distinctdbplugin.py 1970-01-01 00:00:00 +0000
+++ twisted/plugins/distinctdbplugin.py 2012-02-07 12:32:37 +0000
@@ -0,0 +1,33 @@
1from zope.interface import implements
2
3from twisted.plugin import IPlugin
4from txstatsd.itxstatsd import IMetricFactory
5from distinctdb.distinctmetric import DistinctMetricReporter, ONEDAY
6
7
class DistinctMetricFactory(object):
    """Twisted plugin that builds DistinctMetricReporter instances.

    Settings are read by C{configure} from the C{plugin_distinctdb} section
    of the options (C{bucket_size} and C{dsn}).
    """
    implements(IMetricFactory, IPlugin)

    name = "distinct"
    metric_type = "d"

    # Defaults used until configure() runs.  ONEDAY matches the reporter's
    # own default, so build_metric() works even without configuration
    # (a None bucket size would make the reporter divide by None).
    bucket_size = ONEDAY
    dsn = None
    metric_ids = None  # NOTE(review): not referenced in this module

    def build_metric(self, prefix, name, wall_time_func=None):
        """Return a reporter for C{prefix}.C{name} using current settings.

        When C{wall_time_func} is C{None}, the reporter's own default clock
        is used instead of storing C{None} (which would later be called).
        """
        kwargs = {"prefix": prefix,
                  "bucket_size": self.bucket_size,
                  "dsn": self.dsn}
        if wall_time_func is not None:
            kwargs["wall_time_func"] = wall_time_func
        return DistinctMetricReporter(name, **kwargs)

    def configure(self, options):
        """Read C{bucket_size} and C{dsn} from C{plugin_distinctdb}.

        A missing, non-integer, or non-positive C{bucket_size} falls back
        to C{ONEDAY}; a zero size would cause a ZeroDivisionError when
        computing bucket numbers.
        """
        self.section = dict(options.get("plugin_distinctdb", {}))
        try:
            bucket_size = int(self.section.get("bucket_size", ONEDAY))
        except (TypeError, ValueError):
            bucket_size = ONEDAY
        if bucket_size <= 0:
            bucket_size = ONEDAY
        self.bucket_size = bucket_size

        self.dsn = self.section.get("dsn", None)

distinct_metric_factory = DistinctMetricFactory()

Subscribers

People subscribed via source and target branches