Merge lp:~vila/udd/no-shell-true into lp:udd

Proposed by Vincent Ladeuil
Status: Merged
Approved by: Jelmer Vernooij
Approved revision: 439
Merged at revision: 405
Proposed branch: lp:~vila/udd/no-shell-true
Merge into: lp:udd
Prerequisite: lp:~vila/udd/589523-import-timeout
Diff against target: 380 lines (+128/-55)
3 files modified
icommon.py (+10/-18)
mass_import.py (+18/-7)
tests.py (+100/-30)
To merge this branch: bzr merge lp:~vila/udd/no-shell-true
Reviewer Review Type Date Requested Status
James Westby Approve
Review via email: mp+50921@code.launchpad.net

Description of the change

As mentioned in previous submissions, using shell=True with subprocesses led to complications for handling kills.

In preparation for a next submission that will allow more than import_package.py to be invoked, I also want to get rid of shell=True for security concerns.

I also added a test to ensure that we can still capture the output/error of a python subprocess if it catch interrupting signals and handled them.

Note that we can't reliably guarantee what return code we will get when a process is killed and I've stopped cheating in this regard ;)

To post a comment you must log in.
Revision history for this message
James Westby (james-w) wrote :

94 + cmd = 'import time; while True: time.sleep(0.001)'

Should this have a timeout?

Other than that this looks good.

Thanks,

James

review: Approve
Revision history for this message
John A Meinel (jameinel) wrote :

-----BEGIN PGP SIGNED MESSAGE-----
Hash: SHA1

On 2/23/2011 10:01 AM, James Westby wrote:
> Review: Approve
> 94 + cmd = 'import time; while True: time.sleep(0.001)'
>
> Should this have a timeout?
>
> Other than that this looks good.
>
> Thanks,
>
> James
>

It intentionally doesn't, because it is testing the kill code. But you
have a point, setting it to:

for i in xrange(10000): time.sleep(0.001)

would give us 10s of a process running, and not leave something forever
if the test fails for some reason.

John
=:->

-----BEGIN PGP SIGNATURE-----
Version: GnuPG v1.4.9 (Cygwin)
Comment: Using GnuPG with Mozilla - http://enigmail.mozdev.org/

iEYEARECAAYFAk1lPsMACgkQJdeBCYSNAAN9ewCfZ5GCUl4wYCdZuOvXbHb147+7
BckAoLeQrQTvbLHmtIPzFF/z4LQNo2kW
=a/qV
-----END PGP SIGNATURE-----

Revision history for this message
Vincent Ladeuil (vila) wrote :

> -----BEGIN PGP SIGNED MESSAGE-----
> Hash: SHA1
>
> On 2/23/2011 10:01 AM, James Westby wrote:
> > Review: Approve
> > 94 + cmd = 'import time; while True: time.sleep(0.001)'
> >
> > Should this have a timeout?
> It intentionally doesn't, because it is testing the kill code.

Yup.

> But you have a point, setting it to:

Yup.

There are addCleanup(t.kill) that *should* kill them but...

lp:~vila/udd/no-shell-true updated
435. By Vincent Ladeuil

Merge 589523-import-timeout into no-shell-true

436. By Vincent Ladeuil

Merge 589523-import-timeout into no-shell-true

437. By Vincent Ladeuil

Merge 589523-import-timeout into no-shell-true

438. By Vincent Ladeuil

Make sure the sleepers die in the end (still peaceful, they sleep ;)

439. By Vincent Ladeuil

Oops, we don't rely on the shell to find executables now and we rely on the traceback being available to categorize failures.

440. By Vincent Ladeuil

Tests requiring commits are a pita when it comes to typos.

441. By Vincent Ladeuil

