Merge lp:~devcamcar/nova/improve_smoketests into lp:~hudson-openstack/nova/trunk

Proposed by Devin Carlen
Status: Merged
Approved by: Soren Hansen
Approved revision: 397
Merged at revision: 456
Proposed branch: lp:~devcamcar/nova/improve_smoketests
Merge into: lp:~hudson-openstack/nova/trunk
Diff against target: 1321 lines (+575/-706)
6 files modified
smoketests/admin_smoketests.py (+92/-0)
smoketests/base.py (+154/-0)
smoketests/flags.py (+3/-10)
smoketests/novatestcase.py (+0/-130)
smoketests/smoketest.py (+0/-566)
smoketests/user_smoketests.py (+326/-0)
To merge this branch: bzr merge lp:~devcamcar/nova/improve_smoketests
Reviewer Review Type Date Requested Status
Soren Hansen (community) Approve
Vish Ishaya (community) Approve
Jay Pipes (community) Needs Information
Review via email: mp+40155@code.launchpad.net

Commit message

Refactored smoketests to use novarc environment and to separate user and admin specific tests.

Description of the change

Refactored smoketests to use novarc environment and to separate user and admin specific tests.

To post a comment you must log in.
Revision history for this message
Devin Carlen (devcamcar) wrote :

This branch addresses several underlying problems with the smoke tests. Most importantly, the tests had bit rotted. The tests were all grouped together and all relied on having nova admin credentials to run all of the tests, which is not ideal. Now the tests are split into admin and user tests.

The tests require you to source the appropriate novarc file prior to running them. For admin tests, you will need to source the admin user's credentials.

The tests also no longer rely on complicated network configurations. The network is left up to the user. If you need to be connected to a VPN to communicate with nova, then you need to do so for these tests as well.

This allows more specific and fine grained testing to be done, and will allow me to add some tests soon for launching VPN instances, connecting via OpenVPN, and doing deeper testing that way as well.

Revision history for this message
Jay Pipes (jaypipes) wrote :

I don't see the novarc files. Did you bzr add them?

review: Needs Information
Revision history for this message
Devin Carlen (devcamcar) wrote :

No novarc files are included. I may have explained it a bit confusingly, but basically you are supposed to source whatever novarc file you want to use to run the tests. The user tests can be run as whomever you want. The admin tests need to be run using admin's novarc.

Revision history for this message
Vish Ishaya (vishvananda) wrote :

lgtm

review: Approve
Revision history for this message
Soren Hansen (soren) wrote :

