Merge lp:~dobey/ubuntuone-storage-protocol/validate-ssl-cert into lp:ubuntuone-storage-protocol

Proposed by dobey
Status: Merged
Approved by: Alejandro J. Cura
Approved revision: 152
Merged at revision: 148
Proposed branch: lp:~dobey/ubuntuone-storage-protocol/validate-ssl-cert
Merge into: lp:ubuntuone-storage-protocol
Diff against target: 262 lines (+227/-12)
2 files modified
tests/test_context.py (+182/-0)
ubuntuone/storageprotocol/context.py (+45/-12)
To merge this branch: bzr merge lp:~dobey/ubuntuone-storage-protocol/validate-ssl-cert
Reviewer Review Type Date Requested Status
Alejandro J. Cura (community) Approve
Diego Sarmentero (community) Approve
Review via email: mp+109362@code.launchpad.net

Commit message

Be more strict when validating the SSL certificate

To post a comment you must log in.
150. By dobey

Oops forgot the error module for CertificateError

151. By Alejandro J. Cura

Merge in the missing tests code

152. By dobey

Add a test to ensure get_ssl_context raises CertificateError without hostname

Revision history for this message
Diego Sarmentero (diegosarmentero) wrote :

+1

review: Approve
Revision history for this message
Alejandro J. Cura (alecu) wrote :

