Merge lp:~adeuring/lazr.jobrunner/early-reference-to-job.fail-test into lp:lazr.jobrunner

Proposed by Abel Deuring on 2012-03-26
Status: Merged
Approved by: Aaron Bentley on 2012-03-26
Approved revision: 24
Merged at revision: 23
Proposed branch: lp:~adeuring/lazr.jobrunner/early-reference-to-job.fail-test
Merge into: lp:lazr.jobrunner
Diff against target: 161 lines (+81/-20)
2 files modified
src/lazr/jobrunner/jobrunner.py (+20/-19)
src/lazr/jobrunner/tests/test_jobrunner.py (+61/-1)
To merge this branch: bzr merge lp:~adeuring/lazr.jobrunner/early-reference-to-job.fail-test
Reviewer Review Type Date Requested Status
Aaron Bentley (community) 2012-03-26 Approve on 2012-03-26
Review via email: mp+99312@code.launchpad.net

Description of the Change

This branch adds a test that imitates a possible behaviour of Storm-based
job instances: accessing methods or other attributes of the instance
may fail after a DB error has occurred. The test uses the job class
VolatileAttributesJob, where accessing the method 'fail' raises an error
after run() has been called.

I also noticed that JobRunner.run() left jobs in the status RUNNING
if one of the methods suspend(), queue() or complete() failed. This can
easily happen, for example, when the method run() of a Storm-based class
seems to succeed, but the call of transaction.commit() in job.complete()
fails due to an IntegrityError.

Hence I reorganised the exception handling in JobRunner.run() a bit,
so that job.fail() is called when suspend(), queue() or complete()
raises an error.

To post a comment you must log in.
Aaron Bentley (abentley) wrote :

