Merge lp:~yolanda.robla/nut/dep-8-tests into lp:ubuntu/saucy/nut

Proposed by Yolanda Robla
Status: Work in progress
Proposed branch: lp:~yolanda.robla/nut/dep-8-tests
Merge into: lp:ubuntu/saucy/nut
Diff against target: 1621 lines (+1582/-0)
6 files modified
debian/changelog (+6/-0)
debian/control (+1/-0)
debian/tests/control (+3/-0)
debian/tests/nut (+6/-0)
debian/tests/test-nut.py (+422/-0)
debian/tests/testlib.py (+1144/-0)
To merge this branch: bzr merge lp:~yolanda.robla/nut/dep-8-tests
Reviewer                               Review Type   Date Requested   Status
Martin Pitt                                                           Needs Fixing
James Hunt (community)                                                Approve
Daniel Holbach (community)                                            Needs Fixing
Jean-Baptiste Lallement (community)                                   Approve
Review via email: mp+161414@code.launchpad.net
lp:~yolanda.robla/nut/dep-8-tests updated
42. By Yolanda Robla

fixing test, no need to install packages

Jean-Baptiste Lallement (jibel) wrote :

Thanks for your work. Reviewed and tested on the QA infrastructure and it works fine.

On a side note, if several tests from the QRT are going to be ported to DEP8 it would make sense to package testlib separately to avoid code redundancy and make maintenance easier.
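
As an illustration of the suggestion above, a test script could prefer a separately packaged helper library and fall back to the copy bundled next to it. The sketch below is written for Python 3 and is not part of the proposed branch; the function name `load_first_available` and the module name `shared_testlib` are hypothetical.

```python
import importlib


def load_first_available(module_names):
    """Return the first module in module_names that can be imported, else None."""
    for name in module_names:
        try:
            return importlib.import_module(name)
        except ImportError:
            # Not installed under this name; try the next candidate.
            continue
    return None


# "shared_testlib" is a hypothetical name for a separately packaged library;
# "testlib" is the bundled copy shipped next to the test script.
# testlib = load_first_available(["shared_testlib", "testlib"])
```

Trying the packaged name first means the bundled copy is only used where the shared package is not installed.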

review: Approve
Daniel Holbach (dholbach) wrote :

Could you add a changelog entry in debian/changelog for the upload? It might also be a good idea to forward the change to Debian.

review: Needs Fixing
lp:~yolanda.robla/nut/dep-8-tests updated
43. By Yolanda Robla

d/tests: added dep-8-tests

Yolanda Robla (yolanda.robla) wrote :

recheck

James Hunt (jamesodhunt) wrote :

LGTM. Has this been submitted to Debian yet?

Yolanda Robla (yolanda.robla) wrote :

No, nothing has been submitted yet; I was waiting for the MP to be approved.

James Hunt (jamesodhunt) wrote :

Ok :)

review: Approve
Jamie Strandboge (jdstrand) wrote :

FYI, this has now diverged from QRT. I fixed it in a different way when /etc/init.d/nut moved to /etc/init.d/nut-server (and client).
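
One way a test could cope with the init script rename is to probe for the script at run time instead of hard-coding a path. This is only a sketch in Python 3 and is not the fix applied in QRT, which is not shown in this thread; `pick_initscript` and its `exists` parameter are hypothetical names.

```python
import os


def pick_initscript(candidates, exists=os.path.exists):
    """Return the first candidate path that exists, or None if none do."""
    for path in candidates:
        if exists(path):
            return path
    return None


# Newer split packaging first, then the older single init script.
CANDIDATES = ["/etc/init.d/nut-server", "/etc/init.d/nut"]
# initscript = pick_initscript(CANDIDATES)
```

Passing `exists` as a parameter keeps the lookup testable without touching /etc.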

Yolanda Robla (yolanda.robla) wrote :

There are problems with the nut branch; it's giving an error: http://package-import.ubuntu.com/status/nut.html#2012-07-04%2020:31:15.037950

The changes have been submitted to Debian manually, so they can be picked up once they are accepted.

Martin Pitt (pitti) wrote :

I ran this on current saucy (run-adt-test -sS lp:~yolanda.robla/nut/dep-8-tests nut) and got a failure:

adt-run: trace: & ubtree0t-nut: - - - - - - - - - - results - - - - - - - - - -
ubtree0t-nut FAIL non-zero exit status 1
adt-run: trace: & ubtree0t-nut: - - - - - - - - - - stdout - - - - - - - - - -
test_CVE_2012_2944 (__main__.BasicTest)
Test CVE-2012-2944 ... FAIL
test_daemons_pid (__main__.BasicTest)
Test daemons using PID files ... ok
test_daemons_service (__main__.BasicTest)
Test daemons using "service status" ... ok
test_upsc_device_list (__main__.BasicTest)
Test NUT client interface (upsc): device(s) listing ... ok
test_upsd_IPv4 (__main__.BasicTest)
Test upsd IPv4 reachability ... ok
test_upsd_IPv6 (__main__.BasicTest)
Test upsd IPv6 reachability ... ok
test_upsmon_notif (__main__.BasicTest)
Test upsmon notifications ... ok
test_upsrw (__main__.BasicTest)
Test upsrw ... ok

======================================================================
FAIL: test_CVE_2012_2944 (__main__.BasicTest)
Test CVE-2012-2944
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/tmp/tmp.K0RNk6YJAb/ubtree0-ubtree/debian/tests/test-nut.py", line 396, in test_CVE_2012_2944
    self.assertFalse(os.path.exists(pidfile), "Found %s" % pidfile)
AssertionError: Found /var/run/nut/upsd.pid

----------------------------------------------------------------------
Ran 8 tests in 62.702s

Might this be related to what Jamie said in the comment above?

Please set this back to "needs review" once you are done; I am setting it to WIP now so that it disappears from the sponsoring queue.

Thanks!

review: Needs Fixing
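
The thread does not establish why the PID file was still present. If the cause turned out to be timing (upsd not having finished exiting when the assertion ran), the check could poll for a short period rather than test once. A minimal sketch in Python 3 under that assumption; `wait_until_gone` and its default timeout and interval are hypothetical and not part of testlib.

```python
import os
import time


def wait_until_gone(path, timeout=5.0, interval=0.2):
    """Poll until path no longer exists; return True if it vanished in time."""
    deadline = time.monotonic() + timeout
    while True:
        if not os.path.exists(path):
            return True
        if time.monotonic() >= deadline:
            # Still present after the allowed time.
            return False
        time.sleep(interval)
```

In the test, the assertion could then read `self.assertTrue(wait_until_gone(pidfile), "Found %s" % pidfile)`.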

Unmerged revisions

43. By Yolanda Robla

d/tests: added dep-8-tests

42. By Yolanda Robla

fixing test, no need to install packages

41. By Yolanda Robla

adding dep-8 tests

Preview Diff

=== modified file 'debian/changelog'
--- debian/changelog 2009-12-18 14:04:47 +0000
+++ debian/changelog 2013-05-07 21:46:24 +0000
@@ -1,3 +1,9 @@
+nut (2.4.1-3.2ubuntu2) saucy; urgency=low
+
+  * d/tests: added dep-8-tests
+
+ -- Yolanda <yolanda.robla@canonical.com>  Tue, 07 May 2013 23:42:31 +0200
+
 nut (2.4.1-3.2ubuntu1) lucid; urgency=low
 
   * Resynchronize with debian, remaining changes:
=== modified file 'debian/control'
--- debian/control 2009-11-06 01:34:44 +0000
+++ debian/control 2013-05-07 21:46:24 +0000
@@ -9,6 +9,7 @@
 Homepage: http://www.networkupstools.org
 Vcs-Browser: http://svn.debian.org/wsvn/nut
 Vcs-Svn: svn://svn.debian.org/nut/trunk
+XS-Testsuite: autopkgtest
 
 Package: nut
 Architecture: any
=== added directory 'debian/tests'
=== added file 'debian/tests/control'
--- debian/tests/control 1970-01-01 00:00:00 +0000
+++ debian/tests/control 2013-05-07 21:46:24 +0000
@@ -0,0 +1,3 @@
+Tests: nut
+Depends: python-unit, nut-server, nut-client
+Restrictions: needs-root
=== added file 'debian/tests/nut'
--- debian/tests/nut 1970-01-01 00:00:00 +0000
+++ debian/tests/nut 2013-05-07 21:46:24 +0000
@@ -0,0 +1,6 @@
+#!/bin/bash
+#------------
+# Testing nut
+#------------
+set -e
+python `dirname $0`/test-nut.py 2>&1
=== added file 'debian/tests/test-nut.py'
--- debian/tests/test-nut.py 1970-01-01 00:00:00 +0000
+++ debian/tests/test-nut.py 2013-05-07 21:46:24 +0000
@@ -0,0 +1,422 @@
1#!/usr/bin/python
2#
3# test-nut.py quality assurance test script
4# Copyright (C) 2008-2011 Arnaud Quette <aquette@debian.org>
5# Copyright (C) 2012 Jamie Strandboge <jamie@canonical.com>
6#
7# This program is free software: you can redistribute it and/or modify
8# it under the terms of the GNU General Public License version 3,
9# as published by the Free Software Foundation.
10#
11# This program is distributed in the hope that it will be useful,
12# but WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14# GNU General Public License for more details.
15#
16# You should have received a copy of the GNU General Public License
17# along with this program. If not, see <http://www.gnu.org/licenses/>.
18#
19
20'''
21 *** IMPORTANT ***
22 DO NOT RUN ON A PRODUCTION SERVER.
23 *** IMPORTANT ***
24
25 How to run (up to natty):
26 $ sudo apt-get -y install python-unit nut
27 $ sudo ./test-nut.py -v
28
29 How to run (oneiric+):
30 $ sudo apt-get -y install python-unit nut-server nut-client
31 $ sudo ./test-nut.py -v
32
33 NOTE:
34 - NUT architecture (helps understanding):
35 http://www.networkupstools.org/docs/developer-guide.chunked/ar01s02.html#_the_layering
36
37 - These tests only validate the NUT software framework itself (communication
38 between the drivers, server and client layers ; events propagation and
39 detection). The critical part of NUT, i.e. the driver layer, which
40 communicates with actual devices, can only be tested with real hardware!
41
42 - These tests use the NUT simulation driver (dummy-ups) to emulate real
43 hardware behavior, and generate events (power failure, low battery, ...).
44
45 TODO:
46 - improve test duration, by reworking NutTestCommon._setUp() and the way
47 daemons are started (ie, always)
48 - more events testing (upsmon / upssched)
49 - test syslog and wall output
50 - test UPS redundancy
51 - test Powerchain (once available!)
52 - test AppArmor (once available!)
53 - add hardware testing as Private tests?
54 - load a .dev file, and test a full output
55
56 QA INFORMATION:
57 - NUT provides "make check" and "make distcheck" in its source distribution
58 - NUT provides Quality Assurance information, to track all efforts:
59 http://www.networkupstools.org/nut-qa.html
60'''
61
62# QRT-Packages: python-unit netcat-openbsd
63# QRT-Alternates: nut-server nut
64# QRT-Alternates: nut-client nut
65# nut-dev is needed for the dummy driver on hardy
66# QRT-Alternates: nut-dev
67# QRT-Privilege: root
68# QRT-Depends:
69
70
71import unittest, subprocess, sys, os, time
72import tempfile
73import testlib
74
75use_private = True
76try:
77 from private.qrt.nut import PrivateNutTest
78except ImportError:
79 class PrivateNutTest(object):
80 '''Empty class'''
81 print >>sys.stdout, "Skipping private tests"
82
83
84class NutTestCommon(testlib.TestlibCase):
85 '''Common functions'''
86
87 # FIXME: initscript will be split into nut-server and nut-client
88 # (Debian bug #634858)
89 initscript = "/etc/init.d/nut-server"
90 hosts_file = "/etc/hosts"
91 powerdownflag = "/etc/killpower"
92 shutdowncmd = "/tmp/shutdowncmd"
93 notifyscript = "/tmp/nutifyme"
94 notifylog = "/tmp/notify.log"
95
96 def _setUp(self):
97 '''Set up prior to each test_* function'''
98 '''We generate a NUT config using the dummy-ups driver
99 and standard settings for local monitoring
100 '''
101 self.tmpdir = ""
102 self.rundir = "/var/run/nut"
103 testlib.cmd(['/bin/rm', '-f', self.powerdownflag])
104
105 testlib.config_replace('/etc/nut/ups.conf', '''
106[dummy-dev1]
107 driver = dummy-ups
108 port = dummy.dev
109 desc = "simulation device"
110 ''')
111
112 if self.lsb_release['Release'] <= 8.04:
113 testlib.config_replace('/etc/nut/upsd.conf', '''
114ACL dummy-net 127.0.0.1/8
115ACL dummy-net2 ::1/64
116ACL all 0.0.0.0/0
117ACCEPT dummy-net dummy-net2
118REJECT all
119 ''')
120 else:
121 testlib.config_replace('/etc/nut/upsd.conf', '''# just to touch the file''')
122
123 extra_cfgs = ''
124 if self.lsb_release['Release'] <= 8.04:
125 extra_cfgs = ''' allowfrom = dummy-net dummy-net2
126'''
127 testlib.config_replace('/etc/nut/upsd.users', '''
128[admin]
129 password = dummypass
130 actions = SET
131 instcmds = ALL
132%s
133[monuser]
134 password = dummypass
135 upsmon master
136%s ''' %(extra_cfgs, extra_cfgs))
137
138 testlib.config_replace('/etc/nut/upsmon.conf', '''
139MONITOR dummy-dev1@localhost 1 monuser dummypass master
140MINSUPPLIES 1
141SHUTDOWNCMD "/usr/bin/touch ''' + self.shutdowncmd + '"\n'
142'''POWERDOWNFLAG ''' + self.powerdownflag + '\n'
143'''
144NOTIFYCMD ''' + self.notifyscript + '\n'
145'''
146NOTIFYFLAG ONLINE SYSLOG+EXEC
147NOTIFYFLAG ONBATT SYSLOG+EXEC
148NOTIFYFLAG LOWBATT SYSLOG+EXEC
149NOTIFYFLAG FSD SYSLOG+EXEC
150# NOTIFYFLAG COMMOK SYSLOG+EXEC
151# NOTIFYFLAG COMMBAD SYSLOG+EXEC
152NOTIFYFLAG SHUTDOWN SYSLOG+EXEC
153# NOTIFYFLAG REPLBATT SYSLOG+EXEC
154# NOTIFYFLAG NOCOMM SYSLOG+EXEC
155# NOTIFYFLAG NOPARENT SYSLOG+EXEC
156
157# Shorten test duration by:
158# Speeding up polling frequency
159POLLFREQ 2
160# And final wait delay
161FINALDELAY 0
162'''
163)
164
165 testlib.create_fill(self.notifyscript, '''
166#! /bin/bash
167echo "$*" > ''' + self.notifylog + '\n', mode=0755)
168
169 # dummy-ups absolutely needs a data file, even if empty
170 testlib.config_replace('/etc/nut/dummy.dev', '''
171ups.mfr: Dummy Manufacturer
172ups.model: Dummy UPS
173ups.status: OL
174# Set a big enough timer to avoid value reset, due to reading loop
175TIMER 600
176 ''')
177
178 testlib.config_replace('/etc/nut/nut.conf', '''MODE=standalone''')
179
180 # Add known friendly IP names for localhost v4 and v6
181 # FIXME: find a way to determine if v4 / v6 are enabled, and a way to
182 # get v4 / v6 names
183 testlib.config_replace(self.hosts_file, '''#
184127.0.0.1 localv4
185::1 localv6
186''', append=True)
187
188 if self.lsb_release['Release'] <= 8.04:
189 testlib.config_replace('/etc/default/nut', '''#
190START_UPSD=yes
191UPSD_OPTIONS=""
192START_UPSMON=yes
193UPSMON_OPTIONS=""
194''', append=False)
195
196 # Start the framework
197 self._restart()
198
199 def _tearDown(self):
200 '''Clean up after each test_* function'''
201 self._stop()
202 time.sleep(2)
203 os.unlink('/etc/nut/ups.conf')
204 os.unlink('/etc/nut/upsd.conf')
205 os.unlink('/etc/nut/upsd.users')
206 os.unlink('/etc/nut/upsmon.conf')
207 os.unlink('/etc/nut/dummy.dev')
208 os.unlink('/etc/nut/nut.conf')
209 testlib.config_restore('/etc/nut/ups.conf')
210 testlib.config_restore('/etc/nut/upsd.conf')
211 testlib.config_restore('/etc/nut/upsd.users')
212 testlib.config_restore('/etc/nut/upsmon.conf')
213 testlib.config_restore('/etc/nut/dummy.dev')
214 testlib.config_restore('/etc/nut/nut.conf')
215 if os.path.exists(self.notifyscript):
216 os.unlink(self.notifyscript)
217 if os.path.exists(self.shutdowncmd):
218 os.unlink(self.shutdowncmd)
219 testlib.config_restore(self.hosts_file)
220 if self.lsb_release['Release'] <= 8.04:
221 testlib.config_restore('/etc/default/nut')
222
223 if os.path.exists(self.tmpdir):
224 testlib.recursive_rm(self.tmpdir)
225
226 # this is needed because of the potentially hung upsd process in the
227 # CVE-2012-2944 test
228 testlib.cmd(['killall', 'upsd'])
229 testlib.cmd(['killall', '-9', 'upsd'])
230
231 def _start(self):
232 '''Start NUT'''
233 rc, report = testlib.cmd([self.initscript, 'start'])
234 expected = 0
235 result = 'Got exit code %d, expected %d\n' % (rc, expected)
236 self.assertEquals(expected, rc, result + report)
237 time.sleep(2)
238
239 def _stop(self):
240 '''Stop NUT'''
241 rc, report = testlib.cmd([self.initscript, 'stop'])
242 expected = 0
243 result = 'Got exit code %d, expected %d\n' % (rc, expected)
244 self.assertEquals(expected, rc, result + report)
245
246 def _reload(self):
247 '''Reload NUT'''
248 rc, report = testlib.cmd([self.initscript, 'force-reload'])
249 expected = 0
250 result = 'Got exit code %d, expected %d\n' % (rc, expected)
251 self.assertEquals(expected, rc, result + report)
252
253 def _restart(self):
254 '''Restart NUT'''
255 self._stop()
256 time.sleep(2)
257 self._start()
258
259 def _status(self):
260 '''NUT Status'''
261 rc, report = testlib.cmd([self.initscript, 'status'])
262 expected = 0
263 if self.lsb_release['Release'] <= 8.04:
264 self._skipped("init script does not support status command")
265 expected = 1
266 result = 'Got exit code %d, expected %d\n' % (rc, expected)
267 self.assertEquals(expected, rc, result + report)
268
269 def _testDaemons(self, daemons):
270 '''Daemons running'''
271 for d in daemons:
272 # A note on the driver pid file: its name is
273 # <driver name>-<ups.conf section name>.pid
274 # ex: dummy-ups-dummy-dev1.pid
275 if d == 'dummy-ups' :
276 pidfile = os.path.join(self.rundir, 'dummy-ups-dummy-dev1.pid')
277 else :
278 pidfile = os.path.join(self.rundir, d + '.pid')
279 warning = "Could not find pidfile '" + pidfile + "'"
280 self.assertTrue(os.path.exists(pidfile), warning)
281 self.assertTrue(testlib.check_pidfile(d, pidfile), d + ' is not running')
282
283 def _nut_setvar(self, var, value):
284 '''Test upsrw'''
285 rc, report = testlib.cmd(['/bin/upsrw', '-s', var + '=' + value,
286 '-u', 'admin' , '-p', 'dummypass', 'dummy-dev1@localhost'])
287 self.assertTrue(rc == 0, 'upsrw: ' + report)
288 return rc,report
289
290
291class BasicTest(NutTestCommon, PrivateNutTest):
292 '''Test basic NUT functionalities'''
293
294 def setUp(self):
295 '''Setup mechanisms'''
296 NutTestCommon._setUp(self)
297
298 def tearDown(self):
299 '''Shutdown methods'''
300 NutTestCommon._tearDown(self)
301
302 def test_daemons_service(self):
303 '''Test daemons using "service status"'''
304 self._status()
305
306 def test_daemons_pid(self):
307 '''Test daemons using PID files'''
308 # upsmon does not work because ups-client is still missing
309 daemons = [ 'dummy-ups', 'upsd']
310 self._testDaemons(daemons)
311
312 def test_upsd_IPv4(self):
313 '''Test upsd IPv4 reachability'''
314 rc, report = testlib.cmd(['/bin/upsc', '-l', 'localv4'])
315 self.assertTrue('dummy-dev1' in report, 'dummy-dev1 should be present in device(s) listing: ' + report)
316
317 def test_upsd_IPv6(self):
318 '''Test upsd IPv6 reachability'''
319 rc, report = testlib.cmd(['/bin/upsc', '-l', 'localv6'])
320 self.assertTrue('dummy-dev1' in report, 'dummy-dev1 should be present in device(s) listing: ' + report)
321
322 def test_upsc_device_list(self):
323 '''Test NUT client interface (upsc): device(s) listing'''
324 rc, report = testlib.cmd(['/bin/upsc', '-L'])
325 self.assertTrue('dummy-dev1: simulation device' in report, 'dummy-dev1 should be present in device(s) listing: ' + report)
326
327 def _test_upsc_status(self):
328 '''Test NUT client interface (upsc): data access'''
329 rc, report = testlib.cmd(['/bin/upsc', 'dummy-dev1', 'ups.status'])
330 self.assertTrue('OL' in report, 'UPS Status: ' + report + 'should be OL')
331
332 #def test_upsc_powerchain(self):
333 # '''Test NUT client interface (upsc): Powerchain(s) listing'''
334 # rc, report = testlib.cmd(['/bin/upsc', '-p'])
335 # Result == Main ; dummy-dev1 ; $hostname
336 # self.assertTrue('dummy-dev1' in report, 'dummy-dev1 should be present in device(s) listing: ' + report)
337
338 def test_upsrw(self):
339 '''Test upsrw'''
340 # Set ups.model to 'Test'...
341 self._nut_setvar('ups.model', 'Test')
342 time.sleep(2)
343 # and check the result on the client side
344 rc, report = testlib.cmd(['/bin/upsc', 'dummy-dev1@localhost', 'ups.model'])
345 self.assertTrue('Test' in report, 'UPS Model: ' + report + 'should be Test')
346
347 # FIXME: need a simulation counterpart, not yet implemented
348 #def test_upscmd(self):
349 # '''Test upscmd'''
350
351 def test_upsmon_notif(self):
352 '''Test upsmon notifications'''
353 # Set ups.status to OB (On Battery)...
354 self._nut_setvar('ups.status', 'OB')
355 time.sleep(1)
356 # and check the result on the client side
357 rc, report = testlib.cmd(['/bin/upsc', 'dummy-dev1@localhost', 'ups.status'])
358 self.assertTrue('OB' in report, 'UPS Status: ' + report + 'should be OB')
359
360 #def test_upsmon_shutdown(self):
361 # '''Test upsmon basic shutdown (single UPS, low battery status)'''
362 # self._nut_setvar('ups.status', 'OB LB')
363 # time.sleep(2)
364 # # and check the result on the client side
365 # rc, report = testlib.cmd(['/bin/upsc', 'dummy-dev1@localhost', 'ups.status'])
366 # self.assertTrue('OB LB' in report, 'UPS Status: ' + report + 'should be OB LB')
367 # # FIXME: improve with a 2 sec loop * 5 tries
368 # time.sleep(3)
369 # # Check for powerdownflag and shutdowncmd (needed for halt!)
370 # # FIXME: replace by a call to 'upsmon -K'
371 # self.assertTrue(os.path.exists(self.powerdownflag), 'POWERDOWNFLAG has not been set!')
372 # self.assertTrue(os.path.exists(self.shutdowncmd), 'SHUTDOWNCMD has not been executed!')
373
374 def test_CVE_2012_2944(self):
375 '''Test CVE-2012-2944'''
376 self.tmpdir = tempfile.mkdtemp(dir='/tmp', prefix="testlib-")
377 # First send bad input. We need to do this in a script because python
378 # functions don't like our embedded NULs
379 script = os.path.join(self.tmpdir, 'script.sh')
380 contents = '''#!/bin/sh
381printf '\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\n' | nc -q 1 127.0.0.1 3493
382sleep 1
383dd if=/dev/urandom count=64 | nc -q 1 127.0.0.1 3493
384'''
385 testlib.create_fill(script, contents, mode=0755)
386 rc, report = testlib.cmd([script])
387
388 # It should not have crashed. Let's see if it did
389 self._testDaemons(['upsd'])
390 self.assertTrue('ERR UNKNOWN-COMMAND' in report, "Could not find 'ERR UNKNOWN-COMMAND' in:\n%s" % report)
391
392 # This CVE may also result in a hung upsd. Try to kill it, if it is
393 # still around, it is hung
394 testlib.cmd(['killall', 'upsd'])
395 pidfile = os.path.join(self.rundir, 'upsd.pid')
396 self.assertFalse(os.path.exists(pidfile), "Found %s" % pidfile)
397 self.assertFalse(testlib.check_pidfile('upsd', pidfile), 'upsd is hung')
398 #subprocess.call(['bash'])
399
400# FIXME
401#class AdvancedTest(NutTestCommon, PrivateNutTest):
402# '''Test advanced NUT functionalities'''
403
404if __name__ == '__main__':
405
406 suite = unittest.TestSuite()
407 # more configurable
408 if (len(sys.argv) == 1 or sys.argv[1] == '-v'):
409 suite.addTest(unittest.TestLoader().loadTestsFromTestCase(BasicTest))
410
411 # Pull in private tests
412 #if use_private:
413 # suite.addTest(unittest.TestLoader().loadTestsFromTestCase(MyPrivateTest))
414
415 else:
416 print '''Usage:
417 test-nut.py [-v] basic tests
418'''
419 sys.exit(1)
420 rc = unittest.TextTestRunner(verbosity=2).run(suite)
421 if not rc.wasSuccessful():
422 sys.exit(1)
=== added file 'debian/tests/testlib.py'
--- debian/tests/testlib.py 1970-01-01 00:00:00 +0000
+++ debian/tests/testlib.py 2013-05-07 21:46:24 +0000
@@ -0,0 +1,1144 @@
1#
2# testlib.py quality assurance test script
3# Copyright (C) 2008-2011 Canonical Ltd.
4#
5# This library is free software; you can redistribute it and/or
6# modify it under the terms of the GNU Library General Public
7# License as published by the Free Software Foundation; either
8# version 2 of the License.
9#
10# This library is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13# Library General Public License for more details.
14#
15# You should have received a copy of the GNU Library General Public
16# License along with this program. If not, see
17# <http://www.gnu.org/licenses/>.
18#
19
20'''Common classes and functions for package tests.'''
21
22import string, random, crypt, subprocess, pwd, grp, signal, time, unittest, tempfile, shutil, os, os.path, re, glob
23import sys, socket, gzip
24from stat import *
25from encodings import string_escape
26
27import warnings
28warnings.filterwarnings('ignore', message=r'.*apt_pkg\.TagFile.*', category=DeprecationWarning)
29try:
30 import apt_pkg
31 apt_pkg.InitSystem();
32except:
33 # On non-Debian system, fall back to simple comparison without debianisms
34 class apt_pkg(object):
35 def VersionCompare(one, two):
36 list_one = one.split('.')
37 list_two = two.split('.')
38 while len(list_one)>0 and len(list_two)>0:
39 if list_one[0] > list_two[0]:
40 return 1
41 if list_one[0] < list_two[0]:
42 return -1
43 list_one.pop(0)
44 list_two.pop(0)
45 return 0
46
47bogus_nxdomain = "208.69.32.132"
48
49# http://www.chiark.greenend.org.uk/ucgi/~cjwatson/blosxom/2009-07-02-python-sigpipe.html
50# This is needed so that the subprocesses that produce endless output
51# actually quit when the reader goes away.
52import signal
53def subprocess_setup():
54 # Python installs a SIGPIPE handler by default. This is usually not what
55 # non-Python subprocesses expect.
56 signal.signal(signal.SIGPIPE, signal.SIG_DFL)
57
58class TimedOutException(Exception):
59 def __init__(self, value = "Timed Out"):
60 self.value = value
61 def __str__(self):
62 return repr(self.value)
63
64def _restore_backup(path):
65 pathbackup = path + '.autotest'
66 if os.path.exists(pathbackup):
67 shutil.move(pathbackup, path)
68
69def _save_backup(path):
70 pathbackup = path + '.autotest'
71 if os.path.exists(path) and not os.path.exists(pathbackup):
72 shutil.copy2(path, pathbackup)
73 # copy2 does not copy ownership, so do it here.
74 # Reference: http://docs.python.org/library/shutil.html
75 a = os.stat(path)
76 os.chown(pathbackup, a[4], a[5])
77
78def config_copydir(path):
79 if os.path.exists(path) and not os.path.isdir(path):
80 raise OSError, "'%s' is not a directory" % (path)
81 _restore_backup(path)
82
83 pathbackup = path + '.autotest'
84 if os.path.exists(path):
85 shutil.copytree(path, pathbackup, symlinks=True)
86
87def config_replace(path,contents,append=False):
88 '''Replace (or append) to a config file'''
89 _restore_backup(path)
90 if os.path.exists(path):
91 _save_backup(path)
92 if append:
93 contents = file(path).read() + contents
94 open(path, 'w').write(contents)
95
96def config_comment(path, field):
97 _save_backup(path)
98 contents = ""
99 for line in file(path):
100 if re.search("^\s*%s\s*=" % (field), line):
101 line = "#" + line
102 contents += line
103
104 open(path+'.new', 'w').write(contents)
105 os.rename(path+'.new', path)
106
107def config_set(path, field, value, spaces=True):
108 _save_backup(path)
109 contents = ""
110 if spaces==True:
111 setting = '%s = %s\n' % (field, value)
112 else:
113 setting = '%s=%s\n' % (field, value)
114 found = False
115 for line in file(path):
116 if re.search("^\s*%s\s*=" % (field), line):
117 found = True
118 line = setting
119 contents += line
120 if not found:
121 contents += setting
122
123 open(path+'.new', 'w').write(contents)
124 os.rename(path+'.new', path)
125
126def config_patch(path, patch, depth=1):
127 '''Patch a config file'''
128 _restore_backup(path)
129 _save_backup(path)
130
131 handle, name = mkstemp_fill(patch)
132 rc = subprocess.call(['/usr/bin/patch', '-p%s' %(depth), path], stdin=handle, stdout=subprocess.PIPE)
133 os.unlink(name)
134 if rc != 0:
135 raise Exception("Patch failed")
136
137def config_restore(path):
138 '''Rename a replaced config file back to its initial state'''
139 _restore_backup(path)
140
141def timeout(secs, f, *args):
142 def handler(signum, frame):
143 raise TimedOutException()
144
145 old = signal.signal(signal.SIGALRM, handler)
146 result = None
147 signal.alarm(secs)
148 try:
149 result = f(*args)
150 finally:
151 signal.alarm(0)
152 signal.signal(signal.SIGALRM, old)
153
154 return result
155
156def require_nonroot():
157 if os.geteuid() == 0:
158 print >>sys.stderr, "This series of tests should be run as a regular user with sudo access, not as root."
159 sys.exit(1)
160
161def require_root():
162 if os.geteuid() != 0:
163 print >>sys.stderr, "This series of tests should be run with root privileges (e.g. via sudo)."
164 sys.exit(1)
165
166def require_sudo():
167 if os.geteuid() != 0 or os.environ.get('SUDO_USER', None) == None:
168 print >>sys.stderr, "This series of tests must be run under sudo."
169 sys.exit(1)
170 if os.environ['SUDO_USER'] == 'root':
171 print >>sys.stderr, 'Please run this test using sudo from a regular user. (You ran sudo from root.)'
172 sys.exit(1)
173
174def random_string(length,lower=False):
175 '''Return a random string, consisting of ASCII letters, with given
176 length.'''
177
178 s = ''
179 selection = string.letters
180 if lower:
181 selection = string.lowercase
182 maxind = len(selection)-1
183 for l in range(length):
184 s += selection[random.randint(0, maxind)]
185 return s
186
187def mkstemp_fill(contents,suffix='',prefix='testlib-',dir=None):
188 '''As tempfile.mkstemp does, return a (file, name) pair, but with
189 prefilled contents.'''
190
191 handle, name = tempfile.mkstemp(suffix=suffix,prefix=prefix,dir=dir)
192 os.close(handle)
193 handle = file(name,"w+")
194 handle.write(contents)
195 handle.flush()
196 handle.seek(0)
197
198 return handle, name
199
200def create_fill(path, contents, mode=0644):
201 '''Safely create a file with the given contents and mode'''
202 # make the temp file in the same dir as the destination file so we
203 # don't get invalid cross-device link errors when we rename
204 handle, name = mkstemp_fill(contents, dir=os.path.dirname(path))
205 handle.close()
206 os.rename(name, path)
207 os.chmod(path, mode)
208
209def login_exists(login):
210 '''Checks whether the given login exists on the system.'''
211
212 try:
213 pwd.getpwnam(login)
214 return True
215 except KeyError:
216 return False
217
218def group_exists(group):
219 '''Checks whether the given group exists on the system.'''
220
221 try:
222 grp.getgrnam(group)
223 return True
224 except KeyError:
225 return False
226
227def recursive_rm(dirPath, contents_only=False):
228 '''recursively remove directory'''
229 names = os.listdir(dirPath)
230 for name in names:
231 path = os.path.join(dirPath, name)
232 if os.path.islink(path) or not os.path.isdir(path):
233 os.unlink(path)
234 else:
235 recursive_rm(path)
236 if contents_only == False:
237 os.rmdir(dirPath)
238
239def check_pidfile(exe, pidfile):
240 '''Checks if pid in pidfile is running'''
241 if not os.path.exists(pidfile):
242 return False
243
244 # get the pid
245 try:
246 fd = open(pidfile, 'r')
247 pid = fd.readline().rstrip('\n')
248 fd.close()
249 except:
250 return False
251
252 return check_pid(exe, pid)
253
254def check_pid(exe, pid):
255 '''Checks if pid is running'''
256 cmdline = "/proc/%s/cmdline" % (str(pid))
257 if not os.path.exists(cmdline):
258 return False
259
260 # get the command line
261 try:
262 fd = open(cmdline, 'r')
263 tmp = fd.readline().split('\0')
264 fd.close()
265 except:
266 return False
267
268 # this allows us to match absolute paths or just the executable name
269 if re.match('^' + exe + '$', tmp[0]) or \
270 re.match('.*/' + exe + '$', tmp[0]) or \
271 re.match('^' + exe + ': ', tmp[0]) or \
272 re.match('^\(' + exe + '\)', tmp[0]):
273 return True
274
275 return False
276
277def check_port(port, proto, ver=4):
278 '''Check if something is listening on the specified port.
279 WARNING: for some reason this does not work with a bind mounted /proc
280 '''
281 assert (port >= 1)
282 assert (port <= 65535)
283 assert (proto.lower() == "tcp" or proto.lower() == "udp")
284 assert (ver == 4 or ver == 6)
285
286 fn = "/proc/net/%s" % (proto)
287 if ver == 6:
288 fn += str(ver)
289
290 rc, report = cmd(['cat', fn])
291 assert (rc == 0)
292
293 hport = "%0.4x" % port
294
295 if re.search(': [0-9a-f]{8}:%s [0-9a-f]' % str(hport).lower(), report.lower()):
296 return True
297 return False
298
299def get_arch():
300 '''Get the current architecture'''
301 rc, report = cmd(['uname', '-m'])
302 assert (rc == 0)
303 return report.strip()
304
305def get_memory():
306 '''Gets total ram and swap'''
307 meminfo = "/proc/meminfo"
308 memtotal = 0
309 swaptotal = 0
310 if not os.path.exists(meminfo):
311 return (False, False)
312
313 try:
314 fd = open(meminfo, 'r')
315 for line in fd.readlines():
316 splitline = line.split()
317 if splitline[0] == 'MemTotal:':
318 memtotal = int(splitline[1])
319 elif splitline[0] == 'SwapTotal:':
320 swaptotal = int(splitline[1])
321 fd.close()
322 except:
323 return (False, False)
324
325 return (memtotal,swaptotal)
326
327def is_running_in_vm():
328 '''Check if running under a VM'''
329 # add other virtualization environments here
330 for search in ['QEMU Virtual CPU']:
331 rc, report = cmd_pipe(['dmesg'], ['grep', search])
332 if rc == 0:
333 return True
334 return False
335
336def ubuntu_release():
337 '''Get the Ubuntu release'''
338 f = "/etc/lsb-release"
339 try:
340 size = os.stat(f)[ST_SIZE]
341 except:
342 return "UNKNOWN"
343
344 if size > 1024*1024:
345 raise IOError, 'Could not open "%s" (too big)' % f
346
347 try:
348 fh = open("/etc/lsb-release", 'r')
349 except:
350 raise
351
352 lines = fh.readlines()
353 fh.close()
354
355 pat = re.compile(r'DISTRIB_CODENAME')
356 for line in lines:
357 if pat.search(line):
358 return line.split('=')[1].rstrip('\n').rstrip('\r')
359
360 return "UNKNOWN"
361
362def cmd(command, input = None, stderr = subprocess.STDOUT, stdout = subprocess.PIPE, stdin = None, timeout = None):
363 '''Try to execute given command (array) and return its stdout, or return
364 a textual error if it failed.'''
365
366 try:
367 sp = subprocess.Popen(command, stdin=stdin, stdout=stdout, stderr=stderr, close_fds=True, preexec_fn=subprocess_setup)
368 except OSError, e:
369 return [127, str(e)]
370
371 out, outerr = sp.communicate(input)
372 # Handle redirection of stdout
373 if out == None:
374 out = ''
375 # Handle redirection of stderr
376 if outerr == None:
377 outerr = ''
378 return [sp.returncode,out+outerr]
379
380def cmd_pipe(command1, command2, input = None, stderr = subprocess.STDOUT, stdin = None):
381 '''Try to pipe command1 into command2.'''
382 try:
383 sp1 = subprocess.Popen(command1, stdin=stdin, stdout=subprocess.PIPE, stderr=stderr, close_fds=True)
384 sp2 = subprocess.Popen(command2, stdin=sp1.stdout, stdout=subprocess.PIPE, stderr=stderr, close_fds=True)
385 except OSError, e:
386 return [127, str(e)]
387
388 out = sp2.communicate(input)[0]
389 return [sp2.returncode,out]
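
Note on cmd_pipe() above: a minimal Python 3 sketch of the same two-process pipe, under the assumption that only the second command's exit code and output are wanted. It closes the parent's copy of the first command's stdout, so the first command can receive SIGPIPE if the second exits early, and it reaps the first process. The producer and consumer commands are hypothetical examples.

```python
import subprocess
import sys


def cmd_pipe(command1, command2):
    """Pipe command1 into command2; return (exit code, output) of command2."""
    try:
        sp1 = subprocess.Popen(command1, stdout=subprocess.PIPE)
    except OSError as e:
        return 127, str(e)
    try:
        sp2 = subprocess.Popen(command2, stdin=sp1.stdout,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT,
                               universal_newlines=True)
    except OSError as e:
        # command2 could not start: stop and reap command1.
        sp1.kill()
        sp1.stdout.close()
        sp1.wait()
        return 127, str(e)
    # Drop the parent's copy of the pipe so command1 sees SIGPIPE
    # if command2 exits early.
    sp1.stdout.close()
    out = sp2.communicate()[0]
    sp1.wait()
    return sp2.returncode, out


# Hypothetical commands: print two lines, keep the one containing 'beta'.
producer = [sys.executable, "-c", "print('alpha'); print('beta')"]
consumer = [sys.executable, "-c",
            "import sys; "
            "sys.stdout.write(''.join(l for l in sys.stdin if 'beta' in l))"]
rc, out = cmd_pipe(producer, consumer)
```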
390
391def cwd_has_enough_space(cdir, total_bytes):
392 '''Determine if the partition of the current working directory has more
393 than 'total_bytes' free.'''
394 rc, df_output = cmd(['df'])
395 result = 'Got exit code %d, expected %d\n' % (rc, 0)
396 if rc != 0:
397 return False
398
399 kb = total_bytes / 1024
400
401 mounts = dict()
402 for line in df_output.splitlines():
403 if '/' not in line:
404 continue
405 tmp = line.split()
406 mounts[tmp[5]] = int(tmp[3])
407
408 cdir = os.getcwd()
409 while cdir != '/':
410 if not mounts.has_key(cdir):
411 cdir = os.path.dirname(cdir)
412 continue
413 if kb < mounts[cdir]:
414 return True
415 else:
416 return False
417
418 if kb < mounts['/']:
419 return True
420
421 return False
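
Note on cwd_has_enough_space() above: it parses df output by column position and replaces its cdir argument with os.getcwd(). A minimal Python 3 sketch of the same check without parsing df, assuming shutil.disk_usage() is acceptable on the target systems; has_enough_space is a hypothetical name, not part of testlib.py.

```python
import os
import shutil


def has_enough_space(path, total_bytes):
    """Return True if the filesystem holding 'path' has more than
    total_bytes free."""
    return shutil.disk_usage(path).free > total_bytes


# Check the filesystem holding the current working directory for 1 KiB.
enough = has_enough_space(os.getcwd(), 1024)
```

This asks the kernel about the filesystem that holds the given path, so no walk up the directory tree is needed to find the mount point.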
422
423def get_md5(filename):
424 '''Gets the md5sum of the file specified'''
425
426 (rc, report) = cmd(["/usr/bin/md5sum", "-b", filename])
427 expected = 0
428 assert (expected == rc)
429
430 return report.split(' ')[0]
431
432def dpkg_compare_installed_version(pkg, check, version):
433 '''Gets the version for the installed package, and compares it to the
434 specified version.
435 '''
436 (rc, report) = cmd(["/usr/bin/dpkg", "-s", pkg])
437 assert (rc == 0)
438 assert ("Status: install ok installed" in report)
439 installed_version = ""
440 for line in report.splitlines():
441 if line.startswith("Version: "):
442 installed_version = line.split()[1]
443
444 assert (installed_version != "")
445
446 (rc, report) = cmd(["/usr/bin/dpkg", "--compare-versions", installed_version, check, version])
447 assert (rc == 0 or rc == 1)
448 if rc == 0:
449 return True
450 return False
451
452def prepare_source(source, builder, cached_src, build_src, patch_system):
453 '''Download and unpack source package, installing necessary build depends,
454 adjusting the permissions for the 'builder' user, and returning the
455 directory of the unpacked source. Patch system can be one of:
456 - cdbs
457 - dpatch
458 - quilt
459 - quiltv3
460 - None (not the string)
461
462 This is normally used like this:
463
464 def setUp(self):
465 ...
466 self.topdir = os.getcwd()
467 self.cached_src = os.path.join(os.getcwd(), "source")
468 self.tmpdir = tempfile.mkdtemp(prefix='testlib', dir='/tmp')
469 self.builder = testlib.TestUser()
470 testlib.cmd(['chgrp', self.builder.login, self.tmpdir])
471 os.chmod(self.tmpdir, 0775)
472
473 def tearDown(self):
474 ...
475 self.builder = None
476 self.topdir = os.getcwd()
477 if os.path.exists(self.tmpdir):
478 testlib.recursive_rm(self.tmpdir)
479
480 def test_suite_build(self):
481 ...
482 build_dir = testlib.prepare_source('foo', \
483 self.builder, \
484 self.cached_src, \
485 os.path.join(self.tmpdir, \
486 os.path.basename(self.cached_src)),
487 "quilt")
488 os.chdir(build_dir)
489
490 # Example for typical build, adjust as necessary
491 print ""
492 print " make clean"
493 rc, report = testlib.cmd(['sudo', '-u', self.builder.login, 'make', 'clean'])
494
495 print " configure"
496 rc, report = testlib.cmd(['sudo', '-u', self.builder.login, './configure', '--prefix=%s' % self.tmpdir, '--enable-debug'])
497
498 print " make (will take a while)"
499 rc, report = testlib.cmd(['sudo', '-u', self.builder.login, 'make'])
500
501 print " make check (will take a while)",
502 rc, report = testlib.cmd(['sudo', '-u', self.builder.login, 'make', 'check'])
503 expected = 0
504 result = 'Got exit code %d, expected %d\n' % (rc, expected)
505 self.assertEquals(expected, rc, result + report)
506
507 def test_suite_cleanup(self):
508 ...
509 if os.path.exists(self.cached_src):
510 testlib.recursive_rm(self.cached_src)
511
512 It is up to the caller to clean up cached_src and build_src. As in the
513 above example, the build_src is often in a tmpdir that is cleaned in
514 tearDown(), and the cached_src is cleaned in a one-time clean-up
515 operation (e.g. test_suite_cleanup()), which must be run after the build
516 suite test.
517 '''
518
519 # Make sure we have a clean slate
520 assert (os.path.exists(os.path.dirname(build_src)))
521 assert (not os.path.exists(build_src))
522
523 cdir = os.getcwd()
524 if os.path.exists(cached_src):
525 shutil.copytree(cached_src, build_src)
526 os.chdir(build_src)
527 else:
528 # Only install the build dependencies on the initial setup
529 rc, report = cmd(['apt-get','-y','--force-yes','build-dep',source])
530 assert (rc == 0)
531
532 os.makedirs(build_src)
533 os.chdir(build_src)
534
535 # These are always needed
536 pkgs = ['build-essential', 'dpkg-dev', 'fakeroot']
537 rc, report = cmd(['apt-get','-y','--force-yes','install'] + pkgs)
538 assert (rc == 0)
539
540 rc, report = cmd(['apt-get','source',source])
541 assert (rc == 0)
542 shutil.copytree(build_src, cached_src)
543
544 unpacked_dir = os.path.join(build_src, glob.glob('%s-*' % source)[0])
545
546 # Now apply the patches. Do it here so that we don't mess up our cached
547 # sources.
548 os.chdir(unpacked_dir)
549 assert (patch_system in ['cdbs', 'dpatch', 'quilt', 'quiltv3', None])
550 if patch_system != None and patch_system != "quiltv3":
551 if patch_system == "quilt":
552 os.environ.setdefault('QUILT_PATCHES','debian/patches')
553 rc, report = cmd(['quilt', 'push', '-a'])
554 assert (rc == 0)
555 elif patch_system == "cdbs":
556 rc, report = cmd(['./debian/rules', 'apply-patches'])
557 assert (rc == 0)
558 elif patch_system == "dpatch":
559 rc, report = cmd(['dpatch', 'apply-all'])
560 assert (rc == 0)
561
562 cmd(['chown', '-R', '%s:%s' % (builder.uid, builder.gid), build_src])
563 os.chdir(cdir)
564
565 return unpacked_dir
566
567def _aa_status():
568 '''Get aa-status output'''
569 exe = "/usr/sbin/aa-status"
570 assert (os.path.exists(exe))
571 if os.geteuid() == 0:
572 return cmd([exe])
573 return cmd(['sudo', exe])
574
575def is_apparmor_loaded(path):
576 '''Check if profile is loaded'''
577 rc, report = _aa_status()
578 if rc != 0:
579 return False
580
581 for line in report.splitlines():
582 if line.endswith(path):
583 return True
584 return False
585
586def is_apparmor_confined(path):
587 '''Check if application is confined'''
588 rc, report = _aa_status()
589 if rc != 0:
590 return False
591
592 for line in report.splitlines():
593 if re.search('%s \(' % path, line):
594 return True
595 return False
596
597def check_apparmor(path, first_ubuntu_release, is_running=True):
598 '''Check if path is loaded and confined for everything higher than the
599 first Ubuntu release specified.
600
601 Usage:
602 rc, report = testlib.check_apparmor('/usr/sbin/foo', 8.04, is_running=True)
603 if rc < 0:
604 return self._skipped(report)
605
606 expected = 0
607 result = 'Got exit code %d, expected %d\n' % (rc, expected)
608 self.assertEquals(expected, rc, result + report)
609 '''
610 global manager
611 rc = -1
612
613 if manager.lsb_release["Release"] < first_ubuntu_release:
614 return (rc, "Skipped apparmor check")
615
616 if not os.path.exists('/sbin/apparmor_parser'):
617 return (rc, "Skipped (couldn't find apparmor_parser)")
618
619 rc = 0
620 msg = ""
621 if not is_apparmor_loaded(path):
622 rc = 1
623 msg = "Profile not loaded for '%s'" % path
624
625 # this check only makes sense if the 'path' is currently executing
626 if is_running and rc == 0 and not is_apparmor_confined(path):
627 rc = 1
628 msg = "'%s' is not running in enforce mode" % path
629
630 return (rc, msg)
631
632def get_gcc_version(gcc, full=True):
633 gcc_version = 'none'
634 if not gcc.startswith('/'):
635 gcc = '/usr/bin/%s' % (gcc)
636 if os.path.exists(gcc):
637 gcc_version = 'unknown'
638 lines = cmd([gcc,'-v'])[1].strip().splitlines()
639 version_lines = [x for x in lines if x.startswith('gcc version')]
640 if len(version_lines) == 1:
641 gcc_version = " ".join(version_lines[0].split()[2:])
642 if not full:
643 return gcc_version.split()[0]
644 return gcc_version
645
646def is_kdeinit_running():
647 '''Test if kdeinit is running'''
648 # applications that use kdeinit will spawn it if it isn't running in the
649 # test. This is a problem because it does not exit. This is a helper to
650 # check for it.
651 rc, report = cmd(['ps', 'x'])
652 if 'kdeinit4 Running' not in report:
653 print >>sys.stderr, ("kdeinit not running (you may start/stop any KDE application then run this script again)")
654 return False
655 return True
656
657def get_pkgconfig_flags(libs=[]):
658 '''Find pkg-config flags for libraries'''
659 assert (len(libs) > 0)
660 rc, pkg_config = cmd(['pkg-config', '--cflags', '--libs'] + libs)
661 expected = 0
662 if rc != expected:
663 print >>sys.stderr, 'Got exit code %d, expected %d\n' % (rc, expected)
664 assert(rc == expected)
665 return pkg_config.split()
666
667class TestDaemon:
668 '''Helper class to manage daemons consistently'''
669 def __init__(self, init):
670 '''Setup daemon attributes'''
671 self.initscript = init
672
673 def start(self):
674 '''Start daemon'''
675 rc, report = cmd([self.initscript, 'start'])
676 expected = 0
677 result = 'Got exit code %d, expected %d\n' % (rc, expected)
678 time.sleep(2)
679 if expected != rc:
680 return (False, result + report)
681
682 if "fail" in report:
683 return (False, "Found 'fail' in report\n" + report)
684
685 return (True, "")
686
687 def stop(self):
688 '''Stop daemon'''
689 rc, report = cmd([self.initscript, 'stop'])
690 expected = 0
691 result = 'Got exit code %d, expected %d\n' % (rc, expected)
692 if expected != rc:
693 return (False, result + report)
694
695 if "fail" in report:
696 return (False, "Found 'fail' in report\n" + report)
697
698 return (True, "")
699
700 def reload(self):
701 '''Reload daemon'''
702 rc, report = cmd([self.initscript, 'force-reload'])
703 expected = 0
704 result = 'Got exit code %d, expected %d\n' % (rc, expected)
705 if expected != rc:
706 return (False, result + report)
707
708 if "fail" in report:
709 return (False, "Found 'fail' in report\n" + report)
710
711 return (True, "")
712
713 def restart(self):
714 '''Restart daemon'''
715 (res, str) = self.stop()
716 if not res:
717 return (res, str)
718
719 (res, str) = self.start()
720 if not res:
721 return (res, str)
722
723 return (True, "")
724
725 def status(self):
726 '''Check daemon status'''
727 rc, report = cmd([self.initscript, 'status'])
728 expected = 0
729 result = 'Got exit code %d, expected %d\n' % (rc, expected)
730 if expected != rc:
731 return (False, result + report)
732
733 if "fail" in report:
734 return (False, "Found 'fail' in report\n" + report)
735
736 return (True, "")
737
738class TestlibManager(object):
739 '''Singleton class used to set up per-test-run information'''
740 def __init__(self):
741 # Set glibc aborts to dump to stderr instead of the tty so test output
742 # is more sane.
743 os.environ.setdefault('LIBC_FATAL_STDERR_','1')
744
745 # check verbosity
746 self.verbosity = False
747 if (len(sys.argv) > 1 and '-v' in sys.argv[1:]):
748 self.verbosity = True
749
750 # Load LSB release file
751 self.lsb_release = dict()
752 if not os.path.exists('/usr/bin/lsb_release') and not os.path.exists('/bin/lsb_release'):
753 raise OSError, "Please install 'lsb-release'"
754 for line in subprocess.Popen(['lsb_release','-a'],stdout=subprocess.PIPE,stderr=subprocess.PIPE).communicate()[0].splitlines():
755 field, value = line.split(':',1)
756 value=value.strip()
757 field=field.strip()
758 # Convert numerics
759 try:
760 value = float(value)
761 except:
762 pass
763 self.lsb_release.setdefault(field,value)
764
765 # FIXME: hack OEM releases into known-Ubuntu versions
766 if self.lsb_release['Distributor ID'] == "HP MIE (Mobile Internet Experience)":
767 if self.lsb_release['Release'] == 1.0:
768 self.lsb_release['Distributor ID'] = "Ubuntu"
769 self.lsb_release['Release'] = 8.04
770 else:
771 raise OSError, "Unknown version of HP MIE"
772
773 # FIXME: hack to assume a most-recent release if we're not
774 # running under Ubuntu.
775 if self.lsb_release['Distributor ID'] not in ["Ubuntu","Linaro"]:
776 self.lsb_release['Release'] = 10000
777 # Adjust Linaro release to pretend to be Ubuntu
778 if self.lsb_release['Distributor ID'] in ["Linaro"]:
779 self.lsb_release['Distributor ID'] = "Ubuntu"
780 self.lsb_release['Release'] -= 0.01
781
782 # Load arch
783 if not os.path.exists('/usr/bin/dpkg'):
784 machine = cmd(['uname','-m'])[1].strip()
785 if machine.endswith('86'):
786 self.dpkg_arch = 'i386'
787 elif machine.endswith('_64'):
788 self.dpkg_arch = 'amd64'
789 elif machine.startswith('arm'):
790 self.dpkg_arch = 'armel'
791 else:
792 raise ValueError, "Unknown machine type '%s'" % (machine)
793 else:
794 self.dpkg_arch = cmd(['dpkg','--print-architecture'])[1].strip()
795
796 # Find kernel version
797 self.kernel_is_ubuntu = False
798 self.kernel_version_signature = None
799 self.kernel_version = cmd(["uname","-r"])[1].strip()
800 versig = '/proc/version_signature'
801 if os.path.exists(versig):
802 self.kernel_is_ubuntu = True
803 self.kernel_version_signature = file(versig).read().strip()
804 self.kernel_version_ubuntu = self.kernel_version
805 elif os.path.exists('/usr/bin/dpkg'):
806 # this can easily be inaccurate but is only an issue for Dapper
807 rc, out = cmd(['dpkg','-l','linux-image-%s' % (self.kernel_version)])
808 if rc == 0:
809 self.kernel_version_signature = out.strip().split('\n').pop().split()[2]
810 self.kernel_version_ubuntu = self.kernel_version_signature
811 if self.kernel_version_signature == None:
812 # Attempt to fall back to something for non-Debian-based
813 self.kernel_version_signature = self.kernel_version
814 self.kernel_version_ubuntu = self.kernel_version
815 # Build ubuntu version without hardware suffix
816 try:
817 self.kernel_version_ubuntu = "-".join([x for x in self.kernel_version_signature.split(' ')[1].split('-') if re.search('^[0-9]', x)])
818 except:
819 pass
820
821 # Find gcc version
822 self.gcc_version = get_gcc_version('gcc')
823
824 # Find libc
825 self.path_libc = [x.split()[2] for x in cmd(['ldd','/bin/ls'])[1].splitlines() if x.startswith('\tlibc.so.')][0]
826
827 # Report self
828 if self.verbosity:
829 kernel = self.kernel_version_ubuntu
830 if kernel != self.kernel_version_signature:
831 kernel += " (%s)" % (self.kernel_version_signature)
832 print >>sys.stdout, "Running test: '%s' distro: '%s %.2f' kernel: '%s' arch: '%s' uid: %d/%d SUDO_USER: '%s')" % ( \
833 sys.argv[0],
834 self.lsb_release['Distributor ID'],
835 self.lsb_release['Release'],
836 kernel,
837 self.dpkg_arch,
838 os.geteuid(), os.getuid(),
839 os.environ.get('SUDO_USER', ''))
840 sys.stdout.flush()
841
842 # Additional heuristics
843 #if os.environ.get('SUDO_USER', os.environ.get('USER', '')) in ['mdeslaur']:
844 # sys.stdout.write("Replying to Marc Deslauriers in http://launchpad.net/bugs/%d: " % random.randint(600000, 980000))
845 # sys.stdout.flush()
846 # time.sleep(0.5)
847 # sys.stdout.write("destroyed\n")
848 # time.sleep(0.5)
849
850 def hello(self, msg):
851 print >>sys.stderr, "Hello from %s" % (msg)
852# The central instance
853manager = TestlibManager()
854
855class TestlibCase(unittest.TestCase):
856 def __init__(self, *args):
857 '''This is called for each TestCase test instance, which isn't much better
858 than setUp.'''
859
860 unittest.TestCase.__init__(self, *args)
861
862 # Attach to and duplicate dicts from manager singleton
863 self.manager = manager
864 #self.manager.hello(repr(self) + repr(*args))
865 self.my_verbosity = self.manager.verbosity
866 self.lsb_release = self.manager.lsb_release
867 self.dpkg_arch = self.manager.dpkg_arch
868 self.kernel_version = self.manager.kernel_version
869 self.kernel_version_signature = self.manager.kernel_version_signature
870 self.kernel_version_ubuntu = self.manager.kernel_version_ubuntu
871 self.kernel_is_ubuntu = self.manager.kernel_is_ubuntu
872 self.gcc_version = self.manager.gcc_version
873 self.path_libc = self.manager.path_libc
874
875 def version_compare(self, one, two):
876 return apt_pkg.VersionCompare(one,two)
877
878 def assertFileType(self, filename, filetype):
879 '''Checks the file type of the file specified'''
880
881 (rc, report, out) = self._testlib_shell_cmd(["/usr/bin/file", "-b", filename])
882 out = out.strip()
883 expected = 0
884 # Absolutely no idea why this happens on Hardy
885 if self.lsb_release['Release'] == 8.04 and rc == 255 and len(out) > 0:
886 rc = 0
887 result = 'Got exit code %d, expected %d:\n%s\n' % (rc, expected, report)
888 self.assertEquals(expected, rc, result)
889
890 filetype = '^%s$' % (filetype)
891 result = 'File type reported by file: [%s], expected regex: [%s]\n' % (out, filetype)
892 self.assertNotEquals(None, re.search(filetype, out), result)
893
894 def yank_commonname_from_cert(self, certfile):
895 '''Extract the commonName from a given PEM'''
896 rc, out = cmd(['openssl','asn1parse','-in',certfile])
897 if rc == 0:
898 ready = False
899 for line in out.splitlines():
900 if ready:
901 return line.split(':')[-1]
902 if ':commonName' in line:
903 ready = True
904 return socket.getfqdn()
905
906 def announce(self, text):
907 if self.my_verbosity:
908 print >>sys.stdout, "(%s) " % (text),
909 sys.stdout.flush()
910
911 def make_clean(self):
912 rc, output = self.shell_cmd(['make','clean'])
913 self.assertEquals(rc, 0, output)
914
915 def get_makefile_compiler(self):
916 # Find potential compiler name
917 compiler = 'gcc'
918 if os.path.exists('Makefile'):
919 for line in open('Makefile'):
920 if line.startswith('CC') and '=' in line:
921 items = [x.strip() for x in line.split('=')]
922 if items[0] == 'CC':
923 compiler = items[1]
924 break
925 return compiler
926
927 def make_target(self, target, expected=0):
928 '''Compile a target and report output'''
929
930 compiler = self.get_makefile_compiler()
931 rc, output = self.shell_cmd(['make',target])
932 self.assertEquals(rc, expected, 'rc(%d)!=%d:\n' % (rc, expected) + output)
933 self.assertTrue('%s ' % (compiler) in output, 'Expected "%s":' % (compiler) + output)
934 return output
935
936 # call as return testlib.skipped()
937 def _skipped(self, reason=""):
938 '''Provide a visible way to indicate that a test was skipped'''
939 if reason != "":
940 reason = ': %s' % (reason)
941 self.announce("skipped%s" % (reason))
942 return False
943
944 def _testlib_shell_cmd(self,args,stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT):
945 argstr = "'" + "', '".join(args).strip() + "'"
946 rc, out = cmd(args,stdin=stdin,stdout=stdout,stderr=stderr)
947 report = 'Command: ' + argstr + '\nOutput:\n' + out
948 return rc, report, out
949
950 def shell_cmd(self, args, stdin=None):
951 return cmd(args,stdin=stdin)
952
953 def assertShellExitEquals(self, expected, args, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, msg=""):
954 '''Test a shell command matches a specific exit code'''
955 rc, report, out = self._testlib_shell_cmd(args, stdin=stdin, stdout=stdout, stderr=stderr)
956 result = 'Got exit code %d, expected %d\n' % (rc, expected)
957 self.assertEquals(expected, rc, msg + result + report)
958
959 def assertShellExitNotEquals(self, unwanted, args, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, msg=""):
960 '''Test a shell command doesn't match a specific exit code'''
961 rc, report, out = self._testlib_shell_cmd(args, stdin=stdin, stdout=stdout, stderr=stderr)
962 result = 'Got (unwanted) exit code %d\n' % rc
963 self.assertNotEquals(unwanted, rc, msg + result + report)
964
965 def assertShellOutputContains(self, text, args, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, msg="", invert=False):
966 '''Test a shell command contains a specific output'''
967 rc, report, out = self._testlib_shell_cmd(args, stdin=stdin, stdout=stdout, stderr=stderr)
968 result = 'Got exit code %d. Looking for text "%s"\n' % (rc, text)
969 if not invert:
970 self.assertTrue(text in out, msg + result + report)
971 else:
972 self.assertFalse(text in out, msg + result + report)
973
974 def assertShellOutputEquals(self, text, args, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, msg="", invert=False, expected=None):
975 '''Test a shell command matches a specific output'''
976 rc, report, out = self._testlib_shell_cmd(args, stdin=stdin, stdout=stdout, stderr=stderr)
977 result = 'Got exit code %d. Looking for exact text "%s" (%s)\n' % (rc, text, " ".join(args))
978 if not invert:
979 self.assertEquals(text, out, msg + result + report)
980 else:
981 self.assertNotEquals(text, out, msg + result + report)
982 if expected != None:
983 result = 'Got exit code %d. Expected %d (%s)\n' % (rc, expected, " ".join(args))
984 self.assertEquals(rc, expected, msg + result + report)
985
986 def _word_find(self, report, content, invert=False):
987 '''Check for a specific string'''
988 if invert:
989 warning = 'Found "%s"\n' % content
990 self.assertTrue(content not in report, warning + report)
991 else:
992 warning = 'Could not find "%s"\n' % content
993 self.assertTrue(content in report, warning + report)
994
995 def _test_sysctl_value(self, path, expected, msg=None, exists=True):
996 sysctl = '/proc/sys/%s' % (path)
997 self.assertEquals(exists, os.path.exists(sysctl), sysctl)
998 value = None
999 if exists:
1000 value = int(file(sysctl).read())
1001 report = "%s is not %d: %d" % (sysctl, expected, value)
1002 if msg:
1003 report += " (%s)" % (msg)
1004 self.assertEquals(value, expected, report)
1005 return value
1006
1007 def set_sysctl_value(self, path, desired):
1008 sysctl = '/proc/sys/%s' % (path)
1009 self.assertTrue(os.path.exists(sysctl),"%s does not exist" % (sysctl))
1010 file(sysctl,'w').write(str(desired))
1011 self._test_sysctl_value(path, desired)
1012
1013 def kernel_at_least(self, introduced):
1014 return self.version_compare(self.kernel_version_ubuntu,
1015 introduced) >= 0
1016
1017 def kernel_claims_cve_fixed(self, cve):
1018 changelog = "/usr/share/doc/linux-image-%s/changelog.Debian.gz" % (self.kernel_version)
1019 if os.path.exists(changelog):
1020 for line in gzip.open(changelog):
1021 if cve in line and not "revert" in line and not "Revert" in line:
1022 return True
1023 return False
1024
1025class TestGroup:
1026 '''Create a temporary test group and remove it again in the dtor.'''
1027
1028 def __init__(self, group=None, lower=False):
1029 '''Create a new group'''
1030
1031 self.group = None
1032 if group:
1033 if group_exists(group):
1034 raise ValueError, 'group name already exists'
1035 else:
1036 while(True):
1037 group = random_string(7,lower=lower)
1038 if not group_exists(group):
1039 break
1040
1041 assert subprocess.call(['groupadd',group]) == 0
1042 self.group = group
1043 g = grp.getgrnam(self.group)
1044 self.gid = g[2]
1045
1046 def __del__(self):
1047 '''Remove the created group.'''
1048
1049 if self.group:
1050 rc, report = cmd(['groupdel', self.group])
1051 assert rc == 0
1052
1053class TestUser:
1054 '''Create a temporary test user and remove it again in the dtor.'''
1055
1056 def __init__(self, login=None, home=True, group=None, uidmin=None, lower=False, shell=None):
1057 '''Create a new user account with a random password.
1058
1059 By default, the login name is random, too, but can be explicitly
1060 specified with 'login'. By default, a home directory is created, this
1061 can be suppressed with 'home=False'.'''
1062
1063 self.login = None
1064
1065 if os.geteuid() != 0:
1066 raise ValueError, "You must be root to run this test"
1067
1068 if login:
1069 if login_exists(login):
1070 raise ValueError, 'login name already exists'
1071 else:
1072 while(True):
1073 login = 't' + random_string(7,lower=lower)
1074 if not login_exists(login):
1075 break
1076
1077 self.salt = random_string(2)
1078 self.password = random_string(8,lower=lower)
1079 self.crypted = crypt.crypt(self.password, self.salt)
1080
1081 creation = ['useradd', '-p', self.crypted]
1082 if home:
1083 creation += ['-m']
1084 if group:
1085 creation += ['-G',group]
1086 if uidmin:
1087 creation += ['-K','UID_MIN=%d'%uidmin]
1088 if shell:
1089 creation += ['-s',shell]
1090 creation += [login]
1091 assert subprocess.call(creation) == 0
1092 # Set GECOS
1093 assert subprocess.call(['usermod','-c','Buddy %s' % (login),login]) == 0
1094
1095 self.login = login
1096 p = pwd.getpwnam(self.login)
1097 self.uid = p[2]
1098 self.gid = p[3]
1099 self.gecos = p[4]
1100 self.home = p[5]
1101 self.shell = p[6]
1102
1103 def __del__(self):
1104 '''Remove the created user account.'''
1105
1106 if self.login:
1107 # sanity check the login name so we don't accidentally wipe too much
1108 if len(self.login)>3 and not '/' in self.login:
1109 subprocess.call(['rm','-rf', '/home/'+self.login, '/var/mail/'+self.login])
1110 rc, report = cmd(['userdel', '-f', self.login])
1111 assert rc == 0
1112
1113 def add_to_group(self, group):
1114 '''Add user to the specified group name'''
1115 rc, report = cmd(['usermod', '-G', group, self.login])
1116 if rc != 0:
1117 print report
1118 assert rc == 0
1119
1120# Timeout handler using alarm() from John P. Speno's Pythonic Avocado
1121class TimeoutFunctionException(Exception):
1122 """Exception to raise on a timeout"""
1123 pass
1124class TimeoutFunction:
1125 def __init__(self, function, timeout):
1126 self.timeout = timeout
1127 self.function = function
1128
1129 def handle_timeout(self, signum, frame):
1130 raise TimeoutFunctionException()
1131
1132 def __call__(self, *args, **kwargs):
1133 old = signal.signal(signal.SIGALRM, self.handle_timeout)
1134 signal.alarm(self.timeout)
1135 try:
1136 result = self.function(*args, **kwargs)
1137 finally:
1138 signal.signal(signal.SIGALRM, old)
1139 signal.alarm(0)
1140 return result
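
Note on the alarm()-based timeout wrapper: a minimal Python 3 sketch of the same pattern, assuming a Unix system and a call made from the main thread (both are required for SIGALRM handlers). One ordering detail matters in this pattern: cancel the pending alarm before restoring the previous handler, so a late SIGALRM cannot be delivered to the restored handler. The sleep-based usage is a hypothetical example.

```python
import signal
import time


class TimeoutFunctionException(Exception):
    """Raised when the wrapped call exceeds its time budget."""


class TimeoutFunction:
    def __init__(self, function, timeout):
        self.function = function
        self.timeout = timeout  # whole seconds, as required by signal.alarm()

    def handle_timeout(self, signum, frame):
        raise TimeoutFunctionException()

    def __call__(self, *args, **kwargs):
        old = signal.signal(signal.SIGALRM, self.handle_timeout)
        signal.alarm(self.timeout)
        try:
            return self.function(*args, **kwargs)
        finally:
            # Cancel the pending alarm first, then restore the old handler.
            signal.alarm(0)
            signal.signal(signal.SIGALRM, old)


# Hypothetical usage: a 5 second sleep limited to 1 second.
slow = TimeoutFunction(time.sleep, 1)
try:
    slow(5)
    timed_out = False
except TimeoutFunctionException:
    timed_out = True
```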
1141
1142def main():
1143 print "hi"
1144 unittest.main()
