Merge lp:~abentley/launchpad/simplify-twisted-runner-3 into lp:launchpad

Proposed by Aaron Bentley
Status: Merged
Merged at revision: 13943
Proposed branch: lp:~abentley/launchpad/simplify-twisted-runner-3
Merge into: lp:launchpad
Prerequisite: lp:~abentley/launchpad/simplify-twisted-runner-2
Diff against target: 83 lines (+23/-14)
1 file modified
lib/lp/services/job/runner.py (+23/-14)
To merge this branch: bzr merge lp:~abentley/launchpad/simplify-twisted-runner-3
Reviewer Review Type Date Requested Status
Brad Crittenden (community) code Approve
Review via email: mp+74868@code.launchpad.net

Commit message

Inject sys.path into subprocesses.

Description of the change

= Summary =
Fix issues invoking TwistedJobRunner

== Proposed fix ==
Inject the current sys.path into subprocesses as PYTHONPATH

== Pre-implementation notes ==
None

== Implementation details ==
This builds on previous work toward simplifying the TwistedJobRunner.

It fixes issues finding and launching the JobRunnerProcess class by injecting the current sys.path into subprocesses as PYTHONPATH.

It also simplifies the code further by using the inclineCallbacks decorator.

It also improves handling of process startup failures by handling the case when result is None.

It also outputs 'No jobs to run.' if no jobs were found, to clarify output.

It also raises 'Could not acquire lease...' to an INFO message, so that it's at the same level as 'Running ...' messages.

== Tests ==
bin/test test_runner

== Demo and Q/A ==
Create a branch and run cronscripts/scan_branches.py on qastaging. It should complete successfully.

= Launchpad lint =

Checking for conflicts and issues in changed files.

Linting changed files:
  lib/lp/services/job/tests/test_runner.py
  lib/lp/services/job/runner.py

To post a comment you must log in.
Revision history for this message
Brad Crittenden (bac) wrote :

Looks great Aaron.

review: Approve (code)

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'lib/lp/services/job/runner.py'
--- lib/lp/services/job/runner.py 2011-09-09 20:05:32 +0000
+++ lib/lp/services/job/runner.py 2011-09-09 20:05:36 +0000
@@ -43,10 +43,14 @@
43 reactor,43 reactor,
44 )44 )
45from twisted.internet.defer import (45from twisted.internet.defer import (
46 inlineCallbacks,
46 succeed,47 succeed,
47 )48 )
48from twisted.protocols import amp49from twisted.protocols import amp
49from twisted.python import log50from twisted.python import (
51 failure,
52 log,
53 )
50from zope.component import getUtility54from zope.component import getUtility
51from zope.security.proxy import removeSecurityProxy55from zope.security.proxy import removeSecurityProxy
5256
@@ -183,7 +187,7 @@
183 try:187 try:
184 job.acquireLease()188 job.acquireLease()
185 except LeaseHeld:189 except LeaseHeld:
186 self.logger.debug(190 self.logger.info(
187 'Could not acquire lease for %s' % self.job_str(job))191 'Could not acquire lease for %s' % self.job_str(job))
188 self.incomplete_jobs.append(job)192 self.incomplete_jobs.append(job)
189 return False193 return False
@@ -406,11 +410,10 @@
406410
407 def __init__(self, job_source, dbuser, logger=None, error_utility=None):411 def __init__(self, job_source, dbuser, logger=None, error_utility=None):
408 env = {'PATH': os.environ['PATH']}412 env = {'PATH': os.environ['PATH']}
409 for name in ('PYTHONPATH', 'LPCONFIG'):413 if 'LPCONFIG' in os.environ:
410 if name in os.environ:414 env['LPCONFIG'] = os.environ['LPCONFIG']
411 env[name] = os.environ[name]415 env['PYTHONPATH'] = os.pathsep.join(sys.path)
412 starter = main.ProcessStarter(416 starter = main.ProcessStarter(env=env)
413 packages=('_pythonpath', 'twisted', 'ampoule'), env=env)
414 super(TwistedJobRunner, self).__init__(logger, error_utility)417 super(TwistedJobRunner, self).__init__(logger, error_utility)
415 self.job_source = job_source418 self.job_source = job_source
416 self.import_name = '%s.%s' % (419 self.import_name = '%s.%s' % (
@@ -441,6 +444,10 @@
441 RunJobCommand, job_id=job_id, _deadline=deadline)444 RunJobCommand, job_id=job_id, _deadline=deadline)
442445
443 def update(response):446 def update(response):
447 if response is None:
448 self.incomplete_jobs.append(job)
449 self.logger.debug('No response for %r', job)
450 return
444 if response['success']:451 if response['success']:
445 self.completed_jobs.append(job)452 self.completed_jobs.append(job)
446 self.logger.debug('Finished %r', job)453 self.logger.debug('Finished %r', job)
@@ -475,18 +482,20 @@
475 oops = self._doOops(job, sys.exc_info())482 oops = self._doOops(job, sys.exc_info())
476 self._logOopsId(oops['id'])483 self._logOopsId(oops['id'])
477484
485 @inlineCallbacks
478 def runAll(self):486 def runAll(self):
479 """Run all ready jobs."""487 """Run all ready jobs."""
480 self.pool.start()488 self.pool.start()
481 try:489 try:
482 jobs = list(self.job_source.iterReady())490 try:
483 if len(jobs) == 0:491 job = None
492 for job in self.job_source.iterReady():
493 yield self.runJobInSubprocess(job)
494 if job is None:
495 self.logger.info('No jobs to run.')
484 self.terminated()496 self.terminated()
485 return497 except:
486 d = self.runJobInSubprocess(jobs[0])498 self.failed(failure.Failure())
487 for job in jobs[1:]:
488 d.addCallback(lambda ignored: self.runJobInSubprocess(job))
489 d.addCallbacks(self.terminated, self.failed)
490 except:499 except:
491 self.terminated()500 self.terminated()
492 raise501 raise