Merge lp:~yolanda.robla/ubuntu/saucy/postfix/dep-8-tests into lp:ubuntu/saucy/postfix

Proposed by Yolanda Robla
Status: Merged
Merged at revision: 57
Proposed branch: lp:~yolanda.robla/ubuntu/saucy/postfix/dep-8-tests
Merge into: lp:ubuntu/saucy/postfix
Diff against target: 1744 lines (+1705/-0)
6 files modified
debian/changelog (+6/-0)
debian/control (+1/-0)
debian/tests/control (+3/-0)
debian/tests/postfix (+16/-0)
debian/tests/test-postfix.py (+535/-0)
debian/tests/testlib.py (+1144/-0)
To merge this branch: bzr merge lp:~yolanda.robla/ubuntu/saucy/postfix/dep-8-tests
Reviewer Review Type Date Requested Status
Dimitri John Ledkov Approve
James Hunt (community) Approve
Daniel Holbach Needs Fixing
Ubuntu branches Pending
Review via email: mp+161610@code.launchpad.net

Description of the change

Added dep-8 tests

To post a comment you must log in.
Revision history for this message
Daniel Holbach (dholbach) wrote :

Could you add a changelog entry in debian/changelog for the upload? It might also be a good idea to forward the change to Debian.

review: Needs Fixing
57. By Yolanda Robla

d/tests: added dep-8-tests

Revision history for this message
Yolanda Robla (yolanda.robla) wrote :

recheck

Revision history for this message
James Hunt (jamesodhunt) wrote :

LGTM. Has this been submitted to Debian yet?

Revision history for this message
James Hunt (jamesodhunt) wrote :

status update.

review: Approve
Revision history for this message
Dimitri John Ledkov (xnox) wrote :

