Merge lp:~abentley/lazr.jobrunner/launchpad-via-celery into lp:lazr.jobrunner

Proposed by Aaron Bentley
Status: Merged
Merged at revision: 21
Proposed branch: lp:~abentley/lazr.jobrunner/launchpad-via-celery
Merge into: lp:lazr.jobrunner
Diff against target: 874 lines (+352/-334)
17 files modified
.bzrignore (+1/-1)
Makefile (+7/-4)
setup.py (+2/-1)
src/lazr.jobrunner.egg-info/PKG-INFO (+0/-46)
src/lazr.jobrunner.egg-info/dependency_links.txt (+0/-1)
src/lazr.jobrunner.egg-info/entry_points.txt (+0/-3)
src/lazr.jobrunner.egg-info/namespace_packages.txt (+0/-1)
src/lazr.jobrunner.egg-info/not-zip-safe (+0/-1)
src/lazr.jobrunner.egg-info/top_level.txt (+0/-1)
src/lazr/jobrunner/__init__.py (+1/-0)
src/lazr/jobrunner/celerytask.py (+45/-0)
src/lazr/jobrunner/jobrunner.py (+7/-22)
src/lazr/jobrunner/tests/config1.py (+1/-1)
src/lazr/jobrunner/tests/test_celerytask.py (+286/-0)
src/lazr/jobrunner/tests/test_jobrunner.py (+0/-250)
src/lazr/jobrunner/tests/time_limit_config.py (+1/-1)
src/lazr/jobrunner/version.txt (+1/-1)
To merge this branch: bzr merge lp:~abentley/lazr.jobrunner/launchpad-via-celery
Reviewer Review Type Date Requested Status
Abel Deuring (community) code Approve
Review via email: mp+98894@code.launchpad.net

Commit message

Update to support run-via-celery in Launchpad

Description of the change

This branch improves the compatibility of the celery job-running code with Launchpad.

The biggest change is that Celery-specific code has been isolated in its own modules. This is because importing from Celery has side-effects (i.e. loading a config module), so only code that intends to relate to Celery should import it.

I've removed src/lazr.jobrunner.egg-info from version control, because AFAICT its files are all auto-generated by bin/buildout.

I've added a "lint" target to the makefile, so "make lint" will check lint. However, it lints all .py files in src. Since we don't have significant lint-broken code, we can ensure a completely lint-clean tree. I've cleaned up what lint we did have.

I've updated the version to 0.2 in setup.py, so that LP code can request the development egg instead of the 0.1 code.

I've moved celeryconfig.py to src/lazr/jobrunner/celeryconfig.py. This is currently used only by the job runner.

I've copied the Launchpad version of LeaseHeld into jobrunner, and updated Launchpad to import it.

I've tweaked runJobHandleError to take a reference to job.fail early, so that it can be invoked when the DB transaction is invalid.

I've added getJobRunner to RunJobHandleError, so that Launchpad can use lp.services.job.runner.BaseJobRunner.

To post a comment you must log in.
Revision history for this message
Abel Deuring (adeuring) wrote :

looks good

