Merge lp:~tribaal/landscape-client/ceph-usage-report into lp:~landscape/landscape-client/trunk

Proposed by Chris Glass
Status: Merged
Approved by: Chris Glass
Approved revision: 611
Merged at revision: 605
Proposed branch: lp:~tribaal/landscape-client/ceph-usage-report
Merge into: lp:~landscape/landscape-client/trunk
Diff against target: 461 lines (+435/-1)
3 files modified
landscape/message_schemas.py (+7/-1)
landscape/monitor/ceph.py (+150/-0)
landscape/monitor/tests/test_ceph.py (+278/-0)
To merge this branch: bzr merge lp:~tribaal/landscape-client/ceph-usage-report
Reviewer Review Type Date Requested Status
Alberto Donato (community) Approve
Jerry Seutter (community) Approve
Review via email: mp+143048@code.launchpad.net

Commit message

Add a Ceph storage monitor plugin that reports Ceph disk usage to the server

Description of the change

This branch adds a Ceph storage monitor plugin that reports Ceph usage to the server if usage statistics are available from the host (namely, it tries running "ceph status" to gather usage data and "ceph quorum_status" to gather the ring ID).

This approach was chosen because the current state of the Python library does not make it easy to extract this information. It should, however, be pretty simple to change that in the future.

To post a comment you must log in.
Revision history for this message
Jerry Seutter (jseutter) wrote :

+1 looks good

Just some lint (pep8) errors:
landscape/message_schemas.py:116:14: E203 whitespace before ':'
landscape/monitor/ceph.py:12:1: E302 expected 2 blank lines, found 1
landscape/monitor/tests/test_ceph.py:5:80: E501 line too long (80 characters)
landscape/monitor/tests/test_ceph.py:223:9: E301 expected 1 blank line, found 0

review: Approve
Revision history for this message
Jerry Seutter (jseutter) wrote :

+1 looks good

Just some lint (pep8) errors:
landscape/message_schemas.py:116:14: E203 whitespace before ':'
landscape/monitor/ceph.py:12:1: E302 expected 2 blank lines, found 1
landscape/monitor/tests/test_ceph.py:5:80: E501 line too long (80 characters)
landscape/monitor/tests/test_ceph.py:223:9: E301 expected 1 blank line, found 0

review: Approve
610. By Chris Glass

lint fixes

Revision history for this message
Chris Glass (tribaal) wrote :

All errors should be fixed.

Revision history for this message
Alberto Donato (ack) wrote :

Looks good! +1

A few nitpicks:

#1:
+ ring_id = self._ceph_ring_id
+ if ring_id is None:
+ ring_id = self._get_ceph_ring_id()
+ self._ceph_ring_id = ring_id

I think you can write this as

+ if self._ceph_ring_id is None:
+ self._ceph_ring_id = self._get_ceph_ring_id()

#2:
+ self.assertNotEqual([], plugin._ceph_usage_points)
+ self.assertEqual([(300, 1.0), (600, 1.0)], plugin._ceph_usage_points)

The first assert is redundant.

#3:
+ if len(message["ceph-usages"]) and message["ring-id"] is not None:

len() can be dropped.

review: Approve
611. By Chris Glass