Code looks eerily familiar :-)
+1

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== added file 'tests/test_context.py'
2--- tests/test_context.py 1970-01-01 00:00:00 +0000
3+++ tests/test_context.py 2012-06-08 14:09:20 +0000
4@@ -0,0 +1,182 @@
5+# tests.test_context - test ssl context creation
6+#
7+# Copyright 2012 Canonical Ltd.
8+#
9+# This program is free software: you can redistribute it and/or modify it
10+# under the terms of the GNU Affero General Public License version 3,
11+# as published by the Free Software Foundation.
12+#
13+# This program is distributed in the hope that it will be useful, but
14+# WITHOUT ANY WARRANTY; without even the implied warranties of
15+# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
16+# PURPOSE. See the GNU Affero General Public License for more details.
17+#
18+# You should have received a copy of the GNU Affero General Public License
19+# along with this program. If not, see <http://www.gnu.org/licenses/>.
20+#
21+# In addition, as a special exception, the copyright holders give
22+# permission to link the code of portions of this program with the
23+# OpenSSL library under certain conditions as described in each
24+# individual source file, and distribute linked combinations
25+# including the two.
26+# You must obey the GNU General Public License in all respects
27+# for all of the code used other than OpenSSL. If you modify
28+# file(s) with this exception, you may extend this exception to your
29+# version of the file(s), but you are not obligated to do so. If you
30+# do not wish to do so, delete this exception statement from your
31+# version. If you delete this exception statement from all source
32+# files in the program, then also delete it here.
33+
34+import os
35+
36+from OpenSSL import crypto, SSL
37+from twisted.internet import defer, error, reactor, ssl
38+from twisted.trial import unittest
39+from twisted.web import client, resource, server
40+
41+from ubuntuone.storageprotocol import context
42+
43+
44+class FakeCerts(object):
45+ """CA and Server certificate."""
46+
47+ def __init__(self, testcase, common_name="fake.domain"):
48+ """Initialize this fake instance."""
49+ self.cert_dir = os.path.join(testcase.mktemp(), 'certs')
50+ if not os.path.exists(self.cert_dir):
51+ os.makedirs(self.cert_dir)
52+
53+ ca_key = self._build_key()
54+ ca_req = self._build_request(ca_key, "Fake Cert Authority")
55+ self.ca_cert = self._build_cert(ca_req, ca_req, ca_key)
56+
57+ server_key = self._build_key()
58+ server_req = self._build_request(server_key, common_name)
59+ server_cert = self._build_cert(server_req, self.ca_cert, ca_key)
60+
61+ self.server_key_path = self._save_key(server_key, "server_key.pem")
62+ self.server_cert_path = self._save_cert(server_cert, "server_cert.pem")
63+
64+ def _save_key(self, key, filename):
65+ """Save a private key."""
66+ data = crypto.dump_privatekey(crypto.FILETYPE_PEM, key)
67+ return self._save(filename, data)
68+
69+ def _save_cert(self, cert, filename):
70+ """Save a certificate."""
71+ data = crypto.dump_certificate(crypto.FILETYPE_PEM, cert)
72+ return self._save(filename, data)
73+
74+ def _save(self, filename, data):
75+ """Save a key or certificate, and return the full path."""
76+ fullpath = os.path.join(self.cert_dir, filename)
77+ if os.path.exists(fullpath):
78+ os.unlink(fullpath)
79+ with open(fullpath, 'wt') as fd:
80+ fd.write(data)
81+ return fullpath
82+
83+ def _build_key(self):
84+ """Create a new private/public RSA key pair."""
85+ key = crypto.PKey()
86+ key.generate_key(crypto.TYPE_RSA, 1024)
87+ return key
88+
89+ def _build_request(self, key, common_name):
90+ """Create a new certificate request."""
91+ request = crypto.X509Req()
92+ request.get_subject().CN = common_name
93+ request.set_pubkey(key)
94+ request.sign(key, "md5")
95+ return request
96+
97+ def _build_cert(self, request, ca_cert, ca_key):
98+ """Create a new certificate."""
99+ certificate = crypto.X509()
100+ certificate.set_serial_number(1)
101+ certificate.set_issuer(ca_cert.get_subject())
102+ certificate.set_subject(request.get_subject())
103+ certificate.set_pubkey(request.get_pubkey())
104+ certificate.gmtime_adj_notBefore(0)
105+ certificate.gmtime_adj_notAfter(3600) # valid for one hour
106+ certificate.sign(ca_key, "md5")
107+ return certificate
108+
109+
110+class FakeResource(resource.Resource):
111+ """A fake resource."""
112+
113+ isLeaf = True
114+
115+ def render(self, request):
116+ """Render this resource."""
117+ return "ok"
118+
119+
120+class SSLContextTestCase(unittest.TestCase):
121+ """Tests for the context.get_ssl_context function."""
122+
123+ @defer.inlineCallbacks
124+ def verify_context(self, server_context, client_context):
125+ """Verify a client context with a given server context."""
126+ site = server.Site(FakeResource())
127+ port = reactor.listenSSL(0, site, server_context)
128+ self.addCleanup(port.stopListening)
129+ url = "https://localhost:%d" % port.getHost().port
130+ result = yield client.getPage(url, contextFactory=client_context)
131+ self.assertEqual(result, "ok")
132+
133+ @defer.inlineCallbacks
134+ def test_no_verify(self):
135+ """Test the no_verify option."""
136+ certs = FakeCerts(self, "localhost")
137+ server_context = ssl.DefaultOpenSSLContextFactory(
138+ certs.server_key_path, certs.server_cert_path)
139+ client_context = context.get_ssl_context(no_verify=True,
140+ hostname="localhost")
141+
142+ yield self.verify_context(server_context, client_context)
143+
144+ def test_no_hostname(self):
145+ """Test that calling without hostname arg raises proper error."""
146+ self.assertRaises(error.CertificateError,
147+ context.get_ssl_context, False)
148+
149+ @defer.inlineCallbacks
150+ def test_fails_certificate(self):
151+ """A wrong certificate is rejected."""
152+ certs = FakeCerts(self, "localhost")
153+ server_context = ssl.DefaultOpenSSLContextFactory(
154+ certs.server_key_path, certs.server_cert_path)
155+ client_context = context.get_ssl_context(no_verify=False,
156+ hostname="localhost")
157+
158+ d = self.verify_context(server_context, client_context)
159+ e = yield self.assertFailure(d, SSL.Error)
160+ self.assertEqual(e[0][0][1], "SSL3_GET_SERVER_CERTIFICATE")
161+
162+ @defer.inlineCallbacks
163+ def test_fails_hostname(self):
164+ """A wrong hostname is rejected."""
165+ certs = FakeCerts(self, "thisiswronghost.net")
166+ server_context = ssl.DefaultOpenSSLContextFactory(
167+ certs.server_key_path, certs.server_cert_path)
168+ self.patch(context, "get_certificates", lambda: [certs.ca_cert])
169+ client_context = context.get_ssl_context(no_verify=False,
170+ hostname="localhost")
171+
172+ d = self.verify_context(server_context, client_context)
173+ e = yield self.assertFailure(d, SSL.Error)
174+ self.assertEqual(e[0][0][1], "SSL3_GET_SERVER_CERTIFICATE")
175+
176+ @defer.inlineCallbacks
177+ def test_matches_all(self):
178+ """A valid certificate passes checks."""
179+ certs = FakeCerts(self, "localhost")
180+ server_context = ssl.DefaultOpenSSLContextFactory(
181+ certs.server_key_path, certs.server_cert_path)
182+ self.patch(context, "get_certificates", lambda: [certs.ca_cert])
183+ client_context = context.get_ssl_context(no_verify=False,
184+ hostname="localhost")
185+
186+ yield self.verify_context(server_context, client_context)
187
188=== modified file 'ubuntuone/storageprotocol/context.py'
189--- ubuntuone/storageprotocol/context.py 2012-03-29 20:28:09 +0000
190+++ ubuntuone/storageprotocol/context.py 2012-06-08 14:09:20 +0000
191@@ -33,7 +33,8 @@
192 import sys
193
194 from OpenSSL import SSL
195-from twisted.internet import ssl
196+from twisted.internet import error, ssl
197+from twisted.python import log
198
199 if sys.platform == "win32":
200 # disable pylint warning, as it may be the wrong platform
201@@ -58,18 +59,50 @@
202 ssl_cert_location = '/etc/ssl/certs'
203
204
205-def get_ssl_context(no_verify):
206- """ Get the ssl context """
207+class HostnameVerifyContextFactory(ssl.CertificateOptions):
208+ """Does hostname checks in addition to certificate checks."""
209+
210+ def __init__(self, hostname, *args, **kwargs):
211+ """Initialize this instance."""
212+ super(HostnameVerifyContextFactory, self).__init__(*args, **kwargs)
213+ self.expected_hostname = hostname
214+
215+ def verify_server_hostname(self, conn, cert, errno, depth, preverifyOK):
216+ """Verify the server hostname."""
217+ if depth == 0:
218+ # No extra checks because U1 certs have the right commonName
219+ if self.expected_hostname != cert.get_subject().commonName:
220+ log.err("Host name does not match certificate. "
221+ "Expected %s but got %s." % (self.expected_hostname,
222+ cert.get_subject().commonName))
223+ return False
224+ return preverifyOK
225+
226+ def getContext(self):
227+ """The context returned will verify the hostname too."""
228+ ctx = super(HostnameVerifyContextFactory, self).getContext()
229+ flags = SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT
230+ ctx.set_verify(flags, self.verify_server_hostname)
231+ return ctx
232+
233+
234+def get_certificates():
235+ """Get the list of trusted CA certificates."""
236+ ca_file = ssl.Certificate.loadPEM(file(os.path.join(ssl_cert_location,
237+ 'UbuntuOne-Go_Daddy_Class_2_CA.pem'), 'r').read())
238+ ca_file_2 = ssl.Certificate.loadPEM(file(os.path.join(ssl_cert_location,
239+ 'UbuntuOne-Go_Daddy_CA.pem'), 'r').read())
240+ return [ca_file.original, ca_file_2.original]
241+
242+
243+def get_ssl_context(no_verify, hostname=None):
244+ """Get the ssl context."""
245 if no_verify:
246 ctx = ssl.ClientContextFactory()
247 else:
248- ca_file = ssl.Certificate.loadPEM(file(
249- os.path.join(ssl_cert_location,
250- 'UbuntuOne-Go_Daddy_Class_2_CA.pem'), 'r').read())
251- ca_file_2 = ssl.Certificate.loadPEM(file(
252- os.path.join(ssl_cert_location,
253- 'UbuntuOne-Go_Daddy_CA.pem'), 'r').read())
254- ctx = ssl.CertificateOptions(verify=True,
255- caCerts=[ca_file.original, ca_file_2.original],
256- method=SSL.SSLv23_METHOD)
257+ if hostname is None:
258+ raise error.CertificateError(
259+ 'No hostname specified. Unable to verify SSL certificate.')
260+ ctx = HostnameVerifyContextFactory(hostname, verify=True,
261+ caCerts=get_certificates(), method=SSL.SSLv23_METHOD)
262 return ctx

Subscribers

People subscribed via source and target branches