Merge lp:~cgb-cs/appscale/main-consistency into lp:appscale

Proposed by Chris Bunch
Status: Rejected
Rejected by: Navraj Chohan
Proposed branch: lp:~cgb-cs/appscale/main-consistency
Merge into: lp:appscale
Diff against target: 446 lines (+158/-42)
8 files modified
AppController/djinn.rb (+64/-10)
AppController/lib/repo.rb (+21/-9)
AppDB/cassandra/cassandra_helper.rb (+31/-0)
AppDB/cassandra/py_cassandra.py (+23/-10)
AppDB/voldemort/voldemort_helper.rb (+2/-2)
AppServer/demos/therepo/repo.py (+2/-2)
Neptune/neptune.rb (+8/-5)
Neptune/ssa_helper.rb (+7/-4)
To merge this branch: bzr merge lp:~cgb-cs/appscale/main-consistency
Reviewer Review Type Date Requested Status
Navraj Chohan (community) Disapprove
Review via email: mp+77709@code.launchpad.net

Description of the change

Adds the ability to specify consistency levels for Cassandra. The tools-consistency branch is also required for this change.
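The mechanism (commit 786 and the cassandra_helper.rb hunk below) is to rewrite the READ_FACTOR and WRITE_FACTOR constants in py_cassandra.py at deployment time. A minimal Ruby sketch of that substitution, assuming creds is the deployment-credentials Hash; the rewrite_consistency name is illustrative:

    # Sketch: patch the consistency constants in py_cassandra.py when the
    # user supplies read_factor/write_factor credentials (e.g. "ONE", "QUORUM").
    def rewrite_consistency(interface_path, creds)
      return if creds["read_factor"].nil? and creds["write_factor"].nil?
      contents = File.read(interface_path)
      unless creds["read_factor"].nil?
        contents.gsub!(/READ_FACTOR = (.*)/,
                       "READ_FACTOR = CONSISTENCY_#{creds['read_factor']}")
      end
      unless creds["write_factor"].nil?
        contents.gsub!(/WRITE_FACTOR = (.*)/,
                       "WRITE_FACTOR = CONSISTENCY_#{creds['write_factor']}")
      end
      File.open(interface_path, "w") { |file| file.write(contents) }
    end

    # e.g. rewrite_consistency("py_cassandra.py",
    #                          "read_factor" => "ONE", "write_factor" => "ALL")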

lp:~cgb-cs/appscale/main-consistency updated
787. By Chris Bunch

refactored get_nearest_ip to ping all db nodes and return the one that responded fastest, instead of just picking one at random
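A minimal Ruby sketch of that selection strategy (the fastest_db_ip name is illustrative; the real code is in the djinn.rb hunk below):

    # Sketch: ping each DB node and return the IP with the lowest average
    # round-trip time. Falls back to the first IP if no node answers.
    def fastest_db_ip(db_ips, count = 5, timeout = 10)
      best_ip = db_ips.first
      best_avg = nil
      db_ips.each do |ip|
        output = `ping #{ip} -c #{count} -w #{timeout}`
        times = output.scan(/time=([\d.]+) ms/).flatten.map { |t| Float(t) }
        next if times.empty?  # no replies from this node; keep current best
        avg = times.inject(0.0) { |sum, t| sum + t } / times.length
        if best_avg.nil? or avg < best_avg
          best_ip = ip
          best_avg = avg
        end
      end
      best_ip
    end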

788. By Chris Bunch

fixed repo app to store and retrieve binary items with appdb, and restored repo template, which was corrupted by a previous revision. also refactored get_nearest_db_ip to default to the first db node in case other machines don't respond to pings
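The binary-safety part of this fix amounts to Base64-encoding values before POSTing them to the repo and decoding them on the way back out. A sketch of the round trip, assuming the repo's /set and /get endpoints on port 8079 as shown in the repo.rb hunk below (repo_ip and secret are placeholders):

    require 'net/http'
    require 'uri'
    require 'base64'

    # Sketch: store a possibly binary value in the repo, then read it back.
    def repo_set(repo_ip, secret, key, val, type)
      params = {'SECRET' => secret, 'KEY' => key,
                'VALUE' => Base64.encode64(val), 'TYPE' => type}
      Net::HTTP.post_form(URI.parse("http://#{repo_ip}:8079/set"), params).body
    end

    def repo_get(repo_ip, secret, key, type)
      params = {'SECRET' => secret, 'KEY' => key, 'TYPE' => type}
      body = Net::HTTP.post_form(URI.parse("http://#{repo_ip}:8079/get"), params).body
      Base64.decode64(body)  # undo the encoding applied by repo_set
    end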

789. By Chris Bunch

re-enabling hybrid cloud Neptune jobs and now properly removing output from SSA jobs

790. By Chris Bunch

fixed copy-pasta'd code used for hybrid cloud deployments
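The copy-paste bug in question (see the neptune.rb hunk below) was building a Hash from a flat array without the splat operator; Hash[*array] is what expands the array into alternating key/value arguments:

    nodes_needed = ["cloud1", 4, "cloud2", 2]
    Hash[*nodes_needed]  # => {"cloud1" => 4, "cloud2" => 2}
    # Hash[nodes_needed], without the splat, does not produce this mapping.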

791. By Chris Bunch

added documentation, installed nslookup on lucid, and told the ssa helper to log when it is done running a job rather than reporting individual run times (as there can be up to 1 million of them)
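A minimal sketch of the hostname-to-IP conversion used when writing /etc/hosts entries, assuming nslookup (from the dnsutils package) is installed; it mirrors the regex in the djinn.rb hunk below:

    # Sketch: resolve a hostname to a dotted-quad IPv4 address via nslookup.
    def to_ip(host)
      ip_pattern = /\d+\.\d+\.\d+\.\d+/
      return host if host =~ ip_pattern  # already an IP address
      `nslookup #{host}`.scan(/Address: (#{ip_pattern})/).flatten.first
    end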

Revision history for this message
Navraj Chohan (nchohan) :
review: Disapprove

Unmerged revisions

791. By Chris Bunch

added documentation, installed nslookup on lucid, and told the ssa helper to log when it is done running a job rather than reporting individual run times (as there can be up to 1 million of them)

790. By Chris Bunch

fixed copy-pasta'd code used for hybrid cloud deployments

789. By Chris Bunch

re-enabling hybrid cloud Neptune jobs and now properly removing output from SSA jobs

788. By Chris Bunch

fixed repo app to store and retrieve binary items with appdb, and restored repo template, which was corrupted by a previous revision. also refactored get_nearest_db_ip to default to the first db node in case other machines don't respond to pings

787. By Chris Bunch

refactored get_nearest_ip to ping all db nodes and return the one that responded fastest, instead of just picking one at random

786. By Chris Bunch

added ability to alter cassandra's read and write consistency settings

Preview Diff

1=== modified file 'AppController/djinn.rb'
2--- AppController/djinn.rb 2011-09-18 03:02:52 +0000
3+++ AppController/djinn.rb 2011-10-11 17:59:26 +0000
4@@ -98,11 +98,16 @@
5
6 public
7
8+ # This method is exposed via SOAP and lets the tools and other AppControllers
9+ # know if this node is done starting up all the services it runs.
10 def done(secret)
11 return BAD_SECRET_MSG unless valid_secret?(secret)
12 return @done_loading
13 end
14
15+ # This method is exposed via SOAP and is used to stop all the services in a
16+ # given node for Xen and KVM deployments. In cloud deployments, this method
17+ # kills all the nodes across all clouds.
18 def kill(secret)
19 return BAD_SECRET_MSG unless valid_secret?(secret)
20 @kill_sig_received = true
21@@ -155,6 +160,13 @@
22 return "OK"
23 end
24
25+ # This method is exposed via SOAP and is called to initialize an
26+ # AppController. This specifically involves telling it who else is running
27+ # in the system (djinn_locations) and what they are working on, how to
28+ # access the datastore (database_credentials), and the list of applications
29+ # that should be loaded (app_names). The tools will call this method on the
30+ # first node in the system (the master), and the master will call this method
31+ # on the other nodes once they are ready.
32 def set_parameters(djinn_locations, database_credentials, app_names, secret)
33 return BAD_SECRET_MSG unless valid_secret?(secret)
34
35@@ -212,6 +224,8 @@
36 return "OK"
37 end
38
39+ # Tells an AppController which applications should be loaded (via their
40+ # application IDs).
41 def set_apps(app_names, secret)
42 return BAD_SECRET_MSG unless valid_secret?(secret)
43
44@@ -649,8 +663,34 @@
45 # If there is a local database then use it
46 local_ip
47 else
48- # Otherwise just select one randomly
49- db_ips.sort_by { rand }[0]
50+ # Otherwise, ping all of them five times and pick the one that responded
51+ # the fastest. Note that this can cause problems if a node doesn't respond
52+ # to pings.
53+ num_times_to_ping = 5
54+ timeout = 10 # seconds
55+
56+ Djinn.log_debug("Finding the fastest DB node to use")
57+
58+ fastest_ip = db_ips[0]
59+ fastest_time = INFINITY
60+ db_ips.each { |ip|
61+ ping_data = `ping #{ip} -c #{num_times_to_ping} -w #{timeout}`
62+ times = ping_data.scan(/time=(.*) ms/).flatten
63+ times = times.map! { |time| Float(time) } # convert Strings to Floats
64+ sum = times.reduce(0.0) { |sum, val| sum + val}
65+ avg = sum / times.length
66+ if avg < fastest_time
67+ Djinn.log_debug("Found faster node: #{ip} responded in #{avg} ms")
68+ fastest_ip = ip
69+ fastest_time = avg
70+ end
71+ }
72+
73+ Djinn.log_debug("The node that responded the fastest out of the ips " +
74+ "[#{db_ips.join(', ')}] was #{fastest_ip}, which responded in " +
75+ "#{fastest_time} msec")
76+
77+ return fastest_ip
78 end
79 end
80
81@@ -1003,9 +1043,9 @@
82 end
83
84 @nodes.each { |node|
85- #pub = location.public_ip
86+ #pub = node.public_ip
87 #if pub =~ /#{FQDN_REGEX}/
88- # location.public_ip = HelperFunctions.convert_fqdn_to_ip(pub)
89+ # node.public_ip = HelperFunctions.convert_fqdn_to_ip(pub)
90 #end
91
92 pri = node.private_ip
93@@ -1013,7 +1053,7 @@
94 begin
95 node.private_ip = HelperFunctions.convert_fqdn_to_ip(pri)
96 rescue Exception => e
97- node.private_ip = location.public_ip
98+ node.private_ip = node.public_ip
99 end
100 end
101 }
102@@ -1178,10 +1218,14 @@
103 # for neptune jobs, start a place where they can save output to
104 # also, since repo does health checks on the app engine apis, start it up there too
105
106- repo_ip = get_shadow.public_ip
107- repo_private_ip = get_shadow.private_ip
108- repo_ip = my_node.public_ip if my_node.is_appengine?
109- repo_private_ip = my_node.private_ip if my_node.is_appengine?
110+ if my_node.is_appengine?
111+ repo_ip = my_node.public_ip
112+ repo_private_ip = my_node.private_ip
113+ else
114+ repo_ip = get_shadow.public_ip
115+ repo_private_ip = get_shadow.private_ip
116+ end
117+
118 Repo.init(repo_ip, repo_private_ip, @@secret)
119
120 if my_node.is_shadow? or my_node.is_appengine?
121@@ -1503,9 +1547,19 @@
122 # Invoke datastore helper function
123 setup_db_config_files(master_ip, slave_ips, @creds)
124
125+ # lucid doesn't have nslookup - for now just install it
126+ Djinn.log_run("apt-get install -y dnsutils")
127+
128 all_nodes = ""
129+ ip_regex = /\d+\.\d+\.\d+\.\d+/
130 @nodes.each_with_index { |node, index|
131- all_nodes << "#{node.private_ip} appscale-image#{index}\n"
132+ ip_to_use = node.private_ip
133+ if ip_to_use !~ ip_regex
134+ Djinn.log_debug("[etc hosts] #{ip_to_use} wasn't an IP address. converting...")
135+ ip_to_use = `nslookup #{ip_to_use}`.scan(/Address: (#{ip_regex})/).flatten.to_s
136+ Djinn.log_debug("[etc hosts] #{node.private_ip} was converted to #{ip_to_use}")
137+ end
138+ all_nodes << "#{ip_to_use} appscale-image#{index}\n"
139 }
140
141 etc_hosts = "/etc/hosts"
142
143=== modified file 'AppController/lib/repo.rb'
144--- AppController/lib/repo.rb 2011-08-10 03:22:11 +0000
145+++ AppController/lib/repo.rb 2011-10-11 17:59:26 +0000
146@@ -127,8 +127,16 @@
147
148 def self.get(key, type, storage, creds, is_file=false)
149 if storage == "appdb"
150- result = `curl http://#{@@ip}:8079/get -X POST -d 'SECRET=#{@@secret}' -d 'KEY=#{key}' -d 'TYPE=#{type}'`
151- result = URI.unescape(result)
152+ Djinn.log_debug("performing a get on key [#{key}], type [#{type}]")
153+ get_url = "http://#{@@ip}:8079/get"
154+ params = {'SECRET' => @@secret, 'KEY' => key, 'TYPE' => type}
155+ data = Net::HTTP.post_form(URI.parse(get_url), params).body
156+ decoded_data = Base64.decode64(data)
157+
158+ if is_file
159+ HelperFunctions.write_file(is_file, decoded_data)
160+ end
161+ result = decoded_data
162 elsif storage == "s3"
163 conn = self.get_s3_conn(creds)
164 bucket, file = self.parse_s3_key(key)
165@@ -179,20 +187,24 @@
166
167 result = false
168 begin
169- res = Net::HTTP.post_form(URI.parse("http://#{@@ip}:8079/set"),
170- {'SECRET' => @@secret, 'KEY' => key,
171- 'VALUE' => val, 'TYPE' => type})
172+ encoded_val = Base64.encode64(val)
173+ set_url = "http://#{@@ip}:8079/set"
174+ params = {'SECRET' => @@secret, 'KEY' => key,
175+ 'VALUE' => encoded_val, 'TYPE' => type}
176+ res = Net::HTTP.post_form(URI.parse(set_url), params)
177 Djinn.log_debug("set key=#{key} type=#{type} returned #{res.body}")
178 result = true if res.body == "success"
179 rescue Exception => e
180 Djinn.log_debug("saw exception #{e.class} when posting userdata to repo at #{key}")
181 end
182-
183 end
184 else
185- Djinn.log_debug("attempting to put local file #{val} into bucket #{bucket}, location #{file}")
186- val = URI.escape(val, Regexp.new("[^#{URI::PATTERN::UNRESERVED}]"))
187- result = `curl http://#{@@ip}:8079/set -X POST -d 'SECRET=#{@@secret}' -d 'KEY=#{key}' -d 'VALUE=#{val}' -d 'TYPE=#{type}'`
188+ Djinn.log_debug("attempting to put local file into location #{key}")
189+ encoded_val = Base64.encode64(val)
190+ set_url = "http://#{@@ip}:8079/set"
191+ params = {'SECRET' => @@secret, 'KEY' => key,
192+ 'VALUE' => encoded_val, 'TYPE' => type}
193+ result = Net::HTTP.post_form(URI.parse(set_url), params).body
194 Djinn.log_debug("set key=#{key} type=#{type} returned #{result}")
195 result = true if result == "success"
196 end
197
198=== modified file 'AppDB/cassandra/cassandra_helper.rb'
199--- AppDB/cassandra/cassandra_helper.rb 2011-05-30 01:04:15 +0000
200+++ AppDB/cassandra/cassandra_helper.rb 2011-10-11 17:59:26 +0000
201@@ -36,6 +36,37 @@
202 }
203 }
204 }
205+
206+ setup_consistency_settings(creds)
207+end
208+
209+# Writes the Cassandra interface file's read and write consistency levels.
210+# Requires the parameter 'creds' to have defined at least a read or write
211+# policy - if these are not defined, the default value in py_cassandra.py
212+# is used.
213+
214+def setup_consistency_settings(creds)
215+ return if creds["read_factor"].nil? and creds["write_factor"].nil?
216+ cassandra_interface = "#{APPSCALE_HOME}/AppDB/cassandra/py_cassandra.py"
217+
218+ contents = ""
219+ File.open(cassandra_interface) { |source_file|
220+ contents = source_file.read
221+
222+ if !creds["read_factor"].nil?
223+ read_factor = "CONSISTENCY_#{creds['read_factor']}"
224+ contents.gsub!(/READ_FACTOR = (.*)/, "READ_FACTOR = #{read_factor}")
225+ end
226+
227+ if !creds["write_factor"].nil?
228+ write_factor = "CONSISTENCY_#{creds['write_factor']}"
229+ contents.gsub!(/WRITE_FACTOR = (.*)/, "WRITE_FACTOR = #{write_factor}")
230+ end
231+ }
232+
233+ File.open(cassandra_interface, "w+") { |dest_file|
234+ dest_file.write(contents)
235+ }
236 end
237
238 def start_db_master()
239
240=== modified file 'AppDB/cassandra/py_cassandra.py'
241--- AppDB/cassandra/py_cassandra.py 2011-05-31 00:01:41 +0000
242+++ AppDB/cassandra/py_cassandra.py 2011-10-11 17:59:26 +0000
243@@ -38,10 +38,19 @@
244 DEFAULT_HOST = "localhost"
245 DEFAULT_PORT = 9160
246
247-#CONSISTENCY_ZERO = 0 # don't use this for reads
248+
249+CONSISTENCY_ZERO = 0 # don't use this for reads
250+CONSISTENCY_ANY = pycassa.cassandra.ttypes.ConsistencyLevel.ANY
251 CONSISTENCY_ONE = pycassa.cassandra.ttypes.ConsistencyLevel.ONE
252 CONSISTENCY_QUORUM = pycassa.cassandra.ttypes.ConsistencyLevel.QUORUM
253-#CONSISTENCY_ALL = 5 # don't use this for reads (next version may fix this)
254+CONSISTENCY_ALL = pycassa.cassandra.ttypes.ConsistencyLevel.ALL # don't use this for reads (next version may fix this)
255+
256+# These lines should not be removed: the AppController will replace these
257+# values if the user specifies it via the command-line. These default values
258+# are chosen to favor reads but keep strong consistency (consistent with the
259+# 20:1 read/write ratio cited by the Megastore paper).
260+READ_FACTOR = CONSISTENCY_ONE
261+WRITE_FACTOR = CONSISTENCY_ALL
262
263 MAX_ROW_COUNT = 10000000
264 table_cache = {}
265@@ -69,8 +78,8 @@
266 path = ColumnPath(COLUMN_FAMILY)
267 client = self.__setup_connection()
268 # Result is a column type which has name, value, timestamp
269- result = client.get_slice(row_key, path, slice_predicate,
270- CONSISTENCY_QUORUM)
271+ global READ_FACTOR
272+ result = client.get_slice(row_key, path, slice_predicate, READ_FACTOR)
273 for column in column_names:
274 for r in result:
275 c = r.column
276@@ -111,7 +120,8 @@
277 mutation = Mutation(column_or_supercolumn=c_or_sc)
278 mutations.append(mutation)
279 mutation_map = {row_key : { COLUMN_FAMILY : mutations } }
280- client.batch_mutate(mutation_map, CONSISTENCY_QUORUM)
281+ global WRITE_FACTOR
282+ client.batch_mutate(mutation_map, WRITE_FACTOR)
283 """except Exception, ex:
284 print "EXCEPTION"
285 self.logger.debug("Exception %s" % ex)
286@@ -137,10 +147,11 @@
287 end_key = table_name + '/~'
288 try:
289 cf = pycassa.ColumnFamily(self.pool, 'Standard1')
290+ global READ_FACTOR
291 keyslices = cf.get_range(columns=column_names,
292 start=start_key,
293 finish=end_key,
294- read_consistency_level=CONSISTENCY_QUORUM)
295+ read_consistency_level=READ_FACTOR)
296 keyslices = list(keyslices)
297 except Exception, ex:
298 self.logger.debug("Exception %s" % ex)
299@@ -173,8 +184,8 @@
300 client = self.__setup_connection()
301 curtime = self.timestamp()
302 # Result is a column type which has name, value, timestamp
303- client.remove(row_key, path, curtime,
304- CONSISTENCY_QUORUM)
305+ global WRITE_FACTOR
306+ client.remove(row_key, path, curtime, WRITE_FACTOR)
307 except Exception, ex:
308 self.logger.debug("Exception %s" % ex)
309 ret[0]+=("Exception: %s"%ex)
310@@ -213,10 +224,11 @@
311 end_key = table_name + '/~'
312 try:
313 cf = pycassa.ColumnFamily(self.pool, 'Standard1')
314+ global READ_FACTOR
315 keyslices = cf.get_range(columns=[],
316 start=start_key,
317 finish=end_key,
318- read_consistency_level=CONSISTENCY_QUORUM)
319+ read_consistency_level=READ_FACTOR)
320 except Exception, ex:
321 self.logger.debug("Exception %s" % ex)
322 result[0]+=("Exception: %s"%ex)
323@@ -225,10 +237,11 @@
324 for keyslice in keyslices:
325 row_key = keyslice[0]
326 client = self.__setup_connection()
327+ global WRITE_FACTOR
328 client.remove(row_key,
329 path,
330 curtime,
331- CONSISTENCY_QUORUM)
332+ WRITE_FACTOR)
333 keys_removed = True
334 if table_name not in table_cache and keys_removed:
335 result[0] += "Table does not exist"
336
337=== modified file 'AppDB/voldemort/voldemort_helper.rb'
338--- AppDB/voldemort/voldemort_helper.rb 2010-05-27 21:59:59 +0000
339+++ AppDB/voldemort/voldemort_helper.rb 2011-10-11 17:59:26 +0000
340@@ -39,8 +39,8 @@
341 # TODO: this should not use djinn class field.
342 setup_cluster_config(voldemort_conf_loc, database_nodes)
343 setup_server_config(voldemort_server_template, voldemort_conf_loc, my_db_id)
344- r = creds["voldemortr"]
345- w = creds["voldemortw"]
346+ r = creds["read_factor"]
347+ w = creds["write_factor"]
348 setup_stores_config(voldemort_stores_temp, voldemort_stores_loc, creds["replication"], r, w)
349 end # setup
350
351
352=== modified file 'AppServer/demos/therepo/repo.py'
353--- AppServer/demos/therepo/repo.py 2011-06-28 21:08:21 +0000
354+++ AppServer/demos/therepo/repo.py 2011-10-11 17:59:26 +0000
355@@ -31,7 +31,7 @@
356
357 import logging
358
359-SECRET = "SeHIb1ctOKWJ3RyBLPL1dE0XqJe52dMZ"
360+SECRET = "PLACE SECRET HERE"
361
362 NO_SECRET = "you failed to provide a secret"
363 BAD_SECRET = "you provided a bad secret"
364@@ -134,7 +134,7 @@
365
366 if type == "output":
367 entry = Entry(key_name = key)
368- entry.content = db.Blob(value)
369+ entry.content = db.Blob(str(value))
370 entry.acl = "private"
371 else: # type is acl
372 entry = Entry.get_by_key_name(key)
373
374=== modified file 'Neptune/neptune.rb'
375--- Neptune/neptune.rb 2011-06-07 01:42:50 +0000
376+++ Neptune/neptune.rb 2011-10-11 17:59:26 +0000
377@@ -33,7 +33,7 @@
378 Djinn.log_debug("got run request - #{job_data.inspect}")
379
380 prejob_status = can_run_job(job_data)
381- Djinn.log_debug("Pre-job status for job_data [#{job_data}] is [#{prejob_status}]")
382+ Djinn.log_debug("Pre-job status for job_data [#{job_data.inspect}] is [#{prejob_status}]")
383 unless prejob_status == :ok
384 return prejob_status
385 end
386@@ -62,6 +62,10 @@
387 end
388
389 code = job_data['@code']
390+ if code.nil? # e.g., in SSA runs, where the code is specified via '@tar'
391+ code = job_data['@tar']
392+ end
393+
394 dirs = code.split(/\//)
395 code_dir = dirs[0, dirs.length-1].join("/")
396
397@@ -463,7 +467,8 @@
398 Djinn.log_debug("acquiring nodes for hybrid cloud neptune job")
399
400 if nodes_needed.class == Array
401- nodes_needed = Hash[nodes_needed]
402+ Djinn.log_debug("received array with contents: #{nodes_needed.join(', ')}")
403+ nodes_needed = Hash[*nodes_needed]
404 Djinn.log_debug("request received to spawn hybrid nodes: #{nodes_needed.inspect}")
405 else
406 Djinn.log_debug("nodes_needed was not the right class - should have been Array but was #{nodes_needed.class}")
407@@ -565,9 +570,7 @@
408 end
409
410 def neptune_release_nodes(nodes_to_use, job_data)
411- if is_hybrid_cloud?
412- abort("hybrid cloud mode is definitely not supported")
413- elsif is_cloud?
414+ if is_hybrid_cloud? or is_cloud?
415 nodes_to_use.each { |node|
416 node.set_roles("open")
417 }
418
419=== modified file 'Neptune/ssa_helper.rb'
420--- Neptune/ssa_helper.rb 2011-06-02 03:35:37 +0000
421+++ Neptune/ssa_helper.rb 2011-10-11 17:59:26 +0000
422@@ -89,7 +89,10 @@
423 loop {
424 trajectories_left = trajectories - done
425 Djinn.log_debug("Need to run #{trajectories_left} more trajectories on #{cores} cores")
426- break if trajectories_left.zero?
427+ if trajectories_left.zero?
428+ Djinn.log_debug("Done running trajectories!")
429+ break
430+ end
431 need_to_run = [trajectories_left, cores].min
432
433 Djinn.log_debug("Running #{need_to_run} trajectories")
434@@ -167,10 +170,10 @@
435 TIMING: stddev compute time is #{standard_deviation(c_times)} seconds.
436 TIMING: average storage time is #{average(s_times)} seconds.
437 TIMING: stddev storage time is #{standard_deviation(s_times)} seconds.
438- RAW_DATA: node times are: [#{node_times.join(', ')}]
439- RAW_DATA: compute times are: [#{c_times.join(', ')}]
440- RAW_DATA: storage times are: [#{s_times.join(', ')}]
441 BAZ
442+ #RAW_DATA: node times are: [#{node_times.join(', ')}]
443+ #RAW_DATA: compute times are: [#{c_times.join(', ')}]
444+ #RAW_DATA: storage times are: [#{s_times.join(', ')}]
445
446 Djinn.log_debug(timing_info)
447