review: Approve (code)

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file '.bzrignore'
2--- .bzrignore 2012-02-23 17:11:46 +0000
3+++ .bzrignore 2012-03-22 18:19:18 +0000
4@@ -6,4 +6,4 @@
5 tmp
6 build
7 dist
8-src/lazr.jobrunner.egg-info/SOURCES.txt
9+src/lazr.jobrunner.egg-info
10
11=== modified file 'Makefile'
12--- Makefile 2012-03-01 18:41:07 +0000
13+++ Makefile 2012-03-22 18:19:18 +0000
14@@ -1,9 +1,9 @@
15+develop: bin/python
16+ bin/buildout
17+
18 check: bin/python
19 bin/nosetests
20
21-develop: bin/python
22- bin/buildout
23-
24 bin/buildout:
25 python -S bootstrap.py
26
27@@ -13,4 +13,7 @@
28 clean:
29 bzr clean-tree --force --ignored --unknown
30
31-.PHONY: develop check clean
32+lint:
33+ pocketlint `find src -name '*.py'`
34+
35+.PHONY: check clean develop lint
36
37=== modified file 'setup.py'
38--- setup.py 2012-03-15 15:14:59 +0000
39+++ setup.py 2012-03-22 18:19:18 +0000
40@@ -22,12 +22,13 @@
41 NEWS = open(os.path.join(here, 'NEWS.txt')).read()
42
43
44-version = '0.1'
45+version = '0.2'
46
47 install_requires = [
48 # List your project dependencies here.
49 # For more details, see:
50 # http://packages.python.org/distribute/setuptools.html#declaring-dependencies
51+ 'celery',
52 ]
53
54
55
56=== removed directory 'src/lazr.jobrunner.egg-info'
57=== removed file 'src/lazr.jobrunner.egg-info/PKG-INFO'
58--- src/lazr.jobrunner.egg-info/PKG-INFO 2012-03-15 14:59:09 +0000
59+++ src/lazr.jobrunner.egg-info/PKG-INFO 1970-01-01 00:00:00 +0000
60@@ -1,46 +0,0 @@
61-Metadata-Version: 1.0
62-Name: lazr.jobrunner
63-Version: 0.1
64-Summary: A Celery based job runner
65-Home-page: UNKNOWN
66-Author: UNKNOWN
67-Author-email: UNKNOWN
68-License: UNKNOWN
69-Description: This file requires editing
70- ==========================
71-
72- Note to the author: Please add something informative to this README *before*
73- releasing your software, as `a little documentation goes a long way`_. Both
74- README.rst (this file) and NEWS.txt (release notes) will be included in your
75- package metadata which gets displayed in the PyPI page for your project.
76-
77- You can take a look at the README.txt of other projects, such as repoze.bfg
78- (http://bfg.repoze.org/trac/browser/trunk/README.txt) for some ideas.
79-
80- .. _`a little documentation goes a long way`: http://www.martinaspeli.net/articles/a-little-documentation-goes-a-long-way
81-
82- Credits
83- -------
84-
85- - `Distribute`_
86- - `Buildout`_
87- - `modern-package-template`_
88-
89- .. _Buildout: http://www.buildout.org/
90- .. _Distribute: http://pypi.python.org/pypi/distribute
91- .. _`modern-package-template`: http://pypi.python.org/pypi/modern-package-template
92-
93-
94- News
95- ====
96-
97- 0.1
98- ---
99-
100- *Release date: 15-Mar-2012*
101-
102- * Initial reelease
103-
104-
105-
106-Platform: UNKNOWN
107
108=== removed file 'src/lazr.jobrunner.egg-info/dependency_links.txt'
109--- src/lazr.jobrunner.egg-info/dependency_links.txt 2012-02-23 10:32:17 +0000
110+++ src/lazr.jobrunner.egg-info/dependency_links.txt 1970-01-01 00:00:00 +0000
111@@ -1,1 +0,0 @@
112-
113
114=== removed file 'src/lazr.jobrunner.egg-info/entry_points.txt'
115--- src/lazr.jobrunner.egg-info/entry_points.txt 2012-02-23 10:32:17 +0000
116+++ src/lazr.jobrunner.egg-info/entry_points.txt 1970-01-01 00:00:00 +0000
117@@ -1,3 +0,0 @@
118-[console_scripts]
119-lazr.jobrunner = lazr.jobrunner:main
120-
121
122=== removed file 'src/lazr.jobrunner.egg-info/namespace_packages.txt'
123--- src/lazr.jobrunner.egg-info/namespace_packages.txt 2012-02-23 10:32:17 +0000
124+++ src/lazr.jobrunner.egg-info/namespace_packages.txt 1970-01-01 00:00:00 +0000
125@@ -1,1 +0,0 @@
126-lazr
127
128=== removed file 'src/lazr.jobrunner.egg-info/not-zip-safe'
129--- src/lazr.jobrunner.egg-info/not-zip-safe 2012-02-23 10:32:17 +0000
130+++ src/lazr.jobrunner.egg-info/not-zip-safe 1970-01-01 00:00:00 +0000
131@@ -1,1 +0,0 @@
132-
133
134=== removed file 'src/lazr.jobrunner.egg-info/top_level.txt'
135--- src/lazr.jobrunner.egg-info/top_level.txt 2012-02-23 10:32:17 +0000
136+++ src/lazr.jobrunner.egg-info/top_level.txt 1970-01-01 00:00:00 +0000
137@@ -1,1 +0,0 @@
138-lazr
139
140=== modified file 'src/lazr/jobrunner/__init__.py'
141--- src/lazr/jobrunner/__init__.py 2012-02-23 10:32:17 +0000
142+++ src/lazr/jobrunner/__init__.py 2012-03-22 18:19:18 +0000
143@@ -1,4 +1,5 @@
144 # Example package with a console entry point
145
146+
147 def main():
148 print "Hello World"
149
150=== renamed file 'celeryconfig.py' => 'src/lazr/jobrunner/celeryconfig.py'
151=== added file 'src/lazr/jobrunner/celerytask.py'
152--- src/lazr/jobrunner/celerytask.py 1970-01-01 00:00:00 +0000
153+++ src/lazr/jobrunner/celerytask.py 2012-03-22 18:19:18 +0000
154@@ -0,0 +1,45 @@
155+# Copyright 2012 Canonical Ltd. All rights reserved.
156+#
157+# This file is part of lazr.jobrunner
158+#
159+# lazr.jobrunner is free software: you can redistribute it and/or modify
160+# it under the terms of the GNU Lesser General Public License as published by
161+# the Free Software Foundation, version 3 of the License.
162+#
163+# lazr.jobrunner is distributed in the hope that it will be useful, but
164+# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
165+# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
166+# License for more details.
167+#
168+# You should have received a copy of the GNU Lesser General Public License
169+# along with lazr.jobrunner. If not, see <http://www.gnu.org/licenses/>.
170+
171+__metaclass__ = type
172+
173+
174+from celery.task import Task
175+from lazr.jobrunner.jobrunner import (
176+ JobRunner,
177+ LeaseHeld,
178+ memory_limit,
179+ )
180+
181+
182+class RunJob(Task):
183+
184+ abstract = True
185+
186+ oops_config = None
187+
188+ def getJobRunner(self):
189+ return JobRunner(oops_config=self.oops_config)
190+
191+ def run(self, job_id):
192+ job = self.job_source.get(job_id)
193+ try:
194+ job.acquireLease()
195+ except LeaseHeld:
196+ return
197+ runner = self.getJobRunner()
198+ with memory_limit(self.job_source.memory_limit):
199+ runner.runJobHandleError(job)
200
201=== modified file 'src/lazr/jobrunner/jobrunner.py'
202--- src/lazr/jobrunner/jobrunner.py 2012-03-16 18:27:52 +0000
203+++ src/lazr/jobrunner/jobrunner.py 2012-03-22 18:19:18 +0000
204@@ -26,8 +26,6 @@
205 )
206 import sys
207
208-from celery.task import Task
209-
210
211 class SuspendJobException(Exception):
212 """Raised when a running job wants to suspend itself."""
213@@ -35,7 +33,10 @@
214
215
216 class LeaseHeld(Exception):
217- """Raised when a lease cannot be acquired."""
218+ """Raised when attempting to acquire a lease that is already held."""
219+
220+ def __init__(self):
221+ Exception.__init__(self, 'Lease is already held.')
222
223
224 class JobStatus:
225@@ -65,23 +66,6 @@
226 FAILED, SUSPENDED])
227
228
229-class RunJob(Task):
230-
231- abstract = True
232-
233- oops_config = None
234-
235- def run(self, job_id):
236- job = self.job_source.get(job_id)
237- try:
238- job.acquireLease()
239- except LeaseHeld:
240- return
241- runner = JobRunner(oops_config=self.oops_config)
242- with memory_limit(self.job_source.memory_limit):
243- runner.runJobHandleError(job)
244-
245-
246 @contextmanager
247 def memory_limit(limit):
248 if limit is not None:
249@@ -169,6 +153,7 @@
250 self.logger.info(
251 'Running %s in status %s' % (
252 self.job_str(job), job.status.title))
253+ fail = job.fail
254 job.start(manage_transaction=True)
255 do_retry = False
256 try:
257@@ -186,7 +171,7 @@
258 job.suspend(manage_transaction=True)
259 self.incomplete_jobs.append(job)
260 except Exception:
261- job.fail(manage_transaction=True)
262+ fail(manage_transaction=True)
263 self.incomplete_jobs.append(job)
264 raise
265 else:
266@@ -216,7 +201,7 @@
267 self.logger.exception("Failure in _doOops: %s" % e)
268 info = sys.exc_info()
269 report = job.makeOopsReport(self.oops_config, info)
270- oops_ids = self.oops_config.publish(report)
271+ self.oops_config.publish(report)
272 return report
273
274 @staticmethod
275
276=== modified file 'src/lazr/jobrunner/tests/config1.py'
277--- src/lazr/jobrunner/tests/config1.py 2012-03-09 21:50:46 +0000
278+++ src/lazr/jobrunner/tests/config1.py 2012-03-22 18:19:18 +0000
279@@ -1,6 +1,6 @@
280 BROKER_VHOST = "/"
281 CELERY_RESULT_BACKEND = "amqp"
282-CELERY_IMPORTS = ("lazr.jobrunner.tests.test_jobrunner", )
283+CELERY_IMPORTS = ("lazr.jobrunner.tests.test_celerytask", )
284 CELERYD_CONCURRENCY = 1
285 import os
286 import oops
287
288=== added file 'src/lazr/jobrunner/tests/test_celerytask.py'
289--- src/lazr/jobrunner/tests/test_celerytask.py 1970-01-01 00:00:00 +0000
290+++ src/lazr/jobrunner/tests/test_celerytask.py 2012-03-22 18:19:18 +0000
291@@ -0,0 +1,286 @@
292+# Copyright 2012 Canonical Ltd. All rights reserved.
293+#
294+# This file is part of lazr.jobrunner
295+#
296+# lazr.jobrunner is free software: you can redistribute it and/or modify
297+# it under the terms of the GNU Lesser General Public License as published by
298+# the Free Software Foundation, version 3 of the License.
299+#
300+# lazr.jobrunner is distributed in the hope that it will be useful, but
301+# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
302+# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
303+# License for more details.
304+#
305+# You should have received a copy of the GNU Lesser General Public License
306+# along with lazr.jobrunner. If not, see <http://www.gnu.org/licenses/>.
307+
308+__metaclass__ = type
309+
310+
311+import contextlib
312+import errno
313+import json
314+import os
315+import os.path
316+from resource import (
317+ getrlimit,
318+ RLIMIT_AS,
319+ )
320+import shutil
321+import subprocess
322+import tempfile
323+from time import sleep
324+from unittest import TestCase
325+os.environ.setdefault('CELERY_CONFIG_MODULE', 'lazr.jobrunner.celeryconfig')
326+
327+from celery.exceptions import SoftTimeLimitExceeded
328+
329+from lazr.jobrunner.celerytask import RunJob
330+from lazr.jobrunner.jobrunner import (
331+ JobStatus,
332+ )
333+from lazr.jobrunner.tests.test_jobrunner import (
334+ FakeJob,
335+ )
336+
337+
338+def get_root():
339+ import lazr.jobrunner
340+ root = os.path.join(os.path.dirname(lazr.jobrunner.__file__), '../../../')
341+ return os.path.normpath(root)
342+
343+
344+@contextlib.contextmanager
345+def running(cmd_name, cmd_args, env=None, cwd=None):
346+ proc = subprocess.Popen((cmd_name,) + cmd_args, env=env,
347+ stderr=subprocess.PIPE, cwd=cwd)
348+ try:
349+ yield proc
350+ finally:
351+ proc.terminate()
352+ proc.wait()
353+
354+
355+def celeryd(config_module, file_job_dir):
356+ cmd_args = ('--config', config_module)
357+ environ = dict(os.environ)
358+ environ['FILE_JOB_DIR'] = file_job_dir
359+ return running('bin/celeryd', cmd_args, environ, cwd=get_root())
360+
361+
362+@contextlib.contextmanager
363+def tempdir():
364+ dirname = tempfile.mkdtemp()
365+ try:
366+ yield dirname
367+ finally:
368+ shutil.rmtree(dirname)
369+
370+
371+class FakeJobSource:
372+
373+ memory_limit = None
374+
375+ def __init__(self):
376+ self.jobs = {}
377+
378+ def get(self, job_id):
379+ return self.jobs[job_id]
380+
381+
382+class FileJob(FakeJob):
383+
384+ def __init__(self, job_source, job_id, output=None,
385+ status=JobStatus.WAITING, exception=None, sleep=None):
386+ super(FileJob, self).__init__(job_id)
387+ self.job_source = job_source
388+ self.output = output
389+ self.status = status
390+ self.exception = exception
391+ self.sleep = sleep
392+
393+ def save(self):
394+ self.job_source.set(self)
395+
396+ def run(self):
397+ super(FileJob, self).run()
398+ if self.sleep is not None:
399+ sleep(self.sleep)
400+ if self.exception is not None:
401+ raise Exception(self.exception)
402+ if self.output is not None:
403+ self.job_source.set_output(self, self.output)
404+
405+
406+class FileJobSource:
407+
408+ memory_limit = None
409+
410+ def __init__(self, root):
411+ self.root = root
412+ self.job_root = os.path.join(self.root, 'job')
413+ self.output_root = os.path.join(self.root, 'output')
414+
415+ def ensure_dir(path):
416+ try:
417+ os.mkdir(path)
418+ except OSError, e:
419+ if e.errno != errno.EEXIST:
420+ raise
421+ ensure_dir(self.job_root)
422+ ensure_dir(self.output_root)
423+
424+ def _job_file(self, job_id, mode):
425+ return open(os.path.join(self.job_root, str(job_id)), mode)
426+
427+ def _job_output_file(self, job_id, mode):
428+ return open(os.path.join(self.output_root, str(job_id)), mode)
429+
430+ def get(self, job_id):
431+ with self._job_file(job_id, 'r') as job_file:
432+ job_data = json.load(job_file)
433+ job_data['status'] = JobStatus.by_value[job_data['status']]
434+ return FileJob(self, **job_data)
435+
436+ def set(self, job):
437+ with self._job_file(job.job_id, 'w') as job_file:
438+ job_info = {
439+ 'job_id': job.job_id,
440+ 'output': job.output,
441+ 'status': job.status.value,
442+ 'exception': job.exception,
443+ 'sleep': job.sleep,
444+ }
445+ json.dump(job_info, job_file)
446+
447+ def get_output(self, job):
448+ try:
449+ with self._job_output_file(job.job_id, 'r') as job_output_file:
450+ return job_output_file.read()
451+ except IOError, e:
452+ if e.errno == errno.ENOENT:
453+ return None
454+ raise
455+
456+ def set_output(self, job, output):
457+ with self._job_output_file(job.job_id, 'w') as job_output_file:
458+ job_output_file.write(output)
459+
460+
461+class RunFileJob(RunJob):
462+
463+ name = 'run_file_job'
464+
465+ def __init__(self):
466+ self.file_job_dir = None
467+
468+ @property
469+ def job_source(self):
470+ return FileJobSource(self.file_job_dir)
471+
472+
473+class TestRunJob(TestCase):
474+
475+ @staticmethod
476+ def makeFakeJobSource(job=None):
477+ js = FakeJobSource()
478+ if job is None:
479+ job = FakeJob(10)
480+ js.jobs[job.job_id] = job
481+ return js
482+
483+ @staticmethod
484+ def runJob(js):
485+ task = RunJob()
486+ task.job_source = js
487+ task.run(10)
488+
489+ def test_run(self):
490+ js = self.makeFakeJobSource()
491+ self.assertTrue(js.jobs[10].unrun)
492+ self.runJob(js)
493+ self.assertFalse(js.jobs[10].unrun)
494+
495+ def test_memory_limit(self):
496+
497+ class MemoryCheckJob(FakeJob):
498+
499+ def run(self):
500+ super(MemoryCheckJob, self).run()
501+ self.current_memory_limit = getrlimit(RLIMIT_AS)[0]
502+
503+ start_limits = getrlimit(RLIMIT_AS)
504+ js = FakeJobSource()
505+ job = MemoryCheckJob(10)
506+ js.jobs[10] = job
507+ js.memory_limit = 1024 ** 3
508+ task = RunJob()
509+ task.job_source = js
510+ task.run(10)
511+ self.assertEqual(1024 ** 3, job.current_memory_limit)
512+ self.assertEqual(start_limits, getrlimit(RLIMIT_AS))
513+
514+ def test_acquires_lease(self):
515+ js = self.makeFakeJobSource()
516+ self.assertFalse(js.jobs[10].lease_held)
517+ self.runJob(js)
518+ self.assertTrue(js.jobs[10].lease_held)
519+
520+ def test_skips_failed_acquisition(self):
521+ js = self.makeFakeJobSource()
522+ js.jobs[10].acquireLease()
523+ self.runJob(js)
524+ self.assertTrue(js.jobs[10].unrun)
525+
526+
527+class TestCeleryD(TestCase):
528+
529+ def test_run_job(self):
530+ with tempdir() as temp_dir:
531+ js = FileJobSource(temp_dir)
532+ job = FileJob(js, 10, 'my_output')
533+ job.save()
534+ result = RunFileJob.delay(10)
535+ self.assertIs(None, js.get_output(job))
536+ self.assertEqual(JobStatus.WAITING, job.status)
537+ with celeryd('lazr.jobrunner.tests.config1', temp_dir):
538+ result.wait(10)
539+ job = js.get(job.job_id)
540+ self.assertEqual('my_output', js.get_output(job))
541+ self.assertEqual(JobStatus.COMPLETED, job.status)
542+
543+ def run_file_job(self, temp_dir, config='lazr.jobrunner.tests.config1',
544+ **kwargs):
545+ js = FileJobSource(temp_dir)
546+ job = FileJob(js, 10, **kwargs)
547+ job.save()
548+ result = RunFileJob.delay(10)
549+ with celeryd(config, temp_dir) as proc:
550+ try:
551+ result.wait(10)
552+ except SoftTimeLimitExceeded:
553+ pass
554+ job = js.get(job.job_id)
555+ return job, proc
556+
557+ def test_run_job_emits_oopses(self):
558+ with tempdir() as temp_dir:
559+ job, proc = self.run_file_job(
560+ temp_dir, exception='Catch me if you can!')
561+ err = proc.stderr.read()
562+ self.assertEqual(JobStatus.FAILED, job.status)
563+ self.assertIs(None, job.job_source.get_output(job))
564+ self.assertIn(
565+ "OOPS while executing job 10: [] Exception(u'Catch me if you"
566+ " can!',)", err)
567+
568+ def test_timeout_long(self):
569+ """Raises exception when a job exceeds the configured time limit."""
570+ with tempdir() as temp_dir:
571+ job, proc = self.run_file_job(
572+ temp_dir, config='lazr.jobrunner.tests.time_limit_config',
573+ sleep=10)
574+ self.assertEqual(JobStatus.FAILED, job.status)
575+ err = proc.stderr.read()
576+ self.assertIn(
577+ 'OOPS while executing job 10: [] SoftTimeLimitExceeded', err)
578
579=== modified file 'src/lazr/jobrunner/tests/test_jobrunner.py'
580--- src/lazr/jobrunner/tests/test_jobrunner.py 2012-03-16 18:27:52 +0000
581+++ src/lazr/jobrunner/tests/test_jobrunner.py 2012-03-22 18:19:18 +0000
582@@ -17,21 +17,9 @@
583 __metaclass__ = type
584
585 import contextlib
586-import errno
587-import json
588 import logging
589-import os.path
590-from resource import (
591- getrlimit,
592- RLIMIT_AS,
593- )
594-import subprocess
595-import shutil
596-import tempfile
597-from time import sleep
598 from unittest import TestCase
599
600-from celery.exceptions import SoftTimeLimitExceeded
601 import oops
602 from zope.testing.loghandler import Handler
603
604@@ -40,7 +28,6 @@
605 JobRunner,
606 JobStatus,
607 LeaseHeld,
608- RunJob,
609 SuspendJobException,
610 )
611
612@@ -97,243 +84,6 @@
613 return [('foo', 'bar')]
614
615
616-class FakeJobSource:
617-
618- memory_limit = None
619-
620- def __init__(self):
621- self.jobs = {}
622-
623- def get(self, job_id):
624- return self.jobs[job_id]
625-
626-
627-class FileJob(FakeJob):
628-
629- def __init__(self, job_source, job_id, output=None,
630- status=JobStatus.WAITING, exception=None, sleep=None):
631- super(FileJob, self).__init__(job_id)
632- self.job_source = job_source
633- self.output = output
634- self.status = status
635- self.exception = exception
636- self.sleep = sleep
637-
638- def save(self):
639- self.job_source.set(self)
640-
641- def run(self):
642- super(FileJob, self).run()
643- if self.sleep is not None:
644- sleep(self.sleep)
645- if self.exception is not None:
646- raise Exception(self.exception)
647- if self.output is not None:
648- self.job_source.set_output(self, self.output)
649-
650-
651-class FileJobSource:
652-
653- memory_limit = None
654-
655- def __init__(self, root):
656- self.root = root
657- self.job_root = os.path.join(self.root, 'job')
658- self.output_root = os.path.join(self.root, 'output')
659- def ensure_dir(path):
660- try:
661- os.mkdir(path)
662- except OSError, e:
663- if e.errno != errno.EEXIST:
664- raise
665- ensure_dir(self.job_root)
666- ensure_dir(self.output_root)
667-
668- def _job_file(self, job_id, mode):
669- return open(os.path.join(self.job_root, str(job_id)), mode)
670-
671- def _job_output_file(self, job_id, mode):
672- return open(os.path.join(self.output_root, str(job_id)), mode)
673-
674- def get(self, job_id):
675- with self._job_file(job_id, 'r') as job_file:
676- job_data = json.load(job_file)
677- job_data['status'] = JobStatus.by_value[job_data['status']]
678- return FileJob(self, **job_data)
679-
680- def set(self, job):
681- with self._job_file(job.job_id, 'w') as job_file:
682- job_info = {
683- 'job_id': job.job_id,
684- 'output': job.output,
685- 'status': job.status.value,
686- 'exception': job.exception,
687- 'sleep': job.sleep,
688- }
689- json.dump(job_info, job_file)
690-
691- def get_output(self, job):
692- try:
693- with self._job_output_file(job.job_id, 'r') as job_output_file:
694- return job_output_file.read()
695- except IOError, e:
696- if e.errno == errno.ENOENT:
697- return None
698- raise
699-
700- def set_output(self, job, output):
701- with self._job_output_file(job.job_id, 'w') as job_output_file:
702- job_output_file.write(output)
703-
704-
705-class TestRunJob(TestCase):
706-
707- @staticmethod
708- def makeFakeJobSource(job=None):
709- js = FakeJobSource()
710- if job is None:
711- job = FakeJob(10)
712- js.jobs[job.job_id] = job
713- return js
714-
715- @staticmethod
716- def runJob(js):
717- task = RunJob()
718- task.job_source = js
719- task.run(10)
720-
721- def test_run(self):
722- js = self.makeFakeJobSource()
723- self.assertTrue(js.jobs[10].unrun)
724- self.runJob(js)
725- self.assertFalse(js.jobs[10].unrun)
726-
727- def test_memory_limit(self):
728-
729- class MemoryCheckJob(FakeJob):
730-
731- def run(self):
732- super(MemoryCheckJob, self).run()
733- self.current_memory_limit = getrlimit(RLIMIT_AS)[0]
734-
735- start_limits = getrlimit(RLIMIT_AS)
736- js = FakeJobSource()
737- job = MemoryCheckJob(10)
738- js.jobs[10] = job
739- js.memory_limit = 1024 ** 3
740- task = RunJob()
741- task.job_source = js
742- task.run(10)
743- self.assertEqual(1024 ** 3, job.current_memory_limit)
744- self.assertEqual(start_limits, getrlimit(RLIMIT_AS))
745-
746- def test_acquires_lease(self):
747- js = self.makeFakeJobSource()
748- self.assertFalse(js.jobs[10].lease_held)
749- self.runJob(js)
750- self.assertTrue(js.jobs[10].lease_held)
751-
752- def test_skips_failed_acquisition(self):
753- js = self.makeFakeJobSource()
754- js.jobs[10].acquireLease()
755- self.runJob(js)
756- self.assertTrue(js.jobs[10].unrun)
757-
758-
759-def get_root():
760- import lazr.jobrunner
761- root = os.path.join(os.path.dirname(lazr.jobrunner.__file__), '../../../')
762- return os.path.normpath(root)
763-
764-
765-@contextlib.contextmanager
766-def tempdir():
767- dirname = tempfile.mkdtemp()
768- try:
769- yield dirname
770- finally:
771- shutil.rmtree(dirname)
772-
773-
774-@contextlib.contextmanager
775-def celeryd(config_module, file_job_dir):
776- cmdname = os.path.join(get_root(), 'bin/celeryd')
777- environ = dict(os.environ)
778- environ['FILE_JOB_DIR'] = file_job_dir
779- proc = subprocess.Popen([cmdname, '--config', config_module], env=environ,
780- stderr=subprocess.PIPE)
781- try:
782- yield proc
783- finally:
784- proc.terminate()
785- proc.wait()
786-
787-
788-class RunFileJob(RunJob):
789-
790- name = 'run_file_job'
791-
792- def __init__(self):
793- file_job_dir = None
794-
795- @property
796- def job_source(self):
797- return FileJobSource(self.file_job_dir)
798-
799-
800-class TestCeleryD(TestCase):
801-
802- def test_run_job(self):
803- with tempdir() as temp_dir:
804- js = FileJobSource(temp_dir)
805- job = FileJob(js, 10, 'my_output')
806- job.save()
807- result = RunFileJob.delay(10)
808- self.assertIs(None, js.get_output(job))
809- self.assertEqual(JobStatus.WAITING, job.status)
810- with celeryd('lazr.jobrunner.tests.config1', temp_dir):
811- result.wait(10)
812- job = js.get(job.job_id)
813- self.assertEqual('my_output', js.get_output(job))
814- self.assertEqual(JobStatus.COMPLETED, job.status)
815-
816- def run_file_job(self, temp_dir, config='lazr.jobrunner.tests.config1',
817- **kwargs):
818- js = FileJobSource(temp_dir)
819- job = FileJob(js, 10, **kwargs)
820- job.save()
821- result = RunFileJob.delay(10)
822- with celeryd(config, temp_dir) as proc:
823- try:
824- result.wait(10)
825- except SoftTimeLimitExceeded:
826- pass
827- job = js.get(job.job_id)
828- return job, proc
829-
830- def test_run_job_emits_oopses(self):
831- with tempdir() as temp_dir:
832- job, proc = self.run_file_job(
833- temp_dir, exception='Catch me if you can!')
834- err = proc.stderr.read()
835- self.assertEqual(JobStatus.FAILED, job.status)
836- self.assertIs(None, job.job_source.get_output(job))
837- self.assertIn(
838- "OOPS while executing job 10: [] Exception(u'Catch me if you"
839- " can!',)", err)
840-
841- def test_timeout_long(self):
842- """Raises exception when a job exceeds the configured time limit."""
843- with tempdir() as temp_dir:
844- job, proc = self.run_file_job(
845- temp_dir, config='lazr.jobrunner.tests.time_limit_config',
846- sleep=10)
847- self.assertEqual(JobStatus.FAILED, job.status)
848- err = proc.stderr.read()
849- self.assertIn(
850- 'OOPS while executing job 10: [] SoftTimeLimitExceeded', err)
851-
852-
853 class OOPSTestRepository:
854 """Record OOPSes in memory."""
855
856
857=== modified file 'src/lazr/jobrunner/tests/time_limit_config.py'
858--- src/lazr/jobrunner/tests/time_limit_config.py 2012-03-14 14:22:04 +0000
859+++ src/lazr/jobrunner/tests/time_limit_config.py 2012-03-22 18:19:18 +0000
860@@ -1,6 +1,6 @@
861 BROKER_VHOST = "/"
862 CELERY_RESULT_BACKEND = "amqp"
863-CELERY_IMPORTS = ("lazr.jobrunner.tests.test_jobrunner", )
864+CELERY_IMPORTS = ("lazr.jobrunner.tests.test_celerytask", )
865 CELERYD_CONCURRENCY = 1
866 CELERYD_TASK_SOFT_TIME_LIMIT = 1
867 import os
868
869=== modified file 'src/lazr/jobrunner/version.txt'
870--- src/lazr/jobrunner/version.txt 2012-03-15 14:59:09 +0000
871+++ src/lazr/jobrunner/version.txt 2012-03-22 18:19:18 +0000
872@@ -1,1 +1,1 @@
873-0.1
874+0.2

Subscribers

People subscribed via source and target branches

to all changes: