Merge lp:~ack/landscape-client/ceph-monitor-plugin into lp:~landscape/landscape-client/trunk

Proposed by Alberto Donato
Status: Merged
Approved by: Alberto Donato
Approved revision: 694
Merged at revision: 684
Proposed branch: lp:~ack/landscape-client/ceph-monitor-plugin
Merge into: lp:~landscape/landscape-client/trunk
Diff against target: 939 lines (+282/-410)
7 files modified
landscape/lib/command.py (+0/-38)
landscape/lib/tests/test_command.py (+0/-32)
landscape/manager/config.py (+1/-1)
landscape/manager/tests/test_config.py (+2/-3)
landscape/monitor/cephusage.py (+108/-93)
landscape/monitor/config.py (+2/-1)
landscape/monitor/tests/test_cephusage.py (+169/-242)
To merge this branch: bzr merge lp:~ack/landscape-client/ceph-monitor-plugin
Reviewer Review Type Date Requested Status
Free Ekanayaka (community) Approve
Chris Glass (community) Approve
Review via email: mp+167056@code.launchpad.net

Commit message

This changes CephUsage from being a manager plugin to a monitor one, so it doesn't run as root.
It now looks for a landscape-specific ceph config and key in /var/lib/landscape/client/ceph-client, which is passed to the "ceph" command.

Description of the change

This changes CephUsage from being a manager plugin to a monitor one, so it doesn't run as root.
It now looks for a landscape-specific ceph config and key in /var/lib/landscape/client/ceph-client, which is passed to the "ceph" command.

The landscape-client charm at lp:~ack/charms/precise/landscape-client/ceph-client-interface creates the required ceph config for the plugin.

To post a comment you must log in.
Revision history for this message
Chris Glass (tribaal) wrote :

Nice, it's much cleaner as a monitor plugin! +1
Thanks for refactoring the tests too :)

[1]
Please add some docstring comments on the monitor plugin class to explain how to generate a usable ceph key, since some people might need/want to use the ceph feature of landscape with a legacy ceph installation or a non-charm one.
There is already some description of how it works in the docstring for _run_ceph_command() but it would be good to have a quick refresher in the class/module docstring as well.

review: Approve
Revision history for this message
Free Ekanayaka (free.ekanayaka) wrote :

Looks good, +1!

#1

+ The configured keyring can be generated with:
+
+ ceph-authtool <keyring-file> --create-keyring
+ --name=client.landscape-client --add-key=<key>

Please add something like "This will be done automatically by the landscape-client charm, when deploying as a ceph node subordinate".

#2

+ output = run_command(" ".join(command))

To be picky, this should use Twisted's process protocol stuff (we have some helper for that), otherwise the reactor will block. It's important in case the ceph command hangs for some reason, for example. It can be addressed in a separate branch.

#3

Please mark the tests where you access private methods as whitebox ("test_wb_xxx").

review: Approve
Revision history for this message
🤖 Landscape Builder (landscape-builder) wrote :

There are additional revisions which have not been approved in review. Please seek review and approval of these new revisions.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== removed file 'landscape/lib/command.py'
--- landscape/lib/command.py 2009-03-27 13:07:24 +0000
+++ landscape/lib/command.py 1970-01-01 00:00:00 +0000
@@ -1,38 +0,0 @@
1"""Shell commands execution."""
2import commands
3
4class CommandError(Exception):
5 """
6 Raised by L{run_command} in case of non-0 exit code.
7
8 @cvar command: The shell command that failed.
9 @cvar exit_status: Its non-zero exit status.
10 @cvar output: The command's output.
11 """
12 def __init__(self, command, exit_status, output):
13 self.command = command
14 self.exit_status = exit_status
15 self.output = output
16
17 def __str__(self):
18 return "'%s' exited with status %d (%s)" % (
19 self.command, self.exit_status, self.output)
20
21 def __repr__(self):
22 return "<CommandError command=<%s> exit_status=%d output=<%s>>" % (
23 self.command, self.exit_status, self.output)
24
25
26def run_command(command):
27 """
28 Execute a command in a shell and return the command's output.
29
30 If the command's exit status is not 0 a L{CommandError} exception
31 is raised.
32 """
33 exit_status, output = commands.getstatusoutput(command)
34 # shift down 8 bits to get shell-like exit codes
35 exit_status = exit_status >> 8
36 if exit_status != 0:
37 raise CommandError(command, exit_status, output)
38 return output
390
=== removed file 'landscape/lib/tests/test_command.py'
--- landscape/lib/tests/test_command.py 2011-07-05 05:09:11 +0000
+++ landscape/lib/tests/test_command.py 1970-01-01 00:00:00 +0000
@@ -1,32 +0,0 @@
1from landscape.tests.helpers import LandscapeTest
2
3from landscape.lib.command import run_command, CommandError
4
5
6class CommandTest(LandscapeTest):
7
8 def setUp(self):
9 super(CommandTest, self).setUp()
10
11 def test_basic(self):
12 self.assertEqual(run_command("echo test"), "test")
13
14 def test_non_0_exit_status(self):
15 try:
16 run_command("false")
17 except CommandError, error:
18 self.assertEqual(error.command, "false")
19 self.assertEqual(error.output, "")
20 self.assertEqual(error.exit_status, 1)
21 else:
22 self.fail("CommandError not raised")
23
24 def test_error_str(self):
25 self.assertEqual(str(CommandError("test_command", 1, "test output")),
26 "'test_command' exited with status 1 "
27 "(test output)")
28
29 def test_error_repr(self):
30 self.assertEqual(repr(CommandError("test_command", 1, "test output")),
31 "<CommandError command=<test_command> "
32 "exit_status=1 output=<test output>>")
330
=== modified file 'landscape/manager/config.py'
--- landscape/manager/config.py 2013-04-04 11:08:12 +0000
+++ landscape/manager/config.py 2013-06-04 16:15:35 +0000
@@ -6,7 +6,7 @@
66
7ALL_PLUGINS = ["ProcessKiller", "PackageManager", "UserManager",7ALL_PLUGINS = ["ProcessKiller", "PackageManager", "UserManager",
8 "ShutdownManager", "AptSources", "HardwareInfo",8 "ShutdownManager", "AptSources", "HardwareInfo",
9 "CephUsage", "KeystoneToken", "HAService"]9 "KeystoneToken", "HAService"]
1010
1111
12class ManagerConfiguration(Configuration):12class ManagerConfiguration(Configuration):
1313
=== modified file 'landscape/manager/tests/test_config.py'
--- landscape/manager/tests/test_config.py 2013-04-04 11:38:19 +0000
+++ landscape/manager/tests/test_config.py 2013-06-04 16:15:35 +0000
@@ -12,9 +12,8 @@
12 def test_plugin_factories(self):12 def test_plugin_factories(self):
13 """By default all plugins are enabled."""13 """By default all plugins are enabled."""
14 self.assertEqual(["ProcessKiller", "PackageManager", "UserManager",14 self.assertEqual(["ProcessKiller", "PackageManager", "UserManager",
15 "ShutdownManager", "AptSources",15 "ShutdownManager", "AptSources", "HardwareInfo",
16 "HardwareInfo", "CephUsage", "KeystoneToken",16 "KeystoneToken", "HAService"],
17 "HAService"],
18 ALL_PLUGINS)17 ALL_PLUGINS)
19 self.assertEqual(ALL_PLUGINS, self.config.plugin_factories)18 self.assertEqual(ALL_PLUGINS, self.config.plugin_factories)
2019
2120
=== renamed file 'landscape/manager/cephusage.py' => 'landscape/monitor/cephusage.py'
--- landscape/manager/cephusage.py 2013-04-02 14:56:19 +0000
+++ landscape/monitor/cephusage.py 2013-06-04 16:15:35 +0000
@@ -4,63 +4,70 @@
4import logging4import logging
5import re5import re
66
7from twisted.internet.defer import inlineCallbacks, returnValue
8
7from landscape.accumulate import Accumulator9from landscape.accumulate import Accumulator
8from landscape.lib.monitor import CoverageMonitor10from landscape.lib.monitor import CoverageMonitor
9from landscape.lib.command import run_command, CommandError11from landscape.lib.twisted_util import spawn_process
10from landscape.lib.persist import Persist12from landscape.monitor.plugin import MonitorPlugin
11from landscape.manager.plugin import ManagerPlugin13
1214
13ACCUMULATOR_KEY = "ceph-usage-accumulator"15class CephUsage(MonitorPlugin):
14CEPH_CONFIG_FILE = "/etc/ceph/ceph.conf"
15
16EXP = re.compile(".*pgmap.*data, (\d+) MB used, (\d+) MB / (\d+) MB avail.*",
17 flags=re.S)
18
19
20class CephUsage(ManagerPlugin):
21 """16 """
22 Plugin that captures Ceph usage information. This only works if the client17 Plugin that captures Ceph usage information. This only works if the client
23 runs on one of the Ceph monitor nodes, and it noops otherwise.18 runs on one of the Ceph monitor nodes, and it noops otherwise.
19
20 The plugin requires the 'ceph' command to be available, which is run with a
21 config file in <data_path>/ceph-client/ceph.landscape-client.conf with the
22 following config:
23
24 [global]
25 auth supported = cephx
26 keyring = <keyring-file>
27 mon host = <ip>:6789
28
29 The configured keyring can be generated with:
30
31 ceph-authtool <keyring-file> --create-keyring
32 --name=client.landscape-client --add-key=<key>
33
34 The landscape-client charm automatically provides the client configuration
35 and key when deployed as subordinate of a ceph node.
24 """36 """
37
25 persist_name = "ceph-usage"38 persist_name = "ceph-usage"
26 # Prevent the Plugin base-class from scheduling looping calls.39 # Prevent the Plugin base-class from scheduling looping calls.
27 run_interval = None40 run_interval = None
2841
29 def __init__(self, interval=30, exchange_interval=60 * 60,42 _usage_regexp = re.compile(
43 ".*pgmap.*data, (\d+) MB used, (\d+) MB / (\d+) MB avail.*",
44 flags=re.S)
45
46 def __init__(self, interval=30, monitor_interval=60 * 60,
30 create_time=time.time):47 create_time=time.time):
31 self._interval = interval48 self._interval = interval
32 self._exchange_interval = exchange_interval49 self._monitor_interval = monitor_interval
33 self._ceph_usage_points = []50 self._ceph_usage_points = []
34 self._ceph_ring_id = None51 self._ceph_ring_id = None
35 self._create_time = create_time52 self._create_time = create_time
36 self._ceph_config = CEPH_CONFIG_FILE53 self._ceph_config = None
3754
38 def register(self, registry):55 def register(self, registry):
39 super(CephUsage, self).register(registry)56 super(CephUsage, self).register(registry)
57 self._ceph_config = os.path.join(
58 self.registry.config.data_path, "ceph-client",
59 "ceph.landscape-client.conf")
60
61 self._accumulate = Accumulator(self._persist, self._interval)
62 self._monitor = CoverageMonitor(
63 self._interval, 0.8, "Ceph usage snapshot",
64 create_time=self._create_time)
65
40 self.registry.reactor.call_every(self._interval, self.run)66 self.registry.reactor.call_every(self._interval, self.run)
4167 self.registry.reactor.call_every(
42 self._persist_filename = os.path.join(self.registry.config.data_path,68 self._monitor_interval, self._monitor.log)
43 "ceph.bpickle")
44 self._persist = Persist(filename=self._persist_filename)
45
46 self._accumulate = Accumulator(self._persist, self._interval)
47
48 self._monitor = CoverageMonitor(self._interval, 0.8,
49 "Ceph usage snapshot",
50 create_time=self._create_time)
51 self.registry.reactor.call_every(self._exchange_interval,
52 self._monitor.log)
53 self.registry.reactor.call_on("stop", self._monitor.log, priority=2000)69 self.registry.reactor.call_on("stop", self._monitor.log, priority=2000)
54 self.call_on_accepted("ceph-usage", self.send_message, True)70 self.call_on_accepted("ceph-usage", self.send_message, True)
55 self.registry.reactor.call_on("resynchronize", self._resynchronize)
56 self.registry.reactor.call_every(self.registry.config.flush_interval,
57 self.flush)
58
59 def _resynchronize(self):
60 self._persist.remove(self.persist_name)
61
62 def flush(self):
63 self._persist.save(self._persist_filename)
6471
65 def create_message(self):72 def create_message(self):
66 ceph_points = self._ceph_usage_points73 ceph_points = self._ceph_usage_points
@@ -75,30 +82,31 @@
75 self.registry.broker.send_message(message, urgent=urgent)82 self.registry.broker.send_message(message, urgent=urgent)
7683
77 def exchange(self, urgent=False):84 def exchange(self, urgent=False):
78 self.registry.broker.call_if_accepted("ceph-usage",85 self.registry.broker.call_if_accepted(
79 self.send_message, urgent)86 "ceph-usage", self.send_message, urgent)
8087
88 @inlineCallbacks
81 def run(self):89 def run(self):
82 self._monitor.ping()90 self._monitor.ping()
8391
84 config_file = self._ceph_config92 # Check if a ceph config file is available. If it's not , it's not a
85 # Check if a ceph config file is available. No need to run anything93 # ceph machine or ceph is set up yet. No need to run anything in this
86 # if we know that we're not on a Ceph monitor node anyway.94 # case.
87 if not os.path.exists(config_file):95 if self._ceph_config is None or not os.path.exists(self._ceph_config):
88 # There is no config file - it's not a ceph machine.96 returnValue(None)
89 return None
9097
91 # Extract the ceph ring Id and cache it.98 # Extract the ceph ring Id and cache it.
92 if self._ceph_ring_id is None:99 if self._ceph_ring_id is None:
93 self._ceph_ring_id = self._get_ceph_ring_id()100 self._ceph_ring_id = yield self._get_ceph_ring_id()
94101
95 new_timestamp = int(self._create_time())102 new_timestamp = int(self._create_time())
96 new_ceph_usage = self._get_ceph_usage()103 new_ceph_usage = yield self._get_ceph_usage()
97104
98 step_data = None105 step_data = None
99 if new_ceph_usage is not None:106 if new_ceph_usage is not None:
100 step_data = self._accumulate(new_timestamp, new_ceph_usage,107 step_data = self._accumulate(
101 ACCUMULATOR_KEY)108 new_timestamp, new_ceph_usage, "ceph-usage-accumulator")
109
102 if step_data is not None:110 if step_data is not None:
103 self._ceph_usage_points.append(step_data)111 self._ceph_usage_points.append(step_data)
104112
@@ -107,52 +115,59 @@
107 Grab the ceph usage data by parsing the output of the "ceph status"115 Grab the ceph usage data by parsing the output of the "ceph status"
108 command output.116 command output.
109 """117 """
110 output = self._get_ceph_command_output()118
111119 def parse(output):
112 if output is None:120 if output is None:
113 return None121 return None
114122
115 result = EXP.match(output)123 result = self._usage_regexp.match(output)
116124 if not result:
117 if not result:125 logging.error("Could not parse command output: '%s'." % output)
118 logging.error("Could not parse command output: '%s'." % output)126 return None
119 return None127
120128 (used, available, total) = result.groups()
121 (used, available, total) = result.groups()129 # Note: used + available is NOT equal to total (there is some used
122 # Note: used + available is NOT equal to total (there is some used130 # space for duplication and system info etc...)
123 # space for duplication and system info etc...)131 filled = int(total) - int(available)
124132 return filled / float(total)
125 filled = int(total) - int(available)133
126134 return self._get_status_command_output().addCallback(parse)
127 return filled / float(total)135
128136 def _get_status_command_output(self):
129 def _get_ceph_command_output(self):137 return self._run_ceph_command("status")
130 try:
131 output = run_command("ceph status")
132 except (OSError, CommandError):
133 # If the command line client isn't available, we assume it's not
134 # a ceph monitor machine.
135 return None
136 return output
137138
138 def _get_ceph_ring_id(self):139 def _get_ceph_ring_id(self):
139 output = self._get_quorum_command_output()140 """Extract ceph ring id from ceph command output."""
140 if output is None:141
141 return None142 def parse(output):
142 try:143 if output is None:
143 quorum_status = json.loads(output)144 return None
144 ring_id = quorum_status["monmap"]["fsid"]145
145 except:146 try:
146 logging.error(147 quorum_status = json.loads(output)
147 "Could not get ring_id from output: '%s'." % output)148 ring_id = quorum_status["monmap"]["fsid"]
148 return None149 except:
149 return ring_id150 logging.error(
151 "Could not get ring_id from output: '%s'." % output)
152 return None
153 return ring_id
154
155 return self._get_quorum_command_output().addCallback(parse)
150156
151 def _get_quorum_command_output(self):157 def _get_quorum_command_output(self):
152 try:158 return self._run_ceph_command("quorum_status")
153 output = run_command("ceph quorum_status")159
154 except (OSError, CommandError):160 def _run_ceph_command(self, *args):
155 # If the command line client isn't available, we assume it's not161 """
156 # a ceph monitor machine.162 Run the ceph command with the specified options using landscape ceph
157 return None163 key. The keyring is expected to contain a configuration stanza with a
158 return output164 key for the "client.landscape-client" id.
165 """
166 params = ["--conf", self._ceph_config, "--id", "landscape-client"]
167 params.extend(args)
168 deferred = spawn_process("ceph", args=params)
169 # If the command line client isn't available, we assume it's not a ceph
170 # monitor machine.
171 deferred.addCallback(
172 lambda (out, err, code): out if code == 0 else None)
173 return deferred
159174
=== modified file 'landscape/monitor/config.py'
--- landscape/monitor/config.py 2013-05-10 11:02:24 +0000
+++ landscape/monitor/config.py 2013-06-04 16:15:35 +0000
@@ -5,7 +5,8 @@
5 "LoadAverage", "MemoryInfo", "MountInfo", "ProcessorInfo",5 "LoadAverage", "MemoryInfo", "MountInfo", "ProcessorInfo",
6 "Temperature", "PackageMonitor", "UserMonitor",6 "Temperature", "PackageMonitor", "UserMonitor",
7 "RebootRequired", "AptPreferences", "NetworkActivity",7 "RebootRequired", "AptPreferences", "NetworkActivity",
8 "NetworkDevice", "UpdateManager", "CPUUsage", "SwiftDeviceInfo"]8 "NetworkDevice", "UpdateManager", "CPUUsage", "SwiftDeviceInfo",
9 "CephUsage"]
910
1011
11class MonitorConfiguration(Configuration):12class MonitorConfiguration(Configuration):
1213
=== renamed file 'landscape/manager/tests/test_ceph.py' => 'landscape/monitor/tests/test_cephusage.py'
--- landscape/manager/tests/test_ceph.py 2013-04-02 14:56:19 +0000
+++ landscape/monitor/tests/test_cephusage.py 2013-06-04 16:15:35 +0000
@@ -1,171 +1,147 @@
1import os1import os
2import json2import json
33
4from landscape.tests.helpers import LandscapeTest, ManagerHelper4from twisted.internet.defer import succeed
5from landscape.manager.cephusage import CephUsage5
66from landscape.lib.fs import touch_file
77from landscape.tests.helpers import LandscapeTest, MonitorHelper
8SAMPLE_OLD_TEMPLATE = (" health HEALTH_WARN 6 pgs degraded; 6 pgs stuck "8from landscape.monitor.cephusage import CephUsage
9"unclean\n"9
10"monmap e2: 3 mons at {server-269703f4-5217-495a-b7f2-b3b3473c1719="10
11"10.55.60.238:6789/0,server-3f370698-f3b0-4cbe-8db9-a18e304c952b="11SAMPLE_OLD_TEMPLATE = (
12"10.55.60.141:6789/0,server-f635fa07-e36f-453c-b3d5-b4ce86fbc6ff="12 " health HEALTH_WARN 6 pgs degraded; 6 pgs stuck "
13"10.55.60.241:6789/0}, election epoch 8, quorum 0,1,2 "13 "unclean\n"
14"server-269703f4-5217-495a-b7f2-b3b3473c1719,"14 "monmap e2: 3 mons at {server-269703f4-5217-495a-b7f2-b3b3473c1719="
15"server-3f370698-f3b0-4cbe-8db9-a18e304c952b,"15 "10.55.60.238:6789/0,server-3f370698-f3b0-4cbe-8db9-a18e304c952b="
16"server-f635fa07-e36f-453c-b3d5-b4ce86fbc6ff\n "16 "10.55.60.141:6789/0,server-f635fa07-e36f-453c-b3d5-b4ce86fbc6ff="
17"osdmap e9: 3 osds: 3 up, 3 in\n "17 "10.55.60.241:6789/0}, election epoch 8, quorum 0,1,2 "
18"pgmap v114: 192 pgs: 186 active+clean, 6 active+degraded; "18 "server-269703f4-5217-495a-b7f2-b3b3473c1719,"
19"0 bytes data, %s MB used, %s MB / %s MB avail\n "19 "server-3f370698-f3b0-4cbe-8db9-a18e304c952b,"
20"mdsmap e1: 0/0/1 up\n\n")20 "server-f635fa07-e36f-453c-b3d5-b4ce86fbc6ff\n "
2121 "osdmap e9: 3 osds: 3 up, 3 in\n "
22SAMPLE_NEW_TEMPLATE = ("health HEALTH_OK\n"22 "pgmap v114: 192 pgs: 186 active+clean, 6 active+degraded; "
23" monmap e2: 3 mons at {inst-007=192.168.64.139:6789/0,"23 "0 bytes data, %s MB used, %s MB / %s MB avail\n "
24"inst-008=192.168.64.140:6789/0,inst-009=192.168.64.141:6789/0}, "24 "mdsmap e1: 0/0/1 up\n\n")
25"election epoch 6, quorum 0,1,2 inst-007,inst-008,inst-009\n"25
26" osdmap e28: 3 osds: 3 up, 3 in\n"26SAMPLE_NEW_TEMPLATE = (
27" pgmap v193861: 208 pgs: 208 active+clean; 5514 MB data, %s MB used, "27 "health HEALTH_OK\n"
28"%s MB / %s MB avail; 1739KB/s wr, 54op/s\n"28 " monmap e2: 3 mons at {inst-007=192.168.64.139:6789/0,"
29" mdsmap e1: 0/0/1 up\n")29 "inst-008=192.168.64.140:6789/0,inst-009=192.168.64.141:6789/0}, "
30 "election epoch 6, quorum 0,1,2 inst-007,inst-008,inst-009\n"
31 " osdmap e28: 3 osds: 3 up, 3 in\n"
32 " pgmap v193861: 208 pgs: 208 active+clean; 5514 MB data, %s MB used, "
33 "%s MB / %s MB avail; 1739KB/s wr, 54op/s\n"
34 " mdsmap e1: 0/0/1 up\n")
3035
31SAMPLE_OUTPUT = SAMPLE_NEW_TEMPLATE % (4296, 53880, 61248)36SAMPLE_OUTPUT = SAMPLE_NEW_TEMPLATE % (4296, 53880, 61248)
32SAMPLE_OLD_OUTPUT = SAMPLE_OLD_TEMPLATE % (4296, 53880, 61248)37SAMPLE_OLD_OUTPUT = SAMPLE_OLD_TEMPLATE % (4296, 53880, 61248)
3338
34SAMPLE_QUORUM = (''39SAMPLE_QUORUM = (
35'{ "election_epoch": 8,\n'40 '{ "election_epoch": 8,\n'
36' "quorum": [\n'41 ' "quorum": [\n'
37' 0,\n'42 ' 0,\n'
38' 1,\n'43 ' 1,\n'
39' 2],\n'44 ' 2],\n'
40' "monmap": { "epoch": 2,\n'45 ' "monmap": { "epoch": 2,\n'
41' "fsid": "%s",\n'46 ' "fsid": "%s",\n'
42' "modified": "2013-01-13 16:58:00.141737",\n'47 ' "modified": "2013-01-13 16:58:00.141737",\n'
43' "created": "0.000000",\n'48 ' "created": "0.000000",\n'
44' "mons": [\n'49 ' "mons": [\n'
45' { "rank": 0,\n'50 ' { "rank": 0,\n'
46' "name": "server-1be72d64-0ff2-4ac1-ad13-1c06c8201011",\n'51 ' "name": "server-1be72d64-0ff2-4ac1-ad13-1c06c8201011",\n'
47' "addr": "10.55.60.188:6789\/0"},\n'52 ' "addr": "10.55.60.188:6789\/0"},\n'
48' { "rank": 1,\n'53 ' { "rank": 1,\n'
49' "name": "server-e847f147-ed13-46c2-8e6d-768aa32657ab",\n'54 ' "name": "server-e847f147-ed13-46c2-8e6d-768aa32657ab",\n'
50' "addr": "10.55.60.202:6789\/0"},\n'55 ' "addr": "10.55.60.202:6789\/0"},\n'
51' { "rank": 2,\n'56 ' { "rank": 2,\n'
52' "name": "server-3c831a0b-51d5-43a9-95d5-63644f0965cc",\n'57 ' "name": "server-3c831a0b-51d5-43a9-95d5-63644f0965cc",\n'
53' "addr": "10.55.60.205:6789\/0"}]}}\n'58 ' "addr": "10.55.60.205:6789\/0"}]}}\n')
54)
5559
56SAMPLE_QUORUM_OUTPUT = SAMPLE_QUORUM % "ecbb8960-0e21-11e2-b495-83a88f44db01"60SAMPLE_QUORUM_OUTPUT = SAMPLE_QUORUM % "ecbb8960-0e21-11e2-b495-83a88f44db01"
5761
5862
59class CephUsagePluginTest(LandscapeTest):63class CephUsagePluginTest(LandscapeTest):
60 helpers = [ManagerHelper]64 helpers = [MonitorHelper]
6165
62 def setUp(self):66 def setUp(self):
63 super(CephUsagePluginTest, self).setUp()67 super(CephUsagePluginTest, self).setUp()
64 self.mstore = self.broker_service.message_store68 self.mstore = self.broker_service.message_store
69 self.plugin = CephUsage(create_time=self.reactor.time)
6570
66 def test_get_ceph_usage_if_command_not_found(self):71 def test_wb_get_ceph_usage_if_command_not_found(self):
67 """72 """
68 When the ceph command cannot be found or accessed, the73 When the ceph command cannot be found or accessed, the
69 C{_get_ceph_usage} method returns None.74 C{_get_ceph_usage} method returns None.
70 """75 """
71 plugin = CephUsage(create_time=self.reactor.time)76 self.plugin._get_status_command_output = lambda: succeed(None)
7277 self.monitor.add(self.plugin)
73 def return_none():78
74 return None79 self.assertIs(
7580 None, self.successResultOf(self.plugin._get_ceph_usage()))
76 plugin._get_ceph_command_output = return_none81
7782 def test_wb_get_ceph_usage(self):
78 self.manager.add(plugin)
79
80 result = plugin._get_ceph_usage()
81 self.assertIs(None, result)
82
83 def test_get_ceph_usage(self):
84 """83 """
85 When the ceph command call returns output, the _get_ceph_usage method84 When the ceph command call returns output, the _get_ceph_usage method
86 returns the percentage of used space.85 returns the percentage of used space.
87 """86 """
88 plugin = CephUsage(create_time=self.reactor.time)87 self.plugin._get_status_command_output = lambda: succeed(SAMPLE_OUTPUT)
8988 self.monitor.add(self.plugin)
90 def return_output():89
91 return SAMPLE_OUTPUT90 self.assertEqual(
9291 0.12029780564263323,
93 plugin._get_ceph_command_output = return_output92 self.successResultOf(self.plugin._get_ceph_usage()))
9493
95 self.manager.add(plugin)94 def test_wb_get_ceph_usage_old_format(self):
96
97 result = plugin._get_ceph_usage()
98 self.assertEqual(0.12029780564263323, result)
99
100 def test_get_ceph_usage_old_format(self):
101 """95 """
102 The _get_ceph_usage method understands command output in the "old"96 The _get_ceph_usage method understands command output in the "old"
103 format (the output changed around version 0.56.1)97 format (the output changed around version 0.56.1)
104 """98 """
105 plugin = CephUsage(create_time=self.reactor.time)99 self.plugin._get_status_command_output = (
106100 lambda: succeed(SAMPLE_OLD_OUTPUT))
107 def return_output():101 self.monitor.add(self.plugin)
108 return SAMPLE_OLD_OUTPUT102
109103 self.assertEqual(
110 plugin._get_ceph_command_output = return_output104 0.12029780564263323,
111105 self.successResultOf(self.plugin._get_ceph_usage()))
112 self.manager.add(plugin)106
113107 def test_wb_get_ceph_usage_empty_disk(self):
114 result = plugin._get_ceph_usage()
115 self.assertEqual(0.12029780564263323, result)
116
117 def test_get_ceph_usage_empty_disk(self):
118 """108 """
119 When the ceph command call returns output for empty disks, the109 When the ceph command call returns output for empty disks, the
120 _get_ceph_usage method returns 0.0 .110 _get_ceph_usage method returns 0.0 .
121 """111 """
122 plugin = CephUsage(create_time=self.reactor.time)112 self.plugin._get_status_command_output = (
123113 lambda: succeed(SAMPLE_NEW_TEMPLATE % (0, 100, 100)))
124 def return_output():114 self.monitor.add(self.plugin)
125 return SAMPLE_NEW_TEMPLATE % (0, 100, 100)115
126116 self.assertEqual(
127 plugin._get_ceph_command_output = return_output117 0.0, self.successResultOf(self.plugin._get_ceph_usage()))
128118
129 self.manager.add(plugin)119 def test_wb_get_ceph_usage_full_disk(self):
130
131 result = plugin._get_ceph_usage()
132 self.assertEqual(0.0, result)
133
134 def test_get_ceph_usage_full_disk(self):
135 """120 """
136 When the ceph command call returns output for empty disks, the121 When the ceph command call returns output for empty disks, the
137 _get_ceph_usage method returns 1.0 .122 _get_ceph_usage method returns 1.0 .
138 """123 """
139 plugin = CephUsage(create_time=self.reactor.time)124 self.plugin._get_status_command_output = (
140125 lambda: succeed(SAMPLE_NEW_TEMPLATE % (100, 0, 100)))
141 def return_output():126
142 return SAMPLE_NEW_TEMPLATE % (100, 0, 100)127 self.monitor.add(self.plugin)
143128 self.assertEqual(
144 plugin._get_ceph_command_output = return_output129 1.0, self.successResultOf(self.plugin._get_ceph_usage()))
145130
146 self.manager.add(plugin)131 def test_wb_get_ceph_usage_no_information(self):
147
148 result = plugin._get_ceph_usage()
149 self.assertEqual(1.0, result)
150
151 def test_get_ceph_usage_no_information(self):
152 """132 """
153 When the ceph command outputs something that does not contain the133 When the ceph command outputs something that does not contain the
154 disk usage information, the _get_ceph_usage method returns None.134 disk usage information, the _get_ceph_usage method returns None.
155 """135 """
156 plugin = CephUsage(create_time=self.reactor.time)
157 output = "Blah\nblah"136 output = "Blah\nblah"
158 error = "Could not parse command output: '%s'" % output137 error = "Could not parse command output: '%s'" % output
159 self.log_helper.ignore_errors(error)138 self.log_helper.ignore_errors(error)
160139
161 def return_output():140 self.plugin._get_status_command_output = lambda: succeed(output)
162 return output141
163 plugin._get_ceph_command_output = return_output142 self.monitor.add(self.plugin)
164143 self.assertIs(
165 self.manager.add(plugin)144 None, self.successResultOf(self.plugin._get_ceph_usage()))
166
167 result = plugin._get_ceph_usage()
168 self.assertEqual(None, result)
169145
170 def test_never_exchange_empty_messages(self):146 def test_never_exchange_empty_messages(self):
171 """147 """
@@ -174,12 +150,10 @@
174 message is created during exchange, it should not be queued.150 message is created during exchange, it should not be queued.
175 """151 """
176 self.mstore.set_accepted_types(["ceph-usage"])152 self.mstore.set_accepted_types(["ceph-usage"])
177153 self.monitor.add(self.plugin)
178 plugin = CephUsage(create_time=self.reactor.time)154
179 self.manager.add(plugin)155 self.monitor.exchange()
180156 self.assertEqual(0, len(self.mstore.get_pending_messages()))
181 self.manager.exchange()
182 self.assertEqual(len(self.mstore.get_pending_messages()), 0)
183157
184 def test_exchange_messages(self):158 def test_exchange_messages(self):
185 """159 """
@@ -189,13 +163,11 @@
189 ring_id = "whatever"163 ring_id = "whatever"
190 self.mstore.set_accepted_types(["ceph-usage"])164 self.mstore.set_accepted_types(["ceph-usage"])
191165
192 plugin = CephUsage(create_time=self.reactor.time)166 self.plugin._ceph_usage_points = [(60, 1.0)]
193 plugin._ceph_usage_points = [(60, 1.0)]167 self.plugin._ceph_ring_id = ring_id
194 plugin._ceph_ring_id = ring_id168 self.monitor.add(self.plugin)
195 self.manager.add(plugin)169
196170 self.monitor.exchange()
197 self.manager.exchange()
198
199 self.assertMessages(self.mstore.get_pending_messages(),171 self.assertMessages(self.mstore.get_pending_messages(),
200 [{"type": "ceph-usage",172 [{"type": "ceph-usage",
201 "ceph-usages": [(60, 1.0)],173 "ceph-usages": [(60, 1.0)],
@@ -205,13 +177,10 @@
205 """177 """
206 Calling create_message returns an expected message.178 Calling create_message returns an expected message.
207 """179 """
208 plugin = CephUsage(create_time=self.reactor.time)
209 self.manager.add(plugin)
210
211 ring_id = "blah"180 ring_id = "blah"
212 plugin._ceph_usage_points = []181 self.plugin._ceph_usage_points = []
213 plugin._ceph_ring_id = ring_id182 self.plugin._ceph_ring_id = ring_id
214 message = plugin.create_message()183 message = self.plugin.create_message()
215184
216 self.assertIn("type", message)185 self.assertIn("type", message)
217 self.assertEqual(message["type"], "ceph-usage")186 self.assertEqual(message["type"], "ceph-usage")
@@ -221,8 +190,8 @@
221 self.assertEqual(len(ceph_usages), 0)190 self.assertEqual(len(ceph_usages), 0)
222191
223 point = (60, 1.0)192 point = (60, 1.0)
224 plugin._ceph_usage_points = [point]193 self.plugin._ceph_usage_points = [point]
225 message = plugin.create_message()194 message = self.plugin.create_message()
226 self.assertIn("type", message)195 self.assertIn("type", message)
227 self.assertEqual(message["type"], "ceph-usage")196 self.assertEqual(message["type"], "ceph-usage")
228 self.assertIn("ceph-usages", message)197 self.assertIn("ceph-usages", message)
@@ -233,101 +202,75 @@
233202
234 def test_no_message_if_not_accepted(self):203 def test_no_message_if_not_accepted(self):
235 """204 """
236 Don't add any messages at all if the broker isn't currently205 Don't add any messages at all if the broker isn't currently accepting
237 accepting their type.206 their type.
238 """207 """
239 interval = 30208 interval = 30
240 exchange_interval = 300209 monitor_interval = 300
241210
242 plugin = CephUsage(create_time=self.reactor.time,211 plugin = CephUsage(
243 interval=interval, exchange_interval=300)212 interval=interval, monitor_interval=monitor_interval,
244213 create_time=self.reactor.time)
245 self.manager.add(plugin)214
246215 self.monitor.add(plugin)
247 self.reactor.advance(exchange_interval * 2)216
248 self.manager.exchange()217 self.reactor.advance(monitor_interval * 2)
218 self.monitor.exchange()
249219
250 self.mstore.set_accepted_types(["ceph-usage"])220 self.mstore.set_accepted_types(["ceph-usage"])
251 self.assertMessages(list(self.mstore.get_pending_messages()), [])221 self.assertMessages(list(self.mstore.get_pending_messages()), [])
252222
253 def test_get_ceph_ring_id(self):223 def test_wb_get_ceph_ring_id(self):
254 """224 """
255 When given a well formatted command output, the _get_ceph_ring_id()225 When given a well formatted command output, the _get_ceph_ring_id()
256 method returns the correct ring_id.226 method returns the correct ring_id.
257 """227 """
258 plugin = CephUsage(create_time=self.reactor.time)
259
260 uuid = "i-am-a-uuid"228 uuid = "i-am-a-uuid"
261229 self.plugin._get_quorum_command_output = (
262 def return_output():230 lambda: succeed(SAMPLE_QUORUM % uuid))
263 return SAMPLE_QUORUM % uuid231 self.assertEqual(
264232 uuid, self.successResultOf(self.plugin._get_ceph_ring_id()))
265 plugin._get_quorum_command_output = return_output233
266234 def test_wb_get_ceph_ring_id_valid_json_no_information(self):
267 self.manager.add(plugin)
268
269 result = plugin._get_ceph_ring_id()
270 self.assertEqual(uuid, result)
271
272 def test_get_ceph_ring_id_valid_json_no_information(self):
273 """235 """
274 When the _get_quorum_command_output method returns something without236 When the _get_quorum_command_output method returns something without
275 the ring uuid information present but that is valid JSON, the237 the ring uuid information present but that is valid JSON, the
276 _get_ceph_ring_id method returns None.238 _get_ceph_ring_id method returns None.
277 """239 """
278 plugin = CephUsage(create_time=self.reactor.time)
279 error = "Could not get ring_id from output: '{\"election_epoch\": 8}'."240 error = "Could not get ring_id from output: '{\"election_epoch\": 8}'."
280 self.log_helper.ignore_errors(error)241 self.log_helper.ignore_errors(error)
281242
282 def return_output():243 def return_output():
283 # Valid JSON - just without the info we're looking for.244 # Valid JSON - just without the info we're looking for.
284 data = {"election_epoch": 8}245 data = {"election_epoch": 8}
285 return json.dumps(data)246 return succeed(json.dumps(data))
286247
287 plugin._get_quorum_command_output = return_output248 self.plugin._get_quorum_command_output = return_output
288249 self.assertIs(
289 self.manager.add(plugin)250 None, self.successResultOf(self.plugin._get_ceph_ring_id()))
290251
291 result = plugin._get_ceph_ring_id()252 def test_wb_get_ceph_ring_id_no_information(self):
292 self.assertEqual(None, result)
293
294 def test_get_ceph_ring_id_no_information(self):
295 """253 """
296 When the _get_quorum_command_output method returns something without254 When the _get_quorum_command_output method returns something without
297 the ring uuid information present, the _get_ceph_ring_id method returns255 the ring uuid information present, the _get_ceph_ring_id method returns
298 None.256 None.
299 """257 """
300 plugin = CephUsage(create_time=self.reactor.time)
301 error = "Could not get ring_id from output: 'Blah\nblah'."258 error = "Could not get ring_id from output: 'Blah\nblah'."
302 self.log_helper.ignore_errors(error)259 self.log_helper.ignore_errors(error)
303260
304 def return_output():261 self.plugin._get_quorum_command_output = lambda: succeed("Blah\nblah")
305 return "Blah\nblah"262 self.assertIs(
306263 None, self.successResultOf(self.plugin._get_ceph_ring_id()))
307 plugin._get_quorum_command_output = return_output264
308265 def test_wb_get_ceph_ring_id_command_exception(self):
309 self.manager.add(plugin)
310
311 result = plugin._get_ceph_ring_id()
312 self.assertEqual(None, result)
313
314 def test_get_ceph_ring_id_command_exception(self):
315 """266 """
316 When the _get_quorum_command_output method returns None (if an267 When the _get_quorum_command_output method returns None (if an
317 exception happened for example), the _get_ceph_ring_id method268 exception happened for example), the _get_ceph_ring_id method
318 returns None and logs no error.269 returns None and logs no error.
319 """270 """
320 plugin = CephUsage(create_time=self.reactor.time)271 self.plugin._get_quorum_command_output = lambda: succeed(None)
321272 self.assertIs(
322 def return_output():273 None, self.successResultOf(self.plugin._get_ceph_ring_id()))
323 return None
324
325 plugin._get_quorum_command_output = return_output
326
327 self.manager.add(plugin)
328
329 result = plugin._get_ceph_ring_id()
330 self.assertEqual(None, result)
331274
332 def test_plugin_run(self):275 def test_plugin_run(self):
333 """276 """
@@ -336,60 +279,44 @@
336 The _ceph_ring_id member of the plugin is also filled with the output279 The _ceph_ring_id member of the plugin is also filled with the output
337 of the _get_ceph_ring_id method.280 of the _get_ceph_ring_id method.
338 """281 """
339 exchange_interval = 300282 monitor_interval = 300
340 interval = exchange_interval283 interval = monitor_interval
341 plugin = CephUsage(create_time=self.reactor.time,284 plugin = CephUsage(
342 exchange_interval=exchange_interval,285 interval=interval, monitor_interval=monitor_interval,
343 interval=interval)286 create_time=self.reactor.time)
287
344 uuid = "i-am-a-unique-snowflake"288 uuid = "i-am-a-unique-snowflake"
345289
346 def return_quorum():290 # The config file must be present for the plugin to run.
347 return SAMPLE_QUORUM % uuid291 ceph_client_dir = os.path.join(self.config.data_path, "ceph-client")
348292 ceph_conf = os.path.join(ceph_client_dir, "ceph.landscape-client.conf")
349 def return_full_disk():293 os.mkdir(ceph_client_dir)
350 return SAMPLE_NEW_TEMPLATE % (100, 0, 100)294 touch_file(ceph_conf)
351295 plugin._ceph_config = ceph_conf
352 plugin._ceph_config = "/etc/hosts"296
353 plugin._get_quorum_command_output = return_quorum297 plugin._get_quorum_command_output = (
354 plugin._get_ceph_command_output = return_full_disk298 lambda: succeed(SAMPLE_QUORUM % uuid))
355299 plugin._get_status_command_output = (
356 self.manager.add(plugin)300 lambda: succeed(SAMPLE_NEW_TEMPLATE % (100, 0, 100)))
357301 self.monitor.add(plugin)
358 self.reactor.advance(exchange_interval * 2)302
303 self.reactor.advance(monitor_interval * 2)
359304
360 self.assertEqual([(300, 1.0), (600, 1.0)], plugin._ceph_usage_points)305 self.assertEqual([(300, 1.0), (600, 1.0)], plugin._ceph_usage_points)
361 self.assertEqual(uuid, plugin._ceph_ring_id)306 self.assertEqual(uuid, plugin._ceph_ring_id)
362307
363 def test_flush_persists_data_to_disk(self):
364 """
365 The plugin's C{flush} method is called every C{flush_interval} and
366 creates the perists file.
367 """
368 flush_interval = self.config.flush_interval
369 persist_filename = os.path.join(self.config.data_path, "ceph.bpickle")
370
371 self.assertFalse(os.path.exists(persist_filename))
372 plugin = CephUsage(create_time=self.reactor.time)
373 self.manager.add(plugin)
374
375 self.reactor.advance(flush_interval)
376
377 self.assertTrue(os.path.exists(persist_filename))
378
379 def test_resynchronize_message_calls_resynchronize_method(self):308 def test_resynchronize_message_calls_resynchronize_method(self):
380 """309 """
381 If the reactor fires a "resynchronize" even the C{_resynchronize}310 If the reactor fires a "resynchronize" even the C{_resynchronize}
382 method on the ceph plugin object is called.311 method on the ceph plugin object is called.
383 """312 """
384 plugin = CephUsage(create_time=self.reactor.time)
385
386 self.called = False313 self.called = False
387314
388 def stub_resynchronize():315 def stub_resynchronize():
389 self.called = True316 self.called = True
390 plugin._resynchronize = stub_resynchronize317 self.plugin._resynchronize = stub_resynchronize
391318
392 self.manager.add(plugin)319 self.monitor.add(self.plugin)
393320
394 self.reactor.fire("resynchronize")321 self.reactor.fire("resynchronize")
395322

Subscribers

People subscribed via source and target branches

to all changes: