Merge lp:~dobey/ubuntuone-storage-protocol/validate-ssl-cert into lp:ubuntuone-storage-protocol

Proposed by dobey
Status: Merged
Approved by: Alejandro J. Cura
Approved revision: 152
Merged at revision: 148
Proposed branch: lp:~dobey/ubuntuone-storage-protocol/validate-ssl-cert
Merge into: lp:ubuntuone-storage-protocol
Diff against target: 262 lines (+227/-12)
2 files modified
tests/test_context.py (+182/-0)
ubuntuone/storageprotocol/context.py (+45/-12)
To merge this branch: bzr merge lp:~dobey/ubuntuone-storage-protocol/validate-ssl-cert
Reviewer Review Type Date Requested Status
Alejandro J. Cura (community) Approve
Diego Sarmentero (community) Approve
Review via email: mp+109362@code.launchpad.net

Commit message

Be more strict when validating the SSL certificate

To post a comment you must log in.
150. By dobey

Oops forgot the error module for CertificateError

151. By Alejandro J. Cura

Merge in the missing tests code

152. By dobey

Add a test to ensure get_ssl_context raises CertificateError without hostname

Revision history for this message
Diego Sarmentero (diegosarmentero) wrote :

+1

review: Approve
Revision history for this message
Alejandro J. Cura (alecu) wrote :

Code looks eerily familiar :-)
+1

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== added file 'tests/test_context.py'
--- tests/test_context.py 1970-01-01 00:00:00 +0000
+++ tests/test_context.py 2012-06-08 14:09:20 +0000
@@ -0,0 +1,182 @@
1# tests.test_context - test ssl context creation
2#
3# Copyright 2012 Canonical Ltd.
4#
5# This program is free software: you can redistribute it and/or modify it
6# under the terms of the GNU Affero General Public License version 3,
7# as published by the Free Software Foundation.
8#
9# This program is distributed in the hope that it will be useful, but
10# WITHOUT ANY WARRANTY; without even the implied warranties of
11# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
12# PURPOSE. See the GNU Affero General Public License for more details.
13#
14# You should have received a copy of the GNU Affero General Public License
15# along with this program. If not, see <http://www.gnu.org/licenses/>.
16#
17# In addition, as a special exception, the copyright holders give
18# permission to link the code of portions of this program with the
19# OpenSSL library under certain conditions as described in each
20# individual source file, and distribute linked combinations
21# including the two.
22# You must obey the GNU General Public License in all respects
23# for all of the code used other than OpenSSL. If you modify
24# file(s) with this exception, you may extend this exception to your
25# version of the file(s), but you are not obligated to do so. If you
26# do not wish to do so, delete this exception statement from your
27# version. If you delete this exception statement from all source
28# files in the program, then also delete it here.
29
30import os
31
32from OpenSSL import crypto, SSL
33from twisted.internet import defer, error, reactor, ssl
34from twisted.trial import unittest
35from twisted.web import client, resource, server
36
37from ubuntuone.storageprotocol import context
38
39
40class FakeCerts(object):
41 """CA and Server certificate."""
42
43 def __init__(self, testcase, common_name="fake.domain"):
44 """Initialize this fake instance."""
45 self.cert_dir = os.path.join(testcase.mktemp(), 'certs')
46 if not os.path.exists(self.cert_dir):
47 os.makedirs(self.cert_dir)
48
49 ca_key = self._build_key()
50 ca_req = self._build_request(ca_key, "Fake Cert Authority")
51 self.ca_cert = self._build_cert(ca_req, ca_req, ca_key)
52
53 server_key = self._build_key()
54 server_req = self._build_request(server_key, common_name)
55 server_cert = self._build_cert(server_req, self.ca_cert, ca_key)
56
57 self.server_key_path = self._save_key(server_key, "server_key.pem")
58 self.server_cert_path = self._save_cert(server_cert, "server_cert.pem")
59
60 def _save_key(self, key, filename):
61 """Save a certificate."""
62 data = crypto.dump_privatekey(crypto.FILETYPE_PEM, key)
63 return self._save(filename, data)
64
65 def _save_cert(self, cert, filename):
66 """Save a certificate."""
67 data = crypto.dump_certificate(crypto.FILETYPE_PEM, cert)
68 return self._save(filename, data)
69
70 def _save(self, filename, data):
71 """Save a key or certificate, and return the full path."""
72 fullpath = os.path.join(self.cert_dir, filename)
73 if os.path.exists(fullpath):
74 os.unlink(fullpath)
75 with open(fullpath, 'wt') as fd:
76 fd.write(data)
77 return fullpath
78
79 def _build_key(self):
80 """Create a private/public key, save it in a temp dir."""
81 key = crypto.PKey()
82 key.generate_key(crypto.TYPE_RSA, 1024)
83 return key
84
85 def _build_request(self, key, common_name):
86 """Create a new certificate request."""
87 request = crypto.X509Req()
88 request.get_subject().CN = common_name
89 request.set_pubkey(key)
90 request.sign(key, "md5")
91 return request
92
93 def _build_cert(self, request, ca_cert, ca_key):
94 """Create a new certificate."""
95 certificate = crypto.X509()
96 certificate.set_serial_number(1)
97 certificate.set_issuer(ca_cert.get_subject())
98 certificate.set_subject(request.get_subject())
99 certificate.set_pubkey(request.get_pubkey())
100 certificate.gmtime_adj_notBefore(0)
101 certificate.gmtime_adj_notAfter(3600) # valid for one hour
102 certificate.sign(ca_key, "md5")
103 return certificate
104
105
106class FakeResource(resource.Resource):
107 """A fake resource."""
108
109 isLeaf = True
110
111 def render(self, request):
112 """Render this resource."""
113 return "ok"
114
115
116class SSLContextTestCase(unittest.TestCase):
117 """Tests for the context.get_ssl_context function."""
118
119 @defer.inlineCallbacks
120 def verify_context(self, server_context, client_context):
121 """Verify a client context with a given server context."""
122 site = server.Site(FakeResource())
123 port = reactor.listenSSL(0, site, server_context)
124 self.addCleanup(port.stopListening)
125 url = "https://localhost:%d" % port.getHost().port
126 result = yield client.getPage(url, contextFactory=client_context)
127 self.assertEqual(result, "ok")
128
129 @defer.inlineCallbacks
130 def test_no_verify(self):
131 """Test the no_verify option."""
132 certs = FakeCerts(self, "localhost")
133 server_context = ssl.DefaultOpenSSLContextFactory(
134 certs.server_key_path, certs.server_cert_path)
135 client_context = context.get_ssl_context(no_verify=True,
136 hostname="localhost")
137
138 yield self.verify_context(server_context, client_context)
139
140 def test_no_hostname(self):
141 """Test that calling without hostname arg raises proper error."""
142 self.assertRaises(error.CertificateError,
143 context.get_ssl_context, False)
144
145 @defer.inlineCallbacks
146 def test_fails_certificate(self):
147 """A wrong certificate is rejected."""
148 certs = FakeCerts(self, "localhost")
149 server_context = ssl.DefaultOpenSSLContextFactory(
150 certs.server_key_path, certs.server_cert_path)
151 client_context = context.get_ssl_context(no_verify=False,
152 hostname="localhost")
153
154 d = self.verify_context(server_context, client_context)
155 e = yield self.assertFailure(d, SSL.Error)
156 self.assertEqual(e[0][0][1], "SSL3_GET_SERVER_CERTIFICATE")
157
158 @defer.inlineCallbacks
159 def test_fails_hostname(self):
160 """A wrong hostname is rejected."""
161 certs = FakeCerts(self, "thisiswronghost.net")
162 server_context = ssl.DefaultOpenSSLContextFactory(
163 certs.server_key_path, certs.server_cert_path)
164 self.patch(context, "get_certificates", lambda: [certs.ca_cert])
165 client_context = context.get_ssl_context(no_verify=False,
166 hostname="localhost")
167
168 d = self.verify_context(server_context, client_context)
169 e = yield self.assertFailure(d, SSL.Error)
170 self.assertEqual(e[0][0][1], "SSL3_GET_SERVER_CERTIFICATE")
171
172 @defer.inlineCallbacks
173 def test_matches_all(self):
174 """A valid certificate passes checks."""
175 certs = FakeCerts(self, "localhost")
176 server_context = ssl.DefaultOpenSSLContextFactory(
177 certs.server_key_path, certs.server_cert_path)
178 self.patch(context, "get_certificates", lambda: [certs.ca_cert])
179 client_context = context.get_ssl_context(no_verify=False,
180 hostname="localhost")
181
182 yield self.verify_context(server_context, client_context)
0183
=== modified file 'ubuntuone/storageprotocol/context.py'
--- ubuntuone/storageprotocol/context.py 2012-03-29 20:28:09 +0000
+++ ubuntuone/storageprotocol/context.py 2012-06-08 14:09:20 +0000
@@ -33,7 +33,8 @@
33import sys33import sys
3434
35from OpenSSL import SSL35from OpenSSL import SSL
36from twisted.internet import ssl36from twisted.internet import error, ssl
37from twisted.python import log
3738
38if sys.platform == "win32":39if sys.platform == "win32":
39 # diable pylint warning, as it may be the wrong platform40 # diable pylint warning, as it may be the wrong platform
@@ -58,18 +59,50 @@
58 ssl_cert_location = '/etc/ssl/certs'59 ssl_cert_location = '/etc/ssl/certs'
5960
6061
61def get_ssl_context(no_verify):62class HostnameVerifyContextFactory(ssl.CertificateOptions):
62 """ Get the ssl context """63 """Does hostname checks in addition to certificate checks."""
64
65 def __init__(self, hostname, *args, **kwargs):
66 """Initialize this instance."""
67 super(HostnameVerifyContextFactory, self).__init__(*args, **kwargs)
68 self.expected_hostname = hostname
69
70 def verify_server_hostname(self, conn, cert, errno, depth, preverifyOK):
71 """Verify the server hostname."""
72 if depth == 0:
73 # No extra checks because U1 certs have the right commonName
74 if self.expected_hostname != cert.get_subject().commonName:
75 log.err("Host name does not match certificate. "
76 "Expected %s but got %s." % (self.expected_hostname,
77 cert.get_subject().commonName))
78 return False
79 return preverifyOK
80
81 def getContext(self):
82 """The context returned will verify the hostname too."""
83 ctx = super(HostnameVerifyContextFactory, self).getContext()
84 flags = SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT
85 ctx.set_verify(flags, self.verify_server_hostname)
86 return ctx
87
88
89def get_certificates():
90 """Get a list of certificate paths."""
91 ca_file = ssl.Certificate.loadPEM(file(os.path.join(ssl_cert_location,
92 'UbuntuOne-Go_Daddy_Class_2_CA.pem'), 'r').read())
93 ca_file_2 = ssl.Certificate.loadPEM(file(os.path.join(ssl_cert_location,
94 'UbuntuOne-Go_Daddy_CA.pem'), 'r').read())
95 return [ca_file.original, ca_file_2.original]
96
97
98def get_ssl_context(no_verify, hostname=None):
99 """Get the ssl context."""
63 if no_verify:100 if no_verify:
64 ctx = ssl.ClientContextFactory()101 ctx = ssl.ClientContextFactory()
65 else:102 else:
66 ca_file = ssl.Certificate.loadPEM(file(103 if hostname is None:
67 os.path.join(ssl_cert_location,104 raise error.CertificateError(
68 'UbuntuOne-Go_Daddy_Class_2_CA.pem'), 'r').read())105 'No hostname specified. Unable to verify SSL certificate.')
69 ca_file_2 = ssl.Certificate.loadPEM(file(106 ctx = HostnameVerifyContextFactory(hostname, verify=True,
70 os.path.join(ssl_cert_location,107 caCerts=get_certificates(), method=SSL.SSLv23_METHOD)
71 'UbuntuOne-Go_Daddy_CA.pem'), 'r').read())
72 ctx = ssl.CertificateOptions(verify=True,
73 caCerts=[ca_file.original, ca_file_2.original],
74 method=SSL.SSLv23_METHOD)
75 return ctx108 return ctx

Subscribers

People subscribed via source and target branches