Merge lp:~abentley/launchpad/rollback-13801 into lp:launchpad

Proposed by Aaron Bentley
Status: Merged
Approved by: Aaron Bentley
Approved revision: no longer in the source branch.
Merged at revision: 13815
Proposed branch: lp:~abentley/launchpad/rollback-13801
Merge into: lp:launchpad
Diff against target: 104 lines (+33/-28)
2 files modified
lib/lp/services/job/runner.py (+33/-13)
lib/lp/services/job/tests/test_runner.py (+0/-15)
To merge this branch: bzr merge lp:~abentley/launchpad/rollback-13801
Reviewer | Review Type | Date Requested | Status
Aaron Bentley (community) | | | Approve
Review via email: mp+73275@code.launchpad.net

Commit message

Roll back r13801

Description of the change

= Summary =
Fix bug #836900: twisted job runner cannot handle LeaseHeld

== Proposed fix ==
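Roll back r13801. This restores the PollingTaskSource / ParallelLimitedTaskConsumer-based implementation of runAll in lib/lp/services/job/runner.py and removes the NoJobs job source and test_no_jobs test from test_runner.py.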

== Pre-implementation notes ==

== Implementation details ==

== Tests ==

== Demo and Q/A ==

= Launchpad lint =

Checking for conflicts and issues in changed files.

Linting changed files:
  lib/lp/services/job/tests/test_runner.py
  lib/lp/services/job/runner.py

To post a comment you must log in.
Revision history for this message
Aaron Bentley (abentley):
review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'lib/lp/services/job/runner.py'
2--- lib/lp/services/job/runner.py 2011-08-25 16:57:30 +0000
3+++ lib/lp/services/job/runner.py 2011-08-29 18:40:15 +0000
4@@ -40,6 +40,7 @@
5 from lazr.delegates import delegates
6 import transaction
7 from twisted.internet import (
8+ defer,
9 reactor,
10 )
11 from twisted.protocols import amp
12@@ -60,6 +61,10 @@
13 from lp.services.mail.sendmail import MailController
14 from lp.services.scripts.base import LaunchpadCronScript
15 from lp.services.twistedsupport import run_reactor
16+from lp.services.twistedsupport.task import (
17+ ParallelLimitedTaskConsumer,
18+ PollingTaskSource,
19+ )
20
21
22 class BaseRunnableJobSource:
23@@ -467,21 +472,36 @@
24 oops = self._doOops(job, sys.exc_info())
25 self._logOopsId(oops['id'])
26
27+ def getTaskSource(self):
28+ """Return a task source for all jobs in job_source."""
29+
30+ def producer():
31+ while True:
32+ # XXX: JonathanLange bug=741204: If we're getting all of the
33+ # jobs at the start anyway, we can use a DeferredSemaphore,
34+ # instead of the more complex PollingTaskSource, which is
35+ # better suited to cases where we don't know how much work
36+ # there will be.
37+ jobs = list(self.job_source.iterReady())
38+ if len(jobs) == 0:
39+ yield None
40+ for job in jobs:
41+ yield lambda: self.runJobInSubprocess(job)
42+ return PollingTaskSource(5, producer().next)
43+
44+ def doConsumer(self):
45+ """Create a ParallelLimitedTaskConsumer for this job type."""
46+ # 1 is hard-coded for now until we're sure we'd get gains by running
47+ # more than one at a time. Note that several tests, including
48+ # test_timeout, rely on this being 1.
49+ consumer = ParallelLimitedTaskConsumer(1, logger=None)
50+ return consumer.consume(self.getTaskSource())
51+
52 def runAll(self):
53- """Run all ready jobs."""
54+ """Run all ready jobs, and any that become ready while running."""
55 self.pool.start()
56- try:
57- jobs = list(self.job_source.iterReady())
58- if len(jobs) == 0:
59- self.terminated()
60- return
61- d = self.runJobInSubprocess(jobs[0])
62- for job in jobs[1:]:
63- d.addCallback(lambda ignored: self.runJobInSubprocess(job))
64- d.addCallbacks(self.terminated, self.failed)
65- except:
66- self.terminated()
67- raise
68+ d = defer.maybeDeferred(self.doConsumer)
69+ d.addCallbacks(self.terminated, self.failed)
70
71 def terminated(self, ignored=None):
72 """Callback to stop the processpool and reactor."""
73
74=== modified file 'lib/lp/services/job/tests/test_runner.py'
75--- lib/lp/services/job/tests/test_runner.py 2011-08-25 16:57:30 +0000
76+++ lib/lp/services/job/tests/test_runner.py 2011-08-29 18:40:15 +0000
77@@ -500,13 +500,6 @@
78 self.x = '*' * (10 ** 6)
79
80
81-class NoJobs(StaticJobSource):
82-
83- done = False
84-
85- jobs = []
86-
87-
88 class TestTwistedJobRunner(ZopeTestInSubProcess, TestCaseWithFactory):
89
90 layer = ZopelessDatabaseLayer
91@@ -615,14 +608,6 @@
92 oops = self.getOopsReport(runner, 0)
93 self.assertEqual('MemoryError', oops.type)
94
95- def test_no_jobs(self):
96- logger = BufferLogger()
97- logger.setLevel(logging.INFO)
98- runner = TwistedJobRunner.runFromSource(
99- NoJobs, 'branchscanner', logger)
100- self.assertEqual(
101- (0, 0), (len(runner.completed_jobs), len(runner.incomplete_jobs)))
102-
103
104 class TestJobCronScript(ZopeTestInSubProcess, TestCaseWithFactory):
105