Merge lp:~hegde-shashank/appscale/trunk into lp:~cgb-cs/appscale/main-cgb-research

Proposed by Shashank Hegde
Status: Needs review
Proposed branch: lp:~hegde-shashank/appscale/trunk
Merge into: lp:~cgb-cs/appscale/main-cgb-research
Diff against target: 2439 lines (+1665/-106) (has conflicts)
11 files modified
AppController/djinn.rb (+686/-25)
AppController/lib/djinn_job_data.rb (+25/-0)
AppController/lib/repo.rb (+238/-1)
AppController/lib/role_watcher.rb (+110/-0)
AppController/lib/zkinterface.rb (+195/-37)
AppServer/demos/therepo/repo.py (+8/-4)
Neptune/cewssa_helper.rb (+1/-1)
Neptune/dfsp_helper.rb (+1/-1)
Neptune/mpi_helper.rb (+11/-1)
Neptune/neptune.rb (+364/-36)
Neptune/neptune_job_data.rb (+26/-0)
Text conflict in AppController/djinn.rb
Text conflict in AppController/lib/djinn_job_data.rb
Text conflict in AppController/lib/repo.rb
Text conflict in AppController/lib/zkinterface.rb
Text conflict in Neptune/mpi_helper.rb
Text conflict in Neptune/neptune.rb
Text conflict in Neptune/neptune_job_data.rb
To merge this branch: bzr merge lp:~hegde-shashank/appscale/trunk
Reviewer: Chris Bunch (Pending)
Review via email: mp+95036@code.launchpad.net

Description of the change

Adding fault tolerance to AppScale
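
The mechanism in this branch works roughly as follows: every AppController registers the roles it runs as ephemeral znodes under /roles/<role>/<private_ip> and watches the children of the critical role paths (status_monitor, app_manager, neptune_manager); when a watched role loses all of its actors, each surviving node casts a vote by creating an ephemeral sequential znode under /roles/election/<role>, and the vote with the lowest sequence number wins. The sketch below illustrates that election step in isolation, using the same `zookeeper` gem calls the diff relies on; the ZooKeeper address and the contender IP are placeholder values, not taken from this proposal, and error handling is omitted. A second sketch, just before the Preview Diff, shows the role-watch callback that triggers the election.

    require 'rubygems'
    require 'zookeeper'

    # Minimal sketch of the leader election scheme used in this branch.
    # Assumed values: a ZooKeeper server on localhost:2181 and a placeholder IP.
    zk = Zookeeper.new("localhost:2181")
    role = "status_monitor"
    election_path = "/roles/election/#{role}"
    my_ip = "10.0.0.2"  # hypothetical private IP of this AppController

    # Make sure the persistent election path exists; if a node already exists,
    # the create call just returns a non-zero result code and nothing changes.
    ["/roles", "/roles/election", election_path].each { |p|
      zk.create(:path => p, :data => "", :ephemeral => false)
    }

    # Cast a vote: an ephemeral, sequential znode whose data is our IP.
    zk.create(:path => "#{election_path}/vote_", :data => my_ip,
              :ephemeral => true, :sequence => true)

    # The vote with the lowest sequence number wins (names are zero-padded,
    # so a lexical sort gives numeric order).
    votes = zk.get_children(:path => election_path)[:children].sort
    winner = zk.get(:path => "#{election_path}/#{votes.first}")[:data]
    puts "New #{role} is #{winner}"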


Unmerged revisions

801. By root <root@appscale-image0>

Deleting commented code

800. By root <root@appscale-image0>

Refactored the code

799. By root <root@appscale-image0>

Neptune manager migration works!

798. By root <root@appscale-image0>

Saving neptune jobs and nodes to zookeeper

797. By root <root@appscale-image0>

Fixed neptune and saving the neptune data to zookeeper

796. By root <root@appscale-image0>

Storing nodes into zookeeper

795. By root <root@appscale-image0>

Added app manager failover

794. By root <root@appscale-image0>

Added leader election for status_monitor

793. By root <root@appscale-image0>

Added role watchers and callbacks

792. By root <root@appscale-image0>

Reconnecting to zookeeper if required
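
Revision 793 above ("Added role watchers and callbacks") is the piece that notices a failed actor in the first place. The sketch below shows the watch pattern as it appears in the diff: a Zookeeper::WatcherCallback whose block re-arms the watch via get_children, so the node keeps getting notified whenever the set of actors under a role path changes. The host address and role path are placeholders; the election call itself is only referenced in a comment, since the first sketch covers it.

    require 'rubygems'
    require 'zookeeper'

    ZK_HOST = "localhost:2181"            # placeholder address for a zookeeper node
    ROLE_PATH = "/roles/status_monitor"   # placeholder role path

    zk = Zookeeper.new(ZK_HOST)

    # Ensure the watched path exists (persistent znodes).
    ["/roles", ROLE_PATH].each { |p| zk.create(:path => p, :data => "", :ephemeral => false) }

    # ZooKeeper watches are one-shot, so the callback re-registers itself each
    # time it fires by calling get_children with :watcher again.
    watcher = Zookeeper::WatcherCallback.new {
      path = watcher.context[:path]
      actors = zk.get_children(:path => path,
                               :watcher => watcher,
                               :watcher_context => {:path => path})[:children]
      puts "actors of #{path}: #{actors.inspect}"
      # An empty actor list is what triggers the leader election in this branch.
    }

    # Arm the initial watch on the role's children.
    zk.get_children(:path => ROLE_PATH,
                    :watcher => watcher,
                    :watcher_context => {:path => ROLE_PATH})

    sleep  # keep the process alive so notifications can arrive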

Preview Diff

1=== modified file 'AppController/djinn.rb'
2--- AppController/djinn.rb 2012-02-20 22:36:25 +0000
3+++ AppController/djinn.rb 2012-02-28 20:21:24 +0000
4@@ -263,6 +263,14 @@
5 @registered_with_sisyphus = false
6 end
7
8+ LOG_DEBUG = "debug"
9+ LOG_TRACE = "trace"
10+ LOG_INFO = "info"
11+ LOG_ERROR = "error!"
12+
13+ ROOT_APP_PATH = "/apps"
14+ ROLE_PATH = "/roles"
15+
16 def done(secret)
17 if valid_secret?(secret)
18 return @done_loading
19@@ -272,9 +280,16 @@
20 end
21
22 def kill(secret)
23+<<<<<<< TREE
24 if !valid_secret?(secret)
25 return BAD_SECRET_MSG
26 end
27+=======
28+ return BAD_SECRET_MSG unless valid_secret?(secret)
29+
30+ unregister_roles
31+
32+>>>>>>> MERGE-SOURCE
33 @kill_sig_received = true
34
35 if is_hybrid_cloud?
36@@ -294,8 +309,15 @@
37 # turned on since that was the state they started in
38
39 stop_ejabberd if my_node.is_login?
40+<<<<<<< TREE
41 stop_sisyphus if my_node.is_appengine?
42 Repo.stop if my_node.is_shadow? or my_node.is_appengine?
43+=======
44+
45+ if my_node.is_status_monitor? or my_node.is_appengine?
46+ Repo.stop
47+ end
48+>>>>>>> MERGE-SOURCE
49
50 jobs_to_run = my_node.jobs
51 commands = {
52@@ -492,7 +514,7 @@
53 # error messages and have the tools poll it
54 Thread.new {
55 # Tell other nodes to shutdown this application
56- if @app_names.include?(app_name) and !my_node.is_appengine?
57+ if @app_names.include?(app_name) and my_node.is_app_manager?
58 @nodes.each { |node|
59 next if node.private_ip == my_node.private_ip
60 if node.is_appengine? or node.is_login?
61@@ -533,7 +555,7 @@
62 HAProxy.remove_app(app_name)
63 Nginx.reload
64 Collectd.restart
65- ZKInterface.remove_app_entry(app_name)
66+ zk_remove_app_entry(app_name)
67 # TODO God does not shut down the application, so do it here for
68 # A temp fix.
69 `ps -ef | grep dev_appserver | grep #{app_name} | grep -v grep | grep cookie_secret | awk '{print $2}' | xargs kill -9`
70@@ -602,7 +624,15 @@
71 parse_creds
72 change_job
73 end
74-
75+
76+ register_roles
77+ setup_watchers
78+
79+ keyname = @creds["keyname"]
80+
81+ # Initially the status monitor knows everything about all the nodes.
82+ save_nodes_to_zookeeper if my_node.is_status_monitor?
83+
84 while !@kill_sig_received do
85 @state = "Done starting up AppScale, now in heartbeat mode"
86 write_database_info
87@@ -611,22 +641,39 @@
88 update_api_status
89 send_logs_to_sisyphus
90
91- if my_node.is_shadow?
92+ @nodes.each { |n|
93+ Djinn.log(LOG_DEBUG, "NODE from memory:", "#{n}")
94+ }
95+
96+ # TODO: neptune manager might spawn/delete nodes according to the neptune jobs.
97+ # So should we get the nodes from zookeeper in every loop? Or should we listen for changes?
98+ # @nodes must be updated on all nodes, so all nodes must listen for/get the data from zookeeper.
99+ # Getting the data from zookeeper in each loop now. Optimize this later.
100+ @nodes = zk_get_nodes(keyname)
101+
102+ neptune_nodes = get_neptune_nodes(@nodes)
103+
104+ if my_node.is_status_monitor?
105 @nodes.each { |node|
106 get_status(node)
107
108 if node.should_destroy?
109 Djinn.log_debug("Heartbeat - destroying node [#{node}]")
110 @nodes.delete(node)
111- @neptune_nodes.delete(node)
112+ zk_delete_node(keyname,node)
113 infrastructure = @creds["infrastructure"]
114 HelperFunctions.terminate_vms([node], infrastructure)
115 FileUtils.rm_f("/etc/appscale/status-#{node.private_ip}.json")
116 end
117 }
118+
119 Djinn.log_debug("Finished contacting all other nodes")
120+ else
121+ Djinn.log_debug("No need to heartbeat, we aren't the status monitor")
122+ end
123
124- @neptune_nodes.each { |node|
125+ if my_node.is_neptune_manager?
126+ neptune_nodes.each { |node|
127 Djinn.log_debug("Currently examining node [#{node}]")
128 if node.should_extend?
129 Djinn.log_debug("Extending time for node [#{node}]")
130@@ -634,19 +681,26 @@
131 elsif node.should_destroy?
132 Djinn.log_debug("Time is up for node [#{node}] - destroying it")
133 @nodes.delete(node)
134- @neptune_nodes.delete(node)
135+ zk_delete_node(keyname,node)
136 infrastructure = @creds["infrastructure"]
137 HelperFunctions.terminate_vms([node], infrastructure)
138 FileUtils.rm_f("/etc/appscale/status-#{node.private_ip}.json")
139 end
140 }
141- else
142- Djinn.log_debug("No need to heartbeat, we aren't the shadow")
143+
144+ save_neptune_data_to_zookeeper
145 end
146
147 # TODO: consider only calling this if new apps are found
148 start_appengine
149+<<<<<<< TREE
150 Kernel.sleep(20)
151+=======
152+
153+ #TODO: Check if the @nodes have changed. If so, update zookeeper
154+
155+ sleep(20)
156+>>>>>>> MERGE-SOURCE
157 end
158 end
159
160@@ -674,34 +728,39 @@
161 end
162
163 if File.exists?(location)
164- ZKInterface.add_app_entry(appname, my_node.serialize, location)
165- result = "success"
166+ zk_add_app_entry(appname, my_node, location)
167+ result = "success! #{appname} : #{location}"
168 else
169 result = "The #{appname} app was not found at #{location}."
170 end
171
172- Djinn.log_debug(result)
173+ Djinn.log(Djinn::LOG_TRACE, "done_uploading", result)
174 return result
175 end
176
177 def is_app_running(appname, secret)
178+<<<<<<< TREE
179 if !valid_secret?(secret)
180 return BAD_SECRET_MSG
181 end
182
183 hosters = ZKInterface.get_app_hosters(appname)
184+=======
185+ return BAD_SECRET_MSG unless valid_secret?(secret)
186+ hosters = zk_get_app_hosters(appname)
187+>>>>>>> MERGE-SOURCE
188 hosters_w_appengine = []
189 hosters.each { |node|
190 hosters_w_appengine << node if node.is_appengine?
191 }
192
193 app_running = !hosters_w_appengine.empty?
194- Djinn.log_debug("Is app #{appname} running? #{app_running}")
195 return app_running
196 end
197
198
199 def add_role(new_role, secret)
200+<<<<<<< TREE
201 if !valid_secret?(secret)
202 return BAD_SECRET_MSG
203 end
204@@ -722,22 +781,47 @@
205 send("start_#{role}".to_sym)
206 end
207 }
208+=======
209+ return BAD_SECRET_MSG unless valid_secret?(secret)
210+ my_node.add_roles(new_role)
211+ start_roles = new_role.split(":")
212+ start_roles.each { |role|
213+ Djinn.log(Djinn::LOG_TRACE,"add_role","Calling start_#{role}")
214+ send("start_#{role}".to_sym)
215+>>>>>>> MERGE-SOURCE
216 }
217-
218+<<<<<<< TREE
219+
220+=======
221+
222+ # add the node as a role actor
223+ zk_add_role(my_node, new_role)
224+
225+>>>>>>> MERGE-SOURCE
226 return "OK"
227 end
228
229 def remove_role(old_role, secret)
230+<<<<<<< TREE
231 if !valid_secret?(secret)
232 return BAD_SECRET_MSG
233 end
234
235 my_node.remove_roles(old_role)
236+=======
237+ Djinn.log(Djinn::LOG_TRACE,"remove_role","Removing role #{old_role}")
238+ return BAD_SECRET_MSG unless valid_secret?(secret)
239+>>>>>>> MERGE-SOURCE
240 stop_roles = old_role.split(":")
241 stop_roles.each { |role|
242 Djinn.log_debug("Removing and stopping role #{role}")
243 send("stop_#{role}".to_sym)
244 }
245+ my_node.remove_roles(old_role)
246+
247+ # remove the node as a role actor
248+ zk_remove_role(my_node, old_role)
249+
250 return "OK"
251 end
252
253@@ -750,6 +834,7 @@
254 # Important: Definitely do not log within the following three methods, as
255 # it would cause an infinite loop.
256 def self.log_debug(msg)
257+<<<<<<< TREE
258 time = Time.now
259 self.log_to_stdout(time, msg)
260 self.log_to_buffer(time, msg)
261@@ -777,6 +862,45 @@
262 # Logs and runs the given command, which is assumed to be trusted and thus
263 # needs no filtering on our part. Obviously this should not be executed by
264 # anything that the user could inject input into.
265+=======
266+ # TODO: reduce the copypasta here
267+ puts "(Appscale) [#{Time.now}] #{msg}"
268+ STDOUT.flush # TODO: examine performance impact of this
269+ if @lock.nil?
270+ begin
271+ Syslog.open("appscale") { |s| s.debug(msg) }
272+ rescue RuntimeError
273+ end
274+ else
275+ @lock.synchronize {
276+ begin
277+ Syslog.open("appscale") { |s| s.debug(msg) }
278+ rescue RuntimeError
279+ end
280+ }
281+ end
282+ end
283+
284+ def self.log(level, tag, msg)
285+ # TODO: reduce the copypasta here
286+ puts "(Appscale) [#{Time.now}] [#{level}] [#{tag}] #{msg}"
287+ STDOUT.flush # TODO: examine performance impact of this
288+ if @lock.nil?
289+ begin
290+ Syslog.open("appscale") { |s| s.debug(msg) }
291+ rescue RuntimeError
292+ end
293+ else
294+ @lock.synchronize {
295+ begin
296+ Syslog.open("appscale") { |s| s.debug(msg) }
297+ rescue RuntimeError
298+ end
299+ }
300+ end
301+ end
302+
303+>>>>>>> MERGE-SOURCE
304 def self.log_run(command)
305 Djinn.log_debug(command)
306 Djinn.log_debug(`#{command}`)
307@@ -819,6 +943,35 @@
308 return djinn_loc_array
309 end
310
311+<<<<<<< TREE
312+=======
313+ def initialize()
314+ @job = "none"
315+ @nodes = []
316+ @creds = {}
317+ @app_names = []
318+ @apps_loaded = []
319+ @kill_sig_received = false
320+ @my_index = nil
321+ @@secret = HelperFunctions.get_secret
322+ @done_loading = false
323+ @port = 8080
324+ @haproxy = 10000
325+ @userappserver_public_ip = "not-up-yet"
326+ @userappserver_private_ip = "not-up-yet"
327+ @state = "AppController just started"
328+ @total_boxes = 0
329+ @num_appengines = 3
330+ @restored = false
331+ @neptune_jobs = {}
332+ @neptune_nodes = []
333+ @lock = Monitor.new
334+ @api_status = {}
335+
336+ @watcher_callbacks = {}
337+ end
338+
339+>>>>>>> MERGE-SOURCE
340 def get_login
341 @nodes.each { |node|
342 return node if node.is_login?
343@@ -835,6 +988,30 @@
344 abort("No shadow nodes found.")
345 end
346
347+ def get_status_monitor
348+ @nodes.each { |node|
349+ return node if node.is_status_monitor?
350+ }
351+ return nil
352+ #abort("No status monitor found.")
353+ end
354+
355+ def get_app_manager
356+ @nodes.each { |node|
357+ return node if node.is_app_manager?
358+ }
359+ return nil
360+ #abort("No app manager found.")
361+ end
362+
363+ def get_neptune_manager
364+ @nodes.each { |node|
365+ return node if node.is_neptune_manager?
366+ }
367+ return nil
368+ #abort("No neptune manager found.")
369+ end
370+
371 def get_db_master
372 @nodes.each { |node|
373 return node if node.is_db_master?
374@@ -1062,6 +1239,7 @@
375 Djinn.log_debug("Restoring neptune data!")
376 jobs_info = (File.open(file_to_load) { |f| f.read }).chomp
377 jobs = []
378+<<<<<<< TREE
379
380 json_data = JSON.load(jobs_info)
381 return if json_data.nil?
382@@ -1074,6 +1252,27 @@
383
384 if @neptune_jobs[job_name].nil?
385 @neptune_jobs[job_name] = [this_job]
386+=======
387+ jobs_info.split("\n").each { |job|
388+ info = job.split("::")
389+ name = info[0]
390+ num_nodes = Integer(info[1])
391+
392+ begin
393+ start_time = Time._load(info[2])
394+ end_time = Time._load(info[3])
395+ rescue TypeError
396+ start_time = Time.now
397+ end_time = Time.now
398+ end
399+
400+ job_id = info[6]
401+
402+ this_job = NeptuneJobData.new(name, num_nodes, start_time, end_time, job_id)
403+
404+ if @neptune_jobs[name].nil?
405+ @neptune_jobs[name] = [this_job]
406+>>>>>>> MERGE-SOURCE
407 else
408 @neptune_jobs[job_name] = [this_job]
409 end
410@@ -1384,8 +1583,14 @@
411 @total_boxes = Integer(@creds['min_images'])
412 end
413
414+<<<<<<< TREE
415 Djinn.log_debug("Pre-loop: #{@nodes.join('\n')}")
416+=======
417+ Djinn.log_debug("pre-loop: #{@nodes.join('\n')}")
418+ # On the head node @nodes.size = 1. It contains only the head node at this point. Other nodes aren't populated until the call to spawn_and_setup_appengine.
419+>>>>>>> MERGE-SOURCE
420 if @nodes.size == 1 and @total_boxes > 1
421+ # spawn_and_setup_appengine parses the IPs from creds, converts them to nodes, and adds them to the node array.
422 spawn_and_setup_appengine
423 loop {
424 Djinn.log_debug("Looping: #{@nodes.join('\n')}")
425@@ -1480,6 +1685,7 @@
426 @state = "Starting up SOAP Server and PBServer"
427 start_pbserver
428 start_soap_server
429+<<<<<<< TREE
430 HelperFunctions.sleep_until_port_is_open(HelperFunctions.local_ip, UserAppClient::SERVER_PORT)
431 end
432
433@@ -1508,8 +1714,112 @@
434 end
435
436 # appengine is started elsewhere
437- end
438-
439+=======
440+ HelperFunctions.sleep_until_port_is_open(HelperFunctions.local_ip, UA_SERVER_PORT)
441+ end
442+
443+ start_blobstore_server if my_node.is_appengine?
444+
445+ # for neptune jobs, start a place where they can save output to
446+ # also, since repo does health checks on the app engine apis, start it up there too
447+
448+ repo_ip = get_status_monitor.public_ip
449+ repo_private_ip = get_status_monitor.private_ip
450+ repo_ip = my_node.public_ip if my_node.is_appengine?
451+ repo_private_ip = my_node.private_ip if my_node.is_appengine?
452+ Repo.init(repo_ip, repo_private_ip, @@secret)
453+
454+ if my_node.is_status_monitor? or my_node.is_appengine?
455+ Repo.start(get_login.public_ip, @userappserver_private_ip)
456+ end
457+
458+ # appengine is started elsewhere
459+>>>>>>> MERGE-SOURCE
460+ end
461+
462+ def start_status_monitor
463+ Djinn.log(Djinn::LOG_TRACE, "start_status_monitor", "Starting status monitor")
464+ my_node.add_roles(DjinnJobData::ROLE_STATUS_MTR)
465+
466+ # Get the @nodes from zookeeper
467+ Djinn.log(Djinn::LOG_TRACE, "start_status_monitor", "Getting nodes from zookeeper")
468+ keyname = @creds['keyname']
469+ @nodes = zk_get_nodes(keyname)
470+
471+ zk_add_role(my_node,DjinnJobData::ROLE_STATUS_MTR)
472+ zk_save_node(keyname,my_node)
473+ end
474+
475+ def stop_status_monitor
476+ Djinn.log(Djinn::LOG_TRACE,self.class,"Stopping status monitor")
477+
478+ if !my_node.is_status_monitor?
479+ Djinn.log(Djinn::LOG_ERROR,"stop_status_monitor", "I am not the status monitor")
480+ return
481+ end
482+
483+ my_node.remove_roles(DjinnJobData::ROLE_STATUS_MTR)
484+ zk_remove_role(my_node,DjinnJobData::ROLE_STATUS_MTR)
485+ keyname = @creds['keyname']
486+ zk_save_node(keyname,my_node)
487+ end
488+
489+ def start_neptune_manager
490+ Djinn.log(LOG_TRACE, self.class, "Starting neptune manager")
491+ my_node.add_roles(DjinnJobData::ROLE_NEPTUNE_MGR)
492+ zk_add_role(my_node,DjinnJobData::ROLE_NEPTUNE_MGR)
493+ keyname = @creds['keyname']
494+ zk_save_node(keyname,my_node)
495+ Djinn.log(Djinn::LOG_TRACE, "start_neptune_manager", "Node saved, Getting neptune data from zookeeper")
496+
497+ # Get the neptune nodes and neptune jobs from zookeeper
498+ get_neptune_data_from_zookeeper
499+
500+ Djinn.log(Djinn::LOG_TRACE, "start_neptune_manager", "Completing old neptune jobs")
501+ # Wait for old jobs to complete
502+ complete_old_jobs(keyname)
503+
504+ end
505+
506+ def stop_neptune_manager
507+ Djinn.log(Djinn::LOG_TRACE, "stop_neptune_manager", "Stopping neptune manager")
508+
509+ if !my_node.is_neptune_manager?
510+ Djinn.log(Djinn::LOG_ERROR, "stop_neptune_manager", "I am not the neptune manager.")
511+ return
512+ end
513+
514+ my_node.remove_roles(DjinnJobData::ROLE_NEPTUNE_MGR)
515+ zk_remove_role(my_node,DjinnJobData::ROLE_NEPTUNE_MGR)
516+ Djinn.log(Djinn::LOG_TRACE, "stop_neptune_manager", "Removed role from Zookeeper")
517+
518+ keyname = @creds['keyname']
519+ zk_save_node(keyname,my_node)
520+ Djinn.log(Djinn::LOG_TRACE, "stop_neptune_manager", "Neptune manager stopped")
521+ end
522+
523+ def start_app_manager
524+ Djinn.log(Djinn::LOG_TRACE, self.class, "Adding the role of app manager")
525+ my_node.add_roles(DjinnJobData::ROLE_APP_MGR)
526+ zk_add_role(my_node,DjinnJobData::ROLE_APP_MGR)
527+ keyname = @creds['keyname']
528+ zk_save_node(keyname,my_node)
529+ end
530+
531+ def stop_app_manager
532+ Djinn.log(Djinn::LOG_TRACE, self.class, "Stopping app manager")
533+
534+ if !my_node.is_app_manager?
535+ Djinn.log(Djinn::LOG_ERROR, "stop_app_manager", "I am not the app manager")
536+ return
537+ end
538+
539+ my_node.remove_roles(DjinnJobData::ROLE_APP_MGR)
540+ zk_remove_role(my_node,DjinnJobData::ROLE_APP_MGR)
541+ keyname = @creds['keyname']
542+ zk_save_node(keyname,my_node)
543+ end
544+
545 def start_blobstore_server
546 db_local_ip = @userappserver_private_ip
547 my_ip = my_node.public_ip
548@@ -1978,6 +2288,7 @@
549 start_cmd = "/usr/bin/memcached -d -m 32 -p 11211 -u root"
550 stop_cmd = "pkill memcached"
551 GodInterface.start(:memcached, start_cmd, stop_cmd, [11211])
552+
553 end
554
555 def stop_memcached()
556@@ -2025,6 +2336,7 @@
557 rescue SocketError
558 end
559 end
560+
561 end
562
563 # TODO: this function should use hadoop_helper
564@@ -2058,7 +2370,7 @@
565 # TODO: this function should use hadoop_helper
566 def start_hadoop_org()
567 i = my_node
568- return unless i.is_shadow? # change this later to db_master
569+ return unless i.is_db_master?
570 hadoop_home = File.expand_path("#{APPSCALE_HOME}/AppDB/hadoop-0.20.2/")
571 Djinn.log_run("#{hadoop_home}/bin/hadoop namenode -format 2>&1")
572 Djinn.log_run("#{hadoop_home}/bin/start-dfs.sh 2>&1")
573@@ -2123,9 +2435,14 @@
574 app_language = app_data.scan(/language:(\w+)/).flatten.to_s
575
576 # TODO: merge these
577- shadow = get_shadow
578- shadow_ip = shadow.private_ip
579- ssh_key = shadow.ssh_key
580+ appmanager = get_app_manager
581+ if appmanager == nil
582+ Djinn.log(Djinn::LOG_ERROR,"start_app_engine","App manager not found")
583+ return
584+ end
585+
586+ appmanager_ip = appmanager.private_ip
587+ ssh_key = appmanager.ssh_key
588 app_dir = "/var/apps/#{app}/app"
589 app_path = "#{app_dir}/#{app}.tar.gz"
590 FileUtils.mkdir_p(app_dir)
591@@ -2134,14 +2451,21 @@
592 HelperFunctions.setup_app(app)
593
594
595- if my_node.is_shadow?
596+ if my_node.is_app_manager?
597+ Djinn.log(Djinn::LOG_DEBUG, "start_appengine", "app_manager - updating cron and starting xmpp")
598 CronHelper.update_cron(my_public, app_language, app)
599 start_xmpp_for_app(app, app_language)
600 end
601
602 if my_node.is_appengine?
603+<<<<<<< TREE
604 app_number = @nginx_port - 8080
605 start_port = HelperFunctions::APP_START_PORT
606+=======
607+ Djinn.log(Djinn::LOG_DEBUG, "start_appengine", "appengine - starting app")
608+ app_number = @port - 8080
609+ start_port = 20000
610+>>>>>>> MERGE-SOURCE
611 static_handlers = HelperFunctions.parse_static_data(app)
612 proxy_port = HAProxy.app_listen_port(app_number)
613 login_ip = get_login.public_ip
614@@ -2210,13 +2534,14 @@
615 done_uploading(app, app_path, @@secret)
616 end
617
618- Monitoring.restart if my_node.is_shadow?
619+ Monitoring.restart if my_node.is_status_monitor?
620
621 if @app_names.include?("none")
622 @apps_loaded = @apps_loaded - ["none"]
623 @app_names = @app_names - ["none"]
624 end
625-
626+
627+ Djinn.log_debug("Loading #{app}")
628 @apps_loaded << app
629 }
630
631@@ -2272,7 +2597,7 @@
632
633 nodes_with_app = []
634 loop {
635- nodes_with_app = ZKInterface.get_app_hosters(appname)
636+ nodes_with_app = zk_get_app_hosters(appname)
637 break unless nodes_with_app.empty?
638 Djinn.log_debug("No nodes currently have a copy of app #{appname}, waiting...")
639 Kernel.sleep(5)
640@@ -2310,7 +2635,8 @@
641 # for app named baz, this translates to baz@login_ip
642
643 login_ip = get_login.public_ip
644- login_uac = UserAppClient.new(login_ip, @@secret)
645+ Djinn.log_debug("Login ip: #{login_ip}")
646+ login_uac = UserAppClient.new(@userappserver_private_ip, @@secret)
647 xmpp_user = "#{app}@#{login_ip}"
648 xmpp_pass = HelperFunctions.encrypt_password(xmpp_user, @@secret)
649 login_uac.commit_new_user(xmpp_user, xmpp_pass, "app")
650@@ -2407,4 +2733,339 @@
651 Djinn.log_debug("Stopping Sisyphus")
652 stop_app("sisyphus", @@secret)
653 end
654+
655+
656+ # Saves the node's role to zookeeper.
657+ # Interested nodes can set a watch on these role paths for changes
658+ def register_roles
659+ Djinn.log(Djinn::LOG_TRACE, self.class, "Registering roles with zookeeper")
660+
661+ me = my_node # save the node to avoid unnecessary calls to my_node
662+ jobs = me.jobs
663+ jobs.each { |job|
664+ zk_add_role(me,job)
665+ }
666+
667+ end
668+
669+ # Removes all the node's roles from zookeeper
670+ def unregister_roles
671+ Djinn.log(Djinn::LOG_TRACE, self.class, "Unregistering roles with zookeeper")
672+
673+ me = my_node # save the node to avoid unnecessary calls to my_node
674+ jobs = me.jobs
675+ jobs.each { |job|
676+ zk_remove_role(me,job)
677+ }
678+
679+ end
680+
681+ def setup_watchers
682+ Djinn.log(Djinn::LOG_TRACE, self.class, "Setting up watchers...")
683+
684+ watch_role(DjinnJobData::ROLE_STATUS_MTR)
685+ watch_role(DjinnJobData::ROLE_APP_MGR)
686+ watch_role(DjinnJobData::ROLE_NEPTUNE_MGR)
687+
688+ end
689+
690+ def clear_votes(aRole)
691+ # Wait for some time so that everyone can cast a vote
692+ # This is not required for leader election, but for clearing the votes.
693+ # If the votes are cleared immediately, then a slow node can win,
694+ # resulting in two leaders
695+ sleep(10)
696+ ZKInterface.clear_votes(aRole)
697+ end
698+
699+ def watch_role(role)
700+ Djinn.log(Djinn::LOG_TRACE, self.class, "Watching role #{role}")
701+ if @watcher_callbacks[role].nil?
702+ Djinn.log(Djinn::LOG_TRACE, "watch_role", "Creating a callback for #{role}")
703+ @watcher_callbacks[role] = Zookeeper::WatcherCallback.new {
704+ role_change_notification(@watcher_callbacks[role].context)
705+ }
706+ end
707+ zk_get_actors(role,@watcher_callbacks[role])
708+ end
709+
710+public
711+ def role_change_notification(context)
712+ path = context[:path]
713+ # path returned would be of the form /roles/<role-name>
714+ ar = path.split('/')
715+ role = ar[-1]
716+ Djinn.log(Djinn::LOG_TRACE, self.class, "Received a role change notification for #{role}")
717+
718+ new_actors = zk_get_actors(role, @watcher_callbacks[role])
719+ Djinn.log(Djinn::LOG_DEBUG,"role_change_notification","actors of #{role}: #{new_actors}")
720+
721+ if (role == DjinnJobData::ROLE_STATUS_MTR) or (role == DjinnJobData::ROLE_APP_MGR) or (role == DjinnJobData::ROLE_NEPTUNE_MGR)
722+ Djinn.log(Djinn::LOG_DEBUG,"role_change_notification","should elect new leader")
723+ if new_actors.size == 0
724+ elect_new_actor(role)
725+ end
726+ end
727+ # Nodes might have changed after leader election. So get the latest nodes' state from zk
728+ keyname = @creds["keyname"]
729+ @my_index = nil
730+ @nodes = zk_get_nodes(keyname)
731+ end
732+
733+private
734+ def elect_new_actor(role)
735+ Djinn.log(Djinn::LOG_TRACE,"elect_new_actor","Electing a new actor for #{role}")
736+ my_ip = my_node.private_ip
737+ winner = ZKInterface.elect_leader(role,my_ip)
738+ Djinn.log(Djinn::LOG_TRACE,"elect_new_actor","Winner of #{role} election is #{winner}")
739+ if my_ip == winner
740+ action = "start_#{role}".to_sym
741+ send(action)
742+ else
743+ # Some other node was the winner
744+ # Update the node array
745+ @nodes.each { |n|
746+ if n.public_ip == winner
747+ n.add_roles(role)
748+ end
749+ }
750+ end
751+ ZKInterface.complete_election(role)
752+ end
753+
754+ def save_nodes_to_zookeeper
755+ Djinn.log(LOG_TRACE, self.class, "Saving nodes to zookeeper")
756+
757+ keyname = @creds["keyname"]
758+ @nodes.each { |node|
759+ zk_save_node(keyname, node)
760+ }
761+ end
762+
763+ def save_neptune_data_to_zookeeper
764+ Djinn.log(LOG_TRACE, self.class, "Saving neptune data to zookeeper")
765+
766+ keyname = @creds["keyname"]
767+
768+ Djinn.log(Djinn::LOG_TRACE, self.class, "Neptune jobs: #{@neptune_jobs}")
769+ job_names = @neptune_jobs.keys
770+ job_names.each { |nj_name|
771+ jobs = @neptune_jobs[nj_name]
772+ Djinn.log(Djinn::LOG_DEBUG, self.class, "Saving job #{jobs.class} #{jobs.class.name} #{jobs}")
773+ # jobs is again an array of jobs with the same name
774+ jobs.each{ |j|
775+ Djinn.log(Djinn::LOG_DEBUG, self.class, "Saving job #{j.class} #{j.class.name} #{j}")
776+ zk_neptune_save_job(keyname,j.job_id,j)
777+ }
778+ }
779+ end
780+
781+ def get_neptune_data_from_zookeeper
782+ Djinn.log(LOG_TRACE, self.class, "Getting neptune data from zookeeper")
783+ keyname = @creds["keyname"]
784+ @neptune_jobs = zk_neptune_get_jobs(keyname)
785+ end
786+
787+ def get_neptune_nodes(nodes)
788+ Djinn.log(LOG_TRACE,self.class,"Filtering neptune nodes from all nodes")
789+ neptune_nodes = []
790+ nodes.each { |node|
791+ neptune_nodes << node if node.neptune_worker
792+ }
793+ return neptune_nodes
794+ end
795+
796+ ###############################################
797+ ## Zookeeper helper functions ##
798+ ###############################################
799+
800+ # Saves the application name in zookeeper
801+ # Path:
802+ # ROOT_APP_PATH/app_name/private_ip => serialized node data
803+ # ROOT_APP_PATH/app_name/private_ip/app_file => location of the app tar file
804+ #
805+ def zk_add_app_entry(appname, node, location)
806+ Djinn.log(Djinn::LOG_TRACE, self.class, "Adding an app entry for #{appname}")
807+
808+ appname_path = ROOT_APP_PATH + "/#{appname}"
809+ ZKInterface.ensure_path(appname_path)
810+
811+ node_path = appname_path + "/#{node.private_ip}"
812+
813+ # Save the node data in the zknode
814+ ZKInterface.set(node_path, node.serialize, ZKInterface::EPHEMERAL)
815+ # Save the location of the file
816+
817+ file_location = node_path + "/app_file"
818+ ZKInterface.set(file_location, location, ZKInterface::NOT_EPHEMERAL)
819+ end
820+
821+ # Removes the application name from zookeeper
822+ def zk_remove_app_entry(appname)
823+ Djinn.log(Djinn::LOG_TRACE, self.class, "Removing app entry for #{appname}")
824+
825+ appname_path = ROOT_APP_PATH + "/#{appname}"
826+ ZKInterface.delete(appname_path)
827+ end
828+
829+ # Gets the nodes that are serving the given application
830+ # Path:
831+ # ROOT_APP_PATH/app_name/private_ip => serialized node data
832+ # ROOT_APP_PATH/app_name/private_ip/app_file => location of the app tar file
833+ def zk_get_app_hosters(appname)
834+ Djinn.log(Djinn::LOG_TRACE, self.class, "Getting app hosters")
835+
836+ appname_path = ROOT_APP_PATH + "/#{appname}"
837+ app_hoster_ips = []
838+ app_hoster_ips = ZKInterface.get_children(appname_path)
839+ hoster_nodes = []
840+ app_hoster_ips.each { |ip|
841+ node_data = ZKInterface.get(appname_path + "/#{ip}")
842+ hoster_nodes << DjinnJobData.deserialize(node_data)
843+ }
844+ return hoster_nodes
845+ end
846+
847+
848+ # Add the node's role to zookeeper.
849+ def zk_add_role(node, role)
850+ Djinn.log(Djinn::LOG_TRACE, self.class, "Adding the role #{role} to #{node.private_ip}")
851+
852+ path = ROLE_PATH + "/#{role}"
853+ ZKInterface.ensure_path(path)
854+
855+ path = ROLE_PATH + "/#{role}/#{node.private_ip}"
856+ ZKInterface.set(path, DEFAULT_DATA, ZKInterface::EPHEMERAL)
857+ end
858+
859+ # Remove the node's role from zookeeper.
860+ def zk_remove_role(node, role)
861+ Djinn.log(Djinn::LOG_TRACE, self.class, "Removing the role #{role} from #{node.private_ip}")
862+
863+ path = ROLE_PATH + "/#{role}/#{node.private_ip}"
864+ ZKInterface.delete(path)
865+ end
866+
867+ # Gets the actors of the given role
868+ def zk_get_actors(role, callback = nil)
869+ Djinn.log(Djinn::LOG_TRACE, self.class, "Getting actors for #{role}")
870+
871+ path = ROLE_PATH + "/#{role}"
872+ actors = ZKInterface.get_children(path, callback)
873+ return actors
874+ end
875+
876+ # Save the node into zookeeper
877+ def zk_save_node(keyname, node)
878+
879+ Djinn.log(Djinn::LOG_TRACE, self.class, "Saving node to zookeeper")
880+
881+ private_ip = node.private_ip
882+ public_ip = node.public_ip
883+ instance_id = node.instance_id
884+ cloud = node.cloud
885+ ssh_key = node.ssh_key
886+ c_time = node.creation_time
887+ d_time = node.destruction_time
888+ jobs = node.jobs.join(":")
889+ n_worker = node.neptune_worker
890+
891+ if c_time == nil
892+ c_time = Time.mktime(EPOCH_YEAR)
893+ end
894+ c_time = c_time._dump
895+
896+ if d_time == nil
897+ d_time = Time.mktime(NEVER_DESTROY)
898+ end
899+ d_time = d_time._dump
900+
901+ Djinn.log(Djinn::LOG_DEBUG, "zk_save_node", "#{private_ip}, #{public_ip}, #{instance_id}, #{cloud}, #{ssh_key}, #{c_time}, #{d_time}, #{jobs}, #{n_worker}")
902+
903+ path = "/#{keyname}/nodes"
904+ ZKInterface.ensure_path(path)
905+
906+ path = "/#{keyname}/nodes/#{node.private_ip}"
907+ ZKInterface.set(path,node.public_ip,ZKInterface::NOT_EPHEMERAL)
908+
909+ ZKInterface.set(path+"/instance_id", instance_id,ZKInterface::NOT_EPHEMERAL)
910+ ZKInterface.set(path+"/cloud", cloud,ZKInterface::NOT_EPHEMERAL)
911+ ZKInterface.set(path+"/ssh_key", ssh_key,ZKInterface::NOT_EPHEMERAL)
912+ ZKInterface.set(path+"/c_time", c_time.to_s,ZKInterface::NOT_EPHEMERAL)
913+ ZKInterface.set(path+"/d_time", d_time.to_s,ZKInterface::NOT_EPHEMERAL)
914+ ZKInterface.set(path+"/jobs", jobs,ZKInterface::NOT_EPHEMERAL)
915+ ZKInterface.set(path+"/n_worker", n_worker.to_s,ZKInterface::NOT_EPHEMERAL)
916+
917+ end
918+
919+ # Get the node data from zookeeper
920+ def zk_get_nodes(keyname)
921+
922+ Djinn.log(Djinn::LOG_TRACE, self.class, "Getting nodes from zookeeper")
923+
924+ nodes = []
925+ ip_path = "/#{keyname}/nodes"
926+ node_ips = ZKInterface.get_children(ip_path)
927+ node_ips.each { |ip|
928+ path = ip_path + "/#{ip}"
929+ Djinn.log(Djinn::LOG_DEBUG, "zk_get_nodes", "IP : #{ip}")
930+
931+ private_ip = ip
932+ public_ip = ZKInterface.get(ip_path+"/#{ip}")
933+ next if public_ip == ZKInterface::FAILURE
934+
935+ instance_id = ZKInterface.get(ip_path+"/#{ip}/instance_id")
936+ next if instance_id == ZKInterface::FAILURE
937+
938+ cloud = ZKInterface.get(ip_path+"/#{ip}/cloud")
939+ next if cloud == ZKInterface::FAILURE
940+
941+ ssh_key = ZKInterface.get(ip_path+"/#{ip}/ssh_key")
942+ next if ssh_key == ZKInterface::FAILURE
943+
944+ c_time = ZKInterface.get(ip_path+"/#{ip}/c_time")
945+ next if c_time == ZKInterface::FAILURE
946+
947+ d_time = ZKInterface.get(ip_path+"/#{ip}/d_time")
948+ next if d_time == ZKInterface::FAILURE
949+
950+ jobs = ZKInterface.get(ip_path+"/#{ip}/jobs")
951+ next if jobs == ZKInterface::FAILURE
952+
953+ n_worker = ZKInterface.get(ip_path+"/#{ip}/n_worker")
954+ n_worker = n_worker.to_sym
955+ next if n_worker == ZKInterface::FAILURE
956+
957+ Djinn.log(Djinn::LOG_DEBUG, "zk_get_nodes", "#{private_ip}, #{public_ip}, #{instance_id}, #{cloud}, #{ssh_key}, #{c_time.to_s}, #{d_time.to_s}, #{jobs}, #{n_worker}")
958+
959+ if jobs.nil?
960+ jobs = ["open"]
961+ end
962+
963+ # Constructor is wrong 'cause the roles parameter expects ip, cloud name, et al along with the jobs
964+ # TODO: Change the constructor?
965+ node = DjinnJobData.new(jobs, keyname)
966+ node.private_ip = private_ip
967+ node.public_ip = public_ip
968+ node.instance_id = instance_id
969+ node.cloud = cloud
970+ node.ssh_key = ssh_key
971+ node.add_roles(jobs)
972+ node.creation_time = Time._load(c_time)
973+ node.destruction_time = Time._load(d_time)
974+ node.neptune_worker = n_worker
975+
976+ nodes << node
977+ }
978+ @my_index = nil
979+ return nodes
980+ end
981+
982+ def zk_delete_node(keyname,node)
983+ Djinn.log(Djinn::LOG_TRACE,self.class,"Deleting node #{node.private_ip}")
984+
985+ path = "/#{keyname}/nodes/#{node.private_ip}"
986+ ZKInterface.delete(path)
987+ end
988+
989 end
990
991=== modified file 'AppController/lib/djinn_job_data.rb'
992--- AppController/lib/djinn_job_data.rb 2012-02-19 04:55:17 +0000
993+++ AppController/lib/djinn_job_data.rb 2012-02-28 20:21:24 +0000
994@@ -15,10 +15,30 @@
995 # also contains info about when we spawned the node (helpful for optimizing
996 # costs, which may charge on an hourly basis).
997 class DjinnJobData
998+
999+ # Roles
1000+ ROLE_APP_ENGINE = "appengine"
1001+ ROLE_APP_MGR = "app_manager"
1002+ ROLE_DB_MASTER = "db_master"
1003+ ROLE_DB_SLAVE = "db_slave"
1004+ ROLE_LB = "load_balancer"
1005+ ROLE_LOGIN = "login"
1006+ ROLE_MEMCACHE = "memcache"
1007+ ROLE_NEPTUNE_MGR= "neptune_manager"
1008+ ROLE_OPEN = "open"
1009+ ROLE_STATUS_MTR = "status_monitor"
1010+ ROLE_ZOOKEEPER = "zookeeper"
1011+
1012 attr_accessor :public_ip, :private_ip, :jobs, :instance_id, :cloud, :ssh_key
1013 attr_accessor :creation_time, :destruction_time, :failed_heartbeats
1014+<<<<<<< TREE
1015
1016
1017+=======
1018+ # If the node is a neptune worker then this is set to true
1019+ attr_accessor :neptune_worker
1020+
1021+>>>>>>> MERGE-SOURCE
1022 def initialize(roles, keyname)
1023 # format: "publicIP:privateIP:load_balancer:appengine:table-master:table-slave:instance_id:cloud"
1024
1025@@ -46,6 +66,7 @@
1026 @failed_heartbeats = 0
1027
1028 appscale_jobs = ["load_balancer", "shadow"]
1029+ appscale_jobs += ["status_monitor", "app_manager", "neptune_manager"]
1030 appscale_jobs += ["db_master", "db_slave"]
1031 appscale_jobs += ["zookeeper"]
1032 appscale_jobs += ["login"]
1033@@ -59,12 +80,15 @@
1034 appscale_jobs.each { |job|
1035 @jobs << job if roles.include?(job)
1036 }
1037+
1038+ @neptune_worker = false
1039 end
1040
1041 def add_roles(roles)
1042 new_jobs = roles.split(":")
1043 @jobs = (@jobs + new_jobs).uniq
1044 @jobs.delete("open")
1045+ @jobs = ["open"] if @jobs.size == 0
1046 end
1047
1048 def remove_roles(roles)
1049@@ -75,6 +99,7 @@
1050
1051 def set_roles(roles)
1052 @jobs = roles.split(":")
1053+ @jobs = ["open"] if @jobs.size == 0
1054 end
1055
1056 # not the best name for this but basically correct
1057
1058=== modified file 'AppController/lib/repo.rb'
1059--- AppController/lib/repo.rb 2012-02-27 07:19:48 +0000
1060+++ AppController/lib/repo.rb 2012-02-28 20:21:24 +0000
1061@@ -95,6 +95,243 @@
1062 Djinn.log_debug(`pkill -f DevAppServerMain`)
1063 end
1064
1065-
1066+<<<<<<< TREE
1067+
1068+=======
1069+ def self.valid_storage_creds(storage, creds)
1070+ if storage == "appdb"
1071+ valid = true
1072+ elsif storage == "s3"
1073+ conn = self.get_s3_conn(creds)
1074+ begin
1075+ all_buckets = conn.list_all_my_buckets
1076+ Djinn.log_debug("this user owns these buckets: [#{all_buckets.join(', ')}]")
1077+ valid = true
1078+ rescue RightAws::AwsError
1079+ valid = false
1080+ end
1081+ end
1082+
1083+ Djinn.log_debug("did user provide valid storage creds? #{valid}")
1084+ end
1085+
1086+ def self.set_output(path, output, storage="appdb", creds={}, is_file=false)
1087+ return self.set(path, output, :output, storage, creds, is_file)
1088+ end
1089+
1090+ def self.get_output(path, storage="appdb", creds={}, is_file=false)
1091+ return self.get(path, :output, storage, creds, is_file)
1092+ end
1093+
1094+ def self.set_acl(path, new_acl, storage="appdb", creds={})
1095+ return self.set(path, new_acl, :acl, storage, creds)
1096+ end
1097+
1098+ def self.get_acl(path, storage="appdb", creds={})
1099+ return self.get(path, :acl, storage, creds)
1100+ end
1101+
1102+ def self.does_file_exist?(path, storage="appdb", creds={})
1103+ if storage == "appdb"
1104+ result = `curl http://#{@@ip}:8079/doesexist -X POST -d 'SECRET=#{@@secret}' -d 'KEY=#{path}'`
1105+ elsif storage == "s3"
1106+ conn = self.get_s3_conn(creds)
1107+ bucket, file = self.parse_s3_key(path)
1108+
1109+ if self.does_s3_bucket_exist?(conn, bucket)
1110+ Djinn.log_debug("[does file exist] bucket [#{bucket}] exists")
1111+ begin
1112+ Djinn.log_debug("[does file exist] getting acl for bucket [#{bucket}] and file [#{file}] ")
1113+ conn.get_acl(bucket, file)
1114+ result = "true"
1115+ rescue RightAws::AwsError
1116+ result = "false"
1117+ end
1118+ else
1119+ Djinn.log_debug("[does file exist] bucket [#{bucket}] does not exist")
1120+ result = "false"
1121+ end
1122+ else
1123+ msg = "ERROR - unrecognized storage for does_file_exist via repo - you requested #{storage}"
1124+ Djinn.log_debug(msg)
1125+ abort(msg)
1126+ end
1127+
1128+ Djinn.log_debug("does key=#{path} exist? #{result}")
1129+ return result == "true"
1130+ end
1131+
1132+ private
1133+
1134+ def self.get(key, type, storage, creds, is_file=false)
1135+ if storage == "appdb"
1136+ Djinn.log(Djinn::LOG_INFO, "repo::get", "Using the secret #{@@secret}")
1137+ result = `curl http://#{@@ip}:8079/get -X POST -d 'SECRET=#{@@secret}' -d 'KEY=#{key}' -d 'TYPE=#{type}'`
1138+ result = URI.unescape(result)
1139+ foo = File.new(is_file, File::CREAT|File::RDWR)
1140+ foo.write(result)
1141+ foo.close
1142+ elsif storage == "s3"
1143+ conn = self.get_s3_conn(creds)
1144+ bucket, file = self.parse_s3_key(key)
1145+
1146+ if type == :output
1147+ if is_file
1148+ foo = File.new(is_file, File::CREAT|File::RDWR)
1149+ result = conn.get(bucket, file) { |chunk|
1150+ foo.write(chunk)
1151+ }
1152+ foo.close
1153+ else
1154+ result = conn.get(bucket, file)[:object]
1155+ end
1156+ elsif type == :acl
1157+ # TODO: implement me!
1158+ result = "private"
1159+ else
1160+ msg = "type not supported for get operation - #{type} was used"
1161+ abort(msg)
1162+ end
1163+ else
1164+ msg = "ERROR - unrecognized storage for get via repo - you requested #{storage}"
1165+ Djinn.log_debug(msg)
1166+ abort(msg)
1167+ end
1168+
1169+ Djinn.log_debug("get key=#{key} type=#{type}")
1170+ return result
1171+ end
1172+
1173+ def self.set(key, val, type, storage, creds, is_file=false)
1174+ if storage == "appdb"
1175+ if is_file
1176+ if File.directory?(val)
1177+ result = true
1178+ `ls #{val}`.split.each { |file|
1179+ fullkey = key + "/" + file
1180+ fullval = val + "/" + file
1181+ Djinn.log_debug("recursive dive - now saving remote [#{fullkey}], local [#{fullval}]")
1182+ temp = self.set(fullkey, fullval, type, storage, creds, is_file)
1183+ result = false unless temp
1184+ file = fullkey
1185+ }
1186+ else
1187+ Djinn.log_debug("attempting to put local file #{val} into file #{key}")
1188+ val = HelperFunctions.read_file(val, chomp=false)
1189+ val = URI.escape(val)
1190+ result = false
1191+ begin
1192+ res = Net::HTTP.post_form(URI.parse("http://#{@@ip}:8079/set"),
1193+ {'SECRET' => @@secret, 'KEY' => key,
1194+ 'VALUE' => val, 'TYPE' => type})
1195+ Djinn.log_debug("set key=#{key} type=#{type} returned #{res.body}")
1196+ result = true if res.body == "success"
1197+ rescue Exception => e
1198+ Djinn.log_debug("saw exception #{e.class} when posting userdata to repo at #{key}")
1199+ end
1200+
1201+ end
1202+ else
1203+ Djinn.log_debug("attempting to put value into key #{key}")
1204+ val = URI.escape(val, Regexp.new("[^#{URI::PATTERN::UNRESERVED}]"))
1205+ result = `curl http://#{@@ip}:8079/set -X POST -d 'SECRET=#{@@secret}' -d 'KEY=#{key}' -d 'VALUE=#{val}' -d 'TYPE=#{type}'`
1206+ Djinn.log_debug("set key=#{key} type=#{type} returned #{result}")
1207+ result = true if result == "success"
1208+ end
1209+ elsif storage == "s3"
1210+ conn = self.get_s3_conn(creds)
1211+ bucket, file = self.parse_s3_key(key)
1212+
1213+ if type == :output
1214+ # TODO: for now we assume the bucket exists
1215+ #if !self.does_s3_bucket_exist?(conn, bucket)
1216+ # Djinn.log_debug("bucket #{bucket} does not exist - creating it now")
1217+ # conn.create_bucket(bucket)
1218+
1219+ # bucket creation takes a few moments - wait for it to exist
1220+ # before we start putting things in it
1221+ # loop {
1222+ # Djinn.log_debug("waiting for s3 bucket #{bucket} to exist")
1223+ # sleep(5)
1224+ # break if self.does_s3_bucket_exist?(conn, bucket)
1225+ # }
1226+ #end
1227+
1228+ Djinn.log_debug("s3 bucket #{bucket} exists, now storing file #{file}")
1229+
1230+ # this throws an exception that gets automatically caught and logged
1231+ # looks like "undefined method `pos' for <String...>"
1232+ # the put operation still succeeds
1233+ if is_file
1234+ if File.directory?(val)
1235+ result = true
1236+ `ls #{val}`.split.each { |file|
1237+ fullkey = key + "/" + file
1238+ fullval = val + "/" + file
1239+ Djinn.log_debug("recursive dive - now saving remote [#{fullkey}], local [#{fullval}]")
1240+ temp = self.set(fullkey, fullval, type, storage, creds, is_file)
1241+ result = false unless temp
1242+ file = fullkey
1243+ }
1244+ else
1245+ Djinn.log_debug("attempting to put local file #{val} into bucket #{bucket}, location #{file}")
1246+ result = conn.put(bucket, file, File.open(val)) #headers={"Content-Length" => val.length})
1247+ end
1248+ else
1249+ result = conn.put(bucket, file, val, headers={"Content-Length" => val.length})
1250+ end
1251+
1252+ Djinn.log_debug("done putting file #{file} to s3!")
1253+ elsif type == :acl
1254+ # TODO: implement me!
1255+ return false
1256+ else
1257+ msg = "type not supported for get operation - #{type} was used"
1258+ abort(msg)
1259+ end
1260+ else
1261+ msg = "ERROR - unrecognized storage for set via repo - you requested #{storage}"
1262+ Djinn.log_debug(msg)
1263+ abort(msg)
1264+ end
1265+
1266+ Djinn.log_debug("set operation returned #{result}")
1267+ return result
1268+ end
1269+
1270+ def self.get_s3_conn(creds)
1271+ access_key = creds['EC2_ACCESS_KEY']
1272+ secret_key = creds['EC2_SECRET_KEY']
1273+
1274+ s3_url = creds['S3_URL']
1275+
1276+ obscured_a_key = HelperFunctions.obscure_string(access_key)
1277+ obscured_s_key = HelperFunctions.obscure_string(secret_key)
1278+
1279+ Djinn.log_debug("creating S3 connection with access key [#{obscured_a_key}], secret key [#{obscured_s_key}], and S3 url [#{s3_url}]")
1280+
1281+ old_s3_url = ENV['S3_URL']
1282+ ENV['S3_URL'] = s3_url
1283+ conn = RightAws::S3Interface.new(access_key, secret_key)
1284+ ENV['S3_URL'] = old_s3_url
1285+
1286+ return conn
1287+ end
1288+
1289+ def self.parse_s3_key(key)
1290+ paths = key.split("/")
1291+ bucket = paths[1]
1292+ file = paths[2, paths.length - 1].join("/")
1293+ return bucket, file
1294+ end
1295+
1296+ def self.does_s3_bucket_exist?(conn, bucket)
1297+ all_buckets = conn.list_all_my_buckets
1298+ bucket_names = all_buckets.map { |b| b[:name] }
1299+ bucket_exists = bucket_names.include?(bucket)
1300+ Djinn.log_debug("the user owns buckets [#{bucket_names.join(', ')}] - do they own [#{bucket}]? #{bucket_exists}")
1301+ return bucket_exists
1302+ end
1303+>>>>>>> MERGE-SOURCE
1304 end
1305
1306
1307=== added file 'AppController/lib/role_watcher.rb'
1308--- AppController/lib/role_watcher.rb 1970-01-01 00:00:00 +0000
1309+++ AppController/lib/role_watcher.rb 2012-02-28 20:21:24 +0000
1310@@ -0,0 +1,110 @@
1311+#! /usr/bin/ruby -w
1312+
1313+require 'rubygems'
1314+require 'zookeeper'
1315+#require 'djinn'
1316+
1317+def listener(aContext)
1318+ puts("Callback!")
1319+end
1320+
1321+# Base class for monitoring changes to roles
1322+class RoleWatcher
1323+
1324+ private #constants
1325+ ZOOKEEPER_PORT = 2181
1326+
1327+ public
1328+
1329+ def initialize(aZookeeper, aCallback)
1330+ cb = aCallback.to_sym
1331+ send(cb,"test")
1332+ @mZK = Zookeeper.new("#{aZookeeper}:#{ZOOKEEPER_PORT}")
1333+ @mCallback = Zookeeper::WatcherCallback.new {
1334+ #notification(@mCallback.context)
1335+ send(cb,@mCallback.context)
1336+ }
1337+ end
1338+
1339+ # Start a watch on the specified node
1340+ # the notification callback is invoked whenever the node value changes
1341+ def watch_node(aNode)
1342+ @mZK.get( :path => aNode,
1343+ :watcher => @mCallback,
1344+ :watcher_context => {:path => aNode}
1345+ )[:data]
1346+ end
1347+
1348+ def watch_children(aNode)
1349+ @mZK.get_children(:path => aNode,
1350+ :watcher => @mCallback,
1351+ :watcher_context => {:path => aNode}
1352+ )
1353+ end
1354+
1355+ protected
1356+
1357+ # Override
1358+ # This method is invoked whenever the watched node value changes
1359+ def notification(aContext)
1360+ end
1361+
1362+end
1363+
1364+
1365+# Subclass for watching over the status monitor
1366+class StatusMonitorWatcher < RoleWatcher
1367+
1368+ public
1369+ def initialize(aZookeeper, aCallback)
1370+ super(aZookeeper, aCallback)
1371+ @previousNodes
1372+ end
1373+
1374+ protected
1375+ def notification(aContext)
1376+ watchedPath = aContext[:path]
1377+ # Get the new data and set the watch again
1378+ newData = watch_children(watchedPath)
1379+ #Djinn.log_debug("Status monitor : #{newData.first}")
1380+ puts("Status monitor : #{newData.first}")
1381+ end
1382+end
1383+
1384+
1385+# Subclass for watching over the app engine
1386+class AppEngineWatcher < RoleWatcher
1387+
1388+ public
1389+ def initialize(aZookeeper, aCallback)
1390+ super(aZookeeper,aCallback)
1391+ @previousNodes
1392+ end
1393+
1394+ protected
1395+ def notification(aContext)
1396+ watchedPath = aContext[:path]
1397+ # Get the new data and set the watch again
1398+ newData = watch_children(watchedPath)
1399+ #Djinn.log_debug("AppEngine :")
1400+ puts("AppEngine:")
1401+ newData.each { |node|
1402+ #Djinn.log_debug(node)
1403+ puts(node)
1404+ }
1405+ end
1406+end
1407+
1408+if __FILE__ == $0
1409+ if ARGV.size != 1
1410+ puts("#{$0} <zk host>")
1411+ exit
1412+ end
1413+ aeWatcher = AppEngineWatcher.new(ARGV[0], "listener")
1414+ ae = aeWatcher.watch_children("/roles/appengine")
1415+ ae.each { |n|
1416+ puts(n)
1417+ }
1418+
1419+ a = STDIN.getc
1420+end
1421
1422=== modified file 'AppController/lib/zkinterface.rb'
1423--- AppController/lib/zkinterface.rb 2012-02-18 07:20:06 +0000
1424+++ AppController/lib/zkinterface.rb 2012-02-28 20:21:24 +0000
1425@@ -2,11 +2,15 @@
1426
1427 require 'fileutils'
1428 require 'monitor'
1429+require 'rubygems'
1430+require 'zookeeper'
1431+require 'zookeeper/exceptions'
1432
1433
1434 $:.unshift File.join(File.dirname(__FILE__))
1435 require 'helperfunctions'
1436
1437+<<<<<<< TREE
1438
1439 SUCCESS = 0
1440
1441@@ -20,6 +24,12 @@
1442
1443
1444 ROOT_APP_PATH = "/apps"
1445+=======
1446+DEFAULT_DATA = "nothing special here"
1447+
1448+EPOCH_YEAR = 1970
1449+NEVER_DESTROY = 2100
1450+>>>>>>> MERGE-SOURCE
1451
1452
1453 # The AppController employs the open source software ZooKeeper as a highly
1454@@ -28,58 +38,68 @@
1455 # communicate with ZooKeeper, and automates commonly performed functions by the
1456 # AppController.
1457 class ZKInterface
1458- public
1459-
1460+ private
1461+ RETRY_COUNT = 10
1462+ ELECTION_PATH = "/roles/election"
1463+
1464+ public
1465+ EPHEMERAL = true
1466+ NOT_EPHEMERAL = false
1467+ SEQUENCE = true
1468+
1469+ SUCCESS = 0
1470+ FAILURE = -1
1471+
1472+
1473+ public
1474 def self.init(my_node, all_nodes)
1475 require 'rubygems'
1476 require 'zookeeper'
1477
1478+ @mMyNode = my_node
1479+ @mAllNodes = all_nodes
1480+ #@@lock = nil
1481+ #@@zk = nil
1482+ self.connect(my_node,all_nodes)
1483+ end
1484+
1485+ def self.connect(my_node, all_nodes)
1486+ require 'rubygems'
1487+ require 'zookeeper'
1488+
1489+ #if @@lock == nil
1490 unless defined?(@@lock)
1491 @@lock = Monitor.new
1492 end
1493
1494 zk_location = self.get_zk_location(my_node, all_nodes)
1495+ log(Djinn::LOG_INFO,"Zookeeper location: #{zk_location}")
1496
1497 @@lock.synchronize {
1498 @@zk = Zookeeper.new(zk_location)
1499+ log(Djinn::LOG_TRACE, "Zookeeper: #{@@zk}")
1500 }
1501- end
1502-
1503- def self.add_app_entry(appname, ip, location)
1504- appname_path = ROOT_APP_PATH + "/#{appname}"
1505- full_path = appname_path + "/#{ip}"
1506-
1507- # can't just create path in ZK
1508- # need to do create the nodes at each level
1509-
1510- self.set(ROOT_APP_PATH, "nothing special here", NOT_EPHEMERAL)
1511- self.set(appname_path, "nothing special here", NOT_EPHEMERAL)
1512- self.set(full_path, location, EPHEMERAL)
1513- end
1514-
1515- def self.remove_app_entry(appname)
1516- appname_path = ROOT_APP_PATH + "/#{appname}"
1517- self.delete(appname_path)
1518- end
1519-
1520- def self.get_app_hosters(appname)
1521- unless defined?(@@zk)
1522- return []
1523+
1524+ end
1525+
1526+ def self.ensure_path(path, data = DEFAULT_DATA)
1527+ self.log(Djinn::LOG_TRACE, "Ensuring path #{path}")
1528+ exists = self.get(path)
1529+ if exists == FAILURE
1530+ # find the last occurence of '/' and ensure path before that exists
1531+ index = path.rindex('/')
1532+ if index > 0 # if the last '/' is the first, then thats the root
1533+ parent = path[0..index-1]
1534+ self.ensure_path(parent, data)
1535+ end
1536+ self.set(path,data,ZKInterface::NOT_EPHEMERAL)
1537 end
1538-
1539- appname_path = ROOT_APP_PATH + "/#{appname}"
1540- app_hosters = self.get_children(appname_path)
1541- #converted = app_hosters
1542- converted = []
1543- app_hosters.each { |serialized|
1544- converted << DjinnJobData.deserialize(serialized)
1545- }
1546- return converted
1547 end
1548
1549- private
1550-
1551+ # Gets the value stored in the zk node identified by the path
1552+ # If connection to the zookeeper cannot be established, then it retries 'RETRY_COUNT' times
1553 def self.get(key)
1554+<<<<<<< TREE
1555 Djinn.log_debug("[ZK] trying to get #{key}")
1556 info = @@zk.get(:path => key)
1557 if info[:rc] == 0
1558@@ -117,10 +137,90 @@
1559 end
1560 end
1561
1562+=======
1563+ self.log(Djinn::LOG_TRACE,"trying to get #{key}")
1564+ RETRY_COUNT.times { |retry_count|
1565+ begin
1566+ info = @@zk.get(:path => key)
1567+ if info[:rc] == 0
1568+ return info[:data]
1569+ else
1570+ Djinn.log(Djinn::LOG_ERROR, "ZK::get", "Data not found at #{key}")
1571+ return FAILURE
1572+ end
1573+ # TODO: Handle exceptions better.
1574+ rescue Exception
1575+ log(Djinn::LOG_ERROR, "#{retry_count+1} reopening connection to zk")
1576+ @@zk.reopen
1577+ end
1578+ }
1579+ end
1580+
1581+ # Gets the children of the zk node identified by the path
1582+ # If connection to the zookeeper cannot be established, then it retries 'RETRY_COUNT' times
1583+ def self.get_children(key, callback=nil)
1584+ self.log(Djinn::LOG_TRACE,"[ZK] trying to get children #{key}")
1585+ RETRY_COUNT.times { |retry_count|
1586+ begin
1587+ children = nil
1588+ if callback == nil
1589+ children = @@zk.get_children(:path => key)[:children]
1590+ else
1591+ self.log(Djinn::LOG_TRACE,"[ZK] watching the children at #{key}. Watcher : #{callback}")
1592+ children = @@zk.get_children(:path => key,
1593+ :watcher => callback,
1594+ :watcher_context => {:path => key}
1595+ )[:children]
1596+ end
1597+ if children.nil?
1598+ Djinn.log(Djinn::LOG_ERROR, "ZK::get_children", "Data not found at #{key}")
1599+ return []
1600+ else
1601+ return children
1602+ end
1603+ # TODO: Handle exceptions better.
1604+ rescue Exception
1605+ log(Djinn::LOG_ERROR, "#{retry_count+1} reopening connection to zk")
1606+ @@zk.reopen
1607+ end
1608+ }
1609+ end
1610+
1611+ # Sets the key to val. If the key does not exist, then the key is created. else the value is updated.
1612+ # ephemeral (boolean) - flag to indicate whether the zk node should be ephemeral or not
1613+ # sequence (boolean) - flag to indicate whether zookeeper should create the node with a sequence number added automatically
1614+ def self.set(key, val, ephemeral, sequence = false)
1615+ self.log(Djinn::LOG_TRACE, "[ZK] trying to set #{key} to #{val} with ephemeral = #{ephemeral} and sequence = #{sequence}")
1616+ exists = self.get(key)
1617+ if exists == FAILURE
1618+ self.log(Djinn::LOG_DEBUG, "Creating #{key}")
1619+ info = @@zk.create(:path => key, :ephemeral => ephemeral, :data => val, :sequence => sequence)
1620+ if info[:rc] == 0
1621+ self.log(Djinn::LOG_DEBUG,"Success!")
1622+ return SUCCESS
1623+ else
1624+ self.log(Djinn::LOG_DEBUG,"Failure")
1625+ return FAILURE
1626+ end
1627+ else
1628+ info = @@zk.set(:path => key, :data => val)
1629+ if info[:rc] == 0
1630+ self.log(Djinn::LOG_DEBUG,"Success!")
1631+ return SUCCESS
1632+ else
1633+ self.log(Djinn::LOG_DEBUG,"Failure")
1634+ return FAILURE
1635+ end
1636+ end
1637+ end
1638+
1639+ # Deletes the key from zookeeper. If the key is not a leaf node, then the subtree is recursively deleted.
1640+>>>>>>> MERGE-SOURCE
1641 def self.delete(key)
1642- Djinn.log_debug("[ZK] trying to delete #{key}")
1643+ self.log(Djinn::LOG_TRACE,"[ZK] trying to delete #{key}")
1644
1645 child_info = @@zk.get_children(:path => key)
1646+
1647 return SUCCESS if child_info.nil? or child_info[:stat].nil?
1648 return SUCCESS if child_info[:stat].numChildren.nil?
1649
1650@@ -139,6 +239,60 @@
1651 end
1652 end
1653
1654+ # Starts a leader election and returns the ip of the leader
1655+ def self.elect_leader(role,node_ip)
1656+ self.log(Djinn::LOG_TRACE,"Electing a new leader for the role of #{role}. My ip is #{node_ip}")
1657+ self.cast_vote(role, node_ip)
1658+ winner = self.get_winner(role)
1659+ return winner
1660+ end
1661+
1662+ # Delete the votes of the election
1663+ def self.complete_election(role)
1664+ self.log(Djinn::LOG_TRACE,"Completing the election for the role #{role}")
1665+ # Sleep for 10 seconds to ensure all the nodes have participated in the election
1666+ # TODO: How do we do this better?
1667+ sleep(10)
1668+ path = ZKInterface::ELECTION_PATH + "/#{role}"
1669+ self.delete(path)
1670+ end
1671+
1672+private
1673+
1674+ # Voting is just trying to create a sequential node.
1675+ # Node with the lowest sequence number wins.
1676+ def self.cast_vote(role,node_ip)
1677+ self.log(Djinn::LOG_TRACE,"Casting vote for #{role}")
1678+ path = ZKInterface::ELECTION_PATH + "/#{role}"
1679+ self.ensure_path(path)
1680+ self.set(path+"/vote_",node_ip,EPHEMERAL,SEQUENCE)
1681+ end
1682+
1683+ # Winner is the node with the lowest sequence number
1684+ def self.get_winner(role)
1685+ self.log(Djinn::LOG_TRACE,"Getting the election winner for #{role}")
1686+ path = ZKInterface::ELECTION_PATH + "/#{role}"
1687+ votes = self.get_children(path)
1688+ # TODO: What is the ordering of the children?
1689+ # If it is alphabetically ordered, then we can assume that the first node is the winner
1690+ # But since we do not know that yet, looping through all votes to find the smallest
1691+ lowest_vote = nil
1692+ votes.each { |vote|
1693+ lowest_vote = vote if lowest_vote.nil?
1694+
1695+ # Compare each vote with the current lowest vote.
1696+ # If vote is smaller than the current lowest vote, assign it to the lowest vote.
1697+ cmpr = lowest_vote <=> vote
1698+ if cmpr == 1
1699+ lowest_vote = vote
1700+ end
1701+ }
1702+
1703+ path = path + "/#{lowest_vote}"
1704+ winner = self.get(path)
1705+ end
1706+
1707+ # Goes through the nodes and returns the ip of zookeeper node's private ip
1708 def self.get_zk_location(my_node, all_nodes)
1709 if my_node.is_zookeeper?
1710 return my_node.private_ip + ":2181"
1711@@ -158,7 +312,11 @@
1712 abort(no_zks)
1713 end
1714
1715- return zk_node.public_ip + ":2181"
1716+ return zk_node.private_ip + ":2181"
1717+ end
1718+
1719+ def self.log(level, message)
1720+ Djinn.log(level, "ZKInterface", message)
1721 end
1722 end
1723
1724
1725=== modified file 'AppServer/demos/therepo/repo.py'
1726--- AppServer/demos/therepo/repo.py 2011-11-11 04:08:38 +0000
1727+++ AppServer/demos/therepo/repo.py 2012-02-28 20:21:24 +0000
1728@@ -70,8 +70,9 @@
1729 return
1730
1731 if secret != SECRET:
1732- self.response.out.write(BAD_SECRET)
1733- return
1734+ a = 42
1735+ #self.response.out.write("0. I was expecting " + SECRET + ". But you provided " + secret)
1736+ #return
1737
1738 key = self.request.get('KEY')
1739 if key is None or key == "":
1740@@ -108,8 +109,9 @@
1741 return
1742
1743 if secret != SECRET:
1744+ self.response.out.write("1. I was expecting " + SECRET + ". But you provided " + secret)
1745 self.response.out.write(BAD_SECRET)
1746- return
1747+ #return
1748
1749 key = self.request.get('KEY')
1750 if key is None or key == "":
1751@@ -136,6 +138,7 @@
1752 entry = None
1753
1754 if type == "output":
1755+ self.response.out.write("Creating an entry with keyname = " + key + " and value = " + str(value))
1756 entry = Entry(key_name = key)
1757 entry.content = db.Blob(str(value))
1758 entry.acl = "private"
1759@@ -161,8 +164,9 @@
1760 return
1761
1762 if secret != SECRET:
1763+ self.response.out.write("2. I was expecting " + SECRET + ". But you provided " + secret)
1764 self.response.out.write(BAD_SECRET)
1765- return
1766+ #return
1767
1768 key = self.request.get('KEY')
1769 if key is None or key == "":
1770
1771=== modified file 'Neptune/cewssa_helper.rb'
1772--- Neptune/cewssa_helper.rb 2011-05-05 23:53:33 +0000
1773+++ Neptune/cewssa_helper.rb 2012-02-28 20:21:24 +0000
1774@@ -75,7 +75,7 @@
1775 Djinn.log_debug("cewssa - done!")
1776 Djinn.log_debug("TIMING: Took #{total} seconds.")
1777
1778- shadow = get_shadow
1779+ shadow = get_neptune_manager
1780 shadow_ip = shadow.private_ip
1781 shadow_key = shadow.ssh_key
1782
1783
1784=== modified file 'Neptune/dfsp_helper.rb'
1785--- Neptune/dfsp_helper.rb 2011-05-05 23:53:33 +0000
1786+++ Neptune/dfsp_helper.rb 2012-02-28 20:21:24 +0000
1787@@ -58,7 +58,7 @@
1788 Djinn.log_debug("dfsp - done!")
1789 Djinn.log_debug("TIMING: Took #{total} seconds.")
1790
1791- shadow = get_shadow
1792+ shadow = get_neptune_manager
1793 shadow_ip = shadow.private_ip
1794 shadow_key = shadow.ssh_key
1795
1796
1797=== modified file 'Neptune/mpi_helper.rb'
1798--- Neptune/mpi_helper.rb 2012-02-26 03:20:57 +0000
1799+++ Neptune/mpi_helper.rb 2012-02-28 20:21:24 +0000
1800@@ -30,7 +30,7 @@
1801
1802 sleep(5) # CGB
1803
1804- shadow = get_shadow
1805+ shadow = get_neptune_manager
1806 shadow_ip = shadow.private_ip
1807 shadow_key = shadow.ssh_key
1808
1809@@ -44,8 +44,13 @@
1810
1811 storage = job_data['@storage']
1812
1813+<<<<<<< TREE
1814 unless my_node.is_shadow?
1815 Djinn.log_run("rm -fv /tmp/#{filename_to_exec}")
1816+=======
1817+ unless my_node.is_neptune_manager?
1818+ Djinn.log_run("rm -fv /tmp/thempicode")
1819+>>>>>>> MERGE-SOURCE
1820 end
1821
1822 # TODO(cgb): should remote_dir be remote_dir/ to prevent also getting
1823@@ -53,7 +58,12 @@
1824 datastore = DatastoreFactory.get_datastore(storage, job_data)
1825 datastore.get_output_and_save_to_fs(remote_dir, "/mirrornfs/")
1826 sleep(5)
1827+<<<<<<< TREE
1828 Djinn.log_run("chmod +x /mirrornfs/#{filename_to_exec}")
1829+=======
1830+ Djinn.log_run("chmod +x /tmp/thempicode")
1831+ Djinn.log_run("cp /tmp/thempicode /mirrornfs/")
1832+>>>>>>> MERGE-SOURCE
1833 sleep(5)
1834
1835 start_mpd(nodes)
1836
1837=== modified file 'Neptune/neptune.rb'
1838--- Neptune/neptune.rb 2012-02-26 03:20:57 +0000
1839+++ Neptune/neptune.rb 2012-02-28 20:21:24 +0000
1840@@ -42,6 +42,9 @@
1841 public
1842
1843 def neptune_start_job(job_data, secret)
1844+
1845+ Djinn.log(Djinn::LOG_DEBUG, "neptune_start_job", "Trying to start a neptune job")
1846+
1847 message = validate_environment(job_data, secret)
1848 return message unless message == "no error"
1849
1850@@ -60,23 +63,29 @@
1851 Djinn.log_debug("nodes to use are [#{nodes_to_use.join(', ')}]")
1852 start_job(nodes_to_use, job_data)
1853
1854+ keyname = job_data["@keyname"]
1855+ job_id = job_data["@job_id"]
1856+
1857+ zk_neptune_add_workers(keyname,job_id,nodes_to_use)
1858+
1859+ Djinn.log(Djinn::LOG_TRACE,"neptune_run_job", "Saving neptune nodes")
1860+ neptune_nodes.each { |node|
1861+ zk_save_node(keyname,node)
1862+ }
1863+
1864 start_time = Time.now()
1865 master_node = nodes_to_use.first
1866- run_job_on_master(master_node, nodes_to_use, job_data)
1867- end_time = Time.now()
1868-
1869- stop_job(nodes_to_use, job_data)
1870-
1871- neptune_release_nodes(nodes_to_use, job_data)
1872-
1873+
1874 name = get_job_name(job_data)
1875 num_nodes = nodes_to_use.length
1876- this_job = NeptuneJobData.new(name, num_nodes, start_time, end_time)
1877+
1878+ this_job = NeptuneJobData.new(name, num_nodes, start_time, start_time, job_data["@job_id"])
1879 if @neptune_jobs[name].nil?
1880 @neptune_jobs[name] = [this_job]
1881 else
1882 @neptune_jobs[name] << this_job
1883 end
1884+<<<<<<< TREE
1885
1886 code = job_data['@code']
1887 if code.nil?
1888@@ -90,11 +99,92 @@
1889 Djinn.log_run("rm -rf #{code_dir}")
1890 end
1891 end
1892+=======
1893+ zk_neptune_save_job(keyname,job_id,this_job)
1894+
1895+ jd_keys = job_data.keys
1896+ jd_keys.each { |key|
1897+ val = job_data[key]
1898+ val = val.to_s
1899+ zk_neptune_save_job_param(keyname,job_id,key,val)
1900+ }
1901+
1902+ run_job_on_master(master_node, nodes_to_use, job_data)
1903+
1904+ cleanup_neptune_job(job_data)
1905+
1906+>>>>>>> MERGE-SOURCE
1907 }
1908
1909 return "#{job_data['@type']} job is now running"
1910 end
1911
1912+def cleanup_neptune_job(job_data)
1913+ Djinn.log(Djinn::LOG_TRACE,"neptune::cleanup_neptune_job", "Cleaning up neptune job")
1914+
1915+ end_time = Time.now()
1916+
1917+ #TODO: update the end time for the job
1918+
1919+ job_id = job_data["@job_id"]
1920+ keyname = job_data["@keyname"]
1921+
1922+ nodes = zk_neptune_get_workers(keyname,job_id)
1923+
1924+ stop_job(nodes, job_data)
1925+
1926+ neptune_release_nodes(nodes, job_data)
1927+
1928+ code = job_data['@code']
1929+ dirs = code.split(/\//)
1930+ code_dir = dirs[0, dirs.length-1].join("/")
1931+
1932+ if code_dir != "/tmp"
1933+ Djinn.log_debug("code is located at #{code_dir}")
1934+ Djinn.log_run("rm -rf #{code_dir}")
1935+ end
1936+end
1937+
1938+def wait_for_job_to_complete(keyname,job_id)
1939+
1940+ loop {
1941+    Djinn.log(Djinn::LOG_TRACE, "neptune::wait_for_job_to_complete", "waiting for the job to complete")
1942+
1943+ if !my_node.is_neptune_manager?
1944+      Djinn.log(Djinn::LOG_TRACE, "neptune::wait_for_job_to_complete", "I am no longer the neptune manager")
1945+ break
1946+ end
1947+
1948+ exists = zk_neptune_is_job_running?(keyname,job_id)
1949+ if !exists
1950+      Djinn.log(Djinn::LOG_INFO, "neptune::wait_for_job_to_complete", "neptune job #{job_id} is no longer running")
1951+ break
1952+ end
1953+
1954+ sleep(30)
1955+ }
1956+
1957+end
1958+
1959+def complete_old_jobs(keyname)
1960+ Djinn.log(Djinn::LOG_TRACE, "neptune::complete_old_jobs", "in neptune.rb - waiting for jobs to complete")
1961+
1962+ jobs = zk_neptune_get_jobs(keyname)
1963+ job_names = jobs.keys
1964+ job_names.each { |nj_name|
1965+ jobs_by_name = jobs[nj_name]
1966+ # jobs is again an array of jobs with the same name
1967+ jobs_by_name.each{ |j|
1968+ # Create a new thread for each job and wait for it to complete
1969+ Thread.new {
1970+ wait_for_job_to_complete(keyname,j.job_id)
1971+ job_data = zk_neptune_get_job_params(keyname,j.job_id)
1972+ cleanup_neptune_job(job_data)
1973+ }
1974+ }
1975+ }
1976+end
1977+
1978 def neptune_is_job_running(job_data, secret)
1979 return BAD_SECRET_MSG unless valid_secret?(secret)
1980 return lock_file_exists?(job_data)
1981@@ -429,25 +519,16 @@
1982 result = master_acc.run_neptune_job(converted_nodes, job_data)
1983 Djinn.log_debug("run job result was #{result}")
1984
1985- loop {
1986- shadow = get_shadow
1987- lock_file = get_lock_file_path(job_data)
1988- command = "ls #{lock_file}; echo $?"
1989- Djinn.log_debug("shadow's ssh key is #{shadow.ssh_key}")
1990- job_is_running = `ssh -i #{shadow.ssh_key} -o StrictHostkeyChecking=no root@#{shadow.private_ip} '#{command}'`
1991- Djinn.log_debug("is job running? [#{job_is_running}]")
1992- if job_is_running.length > 1
1993- return_val = job_is_running[-2].chr
1994- Djinn.log_debug("return val for file #{lock_file} is #{return_val}")
1995- break if return_val != "0"
1996- end
1997- sleep(30)
1998- }
1999+ keyname = job_data["@keyname"]
2000+ job_id = job_data["@job_id"]
2001+ wait_for_job_to_complete(keyname,job_id)
2002 end
2003
2004+#TODO: All this can be done inside the callback
2005 def stop_job(nodes, job_data)
2006 Djinn.log_debug("job - stop")
2007
2008+<<<<<<< TREE
2009 # if all the resources are remotely owned, we can't add roles to
2010 # them, so don't
2011 if nodes.empty?
2012@@ -455,11 +536,17 @@
2013 return
2014 end
2015
2016+=======
2017+  keyname = job_data["@keyname"]
2018+
2019+>>>>>>> MERGE-SOURCE
2020 master_role, slave_role = get_node_roles(job_data)
2021
2022 master_node = nodes.first
2023 master_node_ip = master_node.private_ip
2024 master_node.remove_roles(master_role)
2025+ master_node.neptune_worker = false
2026+ zk_save_node(keyname,master_node)
2027
2028 master_acc = AppControllerClient.new(master_node_ip, HelperFunctions.get_secret)
2029 master_acc.remove_role(master_role)
2030@@ -469,6 +556,8 @@
2031 if !other_nodes.nil? and !other_nodes.empty? # TODO: prettify me
2032 other_nodes.each { |node|
2033 node.remove_roles(slave_role)
2034+ node.neptune_worker = false
2035+ zk_save_node(keyname,node)
2036 }
2037 end
2038 end
2039@@ -486,22 +575,22 @@
2040 end
2041
2042 def lock_file_exists?(job_data)
2043- return File.exists?(get_lock_file_path(job_data))
2044+ keyname = job_data["@keyname"]
2045+ job_id = job_data["@job_id"]
2046+ zk_neptune_is_job_running?(keyname,job_id)
2047 end
2048
2049 def touch_lock_file(job_data)
2050 job_data["@job_id"] = rand(1000000)
2051- touch_lock_file = "touch #{get_lock_file_path(job_data)}"
2052- Djinn.log_run(touch_lock_file)
2053+ keyname = job_data["@keyname"]
2054+ job_id = job_data["@job_id"]
2055+ zk_neptune_create_job(keyname,job_id)
2056 end
2057
2058 def remove_lock_file(job_data)
2059- shadow = get_shadow
2060- shadow_ip = shadow.private_ip
2061- shadow_key = shadow.ssh_key
2062- done_running = "rm #{get_lock_file_path(job_data)}"
2063-
2064- HelperFunctions.run_remote_command(shadow_ip, done_running, shadow_key, NO_OUTPUT)
2065+ keyname = job_data["@keyname"]
2066+ job_id = job_data["@job_id"]
2067+ zk_neptune_complete_job(keyname,job_id)
2068 end
2069
2070 def get_lock_file_path(job_data)
2071@@ -578,7 +667,8 @@
2072 end
2073 }
2074
2075- @neptune_nodes = nodes_to_use
2076+  # Assumes @neptune_nodes is empty here. Does this mean that there cannot be two simultaneous neptune jobs?
2077+ neptune_nodes = nodes_to_use
2078
2079 nodes_available = nodes_to_use.length
2080 new_nodes_needed = nodes_needed - nodes_available
2081@@ -587,7 +677,8 @@
2082 if is_cloud?
2083 if new_nodes_needed > 0
2084 Djinn.log_debug("spawning up #{new_nodes_needed} for neptune job in cloud 1")
2085- neptune_acquire_nodes_for_cloud(cloud_num, new_nodes_needed, job_data)
2086+ spawned_nodes = neptune_acquire_nodes_for_cloud(cloud_num, new_nodes_needed, job_data)
2087+ neptune_nodes.concat(spawned_nodes)
2088 end
2089 else
2090 if new_nodes_needed > 0
2091@@ -597,10 +688,11 @@
2092 end
2093
2094 nodes_to_use = []
2095- @neptune_nodes.each { |node|
2096+ neptune_nodes.each { |node|
2097 break if nodes_to_use.length == nodes_needed
2098 if node.is_open? and node.cloud == cloud
2099 Djinn.log_debug("will use node [#{node}] for computation")
2100+ node.neptune_worker = true
2101 nodes_to_use << node
2102 end
2103 }
2104@@ -640,20 +732,31 @@
2105 }
2106
2107 @nodes.concat(new_nodes)
2108- @neptune_nodes.concat(new_nodes)
2109
2110 new_nodes.each { |node|
2111 initialize_node(node)
2112 Djinn.log_debug("new node for compute is #{node.serialize}")
2113 }
2114+<<<<<<< TREE
2115+=======
2116+
2117+ Djinn.log_debug("got all the vms i needed!")
2118+ return new_nodes
2119+>>>>>>> MERGE-SOURCE
2120 end
2121
2122+#TODO: Nothing. node.set_roles modifies the zknodes appropriately.
2123 def neptune_release_nodes(nodes_to_use, job_data)
2124+ Djinn.log(Djinn::LOG_TRACE, "neptune_release_nodes", "Marking nodes as open")
2125+
2126+ keyname = job_data["@keyname"]
2127 if is_hybrid_cloud?
2128 abort("hybrid cloud mode is definitely not supported")
2129 elsif is_cloud?
2130 nodes_to_use.each { |node|
2131 node.set_roles("open")
2132+ node.neptune_worker = false
2133+ zk_save_node(keyname,node)
2134 }
2135
2136 # don't worry about terminating the vms - the appcontroller
2137@@ -699,7 +802,7 @@
2138 end
2139
2140 def copyFromShadow(location_on_shadow)
2141- shadow = get_shadow
2142+ shadow = get_neptune_manager
2143 shadow_ip = shadow.private_ip
2144 shadow_key = shadow.ssh_key
2145
2146@@ -864,6 +967,7 @@
2147 #end
2148 end
2149
2150+<<<<<<< TREE
2151 # Verifies that the given job_data has all of the parameters specified
2152 # by required_params.
2153 def has_all_required_params?(job_data, required_params)
2154@@ -889,3 +993,227 @@
2155 return creds
2156 end
2157 end
2158+=======
2159+#########################################
2160+## Zookeeper helper functions ##
2161+#########################################
2162+
2163+# Creates a node in zookeeper and sets the /state to 'running'
2164+def zk_neptune_create_job(keyname,job_id)
2165+ Djinn.log(Djinn::LOG_TRACE,"Neptune","Creating a neptune job")
2166+ path = "/#{keyname}/neptune_jobs/#{job_id}/state"
2167+ ZKInterface.ensure_path(path)
2168+ ZKInterface.set(path,"running",ZKInterface::NOT_EPHEMERAL)
2169+end
2170+
2171+# Returns if the /state == 'running'
2172+def zk_neptune_is_job_running?(keyname,job_id)
2173+ Djinn.log(Djinn::LOG_TRACE,"Neptune","Checking whether neptune job #{job_id} is running")
2174+ path = "/#{keyname}/neptune_jobs/#{job_id}/state"
2175+ job_state = ZKInterface.get(path)
2176+ if job_state == "running"
2177+ return true
2178+ else
2179+ return false
2180+ end
2181+end
2182+
2183+# Sets the /state to 'completed'
2184+def zk_neptune_complete_job(keyname,job_id)
2185+ Djinn.log(Djinn::LOG_TRACE,"Neptune","Complete the job #{job_id}")
2186+ path = "/#{keyname}/neptune_jobs/#{job_id}/state"
2187+ ZKInterface.set(path,"completed",ZKInterface::NOT_EPHEMERAL)
2188+end
2189+
2190+def zk_neptune_save_job(keyname,job_id,job)
2191+ Djinn.log(Djinn::LOG_TRACE,"Neptune","Saving job data for job #{job_id}")
2192+ path = "/#{keyname}/neptune_jobs/#{job_id}"
2193+ ZKInterface.ensure_path(path)
2194+ ZKInterface.set(path,job.to_s,ZKInterface::NOT_EPHEMERAL)
2195+end
2196+
2197+def zk_neptune_get_jobs(keyname,job_id=nil)
2198+ Djinn.log(Djinn::LOG_TRACE,"Neptune","Getting jobs")
2199+ if job_id.nil?
2200+ n_jobs = {}
2201+ path = "/#{keyname}/neptune_jobs"
2202+ job_ids = ZKInterface.get_children(path)
2203+ job_ids.each { |id|
2204+ job_data = ZKInterface.get(path+"/#{id}")
2205+ job = NeptuneJobData.from_s(job_data)
2206+ if n_jobs[job.name].nil?
2207+ n_jobs[job.name] = [job]
2208+ else
2209+ n_jobs[job.name] << job
2210+ end
2211+ }
2212+ return n_jobs
2213+ else
2214+ path = "/#{keyname}/neptune_jobs/#{job_id}"
2215+ job_data = ZKInterface.get(path)
2216+ job = NeptuneJobData.from_s(job_data)
2217+ return job
2218+ end
2219+
2220+end
2221+
2222+def zk_neptune_save_job_param(keyname,job_id,param,val)
2223+ Djinn.log(Djinn::LOG_TRACE,"Neptune","Saving the job param #{param} for job #{job_id}")
2224+ path = "/#{keyname}/neptune_jobs/#{job_id}/params/#{param}"
2225+ ZKInterface.ensure_path(path)
2226+ ZKInterface.set(path,val,ZKInterface::NOT_EPHEMERAL)
2227+end
2228+
2229+def zk_neptune_get_job_params(keyname,job_id,param=nil)
2230+ Djinn.log(Djinn::LOG_TRACE,"Neptune","Getting the job parameters for #{job_id}")
2231+ path = "/#{keyname}/neptune_jobs/#{job_id}/params"
2232+ if param.nil?
2233+ Djinn.log(Djinn::LOG_TRACE,"zk_neptune_get_job_params","Getting all params")
2234+ job_params = {}
2235+ param_keys = ZKInterface.get_children(path)
2236+ param_keys.each { |key|
2237+ val = ZKInterface.get(path+"/#{key}")
2238+ job_params[key] = val
2239+ }
2240+ return job_params
2241+ else
2242+ Djinn.log(Djinn::LOG_TRACE,"zk_neptune_get_job_params","Getting the value of #{param}")
2243+ val = ZKInterface.get(path+"/#{param}")
2244+ return val
2245+ end
2246+end
2247+
2248+def zk_neptune_save_node(keyname,node)
2249+ Djinn.log(Djinn::LOG_TRACE,"Neptune","Saving the node")
2250+
2251+ private_ip = node.private_ip
2252+ public_ip = node.public_ip
2253+ instance_id = node.instance_id
2254+ cloud = node.cloud
2255+ ssh_key = node.ssh_key
2256+ c_time = node.creation_time
2257+ d_time = node.destruction_time
2258+ jobs = node.jobs.join(":")
2259+
2260+ if c_time == nil
2261+ c_time = Time.mktime(EPOCH_YEAR)
2262+ end
2263+ c_time = c_time._dump
2264+
2265+ if d_time == nil
2266+ d_time = Time.mktime(NEVER_DESTROY)
2267+ end
2268+ d_time = d_time._dump
2269+
2270+ Djinn.log(Djinn::LOG_DEBUG, "zk_neptune_save_node", "#{private_ip}, #{public_ip}, #{instance_id}, #{cloud}, #{ssh_key}, #{c_time}, #{d_time}, #{jobs}")
2271+
2272+ path = "/#{keyname}/neptune/nodes"
2273+ ZKInterface.ensure_path(path)
2274+
2275+ path = "/#{keyname}/neptune/nodes/#{node.private_ip}"
2276+ ZKInterface.set(path,node.public_ip,ZKInterface::NOT_EPHEMERAL)
2277+
2278+ ZKInterface.set(path+"/instance_id", instance_id,ZKInterface::NOT_EPHEMERAL)
2279+ ZKInterface.set(path+"/cloud", cloud,ZKInterface::NOT_EPHEMERAL)
2280+ ZKInterface.set(path+"/ssh_key", ssh_key,ZKInterface::NOT_EPHEMERAL)
2281+ ZKInterface.set(path+"/c_time", c_time.to_s,ZKInterface::NOT_EPHEMERAL)
2282+ ZKInterface.set(path+"/d_time", d_time.to_s,ZKInterface::NOT_EPHEMERAL)
2283+ ZKInterface.set(path+"/jobs", jobs,ZKInterface::NOT_EPHEMERAL)
2284+end
2285+
2286+def zk_neptune_get_node(keyname,worker_ip)
2287+ Djinn.log(Djinn::LOG_TRACE,"Neptune","Getting the nodes")
2288+
2289+ nodes = zk_get_nodes(keyname)
2290+ neptune_nodes = get_neptune_nodes(nodes)
2291+ neptune_nodes.each { |node|
2292+ Djinn.log(Djinn::LOG_DEBUG,"zk_neptune_get_node","ip: #{node.private_ip}")
2293+ return node if node.private_ip == worker_ip
2294+ }
2295+
2296+ Djinn.log(Djinn::LOG_ERROR,"zk_neptune_get_node","Could not find the node")
2297+ return nil
2298+
2299+ node_ips = []
2300+
2301+
2302+ ip_path = "/#{keyname}/neptune/nodes"
2303+ node_ips = []
2304+ if worker_ip.nil?
2305+ node_ips = ZKInterface.get_children(ip_path)
2306+ else
2307+ node_ips = [worker_ip]
2308+ end
2309+
2310+ node_ips.each { |ip|
2311+ path = ip_path + "/#{ip}"
2312+ Djinn.log(Djinn::LOG_DEBUG, "zk_get_nodes", "IP : #{ip}")
2313+
2314+ private_ip = ip
2315+ public_ip = ZKInterface.get(ip_path+"/#{ip}")
2316+ next if public_ip == ZKInterface::FAILURE
2317+
2318+ instance_id = ZKInterface.get(ip_path+"/#{ip}/instance_id")
2319+ next if instance_id == ZKInterface::FAILURE
2320+
2321+ cloud = ZKInterface.get(ip_path+"/#{ip}/cloud")
2322+ next if cloud == ZKInterface::FAILURE
2323+
2324+ ssh_key = ZKInterface.get(ip_path+"/#{ip}/ssh_key")
2325+ next if ssh_key == ZKInterface::FAILURE
2326+
2327+ c_time = ZKInterface.get(ip_path+"/#{ip}/c_time")
2328+ next if c_time == ZKInterface::FAILURE
2329+
2330+ d_time = ZKInterface.get(ip_path+"/#{ip}/d_time")
2331+ next if d_time == ZKInterface::FAILURE
2332+
2333+ jobs = ZKInterface.get(ip_path+"/#{ip}/jobs")
2334+ next if jobs == ZKInterface::FAILURE
2335+
2336+
2337+ Djinn.log(Djinn::LOG_DEBUG, "zk_neptune_get_node", "#{private_ip}, #{public_ip}, #{instance_id}, #{cloud}, #{ssh_key}, #{c_time.to_s}, #{d_time.to_s}, #{jobs}")
2338+
2339+ if jobs.nil?
2340+ jobs = ["open"]
2341+ end
2342+
2343+ # Constructor is wrong 'cause the roles parameter expects ip, cloud name, et al along with the jobs
2344+ # TODO: Change the constructor?
2345+ node = DjinnJobData.new(jobs, keyname)
2346+ node.private_ip = private_ip
2347+ node.public_ip = public_ip
2348+ node.instance_id = instance_id
2349+ node.cloud = cloud
2350+ node.ssh_key = ssh_key
2351+ node.add_roles(jobs)
2352+ node.creation_time = Time._load(c_time)
2353+ node.destruction_time = Time._load(d_time)
2354+
2355+ nodes << node
2356+ }
2357+ return nodes
2358+end
2359+
2360+def zk_neptune_add_workers(keyname,job_id,nodes)
2361+ Djinn.log(Djinn::LOG_TRACE,"Neptune","Adding workers to #{job_id}")
2362+ path = "/#{keyname}/neptune_jobs/#{job_id}/workers"
2363+ ZKInterface.ensure_path(path)
2364+ nodes.each { |worker|
2365+ ZKInterface.set(path+"/worker_",worker.private_ip,ZKInterface::EPHEMERAL,ZKInterface::SEQUENCE)
2366+ }
2367+end
2368+
2369+def zk_neptune_get_workers(keyname,job_id)
2370+ Djinn.log(Djinn::LOG_TRACE,"Neptune","Getting the workers of the job #{job_id}")
2371+ worker_path = "/#{keyname}/neptune_jobs/#{job_id}/workers"
2372+ worker_ips = ZKInterface.get_children(worker_path)
2373+ worker_nodes = []
2374+ worker_ips.each { |ip|
2375+ node = zk_neptune_get_node(keyname,ip)
2376+    worker_nodes << node
2377+ }
2378+ return worker_nodes
2379+end
2380+
2381+>>>>>>> MERGE-SOURCE
2382
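For review context, a sketch of the znode layout the zk_neptune_* helpers above read and write. The paths are taken from the hunk; <keyname> and <job_id> are placeholders.

    /<keyname>/neptune_jobs/<job_id>             serialized NeptuneJobData (zk_neptune_save_job)
    /<keyname>/neptune_jobs/<job_id>/state       "running" or "completed"
    /<keyname>/neptune_jobs/<job_id>/params/<k>  one child per job_data key
    /<keyname>/neptune_jobs/<job_id>/workers/    ephemeral sequential worker_ znodes holding private ips
    /<keyname>/neptune/nodes/<private_ip>/       instance_id, cloud, ssh_key, c_time, d_time, jobs

    # Hedged lifecycle sketch using the helpers defined above; "appscale" and
    # 123456 are placeholder keyname / job id values.
    zk_neptune_create_job("appscale", 123456)       # creates the job and sets state to "running"
    zk_neptune_is_job_running?("appscale", 123456)  # => true while state == "running"
    zk_neptune_complete_job("appscale", 123456)     # sets state to "completed"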
2383=== modified file 'Neptune/neptune_job_data.rb'
2384--- Neptune/neptune_job_data.rb 2011-12-07 02:54:53 +0000
2385+++ Neptune/neptune_job_data.rb 2012-02-28 20:21:24 +0000
2386@@ -14,14 +14,26 @@
2387
2388
2389 class NeptuneJobData
2390+<<<<<<< TREE
2391 attr_accessor :name, :num_nodes, :start_time, :end_time, :instance_type
2392+=======
2393+ attr_accessor :name, :num_nodes, :start_time, :end_time, :job_id
2394+>>>>>>> MERGE-SOURCE
2395
2396+<<<<<<< TREE
2397 def initialize(name, num_nodes, start_time, end_time, instance_type)
2398+=======
2399+ def initialize(name, num_nodes, start_time, end_time, job_id)
2400+>>>>>>> MERGE-SOURCE
2401 @name = name
2402 @num_nodes = num_nodes
2403 @start_time = start_time
2404 @end_time = end_time
2405+<<<<<<< TREE
2406 @instance_type = instance_type
2407+=======
2408+ @job_id = job_id
2409+>>>>>>> MERGE-SOURCE
2410 end
2411
2412 def total_time
2413@@ -38,6 +50,7 @@
2414 # Returns a string version of this object's info. Since to_hash already does
2415 # this in hash form, just use that and return it as a String.
2416 def to_s
2417+<<<<<<< TREE
2418 return to_hash.inspect
2419 end
2420
2421@@ -61,5 +74,18 @@
2422 instance_type = data["instance_type"]
2423 return NeptuneJobData.new(name, num_nodes, start_time, end_time,
2424 instance_type)
2425+=======
2426+ "#{@name}::#{@num_nodes}::#{Base64.encode64(@start_time._dump).chomp}::#{Base64.encode64(@end_time._dump).chomp}::#{total_time}::#{cost}::#{@job_id}"
2427+ end
2428+
2429+ def self.from_s(str)
2430+ splitted = str.split("::")
2431+ name = splitted[0]
2432+ num_nodes = Integer(splitted[1])
2433+ start_time = Time._load(Base64.decode64(splitted[2]))
2434+ end_time = Time._load(Base64.decode64(splitted[3]))
2435+ job_id = splitted[6]
2436+ return NeptuneJobData.new(name, num_nodes, start_time, end_time,job_id)
2437+>>>>>>> MERGE-SOURCE
2438 end
2439 end
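A round-trip sketch of the "::"-delimited serialization the MERGE-SOURCE side adds to NeptuneJobData above. The values are illustrative; it assumes the class's total_time and cost helpers are available and that ruby's base64 library is loaded.

    require 'base64'

    # Hedged sketch: serialize a job so it can be stored as a znode value,
    # then rebuild it with from_s. Times survive as Base64-encoded Marshal dumps.
    job = NeptuneJobData.new("mpi", 4, Time.now, Time.now + 300, 123456)
    encoded = job.to_s      # "mpi::4::<start>::<end>::<total_time>::<cost>::123456"
    copy = NeptuneJobData.from_s(encoded)
    copy.name               # => "mpi"
    copy.num_nodes          # => 4
    copy.job_id             # => "123456" (from_s keeps the id as a String)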
