Merge lp:~ack/landscape-client/ceph-monitor-plugin into lp:~landscape/landscape-client/trunk

Proposed by Alberto Donato
Status: Merged
Approved by: Alberto Donato
Approved revision: 694
Merged at revision: 684
Proposed branch: lp:~ack/landscape-client/ceph-monitor-plugin
Merge into: lp:~landscape/landscape-client/trunk
Diff against target: 939 lines (+282/-410)
7 files modified
landscape/lib/command.py (+0/-38)
landscape/lib/tests/test_command.py (+0/-32)
landscape/manager/config.py (+1/-1)
landscape/manager/tests/test_config.py (+2/-3)
landscape/monitor/cephusage.py (+108/-93)
landscape/monitor/config.py (+2/-1)
landscape/monitor/tests/test_cephusage.py (+169/-242)
To merge this branch: bzr merge lp:~ack/landscape-client/ceph-monitor-plugin
Reviewer Review Type Date Requested Status
Free Ekanayaka (community) Approve
Chris Glass (community) Approve
Review via email: mp+167056@code.launchpad.net

Commit message

This changes CephUsage from a manager plugin to a monitor plugin, so it no longer runs as root.
It now looks for a landscape-specific ceph config and key in /var/lib/landscape/client/ceph-client, which are passed to the "ceph" command.

Description of the change

This changes CephUsage from a manager plugin to a monitor plugin, so it no longer runs as root.
It now looks for a landscape-specific ceph config and key in /var/lib/landscape/client/ceph-client, which are passed to the "ceph" command.

The landscape-client charm at lp:~ack/charms/precise/landscape-client/ceph-client-interface creates the required ceph config for the plugin.
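
For reference, a minimal sketch of what the plugin ends up consuming and running (paths as in this branch; the monitor address and keyring location are illustrative placeholders):

    # /var/lib/landscape/client/ceph-client/ceph.landscape-client.conf
    [global]
    auth supported = cephx
    keyring = <keyring-file>
    mon host = <monitor-ip>:6789

    # Invocation performed by the plugin:
    ceph --conf /var/lib/landscape/client/ceph-client/ceph.landscape-client.conf \
        --id landscape-client status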

Revision history for this message
Chris Glass (tribaal) wrote :

Nice, it's much cleaner as a monitor plugin! +1
Thanks for refactoring the tests too :)

[1]
Please add some docstring comments on the monitor plugin class to explain how to generate a usable ceph key, since some people might need or want to use the ceph feature of Landscape with a legacy or non-charm ceph installation.
There is already some description of how it works in the docstring for _run_ceph_command(), but it would be good to have a quick refresher in the class/module docstring as well.

review: Approve
Revision history for this message
Free Ekanayaka (free.ekanayaka) wrote :

Looks good, +1!

#1

+ The configured keyring can be generated with:
+
+ ceph-authtool <keyring-file> --create-keyring
+ --name=client.landscape-client --add-key=<key>

Please add something like "This will be done automatically by the landscape-client charm, when deploying as a ceph node subordinate".
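
For a non-charm or legacy setup that would boil down to something like (keyring path and key are illustrative placeholders):

    ceph-authtool /var/lib/landscape/client/ceph-client/ceph.client.landscape-client.keyring \
        --create-keyring --name=client.landscape-client --add-key=<key>

with the resulting file referenced from the "keyring =" entry of ceph.landscape-client.conf.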

#2

+ output = run_command(" ".join(command))

To be picky, this should use Twisted's process protocol machinery (we have a helper for that), otherwise the reactor will block. That matters if the ceph command hangs, for example. It can be addressed in a separate branch.
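
For illustration, the non-blocking shape this takes with the in-tree helper (a sketch mirroring what the current diff does; error handling beyond the exit-code check is omitted):

    from landscape.lib.twisted_util import spawn_process

    def _run_ceph_command(self, *args):
        # Run "ceph" with the landscape-specific config and client id.
        params = ["--conf", self._ceph_config, "--id", "landscape-client"]
        params.extend(args)
        # spawn_process returns a Deferred, so the reactor is not blocked.
        deferred = spawn_process("ceph", args=params)
        # A non-zero exit status (e.g. ceph CLI missing, not a monitor
        # node) is mapped to None.
        deferred.addCallback(
            lambda (out, err, code): out if code == 0 else None)
        return deferred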

#3

Please mark the tests where you access private methods as whitebox ("test_wb_xxx").

review: Approve
Revision history for this message
🤖 Landscape Builder (landscape-builder) wrote :

There are additional revisions which have not been approved in review. Please seek review and approval of these new revisions.

Preview Diff

1=== removed file 'landscape/lib/command.py'
2--- landscape/lib/command.py 2009-03-27 13:07:24 +0000
3+++ landscape/lib/command.py 1970-01-01 00:00:00 +0000
4@@ -1,38 +0,0 @@
5-"""Shell commands execution."""
6-import commands
7-
8-class CommandError(Exception):
9- """
10- Raised by L{run_command} in case of non-0 exit code.
11-
12- @cvar command: The shell command that failed.
13- @cvar exit_status: Its non-zero exit status.
14- @cvar output: The command's output.
15- """
16- def __init__(self, command, exit_status, output):
17- self.command = command
18- self.exit_status = exit_status
19- self.output = output
20-
21- def __str__(self):
22- return "'%s' exited with status %d (%s)" % (
23- self.command, self.exit_status, self.output)
24-
25- def __repr__(self):
26- return "<CommandError command=<%s> exit_status=%d output=<%s>>" % (
27- self.command, self.exit_status, self.output)
28-
29-
30-def run_command(command):
31- """
32- Execute a command in a shell and return the command's output.
33-
34- If the command's exit status is not 0 a L{CommandError} exception
35- is raised.
36- """
37- exit_status, output = commands.getstatusoutput(command)
38- # shift down 8 bits to get shell-like exit codes
39- exit_status = exit_status >> 8
40- if exit_status != 0:
41- raise CommandError(command, exit_status, output)
42- return output
43
44=== removed file 'landscape/lib/tests/test_command.py'
45--- landscape/lib/tests/test_command.py 2011-07-05 05:09:11 +0000
46+++ landscape/lib/tests/test_command.py 1970-01-01 00:00:00 +0000
47@@ -1,32 +0,0 @@
48-from landscape.tests.helpers import LandscapeTest
49-
50-from landscape.lib.command import run_command, CommandError
51-
52-
53-class CommandTest(LandscapeTest):
54-
55- def setUp(self):
56- super(CommandTest, self).setUp()
57-
58- def test_basic(self):
59- self.assertEqual(run_command("echo test"), "test")
60-
61- def test_non_0_exit_status(self):
62- try:
63- run_command("false")
64- except CommandError, error:
65- self.assertEqual(error.command, "false")
66- self.assertEqual(error.output, "")
67- self.assertEqual(error.exit_status, 1)
68- else:
69- self.fail("CommandError not raised")
70-
71- def test_error_str(self):
72- self.assertEqual(str(CommandError("test_command", 1, "test output")),
73- "'test_command' exited with status 1 "
74- "(test output)")
75-
76- def test_error_repr(self):
77- self.assertEqual(repr(CommandError("test_command", 1, "test output")),
78- "<CommandError command=<test_command> "
79- "exit_status=1 output=<test output>>")
80
81=== modified file 'landscape/manager/config.py'
82--- landscape/manager/config.py 2013-04-04 11:08:12 +0000
83+++ landscape/manager/config.py 2013-06-04 16:15:35 +0000
84@@ -6,7 +6,7 @@
85
86 ALL_PLUGINS = ["ProcessKiller", "PackageManager", "UserManager",
87 "ShutdownManager", "AptSources", "HardwareInfo",
88- "CephUsage", "KeystoneToken", "HAService"]
89+ "KeystoneToken", "HAService"]
90
91
92 class ManagerConfiguration(Configuration):
93
94=== modified file 'landscape/manager/tests/test_config.py'
95--- landscape/manager/tests/test_config.py 2013-04-04 11:38:19 +0000
96+++ landscape/manager/tests/test_config.py 2013-06-04 16:15:35 +0000
97@@ -12,9 +12,8 @@
98 def test_plugin_factories(self):
99 """By default all plugins are enabled."""
100 self.assertEqual(["ProcessKiller", "PackageManager", "UserManager",
101- "ShutdownManager", "AptSources",
102- "HardwareInfo", "CephUsage", "KeystoneToken",
103- "HAService"],
104+ "ShutdownManager", "AptSources", "HardwareInfo",
105+ "KeystoneToken", "HAService"],
106 ALL_PLUGINS)
107 self.assertEqual(ALL_PLUGINS, self.config.plugin_factories)
108
109
110=== renamed file 'landscape/manager/cephusage.py' => 'landscape/monitor/cephusage.py'
111--- landscape/manager/cephusage.py 2013-04-02 14:56:19 +0000
112+++ landscape/monitor/cephusage.py 2013-06-04 16:15:35 +0000
113@@ -4,63 +4,70 @@
114 import logging
115 import re
116
117+from twisted.internet.defer import inlineCallbacks, returnValue
118+
119 from landscape.accumulate import Accumulator
120 from landscape.lib.monitor import CoverageMonitor
121-from landscape.lib.command import run_command, CommandError
122-from landscape.lib.persist import Persist
123-from landscape.manager.plugin import ManagerPlugin
124-
125-ACCUMULATOR_KEY = "ceph-usage-accumulator"
126-CEPH_CONFIG_FILE = "/etc/ceph/ceph.conf"
127-
128-EXP = re.compile(".*pgmap.*data, (\d+) MB used, (\d+) MB / (\d+) MB avail.*",
129- flags=re.S)
130-
131-
132-class CephUsage(ManagerPlugin):
133+from landscape.lib.twisted_util import spawn_process
134+from landscape.monitor.plugin import MonitorPlugin
135+
136+
137+class CephUsage(MonitorPlugin):
138 """
139 Plugin that captures Ceph usage information. This only works if the client
140 runs on one of the Ceph monitor nodes, and it noops otherwise.
141+
142+ The plugin requires the 'ceph' command to be available, which is run with a
143+ config file in <data_path>/ceph-client/ceph.landscape-client.conf with the
144+ following config:
145+
146+ [global]
147+ auth supported = cephx
148+ keyring = <keyring-file>
149+ mon host = <ip>:6789
150+
151+ The configured keyring can be generated with:
152+
153+ ceph-authtool <keyring-file> --create-keyring
154+ --name=client.landscape-client --add-key=<key>
155+
156+ The landscape-client charm automatically provides the client configuration
157+ and key when deployed as subordinate of a ceph node.
158 """
159+
160 persist_name = "ceph-usage"
161 # Prevent the Plugin base-class from scheduling looping calls.
162 run_interval = None
163
164- def __init__(self, interval=30, exchange_interval=60 * 60,
165+ _usage_regexp = re.compile(
166+ ".*pgmap.*data, (\d+) MB used, (\d+) MB / (\d+) MB avail.*",
167+ flags=re.S)
168+
169+ def __init__(self, interval=30, monitor_interval=60 * 60,
170 create_time=time.time):
171 self._interval = interval
172- self._exchange_interval = exchange_interval
173+ self._monitor_interval = monitor_interval
174 self._ceph_usage_points = []
175 self._ceph_ring_id = None
176 self._create_time = create_time
177- self._ceph_config = CEPH_CONFIG_FILE
178+ self._ceph_config = None
179
180 def register(self, registry):
181 super(CephUsage, self).register(registry)
182+ self._ceph_config = os.path.join(
183+ self.registry.config.data_path, "ceph-client",
184+ "ceph.landscape-client.conf")
185+
186+ self._accumulate = Accumulator(self._persist, self._interval)
187+ self._monitor = CoverageMonitor(
188+ self._interval, 0.8, "Ceph usage snapshot",
189+ create_time=self._create_time)
190+
191 self.registry.reactor.call_every(self._interval, self.run)
192-
193- self._persist_filename = os.path.join(self.registry.config.data_path,
194- "ceph.bpickle")
195- self._persist = Persist(filename=self._persist_filename)
196-
197- self._accumulate = Accumulator(self._persist, self._interval)
198-
199- self._monitor = CoverageMonitor(self._interval, 0.8,
200- "Ceph usage snapshot",
201- create_time=self._create_time)
202- self.registry.reactor.call_every(self._exchange_interval,
203- self._monitor.log)
204+ self.registry.reactor.call_every(
205+ self._monitor_interval, self._monitor.log)
206 self.registry.reactor.call_on("stop", self._monitor.log, priority=2000)
207 self.call_on_accepted("ceph-usage", self.send_message, True)
208- self.registry.reactor.call_on("resynchronize", self._resynchronize)
209- self.registry.reactor.call_every(self.registry.config.flush_interval,
210- self.flush)
211-
212- def _resynchronize(self):
213- self._persist.remove(self.persist_name)
214-
215- def flush(self):
216- self._persist.save(self._persist_filename)
217
218 def create_message(self):
219 ceph_points = self._ceph_usage_points
220@@ -75,30 +82,31 @@
221 self.registry.broker.send_message(message, urgent=urgent)
222
223 def exchange(self, urgent=False):
224- self.registry.broker.call_if_accepted("ceph-usage",
225- self.send_message, urgent)
226+ self.registry.broker.call_if_accepted(
227+ "ceph-usage", self.send_message, urgent)
228
229+ @inlineCallbacks
230 def run(self):
231 self._monitor.ping()
232
233- config_file = self._ceph_config
234- # Check if a ceph config file is available. No need to run anything
235- # if we know that we're not on a Ceph monitor node anyway.
236- if not os.path.exists(config_file):
237- # There is no config file - it's not a ceph machine.
238- return None
239+ # Check if a ceph config file is available. If it's not, it's not a
240+ # ceph machine or ceph isn't set up yet. No need to run anything in
241+ # this case.
242+ if self._ceph_config is None or not os.path.exists(self._ceph_config):
243+ returnValue(None)
244
245 # Extract the ceph ring Id and cache it.
246 if self._ceph_ring_id is None:
247- self._ceph_ring_id = self._get_ceph_ring_id()
248+ self._ceph_ring_id = yield self._get_ceph_ring_id()
249
250 new_timestamp = int(self._create_time())
251- new_ceph_usage = self._get_ceph_usage()
252+ new_ceph_usage = yield self._get_ceph_usage()
253
254 step_data = None
255 if new_ceph_usage is not None:
256- step_data = self._accumulate(new_timestamp, new_ceph_usage,
257- ACCUMULATOR_KEY)
258+ step_data = self._accumulate(
259+ new_timestamp, new_ceph_usage, "ceph-usage-accumulator")
260+
261 if step_data is not None:
262 self._ceph_usage_points.append(step_data)
263
264@@ -107,52 +115,59 @@
265 Grab the ceph usage data by parsing the output of the "ceph status"
266 command output.
267 """
268- output = self._get_ceph_command_output()
269-
270- if output is None:
271- return None
272-
273- result = EXP.match(output)
274-
275- if not result:
276- logging.error("Could not parse command output: '%s'." % output)
277- return None
278-
279- (used, available, total) = result.groups()
280- # Note: used + available is NOT equal to total (there is some used
281- # space for duplication and system info etc...)
282-
283- filled = int(total) - int(available)
284-
285- return filled / float(total)
286-
287- def _get_ceph_command_output(self):
288- try:
289- output = run_command("ceph status")
290- except (OSError, CommandError):
291- # If the command line client isn't available, we assume it's not
292- # a ceph monitor machine.
293- return None
294- return output
295+
296+ def parse(output):
297+ if output is None:
298+ return None
299+
300+ result = self._usage_regexp.match(output)
301+ if not result:
302+ logging.error("Could not parse command output: '%s'." % output)
303+ return None
304+
305+ (used, available, total) = result.groups()
306+ # Note: used + available is NOT equal to total (there is some used
307+ # space for duplication and system info etc...)
308+ filled = int(total) - int(available)
309+ return filled / float(total)
310+
311+ return self._get_status_command_output().addCallback(parse)
312+
313+ def _get_status_command_output(self):
314+ return self._run_ceph_command("status")
315
316 def _get_ceph_ring_id(self):
317- output = self._get_quorum_command_output()
318- if output is None:
319- return None
320- try:
321- quorum_status = json.loads(output)
322- ring_id = quorum_status["monmap"]["fsid"]
323- except:
324- logging.error(
325- "Could not get ring_id from output: '%s'." % output)
326- return None
327- return ring_id
328+ """Extract ceph ring id from ceph command output."""
329+
330+ def parse(output):
331+ if output is None:
332+ return None
333+
334+ try:
335+ quorum_status = json.loads(output)
336+ ring_id = quorum_status["monmap"]["fsid"]
337+ except:
338+ logging.error(
339+ "Could not get ring_id from output: '%s'." % output)
340+ return None
341+ return ring_id
342+
343+ return self._get_quorum_command_output().addCallback(parse)
344
345 def _get_quorum_command_output(self):
346- try:
347- output = run_command("ceph quorum_status")
348- except (OSError, CommandError):
349- # If the command line client isn't available, we assume it's not
350- # a ceph monitor machine.
351- return None
352- return output
353+ return self._run_ceph_command("quorum_status")
354+
355+ def _run_ceph_command(self, *args):
356+ """
357+ Run the ceph command with the specified options using landscape ceph
358+ key. The keyring is expected to contain a configuration stanza with a
359+ key for the "client.landscape-client" id.
360+ """
361+ params = ["--conf", self._ceph_config, "--id", "landscape-client"]
362+ params.extend(args)
363+ deferred = spawn_process("ceph", args=params)
364+ # If the command line client isn't available, we assume it's not a ceph
365+ # monitor machine.
366+ deferred.addCallback(
367+ lambda (out, err, code): out if code == 0 else None)
368+ return deferred
369
370=== modified file 'landscape/monitor/config.py'
371--- landscape/monitor/config.py 2013-05-10 11:02:24 +0000
372+++ landscape/monitor/config.py 2013-06-04 16:15:35 +0000
373@@ -5,7 +5,8 @@
374 "LoadAverage", "MemoryInfo", "MountInfo", "ProcessorInfo",
375 "Temperature", "PackageMonitor", "UserMonitor",
376 "RebootRequired", "AptPreferences", "NetworkActivity",
377- "NetworkDevice", "UpdateManager", "CPUUsage", "SwiftDeviceInfo"]
378+ "NetworkDevice", "UpdateManager", "CPUUsage", "SwiftDeviceInfo",
379+ "CephUsage"]
380
381
382 class MonitorConfiguration(Configuration):
383
384=== renamed file 'landscape/manager/tests/test_ceph.py' => 'landscape/monitor/tests/test_cephusage.py'
385--- landscape/manager/tests/test_ceph.py 2013-04-02 14:56:19 +0000
386+++ landscape/monitor/tests/test_cephusage.py 2013-06-04 16:15:35 +0000
387@@ -1,171 +1,147 @@
388 import os
389 import json
390
391-from landscape.tests.helpers import LandscapeTest, ManagerHelper
392-from landscape.manager.cephusage import CephUsage
393-
394-
395-SAMPLE_OLD_TEMPLATE = (" health HEALTH_WARN 6 pgs degraded; 6 pgs stuck "
396-"unclean\n"
397-"monmap e2: 3 mons at {server-269703f4-5217-495a-b7f2-b3b3473c1719="
398-"10.55.60.238:6789/0,server-3f370698-f3b0-4cbe-8db9-a18e304c952b="
399-"10.55.60.141:6789/0,server-f635fa07-e36f-453c-b3d5-b4ce86fbc6ff="
400-"10.55.60.241:6789/0}, election epoch 8, quorum 0,1,2 "
401-"server-269703f4-5217-495a-b7f2-b3b3473c1719,"
402-"server-3f370698-f3b0-4cbe-8db9-a18e304c952b,"
403-"server-f635fa07-e36f-453c-b3d5-b4ce86fbc6ff\n "
404-"osdmap e9: 3 osds: 3 up, 3 in\n "
405-"pgmap v114: 192 pgs: 186 active+clean, 6 active+degraded; "
406-"0 bytes data, %s MB used, %s MB / %s MB avail\n "
407-"mdsmap e1: 0/0/1 up\n\n")
408-
409-SAMPLE_NEW_TEMPLATE = ("health HEALTH_OK\n"
410-" monmap e2: 3 mons at {inst-007=192.168.64.139:6789/0,"
411-"inst-008=192.168.64.140:6789/0,inst-009=192.168.64.141:6789/0}, "
412-"election epoch 6, quorum 0,1,2 inst-007,inst-008,inst-009\n"
413-" osdmap e28: 3 osds: 3 up, 3 in\n"
414-" pgmap v193861: 208 pgs: 208 active+clean; 5514 MB data, %s MB used, "
415-"%s MB / %s MB avail; 1739KB/s wr, 54op/s\n"
416-" mdsmap e1: 0/0/1 up\n")
417+from twisted.internet.defer import succeed
418+
419+from landscape.lib.fs import touch_file
420+from landscape.tests.helpers import LandscapeTest, MonitorHelper
421+from landscape.monitor.cephusage import CephUsage
422+
423+
424+SAMPLE_OLD_TEMPLATE = (
425+ " health HEALTH_WARN 6 pgs degraded; 6 pgs stuck "
426+ "unclean\n"
427+ "monmap e2: 3 mons at {server-269703f4-5217-495a-b7f2-b3b3473c1719="
428+ "10.55.60.238:6789/0,server-3f370698-f3b0-4cbe-8db9-a18e304c952b="
429+ "10.55.60.141:6789/0,server-f635fa07-e36f-453c-b3d5-b4ce86fbc6ff="
430+ "10.55.60.241:6789/0}, election epoch 8, quorum 0,1,2 "
431+ "server-269703f4-5217-495a-b7f2-b3b3473c1719,"
432+ "server-3f370698-f3b0-4cbe-8db9-a18e304c952b,"
433+ "server-f635fa07-e36f-453c-b3d5-b4ce86fbc6ff\n "
434+ "osdmap e9: 3 osds: 3 up, 3 in\n "
435+ "pgmap v114: 192 pgs: 186 active+clean, 6 active+degraded; "
436+ "0 bytes data, %s MB used, %s MB / %s MB avail\n "
437+ "mdsmap e1: 0/0/1 up\n\n")
438+
439+SAMPLE_NEW_TEMPLATE = (
440+ "health HEALTH_OK\n"
441+ " monmap e2: 3 mons at {inst-007=192.168.64.139:6789/0,"
442+ "inst-008=192.168.64.140:6789/0,inst-009=192.168.64.141:6789/0}, "
443+ "election epoch 6, quorum 0,1,2 inst-007,inst-008,inst-009\n"
444+ " osdmap e28: 3 osds: 3 up, 3 in\n"
445+ " pgmap v193861: 208 pgs: 208 active+clean; 5514 MB data, %s MB used, "
446+ "%s MB / %s MB avail; 1739KB/s wr, 54op/s\n"
447+ " mdsmap e1: 0/0/1 up\n")
448
449 SAMPLE_OUTPUT = SAMPLE_NEW_TEMPLATE % (4296, 53880, 61248)
450 SAMPLE_OLD_OUTPUT = SAMPLE_OLD_TEMPLATE % (4296, 53880, 61248)
451
452-SAMPLE_QUORUM = (''
453-'{ "election_epoch": 8,\n'
454-' "quorum": [\n'
455-' 0,\n'
456-' 1,\n'
457-' 2],\n'
458-' "monmap": { "epoch": 2,\n'
459-' "fsid": "%s",\n'
460-' "modified": "2013-01-13 16:58:00.141737",\n'
461-' "created": "0.000000",\n'
462-' "mons": [\n'
463-' { "rank": 0,\n'
464-' "name": "server-1be72d64-0ff2-4ac1-ad13-1c06c8201011",\n'
465-' "addr": "10.55.60.188:6789\/0"},\n'
466-' { "rank": 1,\n'
467-' "name": "server-e847f147-ed13-46c2-8e6d-768aa32657ab",\n'
468-' "addr": "10.55.60.202:6789\/0"},\n'
469-' { "rank": 2,\n'
470-' "name": "server-3c831a0b-51d5-43a9-95d5-63644f0965cc",\n'
471-' "addr": "10.55.60.205:6789\/0"}]}}\n'
472-)
473+SAMPLE_QUORUM = (
474+ '{ "election_epoch": 8,\n'
475+ ' "quorum": [\n'
476+ ' 0,\n'
477+ ' 1,\n'
478+ ' 2],\n'
479+ ' "monmap": { "epoch": 2,\n'
480+ ' "fsid": "%s",\n'
481+ ' "modified": "2013-01-13 16:58:00.141737",\n'
482+ ' "created": "0.000000",\n'
483+ ' "mons": [\n'
484+ ' { "rank": 0,\n'
485+ ' "name": "server-1be72d64-0ff2-4ac1-ad13-1c06c8201011",\n'
486+ ' "addr": "10.55.60.188:6789\/0"},\n'
487+ ' { "rank": 1,\n'
488+ ' "name": "server-e847f147-ed13-46c2-8e6d-768aa32657ab",\n'
489+ ' "addr": "10.55.60.202:6789\/0"},\n'
490+ ' { "rank": 2,\n'
491+ ' "name": "server-3c831a0b-51d5-43a9-95d5-63644f0965cc",\n'
492+ ' "addr": "10.55.60.205:6789\/0"}]}}\n')
493
494 SAMPLE_QUORUM_OUTPUT = SAMPLE_QUORUM % "ecbb8960-0e21-11e2-b495-83a88f44db01"
495
496
497 class CephUsagePluginTest(LandscapeTest):
498- helpers = [ManagerHelper]
499+ helpers = [MonitorHelper]
500
501 def setUp(self):
502 super(CephUsagePluginTest, self).setUp()
503 self.mstore = self.broker_service.message_store
504+ self.plugin = CephUsage(create_time=self.reactor.time)
505
506- def test_get_ceph_usage_if_command_not_found(self):
507+ def test_wb_get_ceph_usage_if_command_not_found(self):
508 """
509 When the ceph command cannot be found or accessed, the
510 C{_get_ceph_usage} method returns None.
511 """
512- plugin = CephUsage(create_time=self.reactor.time)
513-
514- def return_none():
515- return None
516-
517- plugin._get_ceph_command_output = return_none
518-
519- self.manager.add(plugin)
520-
521- result = plugin._get_ceph_usage()
522- self.assertIs(None, result)
523-
524- def test_get_ceph_usage(self):
525+ self.plugin._get_status_command_output = lambda: succeed(None)
526+ self.monitor.add(self.plugin)
527+
528+ self.assertIs(
529+ None, self.successResultOf(self.plugin._get_ceph_usage()))
530+
531+ def test_wb_get_ceph_usage(self):
532 """
533 When the ceph command call returns output, the _get_ceph_usage method
534 returns the percentage of used space.
535 """
536- plugin = CephUsage(create_time=self.reactor.time)
537-
538- def return_output():
539- return SAMPLE_OUTPUT
540-
541- plugin._get_ceph_command_output = return_output
542-
543- self.manager.add(plugin)
544-
545- result = plugin._get_ceph_usage()
546- self.assertEqual(0.12029780564263323, result)
547-
548- def test_get_ceph_usage_old_format(self):
549+ self.plugin._get_status_command_output = lambda: succeed(SAMPLE_OUTPUT)
550+ self.monitor.add(self.plugin)
551+
552+ self.assertEqual(
553+ 0.12029780564263323,
554+ self.successResultOf(self.plugin._get_ceph_usage()))
555+
556+ def test_wb_get_ceph_usage_old_format(self):
557 """
558 The _get_ceph_usage method understands command output in the "old"
559 format (the output changed around version 0.56.1)
560 """
561- plugin = CephUsage(create_time=self.reactor.time)
562-
563- def return_output():
564- return SAMPLE_OLD_OUTPUT
565-
566- plugin._get_ceph_command_output = return_output
567-
568- self.manager.add(plugin)
569-
570- result = plugin._get_ceph_usage()
571- self.assertEqual(0.12029780564263323, result)
572-
573- def test_get_ceph_usage_empty_disk(self):
574+ self.plugin._get_status_command_output = (
575+ lambda: succeed(SAMPLE_OLD_OUTPUT))
576+ self.monitor.add(self.plugin)
577+
578+ self.assertEqual(
579+ 0.12029780564263323,
580+ self.successResultOf(self.plugin._get_ceph_usage()))
581+
582+ def test_wb_get_ceph_usage_empty_disk(self):
583 """
584 When the ceph command call returns output for empty disks, the
585 _get_ceph_usage method returns 0.0 .
586 """
587- plugin = CephUsage(create_time=self.reactor.time)
588-
589- def return_output():
590- return SAMPLE_NEW_TEMPLATE % (0, 100, 100)
591-
592- plugin._get_ceph_command_output = return_output
593-
594- self.manager.add(plugin)
595-
596- result = plugin._get_ceph_usage()
597- self.assertEqual(0.0, result)
598-
599- def test_get_ceph_usage_full_disk(self):
600+ self.plugin._get_status_command_output = (
601+ lambda: succeed(SAMPLE_NEW_TEMPLATE % (0, 100, 100)))
602+ self.monitor.add(self.plugin)
603+
604+ self.assertEqual(
605+ 0.0, self.successResultOf(self.plugin._get_ceph_usage()))
606+
607+ def test_wb_get_ceph_usage_full_disk(self):
608 """
609 When the ceph command call returns output for empty disks, the
610 _get_ceph_usage method returns 1.0 .
611 """
612- plugin = CephUsage(create_time=self.reactor.time)
613-
614- def return_output():
615- return SAMPLE_NEW_TEMPLATE % (100, 0, 100)
616-
617- plugin._get_ceph_command_output = return_output
618-
619- self.manager.add(plugin)
620-
621- result = plugin._get_ceph_usage()
622- self.assertEqual(1.0, result)
623-
624- def test_get_ceph_usage_no_information(self):
625+ self.plugin._get_status_command_output = (
626+ lambda: succeed(SAMPLE_NEW_TEMPLATE % (100, 0, 100)))
627+
628+ self.monitor.add(self.plugin)
629+ self.assertEqual(
630+ 1.0, self.successResultOf(self.plugin._get_ceph_usage()))
631+
632+ def test_wb_get_ceph_usage_no_information(self):
633 """
634 When the ceph command outputs something that does not contain the
635 disk usage information, the _get_ceph_usage method returns None.
636 """
637- plugin = CephUsage(create_time=self.reactor.time)
638 output = "Blah\nblah"
639 error = "Could not parse command output: '%s'" % output
640 self.log_helper.ignore_errors(error)
641
642- def return_output():
643- return output
644- plugin._get_ceph_command_output = return_output
645-
646- self.manager.add(plugin)
647-
648- result = plugin._get_ceph_usage()
649- self.assertEqual(None, result)
650+ self.plugin._get_status_command_output = lambda: succeed(output)
651+
652+ self.monitor.add(self.plugin)
653+ self.assertIs(
654+ None, self.successResultOf(self.plugin._get_ceph_usage()))
655
656 def test_never_exchange_empty_messages(self):
657 """
658@@ -174,12 +150,10 @@
659 message is created during exchange, it should not be queued.
660 """
661 self.mstore.set_accepted_types(["ceph-usage"])
662-
663- plugin = CephUsage(create_time=self.reactor.time)
664- self.manager.add(plugin)
665-
666- self.manager.exchange()
667- self.assertEqual(len(self.mstore.get_pending_messages()), 0)
668+ self.monitor.add(self.plugin)
669+
670+ self.monitor.exchange()
671+ self.assertEqual(0, len(self.mstore.get_pending_messages()))
672
673 def test_exchange_messages(self):
674 """
675@@ -189,13 +163,11 @@
676 ring_id = "whatever"
677 self.mstore.set_accepted_types(["ceph-usage"])
678
679- plugin = CephUsage(create_time=self.reactor.time)
680- plugin._ceph_usage_points = [(60, 1.0)]
681- plugin._ceph_ring_id = ring_id
682- self.manager.add(plugin)
683-
684- self.manager.exchange()
685-
686+ self.plugin._ceph_usage_points = [(60, 1.0)]
687+ self.plugin._ceph_ring_id = ring_id
688+ self.monitor.add(self.plugin)
689+
690+ self.monitor.exchange()
691 self.assertMessages(self.mstore.get_pending_messages(),
692 [{"type": "ceph-usage",
693 "ceph-usages": [(60, 1.0)],
694@@ -205,13 +177,10 @@
695 """
696 Calling create_message returns an expected message.
697 """
698- plugin = CephUsage(create_time=self.reactor.time)
699- self.manager.add(plugin)
700-
701 ring_id = "blah"
702- plugin._ceph_usage_points = []
703- plugin._ceph_ring_id = ring_id
704- message = plugin.create_message()
705+ self.plugin._ceph_usage_points = []
706+ self.plugin._ceph_ring_id = ring_id
707+ message = self.plugin.create_message()
708
709 self.assertIn("type", message)
710 self.assertEqual(message["type"], "ceph-usage")
711@@ -221,8 +190,8 @@
712 self.assertEqual(len(ceph_usages), 0)
713
714 point = (60, 1.0)
715- plugin._ceph_usage_points = [point]
716- message = plugin.create_message()
717+ self.plugin._ceph_usage_points = [point]
718+ message = self.plugin.create_message()
719 self.assertIn("type", message)
720 self.assertEqual(message["type"], "ceph-usage")
721 self.assertIn("ceph-usages", message)
722@@ -233,101 +202,75 @@
723
724 def test_no_message_if_not_accepted(self):
725 """
726- Don't add any messages at all if the broker isn't currently
727- accepting their type.
728+ Don't add any messages at all if the broker isn't currently accepting
729+ their type.
730 """
731 interval = 30
732- exchange_interval = 300
733-
734- plugin = CephUsage(create_time=self.reactor.time,
735- interval=interval, exchange_interval=300)
736-
737- self.manager.add(plugin)
738-
739- self.reactor.advance(exchange_interval * 2)
740- self.manager.exchange()
741+ monitor_interval = 300
742+
743+ plugin = CephUsage(
744+ interval=interval, monitor_interval=monitor_interval,
745+ create_time=self.reactor.time)
746+
747+ self.monitor.add(plugin)
748+
749+ self.reactor.advance(monitor_interval * 2)
750+ self.monitor.exchange()
751
752 self.mstore.set_accepted_types(["ceph-usage"])
753 self.assertMessages(list(self.mstore.get_pending_messages()), [])
754
755- def test_get_ceph_ring_id(self):
756+ def test_wb_get_ceph_ring_id(self):
757 """
758 When given a well formatted command output, the _get_ceph_ring_id()
759 method returns the correct ring_id.
760 """
761- plugin = CephUsage(create_time=self.reactor.time)
762-
763 uuid = "i-am-a-uuid"
764-
765- def return_output():
766- return SAMPLE_QUORUM % uuid
767-
768- plugin._get_quorum_command_output = return_output
769-
770- self.manager.add(plugin)
771-
772- result = plugin._get_ceph_ring_id()
773- self.assertEqual(uuid, result)
774-
775- def test_get_ceph_ring_id_valid_json_no_information(self):
776+ self.plugin._get_quorum_command_output = (
777+ lambda: succeed(SAMPLE_QUORUM % uuid))
778+ self.assertEqual(
779+ uuid, self.successResultOf(self.plugin._get_ceph_ring_id()))
780+
781+ def test_wb_get_ceph_ring_id_valid_json_no_information(self):
782 """
783 When the _get_quorum_command_output method returns something without
784 the ring uuid information present but that is valid JSON, the
785 _get_ceph_ring_id method returns None.
786 """
787- plugin = CephUsage(create_time=self.reactor.time)
788 error = "Could not get ring_id from output: '{\"election_epoch\": 8}'."
789 self.log_helper.ignore_errors(error)
790
791 def return_output():
792 # Valid JSON - just without the info we're looking for.
793 data = {"election_epoch": 8}
794- return json.dumps(data)
795-
796- plugin._get_quorum_command_output = return_output
797-
798- self.manager.add(plugin)
799-
800- result = plugin._get_ceph_ring_id()
801- self.assertEqual(None, result)
802-
803- def test_get_ceph_ring_id_no_information(self):
804+ return succeed(json.dumps(data))
805+
806+ self.plugin._get_quorum_command_output = return_output
807+ self.assertIs(
808+ None, self.successResultOf(self.plugin._get_ceph_ring_id()))
809+
810+ def test_wb_get_ceph_ring_id_no_information(self):
811 """
812 When the _get_quorum_command_output method returns something without
813 the ring uuid information present, the _get_ceph_ring_id method returns
814 None.
815 """
816- plugin = CephUsage(create_time=self.reactor.time)
817 error = "Could not get ring_id from output: 'Blah\nblah'."
818 self.log_helper.ignore_errors(error)
819
820- def return_output():
821- return "Blah\nblah"
822-
823- plugin._get_quorum_command_output = return_output
824-
825- self.manager.add(plugin)
826-
827- result = plugin._get_ceph_ring_id()
828- self.assertEqual(None, result)
829-
830- def test_get_ceph_ring_id_command_exception(self):
831+ self.plugin._get_quorum_command_output = lambda: succeed("Blah\nblah")
832+ self.assertIs(
833+ None, self.successResultOf(self.plugin._get_ceph_ring_id()))
834+
835+ def test_wb_get_ceph_ring_id_command_exception(self):
836 """
837 When the _get_quorum_command_output method returns None (if an
838 exception happened for example), the _get_ceph_ring_id method
839 returns None and logs no error.
840 """
841- plugin = CephUsage(create_time=self.reactor.time)
842-
843- def return_output():
844- return None
845-
846- plugin._get_quorum_command_output = return_output
847-
848- self.manager.add(plugin)
849-
850- result = plugin._get_ceph_ring_id()
851- self.assertEqual(None, result)
852+ self.plugin._get_quorum_command_output = lambda: succeed(None)
853+ self.assertIs(
854+ None, self.successResultOf(self.plugin._get_ceph_ring_id()))
855
856 def test_plugin_run(self):
857 """
858@@ -336,60 +279,44 @@
859 The _ceph_ring_id member of the plugin is also filled with the output
860 of the _get_ceph_ring_id method.
861 """
862- exchange_interval = 300
863- interval = exchange_interval
864- plugin = CephUsage(create_time=self.reactor.time,
865- exchange_interval=exchange_interval,
866- interval=interval)
867+ monitor_interval = 300
868+ interval = monitor_interval
869+ plugin = CephUsage(
870+ interval=interval, monitor_interval=monitor_interval,
871+ create_time=self.reactor.time)
872+
873 uuid = "i-am-a-unique-snowflake"
874
875- def return_quorum():
876- return SAMPLE_QUORUM % uuid
877-
878- def return_full_disk():
879- return SAMPLE_NEW_TEMPLATE % (100, 0, 100)
880-
881- plugin._ceph_config = "/etc/hosts"
882- plugin._get_quorum_command_output = return_quorum
883- plugin._get_ceph_command_output = return_full_disk
884-
885- self.manager.add(plugin)
886-
887- self.reactor.advance(exchange_interval * 2)
888+ # The config file must be present for the plugin to run.
889+ ceph_client_dir = os.path.join(self.config.data_path, "ceph-client")
890+ ceph_conf = os.path.join(ceph_client_dir, "ceph.landscape-client.conf")
891+ os.mkdir(ceph_client_dir)
892+ touch_file(ceph_conf)
893+ plugin._ceph_config = ceph_conf
894+
895+ plugin._get_quorum_command_output = (
896+ lambda: succeed(SAMPLE_QUORUM % uuid))
897+ plugin._get_status_command_output = (
898+ lambda: succeed(SAMPLE_NEW_TEMPLATE % (100, 0, 100)))
899+ self.monitor.add(plugin)
900+
901+ self.reactor.advance(monitor_interval * 2)
902
903 self.assertEqual([(300, 1.0), (600, 1.0)], plugin._ceph_usage_points)
904 self.assertEqual(uuid, plugin._ceph_ring_id)
905
906- def test_flush_persists_data_to_disk(self):
907- """
908- The plugin's C{flush} method is called every C{flush_interval} and
909- creates the perists file.
910- """
911- flush_interval = self.config.flush_interval
912- persist_filename = os.path.join(self.config.data_path, "ceph.bpickle")
913-
914- self.assertFalse(os.path.exists(persist_filename))
915- plugin = CephUsage(create_time=self.reactor.time)
916- self.manager.add(plugin)
917-
918- self.reactor.advance(flush_interval)
919-
920- self.assertTrue(os.path.exists(persist_filename))
921-
922 def test_resynchronize_message_calls_resynchronize_method(self):
923 """
924 If the reactor fires a "resynchronize" even the C{_resynchronize}
925 method on the ceph plugin object is called.
926 """
927- plugin = CephUsage(create_time=self.reactor.time)
928-
929 self.called = False
930
931 def stub_resynchronize():
932 self.called = True
933- plugin._resynchronize = stub_resynchronize
934+ self.plugin._resynchronize = stub_resynchronize
935
936- self.manager.add(plugin)
937+ self.monitor.add(self.plugin)
938
939 self.reactor.fire("resynchronize")
940