Yay! Now we just need a way to run these from Hudson.

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== added file 'smoketests/admin_smoketests.py'
--- smoketests/admin_smoketests.py 1970-01-01 00:00:00 +0000
+++ smoketests/admin_smoketests.py 2010-11-04 22:56:08 +0000
@@ -0,0 +1,92 @@
1# vim: tabstop=4 shiftwidth=4 softtabstop=4
2
3# Copyright 2010 United States Government as represented by the
4# Administrator of the National Aeronautics and Space Administration.
5# All Rights Reserved.
6#
7# Licensed under the Apache License, Version 2.0 (the "License"); you may
8# not use this file except in compliance with the License. You may obtain
9# a copy of the License at
10#
11# http://www.apache.org/licenses/LICENSE-2.0
12#
13# Unless required by applicable law or agreed to in writing, software
14# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16# License for the specific language governing permissions and limitations
17# under the License.
18
19import os
20import random
21import sys
22import time
23import unittest
24import zipfile
25
26from nova import adminclient
27from smoketests import flags
28from smoketests import base
29
30
31SUITE_NAMES = '[user]'
32
33FLAGS = flags.FLAGS
34flags.DEFINE_string('suite', None, 'Specific test suite to run ' + SUITE_NAMES)
35
36# TODO(devamcar): Use random tempfile
37ZIP_FILENAME = '/tmp/nova-me-x509.zip'
38
39TEST_PREFIX = 'test%s' % int(random.random()*1000000)
40TEST_USERNAME = '%suser' % TEST_PREFIX
41TEST_PROJECTNAME = '%sproject' % TEST_PREFIX
42
43
44class AdminSmokeTestCase(base.SmokeTestCase):
45 def setUp(self):
46 self.admin = adminclient.NovaAdminClient(
47 access_key=os.getenv('EC2_ACCESS_KEY'),
48 secret_key=os.getenv('EC2_SECRET_KEY'),
49 clc_url=os.getenv('EC2_URL'),
50 region=FLAGS.region)
51
52
53class UserTests(AdminSmokeTestCase):
54 """ Test admin credentials and user creation. """
55
56 def test_001_admin_can_connect(self):
57 conn = self.admin.connection_for('admin', 'admin')
58 self.assert_(conn)
59
60 def test_002_admin_can_create_user(self):
61 user = self.admin.create_user(TEST_USERNAME)
62 self.assertEqual(user.username, TEST_USERNAME)
63
64 def test_003_admin_can_create_project(self):
65 project = self.admin.create_project(TEST_PROJECTNAME,
66 TEST_USERNAME)
67 self.assertEqual(project.projectname, TEST_PROJECTNAME)
68
69 def test_004_user_can_download_credentials(self):
70 buf = self.admin.get_zip(TEST_USERNAME, TEST_PROJECTNAME)
71 output = open(ZIP_FILENAME, 'w')
72 output.write(buf)
73 output.close()
74
75 zip = zipfile.ZipFile(ZIP_FILENAME, 'a', zipfile.ZIP_DEFLATED)
76 bad = zip.testzip()
77 zip.close()
78
79 self.failIf(bad)
80
81 def test_999_tearDown(self):
82 self.admin.delete_project(TEST_PROJECTNAME)
83 self.admin.delete_user(TEST_USERNAME)
84 try:
85 os.remove(ZIP_FILENAME)
86 except:
87 pass
88
89if __name__ == "__main__":
90 suites = {'user': unittest.makeSuite(UserTests)}
91 sys.exit(base.run_tests(suites))
92
093
=== added file 'smoketests/base.py'
--- smoketests/base.py 1970-01-01 00:00:00 +0000
+++ smoketests/base.py 2010-11-04 22:56:08 +0000
@@ -0,0 +1,154 @@
1# vim: tabstop=4 shiftwidth=4 softtabstop=4
2
3# Copyright 2010 United States Government as represented by the
4# Administrator of the National Aeronautics and Space Administration.
5# All Rights Reserved.
6#
7# Licensed under the Apache License, Version 2.0 (the "License"); you may
8# not use this file except in compliance with the License. You may obtain
9# a copy of the License at
10#
11# http://www.apache.org/licenses/LICENSE-2.0
12#
13# Unless required by applicable law or agreed to in writing, software
14# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16# License for the specific language governing permissions and limitations
17# under the License.
18
19import boto
20import commands
21import httplib
22import os
23import paramiko
24import random
25import sys
26import unittest
27from boto.ec2.regioninfo import RegionInfo
28
29from smoketests import flags
30
31FLAGS = flags.FLAGS
32
33
34class SmokeTestCase(unittest.TestCase):
35 def connect_ssh(self, ip, key_name):
36 # TODO(devcamcar): set a more reasonable connection timeout time
37 key = paramiko.RSAKey.from_private_key_file('/tmp/%s.pem' % key_name)
38 client = paramiko.SSHClient()
39 client.set_missing_host_key_policy(paramiko.WarningPolicy())
40 client.connect(ip, username='root', pkey=key)
41 stdin, stdout, stderr = client.exec_command('uptime')
42 print 'uptime: ', stdout.read()
43 return client
44
45 def can_ping(self, ip):
46 """ Attempt to ping the specified IP, and give up after 1 second. """
47
48 # NOTE(devcamcar): ping timeout flag is different in OSX.
49 if sys.platform == 'darwin':
50 timeout_flag = 't'
51 else:
52 timeout_flag = 'w'
53
54 status, output = commands.getstatusoutput('ping -c1 -%s1 %s' %
55 (timeout_flag, ip))
56 return status == 0
57
58 def connection_for_env(self, **kwargs):
59 """
60 Returns a boto ec2 connection for the current environment.
61 """
62 access_key = os.getenv('EC2_ACCESS_KEY')
63 secret_key = os.getenv('EC2_SECRET_KEY')
64 clc_url = os.getenv('EC2_URL')
65
66 if not access_key or not secret_key or not clc_url:
67 raise Exception('Missing EC2 environment variables. Please source '
68 'the appropriate novarc file before running this '
69 'test.')
70
71 parts = self.split_clc_url(clc_url)
72 return boto.connect_ec2(aws_access_key_id=access_key,
73 aws_secret_access_key=secret_key,
74 is_secure=parts['is_secure'],
75 region=RegionInfo(None,
76 'nova',
77 parts['ip']),
78 port=parts['port'],
79 path='/services/Cloud',
80 **kwargs)
81
82 def split_clc_url(self, clc_url):
83 """
84 Splits a cloud controller endpoint url.
85 """
86 parts = httplib.urlsplit(clc_url)
87 is_secure = parts.scheme == 'https'
88 ip, port = parts.netloc.split(':')
89 return {'ip': ip, 'port': int(port), 'is_secure': is_secure}
90
91 def create_key_pair(self, conn, key_name):
92 try:
93 os.remove('/tmp/%s.pem' % key_name)
94 except:
95 pass
96 key = conn.create_key_pair(key_name)
97 key.save('/tmp/')
98 return key
99
100 def delete_key_pair(self, conn, key_name):
101 conn.delete_key_pair(key_name)
102 try:
103 os.remove('/tmp/%s.pem' % key_name)
104 except:
105 pass
106
107 def bundle_image(self, image, kernel=False):
108 cmd = 'euca-bundle-image -i %s' % image
109 if kernel:
110 cmd += ' --kernel true'
111 status, output = commands.getstatusoutput(cmd)
112 if status != 0:
113 print '%s -> \n %s' % (cmd, output)
114 raise Exception(output)
115 return True
116
117 def upload_image(self, bucket_name, image):
118 cmd = 'euca-upload-bundle -b %s -m /tmp/%s.manifest.xml' % (bucket_name, image)
119 status, output = commands.getstatusoutput(cmd)
120 if status != 0:
121 print '%s -> \n %s' % (cmd, output)
122 raise Exception(output)
123 return True
124
125 def delete_bundle_bucket(self, bucket_name):
126 cmd = 'euca-delete-bundle --clear -b %s' % (bucket_name)
127 status, output = commands.getstatusoutput(cmd)
128 if status != 0:
129 print '%s -> \n%s' % (cmd, output)
130 raise Exception(output)
131 return True
132
133def run_tests(suites):
134 argv = FLAGS(sys.argv)
135
136 if not os.getenv('EC2_ACCESS_KEY'):
137 print >> sys.stderr, 'Missing EC2 environment variables. Please ' \
138 'source the appropriate novarc file before ' \
139 'running this test.'
140 return 1
141
142 if FLAGS.suite:
143 try:
144 suite = suites[FLAGS.suite]
145 except KeyError:
146 print >> sys.stderr, 'Available test suites:', \
147 ', '.join(suites.keys())
148 return 1
149
150 unittest.TextTestRunner(verbosity=2).run(suite)
151 else:
152 for suite in suites.itervalues():
153 unittest.TextTestRunner(verbosity=2).run(suite)
154
0155
=== modified file 'smoketests/flags.py'
--- smoketests/flags.py 2010-07-15 16:07:40 +0000
+++ smoketests/flags.py 2010-11-04 22:56:08 +0000
@@ -1,7 +1,7 @@
1# vim: tabstop=4 shiftwidth=4 softtabstop=41# vim: tabstop=4 shiftwidth=4 softtabstop=4
22
3# Copyright 2010 United States Government as represented by the3# Copyright 2010 United States Government as represented by the
4# Administrator of the National Aeronautics and Space Administration. 4# Administrator of the National Aeronautics and Space Administration.
5# All Rights Reserved.5# All Rights Reserved.
6#6#
7# Licensed under the Apache License, Version 2.0 (the "License"); you may7# Licensed under the Apache License, Version 2.0 (the "License"); you may
@@ -33,13 +33,6 @@
33# __GLOBAL FLAGS ONLY__33# __GLOBAL FLAGS ONLY__
34# Define any app-specific flags in their own files, docs at:34# Define any app-specific flags in their own files, docs at:
35# http://code.google.com/p/python-gflags/source/browse/trunk/gflags.py#3935# http://code.google.com/p/python-gflags/source/browse/trunk/gflags.py#39
36DEFINE_string('admin_access_key', 'admin', 'Access key for admin user')36DEFINE_string('region', 'nova', 'Region to use')
37DEFINE_string('admin_secret_key', 'admin', 'Secret key for admin user')37DEFINE_string('test_image', 'ami-tiny', 'Image to use for launch tests')
38DEFINE_string('clc_ip', '127.0.0.1', 'IP of cloud controller API')
39DEFINE_string('bundle_kernel', 'openwrt-x86-vmlinuz',
40 'Local kernel file to use for bundling tests')
41DEFINE_string('bundle_image', 'openwrt-x86-ext2.image',
42 'Local image file to use for bundling tests')
43#DEFINE_string('vpn_image_id', 'ami-CLOUDPIPE',
44# 'AMI for cloudpipe vpn server')
4538
4639
=== removed file 'smoketests/novatestcase.py'
--- smoketests/novatestcase.py 2010-07-15 16:07:40 +0000
+++ smoketests/novatestcase.py 1970-01-01 00:00:00 +0000
@@ -1,130 +0,0 @@
1# vim: tabstop=4 shiftwidth=4 softtabstop=4
2
3# Copyright 2010 United States Government as represented by the
4# Administrator of the National Aeronautics and Space Administration.
5# All Rights Reserved.
6#
7# Licensed under the Apache License, Version 2.0 (the "License"); you may
8# not use this file except in compliance with the License. You may obtain
9# a copy of the License at
10#
11# http://www.apache.org/licenses/LICENSE-2.0
12#
13# Unless required by applicable law or agreed to in writing, software
14# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16# License for the specific language governing permissions and limitations
17# under the License.
18
19import commands
20import os
21import random
22import sys
23import unittest
24
25
26import paramiko
27
28from nova import adminclient
29from smoketests import flags
30
31FLAGS = flags.FLAGS
32
33
34class NovaTestCase(unittest.TestCase):
35 def setUp(self):
36 self.nova_admin = adminclient.NovaAdminClient(
37 access_key=FLAGS.admin_access_key,
38 secret_key=FLAGS.admin_secret_key,
39 clc_ip=FLAGS.clc_ip)
40
41 def tearDown(self):
42 pass
43
44 def connect_ssh(self, ip, key_name):
45 # TODO(devcamcar): set a more reasonable connection timeout time
46 key = paramiko.RSAKey.from_private_key_file('/tmp/%s.pem' % key_name)
47 client = paramiko.SSHClient()
48 client.load_system_host_keys()
49 client.set_missing_host_key_policy(paramiko.WarningPolicy())
50 client.connect(ip, username='root', pkey=key)
51 stdin, stdout, stderr = client.exec_command('uptime')
52 print 'uptime: ', stdout.read()
53 return client
54
55 def can_ping(self, ip):
56 return commands.getstatusoutput('ping -c 1 %s' % ip)[0] == 0
57
58 @property
59 def admin(self):
60 return self.nova_admin.connection_for('admin')
61
62 def connection_for(self, username):
63 return self.nova_admin.connection_for(username)
64
65 def create_user(self, username):
66 return self.nova_admin.create_user(username)
67
68 def get_user(self, username):
69 return self.nova_admin.get_user(username)
70
71 def delete_user(self, username):
72 return self.nova_admin.delete_user(username)
73
74 def get_signed_zip(self, username):
75 return self.nova_admin.get_zip(username)
76
77 def create_key_pair(self, conn, key_name):
78 try:
79 os.remove('/tmp/%s.pem' % key_name)
80 except:
81 pass
82 key = conn.create_key_pair(key_name)
83 key.save('/tmp/')
84 return key
85
86 def delete_key_pair(self, conn, key_name):
87 conn.delete_key_pair(key_name)
88 try:
89 os.remove('/tmp/%s.pem' % key_name)
90 except:
91 pass
92
93 def bundle_image(self, image, kernel=False):
94 cmd = 'euca-bundle-image -i %s' % image
95 if kernel:
96 cmd += ' --kernel true'
97 status, output = commands.getstatusoutput(cmd)
98 if status != 0:
99 print '%s -> \n %s' % (cmd, output)
100 raise Exception(output)
101 return True
102
103 def upload_image(self, bucket_name, image):
104 cmd = 'euca-upload-bundle -b %s -m /tmp/%s.manifest.xml' % (bucket_name, image)
105 status, output = commands.getstatusoutput(cmd)
106 if status != 0:
107 print '%s -> \n %s' % (cmd, output)
108 raise Exception(output)
109 return True
110
111 def delete_bundle_bucket(self, bucket_name):
112 cmd = 'euca-delete-bundle --clear -b %s' % (bucket_name)
113 status, output = commands.getstatusoutput(cmd)
114 if status != 0:
115 print '%s -> \n%s' % (cmd, output)
116 raise Exception(output)
117 return True
118
119 def register_image(self, bucket_name, manifest):
120 conn = nova_admin.connection_for('admin')
121 return conn.register_image("%s/%s.manifest.xml" % (bucket_name, manifest))
122
123 def setUp_test_image(self, image, kernel=False):
124 self.bundle_image(image, kernel=kernel)
125 bucket = "auto_test_%s" % int(random.random() * 1000000)
126 self.upload_image(bucket, image)
127 return self.register_image(bucket, image)
128
129 def tearDown_test_image(self, conn, image_id):
130 conn.deregister_image(image_id)
1310
=== removed file 'smoketests/smoketest.py'
--- smoketests/smoketest.py 2010-07-15 16:07:40 +0000
+++ smoketests/smoketest.py 1970-01-01 00:00:00 +0000
@@ -1,566 +0,0 @@
1# vim: tabstop=4 shiftwidth=4 softtabstop=4
2
3# Copyright 2010 United States Government as represented by the
4# Administrator of the National Aeronautics and Space Administration.
5# All Rights Reserved.
6#
7# Licensed under the Apache License, Version 2.0 (the "License"); you may
8# not use this file except in compliance with the License. You may obtain
9# a copy of the License at
10#
11# http://www.apache.org/licenses/LICENSE-2.0
12#
13# Unless required by applicable law or agreed to in writing, software
14# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16# License for the specific language governing permissions and limitations
17# under the License.
18
19import commands
20import os
21import random
22import re
23import sys
24import time
25import unittest
26import zipfile
27
28
29import paramiko
30
31from smoketests import flags
32from smoketests import novatestcase
33
34SUITE_NAMES = '[user, image, security, public_network, volume]'
35
36FLAGS = flags.FLAGS
37flags.DEFINE_string('suite', None, 'Specific test suite to run ' + SUITE_NAMES)
38
39# TODO(devamcar): Use random tempfile
40ZIP_FILENAME = '/tmp/nova-me-x509.zip'
41
42data = {}
43
44test_prefix = 'test%s' % int(random.random()*1000000)
45test_username = '%suser' % test_prefix
46test_bucket = '%s_bucket' % test_prefix
47test_key = '%s_key' % test_prefix
48
49# Test admin credentials and user creation
50class UserTests(novatestcase.NovaTestCase):
51 def test_001_admin_can_connect(self):
52 conn = self.connection_for('admin')
53 self.assert_(conn)
54
55 def test_002_admin_can_create_user(self):
56 userinfo = self.create_user(test_username)
57 self.assertEqual(userinfo.username, test_username)
58
59 def test_003_user_can_download_credentials(self):
60 buf = self.get_signed_zip(test_username)
61 output = open(ZIP_FILENAME, 'w')
62 output.write(buf)
63 output.close()
64
65 zip = zipfile.ZipFile(ZIP_FILENAME, 'a', zipfile.ZIP_DEFLATED)
66 bad = zip.testzip()
67 zip.close()
68
69 self.failIf(bad)
70
71 def test_999_tearDown(self):
72 self.delete_user(test_username)
73 user = self.get_user(test_username)
74 self.assert_(user is None)
75 try:
76 os.remove(ZIP_FILENAME)
77 except:
78 pass
79
80# Test image bundling, registration, and launching
81class ImageTests(novatestcase.NovaTestCase):
82 def test_000_setUp(self):
83 self.create_user(test_username)
84
85 def test_001_admin_can_bundle_image(self):
86 self.assertTrue(self.bundle_image(FLAGS.bundle_image))
87
88 def test_002_admin_can_upload_image(self):
89 self.assertTrue(self.upload_image(test_bucket, FLAGS.bundle_image))
90
91 def test_003_admin_can_register_image(self):
92 image_id = self.register_image(test_bucket, FLAGS.bundle_image)
93 self.assert_(image_id is not None)
94 data['image_id'] = image_id
95
96 def test_004_admin_can_bundle_kernel(self):
97 self.assertTrue(self.bundle_image(FLAGS.bundle_kernel, kernel=True))
98
99 def test_005_admin_can_upload_kernel(self):
100 self.assertTrue(self.upload_image(test_bucket, FLAGS.bundle_kernel))
101
102 def test_006_admin_can_register_kernel(self):
103 # FIXME(devcamcar): registration should verify that bucket/manifest
104 # exists before returning successfully.
105 kernel_id = self.register_image(test_bucket, FLAGS.bundle_kernel)
106 self.assert_(kernel_id is not None)
107 data['kernel_id'] = kernel_id
108
109 def test_007_admin_images_are_available_within_10_seconds(self):
110 for i in xrange(10):
111 image = self.admin.get_image(data['image_id'])
112 if image and image.state == 'available':
113 break
114 time.sleep(1)
115 else:
116 print image.state
117 self.assert_(False) # wasn't available within 10 seconds
118 self.assert_(image.type == 'machine')
119
120 for i in xrange(10):
121 kernel = self.admin.get_image(data['kernel_id'])
122 if kernel and kernel.state == 'available':
123 break
124 time.sleep(1)
125 else:
126 self.assert_(False) # wasn't available within 10 seconds
127 self.assert_(kernel.type == 'kernel')
128
129 def test_008_admin_can_describe_image_attribute(self):
130 attrs = self.admin.get_image_attribute(data['image_id'],
131 'launchPermission')
132 self.assert_(attrs.name, 'launch_permission')
133
134 def test_009_me_cannot_see_non_public_images(self):
135 conn = self.connection_for(test_username)
136 images = conn.get_all_images(image_ids=[data['image_id']])
137 self.assertEqual(len(images), 0)
138
139 def test_010_admin_can_modify_image_launch_permission(self):
140 conn = self.connection_for(test_username)
141
142 self.admin.modify_image_attribute(image_id=data['image_id'],
143 operation='add',
144 attribute='launchPermission',
145 groups='all')
146
147 image = conn.get_image(data['image_id'])
148 self.assertEqual(image.id, data['image_id'])
149
150 def test_011_me_can_list_public_images(self):
151 conn = self.connection_for(test_username)
152 images = conn.get_all_images(image_ids=[data['image_id']])
153 self.assertEqual(len(images), 1)
154 pass
155
156 def test_012_me_can_see_launch_permission(self):
157 attrs = self.admin.get_image_attribute(data['image_id'],
158 'launchPermission')
159 self.assert_(attrs.name, 'launch_permission')
160 self.assert_(attrs.groups[0], 'all')
161
162 # FIXME: add tests that user can launch image
163
164# def test_013_user_can_launch_admin_public_image(self):
165# # TODO: Use openwrt kernel instead of default kernel
166# conn = self.connection_for(test_username)
167# reservation = conn.run_instances(data['image_id'])
168# self.assertEqual(len(reservation.instances), 1)
169# data['my_instance_id'] = reservation.instances[0].id
170
171# def test_014_instances_launch_within_30_seconds(self):
172# pass
173
174# def test_015_user_can_terminate(self):
175# conn = self.connection_for(test_username)
176# terminated = conn.terminate_instances(
177# instance_ids=[data['my_instance_id']])
178# self.assertEqual(len(terminated), 1)
179
180 def test_016_admin_can_deregister_kernel(self):
181 self.assertTrue(self.admin.deregister_image(data['kernel_id']))
182
183 def test_017_admin_can_deregister_image(self):
184 self.assertTrue(self.admin.deregister_image(data['image_id']))
185
186 def test_018_admin_can_delete_bundle(self):
187 self.assertTrue(self.delete_bundle_bucket(test_bucket))
188
189 def test_999_tearDown(self):
190 data = {}
191 self.delete_user(test_username)
192
193
194# Test key pairs and security groups
195class SecurityTests(novatestcase.NovaTestCase):
196 def test_000_setUp(self):
197 self.create_user(test_username + '_me')
198 self.create_user(test_username + '_you')
199 data['image_id'] = 'ami-tiny'
200
201 def test_001_me_can_create_keypair(self):
202 conn = self.connection_for(test_username + '_me')
203 key = self.create_key_pair(conn, test_key)
204 self.assertEqual(key.name, test_key)
205
206 def test_002_you_can_create_keypair(self):
207 conn = self.connection_for(test_username + '_you')
208 key = self.create_key_pair(conn, test_key+ 'yourkey')
209 self.assertEqual(key.name, test_key+'yourkey')
210
211 def test_003_me_can_create_instance_with_keypair(self):
212 conn = self.connection_for(test_username + '_me')
213 reservation = conn.run_instances(data['image_id'], key_name=test_key)
214 self.assertEqual(len(reservation.instances), 1)
215 data['my_instance_id'] = reservation.instances[0].id
216
217 def test_004_me_can_obtain_private_ip_within_60_seconds(self):
218 conn = self.connection_for(test_username + '_me')
219 reservations = conn.get_all_instances([data['my_instance_id']])
220 instance = reservations[0].instances[0]
221 # allow 60 seconds to exit pending with IP
222 for x in xrange(60):
223 instance.update()
224 if instance.state != u'pending':
225 break
226 time.sleep(1)
227 else:
228 self.assert_(False)
229 # self.assertEqual(instance.state, u'running')
230 ip = reservations[0].instances[0].private_dns_name
231 self.failIf(ip == '0.0.0.0')
232 data['my_private_ip'] = ip
233 print data['my_private_ip'],
234
235 def test_005_can_ping_private_ip(self):
236 for x in xrange(120):
237 # ping waits for 1 second
238 status, output = commands.getstatusoutput(
239 'ping -c1 -w1 %s' % data['my_private_ip'])
240 if status == 0:
241 break
242 else:
243 self.assert_('could not ping instance')
244 #def test_005_me_cannot_ssh_when_unauthorized(self):
245 # self.assertRaises(paramiko.SSHException, self.connect_ssh,
246 # data['my_private_ip'], 'mykey')
247
248 #def test_006_me_can_authorize_ssh(self):
249 # conn = self.connection_for(test_username + '_me')
250 # self.assertTrue(
251 # conn.authorize_security_group(
252 # 'default',
253 # ip_protocol='tcp',
254 # from_port=22,
255 # to_port=22,
256 # cidr_ip='0.0.0.0/0'
257 # )
258 # )
259
260 def test_007_me_can_ssh_when_authorized(self):
261 conn = self.connect_ssh(data['my_private_ip'], test_key)
262 conn.close()
263
264 #def test_008_me_can_revoke_ssh_authorization(self):
265 # conn = self.connection_for('me')
266 # self.assertTrue(
267 # conn.revoke_security_group(
268 # 'default',
269 # ip_protocol='tcp',
270 # from_port=22,
271 # to_port=22,
272 # cidr_ip='0.0.0.0/0'
273 # )
274 # )
275
276 #def test_009_you_cannot_ping_my_instance(self):
277 # TODO: should ping my_private_ip from with an instance started by you.
278 #self.assertFalse(self.can_ping(data['my_private_ip']))
279
280 def test_010_you_cannot_ssh_to_my_instance(self):
281 try:
282 conn = self.connect_ssh(data['my_private_ip'],
283 test_key + 'yourkey')
284 conn.close()
285 except paramiko.SSHException:
286 pass
287 else:
288 self.fail("expected SSHException")
289
290 def test_999_tearDown(self):
291 conn = self.connection_for(test_username + '_me')
292 self.delete_key_pair(conn, test_key)
293 if data.has_key('my_instance_id'):
294 conn.terminate_instances([data['my_instance_id']])
295
296 conn = self.connection_for(test_username + '_you')
297 self.delete_key_pair(conn, test_key + 'yourkey')
298
299 conn = self.connection_for('admin')
300 self.delete_user(test_username + '_me')
301 self.delete_user(test_username + '_you')
302 #self.tearDown_test_image(conn, data['image_id'])
303
304# TODO: verify wrt image boots
305# build python into wrt image
306# build boto/m2crypto into wrt image
307# build euca2ools into wrt image
308# build a script to download and unpack credentials
309# - return "ok" to stdout for comparison in self.assertEqual()
310# build a script to bundle the instance
311# build a script to upload the bundle
312
313# status, output = commands.getstatusoutput('cmd')
314# if status == 0:
315# print 'ok'
316# else:
317# print output
318
319# Testing rebundling
320class RebundlingTests(novatestcase.NovaTestCase):
321 def test_000_setUp(self):
322 self.create_user('me')
323 self.create_user('you')
324 # TODO: create keypair for me
325 # upload smoketest img
326 # run instance
327
328 def test_001_me_can_download_credentials_within_instance(self):
329 conn = self.connect_ssh(data['my_private_ip'], 'mykey')
330 stdin, stdout = conn.exec_command(
331 'python ~/smoketests/install-credentials.py')
332 conn.close()
333 self.assertEqual(stdout, 'ok')
334
335 def test_002_me_can_rebundle_within_instance(self):
336 conn = self.connect_ssh(data['my_private_ip'], 'mykey')
337 stdin, stdout = conn.exec_command(
338 'python ~/smoketests/rebundle-instance.py')
339 conn.close()
340 self.assertEqual(stdout, 'ok')
341
342 def test_003_me_can_upload_image_within_instance(self):
343 conn = self.connect_ssh(data['my_private_ip'], 'mykey')
344 stdin, stdout = conn.exec_command(
345 'python ~/smoketests/upload-bundle.py')
346 conn.close()
347 self.assertEqual(stdout, 'ok')
348
349 def test_004_me_can_register_image_within_instance(self):
350 conn = self.connect_ssh(data['my_private_ip'], 'mykey')
351 stdin, stdout = conn.exec_command(
352 'python ~/smoketests/register-image.py')
353 conn.close()
354 if re.matches('ami-{\w+}', stdout):
355 data['my_image_id'] = stdout.strip()
356 else:
357 self.fail('expected ami-nnnnnn, got:\n ' + stdout)
358
359 def test_005_you_cannot_see_my_private_image(self):
360 conn = self.connection_for('you')
361 image = conn.get_image(data['my_image_id'])
362 self.assertEqual(image, None)
363
364 def test_006_me_can_make_image_public(self):
365 conn = self.connection_for(test_username)
366 conn.modify_image_attribute(image_id=data['my_image_id'],
367 operation='add',
368 attribute='launchPermission',
369 groups='all')
370
371 def test_007_you_can_see_my_public_image(self):
372 conn = self.connection_for('you')
373 image = conn.get_image(data['my_image_id'])
374 self.assertEqual(image.id, data['my_image_id'])
375
376 def test_999_tearDown(self):
377 self.delete_user('me')
378 self.delete_user('you')
379
380 #if data.has_key('image_id'):
381 # deregister rebundled image
382
383 # TODO: tear down instance
384 # delete keypairs
385 data = {}
386
387# Test elastic IPs
388class ElasticIPTests(novatestcase.NovaTestCase):
389 def test_000_setUp(self):
390 data['image_id'] = 'ami-tiny'
391
392 self.create_user('me')
393 conn = self.connection_for('me')
394 self.create_key_pair(conn, 'mykey')
395
396 conn = self.connection_for('admin')
397 #data['image_id'] = self.setUp_test_image(FLAGS.bundle_image)
398
399 def test_001_me_can_launch_image_with_keypair(self):
400 conn = self.connection_for('me')
401 reservation = conn.run_instances(data['image_id'], key_name='mykey')
402 self.assertEqual(len(reservation.instances), 1)
403 data['my_instance_id'] = reservation.instances[0].id
404
405 def test_002_me_can_allocate_elastic_ip(self):
406 conn = self.connection_for('me')
407 data['my_public_ip'] = conn.allocate_address()
408 self.assert_(data['my_public_ip'].public_ip)
409
410 def test_003_me_can_associate_ip_with_instance(self):
411 self.assertTrue(data['my_public_ip'].associate(data['my_instance_id']))
412
413 def test_004_me_can_ssh_with_public_ip(self):
414 conn = self.connect_ssh(data['my_public_ip'].public_ip, 'mykey')
415 conn.close()
416
417 def test_005_me_can_disassociate_ip_from_instance(self):
418 self.assertTrue(data['my_public_ip'].disassociate())
419
420 def test_006_me_can_deallocate_elastic_ip(self):
421 self.assertTrue(data['my_public_ip'].delete())
422
423 def test_999_tearDown(self):
424 conn = self.connection_for('me')
425 self.delete_key_pair(conn, 'mykey')
426
427 conn = self.connection_for('admin')
428 #self.tearDown_test_image(conn, data['image_id'])
429 data = {}
430
431ZONE = 'nova'
432DEVICE = 'vdb'
433# Test iscsi volumes
434class VolumeTests(novatestcase.NovaTestCase):
435 def test_000_setUp(self):
436 self.create_user(test_username)
437 data['image_id'] = 'ami-tiny' # A7370FE3
438
439 conn = self.connection_for(test_username)
440 self.create_key_pair(conn, test_key)
441 reservation = conn.run_instances(data['image_id'],
442 instance_type='m1.tiny',
443 key_name=test_key)
444 data['instance_id'] = reservation.instances[0].id
445 data['private_ip'] = reservation.instances[0].private_dns_name
446 # wait for instance to show up
447 for x in xrange(120):
448 # ping waits for 1 second
449 status, output = commands.getstatusoutput(
450 'ping -c1 -w1 %s' % data['private_ip'])
451 if status == 0:
452 break
453 else:
454 self.fail('unable to ping instance')
455
456 def test_001_me_can_create_volume(self):
457 conn = self.connection_for(test_username)
458 volume = conn.create_volume(1, ZONE)
459 self.assertEqual(volume.size, 1)
460 data['volume_id'] = volume.id
461 # give network time to find volume
462 time.sleep(5)
463
464 def test_002_me_can_attach_volume(self):
465 conn = self.connection_for(test_username)
466 conn.attach_volume(
467 volume_id = data['volume_id'],
468 instance_id = data['instance_id'],
469 device = '/dev/%s' % DEVICE
470 )
471 # give instance time to recognize volume
472 time.sleep(5)
473
474 def test_003_me_can_mount_volume(self):
475 conn = self.connect_ssh(data['private_ip'], test_key)
476 # FIXME(devcamcar): the tiny image doesn't create the node properly
477 # this will make /dev/vd* if it doesn't exist
478 stdin, stdout, stderr = conn.exec_command(
479 'grep %s /proc/partitions |' + \
480 '`awk \'{print "mknod /dev/"$4" b "$1" "$2}\'`' % DEVICE)
481 commands = []
482 commands.append('mkdir -p /mnt/vol')
483 commands.append('mkfs.ext2 /dev/%s' % DEVICE)
484 commands.append('mount /dev/%s /mnt/vol' % DEVICE)
485 commands.append('echo success')
486 stdin, stdout, stderr = conn.exec_command(' && '.join(commands))
487 out = stdout.read()
488 conn.close()
489 if not out.strip().endswith('success'):
490 self.fail('Unable to mount: %s %s' % (out, stderr.read()))
491
492 def test_004_me_can_write_to_volume(self):
493 conn = self.connect_ssh(data['private_ip'], test_key)
494 # FIXME(devcamcar): This doesn't fail if the volume hasn't been mounted
495 stdin, stdout, stderr = conn.exec_command(
496 'echo hello > /mnt/vol/test.txt')
497 err = stderr.read()
498 conn.close()
499 if len(err) > 0:
500 self.fail('Unable to write to mount: %s' % (err))
501
502 def test_005_volume_is_correct_size(self):
503 conn = self.connect_ssh(data['private_ip'], test_key)
504 stdin, stdout, stderr = conn.exec_command(
505 "df -h | grep %s | awk {'print $2'}" % DEVICE)
506 out = stdout.read()
507 conn.close()
508 if not out.strip() == '1007.9M':
509 self.fail('Volume is not the right size: %s %s' % (out, stderr.read()))
510
511 def test_006_me_can_umount_volume(self):
512 conn = self.connect_ssh(data['private_ip'], test_key)
513 stdin, stdout, stderr = conn.exec_command('umount /mnt/vol')
514 err = stderr.read()
515 conn.close()
516 if len(err) > 0:
517 self.fail('Unable to unmount: %s' % (err))
518
519 def test_007_me_can_detach_volume(self):
520 conn = self.connection_for(test_username)
521 self.assertTrue(conn.detach_volume(volume_id = data['volume_id']))
522
523 def test_008_me_can_delete_volume(self):
524 conn = self.connection_for(test_username)
525 self.assertTrue(conn.delete_volume(data['volume_id']))
526
527 def test_009_volume_size_must_be_int(self):
528 conn = self.connection_for(test_username)
529 self.assertRaises(Exception, conn.create_volume, 'foo', ZONE)
530
531 def test_999_tearDown(self):
532 global data
533 conn = self.connection_for(test_username)
534 self.delete_key_pair(conn, test_key)
535 if data.has_key('instance_id'):
536 conn.terminate_instances([data['instance_id']])
537 self.delete_user(test_username)
538 data = {}
539
540def build_suites():
541 return {
542 'user': unittest.makeSuite(UserTests),
543 'image': unittest.makeSuite(ImageTests),
544 'security': unittest.makeSuite(SecurityTests),
545 'public_network': unittest.makeSuite(ElasticIPTests),
546 'volume': unittest.makeSuite(VolumeTests),
547 }
548
549def main():
550 argv = FLAGS(sys.argv)
551 suites = build_suites()
552
553 if FLAGS.suite:
554 try:
555 suite = suites[FLAGS.suite]
556 except KeyError:
557 print >> sys.stderr, 'Available test suites:', SUITE_NAMES
558 return 1
559
560 unittest.TextTestRunner(verbosity=2).run(suite)
561 else:
562 for suite in suites.itervalues():
563 unittest.TextTestRunner(verbosity=2).run(suite)
564
565if __name__ == "__main__":
566 sys.exit(main())
5670
=== added file 'smoketests/user_smoketests.py'
--- smoketests/user_smoketests.py 1970-01-01 00:00:00 +0000
+++ smoketests/user_smoketests.py 2010-11-04 22:56:08 +0000
@@ -0,0 +1,326 @@
1# vim: tabstop=4 shiftwidth=4 softtabstop=4
2
3# Copyright 2010 United States Government as represented by the
4# Administrator of the National Aeronautics and Space Administration.
5# All Rights Reserved.
6#
7# Licensed under the Apache License, Version 2.0 (the "License"); you may
8# not use this file except in compliance with the License. You may obtain
9# a copy of the License at
10#
11# http://www.apache.org/licenses/LICENSE-2.0
12#
13# Unless required by applicable law or agreed to in writing, software
14# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16# License for the specific language governing permissions and limitations
17# under the License.
18
19import commands
20import os
21import random
22import socket
23import sys
24import time
25import unittest
26
27from smoketests import flags
28from smoketests import base
29
30
31SUITE_NAMES = '[image, instance, volume]'
32
33FLAGS = flags.FLAGS
34flags.DEFINE_string('suite', None, 'Specific test suite to run ' + SUITE_NAMES)
35flags.DEFINE_string('bundle_kernel', 'openwrt-x86-vmlinuz',
36 'Local kernel file to use for bundling tests')
37flags.DEFINE_string('bundle_image', 'openwrt-x86-ext2.image',
38 'Local image file to use for bundling tests')
39
40TEST_PREFIX = 'test%s' % int (random.random()*1000000)
41TEST_BUCKET = '%s_bucket' % TEST_PREFIX
42TEST_KEY = '%s_key' % TEST_PREFIX
43TEST_DATA = {}
44
45
46class UserSmokeTestCase(base.SmokeTestCase):
47 def setUp(self):
48 global TEST_DATA
49 self.conn = self.connection_for_env()
50 self.data = TEST_DATA
51
52
53class ImageTests(UserSmokeTestCase):
54 def test_001_can_bundle_image(self):
55 self.assertTrue(self.bundle_image(FLAGS.bundle_image))
56
57 def test_002_can_upload_image(self):
58 self.assertTrue(self.upload_image(TEST_BUCKET, FLAGS.bundle_image))
59
60 def test_003_can_register_image(self):
61 image_id = self.conn.register_image('%s/%s.manifest.xml' %
62 (TEST_BUCKET, FLAGS.bundle_image))
63 self.assert_(image_id is not None)
64 self.data['image_id'] = image_id
65
66 def test_004_can_bundle_kernel(self):
67 self.assertTrue(self.bundle_image(FLAGS.bundle_kernel, kernel=True))
68
69 def test_005_can_upload_kernel(self):
70 self.assertTrue(self.upload_image(TEST_BUCKET, FLAGS.bundle_kernel))
71
72 def test_006_can_register_kernel(self):
73 kernel_id = self.conn.register_image('%s/%s.manifest.xml' %
74 (TEST_BUCKET, FLAGS.bundle_kernel))
75 self.assert_(kernel_id is not None)
76 self.data['kernel_id'] = kernel_id
77
78 def test_007_images_are_available_within_10_seconds(self):
79 for i in xrange(10):
80 image = self.conn.get_image(self.data['image_id'])
81 if image and image.state == 'available':
82 break
83 time.sleep(1)
84 else:
85 print image.state
86 self.assert_(False) # wasn't available within 10 seconds
87 self.assert_(image.type == 'machine')
88
89 for i in xrange(10):
90 kernel = self.conn.get_image(self.data['kernel_id'])
91 if kernel and kernel.state == 'available':
92 break
93 time.sleep(1)
94 else:
95 self.assert_(False) # wasn't available within 10 seconds
96 self.assert_(kernel.type == 'kernel')
97
98 def test_008_can_describe_image_attribute(self):
99 attrs = self.conn.get_image_attribute(self.data['image_id'],
100 'launchPermission')
101 self.assert_(attrs.name, 'launch_permission')
102
103 def test_009_can_modify_image_launch_permission(self):
104 self.conn.modify_image_attribute(image_id=self.data['image_id'],
105 operation='add',
106 attribute='launchPermission',
107 groups='all')
108 image = self.conn.get_image(self.data['image_id'])
109 self.assertEqual(image.id, self.data['image_id'])
110
111 def test_010_can_see_launch_permission(self):
112 attrs = self.conn.get_image_attribute(self.data['image_id'],
113 'launchPermission')
114 self.assert_(attrs.name, 'launch_permission')
115 self.assert_(attrs.attrs['groups'][0], 'all')
116
117 def test_011_user_can_deregister_kernel(self):
118 self.assertTrue(self.conn.deregister_image(self.data['kernel_id']))
119
120 def test_012_can_deregister_image(self):
121 self.assertTrue(self.conn.deregister_image(self.data['image_id']))
122
123 def test_013_can_delete_bundle(self):
124 self.assertTrue(self.delete_bundle_bucket(TEST_BUCKET))
125
126
127class InstanceTests(UserSmokeTestCase):
128 def test_001_can_create_keypair(self):
129 key = self.create_key_pair(self.conn, TEST_KEY)
130 self.assertEqual(key.name, TEST_KEY)
131
132 def test_002_can_create_instance_with_keypair(self):
133 reservation = self.conn.run_instances(FLAGS.test_image,
134 key_name=TEST_KEY,
135 instance_type='m1.tiny')
136 self.assertEqual(len(reservation.instances), 1)
137 self.data['instance_id'] = reservation.instances[0].id
138
139 def test_003_instance_runs_within_60_seconds(self):
140 reservations = self.conn.get_all_instances([data['instance_id']])
141 instance = reservations[0].instances[0]
142 # allow 60 seconds to exit pending with IP
143 for x in xrange(60):
144 instance.update()
145 if instance.state == u'running':
146 break
147 time.sleep(1)
148 else:
149 self.fail('instance failed to start')
150 ip = reservations[0].instances[0].private_dns_name
151 self.failIf(ip == '0.0.0.0')
152 self.data['private_ip'] = ip
153 print self.data['private_ip']
154
155 def test_004_can_ping_private_ip(self):
156 for x in xrange(120):
157 # ping waits for 1 second
158 status, output = commands.getstatusoutput(
159 'ping -c1 %s' % self.data['private_ip'])
160 if status == 0:
161 break
162 else:
163 self.fail('could not ping instance')
164
165 def test_005_can_ssh_to_private_ip(self):
166 for x in xrange(30):
167 try:
168 conn = self.connect_ssh(self.data['private_ip'], TEST_KEY)
169 conn.close()
170 except Exception:
171 time.sleep(1)
172 else:
173 break
174 else:
175 self.fail('could not ssh to instance')
176
177 def test_006_can_allocate_elastic_ip(self):
178 result = self.conn.allocate_address()
179 self.assertTrue(hasattr(result, 'public_ip'))
180 self.data['public_ip'] = result.public_ip
181
182 def test_007_can_associate_ip_with_instance(self):
183 result = self.conn.associate_address(self.data['instance_id'],
184 self.data['public_ip'])
185 self.assertTrue(result)
186
187 def test_008_can_ssh_with_public_ip(self):
188 for x in xrange(30):
189 try:
190 conn = self.connect_ssh(self.data['public_ip'], TEST_KEY)
191 conn.close()
192 except socket.error:
193 time.sleep(1)
194 else:
195 break
196 else:
197 self.fail('could not ssh to instance')
198
199 def test_009_can_disassociate_ip_from_instance(self):
200 result = self.conn.disassociate_address(self.data['public_ip'])
201 self.assertTrue(result)
202
203 def test_010_can_deallocate_elastic_ip(self):
204 result = self.conn.release_address(self.data['public_ip'])
205 self.assertTrue(result)
206
207 def test_999_tearDown(self):
208 self.delete_key_pair(self.conn, TEST_KEY)
209 if self.data.has_key('instance_id'):
210 self.conn.terminate_instances([data['instance_id']])
211
212
213class VolumeTests(UserSmokeTestCase):
214 def setUp(self):
215 super(VolumeTests, self).setUp()
216 self.device = '/dev/vdb'
217
218 def test_000_setUp(self):
219 self.create_key_pair(self.conn, TEST_KEY)
220 reservation = self.conn.run_instances(FLAGS.test_image,
221 instance_type='m1.tiny',
222 key_name=TEST_KEY)
223 instance = reservation.instances[0]
224 self.data['instance'] = instance
225 for x in xrange(120):
226 if self.can_ping(instance.private_dns_name):
227 break
228 else:
229 self.fail('unable to start instance')
230
231 def test_001_can_create_volume(self):
232 volume = self.conn.create_volume(1, 'nova')
233 self.assertEqual(volume.size, 1)
234 self.data['volume'] = volume
235 # Give network time to find volume.
236 time.sleep(5)
237
238 def test_002_can_attach_volume(self):
239 volume = self.data['volume']
240
241 for x in xrange(10):
242 if volume.status == u'available':
243 break
244 time.sleep(5)
245 volume.update()
246 else:
247 self.fail('cannot attach volume with state %s' % volume.status)
248
249 volume.attach(self.data['instance'].id, self.device)
250
251 # Volumes seems to report "available" too soon.
252 for x in xrange(10):
253 if volume.status == u'in-use':
254 break
255 time.sleep(5)
256 volume.update()
257
258 self.assertEqual(volume.status, u'in-use')
259
260 # Give instance time to recognize volume.
261 time.sleep(5)
262
263 def test_003_can_mount_volume(self):
264 ip = self.data['instance'].private_dns_name
265 conn = self.connect_ssh(ip, TEST_KEY)
266 commands = []
267 commands.append('mkdir -p /mnt/vol')
268 commands.append('mkfs.ext2 %s' % self.device)
269 commands.append('mount %s /mnt/vol' % self.device)
270 commands.append('echo success')
271 stdin, stdout, stderr = conn.exec_command(' && '.join(commands))
272 out = stdout.read()
273 conn.close()
274 if not out.strip().endswith('success'):
275 self.fail('Unable to mount: %s %s' % (out, stderr.read()))
276
277 def test_004_can_write_to_volume(self):
278 ip = self.data['instance'].private_dns_name
279 conn = self.connect_ssh(ip, TEST_KEY)
280 # FIXME(devcamcar): This doesn't fail if the volume hasn't been mounted
281 stdin, stdout, stderr = conn.exec_command(
282 'echo hello > /mnt/vol/test.txt')
283 err = stderr.read()
284 conn.close()
285 if len(err) > 0:
286 self.fail('Unable to write to mount: %s' % (err))
287
288 def test_005_volume_is_correct_size(self):
289 ip = self.data['instance'].private_dns_name
290 conn = self.connect_ssh(ip, TEST_KEY)
291 stdin, stdout, stderr = conn.exec_command(
292 "df -h | grep %s | awk {'print $2'}" % self.device)
293 out = stdout.read()
294 conn.close()
295 if not out.strip() == '1008M':
296 self.fail('Volume is not the right size: %s %s' %
297 (out, stderr.read()))
298
299 def test_006_me_can_umount_volume(self):
300 ip = self.data['instance'].private_dns_name
301 conn = self.connect_ssh(ip, TEST_KEY)
302 stdin, stdout, stderr = conn.exec_command('umount /mnt/vol')
303 err = stderr.read()
304 conn.close()
305 if len(err) > 0:
306 self.fail('Unable to unmount: %s' % (err))
307
308 def test_007_me_can_detach_volume(self):
309 result = self.conn.detach_volume(volume_id=self.data['volume'].id)
310 self.assertTrue(result)
311 time.sleep(5)
312
313 def test_008_me_can_delete_volume(self):
314 result = self.conn.delete_volume(self.data['volume'].id)
315 self.assertTrue(result)
316
317 def test_999_tearDown(self):
318 self.conn.terminate_instances([self.data['instance'].id])
319 self.conn.delete_key_pair(TEST_KEY)
320
321
322if __name__ == "__main__":
323 suites = {'image': unittest.makeSuite(ImageTests),
324 'instance': unittest.makeSuite(InstanceTests),
325 'volume': unittest.makeSuite(VolumeTests)}
326 sys.exit(base.run_tests(suites))