Merge lp:~ewollesen/charms/trusty/apache-spark/spark-config into lp:~bigdata-dev/charms/trusty/apache-spark/trunk

Proposed by Eric Wollesen
Status: Needs review
Proposed branch: lp:~ewollesen/charms/trusty/apache-spark/spark-config
Merge into: lp:~bigdata-dev/charms/trusty/apache-spark/trunk
Diff against target: 235 lines (+125/-17)
4 files modified
config.yaml (+17/-1)
hooks/callbacks.py (+20/-16)
hooks/eawutils.py (+45/-0)
tests/100-spark-config (+43/-0)
To merge this branch: bzr merge lp:~ewollesen/charms/trusty/apache-spark/spark-config
Reviewer: Juju Big Data Development
Review status: Pending
Review via email: mp+260782@code.launchpad.net

Description of the change

Adds config options for +spark_local_dir+ and +spark_driver_cores+.
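A minimal usage sketch (assuming the rendering behaviour implemented in
hooks/eawutils.py below, not part of the proposal text): with the branch
deployed, the options would be set via the juju 1.x CLI, e.g.

    juju set spark spark_driver_cores=2 spark_local_dir=/var

and the charm is then expected to write them as dotted, tab-separated entries
in spark-defaults.conf:

    spark.driver.cores	2
    spark.local.dir	/var

The dotted key names and the tab separator follow the underscore-to-dot
conversion in hooks/eawutils.py and the grep in tests/100-spark-config.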

Unmerged revisions

15. By Eric Wollesen

Take two at a spark config amulet test.

The amulet configure method isn't working as I would expect. It doesn't
appear to be triggering the juju hooks that effect the requested
changes. As a result, the values aren't being written to disk, and
the test fails. Modifying values via juju set, however, works fine.

14. By Eric Wollesen

Re-arrange the install callback, so config-changed works

13. By Eric Wollesen

Merged ~bigdata-dev's trunk

12. By Eric Wollesen

Adds configuration for driver cores and local dir.

See config.yaml for details.

Preview Diff

=== modified file 'config.yaml'
--- config.yaml 2015-05-30 04:34:18 +0000
+++ config.yaml 2015-06-03 05:31:01 +0000
@@ -4,4 +4,20 @@
     default: ''
     description: |
       URL from which to fetch resources (e.g., Hadoop binaries) instead of Launchpad.
-
+  spark_driver_cores:
+    type: int
+    default: 1
+    description: |
+      Number of cores to use for the driver process, only in cluster
+      mode.
+  spark_local_dir:
+    type: string
+    default: /tmp
+    description: |
+      Directory to use for "scratch" space in Spark, including map
+      output files and RDDs that get stored on disk. This should be on a
+      fast, local disk in your system. It can also be a comma-separated
+      list of multiple directories on different disks. NOTE: In Spark
+      1.0 and later this will be overriden by SPARK_LOCAL_DIRS
+      (Standalone, Mesos) or LOCAL_DIRS (YARN) environment variables set
+      by the cluster manager.
=== modified file 'hooks/callbacks.py'
--- hooks/callbacks.py 2015-06-02 13:45:28 +0000
+++ hooks/callbacks.py 2015-06-03 05:31:01 +0000
@@ -1,4 +1,3 @@
-
 from subprocess import check_output, Popen
 
 import jujuresources
@@ -6,7 +5,7 @@
 from charmhelpers.core import unitdata
 from charmhelpers.contrib.bigdata import utils
 from path import Path
-
+import eawutils
 
 class Spark(object):
 
@@ -18,8 +17,11 @@
         return unitdata.kv().get('spark.installed')
 
     def install(self, force=False):
-        if not force and self.is_installed():
-            return
+        if force or not self.is_installed():
+            install_spark()
+        self.configure_spark()
+
+    def install_spark(self):
         mirror_url = hookenv.config()['resources_mirror']
         jujuresources.fetch('spark-%s' % self.cpu_arch, mirror_url=mirror_url)
         jujuresources.install('spark-%s' % self.cpu_arch,
@@ -29,9 +31,8 @@
         self.dist_config.add_dirs()
         self.dist_config.add_packages()
         self.setup_spark_config()
-        self.configure_spark()
         unitdata.kv().set('spark.installed', True)
-        
+
     def install_demo(self):
         '''
         Install demo.sh script to /home/ubuntu
@@ -42,11 +43,11 @@
         Path(demo_source).copy(demo_target)
         Path(demo_target).chmod(0o755)
         Path(demo_target).chown('ubuntu', 'hadoop')
-        
+
     def setup_spark_config(self):
         '''
         copy Spark's default configuration files to spark_conf property defined
-        in dist.yaml 
+        in dist.yaml
         '''
         conf_dir = self.dist_config.path('spark') / 'conf'
         self.dist_config.path('spark_conf').rmtree_p()
@@ -64,10 +65,10 @@
         utils.re_edit_in_place(spark_log4j, {
             r'log4j.rootCategory=INFO, console': 'log4j.rootCategory=ERROR, console',
         })
-        
+
     def configure_spark(self):
         '''
-        Configure spark environment for all users 
+        Configure spark environment for all users
         '''
         from subprocess import call
         spark_bin = self.dist_config.path('spark') / 'bin'
@@ -78,12 +79,15 @@
         env['SPARK_CONF_DIR'] = self.dist_config.path('spark_conf')
         self.configure_spark_hdfs()
         self.spark_optimize()
+        spark_default = self.dist_config.path('spark_conf') / 'spark-defaults.conf'
+        spark_config = eawutils.getSparkConfig(hookenv.config())
+        eawutils.updateSparkConfig(spark_default, spark_config)
         cmd = "chown -R ubuntu:hadoop {}".format (spark_home)
         call(cmd.split())
         cmd = "chown -R ubuntu:hadoop {}".format (self.dist_config.path('spark_conf'))
         call(cmd.split())
-        
-    def configure_spark_hdfs(self): 
+
+    def configure_spark_hdfs(self):
         e = utils.read_etc_env()
         utils.run_as('hdfs', 'hdfs', 'dfs', '-mkdir', '-p', '/user/ubuntu/directory', env=e)
         utils.run_as('hdfs', 'hdfs', 'dfs', '-chown', '-R', 'ubuntu:hadoop', '/user/ubuntu/directory', env=e)
@@ -107,19 +111,19 @@
             r'.*spark.eventLog.enabled *.*':'spark.eventLog.enabled true',
             r'.*spark.eventLog.dir *.*':'spark.eventLog.dir hdfs:///user/ubuntu/directory',
             })
-        
-        
+
+
     def start(self):
         e = utils.read_etc_env()
         spark_home = self.dist_config.path('spark')
         if utils.jps("HistoryServer"):
             self.stop()
         utils.run_as('ubuntu', '{}/sbin/start-history-server.sh'.format(spark_home), 'hdfs:///user/ubuntu/directory', env=e)
-        
+
     def stop(self):
         e = utils.read_etc_env()
         spark_home = self.dist_config.path('spark')
-        utils.run_as('ubuntu', '{}/sbin/stop-history-server.sh'.format(spark_home), env=e) 
+        utils.run_as('ubuntu', '{}/sbin/stop-history-server.sh'.format(spark_home), env=e)
 
     def cleanup(self):
         self.dist_config.remove_dirs()
 
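A minimal sketch (stubbed class, not from the branch) of why the install()
rearrangement matters: configure_spark() now runs on every call, so a
config-changed hook that re-invokes install() re-renders the Spark
configuration even when Spark is already installed, while the heavy
fetch/install step is still skipped.

    # Illustrative stub only; method names mirror hooks/callbacks.py, the
    # bodies do not.
    class FakeSpark(object):
        def __init__(self):
            self.installed = False
            self.calls = []

        def is_installed(self):
            return self.installed

        def install_spark(self):
            self.calls.append('install_spark')
            self.installed = True

        def configure_spark(self):
            self.calls.append('configure_spark')

        def install(self, force=False):
            # Proposed flow: skip the heavy install when possible, but always
            # re-apply configuration.
            if force or not self.is_installed():
                self.install_spark()
            self.configure_spark()

    spark = FakeSpark()
    spark.install()     # install hook: installs, then configures
    spark.install()     # config-changed hook: only re-configures
    print(spark.calls)  # ['install_spark', 'configure_spark', 'configure_spark']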
=== added file 'hooks/eawutils.py'
--- hooks/eawutils.py 1970-01-01 00:00:00 +0000
+++ hooks/eawutils.py 2015-06-03 05:31:01 +0000
@@ -0,0 +1,45 @@
+# These functions should live in charmhelpers.contrib.bigdata.utils or
+# somewhere similar.
+import re
+from charmhelpers.contrib.bigdata import utils
+
+def updateSparkConfig(path, config):
+    """Updates spark config settings in +path+.
+
+    Assumes +path+ is in spark config file syntax."""
+    inserts, updates = calcSparkConfigUpserts(path, config)
+
+    utils.re_edit_in_place(path, updates)
+    with open(path, 'a') as configFile:
+        for item in inserts.items():
+            configFile.write("%s\t%s\n" % item)
+
+def calcSparkConfigUpserts(path, config):
+    """Calculate upserts to transform +path+ to +config+, idempotently.
+
+    Returns (inserts, updates)."""
+    inserts = config.copy()
+    updates = {}
+
+    with open(path, 'r') as configFile:
+        for line in configFile.readlines():
+            if line.startswith("#") or re.match('\A\s*\Z', line):
+                continue
+            key = line.split(None, 1)[0]
+            if key in config:
+                updates["^%s\s.*" % key] = "%s\t%s" % (key, config[key])
+                inserts.pop(key)
+
+    return inserts, updates
+
+def getKeysStartingWith(d, prefix):
+    "Return a dict of the keys prefixed with +prefix+."
+    return dict([(k,v) for k,v in d.items() if k.startswith(prefix)])
+
+def underscoreToDot(d):
+    "Return the dictionary with underscores in keys replaced with dots."
+    return dict([(k.replace("_", "."),v) for k,v in d.items()])
+
+def getSparkConfig(config):
+    "Return a dict of the keys prefixed with 'spark.', that have non-default values."
+    return underscoreToDot(getKeysStartingWith(config, "spark_"))
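A self-contained sketch of the upsert behaviour above, using only the standard
library (the branch delegates the in-place edit to
charmhelpers.contrib.bigdata.utils.re_edit_in_place; the function names here
are illustrative, not part of the proposal):

    import tempfile

    def spark_config_from_charm_config(config):
        # Map charm options such as 'spark_driver_cores' to 'spark.driver.cores'.
        return dict((k.replace('_', '.'), v) for k, v in config.items()
                    if k.startswith('spark_'))

    def upsert_spark_defaults(path, spark_config):
        # Rewrite existing keys in place and append any that are missing.
        inserts = dict(spark_config)
        lines = []
        with open(path) as f:
            for raw in f:
                line = raw.rstrip('\n')
                if line and not line.startswith('#'):
                    key = line.split(None, 1)[0]
                    if key in inserts:
                        line = '%s\t%s' % (key, inserts.pop(key))
                lines.append(line)
        lines.extend('%s\t%s' % item for item in inserts.items())
        with open(path, 'w') as f:
            f.write('\n'.join(lines) + '\n')

    if __name__ == '__main__':
        with tempfile.NamedTemporaryFile('w', suffix='.conf', delete=False) as f:
            f.write('# Default system properties included when running spark-submit.\n')
            f.write('spark.driver.cores\t1\n')
            path = f.name
        charm_config = {'spark_driver_cores': 2, 'spark_local_dir': '/var',
                        'resources_mirror': ''}
        upsert_spark_defaults(path, spark_config_from_charm_config(charm_config))
        print(open(path).read())
        # spark.driver.cores is rewritten to 2, spark.local.dir is appended.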
=== added file 'tests/100-spark-config'
--- tests/100-spark-config 1970-01-01 00:00:00 +0000
+++ tests/100-spark-config 2015-06-03 05:31:01 +0000
@@ -0,0 +1,43 @@
+#!/usr/bin/python3
+
+import unittest
+import amulet
+
+
+class TestSparkConfig(unittest.TestCase):
+    """
+    Configuration settings test for Apache Spark.
+    """
+
+    @classmethod
+    def setUpClass(cls):
+        cls.d = amulet.Deployment(series='trusty')
+        #### Deploy a hadoop cluster
+        cls.d.add('yarn-master', charm='cs:~bigdata-dev/trusty/apache-hadoop-yarn-master')
+        cls.d.add('hdfs-master', charm='cs:~bigdata-dev/trusty/apache-hadoop-hdfs-master')
+        cls.d.add('compute-slave', charm='cs:~bigdata-dev/trusty/apache-hadoop-compute-slave', units=2)
+        cls.d.add('hadoop-plugin', charm='cs:~bigdata-dev/trusty/apache-hadoop-plugin')
+        cls.d.relate('yarn-master:namenode', 'hdfs-master:namenode')
+        cls.d.relate('yarn-master:resourcemanager', 'hadoop-plugin:resourcemanager')
+        cls.d.relate('hadoop-plugin:namenode', 'hdfs-master:namenode')
+
+        cls.d.relate('compute-slave:nodemanager', 'yarn-master:nodemanager')
+        cls.d.relate('compute-slave:datanode', 'hdfs-master:datanode')
+
+        ### Add Spark Service
+        cls.d.add('spark', 'apache-spark')
+        cls.d.configure('spark', {'spark_driver_cores': 2,
+                                  'spark_local_dir': '/var'})
+        cls.d.relate('hadoop-plugin:hadoop-plugin', 'spark:hadoop-plugin')
+
+        cls.d.setup(timeout=9000)
+        cls.d.sentry.wait()
+        cls.unit = cls.d.sentry.unit['spark/0']
+
+    def test_config_setting(self):
+        output, retcode = self.unit.run("grep -Pq 'spark.driver.cores\t2' /etc/spark/conf/spark-defaults.conf")
+        self.assertEqual(retcode, 0, 'failed to configure spark service\n')
+
+
+if __name__ == '__main__':
+    unittest.main()
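As noted in revision 15 above, amulet's Deployment.configure() did not appear
to trigger the config-changed hook, while "juju set" did. A hedged sketch of
that workaround (an assumption, not part of the branch), using the same
service and option names as the test:

    import subprocess

    def set_spark_config(options, service='spark'):
        # Apply charm config via the juju 1.x CLI so config-changed fires:
        #   juju set <service> key=value [key=value ...]
        args = ['%s=%s' % (k, v) for k, v in sorted(options.items())]
        subprocess.check_call(['juju', 'set', service] + args)

    # Example, mirroring tests/100-spark-config:
    # set_spark_config({'spark_driver_cores': 2, 'spark_local_dir': '/var'})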
