Merge lp:~vila/udd/use-new-driver into lp:udd
- use-new-driver
- Merge into import-scripts
Status: | Merged | ||||||||
---|---|---|---|---|---|---|---|---|---|
Approved by: | Jelmer Vernooij | ||||||||
Approved revision: | 427 | ||||||||
Merged at revision: | 405 | ||||||||
Proposed branch: | lp:~vila/udd/use-new-driver | ||||||||
Merge into: | lp:udd | ||||||||
Prerequisite: | lp:~vila/udd/robust-driver | ||||||||
Diff against target: |
530 lines (+138/-261) 3 files modified
icommon.py (+3/-3) mass_import.py (+126/-249) tests.py (+9/-9) |
||||||||
To merge this branch: | bzr merge lp:~vila/udd/use-new-driver | ||||||||
Related bugs: |
|
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
James Westby | Approve | ||
Review via email: mp+50644@code.launchpad.net |
Commit message
Description of the change
Build on top of lp:~vila/udd/robust-driver , this simplify the mass_import.py script.
ThreadedImporter has been renamed Importer for clarity, and just redefine a couple of methods mainly for logger calls() and db access.
AllQueue has been adapted to plug into the new design and will need more work, but this step is just to replace the existing implementation.
ImportDriver also redefine a couple of methods for the same reasons (logger and db).
ImportController uses the new ImportDriver features (well, ThreadDriver really) to simplify the control.
One feature has been lost here: changing the number of threads dynamically. I only realize that after doing a bunch of tests on package-
Note that this refactoring addresses the 'still active' spamming issue (but there wasn't a bug number for that right ?).
bug #589523 (adding a timeout for imports) will be far easier to fix now.
- 425. By Vincent Ladeuil
-
Renamed SubprocessThread to SubprocessMonitor (and fixes lp:714420)
Vincent Ladeuil (vila) wrote : | # |
- 426. By Vincent Ladeuil
-
Merge robust-driver into use-new-driver
- 427. By Vincent Ladeuil
-
This also fixes bug #717204
James Westby (james-w) wrote : | # |
157 + except Queue.Empty:
158 + self.unfinished
159 + job_id, package_name = self.next_job()
186 + except StopIteration:
187 + raise Queue.Empty
Won't this mean that Queue.Empty would bubble to the top and stop the importer in this case?
Is that intentional?
This looks like a good simplification, thanks.
James
Vincent Ladeuil (vila) wrote : | # |
> 157 + except Queue.Empty:
> 158 + self.unfinished
> 159 + job_id, package_name = self.next_job()
>
> 186 + except StopIteration:
> 187 + raise Queue.Empty
>
> Won't this mean that Queue.Empty would
be caught by start_new_thread which handles it as meaning: there is no job for me yet, return None.
> bubble to the top and stop the importer
So no.
> in this case?
> Is that intentional?
Yup, it's a nice property of Queue that you could chain them this way almost transparently (first the jobs, then the packages (and I intentionally left the interrupted ones re-queuing in the controller since that occurs only once)).
If needed, we could add some priority handling in the queue itself, but I think a better plan is to maintain a debian and a lp canaris that will just suspend the execution of imports when one of the sites is down (and kill and requeue the ones in progress at the top of the queue)
> This looks like a good simplification, thanks.
Very much appreciated :-)
James Westby (james-w) wrote : | # |
> > 157 + except Queue.Empty:
> > 158 + self.unfinished
> > 159 + job_id, package_name = self.next_job()
> >
> > 186 + except StopIteration:
> > 187 + raise Queue.Empty
> >
> > Won't this mean that Queue.Empty would
>
> be caught by start_new_thread which handles it as meaning: there is no job for
> me yet, return None.
Ah missed that.
> If needed, we could add some priority handling in the queue itself, but I
> think a better plan is to maintain a debian and a lp canaris that will just
> suspend the execution of imports when one of the sites is down (and kill and
> requeue the ones in progress at the top of the queue)
I think the canaries are a good idea. I still think that the priority handling
in the old code is useful. It's useful to be able to requeue a package as it is
blocking someone and have it done as soon as a slot is free. I've never felt the
need for more fine-grained than the three levels we have now though (ASAP, soon, and
idle).
Thanks,
James
Vincent Ladeuil (vila) wrote : | # |
>
> > If needed, we could add some priority handling in the queue itself, but I
> > think a better plan is to maintain a debian and a lp canaris that will just
> > suspend the execution of imports when one of the sites is down (and kill and
> > requeue the ones in progress at the top of the queue)
>
> I think the canaries are a good idea. I still think that the priority handling
> in the old code is useful.
The one in the db ? Oh my, I wouldn't touch that, it's perfect there.
> It's useful to be able to requeue a package as it
> is
> blocking someone and have it done as soon as a slot is free. I've never felt
> the
> need for more fine-grained than the three levels we have now though (ASAP,
> soon, and
> idle).
Yup. Good point about re-queuing on-demand, but that's already covered by requeue-package --priority no ? ISTM the way you wrote next_job() should give us that since it queries the db for each job... I'm not familiar enough with SQL (nor the requeue_package implementation) but if the query takes the priority into account my implementation will respect it.
When I was mentioning multiple queues/priority I was thinking about the losa proxied queries we may need in the future (including of course requeue-package or any other python script) which is the subject of my next submission.
James Westby (james-w) wrote : | # |
On Wed, 23 Feb 2011 21:27:58 -0000, Vincent Ladeuil <email address hidden> wrote:
> The one in the db ? Oh my, I wouldn't touch that, it's perfect there.
Exactly.
> Yup. Good point about re-queuing on-demand, but that's already covered
> by requeue-package --priority no ?
Indeed.
> ISTM the way you wrote next_job()
> should give us that since it queries the db for each job... I'm not
> familiar enough with SQL (nor the requeue_package implementation) but
> if the query takes the priority into account my implementation will
> respect it.
That's perfect then. IIRC the SQL does an ORDER BY prio DESC or similar.
> When I was mentioning multiple queues/priority I was thinking about
> the losa proxied queries we may need in the future (including of
> course requeue-package or any other python script) which is the
> subject of my next submission.
Ok, let's discuss it then.
I was merely trying to highlight the utility of the priority scheme that
is in place now, rather than arguing against any specific change.
Thanks,
James
Vincent Ladeuil (vila) wrote : | # |
> Ok, let's discuss it then.
Cool, we are in agreement then, see you on my next proposal ;)
> I was merely trying to highlight the utility of the priority scheme that
> is in place now,
Sorry, I should have been clearer about that but I don't fully understand yet all details (which is also why I tried to not change these parts :)
Preview Diff
1 | === modified file 'icommon.py' |
2 | --- icommon.py 2011-02-22 16:11:07 +0000 |
3 | +++ icommon.py 2011-02-22 16:11:07 +0000 |
4 | @@ -1938,13 +1938,13 @@ |
5 | """ |
6 | |
7 | |
8 | -class SubprocessThread(CatchingExceptionThread): |
9 | +class SubprocessMonitor(CatchingExceptionThread): |
10 | |
11 | def __init__(self, cmd): |
12 | self.started = threading.Event() |
13 | self.stopped = threading.Event() |
14 | - super(SubprocessThread, self).__init__(target=self.spawn, |
15 | - sync_event=self.started) |
16 | + super(SubprocessMonitor, self).__init__(target=self.spawn, |
17 | + sync_event=self.started) |
18 | self.cmd = cmd |
19 | self.proc_pid = None |
20 | self.retcode = None |
21 | |
22 | === modified file 'mass_import.py' |
23 | --- mass_import.py 2011-02-20 19:32:59 +0000 |
24 | +++ mass_import.py 2011-02-22 16:11:07 +0000 |
25 | @@ -3,6 +3,7 @@ |
26 | import codecs |
27 | import logging |
28 | import os |
29 | +import Queue |
30 | import random |
31 | import signal |
32 | from stat import ST_DEV, ST_INO |
33 | @@ -111,223 +112,116 @@ |
34 | d_archive = debian.main_archive |
35 | u_archive = ubuntu.main_archive |
36 | |
37 | -lp_lock = threading.Lock() |
38 | - |
39 | - |
40 | -def subprocess_setup(): |
41 | - signal.signal(signal.SIGPIPE, signal.SIG_DFL) |
42 | - |
43 | - |
44 | -def pool_base(name): |
45 | - if name.startswith("lib"): |
46 | - return name[:4] |
47 | - return name[0] |
48 | - |
49 | - |
50 | -class ThreadedImporter(threading.Thread): |
51 | - |
52 | - import_cmd = 'import_package.py %s' |
53 | - |
54 | - def __init__(self, package, job_id): |
55 | - super(ThreadedImporter, self).__init__() |
56 | - self.package = package |
57 | + |
58 | +class Importer(icommon.SubprocessMonitor): |
59 | + |
60 | + def __init__(self, cmd, package_name, job_id): |
61 | + super(Importer, self).__init__(cmd % (package_name,)) |
62 | + self.package_name = package_name |
63 | self.job_id = job_id |
64 | - self.success = None |
65 | - self.output = None |
66 | - self.proc_pid = None |
67 | - self.stopped = threading.Event() |
68 | + self.status_db = icommon.StatusDatabase(icommon.sqlite_file) |
69 | |
70 | - def run(self): |
71 | - success = False |
72 | - output = icommon.running_sentinel |
73 | - logger.info("Trying %s" % self.package) |
74 | - proc = subprocess.Popen(self.import_cmd % self.package, |
75 | - shell=True, stdout=subprocess.PIPE, |
76 | - stderr=subprocess.STDOUT, stdin=subprocess.PIPE, |
77 | - preexec_fn=subprocess_setup) |
78 | - # Save the pid so the process can be killed from another thread. |
79 | - self.proc_pid = proc.pid |
80 | - output, _ = proc.communicate() |
81 | - if proc.returncode == icommon.no_lock_returncode: |
82 | - logger.info("Couldn't lock %s, skipping" % self.package) |
83 | + def collect(self, timeout=None): |
84 | + super(Importer, self).collect(timeout) |
85 | + if self.retcode == icommon.no_lock_returncode: |
86 | + logger.info("Couldn't lock %s, skipping" % self.package_name) |
87 | else: |
88 | - self.success = (proc.returncode == 0) |
89 | - self.output = output |
90 | - self.stopped.set() |
91 | - |
92 | - def kill(self): |
93 | - if self.proc_id is not None: |
94 | - try: |
95 | - os.kill(self.proc_pid, signal.SIGTERM) |
96 | - self.proc_pid = None |
97 | - except OSError, e: |
98 | - if e.errno == errno.ESRCH: |
99 | - # The process doesn't exist anymore. |
100 | - self.proc_pid = None |
101 | - # We ignore all other exceptions but don't reset proc_pid |
102 | - # for them so that we can try to kill it again. We don't |
103 | - # re-raise either to let the caller continue. |
104 | - else: |
105 | - pass |
106 | - except: |
107 | - pass |
108 | - |
109 | - |
110 | -class Stop(Exception): |
111 | - pass |
112 | - |
113 | - |
114 | -class AllQueue(object): |
115 | - """A Queue that always returns a package, even if we are not sure that |
116 | - it is needed""" |
117 | + unicode_output = self.out.decode("utf-8", "replace") |
118 | + ascii_output = unicode_output.encode("ascii", "replace") |
119 | + success = self.retcode == 0 |
120 | + if success: |
121 | + logger.info("Success %s: %s" |
122 | + % (self.package_name, |
123 | + ascii_output.replace("\n", " "))) |
124 | + else: |
125 | + logger.warning("Importing %s failed:\n%s" % (self.package_name, |
126 | + ascii_output)) |
127 | + self.status_db.finish_job( |
128 | + self.package_name, self.job_id, success, |
129 | + unicode_output.encode("utf-8", "replace")) |
130 | + logger.info("thread for %s finished" % self.package_name) |
131 | + |
132 | + |
133 | +class AllQueue(Queue.Queue): |
134 | + """A Queue that always returns a package. |
135 | + |
136 | + The jobs in the status db are tried first. |
137 | + |
138 | + When no more jobs are pending, the package db is used, even if we are not |
139 | + sure that it is needed. |
140 | + """ |
141 | |
142 | def __init__(self): |
143 | + Queue.Queue.__init__(self) |
144 | self.tried = set() |
145 | - self.packages_db = icommon.PackageDatabase( |
146 | - icommon.sqlite_package_file) |
147 | + self.packages_db = icommon.PackageDatabase(icommon.sqlite_package_file) |
148 | self.status_db = icommon.StatusDatabase(icommon.sqlite_file) |
149 | |
150 | + def get_nowait(self): |
151 | + # FIXME: It's bogus to implement get_nowait() this way (without |
152 | + # locking) but we don't really care since we have a single consumer |
153 | + # and no producer. This should be changed to a different model anyway |
154 | + # so different kinds of jobs could be queued -- vila 20110221 |
155 | + try: |
156 | + job_id, package_name = Queue.Queue.get_nowait(self) |
157 | + except Queue.Empty: |
158 | + self.unfinished_tasks += 1 |
159 | + job_id, package_name = self.next_job() |
160 | + return Importer('import_package.py %s', |
161 | + package_name, job_id) |
162 | + |
163 | def next_job(self): |
164 | while True: |
165 | - try: |
166 | - to_try = self.packages_db.get_one(self.tried) |
167 | - except StopIteration: |
168 | - return (None, None) |
169 | - if to_try is not None: |
170 | - self.tried.add(to_try) |
171 | - job_id = self.status_db.start_package(to_try) |
172 | - if job_id is not None: |
173 | - return (job_id, to_try) |
174 | + # jobs first |
175 | + job_id, package = self.status_db.next_job() |
176 | + if package is not None: |
177 | + return job_id, package |
178 | else: |
179 | - self.tried = set() |
180 | - |
181 | - |
182 | -class ImportDriver(threading.Thread): |
183 | + # Now the packages |
184 | + try: |
185 | + to_try = self.packages_db.get_one(self.tried) |
186 | + except StopIteration: |
187 | + raise Queue.Empty |
188 | + if to_try is not None: |
189 | + self.tried.add(to_try) |
190 | + job_id = self.status_db.start_package(to_try) |
191 | + if job_id is not None: |
192 | + return (job_id, to_try) |
193 | + else: |
194 | + logger.info("All packages requeued, start again") |
195 | + self.tried = set() |
196 | + |
197 | + |
198 | +class ImportDriver(icommon.ThreadDriver): |
199 | """Monitor the ThreadedImporter. |
200 | |
201 | This includes planning and spawning imports, tracking their success or |
202 | failures and shutting them down. |
203 | """ |
204 | |
205 | - MAX_THREADS = 6 |
206 | - |
207 | - def __init__(self): |
208 | - super(ImportDriver, self).__init__() |
209 | - self.threads = [] |
210 | - self.stop_requested = threading.Event() |
211 | - self.stop_now = threading.Event() |
212 | - self.started = threading.Event() |
213 | - self.stopped = threading.Event() |
214 | - |
215 | - def request_stop(self): |
216 | - self.stop_requested.set() |
217 | - |
218 | - def should_stop(self): |
219 | - return self.stop_requested.isSet() or self.must_stop() |
220 | - |
221 | - def must_stop(self): |
222 | - return self.stop_now.isSet() |
223 | - |
224 | - def get_next(self): |
225 | - job_id, package = self.status_db.next_job() |
226 | - if package is None: |
227 | - (job_id, package) = self.queue.next_job() |
228 | - return (job_id, package) |
229 | - |
230 | - def sleep(self, max): |
231 | - for i in range(max): |
232 | - if self.should_stop(): |
233 | - return True |
234 | - time.sleep(1) |
235 | - return self.should_stop() |
236 | - |
237 | - def deep_sleep(self, max): |
238 | - for i in range(max): |
239 | - if self.must_stop(): |
240 | - return True |
241 | - time.sleep(1) |
242 | - return self.must_stop() |
243 | - |
244 | - def get_max_threads(self): |
245 | - if os.path.exists(icommon.max_threads_file): |
246 | - f = open(icommon.max_threads_file) |
247 | - try: |
248 | - return int(f.read().splitlines()[0].strip()) |
249 | - except Exception, e: |
250 | - logger.warning("Error reading max threads file: %s", str(e)) |
251 | - return self.MAX_THREADS |
252 | - finally: |
253 | - f.close() |
254 | - else: |
255 | - return self.MAX_THREADS |
256 | - |
257 | - def package_finished(self, package, job_id, success, output): |
258 | - unicode_output = output.decode("utf-8", "replace") |
259 | - ascii_output = unicode_output.encode("ascii", "replace") |
260 | - if success: |
261 | - logger.info("Success %s: %s" |
262 | - % (package, ascii_output.replace("\n", " "))) |
263 | - else: |
264 | - logger.warning("Importing %s failed:\n%s" % (package, ascii_output)) |
265 | - self.status_db.finish_job(package, job_id, success, |
266 | - unicode_output.encode("utf-8", "replace")) |
267 | - |
268 | - def _wait_until_threads_reaches(self, target): |
269 | - i = 0 |
270 | - while len(self.threads) > target: |
271 | - if i % 6 == 0: |
272 | - logger.info("threads for %s still active" |
273 | - % str([t.package for t in self.threads])) |
274 | - removed = self._retire_finished_threads() |
275 | - if not removed: |
276 | - self.sleep(10) |
277 | - i += 1 |
278 | - |
279 | - def _retire_finished_threads(self): |
280 | - removed = False |
281 | - for thread in self.threads[:]: |
282 | - if not thread.isAlive(): |
283 | - removed = True |
284 | - logger.info("thread for %s finished" % thread.package) |
285 | - if thread.success is not None: |
286 | - self.package_finished(thread.package, thread.job_id, |
287 | - thread.success, thread.output) |
288 | - self.threads.remove(thread) |
289 | - elif self.must_stop(): |
290 | - thread.kill() |
291 | - return removed |
292 | - |
293 | - def run(self): |
294 | - try: |
295 | - self.status_db = icommon.StatusDatabase(icommon.sqlite_file) |
296 | - self.queue = AllQueue() |
297 | - self.threads = [] |
298 | - # Release caller |
299 | - self.started.set() |
300 | - asked_to_stop = False |
301 | - while not self.should_stop(): |
302 | - self._retire_finished_threads() |
303 | - max_threads = self.get_max_threads() |
304 | - job_id, next = self.get_next() |
305 | - if next is None: |
306 | - logger.debug("No package in queue, sleeping") |
307 | - self.sleep(10) |
308 | - continue |
309 | - if self.should_stop(): |
310 | - continue |
311 | - logger.debug("Starting thread for %s" % next) |
312 | - new_thread = ThreadedImporter(next, job_id) |
313 | - new_thread.start() |
314 | - self.threads.append(new_thread) |
315 | - self._wait_until_threads_reaches(max_threads-1) |
316 | - # We've been asked to stop |
317 | - logger.info("Driver asked to stop") |
318 | - self._wait_until_threads_reaches(0) |
319 | - except Exception, e: |
320 | - logger.critical("Driver hit %s" % str(e)) |
321 | - finally: |
322 | - logger.info("Driver stopping") |
323 | - self.stopped.set() |
324 | + def __init__(self, max_threads): |
325 | + super(ImportDriver, self).__init__(None, max_threads) |
326 | + |
327 | + def before_start(self): |
328 | + self.queue = AllQueue() |
329 | + |
330 | + def start_new_thread(self): |
331 | + t = super(ImportDriver, self).start_new_thread() |
332 | + if t is not None: |
333 | + logger.debug("Starting thread for %s" % (t.package_name,)) |
334 | + |
335 | + def collect_terminated_threads(self): |
336 | + before = len(self.threads) |
337 | + super(ImportDriver, self).collect_terminated_threads() |
338 | + after = len(self.threads) |
339 | + if before != after: |
340 | + # Only mention the running threads when we add or remove some |
341 | + logger.info("threads for %r still active" |
342 | + % [t.package_name for t in self.threads]) |
343 | + |
344 | + |
345 | +class Stop(Exception): |
346 | + pass |
347 | |
348 | |
349 | class ImportController(object): |
350 | @@ -335,38 +229,8 @@ |
351 | |
352 | def __init__(self): |
353 | self.status_db = icommon.StatusDatabase(icommon.sqlite_file) |
354 | - self.stop_requested = False |
355 | - self.driver = ImportDriver() |
356 | - self.stopped = threading.Event() |
357 | - self.stopped.set() |
358 | - |
359 | - def request_stop(self): |
360 | - self.stop_requested = True |
361 | - |
362 | - def should_graceful_stop(self): |
363 | - if os.path.exists(icommon.stop_file): |
364 | - return True |
365 | - return self.should_stop() |
366 | - |
367 | - def should_stop(self): |
368 | - return self.stop_requested |
369 | - |
370 | - def check_stop(self): |
371 | - if self.should_stop(): |
372 | - self.driver.stop_now.set() |
373 | - raise Stop |
374 | - if self.should_graceful_stop(): |
375 | - self.driver.stop_requested.set() |
376 | - raise Stop |
377 | - |
378 | - def sleep(self, max): |
379 | - for i in range(max): |
380 | - self.check_stop() |
381 | - time.sleep(1) |
382 | - self.check_stop() |
383 | - |
384 | - def add_jobs_for_interrupted(self): |
385 | - self.status_db.add_jobs_for_interrupted() |
386 | + # Start with a 8-way Driver |
387 | + self.driver = ImportDriver(8) |
388 | |
389 | def run(self): |
390 | lock = icommon.lock_main() |
391 | @@ -374,25 +238,36 @@ |
392 | logger.info("Another main process is running") |
393 | raise Stop |
394 | try: |
395 | - self.add_jobs_for_interrupted() |
396 | - self.stopped.clear() |
397 | - try: |
398 | - self.driver.start() |
399 | - self.driver.started.wait() |
400 | - while True: |
401 | - if self.driver.stopped.isSet(): |
402 | - self.stopped.set() |
403 | - break |
404 | - self.sleep(10) |
405 | - finally: |
406 | - self.driver.stop_requested.set() |
407 | - logger.info("Waiting for driver to finish") |
408 | - self.driver.stopped.wait() |
409 | - self.stopped.set() |
410 | - logger.info("Finished") |
411 | + # First, re-queue previously interrupted jobs. |
412 | + self.status_db.add_jobs_for_interrupted() |
413 | + self.driver.start() |
414 | + self.driver.started.wait() |
415 | + while True: |
416 | + # First, check the driver |
417 | + self.driver.stopped.wait(0.1) |
418 | + if self.driver.stopped.isSet(): |
419 | + break |
420 | + # Then check the graceful stop file |
421 | + if os.path.exists(icommon.stop_file): |
422 | + self.driver.queue_closed.set() |
423 | + continue |
424 | + # Catch driver exception if any |
425 | + self.report_driver_exception() |
426 | + self.driver.join() |
427 | + logger.info("Finished") |
428 | finally: |
429 | lock.close() |
430 | |
431 | + def report_driver_exception(self): |
432 | + if self.driver.exception is not None: |
433 | + import traceback |
434 | + exc_class, exc_value, exc_tb = self.driver.exception |
435 | + logger.info('Driver failed with exception:\n%s' |
436 | + % ''.join(traceback.format_exception( |
437 | + exc_class, exc_value, exc_tb))) |
438 | + # No need to ever re-raise this exception, we just reported it |
439 | + self.driver.exception = None |
440 | + |
441 | |
442 | controller = ImportController() |
443 | |
444 | @@ -401,9 +276,11 @@ |
445 | logger.info("Received signal") |
446 | signal.signal(signum, signal.SIG_DFL) |
447 | |
448 | - controller.driver.stop_now.set() |
449 | + controller.driver.stop.set() |
450 | logger.info("Waiting for driver to finish") |
451 | controller.driver.stopped.wait() |
452 | + controller.report_driver_exception() |
453 | + controller.driver.join() |
454 | logger.info("Driver finished: stopping") |
455 | sys.exit(1) |
456 | |
457 | |
458 | === modified file 'tests.py' |
459 | --- tests.py 2011-02-22 16:11:07 +0000 |
460 | +++ tests.py 2011-02-22 16:11:07 +0000 |
461 | @@ -378,11 +378,11 @@ |
462 | pass |
463 | |
464 | |
465 | -class TestSubprocessThread(tests.TestCase): |
466 | +class TestSubprocessMonitor(tests.TestCase): |
467 | |
468 | def run_in_subprocess(self, cmd, kls=None): |
469 | if kls is None: |
470 | - kls = icommon.SubprocessThread |
471 | + kls = icommon.SubprocessMonitor |
472 | sub = kls(cmd) |
473 | self.addCleanup(sub.join, 0) |
474 | sub.start() |
475 | @@ -420,7 +420,7 @@ |
476 | sub = self.run_in_subprocess('yes') |
477 | sub.started.wait() |
478 | # For oscure reasons, using SIGABRT instead of SIGKILL and stopping |
479 | - # below left the 'yes' process alive and the SubprocessThread happily |
480 | + # below left the 'yes' process alive and the SubprocessMonitor happily |
481 | # consuming memroy (8GB mark reached while sitting under pdb, which is |
482 | # good enough to consider bug #589532 addressed. |
483 | # import pdb; pdb.set_trace() |
484 | @@ -430,16 +430,16 @@ |
485 | |
486 | def test_kill_catch_zombie(self): |
487 | control = threading.Event() |
488 | - class TestSubprocessThread(icommon.SubprocessThread): |
489 | + class TestSubprocessMonitor(icommon.SubprocessMonitor): |
490 | |
491 | def switch_and_set(self, new): |
492 | - super(TestSubprocessThread, self).switch_and_set(new) |
493 | + super(TestSubprocessMonitor, self).switch_and_set(new) |
494 | if new is self.stopped: |
495 | # The process is running but we haven't called |
496 | # proc.communicate yet. |
497 | control.wait() |
498 | |
499 | - sub = self.run_in_subprocess('yes', TestSubprocessThread) |
500 | + sub = self.run_in_subprocess('yes', TestSubprocessMonitor) |
501 | self.addCleanup(sub.join, 0) |
502 | sub.started.wait() |
503 | # Kill the subprocess ourselves |
504 | @@ -454,7 +454,7 @@ |
505 | self.assertEquals(-signal.SIGTERM, sub.retcode) |
506 | |
507 | |
508 | -class Sleeper(icommon.SubprocessThread): |
509 | +class Sleeper(icommon.SubprocessMonitor): |
510 | |
511 | def __init__(self): |
512 | # sleep can't be killed (or interrupted) so we don't sleep for long but |
513 | @@ -544,7 +544,7 @@ |
514 | self.started = threading.Event() |
515 | self.resumed = threading.Event() |
516 | self.terminated = threading.Event() |
517 | - class TestThread(icommon.SubprocessThread): |
518 | + class TestThread(icommon.SubprocessMonitor): |
519 | |
520 | def collect(thread): |
521 | self.assertIs(self.thread, thread) |
522 | @@ -584,7 +584,7 @@ |
523 | self.assertLength(0, driver.threads) |
524 | |
525 | def test_thread_exception_raised_in_driver(self): |
526 | - class TestThread(icommon.SubprocessThread): |
527 | + class TestThread(icommon.SubprocessMonitor): |
528 | |
529 | def collect(thread): |
530 | # false returns 1 |
There *was* a bug: #714420. So it's fixed now :)