Merge lp:~abentley/lazr.jobrunner/launchpad-via-celery into lp:lazr.jobrunner
- launchpad-via-celery
- Merge into trunk
Status: | Merged |
---|---|
Merged at revision: | 21 |
Proposed branch: | lp:~abentley/lazr.jobrunner/launchpad-via-celery |
Merge into: | lp:lazr.jobrunner |
Diff against target: |
874 lines (+352/-334) 17 files modified
.bzrignore (+1/-1) Makefile (+7/-4) setup.py (+2/-1) src/lazr.jobrunner.egg-info/PKG-INFO (+0/-46) src/lazr.jobrunner.egg-info/dependency_links.txt (+0/-1) src/lazr.jobrunner.egg-info/entry_points.txt (+0/-3) src/lazr.jobrunner.egg-info/namespace_packages.txt (+0/-1) src/lazr.jobrunner.egg-info/not-zip-safe (+0/-1) src/lazr.jobrunner.egg-info/top_level.txt (+0/-1) src/lazr/jobrunner/__init__.py (+1/-0) src/lazr/jobrunner/celerytask.py (+45/-0) src/lazr/jobrunner/jobrunner.py (+7/-22) src/lazr/jobrunner/tests/config1.py (+1/-1) src/lazr/jobrunner/tests/test_celerytask.py (+286/-0) src/lazr/jobrunner/tests/test_jobrunner.py (+0/-250) src/lazr/jobrunner/tests/time_limit_config.py (+1/-1) src/lazr/jobrunner/version.txt (+1/-1) |
To merge this branch: | bzr merge lp:~abentley/lazr.jobrunner/launchpad-via-celery |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Abel Deuring (community) | code | Approve | |
Review via email: mp+98894@code.launchpad.net |
Commit message
Update to support run-via-celery in Launchpad
Description of the change
This branch improves the compatibility of the celery job-running code with Launchpad.
The biggest change is that Celery-specific code has been isolated in its own modules. This is because importing from Celery has side-effects (i.e. loading a config module), so only code that intends to relate to Celery should import it.
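I've removed src/lazr.jobrunner.egg-info from the tree; the whole directory is now listed in .bzrignore instead of just its SOURCES.txt.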
I've added a "lint" target to the Makefile, so "make lint" runs pocketlint. Note that it lints every .py file under src, not just the changed ones. Since we don't have significant lint-broken code, we can keep the whole tree lint-clean. I've cleaned up the lint we did have.
I've updated the version to 0.2 in setup.py, so that LP code can request the development egg instead of the 0.1 code.
I've moved celeryconfig.py to src/lazr/jobrunner/celeryconfig.py.
I've copied the Launchpad version of LeaseHeld into jobrunner, and updated Launchpad to import it.
I've tweaked runJobHandleError to take a reference to job.fail early, so that it can be invoked when the DB transaction is invalid.
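I've added getJobRunner to RunJob (the task whose run() calls runJobHandleError), so that Launchpad can override it to use its own job runner from lp.services (the full module path is truncated in this copy of the description).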
Preview Diff
1 | === modified file '.bzrignore' |
2 | --- .bzrignore 2012-02-23 17:11:46 +0000 |
3 | +++ .bzrignore 2012-03-22 18:19:18 +0000 |
4 | @@ -6,4 +6,4 @@ |
5 | tmp |
6 | build |
7 | dist |
8 | -src/lazr.jobrunner.egg-info/SOURCES.txt |
9 | +src/lazr.jobrunner.egg-info |
10 | |
11 | === modified file 'Makefile' |
12 | --- Makefile 2012-03-01 18:41:07 +0000 |
13 | +++ Makefile 2012-03-22 18:19:18 +0000 |
14 | @@ -1,9 +1,9 @@ |
15 | +develop: bin/python |
16 | + bin/buildout |
17 | + |
18 | check: bin/python |
19 | bin/nosetests |
20 | |
21 | -develop: bin/python |
22 | - bin/buildout |
23 | - |
24 | bin/buildout: |
25 | python -S bootstrap.py |
26 | |
27 | @@ -13,4 +13,7 @@ |
28 | clean: |
29 | bzr clean-tree --force --ignored --unknown |
30 | |
31 | -.PHONY: develop check clean |
32 | +lint: |
33 | + pocketlint `find src -name '*.py'` |
34 | + |
35 | +.PHONY: check clean develop lint |
36 | |
37 | === modified file 'setup.py' |
38 | --- setup.py 2012-03-15 15:14:59 +0000 |
39 | +++ setup.py 2012-03-22 18:19:18 +0000 |
40 | @@ -22,12 +22,13 @@ |
41 | NEWS = open(os.path.join(here, 'NEWS.txt')).read() |
42 | |
43 | |
44 | -version = '0.1' |
45 | +version = '0.2' |
46 | |
47 | install_requires = [ |
48 | # List your project dependencies here. |
49 | # For more details, see: |
50 | # http://packages.python.org/distribute/setuptools.html#declaring-dependencies |
51 | + 'celery', |
52 | ] |
53 | |
54 | |
55 | |
56 | === removed directory 'src/lazr.jobrunner.egg-info' |
57 | === removed file 'src/lazr.jobrunner.egg-info/PKG-INFO' |
58 | --- src/lazr.jobrunner.egg-info/PKG-INFO 2012-03-15 14:59:09 +0000 |
59 | +++ src/lazr.jobrunner.egg-info/PKG-INFO 1970-01-01 00:00:00 +0000 |
60 | @@ -1,46 +0,0 @@ |
61 | -Metadata-Version: 1.0 |
62 | -Name: lazr.jobrunner |
63 | -Version: 0.1 |
64 | -Summary: A Celery based job runner |
65 | -Home-page: UNKNOWN |
66 | -Author: UNKNOWN |
67 | -Author-email: UNKNOWN |
68 | -License: UNKNOWN |
69 | -Description: This file requires editing |
70 | - ========================== |
71 | - |
72 | - Note to the author: Please add something informative to this README *before* |
73 | - releasing your software, as `a little documentation goes a long way`_. Both |
74 | - README.rst (this file) and NEWS.txt (release notes) will be included in your |
75 | - package metadata which gets displayed in the PyPI page for your project. |
76 | - |
77 | - You can take a look at the README.txt of other projects, such as repoze.bfg |
78 | - (http://bfg.repoze.org/trac/browser/trunk/README.txt) for some ideas. |
79 | - |
80 | - .. _`a little documentation goes a long way`: http://www.martinaspeli.net/articles/a-little-documentation-goes-a-long-way |
81 | - |
82 | - Credits |
83 | - ------- |
84 | - |
85 | - - `Distribute`_ |
86 | - - `Buildout`_ |
87 | - - `modern-package-template`_ |
88 | - |
89 | - .. _Buildout: http://www.buildout.org/ |
90 | - .. _Distribute: http://pypi.python.org/pypi/distribute |
91 | - .. _`modern-package-template`: http://pypi.python.org/pypi/modern-package-template |
92 | - |
93 | - |
94 | - News |
95 | - ==== |
96 | - |
97 | - 0.1 |
98 | - --- |
99 | - |
100 | - *Release date: 15-Mar-2012* |
101 | - |
102 | - * Initial reelease |
103 | - |
104 | - |
105 | - |
106 | -Platform: UNKNOWN |
107 | |
108 | === removed file 'src/lazr.jobrunner.egg-info/dependency_links.txt' |
109 | --- src/lazr.jobrunner.egg-info/dependency_links.txt 2012-02-23 10:32:17 +0000 |
110 | +++ src/lazr.jobrunner.egg-info/dependency_links.txt 1970-01-01 00:00:00 +0000 |
111 | @@ -1,1 +0,0 @@ |
112 | - |
113 | |
114 | === removed file 'src/lazr.jobrunner.egg-info/entry_points.txt' |
115 | --- src/lazr.jobrunner.egg-info/entry_points.txt 2012-02-23 10:32:17 +0000 |
116 | +++ src/lazr.jobrunner.egg-info/entry_points.txt 1970-01-01 00:00:00 +0000 |
117 | @@ -1,3 +0,0 @@ |
118 | -[console_scripts] |
119 | -lazr.jobrunner = lazr.jobrunner:main |
120 | - |
121 | |
122 | === removed file 'src/lazr.jobrunner.egg-info/namespace_packages.txt' |
123 | --- src/lazr.jobrunner.egg-info/namespace_packages.txt 2012-02-23 10:32:17 +0000 |
124 | +++ src/lazr.jobrunner.egg-info/namespace_packages.txt 1970-01-01 00:00:00 +0000 |
125 | @@ -1,1 +0,0 @@ |
126 | -lazr |
127 | |
128 | === removed file 'src/lazr.jobrunner.egg-info/not-zip-safe' |
129 | --- src/lazr.jobrunner.egg-info/not-zip-safe 2012-02-23 10:32:17 +0000 |
130 | +++ src/lazr.jobrunner.egg-info/not-zip-safe 1970-01-01 00:00:00 +0000 |
131 | @@ -1,1 +0,0 @@ |
132 | - |
133 | |
134 | === removed file 'src/lazr.jobrunner.egg-info/top_level.txt' |
135 | --- src/lazr.jobrunner.egg-info/top_level.txt 2012-02-23 10:32:17 +0000 |
136 | +++ src/lazr.jobrunner.egg-info/top_level.txt 1970-01-01 00:00:00 +0000 |
137 | @@ -1,1 +0,0 @@ |
138 | -lazr |
139 | |
140 | === modified file 'src/lazr/jobrunner/__init__.py' |
141 | --- src/lazr/jobrunner/__init__.py 2012-02-23 10:32:17 +0000 |
142 | +++ src/lazr/jobrunner/__init__.py 2012-03-22 18:19:18 +0000 |
143 | @@ -1,4 +1,5 @@ |
144 | # Example package with a console entry point |
145 | |
146 | + |
147 | def main(): |
148 | print "Hello World" |
149 | |
150 | === renamed file 'celeryconfig.py' => 'src/lazr/jobrunner/celeryconfig.py' |
151 | === added file 'src/lazr/jobrunner/celerytask.py' |
152 | --- src/lazr/jobrunner/celerytask.py 1970-01-01 00:00:00 +0000 |
153 | +++ src/lazr/jobrunner/celerytask.py 2012-03-22 18:19:18 +0000 |
154 | @@ -0,0 +1,45 @@ |
155 | +# Copyright 2012 Canonical Ltd. All rights reserved. |
156 | +# |
157 | +# This file is part of lazr.jobrunner |
158 | +# |
159 | +# lazr.jobrunner is free software: you can redistribute it and/or modify |
160 | +# it under the terms of the GNU Lesser General Public License as published by |
161 | +# the Free Software Foundation, version 3 of the License. |
162 | +# |
163 | +# lazr.jobrunner is distributed in the hope that it will be useful, but |
164 | +# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY |
165 | +# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public |
166 | +# License for more details. |
167 | +# |
168 | +# You should have received a copy of the GNU Lesser General Public License |
169 | +# along with lazr.jobrunner. If not, see <http://www.gnu.org/licenses/>. |
170 | + |
171 | +__metaclass__ = type |
172 | + |
173 | + |
174 | +from celery.task import Task |
175 | +from lazr.jobrunner.jobrunner import ( |
176 | + JobRunner, |
177 | + LeaseHeld, |
178 | + memory_limit, |
179 | + ) |
180 | + |
181 | + |
182 | +class RunJob(Task): |
183 | + |
184 | + abstract = True |
185 | + |
186 | + oops_config = None |
187 | + |
188 | + def getJobRunner(self): |
189 | + return JobRunner(oops_config=self.oops_config) |
190 | + |
191 | + def run(self, job_id): |
192 | + job = self.job_source.get(job_id) |
193 | + try: |
194 | + job.acquireLease() |
195 | + except LeaseHeld: |
196 | + return |
197 | + runner = self.getJobRunner() |
198 | + with memory_limit(self.job_source.memory_limit): |
199 | + runner.runJobHandleError(job) |
200 | |
201 | === modified file 'src/lazr/jobrunner/jobrunner.py' |
202 | --- src/lazr/jobrunner/jobrunner.py 2012-03-16 18:27:52 +0000 |
203 | +++ src/lazr/jobrunner/jobrunner.py 2012-03-22 18:19:18 +0000 |
204 | @@ -26,8 +26,6 @@ |
205 | ) |
206 | import sys |
207 | |
208 | -from celery.task import Task |
209 | - |
210 | |
211 | class SuspendJobException(Exception): |
212 | """Raised when a running job wants to suspend itself.""" |
213 | @@ -35,7 +33,10 @@ |
214 | |
215 | |
216 | class LeaseHeld(Exception): |
217 | - """Raised when a lease cannot be acquired.""" |
218 | + """Raised when attempting to acquire a lease that is already held.""" |
219 | + |
220 | + def __init__(self): |
221 | + Exception.__init__(self, 'Lease is already held.') |
222 | |
223 | |
224 | class JobStatus: |
225 | @@ -65,23 +66,6 @@ |
226 | FAILED, SUSPENDED]) |
227 | |
228 | |
229 | -class RunJob(Task): |
230 | - |
231 | - abstract = True |
232 | - |
233 | - oops_config = None |
234 | - |
235 | - def run(self, job_id): |
236 | - job = self.job_source.get(job_id) |
237 | - try: |
238 | - job.acquireLease() |
239 | - except LeaseHeld: |
240 | - return |
241 | - runner = JobRunner(oops_config=self.oops_config) |
242 | - with memory_limit(self.job_source.memory_limit): |
243 | - runner.runJobHandleError(job) |
244 | - |
245 | - |
246 | @contextmanager |
247 | def memory_limit(limit): |
248 | if limit is not None: |
249 | @@ -169,6 +153,7 @@ |
250 | self.logger.info( |
251 | 'Running %s in status %s' % ( |
252 | self.job_str(job), job.status.title)) |
253 | + fail = job.fail |
254 | job.start(manage_transaction=True) |
255 | do_retry = False |
256 | try: |
257 | @@ -186,7 +171,7 @@ |
258 | job.suspend(manage_transaction=True) |
259 | self.incomplete_jobs.append(job) |
260 | except Exception: |
261 | - job.fail(manage_transaction=True) |
262 | + fail(manage_transaction=True) |
263 | self.incomplete_jobs.append(job) |
264 | raise |
265 | else: |
266 | @@ -216,7 +201,7 @@ |
267 | self.logger.exception("Failure in _doOops: %s" % e) |
268 | info = sys.exc_info() |
269 | report = job.makeOopsReport(self.oops_config, info) |
270 | - oops_ids = self.oops_config.publish(report) |
271 | + self.oops_config.publish(report) |
272 | return report |
273 | |
274 | @staticmethod |
275 | |
276 | === modified file 'src/lazr/jobrunner/tests/config1.py' |
277 | --- src/lazr/jobrunner/tests/config1.py 2012-03-09 21:50:46 +0000 |
278 | +++ src/lazr/jobrunner/tests/config1.py 2012-03-22 18:19:18 +0000 |
279 | @@ -1,6 +1,6 @@ |
280 | BROKER_VHOST = "/" |
281 | CELERY_RESULT_BACKEND = "amqp" |
282 | -CELERY_IMPORTS = ("lazr.jobrunner.tests.test_jobrunner", ) |
283 | +CELERY_IMPORTS = ("lazr.jobrunner.tests.test_celerytask", ) |
284 | CELERYD_CONCURRENCY = 1 |
285 | import os |
286 | import oops |
287 | |
288 | === added file 'src/lazr/jobrunner/tests/test_celerytask.py' |
289 | --- src/lazr/jobrunner/tests/test_celerytask.py 1970-01-01 00:00:00 +0000 |
290 | +++ src/lazr/jobrunner/tests/test_celerytask.py 2012-03-22 18:19:18 +0000 |
291 | @@ -0,0 +1,286 @@ |
292 | +# Copyright 2012 Canonical Ltd. All rights reserved. |
293 | +# |
294 | +# This file is part of lazr.jobrunner |
295 | +# |
296 | +# lazr.jobrunner is free software: you can redistribute it and/or modify |
297 | +# it under the terms of the GNU Lesser General Public License as published by |
298 | +# the Free Software Foundation, version 3 of the License. |
299 | +# |
300 | +# lazr.jobrunner is distributed in the hope that it will be useful, but |
301 | +# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY |
302 | +# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public |
303 | +# License for more details. |
304 | +# |
305 | +# You should have received a copy of the GNU Lesser General Public License |
306 | +# along with lazr.jobrunner. If not, see <http://www.gnu.org/licenses/>. |
307 | + |
308 | +__metaclass__ = type |
309 | + |
310 | + |
311 | +import contextlib |
312 | +import errno |
313 | +import json |
314 | +import os |
315 | +import os.path |
316 | +from resource import ( |
317 | + getrlimit, |
318 | + RLIMIT_AS, |
319 | + ) |
320 | +import shutil |
321 | +import subprocess |
322 | +import tempfile |
323 | +from time import sleep |
324 | +from unittest import TestCase |
325 | +os.environ.setdefault('CELERY_CONFIG_MODULE', 'lazr.jobrunner.celeryconfig') |
326 | + |
327 | +from celery.exceptions import SoftTimeLimitExceeded |
328 | + |
329 | +from lazr.jobrunner.celerytask import RunJob |
330 | +from lazr.jobrunner.jobrunner import ( |
331 | + JobStatus, |
332 | + ) |
333 | +from lazr.jobrunner.tests.test_jobrunner import ( |
334 | + FakeJob, |
335 | + ) |
336 | + |
337 | + |
338 | +def get_root(): |
339 | + import lazr.jobrunner |
340 | + root = os.path.join(os.path.dirname(lazr.jobrunner.__file__), '../../../') |
341 | + return os.path.normpath(root) |
342 | + |
343 | + |
344 | +@contextlib.contextmanager |
345 | +def running(cmd_name, cmd_args, env=None, cwd=None): |
346 | + proc = subprocess.Popen((cmd_name,) + cmd_args, env=env, |
347 | + stderr=subprocess.PIPE, cwd=cwd) |
348 | + try: |
349 | + yield proc |
350 | + finally: |
351 | + proc.terminate() |
352 | + proc.wait() |
353 | + |
354 | + |
355 | +def celeryd(config_module, file_job_dir): |
356 | + cmd_args = ('--config', config_module) |
357 | + environ = dict(os.environ) |
358 | + environ['FILE_JOB_DIR'] = file_job_dir |
359 | + return running('bin/celeryd', cmd_args, environ, cwd=get_root()) |
360 | + |
361 | + |
362 | +@contextlib.contextmanager |
363 | +def tempdir(): |
364 | + dirname = tempfile.mkdtemp() |
365 | + try: |
366 | + yield dirname |
367 | + finally: |
368 | + shutil.rmtree(dirname) |
369 | + |
370 | + |
371 | +class FakeJobSource: |
372 | + |
373 | + memory_limit = None |
374 | + |
375 | + def __init__(self): |
376 | + self.jobs = {} |
377 | + |
378 | + def get(self, job_id): |
379 | + return self.jobs[job_id] |
380 | + |
381 | + |
382 | +class FileJob(FakeJob): |
383 | + |
384 | + def __init__(self, job_source, job_id, output=None, |
385 | + status=JobStatus.WAITING, exception=None, sleep=None): |
386 | + super(FileJob, self).__init__(job_id) |
387 | + self.job_source = job_source |
388 | + self.output = output |
389 | + self.status = status |
390 | + self.exception = exception |
391 | + self.sleep = sleep |
392 | + |
393 | + def save(self): |
394 | + self.job_source.set(self) |
395 | + |
396 | + def run(self): |
397 | + super(FileJob, self).run() |
398 | + if self.sleep is not None: |
399 | + sleep(self.sleep) |
400 | + if self.exception is not None: |
401 | + raise Exception(self.exception) |
402 | + if self.output is not None: |
403 | + self.job_source.set_output(self, self.output) |
404 | + |
405 | + |
406 | +class FileJobSource: |
407 | + |
408 | + memory_limit = None |
409 | + |
410 | + def __init__(self, root): |
411 | + self.root = root |
412 | + self.job_root = os.path.join(self.root, 'job') |
413 | + self.output_root = os.path.join(self.root, 'output') |
414 | + |
415 | + def ensure_dir(path): |
416 | + try: |
417 | + os.mkdir(path) |
418 | + except OSError, e: |
419 | + if e.errno != errno.EEXIST: |
420 | + raise |
421 | + ensure_dir(self.job_root) |
422 | + ensure_dir(self.output_root) |
423 | + |
424 | + def _job_file(self, job_id, mode): |
425 | + return open(os.path.join(self.job_root, str(job_id)), mode) |
426 | + |
427 | + def _job_output_file(self, job_id, mode): |
428 | + return open(os.path.join(self.output_root, str(job_id)), mode) |
429 | + |
430 | + def get(self, job_id): |
431 | + with self._job_file(job_id, 'r') as job_file: |
432 | + job_data = json.load(job_file) |
433 | + job_data['status'] = JobStatus.by_value[job_data['status']] |
434 | + return FileJob(self, **job_data) |
435 | + |
436 | + def set(self, job): |
437 | + with self._job_file(job.job_id, 'w') as job_file: |
438 | + job_info = { |
439 | + 'job_id': job.job_id, |
440 | + 'output': job.output, |
441 | + 'status': job.status.value, |
442 | + 'exception': job.exception, |
443 | + 'sleep': job.sleep, |
444 | + } |
445 | + json.dump(job_info, job_file) |
446 | + |
447 | + def get_output(self, job): |
448 | + try: |
449 | + with self._job_output_file(job.job_id, 'r') as job_output_file: |
450 | + return job_output_file.read() |
451 | + except IOError, e: |
452 | + if e.errno == errno.ENOENT: |
453 | + return None |
454 | + raise |
455 | + |
456 | + def set_output(self, job, output): |
457 | + with self._job_output_file(job.job_id, 'w') as job_output_file: |
458 | + job_output_file.write(output) |
459 | + |
460 | + |
461 | +class RunFileJob(RunJob): |
462 | + |
463 | + name = 'run_file_job' |
464 | + |
465 | + def __init__(self): |
466 | + self.file_job_dir = None |
467 | + |
468 | + @property |
469 | + def job_source(self): |
470 | + return FileJobSource(self.file_job_dir) |
471 | + |
472 | + |
473 | +class TestRunJob(TestCase): |
474 | + |
475 | + @staticmethod |
476 | + def makeFakeJobSource(job=None): |
477 | + js = FakeJobSource() |
478 | + if job is None: |
479 | + job = FakeJob(10) |
480 | + js.jobs[job.job_id] = job |
481 | + return js |
482 | + |
483 | + @staticmethod |
484 | + def runJob(js): |
485 | + task = RunJob() |
486 | + task.job_source = js |
487 | + task.run(10) |
488 | + |
489 | + def test_run(self): |
490 | + js = self.makeFakeJobSource() |
491 | + self.assertTrue(js.jobs[10].unrun) |
492 | + self.runJob(js) |
493 | + self.assertFalse(js.jobs[10].unrun) |
494 | + |
495 | + def test_memory_limit(self): |
496 | + |
497 | + class MemoryCheckJob(FakeJob): |
498 | + |
499 | + def run(self): |
500 | + super(MemoryCheckJob, self).run() |
501 | + self.current_memory_limit = getrlimit(RLIMIT_AS)[0] |
502 | + |
503 | + start_limits = getrlimit(RLIMIT_AS) |
504 | + js = FakeJobSource() |
505 | + job = MemoryCheckJob(10) |
506 | + js.jobs[10] = job |
507 | + js.memory_limit = 1024 ** 3 |
508 | + task = RunJob() |
509 | + task.job_source = js |
510 | + task.run(10) |
511 | + self.assertEqual(1024 ** 3, job.current_memory_limit) |
512 | + self.assertEqual(start_limits, getrlimit(RLIMIT_AS)) |
513 | + |
514 | + def test_acquires_lease(self): |
515 | + js = self.makeFakeJobSource() |
516 | + self.assertFalse(js.jobs[10].lease_held) |
517 | + self.runJob(js) |
518 | + self.assertTrue(js.jobs[10].lease_held) |
519 | + |
520 | + def test_skips_failed_acquisition(self): |
521 | + js = self.makeFakeJobSource() |
522 | + js.jobs[10].acquireLease() |
523 | + self.runJob(js) |
524 | + self.assertTrue(js.jobs[10].unrun) |
525 | + |
526 | + |
527 | +class TestCeleryD(TestCase): |
528 | + |
529 | + def test_run_job(self): |
530 | + with tempdir() as temp_dir: |
531 | + js = FileJobSource(temp_dir) |
532 | + job = FileJob(js, 10, 'my_output') |
533 | + job.save() |
534 | + result = RunFileJob.delay(10) |
535 | + self.assertIs(None, js.get_output(job)) |
536 | + self.assertEqual(JobStatus.WAITING, job.status) |
537 | + with celeryd('lazr.jobrunner.tests.config1', temp_dir): |
538 | + result.wait(10) |
539 | + job = js.get(job.job_id) |
540 | + self.assertEqual('my_output', js.get_output(job)) |
541 | + self.assertEqual(JobStatus.COMPLETED, job.status) |
542 | + |
543 | + def run_file_job(self, temp_dir, config='lazr.jobrunner.tests.config1', |
544 | + **kwargs): |
545 | + js = FileJobSource(temp_dir) |
546 | + job = FileJob(js, 10, **kwargs) |
547 | + job.save() |
548 | + result = RunFileJob.delay(10) |
549 | + with celeryd(config, temp_dir) as proc: |
550 | + try: |
551 | + result.wait(10) |
552 | + except SoftTimeLimitExceeded: |
553 | + pass |
554 | + job = js.get(job.job_id) |
555 | + return job, proc |
556 | + |
557 | + def test_run_job_emits_oopses(self): |
558 | + with tempdir() as temp_dir: |
559 | + job, proc = self.run_file_job( |
560 | + temp_dir, exception='Catch me if you can!') |
561 | + err = proc.stderr.read() |
562 | + self.assertEqual(JobStatus.FAILED, job.status) |
563 | + self.assertIs(None, job.job_source.get_output(job)) |
564 | + self.assertIn( |
565 | + "OOPS while executing job 10: [] Exception(u'Catch me if you" |
566 | + " can!',)", err) |
567 | + |
568 | + def test_timeout_long(self): |
569 | + """Raises exception when a job exceeds the configured time limit.""" |
570 | + with tempdir() as temp_dir: |
571 | + job, proc = self.run_file_job( |
572 | + temp_dir, config='lazr.jobrunner.tests.time_limit_config', |
573 | + sleep=10) |
574 | + self.assertEqual(JobStatus.FAILED, job.status) |
575 | + err = proc.stderr.read() |
576 | + self.assertIn( |
577 | + 'OOPS while executing job 10: [] SoftTimeLimitExceeded', err) |
578 | |
579 | === modified file 'src/lazr/jobrunner/tests/test_jobrunner.py' |
580 | --- src/lazr/jobrunner/tests/test_jobrunner.py 2012-03-16 18:27:52 +0000 |
581 | +++ src/lazr/jobrunner/tests/test_jobrunner.py 2012-03-22 18:19:18 +0000 |
582 | @@ -17,21 +17,9 @@ |
583 | __metaclass__ = type |
584 | |
585 | import contextlib |
586 | -import errno |
587 | -import json |
588 | import logging |
589 | -import os.path |
590 | -from resource import ( |
591 | - getrlimit, |
592 | - RLIMIT_AS, |
593 | - ) |
594 | -import subprocess |
595 | -import shutil |
596 | -import tempfile |
597 | -from time import sleep |
598 | from unittest import TestCase |
599 | |
600 | -from celery.exceptions import SoftTimeLimitExceeded |
601 | import oops |
602 | from zope.testing.loghandler import Handler |
603 | |
604 | @@ -40,7 +28,6 @@ |
605 | JobRunner, |
606 | JobStatus, |
607 | LeaseHeld, |
608 | - RunJob, |
609 | SuspendJobException, |
610 | ) |
611 | |
612 | @@ -97,243 +84,6 @@ |
613 | return [('foo', 'bar')] |
614 | |
615 | |
616 | -class FakeJobSource: |
617 | - |
618 | - memory_limit = None |
619 | - |
620 | - def __init__(self): |
621 | - self.jobs = {} |
622 | - |
623 | - def get(self, job_id): |
624 | - return self.jobs[job_id] |
625 | - |
626 | - |
627 | -class FileJob(FakeJob): |
628 | - |
629 | - def __init__(self, job_source, job_id, output=None, |
630 | - status=JobStatus.WAITING, exception=None, sleep=None): |
631 | - super(FileJob, self).__init__(job_id) |
632 | - self.job_source = job_source |
633 | - self.output = output |
634 | - self.status = status |
635 | - self.exception = exception |
636 | - self.sleep = sleep |
637 | - |
638 | - def save(self): |
639 | - self.job_source.set(self) |
640 | - |
641 | - def run(self): |
642 | - super(FileJob, self).run() |
643 | - if self.sleep is not None: |
644 | - sleep(self.sleep) |
645 | - if self.exception is not None: |
646 | - raise Exception(self.exception) |
647 | - if self.output is not None: |
648 | - self.job_source.set_output(self, self.output) |
649 | - |
650 | - |
651 | -class FileJobSource: |
652 | - |
653 | - memory_limit = None |
654 | - |
655 | - def __init__(self, root): |
656 | - self.root = root |
657 | - self.job_root = os.path.join(self.root, 'job') |
658 | - self.output_root = os.path.join(self.root, 'output') |
659 | - def ensure_dir(path): |
660 | - try: |
661 | - os.mkdir(path) |
662 | - except OSError, e: |
663 | - if e.errno != errno.EEXIST: |
664 | - raise |
665 | - ensure_dir(self.job_root) |
666 | - ensure_dir(self.output_root) |
667 | - |
668 | - def _job_file(self, job_id, mode): |
669 | - return open(os.path.join(self.job_root, str(job_id)), mode) |
670 | - |
671 | - def _job_output_file(self, job_id, mode): |
672 | - return open(os.path.join(self.output_root, str(job_id)), mode) |
673 | - |
674 | - def get(self, job_id): |
675 | - with self._job_file(job_id, 'r') as job_file: |
676 | - job_data = json.load(job_file) |
677 | - job_data['status'] = JobStatus.by_value[job_data['status']] |
678 | - return FileJob(self, **job_data) |
679 | - |
680 | - def set(self, job): |
681 | - with self._job_file(job.job_id, 'w') as job_file: |
682 | - job_info = { |
683 | - 'job_id': job.job_id, |
684 | - 'output': job.output, |
685 | - 'status': job.status.value, |
686 | - 'exception': job.exception, |
687 | - 'sleep': job.sleep, |
688 | - } |
689 | - json.dump(job_info, job_file) |
690 | - |
691 | - def get_output(self, job): |
692 | - try: |
693 | - with self._job_output_file(job.job_id, 'r') as job_output_file: |
694 | - return job_output_file.read() |
695 | - except IOError, e: |
696 | - if e.errno == errno.ENOENT: |
697 | - return None |
698 | - raise |
699 | - |
700 | - def set_output(self, job, output): |
701 | - with self._job_output_file(job.job_id, 'w') as job_output_file: |
702 | - job_output_file.write(output) |
703 | - |
704 | - |
705 | -class TestRunJob(TestCase): |
706 | - |
707 | - @staticmethod |
708 | - def makeFakeJobSource(job=None): |
709 | - js = FakeJobSource() |
710 | - if job is None: |
711 | - job = FakeJob(10) |
712 | - js.jobs[job.job_id] = job |
713 | - return js |
714 | - |
715 | - @staticmethod |
716 | - def runJob(js): |
717 | - task = RunJob() |
718 | - task.job_source = js |
719 | - task.run(10) |
720 | - |
721 | - def test_run(self): |
722 | - js = self.makeFakeJobSource() |
723 | - self.assertTrue(js.jobs[10].unrun) |
724 | - self.runJob(js) |
725 | - self.assertFalse(js.jobs[10].unrun) |
726 | - |
727 | - def test_memory_limit(self): |
728 | - |
729 | - class MemoryCheckJob(FakeJob): |
730 | - |
731 | - def run(self): |
732 | - super(MemoryCheckJob, self).run() |
733 | - self.current_memory_limit = getrlimit(RLIMIT_AS)[0] |
734 | - |
735 | - start_limits = getrlimit(RLIMIT_AS) |
736 | - js = FakeJobSource() |
737 | - job = MemoryCheckJob(10) |
738 | - js.jobs[10] = job |
739 | - js.memory_limit = 1024 ** 3 |
740 | - task = RunJob() |
741 | - task.job_source = js |
742 | - task.run(10) |
743 | - self.assertEqual(1024 ** 3, job.current_memory_limit) |
744 | - self.assertEqual(start_limits, getrlimit(RLIMIT_AS)) |
745 | - |
746 | - def test_acquires_lease(self): |
747 | - js = self.makeFakeJobSource() |
748 | - self.assertFalse(js.jobs[10].lease_held) |
749 | - self.runJob(js) |
750 | - self.assertTrue(js.jobs[10].lease_held) |
751 | - |
752 | - def test_skips_failed_acquisition(self): |
753 | - js = self.makeFakeJobSource() |
754 | - js.jobs[10].acquireLease() |
755 | - self.runJob(js) |
756 | - self.assertTrue(js.jobs[10].unrun) |
757 | - |
758 | - |
759 | -def get_root(): |
760 | - import lazr.jobrunner |
761 | - root = os.path.join(os.path.dirname(lazr.jobrunner.__file__), '../../../') |
762 | - return os.path.normpath(root) |
763 | - |
764 | - |
765 | -@contextlib.contextmanager |
766 | -def tempdir(): |
767 | - dirname = tempfile.mkdtemp() |
768 | - try: |
769 | - yield dirname |
770 | - finally: |
771 | - shutil.rmtree(dirname) |
772 | - |
773 | - |
774 | -@contextlib.contextmanager |
775 | -def celeryd(config_module, file_job_dir): |
776 | - cmdname = os.path.join(get_root(), 'bin/celeryd') |
777 | - environ = dict(os.environ) |
778 | - environ['FILE_JOB_DIR'] = file_job_dir |
779 | - proc = subprocess.Popen([cmdname, '--config', config_module], env=environ, |
780 | - stderr=subprocess.PIPE) |
781 | - try: |
782 | - yield proc |
783 | - finally: |
784 | - proc.terminate() |
785 | - proc.wait() |
786 | - |
787 | - |
788 | -class RunFileJob(RunJob): |
789 | - |
790 | - name = 'run_file_job' |
791 | - |
792 | - def __init__(self): |
793 | - file_job_dir = None |
794 | - |
795 | - @property |
796 | - def job_source(self): |
797 | - return FileJobSource(self.file_job_dir) |
798 | - |
799 | - |
800 | -class TestCeleryD(TestCase): |
801 | - |
802 | - def test_run_job(self): |
803 | - with tempdir() as temp_dir: |
804 | - js = FileJobSource(temp_dir) |
805 | - job = FileJob(js, 10, 'my_output') |
806 | - job.save() |
807 | - result = RunFileJob.delay(10) |
808 | - self.assertIs(None, js.get_output(job)) |
809 | - self.assertEqual(JobStatus.WAITING, job.status) |
810 | - with celeryd('lazr.jobrunner.tests.config1', temp_dir): |
811 | - result.wait(10) |
812 | - job = js.get(job.job_id) |
813 | - self.assertEqual('my_output', js.get_output(job)) |
814 | - self.assertEqual(JobStatus.COMPLETED, job.status) |
815 | - |
816 | - def run_file_job(self, temp_dir, config='lazr.jobrunner.tests.config1', |
817 | - **kwargs): |
818 | - js = FileJobSource(temp_dir) |
819 | - job = FileJob(js, 10, **kwargs) |
820 | - job.save() |
821 | - result = RunFileJob.delay(10) |
822 | - with celeryd(config, temp_dir) as proc: |
823 | - try: |
824 | - result.wait(10) |
825 | - except SoftTimeLimitExceeded: |
826 | - pass |
827 | - job = js.get(job.job_id) |
828 | - return job, proc |
829 | - |
830 | - def test_run_job_emits_oopses(self): |
831 | - with tempdir() as temp_dir: |
832 | - job, proc = self.run_file_job( |
833 | - temp_dir, exception='Catch me if you can!') |
834 | - err = proc.stderr.read() |
835 | - self.assertEqual(JobStatus.FAILED, job.status) |
836 | - self.assertIs(None, job.job_source.get_output(job)) |
837 | - self.assertIn( |
838 | - "OOPS while executing job 10: [] Exception(u'Catch me if you" |
839 | - " can!',)", err) |
840 | - |
841 | - def test_timeout_long(self): |
842 | - """Raises exception when a job exceeds the configured time limit.""" |
843 | - with tempdir() as temp_dir: |
844 | - job, proc = self.run_file_job( |
845 | - temp_dir, config='lazr.jobrunner.tests.time_limit_config', |
846 | - sleep=10) |
847 | - self.assertEqual(JobStatus.FAILED, job.status) |
848 | - err = proc.stderr.read() |
849 | - self.assertIn( |
850 | - 'OOPS while executing job 10: [] SoftTimeLimitExceeded', err) |
851 | - |
852 | - |
853 | class OOPSTestRepository: |
854 | """Record OOPSes in memory.""" |
855 | |
856 | |
857 | === modified file 'src/lazr/jobrunner/tests/time_limit_config.py' |
858 | --- src/lazr/jobrunner/tests/time_limit_config.py 2012-03-14 14:22:04 +0000 |
859 | +++ src/lazr/jobrunner/tests/time_limit_config.py 2012-03-22 18:19:18 +0000 |
860 | @@ -1,6 +1,6 @@ |
861 | BROKER_VHOST = "/" |
862 | CELERY_RESULT_BACKEND = "amqp" |
863 | -CELERY_IMPORTS = ("lazr.jobrunner.tests.test_jobrunner", ) |
864 | +CELERY_IMPORTS = ("lazr.jobrunner.tests.test_celerytask", ) |
865 | CELERYD_CONCURRENCY = 1 |
866 | CELERYD_TASK_SOFT_TIME_LIMIT = 1 |
867 | import os |
868 | |
869 | === modified file 'src/lazr/jobrunner/version.txt' |
870 | --- src/lazr/jobrunner/version.txt 2012-03-15 14:59:09 +0000 |
871 | +++ src/lazr/jobrunner/version.txt 2012-03-22 18:19:18 +0000 |
872 | @@ -1,1 +1,1 @@ |
873 | -0.1 |
874 | +0.2 |
looks good