Fix bad parsing of max_threads files, die untested obvious fixes, die !

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'icommon.py'
--- icommon.py 2011-02-24 13:53:11 +0000
+++ icommon.py 2011-02-24 13:53:11 +0000
@@ -1940,10 +1940,10 @@
19401940
1941class SubprocessMonitor(CatchingExceptionThread):1941class SubprocessMonitor(CatchingExceptionThread):
19421942
1943 def __init__(self, cmd, max_duration=None):1943 def __init__(self, args, max_duration=None):
1944 """Create a SubprocessMonitor.1944 """Create a SubprocessMonitor.
19451945
1946 :param cmd: The command string to be run in the subprocess.1946 :param args: A list of arguments to give to python.
19471947
1948 :param max_duration: The max number of seconds the command is expected1948 :param max_duration: The max number of seconds the command is expected
1949 to be run (elapsed time).1949 to be run (elapsed time).
@@ -1952,26 +1952,20 @@
1952 self.stopped = threading.Event()1952 self.stopped = threading.Event()
1953 super(SubprocessMonitor, self).__init__(target=self.spawn,1953 super(SubprocessMonitor, self).__init__(target=self.spawn,
1954 sync_event=self.started)1954 sync_event=self.started)
1955 self.cmd = cmd1955 self.args = args
1956 self.proc_pid = None1956 self.proc_pid = None
1957 self.retcode = None1957 self.retcode = None
1958 self.out = self.err = None
1958 self.started_at = self.ended_at = None1959 self.started_at = self.ended_at = None
1959 self.max_duration = max_duration1960 self.max_duration = max_duration
19601961
1961 def spawn(self):1962 def spawn(self):
1962 """Spawn the command in a subprocess."""1963 """Spawn the python command in a subprocess."""
1963 def child_setup():1964 proc = subprocess.Popen(['python'] + self.args,
1964 # python installs a SIGPIPE handler by default and signal1965 executable=sys.executable,
1965 # dispositions set to SIG_IGN (used in that case) are preserved
1966 # across fork and exceve. We don't want to propagate this to
1967 # non-python subprocesses.
1968 signal.signal(signal.SIGPIPE, signal.SIG_DFL)
1969 proc = subprocess.Popen(self.cmd,
1970 shell=True,
1971 stdout=subprocess.PIPE,1966 stdout=subprocess.PIPE,
1972 stderr=subprocess.PIPE,1967 stderr=subprocess.PIPE,
1973 stdin=subprocess.PIPE,1968 stdin=subprocess.PIPE)
1974 preexec_fn=child_setup)
1975 self.proc_pid = proc.pid1969 self.proc_pid = proc.pid
1976 self.started_at = time.time()1970 self.started_at = time.time()
1977 self.switch_and_set(self.stopped)1971 self.switch_and_set(self.stopped)
@@ -1981,9 +1975,8 @@
1981 self.retcode = proc.returncode1975 self.retcode = proc.returncode
1982 except OSError, e:1976 except OSError, e:
1983 if e.errno in (errno.ECHILD, errno.ESRCH):1977 if e.errno in (errno.ECHILD, errno.ESRCH):
1984 # The process doesn't exist anymore, pretend is has been1978 # The process doesn't exist anymore
1985 # terminated1979 pass
1986 self.retcode = -signal.SIGTERM
1987 else:1980 else:
1988 raise1981 raise
1989 self.stopped.set()1982 self.stopped.set()
@@ -2011,7 +2004,6 @@
2011 if signum is None:2004 if signum is None:
2012 signum = signal.SIGTERM2005 signum = signal.SIGTERM
2013 os.kill(self.proc_pid, signum)2006 os.kill(self.proc_pid, signum)
2014 self.retcode = -signum
2015 killed = True2007 killed = True
2016 except OSError, e:2008 except OSError, e:
2017 if e.errno in (errno.ECHILD, errno.ESRCH):2009 if e.errno in (errno.ECHILD, errno.ESRCH):
20182010
=== modified file 'mass_import.py'
--- mass_import.py 2011-02-24 13:53:11 +0000
+++ mass_import.py 2011-02-24 13:53:11 +0000
@@ -103,6 +103,9 @@
103logger.addHandler(progress_handler)103logger.addHandler(progress_handler)
104104
105logger.info("Starting up")105logger.info("Starting up")
106# Some context to ease debug
107logger.info("PATH: %s" % os.environ.get('PATH', None))
108logger.info("CWD: %s" % os.getcwd())
106109
107110
108logger.debug("Getting Launchpad object")111logger.debug("Getting Launchpad object")
@@ -115,13 +118,13 @@
115118
116class Importer(icommon.SubprocessMonitor):119class Importer(icommon.SubprocessMonitor):
117120
118 def __init__(self, cmd, package_name, job_id):121 def __init__(self, args, package_name, job_id):
119 # FIXME: We should put the max_duration in a config file when it122 # FIXME: We should put the max_duration in a config file when it
120 # becomes available. Until then, during the wheezy import the longest123 # becomes available. Until then, during the wheezy import the longest
121 # import took more than 2 hours, so 24 is generous enough. On the other124 # import took more than 2 hours, so 24 is generous enough. On the other
122 # hand, we had evidence of imports hanging for weeks (at least a month125 # hand, we had evidence of imports hanging for weeks (at least a month
123 # even in one case, so 24 hours will get rid of them)126 # even in one case, so 24 hours will get rid of them)
124 super(Importer, self).__init__(cmd % (package_name,),127 super(Importer, self).__init__(args,
125 # No more than a day128 # No more than a day
126 max_duration=24*60*60)129 max_duration=24*60*60)
127 self.package_name = package_name130 self.package_name = package_name
@@ -133,7 +136,9 @@
133 if self.retcode == icommon.no_lock_returncode:136 if self.retcode == icommon.no_lock_returncode:
134 logger.info("Couldn't lock %s, skipping" % self.package_name)137 logger.info("Couldn't lock %s, skipping" % self.package_name)
135 else:138 else:
136 unicode_output = self.out.decode("utf-8", "replace")139 # We mostly care about self.err which contains the traceback here
140 output = self.out + self.err
141 unicode_output = output.decode("utf-8", "replace")
137 ascii_output = unicode_output.encode("ascii", "replace")142 ascii_output = unicode_output.encode("ascii", "replace")
138 success = self.retcode == 0143 success = self.retcode == 0
139 if success:144 if success:
@@ -174,7 +179,8 @@
174 except Queue.Empty:179 except Queue.Empty:
175 self.unfinished_tasks += 1180 self.unfinished_tasks += 1
176 job_id, package_name = self.next_job()181 job_id, package_name = self.next_job()
177 return Importer('import_package.py %s',182 return Importer(['scripts/import_package.py',
183 package_name],
178 package_name, job_id)184 package_name, job_id)
179185
180 def next_job(self):186 def next_job(self):
@@ -268,10 +274,13 @@
268 # Then check the graceful stop file274 # Then check the graceful stop file
269 if os.path.exists(icommon.stop_file):275 if os.path.exists(icommon.stop_file):
270 self.driver.queue_closed.set()276 self.driver.queue_closed.set()
277 logger.info('Driver would not process new requests')
271 continue278 continue
272 # And the number of threads279 # And the number of threads
273 nb_threads = self.get_max_threads()280 nb_threads = self.get_max_threads()
274 if self.driver.max_threads != nb_threads:281 if self.driver.max_threads != nb_threads:
282 logger.info('Read %d in %s'
283 % (nb_threads, icommon.max_threads_file))
275 self.driver.max_threads = nb_threads284 self.driver.max_threads = nb_threads
276 # Catch driver exception if any285 # Catch driver exception if any
277 self.report_driver_exception()286 self.report_driver_exception()
@@ -286,10 +295,12 @@
286 if os.path.exists(icommon.max_threads_file):295 if os.path.exists(icommon.max_threads_file):
287 f = open(icommon.max_threads_file)296 f = open(icommon.max_threads_file)
288 try:297 try:
289 nb_threads = int(f.readlines()[0].strip())298 line = f.readline().strip()
299 if line:
300 nb_threads = int(line)
290 except Exception, e:301 except Exception, e:
291 logger.warning('Error reading max threads file %s: %s',302 logger.warning('Error reading max threads file %s: %s'
292 (icommon.max_threads_file, str(e)))303 % (icommon.max_threads_file, str(e)))
293 finally:304 finally:
294 f.close()305 f.close()
295 return nb_threads306 return nb_threads
296307
=== modified file 'tests.py'
--- tests.py 2011-02-24 13:53:11 +0000
+++ tests.py 2011-02-24 13:53:11 +0000
@@ -381,50 +381,56 @@
381381
382class TestSubprocessMonitor(tests.TestCase):382class TestSubprocessMonitor(tests.TestCase):
383383
384 def run_in_subprocess(self, cmd, klass=None):384 def run_in_subprocess(self, args, klass=None):
385 if klass is None:385 if klass is None:
386 klass = icommon.SubprocessMonitor386 klass = icommon.SubprocessMonitor
387 sub = klass(cmd)387 sub = klass(args)
388 self.addCleanup(sub.join, 0)388 self.addCleanup(sub.join, 0)
389 sub.start()389 sub.start()
390 return sub390 return sub
391391
392 def run_sleeper_in_subprocess(self, klass=None):
393 """Run a sleeper in a subprocess.
394
395 This is used to simulate a never-ending process without consuming too
396 much CPU cycles.
397 """
398 cmd = 'import time; while True: time.sleep(0.001)'
399 return self.run_in_subprocess(['-c', cmd], klass=klass)
400
392 def test_broken_command(self):401 def test_broken_command(self):
393 sub = self.run_in_subprocess('this-is-not-a-program')402 sub = self.run_in_subprocess(['this-is-not-a-python-script'])
394 sub.stopped.wait()403 sub.stopped.wait()
395 self.assertEquals(True, sub.started.isSet())404 self.assertEquals(True, sub.started.isSet())
396 self.assertEquals('/bin/sh: this-is-not-a-program: not found\n',405 self.assertEquals(
397 sub.err)406 "python: can't open file 'this-is-not-a-python-script':"
407 " [Errno 2] No such file or directory\n",
408 sub.err)
398409
399 def test_success(self):410 def test_success(self):
400 sub = self.run_in_subprocess('echo toto')411 sub = self.run_in_subprocess(['-c', 'print "toto"'])
401 sub.stopped.wait()412 sub.stopped.wait()
402 self.assertEquals('toto\n', sub.out)413 self.assertEquals('toto\n', sub.out)
403 self.assertEquals('', sub.err)414 self.assertEquals('', sub.err)
404 self.assertEquals(0, sub.retcode)415 self.assertEquals(0, sub.retcode)
405416
406 def test_failure(self):417 def test_failure(self):
407 sub = self.run_in_subprocess('false')418 sub = self.run_in_subprocess(['-c', 'import sys; sys.exit(1)'])
408 sub.stopped.wait()419 sub.stopped.wait()
409 self.assertEquals('', sub.out)420 self.assertEquals('', sub.out)
410 self.assertEquals('', sub.err)421 self.assertEquals('', sub.err)
411 self.assertEquals(1, sub.retcode)422 self.assertEquals(1, sub.retcode)
412423
413 def test_kill(self):424 def test_kill(self):
414 sub = self.run_in_subprocess('yes')425 sub = self.run_sleeper_in_subprocess()
415 sub.started.wait()426 sub.started.wait()
416 self.assertEquals(True, sub.kill())427 self.assertEquals(True, sub.kill())
417 sub.stopped.wait()428 sub.stopped.wait()
418 self.assertEquals(-signal.SIGTERM, sub.retcode)429 self.assertEquals(-signal.SIGTERM, sub.retcode)
419430
420 def test_kill_SIGKILL(self):431 def test_kill_SIGKILL(self):
421 sub = self.run_in_subprocess('yes')432 sub = self.run_sleeper_in_subprocess()
422 sub.started.wait()433 sub.started.wait()
423 # For oscure reasons, using SIGABRT instead of SIGKILL and stopping
424 # below left the 'yes' process alive and the SubprocessMonitor happily
425 # consuming memroy (8GB mark reached while sitting under pdb, which is
426 # good enough to consider bug #589532 addressed.
427 # import pdb; pdb.set_trace()
428 self.assertEquals(True, sub.kill(signal.SIGKILL))434 self.assertEquals(True, sub.kill(signal.SIGKILL))
429 sub.stopped.wait()435 sub.stopped.wait()
430 self.assertEquals(-signal.SIGKILL, sub.retcode)436 self.assertEquals(-signal.SIGKILL, sub.retcode)
@@ -432,17 +438,17 @@
432 def test_kill_catch_zombie(self):438 def test_kill_catch_zombie(self):
433 kill_me = threading.Event()439 kill_me = threading.Event()
434 resume = threading.Event()440 resume = threading.Event()
435 class TestSubprocessMonitor(icommon.SubprocessMonitor):441 class TestMonitor(icommon.SubprocessMonitor):
436442
437 def switch_and_set(self, new):443 def switch_and_set(self, new):
438 super(TestSubprocessMonitor, self).switch_and_set(new)444 super(TestMonitor, self).switch_and_set(new)
439 if new is self.stopped:445 if new is self.stopped:
440 # The process is running but we haven't called446 # The process is running but we haven't called
441 # proc.communicate yet.447 # proc.communicate yet.
442 kill_me.set()448 kill_me.set()
443 resume.wait()449 resume.wait()
444450
445 sub = self.run_in_subprocess('yes', TestSubprocessMonitor)451 sub = self.run_sleeper_in_subprocess(klass=TestMonitor)
446 self.addCleanup(sub.join, 0)452 self.addCleanup(sub.join, 0)
447 sub.started.wait()453 sub.started.wait()
448 kill_me.wait()454 kill_me.wait()
@@ -450,25 +456,90 @@
450 os.kill(sub.proc_pid, signal.SIGTERM)456 os.kill(sub.proc_pid, signal.SIGTERM)
451 pid, st = os.waitpid(sub.proc_pid, 0)457 pid, st = os.waitpid(sub.proc_pid, 0)
452 self.assertEquals(sub.proc_pid, pid)458 self.assertEquals(sub.proc_pid, pid)
459 # Only the low byte contains the signal and the high bit if for core
460 # dumps.
461 self.assertEquals(signal.SIGTERM, st & 0x7F)
453 # Release the thread so it can call proc.communicate462 # Release the thread so it can call proc.communicate
454 resume.set()463 resume.set()
455 # Now pretend we don't know the subprocess is dead464 # Now pretend we don't know the subprocess is dead
456 self.assertEquals(False, sub.kill())465 self.assertEquals(False, sub.kill())
457 sub.join()466 sub.join()
458 self.assertEquals(-signal.SIGTERM, sub.retcode)467 # No useful retcode available
459468 self.assertEquals(None, sub.retcode)
469 # Nothing either in stdout/stderr. This means that subprocesses should
470 # handled SIGTERM to allow us to collect their data.
471 self.assertIs(None, sub.out)
472 self.assertIs(None, sub.err)
473
474
475class TestSubprocessMonitorWithScript(tests.TestCaseInTempDir):
476
477 def run_in_subprocess(self, args, klass=None):
478 if klass is None:
479 klass = icommon.SubprocessMonitor
480 sub = klass(args)
481 self.addCleanup(sub.join, 0)
482 sub.start()
483 return sub
484
485 def test_kill_caught(self):
486 self.build_tree_contents([('test.py',
487 """\
488class Killed(Exception):
489
490 def __str__(self):
491
492 return "I'm so dead"
493
494def die(signum, frame):
495 raise Killed
496import signal
497signal.signal(signal.SIGTERM, die)
498f = open('ready_to_die', 'w')
499try:
500 f.write('Yes I am\\n')
501finally:
502 f.close()
503import time
504slice = 0.001 # seconds
505max = 120.0 # seconds
506# We want to die long after the test fail (it it succeeds we're dead already)
507while xrange(max/slice):
508 time.sleep(0.001)
509""")])
510 sub = self.run_in_subprocess(['test.py'])
511 sub.started.wait()
512 # Gross hack to ensure the script ran long enough to catch the signal
513 ready = False
514 while not ready:
515 try:
516 f = open('ready_to_die')
517 ready = True
518 except:
519 pass
520 self.assertEquals(True, sub.kill())
521 sub.stopped.wait()
522 # The signal has been caught, we just get a failure return code, not
523 # funky stuff with the signal in the low byte and so on
524 self.assertEquals(1, sub.retcode)
525 # after the split, the last line is empty
526 self.assertEndsWith(sub.err.split('\n')[-2], "I'm so dead")
460527
461class Sleeper(icommon.SubprocessMonitor):528class Sleeper(icommon.SubprocessMonitor):
462529
463 def __init__(self, sleep_time=0.001, max_duration=None):530 def __init__(self, sleep_time=0.001, max_duration=None):
464 # sleep can't be killed (or interrupted) so we don't sleep for long but531 # sleep can't be killed (or interrupted) so we don't sleep for long but
465 # keep sleeping until killed532 # keep sleeping until killed
466 super(Sleeper, self).__init__('while true ; do sleep %f ; done'533 super(Sleeper, self).__init__(
467 % (sleep_time,),534 # Make sure we die long after the test finish (to avoid leaks) but
468 max_duration=max_duration)535 # still use a value that allow *some* debug (you want more for
469536 # *heavy* debug :)
470537 ['-c', 'import time; while xrange(120.0/%s): time.sleep(%s)'
471class TestDriverFeatures(tests.TestCase):538 % (sleep_time, sleep_time)],
539 max_duration=max_duration)
540
541
542class TestThreadDriver(tests.TestCase):
472543
473 def get_driver(self, *args, **kwargs):544 def get_driver(self, *args, **kwargs):
474 klass = kwargs.pop('klass', None)545 klass = kwargs.pop('klass', None)
@@ -557,10 +628,9 @@
557628
558 def collect(thread):629 def collect(thread):
559 self.assertIs(self.thread, thread)630 self.assertIs(self.thread, thread)
560 # false returns 1
561 self.assertEquals(1, thread.retcode)631 self.assertEquals(1, thread.retcode)
562 self.terminated.set()632 self.terminated.set()
563 self.thread = TestThread('false')633 self.thread = TestThread(['-c', 'import sys; sys.exit(1)'])
564 self.addCleanup(self.thread.join, 0)634 self.addCleanup(self.thread.join, 0)
565 q = Queue.Queue(1)635 q = Queue.Queue(1)
566 q.put(self.thread)636 q.put(self.thread)
@@ -583,7 +653,8 @@
583 self.addCleanup(driver.stop.set)653 self.addCleanup(driver.stop.set)
584 # Wait for the subprocess thread to start654 # Wait for the subprocess thread to start
585 self.started.wait()655 self.started.wait()
586 # the thread is waiting but not yet deleted from the driver656 # the thread is started and at most wait for collection on
657 # self.terminated so it's still there
587 self.assertLength(1, driver.threads)658 self.assertLength(1, driver.threads)
588 self.assertEquals(self.thread, driver.threads[0])659 self.assertEquals(self.thread, driver.threads[0])
589 self.resumed.set()660 self.resumed.set()
@@ -596,11 +667,10 @@
596 class TestThread(icommon.SubprocessMonitor):667 class TestThread(icommon.SubprocessMonitor):
597668
598 def collect(thread):669 def collect(thread):
599 # false returns 1
600 self.assertEquals(1, thread.retcode)670 self.assertEquals(1, thread.retcode)
601 raise ExceptionForTests()671 raise ExceptionForTests()
602 q = Queue.Queue(1)672 q = Queue.Queue(1)
603 t = TestThread('false')673 t = TestThread(['-c', 'import sys; sys.exit(1)'])
604 self.addCleanup(t.join, 0)674 self.addCleanup(t.join, 0)
605 q.put(t)675 q.put(t)
606 driver = self.get_started_driver(queue=q, max_threads=1)676 driver = self.get_started_driver(queue=q, max_threads=1)

Subscribers

People subscribed via source and target branches