Merge lp:~maxb/udd/modular-threads into lp:udd
- modular-threads
- Merge into import-scripts
Status: | Merged |
---|---|
Approved by: | Vincent Ladeuil |
Approved revision: | 467 |
Merged at revision: | 466 |
Proposed branch: | lp:~maxb/udd/modular-threads |
Merge into: | lp:udd |
Diff against target: |
669 lines (+256/-258) 5 files modified
mass_import.py (+3/-5) selftest.py (+1/-1) udd/icommon.py (+0/-240) udd/tests/test_threads.py (+10/-12) udd/threads.py (+242/-0) |
To merge this branch: | bzr merge lp:~maxb/udd/modular-threads |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Vincent Ladeuil | Approve | ||
Ubuntu Distributed Development Developers | Pending | ||
Review via email: mp+64640@code.launchpad.net |
Commit message
Description of the change
A chunk of splitting up icommon.
I ran a local mass_import.py to check it still worked.
John A Meinel (jameinel) wrote : | # |
Vincent Ladeuil (vila) wrote : | # |
Please don't mix up refactoring and changes like:
40 - self.driver.
41 + self.driver.
What's the point of this change ? The comment above this line
clearly states that the value has been tweaked to match user
expectations. In this case, you're making the user waits up to 5
seconds before getting some feedback that the importer will stop
and there have been cases where a losa was trigger happy and
killed the job instead of letting it finish properly which lost
some valuable feedback (I don't remember the exact details, only
that killing wasn't the right solution).
I don't mind discussing this kind of change if you feel strongly
about it but let's do that in a specific proposal then.
For the rest, I prefer importing modules rather than symbols, but...
well, I won't block on this.
So feel free to land the rest.
Max Bowsher (maxb) wrote : | # |
Whoops!
That was a temporary local hack whilst trying to figure out why mass_import was tight-looping for me locally. It should never have been committed.
Max Bowsher (maxb) wrote : | # |
> I prefer importing modules rather than symbols, but...
> well, I won't block on this.
I do not have any strong preference in this case, so I am quite happy to go along with yours here.
I only feel strongly about importing symbols directly when the same symbol is used many times, and the advantage of brevity is significant.
Preview Diff
1 | === modified file 'mass_import.py' |
2 | --- mass_import.py 2011-06-15 00:28:18 +0000 |
3 | +++ mass_import.py 2011-06-15 14:39:30 +0000 |
4 | @@ -17,9 +17,7 @@ |
5 | |
6 | from udd import icommon |
7 | from udd import iconfig |
8 | - |
9 | -#import httplib2 |
10 | -#httplib2.debuglevel = 1 |
11 | +from udd import threads |
12 | |
13 | |
14 | class WatchedFileHandler(logging.FileHandler): |
15 | @@ -129,7 +127,7 @@ |
16 | ''.join(traceback.format_exception(exc_class, exc_value, |
17 | exc_tb)))) |
18 | |
19 | -class Importer(icommon.SubprocessMonitor): |
20 | +class Importer(threads.SubprocessMonitor): |
21 | |
22 | def __init__(self, args, package_name, job_id): |
23 | # FIXME: We should put the max_duration in a config file when it |
24 | @@ -223,7 +221,7 @@ |
25 | self.tried = set() |
26 | |
27 | |
28 | -class ImportDriver(icommon.ThreadDriver): |
29 | +class ImportDriver(threads.ThreadDriver): |
30 | """Monitor the ThreadedImporter. |
31 | |
32 | This includes planning and spawning imports, tracking their success or |
33 | |
34 | === modified file 'selftest.py' |
35 | --- selftest.py 2011-06-14 23:58:08 +0000 |
36 | +++ selftest.py 2011-06-15 14:39:30 +0000 |
37 | @@ -44,10 +44,10 @@ |
38 | 'udd.tests.test_config', |
39 | 'udd.tests.test_import_list', |
40 | 'udd.tests.test_import_package', |
41 | - 'udd.tests.test_mass_import', |
42 | 'udd.tests.test_package_to_import', |
43 | 'udd.tests.test_revid_database', |
44 | 'udd.tests.test_status_database', |
45 | + 'udd.tests.test_threads', |
46 | ])) |
47 | self.test = suite |
48 | |
49 | |
50 | === modified file 'udd/icommon.py' |
51 | --- udd/icommon.py 2011-06-15 06:54:16 +0000 |
52 | +++ udd/icommon.py 2011-06-15 14:39:30 +0000 |
53 | @@ -1,23 +1,17 @@ |
54 | -#!/usr/bin/python |
55 | - |
56 | import cgi |
57 | import datetime |
58 | import errno |
59 | import fcntl |
60 | import operator |
61 | import os |
62 | -import Queue |
63 | import random |
64 | import re |
65 | import shutil |
66 | import simplejson |
67 | -import signal |
68 | import socket |
69 | import sqlite3 |
70 | import StringIO |
71 | -import subprocess |
72 | import sys |
73 | -import threading |
74 | import time |
75 | import urllib |
76 | import urllib2 |
77 | @@ -42,12 +36,6 @@ |
78 | ) |
79 | from bzrlib.trace import mutter |
80 | |
81 | -try: |
82 | - from bzrlib import cethread |
83 | -except ImportError: |
84 | - # Use our copy then |
85 | - from udd import cethread |
86 | - |
87 | from launchpadlib.credentials import Credentials |
88 | from launchpadlib.errors import HTTPError |
89 | from launchpadlib.launchpad import Launchpad, EDGE_SERVICE_ROOT |
90 | @@ -2007,234 +1995,6 @@ |
91 | f.close() |
92 | |
93 | |
94 | -class CatchingExceptionThread(cethread.CatchingExceptionThread): |
95 | - """Isolating features that needs to be backported to bzrlib. |
96 | - |
97 | - None so far. |
98 | - """ |
99 | - |
100 | - |
101 | -class SubprocessMonitor(CatchingExceptionThread): |
102 | - |
103 | - def __init__(self, args, max_duration=None): |
104 | - """Create a SubprocessMonitor. |
105 | - |
106 | - :param args: A list of arguments to give to python. |
107 | - |
108 | - :param max_duration: The max number of seconds the command is expected |
109 | - to be run (elapsed time). |
110 | - """ |
111 | - self.started = threading.Event() |
112 | - self.stopped = threading.Event() |
113 | - super(SubprocessMonitor, self).__init__(target=self.spawn, |
114 | - sync_event=self.started) |
115 | - self.args = args |
116 | - self.proc_pid = None |
117 | - self.retcode = None |
118 | - self.out = self.err = None |
119 | - self.started_at = self.ended_at = None |
120 | - self.max_duration = max_duration |
121 | - self.killed_with = None |
122 | - self.killed_at = None |
123 | - |
124 | - def spawn(self): |
125 | - """Spawn the python command in a subprocess.""" |
126 | - proc = subprocess.Popen(['python'] + self.args, |
127 | - executable=sys.executable, |
128 | - stdout=subprocess.PIPE, |
129 | - stderr=subprocess.PIPE, |
130 | - stdin=subprocess.PIPE) |
131 | - self.proc_pid = proc.pid |
132 | - self.started_at = time.time() |
133 | - self.switch_and_set(self.stopped) |
134 | - try: |
135 | - self.out, self.err = proc.communicate() |
136 | - self.ended_at = time.time() |
137 | - self.retcode = proc.returncode |
138 | - except OSError, e: |
139 | - if e.errno in (errno.ECHILD, errno.ESRCH): |
140 | - # The process doesn't exist anymore |
141 | - pass |
142 | - else: |
143 | - raise |
144 | - self.stopped.set() |
145 | - |
146 | - def collect(self, timeout=None): |
147 | - """Collect relevant data before discarding the thread. |
148 | - |
149 | - This must be called by the controlling thread. Be aware that since this |
150 | - calls join() it may raise the pending exception which must be cleared |
151 | - if this behaviour is not the intended one. |
152 | - |
153 | - Daughter classes can override this method to collect any relevant data |
154 | - (including the pending exception) before the thread is discarded. |
155 | - """ |
156 | - self.join(timeout) |
157 | - |
158 | - def kill_with_escalation(self, grace_period=None): |
159 | - """Attempt to kill the subprocess. |
160 | - |
161 | - :param grace_period: A floating number of seconds to wait before using |
162 | - SIGKILL to kill the subprocess. |
163 | - |
164 | - When first called, `signal.SIGTERM` is used to allow the subprocess to |
165 | - handle the kill cleanly. If called repeatedly, this will not try to |
166 | - kill the process again until the end of the specified |
167 | - `grace_period`. When the grace period expires, `signal.SIGKILL` is used |
168 | - so that subprocesses that don't shutdown when receiving |
169 | - `signal.SIGTERM` can still be shut down. |
170 | - |
171 | - This must be called by the controlling thread. |
172 | - """ |
173 | - killed = False |
174 | - if self.killed_with is None: |
175 | - killed = self._kill(signal.SIGTERM) |
176 | - else: |
177 | - # We already tried to kill this this subprocess, if it didn't |
178 | - # during the grace period, we'll kill it harder |
179 | - now = time.time() |
180 | - if grace_period is None: |
181 | - # FIXME: This should go into a config file -- vila 2011-03-18 |
182 | - # 10 seconds should be more than enough for a process to die or |
183 | - # something wrong is happening in which case it's better to get |
184 | - # this problematic subprocess down as quickly as possible. |
185 | - grace_period = 10.0 |
186 | - if now - self.killed_at > grace_period: |
187 | - killed = self._kill(signal.SIGKILL) |
188 | - return killed |
189 | - |
190 | - def _kill(self, signum): |
191 | - killed = False |
192 | - # Don't attempt to kill the subprocess if it hasn't been started |
193 | - if self.proc_pid is not None: |
194 | - try: |
195 | - os.kill(self.proc_pid, signum) |
196 | - killed = True |
197 | - self.killed_with = signum |
198 | - self.killed_at = time.time() |
199 | - except OSError, e: |
200 | - if e.errno in (errno.ECHILD, errno.ESRCH): |
201 | - # The process doesn't exist anymore. |
202 | - pass |
203 | - else: |
204 | - raise |
205 | - return killed |
206 | - |
207 | - def has_exceeded_time_quota(self, duration=None): |
208 | - """Is the thread running for too long ? |
209 | - |
210 | - :param duration: If specified will override the ``max_duration`` |
211 | - attribute. |
212 | - """ |
213 | - if duration is None: |
214 | - duration = self.max_duration |
215 | - if (duration is not None |
216 | - and self.started_at is not None and self.ended_at is None): |
217 | - # A timeout is defined, the thread has been started but is not |
218 | - # finished yet |
219 | - if (time.time() - self.started_at) > self.max_duration: |
220 | - return True |
221 | - return False |
222 | - |
223 | - |
224 | -class ThreadDriver(CatchingExceptionThread): |
225 | - |
226 | - def __init__(self, queue, max_threads, target=None): |
227 | - """Create a ThreadDriver. |
228 | - |
229 | - :param queue: A thread provider. |
230 | - |
231 | - :param max_threads: Maximum number of threads allowed to run |
232 | - concurrently. |
233 | - |
234 | - :param target: What callable should be run in the thread. |
235 | - """ |
236 | - self.started = threading.Event() |
237 | - self.stopped = threading.Event() |
238 | - self.stop = threading.Event() |
239 | - if target is None: |
240 | - target = self.drive |
241 | - super(ThreadDriver, self).__init__(target=target, |
242 | - sync_event=self.started) |
243 | - self.queue = queue |
244 | - self.queue_closed = threading.Event() |
245 | - self.max_threads = max_threads |
246 | - self.threads = [] |
247 | - |
248 | - def drive(self): |
249 | - self.before_start() |
250 | - # From now on the caller will let us run until we stop or die |
251 | - self.switch_and_set(self.stopped) |
252 | - while True: |
253 | - # Calling wait ensures we can be interrupted properly, but we don't |
254 | - # want to block so we use a timeout. We don't use 0 though or other |
255 | - # threads may not get any cycle otherwise. |
256 | - self.stop.wait(0.001) |
257 | - if self.stop.isSet(): |
258 | - break |
259 | - # Check for threads exceeding their expected lifetime |
260 | - self.check_time_quota() |
261 | - self.do_one_step() |
262 | - self.before_stop() |
263 | - self.stopped.set() |
264 | - |
265 | - def before_start(self): |
266 | - pass |
267 | - |
268 | - def check_time_quota(self): |
269 | - """Ensure threads don't exceed their elapsed time quota. |
270 | - |
271 | - :returns: The list of killed threads. |
272 | - """ |
273 | - killed = [] |
274 | - for t in self.threads: |
275 | - if t.has_exceeded_time_quota(): |
276 | - # Die you sucker |
277 | - t.kill_with_escalation() |
278 | - killed.append(t) |
279 | - return killed |
280 | - |
281 | - def do_one_step(self): |
282 | - self.collect_terminated_threads() |
283 | - if (not self.stop.isSet() and not self.queue_closed.isSet() |
284 | - and len(self.threads) < self.max_threads): |
285 | - self.start_new_thread() |
286 | - if self.queue_closed.isSet() and not self.threads: |
287 | - # The queue is closed and there are no more running threads |
288 | - self.stop.set() |
289 | - |
290 | - def before_stop(self): |
291 | - self.kill_remaining_threads() |
292 | - |
293 | - def start_new_thread(self): |
294 | - # Get a new thread from the queue and start it |
295 | - try: |
296 | - new_thread = self.queue.get_nowait() |
297 | - except Queue.Empty: |
298 | - return None |
299 | - self.threads.append(new_thread) |
300 | - new_thread.start() |
301 | - new_thread.started.wait() |
302 | - # Tell the queue we're done with this thread |
303 | - self.queue.task_done() |
304 | - return new_thread |
305 | - |
306 | - def collect_terminated_threads(self): |
307 | - # If there are terminated threads, collect them |
308 | - for t in self.threads[:]: |
309 | - if not t.isAlive(): |
310 | - t.collect() |
311 | - self.threads.remove(t) |
312 | - |
313 | - def kill_remaining_threads(self): |
314 | - # Kill all the remaining threads |
315 | - for t in self.threads[:]: |
316 | - t.kill_with_escalation() |
317 | - # Collect the thread whether we could kill it or not |
318 | - t.collect() |
319 | - self.threads.remove(t) |
320 | - |
321 | - |
322 | def refresh_possible_transports(possible_transports): |
323 | """Prune stale connections from possible_transports. |
324 | |
325 | |
326 | === renamed file 'udd/tests/test_mass_import.py' => 'udd/tests/test_threads.py' |
327 | --- udd/tests/test_mass_import.py 2011-06-15 00:28:18 +0000 |
328 | +++ udd/tests/test_threads.py 2011-06-15 14:39:30 +0000 |
329 | @@ -4,11 +4,9 @@ |
330 | import threading |
331 | import time |
332 | |
333 | - |
334 | from bzrlib import tests |
335 | |
336 | - |
337 | -from udd import icommon |
338 | +from udd import threads |
339 | |
340 | |
341 | def get_test_script_path(name): |
342 | @@ -28,7 +26,7 @@ |
343 | |
344 | def run_in_subprocess(self, args, klass=None): |
345 | if klass is None: |
346 | - klass = icommon.SubprocessMonitor |
347 | + klass = threads.SubprocessMonitor |
348 | sub = klass(args) |
349 | self.addCleanup(sub.join, 0) |
350 | sub.start() |
351 | @@ -83,7 +81,7 @@ |
352 | def test_kill_catch_zombie(self): |
353 | kill_me = threading.Event() |
354 | resume = threading.Event() |
355 | - class TestMonitor(icommon.SubprocessMonitor): |
356 | + class TestMonitor(threads.SubprocessMonitor): |
357 | |
358 | def switch_and_set(self, new): |
359 | super(TestMonitor, self).switch_and_set(new) |
360 | @@ -121,7 +119,7 @@ |
361 | |
362 | def run_in_subprocess(self, args, klass=None): |
363 | if klass is None: |
364 | - klass = icommon.SubprocessMonitor |
365 | + klass = threads.SubprocessMonitor |
366 | sub = klass(args) |
367 | self.addCleanup(sub.join, 0) |
368 | sub.start() |
369 | @@ -187,7 +185,7 @@ |
370 | self.assertEquals(sub.out, 'Try harder\n') |
371 | |
372 | |
373 | -class Sleeper(icommon.SubprocessMonitor): |
374 | +class Sleeper(threads.SubprocessMonitor): |
375 | |
376 | def __init__(self, sleep_time=0.001, max_duration=None): |
377 | # sleep can't be killed (or interrupted) so we don't sleep for long but |
378 | @@ -206,7 +204,7 @@ |
379 | def get_driver(self, *args, **kwargs): |
380 | klass = kwargs.pop('klass', None) |
381 | if klass is None: |
382 | - klass = icommon.ThreadDriver |
383 | + klass = threads.ThreadDriver |
384 | return klass(*args, **kwargs) |
385 | |
386 | def get_started_driver(self, target=None, override=None, queue=None, |
387 | @@ -286,7 +284,7 @@ |
388 | self.started = threading.Event() |
389 | self.resumed = threading.Event() |
390 | self.terminated = threading.Event() |
391 | - class TestThread(icommon.SubprocessMonitor): |
392 | + class TestThread(threads.SubprocessMonitor): |
393 | |
394 | def collect(thread): |
395 | self.assertIs(self.thread, thread) |
396 | @@ -297,7 +295,7 @@ |
397 | q = Queue.Queue(1) |
398 | q.put(self.thread) |
399 | |
400 | - class TestDriver(icommon.ThreadDriver): |
401 | + class TestDriver(threads.ThreadDriver): |
402 | |
403 | def start_new_thread(driver): |
404 | t = super(TestDriver, driver).start_new_thread() |
405 | @@ -326,7 +324,7 @@ |
406 | self.assertLength(0, driver.threads) |
407 | |
408 | def test_thread_exception_raised_in_driver(self): |
409 | - class TestThread(icommon.SubprocessMonitor): |
410 | + class TestThread(threads.SubprocessMonitor): |
411 | |
412 | def collect(thread): |
413 | self.assertEquals(1, thread.retcode) |
414 | @@ -436,7 +434,7 @@ |
415 | self.addCleanup(sleeper.kill_with_escalation) |
416 | q.put(sleeper) |
417 | self.killed = None |
418 | - class TestDriver(icommon.ThreadDriver): |
419 | + class TestDriver(threads.ThreadDriver): |
420 | |
421 | def check_time_quota(driver): |
422 | killed = super(TestDriver, driver).check_time_quota() |
423 | |
424 | === added file 'udd/threads.py' |
425 | --- udd/threads.py 1970-01-01 00:00:00 +0000 |
426 | +++ udd/threads.py 2011-06-15 14:39:30 +0000 |
427 | @@ -0,0 +1,242 @@ |
428 | +import errno |
429 | +import os |
430 | +import Queue |
431 | +import signal |
432 | +import subprocess |
433 | +import sys |
434 | +import threading |
435 | +import time |
436 | + |
437 | +try: |
438 | + from bzrlib import cethread |
439 | +except ImportError: |
440 | + # Use our copy then |
441 | + from udd import cethread |
442 | + |
443 | + |
444 | +class CatchingExceptionThread(cethread.CatchingExceptionThread): |
445 | + """Isolating features that needs to be backported to bzrlib. |
446 | + |
447 | + None so far. |
448 | + """ |
449 | + |
450 | + |
451 | +class SubprocessMonitor(CatchingExceptionThread): |
452 | + |
453 | + def __init__(self, args, max_duration=None): |
454 | + """Create a SubprocessMonitor. |
455 | + |
456 | + :param args: A list of arguments to give to python. |
457 | + |
458 | + :param max_duration: The max number of seconds the command is expected |
459 | + to be run (elapsed time). |
460 | + """ |
461 | + self.started = threading.Event() |
462 | + self.stopped = threading.Event() |
463 | + super(SubprocessMonitor, self).__init__(target=self.spawn, |
464 | + sync_event=self.started) |
465 | + self.args = args |
466 | + self.proc_pid = None |
467 | + self.retcode = None |
468 | + self.out = self.err = None |
469 | + self.started_at = self.ended_at = None |
470 | + self.max_duration = max_duration |
471 | + self.killed_with = None |
472 | + self.killed_at = None |
473 | + |
474 | + def spawn(self): |
475 | + """Spawn the python command in a subprocess.""" |
476 | + proc = subprocess.Popen(['python'] + self.args, |
477 | + executable=sys.executable, |
478 | + stdout=subprocess.PIPE, |
479 | + stderr=subprocess.PIPE, |
480 | + stdin=subprocess.PIPE) |
481 | + self.proc_pid = proc.pid |
482 | + self.started_at = time.time() |
483 | + self.switch_and_set(self.stopped) |
484 | + try: |
485 | + self.out, self.err = proc.communicate() |
486 | + self.ended_at = time.time() |
487 | + self.retcode = proc.returncode |
488 | + except OSError, e: |
489 | + if e.errno in (errno.ECHILD, errno.ESRCH): |
490 | + # The process doesn't exist anymore |
491 | + pass |
492 | + else: |
493 | + raise |
494 | + self.stopped.set() |
495 | + |
496 | + def collect(self, timeout=None): |
497 | + """Collect relevant data before discarding the thread. |
498 | + |
499 | + This must be called by the controlling thread. Be aware that since this |
500 | + calls join() it may raise the pending exception which must be cleared |
501 | + if this behaviour is not the intended one. |
502 | + |
503 | + Daughter classes can override this method to collect any relevant data |
504 | + (including the pending exception) before the thread is discarded. |
505 | + """ |
506 | + self.join(timeout) |
507 | + |
508 | + def kill_with_escalation(self, grace_period=None): |
509 | + """Attempt to kill the subprocess. |
510 | + |
511 | + :param grace_period: A floating number of seconds to wait before using |
512 | + SIGKILL to kill the subprocess. |
513 | + |
514 | + When first called, `signal.SIGTERM` is used to allow the subprocess to |
515 | + handle the kill cleanly. If called repeatedly, this will not try to |
516 | + kill the process again until the end of the specified |
517 | + `grace_period`. When the grace period expires, `signal.SIGKILL` is used |
518 | + so that subprocesses that don't shutdown when receiving |
519 | + `signal.SIGTERM` can still be shut down. |
520 | + |
521 | + This must be called by the controlling thread. |
522 | + """ |
523 | + killed = False |
524 | + if self.killed_with is None: |
525 | + killed = self._kill(signal.SIGTERM) |
526 | + else: |
527 | + # We already tried to kill this this subprocess, if it didn't |
528 | + # during the grace period, we'll kill it harder |
529 | + now = time.time() |
530 | + if grace_period is None: |
531 | + # FIXME: This should go into a config file -- vila 2011-03-18 |
532 | + # 10 seconds should be more than enough for a process to die or |
533 | + # something wrong is happening in which case it's better to get |
534 | + # this problematic subprocess down as quickly as possible. |
535 | + grace_period = 10.0 |
536 | + if now - self.killed_at > grace_period: |
537 | + killed = self._kill(signal.SIGKILL) |
538 | + return killed |
539 | + |
540 | + def _kill(self, signum): |
541 | + killed = False |
542 | + # Don't attempt to kill the subprocess if it hasn't been started |
543 | + if self.proc_pid is not None: |
544 | + try: |
545 | + os.kill(self.proc_pid, signum) |
546 | + killed = True |
547 | + self.killed_with = signum |
548 | + self.killed_at = time.time() |
549 | + except OSError, e: |
550 | + if e.errno in (errno.ECHILD, errno.ESRCH): |
551 | + # The process doesn't exist anymore. |
552 | + pass |
553 | + else: |
554 | + raise |
555 | + return killed |
556 | + |
557 | + def has_exceeded_time_quota(self, duration=None): |
558 | + """Is the thread running for too long ? |
559 | + |
560 | + :param duration: If specified will override the ``max_duration`` |
561 | + attribute. |
562 | + """ |
563 | + if duration is None: |
564 | + duration = self.max_duration |
565 | + if (duration is not None |
566 | + and self.started_at is not None and self.ended_at is None): |
567 | + # A timeout is defined, the thread has been started but is not |
568 | + # finished yet |
569 | + if (time.time() - self.started_at) > self.max_duration: |
570 | + return True |
571 | + return False |
572 | + |
573 | + |
574 | +class ThreadDriver(CatchingExceptionThread): |
575 | + |
576 | + def __init__(self, queue, max_threads, target=None): |
577 | + """Create a ThreadDriver. |
578 | + |
579 | + :param queue: A thread provider. |
580 | + |
581 | + :param max_threads: Maximum number of threads allowed to run |
582 | + concurrently. |
583 | + |
584 | + :param target: What callable should be run in the thread. |
585 | + """ |
586 | + self.started = threading.Event() |
587 | + self.stopped = threading.Event() |
588 | + self.stop = threading.Event() |
589 | + if target is None: |
590 | + target = self.drive |
591 | + super(ThreadDriver, self).__init__(target=target, |
592 | + sync_event=self.started) |
593 | + self.queue = queue |
594 | + self.queue_closed = threading.Event() |
595 | + self.max_threads = max_threads |
596 | + self.threads = [] |
597 | + |
598 | + def drive(self): |
599 | + self.before_start() |
600 | + # From now on the caller will let us run until we stop or die |
601 | + self.switch_and_set(self.stopped) |
602 | + while True: |
603 | + # Calling wait ensures we can be interrupted properly, but we don't |
604 | + # want to block so we use a timeout. We don't use 0 though or other |
605 | + # threads may not get any cycle otherwise. |
606 | + self.stop.wait(0.001) |
607 | + if self.stop.isSet(): |
608 | + break |
609 | + # Check for threads exceeding their expected lifetime |
610 | + self.check_time_quota() |
611 | + self.do_one_step() |
612 | + self.before_stop() |
613 | + self.stopped.set() |
614 | + |
615 | + def before_start(self): |
616 | + pass |
617 | + |
618 | + def check_time_quota(self): |
619 | + """Ensure threads don't exceed their elapsed time quota. |
620 | + |
621 | + :returns: The list of killed threads. |
622 | + """ |
623 | + killed = [] |
624 | + for t in self.threads: |
625 | + if t.has_exceeded_time_quota(): |
626 | + # Die you sucker |
627 | + t.kill_with_escalation() |
628 | + killed.append(t) |
629 | + return killed |
630 | + |
631 | + def do_one_step(self): |
632 | + self.collect_terminated_threads() |
633 | + if (not self.stop.isSet() and not self.queue_closed.isSet() |
634 | + and len(self.threads) < self.max_threads): |
635 | + self.start_new_thread() |
636 | + if self.queue_closed.isSet() and not self.threads: |
637 | + # The queue is closed and there are no more running threads |
638 | + self.stop.set() |
639 | + |
640 | + def before_stop(self): |
641 | + self.kill_remaining_threads() |
642 | + |
643 | + def start_new_thread(self): |
644 | + # Get a new thread from the queue and start it |
645 | + try: |
646 | + new_thread = self.queue.get_nowait() |
647 | + except Queue.Empty: |
648 | + return None |
649 | + self.threads.append(new_thread) |
650 | + new_thread.start() |
651 | + new_thread.started.wait() |
652 | + # Tell the queue we're done with this thread |
653 | + self.queue.task_done() |
654 | + return new_thread |
655 | + |
656 | + def collect_terminated_threads(self): |
657 | + # If there are terminated threads, collect them |
658 | + for t in self.threads[:]: |
659 | + if not t.isAlive(): |
660 | + t.collect() |
661 | + self.threads.remove(t) |
662 | + |
663 | + def kill_remaining_threads(self): |
664 | + # Kill all the remaining threads |
665 | + for t in self.threads[:]: |
666 | + t.kill_with_escalation() |
667 | + # Collect the thread whether we could kill it or not |
668 | + t.collect() |
669 | + self.threads.remove(t) |
-----BEGIN PGP SIGNED MESSAGE-----
Hash: SHA1
On 6/15/2011 9:32 AM, Max Bowsher wrote: stopped. wait(0. 2) stopped. wait(5) stopped. isSet() :
> This includes planning and spawning imports, tracking their success or
> @@ -301,7 +302,7 @@
> # doing all the filesystem calls every milisecond but the user
> # shouldn't have to wait too long to see his modifications
> # taken into account either.
> - self.driver.
> + self.driver.
> if self.driver.
> break
> # Then check the graceful stop file
>
That is more than just moving the code around.
The rest seems fine to me, but I remember talking with vila quite a bit
about his threading patches, so I'd like him to take a look at this.
John enigmail. mozdev. org/
=:->
-----BEGIN PGP SIGNATURE-----
Version: GnuPG v1.4.9 (Cygwin)
Comment: Using GnuPG with Mozilla - http://
iEYEARECAAYFAk3 4nF0ACgkQJdeBCY SNAAPiwACeMTfzR EOn35a2JYxSq4vi Athn jQxlR8JucUGr5k5 Lq
yzcAn0YiAeITR4U
=/CNm
-----END PGP SIGNATURE-----