Had to run "update-maintainer" to modify maintainer/original-maintainer fields in debian/control.
Looks very good otherwise.

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'debian/changelog'
--- debian/changelog 2013-03-22 10:30:29 +0000
+++ debian/changelog 2013-05-07 21:42:30 +0000
@@ -1,3 +1,9 @@
1postfix (2.10.0-3ubuntu1) saucy; urgency=low
2
3 * d/tests: added dep-8-tests
4
5 -- Yolanda <yolanda.robla@canonical.com> Tue, 07 May 2013 23:34:04 +0200
6
1postfix (2.10.0-3) unstable; urgency=low7postfix (2.10.0-3) unstable; urgency=low
28
3 [LaMont Jones]9 [LaMont Jones]
410
=== modified file 'debian/control'
--- debian/control 2013-03-04 09:03:31 +0000
+++ debian/control 2013-05-07 21:42:30 +0000
@@ -7,6 +7,7 @@
7Build-Depends: debhelper (>= 7), po-debconf (>= 0.5.0), groff-base, patch, lsb-release, libdb-dev (>=4.6.19), libldap2-dev (>=2.1), libpcre3-dev, libmysqlclient-dev|libmysqlclient15-dev|libmysqlclient14-dev, libssl-dev (>=0.9.7), libsasl2-dev, libpq-dev, libcdb-dev, hardening-wrapper, dpkg-dev (>= 1.15.5), libsqlite3-dev7Build-Depends: debhelper (>= 7), po-debconf (>= 0.5.0), groff-base, patch, lsb-release, libdb-dev (>=4.6.19), libldap2-dev (>=2.1), libpcre3-dev, libmysqlclient-dev|libmysqlclient15-dev|libmysqlclient14-dev, libssl-dev (>=0.9.7), libsasl2-dev, libpq-dev, libcdb-dev, hardening-wrapper, dpkg-dev (>= 1.15.5), libsqlite3-dev
8Vcs-Browser: http://git.debian.org/?p=users/lamont/postfix.git8Vcs-Browser: http://git.debian.org/?p=users/lamont/postfix.git
9Vcs-Git: git://git.debian.org/~lamont/postfix.git9Vcs-Git: git://git.debian.org/~lamont/postfix.git
10XS-Testsuite: autopkgtest
1011
11Package: postfix12Package: postfix
12Architecture: any13Architecture: any
1314
=== added directory 'debian/tests'
=== added file 'debian/tests/control'
--- debian/tests/control 1970-01-01 00:00:00 +0000
+++ debian/tests/control 2013-05-07 21:42:30 +0000
@@ -0,0 +1,3 @@
1Tests: postfix
2Depends: python-unit, procmail, sasl2-bin, python-pexpect, lsb-release
3Restrictions: needs-root
04
=== added file 'debian/tests/postfix'
--- debian/tests/postfix 1970-01-01 00:00:00 +0000
+++ debian/tests/postfix 2013-05-07 21:42:30 +0000
@@ -0,0 +1,16 @@
1#!/bin/bash
2#----------------
3# Testing postfix
4#----------------
5set -e
6
7# reconfigure postfix
8debconf-set-selections <<< "postfix postfix/mailname string localhost" 2>&1
9debconf-set-selections <<< "postfix postfix/main_mailer_type string 'Internet Site'" 2>&1
10
11# install and modify
12hostname localhost
13apt-get install -y postfix 2>&1
14hostname --fqdn > /etc/mailname
15/etc/init.d/postfix restart 2>&1
16python `dirname $0`/test-postfix.py 2>&1
017
=== added file 'debian/tests/test-postfix.py'
--- debian/tests/test-postfix.py 1970-01-01 00:00:00 +0000
+++ debian/tests/test-postfix.py 2013-05-07 21:42:30 +0000
@@ -0,0 +1,535 @@
1#!/usr/bin/python
2#
3# test-postfix.py quality assurance test script for postfix
4# Copyright (C) 2008-2012 Canonical Ltd.
5# Author: Kees Cook <kees@ubuntu.com>
6# Author: Marc Deslauriers <marc.deslauriers@canonical.com>
7# Author: Jamie Strandboge <jamie@canonical.com>
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License version 3,
11# as published by the Free Software Foundation.
12#
13# This program is distributed in the hope that it will be useful,
14# but WITHOUT ANY WARRANTY; without even the implied warranty of
15# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16# GNU General Public License for more details.
17#
18# You should have received a copy of the GNU General Public License
19# along with this program. If not, see <http://www.gnu.org/licenses/>.
20#
21# QRT-Packages: postfix sasl2-bin procmail python-pexpect
22# QRT-Privilege: root
23# QRT-Conflicts: exim4
24
25'''
26 Note: When installing postfix, select "Internet Site". This script will
27 not work if "Local Only" was selected.
28
29 How to run against a clean schroot named 'hardy':
30 schroot -c hardy -u root -- sh -c 'apt-get -y install procmail python-unit postfix sasl2-bin python-pexpect lsb-release && ./test-postfix.py -v'
31
32 Tests:
33 00: setup
34 10: basic plain auth setup
35 11: above, but with CVE reproducers
36 20: sasl non-PLAIN setup
37 21: 20, but with CVE reproducers
38 99: restore configs
39'''
40
41import unittest, subprocess, re, pexpect, smtplib, socket, os, time, tempfile
42import testlib
43
44class PostfixTest(testlib.TestlibCase):
45 '''Test Postfix MTA.'''
46
47 def _setUp(self):
48 '''Create server configs.'''
49
50 # Move listener to localhost:2525
51 conf_file = '/etc/postfix/master.cf'
52 lines = open(conf_file)
53 contents = ''
54 for cfline in lines:
55 if cfline.startswith('smtp') and 'smtpd' in cfline and 'inet' in cfline:
56 contents += '127.0.0.1:2525 inet n - - - - smtpd\n'
57 else:
58 contents += "%s\n" % cfline
59 testlib.config_replace(conf_file, contents, append=False)
60
61 conf_file = '/etc/postfix/main.cf'
62 # Use mbox only
63 testlib.config_comment(conf_file,'home_mailbox')
64 testlib.config_set(conf_file,'mailbox_command','procmail -a "$EXTENSION"')
65
66 # Turn on sasl
67 self._setup_sasl("PLAIN")
68 reply = self._check_auth("PLAIN")
69
70
71 def setUp(self):
72 '''Set up prior to each test_* function'''
73 # list of files that we update
74 self.conf_files = [ '/etc/postfix/master.cf', '/etc/postfix/main.cf', '/etc/default/saslauthd', '/etc/postfix/sasl/smtpd.conf', '/etc/sasldb2']
75
76 self.user = testlib.TestUser(lower=True)
77 self.s = None
78 # Silently allow for this connection to fail, to handle the
79 # initial setup of the postfix server.
80 try:
81 self.s = smtplib.SMTP('localhost', port=2525)
82 except:
83 pass
84
85 def _tearDown(self):
86 '''Restore server configs'''
87 for f in self.conf_files:
88 testlib.config_restore(f)
89
90 # put saslauthd back
91 for f in ['/var/spool/postfix/var/run/saslauthd', '/var/run/saslauthd']:
92 if os.path.isfile(f) or os.path.islink(f):
93 os.unlink(f)
94 elif os.path.exists(f):
95 testlib.recursive_rm(f)
96 subprocess.call(['mkdir','-p','/var/run/saslauthd'])
97 subprocess.call(['/etc/init.d/saslauthd', 'stop'], stdout=subprocess.PIPE)
98 subprocess.call(['/etc/init.d/saslauthd', 'start'], stdout=subprocess.PIPE)
99
100 def tearDown(self):
101 '''Clean up after each test_* function'''
102
103 try:
104 self.s.quit()
105 except:
106 pass
107 self.user = None
108
109 def _restart_server(self):
110 '''Restart server'''
111 subprocess.call(['/etc/init.d/postfix', 'stop'], stdout=subprocess.PIPE)
112 assert subprocess.call(['/etc/init.d/postfix', 'start'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) == 0
113 # Postfix exits its init script before the master listener has started
114 time.sleep(2)
115
116 def _setup_sasl(self, mech, other_mech="", force_sasldb=False):
117 '''Setup sasl for mech'''
118 conf_file = '/etc/postfix/main.cf'
119 for field in ['smtpd_sasl_type','smtpd_sasl_local_domain','smtpd_tls_auth_only']:
120 testlib.config_comment(conf_file,field)
121 testlib.config_set(conf_file,'smtpd_sasl_path','smtpd')
122 testlib.config_set(conf_file,'smtpd_sasl_auth_enable','yes')
123 #testlib.config_set(conf_file,'broken_sasl_auth_clients','yes')
124 testlib.config_set(conf_file,'smtpd_sasl_authenticated_header','yes')
125 testlib.config_set(conf_file,'smtpd_tls_loglevel','2')
126
127 # setup smtpd.conf and the sasl users
128 contents = ''
129
130 self.assertTrue(mech in ['LOGIN', 'PLAIN', 'CRAM-MD5', 'DIGEST-MD5'], "Invalid mech: %s" % mech)
131
132 if not force_sasldb and (mech == "PLAIN" or mech == "LOGIN"):
133 conf_file = '/etc/default/saslauthd'
134 testlib.config_set(conf_file, 'START', 'yes', spaces=False)
135
136 contents = '''
137pwcheck_method: saslauthd
138allowanonymouslogin: 0
139allowplaintext: 1
140mech_list: %s %s
141''' % (mech, other_mech)
142
143 # attach SASL to postfix chroot
144 subprocess.call(['mkdir','-p','/var/spool/postfix/var/run/saslauthd'])
145 subprocess.call(['rm','-rf','/var/run/saslauthd'])
146 subprocess.call(['ln','-s','/var/spool/postfix/var/run/saslauthd','/var/run/saslauthd'])
147 subprocess.call(['/etc/init.d/saslauthd', 'stop'], stdout=subprocess.PIPE)
148 assert subprocess.call(['/etc/init.d/saslauthd', 'start'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) == 0
149
150 # Force crackful perms so chroot'd postfix can talk to saslauthd
151 subprocess.call(['chmod','o+x','/var/spool/postfix/var/run/saslauthd'])
152 else:
153 plaintext = "1"
154 if mech == "LOGIN" or mech == "PLAIN":
155 plaintext = "0"
156 contents = '''
157pwcheck_method: auxprop
158allowanonymouslogin: 0
159allowplaintext: %s
160mech_list: %s %s
161''' % (plaintext, mech, other_mech)
162
163 # Add user to sasldb2
164 testlib.config_replace("/etc/sasldb2", '', append=False)
165
166 rc, report = testlib.cmd(['postconf', '-h', 'myhostname'])
167 expected = 0
168 result = 'Got exit code %d, expected %d\n' % (rc, expected)
169 self.assertEquals(expected, rc, result + report)
170
171 child = pexpect.spawn('saslpasswd2 -c -u %s %s' % (report.strip(), self.user.login))
172 time.sleep(0.2)
173 child.expect(r'.*[pP]assword', timeout=5)
174 time.sleep(0.2)
175 child.sendline(self.user.password)
176 time.sleep(0.2)
177 child.expect(r'.*(for verification)', timeout=5)
178 time.sleep(0.2)
179 child.sendline(self.user.password)
180 time.sleep(0.2)
181 rc = child.expect('\n', timeout=5)
182 time.sleep(0.2)
183 self.assertEquals(rc, expected, "passwd returned %d" %(rc))
184
185 child.kill(0)
186
187 os.chmod("/etc/sasldb2", 0640)
188 rc, report = testlib.cmd(['chgrp', 'postfix', '/etc/sasldb2'])
189 expected = 0
190 result = 'Got exit code %d, expected %d\n' % (rc, expected)
191 self.assertEquals(expected, rc, result + report)
192
193 # Force crackful perms so chroot'd postfix can talk to saslauthd
194 subprocess.call(['mv', '-f', '/etc/sasldb2', '/var/spool/postfix/etc'])
195 subprocess.call(['ln', '-s', '/var/spool/postfix/etc/sasldb2', '/etc/sasldb2'])
196
197 conf_file = '/etc/postfix/sasl/smtpd.conf'
198 testlib.config_replace(conf_file, contents, append=False)
199
200 # Restart server
201 self._restart_server()
202
203 def _is_listening(self):
204 '''Is the server listening'''
205 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
206 s.settimeout(5)
207 s.connect(('localhost',2525))
208 greeting = s.recv(1024)
209 # 220 gorgon.outflux.net ESMTP Postfix (Ubuntu)
210 self.assertTrue(greeting.startswith('220 '),greeting)
211 self.assertTrue('ESMTP' in greeting,greeting)
212 self.assertTrue('Postfix' in greeting,greeting)
213 self.assertFalse('MTA' in greeting,greeting)
214 s.close()
215
216 def test_00_listening(self):
217 '''Postfix is listening'''
218 # Get the main instance running
219 self._setUp()
220
221 self._is_listening()
222
223 def _vrfy(self, address, valid = True):
224 self.s.putcmd("vrfy",address)
225 code, msg = self.s.getreply()
226 reply = '%d %s' % (code, msg)
227 if valid:
228 self.assertEquals(code, 252, reply)
229 self.assertTrue(address in msg, reply)
230 else:
231 self.assertEquals(code, 550, reply)
232 self.assertTrue('Recipient address rejected' in msg, reply)
233 self.assertTrue('<%s>' % (address) in msg, reply)
234
235 def test_10_commands(self):
236 '''Basic SMTP commands'''
237
238 #s = smtplib.SMTP('localhost', port=2525)
239 # EHLO
240 code, msg = self.s.ehlo()
241 reply = '%d %s' % (code, msg)
242 self.assertEquals(code, 250, reply)
243 self.assertEquals(self.s.does_esmtp, 1, reply)
244 self.assertTrue('8BITMIME' in self.s.ehlo_resp, reply)
245 # No help available
246 self.s.putcmd("help")
247 code, msg = self.s.getreply()
248 reply = '%d %s' % (code, msg)
249 self.assertEquals(code, 502, reply)
250 self.assertTrue('Error' in msg, reply)
251 # VRFY addresses
252 self._vrfy('address@example.com', valid=True)
253 self._vrfy('does-not-exist', valid=False)
254 self._vrfy(self.user.login, valid=True)
255
256 def _test_deliver_mail(self, user_sent_to, auth_user=None, auth_pass=None, use_tls=False):
257 '''Perform mail delivery'''
258
259 if auth_user and auth_pass:
260 self.s.login(auth_user, auth_pass)
261 if use_tls:
262 self.s.starttls()
263 failed = self.s.sendmail('root',[user_sent_to.login,'does-not-exist'],'''From: Rooty <root>
264To: "%s" <%s>
265Subject: This is test 1
266
267Hello, nice to meet you.
268''' % (user_sent_to.gecos, user_sent_to.login))
269 #for addr in failed.keys():
270 # print '%s %d %s' % (addr, failed[addr][0], failed[addr][1])
271 self.assertEquals(len(failed),1,failed)
272 self.assertTrue(failed.has_key('does-not-exist'),failed)
273 self.assertEquals(failed['does-not-exist'][0],550,failed)
274
275 # Frighteningly, postfix seems to accept email before confirming
276 # a successful write to disk for the recipient!
277 time.sleep(2)
278
279 def _test_mail_in_spool(self, user_directed_to, target_spool_user=None, spool_file=None, auth_user=None, use_tls=False):
280 '''Check that mail arrived in the spool'''
281
282 # Handle the case of forwarded emails
283 if target_spool_user == None:
284 target_spool_user = user_directed_to
285 # Read delivered email
286 if spool_file == None:
287 spool_file = '/var/mail/%s' % (target_spool_user.login)
288 time.sleep(1)
289 contents = open(spool_file).read()
290 # Server-side added headers...
291 self.assertTrue('\nReceived: ' in contents, contents)
292 if use_tls and self.lsb_release['Release'] > 6.06:
293 expected = ' (Postfix) with ESMTPS id '
294 else:
295 expected = ' (Postfix) with ESMTP id '
296 if auth_user:
297 if self.lsb_release['Release'] < 8.04:
298 self._skipped("Received header portion")
299 else:
300 expected = ' (Postfix) with ESMTPA id '
301 self.assertTrue('(Authenticated sender: %s)' % (auth_user))
302 self.assertTrue(expected in contents, 'Looking for "%s" in email:\n%s' % (expected, contents))
303 self.assertTrue('\nMessage-Id: ' in contents, contents)
304 self.assertTrue('\nDate: ' in contents, contents)
305 # client-side headers/body...
306 self.assertTrue('\nSubject: This is test 1' in contents, contents)
307 self.assertTrue('\nFrom: Rooty' in contents, contents)
308 self.assertTrue('\nTo: "Buddy %s" <%s@' % (user_directed_to.login, user_directed_to.login) in contents, contents)
309 self.assertTrue('\nHello, nice to meet you.' in contents, contents)
310
311 def _test_roundtrip_mail(self, user_sent_to, user_to_check=None, spool_file=None, auth_user=None, auth_pass=None, use_tls=False):
312 '''Send and check email delivery'''
313 self._test_deliver_mail(user_sent_to, auth_user, auth_pass, use_tls=use_tls)
314 self._test_mail_in_spool(user_sent_to, user_to_check, spool_file, auth_user=auth_user, use_tls=use_tls)
315
316 def test_10_sending_mail_direct(self):
317 '''Mail delivered normally'''
318 self._test_roundtrip_mail(self.user)
319
320 def test_10_sending_mail_direct_with_tls(self):
321 '''Mail delivered normally with TLS'''
322 self._test_roundtrip_mail(self.user, use_tls=True)
323
324 def test_10_sending_mail_direct_auth(self):
325 '''Mail authentication'''
326 # Verify rejected bad password and user
327 self.assertRaises(smtplib.SMTPAuthenticationError, self.s.login, 'root', 'crapcrapcrap')
328 self.assertRaises(smtplib.SMTPAuthenticationError, self.s.login, self.user.login, 'crapcrapcrap')
329 self.s.login(self.user.login, self.user.password)
330
331 def test_10_sending_mail_direct_auth_full(self):
332 '''Mail delivered with authentication'''
333 # Perform end-to-end authentication test
334 self._test_roundtrip_mail(self.user, auth_user=self.user.login, auth_pass=self.user.password)
335
336 def _write_forward(self, user, contents):
337 forward_filename = '/home/%s/.forward' % (user.login)
338 open(forward_filename,'w').write(contents)
339 os.chown(forward_filename, user.uid, user.gid)
340
341 def test_10_sending_mail_forward_normal(self):
342 '''Mail delivered via .forward'''
343
344 forward_user = testlib.TestUser(lower=True)
345 self._write_forward(forward_user, self.user.login+'\n')
346 self._test_roundtrip_mail(forward_user, self.user)
347
348 def test_10_sending_mail_forward_xternal(self):
349 '''Mail processed by commands in .forward'''
350
351 # Create user-writable redirected mbox destination
352 mbox, mbox_name = testlib.mkstemp_fill('',prefix='test-postfix.mbox-')
353 mbox.close()
354 os.chown(mbox_name, self.user.uid, self.user.gid)
355
356 # Create a script to run in the .forward
357 redir, redir_name = testlib.mkstemp_fill('''#!/bin/bash
358/bin/cat > "%s"
359''' % (mbox_name),prefix='test-postfix.redir-')
360 redir.close()
361 os.chmod(redir_name,0755)
362
363 self._write_forward(self.user,'|%s\n' % (redir_name))
364
365 # SKIP TESTING, FAILS IN TESTBED
366 #self._test_roundtrip_mail(self.user, spool_file=mbox_name)
367
368 os.unlink(redir_name)
369 os.unlink(mbox_name)
370
371 def test_11_security_CVE_2008_2936(self):
372 '''CVE-2008-2936 fixed'''
373
374 # First, create our "target" file
375 secret = '/root/secret.txt'
376 open(secret,'w').write('Secret information\n')
377 os.chmod(secret, 0700)
378
379 # Now, create a symlink to the target (we're going to use /var/tmp
380 # since we're assuming it, /root, /var/mail are on the same filesystem.
381 # For most chroot testing, /tmp is mounted from the real machine.
382 if os.path.exists('/var/tmp/secret.link'):
383 os.unlink('/var/tmp/secret.link')
384 self.assertEquals(subprocess.call(['su','-c','ln -s /root/secret.txt /var/tmp/secret.link',self.user.login]),0,"Symlink creation")
385
386 # Now, the hardlink, which in ubuntu's case needs to be done by root.
387 os.link('/var/tmp/secret.link','/var/mail/%s' % (self.user.login))
388
389 # Email delivered to this user will be written to the root-owned
390 # file now if the CVE is unfixed.
391 failed = self.s.sendmail('root',[self.user.login],'''From: Evil <root>
392To: "%s" <%s>
393Subject: This is an overwrite test
394
395Hello, nice to pwn you.
396''' % (self.user.gecos, self.user.login))
397 self.assertEquals(len(failed),0,failed)
398
399 # Pause for delivery
400 time.sleep(2)
401
402 contents = open(secret).read()
403 # Clean up before possible failures
404 os.unlink('/var/mail/%s' % (self.user.login))
405 os.unlink('/var/tmp/secret.link')
406 os.unlink(secret)
407 # Check results
408 self.assertTrue('Secret information' in contents, contents)
409 self.assertFalse('nice to pwn you' in contents, contents)
410
411 def _check_auth(self, mech):
412 '''Check AUTH: side effect-- self.s is set'''
413 try:
414 self.s.quit()
415 except:
416 pass
417 self.s = smtplib.SMTP('localhost', port=2525)
418
419 self._is_listening()
420
421 # has mech
422 code, msg = self.s.ehlo()
423 reply = '%d %s' % (code, msg)
424 self.assertEquals(code, 250, reply)
425 self.assertEquals(self.s.does_esmtp, 1, reply)
426 self.assertTrue('%s' % mech in self.s.ehlo_resp, reply)
427 return reply
428
429 def test_20_sasldb_cram_md5(self):
430 '''Test sasldb CRAM-MD5'''
431 # Quit the setUp() connection, restart the server and reconnect
432 self.s.quit()
433 self._setup_sasl("CRAM-MD5")
434
435 reply = self._check_auth("CRAM-MD5")
436 self.assertTrue('PLAIN' not in reply, reply)
437
438 # Verify rejected bad password and user
439 self.assertRaises(smtplib.SMTPAuthenticationError, self.s.login, 'root', 'crapcrapcrap')
440 self.assertRaises(smtplib.SMTPAuthenticationError, self.s.login, self.user.login, 'crapcrapcrap')
441
442 # Perform end-to-end authentication test
443 self._test_roundtrip_mail(self.user, auth_user=self.user.login, auth_pass=self.user.password)
444
445 def test_20_sasldb_digest_md5(self):
446 '''Test sasldb DIGEST-MD5 is supported'''
447 # Quit the setUp() connection, restart the server and reconnect
448 self.s.quit()
449 self._setup_sasl("DIGEST-MD5")
450
451 reply = self._check_auth("DIGEST-MD5")
452 self.assertTrue('PLAIN' not in reply, reply)
453
454 # TODO: Perform end-to-end authentication test (need alternative to smtplib)
455 #self.assertRaises(smtplib.SMTPAuthenticationError, self.s.login, 'root', 'crapcrapcrap')
456 #self.assertRaises(smtplib.SMTPAuthenticationError, self.s.login, self.user.login, 'crapcrapcrap')
457 #self._test_roundtrip_mail(self.user, auth_user=self.user.login, auth_pass=self.user.password)
458
459 def test_20_sasldb_login(self):
460 '''Test sasldb LOGIN is supported'''
461 # Quit the setUp() connection, restart the server and reconnect
462 self.s.quit()
463 self._setup_sasl("LOGIN", force_sasldb=True)
464
465 reply = self._check_auth("LOGIN")
466 self.assertTrue('PLAIN' not in reply, reply)
467
468 # TODO: Perform end-to-end authentication test (need alternative to smtplib)
469 #self.assertRaises(smtplib.SMTPAuthenticationError, self.s.login, 'root', 'crapcrapcrap')
470 #self.assertRaises(smtplib.SMTPAuthenticationError, self.s.login, self.user.login, 'crapcrapcrap')
471 #self._test_roundtrip_mail(self.user, auth_user=self.user.login, auth_pass=self.user.password)
472
473 def test_20_sasldb_plain(self):
474 '''Test sasldb PLAIN'''
475 # Quit the setUp() connection, restart the server and reconnect
476 self.s.quit()
477 self._setup_sasl("PLAIN", force_sasldb=True)
478
479 reply = self._check_auth("PLAIN")
480
481 # Verify rejected bad password and user
482 self.assertRaises(smtplib.SMTPAuthenticationError, self.s.login, 'root', 'crapcrapcrap')
483 self.assertRaises(smtplib.SMTPAuthenticationError, self.s.login, self.user.login, 'crapcrapcrap')
484 # TODO: Perform end-to-end authentication test (need alternative to smtplib)
485 self._test_roundtrip_mail(self.user, auth_user=self.user.login, auth_pass=self.user.password)
486
487 def test_21_security_CVE_2011_1720(self):
488 '''CVE-2011-1720 fixed'''
489 # http://www.postfix.org/CVE-2011-1720.html
490
491 # setup sasl and connect
492 self.s.quit()
493 self._setup_sasl("CRAM-MD5", "DIGEST-MD5")
494
495 # verify sasl support
496 rc, report = testlib.cmd(['postconf', 'smtpd_sasl_auth_enable'])
497 expected = 0
498 result = 'Got exit code %d, expected %d\n' % (rc, expected)
499 self.assertEquals(expected, rc, result + report)
500 self.assertTrue('yes' in report, "Could not find 'yes' in report:\n%s" % report)
501
502 if self.lsb_release['Release'] > 6.06:
503 rc, report = testlib.cmd(['postconf', 'smtpd_sasl_type'])
504 expected = 0
505 result = 'Got exit code %d, expected %d\n' % (rc, expected)
506 self.assertEquals(expected, rc, result + report)
507 self.assertTrue('cyrus' in report, "Could not find 'cyrus' in report:\n%s" % report)
508
509 # ehlo
510 reply = self._check_auth("CRAM-MD5")
511 self.assertTrue('DIGEST-MD5' in reply, reply)
512
513 code, msg = self.s.docmd("AUTH", "CRAM-MD5")
514 reply = '%d %s' % (code, msg)
515 self.assertEquals(code, 334, reply)
516
517 code, msg = self.s.docmd("*")
518 reply = '%d %s' % (code, msg)
519 self.assertEquals(code, 501, reply)
520
521 error = False
522 try:
523 code, msg = self.s.docmd("AUTH", "DIGEST-MD5")
524 except:
525 error = True
526 self.assertFalse(error, "server disconnected")
527 reply = '%d %s' % (code, msg)
528 self.assertEquals(code, 334, reply)
529
530 def test_99_restore(self):
531 '''Restore configuration'''
532 self._tearDown()
533
534if __name__ == '__main__':
535 unittest.main()
0536
=== added file 'debian/tests/testlib.py'
--- debian/tests/testlib.py 1970-01-01 00:00:00 +0000
+++ debian/tests/testlib.py 2013-05-07 21:42:30 +0000
@@ -0,0 +1,1144 @@
1#
2# testlib.py quality assurance test script
3# Copyright (C) 2008-2011 Canonical Ltd.
4#
5# This library is free software; you can redistribute it and/or
6# modify it under the terms of the GNU Library General Public
7# License as published by the Free Software Foundation; either
8# version 2 of the License.
9#
10# This library is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13# Library General Public License for more details.
14#
15# You should have received a copy of the GNU Library General Public
16# License along with this program. If not, see
17# <http://www.gnu.org/licenses/>.
18#
19
20'''Common classes and functions for package tests.'''
21
22import string, random, crypt, subprocess, pwd, grp, signal, time, unittest, tempfile, shutil, os, os.path, re, glob
23import sys, socket, gzip
24from stat import *
25from encodings import string_escape
26
27import warnings
28warnings.filterwarnings('ignore', message=r'.*apt_pkg\.TagFile.*', category=DeprecationWarning)
29try:
30 import apt_pkg
31 apt_pkg.InitSystem();
32except:
33 # On non-Debian system, fall back to simple comparison without debianisms
34 class apt_pkg(object):
35 def VersionCompare(one, two):
36 list_one = one.split('.')
37 list_two = two.split('.')
38 while len(list_one)>0 and len(list_two)>0:
39 if list_one[0] > list_two[0]:
40 return 1
41 if list_one[0] < list_two[0]:
42 return -1
43 list_one.pop(0)
44 list_two.pop(0)
45 return 0
46
47bogus_nxdomain = "208.69.32.132"
48
49# http://www.chiark.greenend.org.uk/ucgi/~cjwatson/blosxom/2009-07-02-python-sigpipe.html
50# This is needed so that the subprocesses that produce endless output
51# actually quit when the reader goes away.
52import signal
53def subprocess_setup():
54 # Python installs a SIGPIPE handler by default. This is usually not what
55 # non-Python subprocesses expect.
56 signal.signal(signal.SIGPIPE, signal.SIG_DFL)
57
58class TimedOutException(Exception):
59 def __init__(self, value = "Timed Out"):
60 self.value = value
61 def __str__(self):
62 return repr(self.value)
63
64def _restore_backup(path):
65 pathbackup = path + '.autotest'
66 if os.path.exists(pathbackup):
67 shutil.move(pathbackup, path)
68
69def _save_backup(path):
70 pathbackup = path + '.autotest'
71 if os.path.exists(path) and not os.path.exists(pathbackup):
72 shutil.copy2(path, pathbackup)
73 # copy2 does not copy ownership, so do it here.
74 # Reference: http://docs.python.org/library/shutil.html
75 a = os.stat(path)
76 os.chown(pathbackup, a[4], a[5])
77
78def config_copydir(path):
79 if os.path.exists(path) and not os.path.isdir(path):
80 raise OSError, "'%s' is not a directory" % (path)
81 _restore_backup(path)
82
83 pathbackup = path + '.autotest'
84 if os.path.exists(path):
85 shutil.copytree(path, pathbackup, symlinks=True)
86
87def config_replace(path,contents,append=False):
88 '''Replace (or append) to a config file'''
89 _restore_backup(path)
90 if os.path.exists(path):
91 _save_backup(path)
92 if append:
93 contents = file(path).read() + contents
94 open(path, 'w').write(contents)
95
96def config_comment(path, field):
97 _save_backup(path)
98 contents = ""
99 for line in file(path):
100 if re.search("^\s*%s\s*=" % (field), line):
101 line = "#" + line
102 contents += line
103
104 open(path+'.new', 'w').write(contents)
105 os.rename(path+'.new', path)
106
107def config_set(path, field, value, spaces=True):
108 _save_backup(path)
109 contents = ""
110 if spaces==True:
111 setting = '%s = %s\n' % (field, value)
112 else:
113 setting = '%s=%s\n' % (field, value)
114 found = False
115 for line in file(path):
116 if re.search("^\s*%s\s*=" % (field), line):
117 found = True
118 line = setting
119 contents += line
120 if not found:
121 contents += setting
122
123 open(path+'.new', 'w').write(contents)
124 os.rename(path+'.new', path)
125
126def config_patch(path, patch, depth=1):
127 '''Patch a config file'''
128 _restore_backup(path)
129 _save_backup(path)
130
131 handle, name = mkstemp_fill(patch)
132 rc = subprocess.call(['/usr/bin/patch', '-p%s' %(depth), path], stdin=handle, stdout=subprocess.PIPE)
133 os.unlink(name)
134 if rc != 0:
135 raise Exception("Patch failed")
136
137def config_restore(path):
138 '''Rename a replaced config file back to its initial state'''
139 _restore_backup(path)
140
141def timeout(secs, f, *args):
142 def handler(signum, frame):
143 raise TimedOutException()
144
145 old = signal.signal(signal.SIGALRM, handler)
146 result = None
147 signal.alarm(secs)
148 try:
149 result = f(*args)
150 finally:
151 signal.alarm(0)
152 signal.signal(signal.SIGALRM, old)
153
154 return result
155
156def require_nonroot():
157 if os.geteuid() == 0:
158 print >>sys.stderr, "This series of tests should be run as a regular user with sudo access, not as root."
159 sys.exit(1)
160
161def require_root():
162 if os.geteuid() != 0:
163 print >>sys.stderr, "This series of tests should be run with root privileges (e.g. via sudo)."
164 sys.exit(1)
165
166def require_sudo():
167 if os.geteuid() != 0 or os.environ.get('SUDO_USER', None) == None:
168 print >>sys.stderr, "This series of tests must be run under sudo."
169 sys.exit(1)
170 if os.environ['SUDO_USER'] == 'root':
171 print >>sys.stderr, 'Please run this test using sudo from a regular user. (You ran sudo from root.)'
172 sys.exit(1)
173
174def random_string(length,lower=False):
175 '''Return a random string, consisting of ASCII letters, with given
176 length.'''
177
178 s = ''
179 selection = string.letters
180 if lower:
181 selection = string.lowercase
182 maxind = len(selection)-1
183 for l in range(length):
184 s += selection[random.randint(0, maxind)]
185 return s
186
187def mkstemp_fill(contents,suffix='',prefix='testlib-',dir=None):
188 '''As tempfile.mkstemp does, return a (file, name) pair, but with
189 prefilled contents.'''
190
191 handle, name = tempfile.mkstemp(suffix=suffix,prefix=prefix,dir=dir)
192 os.close(handle)
193 handle = file(name,"w+")
194 handle.write(contents)
195 handle.flush()
196 handle.seek(0)
197
198 return handle, name
199
200def create_fill(path, contents, mode=0644):
201 '''Safely create a page'''
202 # make the temp file in the same dir as the destination file so we
203 # don't get invalid cross-device link errors when we rename
204 handle, name = mkstemp_fill(contents, dir=os.path.dirname(path))
205 handle.close()
206 os.rename(name, path)
207 os.chmod(path, mode)
208
209def login_exists(login):
210 '''Checks whether the given login exists on the system.'''
211
212 try:
213 pwd.getpwnam(login)
214 return True
215 except KeyError:
216 return False
217
218def group_exists(group):
219 '''Checks whether the given login exists on the system.'''
220
221 try:
222 grp.getgrnam(group)
223 return True
224 except KeyError:
225 return False
226
227def recursive_rm(dirPath, contents_only=False):
228 '''recursively remove directory'''
229 names = os.listdir(dirPath)
230 for name in names:
231 path = os.path.join(dirPath, name)
232 if os.path.islink(path) or not os.path.isdir(path):
233 os.unlink(path)
234 else:
235 recursive_rm(path)
236 if contents_only == False:
237 os.rmdir(dirPath)
238
239def check_pidfile(exe, pidfile):
240 '''Checks if pid in pidfile is running'''
241 if not os.path.exists(pidfile):
242 return False
243
244 # get the pid
245 try:
246 fd = open(pidfile, 'r')
247 pid = fd.readline().rstrip('\n')
248 fd.close()
249 except:
250 return False
251
252 return check_pid(exe, pid)
253
254def check_pid(exe, pid):
255 '''Checks if pid is running'''
256 cmdline = "/proc/%s/cmdline" % (str(pid))
257 if not os.path.exists(cmdline):
258 return False
259
260 # get the command line
261 try:
262 fd = open(cmdline, 'r')
263 tmp = fd.readline().split('\0')
264 fd.close()
265 except:
266 return False
267
268 # this allows us to match absolute paths or just the executable name
269 if re.match('^' + exe + '$', tmp[0]) or \
270 re.match('.*/' + exe + '$', tmp[0]) or \
271 re.match('^' + exe + ': ', tmp[0]) or \
272 re.match('^\(' + exe + '\)', tmp[0]):
273 return True
274
275 return False
276
277def check_port(port, proto, ver=4):
278 '''Check if something is listening on the specified port.
279 WARNING: for some reason this does not work with a bind mounted /proc
280 '''
281 assert (port >= 1)
282 assert (port <= 65535)
283 assert (proto.lower() == "tcp" or proto.lower() == "udp")
284 assert (ver == 4 or ver == 6)
285
286 fn = "/proc/net/%s" % (proto)
287 if ver == 6:
288 fn += str(ver)
289
290 rc, report = cmd(['cat', fn])
291 assert (rc == 0)
292
293 hport = "%0.4x" % port
294
295 if re.search(': [0-9a-f]{8}:%s [0-9a-f]' % str(hport).lower(), report.lower()):
296 return True
297 return False
298
299def get_arch():
300 '''Get the current architecture'''
301 rc, report = cmd(['uname', '-m'])
302 assert (rc == 0)
303 return report.strip()
304
305def get_memory():
306 '''Gets total ram and swap'''
307 meminfo = "/proc/meminfo"
308 memtotal = 0
309 swaptotal = 0
310 if not os.path.exists(meminfo):
311 return (False, False)
312
313 try:
314 fd = open(meminfo, 'r')
315 for line in fd.readlines():
316 splitline = line.split()
317 if splitline[0] == 'MemTotal:':
318 memtotal = int(splitline[1])
319 elif splitline[0] == 'SwapTotal:':
320 swaptotal = int(splitline[1])
321 fd.close()
322 except:
323 return (False, False)
324
325 return (memtotal,swaptotal)
326
327def is_running_in_vm():
328 '''Check if running under a VM'''
329 # add other virtualization environments here
330 for search in ['QEMU Virtual CPU']:
331 rc, report = cmd_pipe(['dmesg'], ['grep', search])
332 if rc == 0:
333 return True
334 return False
335
336def ubuntu_release():
337 '''Get the Ubuntu release'''
338 f = "/etc/lsb-release"
339 try:
340 size = os.stat(f)[ST_SIZE]
341 except:
342 return "UNKNOWN"
343
344 if size > 1024*1024:
345 raise IOError, 'Could not open "%s" (too big)' % f
346
347 try:
348 fh = open("/etc/lsb-release", 'r')
349 except:
350 raise
351
352 lines = fh.readlines()
353 fh.close()
354
355 pat = re.compile(r'DISTRIB_CODENAME')
356 for line in lines:
357 if pat.search(line):
358 return line.split('=')[1].rstrip('\n').rstrip('\r')
359
360 return "UNKNOWN"
361
362def cmd(command, input = None, stderr = subprocess.STDOUT, stdout = subprocess.PIPE, stdin = None, timeout = None):
363 '''Try to execute given command (array) and return its stdout, or return
364 a textual error if it failed.'''
365
366 try:
367 sp = subprocess.Popen(command, stdin=stdin, stdout=stdout, stderr=stderr, close_fds=True, preexec_fn=subprocess_setup)
368 except OSError, e:
369 return [127, str(e)]
370
371 out, outerr = sp.communicate(input)
372 # Handle redirection of stdout
373 if out == None:
374 out = ''
375 # Handle redirection of stderr
376 if outerr == None:
377 outerr = ''
378 return [sp.returncode,out+outerr]
379
380def cmd_pipe(command1, command2, input = None, stderr = subprocess.STDOUT, stdin = None):
381 '''Try to pipe command1 into command2.'''
382 try:
383 sp1 = subprocess.Popen(command1, stdin=stdin, stdout=subprocess.PIPE, stderr=stderr, close_fds=True)
384 sp2 = subprocess.Popen(command2, stdin=sp1.stdout, stdout=subprocess.PIPE, stderr=stderr, close_fds=True)
385 except OSError, e:
386 return [127, str(e)]
387
388 out = sp2.communicate(input)[0]
389 return [sp2.returncode,out]
390
391def cwd_has_enough_space(cdir, total_bytes):
392 '''Determine if the partition of the current working directory has 'bytes'
393 free.'''
394 rc, df_output = cmd(['df'])
395 result = 'Got exit code %d, expected %d\n' % (rc, 0)
396 if rc != 0:
397 return False
398
399 kb = total_bytes / 1024
400
401 mounts = dict()
402 for line in df_output.splitlines():
403 if '/' not in line:
404 continue
405 tmp = line.split()
406 mounts[tmp[5]] = int(tmp[3])
407
408 cdir = os.getcwd()
409 while cdir != '/':
410 if not mounts.has_key(cdir):
411 cdir = os.path.dirname(cdir)
412 continue
413 if kb < mounts[cdir]:
414 return True
415 else:
416 return False
417
418 if kb < mounts['/']:
419 return True
420
421 return False
422
423def get_md5(filename):
424 '''Gets the md5sum of the file specified'''
425
426 (rc, report) = cmd(["/usr/bin/md5sum", "-b", filename])
427 expected = 0
428 assert (expected == rc)
429
430 return report.split(' ')[0]
431
432def dpkg_compare_installed_version(pkg, check, version):
433 '''Gets the version for the installed package, and compares it to the
434 specified version.
435 '''
436 (rc, report) = cmd(["/usr/bin/dpkg", "-s", pkg])
437 assert (rc == 0)
438 assert ("Status: install ok installed" in report)
439 installed_version = ""
440 for line in report.splitlines():
441 if line.startswith("Version: "):
442 installed_version = line.split()[1]
443
444 assert (installed_version != "")
445
446 (rc, report) = cmd(["/usr/bin/dpkg", "--compare-versions", installed_version, check, version])
447 assert (rc == 0 or rc == 1)
448 if rc == 0:
449 return True
450 return False
451
452def prepare_source(source, builder, cached_src, build_src, patch_system):
453 '''Download and unpack source package, installing necessary build depends,
454 adjusting the permissions for the 'builder' user, and returning the
455 directory of the unpacked source. Patch system can be one of:
456 - cdbs
457 - dpatch
458 - quilt
459 - quiltv3
460 - None (not the string)
461
462 This is normally used like this:
463
464 def setUp(self):
465 ...
466 self.topdir = os.getcwd()
467 self.cached_src = os.path.join(os.getcwd(), "source")
468 self.tmpdir = tempfile.mkdtemp(prefix='testlib', dir='/tmp')
469 self.builder = testlib.TestUser()
470 testlib.cmd(['chgrp', self.builder.login, self.tmpdir])
471 os.chmod(self.tmpdir, 0775)
472
473 def tearDown(self):
474 ...
475 self.builder = None
476 self.topdir = os.getcwd()
477 if os.path.exists(self.tmpdir):
478 testlib.recursive_rm(self.tmpdir)
479
480 def test_suite_build(self):
481 ...
482 build_dir = testlib.prepare_source('foo', \
483 self.builder, \
484 self.cached_src, \
485 os.path.join(self.tmpdir, \
486 os.path.basename(self.cached_src)),
487 "quilt")
488 os.chdir(build_dir)
489
490 # Example for typical build, adjust as necessary
491 print ""
492 print " make clean"
493 rc, report = testlib.cmd(['sudo', '-u', self.builder.login, 'make', 'clean'])
494
495 print " configure"
496 rc, report = testlib.cmd(['sudo', '-u', self.builder.login, './configure', '--prefix=%s' % self.tmpdir, '--enable-debug'])
497
498 print " make (will take a while)"
499 rc, report = testlib.cmd(['sudo', '-u', self.builder.login, 'make'])
500
501 print " make check (will take a while)",
502 rc, report = testlib.cmd(['sudo', '-u', self.builder.login, 'make', 'check'])
503 expected = 0
504 result = 'Got exit code %d, expected %d\n' % (rc, expected)
505 self.assertEquals(expected, rc, result + report)
506
507 def test_suite_cleanup(self):
508 ...
509 if os.path.exists(self.cached_src):
510 testlib.recursive_rm(self.cached_src)
511
512 It is up to the caller to clean up cached_src and build_src (as in the
513 above example, often the build_src is in a tmpdir that is cleaned in
514 tearDown() and the cached_src is cleaned in a one time clean-up
515 operation (eg 'test_suite_cleanup()) which must be run after the build
516 suite test (obviously).
517 '''
518
519 # Make sure we have a clean slate
520 assert (os.path.exists(os.path.dirname(build_src)))
521 assert (not os.path.exists(build_src))
522
523 cdir = os.getcwd()
524 if os.path.exists(cached_src):
525 shutil.copytree(cached_src, build_src)
526 os.chdir(build_src)
527 else:
528 # Only install the build dependencies on the initial setup
529 rc, report = cmd(['apt-get','-y','--force-yes','build-dep',source])
530 assert (rc == 0)
531
532 os.makedirs(build_src)
533 os.chdir(build_src)
534
535 # These are always needed
536 pkgs = ['build-essential', 'dpkg-dev', 'fakeroot']
537 rc, report = cmd(['apt-get','-y','--force-yes','install'] + pkgs)
538 assert (rc == 0)
539
540 rc, report = cmd(['apt-get','source',source])
541 assert (rc == 0)
542 shutil.copytree(build_src, cached_src)
543
544 unpacked_dir = os.path.join(build_src, glob.glob('%s-*' % source)[0])
545
546 # Now apply the patches. Do it here so that we don't mess up our cached
547 # sources.
548 os.chdir(unpacked_dir)
549 assert (patch_system in ['cdbs', 'dpatch', 'quilt', 'quiltv3', None])
550 if patch_system != None and patch_system != "quiltv3":
551 if patch_system == "quilt":
552 os.environ.setdefault('QUILT_PATCHES','debian/patches')
553 rc, report = cmd(['quilt', 'push', '-a'])
554 assert (rc == 0)
555 elif patch_system == "cdbs":
556 rc, report = cmd(['./debian/rules', 'apply-patches'])
557 assert (rc == 0)
558 elif patch_system == "dpatch":
559 rc, report = cmd(['dpatch', 'apply-all'])
560 assert (rc == 0)
561
562 cmd(['chown', '-R', '%s:%s' % (builder.uid, builder.gid), build_src])
563 os.chdir(cdir)
564
565 return unpacked_dir
566
567def _aa_status():
568 '''Get aa-status output'''
569 exe = "/usr/sbin/aa-status"
570 assert (os.path.exists(exe))
571 if os.geteuid() == 0:
572 return cmd([exe])
573 return cmd(['sudo', exe])
574
575def is_apparmor_loaded(path):
576 '''Check if profile is loaded'''
577 rc, report = _aa_status()
578 if rc != 0:
579 return False
580
581 for line in report.splitlines():
582 if line.endswith(path):
583 return True
584 return False
585
586def is_apparmor_confined(path):
587 '''Check if application is confined'''
588 rc, report = _aa_status()
589 if rc != 0:
590 return False
591
592 for line in report.splitlines():
593 if re.search('%s \(' % path, line):
594 return True
595 return False
596
597def check_apparmor(path, first_ubuntu_release, is_running=True):
598 '''Check if path is loaded and confined for everything higher than the
599 first Ubuntu release specified.
600
601 Usage:
602 rc, report = testlib.check_apparmor('/usr/sbin/foo', 8.04, is_running=True)
603 if rc < 0:
604 return self._skipped(report)
605
606 expected = 0
607 result = 'Got exit code %d, expected %d\n' % (rc, expected)
608 self.assertEquals(expected, rc, result + report)
609 '''
610 global manager
611 rc = -1
612
613 if manager.lsb_release["Release"] < first_ubuntu_release:
614 return (rc, "Skipped apparmor check")
615
616 if not os.path.exists('/sbin/apparmor_parser'):
617 return (rc, "Skipped (couldn't find apparmor_parser)")
618
619 rc = 0
620 msg = ""
621 if not is_apparmor_loaded(path):
622 rc = 1
623 msg = "Profile not loaded for '%s'" % path
624
625 # this check only makes sense it the 'path' is currently executing
626 if is_running and rc == 0 and not is_apparmor_confined(path):
627 rc = 1
628 msg = "'%s' is not running in enforce mode" % path
629
630 return (rc, msg)
631
632def get_gcc_version(gcc, full=True):
633 gcc_version = 'none'
634 if not gcc.startswith('/'):
635 gcc = '/usr/bin/%s' % (gcc)
636 if os.path.exists(gcc):
637 gcc_version = 'unknown'
638 lines = cmd([gcc,'-v'])[1].strip().splitlines()
639 version_lines = [x for x in lines if x.startswith('gcc version')]
640 if len(version_lines) == 1:
641 gcc_version = " ".join(version_lines[0].split()[2:])
642 if not full:
643 return gcc_version.split()[0]
644 return gcc_version
645
646def is_kdeinit_running():
647 '''Test if kdeinit is running'''
648 # applications that use kdeinit will spawn it if it isn't running in the
649 # test. This is a problem because it does not exit. This is a helper to
650 # check for it.
651 rc, report = cmd(['ps', 'x'])
652 if 'kdeinit4 Running' not in report:
653 print >>sys.stderr, ("kdeinit not running (you may start/stop any KDE application then run this script again)")
654 return False
655 return True
656
657def get_pkgconfig_flags(libs=[]):
658 '''Find pkg-config flags for libraries'''
659 assert (len(libs) > 0)
660 rc, pkg_config = cmd(['pkg-config', '--cflags', '--libs'] + libs)
661 expected = 0
662 if rc != expected:
663 print >>sys.stderr, 'Got exit code %d, expected %d\n' % (rc, expected)
664 assert(rc == expected)
665 return pkg_config.split()
666
667class TestDaemon:
668 '''Helper class to manage daemons consistently'''
669 def __init__(self, init):
670 '''Setup daemon attributes'''
671 self.initscript = init
672
673 def start(self):
674 '''Start daemon'''
675 rc, report = cmd([self.initscript, 'start'])
676 expected = 0
677 result = 'Got exit code %d, expected %d\n' % (rc, expected)
678 time.sleep(2)
679 if expected != rc:
680 return (False, result + report)
681
682 if "fail" in report:
683 return (False, "Found 'fail' in report\n" + report)
684
685 return (True, "")
686
687 def stop(self):
688 '''Stop daemon'''
689 rc, report = cmd([self.initscript, 'stop'])
690 expected = 0
691 result = 'Got exit code %d, expected %d\n' % (rc, expected)
692 if expected != rc:
693 return (False, result + report)
694
695 if "fail" in report:
696 return (False, "Found 'fail' in report\n" + report)
697
698 return (True, "")
699
700 def reload(self):
701 '''Reload daemon'''
702 rc, report = cmd([self.initscript, 'force-reload'])
703 expected = 0
704 result = 'Got exit code %d, expected %d\n' % (rc, expected)
705 if expected != rc:
706 return (False, result + report)
707
708 if "fail" in report:
709 return (False, "Found 'fail' in report\n" + report)
710
711 return (True, "")
712
713 def restart(self):
714 '''Restart daemon'''
715 (res, str) = self.stop()
716 if not res:
717 return (res, str)
718
719 (res, str) = self.start()
720 if not res:
721 return (res, str)
722
723 return (True, "")
724
725 def status(self):
726 '''Check daemon status'''
727 rc, report = cmd([self.initscript, 'status'])
728 expected = 0
729 result = 'Got exit code %d, expected %d\n' % (rc, expected)
730 if expected != rc:
731 return (False, result + report)
732
733 if "fail" in report:
734 return (False, "Found 'fail' in report\n" + report)
735
736 return (True, "")
737
738class TestlibManager(object):
739 '''Singleton class used to set up per-test-run information'''
740 def __init__(self):
741 # Set glibc aborts to dump to stderr instead of the tty so test output
742 # is more sane.
743 os.environ.setdefault('LIBC_FATAL_STDERR_','1')
744
745 # check verbosity
746 self.verbosity = False
747 if (len(sys.argv) > 1 and '-v' in sys.argv[1:]):
748 self.verbosity = True
749
750 # Load LSB release file
751 self.lsb_release = dict()
752 if not os.path.exists('/usr/bin/lsb_release') and not os.path.exists('/bin/lsb_release'):
753 raise OSError, "Please install 'lsb-release'"
754 for line in subprocess.Popen(['lsb_release','-a'],stdout=subprocess.PIPE,stderr=subprocess.PIPE).communicate()[0].splitlines():
755 field, value = line.split(':',1)
756 value=value.strip()
757 field=field.strip()
758 # Convert numerics
759 try:
760 value = float(value)
761 except:
762 pass
763 self.lsb_release.setdefault(field,value)
764
765 # FIXME: hack OEM releases into known-Ubuntu versions
766 if self.lsb_release['Distributor ID'] == "HP MIE (Mobile Internet Experience)":
767 if self.lsb_release['Release'] == 1.0:
768 self.lsb_release['Distributor ID'] = "Ubuntu"
769 self.lsb_release['Release'] = 8.04
770 else:
771 raise OSError, "Unknown version of HP MIE"
772
773 # FIXME: hack to assume a most-recent release if we're not
774 # running under Ubuntu.
775 if self.lsb_release['Distributor ID'] not in ["Ubuntu","Linaro"]:
776 self.lsb_release['Release'] = 10000
777 # Adjust Linaro release to pretend to be Ubuntu
778 if self.lsb_release['Distributor ID'] in ["Linaro"]:
779 self.lsb_release['Distributor ID'] = "Ubuntu"
780 self.lsb_release['Release'] -= 0.01
781
782 # Load arch
783 if not os.path.exists('/usr/bin/dpkg'):
784 machine = cmd(['uname','-m'])[1].strip()
785 if machine.endswith('86'):
786 self.dpkg_arch = 'i386'
787 elif machine.endswith('_64'):
788 self.dpkg_arch = 'amd64'
789 elif machine.startswith('arm'):
790 self.dpkg_arch = 'armel'
791 else:
792 raise ValueError, "Unknown machine type '%s'" % (machine)
793 else:
794 self.dpkg_arch = cmd(['dpkg','--print-architecture'])[1].strip()
795
796 # Find kernel version
797 self.kernel_is_ubuntu = False
798 self.kernel_version_signature = None
799 self.kernel_version = cmd(["uname","-r"])[1].strip()
800 versig = '/proc/version_signature'
801 if os.path.exists(versig):
802 self.kernel_is_ubuntu = True
803 self.kernel_version_signature = file(versig).read().strip()
804 self.kernel_version_ubuntu = self.kernel_version
805 elif os.path.exists('/usr/bin/dpkg'):
806 # this can easily be inaccurate but is only an issue for Dapper
807 rc, out = cmd(['dpkg','-l','linux-image-%s' % (self.kernel_version)])
808 if rc == 0:
809 self.kernel_version_signature = out.strip().split('\n').pop().split()[2]
810 self.kernel_version_ubuntu = self.kernel_version_signature
811 if self.kernel_version_signature == None:
812 # Attempt to fall back to something for non-Debian-based
813 self.kernel_version_signature = self.kernel_version
814 self.kernel_version_ubuntu = self.kernel_version
815 # Build ubuntu version without hardware suffix
816 try:
817 self.kernel_version_ubuntu = "-".join([x for x in self.kernel_version_signature.split(' ')[1].split('-') if re.search('^[0-9]', x)])
818 except:
819 pass
820
821 # Find gcc version
822 self.gcc_version = get_gcc_version('gcc')
823
824 # Find libc
825 self.path_libc = [x.split()[2] for x in cmd(['ldd','/bin/ls'])[1].splitlines() if x.startswith('\tlibc.so.')][0]
826
827 # Report self
828 if self.verbosity:
829 kernel = self.kernel_version_ubuntu
830 if kernel != self.kernel_version_signature:
831 kernel += " (%s)" % (self.kernel_version_signature)
832 print >>sys.stdout, "Running test: '%s' distro: '%s %.2f' kernel: '%s' arch: '%s' uid: %d/%d SUDO_USER: '%s')" % ( \
833 sys.argv[0],
834 self.lsb_release['Distributor ID'],
835 self.lsb_release['Release'],
836 kernel,
837 self.dpkg_arch,
838 os.geteuid(), os.getuid(),
839 os.environ.get('SUDO_USER', ''))
840 sys.stdout.flush()
841
842 # Additional heuristics
843 #if os.environ.get('SUDO_USER', os.environ.get('USER', '')) in ['mdeslaur']:
844 # sys.stdout.write("Replying to Marc Deslauriers in http://launchpad.net/bugs/%d: " % random.randint(600000, 980000))
845 # sys.stdout.flush()
846 # time.sleep(0.5)
847 # sys.stdout.write("destroyed\n")
848 # time.sleep(0.5)
849
850 def hello(self, msg):
851 print >>sys.stderr, "Hello from %s" % (msg)
852# The central instance
853manager = TestlibManager()
854
855class TestlibCase(unittest.TestCase):
856 def __init__(self, *args):
857 '''This is called for each TestCase test instance, which isn't much better
858 than SetUp.'''
859
860 unittest.TestCase.__init__(self, *args)
861
862 # Attach to and duplicate dicts from manager singleton
863 self.manager = manager
864 #self.manager.hello(repr(self) + repr(*args))
865 self.my_verbosity = self.manager.verbosity
866 self.lsb_release = self.manager.lsb_release
867 self.dpkg_arch = self.manager.dpkg_arch
868 self.kernel_version = self.manager.kernel_version
869 self.kernel_version_signature = self.manager.kernel_version_signature
870 self.kernel_version_ubuntu = self.manager.kernel_version_ubuntu
871 self.kernel_is_ubuntu = self.manager.kernel_is_ubuntu
872 self.gcc_version = self.manager.gcc_version
873 self.path_libc = self.manager.path_libc
874
875 def version_compare(self, one, two):
876 return apt_pkg.VersionCompare(one,two)
877
878 def assertFileType(self, filename, filetype):
879 '''Checks the file type of the file specified'''
880
881 (rc, report, out) = self._testlib_shell_cmd(["/usr/bin/file", "-b", filename])
882 out = out.strip()
883 expected = 0
884 # Absolutely no idea why this happens on Hardy
885 if self.lsb_release['Release'] == 8.04 and rc == 255 and len(out) > 0:
886 rc = 0
887 result = 'Got exit code %d, expected %d:\n%s\n' % (rc, expected, report)
888 self.assertEquals(expected, rc, result)
889
890 filetype = '^%s$' % (filetype)
891 result = 'File type reported by file: [%s], expected regex: [%s]\n' % (out, filetype)
892 self.assertNotEquals(None, re.search(filetype, out), result)
893
894 def yank_commonname_from_cert(self, certfile):
895 '''Extract the commonName from a given PEM'''
896 rc, out = cmd(['openssl','asn1parse','-in',certfile])
897 if rc == 0:
898 ready = False
899 for line in out.splitlines():
900 if ready:
901 return line.split(':')[-1]
902 if ':commonName' in line:
903 ready = True
904 return socket.getfqdn()
905
906 def announce(self, text):
907 if self.my_verbosity:
908 print >>sys.stdout, "(%s) " % (text),
909 sys.stdout.flush()
910
911 def make_clean(self):
912 rc, output = self.shell_cmd(['make','clean'])
913 self.assertEquals(rc, 0, output)
914
915 def get_makefile_compiler(self):
916 # Find potential compiler name
917 compiler = 'gcc'
918 if os.path.exists('Makefile'):
919 for line in open('Makefile'):
920 if line.startswith('CC') and '=' in line:
921 items = [x.strip() for x in line.split('=')]
922 if items[0] == 'CC':
923 compiler = items[1]
924 break
925 return compiler
926
927 def make_target(self, target, expected=0):
928 '''Compile a target and report output'''
929
930 compiler = self.get_makefile_compiler()
931 rc, output = self.shell_cmd(['make',target])
932 self.assertEquals(rc, expected, 'rc(%d)!=%d:\n' % (rc, expected) + output)
933 self.assertTrue('%s ' % (compiler) in output, 'Expected "%s":' % (compiler) + output)
934 return output
935
936 # call as return testlib.skipped()
937 def _skipped(self, reason=""):
938 '''Provide a visible way to indicate that a test was skipped'''
939 if reason != "":
940 reason = ': %s' % (reason)
941 self.announce("skipped%s" % (reason))
942 return False
943
944 def _testlib_shell_cmd(self,args,stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT):
945 argstr = "'" + "', '".join(args).strip() + "'"
946 rc, out = cmd(args,stdin=stdin,stdout=stdout,stderr=stderr)
947 report = 'Command: ' + argstr + '\nOutput:\n' + out
948 return rc, report, out
949
950 def shell_cmd(self, args, stdin=None):
951 return cmd(args,stdin=stdin)
952
953 def assertShellExitEquals(self, expected, args, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, msg=""):
954 '''Test a shell command matches a specific exit code'''
955 rc, report, out = self._testlib_shell_cmd(args, stdin=stdin, stdout=stdout, stderr=stderr)
956 result = 'Got exit code %d, expected %d\n' % (rc, expected)
957 self.assertEquals(expected, rc, msg + result + report)
958
959 def assertShellExitNotEquals(self, unwanted, args, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, msg=""):
960 '''Test a shell command doesn't match a specific exit code'''
961 rc, report, out = self._testlib_shell_cmd(args, stdin=stdin, stdout=stdout, stderr=stderr)
962 result = 'Got (unwanted) exit code %d\n' % rc
963 self.assertNotEquals(unwanted, rc, msg + result + report)
964
965 def assertShellOutputContains(self, text, args, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, msg="", invert=False):
966 '''Test a shell command contains a specific output'''
967 rc, report, out = self._testlib_shell_cmd(args, stdin=stdin, stdout=stdout, stderr=stderr)
968 result = 'Got exit code %d. Looking for text "%s"\n' % (rc, text)
969 if not invert:
970 self.assertTrue(text in out, msg + result + report)
971 else:
972 self.assertFalse(text in out, msg + result + report)
973
974 def assertShellOutputEquals(self, text, args, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, msg="", invert=False, expected=None):
975 '''Test a shell command matches a specific output'''
976 rc, report, out = self._testlib_shell_cmd(args, stdin=stdin, stdout=stdout, stderr=stderr)
977 result = 'Got exit code %d. Looking for exact text "%s" (%s)\n' % (rc, text, " ".join(args))
978 if not invert:
979 self.assertEquals(text, out, msg + result + report)
980 else:
981 self.assertNotEquals(text, out, msg + result + report)
982 if expected != None:
983 result = 'Got exit code %d. Expected %d (%s)\n' % (rc, expected, " ".join(args))
984 self.assertEquals(rc, expected, msg + result + report)
985
986 def _word_find(self, report, content, invert=False):
987 '''Check for a specific string'''
988 if invert:
989 warning = 'Found "%s"\n' % content
990 self.assertTrue(content not in report, warning + report)
991 else:
992 warning = 'Could not find "%s"\n' % content
993 self.assertTrue(content in report, warning + report)
994
995 def _test_sysctl_value(self, path, expected, msg=None, exists=True):
996 sysctl = '/proc/sys/%s' % (path)
997 self.assertEquals(exists, os.path.exists(sysctl), sysctl)
998 value = None
999 if exists:
1000 value = int(file(sysctl).read())
1001 report = "%s is not %d: %d" % (sysctl, expected, value)
1002 if msg:
1003 report += " (%s)" % (msg)
1004 self.assertEquals(value, expected, report)
1005 return value
1006
1007 def set_sysctl_value(self, path, desired):
1008 sysctl = '/proc/sys/%s' % (path)
1009 self.assertTrue(os.path.exists(sysctl),"%s does not exist" % (sysctl))
1010 file(sysctl,'w').write(str(desired))
1011 self._test_sysctl_value(path, desired)
1012
1013 def kernel_at_least(self, introduced):
1014 return self.version_compare(self.kernel_version_ubuntu,
1015 introduced) >= 0
1016
1017 def kernel_claims_cve_fixed(self, cve):
1018 changelog = "/usr/share/doc/linux-image-%s/changelog.Debian.gz" % (self.kernel_version)
1019 if os.path.exists(changelog):
1020 for line in gzip.open(changelog):
1021 if cve in line and not "revert" in line and not "Revert" in line:
1022 return True
1023 return False
1024
1025class TestGroup:
1026 '''Create a temporary test group and remove it again in the dtor.'''
1027
1028 def __init__(self, group=None, lower=False):
1029 '''Create a new group'''
1030
1031 self.group = None
1032 if group:
1033 if group_exists(group):
1034 raise ValueError, 'group name already exists'
1035 else:
1036 while(True):
1037 group = random_string(7,lower=lower)
1038 if not group_exists(group):
1039 break
1040
1041 assert subprocess.call(['groupadd',group]) == 0
1042 self.group = group
1043 g = grp.getgrnam(self.group)
1044 self.gid = g[2]
1045
1046 def __del__(self):
1047 '''Remove the created group.'''
1048
1049 if self.group:
1050 rc, report = cmd(['groupdel', self.group])
1051 assert rc == 0
1052
1053class TestUser:
1054 '''Create a temporary test user and remove it again in the dtor.'''
1055
1056 def __init__(self, login=None, home=True, group=None, uidmin=None, lower=False, shell=None):
1057 '''Create a new user account with a random password.
1058
1059 By default, the login name is random, too, but can be explicitly
1060 specified with 'login'. By default, a home directory is created, this
1061 can be suppressed with 'home=False'.'''
1062
1063 self.login = None
1064
1065 if os.geteuid() != 0:
1066 raise ValueError, "You must be root to run this test"
1067
1068 if login:
1069 if login_exists(login):
1070 raise ValueError, 'login name already exists'
1071 else:
1072 while(True):
1073 login = 't' + random_string(7,lower=lower)
1074 if not login_exists(login):
1075 break
1076
1077 self.salt = random_string(2)
1078 self.password = random_string(8,lower=lower)
1079 self.crypted = crypt.crypt(self.password, self.salt)
1080
1081 creation = ['useradd', '-p', self.crypted]
1082 if home:
1083 creation += ['-m']
1084 if group:
1085 creation += ['-G',group]
1086 if uidmin:
1087 creation += ['-K','UID_MIN=%d'%uidmin]
1088 if shell:
1089 creation += ['-s',shell]
1090 creation += [login]
1091 assert subprocess.call(creation) == 0
1092 # Set GECOS
1093 assert subprocess.call(['usermod','-c','Buddy %s' % (login),login]) == 0
1094
1095 self.login = login
1096 p = pwd.getpwnam(self.login)
1097 self.uid = p[2]
1098 self.gid = p[3]
1099 self.gecos = p[4]
1100 self.home = p[5]
1101 self.shell = p[6]
1102
1103 def __del__(self):
1104 '''Remove the created user account.'''
1105
1106 if self.login:
1107 # sanity check the login name so we don't accidentally wipe too much
1108 if len(self.login)>3 and not '/' in self.login:
1109 subprocess.call(['rm','-rf', '/home/'+self.login, '/var/mail/'+self.login])
1110 rc, report = cmd(['userdel', '-f', self.login])
1111 assert rc == 0
1112
1113 def add_to_group(self, group):
1114 '''Add user to the specified group name'''
1115 rc, report = cmd(['usermod', '-G', group, self.login])
1116 if rc != 0:
1117 print report
1118 assert rc == 0
1119
1120# Timeout handler using alarm() from John P. Speno's Pythonic Avocado
1121class TimeoutFunctionException(Exception):
1122 """Exception to raise on a timeout"""
1123 pass
1124class TimeoutFunction:
1125 def __init__(self, function, timeout):
1126 self.timeout = timeout
1127 self.function = function
1128
1129 def handle_timeout(self, signum, frame):
1130 raise TimeoutFunctionException()
1131
1132 def __call__(self, *args, **kwargs):
1133 old = signal.signal(signal.SIGALRM, self.handle_timeout)
1134 signal.alarm(self.timeout)
1135 try:
1136 result = self.function(*args, **kwargs)
1137 finally:
1138 signal.signal(signal.SIGALRM, old)
1139 signal.alarm(0)
1140 return result
1141
1142def main():
1143 print "hi"
1144 unittest.main()

Subscribers

People subscribed via source and target branches

to all changes: