Merge lp:~abentley/launchpad/simplify-twisted-runner-2 into lp:launchpad

Proposed by Aaron Bentley
Status: Merged
Merged at revision: 13831
Proposed branch: lp:~abentley/launchpad/simplify-twisted-runner-2
Merge into: lp:launchpad
Prerequisite: lp:~abentley/launchpad/simplify-twisted-runner
Diff against target: 142 lines (+55/-20)
2 files modified
lib/lp/services/job/runner.py (+28/-20)
lib/lp/services/job/tests/test_runner.py (+27/-0)
To merge this branch: bzr merge lp:~abentley/launchpad/simplify-twisted-runner-2
Reviewer Review Type Date Requested Status
Deryck Hodge (community) code Approve
Review via email: mp+73402@code.launchpad.net

Commit message

Simplify Twisted job runner.

Description of the change

= Summary =
Ensure runJobInSubprocess doesn't violate its expected return value.

== Proposed fix ==
Always return a Deferred from runJobInSubprocess, by returning twisted.internet.defer.success(None) instead of None when a LeaseHeld is raised.

== Pre-implementation notes ==
None

== Implementation details ==
Unified lease acquisition by moving it to BaseJobRunner.acquireLease.
Also extracted job string generation to BaseJobRunner.job_str, and so it can be used by BaseJobRunner.acquireLease.

== Tests ==
bin/test -t test_lease_held_handled

== Demo and Q/A ==
Propose a merge on qastaging. Run merge-proposal-jobs.py on qastaging. It
should run successfully.

= Launchpad lint =

Checking for conflicts and issues in changed files.

Linting changed files:
  lib/lp/services/job/tests/test_runner.py
  lib/lp/services/job/runner.py

To post a comment you must log in.
Revision history for this message
Deryck Hodge (deryck) :
review: Approve (code)
Revision history for this message
Aaron Bentley (abentley) wrote :

rolling back due to mysterious problems in QA: https://pastebin.canonical.com/52118/

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'lib/lp/services/job/runner.py'
2--- lib/lp/services/job/runner.py 2011-08-30 16:10:29 +0000
3+++ lib/lp/services/job/runner.py 2011-08-30 16:10:30 +0000
4@@ -42,6 +42,9 @@
5 from twisted.internet import (
6 reactor,
7 )
8+from twisted.internet.defer import (
9+ succeed,
10+ )
11 from twisted.protocols import amp
12 from twisted.python import log
13 from zope.component import getUtility
14@@ -173,15 +176,32 @@
15 if self.error_utility is None:
16 self.error_utility = errorlog.globalErrorUtility
17
18+ def acquireLease(self, job):
19+ self.logger.debug(
20+ 'Trying to acquire lease for job in state %s' % (
21+ job.status.title,))
22+ try:
23+ job.acquireLease()
24+ except LeaseHeld:
25+ self.logger.debug(
26+ 'Could not acquire lease for %s' % self.job_str(job))
27+ self.incomplete_jobs.append(job)
28+ return False
29+ return True
30+
31+ @staticmethod
32+ def job_str(job):
33+ class_name = job.__class__.__name__
34+ ijob_id = removeSecurityProxy(job).job.id
35+ return '%s (ID %d)' % (class_name, ijob_id)
36+
37 def runJob(self, job):
38 """Attempt to run a job, updating its status as appropriate."""
39 job = IRunnableJob(job)
40
41- class_name = job.__class__.__name__
42- job_id = removeSecurityProxy(job).job.id
43 self.logger.info(
44- 'Running %s (ID %d) in status %s' % (
45- class_name, job_id, job.status.title,))
46+ 'Running %s in status %s' % (
47+ self.job_str(job), job.status.title))
48 job.start()
49 transaction.commit()
50 do_retry = False
51@@ -291,14 +311,7 @@
52 """Run all the Jobs for this JobRunner."""
53 for job in self.jobs:
54 job = IRunnableJob(job)
55- self.logger.debug(
56- 'Trying to acquire lease for job in state %s' % (
57- job.status.title,))
58- try:
59- job.acquireLease()
60- except LeaseHeld:
61- self.logger.debug('Could not acquire lease for job')
62- self.incomplete_jobs.append(job)
63+ if not self.acquireLease(job):
64 continue
65 # Commit transaction to clear the row lock.
66 transaction.commit()
67@@ -412,21 +425,16 @@
68 :return: a Deferred that fires when the job has completed.
69 """
70 job = IRunnableJob(job)
71- try:
72- job.acquireLease()
73- except LeaseHeld:
74- self.incomplete_jobs.append(job)
75- return
76+ if not self.acquireLease(job):
77+ return succeed(None)
78 # Commit transaction to clear the row lock.
79 transaction.commit()
80 job_id = job.id
81 deadline = timegm(job.lease_expires.timetuple())
82
83 # Log the job class and database ID for debugging purposes.
84- class_name = job.__class__.__name__
85- ijob_id = removeSecurityProxy(job).job.id
86 self.logger.info(
87- 'Running %s (ID %d).' % (class_name, ijob_id))
88+ 'Running %s.' % self.job_str(job))
89 self.logger.debug(
90 'Running %r, lease expires %s', job, job.lease_expires)
91 deferred = self.pool.doWork(
92
93=== modified file 'lib/lp/services/job/tests/test_runner.py'
94--- lib/lp/services/job/tests/test_runner.py 2011-08-30 16:10:29 +0000
95+++ lib/lp/services/job/tests/test_runner.py 2011-08-30 16:10:30 +0000
96@@ -23,6 +23,7 @@
97 from lp.services.job.interfaces.job import (
98 IRunnableJob,
99 JobStatus,
100+ LeaseHeld,
101 SuspendJobException,
102 )
103 from lp.services.job.model.job import Job
104@@ -507,6 +508,22 @@
105 jobs = []
106
107
108+class LeaseHeldJob(StaticJobSource):
109+
110+ implements(IRunnableJob)
111+
112+ jobs = [()]
113+
114+ done = False
115+
116+ def __init__(self, id):
117+ self.job = Job()
118+ self.id = id
119+
120+ def acquireLease(self):
121+ raise LeaseHeld()
122+
123+
124 class TestTwistedJobRunner(ZopeTestInSubProcess, TestCaseWithFactory):
125
126 layer = ZopelessDatabaseLayer
127@@ -623,6 +640,16 @@
128 self.assertEqual(
129 (0, 0), (len(runner.completed_jobs), len(runner.incomplete_jobs)))
130
131+ def test_lease_held_handled(self):
132+ """Jobs that raise LeaseHeld are handled correctly."""
133+ logger = BufferLogger()
134+ logger.setLevel(logging.DEBUG)
135+ runner = TwistedJobRunner.runFromSource(
136+ LeaseHeldJob, 'branchscanner', logger)
137+ self.assertIn('Could not acquire lease', logger.getLogBuffer())
138+ self.assertEqual(
139+ (0, 1), (len(runner.completed_jobs), len(runner.incomplete_jobs)))
140+
141
142 class TestJobCronScript(ZopeTestInSubProcess, TestCaseWithFactory):
143