Merge lp:~abentley/launchpad/ampoule-0.1.1 into lp:launchpad

Proposed by Aaron Bentley
Status: Merged
Approved by: Gavin Panella
Approved revision: not available
Merged at revision: not available
Proposed branch: lp:~abentley/launchpad/ampoule-0.1.1
Merge into: lp:launchpad
Diff against target: 239 lines (+36/-69)
5 files modified
lib/lp/bugs/windmill/tests/test_bug_inline_subscriber.py (+2/-1)
lib/lp/code/model/branchmergeproposaljob.py (+1/-9)
lib/lp/services/job/runner.py (+30/-45)
lib/lp/services/job/tests/test_runner.py (+1/-9)
versions.cfg (+2/-5)
To merge this branch: bzr merge lp:~abentley/launchpad/ampoule-0.1.1
Reviewer                   Review Type  Date Requested  Status
Gavin Panella (community)                               Approve
Review via email: mp+18481@code.launchpad.net
Aaron Bentley (abentley) wrote :

= Summary =
Update to Ampoule 0.2.0.

== Proposed fix ==
This branch updates ampoule and also adds a version for pyOpenSSL, which is a
new Ampoule dependency. This new version of ampoule has many changes that I
suggested, to push as much code upstream as I thought was architecturally
sound.

== Pre-implementation notes ==
None

== Implementation details ==
Instead of having a separate AMPChild subclass for every JobSource, a new
ampoule feature allows us to parameterize the JobRunnerProcess. We supply a
full path to the JobSource. In JobRunnerProcess, we import this and use it. I
used 'exec' for this, because the import module looked crazy complicated.
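
To illustrate loading a job source from its full dotted path, here is a minimal sketch that avoids exec. The helper name import_job_source is hypothetical, and a standard-library class stands in for a real JobSource.

```python
def import_job_source(dotted_name):
    """Return the object named by a full dotted path such as 'pkg.mod.Name'.

    Hypothetical helper for illustration; not part of Launchpad or ampoule.
    """
    # Split 'pkg.mod.Name' into the module path and the attribute name.
    module_name, attr_name = dotted_name.rsplit('.', 1)
    # A non-empty fromlist makes __import__ return the leaf module
    # ('pkg.mod') rather than the top-level package ('pkg').
    module = __import__(module_name, fromlist=[attr_name])
    return getattr(module, attr_name)


# A standard-library class stands in for a real JobSource here.
ordered_dict = import_job_source('collections.OrderedDict')
```

Passing only a string keeps the argument easy to hand to a child process, which then resolves it to the real object on its own side.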

Instead of supplying our own BOOTSTRAP code in order to provide a bit more
startup code, a new ampoule feature lets us simply implement the context
manager protocol in JobRunnerProcess.
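
As a rough sketch of the idea, the class below implements the context manager protocol with static methods, and a launcher calls those hooks around a main loop. ExampleChild and run_child are hypothetical names. How ampoule itself invokes the hooks is not shown in this proposal, so calling them on the class is an assumption.

```python
import sys


class ExampleChild(object):
    """Hypothetical stand-in for an AMPChild subclass."""

    events = []

    @staticmethod
    def __enter__():
        # Process-wide setup (signal handlers, configuration) would go here.
        ExampleChild.events.append('enter')

    @staticmethod
    def __exit__(exc_type, exc_val, exc_tb):
        # Returning a false value lets any exception propagate.
        ExampleChild.events.append('exit')
        return False


def run_child(child_class, main_loop):
    """Hypothetical launcher: wrap main_loop in the class-level hooks."""
    child_class.__enter__()
    try:
        main_loop()
    except BaseException:
        # Give __exit__ the chance to swallow the exception.
        if not child_class.__exit__(*sys.exc_info()):
            raise
    else:
        child_class.__exit__(None, None, None)


run_child(ExampleChild, lambda: ExampleChild.events.append('run'))
```

The startup code lives next to the class that needs it, so no separate bootstrap string has to be kept in step with it.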

Instead of subclassing pool.ProcessPool to use SIGHUP, ampoule now lets us
parameterize the timeout_signal.
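
The reason for choosing SIGHUP is that a child can catch it and turn it into an exception. The sketch below shows that on the child side only (POSIX only). JobTimeout, run_with_hup, and stuck_job are hypothetical names, and the process signals itself in place of a real pool.

```python
import os
import signal
import time


class JobTimeout(Exception):
    """Hypothetical stand-in for the runner's TimeoutError."""


def raise_timeout(signum, frame):
    # Runs in the main thread when the signal arrives.
    raise JobTimeout("Job ran too long.")


def run_with_hup(work):
    """Run work(); return True if SIGHUP interrupted it."""
    previous = signal.signal(signal.SIGHUP, raise_timeout)
    try:
        work()
    except JobTimeout:
        return True
    finally:
        # Restore whatever handler was installed before.
        signal.signal(signal.SIGHUP, previous)
    return False


def stuck_job():
    # Simulate the parent pool signalling this process, then keep "working".
    os.kill(os.getpid(), signal.SIGHUP)
    time.sleep(5)


interrupted = run_with_hup(stuck_job)
```

Because the timeout surfaces as an ordinary exception, the job's normal error handling gets a chance to run before the process goes away.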

Instead of converting our leases, which are datetimes, into timeout durations,
a new ampoule feature lets us supply them as deadlines.
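
For illustration, this is roughly what turning a lease into a deadline looks like. The function names are hypothetical, and the sketch assumes the lease is a naive datetime in UTC.

```python
from calendar import timegm
from datetime import datetime, timedelta


def lease_to_deadline(lease_expires):
    """Convert a naive UTC datetime into seconds since the Unix epoch."""
    # timegm() interprets the time tuple as UTC, unlike time.mktime(),
    # which would apply the local timezone. Microseconds are dropped.
    return timegm(lease_expires.timetuple())


def seconds_remaining(lease_expires, now):
    """Old-style duration, for comparison; both arguments are naive UTC."""
    return lease_to_deadline(lease_expires) - lease_to_deadline(now)


lease = datetime(2010, 2, 3, 22, 30, 0)
deadline = lease_to_deadline(lease)
```

A deadline is an absolute time, so it does not drift if the work is queued for a while before it starts, whereas a duration computed up front would.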

job_class was renamed to job_source where this was a better description of its
usage.

== Tests ==
bin/test -vt test_runner -t update_preview_diffs

== Demo and Q/A ==
No visible change.

= Launchpad lint =

Checking for conflicts. and issues in doctests and templates.
Running jslint, xmllint, pyflakes, and pylint.
Using normal rules.

Linting changed files:
  lib/lp/services/job/tests/test_runner.py
  lib/lp/services/job/runner.py
  versions.cfg
  lib/lp/code/model/branchmergeproposaljob.py

== Pyflakes notices ==

lib/lp/services/job/runner.py
    240: undefined name 'job_source'

^^^ This is because of the exec.

== Pylint notices ==

lib/lp/services/job/runner.py
    35: [F0401] Unable to import 'lazr.delegates' (No module named delegates)
    239: [W0122, JobRunnerProcess.__init__] Use of the exec statement
    240: [E0602, JobRunnerProcess.__init__] Undefined variable 'job_source'

^^^ This is because of the exec.

lib/lp/code/model/branchmergeproposaljob.py
    22: [F0401] Unable to import 'lazr.delegates' (No module named delegates)
    23: [F0401] Unable to import 'lazr.enum' (No module named enum)

Aaron Bentley (abentley) wrote :

After submitting this, I remembered about __import__, so I've replaced exec with __import__.

Gavin Panella (allenap) wrote :

Hi Aaron,

I can't pretend to fully understand all of this, but the changes look
good. I have one tiny comment.

Gavin.

> === modified file 'lib/lp/services/job/runner.py'
> --- lib/lp/services/job/runner.py 2010-01-16 10:20:46 +0000
> +++ lib/lp/services/job/runner.py 2010-02-03 16:02:29 +0000
> @@ -15,6 +15,7 @@
>      ]
>
>
> +from calendar import timegm
>  import contextlib
>  import logging
>  import os
> @@ -22,9 +23,8 @@
>  import sys
>
>  from ampoule import child, pool, main
> -from twisted.internet import defer, error, reactor, stdio
> +from twisted.internet import defer, reactor
>  from twisted.protocols import amp
> -from twisted.python import log, reflect
>
>  from zope.component import getUtility
>  from zope.security.proxy import removeSecurityProxy
> @@ -231,9 +231,25 @@
>  class JobRunnerProcess(child.AMPChild):
>      """Base class for processes that run jobs."""
>
> -    def __init__(self):
> +    def __init__(self, job_source_name):
>          child.AMPChild.__init__(self)
> -        self.context_manager = self.job_class.contextManager()
> +        segments = job_source_name.split('.')
> +        module = '.'.join(segments[:-1])
> +        name = segments[-1]

Could you simplify this to:

  module, name = job_source_name.rsplit('.', 1)

?

Ah, I guess it wouldn't work if there are no "."s in job_source_name.

review: Approve
Aaron Bentley (abentley) wrote :

Gavin Panella wrote:
>> -    def __init__(self):
>> +    def __init__(self, job_source_name):
>>          child.AMPChild.__init__(self)
>> -        self.context_manager = self.job_class.contextManager()
>> +        segments = job_source_name.split('.')
>> +        module = '.'.join(segments[:-1])
>> +        name = segments[-1]
>
> Could you simplify this to:
>
> module, name = job_source_name.rsplit('.', 1)

Done. Thanks for the tip.

> Ah, I guess it wouldn't work if there are no "."s in job_source_name.

If there are no dots, the import will fail anyhow. I can't imagine any
time when it would be sensible to supply a job_source_name with no dots
in it, though.

Aaron
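
To make the no-dots case concrete, the sketch below compares the two ways of splitting discussed above. Both function names are hypothetical. The rsplit form fails immediately with ValueError, while the split/join form returns an empty module name and leaves the failure to the import step.

```python
def split_source_name(job_source_name):
    """Split 'pkg.mod.Name' into ('pkg.mod', 'Name')."""
    # With no '.', rsplit returns a one-element list, so the
    # two-name unpacking raises ValueError.
    module, name = job_source_name.rsplit('.', 1)
    return module, name


def old_split(job_source_name):
    """The earlier split/join form, which never raises by itself."""
    segments = job_source_name.split('.')
    return '.'.join(segments[:-1]), segments[-1]
```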

Preview Diff

=== modified file 'lib/lp/bugs/windmill/tests/test_bug_inline_subscriber.py'
--- lib/lp/bugs/windmill/tests/test_bug_inline_subscriber.py 2010-02-01 18:37:00 +0000
+++ lib/lp/bugs/windmill/tests/test_bug_inline_subscriber.py 2010-02-03 22:24:19 +0000
@@ -18,7 +18,8 @@
     layer = BugsWindmillLayer
     suite_name = 'Inline bug page subscribers test'
 
-    def test_inline_subscriber(self):
+    def DISABLED_test_inline_subscriber(self):
+        # This test fails intermittently. See bug #516781.
         """Test inline subscribing on bugs pages.
 
         This test makes sure that subscribing and unsubscribing
=== modified file 'lib/lp/code/model/branchmergeproposaljob.py'
--- lib/lp/code/model/branchmergeproposaljob.py 2010-01-11 20:07:17 +0000
+++ lib/lp/code/model/branchmergeproposaljob.py 2010-02-03 22:24:19 +0000
@@ -51,7 +51,7 @@
 from lp.codehosting.vfs import get_multi_server, get_scanner_server
 from lp.services.job.model.job import Job
 from lp.services.job.interfaces.job import IRunnableJob
-from lp.services.job.runner import BaseRunnableJob, JobRunnerProcess
+from lp.services.job.runner import BaseRunnableJob
 
 
 class BranchMergeProposalJobType(DBEnumeratedType):
@@ -283,14 +283,6 @@
         self.branch_merge_proposal.preview_diff = preview
 
 
-class UpdatePreviewDiffProcess(JobRunnerProcess):
-    """A process that runs UpdatePreviewDiffJobs"""
-    job_class = UpdatePreviewDiffJob
-
-
-UpdatePreviewDiffJob.amp = UpdatePreviewDiffProcess
-
-
 class CreateMergeProposalJob(BaseRunnableJob):
     """See `ICreateMergeProposalJob` and `ICreateMergeProposalJobSource`."""
 
=== modified file 'lib/lp/services/job/runner.py'
--- lib/lp/services/job/runner.py 2010-02-03 11:17:12 +0000
+++ lib/lp/services/job/runner.py 2010-02-03 22:24:18 +0000
@@ -15,6 +15,7 @@
     ]
 
 
+from calendar import timegm
 import contextlib
 import logging
 import os
@@ -22,9 +23,8 @@
 import sys
 
 from ampoule import child, pool, main
-from twisted.internet import defer, error, reactor, stdio
+from twisted.internet import defer, reactor
 from twisted.protocols import amp
-from twisted.python import log, reflect
 
 from zope.component import getUtility
 from zope.security.proxy import removeSecurityProxy
@@ -242,9 +242,23 @@
 class JobRunnerProcess(child.AMPChild):
     """Base class for processes that run jobs."""
 
-    def __init__(self):
+    def __init__(self, job_source_name):
         child.AMPChild.__init__(self)
-        self.context_manager = self.job_class.contextManager()
+        module, name = job_source_name.rsplit('.', 1)
+        source_module = __import__(module, fromlist=[name])
+        self.job_source = getattr(source_module, name)
+        self.context_manager = self.job_source.contextManager()
+
+    @staticmethod
+    def __enter__():
+        def handler(signum, frame):
+            raise TimeoutError
+        scripts.execute_zcml_for_scripts(use_web_security=False)
+        signal(SIGHUP, handler)
+
+    @staticmethod
+    def __exit__(exc_type, exc_val, exc_tb):
+        pass
 
     def makeConnection(self, transport):
         """The Job context is entered on connect."""
@@ -258,9 +272,9 @@
 
     @RunJobCommand.responder
     def runJobCommand(self, job_id):
-        """Run a job of this job_class according to its job id."""
+        """Run a job from this job_source according to its job id."""
         runner = BaseJobRunner()
-        job = self.job_class.get(job_id)
+        job = self.job_source.get(job_id)
         oops = runner.runJobHandleError(job)
         if oops is None:
             oops_id = ''
@@ -269,28 +283,22 @@
         return {'success': len(runner.completed_jobs), 'oops_id': oops_id}
 
 
-class HUPProcessPool(pool.ProcessPool):
-    """A ProcessPool that kills with HUP."""
-
-    def _handleTimeout(self, child):
-        try:
-            child.transport.signalProcess(SIGHUP)
-        except error.ProcessExitedAlready:
-            pass
-
-
 class TwistedJobRunner(BaseJobRunner):
     """Run Jobs via twisted."""
 
-    def __init__(self, job_source, job_amp, logger=None, error_utility=None):
+    def __init__(self, job_source, logger=None, error_utility=None):
         starter = main.ProcessStarter(
-            bootstrap=BOOTSTRAP, packages=('twisted', 'ampoule'),
+            packages=('twisted', 'ampoule'),
             env={'PYTHONPATH': os.environ['PYTHONPATH'],
                  'PATH': os.environ['PATH'],
                  'LPCONFIG': os.environ['LPCONFIG']})
         super(TwistedJobRunner, self).__init__(logger, error_utility)
         self.job_source = job_source
-        self.pool = HUPProcessPool(job_amp, starter=starter, min=0)
+        import_name = '%s.%s' % (
+            removeSecurityProxy(job_source).__module__, job_source.__name__)
+        self.pool = pool.ProcessPool(
+            JobRunnerProcess, ampChildArgs=[import_name], starter=starter,
+            min=0, timeout_signal=SIGHUP)
 
     def runJobInSubprocess(self, job):
         """Run the job_class with the specified id in the process pool.
@@ -304,12 +312,9 @@
             self.incomplete_jobs.append(job)
             return
         job_id = job.id
-        timeout = job.getTimeout()
-        # work around ampoule bug
-        if timeout == 0:
-            timeout = 0.0000000000001
+        deadline = timegm(job.lease_expires.timetuple())
         deferred = self.pool.doWork(
-            RunJobCommand, job_id = job_id, _timeout=timeout)
+            RunJobCommand, job_id = job_id, _deadline=deadline)
         def update(response):
             if response['success']:
                 self.completed_jobs.append(job)
@@ -362,8 +367,7 @@
     def runFromSource(cls, job_source, logger, error_utility=None):
         """Run all ready jobs provided by the specified source."""
         logger.info("Running through Twisted.")
-        runner = cls(job_source, removeSecurityProxy(job_source).amp, logger,
-            error_utility)
+        runner = cls(job_source, logger, error_utility)
         reactor.callWhenRunning(runner.runAll)
         handler = getsignal(SIGCHLD)
         try:
@@ -396,22 +400,3 @@
 
     def __init__(self):
         Exception.__init__(self, "Job ran too long.")
-
-
-BOOTSTRAP = """\
-import sys
-from twisted.application import reactors
-reactors.installReactor(sys.argv[-2])
-from lp.services.job.runner import bootstrap
-bootstrap(sys.argv[-1])
-"""
-
-def bootstrap(ampChildPath):
-    def handler(signum, frame):
-        raise TimeoutError
-    signal(SIGHUP, handler)
-    log.startLogging(sys.stderr)
-    ampChild = reflect.namedAny(ampChildPath)
-    stdio.StandardIO(ampChild(), 3, 4)
-    scripts.execute_zcml_for_scripts(use_web_security=False)
-    reactor.run()
=== modified file 'lib/lp/services/job/tests/test_runner.py'
--- lib/lp/services/job/tests/test_runner.py 2010-01-16 10:20:46 +0000
+++ lib/lp/services/job/tests/test_runner.py 2010-02-03 22:24:18 +0000
@@ -18,7 +18,7 @@
 
 from lp.testing.mail_helpers import pop_notifications
 from lp.services.job.runner import (
-    JobRunner, BaseRunnableJob, JobRunnerProcess, TwistedJobRunner
+    JobRunner, BaseRunnableJob, TwistedJobRunner
 )
 from lp.services.job.interfaces.job import JobStatus, IRunnableJob
 from lp.services.job.model.job import Job
@@ -283,14 +283,6 @@
         sleep(30)
 
 
-class StuckJobProcess(JobRunnerProcess):
-
-    job_class = StuckJob
-
-
-StuckJob.amp = StuckJobProcess
-
-
 class ListLogger:
 
     def __init__(self):
=== modified file 'versions.cfg'
--- versions.cfg 2010-02-03 10:57:21 +0000
+++ versions.cfg 2010-02-03 22:24:18 +0000
@@ -4,11 +4,7 @@
 [versions]
 # Alphabetical, case-insensitive, please! :-)
 
-# from -r 3:lp:~launchpad/ampoule/launchpad-tweaked
-# To reproduce:
-# bzr export ampoule-0.1.0-lp-1.tar.gz lp:~launchpad/ampoule/launchpad-tweaked\
-# -r 3
-ampoule = 0.1.0-lp-1
+ampoule = 0.2.0
 # Non-released bzr version from bzr+ssh://bazaar.launchpad.net/~mwhudson/bzr/2.1.0b4-lp2
 bzr = 2.1b4-lp2
 chameleon.core = 1.0b35
@@ -50,6 +46,7 @@
 PasteDeploy = 1.3.3
 pyasn1 = 0.0.9a
 pycrypto = 2.0.1
+pyOpenSSL = 0.10
 python-memcached = 1.45
 python-openid = 2.2.1
 pytz = 2009l