Merge lp:~cjwatson/launchpad/celery-soft-timeout-improve-cleanup into lp:launchpad

Proposed by Colin Watson on 2018-07-26
Status: Merged
Merged at revision: 18739
Proposed branch: lp:~cjwatson/launchpad/celery-soft-timeout-improve-cleanup
Merge into: lp:launchpad
Diff against target: 47 lines (+20/-9)
1 file modified
lib/lp/services/ (+20/-9)
To merge this branch: bzr merge lp:~cjwatson/launchpad/celery-soft-timeout-improve-cleanup
Reviewer Review Type Date Requested Status
William Grant code 2018-07-26 Approve on 2018-07-30
Review via email:

Commit message

Clean up with_timeout worker thread upon receiving SoftTimeLimitExceeded from celery.

Description of the change

I spent ages trying to construct unit tests for this, but I couldn't manage to make it work: mocks don't propagate into celery worker processes. I have at least tested it in a local deployment and confirmed that, before this patch, a request from a job that received SoftTimeLimitExceeded emitted a log entry some time later indicating that it hadn't been cancelled, while with this patch that doesn't happen.

I'm not certain that this is what caused the celery master to stop dispatching workers, since I couldn't reproduce that particular failure mode locally, but this seems like a possible cause and worth trying.

To post a comment you must log in.
William Grant (wgrant) :
review: Approve (code)

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'lib/lp/services/'
--- lib/lp/services/ 2018-07-13 12:48:19 +0000
+++ lib/lp/services/ 2018-07-26 14:58:23 +0000
@@ -202,6 +202,16 @@
203 def __call__(self, f):203 def __call__(self, f):
204 """Wraps the method."""204 """Wraps the method."""
205 def cleanup(t, args):
206 if self.cleanup is not None:
207 if isinstance(self.cleanup, basestring):
208 # 'self' will be first positional argument.
209 getattr(args[0], self.cleanup)()
210 else:
211 self.cleanup()
212 # Collect cleaned-up worker thread.
213 t.join()
205 def call_with_timeout(*args, **kwargs):215 def call_with_timeout(*args, **kwargs):
206 # Ensure that we have a timeout before we start the thread216 # Ensure that we have a timeout before we start the thread
207 timeout = self.timeout217 timeout = self.timeout
@@ -214,16 +224,17 @@
214 timeout = timeout()224 timeout = timeout()
215 t = ThreadCapturingResult(f, args, kwargs)225 t = ThreadCapturingResult(f, args, kwargs)
216 t.start()226 t.start()
217 t.join(timeout)227 try:
228 t.join(timeout)
229 except Exception as e:
230 # This will commonly be SoftTimeLimitExceeded from celery,
231 # since celery's timeout often happens before the job's due
232 # to job setup time.
233 if t.isAlive():
234 cleanup(t, args)
235 raise
218 if t.isAlive():236 if t.isAlive():
219 if self.cleanup is not None:237 cleanup(t, args)
220 if isinstance(self.cleanup, basestring):
221 # 'self' will be first positional argument.
222 getattr(args[0], self.cleanup)()
223 else:
224 self.cleanup()
225 # Collect cleaned-up worker thread.
226 t.join()
227 raise TimeoutError("timeout exceeded.")238 raise TimeoutError("timeout exceeded.")
228 if getattr(t, 'exc_info', None) is not None:239 if getattr(t, 'exc_info', None) is not None:
229 exc_info = t.exc_info240 exc_info = t.exc_info