Good catch.

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'src/lazr/jobrunner/jobrunner.py'
2--- src/lazr/jobrunner/jobrunner.py 2012-03-26 09:09:17 +0000
3+++ src/lazr/jobrunner/jobrunner.py 2012-03-26 12:51:49 +0000
4@@ -159,29 +159,30 @@
5 do_retry = False
6 try:
7 try:
8- job.run()
9- except job.retry_error_types, e:
10- if job.attempt_count > job.max_retries:
11- raise
12- self.logger.exception(
13- "Scheduling retry due to %s: %s." % (
14- e.__class__.__name__, e))
15- do_retry = True
16- except SuspendJobException:
17- self.logger.debug("Job suspended itself")
18- job.suspend(manage_transaction=True)
19- self.incomplete_jobs.append(job)
20+ try:
21+ job.run()
22+ except job.retry_error_types, e:
23+ if job.attempt_count > job.max_retries:
24+ raise
25+ self.logger.exception(
26+ "Scheduling retry due to %s: %s." % (
27+ e.__class__.__name__, e))
28+ do_retry = True
29+ except SuspendJobException:
30+ self.logger.debug("Job suspended itself")
31+ job.suspend(manage_transaction=True)
32+ self.incomplete_jobs.append(job)
33+ else:
34+ if do_retry:
35+ job.queue(manage_transaction=True)
36+ self.incomplete_jobs.append(job)
37+ else:
38+ job.complete(manage_transaction=True)
39+ self.completed_jobs.append(job)
40 except Exception:
41 fail(manage_transaction=True)
42 self.incomplete_jobs.append(job)
43 raise
44- else:
45- if do_retry:
46- job.queue(manage_transaction=True)
47- self.incomplete_jobs.append(job)
48- else:
49- job.complete(manage_transaction=True)
50- self.completed_jobs.append(job)
51
52 def runJobHandleError(self, job):
53 """Run the specified job, handling errors."""
54
55=== modified file 'src/lazr/jobrunner/tests/test_jobrunner.py'
56--- src/lazr/jobrunner/tests/test_jobrunner.py 2012-03-26 09:09:17 +0000
57+++ src/lazr/jobrunner/tests/test_jobrunner.py 2012-03-26 12:51:49 +0000
58@@ -36,7 +36,8 @@
59
60 retry_error_types = ()
61
62- def __init__(self, job_id, failure=None):
63+ def __init__(self, job_id, failure=None, error_in_queue=False,
64+ error_in_suspend=False, error_in_complete=False):
65 super(FakeJob, self).__init__(job_id)
66 self.unrun = True
67 self.failure = failure
68@@ -44,6 +45,9 @@
69 self.notifyUserError_called = False
70 self.queue_call_count = 0
71 self.lease_held = False
72+ self.error_in_queue = error_in_queue
73+ self.error_in_suspend = error_in_suspend
74+ self.error_in_complete = error_in_complete
75
76 def save(self):
77 pass
78@@ -63,6 +67,8 @@
79 self.save()
80
81 def complete(self, manage_transaction=False):
82+ if self.error_in_complete:
83+ raise Exception('complete() failed')
84 super(FakeJob, self).complete(manage_transaction)
85 self.save()
86
87@@ -74,9 +80,16 @@
88 self.notifyOops_called = True
89
90 def queue(self, manage_transaction=False):
91+ if self.error_in_queue:
92+ raise Exception('queue() failed')
93 super(FakeJob, self).queue(manage_transaction)
94 self.queue_call_count += 1
95
96+ def suspend(self, manage_transaction=False):
97+ if self.error_in_suspend:
98+ raise Exception('suspend() failed')
99+ super(FakeJob, self).suspend(manage_transaction)
100+
101 def notifyUserError(self, exception):
102 self.notifyUserError_called = True
103
104@@ -84,6 +97,22 @@
105 return [('foo', 'bar')]
106
107
108+class VolatileAttributesJob:
109+
110+ def __init__(self, job_id):
111+ self.job = FakeJob(job_id)
112+ self.failed = False
113+
114+ def run(self):
115+ self.failed = True
116+ raise Exception('foo')
117+
118+ def __getattr__(self, name):
119+ if name == 'fail' and self.failed:
120+ raise AttributeError('no access')
121+ return getattr(self.job, name)
122+
123+
124 class OOPSTestRepository:
125 """Record OOPSes in memory."""
126
127@@ -257,3 +286,34 @@
128 oopsMessage=message_storage.oopsMessage)
129 report = runner.runJobHandleError(job)
130 self.assertEqual({'foo': 'bar'}, report['extra_data'])
131+
132+ def test_job_with_volatile_attributes(self):
133+ # Properties of jobs that are Storm objects may not be accessible
134+ # when a database transaction failed. In this case, even the
135+ # method Job.fail() may not be accessible. JobRUnner.run()
136+ # handles this by storing an early reference to job.fail().
137+ job = VolatileAttributesJob(1)
138+ self.runner.runJobHandleError(job)
139+ self.assertEqual(JobStatus.FAILED, job.status)
140+
141+ def test_job_fails_in_complete(self):
142+ # If job.complete() fails, job.fail() is called.
143+ job = FakeJob(1, error_in_complete=True)
144+ self.runner.runJobHandleError(job)
145+ self.assertEqual(JobStatus.FAILED, job.status)
146+
147+ def test_job_fails_in_suspend(self):
148+ # If job.suspend() fails, job.fail() is called.
149+ job = FakeJob(1, SuspendJobException, error_in_suspend=True)
150+ self.runner.runJobHandleError(job)
151+ self.assertEqual(JobStatus.FAILED, job.status)
152+
153+ def test_job_fails_in_queue(self):
154+ # If job.queue() fails, job.fail() is called.
155+ class TryAgain(Exception):
156+ pass
157+
158+ job = FakeJob(1, TryAgain('once more'), error_in_queue=True)
159+ job.retry_error_types = (TryAgain, )
160+ self.runner.runJobHandleError(job)
161+ self.assertEqual(JobStatus.FAILED, job.status)

Subscribers

People subscribed via source and target branches

to all changes: