Merge lp:~lucio.torre/txstatsd/add-long-count into lp:~txstatsd-dev/txstatsd/distinct-plugin

Proposed by Lucio Torre
Status: Merged
Approved by: Lucio Torre
Approved revision: 14
Merged at revision: 14
Proposed branch: lp:~lucio.torre/txstatsd/add-long-count
Merge into: lp:~txstatsd-dev/txstatsd/distinct-plugin
Diff against target: 289 lines (+92/-48)
3 files modified
distinctdb/distinctmetric.py (+46/-29)
distinctdb/tests/test_distinct.py (+39/-15)
twisted/plugins/distinctdbplugin.py (+7/-4)
To merge this branch: bzr merge lp:~lucio.torre/txstatsd/add-long-count
Reviewer: Sidnei da Silva — Status: Approve
Review via email: mp+112406@code.launchpad.net

Description of the change

Now 60-day distinct counts are pushed to Graphite too.
The value only changes on startup and when the buckets are saved (nightly).

To post a comment you must log in.
Revision history for this message
Sidnei da Silva (sidnei) wrote :

+1, renaming long_count to longterm_distinct_days.

review: Approve
14. By Lucio Torre

renamed longterm_distinct_[count|days]

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'distinctdb/distinctmetric.py'
2--- distinctdb/distinctmetric.py 2012-04-26 18:16:25 +0000
3+++ distinctdb/distinctmetric.py 2012-06-27 20:06:19 +0000
4@@ -52,9 +52,9 @@
5 resource.Resource.__init__(self)
6 self.reporter = reporter
7 self.putChild("top",
8- JSONMethodResource(self.reporter, "get_distinct_top_value"))
9+ JSONMethodResource(self.reporter, "_get_distinct_top_value"))
10 self.putChild("count",
11- JSONMethodResource(self.reporter, "get_distinct_count"))
12+ JSONMethodResource(self.reporter, "_get_distinct_count"))
13
14
15 class DistinctMetricReporter(object):
16@@ -67,7 +67,8 @@
17 periods = [5 * ONE_MINUTE, ONE_HOUR, ONE_DAY]
18
19 def __init__(self, name, wall_time_func=time.time, prefix="",
20- bucket_size=ONE_DAY, dsn=None, redis_host=None, redis_port=None):
21+ bucket_size=ONE_DAY, dsn=None, redis_host=None, redis_port=None,
22+ longterm_distinct_days=60):
23 """Construct a metric we expect to be periodically updated.
24
25 @param name: Indicates what is being instrumented.
26@@ -90,10 +91,15 @@
27 self.build_bucket()
28 self.redis_flush_lock = threading.Lock()
29 self.redis_count = {}
30+ self.longterm_distinct_count = None
31+ self.longterm_distinct_days = longterm_distinct_days
32
33 if redis_host != None:
34 self.redis = redis.client.Redis(host=redis_host, port=redis_port)
35
36+ if self.dsn is not None:
37+ reactor.callInThread(self._update_longterm_distinct)
38+
39 def build_bucket(self, timestamp=None):
40 self.max = 0
41 self.bucket = {}
42@@ -152,30 +158,6 @@
43 def count_1day(self):
44 return self.count(ONE_DAY)
45
46- def _save_bucket(self, bucket, bucket_no):
47- path = self.prefix + self.name
48- c = psycopg2.connect(self.dsn)
49- cr = c.cursor()
50- if self.metric_id is None:
51-
52- cr.execute("SELECT * FROM paths WHERE path = %s", (path,))
53- row = cr.fetchone()
54- if row is None:
55- cr.execute("INSERT INTO paths (path) VALUES (%s) "
56- "RETURNING (id)", (path,))
57- row = cr.fetchone()
58- cr.execute("commit")
59-
60- self.metric_id = row[0]
61-
62- for i, (k, v) in enumerate(bucket.iteritems()):
63- cr.execute("INSERT INTO points (path_id, bucket, value, count) "
64- "VALUES (%s, %s, %s, %s)", (self.metric_id, bucket_no,
65- k, v))
66- if i % 1000 == 0:
67- cr.execute("commit")
68- cr.execute("commit")
69-
70 def save_bucket(self, bucket, bucket_no):
71 if self.dsn is not None:
72 reactor.callInThread(self._save_bucket, bucket, bucket_no)
73@@ -196,11 +178,46 @@
74 ".count_1hour": self.count_1hour(),
75 ".count_1day": self.count_1day(),
76 }
77+
78+ if self.longterm_distinct_count != None:
79+ items[".count_%dday" % (self.longterm_distinct_days,)] = \
80+ self.longterm_distinct_count
81+
82 for item, value in items.iteritems():
83 metrics.append((self.prefix + self.name + item, value, timestamp))
84 return metrics
85
86- def get_distinct_count(self, since, until):
87+ def _save_bucket(self, bucket, bucket_no):
88+ path = self.prefix + self.name
89+ c = psycopg2.connect(self.dsn)
90+ cr = c.cursor()
91+ if self.metric_id is None:
92+
93+ cr.execute("SELECT * FROM paths WHERE path = %s", (path,))
94+ row = cr.fetchone()
95+ if row is None:
96+ cr.execute("INSERT INTO paths (path) VALUES (%s) "
97+ "RETURNING (id)", (path,))
98+ row = cr.fetchone()
99+ cr.execute("commit")
100+
101+ self.metric_id = row[0]
102+
103+ for i, (k, v) in enumerate(bucket.iteritems()):
104+ cr.execute("INSERT INTO points (path_id, bucket, value, count) "
105+ "VALUES (%s, %s, %s, %s)", (self.metric_id, bucket_no,
106+ k, v))
107+ if i % 1000 == 0:
108+ cr.execute("commit")
109+ cr.execute("commit")
110+ self._update_longterm_distinct()
111+
112+ def _update_longterm_distinct(self):
113+ now = self.wall_time_func()
114+ self.longterm_distinct_count = self._get_distinct_count(
115+ now - self.longterm_distinct_days * ONE_DAY, now)
116+
117+ def _get_distinct_count(self, since, until):
118 """Get a distinct count for a path between certain ranges."""
119 since_bucket = self.get_bucket_no(since)
120 until_bucket = self.get_bucket_no(until)
121@@ -214,7 +231,7 @@
122 row = cr.fetchone()
123 return row[0]
124
125- def get_distinct_top_value(self, since, until, how_many=20):
126+ def _get_distinct_top_value(self, since, until, how_many=20):
127 """Get the top distinct values for a path between certain ranges."""
128 since_bucket = self.get_bucket_no(since)
129 until_bucket = self.get_bucket_no(until)
130
131=== modified file 'distinctdb/tests/test_distinct.py'
132--- distinctdb/tests/test_distinct.py 2012-04-26 18:16:25 +0000
133+++ distinctdb/tests/test_distinct.py 2012-06-27 20:06:19 +0000
134@@ -44,11 +44,11 @@
135 self.called.append(("get_foo", ()))
136 return "foo"
137
138- def get_distinct_count(self, since, until):
139+ def _get_distinct_count(self, since, until):
140 self.called.append(("get_distinct_count", (since, until)))
141 return 42
142
143- def get_distinct_top_value(self, since, until, how_many=20):
144+ def _get_distinct_top_value(self, since, until, how_many=20):
145 self.called.append(("get_distinct_top_value", (since, until, how_many)))
146 return [("one", 1), ("two", 1)]
147
148@@ -119,13 +119,14 @@
149 o = TestOptions()
150 config_file = ConfigParser.RawConfigParser()
151 config_file.readfp(StringIO("[statsd]\n\n[plugin_distinctdb]\n"
152- "dsn = dbdsn\nbucket_size = 100"))
153+ "dsn = dbdsn\nbucket_size = 100\nlongterm_distinct_days = 30"))
154 o.configure(config_file)
155 dmf = distinctdbplugin.DistinctMetricFactory()
156 dmf.configure(o)
157 dmr = dmf.build_metric("foo", "bar", time.time)
158 self.assertEquals(dmr.bucket_size, 100)
159 self.assertEquals(dmr.dsn, "dbdsn")
160+ self.assertEquals(dmr.longterm_distinct_days, 30)
161
162
163 class TestJSONMethodResource(TestCase):
164@@ -231,7 +232,7 @@
165
166 class ErrorDummyReporter(object):
167
168- def get_distinct_count(self, *args, **kweargs):
169+ def _get_distinct_count(self, *args, **kweargs):
170 raise Exception("die!")
171
172
173@@ -376,26 +377,26 @@
174 # Make sure we're using different buckets
175 self.assertNotEqual(b1, b2)
176 self._create_test_data_points("one", b1)
177- count = dmr.get_distinct_count(t1, t2)
178+ count = dmr._get_distinct_count(t1, t2)
179 self.assertEqual(count, 1)
180 self._create_test_data_points("two", b1)
181- count = dmr.get_distinct_count(t1, t2)
182+ count = dmr._get_distinct_count(t1, t2)
183 self.assertEqual(count, 2)
184 # Add it again, make sure the distinct is filtering it out
185 self._create_test_data_points("two", b1)
186- count = dmr.get_distinct_count(t1, t2)
187+ count = dmr._get_distinct_count(t1, t2)
188 self.assertEqual(count, 2)
189 # Now we add to a newer bucket, but still within range
190 self._create_test_data_points("three", b2)
191- count = dmr.get_distinct_count(t1, t2)
192+ count = dmr._get_distinct_count(t1, t2)
193 self.assertEqual(count, 3)
194 # Now to a older bucket, out of range
195 self._create_test_data_points("zero", b1 - 1)
196- count = dmr.get_distinct_count(t1, t2)
197+ count = dmr._get_distinct_count(t1, t2)
198 self.assertEqual(count, 3)
199 # Now to a newer bucket, out of range
200 self._create_test_data_points("infinity", b2 + 1)
201- count = dmr.get_distinct_count(t1, t2)
202+ count = dmr._get_distinct_count(t1, t2)
203 self.assertEqual(count, 3)
204
205 def test_get_distinct_top_value(self):
206@@ -409,23 +410,46 @@
207 # Make sure we're using different buckets
208 self.assertNotEqual(b1, b2)
209 self._create_test_data_points("one", b1)
210- values = dmr.get_distinct_top_value(t1, t2)
211+ values = dmr._get_distinct_top_value(t1, t2)
212 self.assertEqual(values, [("one", 1)])
213 self._create_test_data_points("two", b1)
214- values = dmr.get_distinct_top_value(t1, t2)
215+ values = dmr._get_distinct_top_value(t1, t2)
216 self.assertEqual(values, [("one", 1), ("two", 1)])
217 self._create_test_data_points("one", b1)
218- values = dmr.get_distinct_top_value(t1, t2)
219+ values = dmr._get_distinct_top_value(t1, t2)
220 self.assertEqual(values, [("one", 2), ("two", 1)])
221 # Create a third, and make "two" have 2 values so "three" is last
222 self._create_test_data_points("two", b1)
223 self._create_test_data_points("three", b1)
224- values = dmr.get_distinct_top_value(t1, t2)
225+ values = dmr._get_distinct_top_value(t1, t2)
226 self.assertEqual(values, [("one", 2), ("two", 2), ("three", 1)])
227 # Only get the top 2
228- values = dmr.get_distinct_top_value(t1, t2, how_many=2)
229+ values = dmr._get_distinct_top_value(t1, t2, how_many=2)
230 self.assertEqual(values, [("one", 2), ("two", 2)])
231
232+ def test_count_long_periods(self):
233+ dmr = distinct.DistinctMetricReporter("test", dsn=self.dsn)
234+ dmr.update("one")
235+ dmr.update("one")
236+ dmr.update("two")
237+ dmr._save_bucket(dmr.bucket, dmr.get_bucket_no() - 2)
238+ dmr = distinct.DistinctMetricReporter("test", dsn=self.dsn)
239+ # this should be running in bg now too
240+ # but run this way we ensure it has finished before the assert
241+ dmr._update_longterm_distinct()
242+ # value is loaded on startup
243+ self.assertEquals(dmr.longterm_distinct_count, 2)
244+ dmr.update("one")
245+ dmr.update("three")
246+ dmr._save_bucket(dmr.bucket, dmr.get_bucket_no() - 1)
247+ # value is updated on save_bucket
248+ self.assertEquals(dmr.longterm_distinct_count, 3)
249+ # reset bucket
250+ dmr.build_bucket()
251+ now = dmr.wall_time_func()
252+ result = dmr.flush(0, now)
253+ self.assertTrue(("test.count_60day", 3, now) in result, result)
254+
255
256 class TestRedis(TestCase):
257
258
259=== modified file 'twisted/plugins/distinctdbplugin.py'
260--- twisted/plugins/distinctdbplugin.py 2011-12-15 20:19:11 +0000
261+++ twisted/plugins/distinctdbplugin.py 2012-06-27 20:06:19 +0000
262@@ -14,13 +14,14 @@
263 bucket_size = None
264 dsn = None
265 metric_ids = None
266+ longterm_distinct_days = None
267
268 def build_metric(self, prefix, name, wall_time_func=None):
269 return DistinctMetricReporter(name, prefix=prefix,
270- wall_time_func=wall_time_func,
271- bucket_size=self.bucket_size,
272- dsn=self.dsn, redis_host=self.redis_host,
273- redis_port=self.redis_port)
274+ wall_time_func=wall_time_func, bucket_size=self.bucket_size,
275+ dsn=self.dsn, redis_host=self.redis_host,
276+ redis_port=self.redis_port,
277+ longterm_distinct_days=self.longterm_distinct_days)
278
279 def configure(self, options):
280 self.section = dict(options.get("plugin_distinctdb", {}))
281@@ -30,6 +31,8 @@
282 self.bucket_size = ONE_DAY
283
284 self.dsn = self.section.get("dsn", None)
285+ self.longterm_distinct_days = int(
286+ self.section.get("longterm_distinct_days", 60))
287 self.redis_host = self.section.get("redis_host", None)
288 self.redis_port = self.section.get("redis_port", None)
289 if self.redis_port is not None:

Subscribers

People subscribed via source and target branches