Merge lp:~lucio.torre/txstatsd/add-redis-to-distinct-plugin into lp:txstatsd

Proposed by Lucio Torre
Status: Rejected
Rejected by: Lucio Torre
Proposed branch: lp:~lucio.torre/txstatsd/add-redis-to-distinct-plugin
Merge into: lp:txstatsd
Diff against target: 977 lines (+903/-0) (has conflicts)
13 files modified
Makefile (+26/-0)
README (+10/-0)
bin/redis.conf (+312/-0)
bin/schema.sql (+3/-0)
bin/start-database.sh (+63/-0)
bin/start-redis.sh (+3/-0)
bin/stop-database.sh (+22/-0)
bin/stop-redis.sh (+1/-0)
distinctdb/distinctmetric.py (+160/-0)
distinctdb/tests/test_distinct.py (+200/-0)
distinctdb/version.py (+1/-0)
setup.py (+64/-0)
twisted/plugins/distinctdbplugin.py (+38/-0)
Conflict adding file README.  Moved existing file to README.moved.
Conflict adding file setup.py.  Moved existing file to setup.py.moved.
Conflict adding file twisted.  Moved existing file to twisted.moved.
To merge this branch: bzr merge lp:~lucio.torre/txstatsd/add-redis-to-distinct-plugin
Reviewer             Review Type  Date Requested  Status
txStatsD Developers                               Pending
Review via email: mp+85948@code.launchpad.net

Description of the change

add redis support for realtime stats
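
The Redis part of this change is meant to keep one sorted set per reporting period (1 minute, 5 minutes, 1 hour, 12 hours, 1 day). Each item is added with ZADD, using the time it was seen as its score. A flush drops members older than the period with ZREMRANGEBYSCORE, and ZCARD then gives the number of distinct items seen within that window. The sketch below illustrates this sliding-window technique in plain Python. It assumes ordinary dicts as a stand-in for the Redis sorted sets, so it runs without a server. The `SlidingWindowDistinct` class is hypothetical and not part of the branch.

```python
class SlidingWindowDistinct(object):
    """Hypothetical in-memory model of the per-period Redis sorted sets."""

    def __init__(self, periods):
        self.periods = list(periods)
        # One {item: last_seen_time} mapping per period, like one sorted set each.
        self.sets = dict((p, {}) for p in self.periods)
        # Counts as of the last flush(), like DistinctMetricReporter.redis_count.
        self.counts = dict((p, 0) for p in self.periods)

    def update(self, item, when):
        # Like ZADD: re-adding an existing item only refreshes its score.
        for p in self.periods:
            self.sets[p][item] = when

    def flush(self, now):
        for p in self.periods:
            members = self.sets[p]
            cutoff = now - p
            # Like ZREMRANGEBYSCORE key 0 cutoff: forget items not seen since.
            stale = [i for i, score in members.items() if 0 <= score <= cutoff]
            for item in stale:
                del members[item]
            # Like ZCARD: the number of distinct items left in the window.
            self.counts[p] = len(members)

    def count(self, period):
        return self.counts.get(period, 0)


ONE_MINUTE = 60
ONE_HOUR = 60 * ONE_MINUTE

window = SlidingWindowDistinct([ONE_MINUTE, ONE_HOUR])
window.update("one", 0)
window.update("one", 0)  # the same item again: still a single member
window.update("two", 30 * ONE_MINUTE)
window.flush(30 * ONE_MINUTE)
print("%d %d" % (window.count(ONE_HOUR), window.count(ONE_MINUTE)))  # 2 1
window.flush(ONE_HOUR + 10 * ONE_MINUTE)
print("%d %d" % (window.count(ONE_HOUR), window.count(ONE_MINUTE)))  # 1 0
```

Re-adding an item only moves its score forward, so an item is counted once per window however often it is seen. The cost is that every period keeps its own copy of each recently seen item.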

4. By Lucio Torre

added load test + cleanup

Unmerged revisions

7. By Lucio Torre

made compatible with old versions, even more

6. By Lucio Torre

made config compatible with lucid

5. By Lucio Torre

path to startstopdaemon

4. By Lucio Torre

added load test + cleanup

3. By Lucio Torre

redis support for realtime stats

2. By Lucio Torre

bzr merge lp:~lucio.torre/txstatsd/add-distinct-db-plugin

1. By Lucio Torre

empty plugin skeleton

Preview Diff

=== added file 'Makefile'
--- Makefile 1970-01-01 00:00:00 +0000
+++ Makefile 2011-12-15 20:24:01 +0000
@@ -0,0 +1,26 @@
+
+trial:
+	trial distinctdb/
+
+start-database:
+	./bin/start-database.sh
+	psql -h `pwd`/tmp/db1 -d distinct < bin/schema.sql
+
+stop-database:
+	./bin/stop-database.sh
+
+start-redis:
+	./bin/start-redis.sh
+
+stop-redis:
+	./bin/stop-redis.sh
+
+start: start-database start-redis
+
+stop: stop-redis stop-database
+
+clean:
+	rm -rf ./tmp/
+test: start trial stop clean
+
+.PHONY: trial test start-database stop-database start-redis stop-redis start stop clean

=== added file 'README'
--- README 1970-01-01 00:00:00 +0000
+++ README 2011-12-15 20:24:01 +0000
@@ -0,0 +1,10 @@
+To test (or run "make test", which does all of this and then cleans up):
+./bin/start-database.sh
+psql -h `pwd`/tmp/db1 -d distinct < bin/schema.sql
+./bin/start-redis.sh
+
+Then:
+trial distinctdb
+
+When done:
+./bin/stop-redis.sh; ./bin/stop-database.sh

=== renamed file 'README' => 'README.moved'
=== added directory 'bin'
=== added file 'bin/redis.conf'
--- bin/redis.conf 1970-01-01 00:00:00 +0000
+++ bin/redis.conf 2011-12-15 20:24:01 +0000
@@ -0,0 +1,312 @@
+# Redis configuration file example
+
+# Note on units: when memory size is needed, it is possible to specify
+# it in the usual form of 1k 5GB 4M and so forth:
+#
+# 1k => 1000 bytes
+# 1kb => 1024 bytes
+# 1m => 1000000 bytes
+# 1mb => 1024*1024 bytes
+# 1g => 1000000000 bytes
+# 1gb => 1024*1024*1024 bytes
+#
+# units are case insensitive so 1GB 1Gb 1gB are all the same.
+
+# By default Redis does not run as a daemon. Use 'yes' if you need it.
+# Note that Redis will write a pid file in /var/run/redis.pid when daemonized.
+daemonize no
+
+# When running daemonized, Redis writes a pid file in /var/run/redis.pid by
+# default. You can specify a custom pid file location here.
+# pidfile ./tmp/redis.pid
+
+# Accept connections on the specified port, default is 6379
+port 16379
+
+# If you want you can bind a single interface, if the bind option is not
+# specified all the interfaces will listen for incoming connections.
+#
+bind 127.0.0.1
+
+# Close the connection after a client is idle for N seconds (0 to disable)
+timeout 300
+
+# Set server verbosity.
+# It can be one of:
+# debug (a lot of information, useful for development/testing)
+# verbose (much rarely useful info, but not a mess like the debug level)
+# notice (moderately verbose, what you want in production probably)
+# warning (only very important / critical messages are logged)
+loglevel verbose
+
+# Specify the log file name. Also 'stdout' can be used to force
+# Redis to log on the standard output. Note that if you use standard
+# output for logging but daemonize, logs will be sent to /dev/null
+logfile tmp/redis/redis-server.log
+
+# Set the number of databases. The default database is DB 0, you can select
+# a different one on a per-connection basis using SELECT <dbid> where
+# dbid is a number between 0 and 'databases'-1
+databases 16
+
+################################ SNAPSHOTTING #################################
+#
+# Save the DB on disk:
+#
+# save <seconds> <changes>
+#
+# Will save the DB if both the given number of seconds and the given
+# number of write operations against the DB occurred.
+#
+# In the example below the behaviour will be to save:
+# after 900 sec (15 min) if at least 1 key changed
+# after 300 sec (5 min) if at least 10 keys changed
+# after 60 sec if at least 10000 keys changed
+#
+# Note: you can disable saving entirely by commenting out all the "save" lines.
+
+save 900 1
+save 300 10
+save 60 10000
+
+# Compress string objects using LZF when dumping .rdb databases?
+# By default that's set to 'yes' as it's almost always a win.
+# If you want to save some CPU in the saving child set it to 'no' but
+# the dataset will likely be bigger if you have compressible values or keys.
+rdbcompression yes
+
+# The filename where to dump the DB
+dbfilename dump.rdb
+
+# The working directory.
+#
+# The DB will be written inside this directory, with the filename specified
+# above using the 'dbfilename' configuration directive.
+#
+# Also the Append Only File will be created inside this directory.
+#
+# Note that you must specify a directory here, not a file name.
+dir tmp/redis
+
+################################# REPLICATION #################################
+
+# Master-Slave replication. Use slaveof to make a Redis instance a copy of
+# another Redis server. Note that the configuration is local to the slave
+# so for example it is possible to configure the slave to save the DB with a
+# different interval, or to listen to another port, and so on.
+#
+# slaveof <masterip> <masterport>
+
+# If the master is password protected (using the "requirepass" configuration
+# directive below) it is possible to tell the slave to authenticate before
+# starting the replication synchronization process, otherwise the master will
+# refuse the slave request.
+#
+# masterauth <master-password>
+
+################################## SECURITY ###################################
+
+# Require clients to issue AUTH <PASSWORD> before processing any other
+# commands. This might be useful in environments in which you do not trust
+# others with access to the host running redis-server.
+#
+# This should stay commented out for backward compatibility and because most
+# people do not need auth (e.g. they run their own servers).
+#
+# Warning: since Redis is pretty fast an outside user can try up to
+# 150k passwords per second against a good box. This means that you should
+# use a very strong password otherwise it will be very easy to break.
+#
+# requirepass foobared
+
+################################### LIMITS ####################################
+
+# Set the max number of connected clients at the same time. By default there
+# is no limit, and it's up to the number of file descriptors the Redis process
+# is able to open. The special value '0' means no limits.
+# Once the limit is reached Redis will close all the new connections sending
+# an error 'max number of clients reached'.
+#
+# maxclients 128
+
+# Don't use more memory than the specified amount of bytes.
+# When the memory limit is reached Redis will try to remove keys with an
+# EXPIRE set. It will try to start freeing keys that are going to expire
+# in little time and preserve keys with a longer time to live.
+# Redis will also try to remove objects from free lists if possible.
+#
+# If all this fails, Redis will start to reply with errors to commands
+# that will use more memory, like SET, LPUSH, and so on, and will continue
+# to reply to most read-only commands like GET.
+#
+# WARNING: maxmemory can be a good idea mainly if you want to use Redis as a
+# 'state' server or cache, not as a real DB. When Redis is used as a real
+# database the memory usage will grow over the weeks, it will be obvious if
+# it is going to use too much memory in the long run, and you'll have the time
+# to upgrade. With maxmemory after the limit is reached you'll start to get
+# errors for write operations, and this may even lead to DB inconsistency.
+#
+# maxmemory <bytes>
+
+############################## APPEND ONLY MODE ###############################
+
+# By default Redis asynchronously dumps the dataset on disk. If you can live
+# with the idea that the latest records will be lost if something like a crash
+# happens this is the preferred way to run Redis. If instead you care a lot
+# about your data and don't want a single record to get lost, you should
+# enable the append only mode: when this mode is enabled Redis will append
+# every write operation received in the file appendonly.aof. This file will
+# be read on startup in order to rebuild the full dataset in memory.
+#
+# Note that you can have both the async dumps and the append only file if you
+# like (you have to comment the "save" statements above to disable the dumps).
+# Still if append only mode is enabled Redis will load the data from the
+# log file at startup ignoring the dump.rdb file.
+#
+# IMPORTANT: See BGREWRITEAOF for how to rewrite the append
+# log file in background when it gets too big.
+
+appendonly no
+
+# The name of the append only file (default: "appendonly.aof")
+# appendfilename appendonly.aof
+
+# The fsync() call tells the Operating System to actually write data on disk
+# instead of waiting for more data in the output buffer. Some OS will really
+# flush data on disk, some other OS will just try to do it ASAP.
+#
+# Redis supports three different modes:
+#
+# no: don't fsync, just let the OS flush the data when it wants. Faster.
+# always: fsync after every write to the append only log . Slow, Safest.
+# everysec: fsync only if one second passed since the last fsync. Compromise.
+#
+# The default is "everysec" that's usually the right compromise between
+# speed and data safety. It's up to you to understand if you can relax this to
+# "no" that will let the operating system flush the output buffer when
+# it wants, for better performances (but if you can live with the idea of
+# some data loss consider the default persistence mode that's snapshotting),
+# or on the contrary, use "always" that's very slow but a bit safer than
+# everysec.
+#
+# If unsure, use "everysec".
+
+# appendfsync always
+appendfsync everysec
+# appendfsync no
+
+################################ VIRTUAL MEMORY ###############################
+
+# Virtual Memory allows Redis to work with datasets bigger than the actual
+# amount of RAM needed to hold the whole dataset in memory.
+# In order to do so very used keys are taken in memory while the other keys
+# are swapped into a swap file, similarly to what operating systems do
+# with memory pages.
+#
+# To enable VM just set 'vm-enabled' to yes, and set the following three
+# VM parameters accordingly to your needs.
+
+vm-enabled no
+# vm-enabled yes
+
+# This is the path of the Redis swap file. As you can guess, swap files
+# can't be shared by different Redis instances, so make sure to use a swap
+# file for every redis process you are running. Redis will complain if the
+# swap file is already in use.
+#
+# The best kind of storage for the Redis swap file (that's accessed at random)
+# is a Solid State Disk (SSD).
+#
+# *** WARNING *** if you are using a shared hosting the default of putting
+# the swap file under /tmp is not secure. Create a dir with access granted
+# only to Redis user and configure Redis to create the swap file there.
+vm-swap-file tmp/redis/redis.swap
+
+# vm-max-memory configures the VM to use at max the specified amount of
+# RAM. Everything that does not fit will be swapped on disk *if* possible, that
+# is, if there is still enough contiguous space in the swap file.
+#
+# With vm-max-memory 0 the system will swap everything it can. Not a good
+# default, just specify the max amount of RAM you can in bytes, but it's
+# better to leave some margin. For instance specify an amount of RAM
+# that's more or less between 60 and 80% of your free RAM.
+vm-max-memory 0
+
+# Redis swap files are split into pages. An object can be saved using multiple
+# contiguous pages, but pages can't be shared between different objects.
+# So if your page is too big, small objects swapped out on disk will waste
+# a lot of space. If your page is too small, there is less space in the swap
+# file (assuming you configured the same number of total swap file pages).
+#
+# If you use a lot of small objects, use a page size of 64 or 32 bytes.
+# If you use a lot of big objects, use a bigger page size.
+# If unsure, use the default :)
+vm-page-size 32
+
+# Number of total memory pages in the swap file.
+# Given that the page table (a bitmap of free/used pages) is taken in memory,
+# every 8 pages on disk will consume 1 byte of RAM.
+#
+# The total swap size is vm-page-size * vm-pages
+#
+# With the default of 32-bytes memory pages and 134217728 pages Redis will
+# use a 4 GB swap file, that will use 16 MB of RAM for the page table.
+#
+# It's better to use the smallest acceptable value for your application,
+# but the default is large in order to work in most conditions.
+vm-pages 134217728
+
+# Max number of VM I/O threads running at the same time.
+# These threads are used to read/write data from/to swap file, since they
+# also encode and decode objects from disk to memory or the reverse, a bigger
+# number of threads can help with big objects even if they can't help with
+# I/O itself as the physical device may not be able to cope with many
+# reads/writes operations at the same time.
+#
+# The special value of 0 turns off threaded I/O and enables the blocking
+# Virtual Memory implementation.
+vm-max-threads 4
+
+############################### ADVANCED CONFIG ###############################
+
+# Glue small output buffers together in order to send small replies in a
+# single TCP packet. Uses a bit more CPU but most of the times it is a win
+# in terms of number of queries per second. Use 'yes' if unsure.
+glueoutputbuf yes
+
+# Hashes are encoded in a special way (much more memory efficient) when they
+# have at max a given number of elements, and the biggest element does not
+# exceed a given threshold. You can configure these limits with the following
+# configuration directives.
+hash-max-zipmap-entries 64
+hash-max-zipmap-value 512
+
+# Active rehashing uses 1 millisecond every 100 milliseconds of CPU time in
+# order to help rehashing the main Redis hash table (the one mapping top-level
+# keys to values). The hash table implementation redis uses (see dict.c)
+# performs a lazy rehashing: the more operations you run on a hash table
+# that is rehashing, the more rehashing "steps" are performed, so if the
+# server is idle the rehashing is never complete and some more memory is used
+# by the hash table.
+#
+# The default is to use this millisecond 10 times every second in order to
+# actively rehash the main dictionaries, freeing memory when possible.
+#
+# If unsure:
+# use "activerehashing no" if you have hard latency requirements and it is
+# not a good thing in your environment that Redis can reply from time to time
+# to queries with 2 milliseconds delay.
+#
+# use "activerehashing yes" if you don't have such hard requirements but
+# want to free memory asap when possible.
+activerehashing yes
+
+################################## INCLUDES ###################################
+
+# Include one or more other config files here. This is useful if you
+# have a standard template that goes to all redis server but also need
+# to customize a few per-server settings. Include files can include
+# other files, so use this wisely.
+#
+# include /path/to/local.conf
+# include /path/to/other.conf

=== added file 'bin/schema.sql'
--- bin/schema.sql 1970-01-01 00:00:00 +0000
+++ bin/schema.sql 2011-12-15 20:24:01 +0000
@@ -0,0 +1,3 @@
+CREATE TABLE paths (id SERIAL PRIMARY KEY NOT NULL, path TEXT NOT NULL UNIQUE);
+CREATE TABLE points (path_id INTEGER, bucket INTEGER, value TEXT, count INTEGER);
+CREATE INDEX points_idx ON points (path_id, bucket);

=== added file 'bin/start-database.sh'
--- bin/start-database.sh 1970-01-01 00:00:00 +0000
+++ bin/start-database.sh 2011-12-15 20:24:01 +0000
@@ -0,0 +1,63 @@
+#! /bin/bash
+
+ROOTDIR=${ROOTDIR:-`bzr root`}
+if [ ! -d "$ROOTDIR" ]; then
+    echo "ROOTDIR '$ROOTDIR' doesn't exist" >&2
+    exit 1
+fi
+
+DATABASES="
+distinct
+"
+
+function setup_database() {
+    local TESTDIR=$1
+
+    echo "## Starting postgres in $TESTDIR ##"
+    mkdir -p "$TESTDIR/data"
+    chmod 700 "$TESTDIR/data"
+
+    export PGHOST="$TESTDIR"
+    export PGDATA="$TESTDIR/data"
+    if [ -d /usr/lib/postgresql/8.4 ]; then
+        export PGBINDIR=/usr/lib/postgresql/8.4/bin
+    elif [ -d /usr/lib/postgresql/8.3 ]; then
+        export PGBINDIR=/usr/lib/postgresql/8.3/bin
+    else
+        echo "Cannot find valid parent for PGBINDIR" >&2
+        return 1
+    fi
+    $PGBINDIR/initdb -E UNICODE -A trust -D $PGDATA
+    # set up the database options file
+    if [ ! -e $PGDATA/postgresql.conf ]; then
+        echo "PostgreSQL data directory apparently didn't init"
+    else
+        (
+            cat <<EOF
+search_path='\$user,public,ts2'
+add_missing_from=false
+log_statement='all'
+log_line_prefix='[%m] %q%u@%d %c '
+fsync = off
+EOF
+        ) > $PGDATA/postgresql.conf
+    fi
+    $PGBINDIR/pg_ctl start -w -D $TESTDIR/data -l $TESTDIR/postgres.log -o "-F -k $TESTDIR -h ''"
+    for db in $DATABASES; do
+        $PGBINDIR/createdb --encoding UNICODE "$db" &>/dev/null
+        $PGBINDIR/createlang plpgsql "$db"
+    done
+    $PGBINDIR/createuser --superuser --createdb "postgres" &>/dev/null
+    # create the additional users we need via a psql script
+    $PGBINDIR/psql -U postgres template1 <<EOF
+CREATE ROLE client INHERIT;
+
+CREATE USER client IN ROLE client;
+EOF
+    echo "To set your environment so psql will connect to this DB instance type:"
+    echo "  export PGHOST=$TESTDIR"
+    echo "## Done. ##"
+    echo -n host=$TESTDIR dbname=distinct > $ROOTDIR/tmp/pg.dsn
+}
+
+setup_database $ROOTDIR/tmp/db1

=== added file 'bin/start-redis.sh'
--- bin/start-redis.sh 1970-01-01 00:00:00 +0000
+++ bin/start-redis.sh 2011-12-15 20:24:01 +0000
@@ -0,0 +1,3 @@
+mkdir -p tmp/redis
+start-stop-daemon --start -b -m -d . -p tmp/redis.pid --exec /usr/bin/redis-server -- `pwd`/bin/redis.conf
+

=== added file 'bin/stop-database.sh'
--- bin/stop-database.sh 1970-01-01 00:00:00 +0000
+++ bin/stop-database.sh 2011-12-15 20:24:01 +0000
@@ -0,0 +1,22 @@
+#! /bin/bash
+
+ROOTDIR=${ROOTDIR:-`bzr root`}
+if [ ! -d "$ROOTDIR" ]; then
+    echo "ROOTDIR '$ROOTDIR' doesn't exist" >&2
+    exit 1
+fi
+
+if [ -d /usr/lib/postgresql/8.4 ]; then
+    PGBINDIR=/usr/lib/postgresql/8.4/bin
+elif [ -d /usr/lib/postgresql/8.3 ]; then
+    PGBINDIR=/usr/lib/postgresql/8.3/bin
+else
+    echo "Cannot find valid parent for PGBINDIR" >&2; exit 1
+fi
+
+# setting PGDATA tells pg_ctl which DB to talk to
+export PGDATA=$ROOTDIR/tmp/db1/data/
+$PGBINDIR/pg_ctl status > /dev/null
+if [ $? = 0 ]; then
+    $PGBINDIR/pg_ctl stop -t 60 -w -m fast
+fi

=== added file 'bin/stop-redis.sh'
--- bin/stop-redis.sh 1970-01-01 00:00:00 +0000
+++ bin/stop-redis.sh 2011-12-15 20:24:01 +0000
@@ -0,0 +1,1 @@
+start-stop-daemon --stop -p tmp/redis.pid --exec /usr/bin/redis-server -- `pwd`/bin/redis.conf

=== added directory 'distinctdb'
=== added file 'distinctdb/__init__.py'
=== added file 'distinctdb/distinctmetric.py'
--- distinctdb/distinctmetric.py 1970-01-01 00:00:00 +0000
+++ distinctdb/distinctmetric.py 2011-12-15 20:24:01 +0000
@@ -0,0 +1,160 @@
+import time
+import threading
+
+import psycopg2
+import redis
+
+from zope.interface import implements
+from twisted.internet import reactor
+from txstatsd.itxstatsd import IMetric
+
+ONE_MINUTE = 60
+ONE_HOUR = 60 * ONE_MINUTE
+ONE_DAY = 24 * ONE_HOUR
+
+
+class DistinctMetricReporter(object):
+    """
+    Keeps a measurement of the number of distinct items seen and of how many
+    times each one has been seen.
+    """
+    implements(IMetric)
+
+    periods = [ONE_MINUTE, 5 * ONE_MINUTE, ONE_HOUR, 12 * ONE_HOUR, ONE_DAY]
+
+    def __init__(self, name, wall_time_func=time.time, prefix="",
+                 bucket_size=ONE_DAY, dsn=None, redis_host=None, redis_port=None):
+        """Construct a metric we expect to be periodically updated.
+
+        @param name: Indicates what is being instrumented.
+        @param wall_time_func: Function for obtaining wall time.
+        @param prefix: If present, a string to prepend to the message
+            composed when C{report} is called.
+        """
+        self.name = name
+        self.wall_time_func = wall_time_func
+        if prefix:
+            prefix += '.'
+        self.prefix = prefix
+        self.bucket_size = bucket_size
+        self.dsn = dsn
+        self.redis_host = redis_host
+        if redis_port is None:
+            redis_port = 6379
+        self.redis_port = redis_port
+        self.metric_id = None
+        self.build_bucket()
+        self.redis_flush_lock = threading.Lock()
+        self.redis_count = {}
+
+        if redis_host is not None:
+            self.redis = redis.client.Redis(host=redis_host, port=redis_port)
+
+    def build_bucket(self, timestamp=None):
+        self.max = 0
+        self.bucket = {}
+        self.bucket_no = self.get_bucket_no(timestamp)
+
+    def get_bucket_no(self, timestamp=None):
+        if timestamp is None:
+            timestamp = self.wall_time_func()
+        return int(timestamp / self.bucket_size)
+
+    def process(self, fields):
+        self.update(fields[0])
+
+    def update(self, item):
+        value = self.bucket.get(item, 0) + 1
+        self.bucket[item] = value
+        if value > self.max:
+            self.max = value
+
+        now = self.wall_time_func()
+        if self.redis_host is not None:
+            reactor.callInThread(self._update_count, item, now)
+
+    def bucket_name_for(self, period):
+        return "%s%s.bucket_%s" % (self.prefix, self.name, period)
+
+    def _update_count(self, item, when):
+        # One sorted set per period; the score is the last time item was seen.
+        for period in self.periods:
+            self.redis.zadd(self.bucket_name_for(period), item, when)
+
+    def _flush_redis(self, now):
+        if not self.redis_flush_lock.acquire(False):
+            return
+        try:
+            for period in self.periods:
+                bucket = self.bucket_name_for(period)
+                self.redis.zremrangebyscore(bucket, 0, now - period)
+                self.redis_count[bucket] = self.redis.zcard(bucket)
+        finally:
+            self.redis_flush_lock.release()
+
+    def count(self, period):
+        return self.redis_count.get(self.bucket_name_for(period), 0)
+
+    def count_1min(self):
+        return self.count(ONE_MINUTE)
+
+    def count_5min(self):
+        return self.count(5 * ONE_MINUTE)
+
+    def count_1hour(self):
+        return self.count(ONE_HOUR)
+
+    def count_12hours(self):
+        return self.count(12 * ONE_HOUR)
+
+    def count_1day(self):
+        return self.count(ONE_DAY)
+
+    def _save_bucket(self, bucket, bucket_no):
+        path = self.prefix + self.name
+        c = psycopg2.connect(self.dsn)
+        cr = c.cursor()
+        if self.metric_id is None:
+            # Look up (or create) the id for this metric's path only once.
+            cr.execute("SELECT id FROM paths WHERE path = %s", (path,))
+            row = cr.fetchone()
+            if row is None:
+                cr.execute("INSERT INTO paths (path) VALUES (%s) "
+                           "RETURNING id", (path,))
+                row = cr.fetchone()
+                c.commit()
+            self.metric_id = row[0]
+        for i, (k, v) in enumerate(bucket.iteritems()):
+            cr.execute("INSERT INTO points (path_id, bucket, value, count) "
+                       "VALUES (%s, %s, %s, %s)",
+                       (self.metric_id, bucket_no, k, v))
+            if i % 1000 == 0:
+                c.commit()
+        c.commit()
+        c.close()
+
+    def save_bucket(self, bucket, bucket_no):
+        if self.dsn is not None:
+            reactor.callInThread(self._save_bucket, bucket, bucket_no)
+
+    def flush(self, interval, timestamp):
+        current_bucket = self.get_bucket_no(timestamp)
+        if current_bucket != self.bucket_no:
+            self.save_bucket(self.bucket, self.bucket_no)
+            self.build_bucket(timestamp)
+
+        if self.redis_host is not None:
+            reactor.callInThread(self._flush_redis, timestamp)
+
+        metrics = []
+        items = {".messages": len(self.bucket),
+                 ".max": self.max,
+                 ".count_1min": self.count_1min(),
+                 ".count_5min": self.count_5min(),
+                 ".count_1hour": self.count_1hour(),
+                 ".count_12hours": self.count_12hours(),
+                 ".count_1day": self.count_1day(),
+                 }
+        for item, value in items.iteritems():
+            metrics.append((self.prefix + self.name + item, value, timestamp))
+        return metrics

=== added directory 'distinctdb/tests'
=== added file 'distinctdb/tests/__init__.py'
=== added file 'distinctdb/tests/test_distinct.py'
--- distinctdb/tests/test_distinct.py 1970-01-01 00:00:00 +0000
+++ distinctdb/tests/test_distinct.py 2011-12-15 20:24:01 +0000
@@ -0,0 +1,200 @@
+# Copyright (C) 2011 Canonical
+# All Rights Reserved
+
+import ConfigParser
+from cStringIO import StringIO
+import os
+import time
+try:
+    from subprocess import check_output
+except ImportError:
+    import subprocess
+    def check_output(args):
+        return subprocess.Popen(args,
+                                stdout=subprocess.PIPE).communicate()[0]
+
+import psycopg2
+import redis
+
+from twisted.trial.unittest import TestCase
+from twisted.plugin import getPlugins
+from twisted.plugins import distinctdbplugin
+from txstatsd.itxstatsd import IMetricFactory
+from txstatsd import service
+
+from distinctdb import distinctmetric as distinct
+
+
+class TestDistinctMetricReporter(TestCase):
+
+    def test_get_bucket_no(self):
+        _wall_time = [0]
+
+        def _time():
+            return _wall_time[0]
+
+        dmr = distinct.DistinctMetricReporter("test", wall_time_func=_time)
+        self.assertEquals(dmr.get_bucket_no(), 0)
+        _wall_time = [60 * 60 * 24 + 1]
+        dmr.update("one")
+        self.assertEquals(dmr.get_bucket_no(), 1)
+
+    def test_max(self):
+        _wall_time = [0]
+
+        def _time():
+            return _wall_time[0]
+
+        dmr = distinct.DistinctMetricReporter("test", wall_time_func=_time)
+        self.assertEquals(dmr.get_bucket_no(), 0)
+        self.assertEquals(dmr.max, 0)
+        dmr.update("one")
+        dmr.update("one")
+        dmr.update("two")
+        self.assertEquals(dmr.max, 2)
+        dmr.flush(1, 60 * 60 * 24 + 1)
+        dmr.update("one")
+        self.assertEquals(dmr.max, 1)
+
+    def test_reports(self):
+        _wall_time = [0]
+
+        def _time():
+            return _wall_time[0]
+
+        result = {}
+
+        dmr = distinct.DistinctMetricReporter("test", wall_time_func=_time)
+
+        def save(b, b_no):
+            result["bucket"] = b
+            result["bucket_no"] = b_no
+        dmr.save_bucket = save
+        dmr.update("one")
+        dmr.update("one")
+        dmr.update("two")
+        day = 60 * 60 * 24 + 1
+        dmr.flush(1, day)
+        dmr.update("three")
+        self.assertEquals(result,
+                          {"bucket": {"one": 2, "two": 1}, "bucket_no": 0})
+        result = dmr.flush(1, day)
+
+        self.assertTrue(("test.max", 1, day) in result)
+        self.assertTrue(("test.messages", 1, day) in result)
+
+    def test_configure(self):
+        class TestOptions(service.OptionsGlue):
+            optParameters = [["test", "t", "default", "help"]]
+            config_section = "statsd"
+
+        o = TestOptions()
+        config_file = ConfigParser.RawConfigParser()
+        config_file.readfp(StringIO("[statsd]\n\n[plugin_distinctdb]\n"
+                                    "dsn = dbdsn\nbucket_size = 100"))
+        o.configure(config_file)
+        dmf = distinctdbplugin.DistinctMetricFactory()
+        dmf.configure(o)
+        dmr = dmf.build_metric("foo", "bar", time.time)
+        self.assertEquals(dmr.bucket_size, 100)
+        self.assertEquals(dmr.dsn, "dbdsn")
+
+
+class TestPlugin(TestCase):
+
+    def test_factory(self):
+        self.assertTrue(distinctdbplugin.distinct_metric_factory in
+                        list(getPlugins(IMetricFactory)))
+
+
+class TestDatabase(TestCase):
+
+    def setUp(self):
+        rootdir = check_output(["bzr", "root"]).strip()
+        dsn_file = os.path.join(rootdir, "tmp", "pg.dsn")
+        self.dsn = open(dsn_file).read()
+        self.conn = psycopg2.connect(self.dsn)
+
+    def tearDown(self):
+        cr = self.conn.cursor()
+        cr.execute("rollback")
+        cr.execute("DELETE FROM paths")
+        cr.execute("DELETE FROM points")
+        cr.execute("commit")
+
+    def test_connect(self):
+        cr = self.conn.cursor()
+        cr.execute("SELECT 0")
+        result = cr.fetchall()
+        self.assertEquals(result, [(0,)])
+
+    def test_create_metric_id(self):
+        dmr = distinct.DistinctMetricReporter("test", dsn=self.dsn)
+        dmr._save_bucket({}, 0)
+        cr = self.conn.cursor()
+        cr.execute("SELECT id, path FROM paths")
+        self.assertEquals(cr.fetchall(),
+                          [(dmr.metric_id, "test")])
+
+    def test_find_saved_data(self):
+        dmr = distinct.DistinctMetricReporter("test", dsn=self.dsn)
+        dmr.update("one")
+        dmr.update("one")
+        dmr.update("two")
+        dmr._save_bucket(dmr.bucket, 0)
+        cr = self.conn.cursor()
+        cr.execute("SELECT * FROM points ORDER BY value")
+        rows = cr.fetchall()
+        self.assertEquals(rows, [(dmr.metric_id, 0, "one", 2),
+                                 (dmr.metric_id, 0, "two", 1)])
+
+    def test_load_metric_id(self):
+        dmr = distinct.DistinctMetricReporter("test", dsn=self.dsn)
+        dmr._save_bucket({}, 0)
+        dmr2 = distinct.DistinctMetricReporter("test", dsn=self.dsn)
+        dmr2._save_bucket({}, 0)
+        self.assertEquals(dmr.metric_id, dmr2.metric_id)
+
+
+class TestRedis(TestCase):
+
+    def tearDown(self):
+        r = redis.client.Redis(host="localhost", port=16379)
+        r.flushdb()
+
+    def test_connect(self):
+        r = redis.client.Redis(host="localhost", port=16379)
+        r.ping()
+
+    def test_configure(self):
+        class TestOptions(service.OptionsGlue):
+            optParameters = [["test", "t", "default", "help"]]
+            config_section = "statsd"
+
+        o = TestOptions()
+        config_file = ConfigParser.RawConfigParser()
+        config_file.readfp(StringIO("[statsd]\n\n[plugin_distinctdb]\n"
+                                    "redis_host = localhost\nredis_port = 16379"))
+        o.configure(config_file)
+        dmf = distinctdbplugin.DistinctMetricFactory()
+        dmf.configure(o)
+        dmr = dmf.build_metric("foo", "bar", time.time)
+        self.assertEquals(dmr.redis_host, "localhost")
+        self.assertEquals(dmr.redis_port, 16379)
+
+    def test_usage(self):
+        dmr = distinct.DistinctMetricReporter(
+            "test", redis_host="localhost", redis_port=16379)
+
+        self.assertEquals(dmr.count_1hour(), 0)
+        dmr._update_count("one", 0)
+        dmr._flush_redis(1)
+        self.assertEquals(dmr.count_1hour(), 1)
+        dmr._update_count("one", 0)
+        dmr._flush_redis(1)
+        self.assertEquals(dmr.count_1hour(), 1)
+        dmr._update_count("two", 30 * distinct.ONE_MINUTE)
+        dmr._flush_redis(30 * distinct.ONE_MINUTE)
+        self.assertEquals(dmr.count_1hour(), 2)
+        dmr._flush_redis(distinct.ONE_HOUR + 10 * distinct.ONE_MINUTE)
+        self.assertEquals(dmr.count_1hour(), 1)

857=== added file 'distinctdb/version.py'
858--- distinctdb/version.py 1970-01-01 00:00:00 +0000
859+++ distinctdb/version.py 2011-12-15 20:24:01 +0000
860@@ -0,0 +1,1 @@
861+distinctplugin = "0.0.1"
862
863=== added file 'setup.py'
864--- setup.py 1970-01-01 00:00:00 +0000
865+++ setup.py 2011-12-15 20:24:01 +0000
866@@ -0,0 +1,64 @@
867+from distutils.core import setup
868+from distutils.command.install import install
869+import os
870+
871+from twisted.plugin import IPlugin, getPlugins
872+
873+from distinctdb import version
874+
875+# If setuptools is present, use its find_packages(); otherwise fall
876+# back to the minimal os.walk()-based version defined below.
877+extra_setup_args = {}
878+try:
879+ import setuptools
880+ from setuptools import find_packages
881+except ImportError:
882+ def find_packages():
883+ """
884+ Compatibility wrapper.
885+
886+ Taken from storm setup.py.
887+ """
888+ packages = []
889+ for directory, subdirectories, files in os.walk("distinctdb"):
890+ if '__init__.py' in files:
891+ packages.append(directory.replace(os.sep, '.'))
892+ return packages
893+
894+long_description = """
895+A plugin for txstatsd to count distinct values using redis and postgres.
896+"""
897+
898+
899+class TxPluginInstaller(install):
900+ def run(self):
901+ install.run(self)
902+ # Make sure we refresh the plugin list when installing, so we know
903+ # we have enough write permissions.
904+ # see http://twistedmatrix.com/documents/current/core/howto/plugin.html
905+ # "when installing or removing software which provides Twisted plugins,
906+ # the site administrator should be sure the cache is regenerated"
907+ list(getPlugins(IPlugin))
908+
909+setup(
910+ cmdclass={'install': TxPluginInstaller},
911+ name="distinctplugin",
912+ version=version.distinctplugin,
913+ description="A txstatsd plugin for distinct counts",
914+ author="txStatsD Developers",
915+ url="https://launchpad.net/txstatsd",
916+ license="MIT",
917+ packages=find_packages() + ["twisted.plugins"],
918+ long_description=long_description,
919+ classifiers=[
920+ "Development Status :: 4 - Beta",
921+ "Intended Audience :: Developers",
922+ "Intended Audience :: System Administrators",
923+ "Intended Audience :: Information Technology",
924+ "Programming Language :: Python",
925+ "Topic :: Database",
926+ "Topic :: Internet :: WWW/HTTP",
927+ "License :: OSI Approved :: MIT License",
928+ ],
929+ **extra_setup_args
930+ )
931
932=== renamed file 'setup.py' => 'setup.py.moved'
933=== added directory 'twisted'
934=== renamed directory 'twisted' => 'twisted.moved'
935=== added directory 'twisted/plugins'
936=== added file 'twisted/plugins/distinctdbplugin.py'
937--- twisted/plugins/distinctdbplugin.py 1970-01-01 00:00:00 +0000
938+++ twisted/plugins/distinctdbplugin.py 2011-12-15 20:24:01 +0000
939@@ -0,0 +1,38 @@
940+from zope.interface import implements
941+
942+from twisted.plugin import IPlugin
943+from txstatsd.itxstatsd import IMetricFactory
944+from distinctdb.distinctmetric import DistinctMetricReporter, ONE_DAY
945+
946+
947+class DistinctMetricFactory(object):
948+ implements(IMetricFactory, IPlugin)
949+
950+ name = "distinct"
951+ metric_type = "d"
952+
953+ bucket_size = None
954+ dsn = None
955+ metric_ids = redis_host = redis_port = None
956+
957+ def build_metric(self, prefix, name, wall_time_func=None):
958+ return DistinctMetricReporter(name, prefix=prefix,
959+ wall_time_func=wall_time_func,
960+ bucket_size=self.bucket_size,
961+ dsn=self.dsn, redis_host=self.redis_host,
962+ redis_port=self.redis_port)
963+
964+ def configure(self, options):
965+ self.section = dict(options.get("plugin_distinctdb", {}))
966+ try:
967+ self.bucket_size = int(self.section.get("bucket_size", ONE_DAY))
968+ except ValueError:
969+ self.bucket_size = ONE_DAY
970+
971+ self.dsn = self.section.get("dsn", None)
972+ self.redis_host = self.section.get("redis_host", None)
973+ self.redis_port = self.section.get("redis_port", None)
974+ if self.redis_port is not None:
975+ self.redis_port = int(self.redis_port)
976+
977+distinct_metric_factory = DistinctMetricFactory()
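
test_find_saved_data expects a bucket built from the updates "one", "one", "two" to be saved as one row per distinct value, (metric_id, bucket_start, value, count), ordered by value. Below is a minimal sketch of that aggregation step in plain Python. It assumes the column order implied by the test's SELECT * (bin/schema.sql is not part of this section). bucket_rows and the metric id 42 are hypothetical, and no database is touched.

```python
from collections import Counter
from typing import List, Tuple

# (metric_id, bucket_start, value, count): column order assumed from the
# rows that test_find_saved_data reads back with "SELECT * FROM points".
Row = Tuple[int, int, str, int]


def bucket_rows(metric_id: int, bucket_start: int,
                bucket: "Counter[str]") -> List[Row]:
    """Hypothetical helper: turn a value->count bucket into point rows."""
    # All tuples share metric_id and bucket_start, so sorting orders them
    # by value, mirroring the test's "ORDER BY value".
    return sorted((metric_id, bucket_start, value, count)
                  for value, count in bucket.items())


# Accumulate updates the way the test does: "one" twice, "two" once.
bucket: "Counter[str]" = Counter()
for value in ["one", "one", "two"]:
    bucket[value] += 1

rows = bucket_rows(42, 0, bucket)
print(rows)  # [(42, 0, 'one', 2), (42, 0, 'two', 1)]
```

With a DB-API driver such as psycopg2, rows of this shape could then be written with cursor.executemany() and a parameterized INSERT. The statement is not shown because the points schema lies outside this section.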

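test_usage pins down the behaviour of the one-hour Redis window. Re-adding a value inside the window does not raise the count. A flush at ONE_HOUR + 10 minutes drops "one" (last seen at 0) but keeps "two" (seen at 30 minutes). The implementation in distinctmetric.py is not shown here. A well-known way to get these semantics is a Redis sorted set scored by last-seen timestamp, and the sketch below assumes that design and emulates it in memory, with the comparable Redis commands noted in comments. SlidingDistinctCounter is a hypothetical name, and ONE_HOUR is assumed to be 3600 seconds.

```python
from typing import Dict

ONE_MINUTE = 60
ONE_HOUR = 60 * ONE_MINUTE  # assumed to match distinct.ONE_HOUR (seconds)


class SlidingDistinctCounter:
    """Hypothetical in-memory stand-in for a Redis sorted set.

    Members are the distinct values; scores are the last time each value
    was seen. Values older than the window are evicted by flush().
    """

    def __init__(self, window: int = ONE_HOUR) -> None:
        self.window = window
        self._last_seen: Dict[str, float] = {}

    def update(self, value: str, timestamp: float) -> None:
        # Comparable to ZADD key timestamp value, except that we keep the
        # newest timestamp so an out-of-order update cannot shorten a
        # value's lifetime in the window.
        previous = self._last_seen.get(value)
        if previous is None or timestamp > previous:
            self._last_seen[value] = timestamp

    def flush(self, now: float) -> None:
        # Comparable to ZREMRANGEBYSCORE key -inf (cutoff: drop every value
        # last seen strictly before the start of the window.
        cutoff = now - self.window
        for value, seen in list(self._last_seen.items()):
            if seen < cutoff:
                del self._last_seen[value]

    def count(self) -> int:
        # Comparable to ZCARD key: distinct values still inside the window.
        return len(self._last_seen)


# Replay the sequence from test_usage.
counter = SlidingDistinctCounter()
history = [counter.count()]
counter.update("one", 0)
counter.flush(1)
history.append(counter.count())
counter.update("one", 0)
counter.flush(1)
history.append(counter.count())
counter.update("two", 30 * ONE_MINUTE)
counter.flush(30 * ONE_MINUTE)
history.append(counter.count())
counter.flush(ONE_HOUR + 10 * ONE_MINUTE)
history.append(counter.count())
print(history)  # [0, 1, 1, 2, 1]
```

Scoring by last-seen time means each value costs one entry however often it is reported, and eviction is a single range delete. The trade-off is memory proportional to the number of distinct values inside the window.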
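DistinctMetricFactory.configure() reads the plugin_distinctdb section. It falls back to ONE_DAY when bucket_size is missing or not an integer, and it converts redis_port to int only when that option is set. The sketch below reproduces that parsing outside txstatsd so the fallbacks can be checked in isolation. It uses Python 3's configparser and read_string() in place of the Python 2 ConfigParser/readfp() and OptionsGlue used by test_configure. DistinctSettings, parse_distinct_section and the 86400-second ONE_DAY are assumptions, not part of the branch.

```python
import configparser
from dataclasses import dataclass
from typing import Mapping, Optional

ONE_DAY = 24 * 60 * 60  # assumed value of distinctmetric.ONE_DAY (seconds)


@dataclass
class DistinctSettings:
    """Hypothetical container for the values configure() stores."""
    bucket_size: int = ONE_DAY
    dsn: Optional[str] = None
    redis_host: Optional[str] = None
    redis_port: Optional[int] = None


def parse_distinct_section(section: Mapping[str, str]) -> DistinctSettings:
    """Mirror the fallbacks in DistinctMetricFactory.configure()."""
    settings = DistinctSettings()
    try:
        settings.bucket_size = int(section.get("bucket_size", ONE_DAY))
    except ValueError:
        # A non-numeric bucket_size silently falls back to one day.
        settings.bucket_size = ONE_DAY
    settings.dsn = section.get("dsn")
    settings.redis_host = section.get("redis_host")
    port = section.get("redis_port")
    # Like configure(), only convert the port when it is present.
    settings.redis_port = int(port) if port is not None else None
    return settings


# Same configuration text that test_configure feeds in.
parser = configparser.ConfigParser()
parser.read_string("[statsd]\n\n[plugin_distinctdb]\n"
                   "redis_host = localhost\nredis_port = 16379\n")
settings = parse_distinct_section(dict(parser["plugin_distinctdb"]))
print(settings)
# DistinctSettings(bucket_size=86400, dsn=None, redis_host='localhost', redis_port=16379)
```

As in the original configure(), a malformed redis_port still raises ValueError, whereas a malformed bucket_size is silently replaced with the default.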