Merge lp:~yolanda.robla/ubuntu/saucy/postfix/dep-8-tests into lp:ubuntu/saucy/postfix

Proposed by Yolanda Robla
Status: Merged
Merged at revision: 57
Proposed branch: lp:~yolanda.robla/ubuntu/saucy/postfix/dep-8-tests
Merge into: lp:ubuntu/saucy/postfix
Diff against target: 1744 lines (+1705/-0)
6 files modified
debian/changelog (+6/-0)
debian/control (+1/-0)
debian/tests/control (+3/-0)
debian/tests/postfix (+16/-0)
debian/tests/test-postfix.py (+535/-0)
debian/tests/testlib.py (+1144/-0)
To merge this branch: bzr merge lp:~yolanda.robla/ubuntu/saucy/postfix/dep-8-tests
Reviewer Review Type Date Requested Status
Dimitri John Ledkov Approve
James Hunt (community) Approve
Daniel Holbach (community) Needs Fixing
Ubuntu branches Pending
Review via email: mp+161610@code.launchpad.net

Description of the change

Added dep-8 tests

To post a comment you must log in.
Revision history for this message
Daniel Holbach (dholbach) wrote :

Could you add a changelog entry in debian/changelog for the upload? It might also be a good idea to forward the change to Debian.

review: Needs Fixing
57. By Yolanda Robla

d/tests: added dep-8-tests

Revision history for this message
Yolanda Robla (yolanda.robla) wrote :

recheck

Revision history for this message
James Hunt (jamesodhunt) wrote :

LGTM. Has this been submitted to Debian yet?

Revision history for this message
James Hunt (jamesodhunt) wrote :

status update.

review: Approve
Revision history for this message
Dimitri John Ledkov (xnox) wrote :

