Merge lp:~abentley/launchpad/simplify-twisted-runner-2 into lp:launchpad

Proposed by Aaron Bentley
Status: Merged
Merged at revision: 13831
Proposed branch: lp:~abentley/launchpad/simplify-twisted-runner-2
Merge into: lp:launchpad
Prerequisite: lp:~abentley/launchpad/simplify-twisted-runner
Diff against target: 142 lines (+55/-20)
2 files modified
lib/lp/services/job/runner.py (+28/-20)
lib/lp/services/job/tests/test_runner.py (+27/-0)
To merge this branch: bzr merge lp:~abentley/launchpad/simplify-twisted-runner-2
Reviewer Review Type Date Requested Status
Deryck Hodge (community) code Approve
Review via email: mp+73402@code.launchpad.net

Commit message

Simplify Twisted job runner.

Description of the change

= Summary =
Ensure runJobInSubprocess always honours its documented return type (a Deferred).

== Proposed fix ==
Always return a Deferred from runJobInSubprocess, by returning twisted.internet.defer.succeed(None) instead of None when a LeaseHeld is raised.

== Pre-implementation notes ==
None

== Implementation details ==
Unified lease acquisition by moving it to BaseJobRunner.acquireLease.
Also extracted job string generation to BaseJobRunner.job_str, so that it can be used by BaseJobRunner.acquireLease.

== Tests ==
bin/test -t test_lease_held_handled

== Demo and Q/A ==
Propose a merge on qastaging. Run merge-proposal-jobs.py on qastaging. It
should run successfully.

= Launchpad lint =

Checking for conflicts and issues in changed files.

Linting changed files:
  lib/lp/services/job/tests/test_runner.py
  lib/lp/services/job/runner.py

To post a comment you must log in.
Revision history for this message
Deryck Hodge (deryck) :
review: Approve (code)
Revision history for this message
Aaron Bentley (abentley) wrote :

rolling back due to mysterious problems in QA: https://pastebin.canonical.com/52118/

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'lib/lp/services/job/runner.py'
--- lib/lp/services/job/runner.py 2011-08-30 16:10:29 +0000
+++ lib/lp/services/job/runner.py 2011-08-30 16:10:30 +0000
@@ -42,6 +42,9 @@
42from twisted.internet import (42from twisted.internet import (
43 reactor,43 reactor,
44 )44 )
45from twisted.internet.defer import (
46 succeed,
47 )
45from twisted.protocols import amp48from twisted.protocols import amp
46from twisted.python import log49from twisted.python import log
47from zope.component import getUtility50from zope.component import getUtility
@@ -173,15 +176,32 @@
173 if self.error_utility is None:176 if self.error_utility is None:
174 self.error_utility = errorlog.globalErrorUtility177 self.error_utility = errorlog.globalErrorUtility
175178
179 def acquireLease(self, job):
180 self.logger.debug(
181 'Trying to acquire lease for job in state %s' % (
182 job.status.title,))
183 try:
184 job.acquireLease()
185 except LeaseHeld:
186 self.logger.debug(
187 'Could not acquire lease for %s' % self.job_str(job))
188 self.incomplete_jobs.append(job)
189 return False
190 return True
191
192 @staticmethod
193 def job_str(job):
194 class_name = job.__class__.__name__
195 ijob_id = removeSecurityProxy(job).job.id
196 return '%s (ID %d)' % (class_name, ijob_id)
197
176 def runJob(self, job):198 def runJob(self, job):
177 """Attempt to run a job, updating its status as appropriate."""199 """Attempt to run a job, updating its status as appropriate."""
178 job = IRunnableJob(job)200 job = IRunnableJob(job)
179201
180 class_name = job.__class__.__name__
181 job_id = removeSecurityProxy(job).job.id
182 self.logger.info(202 self.logger.info(
183 'Running %s (ID %d) in status %s' % (203 'Running %s in status %s' % (
184 class_name, job_id, job.status.title,))204 self.job_str(job), job.status.title))
185 job.start()205 job.start()
186 transaction.commit()206 transaction.commit()
187 do_retry = False207 do_retry = False
@@ -291,14 +311,7 @@
291 """Run all the Jobs for this JobRunner."""311 """Run all the Jobs for this JobRunner."""
292 for job in self.jobs:312 for job in self.jobs:
293 job = IRunnableJob(job)313 job = IRunnableJob(job)
294 self.logger.debug(314 if not self.acquireLease(job):
295 'Trying to acquire lease for job in state %s' % (
296 job.status.title,))
297 try:
298 job.acquireLease()
299 except LeaseHeld:
300 self.logger.debug('Could not acquire lease for job')
301 self.incomplete_jobs.append(job)
302 continue315 continue
303 # Commit transaction to clear the row lock.316 # Commit transaction to clear the row lock.
304 transaction.commit()317 transaction.commit()
@@ -412,21 +425,16 @@
412 :return: a Deferred that fires when the job has completed.425 :return: a Deferred that fires when the job has completed.
413 """426 """
414 job = IRunnableJob(job)427 job = IRunnableJob(job)
415 try:428 if not self.acquireLease(job):
416 job.acquireLease()429 return succeed(None)
417 except LeaseHeld:
418 self.incomplete_jobs.append(job)
419 return
420 # Commit transaction to clear the row lock.430 # Commit transaction to clear the row lock.
421 transaction.commit()431 transaction.commit()
422 job_id = job.id432 job_id = job.id
423 deadline = timegm(job.lease_expires.timetuple())433 deadline = timegm(job.lease_expires.timetuple())
424434
425 # Log the job class and database ID for debugging purposes.435 # Log the job class and database ID for debugging purposes.
426 class_name = job.__class__.__name__
427 ijob_id = removeSecurityProxy(job).job.id
428 self.logger.info(436 self.logger.info(
429 'Running %s (ID %d).' % (class_name, ijob_id))437 'Running %s.' % self.job_str(job))
430 self.logger.debug(438 self.logger.debug(
431 'Running %r, lease expires %s', job, job.lease_expires)439 'Running %r, lease expires %s', job, job.lease_expires)
432 deferred = self.pool.doWork(440 deferred = self.pool.doWork(
433441
=== modified file 'lib/lp/services/job/tests/test_runner.py'
--- lib/lp/services/job/tests/test_runner.py 2011-08-30 16:10:29 +0000
+++ lib/lp/services/job/tests/test_runner.py 2011-08-30 16:10:30 +0000
@@ -23,6 +23,7 @@
23from lp.services.job.interfaces.job import (23from lp.services.job.interfaces.job import (
24 IRunnableJob,24 IRunnableJob,
25 JobStatus,25 JobStatus,
26 LeaseHeld,
26 SuspendJobException,27 SuspendJobException,
27 )28 )
28from lp.services.job.model.job import Job29from lp.services.job.model.job import Job
@@ -507,6 +508,22 @@
507 jobs = []508 jobs = []
508509
509510
511class LeaseHeldJob(StaticJobSource):
512
513 implements(IRunnableJob)
514
515 jobs = [()]
516
517 done = False
518
519 def __init__(self, id):
520 self.job = Job()
521 self.id = id
522
523 def acquireLease(self):
524 raise LeaseHeld()
525
526
510class TestTwistedJobRunner(ZopeTestInSubProcess, TestCaseWithFactory):527class TestTwistedJobRunner(ZopeTestInSubProcess, TestCaseWithFactory):
511528
512 layer = ZopelessDatabaseLayer529 layer = ZopelessDatabaseLayer
@@ -623,6 +640,16 @@
623 self.assertEqual(640 self.assertEqual(
624 (0, 0), (len(runner.completed_jobs), len(runner.incomplete_jobs)))641 (0, 0), (len(runner.completed_jobs), len(runner.incomplete_jobs)))
625642
643 def test_lease_held_handled(self):
644 """Jobs that raise LeaseHeld are handled correctly."""
645 logger = BufferLogger()
646 logger.setLevel(logging.DEBUG)
647 runner = TwistedJobRunner.runFromSource(
648 LeaseHeldJob, 'branchscanner', logger)
649 self.assertIn('Could not acquire lease', logger.getLogBuffer())
650 self.assertEqual(
651 (0, 1), (len(runner.completed_jobs), len(runner.incomplete_jobs)))
652
626653
627class TestJobCronScript(ZopeTestInSubProcess, TestCaseWithFactory):654class TestJobCronScript(ZopeTestInSubProcess, TestCaseWithFactory):
628655