Merge lp:~abentley/launchpad/simplify-twisted-runner into lp:launchpad

Proposed by Aaron Bentley
Status: Merged
Merged at revision: 13801
Proposed branch: lp:~abentley/launchpad/simplify-twisted-runner
Merge into: lp:launchpad
Diff against target: 104 lines (+28/-33)
2 files modified
lib/lp/services/job/runner.py (+13/-33)
lib/lp/services/job/tests/test_runner.py (+15/-0)
To merge this branch: bzr merge lp:~abentley/launchpad/simplify-twisted-runner
Reviewer: j.c.sackett (community)
Status: Approve
Review via email: mp+72933@code.launchpad.net

Commit message

Simplify Twisted job runner.

Description of the change

= Summary =
Fix bug 833888, "Twisted job runner is more complex than needed", and possibly bug 605772, "merge-proposal-jobs is "hanging", apparently with nothing to do".

== Proposed fix ==
Stop using ParallelLimitedTaskConsumer

== Pre-implementation notes ==
Discussed with deryck

== Implementation details ==
The Twisted job runner now iterates through the list of jobs only once, instead of trying to process new jobs as they become ready.
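The single pass described above can be sketched roughly as follows. This is a minimal illustration, not the code from the branch: MiniDeferred is a hypothetical stand-in for a Twisted Deferred, and run_all, run_all_late_bound and run_job are names made up for the example. It also shows one detail worth checking in this pattern: a lambda created in a loop closes over the loop variable, so each callback needs its own binding of the job.

```python
class MiniDeferred:
    """Hypothetical stand-in for a Twisted Deferred.

    Callbacks are stored and only run, in order, when fire() is called.
    """

    def __init__(self):
        self._callbacks = []

    def addCallback(self, callback):
        self._callbacks.append(callback)
        return self

    def fire(self, result=None):
        for callback in self._callbacks:
            result = callback(result)
        return result


def run_all(jobs, run_job):
    """Take one snapshot of the job list and run each job once, in order."""
    d = MiniDeferred()
    for job in list(jobs):
        # Bind job as a default argument so each callback keeps its own job.
        d.addCallback(lambda ignored, job=job: run_job(job))
    d.fire()


def run_all_late_bound(jobs, run_job):
    """Same chain, but every callback shares the loop variable."""
    d = MiniDeferred()
    for job in list(jobs):
        # By the time fire() runs, job refers to the last element.
        d.addCallback(lambda ignored: run_job(job))
    d.fire()
```

With an empty job list neither function schedules any callbacks, which corresponds to the "no pending jobs" case mentioned under Demo and Q/A.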

== Tests ==
bin/test -v test_runner

== Demo and Q/A ==
Run any job script that uses the TwistedJobRunner. It should work when there are pending jobs, and when there are no pending jobs.

= Launchpad lint =

Checking for conflicts and issues in changed files.

Linting changed files:
  lib/lp/services/job/tests/test_runner.py
  lib/lp/services/job/runner.py

j.c.sackett (jcsackett) wrote :

Looks like a good simplification. Thanks, Aaron.

j.c.sackett (jcsackett) :
review: Approve
Robert Collins (lifeless) wrote :

Looks fine to me; it does mean we won't be parallelising - is that an issue?

Aaron Bentley (abentley) wrote :

I don't think it'll be an issue, because we haven't done it so far. In fact, we already rely on the fact that we're not parallelized in the way we kill workers that have had failures. I do think that if we want to pursue parallelization, we'd still be able to do it with Ampoule, even without ParallelLimitedTaskConsumer, but it's also not clear to me that Ampoule is the right long-term strategy. We might want to use Rabbit + Celery, for example.

Aaron Bentley (abentley) wrote :

This breaks when a LeaseHeld is raised, because then runJobInSubprocess returns None, not a Deferred.
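A rough sketch of the failure mode, under stated assumptions: FiredDeferred is a hypothetical stand-in for an already-fired Twisted Deferred, and run_job only mimics the reported behaviour of returning None when the lease is held. The guard in start_first_job is one possible workaround for illustration, not necessarily the fix that was applied.

```python
class FiredDeferred:
    """Hypothetical stand-in for an already-fired Twisted Deferred."""

    def __init__(self, result=None):
        self.result = result

    def addCallback(self, callback):
        # An already-fired deferred runs the callback immediately.
        self.result = callback(self.result)
        return self


def run_job(job, leased):
    """Mimic the reported behaviour: return None when the lease is held."""
    if job in leased:
        return None
    return FiredDeferred(job)


def start_first_job(jobs, leased):
    """Start the first job, substituting a fired deferred for None."""
    d = run_job(jobs[0], leased)
    if d is None:
        # Without this guard, d.addCallback(...) raises AttributeError.
        d = FiredDeferred()
    return d
```

In real Twisted code, defer.maybeDeferred serves a similar purpose: it wraps a call that may return either a plain value or a Deferred so that the caller always gets a Deferred back.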

Preview Diff

=== modified file 'lib/lp/services/job/runner.py'
--- lib/lp/services/job/runner.py 2011-08-19 14:12:28 +0000
+++ lib/lp/services/job/runner.py 2011-08-25 18:05:26 +0000
@@ -40,7 +40,6 @@
 from lazr.delegates import delegates
 import transaction
 from twisted.internet import (
-    defer,
     reactor,
     )
 from twisted.protocols import amp
@@ -61,10 +60,6 @@
 from lp.services.mail.sendmail import MailController
 from lp.services.scripts.base import LaunchpadCronScript
 from lp.services.twistedsupport import run_reactor
-from lp.services.twistedsupport.task import (
-    ParallelLimitedTaskConsumer,
-    PollingTaskSource,
-    )
 
 
 class BaseRunnableJobSource:
@@ -472,36 +467,21 @@
             oops = self._doOops(job, sys.exc_info())
             self._logOopsId(oops['id'])
 
-    def getTaskSource(self):
-        """Return a task source for all jobs in job_source."""
-
-        def producer():
-            while True:
-                # XXX: JonathanLange bug=741204: If we're getting all of the
-                # jobs at the start anyway, we can use a DeferredSemaphore,
-                # instead of the more complex PollingTaskSource, which is
-                # better suited to cases where we don't know how much work
-                # there will be.
-                jobs = list(self.job_source.iterReady())
-                if len(jobs) == 0:
-                    yield None
-                for job in jobs:
-                    yield lambda: self.runJobInSubprocess(job)
-        return PollingTaskSource(5, producer().next)
-
-    def doConsumer(self):
-        """Create a ParallelLimitedTaskConsumer for this job type."""
-        # 1 is hard-coded for now until we're sure we'd get gains by running
-        # more than one at a time. Note that several tests, including
-        # test_timeout, rely on this being 1.
-        consumer = ParallelLimitedTaskConsumer(1, logger=None)
-        return consumer.consume(self.getTaskSource())
-
     def runAll(self):
-        """Run all ready jobs, and any that become ready while running."""
+        """Run all ready jobs."""
         self.pool.start()
-        d = defer.maybeDeferred(self.doConsumer)
-        d.addCallbacks(self.terminated, self.failed)
+        try:
+            jobs = list(self.job_source.iterReady())
+            if len(jobs) == 0:
+                self.terminated()
+                return
+            d = self.runJobInSubprocess(jobs[0])
+            for job in jobs[1:]:
+                d.addCallback(lambda ignored: self.runJobInSubprocess(job))
+            d.addCallbacks(self.terminated, self.failed)
+        except:
+            self.terminated()
+            raise
 
     def terminated(self, ignored=None):
         """Callback to stop the processpool and reactor."""

=== modified file 'lib/lp/services/job/tests/test_runner.py'
--- lib/lp/services/job/tests/test_runner.py 2011-08-19 14:25:39 +0000
+++ lib/lp/services/job/tests/test_runner.py 2011-08-25 18:05:26 +0000
@@ -500,6 +500,13 @@
         self.x = '*' * (10 ** 6)
 
 
+class NoJobs(StaticJobSource):
+
+    done = False
+
+    jobs = []
+
+
 class TestTwistedJobRunner(ZopeTestInSubProcess, TestCaseWithFactory):
 
     layer = ZopelessDatabaseLayer
@@ -608,6 +615,14 @@
         oops = self.getOopsReport(runner, 0)
         self.assertEqual('MemoryError', oops.type)
 
+    def test_no_jobs(self):
+        logger = BufferLogger()
+        logger.setLevel(logging.INFO)
+        runner = TwistedJobRunner.runFromSource(
+            NoJobs, 'branchscanner', logger)
+        self.assertEqual(
+            (0, 0), (len(runner.completed_jobs), len(runner.incomplete_jobs)))
+
 
 class TestJobCronScript(ZopeTestInSubProcess, TestCaseWithFactory):
 