Merge lp:~tribaal/landscape-client/ceph-usage-report into lp:~landscape/landscape-client/trunk

Proposed by Chris Glass
Status: Merged
Approved by: Chris Glass
Approved revision: 611
Merged at revision: 605
Proposed branch: lp:~tribaal/landscape-client/ceph-usage-report
Merge into: lp:~landscape/landscape-client/trunk
Diff against target: 461 lines (+435/-1)
3 files modified
landscape/message_schemas.py (+7/-1)
landscape/monitor/ceph.py (+150/-0)
landscape/monitor/tests/test_ceph.py (+278/-0)
To merge this branch: bzr merge lp:~tribaal/landscape-client/ceph-usage-report
Reviewer Review Type Date Requested Status
Alberto Donato (community) Approve
Jerry Seutter (community) Approve
Review via email: mp+143048@code.launchpad.net

Commit message

Add a Ceph storage monitor plugin that reports Ceph disk usage to the server

Description of the change

This branch adds a Ceph storage monitor plugin that reports Ceph usage to the server when usage statistics are available on the host. To gather them, it runs "ceph status" (for disk usage) and "ceph quorum_status" (for the ring ID).

This approach was chosen because the Python library, in its current state, does not make it easy to extract this information. It is, however, pretty simple to change that in the future.

To post a comment you must log in.
Revision history for this message
Jerry Seutter (jseutter) wrote :

+1 looks good

Just some lint (pep8) errors:
landscape/message_schemas.py:116:14: E203 whitespace before ':'
landscape/monitor/ceph.py:12:1: E302 expected 2 blank lines, found 1
landscape/monitor/tests/test_ceph.py:5:80: E501 line too long (80 characters)
landscape/monitor/tests/test_ceph.py:223:9: E301 expected 1 blank line, found 0

review: Approve
Revision history for this message
Jerry Seutter (jseutter) wrote :

+1 looks good

Just some lint (pep8) errors:
landscape/message_schemas.py:116:14: E203 whitespace before ':'
landscape/monitor/ceph.py:12:1: E302 expected 2 blank lines, found 1
landscape/monitor/tests/test_ceph.py:5:80: E501 line too long (80 characters)
landscape/monitor/tests/test_ceph.py:223:9: E301 expected 1 blank line, found 0

review: Approve
610. By Chris Glass

lint fixes

Revision history for this message
Chris Glass (tribaal) wrote :

All errors should be fixed.

Revision history for this message
Alberto Donato (ack) wrote :

Looks good! +1

A few nitpicks:

#1:
+ ring_id = self._ceph_ring_id
+ if ring_id is None:
+ ring_id = self._get_ceph_ring_id()
+ self._ceph_ring_id = ring_id

I think you can write this as

+ if self._ceph_ring_id is None:
+ self._ceph_ring_id = self._get_ceph_ring_id()

#2:
+ self.assertNotEqual([], plugin._ceph_usage_points)
+ self.assertEqual([(300, 1.0), (600, 1.0)], plugin._ceph_usage_points)

The first assert is redundant.

#3:
+ if len(message["ceph-usages"]) and message["ring-id"] is not None:

len() can be dropped.

review: Approve
611. By Chris Glass

Fixes all comments.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'landscape/message_schemas.py'
2--- landscape/message_schemas.py 2013-01-09 13:20:10 +0000
3+++ landscape/message_schemas.py 2013-01-15 10:26:22 +0000
4@@ -111,6 +111,11 @@
5 "cpu-usages": List(Tuple(Int(), Float())),
6 })
7
8+CEPH_USAGE = Message("ceph-usage", {
9+ "ceph-usages": List(Tuple(Int(), Float())),
10+ "ring-id": utf8,
11+ })
12+
13 MEMORY_INFO = Message("memory-info", {
14 "memory-info": List(Tuple(Float(), Int(), Int())),
15 })
16@@ -420,5 +425,6 @@
17 ADD_PACKAGES, PACKAGE_REPORTER_RESULT, TEXT_MESSAGE, TEST,
18 CUSTOM_GRAPH, REBOOT_REQUIRED, APT_PREFERENCES, EUCALYPTUS_INFO,
19 EUCALYPTUS_INFO_ERROR, NETWORK_DEVICE, NETWORK_ACTIVITY,
20- REBOOT_REQUIRED_INFO, UPDATE_MANAGER_INFO, CPU_USAGE]:
21+ REBOOT_REQUIRED_INFO, UPDATE_MANAGER_INFO, CPU_USAGE,
22+ CEPH_USAGE]:
23 message_schemas[schema.type] = schema
24
25=== added file 'landscape/monitor/ceph.py'
26--- landscape/monitor/ceph.py 1970-01-01 00:00:00 +0000
27+++ landscape/monitor/ceph.py 2013-01-15 10:26:22 +0000
28@@ -0,0 +1,150 @@
29+import time
30+import os
31+
32+from landscape.accumulate import Accumulator
33+from landscape.lib.monitor import CoverageMonitor
34+from landscape.lib.command import run_command, CommandError
35+from landscape.monitor.plugin import MonitorPlugin
36+
37+ACCUMULATOR_KEY = "ceph-usage-accumulator"
38+CEPH_CONFIG_FILE = "/etc/ceph/ceph.conf"
39+
40+
41+class CephUsage(MonitorPlugin):
42+ """
43+ Plugin that captures Ceph usage information. This only works if the client
44+ runs on one of the Ceph monitor nodes, and it noops otherwise.
45+ """
46+ persist_name = "ceph-usage"
47+ # Prevent the Plugin base-class from scheduling looping calls.
48+ run_interval = None
49+
50+ def __init__(self, interval=30, monitor_interval=60 * 60,
51+ create_time=time.time):
52+ self._interval = interval
53+ self._monitor_interval = monitor_interval
54+ self._ceph_usage_points = []
55+ self._ceph_ring_id = None
56+ self._create_time = create_time
57+ self._ceph_config = CEPH_CONFIG_FILE
58+
59+ def register(self, registry):
60+ super(CephUsage, self).register(registry)
61+ self._accumulate = Accumulator(self._persist, registry.step_size)
62+
63+ self.registry.reactor.call_every(self._interval, self.run)
64+
65+ self._monitor = CoverageMonitor(self._interval, 0.8,
66+ "Ceph usage snapshot",
67+ create_time=self._create_time)
68+ self.registry.reactor.call_every(self._monitor_interval,
69+ self._monitor.log)
70+ self.registry.reactor.call_on("stop", self._monitor.log, priority=2000)
71+ self.call_on_accepted("ceph-usage", self.send_message, True)
72+
73+ def create_message(self):
74+ ceph_points = self._ceph_usage_points
75+ ring_id = self._ceph_ring_id
76+ self._ceph_usage_points = []
77+ return {"type": "ceph-usage", "ceph-usages": ceph_points,
78+ "ring-id": ring_id}
79+
80+ def send_message(self, urgent=False):
81+ message = self.create_message()
82+ if message["ceph-usages"] and message["ring-id"] is not None:
83+ self.registry.broker.send_message(message, urgent=urgent)
84+
85+ def exchange(self, urgent=False):
86+ self.registry.broker.call_if_accepted("ceph-usage",
87+ self.send_message, urgent)
88+
89+ def run(self):
90+ self._monitor.ping()
91+
92+ config_file = self._ceph_config
93+ # Check if a ceph config file is available. No need to run anything
94+ # if we know that we're not on a Ceph monitor node anyway.
95+ if not os.path.exists(config_file):
96+ # There is no config file - it's not a ceph machine.
97+ return None
98+
99+ # Extract the ceph ring Id and cache it.
100+ if self._ceph_ring_id is None:
101+ self._ceph_ring_id = self._get_ceph_ring_id()
102+
103+ new_timestamp = int(self._create_time())
104+ new_ceph_usage = self._get_ceph_usage()
105+
106+ step_data = None
107+ if new_ceph_usage is not None:
108+ step_data = self._accumulate(new_timestamp, new_ceph_usage,
109+ ACCUMULATOR_KEY)
110+ if step_data is not None:
111+ self._ceph_usage_points.append(step_data)
112+
113+ def _get_ceph_usage(self):
114+ """
115+ Grab the ceph usage data by parsing the output of the "ceph status"
116+ command output.
117+ """
118+ output = self._get_ceph_command_output()
119+
120+ if output is None:
121+ return None
122+
123+ lines = output.split("\n")
124+
125+ pg_line = None
126+ for line in lines:
127+ if "pgmap" in line:
128+ pg_line = line.split()
129+ break
130+
131+ if pg_line is None:
132+ return None
133+
134+ total = pg_line[-3] # Total space
135+ available = pg_line[-6] # Available for objects
136+ #used = pg_line[-9] # Used by objects
137+ # Note: used + available is NOT equal to total (there is some used
138+ # space for duplication and system info etc...)
139+
140+ filled = int(total) - int(available)
141+
142+ return filled / float(total)
143+
144+ def _get_ceph_command_output(self):
145+ try:
146+ output = run_command("ceph status")
147+ except (OSError, CommandError):
148+ # If the command line client isn't available, we assume it's not
149+ # a ceph monitor machine.
150+ return None
151+ return output
152+
153+ def _get_ceph_ring_id(self):
154+ output = self._get_quorum_command_output()
155+ lines = output.split("\n")
156+ fsid_line = None
157+ for line in lines:
158+ if "fsid" in line:
159+ fsid_line = line.split()
160+ break
161+
162+ if fsid_line is None:
163+ return None
164+
165+ wrapped_id = fsid_line[-1]
166+ ring_id = wrapped_id.replace('",', '')
167+ ring_id = ring_id.replace('"', '')
168+
169+ return ring_id
170+
171+ def _get_quorum_command_output(self):
172+ try:
173+ output = run_command("ceph quorum_status")
174+ except (OSError, CommandError):
175+ # If the command line client isn't available, we assume it's not
176+ # a ceph monitor machine.
177+ return None
178+ return output
179
180=== added file 'landscape/monitor/tests/test_ceph.py'
181--- landscape/monitor/tests/test_ceph.py 1970-01-01 00:00:00 +0000
182+++ landscape/monitor/tests/test_ceph.py 2013-01-15 10:26:22 +0000
183@@ -0,0 +1,278 @@
184+from landscape.tests.helpers import LandscapeTest, MonitorHelper
185+from landscape.monitor.ceph import CephUsage
186+
187+
188+SAMPLE_TEMPLATE = (" health HEALTH_WARN 6 pgs degraded; 6 pgs stuck "
189+"unclean\n"
190+"monmap e2: 3 mons at {server-269703f4-5217-495a-b7f2-b3b3473c1719="
191+"10.55.60.238:6789/0,server-3f370698-f3b0-4cbe-8db9-a18e304c952b="
192+"10.55.60.141:6789/0,server-f635fa07-e36f-453c-b3d5-b4ce86fbc6ff="
193+"10.55.60.241:6789/0}, election epoch 8, quorum 0,1,2 "
194+"server-269703f4-5217-495a-b7f2-b3b3473c1719,"
195+"server-3f370698-f3b0-4cbe-8db9-a18e304c952b,"
196+"server-f635fa07-e36f-453c-b3d5-b4ce86fbc6ff\n "
197+"osdmap e9: 3 osds: 3 up, 3 in\n "
198+"pgmap v114: 192 pgs: 186 active+clean, 6 active+degraded; "
199+"0 bytes data, %s MB used, %s MB / %s MB avail\n "
200+"mdsmap e1: 0/0/1 up\n\n")
201+
202+SAMPLE_OUTPUT = SAMPLE_TEMPLATE % (4296, 53880, 61248)
203+
204+SAMPLE_QUORUM = (''
205+'{ "election_epoch": 8,\n'
206+' "quorum": [\n'
207+' 0,\n'
208+' 1,\n'
209+' 2],\n'
210+' "monmap": { "epoch": 2,\n'
211+' "fsid": "%s",\n'
212+' "modified": "2013-01-13 16:58:00.141737",\n'
213+' "created": "0.000000",\n'
214+' "mons": [\n'
215+' { "rank": 0,\n'
216+' "name": "server-1be72d64-0ff2-4ac1-ad13-1c06c8201011",\n'
217+' "addr": "10.55.60.188:6789\/0"},\n'
218+' { "rank": 1,\n'
219+' "name": "server-e847f147-ed13-46c2-8e6d-768aa32657ab",\n'
220+' "addr": "10.55.60.202:6789\/0"},\n'
221+' { "rank": 2,\n'
222+' "name": "server-3c831a0b-51d5-43a9-95d5-63644f0965cc",\n'
223+' "addr": "10.55.60.205:6789\/0"}]}}\n'
224+)
225+
226+SAMPLE_QUORUM_OUTPUT = SAMPLE_QUORUM % "ecbb8960-0e21-11e2-b495-83a88f44db01"
227+
228+
229+class CephUsagePluginTest(LandscapeTest):
230+ helpers = [MonitorHelper]
231+
232+ def test_get_ceph_usage_if_command_not_found(self):
233+ """
234+ When the ceph command cannot be found or accessed, the
235+ C{_get_ceph_usage} method returns None.
236+ """
237+ plugin = CephUsage(create_time=self.reactor.time)
238+
239+ def return_none():
240+ return None
241+
242+ plugin._get_ceph_command_output = return_none
243+
244+ self.monitor.add(plugin)
245+
246+ result = plugin._get_ceph_usage()
247+ self.assertIs(None, result)
248+
249+ def test_get_ceph_usage(self):
250+ """
251+ When the ceph command call returns output, the _get_ceph_usage method
252+ returns the percentage of used space.
253+ """
254+ plugin = CephUsage(create_time=self.reactor.time)
255+
256+ def return_output():
257+ return SAMPLE_OUTPUT
258+
259+ plugin._get_ceph_command_output = return_output
260+
261+ self.monitor.add(plugin)
262+
263+ result = plugin._get_ceph_usage()
264+ self.assertEqual(0.12029780564263323, result)
265+
266+ def test_get_ceph_usage_empty_disk(self):
267+ """
268+ When the ceph command call returns output for empty disks, the
269+ _get_ceph_usage method returns 0.0 .
270+ """
271+ plugin = CephUsage(create_time=self.reactor.time)
272+
273+ def return_output():
274+ return SAMPLE_TEMPLATE % (0, 100, 100)
275+
276+ plugin._get_ceph_command_output = return_output
277+
278+ self.monitor.add(plugin)
279+
280+ result = plugin._get_ceph_usage()
281+ self.assertEqual(0.0, result)
282+
283+ def test_get_ceph_usage_full_disk(self):
284+ """
285+ When the ceph command call returns output for empty disks, the
286+ _get_ceph_usage method returns 1.0 .
287+ """
288+ plugin = CephUsage(create_time=self.reactor.time)
289+
290+ def return_output():
291+ return SAMPLE_TEMPLATE % (100, 0, 100)
292+
293+ plugin._get_ceph_command_output = return_output
294+
295+ self.monitor.add(plugin)
296+
297+ result = plugin._get_ceph_usage()
298+ self.assertEqual(1.0, result)
299+
300+ def test_get_ceph_usage_no_information(self):
301+ """
302+ When the ceph command outputs something that does not contain the
303+ disk usage information, the _get_ceph_usage method returns None.
304+ """
305+ plugin = CephUsage(create_time=self.reactor.time)
306+
307+ def return_output():
308+ return "Blah\nblah"
309+
310+ plugin._get_ceph_command_output = return_output
311+
312+ self.monitor.add(plugin)
313+
314+ result = plugin._get_ceph_usage()
315+ self.assertEqual(None, result)
316+
317+ def test_never_exchange_empty_messages(self):
318+ """
319+ The plugin will create a message with an empty
320+ C{ceph-usages} list when no previous data is available. If an empty
321+ message is created during exchange, it should not be queued.
322+ """
323+ self.mstore.set_accepted_types(["ceph-usage"])
324+
325+ plugin = CephUsage(create_time=self.reactor.time)
326+ self.monitor.add(plugin)
327+
328+ self.monitor.exchange()
329+ self.assertEqual(len(self.mstore.get_pending_messages()), 0)
330+
331+ def test_exchange_messages(self):
332+ """
333+ The Ceph usage plugin queues message when manager.exchange()
334+ is called.
335+ """
336+ ring_id = "whatever"
337+ self.mstore.set_accepted_types(["ceph-usage"])
338+
339+ plugin = CephUsage(create_time=self.reactor.time)
340+ plugin._ceph_usage_points = [(60, 1.0)]
341+ plugin._ceph_ring_id = ring_id
342+ self.monitor.add(plugin)
343+
344+ self.monitor.exchange()
345+
346+ self.assertMessages(self.mstore.get_pending_messages(),
347+ [{"type": "ceph-usage",
348+ "ceph-usages": [(60, 1.0)],
349+ "ring-id": ring_id}])
350+
351+ def test_create_message(self):
352+ """
353+ Calling create_message returns an expected message.
354+ """
355+ plugin = CephUsage(create_time=self.reactor.time)
356+ self.monitor.add(plugin)
357+
358+ ring_id = "blah"
359+ plugin._ceph_usage_points = []
360+ plugin._ceph_ring_id = ring_id
361+ message = plugin.create_message()
362+
363+ self.assertIn("type", message)
364+ self.assertEqual(message["type"], "ceph-usage")
365+ self.assertIn("ceph-usages", message)
366+ self.assertEqual(ring_id, message["ring-id"])
367+ ceph_usages = message["ceph-usages"]
368+ self.assertEqual(len(ceph_usages), 0)
369+
370+ point = (60, 1.0)
371+ plugin._ceph_usage_points = [point]
372+ message = plugin.create_message()
373+ self.assertIn("type", message)
374+ self.assertEqual(message["type"], "ceph-usage")
375+ self.assertIn("ceph-usages", message)
376+ self.assertEqual(ring_id, message["ring-id"])
377+ ceph_usages = message["ceph-usages"]
378+ self.assertEqual(len(ceph_usages), 1)
379+ self.assertEqual([point], ceph_usages)
380+
381+ def test_no_message_if_not_accepted(self):
382+ """
383+ Don't add any messages at all if the broker isn't currently
384+ accepting their type.
385+ """
386+ interval = 30
387+
388+ plugin = CephUsage(create_time=self.reactor.time,
389+ interval=interval)
390+
391+ self.monitor.add(plugin)
392+
393+ self.reactor.advance(self.monitor.step_size * 2)
394+ self.monitor.exchange()
395+
396+ self.mstore.set_accepted_types(["ceph-usage"])
397+ self.assertMessages(list(self.mstore.get_pending_messages()), [])
398+
399+ def test_get_ceph_ring_id(self):
400+ """
401+ When given a well formatted command output, the _get_ceph_ring_id()
402+ method returns the correct ring_id.
403+ """
404+ plugin = CephUsage(create_time=self.reactor.time)
405+
406+ uuid = "i-am-a-uuid"
407+
408+ def return_output():
409+ return SAMPLE_QUORUM % uuid
410+
411+ plugin._get_quorum_command_output = return_output
412+
413+ self.monitor.add(plugin)
414+
415+ result = plugin._get_ceph_ring_id()
416+ self.assertEqual(uuid, result)
417+
418+ def test_get_ceph_ring_id_no_information(self):
419+ """
420+ When the _get_quorum_command_output method returns something without
421+ the ring uuid information present, the _get-ceph_ring_id method returns
422+ None.
423+ """
424+ plugin = CephUsage(create_time=self.reactor.time)
425+
426+ def return_output():
427+ return "Blah\nblah"
428+
429+ plugin._get_quorum_command_output = return_output
430+
431+ self.monitor.add(plugin)
432+
433+ result = plugin._get_ceph_ring_id()
434+ self.assertEqual(None, result)
435+
436+ def test_plugin_run(self):
437+ """
438+ The plugin's run() method fills the _ceph_usage_points with
439+ accumulated samples after each C{monitor.step_size} period.
440+ The _ceph_ring_id member of the plugin is also filled with the output
441+ of the _get_ceph_ring_id method.
442+ """
443+ plugin = CephUsage(create_time=self.reactor.time)
444+ uuid = "i-am-a-unique-snowflake"
445+
446+ def return_quorum():
447+ return SAMPLE_QUORUM % uuid
448+
449+ def return_full_disk():
450+ return SAMPLE_TEMPLATE % (100, 0, 100)
451+
452+ plugin._ceph_config = "/etc/hosts"
453+ plugin._get_quorum_command_output = return_quorum
454+ plugin._get_ceph_command_output = return_full_disk
455+
456+ self.monitor.add(plugin)
457+
458+ self.reactor.advance(self.monitor.step_size * 2)
459+
460+ self.assertEqual([(300, 1.0), (600, 1.0)], plugin._ceph_usage_points)
461+ self.assertEqual(uuid, plugin._ceph_ring_id)

Subscribers

People subscribed via source and target branches

to all changes: