Merge lp:~sidnei/txstatsd/add-rate-limit into lp:~txstatsd-dev/txstatsd/distinct-plugin

Proposed by Sidnei da Silva
Status: Merged
Approved by: Sidnei da Silva
Approved revision: 15
Merged at revision: 16
Proposed branch: lp:~sidnei/txstatsd/add-rate-limit
Merge into: lp:~txstatsd-dev/txstatsd/distinct-plugin
Diff against target: 510 lines (+212/-55)
10 files modified
.bzrignore (+3/-0)
Makefile (+11/-9)
bin/start-database.sh (+12/-8)
bin/start-redis.sh (+1/-2)
bin/stop-database.sh (+11/-8)
bin/stop-redis.sh (+2/-2)
distinctdb/distinctmetric.py (+23/-14)
distinctdb/ratelimit.py (+49/-0)
distinctdb/tests/test_distinct.py (+21/-12)
distinctdb/tests/test_ratelimit.py (+79/-0)
To merge this branch: bzr merge lp:~sidnei/txstatsd/add-rate-limit
Reviewer Review Type Date Requested Status
Lucio Torre (community) Approve
Review via email: mp+197457@code.launchpad.net

Commit message

Add a simple rate-limiter implementation that keeps rate-limiting statistics in Redis.

Description of the change

Add a simple rate-limiter implementation that keeps rate-limiting statistics in Redis.

