Merge lp:~vila/udd/no-shell-true into lp:udd

Proposed by Vincent Ladeuil
Status: Merged
Approved by: Jelmer Vernooij
Approved revision: 439
Merged at revision: 405
Proposed branch: lp:~vila/udd/no-shell-true
Merge into: lp:udd
Prerequisite: lp:~vila/udd/589523-import-timeout
Diff against target: 380 lines (+128/-55)
3 files modified
icommon.py (+10/-18)
mass_import.py (+18/-7)
tests.py (+100/-30)
To merge this branch: bzr merge lp:~vila/udd/no-shell-true
Reviewer        Review Type    Date Requested    Status
James Westby                                     Approve
Review via email: mp+50921@code.launchpad.net

Description of the change

As mentioned in previous submissions, using shell=True with subprocesses led to complications for handling kills.

In preparation for a future submission that will allow scripts other than import_package.py to be invoked, I also want to get rid of shell=True for security reasons.
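
A minimal sketch of the security point, not taken from the branch: when the command is given as an argument list, no shell ever parses the values. The `package_name` value below is a hypothetical hostile input, and the snippet is written for Python 3 although the branch targets Python 2.

```python
import subprocess
import sys

# Hypothetical hostile input: under shell=True the ';' would start a second
# command.
package_name = "foo; echo injected"

# With an argument list (and the default shell=False) the value reaches the
# child as a single argv entry, untouched by any shell.
proc = subprocess.Popen(
    [sys.executable, "-c", "import sys; print(sys.argv[1])", package_name],
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    universal_newlines=True,
)
out, err = proc.communicate()
retcode = proc.returncode
```

Here the child simply echoes its first argument back, so `out` holds the whole string, semicolon included.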

I also added a test to ensure that we can still capture the output/error of a Python subprocess if it catches interrupting signals and handles them.

Note that we can't reliably guarantee what return code we will get when a process is killed and I've stopped cheating in this regard ;)
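
The difference between a caught and an uncaught SIGTERM can be sketched as follows. This is an illustration under assumptions, not the branch's code: the helper `run_and_terminate` and both script strings are hypothetical, it is written for Python 3 on a POSIX system, and the child reports readiness on stdout instead of through a file.

```python
import signal
import subprocess
import sys

# Child that installs a SIGTERM handler raising an exception.
CAUGHT = """
import signal, sys, time

class Killed(Exception):
    pass

def die(signum, frame):
    raise Killed("I'm so dead")

signal.signal(signal.SIGTERM, die)
print("ready")
sys.stdout.flush()
for _ in range(10000):
    time.sleep(0.001)
"""

# Child that leaves SIGTERM at its default disposition.
UNCAUGHT = """
import sys, time
print("ready")
sys.stdout.flush()
for _ in range(10000):
    time.sleep(0.001)
"""


def run_and_terminate(script):
    """Start the script, wait until it reports ready, then send SIGTERM."""
    proc = subprocess.Popen([sys.executable, "-c", script],
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE,
                            universal_newlines=True)
    proc.stdout.readline()  # blocks until the child has printed "ready"
    proc.terminate()
    out, err = proc.communicate()
    return proc.returncode, err


caught_code, caught_err = run_and_terminate(CAUGHT)
uncaught_code, uncaught_err = run_and_terminate(UNCAUGHT)
```

When the handler raises, the child exits through the normal uncaught-exception path, so the traceback lands in stderr and the return code is an ordinary failure code. Without a handler, `subprocess` reports the negated signal number.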

James Westby (james-w) wrote :

94 + cmd = 'import time; while True: time.sleep(0.001)'

Should this have a timeout?

Other than that this looks good.

Thanks,

James

review: Approve
John A Meinel (jameinel) wrote :

On 2/23/2011 10:01 AM, James Westby wrote:
> Review: Approve
> 94 + cmd = 'import time; while True: time.sleep(0.001)'
>
> Should this have a timeout?
>
> Other than that this looks good.
>
> Thanks,
>
> James
>

It intentionally doesn't, because it is testing the kill code. But you
have a point, setting it to:

for i in xrange(10000): time.sleep(0.001)

would give us 10s of a process running, and not leave something forever
if the test fails for some reason.

John
=:->

Vincent Ladeuil (vila) wrote :

> On 2/23/2011 10:01 AM, James Westby wrote:
> > Review: Approve
> > 94 + cmd = 'import time; while True: time.sleep(0.001)'
> >
> > Should this have a timeout?
> It intentionally doesn't, because it is testing the kill code.

Yup.

> But you have a point, setting it to:

Yup.

There are addCleanup(t.kill) that *should* kill them but...
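
A bounded sleeper along the lines suggested above could look like the sketch below. It is an illustration only, written for Python 3 (`range` instead of `xrange`), and the helper `compiles` is hypothetical. One detail matters when the loop is passed to `python -c`: a compound statement such as `for` or `while` cannot follow `import time;` on the same line, so the statements are separated by a newline.

```python
import subprocess
import sys
import time


def compiles(source):
    """Return True if source is syntactically valid Python."""
    try:
        compile(source, "<-c>", "exec")
        return True
    except SyntaxError:
        return False


# Roughly 10 seconds at most, then the child exits on its own.
SLEEPER = "import time\nfor i in range(10000): time.sleep(0.001)"

# The same loop written after a semicolon is rejected by the parser.
one_liner_ok = compiles("import time; for i in range(10000): time.sleep(0.001)")
sleeper_ok = compiles(SLEEPER)

proc = subprocess.Popen([sys.executable, "-c", SLEEPER])
time.sleep(0.2)                      # give the child a moment to start
still_running = proc.poll() is None
proc.kill()                          # cleanup, in the spirit of addCleanup(t.kill)
retcode = proc.wait()
```

Even if the cleanup were skipped, the child would stop by itself once the loop ends, which is the point of bounding it.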

lp:~vila/udd/no-shell-true updated
435. By Vincent Ladeuil

Merge 589523-import-timeout into no-shell-true

436. By Vincent Ladeuil

Merge 589523-import-timeout into no-shell-true

437. By Vincent Ladeuil

Merge 589523-import-timeout into no-shell-true

438. By Vincent Ladeuil

Make sure the sleepers die in the end (still peaceful, they sleep ;)

439. By Vincent Ladeuil

Oops, we don't rely on the shell to find executables now and we rely on the traceback being available to categorize failures.

440. By Vincent Ladeuil

Tests requiring commits are a pita when it comes to typos.

441. By Vincent Ladeuil

Fix bad parsing of max_threads files, die untested obvious fixes, die !

Preview Diff

1=== modified file 'icommon.py'
2--- icommon.py 2011-02-24 13:53:11 +0000
3+++ icommon.py 2011-02-24 13:53:11 +0000
4@@ -1940,10 +1940,10 @@
5
6 class SubprocessMonitor(CatchingExceptionThread):
7
8- def __init__(self, cmd, max_duration=None):
9+ def __init__(self, args, max_duration=None):
10 """Create a SubprocessMonitor.
11
12- :param cmd: The command string to be run in the subprocess.
13+ :param args: A list of arguments to give to python.
14
15 :param max_duration: The max number of seconds the command is expected
16 to be run (elapsed time).
17@@ -1952,26 +1952,20 @@
18 self.stopped = threading.Event()
19 super(SubprocessMonitor, self).__init__(target=self.spawn,
20 sync_event=self.started)
21- self.cmd = cmd
22+ self.args = args
23 self.proc_pid = None
24 self.retcode = None
25+ self.out = self.err = None
26 self.started_at = self.ended_at = None
27 self.max_duration = max_duration
28
29 def spawn(self):
30- """Spawn the command in a subprocess."""
31- def child_setup():
32- # python installs a SIGPIPE handler by default and signal
33- # dispositions set to SIG_IGN (used in that case) are preserved
34- # across fork and exceve. We don't want to propagate this to
35- # non-python subprocesses.
36- signal.signal(signal.SIGPIPE, signal.SIG_DFL)
37- proc = subprocess.Popen(self.cmd,
38- shell=True,
39+ """Spawn the python command in a subprocess."""
40+ proc = subprocess.Popen(['python'] + self.args,
41+ executable=sys.executable,
42 stdout=subprocess.PIPE,
43 stderr=subprocess.PIPE,
44- stdin=subprocess.PIPE,
45- preexec_fn=child_setup)
46+ stdin=subprocess.PIPE)
47 self.proc_pid = proc.pid
48 self.started_at = time.time()
49 self.switch_and_set(self.stopped)
50@@ -1981,9 +1975,8 @@
51 self.retcode = proc.returncode
52 except OSError, e:
53 if e.errno in (errno.ECHILD, errno.ESRCH):
54- # The process doesn't exist anymore, pretend is has been
55- # terminated
56- self.retcode = -signal.SIGTERM
57+ # The process doesn't exist anymore
58+ pass
59 else:
60 raise
61 self.stopped.set()
62@@ -2011,7 +2004,6 @@
63 if signum is None:
64 signum = signal.SIGTERM
65 os.kill(self.proc_pid, signum)
66- self.retcode = -signum
67 killed = True
68 except OSError, e:
69 if e.errno in (errno.ECHILD, errno.ESRCH):
70
71=== modified file 'mass_import.py'
72--- mass_import.py 2011-02-24 13:53:11 +0000
73+++ mass_import.py 2011-02-24 13:53:11 +0000
74@@ -103,6 +103,9 @@
75 logger.addHandler(progress_handler)
76
77 logger.info("Starting up")
78+# Some context to ease debug
79+logger.info("PATH: %s" % os.environ.get('PATH', None))
80+logger.info("CWD: %s" % os.getcwd())
81
82
83 logger.debug("Getting Launchpad object")
84@@ -115,13 +118,13 @@
85
86 class Importer(icommon.SubprocessMonitor):
87
88- def __init__(self, cmd, package_name, job_id):
89+ def __init__(self, args, package_name, job_id):
90 # FIXME: We should put the max_duration in a config file when it
91 # becomes available. Until then, during the wheezy import the longest
92 # import took more than 2 hours, so 24 is generous enough. On the other
93 # hand, we had evidence of imports hanging for weeks (at least a month
94 # even in one case, so 24 hours will get rid of them)
95- super(Importer, self).__init__(cmd % (package_name,),
96+ super(Importer, self).__init__(args,
97 # No more than a day
98 max_duration=24*60*60)
99 self.package_name = package_name
100@@ -133,7 +136,9 @@
101 if self.retcode == icommon.no_lock_returncode:
102 logger.info("Couldn't lock %s, skipping" % self.package_name)
103 else:
104- unicode_output = self.out.decode("utf-8", "replace")
105+ # We mostly care about self.err which contains the traceback here
106+ output = self.out + self.err
107+ unicode_output = output.decode("utf-8", "replace")
108 ascii_output = unicode_output.encode("ascii", "replace")
109 success = self.retcode == 0
110 if success:
111@@ -174,7 +179,8 @@
112 except Queue.Empty:
113 self.unfinished_tasks += 1
114 job_id, package_name = self.next_job()
115- return Importer('import_package.py %s',
116+ return Importer(['scripts/import_package.py',
117+ package_name],
118 package_name, job_id)
119
120 def next_job(self):
121@@ -268,10 +274,13 @@
122 # Then check the graceful stop file
123 if os.path.exists(icommon.stop_file):
124 self.driver.queue_closed.set()
125+ logger.info('Driver would not process new requests')
126 continue
127 # And the number of threads
128 nb_threads = self.get_max_threads()
129 if self.driver.max_threads != nb_threads:
130+ logger.info('Read %d in %s'
131+ % (nb_threads, icommon.max_threads_file))
132 self.driver.max_threads = nb_threads
133 # Catch driver exception if any
134 self.report_driver_exception()
135@@ -286,10 +295,12 @@
136 if os.path.exists(icommon.max_threads_file):
137 f = open(icommon.max_threads_file)
138 try:
139- nb_threads = int(f.readlines()[0].strip())
140+ line = f.readline().strip()
141+ if line:
142+ nb_threads = int(line)
143 except Exception, e:
144- logger.warning('Error reading max threads file %s: %s',
145- (icommon.max_threads_file, str(e)))
146+ logger.warning('Error reading max threads file %s: %s'
147+ % (icommon.max_threads_file, str(e)))
148 finally:
149 f.close()
150 return nb_threads
151
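
The max_threads parsing change above can be sketched in isolation as follows. This is a hedged illustration for Python 3, not the code in mass_import.py: the function name `read_max_threads`, its `default` parameter and the logger name are assumptions. It also shows the logging call with its arguments passed separately, since a single tuple would be treated as one argument for two `%s` placeholders.

```python
import logging
import os
import tempfile

logger = logging.getLogger("sketch")


def read_max_threads(path, default):
    """Return the integer on the first line of path, or default."""
    nb_threads = default
    if os.path.exists(path):
        with open(path) as f:
            try:
                line = f.readline().strip()
                if line:  # an empty file keeps the default
                    nb_threads = int(line)
            except ValueError as e:
                # logging interpolates the extra positional arguments itself.
                logger.warning("Error reading max threads file %s: %s",
                               path, e)
    return nb_threads


with tempfile.TemporaryDirectory() as d:
    path = os.path.join(d, "max_threads")
    with open(path, "w") as f:
        f.write("8\n")
    from_file = read_max_threads(path, default=4)
    with open(path, "w") as f:
        f.write("")
    from_empty = read_max_threads(path, default=4)
    with open(path, "w") as f:
        f.write("not a number\n")
    from_garbage = read_max_threads(path, default=4)
    from_missing = read_max_threads(os.path.join(d, "absent"), default=4)
```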
152=== modified file 'tests.py'
153--- tests.py 2011-02-24 13:53:11 +0000
154+++ tests.py 2011-02-24 13:53:11 +0000
155@@ -381,50 +381,56 @@
156
157 class TestSubprocessMonitor(tests.TestCase):
158
159- def run_in_subprocess(self, cmd, klass=None):
160+ def run_in_subprocess(self, args, klass=None):
161 if klass is None:
162 klass = icommon.SubprocessMonitor
163- sub = klass(cmd)
164+ sub = klass(args)
165 self.addCleanup(sub.join, 0)
166 sub.start()
167 return sub
168
169+ def run_sleeper_in_subprocess(self, klass=None):
170+ """Run a sleeper in a subprocess.
171+
172+ This is used to simulate a never-ending process without consuming too
173+ many CPU cycles.
174+ """
175+ cmd = 'import time; while True: time.sleep(0.001)'
176+ return self.run_in_subprocess(['-c', cmd], klass=klass)
177+
178 def test_broken_command(self):
179- sub = self.run_in_subprocess('this-is-not-a-program')
180+ sub = self.run_in_subprocess(['this-is-not-a-python-script'])
181 sub.stopped.wait()
182 self.assertEquals(True, sub.started.isSet())
183- self.assertEquals('/bin/sh: this-is-not-a-program: not found\n',
184- sub.err)
185+ self.assertEquals(
186+ "python: can't open file 'this-is-not-a-python-script':"
187+ " [Errno 2] No such file or directory\n",
188+ sub.err)
189
190 def test_success(self):
191- sub = self.run_in_subprocess('echo toto')
192+ sub = self.run_in_subprocess(['-c', 'print "toto"'])
193 sub.stopped.wait()
194 self.assertEquals('toto\n', sub.out)
195 self.assertEquals('', sub.err)
196 self.assertEquals(0, sub.retcode)
197
198 def test_failure(self):
199- sub = self.run_in_subprocess('false')
200+ sub = self.run_in_subprocess(['-c', 'import sys; sys.exit(1)'])
201 sub.stopped.wait()
202 self.assertEquals('', sub.out)
203 self.assertEquals('', sub.err)
204 self.assertEquals(1, sub.retcode)
205
206 def test_kill(self):
207- sub = self.run_in_subprocess('yes')
208+ sub = self.run_sleeper_in_subprocess()
209 sub.started.wait()
210 self.assertEquals(True, sub.kill())
211 sub.stopped.wait()
212 self.assertEquals(-signal.SIGTERM, sub.retcode)
213
214 def test_kill_SIGKILL(self):
215- sub = self.run_in_subprocess('yes')
216+ sub = self.run_sleeper_in_subprocess()
217 sub.started.wait()
218- # For oscure reasons, using SIGABRT instead of SIGKILL and stopping
219- # below left the 'yes' process alive and the SubprocessMonitor happily
220- # consuming memroy (8GB mark reached while sitting under pdb, which is
221- # good enough to consider bug #589532 addressed.
222- # import pdb; pdb.set_trace()
223 self.assertEquals(True, sub.kill(signal.SIGKILL))
224 sub.stopped.wait()
225 self.assertEquals(-signal.SIGKILL, sub.retcode)
226@@ -432,17 +438,17 @@
227 def test_kill_catch_zombie(self):
228 kill_me = threading.Event()
229 resume = threading.Event()
230- class TestSubprocessMonitor(icommon.SubprocessMonitor):
231+ class TestMonitor(icommon.SubprocessMonitor):
232
233 def switch_and_set(self, new):
234- super(TestSubprocessMonitor, self).switch_and_set(new)
235+ super(TestMonitor, self).switch_and_set(new)
236 if new is self.stopped:
237 # The process is running but we haven't called
238 # proc.communicate yet.
239 kill_me.set()
240 resume.wait()
241
242- sub = self.run_in_subprocess('yes', TestSubprocessMonitor)
243+ sub = self.run_sleeper_in_subprocess(klass=TestMonitor)
244 self.addCleanup(sub.join, 0)
245 sub.started.wait()
246 kill_me.wait()
247@@ -450,25 +456,90 @@
248 os.kill(sub.proc_pid, signal.SIGTERM)
249 pid, st = os.waitpid(sub.proc_pid, 0)
250 self.assertEquals(sub.proc_pid, pid)
251+ # Only the low byte contains the signal and its high bit is for core
252+ # dumps.
253+ self.assertEquals(signal.SIGTERM, st & 0x7F)
254 # Release the thread so it can call proc.communicate
255 resume.set()
256 # Now pretend we don't know the subprocess is dead
257 self.assertEquals(False, sub.kill())
258 sub.join()
259- self.assertEquals(-signal.SIGTERM, sub.retcode)
260-
261+ # No useful retcode available
262+ self.assertEquals(None, sub.retcode)
263+ # Nothing either in stdout/stderr. This means that subprocesses should
264+ # handle SIGTERM to allow us to collect their data.
265+ self.assertIs(None, sub.out)
266+ self.assertIs(None, sub.err)
267+
268+
269+class TestSubprocessMonitorWithScript(tests.TestCaseInTempDir):
270+
271+ def run_in_subprocess(self, args, klass=None):
272+ if klass is None:
273+ klass = icommon.SubprocessMonitor
274+ sub = klass(args)
275+ self.addCleanup(sub.join, 0)
276+ sub.start()
277+ return sub
278+
279+ def test_kill_caught(self):
280+ self.build_tree_contents([('test.py',
281+ """\
282+class Killed(Exception):
283+
284+ def __str__(self):
285+
286+ return "I'm so dead"
287+
288+def die(signum, frame):
289+ raise Killed
290+import signal
291+signal.signal(signal.SIGTERM, die)
292+f = open('ready_to_die', 'w')
293+try:
294+ f.write('Yes I am\\n')
295+finally:
296+ f.close()
297+import time
298+slice = 0.001 # seconds
299+max = 120.0 # seconds
300+# We want to die long after the test fails (if it succeeds we're dead already)
301+for _ in xrange(int(max / slice)):
302+ time.sleep(slice)
303+""")])
304+ sub = self.run_in_subprocess(['test.py'])
305+ sub.started.wait()
306+ # Gross hack to ensure the script ran long enough to catch the signal
307+ ready = False
308+ while not ready:
309+ try:
310+ f = open('ready_to_die')
311+ ready = True
312+ except:
313+ pass
314+ self.assertEquals(True, sub.kill())
315+ sub.stopped.wait()
316+ # The signal has been caught, we just get a failure return code, not
317+ # funky stuff with the signal in the low byte and so on
318+ self.assertEquals(1, sub.retcode)
319+ # after the split, the last line is empty
320+ self.assertEndsWith(sub.err.split('\n')[-2], "I'm so dead")
321
322 class Sleeper(icommon.SubprocessMonitor):
323
324 def __init__(self, sleep_time=0.001, max_duration=None):
325 # sleep can't be killed (or interrupted) so we don't sleep for long but
326 # keep sleeping until killed
327- super(Sleeper, self).__init__('while true ; do sleep %f ; done'
328- % (sleep_time,),
329- max_duration=max_duration)
330-
331-
332-class TestDriverFeatures(tests.TestCase):
333+ super(Sleeper, self).__init__(
334+ # Make sure we die long after the test finishes (to avoid leaks) but
335+ # still use a value that allows *some* debugging (you want more for
336+ # *heavy* debug :)
337+ ['-c', 'import time; while xrange(120.0/%s): time.sleep(%s)'
338+ % (sleep_time, sleep_time)],
339+ max_duration=max_duration)
340+
341+
342+class TestThreadDriver(tests.TestCase):
343
344 def get_driver(self, *args, **kwargs):
345 klass = kwargs.pop('klass', None)
346@@ -557,10 +628,9 @@
347
348 def collect(thread):
349 self.assertIs(self.thread, thread)
350- # false returns 1
351 self.assertEquals(1, thread.retcode)
352 self.terminated.set()
353- self.thread = TestThread('false')
354+ self.thread = TestThread(['-c', 'import sys; sys.exit(1)'])
355 self.addCleanup(self.thread.join, 0)
356 q = Queue.Queue(1)
357 q.put(self.thread)
358@@ -583,7 +653,8 @@
359 self.addCleanup(driver.stop.set)
360 # Wait for the subprocess thread to start
361 self.started.wait()
362- # the thread is waiting but not yet deleted from the driver
363+ # the thread is started and at most waits for collection on
364+ # self.terminated so it's still there
365 self.assertLength(1, driver.threads)
366 self.assertEquals(self.thread, driver.threads[0])
367 self.resumed.set()
368@@ -596,11 +667,10 @@
369 class TestThread(icommon.SubprocessMonitor):
370
371 def collect(thread):
372- # false returns 1
373 self.assertEquals(1, thread.retcode)
374 raise ExceptionForTests()
375 q = Queue.Queue(1)
376- t = TestThread('false')
377+ t = TestThread(['-c', 'import sys; sys.exit(1)'])
378 self.addCleanup(t.join, 0)
379 q.put(t)
380 driver = self.get_started_driver(queue=q, max_threads=1)
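
The wait status check in test_kill_catch_zombie masks the status with 0x7F. As a small sketch (Python 3, POSIX only, not part of the branch), the standard `os` helpers decode the same status and agree with the mask:

```python
import os
import signal
import subprocess
import sys

# Short-lived child that would exit by itself after 10 seconds.
proc = subprocess.Popen([sys.executable, "-c", "import time\ntime.sleep(10)"])

os.kill(proc.pid, signal.SIGTERM)
# Reap the child directly, as the test does, to get the raw wait status.
pid, status = os.waitpid(proc.pid, 0)

signalled = os.WIFSIGNALED(status)   # True when a signal ended the process
term_signal = os.WTERMSIG(status)    # the signal number
low_bits = status & 0x7F             # the mask used in the test
```

Because the child is reaped with `os.waitpid` here, the `Popen` object never learns the exit status, which mirrors the situation the test sets up for the monitor thread.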