Fixes all comments.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'landscape/message_schemas.py'
--- landscape/message_schemas.py 2013-01-09 13:20:10 +0000
+++ landscape/message_schemas.py 2013-01-15 10:26:22 +0000
@@ -111,6 +111,11 @@
111 "cpu-usages": List(Tuple(Int(), Float())),111 "cpu-usages": List(Tuple(Int(), Float())),
112 })112 })
113113
114CEPH_USAGE = Message("ceph-usage", {
115 "ceph-usages": List(Tuple(Int(), Float())),
116 "ring-id": utf8,
117 })
118
114MEMORY_INFO = Message("memory-info", {119MEMORY_INFO = Message("memory-info", {
115 "memory-info": List(Tuple(Float(), Int(), Int())),120 "memory-info": List(Tuple(Float(), Int(), Int())),
116 })121 })
@@ -420,5 +425,6 @@
420 ADD_PACKAGES, PACKAGE_REPORTER_RESULT, TEXT_MESSAGE, TEST,425 ADD_PACKAGES, PACKAGE_REPORTER_RESULT, TEXT_MESSAGE, TEST,
421 CUSTOM_GRAPH, REBOOT_REQUIRED, APT_PREFERENCES, EUCALYPTUS_INFO,426 CUSTOM_GRAPH, REBOOT_REQUIRED, APT_PREFERENCES, EUCALYPTUS_INFO,
422 EUCALYPTUS_INFO_ERROR, NETWORK_DEVICE, NETWORK_ACTIVITY,427 EUCALYPTUS_INFO_ERROR, NETWORK_DEVICE, NETWORK_ACTIVITY,
423 REBOOT_REQUIRED_INFO, UPDATE_MANAGER_INFO, CPU_USAGE]:428 REBOOT_REQUIRED_INFO, UPDATE_MANAGER_INFO, CPU_USAGE,
429 CEPH_USAGE]:
424 message_schemas[schema.type] = schema430 message_schemas[schema.type] = schema
425431
=== added file 'landscape/monitor/ceph.py'
--- landscape/monitor/ceph.py 1970-01-01 00:00:00 +0000
+++ landscape/monitor/ceph.py 2013-01-15 10:26:22 +0000
@@ -0,0 +1,150 @@
1import time
2import os
3
4from landscape.accumulate import Accumulator
5from landscape.lib.monitor import CoverageMonitor
6from landscape.lib.command import run_command, CommandError
7from landscape.monitor.plugin import MonitorPlugin
8
9ACCUMULATOR_KEY = "ceph-usage-accumulator"
10CEPH_CONFIG_FILE = "/etc/ceph/ceph.conf"
11
12
13class CephUsage(MonitorPlugin):
14 """
15 Plugin that captures Ceph usage information. This only works if the client
16 runs on one of the Ceph monitor nodes, and is a no-op otherwise.
17 """
18 persist_name = "ceph-usage"
19 # Prevent the Plugin base-class from scheduling looping calls.
20 run_interval = None
21
22 def __init__(self, interval=30, monitor_interval=60 * 60,
23 create_time=time.time):
24 self._interval = interval
25 self._monitor_interval = monitor_interval
26 self._ceph_usage_points = []
27 self._ceph_ring_id = None
28 self._create_time = create_time
29 self._ceph_config = CEPH_CONFIG_FILE
30
31 def register(self, registry):
32 super(CephUsage, self).register(registry)
33 self._accumulate = Accumulator(self._persist, registry.step_size)
34
35 self.registry.reactor.call_every(self._interval, self.run)
36
37 self._monitor = CoverageMonitor(self._interval, 0.8,
38 "Ceph usage snapshot",
39 create_time=self._create_time)
40 self.registry.reactor.call_every(self._monitor_interval,
41 self._monitor.log)
42 self.registry.reactor.call_on("stop", self._monitor.log, priority=2000)
43 self.call_on_accepted("ceph-usage", self.send_message, True)
44
45 def create_message(self):
46 ceph_points = self._ceph_usage_points
47 ring_id = self._ceph_ring_id
48 self._ceph_usage_points = []
49 return {"type": "ceph-usage", "ceph-usages": ceph_points,
50 "ring-id": ring_id}
51
52 def send_message(self, urgent=False):
53 message = self.create_message()
54 if message["ceph-usages"] and message["ring-id"] is not None:
55 self.registry.broker.send_message(message, urgent=urgent)
56
57 def exchange(self, urgent=False):
58 self.registry.broker.call_if_accepted("ceph-usage",
59 self.send_message, urgent)
60
61 def run(self):
62 self._monitor.ping()
63
64 config_file = self._ceph_config
65 # Check if a ceph config file is available. No need to run anything
66 # if we know that we're not on a Ceph monitor node anyway.
67 if not os.path.exists(config_file):
68 # There is no config file - it's not a ceph machine.
69 return None
70
71 # Extract the ceph ring Id and cache it.
72 if self._ceph_ring_id is None:
73 self._ceph_ring_id = self._get_ceph_ring_id()
74
75 new_timestamp = int(self._create_time())
76 new_ceph_usage = self._get_ceph_usage()
77
78 step_data = None
79 if new_ceph_usage is not None:
80 step_data = self._accumulate(new_timestamp, new_ceph_usage,
81 ACCUMULATOR_KEY)
82 if step_data is not None:
83 self._ceph_usage_points.append(step_data)
84
85 def _get_ceph_usage(self):
86 """
87 Grab the ceph usage data by parsing the output of the "ceph status"
88 command.
89 """
90 output = self._get_ceph_command_output()
91
92 if output is None:
93 return None
94
95 lines = output.split("\n")
96
97 pg_line = None
98 for line in lines:
99 if "pgmap" in line:
100 pg_line = line.split()
101 break
102
103 if pg_line is None:
104 return None
105
106 total = pg_line[-3] # Total space
107 available = pg_line[-6] # Available for objects
108 #used = pg_line[-9] # Used by objects
109 # Note: used + available is NOT equal to total (there is some used
110 # space for duplication and system info etc...)
111
112 filled = int(total) - int(available)
113
114 return filled / float(total)
115
116 def _get_ceph_command_output(self):
117 try:
118 output = run_command("ceph status")
119 except (OSError, CommandError):
120 # If the command line client isn't available, we assume it's not
121 # a ceph monitor machine.
122 return None
123 return output
124
125 def _get_ceph_ring_id(self):
126 output = self._get_quorum_command_output()
127 lines = output.split("\n")
128 fsid_line = None
129 for line in lines:
130 if "fsid" in line:
131 fsid_line = line.split()
132 break
133
134 if fsid_line is None:
135 return None
136
137 wrapped_id = fsid_line[-1]
138 ring_id = wrapped_id.replace('",', '')
139 ring_id = ring_id.replace('"', '')
140
141 return ring_id
142
143 def _get_quorum_command_output(self):
144 try:
145 output = run_command("ceph quorum_status")
146 except (OSError, CommandError):
147 # If the command line client isn't available, we assume it's not
148 # a ceph monitor machine.
149 return None
150 return output
0151
=== added file 'landscape/monitor/tests/test_ceph.py'
--- landscape/monitor/tests/test_ceph.py 1970-01-01 00:00:00 +0000
+++ landscape/monitor/tests/test_ceph.py 2013-01-15 10:26:22 +0000
@@ -0,0 +1,278 @@
1from landscape.tests.helpers import LandscapeTest, MonitorHelper
2from landscape.monitor.ceph import CephUsage
3
4
5SAMPLE_TEMPLATE = (" health HEALTH_WARN 6 pgs degraded; 6 pgs stuck "
6"unclean\n"
7"monmap e2: 3 mons at {server-269703f4-5217-495a-b7f2-b3b3473c1719="
8"10.55.60.238:6789/0,server-3f370698-f3b0-4cbe-8db9-a18e304c952b="
9"10.55.60.141:6789/0,server-f635fa07-e36f-453c-b3d5-b4ce86fbc6ff="
10"10.55.60.241:6789/0}, election epoch 8, quorum 0,1,2 "
11"server-269703f4-5217-495a-b7f2-b3b3473c1719,"
12"server-3f370698-f3b0-4cbe-8db9-a18e304c952b,"
13"server-f635fa07-e36f-453c-b3d5-b4ce86fbc6ff\n "
14"osdmap e9: 3 osds: 3 up, 3 in\n "
15"pgmap v114: 192 pgs: 186 active+clean, 6 active+degraded; "
16"0 bytes data, %s MB used, %s MB / %s MB avail\n "
17"mdsmap e1: 0/0/1 up\n\n")
18
19SAMPLE_OUTPUT = SAMPLE_TEMPLATE % (4296, 53880, 61248)
20
21SAMPLE_QUORUM = (''
22'{ "election_epoch": 8,\n'
23' "quorum": [\n'
24' 0,\n'
25' 1,\n'
26' 2],\n'
27' "monmap": { "epoch": 2,\n'
28' "fsid": "%s",\n'
29' "modified": "2013-01-13 16:58:00.141737",\n'
30' "created": "0.000000",\n'
31' "mons": [\n'
32' { "rank": 0,\n'
33' "name": "server-1be72d64-0ff2-4ac1-ad13-1c06c8201011",\n'
34' "addr": "10.55.60.188:6789\/0"},\n'
35' { "rank": 1,\n'
36' "name": "server-e847f147-ed13-46c2-8e6d-768aa32657ab",\n'
37' "addr": "10.55.60.202:6789\/0"},\n'
38' { "rank": 2,\n'
39' "name": "server-3c831a0b-51d5-43a9-95d5-63644f0965cc",\n'
40' "addr": "10.55.60.205:6789\/0"}]}}\n'
41)
42
43SAMPLE_QUORUM_OUTPUT = SAMPLE_QUORUM % "ecbb8960-0e21-11e2-b495-83a88f44db01"
44
45
46class CephUsagePluginTest(LandscapeTest):
47 helpers = [MonitorHelper]
48
49 def test_get_ceph_usage_if_command_not_found(self):
50 """
51 When the ceph command cannot be found or accessed, the
52 C{_get_ceph_usage} method returns None.
53 """
54 plugin = CephUsage(create_time=self.reactor.time)
55
56 def return_none():
57 return None
58
59 plugin._get_ceph_command_output = return_none
60
61 self.monitor.add(plugin)
62
63 result = plugin._get_ceph_usage()
64 self.assertIs(None, result)
65
66 def test_get_ceph_usage(self):
67 """
68 When the ceph command call returns output, the _get_ceph_usage method
69 returns the fraction of used space (a float between 0 and 1).
70 """
71 plugin = CephUsage(create_time=self.reactor.time)
72
73 def return_output():
74 return SAMPLE_OUTPUT
75
76 plugin._get_ceph_command_output = return_output
77
78 self.monitor.add(plugin)
79
80 result = plugin._get_ceph_usage()
81 self.assertEqual(0.12029780564263323, result)
82
83 def test_get_ceph_usage_empty_disk(self):
84 """
85 When the ceph command call returns output for empty disks, the
86 _get_ceph_usage method returns 0.0.
87 """
88 plugin = CephUsage(create_time=self.reactor.time)
89
90 def return_output():
91 return SAMPLE_TEMPLATE % (0, 100, 100)
92
93 plugin._get_ceph_command_output = return_output
94
95 self.monitor.add(plugin)
96
97 result = plugin._get_ceph_usage()
98 self.assertEqual(0.0, result)
99
100 def test_get_ceph_usage_full_disk(self):
101 """
102 When the ceph command call returns output for full disks, the
103 _get_ceph_usage method returns 1.0.
104 """
105 plugin = CephUsage(create_time=self.reactor.time)
106
107 def return_output():
108 return SAMPLE_TEMPLATE % (100, 0, 100)
109
110 plugin._get_ceph_command_output = return_output
111
112 self.monitor.add(plugin)
113
114 result = plugin._get_ceph_usage()
115 self.assertEqual(1.0, result)
116
117 def test_get_ceph_usage_no_information(self):
118 """
119 When the ceph command outputs something that does not contain the
120 disk usage information, the _get_ceph_usage method returns None.
121 """
122 plugin = CephUsage(create_time=self.reactor.time)
123
124 def return_output():
125 return "Blah\nblah"
126
127 plugin._get_ceph_command_output = return_output
128
129 self.monitor.add(plugin)
130
131 result = plugin._get_ceph_usage()
132 self.assertEqual(None, result)
133
134 def test_never_exchange_empty_messages(self):
135 """
136 The plugin will create a message with an empty
137 C{ceph-usages} list when no previous data is available. If an empty
138 message is created during exchange, it should not be queued.
139 """
140 self.mstore.set_accepted_types(["ceph-usage"])
141
142 plugin = CephUsage(create_time=self.reactor.time)
143 self.monitor.add(plugin)
144
145 self.monitor.exchange()
146 self.assertEqual(len(self.mstore.get_pending_messages()), 0)
147
148 def test_exchange_messages(self):
149 """
150 The Ceph usage plugin queues a message when monitor.exchange()
151 is called.
152 """
153 ring_id = "whatever"
154 self.mstore.set_accepted_types(["ceph-usage"])
155
156 plugin = CephUsage(create_time=self.reactor.time)
157 plugin._ceph_usage_points = [(60, 1.0)]
158 plugin._ceph_ring_id = ring_id
159 self.monitor.add(plugin)
160
161 self.monitor.exchange()
162
163 self.assertMessages(self.mstore.get_pending_messages(),
164 [{"type": "ceph-usage",
165 "ceph-usages": [(60, 1.0)],
166 "ring-id": ring_id}])
167
168 def test_create_message(self):
169 """
170 Calling create_message returns an expected message.
171 """
172 plugin = CephUsage(create_time=self.reactor.time)
173 self.monitor.add(plugin)
174
175 ring_id = "blah"
176 plugin._ceph_usage_points = []
177 plugin._ceph_ring_id = ring_id
178 message = plugin.create_message()
179
180 self.assertIn("type", message)
181 self.assertEqual(message["type"], "ceph-usage")
182 self.assertIn("ceph-usages", message)
183 self.assertEqual(ring_id, message["ring-id"])
184 ceph_usages = message["ceph-usages"]
185 self.assertEqual(len(ceph_usages), 0)
186
187 point = (60, 1.0)
188 plugin._ceph_usage_points = [point]
189 message = plugin.create_message()
190 self.assertIn("type", message)
191 self.assertEqual(message["type"], "ceph-usage")
192 self.assertIn("ceph-usages", message)
193 self.assertEqual(ring_id, message["ring-id"])
194 ceph_usages = message["ceph-usages"]
195 self.assertEqual(len(ceph_usages), 1)
196 self.assertEqual([point], ceph_usages)
197
198 def test_no_message_if_not_accepted(self):
199 """
200 Don't add any messages at all if the broker isn't currently
201 accepting their type.
202 """
203 interval = 30
204
205 plugin = CephUsage(create_time=self.reactor.time,
206 interval=interval)
207
208 self.monitor.add(plugin)
209
210 self.reactor.advance(self.monitor.step_size * 2)
211 self.monitor.exchange()
212
213 self.mstore.set_accepted_types(["ceph-usage"])
214 self.assertMessages(list(self.mstore.get_pending_messages()), [])
215
216 def test_get_ceph_ring_id(self):
217 """
218 When given a well formatted command output, the _get_ceph_ring_id()
219 method returns the correct ring_id.
220 """
221 plugin = CephUsage(create_time=self.reactor.time)
222
223 uuid = "i-am-a-uuid"
224
225 def return_output():
226 return SAMPLE_QUORUM % uuid
227
228 plugin._get_quorum_command_output = return_output
229
230 self.monitor.add(plugin)
231
232 result = plugin._get_ceph_ring_id()
233 self.assertEqual(uuid, result)
234
235 def test_get_ceph_ring_id_no_information(self):
236 """
237 When the _get_quorum_command_output method returns something without
238 the ring uuid information present, the _get_ceph_ring_id method returns
239 None.
240 """
241 plugin = CephUsage(create_time=self.reactor.time)
242
243 def return_output():
244 return "Blah\nblah"
245
246 plugin._get_quorum_command_output = return_output
247
248 self.monitor.add(plugin)
249
250 result = plugin._get_ceph_ring_id()
251 self.assertEqual(None, result)
252
253 def test_plugin_run(self):
254 """
255 The plugin's run() method fills the _ceph_usage_points with
256 accumulated samples after each C{monitor.step_size} period.
257 The _ceph_ring_id member of the plugin is also filled with the output
258 of the _get_ceph_ring_id method.
259 """
260 plugin = CephUsage(create_time=self.reactor.time)
261 uuid = "i-am-a-unique-snowflake"
262
263 def return_quorum():
264 return SAMPLE_QUORUM % uuid
265
266 def return_full_disk():
267 return SAMPLE_TEMPLATE % (100, 0, 100)
268
269 plugin._ceph_config = "/etc/hosts"
270 plugin._get_quorum_command_output = return_quorum
271 plugin._get_ceph_command_output = return_full_disk
272
273 self.monitor.add(plugin)
274
275 self.reactor.advance(self.monitor.step_size * 2)
276
277 self.assertEqual([(300, 1.0), (600, 1.0)], plugin._ceph_usage_points)
278 self.assertEqual(uuid, plugin._ceph_ring_id)

Subscribers

People subscribed via source and target branches

to all changes: