Merge lp:~gandelman-a/charms/precise/rabbitmq-server/ceph-support into lp:~openstack-charmers/charms/precise/rabbitmq-server/ha-support
Status: Merged
Merged at revision: 41
Proposed branch: lp:~gandelman-a/charms/precise/rabbitmq-server/ceph-support
Merge into: lp:~openstack-charmers/charms/precise/rabbitmq-server/ha-support
Diff against target: 935 lines (+729/-31), 7 files modified:
  config.yaml (+13/-1)
  hooks/lib/ceph.py (+240/-0)
  hooks/rabbit_utils.py (+27/-0)
  hooks/rabbitmq-server-relations.py (+133/-26)
  hooks/utils.py (+312/-2)
  metadata.yaml (+3/-1)
  revision (+1/-1)
To merge this branch: bzr merge lp:~gandelman-a/charms/precise/rabbitmq-server/ceph-support
Related bugs: none
Reviewers:
  Adam Gandelman (community): Needs Resubmitting
  James Page: Needs Fixing
Review via email: mp+151343@code.launchpad.net
Description of the change
Adds ceph-backed HA clustering, similar to our current mysql configuration. HA configuration is deferred until a ceph relation exists. The ceph relation uses the newly refactored ceph.py (lifted from mysql) to set up the RBD device and migrate the data onto it.
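For reviewers skimming the diff: the heart of the change is the leader-only storage setup driven from the new ceph-relation-changed hook. A condensed sketch follows; ensure_ceph_storage() and the constants come from the diff below and the defaults from config.yaml, while the wrapper function itself is hypothetical, a summary rather than the authoritative code:

    import lib.ceph as ceph
    import utils

    SERVICE_NAME = 'rabbitmq-server'  # derived from JUJU_UNIT_NAME in the hook
    POOL_NAME = SERVICE_NAME
    RABBIT_DIR = '/var/lib/rabbitmq'

    def setup_rbd_storage(rbd_img='rabbitmq1', rbd_size='5G'):
        # Leader-only: ensure the pool and RBD image exist, map the image,
        # format it if needed, and migrate RABBIT_DIR onto it.
        sizemb = int(rbd_size.split('G')[0]) * 1024
        blk_device = '/dev/rbd/%s/%s' % (POOL_NAME, rbd_img)
        ceph.ensure_ceph_storage(service=SERVICE_NAME, pool=POOL_NAME,
                                 rbd_img=rbd_img, sizemb=sizemb,
                                 fstype='ext4', mount_point=RABBIT_DIR,
                                 blk_device=blk_device,
                                 system_services=['rabbitmq-server'])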
Adam Gandelman (gandelman-a) wrote:
- 55. By Adam Gandelman: Drop utils.py backup.
James Page (james-page) wrote:
I'll give this a more thorough test later this week after UDS; one comment for the time being:
1) utils.py/do_hooks()
The hook itself should not be called in the try/except block: a KeyError raised within the hook function itself would be swallowed there instead of propagating back up as a hook failure; hence the original call to hook_func() in the else block.
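Something like the following shape (a minimal sketch, assuming the module's own juju_log helper) illustrates the point:

    import os
    import sys

    def do_hooks(hooks):
        hook = os.path.basename(sys.argv[0])
        try:
            hook_func = hooks[hook]
        except KeyError:
            # Only the dictionary lookup is guarded here.
            juju_log('INFO', 'This charm does not know how to handle %s.' % hook)
        else:
            # A KeyError raised inside the hook itself now propagates and
            # fails the hook run instead of being mistaken for an unknown
            # hook name.
            hook_func()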
James Page (james-page) wrote:
Adam
I gave this a good test on precise; it generally worked OK, but the cluster hooks retain the calls that build a rabbitmq-style cluster, which cause hook execution failures on the cluster hook.
Do these need to be dropped? The cookie syncs to /var/lib/rabbitmq, which is on the rbd device. Maybe doing the rabbitmq-style clustering should be mutually exclusive with relating to ceph/being clustered.
Adam Gandelman (gandelman-a) wrote:
James- I'd like to preserve the native rabbitmq clustering support that existed before adding the ceph/pacemaker requirements. I thought I had checked to ensure that the lower-level clustering did not interfere, but I'll check again. Do you remember the order of operations w.r.t. bringing up the cluster? add-relation ceph, add-relation hacluster, add-unit rabbitmq-server? I thought I had tested them all, but I'll try again to trigger the errors you're seeing.
- 56. By Adam Gandelman: Only run peer-hooks for rabbit native clustering if no hacluster relation exists.
Adam Gandelman (gandelman-a) wrote:
James-
Seems the issue was when building the cluster in the following order:
deploy rabbitmq-server
add-relation rabbitmq-server ceph
add-relation rabbitmq-server hacluster
add-unit rabbitmq-server
Part of the hacluster configuration requires setting a common rabbitmq node name by which the server will be addressed. Native rabbitmq clustering (as configured in the peer relations) requires being able to address each server in the cluster by a unique name+host. The simplest solution is to only configure native peer clustering in the absence of an 'ha' relation.
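Concretely, the peer hooks now bail out early whenever an 'ha' relation is present; this is the guard added in r56, excerpted from the diff below:

    def cluster_joined():
        # A pacemaker-managed cluster pins a common node name, which
        # conflicts with native clustering's unique name+host addressing,
        # so skip the native peer config entirely when 'ha' exists.
        if utils.is_relation_made('ha'):
            utils.juju_log('INFO',
                           'hacluster relation is present, skipping native '
                           'rabbitmq cluster config.')
            return
        # ... native clustering proceeds as before ...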
- 57. By Adam Gandelman: Use or create /etc/rabbitmq/rabbitmq-env.conf for setting the node name. The .d directory is not available on newer package versions; update or create rabbitmq-env.conf instead when setting the node name.
- 58. By Adam Gandelman: Add missing import.
- 59. By Adam Gandelman: Add hostname to node name.
- 60. By Adam Gandelman: do_hooks(): Update exception handling.
Adam Gandelman (gandelman-a) wrote:
Updated do_hooks() to match the exception handling we use in other charms.
Moved setting of RABBITMQ_NODENAME to /etc/rabbitmq/rabbitmq-env.conf.
Tested on raring, too.
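For reference, the node-name handling boils down to a single shared entry in the env file; a sketch with illustrative values, per the set_node_name() call in ha_joined in the diff below:

    # After ha-relation-joined, every unit gets the same node name:
    rabbit.set_node_name('%s@localhost' % SERVICE_NAME)
    # ...which writes or updates /etc/rabbitmq/rabbitmq-env.conf with:
    #   RABBITMQ_NODENAME=rabbitmq-server@localhost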
Preview Diff
1 | === modified file 'config.yaml' |
2 | --- config.yaml 2013-01-18 09:52:27 +0000 |
3 | +++ config.yaml 2013-03-08 20:30:29 +0000 |
4 | @@ -41,4 +41,16 @@ |
5 | description: | |
6 | Default multicast port number that will be used to communicate between |
7 | HA Cluster nodes. |
8 | - |
9 | + rbd-size: |
10 | + type: string |
11 | + default: 5G |
12 | + description: | |
13 | + Default rbd storage size to create when setting up block storage. |
14 | + This value should be specified in GB (e.g. 100G). |
15 | + rbd-name: |
16 | + type: string |
17 | + default: rabbitmq1 |
18 | + description: | |
19 | + The name that will be used to create the Ceph's RBD image with. If the |
20 | + image name exists in Ceph, it will be re-used and the data will be |
21 | + overwritten. |
22 | |
23 | === added symlink 'hooks/ceph-relation-changed' |
24 | === target is u'rabbitmq-server-relations.py' |
25 | === added symlink 'hooks/ceph-relation-joined' |
26 | === target is u'rabbitmq-server-relations.py' |
27 | === added directory 'hooks/lib' |
28 | === added file 'hooks/lib/__init__.py' |
29 | === added file 'hooks/lib/ceph.py' |
30 | --- hooks/lib/ceph.py 1970-01-01 00:00:00 +0000 |
31 | +++ hooks/lib/ceph.py 2013-03-08 20:30:29 +0000 |
32 | @@ -0,0 +1,240 @@ |
33 | +import utils |
34 | +import commands |
35 | +import re |
36 | +import subprocess |
37 | +import sys |
38 | +import os |
39 | +import shutil |
40 | + |
41 | +KEYRING = '/etc/ceph/ceph.client.%s.keyring' |
42 | +KEYFILE = '/etc/ceph/ceph.client.%s.key' |
43 | + |
44 | +CEPH_CONF = """[global] |
45 | + auth supported = %(auth)s |
46 | + keyring = %(keyring)s |
47 | + mon host = %(mon_hosts)s |
48 | +""" |
49 | + |
50 | +def execute(cmd): |
51 | + subprocess.check_call(cmd) |
52 | + |
53 | + |
54 | +def execute_shell(cmd): |
55 | + subprocess.check_call(cmd, shell=True) |
56 | + |
57 | + |
58 | +def install(): |
59 | + ceph_dir = "/etc/ceph" |
60 | + if not os.path.isdir(ceph_dir): |
61 | + os.mkdir(ceph_dir) |
62 | + utils.install('ceph-common') |
63 | + |
64 | + |
65 | +def rbd_exists(service, pool, rbd_img): |
66 | + (rc, out) = commands.getstatusoutput('rbd list --id %s --pool %s' %\ |
67 | + (service, pool)) |
68 | + return rbd_img in out |
69 | + |
70 | + |
71 | +def create_rbd_image(service, pool, image, sizemb): |
72 | + cmd = [ |
73 | + 'rbd', |
74 | + 'create', |
75 | + image, |
76 | + '--size', |
77 | + str(sizemb), |
78 | + '--id', |
79 | + service, |
80 | + '--pool', |
81 | + pool |
82 | + ] |
83 | + execute(cmd) |
84 | + |
85 | + |
86 | +def pool_exists(service, name): |
87 | + (rc, out) = commands.getstatusoutput("rados --id %s lspools" % service) |
88 | + return name in out |
89 | + |
90 | +def create_pool(service, name): |
91 | + cmd = [ |
92 | + 'rados', |
93 | + '--id', |
94 | + service, |
95 | + 'mkpool', |
96 | + name |
97 | + ] |
98 | + execute(cmd) |
99 | + |
100 | + |
101 | +def keyfile_path(service): |
102 | + return KEYFILE % service |
103 | + |
104 | +def keyring_path(service): |
105 | + return KEYRING % service |
106 | + |
107 | +def create_keyring(service, key): |
108 | + keyring = keyring_path(service) |
109 | + if os.path.exists(keyring): |
110 | + utils.juju_log('INFO', 'ceph: Keyring exists at %s.' % keyring) |
111 | + cmd = [ |
112 | + 'ceph-authtool', |
113 | + keyring, |
114 | + '--create-keyring', |
115 | + '--name=client.%s' % service, |
116 | + '--add-key=%s' % key |
117 | + ] |
118 | + execute(cmd) |
119 | + utils.juju_log('INFO', 'ceph: Created new ring at %s.' % keyring) |
120 | + |
121 | + |
122 | +def create_key_file(service, key): |
123 | + # create a file containing the key |
124 | + keyfile = keyfile_path(service) |
125 | + if os.path.exists(keyfile): |
126 | + utils.juju_log('INFO', 'ceph: Keyfile exists at %s.' % keyfile) |
127 | + fd = open(keyfile, 'w') |
128 | + fd.write(key) |
129 | + fd.close() |
130 | + utils.juju_log('INFO', 'ceph: Created new keyfile at %s.' % keyfile) |
131 | + |
132 | + |
133 | +def get_ceph_nodes(): |
134 | + hosts = [] |
135 | + for r_id in utils.relation_ids('ceph'): |
136 | + for unit in utils.relation_list(r_id): |
137 | + hosts.append(utils.relation_get('private-address', |
138 | + unit=unit, rid=r_id)) |
139 | + return hosts |
140 | + |
141 | + |
142 | +def configure(service, key, auth): |
143 | + create_keyring(service, key) |
144 | + create_key_file(service, key) |
145 | + hosts = get_ceph_nodes() |
146 | + mon_hosts = ",".join(map(str, hosts)) |
147 | + keyring = keyring_path(service) |
148 | + with open('/etc/ceph/ceph.conf', 'w') as ceph_conf: |
149 | + ceph_conf.write(CEPH_CONF % locals()) |
150 | + modprobe_kernel_module('rbd') |
151 | + |
152 | + |
153 | +def image_mapped(image_name): |
154 | + (rc, out) = commands.getstatusoutput('rbd showmapped') |
155 | + return image_name in out |
156 | + |
157 | +def map_block_storage(service, pool, image): |
158 | + cmd = [ |
159 | + 'rbd', |
160 | + 'map', |
161 | + '%s/%s' % (pool, image), |
162 | + '--user', |
163 | + service, |
164 | + '--secret', |
165 | + keyfile_path(service), |
166 | + ] |
167 | + execute(cmd) |
168 | + |
169 | + |
170 | +def filesystem_mounted(fs): |
171 | + return subprocess.call(['grep', '-wqs', fs, '/proc/mounts']) == 0 |
172 | + |
173 | +def make_filesystem(blk_device, fstype='ext4'): |
174 | + utils.juju_log('INFO', |
175 | + 'ceph: Formatting block device %s as filesystem %s.' %\ |
176 | + (blk_device, fstype)) |
177 | + cmd = ['mkfs', '-t', fstype, blk_device] |
178 | + execute(cmd) |
179 | + |
180 | + |
181 | +def place_data_on_ceph(service, blk_device, data_src_dst, fstype='ext4'): |
182 | + # mount block device into /mnt |
183 | + cmd = ['mount', '-t', fstype, blk_device, '/mnt'] |
184 | + execute(cmd) |
185 | + |
186 | + # copy data to /mnt |
187 | + try: |
188 | + copy_files(data_src_dst, '/mnt') |
189 | + except: |
190 | + pass |
191 | + |
192 | + # umount block device |
193 | + cmd = ['umount', '/mnt'] |
194 | + execute(cmd) |
195 | + |
196 | + _dir = os.stat(data_src_dst) |
197 | + uid = _dir.st_uid |
198 | + gid = _dir.st_gid |
199 | + |
200 | + # re-mount where the data should originally be |
201 | + cmd = ['mount', '-t', fstype, blk_device, data_src_dst] |
202 | + execute(cmd) |
203 | + |
204 | + # ensure original ownership of new mount. |
205 | + cmd = ['chown', '-R', '%s:%s' % (uid, gid), data_src_dst] |
206 | + execute(cmd) |
207 | + |
208 | +# TODO: re-use |
209 | +def modprobe_kernel_module(module): |
210 | + utils.juju_log('INFO','Loading kernel module') |
211 | + cmd = ['modprobe', module] |
212 | + execute(cmd) |
213 | + cmd = 'echo %s >> /etc/modules' % module |
214 | + execute_shell(cmd) |
215 | + |
216 | + |
217 | +def copy_files(src, dst, symlinks=False, ignore=None): |
218 | + for item in os.listdir(src): |
219 | + s = os.path.join(src, item) |
220 | + d = os.path.join(dst, item) |
221 | + if os.path.isdir(s): |
222 | + shutil.copytree(s, d, symlinks, ignore) |
223 | + else: |
224 | + shutil.copy2(s, d) |
225 | + |
226 | +def ensure_ceph_storage(service, pool, rbd_img, sizemb, mount_point, |
227 | + blk_device, fstype, system_services=[]): |
228 | + """ |
229 | + To be called from the current cluster leader. |
230 | + Ensures given pool and RBD image exists, is mapped to a block device, |
231 | + and the device is formatted and mounted at the given mount_point. |
232 | + |
233 | + If formatting a device for the first time, data existing at mount_point |
234 | + will be migrated to the RBD device before being remounted. |
235 | + |
236 | + All services listed in system_services will be stopped prior to data |
237 | + migration and restarted when complete. |
238 | + """ |
239 | + # Ensure pool, RBD image, RBD mappings are in place. |
240 | + if not pool_exists(service, pool): |
241 | + utils.juju_log('INFO', 'ceph: Creating new pool %s.' % pool) |
242 | + create_pool(service, pool) |
243 | + |
244 | + if not rbd_exists(service, pool, rbd_img): |
245 | + utils.juju_log('INFO', 'ceph: Creating RBD image (%s).' % rbd_img) |
246 | + create_rbd_image(service, pool, rbd_img, sizemb) |
247 | + |
248 | + if not image_mapped(rbd_img): |
249 | + utils.juju_log('INFO', 'ceph: Mapping RBD Image as a Block Device.') |
250 | + map_block_storage(service, pool, rbd_img) |
251 | + |
252 | + # make file system |
253 | + # TODO: What happens if for whatever reason this is run again and |
254 | + # the data is already in the rbd device and/or is mounted?? |
255 | + # When it is mounted already, it will fail to make the fs |
256 | + # XXX: This is really sketchy! Need to at least add an fstab entry |
257 | + # otherwise this hook will blow away existing data if its executed |
258 | + # after a reboot. |
259 | + if not filesystem_mounted(mount_point): |
260 | + make_filesystem(blk_device, fstype) |
261 | + |
262 | + for svc in system_services: |
263 | + if utils.running(svc): |
264 | + utils.juju_log('INFO', |
265 | + 'Stopping services %s prior to migrating '\ |
266 | + 'data' % svc) |
267 | + utils.stop(svc) |
268 | + |
269 | + place_data_on_ceph(service, blk_device, mount_point, fstype) |
270 | + |
271 | + for svc in system_services: |
272 | + utils.start(svc) |
273 | |
274 | === modified file 'hooks/rabbit_utils.py' |
275 | --- hooks/rabbit_utils.py 2013-01-23 21:56:44 +0000 |
276 | +++ hooks/rabbit_utils.py 2013-03-08 20:30:29 +0000 |
277 | @@ -1,3 +1,4 @@ |
278 | +import os |
279 | import re |
280 | import subprocess |
281 | import utils |
282 | @@ -95,3 +96,29 @@ |
283 | subprocess.check_call(cmd) |
284 | cmd = [RABBITMQ_CTL, 'start_app'] |
285 | subprocess.check_call(cmd) |
286 | + |
287 | + |
288 | +def set_node_name(name): |
289 | + # update or append RABBITMQ_NODENAME to environment config. |
290 | + # rabbitmq.conf.d is not present on all releases, so use or create |
291 | + # rabbitmq-env.conf instead. |
292 | + conf = '/etc/rabbitmq/rabbitmq-env.conf' |
293 | + |
294 | + if not os.path.isfile(conf): |
295 | + utils.juju_log('INFO', '%s does not exist, creating.' % conf) |
296 | + with open(conf, 'wb') as out: |
297 | + out.write('RABBITMQ_NODENAME=%s\n' % name) |
298 | + return |
299 | + |
300 | + out = [] |
301 | + f = False |
302 | + for line in open(conf).readlines(): |
303 | + if line.strip().startswith('RABBITMQ_NODENAME'): |
304 | + f = True |
305 | + line = 'RABBITMQ_NODENAME=%s\n' % name |
306 | + out.append(line) |
307 | + if not f: |
308 | + out.append('RABBITMQ_NODENAME=%s\n' % name) |
309 | + utils.juju_log('INFO', 'Updating %s, RABBITMQ_NODENAME=%s' % (conf, name)) |
310 | + with open(conf, 'wb') as conf: |
311 | + conf.write(''.join(out)) |
312 | |
313 | === modified file 'hooks/rabbitmq-server-relations.py' |
314 | --- hooks/rabbitmq-server-relations.py 2013-01-23 21:56:44 +0000 |
315 | +++ hooks/rabbitmq-server-relations.py 2013-03-08 20:30:29 +0000 |
316 | @@ -1,34 +1,37 @@ |
317 | #!/usr/bin/python |
318 | |
319 | -import rabbit_utils as rabbit |
320 | -import utils |
321 | - |
322 | import os |
323 | +import shutil |
324 | import sys |
325 | import subprocess |
326 | |
327 | + |
328 | +import rabbit_utils as rabbit |
329 | +import utils |
330 | +import lib.ceph as ceph |
331 | + |
332 | +SERVICE_NAME = os.getenv('JUJU_UNIT_NAME').split('/')[0] |
333 | +POOL_NAME = SERVICE_NAME |
334 | +RABBIT_DIR='/var/lib/rabbitmq' |
335 | + |
336 | def install(): |
337 | utils.install(*rabbit.PACKAGES) |
338 | utils.expose(5672) |
339 | |
340 | -def amqp_changed(): |
341 | - l_unit_no=os.getenv('JUJU_UNIT_NAME').split('/')[1] |
342 | - r_unit_no=None |
343 | - for rid in utils.relation_ids('cluster'): |
344 | - for unit in utils.relation_list(rid): |
345 | - r_unit_no = unit.split('/')[1] |
346 | - if l_unit_no > r_unit_no: |
347 | - msg = 'amqp_changed(): Deferring amqp_changed to leader.' |
348 | - utils.juju_log('INFO', msg) |
349 | - return |
350 | +def amqp_changed(relation_id=None, remote_unit=None): |
351 | + if not utils.eligible_leader('res_rabbitmq_vip'): |
352 | + msg = 'amqp_changed(): Deferring amqp_changed to eligible_leader.' |
353 | + utils.juju_log('INFO', msg) |
354 | + return |
355 | |
356 | - rabbit_user=utils.relation_get('username') |
357 | - vhost=utils.relation_get('vhost') |
358 | + rabbit_user = utils.relation_get('username', rid=relation_id, |
359 | + unit=remote_unit) |
360 | + vhost = utils.relation_get('vhost', rid=relation_id, unit=remote_unit) |
361 | if None in [rabbit_user, vhost]: |
362 | utils.juju_log('INFO', 'amqp_changed(): Relation not ready.') |
363 | return |
364 | |
365 | - password_file = '/var/lib/juju/%s.passwd' % rabbit_user |
366 | + password_file = os.path.join(RABBIT_DIR, '%s.passwd' % rabbit_user) |
367 | if os.path.exists(password_file): |
368 | password = open(password_file).read().strip() |
369 | else: |
370 | @@ -47,10 +50,17 @@ |
371 | if utils.is_clustered(): |
372 | relation_settings['clustered'] = 'true' |
373 | relation_settings['vip'] = utils.config_get('vip') |
374 | + if relation_id: |
375 | + relation_settings['rid'] = relation_id |
376 | utils.relation_set(**relation_settings) |
377 | |
378 | |
379 | def cluster_joined(): |
380 | + if utils.is_relation_made('ha'): |
381 | + utils.juju_log('INFO', |
382 | + 'hacluster relation is present, skipping native '\ |
383 | + 'rabbitmq cluster config.') |
384 | + return |
385 | l_unit_no = os.getenv('JUJU_UNIT_NAME').split('/')[1] |
386 | r_unit_no = os.getenv('JUJU_REMOTE_UNIT').split('/')[1] |
387 | if l_unit_no > r_unit_no: |
388 | @@ -66,6 +76,11 @@ |
389 | |
390 | |
391 | def cluster_changed(): |
392 | + if utils.is_relation_made('ha'): |
393 | + utils.juju_log('INFO', |
394 | + 'hacluster relation is present, skipping native '\ |
395 | + 'rabbitmq cluster config.') |
396 | + return |
397 | l_unit_no = os.getenv('JUJU_UNIT_NAME').split('/')[1] |
398 | r_unit_no = os.getenv('JUJU_REMOTE_UNIT').split('/')[1] |
399 | if l_unit_no < r_unit_no: |
400 | @@ -98,37 +113,126 @@ |
401 | vip = utils.config_get('vip') |
402 | vip_iface = utils.config_get('vip_iface') |
403 | vip_cidr = utils.config_get('vip_cidr') |
404 | + rbd_name = utils.config_get('rbd-name') |
405 | + |
406 | if None in [corosync_bindiface, corosync_mcastport, vip, vip_iface, |
407 | - vip_cidr]: |
408 | + vip_cidr, rbd_name]: |
409 | utils.juju_log('ERROR', 'Insufficient configuration data to '\ |
410 | 'configure hacluster.') |
411 | sys.exit(1) |
412 | |
413 | + |
414 | + if not utils.is_relation_made('ceph'): |
415 | + utils.juju_log('INFO', |
416 | + 'ha_joined: No ceph relation yet, deferring.') |
417 | + return |
418 | + |
419 | + # rabbit node-name need to match on all nodes. |
420 | + utils.juju_log('INFO','Stopping rabbitmq-server.') |
421 | + utils.stop('rabbitmq-server') |
422 | + |
423 | + rabbit.set_node_name('%s@localhost' % SERVICE_NAME) |
424 | + |
425 | relation_settings = {} |
426 | relation_settings['corosync_bindiface'] = corosync_bindiface |
427 | relation_settings['corosync_mcastport'] = corosync_mcastport |
428 | + |
429 | relation_settings['resources'] = { |
430 | - 'res_rabbitmq_vip': 'ocf:heartbeat:IPaddr2' |
431 | + 'res_rabbitmq_rbd':'ocf:ceph:rbd', |
432 | + 'res_rabbitmq_fs':'ocf:heartbeat:Filesystem', |
433 | + 'res_rabbitmq_vip':'ocf:heartbeat:IPaddr2', |
434 | + 'res_rabbitmq-server':'lsb:rabbitmq-server', |
435 | } |
436 | + |
437 | relation_settings['resource_params'] = { |
438 | - 'res_rabbitmq_vip': ('params ip="%s" cider_netmask="%s" nic="%s"' %\ |
439 | - (vip, vip_cidr, vip_iface)) |
440 | - } |
441 | - utils.relation_set(**relation_settings) |
442 | + 'res_rabbitmq_rbd': 'params name="%s" pool="%s" user="%s" secret="%s"' %\ |
443 | + (rbd_name, POOL_NAME, SERVICE_NAME, ceph.keyfile_path(SERVICE_NAME)), |
444 | + 'res_rabbitmq_fs': 'params device="/dev/rbd/%s/%s" directory="%s" '\ |
445 | + 'fstype="ext4" op start start-delay="10s"' %\ |
446 | + (POOL_NAME, rbd_name, RABBIT_DIR), |
447 | + 'res_rabbitmq_vip':'params ip="%s" cidr_netmask="%s" nic="%s"' %\ |
448 | + (vip, vip_cidr, vip_iface), |
449 | + 'res_rabbitmqd':'op start start-delay="5s" op monitor interval="5s"', |
450 | + } |
451 | + |
452 | + relation_settings['groups'] = { |
453 | + 'grp_rabbitmq':'res_rabbitmq_rbd res_rabbitmq_fs res_rabbitmq_vip '\ |
454 | + 'res_rabbitmq-server', |
455 | + } |
456 | + |
457 | + for rel_id in utils.relation_ids('ha'): |
458 | + utils.relation_set(rid=rel_id, **relation_settings) |
459 | |
460 | |
461 | def ha_changed(): |
462 | - if not utils.is_clustered: |
463 | + if not utils.is_clustered(): |
464 | return |
465 | vip = utils.config_get('vip') |
466 | utils.juju_log('INFO', 'ha_changed(): We are now HA clustered. '\ |
467 | 'Advertising our VIP (%s) to all AMQP clients.' %\ |
468 | vip) |
469 | relation_settings = {'vip': vip, 'clustered': 'true'} |
470 | + |
471 | + # need to re-authenticate all clients since node-name changed. |
472 | for rid in utils.relation_ids('amqp'): |
473 | - relation_settings['rid'] = rid |
474 | - utils.relation_set(**relation_settings) |
475 | - |
476 | + for unit in utils.relation_list(rid): |
477 | + amqp_changed(relation_id=rid, remote_unit=unit) |
478 | + |
479 | + |
480 | +def ceph_joined(): |
481 | + utils.juju_log('INFO', 'Start Ceph Relation Joined') |
482 | + ceph.install() |
483 | + utils.juju_log('INFO', 'Finish Ceph Relation Joined') |
484 | + |
485 | + |
486 | +def ceph_changed(): |
487 | + utils.juju_log('INFO', 'Start Ceph Relation Changed') |
488 | + auth = utils.relation_get('auth') |
489 | + key = utils.relation_get('key') |
490 | + if None in [auth, key]: |
491 | + utils.juju_log('INFO', 'Missing key or auth in relation') |
492 | + sys.exit(0) |
493 | + |
494 | + ceph.configure(service=SERVICE_NAME, key=key, auth=auth) |
495 | + |
496 | + if utils.eligible_leader('res_rabbitmq_vip'): |
497 | + rbd_img = utils.config_get('rbd-name') |
498 | + rbd_size = utils.config_get('rbd-size') |
499 | + sizemb = int(rbd_size.split('G')[0]) * 1024 |
500 | + blk_device = '/dev/rbd/%s/%s' % (POOL_NAME, rbd_img) |
501 | + ceph.ensure_ceph_storage(service=SERVICE_NAME, pool=POOL_NAME, |
502 | + rbd_img=rbd_img, sizemb=sizemb, |
503 | + fstype='ext4', mount_point=RABBIT_DIR, |
504 | + blk_device=blk_device, |
505 | + system_services=['rabbitmq-server']) |
506 | + else: |
507 | + utils.juju_log('INFO', |
508 | + 'This is not the peer leader. Not configuring RBD.') |
509 | + utils.juju_log('INFO','Stopping rabbitmq-server.') |
510 | + utils.stop('rabbitmq-server') |
511 | + |
512 | + # If 'ha' relation has been made before the 'ceph' relation |
513 | + # it is important to make sure the ha-relation data is being |
514 | + # sent. |
515 | + if utils.is_relation_made('ha'): |
516 | + utils.juju_log('INFO', '*ha* relation exists. Triggering ha_joined()') |
517 | + ha_joined() |
518 | + else: |
519 | + utils.juju_log('INFO', '*ha* relation does not exist.') |
520 | + utils.juju_log('INFO', 'Finish Ceph Relation Changed') |
521 | + |
522 | + |
523 | +def upgrade_charm(): |
524 | + # Ensure older passwd files in /var/lib/juju are moved to |
525 | + # /var/lib/rabbitmq which will end up replicated if clustered. |
526 | + for f in [f for f in os.listdir('/var/lib/juju') |
527 | + if os.path.isfile(os.path.join('/var/lib/juju', f))]: |
528 | + if f.endswith('.passwd'): |
529 | + s = os.path.join('/var/lib/juju', f) |
530 | + d = os.path.join('/var/lib/rabbitmq', f) |
531 | + m = 'upgrade_charm: Migrating stored passwd from %s to %s.' % (s, d) |
532 | + utils.juju_log('INFO', m) |
533 | + shutil.move(s, d) |
534 | |
535 | hooks = { |
536 | 'install': install, |
537 | @@ -137,6 +241,9 @@ |
538 | 'cluster-relation-changed': cluster_changed, |
539 | 'ha-relation-joined': ha_joined, |
540 | 'ha-relation-changed': ha_changed, |
541 | + 'ceph-relation-joined': ceph_joined, |
542 | + 'ceph-relation-changed': ceph_changed, |
543 | + 'upgrade-charm': upgrade_charm |
544 | } |
545 | |
546 | utils.do_hooks(hooks) |
547 | |
548 | === added symlink 'hooks/upgrade-charm' |
549 | === target is u'rabbitmq-server-relations.py' |
550 | === modified file 'hooks/utils.py' |
551 | --- hooks/utils.py 2013-01-23 21:56:44 +0000 |
552 | +++ hooks/utils.py 2013-03-08 20:30:29 +0000 |
553 | @@ -11,6 +11,10 @@ |
554 | import subprocess |
555 | import socket |
556 | import sys |
557 | +import fcntl |
558 | +import struct |
559 | +import json |
560 | +import time |
561 | |
562 | |
563 | def do_hooks(hooks): |
564 | @@ -25,6 +29,19 @@ |
565 | hook_func() |
566 | |
567 | |
568 | +def can_install(): |
569 | + try: |
570 | + fd = os.open("/var/lib/dpkg/lock", os.O_RDWR|os.O_CREAT|os.O_NOFOLLOW, 0640) |
571 | + fcntl.lockf(fd, fcntl.LOCK_EX | fcntl.LOCK_NB) |
572 | + except IOError, message: |
573 | + os.close(fd) |
574 | + return False |
575 | + else: |
576 | + fcntl.lockf(fd, fcntl.LOCK_UN) |
577 | + os.close(fd) |
578 | + return True |
579 | + |
580 | + |
581 | def install(*pkgs): |
582 | cmd = [ |
583 | 'apt-get', |
584 | @@ -33,10 +50,131 @@ |
585 | ] |
586 | for pkg in pkgs: |
587 | cmd.append(pkg) |
588 | + while not can_install(): |
589 | + juju_log('INFO', |
590 | + "dpkg is busy, can't install %s yet, waiting..." % pkgs) |
591 | + time.sleep(1) |
592 | subprocess.check_call(cmd) |
593 | |
594 | TEMPLATES_DIR = 'templates' |
595 | |
596 | +try: |
597 | + import jinja2 |
598 | +except ImportError: |
599 | + install('python-jinja2') |
600 | + import jinja2 |
601 | + |
602 | +try: |
603 | + from netaddr import IPNetwork |
604 | +except ImportError: |
605 | + install('python-netaddr') |
606 | + from netaddr import IPNetwork |
607 | + |
608 | +try: |
609 | + import dns.resolver |
610 | +except ImportError: |
611 | + install('python-dnspython') |
612 | + import dns.resolver |
613 | + |
614 | + |
615 | +def render_template(template_name, context, template_dir=TEMPLATES_DIR): |
616 | + templates = jinja2.Environment( |
617 | + loader=jinja2.FileSystemLoader(template_dir) |
618 | + ) |
619 | + template = templates.get_template(template_name) |
620 | + return template.render(context) |
621 | + |
622 | + |
623 | +CLOUD_ARCHIVE = \ |
624 | +""" # Ubuntu Cloud Archive |
625 | +deb http://ubuntu-cloud.archive.canonical.com/ubuntu {} main |
626 | +""" |
627 | + |
628 | +CLOUD_ARCHIVE_POCKETS = { |
629 | + 'precise-folsom': 'precise-updates/folsom', |
630 | + 'precise-folsom/updates': 'precise-updates/folsom', |
631 | + 'precise-folsom/proposed': 'precise-proposed/folsom', |
632 | + 'precise-grizzly': 'precise-updates/grizzly', |
633 | + 'precise-grizzly/updates': 'precise-updates/grizzly', |
634 | + 'precise-grizzly/proposed': 'precise-proposed/grizzly' |
635 | + } |
636 | + |
637 | + |
638 | +def execute(cmd, die=False, echo=False): |
639 | + """ Executes a command |
640 | + |
641 | + if die=True, script will exit(1) if command does not return 0 |
642 | + if echo=True, output of command will be printed to stdout |
643 | + |
644 | + returns a tuple: (stdout, stderr, return code) |
645 | + """ |
646 | + p = subprocess.Popen(cmd.split(" "), |
647 | + stdout=subprocess.PIPE, |
648 | + stdin=subprocess.PIPE, |
649 | + stderr=subprocess.PIPE) |
650 | + stdout="" |
651 | + stderr="" |
652 | + |
653 | + def print_line(l): |
654 | + if echo: |
655 | + print l.strip('\n') |
656 | + sys.stdout.flush() |
657 | + |
658 | + for l in iter(p.stdout.readline, ''): |
659 | + print_line(l) |
660 | + stdout += l |
661 | + for l in iter(p.stderr.readline, ''): |
662 | + print_line(l) |
663 | + stderr += l |
664 | + |
665 | + p.communicate() |
666 | + rc = p.returncode |
667 | + |
668 | + if die and rc != 0: |
669 | + error_out("ERROR: command %s return non-zero.\n" % cmd) |
670 | + return (stdout, stderr, rc) |
671 | + |
672 | + |
673 | +def configure_source(): |
674 | + source = str(config_get('openstack-origin')) |
675 | + if not source: |
676 | + return |
677 | + if source.startswith('ppa:'): |
678 | + cmd = [ |
679 | + 'add-apt-repository', |
680 | + source |
681 | + ] |
682 | + subprocess.check_call(cmd) |
683 | + if source.startswith('cloud:'): |
684 | + install('ubuntu-cloud-keyring') |
685 | + pocket = source.split(':')[1] |
686 | + with open('/etc/apt/sources.list.d/cloud-archive.list', 'w') as apt: |
687 | + apt.write(CLOUD_ARCHIVE.format(CLOUD_ARCHIVE_POCKETS[pocket])) |
688 | + if source.startswith('deb'): |
689 | + l = len(source.split('|')) |
690 | + if l == 2: |
691 | + (apt_line, key) = source.split('|') |
692 | + cmd = [ |
693 | + 'apt-key', |
694 | + 'adv', '--keyserver keyserver.ubuntu.com', |
695 | + '--recv-keys', key |
696 | + ] |
697 | + subprocess.check_call(cmd) |
698 | + elif l == 1: |
699 | + apt_line = source |
700 | + |
701 | + with open('/etc/apt/sources.list.d/quantum.list', 'w') as apt: |
702 | + apt.write(apt_line + "\n") |
703 | + cmd = [ |
704 | + 'apt-get', |
705 | + 'update' |
706 | + ] |
707 | + subprocess.check_call(cmd) |
708 | + |
709 | +# Protocols |
710 | +TCP = 'TCP' |
711 | +UDP = 'UDP' |
712 | + |
713 | |
714 | def expose(port, protocol='TCP'): |
715 | cmd = [ |
716 | @@ -88,6 +226,25 @@ |
717 | return value |
718 | |
719 | |
720 | +def relation_get_dict(relation_id=None, remote_unit=None): |
721 | + """Obtain all relation data as dict by way of JSON""" |
722 | + cmd = 'relation-get --format=json' |
723 | + if relation_id: |
724 | + cmd += ' -r %s' % relation_id |
725 | + if remote_unit: |
726 | + remote_unit_orig = os.getenv('JUJU_REMOTE_UNIT', None) |
727 | + os.environ['JUJU_REMOTE_UNIT'] = remote_unit |
728 | + j = execute(cmd, die=True)[0] |
729 | + if remote_unit and remote_unit_orig: |
730 | + os.environ['JUJU_REMOTE_UNIT'] = remote_unit_orig |
731 | + d = json.loads(j) |
732 | + settings = {} |
733 | + # convert unicode to strings |
734 | + for k, v in d.iteritems(): |
735 | + settings[str(k)] = str(v) |
736 | + return settings |
737 | + |
738 | + |
739 | def relation_set(**kwargs): |
740 | cmd = [ |
741 | 'relation-set' |
742 | @@ -127,6 +284,118 @@ |
743 | return value |
744 | |
745 | |
746 | +def get_unit_hostname(): |
747 | + return socket.gethostname() |
748 | + |
749 | + |
750 | +def get_unit_name(): |
751 | + return os.environ.get('JUJU_UNIT_NAME').replace('/','-') |
752 | + |
753 | + |
754 | +def get_host_ip(hostname=unit_get('private-address')): |
755 | + try: |
756 | + # Test to see if already an IPv4 address |
757 | + socket.inet_aton(hostname) |
758 | + return hostname |
759 | + except socket.error: |
760 | + pass |
761 | + try: |
762 | + answers = dns.resolver.query(hostname, 'A') |
763 | + if answers: |
764 | + return answers[0].address |
765 | + except dns.resolver.NXDOMAIN: |
766 | + pass |
767 | + return None |
768 | + |
769 | + |
770 | +def restart(*services): |
771 | + for service in services: |
772 | + subprocess.check_call(['service', service, 'restart']) |
773 | + |
774 | + |
775 | +def stop(*services): |
776 | + for service in services: |
777 | + subprocess.check_call(['service', service, 'stop']) |
778 | + |
779 | + |
780 | +def start(*services): |
781 | + for service in services: |
782 | + subprocess.check_call(['service', service, 'start']) |
783 | + |
784 | + |
785 | +def running(service): |
786 | + # TODO: ensure compat. /w sysv init scripts. |
787 | + try: |
788 | + output = subprocess.check_output(['service', service, 'status']) |
789 | + except subprocess.CalledProcessError: |
790 | + return False |
791 | + else: |
792 | + if ("start/running" in output or |
793 | + "is running" in output): |
794 | + return True |
795 | + else: |
796 | + return False |
797 | + |
798 | + |
799 | +def disable_upstart_services(*services): |
800 | + for service in services: |
801 | + with open("/etc/init/{}.override".format(service), "w") as override: |
802 | + override.write("manual") |
803 | + |
804 | + |
805 | +def enable_upstart_services(*services): |
806 | + for service in services: |
807 | + path = '/etc/init/{}.override'.format(service) |
808 | + if os.path.exists(path): |
809 | + os.remove(path) |
810 | + |
811 | + |
812 | +def disable_lsb_services(*services): |
813 | + for service in services: |
814 | + subprocess.check_call(['update-rc.d', '-f', service, 'remove']) |
815 | + |
816 | + |
817 | +def enable_lsb_services(*services): |
818 | + for service in services: |
819 | + subprocess.check_call(['update-rc.d', '-f', service, 'defaults']) |
820 | + |
821 | + |
822 | +def get_iface_ipaddr(iface): |
823 | + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) |
824 | + return socket.inet_ntoa(fcntl.ioctl( |
825 | + s.fileno(), |
826 | + 0x8919, # SIOCGIFADDR |
827 | + struct.pack('256s', iface[:15]) |
828 | + )[20:24]) |
829 | + |
830 | + |
831 | +def get_iface_netmask(iface): |
832 | + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) |
833 | + return socket.inet_ntoa(fcntl.ioctl( |
834 | + s.fileno(), |
835 | + 0x891b, # SIOCGIFNETMASK |
836 | + struct.pack('256s', iface[:15]) |
837 | + )[20:24]) |
838 | + |
839 | + |
840 | +def get_netmask_cidr(netmask): |
841 | + netmask = netmask.split('.') |
842 | + binary_str = '' |
843 | + for octet in netmask: |
844 | + binary_str += bin(int(octet))[2:].zfill(8) |
845 | + return str(len(binary_str.rstrip('0'))) |
846 | + |
847 | + |
848 | +def get_network_address(iface): |
849 | + if iface: |
850 | + network = "{}/{}".format(get_iface_ipaddr(iface), |
851 | + get_netmask_cidr(get_iface_netmask(iface))) |
852 | + ip = IPNetwork(network) |
853 | + return str(ip.network) |
854 | + else: |
855 | + return None |
856 | + |
857 | + |
858 | def is_clustered(): |
859 | for r_id in (relation_ids('ha') or []): |
860 | for unit in (relation_list(r_id) or []): |
861 | @@ -138,10 +407,51 @@ |
862 | return False |
863 | |
864 | |
865 | -def is_leader(): |
866 | - status = execute('crm resource show res_ks_vip', echo=True)[0].strip() |
867 | +def is_leader(res): |
868 | + status = execute('crm resource show %s' % res, echo=True)[0].strip() |
869 | hostname = execute('hostname', echo=True)[0].strip() |
870 | if hostname in status: |
871 | return True |
872 | else: |
873 | return False |
874 | + |
875 | + |
876 | +def peer_units(): |
877 | + peers = [] |
878 | + for r_id in (relation_ids('cluster') or []): |
879 | + for unit in (relation_list(r_id) or []): |
880 | + peers.append(unit) |
881 | + return peers |
882 | + |
883 | + |
884 | +def oldest_peer(peers): |
885 | + local_unit_no = os.getenv('JUJU_UNIT_NAME').split('/')[1] |
886 | + for peer in peers: |
887 | + remote_unit_no = peer.split('/')[1] |
888 | + if remote_unit_no < local_unit_no: |
889 | + return False |
890 | + return True |
891 | + |
892 | + |
893 | +def eligible_leader(res): |
894 | + if is_clustered(): |
895 | + if not is_leader(res): |
896 | + juju_log('INFO', 'Deferring action to CRM leader.') |
897 | + return False |
898 | + else: |
899 | + peers = peer_units() |
900 | + if peers and not oldest_peer(peers): |
901 | + juju_log('INFO', 'Deferring action to oldest service unit.') |
902 | + return False |
903 | + return True |
904 | + |
905 | + |
906 | +def is_relation_made(relation=None): |
907 | + relation_data = [] |
908 | + for r_id in (relation_ids(relation) or []): |
909 | + for unit in (relation_list(r_id) or []): |
910 | + relation_data.append(relation_get_dict(relation_id=r_id, |
911 | + remote_unit=unit)) |
912 | + if not relation_data: |
913 | + return False |
914 | + return True |
915 | |
916 | === modified file 'metadata.yaml' |
917 | --- metadata.yaml 2013-01-16 20:43:12 +0000 |
918 | +++ metadata.yaml 2013-03-08 20:30:29 +0000 |
919 | @@ -12,6 +12,8 @@ |
920 | ha: |
921 | interface: hacluster |
922 | scope: container |
923 | + ceph: |
924 | + interface: ceph-client |
925 | peers: |
926 | cluster: |
927 | - interface: rabbitmq |
928 | + interface: rabbitmq-ha |
929 | |
930 | === modified file 'revision' |
931 | --- revision 2013-01-23 21:56:44 +0000 |
932 | +++ revision 2013-03-08 20:30:29 +0000 |
933 | @@ -1,1 +1,1 @@ |
934 | -60 |
935 | +81 |
Note: deployment to precise currently requires the hacluster changes proposed here:
https://code.launchpad.net/~gandelman-a/charms/precise/hacluster/ocf/+merge/151345