Merge lp:~abentley/launchpad/simplify-twisted-runner into lp:launchpad

Proposed by Aaron Bentley
Status: Merged
Merged at revision: 13801
Proposed branch: lp:~abentley/launchpad/simplify-twisted-runner
Merge into: lp:launchpad
Diff against target: 104 lines (+28/-33)
2 files modified
lib/lp/services/job/runner.py (+13/-33)
lib/lp/services/job/tests/test_runner.py (+15/-0)
To merge this branch: bzr merge lp:~abentley/launchpad/simplify-twisted-runner
Reviewer                  Review Type  Date Requested  Status
j.c.sackett (community)                                Approve
Review via email: mp+72933@code.launchpad.net

Commit message

Simplify Twisted job runner.

Description of the change

= Summary =
Fix bug 833888, "Twisted job runner is more complex than needed", and possibly bug 605772, "merge-proposal-jobs is 'hanging', apparently with nothing to do".

== Proposed fix ==
Stop using ParallelLimitedTaskConsumer and PollingTaskSource; run the ready jobs one after another on a single Deferred chain instead.

== Pre-implementation notes ==
Discussed with deryck

== Implementation details ==
The Twisted job runner now fetches the list of ready jobs once and runs them in order, instead of polling for new jobs that become ready while it is running.
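
The behavioural change can be illustrated with a minimal, synchronous sketch. It is not the Launchpad code: FakeJobSource and run_all_once are hypothetical names, and the real runner chains Deferreds rather than looping synchronously. The point is only that the ready list is read once, so a job that becomes ready mid-run waits for the next invocation, and an empty list ends the run immediately.

```python
class FakeJobSource:
    """Hypothetical stand-in for a job source; not Launchpad's API."""

    def __init__(self, ready):
        self.ready = list(ready)

    def iterReady(self):
        # Iterate over a copy so later additions do not affect this pass.
        return iter(list(self.ready))


def run_all_once(source, run_job):
    """Run every job that is ready right now, in order, exactly once."""
    jobs = list(source.iterReady())  # snapshot taken a single time
    completed = []
    for job in jobs:
        run_job(job)
        completed.append(job)
    return completed


source = FakeJobSource(["job-1", "job-2"])


def run_job(job):
    # Simulate a new job becoming ready while the first one is running.
    if job == "job-1":
        source.ready.append("job-3")


completed = run_all_once(source, run_job)
```

Here job-3 is left in the source for the next run of the script, which matches the new "Run all ready jobs." docstring in the diff.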

== Tests ==
bin/test -v test_runner

== Demo and Q/A ==
Run any job script that uses the TwistedJobRunner. It should work when there are pending jobs, and when there are no pending jobs.

= Launchpad lint =

Checking for conflicts and issues in changed files.

Linting changed files:
  lib/lp/services/job/tests/test_runner.py
  lib/lp/services/job/runner.py

j.c.sackett (jcsackett) wrote :

Looks like a good simplification. Thanks, Aaron.

j.c.sackett (jcsackett) :
review: Approve

Robert Collins (lifeless) wrote :

Looks fine to me; it does mean we won't be parallelising - is that an issue?

Aaron Bentley (abentley) wrote :

I don't think it'll be an issue, because we haven't done it so far. In fact, we already rely on the fact that we're not parallelized in the way we kill workers that have had failures. I do think that if we want to pursue parallelization, we'd still be able to do it with Ampoule, even without ParallelLimitedTaskConsumer, but it's also not clear to me that Ampoule is the right long-term strategy. We might want to use Rabbit + Celery, for example.

Aaron Bentley (abentley) wrote :

This breaks when a LeaseHeld is raised, because then runJobInSubprocess returns None, not a Deferred.
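
A minimal sketch of that failure mode and one possible guard follows. It is an illustration under stated assumptions, not the fix that landed: MiniDeferred is a hypothetical synchronous stand-in for an already-fired Deferred, run_job_in_subprocess is a hypothetical stand-in that returns None when the lease is held, and the guard seeds the chain with an already-fired deferred (in Twisted, defer.succeed(None) could play that role) so that no callback is ever added to None.

```python
class MiniDeferred:
    """Hypothetical synchronous stand-in for an already-fired Deferred."""

    def __init__(self, result=None):
        self.result = result

    def addCallback(self, callback):
        outcome = callback(self.result)
        # Loosely mimic chaining: adopt the result of a returned deferred.
        if isinstance(outcome, MiniDeferred):
            outcome = outcome.result
        self.result = outcome
        return self


def run_job_in_subprocess(job, held):
    """Hypothetical job starter: returns None when the lease is held."""
    if job in held:
        return None
    return MiniDeferred(job)


def run_all(jobs, held):
    """Chain every job, including the first, onto a seeded deferred."""
    started = []
    d = MiniDeferred()  # already fired, so a None return is harmless
    for job in jobs:
        def step(ignored, job=job):  # bind this iteration's job now
            result = run_job_in_subprocess(job, held)
            if result is not None:
                started.append(job)
            return result
        d.addCallback(step)
    return started


# Unguarded: the first job's lease is held, so there is nothing to chain on.
first = run_job_in_subprocess("a", held={"a"})
try:
    first.addCallback(lambda ignored: None)
    unguarded_failed = False
except AttributeError:
    unguarded_failed = True

# Guarded: the held job is skipped and the remaining jobs still run.
started = run_all(["a", "b", "c"], held={"a"})
```

The job=job default argument is a deliberate choice in this sketch: it captures each job at definition time, so every callback runs its own job rather than whatever the loop variable holds when the callback fires.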

Preview Diff

=== modified file 'lib/lp/services/job/runner.py'
--- lib/lp/services/job/runner.py 2011-08-19 14:12:28 +0000
+++ lib/lp/services/job/runner.py 2011-08-25 18:05:26 +0000
@@ -40,7 +40,6 @@
 from lazr.delegates import delegates
 import transaction
 from twisted.internet import (
-    defer,
     reactor,
     )
 from twisted.protocols import amp
@@ -61,10 +60,6 @@
 from lp.services.mail.sendmail import MailController
 from lp.services.scripts.base import LaunchpadCronScript
 from lp.services.twistedsupport import run_reactor
-from lp.services.twistedsupport.task import (
-    ParallelLimitedTaskConsumer,
-    PollingTaskSource,
-    )
 
 
 class BaseRunnableJobSource:
@@ -472,36 +467,21 @@
             oops = self._doOops(job, sys.exc_info())
             self._logOopsId(oops['id'])
 
-    def getTaskSource(self):
-        """Return a task source for all jobs in job_source."""
-
-        def producer():
-            while True:
-                # XXX: JonathanLange bug=741204: If we're getting all of the
-                # jobs at the start anyway, we can use a DeferredSemaphore,
-                # instead of the more complex PollingTaskSource, which is
-                # better suited to cases where we don't know how much work
-                # there will be.
-                jobs = list(self.job_source.iterReady())
-                if len(jobs) == 0:
-                    yield None
-                for job in jobs:
-                    yield lambda: self.runJobInSubprocess(job)
-        return PollingTaskSource(5, producer().next)
-
-    def doConsumer(self):
-        """Create a ParallelLimitedTaskConsumer for this job type."""
-        # 1 is hard-coded for now until we're sure we'd get gains by running
-        # more than one at a time. Note that several tests, including
-        # test_timeout, rely on this being 1.
-        consumer = ParallelLimitedTaskConsumer(1, logger=None)
-        return consumer.consume(self.getTaskSource())
-
     def runAll(self):
-        """Run all ready jobs, and any that become ready while running."""
+        """Run all ready jobs."""
         self.pool.start()
-        d = defer.maybeDeferred(self.doConsumer)
-        d.addCallbacks(self.terminated, self.failed)
+        try:
+            jobs = list(self.job_source.iterReady())
+            if len(jobs) == 0:
+                self.terminated()
+                return
+            d = self.runJobInSubprocess(jobs[0])
+            for job in jobs[1:]:
+                d.addCallback(lambda ignored: self.runJobInSubprocess(job))
+            d.addCallbacks(self.terminated, self.failed)
+        except:
+            self.terminated()
+            raise
 
     def terminated(self, ignored=None):
         """Callback to stop the processpool and reactor."""
 
=== modified file 'lib/lp/services/job/tests/test_runner.py'
--- lib/lp/services/job/tests/test_runner.py 2011-08-19 14:25:39 +0000
+++ lib/lp/services/job/tests/test_runner.py 2011-08-25 18:05:26 +0000
@@ -500,6 +500,13 @@
         self.x = '*' * (10 ** 6)
 
 
+class NoJobs(StaticJobSource):
+
+    done = False
+
+    jobs = []
+
+
 class TestTwistedJobRunner(ZopeTestInSubProcess, TestCaseWithFactory):
 
     layer = ZopelessDatabaseLayer
@@ -608,6 +615,14 @@
         oops = self.getOopsReport(runner, 0)
         self.assertEqual('MemoryError', oops.type)
 
+    def test_no_jobs(self):
+        logger = BufferLogger()
+        logger.setLevel(logging.INFO)
+        runner = TwistedJobRunner.runFromSource(
+            NoJobs, 'branchscanner', logger)
+        self.assertEqual(
+            (0, 0), (len(runner.completed_jobs), len(runner.incomplete_jobs)))
+
 
 class TestJobCronScript(ZopeTestInSubProcess, TestCaseWithFactory):
 