Merge lp:~ahasenack/txstatsd/txstatsd-packaging into lp:txstatsd

Proposed by Andreas Hasenack
Status: Superseded
Proposed branch: lp:~ahasenack/txstatsd/txstatsd-packaging
Merge into: lp:txstatsd
Diff against target: 1035 lines (+792/-22)
20 files modified
.bzrignore (+1/-0)
debian/changelog (+5/-0)
debian/compat (+1/-0)
debian/control (+15/-0)
debian/copyright (+43/-0)
debian/docs (+4/-0)
debian/postinst (+39/-0)
debian/postrm (+37/-0)
debian/preinst (+35/-0)
debian/prerm (+38/-0)
debian/rules (+13/-0)
example-stats-client.tac (+10/-3)
statsd.tac (+1/-2)
txstatsd/metrics.py (+24/-2)
txstatsd/process.py (+171/-0)
txstatsd/protocol.py (+9/-8)
txstatsd/report.py (+46/-0)
txstatsd/service.py (+32/-7)
txstatsd/tests/test_process.py (+266/-0)
txstatsd/tests/test_service.py (+2/-0)
To merge this branch: bzr merge lp:~ahasenack/txstatsd/txstatsd-packaging
Reviewer Review Type Date Requested Status
Landscape Pending
Landscape Pending
Review via email: mp+67638@code.launchpad.net

Description of the change

This branch adds a debian/ structure to the txstatsd source tree. It builds and installs, and python sees the module after installation, as does twistd.

The {post,pre}inst and {post,pre}rm scripts are straight from dh_make, I didn't change those, and seem to do the right thing. After installation, I checked and postinst had a section added by dh_pysupport.

These are the lint warnings I get:
W: python-txstatsd source: out-of-date-standards-version 3.8.3 (current is 3.9.1)
W: python-txstatsd: maintainer-script-empty postrm
W: python-txstatsd: maintainer-script-empty preinst

To post a comment you must log in.
Revision history for this message
Sidnei da Silva (sidnei) wrote :

python-psutil needs to be added as a dependency.

15. By Andreas Hasenack

Merged with trunk (the ~landscape one, not upstream).

16. By Andreas Hasenack

Added dependency for python-psutil.

17. By Andreas Hasenack

Include example-stats-client.tac in the documentation.

18. By Andreas Hasenack

One more example file for the docs.

19. By Andreas Hasenack

- Merged with trunk again.
- Added statsd.tac to the documentation directory.

20. By Andreas Hasenack

Change source package name to just txstatsd.

21. By Andreas Hasenack

- Changed source package name to just txstatsd
- Changed build dependency from python-twisted to python-twisted-core

22. By Andreas Hasenack

Merged with trunk to grab missing report.py.

23. By Andreas Hasenack

Adjusted dependencies.

24. By Andreas Hasenack

Actually, no need for python-dev as this is noarch.

25. By Andreas Hasenack

Fixed version/release numbers.

Unmerged revisions

25. By Andreas Hasenack

Fixed version/release numbers.

24. By Andreas Hasenack

Actually, no need for python-dev as this is noarch.

23. By Andreas Hasenack

Adjusted dependencies.

22. By Andreas Hasenack

Merged with trunk to grab missing report.py.

21. By Andreas Hasenack

- Changed source package name to just txstatsd
- Changed build dependency from python-twisted to python-twisted-core

20. By Andreas Hasenack

Change source package name to just txstatsd.

19. By Andreas Hasenack

- Merged with trunk again.
- Added statsd.tac to the documentation directory.

18. By Andreas Hasenack

One more example file for the docs.

17. By Andreas Hasenack

Include example-stats-client.tac in the documentation.

16. By Andreas Hasenack

