Merge lp:~yolanda.robla/ubuntu/saucy/freeradius/dep-8-tests into lp:ubuntu/saucy/freeradius

Proposed by Yolanda Robla
Status: Merged
Merge reported by: James Page
Merged at revision: not available
Proposed branch: lp:~yolanda.robla/ubuntu/saucy/freeradius/dep-8-tests
Merge into: lp:ubuntu/saucy/freeradius
Diff against target: 1372 lines (+1323/-0)
8 files modified
debian/changelog (+6/-0)
debian/control (+1/-0)
debian/tests/clients (+34/-0)
debian/tests/control (+3/-0)
debian/tests/daemon (+13/-0)
debian/tests/freeradius (+6/-0)
debian/tests/test-freeradius.py (+116/-0)
debian/tests/testlib.py (+1144/-0)
To merge this branch: bzr merge lp:~yolanda.robla/ubuntu/saucy/freeradius/dep-8-tests
Reviewer          Review Type   Date Requested   Status
James Page                                       Approve
Ubuntu branches                                  Pending
Review via email: mp+166215@code.launchpad.net

Description of the change

Added autopkgtests

James Page (james-page) wrote :

LGTM - uploading.

review: Approve

Preview Diff

=== modified file 'debian/changelog'
--- debian/changelog 2012-12-29 00:54:44 +0000
+++ debian/changelog 2013-05-29 10:49:29 +0000
@@ -1,3 +1,9 @@
+freeradius (2.1.12+dfsg-1.2ubuntu2) saucy; urgency=low
+
+  * d/tests: added autopkgtests
+
+ -- Yolanda Robla <yolanda.robla@canonical.com>  Fri, 24 May 2013 16:06:20 +0200
+
 freeradius (2.1.12+dfsg-1.2ubuntu1) raring; urgency=low
 
   * Fix FTBFS with multiarched python.

=== modified file 'debian/control'
--- debian/control 2012-12-29 00:54:44 +0000
+++ debian/control 2013-05-29 10:49:29 +0000
@@ -25,6 +25,7 @@
 Uploaders: Stephen Gran <sgran@debian.org>, Mark Hymers <mhy@debian.org>
 Standards-Version: 3.9.2
 Homepage: http://www.freeradius.org/
+XS-Testsuite: autopkgtest
 
 Package: freeradius
 Architecture: any

=== added directory 'debian/tests'
=== added file 'debian/tests/clients'
--- debian/tests/clients 1970-01-01 00:00:00 +0000
+++ debian/tests/clients 2013-05-29 10:49:29 +0000
@@ -0,0 +1,34 @@
+#!/bin/bash
+#-------------------------
+# Testing client utilities
+#-------------------------
+set -e
+
+HELP_CLIENTS=('radsniff')
+for client in "${HELP_CLIENTS[@]}"; do
+    RET=$($client -h 2>&1 > /dev/null)
+
+    if [[ $RET ]]; then
+        echo "ERROR, ${client} is not running"
+    fi
+done
+
+VERSION_CLIENTS=('radclient' 'radeapclient')
+for client in "${VERSION_CLIENTS[@]}"; do
+    RET=$($client -v 2>&1 > /dev/null)
+
+    if [[ $RET ]]; then
+        echo "ERROR, ${client} is not running"
+        exit $RET
+    fi
+done
+
+ALONE_CLIENTS=('radlast')
+for client in "${ALONE_CLIENTS[@]}"; do
+    RET=$($client 2>&1 > /dev/null)
+
+    if [[ $RET ]]; then
+        echo "ERROR, ${client} is not running"
+        exit $RET
+    fi
+done

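A note on the check in `debian/tests/clients`: because the redirection is written `2>&1 > /dev/null`, `RET` receives whatever the utility prints on stderr rather than its exit status, so `exit $RET` appears to be handed text instead of a number. The sketch below shows an exit-status form of the same check in Python. It is an illustration only: `client_runs` is a hypothetical helper that is not part of this branch, and whether each FreeRADIUS utility actually returns 0 for `-h` or `-v` is an assumption that would need checking.

```python
import subprocess
import sys


def client_runs(argv, timeout=10):
    """Return True if the command can be started and exits with status 0.

    Hypothetical helper for illustration; not part of the merged branch.
    """
    try:
        proc = subprocess.run(
            argv,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            timeout=timeout,
        )
    except (OSError, subprocess.TimeoutExpired):
        # Missing binary, permission problem, or a hang all count as failure.
        return False
    return proc.returncode == 0


if __name__ == "__main__":
    # Stand-in command so the sketch runs anywhere; on a test bed this
    # would be something like ["radclient", "-v"].
    ok = client_runs([sys.executable, "-c", "pass"])
    print("OK" if ok else "ERROR")
```

Deciding on the return code keeps the check independent of whether a utility writes its help or version text to stdout or stderr.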
=== added file 'debian/tests/control'
--- debian/tests/control 1970-01-01 00:00:00 +0000
+++ debian/tests/control 2013-05-29 10:49:29 +0000
@@ -0,0 +1,3 @@
+Tests: freeradius daemon clients
+Depends: freeradius, freeradius-utils, python-unit, lsb-release
+Restrictions: needs-root

=== added file 'debian/tests/daemon'
--- debian/tests/daemon 1970-01-01 00:00:00 +0000
+++ debian/tests/daemon 2013-05-29 10:49:29 +0000
@@ -0,0 +1,13 @@
+#!/bin/bash
+#-------------------
+# Testing freeradius
+#-------------------
+set -e
+DAEMON=freeradius
+
+if pidof -x $DAEMON > /dev/null; then
+    echo "OK"
+else
+    echo "ERROR: ${DAEMON} IS NOT RUNNING"
+    exit 1
+fi

=== added file 'debian/tests/freeradius'
--- debian/tests/freeradius 1970-01-01 00:00:00 +0000
+++ debian/tests/freeradius 2013-05-29 10:49:29 +0000
@@ -0,0 +1,6 @@
+#!/bin/bash
+#-------------------
+# Testing freeradius
+#-------------------
+set -e
+python `dirname $0`/test-freeradius.py 2>&1

=== added file 'debian/tests/test-freeradius.py'
--- debian/tests/test-freeradius.py 1970-01-01 00:00:00 +0000
+++ debian/tests/test-freeradius.py 2013-05-29 10:49:29 +0000
@@ -0,0 +1,116 @@
+#!/usr/bin/python
+#
+# test-freeradius.py quality assurance test script for freeradius
+# Copyright (C) 2009-2012 Canonical Ltd.
+# Author: Marc Deslauriers <marc.deslauriers@ubuntu.com>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3,
+# as published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# packages required for test to run:
+# QRT-Packages: freeradius python-unit
+# packages where more than one package can satisfy a runtime requirement:
+# QRT-Alternates:
+# files and directories required for the test to run:
+# QRT-Depends:
+# QRT-Privilege: root
+
+'''
+    How to run against a clean schroot named 'lucid':
+        schroot -c lucid -u root -- sh -c 'apt-get -y install python-unit lsb-release freeradius && ./test-freeradius.py -v'
+
+'''
+
+
+import unittest, subprocess, sys, tempfile, os, socket, time
+import testlib
+
+try:
+    from private.qrt.freeradius import PrivateFreeradiusTest
+except ImportError:
+    class PrivateFreeradiusTest(object):
+        '''Empty class'''
+    print >>sys.stdout, "Skipping private tests"
+
+class FreeradiusTest(testlib.TestlibCase, PrivateFreeradiusTest):
+    '''Test FreeRadius.'''
+
+    def setUp(self):
+        '''Set up prior to each test_* function'''
+        self.daemon = testlib.TestDaemon("/etc/init.d/freeradius")
+        self.tmpdir = tempfile.mkdtemp(prefix='freeradius-', dir='/tmp')
+        self.auth_approved = "code 2"
+        self.auth_denied = "code 3"
+
+        # Add a default user
+        self.users_file = "/etc/freeradius/users"
+        self.test_user = "testuser"
+        self.test_pass = "testpassword"
+        config_line = '%s Cleartext-Password := "%s"' % (self.test_user, self.test_pass)
+        testlib.config_replace(self.users_file, config_line, append=True)
+
+        rc, result = self.daemon.restart()
+        self.assertTrue(rc, result)
+
+    def tearDown(self):
+        '''Clean up after each test_* function'''
+
+        if os.path.exists(self.tmpdir):
+            testlib.recursive_rm(self.tmpdir)
+
+        testlib.config_restore(self.users_file)
+
+    def _test_auth(self, username, password, expected_string, expected_rc=0):
+        '''Tests authentication'''
+
+        handle, tmpname = testlib.mkstemp_fill("User-Name=%s,Password=%s" % (username, password), dir=self.tmpdir)
+
+        # can't use radtest as there's no way to set a timeout or number of retries
+        rc, report = testlib.cmd(['/usr/bin/radclient', '-r', '2', '-f', tmpname, '-s', 'localhost:1812', 'auth', 'testing123'])
+        result = 'Got exit code %d, expected %d\n' % (rc, expected_rc)
+        self.assertEquals(expected_rc, rc, result + report)
+
+        result = 'Could not find %s in output: %s\n' % (expected_string, report)
+        self.assertTrue(expected_string in report, result)
+
+
+    def test_valid_user(self):
+        '''Test a valid user'''
+
+        self._test_auth(self.test_user, self.test_pass, self.auth_approved)
+
+    def test_invalid_user(self):
+        '''Test an invalid user'''
+
+        self._test_auth('xxubuntuxx', 'xxrocksxx', self.auth_denied, 1)
+
+
+    def test_cve_2009_3111(self):
+        '''Test CVE-2009-3111'''
+
+        # This is same as CVE-2003-0967
+        # PoC from here: http://marc.info/?l=bugtraq&m=106944220426970
+
+        # Send a crafted packet
+        kaboom = "\x01\x01\x00\x16\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x45\x02"
+        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+        s.connect(('localhost', 1812))
+        s.send(kaboom)
+        s.close()
+        time.sleep(1)
+
+        # See if it still works
+        self._test_auth(self.test_user, self.test_pass, self.auth_approved)
+
+if __name__ == '__main__':
+    # simple
+    unittest.main()

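The byte string sent by `test_cve_2009_3111` is easier to read once it is split into RADIUS fields. The sketch below is a Python 3 illustration (the test itself is Python 2); `parse_radius` is a hypothetical helper, not part of `testlib.py`, and it assumes the standard RADIUS layout of a 4-byte header, a 16-byte authenticator, then type/length/value attributes.

```python
import struct

# Same 22 bytes as the `kaboom` string in test_cve_2009_3111.
KABOOM = bytes([0x01, 0x01, 0x00, 0x16]) + bytes(16) + bytes([0x45, 0x02])


def parse_radius(packet):
    """Split a RADIUS packet into header fields and (type, value) attributes.

    Hypothetical helper for illustration only.
    """
    code, ident, length = struct.unpack("!BBH", packet[:4])
    authenticator = packet[4:20]
    end = min(length, len(packet))
    attrs = []
    pos = 20
    while pos + 2 <= end:
        attr_type, attr_len = packet[pos], packet[pos + 1]
        if attr_len < 2:
            # The length byte counts the 2-byte attribute header, so
            # anything smaller is malformed; stop rather than loop forever.
            break
        attrs.append((attr_type, packet[pos + 2:pos + attr_len]))
        pos += attr_len
    return {
        "code": code,
        "id": ident,
        "length": length,
        "authenticator": authenticator,
        "attributes": attrs,
    }


if __name__ == "__main__":
    fields = parse_radius(KABOOM)
    print(fields["code"], fields["length"], fields["attributes"])
```

Read this way, the packet is an Access-Request (code 1) carrying a single attribute of type 69 (Tunnel-Password in RFC 2868) whose length byte of 2 leaves no value bytes, which matches the zero-length Tunnel-Password condition described for CVE-2009-3111. The `code 2` and `code 3` strings the test looks for in the `radclient` output correspond to Access-Accept and Access-Reject.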
=== added file 'debian/tests/testlib.py'
--- debian/tests/testlib.py 1970-01-01 00:00:00 +0000
+++ debian/tests/testlib.py 2013-05-29 10:49:29 +0000
@@ -0,0 +1,1144 @@
+#
+# testlib.py quality assurance test script
+# Copyright (C) 2008-2011 Canonical Ltd.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Library General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Library General Public License for more details.
+#
+# You should have received a copy of the GNU Library General Public
+# License along with this program. If not, see
+# <http://www.gnu.org/licenses/>.
+#
+
+'''Common classes and functions for package tests.'''
+
+import string, random, crypt, subprocess, pwd, grp, signal, time, unittest, tempfile, shutil, os, os.path, re, glob
+import sys, socket, gzip
+from stat import *
+from encodings import string_escape
+
+import warnings
+warnings.filterwarnings('ignore', message=r'.*apt_pkg\.TagFile.*', category=DeprecationWarning)
+try:
+    import apt_pkg
+    apt_pkg.InitSystem();
+except:
+    # On non-Debian system, fall back to simple comparison without debianisms
+    class apt_pkg(object):
+        def VersionCompare(one, two):
+            list_one = one.split('.')
+            list_two = two.split('.')
+            while len(list_one)>0 and len(list_two)>0:
+                if list_one[0] > list_two[0]:
+                    return 1
+                if list_one[0] < list_two[0]:
+                    return -1
+                list_one.pop(0)
+                list_two.pop(0)
+            return 0
+
+bogus_nxdomain = "208.69.32.132"
+
+# http://www.chiark.greenend.org.uk/ucgi/~cjwatson/blosxom/2009-07-02-python-sigpipe.html
+# This is needed so that the subprocesses that produce endless output
+# actually quit when the reader goes away.
+import signal
+def subprocess_setup():
+    # Python installs a SIGPIPE handler by default. This is usually not what
+    # non-Python subprocesses expect.
+    signal.signal(signal.SIGPIPE, signal.SIG_DFL)
+
+class TimedOutException(Exception):
+    def __init__(self, value = "Timed Out"):
+        self.value = value
+    def __str__(self):
+        return repr(self.value)
+
+def _restore_backup(path):
+    pathbackup = path + '.autotest'
+    if os.path.exists(pathbackup):
+        shutil.move(pathbackup, path)
+
+def _save_backup(path):
+    pathbackup = path + '.autotest'
+    if os.path.exists(path) and not os.path.exists(pathbackup):
+        shutil.copy2(path, pathbackup)
+        # copy2 does not copy ownership, so do it here.
+        # Reference: http://docs.python.org/library/shutil.html
+        a = os.stat(path)
+        os.chown(pathbackup, a[4], a[5])
+
+def config_copydir(path):
+    if os.path.exists(path) and not os.path.isdir(path):
+        raise OSError, "'%s' is not a directory" % (path)
+    _restore_backup(path)
+
+    pathbackup = path + '.autotest'
+    if os.path.exists(path):
+        shutil.copytree(path, pathbackup, symlinks=True)
+
+def config_replace(path,contents,append=False):
+    '''Replace (or append) to a config file'''
+    _restore_backup(path)
+    if os.path.exists(path):
+        _save_backup(path)
+        if append:
+            contents = file(path).read() + contents
+    open(path, 'w').write(contents)
+
+def config_comment(path, field):
+    _save_backup(path)
+    contents = ""
+    for line in file(path):
+        if re.search("^\s*%s\s*=" % (field), line):
+            line = "#" + line
+        contents += line
+
+    open(path+'.new', 'w').write(contents)
+    os.rename(path+'.new', path)
+
+def config_set(path, field, value, spaces=True):
+    _save_backup(path)
+    contents = ""
+    if spaces==True:
+        setting = '%s = %s\n' % (field, value)
+    else:
+        setting = '%s=%s\n' % (field, value)
+    found = False
+    for line in file(path):
+        if re.search("^\s*%s\s*=" % (field), line):
+            found = True
+            line = setting
+        contents += line
+    if not found:
+        contents += setting
+
+    open(path+'.new', 'w').write(contents)
+    os.rename(path+'.new', path)
+
+def config_patch(path, patch, depth=1):
+    '''Patch a config file'''
+    _restore_backup(path)
+    _save_backup(path)
+
+    handle, name = mkstemp_fill(patch)
+    rc = subprocess.call(['/usr/bin/patch', '-p%s' %(depth), path], stdin=handle, stdout=subprocess.PIPE)
+    os.unlink(name)
+    if rc != 0:
+        raise Exception("Patch failed")
+
+def config_restore(path):
+    '''Rename a replaced config file back to its initial state'''
+    _restore_backup(path)
+
+def timeout(secs, f, *args):
+    def handler(signum, frame):
+        raise TimedOutException()
+
+    old = signal.signal(signal.SIGALRM, handler)
+    result = None
+    signal.alarm(secs)
+    try:
+        result = f(*args)
+    finally:
+        signal.alarm(0)
+        signal.signal(signal.SIGALRM, old)
+
+    return result
+
+def require_nonroot():
+    if os.geteuid() == 0:
+        print >>sys.stderr, "This series of tests should be run as a regular user with sudo access, not as root."
+        sys.exit(1)
+
+def require_root():
+    if os.geteuid() != 0:
+        print >>sys.stderr, "This series of tests should be run with root privileges (e.g. via sudo)."
+        sys.exit(1)
+
+def require_sudo():
+    if os.geteuid() != 0 or os.environ.get('SUDO_USER', None) == None:
+        print >>sys.stderr, "This series of tests must be run under sudo."
+        sys.exit(1)
+    if os.environ['SUDO_USER'] == 'root':
+        print >>sys.stderr, 'Please run this test using sudo from a regular user. (You ran sudo from root.)'
+        sys.exit(1)
+
+def random_string(length,lower=False):
+    '''Return a random string, consisting of ASCII letters, with given
+    length.'''
+
+    s = ''
+    selection = string.letters
+    if lower:
+        selection = string.lowercase
+    maxind = len(selection)-1
+    for l in range(length):
+        s += selection[random.randint(0, maxind)]
+    return s
+
+def mkstemp_fill(contents,suffix='',prefix='testlib-',dir=None):
+    '''As tempfile.mkstemp does, return a (file, name) pair, but with
+    prefilled contents.'''
+
+    handle, name = tempfile.mkstemp(suffix=suffix,prefix=prefix,dir=dir)
+    os.close(handle)
+    handle = file(name,"w+")
+    handle.write(contents)
+    handle.flush()
+    handle.seek(0)
+
+    return handle, name
+
+def create_fill(path, contents, mode=0644):
+    '''Safely create a page'''
+    # make the temp file in the same dir as the destination file so we
+    # don't get invalid cross-device link errors when we rename
+    handle, name = mkstemp_fill(contents, dir=os.path.dirname(path))
+    handle.close()
+    os.rename(name, path)
+    os.chmod(path, mode)
+
+def login_exists(login):
+    '''Checks whether the given login exists on the system.'''
+
+    try:
+        pwd.getpwnam(login)
+        return True
+    except KeyError:
+        return False
+
+def group_exists(group):
+    '''Checks whether the given login exists on the system.'''
+
+    try:
+        grp.getgrnam(group)
+        return True
+    except KeyError:
+        return False
+
+def recursive_rm(dirPath, contents_only=False):
+    '''recursively remove directory'''
+    names = os.listdir(dirPath)
+    for name in names:
+        path = os.path.join(dirPath, name)
+        if os.path.islink(path) or not os.path.isdir(path):
+            os.unlink(path)
+        else:
+            recursive_rm(path)
+    if contents_only == False:
+        os.rmdir(dirPath)
+
+def check_pidfile(exe, pidfile):
+    '''Checks if pid in pidfile is running'''
+    if not os.path.exists(pidfile):
+        return False
+
+    # get the pid
+    try:
+        fd = open(pidfile, 'r')
+        pid = fd.readline().rstrip('\n')
+        fd.close()
+    except:
+        return False
+
+    return check_pid(exe, pid)
+
+def check_pid(exe, pid):
+    '''Checks if pid is running'''
+    cmdline = "/proc/%s/cmdline" % (str(pid))
+    if not os.path.exists(cmdline):
+        return False
+
+    # get the command line
+    try:
+        fd = open(cmdline, 'r')
+        tmp = fd.readline().split('\0')
+        fd.close()
+    except:
+        return False
+
+    # this allows us to match absolute paths or just the executable name
+    if re.match('^' + exe + '$', tmp[0]) or \
+       re.match('.*/' + exe + '$', tmp[0]) or \
+       re.match('^' + exe + ': ', tmp[0]) or \
+       re.match('^\(' + exe + '\)', tmp[0]):
+        return True
+
+    return False
+
+def check_port(port, proto, ver=4):
+    '''Check if something is listening on the specified port.
+    WARNING: for some reason this does not work with a bind mounted /proc
+    '''
+    assert (port >= 1)
+    assert (port <= 65535)
+    assert (proto.lower() == "tcp" or proto.lower() == "udp")
+    assert (ver == 4 or ver == 6)
+
+    fn = "/proc/net/%s" % (proto)
+    if ver == 6:
+        fn += str(ver)
+
+    rc, report = cmd(['cat', fn])
+    assert (rc == 0)
+
+    hport = "%0.4x" % port
+
+    if re.search(': [0-9a-f]{8}:%s [0-9a-f]' % str(hport).lower(), report.lower()):
+        return True
+    return False
+
+def get_arch():
+    '''Get the current architecture'''
+    rc, report = cmd(['uname', '-m'])
+    assert (rc == 0)
+    return report.strip()
+
+def get_memory():
+    '''Gets total ram and swap'''
+    meminfo = "/proc/meminfo"
+    memtotal = 0
+    swaptotal = 0
+    if not os.path.exists(meminfo):
+        return (False, False)
+
+    try:
+        fd = open(meminfo, 'r')
+        for line in fd.readlines():
+            splitline = line.split()
+            if splitline[0] == 'MemTotal:':
+                memtotal = int(splitline[1])
+            elif splitline[0] == 'SwapTotal:':
+                swaptotal = int(splitline[1])
+        fd.close()
+    except:
+        return (False, False)
+
+    return (memtotal,swaptotal)
+
+def is_running_in_vm():
+    '''Check if running under a VM'''
+    # add other virtualization environments here
+    for search in ['QEMU Virtual CPU']:
+        rc, report = cmd_pipe(['dmesg'], ['grep', search])
+        if rc == 0:
+            return True
+    return False
+
+def ubuntu_release():
+    '''Get the Ubuntu release'''
+    f = "/etc/lsb-release"
+    try:
+        size = os.stat(f)[ST_SIZE]
+    except:
+        return "UNKNOWN"
+
+    if size > 1024*1024:
+        raise IOError, 'Could not open "%s" (too big)' % f
+
+    try:
+        fh = open("/etc/lsb-release", 'r')
+    except:
+        raise
+
+    lines = fh.readlines()
+    fh.close()
+
+    pat = re.compile(r'DISTRIB_CODENAME')
+    for line in lines:
+        if pat.search(line):
+            return line.split('=')[1].rstrip('\n').rstrip('\r')
+
+    return "UNKNOWN"
+
+def cmd(command, input = None, stderr = subprocess.STDOUT, stdout = subprocess.PIPE, stdin = None, timeout = None):
+    '''Try to execute given command (array) and return its stdout, or return
+    a textual error if it failed.'''
+
+    try:
+        sp = subprocess.Popen(command, stdin=stdin, stdout=stdout, stderr=stderr, close_fds=True, preexec_fn=subprocess_setup)
+    except OSError, e:
+        return [127, str(e)]
+
+    out, outerr = sp.communicate(input)
+    # Handle redirection of stdout
+    if out == None:
+        out = ''
+    # Handle redirection of stderr
+    if outerr == None:
+        outerr = ''
+    return [sp.returncode,out+outerr]
+
+def cmd_pipe(command1, command2, input = None, stderr = subprocess.STDOUT, stdin = None):
+    '''Try to pipe command1 into command2.'''
+    try:
+        sp1 = subprocess.Popen(command1, stdin=stdin, stdout=subprocess.PIPE, stderr=stderr, close_fds=True)
+        sp2 = subprocess.Popen(command2, stdin=sp1.stdout, stdout=subprocess.PIPE, stderr=stderr, close_fds=True)
+    except OSError, e:
+        return [127, str(e)]
+
+    out = sp2.communicate(input)[0]
+    return [sp2.returncode,out]
+
+def cwd_has_enough_space(cdir, total_bytes):
+    '''Determine if the partition of the current working directory has 'bytes'
+    free.'''
+    rc, df_output = cmd(['df'])
+    result = 'Got exit code %d, expected %d\n' % (rc, 0)
+    if rc != 0:
+        return False
+
+    kb = total_bytes / 1024
+
+    mounts = dict()
+    for line in df_output.splitlines():
+        if '/' not in line:
+            continue
+        tmp = line.split()
+        mounts[tmp[5]] = int(tmp[3])
+
+    cdir = os.getcwd()
+    while cdir != '/':
+        if not mounts.has_key(cdir):
+            cdir = os.path.dirname(cdir)
+            continue
+        if kb < mounts[cdir]:
+            return True
+        else:
+            return False
+
+    if kb < mounts['/']:
+        return True
+
+    return False
+
+def get_md5(filename):
+    '''Gets the md5sum of the file specified'''
+
+    (rc, report) = cmd(["/usr/bin/md5sum", "-b", filename])
+    expected = 0
+    assert (expected == rc)
+
+    return report.split(' ')[0]
+
+def dpkg_compare_installed_version(pkg, check, version):
+    '''Gets the version for the installed package, and compares it to the
+    specified version.
+    '''
+    (rc, report) = cmd(["/usr/bin/dpkg", "-s", pkg])
+    assert (rc == 0)
+    assert ("Status: install ok installed" in report)
+    installed_version = ""
+    for line in report.splitlines():
+        if line.startswith("Version: "):
+            installed_version = line.split()[1]
+
+    assert (installed_version != "")
+
+    (rc, report) = cmd(["/usr/bin/dpkg", "--compare-versions", installed_version, check, version])
+    assert (rc == 0 or rc == 1)
+    if rc == 0:
+        return True
+    return False
+
+def prepare_source(source, builder, cached_src, build_src, patch_system):
+    '''Download and unpack source package, installing necessary build depends,
+    adjusting the permissions for the 'builder' user, and returning the
+    directory of the unpacked source. Patch system can be one of:
+    - cdbs
+    - dpatch
+    - quilt
+    - quiltv3
+    - None (not the string)
+
+    This is normally used like this:
+
+    def setUp(self):
+        ...
+        self.topdir = os.getcwd()
+        self.cached_src = os.path.join(os.getcwd(), "source")
+        self.tmpdir = tempfile.mkdtemp(prefix='testlib', dir='/tmp')
+        self.builder = testlib.TestUser()
+        testlib.cmd(['chgrp', self.builder.login, self.tmpdir])
+        os.chmod(self.tmpdir, 0775)
+
+    def tearDown(self):
+        ...
+        self.builder = None
+        self.topdir = os.getcwd()
+        if os.path.exists(self.tmpdir):
+            testlib.recursive_rm(self.tmpdir)
+
+    def test_suite_build(self):
+        ...
+        build_dir = testlib.prepare_source('foo', \
+                                           self.builder, \
+                                           self.cached_src, \
+                                           os.path.join(self.tmpdir, \
+                                               os.path.basename(self.cached_src)),
+                                           "quilt")
+        os.chdir(build_dir)
+
+        # Example for typical build, adjust as necessary
+        print ""
+        print " make clean"
+        rc, report = testlib.cmd(['sudo', '-u', self.builder.login, 'make', 'clean'])
+
+        print " configure"
+        rc, report = testlib.cmd(['sudo', '-u', self.builder.login, './configure', '--prefix=%s' % self.tmpdir, '--enable-debug'])
+
+        print " make (will take a while)"
+        rc, report = testlib.cmd(['sudo', '-u', self.builder.login, 'make'])
+
+        print " make check (will take a while)",
+        rc, report = testlib.cmd(['sudo', '-u', self.builder.login, 'make', 'check'])
+        expected = 0
+        result = 'Got exit code %d, expected %d\n' % (rc, expected)
+        self.assertEquals(expected, rc, result + report)
+
+    def test_suite_cleanup(self):
+        ...
+        if os.path.exists(self.cached_src):
+            testlib.recursive_rm(self.cached_src)
+
+    It is up to the caller to clean up cached_src and build_src (as in the
+    above example, often the build_src is in a tmpdir that is cleaned in
+    tearDown() and the cached_src is cleaned in a one time clean-up
+    operation (eg 'test_suite_cleanup()) which must be run after the build
+    suite test (obviously).
+    '''
+
+    # Make sure we have a clean slate
+    assert (os.path.exists(os.path.dirname(build_src)))
+    assert (not os.path.exists(build_src))
+
+    cdir = os.getcwd()
+    if os.path.exists(cached_src):
+        shutil.copytree(cached_src, build_src)
+        os.chdir(build_src)
+    else:
+        # Only install the build dependencies on the initial setup
+        rc, report = cmd(['apt-get','-y','--force-yes','build-dep',source])
+        assert (rc == 0)
+
+        os.makedirs(build_src)
+        os.chdir(build_src)
+
+        # These are always needed
+        pkgs = ['build-essential', 'dpkg-dev', 'fakeroot']
+        rc, report = cmd(['apt-get','-y','--force-yes','install'] + pkgs)
+        assert (rc == 0)
+
+        rc, report = cmd(['apt-get','source',source])
+        assert (rc == 0)
+        shutil.copytree(build_src, cached_src)
+
+    unpacked_dir = os.path.join(build_src, glob.glob('%s-*' % source)[0])
+
+    # Now apply the patches. Do it here so that we don't mess up our cached
+    # sources.
+    os.chdir(unpacked_dir)
+    assert (patch_system in ['cdbs', 'dpatch', 'quilt', 'quiltv3', None])
+    if patch_system != None and patch_system != "quiltv3":
+        if patch_system == "quilt":
+            os.environ.setdefault('QUILT_PATCHES','debian/patches')
+            rc, report = cmd(['quilt', 'push', '-a'])
+            assert (rc == 0)
+        elif patch_system == "cdbs":
+            rc, report = cmd(['./debian/rules', 'apply-patches'])
+            assert (rc == 0)
+        elif patch_system == "dpatch":
+            rc, report = cmd(['dpatch', 'apply-all'])
+            assert (rc == 0)
+
+    cmd(['chown', '-R', '%s:%s' % (builder.uid, builder.gid), build_src])
+    os.chdir(cdir)
+
+    return unpacked_dir
+
+def _aa_status():
+    '''Get aa-status output'''
+    exe = "/usr/sbin/aa-status"
+    assert (os.path.exists(exe))
+    if os.geteuid() == 0:
+        return cmd([exe])
+    return cmd(['sudo', exe])
+
+def is_apparmor_loaded(path):
+    '''Check if profile is loaded'''
+    rc, report = _aa_status()
+    if rc != 0:
+        return False
+
+    for line in report.splitlines():
+        if line.endswith(path):
+            return True
+    return False
+
+def is_apparmor_confined(path):
+    '''Check if application is confined'''
+    rc, report = _aa_status()
+    if rc != 0:
+        return False
+
+    for line in report.splitlines():
+        if re.search('%s \(' % path, line):
+            return True
+    return False
+
+def check_apparmor(path, first_ubuntu_release, is_running=True):
+    '''Check if path is loaded and confined for everything higher than the
+    first Ubuntu release specified.
+
+    Usage:
+    rc, report = testlib.check_apparmor('/usr/sbin/foo', 8.04, is_running=True)
+    if rc < 0:
+        return self._skipped(report)
+
+    expected = 0
+    result = 'Got exit code %d, expected %d\n' % (rc, expected)
+    self.assertEquals(expected, rc, result + report)
+    '''
+    global manager
+    rc = -1
+
+    if manager.lsb_release["Release"] < first_ubuntu_release:
+        return (rc, "Skipped apparmor check")
+
+    if not os.path.exists('/sbin/apparmor_parser'):
+        return (rc, "Skipped (couldn't find apparmor_parser)")
+
+    rc = 0
+    msg = ""
+    if not is_apparmor_loaded(path):
+        rc = 1
+        msg = "Profile not loaded for '%s'" % path
+
+    # this check only makes sense it the 'path' is currently executing
+    if is_running and rc == 0 and not is_apparmor_confined(path):
+        rc = 1
+        msg = "'%s' is not running in enforce mode" % path
+
+    return (rc, msg)
+
+def get_gcc_version(gcc, full=True):
+    gcc_version = 'none'
+    if not gcc.startswith('/'):
+        gcc = '/usr/bin/%s' % (gcc)
+    if os.path.exists(gcc):
+        gcc_version = 'unknown'
+        lines = cmd([gcc,'-v'])[1].strip().splitlines()
+        version_lines = [x for x in lines if x.startswith('gcc version')]
+        if len(version_lines) == 1:
+            gcc_version = " ".join(version_lines[0].split()[2:])
+    if not full:
+        return gcc_version.split()[0]
+    return gcc_version
+
+def is_kdeinit_running():
+    '''Test if kdeinit is running'''
+    # applications that use kdeinit will spawn it if it isn't running in the
+    # test. This is a problem because it does not exit. This is a helper to
+    # check for it.
+    rc, report = cmd(['ps', 'x'])
+    if 'kdeinit4 Running' not in report:
+        print >>sys.stderr, ("kdeinit not running (you may start/stop any KDE application then run this script again)")
+        return False
+    return True
+
+def get_pkgconfig_flags(libs=[]):
+    '''Find pkg-config flags for libraries'''
+    assert (len(libs) > 0)
+    rc, pkg_config = cmd(['pkg-config', '--cflags', '--libs'] + libs)
+    expected = 0
+    if rc != expected:
+        print >>sys.stderr, 'Got exit code %d, expected %d\n' % (rc, expected)
+    assert(rc == expected)
+    return pkg_config.split()
+
+class TestDaemon:
+    '''Helper class to manage daemons consistently'''
+    def __init__(self, init):
+        '''Setup daemon attributes'''
+        self.initscript = init
+
+    def start(self):
+        '''Start daemon'''
+        rc, report = cmd([self.initscript, 'start'])
+        expected = 0
+        result = 'Got exit code %d, expected %d\n' % (rc, expected)
+        time.sleep(2)
+        if expected != rc:
+            return (False, result + report)
+
+        if "fail" in report:
+            return (False, "Found 'fail' in report\n" + report)
+
+        return (True, "")
+
+    def stop(self):
+        '''Stop daemon'''
+        rc, report = cmd([self.initscript, 'stop'])
+        expected = 0
+        result = 'Got exit code %d, expected %d\n' % (rc, expected)
+        if expected != rc:
+            return (False, result + report)
+
+        if "fail" in report:
+            return (False, "Found 'fail' in report\n" + report)
+
+        return (True, "")
+
+    def reload(self):
+        '''Reload daemon'''
+        rc, report = cmd([self.initscript, 'force-reload'])
+        expected = 0
+        result = 'Got exit code %d, expected %d\n' % (rc, expected)
+        if expected != rc:
+            return (False, result + report)
+
+        if "fail" in report:
+            return (False, "Found 'fail' in report\n" + report)
+
+        return (True, "")
+
+    def restart(self):
+        '''Restart daemon'''
+        (res, str) = self.stop()
+        if not res:
+            return (res, str)
+
+        (res, str) = self.start()
+        if not res:
+            return (res, str)
+
+        return (True, "")
+
+    def status(self):
+        '''Check daemon status'''
+        rc, report = cmd([self.initscript, 'status'])
+        expected = 0
+        result = 'Got exit code %d, expected %d\n' % (rc, expected)
+        if expected != rc:
+            return (False, result + report)
+
+        if "fail" in report:
+            return (False, "Found 'fail' in report\n" + report)
+
+        return (True, "")
+
+class TestlibManager(object):
+    '''Singleton class used to set up per-test-run information'''
+    def __init__(self):
+        # Set glibc aborts to dump to stderr instead of the tty so test output
+        # is more sane.
+        os.environ.setdefault('LIBC_FATAL_STDERR_','1')
+
+        # check verbosity
+        self.verbosity = False
+        if (len(sys.argv) > 1 and '-v' in sys.argv[1:]):
+            self.verbosity = True
+
+        # Load LSB release file
+        self.lsb_release = dict()
+        if not os.path.exists('/usr/bin/lsb_release') and not os.path.exists('/bin/lsb_release'):
+            raise OSError, "Please install 'lsb-release'"
+        for line in subprocess.Popen(['lsb_release','-a'],stdout=subprocess.PIPE,stderr=subprocess.PIPE).communicate()[0].splitlines():
+            field, value = line.split(':',1)
+            value=value.strip()
+            field=field.strip()
+            # Convert numerics
+            try:
+                value = float(value)
+            except:
+                pass
+            self.lsb_release.setdefault(field,value)
+
+        # FIXME: hack OEM releases into known-Ubuntu versions
+        if self.lsb_release['Distributor ID'] == "HP MIE (Mobile Internet Experience)":
+            if self.lsb_release['Release'] == 1.0:
+                self.lsb_release['Distributor ID'] = "Ubuntu"
+                self.lsb_release['Release'] = 8.04
+            else:
+                raise OSError, "Unknown version of HP MIE"
+
+        # FIXME: hack to assume a most-recent release if we're not
+        # running under Ubuntu.
+        if self.lsb_release['Distributor ID'] not in ["Ubuntu","Linaro"]:
+            self.lsb_release['Release'] = 10000
+        # Adjust Linaro release to pretend to be Ubuntu
+        if self.lsb_release['Distributor ID'] in ["Linaro"]:
+            self.lsb_release['Distributor ID'] = "Ubuntu"
+            self.lsb_release['Release'] -= 0.01
+
+        # Load arch
+        if not os.path.exists('/usr/bin/dpkg'):
+            machine = cmd(['uname','-m'])[1].strip()
+            if machine.endswith('86'):
+                self.dpkg_arch = 'i386'
+            elif machine.endswith('_64'):
+                self.dpkg_arch = 'amd64'
+            elif machine.startswith('arm'):
+                self.dpkg_arch = 'armel'
+            else:
+                raise ValueError, "Unknown machine type '%s'" % (machine)
+        else:
+            self.dpkg_arch = cmd(['dpkg','--print-architecture'])[1].strip()
+
+        # Find kernel version
+        self.kernel_is_ubuntu = False
+        self.kernel_version_signature = None
+        self.kernel_version = cmd(["uname","-r"])[1].strip()
+        versig = '/proc/version_signature'
+        if os.path.exists(versig):
+            self.kernel_is_ubuntu = True
+            self.kernel_version_signature = file(versig).read().strip()
+            self.kernel_version_ubuntu = self.kernel_version
+        elif os.path.exists('/usr/bin/dpkg'):
+            # this can easily be inaccurate but is only an issue for Dapper
+            rc, out = cmd(['dpkg','-l','linux-image-%s' % (self.kernel_version)])
+            if rc == 0:
+                self.kernel_version_signature = out.strip().split('\n').pop().split()[2]
+                self.kernel_version_ubuntu = self.kernel_version_signature
+        if self.kernel_version_signature == None:
+            # Attempt to fall back to something for non-Debian-based
+            self.kernel_version_signature = self.kernel_version
+            self.kernel_version_ubuntu = self.kernel_version
+        # Build ubuntu version without hardware suffix
+        try:
+            self.kernel_version_ubuntu = "-".join([x for x in self.kernel_version_signature.split(' ')[1].split('-') if re.search('^[0-9]', x)])
+        except:
+            pass
+
+        # Find gcc version
+        self.gcc_version = get_gcc_version('gcc')
+
+        # Find libc
+        self.path_libc = [x.split()[2] for x in cmd(['ldd','/bin/ls'])[1].splitlines() if x.startswith('\tlibc.so.')][0]
+
+        # Report self
+        if self.verbosity:
+            kernel = self.kernel_version_ubuntu
+            if kernel != self.kernel_version_signature:
+                kernel += " (%s)" % (self.kernel_version_signature)
+            print >>sys.stdout, "Running test: '%s' distro: '%s %.2f' kernel: '%s' arch: '%s' uid: %d/%d SUDO_USER: '%s')" % ( \
833 sys.argv[0],
834 self.lsb_release['Distributor ID'],
835 self.lsb_release['Release'],
836 kernel,
837 self.dpkg_arch,
838 os.geteuid(), os.getuid(),
839 os.environ.get('SUDO_USER', ''))
840 sys.stdout.flush()
841
842 # Additional heuristics
843 #if os.environ.get('SUDO_USER', os.environ.get('USER', '')) in ['mdeslaur']:
844 # sys.stdout.write("Replying to Marc Deslauriers in http://launchpad.net/bugs/%d: " % random.randint(600000, 980000))
845 # sys.stdout.flush()
846 # time.sleep(0.5)
847 # sys.stdout.write("destroyed\n")
848 # time.sleep(0.5)
849
850 def hello(self, msg):
851 print >>sys.stderr, "Hello from %s" % (msg)
852# The central instance
853manager = TestlibManager()
854
855class TestlibCase(unittest.TestCase):
856 def __init__(self, *args):
857 '''This is called for each TestCase test instance, which isn't much better
858 than SetUp.'''
859
860 unittest.TestCase.__init__(self, *args)
861
862 # Attach to and duplicate dicts from manager singleton
863 self.manager = manager
864 #self.manager.hello(repr(self) + repr(*args))
865 self.my_verbosity = self.manager.verbosity
866 self.lsb_release = self.manager.lsb_release
867 self.dpkg_arch = self.manager.dpkg_arch
868 self.kernel_version = self.manager.kernel_version
869 self.kernel_version_signature = self.manager.kernel_version_signature
870 self.kernel_version_ubuntu = self.manager.kernel_version_ubuntu
871 self.kernel_is_ubuntu = self.manager.kernel_is_ubuntu
872 self.gcc_version = self.manager.gcc_version
873 self.path_libc = self.manager.path_libc
874
875 def version_compare(self, one, two):
876 return apt_pkg.VersionCompare(one,two)
877
878 def assertFileType(self, filename, filetype):
879 '''Checks the file type of the file specified'''
880
881 (rc, report, out) = self._testlib_shell_cmd(["/usr/bin/file", "-b", filename])
882 out = out.strip()
883 expected = 0
884 # Absolutely no idea why this happens on Hardy
885 if self.lsb_release['Release'] == 8.04 and rc == 255 and len(out) > 0:
886 rc = 0
887 result = 'Got exit code %d, expected %d:\n%s\n' % (rc, expected, report)
888 self.assertEquals(expected, rc, result)
889
890 filetype = '^%s$' % (filetype)
891 result = 'File type reported by file: [%s], expected regex: [%s]\n' % (out, filetype)
892 self.assertNotEquals(None, re.search(filetype, out), result)
893
894 def yank_commonname_from_cert(self, certfile):
895 '''Extract the commonName from a given PEM'''
896 rc, out = cmd(['openssl','asn1parse','-in',certfile])
897 if rc == 0:
898 ready = False
899 for line in out.splitlines():
900 if ready:
901 return line.split(':')[-1]
902 if ':commonName' in line:
903 ready = True
904 return socket.getfqdn()
905
906 def announce(self, text):
907 if self.my_verbosity:
908 print >>sys.stdout, "(%s) " % (text),
909 sys.stdout.flush()
910
911 def make_clean(self):
912 rc, output = self.shell_cmd(['make','clean'])
913 self.assertEquals(rc, 0, output)
914
915 def get_makefile_compiler(self):
916 # Find potential compiler name
917 compiler = 'gcc'
918 if os.path.exists('Makefile'):
919 for line in open('Makefile'):
920 if line.startswith('CC') and '=' in line:
921 items = [x.strip() for x in line.split('=')]
922 if items[0] == 'CC':
923 compiler = items[1]
924 break
925 return compiler
926
927 def make_target(self, target, expected=0):
928 '''Compile a target and report output'''
929
930 compiler = self.get_makefile_compiler()
931 rc, output = self.shell_cmd(['make',target])
932 self.assertEquals(rc, expected, 'rc(%d)!=%d:\n' % (rc, expected) + output)
933 self.assertTrue('%s ' % (compiler) in output, 'Expected "%s":' % (compiler) + output)
934 return output
935
936 # call as: return self._skipped("reason")
937 def _skipped(self, reason=""):
938 '''Provide a visible way to indicate that a test was skipped'''
939 if reason != "":
940 reason = ': %s' % (reason)
941 self.announce("skipped%s" % (reason))
942 return False
943
944 def _testlib_shell_cmd(self,args,stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT):
945 argstr = "'" + "', '".join(args).strip() + "'"
946 rc, out = cmd(args,stdin=stdin,stdout=stdout,stderr=stderr)
947 report = 'Command: ' + argstr + '\nOutput:\n' + out
948 return rc, report, out
949
950 def shell_cmd(self, args, stdin=None):
951 return cmd(args,stdin=stdin)
952
953 def assertShellExitEquals(self, expected, args, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, msg=""):
954 '''Test a shell command matches a specific exit code'''
955 rc, report, out = self._testlib_shell_cmd(args, stdin=stdin, stdout=stdout, stderr=stderr)
956 result = 'Got exit code %d, expected %d\n' % (rc, expected)
957 self.assertEquals(expected, rc, msg + result + report)
958
959 def assertShellExitNotEquals(self, unwanted, args, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, msg=""):
960 '''Test a shell command doesn't match a specific exit code'''
961 rc, report, out = self._testlib_shell_cmd(args, stdin=stdin, stdout=stdout, stderr=stderr)
962 result = 'Got (unwanted) exit code %d\n' % rc
963 self.assertNotEquals(unwanted, rc, msg + result + report)
964
965 def assertShellOutputContains(self, text, args, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, msg="", invert=False):
966 '''Test a shell command contains a specific output'''
967 rc, report, out = self._testlib_shell_cmd(args, stdin=stdin, stdout=stdout, stderr=stderr)
968 result = 'Got exit code %d. Looking for text "%s"\n' % (rc, text)
969 if not invert:
970 self.assertTrue(text in out, msg + result + report)
971 else:
972 self.assertFalse(text in out, msg + result + report)
973
974 def assertShellOutputEquals(self, text, args, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, msg="", invert=False, expected=None):
975 '''Test a shell command matches a specific output'''
976 rc, report, out = self._testlib_shell_cmd(args, stdin=stdin, stdout=stdout, stderr=stderr)
977 result = 'Got exit code %d. Looking for exact text "%s" (%s)\n' % (rc, text, " ".join(args))
978 if not invert:
979 self.assertEquals(text, out, msg + result + report)
980 else:
981 self.assertNotEquals(text, out, msg + result + report)
982 if expected != None:
983 result = 'Got exit code %d. Expected %d (%s)\n' % (rc, expected, " ".join(args))
984 self.assertEquals(rc, expected, msg + result + report)
985
986 def _word_find(self, report, content, invert=False):
987 '''Check for a specific string'''
988 if invert:
989 warning = 'Found "%s"\n' % content
990 self.assertTrue(content not in report, warning + report)
991 else:
992 warning = 'Could not find "%s"\n' % content
993 self.assertTrue(content in report, warning + report)
994
995 def _test_sysctl_value(self, path, expected, msg=None, exists=True):
996 sysctl = '/proc/sys/%s' % (path)
997 self.assertEquals(exists, os.path.exists(sysctl), sysctl)
998 value = None
999 if exists:
1000 value = int(file(sysctl).read())
1001 report = "%s is not %d: %d" % (sysctl, expected, value)
1002 if msg:
1003 report += " (%s)" % (msg)
1004 self.assertEquals(value, expected, report)
1005 return value
1006
1007 def set_sysctl_value(self, path, desired):
1008 sysctl = '/proc/sys/%s' % (path)
1009 self.assertTrue(os.path.exists(sysctl),"%s does not exist" % (sysctl))
1010 file(sysctl,'w').write(str(desired))
1011 self._test_sysctl_value(path, desired)
1012
1013 def kernel_at_least(self, introduced):
1014 return self.version_compare(self.kernel_version_ubuntu,
1015 introduced) >= 0
1016
1017 def kernel_claims_cve_fixed(self, cve):
1018 changelog = "/usr/share/doc/linux-image-%s/changelog.Debian.gz" % (self.kernel_version)
1019 if os.path.exists(changelog):
1020 for line in gzip.open(changelog):
1021 if cve in line and not "revert" in line and not "Revert" in line:
1022 return True
1023 return False
1024
1025class TestGroup:
1026 '''Create a temporary test group and remove it again in the dtor.'''
1027
1028 def __init__(self, group=None, lower=False):
1029 '''Create a new group'''
1030
1031 self.group = None
1032 if group:
1033 if group_exists(group):
1034 raise ValueError, 'group name already exists'
1035 else:
1036 while(True):
1037 group = random_string(7,lower=lower)
1038 if not group_exists(group):
1039 break
1040
1041 assert subprocess.call(['groupadd',group]) == 0
1042 self.group = group
1043 g = grp.getgrnam(self.group)
1044 self.gid = g[2]
1045
1046 def __del__(self):
1047 '''Remove the created group.'''
1048
1049 if self.group:
1050 rc, report = cmd(['groupdel', self.group])
1051 assert rc == 0
1052
1053class TestUser:
1054 '''Create a temporary test user and remove it again in the dtor.'''
1055
1056 def __init__(self, login=None, home=True, group=None, uidmin=None, lower=False, shell=None):
1057 '''Create a new user account with a random password.
1058
1059 By default, the login name is random, too, but can be explicitly
1060 specified with 'login'. By default, a home directory is created, this
1061 can be suppressed with 'home=False'.'''
1062
1063 self.login = None
1064
1065 if os.geteuid() != 0:
1066 raise ValueError, "You must be root to run this test"
1067
1068 if login:
1069 if login_exists(login):
1070 raise ValueError, 'login name already exists'
1071 else:
1072 while(True):
1073 login = 't' + random_string(7,lower=lower)
1074 if not login_exists(login):
1075 break
1076
1077 self.salt = random_string(2)
1078 self.password = random_string(8,lower=lower)
1079 self.crypted = crypt.crypt(self.password, self.salt)
1080
1081 creation = ['useradd', '-p', self.crypted]
1082 if home:
1083 creation += ['-m']
1084 if group:
1085 creation += ['-G',group]
1086 if uidmin:
1087 creation += ['-K','UID_MIN=%d'%uidmin]
1088 if shell:
1089 creation += ['-s',shell]
1090 creation += [login]
1091 assert subprocess.call(creation) == 0
1092 # Set GECOS
1093 assert subprocess.call(['usermod','-c','Buddy %s' % (login),login]) == 0
1094
1095 self.login = login
1096 p = pwd.getpwnam(self.login)
1097 self.uid = p[2]
1098 self.gid = p[3]
1099 self.gecos = p[4]
1100 self.home = p[5]
1101 self.shell = p[6]
1102
1103 def __del__(self):
1104 '''Remove the created user account.'''
1105
1106 if self.login:
1107 # sanity check the login name so we don't accidentally wipe too much
1108 if len(self.login)>3 and not '/' in self.login:
1109 subprocess.call(['rm','-rf', '/home/'+self.login, '/var/mail/'+self.login])
1110 rc, report = cmd(['userdel', '-f', self.login])
1111 assert rc == 0
1112
1113 def add_to_group(self, group):
1114 '''Add user to the specified group name'''
1115 rc, report = cmd(['usermod', '-G', group, self.login])
1116 if rc != 0:
1117 print report
1118 assert rc == 0
1119
1120# Timeout handler using alarm() from John P. Speno's Pythonic Avocado
1121class TimeoutFunctionException(Exception):
1122 """Exception to raise on a timeout"""
1123 pass
1124class TimeoutFunction:
1125 def __init__(self, function, timeout):
1126 self.timeout = timeout
1127 self.function = function
1128
1129 def handle_timeout(self, signum, frame):
1130 raise TimeoutFunctionException()
1131
1132 def __call__(self, *args, **kwargs):
1133 old = signal.signal(signal.SIGALRM, self.handle_timeout)
1134 signal.alarm(self.timeout)
1135 try:
1136 result = self.function(*args, **kwargs)
1137 finally:
1138 signal.alarm(0)
1139 signal.signal(signal.SIGALRM, old)
1140 return result
1141
1142def main():
1143 print "hi"
1144 unittest.main()
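
For anyone reusing the timeout helper at the end of testlib.py, here is a minimal Python 3 sketch of the same SIGALRM-based wrapper. It assumes a Unix system and that the call is made from the main thread, since signal handlers can only be installed there. `slow_call` and `run_with_limit` are hypothetical names added for illustration and are not part of the branch. The sketch cancels the pending alarm before restoring the previous handler, so a late alarm cannot reach the old handler.

```python
import signal
import time


class TimeoutFunctionException(Exception):
    """Raised when the wrapped call runs past its time limit."""


class TimeoutFunction:
    """Wrap a callable so it is interrupted after `timeout` whole seconds."""

    def __init__(self, function, timeout):
        self.function = function
        self.timeout = timeout  # signal.alarm() takes an integer number of seconds

    def handle_timeout(self, signum, frame):
        raise TimeoutFunctionException()

    def __call__(self, *args, **kwargs):
        # Install our handler, remembering whatever was there before.
        old = signal.signal(signal.SIGALRM, self.handle_timeout)
        signal.alarm(self.timeout)
        try:
            return self.function(*args, **kwargs)
        finally:
            # Cancel the alarm first, then put the previous handler back.
            signal.alarm(0)
            signal.signal(signal.SIGALRM, old)


def slow_call():
    """Hypothetical stand-in for a test step that hangs."""
    time.sleep(5)
    return "finished"


def run_with_limit(function, seconds):
    """Hypothetical helper: return the result, or 'timed out' on expiry."""
    try:
        return TimeoutFunction(function, seconds)()
    except TimeoutFunctionException:
        return "timed out"
```

In a test, `run_with_limit(slow_call, 1)` would give up after about one second instead of blocking for the full sleep, while a call that returns promptly passes its result through unchanged.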
