Merge lp:~mandel/ubuntu-sso-client/pinned-certs into lp:ubuntu-sso-client

Proposed by Manuel de la Peña
Status: Rejected
Rejected by: Manuel de la Peña
Proposed branch: lp:~mandel/ubuntu-sso-client/pinned-certs
Merge into: lp:ubuntu-sso-client
Prerequisite: lp:~mandel/ubuntu-sso-client/libsoup-ssl-dialog
Diff against target: 480 lines (+304/-51)
6 files modified
ubuntu_sso/utils/webclient/common.py (+23/-10)
ubuntu_sso/utils/webclient/libsoup.py (+3/-13)
ubuntu_sso/utils/webclient/qtnetwork.py (+2/-22)
ubuntu_sso/utils/webclient/ssl_certs.py (+105/-0)
ubuntu_sso/utils/webclient/tests/test_ssl_certs.py (+151/-0)
ubuntu_sso/utils/webclient/tests/test_webclient.py (+20/-6)
To merge this branch: bzr merge lp:~mandel/ubuntu-sso-client/pinned-certs
Reviewer Review Type Date Requested Status
Manuel de la Peña (community) Disapprove
dobey (community) Approve
Roberto Alsina (community) Approve
Review via email: mp+97486@code.launchpad.net

Commit message

