Merge lp:~yolanda.robla/ubuntu/saucy/freeradius/dep-8-tests into lp:ubuntu/saucy/freeradius

Proposed by Yolanda Robla
Status: Merged
Merge reported by: James Page
Merged at revision: not available
Proposed branch: lp:~yolanda.robla/ubuntu/saucy/freeradius/dep-8-tests
Merge into: lp:ubuntu/saucy/freeradius
Diff against target: 1372 lines (+1323/-0)
8 files modified
debian/changelog (+6/-0)
debian/control (+1/-0)
debian/tests/clients (+34/-0)
debian/tests/control (+3/-0)
debian/tests/daemon (+13/-0)
debian/tests/freeradius (+6/-0)
debian/tests/test-freeradius.py (+116/-0)
debian/tests/testlib.py (+1144/-0)
To merge this branch: bzr merge lp:~yolanda.robla/ubuntu/saucy/freeradius/dep-8-tests
Reviewer          Review Type   Date Requested   Status
James Page                                       Approve
Ubuntu branches                                  Pending
Review via email: mp+166215@code.launchpad.net

Description of the change

Added autopkgtests
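
The Python suite in this branch edits /etc/freeradius/users before each test and puts the original back afterwards. A minimal Python 3 sketch of that backup-and-restore pattern follows, assuming a throwaway file in a temporary directory rather than the real users file. The names `config_append` and `demo` are illustrative and do not appear in testlib.py, which targets Python 2 and exposes `config_replace` and `config_restore` instead; only the `.autotest` backup suffix is taken from the diff.

```python
import os
import shutil
import tempfile

BACKUP_SUFFIX = ".autotest"  # same backup suffix the diff's testlib.py uses


def config_append(path, extra):
    """Back up `path` once (hypothetical helper), then append `extra` to it."""
    backup = path + BACKUP_SUFFIX
    if os.path.exists(path) and not os.path.exists(backup):
        shutil.copy2(path, backup)
    with open(path, "a") as handle:
        handle.write(extra)


def config_restore(path):
    """Move the backup, if one exists, back over `path`."""
    backup = path + BACKUP_SUFFIX
    if os.path.exists(backup):
        shutil.move(backup, path)


def demo():
    """Run the pattern on a scratch file; return (modified, restored) contents."""
    workdir = tempfile.mkdtemp(prefix="freeradius-sketch-")
    try:
        users = os.path.join(workdir, "users")
        with open(users, "w") as handle:
            handle.write("# original users file\n")

        # Roughly what setUp() does: add a test user to the file.
        config_append(users, 'testuser Cleartext-Password := "testpassword"\n')
        with open(users) as handle:
            modified = handle.read()

        # Roughly what tearDown() does: put the original file back.
        config_restore(users)
        with open(users) as handle:
            restored = handle.read()
        return modified, restored
    finally:
        shutil.rmtree(workdir)
```

Calling `demo()` returns the file contents with the test user appended and then the contents after the restore, so the two can be compared directly.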

James Page (james-page) wrote :

LGTM - uploading.

review: Approve
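
For readers of the diff, the datagram sent by test_cve_2009_3111 can be read as a RADIUS header followed by one attribute. The Python 3 sketch below rebuilds a packet with that layout, assuming the standard RADIUS framing of code, identifier, 16-bit length and 16-byte authenticator; `build_probe` is an illustrative name and is not part of the branch. Attribute type 0x45 is 69, which RFC 2868 assigns to Tunnel-Password, and an attribute length of 2 means the attribute carries no value bytes.

```python
import struct


def build_probe():
    """Build a 22-byte Access-Request with one empty type-69 attribute."""
    code = 1                        # Access-Request
    identifier = 1
    authenticator = b"\x00" * 16    # all-zero request authenticator
    # Attribute header only: type 0x45 (69), length 2, so the value is empty.
    attribute = struct.pack("!BB", 0x45, 2)
    # The length field covers the 4-byte header, authenticator and attributes.
    length = 4 + len(authenticator) + len(attribute)
    header = struct.pack("!BBH", code, identifier, length)
    return header + authenticator + attribute
```

The computed length is 22, which agrees with the 0x16 length field in the literal used by the test.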

Preview Diff

1=== modified file 'debian/changelog'
2--- debian/changelog 2012-12-29 00:54:44 +0000
3+++ debian/changelog 2013-05-29 10:49:29 +0000
4@@ -1,3 +1,9 @@
5+freeradius (2.1.12+dfsg-1.2ubuntu2) saucy; urgency=low
6+
7+ * d/tests: added autopkgtests
8+
9+ -- Yolanda Robla <yolanda.robla@canonical.com> Fri, 24 May 2013 16:06:20 +0200
10+
11 freeradius (2.1.12+dfsg-1.2ubuntu1) raring; urgency=low
12
13 * Fix FTBFS with multiarched python.
14
15=== modified file 'debian/control'
16--- debian/control 2012-12-29 00:54:44 +0000
17+++ debian/control 2013-05-29 10:49:29 +0000
18@@ -25,6 +25,7 @@
19 Uploaders: Stephen Gran <sgran@debian.org>, Mark Hymers <mhy@debian.org>
20 Standards-Version: 3.9.2
21 Homepage: http://www.freeradius.org/
22+XS-Testsuite: autopkgtest
23
24 Package: freeradius
25 Architecture: any
26
27=== added directory 'debian/tests'
28=== added file 'debian/tests/clients'
29--- debian/tests/clients 1970-01-01 00:00:00 +0000
30+++ debian/tests/clients 2013-05-29 10:49:29 +0000
31@@ -0,0 +1,34 @@
32+#!/bin/bash
33+#-------------------------
34+# Testing client utilities
35+#-------------------------
36+set -e
37+
38+HELP_CLIENTS=('radsniff')
39+for client in "${HELP_CLIENTS[@]}"; do
40+ RET=$($client -h 2>&1 > /dev/null)
41+
42+ if [[ $RET ]]; then
43+ echo "ERROR, ${client} is not running"
44+ fi
45+done
46+
47+VERSION_CLIENTS=('radclient' 'radeapclient')
48+for client in "${VERSION_CLIENTS[@]}"; do
49+ RET=$($client -v 2>&1 > /dev/null)
50+
51+ if [[ $RET ]]; then
52+ echo "ERROR, ${client} is not running"
53+ exit $RET
54+ fi
55+done
56+
57+ALONE_CLIENTS=('radlast')
58+for client in "${ALONE_CLIENTS[@]}"; do
59+ RET=$($client 2>&1 > /dev/null)
60+
61+ if [[ $RET ]]; then
62+ echo "ERROR, ${client} is not running"
63+ exit $RET
64+ fi
65+done
66
67=== added file 'debian/tests/control'
68--- debian/tests/control 1970-01-01 00:00:00 +0000
69+++ debian/tests/control 2013-05-29 10:49:29 +0000
70@@ -0,0 +1,3 @@
71+Tests: freeradius daemon clients
72+Depends: freeradius, freeradius-utils, python-unit, lsb-release
73+Restrictions: needs-root
74
75=== added file 'debian/tests/daemon'
76--- debian/tests/daemon 1970-01-01 00:00:00 +0000
77+++ debian/tests/daemon 2013-05-29 10:49:29 +0000
78@@ -0,0 +1,13 @@
79+#!/bin/bash
80+#-------------------
81+# Testing freeradius
82+#-------------------
83+set -e
84+DAEMON=freeradius
85+
86+if pidof -x $DAEMON > /dev/null; then
87+ echo "OK"
88+else
89+ echo "ERROR: ${DAEMON} IS NOT RUNNING"
90+ exit 1
91+fi
92
93=== added file 'debian/tests/freeradius'
94--- debian/tests/freeradius 1970-01-01 00:00:00 +0000
95+++ debian/tests/freeradius 2013-05-29 10:49:29 +0000
96@@ -0,0 +1,6 @@
97+#!/bin/bash
98+#-------------------
99+# Testing freeradius
100+#-------------------
101+set -e
102+python `dirname $0`/test-freeradius.py 2>&1
103
104=== added file 'debian/tests/test-freeradius.py'
105--- debian/tests/test-freeradius.py 1970-01-01 00:00:00 +0000
106+++ debian/tests/test-freeradius.py 2013-05-29 10:49:29 +0000
107@@ -0,0 +1,116 @@
108+#!/usr/bin/python
109+#
110+# test-freeradius.py quality assurance test script for freeradius
111+# Copyright (C) 2009-2012 Canonical Ltd.
112+# Author: Marc Deslauriers <marc.deslauriers@ubuntu.com>
113+#
114+# This program is free software: you can redistribute it and/or modify
115+# it under the terms of the GNU General Public License version 3,
116+# as published by the Free Software Foundation.
117+#
118+# This program is distributed in the hope that it will be useful,
119+# but WITHOUT ANY WARRANTY; without even the implied warranty of
120+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
121+# GNU General Public License for more details.
122+#
123+# You should have received a copy of the GNU General Public License
124+# along with this program. If not, see <http://www.gnu.org/licenses/>.
125+#
126+# packages required for test to run:
127+# QRT-Packages: freeradius python-unit
128+# packages where more than one package can satisfy a runtime requirement:
129+# QRT-Alternates:
130+# files and directories required for the test to run:
131+# QRT-Depends:
132+# QRT-Privilege: root
133+
134+'''
135+ How to run against a clean schroot named 'lucid':
136+ schroot -c lucid -u root -- sh -c 'apt-get -y install python-unit lsb-release freeradius && ./test-freeradius.py -v'
137+
138+'''
139+
140+
141+import unittest, subprocess, sys, tempfile, os, socket, time
142+import testlib
143+
144+try:
145+ from private.qrt.freeradius import PrivateFreeradiusTest
146+except ImportError:
147+ class PrivateFreeradiusTest(object):
148+ '''Empty class'''
149+ print >>sys.stdout, "Skipping private tests"
150+
151+class FreeradiusTest(testlib.TestlibCase, PrivateFreeradiusTest):
152+ '''Test FreeRadius.'''
153+
154+ def setUp(self):
155+ '''Set up prior to each test_* function'''
156+ self.daemon = testlib.TestDaemon("/etc/init.d/freeradius")
157+ self.tmpdir = tempfile.mkdtemp(prefix='freeradius-', dir='/tmp')
158+ self.auth_approved = "code 2"
159+ self.auth_denied = "code 3"
160+
161+ # Add a default user
162+ self.users_file = "/etc/freeradius/users"
163+ self.test_user = "testuser"
164+ self.test_pass = "testpassword"
165+ config_line = '%s Cleartext-Password := "%s"' % (self.test_user, self.test_pass)
166+ testlib.config_replace(self.users_file, config_line, append=True)
167+
168+ rc, result = self.daemon.restart()
169+ self.assertTrue(rc, result)
170+
171+ def tearDown(self):
172+ '''Clean up after each test_* function'''
173+
174+ if os.path.exists(self.tmpdir):
175+ testlib.recursive_rm(self.tmpdir)
176+
177+ testlib.config_restore(self.users_file)
178+
179+ def _test_auth(self, username, password, expected_string, expected_rc=0):
180+ '''Tests authentication'''
181+
182+ handle, tmpname = testlib.mkstemp_fill("User-Name=%s,Password=%s" % (username, password), dir=self.tmpdir)
183+
184+ # can't use radtest as there's no way to set a timeout or number of retries
185+ rc, report = testlib.cmd(['/usr/bin/radclient', '-r', '2', '-f', tmpname, '-s', 'localhost:1812', 'auth', 'testing123'])
186+ result = 'Got exit code %d, expected %d\n' % (rc, expected_rc)
187+ self.assertEquals(expected_rc, rc, result + report)
188+
189+ result = 'Could not find %s in output: %s\n' % (expected_string, report)
190+ self.assertTrue(expected_string in report, result)
191+
192+
193+ def test_valid_user(self):
194+ '''Test a valid user'''
195+
196+ self._test_auth(self.test_user, self.test_pass, self.auth_approved)
197+
198+ def test_invalid_user(self):
199+ '''Test an invalid user'''
200+
201+ self._test_auth('xxubuntuxx', 'xxrocksxx', self.auth_denied, 1)
202+
203+
204+ def test_cve_2009_3111(self):
205+ '''Test CVE-2009-3111'''
206+
207+ # This is same as CVE-2003-0967
208+ # PoC from here: http://marc.info/?l=bugtraq&m=106944220426970
209+
210+ # Send a crafted packet
211+ kaboom = "\x01\x01\x00\x16\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x45\x02"
212+ s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
213+ s.connect(('localhost', 1812))
214+ s.send(kaboom)
215+ s.close()
216+ time.sleep(1)
217+
218+ # See if it still works
219+ self._test_auth(self.test_user, self.test_pass, self.auth_approved)
220+
221+if __name__ == '__main__':
222+ # simple
223+ unittest.main()
224
225=== added file 'debian/tests/testlib.py'
226--- debian/tests/testlib.py 1970-01-01 00:00:00 +0000
227+++ debian/tests/testlib.py 2013-05-29 10:49:29 +0000
228@@ -0,0 +1,1144 @@
229+#
230+# testlib.py quality assurance test script
231+# Copyright (C) 2008-2011 Canonical Ltd.
232+#
233+# This library is free software; you can redistribute it and/or
234+# modify it under the terms of the GNU Library General Public
235+# License as published by the Free Software Foundation; either
236+# version 2 of the License.
237+#
238+# This library is distributed in the hope that it will be useful,
239+# but WITHOUT ANY WARRANTY; without even the implied warranty of
240+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
241+# Library General Public License for more details.
242+#
243+# You should have received a copy of the GNU Library General Public
244+# License along with this program. If not, see
245+# <http://www.gnu.org/licenses/>.
246+#
247+
248+'''Common classes and functions for package tests.'''
249+
250+import string, random, crypt, subprocess, pwd, grp, signal, time, unittest, tempfile, shutil, os, os.path, re, glob
251+import sys, socket, gzip
252+from stat import *
253+from encodings import string_escape
254+
255+import warnings
256+warnings.filterwarnings('ignore', message=r'.*apt_pkg\.TagFile.*', category=DeprecationWarning)
257+try:
258+ import apt_pkg
259+ apt_pkg.InitSystem();
260+except:
261+ # On non-Debian system, fall back to simple comparison without debianisms
262+ class apt_pkg(object):
263+ def VersionCompare(one, two):
264+ list_one = one.split('.')
265+ list_two = two.split('.')
266+ while len(list_one)>0 and len(list_two)>0:
267+ if list_one[0] > list_two[0]:
268+ return 1
269+ if list_one[0] < list_two[0]:
270+ return -1
271+ list_one.pop(0)
272+ list_two.pop(0)
273+ return 0
274+
275+bogus_nxdomain = "208.69.32.132"
276+
277+# http://www.chiark.greenend.org.uk/ucgi/~cjwatson/blosxom/2009-07-02-python-sigpipe.html
278+# This is needed so that the subprocesses that produce endless output
279+# actually quit when the reader goes away.
280+import signal
281+def subprocess_setup():
282+ # Python installs a SIGPIPE handler by default. This is usually not what
283+ # non-Python subprocesses expect.
284+ signal.signal(signal.SIGPIPE, signal.SIG_DFL)
285+
286+class TimedOutException(Exception):
287+ def __init__(self, value = "Timed Out"):
288+ self.value = value
289+ def __str__(self):
290+ return repr(self.value)
291+
292+def _restore_backup(path):
293+ pathbackup = path + '.autotest'
294+ if os.path.exists(pathbackup):
295+ shutil.move(pathbackup, path)
296+
297+def _save_backup(path):
298+ pathbackup = path + '.autotest'
299+ if os.path.exists(path) and not os.path.exists(pathbackup):
300+ shutil.copy2(path, pathbackup)
301+ # copy2 does not copy ownership, so do it here.
302+ # Reference: http://docs.python.org/library/shutil.html
303+ a = os.stat(path)
304+ os.chown(pathbackup, a[4], a[5])
305+
306+def config_copydir(path):
307+ if os.path.exists(path) and not os.path.isdir(path):
308+ raise OSError, "'%s' is not a directory" % (path)
309+ _restore_backup(path)
310+
311+ pathbackup = path + '.autotest'
312+ if os.path.exists(path):
313+ shutil.copytree(path, pathbackup, symlinks=True)
314+
315+def config_replace(path,contents,append=False):
316+ '''Replace (or append) to a config file'''
317+ _restore_backup(path)
318+ if os.path.exists(path):
319+ _save_backup(path)
320+ if append:
321+ contents = file(path).read() + contents
322+ open(path, 'w').write(contents)
323+
324+def config_comment(path, field):
325+ _save_backup(path)
326+ contents = ""
327+ for line in file(path):
328+ if re.search("^\s*%s\s*=" % (field), line):
329+ line = "#" + line
330+ contents += line
331+
332+ open(path+'.new', 'w').write(contents)
333+ os.rename(path+'.new', path)
334+
335+def config_set(path, field, value, spaces=True):
336+ _save_backup(path)
337+ contents = ""
338+ if spaces==True:
339+ setting = '%s = %s\n' % (field, value)
340+ else:
341+ setting = '%s=%s\n' % (field, value)
342+ found = False
343+ for line in file(path):
344+ if re.search("^\s*%s\s*=" % (field), line):
345+ found = True
346+ line = setting
347+ contents += line
348+ if not found:
349+ contents += setting
350+
351+ open(path+'.new', 'w').write(contents)
352+ os.rename(path+'.new', path)
353+
354+def config_patch(path, patch, depth=1):
355+ '''Patch a config file'''
356+ _restore_backup(path)
357+ _save_backup(path)
358+
359+ handle, name = mkstemp_fill(patch)
360+ rc = subprocess.call(['/usr/bin/patch', '-p%s' %(depth), path], stdin=handle, stdout=subprocess.PIPE)
361+ os.unlink(name)
362+ if rc != 0:
363+ raise Exception("Patch failed")
364+
365+def config_restore(path):
366+ '''Rename a replaced config file back to its initial state'''
367+ _restore_backup(path)
368+
369+def timeout(secs, f, *args):
370+ def handler(signum, frame):
371+ raise TimedOutException()
372+
373+ old = signal.signal(signal.SIGALRM, handler)
374+ result = None
375+ signal.alarm(secs)
376+ try:
377+ result = f(*args)
378+ finally:
379+ signal.alarm(0)
380+ signal.signal(signal.SIGALRM, old)
381+
382+ return result
383+
384+def require_nonroot():
385+ if os.geteuid() == 0:
386+ print >>sys.stderr, "This series of tests should be run as a regular user with sudo access, not as root."
387+ sys.exit(1)
388+
389+def require_root():
390+ if os.geteuid() != 0:
391+ print >>sys.stderr, "This series of tests should be run with root privileges (e.g. via sudo)."
392+ sys.exit(1)
393+
394+def require_sudo():
395+ if os.geteuid() != 0 or os.environ.get('SUDO_USER', None) == None:
396+ print >>sys.stderr, "This series of tests must be run under sudo."
397+ sys.exit(1)
398+ if os.environ['SUDO_USER'] == 'root':
399+ print >>sys.stderr, 'Please run this test using sudo from a regular user. (You ran sudo from root.)'
400+ sys.exit(1)
401+
402+def random_string(length,lower=False):
403+ '''Return a random string, consisting of ASCII letters, with given
404+ length.'''
405+
406+ s = ''
407+ selection = string.letters
408+ if lower:
409+ selection = string.lowercase
410+ maxind = len(selection)-1
411+ for l in range(length):
412+ s += selection[random.randint(0, maxind)]
413+ return s
414+
415+def mkstemp_fill(contents,suffix='',prefix='testlib-',dir=None):
416+ '''As tempfile.mkstemp does, return a (file, name) pair, but with
417+ prefilled contents.'''
418+
419+ handle, name = tempfile.mkstemp(suffix=suffix,prefix=prefix,dir=dir)
420+ os.close(handle)
421+ handle = file(name,"w+")
422+ handle.write(contents)
423+ handle.flush()
424+ handle.seek(0)
425+
426+ return handle, name
427+
428+def create_fill(path, contents, mode=0644):
429+ '''Safely create a page'''
430+ # make the temp file in the same dir as the destination file so we
431+ # don't get invalid cross-device link errors when we rename
432+ handle, name = mkstemp_fill(contents, dir=os.path.dirname(path))
433+ handle.close()
434+ os.rename(name, path)
435+ os.chmod(path, mode)
436+
437+def login_exists(login):
438+ '''Checks whether the given login exists on the system.'''
439+
440+ try:
441+ pwd.getpwnam(login)
442+ return True
443+ except KeyError:
444+ return False
445+
446+def group_exists(group):
447+ '''Checks whether the given group exists on the system.'''
448+
449+ try:
450+ grp.getgrnam(group)
451+ return True
452+ except KeyError:
453+ return False
454+
455+def recursive_rm(dirPath, contents_only=False):
456+ '''recursively remove directory'''
457+ names = os.listdir(dirPath)
458+ for name in names:
459+ path = os.path.join(dirPath, name)
460+ if os.path.islink(path) or not os.path.isdir(path):
461+ os.unlink(path)
462+ else:
463+ recursive_rm(path)
464+ if contents_only == False:
465+ os.rmdir(dirPath)
466+
467+def check_pidfile(exe, pidfile):
468+ '''Checks if pid in pidfile is running'''
469+ if not os.path.exists(pidfile):
470+ return False
471+
472+ # get the pid
473+ try:
474+ fd = open(pidfile, 'r')
475+ pid = fd.readline().rstrip('\n')
476+ fd.close()
477+ except:
478+ return False
479+
480+ return check_pid(exe, pid)
481+
482+def check_pid(exe, pid):
483+ '''Checks if pid is running'''
484+ cmdline = "/proc/%s/cmdline" % (str(pid))
485+ if not os.path.exists(cmdline):
486+ return False
487+
488+ # get the command line
489+ try:
490+ fd = open(cmdline, 'r')
491+ tmp = fd.readline().split('\0')
492+ fd.close()
493+ except:
494+ return False
495+
496+ # this allows us to match absolute paths or just the executable name
497+ if re.match('^' + exe + '$', tmp[0]) or \
498+ re.match('.*/' + exe + '$', tmp[0]) or \
499+ re.match('^' + exe + ': ', tmp[0]) or \
500+ re.match('^\(' + exe + '\)', tmp[0]):
501+ return True
502+
503+ return False
504+
505+def check_port(port, proto, ver=4):
506+ '''Check if something is listening on the specified port.
507+ WARNING: for some reason this does not work with a bind mounted /proc
508+ '''
509+ assert (port >= 1)
510+ assert (port <= 65535)
511+ assert (proto.lower() == "tcp" or proto.lower() == "udp")
512+ assert (ver == 4 or ver == 6)
513+
514+ fn = "/proc/net/%s" % (proto)
515+ if ver == 6:
516+ fn += str(ver)
517+
518+ rc, report = cmd(['cat', fn])
519+ assert (rc == 0)
520+
521+ hport = "%0.4x" % port
522+
523+ if re.search(': [0-9a-f]{8}:%s [0-9a-f]' % str(hport).lower(), report.lower()):
524+ return True
525+ return False
526+
527+def get_arch():
528+ '''Get the current architecture'''
529+ rc, report = cmd(['uname', '-m'])
530+ assert (rc == 0)
531+ return report.strip()
532+
533+def get_memory():
534+ '''Gets total ram and swap'''
535+ meminfo = "/proc/meminfo"
536+ memtotal = 0
537+ swaptotal = 0
538+ if not os.path.exists(meminfo):
539+ return (False, False)
540+
541+ try:
542+ fd = open(meminfo, 'r')
543+ for line in fd.readlines():
544+ splitline = line.split()
545+ if splitline[0] == 'MemTotal:':
546+ memtotal = int(splitline[1])
547+ elif splitline[0] == 'SwapTotal:':
548+ swaptotal = int(splitline[1])
549+ fd.close()
550+ except:
551+ return (False, False)
552+
553+ return (memtotal,swaptotal)
554+
555+def is_running_in_vm():
556+ '''Check if running under a VM'''
557+ # add other virtualization environments here
558+ for search in ['QEMU Virtual CPU']:
559+ rc, report = cmd_pipe(['dmesg'], ['grep', search])
560+ if rc == 0:
561+ return True
562+ return False
563+
564+def ubuntu_release():
565+ '''Get the Ubuntu release'''
566+ f = "/etc/lsb-release"
567+ try:
568+ size = os.stat(f)[ST_SIZE]
569+ except:
570+ return "UNKNOWN"
571+
572+ if size > 1024*1024:
573+ raise IOError, 'Could not open "%s" (too big)' % f
574+
575+ try:
576+ fh = open("/etc/lsb-release", 'r')
577+ except:
578+ raise
579+
580+ lines = fh.readlines()
581+ fh.close()
582+
583+ pat = re.compile(r'DISTRIB_CODENAME')
584+ for line in lines:
585+ if pat.search(line):
586+ return line.split('=')[1].rstrip('\n').rstrip('\r')
587+
588+ return "UNKNOWN"
589+
590+def cmd(command, input = None, stderr = subprocess.STDOUT, stdout = subprocess.PIPE, stdin = None, timeout = None):
591+ '''Try to execute given command (array) and return its stdout, or return
592+ a textual error if it failed.'''
593+
594+ try:
595+ sp = subprocess.Popen(command, stdin=stdin, stdout=stdout, stderr=stderr, close_fds=True, preexec_fn=subprocess_setup)
596+ except OSError, e:
597+ return [127, str(e)]
598+
599+ out, outerr = sp.communicate(input)
600+ # Handle redirection of stdout
601+ if out == None:
602+ out = ''
603+ # Handle redirection of stderr
604+ if outerr == None:
605+ outerr = ''
606+ return [sp.returncode,out+outerr]
607+
608+def cmd_pipe(command1, command2, input = None, stderr = subprocess.STDOUT, stdin = None):
609+ '''Try to pipe command1 into command2.'''
610+ try:
611+ sp1 = subprocess.Popen(command1, stdin=stdin, stdout=subprocess.PIPE, stderr=stderr, close_fds=True)
612+ sp2 = subprocess.Popen(command2, stdin=sp1.stdout, stdout=subprocess.PIPE, stderr=stderr, close_fds=True)
613+ except OSError, e:
614+ return [127, str(e)]
615+
616+ out = sp2.communicate(input)[0]
617+ return [sp2.returncode,out]
618+
619+def cwd_has_enough_space(cdir, total_bytes):
620+ '''Determine if the partition of the current working directory has
621+ 'total_bytes' bytes free.'''
622+ rc, df_output = cmd(['df'])
623+ result = 'Got exit code %d, expected %d\n' % (rc, 0)
624+ if rc != 0:
625+ return False
626+
627+ kb = total_bytes / 1024
628+
629+ mounts = dict()
630+ for line in df_output.splitlines():
631+ if '/' not in line:
632+ continue
633+ tmp = line.split()
634+ mounts[tmp[5]] = int(tmp[3])
635+
636+ cdir = os.getcwd()
637+ while cdir != '/':
638+ if not mounts.has_key(cdir):
639+ cdir = os.path.dirname(cdir)
640+ continue
641+ if kb < mounts[cdir]:
642+ return True
643+ else:
644+ return False
645+
646+ if kb < mounts['/']:
647+ return True
648+
649+ return False
650+
651+def get_md5(filename):
652+ '''Gets the md5sum of the file specified'''
653+
654+ (rc, report) = cmd(["/usr/bin/md5sum", "-b", filename])
655+ expected = 0
656+ assert (expected == rc)
657+
658+ return report.split(' ')[0]
659+
660+def dpkg_compare_installed_version(pkg, check, version):
661+ '''Gets the version for the installed package, and compares it to the
662+ specified version.
663+ '''
664+ (rc, report) = cmd(["/usr/bin/dpkg", "-s", pkg])
665+ assert (rc == 0)
666+ assert ("Status: install ok installed" in report)
667+ installed_version = ""
668+ for line in report.splitlines():
669+ if line.startswith("Version: "):
670+ installed_version = line.split()[1]
671+
672+ assert (installed_version != "")
673+
674+ (rc, report) = cmd(["/usr/bin/dpkg", "--compare-versions", installed_version, check, version])
675+ assert (rc == 0 or rc == 1)
676+ if rc == 0:
677+ return True
678+ return False
679+
680+def prepare_source(source, builder, cached_src, build_src, patch_system):
681+ '''Download and unpack source package, installing necessary build depends,
682+ adjusting the permissions for the 'builder' user, and returning the
683+ directory of the unpacked source. Patch system can be one of:
684+ - cdbs
685+ - dpatch
686+ - quilt
687+ - quiltv3
688+ - None (not the string)
689+
690+ This is normally used like this:
691+
692+ def setUp(self):
693+ ...
694+ self.topdir = os.getcwd()
695+ self.cached_src = os.path.join(os.getcwd(), "source")
696+ self.tmpdir = tempfile.mkdtemp(prefix='testlib', dir='/tmp')
697+ self.builder = testlib.TestUser()
698+ testlib.cmd(['chgrp', self.builder.login, self.tmpdir])
699+ os.chmod(self.tmpdir, 0775)
700+
701+ def tearDown(self):
702+ ...
703+ self.builder = None
704+ self.topdir = os.getcwd()
705+ if os.path.exists(self.tmpdir):
706+ testlib.recursive_rm(self.tmpdir)
707+
708+ def test_suite_build(self):
709+ ...
710+ build_dir = testlib.prepare_source('foo', \
711+ self.builder, \
712+ self.cached_src, \
713+ os.path.join(self.tmpdir, \
714+ os.path.basename(self.cached_src)),
715+ "quilt")
716+ os.chdir(build_dir)
717+
718+ # Example for typical build, adjust as necessary
719+ print ""
720+ print " make clean"
721+ rc, report = testlib.cmd(['sudo', '-u', self.builder.login, 'make', 'clean'])
722+
723+ print " configure"
724+ rc, report = testlib.cmd(['sudo', '-u', self.builder.login, './configure', '--prefix=%s' % self.tmpdir, '--enable-debug'])
725+
726+ print " make (will take a while)"
727+ rc, report = testlib.cmd(['sudo', '-u', self.builder.login, 'make'])
728+
729+ print " make check (will take a while)",
730+ rc, report = testlib.cmd(['sudo', '-u', self.builder.login, 'make', 'check'])
731+ expected = 0
732+ result = 'Got exit code %d, expected %d\n' % (rc, expected)
733+ self.assertEquals(expected, rc, result + report)
734+
735+ def test_suite_cleanup(self):
736+ ...
737+ if os.path.exists(self.cached_src):
738+ testlib.recursive_rm(self.cached_src)
739+
740+ It is up to the caller to clean up cached_src and build_src. As in the
741+ above example, the build_src is often in a tmpdir that is cleaned in
742+ tearDown(), and the cached_src is cleaned in a one-time clean-up
743+ operation (e.g. 'test_suite_cleanup()') which must be run after the build
744+ suite test.
745+ '''
746+
747+ # Make sure we have a clean slate
748+ assert (os.path.exists(os.path.dirname(build_src)))
749+ assert (not os.path.exists(build_src))
750+
751+ cdir = os.getcwd()
752+ if os.path.exists(cached_src):
753+ shutil.copytree(cached_src, build_src)
754+ os.chdir(build_src)
755+ else:
756+ # Only install the build dependencies on the initial setup
757+ rc, report = cmd(['apt-get','-y','--force-yes','build-dep',source])
758+ assert (rc == 0)
759+
760+ os.makedirs(build_src)
761+ os.chdir(build_src)
762+
763+ # These are always needed
764+ pkgs = ['build-essential', 'dpkg-dev', 'fakeroot']
765+ rc, report = cmd(['apt-get','-y','--force-yes','install'] + pkgs)
766+ assert (rc == 0)
767+
768+ rc, report = cmd(['apt-get','source',source])
769+ assert (rc == 0)
770+ shutil.copytree(build_src, cached_src)
771+
772+ unpacked_dir = os.path.join(build_src, glob.glob('%s-*' % source)[0])
773+
774+ # Now apply the patches. Do it here so that we don't mess up our cached
775+ # sources.
776+ os.chdir(unpacked_dir)
777+ assert (patch_system in ['cdbs', 'dpatch', 'quilt', 'quiltv3', None])
778+ if patch_system != None and patch_system != "quiltv3":
779+ if patch_system == "quilt":
780+ os.environ.setdefault('QUILT_PATCHES','debian/patches')
781+ rc, report = cmd(['quilt', 'push', '-a'])
782+ assert (rc == 0)
783+ elif patch_system == "cdbs":
784+ rc, report = cmd(['./debian/rules', 'apply-patches'])
785+ assert (rc == 0)
786+ elif patch_system == "dpatch":
787+ rc, report = cmd(['dpatch', 'apply-all'])
788+ assert (rc == 0)
789+
790+ cmd(['chown', '-R', '%s:%s' % (builder.uid, builder.gid), build_src])
791+ os.chdir(cdir)
792+
793+ return unpacked_dir
794+
795+def _aa_status():
796+ '''Get aa-status output'''
797+ exe = "/usr/sbin/aa-status"
798+ assert (os.path.exists(exe))
799+ if os.geteuid() == 0:
800+ return cmd([exe])
801+ return cmd(['sudo', exe])
802+
803+def is_apparmor_loaded(path):
804+ '''Check if profile is loaded'''
805+ rc, report = _aa_status()
806+ if rc != 0:
807+ return False
808+
809+ for line in report.splitlines():
810+ if line.endswith(path):
811+ return True
812+ return False
813+
814+def is_apparmor_confined(path):
815+ '''Check if application is confined'''
816+ rc, report = _aa_status()
817+ if rc != 0:
818+ return False
819+
820+ for line in report.splitlines():
821+ if re.search('%s \(' % path, line):
822+ return True
823+ return False
824+
825+def check_apparmor(path, first_ubuntu_release, is_running=True):
826+ '''Check if path is loaded and confined for everything higher than the
827+ first Ubuntu release specified.
828+
829+ Usage:
830+ rc, report = testlib.check_apparmor('/usr/sbin/foo', 8.04, is_running=True)
831+ if rc < 0:
832+ return self._skipped(report)
833+
834+ expected = 0
835+ result = 'Got exit code %d, expected %d\n' % (rc, expected)
836+ self.assertEquals(expected, rc, result + report)
837+ '''
838+ global manager
839+ rc = -1
840+
841+ if manager.lsb_release["Release"] < first_ubuntu_release:
842+ return (rc, "Skipped apparmor check")
843+
844+ if not os.path.exists('/sbin/apparmor_parser'):
845+ return (rc, "Skipped (couldn't find apparmor_parser)")
846+
847+ rc = 0
848+ msg = ""
849+ if not is_apparmor_loaded(path):
850+ rc = 1
851+ msg = "Profile not loaded for '%s'" % path
852+
853+ # this check only makes sense if the 'path' is currently executing
854+ if is_running and rc == 0 and not is_apparmor_confined(path):
855+ rc = 1
856+ msg = "'%s' is not running in enforce mode" % path
857+
858+ return (rc, msg)
859+
860+def get_gcc_version(gcc, full=True):
861+ gcc_version = 'none'
862+ if not gcc.startswith('/'):
863+ gcc = '/usr/bin/%s' % (gcc)
864+ if os.path.exists(gcc):
865+ gcc_version = 'unknown'
866+ lines = cmd([gcc,'-v'])[1].strip().splitlines()
867+ version_lines = [x for x in lines if x.startswith('gcc version')]
868+ if len(version_lines) == 1:
869+ gcc_version = " ".join(version_lines[0].split()[2:])
870+ if not full:
871+ return gcc_version.split()[0]
872+ return gcc_version
873+
874+def is_kdeinit_running():
875+ '''Test if kdeinit is running'''
876+ # applications that use kdeinit will spawn it if it isn't running in the
877+ # test. This is a problem because it does not exit. This is a helper to
878+ # check for it.
879+ rc, report = cmd(['ps', 'x'])
880+ if 'kdeinit4 Running' not in report:
881+ print >>sys.stderr, ("kdeinit not running (you may start/stop any KDE application then run this script again)")
882+ return False
883+ return True
884+
885+def get_pkgconfig_flags(libs=[]):
886+ '''Find pkg-config flags for libraries'''
887+ assert (len(libs) > 0)
888+ rc, pkg_config = cmd(['pkg-config', '--cflags', '--libs'] + libs)
889+ expected = 0
890+ if rc != expected:
891+ print >>sys.stderr, 'Got exit code %d, expected %d\n' % (rc, expected)
892+ assert(rc == expected)
893+ return pkg_config.split()
894+
895+class TestDaemon:
896+ '''Helper class to manage daemons consistently'''
897+ def __init__(self, init):
898+ '''Setup daemon attributes'''
899+ self.initscript = init
900+
901+ def start(self):
902+ '''Start daemon'''
903+ rc, report = cmd([self.initscript, 'start'])
904+ expected = 0
905+ result = 'Got exit code %d, expected %d\n' % (rc, expected)
906+ time.sleep(2)
907+ if expected != rc:
908+ return (False, result + report)
909+
910+ if "fail" in report:
911+ return (False, "Found 'fail' in report\n" + report)
912+
913+ return (True, "")
914+
915+ def stop(self):
916+ '''Stop daemon'''
917+ rc, report = cmd([self.initscript, 'stop'])
918+ expected = 0
919+ result = 'Got exit code %d, expected %d\n' % (rc, expected)
920+ if expected != rc:
921+ return (False, result + report)
922+
923+ if "fail" in report:
924+ return (False, "Found 'fail' in report\n" + report)
925+
926+ return (True, "")
927+
928+ def reload(self):
929+ '''Reload daemon'''
930+ rc, report = cmd([self.initscript, 'force-reload'])
931+ expected = 0
932+ result = 'Got exit code %d, expected %d\n' % (rc, expected)
933+ if expected != rc:
934+ return (False, result + report)
935+
936+ if "fail" in report:
937+ return (False, "Found 'fail' in report\n" + report)
938+
939+ return (True, "")
940+
941+ def restart(self):
942+ '''Restart daemon'''
943+ (res, str) = self.stop()
944+ if not res:
945+ return (res, str)
946+
947+ (res, str) = self.start()
948+ if not res:
949+ return (res, str)
950+
951+ return (True, "")
952+
953+ def status(self):
954+ '''Check daemon status'''
955+ rc, report = cmd([self.initscript, 'status'])
956+ expected = 0
957+ result = 'Got exit code %d, expected %d\n' % (rc, expected)
958+ if expected != rc:
959+ return (False, result + report)
960+
961+ if "fail" in report:
962+ return (False, "Found 'fail' in report\n" + report)
963+
964+ return (True, "")
965+
966+class TestlibManager(object):
967+ '''Singleton class used to set up per-test-run information'''
968+ def __init__(self):
969+ # Set glibc aborts to dump to stderr instead of the tty so test output
970+ # is more sane.
971+ os.environ.setdefault('LIBC_FATAL_STDERR_','1')
972+
973+ # check verbosity
974+ self.verbosity = False
975+ if (len(sys.argv) > 1 and '-v' in sys.argv[1:]):
976+ self.verbosity = True
977+
978+ # Load LSB release file
979+ self.lsb_release = dict()
980+ if not os.path.exists('/usr/bin/lsb_release') and not os.path.exists('/bin/lsb_release'):
981+ raise OSError, "Please install 'lsb-release'"
982+ for line in subprocess.Popen(['lsb_release','-a'],stdout=subprocess.PIPE,stderr=subprocess.PIPE).communicate()[0].splitlines():
983+ field, value = line.split(':',1)
984+ value=value.strip()
985+ field=field.strip()
986+ # Convert numerics
987+ try:
988+ value = float(value)
989+ except:
990+ pass
991+ self.lsb_release.setdefault(field,value)
992+
993+ # FIXME: hack OEM releases into known-Ubuntu versions
994+ if self.lsb_release['Distributor ID'] == "HP MIE (Mobile Internet Experience)":
995+ if self.lsb_release['Release'] == 1.0:
996+ self.lsb_release['Distributor ID'] = "Ubuntu"
997+ self.lsb_release['Release'] = 8.04
998+ else:
999+ raise OSError, "Unknown version of HP MIE"
1000+
1001+ # FIXME: hack to assume a most-recent release if we're not
1002+ # running under Ubuntu.
1003+ if self.lsb_release['Distributor ID'] not in ["Ubuntu","Linaro"]:
1004+ self.lsb_release['Release'] = 10000
1005+ # Adjust Linaro release to pretend to be Ubuntu
1006+ if self.lsb_release['Distributor ID'] in ["Linaro"]:
1007+ self.lsb_release['Distributor ID'] = "Ubuntu"
1008+ self.lsb_release['Release'] -= 0.01
1009+
1010+ # Load arch
1011+ if not os.path.exists('/usr/bin/dpkg'):
1012+ machine = cmd(['uname','-m'])[1].strip()
1013+ if machine.endswith('86'):
1014+ self.dpkg_arch = 'i386'
1015+ elif machine.endswith('_64'):
1016+ self.dpkg_arch = 'amd64'
1017+ elif machine.startswith('arm'):
1018+ self.dpkg_arch = 'armel'
1019+ else:
1020+ raise ValueError, "Unknown machine type '%s'" % (machine)
1021+ else:
1022+ self.dpkg_arch = cmd(['dpkg','--print-architecture'])[1].strip()
1023+
1024+ # Find kernel version
1025+ self.kernel_is_ubuntu = False
1026+ self.kernel_version_signature = None
1027+ self.kernel_version = cmd(["uname","-r"])[1].strip()
1028+ versig = '/proc/version_signature'
1029+ if os.path.exists(versig):
1030+ self.kernel_is_ubuntu = True
1031+ self.kernel_version_signature = file(versig).read().strip()
1032+ self.kernel_version_ubuntu = self.kernel_version
1033+ elif os.path.exists('/usr/bin/dpkg'):
1034+ # this can easily be inaccurate but is only an issue for Dapper
1035+ rc, out = cmd(['dpkg','-l','linux-image-%s' % (self.kernel_version)])
1036+ if rc == 0:
1037+ self.kernel_version_signature = out.strip().split('\n').pop().split()[2]
1038+ self.kernel_version_ubuntu = self.kernel_version_signature
1039+ if self.kernel_version_signature == None:
1040+ # Attempt to fall back to something for non-Debian-based
1041+ self.kernel_version_signature = self.kernel_version
1042+ self.kernel_version_ubuntu = self.kernel_version
1043+ # Build ubuntu version without hardware suffix
1044+ try:
1045+ self.kernel_version_ubuntu = "-".join([x for x in self.kernel_version_signature.split(' ')[1].split('-') if re.search('^[0-9]', x)])
1046+ except:
1047+ pass
1048+
1049+ # Find gcc version
1050+ self.gcc_version = get_gcc_version('gcc')
1051+
1052+ # Find libc
1053+ self.path_libc = [x.split()[2] for x in cmd(['ldd','/bin/ls'])[1].splitlines() if x.startswith('\tlibc.so.')][0]
1054+
1055+ # Report self
1056+ if self.verbosity:
1057+ kernel = self.kernel_version_ubuntu
1058+ if kernel != self.kernel_version_signature:
1059+ kernel += " (%s)" % (self.kernel_version_signature)
1060+ print >>sys.stdout, "Running test: '%s' distro: '%s %.2f' kernel: '%s' arch: '%s' uid: %d/%d SUDO_USER: '%s'" % ( \
1061+ sys.argv[0],
1062+ self.lsb_release['Distributor ID'],
1063+ self.lsb_release['Release'],
1064+ kernel,
1065+ self.dpkg_arch,
1066+ os.geteuid(), os.getuid(),
1067+ os.environ.get('SUDO_USER', ''))
1068+ sys.stdout.flush()
1069+
1070+ # Additional heuristics
1071+ #if os.environ.get('SUDO_USER', os.environ.get('USER', '')) in ['mdeslaur']:
1072+ # sys.stdout.write("Replying to Marc Deslauriers in http://launchpad.net/bugs/%d: " % random.randint(600000, 980000))
1073+ # sys.stdout.flush()
1074+ # time.sleep(0.5)
1075+ # sys.stdout.write("destroyed\n")
1076+ # time.sleep(0.5)
1077+
1078+ def hello(self, msg):
1079+ print >>sys.stderr, "Hello from %s" % (msg)
1080+# The central instance
1081+manager = TestlibManager()
1082+
1083+class TestlibCase(unittest.TestCase):
1084+ def __init__(self, *args):
1085+ '''This is called for each TestCase test instance, which isn't much better
1086+ than setUp.'''
1087+
1088+ unittest.TestCase.__init__(self, *args)
1089+
1090+ # Attach to and duplicate dicts from manager singleton
1091+ self.manager = manager
1092+ #self.manager.hello(repr(self) + repr(*args))
1093+ self.my_verbosity = self.manager.verbosity
1094+ self.lsb_release = self.manager.lsb_release
1095+ self.dpkg_arch = self.manager.dpkg_arch
1096+ self.kernel_version = self.manager.kernel_version
1097+ self.kernel_version_signature = self.manager.kernel_version_signature
1098+ self.kernel_version_ubuntu = self.manager.kernel_version_ubuntu
1099+ self.kernel_is_ubuntu = self.manager.kernel_is_ubuntu
1100+ self.gcc_version = self.manager.gcc_version
1101+ self.path_libc = self.manager.path_libc
1102+
1103+ def version_compare(self, one, two):
1104+ return apt_pkg.VersionCompare(one,two)
1105+
1106+ def assertFileType(self, filename, filetype):
1107+ '''Checks the file type of the file specified'''
1108+
1109+ (rc, report, out) = self._testlib_shell_cmd(["/usr/bin/file", "-b", filename])
1110+ out = out.strip()
1111+ expected = 0
1112+ # Absolutely no idea why this happens on Hardy
1113+ if self.lsb_release['Release'] == 8.04 and rc == 255 and len(out) > 0:
1114+ rc = 0
1115+ result = 'Got exit code %d, expected %d:\n%s\n' % (rc, expected, report)
1116+ self.assertEquals(expected, rc, result)
1117+
1118+ filetype = '^%s$' % (filetype)
1119+ result = 'File type reported by file: [%s], expected regex: [%s]\n' % (out, filetype)
1120+ self.assertNotEquals(None, re.search(filetype, out), result)
1121+
1122+ def yank_commonname_from_cert(self, certfile):
1123+ '''Extract the commonName from a given PEM'''
1124+ rc, out = cmd(['openssl','asn1parse','-in',certfile])
1125+ if rc == 0:
1126+ ready = False
1127+ for line in out.splitlines():
1128+ if ready:
1129+ return line.split(':')[-1]
1130+ if ':commonName' in line:
1131+ ready = True
1132+ return socket.getfqdn()
1133+
1134+ def announce(self, text):
1135+ if self.my_verbosity:
1136+ print >>sys.stdout, "(%s) " % (text),
1137+ sys.stdout.flush()
1138+
1139+ def make_clean(self):
1140+ rc, output = self.shell_cmd(['make','clean'])
1141+ self.assertEquals(rc, 0, output)
1142+
1143+ def get_makefile_compiler(self):
1144+ # Find potential compiler name
1145+ compiler = 'gcc'
1146+ if os.path.exists('Makefile'):
1147+ for line in open('Makefile'):
1148+ if line.startswith('CC') and '=' in line:
1149+ items = [x.strip() for x in line.split('=')]
1150+ if items[0] == 'CC':
1151+ compiler = items[1]
1152+ break
1153+ return compiler
1154+
1155+ def make_target(self, target, expected=0):
1156+ '''Compile a target and report output'''
1157+
1158+ compiler = self.get_makefile_compiler()
1159+ rc, output = self.shell_cmd(['make',target])
1160+ self.assertEquals(rc, expected, 'rc(%d)!=%d:\n' % (rc, expected) + output)
1161+ self.assertTrue('%s ' % (compiler) in output, 'Expected "%s":' % (compiler) + output)
1162+ return output
1163+
1164+ # call as return testlib.skipped()
1165+ def _skipped(self, reason=""):
1166+ '''Provide a visible way to indicate that a test was skipped'''
1167+ if reason != "":
1168+ reason = ': %s' % (reason)
1169+ self.announce("skipped%s" % (reason))
1170+ return False
1171+
1172+ def _testlib_shell_cmd(self,args,stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT):
1173+ argstr = "'" + "', '".join(args).strip() + "'"
1174+ rc, out = cmd(args,stdin=stdin,stdout=stdout,stderr=stderr)
1175+ report = 'Command: ' + argstr + '\nOutput:\n' + out
1176+ return rc, report, out
1177+
1178+ def shell_cmd(self, args, stdin=None):
1179+ return cmd(args,stdin=stdin)
1180+
1181+ def assertShellExitEquals(self, expected, args, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, msg=""):
1182+ '''Test a shell command matches a specific exit code'''
1183+ rc, report, out = self._testlib_shell_cmd(args, stdin=stdin, stdout=stdout, stderr=stderr)
1184+ result = 'Got exit code %d, expected %d\n' % (rc, expected)
1185+ self.assertEquals(expected, rc, msg + result + report)
1186+
1187+ def assertShellExitNotEquals(self, unwanted, args, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, msg=""):
1188+ '''Test a shell command doesn't match a specific exit code'''
1189+ rc, report, out = self._testlib_shell_cmd(args, stdin=stdin, stdout=stdout, stderr=stderr)
1190+ result = 'Got (unwanted) exit code %d\n' % rc
1191+ self.assertNotEquals(unwanted, rc, msg + result + report)
1192+
1193+ def assertShellOutputContains(self, text, args, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, msg="", invert=False):
1194+ '''Test a shell command contains a specific output'''
1195+ rc, report, out = self._testlib_shell_cmd(args, stdin=stdin, stdout=stdout, stderr=stderr)
1196+ result = 'Got exit code %d. Looking for text "%s"\n' % (rc, text)
1197+ if not invert:
1198+ self.assertTrue(text in out, msg + result + report)
1199+ else:
1200+ self.assertFalse(text in out, msg + result + report)
1201+
1202+ def assertShellOutputEquals(self, text, args, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, msg="", invert=False, expected=None):
1203+ '''Test a shell command matches a specific output'''
1204+ rc, report, out = self._testlib_shell_cmd(args, stdin=stdin, stdout=stdout, stderr=stderr)
1205+ result = 'Got exit code %d. Looking for exact text "%s" (%s)\n' % (rc, text, " ".join(args))
1206+ if not invert:
1207+ self.assertEquals(text, out, msg + result + report)
1208+ else:
1209+ self.assertNotEquals(text, out, msg + result + report)
1210+ if expected != None:
1211+ result = 'Got exit code %d. Expected %d (%s)\n' % (rc, expected, " ".join(args))
1212+ self.assertEquals(rc, expected, msg + result + report)
1213+
1214+ def _word_find(self, report, content, invert=False):
1215+ '''Check for a specific string'''
1216+ if invert:
1217+ warning = 'Found "%s"\n' % content
1218+ self.assertTrue(content not in report, warning + report)
1219+ else:
1220+ warning = 'Could not find "%s"\n' % content
1221+ self.assertTrue(content in report, warning + report)
1222+
1223+ def _test_sysctl_value(self, path, expected, msg=None, exists=True):
1224+ sysctl = '/proc/sys/%s' % (path)
1225+ self.assertEquals(exists, os.path.exists(sysctl), sysctl)
1226+ value = None
1227+ if exists:
1228+ value = int(file(sysctl).read())
1229+ report = "%s is not %d: %d" % (sysctl, expected, value)
1230+ if msg:
1231+ report += " (%s)" % (msg)
1232+ self.assertEquals(value, expected, report)
1233+ return value
1234+
1235+ def set_sysctl_value(self, path, desired):
1236+ sysctl = '/proc/sys/%s' % (path)
1237+ self.assertTrue(os.path.exists(sysctl),"%s does not exist" % (sysctl))
1238+ file(sysctl,'w').write(str(desired))
1239+ self._test_sysctl_value(path, desired)
1240+
1241+ def kernel_at_least(self, introduced):
1242+ return self.version_compare(self.kernel_version_ubuntu,
1243+ introduced) >= 0
1244+
1245+ def kernel_claims_cve_fixed(self, cve):
1246+ changelog = "/usr/share/doc/linux-image-%s/changelog.Debian.gz" % (self.kernel_version)
1247+ if os.path.exists(changelog):
1248+ for line in gzip.open(changelog):
1249+ if cve in line and not "revert" in line and not "Revert" in line:
1250+ return True
1251+ return False
1252+
1253+class TestGroup:
1254+ '''Create a temporary test group and remove it again in the dtor.'''
1255+
1256+ def __init__(self, group=None, lower=False):
1257+ '''Create a new group'''
1258+
1259+ self.group = None
1260+ if group:
1261+ if group_exists(group):
1262+ raise ValueError, 'group name already exists'
1263+ else:
1264+ while(True):
1265+ group = random_string(7,lower=lower)
1266+ if not group_exists(group):
1267+ break
1268+
1269+ assert subprocess.call(['groupadd',group]) == 0
1270+ self.group = group
1271+ g = grp.getgrnam(self.group)
1272+ self.gid = g[2]
1273+
1274+ def __del__(self):
1275+ '''Remove the created group.'''
1276+
1277+ if self.group:
1278+ rc, report = cmd(['groupdel', self.group])
1279+ assert rc == 0
1280+
1281+class TestUser:
1282+ '''Create a temporary test user and remove it again in the dtor.'''
1283+
1284+ def __init__(self, login=None, home=True, group=None, uidmin=None, lower=False, shell=None):
1285+ '''Create a new user account with a random password.
1286+
1287+ By default, the login name is random, too, but can be explicitly
1288+ specified with 'login'. By default, a home directory is created; this
1289+ can be suppressed with 'home=False'.'''
1290+
1291+ self.login = None
1292+
1293+ if os.geteuid() != 0:
1294+ raise ValueError, "You must be root to run this test"
1295+
1296+ if login:
1297+ if login_exists(login):
1298+ raise ValueError, 'login name already exists'
1299+ else:
1300+ while(True):
1301+ login = 't' + random_string(7,lower=lower)
1302+ if not login_exists(login):
1303+ break
1304+
1305+ self.salt = random_string(2)
1306+ self.password = random_string(8,lower=lower)
1307+ self.crypted = crypt.crypt(self.password, self.salt)
1308+
1309+ creation = ['useradd', '-p', self.crypted]
1310+ if home:
1311+ creation += ['-m']
1312+ if group:
1313+ creation += ['-G',group]
1314+ if uidmin:
1315+ creation += ['-K','UID_MIN=%d'%uidmin]
1316+ if shell:
1317+ creation += ['-s',shell]
1318+ creation += [login]
1319+ assert subprocess.call(creation) == 0
1320+ # Set GECOS
1321+ assert subprocess.call(['usermod','-c','Buddy %s' % (login),login]) == 0
1322+
1323+ self.login = login
1324+ p = pwd.getpwnam(self.login)
1325+ self.uid = p[2]
1326+ self.gid = p[3]
1327+ self.gecos = p[4]
1328+ self.home = p[5]
1329+ self.shell = p[6]
1330+
1331+ def __del__(self):
1332+ '''Remove the created user account.'''
1333+
1334+ if self.login:
1335+ # sanity check the login name so we don't accidentally wipe too much
1336+ if len(self.login)>3 and not '/' in self.login:
1337+ subprocess.call(['rm','-rf', '/home/'+self.login, '/var/mail/'+self.login])
1338+ rc, report = cmd(['userdel', '-f', self.login])
1339+ assert rc == 0
1340+
1341+ def add_to_group(self, group):
1342+ '''Add user to the specified group name'''
1343+ rc, report = cmd(['usermod', '-G', group, self.login])
1344+ if rc != 0:
1345+ print report
1346+ assert rc == 0
1347+
1348+# Timeout handler using alarm() from John P. Speno's Pythonic Avocado
1349+class TimeoutFunctionException(Exception):
1350+ """Exception to raise on a timeout"""
1351+ pass
1352+class TimeoutFunction:
1353+ def __init__(self, function, timeout):
1354+ self.timeout = timeout
1355+ self.function = function
1356+
1357+ def handle_timeout(self, signum, frame):
1358+ raise TimeoutFunctionException()
1359+
1360+ def __call__(self, *args, **kwargs):
1361+ old = signal.signal(signal.SIGALRM, self.handle_timeout)
1362+ signal.alarm(self.timeout)
1363+ try:
1364+ result = self.function(*args, **kwargs)
1365+ finally:
1366+ signal.signal(signal.SIGALRM, old)
1367+ signal.alarm(0)
1368+ return result
1369+
1370+def main():
1371+ print "hi"
1372+ unittest.main()
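
Note on the alarm()-based timeout wrapper at the end of testlib.py: the following is a minimal Python 3 sketch of the same idea, not the code in this branch. It keeps the class names from the diff (TimeoutFunction, TimeoutFunctionException). Two things are assumptions of this sketch: the helper run_with_timeout is hypothetical and does not exist in testlib.py, and the cleanup cancels the pending alarm before restoring the previous SIGALRM handler, so that a late alarm cannot reach the old handler. It only works on Unix and only when called from the main thread.

```python
import signal
import time


class TimeoutFunctionException(Exception):
    """Raised when the wrapped call runs longer than its time budget."""


class TimeoutFunction:
    """Wrap a callable so that it is interrupted after `timeout` seconds."""

    def __init__(self, function, timeout):
        self.function = function
        self.timeout = timeout  # whole seconds, as signal.alarm() expects

    def handle_timeout(self, signum, frame):
        raise TimeoutFunctionException()

    def __call__(self, *args, **kwargs):
        # Install our handler and remember the previous one.
        old = signal.signal(signal.SIGALRM, self.handle_timeout)
        signal.alarm(self.timeout)
        try:
            return self.function(*args, **kwargs)
        finally:
            # Assumption of this sketch: cancel the alarm first, then
            # restore the previous handler.
            signal.alarm(0)
            signal.signal(signal.SIGALRM, old)


def run_with_timeout(function, timeout, *args, **kwargs):
    """Hypothetical helper: return (finished, result) instead of raising."""
    try:
        return True, TimeoutFunction(function, timeout)(*args, **kwargs)
    except TimeoutFunctionException:
        return False, None
```

A test could wrap a slow call, for example run_with_timeout(time.sleep, 1, 3), and treat a False first element as a timeout rather than letting the exception abort the test run.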
