Merge lp:~sidnei/txstatsd/split-counters into lp:txstatsd

Proposed by Sidnei da Silva
Status: Merged
Approved by: Sidnei da Silva
Approved revision: 65
Merged at revision: 66
Proposed branch: lp:~sidnei/txstatsd/split-counters
Merge into: lp:txstatsd
Diff against target: 343 lines (+97/-67)
3 files modified
txstatsd/process.py (+42/-32)
txstatsd/service.py (+10/-6)
txstatsd/tests/test_process.py (+45/-29)
To merge this branch: bzr merge lp:~sidnei/txstatsd/split-counters
Reviewer Review Type Date Requested Status
Lucio Torre (community) Approve
Review via email: mp+90558@code.launchpad.net

Commit message

Fix counter-like reporters to report delta since last call

Description of the change

Fix counter-like reporters to report delta since last call.

To post a comment you must log in.
Revision history for this message
Lucio Torre (lucio.torre) :
review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'txstatsd/process.py'
2--- txstatsd/process.py 2011-11-18 17:13:52 +0000
3+++ txstatsd/process.py 2012-01-28 00:28:23 +0000
4@@ -4,8 +4,6 @@
5
6 from functools import update_wrapper
7
8-from twisted.internet import defer, fdesc, error
9-
10
11 MEMINFO_KEYS = ("MemTotal:", "MemFree:", "Buffers:",
12 "Cached:", "SwapCached:", "SwapTotal:",
13@@ -15,27 +13,9 @@
14
15
16 def load_file(filename):
17- """Load a file into memory with non blocking reads."""
18- fd = os.open(filename, os.O_RDONLY)
19- fdesc.setNonBlocking(fd)
20-
21- chunks = []
22- d = defer.Deferred()
23-
24- def read_loop(data=None):
25- """Inner loop."""
26- if data is not None:
27- chunks.append(data)
28- r = fdesc.readFromFD(fd, read_loop)
29- if isinstance(r, error.ConnectionDone):
30- os.close(fd)
31- d.callback("".join(chunks))
32- elif r is not None:
33- os.close(fd)
34- d.errback(r)
35-
36- read_loop("")
37- return d
38+ """Load a file into memory."""
39+ with open(filename, "r") as f:
40+ return f.read()
41
42
43 def parse_meminfo(data, prefix="sys.mem"):
44@@ -110,10 +90,7 @@
45 def get_memory_and_cpu(self, prefix="proc"):
46 """Report memory and CPU stats for C{process}."""
47 vsize, rss = self.process.get_memory_info()
48- utime, stime = self.process.get_cpu_times()
49 result = {prefix + ".cpu.percent": self.process.get_cpu_percent(),
50- prefix + ".cpu.user": utime,
51- prefix + ".cpu.system": stime,
52 prefix + ".memory.percent":
53 self.process.get_memory_percent(),
54 prefix + ".memory.vsize": vsize,
55@@ -122,6 +99,13 @@
56 result[prefix + ".threads"] = self.process.get_num_threads()
57 return result
58
59+ def get_cpu_counters(self, prefix="proc"):
60+ """Report CPU counters for C{process}."""
61+ utime, stime = self.process.get_cpu_times()
62+ result = {prefix + ".cpu.user": utime,
63+ prefix + ".cpu.system": stime}
64+ return result
65+
66 def get_io_counters(self, prefix="proc.io"):
67 """Report IO statistics for C{process}."""
68 result = {}
69@@ -150,9 +134,34 @@
70 return result
71
72
73+def report_counters(report_function):
74+ """
75+ Report difference between last value and current value for wrapped
76+ function.
77+ """
78+ def generate():
79+ last_value = None
80+ while True:
81+ result = {}
82+ new_value = report_function()
83+ if last_value is None:
84+ last_value = new_value
85+ else:
86+ for key, value in new_value.iteritems():
87+ result[key] = value - last_value[key]
88+ last_value = new_value
89+ yield result
90+ generator = generate()
91+ def report():
92+ return generator.next()
93+ update_wrapper(report, report_function)
94+ return report
95+
96+
97 process_report = ProcessReport()
98 report_process_memory_and_cpu = process_report.get_memory_and_cpu
99-report_process_io_counters = process_report.get_io_counters
100+report_process_cpu_counters = report_counters(process_report.get_cpu_counters)
101+report_process_io_counters = report_counters(process_report.get_io_counters)
102 report_process_net_stats = process_report.get_net_stats
103
104
105@@ -188,20 +197,21 @@
106 def report_file_stats(filename, parser):
107 """Read statistics from a file and report them."""
108 def report():
109- deferred = load_file(filename)
110- deferred.addCallback(parser)
111- return deferred
112+ return parser(load_file(filename))
113 update_wrapper(report, report_file_stats)
114 return report
115
116
117 PROCESS_STATS = (report_process_memory_and_cpu,)
118
119+COUNTER_STATS = (report_process_cpu_counters,)
120+
121 IO_STATS = (report_process_io_counters,)
122
123 NET_STATS = (report_process_net_stats,)
124
125 SYSTEM_STATS = (report_file_stats("/proc/meminfo", parse_meminfo),
126 report_file_stats("/proc/loadavg", parse_loadavg),
127- report_file_stats("/proc/net/dev", parse_netdev),
128- report_system_stats)
129+ report_counters(report_file_stats("/proc/net/dev", parse_netdev)),
130+ report_counters(report_system_stats),
131+ )
132
133=== modified file 'txstatsd/service.py'
134--- txstatsd/service.py 2012-01-27 16:11:50 +0000
135+++ txstatsd/service.py 2012-01-28 00:28:23 +0000
136@@ -11,6 +11,7 @@
137
138 from txstatsd.client import InternalClient
139 from txstatsd.metrics.metrics import Metrics
140+from txstatsd.metrics.extendedmetrics import ExtendedMetrics
141 from txstatsd.server.processor import MessageProcessor
142 from txstatsd.server.configurableprocessor import ConfigurableMessageProcessor
143 from txstatsd.server.protocol import (
144@@ -235,13 +236,13 @@
145 processor = MessageProcessor(plugins=plugin_metrics)
146 input_router = Router(processor, options['routing'], root_service)
147 connection = InternalClient(input_router)
148- metrics = Metrics(connection, namespace=prefix)
149+ metrics = Metrics(connection)
150 else:
151 processor = ConfigurableMessageProcessor(message_prefix=prefix,
152 plugins=plugin_metrics)
153 input_router = Router(processor, options['routing'], root_service)
154 connection = InternalClient(input_router)
155- metrics = Metrics(connection)
156+ metrics = ExtendedMetrics(connection)
157
158 if not options["carbon-cache-host"]:
159 options["carbon-cache-host"].append("127.0.0.1")
160@@ -259,20 +260,23 @@
161
162 # Schedule updates for those metrics expecting to be
163 # periodically updated, for example the meter metric.
164- reporting.schedule(processor.update_metrics, 5, None)
165- reporting.schedule(report_client_manager_stats, 10, metrics.gauge)
166+ reporting.schedule(processor.update_metrics, 10, None)
167+ reporting.schedule(report_client_manager_stats,
168+ options["flush-interval"] / 1000,
169+ metrics.gauge)
170
171 if options["report"] is not None:
172 from txstatsd import process
173 from twisted.internet import reactor
174
175 reporting.schedule(
176- process.report_reactor_stats(reactor), 10, metrics.gauge)
177+ process.report_reactor_stats(reactor), 60, metrics.gauge)
178 reports = [name.strip() for name in options["report"].split(",")]
179 for report_name in reports:
180 for reporter in getattr(process, "%s_STATS" %
181 report_name.upper(), ()):
182- reporting.schedule(reporter, 10, metrics.gauge)
183+ reporting.schedule(reporter, 60, metrics.gauge)
184+
185
186 # XXX Make this configurable.
187 router = ConsistentHashingRouter()
188
189=== modified file 'txstatsd/tests/test_process.py'
190--- txstatsd/tests/test_process.py 2011-08-08 09:28:45 +0000
191+++ txstatsd/tests/test_process.py 2012-01-28 00:28:23 +0000
192@@ -3,12 +3,11 @@
193 import sys
194
195 from mocker import MockerTestCase
196-from twisted.internet import defer
197 from twisted.trial.unittest import TestCase
198
199 from txstatsd.process import (
200 ProcessReport, load_file, parse_meminfo, parse_loadavg, parse_netdev,
201- report_system_stats, report_reactor_stats, report_threadpool_stats)
202+ report_system_stats, report_reactor_stats, report_threadpool_stats, report_counters)
203
204
205 meminfo = """\
206@@ -65,18 +64,6 @@
207 class TestSystemPerformance(TestCase, MockerTestCase):
208 """Test system performance monitoring."""
209
210- def assertSuccess(self, deferred, result=None):
211- """
212- Assert that the given C{deferred} results in the given C{result}.
213- """
214- self.assertTrue(isinstance(deferred, defer.Deferred))
215- return deferred.addCallback(self.assertEqual, result)
216-
217- def test_read(self):
218- """We can read files non blocking."""
219- d = load_file(__file__)
220- return self.assertSuccess(d, open(__file__).read())
221-
222 def test_loadinfo(self):
223 """We understand loadinfo."""
224 loadinfo = "1.02 1.08 1.14 2/2015 19420"
225@@ -91,8 +78,8 @@
226 self.assertEqual(r['sys.mem.Buffers'], 8052 * 1024)
227 self.assert_('sys.mem.HugePages_Rsvd' not in r)
228
229- def test_statinfo(self):
230- """System stat info is collected through psutil."""
231+ def test_cpu_counters(self):
232+ """System cpu counters are collected through psutil."""
233 cpu_times = psutil.cpu_times()
234 mock = self.mocker.replace("psutil.cpu_times")
235 self.expect(mock()).result(cpu_times)
236@@ -121,39 +108,50 @@
237 self.assertEqual(cpu_times.idle, result["sys.cpu.idle"])
238 self.assertEqual(cpu_times.irq, result["sys.cpu.irq"])
239
240- def test_self_statinfo(self):
241+ def test_self_cpu_and_memory_stats(self):
242 """
243- Process stat info is collected through psutil.
244+ Process cpu and memory stats are collected through psutil.
245
246 If the L{Process} implementation does not have C{get_num_threads} then
247 the number of threads will not be included in the output.
248 """
249 process = psutil.Process(os.getpid())
250 vsize, rss = process.get_memory_info()
251- utime, stime = process.get_cpu_times()
252 cpu_percent = process.get_cpu_percent()
253 memory_percent = process.get_memory_percent()
254
255 mock = self.mocker.mock()
256 self.expect(mock.get_memory_info()).result((vsize, rss))
257- self.expect(mock.get_cpu_times()).result((utime, stime))
258 self.expect(mock.get_cpu_percent()).result(cpu_percent)
259 self.expect(mock.get_memory_percent()).result(memory_percent)
260 self.expect(mock.get_num_threads).result(None)
261 self.mocker.replay()
262
263 result = ProcessReport(process=mock).get_memory_and_cpu()
264- self.assertEqual(utime, result["proc.cpu.user"])
265- self.assertEqual(stime, result["proc.cpu.system"])
266 self.assertEqual(cpu_percent, result["proc.cpu.percent"])
267 self.assertEqual(vsize, result["proc.memory.vsize"])
268 self.assertEqual(rss, result["proc.memory.rss"])
269 self.assertEqual(memory_percent, result["proc.memory.percent"])
270 self.failIf("proc.threads" in result)
271
272- def test_self_statinfo_with_num_threads(self):
273- """
274- Process stat info is collected through psutil.
275+ def test_self_cpu_counters(self):
276+ """
277+ Process cpu counters are collected through psutil.
278+ """
279+ process = psutil.Process(os.getpid())
280+ utime, stime = process.get_cpu_times()
281+
282+ mock = self.mocker.mock()
283+ self.expect(mock.get_cpu_times()).result((utime, stime))
284+ self.mocker.replay()
285+
286+ result = ProcessReport(process=mock).get_cpu_counters()
287+ self.assertEqual(utime, result["proc.cpu.user"])
288+ self.assertEqual(stime, result["proc.cpu.system"])
289+
290+ def test_self_cpu_and_memory_stats_with_num_threads(self):
291+ """
292+ Process cpu and memory stats are collected through psutil.
293
294 If the L{Process} implementation contains C{get_num_threads} then the
295 number of threads will be included in the output.
296@@ -161,21 +159,17 @@
297 """
298 process = psutil.Process(os.getpid())
299 vsize, rss = process.get_memory_info()
300- utime, stime = process.get_cpu_times()
301 cpu_percent = process.get_cpu_percent()
302 memory_percent = process.get_memory_percent()
303
304 mock = self.mocker.mock()
305 self.expect(mock.get_memory_info()).result((vsize, rss))
306- self.expect(mock.get_cpu_times()).result((utime, stime))
307 self.expect(mock.get_cpu_percent()).result(cpu_percent)
308 self.expect(mock.get_memory_percent()).result(memory_percent)
309 self.expect(mock.get_num_threads()).result(1)
310 self.mocker.replay()
311
312 result = ProcessReport(process=mock).get_memory_and_cpu()
313- self.assertEqual(utime, result["proc.cpu.user"])
314- self.assertEqual(stime, result["proc.cpu.system"])
315 self.assertEqual(cpu_percent, result["proc.cpu.percent"])
316 self.assertEqual(vsize, result["proc.memory.vsize"])
317 self.assertEqual(rss, result["proc.memory.rss"])
318@@ -304,3 +298,25 @@
319 "sys.net.tun0.bytes.sent": 5226635,
320 "sys.net.tun0.packets.received": 24837,
321 "sys.net.tun0.packets.sent": 26986})
322+
323+ def test_report_counters(self):
324+ """
325+ C{report_counters} keeps the last value of a called function and on the
326+ next call returns the difference between current return value and
327+ previous return value.
328+ """
329+ def generate():
330+ yield {"foo": 1}
331+ yield {"foo": 5}
332+ yield {"foo": 10}
333+ yield {"foo": 17}
334+ generate = generate()
335+ def reporter():
336+ return generate.next()
337+ wrapped = report_counters(reporter)
338+ self.assertEqual({}, wrapped())
339+ self.assertEqual({"foo": 4}, wrapped())
340+ self.assertEqual({"foo": 5}, wrapped())
341+ self.assertEqual({"foo": 7}, wrapped())
342+
343+

Subscribers

People subscribed via source and target branches