Merge lp:~lucio.torre/txstatsd/add-redis-to-distinct-plugin into lp:~txstatsd-dev/txstatsd/distinct-plugin

Proposed by Lucio Torre
Status: Merged
Approved by: Sidnei da Silva
Approved revision: 7
Merged at revision: 3
Proposed branch: lp:~lucio.torre/txstatsd/add-redis-to-distinct-plugin
Merge into: lp:~txstatsd-dev/txstatsd/distinct-plugin
Diff against target: 542 lines (+398/-14)
7 files modified
Makefile (+26/-0)
bin/redis.conf (+213/-0)
bin/start-redis.sh (+5/-0)
bin/stop-redis.sh (+3/-0)
distinctdb/distinctmetric.py (+62/-6)
distinctdb/tests/test_distinct.py (+80/-4)
twisted/plugins/distinctdbplugin.py (+9/-4)
To merge this branch: bzr merge lp:~lucio.torre/txstatsd/add-redis-to-distinct-plugin
Reviewer: Sidnei da Silva
Review status: Approve
Review via email: mp+91821@code.launchpad.net

Commit message

Use redis for realtime stats.

Description of the change

Adds an optional redis backend to the distinct plugin for realtime counts. Each updated value is written to a redis sorted set per period (5 minutes, 1 hour, 1 day) with its timestamp as the score; on flush, entries older than each period are pruned and the distinct counts are reported as .count_5min, .count_1hour and .count_1day (the old .count metric is renamed to .messages). Also adds start/stop scripts and a redis.conf for the test redis instance, plus a Makefile that drives the test database and redis.
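For anyone trying the branch locally, a minimal txstatsd configuration enabling the new backend would look like this (the section and option names follow the new test_configure test; the dsn value is illustrative):

    [statsd]
    # usual statsd options ...

    [plugin_distinctdb]
    dsn = host=localhost dbname=distinct
    redis_host = localhost
    redis_port = 16379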

Sidnei da Silva (sidnei):
review: Approve

Preview Diff

=== added file 'Makefile'
--- Makefile	1970-01-01 00:00:00 +0000
+++ Makefile	2012-02-07 13:11:25 +0000
@@ -0,0 +1,26 @@
+
+trial:
+	trial distinctdb/
+
+start-database:
+	./bin/start-database.sh
+	psql -h `pwd`/tmp/db1 -d distinct < bin/schema.sql
+
+stop-database:
+	./bin/stop-database.sh
+
+start-redis:
+	./bin/start-redis.sh
+
+stop-redis:
+	./bin/stop-redis.sh
+
+start: start-database start-redis
+
+stop: stop-redis stop-database
+
+clean:
+	rm -rf ./tmp/
+test: start trial stop clean
+
+.PHONY: trial test start-database stop-database start-redis stop-redis start stop

=== added file 'bin/redis.conf'
--- bin/redis.conf	1970-01-01 00:00:00 +0000
+++ bin/redis.conf	2012-02-07 13:11:25 +0000
@@ -0,0 +1,213 @@
+# Redis configuration file example
+
+# Note on units: when memory size is needed, it is possible to specify
+# it in the usual form of 1k 5GB 4M and so forth:
+#
+# 1k => 1000 bytes
+# 1kb => 1024 bytes
+# 1m => 1000000 bytes
+# 1mb => 1024*1024 bytes
+# 1g => 1000000000 bytes
+# 1gb => 1024*1024*1024 bytes
+#
+# units are case insensitive so 1GB 1Gb 1gB are all the same.
+
+# By default Redis does not run as a daemon. Use 'yes' if you need it.
+# Note that Redis will write a pid file in /var/run/redis.pid when daemonized.
+daemonize no
+
+# When running daemonized, Redis writes a pid file in /var/run/redis.pid by
+# default. You can specify a custom pid file location here.
+# pidfile ./tmp/redis.pid
+
+# Accept connections on the specified port, default is 6379
+port 16379
+
+# If you want you can bind a single interface, if the bind option is not
+# specified all the interfaces will listen for incoming connections.
+#
+bind 127.0.0.1
+
+# Close the connection after a client is idle for N seconds (0 to disable)
+timeout 300
+
+# Set server verbosity to 'debug'
+# it can be one of:
+# debug (a lot of information, useful for development/testing)
+# verbose (lots of rarely useful info, but not a mess like the debug level)
+# notice (moderately verbose, what you want in production probably)
+# warning (only very important / critical messages are logged)
+loglevel debug
+
+# Specify the log file name. Also 'stdout' can be used to force
+# Redis to log on the standard output. Note that if you use standard
+# output for logging but daemonize, logs will be sent to /dev/null
+logfile tmp/redis/redis-server.log
+
+# Set the number of databases. The default database is DB 0, you can select
+# a different one on a per-connection basis using SELECT <dbid> where
+# dbid is a number between 0 and 'databases'-1
+databases 16
+
+################################ SNAPSHOTTING #################################
+#
+# Save the DB on disk:
+#
+#   save <seconds> <changes>
+#
+#   Will save the DB if both the given number of seconds and the given
+#   number of write operations against the DB occurred.
+#
+#   In the example below the behaviour will be to save:
+#   after 900 sec (15 min) if at least 1 key changed
+#   after 300 sec (5 min) if at least 10 keys changed
+#   after 60 sec if at least 10000 keys changed
+#
+#   Note: you can disable saving entirely by commenting out all the "save" lines.
+
+save 900 1
+save 300 10
+save 60 10000
+
+# Compress string objects using LZF when dumping .rdb databases?
+# By default that's set to 'yes' as it's almost always a win.
+# If you want to save some CPU in the saving child set it to 'no' but
+# the dataset will likely be bigger if you have compressible values or keys.
+rdbcompression yes
+
+# The filename where to dump the DB
+dbfilename dump.rdb
+
+# The working directory.
+#
+# The DB will be written inside this directory, with the filename specified
+# above using the 'dbfilename' configuration directive.
+#
+# Also the Append Only File will be created inside this directory.
+#
+# Note that you must specify a directory here, not a file name.
+dir tmp/redis
+
+################################# REPLICATION #################################
+
+# Master-Slave replication. Use slaveof to make a Redis instance a copy of
+# another Redis server. Note that the configuration is local to the slave
+# so for example it is possible to configure the slave to save the DB with a
+# different interval, or to listen to another port, and so on.
+#
+# slaveof <masterip> <masterport>
+
+# If the master is password protected (using the "requirepass" configuration
+# directive below) it is possible to tell the slave to authenticate before
+# starting the replication synchronization process, otherwise the master will
+# refuse the slave request.
+#
+# masterauth <master-password>
+
+################################## SECURITY ###################################
+
+# Require clients to issue AUTH <PASSWORD> before processing any other
+# commands. This might be useful in environments in which you do not trust
+# others with access to the host running redis-server.
+#
+# This should stay commented out for backward compatibility and because most
+# people do not need auth (e.g. they run their own servers).
+#
+# Warning: since Redis is pretty fast an outside user can try up to
+# 150k passwords per second against a good box. This means that you should
+# use a very strong password otherwise it will be very easy to break.
+#
+# requirepass foobared
+
+################################### LIMITS ####################################
+
+# Set the max number of connected clients at the same time. By default there
+# is no limit, and it's up to the number of file descriptors the Redis process
+# is able to open. The special value '0' means no limits.
+# Once the limit is reached Redis will close all the new connections sending
+# an error 'max number of clients reached'.
+#
+# maxclients 128
+
+# Don't use more memory than the specified amount of bytes.
+# When the memory limit is reached Redis will try to remove keys with an
+# EXPIRE set. It will try to start freeing keys that are going to expire
+# in little time and preserve keys with a longer time to live.
+# Redis will also try to remove objects from free lists if possible.
+#
+# If all this fails, Redis will start to reply with errors to commands
+# that will use more memory, like SET, LPUSH, and so on, and will continue
+# to reply to most read-only commands like GET.
+#
+# WARNING: maxmemory can be a good idea mainly if you want to use Redis as a
+# 'state' server or cache, not as a real DB. When Redis is used as a real
+# database the memory usage will grow over the weeks, it will be obvious if
+# it is going to use too much memory in the long run, and you'll have the time
+# to upgrade. With maxmemory after the limit is reached you'll start to get
+# errors for write operations, and this may even lead to DB inconsistency.
+#
+# maxmemory <bytes>
+
+############################## APPEND ONLY MODE ###############################
+
+# By default Redis asynchronously dumps the dataset on disk. If you can live
+# with the idea that the latest records will be lost if something like a crash
+# happens this is the preferred way to run Redis. If instead you care a lot
+# about your data and don't want a single record to be lost you should
+# enable the append only mode: when this mode is enabled Redis will append
+# every write operation received in the file appendonly.aof. This file will
+# be read on startup in order to rebuild the full dataset in memory.
+#
+# Note that you can have both the async dumps and the append only file if you
+# like (you have to comment the "save" statements above to disable the dumps).
+# Still if append only mode is enabled Redis will load the data from the
+# log file at startup ignoring the dump.rdb file.
+#
+# IMPORTANT: Check the BGREWRITEAOF to see how to rewrite the append
+# log file in background when it gets too big.

+appendonly no
+
+# The name of the append only file (default: "appendonly.aof")
+# appendfilename appendonly.aof
+
+# The fsync() call tells the Operating System to actually write data on disk
+# instead of waiting for more data in the output buffer. Some OS will really
+# flush data on disk, some other OS will just try to do it ASAP.
+#
+# Redis supports three different modes:
+#
+# no: don't fsync, just let the OS flush the data when it wants. Faster.
+# always: fsync after every write to the append only log. Slow, Safest.
+# everysec: fsync only if one second passed since the last fsync. Compromise.
+#
+# The default is "everysec" that's usually the right compromise between
+# speed and data safety. It's up to you to understand if you can relax this to
+# "no" that will let the operating system flush the output buffer when
+# it wants, for better performances (but if you can live with the idea of
+# some data loss consider the default persistence mode that's snapshotting),
+# or on the contrary, use "always" that's very slow but a bit safer than
+# everysec.
+#
+# If unsure, use "everysec".
+
+# appendfsync always
+appendfsync everysec
+# appendfsync no
+
+############################### ADVANCED CONFIG ###############################
+
+# Glue small output buffers together in order to send small replies in a
+# single TCP packet. Uses a bit more CPU but most of the times it is a win
+# in terms of number of queries per second. Use 'yes' if unsure.
+glueoutputbuf yes
+
+################################## INCLUDES ###################################
+
+# Include one or more other config files here. This is useful if you
+# have a standard template that goes to all redis servers but also need
+# to customize a few per-server settings. Include files can include
+# other files, so use this wisely.
+#
+# include /path/to/local.conf
+# include /path/to/other.conf

=== added file 'bin/start-redis.sh'
--- bin/start-redis.sh	1970-01-01 00:00:00 +0000
+++ bin/start-redis.sh	2012-02-07 13:11:25 +0000
@@ -0,0 +1,5 @@
+#!/bin/bash
+
+mkdir -p tmp/redis
+/sbin/start-stop-daemon --start -b -m -d . -p tmp/redis.pid --exec /usr/bin/redis-server -- `pwd`/bin/redis.conf
+

=== added file 'bin/stop-redis.sh'
--- bin/stop-redis.sh	1970-01-01 00:00:00 +0000
+++ bin/stop-redis.sh	2012-02-07 13:11:25 +0000
@@ -0,0 +1,3 @@
+#!/bin/bash
+
+/sbin/start-stop-daemon --stop -p tmp/redis.pid --exec /usr/bin/redis-server -- `pwd`/bin/redis.conf

=== modified file 'distinctdb/distinctmetric.py'
--- distinctdb/distinctmetric.py	2011-12-09 14:47:23 +0000
+++ distinctdb/distinctmetric.py	2012-02-07 13:11:25 +0000
@@ -1,12 +1,16 @@
 import time
+import threading
 
 import psycopg2
+import redis
 
 from zope.interface import implements
-from twisted.internet.threads import deferToThread
+from twisted.internet import reactor
 from txstatsd.itxstatsd import IMetric
 
-ONEDAY = 60 * 60 * 24
+ONE_MINUTE = 60
+ONE_HOUR = 60 * ONE_MINUTE
+ONE_DAY = 24 * ONE_HOUR
 
 
 class DistinctMetricReporter(object):
@@ -16,8 +20,10 @@
     """
     implements(IMetric)
 
+    periods = [5 * ONE_MINUTE, ONE_HOUR, ONE_DAY]
+
     def __init__(self, name, wall_time_func=time.time, prefix="",
-                 bucket_size=ONEDAY, dsn=None):
+                 bucket_size=ONE_DAY, dsn=None, redis_host=None, redis_port=None):
         """Construct a metric we expect to be periodically updated.
 
         @param name: Indicates what is being instrumented.
@@ -32,8 +38,17 @@
         self.prefix = prefix
         self.bucket_size = bucket_size
         self.dsn = dsn
+        self.redis_host = redis_host
+        if redis_port is None:
+            redis_port = 6379
+        self.redis_port = redis_port
         self.metric_id = None
         self.build_bucket()
+        self.redis_flush_lock = threading.Lock()
+        self.redis_count = {}
+
+        if redis_host is not None:
+            self.redis = redis.client.Redis(host=redis_host, port=redis_port)
 
     def build_bucket(self, timestamp=None):
         self.max = 0
@@ -55,6 +70,40 @@
         if value > self.max:
             self.max = value
 
+        now = self.wall_time_func()
+        if self.redis_host is not None:
+            reactor.callInThread(self._update_count, item, now)
+
+    def bucket_name_for(self, period):
+        return "bucket_" + str(period)
+
+    def _update_count(self, value, when):
+        for period in self.periods:
+            self.redis.zadd(self.bucket_name_for(period), value, when)
+
+    def _flush_redis(self, now):
+        if not self.redis_flush_lock.acquire(False):
+            return
+        try:
+            for period in self.periods:
+                bucket = self.bucket_name_for(period)
+                self.redis.zremrangebyscore(bucket, 0, now - period)
+                self.redis_count[bucket] = self.redis.zcard(bucket)
+        finally:
+            self.redis_flush_lock.release()
+
+    def count(self, period):
+        return self.redis_count.get(self.bucket_name_for(period), 0)
+
+    def count_5min(self):
+        return self.count(5 * ONE_MINUTE)
+
+    def count_1hour(self):
+        return self.count(ONE_HOUR)
+
+    def count_1day(self):
+        return self.count(ONE_DAY)
+
     def _save_bucket(self, bucket, bucket_no):
         path = self.prefix + self.name
         if self.metric_id is None:
@@ -80,7 +129,7 @@
 
     def save_bucket(self, bucket, bucket_no):
         if self.dsn is not None:
-            deferToThread(self._save_bucket, bucket, bucket_no)
+            reactor.callInThread(self._save_bucket, bucket, bucket_no)
 
     def flush(self, interval, timestamp):
         current_bucket = self.get_bucket_no(timestamp)
@@ -88,9 +137,16 @@
             self.save_bucket(self.bucket, self.bucket_no)
             self.build_bucket(timestamp)
 
+        if self.redis_host is not None:
+            reactor.callInThread(self._flush_redis, timestamp)
+
         metrics = []
-        items = {".count": len(self.bucket),
-                 ".max": self.max}
+        items = {".messages": len(self.bucket),
+                 ".max": self.max,
+                 ".count_5min": self.count_5min(),
+                 ".count_1hour": self.count_1hour(),
+                 ".count_1day": self.count_1day(),
+                 }
         for item, value in items.iteritems():
             metrics.append((self.prefix + self.name + item, value, timestamp))
         return metrics

=== modified file 'distinctdb/tests/test_distinct.py'
--- distinctdb/tests/test_distinct.py	2011-12-09 20:04:06 +0000
+++ distinctdb/tests/test_distinct.py	2012-02-07 13:11:25 +0000
@@ -5,11 +5,19 @@
 from cStringIO import StringIO
 import os
 import time
-import subprocess
+try:
+    from subprocess import check_output
+except ImportError:
+    import subprocess
+    def check_output(args):
+        return subprocess.Popen(args,
+                                stdout=subprocess.PIPE).communicate()[0]
 
 import psycopg2
+import redis
 
 from twisted.trial.unittest import TestCase
+from twisted.internet import reactor
 from twisted.plugin import getPlugins
 from twisted.plugins import distinctdbplugin
 from txstatsd.itxstatsd import IMetricFactory
@@ -71,8 +79,10 @@
         dmr.update("three")
         self.assertEquals(result,
                           {"bucket": {"one": 2, "two": 1}, "bucket_no": 0})
-        self.assertEquals(dmr.flush(1, day),
-                          [("test.max", 1, day), ("test.count", 1, day)])
+        result = dmr.flush(1, day)
+
+        self.assertTrue(("test.max", 1, day) in result)
+        self.assertTrue(("test.messages", 1, day) in result)
 
     def test_configure(self):
         class TestOptions(service.OptionsGlue):
@@ -101,7 +111,7 @@
 class TestDatabase(TestCase):
 
     def setUp(self):
-        rootdir = subprocess.check_output(["bzr", "root"]).strip()
+        rootdir = check_output(["bzr", "root"]).strip()
         dsn_file = os.path.join(rootdir, "tmp", "pg.dsn")
         self.dsn = open(dsn_file).read()
         self.conn = psycopg2.connect(self.dsn)
@@ -145,3 +155,69 @@
         dmr2 = distinct.DistinctMetricReporter("test", dsn=self.dsn)
         dmr2._save_bucket({}, 0)
         self.assertEquals(dmr.metric_id, dmr2.metric_id)
+
+
+class TestRedis(TestCase):
+
+    def setUp(self):
+        reactor._initThreadPool()
+        reactor.threadpool.start()
+
+    def tearDown(self):
+        r = redis.client.Redis(host="localhost", port=16379)
+        r.flushdb()
+        reactor.threadpool.stop()
+
+    def test_connect(self):
+        r = redis.client.Redis(host="localhost", port=16379)
+        r.ping()
+
+    def test_configure(self):
+        class TestOptions(service.OptionsGlue):
+            optParameters = [["test", "t", "default", "help"]]
+            config_section = "statsd"
+
+        o = TestOptions()
+        config_file = ConfigParser.RawConfigParser()
+        config_file.readfp(StringIO("[statsd]\n\n[plugin_distinctdb]\n"
+                                    "redis_host = localhost\nredis_port = 16379"))
+        o.configure(config_file)
+        dmf = distinctdbplugin.DistinctMetricFactory()
+        dmf.configure(o)
+        dmr = dmf.build_metric("foo", "bar", time.time)
+        self.assertEquals(dmr.redis_host, "localhost")
+        self.assertEquals(dmr.redis_port, 16379)
+
+    def test_usage(self):
+        dmr = distinct.DistinctMetricReporter("test",
+            redis_host="localhost", redis_port=16379)
+
+        self.assertEquals(dmr.count_1hour(), 0)
+        dmr._update_count("one", 0)
+        dmr._flush_redis(1)
+        self.assertEquals(dmr.count_1hour(), 1)
+        dmr._update_count("one", 0)
+        dmr._flush_redis(1)
+        self.assertEquals(dmr.count_1hour(), 1)
+        dmr._update_count("two", 30 * distinct.ONE_MINUTE)
+        dmr._flush_redis(30 * distinct.ONE_MINUTE)
+        self.assertEquals(dmr.count_1hour(), 2)
+        dmr._flush_redis(distinct.ONE_HOUR + 10 * distinct.ONE_MINUTE)
+        self.assertEquals(dmr.count_1hour(), 1)
+
+    def test_load(self):
+        dmr = distinct.DistinctMetricReporter("test",
+            redis_host="localhost", redis_port=16379)
+        start = time.time()
+        for i in range(10000):
+            dmr.update(str(i % 1000))
+
+        while True:
+            w = len(reactor.threadpool.working)
+            if w == 0:
+                break
+            time.sleep(0.1)
+        dmr._flush_redis(time.time())
+        duration = time.time() - start
+        self.assertEquals(dmr.count_1hour(), 1000)
+        self.assertTrue(duration < 10)

=== modified file 'twisted/plugins/distinctdbplugin.py'
--- twisted/plugins/distinctdbplugin.py	2011-12-08 21:08:38 +0000
+++ twisted/plugins/distinctdbplugin.py	2012-02-07 13:11:25 +0000
@@ -2,7 +2,7 @@
 
 from twisted.plugin import IPlugin
 from txstatsd.itxstatsd import IMetricFactory
-from distinctdb.distinctmetric import DistinctMetricReporter, ONEDAY
+from distinctdb.distinctmetric import DistinctMetricReporter, ONE_DAY
 
 
 class DistinctMetricFactory(object):
@@ -19,15 +19,20 @@
         return DistinctMetricReporter(name, prefix=prefix,
                                       wall_time_func=wall_time_func,
                                       bucket_size=self.bucket_size,
-                                      dsn=self.dsn)
+                                      dsn=self.dsn, redis_host=self.redis_host,
+                                      redis_port=self.redis_port)
 
     def configure(self, options):
        self.section = dict(options.get("plugin_distinctdb", {}))
        try:
-            self.bucket_size = int(self.section.get("bucket_size", ONEDAY))
+            self.bucket_size = int(self.section.get("bucket_size", ONE_DAY))
        except ValueError:
-            self.bucket_size = ONEDAY
+            self.bucket_size = ONE_DAY
 
        self.dsn = self.section.get("dsn", None)
+        self.redis_host = self.section.get("redis_host", None)
+        self.redis_port = self.section.get("redis_port", None)
+        if self.redis_port is not None:
+            self.redis_port = int(self.redis_port)
 
 distinct_metric_factory = DistinctMetricFactory()
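
The redis side of this change boils down to a sliding-window distinct count kept in sorted sets: each observed value is added with its timestamp as the score, so re-adding a value merely refreshes it; at flush time values last seen before the window are pruned with ZREMRANGEBYSCORE and ZCARD yields the distinct count. Below is a minimal standalone sketch of that technique, not the plugin code itself; the helper names are mine, and it assumes a local redis on port 16379 and a recent redis-py, whose zadd takes a mapping rather than the positional value/score used in this branch:

    import time

    import redis

    PERIODS = [5 * 60, 60 * 60, 24 * 60 * 60]  # 5 min, 1 hour, 1 day

    r = redis.Redis(host="localhost", port=16379)

    def update(value, when=None):
        # Score each member with the time it was last seen; re-adding an
        # existing member only refreshes its score, so the set stays distinct.
        when = time.time() if when is None else when
        for period in PERIODS:
            r.zadd("bucket_%d" % period, {value: when})

    def count(period, now=None):
        # Drop members last seen before the window, then count what is left.
        now = time.time() if now is None else now
        bucket = "bucket_%d" % period
        r.zremrangebyscore(bucket, 0, now - period)
        return r.zcard(bucket)

    update("user-1")
    update("user-2")
    update("user-1")      # same member again: no new entry
    print(count(5 * 60))  # -> 2 distinct values seen in the last 5 minutes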
