Merge lp:~stylesen/lava-scheduler/review-take-1 into lp:lava-scheduler/multinode
Proposed by: Senthil Kumaran S
Status: Merged
Approved by: Neil Williams
Approved revision: 287
Merged at revision: 285
Proposed branch: lp:~stylesen/lava-scheduler/review-take-1
Merge into: lp:lava-scheduler/multinode
Diff against target: 1036 lines (+293/-550), 11 files modified
  lava_scheduler_app/api.py (+2/-2)
  lava_scheduler_app/management/commands/schedulermonitor.py (+1/-1)
  lava_scheduler_app/models.py (+8/-1)
  lava_scheduler_app/templates/lava_scheduler_app/job_sidebar.html (+2/-2)
  lava_scheduler_app/utils.py (+50/-49)
  lava_scheduler_app/views.py (+2/-2)
  lava_scheduler_daemon/board.py (+0/-355)
  lava_scheduler_daemon/dbjobsource.py (+2/-76)
  lava_scheduler_daemon/job.py (+204/-6)
  lava_scheduler_daemon/service.py (+21/-55)
  lava_scheduler_daemon/tests/test_board.py (+1/-1)
To merge this branch: bzr merge lp:~stylesen/lava-scheduler/review-take-1
Related bugs: none
Reviewer: Neil Williams (Approve)
Review via email: mp+182635@code.launchpad.net
Description of the change
Addressed all review comments from Antonio and removed all legacy code that is no longer required. In summary: a new TestJob.is_multinode property replaces direct checks of job.target_group across the API, views, templates, models and scheduler daemon; the multinode splitting helper in lava_scheduler_app/utils.py now returns the job data unchanged for single-node jobs instead of returning 0, and requested_device_count returns an empty dictionary instead of None; the legacy Board/BoardSet polling machinery in lava_scheduler_daemon/board.py is removed, with Job, MonitorJob and their process protocols moved into lava_scheduler_daemon/job.py; and NewJob is renamed to JobRunner. A sketch of the new property and its call sites follows.
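The central refactor is small but touches many call sites. The sketch below is a standalone illustration, not the actual Django model: StubJob and its example values are invented here to show the behaviour of the is_multinode property added in lava_scheduler_app/models.py and how the call sites in api.py, views.py and dbjobsource.py change.

```python
# Standalone sketch of the new TestJob.is_multinode property added by this
# branch. StubJob stands in for the Django model and is invented for this
# example.

class StubJob(object):
    def __init__(self, target_group=None):
        # On the real model, target_group is only set for jobs that belong
        # to a multinode group.
        self.target_group = target_group

    @property
    def is_multinode(self):
        # Same behaviour as the diff's if/else on self.target_group.
        if self.target_group:
            return True
        else:
            return False


single = StubJob()
multi = StubJob(target_group="example-group-id")

# Before this branch, call sites tested the field directly:
#     if job.target_group: ...
# After it, they read through the property, which keeps the truthiness
# rule in one place:
for job in (single, multi):
    if job.is_multinode:
        print("multinode job in group %s" % job.target_group)
    else:
        print("single-node job")
```

The same substitution is what keeps the template change ({% if job.is_multinode %}) and the daemon change in dbjobsource.py down to one-line edits in the diff below.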
Preview Diff
1 | === modified file 'lava_scheduler_app/api.py' |
2 | --- lava_scheduler_app/api.py 2013-08-16 09:21:16 +0000 |
3 | +++ lava_scheduler_app/api.py 2013-08-28 13:17:52 +0000 |
4 | @@ -49,7 +49,7 @@ |
5 | job = TestJob.objects.accessible_by_principal(self.user).get(pk=job_id) |
6 | except TestJob.DoesNotExist: |
7 | raise xmlrpclib.Fault(404, "Specified job not found.") |
8 | - if job.target_group: |
9 | + if job.is_multinode: |
10 | return self.submit_job(job.multinode_definition) |
11 | else: |
12 | return self.submit_job(job.definition) |
13 | @@ -60,7 +60,7 @@ |
14 | job = TestJob.objects.get(pk=job_id) |
15 | if not job.can_cancel(self.user): |
16 | raise xmlrpclib.Fault(403, "Permission denied.") |
17 | - if job.target_group: |
18 | + if job.is_multinode: |
19 | multinode_jobs = TestJob.objects.all().filter( |
20 | target_group=job.target_group) |
21 | for multinode_job in multinode_jobs: |
22 | |
23 | === modified file 'lava_scheduler_app/management/commands/schedulermonitor.py' |
24 | --- lava_scheduler_app/management/commands/schedulermonitor.py 2012-12-03 05:03:38 +0000 |
25 | +++ lava_scheduler_app/management/commands/schedulermonitor.py 2013-08-28 13:17:52 +0000 |
26 | @@ -31,7 +31,7 @@ |
27 | |
28 | def handle(self, *args, **options): |
29 | from twisted.internet import reactor |
30 | - from lava_scheduler_daemon.board import Job |
31 | + from lava_scheduler_daemon.job import Job |
32 | daemon_options = self._configure(options) |
33 | source = DatabaseJobSource() |
34 | dispatcher, board_name, json_file = args |
35 | |
36 | === modified file 'lava_scheduler_app/models.py' |
37 | --- lava_scheduler_app/models.py 2013-08-22 05:58:24 +0000 |
38 | +++ lava_scheduler_app/models.py 2013-08-28 13:17:52 +0000 |
39 | @@ -640,13 +640,20 @@ |
40 | |
41 | @property |
42 | def sub_jobs_list(self): |
43 | - if self.target_group: |
44 | + if self.is_multinode: |
45 | jobs = TestJob.objects.filter( |
46 | target_group=self.target_group).order_by('id') |
47 | return jobs |
48 | else: |
49 | return None |
50 | |
51 | + @property |
52 | + def is_multinode(self): |
53 | + if self.target_group: |
54 | + return True |
55 | + else: |
56 | + return False |
57 | + |
58 | |
59 | class DeviceStateTransition(models.Model): |
60 | created_on = models.DateTimeField(auto_now_add=True) |
61 | |
62 | === modified file 'lava_scheduler_app/templates/lava_scheduler_app/job_sidebar.html' |
63 | --- lava_scheduler_app/templates/lava_scheduler_app/job_sidebar.html 2013-08-22 05:58:24 +0000 |
64 | +++ lava_scheduler_app/templates/lava_scheduler_app/job_sidebar.html 2013-08-28 13:17:52 +0000 |
65 | @@ -63,7 +63,7 @@ |
66 | <dt>Finished at:</dt> |
67 | <dd>{{ job.end_time|default:"not finished" }}</dd> |
68 | |
69 | - {% if job.target_group %} |
70 | + {% if job.is_multinode %} |
71 | <dt>Sub Jobs:</dt> |
72 | {% for subjob in job.sub_jobs_list %} |
73 | <dd> |
74 | @@ -87,7 +87,7 @@ |
75 | <li> |
76 | <a href="{% url lava.scheduler.job.definition job.pk %}">Definition</a> |
77 | </li> |
78 | - {% if job.target_group %} |
79 | + {% if job.is_multinode %} |
80 | <li> |
81 | <a href="{% url lava.scheduler.job.multinode_definition job.pk %}"> Multinode Definition</a> |
82 | </li> |
83 | |
84 | === modified file 'lava_scheduler_app/utils.py' |
85 | --- lava_scheduler_app/utils.py 2013-08-23 09:52:50 +0000 |
86 | +++ lava_scheduler_app/utils.py 2013-08-28 13:17:52 +0000 |
87 | @@ -45,49 +45,54 @@ |
88 | node_json = {} |
89 | all_nodes = {} |
90 | node_actions = {} |
91 | - if "device_group" in json_jobdata: |
92 | - # get all the roles and create node action list for each role. |
93 | - for group in json_jobdata["device_group"]: |
94 | - node_actions[group["role"]] = [] |
95 | - |
96 | - # Take each action and assign it to proper roles. If roles are not |
97 | - # specified for a specific action, then assign it to all the roles. |
98 | - all_actions = json_jobdata["actions"] |
99 | - for role in node_actions.keys(): |
100 | - for action in all_actions: |
101 | - new_action = copy.deepcopy(action) |
102 | - if 'parameters' in new_action \ |
103 | - and 'role' in new_action["parameters"]: |
104 | - if new_action["parameters"]["role"] == role: |
105 | - new_action["parameters"].pop('role', None) |
106 | - node_actions[role].append(new_action) |
107 | - else: |
108 | + |
109 | + # Check if we are operating on multinode job data. Else return the job |
110 | + # data as it is. |
111 | + if "device_group" in json_jobdata and target_group: |
112 | + pass |
113 | + else: |
114 | + return json_jobdata |
115 | + |
116 | + # get all the roles and create node action list for each role. |
117 | + for group in json_jobdata["device_group"]: |
118 | + node_actions[group["role"]] = [] |
119 | + |
120 | + # Take each action and assign it to proper roles. If roles are not |
121 | + # specified for a specific action, then assign it to all the roles. |
122 | + all_actions = json_jobdata["actions"] |
123 | + for role in node_actions.keys(): |
124 | + for action in all_actions: |
125 | + new_action = copy.deepcopy(action) |
126 | + if 'parameters' in new_action \ |
127 | + and 'role' in new_action["parameters"]: |
128 | + if new_action["parameters"]["role"] == role: |
129 | + new_action["parameters"].pop('role', None) |
130 | node_actions[role].append(new_action) |
131 | - |
132 | - group_count = 0 |
133 | - for clients in json_jobdata["device_group"]: |
134 | - group_count += int(clients["count"]) |
135 | - for clients in json_jobdata["device_group"]: |
136 | - role = str(clients["role"]) |
137 | - count = int(clients["count"]) |
138 | - node_json[role] = [] |
139 | - for c in range(0, count): |
140 | - node_json[role].append({}) |
141 | - node_json[role][c]["timeout"] = json_jobdata["timeout"] |
142 | - node_json[role][c]["job_name"] = json_jobdata["job_name"] |
143 | - node_json[role][c]["tags"] = clients["tags"] |
144 | - node_json[role][c]["group_size"] = group_count |
145 | - node_json[role][c]["target_group"] = target_group |
146 | - node_json[role][c]["actions"] = node_actions[role] |
147 | - |
148 | - node_json[role][c]["role"] = role |
149 | - # multinode node stage 2 |
150 | - node_json[role][c]["logging_level"] = json_jobdata["logging_level"] |
151 | - node_json[role][c]["device_type"] = clients["device_type"] |
152 | - |
153 | - return node_json |
154 | - |
155 | - return 0 |
156 | + else: |
157 | + node_actions[role].append(new_action) |
158 | + |
159 | + group_count = 0 |
160 | + for clients in json_jobdata["device_group"]: |
161 | + group_count += int(clients["count"]) |
162 | + for clients in json_jobdata["device_group"]: |
163 | + role = str(clients["role"]) |
164 | + count = int(clients["count"]) |
165 | + node_json[role] = [] |
166 | + for c in range(0, count): |
167 | + node_json[role].append({}) |
168 | + node_json[role][c]["timeout"] = json_jobdata["timeout"] |
169 | + node_json[role][c]["job_name"] = json_jobdata["job_name"] |
170 | + node_json[role][c]["tags"] = clients["tags"] |
171 | + node_json[role][c]["group_size"] = group_count |
172 | + node_json[role][c]["target_group"] = target_group |
173 | + node_json[role][c]["actions"] = node_actions[role] |
174 | + |
175 | + node_json[role][c]["role"] = role |
176 | + # multinode node stage 2 |
177 | + node_json[role][c]["logging_level"] = json_jobdata["logging_level"] |
178 | + node_json[role][c]["device_type"] = clients["device_type"] |
179 | + |
180 | + return node_json |
181 | |
182 | |
183 | def requested_device_count(json_data): |
184 | @@ -100,17 +105,13 @@ |
185 | |
186 | {'kvm': 1, 'qemu': 3, 'panda': 1} |
187 | |
188 | - If the job is not a multinode job, then return None. |
189 | + If the job is not a multinode job, then return an empty dictionary. |
190 | """ |
191 | job_data = simplejson.loads(json_data) |
192 | + requested_devices = {} |
193 | if 'device_group' in job_data: |
194 | - requested_devices = {} |
195 | for device_group in job_data['device_group']: |
196 | device_type = device_group['device_type'] |
197 | count = device_group['count'] |
198 | requested_devices[device_type] = count |
199 | - return requested_devices |
200 | - else: |
201 | - # TODO: Put logic to check whether we have requested devices attached |
202 | - # to this lava-server, even if it is a single node job? |
203 | - return None |
204 | + return requested_devices |
205 | |
206 | === modified file 'lava_scheduler_app/views.py' |
207 | --- lava_scheduler_app/views.py 2013-08-27 15:18:24 +0000 |
208 | +++ lava_scheduler_app/views.py 2013-08-28 13:17:52 +0000 |
209 | @@ -801,7 +801,7 @@ |
210 | def job_cancel(request, pk): |
211 | job = get_restricted_job(request.user, pk) |
212 | if job.can_cancel(request.user): |
213 | - if job.target_group: |
214 | + if job.is_multinode: |
215 | multinode_jobs = TestJob.objects.all().filter( |
216 | target_group=job.target_group) |
217 | for multinode_job in multinode_jobs: |
218 | @@ -826,7 +826,7 @@ |
219 | if job.can_resubmit(request.user): |
220 | response_data["is_authorized"] = True |
221 | |
222 | - if job.target_group: |
223 | + if job.is_multinode: |
224 | definition = job.multinode_definition |
225 | else: |
226 | definition = job.definition |
227 | |
228 | === removed file 'lava_scheduler_daemon/board.py' |
229 | --- lava_scheduler_daemon/board.py 2013-08-19 10:44:11 +0000 |
230 | +++ lava_scheduler_daemon/board.py 1970-01-01 00:00:00 +0000 |
231 | @@ -1,355 +0,0 @@ |
232 | -import json |
233 | -import os |
234 | -import signal |
235 | -import tempfile |
236 | -import logging |
237 | - |
238 | -from twisted.internet.error import ProcessDone, ProcessExitedAlready |
239 | -from twisted.internet.protocol import ProcessProtocol |
240 | -from twisted.internet import defer, task |
241 | - |
242 | - |
243 | -def catchall_errback(logger): |
244 | - def eb(failure): |
245 | - logger.error( |
246 | - '%s: %s\n%s', failure.type.__name__, failure.value, |
247 | - failure.getTraceback()) |
248 | - return eb |
249 | - |
250 | - |
251 | -class DispatcherProcessProtocol(ProcessProtocol): |
252 | - |
253 | - def __init__(self, deferred, job): |
254 | - self.logger = logging.getLogger(__name__ + '.DispatcherProcessProtocol') |
255 | - self.deferred = deferred |
256 | - self.log_size = 0 |
257 | - self.job = job |
258 | - |
259 | - def childDataReceived(self, childFD, data): |
260 | - self.log_size += len(data) |
261 | - if self.log_size > self.job.daemon_options['LOG_FILE_SIZE_LIMIT']: |
262 | - if not self.job._killing: |
263 | - self.job.cancel("exceeded log size limit") |
264 | - |
265 | - def childConnectionLost(self, childFD): |
266 | - self.logger.info("childConnectionLost for %s: %s", |
267 | - self.job.board_name, childFD) |
268 | - |
269 | - def processExited(self, reason): |
270 | - self.logger.info("processExited for %s: %s", |
271 | - self.job.board_name, reason.value) |
272 | - |
273 | - def processEnded(self, reason): |
274 | - self.logger.info("processEnded for %s: %s", |
275 | - self.job.board_name, reason.value) |
276 | - self.deferred.callback(reason.value.exitCode) |
277 | - |
278 | - |
279 | -class Job(object): |
280 | - |
281 | - def __init__(self, job_data, dispatcher, source, board_name, reactor, |
282 | - daemon_options): |
283 | - self.job_data = job_data |
284 | - self.dispatcher = dispatcher |
285 | - self.source = source |
286 | - self.board_name = board_name |
287 | - self.logger = logging.getLogger(__name__ + '.Job.' + board_name) |
288 | - self.reactor = reactor |
289 | - self.daemon_options = daemon_options |
290 | - self._json_file = None |
291 | - self._source_lock = defer.DeferredLock() |
292 | - self._checkCancel_call = task.LoopingCall(self._checkCancel) |
293 | - self._signals = ['SIGINT', 'SIGINT', 'SIGTERM', 'SIGTERM', 'SIGKILL'] |
294 | - self._time_limit_call = None |
295 | - self._killing = False |
296 | - self._kill_reason = '' |
297 | - |
298 | - def _checkCancel(self): |
299 | - if self._killing: |
300 | - self.cancel() |
301 | - else: |
302 | - return self._source_lock.run( |
303 | - self.source.jobCheckForCancellation, |
304 | - self.board_name).addCallback(self._maybeCancel) |
305 | - |
306 | - def cancel(self, reason=None): |
307 | - if not self._killing: |
308 | - if reason is None: |
309 | - reason = "killing job for unknown reason" |
310 | - self._kill_reason = reason |
311 | - self.logger.info(reason) |
312 | - self._killing = True |
313 | - if self._signals: |
314 | - signame = self._signals.pop(0) |
315 | - else: |
316 | - self.logger.warning("self._signals is empty!") |
317 | - signame = 'SIGKILL' |
318 | - self.logger.info( |
319 | - 'attempting to kill job with signal %s' % signame) |
320 | - try: |
321 | - self._protocol.transport.signalProcess(getattr(signal, signame)) |
322 | - except ProcessExitedAlready: |
323 | - pass |
324 | - |
325 | - def _maybeCancel(self, cancel): |
326 | - if cancel: |
327 | - self.cancel("killing job by user request") |
328 | - else: |
329 | - logging.debug('not cancelling') |
330 | - |
331 | - def _time_limit_exceeded(self): |
332 | - self._time_limit_call = None |
333 | - self.cancel("killing job for exceeding timeout") |
334 | - |
335 | - def run(self): |
336 | - d = self.source.getOutputDirForJobOnBoard(self.board_name) |
337 | - return d.addCallback(self._run).addErrback( |
338 | - catchall_errback(self.logger)) |
339 | - |
340 | - def _run(self, output_dir): |
341 | - d = defer.Deferred() |
342 | - json_data = self.job_data |
343 | - fd, self._json_file = tempfile.mkstemp() |
344 | - with os.fdopen(fd, 'wb') as f: |
345 | - json.dump(json_data, f) |
346 | - self._protocol = DispatcherProcessProtocol(d, self) |
347 | - self.reactor.spawnProcess( |
348 | - self._protocol, self.dispatcher, args=[ |
349 | - self.dispatcher, self._json_file, '--output-dir', output_dir], |
350 | - childFDs={0: 0, 1: 'r', 2: 'r'}, env=None) |
351 | - self._checkCancel_call.start(10) |
352 | - timeout = max( |
353 | - json_data['timeout'], self.daemon_options['MIN_JOB_TIMEOUT']) |
354 | - self._time_limit_call = self.reactor.callLater( |
355 | - timeout, self._time_limit_exceeded) |
356 | - d.addBoth(self._exited) |
357 | - return d |
358 | - |
359 | - def _exited(self, exit_code): |
360 | - self.logger.info("job finished on %s", self.job_data['target']) |
361 | - if self._json_file is not None: |
362 | - os.unlink(self._json_file) |
363 | - self.logger.info("reporting job completed") |
364 | - if self._time_limit_call is not None: |
365 | - self._time_limit_call.cancel() |
366 | - self._checkCancel_call.stop() |
367 | - return self._source_lock.run( |
368 | - self.source.jobCompleted, |
369 | - self.board_name, |
370 | - exit_code, |
371 | - self._killing).addCallback( |
372 | - lambda r: exit_code) |
373 | - |
374 | - |
375 | -class SchedulerMonitorPP(ProcessProtocol): |
376 | - |
377 | - def __init__(self, d, board_name): |
378 | - self.d = d |
379 | - self.board_name = board_name |
380 | - self.logger = logging.getLogger(__name__ + '.SchedulerMonitorPP') |
381 | - |
382 | - def childDataReceived(self, childFD, data): |
383 | - self.logger.warning( |
384 | - "scheduler monitor for %s produced output: %r on fd %s", |
385 | - self.board_name, data, childFD) |
386 | - |
387 | - def processEnded(self, reason): |
388 | - if not reason.check(ProcessDone): |
389 | - self.logger.error( |
390 | - "scheduler monitor for %s crashed: %s", |
391 | - self.board_name, reason) |
392 | - self.d.callback(None) |
393 | - |
394 | - |
395 | -class MonitorJob(object): |
396 | - |
397 | - def __init__(self, job_data, dispatcher, source, board_name, reactor, |
398 | - daemon_options): |
399 | - self.logger = logging.getLogger(__name__ + '.MonitorJob') |
400 | - self.job_data = job_data |
401 | - self.dispatcher = dispatcher |
402 | - self.source = source |
403 | - self.board_name = board_name |
404 | - self.reactor = reactor |
405 | - self.daemon_options = daemon_options |
406 | - self._json_file = None |
407 | - |
408 | - def run(self): |
409 | - d = defer.Deferred() |
410 | - json_data = self.job_data |
411 | - fd, self._json_file = tempfile.mkstemp() |
412 | - with os.fdopen(fd, 'wb') as f: |
413 | - json.dump(json_data, f) |
414 | - |
415 | - childFDs = {0: 0, 1: 1, 2: 2} |
416 | - args = [ |
417 | - 'setsid', 'lava-server', 'manage', 'schedulermonitor', |
418 | - self.dispatcher, str(self.board_name), self._json_file, |
419 | - '-l', self.daemon_options['LOG_LEVEL']] |
420 | - if self.daemon_options['LOG_FILE_PATH']: |
421 | - args.extend(['-f', self.daemon_options['LOG_FILE_PATH']]) |
422 | - childFDs = None |
423 | - self.logger.info('executing "%s"', ' '.join(args)) |
424 | - self.reactor.spawnProcess( |
425 | - SchedulerMonitorPP(d, self.board_name), 'setsid', |
426 | - childFDs=childFDs, env=None, args=args) |
427 | - d.addBoth(self._exited) |
428 | - return d |
429 | - |
430 | - def _exited(self, result): |
431 | - if self._json_file is not None: |
432 | - os.unlink(self._json_file) |
433 | - return result |
434 | - |
435 | - |
436 | -class Board(object): |
437 | - """ |
438 | - A board runs jobs. A board can be in four main states: |
439 | - |
440 | - * stopped (S) |
441 | - * the board is not looking for or processing jobs |
442 | - * checking (C) |
443 | - * a call to check for a new job is in progress |
444 | - * waiting (W) |
445 | - * no job was found by the last call to getJobForBoard and so the board |
446 | - is waiting for a while before calling again. |
447 | - * running (R) |
448 | - * a job is running (or a job has completed but the call to jobCompleted |
449 | - on the job source has not) |
450 | - |
451 | - In addition, because we can't stop a job instantly nor abort a check for a |
452 | - new job safely (because a if getJobForBoard returns a job, it has already |
453 | - been marked as started), there are variations on the 'checking' and |
454 | - 'running' states -- 'checking with stop requested' (C+S) and 'running with |
455 | - stop requested' (R+S). Even this is a little simplistic as there is the |
456 | - possibility of .start() being called before the process of stopping |
457 | - completes, but we deal with this by deferring any actions taken by |
458 | - .start() until the board is really stopped. |
459 | - |
460 | - Events that cause state transitions are: |
461 | - |
462 | - * start() is called. We cheat and pretend that this can only happen in |
463 | - the stopped state by stopping first, and then move into the C state. |
464 | - |
465 | - * stop() is called. If we in the C or R state we move to C+S or R+S |
466 | - resepectively. If we are in S, C+S or R+S, we stay there. If we are |
467 | - in W, we just move straight to S. |
468 | - |
469 | - * getJobForBoard() returns a job. We can only be in C or C+S here, and |
470 | - move into R or R+S respectively. |
471 | - |
472 | - * getJobForBoard() indicates that there is no job to perform. Again we |
473 | - can only be in C or C+S and move into W or S respectively. |
474 | - |
475 | - * a job completes (i.e. the call to jobCompleted() on the source |
476 | - returns). We can only be in R or R+S and move to C or S respectively. |
477 | - |
478 | - * the timer that being in state W implies expires. We move into C. |
479 | - |
480 | - The cheating around start means that interleaving start and stop calls may |
481 | - not always do what you expect. So don't mess around in that way please. |
482 | - """ |
483 | - |
484 | - job_cls = MonitorJob |
485 | - |
486 | - def __init__(self, source, board_name, dispatcher, reactor, daemon_options, |
487 | - job_cls=None): |
488 | - self.source = source |
489 | - self.board_name = board_name |
490 | - self.dispatcher = dispatcher |
491 | - self.reactor = reactor |
492 | - self.daemon_options = daemon_options |
493 | - if job_cls is not None: |
494 | - self.job_cls = job_cls |
495 | - self.running_job = None |
496 | - self._check_call = None |
497 | - self._stopping_deferreds = [] |
498 | - self.logger = logging.getLogger(__name__ + '.Board.' + board_name) |
499 | - self.checking = False |
500 | - |
501 | - def _state_name(self): |
502 | - if self.running_job: |
503 | - state = "R" |
504 | - elif self._check_call: |
505 | - assert not self._stopping_deferreds |
506 | - state = "W" |
507 | - elif self.checking: |
508 | - state = "C" |
509 | - else: |
510 | - assert not self._stopping_deferreds |
511 | - state = "S" |
512 | - if self._stopping_deferreds: |
513 | - state += "+S" |
514 | - return state |
515 | - |
516 | - def start(self): |
517 | - self.logger.debug("start requested") |
518 | - self.stop().addCallback(self._start) |
519 | - |
520 | - def _start(self, ignored): |
521 | - self.logger.debug("starting") |
522 | - self._stopping_deferreds = [] |
523 | - self._checkForJob() |
524 | - |
525 | - def stop(self): |
526 | - self.logger.debug("stopping") |
527 | - if self._check_call is not None: |
528 | - self._check_call.cancel() |
529 | - self._check_call = None |
530 | - |
531 | - if self.running_job is not None or self.checking: |
532 | - self.logger.debug("job running; deferring stop") |
533 | - self._stopping_deferreds.append(defer.Deferred()) |
534 | - return self._stopping_deferreds[-1] |
535 | - else: |
536 | - self.logger.debug("stopping immediately") |
537 | - return defer.succeed(None) |
538 | - |
539 | - def _checkForJob(self): |
540 | - self.logger.debug("checking for job") |
541 | - self._check_call = None |
542 | - self.checking = True |
543 | - self.source.getJobForBoard(self.board_name).addCallbacks( |
544 | - self._maybeStartJob, self._ebCheckForJob) |
545 | - |
546 | - def _ebCheckForJob(self, result): |
547 | - self.logger.error( |
548 | - '%s: %s\n%s', result.type.__name__, result.value, |
549 | - result.getTraceback()) |
550 | - self._maybeStartJob(None) |
551 | - |
552 | - def _finish_stop(self): |
553 | - self.logger.debug( |
554 | - "calling %s deferreds returned from stop()", |
555 | - len(self._stopping_deferreds)) |
556 | - for d in self._stopping_deferreds: |
557 | - d.callback(None) |
558 | - self._stopping_deferreds = [] |
559 | - |
560 | - def _maybeStartJob(self, job_data): |
561 | - self.checking = False |
562 | - if job_data is None: |
563 | - self.logger.debug("no job found") |
564 | - if self._stopping_deferreds: |
565 | - self._finish_stop() |
566 | - else: |
567 | - self._check_call = self.reactor.callLater( |
568 | - 10, self._checkForJob) |
569 | - return |
570 | - self.logger.info("starting job %r", job_data) |
571 | - self.running_job = self.job_cls( |
572 | - job_data, self.dispatcher, self.source, self.board_name, |
573 | - self.reactor, self.daemon_options, None) |
574 | - d = self.running_job.run() |
575 | - d.addCallbacks(self._cbJobFinished, self._ebJobFinished) |
576 | - |
577 | - def _ebJobFinished(self, result): |
578 | - self.logger.exception(result.value) |
579 | - self._checkForJob() |
580 | - |
581 | - def _cbJobFinished(self, result): |
582 | - self.running_job = None |
583 | - if self._stopping_deferreds: |
584 | - self._finish_stop() |
585 | - else: |
586 | - self._checkForJob() |
587 | |
588 | === modified file 'lava_scheduler_daemon/dbjobsource.py' |
589 | --- lava_scheduler_daemon/dbjobsource.py 2013-08-12 12:45:13 +0000 |
590 | +++ lava_scheduler_daemon/dbjobsource.py 2013-08-28 13:17:52 +0000 |
591 | @@ -93,21 +93,6 @@ |
592 | transaction.leave_transaction_management() |
593 | return self.deferToThread(wrapper, *args, **kw) |
594 | |
595 | - def getBoardList_impl(self): |
596 | - self.logger.info("Checking configured devices") |
597 | - configured_boards = [ |
598 | - x.hostname for x in dispatcher_config.get_devices()] |
599 | - boards = [] |
600 | - for d in configured_boards: |
601 | - self.logger.info("%s is configured" % d.hostname) |
602 | - for d in Device.objects.all(): |
603 | - if d.hostname in configured_boards: |
604 | - boards.append({'hostname': d.hostname}) |
605 | - return boards |
606 | - |
607 | - def getBoardList(self): |
608 | - return self.deferForDB(self.getBoardList_impl) |
609 | - |
610 | def _get_health_check_jobs(self): |
611 | """Gets the list of configured boards and checks which are the boards |
612 | that require health check. |
613 | @@ -203,7 +188,7 @@ |
614 | continue |
615 | if devices: |
616 | for d in devices: |
617 | - self.logger.info("Checking %s" % d.hostname) |
618 | + self.logger.debug("Checking %s" % d.hostname) |
619 | if d.hostname in configured_boards: |
620 | if job: |
621 | job = self._fix_device(d, job) |
622 | @@ -214,7 +199,7 @@ |
623 | # target_group are assigned devices. |
624 | final_job_list = copy.deepcopy(job_list) |
625 | for job in job_list: |
626 | - if job.target_group: |
627 | + if job.is_multinode: |
628 | multinode_jobs = TestJob.objects.all().filter( |
629 | target_group=job.target_group) |
630 | |
631 | @@ -301,65 +286,6 @@ |
632 | else: |
633 | return None |
634 | |
635 | - def getJobForBoard_impl(self, board_name): |
636 | - while True: |
637 | - device = Device.objects.get(hostname=board_name) |
638 | - if device.status != Device.IDLE: |
639 | - return None |
640 | - if not device.device_type.health_check_job: |
641 | - run_health_check = False |
642 | - elif device.health_status == Device.HEALTH_UNKNOWN: |
643 | - run_health_check = True |
644 | - elif device.health_status == Device.HEALTH_LOOPING: |
645 | - run_health_check = True |
646 | - elif not device.last_health_report_job: |
647 | - run_health_check = True |
648 | - else: |
649 | - run_health_check = device.last_health_report_job.end_time < datetime.datetime.now() - datetime.timedelta(days=1) |
650 | - if run_health_check: |
651 | - job = self._getHealthCheckJobForBoard(device) |
652 | - else: |
653 | - job = self._getJobFromQueue(device) |
654 | - if job: |
655 | - DeviceStateTransition.objects.create( |
656 | - created_by=None, device=device, old_state=device.status, |
657 | - new_state=Device.RUNNING, message=None, job=job).save() |
658 | - job.status = TestJob.RUNNING |
659 | - job.start_time = datetime.datetime.utcnow() |
660 | - job.actual_device = device |
661 | - device.status = Device.RUNNING |
662 | - shutil.rmtree(job.output_dir, ignore_errors=True) |
663 | - device.current_job = job |
664 | - try: |
665 | - # The unique constraint on current_job may cause this to |
666 | - # fail in the case of concurrent requests for different |
667 | - # boards grabbing the same job. If there are concurrent |
668 | - # requests for the *same* board they may both return the |
669 | - # same job -- this is an application level bug though. |
670 | - device.save() |
671 | - except IntegrityError: |
672 | - self.logger.info( |
673 | - "job %s has been assigned to another board -- " |
674 | - "rolling back", job.id) |
675 | - transaction.rollback() |
676 | - continue |
677 | - else: |
678 | - job.log_file.save( |
679 | - 'job-%s.log' % job.id, ContentFile(''), save=False) |
680 | - job.submit_token = AuthToken.objects.create(user=job.submitter) |
681 | - job.save() |
682 | - json_data = self._get_json_data(job) |
683 | - transaction.commit() |
684 | - return json_data |
685 | - else: |
686 | - # _getHealthCheckJobForBoard can offline the board, so commit |
687 | - # in this branch too. |
688 | - transaction.commit() |
689 | - return None |
690 | - |
691 | - def getJobForBoard(self, board_name): |
692 | - return self.deferForDB(self.getJobForBoard_impl, board_name) |
693 | - |
694 | def getJobDetails_impl(self, job): |
695 | job.status = TestJob.RUNNING |
696 | job.start_time = datetime.datetime.utcnow() |
697 | |
698 | === modified file 'lava_scheduler_daemon/job.py' |
699 | --- lava_scheduler_daemon/job.py 2013-07-17 12:48:53 +0000 |
700 | +++ lava_scheduler_daemon/job.py 2013-08-28 13:17:52 +0000 |
701 | @@ -16,13 +16,211 @@ |
702 | # You should have received a copy of the GNU Affero General Public License |
703 | # along with LAVA Scheduler. If not, see <http://www.gnu.org/licenses/>. |
704 | |
705 | +import json |
706 | +import os |
707 | +import signal |
708 | +import tempfile |
709 | import logging |
710 | |
711 | -from twisted.internet import defer |
712 | -from lava_scheduler_daemon.board import MonitorJob |
713 | - |
714 | - |
715 | -class NewJob(object): |
716 | +from twisted.internet.error import ProcessDone, ProcessExitedAlready |
717 | +from twisted.internet.protocol import ProcessProtocol |
718 | +from twisted.internet import defer, task |
719 | + |
720 | + |
721 | +def catchall_errback(logger): |
722 | + def eb(failure): |
723 | + logger.error( |
724 | + '%s: %s\n%s', failure.type.__name__, failure.value, |
725 | + failure.getTraceback()) |
726 | + return eb |
727 | + |
728 | + |
729 | +class DispatcherProcessProtocol(ProcessProtocol): |
730 | + |
731 | + def __init__(self, deferred, job): |
732 | + self.logger = logging.getLogger(__name__ + '.DispatcherProcessProtocol') |
733 | + self.deferred = deferred |
734 | + self.log_size = 0 |
735 | + self.job = job |
736 | + |
737 | + def childDataReceived(self, childFD, data): |
738 | + self.log_size += len(data) |
739 | + if self.log_size > self.job.daemon_options['LOG_FILE_SIZE_LIMIT']: |
740 | + if not self.job._killing: |
741 | + self.job.cancel("exceeded log size limit") |
742 | + |
743 | + def childConnectionLost(self, childFD): |
744 | + self.logger.info("childConnectionLost for %s: %s", |
745 | + self.job.board_name, childFD) |
746 | + |
747 | + def processExited(self, reason): |
748 | + self.logger.info("processExited for %s: %s", |
749 | + self.job.board_name, reason.value) |
750 | + |
751 | + def processEnded(self, reason): |
752 | + self.logger.info("processEnded for %s: %s", |
753 | + self.job.board_name, reason.value) |
754 | + self.deferred.callback(reason.value.exitCode) |
755 | + |
756 | + |
757 | +class Job(object): |
758 | + |
759 | + def __init__(self, job_data, dispatcher, source, board_name, reactor, |
760 | + daemon_options): |
761 | + self.job_data = job_data |
762 | + self.dispatcher = dispatcher |
763 | + self.source = source |
764 | + self.board_name = board_name |
765 | + self.logger = logging.getLogger(__name__ + '.Job.' + board_name) |
766 | + self.reactor = reactor |
767 | + self.daemon_options = daemon_options |
768 | + self._json_file = None |
769 | + self._source_lock = defer.DeferredLock() |
770 | + self._checkCancel_call = task.LoopingCall(self._checkCancel) |
771 | + self._signals = ['SIGINT', 'SIGINT', 'SIGTERM', 'SIGTERM', 'SIGKILL'] |
772 | + self._time_limit_call = None |
773 | + self._killing = False |
774 | + self._kill_reason = '' |
775 | + |
776 | + def _checkCancel(self): |
777 | + if self._killing: |
778 | + self.cancel() |
779 | + else: |
780 | + return self._source_lock.run( |
781 | + self.source.jobCheckForCancellation, |
782 | + self.board_name).addCallback(self._maybeCancel) |
783 | + |
784 | + def cancel(self, reason=None): |
785 | + if not self._killing: |
786 | + if reason is None: |
787 | + reason = "killing job for unknown reason" |
788 | + self._kill_reason = reason |
789 | + self.logger.info(reason) |
790 | + self._killing = True |
791 | + if self._signals: |
792 | + signame = self._signals.pop(0) |
793 | + else: |
794 | + self.logger.warning("self._signals is empty!") |
795 | + signame = 'SIGKILL' |
796 | + self.logger.info( |
797 | + 'attempting to kill job with signal %s' % signame) |
798 | + try: |
799 | + self._protocol.transport.signalProcess(getattr(signal, signame)) |
800 | + except ProcessExitedAlready: |
801 | + pass |
802 | + |
803 | + def _maybeCancel(self, cancel): |
804 | + if cancel: |
805 | + self.cancel("killing job by user request") |
806 | + else: |
807 | + logging.debug('not cancelling') |
808 | + |
809 | + def _time_limit_exceeded(self): |
810 | + self._time_limit_call = None |
811 | + self.cancel("killing job for exceeding timeout") |
812 | + |
813 | + def run(self): |
814 | + d = self.source.getOutputDirForJobOnBoard(self.board_name) |
815 | + return d.addCallback(self._run).addErrback( |
816 | + catchall_errback(self.logger)) |
817 | + |
818 | + def _run(self, output_dir): |
819 | + d = defer.Deferred() |
820 | + json_data = self.job_data |
821 | + fd, self._json_file = tempfile.mkstemp() |
822 | + with os.fdopen(fd, 'wb') as f: |
823 | + json.dump(json_data, f) |
824 | + self._protocol = DispatcherProcessProtocol(d, self) |
825 | + self.reactor.spawnProcess( |
826 | + self._protocol, self.dispatcher, args=[ |
827 | + self.dispatcher, self._json_file, '--output-dir', output_dir], |
828 | + childFDs={0: 0, 1: 'r', 2: 'r'}, env=None) |
829 | + self._checkCancel_call.start(10) |
830 | + timeout = max( |
831 | + json_data['timeout'], self.daemon_options['MIN_JOB_TIMEOUT']) |
832 | + self._time_limit_call = self.reactor.callLater( |
833 | + timeout, self._time_limit_exceeded) |
834 | + d.addBoth(self._exited) |
835 | + return d |
836 | + |
837 | + def _exited(self, exit_code): |
838 | + self.logger.info("job finished on %s", self.job_data['target']) |
839 | + if self._json_file is not None: |
840 | + os.unlink(self._json_file) |
841 | + self.logger.info("reporting job completed") |
842 | + if self._time_limit_call is not None: |
843 | + self._time_limit_call.cancel() |
844 | + self._checkCancel_call.stop() |
845 | + return self._source_lock.run( |
846 | + self.source.jobCompleted, |
847 | + self.board_name, |
848 | + exit_code, |
849 | + self._killing).addCallback( |
850 | + lambda r: exit_code) |
851 | + |
852 | + |
853 | +class SchedulerMonitorPP(ProcessProtocol): |
854 | + |
855 | + def __init__(self, d, board_name): |
856 | + self.d = d |
857 | + self.board_name = board_name |
858 | + self.logger = logging.getLogger(__name__ + '.SchedulerMonitorPP') |
859 | + |
860 | + def childDataReceived(self, childFD, data): |
861 | + self.logger.warning( |
862 | + "scheduler monitor for %s produced output: %r on fd %s", |
863 | + self.board_name, data, childFD) |
864 | + |
865 | + def processEnded(self, reason): |
866 | + if not reason.check(ProcessDone): |
867 | + self.logger.error( |
868 | + "scheduler monitor for %s crashed: %s", |
869 | + self.board_name, reason) |
870 | + self.d.callback(None) |
871 | + |
872 | + |
873 | +class MonitorJob(object): |
874 | + |
875 | + def __init__(self, job_data, dispatcher, source, board_name, reactor, |
876 | + daemon_options): |
877 | + self.logger = logging.getLogger(__name__ + '.MonitorJob') |
878 | + self.job_data = job_data |
879 | + self.dispatcher = dispatcher |
880 | + self.source = source |
881 | + self.board_name = board_name |
882 | + self.reactor = reactor |
883 | + self.daemon_options = daemon_options |
884 | + self._json_file = None |
885 | + |
886 | + def run(self): |
887 | + d = defer.Deferred() |
888 | + json_data = self.job_data |
889 | + fd, self._json_file = tempfile.mkstemp() |
890 | + with os.fdopen(fd, 'wb') as f: |
891 | + json.dump(json_data, f) |
892 | + |
893 | + childFDs = {0: 0, 1: 1, 2: 2} |
894 | + args = [ |
895 | + 'setsid', 'lava-server', 'manage', 'schedulermonitor', |
896 | + self.dispatcher, str(self.board_name), self._json_file, |
897 | + '-l', self.daemon_options['LOG_LEVEL']] |
898 | + if self.daemon_options['LOG_FILE_PATH']: |
899 | + args.extend(['-f', self.daemon_options['LOG_FILE_PATH']]) |
900 | + childFDs = None |
901 | + self.logger.info('executing "%s"', ' '.join(args)) |
902 | + self.reactor.spawnProcess( |
903 | + SchedulerMonitorPP(d, self.board_name), 'setsid', |
904 | + childFDs=childFDs, env=None, args=args) |
905 | + d.addBoth(self._exited) |
906 | + return d |
907 | + |
908 | + def _exited(self, result): |
909 | + if self._json_file is not None: |
910 | + os.unlink(self._json_file) |
911 | + return result |
912 | + |
913 | + |
914 | +class JobRunner(object): |
915 | job_cls = MonitorJob |
916 | |
917 | def __init__(self, source, job, dispatcher, reactor, daemon_options, |
918 | @@ -39,7 +237,7 @@ |
919 | if job_cls is not None: |
920 | self.job_cls = job_cls |
921 | self.running_job = None |
922 | - self.logger = logging.getLogger(__name__ + '.NewJob.' + str(job.id)) |
923 | + self.logger = logging.getLogger(__name__ + '.JobRunner.' + str(job.id)) |
924 | |
925 | def start(self): |
926 | self.logger.debug("processing job") |
927 | |
928 | === modified file 'lava_scheduler_daemon/service.py' |
929 | --- lava_scheduler_daemon/service.py 2013-07-22 12:43:45 +0000 |
930 | +++ lava_scheduler_daemon/service.py 2013-08-28 13:17:52 +0000 |
931 | @@ -1,62 +1,28 @@ |
932 | +# Copyright (C) 2013 Linaro Limited |
933 | +# |
934 | +# Author: Senthil Kumaran <senthil.kumaran@linaro.org> |
935 | +# |
936 | +# This file is part of LAVA Scheduler. |
937 | +# |
938 | +# LAVA Scheduler is free software: you can redistribute it and/or modify it |
939 | +# under the terms of the GNU Affero General Public License version 3 as |
940 | +# published by the Free Software Foundation |
941 | +# |
942 | +# LAVA Scheduler is distributed in the hope that it will be useful, but |
943 | +# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY |
944 | +# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for |
945 | +# more details. |
946 | +# |
947 | +# You should have received a copy of the GNU Affero General Public License |
948 | +# along with LAVA Scheduler. If not, see <http://www.gnu.org/licenses/>. |
949 | + |
950 | import logging |
951 | |
952 | from twisted.application.service import Service |
953 | from twisted.internet import defer |
954 | from twisted.internet.task import LoopingCall |
955 | |
956 | -from lava_scheduler_daemon.board import Board, catchall_errback |
957 | -from lava_scheduler_daemon.job import NewJob |
958 | - |
959 | - |
960 | -class BoardSet(Service): |
961 | - |
962 | - def __init__(self, source, dispatcher, reactor, daemon_options): |
963 | - self.logger = logging.getLogger(__name__ + '.BoardSet') |
964 | - self.source = source |
965 | - self.boards = {} |
966 | - self.dispatcher = dispatcher |
967 | - self.reactor = reactor |
968 | - self.daemon_options = daemon_options |
969 | - self._update_boards_call = LoopingCall(self._updateBoards) |
970 | - self._update_boards_call.clock = reactor |
971 | - |
972 | - def _updateBoards(self): |
973 | - self.logger.debug("Refreshing board list") |
974 | - return self.source.getBoardList().addCallback( |
975 | - self._cbUpdateBoards).addErrback(catchall_errback(self.logger)) |
976 | - |
977 | - def _cbUpdateBoards(self, board_cfgs): |
978 | - '''board_cfgs is an array of dicts {hostname=name} ''' |
979 | - new_boards = {} |
980 | - for board_cfg in board_cfgs: |
981 | - board_name = board_cfg['hostname'] |
982 | - |
983 | - if board_cfg['hostname'] in self.boards: |
984 | - board = self.boards.pop(board_name) |
985 | - new_boards[board_name] = board |
986 | - else: |
987 | - self.logger.info("Adding board: %s" % board_name) |
988 | - new_boards[board_name] = Board( |
989 | - self.source, board_name, self.dispatcher, self.reactor, |
990 | - self.daemon_options) |
991 | - new_boards[board_name].start() |
992 | - for board in self.boards.values(): |
993 | - self.logger.info("Removing board: %s" % board.board_name) |
994 | - board.stop() |
995 | - self.boards = new_boards |
996 | - |
997 | - def startService(self): |
998 | - self._update_boards_call.start(20) |
999 | - |
1000 | - def stopService(self): |
1001 | - self._update_boards_call.stop() |
1002 | - ds = [] |
1003 | - dead_boards = [] |
1004 | - for board in self.boards.itervalues(): |
1005 | - ds.append(board.stop().addCallback(dead_boards.append)) |
1006 | - self.logger.info( |
1007 | - "waiting for %s boards", len(self.boards) - len(dead_boards)) |
1008 | - return defer.gatherResults(ds) |
1009 | +from lava_scheduler_daemon.job import JobRunner, catchall_errback |
1010 | |
1011 | |
1012 | class JobQueue(Service): |
1013 | @@ -77,8 +43,8 @@ |
1014 | |
1015 | def _cbCheckJobs(self, job_list): |
1016 | for job in job_list: |
1017 | - new_job = NewJob(self.source, job, self.dispatcher, self.reactor, |
1018 | - self.daemon_options) |
1019 | + new_job = JobRunner(self.source, job, self.dispatcher, |
1020 | + self.reactor, self.daemon_options) |
1021 | self.logger.info("Starting Job: %d " % job.id) |
1022 | new_job.start() |
1023 | |
1024 | |
1025 | === modified file 'lava_scheduler_daemon/tests/test_board.py' |
1026 | --- lava_scheduler_daemon/tests/test_board.py 2013-07-17 12:48:53 +0000 |
1027 | +++ lava_scheduler_daemon/tests/test_board.py 2013-08-28 13:17:52 +0000 |
1028 | @@ -38,7 +38,7 @@ |
1029 | |
1030 | class TestJob(object): |
1031 | |
1032 | - def __init__(self, job_data, dispatcher, source, board_name, reactor, options, use_celery): |
1033 | + def __init__(self, job_data, dispatcher, source, board_name, reactor, options): |
1034 | self.json_data = job_data |
1035 | self.dispatcher = dispatcher |
1036 | self.reactor = reactor |
Neil Williams: Thanks Senthil - tested on multinode.v.l.o & approved.