Merge lp:~jml/launchpad/test-timeout-505913 into lp:launchpad

Proposed by Jonathan Lange
Status: Merged
Approved by: Aaron Bentley
Approved revision: no longer in the source branch.
Merged at revision: 12660
Proposed branch: lp:~jml/launchpad/test-timeout-505913
Merge into: lp:launchpad
Diff against target: 373 lines (+143/-62)
4 files modified
lib/lp/services/job/runner.py (+45/-11)
lib/lp/services/job/tests/test_job.py (+4/-9)
lib/lp/services/job/tests/test_runner.py (+93/-40)
lib/lp/testing/__init__.py (+1/-2)
To merge this branch: bzr merge lp:~jml/launchpad/test-timeout-505913
Reviewer Review Type Date Requested Status
Aaron Bentley (community) Approve
j.c.sackett (community) Approve
Review via email: mp+54598@code.launchpad.net

Commit message

[r=abentley,jcsackett][bug=505913] Fix the intermittent test failure in TestTwistedJobRunner.test_timeout by handling the case where the timeout occurs during framework code.

Description of the change

This branch, I believe, fixes one of the intermittently failing tests. The linked bug has most of the relevant details. Essentially, the timeout was happening at random times. When it happened while we were actually running the job, everything was good. When it happened during some of the surrounding code (e.g. while the child process was unmarshalling input to turn it into a potential request to do a job), then it was ignored.

This is because the timeout mechanism was to raise an exception. In Twisted, when a SIGHUP signal handler raises an exception, it just raises in the code that's currently running. If we're in the reactor loop at the time, then it is logged as an unhandled error.

Instead of raising an exception, this branch just exits the worker process with a particular exit code (TwistedJobRunner.TIMEOUT_CODE). When the runner sees a process exit with that code, we ignore the ProcessDone "failure" and log a timeout OOPS instead.

The testing strategy for this was to duplicate the test_timeout test that was failing intermittently. One of them would have the timeout bumped up a little (from 1 to 5) to make it more likely to exercise the case where the timeout happens during an actual job and not in surrounding machinery. The other has a much, much shorter timeout (0.05), which is guaranteed to hit the machinery.

In the branch, you'll see lots of other comments and cleanups. It's been a meandering experience, and I've tried to leave what I've learned marked on the walls of the labyrinth.

To post a comment you must log in.
Revision history for this message
j.c.sackett (jcsackett) wrote :

This looks good. I'm curious about one piece of code though:

+ def _logTimeout(self, job):
+ try:
+ raise TimeoutError
+ except TimeoutError:
+ oops = self._doOops(job, sys.exc_info())
+ self._logOopsId(oops.id)

Given all that's happening is an exception being raised in the try block, which immediately goes to the exception, couldn't this all be replaced with the following?

+ def _logTimeout(self, job):
+ oops = self._doOops(job, sys.exc_info())
+ self._logOopsId(oops.id)

Or is there some sort of async thing here I'm missing?

The code as written certainly works, so this isn't a blocker to approval; I'm just assuming I'm missing something, and want to know what.

review: Approve
Revision history for this message
Jonathan Lange (jml) wrote :

On Thu, Mar 24, 2011 at 3:44 PM, j.c.sackett
<email address hidden> wrote:
> Review: Approve
> This looks good. I'm curious about one piece of code though:
>
> +    def _logTimeout(self, job):
> +        try:
> +            raise TimeoutError
> +        except TimeoutError:
> +            oops = self._doOops(job, sys.exc_info())
> +            self._logOopsId(oops.id)
>
> Given all that's happening is an exception being raised in the try block, which immediately goes to the exception, couldn't this all be replaced with the following?
>
> +    def _logTimeout(self, job):
> +        oops = self._doOops(job, sys.exc_info())
> +        self._logOopsId(oops.id)
>
> Or is there some sort of async thing here I'm missing?
>

It's an error conversion thing, not an async thing. We're getting a
ProcessDone error, but we want to log a TimeoutError. That's all
that's going on. I'll add some comments to make this clearer.

Thanks!
jml

Revision history for this message
Aaron Bentley (abentley) :
review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'lib/lp/services/job/runner.py'
--- lib/lp/services/job/runner.py 2011-02-09 17:43:54 +0000
+++ lib/lp/services/job/runner.py 2011-03-24 18:38:33 +0000
@@ -1,12 +1,10 @@
1# Copyright 2009 Canonical Ltd. This software is licensed under the1# Copyright 2009-2011 Canonical Ltd. This software is licensed under the
2# GNU Affero General Public License version 3 (see the file LICENSE).2# GNU Affero General Public License version 3 (see the file LICENSE).
33
4"""Facilities for running Jobs."""4"""Facilities for running Jobs."""
55
6
7__metaclass__ = type6__metaclass__ = type
87
9
10__all__ = [8__all__ = [
11 'BaseRunnableJob',9 'BaseRunnableJob',
12 'JobCronScript',10 'JobCronScript',
@@ -303,8 +301,23 @@
303301
304 @classmethod302 @classmethod
305 def __enter__(cls):303 def __enter__(cls):
306
307 def handler(signum, frame):304 def handler(signum, frame):
305 # We raise an exception **and** schedule a call to exit the
306 # process hard. This is because we cannot rely on the exception
307 # being raised during useful code. Sometimes, it will be raised
308 # while the reactor is looping, which means that it will be
309 # ignored.
310 #
311 # If the exception is raised during the actual job, then we'll get
312 # a nice traceback indicating what timed out, and that will be
313 # logged as an OOPS.
314 #
315 # Regardless of where the exception is raised, we'll hard exit the
316 # process and have a TimeoutError OOPS logged, although that will
317 # have a crappy traceback. See the job_raised callback in
318 # TwistedJobRunner.runJobInSubprocess for the other half of that.
319 reactor.callFromThread(
320 reactor.callLater, 0, os._exit, TwistedJobRunner.TIMEOUT_CODE)
308 raise TimeoutError321 raise TimeoutError
309 scripts.execute_zcml_for_scripts(use_web_security=False)322 scripts.execute_zcml_for_scripts(use_web_security=False)
310 signal(SIGHUP, handler)323 signal(SIGHUP, handler)
@@ -340,6 +353,8 @@
340class TwistedJobRunner(BaseJobRunner):353class TwistedJobRunner(BaseJobRunner):
341 """Run Jobs via twisted."""354 """Run Jobs via twisted."""
342355
356 TIMEOUT_CODE = 42
357
343 def __init__(self, job_source, dbuser, logger=None, error_utility=None):358 def __init__(self, job_source, dbuser, logger=None, error_utility=None):
344 env = {'PATH': os.environ['PATH']}359 env = {'PATH': os.environ['PATH']}
345 for name in ('PYTHONPATH', 'LPCONFIG'):360 for name in ('PYTHONPATH', 'LPCONFIG'):
@@ -373,7 +388,7 @@
373 self.logger.debug(388 self.logger.debug(
374 'Running %r, lease expires %s', job, job.lease_expires)389 'Running %r, lease expires %s', job, job.lease_expires)
375 deferred = self.pool.doWork(390 deferred = self.pool.doWork(
376 RunJobCommand, job_id = job_id, _deadline=deadline)391 RunJobCommand, job_id=job_id, _deadline=deadline)
377392
378 def update(response):393 def update(response):
379 if response['success']:394 if response['success']:
@@ -387,17 +402,36 @@
387402
388 def job_raised(failure):403 def job_raised(failure):
389 self.incomplete_jobs.append(job)404 self.incomplete_jobs.append(job)
390 info = (failure.type, failure.value, failure.tb)405 exit_code = getattr(failure.value, 'exitCode', None)
391 oops = self._doOops(job, info)406 if exit_code == self.TIMEOUT_CODE:
392 self._logOopsId(oops.id)407 # The process ended with the error code that we have
408 # arbitrarily chosen to indicate a timeout. Rather than log
409 # that error (ProcessDone), we log a TimeoutError instead.
410 self._logTimeout(job)
411 else:
412 info = (failure.type, failure.value, failure.tb)
413 oops = self._doOops(job, info)
414 self._logOopsId(oops.id)
393 deferred.addCallbacks(update, job_raised)415 deferred.addCallbacks(update, job_raised)
394 return deferred416 return deferred
395417
418 def _logTimeout(self, job):
419 try:
420 raise TimeoutError
421 except TimeoutError:
422 oops = self._doOops(job, sys.exc_info())
423 self._logOopsId(oops.id)
424
396 def getTaskSource(self):425 def getTaskSource(self):
397 """Return a task source for all jobs in job_source."""426 """Return a task source for all jobs in job_source."""
398427
399 def producer():428 def producer():
400 while True:429 while True:
430 # XXX: JonathanLange bug=741204: If we're getting all of the
431 # jobs at the start anyway, we can use a DeferredSemaphore,
432 # instead of the more complex PollingTaskSource, which is
433 # better suited to cases where we don't know how much work
434 # there will be.
401 jobs = list(self.job_source.iterReady())435 jobs = list(self.job_source.iterReady())
402 if len(jobs) == 0:436 if len(jobs) == 0:
403 yield None437 yield None
@@ -407,9 +441,9 @@
407441
408 def doConsumer(self):442 def doConsumer(self):
409 """Create a ParallelLimitedTaskConsumer for this job type."""443 """Create a ParallelLimitedTaskConsumer for this job type."""
410 logger = logging.getLogger('gloop')444 # 1 is hard-coded for now until we're sure we'd get gains by running
411 logger.addHandler(logging.StreamHandler(sys.stdout))445 # more than one at a time. Note that test_timeout relies on this
412 logger.setLevel(logging.DEBUG)446 # being 1.
413 consumer = ParallelLimitedTaskConsumer(1, logger=None)447 consumer = ParallelLimitedTaskConsumer(1, logger=None)
414 return consumer.consume(self.getTaskSource())448 return consumer.consume(self.getTaskSource())
415449
416450
=== modified file 'lib/lp/services/job/tests/test_job.py'
--- lib/lp/services/job/tests/test_job.py 2010-10-04 19:50:45 +0000
+++ lib/lp/services/job/tests/test_job.py 2011-03-24 18:38:33 +0000
@@ -1,11 +1,10 @@
1# Copyright 2009 Canonical Ltd. This software is licensed under the1# Copyright 2009-2011 Canonical Ltd. This software is licensed under the
2# GNU Affero General Public License version 3 (see the file LICENSE).2# GNU Affero General Public License version 3 (see the file LICENSE).
33
4__metaclass__ = type4__metaclass__ = type
55
6from datetime import datetime6from datetime import datetime
7import time7import time
8from unittest import TestLoader
98
10import pytz9import pytz
11from storm.locals import Store10from storm.locals import Store
@@ -18,7 +17,7 @@
18 MAIN_STORE,17 MAIN_STORE,
19 )18 )
20from canonical.launchpad.webapp.testing import verifyObject19from canonical.launchpad.webapp.testing import verifyObject
21from canonical.testing.layers import LaunchpadZopelessLayer20from canonical.testing.layers import ZopelessDatabaseLayer
22from lp.services.job.interfaces.job import (21from lp.services.job.interfaces.job import (
23 IJob,22 IJob,
24 JobStatus,23 JobStatus,
@@ -34,7 +33,7 @@
34class TestJob(TestCase):33class TestJob(TestCase):
35 """Ensure Job behaves as intended."""34 """Ensure Job behaves as intended."""
3635
37 layer = LaunchpadZopelessLayer36 layer = ZopelessDatabaseLayer
3837
39 def test_implements_IJob(self):38 def test_implements_IJob(self):
40 """Job should implement IJob."""39 """Job should implement IJob."""
@@ -211,7 +210,7 @@
211class TestReadiness(TestCase):210class TestReadiness(TestCase):
212 """Test the implementation of readiness."""211 """Test the implementation of readiness."""
213212
214 layer = LaunchpadZopelessLayer213 layer = ZopelessDatabaseLayer
215214
216 def _sampleData(self):215 def _sampleData(self):
217 store = getUtility(IStoreSelector).get(MAIN_STORE, DEFAULT_FLAVOR)216 store = getUtility(IStoreSelector).get(MAIN_STORE, DEFAULT_FLAVOR)
@@ -296,7 +295,3 @@
296 job = Job()295 job = Job()
297 job.acquireLease(-300)296 job.acquireLease(-300)
298 self.assertEqual(0, job.getTimeout())297 self.assertEqual(0, job.getTimeout())
299
300
301def test_suite():
302 return TestLoader().loadTestsFromName(__name__)
303298
=== modified file 'lib/lp/services/job/tests/test_runner.py'
--- lib/lp/services/job/tests/test_runner.py 2010-12-23 00:38:29 +0000
+++ lib/lp/services/job/tests/test_runner.py 2011-03-24 18:38:33 +0000
@@ -7,19 +7,16 @@
7import sys7import sys
8from textwrap import dedent8from textwrap import dedent
9from time import sleep9from time import sleep
10from unittest import TestLoader
1110
12import transaction11import transaction
13from zope.component import getUtility
14from zope.interface import implements12from zope.interface import implements
1513
14from canonical.config import config
16from canonical.launchpad.webapp import errorlog15from canonical.launchpad.webapp import errorlog
17from canonical.launchpad.webapp.interfaces import (16from canonical.testing.layers import (
18 DEFAULT_FLAVOR,17 LaunchpadZopelessLayer,
19 IStoreSelector,18 ZopelessDatabaseLayer,
20 MAIN_STORE,
21 )19 )
22from canonical.testing.layers import LaunchpadZopelessLayer
23from lp.code.interfaces.branchmergeproposal import IUpdatePreviewDiffJobSource20from lp.code.interfaces.branchmergeproposal import IUpdatePreviewDiffJobSource
24from lp.services.job.interfaces.job import (21from lp.services.job.interfaces.job import (
25 IRunnableJob,22 IRunnableJob,
@@ -329,42 +326,68 @@
329326
330 done = False327 done = False
331328
329 # A list of jobs to run: id, lease_length, delay.
330 #
331 # For the first job, have a very long lease, so that it
332 # doesn't expire and so we soak up the ZCML loading time. For the
333 # second job, have a short lease so we hit the timeout.
334 jobs = [
335 (0, 10000, 0),
336 (1, 5, 30),
337 ]
338
332 @classmethod339 @classmethod
333 def iterReady(cls):340 def iterReady(cls):
334 if not cls.done:341 if not cls.done:
335 yield StuckJob(1)342 for id, lease_length, delay in cls.jobs:
336 yield StuckJob(2)343 yield cls(id, lease_length, delay)
337 cls.done = True344 cls.done = True
338345
339 @staticmethod346 @classmethod
340 def get(id):347 def get(cls, id):
341 return StuckJob(id)348 id, lease_length, delay = cls.jobs[id]
349 return cls(id, lease_length, delay)
342350
343 def __init__(self, id):351 def __init__(self, id, lease_length, delay):
344 self.id = id352 self.id = id
353 self.lease_length = lease_length
354 self.delay = delay
345 self.job = Job()355 self.job = Job()
346356
357 def __repr__(self):
358 return '<StuckJob(%r, lease_length=%s, delay=%s)>' % (
359 self.id, self.lease_length, self.delay)
360
347 def acquireLease(self):361 def acquireLease(self):
348 if self.id == 2:362 return self.job.acquireLease(self.lease_length)
349 lease_length = 1
350 else:
351 lease_length = 10000
352 return self.job.acquireLease(lease_length)
353363
354 def run(self):364 def run(self):
355 if self.id == 2:365 sleep(self.delay)
356 sleep(30)366
357 else:367
358 store = getUtility(IStoreSelector).get(MAIN_STORE, DEFAULT_FLAVOR)368class ShorterStuckJob(StuckJob):
359 assert (369 """Simulation of a job that stalls."""
360 'user=branchscanner' in store._connection._raw_connection.dsn)370
371 jobs = [
372 (0, 10000, 0),
373 (1, 0.05, 30),
374 ]
361375
362376
363class TestTwistedJobRunner(ZopeTestInSubProcess, TestCaseWithFactory):377class TestTwistedJobRunner(ZopeTestInSubProcess, TestCaseWithFactory):
364378
365 layer = LaunchpadZopelessLayer379 layer = ZopelessDatabaseLayer
366380
367 def test_timeout(self):381 def setUp(self):
382 super(TestTwistedJobRunner, self).setUp()
383 # The test relies on _pythonpath being importable. Thus we need to add
384 # a directory that contains _pythonpath to the sys.path. We can rely
385 # on the root directory of the checkout containing _pythonpath.
386 if config.root not in sys.path:
387 sys.path.append(config.root)
388 self.addCleanup(sys.path.remove, config.root)
389
390 def test_timeout_long(self):
368 """When a job exceeds its lease, an exception is raised.391 """When a job exceeds its lease, an exception is raised.
369392
370 Unfortunately, timeouts include the time it takes for the zope393 Unfortunately, timeouts include the time it takes for the zope
@@ -373,18 +396,52 @@
373 """396 """
374 logger = BufferLogger()397 logger = BufferLogger()
375 logger.setLevel(logging.INFO)398 logger.setLevel(logging.INFO)
399 # StuckJob is actually a source of two jobs. The first is fast, the
400 # second slow.
376 runner = TwistedJobRunner.runFromSource(401 runner = TwistedJobRunner.runFromSource(
377 StuckJob, 'branchscanner', logger)402 StuckJob, 'branchscanner', logger)
378403
379 self.assertEqual(1, len(runner.completed_jobs))404 # XXX: JonathanLange 2011-03-23 bug=740443: Potential source of race
380 self.assertEqual(1, len(runner.incomplete_jobs))405 # condition. Another OOPS could be logged. Also confusing because it
381 oops = errorlog.globalErrorUtility.getLastOopsReport()406 # might be polluted by values from previous jobs.
382 self.assertEqual(dedent("""\407 oops = errorlog.globalErrorUtility.getLastOopsReport()
383 INFO Running through Twisted.408 self.assertEqual(
384 INFO Job resulted in OOPS: %s409 (1, 1), (len(runner.completed_jobs), len(runner.incomplete_jobs)))
385 """) % oops.id, logger.getLogBuffer())410 self.assertEqual(
386 self.assertEqual('TimeoutError', oops.type)411 (dedent("""\
387 self.assertIn('Job ran too long.', oops.value)412 INFO Running through Twisted.
413 INFO Job resulted in OOPS: %s
414 """) % oops.id,
415 'TimeoutError', 'Job ran too long.'),
416 (logger.getLogBuffer(), oops.type, oops.value))
417
418 def test_timeout_short(self):
419 """When a job exceeds its lease, an exception is raised.
420
421 Unfortunately, timeouts include the time it takes for the zope
422 machinery to start up, so we run a job that will not time out first,
423 followed by a job that is sure to time out.
424 """
425 logger = BufferLogger()
426 logger.setLevel(logging.INFO)
427 # StuckJob is actually a source of two jobs. The first is fast, the
428 # second slow.
429 runner = TwistedJobRunner.runFromSource(
430 ShorterStuckJob, 'branchscanner', logger)
431
432 # XXX: JonathanLange 2011-03-23 bug=740443: Potential source of race
433 # condition. Another OOPS could be logged. Also confusing because it
434 # might be polluted by values from previous jobs.
435 oops = errorlog.globalErrorUtility.getLastOopsReport()
436 self.assertEqual(
437 (1, 1), (len(runner.completed_jobs), len(runner.incomplete_jobs)))
438 self.assertEqual(
439 (dedent("""\
440 INFO Running through Twisted.
441 INFO Job resulted in OOPS: %s
442 """) % oops.id,
443 'TimeoutError', 'Job ran too long.'),
444 (logger.getLogBuffer(), oops.type, oops.value))
388445
389446
390class TestJobCronScript(ZopeTestInSubProcess, TestCaseWithFactory):447class TestJobCronScript(ZopeTestInSubProcess, TestCaseWithFactory):
@@ -425,7 +482,3 @@
425 cronscript.main()482 cronscript.main()
426 finally:483 finally:
427 errorlog.globalErrorUtility = old_errorlog484 errorlog.globalErrorUtility = old_errorlog
428
429
430def test_suite():
431 return TestLoader().loadTestsFromName(__name__)
432485
=== modified file 'lib/lp/testing/__init__.py'
--- lib/lp/testing/__init__.py 2011-03-21 21:06:46 +0000
+++ lib/lp/testing/__init__.py 2011-03-24 18:38:33 +0000
@@ -12,7 +12,6 @@
12 'api_url',12 'api_url',
13 'build_yui_unittest_suite',13 'build_yui_unittest_suite',
14 'BrowserTestCase',14 'BrowserTestCase',
15 'capture_events',
16 'celebrity_logged_in',15 'celebrity_logged_in',
17 'FakeTime',16 'FakeTime',
18 'get_lsb_information',17 'get_lsb_information',
@@ -29,7 +28,7 @@
29 'normalize_whitespace',28 'normalize_whitespace',
30 'oauth_access_token_for',29 'oauth_access_token_for',
31 'person_logged_in',30 'person_logged_in',
32 'quote_jquery_expression'31 'quote_jquery_expression',
33 'record_statements',32 'record_statements',
34 'run_with_login',33 'run_with_login',
35 'run_with_storm_debug',34 'run_with_storm_debug',