Merge lp:~sidnei/txstatsd/platform-and-process-stats into lp:txstatsd

Proposed by Sidnei da Silva
Status: Merged
Merged at revision: 12
Proposed branch: lp:~sidnei/txstatsd/platform-and-process-stats
Merge into: lp:txstatsd
Diff against target: 754 lines (+562/-22)
10 files modified
.bzrignore (+1/-0)
example-stats-client.tac (+10/-3)
statsd.tac (+1/-2)
txstatsd/metrics.py (+24/-2)
txstatsd/process.py (+171/-0)
txstatsd/protocol.py (+9/-8)
txstatsd/report.py (+46/-0)
txstatsd/service.py (+32/-7)
txstatsd/tests/test_process.py (+266/-0)
txstatsd/tests/test_service.py (+2/-0)
To merge this branch: bzr merge lp:~sidnei/txstatsd/platform-and-process-stats
Reviewer Review Type Date Requested Status
Lucio Torre (community) Approve
Review via email: mp+67195@code.launchpad.net

Description of the change

Add process and platform stats reading from U1. Allow for easily reporting a set of stats from the statsd server and from each statsd client.

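The core pattern this branch introduces is a set of report functions that each return a flat dict mapping metric names to values; a scheduler calls them periodically and forwards each pair to a meter. A minimal sketch of that contract (the real psutil-based report functions live in txstatsd/process.py in the diff below; the names here are illustrative only):

```python
import os

def report_fake_stats(prefix="proc"):
    # A report function returns a flat dict of metric name -> value,
    # mirroring the contract of the functions in txstatsd/process.py.
    return {prefix + ".pid": os.getpid(),
            prefix + ".threads": 1}

collected = []

def report_function(name, value):
    """Stand-in for e.g. meter.increment(name, value)."""
    collected.append((name, value))

# The scheduler's job is just: call the report function, then
# forward every (name, value) pair to the report_function.
for name, value in sorted(report_fake_stats().items()):
    report_function(name, value)

assert ("proc.threads", 1) in collected
```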
15. By Sidnei da Silva

- Get memory/cpu percent through psutil.

16. By Sidnei da Silva

- Report io counters if psutil implements them.

17. By Sidnei da Silva

- Wrong name

18. By Sidnei da Silva

- Add network reporting

19. By Sidnei da Silva

- Filter out non-tcp

20. By Sidnei da Silva

- Rename some keys

21. By Sidnei da Silva

- Add simple reporter classes for twisted threadpool and reactor.

22. By Sidnei da Silva

- Report reactor by default

23. By Sidnei da Silva

- Change meminfo and loadavg prefixes.

Revision history for this message
Lucio Torre (lucio.torre) wrote :

1. The metric prefixes have to end with ".", which seems error-prone. The code should add that.
2. To report the metrics you are using counters. I know we need gauges and we don't have them, but incrementing because someone else divides later seems like we are leaking details.
3. Changing the interface of StatsDClientProtocol to take a function that is executed as an argument to report_stats, which reads a file and then calls the target function, seems like too much coupling for this specific use case.
4. socket.gethostname() will not work as a metrics prefix on EC2, where the hostname is the internal IP and might change after rollouts.

I don't have solutions to some of these issues. It's your call.

24. By Sidnei da Silva

- Do not require a trailing dot for prefix.
- Separate process stats reporting into a slightly more generic ReportingService.
- Make statsd plugin prefix configurable.

Revision history for this message
Sidnei da Silva (sidnei) wrote :

On Mon, Jul 11, 2011 at 9:48 PM, Lucio Torre <email address hidden> wrote:
> 1. The metric prefixes have to end with ".", which seems error-prone. The code should add that.

The idea was that by not adding the '.' in the code, someone could use
an empty prefix, but that use case is much more unlikely, I guess.
Changed it to forcibly add the '.'.
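The agreed behaviour can be sketched as follows (`add_trailing_dot` is a hypothetical name for illustration; where exactly the dot is appended in the branch is not shown in this diff):

```python
def add_trailing_dot(prefix):
    # Hypothetical helper: callers no longer need to remember the
    # trailing ".", and an empty prefix is left alone so the
    # no-prefix use case keeps working.
    if prefix and not prefix.endswith("."):
        prefix += "."
    return prefix

assert add_trailing_dot("myhost.statsd") == "myhost.statsd."
assert add_trailing_dot("myhost.statsd.") == "myhost.statsd."
assert add_trailing_dot("") == ""
```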

> 2. To report the metrics you are using counters. I know we need gauges and we don't have them, but incrementing because someone else divides later seems like we are leaking details.

Refactored a bit to make it easy to change from counters to gauges
when they are added (by passing a report_function around).

> 3. Changing the interface of StatsDClientProtocol to take a function that is executed as an argument to report_stats, which reads a file and then calls the target function, seems like too much coupling for this specific use case.

Removed that and created a ReportingService that keeps track of a list
of LoopingCall objects instead, with a schedule(function, interval,
report_function) method.
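A toy, non-Twisted sketch of that design (the real ReportingService in txstatsd/report.py wraps each function in a LoopingCall and starts it from startService; run_once here simulates one tick):

```python
class MiniReportingService:
    """Toy sketch of the ReportingService described above:
    schedule() only records work; nothing runs until asked to."""

    def __init__(self):
        self.tasks = []

    def schedule(self, function, interval, report_function):
        # The real service wraps function in a LoopingCall here;
        # we just remember what to run, how often, and where to
        # send the resulting metrics.
        self.tasks.append((function, interval, report_function))

    def run_once(self):
        # One simulated tick of every scheduled task: collect the
        # metrics dict and forward each pair to report_function.
        for function, _interval, report_function in self.tasks:
            for name, value in function().items():
                report_function(name, value)

seen = {}
service = MiniReportingService()
service.schedule(lambda: {"reactor.readers": 3}, 10, seen.__setitem__)
service.run_once()
assert seen == {"reactor.readers": 3}
```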

> 4. socket.gethostname() will not work as a metrics prefix on EC2, where the hostname is the internal IP and might change after rollouts.

Indeed. I guess it'd be good practice to set a proper hostname once
you get the instance up. Anyway, made the prefix configurable through
the plugin by adding a --prefix argument.

25. By Sidnei da Silva

- Don't fail if get_num_threads is not available.

26. By Sidnei da Silva

- Forgot to add report module

Revision history for this message
Lucio Torre (lucio.torre) :
review: Approve

Preview Diff

1=== modified file '.bzrignore'
2--- .bzrignore 2011-06-20 19:11:22 +0000
3+++ .bzrignore 2011-07-12 13:56:27 +0000
4@@ -1,1 +1,2 @@
5 _trial_temp*
6+twisted/plugins/dropin.cache
7
8=== modified file 'example-stats-client.tac'
9--- example-stats-client.tac 2011-06-23 14:19:21 +0000
10+++ example-stats-client.tac 2011-07-12 13:56:27 +0000
11@@ -8,11 +8,18 @@
12
13 from txstatsd.protocol import StatsDClientProtocol
14 from txstatsd.metrics import TransportMeter
15+from txstatsd.process import PROCESS_STATS
16+from txstatsd.report import ReportingService
17
18
19 application = Application("example-stats-client")
20-meter = TransportMeter(prefix=socket.gethostname())
21-
22+meter = TransportMeter(prefix=socket.gethostname() + ".example-client")
23+
24+reporting = ReportingService()
25+reporting.setServiceParent(application)
26+
27+for report in PROCESS_STATS:
28+ reporting.schedule(report, 10, meter.increment)
29
30 def random_walker(name):
31 """Meters a random walk."""
32@@ -35,5 +42,5 @@
33 t.start(0.5, now=False)
34
35
36-protocol = StatsDClientProtocol("127.0.0.1", 8125, meter)
37+protocol = StatsDClientProtocol("127.0.0.1", 8125, meter, 6000)
38 reactor.listenUDP(0, protocol)
39
40=== modified file 'statsd.tac'
41--- statsd.tac 2011-07-04 21:43:01 +0000
42+++ statsd.tac 2011-07-12 13:56:27 +0000
43@@ -4,6 +4,5 @@
44
45 application = Application("statsd")
46
47-statsd_service = service.createService(service.StatsdOptions())
48+statsd_service = service.createService(service.StatsDOptions())
49 statsd_service.setServiceParent(application)
50-
51
52=== modified file 'txstatsd/metrics.py'
53--- txstatsd/metrics.py 2011-07-05 05:27:22 +0000
54+++ txstatsd/metrics.py 2011-07-12 13:56:27 +0000
55@@ -55,6 +55,18 @@
56 raise NotImplementedError()
57
58
59+class InProcessMeter(BaseMeter):
60+ """A meter that can be used inside the C{StatsD} daemon itself."""
61+
62+ def __init__(self, processor, prefix="", sample_rate=1):
63+ self.processor = processor
64+ BaseMeter.__init__(self, prefix=prefix, sample_rate=sample_rate)
65+
66+ def write(self, data):
67+ """Pass the data along directly to the C{Processor}."""
68+ self.processor.process(data)
69+
70+
71 class Meter(BaseMeter):
72 """A trivial, non-Twisted-dependent meter."""
73
74@@ -118,12 +130,22 @@
75 host = None
76 port = None
77
78- def connected(self, transport, host, port):
79+ def __init__(self, prefix="", sample_rate=1,
80+ connect_callback=None, disconnect_callback=None):
81+ self.connect_callback = connect_callback
82+ self.disconnect_callback = disconnect_callback
83+ BaseMeter.__init__(self, prefix=prefix, sample_rate=sample_rate)
84+
85+ def connect(self, transport, host, port):
86 self.transport = transport
87 self.host = host
88 self.port = port
89+ if self.connect_callback is not None:
90+ self.connect_callback()
91
92- def disconnected(self):
93+ def disconnect(self):
94+ if self.disconnect_callback is not None:
95+ self.disconnect_callback()
96 self.transport = self.host = self.port = None
97
98 def write(self, data):
99
100=== added file 'txstatsd/process.py'
101--- txstatsd/process.py 1970-01-01 00:00:00 +0000
102+++ txstatsd/process.py 2011-07-12 13:56:27 +0000
103@@ -0,0 +1,171 @@
104+import os
105+import socket
106+import psutil
107+
108+from functools import update_wrapper
109+
110+from twisted.internet import defer, fdesc, error
111+
112+
113+MEMINFO_KEYS = ("MemTotal:", "MemFree:", "Buffers:",
114+ "Cached:", "SwapCached:", "SwapTotal:",
115+ "SwapFree:")
116+
117+MULTIPLIERS = {"kB": 1024, "mB": 1024 * 1024}
118+
119+
120+def load_file(filename):
121+ """Load a file into memory with non blocking reads."""
122+
123+ fd = os.open(filename, os.O_RDONLY)
124+ fdesc.setNonBlocking(fd)
125+
126+ chunks = []
127+ d = defer.Deferred()
128+
129+ def read_loop(data=None):
130+ """Inner loop."""
131+ if data is not None:
132+ chunks.append(data)
133+ r = fdesc.readFromFD(fd, read_loop)
134+ if isinstance(r, error.ConnectionDone):
135+ os.close(fd)
136+ d.callback("".join(chunks))
137+ elif r is not None:
138+ os.close(fd)
139+ d.errback(r)
140+
141+ read_loop("")
142+ return d
143+
144+
145+def parse_meminfo(data, prefix="sys.mem"):
146+ """Parse data from /proc/meminfo."""
147+ result = {}
148+
149+ for line in data.split("\n"):
150+ if not line:
151+ continue
152+ parts = [x for x in line.split(" ") if x]
153+ if not parts[0] in MEMINFO_KEYS:
154+ continue
155+
156+ multiple = 1
157+
158+ if len(parts) == 3:
159+ multiple = MULTIPLIERS[parts[2]]
160+
161+ # remove ':'
162+ label = parts[0][:-1]
163+ amount = int(parts[1]) * multiple
164+ result[prefix + "." + label] = amount
165+
166+ return result
167+
168+
169+def parse_loadavg(data, prefix="sys.loadavg"):
170+ """Parse data from /proc/loadavg."""
171+ return dict(zip(
172+ (prefix + ".oneminute",
173+ prefix + ".fiveminutes",
174+ prefix + ".fifthteenminutes"),
175+ [float(x) for x in data.split()[:3]]))
176+
177+
178+def report_process_memory_and_cpu(process=psutil.Process(os.getpid()),
179+ prefix="proc"):
180+ """Report memory and CPU stats for C{process}."""
181+ vsize, rss = process.get_memory_info()
182+ utime, stime = process.get_cpu_times()
183+ result = {prefix + ".cpu.percent": process.get_cpu_percent(),
184+ prefix + ".cpu.user": utime,
185+ prefix + ".cpu.system": stime,
186+ prefix + ".memory.percent": process.get_memory_percent(),
187+ prefix + ".memory.vsize": vsize,
188+ prefix + ".memory.rss": rss}
189+ if getattr(process, "get_num_threads", None) is not None:
190+ result[prefix + ".threads"] = process.get_num_threads()
191+ return result
192+
193+
194+def report_process_io_counters(process=psutil.Process(os.getpid()),
195+ prefix="proc.io"):
196+ """Report IO statistics for C{process}."""
197+ result = {}
198+ if getattr(process, "get_io_counters", None) is not None:
199+ (read_count, write_count,
200+ read_bytes, write_bytes) = process.get_io_counters()
201+ result.update({
202+ prefix + ".read.count": read_count,
203+ prefix + ".write.count": write_count,
204+ prefix + ".read.bytes": read_bytes,
205+ prefix + ".write.bytes": write_bytes})
206+ return result
207+
208+
209+def report_process_net_stats(process=psutil.Process(os.getpid()),
210+ prefix="proc.net"):
211+ """Report active connection statistics for C{process}."""
212+ result = {}
213+ if getattr(process, "get_connections", None) is not None:
214+ for connection in process.get_connections():
215+ fd, family, _type, laddr, raddr, status = connection
216+ if _type == socket.SOCK_STREAM:
217+ key = prefix + ".status.%s" % status.lower()
218+ if not key in result:
219+ result[key] = 1
220+ else:
221+ result[key] += 1
222+ return result
223+
224+
225+def report_system_stats(prefix="sys"):
226+ cpu_times = psutil.cpu_times()
227+ return {prefix + ".cpu.idle": cpu_times.idle,
228+ prefix + ".cpu.iowait": cpu_times.iowait,
229+ prefix + ".cpu.irq": cpu_times.irq,
230+ prefix + ".cpu.nice": cpu_times.nice,
231+ prefix + ".cpu.system": cpu_times.system,
232+ prefix + ".cpu.user": cpu_times.user}
233+
234+
235+def report_threadpool_stats(threadpool, prefix="threadpool"):
236+ """Report stats about a given threadpool."""
237+ def report():
238+ return {prefix + ".working": len(threadpool.working),
239+ prefix + ".queue": threadpool.q.qsize(),
240+ prefix + ".waiters": len(threadpool.waiters),
241+ prefix + ".threads": len(threadpool.threads)}
242+ update_wrapper(report, report_threadpool_stats)
243+ return report
244+
245+
246+def report_reactor_stats(reactor, prefix="reactor"):
247+ """Report statistics about a twisted reactor."""
248+ def report():
249+ return {prefix + ".readers": len(reactor.getReaders()),
250+ prefix + ".writers": len(reactor.getWriters())}
251+
252+ update_wrapper(report, report_reactor_stats)
253+ return report
254+
255+
256+def report_file_stats(filename, parser):
257+ """Read statistics from a file and report them."""
258+ def report():
259+ deferred = load_file(filename)
260+ deferred.addCallback(parser)
261+ return deferred
262+ update_wrapper(report, report_file_stats)
263+ return report
264+
265+
266+PROCESS_STATS = (report_process_memory_and_cpu,)
267+
268+IO_STATS = (report_process_io_counters,)
269+
270+NET_STATS = (report_process_net_stats,)
271+
272+SYSTEM_STATS = (report_file_stats("/proc/meminfo", parse_meminfo),
273+ report_file_stats("/proc/loadavg", parse_loadavg),
274+ report_system_stats)
275
276=== modified file 'txstatsd/protocol.py'
277--- txstatsd/protocol.py 2011-06-22 00:18:58 +0000
278+++ txstatsd/protocol.py 2011-07-12 13:56:27 +0000
279@@ -1,7 +1,7 @@
280 import logging
281
282 from twisted.python import log
283-from twisted.internet import task
284+from twisted.internet import task, defer
285 from twisted.protocols.basic import LineOnlyReceiver
286 from twisted.internet.protocol import (
287 DatagramProtocol, ReconnectingClientFactory)
288@@ -27,21 +27,22 @@
289 class StatsDClientProtocol(DatagramProtocol):
290 """A Twisted-based implementation of the StatsD client protocol.
291
292- Data is sent via ConnectedUDP to a StatsD server for aggregation.
293+ Data is sent via UDP to a StatsD server for aggregation.
294 """
295
296- def __init__(self, host, port, meter):
297+ def __init__(self, host, port, meter, interval=None):
298 self.host = host
299 self.port = port
300 self.meter = meter
301+ self.interval = interval
302
303 def startProtocol(self):
304 """Connect to destination host."""
305- self.meter.connected(self.transport, self.host, self.port)
306+ self.meter.connect(self.transport, self.host, self.port)
307
308 def stopProtocol(self):
309 """Connection was lost."""
310- self.meter.disconnected()
311+ self.meter.disconnect()
312
313
314 class GraphiteProtocol(LineOnlyReceiver):
315@@ -65,7 +66,7 @@
316 log.msg("Connected. Scheduling flush to now + %ds." %
317 (self.interval / 1000), logLevel=logging.DEBUG)
318 self.flush_task = task.LoopingCall(self.flushProcessor)
319- self.flush_task.start(self.interval / 1000)
320+ self.flush_task.start(self.interval / 1000, False)
321
322 def connectionLost(self, reason):
323 """
324@@ -77,12 +78,12 @@
325 log.msg("Canceling scheduled flush.", logLevel=logging.DEBUG)
326 self.flush_task.stop()
327
328+ @defer.inlineCallbacks
329 def flushProcessor(self):
330 """Flush messages queued in the processor to Graphite."""
331- log.msg("Flushing messages.", logLevel=logging.DEBUG)
332 for message in self.processor.flush(interval=self.interval):
333 for line in message.splitlines():
334- self.sendLine(line)
335+ yield self.sendLine(line)
336
337
338 class GraphiteClientFactory(ReconnectingClientFactory):
339
340=== added file 'txstatsd/report.py'
341--- txstatsd/report.py 1970-01-01 00:00:00 +0000
342+++ txstatsd/report.py 2011-07-12 13:56:27 +0000
343@@ -0,0 +1,46 @@
344+from twisted.internet.defer import maybeDeferred
345+from twisted.internet.task import LoopingCall
346+from twisted.python import log
347+
348+from functools import wraps
349+
350+from twisted.application.service import Service
351+
352+
353+class ReportingService(Service):
354+
355+ def __init__(self):
356+ self.tasks = []
357+
358+ def schedule(self, function, interval, report_function):
359+ """
360+ Schedule C{function} to be called every C{interval} seconds and then
361+ report gathered metrics to C{Graphite} using C{report_function}.
362+ """
363+ task = LoopingCall(self.wrapped(function, report_function))
364+ self.tasks.append((task, interval))
365+
366+ def wrapped(self, function, report_function):
367+ def report_metrics(metrics):
368+ """For each metric returned, call C{report_function} with it."""
369+ for name, value in metrics.items():
370+ report_function(name, value)
371+ return metrics
372+
373+ @wraps(function)
374+ def wrapper():
375+ """Wrap C{function} to report metrics or log a failure."""
376+ deferred = maybeDeferred(function)
377+ deferred.addCallback(report_metrics)
378+ deferred.addErrback(lambda failure: log.err(
379+ failure, "Error while processing %s" % function.func_name))
380+ return deferred
381+ return wrapper
382+
383+ def startService(self):
384+ for task, interval in self.tasks:
385+ task.start(interval, now=False)
386+
387+ def stopService(self):
388+ for task, interval in self.tasks:
389+ task.stop()
390
391=== modified file 'txstatsd/service.py'
392--- txstatsd/service.py 2011-07-05 05:27:22 +0000
393+++ txstatsd/service.py 2011-07-12 13:56:27 +0000
394@@ -1,11 +1,16 @@
395+import socket
396 import ConfigParser
397
398 from twisted.application.internet import TCPClient, UDPServer
399 from twisted.application.service import MultiService
400+from twisted.internet import reactor
401 from twisted.python import usage, util
402
403+from txstatsd import process
404+from txstatsd.metrics import InProcessMeter
405 from txstatsd.processor import MessageProcessor
406 from txstatsd.protocol import GraphiteClientFactory, StatsDServerProtocol
407+from txstatsd.report import ReportingService
408
409 _unset = object()
410
411@@ -73,13 +78,17 @@
412 """
413 glue_parameters = [
414 ["carbon-cache-host", "h", "localhost",
415- "The host where carbon cache is listening."],
416+ "The host where carbon cache is listening."],
417 ["carbon-cache-port", "p", 2003,
418- "The port where carbon cache is listening.", int],
419+ "The port where carbon cache is listening.", int],
420 ["listen-port", "l", 8125,
421- "The UDP port where we will listen.", int],
422+ "The UDP port where we will listen.", int],
423 ["flush-interval", "i", 10000,
424- "The number of milliseconds between each flush.", int],
425+ "The number of milliseconds between each flush.", int],
426+ ["prefix", "p", None,
427+ "Prefix to use when reporting stats.", str],
428+ ["report", "r", None,
429+ "Which additional stats to report {process|net|io|system}.", str],
430 ]
431
432
433@@ -89,11 +98,27 @@
434 service = MultiService()
435 service.setName("statsd")
436 processor = MessageProcessor()
437+ prefix = options["prefix"]
438+ if prefix is None:
439+ prefix = socket.gethostname() + ".statsd"
440+
441+ meter = InProcessMeter(processor, prefix=prefix)
442+
443+ if options["report"] is not None:
444+ reporting = ReportingService()
445+ reporting.setServiceParent(service)
446+ reporting.schedule(
447+ process.report_reactor_stats(reactor), 10, meter.increment)
448+ reports = [name.strip() for name in options["report"].split(",")]
449+ for report_name in reports:
450+ for reporter in getattr(process, "%s_STATS" %
451+ report_name.upper(), ()):
452+ reporting.schedule(reporter, 10, meter.increment)
453
454 factory = GraphiteClientFactory(processor, options["flush-interval"])
455- client = TCPClient(
456- options["carbon-cache-host"], options["carbon-cache-port"],
457- factory)
458+ client = TCPClient(options["carbon-cache-host"],
459+ options["carbon-cache-port"],
460+ factory)
461 client.setServiceParent(service)
462
463 listener = UDPServer(options["listen-port"],
464
465=== added file 'txstatsd/tests/test_process.py'
466--- txstatsd/tests/test_process.py 1970-01-01 00:00:00 +0000
467+++ txstatsd/tests/test_process.py 2011-07-12 13:56:27 +0000
468@@ -0,0 +1,266 @@
469+import os
470+import psutil
471+
472+from mocker import MockerTestCase
473+from twisted.internet import defer
474+from twisted.trial.unittest import TestCase
475+
476+from txstatsd.process import (
477+ load_file, parse_meminfo, parse_loadavg,
478+ report_process_memory_and_cpu, report_process_io_counters,
479+ report_process_net_stats, report_system_stats,
480+ report_reactor_stats, report_threadpool_stats)
481+
482+
483+meminfo = """\
484+MemTotal: 8190436 kB
485+MemFree: 995724 kB
486+Buffers: 8052 kB
487+Cached: 344824 kB
488+SwapCached: 170828 kB
489+Active: 4342436 kB
490+Inactive: 907076 kB
491+Active(anon): 4210168 kB
492+Inactive(anon): 756096 kB
493+Active(file): 132268 kB
494+Inactive(file): 150980 kB
495+Unevictable: 641692 kB
496+Mlocked: 641676 kB
497+SwapTotal: 23993336 kB
498+SwapFree: 22750588 kB
499+Dirty: 740 kB
500+Writeback: 0 kB
501+AnonPages: 5453396 kB
502+Mapped: 259524 kB
503+Shmem: 69952 kB
504+Slab: 142444 kB
505+SReclaimable: 83188 kB
506+SUnreclaim: 59256 kB
507+KernelStack: 16144 kB
508+PageTables: 88384 kB
509+NFS_Unstable: 0 kB
510+Bounce: 0 kB
511+WritebackTmp: 0 kB
512+CommitLimit: 28088552 kB
513+Committed_AS: 11178564 kB
514+VmallocTotal: 34359738367 kB
515+VmallocUsed: 156208 kB
516+VmallocChunk: 34359579400 kB
517+HardwareCorrupted: 0 kB
518+HugePages_Total: 0
519+HugePages_Free: 0
520+HugePages_Rsvd: 0
521+HugePages_Surp: 0
522+Hugepagesize: 2048 kB
523+DirectMap4k: 7968640 kB
524+DirectMap2M: 415744 kB"""
525+
526+
527+class TestSystemPerformance(TestCase, MockerTestCase):
528+ """Test system performance monitoring."""
529+
530+ def assertSuccess(self, deferred, result=None):
531+ """
532+ Assert that the given C{deferred} results in the given C{result}.
533+ """
534+ self.assertTrue(isinstance(deferred, defer.Deferred))
535+ return deferred.addCallback(self.assertEqual, result)
536+
537+ def test_read(self):
538+ """We can read files non blocking."""
539+ d = load_file(__file__)
540+ return self.assertSuccess(d, open(__file__).read())
541+
542+ def test_loadinfo(self):
543+ """We understand loadinfo."""
544+ loadinfo = "1.02 1.08 1.14 2/2015 19420"
545+ self.assertEqual(parse_loadavg(loadinfo), {
546+ "sys.loadavg.oneminute": 1.02,
547+ "sys.loadavg.fiveminutes": 1.08,
548+ "sys.loadavg.fifthteenminutes": 1.14})
549+
550+ def test_meminfo(self):
551+ """We understand meminfo."""
552+ r = parse_meminfo(meminfo)
553+ self.assertEqual(r['sys.mem.Buffers'], 8052 * 1024)
554+ self.assert_('sys.mem.HugePages_Rsvd' not in r)
555+
556+ def test_statinfo(self):
557+ """System stat info is collected through psutil."""
558+ cpu_times = psutil.cpu_times()
559+ mock = self.mocker.replace("psutil.cpu_times")
560+ self.expect(mock()).result(cpu_times)
561+ self.mocker.replay()
562+
563+ result = report_system_stats()
564+ self.assertEqual(cpu_times.idle, result["sys.cpu.idle"])
565+ self.assertEqual(cpu_times.iowait, result["sys.cpu.iowait"])
566+ self.assertEqual(cpu_times.irq, result["sys.cpu.irq"])
567+ self.assertEqual(cpu_times.nice, result["sys.cpu.nice"])
568+ self.assertEqual(cpu_times.system, result["sys.cpu.system"])
569+ self.assertEqual(cpu_times.user, result["sys.cpu.user"])
570+
571+ def test_self_statinfo(self):
572+ """
573+ Process stat info is collected through psutil.
574+
575+ If the L{Process} implementation does not have C{get_num_threads} then
576+ the number of threads will not be included in the output.
577+ """
578+ process = psutil.Process(os.getpid())
579+ vsize, rss = process.get_memory_info()
580+ utime, stime = process.get_cpu_times()
581+ cpu_percent = process.get_cpu_percent()
582+ memory_percent = process.get_memory_percent()
583+
584+ mock = self.mocker.mock()
585+ self.expect(mock.get_memory_info()).result((vsize, rss))
586+ self.expect(mock.get_cpu_times()).result((utime, stime))
587+ self.expect(mock.get_cpu_percent()).result(cpu_percent)
588+ self.expect(mock.get_memory_percent()).result(memory_percent)
589+ self.expect(mock.get_num_threads).result(None)
590+ self.mocker.replay()
591+
592+ result = report_process_memory_and_cpu(process=mock)
593+ self.assertEqual(utime, result["proc.cpu.user"])
594+ self.assertEqual(stime, result["proc.cpu.system"])
595+ self.assertEqual(cpu_percent, result["proc.cpu.percent"])
596+ self.assertEqual(vsize, result["proc.memory.vsize"])
597+ self.assertEqual(rss, result["proc.memory.rss"])
598+ self.assertEqual(memory_percent, result["proc.memory.percent"])
599+ self.failIf("proc.threads" in result)
600+
601+ def test_self_statinfo_with_num_threads(self):
602+ """
603+ Process stat info is collected through psutil.
604+
605+ If the L{Process} implementation contains C{get_num_threads} then the
606+ number of threads will be included in the output.
607+
608+ """
609+ process = psutil.Process(os.getpid())
610+ vsize, rss = process.get_memory_info()
611+ utime, stime = process.get_cpu_times()
612+ cpu_percent = process.get_cpu_percent()
613+ memory_percent = process.get_memory_percent()
614+
615+ mock = self.mocker.mock()
616+ self.expect(mock.get_memory_info()).result((vsize, rss))
617+ self.expect(mock.get_cpu_times()).result((utime, stime))
618+ self.expect(mock.get_cpu_percent()).result(cpu_percent)
619+ self.expect(mock.get_memory_percent()).result(memory_percent)
620+ self.expect(mock.get_num_threads()).result(1)
621+ self.mocker.replay()
622+
623+ result = report_process_memory_and_cpu(process=mock)
624+ self.assertEqual(utime, result["proc.cpu.user"])
625+ self.assertEqual(stime, result["proc.cpu.system"])
626+ self.assertEqual(cpu_percent, result["proc.cpu.percent"])
627+ self.assertEqual(vsize, result["proc.memory.vsize"])
628+ self.assertEqual(rss, result["proc.memory.rss"])
629+ self.assertEqual(memory_percent, result["proc.memory.percent"])
630+ self.assertEqual(1, result["proc.threads"])
631+
632+ def test_ioinfo(self):
633+ """Process IO info is collected through psutil."""
634+ mock = self.mocker.mock()
635+ self.expect(mock.get_io_counters).result(None)
636+ self.mocker.replay()
637+
638+ # If the version of psutil doesn't have the C{get_io_counters},
639+ # then io stats are not included in the output.
640+ result = report_process_io_counters(process=mock)
641+ self.failIf("proc.io.read.count" in result)
642+ self.failIf("proc.io.write.count" in result)
643+ self.failIf("proc.io.read.bytes" in result)
644+ self.failIf("proc.io.write.bytes" in result)
645+
646+ def test_ioinfo_with_get_io_counters(self):
647+ """
648+ Process IO info is collected through psutil.
649+
650+ If C{get_io_counters} is implemented by the L{Process} object,
651+ then io information will be returned with the process information.
652+ """
653+ io_counters = (10, 42, 125, 16)
654+
655+ mock = self.mocker.mock()
656+ self.expect(mock.get_io_counters).result(mock)
657+ self.expect(mock.get_io_counters()).result(io_counters)
658+ self.mocker.replay()
659+
660+ result = report_process_io_counters(process=mock)
661+ self.assertEqual(10, result["proc.io.read.count"])
662+ self.assertEqual(42, result["proc.io.write.count"])
663+ self.assertEqual(125, result["proc.io.read.bytes"])
664+ self.assertEqual(16, result["proc.io.write.bytes"])
665+
666+ def test_netinfo_no_get_connections(self):
667+ """
668+ Process connection info is collected through psutil.
669+
670+ If the version of psutil doesn't implement C{get_connections} for
671+ L{Process}, then no information is returned.
672+ """
673+ mock = self.mocker.mock()
674+ self.expect(mock.get_connections).result(None)
675+ self.mocker.replay()
676+
677+ # If the version of psutil doesn't have the C{get_io_counters},
678+ # then io stats are not included in the output.
679+ result = report_process_net_stats(process=mock)
680+ self.failIf("proc.net.status.established" in result)
681+
682+ def test_netinfo_with_get_connections(self):
683+ """
684+ Process connection info is collected through psutil.
685+
686+ If the version of psutil implements C{get_connections} for L{Process},
687+ then a count of connections in each state is returned.
688+ """
689+ connections = [
690+ (115, 2, 1, ("10.0.0.1", 48776),
691+ ("93.186.135.91", 80), "ESTABLISHED"),
692+ (117, 2, 1, ("10.0.0.1", 43761),
693+ ("72.14.234.100", 80), "CLOSING"),
694+ (119, 2, 1, ("10.0.0.1", 60759),
695+ ("72.14.234.104", 80), "ESTABLISHED"),
696+ (123, 2, 1, ("10.0.0.1", 51314),
697+ ("72.14.234.83", 443), "SYN_SENT")
698+ ]
699+
700+ mock = self.mocker.mock()
701+ self.expect(mock.get_connections).result(mock)
702+ self.expect(mock.get_connections()).result(connections)
703+ self.mocker.replay()
704+
705+ result = report_process_net_stats(process=mock)
706+ self.assertEqual(2, result["proc.net.status.established"])
707+ self.assertEqual(1, result["proc.net.status.closing"])
708+ self.assertEqual(1, result["proc.net.status.syn_sent"])
709+
710+ def test_reactor_stats(self):
711+ """Given a twisted reactor, pull out some stats from it."""
712+ mock = self.mocker.mock()
713+ self.expect(mock.getReaders()).result([None, None, None])
714+ self.expect(mock.getWriters()).result([None, None])
715+ self.mocker.replay()
716+
717+ result = report_reactor_stats(mock)()
718+ self.assertEqual(3, result["reactor.readers"])
719+ self.assertEqual(2, result["reactor.writers"])
720+
721+ def test_threadpool_stats(self):
722+ """Given a twisted threadpool, pull out some stats from it."""
723+ mock = self.mocker.mock()
724+ self.expect(mock.q.qsize()).result(42)
725+ self.expect(mock.threads).result(6 * [None])
726+ self.expect(mock.waiters).result(2 * [None])
727+ self.expect(mock.working).result(4 * [None])
728+ self.mocker.replay()
729+
730+ result = report_threadpool_stats(mock)()
731+ self.assertEqual(42, result["threadpool.queue"])
732+ self.assertEqual(6, result["threadpool.threads"])
733+ self.assertEqual(2, result["threadpool.waiters"])
734+ self.assertEqual(4, result["threadpool.working"])
735
736=== modified file 'txstatsd/tests/test_service.py'
737--- txstatsd/tests/test_service.py 2011-07-05 05:27:22 +0000
738+++ txstatsd/tests/test_service.py 2011-07-12 13:56:27 +0000
739@@ -6,6 +6,7 @@
740
741
742 class GlueOptionsTestCase(TestCase):
743+
744 def test_defaults(self):
745 """
746 Defaults get passed over to the instance.
747@@ -94,6 +95,7 @@
748
749
750 class ServiceTests(TestCase):
751+
752 def test_service(self):
753 """
754 The StatsD service can be created.
