Merge lp:~adeuring/launchpad/abort-transaction-in-job-queue into lp:launchpad

Proposed by Abel Deuring on 2012-04-12
Status: Merged
Approved by: Abel Deuring on 2012-04-13
Approved revision: no longer in the source branch.
Merged at revision: 15095
Proposed branch: lp:~adeuring/launchpad/abort-transaction-in-job-queue
Merge into: lp:launchpad
Diff against target: 177 lines (+118/-7)
3 files modified
lib/lp/services/job/interfaces/job.py (+5/-5)
lib/lp/services/job/model/job.py (+4/-2)
lib/lp/services/job/tests/test_job.py (+109/-0)
To merge this branch: bzr merge lp:~adeuring/launchpad/abort-transaction-in-job-queue
Reviewer                       Review Type   Date Requested   Status
j.c.sackett (community)                      2012-04-12       Approve on 2012-04-12
Richard Harding (community)    code*         2012-04-12       Approve on 2012-04-12
Review via email: mp+101762@code.launchpad.net

Commit Message

Parameter abort_transaction added to Job.queue()

Description of the Change

This branch adds a parameter "abort_transaction" to the method Job.queue().

Jobs are now controlled by the lazr.jobrunner module, where I recently
added a timeout mechanism: This Celery-based job runner can define
more than one queue for the same jobs, with different timeout values.
If a job times out in a "fast" queue, it can be re-queued in another
queue with a longer timeout value.
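
A minimal sketch of the fallback idea described above, not the lazr.jobrunner implementation: the queue names, the timeout values and the helper functions are all hypothetical, and no Celery API is used. It only shows a job moving to the next, slower queue after a timeout.

```python
# Hypothetical queue names and timeouts (in seconds), ordered fastest first.
QUEUE_TIMEOUTS = [("fast", 30), ("slow", 600)]


def next_queue(current):
    """Return the queue to retry in after a timeout, or None if none is left."""
    names = [name for name, _ in QUEUE_TIMEOUTS]
    position = names.index(current)
    if position + 1 < len(names):
        return names[position + 1]
    return None


def run_with_fallback(duration, queue="fast"):
    """Simulate a job that needs `duration` seconds.

    Returns the list of queues the job visited and its final status.
    """
    timeouts = dict(QUEUE_TIMEOUTS)
    visited = []
    while queue is not None:
        visited.append(queue)
        if duration <= timeouts[queue]:
            return visited, "COMPLETED"
        # The job timed out: put it into the next queue, if there is one.
        queue = next_queue(queue)
    return visited, "FAILED"
```

A job that needs 120 seconds would time out in the hypothetical "fast" queue and complete in "slow".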

Re-queueing requires changing the status of the job back to JobStatus.WAITING.
This is done in Job.queue() -- but this method is also called when
a job raises an exception that is listed in Job.retry_error_types.

This retry mechanism assumes that the job left the database in a
consistent state when it raised the "retry exception", hence Job.queue()
calls transaction.commit(). This is most likely bad when the job
is interrupted by a timeout, so I added the parameter abort_transaction,
which is False by default.
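
A simplified sketch of the two code paths, under stated assumptions: FakeTransaction is a hypothetical stand-in for the transaction module that only records calls, SketchJob is not the real Job class, and the second commit after the status change is inferred from the call sequences the tests in this branch expect.

```python
class FakeTransaction:
    """Hypothetical stand-in for the `transaction` module; records calls."""

    def __init__(self):
        self.calls = []

    def commit(self):
        self.calls.append("commit")

    def abort(self):
        self.calls.append("abort")


class SketchJob:
    """Simplified job holding only the state needed for this illustration."""

    def __init__(self, txn):
        self.txn = txn
        self.status = "RUNNING"

    def queue(self, manage_transaction=False, abort_transaction=False):
        if manage_transaction:
            if abort_transaction:
                # Timeout case: discard whatever the interrupted job wrote.
                self.txn.abort()
            # Retry case: keep the job's work (nothing is left after an abort).
            self.txn.commit()
        self.status = "WAITING"
        if manage_transaction:
            # Assumed second commit that persists the status change.
            self.txn.commit()


# Retry after a "retry exception": the job's changes are committed.
retry_txn = FakeTransaction()
SketchJob(retry_txn).queue(manage_transaction=True)

# Re-queue after a timeout: the job's changes are rolled back first.
timeout_txn = FakeTransaction()
SketchJob(timeout_txn).queue(manage_transaction=True, abort_transaction=True)
```

Without manage_transaction=True, neither commit() nor abort() is called, so callers outside the job runner are unaffected by the new parameter.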

I also noticed that the parameter manage_transaction of the methods
Job.start(), Job.complete(), Job.fail() etc. was not tested, so I added
these tests:

./bin/test -vvt lp.services.job.tests.test_job.TestJob.test_.*_manages_transactions

no lint

Abel Deuring (adeuring) wrote :

...and I noticed that the methods start(), complete(), fail() etc. of the class IJob were not up to date: I added the parameter manage_transaction.

Richard Harding (rharding) wrote :

Abel, feel free to correct me here.

- I worry that, with manage_transaction as a per-method parameter, the calls could end up not matching: start() is called with it set to True but fail() is not, and so we end up with leftover transactions that are not processed correctly. An object-level attribute such as 'has' or 'uses' transaction, which all of the process methods check, would be a better single point to hold this and might help keep things more consistent.
- There's no mention of the LoC policy. Please note in the MP if this is offset somewhere.
- The comments in the test code seem to be wrapped prematurely. For instance: #74-75 would fit in one line per our lint policy. Since this would reduce the LoC impact, could these be adjusted?

Thanks for adding the new tests!

review: Needs Information (code*)
Abel Deuring (adeuring) wrote :

Rick, right, the parameter manage_transaction is indeed a good candidate for inconsistencies. The problem here: The job runner implementation in the main LP code (now obsolete) called commit() or abort() where needed. The new implementation in lazr.jobrunner does not do this: In theory, it should completely ignore transactions, because these are relevant for the _job_ but not the job runner. The problem with this approach is that some of the methods start(), queue() (sorry, can't remember right now which exactly) are called in parts of the main LP code which are not directly related to "real" job management. (I think there is for example code which creates a job instance and then immediately puts it into the status "suspended".) Committing transactions there would be bad, so I added the parameter manage_transaction. It is only set to True in lazr.jobrunner, but it is used there consistently.

Your suggestion to use something like Job.transaction to automatically decide if start(), complete() etc. should manage the transaction is very interesting, but it will require some more thought about where this property should be added. Another option would be to replace the calls of queue() or suspend() outside the job runner with some other methods like "tweakJobStatusWithoutDBCommits()"; after all, the job status is supposed to be managed by a job runner but not in model code. (example: lp/buildmaster/model/packagebuild.py, line 278) Then we could drop the parameter manage_transaction completely.

But I would like to defer this to another branch.

LoC policy: Once lazr.jobrunner is fully implemented and running, Aaron and I expect that we can remove a reasonable amount of lines from LP.

Richard Harding (rharding) wrote :

Thanks for the response Abel, ok'ing.

review: Approve (code*)
j.c.sackett (jcsackett) wrote :

I have nothing further to add to Rick's comments; thanks for this code and the other jobrunner work.

review: Approve

Preview Diff

1=== modified file 'lib/lp/services/job/interfaces/job.py'
2--- lib/lp/services/job/interfaces/job.py 2012-04-06 17:28:25 +0000
3+++ lib/lp/services/job/interfaces/job.py 2012-04-12 17:19:19 +0000
4@@ -115,22 +115,22 @@
5     def getTimeout():
6         """Determine how long this job can run before timing out."""
7
8-    def start():
9+    def start(manage_transaction=False):
10         """Mark the job as started."""
11
12-    def complete():
13+    def complete(manage_transaction=False):
14         """Mark the job as completed."""
15
16-    def fail():
17+    def fail(manage_transaction=False):
18         """Indicate that the job has failed permanently.
19
20         Only running jobs can fail.
21         """
22
23-    def queue():
24+    def queue(manage_transaction=False, abort_transaction=False):
25         """Mark the job as queued for processing."""
26
27-    def suspend():
28+    def suspend(manage_transaction=False):
29         """Mark the job as suspended.
30
31         Only waiting jobs can be suspended."""
32
33=== modified file 'lib/lp/services/job/model/job.py'
34--- lib/lp/services/job/model/job.py 2012-04-09 19:02:25 +0000
35+++ lib/lp/services/job/model/job.py 2012-04-12 17:19:19 +0000
36@@ -189,10 +189,12 @@
37         if manage_transaction:
38             transaction.commit()
39
40-    def queue(self, manage_transaction=False):
41+    def queue(self, manage_transaction=False, abort_transaction=False):
42         """See `IJob`."""
43-        # Commit the transaction to update the DB time.
44         if manage_transaction:
45+            if abort_transaction:
46+                transaction.abort()
47+            # Commit the transaction to update the DB time.
48             transaction.commit()
49         self._set_status(JobStatus.WAITING)
50         self.date_finished = datetime.datetime.now(UTC)
51
52=== modified file 'lib/lp/services/job/tests/test_job.py'
53--- lib/lp/services/job/tests/test_job.py 2012-03-21 16:16:08 +0000
54+++ lib/lp/services/job/tests/test_job.py 2012-04-12 17:19:19 +0000
55@@ -9,6 +9,7 @@
56 import pytz
57 from lazr.jobrunner.jobrunner import LeaseHeld
58 from storm.locals import Store
59+import transaction
60
61 from lp.services.database.constants import UTC_NOW
62 from lp.services.database.lpstorm import IStore
63@@ -252,6 +253,114 @@
64             self.assertEqual(
65                 status in Job.PENDING_STATUSES, job.is_pending)
66
67+    def test_start_manages_transactions(self):
68+        # Job.start() does not commit the transaction by default.
69+        with TransactionRecorder() as recorder:
70+            job = Job()
71+            job.start()
72+            self.assertEqual([], recorder.transaction_calls)
73+
74+        # If explicitly specified, Job.start() commits the transaction.
75+        with TransactionRecorder() as recorder:
76+            job = Job()
77+            job.start(manage_transaction=True)
78+            self.assertEqual(['commit'], recorder.transaction_calls)
79+
80+    def test_complete_manages_transactions(self):
81+        # Job.complete() does not commit the transaction by default.
82+        job = Job()
83+        job.start()
84+        with TransactionRecorder() as recorder:
85+            job.complete()
86+            self.assertEqual([], recorder.transaction_calls)
87+
88+        # If explicitly specified, Job.complete() commits the transaction.
89+        job = Job()
90+        job.start()
91+        with TransactionRecorder() as recorder:
92+            job.complete(manage_transaction=True)
93+            self.assertEqual(['commit', 'commit'], recorder.transaction_calls)
94+
95+    def test_fail_manages_transactions(self):
96+        # Job.fail() does not commit the transaction by default.
97+        job = Job()
98+        job.start()
99+        with TransactionRecorder() as recorder:
100+            job.fail()
101+            self.assertEqual([], recorder.transaction_calls)
102+
103+        # If explicitly specified, Job.fail() commits the transaction.
104+        # Note that there is an additional commit to update the job status.
105+        job = Job()
106+        job.start()
107+        with TransactionRecorder() as recorder:
108+            job.fail(manage_transaction=True)
109+            self.assertEqual(['abort', 'commit'], recorder.transaction_calls)
110+
111+    def test_queue_manages_transactions(self):
112+        # Job.queue() does not commit the transaction by default.
113+        job = Job()
114+        job.start()
115+        with TransactionRecorder() as recorder:
116+            job.queue()
117+            self.assertEqual([], recorder.transaction_calls)
118+
119+        # If explicitly specified, Job.queue() commits the transaction.
120+        # Note that there is an additional commit to update the job status.
121+        job = Job()
122+        job.start()
123+        with TransactionRecorder() as recorder:
124+            job.queue(manage_transaction=True)
125+            self.assertEqual(['commit', 'commit'], recorder.transaction_calls)
126+
127+        # If abort_transaction=True is also passed to Job.queue()
128+        # the transaction is first aborted, then two times committed.
129+        job = Job()
130+        job.start()
131+        with TransactionRecorder() as recorder:
132+            job.queue(manage_transaction=True, abort_transaction=True)
133+            self.assertEqual(
134+                ['abort', 'commit', 'commit'], recorder.transaction_calls)
135+
136+    def test_suspend_manages_transactions(self):
137+        # Job.suspend() does not commit the transaction by default.
138+        job = Job()
139+        job.start()
140+        with TransactionRecorder() as recorder:
141+            job.suspend()
142+            self.assertEqual([], recorder.transaction_calls)
143+
144+        # If explicitly specified, Job.suspend() commits the transaction.
145+        job = Job()
146+        job.start()
147+        with TransactionRecorder() as recorder:
148+            job.suspend(manage_transaction=True)
149+            self.assertEqual(['commit'], recorder.transaction_calls)
150+
151+
152+class TransactionRecorder:
153+    def __init__(self):
154+        self.transaction_calls = []
155+
156+    def __enter__(self):
157+        self.real_commit = transaction.commit
158+        self.real_abort = transaction.abort
159+        transaction.commit = self.commit
160+        transaction.abort = self.abort
161+        return self
162+
163+    def __exit__(self, exc_type, exc_val, exc_tb):
164+        transaction.commit = self.real_commit
165+        transaction.abort = self.real_abort
166+
167+    def commit(self):
168+        self.transaction_calls.append('commit')
169+        self.real_commit()
170+
171+    def abort(self):
172+        self.transaction_calls.append('abort')
173+        self.real_abort()
174+
175
176 class TestReadiness(TestCase):
177     """Test the implementation of readiness."""