Merge lp:~mandel/ubuntu-sso-client/pinned-certs into lp:ubuntu-sso-client

Proposed by Manuel de la Peña on 2012-03-14
Status: Rejected
Rejected by: Manuel de la Peña on 2012-04-02
Proposed branch: lp:~mandel/ubuntu-sso-client/pinned-certs
Merge into: lp:ubuntu-sso-client
Prerequisite: lp:~mandel/ubuntu-sso-client/libsoup-ssl-dialog
Diff against target: 480 lines (+304/-51)
6 files modified
ubuntu_sso/utils/webclient/common.py (+23/-10)
ubuntu_sso/utils/webclient/libsoup.py (+3/-13)
ubuntu_sso/utils/webclient/qtnetwork.py (+2/-22)
ubuntu_sso/utils/webclient/ssl_certs.py (+105/-0)
ubuntu_sso/utils/webclient/tests/test_ssl_certs.py (+151/-0)
ubuntu_sso/utils/webclient/tests/test_webclient.py (+20/-6)
To merge this branch: bzr merge lp:~mandel/ubuntu-sso-client/pinned-certs
Reviewer Review Type Date Requested Status
Manuel de la Peña (community) Disapprove on 2012-04-02
dobey (community) Approve on 2012-03-16
Roberto Alsina (community) 2012-03-14 Approve on 2012-03-15
Review via email: mp+97486@code.launchpad.net

Commit Message

- Added an SSL cert vault that remembers pinned SSL certificates (LP: #955339)

Description of the Change

- Added an SSL cert vault that remembers pinned SSL certificates (LP: #955339)

To post a comment you must log in.
Roberto Alsina (ralsina) wrote :

Looks good to me, but testing this IRL is a pain.

review: Approve
dobey (dobey) wrote :

Looks ok.

review: Approve
Manuel de la Peña (mandel) wrote :

Rejecting since we won't use this code.

review: Disapprove

Unmerged revisions

952. By Manuel de la Peña on 2012-03-15

Remove unused function.

951. By Manuel de la Peña on 2012-03-15

Merged libsoup-ssl-dialog into pinned-certs.

950. By Manuel de la Peña on 2012-03-14

Merged libsoup-ssl-dialog into pinned-certs.

949. By Manuel de la Peña on 2012-03-14

Added the required code to store the pinned certificates so that the SSL dialog is not shown every time.

948. By Manuel de la Peña on 2012-03-13

Pass the cert pem from the child WebClient implementations to the base class.

947. By Manuel de la Peña on 2012-03-12

Reduced diff size.

946. By Manuel de la Peña on 2012-03-12

Reduced diff size.

945. By Manuel de la Peña on 2012-03-12

Merged with trunk.

944. By Manuel de la Peña on 2012-03-12

Added the required code to let the libsoup webclient implementation deal with ssl cert errors.

943. By Manuel de la Peña on 2012-03-12

Made changes to the tests so that they work correctly.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'ubuntu_sso/utils/webclient/common.py'
2--- ubuntu_sso/utils/webclient/common.py 2012-03-15 11:43:19 +0000
3+++ ubuntu_sso/utils/webclient/common.py 2012-03-15 11:43:19 +0000
4@@ -21,6 +21,7 @@
5
6 from httplib2 import iri2uri
7 from oauth import oauth
8+from OpenSSL.crypto import load_certificate, FILETYPE_PEM
9 from twisted.internet import defer
10 from urlparse import urlparse
11
12@@ -28,6 +29,8 @@
13 from ubuntu_sso.logger import setup_logging
14 from ubuntu_sso.utils.runner import spawn_program
15 from ubuntu_sso.utils.ui import SSL_DETAILS_TEMPLATE
16+from ubuntu_sso.utils.ui import SSL_DETAILS_TEMPLATE
17+from ubuntu_sso.utils.webclient.ssl_certs import PinnedSSLCertsVault
18 from ubuntu_sso.utils.webclient.timestamp import TimestampChecker
19
20 SSL_DIALOG = 'ubuntu-sso-ssl-certificate-qt'
21@@ -109,6 +112,7 @@
22 self.proxy_username = None
23 self.proxy_password = None
24 self.oauth_sign_plain = oauth_sign_plain
25+ self.ssl_vault = PinnedSSLCertsVault()
26
27 def request(self, iri, method="GET", extra_headers=None,
28 oauth_credentials=None, post_content=None):
29@@ -238,9 +242,16 @@
30 return_code)
31 defer.returnValue(False)
32
33- def format_ssl_details(self, details):
34+ def get_ssl_subject_details(self, cert):
35 """Return a formatted string with the details."""
36- return SSL_DETAILS_TEMPLATE % details
37+ cert_subject = cert.get_subject()
38+ details = dict(organization=cert_subject.O,
39+ common_name=cert_subject.CN,
40+ locality_name=cert_subject.L,
41+ unit=cert_subject.OU,
42+ country_name=cert_subject.C,
43+ state_name=cert_subject.ST)
44+ return details
45
46 def _launch_ssl_dialog(self, domain, details):
47 """Launch a dialog used to approve the ssl cert."""
48@@ -252,16 +263,18 @@
49 '--appname', self.appname)
50 return spawn_program(args)
51
52- def _was_ssl_accepted(self, cert_details):
53- """Return if the cert was already accepted."""
54- # TODO: Ensure that we look at pinned certs in a following branch
55- return False
56-
57 @defer.inlineCallbacks
58- def request_ssl_cert_approval(self, domain, details):
59+ def request_ssl_cert_approval(self, cert_pem):
60 """Request the user for ssl approval."""
61- if self._was_ssl_accepted(details):
62+ cert = load_certificate(FILETYPE_PEM, cert_pem)
63+ if self.ssl_vault.is_certificate_pinned(cert):
64 defer.returnValue(True)
65
66- return_code = yield self._launch_ssl_dialog(domain, details)
67+ subject_details = self.get_ssl_subject_details(cert)
68+ details = SSL_DETAILS_TEMPLATE % subject_details
69+ return_code = yield self._launch_ssl_dialog(
70+ subject_details['common_name'],
71+ details)
72+ if return_code == USER_SUCCESS:
73+ self.ssl_vault.store_certificate(cert)
74 defer.returnValue(return_code == USER_SUCCESS)
75
76=== modified file 'ubuntu_sso/utils/webclient/libsoup.py'
77--- ubuntu_sso/utils/webclient/libsoup.py 2012-03-15 11:43:19 +0000
78+++ ubuntu_sso/utils/webclient/libsoup.py 2012-03-15 11:43:19 +0000
79@@ -17,7 +17,6 @@
80
81 import httplib
82
83-from OpenSSL.crypto import load_certificate, FILETYPE_PEM
84 from twisted.internet import defer
85
86 from ubuntu_sso.logger import setup_logging
87@@ -67,17 +66,9 @@
88 if message.status_code == httplib.OK:
89 response = self._get_response(message)
90 if is_ssl and errors.real != 0:
91- cert = load_certificate(FILETYPE_PEM,
92- raw_cert.props.certificate_pem)
93- cert_subject = cert.get_subject()
94- ssl_details = dict(organization=cert_subject.O,
95- common_name=cert_subject.CN,
96- locality_name=cert_subject.L,
97- unit=cert_subject.OU,
98- country_name=cert_subject.C,
99- state_name=cert_subject.ST)
100 e = SSLCertError(message.reason_phrase,
101- ssl_details=ssl_details, response=response)
102+ ssl_details=raw_cert.props.certificate_pem,
103+ response=response)
104 d.errback(e)
105 else:
106 d.callback(response)
107@@ -136,8 +127,7 @@
108 failure.trap(SSLCertError)
109 logger.debug('SSL cert errors found.')
110 trust_cert = yield self.request_ssl_cert_approval(
111- failure.value.ssl_details['common_name'],
112- self.format_ssl_details(failure.value.ssl_details))
113+ failure.value.ssl_details)
114 if trust_cert:
115 defer.returnValue(failure.value.response)
116
117
118=== modified file 'ubuntu_sso/utils/webclient/qtnetwork.py'
119--- ubuntu_sso/utils/webclient/qtnetwork.py 2012-03-09 12:04:31 +0000
120+++ ubuntu_sso/utils/webclient/qtnetwork.py 2012-03-15 11:43:19 +0000
121@@ -30,7 +30,6 @@
122 QNetworkProxyFactory,
123 QNetworkReply,
124 QNetworkRequest,
125- QSslCertificate,
126 )
127 from twisted.internet import defer
128
129@@ -266,31 +265,12 @@
130 exception = WebClientError(error_string, content)
131 d.errback(exception)
132
133- def _get_certificate_details(self, cert):
134- """Return an string with the details of the certificate."""
135- detail_titles = {QSslCertificate.Organization: 'organization',
136- QSslCertificate.CommonName: 'common_name',
137- QSslCertificate.LocalityName: 'locality_name',
138- QSslCertificate.OrganizationalUnitName: 'unit',
139- QSslCertificate.CountryName: 'country_name',
140- QSslCertificate.StateOrProvinceName: 'state_name'}
141- details = {}
142- for info, title in detail_titles.iteritems():
143- details[title] = str(cert.issuerInfo(info))
144- return self.format_ssl_details(details)
145-
146- def _get_certificate_host(self, cert):
147- """Return the host of the cert."""
148- return str(cert.issuerInfo(QSslCertificate.CommonName))
149-
150 @defer.inlineCallbacks
151 def _handle_ssl_errors(self, reply, errors):
152 """Handle the case in which we got an ssl error."""
153 # ask the user if the cer should be trusted
154- cert = errors[0].certificate()
155- trust_cert = yield self.request_ssl_cert_approval(
156- self._get_certificate_host(cert),
157- self._get_certificate_details(cert))
158+ cert_pem = errors[0].certificate().toPem()
159+ trust_cert = yield self.request_ssl_cert_approval(cert_pem)
160 if trust_cert:
161 reply.ignoreSslErrors()
162
163
164=== added file 'ubuntu_sso/utils/webclient/ssl_certs.py'
165--- ubuntu_sso/utils/webclient/ssl_certs.py 1970-01-01 00:00:00 +0000
166+++ ubuntu_sso/utils/webclient/ssl_certs.py 2012-03-15 11:43:19 +0000
167@@ -0,0 +1,105 @@
168+# -*- coding: utf-8 -*-
169+#
170+# Copyright 2011 Canonical Ltd.
171+#
172+# This program is free software: you can redistribute it and/or modify it
173+# under the terms of the GNU General Public License version 3, as published
174+# by the Free Software Foundation.
175+#
176+# This program is distributed in the hope that it will be useful, but
177+# WITHOUT ANY WARRANTY; without even the implied warranties of
178+# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
179+# PURPOSE. See the GNU General Public License for more details.
180+#
181+# You should have received a copy of the GNU General Public License along
182+# with this program. If not, see <http://www.gnu.org/licenses/>.
183+"""Store and retrieve certificates that have been pinned."""
184+
185+import base64
186+import os
187+import tempfile
188+
189+from ubuntu_sso import xdg_base_directory
190+from ubuntu_sso.logger import setup_logging
191+
192+PINNED_CERT_LINE_FORMAT = ('%(domain)s:%(port)s\t%(hash_algorithm)s\t'
193+ '%(fingerprint)s\t%(override_type)s\t'
194+ '%(serial_number)s\n')
195+HTTPS_PORT = 443
196+HASH_ALGORITHM = ('sha1', 'OID.2.16.840.1.101.3.4.2.1')
197+DEFAULT_OVERRIDE_TYPE = 'MUT'
198+PINNED_CERT_DIR = os.path.join(xdg_base_directory.xdg_cache_home, 'sso',
199+ 'ssl-certs')
200+
201+logger = setup_logging("ubuntu_sso.utils.webclient.ssl_certs")
202+
203+
204+class PinnedSSLCertsVault(object):
205+ """Represents a vault that will store the pinned certs."""
206+
207+ def __init__(self, certs_path=PINNED_CERT_DIR):
208+ """Create a new instance."""
209+ self.certs_path = certs_path
210+ if not os.path.exists(self.certs_path):
211+ os.makedirs(self.certs_path)
212+
213+ def _get_pinned_certificate_line(self, cert):
214+ """Return the line of a pinned cert.
215+
216+ The following format is used:
217+ Fields are separated by a tab character. Each line is terminated
218+ by a line feed character (UNIX format).
219+
220+ 1. domainname:port : port 443 for HTTPS (SSL)
221+ 2. hash algorithm OID:
222+ SHA1-256: OID.2.16.840.1.101.3.4.2.1 (most used)
223+ SHA-384: OID.2.16.840.1.101.3.4.2.2
224+ SHA-512: OID.2.16.840.1.101.3.4.2.3
225+ 3. Certificate fingerprint using previous hash algorithm
226+ 4. One or more characters for override type:
227+ M : allow mismatches in the hostname
228+ U : allow untrusted certs (whether it's self signed cert or
229+ a missing or invalid issuer cert)
230+ T : allow errors in the validity time, for example, for expired or
231+ not yet valid certs
232+ 5. Certificate's serial number and the issuer name as a base64
233+ encoded string
234+
235+ more details can be found at:
236+ https://developer.mozilla.org/En/Cert_override.txt
237+ """
238+ cert_subject = cert.get_subject()
239+ serial_number = '%s@%s' % (cert.get_serial_number(), cert.get_issuer())
240+ details = dict(domain=cert_subject.CN,
241+ port=HTTPS_PORT,
242+ hash_algorithm=HASH_ALGORITHM[1],
243+ fingerprint=cert.digest(HASH_ALGORITHM[0]),
244+ override_type=DEFAULT_OVERRIDE_TYPE,
245+ serial_number=base64.b64encode(serial_number))
246+ return PINNED_CERT_LINE_FORMAT % details
247+
248+ def _get_cert_file_name(self, cert):
249+ """Return the file name to use with the cert."""
250+ fingerprint = cert.digest(HASH_ALGORITHM[0])
251+ return fingerprint.replace(':', '-')
252+
253+ def store_certificate(self, cert):
254+ """Store a new certificate."""
255+ _, temp_path = tempfile.mkstemp(prefix='tmp-', dir=self.certs_path)
256+ with open(temp_path, 'wb') as fd:
257+ fd.write(self._get_pinned_certificate_line(cert))
258+ logger.debug('Temp file %s written', temp_path)
259+ fingerprint = self._get_cert_file_name(cert)
260+ os.rename(temp_path, os.path.join(self.certs_path, fingerprint))
261+ logger.debug('Cert with fingerprint %s added.', fingerprint)
262+
263+ def is_certificate_pinned(self, cert):
264+ """Return if a certificate was already pinned."""
265+ fingerprint = self._get_cert_file_name(cert)
266+ return fingerprint in self.pinned_certificates
267+
268+ @property
269+ def pinned_certificates(self):
270+ """Return all the fingerprints of the stored certs."""
271+ return [name for name in os.listdir(self.certs_path) if not
272+ name.startswith('tmp-')]
273
274=== added file 'ubuntu_sso/utils/webclient/tests/test_ssl_certs.py'
275--- ubuntu_sso/utils/webclient/tests/test_ssl_certs.py 1970-01-01 00:00:00 +0000
276+++ ubuntu_sso/utils/webclient/tests/test_ssl_certs.py 2012-03-15 11:43:19 +0000
277@@ -0,0 +1,151 @@
278+# -*- coding: utf-8 -*-
279+#
280+# Copyright 2011 Canonical Ltd.
281+#
282+# This program is free software: you can redistribute it and/or modify it
283+# under the terms of the GNU General Public License version 3, as published
284+# by the Free Software Foundation.
285+#
286+# This program is distributed in the hope that it will be useful, but
287+# WITHOUT ANY WARRANTY; without even the implied warranties of
288+# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
289+# PURPOSE. See the GNU General Public License for more details.
290+#
291+# You should have received a copy of the GNU General Public License along
292+# with this program. If not, see <http://www.gnu.org/licenses/>.
293+"""Tests for the ssl certs operations."""
294+
295+import os
296+import tempfile
297+
298+from OpenSSL import crypto
299+from socket import gethostname
300+from twisted.internet import defer, reactor
301+from twisted.trial.unittest import TestCase
302+
303+from ubuntu_sso.utils.webclient import ssl_certs
304+
305+
306+class PinnedSSLCertsVaultTestCase(TestCase):
307+ """Test the addition of a new pinned cert."""
308+
309+ @defer.inlineCallbacks
310+ def setUp(self):
311+ """Set the diff tests."""
312+ yield super(PinnedSSLCertsVaultTestCase, self).setUp()
313+ self.cert_details = dict(organization='Canonical',
314+ common_name=gethostname(),
315+ locality_name='London',
316+ unit='Ubuntu One',
317+ country_name='UK',
318+ state_name='London',)
319+ self.cert = self._generate_cert(self.cert_details)
320+
321+ self.certs_path = self.mktemp()
322+ os.mkdir(self.certs_path)
323+ self.vault = ssl_certs.PinnedSSLCertsVault(self.certs_path)
324+
325+ def _generate_cert(self, cert_details):
326+ """Return a cert using the given details."""
327+ # create a key pair
328+ key = crypto.PKey()
329+ key.generate_key(crypto.TYPE_RSA, 1024)
330+
331+ # create a self-signed cert
332+ cert = crypto.X509()
333+ cert.get_subject().C = cert_details['country_name']
334+ cert.get_subject().ST = cert_details['state_name']
335+ cert.get_subject().L = cert_details['locality_name']
336+ cert.get_subject().O = cert_details['organization']
337+ cert.get_subject().OU = cert_details['unit']
338+ cert.get_subject().CN = cert_details['common_name']
339+ cert.set_serial_number(1000)
340+ cert.gmtime_adj_notBefore(0)
341+ cert.gmtime_adj_notAfter(10 * 365 * 24 * 60 * 60)
342+ cert.set_issuer(cert.get_subject())
343+ cert.set_pubkey(key)
344+ cert.sign(key, 'sha1')
345+
346+ return cert
347+
348+ def test_missing_dir(self):
349+ """Test addind a pinned cert when the dir is missing."""
350+ os.rmdir(self.certs_path)
351+ self.vault = ssl_certs.PinnedSSLCertsVault(self.certs_path)
352+ self.assertTrue(os.path.exists(self.certs_path))
353+
354+ def test_pinned_certificates(self):
355+ """Test getting the list of pinned certificates."""
356+ # create number of certs and ensure that the list is there.
357+ added = []
358+
359+ for index in (1, 2, 3):
360+ cert_details = self.cert_details.copy()
361+ cert_details['common_name'] = \
362+ cert_details['common_name'] + str(index)
363+ cert = self._generate_cert(cert_details)
364+ self.vault.store_certificate(cert)
365+ added.append(
366+ cert.digest(ssl_certs.HASH_ALGORITHM[0]).replace(':', '-'))
367+
368+ for fingerprint in added:
369+ self.assertIn(fingerprint, self.vault.pinned_certificates)
370+
371+ def test_pinned_certificates_temp_files(self):
372+ """Test getting the list of pinned certificates."""
373+ # create a number of tmp files and assert that they are not returned.
374+ tempfiles = []
375+ added = []
376+
377+ for index in (1, 2, 3):
378+ _, temp_path = tempfile.mkstemp(prefix='tmp-', dir=self.certs_path)
379+ with open(temp_path, 'w') as fd:
380+ fd.write("I'm a test!")
381+ tempfiles.append(temp_path)
382+
383+ for index in (1, 2, 3):
384+ cert_details = self.cert_details.copy()
385+ cert_details['common_name'] = \
386+ cert_details['common_name'] + str(index)
387+ cert = self._generate_cert(cert_details)
388+ self.vault.store_certificate(cert)
389+ added.append(
390+ cert.digest(ssl_certs.HASH_ALGORITHM[0]).replace(':', '-'))
391+
392+ for fingerprint in added:
393+ self.assertIn(fingerprint, self.vault.pinned_certificates)
394+
395+ for temp in tempfiles:
396+ self.assertNotIn(temp, self.vault.pinned_certificates)
397+
398+ def test_adding_certificated(self):
399+ """Test adding a new certificate."""
400+ self.vault.store_certificate(self.cert)
401+ fingerprint = self.cert.digest(
402+ ssl_certs.HASH_ALGORITHM[0]).replace(':', '-')
403+ self.assertIn(fingerprint, os.listdir(self.certs_path))
404+
405+ def _assert_added(self):
406+ """Assert the addition with multiple threads."""
407+ vault = ssl_certs.PinnedSSLCertsVault(self.certs_path)
408+ vault.store_certificate(self.cert)
409+ fingerprint = self.cert.digest(
410+ ssl_certs.HASH_ALGORITHM[0]).replace(':', '-')
411+ self.assertIn(fingerprint, vault.pinned_certificates)
412+ self.assertIn(fingerprint, os.listdir(self.certs_path))
413+
414+ def test_stepping_on_each_other(self):
415+ """Test using several vaults in threads."""
416+ # pylint: disable=E1101
417+ for _ in (1, 2, 3, 4, 5):
418+ reactor.callInThread(self._assert_added)
419+ # pylint: enable=E1101
420+
421+ def test_get_cert_filename(self):
422+ """Assert that the filename is correct."""
423+ expected = self.cert.digest(
424+ ssl_certs.HASH_ALGORITHM[0]).replace(':', '-')
425+ # pylint: disable=W0212
426+ filename = self.vault._get_cert_file_name(self.cert)
427+ # pylint: enable=W0212
428+ self.assertEqual(expected, filename)
429
430=== modified file 'ubuntu_sso/utils/webclient/tests/test_webclient.py'
431--- ubuntu_sso/utils/webclient/tests/test_webclient.py 2012-03-15 11:43:19 +0000
432+++ ubuntu_sso/utils/webclient/tests/test_webclient.py 2012-03-15 11:43:19 +0000
433@@ -892,7 +892,7 @@
434 with open(key_path, 'wt') as fd:
435 fd.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, key))
436
437- return dict(key=key_path, cert=cert_path)
438+ return dict(key=key_path, cert=cert_path, cert_info=cert)
439
440
441 class CorrectProxyTestCase(BaseSSLTestCase):
442@@ -1012,6 +1012,8 @@
443 details = SSL_DETAILS_TEMPLATE % self.cert_details
444 self.assertIn(('_launch_ssl_dialog', gethostname(), details),
445 self.called)
446+ self.assertTrue(self.wc.ssl_vault.is_certificate_pinned(
447+ self.ssl_settings['cert_info']))
448
449 def test_ssl_fail_dialog_user_accepts(self):
450 """Test showing the dialog and accepting."""
451@@ -1026,12 +1028,24 @@
452 """Test showing the dialog and rejecting."""
453 self.failUnlessFailure(self.wc.request(self.base_iri + SIMPLERESOURCE),
454 webclient.WebClientError)
455-
456- def test_format_ssl_details(self):
457+ # assert the cert was not pinned
458+ self.assertFalse(self.wc.ssl_vault.is_certificate_pinned(
459+ self.ssl_settings['cert_info']))
460+
461+ @defer.inlineCallbacks
462+ def test_ssl_fail_pinned_cert(self):
463+ """Test when the cert is pinned."""
464+ self.patch(self.wc.ssl_vault, 'is_certificate_pinned', lambda _: True)
465+ result = yield self.wc.request(self.base_ssl_iri + SIMPLERESOURCE)
466+ details = SSL_DETAILS_TEMPLATE % self.cert_details
467+ self.assertNotIn(('_launch_ssl_dialog', gethostname(), details),
468+ self.called)
469+ self.assertEqual(SAMPLE_RESOURCE, result.content)
470+
471+ def test_get_ssl_subject_details(self):
472 """Assert that details are correctly formatted"""
473- details = SSL_DETAILS_TEMPLATE % self.cert_details
474- self.assertEqual(details,
475- self.wc.format_ssl_details(self.cert_details))
476+ self.assertEqual(self.cert_details,
477+ self.wc.get_ssl_subject_details(self.ssl_settings['cert_info']))
478
479 if WEBCLIENT_MODULE_NAME.endswith(".txweb"):
480 reason = 'SSL support has not yet been implemented.'

Subscribers

People subscribed via source and target branches