Merge lp:~sidnei/txstatsd/split-counters into lp:txstatsd
- split-counters
- Merge into trunk
Proposed by
Sidnei da Silva
Status: | Merged |
---|---|
Approved by: | Sidnei da Silva |
Approved revision: | 65 |
Merged at revision: | 66 |
Proposed branch: | lp:~sidnei/txstatsd/split-counters |
Merge into: | lp:txstatsd |
Diff against target: |
343 lines (+97/-67) 3 files modified
txstatsd/process.py (+42/-32) txstatsd/service.py (+10/-6) txstatsd/tests/test_process.py (+45/-29) |
To merge this branch: | bzr merge lp:~sidnei/txstatsd/split-counters |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Lucio Torre (community) | Approve | ||
Review via email: mp+90558@code.launchpad.net |
Commit message
Fix counter-like reports to report delta since last call
Description of the change
Fix counter-like reporters to report delta since last call.
To post a comment you must log in.
Revision history for this message
Lucio Torre (lucio.torre) : | # |
review:
Approve
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === modified file 'txstatsd/process.py' |
2 | --- txstatsd/process.py 2011-11-18 17:13:52 +0000 |
3 | +++ txstatsd/process.py 2012-01-28 00:28:23 +0000 |
4 | @@ -4,8 +4,6 @@ |
5 | |
6 | from functools import update_wrapper |
7 | |
8 | -from twisted.internet import defer, fdesc, error |
9 | - |
10 | |
11 | MEMINFO_KEYS = ("MemTotal:", "MemFree:", "Buffers:", |
12 | "Cached:", "SwapCached:", "SwapTotal:", |
13 | @@ -15,27 +13,9 @@ |
14 | |
15 | |
16 | def load_file(filename): |
17 | - """Load a file into memory with non blocking reads.""" |
18 | - fd = os.open(filename, os.O_RDONLY) |
19 | - fdesc.setNonBlocking(fd) |
20 | - |
21 | - chunks = [] |
22 | - d = defer.Deferred() |
23 | - |
24 | - def read_loop(data=None): |
25 | - """Inner loop.""" |
26 | - if data is not None: |
27 | - chunks.append(data) |
28 | - r = fdesc.readFromFD(fd, read_loop) |
29 | - if isinstance(r, error.ConnectionDone): |
30 | - os.close(fd) |
31 | - d.callback("".join(chunks)) |
32 | - elif r is not None: |
33 | - os.close(fd) |
34 | - d.errback(r) |
35 | - |
36 | - read_loop("") |
37 | - return d |
38 | + """Load a file into memory.""" |
39 | + with open(filename, "r") as f: |
40 | + return f.read() |
41 | |
42 | |
43 | def parse_meminfo(data, prefix="sys.mem"): |
44 | @@ -110,10 +90,7 @@ |
45 | def get_memory_and_cpu(self, prefix="proc"): |
46 | """Report memory and CPU stats for C{process}.""" |
47 | vsize, rss = self.process.get_memory_info() |
48 | - utime, stime = self.process.get_cpu_times() |
49 | result = {prefix + ".cpu.percent": self.process.get_cpu_percent(), |
50 | - prefix + ".cpu.user": utime, |
51 | - prefix + ".cpu.system": stime, |
52 | prefix + ".memory.percent": |
53 | self.process.get_memory_percent(), |
54 | prefix + ".memory.vsize": vsize, |
55 | @@ -122,6 +99,13 @@ |
56 | result[prefix + ".threads"] = self.process.get_num_threads() |
57 | return result |
58 | |
59 | + def get_cpu_counters(self, prefix="proc"): |
60 | + """Report memory and CPU counters for C{process}.""" |
61 | + utime, stime = self.process.get_cpu_times() |
62 | + result = {prefix + ".cpu.user": utime, |
63 | + prefix + ".cpu.system": stime} |
64 | + return result |
65 | + |
66 | def get_io_counters(self, prefix="proc.io"): |
67 | """Report IO statistics for C{process}.""" |
68 | result = {} |
69 | @@ -150,9 +134,34 @@ |
70 | return result |
71 | |
72 | |
73 | +def report_counters(report_function): |
74 | + """ |
75 | + Report difference between last value and current value for wrapped |
76 | + function. |
77 | + """ |
78 | + def generate(): |
79 | + last_value = None |
80 | + while True: |
81 | + result = {} |
82 | + new_value = report_function() |
83 | + if last_value is None: |
84 | + last_value = new_value |
85 | + else: |
86 | + for key, value in new_value.iteritems(): |
87 | + result[key] = value - last_value[key] |
88 | + last_value = new_value |
89 | + yield result |
90 | + generator = generate() |
91 | + def report(): |
92 | + return generator.next() |
93 | + update_wrapper(report, report_function) |
94 | + return report |
95 | + |
96 | + |
97 | process_report = ProcessReport() |
98 | report_process_memory_and_cpu = process_report.get_memory_and_cpu |
99 | -report_process_io_counters = process_report.get_io_counters |
100 | +report_process_cpu_counters = report_counters(process_report.get_cpu_counters) |
101 | +report_process_io_counters = report_counters(process_report.get_io_counters) |
102 | report_process_net_stats = process_report.get_net_stats |
103 | |
104 | |
105 | @@ -188,20 +197,21 @@ |
106 | def report_file_stats(filename, parser): |
107 | """Read statistics from a file and report them.""" |
108 | def report(): |
109 | - deferred = load_file(filename) |
110 | - deferred.addCallback(parser) |
111 | - return deferred |
112 | + return parser(load_file(filename)) |
113 | update_wrapper(report, report_file_stats) |
114 | return report |
115 | |
116 | |
117 | PROCESS_STATS = (report_process_memory_and_cpu,) |
118 | |
119 | +COUNTER_STATS = (report_process_cpu_counters,) |
120 | + |
121 | IO_STATS = (report_process_io_counters,) |
122 | |
123 | NET_STATS = (report_process_net_stats,) |
124 | |
125 | SYSTEM_STATS = (report_file_stats("/proc/meminfo", parse_meminfo), |
126 | report_file_stats("/proc/loadavg", parse_loadavg), |
127 | - report_file_stats("/proc/net/dev", parse_netdev), |
128 | - report_system_stats) |
129 | + report_counters(report_file_stats("/proc/net/dev", parse_netdev)), |
130 | + report_counters(report_system_stats), |
131 | + ) |
132 | |
133 | === modified file 'txstatsd/service.py' |
134 | --- txstatsd/service.py 2012-01-27 16:11:50 +0000 |
135 | +++ txstatsd/service.py 2012-01-28 00:28:23 +0000 |
136 | @@ -11,6 +11,7 @@ |
137 | |
138 | from txstatsd.client import InternalClient |
139 | from txstatsd.metrics.metrics import Metrics |
140 | +from txstatsd.metrics.extendedmetrics import ExtendedMetrics |
141 | from txstatsd.server.processor import MessageProcessor |
142 | from txstatsd.server.configurableprocessor import ConfigurableMessageProcessor |
143 | from txstatsd.server.protocol import ( |
144 | @@ -235,13 +236,13 @@ |
145 | processor = MessageProcessor(plugins=plugin_metrics) |
146 | input_router = Router(processor, options['routing'], root_service) |
147 | connection = InternalClient(input_router) |
148 | - metrics = Metrics(connection, namespace=prefix) |
149 | + metrics = Metrics(connection) |
150 | else: |
151 | processor = ConfigurableMessageProcessor(message_prefix=prefix, |
152 | plugins=plugin_metrics) |
153 | input_router = Router(processor, options['routing'], root_service) |
154 | connection = InternalClient(input_router) |
155 | - metrics = Metrics(connection) |
156 | + metrics = ExtendedMetrics(connection) |
157 | |
158 | if not options["carbon-cache-host"]: |
159 | options["carbon-cache-host"].append("127.0.0.1") |
160 | @@ -259,20 +260,23 @@ |
161 | |
162 | # Schedule updates for those metrics expecting to be |
163 | # periodically updated, for example the meter metric. |
164 | - reporting.schedule(processor.update_metrics, 5, None) |
165 | - reporting.schedule(report_client_manager_stats, 10, metrics.gauge) |
166 | + reporting.schedule(processor.update_metrics, 10, None) |
167 | + reporting.schedule(report_client_manager_stats, |
168 | + options["flush-interval"] / 1000, |
169 | + metrics.gauge) |
170 | |
171 | if options["report"] is not None: |
172 | from txstatsd import process |
173 | from twisted.internet import reactor |
174 | |
175 | reporting.schedule( |
176 | - process.report_reactor_stats(reactor), 10, metrics.gauge) |
177 | + process.report_reactor_stats(reactor), 60, metrics.gauge) |
178 | reports = [name.strip() for name in options["report"].split(",")] |
179 | for report_name in reports: |
180 | for reporter in getattr(process, "%s_STATS" % |
181 | report_name.upper(), ()): |
182 | - reporting.schedule(reporter, 10, metrics.gauge) |
183 | + reporting.schedule(reporter, 60, metrics.gauge) |
184 | + |
185 | |
186 | # XXX Make this configurable. |
187 | router = ConsistentHashingRouter() |
188 | |
189 | === modified file 'txstatsd/tests/test_process.py' |
190 | --- txstatsd/tests/test_process.py 2011-08-08 09:28:45 +0000 |
191 | +++ txstatsd/tests/test_process.py 2012-01-28 00:28:23 +0000 |
192 | @@ -3,12 +3,11 @@ |
193 | import sys |
194 | |
195 | from mocker import MockerTestCase |
196 | -from twisted.internet import defer |
197 | from twisted.trial.unittest import TestCase |
198 | |
199 | from txstatsd.process import ( |
200 | ProcessReport, load_file, parse_meminfo, parse_loadavg, parse_netdev, |
201 | - report_system_stats, report_reactor_stats, report_threadpool_stats) |
202 | + report_system_stats, report_reactor_stats, report_threadpool_stats, report_counters) |
203 | |
204 | |
205 | meminfo = """\ |
206 | @@ -65,18 +64,6 @@ |
207 | class TestSystemPerformance(TestCase, MockerTestCase): |
208 | """Test system performance monitoring.""" |
209 | |
210 | - def assertSuccess(self, deferred, result=None): |
211 | - """ |
212 | - Assert that the given C{deferred} results in the given C{result}. |
213 | - """ |
214 | - self.assertTrue(isinstance(deferred, defer.Deferred)) |
215 | - return deferred.addCallback(self.assertEqual, result) |
216 | - |
217 | - def test_read(self): |
218 | - """We can read files non blocking.""" |
219 | - d = load_file(__file__) |
220 | - return self.assertSuccess(d, open(__file__).read()) |
221 | - |
222 | def test_loadinfo(self): |
223 | """We understand loadinfo.""" |
224 | loadinfo = "1.02 1.08 1.14 2/2015 19420" |
225 | @@ -91,8 +78,8 @@ |
226 | self.assertEqual(r['sys.mem.Buffers'], 8052 * 1024) |
227 | self.assert_('sys.mem.HugePages_Rsvd' not in r) |
228 | |
229 | - def test_statinfo(self): |
230 | - """System stat info is collected through psutil.""" |
231 | + def test_cpu_counters(self): |
232 | + """System cpu counters are collected through psutil.""" |
233 | cpu_times = psutil.cpu_times() |
234 | mock = self.mocker.replace("psutil.cpu_times") |
235 | self.expect(mock()).result(cpu_times) |
236 | @@ -121,39 +108,50 @@ |
237 | self.assertEqual(cpu_times.idle, result["sys.cpu.idle"]) |
238 | self.assertEqual(cpu_times.irq, result["sys.cpu.irq"]) |
239 | |
240 | - def test_self_statinfo(self): |
241 | + def test_self_cpu_and_memory_stats(self): |
242 | """ |
243 | - Process stat info is collected through psutil. |
244 | + Process cpu and memory stats are collected through psutil. |
245 | |
246 | If the L{Process} implementation does not have C{get_num_threads} then |
247 | the number of threads will not be included in the output. |
248 | """ |
249 | process = psutil.Process(os.getpid()) |
250 | vsize, rss = process.get_memory_info() |
251 | - utime, stime = process.get_cpu_times() |
252 | cpu_percent = process.get_cpu_percent() |
253 | memory_percent = process.get_memory_percent() |
254 | |
255 | mock = self.mocker.mock() |
256 | self.expect(mock.get_memory_info()).result((vsize, rss)) |
257 | - self.expect(mock.get_cpu_times()).result((utime, stime)) |
258 | self.expect(mock.get_cpu_percent()).result(cpu_percent) |
259 | self.expect(mock.get_memory_percent()).result(memory_percent) |
260 | self.expect(mock.get_num_threads).result(None) |
261 | self.mocker.replay() |
262 | |
263 | result = ProcessReport(process=mock).get_memory_and_cpu() |
264 | - self.assertEqual(utime, result["proc.cpu.user"]) |
265 | - self.assertEqual(stime, result["proc.cpu.system"]) |
266 | self.assertEqual(cpu_percent, result["proc.cpu.percent"]) |
267 | self.assertEqual(vsize, result["proc.memory.vsize"]) |
268 | self.assertEqual(rss, result["proc.memory.rss"]) |
269 | self.assertEqual(memory_percent, result["proc.memory.percent"]) |
270 | self.failIf("proc.threads" in result) |
271 | |
272 | - def test_self_statinfo_with_num_threads(self): |
273 | - """ |
274 | - Process stat info is collected through psutil. |
275 | + def test_self_cpu_counters(self): |
276 | + """ |
277 | + Process cpu counters are collected through psutil. |
278 | + """ |
279 | + process = psutil.Process(os.getpid()) |
280 | + utime, stime = process.get_cpu_times() |
281 | + |
282 | + mock = self.mocker.mock() |
283 | + self.expect(mock.get_cpu_times()).result((utime, stime)) |
284 | + self.mocker.replay() |
285 | + |
286 | + result = ProcessReport(process=mock).get_cpu_counters() |
287 | + self.assertEqual(utime, result["proc.cpu.user"]) |
288 | + self.assertEqual(stime, result["proc.cpu.system"]) |
289 | + |
290 | + def test_self_cpu_and_memory_stats_with_num_threads(self): |
291 | + """ |
292 | + Process cpu and memory stats are collected through psutil. |
293 | |
294 | If the L{Process} implementation contains C{get_num_threads} then the |
295 | number of threads will be included in the output. |
296 | @@ -161,21 +159,17 @@ |
297 | """ |
298 | process = psutil.Process(os.getpid()) |
299 | vsize, rss = process.get_memory_info() |
300 | - utime, stime = process.get_cpu_times() |
301 | cpu_percent = process.get_cpu_percent() |
302 | memory_percent = process.get_memory_percent() |
303 | |
304 | mock = self.mocker.mock() |
305 | self.expect(mock.get_memory_info()).result((vsize, rss)) |
306 | - self.expect(mock.get_cpu_times()).result((utime, stime)) |
307 | self.expect(mock.get_cpu_percent()).result(cpu_percent) |
308 | self.expect(mock.get_memory_percent()).result(memory_percent) |
309 | self.expect(mock.get_num_threads()).result(1) |
310 | self.mocker.replay() |
311 | |
312 | result = ProcessReport(process=mock).get_memory_and_cpu() |
313 | - self.assertEqual(utime, result["proc.cpu.user"]) |
314 | - self.assertEqual(stime, result["proc.cpu.system"]) |
315 | self.assertEqual(cpu_percent, result["proc.cpu.percent"]) |
316 | self.assertEqual(vsize, result["proc.memory.vsize"]) |
317 | self.assertEqual(rss, result["proc.memory.rss"]) |
318 | @@ -304,3 +298,25 @@ |
319 | "sys.net.tun0.bytes.sent": 5226635, |
320 | "sys.net.tun0.packets.received": 24837, |
321 | "sys.net.tun0.packets.sent": 26986}) |
322 | + |
323 | + def test_report_counters(self): |
324 | + """ |
325 | + C{report_counters} keeps the last value of a called function and on the |
326 | + next call returns the difference between current return value and |
327 | + previous return value. |
328 | + """ |
329 | + def generate(): |
330 | + yield {"foo": 1} |
331 | + yield {"foo": 5} |
332 | + yield {"foo": 10} |
333 | + yield {"foo": 17} |
334 | + generate = generate() |
335 | + def reporter(): |
336 | + return generate.next() |
337 | + wrapped = report_counters(reporter) |
338 | + self.assertEqual({}, wrapped()) |
339 | + self.assertEqual({"foo": 4}, wrapped()) |
340 | + self.assertEqual({"foo": 5}, wrapped()) |
341 | + self.assertEqual({"foo": 7}, wrapped()) |
342 | + |
343 | + |