Merge lp:~vila/udd/use-new-driver into lp:udd

Proposed by Vincent Ladeuil
Status: Merged
Approved by: Jelmer Vernooij
Approved revision: 427
Merged at revision: 405
Proposed branch: lp:~vila/udd/use-new-driver
Merge into: lp:udd
Prerequisite: lp:~vila/udd/robust-driver
Diff against target: 530 lines (+138/-261)
3 files modified
icommon.py (+3/-3)
mass_import.py (+126/-249)
tests.py (+9/-9)
To merge this branch: bzr merge lp:~vila/udd/use-new-driver
Reviewer Review Type Date Requested Status
James Westby Approve
Review via email: mp+50644@code.launchpad.net

Description of the change

Built on top of lp:~vila/udd/robust-driver, this simplifies the mass_import.py script.

ThreadedImporter has been renamed Importer for clarity; it now just redefines a couple of methods, mainly for logger calls and db access.

AllQueue has been adapted to plug into the new design. It will need more work, but this step just replaces the existing implementation.

ImportDriver also redefines a couple of methods for the same reasons (logger and db).

ImportController uses the new ImportDriver features (well, ThreadDriver really) to simplify the control flow.

One feature has been lost here: changing the number of threads dynamically. I only realized that after doing a bunch of tests on package-import.local, so I'll add it back in a later submission (or during review).

Note that this refactoring addresses the 'still active' spamming issue (but there wasn't a bug number for that, right?).

bug #589523 (adding a timeout for imports) will be far easier to fix now.
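A possible shape of that timeout fix, as a hedged sketch: the real change would live in SubprocessMonitor.collect(), and `communicate(timeout=...)` only exists in Python 3's subprocess module, not the Python 2 this branch targets. The function name and the "None means timed out" convention here are purely illustrative.

```python
import subprocess

def run_with_timeout(cmd, timeout):
    """Run cmd in a shell, killing it if it exceeds `timeout` seconds.

    Hypothetical sketch for bug #589523; returns (retcode, output),
    with retcode None signalling that the import timed out.
    """
    proc = subprocess.Popen(cmd, shell=True,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT)
    try:
        out, _ = proc.communicate(timeout=timeout)
        return proc.returncode, out
    except subprocess.TimeoutExpired:
        proc.kill()                   # SIGKILL; a SIGTERM-first policy
        out, _ = proc.communicate()   # would be gentler; this also
        return None, out              # reaps the child process

rc, out = run_with_timeout('echo done', 5)
print(rc)  # 0
```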

lp:~vila/udd/use-new-driver updated
425. By Vincent Ladeuil

Renamed SubprocessThread to SubprocessMonitor (and fixes lp:714420)

Revision history for this message
Vincent Ladeuil (vila) wrote :

There *was* a bug: #714420. So it's fixed now :)

lp:~vila/udd/use-new-driver updated
426. By Vincent Ladeuil

Merge robust-driver into use-new-driver

427. By Vincent Ladeuil

This also fixes bug #717204

Revision history for this message
James Westby (james-w) wrote :

157 + except Queue.Empty:
158 + self.unfinished_tasks += 1
159 + job_id, package_name = self.next_job()

186 + except StopIteration:
187 + raise Queue.Empty

Won't this mean that Queue.Empty would bubble to the top and stop the importer in this case?
Is that intentional?

This looks like a good simplification, thanks.

James

review: Needs Information
Revision history for this message
Vincent Ladeuil (vila) wrote :

> 157 + except Queue.Empty:
> 158 + self.unfinished_tasks += 1
> 159 + job_id, package_name = self.next_job()
>
> 186 + except StopIteration:
> 187 + raise Queue.Empty
>
> Won't this mean that Queue.Empty would

be caught by start_new_thread, which handles it as meaning: there is no job for me yet, return None.

> bubble to the top and stop the importer

So no.

> in this case?
> Is that intentional?

Yup, it's a nice property of Queue that you can chain them this way almost transparently: first the jobs, then the packages. (I intentionally left the re-queuing of interrupted jobs in the controller, since that occurs only once.)
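The chained-queue property discussed here can be sketched as follows. This is a Python 3 illustration of the pattern in the diff's AllQueue (the branch itself uses Python 2's `Queue` module); the plain iterator stands in for the status/package databases.

```python
import queue

class FallbackQueue(queue.Queue):
    """A queue that falls back to a secondary job source when empty.

    Consumers always call get_nowait(); when the in-memory queue is
    empty, the next job is pulled from a fallback source, and that
    source's exhaustion (StopIteration) is translated back into
    queue.Empty so the consumer sees a single, uniform interface.
    """

    def __init__(self, fallback_jobs):
        queue.Queue.__init__(self)
        self._fallback = iter(fallback_jobs)

    def get_nowait(self):
        try:
            return queue.Queue.get_nowait(self)
        except queue.Empty:
            try:
                job = next(self._fallback)
            except StopIteration:
                raise queue.Empty
            # Mirror Queue bookkeeping so join() still works.
            self.unfinished_tasks += 1
            return job

q = FallbackQueue(['pkg-a', 'pkg-b'])
q.put('urgent-pkg')
print(q.get_nowait())  # urgent-pkg (queued jobs come first)
print(q.get_nowait())  # pkg-a (then the fallback source)
```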

If needed, we could add some priority handling in the queue itself, but I think a better plan is to maintain a debian and a lp canary that will just suspend the execution of imports when one of the sites is down (and kill and requeue the ones in progress at the top of the queue).

> This looks like a good simplification, thanks.

Very much appreciated :-)

Revision history for this message
James Westby (james-w) wrote :

> > 157 + except Queue.Empty:
> > 158 + self.unfinished_tasks += 1
> > 159 + job_id, package_name = self.next_job()
> >
> > 186 + except StopIteration:
> > 187 + raise Queue.Empty
> >
> > Won't this mean that Queue.Empty would
>
> be caught by start_new_thread which handles it as meaning: there is no job for
> me yet, return None.

Ah missed that.

> If needed, we could add some priority handling in the queue itself, but I
> think a better plan is to maintain a debian and a lp canary that will just
> suspend the execution of imports when one of the sites is down (and kill and
> requeue the ones in progress at the top of the queue)

I think the canaries are a good idea. I still think that the priority handling
in the old code is useful. It's useful to be able to requeue a package when it is
blocking someone and have it done as soon as a slot is free. I've never felt the
need for anything more fine-grained than the three levels we have now, though
(ASAP, soon, and idle).

Thanks,

James

review: Approve
Revision history for this message
Vincent Ladeuil (vila) wrote :

>
> > If needed, we could add some priority handling in the queue itself, but I
> > think a better plan is to maintain a debian and a lp canary that will just
> > suspend the execution of imports when one of the sites is down (and kill and
> > requeue the ones in progress at the top of the queue)
>
> I think the canaries are a good idea. I still think that the priority handling
> in the old code is useful.

The one in the db? Oh my, I wouldn't touch that, it's perfect there.

> It's useful to be able to requeue a package as it is
> blocking someone and have it done as soon as a slot is free. I've never felt
> the need for more fine-grained than the three levels we have now though
> (ASAP, soon, and idle).

Yup. Good point about re-queuing on demand, but that's already covered by requeue-package --priority, no? ISTM the way you wrote next_job() should give us that, since it queries the db for each job. I'm not familiar enough with SQL (nor with the requeue_package implementation), but if the query takes the priority into account, my implementation will respect it.

When I mentioned multiple queues/priorities, I was thinking about the losa-proxied queries we may need in the future (including, of course, requeue-package or any other python script), which is the subject of my next submission.

Revision history for this message
James Westby (james-w) wrote :

On Wed, 23 Feb 2011 21:27:58 -0000, Vincent Ladeuil <email address hidden> wrote:
> The one in the db ? Oh my, I wouldn't touch that, it's perfect there.

Exactly.

> Yup. Good point about re-queuing on-demand, but that's already covered
> by requeue-package --priority no ?

Indeed.

> ISTM the way you wrote next_job()
> should give us that since it queries the db for each job... I'm not
> familiar enough with SQL (nor the requeue_package implementation) but
> if the query takes the priority into account my implementation will
> respect it.

That's perfect then. IIRC the SQL does an ORDER BY prio DESC or similar.
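A minimal sketch of such a priority-ordered next_job() query. The table and column names here are hypothetical; the real schema lives in icommon.StatusDatabase and may differ. The three rows stand in for the ASAP/soon/idle levels mentioned above.

```python
import sqlite3

# Toy, in-memory stand-in for the status db's jobs table.
conn = sqlite3.connect(':memory:')
conn.execute(
    'CREATE TABLE jobs (id INTEGER PRIMARY KEY, package TEXT, priority INTEGER)')
conn.executemany(
    'INSERT INTO jobs (package, priority) VALUES (?, ?)',
    [('idle-pkg', 0), ('urgent-pkg', 2), ('soon-pkg', 1)])

# next_job()-style query: highest priority first, so a package requeued
# with requeue-package --priority jumps ahead of the idle backlog.
row = conn.execute(
    'SELECT id, package FROM jobs '
    'ORDER BY priority DESC, id ASC LIMIT 1').fetchone()
print(row[1])  # urgent-pkg
```

Because the driver re-runs this query for every job it hands out, a priority bump takes effect as soon as the next slot frees up, without any in-memory queue reordering.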

> When I was mentioning multiple queues/priority I was thinking about
> the losa proxied queries we may need in the future (including of
> course requeue-package or any other python script) which is the
> subject of my next submission.

Ok, let's discuss it then.

I was merely trying to highlight the utility of the priority scheme that
is in place now, rather than arguing against any specific change.

Thanks,

James

Revision history for this message
Vincent Ladeuil (vila) wrote :

> Ok, let's discuss it then.

Cool, we are in agreement then, see you on my next proposal ;)

> I was merely trying to highlight the utility of the priority scheme that
> is in place now,

Sorry, I should have been clearer about that, but I don't yet fully understand all the details (which is also why I tried not to change these parts :)

Preview Diff

1=== modified file 'icommon.py'
2--- icommon.py 2011-02-22 16:11:07 +0000
3+++ icommon.py 2011-02-22 16:11:07 +0000
4@@ -1938,13 +1938,13 @@
5 """
6
7
8-class SubprocessThread(CatchingExceptionThread):
9+class SubprocessMonitor(CatchingExceptionThread):
10
11 def __init__(self, cmd):
12 self.started = threading.Event()
13 self.stopped = threading.Event()
14- super(SubprocessThread, self).__init__(target=self.spawn,
15- sync_event=self.started)
16+ super(SubprocessMonitor, self).__init__(target=self.spawn,
17+ sync_event=self.started)
18 self.cmd = cmd
19 self.proc_pid = None
20 self.retcode = None
21
22=== modified file 'mass_import.py'
23--- mass_import.py 2011-02-20 19:32:59 +0000
24+++ mass_import.py 2011-02-22 16:11:07 +0000
25@@ -3,6 +3,7 @@
26 import codecs
27 import logging
28 import os
29+import Queue
30 import random
31 import signal
32 from stat import ST_DEV, ST_INO
33@@ -111,223 +112,116 @@
34 d_archive = debian.main_archive
35 u_archive = ubuntu.main_archive
36
37-lp_lock = threading.Lock()
38-
39-
40-def subprocess_setup():
41- signal.signal(signal.SIGPIPE, signal.SIG_DFL)
42-
43-
44-def pool_base(name):
45- if name.startswith("lib"):
46- return name[:4]
47- return name[0]
48-
49-
50-class ThreadedImporter(threading.Thread):
51-
52- import_cmd = 'import_package.py %s'
53-
54- def __init__(self, package, job_id):
55- super(ThreadedImporter, self).__init__()
56- self.package = package
57+
58+class Importer(icommon.SubprocessMonitor):
59+
60+ def __init__(self, cmd, package_name, job_id):
61+ super(Importer, self).__init__(cmd % (package_name,))
62+ self.package_name = package_name
63 self.job_id = job_id
64- self.success = None
65- self.output = None
66- self.proc_pid = None
67- self.stopped = threading.Event()
68+ self.status_db = icommon.StatusDatabase(icommon.sqlite_file)
69
70- def run(self):
71- success = False
72- output = icommon.running_sentinel
73- logger.info("Trying %s" % self.package)
74- proc = subprocess.Popen(self.import_cmd % self.package,
75- shell=True, stdout=subprocess.PIPE,
76- stderr=subprocess.STDOUT, stdin=subprocess.PIPE,
77- preexec_fn=subprocess_setup)
78- # Save the pid so the process can be killed from another thread.
79- self.proc_pid = proc.pid
80- output, _ = proc.communicate()
81- if proc.returncode == icommon.no_lock_returncode:
82- logger.info("Couldn't lock %s, skipping" % self.package)
83+ def collect(self, timeout=None):
84+ super(Importer, self).collect(timeout)
85+ if self.retcode == icommon.no_lock_returncode:
86+ logger.info("Couldn't lock %s, skipping" % self.package_name)
87 else:
88- self.success = (proc.returncode == 0)
89- self.output = output
90- self.stopped.set()
91-
92- def kill(self):
93- if self.proc_id is not None:
94- try:
95- os.kill(self.proc_pid, signal.SIGTERM)
96- self.proc_pid = None
97- except OSError, e:
98- if e.errno == errno.ESRCH:
99- # The process doesn't exist anymore.
100- self.proc_pid = None
101- # We ignore all other exceptions but don't reset proc_pid
102- # for them so that we can try to kill it again. We don't
103- # re-raise either to let the caller continue.
104- else:
105- pass
106- except:
107- pass
108-
109-
110-class Stop(Exception):
111- pass
112-
113-
114-class AllQueue(object):
115- """A Queue that always returns a package, even if we are not sure that
116- it is needed"""
117+ unicode_output = self.out.decode("utf-8", "replace")
118+ ascii_output = unicode_output.encode("ascii", "replace")
119+ success = self.retcode == 0
120+ if success:
121+ logger.info("Success %s: %s"
122+ % (self.package_name,
123+ ascii_output.replace("\n", " ")))
124+ else:
125+ logger.warning("Importing %s failed:\n%s" % (self.package_name,
126+ ascii_output))
127+ self.status_db.finish_job(
128+ self.package_name, self.job_id, success,
129+ unicode_output.encode("utf-8", "replace"))
130+ logger.info("thread for %s finished" % self.package_name)
131+
132+
133+class AllQueue(Queue.Queue):
134+ """A Queue that always returns a package.
135+
136+ The jobs in the status db are tried first.
137+
138+ When no more jobs are pending, the package db is used, even if we are not
139+ sure that it is needed.
140+ """
141
142 def __init__(self):
143+ Queue.Queue.__init__(self)
144 self.tried = set()
145- self.packages_db = icommon.PackageDatabase(
146- icommon.sqlite_package_file)
147+ self.packages_db = icommon.PackageDatabase(icommon.sqlite_package_file)
148 self.status_db = icommon.StatusDatabase(icommon.sqlite_file)
149
150+ def get_nowait(self):
151+ # FIXME: It's bogus to implement get_nowait() this way (without
152+ # locking) but we don't really care since we have a single consumer
153+ # and no producer. This should be changed to a different model anyway
154+ # so different kinds of jobs could be queued -- vila 20110221
155+ try:
156+ job_id, package_name = Queue.Queue.get_nowait(self)
157+ except Queue.Empty:
158+ self.unfinished_tasks += 1
159+ job_id, package_name = self.next_job()
160+ return Importer('import_package.py %s',
161+ package_name, job_id)
162+
163 def next_job(self):
164 while True:
165- try:
166- to_try = self.packages_db.get_one(self.tried)
167- except StopIteration:
168- return (None, None)
169- if to_try is not None:
170- self.tried.add(to_try)
171- job_id = self.status_db.start_package(to_try)
172- if job_id is not None:
173- return (job_id, to_try)
174+ # jobs first
175+ job_id, package = self.status_db.next_job()
176+ if package is not None:
177+ return job_id, package
178 else:
179- self.tried = set()
180-
181-
182-class ImportDriver(threading.Thread):
183+ # Now the packages
184+ try:
185+ to_try = self.packages_db.get_one(self.tried)
186+ except StopIteration:
187+ raise Queue.Empty
188+ if to_try is not None:
189+ self.tried.add(to_try)
190+ job_id = self.status_db.start_package(to_try)
191+ if job_id is not None:
192+ return (job_id, to_try)
193+ else:
194+ logger.info("All packages requeued, start again")
195+ self.tried = set()
196+
197+
198+class ImportDriver(icommon.ThreadDriver):
199 """Monitor the ThreadedImporter.
200
201 This includes planning and spawning imports, tracking their success or
202 failures and shutting them down.
203 """
204
205- MAX_THREADS = 6
206-
207- def __init__(self):
208- super(ImportDriver, self).__init__()
209- self.threads = []
210- self.stop_requested = threading.Event()
211- self.stop_now = threading.Event()
212- self.started = threading.Event()
213- self.stopped = threading.Event()
214-
215- def request_stop(self):
216- self.stop_requested.set()
217-
218- def should_stop(self):
219- return self.stop_requested.isSet() or self.must_stop()
220-
221- def must_stop(self):
222- return self.stop_now.isSet()
223-
224- def get_next(self):
225- job_id, package = self.status_db.next_job()
226- if package is None:
227- (job_id, package) = self.queue.next_job()
228- return (job_id, package)
229-
230- def sleep(self, max):
231- for i in range(max):
232- if self.should_stop():
233- return True
234- time.sleep(1)
235- return self.should_stop()
236-
237- def deep_sleep(self, max):
238- for i in range(max):
239- if self.must_stop():
240- return True
241- time.sleep(1)
242- return self.must_stop()
243-
244- def get_max_threads(self):
245- if os.path.exists(icommon.max_threads_file):
246- f = open(icommon.max_threads_file)
247- try:
248- return int(f.read().splitlines()[0].strip())
249- except Exception, e:
250- logger.warning("Error reading max threads file: %s", str(e))
251- return self.MAX_THREADS
252- finally:
253- f.close()
254- else:
255- return self.MAX_THREADS
256-
257- def package_finished(self, package, job_id, success, output):
258- unicode_output = output.decode("utf-8", "replace")
259- ascii_output = unicode_output.encode("ascii", "replace")
260- if success:
261- logger.info("Success %s: %s"
262- % (package, ascii_output.replace("\n", " ")))
263- else:
264- logger.warning("Importing %s failed:\n%s" % (package, ascii_output))
265- self.status_db.finish_job(package, job_id, success,
266- unicode_output.encode("utf-8", "replace"))
267-
268- def _wait_until_threads_reaches(self, target):
269- i = 0
270- while len(self.threads) > target:
271- if i % 6 == 0:
272- logger.info("threads for %s still active"
273- % str([t.package for t in self.threads]))
274- removed = self._retire_finished_threads()
275- if not removed:
276- self.sleep(10)
277- i += 1
278-
279- def _retire_finished_threads(self):
280- removed = False
281- for thread in self.threads[:]:
282- if not thread.isAlive():
283- removed = True
284- logger.info("thread for %s finished" % thread.package)
285- if thread.success is not None:
286- self.package_finished(thread.package, thread.job_id,
287- thread.success, thread.output)
288- self.threads.remove(thread)
289- elif self.must_stop():
290- thread.kill()
291- return removed
292-
293- def run(self):
294- try:
295- self.status_db = icommon.StatusDatabase(icommon.sqlite_file)
296- self.queue = AllQueue()
297- self.threads = []
298- # Release caller
299- self.started.set()
300- asked_to_stop = False
301- while not self.should_stop():
302- self._retire_finished_threads()
303- max_threads = self.get_max_threads()
304- job_id, next = self.get_next()
305- if next is None:
306- logger.debug("No package in queue, sleeping")
307- self.sleep(10)
308- continue
309- if self.should_stop():
310- continue
311- logger.debug("Starting thread for %s" % next)
312- new_thread = ThreadedImporter(next, job_id)
313- new_thread.start()
314- self.threads.append(new_thread)
315- self._wait_until_threads_reaches(max_threads-1)
316- # We've been asked to stop
317- logger.info("Driver asked to stop")
318- self._wait_until_threads_reaches(0)
319- except Exception, e:
320- logger.critical("Driver hit %s" % str(e))
321- finally:
322- logger.info("Driver stopping")
323- self.stopped.set()
324+ def __init__(self, max_threads):
325+ super(ImportDriver, self).__init__(None, max_threads)
326+
327+ def before_start(self):
328+ self.queue = AllQueue()
329+
330+ def start_new_thread(self):
331+ t = super(ImportDriver, self).start_new_thread()
332+ if t is not None:
333+ logger.debug("Starting thread for %s" % (t.package_name,))
334+
335+ def collect_terminated_threads(self):
336+ before = len(self.threads)
337+ super(ImportDriver, self).collect_terminated_threads()
338+ after = len(self.threads)
339+ if before != after:
340+ # Only mention the running threads when we add or remove some
341+ logger.info("threads for %r still active"
342+ % [t.package_name for t in self.threads])
343+
344+
345+class Stop(Exception):
346+ pass
347
348
349 class ImportController(object):
350@@ -335,38 +229,8 @@
351
352 def __init__(self):
353 self.status_db = icommon.StatusDatabase(icommon.sqlite_file)
354- self.stop_requested = False
355- self.driver = ImportDriver()
356- self.stopped = threading.Event()
357- self.stopped.set()
358-
359- def request_stop(self):
360- self.stop_requested = True
361-
362- def should_graceful_stop(self):
363- if os.path.exists(icommon.stop_file):
364- return True
365- return self.should_stop()
366-
367- def should_stop(self):
368- return self.stop_requested
369-
370- def check_stop(self):
371- if self.should_stop():
372- self.driver.stop_now.set()
373- raise Stop
374- if self.should_graceful_stop():
375- self.driver.stop_requested.set()
376- raise Stop
377-
378- def sleep(self, max):
379- for i in range(max):
380- self.check_stop()
381- time.sleep(1)
382- self.check_stop()
383-
384- def add_jobs_for_interrupted(self):
385- self.status_db.add_jobs_for_interrupted()
386+ # Start with a 8-way Driver
387+ self.driver = ImportDriver(8)
388
389 def run(self):
390 lock = icommon.lock_main()
391@@ -374,25 +238,36 @@
392 logger.info("Another main process is running")
393 raise Stop
394 try:
395- self.add_jobs_for_interrupted()
396- self.stopped.clear()
397- try:
398- self.driver.start()
399- self.driver.started.wait()
400- while True:
401- if self.driver.stopped.isSet():
402- self.stopped.set()
403- break
404- self.sleep(10)
405- finally:
406- self.driver.stop_requested.set()
407- logger.info("Waiting for driver to finish")
408- self.driver.stopped.wait()
409- self.stopped.set()
410- logger.info("Finished")
411+ # First, re-queue previously interrupted jobs.
412+ self.status_db.add_jobs_for_interrupted()
413+ self.driver.start()
414+ self.driver.started.wait()
415+ while True:
416+ # First, check the driver
417+ self.driver.stopped.wait(0.1)
418+ if self.driver.stopped.isSet():
419+ break
420+ # Then check the graceful stop file
421+ if os.path.exists(icommon.stop_file):
422+ self.driver.queue_closed.set()
423+ continue
424+ # Catch driver exception if any
425+ self.report_driver_exception()
426+ self.driver.join()
427+ logger.info("Finished")
428 finally:
429 lock.close()
430
431+ def report_driver_exception(self):
432+ if self.driver.exception is not None:
433+ import traceback
434+ exc_class, exc_value, exc_tb = self.driver.exception
435+ logger.info('Driver failed with exception:\n%s'
436+ % ''.join(traceback.format_exception(
437+ exc_class, exc_value, exc_tb)))
438+ # No need to ever re-raise this exception, we just reported it
439+ self.driver.exception = None
440+
441
442 controller = ImportController()
443
444@@ -401,9 +276,11 @@
445 logger.info("Received signal")
446 signal.signal(signum, signal.SIG_DFL)
447
448- controller.driver.stop_now.set()
449+ controller.driver.stop.set()
450 logger.info("Waiting for driver to finish")
451 controller.driver.stopped.wait()
452+ controller.report_driver_exception()
453+ controller.driver.join()
454 logger.info("Driver finished: stopping")
455 sys.exit(1)
456
457
458=== modified file 'tests.py'
459--- tests.py 2011-02-22 16:11:07 +0000
460+++ tests.py 2011-02-22 16:11:07 +0000
461@@ -378,11 +378,11 @@
462 pass
463
464
465-class TestSubprocessThread(tests.TestCase):
466+class TestSubprocessMonitor(tests.TestCase):
467
468 def run_in_subprocess(self, cmd, kls=None):
469 if kls is None:
470- kls = icommon.SubprocessThread
471+ kls = icommon.SubprocessMonitor
472 sub = kls(cmd)
473 self.addCleanup(sub.join, 0)
474 sub.start()
475@@ -420,7 +420,7 @@
476 sub = self.run_in_subprocess('yes')
477 sub.started.wait()
478 # For oscure reasons, using SIGABRT instead of SIGKILL and stopping
479- # below left the 'yes' process alive and the SubprocessThread happily
480+ # below left the 'yes' process alive and the SubprocessMonitor happily
481 # consuming memroy (8GB mark reached while sitting under pdb, which is
482 # good enough to consider bug #589532 addressed.
483 # import pdb; pdb.set_trace()
484@@ -430,16 +430,16 @@
485
486 def test_kill_catch_zombie(self):
487 control = threading.Event()
488- class TestSubprocessThread(icommon.SubprocessThread):
489+ class TestSubprocessMonitor(icommon.SubprocessMonitor):
490
491 def switch_and_set(self, new):
492- super(TestSubprocessThread, self).switch_and_set(new)
493+ super(TestSubprocessMonitor, self).switch_and_set(new)
494 if new is self.stopped:
495 # The process is running but we haven't called
496 # proc.communicate yet.
497 control.wait()
498
499- sub = self.run_in_subprocess('yes', TestSubprocessThread)
500+ sub = self.run_in_subprocess('yes', TestSubprocessMonitor)
501 self.addCleanup(sub.join, 0)
502 sub.started.wait()
503 # Kill the subprocess ourselves
504@@ -454,7 +454,7 @@
505 self.assertEquals(-signal.SIGTERM, sub.retcode)
506
507
508-class Sleeper(icommon.SubprocessThread):
509+class Sleeper(icommon.SubprocessMonitor):
510
511 def __init__(self):
512 # sleep can't be killed (or interrupted) so we don't sleep for long but
513@@ -544,7 +544,7 @@
514 self.started = threading.Event()
515 self.resumed = threading.Event()
516 self.terminated = threading.Event()
517- class TestThread(icommon.SubprocessThread):
518+ class TestThread(icommon.SubprocessMonitor):
519
520 def collect(thread):
521 self.assertIs(self.thread, thread)
522@@ -584,7 +584,7 @@
523 self.assertLength(0, driver.threads)
524
525 def test_thread_exception_raised_in_driver(self):
526- class TestThread(icommon.SubprocessThread):
527+ class TestThread(icommon.SubprocessMonitor):
528
529 def collect(thread):
530 # false returns 1
