Merge lp:~sidnei/txstatsd/distinct-plugin-webstats into lp:~beuno/txstatsd/distinct-plugin-webstats

Proposed by Sidnei da Silva
Status: Merged
Merged at revision: 13
Proposed branch: lp:~sidnei/txstatsd/distinct-plugin-webstats
Merge into: lp:~beuno/txstatsd/distinct-plugin-webstats
Diff against target: 423 lines (+183/-115)
2 files modified
distinctdb/distinctmetric.py (+60/-46)
distinctdb/tests/test_distinct.py (+123/-69)
To merge this branch: bzr merge lp:~sidnei/txstatsd/distinct-plugin-webstats
Reviewer | Review Type | Date Requested | Status
Martin Albisetti | | | Pending
Review via email: mp+97457@code.launchpad.net

Description of the change

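Refactor so that "top" and "count" are served as sub-URLs (child resources) of DistinctResource, with the underlying query methods (get_distinct_top_value, get_distinct_count) moved onto DistinctMetricReporter.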

To post a comment you must log in.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'distinctdb/distinctmetric.py'
2--- distinctdb/distinctmetric.py 2012-03-12 23:18:39 +0000
3+++ distinctdb/distinctmetric.py 2012-03-14 16:56:19 +0000
4@@ -1,13 +1,13 @@
5 import json
6 import time
7 import threading
8+from functools import partial
9
10 import psycopg2
11 import redis
12
13 from zope.interface import implements
14-from twisted.internet import reactor
15-from twisted.internet.task import deferLater
16+from twisted.internet import reactor, threads
17 from twisted.web import server, resource
18 from txstatsd.itxstatsd import IMetric
19
20@@ -16,56 +16,40 @@
21 ONE_DAY = 24 * ONE_HOUR
22
23
24+class JSONMethodResource(resource.Resource):
25+ """Renders the result of calling C{name} on C{target} as a json result."""
26+ isLeaf = True
27+
28+ def __init__(self, target, name):
29+ self.target = target
30+ self.name = name
31+
32+ def _render_json_error(self, request, result):
33+ """Fetch the data, make the request return"""
34+ request.write(json.dumps({"error": result.getErrorMessage()}))
35+ request.finish()
36+
37+ def _render_json_result(self, request, result):
38+ """Fetch the data, make the request return"""
39+ request.write(json.dumps({"result": result}))
40+ request.finish()
41+
42+ def render_GET(self, request):
43+ """Return the web request asynchronously."""
44+ d = threads.deferToThread(getattr(self.target, self.name), **request.args)
45+ d.addCallback(partial(self._render_json_result, request))
46+ d.addErrback(partial(self._render_json_error, request))
47+ return server.NOT_DONE_YET
48+
49+
50 class DistinctResource(resource.Resource):
51 """Returns statistics about unique values."""
52- isLeaf = True
53
54 def __init__(self, reporter):
55 resource.Resource.__init__(self)
56 self.reporter = reporter
57-
58- def _return_request(self, request):
59- """Fetch the data, make the request return"""
60- #XXX: No idea how to properly distinguish which method to call
61- data = self.get_distinct_count(request.args['since'],
62- request.args['until'])
63- request.write(json.dumps(data))
64- request.finish()
65-
66- def render_GET(self, request):
67- """Return the web request asynchronously."""
68- d = deferLater(reactor, 5, lambda: request)
69- d.addCallback(self._return_request)
70- return server.NOT_DONE_YET
71-
72- def get_distinct_count(self, since, until):
73- """Get a distinct count for a path between certain ranges."""
74- since_bucket = self.reporter.get_bucket_no(since)
75- until_bucket = self.reporter.get_bucket_no(until)
76- path = self.reporter.prefix + self.reporter.name
77- c = psycopg2.connect(self.reporter.dsn)
78- cr = c.cursor()
79- cr.execute("SELECT COUNT(DISTINCT value) FROM points "
80- "INNER JOIN paths ON (paths.id = points.path_id) "
81- "WHERE paths.path = %s AND bucket BETWEEN %s AND %s", (
82- path, since_bucket, until_bucket,))
83- row = cr.fetchone()
84- return row[0]
85-
86- def get_distinct_top_value(self, since, until, how_many=20):
87- """Get the top distinct values for a path between certain ranges."""
88- since_bucket = self.reporter.get_bucket_no(since)
89- until_bucket = self.reporter.get_bucket_no(until)
90- path = self.reporter.prefix + self.reporter.name
91- c = psycopg2.connect(self.reporter.dsn)
92- cr = c.cursor()
93- cr.execute("SELECT value, COUNT(value) AS cnt FROM points "
94- "INNER JOIN paths ON (paths.id = points.path_id) "
95- "WHERE paths.path = %s AND bucket BETWEEN %s AND %s"
96- "GROUP BY value ORDER BY cnt DESC LIMIT %s", (
97- path, since_bucket, until_bucket, how_many,))
98- rows = cr.fetchall()
99- return rows
100+ self.putChild("top", JSONMethodResource(self.reporter, "get_distinct_top_value"))
101+ self.putChild("count", JSONMethodResource(self.reporter, "get_distinct_count"))
102
103
104 class DistinctMetricReporter(object):
105@@ -209,3 +193,33 @@
106 for item, value in items.iteritems():
107 metrics.append((self.prefix + self.name + item, value, timestamp))
108 return metrics
109+
110+ def get_distinct_count(self, since, until):
111+ """Get a distinct count for a path between certain ranges."""
112+ since_bucket = self.get_bucket_no(since)
113+ until_bucket = self.get_bucket_no(until)
114+ path = self.prefix + self.name
115+ c = psycopg2.connect(self.dsn)
116+ cr = c.cursor()
117+ cr.execute("SELECT COUNT(DISTINCT value) FROM points "
118+ "INNER JOIN paths ON (paths.id = points.path_id) "
119+ "WHERE paths.path = %s AND bucket BETWEEN %s AND %s", (
120+ path, since_bucket, until_bucket,))
121+ row = cr.fetchone()
122+ return row[0]
123+
124+ def get_distinct_top_value(self, since, until, how_many=20):
125+ """Get the top distinct values for a path between certain ranges."""
126+ since_bucket = self.get_bucket_no(since)
127+ until_bucket = self.get_bucket_no(until)
128+ path = self.prefix + self.name
129+ c = psycopg2.connect(self.dsn)
130+ cr = c.cursor()
131+ cr.execute("SELECT value, COUNT(value) AS cnt FROM points "
132+ "INNER JOIN paths ON (paths.id = points.path_id) "
133+ "WHERE paths.path = %s AND bucket BETWEEN %s AND %s"
134+ "GROUP BY value ORDER BY cnt DESC LIMIT %s", (
135+ path, since_bucket, until_bucket, how_many,))
136+ rows = cr.fetchall()
137+ return rows
138+
139
140=== modified file 'distinctdb/tests/test_distinct.py'
141--- distinctdb/tests/test_distinct.py 2012-03-12 22:36:36 +0000
142+++ distinctdb/tests/test_distinct.py 2012-03-14 16:56:19 +0000
143@@ -1,10 +1,12 @@
144 # Copyright (C) 2011 Canonical
145 # All Rights Reserved
146
147+import json
148 import ConfigParser
149 from cStringIO import StringIO
150 import os
151 import time
152+
153 try:
154 from subprocess import check_output
155 except ImportError:
156@@ -21,12 +23,32 @@
157 from twisted.internet import reactor
158 from twisted.plugin import getPlugins
159 from twisted.plugins import distinctdbplugin
160+from twisted.web.test.test_web import DummyRequest
161+
162 from txstatsd.itxstatsd import IMetricFactory
163 from txstatsd import service
164
165 from distinctdb import distinctmetric as distinct
166
167
168+class DummyReporter(object):
169+
170+ def __init__(self):
171+ self.called = []
172+
173+ def get_foo(self):
174+ self.called.append(("get_foo", ()))
175+ return "foo"
176+
177+ def get_distinct_count(self, since, until):
178+ self.called.append(("get_distinct_count", (since, until)))
179+ return 42
180+
181+ def get_distinct_top_value(self, since, until, how_many=20):
182+ self.called.append(("get_distinct_top_value", (since, until, how_many)))
183+ return [("one", 1), ("two", 1)]
184+
185+
186 class TestDistinctMetricReporter(TestCase):
187
188 def test_get_bucket_no(self):
189@@ -102,8 +124,62 @@
190 self.assertEquals(dmr.dsn, "dbdsn")
191
192
193+class TestJSONMethodResource(TestCase):
194+
195+ def test_render_result_as_json(self):
196+ reporter = DummyReporter()
197+ request = DummyRequest([])
198+ resource = distinct.JSONMethodResource(reporter, "get_foo")
199+ def check(result):
200+ self.assertEquals({"result": "foo"},
201+ json.loads("".join(request.written)))
202+ d = request.notifyFinish()
203+ d.addCallback(check)
204+ d.addErrback(self.fail)
205+ request.render(resource)
206+ return d
207+
208+
209 class TestDistinctResource(TestCase):
210
211+ def test_render_top_resource(self):
212+ reporter = DummyReporter()
213+ request = DummyRequest([])
214+ request.args = {"since": time.time(), "until": time.time() + 1}
215+ resource = distinct.DistinctResource(reporter)
216+ child_resource = resource.getChildWithDefault("top", request)
217+ def check(result):
218+ self.assertEquals(json.dumps({"result": [("one", 1), ("two", 1)]}),
219+ "".join(request.written))
220+ d = request.notifyFinish()
221+ d.addCallback(check)
222+ request.render(child_resource)
223+ return d
224+
225+ def test_render_count_resource(self):
226+ reporter = DummyReporter()
227+ request = DummyRequest([])
228+ request.args = {"since": time.time(), "until": time.time() + 1}
229+ resource = distinct.DistinctResource(reporter)
230+ child_resource = resource.getChildWithDefault("count", request)
231+ def check(result):
232+ self.assertEquals(json.dumps({"result": 42}),
233+ "".join(request.written))
234+ d = request.notifyFinish()
235+ d.addCallback(check)
236+ request.render(child_resource)
237+ return d
238+
239+
240+class TestPlugin(TestCase):
241+
242+ def test_factory(self):
243+ self.assertTrue(distinctdbplugin.distinct_metric_factory in \
244+ list(getPlugins(IMetricFactory)))
245+
246+
247+class TestDatabase(TestCase):
248+
249 def setUp(self):
250 rootdir = check_output(["bzr", "root"]).strip()
251 dsn_file = os.path.join(rootdir, "tmp", "pg.dsn")
252@@ -123,10 +199,45 @@
253 dmr.update(name)
254 dmr._save_bucket(dmr.bucket, bucket_no)
255
256+
257+class TestDatabaseMetricStorage(TestDatabase):
258+
259+ def test_connect(self):
260+ cr = self.conn.cursor()
261+ cr.execute("SELECT 0")
262+ result = cr.fetchall()
263+ self.assertTrue(result, [(0,)])
264+
265+ def test_create_metric_id(self):
266+ dmr = distinct.DistinctMetricReporter("test", dsn=self.dsn)
267+ dmr._save_bucket({}, 0)
268+ cr = self.conn.cursor()
269+ cr.execute("SELECT * FROM paths WHERE path = 'test'")
270+ cr.execute("SELECT * FROM paths")
271+ self.assertEquals(len(cr.fetchall()), 1)
272+
273+ def test_find_saved_data(self):
274+ dmr = distinct.DistinctMetricReporter("test", dsn=self.dsn)
275+ dmr.update("one")
276+ dmr.update("one")
277+ dmr.update("two")
278+ dmr._save_bucket(dmr.bucket, 0)
279+ cr = self.conn.cursor()
280+ cr.execute("SELECT * FROM points ORDER BY value")
281+ rows = cr.fetchall()
282+ self.assertEquals(rows, [(dmr.metric_id, 0, "one", 2),
283+ (dmr.metric_id, 0, "two", 1)])
284+
285+ def test_load_metric_id(self):
286+ dmr = distinct.DistinctMetricReporter("test", dsn=self.dsn)
287+ dmr._save_bucket({}, 0)
288+ dmr2 = distinct.DistinctMetricReporter("test", dsn=self.dsn)
289+ dmr2._save_bucket({}, 0)
290+ self.assertEquals(dmr.metric_id, dmr2.metric_id)
291+
292 def test_get_distinct_count(self):
293 """Test get_distinct_count."""
294 dmr = distinct.DistinctMetricReporter("test", dsn=self.dsn)
295- dr = distinct.DistinctResource(dmr)
296 t1 = time.time()
297 # add a day
298 t2 = time.time() + 60 * 60 * 24 + 1
299@@ -135,32 +246,31 @@
300 # Make sure we're using different buckets
301 self.assertNotEqual(b1, b2)
302 self._create_test_data_points("one", b1)
303- count = dr.get_distinct_count(t1, t2)
304+ count = dmr.get_distinct_count(t1, t2)
305 self.assertEqual(count, 1)
306 self._create_test_data_points("two", b1)
307- count = dr.get_distinct_count(t1, t2)
308+ count = dmr.get_distinct_count(t1, t2)
309 self.assertEqual(count, 2)
310 # Add it again, make sure the distinct is filtering it out
311 self._create_test_data_points("two", b1)
312- count = dr.get_distinct_count(t1, t2)
313+ count = dmr.get_distinct_count(t1, t2)
314 self.assertEqual(count, 2)
315 # Now we add to a newer bucket, but still within range
316 self._create_test_data_points("three", b2)
317- count = dr.get_distinct_count(t1, t2)
318+ count = dmr.get_distinct_count(t1, t2)
319 self.assertEqual(count, 3)
320 # Now to a older bucket, out of range
321 self._create_test_data_points("zero", b1 - 1)
322- count = dr.get_distinct_count(t1, t2)
323+ count = dmr.get_distinct_count(t1, t2)
324 self.assertEqual(count, 3)
325 # Now to a newer bucket, out of range
326 self._create_test_data_points("infinity", b2 + 1)
327- count = dr.get_distinct_count(t1, t2)
328+ count = dmr.get_distinct_count(t1, t2)
329 self.assertEqual(count, 3)
330
331 def test_get_distinct_top_value(self):
332 """Test get_distinct_top_value."""
333 dmr = distinct.DistinctMetricReporter("test", dsn=self.dsn)
334- dr = distinct.DistinctResource(dmr)
335 t1 = time.time()
336 # add a day
337 t2 = time.time() + 60 * 60 * 24 + 1
338@@ -169,80 +279,24 @@
339 # Make sure we're using different buckets
340 self.assertNotEqual(b1, b2)
341 self._create_test_data_points("one", b1)
342- values = dr.get_distinct_top_value(t1, t2)
343+ values = dmr.get_distinct_top_value(t1, t2)
344 self.assertEqual(values, [("one", 1)])
345 self._create_test_data_points("two", b1)
346- values = dr.get_distinct_top_value(t1, t2)
347+ values = dmr.get_distinct_top_value(t1, t2)
348 self.assertEqual(values, [("one", 1), ("two", 1)])
349 self._create_test_data_points("one", b1)
350- values = dr.get_distinct_top_value(t1, t2)
351+ values = dmr.get_distinct_top_value(t1, t2)
352 self.assertEqual(values, [("one", 2), ("two", 1)])
353 # Create a third, and make "two" have 2 values so "three" is last
354 self._create_test_data_points("two", b1)
355 self._create_test_data_points("three", b1)
356- values = dr.get_distinct_top_value(t1, t2)
357+ values = dmr.get_distinct_top_value(t1, t2)
358 self.assertEqual(values, [("one", 2), ("two", 2), ("three", 1)])
359 # Only get the top 2
360- values = dr.get_distinct_top_value(t1, t2, how_many=2)
361+ values = dmr.get_distinct_top_value(t1, t2, how_many=2)
362 self.assertEqual(values, [("one", 2), ("two", 2)])
363
364
365-class TestPlugin(TestCase):
366-
367- def test_factory(self):
368- self.assertTrue(distinctdbplugin.distinct_metric_factory in \
369- list(getPlugins(IMetricFactory)))
370-
371-
372-class TestDatabase(TestCase):
373-
374- def setUp(self):
375- rootdir = check_output(["bzr", "root"]).strip()
376- dsn_file = os.path.join(rootdir, "tmp", "pg.dsn")
377- self.dsn = open(dsn_file).read()
378- self.conn = psycopg2.connect(self.dsn)
379-
380- def tearDown(self):
381- cr = self.conn.cursor()
382- cr.execute("rollback")
383- cr.execute("DELETE FROM paths")
384- cr.execute("DELETE FROM points")
385- cr.execute("commit")
386-
387- def test_connect(self):
388- cr = self.conn.cursor()
389- cr.execute("SELECT 0")
390- result = cr.fetchall()
391- self.assertTrue(result, [(0,)])
392-
393- def test_create_metric_id(self):
394- dmr = distinct.DistinctMetricReporter("test", dsn=self.dsn)
395- dmr._save_bucket({}, 0)
396- cr = self.conn.cursor()
397- cr.execute("SELECT * FROM paths WHERE path = 'test'")
398- cr.execute("SELECT * FROM paths")
399- self.assertEquals(len(cr.fetchall()), 1)
400-
401- def test_find_saved_data(self):
402- dmr = distinct.DistinctMetricReporter("test", dsn=self.dsn)
403- dmr.update("one")
404- dmr.update("one")
405- dmr.update("two")
406- dmr._save_bucket(dmr.bucket, 0)
407- cr = self.conn.cursor()
408- cr.execute("SELECT * FROM points ORDER BY value")
409- rows = cr.fetchall()
410- self.assertEquals(rows, [(dmr.metric_id, 0, "one", 2),
411- (dmr.metric_id, 0, "two", 1)])
412-
413- def test_load_metric_id(self):
414- dmr = distinct.DistinctMetricReporter("test", dsn=self.dsn)
415- dmr._save_bucket({}, 0)
416- dmr2 = distinct.DistinctMetricReporter("test", dsn=self.dsn)
417- dmr2._save_bucket({}, 0)
418- self.assertEquals(dmr.metric_id, dmr2.metric_id)
419-
420-
421 class TestRedis(TestCase):
422
423 def setUp(self):

Subscribers

People subscribed via source and target branches