Merge lp:~nataliabidart/ubuntu-sso-client/add-oauth-to-url into lp:ubuntu-sso-client

Proposed by Natalia Bidart on 2012-01-27
Status: Merged
Approved by: Natalia Bidart on 2012-01-30
Approved revision: 841
Merged at revision: 841
Proposed branch: lp:~nataliabidart/ubuntu-sso-client/add-oauth-to-url
Merge into: lp:ubuntu-sso-client
Diff against target: 203 lines (+107/-32)
2 files modified
ubuntu_sso/utils/webclient/common.py (+23/-3)
ubuntu_sso/utils/webclient/tests/test_webclient.py (+84/-29)
To merge this branch: bzr merge lp:~nataliabidart/ubuntu-sso-client/add-oauth-to-url
Reviewer Review Type Date Requested Status
Alejandro J. Cura (community) 2012-01-27 Approve on 2012-01-27
Review via email: mp+90522@code.launchpad.net

Commit Message

- Add a method to obtain an OAuth signed uri.

To post a comment you must log in.
Alejandro J. Cura (alecu) wrote :

Looks great!

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'ubuntu_sso/utils/webclient/common.py'
2--- ubuntu_sso/utils/webclient/common.py 2012-01-25 15:02:50 +0000
3+++ ubuntu_sso/utils/webclient/common.py 2012-01-27 20:24:24 +0000
4@@ -19,6 +19,7 @@
5
6 from httplib2 import iri2uri
7 from oauth import oauth
8+from twisted.internet import defer
9
10 from ubuntu_sso.utils.webclient.timestamp import TimestampChecker
11
12@@ -103,18 +104,21 @@
13 def iri_to_uri(self, iri):
14 """Transform a unicode iri into a ascii uri."""
15 if not isinstance(iri, unicode):
16- raise TypeError
17+ raise TypeError('iri %r should be unicode.' % iri)
18 return bytes(iri2uri(iri))
19
20- def build_oauth_headers(self, method, uri, credentials, timestamp):
21+ def build_oauth_request(self, method, uri, credentials, timestamp,
22+ parameters=None):
23 """Build an oauth request given some credentials."""
24 consumer = oauth.OAuthConsumer(credentials["consumer_key"],
25 credentials["consumer_secret"])
26 token = oauth.OAuthToken(credentials["token"],
27 credentials["token_secret"])
28- parameters = {}
29+ if parameters is None:
30+ parameters = {}
31 if timestamp:
32 parameters["oauth_timestamp"] = timestamp
33+
34 request = oauth.OAuthRequest.from_consumer_and_token(
35 http_url=uri,
36 http_method=method,
37@@ -126,7 +130,23 @@
38 else:
39 sig_method = oauth.OAuthSignatureMethod_HMAC_SHA1()
40 request.sign_request(sig_method, consumer, token)
41+ return request
42+
43+ def build_oauth_headers(self, method, uri, credentials, timestamp):
44+ """Build oauth request headers given some credentials."""
45+ request = self.build_oauth_request(method, uri, credentials, timestamp)
46 return request.to_header()
47
48+ @defer.inlineCallbacks
49+ def build_signed_iri(self, iri, credentials, parameters=None):
50+ """Build a new iri signing 'iri' with 'credentials'."""
51+ uri = self.iri_to_uri(iri)
52+ timestamp = yield self.get_timestamp()
53+ request = self.build_oauth_request(method='GET', uri=uri,
54+ credentials=credentials,
55+ timestamp=timestamp,
56+ parameters=parameters)
57+ defer.returnValue(request.to_url())
58+
59 def shutdown(self):
60 """Shut down all pending requests (if possible)."""
61
62=== modified file 'ubuntu_sso/utils/webclient/tests/test_webclient.py'
63--- ubuntu_sso/utils/webclient/tests/test_webclient.py 2012-01-27 16:09:40 +0000
64+++ ubuntu_sso/utils/webclient/tests/test_webclient.py 2012-01-27 20:24:24 +0000
65@@ -23,6 +23,7 @@
66 from twisted.cred import checkers, portal
67 from twisted.internet import defer
68 from twisted.web import http, resource, server, guard
69+from urllib2 import urlparse
70
71 from ubuntuone.devtools.testcases import TestCase
72 from ubuntuone.devtools.testcases.squid import SquidTestCase
73@@ -470,8 +471,28 @@
74 self.assertNotIn("cLaVe", hd)
75
76
77-class OAuthTestCase(TestCase):
78- """Test for the oauth signing code."""
79+class OAuthPlainTextTestCase(TestCase):
80+ """Test for the oauth signing code using PLAINTEXT."""
81+
82+ oauth_sign = "PLAINTEXT"
83+ sample_method = "GET"
84+ sample_url = "http://one.ubuntu.com/"
85+ sample_params = {'next': 'yadda-yadda-yo', 'foo': 'naranja fanta'}
86+ timestamp = 1
87+ expected_params = {
88+ "oauth_timestamp": str(timestamp),
89+ "oauth_consumer_key": SAMPLE_CREDENTIALS["consumer_key"],
90+ "oauth_token": SAMPLE_CREDENTIALS["token"],
91+ "oauth_nonce": ANY_VALUE,
92+ "oauth_signature": ANY_VALUE,
93+ }
94+
95+ @defer.inlineCallbacks
96+ def setUp(self):
97+ yield super(OAuthPlainTextTestCase, self).setUp()
98+ self.expected_params["oauth_signature_method"] = self.oauth_sign
99+ oauth_sign_plain = self.oauth_sign == "PLAINTEXT"
100+ self.wc = BaseWebClient(oauth_sign_plain=oauth_sign_plain)
101
102 def parse_oauth_header(self, header):
103 """Parse an oauth header into a tuple of (method, params_dict)."""
104@@ -484,38 +505,72 @@
105
106 return method, params
107
108- def do_build_oauth_headers(self, expected_sign_method, oauth_sign_plain):
109- """Build the oauth headers for a sample request."""
110-
111- sample_method = "GET"
112- sample_url = "http://one.ubuntu.com/"
113- timestamp = 1
114- expected_params = {
115- "oauth_timestamp": str(timestamp),
116- "oauth_consumer_key": SAMPLE_CREDENTIALS["consumer_key"],
117- "oauth_signature_method": expected_sign_method,
118- "oauth_token": SAMPLE_CREDENTIALS["token"],
119- "oauth_nonce": ANY_VALUE,
120- "oauth_signature": ANY_VALUE,
121- }
122-
123- wc = BaseWebClient(oauth_sign_plain=oauth_sign_plain)
124- headers = wc.build_oauth_headers(sample_method,
125- sample_url, SAMPLE_CREDENTIALS, timestamp)
126-
127+ def assert_headers_correct(self, headers):
128+ """Check that 'headers' match expected_params."""
129 method, params = self.parse_oauth_header(headers["Authorization"])
130
131 self.assertEqual(method, "OAuth")
132
133- for k, expected_value in expected_params.iteritems():
134+ for k, expected_value in self.expected_params.iteritems():
135 self.assertIn(k, params)
136 if expected_value is not ANY_VALUE:
137 self.assertEqual(params[k], expected_value)
138
139- def test_build_oauth_headers_hmac_sha1(self):
140- """Build the oauth headers with a sha1 signature."""
141- self.do_build_oauth_headers("HMAC-SHA1", oauth_sign_plain=False)
142-
143- def test_build_oauth_headers_plaintext(self):
144- """Build the oauth headers with a sha1 signature."""
145- self.do_build_oauth_headers("PLAINTEXT", oauth_sign_plain=True)
146+ def test_build_oauth_headers(self):
147+ """Check that the oauth headers are properly built."""
148+ headers = self.wc.build_oauth_headers(self.sample_method,
149+ self.sample_url, SAMPLE_CREDENTIALS, self.timestamp)
150+ self.assert_headers_correct(headers)
151+
152+ def test_build_oauth_request(self, params=None):
153+ """Check that the oauth request are properly built."""
154+ request = self.wc.build_oauth_request(self.sample_method,
155+ self.sample_url, SAMPLE_CREDENTIALS, self.timestamp,
156+ parameters=params)
157+
158+ self.assert_headers_correct(request.to_header())
159+ self.assertEqual(request.http_url, self.sample_url)
160+ if params is not None:
161+ for param, value in params.iteritems():
162+ self.assertIn(param, request.parameters)
163+ actual = request.parameters[param]
164+ self.assertEqual(value, actual)
165+
166+ def test_build_oauth_request_with_params(self):
167+ """Check that the oauth request are properly with params."""
168+ self.test_build_oauth_request(params=self.sample_params)
169+
170+ @defer.inlineCallbacks
171+ def test_build_signed_iri(self, params=None):
172+ """Check that the oauth signed iri is properly built."""
173+ self.patch(self.wc, 'get_timestamp', lambda: self.timestamp)
174+ iri = u'http://foo.bar.baz/ñandú'
175+ uri = self.wc.iri_to_uri(iri)
176+ request = self.wc.build_oauth_request(self.sample_method,
177+ uri, SAMPLE_CREDENTIALS,
178+ self.timestamp, parameters=params)
179+
180+ result = yield self.wc.build_signed_iri(iri, SAMPLE_CREDENTIALS,
181+ parameters=params)
182+
183+ scheme, netloc, path, params, query, _ = urlparse.urlparse(result)
184+
185+ self.assertEqual(urlparse.urljoin(scheme + '://' + netloc, path), uri)
186+
187+ expected = request.parameters
188+ actual = dict(urlparse.parse_qsl(query))
189+ for i in ('oauth_nonce', 'oauth_signature'):
190+ expected.pop(i)
191+ actual.pop(i)
192+ actual['oauth_timestamp'] = int(actual['oauth_timestamp'])
193+ self.assertEqual(expected, actual)
194+
195+ def test_build_signed_iri_with_params(self, params=None):
196+ """Check that the oauth signed iri is properly built with params."""
197+ return self.test_build_signed_iri(params=self.sample_params)
198+
199+
200+class OAuthHmacSha1TestCase(OAuthPlainTextTestCase):
201+ """Test for the oauth signing code using HMAC-SHA1."""
202+
203+ oauth_sign = "HMAC-SHA1"

Subscribers

People subscribed via source and target branches