To post a comment you must log in.
Revision history for this message
Lucio Torre (lucio.torre) :
review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== added file '.bzrignore'
2--- .bzrignore 1970-01-01 00:00:00 +0000
3+++ .bzrignore 2013-12-03 12:47:43 +0000
4@@ -0,0 +1,3 @@
5+twisted/plugins/dropin.cache
6+tmp
7+_trial_temp
8
9=== modified file 'Makefile'
10--- Makefile 2011-12-15 20:19:11 +0000
11+++ Makefile 2013-12-03 12:47:43 +0000
12@@ -1,26 +1,28 @@
13
14 trial:
15- trial distinctdb/
16+ @trial distinctdb/
17
18-start-database:
19- ./bin/start-database.sh
20- psql -h `pwd`/tmp/db1 -d distinct < bin/schema.sql
21+start-database:
22+ @./bin/start-database.sh
23+ @psql -h `pwd`/tmp/db1 -d distinct < bin/schema.sql
24
25 stop-database:
26- ./bin/stop-database.sh
27+ @./bin/stop-database.sh
28
29 start-redis:
30- ./bin/start-redis.sh
31+ @./bin/start-redis.sh
32
33 stop-redis:
34- ./bin/stop-redis.sh
35+ @./bin/stop-redis.sh
36
37 start: start-database start-redis
38
39 stop: stop-redis stop-database
40
41 clean:
42- rm -rf ./tmp/
43-test: start trial stop clean
44+ @rm -rf ./tmp/
45+
46+test: start
47+ @$(MAKE) trial stop clean || ($(MAKE) stop clean; false)
48
49 .PHONY: trial test start-database stop-database start-redis stop-redis start stop
50
51=== modified file 'bin/start-database.sh'
52--- bin/start-database.sh 2012-04-26 18:16:25 +0000
53+++ bin/start-database.sh 2013-12-03 12:47:43 +0000
54@@ -1,4 +1,6 @@
55-#! /bin/bash
56+#!/usr/bin/env bash
57+
58+set -u
59
60 ROOTDIR=${ROOTDIR:-`bzr root`}
61 if [ ! -d "$ROOTDIR" ]; then
62@@ -12,6 +14,7 @@
63
64 function setup_database() {
65 local TESTDIR=$1
66+ local PGBINDIR=
67
68 echo "## Starting postgres in $TESTDIR ##"
69 mkdir -p "$TESTDIR/data"
70@@ -19,14 +22,15 @@
71
72 export PGHOST="$TESTDIR"
73 export PGDATA="$TESTDIR/data"
74- if [ -d /usr/lib/postgresql/8.4 ]; then
75- export PGBINDIR=/usr/lib/postgresql/8.4/bin
76- elif [ -d /usr/lib/postgresql/8.3 ]; then
77- export PGBINDIR=/usr/lib/postgresql/8.3/bin
78- elif [ -d /usr/lib/postgresql/9.1 ]; then
79- export PGBINDIR=/usr/lib/postgresql/9.1/bin
80- else
81+ for pgver in 9.1 8.4; do
82+ if [ -d /usr/lib/postgresql/$pgver ]; then
83+ PGBINDIR=/usr/lib/postgresql/$pgver/bin
84+ break
85+ fi
86+ done
87+ if [ -z "$PGBINDIR" ]; then
88 echo "Cannot find valid parent for PGBINDIR"
89+ exit 1
90 fi
91 $PGBINDIR/initdb -E UNICODE -D $PGDATA
92 # set up the database options file
93
94=== modified file 'bin/start-redis.sh'
95--- bin/start-redis.sh 2011-12-16 00:27:45 +0000
96+++ bin/start-redis.sh 2013-12-03 12:47:43 +0000
97@@ -1,5 +1,4 @@
98-#!/bin/bash
99+#!/usr/bin/env bash
100
101 mkdir -p tmp/redis
102 /sbin/start-stop-daemon --start -b -m -d . -p tmp/redis.pid --exec /usr/bin/redis-server -- `pwd`/bin/redis.conf
103-
104
105=== modified file 'bin/stop-database.sh'
106--- bin/stop-database.sh 2012-04-26 18:16:25 +0000
107+++ bin/stop-database.sh 2013-12-03 12:47:43 +0000
108@@ -1,4 +1,6 @@
109-#! /bin/bash
110+#!/usr/bin/env bash
111+
112+set -u
113
114 ROOTDIR=${ROOTDIR:-`bzr root`}
115 if [ ! -d "$ROOTDIR" ]; then
116@@ -6,14 +8,15 @@
117 exit 1
118 fi
119
120-if [ -d /usr/lib/postgresql/8.4 ]; then
121- PGBINDIR=/usr/lib/postgresql/8.4/bin
122-elif [ -d /usr/lib/postgresql/8.3 ]; then
123- PGBINDIR=/usr/lib/postgresql/8.3/bin
124-elif [ -d /usr/lib/postgresql/9.1 ]; then
125- PGBINDIR=/usr/lib/postgresql/9.1/bin
126-else
127+for pgver in 9.1 8.4; do
128+ if [ -d /usr/lib/postgresql/$pgver ]; then
129+ PGBINDIR=/usr/lib/postgresql/$pgver/bin
130+ break
131+ fi
132+done
133+if [ -z "$PGBINDIR" ]; then
134 echo "Cannot find valid parent for PGBINDIR"
135+ exit 1
136 fi
137
138 # setting PGDATA tells pg_ctl which DB to talk to
139
140=== modified file 'bin/stop-redis.sh'
141--- bin/stop-redis.sh 2011-12-16 00:27:45 +0000
142+++ bin/stop-redis.sh 2013-12-03 12:47:43 +0000
143@@ -1,3 +1,3 @@
144-#!/bin/bash
145+#!/usr/bin/env bash
146
147-/sbin/start-stop-daemon --stop -p tmp/redis.pid --exec /usr/bin/redis-server -- `pwd`/bin/redis.conf
148+/sbin/start-stop-daemon --oknodo --stop -p tmp/redis.pid --exec /usr/bin/redis-server -- `pwd`/bin/redis.conf
149
150=== modified file 'distinctdb/distinctmetric.py'
151--- distinctdb/distinctmetric.py 2012-06-27 20:04:53 +0000
152+++ distinctdb/distinctmetric.py 2013-12-03 12:47:43 +0000
153@@ -6,6 +6,7 @@
154 import psycopg2
155 import redis
156
157+from .ratelimit import RateLimiter
158 from zope.interface import implements
159 from twisted.internet import reactor, threads
160 from twisted.web import server, resource
161@@ -15,6 +16,10 @@
162 ONE_HOUR = 60 * ONE_MINUTE
163 ONE_DAY = 24 * ONE_HOUR
164
165+MINUTE_LIMITER = RateLimiter(span=ONE_MINUTE, bucket_size=10)
166+HOUR_LIMITER = RateLimiter(span=ONE_HOUR, bucket_size=10 * ONE_MINUTE)
167+DAY_LIMITER = RateLimiter(span=ONE_DAY, bucket_size=3 * ONE_HOUR)
168+
169
170 class JSONMethodResource(resource.Resource):
171 """Renders the result of calling C{name} on C{target} as a json result."""
172@@ -51,10 +56,10 @@
173 def __init__(self, reporter):
174 resource.Resource.__init__(self)
175 self.reporter = reporter
176- self.putChild("top",
177- JSONMethodResource(self.reporter, "_get_distinct_top_value"))
178- self.putChild("count",
179- JSONMethodResource(self.reporter, "_get_distinct_count"))
180+ self.putChild("top", JSONMethodResource(self.reporter,
181+ "_get_distinct_top_value"))
182+ self.putChild("count", JSONMethodResource(self.reporter,
183+ "_get_distinct_count"))
184
185
186 class DistinctMetricReporter(object):
187@@ -64,11 +69,12 @@
188 """
189 implements(IMetric)
190
191- periods = [5 * ONE_MINUTE, ONE_HOUR, ONE_DAY]
192+ periods = (5 * ONE_MINUTE, ONE_HOUR, ONE_DAY)
193+ limiters = (HOUR_LIMITER,)
194
195 def __init__(self, name, wall_time_func=time.time, prefix="",
196- bucket_size=ONE_DAY, dsn=None, redis_host=None, redis_port=None,
197- longterm_distinct_days=60):
198+ bucket_size=ONE_DAY, dsn=None, redis_host=None,
199+ redis_port=None, longterm_distinct_days=60):
200 """Construct a metric we expect to be periodically updated.
201
202 @param name: Indicates what is being instrumented.
203@@ -85,7 +91,7 @@
204 self.dsn = dsn
205 self.redis_host = redis_host
206 if redis_port is None:
207- redis.port = 6379
208+ redis_port = 6379
209 self.redis_port = redis_port
210 self.metric_id = None
211 self.build_bucket()
212@@ -94,7 +100,7 @@
213 self.longterm_distinct_count = None
214 self.longterm_distinct_days = longterm_distinct_days
215
216- if redis_host != None:
217+ if redis_host is not None:
218 self.redis = redis.client.Redis(host=redis_host, port=redis_port)
219
220 if self.dsn is not None:
221@@ -143,6 +149,9 @@
222 bucket = self.bucket_name_for(period)
223 self.redis.zremrangebyscore(bucket, 0, now - period)
224 self.redis_count[bucket] = self.redis.zcard(bucket)
225+ for item, count in self.bucket.iteritems():
226+ for limiter in self.limiters:
227+ limiter.add(self.redis, "%s:%s" % (self.name, item), count)
228 finally:
229 self.redis_flush_lock.release()
230
231@@ -179,7 +188,7 @@
232 ".count_1day": self.count_1day(),
233 }
234
235- if self.longterm_distinct_count != None:
236+ if self.longterm_distinct_count is not None:
237 items[".count_%dday" % (self.longterm_distinct_days,)] = \
238 self.longterm_distinct_count
239
240@@ -197,7 +206,7 @@
241 row = cr.fetchone()
242 if row is None:
243 cr.execute("INSERT INTO paths (path) VALUES (%s) "
244- "RETURNING (id)", (path,))
245+ "RETURNING (id)", (path,))
246 row = cr.fetchone()
247 cr.execute("commit")
248
249@@ -205,8 +214,8 @@
250
251 for i, (k, v) in enumerate(bucket.iteritems()):
252 cr.execute("INSERT INTO points (path_id, bucket, value, count) "
253- "VALUES (%s, %s, %s, %s)", (self.metric_id, bucket_no,
254- k, v))
255+ "VALUES (%s, %s, %s, %s)", (self.metric_id, bucket_no,
256+ k, v))
257 if i % 1000 == 0:
258 cr.execute("commit")
259 cr.execute("commit")
260@@ -241,7 +250,7 @@
261 cr.execute("SELECT value, SUM(count) AS cnt FROM points "
262 "INNER JOIN paths ON (paths.id = points.path_id) "
263 "WHERE paths.path = %s AND bucket BETWEEN %s AND %s"
264- "GROUP BY value ORDER BY cnt DESC LIMIT %s", (
265+ "GROUP BY value ORDER BY cnt DESC, value ASC LIMIT %s", (
266 path, since_bucket, until_bucket, how_many,))
267 rows = cr.fetchall()
268 return rows
269
270=== added file 'distinctdb/ratelimit.py'
271--- distinctdb/ratelimit.py 1970-01-01 00:00:00 +0000
272+++ distinctdb/ratelimit.py 2013-12-03 12:47:43 +0000
273@@ -0,0 +1,49 @@
274+"""
275+A rate limiter, heavily based on https://chris6f.com/rate-limiting-with-redis
276+"""
277+
278+import time
279+import math
280+
281+
282+class RateLimiter(object):
283+
284+ def __init__(self, span, bucket_size, expires=None, tick=time.time):
285+ self.bucket_size = bucket_size
286+ self.buckets = int(math.floor(float(span) / bucket_size)) + 2
287+ self.span = span
288+ self.expires = span if expires is None else expires
289+ self.tick = tick
290+
291+ def add(self, client, key, count=1):
292+ p = client.pipeline()
293+ bucket = self.bucket()
294+ key = "rl:%s:%s" % (self.span, key)
295+ p.hincrby(key, bucket, count)
296+ p.expire(key, self.expires)
297+ p.hdel(key, (bucket + 1) % self.buckets)
298+ p.hdel(key, (bucket + 2) % self.buckets)
299+ p.execute()
300+
301+ def count(self, client, key, interval):
302+ p = client.pipeline()
303+ bucket = self.bucket()
304+
305+ key = "rl:%s:%s" % (self.span, key)
306+
307+ # Asking for an interval smaller than the bucket size rounds it up to
308+ # one bucket. We do this to avoid getting false negatives when
309+ # accidentally asking for too small an interval.
310+ count = max(int(math.floor(float(interval) / self.bucket_size)), 1)
311+ for i in xrange(0, count):
312+ p.hget(key, (bucket - i) % self.buckets)
313+
314+ results = p.execute()
315+ total = 0
316+ for value in results:
317+ if value is not None:
318+ total += int(value)
319+ return total
320+
321+ def bucket(self):
322+ return int((self.tick() / self.bucket_size) % self.buckets)
323
324=== modified file 'distinctdb/tests/test_distinct.py'
325--- distinctdb/tests/test_distinct.py 2012-06-27 20:04:53 +0000
326+++ distinctdb/tests/test_distinct.py 2013-12-03 12:47:43 +0000
327@@ -13,8 +13,7 @@
328 import subprocess
329
330 def check_output(args):
331- return subprocess.Popen(args,
332- stdout=subprocess.PIPE).communicate()[0]
333+ return subprocess.Popen(args, stdout=subprocess.PIPE).communicate()[0]
334
335 import psycopg2
336 import redis
337@@ -49,7 +48,8 @@
338 return 42
339
340 def _get_distinct_top_value(self, since, until, how_many=20):
341- self.called.append(("get_distinct_top_value", (since, until, how_many)))
342+ self.called.append(("get_distinct_top_value",
343+ (since, until, how_many)))
344 return [("one", 1), ("two", 1)]
345
346
347@@ -105,7 +105,7 @@
348 dmr.flush(1, day)
349 dmr.update("three")
350 self.assertEquals(result,
351- {"bucket": {"one": 2, "two": 1}, "bucket_no": 0})
352+ {"bucket": {"one": 2, "two": 1}, "bucket_no": 0})
353 result = dmr.flush(1, day)
354
355 self.assertTrue(("test.max", 1, day) in result)
356@@ -119,7 +119,8 @@
357 o = TestOptions()
358 config_file = ConfigParser.RawConfigParser()
359 config_file.readfp(StringIO("[statsd]\n\n[plugin_distinctdb]\n"
360- "dsn = dbdsn\nbucket_size = 100\nlongterm_distinct_days = 30"))
361+ "dsn = dbdsn\nbucket_size = 100\n"
362+ "longterm_distinct_days = 30"))
363 o.configure(config_file)
364 dmf = distinctdbplugin.DistinctMetricFactory()
365 dmf.configure(o)
366@@ -184,7 +185,7 @@
367 class TestPlugin(TestCase):
368
369 def test_factory(self):
370- self.assertTrue(distinctdbplugin.distinct_metric_factory in \
371+ self.assertTrue(distinctdbplugin.distinct_metric_factory in
372 list(getPlugins(IMetricFactory)))
373
374
375@@ -473,8 +474,9 @@
376
377 o = TestOptions()
378 config_file = ConfigParser.RawConfigParser()
379- config_file.readfp(StringIO("[statsd]\n\n[plugin_distinctdb]\n"
380- "redis_host = localhost\nredis_port = 16379"))
381+ config_file.readfp(
382+ StringIO("[statsd]\n\n[plugin_distinctdb]\n"
383+ "redis_host = localhost\nredis_port = 16379"))
384 o.configure(config_file)
385 dmf = distinctdbplugin.DistinctMetricFactory()
386 dmf.configure(o)
387@@ -483,8 +485,8 @@
388 self.assertEquals(dmr.redis_port, 16379)
389
390 def test_usage(self):
391- dmr = distinct.DistinctMetricReporter("test",
392- redis_host="localhost", redis_port=16379)
393+ dmr = distinct.DistinctMetricReporter(
394+ "test", redis_host="localhost", redis_port=16379)
395
396 self.assertEquals(dmr.count_1hour(), 0)
397 dmr._update_count("one", 0)
398@@ -500,8 +502,8 @@
399 self.assertEquals(dmr.count_1hour(), 1)
400
401 def test_load(self):
402- dmr = distinct.DistinctMetricReporter("test",
403- redis_host="localhost", redis_port=16379)
404+ dmr = distinct.DistinctMetricReporter(
405+ "test", redis_host="localhost", redis_port=16379)
406 start = time.time()
407 for i in range(10000):
408 dmr.update(str(i % 1000))
409@@ -511,10 +513,17 @@
410 if w == 0:
411 break
412 time.sleep(0.1)
413+
414 dmr._flush_redis(time.time())
415 duration = time.time() - start
416 self.assertEquals(dmr.count_1hour(), 1000)
417 self.assertTrue(duration < 10)
418+ # self.assertEquals(10, distinct.MINUTE_LIMITER.count(
419+ # dmr.redis, "test:42", 10))
420+ self.assertEquals(10, distinct.HOUR_LIMITER.count(
421+ dmr.redis, "test:42", 10 * distinct.ONE_MINUTE))
422+ # self.assertEquals(10, distinct.DAY_LIMITER.count(
423+ # dmr.redis, "test:42", 3 * distinct.ONE_HOUR))
424
425 def test_bucket_name(self):
426 dmr1 = distinct.DistinctMetricReporter("somename")
427
428=== added file 'distinctdb/tests/test_ratelimit.py'
429--- distinctdb/tests/test_ratelimit.py 1970-01-01 00:00:00 +0000
430+++ distinctdb/tests/test_ratelimit.py 2013-12-03 12:47:43 +0000
431@@ -0,0 +1,79 @@
432+import time
433+import redis
434+
435+from distinctdb.ratelimit import RateLimiter
436+from twisted.trial.unittest import TestCase
437+
438+
439+class Clock(object):
440+
441+ def __init__(self):
442+ self.value = 0
443+
444+ def set(self, value):
445+ self.value = value
446+
447+ def __call__(self):
448+ return self.value
449+
450+
451+class RateLimitTest(TestCase):
452+
453+ def setUp(self):
454+ self.client = redis.client.Redis(host="localhost", port=16379)
455+ self.clock = Clock()
456+ self.limiter = RateLimiter(span=60, bucket_size=10,
457+ tick=self.clock)
458+ return super(RateLimitTest, self).setUp()
459+
460+ def tearDown(self):
461+ self.client.flushdb()
462+ return super(RateLimitTest, self).tearDown()
463+
464+ def test_count(self):
465+ for i in xrange(10):
466+ self.clock.set(i * 10)
467+ self.limiter.add(self.client, 'test')
468+
469+ self.assertEqual(6, self.limiter.count(self.client, 'test', 80))
470+ self.assertEqual(6, self.limiter.count(self.client, 'test', 60))
471+ self.assertEqual(4, self.limiter.count(self.client, 'test', 40))
472+ self.assertEqual(2, self.limiter.count(self.client, 'test', 20))
473+ self.assertEqual(1, self.limiter.count(self.client, 'test', 10))
474+
475+ # Asking for an interval smaller than the bucket size rounds it up to
476+ # one bucket. We do this to avoid getting false negatives when
477+ # accidentally asking for too small an interval.
478+ self.assertEqual(1, self.limiter.count(self.client, 'test', 1))
479+
480+ self.clock.set(100)
481+ self.assertEqual(3, self.limiter.count(self.client, 'test', 40))
482+ self.assertEqual(1, self.limiter.count(self.client, 'test', 20))
483+ self.assertEqual(0, self.limiter.count(self.client, 'test', 10))
484+
485+ self.clock.set(110)
486+ self.assertEqual(2, self.limiter.count(self.client, 'test', 40))
487+ self.assertEqual(0, self.limiter.count(self.client, 'test', 20))
488+ self.assertEqual(0, self.limiter.count(self.client, 'test', 10))
489+
490+
491+class RateLimitExpireTest(TestCase):
492+
493+ def setUp(self):
494+ self.client = redis.client.Redis(host="localhost", port=16379)
495+ self.clock = Clock()
496+ self.limiter = RateLimiter(span=1, bucket_size=1,
497+ tick=self.clock)
498+ return super(RateLimitExpireTest, self).setUp()
499+
500+ def tearDown(self):
501+ self.client.flushdb()
502+ return super(RateLimitExpireTest, self).tearDown()
503+
504+ def test_expire(self):
505+ for i in xrange(2):
506+ self.limiter.add(self.client, 'test')
507+
508+ self.assertEqual(2, self.limiter.count(self.client, 'test', 1))
509+ time.sleep(2)
510+ self.assertEqual(0, self.limiter.count(self.client, 'test', 1))

Subscribers

People subscribed via source and target branches