Merge lp:~tribaal/landscape-client/ceph-usage-report into lp:~landscape/landscape-client/trunk
- ceph-usage-report
- Merge into trunk
Status: | Merged | ||||
---|---|---|---|---|---|
Approved by: | Chris Glass | ||||
Approved revision: | 611 | ||||
Merged at revision: | 605 | ||||
Proposed branch: | lp:~tribaal/landscape-client/ceph-usage-report | ||||
Merge into: | lp:~landscape/landscape-client/trunk | ||||
Diff against target: |
461 lines (+435/-1) 3 files modified
landscape/message_schemas.py (+7/-1) landscape/monitor/ceph.py (+150/-0) landscape/monitor/tests/test_ceph.py (+278/-0) |
||||
To merge this branch: | bzr merge lp:~tribaal/landscape-client/ceph-usage-report | ||||
Related bugs: |
|
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Alberto Donato (community) | Approve | ||
Jerry Seutter (community) | Approve | ||
Review via email: mp+143048@code.launchpad.net |
Commit message
Add a Ceph storage monitor plugin that reports Ceph disk usage to the server
Description of the change
This branch adds a Ceph storage monitor plugin that reports ceph usage to the server if usage statistics are available from the host (namely, it tries running "ceph status" and "ceph quorum_status" to gather ring information).
This approach was chosen because the current state of the python library does not allow to extract this information easily. It is however pretty simple to change that in the future.
Jerry Seutter (jseutter) wrote : | # |
+1 looks good
Just some lintian errors:
landscape/message_schemas.py:116:14: E203 whitespace before ':'
landscape/monitor/ceph.py:12:1: E302 expected 2 blank lines, found 1
landscape/monitor/tests/test_ceph.py:5:80: E501 line too long (80 characters)
landscape/monitor/tests/test_ceph.py:223:9: E301 expected 1 blank line, found 0
- 610. By Chris Glass
-
lint fixes
Chris Glass (tribaal) wrote : | # |
All errors should be fixed.
Alberto Donato (ack) wrote : | # |
Looks good! +1
A few nitpicks:
#1:
+ ring_id = self._ceph_ring_id
+ if ring_id is None:
+ ring_id = self._get_ceph_ring_id()
+ self._ceph_ring_id = ring_id
I think you can write this as
+ if self._ceph_ring_id is None:
+ self._ceph_ring_id = self._get_ceph_ring_id()
#2:
+ self.assertNotEqual(None, result)
+ self.assertEqual(uuid, result)
The first assert is redundant.
#3:
+ if len(message["ceph-usages"]):
len() can be dropped.
- 611. By Chris Glass
-
Fixes all comments.
Preview Diff
1 | === modified file 'landscape/message_schemas.py' |
2 | --- landscape/message_schemas.py 2013-01-09 13:20:10 +0000 |
3 | +++ landscape/message_schemas.py 2013-01-15 10:26:22 +0000 |
4 | @@ -111,6 +111,11 @@ |
5 | "cpu-usages": List(Tuple(Int(), Float())), |
6 | }) |
7 | |
8 | +CEPH_USAGE = Message("ceph-usage", { |
9 | + "ceph-usages": List(Tuple(Int(), Float())), |
10 | + "ring-id": utf8, |
11 | + }) |
12 | + |
13 | MEMORY_INFO = Message("memory-info", { |
14 | "memory-info": List(Tuple(Float(), Int(), Int())), |
15 | }) |
16 | @@ -420,5 +425,6 @@ |
17 | ADD_PACKAGES, PACKAGE_REPORTER_RESULT, TEXT_MESSAGE, TEST, |
18 | CUSTOM_GRAPH, REBOOT_REQUIRED, APT_PREFERENCES, EUCALYPTUS_INFO, |
19 | EUCALYPTUS_INFO_ERROR, NETWORK_DEVICE, NETWORK_ACTIVITY, |
20 | - REBOOT_REQUIRED_INFO, UPDATE_MANAGER_INFO, CPU_USAGE]: |
21 | + REBOOT_REQUIRED_INFO, UPDATE_MANAGER_INFO, CPU_USAGE, |
22 | + CEPH_USAGE]: |
23 | message_schemas[schema.type] = schema |
24 | |
25 | === added file 'landscape/monitor/ceph.py' |
26 | --- landscape/monitor/ceph.py 1970-01-01 00:00:00 +0000 |
27 | +++ landscape/monitor/ceph.py 2013-01-15 10:26:22 +0000 |
28 | @@ -0,0 +1,150 @@ |
29 | +import time |
30 | +import os |
31 | + |
32 | +from landscape.accumulate import Accumulator |
33 | +from landscape.lib.monitor import CoverageMonitor |
34 | +from landscape.lib.command import run_command, CommandError |
35 | +from landscape.monitor.plugin import MonitorPlugin |
36 | + |
37 | +ACCUMULATOR_KEY = "ceph-usage-accumulator" |
38 | +CEPH_CONFIG_FILE = "/etc/ceph/ceph.conf" |
39 | + |
40 | + |
41 | +class CephUsage(MonitorPlugin): |
42 | + """ |
43 | + Plugin that captures Ceph usage information. This only works if the client |
44 | + runs on one of the Ceph monitor nodes, and it noops otherwise. |
45 | + """ |
46 | + persist_name = "ceph-usage" |
47 | + # Prevent the Plugin base-class from scheduling looping calls. |
48 | + run_interval = None |
49 | + |
50 | + def __init__(self, interval=30, monitor_interval=60 * 60, |
51 | + create_time=time.time): |
52 | + self._interval = interval |
53 | + self._monitor_interval = monitor_interval |
54 | + self._ceph_usage_points = [] |
55 | + self._ceph_ring_id = None |
56 | + self._create_time = create_time |
57 | + self._ceph_config = CEPH_CONFIG_FILE |
58 | + |
59 | + def register(self, registry): |
60 | + super(CephUsage, self).register(registry) |
61 | + self._accumulate = Accumulator(self._persist, registry.step_size) |
62 | + |
63 | + self.registry.reactor.call_every(self._interval, self.run) |
64 | + |
65 | + self._monitor = CoverageMonitor(self._interval, 0.8, |
66 | + "Ceph usage snapshot", |
67 | + create_time=self._create_time) |
68 | + self.registry.reactor.call_every(self._monitor_interval, |
69 | + self._monitor.log) |
70 | + self.registry.reactor.call_on("stop", self._monitor.log, priority=2000) |
71 | + self.call_on_accepted("ceph-usage", self.send_message, True) |
72 | + |
73 | + def create_message(self): |
74 | + ceph_points = self._ceph_usage_points |
75 | + ring_id = self._ceph_ring_id |
76 | + self._ceph_usage_points = [] |
77 | + return {"type": "ceph-usage", "ceph-usages": ceph_points, |
78 | + "ring-id": ring_id} |
79 | + |
80 | + def send_message(self, urgent=False): |
81 | + message = self.create_message() |
82 | + if message["ceph-usages"] and message["ring-id"] is not None: |
83 | + self.registry.broker.send_message(message, urgent=urgent) |
84 | + |
85 | + def exchange(self, urgent=False): |
86 | + self.registry.broker.call_if_accepted("ceph-usage", |
87 | + self.send_message, urgent) |
88 | + |
89 | + def run(self): |
90 | + self._monitor.ping() |
91 | + |
92 | + config_file = self._ceph_config |
93 | + # Check if a ceph config file is available. No need to run anything |
94 | + # if we know that we're not on a Ceph monitor node anyway. |
95 | + if not os.path.exists(config_file): |
96 | + # There is no config file - it's not a ceph machine. |
97 | + return None |
98 | + |
99 | + # Extract the ceph ring Id and cache it. |
100 | + if self._ceph_ring_id is None: |
101 | + self._ceph_ring_id = self._get_ceph_ring_id() |
102 | + |
103 | + new_timestamp = int(self._create_time()) |
104 | + new_ceph_usage = self._get_ceph_usage() |
105 | + |
106 | + step_data = None |
107 | + if new_ceph_usage is not None: |
108 | + step_data = self._accumulate(new_timestamp, new_ceph_usage, |
109 | + ACCUMULATOR_KEY) |
110 | + if step_data is not None: |
111 | + self._ceph_usage_points.append(step_data) |
112 | + |
113 | + def _get_ceph_usage(self): |
114 | + """ |
115 | + Grab the ceph usage data by parsing the output of the "ceph status" |
116 | + command output. |
117 | + """ |
118 | + output = self._get_ceph_command_output() |
119 | + |
120 | + if output is None: |
121 | + return None |
122 | + |
123 | + lines = output.split("\n") |
124 | + |
125 | + pg_line = None |
126 | + for line in lines: |
127 | + if "pgmap" in line: |
128 | + pg_line = line.split() |
129 | + break |
130 | + |
131 | + if pg_line is None: |
132 | + return None |
133 | + |
134 | + total = pg_line[-3] # Total space |
135 | + available = pg_line[-6] # Available for objects |
136 | + #used = pg_line[-9] # Used by objects |
137 | + # Note: used + available is NOT equal to total (there is some used |
138 | + # space for duplication and system info etc...) |
139 | + |
140 | + filled = int(total) - int(available) |
141 | + |
142 | + return filled / float(total) |
143 | + |
144 | + def _get_ceph_command_output(self): |
145 | + try: |
146 | + output = run_command("ceph status") |
147 | + except (OSError, CommandError): |
148 | + # If the command line client isn't available, we assume it's not |
149 | + # a ceph monitor machine. |
150 | + return None |
151 | + return output |
152 | + |
153 | + def _get_ceph_ring_id(self): |
154 | + output = self._get_quorum_command_output() |
155 | + lines = output.split("\n") |
156 | + fsid_line = None |
157 | + for line in lines: |
158 | + if "fsid" in line: |
159 | + fsid_line = line.split() |
160 | + break |
161 | + |
162 | + if fsid_line is None: |
163 | + return None |
164 | + |
165 | + wrapped_id = fsid_line[-1] |
166 | + ring_id = wrapped_id.replace('",', '') |
167 | + ring_id = ring_id.replace('"', '') |
168 | + |
169 | + return ring_id |
170 | + |
171 | + def _get_quorum_command_output(self): |
172 | + try: |
173 | + output = run_command("ceph quorum_status") |
174 | + except (OSError, CommandError): |
175 | + # If the command line client isn't available, we assume it's not |
176 | + # a ceph monitor machine. |
177 | + return None |
178 | + return output |
179 | |
180 | === added file 'landscape/monitor/tests/test_ceph.py' |
181 | --- landscape/monitor/tests/test_ceph.py 1970-01-01 00:00:00 +0000 |
182 | +++ landscape/monitor/tests/test_ceph.py 2013-01-15 10:26:22 +0000 |
183 | @@ -0,0 +1,278 @@ |
184 | +from landscape.tests.helpers import LandscapeTest, MonitorHelper |
185 | +from landscape.monitor.ceph import CephUsage |
186 | + |
187 | + |
188 | +SAMPLE_TEMPLATE = (" health HEALTH_WARN 6 pgs degraded; 6 pgs stuck " |
189 | +"unclean\n" |
190 | +"monmap e2: 3 mons at {server-269703f4-5217-495a-b7f2-b3b3473c1719=" |
191 | +"10.55.60.238:6789/0,server-3f370698-f3b0-4cbe-8db9-a18e304c952b=" |
192 | +"10.55.60.141:6789/0,server-f635fa07-e36f-453c-b3d5-b4ce86fbc6ff=" |
193 | +"10.55.60.241:6789/0}, election epoch 8, quorum 0,1,2 " |
194 | +"server-269703f4-5217-495a-b7f2-b3b3473c1719," |
195 | +"server-3f370698-f3b0-4cbe-8db9-a18e304c952b," |
196 | +"server-f635fa07-e36f-453c-b3d5-b4ce86fbc6ff\n " |
197 | +"osdmap e9: 3 osds: 3 up, 3 in\n " |
198 | +"pgmap v114: 192 pgs: 186 active+clean, 6 active+degraded; " |
199 | +"0 bytes data, %s MB used, %s MB / %s MB avail\n " |
200 | +"mdsmap e1: 0/0/1 up\n\n") |
201 | + |
202 | +SAMPLE_OUTPUT = SAMPLE_TEMPLATE % (4296, 53880, 61248) |
203 | + |
204 | +SAMPLE_QUORUM = ('' |
205 | +'{ "election_epoch": 8,\n' |
206 | +' "quorum": [\n' |
207 | +' 0,\n' |
208 | +' 1,\n' |
209 | +' 2],\n' |
210 | +' "monmap": { "epoch": 2,\n' |
211 | +' "fsid": "%s",\n' |
212 | +' "modified": "2013-01-13 16:58:00.141737",\n' |
213 | +' "created": "0.000000",\n' |
214 | +' "mons": [\n' |
215 | +' { "rank": 0,\n' |
216 | +' "name": "server-1be72d64-0ff2-4ac1-ad13-1c06c8201011",\n' |
217 | +' "addr": "10.55.60.188:6789\/0"},\n' |
218 | +' { "rank": 1,\n' |
219 | +' "name": "server-e847f147-ed13-46c2-8e6d-768aa32657ab",\n' |
220 | +' "addr": "10.55.60.202:6789\/0"},\n' |
221 | +' { "rank": 2,\n' |
222 | +' "name": "server-3c831a0b-51d5-43a9-95d5-63644f0965cc",\n' |
223 | +' "addr": "10.55.60.205:6789\/0"}]}}\n' |
224 | +) |
225 | + |
226 | +SAMPLE_QUORUM_OUTPUT = SAMPLE_QUORUM % "ecbb8960-0e21-11e2-b495-83a88f44db01" |
227 | + |
228 | + |
229 | +class CephUsagePluginTest(LandscapeTest): |
230 | + helpers = [MonitorHelper] |
231 | + |
232 | + def test_get_ceph_usage_if_command_not_found(self): |
233 | + """ |
234 | + When the ceph command cannot be found or accessed, the |
235 | + C{_get_ceph_usage} method returns None. |
236 | + """ |
237 | + plugin = CephUsage(create_time=self.reactor.time) |
238 | + |
239 | + def return_none(): |
240 | + return None |
241 | + |
242 | + plugin._get_ceph_command_output = return_none |
243 | + |
244 | + self.monitor.add(plugin) |
245 | + |
246 | + result = plugin._get_ceph_usage() |
247 | + self.assertIs(None, result) |
248 | + |
249 | + def test_get_ceph_usage(self): |
250 | + """ |
251 | + When the ceph command call returns output, the _get_ceph_usage method |
252 | + returns the percentage of used space. |
253 | + """ |
254 | + plugin = CephUsage(create_time=self.reactor.time) |
255 | + |
256 | + def return_output(): |
257 | + return SAMPLE_OUTPUT |
258 | + |
259 | + plugin._get_ceph_command_output = return_output |
260 | + |
261 | + self.monitor.add(plugin) |
262 | + |
263 | + result = plugin._get_ceph_usage() |
264 | + self.assertEqual(0.12029780564263323, result) |
265 | + |
266 | + def test_get_ceph_usage_empty_disk(self): |
267 | + """ |
268 | + When the ceph command call returns output for empty disks, the |
269 | + _get_ceph_usage method returns 0.0 . |
270 | + """ |
271 | + plugin = CephUsage(create_time=self.reactor.time) |
272 | + |
273 | + def return_output(): |
274 | + return SAMPLE_TEMPLATE % (0, 100, 100) |
275 | + |
276 | + plugin._get_ceph_command_output = return_output |
277 | + |
278 | + self.monitor.add(plugin) |
279 | + |
280 | + result = plugin._get_ceph_usage() |
281 | + self.assertEqual(0.0, result) |
282 | + |
283 | + def test_get_ceph_usage_full_disk(self): |
284 | + """ |
285 | + When the ceph command call returns output for empty disks, the |
286 | + _get_ceph_usage method returns 1.0 . |
287 | + """ |
288 | + plugin = CephUsage(create_time=self.reactor.time) |
289 | + |
290 | + def return_output(): |
291 | + return SAMPLE_TEMPLATE % (100, 0, 100) |
292 | + |
293 | + plugin._get_ceph_command_output = return_output |
294 | + |
295 | + self.monitor.add(plugin) |
296 | + |
297 | + result = plugin._get_ceph_usage() |
298 | + self.assertEqual(1.0, result) |
299 | + |
300 | + def test_get_ceph_usage_no_information(self): |
301 | + """ |
302 | + When the ceph command outputs something that does not contain the |
303 | + disk usage information, the _get_ceph_usage method returns None. |
304 | + """ |
305 | + plugin = CephUsage(create_time=self.reactor.time) |
306 | + |
307 | + def return_output(): |
308 | + return "Blah\nblah" |
309 | + |
310 | + plugin._get_ceph_command_output = return_output |
311 | + |
312 | + self.monitor.add(plugin) |
313 | + |
314 | + result = plugin._get_ceph_usage() |
315 | + self.assertEqual(None, result) |
316 | + |
317 | + def test_never_exchange_empty_messages(self): |
318 | + """ |
319 | + The plugin will create a message with an empty |
320 | + C{ceph-usages} list when no previous data is available. If an empty |
321 | + message is created during exchange, it should not be queued. |
322 | + """ |
323 | + self.mstore.set_accepted_types(["ceph-usage"]) |
324 | + |
325 | + plugin = CephUsage(create_time=self.reactor.time) |
326 | + self.monitor.add(plugin) |
327 | + |
328 | + self.monitor.exchange() |
329 | + self.assertEqual(len(self.mstore.get_pending_messages()), 0) |
330 | + |
331 | + def test_exchange_messages(self): |
332 | + """ |
333 | + The Ceph usage plugin queues message when manager.exchange() |
334 | + is called. |
335 | + """ |
336 | + ring_id = "whatever" |
337 | + self.mstore.set_accepted_types(["ceph-usage"]) |
338 | + |
339 | + plugin = CephUsage(create_time=self.reactor.time) |
340 | + plugin._ceph_usage_points = [(60, 1.0)] |
341 | + plugin._ceph_ring_id = ring_id |
342 | + self.monitor.add(plugin) |
343 | + |
344 | + self.monitor.exchange() |
345 | + |
346 | + self.assertMessages(self.mstore.get_pending_messages(), |
347 | + [{"type": "ceph-usage", |
348 | + "ceph-usages": [(60, 1.0)], |
349 | + "ring-id": ring_id}]) |
350 | + |
351 | + def test_create_message(self): |
352 | + """ |
353 | + Calling create_message returns an expected message. |
354 | + """ |
355 | + plugin = CephUsage(create_time=self.reactor.time) |
356 | + self.monitor.add(plugin) |
357 | + |
358 | + ring_id = "blah" |
359 | + plugin._ceph_usage_points = [] |
360 | + plugin._ceph_ring_id = ring_id |
361 | + message = plugin.create_message() |
362 | + |
363 | + self.assertIn("type", message) |
364 | + self.assertEqual(message["type"], "ceph-usage") |
365 | + self.assertIn("ceph-usages", message) |
366 | + self.assertEqual(ring_id, message["ring-id"]) |
367 | + ceph_usages = message["ceph-usages"] |
368 | + self.assertEqual(len(ceph_usages), 0) |
369 | + |
370 | + point = (60, 1.0) |
371 | + plugin._ceph_usage_points = [point] |
372 | + message = plugin.create_message() |
373 | + self.assertIn("type", message) |
374 | + self.assertEqual(message["type"], "ceph-usage") |
375 | + self.assertIn("ceph-usages", message) |
376 | + self.assertEqual(ring_id, message["ring-id"]) |
377 | + ceph_usages = message["ceph-usages"] |
378 | + self.assertEqual(len(ceph_usages), 1) |
379 | + self.assertEqual([point], ceph_usages) |
380 | + |
381 | + def test_no_message_if_not_accepted(self): |
382 | + """ |
383 | + Don't add any messages at all if the broker isn't currently |
384 | + accepting their type. |
385 | + """ |
386 | + interval = 30 |
387 | + |
388 | + plugin = CephUsage(create_time=self.reactor.time, |
389 | + interval=interval) |
390 | + |
391 | + self.monitor.add(plugin) |
392 | + |
393 | + self.reactor.advance(self.monitor.step_size * 2) |
394 | + self.monitor.exchange() |
395 | + |
396 | + self.mstore.set_accepted_types(["ceph-usage"]) |
397 | + self.assertMessages(list(self.mstore.get_pending_messages()), []) |
398 | + |
399 | + def test_get_ceph_ring_id(self): |
400 | + """ |
401 | + When given a well formatted command output, the _get_ceph_ring_id() |
402 | + method returns the correct ring_id. |
403 | + """ |
404 | + plugin = CephUsage(create_time=self.reactor.time) |
405 | + |
406 | + uuid = "i-am-a-uuid" |
407 | + |
408 | + def return_output(): |
409 | + return SAMPLE_QUORUM % uuid |
410 | + |
411 | + plugin._get_quorum_command_output = return_output |
412 | + |
413 | + self.monitor.add(plugin) |
414 | + |
415 | + result = plugin._get_ceph_ring_id() |
416 | + self.assertEqual(uuid, result) |
417 | + |
418 | + def test_get_ceph_ring_id_no_information(self): |
419 | + """ |
420 | + When the _get_quorum_command_output method returns something without |
421 | + the ring uuid information present, the _get-ceph_ring_id method returns |
422 | + None. |
423 | + """ |
424 | + plugin = CephUsage(create_time=self.reactor.time) |
425 | + |
426 | + def return_output(): |
427 | + return "Blah\nblah" |
428 | + |
429 | + plugin._get_quorum_command_output = return_output |
430 | + |
431 | + self.monitor.add(plugin) |
432 | + |
433 | + result = plugin._get_ceph_ring_id() |
434 | + self.assertEqual(None, result) |
435 | + |
436 | + def test_plugin_run(self): |
437 | + """ |
438 | + The plugin's run() method fills the _ceph_usage_points with |
439 | + accumulated samples after each C{monitor.step_size} period. |
440 | + The _ceph_ring_id member of the plugin is also filled with the output |
441 | + of the _get_ceph_ring_id method. |
442 | + """ |
443 | + plugin = CephUsage(create_time=self.reactor.time) |
444 | + uuid = "i-am-a-unique-snowflake" |
445 | + |
446 | + def return_quorum(): |
447 | + return SAMPLE_QUORUM % uuid |
448 | + |
449 | + def return_full_disk(): |
450 | + return SAMPLE_TEMPLATE % (100, 0, 100) |
451 | + |
452 | + plugin._ceph_config = "/etc/hosts" |
453 | + plugin._get_quorum_command_output = return_quorum |
454 | + plugin._get_ceph_command_output = return_full_disk |
455 | + |
456 | + self.monitor.add(plugin) |
457 | + |
458 | + self.reactor.advance(self.monitor.step_size * 2) |
459 | + |
460 | + self.assertEqual([(300, 1.0), (600, 1.0)], plugin._ceph_usage_points) |
461 | + self.assertEqual(uuid, plugin._ceph_ring_id) |
+1 looks good
Just some lintian errors:
landscape/message_schemas.py:116:14: E203 whitespace before ':'
landscape/monitor/ceph.py:12:1: E302 expected 2 blank lines, found 1
landscape/monitor/tests/test_ceph.py:5:80: E501 line too long (80 characters)
landscape/monitor/tests/test_ceph.py:223:9: E301 expected 1 blank line, found 0