Merge lp:~alecu/ubuntuone-client/proxy-tunnel-webcalls into lp:ubuntuone-client

Proposed by Alejandro J. Cura
Status: Merged
Approved by: Alejandro J. Cura
Approved revision: 1231
Merged at revision: 1208
Proposed branch: lp:~alecu/ubuntuone-client/proxy-tunnel-webcalls
Merge into: lp:ubuntuone-client
Diff against target: 524 lines (+97/-205)
2 files modified
tests/syncdaemon/test_action_queue.py (+51/-137)
ubuntuone/syncdaemon/action_queue.py (+46/-68)
To merge this branch: bzr merge lp:~alecu/ubuntuone-client/proxy-tunnel-webcalls
Reviewer Review Type Date Requested Status
Roberto Alsina (community) Approve
Eric Casteleijn (community) Approve
Diego Sarmentero (community) Approve
Review via email: mp+97134@code.launchpad.net

Commit message

- Use the txweb webclient from sso for webcalls, so they can be proxied too (LP: #929207, LP: #929212).

Description of the change

- Use the txweb webclient from sso for webcalls, so they can be proxied too. (LP: #929207, LP: #929212)

**NOTE** This branch depends on this SSO branch: lp:~alecu/ubuntu-sso-client/updated-txweb

To post a comment you must log in.
1231. By Alejandro J. Cura

timestamp is already corrected by txweb in sso

Revision history for this message
Alejandro J. Cura (alecu) wrote :

*** NOTE ***

This branch depends on this SSO branch: lp:~alecu/ubuntu-sso-client/updated-txweb

Revision history for this message
Diego Sarmentero (diegosarmentero) wrote :

GREAT WORK!
+1

review: Approve
Revision history for this message
Eric Casteleijn (thisfred) wrote :

Looks good to me

review: Approve
Revision history for this message
Roberto Alsina (ralsina) wrote :

looks good, seems to work...

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'tests/syncdaemon/test_action_queue.py'
2--- tests/syncdaemon/test_action_queue.py 2012-03-06 20:32:14 +0000
3+++ tests/syncdaemon/test_action_queue.py 2012-03-13 03:12:18 +0000
4@@ -23,7 +23,6 @@
5 import operator
6 import os
7 import unittest
8-import urllib2
9 import uuid
10
11 from functools import wraps
12@@ -33,7 +32,7 @@
13
14 from mocker import Mocker, MockerTestCase, ANY, expect
15 from oauth import oauth
16-from twisted.internet import defer, threads, reactor
17+from twisted.internet import defer, reactor
18 from twisted.internet import error as twisted_error
19 from twisted.python.failure import DefaultException, Failure
20 from twisted.web import server
21@@ -399,6 +398,21 @@
22 self.assertEqual(defined_args[0], 'self')
23 self.assertEqual(set(defined_args[1:]), set(evtargs))
24
25+ @defer.inlineCallbacks
26+ def test_get_webclient(self):
27+ """The webclient is created if it does not exist."""
28+ self.assertEqual(self.action_queue.webclient, None)
29+ webclient = yield self.action_queue.get_webclient()
30+ self.assertNotEqual(webclient, None)
31+
32+ @defer.inlineCallbacks
33+ def test_get_webclient_existing(self):
34+ """The webclient is not created again if it exists."""
35+ fake_wc = object()
36+ self.patch(self.action_queue, "webclient", fake_wc)
37+ webclient = yield self.action_queue.get_webclient()
38+ self.assertEqual(webclient, fake_wc)
39+
40
41 class TestLoggingStorageClient(TwistedTestCase):
42 """Tests for ensuring magic hash dont show in logs."""
43@@ -2519,67 +2533,26 @@
44 self.assertEqual(NODE, self.command.node_id)
45 self.assertEqual(True, self.command.is_public)
46
47- def test_run_defers_work_to_thread(self):
48- """Test that work is deferred to a thread."""
49- original = threads.deferToThread
50- self.called = False
51-
52- def check(function):
53- self.called = True
54- self.assertEqual(
55- self.command._change_public_access_http, function)
56- return defer.Deferred()
57-
58- threads.deferToThread = check
59- try:
60- res = self.command._run()
61- finally:
62- threads.deferToThread = original
63-
64- self.assertIsInstance(res, defer.Deferred)
65- self.assertTrue(self.called, "deferToThread was called")
66-
67+ @defer.inlineCallbacks
68 def test_change_public_access_http(self):
69- """Test the blocking portion of the command."""
70- self.called = False
71- def check(request):
72- self.called = True
73- url = 'https://one.ubuntu.com/files/api/set_public/%s:%s' % (
74+ """Test the command."""
75+
76+ def check_webcall(request_iri, method=None, post_content=None):
77+ """Check the webcall made by this command."""
78+ iri = u'https://one.ubuntu.com/files/api/set_public/%s:%s' % (
79 base64.urlsafe_b64encode(VOLUME.bytes).strip("="),
80 base64.urlsafe_b64encode(NODE.bytes).strip("="))
81- self.assertEqual(url, request.get_full_url())
82- self.assertEqual("is_public=True", request.get_data())
83- return StringIO(
84- '{"is_public": true, "public_url": "http://example.com"}')
85-
86- from ubuntuone.syncdaemon import action_queue
87- self.patch(action_queue.timestamp_checker, "get_faithful_time",
88- lambda: 1)
89- action_queue.urlopen = check
90- try:
91- res = self.command._change_public_access_http()
92- finally:
93- action_queue.urlopen = urllib2.urlopen
94-
95+ self.assertEqual(iri, request_iri)
96+ self.assertEqual("is_public=True", post_content)
97+ content = '{"is_public": true, "public_url": "http://example.com"}'
98+ response = action_queue.txweb.Response(content)
99+ return defer.succeed(response)
100+
101+ self.patch(self.action_queue, "webcall", check_webcall)
102+ res = yield self.command._run()
103 self.assertEqual(
104 {'is_public': True, 'public_url': 'http://example.com'}, res)
105
106- def test_change_public_access_http_uses_timestamp(self):
107- """The timestamp is used for oauth signing."""
108- fake_timestamp = 12345678
109-
110- def fake_urlopen(request):
111- """A fake urlopen."""
112- auth = request.headers["Authorization"]
113- expected = 'oauth_timestamp="%d"' % fake_timestamp
114- self.assertIn(expected, auth)
115- return StringIO("[]")
116-
117- self.patch(action_queue.timestamp_checker, "get_faithful_time",
118- lambda: fake_timestamp)
119- self.patch(action_queue, "urlopen", fake_urlopen)
120- self.command._change_public_access_http()
121-
122 def test_handle_success_push_event(self):
123 """Test AQ_CHANGE_PUBLIC_ACCESS_OK is pushed on success."""
124 response = {'is_public': True, 'public_url': 'http://example.com'}
125@@ -2592,8 +2565,7 @@
126 def test_handle_failure_push_event(self):
127 """Test AQ_CHANGE_PUBLIC_ACCESS_ERROR is pushed on failure."""
128 msg = 'Something went wrong'
129- failure = Failure(urllib2.HTTPError(
130- "http://example.com", 500, "Error", [], StringIO(msg)))
131+ failure = Failure(action_queue.txweb.WebClientError("Misc Error", msg))
132 self.command.handle_failure(failure=failure)
133 event = ('AQ_CHANGE_PUBLIC_ACCESS_ERROR',
134 {'share_id': VOLUME, 'node_id': NODE, 'error': msg})
135@@ -2620,11 +2592,11 @@
136 default_url = 'https://one.ubuntu.com/files/api/public_files'
137 request_queue = RequestQueue(action_queue=self.action_queue)
138 command = GetPublicFiles(request_queue)
139- self.assertEqual(command._url, default_url)
140+ self.assertEqual(command._iri, default_url)
141 custom_url = 'http://example.com:1234/files/api/public_files'
142 command_2 = GetPublicFiles(request_queue,
143- base_url='http://example.com:1234')
144- self.assertEqual(command_2._url, custom_url)
145+ base_iri=u'http://example.com:1234')
146+ self.assertEqual(command_2._iri, custom_url)
147
148 def test_change_public_access(self):
149 """Test the get_public_files method.."""
150@@ -2634,75 +2606,39 @@
151 """Test proper inheritance."""
152 self.assertTrue(isinstance(self.command, ActionQueueCommand))
153
154- def test_run_defers_work_to_thread(self):
155- """Test that work is deferred to a thread."""
156- original = threads.deferToThread
157- self.called = False
158-
159- def check(function):
160- self.called = True
161- self.assertEqual(
162- self.command._get_public_files_http, function)
163- return defer.Deferred()
164-
165- threads.deferToThread = check
166- try:
167- res = self.command._run()
168- finally:
169- threads.deferToThread = original
170-
171- self.assertIsInstance(res, defer.Deferred)
172- self.assertTrue(self.called, "deferToThread was called")
173-
174+ @defer.inlineCallbacks
175 def test_get_public_files_http(self):
176- """Test the blocking portion of the command."""
177- self.called = False
178+ """Test the _run method of the command."""
179 node_id = uuid.uuid4()
180 nodekey = '%s' % (base64.urlsafe_b64encode(node_id.bytes).strip("="))
181 node_id_2 = uuid.uuid4()
182 nodekey_2 = '%s' % (base64.urlsafe_b64encode(
183 node_id_2.bytes).strip("="))
184 volume_id = uuid.uuid4()
185- def check(request):
186- self.called = True
187- url = 'https://one.ubuntu.com/files/api/public_files'
188- self.assertEqual(url, request.get_full_url())
189- return StringIO(
190+
191+ def check_webcall(request_iri, method=None):
192+ """Check the webcall made by this command."""
193+ """Check the webcall made by this command."""
194+ iri = u'https://one.ubuntu.com/files/api/public_files'
195+ self.assertEqual(method.upper(), "GET")
196+ self.assertEqual(iri, request_iri)
197+ content = (
198 '[{"nodekey": "%s", "volume_id": null,"public_url": '
199 '"http://example.com"}, '
200 '{"nodekey": "%s", "volume_id": "%s", "public_url": '
201 '"http://example.com"}]' % (nodekey, nodekey_2, volume_id))
202-
203- from ubuntuone.syncdaemon import action_queue
204- self.patch(action_queue.timestamp_checker, "get_faithful_time",
205- lambda: 1)
206- action_queue.urlopen = check
207- try:
208- res = self.command._get_public_files_http()
209- finally:
210- action_queue.urlopen = urllib2.urlopen
211+ response = action_queue.txweb.Response(content)
212+ return defer.succeed(response)
213+
214+ self.patch(self.action_queue, "webcall", check_webcall)
215+ res = yield self.command._run()
216+
217 self.assertEqual([{'node_id': str(node_id), 'volume_id': '',
218 'public_url': 'http://example.com'},
219 {'node_id': str(node_id_2),
220 'volume_id': str(volume_id),
221 'public_url': 'http://example.com'}], res)
222
223- def test_get_public_files_http_uses_timestamp(self):
224- """The timestamp is used for oauth signing."""
225- fake_timestamp = 12345678
226-
227- def fake_urlopen(request):
228- """A fake urlopen."""
229- auth = request.headers["Authorization"]
230- expected = 'oauth_timestamp="%d"' % fake_timestamp
231- self.assertIn(expected, auth)
232- return StringIO("[]")
233-
234- self.patch(action_queue.timestamp_checker, "get_faithful_time",
235- lambda: fake_timestamp)
236- self.patch(action_queue, "urlopen", fake_urlopen)
237- self.command._get_public_files_http()
238-
239 def test_handle_success_push_event(self):
240 """Test AQ_PUBLIC_FILES_LIST_OK is pushed on success."""
241 response = [{'node_id': uuid.uuid4(), 'volume_id':None,
242@@ -2714,8 +2650,7 @@
243 def test_handle_failure_push_event(self):
244 """Test AQ_PUBLIC_FILES_LIST_ERROR is pushed on failure."""
245 msg = 'Something went wrong'
246- failure = Failure(urllib2.HTTPError(
247- "http://example.com", 500, "Error", [], StringIO(msg)))
248+ failure = Failure(action_queue.txweb.WebClientError("Misc Error", msg))
249 self.command.handle_failure(failure=failure)
250 event = ('AQ_PUBLIC_FILES_LIST_ERROR', {'error': msg})
251 self.assertIn(event, self.command.action_queue.event_queue.events)
252@@ -3700,27 +3635,6 @@
253 self.assertEqual('share_name', name)
254 self.assertTrue(read_only)
255
256- @defer.inlineCallbacks
257- def test_create_share_http_uses_timestamp(self):
258- """The timestamp is used for oauth signing."""
259- fake_timestamp = 12345678
260-
261- def fake_urlopen(request):
262- """A fake urlopen."""
263- auth = request.headers["Authorization"]
264- expected = 'oauth_timestamp="%d"' % fake_timestamp
265- self.assertIn(expected, auth)
266-
267- self.patch(action_queue.timestamp_checker, "get_faithful_time",
268- lambda: fake_timestamp)
269- self.patch(action_queue, "urlopen", fake_urlopen)
270- self.user_connect()
271- command = CreateShare(self.request_queue, 'node_id',
272- 'share_to@example.com', 'share_name',
273- ACCESS_LEVEL_RO, 'marker', 'path')
274- self.assertTrue(command.use_http, 'CreateShare should be in http mode')
275- yield command._run()
276-
277 def test_possible_markers(self):
278 """Test that it returns the correct values."""
279 cmd = CreateShare(self.request_queue, 'node_id', 'shareto@example.com',
280
281=== modified file 'ubuntuone/syncdaemon/action_queue.py'
282--- ubuntuone/syncdaemon/action_queue.py 2012-03-10 00:39:26 +0000
283+++ ubuntuone/syncdaemon/action_queue.py 2012-03-13 03:12:18 +0000
284@@ -31,19 +31,18 @@
285 from collections import deque, defaultdict
286 from functools import partial
287 from urllib import urlencode
288-from urllib2 import urlopen, Request, HTTPError
289 from urlparse import urljoin
290
291 import OpenSSL.SSL
292
293 from zope.interface import implements
294-from twisted.internet import reactor, defer, threads, task
295+from twisted.internet import reactor, defer, task
296 from twisted.internet import error as twisted_errors
297 from twisted.names import client as dns_client
298 from twisted.python.failure import Failure, DefaultException
299
300 from oauth import oauth
301-from ubuntu_sso.utils import timestamp_checker
302+from ubuntu_sso.utils.webclient import txweb
303 from ubuntuone import clientdefs
304 from ubuntuone.platform import platform, remove_file
305 from ubuntuone.storageprotocol import protocol_pb2, content_hash
306@@ -685,6 +684,8 @@
307 # credentials
308 self.token = None
309 self.consumer = None
310+ self.credentials = None
311+ self.webclient = None
312
313 self.client = None # an instance of self.protocol
314
315@@ -730,6 +731,7 @@
316
317 def handle_SYS_USER_CONNECT(self, access_token):
318 """Stow the access token away for later use."""
319+ self.credentials = access_token
320 self.token = oauth.OAuthToken(access_token['token'],
321 access_token['token_secret'])
322 self.consumer = oauth.OAuthConsumer(access_token['consumer_key'],
323@@ -818,6 +820,25 @@
324 else:
325 return defer.succeed((self.host, self.port))
326
327+
328+ @defer.inlineCallbacks
329+ def webcall(self, iri, **kwargs):
330+ """Perform a web call to the api servers."""
331+ webclient = yield self.get_webclient()
332+ response = yield webclient.request(iri,
333+ oauth_credentials=self.credentials, **kwargs)
334+ defer.returnValue(response)
335+
336+ @defer.inlineCallbacks
337+ def get_webclient(self):
338+ """Get the webclient, creating it if needed."""
339+ if self.webclient is None:
340+ client = yield self.tunnel_runner.get_client()
341+ self.webclient = txweb.WebClient(connector=client,
342+ appname="Ubuntu One",
343+ oauth_sign_plain=True)
344+ defer.returnValue(self.webclient)
345+
346 @defer.inlineCallbacks
347 def _make_connection(self, result):
348 """Do the real connect call."""
349@@ -1778,36 +1799,23 @@
350 if share_to and re.match(EREGEX, share_to):
351 self.use_http = True
352
353+ @defer.inlineCallbacks
354 def _create_share_http(self, node_id, user, name, read_only):
355 """Create a share using the HTTP Web API method."""
356
357- url = "https://one.ubuntu.com/files/api/offer_share/"
358- method = oauth.OAuthSignatureMethod_PLAINTEXT()
359- timestamp = timestamp_checker.get_faithful_time()
360- parameters = {"oauth_timestamp": timestamp}
361- request = oauth.OAuthRequest.from_consumer_and_token(
362- http_url=url,
363- http_method="POST",
364- parameters=parameters,
365- oauth_consumer=self.action_queue.consumer,
366- token=self.action_queue.token)
367- request.sign_request(method, self.action_queue.consumer,
368- self.action_queue.token)
369+ iri = u"https://one.ubuntu.com/files/api/offer_share/"
370 data = dict(offer_to_email=user,
371 read_only=read_only,
372 node_id=node_id,
373 share_name=name)
374 pdata = urlencode(data)
375- headers = request.to_header()
376- req = Request(url, pdata, headers)
377- urlopen(req)
378+ yield self.action_queue.webcall(iri, method="POST", post_content=pdata)
379
380 def _run(self):
381 """Do the actual running."""
382 if self.use_http:
383 # External user, do the HTTP REST method
384- return threads.deferToThread(self._create_share_http,
385- self.node_id, self.share_to,
386+ return self._create_share_http(self.node_id, self.share_to,
387 self.name,
388 self.access_level != ACCESS_LEVEL_RW)
389 else:
390@@ -1831,7 +1839,7 @@
391 """It didn't work! Push the event."""
392 self.action_queue.event_queue.push('AQ_CREATE_SHARE_ERROR',
393 marker=self.marker,
394- error=failure.getErrorMessage())
395+ error=failure.value[1])
396
397 def _acquire_pathlock(self):
398 """Acquire pathlock."""
399@@ -2113,6 +2121,7 @@
400 self.node_id = node_id
401 self.is_public = is_public
402
403+ @defer.inlineCallbacks
404 def _change_public_access_http(self):
405 """Change public access using the HTTP Web API method."""
406
407@@ -2123,28 +2132,16 @@
408 base64.urlsafe_b64encode(self.share_id.bytes).strip("="),
409 node_key)
410
411- url = "https://one.ubuntu.com/files/api/set_public/%s" % (node_key,)
412- method = oauth.OAuthSignatureMethod_PLAINTEXT()
413- timestamp = timestamp_checker.get_faithful_time()
414- parameters = {"oauth_timestamp": timestamp}
415- request = oauth.OAuthRequest.from_consumer_and_token(
416- http_url=url,
417- http_method="POST",
418- parameters=parameters,
419- oauth_consumer=self.action_queue.consumer,
420- token=self.action_queue.token)
421- request.sign_request(method, self.action_queue.consumer,
422- self.action_queue.token)
423+ iri = u"https://one.ubuntu.com/files/api/set_public/%s" % (node_key,)
424 data = dict(is_public=bool(self.is_public))
425 pdata = urlencode(data)
426- headers = request.to_header()
427- req = Request(url, pdata, headers)
428- response = urlopen(req)
429- return simplejson.load(response)
430+ response = yield self.action_queue.webcall(iri, method="POST",
431+ post_content=pdata)
432+ defer.returnValue(simplejson.loads(response.content))
433
434 def _run(self):
435 """See ActionQueueCommand."""
436- return threads.deferToThread(self._change_public_access_http)
437+ return self._change_public_access_http()
438
439 def handle_success(self, success):
440 """See ActionQueueCommand."""
441@@ -2156,51 +2153,36 @@
442
443 def handle_failure(self, failure):
444 """It didn't work! Push the event."""
445- if issubclass(failure.type, HTTPError):
446- message = failure.value.read()
447- else:
448- message = failure.getErrorMessage()
449 self.action_queue.event_queue.push('AQ_CHANGE_PUBLIC_ACCESS_ERROR',
450 share_id=self.share_id,
451 node_id=self.node_id,
452- error=message)
453+ error=failure.value[1])
454
455
456 class GetPublicFiles(ActionQueueCommand):
457 """Get the list of public files."""
458
459- __slots__ = ('_url',)
460+ __slots__ = ('_iri',)
461 logged_attrs = ActionQueueCommand.logged_attrs + __slots__
462
463- def __init__(self, request_queue, base_url='https://one.ubuntu.com'):
464+ def __init__(self, request_queue, base_iri=u'https://one.ubuntu.com'):
465 super(GetPublicFiles, self).__init__(request_queue)
466- self._url = urljoin(base_url, 'files/api/public_files')
467+ self._iri = urljoin(base_iri, u'files/api/public_files')
468
469+ @defer.inlineCallbacks
470 def _get_public_files_http(self):
471 """Get public files list using the HTTP Web API method."""
472
473- method = oauth.OAuthSignatureMethod_PLAINTEXT()
474- timestamp = timestamp_checker.get_faithful_time()
475- parameters = {"oauth_timestamp": timestamp}
476- request = oauth.OAuthRequest.from_consumer_and_token(
477- http_url=self._url,
478- http_method="GET",
479- parameters=parameters,
480- oauth_consumer=self.action_queue.consumer,
481- token=self.action_queue.token)
482- request.sign_request(method, self.action_queue.consumer,
483- self.action_queue.token)
484- headers = request.to_header()
485- req = Request(self._url, headers=headers)
486- response = urlopen(req)
487- files = simplejson.load(response)
488+ response = yield self.action_queue.webcall(self._iri, method="GET")
489+
490+ files = simplejson.loads(response.content)
491 # translate nodekeys to (volume_id, node_id)
492 for pf in files:
493 _, node_id = self.split_nodekey(pf.pop('nodekey'))
494 volume_id = pf['volume_id']
495 pf['volume_id'] = '' if volume_id is None else volume_id
496 pf['node_id'] = node_id
497- return files
498+ defer.returnValue(files)
499
500 @property
501 def uniqueness(self):
502@@ -2213,7 +2195,7 @@
503
504 def _run(self):
505 """See ActionQueueCommand."""
506- return threads.deferToThread(self._get_public_files_http)
507+ return self._get_public_files_http()
508
509 def handle_success(self, success):
510 """See ActionQueueCommand."""
511@@ -2222,12 +2204,8 @@
512
513 def handle_failure(self, failure):
514 """It didn't work! Push the event."""
515- if issubclass(failure.type, HTTPError):
516- message = failure.value.read()
517- else:
518- message = failure.getErrorMessage()
519 self.action_queue.event_queue.push('AQ_PUBLIC_FILES_LIST_ERROR',
520- error=message)
521+ error=failure.value[1])
522
523 def split_nodekey(self, nodekey):
524 """Split a node key into a share_id, node_id."""

Subscribers

People subscribed via source and target branches