Added dependency for python-psutil.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file '.bzrignore'
--- .bzrignore 2011-06-20 19:11:22 +0000
+++ .bzrignore 2011-07-12 14:22:56 +0000
@@ -1,1 +1,2 @@
1_trial_temp*1_trial_temp*
2twisted/plugins/dropin.cache
23
=== added directory 'debian'
=== added file 'debian/changelog'
--- debian/changelog 1970-01-01 00:00:00 +0000
+++ debian/changelog 2011-07-12 14:22:56 +0000
@@ -0,0 +1,5 @@
1txstatsd (0.1.0-0ubuntu1) unstable; urgency=low
2
3 * Initial Release.
4
5 -- Andreas Hasenack <andreas@canonical.com> Mon, 11 Jul 2011 19:29:34 -0300
06
=== added file 'debian/compat'
--- debian/compat 1970-01-01 00:00:00 +0000
+++ debian/compat 2011-07-12 14:22:56 +0000
@@ -0,0 +1,1 @@
17
02
=== added file 'debian/control'
--- debian/control 1970-01-01 00:00:00 +0000
+++ debian/control 2011-07-12 14:22:56 +0000
@@ -0,0 +1,15 @@
1Source: txstatsd
2Section: python
3Priority: extra
4Maintainer: Andreas Hasenack <andreas@canonical.com>
5Build-Depends: debhelper (>= 7), python-twisted-core, python-support
6Standards-Version: 3.8.3
7Homepage: https://launchpad.net/txstatsd
8
9Package: python-txstatsd
10Architecture: all
11Depends: ${python:Depends}, ${misc:Depends}, python-psutil, python-twisted-core
12Description: A network daemon for aggregating statistics
13 A network daemon for aggregating statistics (counters and timers), rolling
14 them up, then sending them to graphite. Really a port of
15 https://github.com/etsy/statsd to twisted.
016
=== added file 'debian/copyright'
--- debian/copyright 1970-01-01 00:00:00 +0000
+++ debian/copyright 2011-07-12 14:22:56 +0000
@@ -0,0 +1,43 @@
1This work was packaged for Debian by:
2
3 Andreas Hasenack <andreas@canonical.com> on Mon, 11 Jul 2011 19:29:34 -0300
4
5It was downloaded from:
6
7 https://launchpad.net/txstatsd
8
9Upstream Author(s):
10
11 Sidnei da Silva <sidnei@canonical.com>
12
13Copyright:
14
15 <Copyright (C) 2011 Canonical Services Ltd>
16
17License:
18
19 Permission is hereby granted, free of charge, to any person obtaining
20 a copy of this software and associated documentation files (the
21 "Software"), to deal in the Software without restriction, including
22 without limitation the rights to use, copy, modify, merge, publish,
23 distribute, sublicense, and/or sell copies of the Software, and to
24 permit persons to whom the Software is furnished to do so, subject to
25 the following conditions:
26
27 The above copyright notice and this permission notice shall be
28 included in all copies or substantial portions of the Software.
29
30 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
31 EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
32 MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
33 IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
34 CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
35 TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
36 SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
37
38The Debian packaging is:
39
40 Copyright (C) 2011 Andreas Hasenack <andreas@canonical.com>
41
42and is licensed under MIT.
43
044
=== added file 'debian/docs'
--- debian/docs 1970-01-01 00:00:00 +0000
+++ debian/docs 2011-07-12 14:22:56 +0000
@@ -0,0 +1,4 @@
1README
2example-stats-client.tac
3txstatsd.conf-example
4statsd.tac
05
=== added file 'debian/postinst'
--- debian/postinst 1970-01-01 00:00:00 +0000
+++ debian/postinst 2011-07-12 14:22:56 +0000
@@ -0,0 +1,39 @@
1#!/bin/sh
2# postinst script for python-txstatsd
3#
4# see: dh_installdeb(1)
5
6set -e
7
8# summary of how this script can be called:
9# * <postinst> `configure' <most-recently-configured-version>
10# * <old-postinst> `abort-upgrade' <new version>
11# * <conflictor's-postinst> `abort-remove' `in-favour' <package>
12# <new-version>
13# * <postinst> `abort-remove'
14# * <deconfigured's-postinst> `abort-deconfigure' `in-favour'
15# <failed-install-package> <version> `removing'
16# <conflicting-package> <version>
17# for details, see http://www.debian.org/doc/debian-policy/ or
18# the debian-policy package
19
20
21case "$1" in
22 configure)
23 ;;
24
25 abort-upgrade|abort-remove|abort-deconfigure)
26 ;;
27
28 *)
29 echo "postinst called with unknown argument \`$1'" >&2
30 exit 1
31 ;;
32esac
33
34# dh_installdeb will replace this with shell code automatically
35# generated by other debhelper scripts.
36
37#DEBHELPER#
38
39exit 0
040
=== added file 'debian/postrm'
--- debian/postrm 1970-01-01 00:00:00 +0000
+++ debian/postrm 2011-07-12 14:22:56 +0000
@@ -0,0 +1,37 @@
1#!/bin/sh
2# postrm script for python-txstatsd
3#
4# see: dh_installdeb(1)
5
6set -e
7
8# summary of how this script can be called:
9# * <postrm> `remove'
10# * <postrm> `purge'
11# * <old-postrm> `upgrade' <new-version>
12# * <new-postrm> `failed-upgrade' <old-version>
13# * <new-postrm> `abort-install'
14# * <new-postrm> `abort-install' <old-version>
15# * <new-postrm> `abort-upgrade' <old-version>
16# * <disappearer's-postrm> `disappear' <overwriter>
17# <overwriter-version>
18# for details, see http://www.debian.org/doc/debian-policy/ or
19# the debian-policy package
20
21
22case "$1" in
23 purge|remove|upgrade|failed-upgrade|abort-install|abort-upgrade|disappear)
24 ;;
25
26 *)
27 echo "postrm called with unknown argument \`$1'" >&2
28 exit 1
29 ;;
30esac
31
32# dh_installdeb will replace this with shell code automatically
33# generated by other debhelper scripts.
34
35#DEBHELPER#
36
37exit 0
038
=== added file 'debian/preinst'
--- debian/preinst 1970-01-01 00:00:00 +0000
+++ debian/preinst 2011-07-12 14:22:56 +0000
@@ -0,0 +1,35 @@
1#!/bin/sh
2# preinst script for python-txstatsd
3#
4# see: dh_installdeb(1)
5
6set -e
7
8# summary of how this script can be called:
9# * <new-preinst> `install'
10# * <new-preinst> `install' <old-version>
11# * <new-preinst> `upgrade' <old-version>
12# * <old-preinst> `abort-upgrade' <new-version>
13# for details, see http://www.debian.org/doc/debian-policy/ or
14# the debian-policy package
15
16
17case "$1" in
18 install|upgrade)
19 ;;
20
21 abort-upgrade)
22 ;;
23
24 *)
25 echo "preinst called with unknown argument \`$1'" >&2
26 exit 1
27 ;;
28esac
29
30# dh_installdeb will replace this with shell code automatically
31# generated by other debhelper scripts.
32
33#DEBHELPER#
34
35exit 0
036
=== added file 'debian/prerm'
--- debian/prerm 1970-01-01 00:00:00 +0000
+++ debian/prerm 2011-07-12 14:22:56 +0000
@@ -0,0 +1,38 @@
1#!/bin/sh
2# prerm script for python-txstatsd
3#
4# see: dh_installdeb(1)
5
6set -e
7
8# summary of how this script can be called:
9# * <prerm> `remove'
10# * <old-prerm> `upgrade' <new-version>
11# * <new-prerm> `failed-upgrade' <old-version>
12# * <conflictor's-prerm> `remove' `in-favour' <package> <new-version>
13# * <deconfigured's-prerm> `deconfigure' `in-favour'
14# <package-being-installed> <version> `removing'
15# <conflicting-package> <version>
16# for details, see http://www.debian.org/doc/debian-policy/ or
17# the debian-policy package
18
19
20case "$1" in
21 remove|upgrade|deconfigure)
22 ;;
23
24 failed-upgrade)
25 ;;
26
27 *)
28 echo "prerm called with unknown argument \`$1'" >&2
29 exit 1
30 ;;
31esac
32
33# dh_installdeb will replace this with shell code automatically
34# generated by other debhelper scripts.
35
36#DEBHELPER#
37
38exit 0
039
=== added file 'debian/rules'
--- debian/rules 1970-01-01 00:00:00 +0000
+++ debian/rules 2011-07-12 14:22:56 +0000
@@ -0,0 +1,13 @@
1#!/usr/bin/make -f
2# -*- makefile -*-
3# Sample debian/rules that uses debhelper.
4# This file was originally written by Joey Hess and Craig Small.
5# As a special exception, when this file is copied by dh-make into a
6# dh-make output file, you may use that output file without restriction.
7# This special exception was added by Craig Small in version 0.37 of dh-make.
8
9# Uncomment this to turn on verbose mode.
10#export DH_VERBOSE=1
11
12%:
13 dh $@
014
=== modified file 'example-stats-client.tac'
--- example-stats-client.tac 2011-06-23 14:19:21 +0000
+++ example-stats-client.tac 2011-07-12 14:22:56 +0000
@@ -8,11 +8,18 @@
88
9from txstatsd.protocol import StatsDClientProtocol9from txstatsd.protocol import StatsDClientProtocol
10from txstatsd.metrics import TransportMeter10from txstatsd.metrics import TransportMeter
11from txstatsd.process import PROCESS_STATS
12from txstatsd.report import ReportingService
1113
1214
13application = Application("example-stats-client")15application = Application("example-stats-client")
14meter = TransportMeter(prefix=socket.gethostname())16meter = TransportMeter(prefix=socket.gethostname() + ".example-client")
1517
18reporting = ReportingService()
19reporting.setServiceParent(application)
20
21for report in PROCESS_STATS:
22 reporting.schedule(report, 10, meter.increment)
1623
17def random_walker(name):24def random_walker(name):
18 """Meters a random walk."""25 """Meters a random walk."""
@@ -35,5 +42,5 @@
35 t.start(0.5, now=False)42 t.start(0.5, now=False)
3643
3744
38protocol = StatsDClientProtocol("127.0.0.1", 8125, meter)45protocol = StatsDClientProtocol("127.0.0.1", 8125, meter, 6000)
39reactor.listenUDP(0, protocol)46reactor.listenUDP(0, protocol)
4047
=== modified file 'statsd.tac'
--- statsd.tac 2011-07-04 21:43:01 +0000
+++ statsd.tac 2011-07-12 14:22:56 +0000
@@ -4,6 +4,5 @@
44
5application = Application("statsd")5application = Application("statsd")
66
7statsd_service = service.createService(service.StatsdOptions())7statsd_service = service.createService(service.StatsDOptions())
8statsd_service.setServiceParent(application)8statsd_service.setServiceParent(application)
9
109
=== modified file 'txstatsd/metrics.py'
--- txstatsd/metrics.py 2011-07-05 05:27:22 +0000
+++ txstatsd/metrics.py 2011-07-12 14:22:56 +0000
@@ -55,6 +55,18 @@
55 raise NotImplementedError()55 raise NotImplementedError()
5656
5757
58class InProcessMeter(BaseMeter):
59 """A meter that can be used inside the C{StatsD} daemon itself."""
60
61 def __init__(self, processor, prefix="", sample_rate=1):
62 self.processor = processor
63 BaseMeter.__init__(self, prefix=prefix, sample_rate=sample_rate)
64
65 def write(self, data):
66 """Pass the data along directly to the C{Processor}."""
67 self.processor.process(data)
68
69
58class Meter(BaseMeter):70class Meter(BaseMeter):
59 """A trivial, non-Twisted-dependent meter."""71 """A trivial, non-Twisted-dependent meter."""
6072
@@ -118,12 +130,22 @@
118 host = None130 host = None
119 port = None131 port = None
120132
121 def connected(self, transport, host, port):133 def __init__(self, prefix="", sample_rate=1,
134 connect_callback=None, disconnect_callback=None):
135 self.connect_callback = connect_callback
136 self.disconnect_callback = disconnect_callback
137 BaseMeter.__init__(self, prefix=prefix, sample_rate=sample_rate)
138
139 def connect(self, transport, host, port):
122 self.transport = transport140 self.transport = transport
123 self.host = host141 self.host = host
124 self.port = port142 self.port = port
143 if self.connect_callback is not None:
144 self.connect_callback()
125145
126 def disconnected(self):146 def disconnect(self):
147 if self.disconnect_callback is not None:
148 self.disconnect_callback()
127 self.transport = self.host = self.port = None149 self.transport = self.host = self.port = None
128150
129 def write(self, data):151 def write(self, data):
130152
=== added file 'txstatsd/process.py'
--- txstatsd/process.py 1970-01-01 00:00:00 +0000
+++ txstatsd/process.py 2011-07-12 14:22:56 +0000
@@ -0,0 +1,171 @@
1import os
2import socket
3import psutil
4
5from functools import update_wrapper
6
7from twisted.internet import defer, fdesc, error
8
9
10MEMINFO_KEYS = ("MemTotal:", "MemFree:", "Buffers:",
11 "Cached:", "SwapCached:", "SwapTotal:",
12 "SwapFree:")
13
14MULTIPLIERS = {"kB": 1024, "mB": 1024 * 1024}
15
16
17def load_file(filename):
18 """Load a file into memory with non blocking reads."""
19
20 fd = os.open(filename, os.O_RDONLY)
21 fdesc.setNonBlocking(fd)
22
23 chunks = []
24 d = defer.Deferred()
25
26 def read_loop(data=None):
27 """Inner loop."""
28 if data is not None:
29 chunks.append(data)
30 r = fdesc.readFromFD(fd, read_loop)
31 if isinstance(r, error.ConnectionDone):
32 os.close(fd)
33 d.callback("".join(chunks))
34 elif r is not None:
35 os.close(fd)
36 d.errback(r)
37
38 read_loop("")
39 return d
40
41
42def parse_meminfo(data, prefix="sys.mem"):
43 """Parse data from /proc/meminfo."""
44 result = {}
45
46 for line in data.split("\n"):
47 if not line:
48 continue
49 parts = [x for x in line.split(" ") if x]
50 if not parts[0] in MEMINFO_KEYS:
51 continue
52
53 multiple = 1
54
55 if len(parts) == 3:
56 multiple = MULTIPLIERS[parts[2]]
57
58 # remove ':'
59 label = parts[0][:-1]
60 amount = int(parts[1]) * multiple
61 result[prefix + "." + label] = amount
62
63 return result
64
65
66def parse_loadavg(data, prefix="sys.loadavg"):
67 """Parse data from /proc/loadavg."""
68 return dict(zip(
69 (prefix + ".oneminute",
70 prefix + ".fiveminutes",
71 prefix + ".fifthteenminutes"),
72 [float(x) for x in data.split()[:3]]))
73
74
75def report_process_memory_and_cpu(process=psutil.Process(os.getpid()),
76 prefix="proc"):
77 """Report memory and CPU stats for C{process}."""
78 vsize, rss = process.get_memory_info()
79 utime, stime = process.get_cpu_times()
80 result = {prefix + ".cpu.percent": process.get_cpu_percent(),
81 prefix + ".cpu.user": utime,
82 prefix + ".cpu.system": stime,
83 prefix + ".memory.percent": process.get_memory_percent(),
84 prefix + ".memory.vsize": vsize,
85 prefix + ".memory.rss": rss}
86 if getattr(process, "get_num_threads", None) is not None:
87 result[prefix + ".threads"] = process.get_num_threads()
88 return result
89
90
91def report_process_io_counters(process=psutil.Process(os.getpid()),
92 prefix="proc.io"):
93 """Report IO statistics for C{process}."""
94 result = {}
95 if getattr(process, "get_io_counters", None) is not None:
96 (read_count, write_count,
97 read_bytes, write_bytes) = process.get_io_counters()
98 result.update({
99 prefix + ".read.count": read_count,
100 prefix + ".write.count": write_count,
101 prefix + ".read.bytes": read_bytes,
102 prefix + ".write.bytes": write_bytes})
103 return result
104
105
106def report_process_net_stats(process=psutil.Process(os.getpid()),
107 prefix="proc.net"):
108 """Report active connection statistics for C{process}."""
109 result = {}
110 if getattr(process, "get_connections", None) is not None:
111 for connection in process.get_connections():
112 fd, family, _type, laddr, raddr, status = connection
113 if _type == socket.SOCK_STREAM:
114 key = prefix + ".status.%s" % status.lower()
115 if not key in result:
116 result[key] = 1
117 else:
118 result[key] += 1
119 return result
120
121
122def report_system_stats(prefix="sys"):
123 cpu_times = psutil.cpu_times()
124 return {prefix + ".cpu.idle": cpu_times.idle,
125 prefix + ".cpu.iowait": cpu_times.iowait,
126 prefix + ".cpu.irq": cpu_times.irq,
127 prefix + ".cpu.nice": cpu_times.nice,
128 prefix + ".cpu.system": cpu_times.system,
129 prefix + ".cpu.user": cpu_times.user}
130
131
132def report_threadpool_stats(threadpool, prefix="threadpool"):
133 """Report stats about a given threadpool."""
134 def report():
135 return {prefix + ".working": len(threadpool.working),
136 prefix + ".queue": threadpool.q.qsize(),
137 prefix + ".waiters": len(threadpool.waiters),
138 prefix + ".threads": len(threadpool.threads)}
139 update_wrapper(report, report_threadpool_stats)
140 return report
141
142
143def report_reactor_stats(reactor, prefix="reactor"):
144 """Report statistics about a twisted reactor."""
145 def report():
146 return {prefix + ".readers": len(reactor.getReaders()),
147 prefix + ".writers": len(reactor.getWriters())}
148
149 update_wrapper(report, report_reactor_stats)
150 return report
151
152
153def report_file_stats(filename, parser):
154 """Read statistics from a file and report them."""
155 def report():
156 deferred = load_file(filename)
157 deferred.addCallback(parser)
158 return deferred
159 update_wrapper(report, report_file_stats)
160 return report
161
162
163PROCESS_STATS = (report_process_memory_and_cpu,)
164
165IO_STATS = (report_process_io_counters,)
166
167NET_STATS = (report_process_net_stats,)
168
169SYSTEM_STATS = (report_file_stats("/proc/meminfo", parse_meminfo),
170 report_file_stats("/proc/loadavg", parse_loadavg),
171 report_system_stats)
0172
=== modified file 'txstatsd/protocol.py'
--- txstatsd/protocol.py 2011-06-22 00:18:58 +0000
+++ txstatsd/protocol.py 2011-07-12 14:22:56 +0000
@@ -1,7 +1,7 @@
1import logging1import logging
22
3from twisted.python import log3from twisted.python import log
4from twisted.internet import task4from twisted.internet import task, defer
5from twisted.protocols.basic import LineOnlyReceiver5from twisted.protocols.basic import LineOnlyReceiver
6from twisted.internet.protocol import (6from twisted.internet.protocol import (
7 DatagramProtocol, ReconnectingClientFactory)7 DatagramProtocol, ReconnectingClientFactory)
@@ -27,21 +27,22 @@
27class StatsDClientProtocol(DatagramProtocol):27class StatsDClientProtocol(DatagramProtocol):
28 """A Twisted-based implementation of the StatsD client protocol.28 """A Twisted-based implementation of the StatsD client protocol.
2929
30 Data is sent via ConnectedUDP to a StatsD server for aggregation.30 Data is sent via UDP to a StatsD server for aggregation.
31 """31 """
3232
33 def __init__(self, host, port, meter):33 def __init__(self, host, port, meter, interval=None):
34 self.host = host34 self.host = host
35 self.port = port35 self.port = port
36 self.meter = meter36 self.meter = meter
37 self.interval = interval
3738
38 def startProtocol(self):39 def startProtocol(self):
39 """Connect to destination host."""40 """Connect to destination host."""
40 self.meter.connected(self.transport, self.host, self.port)41 self.meter.connect(self.transport, self.host, self.port)
4142
42 def stopProtocol(self):43 def stopProtocol(self):
43 """Connection was lost."""44 """Connection was lost."""
44 self.meter.disconnected()45 self.meter.disconnect()
4546
4647
47class GraphiteProtocol(LineOnlyReceiver):48class GraphiteProtocol(LineOnlyReceiver):
@@ -65,7 +66,7 @@
65 log.msg("Connected. Scheduling flush to now + %ds." %66 log.msg("Connected. Scheduling flush to now + %ds." %
66 (self.interval / 1000), logLevel=logging.DEBUG)67 (self.interval / 1000), logLevel=logging.DEBUG)
67 self.flush_task = task.LoopingCall(self.flushProcessor)68 self.flush_task = task.LoopingCall(self.flushProcessor)
68 self.flush_task.start(self.interval / 1000)69 self.flush_task.start(self.interval / 1000, False)
6970
70 def connectionLost(self, reason):71 def connectionLost(self, reason):
71 """72 """
@@ -77,12 +78,12 @@
77 log.msg("Canceling scheduled flush.", logLevel=logging.DEBUG)78 log.msg("Canceling scheduled flush.", logLevel=logging.DEBUG)
78 self.flush_task.stop()79 self.flush_task.stop()
7980
81 @defer.inlineCallbacks
80 def flushProcessor(self):82 def flushProcessor(self):
81 """Flush messages queued in the processor to Graphite."""83 """Flush messages queued in the processor to Graphite."""
82 log.msg("Flushing messages.", logLevel=logging.DEBUG)
83 for message in self.processor.flush(interval=self.interval):84 for message in self.processor.flush(interval=self.interval):
84 for line in message.splitlines():85 for line in message.splitlines():
85 self.sendLine(line)86 yield self.sendLine(line)
8687
8788
88class GraphiteClientFactory(ReconnectingClientFactory):89class GraphiteClientFactory(ReconnectingClientFactory):
8990
=== added file 'txstatsd/report.py'
--- txstatsd/report.py 1970-01-01 00:00:00 +0000
+++ txstatsd/report.py 2011-07-12 14:22:56 +0000
@@ -0,0 +1,46 @@
1from twisted.internet.defer import maybeDeferred
2from twisted.internet.task import LoopingCall
3from twisted.python import log
4
5from functools import wraps
6
7from twisted.application.service import Service
8
9
10class ReportingService(Service):
11
12 def __init__(self):
13 self.tasks = []
14
15 def schedule(self, function, interval, report_function):
16 """
17 Schedule C{function} to be called every C{interval} seconds and then
18 report gathered metrics to C{Graphite} using C{report_function}.
19 """
20 task = LoopingCall(self.wrapped(function, report_function))
21 self.tasks.append((task, interval))
22
23 def wrapped(self, function, report_function):
24 def report_metrics(metrics):
25 """For each metric returned, call C{report_function} with it."""
26 for name, value in metrics.items():
27 report_function(name, value)
28 return metrics
29
30 @wraps(function)
31 def wrapper():
32 """Wrap C{function} to report metrics or log a failure."""
33 deferred = maybeDeferred(function)
34 deferred.addCallback(report_metrics)
35 deferred.addErrback(lambda failure: log.err(
36 failure, "Error while processing %s" % function.func_name))
37 return deferred
38 return wrapper
39
40 def startService(self):
41 for task, interval in self.tasks:
42 task.start(interval, now=False)
43
44 def stopService(self):
45 for task, interval in self.tasks:
46 task.stop()
047
=== modified file 'txstatsd/service.py'
--- txstatsd/service.py 2011-07-05 05:27:22 +0000
+++ txstatsd/service.py 2011-07-12 14:22:56 +0000
@@ -1,11 +1,16 @@
1import socket
1import ConfigParser2import ConfigParser
23
3from twisted.application.internet import TCPClient, UDPServer4from twisted.application.internet import TCPClient, UDPServer
4from twisted.application.service import MultiService5from twisted.application.service import MultiService
6from twisted.internet import reactor
5from twisted.python import usage, util7from twisted.python import usage, util
68
9from txstatsd import process
10from txstatsd.metrics import InProcessMeter
7from txstatsd.processor import MessageProcessor11from txstatsd.processor import MessageProcessor
8from txstatsd.protocol import GraphiteClientFactory, StatsDServerProtocol12from txstatsd.protocol import GraphiteClientFactory, StatsDServerProtocol
13from txstatsd.report import ReportingService
914
10_unset = object()15_unset = object()
1116
@@ -73,13 +78,17 @@
73 """78 """
74 glue_parameters = [79 glue_parameters = [
75 ["carbon-cache-host", "h", "localhost",80 ["carbon-cache-host", "h", "localhost",
76 "The host where carbon cache is listening."],81 "The host where carbon cache is listening."],
77 ["carbon-cache-port", "p", 2003,82 ["carbon-cache-port", "p", 2003,
78 "The port where carbon cache is listening.", int],83 "The port where carbon cache is listening.", int],
79 ["listen-port", "l", 8125,84 ["listen-port", "l", 8125,
80 "The UDP port where we will listen.", int],85 "The UDP port where we will listen.", int],
81 ["flush-interval", "i", 10000,86 ["flush-interval", "i", 10000,
82 "The number of milliseconds between each flush.", int],87 "The number of milliseconds between each flush.", int],
88 ["prefix", "p", None,
89 "Prefix to use when reporting stats.", str],
90 ["report", "r", None,
91 "Which additional stats to report {process|net|io|system}.", str],
83 ]92 ]
8493
8594
@@ -89,11 +98,27 @@
89 service = MultiService()98 service = MultiService()
90 service.setName("statsd")99 service.setName("statsd")
91 processor = MessageProcessor()100 processor = MessageProcessor()
101 prefix = options["prefix"]
102 if prefix is None:
103 prefix = socket.gethostname() + ".statsd"
104
105 meter = InProcessMeter(processor, prefix=prefix)
106
107 if options["report"] is not None:
108 reporting = ReportingService()
109 reporting.setServiceParent(service)
110 reporting.schedule(
111 process.report_reactor_stats(reactor), 10, meter.increment)
112 reports = [name.strip() for name in options["report"].split(",")]
113 for report_name in reports:
114 for reporter in getattr(process, "%s_STATS" %
115 report_name.upper(), ()):
116 reporting.schedule(reporter, 10, meter.increment)
92117
93 factory = GraphiteClientFactory(processor, options["flush-interval"])118 factory = GraphiteClientFactory(processor, options["flush-interval"])
94 client = TCPClient(119 client = TCPClient(options["carbon-cache-host"],
95 options["carbon-cache-host"], options["carbon-cache-port"],120 options["carbon-cache-port"],
96 factory)121 factory)
97 client.setServiceParent(service)122 client.setServiceParent(service)
98123
99 listener = UDPServer(options["listen-port"],124 listener = UDPServer(options["listen-port"],
100125
=== added file 'txstatsd/tests/test_process.py'
--- txstatsd/tests/test_process.py 1970-01-01 00:00:00 +0000
+++ txstatsd/tests/test_process.py 2011-07-12 14:22:56 +0000
@@ -0,0 +1,266 @@
1import os
2import psutil
3
4from mocker import MockerTestCase
5from twisted.internet import defer
6from twisted.trial.unittest import TestCase
7
8from txstatsd.process import (
9 load_file, parse_meminfo, parse_loadavg,
10 report_process_memory_and_cpu, report_process_io_counters,
11 report_process_net_stats, report_system_stats,
12 report_reactor_stats, report_threadpool_stats)
13
14
15meminfo = """\
16MemTotal: 8190436 kB
17MemFree: 995724 kB
18Buffers: 8052 kB
19Cached: 344824 kB
20SwapCached: 170828 kB
21Active: 4342436 kB
22Inactive: 907076 kB
23Active(anon): 4210168 kB
24Inactive(anon): 756096 kB
25Active(file): 132268 kB
26Inactive(file): 150980 kB
27Unevictable: 641692 kB
28Mlocked: 641676 kB
29SwapTotal: 23993336 kB
30SwapFree: 22750588 kB
31Dirty: 740 kB
32Writeback: 0 kB
33AnonPages: 5453396 kB
34Mapped: 259524 kB
35Shmem: 69952 kB
36Slab: 142444 kB
37SReclaimable: 83188 kB
38SUnreclaim: 59256 kB
39KernelStack: 16144 kB
40PageTables: 88384 kB
41NFS_Unstable: 0 kB
42Bounce: 0 kB
43WritebackTmp: 0 kB
44CommitLimit: 28088552 kB
45Committed_AS: 11178564 kB
46VmallocTotal: 34359738367 kB
47VmallocUsed: 156208 kB
48VmallocChunk: 34359579400 kB
49HardwareCorrupted: 0 kB
50HugePages_Total: 0
51HugePages_Free: 0
52HugePages_Rsvd: 0
53HugePages_Surp: 0
54Hugepagesize: 2048 kB
55DirectMap4k: 7968640 kB
56DirectMap2M: 415744 kB"""
57
58
59class TestSystemPerformance(TestCase, MockerTestCase):
60 """Test system performance monitoring."""
61
62 def assertSuccess(self, deferred, result=None):
63 """
64 Assert that the given C{deferred} results in the given C{result}.
65 """
66 self.assertTrue(isinstance(deferred, defer.Deferred))
67 return deferred.addCallback(self.assertEqual, result)
68
69 def test_read(self):
70 """We can read files non blocking."""
71 d = load_file(__file__)
72 return self.assertSuccess(d, open(__file__).read())
73
74 def test_loadinfo(self):
75 """We understand loadinfo."""
76 loadinfo = "1.02 1.08 1.14 2/2015 19420"
77 self.assertEqual(parse_loadavg(loadinfo), {
78 "sys.loadavg.oneminute": 1.02,
79 "sys.loadavg.fiveminutes": 1.08,
80 "sys.loadavg.fifthteenminutes": 1.14})
81
82 def test_meminfo(self):
83 """We understand meminfo."""
84 r = parse_meminfo(meminfo)
85 self.assertEqual(r['sys.mem.Buffers'], 8052 * 1024)
86 self.assert_('sys.mem.HugePages_Rsvd' not in r)
87
88 def test_statinfo(self):
89 """System stat info is collected through psutil."""
90 cpu_times = psutil.cpu_times()
91 mock = self.mocker.replace("psutil.cpu_times")
92 self.expect(mock()).result(cpu_times)
93 self.mocker.replay()
94
95 result = report_system_stats()
96 self.assertEqual(cpu_times.idle, result["sys.cpu.idle"])
97 self.assertEqual(cpu_times.iowait, result["sys.cpu.iowait"])
98 self.assertEqual(cpu_times.irq, result["sys.cpu.irq"])
99 self.assertEqual(cpu_times.nice, result["sys.cpu.nice"])
100 self.assertEqual(cpu_times.system, result["sys.cpu.system"])
101 self.assertEqual(cpu_times.user, result["sys.cpu.user"])
102
103 def test_self_statinfo(self):
104 """
105 Process stat info is collected through psutil.
106
107 If the L{Process} implementation does not have C{get_num_threads} then
108 the number of threads will not be included in the output.
109 """
110 process = psutil.Process(os.getpid())
111 vsize, rss = process.get_memory_info()
112 utime, stime = process.get_cpu_times()
113 cpu_percent = process.get_cpu_percent()
114 memory_percent = process.get_memory_percent()
115
116 mock = self.mocker.mock()
117 self.expect(mock.get_memory_info()).result((vsize, rss))
118 self.expect(mock.get_cpu_times()).result((utime, stime))
119 self.expect(mock.get_cpu_percent()).result(cpu_percent)
120 self.expect(mock.get_memory_percent()).result(memory_percent)
121 self.expect(mock.get_num_threads).result(None)
122 self.mocker.replay()
123
124 result = report_process_memory_and_cpu(process=mock)
125 self.assertEqual(utime, result["proc.cpu.user"])
126 self.assertEqual(stime, result["proc.cpu.system"])
127 self.assertEqual(cpu_percent, result["proc.cpu.percent"])
128 self.assertEqual(vsize, result["proc.memory.vsize"])
129 self.assertEqual(rss, result["proc.memory.rss"])
130 self.assertEqual(memory_percent, result["proc.memory.percent"])
131 self.failIf("proc.threads" in result)
132
133 def test_self_statinfo_with_num_threads(self):
134 """
135 Process stat info is collected through psutil.
136
137 If the L{Process} implementation contains C{get_num_threads} then the
138 number of threads will be included in the output.
139
140 """
141 process = psutil.Process(os.getpid())
142 vsize, rss = process.get_memory_info()
143 utime, stime = process.get_cpu_times()
144 cpu_percent = process.get_cpu_percent()
145 memory_percent = process.get_memory_percent()
146
147 mock = self.mocker.mock()
148 self.expect(mock.get_memory_info()).result((vsize, rss))
149 self.expect(mock.get_cpu_times()).result((utime, stime))
150 self.expect(mock.get_cpu_percent()).result(cpu_percent)
151 self.expect(mock.get_memory_percent()).result(memory_percent)
152 self.expect(mock.get_num_threads()).result(1)
153 self.mocker.replay()
154
155 result = report_process_memory_and_cpu(process=mock)
156 self.assertEqual(utime, result["proc.cpu.user"])
157 self.assertEqual(stime, result["proc.cpu.system"])
158 self.assertEqual(cpu_percent, result["proc.cpu.percent"])
159 self.assertEqual(vsize, result["proc.memory.vsize"])
160 self.assertEqual(rss, result["proc.memory.rss"])
161 self.assertEqual(memory_percent, result["proc.memory.percent"])
162 self.assertEqual(1, result["proc.threads"])
163
164 def test_ioinfo(self):
165 """Process IO info is collected through psutil."""
166 mock = self.mocker.mock()
167 self.expect(mock.get_io_counters).result(None)
168 self.mocker.replay()
169
170 # If the version of psutil doesn't have the C{get_io_counters},
171 # then io stats are not included in the output.
172 result = report_process_io_counters(process=mock)
173 self.failIf("proc.io.read.count" in result)
174 self.failIf("proc.io.write.count" in result)
175 self.failIf("proc.io.read.bytes" in result)
176 self.failIf("proc.io.write.bytes" in result)
177
178 def test_ioinfo_with_get_io_counters(self):
179 """
180 Process IO info is collected through psutil.
181
182 If C{get_io_counters} is implemented by the L{Process} object,
183 then io information will be returned with the process information.
184 """
185 io_counters = (10, 42, 125, 16)
186
187 mock = self.mocker.mock()
188 self.expect(mock.get_io_counters).result(mock)
189 self.expect(mock.get_io_counters()).result(io_counters)
190 self.mocker.replay()
191
192 result = report_process_io_counters(process=mock)
193 self.assertEqual(10, result["proc.io.read.count"])
194 self.assertEqual(42, result["proc.io.write.count"])
195 self.assertEqual(125, result["proc.io.read.bytes"])
196 self.assertEqual(16, result["proc.io.write.bytes"])
197
198 def test_netinfo_no_get_connections(self):
199 """
200 Process connection info is collected through psutil.
201
202 If the version of psutil doesn't implement C{get_connections} for
203 L{Process}, then no information is returned.
204 """
205 mock = self.mocker.mock()
206 self.expect(mock.get_connections).result(None)
207 self.mocker.replay()
208
209 # If the version of psutil doesn't have the C{get_io_counters},
210 # then io stats are not included in the output.
211 result = report_process_net_stats(process=mock)
212 self.failIf("proc.net.status.established" in result)
213
214 def test_netinfo_with_get_connections(self):
215 """
216 Process connection info is collected through psutil.
217
218 If the version of psutil implements C{get_connections} for L{Process},
219 then a count of connections in each state is returned.
220 """
221 connections = [
222 (115, 2, 1, ("10.0.0.1", 48776),
223 ("93.186.135.91", 80), "ESTABLISHED"),
224 (117, 2, 1, ("10.0.0.1", 43761),
225 ("72.14.234.100", 80), "CLOSING"),
226 (119, 2, 1, ("10.0.0.1", 60759),
227 ("72.14.234.104", 80), "ESTABLISHED"),
228 (123, 2, 1, ("10.0.0.1", 51314),
229 ("72.14.234.83", 443), "SYN_SENT")
230 ]
231
232 mock = self.mocker.mock()
233 self.expect(mock.get_connections).result(mock)
234 self.expect(mock.get_connections()).result(connections)
235 self.mocker.replay()
236
237 result = report_process_net_stats(process=mock)
238 self.assertEqual(2, result["proc.net.status.established"])
239 self.assertEqual(1, result["proc.net.status.closing"])
240 self.assertEqual(1, result["proc.net.status.syn_sent"])
241
242 def test_reactor_stats(self):
243 """Given a twisted reactor, pull out some stats from it."""
244 mock = self.mocker.mock()
245 self.expect(mock.getReaders()).result([None, None, None])
246 self.expect(mock.getWriters()).result([None, None])
247 self.mocker.replay()
248
249 result = report_reactor_stats(mock)()
250 self.assertEqual(3, result["reactor.readers"])
251 self.assertEqual(2, result["reactor.writers"])
252
253 def test_threadpool_stats(self):
254 """Given a twisted threadpool, pull out some stats from it."""
255 mock = self.mocker.mock()
256 self.expect(mock.q.qsize()).result(42)
257 self.expect(mock.threads).result(6 * [None])
258 self.expect(mock.waiters).result(2 * [None])
259 self.expect(mock.working).result(4 * [None])
260 self.mocker.replay()
261
262 result = report_threadpool_stats(mock)()
263 self.assertEqual(42, result["threadpool.queue"])
264 self.assertEqual(6, result["threadpool.threads"])
265 self.assertEqual(2, result["threadpool.waiters"])
266 self.assertEqual(4, result["threadpool.working"])
0267
=== modified file 'txstatsd/tests/test_service.py'
--- txstatsd/tests/test_service.py 2011-07-05 05:27:22 +0000
+++ txstatsd/tests/test_service.py 2011-07-12 14:22:56 +0000
@@ -6,6 +6,7 @@
66
77
8class GlueOptionsTestCase(TestCase):8class GlueOptionsTestCase(TestCase):
9
9 def test_defaults(self):10 def test_defaults(self):
10 """11 """
11 Defaults get passed over to the instance.12 Defaults get passed over to the instance.
@@ -94,6 +95,7 @@
9495
9596
96class ServiceTests(TestCase):97class ServiceTests(TestCase):
98
97 def test_service(self):99 def test_service(self):
98 """100 """
99 The StatsD service can be created.101 The StatsD service can be created.

Subscribers

People subscribed via source and target branches