Had to run "update-maintainer" to modify maintainer/original-maintainer fields in debian/control.
Looks very good otherwise.

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'debian/changelog'
2--- debian/changelog 2013-03-22 10:30:29 +0000
3+++ debian/changelog 2013-05-07 21:42:30 +0000
4@@ -1,3 +1,9 @@
5+postfix (2.10.0-3ubuntu1) saucy; urgency=low
6+
7+ * d/tests: added dep-8-tests
8+
9+ -- Yolanda <yolanda.robla@canonical.com> Tue, 07 May 2013 23:34:04 +0200
10+
11 postfix (2.10.0-3) unstable; urgency=low
12
13 [LaMont Jones]
14
15=== modified file 'debian/control'
16--- debian/control 2013-03-04 09:03:31 +0000
17+++ debian/control 2013-05-07 21:42:30 +0000
18@@ -7,6 +7,7 @@
19 Build-Depends: debhelper (>= 7), po-debconf (>= 0.5.0), groff-base, patch, lsb-release, libdb-dev (>=4.6.19), libldap2-dev (>=2.1), libpcre3-dev, libmysqlclient-dev|libmysqlclient15-dev|libmysqlclient14-dev, libssl-dev (>=0.9.7), libsasl2-dev, libpq-dev, libcdb-dev, hardening-wrapper, dpkg-dev (>= 1.15.5), libsqlite3-dev
20 Vcs-Browser: http://git.debian.org/?p=users/lamont/postfix.git
21 Vcs-Git: git://git.debian.org/~lamont/postfix.git
22+XS-Testsuite: autopkgtest
23
24 Package: postfix
25 Architecture: any
26
27=== added directory 'debian/tests'
28=== added file 'debian/tests/control'
29--- debian/tests/control 1970-01-01 00:00:00 +0000
30+++ debian/tests/control 2013-05-07 21:42:30 +0000
31@@ -0,0 +1,3 @@
32+Tests: postfix
33+Depends: python-unit, procmail, sasl2-bin, python-pexpect, lsb-release
34+Restrictions: needs-root
35
36=== added file 'debian/tests/postfix'
37--- debian/tests/postfix 1970-01-01 00:00:00 +0000
38+++ debian/tests/postfix 2013-05-07 21:42:30 +0000
39@@ -0,0 +1,16 @@
40+#!/bin/bash
41+#----------------
42+# Testing postfix
43+#----------------
44+set -e
45+
46+# reconfigure postfix
47+debconf-set-selections <<< "postfix postfix/mailname string localhost" 2>&1
48+debconf-set-selections <<< "postfix postfix/main_mailer_type string 'Internet Site'" 2>&1
49+
50+# install and modify
51+hostname localhost
52+apt-get install -y postfix 2>&1
53+hostname --fqdn > /etc/mailname
54+/etc/init.d/postfix restart 2>&1
55+python `dirname $0`/test-postfix.py 2>&1
56
57=== added file 'debian/tests/test-postfix.py'
58--- debian/tests/test-postfix.py 1970-01-01 00:00:00 +0000
59+++ debian/tests/test-postfix.py 2013-05-07 21:42:30 +0000
60@@ -0,0 +1,535 @@
61+#!/usr/bin/python
62+#
63+# test-postfix.py quality assurance test script for postfix
64+# Copyright (C) 2008-2012 Canonical Ltd.
65+# Author: Kees Cook <kees@ubuntu.com>
66+# Author: Marc Deslauriers <marc.deslauriers@canonical.com>
67+# Author: Jamie Strandboge <jamie@canonical.com>
68+#
69+# This program is free software: you can redistribute it and/or modify
70+# it under the terms of the GNU General Public License version 3,
71+# as published by the Free Software Foundation.
72+#
73+# This program is distributed in the hope that it will be useful,
74+# but WITHOUT ANY WARRANTY; without even the implied warranty of
75+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
76+# GNU General Public License for more details.
77+#
78+# You should have received a copy of the GNU General Public License
79+# along with this program. If not, see <http://www.gnu.org/licenses/>.
80+#
81+# QRT-Packages: postfix sasl2-bin procmail python-pexpect
82+# QRT-Privilege: root
83+# QRT-Conflicts: exim4
84+
85+'''
86+ Note: When installing postfix, select "Internet Site". This script will
87+ not work if "Local Only" was selected.
88+
89+ How to run against a clean schroot named 'hardy':
90+ schroot -c hardy -u root -- sh -c 'apt-get -y install procmail python-unit postfix sasl2-bin python-pexpect lsb-release && ./test-postfix.py -v'
91+
92+ Tests:
93+ 00: setup
94+ 10: basic plain auth setup
95+ 11: above, but with CVE reproducers
96+ 20: sasl non-PLAIN setup
97+ 21: 20, but with CVE reproducers
98+ 99: restore configs
99+'''
100+
101+import unittest, subprocess, re, pexpect, smtplib, socket, os, time, tempfile
102+import testlib
103+
104+class PostfixTest(testlib.TestlibCase):
105+ '''Test Postfix MTA.'''
106+
107+ def _setUp(self):
108+ '''Create server configs.'''
109+
110+ # Move listener to localhost:2525
111+ conf_file = '/etc/postfix/master.cf'
112+ lines = open(conf_file)
113+ contents = ''
114+ for cfline in lines:
115+ if cfline.startswith('smtp') and 'smtpd' in cfline and 'inet' in cfline:
116+ contents += '127.0.0.1:2525 inet n - - - - smtpd\n'
117+ else:
118+ contents += "%s\n" % cfline
119+ testlib.config_replace(conf_file, contents, append=False)
120+
121+ conf_file = '/etc/postfix/main.cf'
122+ # Use mbox only
123+ testlib.config_comment(conf_file,'home_mailbox')
124+ testlib.config_set(conf_file,'mailbox_command','procmail -a "$EXTENSION"')
125+
126+ # Turn on sasl
127+ self._setup_sasl("PLAIN")
128+ reply = self._check_auth("PLAIN")
129+
130+
131+ def setUp(self):
132+ '''Set up prior to each test_* function'''
133+ # list of files that we update
134+ self.conf_files = [ '/etc/postfix/master.cf', '/etc/postfix/main.cf', '/etc/default/saslauthd', '/etc/postfix/sasl/smtpd.conf', '/etc/sasldb2']
135+
136+ self.user = testlib.TestUser(lower=True)
137+ self.s = None
138+ # Silently allow for this connection to fail, to handle the
139+ # initial setup of the postfix server.
140+ try:
141+ self.s = smtplib.SMTP('localhost', port=2525)
142+ except:
143+ pass
144+
145+ def _tearDown(self):
146+ '''Restore server configs'''
147+ for f in self.conf_files:
148+ testlib.config_restore(f)
149+
150+ # put saslauthd back
151+ for f in ['/var/spool/postfix/var/run/saslauthd', '/var/run/saslauthd']:
152+ if os.path.isfile(f) or os.path.islink(f):
153+ os.unlink(f)
154+ elif os.path.exists(f):
155+ testlib.recursive_rm(f)
156+ subprocess.call(['mkdir','-p','/var/run/saslauthd'])
157+ subprocess.call(['/etc/init.d/saslauthd', 'stop'], stdout=subprocess.PIPE)
158+ subprocess.call(['/etc/init.d/saslauthd', 'start'], stdout=subprocess.PIPE)
159+
160+ def tearDown(self):
161+ '''Clean up after each test_* function'''
162+
163+ try:
164+ self.s.quit()
165+ except:
166+ pass
167+ self.user = None
168+
169+ def _restart_server(self):
170+ '''Restart server'''
171+ subprocess.call(['/etc/init.d/postfix', 'stop'], stdout=subprocess.PIPE)
172+ assert subprocess.call(['/etc/init.d/postfix', 'start'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) == 0
173+ # Postfix exits its init script before the master listener has started
174+ time.sleep(2)
175+
176+ def _setup_sasl(self, mech, other_mech="", force_sasldb=False):
177+ '''Setup sasl for mech'''
178+ conf_file = '/etc/postfix/main.cf'
179+ for field in ['smtpd_sasl_type','smtpd_sasl_local_domain','smtpd_tls_auth_only']:
180+ testlib.config_comment(conf_file,field)
181+ testlib.config_set(conf_file,'smtpd_sasl_path','smtpd')
182+ testlib.config_set(conf_file,'smtpd_sasl_auth_enable','yes')
183+ #testlib.config_set(conf_file,'broken_sasl_auth_clients','yes')
184+ testlib.config_set(conf_file,'smtpd_sasl_authenticated_header','yes')
185+ testlib.config_set(conf_file,'smtpd_tls_loglevel','2')
186+
187+ # setup smtpd.conf and the sasl users
188+ contents = ''
189+
190+ self.assertTrue(mech in ['LOGIN', 'PLAIN', 'CRAM-MD5', 'DIGEST-MD5'], "Invalid mech: %s" % mech)
191+
192+ if not force_sasldb and (mech == "PLAIN" or mech == "LOGIN"):
193+ conf_file = '/etc/default/saslauthd'
194+ testlib.config_set(conf_file, 'START', 'yes', spaces=False)
195+
196+ contents = '''
197+pwcheck_method: saslauthd
198+allowanonymouslogin: 0
199+allowplaintext: 1
200+mech_list: %s %s
201+''' % (mech, other_mech)
202+
203+ # attach SASL to postfix chroot
204+ subprocess.call(['mkdir','-p','/var/spool/postfix/var/run/saslauthd'])
205+ subprocess.call(['rm','-rf','/var/run/saslauthd'])
206+ subprocess.call(['ln','-s','/var/spool/postfix/var/run/saslauthd','/var/run/saslauthd'])
207+ subprocess.call(['/etc/init.d/saslauthd', 'stop'], stdout=subprocess.PIPE)
208+ assert subprocess.call(['/etc/init.d/saslauthd', 'start'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) == 0
209+
210+ # Force crackful perms so chroot'd postfix can talk to saslauthd
211+ subprocess.call(['chmod','o+x','/var/spool/postfix/var/run/saslauthd'])
212+ else:
213+ plaintext = "1"
214+ if mech == "LOGIN" or mech == "PLAIN":
215+ plaintext = "0"
216+ contents = '''
217+pwcheck_method: auxprop
218+allowanonymouslogin: 0
219+allowplaintext: %s
220+mech_list: %s %s
221+''' % (plaintext, mech, other_mech)
222+
223+ # Add user to sasldb2
224+ testlib.config_replace("/etc/sasldb2", '', append=False)
225+
226+ rc, report = testlib.cmd(['postconf', '-h', 'myhostname'])
227+ expected = 0
228+ result = 'Got exit code %d, expected %d\n' % (rc, expected)
229+ self.assertEquals(expected, rc, result + report)
230+
231+ child = pexpect.spawn('saslpasswd2 -c -u %s %s' % (report.strip(), self.user.login))
232+ time.sleep(0.2)
233+ child.expect(r'.*[pP]assword', timeout=5)
234+ time.sleep(0.2)
235+ child.sendline(self.user.password)
236+ time.sleep(0.2)
237+ child.expect(r'.*(for verification)', timeout=5)
238+ time.sleep(0.2)
239+ child.sendline(self.user.password)
240+ time.sleep(0.2)
241+ rc = child.expect('\n', timeout=5)
242+ time.sleep(0.2)
243+ self.assertEquals(rc, expected, "passwd returned %d" %(rc))
244+
245+ child.kill(0)
246+
247+ os.chmod("/etc/sasldb2", 0640)
248+ rc, report = testlib.cmd(['chgrp', 'postfix', '/etc/sasldb2'])
249+ expected = 0
250+ result = 'Got exit code %d, expected %d\n' % (rc, expected)
251+ self.assertEquals(expected, rc, result + report)
252+
253+ # Force crackful perms so chroot'd postfix can talk to saslauthd
254+ subprocess.call(['mv', '-f', '/etc/sasldb2', '/var/spool/postfix/etc'])
255+ subprocess.call(['ln', '-s', '/var/spool/postfix/etc/sasldb2', '/etc/sasldb2'])
256+
257+ conf_file = '/etc/postfix/sasl/smtpd.conf'
258+ testlib.config_replace(conf_file, contents, append=False)
259+
260+ # Restart server
261+ self._restart_server()
262+
263+ def _is_listening(self):
264+ '''Is the server listening'''
265+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
266+ s.settimeout(5)
267+ s.connect(('localhost',2525))
268+ greeting = s.recv(1024)
269+ # 220 gorgon.outflux.net ESMTP Postfix (Ubuntu)
270+ self.assertTrue(greeting.startswith('220 '),greeting)
271+ self.assertTrue('ESMTP' in greeting,greeting)
272+ self.assertTrue('Postfix' in greeting,greeting)
273+ self.assertFalse('MTA' in greeting,greeting)
274+ s.close()
275+
276+ def test_00_listening(self):
277+ '''Postfix is listening'''
278+ # Get the main instance running
279+ self._setUp()
280+
281+ self._is_listening()
282+
283+ def _vrfy(self, address, valid = True):
284+ self.s.putcmd("vrfy",address)
285+ code, msg = self.s.getreply()
286+ reply = '%d %s' % (code, msg)
287+ if valid:
288+ self.assertEquals(code, 252, reply)
289+ self.assertTrue(address in msg, reply)
290+ else:
291+ self.assertEquals(code, 550, reply)
292+ self.assertTrue('Recipient address rejected' in msg, reply)
293+ self.assertTrue('<%s>' % (address) in msg, reply)
294+
295+ def test_10_commands(self):
296+ '''Basic SMTP commands'''
297+
298+ #s = smtplib.SMTP('localhost', port=2525)
299+ # EHLO
300+ code, msg = self.s.ehlo()
301+ reply = '%d %s' % (code, msg)
302+ self.assertEquals(code, 250, reply)
303+ self.assertEquals(self.s.does_esmtp, 1, reply)
304+ self.assertTrue('8BITMIME' in self.s.ehlo_resp, reply)
305+ # No help available
306+ self.s.putcmd("help")
307+ code, msg = self.s.getreply()
308+ reply = '%d %s' % (code, msg)
309+ self.assertEquals(code, 502, reply)
310+ self.assertTrue('Error' in msg, reply)
311+ # VRFY addresses
312+ self._vrfy('address@example.com', valid=True)
313+ self._vrfy('does-not-exist', valid=False)
314+ self._vrfy(self.user.login, valid=True)
315+
316+ def _test_deliver_mail(self, user_sent_to, auth_user=None, auth_pass=None, use_tls=False):
317+ '''Perform mail delivery'''
318+
319+ if auth_user and auth_pass:
320+ self.s.login(auth_user, auth_pass)
321+ if use_tls:
322+ self.s.starttls()
323+ failed = self.s.sendmail('root',[user_sent_to.login,'does-not-exist'],'''From: Rooty <root>
324+To: "%s" <%s>
325+Subject: This is test 1
326+
327+Hello, nice to meet you.
328+''' % (user_sent_to.gecos, user_sent_to.login))
329+ #for addr in failed.keys():
330+ # print '%s %d %s' % (addr, failed[addr][0], failed[addr][1])
331+ self.assertEquals(len(failed),1,failed)
332+ self.assertTrue(failed.has_key('does-not-exist'),failed)
333+ self.assertEquals(failed['does-not-exist'][0],550,failed)
334+
335+ # Frighteningly, postfix seems to accept email before confirming
336+ # a successful write to disk for the recipient!
337+ time.sleep(2)
338+
339+ def _test_mail_in_spool(self, user_directed_to, target_spool_user=None, spool_file=None, auth_user=None, use_tls=False):
340+ '''Check that mail arrived in the spool'''
341+
342+ # Handle the case of forwarded emails
343+ if target_spool_user == None:
344+ target_spool_user = user_directed_to
345+ # Read delivered email
346+ if spool_file == None:
347+ spool_file = '/var/mail/%s' % (target_spool_user.login)
348+ time.sleep(1)
349+ contents = open(spool_file).read()
350+ # Server-side added headers...
351+ self.assertTrue('\nReceived: ' in contents, contents)
352+ if use_tls and self.lsb_release['Release'] > 6.06:
353+ expected = ' (Postfix) with ESMTPS id '
354+ else:
355+ expected = ' (Postfix) with ESMTP id '
356+ if auth_user:
357+ if self.lsb_release['Release'] < 8.04:
358+ self._skipped("Received header portion")
359+ else:
360+ expected = ' (Postfix) with ESMTPA id '
361+ self.assertTrue('(Authenticated sender: %s)' % (auth_user))
362+ self.assertTrue(expected in contents, 'Looking for "%s" in email:\n%s' % (expected, contents))
363+ self.assertTrue('\nMessage-Id: ' in contents, contents)
364+ self.assertTrue('\nDate: ' in contents, contents)
365+ # client-side headers/body...
366+ self.assertTrue('\nSubject: This is test 1' in contents, contents)
367+ self.assertTrue('\nFrom: Rooty' in contents, contents)
368+ self.assertTrue('\nTo: "Buddy %s" <%s@' % (user_directed_to.login, user_directed_to.login) in contents, contents)
369+ self.assertTrue('\nHello, nice to meet you.' in contents, contents)
370+
371+ def _test_roundtrip_mail(self, user_sent_to, user_to_check=None, spool_file=None, auth_user=None, auth_pass=None, use_tls=False):
372+ '''Send and check email delivery'''
373+ self._test_deliver_mail(user_sent_to, auth_user, auth_pass, use_tls=use_tls)
374+ self._test_mail_in_spool(user_sent_to, user_to_check, spool_file, auth_user=auth_user, use_tls=use_tls)
375+
376+ def test_10_sending_mail_direct(self):
377+ '''Mail delivered normally'''
378+ self._test_roundtrip_mail(self.user)
379+
380+ def test_10_sending_mail_direct_with_tls(self):
381+ '''Mail delivered normally with TLS'''
382+ self._test_roundtrip_mail(self.user, use_tls=True)
383+
384+ def test_10_sending_mail_direct_auth(self):
385+ '''Mail authentication'''
386+ # Verify rejected bad password and user
387+ self.assertRaises(smtplib.SMTPAuthenticationError, self.s.login, 'root', 'crapcrapcrap')
388+ self.assertRaises(smtplib.SMTPAuthenticationError, self.s.login, self.user.login, 'crapcrapcrap')
389+ self.s.login(self.user.login, self.user.password)
390+
391+ def test_10_sending_mail_direct_auth_full(self):
392+ '''Mail delivered with authentication'''
393+ # Perform end-to-end authentication test
394+ self._test_roundtrip_mail(self.user, auth_user=self.user.login, auth_pass=self.user.password)
395+
396+ def _write_forward(self, user, contents):
397+ forward_filename = '/home/%s/.forward' % (user.login)
398+ open(forward_filename,'w').write(contents)
399+ os.chown(forward_filename, user.uid, user.gid)
400+
401+ def test_10_sending_mail_forward_normal(self):
402+ '''Mail delivered via .forward'''
403+
404+ forward_user = testlib.TestUser(lower=True)
405+ self._write_forward(forward_user, self.user.login+'\n')
406+ self._test_roundtrip_mail(forward_user, self.user)
407+
408+ def test_10_sending_mail_forward_xternal(self):
409+ '''Mail processed by commands in .forward'''
410+
411+ # Create user-writable redirected mbox destination
412+ mbox, mbox_name = testlib.mkstemp_fill('',prefix='test-postfix.mbox-')
413+ mbox.close()
414+ os.chown(mbox_name, self.user.uid, self.user.gid)
415+
416+ # Create a script to run in the .forward
417+ redir, redir_name = testlib.mkstemp_fill('''#!/bin/bash
418+/bin/cat > "%s"
419+''' % (mbox_name),prefix='test-postfix.redir-')
420+ redir.close()
421+ os.chmod(redir_name,0755)
422+
423+ self._write_forward(self.user,'|%s\n' % (redir_name))
424+
425+ # SKIP TESTING, FAILS IN TESTBED
426+ #self._test_roundtrip_mail(self.user, spool_file=mbox_name)
427+
428+ os.unlink(redir_name)
429+ os.unlink(mbox_name)
430+
431+ def test_11_security_CVE_2008_2936(self):
432+ '''CVE-2008-2936 fixed'''
433+
434+ # First, create our "target" file
435+ secret = '/root/secret.txt'
436+ open(secret,'w').write('Secret information\n')
437+ os.chmod(secret, 0700)
438+
439+ # Now, create a symlink to the target (we're going to use /var/tmp
440+ # since we're assuming it, /root, /var/mail are on the same filesystem.
441+ # For most chroot testing, /tmp is mounted from the real machine.
442+ if os.path.exists('/var/tmp/secret.link'):
443+ os.unlink('/var/tmp/secret.link')
444+ self.assertEquals(subprocess.call(['su','-c','ln -s /root/secret.txt /var/tmp/secret.link',self.user.login]),0,"Symlink creation")
445+
446+ # Now, the hardlink, which in ubuntu's case needs to be done by root.
447+ os.link('/var/tmp/secret.link','/var/mail/%s' % (self.user.login))
448+
449+ # Email delivered to this user will be written to the root-owned
450+ # file now if the CVE is unfixed.
451+ failed = self.s.sendmail('root',[self.user.login],'''From: Evil <root>
452+To: "%s" <%s>
453+Subject: This is an overwrite test
454+
455+Hello, nice to pwn you.
456+''' % (self.user.gecos, self.user.login))
457+ self.assertEquals(len(failed),0,failed)
458+
459+ # Pause for delivery
460+ time.sleep(2)
461+
462+ contents = open(secret).read()
463+ # Clean up before possible failures
464+ os.unlink('/var/mail/%s' % (self.user.login))
465+ os.unlink('/var/tmp/secret.link')
466+ os.unlink(secret)
467+ # Check results
468+ self.assertTrue('Secret information' in contents, contents)
469+ self.assertFalse('nice to pwn you' in contents, contents)
470+
471+ def _check_auth(self, mech):
472+ '''Check AUTH: side effect-- self.s is set'''
473+ try:
474+ self.s.quit()
475+ except:
476+ pass
477+ self.s = smtplib.SMTP('localhost', port=2525)
478+
479+ self._is_listening()
480+
481+ # has mech
482+ code, msg = self.s.ehlo()
483+ reply = '%d %s' % (code, msg)
484+ self.assertEquals(code, 250, reply)
485+ self.assertEquals(self.s.does_esmtp, 1, reply)
486+ self.assertTrue('%s' % mech in self.s.ehlo_resp, reply)
487+ return reply
488+
489+ def test_20_sasldb_cram_md5(self):
490+ '''Test sasldb CRAM-MD5'''
491+ # Quit the setUp() connection, restart the server and reconnect
492+ self.s.quit()
493+ self._setup_sasl("CRAM-MD5")
494+
495+ reply = self._check_auth("CRAM-MD5")
496+ self.assertTrue('PLAIN' not in reply, reply)
497+
498+ # Verify rejected bad password and user
499+ self.assertRaises(smtplib.SMTPAuthenticationError, self.s.login, 'root', 'crapcrapcrap')
500+ self.assertRaises(smtplib.SMTPAuthenticationError, self.s.login, self.user.login, 'crapcrapcrap')
501+
502+ # Perform end-to-end authentication test
503+ self._test_roundtrip_mail(self.user, auth_user=self.user.login, auth_pass=self.user.password)
504+
505+ def test_20_sasldb_digest_md5(self):
506+ '''Test sasldb DIGEST-MD5 is supported'''
507+ # Quit the setUp() connection, restart the server and reconnect
508+ self.s.quit()
509+ self._setup_sasl("DIGEST-MD5")
510+
511+ reply = self._check_auth("DIGEST-MD5")
512+ self.assertTrue('PLAIN' not in reply, reply)
513+
514+ # TODO: Perform end-to-end authentication test (need alternative to smtplib)
515+ #self.assertRaises(smtplib.SMTPAuthenticationError, self.s.login, 'root', 'crapcrapcrap')
516+ #self.assertRaises(smtplib.SMTPAuthenticationError, self.s.login, self.user.login, 'crapcrapcrap')
517+ #self._test_roundtrip_mail(self.user, auth_user=self.user.login, auth_pass=self.user.password)
518+
519+ def test_20_sasldb_login(self):
520+ '''Test sasldb LOGIN is supported'''
521+ # Quit the setUp() connection, restart the server and reconnect
522+ self.s.quit()
523+ self._setup_sasl("LOGIN", force_sasldb=True)
524+
525+ reply = self._check_auth("LOGIN")
526+ self.assertTrue('PLAIN' not in reply, reply)
527+
528+ # TODO: Perform end-to-end authentication test (need alternative to smtplib)
529+ #self.assertRaises(smtplib.SMTPAuthenticationError, self.s.login, 'root', 'crapcrapcrap')
530+ #self.assertRaises(smtplib.SMTPAuthenticationError, self.s.login, self.user.login, 'crapcrapcrap')
531+ #self._test_roundtrip_mail(self.user, auth_user=self.user.login, auth_pass=self.user.password)
532+
533+ def test_20_sasldb_plain(self):
534+ '''Test sasldb PLAIN'''
535+ # Quit the setUp() connection, restart the server and reconnect
536+ self.s.quit()
537+ self._setup_sasl("PLAIN", force_sasldb=True)
538+
539+ reply = self._check_auth("PLAIN")
540+
541+ # Verify rejected bad password and user
542+ self.assertRaises(smtplib.SMTPAuthenticationError, self.s.login, 'root', 'crapcrapcrap')
543+ self.assertRaises(smtplib.SMTPAuthenticationError, self.s.login, self.user.login, 'crapcrapcrap')
544+ # TODO: Perform end-to-end authentication test (need alternative to smtplib)
545+ self._test_roundtrip_mail(self.user, auth_user=self.user.login, auth_pass=self.user.password)
546+
547+ def test_21_security_CVE_2011_1720(self):
548+ '''CVE-2011-1720 fixed'''
549+ # http://www.postfix.org/CVE-2011-1720.html
550+
551+ # setup sasl and connect
552+ self.s.quit()
553+ self._setup_sasl("CRAM-MD5", "DIGEST-MD5")
554+
555+ # verify sasl support
556+ rc, report = testlib.cmd(['postconf', 'smtpd_sasl_auth_enable'])
557+ expected = 0
558+ result = 'Got exit code %d, expected %d\n' % (rc, expected)
559+ self.assertEquals(expected, rc, result + report)
560+ self.assertTrue('yes' in report, "Could not find 'yes' in report:\n%s" % report)
561+
562+ if self.lsb_release['Release'] > 6.06:
563+ rc, report = testlib.cmd(['postconf', 'smtpd_sasl_type'])
564+ expected = 0
565+ result = 'Got exit code %d, expected %d\n' % (rc, expected)
566+ self.assertEquals(expected, rc, result + report)
567+ self.assertTrue('cyrus' in report, "Could not find 'cyrus' in report:\n%s" % report)
568+
569+ # ehlo
570+ reply = self._check_auth("CRAM-MD5")
571+ self.assertTrue('DIGEST-MD5' in reply, reply)
572+
573+ code, msg = self.s.docmd("AUTH", "CRAM-MD5")
574+ reply = '%d %s' % (code, msg)
575+ self.assertEquals(code, 334, reply)
576+
577+ code, msg = self.s.docmd("*")
578+ reply = '%d %s' % (code, msg)
579+ self.assertEquals(code, 501, reply)
580+
581+ error = False
582+ try:
583+ code, msg = self.s.docmd("AUTH", "DIGEST-MD5")
584+ except:
585+ error = True
586+ self.assertFalse(error, "server disconnected")
587+ reply = '%d %s' % (code, msg)
588+ self.assertEquals(code, 334, reply)
589+
590+ def test_99_restore(self):
591+ '''Restore configuration'''
592+ self._tearDown()
593+
594+if __name__ == '__main__':
595+ unittest.main()
596
597=== added file 'debian/tests/testlib.py'
598--- debian/tests/testlib.py 1970-01-01 00:00:00 +0000
599+++ debian/tests/testlib.py 2013-05-07 21:42:30 +0000
600@@ -0,0 +1,1144 @@
601+#
602+# testlib.py quality assurance test script
603+# Copyright (C) 2008-2011 Canonical Ltd.
604+#
605+# This library is free software; you can redistribute it and/or
606+# modify it under the terms of the GNU Library General Public
607+# License as published by the Free Software Foundation; either
608+# version 2 of the License.
609+#
610+# This library is distributed in the hope that it will be useful,
611+# but WITHOUT ANY WARRANTY; without even the implied warranty of
612+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
613+# Library General Public License for more details.
614+#
615+# You should have received a copy of the GNU Library General Public
616+# License along with this program. If not, see
617+# <http://www.gnu.org/licenses/>.
618+#
619+
620+'''Common classes and functions for package tests.'''
621+
622+import string, random, crypt, subprocess, pwd, grp, signal, time, unittest, tempfile, shutil, os, os.path, re, glob
623+import sys, socket, gzip
624+from stat import *
625+from encodings import string_escape
626+
627+import warnings
628+warnings.filterwarnings('ignore', message=r'.*apt_pkg\.TagFile.*', category=DeprecationWarning)
629+try:
630+ import apt_pkg
631+ apt_pkg.InitSystem();
632+except:
633+ # On non-Debian system, fall back to simple comparison without debianisms
634+ class apt_pkg(object):
635+ def VersionCompare(one, two):
636+ list_one = one.split('.')
637+ list_two = two.split('.')
638+ while len(list_one)>0 and len(list_two)>0:
639+ if list_one[0] > list_two[0]:
640+ return 1
641+ if list_one[0] < list_two[0]:
642+ return -1
643+ list_one.pop(0)
644+ list_two.pop(0)
645+ return 0
646+
647+bogus_nxdomain = "208.69.32.132"
648+
649+# http://www.chiark.greenend.org.uk/ucgi/~cjwatson/blosxom/2009-07-02-python-sigpipe.html
650+# This is needed so that the subprocesses that produce endless output
651+# actually quit when the reader goes away.
652+import signal
653+def subprocess_setup():
654+ # Python installs a SIGPIPE handler by default. This is usually not what
655+ # non-Python subprocesses expect.
656+ signal.signal(signal.SIGPIPE, signal.SIG_DFL)
657+
658+class TimedOutException(Exception):
659+ def __init__(self, value = "Timed Out"):
660+ self.value = value
661+ def __str__(self):
662+ return repr(self.value)
663+
664+def _restore_backup(path):
665+ pathbackup = path + '.autotest'
666+ if os.path.exists(pathbackup):
667+ shutil.move(pathbackup, path)
668+
669+def _save_backup(path):
670+ pathbackup = path + '.autotest'
671+ if os.path.exists(path) and not os.path.exists(pathbackup):
672+ shutil.copy2(path, pathbackup)
673+ # copy2 does not copy ownership, so do it here.
674+ # Reference: http://docs.python.org/library/shutil.html
675+ a = os.stat(path)
676+ os.chown(pathbackup, a[4], a[5])
677+
678+def config_copydir(path):
679+ if os.path.exists(path) and not os.path.isdir(path):
680+ raise OSError, "'%s' is not a directory" % (path)
681+ _restore_backup(path)
682+
683+ pathbackup = path + '.autotest'
684+ if os.path.exists(path):
685+ shutil.copytree(path, pathbackup, symlinks=True)
686+
687+def config_replace(path,contents,append=False):
688+ '''Replace (or append) to a config file'''
689+ _restore_backup(path)
690+ if os.path.exists(path):
691+ _save_backup(path)
692+ if append:
693+ contents = file(path).read() + contents
694+ open(path, 'w').write(contents)
695+
696+def config_comment(path, field):
697+ _save_backup(path)
698+ contents = ""
699+ for line in file(path):
700+ if re.search("^\s*%s\s*=" % (field), line):
701+ line = "#" + line
702+ contents += line
703+
704+ open(path+'.new', 'w').write(contents)
705+ os.rename(path+'.new', path)
706+
707+def config_set(path, field, value, spaces=True):
708+ _save_backup(path)
709+ contents = ""
710+ if spaces==True:
711+ setting = '%s = %s\n' % (field, value)
712+ else:
713+ setting = '%s=%s\n' % (field, value)
714+ found = False
715+ for line in file(path):
716+ if re.search("^\s*%s\s*=" % (field), line):
717+ found = True
718+ line = setting
719+ contents += line
720+ if not found:
721+ contents += setting
722+
723+ open(path+'.new', 'w').write(contents)
724+ os.rename(path+'.new', path)
725+
726+def config_patch(path, patch, depth=1):
727+ '''Patch a config file'''
728+ _restore_backup(path)
729+ _save_backup(path)
730+
731+ handle, name = mkstemp_fill(patch)
732+ rc = subprocess.call(['/usr/bin/patch', '-p%s' %(depth), path], stdin=handle, stdout=subprocess.PIPE)
733+ os.unlink(name)
734+ if rc != 0:
735+ raise Exception("Patch failed")
736+
737+def config_restore(path):
738+ '''Rename a replaced config file back to its initial state'''
739+ _restore_backup(path)
740+
741+def timeout(secs, f, *args):
742+ def handler(signum, frame):
743+ raise TimedOutException()
744+
745+ old = signal.signal(signal.SIGALRM, handler)
746+ result = None
747+ signal.alarm(secs)
748+ try:
749+ result = f(*args)
750+ finally:
751+ signal.alarm(0)
752+ signal.signal(signal.SIGALRM, old)
753+
754+ return result
755+
756+def require_nonroot():
757+ if os.geteuid() == 0:
758+ print >>sys.stderr, "This series of tests should be run as a regular user with sudo access, not as root."
759+ sys.exit(1)
760+
761+def require_root():
762+ if os.geteuid() != 0:
763+ print >>sys.stderr, "This series of tests should be run with root privileges (e.g. via sudo)."
764+ sys.exit(1)
765+
766+def require_sudo():
767+ if os.geteuid() != 0 or os.environ.get('SUDO_USER', None) == None:
768+ print >>sys.stderr, "This series of tests must be run under sudo."
769+ sys.exit(1)
770+ if os.environ['SUDO_USER'] == 'root':
771+ print >>sys.stderr, 'Please run this test using sudo from a regular user. (You ran sudo from root.)'
772+ sys.exit(1)
773+
774+def random_string(length,lower=False):
775+ '''Return a random string, consisting of ASCII letters, with given
776+ length.'''
777+
778+ s = ''
779+ selection = string.letters
780+ if lower:
781+ selection = string.lowercase
782+ maxind = len(selection)-1
783+ for l in range(length):
784+ s += selection[random.randint(0, maxind)]
785+ return s
786+
787+def mkstemp_fill(contents,suffix='',prefix='testlib-',dir=None):
788+ '''As tempfile.mkstemp does, return a (file, name) pair, but with
789+ prefilled contents.'''
790+
791+ handle, name = tempfile.mkstemp(suffix=suffix,prefix=prefix,dir=dir)
792+ os.close(handle)
793+ handle = file(name,"w+")
794+ handle.write(contents)
795+ handle.flush()
796+ handle.seek(0)
797+
798+ return handle, name
799+
800+def create_fill(path, contents, mode=0644):
801+ '''Safely create a page'''
802+ # make the temp file in the same dir as the destination file so we
803+ # don't get invalid cross-device link errors when we rename
804+ handle, name = mkstemp_fill(contents, dir=os.path.dirname(path))
805+ handle.close()
806+ os.rename(name, path)
807+ os.chmod(path, mode)
808+
809+def login_exists(login):
810+ '''Checks whether the given login exists on the system.'''
811+
812+ try:
813+ pwd.getpwnam(login)
814+ return True
815+ except KeyError:
816+ return False
817+
818+def group_exists(group):
819+ '''Checks whether the given login exists on the system.'''
820+
821+ try:
822+ grp.getgrnam(group)
823+ return True
824+ except KeyError:
825+ return False
826+
827+def recursive_rm(dirPath, contents_only=False):
828+ '''recursively remove directory'''
829+ names = os.listdir(dirPath)
830+ for name in names:
831+ path = os.path.join(dirPath, name)
832+ if os.path.islink(path) or not os.path.isdir(path):
833+ os.unlink(path)
834+ else:
835+ recursive_rm(path)
836+ if contents_only == False:
837+ os.rmdir(dirPath)
838+
839+def check_pidfile(exe, pidfile):
840+ '''Checks if pid in pidfile is running'''
841+ if not os.path.exists(pidfile):
842+ return False
843+
844+ # get the pid
845+ try:
846+ fd = open(pidfile, 'r')
847+ pid = fd.readline().rstrip('\n')
848+ fd.close()
849+ except:
850+ return False
851+
852+ return check_pid(exe, pid)
853+
854+def check_pid(exe, pid):
855+ '''Checks if pid is running'''
856+ cmdline = "/proc/%s/cmdline" % (str(pid))
857+ if not os.path.exists(cmdline):
858+ return False
859+
860+ # get the command line
861+ try:
862+ fd = open(cmdline, 'r')
863+ tmp = fd.readline().split('\0')
864+ fd.close()
865+ except:
866+ return False
867+
868+ # this allows us to match absolute paths or just the executable name
869+ if re.match('^' + exe + '$', tmp[0]) or \
870+ re.match('.*/' + exe + '$', tmp[0]) or \
871+ re.match('^' + exe + ': ', tmp[0]) or \
872+ re.match('^\(' + exe + '\)', tmp[0]):
873+ return True
874+
875+ return False
876+
877+def check_port(port, proto, ver=4):
878+ '''Check if something is listening on the specified port.
879+ WARNING: for some reason this does not work with a bind mounted /proc
880+ '''
881+ assert (port >= 1)
882+ assert (port <= 65535)
883+ assert (proto.lower() == "tcp" or proto.lower() == "udp")
884+ assert (ver == 4 or ver == 6)
885+
886+ fn = "/proc/net/%s" % (proto)
887+ if ver == 6:
888+ fn += str(ver)
889+
890+ rc, report = cmd(['cat', fn])
891+ assert (rc == 0)
892+
893+ hport = "%0.4x" % port
894+
895+ if re.search(': [0-9a-f]{8}:%s [0-9a-f]' % str(hport).lower(), report.lower()):
896+ return True
897+ return False
898+
899+def get_arch():
900+ '''Get the current architecture'''
901+ rc, report = cmd(['uname', '-m'])
902+ assert (rc == 0)
903+ return report.strip()
904+
905+def get_memory():
906+ '''Gets total ram and swap'''
907+ meminfo = "/proc/meminfo"
908+ memtotal = 0
909+ swaptotal = 0
910+ if not os.path.exists(meminfo):
911+ return (False, False)
912+
913+ try:
914+ fd = open(meminfo, 'r')
915+ for line in fd.readlines():
916+ splitline = line.split()
917+ if splitline[0] == 'MemTotal:':
918+ memtotal = int(splitline[1])
919+ elif splitline[0] == 'SwapTotal:':
920+ swaptotal = int(splitline[1])
921+ fd.close()
922+ except:
923+ return (False, False)
924+
925+ return (memtotal,swaptotal)
926+
927+def is_running_in_vm():
928+ '''Check if running under a VM'''
929+ # add other virtualization environments here
930+ for search in ['QEMU Virtual CPU']:
931+ rc, report = cmd_pipe(['dmesg'], ['grep', search])
932+ if rc == 0:
933+ return True
934+ return False
935+
936+def ubuntu_release():
937+ '''Get the Ubuntu release'''
938+ f = "/etc/lsb-release"
939+ try:
940+ size = os.stat(f)[ST_SIZE]
941+ except:
942+ return "UNKNOWN"
943+
944+ if size > 1024*1024:
945+ raise IOError, 'Could not open "%s" (too big)' % f
946+
947+ try:
948+ fh = open("/etc/lsb-release", 'r')
949+ except:
950+ raise
951+
952+ lines = fh.readlines()
953+ fh.close()
954+
955+ pat = re.compile(r'DISTRIB_CODENAME')
956+ for line in lines:
957+ if pat.search(line):
958+ return line.split('=')[1].rstrip('\n').rstrip('\r')
959+
960+ return "UNKNOWN"
961+
962+def cmd(command, input = None, stderr = subprocess.STDOUT, stdout = subprocess.PIPE, stdin = None, timeout = None):
963+ '''Try to execute given command (array) and return its stdout, or return
964+ a textual error if it failed.'''
965+
966+ try:
967+ sp = subprocess.Popen(command, stdin=stdin, stdout=stdout, stderr=stderr, close_fds=True, preexec_fn=subprocess_setup)
968+ except OSError, e:
969+ return [127, str(e)]
970+
971+ out, outerr = sp.communicate(input)
972+ # Handle redirection of stdout
973+ if out == None:
974+ out = ''
975+ # Handle redirection of stderr
976+ if outerr == None:
977+ outerr = ''
978+ return [sp.returncode,out+outerr]
979+
980+def cmd_pipe(command1, command2, input = None, stderr = subprocess.STDOUT, stdin = None):
981+ '''Try to pipe command1 into command2.'''
982+ try:
983+ sp1 = subprocess.Popen(command1, stdin=stdin, stdout=subprocess.PIPE, stderr=stderr, close_fds=True)
984+ sp2 = subprocess.Popen(command2, stdin=sp1.stdout, stdout=subprocess.PIPE, stderr=stderr, close_fds=True)
985+ except OSError, e:
986+ return [127, str(e)]
987+
988+ out = sp2.communicate(input)[0]
989+ return [sp2.returncode,out]
990+
991+def cwd_has_enough_space(cdir, total_bytes):
992+ '''Determine if the partition of the current working directory has 'bytes'
993+ free.'''
994+ rc, df_output = cmd(['df'])
995+ result = 'Got exit code %d, expected %d\n' % (rc, 0)
996+ if rc != 0:
997+ return False
998+
999+ kb = total_bytes / 1024
1000+
1001+ mounts = dict()
1002+ for line in df_output.splitlines():
1003+ if '/' not in line:
1004+ continue
1005+ tmp = line.split()
1006+ mounts[tmp[5]] = int(tmp[3])
1007+
1008+ cdir = os.getcwd()
1009+ while cdir != '/':
1010+ if not mounts.has_key(cdir):
1011+ cdir = os.path.dirname(cdir)
1012+ continue
1013+ if kb < mounts[cdir]:
1014+ return True
1015+ else:
1016+ return False
1017+
1018+ if kb < mounts['/']:
1019+ return True
1020+
1021+ return False
1022+
1023+def get_md5(filename):
1024+ '''Gets the md5sum of the file specified'''
1025+
1026+ (rc, report) = cmd(["/usr/bin/md5sum", "-b", filename])
1027+ expected = 0
1028+ assert (expected == rc)
1029+
1030+ return report.split(' ')[0]
1031+
1032+def dpkg_compare_installed_version(pkg, check, version):
1033+ '''Gets the version for the installed package, and compares it to the
1034+ specified version.
1035+ '''
1036+ (rc, report) = cmd(["/usr/bin/dpkg", "-s", pkg])
1037+ assert (rc == 0)
1038+ assert ("Status: install ok installed" in report)
1039+ installed_version = ""
1040+ for line in report.splitlines():
1041+ if line.startswith("Version: "):
1042+ installed_version = line.split()[1]
1043+
1044+ assert (installed_version != "")
1045+
1046+ (rc, report) = cmd(["/usr/bin/dpkg", "--compare-versions", installed_version, check, version])
1047+ assert (rc == 0 or rc == 1)
1048+ if rc == 0:
1049+ return True
1050+ return False
1051+
1052+def prepare_source(source, builder, cached_src, build_src, patch_system):
1053+ '''Download and unpack source package, installing necessary build depends,
1054+ adjusting the permissions for the 'builder' user, and returning the
1055+ directory of the unpacked source. Patch system can be one of:
1056+ - cdbs
1057+ - dpatch
1058+ - quilt
1059+ - quiltv3
1060+ - None (not the string)
1061+
1062+ This is normally used like this:
1063+
1064+ def setUp(self):
1065+ ...
1066+ self.topdir = os.getcwd()
1067+ self.cached_src = os.path.join(os.getcwd(), "source")
1068+ self.tmpdir = tempfile.mkdtemp(prefix='testlib', dir='/tmp')
1069+ self.builder = testlib.TestUser()
1070+ testlib.cmd(['chgrp', self.builder.login, self.tmpdir])
1071+ os.chmod(self.tmpdir, 0775)
1072+
1073+ def tearDown(self):
1074+ ...
1075+ self.builder = None
1076+ self.topdir = os.getcwd()
1077+ if os.path.exists(self.tmpdir):
1078+ testlib.recursive_rm(self.tmpdir)
1079+
1080+ def test_suite_build(self):
1081+ ...
1082+ build_dir = testlib.prepare_source('foo', \
1083+ self.builder, \
1084+ self.cached_src, \
1085+ os.path.join(self.tmpdir, \
1086+ os.path.basename(self.cached_src)),
1087+ "quilt")
1088+ os.chdir(build_dir)
1089+
1090+ # Example for typical build, adjust as necessary
1091+ print ""
1092+ print " make clean"
1093+ rc, report = testlib.cmd(['sudo', '-u', self.builder.login, 'make', 'clean'])
1094+
1095+ print " configure"
1096+ rc, report = testlib.cmd(['sudo', '-u', self.builder.login, './configure', '--prefix=%s' % self.tmpdir, '--enable-debug'])
1097+
1098+ print " make (will take a while)"
1099+ rc, report = testlib.cmd(['sudo', '-u', self.builder.login, 'make'])
1100+
1101+ print " make check (will take a while)",
1102+ rc, report = testlib.cmd(['sudo', '-u', self.builder.login, 'make', 'check'])
1103+ expected = 0
1104+ result = 'Got exit code %d, expected %d\n' % (rc, expected)
1105+ self.assertEquals(expected, rc, result + report)
1106+
1107+ def test_suite_cleanup(self):
1108+ ...
1109+ if os.path.exists(self.cached_src):
1110+ testlib.recursive_rm(self.cached_src)
1111+
1112+ It is up to the caller to clean up cached_src and build_src (as in the
1113+ above example, often the build_src is in a tmpdir that is cleaned in
1114+ tearDown() and the cached_src is cleaned in a one time clean-up
1115+ operation (eg 'test_suite_cleanup()) which must be run after the build
1116+ suite test (obviously).
1117+ '''
1118+
1119+ # Make sure we have a clean slate
1120+ assert (os.path.exists(os.path.dirname(build_src)))
1121+ assert (not os.path.exists(build_src))
1122+
1123+ cdir = os.getcwd()
1124+ if os.path.exists(cached_src):
1125+ shutil.copytree(cached_src, build_src)
1126+ os.chdir(build_src)
1127+ else:
1128+ # Only install the build dependencies on the initial setup
1129+ rc, report = cmd(['apt-get','-y','--force-yes','build-dep',source])
1130+ assert (rc == 0)
1131+
1132+ os.makedirs(build_src)
1133+ os.chdir(build_src)
1134+
1135+ # These are always needed
1136+ pkgs = ['build-essential', 'dpkg-dev', 'fakeroot']
1137+ rc, report = cmd(['apt-get','-y','--force-yes','install'] + pkgs)
1138+ assert (rc == 0)
1139+
1140+ rc, report = cmd(['apt-get','source',source])
1141+ assert (rc == 0)
1142+ shutil.copytree(build_src, cached_src)
1143+
1144+ unpacked_dir = os.path.join(build_src, glob.glob('%s-*' % source)[0])
1145+
1146+ # Now apply the patches. Do it here so that we don't mess up our cached
1147+ # sources.
1148+ os.chdir(unpacked_dir)
1149+ assert (patch_system in ['cdbs', 'dpatch', 'quilt', 'quiltv3', None])
1150+ if patch_system != None and patch_system != "quiltv3":
1151+ if patch_system == "quilt":
1152+ os.environ.setdefault('QUILT_PATCHES','debian/patches')
1153+ rc, report = cmd(['quilt', 'push', '-a'])
1154+ assert (rc == 0)
1155+ elif patch_system == "cdbs":
1156+ rc, report = cmd(['./debian/rules', 'apply-patches'])
1157+ assert (rc == 0)
1158+ elif patch_system == "dpatch":
1159+ rc, report = cmd(['dpatch', 'apply-all'])
1160+ assert (rc == 0)
1161+
1162+ cmd(['chown', '-R', '%s:%s' % (builder.uid, builder.gid), build_src])
1163+ os.chdir(cdir)
1164+
1165+ return unpacked_dir
1166+
1167+def _aa_status():
1168+ '''Get aa-status output'''
1169+ exe = "/usr/sbin/aa-status"
1170+ assert (os.path.exists(exe))
1171+ if os.geteuid() == 0:
1172+ return cmd([exe])
1173+ return cmd(['sudo', exe])
1174+
1175+def is_apparmor_loaded(path):
1176+ '''Check if profile is loaded'''
1177+ rc, report = _aa_status()
1178+ if rc != 0:
1179+ return False
1180+
1181+ for line in report.splitlines():
1182+ if line.endswith(path):
1183+ return True
1184+ return False
1185+
1186+def is_apparmor_confined(path):
1187+ '''Check if application is confined'''
1188+ rc, report = _aa_status()
1189+ if rc != 0:
1190+ return False
1191+
1192+ for line in report.splitlines():
1193+ if re.search('%s \(' % path, line):
1194+ return True
1195+ return False
1196+
1197+def check_apparmor(path, first_ubuntu_release, is_running=True):
1198+ '''Check if path is loaded and confined for everything higher than the
1199+ first Ubuntu release specified.
1200+
1201+ Usage:
1202+ rc, report = testlib.check_apparmor('/usr/sbin/foo', 8.04, is_running=True)
1203+ if rc < 0:
1204+ return self._skipped(report)
1205+
1206+ expected = 0
1207+ result = 'Got exit code %d, expected %d\n' % (rc, expected)
1208+ self.assertEquals(expected, rc, result + report)
1209+ '''
1210+ global manager
1211+ rc = -1
1212+
1213+ if manager.lsb_release["Release"] < first_ubuntu_release:
1214+ return (rc, "Skipped apparmor check")
1215+
1216+ if not os.path.exists('/sbin/apparmor_parser'):
1217+ return (rc, "Skipped (couldn't find apparmor_parser)")
1218+
1219+ rc = 0
1220+ msg = ""
1221+ if not is_apparmor_loaded(path):
1222+ rc = 1
1223+ msg = "Profile not loaded for '%s'" % path
1224+
1225+ # this check only makes sense it the 'path' is currently executing
1226+ if is_running and rc == 0 and not is_apparmor_confined(path):
1227+ rc = 1
1228+ msg = "'%s' is not running in enforce mode" % path
1229+
1230+ return (rc, msg)
1231+
1232+def get_gcc_version(gcc, full=True):
1233+ gcc_version = 'none'
1234+ if not gcc.startswith('/'):
1235+ gcc = '/usr/bin/%s' % (gcc)
1236+ if os.path.exists(gcc):
1237+ gcc_version = 'unknown'
1238+ lines = cmd([gcc,'-v'])[1].strip().splitlines()
1239+ version_lines = [x for x in lines if x.startswith('gcc version')]
1240+ if len(version_lines) == 1:
1241+ gcc_version = " ".join(version_lines[0].split()[2:])
1242+ if not full:
1243+ return gcc_version.split()[0]
1244+ return gcc_version
1245+
1246+def is_kdeinit_running():
1247+ '''Test if kdeinit is running'''
1248+ # applications that use kdeinit will spawn it if it isn't running in the
1249+ # test. This is a problem because it does not exit. This is a helper to
1250+ # check for it.
1251+ rc, report = cmd(['ps', 'x'])
1252+ if 'kdeinit4 Running' not in report:
1253+ print >>sys.stderr, ("kdeinit not running (you may start/stop any KDE application then run this script again)")
1254+ return False
1255+ return True
1256+
1257+def get_pkgconfig_flags(libs=[]):
1258+ '''Find pkg-config flags for libraries'''
1259+ assert (len(libs) > 0)
1260+ rc, pkg_config = cmd(['pkg-config', '--cflags', '--libs'] + libs)
1261+ expected = 0
1262+ if rc != expected:
1263+ print >>sys.stderr, 'Got exit code %d, expected %d\n' % (rc, expected)
1264+ assert(rc == expected)
1265+ return pkg_config.split()
1266+
1267+class TestDaemon:
1268+ '''Helper class to manage daemons consistently'''
1269+ def __init__(self, init):
1270+ '''Setup daemon attributes'''
1271+ self.initscript = init
1272+
1273+ def start(self):
1274+ '''Start daemon'''
1275+ rc, report = cmd([self.initscript, 'start'])
1276+ expected = 0
1277+ result = 'Got exit code %d, expected %d\n' % (rc, expected)
1278+ time.sleep(2)
1279+ if expected != rc:
1280+ return (False, result + report)
1281+
1282+ if "fail" in report:
1283+ return (False, "Found 'fail' in report\n" + report)
1284+
1285+ return (True, "")
1286+
1287+ def stop(self):
1288+ '''Stop daemon'''
1289+ rc, report = cmd([self.initscript, 'stop'])
1290+ expected = 0
1291+ result = 'Got exit code %d, expected %d\n' % (rc, expected)
1292+ if expected != rc:
1293+ return (False, result + report)
1294+
1295+ if "fail" in report:
1296+ return (False, "Found 'fail' in report\n" + report)
1297+
1298+ return (True, "")
1299+
1300+ def reload(self):
1301+ '''Reload daemon'''
1302+ rc, report = cmd([self.initscript, 'force-reload'])
1303+ expected = 0
1304+ result = 'Got exit code %d, expected %d\n' % (rc, expected)
1305+ if expected != rc:
1306+ return (False, result + report)
1307+
1308+ if "fail" in report:
1309+ return (False, "Found 'fail' in report\n" + report)
1310+
1311+ return (True, "")
1312+
1313+ def restart(self):
1314+ '''Restart daemon'''
1315+ (res, str) = self.stop()
1316+ if not res:
1317+ return (res, str)
1318+
1319+ (res, str) = self.start()
1320+ if not res:
1321+ return (res, str)
1322+
1323+ return (True, "")
1324+
1325+ def status(self):
1326+ '''Check daemon status'''
1327+ rc, report = cmd([self.initscript, 'status'])
1328+ expected = 0
1329+ result = 'Got exit code %d, expected %d\n' % (rc, expected)
1330+ if expected != rc:
1331+ return (False, result + report)
1332+
1333+ if "fail" in report:
1334+ return (False, "Found 'fail' in report\n" + report)
1335+
1336+ return (True, "")
1337+
1338+class TestlibManager(object):
1339+ '''Singleton class used to set up per-test-run information'''
1340+ def __init__(self):
1341+ # Set glibc aborts to dump to stderr instead of the tty so test output
1342+ # is more sane.
1343+ os.environ.setdefault('LIBC_FATAL_STDERR_','1')
1344+
1345+ # check verbosity
1346+ self.verbosity = False
1347+ if (len(sys.argv) > 1 and '-v' in sys.argv[1:]):
1348+ self.verbosity = True
1349+
1350+ # Load LSB release file
1351+ self.lsb_release = dict()
1352+ if not os.path.exists('/usr/bin/lsb_release') and not os.path.exists('/bin/lsb_release'):
1353+ raise OSError, "Please install 'lsb-release'"
1354+ for line in subprocess.Popen(['lsb_release','-a'],stdout=subprocess.PIPE,stderr=subprocess.PIPE).communicate()[0].splitlines():
1355+ field, value = line.split(':',1)
1356+ value=value.strip()
1357+ field=field.strip()
1358+ # Convert numerics
1359+ try:
1360+ value = float(value)
1361+ except:
1362+ pass
1363+ self.lsb_release.setdefault(field,value)
1364+
1365+ # FIXME: hack OEM releases into known-Ubuntu versions
1366+ if self.lsb_release['Distributor ID'] == "HP MIE (Mobile Internet Experience)":
1367+ if self.lsb_release['Release'] == 1.0:
1368+ self.lsb_release['Distributor ID'] = "Ubuntu"
1369+ self.lsb_release['Release'] = 8.04
1370+ else:
1371+ raise OSError, "Unknown version of HP MIE"
1372+
1373+ # FIXME: hack to assume a most-recent release if we're not
1374+ # running under Ubuntu.
1375+ if self.lsb_release['Distributor ID'] not in ["Ubuntu","Linaro"]:
1376+ self.lsb_release['Release'] = 10000
1377+ # Adjust Linaro release to pretend to be Ubuntu
1378+ if self.lsb_release['Distributor ID'] in ["Linaro"]:
1379+ self.lsb_release['Distributor ID'] = "Ubuntu"
1380+ self.lsb_release['Release'] -= 0.01
1381+
1382+ # Load arch
1383+ if not os.path.exists('/usr/bin/dpkg'):
1384+ machine = cmd(['uname','-m'])[1].strip()
1385+ if machine.endswith('86'):
1386+ self.dpkg_arch = 'i386'
1387+ elif machine.endswith('_64'):
1388+ self.dpkg_arch = 'amd64'
1389+ elif machine.startswith('arm'):
1390+ self.dpkg_arch = 'armel'
1391+ else:
1392+ raise ValueError, "Unknown machine type '%s'" % (machine)
1393+ else:
1394+ self.dpkg_arch = cmd(['dpkg','--print-architecture'])[1].strip()
1395+
1396+ # Find kernel version
1397+ self.kernel_is_ubuntu = False
1398+ self.kernel_version_signature = None
1399+ self.kernel_version = cmd(["uname","-r"])[1].strip()
1400+ versig = '/proc/version_signature'
1401+ if os.path.exists(versig):
1402+ self.kernel_is_ubuntu = True
1403+ self.kernel_version_signature = file(versig).read().strip()
1404+ self.kernel_version_ubuntu = self.kernel_version
1405+ elif os.path.exists('/usr/bin/dpkg'):
1406+ # this can easily be inaccurate but is only an issue for Dapper
1407+ rc, out = cmd(['dpkg','-l','linux-image-%s' % (self.kernel_version)])
1408+ if rc == 0:
1409+ self.kernel_version_signature = out.strip().split('\n').pop().split()[2]
1410+ self.kernel_version_ubuntu = self.kernel_version_signature
1411+ if self.kernel_version_signature == None:
1412+ # Attempt to fall back to something for non-Debian-based
1413+ self.kernel_version_signature = self.kernel_version
1414+ self.kernel_version_ubuntu = self.kernel_version
1415+ # Build ubuntu version without hardware suffix
1416+ try:
1417+ self.kernel_version_ubuntu = "-".join([x for x in self.kernel_version_signature.split(' ')[1].split('-') if re.search('^[0-9]', x)])
1418+ except:
1419+ pass
1420+
1421+ # Find gcc version
1422+ self.gcc_version = get_gcc_version('gcc')
1423+
1424+ # Find libc
1425+ self.path_libc = [x.split()[2] for x in cmd(['ldd','/bin/ls'])[1].splitlines() if x.startswith('\tlibc.so.')][0]
1426+
1427+ # Report self
1428+ if self.verbosity:
1429+ kernel = self.kernel_version_ubuntu
1430+ if kernel != self.kernel_version_signature:
1431+ kernel += " (%s)" % (self.kernel_version_signature)
1432+ print >>sys.stdout, "Running test: '%s' distro: '%s %.2f' kernel: '%s' arch: '%s' uid: %d/%d SUDO_USER: '%s')" % ( \
1433+ sys.argv[0],
1434+ self.lsb_release['Distributor ID'],
1435+ self.lsb_release['Release'],
1436+ kernel,
1437+ self.dpkg_arch,
1438+ os.geteuid(), os.getuid(),
1439+ os.environ.get('SUDO_USER', ''))
1440+ sys.stdout.flush()
1441+
1442+ # Additional heuristics
1443+ #if os.environ.get('SUDO_USER', os.environ.get('USER', '')) in ['mdeslaur']:
1444+ # sys.stdout.write("Replying to Marc Deslauriers in http://launchpad.net/bugs/%d: " % random.randint(600000, 980000))
1445+ # sys.stdout.flush()
1446+ # time.sleep(0.5)
1447+ # sys.stdout.write("destroyed\n")
1448+ # time.sleep(0.5)
1449+
1450+ def hello(self, msg):
1451+ print >>sys.stderr, "Hello from %s" % (msg)
1452+# The central instance
1453+manager = TestlibManager()
1454+
1455+class TestlibCase(unittest.TestCase):
1456+ def __init__(self, *args):
1457+ '''This is called for each TestCase test instance, which isn't much better
1458+ than SetUp.'''
1459+
1460+ unittest.TestCase.__init__(self, *args)
1461+
1462+ # Attach to and duplicate dicts from manager singleton
1463+ self.manager = manager
1464+ #self.manager.hello(repr(self) + repr(*args))
1465+ self.my_verbosity = self.manager.verbosity
1466+ self.lsb_release = self.manager.lsb_release
1467+ self.dpkg_arch = self.manager.dpkg_arch
1468+ self.kernel_version = self.manager.kernel_version
1469+ self.kernel_version_signature = self.manager.kernel_version_signature
1470+ self.kernel_version_ubuntu = self.manager.kernel_version_ubuntu
1471+ self.kernel_is_ubuntu = self.manager.kernel_is_ubuntu
1472+ self.gcc_version = self.manager.gcc_version
1473+ self.path_libc = self.manager.path_libc
1474+
1475+ def version_compare(self, one, two):
1476+ return apt_pkg.VersionCompare(one,two)
1477+
1478+ def assertFileType(self, filename, filetype):
1479+ '''Checks the file type of the file specified'''
1480+
1481+ (rc, report, out) = self._testlib_shell_cmd(["/usr/bin/file", "-b", filename])
1482+ out = out.strip()
1483+ expected = 0
1484+ # Absolutely no idea why this happens on Hardy
1485+ if self.lsb_release['Release'] == 8.04 and rc == 255 and len(out) > 0:
1486+ rc = 0
1487+ result = 'Got exit code %d, expected %d:\n%s\n' % (rc, expected, report)
1488+ self.assertEquals(expected, rc, result)
1489+
1490+ filetype = '^%s$' % (filetype)
1491+ result = 'File type reported by file: [%s], expected regex: [%s]\n' % (out, filetype)
1492+ self.assertNotEquals(None, re.search(filetype, out), result)
1493+
1494+ def yank_commonname_from_cert(self, certfile):
1495+ '''Extract the commonName from a given PEM'''
1496+ rc, out = cmd(['openssl','asn1parse','-in',certfile])
1497+ if rc == 0:
1498+ ready = False
1499+ for line in out.splitlines():
1500+ if ready:
1501+ return line.split(':')[-1]
1502+ if ':commonName' in line:
1503+ ready = True
1504+ return socket.getfqdn()
1505+
1506+ def announce(self, text):
1507+ if self.my_verbosity:
1508+ print >>sys.stdout, "(%s) " % (text),
1509+ sys.stdout.flush()
1510+
1511+ def make_clean(self):
1512+ rc, output = self.shell_cmd(['make','clean'])
1513+ self.assertEquals(rc, 0, output)
1514+
1515+ def get_makefile_compiler(self):
1516+ # Find potential compiler name
1517+ compiler = 'gcc'
1518+ if os.path.exists('Makefile'):
1519+ for line in open('Makefile'):
1520+ if line.startswith('CC') and '=' in line:
1521+ items = [x.strip() for x in line.split('=')]
1522+ if items[0] == 'CC':
1523+ compiler = items[1]
1524+ break
1525+ return compiler
1526+
1527+ def make_target(self, target, expected=0):
1528+ '''Compile a target and report output'''
1529+
1530+ compiler = self.get_makefile_compiler()
1531+ rc, output = self.shell_cmd(['make',target])
1532+ self.assertEquals(rc, expected, 'rc(%d)!=%d:\n' % (rc, expected) + output)
1533+ self.assertTrue('%s ' % (compiler) in output, 'Expected "%s":' % (compiler) + output)
1534+ return output
1535+
1536+ # call as return testlib.skipped()
1537+ def _skipped(self, reason=""):
1538+ '''Provide a visible way to indicate that a test was skipped'''
1539+ if reason != "":
1540+ reason = ': %s' % (reason)
1541+ self.announce("skipped%s" % (reason))
1542+ return False
1543+
1544+ def _testlib_shell_cmd(self,args,stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT):
1545+ argstr = "'" + "', '".join(args).strip() + "'"
1546+ rc, out = cmd(args,stdin=stdin,stdout=stdout,stderr=stderr)
1547+ report = 'Command: ' + argstr + '\nOutput:\n' + out
1548+ return rc, report, out
1549+
1550+ def shell_cmd(self, args, stdin=None):
1551+ return cmd(args,stdin=stdin)
1552+
1553+ def assertShellExitEquals(self, expected, args, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, msg=""):
1554+ '''Test a shell command matches a specific exit code'''
1555+ rc, report, out = self._testlib_shell_cmd(args, stdin=stdin, stdout=stdout, stderr=stderr)
1556+ result = 'Got exit code %d, expected %d\n' % (rc, expected)
1557+ self.assertEquals(expected, rc, msg + result + report)
1558+
1559+ def assertShellExitNotEquals(self, unwanted, args, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, msg=""):
1560+ '''Test a shell command doesn't match a specific exit code'''
1561+ rc, report, out = self._testlib_shell_cmd(args, stdin=stdin, stdout=stdout, stderr=stderr)
1562+ result = 'Got (unwanted) exit code %d\n' % rc
1563+ self.assertNotEquals(unwanted, rc, msg + result + report)
1564+
1565+ def assertShellOutputContains(self, text, args, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, msg="", invert=False):
1566+ '''Test a shell command contains a specific output'''
1567+ rc, report, out = self._testlib_shell_cmd(args, stdin=stdin, stdout=stdout, stderr=stderr)
1568+ result = 'Got exit code %d. Looking for text "%s"\n' % (rc, text)
1569+ if not invert:
1570+ self.assertTrue(text in out, msg + result + report)
1571+ else:
1572+ self.assertFalse(text in out, msg + result + report)
1573+
1574+ def assertShellOutputEquals(self, text, args, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, msg="", invert=False, expected=None):
1575+ '''Test a shell command matches a specific output'''
1576+ rc, report, out = self._testlib_shell_cmd(args, stdin=stdin, stdout=stdout, stderr=stderr)
1577+ result = 'Got exit code %d. Looking for exact text "%s" (%s)\n' % (rc, text, " ".join(args))
1578+ if not invert:
1579+ self.assertEquals(text, out, msg + result + report)
1580+ else:
1581+ self.assertNotEquals(text, out, msg + result + report)
1582+ if expected != None:
1583+ result = 'Got exit code %d. Expected %d (%s)\n' % (rc, expected, " ".join(args))
1584+ self.assertEquals(rc, expected, msg + result + report)
1585+
1586+ def _word_find(self, report, content, invert=False):
1587+ '''Check for a specific string'''
1588+ if invert:
1589+ warning = 'Found "%s"\n' % content
1590+ self.assertTrue(content not in report, warning + report)
1591+ else:
1592+ warning = 'Could not find "%s"\n' % content
1593+ self.assertTrue(content in report, warning + report)
1594+
1595+ def _test_sysctl_value(self, path, expected, msg=None, exists=True):
1596+ sysctl = '/proc/sys/%s' % (path)
1597+ self.assertEquals(exists, os.path.exists(sysctl), sysctl)
1598+ value = None
1599+ if exists:
1600+ value = int(file(sysctl).read())
1601+ report = "%s is not %d: %d" % (sysctl, expected, value)
1602+ if msg:
1603+ report += " (%s)" % (msg)
1604+ self.assertEquals(value, expected, report)
1605+ return value
1606+
1607+ def set_sysctl_value(self, path, desired):
1608+ sysctl = '/proc/sys/%s' % (path)
1609+ self.assertTrue(os.path.exists(sysctl),"%s does not exist" % (sysctl))
1610+ file(sysctl,'w').write(str(desired))
1611+ self._test_sysctl_value(path, desired)
1612+
1613+ def kernel_at_least(self, introduced):
1614+ return self.version_compare(self.kernel_version_ubuntu,
1615+ introduced) >= 0
1616+
1617+ def kernel_claims_cve_fixed(self, cve):
1618+ changelog = "/usr/share/doc/linux-image-%s/changelog.Debian.gz" % (self.kernel_version)
1619+ if os.path.exists(changelog):
1620+ for line in gzip.open(changelog):
1621+ if cve in line and not "revert" in line and not "Revert" in line:
1622+ return True
1623+ return False
1624+
1625+class TestGroup:
1626+ '''Create a temporary test group and remove it again in the dtor.'''
1627+
1628+ def __init__(self, group=None, lower=False):
1629+ '''Create a new group'''
1630+
1631+ self.group = None
1632+ if group:
1633+ if group_exists(group):
1634+ raise ValueError, 'group name already exists'
1635+ else:
1636+ while(True):
1637+ group = random_string(7,lower=lower)
1638+ if not group_exists(group):
1639+ break
1640+
1641+ assert subprocess.call(['groupadd',group]) == 0
1642+ self.group = group
1643+ g = grp.getgrnam(self.group)
1644+ self.gid = g[2]
1645+
1646+ def __del__(self):
1647+ '''Remove the created group.'''
1648+
1649+ if self.group:
1650+ rc, report = cmd(['groupdel', self.group])
1651+ assert rc == 0
1652+
1653+class TestUser:
1654+ '''Create a temporary test user and remove it again in the dtor.'''
1655+
1656+ def __init__(self, login=None, home=True, group=None, uidmin=None, lower=False, shell=None):
1657+ '''Create a new user account with a random password.
1658+
1659+ By default, the login name is random, too, but can be explicitly
1660+ specified with 'login'. By default, a home directory is created, this
1661+ can be suppressed with 'home=False'.'''
1662+
1663+ self.login = None
1664+
1665+ if os.geteuid() != 0:
1666+ raise ValueError, "You must be root to run this test"
1667+
1668+ if login:
1669+ if login_exists(login):
1670+ raise ValueError, 'login name already exists'
1671+ else:
1672+ while(True):
1673+ login = 't' + random_string(7,lower=lower)
1674+ if not login_exists(login):
1675+ break
1676+
1677+ self.salt = random_string(2)
1678+ self.password = random_string(8,lower=lower)
1679+ self.crypted = crypt.crypt(self.password, self.salt)
1680+
1681+ creation = ['useradd', '-p', self.crypted]
1682+ if home:
1683+ creation += ['-m']
1684+ if group:
1685+ creation += ['-G',group]
1686+ if uidmin:
1687+ creation += ['-K','UID_MIN=%d'%uidmin]
1688+ if shell:
1689+ creation += ['-s',shell]
1690+ creation += [login]
1691+ assert subprocess.call(creation) == 0
1692+ # Set GECOS
1693+ assert subprocess.call(['usermod','-c','Buddy %s' % (login),login]) == 0
1694+
1695+ self.login = login
1696+ p = pwd.getpwnam(self.login)
1697+ self.uid = p[2]
1698+ self.gid = p[3]
1699+ self.gecos = p[4]
1700+ self.home = p[5]
1701+ self.shell = p[6]
1702+
1703+ def __del__(self):
1704+ '''Remove the created user account.'''
1705+
1706+ if self.login:
1707+ # sanity check the login name so we don't accidentally wipe too much
1708+ if len(self.login)>3 and not '/' in self.login:
1709+ subprocess.call(['rm','-rf', '/home/'+self.login, '/var/mail/'+self.login])
1710+ rc, report = cmd(['userdel', '-f', self.login])
1711+ assert rc == 0
1712+
1713+ def add_to_group(self, group):
1714+ '''Add user to the specified group name'''
1715+ rc, report = cmd(['usermod', '-G', group, self.login])
1716+ if rc != 0:
1717+ print report
1718+ assert rc == 0
1719+
1720+# Timeout handler using alarm() from John P. Speno's Pythonic Avocado
1721+class TimeoutFunctionException(Exception):
1722+ """Exception to raise on a timeout"""
1723+ pass
1724+class TimeoutFunction:
1725+ def __init__(self, function, timeout):
1726+ self.timeout = timeout
1727+ self.function = function
1728+
1729+ def handle_timeout(self, signum, frame):
1730+ raise TimeoutFunctionException()
1731+
1732+ def __call__(self, *args, **kwargs):
1733+ old = signal.signal(signal.SIGALRM, self.handle_timeout)
1734+ signal.alarm(self.timeout)
1735+ try:
1736+ result = self.function(*args, **kwargs)
1737+ finally:
1738+ signal.signal(signal.SIGALRM, old)
1739+ signal.alarm(0)
1740+ return result
1741+
1742+def main():
1743+ print "hi"
1744+ unittest.main()

Subscribers

People subscribed via source and target branches

to all changes: