Merge lp:~mirsad-vojnikovic-deactivatedaccount/lava-dispatcher/scheduler-daemon into lp:lava-dispatcher

Proposed by Mirsad Vojnikovic
Status: Superseded
Proposed branch: lp:~mirsad-vojnikovic-deactivatedaccount/lava-dispatcher/scheduler-daemon
Merge into: lp:lava-dispatcher
Diff against target: 332 lines (+311/-0)
4 files modified
lava-schedulerd (+100/-0)
scheduler/daemon/async_job.py (+33/-0)
scheduler/daemon/daemon.py (+133/-0)
scheduler/daemon/helpers.py (+45/-0)
To merge this branch: bzr merge lp:~mirsad-vojnikovic-deactivatedaccount/lava-dispatcher/scheduler-daemon
Reviewer: Linaro Validation Team
Review status: Pending
Review via email: mp+55240@code.launchpad.net

This proposal has been superseded by a proposal from 2011-04-05.

Description of the change

Here is a daemon proposal, a very modest one. It takes one job definition from the TestJob table and executes it in a dispatcher class I created for testing purposes. This dispatcher class does nothing special: it only prints the action names and waits 10 seconds before going on. It should be rather easy to modify it to do the real job. I think this is a good starting point for the daemon work in front of us.

The next step is to run multiple jobs as threads, while updating the status of the device and the test job.
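
As a rough illustration of that next step, the sketch below runs each job in its own thread and records a status for it before and after dispatch. It is not code from this branch: the state names, run_jobs() and fake_dispatch() are hypothetical, and a plain dict stands in for the TestJob and Device tables.

```python
import threading
import time

# Hypothetical state names, chosen only for this sketch.
RUNNING, COMPLETE = "running", "complete"


def run_jobs(jobs, dispatch, states):
    """Run each job in its own thread and record its status in `states`."""
    lock = threading.Lock()

    def worker(job):
        with lock:
            states[job["id"]] = RUNNING
        dispatch(job)
        with lock:
            states[job["id"]] = COMPLETE

    threads = [threading.Thread(target=worker, args=(job,)) for job in jobs]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return states


def fake_dispatch(job):
    # Stand-in for the real dispatcher: pause briefly once per action.
    for _action in job["actions"]:
        time.sleep(0.01)


jobs = [
    {"id": 1, "actions": ["deploy", "test"]},
    {"id": 2, "actions": ["boot"]},
]
final_states = run_jobs(jobs, fake_dispatch, {})
```

In a real daemon the status updates would go to the database rather than a dict, and each thread would need its own database connection.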

Mirsad Vojnikovic (mirsad-vojnikovic-deactivatedaccount) wrote :

Sorry, forgot to mention how to run the daemon:

 * 'schedulerd.py start' - start
 * 'schedulerd.py stop' - stop
 * 'schedulerd.py debug' - debug mode, printing messages to terminal
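
A minimal sketch of how those three commands could be mapped onto a daemon object. dispatch_command() and FakeDaemon are hypothetical names used only for illustration; the branch itself uses an if/elif chain on sys.argv.

```python
def dispatch_command(daemon, argv):
    """Map a command-line argument to a daemon method; return an exit code."""
    commands = {
        "start": daemon.start,
        "stop": daemon.stop,
        "debug": daemon.run,  # run in the foreground, printing to the terminal
    }
    if len(argv) != 2 or argv[1] not in commands:
        print("usage: %s %s" % (argv[0], "|".join(sorted(commands))))
        return 2
    commands[argv[1]]()
    return 0


class FakeDaemon(object):
    """Hypothetical stand-in that records which method was called."""

    def __init__(self):
        self.calls = []

    def start(self):
        self.calls.append("start")

    def stop(self):
        self.calls.append("stop")

    def run(self):
        self.calls.append("run")


fake = FakeDaemon()
exit_code = dispatch_command(fake, ["schedulerd.py", "debug"])
```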

Spring Zhang (qzhang) wrote :

110 + message = "pidfile %s does not exist. Daemon not running?\n"
111 + sys.stderr.write(message % self.pidfile)
It would be better to build the message on one line; as written, it looks split. For example:
message = "pidfile %s does not exist. Daemon not running?\n" % self.pidfile

It seems your indentation is 8 spaces or a tab; I remember the convention being 4 spaces. Can Paul confirm?

155 + self.connection = sqlite3.connect('./scheduler/database.db')
Can './scheduler/database.db' be a named constant instead of being hardcoded in the function?

71 + file(self.pidfile,'w+').write("%s\n" % pid)
Doesn't the file object need to be closed?
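
One way to address this, sketched in Python 3 with hypothetical helper names (write_pidfile() and read_pidfile() are not part of the branch): a with-block closes the file even if the write fails.

```python
import os
import tempfile


def write_pidfile(path, pid):
    # The with-block closes the file even if write() raises.
    with open(path, "w") as pidfile:
        pidfile.write("%s\n" % pid)


def read_pidfile(path):
    """Return the pid stored in `path`, or None if the file cannot be opened."""
    try:
        with open(path) as pidfile:
            return int(pidfile.read().strip())
    except IOError:
        return None


# Usage with a temporary directory instead of /tmp/scheduler-daemon.pid.
pid_path = os.path.join(tempfile.mkdtemp(), "scheduler-daemon.pid")
write_pidfile(pid_path, os.getpid())
stored_pid = read_pidfile(pid_path)
```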

150 + def __init__(self):
151 + self.connection = None
152 + self.cursor = None
153 +
154 + def connect_db(self):
155 + self.connection = sqlite3.connect('./scheduler/database.db')
156 + self.cursor = self.connection.cursor()
Can the two methods be merged into __init__()?

207 + db_helper = DbHelper()
And can the database name be passed in by the caller as a parameter, rather than being specified inside the DbHelper class?
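
Taken together, the three DbHelper suggestions could look roughly like the sketch below. It is an illustration, not code from either branch: DEFAULT_DB_PATH and close() are hypothetical names, set_job_state() is borrowed from the later diff, and the table in the usage lines is a cut-down stand-in for the real schema.

```python
import sqlite3

# Module-level default, so the path is named in exactly one place.
DEFAULT_DB_PATH = './scheduler/database.db'


class DbHelper(object):
    def __init__(self, db_path=DEFAULT_DB_PATH):
        # Connecting here removes the separate connect_db() step.
        self.connection = sqlite3.connect(db_path)
        self.connection.row_factory = sqlite3.Row
        self.cursor = self.connection.cursor()

    def set_job_state(self, job_id, status):
        sql = "update scheduler_app_testjob set status = ? where id = ?"
        self.cursor.execute(sql, (status, job_id))
        self.connection.commit()

    def close(self):
        self.connection.close()


# Usage against an in-memory database, so no file is needed.
helper = DbHelper(':memory:')
helper.cursor.execute(
    "create table scheduler_app_testjob (id integer primary key, status integer)")
helper.cursor.execute(
    "insert into scheduler_app_testjob (id, status) values (1, 0)")
helper.set_job_state(1, 2)
row = helper.cursor.execute(
    "select status from scheduler_app_testjob where id = 1").fetchone()
job_status = row["status"]
helper.close()
```

Passing the path in also makes the helper easy to exercise in tests, as the in-memory usage shows.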

188 + #params = cmd.get('parameters', {})
189 + #step = lava_commands[cmd['command']](target)
190 + #step.run(**params)
191 +
192 +
Please delete the unused statements.

You can use dispatcher.py from lp:lava; just merge lp:lava into your branch.

Mirsad Vojnikovic (mirsad-vojnikovic-deactivatedaccount) wrote :

Spring, thanks for your comments, but I have decided to drop this proposal since it lacks some important stuff. I will try to reuse your comments for the next proposal instead.

22. By Paul Larson

add a dispatcher.run_job() to actually do the work, so that it can be
called from the outside by things like the scheduler

23. By Paul Larson

Use virtualenv to install the python dependencies and reorganize
entrypoints a bit.

24. By Paul Larson

enable login with openid

25. By Paul Larson

Hi, this branch enables the admin interface for the scheduler
application. Creating a superuser for yourself locally when using OpenID
is a bit tedious, but I've included a helper and instructions how to do
it in the README.

26. By Paul Larson

update top level structure of lava

27. By Paul Larson

Merge initial support for submitting results to launch control

28. By Michael Hudson-Doyle

small README updates

29. By Michael Hudson-Doyle

fix ROOT_URLCONF

30. By Michael Hudson-Doyle

some more clues for doc/QUICKSTART

31. By Paul Larson

allow graphical tests to run properly by exporting the display on the
client

32. By Michael Hudson-Doyle

mention some packages that need to be installed

33. By Paul Larson

Fix make to work under lucid too

34. By Michael Hudson-Doyle

borrow style and templates from launch-control

merges ~mirsad-vojnikovic/lava/scheduler-l-c-style with a tweak from me

35. By Michael Hudson-Doyle

this seems to be needed

36. By Paul Larson

results need to be searched for in /tmp

37. By Mirsad <mirsad@ubuntu1004desktop>

Updated daemon for latest comments. Added tags field to Device table. Changed to GPL2+

Unmerged revisions

37. By Mirsad <mirsad@ubuntu1004desktop>

Updated daemon for latest comments. Added tags field to Device table. Changed to GPL2+

Preview Diff

=== added file 'lava-schedulerd'
--- lava-schedulerd 1970-01-01 00:00:00 +0000
+++ lava-schedulerd 2011-04-05 21:11:05 +0000
@@ -0,0 +1,100 @@
+#!/usr/bin/env python
+
+import sys, time, json
+from scheduler.daemon.daemon import Daemon
+from scheduler.daemon.helpers import DbHelper, TestResult
+from scheduler.daemon.async_job import AsyncJob
+from dispatcher import run_job
+
+class SchedulerDaemon(Daemon):
+    def run(self):
+        db_helper = DbHelper()
+
+        while True:
+            #Pick up next job
+            db_helper.connect_db()
+            next_job = db_helper.get_next_job()
+
+            if next_job != None:
+                #Prepare TestResult object
+                test_result = TestResult(
+                    job_id = next_job['id'],
+                    device_id = next_job['target_id'],
+                    result = 0
+                )
+
+                #Initiate async job
+                job = AsyncJob(
+                    target = worker,
+                    callback = callback,
+                    timeout = next_job['timeout'],
+                    timeout_callback = timeout_callback,
+                    test_result = test_result,
+                    args = (next_job['definition'],)
+                )
+
+                #Set device to RUNNING state
+                db_helper.set_device_state(next_job['target_id'], 2)
+                #Set job to RUNNING state
+                db_helper.set_job_state(next_job['id'], 1)
+
+                #Start job
+                job.start()
+
+            db_helper.disconnect_db()
+            time.sleep(1)
+
+def timeout_callback():
+    """
+    Callback to handle job timeout
+    """
+    #What to do here?
+    #Update job, device status
+    pass
+
+def callback(test_result=None):
+    """
+    Callback to handle normal job ending
+    """
+    #Connect to database
+    db_helper = DbHelper()
+    db_helper.connect_db()
+
+    #Set device to IDLE state
+    db_helper.set_device_state(test_result.device_id, 1)
+    #Set job to COMPLETE state
+    db_helper.set_job_state(test_result.job_id, 2)
+
+    #Disconnect from database
+    db_helper.disconnect_db()
+
+def worker(next_job, q, test_result):
+    """
+    Worker function running jobs
+    """
+    #Missing result from dispatcher here
+    run_job(json.loads(next_job))
+
+    test_result.result = 1
+    q.put(test_result)
+
+
+if __name__ == "__main__":
+    daemon = SchedulerDaemon('/tmp/scheduler-daemon.pid')
+    if len(sys.argv) == 2:
+        if 'start' == sys.argv[1]:
+            daemon.start()
+        elif 'stop' == sys.argv[1]:
+            daemon.stop()
+        elif 'restart' == sys.argv[1]:
+            daemon.restart()
+        elif 'debug' == sys.argv[1]:
+            #Print to terminal, debug purpose only!
+            daemon.run()
+        else:
+            print "Unknown command"
+            sys.exit(2)
+        sys.exit(0)
+    else:
+        print "usage: %s start|stop|restart" % sys.argv[0]
+        sys.exit(2)

=== added directory 'scheduler/daemon'
=== added file 'scheduler/daemon/__init__.py'
=== added file 'scheduler/daemon/async_job.py'
--- scheduler/daemon/async_job.py 1970-01-01 00:00:00 +0000
+++ scheduler/daemon/async_job.py 2011-04-05 21:11:05 +0000
@@ -0,0 +1,33 @@
+from multiprocessing import Queue, Process
+
+class AsyncJob(object):
+    def __init__(self, target=None, callback=None, timeout=None,
+            timeout_callback=None, test_result=None, args=(), kwargs={}):
+
+        self.target = target
+        self.callback = callback
+        self.timeout = timeout
+        self.timeout_callback = timeout_callback
+        self.test_result = test_result
+        self.args = args
+        self.kwargs = kwargs
+
+    def start(self):
+        self.proc = Process(target=self._proxy)
+        self.proc.start()
+
+    def _proxy(self):
+        # Get result from worker via Queue
+        q = Queue()
+
+        p = Process(target=self.target, args=(self.args[0], q, self.test_result))
+        p.start()
+        p.join(timeout=self.timeout)
+
+        if p.is_alive():
+            p.terminate()
+            if self.timeout_callback:
+                self.timeout_callback()
+        else:
+            if self.callback:
+                self.callback(q.get())

=== added file 'scheduler/daemon/daemon.py'
--- scheduler/daemon/daemon.py 1970-01-01 00:00:00 +0000
+++ scheduler/daemon/daemon.py 2011-04-05 21:11:05 +0000
@@ -0,0 +1,133 @@
+#!/usr/bin/env python
+"""
+Python daemon from:
+http://www.jejik.com/articles/2007/02/a_simple_unix_linux_daemon_in_python/
+"""
+
+import sys, os, time, atexit
+from signal import SIGTERM
+
+class Daemon:
+    """
+    A generic daemon class.
+
+    Usage: subclass the Daemon class and override the run() method
+    """
+    def __init__(self, pidfile, stdin='/dev/null', stdout='/dev/null', stderr='/dev/null'):
+        self.stdin = stdin
+        self.stdout = stdout
+        self.stderr = stderr
+        self.pidfile = pidfile
+
+    def daemonize(self):
+        """
+        do the UNIX double-fork magic, see Stevens' "Advanced
+        Programming in the UNIX Environment" for details (ISBN 0201563177)
+        http://www.erlenstar.demon.co.uk/unix/faq_2.html#SEC16
+        """
+        try:
+            pid = os.fork()
+            if pid > 0:
+                # exit first parent
+                sys.exit(0)
+        except OSError, e:
+            sys.stderr.write("fork #1 failed: %d (%s)\n" % (e.errno, e.strerror))
+            sys.exit(1)
+
+        # decouple from parent environment
+        os.chdir("/")
+        os.setsid()
+        os.umask(0)
+
+        # do second fork
+        try:
+            pid = os.fork()
+            if pid > 0:
+                # exit from second parent
+                sys.exit(0)
+        except OSError, e:
+            sys.stderr.write("fork #2 failed: %d (%s)\n" % (e.errno, e.strerror))
+            sys.exit(1)
+
+        # redirect standard file descriptors
+        sys.stdout.flush()
+        sys.stderr.flush()
+        si = file(self.stdin, 'r')
+        so = file(self.stdout, 'a+')
+        se = file(self.stderr, 'a+', 0)
+        os.dup2(si.fileno(), sys.stdin.fileno())
+        os.dup2(so.fileno(), sys.stdout.fileno())
+        os.dup2(se.fileno(), sys.stderr.fileno())
+
+        # write pidfile
+        atexit.register(self.delpid)
+        pid = str(os.getpid())
+        file(self.pidfile,'w+').write("%s\n" % pid)
+
+    def delpid(self):
+        os.remove(self.pidfile)
+
+    def start(self):
+        """
+        Start the daemon
+        """
+        # Check for a pidfile to see if the daemon already runs
+        try:
+            pf = file(self.pidfile,'r')
+            pid = int(pf.read().strip())
+            pf.close()
+        except IOError:
+            pid = None
+
+        if pid:
+            message = "pidfile %s already exist. Daemon already running?\n"
+            sys.stderr.write(message % self.pidfile)
+            sys.exit(1)
+
+        # Start the daemon
+        self.daemonize()
+        self.run()
+
+    def stop(self):
+        """
+        Stop the daemon
+        """
+        # Get the pid from the pidfile
+        try:
+            pf = file(self.pidfile,'r')
+            pid = int(pf.read().strip())
+            pf.close()
+        except IOError:
+            pid = None
+
+        if not pid:
+            message = "pidfile %s does not exist. Daemon not running?\n"
+            sys.stderr.write(message % self.pidfile)
+            return # not an error in a restart
+
+        # Try killing the daemon process
+        try:
+            while 1:
+                os.kill(pid, SIGTERM)
+                time.sleep(0.1)
+        except OSError, err:
+            err = str(err)
+            if err.find("No such process") > 0:
+                if os.path.exists(self.pidfile):
+                    os.remove(self.pidfile)
+            else:
+                print str(err)
+                sys.exit(1)
+
+    def restart(self):
+        """
+        Restart the daemon
+        """
+        self.stop()
+        self.start()
+
+    def run(self):
+        """
+        You should override this method when you subclass Daemon. It will be called after the process has been
+        daemonized by start() or restart().
+        """

=== added file 'scheduler/daemon/helpers.py'
--- scheduler/daemon/helpers.py 1970-01-01 00:00:00 +0000
+++ scheduler/daemon/helpers.py 2011-04-05 21:11:05 +0000
@@ -0,0 +1,45 @@
+import sys
+import sqlite3
+
+class TestResult(object):
+    def __init__(self, job_id=0, device_id=0, result=0):
+        self.job_id = job_id
+        self.device_id = device_id
+        self.result = result
+
+    def __str__(self):
+        return 'Job ID: %s, Device ID: %s' % (self.job_id, self.device_id)
+
+class DbHelper():
+
+    def __init__(self):
+        self.connection = None
+        self.cursor = None
+
+    def connect_db(self):
+        self.connection = sqlite3.connect('./scheduler/database.db')
+        self.connection.row_factory = sqlite3.Row
+        self.cursor = self.connection.cursor()
+
+    def disconnect_db(self):
+        self.connection.close()
+
+    def get_next_job(self):
+        # sql = """select * from scheduler_app_testjob where status = 0 and submit_time =
+        # (select min(submit_time) from scheduler_app_testjob where status = 0)"""
+        sql = """select * from scheduler_app_testjob where status = 0 and submit_time =
+            (select min(submit_time) from scheduler_app_testjob where exists
+            (select * from scheduler_app_device where id=scheduler_app_testjob.target_id
+            and scheduler_app_device.status=1) and status = 0)"""
+        self.cursor.execute(sql)
+        return self.cursor.fetchone()
+
+    def set_job_state(self, job_id, status):
+        sql = "update scheduler_app_testjob set status = ? where id = ?"
+        self.cursor.execute(sql, (status, job_id))
+        self.connection.commit()
+
+    def set_device_state(self, device_id, status):
+        sql = "update scheduler_app_device set status = ? where id = ?"
+        self.cursor.execute(sql, (status, device_id))
+        self.connection.commit()
