Merge lp:~hegde-shashank/appscale/trunk into lp:~cgb-cs/appscale/main-cgb-research
- trunk
- Merge into main-cgb-research
Proposed by
Shashank Hegde
Status: | Needs review |
---|---|
Proposed branch: | lp:~hegde-shashank/appscale/trunk |
Merge into: | lp:~cgb-cs/appscale/main-cgb-research |
Diff against target: |
2439 lines (+1665/-106) (has conflicts) 11 files modified
AppController/djinn.rb (+686/-25) AppController/lib/djinn_job_data.rb (+25/-0) AppController/lib/repo.rb (+238/-1) AppController/lib/role_watcher.rb (+110/-0) AppController/lib/zkinterface.rb (+195/-37) AppServer/demos/therepo/repo.py (+8/-4) Neptune/cewssa_helper.rb (+1/-1) Neptune/dfsp_helper.rb (+1/-1) Neptune/mpi_helper.rb (+11/-1) Neptune/neptune.rb (+364/-36) Neptune/neptune_job_data.rb (+26/-0) Text conflict in AppController/djinn.rb Text conflict in AppController/lib/djinn_job_data.rb Text conflict in AppController/lib/repo.rb Text conflict in AppController/lib/zkinterface.rb Text conflict in Neptune/mpi_helper.rb Text conflict in Neptune/neptune.rb Text conflict in Neptune/neptune_job_data.rb |
To merge this branch: | bzr merge lp:~hegde-shashank/appscale/trunk |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Chris Bunch | Pending | ||
Review via email: mp+95036@code.launchpad.net |
Commit message
Description of the change
Adding fault tolerance to AppScale
To post a comment you must log in.
Unmerged revisions
- 801. By root <root@appscale-image0>
-
Deleting commented code
- 800. By root <root@appscale-image0>
-
refactored the code
- 799. By root <root@appscale-image0>
-
Neptune manager migration works!
- 798. By root <root@appscale-image0>
-
Saving neptune jobs and nodes to zookeeper
- 797. By root <root@appscale-image0>
-
Fixed neptune and saving the neptune data to zookeeper
- 796. By root <root@appscale-image0>
-
Storing nodes into zookeeper
- 795. By root <root@appscale-image0>
-
Added app manager failover
- 794. By root <root@appscale-image0>
-
Added leader election for status_monitor
- 793. By root <root@appscale-image0>
-
Added role watchers and callbacks
- 792. By root <root@appscale-image0>
-
Reconnecting to zookeeper if required
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === modified file 'AppController/djinn.rb' |
2 | --- AppController/djinn.rb 2012-02-20 22:36:25 +0000 |
3 | +++ AppController/djinn.rb 2012-02-28 20:21:24 +0000 |
4 | @@ -263,6 +263,14 @@ |
5 | @registered_with_sisyphus = false |
6 | end |
7 | |
8 | + LOG_DEBUG = "debug" |
9 | + LOG_TRACE = "trace" |
10 | + LOG_INFO = "info" |
11 | + LOG_ERROR = "error!" |
12 | + |
13 | + ROOT_APP_PATH = "/apps" |
14 | + ROLE_PATH = "/roles" |
15 | + |
16 | def done(secret) |
17 | if valid_secret?(secret) |
18 | return @done_loading |
19 | @@ -272,9 +280,16 @@ |
20 | end |
21 | |
22 | def kill(secret) |
23 | +<<<<<<< TREE |
24 | if !valid_secret?(secret) |
25 | return BAD_SECRET_MSG |
26 | end |
27 | +======= |
28 | + return BAD_SECRET_MSG unless valid_secret?(secret) |
29 | + |
30 | + unregister_roles |
31 | + |
32 | +>>>>>>> MERGE-SOURCE |
33 | @kill_sig_received = true |
34 | |
35 | if is_hybrid_cloud? |
36 | @@ -294,8 +309,15 @@ |
37 | # turned on since that was the state they started in |
38 | |
39 | stop_ejabberd if my_node.is_login? |
40 | +<<<<<<< TREE |
41 | stop_sisyphus if my_node.is_appengine? |
42 | Repo.stop if my_node.is_shadow? or my_node.is_appengine? |
43 | +======= |
44 | + |
45 | + if my_node.is_status_monitor? or my_node.is_appengine? |
46 | + Repo.stop |
47 | + end |
48 | +>>>>>>> MERGE-SOURCE |
49 | |
50 | jobs_to_run = my_node.jobs |
51 | commands = { |
52 | @@ -492,7 +514,7 @@ |
53 | # error messages and have the tools poll it |
54 | Thread.new { |
55 | # Tell other nodes to shutdown this application |
56 | - if @app_names.include?(app_name) and !my_node.is_appengine? |
57 | + if @app_names.include?(app_name) and my_node.is_app_manager? |
58 | @nodes.each { |node| |
59 | next if node.private_ip == my_node.private_ip |
60 | if node.is_appengine? or node.is_login? |
61 | @@ -533,7 +555,7 @@ |
62 | HAProxy.remove_app(app_name) |
63 | Nginx.reload |
64 | Collectd.restart |
65 | - ZKInterface.remove_app_entry(app_name) |
66 | + zk_remove_app_entry(app_name) |
67 | # TODO God does not shut down the application, so do it here for |
68 | # A temp fix. |
69 | `ps -ef | grep dev_appserver | grep #{app_name} | grep -v grep | grep cookie_secret | awk '{print $2}' | xargs kill -9` |
70 | @@ -602,7 +624,15 @@ |
71 | parse_creds |
72 | change_job |
73 | end |
74 | - |
75 | + |
76 | + register_roles |
77 | + setup_watchers |
78 | + |
79 | + keyname = @creds["keyname"] |
80 | + |
 81 | + # Initially the status monitor knows everything about all the nodes. |
82 | + save_nodes_to_zookeeper if my_node.is_status_monitor? |
83 | + |
84 | while !@kill_sig_received do |
85 | @state = "Done starting up AppScale, now in heartbeat mode" |
86 | write_database_info |
87 | @@ -611,22 +641,39 @@ |
88 | update_api_status |
89 | send_logs_to_sisyphus |
90 | |
91 | - if my_node.is_shadow? |
92 | + @nodes.each { |n| |
93 | + Djinn.log(LOG_DEBUG, "NODE from memory:", "#{n}") |
94 | + } |
95 | + |
96 | + # TODO: neptune manager might spawn/ delete nodes according to the neptune jobs. |
97 | + # So should we get the nodes from zookeeper in every loop? Or should we listen for changes? |
98 | + # @nodes must be updated on all nodes. so all nodes must listen/get the data from zookeeper |
99 | + # Getting the data from zookeeper in each loop now. Optimize this later. |
100 | + @nodes = zk_get_nodes(keyname) |
101 | + |
102 | + neptune_nodes = get_neptune_nodes(@nodes) |
103 | + |
104 | + if my_node.is_status_monitor? |
105 | @nodes.each { |node| |
106 | get_status(node) |
107 | |
108 | if node.should_destroy? |
109 | Djinn.log_debug("Heartbeat - destroying node [#{node}]") |
110 | @nodes.delete(node) |
111 | - @neptune_nodes.delete(node) |
112 | + zk_delete_node(keyname,node) |
113 | infrastructure = @creds["infrastructure"] |
114 | HelperFunctions.terminate_vms([node], infrastructure) |
115 | FileUtils.rm_f("/etc/appscale/status-#{node.private_ip}.json") |
116 | end |
117 | } |
118 | + |
119 | Djinn.log_debug("Finished contacting all other nodes") |
120 | + else |
121 | + Djinn.log_debug("No need to heartbeat, we aren't the status monitor") |
122 | + end |
123 | |
124 | - @neptune_nodes.each { |node| |
125 | + if my_node.is_neptune_manager? |
126 | + neptune_nodes.each { |node| |
127 | Djinn.log_debug("Currently examining node [#{node}]") |
128 | if node.should_extend? |
129 | Djinn.log_debug("Extending time for node [#{node}]") |
130 | @@ -634,19 +681,26 @@ |
131 | elsif node.should_destroy? |
132 | Djinn.log_debug("Time is up for node [#{node}] - destroying it") |
133 | @nodes.delete(node) |
134 | - @neptune_nodes.delete(node) |
135 | + zk_delete_node(keyname,node) |
136 | infrastructure = @creds["infrastructure"] |
137 | HelperFunctions.terminate_vms([node], infrastructure) |
138 | FileUtils.rm_f("/etc/appscale/status-#{node.private_ip}.json") |
139 | end |
140 | } |
141 | - else |
142 | - Djinn.log_debug("No need to heartbeat, we aren't the shadow") |
143 | + |
144 | + save_neptune_data_to_zookeeper |
145 | end |
146 | |
147 | # TODO: consider only calling this if new apps are found |
148 | start_appengine |
149 | +<<<<<<< TREE |
150 | Kernel.sleep(20) |
151 | +======= |
152 | + |
153 | + #TODO: Check if the @nodes have changed. If so, update zookeeper |
154 | + |
155 | + sleep(20) |
156 | +>>>>>>> MERGE-SOURCE |
157 | end |
158 | end |
159 | |
160 | @@ -674,34 +728,39 @@ |
161 | end |
162 | |
163 | if File.exists?(location) |
164 | - ZKInterface.add_app_entry(appname, my_node.serialize, location) |
165 | - result = "success" |
166 | + zk_add_app_entry(appname, my_node, location) |
167 | + result = "success! #{appname} : #{location}" |
168 | else |
169 | result = "The #{appname} app was not found at #{location}." |
170 | end |
171 | |
172 | - Djinn.log_debug(result) |
173 | + Djinn.log(Djinn::LOG_TRACE, "done_uploading", result) |
174 | return result |
175 | end |
176 | |
177 | def is_app_running(appname, secret) |
178 | +<<<<<<< TREE |
179 | if !valid_secret?(secret) |
180 | return BAD_SECRET_MSG |
181 | end |
182 | |
183 | hosters = ZKInterface.get_app_hosters(appname) |
184 | +======= |
185 | + return BAD_SECRET_MSG unless valid_secret?(secret) |
186 | + hosters = zk_get_app_hosters(appname) |
187 | +>>>>>>> MERGE-SOURCE |
188 | hosters_w_appengine = [] |
189 | hosters.each { |node| |
190 | hosters_w_appengine << node if node.is_appengine? |
191 | } |
192 | |
193 | app_running = !hosters_w_appengine.empty? |
194 | - Djinn.log_debug("Is app #{appname} running? #{app_running}") |
195 | return app_running |
196 | end |
197 | |
198 | |
199 | def add_role(new_role, secret) |
200 | +<<<<<<< TREE |
201 | if !valid_secret?(secret) |
202 | return BAD_SECRET_MSG |
203 | end |
204 | @@ -722,22 +781,47 @@ |
205 | send("start_#{role}".to_sym) |
206 | end |
207 | } |
208 | +======= |
209 | + return BAD_SECRET_MSG unless valid_secret?(secret) |
210 | + my_node.add_roles(new_role) |
211 | + start_roles = new_role.split(":") |
212 | + start_roles.each { |role| |
213 | + Djinn.log(Djinn::LOG_TRACE,"add_role","Calling start_#{role}") |
214 | + send("start_#{role}".to_sym) |
215 | +>>>>>>> MERGE-SOURCE |
216 | } |
217 | - |
218 | +<<<<<<< TREE |
219 | + |
220 | +======= |
221 | + |
222 | + # add the node as a role actor |
223 | + zk_add_role(my_node, new_role) |
224 | + |
225 | +>>>>>>> MERGE-SOURCE |
226 | return "OK" |
227 | end |
228 | |
229 | def remove_role(old_role, secret) |
230 | +<<<<<<< TREE |
231 | if !valid_secret?(secret) |
232 | return BAD_SECRET_MSG |
233 | end |
234 | |
235 | my_node.remove_roles(old_role) |
236 | +======= |
237 | + Djinn.log(Djinn::LOG_TRACE,"remove_role","Removing role #{old_role}") |
238 | + return BAD_SECRET_MSG unless valid_secret?(secret) |
239 | +>>>>>>> MERGE-SOURCE |
240 | stop_roles = old_role.split(":") |
241 | stop_roles.each { |role| |
242 | Djinn.log_debug("Removing and stopping role #{role}") |
243 | send("stop_#{role}".to_sym) |
244 | } |
245 | + my_node.remove_roles(old_role) |
246 | + |
247 | + # remove the node as a role actor |
248 | + zk_remove_role(my_node, old_role) |
249 | + |
250 | return "OK" |
251 | end |
252 | |
253 | @@ -750,6 +834,7 @@ |
254 | # Important: Definitely do not log within the following three methods, as |
255 | # it would cause an infinite loop. |
256 | def self.log_debug(msg) |
257 | +<<<<<<< TREE |
258 | time = Time.now |
259 | self.log_to_stdout(time, msg) |
260 | self.log_to_buffer(time, msg) |
261 | @@ -777,6 +862,45 @@ |
262 | # Logs and runs the given command, which is assumed to be trusted and thus |
263 | # needs no filtering on our part. Obviously this should not be executed by |
264 | # anything that the user could inject input into. |
265 | +======= |
266 | + # TODO: reduce the copypasta here |
267 | + puts "(Appscale) [#{Time.now}] #{msg}" |
268 | + STDOUT.flush # TODO: examine performance impact of this |
269 | + if @lock.nil? |
270 | + begin |
271 | + Syslog.open("appscale") { |s| s.debug(msg) } |
272 | + rescue RuntimeError |
273 | + end |
274 | + else |
275 | + @lock.synchronize { |
276 | + begin |
277 | + Syslog.open("appscale") { |s| s.debug(msg) } |
278 | + rescue RuntimeError |
279 | + end |
280 | + } |
281 | + end |
282 | + end |
283 | + |
284 | + def self.log(level, tag, msg) |
285 | + # TODO: reduce the copypasta here |
286 | + puts "(Appscale) [#{Time.now}] [#{level}] [#{tag}] #{msg}" |
287 | + STDOUT.flush # TODO: examine performance impact of this |
288 | + if @lock.nil? |
289 | + begin |
290 | + Syslog.open("appscale") { |s| s.debug(msg) } |
291 | + rescue RuntimeError |
292 | + end |
293 | + else |
294 | + @lock.synchronize { |
295 | + begin |
296 | + Syslog.open("appscale") { |s| s.debug(msg) } |
297 | + rescue RuntimeError |
298 | + end |
299 | + } |
300 | + end |
301 | + end |
302 | + |
303 | +>>>>>>> MERGE-SOURCE |
304 | def self.log_run(command) |
305 | Djinn.log_debug(command) |
306 | Djinn.log_debug(`#{command}`) |
307 | @@ -819,6 +943,35 @@ |
308 | return djinn_loc_array |
309 | end |
310 | |
311 | +<<<<<<< TREE |
312 | +======= |
313 | + def initialize() |
314 | + @job = "none" |
315 | + @nodes = [] |
316 | + @creds = {} |
317 | + @app_names = [] |
318 | + @apps_loaded = [] |
319 | + @kill_sig_received = false |
320 | + @my_index = nil |
321 | + @@secret = HelperFunctions.get_secret |
322 | + @done_loading = false |
323 | + @port = 8080 |
324 | + @haproxy = 10000 |
325 | + @userappserver_public_ip = "not-up-yet" |
326 | + @userappserver_private_ip = "not-up-yet" |
327 | + @state = "AppController just started" |
328 | + @total_boxes = 0 |
329 | + @num_appengines = 3 |
330 | + @restored = false |
331 | + @neptune_jobs = {} |
332 | + @neptune_nodes = [] |
333 | + @lock = Monitor.new |
334 | + @api_status = {} |
335 | + |
336 | + @watcher_callbacks = {} |
337 | + end |
338 | + |
339 | +>>>>>>> MERGE-SOURCE |
340 | def get_login |
341 | @nodes.each { |node| |
342 | return node if node.is_login? |
343 | @@ -835,6 +988,30 @@ |
344 | abort("No shadow nodes found.") |
345 | end |
346 | |
347 | + def get_status_monitor |
348 | + @nodes.each { |node| |
349 | + return node if node.is_status_monitor? |
350 | + } |
351 | + return nil |
352 | + #abort("No status monitor found.") |
353 | + end |
354 | + |
355 | + def get_app_manager |
356 | + @nodes.each { |node| |
357 | + return node if node.is_app_manager? |
358 | + } |
359 | + return nil |
360 | + #abort("No app manager found.") |
361 | + end |
362 | + |
363 | + def get_neptune_manager |
364 | + @nodes.each { |node| |
365 | + return node if node.is_neptune_manager? |
366 | + } |
367 | + return nil |
368 | + #abort("No neptune manager found.") |
369 | + end |
370 | + |
371 | def get_db_master |
372 | @nodes.each { |node| |
373 | return node if node.is_db_master? |
374 | @@ -1062,6 +1239,7 @@ |
375 | Djinn.log_debug("Restoring neptune data!") |
376 | jobs_info = (File.open(file_to_load) { |f| f.read }).chomp |
377 | jobs = [] |
378 | +<<<<<<< TREE |
379 | |
380 | json_data = JSON.load(jobs_info) |
381 | return if json_data.nil? |
382 | @@ -1074,6 +1252,27 @@ |
383 | |
384 | if @neptune_jobs[job_name].nil? |
385 | @neptune_jobs[job_name] = [this_job] |
386 | +======= |
387 | + jobs_info.split("\n").each { |job| |
388 | + info = job.split("::") |
389 | + name = info[0] |
390 | + num_nodes = Integer(info[1]) |
391 | + |
392 | + begin |
393 | + start_time = Time._load(info[2]) |
394 | + end_time = Time._load(info[3]) |
395 | + rescue TypeError |
396 | + start_time = Time.now |
397 | + end_time = Time.now |
398 | + end |
399 | + |
400 | + job_id = info[6] |
401 | + |
402 | + this_job = NeptuneJobData.new(name, num_nodes, start_time, end_time, job_id) |
403 | + |
404 | + if @neptune_jobs[name].nil? |
405 | + @neptune_jobs[name] = [this_job] |
406 | +>>>>>>> MERGE-SOURCE |
407 | else |
408 | @neptune_jobs[job_name] = [this_job] |
409 | end |
410 | @@ -1384,8 +1583,14 @@ |
411 | @total_boxes = Integer(@creds['min_images']) |
412 | end |
413 | |
414 | +<<<<<<< TREE |
415 | Djinn.log_debug("Pre-loop: #{@nodes.join('\n')}") |
416 | +======= |
417 | + Djinn.log_debug("pre-loop: #{@nodes.join('\n')}") |
418 | + # On the head node @nodes.size = 1. It contains only the head node at this point. Other nodes aren't populated until the call to spawn_and_setup_appengine. |
419 | +>>>>>>> MERGE-SOURCE |
420 | if @nodes.size == 1 and @total_boxes > 1 |
421 | + #spawn_and_setup_appengine parses the ips from creds, converts it to nodes and add them to the node array. |
422 | spawn_and_setup_appengine |
423 | loop { |
424 | Djinn.log_debug("Looping: #{@nodes.join('\n')}") |
425 | @@ -1480,6 +1685,7 @@ |
426 | @state = "Starting up SOAP Server and PBServer" |
427 | start_pbserver |
428 | start_soap_server |
429 | +<<<<<<< TREE |
430 | HelperFunctions.sleep_until_port_is_open(HelperFunctions.local_ip, UserAppClient::SERVER_PORT) |
431 | end |
432 | |
433 | @@ -1508,8 +1714,112 @@ |
434 | end |
435 | |
436 | # appengine is started elsewhere |
437 | - end |
438 | - |
439 | +======= |
440 | + HelperFunctions.sleep_until_port_is_open(HelperFunctions.local_ip, UA_SERVER_PORT) |
441 | + end |
442 | + |
443 | + start_blobstore_server if my_node.is_appengine? |
444 | + |
445 | + # for neptune jobs, start a place where they can save output to |
446 | + # also, since repo does health checks on the app engine apis, start it up there too |
447 | + |
448 | + repo_ip = get_status_monitor.public_ip |
449 | + repo_private_ip = get_status_monitor.private_ip |
450 | + repo_ip = my_node.public_ip if my_node.is_appengine? |
451 | + repo_private_ip = my_node.private_ip if my_node.is_appengine? |
452 | + Repo.init(repo_ip, repo_private_ip, @@secret) |
453 | + |
454 | + if my_node.is_status_monitor? or my_node.is_appengine? |
455 | + Repo.start(get_login.public_ip, @userappserver_private_ip) |
456 | + end |
457 | + |
458 | + # appengine is started elsewhere |
459 | +>>>>>>> MERGE-SOURCE |
460 | + end |
461 | + |
462 | + def start_status_monitor |
463 | + Djinn.log(Djinn::LOG_TRACE, "start_status_monitor", "Starting status monitor") |
464 | + my_node.add_roles(DjinnJobData::ROLE_STATUS_MTR) |
465 | + |
466 | + # Get the @nodes from zookeeper |
467 | + Djinn.log(Djinn::LOG_TRACE, "start_status_monitor", "Getting nodes from zookeeper") |
468 | + keyname = @creds['keyname'] |
469 | + @nodes = zk_get_nodes(keyname) |
470 | + |
471 | + zk_add_role(my_node,DjinnJobData::ROLE_STATUS_MTR) |
472 | + zk_save_node(keyname,my_node) |
473 | + end |
474 | + |
475 | + def stop_status_monitor |
476 | + Djinn.log(Djinn::LOG_TRACE,self.class,"Starting status monitor") |
477 | + |
478 | + if !my_node.is_status_monitor? |
479 | + Djinn.log(Djinn::LOG_ERROR,"stop_status_monitor", "I am not the status monitor") |
480 | + return |
481 | + end |
482 | + |
483 | + my_node.remove_roles(DjinnJobData::ROLE_STATUS_MTR) |
484 | + zk_remove_role(my_node,DjinnJobData::ROLE_STATUS_MTR) |
485 | + keyname = @creds['keyname'] |
486 | + zk_save_node(keyname,my_node) |
487 | + end |
488 | + |
489 | + def start_neptune_manager |
490 | + Djinn.log(LOG_TRACE, self.class, "Starting neptune manager") |
491 | + my_node.add_roles(DjinnJobData::ROLE_NEPTUNE_MGR) |
492 | + zk_add_role(my_node,DjinnJobData::ROLE_NEPTUNE_MGR) |
493 | + keyname = @creds['keyname'] |
494 | + zk_save_node(keyname,my_node) |
495 | + Djinn.log(Djinn::LOG_TRACE, "start_neptune_manager", "Node saved, Getting neptune data from zookeeper") |
496 | + |
497 | + # Get the neptune nodes and neptune jobs from zookeeper |
498 | + get_neptune_data_from_zookeeper |
499 | + |
500 | + Djinn.log(Djinn::LOG_TRACE, "start_neptune_manager", "Completing old neptune jobs") |
501 | + # Wait for old jobs to complete |
502 | + complete_old_jobs(keyname) |
503 | + |
504 | + end |
505 | + |
506 | + def stop_neptune_manager |
507 | + Djinn.log(Djinn::LOG_TRACE, "stop_neptune_manager", "Stopping neptune manager") |
508 | + |
509 | + if !my_node.is_neptune_manager? |
510 | + Djinn.log(Djinn::LOG_ERROR, "stop_neptune_manager", "I am not the neptune manager.") |
511 | + return |
512 | + end |
513 | + |
514 | + my_node.remove_roles(DjinnJobData::ROLE_NEPTUNE_MGR) |
515 | + zk_remove_role(my_node,DjinnJobData::ROLE_NEPTUNE_MGR) |
516 | + Djinn.log(Djinn::LOG_TRACE, "stop_neptune_manager", "Removed role from Zookeeper") |
517 | + |
518 | + keyname = @creds['keyname'] |
519 | + zk_save_node(keyname,my_node) |
520 | + Djinn.log(Djinn::LOG_TRACE, "stop_neptune_manager", "neptune manager stopper") |
521 | + end |
522 | + |
523 | + def start_app_manager |
524 | + Djinn.log(Djinn::LOG_TRACE, self.class, "Adding the role of app manager") |
525 | + my_node.add_roles(DjinnJobData::ROLE_APP_MGR) |
526 | + zk_add_role(my_node,DjinnJobData::ROLE_APP_MGR) |
527 | + keyname = @creds['keyname'] |
528 | + zk_save_node(keyname,my_node) |
529 | + end |
530 | + |
531 | + def stop_app_manager |
532 | + Djinn.log(Djinn::LOG_TRACE, self.class, "Stopping app manager") |
533 | + |
534 | + if !my_node.is_app_manager? |
535 | + Djinn.log(Djinn::LOG_ERROR, "stop_app_manager", "I am not the app manager") |
536 | + return |
537 | + end |
538 | + |
539 | + my_node.remove_roles(DjinnJobData::ROLE_APP_MGR) |
540 | + zk_remove_role(my_node,DjinnJobData::ROLE_APP_MGR) |
541 | + keyname = @creds['keyname'] |
542 | + zk_save_node(keyname,my_node) |
543 | + end |
544 | + |
545 | def start_blobstore_server |
546 | db_local_ip = @userappserver_private_ip |
547 | my_ip = my_node.public_ip |
548 | @@ -1978,6 +2288,7 @@ |
549 | start_cmd = "/usr/bin/memcached -d -m 32 -p 11211 -u root" |
550 | stop_cmd = "pkill memcached" |
551 | GodInterface.start(:memcached, start_cmd, stop_cmd, [11211]) |
552 | + |
553 | end |
554 | |
555 | def stop_memcached() |
556 | @@ -2025,6 +2336,7 @@ |
557 | rescue SocketError |
558 | end |
559 | end |
560 | + |
561 | end |
562 | |
563 | # TODO: this function should use hadoop_helper |
564 | @@ -2058,7 +2370,7 @@ |
565 | # TODO: this function should use hadoop_helper |
566 | def start_hadoop_org() |
567 | i = my_node |
568 | - return unless i.is_shadow? # change this later to db_master |
569 | + return unless i.is_db_master? |
570 | hadoop_home = File.expand_path("#{APPSCALE_HOME}/AppDB/hadoop-0.20.2/") |
571 | Djinn.log_run("#{hadoop_home}/bin/hadoop namenode -format 2>&1") |
572 | Djinn.log_run("#{hadoop_home}/bin/start-dfs.sh 2>&1") |
573 | @@ -2123,9 +2435,14 @@ |
574 | app_language = app_data.scan(/language:(\w+)/).flatten.to_s |
575 | |
576 | # TODO: merge these |
577 | - shadow = get_shadow |
578 | - shadow_ip = shadow.private_ip |
579 | - ssh_key = shadow.ssh_key |
580 | + appmanager = get_app_manager |
581 | + if appmanager == nil |
582 | + Djinn.log(Djinn::LOG_ERROR,"start_app_engine","App manager not found") |
583 | + return |
584 | + end |
585 | + |
586 | + appmanager_ip = appmanager.private_ip |
587 | + ssh_key = appmanager.ssh_key |
588 | app_dir = "/var/apps/#{app}/app" |
589 | app_path = "#{app_dir}/#{app}.tar.gz" |
590 | FileUtils.mkdir_p(app_dir) |
591 | @@ -2134,14 +2451,21 @@ |
592 | HelperFunctions.setup_app(app) |
593 | |
594 | |
595 | - if my_node.is_shadow? |
596 | + if my_node.is_app_manager? |
597 | + Djinn.log(Djinn::LOG_DEBUG, "start_appengine", "app_manager - updating cron and starting xmpp") |
598 | CronHelper.update_cron(my_public, app_language, app) |
599 | start_xmpp_for_app(app, app_language) |
600 | end |
601 | |
602 | if my_node.is_appengine? |
603 | +<<<<<<< TREE |
604 | app_number = @nginx_port - 8080 |
605 | start_port = HelperFunctions::APP_START_PORT |
606 | +======= |
607 | + Djinn.log(Djinn::LOG_DEBUG, "start_appengine", "appengine - starting app") |
608 | + app_number = @port - 8080 |
609 | + start_port = 20000 |
610 | +>>>>>>> MERGE-SOURCE |
611 | static_handlers = HelperFunctions.parse_static_data(app) |
612 | proxy_port = HAProxy.app_listen_port(app_number) |
613 | login_ip = get_login.public_ip |
614 | @@ -2210,13 +2534,14 @@ |
615 | done_uploading(app, app_path, @@secret) |
616 | end |
617 | |
618 | - Monitoring.restart if my_node.is_shadow? |
619 | + Monitoring.restart if my_node.is_status_monitor? |
620 | |
621 | if @app_names.include?("none") |
622 | @apps_loaded = @apps_loaded - ["none"] |
623 | @app_names = @app_names - ["none"] |
624 | end |
625 | - |
626 | + |
627 | + Djinn.log_debug("Loading #{app}") |
628 | @apps_loaded << app |
629 | } |
630 | |
631 | @@ -2272,7 +2597,7 @@ |
632 | |
633 | nodes_with_app = [] |
634 | loop { |
635 | - nodes_with_app = ZKInterface.get_app_hosters(appname) |
636 | + nodes_with_app = zk_get_app_hosters(appname) |
637 | break unless nodes_with_app.empty? |
638 | Djinn.log_debug("No nodes currently have a copy of app #{appname}, waiting...") |
639 | Kernel.sleep(5) |
640 | @@ -2310,7 +2635,8 @@ |
641 | # for app named baz, this translates to baz@login_ip |
642 | |
643 | login_ip = get_login.public_ip |
644 | - login_uac = UserAppClient.new(login_ip, @@secret) |
645 | + Djinn.log_debug("Login ip: #{login_ip}") |
646 | + login_uac = UserAppClient.new(@userappserver_private_ip, @@secret) |
647 | xmpp_user = "#{app}@#{login_ip}" |
648 | xmpp_pass = HelperFunctions.encrypt_password(xmpp_user, @@secret) |
649 | login_uac.commit_new_user(xmpp_user, xmpp_pass, "app") |
650 | @@ -2407,4 +2733,339 @@ |
651 | Djinn.log_debug("Stopping Sisyphus") |
652 | stop_app("sisyphus", @@secret) |
653 | end |
654 | + |
655 | + |
656 | + # Saves the node's role to zookeeper. |
657 | + # Interested nodes can set a watch on these role paths for changes |
658 | + def register_roles |
659 | + Djinn.log(Djinn::LOG_TRACE, self.class, "Registering roles with zookeeper") |
660 | + |
661 | + me = my_node # save the node to avoid unnecessary calls to my_node |
662 | + jobs = me.jobs |
663 | + jobs.each { |job| |
664 | + zk_add_role(me,job) |
665 | + } |
666 | + |
667 | + end |
668 | + |
669 | + # Removes all the node's roles from zookeeper |
670 | + def unregister_roles |
671 | + Djinn.log(Djinn::LOG_TRACE, self.class, "Unregistering roles with zookeeper") |
672 | + |
673 | + me = my_node # save the node to avoid unnecessary calls to my_node |
674 | + jobs = me.jobs |
675 | + jobs.each { |job| |
676 | + zk_add_role(me,job) |
677 | + } |
678 | + |
679 | + end |
680 | + |
681 | + def setup_watchers |
682 | + Djinn.log(Djinn::LOG_TRACE, self.class, "Setting up watchers...") |
683 | + |
684 | + watch_role(DjinnJobData::ROLE_STATUS_MTR) |
685 | + watch_role(DjinnJobData::ROLE_APP_MGR) |
686 | + watch_role(DjinnJobData::ROLE_NEPTUNE_MGR) |
687 | + |
688 | + end |
689 | + |
690 | + def clear_votes(aRole) |
691 | + # Wait for some time so that everyone can cast a vote |
692 | + # This is not required for leader election, but for clearing the votes. |
693 | + # If the votes are cleared immediately, then a slow node can win, |
694 | + # resulting in two leaders |
695 | + sleep(10) |
696 | + ZKInterface.clear_votes(aRole) |
697 | + end |
698 | + |
699 | + def watch_role(role) |
700 | + Djinn.log(Djinn::LOG_TRACE, self.class, "Watching role #{role}") |
701 | + if @watcher_callbacks[role].nil? |
702 | + Djinn.log(Djinn::LOG_TRACE, "watch_role", "Creating a callback for #{role}") |
703 | + @watcher_callbacks[role] = Zookeeper::WatcherCallback.new { |
704 | + role_change_notification(@watcher_callbacks[role].context) |
705 | + } |
706 | + end |
707 | + zk_get_actors(role,@watcher_callbacks[role]) |
708 | + end |
709 | + |
710 | +public |
711 | + def role_change_notification(context) |
712 | + path = context[:path] |
713 | + # path returned would be of the form /roles/<role-name> |
714 | + ar = path.split('/') |
715 | + role = ar[-1] |
716 | + Djinn.log(Djinn::LOG_TRACE, self.class, "Received a role change notification for #{role}") |
717 | + |
718 | + new_actors = zk_get_actors(role, @watcher_callbacks[role]) |
719 | + Djinn.log(Djinn::LOG_DEBUG,"role_change_notification","actors of #{role}: #{new_actors}") |
720 | + |
721 | + if (role == DjinnJobData::ROLE_STATUS_MTR) or (role == DjinnJobData::ROLE_APP_MGR) or (role == DjinnJobData::ROLE_NEPTUNE_MGR) |
722 | + Djinn.log(Djinn::LOG_DEBUG,"role_change_notification","should elect new leader") |
723 | + if new_actors.size == 0 |
724 | + elect_new_actor(role) |
725 | + end |
726 | + end |
727 | + # Nodes might have changed after leader election. So get the latest nodes' state from zk |
728 | + keyname = @creds["keyname"] |
729 | + @my_index = nil |
730 | + @nodes = zk_get_nodes(keyname) |
731 | + end |
732 | + |
733 | +private |
734 | + def elect_new_actor(role) |
735 | + Djinn.log(Djinn::LOG_TRACE,"elect_new_actor","Electing a new actor for #{role}") |
736 | + my_ip = my_node.private_ip |
737 | + winner = ZKInterface.elect_leader(role,my_ip) |
738 | + Djinn.log(Djinn::LOG_TRACE,"elect_new_actor","Winner of #{role} election is #{winner}") |
739 | + if my_ip == winner |
740 | + action = "start_#{role}".to_sym |
741 | + send(action) |
742 | + else |
743 | + # Some other node was the winner |
744 | + # Update the node array |
745 | + @nodes.each { |n| |
746 | + if n.public_ip == winner |
747 | + n.add_roles(role) |
748 | + end |
749 | + } |
750 | + end |
751 | + ZKInterface.complete_election(role) |
752 | + end |
753 | + |
754 | + def save_nodes_to_zookeeper |
755 | + Djinn.log(LOG_TRACE, self.class, "Saving nodes to zookeeper") |
756 | + |
757 | + keyname = @creds["keyname"] |
758 | + @nodes.each { |node| |
759 | + zk_save_node(keyname, node) |
760 | + } |
761 | + end |
762 | + |
763 | + def save_neptune_data_to_zookeeper |
764 | + Djinn.log(LOG_TRACE, self.class, "Saving neptune data to zookeeper") |
765 | + |
766 | + keyname = @creds["keyname"] |
767 | + |
768 | + Djinn.log(Djinn::LOG_TRACE, self.class, "Neptune jobs: #{@neptune_jobs}") |
769 | + job_names = @neptune_jobs.keys |
770 | + job_names.each { |nj_name| |
771 | + jobs = @neptune_jobs[nj_name] |
772 | + Djinn.log(Djinn::LOG_DEBUG, self.class, "Saving job #{jobs.class} #{jobs.class.name} #{jobs}") |
773 | + # jobs is again an array of jobs with the same name |
774 | + jobs.each{ |j| |
775 | + Djinn.log(Djinn::LOG_DEBUG, self.class, "Saving job #{j.class} #{j.class.name} #{j}") |
776 | + zk_neptune_save_job(keyname,j.job_id,j) |
777 | + } |
778 | + } |
779 | + end |
780 | + |
781 | + def get_neptune_data_from_zookeeper |
782 | + Djinn.log(LOG_TRACE, self.class, "Getting neptune data from zookeeper") |
783 | + keyname = @creds["keyname"] |
784 | + @neptune_jobs = zk_neptune_get_jobs(keyname) |
785 | + end |
786 | + |
787 | + def get_neptune_nodes(nodes) |
788 | + Djinn.log(LOG_TRACE,self.class,"Filtering neptune nodes from all nodes") |
789 | + neptune_nodes = [] |
790 | + nodes.each { |node| |
791 | + neptune_nodes << node if node.neptune_worker |
792 | + } |
793 | + return neptune_nodes |
794 | + end |
795 | + |
796 | + ############################################### |
797 | + ## Zookeeper helper functions ## |
798 | + ############################################### |
799 | + |
800 | + # Saves the application name in zookeeper |
801 | + # Path: |
802 | + # ROOT_APP_PATH/app_name/private_ip => serialized node data |
803 | + # ROOT_APP_PATH/app_name/private_ip/app_file => location of the app tar file |
804 | + # |
805 | + def zk_add_app_entry(appname, node, location) |
806 | + Djinn.log(Djinn::LOG_TRACE, self.class, "Adding an app entry for #{appname}") |
807 | + |
808 | + appname_path = ROOT_APP_PATH + "/#{appname}" |
809 | + ZKInterface.ensure_path(appname_path) |
810 | + |
811 | + node_path = appname_path + "/#{node.private_ip}" |
812 | + |
813 | + # Save the node data in the zknode |
814 | + ZKInterface.set(node_path, node.serialize, ZKInterface::EPHEMERAL) |
815 | + # Save the location of the file |
816 | + |
817 | + file_location = node_path + "/app_file" |
818 | + ZKInterface.set(file_location, location, ZKInterface::NOT_EPHEMERAL) |
819 | + end |
820 | + |
821 | + # Removes the application name from zookeeper |
822 | + def zk_remove_app_entry(appname) |
823 | + Djinn.log(Djinn::LOG_TRACE, self.class, "Removing app entry for #{appname}") |
824 | + |
825 | + appname_path = ROOT_APP_PATH + "/#{appname}" |
826 | + ZKInterface.delete(appname_path) |
827 | + end |
828 | + |
829 | + # Gets the nodes that are serving the given application |
830 | + # Path: |
831 | + # ROOT_APP_PATH/app_name/private_ip => serialized node data |
832 | + # ROOT_APP_PATH/app_name/private_ip/app_file => location of the app tar file |
833 | + def zk_get_app_hosters(appname) |
834 | + Djinn.log(Djinn::LOG_TRACE, self.class, "Getting app hosters") |
835 | + |
836 | + appname_path = ROOT_APP_PATH + "/#{appname}" |
837 | + app_hoster_ips = [] |
838 | + app_hoster_ips = ZKInterface.get_children(appname_path) |
839 | + hoster_nodes = [] |
840 | + app_hoster_ips.each { |ip| |
841 | + node_data = ZKInterface.get(appname_path + "/#{ip}") |
842 | + hoster_nodes << DjinnJobData.deserialize(node_data) |
843 | + } |
844 | + return hoster_nodes |
845 | + end |
846 | + |
847 | + |
848 | + # Add the node's role to zookeeper. |
849 | + def zk_add_role(node, role) |
850 | + Djinn.log(Djinn::LOG_TRACE, self.class, "Adding the role #{role} to #{node.private_ip}") |
851 | + |
852 | + path = ROLE_PATH + "/#{role}" |
853 | + ZKInterface.ensure_path(path) |
854 | + |
855 | + path = ROLE_PATH + "/#{role}/#{node.private_ip}" |
856 | + ZKInterface.set(path, DEFAULT_DATA, ZKInterface::EPHEMERAL) |
857 | + end |
858 | + |
859 | + # Remove the node's role from zookeeper. |
860 | + def zk_remove_role(node, role) |
861 | + Djinn.log(Djinn::LOG_TRACE, self.class, "Removing the role #{role} to #{node.private_ip}") |
862 | + |
863 | + path = ROLE_PATH + "/#{role}/#{node.private_ip}" |
864 | + ZKInterface.delete(path) |
865 | + end |
866 | + |
867 | + # Gets the actors of the given role |
868 | + def zk_get_actors(role, callback = nil) |
869 | + Djinn.log(Djinn::LOG_TRACE, self.class, "Getting actors for #{role}") |
870 | + |
871 | + path = ROLE_PATH + "/#{role}" |
872 | + actors = ZKInterface.get_children(path, callback) |
873 | + return actors |
874 | + end |
875 | + |
876 | + # Save the node into zookeeper |
877 | + def zk_save_node(keyname, node) |
878 | + |
879 | + Djinn.log(Djinn::LOG_TRACE, self.class, "Saving node to zookeeper") |
880 | + |
881 | + private_ip = node.private_ip |
882 | + public_ip = node.public_ip |
883 | + instance_id = node.instance_id |
884 | + cloud = node.cloud |
885 | + ssh_key = node.ssh_key |
886 | + c_time = node.creation_time |
887 | + d_time = node.destruction_time |
888 | + jobs = node.jobs.join(":") |
889 | + n_worker = node.neptune_worker |
890 | + |
891 | + if c_time == nil |
892 | + c_time = Time.mktime(EPOCH_YEAR) |
893 | + end |
894 | + c_time = c_time._dump |
895 | + |
896 | + if d_time == nil |
897 | + d_time = Time.mktime(NEVER_DESTROY) |
898 | + end |
899 | + d_time = d_time._dump |
900 | + |
901 | + Djinn.log(Djinn::LOG_DEBUG, "zk_save_node", "#{private_ip}, #{public_ip}, #{instance_id}, #{cloud}, #{ssh_key}, #{c_time}, #{d_time}, #{jobs}, #{n_worker}") |
902 | + |
903 | + path = "/#{keyname}/nodes" |
904 | + ZKInterface.ensure_path(path) |
905 | + |
906 | + path = "/#{keyname}/nodes/#{node.private_ip}" |
907 | + ZKInterface.set(path,node.public_ip,ZKInterface::NOT_EPHEMERAL) |
908 | + |
909 | + ZKInterface.set(path+"/instance_id", instance_id,ZKInterface::NOT_EPHEMERAL) |
910 | + ZKInterface.set(path+"/cloud", cloud,ZKInterface::NOT_EPHEMERAL) |
911 | + ZKInterface.set(path+"/ssh_key", ssh_key,ZKInterface::NOT_EPHEMERAL) |
912 | + ZKInterface.set(path+"/c_time", c_time.to_s,ZKInterface::NOT_EPHEMERAL) |
913 | + ZKInterface.set(path+"/d_time", d_time.to_s,ZKInterface::NOT_EPHEMERAL) |
914 | + ZKInterface.set(path+"/jobs", jobs,ZKInterface::NOT_EPHEMERAL) |
915 | + ZKInterface.set(path+"/n_worker", n_worker.to_s,ZKInterface::NOT_EPHEMERAL) |
916 | + |
917 | + end |
918 | + |
919 | + # Get the node data from zookeeper |
920 | + def zk_get_nodes(keyname) |
921 | + |
922 | + Djinn.log(Djinn::LOG_TRACE, self.class, "Getting nodes from zookeeper") |
923 | + |
924 | + nodes = [] |
925 | + ip_path = "/#{keyname}/nodes" |
926 | + node_ips = ZKInterface.get_children(ip_path) |
927 | + node_ips.each { |ip| |
928 | + path = ip_path + "/#{ip}" |
929 | + Djinn.log(Djinn::LOG_DEBUG, "zk_get_nodes", "IP : #{ip}") |
930 | + |
931 | + private_ip = ip |
932 | + public_ip = ZKInterface.get(ip_path+"/#{ip}") |
933 | + next if public_ip == ZKInterface::FAILURE |
934 | + |
935 | + instance_id = ZKInterface.get(ip_path+"/#{ip}/instance_id") |
936 | + next if instance_id == ZKInterface::FAILURE |
937 | + |
938 | + cloud = ZKInterface.get(ip_path+"/#{ip}/cloud") |
939 | + next if cloud == ZKInterface::FAILURE |
940 | + |
941 | + ssh_key = ZKInterface.get(ip_path+"/#{ip}/ssh_key") |
942 | + next if ssh_key == ZKInterface::FAILURE |
943 | + |
944 | + c_time = ZKInterface.get(ip_path+"/#{ip}/c_time") |
945 | + next if c_time == ZKInterface::FAILURE |
946 | + |
947 | + d_time = ZKInterface.get(ip_path+"/#{ip}/d_time") |
948 | + next if d_time == ZKInterface::FAILURE |
949 | + |
950 | + jobs = ZKInterface.get(ip_path+"/#{ip}/jobs") |
951 | + next if jobs == ZKInterface::FAILURE |
952 | + |
953 | + n_worker = ZKInterface.get(ip_path+"/#{ip}/n_worker") |
954 | + n_worker = n_worker.to_sym |
955 | + next if n_worker == ZKInterface::FAILURE |
956 | + |
957 | + Djinn.log(Djinn::LOG_DEBUG, "zk_get_nodes", "#{private_ip}, #{public_ip}, #{instance_id}, #{cloud}, #{ssh_key}, #{c_time.to_s}, #{d_time.to_s}, #{jobs}, #{n_worker}") |
958 | + |
959 | + if jobs.nil? |
960 | + jobs = ["open"] |
961 | + end |
962 | + |
963 | + # Constructor is wrong 'cause the roles parameter expects ip, cloud name, et al along with the jobs |
964 | + # TODO: Change the constructor? |
965 | + node = DjinnJobData.new(jobs, keyname) |
966 | + node.private_ip = private_ip |
967 | + node.public_ip = public_ip |
968 | + node.instance_id = instance_id |
969 | + node.cloud = cloud |
970 | + node.ssh_key = ssh_key |
971 | + node.add_roles(jobs) |
972 | + node.creation_time = Time._load(c_time) |
973 | + node.destruction_time = Time._load(d_time) |
974 | + node.neptune_worker = n_worker |
975 | + |
976 | + nodes << node |
977 | + } |
978 | + @my_index = nil |
979 | + return nodes |
980 | + end |
981 | + |
982 | + def zk_delete_node(keyname,node) |
983 | + Djinn.log(Djinn::LOG_TRACE,self.class,"Deleting node #{node.private_ip}") |
984 | + |
985 | + path = "/#{keyname}/nodes/#{node.private_ip}" |
986 | + ZKInterface.delete(path) |
987 | + end |
988 | + |
989 | end |
990 | |
991 | === modified file 'AppController/lib/djinn_job_data.rb' |
992 | --- AppController/lib/djinn_job_data.rb 2012-02-19 04:55:17 +0000 |
993 | +++ AppController/lib/djinn_job_data.rb 2012-02-28 20:21:24 +0000 |
994 | @@ -15,10 +15,30 @@ |
995 | # also contains info about when we spawned the node (helpful for optimizing |
996 | # costs, which may charge on an hourly basis). |
997 | class DjinnJobData |
998 | + |
999 | + # Roles |
1000 | + ROLE_APP_ENGINE = "appengine" |
1001 | + ROLE_APP_MGR = "app_manager" |
1002 | + ROLE_DB_MASTER = "db_master" |
1003 | + ROLE_DB_SLAVE = "db_slave" |
1004 | + ROLE_LB = "load_balancer" |
1005 | + ROLE_LOGIN = "login" |
1006 | + ROLE_MEMCACHE = "memcache" |
1007 | + ROLE_NEPTUNE_MGR= "neptune_manager" |
1008 | + ROLE_OPEN = "open" |
1009 | + ROLE_STATUS_MTR = "status_monitor" |
1010 | + ROLE_ZOOKEEPER = "zookeeper" |
1011 | + |
1012 | attr_accessor :public_ip, :private_ip, :jobs, :instance_id, :cloud, :ssh_key |
1013 | attr_accessor :creation_time, :destruction_time, :failed_heartbeats |
1014 | +<<<<<<< TREE |
1015 | |
1016 | |
1017 | +======= |
1018 | + # If the node is a neptune worker then this is set to true |
1019 | + attr_accessor :neptune_worker |
1020 | + |
1021 | +>>>>>>> MERGE-SOURCE |
1022 | def initialize(roles, keyname) |
1023 | # format: "publicIP:privateIP:load_balancer:appengine:table-master:table-slave:instance_id:cloud" |
1024 | |
1025 | @@ -46,6 +66,7 @@ |
1026 | @failed_heartbeats = 0 |
1027 | |
1028 | appscale_jobs = ["load_balancer", "shadow"] |
1029 | + appscale_jobs += ["status_monitor", "app_manager", "neptune_manager"] |
1030 | appscale_jobs += ["db_master", "db_slave"] |
1031 | appscale_jobs += ["zookeeper"] |
1032 | appscale_jobs += ["login"] |
1033 | @@ -59,12 +80,15 @@ |
1034 | appscale_jobs.each { |job| |
1035 | @jobs << job if roles.include?(job) |
1036 | } |
1037 | + |
1038 | + @neptune_worker = false |
1039 | end |
1040 | |
1041 | def add_roles(roles) |
1042 | new_jobs = roles.split(":") |
1043 | @jobs = (@jobs + new_jobs).uniq |
1044 | @jobs.delete("open") |
1045 | + @jobs = ["open"] if @jobs.size == 0 |
1046 | end |
1047 | |
1048 | def remove_roles(roles) |
1049 | @@ -75,6 +99,7 @@ |
1050 | |
1051 | def set_roles(roles) |
1052 | @jobs = roles.split(":") |
1053 | + @jobs = ["open"] if @jobs.size == 0 |
1054 | end |
1055 | |
1056 | # not the best name for this but basically correct |
1057 | |
1058 | === modified file 'AppController/lib/repo.rb' |
1059 | --- AppController/lib/repo.rb 2012-02-27 07:19:48 +0000 |
1060 | +++ AppController/lib/repo.rb 2012-02-28 20:21:24 +0000 |
1061 | @@ -95,6 +95,243 @@ |
1062 | Djinn.log_debug(`pkill -f DevAppServerMain`) |
1063 | end |
1064 | |
1065 | - |
1066 | +<<<<<<< TREE |
1067 | + |
1068 | +======= |
1069 | + def self.valid_storage_creds(storage, creds) |
1070 | + if storage == "appdb" |
1071 | + valid = true |
1072 | + elsif storage == "s3" |
1073 | + conn = self.get_s3_conn(creds) |
1074 | + begin |
1075 | + all_buckets = conn.list_all_my_buckets |
1076 | + Djinn.log_debug("this user owns these buckets: [#{all_buckets.join(', ')}]") |
1077 | + valid = true |
1078 | + rescue RightAws::AwsError |
1079 | + valid = false |
1080 | + end |
1081 | + end |
1082 | + |
1083 | + Djinn.log_debug("did user provide valid storage creds? #{valid}") |
1084 | + end |
1085 | + |
1086 | + def self.set_output(path, output, storage="appdb", creds={}, is_file=false) |
1087 | + return self.set(path, output, :output, storage, creds, is_file) |
1088 | + end |
1089 | + |
1090 | + def self.get_output(path, storage="appdb", creds={}, is_file=false) |
1091 | + return self.get(path, :output, storage, creds, is_file) |
1092 | + end |
1093 | + |
1094 | + def self.set_acl(path, new_acl, storage="appdb", creds={}) |
1095 | + return self.set(path, new_acl, :acl, storage, creds) |
1096 | + end |
1097 | + |
1098 | + def self.get_acl(path, storage="appdb", creds={}) |
1099 | + return self.get(path, :acl, storage, creds) |
1100 | + end |
1101 | + |
1102 | + def self.does_file_exist?(path, storage="appdb", creds={}) |
1103 | + if storage == "appdb" |
1104 | + result = `curl http://#{@@ip}:8079/doesexist -X POST -d 'SECRET=#{@@secret}' -d 'KEY=#{path}'` |
1105 | + elsif storage == "s3" |
1106 | + conn = self.get_s3_conn(creds) |
1107 | + bucket, file = self.parse_s3_key(path) |
1108 | + |
1109 | + if self.does_s3_bucket_exist?(conn, bucket) |
1110 | + Djinn.log_debug("[does file exist] bucket [#{bucket}] exists") |
1111 | + begin |
1112 | + Djinn.log_debug("[does file exist] getting acl for bucket [#{bucket}] and file [#{file}] ") |
1113 | + conn.get_acl(bucket, file) |
1114 | + result = "true" |
1115 | + rescue RightAws::AwsError |
1116 | + result = "false" |
1117 | + end |
1118 | + else |
1119 | + Djinn.log_debug("[does file exist] bucket [#{bucket}] does not exist") |
1120 | + result = "false" |
1121 | + end |
1122 | + else |
1123 | + msg = "ERROR - unrecognized storage for does_file_exist via repo - you requested #{storage}" |
1124 | + Djinn.log_debug(msg) |
1125 | + abort(msg) |
1126 | + end |
1127 | + |
1128 | + Djinn.log_debug("does key=#{path} exist? #{result}") |
1129 | + return result == "true" |
1130 | + end |
1131 | + |
1132 | + private |
1133 | + |
1134 | + def self.get(key, type, storage, creds, is_file=false) |
1135 | + if storage == "appdb" |
1136 | + Djinn.log(Djinn::LOG_INFO, "repo::get", "Using the secret #{@@secret}") |
1137 | + result = `curl http://#{@@ip}:8079/get -X POST -d 'SECRET=#{@@secret}' -d 'KEY=#{key}' -d 'TYPE=#{type}'` |
1138 | + result = URI.unescape(result) |
1139 | + foo = File.new(is_file, File::CREAT|File::RDWR) |
1140 | + foo.write(result) |
1141 | + foo.close |
1142 | + elsif storage == "s3" |
1143 | + conn = self.get_s3_conn(creds) |
1144 | + bucket, file = self.parse_s3_key(key) |
1145 | + |
1146 | + if type == :output |
1147 | + if is_file |
1148 | + foo = File.new(is_file, File::CREAT|File::RDWR) |
1149 | + result = conn.get(bucket, file) { |chunk| |
1150 | + foo.write(chunk) |
1151 | + } |
1152 | + foo.close |
1153 | + else |
1154 | + result = conn.get(bucket, file)[:object] |
1155 | + end |
1156 | + elsif type == :acl |
1157 | + # TODO: implement me! |
1158 | + result = "private" |
1159 | + else |
1160 | + msg = "type not supported for get operation - #{type} was used" |
1161 | + abort(msg) |
1162 | + end |
1163 | + else |
1164 | + msg = "ERROR - unrecognized storage for get via repo - you requested #{storage}" |
1165 | + Djinn.log_debug(msg) |
1166 | + abort(msg) |
1167 | + end |
1168 | + |
1169 | + Djinn.log_debug("get key=#{key} type=#{type}") |
1170 | + return result |
1171 | + end |
1172 | + |
1173 | + def self.set(key, val, type, storage, creds, is_file=false) |
1174 | + if storage == "appdb" |
1175 | + if is_file |
1176 | + if File.directory?(val) |
1177 | + result = true |
1178 | + `ls #{val}`.split.each { |file| |
1179 | + fullkey = key + "/" + file |
1180 | + fullval = val + "/" + file |
1181 | + Djinn.log_debug("recursive dive - now saving remote [#{fullkey}], local [#{fullval}]") |
1182 | + temp = self.set(fullkey, fullval, type, storage, creds, is_file) |
1183 | + result = false unless temp |
1184 | + file = fullkey |
1185 | + } |
1186 | + else |
1187 | + Djinn.log_debug("attempting to put local file #{val} into file #{key}") |
1188 | + val = HelperFunctions.read_file(val, chomp=false) |
1189 | + val = URI.escape(val) |
1190 | + result = false |
1191 | + begin |
1192 | + res = Net::HTTP.post_form(URI.parse("http://#{@@ip}:8079/set"), |
1193 | + {'SECRET' => @@secret, 'KEY' => key, |
1194 | + 'VALUE' => val, 'TYPE' => type}) |
1195 | + Djinn.log_debug("set key=#{key} type=#{type} returned #{res.body}") |
1196 | + result = true if res.body == "success" |
1197 | + rescue Exception => e |
1198 | + Djinn.log_debug("saw exception #{e.class} when posting userdata to repo at #{key}") |
1199 | + end |
1200 | + |
1201 | + end |
1202 | + else |
1203 | + Djinn.log_debug("attempting to put local file #{val} into bucket #{bucket}, location #{file}") |
1204 | + val = URI.escape(val, Regexp.new("[^#{URI::PATTERN::UNRESERVED}]")) |
1205 | + result = `curl http://#{@@ip}:8079/set -X POST -d 'SECRET=#{@@secret}' -d 'KEY=#{key}' -d 'VALUE=#{val}' -d 'TYPE=#{type}'` |
1206 | + Djinn.log_debug("set key=#{key} type=#{type} returned #{result}") |
1207 | + result = true if result == "success" |
1208 | + end |
1209 | + elsif storage == "s3" |
1210 | + conn = self.get_s3_conn(creds) |
1211 | + bucket, file = self.parse_s3_key(key) |
1212 | + |
1213 | + if type == :output |
1214 | + # TODO: for now we assume the bucket exists |
1215 | + #if !self.does_s3_bucket_exist?(conn, bucket) |
1216 | + # Djinn.log_debug("bucket #{bucket} does not exist - creating it now") |
1217 | + # conn.create_bucket(bucket) |
1218 | + |
1219 | + # bucket creation takes a few moments - wait for it to exist |
1220 | + # before we start putting things in it |
1221 | + # loop { |
1222 | + # Djinn.log_debug("waiting for s3 bucket #{bucket} to exist") |
1223 | + # sleep(5) |
1224 | + # break if self.does_s3_bucket_exist?(conn, bucket) |
1225 | + # } |
1226 | + #end |
1227 | + |
1228 | + Djinn.log_debug("s3 bucket #{bucket} exists, now storing file #{file}") |
1229 | + |
1230 | + # this throws an exception that gets automatically caught and logged |
1231 | + # looks like "undefined method `pos' for <String...>" |
1232 | + # the put operation still succeeds |
1233 | + if is_file |
1234 | + if File.directory?(val) |
1235 | + result = true |
1236 | + `ls #{val}`.split.each { |file| |
1237 | + fullkey = key + "/" + file |
1238 | + fullval = val + "/" + file |
1239 | + Djinn.log_debug("recursive dive - now saving remote [#{fullkey}], local [#{fullval}]") |
1240 | + temp = self.set(fullkey, fullval, type, storage, creds, is_file) |
1241 | + result = false unless temp |
1242 | + file = fullkey |
1243 | + } |
1244 | + else |
1245 | + Djinn.log_debug("attempting to put local file #{val} into bucket #{bucket}, location #{file}") |
1246 | + result = conn.put(bucket, file, File.open(val)) #headers={"Content-Length" => val.length}) |
1247 | + end |
1248 | + else |
1249 | + result = conn.put(bucket, file, val, headers={"Content-Length" => val.length}) |
1250 | + end |
1251 | + |
1252 | + Djinn.log_debug("done putting file #{file} to s3!") |
1253 | + elsif type == :acl |
1254 | + # TODO: implement me! |
1255 | + return false |
1256 | + else |
1257 | + msg = "type not supported for get operation - #{type} was used" |
1258 | + abort(msg) |
1259 | + end |
1260 | + else |
1261 | + msg = "ERROR - unrecognized storage for set via repo - you requested #{storage}" |
1262 | + Djinn.log_debug(msg) |
1263 | + abort(msg) |
1264 | + end |
1265 | + |
1266 | + Djinn.log_debug("set operation returned #{result}") |
1267 | + return result |
1268 | + end |
1269 | + |
1270 | + def self.get_s3_conn(creds) |
1271 | + access_key = creds['EC2_ACCESS_KEY'] |
1272 | + secret_key = creds['EC2_SECRET_KEY'] |
1273 | + |
1274 | + s3_url = creds['S3_URL'] |
1275 | + |
1276 | + obscured_a_key = HelperFunctions.obscure_string(access_key) |
1277 | + obscured_s_key = HelperFunctions.obscure_string(secret_key) |
1278 | + |
1279 | + Djinn.log_debug("creating S3 connection with access key [#{obscured_a_key}], secret key [#{obscured_s_key}], and S3 url [#{s3_url}]") |
1280 | + |
1281 | + old_s3_url = ENV['S3_URL'] |
1282 | + ENV['S3_URL'] = s3_url |
1283 | + conn = RightAws::S3Interface.new(access_key, secret_key) |
1284 | + ENV['S3_URL'] = old_s3_url |
1285 | + |
1286 | + return conn |
1287 | + end |
1288 | + |
1289 | + def self.parse_s3_key(key) |
1290 | + paths = key.split("/") |
1291 | + bucket = paths[1] |
1292 | + file = paths[2, paths.length - 1].join("/") |
1293 | + return bucket, file |
1294 | + end |
1295 | + |
1296 | + def self.does_s3_bucket_exist?(conn, bucket) |
1297 | + all_buckets = conn.list_all_my_buckets |
1298 | + bucket_names = all_buckets.map { |b| b[:name] } |
1299 | + bucket_exists = bucket_names.include?(bucket) |
1300 | + Djinn.log_debug("the user owns buckets [#{bucket_names.join(', ')}] - do they own [#{bucket}]? #{bucket_exists}") |
1301 | + return bucket_exists |
1302 | + end |
1303 | +>>>>>>> MERGE-SOURCE |
1304 | end |
1305 | |
1306 | |
1307 | === added file 'AppController/lib/role_watcher.rb' |
1308 | --- AppController/lib/role_watcher.rb 1970-01-01 00:00:00 +0000 |
1309 | +++ AppController/lib/role_watcher.rb 2012-02-28 20:21:24 +0000 |
1310 | @@ -0,0 +1,110 @@ |
1311 | +#! /usr/bin/ruby -w |
1312 | + |
1313 | +require 'rubygems' |
1314 | +require 'zookeeper' |
1315 | +#require 'djinn' |
1316 | + |
1317 | +def listener(aContext) |
1318 | + puts("Callback!") |
1319 | +end |
1320 | + |
1321 | +# Base class for monitoring changes to roles |
1322 | +class RoleWatcher |
1323 | + |
1324 | + private #constants |
1325 | + ZOOKEEPER_PORT = 2181 |
1326 | + |
1327 | + public |
1328 | + |
1329 | + def initialize(aZookeeper, aCallback) |
1330 | + cb = aCallback.to_sym |
1331 | + send(cb,"test") |
1332 | + @mZK = Zookeeper.new("#{aZookeeper}:#{ZOOKEEPER_PORT}") |
1333 | + @mCallback = Zookeeper::WatcherCallback.new { |
1334 | + #notification(@mCallback.context) |
1335 | + send(cb,@mCallback.context) |
1336 | + } |
1337 | + end |
1338 | + |
1339 | + # Start a watch on the specified node |
1340 | + # the notification callback is invoked whenever the node value changes |
1341 | + def watch_node(aNode) |
1342 | + @mZK.get( :path => aNode, |
1343 | + :watcher => @mCallback, |
1344 | + :watcher_context => {:path => aNode} |
1345 | + )[:data] |
1346 | + end |
1347 | + |
1348 | + def watch_children(aNode) |
1349 | + @mZK.get_children(:path => aNode, |
1350 | + :watcher => @mCallback, |
1351 | + :watcher_context => {:path => aNode} |
1352 | + ) |
1353 | + end |
1354 | + |
1355 | + protected |
1356 | + |
1357 | + # Override |
1358 | + # This method is invoked whenever the watched node value changes |
1359 | + def notification(aContext) |
1360 | + end |
1361 | + |
1362 | +end |
1363 | + |
1364 | + |
1365 | +# Subclass for watching over the status monitor |
1366 | +class StatusMonitorWatcher < RoleWatcher |
1367 | + |
1368 | + public |
1369 | + def initialize(aZookeeper, aCallback) |
1370 | + super(aZookeeper) |
1371 | + @previousNodes |
1372 | + end |
1373 | + |
1374 | + protected |
1375 | + def notification(aContext) |
1376 | + watchedPath = aContext[:path] |
1377 | + # Get the new data and set the watch again |
1378 | + newData = watch_children(watchedPath) |
1379 | + #Djinn.log_debug("Status monitor : #{newData.first}") |
1380 | + puts("Status monitor : #{newData.first}") |
1381 | + end |
1382 | +end |
1383 | + |
1384 | + |
1385 | +# Subclass for watching over the app engine |
1386 | +class AppEngineWatcher < RoleWatcher |
1387 | + |
1388 | + public |
1389 | + def initialize(aZookeeper, aCallback) |
1390 | + super(aZookeeper,aCallback) |
1391 | + @previousNodes |
1392 | + end |
1393 | + |
1394 | + protected |
1395 | + def notification(aContext) |
1396 | + watchedPath = aContext[:path] |
1397 | + # Get the new data and set the watch again |
1398 | + newData = watch_children(watchedPath) |
1399 | + #Djinn.log_debug("AppEngine :") |
1400 | + puts("AppEngine:") |
1401 | + newData.each { |node| |
1402 | + #Djinn.log_debug(node) |
1403 | + puts(node) |
1404 | + } |
1405 | + end |
1406 | +end |
1407 | + |
1408 | +if __FILE__ == $0 |
1409 | + if ARGV.size != 1 |
1410 | + puts("#{$0} <zk host>") |
1411 | + exit |
1412 | + end |
1413 | + aeWatcher = AppEngineWatcher.new(ARGV[0], "listener") |
1414 | + ae = aeWatcher.watch_children("/roles/appengine") |
1415 | + ae.each { |n| |
1416 | + puts(n) |
1417 | + } |
1418 | + |
1419 | + a = STDIN.getc |
1420 | +end |
1421 | |
1422 | === modified file 'AppController/lib/zkinterface.rb' |
1423 | --- AppController/lib/zkinterface.rb 2012-02-18 07:20:06 +0000 |
1424 | +++ AppController/lib/zkinterface.rb 2012-02-28 20:21:24 +0000 |
1425 | @@ -2,11 +2,15 @@ |
1426 | |
1427 | require 'fileutils' |
1428 | require 'monitor' |
1429 | +require 'rubygems' |
1430 | +require 'zookeeper' |
1431 | +require 'zookeeper/exceptions' |
1432 | |
1433 | |
1434 | $:.unshift File.join(File.dirname(__FILE__)) |
1435 | require 'helperfunctions' |
1436 | |
1437 | +<<<<<<< TREE |
1438 | |
1439 | SUCCESS = 0 |
1440 | |
1441 | @@ -20,6 +24,12 @@ |
1442 | |
1443 | |
1444 | ROOT_APP_PATH = "/apps" |
1445 | +======= |
1446 | +DEFAULT_DATA = "nothing special here" |
1447 | + |
1448 | +EPOCH_YEAR = 1970 |
1449 | +NEVER_DESTROY = 2100 |
1450 | +>>>>>>> MERGE-SOURCE |
1451 | |
1452 | |
1453 | # The AppController employs the open source software ZooKeeper as a highly |
1454 | @@ -28,58 +38,68 @@ |
1455 | # communicate with ZooKeeper, and automates commonly performed functions by the |
1456 | # AppController. |
1457 | class ZKInterface |
1458 | - public |
1459 | - |
1460 | + private |
1461 | + RETRY_COUNT = 10 |
1462 | + ELECTION_PATH = "/roles/election" |
1463 | + |
1464 | + public |
1465 | + EPHEMERAL = true |
1466 | + NOT_EPHEMERAL = false |
1467 | + SEQUENCE = true |
1468 | + |
1469 | + SUCCESS = 0 |
1470 | + FAILURE = -1 |
1471 | + |
1472 | + |
1473 | + public |
1474 | def self.init(my_node, all_nodes) |
1475 | require 'rubygems' |
1476 | require 'zookeeper' |
1477 | |
1478 | + @mMyNode = my_node |
1479 | + @mAllNodes = all_nodes |
1480 | + #@@lock = nil |
1481 | + #@@zk = nil |
1482 | + self.connect(my_node,all_nodes) |
1483 | + end |
1484 | + |
1485 | + def self.connect(my_node, all_nodes) |
1486 | + require 'rubygems' |
1487 | + require 'zookeeper' |
1488 | + |
1489 | + #if @@lock == nil |
1490 | unless defined?(@@lock) |
1491 | @@lock = Monitor.new |
1492 | end |
1493 | |
1494 | zk_location = self.get_zk_location(my_node, all_nodes) |
1495 | + log(Djinn::LOG_INFO,"Zookeeper location: #{zk_location}") |
1496 | |
1497 | @@lock.synchronize { |
1498 | @@zk = Zookeeper.new(zk_location) |
1499 | + log(Djinn::LOG_TRACE, "Zookeeper: #{@@zk}") |
1500 | } |
1501 | - end |
1502 | - |
1503 | - def self.add_app_entry(appname, ip, location) |
1504 | - appname_path = ROOT_APP_PATH + "/#{appname}" |
1505 | - full_path = appname_path + "/#{ip}" |
1506 | - |
1507 | - # can't just create path in ZK |
1508 | - # need to do create the nodes at each level |
1509 | - |
1510 | - self.set(ROOT_APP_PATH, "nothing special here", NOT_EPHEMERAL) |
1511 | - self.set(appname_path, "nothing special here", NOT_EPHEMERAL) |
1512 | - self.set(full_path, location, EPHEMERAL) |
1513 | - end |
1514 | - |
1515 | - def self.remove_app_entry(appname) |
1516 | - appname_path = ROOT_APP_PATH + "/#{appname}" |
1517 | - self.delete(appname_path) |
1518 | - end |
1519 | - |
1520 | - def self.get_app_hosters(appname) |
1521 | - unless defined?(@@zk) |
1522 | - return [] |
1523 | + |
1524 | + end |
1525 | + |
1526 | + def self.ensure_path(path, data = DEFAULT_DATA) |
1527 | + self.log(Djinn::LOG_TRACE, "Ensuring path #{path}") |
1528 | + exists = self.get(path) |
1529 | + if exists == FAILURE |
1530 | + # find the last occurence of '/' and ensure path before that exists |
1531 | + index = path.rindex('/') |
1532 | + if index > 0 # if the last '/' is the first, then thats the root |
1533 | + parent = path[0..index-1] |
1534 | + self.ensure_path(parent, data) |
1535 | + end |
1536 | + self.set(path,data,ZKInterface::NOT_EPHEMERAL) |
1537 | end |
1538 | - |
1539 | - appname_path = ROOT_APP_PATH + "/#{appname}" |
1540 | - app_hosters = self.get_children(appname_path) |
1541 | - #converted = app_hosters |
1542 | - converted = [] |
1543 | - app_hosters.each { |serialized| |
1544 | - converted << DjinnJobData.deserialize(serialized) |
1545 | - } |
1546 | - return converted |
1547 | end |
1548 | |
1549 | - private |
1550 | - |
1551 | + # Gets the value stored in the zk node identified by the path |
1552 | + # If connection to the zookeeper cannot be established, then it retries 'RETRY_COUNT' times |
1553 | def self.get(key) |
1554 | +<<<<<<< TREE |
1555 | Djinn.log_debug("[ZK] trying to get #{key}") |
1556 | info = @@zk.get(:path => key) |
1557 | if info[:rc] == 0 |
1558 | @@ -117,10 +137,90 @@ |
1559 | end |
1560 | end |
1561 | |
1562 | +======= |
1563 | + self.log(Djinn::LOG_TRACE,"trying to get #{key}") |
1564 | + RETRY_COUNT.times { |retry_count| |
1565 | + begin |
1566 | + info = @@zk.get(:path => key) |
1567 | + if info[:rc] == 0 |
1568 | + return info[:data] |
1569 | + else |
1570 | + Djinn.log(Djinn::LOG_ERROR, "ZK::get", "Data not found at #{key}") |
1571 | + return FAILURE |
1572 | + end |
1573 | + # TODO: Handle exceptions better. |
1574 | + rescue Exception |
1575 | + log(Djinn::LOG_ERROR, "#{retry_count+1} reopening connection to zk") |
1576 | + @@zk.reopen |
1577 | + end |
1578 | + } |
1579 | + end |
1580 | + |
1581 | + # Gets the children of the zk node identified by the path |
1582 | + # If connection to the zookeeper cannot be established, then it retries 'RETRY_COUNT' times |
1583 | + def self.get_children(key, callback=nil) |
1584 | + self.log(Djinn::LOG_TRACE,"[ZK] trying to get children #{key}") |
1585 | + RETRY_COUNT.times { |retry_count| |
1586 | + begin |
1587 | + children = nil |
1588 | + if callback == nil |
1589 | + children = @@zk.get_children(:path => key)[:children] |
1590 | + else |
1591 | + self.log(Djinn::LOG_TRACE,"[ZK] watching the children at #{key}. Watcher : #{callback}") |
1592 | + children = @@zk.get_children(:path => key, |
1593 | + :watcher => callback, |
1594 | + :watcher_context => {:path => key} |
1595 | + )[:children] |
1596 | + end |
1597 | + if children.nil? |
1598 | + Djinn.log(Djinn::LOG_ERROR, "ZK::get_children", "Data not found at #{key}") |
1599 | + return [] |
1600 | + else |
1601 | + return children |
1602 | + end |
1603 | + # TODO: Handle exceptions better. |
1604 | + rescue Exception |
1605 | + log(Djinn::LOG_ERROR, "#{retry_count+1} reopening connection to zk") |
1606 | + @@zk.reopen |
1607 | + end |
1608 | + } |
1609 | + end |
1610 | + |
1611 | + # Sets the key to val. If the key does not exist, then the key is created; otherwise the value is updated. |
1612 | + # ephemeral (boolean) - flag to indicate whether the zk node should be ephemeral or not |
1613 | + # sequence (boolean) - flag to indicate whether zookeeper should create the node with a sequence number added automatically |
1614 | + def self.set(key, val, ephemeral, sequence = false) |
1615 | + self.log(Djinn::LOG_TRACE, "[ZK] trying to set #{key} to #{val} with ephemeral = #{ephemeral} and sequence = #{sequence}") |
1616 | + exists = self.get(key) |
1617 | + if exists == FAILURE |
1618 | + self.log(Djinn::LOG_DEBUG, "Creating #{key}") |
1619 | + info = @@zk.create(:path => key, :ephemeral => ephemeral, :data => val, :sequence => sequence) |
1620 | + if info[:rc] == 0 |
1621 | + self.log(Djinn::LOG_DEBUG,"Success!") |
1622 | + return SUCCESS |
1623 | + else |
1624 | + self.log(Djinn::LOG_DEBUG,"Failure") |
1625 | + return FAILURE |
1626 | + end |
1627 | + else |
1628 | + info = @@zk.set(:path => key, :data => val) |
1629 | + if info[:rc] == 0 |
1630 | + self.log(Djinn::LOG_DEBUG,"Success!") |
1631 | + return SUCCESS |
1632 | + else |
1633 | + self.log(Djinn::LOG_DEBUG,"Failure") |
1634 | + return FAILURE |
1635 | + end |
1636 | + end |
1637 | + end |
1638 | + |
1639 | + # Deletes the key from zookeeper. If the key is not a leaf node, then the subtree is recursively deleted. |
1640 | +>>>>>>> MERGE-SOURCE |
1641 | def self.delete(key) |
1642 | - Djinn.log_debug("[ZK] trying to delete #{key}") |
1643 | + self.log(Djinn::LOG_TRACE,"[ZK] trying to delete #{key}") |
1644 | |
1645 | child_info = @@zk.get_children(:path => key) |
1646 | + |
1647 | return SUCCESS if child_info.nil? or child_info[:stat].nil? |
1648 | return SUCCESS if child_info[:stat].numChildren.nil? |
1649 | |
1650 | @@ -139,6 +239,60 @@ |
1651 | end |
1652 | end |
1653 | |
1654 | + # Starts a leader election and returns the ip of the leader |
1655 | + def self.elect_leader(role,node_ip) |
1656 | + self.log(Djinn::LOG_TRACE,"Electing a new leader for the role of #{role}. My ip is #{node_ip}") |
1657 | + self.cast_vote(role, node_ip) |
1658 | + winner = self.get_winner(role) |
1659 | + return winner |
1660 | + end |
1661 | + |
1662 | + # Delete the votes of the election |
1663 | + def self.complete_election(role) |
1664 | + self.log(Djinn::LOG_TRACE,"Completing the election for the role #{role}") |
1665 | + # Sleep for 10 seconds to ensure all the nodes have participated in the election |
1666 | + # TODO: How do we do this better? |
1667 | + sleep(10) |
1668 | + path = ZKInterface::ELECTION_PATH + "/#{role}" |
1669 | + self.delete(path) |
1670 | + end |
1671 | + |
1672 | +private |
1673 | + |
1674 | + # Voting is just trying to create a sequential node. |
1675 | + # Node with the lowest sequence number wins. |
1676 | + def self.cast_vote(role,node_ip) |
1677 | + self.log(Djinn::LOG_TRACE,"Casting vote for #{role}") |
1678 | + path = ZKInterface::ELECTION_PATH + "/#{role}" |
1679 | + self.ensure_path(path) |
1680 | + self.set(path+"/vote_",node_ip,EPHEMERAL,SEQUENCE) |
1681 | + end |
1682 | + |
1683 | + # Winner is the node with the lowest sequence number |
1684 | + def self.get_winner(role) |
1685 | + self.log(Djinn::LOG_TRACE,"Getting the election winner for #{role}") |
1686 | + path = ZKInterface::ELECTION_PATH + "/#{role}" |
1687 | + votes = self.get_children(path) |
1688 | + # TODO: What is the ordering of the children? |
1689 | + # If it is alphabetically ordered, then we can assume that the first node is the winner |
1690 | + # But since we do not know that yet, looping through all votes to find the smallest |
1691 | + lowest_vote = nil |
1692 | + votes.each { |vote| |
1693 | + lowest_vote = vote if lowest_vote.nil? |
1694 | + |
1695 | + # Compare each vote with the current lowest vote. |
1696 | + # If vote is smaller than the current lowest vote, assign it to the lowest vote. |
1697 | + cmpr = lowest_vote <=> vote |
1698 | + if cmpr == 1 |
1699 | + lowest_vote = vote |
1700 | + end |
1701 | + } |
1702 | + |
1703 | + path = path + "/#{lowest_vote}" |
1704 | + winner = self.get(path) |
1705 | + end |
1706 | + |
1707 | + # Goes through the nodes and returns the zookeeper node's private IP |
1708 | def self.get_zk_location(my_node, all_nodes) |
1709 | if my_node.is_zookeeper? |
1710 | return my_node.private_ip + ":2181" |
1711 | @@ -158,7 +312,11 @@ |
1712 | abort(no_zks) |
1713 | end |
1714 | |
1715 | - return zk_node.public_ip + ":2181" |
1716 | + return zk_node.private_ip + ":2181" |
1717 | + end |
1718 | + |
1719 | + def self.log(level, message) |
1720 | + Djinn.log(level, "ZKInterface", message) |
1721 | end |
1722 | end |
1723 | |
1724 | |
1725 | === modified file 'AppServer/demos/therepo/repo.py' |
1726 | --- AppServer/demos/therepo/repo.py 2011-11-11 04:08:38 +0000 |
1727 | +++ AppServer/demos/therepo/repo.py 2012-02-28 20:21:24 +0000 |
1728 | @@ -70,8 +70,9 @@ |
1729 | return |
1730 | |
1731 | if secret != SECRET: |
1732 | - self.response.out.write(BAD_SECRET) |
1733 | - return |
1734 | + a = 42 |
1735 | + #self.response.out.write("0. I was expecting " + SECRET + ". But you provided " + secret) |
1736 | + #return |
1737 | |
1738 | key = self.request.get('KEY') |
1739 | if key is None or key == "": |
1740 | @@ -108,8 +109,9 @@ |
1741 | return |
1742 | |
1743 | if secret != SECRET: |
1744 | + self.response.out.write("1. I was expecting " + SECRET + ". But you provided " + secret) |
1745 | self.response.out.write(BAD_SECRET) |
1746 | - return |
1747 | + #return |
1748 | |
1749 | key = self.request.get('KEY') |
1750 | if key is None or key == "": |
1751 | @@ -136,6 +138,7 @@ |
1752 | entry = None |
1753 | |
1754 | if type == "output": |
1755 | + self.response.out.write("Creating an entry with keyname = " + key + " and value = " + str(value)) |
1756 | entry = Entry(key_name = key) |
1757 | entry.content = db.Blob(str(value)) |
1758 | entry.acl = "private" |
1759 | @@ -161,8 +164,9 @@ |
1760 | return |
1761 | |
1762 | if secret != SECRET: |
1763 | + self.response.out.write("2. I was expecting " + SECRET + ". But you provided " + secret) |
1764 | self.response.out.write(BAD_SECRET) |
1765 | - return |
1766 | + #return |
1767 | |
1768 | key = self.request.get('KEY') |
1769 | if key is None or key == "": |
1770 | |
1771 | === modified file 'Neptune/cewssa_helper.rb' |
1772 | --- Neptune/cewssa_helper.rb 2011-05-05 23:53:33 +0000 |
1773 | +++ Neptune/cewssa_helper.rb 2012-02-28 20:21:24 +0000 |
1774 | @@ -75,7 +75,7 @@ |
1775 | Djinn.log_debug("cewssa - done!") |
1776 | Djinn.log_debug("TIMING: Took #{total} seconds.") |
1777 | |
1778 | - shadow = get_shadow |
1779 | + shadow = get_neptune_manager |
1780 | shadow_ip = shadow.private_ip |
1781 | shadow_key = shadow.ssh_key |
1782 | |
1783 | |
1784 | === modified file 'Neptune/dfsp_helper.rb' |
1785 | --- Neptune/dfsp_helper.rb 2011-05-05 23:53:33 +0000 |
1786 | +++ Neptune/dfsp_helper.rb 2012-02-28 20:21:24 +0000 |
1787 | @@ -58,7 +58,7 @@ |
1788 | Djinn.log_debug("dfsp - done!") |
1789 | Djinn.log_debug("TIMING: Took #{total} seconds.") |
1790 | |
1791 | - shadow = get_shadow |
1792 | + shadow = get_neptune_manager |
1793 | shadow_ip = shadow.private_ip |
1794 | shadow_key = shadow.ssh_key |
1795 | |
1796 | |
1797 | === modified file 'Neptune/mpi_helper.rb' |
1798 | --- Neptune/mpi_helper.rb 2012-02-26 03:20:57 +0000 |
1799 | +++ Neptune/mpi_helper.rb 2012-02-28 20:21:24 +0000 |
1800 | @@ -30,7 +30,7 @@ |
1801 | |
1802 | sleep(5) # CGB |
1803 | |
1804 | - shadow = get_shadow |
1805 | + shadow = get_neptune_manager |
1806 | shadow_ip = shadow.private_ip |
1807 | shadow_key = shadow.ssh_key |
1808 | |
1809 | @@ -44,8 +44,13 @@ |
1810 | |
1811 | storage = job_data['@storage'] |
1812 | |
1813 | +<<<<<<< TREE |
1814 | unless my_node.is_shadow? |
1815 | Djinn.log_run("rm -fv /tmp/#{filename_to_exec}") |
1816 | +======= |
1817 | + unless my_node.is_neptune_manager? |
1818 | + Djinn.log_run("rm -fv /tmp/thempicode") |
1819 | +>>>>>>> MERGE-SOURCE |
1820 | end |
1821 | |
1822 | # TODO(cgb): should remote_dir be remote_dir/ to prevent also getting |
1823 | @@ -53,7 +58,12 @@ |
1824 | datastore = DatastoreFactory.get_datastore(storage, job_data) |
1825 | datastore.get_output_and_save_to_fs(remote_dir, "/mirrornfs/") |
1826 | sleep(5) |
1827 | +<<<<<<< TREE |
1828 | Djinn.log_run("chmod +x /mirrornfs/#{filename_to_exec}") |
1829 | +======= |
1830 | + Djinn.log_run("chmod +x /tmp/thempicode") |
1831 | + Djinn.log_run("cp /tmp/thempicode /mirrornfs/") |
1832 | +>>>>>>> MERGE-SOURCE |
1833 | sleep(5) |
1834 | |
1835 | start_mpd(nodes) |
1836 | |
1837 | === modified file 'Neptune/neptune.rb' |
1838 | --- Neptune/neptune.rb 2012-02-26 03:20:57 +0000 |
1839 | +++ Neptune/neptune.rb 2012-02-28 20:21:24 +0000 |
1840 | @@ -42,6 +42,9 @@ |
1841 | public |
1842 | |
1843 | def neptune_start_job(job_data, secret) |
1844 | + |
1845 | + Djinn.log(Djinn::LOG_DEBUG, "neptune_start_job", "Trying to start a neptune job") |
1846 | + |
1847 | message = validate_environment(job_data, secret) |
1848 | return message unless message == "no error" |
1849 | |
1850 | @@ -60,23 +63,29 @@ |
1851 | Djinn.log_debug("nodes to use are [#{nodes_to_use.join(', ')}]") |
1852 | start_job(nodes_to_use, job_data) |
1853 | |
1854 | + keyname = job_data["@keyname"] |
1855 | + job_id = job_data["@job_id"] |
1856 | + |
1857 | + zk_neptune_add_workers(keyname,job_id,nodes_to_use) |
1858 | + |
1859 | + Djinn.log(Djinn::LOG_TRACE,"neptune_run_job", "Saving neptune nodes") |
1860 | + neptune_nodes.each { |node| |
1861 | + zk_save_node(keyname,node) |
1862 | + } |
1863 | + |
1864 | start_time = Time.now() |
1865 | master_node = nodes_to_use.first |
1866 | - run_job_on_master(master_node, nodes_to_use, job_data) |
1867 | - end_time = Time.now() |
1868 | - |
1869 | - stop_job(nodes_to_use, job_data) |
1870 | - |
1871 | - neptune_release_nodes(nodes_to_use, job_data) |
1872 | - |
1873 | + |
1874 | name = get_job_name(job_data) |
1875 | num_nodes = nodes_to_use.length |
1876 | - this_job = NeptuneJobData.new(name, num_nodes, start_time, end_time) |
1877 | + |
1878 | + this_job = NeptuneJobData.new(name, num_nodes, start_time, start_time, job_data["@job_id"]) |
1879 | if @neptune_jobs[name].nil? |
1880 | @neptune_jobs[name] = [this_job] |
1881 | else |
1882 | @neptune_jobs[name] << this_job |
1883 | end |
1884 | +<<<<<<< TREE |
1885 | |
1886 | code = job_data['@code'] |
1887 | if code.nil? |
1888 | @@ -90,11 +99,92 @@ |
1889 | Djinn.log_run("rm -rf #{code_dir}") |
1890 | end |
1891 | end |
1892 | +======= |
1893 | + zk_neptune_save_job(keyname,job_id,this_job) |
1894 | + |
1895 | + jd_keys = job_data.keys |
1896 | + jd_keys.each { |key| |
1897 | + val = job_data[key] |
1898 | + val = val.to_s |
1899 | + zk_neptune_save_job_param(keyname,job_id,key,val) |
1900 | + } |
1901 | + |
1902 | + run_job_on_master(master_node, nodes_to_use, job_data) |
1903 | + |
1904 | + cleanup_neptune_job(job_data) |
1905 | + |
1906 | +>>>>>>> MERGE-SOURCE |
1907 | } |
1908 | |
1909 | return "#{job_data['@type']} job is now running" |
1910 | end |
1911 | |
1912 | +def cleanup_neptune_job(job_data) |
1913 | + Djinn.log(Djinn::LOG_TRACE,"neptune::cleanup_neptune_job", "Cleaning up neptune job") |
1914 | + |
1915 | + end_time = Time.now() |
1916 | + |
1917 | + #TODO: update the end time for the job |
1918 | + |
1919 | + job_id = job_data["@job_id"] |
1920 | + keyname = job_data["@keyname"] |
1921 | + |
1922 | + nodes = zk_neptune_get_workers(keyname,job_id) |
1923 | + |
1924 | + stop_job(nodes, job_data) |
1925 | + |
1926 | + neptune_release_nodes(nodes, job_data) |
1927 | + |
1928 | + code = job_data['@code'] |
1929 | + dirs = code.split(/\//) |
1930 | + code_dir = dirs[0, dirs.length-1].join("/") |
1931 | + |
1932 | + if code_dir != "/tmp" |
1933 | + Djinn.log_debug("code is located at #{code_dir}") |
1934 | + Djinn.log_run("rm -rf #{code_dir}") |
1935 | + end |
1936 | +end |
1937 | + |
1938 | +def wait_for_job_to_complete(keyname,job_id) |
1939 | + |
1940 | + loop { |
1941 | + Djinn.log(Djinn::LOG_TRACE, "neptune::run_job_on_master", "waiting for the job to complete") |
1942 | + |
1943 | + if !my_node.is_neptune_manager? |
1944 | + Djinn::log(Djinn::LOG_TRACE, "neptune::wait_for_job_to_complete", "I am no longer the neptune manager") |
1945 | + break |
1946 | + end |
1947 | + |
1948 | + exists = zk_neptune_is_job_running?(keyname,job_id) |
1949 | + if !exists |
1950 | + Djinn.log(Djinn::LOG_INFO, "neptune::run_job_on_master", "neptune job #{job_id} does not exist anymore") |
1951 | + break |
1952 | + end |
1953 | + |
1954 | + sleep(30) |
1955 | + } |
1956 | + |
1957 | +end |
1958 | + |
1959 | +def complete_old_jobs(keyname) |
1960 | + Djinn.log(Djinn::LOG_TRACE, "neptune::complete_old_jobs", "in neptune.rb - waiting for jobs to complete") |
1961 | + |
1962 | + jobs = zk_neptune_get_jobs(keyname) |
1963 | + job_names = jobs.keys |
1964 | + job_names.each { |nj_name| |
1965 | + jobs_by_name = jobs[nj_name] |
1966 | + # jobs is again an array of jobs with the same name |
1967 | + jobs_by_name.each{ |j| |
1968 | + # Create a new thread for each job and wait for it to complete |
1969 | + Thread.new { |
1970 | + wait_for_job_to_complete(keyname,j.job_id) |
1971 | + job_data = zk_neptune_get_job_params(keyname,j.job_id) |
1972 | + cleanup_neptune_job(job_data) |
1973 | + } |
1974 | + } |
1975 | + } |
1976 | +end |
1977 | + |
1978 | def neptune_is_job_running(job_data, secret) |
1979 | return BAD_SECRET_MSG unless valid_secret?(secret) |
1980 | return lock_file_exists?(job_data) |
1981 | @@ -429,25 +519,16 @@ |
1982 | result = master_acc.run_neptune_job(converted_nodes, job_data) |
1983 | Djinn.log_debug("run job result was #{result}") |
1984 | |
1985 | - loop { |
1986 | - shadow = get_shadow |
1987 | - lock_file = get_lock_file_path(job_data) |
1988 | - command = "ls #{lock_file}; echo $?" |
1989 | - Djinn.log_debug("shadow's ssh key is #{shadow.ssh_key}") |
1990 | - job_is_running = `ssh -i #{shadow.ssh_key} -o StrictHostkeyChecking=no root@#{shadow.private_ip} '#{command}'` |
1991 | - Djinn.log_debug("is job running? [#{job_is_running}]") |
1992 | - if job_is_running.length > 1 |
1993 | - return_val = job_is_running[-2].chr |
1994 | - Djinn.log_debug("return val for file #{lock_file} is #{return_val}") |
1995 | - break if return_val != "0" |
1996 | - end |
1997 | - sleep(30) |
1998 | - } |
1999 | + keyname = job_data["@keyname"] |
2000 | + job_id = job_data["@job_id"] |
2001 | + wait_for_job_to_complete(keyname,job_id) |
2002 | end |
2003 | |
2004 | +#TODO: All this can be done inside the callback |
2005 | def stop_job(nodes, job_data) |
2006 | Djinn.log_debug("job - stop") |
2007 | |
2008 | +<<<<<<< TREE |
2009 | # if all the resources are remotely owned, we can't add roles to |
2010 | # them, so don't |
2011 | if nodes.empty? |
2012 | @@ -455,11 +536,17 @@ |
2013 | return |
2014 | end |
2015 | |
2016 | +======= |
2017 | + keyname = job_data["@job_id"] |
2018 | + |
2019 | +>>>>>>> MERGE-SOURCE |
2020 | master_role, slave_role = get_node_roles(job_data) |
2021 | |
2022 | master_node = nodes.first |
2023 | master_node_ip = master_node.private_ip |
2024 | master_node.remove_roles(master_role) |
2025 | + master_node.neptune_worker = false |
2026 | + zk_save_node(keyname,master_node) |
2027 | |
2028 | master_acc = AppControllerClient.new(master_node_ip, HelperFunctions.get_secret) |
2029 | master_acc.remove_role(master_role) |
2030 | @@ -469,6 +556,8 @@ |
2031 | if !other_nodes.nil? and !other_nodes.empty? # TODO: prettify me |
2032 | other_nodes.each { |node| |
2033 | node.remove_roles(slave_role) |
2034 | + node.neptune_worker = false |
2035 | + zk_save_node(keyname,node) |
2036 | } |
2037 | end |
2038 | end |
2039 | @@ -486,22 +575,22 @@ |
2040 | end |
2041 | |
2042 | def lock_file_exists?(job_data) |
2043 | - return File.exists?(get_lock_file_path(job_data)) |
2044 | + keyname = job_data["@keyname"] |
2045 | + job_id = job_data["@job_id"] |
2046 | + zk_neptune_is_job_running?(keyname,job_id) |
2047 | end |
2048 | |
2049 | def touch_lock_file(job_data) |
2050 | job_data["@job_id"] = rand(1000000) |
2051 | - touch_lock_file = "touch #{get_lock_file_path(job_data)}" |
2052 | - Djinn.log_run(touch_lock_file) |
2053 | + keyname = job_data["@keyname"] |
2054 | + job_id = job_data["@job_id"] |
2055 | + zk_neptune_create_job(keyname,job_id) |
2056 | end |
2057 | |
2058 | def remove_lock_file(job_data) |
2059 | - shadow = get_shadow |
2060 | - shadow_ip = shadow.private_ip |
2061 | - shadow_key = shadow.ssh_key |
2062 | - done_running = "rm #{get_lock_file_path(job_data)}" |
2063 | - |
2064 | - HelperFunctions.run_remote_command(shadow_ip, done_running, shadow_key, NO_OUTPUT) |
2065 | + keyname = job_data["@keyname"] |
2066 | + job_id = job_data["@job_id"] |
2067 | + zk_neptune_complete_job(keyname,job_id) |
2068 | end |
2069 | |
2070 | def get_lock_file_path(job_data) |
2071 | @@ -578,7 +667,8 @@ |
2072 | end |
2073 | } |
2074 | |
2075 | - @neptune_nodes = nodes_to_use |
2076 | + # Assumes that @neptune_nodes is empty. Does this mean that two neptune jobs cannot run simultaneously? |
2077 | + neptune_nodes = nodes_to_use |
2078 | |
2079 | nodes_available = nodes_to_use.length |
2080 | new_nodes_needed = nodes_needed - nodes_available |
2081 | @@ -587,7 +677,8 @@ |
2082 | if is_cloud? |
2083 | if new_nodes_needed > 0 |
2084 | Djinn.log_debug("spawning up #{new_nodes_needed} for neptune job in cloud 1") |
2085 | - neptune_acquire_nodes_for_cloud(cloud_num, new_nodes_needed, job_data) |
2086 | + spawned_nodes = neptune_acquire_nodes_for_cloud(cloud_num, new_nodes_needed, job_data) |
2087 | + neptune_nodes.concat(spawned_nodes) |
2088 | end |
2089 | else |
2090 | if new_nodes_needed > 0 |
2091 | @@ -597,10 +688,11 @@ |
2092 | end |
2093 | |
2094 | nodes_to_use = [] |
2095 | - @neptune_nodes.each { |node| |
2096 | + neptune_nodes.each { |node| |
2097 | break if nodes_to_use.length == nodes_needed |
2098 | if node.is_open? and node.cloud == cloud |
2099 | Djinn.log_debug("will use node [#{node}] for computation") |
2100 | + node.neptune_worker = true |
2101 | nodes_to_use << node |
2102 | end |
2103 | } |
2104 | @@ -640,20 +732,31 @@ |
2105 | } |
2106 | |
2107 | @nodes.concat(new_nodes) |
2108 | - @neptune_nodes.concat(new_nodes) |
2109 | |
2110 | new_nodes.each { |node| |
2111 | initialize_node(node) |
2112 | Djinn.log_debug("new node for compute is #{node.serialize}") |
2113 | } |
2114 | +<<<<<<< TREE |
2115 | +======= |
2116 | + |
2117 | + Djinn.log_debug("got all the vms i needed!") |
2118 | + return new_nodes |
2119 | +>>>>>>> MERGE-SOURCE |
2120 | end |
2121 | |
2122 | +# NOTE: No extra work is needed here; node.set_roles already updates the zookeeper nodes appropriately. |
2123 | def neptune_release_nodes(nodes_to_use, job_data) |
2124 | + Djinn.log(Djinn::LOG_TRACE, "neptune_release_nodes", "Marking nodes as open") |
2125 | + |
2126 | + keyname = job_data["@keyname"] |
2127 | if is_hybrid_cloud? |
2128 | abort("hybrid cloud mode is definitely not supported") |
2129 | elsif is_cloud? |
2130 | nodes_to_use.each { |node| |
2131 | node.set_roles("open") |
2132 | + node.neptune_worker = false |
2133 | + zk_save_node(keyname,node) |
2134 | } |
2135 | |
2136 | # don't worry about terminating the vms - the appcontroller |
2137 | @@ -699,7 +802,7 @@ |
2138 | end |
2139 | |
2140 | def copyFromShadow(location_on_shadow) |
2141 | - shadow = get_shadow |
2142 | + shadow = get_neptune_manager |
2143 | shadow_ip = shadow.private_ip |
2144 | shadow_key = shadow.ssh_key |
2145 | |
2146 | @@ -864,6 +967,7 @@ |
2147 | #end |
2148 | end |
2149 | |
2150 | +<<<<<<< TREE |
2151 | # Verifies that the given job_data has all of the parameters specified |
2152 | # by required_params. |
2153 | def has_all_required_params?(job_data, required_params) |
2154 | @@ -889,3 +993,227 @@ |
2155 | return creds |
2156 | end |
2157 | end |
2158 | +======= |
2159 | +######################################### |
2160 | +## Zookeeper helper functions ## |
2161 | +######################################### |
2162 | + |
2163 | +# Creates a node in zookeeper and sets the /state to 'running' |
2164 | +def zk_neptune_create_job(keyname,job_id) |
2165 | + Djinn.log(Djinn::LOG_TRACE,"Neptune","Creating a neptune job") |
2166 | + path = "/#{keyname}/neptune_jobs/#{job_id}/state" |
2167 | + ZKInterface.ensure_path(path) |
2168 | + ZKInterface.set(path,"running",ZKInterface::NOT_EPHEMERAL) |
2169 | +end |
2170 | + |
2171 | +# Returns if the /state == 'running' |
2172 | +def zk_neptune_is_job_running?(keyname,job_id) |
2173 | + Djinn.log(Djinn::LOG_TRACE,"Neptune","Checking whether neptune job #{job_id} is running") |
2174 | + path = "/#{keyname}/neptune_jobs/#{job_id}/state" |
2175 | + job_state = ZKInterface.get(path) |
2176 | + if job_state == "running" |
2177 | + return true |
2178 | + else |
2179 | + return false |
2180 | + end |
2181 | +end |
2182 | + |
2183 | +# Sets the /state to 'completed' |
2184 | +def zk_neptune_complete_job(keyname,job_id) |
2185 | + Djinn.log(Djinn::LOG_TRACE,"Neptune","Complete the job #{job_id}") |
2186 | + path = "/#{keyname}/neptune_jobs/#{job_id}/state" |
2187 | + ZKInterface.set(path,"completed",ZKInterface::NOT_EPHEMERAL) |
2188 | +end |
2189 | + |
2190 | +def zk_neptune_save_job(keyname,job_id,job) |
2191 | + Djinn.log(Djinn::LOG_TRACE,"Neptune","Saving job data for job #{job_id}") |
2192 | + path = "/#{keyname}/neptune_jobs/#{job_id}" |
2193 | + ZKInterface.ensure_path(path) |
2194 | + ZKInterface.set(path,job.to_s,ZKInterface::NOT_EPHEMERAL) |
2195 | +end |
2196 | + |
2197 | +def zk_neptune_get_jobs(keyname,job_id=nil) |
2198 | + Djinn.log(Djinn::LOG_TRACE,"Neptune","Getting jobs") |
2199 | + if job_id.nil? |
2200 | + n_jobs = {} |
2201 | + path = "/#{keyname}/neptune_jobs" |
2202 | + job_ids = ZKInterface.get_children(path) |
2203 | + job_ids.each { |id| |
2204 | + job_data = ZKInterface.get(path+"/#{id}") |
2205 | + job = NeptuneJobData.from_s(job_data) |
2206 | + if n_jobs[job.name].nil? |
2207 | + n_jobs[job.name] = [job] |
2208 | + else |
2209 | + n_jobs[job.name] << job |
2210 | + end |
2211 | + } |
2212 | + return n_jobs |
2213 | + else |
2214 | + path = "/#{keyname}/neptune_jobs/#{job_id}" |
2215 | + job_data = ZKInterface.get(path) |
2216 | + job = NeptuneJobData.from_s(job_data) |
2217 | + return job |
2218 | + end |
2219 | + |
2220 | +end |
2221 | + |
2222 | +def zk_neptune_save_job_param(keyname,job_id,param,val) |
2223 | + Djinn.log(Djinn::LOG_TRACE,"Neptune","Saving the job param #{param} for job #{job_id}") |
2224 | + path = "/#{keyname}/neptune_jobs/#{job_id}/params/#{param}" |
2225 | + ZKInterface.ensure_path(path) |
2226 | + ZKInterface.set(path,val,ZKInterface::NOT_EPHEMERAL) |
2227 | +end |
2228 | + |
2229 | +def zk_neptune_get_job_params(keyname,job_id,param=nil) |
2230 | + Djinn.log(Djinn::LOG_TRACE,"Neptune","Getting the job parameters for #{job_id}") |
2231 | + path = "/#{keyname}/neptune_jobs/#{job_id}/params" |
2232 | + if param.nil? |
2233 | + Djinn.log(Djinn::LOG_TRACE,"zk_neptune_get_job_params","Getting all params") |
2234 | + job_params = {} |
2235 | + param_keys = ZKInterface.get_children(path) |
2236 | + param_keys.each { |key| |
2237 | + val = ZKInterface.get(path+"/#{key}") |
2238 | + job_params[key] = val |
2239 | + } |
2240 | + return job_params |
2241 | + else |
2242 | + Djinn.log(Djinn::LOG_TRACE,"zk_neptune_get_job_params","Getting the value of #{param}") |
2243 | + val = ZKInterface.get(path+"/#{param}") |
2244 | + return val |
2245 | + end |
2246 | +end |
2247 | + |
2248 | +def zk_neptune_save_node(keyname,node) |
2249 | + Djinn.log(Djinn::LOG_TRACE,"Neptune","Saving the node") |
2250 | + |
2251 | + private_ip = node.private_ip |
2252 | + public_ip = node.public_ip |
2253 | + instance_id = node.instance_id |
2254 | + cloud = node.cloud |
2255 | + ssh_key = node.ssh_key |
2256 | + c_time = node.creation_time |
2257 | + d_time = node.destruction_time |
2258 | + jobs = node.jobs.join(":") |
2259 | + |
2260 | + if c_time == nil |
2261 | + c_time = Time.mktime(EPOCH_YEAR) |
2262 | + end |
2263 | + c_time = c_time._dump |
2264 | + |
2265 | + if d_time == nil |
2266 | + d_time = Time.mktime(NEVER_DESTROY) |
2267 | + end |
2268 | + d_time = d_time._dump |
2269 | + |
2270 | + Djinn.log(Djinn::LOG_DEBUG, "zk_neptune_save_node", "#{private_ip}, #{public_ip}, #{instance_id}, #{cloud}, #{ssh_key}, #{c_time}, #{d_time}, #{jobs}") |
2271 | + |
2272 | + path = "/#{keyname}/neptune/nodes" |
2273 | + ZKInterface.ensure_path(path) |
2274 | + |
2275 | + path = "/#{keyname}/neptune/nodes/#{node.private_ip}" |
2276 | + ZKInterface.set(path,node.public_ip,ZKInterface::NOT_EPHEMERAL) |
2277 | + |
2278 | + ZKInterface.set(path+"/instance_id", instance_id,ZKInterface::NOT_EPHEMERAL) |
2279 | + ZKInterface.set(path+"/cloud", cloud,ZKInterface::NOT_EPHEMERAL) |
2280 | + ZKInterface.set(path+"/ssh_key", ssh_key,ZKInterface::NOT_EPHEMERAL) |
2281 | + ZKInterface.set(path+"/c_time", c_time.to_s,ZKInterface::NOT_EPHEMERAL) |
2282 | + ZKInterface.set(path+"/d_time", d_time.to_s,ZKInterface::NOT_EPHEMERAL) |
2283 | + ZKInterface.set(path+"/jobs", jobs,ZKInterface::NOT_EPHEMERAL) |
2284 | +end |
2285 | + |
2286 | +def zk_neptune_get_node(keyname,worker_ip) |
2287 | + Djinn.log(Djinn::LOG_TRACE,"Neptune","Getting the nodes") |
2288 | + |
2289 | + nodes = zk_get_nodes(keyname) |
2290 | + neptune_nodes = get_neptune_nodes(nodes) |
2291 | + neptune_nodes.each { |node| |
2292 | + Djinn.log(Djinn::LOG_DEBUG,"zk_neptune_get_node","ip: #{node.private_ip}") |
2293 | + return node if node.private_ip == worker_ip |
2294 | + } |
2295 | + |
2296 | + Djinn.log(Djinn::LOG_ERROR,"zk_neptune_get_node","Could not find the node") |
2297 | + return nil |
2298 | + |
2299 | + node_ips = [] |
2300 | + |
2301 | + |
2302 | + ip_path = "/#{keyname}/neptune/nodes" |
2303 | + node_ips = [] |
2304 | + if worker_ip.nil? |
2305 | + node_ips = ZKInterface.get_children(ip_path) |
2306 | + else |
2307 | + node_ips = [worker_ip] |
2308 | + end |
2309 | + |
2310 | + node_ips.each { |ip| |
2311 | + path = ip_path + "/#{ip}" |
2312 | + Djinn.log(Djinn::LOG_DEBUG, "zk_get_nodes", "IP : #{ip}") |
2313 | + |
2314 | + private_ip = ip |
2315 | + public_ip = ZKInterface.get(ip_path+"/#{ip}") |
2316 | + next if public_ip == ZKInterface::FAILURE |
2317 | + |
2318 | + instance_id = ZKInterface.get(ip_path+"/#{ip}/instance_id") |
2319 | + next if instance_id == ZKInterface::FAILURE |
2320 | + |
2321 | + cloud = ZKInterface.get(ip_path+"/#{ip}/cloud") |
2322 | + next if cloud == ZKInterface::FAILURE |
2323 | + |
2324 | + ssh_key = ZKInterface.get(ip_path+"/#{ip}/ssh_key") |
2325 | + next if ssh_key == ZKInterface::FAILURE |
2326 | + |
2327 | + c_time = ZKInterface.get(ip_path+"/#{ip}/c_time") |
2328 | + next if c_time == ZKInterface::FAILURE |
2329 | + |
2330 | + d_time = ZKInterface.get(ip_path+"/#{ip}/d_time") |
2331 | + next if d_time == ZKInterface::FAILURE |
2332 | + |
2333 | + jobs = ZKInterface.get(ip_path+"/#{ip}/jobs") |
2334 | + next if jobs == ZKInterface::FAILURE |
2335 | + |
2336 | + |
2337 | + Djinn.log(Djinn::LOG_DEBUG, "zk_neptune_get_node", "#{private_ip}, #{public_ip}, #{instance_id}, #{cloud}, #{ssh_key}, #{c_time.to_s}, #{d_time.to_s}, #{jobs}") |
2338 | + |
2339 | + if jobs.nil? |
2340 | + jobs = ["open"] |
2341 | + end |
2342 | + |
2343 | + # The constructor is wrong because the roles parameter expects the ip, cloud name, etc. along with the jobs |
2344 | + # TODO: Change the constructor? |
2345 | + node = DjinnJobData.new(jobs, keyname) |
2346 | + node.private_ip = private_ip |
2347 | + node.public_ip = public_ip |
2348 | + node.instance_id = instance_id |
2349 | + node.cloud = cloud |
2350 | + node.ssh_key = ssh_key |
2351 | + node.add_roles(jobs) |
2352 | + node.creation_time = Time._load(c_time) |
2353 | + node.destruction_time = Time._load(d_time) |
2354 | + |
2355 | + nodes << node |
2356 | + } |
2357 | + return nodes |
2358 | +end |
2359 | + |
2360 | +def zk_neptune_add_workers(keyname,job_id,nodes) |
2361 | + Djinn.log(Djinn::LOG_TRACE,"Neptune","Adding workers to #{job_id}") |
2362 | + path = "/#{keyname}/neptune_jobs/#{job_id}/workers" |
2363 | + ZKInterface.ensure_path(path) |
2364 | + nodes.each { |worker| |
2365 | + ZKInterface.set(path+"/worker_",worker.private_ip,ZKInterface::EPHEMERAL,ZKInterface::SEQUENCE) |
2366 | + } |
2367 | +end |
2368 | + |
2369 | +def zk_neptune_get_workers(keyname,job_id) |
2370 | + Djinn.log(Djinn::LOG_TRACE,"Neptune","Getting the workers of the job #{job_id}") |
2371 | + worker_path = "/#{keyname}/neptune_jobs/#{job_id}/workers" |
2372 | + worker_ips = ZKInterface.get_children(worker_path) |
2373 | + worker_nodes = [] |
2374 | + worker_ips.each { |ip| |
2375 | + node = zk_neptune_get_node(keyname,ip) |
2376 | + worker_nodes << nodes |
2377 | + } |
2378 | + return worker_nodes |
2379 | +end |
2380 | + |
2381 | +>>>>>>> MERGE-SOURCE |
2382 | |
2383 | === modified file 'Neptune/neptune_job_data.rb' |
2384 | --- Neptune/neptune_job_data.rb 2011-12-07 02:54:53 +0000 |
2385 | +++ Neptune/neptune_job_data.rb 2012-02-28 20:21:24 +0000 |
2386 | @@ -14,14 +14,26 @@ |
2387 | |
2388 | |
2389 | class NeptuneJobData |
2390 | +<<<<<<< TREE |
2391 | attr_accessor :name, :num_nodes, :start_time, :end_time, :instance_type |
2392 | +======= |
2393 | + attr_accessor :name, :num_nodes, :start_time, :end_time, :job_id |
2394 | +>>>>>>> MERGE-SOURCE |
2395 | |
2396 | +<<<<<<< TREE |
2397 | def initialize(name, num_nodes, start_time, end_time, instance_type) |
2398 | +======= |
2399 | + def initialize(name, num_nodes, start_time, end_time, job_id) |
2400 | +>>>>>>> MERGE-SOURCE |
2401 | @name = name |
2402 | @num_nodes = num_nodes |
2403 | @start_time = start_time |
2404 | @end_time = end_time |
2405 | +<<<<<<< TREE |
2406 | @instance_type = instance_type |
2407 | +======= |
2408 | + @job_id = job_id |
2409 | +>>>>>>> MERGE-SOURCE |
2410 | end |
2411 | |
2412 | def total_time |
2413 | @@ -38,6 +50,7 @@ |
2414 | # Returns a string version of this object's info. Since to_hash already does |
2415 | # this in hash form, just use that and return it as a String. |
2416 | def to_s |
2417 | +<<<<<<< TREE |
2418 | return to_hash.inspect |
2419 | end |
2420 | |
2421 | @@ -61,5 +74,18 @@ |
2422 | instance_type = data["instance_type"] |
2423 | return NeptuneJobData.new(name, num_nodes, start_time, end_time, |
2424 | instance_type) |
2425 | +======= |
2426 | + "#{@name}::#{@num_nodes}::#{Base64.encode64(@start_time._dump).chomp}::#{Base64.encode64(@end_time._dump).chomp}::#{total_time}::#{cost}::#{@job_id}" |
2427 | + end |
2428 | + |
2429 | + def self.from_s(str) |
2430 | + splitted = str.split("::") |
2431 | + name = splitted[0] |
2432 | + num_nodes = Integer(splitted[1]) |
2433 | + start_time = Time._load(Base64.decode64(splitted[2])) |
2434 | + end_time = Time._load(Base64.decode64(splitted[3])) |
2435 | + job_id = splitted[6] |
2436 | + return NeptuneJobData.new(name, num_nodes, start_time, end_time,job_id) |
2437 | +>>>>>>> MERGE-SOURCE |
2438 | end |
2439 | end |