Merge lp:~stylesen/lava-scheduler/review-take-1 into lp:lava-scheduler/multinode

Proposed by Senthil Kumaran S
Status: Merged
Approved by: Neil Williams
Approved revision: 287
Merged at revision: 285
Proposed branch: lp:~stylesen/lava-scheduler/review-take-1
Merge into: lp:lava-scheduler/multinode
Diff against target: 1036 lines (+293/-550)
11 files modified
lava_scheduler_app/api.py (+2/-2)
lava_scheduler_app/management/commands/schedulermonitor.py (+1/-1)
lava_scheduler_app/models.py (+8/-1)
lava_scheduler_app/templates/lava_scheduler_app/job_sidebar.html (+2/-2)
lava_scheduler_app/utils.py (+50/-49)
lava_scheduler_app/views.py (+2/-2)
lava_scheduler_daemon/board.py (+0/-355)
lava_scheduler_daemon/dbjobsource.py (+2/-76)
lava_scheduler_daemon/job.py (+204/-6)
lava_scheduler_daemon/service.py (+21/-55)
lava_scheduler_daemon/tests/test_board.py (+1/-1)
To merge this branch: bzr merge lp:~stylesen/lava-scheduler/review-take-1
Reviewer: Neil Williams
Review status: Approve
Review via email: mp+182635@code.launchpad.net

Description of the change

Addressed all review comments from Antonio and removed the legacy code that is no longer required. In particular: TestJob gains an is_multinode property that replaces scattered target_group checks; the obsolete per-board polling code (Board, BoardSet, getBoardList and getJobForBoard) is deleted; the Job and MonitorJob classes move from lava_scheduler_daemon/board.py into lava_scheduler_daemon/job.py; and NewJob is renamed to JobRunner.
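
Below is a condensed, dependency-free sketch of the is_multinode pattern introduced here. The TestJob class is a hypothetical stand-in for the real Django model in lava_scheduler_app/models.py; only the property itself and the way call sites use it follow the diff.

    # Hypothetical stand-in for the Django model; the real TestJob has many
    # more fields. target_group names the multinode group and is empty/None
    # for single-node jobs.
    class TestJob(object):
        def __init__(self, target_group=None):
            self.target_group = target_group

        @property
        def is_multinode(self):
            # Same truth table as the diff's if/else, written compactly.
            return bool(self.target_group)

    # Call sites (api.py, views.py, the job_sidebar template) now read
    # job.is_multinode instead of testing job.target_group directly:
    assert TestJob(target_group="some-uuid").is_multinode
    assert not TestJob().is_multinode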

Revision history for this message
Neil Williams (codehelp) wrote:

Thanks Senthil - tested on multinode.v.l.o & approved.

review: Approve

Preview Diff

1=== modified file 'lava_scheduler_app/api.py'
2--- lava_scheduler_app/api.py 2013-08-16 09:21:16 +0000
3+++ lava_scheduler_app/api.py 2013-08-28 13:17:52 +0000
4@@ -49,7 +49,7 @@
5 job = TestJob.objects.accessible_by_principal(self.user).get(pk=job_id)
6 except TestJob.DoesNotExist:
7 raise xmlrpclib.Fault(404, "Specified job not found.")
8- if job.target_group:
9+ if job.is_multinode:
10 return self.submit_job(job.multinode_definition)
11 else:
12 return self.submit_job(job.definition)
13@@ -60,7 +60,7 @@
14 job = TestJob.objects.get(pk=job_id)
15 if not job.can_cancel(self.user):
16 raise xmlrpclib.Fault(403, "Permission denied.")
17- if job.target_group:
18+ if job.is_multinode:
19 multinode_jobs = TestJob.objects.all().filter(
20 target_group=job.target_group)
21 for multinode_job in multinode_jobs:
22
23=== modified file 'lava_scheduler_app/management/commands/schedulermonitor.py'
24--- lava_scheduler_app/management/commands/schedulermonitor.py 2012-12-03 05:03:38 +0000
25+++ lava_scheduler_app/management/commands/schedulermonitor.py 2013-08-28 13:17:52 +0000
26@@ -31,7 +31,7 @@
27
28 def handle(self, *args, **options):
29 from twisted.internet import reactor
30- from lava_scheduler_daemon.board import Job
31+ from lava_scheduler_daemon.job import Job
32 daemon_options = self._configure(options)
33 source = DatabaseJobSource()
34 dispatcher, board_name, json_file = args
35
36=== modified file 'lava_scheduler_app/models.py'
37--- lava_scheduler_app/models.py 2013-08-22 05:58:24 +0000
38+++ lava_scheduler_app/models.py 2013-08-28 13:17:52 +0000
39@@ -640,13 +640,20 @@
40
41 @property
42 def sub_jobs_list(self):
43- if self.target_group:
44+ if self.is_multinode:
45 jobs = TestJob.objects.filter(
46 target_group=self.target_group).order_by('id')
47 return jobs
48 else:
49 return None
50
51+ @property
52+ def is_multinode(self):
53+ if self.target_group:
54+ return True
55+ else:
56+ return False
57+
58
59 class DeviceStateTransition(models.Model):
60 created_on = models.DateTimeField(auto_now_add=True)
61
62=== modified file 'lava_scheduler_app/templates/lava_scheduler_app/job_sidebar.html'
63--- lava_scheduler_app/templates/lava_scheduler_app/job_sidebar.html 2013-08-22 05:58:24 +0000
64+++ lava_scheduler_app/templates/lava_scheduler_app/job_sidebar.html 2013-08-28 13:17:52 +0000
65@@ -63,7 +63,7 @@
66 <dt>Finished at:</dt>
67 <dd>{{ job.end_time|default:"not finished" }}</dd>
68
69- {% if job.target_group %}
70+ {% if job.is_multinode %}
71 <dt>Sub Jobs:</dt>
72 {% for subjob in job.sub_jobs_list %}
73 <dd>
74@@ -87,7 +87,7 @@
75 <li>
76 <a href="{% url lava.scheduler.job.definition job.pk %}">Definition</a>
77 </li>
78- {% if job.target_group %}
79+ {% if job.is_multinode %}
80 <li>
81 <a href="{% url lava.scheduler.job.multinode_definition job.pk %}"> Multinode Definition</a>
82 </li>
83
84=== modified file 'lava_scheduler_app/utils.py'
85--- lava_scheduler_app/utils.py 2013-08-23 09:52:50 +0000
86+++ lava_scheduler_app/utils.py 2013-08-28 13:17:52 +0000
87@@ -45,49 +45,54 @@
88 node_json = {}
89 all_nodes = {}
90 node_actions = {}
91- if "device_group" in json_jobdata:
92- # get all the roles and create node action list for each role.
93- for group in json_jobdata["device_group"]:
94- node_actions[group["role"]] = []
95-
96- # Take each action and assign it to proper roles. If roles are not
97- # specified for a specific action, then assign it to all the roles.
98- all_actions = json_jobdata["actions"]
99- for role in node_actions.keys():
100- for action in all_actions:
101- new_action = copy.deepcopy(action)
102- if 'parameters' in new_action \
103- and 'role' in new_action["parameters"]:
104- if new_action["parameters"]["role"] == role:
105- new_action["parameters"].pop('role', None)
106- node_actions[role].append(new_action)
107- else:
108+
109+ # Check if we are operating on multinode job data. Else return the job
110+ # data as it is.
111+ if "device_group" in json_jobdata and target_group:
112+ pass
113+ else:
114+ return json_jobdata
115+
116+ # get all the roles and create node action list for each role.
117+ for group in json_jobdata["device_group"]:
118+ node_actions[group["role"]] = []
119+
120+ # Take each action and assign it to proper roles. If roles are not
121+ # specified for a specific action, then assign it to all the roles.
122+ all_actions = json_jobdata["actions"]
123+ for role in node_actions.keys():
124+ for action in all_actions:
125+ new_action = copy.deepcopy(action)
126+ if 'parameters' in new_action \
127+ and 'role' in new_action["parameters"]:
128+ if new_action["parameters"]["role"] == role:
129+ new_action["parameters"].pop('role', None)
130 node_actions[role].append(new_action)
131-
132- group_count = 0
133- for clients in json_jobdata["device_group"]:
134- group_count += int(clients["count"])
135- for clients in json_jobdata["device_group"]:
136- role = str(clients["role"])
137- count = int(clients["count"])
138- node_json[role] = []
139- for c in range(0, count):
140- node_json[role].append({})
141- node_json[role][c]["timeout"] = json_jobdata["timeout"]
142- node_json[role][c]["job_name"] = json_jobdata["job_name"]
143- node_json[role][c]["tags"] = clients["tags"]
144- node_json[role][c]["group_size"] = group_count
145- node_json[role][c]["target_group"] = target_group
146- node_json[role][c]["actions"] = node_actions[role]
147-
148- node_json[role][c]["role"] = role
149- # multinode node stage 2
150- node_json[role][c]["logging_level"] = json_jobdata["logging_level"]
151- node_json[role][c]["device_type"] = clients["device_type"]
152-
153- return node_json
154-
155- return 0
156+ else:
157+ node_actions[role].append(new_action)
158+
159+ group_count = 0
160+ for clients in json_jobdata["device_group"]:
161+ group_count += int(clients["count"])
162+ for clients in json_jobdata["device_group"]:
163+ role = str(clients["role"])
164+ count = int(clients["count"])
165+ node_json[role] = []
166+ for c in range(0, count):
167+ node_json[role].append({})
168+ node_json[role][c]["timeout"] = json_jobdata["timeout"]
169+ node_json[role][c]["job_name"] = json_jobdata["job_name"]
170+ node_json[role][c]["tags"] = clients["tags"]
171+ node_json[role][c]["group_size"] = group_count
172+ node_json[role][c]["target_group"] = target_group
173+ node_json[role][c]["actions"] = node_actions[role]
174+
175+ node_json[role][c]["role"] = role
176+ # multinode node stage 2
177+ node_json[role][c]["logging_level"] = json_jobdata["logging_level"]
178+ node_json[role][c]["device_type"] = clients["device_type"]
179+
180+ return node_json
181
182
183 def requested_device_count(json_data):
184@@ -100,17 +105,13 @@
185
186 {'kvm': 1, 'qemu': 3, 'panda': 1}
187
188- If the job is not a multinode job, then return None.
189+ If the job is not a multinode job, then return an empty dictionary.
190 """
191 job_data = simplejson.loads(json_data)
192+ requested_devices = {}
193 if 'device_group' in job_data:
194- requested_devices = {}
195 for device_group in job_data['device_group']:
196 device_type = device_group['device_type']
197 count = device_group['count']
198 requested_devices[device_type] = count
199- return requested_devices
200- else:
201- # TODO: Put logic to check whether we have requested devices attached
202- # to this lava-server, even if it is a single node job?
203- return None
204+ return requested_devices
205
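
As an aside, the behavioural change to requested_device_count() above is easy to exercise in isolation. The snippet below is a hypothetical, self-contained rewrite of the new version (the real module uses simplejson; the standard json module behaves the same for this input):

    import json

    def requested_device_count(json_data):
        # Returns {device_type: count} for multinode jobs; after this
        # branch, single-node jobs yield {} instead of None.
        job_data = json.loads(json_data)
        requested_devices = {}
        if 'device_group' in job_data:
            for device_group in job_data['device_group']:
                requested_devices[device_group['device_type']] = \
                    device_group['count']
        return requested_devices

    multi = '{"device_group": [{"device_type": "kvm", "count": 2}]}'
    single = '{"actions": []}'
    print(requested_device_count(multi))   # {'kvm': 2}
    print(requested_device_count(single))  # {} (previously None)
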
206=== modified file 'lava_scheduler_app/views.py'
207--- lava_scheduler_app/views.py 2013-08-27 15:18:24 +0000
208+++ lava_scheduler_app/views.py 2013-08-28 13:17:52 +0000
209@@ -801,7 +801,7 @@
210 def job_cancel(request, pk):
211 job = get_restricted_job(request.user, pk)
212 if job.can_cancel(request.user):
213- if job.target_group:
214+ if job.is_multinode:
215 multinode_jobs = TestJob.objects.all().filter(
216 target_group=job.target_group)
217 for multinode_job in multinode_jobs:
218@@ -826,7 +826,7 @@
219 if job.can_resubmit(request.user):
220 response_data["is_authorized"] = True
221
222- if job.target_group:
223+ if job.is_multinode:
224 definition = job.multinode_definition
225 else:
226 definition = job.definition
227
228=== removed file 'lava_scheduler_daemon/board.py'
229--- lava_scheduler_daemon/board.py 2013-08-19 10:44:11 +0000
230+++ lava_scheduler_daemon/board.py 1970-01-01 00:00:00 +0000
231@@ -1,355 +0,0 @@
232-import json
233-import os
234-import signal
235-import tempfile
236-import logging
237-
238-from twisted.internet.error import ProcessDone, ProcessExitedAlready
239-from twisted.internet.protocol import ProcessProtocol
240-from twisted.internet import defer, task
241-
242-
243-def catchall_errback(logger):
244- def eb(failure):
245- logger.error(
246- '%s: %s\n%s', failure.type.__name__, failure.value,
247- failure.getTraceback())
248- return eb
249-
250-
251-class DispatcherProcessProtocol(ProcessProtocol):
252-
253- def __init__(self, deferred, job):
254- self.logger = logging.getLogger(__name__ + '.DispatcherProcessProtocol')
255- self.deferred = deferred
256- self.log_size = 0
257- self.job = job
258-
259- def childDataReceived(self, childFD, data):
260- self.log_size += len(data)
261- if self.log_size > self.job.daemon_options['LOG_FILE_SIZE_LIMIT']:
262- if not self.job._killing:
263- self.job.cancel("exceeded log size limit")
264-
265- def childConnectionLost(self, childFD):
266- self.logger.info("childConnectionLost for %s: %s",
267- self.job.board_name, childFD)
268-
269- def processExited(self, reason):
270- self.logger.info("processExited for %s: %s",
271- self.job.board_name, reason.value)
272-
273- def processEnded(self, reason):
274- self.logger.info("processEnded for %s: %s",
275- self.job.board_name, reason.value)
276- self.deferred.callback(reason.value.exitCode)
277-
278-
279-class Job(object):
280-
281- def __init__(self, job_data, dispatcher, source, board_name, reactor,
282- daemon_options):
283- self.job_data = job_data
284- self.dispatcher = dispatcher
285- self.source = source
286- self.board_name = board_name
287- self.logger = logging.getLogger(__name__ + '.Job.' + board_name)
288- self.reactor = reactor
289- self.daemon_options = daemon_options
290- self._json_file = None
291- self._source_lock = defer.DeferredLock()
292- self._checkCancel_call = task.LoopingCall(self._checkCancel)
293- self._signals = ['SIGINT', 'SIGINT', 'SIGTERM', 'SIGTERM', 'SIGKILL']
294- self._time_limit_call = None
295- self._killing = False
296- self._kill_reason = ''
297-
298- def _checkCancel(self):
299- if self._killing:
300- self.cancel()
301- else:
302- return self._source_lock.run(
303- self.source.jobCheckForCancellation,
304- self.board_name).addCallback(self._maybeCancel)
305-
306- def cancel(self, reason=None):
307- if not self._killing:
308- if reason is None:
309- reason = "killing job for unknown reason"
310- self._kill_reason = reason
311- self.logger.info(reason)
312- self._killing = True
313- if self._signals:
314- signame = self._signals.pop(0)
315- else:
316- self.logger.warning("self._signals is empty!")
317- signame = 'SIGKILL'
318- self.logger.info(
319- 'attempting to kill job with signal %s' % signame)
320- try:
321- self._protocol.transport.signalProcess(getattr(signal, signame))
322- except ProcessExitedAlready:
323- pass
324-
325- def _maybeCancel(self, cancel):
326- if cancel:
327- self.cancel("killing job by user request")
328- else:
329- logging.debug('not cancelling')
330-
331- def _time_limit_exceeded(self):
332- self._time_limit_call = None
333- self.cancel("killing job for exceeding timeout")
334-
335- def run(self):
336- d = self.source.getOutputDirForJobOnBoard(self.board_name)
337- return d.addCallback(self._run).addErrback(
338- catchall_errback(self.logger))
339-
340- def _run(self, output_dir):
341- d = defer.Deferred()
342- json_data = self.job_data
343- fd, self._json_file = tempfile.mkstemp()
344- with os.fdopen(fd, 'wb') as f:
345- json.dump(json_data, f)
346- self._protocol = DispatcherProcessProtocol(d, self)
347- self.reactor.spawnProcess(
348- self._protocol, self.dispatcher, args=[
349- self.dispatcher, self._json_file, '--output-dir', output_dir],
350- childFDs={0: 0, 1: 'r', 2: 'r'}, env=None)
351- self._checkCancel_call.start(10)
352- timeout = max(
353- json_data['timeout'], self.daemon_options['MIN_JOB_TIMEOUT'])
354- self._time_limit_call = self.reactor.callLater(
355- timeout, self._time_limit_exceeded)
356- d.addBoth(self._exited)
357- return d
358-
359- def _exited(self, exit_code):
360- self.logger.info("job finished on %s", self.job_data['target'])
361- if self._json_file is not None:
362- os.unlink(self._json_file)
363- self.logger.info("reporting job completed")
364- if self._time_limit_call is not None:
365- self._time_limit_call.cancel()
366- self._checkCancel_call.stop()
367- return self._source_lock.run(
368- self.source.jobCompleted,
369- self.board_name,
370- exit_code,
371- self._killing).addCallback(
372- lambda r: exit_code)
373-
374-
375-class SchedulerMonitorPP(ProcessProtocol):
376-
377- def __init__(self, d, board_name):
378- self.d = d
379- self.board_name = board_name
380- self.logger = logging.getLogger(__name__ + '.SchedulerMonitorPP')
381-
382- def childDataReceived(self, childFD, data):
383- self.logger.warning(
384- "scheduler monitor for %s produced output: %r on fd %s",
385- self.board_name, data, childFD)
386-
387- def processEnded(self, reason):
388- if not reason.check(ProcessDone):
389- self.logger.error(
390- "scheduler monitor for %s crashed: %s",
391- self.board_name, reason)
392- self.d.callback(None)
393-
394-
395-class MonitorJob(object):
396-
397- def __init__(self, job_data, dispatcher, source, board_name, reactor,
398- daemon_options):
399- self.logger = logging.getLogger(__name__ + '.MonitorJob')
400- self.job_data = job_data
401- self.dispatcher = dispatcher
402- self.source = source
403- self.board_name = board_name
404- self.reactor = reactor
405- self.daemon_options = daemon_options
406- self._json_file = None
407-
408- def run(self):
409- d = defer.Deferred()
410- json_data = self.job_data
411- fd, self._json_file = tempfile.mkstemp()
412- with os.fdopen(fd, 'wb') as f:
413- json.dump(json_data, f)
414-
415- childFDs = {0: 0, 1: 1, 2: 2}
416- args = [
417- 'setsid', 'lava-server', 'manage', 'schedulermonitor',
418- self.dispatcher, str(self.board_name), self._json_file,
419- '-l', self.daemon_options['LOG_LEVEL']]
420- if self.daemon_options['LOG_FILE_PATH']:
421- args.extend(['-f', self.daemon_options['LOG_FILE_PATH']])
422- childFDs = None
423- self.logger.info('executing "%s"', ' '.join(args))
424- self.reactor.spawnProcess(
425- SchedulerMonitorPP(d, self.board_name), 'setsid',
426- childFDs=childFDs, env=None, args=args)
427- d.addBoth(self._exited)
428- return d
429-
430- def _exited(self, result):
431- if self._json_file is not None:
432- os.unlink(self._json_file)
433- return result
434-
435-
436-class Board(object):
437- """
438- A board runs jobs. A board can be in four main states:
439-
440- * stopped (S)
441- * the board is not looking for or processing jobs
442- * checking (C)
443- * a call to check for a new job is in progress
444- * waiting (W)
445- * no job was found by the last call to getJobForBoard and so the board
446- is waiting for a while before calling again.
447- * running (R)
448- * a job is running (or a job has completed but the call to jobCompleted
449- on the job source has not)
450-
451- In addition, because we can't stop a job instantly nor abort a check for a
452- new job safely (because a if getJobForBoard returns a job, it has already
453- been marked as started), there are variations on the 'checking' and
454- 'running' states -- 'checking with stop requested' (C+S) and 'running with
455- stop requested' (R+S). Even this is a little simplistic as there is the
456- possibility of .start() being called before the process of stopping
457- completes, but we deal with this by deferring any actions taken by
458- .start() until the board is really stopped.
459-
460- Events that cause state transitions are:
461-
462- * start() is called. We cheat and pretend that this can only happen in
463- the stopped state by stopping first, and then move into the C state.
464-
465- * stop() is called. If we in the C or R state we move to C+S or R+S
466- resepectively. If we are in S, C+S or R+S, we stay there. If we are
467- in W, we just move straight to S.
468-
469- * getJobForBoard() returns a job. We can only be in C or C+S here, and
470- move into R or R+S respectively.
471-
472- * getJobForBoard() indicates that there is no job to perform. Again we
473- can only be in C or C+S and move into W or S respectively.
474-
475- * a job completes (i.e. the call to jobCompleted() on the source
476- returns). We can only be in R or R+S and move to C or S respectively.
477-
478- * the timer that being in state W implies expires. We move into C.
479-
480- The cheating around start means that interleaving start and stop calls may
481- not always do what you expect. So don't mess around in that way please.
482- """
483-
484- job_cls = MonitorJob
485-
486- def __init__(self, source, board_name, dispatcher, reactor, daemon_options,
487- job_cls=None):
488- self.source = source
489- self.board_name = board_name
490- self.dispatcher = dispatcher
491- self.reactor = reactor
492- self.daemon_options = daemon_options
493- if job_cls is not None:
494- self.job_cls = job_cls
495- self.running_job = None
496- self._check_call = None
497- self._stopping_deferreds = []
498- self.logger = logging.getLogger(__name__ + '.Board.' + board_name)
499- self.checking = False
500-
501- def _state_name(self):
502- if self.running_job:
503- state = "R"
504- elif self._check_call:
505- assert not self._stopping_deferreds
506- state = "W"
507- elif self.checking:
508- state = "C"
509- else:
510- assert not self._stopping_deferreds
511- state = "S"
512- if self._stopping_deferreds:
513- state += "+S"
514- return state
515-
516- def start(self):
517- self.logger.debug("start requested")
518- self.stop().addCallback(self._start)
519-
520- def _start(self, ignored):
521- self.logger.debug("starting")
522- self._stopping_deferreds = []
523- self._checkForJob()
524-
525- def stop(self):
526- self.logger.debug("stopping")
527- if self._check_call is not None:
528- self._check_call.cancel()
529- self._check_call = None
530-
531- if self.running_job is not None or self.checking:
532- self.logger.debug("job running; deferring stop")
533- self._stopping_deferreds.append(defer.Deferred())
534- return self._stopping_deferreds[-1]
535- else:
536- self.logger.debug("stopping immediately")
537- return defer.succeed(None)
538-
539- def _checkForJob(self):
540- self.logger.debug("checking for job")
541- self._check_call = None
542- self.checking = True
543- self.source.getJobForBoard(self.board_name).addCallbacks(
544- self._maybeStartJob, self._ebCheckForJob)
545-
546- def _ebCheckForJob(self, result):
547- self.logger.error(
548- '%s: %s\n%s', result.type.__name__, result.value,
549- result.getTraceback())
550- self._maybeStartJob(None)
551-
552- def _finish_stop(self):
553- self.logger.debug(
554- "calling %s deferreds returned from stop()",
555- len(self._stopping_deferreds))
556- for d in self._stopping_deferreds:
557- d.callback(None)
558- self._stopping_deferreds = []
559-
560- def _maybeStartJob(self, job_data):
561- self.checking = False
562- if job_data is None:
563- self.logger.debug("no job found")
564- if self._stopping_deferreds:
565- self._finish_stop()
566- else:
567- self._check_call = self.reactor.callLater(
568- 10, self._checkForJob)
569- return
570- self.logger.info("starting job %r", job_data)
571- self.running_job = self.job_cls(
572- job_data, self.dispatcher, self.source, self.board_name,
573- self.reactor, self.daemon_options, None)
574- d = self.running_job.run()
575- d.addCallbacks(self._cbJobFinished, self._ebJobFinished)
576-
577- def _ebJobFinished(self, result):
578- self.logger.exception(result.value)
579- self._checkForJob()
580-
581- def _cbJobFinished(self, result):
582- self.running_job = None
583- if self._stopping_deferreds:
584- self._finish_stop()
585- else:
586- self._checkForJob()
587
588=== modified file 'lava_scheduler_daemon/dbjobsource.py'
589--- lava_scheduler_daemon/dbjobsource.py 2013-08-12 12:45:13 +0000
590+++ lava_scheduler_daemon/dbjobsource.py 2013-08-28 13:17:52 +0000
591@@ -93,21 +93,6 @@
592 transaction.leave_transaction_management()
593 return self.deferToThread(wrapper, *args, **kw)
594
595- def getBoardList_impl(self):
596- self.logger.info("Checking configured devices")
597- configured_boards = [
598- x.hostname for x in dispatcher_config.get_devices()]
599- boards = []
600- for d in configured_boards:
601- self.logger.info("%s is configured" % d.hostname)
602- for d in Device.objects.all():
603- if d.hostname in configured_boards:
604- boards.append({'hostname': d.hostname})
605- return boards
606-
607- def getBoardList(self):
608- return self.deferForDB(self.getBoardList_impl)
609-
610 def _get_health_check_jobs(self):
611 """Gets the list of configured boards and checks which are the boards
612 that require health check.
613@@ -203,7 +188,7 @@
614 continue
615 if devices:
616 for d in devices:
617- self.logger.info("Checking %s" % d.hostname)
618+ self.logger.debug("Checking %s" % d.hostname)
619 if d.hostname in configured_boards:
620 if job:
621 job = self._fix_device(d, job)
622@@ -214,7 +199,7 @@
623 # target_group are assigned devices.
624 final_job_list = copy.deepcopy(job_list)
625 for job in job_list:
626- if job.target_group:
627+ if job.is_multinode:
628 multinode_jobs = TestJob.objects.all().filter(
629 target_group=job.target_group)
630
631@@ -301,65 +286,6 @@
632 else:
633 return None
634
635- def getJobForBoard_impl(self, board_name):
636- while True:
637- device = Device.objects.get(hostname=board_name)
638- if device.status != Device.IDLE:
639- return None
640- if not device.device_type.health_check_job:
641- run_health_check = False
642- elif device.health_status == Device.HEALTH_UNKNOWN:
643- run_health_check = True
644- elif device.health_status == Device.HEALTH_LOOPING:
645- run_health_check = True
646- elif not device.last_health_report_job:
647- run_health_check = True
648- else:
649- run_health_check = device.last_health_report_job.end_time < datetime.datetime.now() - datetime.timedelta(days=1)
650- if run_health_check:
651- job = self._getHealthCheckJobForBoard(device)
652- else:
653- job = self._getJobFromQueue(device)
654- if job:
655- DeviceStateTransition.objects.create(
656- created_by=None, device=device, old_state=device.status,
657- new_state=Device.RUNNING, message=None, job=job).save()
658- job.status = TestJob.RUNNING
659- job.start_time = datetime.datetime.utcnow()
660- job.actual_device = device
661- device.status = Device.RUNNING
662- shutil.rmtree(job.output_dir, ignore_errors=True)
663- device.current_job = job
664- try:
665- # The unique constraint on current_job may cause this to
666- # fail in the case of concurrent requests for different
667- # boards grabbing the same job. If there are concurrent
668- # requests for the *same* board they may both return the
669- # same job -- this is an application level bug though.
670- device.save()
671- except IntegrityError:
672- self.logger.info(
673- "job %s has been assigned to another board -- "
674- "rolling back", job.id)
675- transaction.rollback()
676- continue
677- else:
678- job.log_file.save(
679- 'job-%s.log' % job.id, ContentFile(''), save=False)
680- job.submit_token = AuthToken.objects.create(user=job.submitter)
681- job.save()
682- json_data = self._get_json_data(job)
683- transaction.commit()
684- return json_data
685- else:
686- # _getHealthCheckJobForBoard can offline the board, so commit
687- # in this branch too.
688- transaction.commit()
689- return None
690-
691- def getJobForBoard(self, board_name):
692- return self.deferForDB(self.getJobForBoard_impl, board_name)
693-
694 def getJobDetails_impl(self, job):
695 job.status = TestJob.RUNNING
696 job.start_time = datetime.datetime.utcnow()
697
698=== modified file 'lava_scheduler_daemon/job.py'
699--- lava_scheduler_daemon/job.py 2013-07-17 12:48:53 +0000
700+++ lava_scheduler_daemon/job.py 2013-08-28 13:17:52 +0000
701@@ -16,13 +16,211 @@
702 # You should have received a copy of the GNU Affero General Public License
703 # along with LAVA Scheduler. If not, see <http://www.gnu.org/licenses/>.
704
705+import json
706+import os
707+import signal
708+import tempfile
709 import logging
710
711-from twisted.internet import defer
712-from lava_scheduler_daemon.board import MonitorJob
713-
714-
715-class NewJob(object):
716+from twisted.internet.error import ProcessDone, ProcessExitedAlready
717+from twisted.internet.protocol import ProcessProtocol
718+from twisted.internet import defer, task
719+
720+
721+def catchall_errback(logger):
722+ def eb(failure):
723+ logger.error(
724+ '%s: %s\n%s', failure.type.__name__, failure.value,
725+ failure.getTraceback())
726+ return eb
727+
728+
729+class DispatcherProcessProtocol(ProcessProtocol):
730+
731+ def __init__(self, deferred, job):
732+ self.logger = logging.getLogger(__name__ + '.DispatcherProcessProtocol')
733+ self.deferred = deferred
734+ self.log_size = 0
735+ self.job = job
736+
737+ def childDataReceived(self, childFD, data):
738+ self.log_size += len(data)
739+ if self.log_size > self.job.daemon_options['LOG_FILE_SIZE_LIMIT']:
740+ if not self.job._killing:
741+ self.job.cancel("exceeded log size limit")
742+
743+ def childConnectionLost(self, childFD):
744+ self.logger.info("childConnectionLost for %s: %s",
745+ self.job.board_name, childFD)
746+
747+ def processExited(self, reason):
748+ self.logger.info("processExited for %s: %s",
749+ self.job.board_name, reason.value)
750+
751+ def processEnded(self, reason):
752+ self.logger.info("processEnded for %s: %s",
753+ self.job.board_name, reason.value)
754+ self.deferred.callback(reason.value.exitCode)
755+
756+
757+class Job(object):
758+
759+ def __init__(self, job_data, dispatcher, source, board_name, reactor,
760+ daemon_options):
761+ self.job_data = job_data
762+ self.dispatcher = dispatcher
763+ self.source = source
764+ self.board_name = board_name
765+ self.logger = logging.getLogger(__name__ + '.Job.' + board_name)
766+ self.reactor = reactor
767+ self.daemon_options = daemon_options
768+ self._json_file = None
769+ self._source_lock = defer.DeferredLock()
770+ self._checkCancel_call = task.LoopingCall(self._checkCancel)
771+ self._signals = ['SIGINT', 'SIGINT', 'SIGTERM', 'SIGTERM', 'SIGKILL']
772+ self._time_limit_call = None
773+ self._killing = False
774+ self._kill_reason = ''
775+
776+ def _checkCancel(self):
777+ if self._killing:
778+ self.cancel()
779+ else:
780+ return self._source_lock.run(
781+ self.source.jobCheckForCancellation,
782+ self.board_name).addCallback(self._maybeCancel)
783+
784+ def cancel(self, reason=None):
785+ if not self._killing:
786+ if reason is None:
787+ reason = "killing job for unknown reason"
788+ self._kill_reason = reason
789+ self.logger.info(reason)
790+ self._killing = True
791+ if self._signals:
792+ signame = self._signals.pop(0)
793+ else:
794+ self.logger.warning("self._signals is empty!")
795+ signame = 'SIGKILL'
796+ self.logger.info(
797+ 'attempting to kill job with signal %s' % signame)
798+ try:
799+ self._protocol.transport.signalProcess(getattr(signal, signame))
800+ except ProcessExitedAlready:
801+ pass
802+
803+ def _maybeCancel(self, cancel):
804+ if cancel:
805+ self.cancel("killing job by user request")
806+ else:
807+ logging.debug('not cancelling')
808+
809+ def _time_limit_exceeded(self):
810+ self._time_limit_call = None
811+ self.cancel("killing job for exceeding timeout")
812+
813+ def run(self):
814+ d = self.source.getOutputDirForJobOnBoard(self.board_name)
815+ return d.addCallback(self._run).addErrback(
816+ catchall_errback(self.logger))
817+
818+ def _run(self, output_dir):
819+ d = defer.Deferred()
820+ json_data = self.job_data
821+ fd, self._json_file = tempfile.mkstemp()
822+ with os.fdopen(fd, 'wb') as f:
823+ json.dump(json_data, f)
824+ self._protocol = DispatcherProcessProtocol(d, self)
825+ self.reactor.spawnProcess(
826+ self._protocol, self.dispatcher, args=[
827+ self.dispatcher, self._json_file, '--output-dir', output_dir],
828+ childFDs={0: 0, 1: 'r', 2: 'r'}, env=None)
829+ self._checkCancel_call.start(10)
830+ timeout = max(
831+ json_data['timeout'], self.daemon_options['MIN_JOB_TIMEOUT'])
832+ self._time_limit_call = self.reactor.callLater(
833+ timeout, self._time_limit_exceeded)
834+ d.addBoth(self._exited)
835+ return d
836+
837+ def _exited(self, exit_code):
838+ self.logger.info("job finished on %s", self.job_data['target'])
839+ if self._json_file is not None:
840+ os.unlink(self._json_file)
841+ self.logger.info("reporting job completed")
842+ if self._time_limit_call is not None:
843+ self._time_limit_call.cancel()
844+ self._checkCancel_call.stop()
845+ return self._source_lock.run(
846+ self.source.jobCompleted,
847+ self.board_name,
848+ exit_code,
849+ self._killing).addCallback(
850+ lambda r: exit_code)
851+
852+
853+class SchedulerMonitorPP(ProcessProtocol):
854+
855+ def __init__(self, d, board_name):
856+ self.d = d
857+ self.board_name = board_name
858+ self.logger = logging.getLogger(__name__ + '.SchedulerMonitorPP')
859+
860+ def childDataReceived(self, childFD, data):
861+ self.logger.warning(
862+ "scheduler monitor for %s produced output: %r on fd %s",
863+ self.board_name, data, childFD)
864+
865+ def processEnded(self, reason):
866+ if not reason.check(ProcessDone):
867+ self.logger.error(
868+ "scheduler monitor for %s crashed: %s",
869+ self.board_name, reason)
870+ self.d.callback(None)
871+
872+
873+class MonitorJob(object):
874+
875+ def __init__(self, job_data, dispatcher, source, board_name, reactor,
876+ daemon_options):
877+ self.logger = logging.getLogger(__name__ + '.MonitorJob')
878+ self.job_data = job_data
879+ self.dispatcher = dispatcher
880+ self.source = source
881+ self.board_name = board_name
882+ self.reactor = reactor
883+ self.daemon_options = daemon_options
884+ self._json_file = None
885+
886+ def run(self):
887+ d = defer.Deferred()
888+ json_data = self.job_data
889+ fd, self._json_file = tempfile.mkstemp()
890+ with os.fdopen(fd, 'wb') as f:
891+ json.dump(json_data, f)
892+
893+ childFDs = {0: 0, 1: 1, 2: 2}
894+ args = [
895+ 'setsid', 'lava-server', 'manage', 'schedulermonitor',
896+ self.dispatcher, str(self.board_name), self._json_file,
897+ '-l', self.daemon_options['LOG_LEVEL']]
898+ if self.daemon_options['LOG_FILE_PATH']:
899+ args.extend(['-f', self.daemon_options['LOG_FILE_PATH']])
900+ childFDs = None
901+ self.logger.info('executing "%s"', ' '.join(args))
902+ self.reactor.spawnProcess(
903+ SchedulerMonitorPP(d, self.board_name), 'setsid',
904+ childFDs=childFDs, env=None, args=args)
905+ d.addBoth(self._exited)
906+ return d
907+
908+ def _exited(self, result):
909+ if self._json_file is not None:
910+ os.unlink(self._json_file)
911+ return result
912+
913+
914+class JobRunner(object):
915 job_cls = MonitorJob
916
917 def __init__(self, source, job, dispatcher, reactor, daemon_options,
918@@ -39,7 +237,7 @@
919 if job_cls is not None:
920 self.job_cls = job_cls
921 self.running_job = None
922- self.logger = logging.getLogger(__name__ + '.NewJob.' + str(job.id))
923+ self.logger = logging.getLogger(__name__ + '.JobRunner.' + str(job.id))
924
925 def start(self):
926 self.logger.debug("processing job")
927
928=== modified file 'lava_scheduler_daemon/service.py'
929--- lava_scheduler_daemon/service.py 2013-07-22 12:43:45 +0000
930+++ lava_scheduler_daemon/service.py 2013-08-28 13:17:52 +0000
931@@ -1,62 +1,28 @@
932+# Copyright (C) 2013 Linaro Limited
933+#
934+# Author: Senthil Kumaran <senthil.kumaran@linaro.org>
935+#
936+# This file is part of LAVA Scheduler.
937+#
938+# LAVA Scheduler is free software: you can redistribute it and/or modify it
939+# under the terms of the GNU Affero General Public License version 3 as
940+# published by the Free Software Foundation
941+#
942+# LAVA Scheduler is distributed in the hope that it will be useful, but
943+# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
944+# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
945+# more details.
946+#
947+# You should have received a copy of the GNU Affero General Public License
948+# along with LAVA Scheduler. If not, see <http://www.gnu.org/licenses/>.
949+
950 import logging
951
952 from twisted.application.service import Service
953 from twisted.internet import defer
954 from twisted.internet.task import LoopingCall
955
956-from lava_scheduler_daemon.board import Board, catchall_errback
957-from lava_scheduler_daemon.job import NewJob
958-
959-
960-class BoardSet(Service):
961-
962- def __init__(self, source, dispatcher, reactor, daemon_options):
963- self.logger = logging.getLogger(__name__ + '.BoardSet')
964- self.source = source
965- self.boards = {}
966- self.dispatcher = dispatcher
967- self.reactor = reactor
968- self.daemon_options = daemon_options
969- self._update_boards_call = LoopingCall(self._updateBoards)
970- self._update_boards_call.clock = reactor
971-
972- def _updateBoards(self):
973- self.logger.debug("Refreshing board list")
974- return self.source.getBoardList().addCallback(
975- self._cbUpdateBoards).addErrback(catchall_errback(self.logger))
976-
977- def _cbUpdateBoards(self, board_cfgs):
978- '''board_cfgs is an array of dicts {hostname=name} '''
979- new_boards = {}
980- for board_cfg in board_cfgs:
981- board_name = board_cfg['hostname']
982-
983- if board_cfg['hostname'] in self.boards:
984- board = self.boards.pop(board_name)
985- new_boards[board_name] = board
986- else:
987- self.logger.info("Adding board: %s" % board_name)
988- new_boards[board_name] = Board(
989- self.source, board_name, self.dispatcher, self.reactor,
990- self.daemon_options)
991- new_boards[board_name].start()
992- for board in self.boards.values():
993- self.logger.info("Removing board: %s" % board.board_name)
994- board.stop()
995- self.boards = new_boards
996-
997- def startService(self):
998- self._update_boards_call.start(20)
999-
1000- def stopService(self):
1001- self._update_boards_call.stop()
1002- ds = []
1003- dead_boards = []
1004- for board in self.boards.itervalues():
1005- ds.append(board.stop().addCallback(dead_boards.append))
1006- self.logger.info(
1007- "waiting for %s boards", len(self.boards) - len(dead_boards))
1008- return defer.gatherResults(ds)
1009+from lava_scheduler_daemon.job import JobRunner, catchall_errback
1010
1011
1012 class JobQueue(Service):
1013@@ -77,8 +43,8 @@
1014
1015 def _cbCheckJobs(self, job_list):
1016 for job in job_list:
1017- new_job = NewJob(self.source, job, self.dispatcher, self.reactor,
1018- self.daemon_options)
1019+ new_job = JobRunner(self.source, job, self.dispatcher,
1020+ self.reactor, self.daemon_options)
1021 self.logger.info("Starting Job: %d " % job.id)
1022 new_job.start()
1023
1024
1025=== modified file 'lava_scheduler_daemon/tests/test_board.py'
1026--- lava_scheduler_daemon/tests/test_board.py 2013-07-17 12:48:53 +0000
1027+++ lava_scheduler_daemon/tests/test_board.py 2013-08-28 13:17:52 +0000
1028@@ -38,7 +38,7 @@
1029
1030 class TestJob(object):
1031
1032- def __init__(self, job_data, dispatcher, source, board_name, reactor, options, use_celery):
1033+ def __init__(self, job_data, dispatcher, source, board_name, reactor, options):
1034 self.json_data = job_data
1035 self.dispatcher = dispatcher
1036 self.reactor = reactor
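
To summarise the daemon-side control flow after this merge: JobQueue (service.py) polls the database source, wraps each pending job in a JobRunner (the class formerly named NewJob), and JobRunner drives a MonitorJob, which spawns "setsid lava-server manage schedulermonitor ..."; that management command now imports Job from lava_scheduler_daemon.job rather than the deleted lava_scheduler_daemon.board. The mock below is a hypothetical, Twisted-free illustration of that wiring; only the class names and the call order are taken from the diff.

    class FakeJob(object):
        # Hypothetical stand-in for a TestJob database row.
        def __init__(self, job_id):
            self.id = job_id

    class FakeJobRunner(object):
        # Stands in for lava_scheduler_daemon.job.JobRunner.
        def __init__(self, source, job, dispatcher, reactor, daemon_options):
            self.job = job

        def start(self):
            # The real JobRunner hands the job to MonitorJob, which spawns
            # the schedulermonitor command and reports completion back to
            # the database source via Twisted deferreds.
            print("Starting Job: %d" % self.job.id)

    def cb_check_jobs(job_list, source, dispatcher, reactor, daemon_options):
        # Mirrors JobQueue._cbCheckJobs from service.py in this branch.
        for job in job_list:
            FakeJobRunner(source, job, dispatcher, reactor,
                          daemon_options).start()

    cb_check_jobs([FakeJob(1), FakeJob(2)], None, 'lava-dispatch', None, {})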