- Added a ssl cert vault that will allow to remember pinned ssl certificates (LP: #955339)

Description of the change

- Added a ssl cert vault that will allow to remember pinned ssl certificates (LP: #955339)

To post a comment you must log in.
Revision history for this message
Roberto Alsina (ralsina) wrote :

Looks good to me, but testing this IRL is a pain.

review: Approve
Revision history for this message
dobey (dobey) wrote :

Looks ok.

review: Approve
Revision history for this message
Manuel de la Peña (mandel) wrote :

Rejecting since we won't use this code.

review: Disapprove

Unmerged revisions

952. By Manuel de la Peña

Remove unused function.

951. By Manuel de la Peña

Merged libsoup-ssl-dialog into pinned-certs.

950. By Manuel de la Peña

Merged libsoup-ssl-dialog into pinned-certs.

949. By Manuel de la Peña

Added the required code to store the pinned certificates in order to not promp the ssl dialog constantly.

948. By Manuel de la Peña

Pass the cert pem from the child WebClient implementations to the base class.

947. By Manuel de la Peña

Reduced diff size.

946. By Manuel de la Peña

Reduced diff size.

945. By Manuel de la Peña

Merged with trunk.

944. By Manuel de la Peña

Added the required code to let the libsoup webclient implementation deal with ssl cert errors.

943. By Manuel de la Peña

Made changes to the tests so that they work correctly.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'ubuntu_sso/utils/webclient/common.py'
2--- ubuntu_sso/utils/webclient/common.py 2012-03-15 11:43:19 +0000
3+++ ubuntu_sso/utils/webclient/common.py 2012-03-15 11:43:19 +0000
4@@ -21,6 +21,7 @@
5
6 from httplib2 import iri2uri
7 from oauth import oauth
8+from OpenSSL.crypto import load_certificate, FILETYPE_PEM
9 from twisted.internet import defer
10 from urlparse import urlparse
11
12@@ -28,6 +29,8 @@
13 from ubuntu_sso.logger import setup_logging
14 from ubuntu_sso.utils.runner import spawn_program
15 from ubuntu_sso.utils.ui import SSL_DETAILS_TEMPLATE
16+from ubuntu_sso.utils.ui import SSL_DETAILS_TEMPLATE
17+from ubuntu_sso.utils.webclient.ssl_certs import PinnedSSLCertsVault
18 from ubuntu_sso.utils.webclient.timestamp import TimestampChecker
19
20 SSL_DIALOG = 'ubuntu-sso-ssl-certificate-qt'
21@@ -109,6 +112,7 @@
22 self.proxy_username = None
23 self.proxy_password = None
24 self.oauth_sign_plain = oauth_sign_plain
25+ self.ssl_vault = PinnedSSLCertsVault()
26
27 def request(self, iri, method="GET", extra_headers=None,
28 oauth_credentials=None, post_content=None):
29@@ -238,9 +242,16 @@
30 return_code)
31 defer.returnValue(False)
32
33- def format_ssl_details(self, details):
34+ def get_ssl_subject_details(self, cert):
35 """Return a formatted string with the details."""
36- return SSL_DETAILS_TEMPLATE % details
37+ cert_subject = cert.get_subject()
38+ details = dict(organization=cert_subject.O,
39+ common_name=cert_subject.CN,
40+ locality_name=cert_subject.L,
41+ unit=cert_subject.OU,
42+ country_name=cert_subject.C,
43+ state_name=cert_subject.ST)
44+ return details
45
46 def _launch_ssl_dialog(self, domain, details):
47 """Launch a dialog used to approve the ssl cert."""
48@@ -252,16 +263,18 @@
49 '--appname', self.appname)
50 return spawn_program(args)
51
52- def _was_ssl_accepted(self, cert_details):
53- """Return if the cert was already accepted."""
54- # TODO: Ensure that we look at pinned certs in a following branch
55- return False
56-
57 @defer.inlineCallbacks
58- def request_ssl_cert_approval(self, domain, details):
59+ def request_ssl_cert_approval(self, cert_pem):
60 """Request the user for ssl approval."""
61- if self._was_ssl_accepted(details):
62+ cert = load_certificate(FILETYPE_PEM, cert_pem)
63+ if self.ssl_vault.is_certificate_pinned(cert):
64 defer.returnValue(True)
65
66- return_code = yield self._launch_ssl_dialog(domain, details)
67+ subject_details = self.get_ssl_subject_details(cert)
68+ details = SSL_DETAILS_TEMPLATE % subject_details
69+ return_code = yield self._launch_ssl_dialog(
70+ subject_details['common_name'],
71+ details)
72+ if return_code == USER_SUCCESS:
73+ self.ssl_vault.store_certificate(cert)
74 defer.returnValue(return_code == USER_SUCCESS)
75
76=== modified file 'ubuntu_sso/utils/webclient/libsoup.py'
77--- ubuntu_sso/utils/webclient/libsoup.py 2012-03-15 11:43:19 +0000
78+++ ubuntu_sso/utils/webclient/libsoup.py 2012-03-15 11:43:19 +0000
79@@ -17,7 +17,6 @@
80
81 import httplib
82
83-from OpenSSL.crypto import load_certificate, FILETYPE_PEM
84 from twisted.internet import defer
85
86 from ubuntu_sso.logger import setup_logging
87@@ -67,17 +66,9 @@
88 if message.status_code == httplib.OK:
89 response = self._get_response(message)
90 if is_ssl and errors.real != 0:
91- cert = load_certificate(FILETYPE_PEM,
92- raw_cert.props.certificate_pem)
93- cert_subject = cert.get_subject()
94- ssl_details = dict(organization=cert_subject.O,
95- common_name=cert_subject.CN,
96- locality_name=cert_subject.L,
97- unit=cert_subject.OU,
98- country_name=cert_subject.C,
99- state_name=cert_subject.ST)
100 e = SSLCertError(message.reason_phrase,
101- ssl_details=ssl_details, response=response)
102+ ssl_details=raw_cert.props.certificate_pem,
103+ response=response)
104 d.errback(e)
105 else:
106 d.callback(response)
107@@ -136,8 +127,7 @@
108 failure.trap(SSLCertError)
109 logger.debug('SSL cert errors found.')
110 trust_cert = yield self.request_ssl_cert_approval(
111- failure.value.ssl_details['common_name'],
112- self.format_ssl_details(failure.value.ssl_details))
113+ failure.value.ssl_details)
114 if trust_cert:
115 defer.returnValue(failure.value.response)
116
117
118=== modified file 'ubuntu_sso/utils/webclient/qtnetwork.py'
119--- ubuntu_sso/utils/webclient/qtnetwork.py 2012-03-09 12:04:31 +0000
120+++ ubuntu_sso/utils/webclient/qtnetwork.py 2012-03-15 11:43:19 +0000
121@@ -30,7 +30,6 @@
122 QNetworkProxyFactory,
123 QNetworkReply,
124 QNetworkRequest,
125- QSslCertificate,
126 )
127 from twisted.internet import defer
128
129@@ -266,31 +265,12 @@
130 exception = WebClientError(error_string, content)
131 d.errback(exception)
132
133- def _get_certificate_details(self, cert):
134- """Return an string with the details of the certificate."""
135- detail_titles = {QSslCertificate.Organization: 'organization',
136- QSslCertificate.CommonName: 'common_name',
137- QSslCertificate.LocalityName: 'locality_name',
138- QSslCertificate.OrganizationalUnitName: 'unit',
139- QSslCertificate.CountryName: 'country_name',
140- QSslCertificate.StateOrProvinceName: 'state_name'}
141- details = {}
142- for info, title in detail_titles.iteritems():
143- details[title] = str(cert.issuerInfo(info))
144- return self.format_ssl_details(details)
145-
146- def _get_certificate_host(self, cert):
147- """Return the host of the cert."""
148- return str(cert.issuerInfo(QSslCertificate.CommonName))
149-
150 @defer.inlineCallbacks
151 def _handle_ssl_errors(self, reply, errors):
152 """Handle the case in which we got an ssl error."""
153 # ask the user if the cer should be trusted
154- cert = errors[0].certificate()
155- trust_cert = yield self.request_ssl_cert_approval(
156- self._get_certificate_host(cert),
157- self._get_certificate_details(cert))
158+ cert_pem = errors[0].certificate().toPem()
159+ trust_cert = yield self.request_ssl_cert_approval(cert_pem)
160 if trust_cert:
161 reply.ignoreSslErrors()
162
163
164=== added file 'ubuntu_sso/utils/webclient/ssl_certs.py'
165--- ubuntu_sso/utils/webclient/ssl_certs.py 1970-01-01 00:00:00 +0000
166+++ ubuntu_sso/utils/webclient/ssl_certs.py 2012-03-15 11:43:19 +0000
167@@ -0,0 +1,105 @@
168+# -*- coding: utf-8 -*-
169+#
170+# Copyright 2011 Canonical Ltd.
171+#
172+# This program is free software: you can redistribute it and/or modify it
173+# under the terms of the GNU General Public License version 3, as published
174+# by the Free Software Foundation.
175+#
176+# This program is distributed in the hope that it will be useful, but
177+# WITHOUT ANY WARRANTY; without even the implied warranties of
178+# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
179+# PURPOSE. See the GNU General Public License for more details.
180+#
181+# You should have received a copy of the GNU General Public License along
182+# with this program. If not, see <http://www.gnu.org/licenses/>.
183+"""Store and retrieve certificates that have been pinned."""
184+
185+import base64
186+import os
187+import tempfile
188+
189+from ubuntu_sso import xdg_base_directory
190+from ubuntu_sso.logger import setup_logging
191+
192+PINNED_CERT_LINE_FORMAT = ('%(domain)s:%(port)s\t%(hash_algorithm)s\t'
193+ '%(fingerprint)s\t%(override_type)s\t'
194+ '%(serial_number)s\n')
195+HTTPS_PORT = 443
196+HASH_ALGORITHM = ('sha1', 'OID.2.16.840.1.101.3.4.2.1')
197+DEFAULT_OVERRIDE_TYPE = 'MUT'
198+PINNED_CERT_DIR = os.path.join(xdg_base_directory.xdg_cache_home, 'sso',
199+ 'ssl-certs')
200+
201+logger = setup_logging("ubuntu_sso.utils.webclient.ssl_certs")
202+
203+
204+class PinnedSSLCertsVault(object):
205+ """Represents a vault that will store the pinned certs."""
206+
207+ def __init__(self, certs_path=PINNED_CERT_DIR):
208+ """Create a new instance."""
209+ self.certs_path = certs_path
210+ if not os.path.exists(self.certs_path):
211+ os.makedirs(self.certs_path)
212+
213+ def _get_pinned_certificate_line(self, cert):
214+ """Return the line of a pinned cert.
215+
216+ The following format is used:
217+ Fields are separated by a tab character. Each line is terminated
218+ by a line feed character (UNIX format).
219+
220+ 1. domainname:port : port 443 for HTTPS (SSL)
221+ 2. hash algorithm OID:
222+ SHA1-256: OID.2.16.840.1.101.3.4.2.1 (most used)
223+ SHA-384: OID.2.16.840.1.101.3.4.2.2
224+ SHA-512: OID.2.16.840.1.101.3.4.2.3
225+ 3. Certificate fingerprint using previous hash algorithm
226+ 4. One or more characters for override type:
227+ M : allow mismatches in the hostname
228+ U : allow untrusted certs (whether it's self signed cert or
229+ a missing or invalid issuer cert)
230+ T : allow errors in the validity time, for example, for expired or
231+ not yet valid certs
232+ 5. Certificate's serial number and the issuer name as a base64
233+ encoded string
234+
235+ more details can be found at:
236+ https://developer.mozilla.org/En/Cert_override.txt
237+ """
238+ cert_subject = cert.get_subject()
239+ serial_number = '%s@%s' % (cert.get_serial_number(), cert.get_issuer())
240+ details = dict(domain=cert_subject.CN,
241+ port=HTTPS_PORT,
242+ hash_algorithm=HASH_ALGORITHM[1],
243+ fingerprint=cert.digest(HASH_ALGORITHM[0]),
244+ override_type=DEFAULT_OVERRIDE_TYPE,
245+ serial_number=base64.b64encode(serial_number))
246+ return PINNED_CERT_LINE_FORMAT % details
247+
248+ def _get_cert_file_name(self, cert):
249+ """Return the file name to use with the cert."""
250+ fingerprint = cert.digest(HASH_ALGORITHM[0])
251+ return fingerprint.replace(':', '-')
252+
253+ def store_certificate(self, cert):
254+ """Store a new certificate."""
255+ _, temp_path = tempfile.mkstemp(prefix='tmp-', dir=self.certs_path)
256+ with open(temp_path, 'wb') as fd:
257+ fd.write(self._get_pinned_certificate_line(cert))
258+ logger.debug('Temp file %s written', temp_path)
259+ fingerprint = self._get_cert_file_name(cert)
260+ os.rename(temp_path, os.path.join(self.certs_path, fingerprint))
261+ logger.debug('Cert with fingerprint %s added.', fingerprint)
262+
263+ def is_certificate_pinned(self, cert):
264+ """Return if a certificate was already pinned."""
265+ fingerprint = self._get_cert_file_name(cert)
266+ return fingerprint in self.pinned_certificates
267+
268+ @property
269+ def pinned_certificates(self):
270+ """Return all the fingerprints of the stored certs."""
271+ return [name for name in os.listdir(self.certs_path) if not
272+ name.startswith('tmp-')]
273
274=== added file 'ubuntu_sso/utils/webclient/tests/test_ssl_certs.py'
275--- ubuntu_sso/utils/webclient/tests/test_ssl_certs.py 1970-01-01 00:00:00 +0000
276+++ ubuntu_sso/utils/webclient/tests/test_ssl_certs.py 2012-03-15 11:43:19 +0000
277@@ -0,0 +1,151 @@
278+# -*- coding: utf-8 -*-
279+#
280+# Copyright 2011 Canonical Ltd.
281+#
282+# This program is free software: you can redistribute it and/or modify it
283+# under the terms of the GNU General Public License version 3, as published
284+# by the Free Software Foundation.
285+#
286+# This program is distributed in the hope that it will be useful, but
287+# WITHOUT ANY WARRANTY; without even the implied warranties of
288+# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
289+# PURPOSE. See the GNU General Public License for more details.
290+#
291+# You should have received a copy of the GNU General Public License along
292+# with this program. If not, see <http://www.gnu.org/licenses/>.
293+"""Tests for the ssl certs operations."""
294+
295+import os
296+import tempfile
297+
298+from OpenSSL import crypto
299+from socket import gethostname
300+from twisted.internet import defer, reactor
301+from twisted.trial.unittest import TestCase
302+
303+from ubuntu_sso.utils.webclient import ssl_certs
304+
305+
306+class PinnedSSLCertsVaultTestCase(TestCase):
307+ """Test the addition of a new pinned cert."""
308+
309+ @defer.inlineCallbacks
310+ def setUp(self):
311+ """Set the diff tests."""
312+ yield super(PinnedSSLCertsVaultTestCase, self).setUp()
313+ self.cert_details = dict(organization='Canonical',
314+ common_name=gethostname(),
315+ locality_name='London',
316+ unit='Ubuntu One',
317+ country_name='UK',
318+ state_name='London',)
319+ self.cert = self._generate_cert(self.cert_details)
320+
321+ self.certs_path = self.mktemp()
322+ os.mkdir(self.certs_path)
323+ self.vault = ssl_certs.PinnedSSLCertsVault(self.certs_path)
324+
325+ def _generate_cert(self, cert_details):
326+ """Return a cert using the given details."""
327+ # create a key pair
328+ key = crypto.PKey()
329+ key.generate_key(crypto.TYPE_RSA, 1024)
330+
331+ # create a self-signed cert
332+ cert = crypto.X509()
333+ cert.get_subject().C = cert_details['country_name']
334+ cert.get_subject().ST = cert_details['state_name']
335+ cert.get_subject().L = cert_details['locality_name']
336+ cert.get_subject().O = cert_details['organization']
337+ cert.get_subject().OU = cert_details['unit']
338+ cert.get_subject().CN = cert_details['common_name']
339+ cert.set_serial_number(1000)
340+ cert.gmtime_adj_notBefore(0)
341+ cert.gmtime_adj_notAfter(10 * 365 * 24 * 60 * 60)
342+ cert.set_issuer(cert.get_subject())
343+ cert.set_pubkey(key)
344+ cert.sign(key, 'sha1')
345+
346+ return cert
347+
348+ def test_missing_dir(self):
349+ """Test addind a pinned cert when the dir is missing."""
350+ os.rmdir(self.certs_path)
351+ self.vault = ssl_certs.PinnedSSLCertsVault(self.certs_path)
352+ self.assertTrue(os.path.exists(self.certs_path))
353+
354+ def test_pinned_certificates(self):
355+ """Test getting the list of pinned certificates."""
356+ # create number of certs and ensure that the list is there.
357+ added = []
358+
359+ for index in (1, 2, 3):
360+ cert_details = self.cert_details.copy()
361+ cert_details['common_name'] = \
362+ cert_details['common_name'] + str(index)
363+ cert = self._generate_cert(cert_details)
364+ self.vault.store_certificate(cert)
365+ added.append(
366+ cert.digest(ssl_certs.HASH_ALGORITHM[0]).replace(':', '-'))
367+
368+ for fingerprint in added:
369+ self.assertIn(fingerprint, self.vault.pinned_certificates)
370+
371+ def test_pinned_certificates_temp_files(self):
372+ """Test getting the list of pinned certificates."""
373+ # create a number of tmp files and assert that they are not returned.
374+ tempfiles = []
375+ added = []
376+
377+ for index in (1, 2, 3):
378+ _, temp_path = tempfile.mkstemp(prefix='tmp-', dir=self.certs_path)
379+ with open(temp_path, 'w') as fd:
380+ fd.write("I'm a test!")
381+ tempfiles.append(temp_path)
382+
383+ for index in (1, 2, 3):
384+ cert_details = self.cert_details.copy()
385+ cert_details['common_name'] = \
386+ cert_details['common_name'] + str(index)
387+ cert = self._generate_cert(cert_details)
388+ self.vault.store_certificate(cert)
389+ added.append(
390+ cert.digest(ssl_certs.HASH_ALGORITHM[0]).replace(':', '-'))
391+
392+ for fingerprint in added:
393+ self.assertIn(fingerprint, self.vault.pinned_certificates)
394+
395+ for temp in tempfiles:
396+ self.assertNotIn(temp, self.vault.pinned_certificates)
397+
398+ def test_adding_certificated(self):
399+ """Test adding a new certificate."""
400+ self.vault.store_certificate(self.cert)
401+ fingerprint = self.cert.digest(
402+ ssl_certs.HASH_ALGORITHM[0]).replace(':', '-')
403+ self.assertIn(fingerprint, os.listdir(self.certs_path))
404+
405+ def _assert_added(self):
406+ """Assert the addition with multiple threads."""
407+ vault = ssl_certs.PinnedSSLCertsVault(self.certs_path)
408+ vault.store_certificate(self.cert)
409+ fingerprint = self.cert.digest(
410+ ssl_certs.HASH_ALGORITHM[0]).replace(':', '-')
411+ self.assertIn(fingerprint, vault.pinned_certificates)
412+ self.assertIn(fingerprint, os.listdir(self.certs_path))
413+
414+ def test_stepping_on_each_other(self):
415+ """Test using several vaults in threads."""
416+ # pylint: disable=E1101
417+ for _ in (1, 2, 3, 4, 5):
418+ reactor.callInThread(self._assert_added)
419+ # pylint: enable=E1101
420+
421+ def test_get_cert_filename(self):
422+ """Assert that the filename is correct."""
423+ expected = self.cert.digest(
424+ ssl_certs.HASH_ALGORITHM[0]).replace(':', '-')
425+ # pylint: disable=W0212
426+ filename = self.vault._get_cert_file_name(self.cert)
427+ # pylint: enable=W0212
428+ self.assertEqual(expected, filename)
429
430=== modified file 'ubuntu_sso/utils/webclient/tests/test_webclient.py'
431--- ubuntu_sso/utils/webclient/tests/test_webclient.py 2012-03-15 11:43:19 +0000
432+++ ubuntu_sso/utils/webclient/tests/test_webclient.py 2012-03-15 11:43:19 +0000
433@@ -892,7 +892,7 @@
434 with open(key_path, 'wt') as fd:
435 fd.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, key))
436
437- return dict(key=key_path, cert=cert_path)
438+ return dict(key=key_path, cert=cert_path, cert_info=cert)
439
440
441 class CorrectProxyTestCase(BaseSSLTestCase):
442@@ -1012,6 +1012,8 @@
443 details = SSL_DETAILS_TEMPLATE % self.cert_details
444 self.assertIn(('_launch_ssl_dialog', gethostname(), details),
445 self.called)
446+ self.assertTrue(self.wc.ssl_vault.is_certificate_pinned(
447+ self.ssl_settings['cert_info']))
448
449 def test_ssl_fail_dialog_user_accepts(self):
450 """Test showing the dialog and accepting."""
451@@ -1026,12 +1028,24 @@
452 """Test showing the dialog and rejecting."""
453 self.failUnlessFailure(self.wc.request(self.base_iri + SIMPLERESOURCE),
454 webclient.WebClientError)
455-
456- def test_format_ssl_details(self):
457+ # assert the cert was not pinned
458+ self.assertFalse(self.wc.ssl_vault.is_certificate_pinned(
459+ self.ssl_settings['cert_info']))
460+
461+ @defer.inlineCallbacks
462+ def test_ssl_fail_pinned_cert(self):
463+ """Test when the cert is pinned."""
464+ self.patch(self.wc.ssl_vault, 'is_certificate_pinned', lambda _: True)
465+ result = yield self.wc.request(self.base_ssl_iri + SIMPLERESOURCE)
466+ details = SSL_DETAILS_TEMPLATE % self.cert_details
467+ self.assertNotIn(('_launch_ssl_dialog', gethostname(), details),
468+ self.called)
469+ self.assertEqual(SAMPLE_RESOURCE, result.content)
470+
471+ def test_get_ssl_subject_details(self):
472 """Assert that details are correctly formatted"""
473- details = SSL_DETAILS_TEMPLATE % self.cert_details
474- self.assertEqual(details,
475- self.wc.format_ssl_details(self.cert_details))
476+ self.assertEqual(self.cert_details,
477+ self.wc.get_ssl_subject_details(self.ssl_settings['cert_info']))
478
479 if WEBCLIENT_MODULE_NAME.endswith(".txweb"):
480 reason = 'SSL support has not yet been implemented.'

Subscribers

People subscribed via source and target branches