Merge lp:~cjwatson/launchpad/celery-soft-timeout-improve-cleanup into lp:launchpad

Proposed by Colin Watson on 2018-07-26
Status: Merged
Merged at revision: 18739
Proposed branch: lp:~cjwatson/launchpad/celery-soft-timeout-improve-cleanup
Merge into: lp:launchpad
Diff against target: 47 lines (+20/-9)
1 file modified
lib/lp/services/ (+20/-9)
To merge this branch: bzr merge lp:~cjwatson/launchpad/celery-soft-timeout-improve-cleanup
Reviewer        Review Type   Date Requested   Status
William Grant   code          2018-07-26       Approve on 2018-07-30

Commit message

Clean up with_timeout worker thread upon receiving SoftTimeLimitExceeded from celery.

Description of the change

I spent ages trying to construct unit tests for this, but I couldn't manage to make it work: mocks don't propagate into celery worker processes. I have at least tested it in a local deployment and confirmed that, before this patch, a request from a job that received SoftTimeLimitExceeded emitted a log entry some time later indicating that it hadn't been cancelled, while with this patch that doesn't happen.

I'm not certain that this is what caused the celery master to stop dispatching workers, since I couldn't reproduce that particular failure mode locally, but this seems like a possible cause and worth trying.
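For readers who want to see the failure mode in isolation, here is a minimal Python 3 sketch of the pattern the patch applies: if an exception is raised in the calling thread while it waits on the worker, run the cleanup and collect the worker before re-raising. It is not the Launchpad code. SoftTimeLimit is a hypothetical stand-in for celery's SoftTimeLimitExceeded, InterruptedJoinThread is a hypothetical helper that simulates the interruption by raising from a timed join(), and is_alive() is the Python 3 spelling of isAlive().

```python
import threading


class SoftTimeLimit(Exception):
    """Hypothetical stand-in for celery's SoftTimeLimitExceeded."""


class InterruptedJoinThread(threading.Thread):
    """Hypothetical helper: a timed join() raises, simulating an
    exception delivered to the caller while it waits on the worker."""

    def join(self, timeout=None):
        if timeout is not None:
            raise SoftTimeLimit()
        super().join(timeout)


def call_with_timeout(thread, timeout, cleanup):
    """Wait for the thread; always clean up a still-running worker."""
    thread.start()
    try:
        thread.join(timeout)
    except Exception:
        # Interrupted while waiting: stop and collect the worker,
        # then let the original exception propagate.
        if thread.is_alive():
            cleanup()
            thread.join()
        raise
    if thread.is_alive():
        # Ordinary timeout: same cleanup, different exception.
        cleanup()
        thread.join()
        raise TimeoutError("timeout exceeded.")


stop = threading.Event()
# The worker blocks until the cleanup callback sets the event.
worker = InterruptedJoinThread(target=stop.wait)
try:
    call_with_timeout(worker, timeout=5, cleanup=stop.set)
except SoftTimeLimit:
    interrupted = True
else:
    interrupted = False
```

Without the try/except around the timed join, the exception would propagate while the worker was still blocked, which corresponds to the leftover request described above.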

William Grant (wgrant):
review: Approve (code)

Preview Diff

=== modified file 'lib/lp/services/'
--- lib/lp/services/ 2018-07-13 12:48:19 +0000
+++ lib/lp/services/ 2018-07-26 14:58:23 +0000
@@ -202,6 +202,16 @@
 
     def __call__(self, f):
         """Wraps the method."""
+        def cleanup(t, args):
+            if self.cleanup is not None:
+                if isinstance(self.cleanup, basestring):
+                    # 'self' will be first positional argument.
+                    getattr(args[0], self.cleanup)()
+                else:
+                    self.cleanup()
+                # Collect cleaned-up worker thread.
+                t.join()
+
         def call_with_timeout(*args, **kwargs):
             # Ensure that we have a timeout before we start the thread
             timeout = self.timeout
@@ -214,16 +224,17 @@
                 timeout = timeout()
             t = ThreadCapturingResult(f, args, kwargs)
             t.start()
-            t.join(timeout)
+            try:
+                t.join(timeout)
+            except Exception as e:
+                # This will commonly be SoftTimeLimitExceeded from celery,
+                # since celery's timeout often happens before the job's due
+                # to job setup time.
+                if t.isAlive():
+                    cleanup(t, args)
+                raise
             if t.isAlive():
-                if self.cleanup is not None:
-                    if isinstance(self.cleanup, basestring):
-                        # 'self' will be first positional argument.
-                        getattr(args[0], self.cleanup)()
-                    else:
-                        self.cleanup()
-                    # Collect cleaned-up worker thread.
-                    t.join()
+                cleanup(t, args)
                 raise TimeoutError("timeout exceeded.")
             if getattr(t, 'exc_info', None) is not None:
                 exc_info = t.exc_info