Merge lp:~abentley/launchpad/simplify-twisted-runner-3 into lp:launchpad

Proposed by Aaron Bentley
Status: Merged
Merged at revision: 13943
Proposed branch: lp:~abentley/launchpad/simplify-twisted-runner-3
Merge into: lp:launchpad
Prerequisite: lp:~abentley/launchpad/simplify-twisted-runner-2
Diff against target: 83 lines (+23/-14)
1 file modified
lib/lp/services/job/runner.py (+23/-14)
To merge this branch: bzr merge lp:~abentley/launchpad/simplify-twisted-runner-3
Reviewer Review Type Date Requested Status
Brad Crittenden (community) code Approve
Review via email: mp+74868@code.launchpad.net

Commit message

Inject sys.path into subprocesses.

Description of the change

= Summary =
Fix issues invoking TwistedJobRunner

== Proposed fix ==
Inject the current sys.path into subprocesses as PYTHONPATH

== Pre-implementation notes ==
None

== Implementation details ==
This builds on previous work toward simplifying the TwistedJobRunner.

It fixes issues finding and launching the JobRunnerProcess class by injecting the current sys.path into subprocesses as PYTHONPATH.

It also simplifies the code further by using the inclineCallbacks decorator.

It also improves handling of process startup failures by handling the case when result is None.

It also outputs 'No jobs to run.' if no jobs were found, to clarify output.

It also raises 'Could not acquire lease...' to an INFO message, so that it's at the same level as 'Running ...' messages.

== Tests ==
bin/test test_runner

== Demo and Q/A ==
Create a branch and run cronscripts/scan_branches.py on qastaging. It should complete successfully.

= Launchpad lint =

Checking for conflicts and issues in changed files.

Linting changed files:
  lib/lp/services/job/tests/test_runner.py
  lib/lp/services/job/runner.py

To post a comment you must log in.
Brad Crittenden (bac) wrote :

Looks great Aaron.

review: Approve (code)

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'lib/lp/services/job/runner.py'
2--- lib/lp/services/job/runner.py 2011-09-09 20:05:32 +0000
3+++ lib/lp/services/job/runner.py 2011-09-09 20:05:36 +0000
4@@ -43,10 +43,14 @@
5 reactor,
6 )
7 from twisted.internet.defer import (
8+ inlineCallbacks,
9 succeed,
10 )
11 from twisted.protocols import amp
12-from twisted.python import log
13+from twisted.python import (
14+ failure,
15+ log,
16+ )
17 from zope.component import getUtility
18 from zope.security.proxy import removeSecurityProxy
19
20@@ -183,7 +187,7 @@
21 try:
22 job.acquireLease()
23 except LeaseHeld:
24- self.logger.debug(
25+ self.logger.info(
26 'Could not acquire lease for %s' % self.job_str(job))
27 self.incomplete_jobs.append(job)
28 return False
29@@ -406,11 +410,10 @@
30
31 def __init__(self, job_source, dbuser, logger=None, error_utility=None):
32 env = {'PATH': os.environ['PATH']}
33- for name in ('PYTHONPATH', 'LPCONFIG'):
34- if name in os.environ:
35- env[name] = os.environ[name]
36- starter = main.ProcessStarter(
37- packages=('_pythonpath', 'twisted', 'ampoule'), env=env)
38+ if 'LPCONFIG' in os.environ:
39+ env['LPCONFIG'] = os.environ['LPCONFIG']
40+ env['PYTHONPATH'] = os.pathsep.join(sys.path)
41+ starter = main.ProcessStarter(env=env)
42 super(TwistedJobRunner, self).__init__(logger, error_utility)
43 self.job_source = job_source
44 self.import_name = '%s.%s' % (
45@@ -441,6 +444,10 @@
46 RunJobCommand, job_id=job_id, _deadline=deadline)
47
48 def update(response):
49+ if response is None:
50+ self.incomplete_jobs.append(job)
51+ self.logger.debug('No response for %r', job)
52+ return
53 if response['success']:
54 self.completed_jobs.append(job)
55 self.logger.debug('Finished %r', job)
56@@ -475,18 +482,20 @@
57 oops = self._doOops(job, sys.exc_info())
58 self._logOopsId(oops['id'])
59
60+ @inlineCallbacks
61 def runAll(self):
62 """Run all ready jobs."""
63 self.pool.start()
64 try:
65- jobs = list(self.job_source.iterReady())
66- if len(jobs) == 0:
67+ try:
68+ job = None
69+ for job in self.job_source.iterReady():
70+ yield self.runJobInSubprocess(job)
71+ if job is None:
72+ self.logger.info('No jobs to run.')
73 self.terminated()
74- return
75- d = self.runJobInSubprocess(jobs[0])
76- for job in jobs[1:]:
77- d.addCallback(lambda ignored: self.runJobInSubprocess(job))
78- d.addCallbacks(self.terminated, self.failed)
79+ except:
80+ self.failed(failure.Failure())
81 except:
82 self.terminated()
83 raise