Merge lp:~sidnei/txstatsd/distinct-plugin-webstats into lp:~beuno/txstatsd/distinct-plugin-webstats
- distinct-plugin-webstats
- Merge into distinct-plugin-webstats
Proposed by
Sidnei da Silva
Status: | Merged |
---|---|
Merged at revision: | 13 |
Proposed branch: | lp:~sidnei/txstatsd/distinct-plugin-webstats |
Merge into: | lp:~beuno/txstatsd/distinct-plugin-webstats |
Diff against target: |
423 lines (+183/-115) 2 files modified
distinctdb/distinctmetric.py (+60/-46) distinctdb/tests/test_distinct.py (+123/-69) |
To merge this branch: | bzr merge lp:~sidnei/txstatsd/distinct-plugin-webstats |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Martin Albisetti | Pending | ||
Review via email: mp+97457@code.launchpad.net |
Commit message
Description of the change
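Refactor so that `top` and `count` are sub-URLs (child resources) of DistinctResource. Each is served by a JSONMethodResource that calls the corresponding query method, and those methods now live on DistinctMetricReporter.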
To post a comment you must log in.
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === modified file 'distinctdb/distinctmetric.py' |
2 | --- distinctdb/distinctmetric.py 2012-03-12 23:18:39 +0000 |
3 | +++ distinctdb/distinctmetric.py 2012-03-14 16:56:19 +0000 |
4 | @@ -1,13 +1,13 @@ |
5 | import json |
6 | import time |
7 | import threading |
8 | +from functools import partial |
9 | |
10 | import psycopg2 |
11 | import redis |
12 | |
13 | from zope.interface import implements |
14 | -from twisted.internet import reactor |
15 | -from twisted.internet.task import deferLater |
16 | +from twisted.internet import reactor, threads |
17 | from twisted.web import server, resource |
18 | from txstatsd.itxstatsd import IMetric |
19 | |
20 | @@ -16,56 +16,40 @@ |
21 | ONE_DAY = 24 * ONE_HOUR |
22 | |
23 | |
class JSONMethodResource(resource.Resource):
    """Renders the result of calling C{name} on C{target} as a json result.

    The method C{getattr(target, name)} is run via C{threads.deferToThread}
    with the request's query arguments as keyword arguments.  The response
    body is C{{"result": <return value>}} on success, or
    C{{"error": <failure message>}} if the call raised.
    """
    isLeaf = True

    def __init__(self, target, name):
        # Initialise the base Resource so the state it normally sets up
        # (e.g. its children mapping) exists like on any other Resource.
        resource.Resource.__init__(self)
        self.target = target
        self.name = name

    def _get_call_kwargs(self, request):
        """Turn C{request.args} into keyword arguments for the target method.

        Twisted stores each query argument as a list of strings (one entry
        per occurrence), e.g. C{?since=1} arrives as C{{"since": ["1"]}}.
        Single-valued lists are unwrapped to their only element; repeated
        arguments stay lists, and non-list values (as some test doubles
        set them) are passed through unchanged.

        NOTE(review): the values are still strings here; the target method
        must convert them as needed — confirm get_bucket_no accepts them.
        """
        kwargs = {}
        for key, value in request.args.items():
            if isinstance(value, list) and len(value) == 1:
                value = value[0]
            kwargs[key] = value
        return kwargs

    def _render_json_error(self, request, result):
        """Write the failure C{result}'s message as JSON and finish."""
        request.write(json.dumps({"error": result.getErrorMessage()}))
        request.finish()

    def _render_json_result(self, request, result):
        """Write C{result} wrapped in a JSON object and finish."""
        request.write(json.dumps({"result": result}))
        request.finish()

    def render_GET(self, request):
        """Call the target method in a thread and answer asynchronously.

        NOTE(review): if the client disconnects before the thread returns,
        write()/finish() on the request may fail; consider tracking
        request.notifyFinish().
        """
        method = getattr(self.target, self.name)
        request.setHeader("content-type", "application/json")
        d = threads.deferToThread(method, **self._get_call_kwargs(request))
        d.addCallback(partial(self._render_json_result, request))
        d.addErrback(partial(self._render_json_error, request))
        return server.NOT_DONE_YET
48 | + |
49 | + |
class DistinctResource(resource.Resource):
    """Web resource exposing distinct-value statistics of a reporter.

    Child URLs, each rendering one reporter method as JSON:

      - C{top}   -> C{reporter.get_distinct_top_value}
      - C{count} -> C{reporter.get_distinct_count}
    """

    def __init__(self, reporter):
        resource.Resource.__init__(self)
        self.reporter = reporter
        children = (
            ("top", "get_distinct_top_value"),
            ("count", "get_distinct_count"),
        )
        for segment, method_name in children:
            self.putChild(segment,
                          JSONMethodResource(self.reporter, method_name))
102 | |
103 | |
104 | class DistinctMetricReporter(object): |
105 | @@ -209,3 +193,33 @@ |
106 | for item, value in items.iteritems(): |
107 | metrics.append((self.prefix + self.name + item, value, timestamp)) |
108 | return metrics |
109 | + |
def get_distinct_count(self, since, until):
    """Count the distinct values stored for this metric's path in a range.

    C{since} and C{until} are mapped to bucket numbers with
    C{self.get_bucket_no}; buckets between them (inclusive, per SQL
    C{BETWEEN}) are counted for the path C{self.prefix + self.name}.
    Returns the C{COUNT(DISTINCT value)} figure.
    """
    since_bucket = self.get_bucket_no(since)
    until_bucket = self.get_bucket_no(until)
    path = self.prefix + self.name
    conn = psycopg2.connect(self.dsn)
    try:
        cursor = conn.cursor()
        cursor.execute(
            "SELECT COUNT(DISTINCT value) FROM points "
            "INNER JOIN paths ON (paths.id = points.path_id) "
            "WHERE paths.path = %s AND bucket BETWEEN %s AND %s",
            (path, since_bucket, until_bucket))
        return cursor.fetchone()[0]
    finally:
        # Each call opens its own connection; release it explicitly, even
        # when the query fails.
        conn.close()
123 | + |
def get_distinct_top_value(self, since, until, how_many=20):
    """Return the most frequent values for this metric's path in a range.

    C{since} and C{until} are mapped to bucket numbers with
    C{self.get_bucket_no}; buckets between them (inclusive, per SQL
    C{BETWEEN}) are scanned for the path C{self.prefix + self.name}.
    Returns up to C{how_many} C{(value, count)} rows ordered by
    descending count.
    """
    since_bucket = self.get_bucket_no(since)
    until_bucket = self.get_bucket_no(until)
    path = self.prefix + self.name
    conn = psycopg2.connect(self.dsn)
    try:
        cursor = conn.cursor()
        # Keep the space before "GROUP BY": adjacent literals are joined
        # verbatim, and "... AND %sGROUP BY ..." may be rejected by
        # PostgreSQL once the parameter is substituted.
        cursor.execute(
            "SELECT value, COUNT(value) AS cnt FROM points "
            "INNER JOIN paths ON (paths.id = points.path_id) "
            "WHERE paths.path = %s AND bucket BETWEEN %s AND %s "
            "GROUP BY value ORDER BY cnt DESC LIMIT %s",
            (path, since_bucket, until_bucket, how_many))
        return cursor.fetchall()
    finally:
        # Each call opens its own connection; release it explicitly, even
        # when the query fails.
        conn.close()
138 | + |
139 | |
140 | === modified file 'distinctdb/tests/test_distinct.py' |
141 | --- distinctdb/tests/test_distinct.py 2012-03-12 22:36:36 +0000 |
142 | +++ distinctdb/tests/test_distinct.py 2012-03-14 16:56:19 +0000 |
143 | @@ -1,10 +1,12 @@ |
144 | # Copyright (C) 2011 Canonical |
145 | # All Rights Reserved |
146 | |
147 | +import json |
148 | import ConfigParser |
149 | from cStringIO import StringIO |
150 | import os |
151 | import time |
152 | + |
153 | try: |
154 | from subprocess import check_output |
155 | except ImportError: |
156 | @@ -21,12 +23,32 @@ |
157 | from twisted.internet import reactor |
158 | from twisted.plugin import getPlugins |
159 | from twisted.plugins import distinctdbplugin |
160 | +from twisted.web.test.test_web import DummyRequest |
161 | + |
162 | from txstatsd.itxstatsd import IMetricFactory |
163 | from txstatsd import service |
164 | |
165 | from distinctdb import distinctmetric as distinct |
166 | |
167 | |
class DummyReporter(object):
    """Reporter stand-in that records calls and returns canned data.

    Every call appends C{(method_name, positional_args)} to C{self.called}.
    """

    def __init__(self):
        self.called = []

    def _record(self, method_name, *args):
        self.called.append((method_name, args))

    def get_foo(self):
        self._record("get_foo")
        return "foo"

    def get_distinct_count(self, since, until):
        self._record("get_distinct_count", since, until)
        return 42

    def get_distinct_top_value(self, since, until, how_many=20):
        self._record("get_distinct_top_value", since, until, how_many)
        return [("one", 1), ("two", 1)]
184 | + |
185 | + |
186 | class TestDistinctMetricReporter(TestCase): |
187 | |
188 | def test_get_bucket_no(self): |
189 | @@ -102,8 +124,62 @@ |
190 | self.assertEquals(dmr.dsn, "dbdsn") |
191 | |
192 | |
class TestJSONMethodResource(TestCase):
    """Tests for L{distinct.JSONMethodResource}."""

    def test_render_result_as_json(self):
        """The target method's return value is rendered as a JSON result."""
        target = DummyReporter()
        req = DummyRequest([])
        json_resource = distinct.JSONMethodResource(target, "get_foo")

        def assert_body(_):
            body = "".join(req.written)
            self.assertEquals({"result": "foo"}, json.loads(body))

        finished = req.notifyFinish()
        finished.addCallback(assert_body)
        finished.addErrback(self.fail)
        req.render(json_resource)
        return finished
207 | + |
208 | + |
class TestDistinctResource(TestCase):
    """Rendering of the C{top} and C{count} children of DistinctResource."""

    def _assert_child_renders(self, child_name, expected_result):
        """Render child C{child_name} and compare its JSON body.

        Returns the request's finish Deferred so trial waits for it.
        """
        reporter = DummyReporter()
        request = DummyRequest([])
        request.args = {"since": time.time(), "until": time.time() + 1}
        parent = distinct.DistinctResource(reporter)
        child = parent.getChildWithDefault(child_name, request)

        def check(_):
            expected_body = json.dumps({"result": expected_result})
            self.assertEquals(expected_body, "".join(request.written))

        finished = request.notifyFinish()
        finished.addCallback(check)
        request.render(child)
        return finished

    def test_render_top_resource(self):
        return self._assert_child_renders("top", [("one", 1), ("two", 1)])

    def test_render_count_resource(self):
        return self._assert_child_renders("count", 42)
238 | + |
239 | + |
class TestPlugin(TestCase):
    """The distinctdb plugin must be discoverable by Twisted."""

    def test_factory(self):
        registered = list(getPlugins(IMetricFactory))
        self.assertTrue(distinctdbplugin.distinct_metric_factory in registered)
245 | + |
246 | + |
247 | +class TestDatabase(TestCase): |
248 | + |
249 | def setUp(self): |
250 | rootdir = check_output(["bzr", "root"]).strip() |
251 | dsn_file = os.path.join(rootdir, "tmp", "pg.dsn") |
252 | @@ -123,10 +199,45 @@ |
253 | dmr.update(name) |
254 | dmr._save_bucket(dmr.bucket, bucket_no) |
255 | |
256 | + |
257 | +class TestDatabaseMetricStorage(TestDatabase): |
258 | + |
def test_connect(self):
    """The test database answers a trivial query."""
    cr = self.conn.cursor()
    cr.execute("SELECT 0")
    # Compare the rows exactly: assertTrue's second argument is only a
    # failure message, so assertTrue(result, [(0,)]) passes for any
    # non-empty result.
    self.assertEquals(cr.fetchall(), [(0,)])
264 | + |
def test_create_metric_id(self):
    """Saving a bucket registers exactly one path row for the metric."""
    dmr = distinct.DistinctMetricReporter("test", dsn=self.dsn)
    dmr._save_bucket({}, 0)
    cr = self.conn.cursor()
    # Count all path rows rather than filtering on the bare name: the query
    # helpers build the path as prefix + name, so the stored value may not
    # be plain "test" — TODO confirm against _save_bucket.
    cr.execute("SELECT * FROM paths")
    self.assertEquals(len(cr.fetchall()), 1)
272 | + |
def test_find_saved_data(self):
    """Values recorded with update() are stored with their hit counts."""
    reporter = distinct.DistinctMetricReporter("test", dsn=self.dsn)
    for value in ("one", "one", "two"):
        reporter.update(value)
    reporter._save_bucket(reporter.bucket, 0)
    cursor = self.conn.cursor()
    cursor.execute("SELECT * FROM points ORDER BY value")
    stored = cursor.fetchall()
    expected = [(reporter.metric_id, 0, "one", 2),
                (reporter.metric_id, 0, "two", 1)]
    self.assertEquals(stored, expected)
284 | + |
def test_load_metric_id(self):
    """Two reporters with the same name end up sharing one metric id."""
    reporters = []
    for _ in range(2):
        reporter = distinct.DistinctMetricReporter("test", dsn=self.dsn)
        reporter._save_bucket({}, 0)
        reporters.append(reporter)
    first, second = reporters
    self.assertEquals(first.metric_id, second.metric_id)
291 | + |
292 | def test_get_distinct_count(self): |
293 | """Test get_distinct_count.""" |
294 | dmr = distinct.DistinctMetricReporter("test", dsn=self.dsn) |
295 | - dr = distinct.DistinctResource(dmr) |
296 | t1 = time.time() |
297 | # add a day |
298 | t2 = time.time() + 60 * 60 * 24 + 1 |
299 | @@ -135,32 +246,31 @@ |
300 | # Make sure we're using different buckets |
301 | self.assertNotEqual(b1, b2) |
302 | self._create_test_data_points("one", b1) |
303 | - count = dr.get_distinct_count(t1, t2) |
304 | + count = dmr.get_distinct_count(t1, t2) |
305 | self.assertEqual(count, 1) |
306 | self._create_test_data_points("two", b1) |
307 | - count = dr.get_distinct_count(t1, t2) |
308 | + count = dmr.get_distinct_count(t1, t2) |
309 | self.assertEqual(count, 2) |
310 | # Add it again, make sure the distinct is filtering it out |
311 | self._create_test_data_points("two", b1) |
312 | - count = dr.get_distinct_count(t1, t2) |
313 | + count = dmr.get_distinct_count(t1, t2) |
314 | self.assertEqual(count, 2) |
315 | # Now we add to a newer bucket, but still within range |
316 | self._create_test_data_points("three", b2) |
317 | - count = dr.get_distinct_count(t1, t2) |
318 | + count = dmr.get_distinct_count(t1, t2) |
319 | self.assertEqual(count, 3) |
320 | # Now to a older bucket, out of range |
321 | self._create_test_data_points("zero", b1 - 1) |
322 | - count = dr.get_distinct_count(t1, t2) |
323 | + count = dmr.get_distinct_count(t1, t2) |
324 | self.assertEqual(count, 3) |
325 | # Now to a newer bucket, out of range |
326 | self._create_test_data_points("infinity", b2 + 1) |
327 | - count = dr.get_distinct_count(t1, t2) |
328 | + count = dmr.get_distinct_count(t1, t2) |
329 | self.assertEqual(count, 3) |
330 | |
331 | def test_get_distinct_top_value(self): |
332 | """Test get_distinct_top_value.""" |
333 | dmr = distinct.DistinctMetricReporter("test", dsn=self.dsn) |
334 | - dr = distinct.DistinctResource(dmr) |
335 | t1 = time.time() |
336 | # add a day |
337 | t2 = time.time() + 60 * 60 * 24 + 1 |
338 | @@ -169,80 +279,24 @@ |
339 | # Make sure we're using different buckets |
340 | self.assertNotEqual(b1, b2) |
341 | self._create_test_data_points("one", b1) |
342 | - values = dr.get_distinct_top_value(t1, t2) |
343 | + values = dmr.get_distinct_top_value(t1, t2) |
344 | self.assertEqual(values, [("one", 1)]) |
345 | self._create_test_data_points("two", b1) |
346 | - values = dr.get_distinct_top_value(t1, t2) |
347 | + values = dmr.get_distinct_top_value(t1, t2) |
348 | self.assertEqual(values, [("one", 1), ("two", 1)]) |
349 | self._create_test_data_points("one", b1) |
350 | - values = dr.get_distinct_top_value(t1, t2) |
351 | + values = dmr.get_distinct_top_value(t1, t2) |
352 | self.assertEqual(values, [("one", 2), ("two", 1)]) |
353 | # Create a third, and make "two" have 2 values so "three" is last |
354 | self._create_test_data_points("two", b1) |
355 | self._create_test_data_points("three", b1) |
356 | - values = dr.get_distinct_top_value(t1, t2) |
357 | + values = dmr.get_distinct_top_value(t1, t2) |
358 | self.assertEqual(values, [("one", 2), ("two", 2), ("three", 1)]) |
359 | # Only get the top 2 |
360 | - values = dr.get_distinct_top_value(t1, t2, how_many=2) |
361 | + values = dmr.get_distinct_top_value(t1, t2, how_many=2) |
362 | self.assertEqual(values, [("one", 2), ("two", 2)]) |
363 | |
364 | |
365 | -class TestPlugin(TestCase): |
366 | - |
367 | - def test_factory(self): |
368 | - self.assertTrue(distinctdbplugin.distinct_metric_factory in \ |
369 | - list(getPlugins(IMetricFactory))) |
370 | - |
371 | - |
372 | -class TestDatabase(TestCase): |
373 | - |
374 | - def setUp(self): |
375 | - rootdir = check_output(["bzr", "root"]).strip() |
376 | - dsn_file = os.path.join(rootdir, "tmp", "pg.dsn") |
377 | - self.dsn = open(dsn_file).read() |
378 | - self.conn = psycopg2.connect(self.dsn) |
379 | - |
380 | - def tearDown(self): |
381 | - cr = self.conn.cursor() |
382 | - cr.execute("rollback") |
383 | - cr.execute("DELETE FROM paths") |
384 | - cr.execute("DELETE FROM points") |
385 | - cr.execute("commit") |
386 | - |
387 | - def test_connect(self): |
388 | - cr = self.conn.cursor() |
389 | - cr.execute("SELECT 0") |
390 | - result = cr.fetchall() |
391 | - self.assertTrue(result, [(0,)]) |
392 | - |
393 | - def test_create_metric_id(self): |
394 | - dmr = distinct.DistinctMetricReporter("test", dsn=self.dsn) |
395 | - dmr._save_bucket({}, 0) |
396 | - cr = self.conn.cursor() |
397 | - cr.execute("SELECT * FROM paths WHERE path = 'test'") |
398 | - cr.execute("SELECT * FROM paths") |
399 | - self.assertEquals(len(cr.fetchall()), 1) |
400 | - |
401 | - def test_find_saved_data(self): |
402 | - dmr = distinct.DistinctMetricReporter("test", dsn=self.dsn) |
403 | - dmr.update("one") |
404 | - dmr.update("one") |
405 | - dmr.update("two") |
406 | - dmr._save_bucket(dmr.bucket, 0) |
407 | - cr = self.conn.cursor() |
408 | - cr.execute("SELECT * FROM points ORDER BY value") |
409 | - rows = cr.fetchall() |
410 | - self.assertEquals(rows, [(dmr.metric_id, 0, "one", 2), |
411 | - (dmr.metric_id, 0, "two", 1)]) |
412 | - |
413 | - def test_load_metric_id(self): |
414 | - dmr = distinct.DistinctMetricReporter("test", dsn=self.dsn) |
415 | - dmr._save_bucket({}, 0) |
416 | - dmr2 = distinct.DistinctMetricReporter("test", dsn=self.dsn) |
417 | - dmr2._save_bucket({}, 0) |
418 | - self.assertEquals(dmr.metric_id, dmr2.metric_id) |
419 | - |
420 | - |
421 | class TestRedis(TestCase): |
422 | |
423 | def setUp(self): |