Merge lp:~mirsad-vojnikovic-deactivatedaccount/lava-dispatcher/scheduler-daemon into lp:lava-dispatcher

Proposed by Mirsad Vojnikovic
Status: Superseded
Proposed branch: lp:~mirsad-vojnikovic-deactivatedaccount/lava-dispatcher/scheduler-daemon
Merge into: lp:lava-dispatcher
Diff against target: 332 lines (+311/-0)
4 files modified
lava-schedulerd (+100/-0)
scheduler/daemon/async_job.py (+33/-0)
scheduler/daemon/daemon.py (+133/-0)
scheduler/daemon/helpers.py (+45/-0)
To merge this branch: bzr merge lp:~mirsad-vojnikovic-deactivatedaccount/lava-dispatcher/scheduler-daemon
Reviewer                  Review Type   Date Requested   Status
Linaro Validation Team                                   Pending
Review via email: mp+55240@code.launchpad.net

This proposal has been superseded by a proposal from 2011-04-05.

Description of the change

Here is a daemon proposal, a very modest one. It takes one job definition from the TestJob table and executes it in a dispatcher class I created for testing purposes. This dispatcher class does nothing special: it only prints the action names and waits 10 seconds before moving on. It should be fairly easy to modify to do the real job. I think this is a good starting point for the daemon work ahead of us.

The next step is to run multiple jobs as threads while updating the status of the device and the test job.
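
A minimal sketch of that next step, written for Python 3 and assuming an in-memory stand-in for the TestJob and Device tables. StatusBoard and run_in_thread are hypothetical names, not part of this branch; the status codes follow the comments in the preview diff (job: 1 = RUNNING, 2 = COMPLETE; device: 1 = IDLE, 2 = RUNNING).

```python
import threading

# Status codes taken from the comments in the preview diff.
JOB_RUNNING, JOB_COMPLETE = 1, 2
DEVICE_IDLE, DEVICE_RUNNING = 1, 2


class StatusBoard(object):
    """Hypothetical in-memory stand-in for the TestJob and Device tables."""

    def __init__(self):
        self._lock = threading.Lock()
        self.jobs = {}
        self.devices = {}

    def set_job(self, job_id, status):
        with self._lock:
            self.jobs[job_id] = status

    def set_device(self, device_id, status):
        with self._lock:
            self.devices[device_id] = status


def run_in_thread(board, job_id, device_id, work):
    """Mark the job and device as running, then run work() in a thread."""

    def target():
        try:
            work()
        finally:
            # This sketch marks the job COMPLETE even if work() raised;
            # a real scheduler would probably record failures separately.
            board.set_job(job_id, JOB_COMPLETE)
            board.set_device(device_id, DEVICE_IDLE)

    board.set_device(device_id, DEVICE_RUNNING)
    board.set_job(job_id, JOB_RUNNING)
    thread = threading.Thread(target=target)
    thread.start()
    return thread


# Usage: three jobs on three devices, each doing no real work.
board = StatusBoard()
threads = [run_in_thread(board, n, n + 100, lambda: None) for n in range(3)]
for thread in threads:
    thread.join()
```

The lock only protects the in-memory dictionaries; with a real database, each thread would need its own connection or some other form of serialized access.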

Mirsad Vojnikovic (mirsad-vojnikovic-deactivatedaccount) wrote :

Sorry, forgot to mention how to run the daemon:

 * 'schedulerd.py start' - start
 * 'schedulerd.py stop' - stop
 * 'schedulerd.py debug' - debug mode, printing messages to terminal
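
The three commands above can be sketched as a small dispatch table. This is an illustration in Python 3 only: RecordingDaemon and dispatch are hypothetical names, and the real script may handle its arguments differently.

```python
class RecordingDaemon(object):
    """Hypothetical stand-in for the daemon; it only records which method ran."""

    def __init__(self):
        self.calls = []

    def start(self):
        self.calls.append("start")

    def stop(self):
        self.calls.append("stop")

    def run(self):
        self.calls.append("run")


def dispatch(daemon, argv):
    """Map the command word to a daemon method and return an exit status."""
    commands = {
        "start": daemon.start,
        "stop": daemon.stop,
        # Debug mode calls run() directly, so messages go to the terminal.
        "debug": daemon.run,
    }
    if len(argv) != 2 or argv[1] not in commands:
        return 2
    commands[argv[1]]()
    return 0


# Usage: one valid command and one unknown command.
daemon = RecordingDaemon()
status_ok = dispatch(daemon, ["schedulerd.py", "debug"])
status_bad = dispatch(daemon, ["schedulerd.py", "bogus"])
```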

Spring Zhang (qzhang) wrote :

110 + message = "pidfile %s does not exist. Daemon not running?\n"
111 + sys.stderr.write(message % self.pidfile)
It would be better to build the message on one line; as written, the message looks split. For example:
message = "pidfile %s does not exist. Daemon not running?\n" % self.pidfile
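
A minimal sketch of this suggestion, runnable on its own. The helper name missing_pidfile_message is hypothetical and not part of the branch.

```python
import sys


def missing_pidfile_message(pidfile):
    # Format the whole message in one expression so the template and its
    # argument stay together.
    return "pidfile %s does not exist. Daemon not running?\n" % pidfile


# Usage: the pidfile path is the one used in the preview diff.
message = missing_pidfile_message("/tmp/scheduler-daemon.pid")
sys.stderr.write(message)
```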

Your indentation appears to use 8 spaces (or a tab); I recall the convention being 4 spaces. Can Paul confirm?

155 + self.connection = sqlite3.connect('./scheduler/database.db')
Could './scheduler/database.db' be defined as a constant rather than hard-coded in the function?

71 + file(self.pidfile,'w+').write("%s\n" % pid)
Doesn't the file object need to be closed?
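
One way to address this, sketched in Python 3 with open() in place of the Python 2 file() builtin used in the branch. The helper name write_pidfile is hypothetical.

```python
import os
import tempfile


def write_pidfile(pidfile, pid):
    # The with-statement closes the file even if write() raises.
    with open(pidfile, "w") as handle:
        handle.write("%s\n" % pid)


# Usage: write the current PID to a temporary location and read it back.
path = os.path.join(tempfile.mkdtemp(), "scheduler-daemon.pid")
write_pidfile(path, os.getpid())
with open(path) as handle:
    stored_pid = int(handle.read().strip())
```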

150 + def __init__(self):
151 + self.connection = None
152 + self.cursor = None
153 +
154 + def connect_db(self):
155 + self.connection = sqlite3.connect('./scheduler/database.db')
156 + self.cursor = self.connection.cursor()
Could these two methods be merged into __init__()?

207 + db_helper = DbHelper()
Also, could the caller pass the database name as a parameter rather than having it specified inside the DbHelper class?
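
The three DbHelper suggestions (a named constant, connecting in __init__(), and a caller-supplied database name) could be combined roughly as follows. This is a sketch under those assumptions, not the branch's implementation; DEFAULT_DB_PATH and the close() method name are hypothetical.

```python
import sqlite3

# Hypothetical constant; the path itself is the one quoted in the review.
DEFAULT_DB_PATH = "./scheduler/database.db"


class DbHelper(object):
    def __init__(self, db_path=DEFAULT_DB_PATH):
        # Connect on construction so connection and cursor are never None.
        self.connection = sqlite3.connect(db_path)
        self.connection.row_factory = sqlite3.Row
        self.cursor = self.connection.cursor()

    def close(self):
        self.connection.close()


# Usage: an in-memory database keeps the example self-contained.
db_helper = DbHelper(":memory:")
db_helper.cursor.execute("select 1 as answer")
answer = db_helper.cursor.fetchone()["answer"]
db_helper.close()
```

Connecting in __init__() means a caller that wants a fresh connection per polling cycle has to create a new DbHelper each time, so this trades some flexibility for a simpler object lifecycle.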

188 + #params = cmd.get('parameters', {})
189 + #step = lava_commands[cmd['command']](target)
190 + #step.run(**params)
191 +
192 +
Please delete the unused statements.

You can use the dispatcher.py from lp:lava, just merge from lp:lava in your branch.

Mirsad Vojnikovic (mirsad-vojnikovic-deactivatedaccount) wrote :

Spring, thanks for your comments, but I have decided to drop this proposal since it lacks some important stuff. I will try to reuse your comments for the next proposal instead.

22. By Paul Larson

add a dispatcher.run_job() to actually do the work, so that it can be
called from the outside by things like the scheduler

23. By Paul Larson

Use virtualenv to install the python dependencies and reorganize
entrypoints a bit.

24. By Paul Larson

enable login with openid

25. By Paul Larson

Hi, this branch enables the admin interface for the scheduler
application. Creating a superuser for yourself locally when using OpenID
is a bit tedious, but I've included a helper and instructions how to do
it in the README.

26. By Paul Larson

update top level structure of lava

27. By Paul Larson

Merge initial support for submitting results to launch control

28. By Michael Hudson-Doyle

small README updates

29. By Michael Hudson-Doyle

fix ROOT_URLCONF

30. By Michael Hudson-Doyle

some more clues for doc/QUICKSTART

31. By Paul Larson

allow graphical tests to run properly by exporting the display on the
client

32. By Michael Hudson-Doyle

mention some packages that need to be installed

33. By Paul Larson

Fix make to work under lucid too

34. By Michael Hudson-Doyle

borrow style and templates from launch-control

merges ~mirsad-vojnikovic/lava/scheduler-l-c-style with a tweak from me

35. By Michael Hudson-Doyle

this seems to be needed

36. By Paul Larson

results need to be searched for in /tmp

37. By Mirsad <mirsad@ubuntu1004desktop>

Updated daemon for latest comments. Added tags field to Device table. Changed to GPL2+

Unmerged revisions

37. By Mirsad <mirsad@ubuntu1004desktop>

Updated daemon for latest comments. Added tags field to Device table. Changed to GPL2+

Preview Diff

=== added file 'lava-schedulerd'
--- lava-schedulerd 1970-01-01 00:00:00 +0000
+++ lava-schedulerd 2011-04-05 21:11:05 +0000
@@ -0,0 +1,100 @@
+#!/usr/bin/env python
+
+import sys, time, json
+from scheduler.daemon.daemon import Daemon
+from scheduler.daemon.helpers import DbHelper, TestResult
+from scheduler.daemon.async_job import AsyncJob
+from dispatcher import run_job
+
+class SchedulerDaemon(Daemon):
+    def run(self):
+        db_helper = DbHelper()
+
+        while True:
+            #Pick up next job
+            db_helper.connect_db()
+            next_job = db_helper.get_next_job()
+
+            if next_job != None:
+                #Prepare TestResult object
+                test_result = TestResult(
+                    job_id = next_job['id'],
+                    device_id = next_job['target_id'],
+                    result = 0
+                )
+
+                #Initiate async job
+                job = AsyncJob(
+                    target = worker,
+                    callback = callback,
+                    timeout = next_job['timeout'],
+                    timeout_callback = timeout_callback,
+                    test_result = test_result,
+                    args = (next_job['definition'],)
+                )
+
+                #Set device to RUNNING state
+                db_helper.set_device_state(next_job['target_id'], 2)
+                #Set job to RUNNING state
+                db_helper.set_job_state(next_job['id'], 1)
+
+                #Start job
+                job.start()
+
+            db_helper.disconnect_db()
+            time.sleep(1)
+
+def timeout_callback():
+    """
+    Callback to handle job timeout
+    """
+    #What to do here?
+    #Update job, device status
+    pass
+
+def callback(test_result=None):
+    """
+    Callback to handle normal job ending
+    """
+    #Connect to database
+    db_helper = DbHelper()
+    db_helper.connect_db()
+
+    #Set device to IDLE state
+    db_helper.set_device_state(test_result.device_id, 1)
+    #Set job to COMPLETE state
+    db_helper.set_job_state(test_result.job_id, 2)
+
+    #Disconnect from database
+    db_helper.disconnect_db()
+
+def worker(next_job, q, test_result):
+    """
+    Worker function running jobs
+    """
+    #Missing result from dispatcher here
+    run_job(json.loads(next_job))
+
+    test_result.result = 1
+    q.put(test_result)
+
+
+if __name__ == "__main__":
+    daemon = SchedulerDaemon('/tmp/scheduler-daemon.pid')
+    if len(sys.argv) == 2:
+        if 'start' == sys.argv[1]:
+            daemon.start()
+        elif 'stop' == sys.argv[1]:
+            daemon.stop()
+        elif 'restart' == sys.argv[1]:
+            daemon.restart()
+        elif 'debug' == sys.argv[1]:
+            #Print to terminal, debug purpose only!
+            daemon.run()
+        else:
+            print "Unknown command"
+            sys.exit(2)
+        sys.exit(0)
+    else:
+        print "usage: %s start|stop|restart" % sys.argv[0]
+        sys.exit(2)
=== added directory 'scheduler/daemon'
=== added file 'scheduler/daemon/__init__.py'
=== added file 'scheduler/daemon/async_job.py'
--- scheduler/daemon/async_job.py 1970-01-01 00:00:00 +0000
+++ scheduler/daemon/async_job.py 2011-04-05 21:11:05 +0000
@@ -0,0 +1,33 @@
+from multiprocessing import Queue, Process
+
+class AsyncJob(object):
+    def __init__(self, target=None, callback=None, timeout=None,
+            timeout_callback=None, test_result=None, args=(), kwargs={}):
+
+        self.target = target
+        self.callback = callback
+        self.timeout = timeout
+        self.timeout_callback = timeout_callback
+        self.test_result = test_result
+        self.args = args
+        self.kwargs = kwargs
+
+    def start(self):
+        self.proc = Process(target=self._proxy)
+        self.proc.start()
+
+    def _proxy(self):
+        # Get result from worker via Queue
+        q = Queue()
+
+        p = Process(target=self.target, args=(self.args[0], q, self.test_result))
+        p.start()
+        p.join(timeout=self.timeout)
+
+        if p.is_alive():
+            p.terminate()
+            if self.timeout_callback:
+                self.timeout_callback()
+        else:
+            if self.callback:
+                self.callback(q.get())
=== added file 'scheduler/daemon/daemon.py'
--- scheduler/daemon/daemon.py 1970-01-01 00:00:00 +0000
+++ scheduler/daemon/daemon.py 2011-04-05 21:11:05 +0000
@@ -0,0 +1,133 @@
+#!/usr/bin/env python
+"""
+Python daemon from:
+http://www.jejik.com/articles/2007/02/a_simple_unix_linux_daemon_in_python/
+"""
+
+import sys, os, time, atexit
+from signal import SIGTERM
+
+class Daemon:
+    """
+    A generic daemon class.
+
+    Usage: subclass the Daemon class and override the run() method
+    """
+    def __init__(self, pidfile, stdin='/dev/null', stdout='/dev/null', stderr='/dev/null'):
+        self.stdin = stdin
+        self.stdout = stdout
+        self.stderr = stderr
+        self.pidfile = pidfile
+
+    def daemonize(self):
+        """
+        do the UNIX double-fork magic, see Stevens' "Advanced
+        Programming in the UNIX Environment" for details (ISBN 0201563177)
+        http://www.erlenstar.demon.co.uk/unix/faq_2.html#SEC16
+        """
+        try:
+            pid = os.fork()
+            if pid > 0:
+                # exit first parent
+                sys.exit(0)
+        except OSError, e:
+            sys.stderr.write("fork #1 failed: %d (%s)\n" % (e.errno, e.strerror))
+            sys.exit(1)
+
+        # decouple from parent environment
+        os.chdir("/")
+        os.setsid()
+        os.umask(0)
+
+        # do second fork
+        try:
+            pid = os.fork()
+            if pid > 0:
+                # exit from second parent
+                sys.exit(0)
+        except OSError, e:
+            sys.stderr.write("fork #2 failed: %d (%s)\n" % (e.errno, e.strerror))
+            sys.exit(1)
+
+        # redirect standard file descriptors
+        sys.stdout.flush()
+        sys.stderr.flush()
+        si = file(self.stdin, 'r')
+        so = file(self.stdout, 'a+')
+        se = file(self.stderr, 'a+', 0)
+        os.dup2(si.fileno(), sys.stdin.fileno())
+        os.dup2(so.fileno(), sys.stdout.fileno())
+        os.dup2(se.fileno(), sys.stderr.fileno())
+
+        # write pidfile
+        atexit.register(self.delpid)
+        pid = str(os.getpid())
+        file(self.pidfile,'w+').write("%s\n" % pid)
+
+    def delpid(self):
+        os.remove(self.pidfile)
+
+    def start(self):
+        """
+        Start the daemon
+        """
+        # Check for a pidfile to see if the daemon already runs
+        try:
+            pf = file(self.pidfile,'r')
+            pid = int(pf.read().strip())
+            pf.close()
+        except IOError:
+            pid = None
+
+        if pid:
+            message = "pidfile %s already exist. Daemon already running?\n"
+            sys.stderr.write(message % self.pidfile)
+            sys.exit(1)
+
+        # Start the daemon
+        self.daemonize()
+        self.run()
+
+    def stop(self):
+        """
+        Stop the daemon
+        """
+        # Get the pid from the pidfile
+        try:
+            pf = file(self.pidfile,'r')
+            pid = int(pf.read().strip())
+            pf.close()
+        except IOError:
+            pid = None
+
+        if not pid:
+            message = "pidfile %s does not exist. Daemon not running?\n"
+            sys.stderr.write(message % self.pidfile)
+            return # not an error in a restart
+
+        # Try killing the daemon process
+        try:
+            while 1:
+                os.kill(pid, SIGTERM)
+                time.sleep(0.1)
+        except OSError, err:
+            err = str(err)
+            if err.find("No such process") > 0:
+                if os.path.exists(self.pidfile):
+                    os.remove(self.pidfile)
+            else:
+                print str(err)
+                sys.exit(1)
+
+    def restart(self):
+        """
+        Restart the daemon
+        """
+        self.stop()
+        self.start()
+
+    def run(self):
+        """
+        You should override this method when you subclass Daemon. It will be called after the process has been
+        daemonized by start() or restart().
+        """
=== added file 'scheduler/daemon/helpers.py'
--- scheduler/daemon/helpers.py 1970-01-01 00:00:00 +0000
+++ scheduler/daemon/helpers.py 2011-04-05 21:11:05 +0000
@@ -0,0 +1,45 @@
+import sys
+import sqlite3
+
+class TestResult(object):
+    def __init__(self, job_id=0, device_id=0, result=0):
+        self.job_id = job_id
+        self.device_id = device_id
+        self.result = result
+
+    def __str__(self):
+        return 'Job ID: %s, Device ID: %s' % (self.job_id, self.device_id)
+
+class DbHelper():
+
+    def __init__(self):
+        self.connection = None
+        self.cursor = None
+
+    def connect_db(self):
+        self.connection = sqlite3.connect('./scheduler/database.db')
+        self.connection.row_factory = sqlite3.Row
+        self.cursor = self.connection.cursor()
+
+    def disconnect_db(self):
+        self.connection.close()
+
+    def get_next_job(self):
+        # sql = """select * from scheduler_app_testjob where status = 0 and submit_time =
+        # (select min(submit_time) from scheduler_app_testjob where status = 0)"""
+        sql = """select * from scheduler_app_testjob where status = 0 and submit_time =
+            (select min(submit_time) from scheduler_app_testjob where exists
+            (select * from scheduler_app_device where id=scheduler_app_testjob.target_id
+            and scheduler_app_device.status=1) and status = 0)"""
+        self.cursor.execute(sql)
+        return self.cursor.fetchone()
+
+    def set_job_state(self, job_id, status):
+        sql = "update scheduler_app_testjob set status = ? where id = ?"
+        self.cursor.execute(sql, (status, job_id))
+        self.connection.commit()
+
+    def set_device_state(self, device_id, status):
+        sql = "update scheduler_app_device set status = ? where id = ?"
+        self.cursor.execute(sql, (status, device_id))
+        self.connection.commit()
