Merge lp:~nataliabidart/ubuntu-sso-client/stable-3-0-update-2.99.2 into lp:ubuntu-sso-client/stable-3-0

Proposed by Natalia Bidart
Status: Merged
Approved by: dobey
Approved revision: 821
Merged at revision: 820
Proposed branch: lp:~nataliabidart/ubuntu-sso-client/stable-3-0-update-2.99.2
Merge into: lp:ubuntu-sso-client/stable-3-0
Diff against target: 8787 lines (+3936/-3449)
37 files modified
ubuntu_sso/account.py (+11/-14)
ubuntu_sso/credentials.py (+61/-25)
ubuntu_sso/logger.py (+20/-0)
ubuntu_sso/main/__init__.py (+257/-111)
ubuntu_sso/main/linux.py (+181/-164)
ubuntu_sso/main/tests/__init__.py (+37/-0)
ubuntu_sso/main/tests/test_clients.py (+373/-0)
ubuntu_sso/main/tests/test_common.py (+911/-155)
ubuntu_sso/main/tests/test_linux.py (+0/-1275)
ubuntu_sso/main/tests/test_windows.py (+18/-926)
ubuntu_sso/main/windows.py (+185/-663)
ubuntu_sso/networkstate/__init__.py (+8/-0)
ubuntu_sso/networkstate/linux.py (+24/-0)
ubuntu_sso/networkstate/tests/test_linux.py (+137/-2)
ubuntu_sso/networkstate/tests/test_windows.py (+64/-0)
ubuntu_sso/networkstate/windows.py (+13/-5)
ubuntu_sso/qt/controllers.py (+12/-3)
ubuntu_sso/qt/tests/__init__.py (+7/-4)
ubuntu_sso/qt/tests/login_u_p.py (+37/-29)
ubuntu_sso/qt/tests/show_gui.py (+36/-20)
ubuntu_sso/qt/tests/test_controllers.py (+0/-2)
ubuntu_sso/tests/__init__.py (+41/-0)
ubuntu_sso/tests/test_account.py (+17/-3)
ubuntu_sso/tests/test_credentials.py (+11/-14)
ubuntu_sso/utils/ipc.py (+361/-0)
ubuntu_sso/utils/tests/test_common.py (+5/-1)
ubuntu_sso/utils/tests/test_ipc.py (+505/-0)
ubuntu_sso/utils/tests/test_txsecrets.py (+22/-0)
ubuntu_sso/utils/webclient/common.py (+16/-4)
ubuntu_sso/utils/webclient/gsettings.py (+63/-0)
ubuntu_sso/utils/webclient/libsoup.py (+32/-4)
ubuntu_sso/utils/webclient/qtnetwork.py (+36/-6)
ubuntu_sso/utils/webclient/restful.py (+52/-0)
ubuntu_sso/utils/webclient/tests/test_gsettings.py (+131/-0)
ubuntu_sso/utils/webclient/tests/test_restful.py (+148/-0)
ubuntu_sso/utils/webclient/tests/test_webclient.py (+96/-16)
ubuntu_sso/utils/webclient/txweb.py (+8/-3)
To merge this branch: bzr merge lp:~nataliabidart/ubuntu-sso-client/stable-3-0-update-2.99.2
Reviewer | Review Type | Date Requested | Status
dobey (community) | | | Approve
Roberto Alsina (community) | | | Approve
Review via email: mp+88896@code.launchpad.net

Commit message

[ Alejandro J. Cura <email address hidden> ]
  - An async, proxy-enabled replacement for lazr.restfulclient
  - Fix the filter that broke the maverick and lucid builds; remove the demo
    script that gave a lint error.
  - Proxy aware webclient with QtNetwork and libsoup backends, and integration
    tests (LP: #884963 LP: #884968 LP: #884970 LP: #884971 LP: #911844).
  - Silence warnings printed by dbus on the test run (LP: #912920).
[ Natalia B. Bidart <email address hidden> ]
  - Remove duplications and unify code for IPC using Twisted's Perspective
    Broker between ubuntu-sso-client and ubuntuone-client (LP: #834730).
[ Diego Sarmentero <email address hidden> ]
  - Fixed: Missing page: No network detected (LP: #804610).

To post a comment you must log in.
Revision history for this message
Roberto Alsina (ralsina) wrote :

+1

review: Approve
Revision history for this message
dobey (dobey) :
review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'ubuntu_sso/account.py'
--- ubuntu_sso/account.py 2011-09-22 14:29:21 +0000
+++ ubuntu_sso/account.py 2012-01-17 16:51:50 +0000
@@ -96,15 +96,12 @@
96class Account(object):96class Account(object):
97 """Login and register users using the Ubuntu Single Sign On service."""97 """Login and register users using the Ubuntu Single Sign On service."""
9898
99 def __init__(self, sso_service_class=None):99 def __init__(self, service_url=None):
100 """Create a new SSO Account manager."""100 """Create a new SSO Account manager."""
101 if sso_service_class is None:101 if service_url is not None:
102 self.sso_service_class = ServiceRoot102 self.service_url = service_url
103 else:103 else:
104 self.sso_service_class = sso_service_class104 self.service_url = os.environ.get('USSOC_SERVICE_URL', SERVICE_URL)
105
106 self.service_url = os.environ.get('USSOC_SERVICE_URL', SERVICE_URL)
107
108 logger.info('Created a new SSO access layer for service url %r',105 logger.info('Created a new SSO access layer for service url %r',
109 self.service_url)106 self.service_url)
110107
@@ -134,7 +131,7 @@
134 """Generate a captcha using the SSO service."""131 """Generate a captcha using the SSO service."""
135 logger.debug('generate_captcha: requesting captcha, filename: %r',132 logger.debug('generate_captcha: requesting captcha, filename: %r',
136 filename)133 filename)
137 sso_service = self.sso_service_class(None, self.service_url)134 sso_service = ServiceRoot(None, self.service_url)
138 captcha = sso_service.captchas.new()135 captcha = sso_service.captchas.new()
139136
140 # download captcha and save to 'filename'137 # download captcha and save to 'filename'
@@ -156,7 +153,7 @@
156 logger.debug('register_user: email: %r password: <hidden>, '153 logger.debug('register_user: email: %r password: <hidden>, '
157 'displayname: %r, captcha_id: %r, captcha_solution: %r',154 'displayname: %r, captcha_id: %r, captcha_solution: %r',
158 email, displayname, captcha_id, captcha_solution)155 email, displayname, captcha_id, captcha_solution)
159 sso_service = self.sso_service_class(None, self.service_url)156 sso_service = ServiceRoot(None, self.service_url)
160 if not self._valid_email(email):157 if not self._valid_email(email):
161 logger.error('register_user: InvalidEmailError for email: %r',158 logger.error('register_user: InvalidEmailError for email: %r',
162 email)159 email)
@@ -185,7 +182,7 @@
185 logger.debug('login: email: %r password: <hidden>, token_name: %r',182 logger.debug('login: email: %r password: <hidden>, token_name: %r',
186 email, token_name)183 email, token_name)
187 basic = BasicHttpAuthorizer(email, password)184 basic = BasicHttpAuthorizer(email, password)
188 sso_service = self.sso_service_class(basic, self.service_url)185 sso_service = ServiceRoot(basic, self.service_url)
189 service = sso_service.authentications.authenticate186 service = sso_service.authentications.authenticate
190187
191 try:188 try:
@@ -209,7 +206,7 @@
209 token['consumer_key'],206 token['consumer_key'],
210 token['consumer_secret'],207 token['consumer_secret'],
211 oauth_token)208 oauth_token)
212 sso_service = self.sso_service_class(authorizer, self.service_url)209 sso_service = ServiceRoot(authorizer, self.service_url)
213210
214 me_info = sso_service.accounts.me()211 me_info = sso_service.accounts.me()
215 key = 'preferred_email'212 key = 'preferred_email'
@@ -232,7 +229,7 @@
232 token['consumer_key'],229 token['consumer_key'],
233 token['consumer_secret'],230 token['consumer_secret'],
234 oauth_token)231 oauth_token)
235 sso_service = self.sso_service_class(authorizer, self.service_url)232 sso_service = ServiceRoot(authorizer, self.service_url)
236 result = sso_service.accounts.validate_email(email_token=email_token)233 result = sso_service.accounts.validate_email(email_token=email_token)
237 logger.info('validate_email: email: %r result: %r', email, result)234 logger.info('validate_email: email: %r result: %r', email, result)
238 if 'errors' in result:235 if 'errors' in result:
@@ -245,7 +242,7 @@
245242
246 def request_password_reset_token(self, email):243 def request_password_reset_token(self, email):
247 """Request a token to reset the password for the account 'email'."""244 """Request a token to reset the password for the account 'email'."""
248 sso_service = self.sso_service_class(None, self.service_url)245 sso_service = ServiceRoot(None, self.service_url)
249 service = sso_service.registrations.request_password_reset_token246 service = sso_service.registrations.request_password_reset_token
250 try:247 try:
251 result = service(email=email)248 result = service(email=email)
@@ -266,7 +263,7 @@
266 'request_password_reset_token'.263 'request_password_reset_token'.
267264
268 """265 """
269 sso_service = self.sso_service_class(None, self.service_url)266 sso_service = ServiceRoot(None, self.service_url)
270 service = sso_service.registrations.set_new_password267 service = sso_service.registrations.set_new_password
271 try:268 try:
272 result = service(email=email, token=token,269 result = service(email=email, token=token,
273270
=== modified file 'ubuntu_sso/credentials.py'
--- ubuntu_sso/credentials.py 2012-01-03 16:04:29 +0000
+++ ubuntu_sso/credentials.py 2012-01-17 16:51:50 +0000
@@ -44,7 +44,6 @@
44from functools import wraps44from functools import wraps
4545
46from twisted.internet import defer46from twisted.internet import defer
47from twisted.internet.defer import inlineCallbacks, returnValue
4847
49from ubuntu_sso import NO_OP, utils48from ubuntu_sso import NO_OP, utils
50from ubuntu_sso.keyring import Keyring49from ubuntu_sso.keyring import Keyring
@@ -105,7 +104,7 @@
105 """Decorate 'f' to catch all errors."""104 """Decorate 'f' to catch all errors."""
106105
107 @wraps(f)106 @wraps(f)
108 @inlineCallbacks107 @defer.inlineCallbacks
109 def inner(self, *a, **kw):108 def inner(self, *a, **kw):
110 """Call 'f' within a try-except block.109 """Call 'f' within a try-except block.
111110
@@ -123,7 +122,7 @@
123 error_dict = {ERROR_KEY: msg,122 error_dict = {ERROR_KEY: msg,
124 ERROR_DETAIL_KEY: traceback.format_exc()}123 ERROR_DETAIL_KEY: traceback.format_exc()}
125 self.error_cb(error_dict)124 self.error_cb(error_dict)
126 returnValue(result)125 defer.returnValue(result)
127126
128 return inner127 return inner
129128
@@ -185,7 +184,7 @@
185 self.inner = None # will hold the GUI or SSOLoginRoot instance184 self.inner = None # will hold the GUI or SSOLoginRoot instance
186185
187 @handle_failures(msg='Problem while retrieving credentials')186 @handle_failures(msg='Problem while retrieving credentials')
188 @inlineCallbacks187 @defer.inlineCallbacks
189 def _login_success_cb(self, app_name, email):188 def _login_success_cb(self, app_name, email):
190 """Store credentials when the login/registration succeeded.189 """Store credentials when the login/registration succeeded.
191190
@@ -209,7 +208,7 @@
209 return208 return
210209
211 self.success_cb(creds)210 self.success_cb(creds)
212 returnValue(0)211 defer.returnValue(0)
213212
214 def _auth_denial_cb(self, app_name):213 def _auth_denial_cb(self, app_name):
215 """The user decided not to allow the registration or login."""214 """The user decided not to allow the registration or login."""
@@ -265,16 +264,49 @@
265 @handle_exceptions(msg='Problem logging with email and password.')264 @handle_exceptions(msg='Problem logging with email and password.')
266 def _do_login(self, email, password):265 def _do_login(self, email, password):
267 """Login using email/password, connect outcome signals."""266 """Login using email/password, connect outcome signals."""
268 from ubuntu_sso.main import SSOLoginRoot267 from ubuntu_sso.main import SSOLogin
269 self.inner = SSOLoginRoot()268
270 self.inner.login(app_name=self.app_name, email=email,269 d = defer.Deferred()
271 password=password,270
272 result_cb=self._login_success_cb,271 class DummyProxy(object):
273 error_cb=self._error_cb,272 """A temporary proxy to handle non-traditional login."""
274 not_validated_cb=self._error_cb)273
274 # pylint: disable=C0103
275
276 def LoggedIn(self, app_name, result):
277 """User was logged in."""
278 d.callback(result)
279
280 def LoginError(self, app_name, error):
281 """There was an error on login."""
282 d.errback(error)
283
284 def UserNotValidated(self, app_name, email):
285 """User is not validated."""
286 d.callback(None)
287
288 # pylint: enable=C0103
289
290 self.inner = SSOLogin(proxy=DummyProxy())
291 self.inner.login(app_name=self.app_name,
292 email=email, password=password)
293
294 def _success(result):
295 """Check if 'result' is a valid token, and callback properly."""
296 if result is not None:
297 return self._login_success_cb(self.app_name, email)
298 else:
299 error_dict = {
300 'errtype': 'UserNotValidated',
301 'message': email,
302 }
303 self._error_cb(self.app_name, error_dict)
304
305 d.addCallback(_success)
306 d.addErrback(lambda f: self._error_cb(self.app_name, f.value))
275307
276 @handle_failures(msg='Problem while retrieving credentials')308 @handle_failures(msg='Problem while retrieving credentials')
277 @inlineCallbacks309 @defer.inlineCallbacks
278 def _login_or_register(self, login_only, email=None, password=None):310 def _login_or_register(self, login_only, email=None, password=None):
279 """Get credentials if found else prompt the GUI."""311 """Get credentials if found else prompt the GUI."""
280 logger.info("_login_or_register: login_only=%r email=%r.",312 logger.info("_login_or_register: login_only=%r email=%r.",
@@ -303,20 +335,20 @@
303 logger.debug('Calling success callback at %r.', self._success_cb)335 logger.debug('Calling success callback at %r.', self._success_cb)
304 self._success_cb(self.app_name, creds)336 self._success_cb(self.app_name, creds)
305337
306 @inlineCallbacks338 @defer.inlineCallbacks
307 def find_credentials(self):339 def find_credentials(self):
308 """Get the credentials for 'self.app_name'. Return {} if not there."""340 """Get the credentials for 'self.app_name'. Return {} if not there."""
309 creds = yield Keyring().get_credentials(self.app_name)341 creds = yield Keyring().get_credentials(self.app_name)
310 logger.info('find_credentials: self.app_name %r, '342 logger.info('find_credentials: self.app_name %r, '
311 'result is {}? %s', self.app_name, creds is None)343 'result is {}? %s', self.app_name, creds is None)
312 returnValue(creds if creds is not None else {})344 defer.returnValue(creds if creds is not None else {})
313345
314 @inlineCallbacks346 @defer.inlineCallbacks
315 def clear_credentials(self):347 def clear_credentials(self):
316 """Clear the credentials for 'self.app_name'."""348 """Clear the credentials for 'self.app_name'."""
317 yield Keyring().delete_credentials(self.app_name)349 yield Keyring().delete_credentials(self.app_name)
318350
319 @inlineCallbacks351 @defer.inlineCallbacks
320 def store_credentials(self, token):352 def store_credentials(self, token):
321 """Store the credentials for 'self.app_name'."""353 """Store the credentials for 'self.app_name'."""
322 yield Keyring().set_credentials(self.app_name, token)354 yield Keyring().set_credentials(self.app_name, token)
@@ -325,11 +357,15 @@
325 """Get credentials if found else prompt the GUI to register."""357 """Get credentials if found else prompt the GUI to register."""
326 return self._login_or_register(login_only=False)358 return self._login_or_register(login_only=False)
327359
328 def login(self):360 def login(self, email=None, password=None):
329 """Get credentials if found else prompt the GUI to login."""361 """Get credentials if found else prompt the GUI to login.
330 return self._login_or_register(login_only=True)362
331363 if 'email' and 'password' are given, do not prompt the user and use
332 def login_email_password(self, email, password):364 that to retrieve a token.
333 """Get credentials if found else login using email and password."""365
334 return self._login_or_register(login_only=True,366 """
335 email=email, password=password)367 if email is None or password is None:
368 return self._login_or_register(login_only=True)
369 else:
370 return self._login_or_register(login_only=True,
371 email=email, password=password)
336372
=== modified file 'ubuntu_sso/logger.py'
--- ubuntu_sso/logger.py 2011-11-09 21:21:47 +0000
+++ ubuntu_sso/logger.py 2012-01-17 16:51:50 +0000
@@ -25,6 +25,7 @@
25import os25import os
26import sys26import sys
2727
28from functools import wraps
28from logging.handlers import RotatingFileHandler29from logging.handlers import RotatingFileHandler
2930
30from ubuntu_sso.xdg_base_directory import unicode_path, xdg_cache_home31from ubuntu_sso.xdg_base_directory import unicode_path, xdg_cache_home
@@ -61,3 +62,22 @@
61 logger.addHandler(debug_handler)62 logger.addHandler(debug_handler)
6263
63 return logger64 return logger
65
66
67def log_call(log_func):
68 """Decorator to log, using 'log_func', calls to functions."""
69
70 def middle(f):
71 """Return a function that will act as 'f' but will log the call."""
72
73 @wraps(f)
74 def inner(instance, *a, **kw):
75 """Call 'f(*a, **kw)' and return its result. Log that call."""
76 log_func('%r: emitting %r with args %r and kwargs %r',
77 instance.__class__.__name__, f.__name__, a, kw)
78 result = f(instance, *a, **kw)
79 return result
80
81 return inner
82
83 return middle
6484
=== modified file 'ubuntu_sso/main/__init__.py'
--- ubuntu_sso/main/__init__.py 2011-12-19 19:35:06 +0000
+++ ubuntu_sso/main/__init__.py 2012-01-17 16:51:50 +0000
@@ -1,9 +1,5 @@
1# -*- coding: utf-8 -*-1# -*- coding: utf-8 -*-
2#2#
3# Author: Natalia Bidart <natalia.bidart@canonical.com>
4# Author: Alejandro J. Cura <alecu@canonical.com>
5# Author: Manuel de la Pena <manuel@canonical.com>
6#
7# Copyright 2011 Canonical Ltd.3# Copyright 2011 Canonical Ltd.
8#4#
9# This program is free software: you can redistribute it and/or modify it5# This program is free software: you can redistribute it and/or modify it
@@ -17,10 +13,18 @@
17#13#
18# You should have received a copy of the GNU General Public License along14# You should have received a copy of the GNU General Public License along
19# with this program. If not, see <http://www.gnu.org/licenses/>.15# with this program. If not, see <http://www.gnu.org/licenses/>.
20"""Main object implementations."""16"""Single Sign On client main module.
17
18Provides a utility which accepts requests to the Ubuntu Single Sign On
19service. The OAuth process is handled, including adding the OAuth access token
20to the local keyring.
21
22"""
2123
22import sys24import sys
2325
26from twisted.internet import defer
27
24from ubuntu_sso.account import Account28from ubuntu_sso.account import Account
25from ubuntu_sso.credentials import (29from ubuntu_sso.credentials import (
26 Credentials,30 Credentials,
@@ -35,16 +39,30 @@
35 WINDOW_ID_KEY,39 WINDOW_ID_KEY,
36)40)
37from ubuntu_sso.keyring import get_token_name, Keyring41from ubuntu_sso.keyring import get_token_name, Keyring
38from ubuntu_sso.logger import setup_logging42from ubuntu_sso.logger import setup_logging, log_call
3943
4044
41logger = setup_logging("ubuntu_sso.main")45logger = setup_logging("ubuntu_sso.main")
42U1_PING_URL = "https://one.ubuntu.com/oauth/sso-finished-so-get-tokens/"46U1_PING_URL = "https://one.ubuntu.com/oauth/sso-finished-so-get-tokens/"
43TIMEOUT_INTERVAL = 10000 # 10 seconds47TIMEOUT_INTERVAL = 10000 # 10 seconds
4448
49# pylint: disable=C0103
50
51if sys.platform == 'win32':
52 from ubuntu_sso.main import windows
53 source = windows
54 TIMEOUT_INTERVAL = 10000000000 # forever (hack)
55else:
56 from ubuntu_sso.main import linux
57 source = linux
58
59UbuntuSSOProxy = source.UbuntuSSOProxy
60get_sso_client = source.get_sso_client
61thread_execute = source.blocking
62
4563
46def except_to_errdict(e):64def except_to_errdict(e):
47 """Turn an exception into a dictionary to return thru DBus."""65 """Turn an exception into a dictionary to return thru IPC."""
48 result = {66 result = {
49 "errtype": e.__class__.__name__,67 "errtype": e.__class__.__name__,
50 }68 }
@@ -58,35 +76,71 @@
58 return result76 return result
5977
6078
61class SSOLoginRoot(object):79class SSOLogin(object):
62 """Login thru the Single Sign On service."""80 """Login thru the Single Sign On service."""
6381
64 def __init__(self, sso_login_processor_class=Account,82 def __init__(self, proxy):
65 sso_service_class=None):
66 """Initiate the Login object."""83 """Initiate the Login object."""
67 self.sso_login_processor_class = sso_login_processor_class84 self.processor = Account()
68 self.processor = self.sso_login_processor_class(85 self.proxy = proxy
69 sso_service_class=sso_service_class)86
7087 @log_call(logger.debug)
71 def generate_captcha(self, app_name, filename, result_cb,88 def CaptchaGenerated(self, app_name, result):
72 error_cb):89 """Signal thrown after the captcha is generated."""
90 self.proxy.CaptchaGenerated(app_name, result)
91
92 @log_call(logger.debug)
93 def CaptchaGenerationError(self, app_name, error):
94 """Signal thrown when there's a problem generating the captcha."""
95 error_dict = except_to_errdict(error)
96 self.proxy.CaptchaGenerationError(app_name, error_dict)
97
98 def generate_captcha(self, app_name, filename):
73 """Call the matching method in the processor."""99 """Call the matching method in the processor."""
74 def f():100 def f():
75 """Inner function that will be run in a thread."""101 """Inner function that will be run in a thread."""
76 return self.processor.generate_captcha(filename)102 return self.processor.generate_captcha(filename)
77 thread_execute(f, app_name, result_cb, error_cb)103 thread_execute(f, app_name,
104 self.CaptchaGenerated, self.CaptchaGenerationError)
105
106 @log_call(logger.debug)
107 def UserRegistered(self, app_name, result):
108 """Signal thrown when the user is registered."""
109 self.proxy.UserRegistered(app_name, result)
110
111 @log_call(logger.debug)
112 def UserRegistrationError(self, app_name, error):
113 """Signal thrown when there's a problem registering the user."""
114 error_dict = except_to_errdict(error)
115 self.proxy.UserRegistrationError(app_name, error_dict)
78116
79 def register_user(self, app_name, email, password, name, captcha_id,117 def register_user(self, app_name, email, password, name, captcha_id,
80 captcha_solution, result_cb, error_cb):118 captcha_solution):
81 """Call the matching method in the processor."""119 """Call the matching method in the processor."""
82 def f():120 def f():
83 """Inner function that will be run in a thread."""121 """Inner function that will be run in a thread."""
84 return self.processor.register_user(email, password, name,122 return self.processor.register_user(email, password, name,
85 captcha_id, captcha_solution)123 captcha_id, captcha_solution)
86 thread_execute(f, app_name, result_cb, error_cb)124 thread_execute(f, app_name,
87125 self.UserRegistered, self.UserRegistrationError)
88 def login(self, app_name, email, password, result_cb,126
89 error_cb, not_validated_cb):127 @log_call(logger.debug)
128 def LoggedIn(self, app_name, result):
129 """Signal thrown when the user is logged in."""
130 self.proxy.LoggedIn(app_name, result)
131
132 @log_call(logger.debug)
133 def LoginError(self, app_name, error):
134 """Signal thrown when there is a problem in the login."""
135 error_dict = except_to_errdict(error)
136 self.proxy.LoginError(app_name, error_dict)
137
138 @log_call(logger.debug)
139 def UserNotValidated(self, app_name, email):
140 """Signal thrown when the user is not validated."""
141 self.proxy.UserNotValidated(app_name, email)
142
143 def login(self, app_name, email, password):
90 """Call the matching method in the processor."""144 """Call the matching method in the processor."""
91 def f():145 def f():
92 """Inner function that will be run in a thread."""146 """Inner function that will be run in a thread."""
@@ -103,18 +157,25 @@
103 is_validated = self.processor.is_validated(credentials)157 is_validated = self.processor.is_validated(credentials)
104 logger.debug('user is validated? %r.', is_validated)158 logger.debug('user is validated? %r.', is_validated)
105 if is_validated:159 if is_validated:
106 # pylint: disable=E1101
107 d = Keyring().set_credentials(app_name, credentials)160 d = Keyring().set_credentials(app_name, credentials)
108 d.addCallback(lambda _: result_cb(app_name, email))161 d.addCallback(lambda _: self.LoggedIn(app_name, email))
109 d.addErrback(lambda failure: \162 d.addErrback(lambda f: self.LoginError(app_name, f.value))
110 error_cb(app_name,
111 except_to_errdict(failure.value)))
112 else:163 else:
113 not_validated_cb(app_name, email)164 self.UserNotValidated(app_name, email)
114 thread_execute(f, app_name, success_cb, error_cb)165 thread_execute(f, app_name, success_cb, self.LoginError)
115166
116 def validate_email(self, app_name, email, password, email_token,167 @log_call(logger.debug)
117 result_cb, error_cb):168 def EmailValidated(self, app_name, result):
169 """Signal thrown after the email is validated."""
170 self.proxy.EmailValidated(app_name, result)
171
172 @log_call(logger.debug)
173 def EmailValidationError(self, app_name, error):
174 """Signal thrown when there's a problem validating the email."""
175 error_dict = except_to_errdict(error)
176 self.proxy.EmailValidationError(app_name, error_dict)
177
178 def validate_email(self, app_name, email, password, email_token):
118 """Call the matching method in the processor."""179 """Call the matching method in the processor."""
119180
120 def f():181 def f():
@@ -126,33 +187,54 @@
126187
127 def success_cb(app_name, credentials):188 def success_cb(app_name, credentials):
128 """Validation finished successfully."""189 """Validation finished successfully."""
129 # pylint: disable=E1101
130 d = Keyring().set_credentials(app_name, credentials)190 d = Keyring().set_credentials(app_name, credentials)
131 d.addCallback(lambda _: result_cb(app_name, email))191 d.addCallback(lambda _: self.EmailValidated(app_name, email))
132 failure_cb = lambda f: error_cb(app_name, f.value)192 failure_cb = lambda f: self.EmailValidationError(app_name, f.value)
133 d.addErrback(failure_cb)193 d.addErrback(failure_cb)
134194
135 thread_execute(f, app_name, success_cb, error_cb)195 thread_execute(f, app_name, success_cb, self.EmailValidationError)
136196
137 def request_password_reset_token(self, app_name, email,197 @log_call(logger.debug)
138 result_cb, error_cb):198 def PasswordResetTokenSent(self, app_name, result):
199 """Signal thrown when the token is succesfully sent."""
200 self.proxy.PasswordResetTokenSent(app_name, result)
201
202 @log_call(logger.debug)
203 def PasswordResetError(self, app_name, error):
204 """Signal thrown when there's a problem sending the token."""
205 error_dict = except_to_errdict(error)
206 self.proxy.PasswordResetError(app_name, error_dict)
207
208 def request_password_reset_token(self, app_name, email):
139 """Call the matching method in the processor."""209 """Call the matching method in the processor."""
140 def f():210 def f():
141 """Inner function that will be run in a thread."""211 """Inner function that will be run in a thread."""
142 return self.processor.request_password_reset_token(email)212 return self.processor.request_password_reset_token(email)
143 thread_execute(f, app_name, result_cb, error_cb)213 thread_execute(f, app_name,
144214 self.PasswordResetTokenSent, self.PasswordResetError)
145 def set_new_password(self, app_name, email, token, new_password,215
146 result_cb, error_cb):216 @log_call(logger.debug)
217 def PasswordChanged(self, app_name, result):
218 """Signal thrown when the token is succesfully sent."""
219 self.proxy.PasswordChanged(app_name, result)
220
221 @log_call(logger.debug)
222 def PasswordChangeError(self, app_name, error):
223 """Signal thrown when there's a problem sending the token."""
224 error_dict = except_to_errdict(error)
225 self.proxy.PasswordChangeError(app_name, error_dict)
226
227 def set_new_password(self, app_name, email, token, new_password):
147 """Call the matching method in the processor."""228 """Call the matching method in the processor."""
148 def f():229 def f():
149 """Inner function that will be run in a thread."""230 """Inner function that will be run in a thread."""
150 return self.processor.set_new_password(email, token,231 return self.processor.set_new_password(email, token,
151 new_password)232 new_password)
152 thread_execute(f, app_name, result_cb, error_cb)233 thread_execute(f, app_name,
153234 self.PasswordChanged, self.PasswordChangeError)
154235
155class CredentialsManagementRoot(object):236
237class CredentialsManagement(object):
156 """Object that manages credentials.238 """Object that manages credentials.
157239
158 Every exposed method in this class requires one mandatory argument:240 Every exposed method in this class requires one mandatory argument:
@@ -178,27 +260,12 @@
178260
179 """261 """
180262
181 def __init__(self, timeout_func, shutdown_func, found_cb, error_cb,263 def __init__(self, timeout_func, shutdown_func, proxy):
182 denied_cb, *args, **kwargs):264 super(CredentialsManagement, self).__init__()
183 """Create a new instance.
184
185 - 'found_cb' is a callback that will be executed when the credentials
186 were found.
187
188 - 'error_cb' is a callback that will be executed when there was an
189 error getting the credentials.
190
191 - 'denied_cb' is a callback that will be executed when the user denied
192 the use of the crendetials.
193
194 """
195 super(CredentialsManagementRoot, self).__init__(*args, **kwargs)
196 self._ref_count = 0265 self._ref_count = 0
197 self.timeout_func = timeout_func266 self.timeout_func = timeout_func
198 self.shutdown_func = shutdown_func267 self.shutdown_func = shutdown_func
199 self.found_cb = found_cb268 self.proxy = proxy
200 self.error_cb = error_cb
201 self.denied_cb = denied_cb
202269
203 def _get_ref_count(self):270 def _get_ref_count(self):
204 """Get value of ref_count."""271 """Get value of ref_count."""
@@ -236,12 +303,54 @@
236 """Retrieve values from the generic param 'args'."""303 """Retrieve values from the generic param 'args'."""
237 result = dict(i for i in args.iteritems() if i[0] in self.valid_keys)304 result = dict(i for i in args.iteritems() if i[0] in self.valid_keys)
238 result[WINDOW_ID_KEY] = int(args.get(WINDOW_ID_KEY, 0))305 result[WINDOW_ID_KEY] = int(args.get(WINDOW_ID_KEY, 0))
239 result[SUCCESS_CB_KEY] = self.found_cb306 result[SUCCESS_CB_KEY] = self.CredentialsFound
240 result[ERROR_CB_KEY] = self.error_cb307 result[ERROR_CB_KEY] = self.CredentialsError
241 result[DENIAL_CB_KEY] = self.denied_cb308 result[DENIAL_CB_KEY] = self.AuthorizationDenied
242 return result309 return result
243310
244 def find_credentials(self, app_name, args, success_cb, error_cb):311 @log_call(logger.info)
312 def AuthorizationDenied(self, app_name):
313 """Signal thrown when the user denies the authorization."""
314 self.ref_count -= 1
315 self.proxy.AuthorizationDenied(app_name)
316
317 # do not use log_call decorator since we should not log credentials
318 def CredentialsFound(self, app_name, credentials):
319 """Signal thrown when the credentials are found."""
320 self.ref_count -= 1
321 logger.info('%s: emitting CredentialsFound with app_name "%s".',
322 self.__class__.__name__, app_name)
323 self.proxy.CredentialsFound(app_name, credentials)
324
325 @log_call(logger.info)
326 def CredentialsNotFound(self, app_name):
327 """Signal thrown when the credentials are not found."""
328 self.ref_count -= 1
329 self.proxy.CredentialsNotFound(app_name)
330
331 @log_call(logger.info)
332 def CredentialsCleared(self, app_name):
333 """Signal thrown when the credentials were cleared."""
334 self.ref_count -= 1
335 self.proxy.CredentialsCleared(app_name)
336
337 @log_call(logger.info)
338 def CredentialsStored(self, app_name):
339 """Signal thrown when the credentials were cleared."""
340 self.ref_count -= 1
341 self.proxy.CredentialsStored(app_name)
342
343 @log_call(logger.error)
344 def CredentialsError(self, app_name, error):
345 """Signal thrown when there is a problem getting the credentials."""
346 self.ref_count -= 1
347 if isinstance(error, dict):
348 error_dict = error
349 else:
350 error_dict = except_to_errdict(error)
351 self.proxy.CredentialsError(app_name, error_dict)
352
353 def find_credentials(self, app_name, args, success_cb=None, error_cb=None):
245 """Look for the credentials for an application.354 """Look for the credentials for an application.
246355
247 - 'app_name': the name of the application which credentials are356 - 'app_name': the name of the application which credentials are
@@ -249,21 +358,47 @@
249358
250 - 'args' is a dictionary, currently not used.359 - 'args' is a dictionary, currently not used.
251360
252 - 'success_cb' is a callback that will be execute if the operation was361 - 'success_cb', if not None, will be executed if the operation was
253 a success.362 a success.
254363
255 - 'error_cb' is a callback that will be executed if the operation had364 - 'error_cb', if not None, will be executed if the operation had
256 an error.365 an error.
257366
258 """367 """
368 def _analize_creds(credentials):
369 """Find credentials and notify using signals."""
370 if credentials is not None and len(credentials) > 0:
371 self.CredentialsFound(app_name, credentials)
372 else:
373 self.CredentialsNotFound(app_name)
374
375 def _tweaked_success_cb(creds):
376 """Decrease ref counter and call 'success_cb'."""
377 self.ref_count -= 1
378 success_cb(creds)
379
380 if success_cb is None:
381 _success_cb = _analize_creds
382 else:
383 _success_cb = _tweaked_success_cb
384
385 def _tweaked_error_cb(error, app):
386 """Decrease ref counter and call 'error_cb', modifying the dict."""
387 self.ref_count -= 1
388 error_cb(except_to_errdict(error.value))
389
390 if error_cb is None:
391 _error_cb = lambda f, _: self.CredentialsError(app_name, f.value)
392 else:
393 _error_cb = _tweaked_error_cb
394
259 self.ref_count += 1395 self.ref_count += 1
260 obj = Credentials(app_name)396 obj = Credentials(app_name)
261 d = obj.find_credentials()397 d = obj.find_credentials()
262 # pylint: disable=E1101398 d.addCallback(_success_cb)
263 d.addCallback(success_cb)399 d.addErrback(_error_cb, app_name)
264 d.addErrback(error_cb, app_name)
265400
266 def clear_credentials(self, app_name, args, success_cb, error_cb):401 def clear_credentials(self, app_name, args):
267 """Clear the credentials for an application.402 """Clear the credentials for an application.
268403
269 - 'app_name': the name of the application which credentials are404 - 'app_name': the name of the application which credentials are
@@ -271,21 +406,14 @@
271406
272 - 'args' is a dictionary, currently not used.407 - 'args' is a dictionary, currently not used.
273408
274 - 'success_cb' is a callback that will be execute if the operation was
275 a success.
276
277 - 'error_cb' is a callback that will be executed if the operation had
278 an error.
279
280 """409 """
281 self.ref_count += 1410 self.ref_count += 1
282 obj = Credentials(app_name)411 obj = Credentials(app_name)
283 d = obj.clear_credentials()412 d = obj.clear_credentials()
284 # pylint: disable=E1101413 d.addCallback(lambda _: self.CredentialsCleared(app_name))
285 d.addCallback(success_cb)414 d.addErrback(lambda f: self.CredentialsError(app_name, f.value))
286 d.addErrback(error_cb, app_name)
287415
288 def store_credentials(self, app_name, args, success_cb, error_cb):416 def store_credentials(self, app_name, args):
289 """Store the token for an application.417 """Store the token for an application.
290418
291 - 'app_name': the name of the application which credentials are419 - 'app_name': the name of the application which credentials are
@@ -295,18 +423,12 @@
295 the following mandatory keys: 'token', 'token_key', 'consumer_key',423 the following mandatory keys: 'token', 'token_key', 'consumer_key',
296 'consumer_secret'.424 'consumer_secret'.
297425
298 - 'success_cb' is a callback that will be execute if the operation was
299 a success.
300
301 - 'error_cb' is a callback that will be executed if the operation had
302 an error.
303 """426 """
304 self.ref_count += 1427 self.ref_count += 1
305 obj = Credentials(app_name)428 obj = Credentials(app_name)
306 d = obj.store_credentials(args)429 d = obj.store_credentials(args)
307 # pylint: disable=E1101430 d.addCallback(lambda _: self.CredentialsStored(app_name))
308 d.addCallback(success_cb)431 d.addErrback(lambda f: self.CredentialsError(app_name, f.value))
309 d.addErrback(error_cb, app_name)
310432
311 def register(self, app_name, args):433 def register(self, app_name, args):
312 """Get credentials if found else prompt GUI to register."""434 """Get credentials if found else prompt GUI to register."""
@@ -331,23 +453,47 @@
331 email = args.pop('email')453 email = args.pop('email')
332 password = args.pop('password')454 password = args.pop('password')
333 obj = Credentials(app_name, **self._parse_args(args))455 obj = Credentials(app_name, **self._parse_args(args))
334 obj.login_email_password(email=email, password=password)456 obj.login(email=email, password=password)
335457
336
337# pylint: disable=C0103
338
339if sys.platform == 'win32':
340 from ubuntu_sso.main import windows
341 source = windows
342 TIMEOUT_INTERVAL = 10000000000 # forever
343else:
344 from ubuntu_sso.main import linux
345 source = linux
346
347CredentialsManagement = source.CredentialsManagement
348get_sso_login_backend = source.get_sso_login_backend
349main = source.main
350SSOLogin = source.SSOLogin
351thread_execute = source.blocking
352458
353# pylint: enable=C0103459# pylint: enable=C0103
460
461class UbuntuSSOService(object):
462 """Manager that exposes the diff referenceable objects."""
463
464 def __init__(self):
465 self.proxy = UbuntuSSOProxy(self)
466 self.sso_login = None
467 self.cred_manager = None
468
469 @defer.inlineCallbacks
470 def start(self):
471 """Start the service."""
472 logger.debug('Starting up Ubuntu SSO service...')
473 try:
474 yield self.proxy.start()
475 except:
476 logger.exception('Can not start Ubuntu SSO service:')
477 raise
478 else:
479 logger.info('Ubuntu SSO service started.')
480
481 self.sso_login = SSOLogin(proxy=self.proxy.sso_login)
482 self.cred_manager = CredentialsManagement(
483 timeout_func=source.timeout_func,
484 shutdown_func=source.shutdown_func,
485 proxy=self.proxy.cred_manager)
486
487 def shutdown(self):
488 """Shutdown the service."""
489 return self.proxy.shutdown()
490
491
492def main():
493 """Run the backend service."""
494 logger.info('Setting up Ubuntu SSO service.')
495 source.start_setup()
496 service = UbuntuSSOService()
497 d = service.start()
498 d.addBoth(source.finish_setup)
499 source.main()
354500
=== modified file 'ubuntu_sso/main/linux.py'
--- ubuntu_sso/main/linux.py 2011-12-19 19:35:06 +0000
+++ ubuntu_sso/main/linux.py 2012-01-17 16:51:50 +0000
@@ -13,23 +13,26 @@
13#13#
14# You should have received a copy of the GNU General Public License along14# You should have received a copy of the GNU General Public License along
15# with this program. If not, see <http://www.gnu.org/licenses/>.15# with this program. If not, see <http://www.gnu.org/licenses/>.
16"""Single Sign On login handler.16"""Main module implementation specific for linux.
1717
18An utility which accepts requests for Ubuntu Single Sign On login over D-Bus.18This module should never import from the multiplatform one (main/__init__.py),
1919but the other way around. Likewise, this module should *not* have any logic
20The OAuth process is handled, including adding the OAuth access token to the20regarding error processing or decision making about when to send a given
21local keyring.21signal.
22
23Also, most of the logging is being made in the main module to avoid
24duplication between the different platform implementations.
2225
23"""26"""
2427
25import threading28import threading
26import signal29import signal
27import sys
2830
29import dbus.mainloop.glib31import dbus.mainloop.glib
30import dbus.service32import dbus.service
31import gtk33import gtk
3234
35from twisted.internet import defer
3336
34from ubuntu_sso import (37from ubuntu_sso import (
35 DBUS_ACCOUNT_PATH,38 DBUS_ACCOUNT_PATH,
@@ -39,13 +42,7 @@
39 DBUS_IFACE_USER_NAME,42 DBUS_IFACE_USER_NAME,
40 NO_OP,43 NO_OP,
41)44)
42from ubuntu_sso.account import Account
43from ubuntu_sso.logger import setup_logging45from ubuntu_sso.logger import setup_logging
44from ubuntu_sso.main import (
45 CredentialsManagementRoot,
46 SSOLoginRoot,
47 except_to_errdict,
48)
4946
5047
51# Disable the invalid name warning, as we have a lot of DBus style names48# Disable the invalid name warning, as we have a lot of DBus style names
@@ -62,162 +59,124 @@
62 try:59 try:
63 result_cb(app_name, f())60 result_cb(app_name, f())
64 except Exception, e: # pylint: disable=W070361 except Exception, e: # pylint: disable=W0703
65 msg = "Exception while running DBus blocking code in a thread:"62 msg = "Exception while running blocking code in a thread:"
66 logger.exception(msg)63 logger.exception(msg)
67 error_cb(app_name, except_to_errdict(e))64 error_cb(app_name, e)
68 threading.Thread(target=_in_thread).start()65 threading.Thread(target=_in_thread).start()
6966
7067
71class SSOLogin(dbus.service.Object):68class SSOLoginProxy(dbus.service.Object):
72 """Login thru the Single Sign On service."""69 """Login thru the Single Sign On service."""
7370
74 # Operator not preceded by a space (fails with dbus decorators)71 # Operator not preceded by a space (fails with dbus decorators)
75 # pylint: disable=C032272 # pylint: disable=C0322
7673
77 def __init__(self, bus_name, object_path=DBUS_ACCOUNT_PATH,74 def __init__(self, root, *args, **kwargs):
78 sso_login_processor_class=Account,
79 sso_service_class=None):
80 """Initiate the Login object."""75 """Initiate the Login object."""
81 dbus.service.Object.__init__(self, object_path=object_path,76 super(SSOLoginProxy, self).__init__(*args, **kwargs)
82 bus_name=bus_name)77 self.root = root
83 self.root = SSOLoginRoot(sso_login_processor_class, sso_service_class)
8478
85 # generate_capcha signals79 # generate_capcha signals
86 @dbus.service.signal(DBUS_IFACE_USER_NAME, signature="ss")80 @dbus.service.signal(DBUS_IFACE_USER_NAME, signature="ss")
87 def CaptchaGenerated(self, app_name, result):81 def CaptchaGenerated(self, app_name, result):
88 """Signal thrown after the captcha is generated."""82 """Signal thrown after the captcha is generated."""
89 logger.debug('SSOLogin: emitting CaptchaGenerated with app_name "%s" '
90 'and result %r', app_name, result)
9183
92 @dbus.service.signal(DBUS_IFACE_USER_NAME, signature="sa{ss}")84 @dbus.service.signal(DBUS_IFACE_USER_NAME, signature="sa{ss}")
93 def CaptchaGenerationError(self, app_name, error):85 def CaptchaGenerationError(self, app_name, error):
94 """Signal thrown when there's a problem generating the captcha."""86 """Signal thrown when there's a problem generating the captcha."""
95 logger.debug('SSOLogin: emitting CaptchaGenerationError with '
96 'app_name "%s" and error %r', app_name, error)
9787
98 @dbus.service.method(dbus_interface=DBUS_IFACE_USER_NAME,88 @dbus.service.method(dbus_interface=DBUS_IFACE_USER_NAME,
99 in_signature='ss')89 in_signature='ss')
100 def generate_captcha(self, app_name, filename):90 def generate_captcha(self, app_name, filename):
101 """Call the matching method in the processor."""91 """Call the matching method in the processor."""
102 self.root.generate_captcha(app_name, filename,92 self.root.sso_login.generate_captcha(app_name, filename)
103 self.CaptchaGenerated,
104 self.CaptchaGenerationError)
10593
106 # register_user signals94 # register_user signals
107 @dbus.service.signal(DBUS_IFACE_USER_NAME, signature="ss")95 @dbus.service.signal(DBUS_IFACE_USER_NAME, signature="ss")
108 def UserRegistered(self, app_name, result):96 def UserRegistered(self, app_name, result):
109 """Signal thrown when the user is registered."""97 """Signal thrown when the user is registered."""
110 logger.debug('SSOLogin: emitting UserRegistered with app_name "%s" '
111 'and result %r', app_name, result)
11298
113 @dbus.service.signal(DBUS_IFACE_USER_NAME, signature="sa{ss}")99 @dbus.service.signal(DBUS_IFACE_USER_NAME, signature="sa{ss}")
114 def UserRegistrationError(self, app_name, error):100 def UserRegistrationError(self, app_name, error):
115 """Signal thrown when there's a problem registering the user."""101 """Signal thrown when there's a problem registering the user."""
116 logger.debug('SSOLogin: emitting UserRegistrationError with '
117 'app_name "%s" and error %r', app_name, error)
118102
119 @dbus.service.method(dbus_interface=DBUS_IFACE_USER_NAME,103 @dbus.service.method(dbus_interface=DBUS_IFACE_USER_NAME,
120 in_signature='ssssss')104 in_signature='ssssss')
121 def register_user(self, app_name, email, password, name,105 def register_user(self, app_name, email, password, name,
122 captcha_id, captcha_solution):106 captcha_id, captcha_solution):
123 """Call the matching method in the processor."""107 """Call the matching method in the processor."""
124 self.root.register_user(app_name, email, password, name, captcha_id,108 self.root.sso_login.register_user(app_name, email, password, name,
125 captcha_solution,109 captcha_id, captcha_solution)
126 self.UserRegistered,
127 self.UserRegistrationError)
128110
129 # login signals111 # login signals
130 @dbus.service.signal(DBUS_IFACE_USER_NAME, signature="ss")112 @dbus.service.signal(DBUS_IFACE_USER_NAME, signature="ss")
131 def LoggedIn(self, app_name, result):113 def LoggedIn(self, app_name, result):
132 """Signal thrown when the user is logged in."""114 """Signal thrown when the user is logged in."""
133 logger.debug('SSOLogin: emitting LoggedIn with app_name "%s" '
134 'and result %r', app_name, result)
135115
136 @dbus.service.signal(DBUS_IFACE_USER_NAME, signature="sa{ss}")116 @dbus.service.signal(DBUS_IFACE_USER_NAME, signature="sa{ss}")
137 def LoginError(self, app_name, error):117 def LoginError(self, app_name, error):
138 """Signal thrown when there is a problem in the login."""118 """Signal thrown when there is a problem in the login."""
139 logger.debug('SSOLogin: emitting LoginError with '
140 'app_name "%s" and error %r', app_name, error)
141119
142 @dbus.service.signal(DBUS_IFACE_USER_NAME, signature="ss")120 @dbus.service.signal(DBUS_IFACE_USER_NAME, signature="ss")
143 def UserNotValidated(self, app_name, result):121 def UserNotValidated(self, app_name, result):
144 """Signal thrown when the user is not validated."""122 """Signal thrown when the user is not validated."""
145 logger.debug('SSOLogin: emitting UserNotValidated with app_name "%s" '
146 'and result %r', app_name, result)
147123
148 @dbus.service.method(dbus_interface=DBUS_IFACE_USER_NAME,124 @dbus.service.method(dbus_interface=DBUS_IFACE_USER_NAME,
149 in_signature='sss')125 in_signature='sss')
150 def login(self, app_name, email, password):126 def login(self, app_name, email, password):
151 """Call the matching method in the processor."""127 """Call the matching method in the processor."""
152 self.root.login(app_name, email, password, self.LoggedIn,128 self.root.sso_login.login(app_name, email, password)
153 self.LoginError, self.UserNotValidated)
154129
155 # validate_email signals130 # validate_email signals
156 @dbus.service.signal(DBUS_IFACE_USER_NAME, signature="ss")131 @dbus.service.signal(DBUS_IFACE_USER_NAME, signature="ss")
157 def EmailValidated(self, app_name, result):132 def EmailValidated(self, app_name, result):
158 """Signal thrown after the email is validated."""133 """Signal thrown after the email is validated."""
159 logger.debug('SSOLogin: emitting EmailValidated with app_name "%s" '
160 'and result %r', app_name, result)
161134
162 @dbus.service.signal(DBUS_IFACE_USER_NAME, signature="sa{ss}")135 @dbus.service.signal(DBUS_IFACE_USER_NAME, signature="sa{ss}")
163 def EmailValidationError(self, app_name, error):136 def EmailValidationError(self, app_name, error):
164 """Signal thrown when there's a problem validating the email."""137 """Signal thrown when there's a problem validating the email."""
165 logger.debug('SSOLogin: emitting EmailValidationError with '
166 'app_name "%s" and error %r', app_name, error)
167138
168 @dbus.service.method(dbus_interface=DBUS_IFACE_USER_NAME,139 @dbus.service.method(dbus_interface=DBUS_IFACE_USER_NAME,
169 in_signature='ssss')140 in_signature='ssss')
170 def validate_email(self, app_name, email, password, email_token):141 def validate_email(self, app_name, email, password, email_token):
171 """Call the matching method in the processor."""142 """Call the matching method in the processor."""
172 self.root.validate_email(app_name, email, password, email_token,143 self.root.sso_login.validate_email(app_name,
173 self.EmailValidated,144 email, password, email_token)
174 self.EmailValidationError)
175145
176 # request_password_reset_token signals146 # request_password_reset_token signals
177 @dbus.service.signal(DBUS_IFACE_USER_NAME, signature="ss")147 @dbus.service.signal(DBUS_IFACE_USER_NAME, signature="ss")
178 def PasswordResetTokenSent(self, app_name, result):148 def PasswordResetTokenSent(self, app_name, result):
179 """Signal thrown when the token is succesfully sent."""149 """Signal thrown when the token is succesfully sent."""
180 logger.debug('SSOLogin: emitting PasswordResetTokenSent with app_name '
181 '"%s" and result %r', app_name, result)
182150
183 @dbus.service.signal(DBUS_IFACE_USER_NAME, signature="sa{ss}")151 @dbus.service.signal(DBUS_IFACE_USER_NAME, signature="sa{ss}")
184 def PasswordResetError(self, app_name, error):152 def PasswordResetError(self, app_name, error):
185 """Signal thrown when there's a problem sending the token."""153 """Signal thrown when there's a problem sending the token."""
186 logger.debug('SSOLogin: emitting PasswordResetError with '
187 'app_name "%s" and error %r', app_name, error)
188154
189 @dbus.service.method(dbus_interface=DBUS_IFACE_USER_NAME,155 @dbus.service.method(dbus_interface=DBUS_IFACE_USER_NAME,
190 in_signature='ss')156 in_signature='ss')
191 def request_password_reset_token(self, app_name, email):157 def request_password_reset_token(self, app_name, email):
192 """Call the matching method in the processor."""158 """Call the matching method in the processor."""
193 self.root.request_password_reset_token(app_name, email,159 self.root.sso_login.request_password_reset_token(app_name, email)
194 self.PasswordResetTokenSent,
195 self.PasswordResetError)
196160
197 # set_new_password signals161 # set_new_password signals
198 @dbus.service.signal(DBUS_IFACE_USER_NAME, signature="ss")162 @dbus.service.signal(DBUS_IFACE_USER_NAME, signature="ss")
199 def PasswordChanged(self, app_name, result):163 def PasswordChanged(self, app_name, result):
200 """Signal thrown when the token is succesfully sent."""164 """Signal thrown when the token is succesfully sent."""
201 logger.debug('SSOLogin: emitting PasswordChanged with app_name "%s" '
202 'and result %r', app_name, result)
203165
204 @dbus.service.signal(DBUS_IFACE_USER_NAME, signature="sa{ss}")166 @dbus.service.signal(DBUS_IFACE_USER_NAME, signature="sa{ss}")
205 def PasswordChangeError(self, app_name, error):167 def PasswordChangeError(self, app_name, error):
206 """Signal thrown when there's a problem sending the token."""168 """Signal thrown when there's a problem sending the token."""
207 logger.debug('SSOLogin: emitting PasswordChangeError with '
208 'app_name "%s" and error %r', app_name, error)
209169
210 @dbus.service.method(dbus_interface=DBUS_IFACE_USER_NAME,170 @dbus.service.method(dbus_interface=DBUS_IFACE_USER_NAME,
211 in_signature='ssss')171 in_signature='ssss')
212 def set_new_password(self, app_name, email, token, new_password):172 def set_new_password(self, app_name, email, token, new_password):
213 """Call the matching method in the processor."""173 """Call the matching method in the processor."""
214 self.root.set_new_password(app_name, email, token, new_password,174 self.root.sso_login.set_new_password(app_name,
215 self.PasswordChanged,175 email, token, new_password)
216 self.PasswordChangeError)176
217177
218178class CredentialsManagementProxy(dbus.service.Object):
219class CredentialsManagement(dbus.service.Object):179 """Object that manages credentials.
220 """DBus object that manages credentials.
221180
222 Every exposed method in this class requires one mandatory argument:181 Every exposed method in this class requires one mandatory argument:
223182
@@ -242,67 +201,36 @@
242201
243 """202 """
244203
245 def __init__(self, timeout_func, shutdown_func, *args, **kwargs):204 def __init__(self, root, *args, **kwargs):
246 super(CredentialsManagement, self).__init__(*args, **kwargs)205 super(CredentialsManagementProxy, self).__init__(*args, **kwargs)
247 self.root = CredentialsManagementRoot(timeout_func, shutdown_func,206 self.root = root
248 self.CredentialsFound,
249 self.CredentialsError,
250 self.AuthorizationDenied)
251207
252 # Operator not preceded by a space (fails with dbus decorators)208 # Operator not preceded by a space (fails with dbus decorators)
253 # pylint: disable=C0322209 # pylint: disable=C0322
254210
255 def _process_failure(self, failure, app_name):
256 """Process the 'failure' and emit CredentialsError."""
257 self.CredentialsError(app_name, except_to_errdict(failure.value))
258
259 def shutdown(self):
260 """If no ongoing requests, call self.shutdown_func."""
261 logger.debug('shutdown!, ref_count is %r.', self.root.ref_count)
262 self.root.shutdown()
263
264 @dbus.service.signal(DBUS_CREDENTIALS_IFACE, signature='s')211 @dbus.service.signal(DBUS_CREDENTIALS_IFACE, signature='s')
265 def AuthorizationDenied(self, app_name):212 def AuthorizationDenied(self, app_name):
266 """Signal thrown when the user denies the authorization."""213 """Signal thrown when the user denies the authorization."""
267 self.root.ref_count -= 1
268 logger.info('%s: emitting AuthorizationDenied with app_name "%s".',
269 self.__class__.__name__, app_name)
270214
271 @dbus.service.signal(DBUS_CREDENTIALS_IFACE, signature='sa{ss}')215 @dbus.service.signal(DBUS_CREDENTIALS_IFACE, signature='sa{ss}')
272 def CredentialsFound(self, app_name, credentials):216 def CredentialsFound(self, app_name, credentials):
273 """Signal thrown when the credentials are found."""217 """Signal thrown when the credentials are found."""
274 self.root.ref_count -= 1
275 logger.info('%s: emitting CredentialsFound with app_name "%s".',
276 self.__class__.__name__, app_name)
277218
278 @dbus.service.signal(DBUS_CREDENTIALS_IFACE, signature='s')219 @dbus.service.signal(DBUS_CREDENTIALS_IFACE, signature='s')
279 def CredentialsNotFound(self, app_name):220 def CredentialsNotFound(self, app_name):
280 """Signal thrown when the credentials are not found."""221 """Signal thrown when the credentials are not found."""
281 self.root.ref_count -= 1
282 logger.info('%s: emitting CredentialsNotFound with app_name "%s".',
283 self.__class__.__name__, app_name)
284222
285 @dbus.service.signal(DBUS_CREDENTIALS_IFACE, signature='s')223 @dbus.service.signal(DBUS_CREDENTIALS_IFACE, signature='s')
286 def CredentialsCleared(self, app_name):224 def CredentialsCleared(self, app_name):
287 """Signal thrown when the credentials were cleared."""225 """Signal thrown when the credentials were cleared."""
288 self.root.ref_count -= 1
289 logger.info('%s: emitting CredentialsCleared with app_name "%s".',
290 self.__class__.__name__, app_name)
291226
292 @dbus.service.signal(DBUS_CREDENTIALS_IFACE, signature='s')227 @dbus.service.signal(DBUS_CREDENTIALS_IFACE, signature='s')
293 def CredentialsStored(self, app_name):228 def CredentialsStored(self, app_name):
294 """Signal thrown when the credentials were cleared."""229 """Signal thrown when the credentials were cleared."""
295 self.root.ref_count -= 1
296 logger.info('%s: emitting CredentialsStored with app_name "%s".',
297 self.__class__.__name__, app_name)
298230
299 @dbus.service.signal(DBUS_CREDENTIALS_IFACE, signature='sa{ss}')231 @dbus.service.signal(DBUS_CREDENTIALS_IFACE, signature='sa{ss}')
300 def CredentialsError(self, app_name, error_dict):232 def CredentialsError(self, app_name, error_dict):
301 """Signal thrown when there is a problem getting the credentials."""233 """Signal thrown when there is a problem getting the credentials."""
302 self.root.ref_count -= 1
303 logger.error('%s: emitting CredentialsError with app_name "%s" and '
304 'error_dict %r.', self.__class__.__name__, app_name,
305 error_dict)
306234
307 @dbus.service.method(dbus_interface=DBUS_CREDENTIALS_IFACE,235 @dbus.service.method(dbus_interface=DBUS_CREDENTIALS_IFACE,
308 in_signature='sa{ss}', out_signature='')236 in_signature='sa{ss}', out_signature='')
@@ -315,16 +243,7 @@
315 - 'args' is a dictionary, currently not used.243 - 'args' is a dictionary, currently not used.
316244
317 """245 """
318246 self.root.cred_manager.find_credentials(app_name, args)
319 def success_cb(credentials):
320 """Find credentials and notify using signals."""
321 if credentials is not None and len(credentials) > 0:
322 self.CredentialsFound(app_name, credentials)
323 else:
324 self.CredentialsNotFound(app_name)
325
326 self.root.find_credentials(app_name, args, success_cb,
327 self._process_failure)
328247
329 @dbus.service.method(dbus_interface=DBUS_CREDENTIALS_IFACE,248 @dbus.service.method(dbus_interface=DBUS_CREDENTIALS_IFACE,
330 in_signature="sa{ss}", out_signature="a{ss}",249 in_signature="sa{ss}", out_signature="a{ss}",
@@ -337,20 +256,13 @@
337256
338 """257 """
339258
340 def decrease_counter_success(credentials):259 def _drop_dict(error_dict):
341 """Call 'reply_handler' and decrease the root ref counter."""260 """Call 'error_handler' properly."""
342 reply_handler(credentials)261 error_handler(dbus.service.DBusException(error_dict['errtype']))
343 self.root.ref_count -= 1262
344263 self.root.cred_manager.find_credentials(app_name, args,
345 def decrease_counter_error(failure, app_name):264 success_cb=reply_handler,
346 """Call 'error_handler' and decrease the root ref counter."""265 error_cb=_drop_dict)
347 error_dict = except_to_errdict(failure.value)
348 error_handler(dbus.service.DBusException(error_dict))
349 self.root.ref_count -= 1
350
351 self.root.find_credentials(app_name, args,
352 success_cb=decrease_counter_success,
353 error_cb=decrease_counter_error)
354266
355 @dbus.service.method(dbus_interface=DBUS_CREDENTIALS_IFACE,267 @dbus.service.method(dbus_interface=DBUS_CREDENTIALS_IFACE,
356 in_signature='sa{ss}', out_signature='')268 in_signature='sa{ss}', out_signature='')
@@ -363,9 +275,7 @@
363 - 'args' is a dictionary, currently not used.275 - 'args' is a dictionary, currently not used.
364276
365 """277 """
366 self.root.clear_credentials(app_name, args,278 self.root.cred_manager.clear_credentials(app_name, args)
367 lambda _: self.CredentialsCleared(app_name),
368 self._process_failure)
369279
370 @dbus.service.method(dbus_interface=DBUS_CREDENTIALS_IFACE,280 @dbus.service.method(dbus_interface=DBUS_CREDENTIALS_IFACE,
371 in_signature='sa{ss}', out_signature='')281 in_signature='sa{ss}', out_signature='')
@@ -380,21 +290,19 @@
380 'consumer_secret'.290 'consumer_secret'.
381291
382 """292 """
383 self.root.store_credentials(app_name, args,293 self.root.cred_manager.store_credentials(app_name, args)
384 lambda _: self.CredentialsStored(app_name),
385 self._process_failure)
386294
387 @dbus.service.method(dbus_interface=DBUS_CREDENTIALS_IFACE,295 @dbus.service.method(dbus_interface=DBUS_CREDENTIALS_IFACE,
388 in_signature='sa{ss}', out_signature='')296 in_signature='sa{ss}', out_signature='')
389 def register(self, app_name, args):297 def register(self, app_name, args):
390 """Get credentials if found else prompt GUI to register."""298 """Get credentials if found else prompt GUI to register."""
391 self.root.register(app_name, args)299 self.root.cred_manager.register(app_name, args)
392300
393 @dbus.service.method(dbus_interface=DBUS_CREDENTIALS_IFACE,301 @dbus.service.method(dbus_interface=DBUS_CREDENTIALS_IFACE,
394 in_signature='sa{ss}', out_signature='')302 in_signature='sa{ss}', out_signature='')
395 def login(self, app_name, args):303 def login(self, app_name, args):
396 """Get credentials if found else prompt GUI to login."""304 """Get credentials if found else prompt GUI to login."""
397 self.root.login(app_name, args)305 self.root.cred_manager.login(app_name, args)
398306
399 @dbus.service.method(dbus_interface=DBUS_CREDENTIALS_IFACE,307 @dbus.service.method(dbus_interface=DBUS_CREDENTIALS_IFACE,
400 in_signature='sa{ss}', out_signature='')308 in_signature='sa{ss}', out_signature='')
@@ -406,12 +314,121 @@
406 returned trough the CredentialsFound signal.314 returned trough the CredentialsFound signal.
407315
408 """316 """
409 self.root.login_email_password(app_name, args)317 self.root.cred_manager.login_email_password(app_name, args)
410318
411319
412def get_sso_login_backend():320class UbuntuSSOProxy(object):
413 """Get the backend for the Login service."""321 """Object that exposes the diff referenceable objects."""
414 raise NotImplementedError()322
323 def __init__(self, root):
324 self.root = root
325 self.bus = dbus.SessionBus()
326 self.sso_login = None
327 self.cred_manager = None
328
329 def start(self):
330 """Start listening, nothing async to be done in this platform."""
331 # Register DBus service for making sure we run only one instance
332 name = self.bus.request_name(DBUS_BUS_NAME,
333 dbus.bus.NAME_FLAG_DO_NOT_QUEUE)
334 if name == dbus.bus.REQUEST_NAME_REPLY_EXISTS:
335 raise AlreadyStartedError()
336
337 bus_name = dbus.service.BusName(DBUS_BUS_NAME, bus=self.bus)
338 self.sso_login = SSOLoginProxy(self.root,
339 bus_name=bus_name,
340 object_path=DBUS_ACCOUNT_PATH)
341 self.cred_manager = CredentialsManagementProxy(self.root,
342 bus_name=bus_name,
343 object_path=DBUS_CREDENTIALS_PATH)
344
345 return defer.succeed(None)
346
347 def shutdown(self):
348 """Shutdown the service."""
349 self.sso_login.remove_from_connection()
350 self.cred_manager.remove_from_connection()
351 self.bus.release_name(DBUS_BUS_NAME)
352 return defer.succeed(None)
353
354
355# ============================== client classes ==============================
356
357
358class RemoteClient(object):
359 """Client that can perform calls to remote DBus object."""
360
361 bus_name = None
362 path = None
363 interface = None
364
365 def __init__(self):
366 self.bus = dbus.SessionBus()
367 obj = self.bus.get_object(bus_name=self.bus_name,
368 object_path=self.path,
369 follow_name_owner_changes=True)
370 self.dbus_iface = dbus.Interface(obj, dbus_interface=self.interface)
371 self.dbus_iface.call_method = self.call_method
372 self.dbus_iface.disconnect_from_signal = lambda _, sig: sig.remove()
373
374 def call_method(self, method_name, *args, **kwargs):
375 """Call asynchronously 'method_name(*args)'.
376
377 Return a deferred that will be fired when the call finishes.
378
379 """
380 d = defer.Deferred()
381
382 reply_handler = kwargs.get('reply_handler', None)
383 if reply_handler is not None:
384 d.addCallback(lambda a: reply_handler(*a))
385
386 error_handler = kwargs.get('error_handler', None)
387 if error_handler is not None:
388 d.addErrback(lambda f: error_handler(f.value))
389
390 self.bus.call_async(
391 bus_name=self.bus_name, object_path=self.path,
392 dbus_interface=self.interface, method=method_name,
393 signature=None, args=args,
394 reply_handler=lambda *a: d.callback(a),
395 error_handler=d.errback)
396
397 return d
398
399
400class SSOLoginClient(RemoteClient):
401 """Access the UserManagement DBus interface."""
402
403 bus_name = DBUS_BUS_NAME
404 path = DBUS_ACCOUNT_PATH
405 interface = DBUS_IFACE_USER_NAME
406
407
408class CredentialsManagementClient(RemoteClient):
409 """Access the CredentialsManagement DBus interface."""
410
411 bus_name = DBUS_BUS_NAME
412 path = DBUS_CREDENTIALS_PATH
413 interface = DBUS_CREDENTIALS_IFACE
414
415
416class UbuntuSSOClient(object):
417 """Base client that provides remote access to the sso API."""
418
419 def __init__(self):
420 self.sso_login = SSOLoginClient().dbus_iface
421 self.cred_manager = CredentialsManagementClient().dbus_iface
422
423 def disconnect(self):
424 """No need to disconnect DBus proxy objects."""
425 return defer.succeed(None)
426
427
428def get_sso_client():
429 """Get a client to access the SSO service."""
430 result = UbuntuSSOClient()
431 return defer.succeed(result)
415432
416433
417def sighup_handler(*a, **kw):434def sighup_handler(*a, **kw):
@@ -422,32 +439,32 @@
422 #439 #
423 # gtk.main_quit and the logger methods are safe to be called from any440 # gtk.main_quit and the logger methods are safe to be called from any
424 # thread. Just don't call other random stuff here.441 # thread. Just don't call other random stuff here.
425 logger.info("Stoping Ubuntu SSO login manager since SIGHUP was received.")442 logger.info("Stoping Ubuntu SSO service since SIGHUP was received.")
426 gtk.main_quit()443 gtk.main_quit()
427444
428445
429def main():446class AlreadyStartedError(Exception):
430 """Run the backend service."""447 """The backend service has already been started."""
448
449
450timeout_func = gtk.timeout_add
451shutdown_func = gtk.main_quit
452
453
454def start_setup():
455 """Setup the env to run the service."""
431 dbus.mainloop.glib.threads_init()456 dbus.mainloop.glib.threads_init()
432 gtk.gdk.threads_init()457 gtk.gdk.threads_init()
433 dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)458 dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
434459
435 bus = dbus.SessionBus()460
436 # Register DBus service for making sure we run only one instance461def finish_setup(result):
437 name = bus.request_name(DBUS_BUS_NAME,462 """Run the specific mainloop only if no failure ocurred."""
438 dbus.bus.NAME_FLAG_DO_NOT_QUEUE)463 if result is None: # no failure ocurred, start the service
439 if name == dbus.bus.REQUEST_NAME_REPLY_EXISTS:464 logger.debug("Hooking up SIGHUP with handler %r.", sighup_handler)
440 logger.error("Ubuntu SSO login manager already running, quitting.")465 signal.signal(signal.SIGHUP, sighup_handler)
441 sys.exit(0)466 gtk.main()
442467
443 logger.debug("Hooking up SIGHUP with handler %r.", sighup_handler)468
444 signal.signal(signal.SIGHUP, sighup_handler)469def main():
445470 """Run the specific mainloop."""
446 logger.info("Starting Ubuntu SSO login manager for bus %r.", DBUS_BUS_NAME)
447 bus_name = dbus.service.BusName(DBUS_BUS_NAME, bus=dbus.SessionBus())
448 SSOLogin(bus_name, object_path=DBUS_ACCOUNT_PATH)
449 CredentialsManagement(timeout_func=gtk.timeout_add,
450 shutdown_func=gtk.main_quit,
451 bus_name=bus_name, object_path=DBUS_CREDENTIALS_PATH)
452
453 gtk.main()
454471
=== modified file 'ubuntu_sso/main/tests/__init__.py'
--- ubuntu_sso/main/tests/__init__.py 2011-02-24 13:06:07 +0000
+++ ubuntu_sso/main/tests/__init__.py 2012-01-17 16:51:50 +0000
@@ -15,3 +15,40 @@
15# You should have received a copy of the GNU General Public License along15# You should have received a copy of the GNU General Public License along
16# with this program. If not, see <http://www.gnu.org/licenses/>.16# with this program. If not, see <http://www.gnu.org/licenses/>.
17"""Test the different main implementations."""17"""Test the different main implementations."""
18
19import sys
20
21from twisted.internet import defer
22
23from ubuntu_sso.tests import TOKEN
24
25# Invalid name "BaseTestCase"
26# pylint: disable=C0103
27
# Choose the base test case matching the IPC mechanism of the platform:
# DBus everywhere except on Windows, where the IPC test case is used.
if sys.platform != 'win32':
    from ubuntuone.devtools.testcases.dbus import DBusTestCase
    BaseTestCase = DBusTestCase
else:
    from ubuntu_sso.utils.tests.test_ipc import BaseIPCTestCase
    BaseTestCase = BaseIPCTestCase
34
35# pylint: enable=C0103
36
37
class FakedCredentials(object):
    """A minimal stand-in for a Credentials object.

    'login' and 'register' accept anything and return None; the remaining
    operations return already-fired deferreds.

    """

    def __init__(self, *a, **kw):
        # A single no-op callable is shared by both attributes.
        do_nothing = lambda *a, **kw: None
        self.login = do_nothing
        self.register = do_nothing

    def find_credentials(self, *a, **kw):
        """Return a fired deferred carrying TOKEN."""
        found = defer.succeed(TOKEN)
        return found

    def clear_credentials(self, *a, **kw):
        """Return a fired deferred carrying None."""
        cleared = defer.succeed(None)
        return cleared

    def store_credentials(self, *a, **kw):
        """Return a fired deferred carrying None."""
        stored = defer.succeed(None)
        return stored
1855
=== added file 'ubuntu_sso/main/tests/test_clients.py'
--- ubuntu_sso/main/tests/test_clients.py 1970-01-01 00:00:00 +0000
+++ ubuntu_sso/main/tests/test_clients.py 2012-01-17 16:51:50 +0000
@@ -0,0 +1,373 @@
1# -*- coding: utf-8 -*-
2#
3# Copyright 2012 Canonical Ltd.
4#
5# This program is free software: you can redistribute it and/or modify it
6# under the terms of the GNU General Public License version 3, as published
7# by the Free Software Foundation.
8#
9# This program is distributed in the hope that it will be useful, but
10# WITHOUT ANY WARRANTY; without even the implied warranties of
11# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
12# PURPOSE. See the GNU General Public License for more details.
13#
14# You should have received a copy of the GNU General Public License along
15# with this program. If not, see <http://www.gnu.org/licenses/>.
16"""Tests for the main SSO client code."""
17
18from twisted.internet import defer
19from ubuntuone.devtools.testcases import skipIfOS
20
21from ubuntu_sso import main
22from ubuntu_sso.tests import (
23 APP_NAME,
24 CAPTCHA_ID,
25 CAPTCHA_SOLUTION,
26 EMAIL,
27 EMAIL_TOKEN,
28 NAME,
29 PASSWORD,
30 TOKEN,
31)
32from ubuntu_sso.main.tests import BaseTestCase, FakedCredentials
33
34FILENAME = 'sample filename'
35
36
class FakedKeyring(object):
    """A faked Keyring object storing credentials in memory.

    Every instance owns its own storage, so credentials saved through one
    keyring (e.g. in one test) are never visible through another one.

    """

    def __init__(self):
        # Per-instance storage: a class-level dict would be shared by all
        # the instances and would leak credentials between tests.
        self._keys = {}

    def get_credentials(self, app_name):
        """Return a fired deferred with the credentials for app_name.

        An empty dict is the result if nothing is stored for app_name.

        """
        return defer.succeed(self._keys.get(app_name, {}))

    def delete_credentials(self, app_name):
        """Delete the credentials for app_name, if any were stored."""
        self._keys.pop(app_name, None)
        return defer.succeed(None)

    def set_credentials(self, app_name, token):
        """Store token as the credentials for app_name."""
        self._keys[app_name] = token
        return defer.succeed(None)
55
56
class AbstractTestCase(BaseTestCase):
    """Platform independent base for the client/service round trip tests.

    Subclasses describe a scenario through class attributes: the remote
    'method' to call with 'params', the backend method to patch, and the
    signals and results expected on success and on error.

    """

    timeout = 2
    method = None
    backend_method = None  # falls back to 'method' in setUp
    params = ()
    success_signal = None
    backend_result = None
    success_result = None
    error_signal = None

    @defer.inlineCallbacks
    def setUp(self):
        yield super(AbstractTestCase, self).setUp()
        self.keyring = FakedKeyring()
        self.patch(main, 'Keyring', lambda: self.keyring)

        # keep the service from scheduling anything in the mainloops
        self.patch(main.source, 'timeout_func', lambda *a: None)
        self.patch(main.source, 'shutdown_func', lambda *a: None)

        self.sso_service = main.UbuntuSSOService()
        yield self.sso_service.start()
        self.addCleanup(self.sso_service.shutdown)

        self.sso_client = yield main.get_sso_client()
        self.addCleanup(self.sso_client.disconnect)

        # subclasses are expected to set these in their own setUp
        self.client = self.patchable_backend = None

        if self.backend_method is None:
            self.backend_method = self.method

    def _backend_succeed(self, *args, **kwargs):
        """Backend replacement that returns self.backend_result."""
        return self.backend_result

    def _backend_fail(self, *args, **kwargs):
        """Backend replacement that raises a ValueError."""
        raise ValueError((args, kwargs))

    @defer.inlineCallbacks
    def assert_method_correct(self, success_signal, error_signal,
                              patched_backend_method, expected_result=None):
        """Call 'self.method' and check the signal that comes back.

        The 'success_signal' has to be emitted with 'expected_result' as
        its arguments; receiving 'error_signal' makes the test fail.

        Before the call, 'self.backend_method' of self.patchable_backend is
        replaced with 'patched_backend_method'.

        Nothing is checked if 'self.method' is None.

        """
        if self.method is None:
            defer.returnValue(None)

        signal_received = defer.Deferred()

        def on_success(*signal_args):
            """Fire the deferred with the signal arguments."""
            signal_received.callback(signal_args)

        def on_error(*signal_args):
            """Fail the deferred with the signal arguments."""
            signal_received.errback(AssertionError(signal_args))

        success_match = self.client.connect_to_signal(success_signal,
                                                      on_success)
        self.addCleanup(self.client.disconnect_from_signal,
                        success_signal, success_match)

        error_match = self.client.connect_to_signal(error_signal, on_error)
        self.addCleanup(self.client.disconnect_from_signal,
                        error_signal, error_match)

        self.patch(self.patchable_backend, self.backend_method,
                   patched_backend_method)

        yield self.client.call_method(self.method, *self.params)

        result = yield signal_received
        self.assertEqual(expected_result, result)

    def test_success(self):
        """The success signal is emitted when the backend succeeds."""
        return self.assert_method_correct(
            self.success_signal, self.error_signal,
            self._backend_succeed, self.success_result)

    def test_error(self):
        """The error signal is emitted when the backend fails."""
        # the roles of the signals are swapped: the error one is expected
        expected_result = (APP_NAME, dict(errtype='ValueError'))
        return self.assert_method_correct(
            self.error_signal, self.success_signal,
            self._backend_fail, expected_result)
153
154
class SSOLoginProxyTestCase(AbstractTestCase):
    """Base for the tests going through the sso_login client."""

    @defer.inlineCallbacks
    def setUp(self):
        yield super(SSOLoginProxyTestCase, self).setUp()
        # the backend to patch is the processor used by the service side
        self.patchable_backend = self.sso_service.sso_login.processor
        self.client = self.sso_client.sso_login
163
164
class GenerateCaptchaTestCase(SSOLoginProxyTestCase):
    """Round trip tests for generate_captcha."""

    method = 'generate_captcha'
    params = (APP_NAME, FILENAME)
    error_signal = 'CaptchaGenerationError'
    success_signal = 'CaptchaGenerated'
    backend_result = 'a captcha id'
    # the signal carries whatever the backend returned
    success_result = (APP_NAME, backend_result)
174
175
class RegisterUserTestCase(SSOLoginProxyTestCase):
    """Round trip tests for register_user."""

    method = 'register_user'
    params = (APP_NAME, EMAIL, PASSWORD, NAME, CAPTCHA_ID, CAPTCHA_SOLUTION)
    error_signal = 'UserRegistrationError'
    success_signal = 'UserRegistered'
    backend_result = EMAIL
    # the signal carries whatever the backend returned
    success_result = (APP_NAME, backend_result)
185
186
class LoginTestCase(SSOLoginProxyTestCase):
    """Round trip tests for login."""

    method = 'login'
    params = (APP_NAME, EMAIL, PASSWORD)
    error_signal = 'LoginError'
    success_signal = 'LoggedIn'
    backend_result = TOKEN
    success_result = (APP_NAME, EMAIL)

    @defer.inlineCallbacks
    def test_success(self):
        """A validated user is logged in and the token is stored."""
        self.patch(self.patchable_backend, 'is_validated', lambda _: True)

        yield super(LoginTestCase, self).test_success()

        stored_credentials = yield self.keyring.get_credentials(APP_NAME)
        self.assertEqual(stored_credentials, TOKEN)

    def test_not_validated(self):
        """A non validated user gets the UserNotValidated signal."""
        self.patch(self.patchable_backend, 'is_validated', lambda _: False)
        self.patch(self, 'success_signal', 'UserNotValidated')

        return super(LoginTestCase, self).test_success()

    def test_error_when_setting_credentials(self):
        """The 'error_signal' is emitted when credentials can't be set."""
        self.patch(self.patchable_backend, 'is_validated', lambda _: True)

        def failing_set_credentials(*a):
            """Fail the way a broken keyring would."""
            return defer.fail(keyring_error)

        keyring_error = TypeError('foo')
        self.patch(self.keyring, 'set_credentials', failing_set_credentials)

        def backend_login(*a, **kw):
            """Succeed, so the failure can only come from the keyring."""
            return self.backend_result

        expected_result = (APP_NAME, dict(errtype='TypeError', message='foo'))

        return self.assert_method_correct('LoginError', 'LoggedIn',
                                          backend_login, expected_result)
224
225
class ValidateEmailTestCase(SSOLoginProxyTestCase):
    """Round trip tests for validate_email."""

    method = 'validate_email'
    params = (APP_NAME, EMAIL, PASSWORD, EMAIL_TOKEN)
    error_signal = 'EmailValidationError'
    success_signal = 'EmailValidated'
    # the backend returns the token, the signal carries the email
    backend_result = TOKEN
    success_result = (APP_NAME, EMAIL)
235
236
class RequestPasswordResetTokenTestCase(SSOLoginProxyTestCase):
    """Round trip tests for request_password_reset_token."""

    method = 'request_password_reset_token'
    params = (APP_NAME, EMAIL)
    error_signal = 'PasswordResetError'
    success_signal = 'PasswordResetTokenSent'
    backend_result = EMAIL
    # the signal carries whatever the backend returned
    success_result = (APP_NAME, backend_result)
246
247
class SetNewPasswordTestCase(SSOLoginProxyTestCase):
    """Round trip tests for set_new_password."""

    method = 'set_new_password'
    params = (APP_NAME, EMAIL, EMAIL_TOKEN, PASSWORD)
    error_signal = 'PasswordChangeError'
    success_signal = 'PasswordChanged'
    backend_result = EMAIL
    # the signal carries whatever the backend returned
    success_result = (APP_NAME, backend_result)
257
258
class CredentialsManagementProxyTestCase(AbstractTestCase):
    """Base for the tests going through the cred_manager client."""

    error_signal = 'CredentialsError'
    args = dict(foo='bar', fuh='baz')
    params = (APP_NAME, args)

    @defer.inlineCallbacks
    def setUp(self):
        yield super(CredentialsManagementProxyTestCase, self).setUp()
        # every Credentials built by the service is this same fake
        self.credentials = FakedCredentials()
        self.patch(main, 'Credentials', lambda *a, **kw: self.credentials)

        self.patchable_backend = self.credentials
        self.client = self.sso_client.cred_manager

    def _backend_succeed(self, *args, **kwargs):
        """Backend replacement returning a deferred fired with the result."""
        outcome = defer.succeed(self.backend_result)
        return outcome

    def _backend_fail(self, *args, **kwargs):
        """Backend replacement returning a deferred failed with ValueError."""
        error = ValueError((args, kwargs))
        return defer.fail(error)
282
283
class FindCredentialsTestCase(CredentialsManagementProxyTestCase):
    """Round trip tests for find_credentials."""

    method = 'find_credentials'
    success_signal = 'CredentialsFound'
    backend_result = TOKEN
    success_result = (APP_NAME, backend_result)

    @skipIfOS('win32', 'find_credentials_sync is only provided in Linux '
              'due to compatibility issues with old clients.')
    @defer.inlineCallbacks
    def test_find_credentials_sync(self):
        """find_credentials_sync hands the credentials to reply_handler."""
        replied = defer.Deferred()

        self.client.call_method('find_credentials_sync',
                                APP_NAME, self.args,
                                reply_handler=replied.callback,
                                error_handler=replied.errback)
        credentials = yield replied
        self.assertEqual(credentials, TOKEN)

    @skipIfOS('win32', 'find_credentials_sync is only provided in Linux '
              'due to compatibility issues with old clients.')
    @defer.inlineCallbacks
    def test_find_credentials_sync_error(self):
        """If find_credentials_sync fails, error_handler is called."""
        self.patch(self.credentials, 'find_credentials', self._backend_fail)
        failed = defer.Deferred()

        # the handlers are crossed on purpose: an error is the good outcome
        self.client.call_method('find_credentials_sync',
                                APP_NAME, self.args,
                                reply_handler=failed.errback,
                                error_handler=failed.callback)
        remote_error = yield failed
        error_name = remote_error.args[0]
        self.assertEqual(error_name, 'ValueError')
321
322
class ClearCredentialsTestCase(CredentialsManagementProxyTestCase):
    """Round trip tests for clear_credentials."""

    method = 'clear_credentials'
    # the success signal only carries the application name
    success_result = (APP_NAME,)
    success_signal = 'CredentialsCleared'
329
330
class StoreCredentialsTestCase(CredentialsManagementProxyTestCase):
    """Round trip tests for store_credentials."""

    method = 'store_credentials'
    # the success signal only carries the application name
    success_result = (APP_NAME,)
    success_signal = 'CredentialsStored'
337
338
class CredentialsManagementOpsTestCase(CredentialsManagementProxyTestCase):
    """Base for the login/register tests of the cred_manager client.

    The backend replacements used here emit the service signals directly
    instead of returning a result.

    """

    success_result = (APP_NAME, TOKEN)
    success_signal = 'CredentialsFound'

    def _backend_succeed(self, *args, **kwargs):
        """Backend replacement that emits CredentialsFound."""
        service = self.sso_service.cred_manager
        service.CredentialsFound(APP_NAME, TOKEN)

    def _backend_fail(self, *args, **kwargs):
        """Backend replacement that emits CredentialsError."""
        service = self.sso_service.cred_manager
        service.CredentialsError(APP_NAME, dict(errtype='ValueError'))
353
354
class RegisterTestCase(CredentialsManagementOpsTestCase):
    """Round trip tests for the register method of cred_manager."""

    method = 'register'
359
360
class LoginOnlyTestCase(CredentialsManagementOpsTestCase):
    """Round trip tests for the login method of cred_manager."""

    method = 'login'
365
366
class LoginEmailPasswordTestCase(CredentialsManagementOpsTestCase):
    """Round trip tests for the login_email_password method.

    The remote method is 'login_email_password', while the backend method
    being patched is 'login'.

    """

    method = 'login_email_password'
    backend_method = 'login'
    args = dict(email=EMAIL, password=PASSWORD)
    params = (APP_NAME, args)
0374
=== modified file 'ubuntu_sso/main/tests/test_common.py'
--- ubuntu_sso/main/tests/test_common.py 2011-09-27 14:06:12 +0000
+++ ubuntu_sso/main/tests/test_common.py 2012-01-17 16:51:50 +0000
@@ -1,12 +1,6 @@
1# -*- coding: utf-8 -*-1# -*- coding: utf-8 -*-
2#2#
3# test_main - tests for ubuntu_sso.main3# Copyright 2009-2011 Canonical Ltd.
4#
5# Author: Natalia Bidart <natalia.bidart@canonical.com>
6# Author: Alejandro J. Cura <alecu@canonical.com>
7# Author: Manuel de la Pena <manuel@canonical.com>
8#
9# Copyright 2009-2010 Canonical Ltd.
10#4#
11# This program is free software: you can redistribute it and/or modify it5# This program is free software: you can redistribute it and/or modify it
12# under the terms of the GNU General Public License version 3, as published6# under the terms of the GNU General Public License version 3, as published
@@ -19,161 +13,923 @@
19#13#
20# You should have received a copy of the GNU General Public License along14# You should have received a copy of the GNU General Public License along
21# with this program. If not, see <http://www.gnu.org/licenses/>.15# with this program. If not, see <http://www.gnu.org/licenses/>.
22"""Tests share by diff platforms."""16"""Tests for the main SSO client code."""
2317
24from unittest import TestCase18import logging
2519
26from mocker import MockerTestCase, MATCH20from twisted.internet import defer
2721from ubuntuone.devtools.handlers import MementoHandler
22
23from ubuntu_sso import main
28from ubuntu_sso.main import (24from ubuntu_sso.main import (
29 CredentialsManagement,25 CredentialsManagement,
26 except_to_errdict,
30 SSOLogin,27 SSOLogin,
28 thread_execute,
29 TIMEOUT_INTERVAL,
30 UbuntuSSOService,
31)31)
3232from ubuntu_sso.main import (HELP_TEXT_KEY, PING_URL_KEY,
3333 TC_URL_KEY, UI_CLASS_KEY, UI_MODULE_KEY, WINDOW_ID_KEY,
34class SSOLoginMockedTestCase(MockerTestCase):34 SUCCESS_CB_KEY, ERROR_CB_KEY, DENIAL_CB_KEY)
35 """Test that the call are relied correctly."""35from ubuntu_sso.main.tests import FakedCredentials
3636from ubuntu_sso.tests import (APP_NAME, TC_URL, HELP_TEXT, CAPTCHA_ID,
37 def setUp(self):37 CAPTCHA_SOLUTION, EMAIL, EMAIL_TOKEN, NAME, PASSWORD, PING_URL, TOKEN,
38 """Setup tests."""38 WINDOW_ID, Recorder, TestCase)
39 super(SSOLoginMockedTestCase, self).setUp()39
40 self.root = self.mocker.mock()40
41 mockbusname = self.mocker.mock()41# Access to a protected member 'yyy' of a client class
42 mockbus = self.mocker.mock()42# pylint: disable=W0212
43 mockbusname.get_bus()43
44 self.mocker.result(mockbus)44
45 self.login = SSOLogin(mockbus)45class SampleException(Exception):
46 self.login.root = self.root46 """The exception that will be thrown by the fake thread_execute."""
47 self.mocker.reset()47
4848
49 def test_generate_captcha(self):49def fake_ok_thread_execute(f, app, cb, eb):
50 """Test that the call is relayed."""50 """A fake thread_execute function that succeeds."""
51 app_name = 'app'51 cb(app, f())
52 filename = 'file'52
53 self.root.generate_captcha(app_name, filename,53
54 MATCH(callable), MATCH(callable))54def fake_err_thread_execute(f, app, cb, eb):
55 self.mocker.replay()55 """A fake thread_execute function that fails."""
56 self.login.generate_captcha(app_name, filename)56 try:
5757 f()
58 def test_register_user(self):58 except Exception, e: # pylint: disable=W0703
59 """Test that the call is relayed."""59 eb(app, except_to_errdict(e))
60 app_name = 'app'60 else:
61 email = 'email'61 eb(app, except_to_errdict(SampleException()))
62 password = 'pwd'62
63 name = 'display name'63
64 captcha_id = 'id'64class FakedProxy(Recorder):
65 captcha_solution = 'hello'65 """A faked multiplatform proxy."""
66 self.root.register_user(app_name, email, password, name, captcha_id,66
67 captcha_solution,67
68 MATCH(callable), MATCH(callable))68class FakedAccount(Recorder):
69 self.mocker.replay()69 """A faked Account processor."""
70 self.login.register_user(app_name, email, password, name, captcha_id,70
71 captcha_solution)71
7272class FakedCredentialsFactory(Recorder):
73 def test_login(self):73 """A very dummy Credentials object."""
74 """Test that the call is relayed."""74
75 app_name = 'app'75 credentials = FakedCredentials()
76 email = 'email'76
77 password = 'password'77 # pylint: disable=C0103
78 self.root.login(app_name, email, password,78
79 MATCH(callable), MATCH(callable),79 def Credentials(self, *a, **kw):
80 MATCH(callable))80 """Return always the same Credentials instance."""
81 self.mocker.mock()81 return self.credentials
82 self.mocker.replay()82
83 self.login.login(app_name, email, password)83 # pylint: enable=C0103
8484
85 def test_validate_email(self):85
86 """Test that the call is relayed."""86class TestExceptToErrdictException(Exception):
87 app_name = 'app'87 """A dummy exception for the following testcase."""
88 email = 'email'88
89 password = 'passwrd'89
90 email_token = 'token'90class ExceptToErrdictTestCase(TestCase):
91 self.root.validate_email(app_name, email, password, email_token,91 """Tests for the except_to_errdict function."""
92 MATCH(callable),92
93 MATCH(callable))93 def test_first_arg_is_dict(self):
94 self.mocker.replay()94 """If the first arg is a dict, use it as the base dict."""
95 self.login.validate_email(app_name, email, password, email_token)95 sample_dict = {
9696 "errorcode1": "error message 1",
97 def test_request_password_reset_token(self):97 "errorcode2": "error message 2",
98 """Test that the call is relayed."""98 "errorcode3": "error message 3",
99 app_name = 'app'99 }
100 email = 'email'100 e = TestExceptToErrdictException(sample_dict)
101 self.root.request_password_reset_token(app_name, email,101 result = except_to_errdict(e)
102 MATCH(callable),102
103 MATCH(callable))103 self.assertEqual(result["errtype"], e.__class__.__name__)
104 self.mocker.replay()104 for k in sample_dict.keys():
105 self.login.request_password_reset_token(app_name, email)105 self.assertIn(k, result)
106106 self.assertEqual(result[k], sample_dict[k])
107 def test_set_new_password(self):107
108 """Test that the call is relayed."""108 def test_first_arg_is_str(self):
109 app_name = 'app'109 """If the first arg is a str, use it as the message."""
110 email = 'email'110 sample_string = "a sample string"
111 token = 'token'111 e = TestExceptToErrdictException(sample_string)
112 new_password = 'new'112 result = except_to_errdict(e)
113 self.root.set_new_password(app_name, email, token, new_password,113 self.assertEqual(result["errtype"], e.__class__.__name__)
114 MATCH(callable),114 self.assertEqual(result["message"], sample_string)
115 MATCH(callable))115
116 self.mocker.replay()116 def test_first_arg_is_unicode(self):
117 self.login.set_new_password(app_name, email, token, new_password)117 """If the first arg is a unicode, use it as the message."""
118118 sample_string = u"a sample string"
119119 e = TestExceptToErrdictException(sample_string)
120class CredentialsManagementMockedTestCase(MockerTestCase, TestCase):120 result = except_to_errdict(e)
121 """Test that the call are relied correctly."""121 self.assertEqual(result["errtype"], e.__class__.__name__)
122122 self.assertEqual(result["message"], sample_string)
123 def setUp(self):123
124 """Setup tests."""124 def test_no_args_at_all(self):
125 super(CredentialsManagementMockedTestCase, self).setUp()125 """If there are no args, use the class docstring."""
126 self.root = self.mocker.mock()126 e = TestExceptToErrdictException()
127 self.cred = CredentialsManagement(None, None)127 result = except_to_errdict(e)
128 self.cred.root = self.root128 self.assertEqual(result["errtype"], e.__class__.__name__)
129129 self.assertEqual(result["message"], e.__class__.__doc__)
130
131 def test_some_other_thing_as_first_arg(self):
132 """If first arg is not basestring nor dict, then repr all args."""
133 sample_args = (None, u"unicode2\ufffd", "errorcode3")
134 e = TestExceptToErrdictException(*sample_args)
135 result = except_to_errdict(e)
136 self.assertEqual(result["errtype"], e.__class__.__name__)
137
138
class BaseTestCase(TestCase):
    """Common base providing a faked proxy and a recorder assertion."""

    timeout = 2

    @defer.inlineCallbacks
    def setUp(self):
        yield super(BaseTestCase, self).setUp()
        self.proxy = FakedProxy()

    def assert_recorder_called(self, fake, method, *args, **kwargs):
        """Check 'fake.method' was called once, with (*args, **kwargs)."""
        recorded_calls = fake._called[method]
        self.assertEqual(recorded_calls, [(args, kwargs)])
152
153
class SSOLoginTestCase(BaseTestCase):
    """Tests for SSOLogin when the processor calls succeed."""

    @defer.inlineCallbacks
    def setUp(self):
        yield super(SSOLoginTestCase, self).setUp()

        def ksc(keyring, k, val):
            """Check and record what is being stored in the keyring."""
            self.assertEqual(k, APP_NAME)
            self.assertEqual(val, TOKEN)
            self.keyring_was_set = True
            self.keyring_values = k, val
            return defer.succeed(None)

        self.keyring_was_set = False
        self.keyring_values = None
        self.patch(main.Keyring, "set_credentials", ksc)
        self.patch(main, 'Account', FakedAccount)
        # run the "threaded" calls synchronously
        self.patch(main, "thread_execute", fake_ok_thread_execute)

        self.obj = SSOLogin(self.proxy)

    def test_creation(self):
        """The new object holds the processor and the given proxy."""
        self.assertIsInstance(self.obj.processor, main.Account)
        self.assertTrue(self.obj.proxy is self.proxy)

    @defer.inlineCallbacks
    def test_generate_captcha(self):
        """generate_captcha emits CaptchaGenerated with the result."""
        signalled = defer.Deferred()
        captcha_file = "sample filename"
        captcha_id = "expected result"

        self.patch(self.obj, "CaptchaGenerated",
                   lambda *a: signalled.callback(a))
        self.patch(self.obj, "CaptchaGenerationError", signalled.errback)

        self.obj.processor._next_result = captcha_id
        self.obj.generate_captcha(APP_NAME, captcha_file)

        app_name, result = yield signalled
        self.assertEqual(result, captcha_id)
        self.assertEqual(app_name, APP_NAME)
        self.assert_recorder_called(self.obj.processor, 'generate_captcha',
                                    captcha_file)

    @defer.inlineCallbacks
    def test_register_user(self):
        """register_user emits UserRegistered with the result."""
        signalled = defer.Deferred()
        registered = "expected result"

        self.patch(self.obj, "UserRegistered",
                   lambda *a: signalled.callback(a))
        self.patch(self.obj, "UserRegistrationError", signalled.errback)

        self.obj.processor._next_result = registered
        self.obj.register_user(APP_NAME, EMAIL, PASSWORD, NAME, CAPTCHA_ID,
                               CAPTCHA_SOLUTION)

        app_name, result = yield signalled
        self.assertEqual(result, registered)
        self.assertEqual(app_name, APP_NAME)
        self.assert_recorder_called(
            self.obj.processor, 'register_user',
            EMAIL, PASSWORD, NAME, CAPTCHA_ID, CAPTCHA_SOLUTION)

    @defer.inlineCallbacks
    def test_login(self):
        """login emits LoggedIn and stores the token in the keyring."""
        signalled = defer.Deferred()

        self.patch(self.obj, "LoggedIn", lambda *a: signalled.callback(a))
        self.patch(self.obj, "LoginError", signalled.errback)
        self.patch(self.obj, "UserNotValidated", signalled.errback)

        self.obj.processor._next_result = TOKEN
        self.obj.login(APP_NAME, EMAIL, PASSWORD)

        app_name, result = yield signalled
        self.assertEqual(result, EMAIL)
        self.assertEqual(app_name, APP_NAME)
        self.assertTrue(self.keyring_was_set, "The keyring should be set")
        self.assert_recorder_called(
            self.obj.processor, 'login',
            EMAIL, PASSWORD, main.get_token_name(APP_NAME))

    @defer.inlineCallbacks
    def test_validate_email(self):
        """validate_email emits EmailValidated and stores the token."""
        signalled = defer.Deferred()

        self.patch(self.obj, "EmailValidated",
                   lambda *a: signalled.callback(a))
        self.patch(self.obj, "EmailValidationError", signalled.errback)

        self.obj.processor._next_result = TOKEN
        self.obj.validate_email(APP_NAME, EMAIL, PASSWORD, EMAIL_TOKEN)

        app_name, result = yield signalled
        self.assertEqual(result, EMAIL)
        self.assertEqual(app_name, APP_NAME)
        self.assertTrue(self.keyring_was_set, "The keyring should be set")
        self.assert_recorder_called(
            self.obj.processor, 'validate_email',
            EMAIL, PASSWORD, EMAIL_TOKEN, main.get_token_name(APP_NAME))

    @defer.inlineCallbacks
    def test_request_password_reset_token(self):
        """request_password_reset_token emits PasswordResetTokenSent."""
        signalled = defer.Deferred()

        self.patch(self.obj, "PasswordResetTokenSent",
                   lambda *a: signalled.callback(a))
        self.patch(self.obj, "PasswordResetError", signalled.errback)

        self.obj.processor._next_result = EMAIL
        self.obj.request_password_reset_token(APP_NAME, EMAIL)

        app_name, result = yield signalled
        self.assertEqual(result, EMAIL)
        self.assertEqual(app_name, APP_NAME)
        self.assert_recorder_called(
            self.obj.processor, 'request_password_reset_token', EMAIL)

    @defer.inlineCallbacks
    def test_set_new_password(self):
        """set_new_password emits PasswordChanged with the email."""
        signalled = defer.Deferred()

        self.patch(self.obj, "PasswordChanged",
                   lambda *a: signalled.callback(a))
        self.patch(self.obj, "PasswordChangeError", signalled.errback)

        self.obj.processor._next_result = EMAIL
        self.obj.set_new_password(APP_NAME, EMAIL, EMAIL_TOKEN, PASSWORD)

        app_name, result = yield signalled
        self.assertEqual(result, EMAIL)
        self.assertEqual(app_name, APP_NAME)
        self.assert_recorder_called(
            self.obj.processor, 'set_new_password',
            EMAIL, EMAIL_TOKEN, PASSWORD)
291
292
293class SSOLoginWithErrorTestCase(SSOLoginTestCase):
294 """Test the SSOLogin class."""
295
296 @defer.inlineCallbacks
297 def setUp(self):
298 yield super(SSOLoginWithErrorTestCase, self).setUp()
299 self.patch(main, "thread_execute", fake_err_thread_execute)
300
301 @defer.inlineCallbacks
302 def test_generate_captcha(self):
303 """Test that the captcha method fails as expected."""
304 d = defer.Deferred()
305 filename = "sample filename"
306
307 self.patch(self.obj, "CaptchaGenerated", d.errback)
308 self.patch(self.obj, "CaptchaGenerationError",
309 lambda *a: d.callback(a))
310 self.obj.generate_captcha(APP_NAME, filename)
311
312 app_name, errdict = yield d
313 self.assertEqual(errdict["errtype"], "SampleException")
314 self.assertEqual(app_name, APP_NAME)
315
316 @defer.inlineCallbacks
317 def test_register_user(self):
318 """Test that the register_user method fails as expected."""
319 d = defer.Deferred()
320
321 self.patch(self.obj, "UserRegistered", d.errback)
322 self.patch(self.obj, "UserRegistrationError", lambda *a: d.callback(a))
323 self.obj.register_user(APP_NAME, EMAIL, PASSWORD, NAME, CAPTCHA_ID,
324 CAPTCHA_SOLUTION)
325
326 app_name, errdict = yield d
327 self.assertEqual(errdict["errtype"], "SampleException")
328 self.assertEqual(app_name, APP_NAME)
329
330 @defer.inlineCallbacks
331 def test_login(self):
332 """The login method fails as expected when get_token_name fails."""
333 d = defer.Deferred()
334
335 def fake_gtn(*args):
336 """A fake get_token_name that fails."""
337 raise SampleException()
338
339 self.patch(main, "get_token_name", fake_gtn)
340 self.patch(self.obj, "LoggedIn", d.errback)
341 self.patch(self.obj, "LoginError", lambda *a: d.callback(a))
342 self.patch(self.obj, "UserNotValidated", d.errback)
343 self.obj.login(APP_NAME, EMAIL, PASSWORD)
344
345 app_name, errdict = yield d
346 self.assertEqual(app_name, APP_NAME)
347 self.assertEqual(errdict["errtype"], "SampleException")
348 self.assertFalse(self.keyring_was_set, "Keyring should not be set")
349
350 @defer.inlineCallbacks
351 def test_login_set_credentials(self):
352 """The login method fails as expected when set_credentials fails."""
353 d = defer.Deferred()
354
355 def fake_set_creds(*args):
356 """A fake Keyring.set_credentials that fails."""
357 return defer.fail(SampleException())
358
359 self.patch(main.Keyring, "set_credentials", fake_set_creds)
360 fail = lambda app, res: d.errback((app, res))
361 self.patch(self.obj, "LoggedIn", fail)
362 self.patch(self.obj, "LoginError", lambda *a: d.callback(a))
363 self.patch(self.obj, "UserNotValidated", fail)
364 self.obj.login(APP_NAME, EMAIL, PASSWORD)
365
366 app_name, errdict = yield d
367 self.assertEqual(app_name, APP_NAME)
368 self.assertEqual(errdict["errtype"], "SampleException")
369 self.assertFalse(self.keyring_was_set, "Keyring should not be set")
370
371 @defer.inlineCallbacks
372 def test_validate_email(self):
373 """Test that the validate_email method fails as expected."""
374 d = defer.Deferred()
375
376 def fake_gtn(*args):
377 """A fake get_token_name that fails."""
378 raise SampleException()
379
380 self.patch(main, "get_token_name", fake_gtn)
381
382 self.patch(self.obj, "EmailValidated", d.errback)
383 self.patch(self.obj, "EmailValidationError", lambda *a: d.callback(a))
384 self.obj.validate_email(APP_NAME, EMAIL, PASSWORD, EMAIL_TOKEN)
385
386 app_name, errdict = yield d
387 self.assertEqual(app_name, APP_NAME)
388 self.assertEqual(errdict["errtype"], "SampleException")
389 self.assertFalse(self.keyring_was_set, "Keyring should not be set")
390
391 @defer.inlineCallbacks
392 def test_request_password_reset_token(self):
393 """Test the request_password_reset_token method fails as expected."""
394 d = defer.Deferred()
395
396 self.patch(self.obj, "PasswordResetTokenSent", d.errback)
397 self.patch(self.obj, "PasswordResetError", lambda *a: d.callback(a))
398 self.obj.request_password_reset_token(APP_NAME, EMAIL)
399
400 app_name, errdict = yield d
401 self.assertEqual(errdict["errtype"], "SampleException")
402 self.assertEqual(app_name, APP_NAME)
403
404 @defer.inlineCallbacks
405 def test_set_new_password(self):
406 """Test that the set_new_password method fails as expected."""
407 d = defer.Deferred()
408
409 self.patch(self.obj, "PasswordChanged", d.errback)
410 self.patch(self.obj, "PasswordChangeError", lambda *a: d.callback(a))
411 self.obj.set_new_password(APP_NAME, EMAIL, EMAIL_TOKEN, PASSWORD)
412
413 app_name, errdict = yield d
414 self.assertEqual(errdict["errtype"], "SampleException")
415 self.assertEqual(app_name, APP_NAME)
416
417
class BlockingFunctionTestCase(TestCase):
    """Exercise the "thread_execute" helper on success and on failure."""

    timeout = 5

    @defer.inlineCallbacks
    def test_thread_execute(self):
        """The success callback gets the app name and the return value."""
        finished = defer.Deferred()
        expected_result = "expected result"

        def succeed():
            """Return the expected value without raising."""
            return expected_result

        def on_success(*cb_args):
            """Pass the success callback arguments to the test body."""
            finished.callback(cb_args)

        thread_execute(succeed, APP_NAME, on_success, finished.errback)

        cb_args = yield finished
        app_name, result = cb_args

        self.assertEqual(result, expected_result)
        self.assertEqual(app_name, APP_NAME)

    @defer.inlineCallbacks
    def test_thread_execute_error(self):
        """The error callback gets the app name and the raised exception."""
        finished = defer.Deferred()
        expected_error_message = "expected error message"
        expected_exc = SampleException(expected_error_message)

        def explode():
            """Raise the prepared exception."""
            raise expected_exc

        def on_error(*eb_args):
            """Pass the error callback arguments to the test body."""
            finished.callback(eb_args)

        # Callback roles are swapped: the success path errbacks the test.
        thread_execute(explode, APP_NAME, finished.errback, on_error)

        eb_args = yield finished
        app_name, error = eb_args

        self.assertEqual(app_name, APP_NAME)
        self.assertEqual(error, expected_exc)
457
458
class CredentialsManagementTestCase(BaseTestCase):
    """Shared fixture for the CredentialsManagement test cases."""

    base_args = {
        HELP_TEXT_KEY: HELP_TEXT,
        PING_URL_KEY: PING_URL,
        TC_URL_KEY: TC_URL,
        WINDOW_ID_KEY: WINDOW_ID,
        UI_CLASS_KEY: 'SuperUI',
        UI_MODULE_KEY: 'foo.bar.baz',
    }

    @defer.inlineCallbacks
    def setUp(self):
        """Build a CredentialsManagement on top of a faked backend.

        The 'Credentials' name in 'main' is replaced by the one from a
        FakedCredentialsFactory, and a MementoHandler is attached to
        'main.logger' for the duration of the test.

        """
        yield super(CredentialsManagementTestCase, self).setUp()

        self.factory = FakedCredentialsFactory()
        self.patch(main, 'Credentials', self.factory.Credentials)

        # Timeout and shutdown hooks are no-ops unless a test patches them.
        hooks = {
            'timeout_func': lambda *a: None,
            'shutdown_func': lambda *a: None,
        }
        self.obj = CredentialsManagement(proxy=self.proxy, **hooks)
        self.args = {}
        self.cred_args = {}

        handler = MementoHandler()
        handler.setLevel(logging.DEBUG)
        self.memento = handler
        main.logger.addHandler(handler)
        self.addCleanup(main.logger.removeHandler, handler)
484
485
class CredentialsManagementRefCountingTestCase(CredentialsManagementTestCase):
    """Tests for the CredentialsManagement ref counting."""

    @defer.inlineCallbacks
    def assert_refcounter_increased(self, method_name, signal_name=None):
        """Assert that calling 'method_name' increases the ref counter.

        When 'signal_name' is given, that signal is replaced by a stub and
        the counter is checked once the stub was called; otherwise the
        counter is checked right after 'method_name' returns.

        """
        if signal_name is not None:
            d = defer.Deferred()
            self.patch(self.obj, signal_name, lambda *a: d.callback(a))
        else:
            d = defer.succeed(None)

        getattr(self.obj, method_name)(APP_NAME, self.args)

        yield d
        self.assertEqual(self.obj.ref_count, 1)

    def assert_refcounter_decreased(self, signal_name, *a, **kw):
        """Assert that calling 'signal_name' decreases the ref counter."""
        self.obj.ref_count = 3

        getattr(self.obj, signal_name)(*a, **kw)

        self.assertEqual(self.obj.ref_count, 2)

    def assert_refcounter_negative(self, signal_name, *a, **kw):
        """Assert that calling 'signal_name' decreases the ref counter.

        If decreased value is negative, assert that a warning log was made
        and that the counter ends up being 0.

        """
        self.obj._ref_count = -3  # overwrite internal to force a negative

        getattr(self.obj, signal_name)(*a, **kw)

        self.assertEqual(self.obj.ref_count, 0)
        msg = 'Attempting to decrease ref_count to a negative value (-4).'
        self.assertTrue(self.memento.check_warning(msg))

    def test_ref_counting(self):
        """Ref counting is in place."""
        self.assertEqual(self.obj.ref_count, 0)

    @defer.inlineCallbacks
    def test_find_credentials(self):
        """Keep proper track of ongoing requests."""
        yield self.assert_refcounter_increased('find_credentials',
                                               'CredentialsFound')

    @defer.inlineCallbacks
    def test_find_credentials_with_success_cb(self):
        """Keep proper track of ongoing requests when using success_cb."""
        d = defer.Deferred()
        counter = []

        def in_the_middle():
            """Store the ref_count value."""
            counter.append(self.obj.ref_count)
            return defer.succeed(TOKEN)

        self.patch(self.factory.credentials, 'find_credentials', in_the_middle)
        self.obj.find_credentials(APP_NAME, self.args,
                                  success_cb=d.callback, error_cb=d.errback)
        yield d
        self.assertEqual(counter, [1])
        self.assertEqual(self.obj.ref_count, 0)

    @defer.inlineCallbacks
    def test_find_credentials_with_error_cb(self):
        """Keep proper track of ongoing requests when using error_cb."""
        d = defer.Deferred()
        counter = []

        def in_the_middle():
            """Store the ref_count value."""
            counter.append(self.obj.ref_count)
            return defer.fail('foo')

        self.patch(self.factory.credentials, 'find_credentials', in_the_middle)
        self.obj.find_credentials(APP_NAME, self.args,
                                  success_cb=d.errback, error_cb=d.callback)

        yield d
        self.assertEqual(counter, [1])
        self.assertEqual(self.obj.ref_count, 0)

    @defer.inlineCallbacks
    def test_clear_credentials(self):
        """Keep proper track of ongoing requests."""
        yield self.assert_refcounter_increased('clear_credentials',
                                               'CredentialsCleared')

    @defer.inlineCallbacks
    def test_store_credentials(self):
        """Keep proper track of ongoing requests."""
        yield self.assert_refcounter_increased('store_credentials',
                                               'CredentialsStored')

    # Without inlineCallbacks a test body containing 'yield' is just a
    # generator function: calling it returns an unstarted generator and
    # the assertion inside assert_refcounter_increased never runs.
    @defer.inlineCallbacks
    def test_register(self):
        """Keep proper track of ongoing requests."""
        yield self.assert_refcounter_increased('register')

    @defer.inlineCallbacks
    def test_login(self):
        """Keep proper track of ongoing requests."""
        yield self.assert_refcounter_increased('login')

    def test_several_requests(self):
        """Requests can be nested."""
        self.obj.login(APP_NAME, self.args)
        self.obj.register(APP_NAME, self.args)
        self.obj.login(APP_NAME, self.args)
        self.obj.register(APP_NAME, self.args)
        self.obj.register(APP_NAME, self.args)

        self.assertEqual(self.obj.ref_count, 5)

    def test_credentials_found(self):
        """Ref counter is decreased when a signal is sent."""
        self.assert_refcounter_decreased('CredentialsFound', APP_NAME, TOKEN)

    def test_credentials_not_found(self):
        """Ref counter is decreased when a signal is sent."""
        self.assert_refcounter_decreased('CredentialsNotFound', APP_NAME)

    def test_credentials_cleared(self):
        """Ref counter is decreased when a signal is sent."""
        self.assert_refcounter_decreased('CredentialsCleared', APP_NAME)

    def test_credentials_stored(self):
        """Ref counter is decreased when a signal is sent."""
        self.assert_refcounter_decreased('CredentialsStored', APP_NAME)

    def test_credentials_error(self):
        """Ref counter is decreased when a signal is sent."""
        self.assert_refcounter_decreased('CredentialsError', APP_NAME,
                                         SampleException('test'))

    def test_authorization_denied(self):
        """Ref counter is decreased when a signal is sent."""
        self.assert_refcounter_decreased('AuthorizationDenied', APP_NAME)

    def test_credentials_found_when_ref_count_is_not_positive(self):
        """A warning is logged and the counter is 0 if it would go negative."""
        self.assert_refcounter_negative('CredentialsFound', APP_NAME, TOKEN)

    def test_credentials_not_found_when_ref_count_is_not_positive(self):
        """A warning is logged and the counter is 0 if it would go negative."""
        self.assert_refcounter_negative('CredentialsNotFound', APP_NAME)

    def test_credentials_cleared_when_ref_count_is_not_positive(self):
        """A warning is logged and the counter is 0 if it would go negative."""
        self.assert_refcounter_negative('CredentialsCleared', APP_NAME)

    def test_credentials_stored_when_ref_count_is_not_positive(self):
        """A warning is logged and the counter is 0 if it would go negative."""
        self.assert_refcounter_negative('CredentialsStored', APP_NAME)

    def test_credentials_error_when_ref_count_is_not_positive(self):
        """A warning is logged and the counter is 0 if it would go negative."""
        self.assert_refcounter_negative('CredentialsError', APP_NAME,
                                        SampleException('test'))

    def test_autorization_denied_when_ref_count_is_not_positive(self):
        """A warning is logged and the counter is 0 if it would go negative."""
        self.assert_refcounter_negative('AuthorizationDenied', APP_NAME)

    def test_on_zero_ref_count_shutdown(self):
        """When ref count reaches 0, queue shutdown op."""
        self.patch(self.obj, 'timeout_func', self._set_called)
        self.obj.login(APP_NAME, self.args)
        self.obj.CredentialsFound(APP_NAME, TOKEN)

        self.assertEqual(self._called,
                         ((TIMEOUT_INTERVAL, self.obj.shutdown), {}))

    def test_on_non_zero_ref_count_do_not_shutdown(self):
        """If ref count is not 0, do not queue shutdown op."""
        self.patch(self.obj, 'timeout_func', self._set_called)
        self.obj.login(APP_NAME, self.args)

        self.assertEqual(self._called, False)

    def test_on_non_zero_ref_count_after_zero_do_not_shutdown(self):
        """If the shutdown was queued, do not quit if counter is not zero."""

        def fake_timeout_func(interval, func):
            """Start a new request when the timer is started."""
            self.obj.register(APP_NAME, self.args)
            assert self.obj.ref_count > 0
            func()

        self.patch(self.obj, 'timeout_func', fake_timeout_func)
        self.patch(self.obj, 'shutdown_func', self._set_called)

        self.obj.login(APP_NAME, self.args)
        self.obj.CredentialsFound(APP_NAME, TOKEN)
        # counter reached 0, timeout_func was called

        self.assertEqual(self._called, False, 'shutdown_func was not called')

    def test_zero_ref_count_after_zero_do_shutdown(self):
        """If the shutdown was queued, do quit if counter is zero."""

        def fake_timeout_func(interval, func):
            """Run the queued function at once, with no request in between."""
            assert self.obj.ref_count == 0
            func()

        self.patch(self.obj, 'timeout_func', fake_timeout_func)
        self.patch(self.obj, 'shutdown_func', self._set_called)

        self.obj.login(APP_NAME, self.args)
        self.obj.CredentialsFound(APP_NAME, TOKEN)
        # counter reached 0, timeout_func was called

        self.assertEqual(self._called, ((), {}), 'shutdown_func was called')
702
703
class CredentialsManagementClearTestCase(CredentialsManagementTestCase):
    """Checks for 'clear_credentials'; subclasses retarget the attributes."""

    method = 'clear_credentials'
    success_signal = 'CredentialsCleared'
    success_result = (APP_NAME,)
    others_should_errback = []

    @defer.inlineCallbacks
    def setUp(self):
        """Resolve the bound method named by the 'method' attribute."""
        yield super(CredentialsManagementClearTestCase, self).setUp()
        self.call_method = getattr(self.obj, self.method)

    def test_backend_called(self):
        """The credentials backend is properly called."""
        self.call_method(APP_NAME, self.args)
        self.assert_recorder_called(self.factory, 'Credentials', APP_NAME)

    @defer.inlineCallbacks
    def test_does_not_block(self):
        """The outcome of 'method' is delivered through the success signal."""
        finished = defer.Deferred()

        def on_success(*signal_args):
            """Deliver the success signal arguments to the test body."""
            finished.callback(signal_args)

        def on_credentials_error(*signal_args):
            """Fail the test if CredentialsError is emitted."""
            finished.errback(SampleException(signal_args))

        def on_other_signal(*signal_args):
            """Fail the test if a signal that must not fire is emitted."""
            finished.errback(AssertionError(signal_args))

        self.patch(self.obj, self.success_signal, on_success)
        self.patch(self.obj, 'CredentialsError', on_credentials_error)
        for signal in self.others_should_errback:
            self.patch(self.obj, signal, on_other_signal)

        self.call_method(APP_NAME, self.args)

        result = yield finished
        self.assertEqual(result, self.success_result)

    @defer.inlineCallbacks
    def test_handles_error(self):
        """If calling the backend fails, CredentialsError is sent."""
        finished = defer.Deferred()
        exc = SampleException('baz')

        def on_credentials_error(*signal_args):
            """Deliver the CredentialsError arguments to the test body."""
            finished.callback(signal_args)

        def failing_backend(*a):
            """Replace the backend method with one that always fails."""
            return defer.fail(exc)

        self.patch(self.obj, self.success_signal, finished.errback)
        self.patch(self.obj, 'CredentialsError', on_credentials_error)

        self.patch(self.factory.credentials, self.method, failing_backend)
        self.call_method(APP_NAME, self.args)

        signal_args = yield finished
        app_name, error = signal_args
        self.assertEqual(error, exc)
        self.assertEqual(app_name, APP_NAME)
755
756
class CredentialsManagementStoreTestCase(CredentialsManagementClearTestCase):
    """Run the inherited checks against 'store_credentials'."""

    # Only the target method and its success signal differ from the parent.
    method = 'store_credentials'
    success_signal = 'CredentialsStored'
762
763
class CredentialsManagementFindTestCase(CredentialsManagementClearTestCase):
    """Run the inherited checks against 'find_credentials', plus callbacks."""

    method = 'find_credentials'
    success_signal = 'CredentialsFound'
    success_result = (APP_NAME, TOKEN)
    others_should_errback = ['CredentialsNotFound']

    @defer.inlineCallbacks
    def test_find_credentials_with_success_cb(self):
        """The success_cb receives what the backend find_credentials gives."""
        finished = defer.Deferred()

        self.obj.find_credentials(APP_NAME, self.args,
                                  success_cb=finished.callback,
                                  error_cb=finished.errback)

        creds = yield finished
        # Ask the faked backend directly for the value to compare against.
        expected = yield self.factory.credentials.find_credentials()
        self.assertEqual(creds, expected)
        self.assert_recorder_called(self.factory, 'Credentials', APP_NAME)

    @defer.inlineCallbacks
    def test_find_credentials_with_error_cb(self):
        """If find_credentials fails, error_cb is called."""
        finished = defer.Deferred()

        def failing_find(*a):
            """Replace the backend find_credentials with a failing one."""
            return defer.fail(SampleException('baz'))

        self.patch(self.factory.credentials, 'find_credentials',
                   failing_find)
        # Callback roles are swapped: success_cb errbacks the test.
        self.obj.find_credentials(APP_NAME, self.args,
                                  success_cb=finished.errback,
                                  error_cb=finished.callback)

        errdict = yield finished
        self.assertEqual(errdict["errtype"], "SampleException")
        self.assert_recorder_called(self.factory, 'Credentials', APP_NAME)
798
799
class CredentialsManagementNotFindTestCase(CredentialsManagementFindTestCase):
    """Run the find checks with a backend that returns no credentials."""

    success_signal = 'CredentialsNotFound'
    success_result = (APP_NAME,)
    others_should_errback = ['CredentialsFound']

    @defer.inlineCallbacks
    def setUp(self):
        """Make the faked backend method succeed with an empty dict."""
        yield super(CredentialsManagementNotFindTestCase, self).setUp()

        def find_nothing():
            """Succeed with empty credentials."""
            return defer.succeed({})

        self.call_method = getattr(self.obj, self.method)
        self.patch(self.factory.credentials, self.method, find_nothing)
813
814
class CredentialsManagementOpsTestCase(CredentialsManagementTestCase):
    """Check the arguments login/register hand over to Credentials."""

    @defer.inlineCallbacks
    def setUp(self):
        """Prepare the call arguments and the expected Credentials kwargs."""
        yield super(CredentialsManagementOpsTestCase, self).setUp()

        # The arguments passed to the methods carry stringified values.
        stringified = {}
        for key, value in self.base_args.iteritems():
            stringified[key] = str(value)
        self.args = stringified

        # Expected kwargs: the base values plus the three signal callbacks.
        expected = self.base_args.copy()
        expected[SUCCESS_CB_KEY] = self.obj.CredentialsFound
        expected[ERROR_CB_KEY] = self.obj.CredentialsError
        expected[DENIAL_CB_KEY] = self.obj.AuthorizationDenied
        self.cred_args = expected

    def test_register(self):
        """register builds Credentials with the expected arguments."""
        self.obj.register(APP_NAME, self.args)
        self.assert_recorder_called(self.factory, 'Credentials',
                                    APP_NAME, **self.cred_args)

    def test_login(self):
        """login builds Credentials with the expected arguments."""
        self.obj.login(APP_NAME, self.args)
        self.assert_recorder_called(self.factory, 'Credentials',
                                    APP_NAME, **self.cred_args)

    def test_login_email_password(self):
        """login_email_password builds Credentials as expected."""
        self.args['email'] = EMAIL
        self.args['password'] = PASSWORD
        self.obj.login_email_password(APP_NAME, self.args)
        self.assert_recorder_called(self.factory, 'Credentials',
                                    APP_NAME, **self.cred_args)
846
847
class CredentialsManagementParamsTestCase(CredentialsManagementOpsTestCase):
    """Re-run the login/register checks with an extra argument present."""

    @defer.inlineCallbacks
    def setUp(self):
        """Add a 'dummy' entry to the call arguments only."""
        yield super(CredentialsManagementParamsTestCase, self).setUp()
        # cred_args is left untouched, so the inherited assertions expect
        # this extra key not to reach the Credentials call.
        self.args['dummy'] = 'nothing useful'
855
856
class CredentialsManagementSignalsTestCase(CredentialsManagementTestCase):
    """Check what each CredentialsManagement signal writes to the log."""

    def test_credentials_found(self):
        """CredentialsFound logs at info level without the token values."""
        self.obj.CredentialsFound(APP_NAME, TOKEN)
        class_name = self.obj.__class__.__name__
        signal_name = self.obj.CredentialsFound.__name__
        self.assertTrue(
            self.memento.check_info(class_name, signal_name, APP_NAME))

        msg = 'credentials must not be logged (found %r in log).'
        for val in TOKEN.itervalues():
            self.assertFalse(self.memento.check_info(val), msg % val)

    def test_credentials_not_found(self):
        """CredentialsNotFound logs at info level."""
        self.obj.CredentialsNotFound(APP_NAME)
        class_name = self.obj.__class__.__name__
        signal_name = self.obj.CredentialsNotFound.__name__
        self.assertTrue(
            self.memento.check_info(class_name, signal_name, APP_NAME))

    def test_credentials_cleared(self):
        """CredentialsCleared logs at info level."""
        self.obj.CredentialsCleared(APP_NAME)
        class_name = self.obj.__class__.__name__
        signal_name = self.obj.CredentialsCleared.__name__
        self.assertTrue(
            self.memento.check_info(class_name, signal_name, APP_NAME))

    def test_credentials_stored(self):
        """CredentialsStored logs at info level."""
        self.obj.CredentialsStored(APP_NAME)
        class_name = self.obj.__class__.__name__
        signal_name = self.obj.CredentialsStored.__name__
        self.assertTrue(
            self.memento.check_info(class_name, signal_name, APP_NAME))

    def test_credentials_error(self):
        """CredentialsError logs the error dict at error level."""
        error = {'error_message': 'failed!', 'detailed error': 'yadda yadda'}
        self.obj.CredentialsError(APP_NAME, error)
        class_name = self.obj.__class__.__name__
        signal_name = self.obj.CredentialsError.__name__
        self.assertTrue(
            self.memento.check_error(class_name, signal_name,
                                     APP_NAME, str(error)))

    def test_authorization_denied(self):
        """AuthorizationDenied logs at info level."""
        self.obj.AuthorizationDenied(APP_NAME)
        class_name = self.obj.__class__.__name__
        signal_name = self.obj.AuthorizationDenied.__name__
        self.assertTrue(
            self.memento.check_info(class_name, signal_name, APP_NAME))
907
908
class UbuntuSSOServiceTestCase(BaseTestCase):
    """Test suite for the UbuntuSSOService class."""

    @defer.inlineCallbacks
    def setUp(self):
        """Start a UbuntuSSOService whose proxy class is replaced."""
        yield super(UbuntuSSOServiceTestCase, self).setUp()
        self.proxy.sso_login = object()
        self.proxy.cred_manager = object()
        self.params = []

        def fake_proxy_class(service):
            """Record the creation argument and hand back the test proxy."""
            self.params.append(service)
            return self.proxy

        self.patch(main, 'UbuntuSSOProxy', fake_proxy_class)
        self.obj = UbuntuSSOService()
        yield self.obj.start()

    def test_creation(self):
        """The creation parameter is properly used."""
        self.assertEqual(self.params, [self.obj])

    def test_sso_login(self):
        """sso_login is a SSOLogin wrapping the proxy's sso_login."""
        sso_login = self.obj.sso_login
        self.assertIsInstance(sso_login, SSOLogin)
        self.assertTrue(sso_login.proxy is self.proxy.sso_login)

    def test_cred_manager(self):
        """cred_manager wraps the proxy's cred_manager."""
        cred_manager = self.obj.cred_manager
        self.assertIsInstance(cred_manager, CredentialsManagement)
        self.assertTrue(cred_manager.proxy is self.proxy.cred_manager)
180936
=== removed file 'ubuntu_sso/main/tests/test_linux.py'
--- ubuntu_sso/main/tests/test_linux.py 2011-12-19 19:35:06 +0000
+++ ubuntu_sso/main/tests/test_linux.py 1970-01-01 00:00:00 +0000
@@ -1,1275 +0,0 @@
1# -*- coding: utf-8 -*-
2#
3# test_main - tests for ubuntu_sso.main
4#
5# Author: Natalia Bidart <natalia.bidart@canonical.com>
6# Author: Alejandro J. Cura <alecu@canonical.com>
7#
8# Copyright 2009-2010 Canonical Ltd.
9#
10# This program is free software: you can redistribute it and/or modify it
11# under the terms of the GNU General Public License version 3, as published
12# by the Free Software Foundation.
13#
14# This program is distributed in the hope that it will be useful, but
15# WITHOUT ANY WARRANTY; without even the implied warranties of
16# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
17# PURPOSE. See the GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License along
20# with this program. If not, see <http://www.gnu.org/licenses/>.
21"""Tests for the main SSO client code."""
22
23import logging
24
25from mocker import Mocker, ARGS, KWARGS
26from twisted.internet import defer
27from ubuntuone.devtools.handlers import MementoHandler
28
29import ubuntu_sso.keyring
30import ubuntu_sso.main
31import ubuntu_sso.main.linux
32
33from ubuntu_sso import DBUS_CREDENTIALS_IFACE
34from ubuntu_sso.main import (
35 except_to_errdict,
36 CredentialsManagement,
37 SSOLogin,
38 TIMEOUT_INTERVAL,
39)
40from ubuntu_sso.main.linux import blocking
41from ubuntu_sso.main import (HELP_TEXT_KEY, PING_URL_KEY,
42 TC_URL_KEY, UI_CLASS_KEY, UI_MODULE_KEY, WINDOW_ID_KEY,
43 SUCCESS_CB_KEY, ERROR_CB_KEY, DENIAL_CB_KEY)
44from ubuntu_sso.tests import (APP_NAME, TC_URL, HELP_TEXT, CAPTCHA_ID,
45 CAPTCHA_SOLUTION, EMAIL, EMAIL_TOKEN, NAME, PASSWORD, PING_URL, TOKEN,
46 TOKEN_NAME, WINDOW_ID, TestCase)
47
48
49# Access to a protected member 'yyy' of a client class
50# pylint: disable=W0212
51
52
class BlockingSampleException(Exception):
    """Sample error used by the fake blocking helpers and failing fakes."""
55
56
def fake_ok_blocking(f, app, cb, eb):
    """Run 'f' synchronously and report its result through 'cb'.

    The 'eb' error callback is accepted but never used.

    """
    result = f()
    cb(app, result)
60
61
def fake_err_blocking(f, app, cb, eb):
    """Run 'f' synchronously and always report an error through 'eb'.

    If 'f' raises, that exception is reported; if it returns normally, a
    BlockingSampleException is reported instead. 'cb' is never used.

    """
    try:
        f()
    except Exception as error:  # pylint: disable=W0703
        eb(app, except_to_errdict(error))
    else:
        eb(app, except_to_errdict(BlockingSampleException()))
70
71
72class SsoDbusTestCase(TestCase):
73 """Test the SSOLogin DBus interface."""
74
75 timeout = 2
76
77 @defer.inlineCallbacks
78 def setUp(self):
79 """Create the mocking bus."""
80 yield super(SsoDbusTestCase, self).setUp()
81 self.mocker = Mocker()
82 self.mockbusname = self.mocker.mock()
83 mockbus = self.mocker.mock()
84 self.mockbusname.get_bus()
85 self.mocker.result(mockbus)
86 mockbus._register_object_path(ARGS)
87 self.mockprocessorclass = None
88
89 def ksc(keyring, k, val):
90 """Assert over token and app_name."""
91 self.assertEqual(k, APP_NAME)
92 self.assertEqual(val, TOKEN)
93 self.keyring_was_set = True
94 self.keyring_values = k, val
95 return defer.succeed(None)
96
97 self.patch(ubuntu_sso.main.Keyring, "set_credentials", ksc)
98 self.keyring_was_set = False
99 self.keyring_values = None
100
101 @defer.inlineCallbacks
102 def tearDown(self):
103 """Verify the mocking bus and shut it down."""
104 self.mocker.verify()
105 self.mocker.restore()
106 yield super(SsoDbusTestCase, self).tearDown()
107
108 def test_creation(self):
109 """Test that the object creation is successful."""
110 self.mocker.replay()
111 SSOLogin(self.mockbusname)
112
113 def create_mock_processor(self):
114 """Create a mock processor from a dummy processor class."""
115 self.mockprocessorclass = self.mocker.mock()
116 mockprocessor = self.mocker.mock()
117 self.mockprocessorclass(ARGS, KWARGS)
118 self.mocker.result(mockprocessor)
119 return mockprocessor
120
121 def test_generate_captcha(self):
122 """Test that the captcha method works ok."""
123 d = defer.Deferred()
124 filename = "sample filename"
125 expected_result = "expected result"
126 self.create_mock_processor().generate_captcha(filename)
127 self.mocker.result(expected_result)
128 self.patch(ubuntu_sso.main.linux, "blocking", fake_ok_blocking)
129 self.mocker.replay()
130
131 def verify(app_name, result):
132 """The actual test."""
133 self.assertEqual(result, expected_result)
134 self.assertEqual(app_name, APP_NAME)
135 d.callback(result)
136
137 client = SSOLogin(self.mockbusname,
138 sso_login_processor_class=self.mockprocessorclass)
139 self.patch(client, "CaptchaGenerated", verify)
140 self.patch(client, "CaptchaGenerationError", d.errback)
141 client.generate_captcha(APP_NAME, filename)
142 return d
143
144 def test_generate_captcha_error(self):
145 """Test that the captcha method fails as expected."""
146 d = defer.Deferred()
147 filename = "sample filename"
148 expected_result = "expected result"
149 self.create_mock_processor().generate_captcha(filename)
150 self.mocker.result(expected_result)
151 self.patch(ubuntu_sso.main, "thread_execute", fake_err_blocking)
152 self.mocker.replay()
153
154 def verify(app_name, errdict):
155 """The actual test."""
156 self.assertEqual(errdict["errtype"], "BlockingSampleException")
157 self.assertEqual(app_name, APP_NAME)
158 d.callback("Ok")
159
160 client = SSOLogin(self.mockbusname,
161 sso_login_processor_class=self.mockprocessorclass)
162 self.patch(client, "CaptchaGenerated", d.errback)
163 self.patch(client, "CaptchaGenerationError", verify)
164 client.generate_captcha(APP_NAME, filename)
165 return d
166
167 def test_register_user(self):
168 """Test that the register_user method works ok."""
169 d = defer.Deferred()
170 expected_result = "expected result"
171 self.create_mock_processor().register_user(EMAIL, PASSWORD, NAME,
172 CAPTCHA_ID, CAPTCHA_SOLUTION)
173 self.mocker.result(expected_result)
174 self.patch(ubuntu_sso.main.linux, "blocking", fake_ok_blocking)
175 self.mocker.replay()
176
177 def verify(app_name, result):
178 """The actual test."""
179 self.assertEqual(result, expected_result)
180 self.assertEqual(app_name, APP_NAME)
181 d.callback(result)
182
183 client = SSOLogin(self.mockbusname,
184 sso_login_processor_class=self.mockprocessorclass)
185 self.patch(client, "UserRegistered", verify)
186 self.patch(client, "UserRegistrationError", d.errback)
187 client.register_user(APP_NAME, EMAIL, PASSWORD, NAME, CAPTCHA_ID,
188 CAPTCHA_SOLUTION)
189 return d
190
191 def test_register_user_error(self):
192 """Test that the register_user method fails as expected."""
193 d = defer.Deferred()
194 expected_result = "expected result"
195 self.create_mock_processor().register_user(EMAIL, PASSWORD, NAME,
196 CAPTCHA_ID, CAPTCHA_SOLUTION)
197 self.mocker.result(expected_result)
198 self.patch(ubuntu_sso.main, "thread_execute", fake_err_blocking)
199 self.mocker.replay()
200
201 def verify(app_name, errdict):
202 """The actual test."""
203 self.assertEqual(errdict["errtype"], "BlockingSampleException")
204 self.assertEqual(app_name, APP_NAME)
205 d.callback("Ok")
206
207 client = SSOLogin(self.mockbusname,
208 sso_login_processor_class=self.mockprocessorclass)
209 self.patch(client, "UserRegistered", d.errback)
210 self.patch(client, "UserRegistrationError", verify)
211 client.register_user(APP_NAME, EMAIL, PASSWORD, NAME, CAPTCHA_ID,
212 CAPTCHA_SOLUTION)
213 return d
214
215 def test_login(self):
216 """Test that the login method works ok."""
217 d = defer.Deferred()
218 processor = self.create_mock_processor()
219 processor.login(EMAIL, PASSWORD, TOKEN_NAME)
220 self.mocker.result(TOKEN)
221 processor.is_validated(TOKEN)
222 self.mocker.result(True)
223 self.patch(ubuntu_sso.main.linux, "blocking", fake_ok_blocking)
224 self.mocker.replay()
225
226 def verify(app_name, result):
227 """The actual test."""
228 self.assertEqual(result, EMAIL)
229 self.assertEqual(app_name, APP_NAME)
230 self.assertTrue(self.keyring_was_set, "The keyring should be set")
231 d.callback(result)
232
233 client = SSOLogin(self.mockbusname,
234 sso_login_processor_class=self.mockprocessorclass)
235 self.patch(client, "LoggedIn", verify)
236 self.patch(client, "LoginError", d.errback)
237 self.patch(client, "UserNotValidated", d.errback)
238 client.login(APP_NAME, EMAIL, PASSWORD)
239 return d
240
241 def test_login_user_not_validated(self):
242 """Test that the login sends EmailNotValidated signal."""
243 d = defer.Deferred()
244 processor = self.create_mock_processor()
245 processor.login(EMAIL, PASSWORD, TOKEN_NAME)
246 self.mocker.result(TOKEN)
247 processor.is_validated(TOKEN)
248 self.mocker.result(False)
249 self.patch(ubuntu_sso.main.linux, "blocking", fake_ok_blocking)
250 self.mocker.replay()
251
252 def verify(app_name, email):
253 """The actual test."""
254 self.assertEqual(app_name, APP_NAME)
255 self.assertEqual(email, EMAIL)
256 self.assertFalse(self.keyring_was_set, "Keyring should not be set")
257 d.callback("Ok")
258
259 client = SSOLogin(self.mockbusname,
260 sso_login_processor_class=self.mockprocessorclass)
261 self.patch(client, "LoggedIn", d.errback)
262 self.patch(client, "LoginError", d.errback)
263 self.patch(client, "UserNotValidated", verify)
264 client.login(APP_NAME, EMAIL, PASSWORD)
265 return d
266
def test_login_error_get_token_name(self):
    """A failing get_token_name is reported through LoginError."""
    finished = defer.Deferred()
    self.create_mock_processor()
    self.patch(ubuntu_sso.main.linux, "blocking", fake_err_blocking)

    def failing_get_token_name(*args):
        """Stand in for get_token_name and always raise."""
        raise BlockingSampleException()

    self.patch(ubuntu_sso.main, "get_token_name", failing_get_token_name)
    self.mocker.replay()

    def check_error(app_name, errdict):
        """Check the error payload and that the keyring was not set."""
        self.assertEqual(app_name, APP_NAME)
        self.assertEqual(errdict["errtype"], "BlockingSampleException")
        self.assertFalse(self.keyring_was_set, "Keyring should not be set")
        finished.callback("Ok")

    sso_login = SSOLogin(
        self.mockbusname,
        sso_login_processor_class=self.mockprocessorclass)
    self.patch(sso_login, "LoginError", check_error)
    # Any signal other than LoginError makes the test fail.
    for unexpected in ("LoggedIn", "UserNotValidated"):
        self.patch(sso_login, unexpected, finished.errback)
    sso_login.login(APP_NAME, EMAIL, PASSWORD)
    return finished
294
def test_login_error_set_credentials(self):
    """A failing Keyring.set_credentials is reported through LoginError."""
    finished = defer.Deferred()
    mock_processor = self.create_mock_processor()
    # Expected processor conversation: login succeeds and the token is
    # reported as validated, so the code under test goes on to store it.
    mock_processor.login(EMAIL, PASSWORD, TOKEN_NAME)
    self.mocker.result(TOKEN)
    mock_processor.is_validated(TOKEN)
    self.mocker.result(True)

    self.patch(ubuntu_sso.main.linux, "blocking", fake_ok_blocking)

    def failing_set_credentials(*args):
        """Stand in for Keyring.set_credentials with a failed deferred."""
        return defer.fail(BlockingSampleException())

    self.patch(ubuntu_sso.main.Keyring, "set_credentials",
               failing_set_credentials)
    self.mocker.replay()

    def check_error(app_name, errdict):
        """Check the error payload and that the keyring was not set."""
        self.assertEqual(app_name, APP_NAME)
        self.assertEqual(errdict["errtype"], "BlockingSampleException")
        self.assertFalse(self.keyring_was_set, "Keyring should not be set")
        finished.callback("Ok")

    def unexpected_signal(app, res):
        """Fail the test with whatever the unexpected signal carried."""
        finished.errback((app, res))

    sso_login = SSOLogin(
        self.mockbusname,
        sso_login_processor_class=self.mockprocessorclass)
    self.patch(sso_login, "LoginError", check_error)
    for unexpected in ("LoggedIn", "UserNotValidated"):
        self.patch(sso_login, unexpected, unexpected_signal)
    sso_login.login(APP_NAME, EMAIL, PASSWORD)
    return finished
328
def test_validate_email(self):
    """A successful validate_email emits EmailValidated."""
    finished = defer.Deferred()
    mock_processor = self.create_mock_processor()
    mock_processor.validate_email(EMAIL, PASSWORD, EMAIL_TOKEN, TOKEN_NAME)
    self.mocker.result(TOKEN)
    self.patch(ubuntu_sso.main.linux, "blocking", fake_ok_blocking)
    self.mocker.replay()

    def check_signal(app_name, result):
        """Check the signal payload and that the keyring was set."""
        self.assertEqual(result, EMAIL)
        self.assertEqual(app_name, APP_NAME)
        self.assertTrue(self.keyring_was_set, "The keyring should be set")
        finished.callback(result)

    sso_login = SSOLogin(
        self.mockbusname,
        sso_login_processor_class=self.mockprocessorclass)
    self.patch(sso_login, "EmailValidationError", finished.errback)
    self.patch(sso_login, "EmailValidated", check_signal)
    sso_login.validate_email(APP_NAME, EMAIL, PASSWORD, EMAIL_TOKEN)
    return finished
351
def test_validate_email_error(self):
    """A failing get_token_name is reported via EmailValidationError."""
    finished = defer.Deferred()
    self.create_mock_processor()
    self.patch(ubuntu_sso.main.linux, "blocking", fake_err_blocking)

    def failing_get_token_name(*args):
        """Stand in for get_token_name and always raise."""
        raise BlockingSampleException()

    self.patch(ubuntu_sso.main, "get_token_name", failing_get_token_name)
    self.mocker.replay()

    def check_error(app_name, errdict):
        """Check the error payload and that the keyring was not set."""
        self.assertEqual(app_name, APP_NAME)
        self.assertEqual(errdict["errtype"], "BlockingSampleException")
        self.assertFalse(self.keyring_was_set, "Keyring should not be set")
        finished.callback("Ok")

    sso_login = SSOLogin(
        self.mockbusname,
        sso_login_processor_class=self.mockprocessorclass)
    self.patch(sso_login, "EmailValidationError", check_error)
    self.patch(sso_login, "EmailValidated", finished.errback)
    sso_login.validate_email(APP_NAME, EMAIL, PASSWORD, EMAIL_TOKEN)
    return finished
378
def test_request_password_reset_token(self):
    """A successful request emits PasswordResetTokenSent."""
    finished = defer.Deferred()
    mock_processor = self.create_mock_processor()
    mock_processor.request_password_reset_token(EMAIL)
    self.mocker.result(EMAIL)
    self.patch(ubuntu_sso.main, "thread_execute", fake_ok_blocking)
    self.mocker.replay()

    def check_signal(app_name, result):
        """Check the payload carried by the success signal."""
        self.assertEqual(result, EMAIL)
        self.assertEqual(app_name, APP_NAME)
        finished.callback(result)

    sso_login = SSOLogin(
        self.mockbusname,
        sso_login_processor_class=self.mockprocessorclass)
    self.patch(sso_login, "PasswordResetError", finished.errback)
    self.patch(sso_login, "PasswordResetTokenSent", check_signal)
    sso_login.request_password_reset_token(APP_NAME, EMAIL)
    return finished
400
def test_request_password_reset_token_error(self):
    """A failing request emits PasswordResetError."""
    finished = defer.Deferred()

    mock_processor = self.create_mock_processor()
    mock_processor.request_password_reset_token(EMAIL)
    self.mocker.result(EMAIL)
    self.patch(ubuntu_sso.main, "thread_execute", fake_err_blocking)
    self.mocker.replay()

    def check_error(app_name, errdict):
        """Check the payload carried by the error signal."""
        self.assertEqual(errdict["errtype"], "BlockingSampleException")
        self.assertEqual(app_name, APP_NAME)
        finished.callback("Ok")

    sso_login = SSOLogin(
        self.mockbusname,
        sso_login_processor_class=self.mockprocessorclass)
    self.patch(sso_login, "PasswordResetError", check_error)
    self.patch(sso_login, "PasswordResetTokenSent", finished.errback)
    sso_login.request_password_reset_token(APP_NAME, EMAIL)
    return finished
422
def test_set_new_password(self):
    """A successful set_new_password emits PasswordChanged."""
    finished = defer.Deferred()
    mock_processor = self.create_mock_processor()
    mock_processor.set_new_password(EMAIL, EMAIL_TOKEN, PASSWORD)
    self.mocker.result(EMAIL)
    self.patch(ubuntu_sso.main.linux, "blocking", fake_ok_blocking)
    self.mocker.replay()

    def check_signal(app_name, result):
        """Check the payload carried by the success signal."""
        self.assertEqual(result, EMAIL)
        self.assertEqual(app_name, APP_NAME)
        finished.callback(result)

    sso_login = SSOLogin(
        self.mockbusname,
        sso_login_processor_class=self.mockprocessorclass)
    self.patch(sso_login, "PasswordChangeError", finished.errback)
    self.patch(sso_login, "PasswordChanged", check_signal)
    sso_login.set_new_password(APP_NAME, EMAIL, EMAIL_TOKEN, PASSWORD)
    return finished
444
def test_set_new_password_error(self):
    """A failing set_new_password emits PasswordChangeError."""
    finished = defer.Deferred()
    expected_result = "expected result"

    mock_processor = self.create_mock_processor()
    mock_processor.set_new_password(EMAIL, EMAIL_TOKEN, PASSWORD)
    self.mocker.result(expected_result)
    self.patch(ubuntu_sso.main, "thread_execute", fake_err_blocking)
    self.mocker.replay()

    def check_error(app_name, errdict):
        """Check the payload carried by the error signal."""
        self.assertEqual(errdict["errtype"], "BlockingSampleException")
        self.assertEqual(app_name, APP_NAME)
        finished.callback("Ok")

    sso_login = SSOLogin(
        self.mockbusname,
        sso_login_processor_class=self.mockprocessorclass)
    self.patch(sso_login, "PasswordChangeError", check_error)
    self.patch(sso_login, "PasswordChanged", finished.errback)
    sso_login.set_new_password(APP_NAME, EMAIL, EMAIL_TOKEN, PASSWORD)
    return finished
468
469
class BlockingFunctionTestCase(TestCase):
    """Tests for the "blocking" function."""

    timeout = 5

    def test_blocking(self):
        """The result callback receives the app name and return value."""
        finished = defer.Deferred()
        expected_result = "expected result"

        def succeed():
            """Return the expected value without failing."""
            return expected_result

        def check_result(app_name, result):
            """Check what was handed to the result callback."""
            self.assertEqual(result, expected_result)
            self.assertEqual(app_name, APP_NAME)
            finished.callback(result)

        blocking(succeed, APP_NAME, check_result, finished.errback)
        return finished

    def test_blocking_error(self):
        """The error callback receives the app name and an error dict."""
        finished = defer.Deferred()
        expected_error_message = "expected error message"

        def explode():
            """Raise the sample exception with the expected message."""
            raise BlockingSampleException(expected_error_message)

        def check_error(app_name, errdict):
            """Check what was handed to the error callback."""
            self.assertEqual(app_name, APP_NAME)
            self.assertEqual(errdict["errtype"], "BlockingSampleException")
            self.assertEqual(errdict["message"], expected_error_message)
            finished.callback("Ok")

        blocking(explode, APP_NAME, finished.errback, check_error)
        return finished
511
512
class TestExceptToErrdictException(Exception):
    """A dummy exception for the following testcase."""
    # NOTE: the docstring above is read at runtime through __doc__ by
    # ExceptToErrdictTestCase.test_no_args_at_all, so it is not purely
    # descriptive text.
515
516
class ExceptToErrdictTestCase(TestCase):
    """Tests for the except_to_errdict function."""

    def test_first_arg_is_dict(self):
        """A dict first argument has its items copied into the result."""
        base = {
            "errorcode1": "error message 1",
            "errorcode2": "error message 2",
            "errorcode3": "error message 3",
        }
        error = TestExceptToErrdictException(base)
        errdict = except_to_errdict(error)

        self.assertEqual(errdict["errtype"], error.__class__.__name__)
        for key, value in base.items():
            self.assertIn(key, errdict)
            self.assertEqual(errdict[key], value)

    def test_first_arg_is_str(self):
        """A str first argument becomes the message."""
        text = "a sample string"
        error = TestExceptToErrdictException(text)
        errdict = except_to_errdict(error)
        self.assertEqual(errdict["errtype"], error.__class__.__name__)
        self.assertEqual(errdict["message"], text)

    def test_first_arg_is_unicode(self):
        """A unicode first argument becomes the message."""
        text = u"a sample string"
        error = TestExceptToErrdictException(text)
        errdict = except_to_errdict(error)
        self.assertEqual(errdict["errtype"], error.__class__.__name__)
        self.assertEqual(errdict["message"], text)

    def test_no_args_at_all(self):
        """Without arguments the message is the exception class docstring."""
        error = TestExceptToErrdictException()
        errdict = except_to_errdict(error)
        self.assertEqual(errdict["errtype"], error.__class__.__name__)
        self.assertEqual(errdict["message"], error.__class__.__doc__)

    def test_some_other_thing_as_first_arg(self):
        """A first argument that is neither string nor dict is accepted."""
        mixed_args = (None, u"unicode2\ufffd", "errorcode3")
        error = TestExceptToErrdictException(*mixed_args)
        errdict = except_to_errdict(error)
        # Only the error type is checked here; the message is not asserted.
        self.assertEqual(errdict["errtype"], error.__class__.__name__)
564
565
class CredentialsManagementTestCase(TestCase):
    """Tests for the CredentialsManagement DBus interface."""

    timeout = 2
    # Arguments used by subclasses to build both the string-only dict sent
    # over the interface and the keyword arguments expected by the backend.
    base_args = {
        HELP_TEXT_KEY: HELP_TEXT,
        PING_URL_KEY: PING_URL,
        TC_URL_KEY: TC_URL,
        WINDOW_ID_KEY: WINDOW_ID,
        UI_CLASS_KEY: 'SuperUI',
        UI_MODULE_KEY: 'foo.bar.baz',
    }

    @defer.inlineCallbacks
    def setUp(self):
        """Create the mocker, the client under test and a log recorder."""
        yield super(CredentialsManagementTestCase, self).setUp()

        self.mocker = Mocker()
        # Timeout and shutdown hooks are no-ops so that no real timer or
        # shutdown is triggered by the tests.
        self.client = CredentialsManagement(timeout_func=lambda *a: None,
                                            shutdown_func=lambda *a: None)
        self.args = {}
        self.cred_args = {}

        # Record everything logged by ubuntu_sso.main for later inspection.
        self.memento = MementoHandler()
        self.memento.setLevel(logging.DEBUG)
        ubuntu_sso.main.logger.addHandler(self.memento)
        self.addCleanup(ubuntu_sso.main.logger.removeHandler, self.memento)

    @defer.inlineCallbacks
    def tearDown(self):
        """Verify the mocking stuff and shut it down.

        The mocker is restored and the parent tearDown is run even when
        verification fails, so a failed expectation in one test does not
        leave replaced objects in place for the tests that follow. The
        verification error, if any, still propagates afterwards.

        """
        try:
            self.mocker.verify()
        finally:
            self.mocker.restore()
            yield super(CredentialsManagementTestCase, self).tearDown()

    def assert_dbus_method_correct(self, method, out_signature=''):
        """Check that 'method' is a dbus method with proper signatures."""
        self.assertTrue(method._dbus_is_method)
        self.assertEqual(method._dbus_interface, DBUS_CREDENTIALS_IFACE)
        self.assertEqual(method._dbus_in_signature, 'sa{ss}')
        self.assertEqual(method._dbus_out_signature, out_signature)

    def create_mock_backend(self):
        """Create a mock backend.

        Records the expectation that Credentials is instantiated with
        APP_NAME and the current self.cred_args, and returns the mock object
        that such an instantiation will produce.

        """
        mock_class = self.mocker.replace("ubuntu_sso.credentials.Credentials")
        mock_class(APP_NAME, **self.cred_args)
        creds_obj = self.mocker.mock()
        self.mocker.result(creds_obj)

        return creds_obj

    def test_is_dbus_object(self):
        """CredentialsManagement is a Dbus object."""
        self.assertIsInstance(self.client,
                              ubuntu_sso.main.linux.dbus.service.Object)
617
618
class FakeCredentials(object):
    """Minimal stand-in for Credentials that never does real work."""

    def __init__(self, *a, **kw):
        """Accept and ignore any arguments; install the canned operations."""

        def succeed_with_none(*args):
            """Return an already-fired deferred carrying None."""
            return defer.succeed(None)

        def do_nothing(*args):
            """Return None without side effects."""
            return None

        self.clear_credentials = succeed_with_none
        self.store_credentials = succeed_with_none
        self.login = do_nothing
        self.register = do_nothing

    def find_credentials(self, *a, **kw):
        """Return an already-fired deferred carrying TOKEN."""
        return defer.succeed(TOKEN)
630
631
class CredentialsManagementRefCountingTestCase(CredentialsManagementTestCase):
    """Tests for the CredentialsManagement ref counting."""

    @defer.inlineCallbacks
    def setUp(self):
        """Replace the Credentials backend with FakeCredentials."""
        yield super(CredentialsManagementRefCountingTestCase, self).setUp()
        self.patch(ubuntu_sso.main, 'Credentials', FakeCredentials)

    def _pending_request_checker(self, deferred):
        """Build a handler asserting ref_count is 1, then firing 'deferred'."""

        def check(*args):
            """Make the check."""
            self.assertEqual(self.client.root.ref_count, 1)
            deferred.callback(True)

        return check

    def _assert_signal_decreases_ref_count(self, signal, *signal_args):
        """Emit 'signal' with ref_count at 3 and expect it to drop to 2."""
        self.client.root.ref_count = 3
        signal(*signal_args)

        self.assertEqual(self.client.root.ref_count, 2)

    def _assert_negative_ref_count_is_reset(self, signal, *signal_args):
        """Emit 'signal' with _ref_count at -3; expect 0 and a warning."""
        self.client.root._ref_count = -3
        signal(*signal_args)

        self.assertEqual(self.client.root.ref_count, 0)
        msg = 'Attempting to decrease ref_count to a negative value (-4).'
        self.assertTrue(self.memento.check_warning(msg))

    def test_ref_counting(self):
        """The ref count starts at zero."""
        self.assertEqual(self.client.root.ref_count, 0)

    def test_find_credentials(self):
        """The ref count is 1 when CredentialsFound is emitted."""
        finished = defer.Deferred()
        self.patch(self.client, 'CredentialsFound',
                   self._pending_request_checker(finished))
        self.client.find_credentials(APP_NAME, self.args)

        return finished

    @defer.inlineCallbacks
    def test_find_credentials_sync(self):
        """The ref count is 1 in the reply handler and 0 afterwards."""
        finished = defer.Deferred()
        on_reply = self._pending_request_checker(finished)

        self.client.find_credentials_sync(APP_NAME, self.args,
                                          reply_handler=on_reply,
                                          error_handler=finished.errback)
        yield finished
        self.assertEqual(self.client.root.ref_count, 0)

    @defer.inlineCallbacks
    def test_find_credentials_sync_error(self):
        """The ref count is 1 in the error handler and 0 afterwards."""
        finished = defer.Deferred()
        on_error = self._pending_request_checker(finished)

        self.patch(ubuntu_sso.main.Credentials, 'find_credentials',
                   lambda *a: defer.fail('foo'))
        self.client.find_credentials_sync(APP_NAME, self.args,
                                          reply_handler=finished.errback,
                                          error_handler=on_error)

        yield finished
        self.assertEqual(self.client.root.ref_count, 0)

    def test_clear_credentials(self):
        """The ref count is 1 when CredentialsCleared is emitted."""
        finished = defer.Deferred()
        self.patch(self.client, 'CredentialsCleared',
                   self._pending_request_checker(finished))
        self.client.clear_credentials(APP_NAME, self.args)

        return finished

    def test_store_credentials(self):
        """The ref count is 1 when CredentialsStored is emitted."""
        finished = defer.Deferred()
        self.patch(self.client, 'CredentialsStored',
                   self._pending_request_checker(finished))
        self.client.store_credentials(APP_NAME, self.args)

        return finished

    def test_register(self):
        """A register request leaves the ref count at 1."""
        self.client.register(APP_NAME, self.args)

        self.assertEqual(self.client.root.ref_count, 1)

    def test_login(self):
        """A login request leaves the ref count at 1."""
        self.client.login(APP_NAME, self.args)

        self.assertEqual(self.client.root.ref_count, 1)

    def test_several_requests(self):
        """Requests can be nested."""
        requests = (
            self.client.login,
            self.client.register,
            self.client.login,
            self.client.register,
            self.client.register,
        )
        for request in requests:
            request(APP_NAME, self.args)

        self.assertEqual(self.client.root.ref_count, 5)

    def test_credentials_found(self):
        """CredentialsFound decreases the ref count by one."""
        self._assert_signal_decreases_ref_count(
            self.client.CredentialsFound, APP_NAME, TOKEN)

    def test_credentials_not_found(self):
        """CredentialsNotFound decreases the ref count by one."""
        self._assert_signal_decreases_ref_count(
            self.client.CredentialsNotFound, APP_NAME)

    def test_credentials_cleared(self):
        """CredentialsCleared decreases the ref count by one."""
        self._assert_signal_decreases_ref_count(
            self.client.CredentialsCleared, APP_NAME)

    def test_credentials_stored(self):
        """CredentialsStored decreases the ref count by one."""
        self._assert_signal_decreases_ref_count(
            self.client.CredentialsStored, APP_NAME)

    def test_credentials_error(self):
        """CredentialsError decreases the ref count by one."""
        self._assert_signal_decreases_ref_count(
            self.client.CredentialsError, APP_NAME, {'error_type': 'test'})

    def test_authorization_denied(self):
        """AuthorizationDenied decreases the ref count by one."""
        self._assert_signal_decreases_ref_count(
            self.client.AuthorizationDenied, APP_NAME)

    def test_credentials_found_when_ref_count_is_not_positive(self):
        """CredentialsFound on a negative count resets it and warns."""
        self._assert_negative_ref_count_is_reset(
            self.client.CredentialsFound, APP_NAME, TOKEN)

    def test_credentials_not_found_when_ref_count_is_not_positive(self):
        """CredentialsNotFound on a negative count resets it and warns."""
        self._assert_negative_ref_count_is_reset(
            self.client.CredentialsNotFound, APP_NAME)

    def test_credentials_cleared_when_ref_count_is_not_positive(self):
        """CredentialsCleared on a negative count resets it and warns."""
        self._assert_negative_ref_count_is_reset(
            self.client.CredentialsCleared, APP_NAME)

    def test_credentials_stored_when_ref_count_is_not_positive(self):
        """CredentialsStored on a negative count resets it and warns."""
        self._assert_negative_ref_count_is_reset(
            self.client.CredentialsStored, APP_NAME)

    def test_credentials_error_when_ref_count_is_not_positive(self):
        """CredentialsError on a negative count resets it and warns."""
        self._assert_negative_ref_count_is_reset(
            self.client.CredentialsError, APP_NAME, {'error_type': 'test'})

    def test_autorization_denied_when_ref_count_is_not_positive(self):
        """AuthorizationDenied on a negative count resets it and warns."""
        self._assert_negative_ref_count_is_reset(
            self.client.AuthorizationDenied, APP_NAME)

    def test_on_zero_ref_count_shutdown(self):
        """When ref count reaches 0, queue shutdown op."""
        self.patch(self.client.root, 'timeout_func', self._set_called)
        self.client.login(APP_NAME, self.args)
        self.client.CredentialsFound(APP_NAME, TOKEN)

        expected_call = ((TIMEOUT_INTERVAL, self.client.root.shutdown), {})
        self.assertEqual(self._called, expected_call)

    def test_on_non_zero_ref_count_do_not_shutdown(self):
        """If ref count is not 0, do not queue shutdown op."""
        self.patch(self.client.root, 'timeout_func', self._set_called)
        self.client.login(APP_NAME, self.args)

        self.assertEqual(self._called, False)

    def test_on_non_zero_ref_count_after_zero_do_not_shutdown(self):
        """If the shutdown was queued, do not quit if counter is not zero."""

        def timeout_with_new_request(interval, func):
            """Start a new request, then run the queued function at once."""
            self.client.register(APP_NAME, self.args)
            assert self.client.root.ref_count > 0
            func()

        root = self.client.root
        self.patch(root, 'timeout_func', timeout_with_new_request)
        self.patch(root, 'shutdown_func', self._set_called)

        self.client.login(APP_NAME, self.args)
        # This brings the counter back to 0, which calls timeout_func.
        self.client.CredentialsFound(APP_NAME, TOKEN)

        self.assertEqual(self._called, False, 'shutdown_func was not called')

    def test_zero_ref_count_after_zero_do_shutdown(self):
        """If the shutdown was queued, do quit if counter is zero."""

        def immediate_timeout(interval, func):
            """Run the queued function at once, with no request pending."""
            assert self.client.root.ref_count == 0
            func()

        root = self.client.root
        self.patch(root, 'timeout_func', immediate_timeout)
        self.patch(root, 'shutdown_func', self._set_called)

        self.client.login(APP_NAME, self.args)
        # This brings the counter back to 0, which calls timeout_func.
        self.client.CredentialsFound(APP_NAME, TOKEN)

        self.assertEqual(self._called, ((), {}), 'shutdown_func was called')
889
890
class CredentialsManagementFindTestCase(CredentialsManagementTestCase):
    """Tests for the CredentialsManagement find method."""

    def _expect_find_credentials(self, outcome):
        """Record one backend find_credentials call returning 'outcome'."""
        self.create_mock_backend().find_credentials()
        self.mocker.result(outcome)
        self.mocker.replay()

    def test_find_credentials(self):
        """find_credentials asks the backend and is a proper dbus method."""
        self._expect_find_credentials(defer.succeed(None))

        self.client.find_credentials(APP_NAME, self.args)
        self.assert_dbus_method_correct(self.client.find_credentials)

    def test_find_credentials_does_not_block_when_found(self):
        """Calling find_credentials does not block but return thru signals.

        If the creds are found, CredentialsFound is emitted.

        """
        finished = defer.Deferred()

        def check_signal(app_name, creds):
            """The actual test."""
            try:
                self.assertEqual(app_name, APP_NAME)
                self.assertEqual(creds, TOKEN)
            except Exception as e:  # pylint: disable=W0703
                finished.errback(e)
                return
            finished.callback(creds)

        self.patch(self.client, 'CredentialsFound', check_signal)
        self.patch(self.client, 'CredentialsNotFound', finished.errback)

        self._expect_find_credentials(defer.succeed(TOKEN))

        self.client.find_credentials(APP_NAME, self.args)
        return finished

    def test_find_credentials_does_not_block_when_not_found(self):
        """Calling find_credentials does not block but return thru signals.

        If the creds are not found, CredentialsNotFound is emitted.

        """
        finished = defer.Deferred()

        def check_signal(app_name):
            """The actual test."""
            try:
                self.assertEqual(app_name, APP_NAME)
            except Exception as e:  # pylint: disable=W0703
                finished.errback(e)
                return
            finished.callback(app_name)

        def unexpected_found(app, creds):
            """Fail the test if credentials are reported as found."""
            finished.errback(app)

        self.patch(self.client, 'CredentialsFound', unexpected_found)
        self.patch(self.client, 'CredentialsNotFound', check_signal)

        self._expect_find_credentials(defer.succeed({}))

        self.client.find_credentials(APP_NAME, self.args)
        return finished

    def test_find_credentials_error(self):
        """If find_credentials fails, CredentialsError is sent."""
        finished = defer.Deferred()

        def check_error(app_name, errdict):
            """The actual test."""
            self.assertEqual(errdict["errtype"], "BlockingSampleException")
            self.assertEqual(app_name, APP_NAME)
            finished.callback("Ok")

        def unexpected_found(app, creds):
            """Fail the test if credentials are reported as found."""
            finished.errback(app)

        self.patch(self.client, 'CredentialsFound', unexpected_found)
        self.patch(self.client, 'CredentialsNotFound', finished.errback)
        self.patch(self.client, 'CredentialsError', check_error)

        self._expect_find_credentials(defer.fail(BlockingSampleException()))

        self.client.find_credentials(APP_NAME, self.args)
        return finished

    def test_find_credentials_sync(self):
        """The credentials are handed to the reply handler."""
        finished = defer.Deferred()

        def check_reply(creds):
            """The actual test."""
            try:
                self.assertEqual(creds, TOKEN)
            except Exception as e:  # pylint: disable=W0703
                finished.errback(e)
                return
            finished.callback(creds)

        self._expect_find_credentials(defer.succeed(TOKEN))

        self.client.find_credentials_sync(APP_NAME, self.args,
                                          reply_handler=check_reply,
                                          error_handler=finished.errback)
        self.assert_dbus_method_correct(self.client.find_credentials_sync,
                                        out_signature='a{ss}')
        return finished

    def test_find_credentials_sync_error(self):
        """If find_credentials_sync fails, error_handler is called."""
        finished = defer.Deferred()

        def check_error(error):
            """The actual test."""
            try:
                errdict = error.args[0]
                self.assertEqual(errdict["errtype"],
                                 "BlockingSampleException")
            except Exception as e:  # pylint: disable=W0703
                finished.errback(e)
                return
            finished.callback("Ok")

        self._expect_find_credentials(defer.fail(BlockingSampleException()))

        self.client.find_credentials_sync(APP_NAME, self.args,
                                          reply_handler=finished.errback,
                                          error_handler=check_error)
        return finished
1027
1028
class CredentialsManagementClearTestCase(CredentialsManagementTestCase):
    """Tests for the CredentialsManagement clear method."""

    def _expect_clear_credentials(self, outcome):
        """Record one backend clear_credentials call returning 'outcome'."""
        self.create_mock_backend().clear_credentials()
        self.mocker.result(outcome)
        self.mocker.replay()

    def test_clear_credentials(self):
        """clear_credentials asks the backend and is a proper dbus method."""
        self._expect_clear_credentials(defer.succeed(APP_NAME))

        self.client.clear_credentials(APP_NAME, self.args)
        self.assert_dbus_method_correct(self.client.clear_credentials)

    def test_clear_credentials_does_not_block(self):
        """Calling clear_credentials does not block but return thru signals."""
        finished = defer.Deferred()

        def check_signal(app_name):
            """The actual test."""
            try:
                self.assertEqual(app_name, APP_NAME)
            except Exception as e:  # pylint: disable=W0703
                finished.errback(e)
                return
            finished.callback(app_name)

        def unexpected_error(app, err):
            """Fail the test if an error signal is emitted."""
            finished.errback(app)

        self.patch(self.client, 'CredentialsCleared', check_signal)
        self.patch(self.client, 'CredentialsError', unexpected_error)

        self._expect_clear_credentials(defer.succeed(APP_NAME))

        self.client.clear_credentials(APP_NAME, self.args)
        return finished

    def test_clear_credentials_error(self):
        """If clear_credentials fails, CredentialsError is sent."""
        finished = defer.Deferred()

        def check_error(app_name, errdict):
            """The actual test."""
            self.assertEqual(errdict["errtype"], "BlockingSampleException")
            self.assertEqual(app_name, APP_NAME)
            finished.callback("Ok")

        self.patch(self.client, 'CredentialsCleared', finished.errback)
        self.patch(self.client, 'CredentialsError', check_error)

        self._expect_clear_credentials(defer.fail(BlockingSampleException()))

        self.client.clear_credentials(APP_NAME, self.args)
        return finished
1084
1085
class CredentialsManagementStoreTestCase(CredentialsManagementTestCase):
    """Tests for the CredentialsManagement store method."""

    def _expect_store_credentials(self, outcome):
        """Record one backend store_credentials(TOKEN) returning 'outcome'."""
        self.create_mock_backend().store_credentials(TOKEN)
        self.mocker.result(outcome)
        self.mocker.replay()

    def test_store_credentials(self):
        """store_credentials asks the backend and is a proper dbus method."""
        self._expect_store_credentials(defer.succeed(APP_NAME))

        self.client.store_credentials(APP_NAME, TOKEN)
        self.assert_dbus_method_correct(self.client.store_credentials)

    def test_store_credentials_does_not_block(self):
        """Calling store_credentials does not block but return thru signals.

        If the creds are stored, CredentialsStored is emitted.

        """
        finished = defer.Deferred()

        def check_signal(app_name):
            """The actual test."""
            try:
                self.assertEqual(app_name, APP_NAME)
            except Exception as e:  # pylint: disable=W0703
                finished.errback(e)
                return
            finished.callback(app_name)

        def unexpected_error(app, err):
            """Fail the test if an error signal is emitted."""
            finished.errback(app)

        self.patch(self.client, 'CredentialsStored', check_signal)
        self.patch(self.client, 'CredentialsError', unexpected_error)

        self._expect_store_credentials(defer.succeed(APP_NAME))

        self.client.store_credentials(APP_NAME, TOKEN)
        return finished

    def test_store_credentials_error(self):
        """If store_credentials fails, CredentialsError is sent."""
        finished = defer.Deferred()

        def check_error(app_name, errdict):
            """The actual test."""
            self.assertEqual(errdict["errtype"], "BlockingSampleException")
            self.assertEqual(app_name, APP_NAME)
            finished.callback("Ok")

        self.patch(self.client, 'CredentialsStored', finished.errback)
        self.patch(self.client, 'CredentialsError', check_error)

        self._expect_store_credentials(defer.fail(BlockingSampleException()))

        self.client.store_credentials(APP_NAME, TOKEN)
        return finished
1145
1146
class CredentialsManagementOpsTestCase(CredentialsManagementTestCase):
    """Tests for the CredentialsManagement login/register methods."""

    @defer.inlineCallbacks
    def setUp(self):
        """Build the request args and the expected backend arguments."""
        yield super(CredentialsManagementOpsTestCase, self).setUp()

        # What is sent to the client: every base value turned into a str.
        stringified = {}
        for key, value in self.base_args.iteritems():
            stringified[key] = str(value)
        self.args = stringified

        # What the backend is expected to receive: the base values plus the
        # client signals as callbacks.
        expected = dict(self.base_args)
        expected[SUCCESS_CB_KEY] = self.client.CredentialsFound
        expected[ERROR_CB_KEY] = self.client.CredentialsError
        expected[DENIAL_CB_KEY] = self.client.AuthorizationDenied
        self.cred_args = expected

    def test_register(self):
        """The registration is correct."""
        backend = self.create_mock_backend()
        backend.register()
        self.mocker.replay()

        self.client.register(APP_NAME, self.args)
        self.assert_dbus_method_correct(self.client.register)

    def test_login(self):
        """The login is correct."""
        backend = self.create_mock_backend()
        backend.login()
        self.mocker.replay()

        self.client.login(APP_NAME, self.args)
        self.assert_dbus_method_correct(self.client.login)

    def test_login_email_password(self):
        """The login_email_password is correct."""
        backend = self.create_mock_backend()
        backend.login_email_password(email=EMAIL, password=PASSWORD)
        self.mocker.replay()

        self.args.update({'email': EMAIL, 'password': PASSWORD})
        self.client.login_email_password(APP_NAME, self.args)
        self.assert_dbus_method_correct(self.client.login_email_password)
1185
1186
class CredentialsManagementParamsTestCase(CredentialsManagementOpsTestCase):
    """Tests for the CredentialsManagement extra parameters handling."""

    @defer.inlineCallbacks
    def setUp(self):
        """Add an extra, unrelated entry to the request arguments."""
        yield super(CredentialsManagementParamsTestCase, self).setUp()
        # The inherited tests are run again with this additional key present
        # in self.args (self.cred_args is left as built by the parent).
        self.args.update({'dummy': 'nothing useful'})
1194
1195
class CredentialsManagementSignalsTestCase(TestCase):
    """Tests for the CredentialsManagement DBus signals."""

    @defer.inlineCallbacks
    def setUp(self):
        """Create the client under test and record what gets logged."""
        yield super(CredentialsManagementSignalsTestCase, self).setUp()
        self.client = CredentialsManagement(timeout_func=lambda *a: None,
                                            shutdown_func=lambda *a: None)

        self.memento = MementoHandler()
        self.memento.setLevel(logging.DEBUG)
        linux_logger = ubuntu_sso.main.linux.logger
        linux_logger.addHandler(self.memento)
        self.addCleanup(linux_logger.removeHandler, self.memento)

    def _expected_log_parts(self, signal, *extra):
        """Return the strings expected in the log record for 'signal'."""
        client_name = self.client.__class__.__name__
        return (client_name, signal.__name__, APP_NAME) + extra

    def assert_dbus_signal_correct(self, signal, signature):
        """Check that 'signal' is a dbus signal with proper 'signature'."""
        self.assertTrue(signal._dbus_is_signal)
        self.assertEqual(signal._dbus_interface, DBUS_CREDENTIALS_IFACE)
        self.assertEqual(signal._dbus_signature, signature)

    def test_credentials_found(self):
        """The CredentialsFound signal."""
        signal = self.client.CredentialsFound
        signal(APP_NAME, TOKEN)
        expected = self._expected_log_parts(signal)
        self.assertTrue(self.memento.check_info(*expected))

        # None of the token values may show up in the info log.
        msg = 'credentials must not be logged (found %r in log).'
        for val in TOKEN.itervalues():
            self.assertFalse(self.memento.check_info(val), msg % val)

        self.assert_dbus_signal_correct(signal, 'sa{ss}')

    def test_credentials_not_found(self):
        """The CredentialsNotFound signal."""
        signal = self.client.CredentialsNotFound
        signal(APP_NAME)
        expected = self._expected_log_parts(signal)
        self.assertTrue(self.memento.check_info(*expected))
        self.assert_dbus_signal_correct(signal, 's')

    def test_credentials_cleared(self):
        """The CredentialsCleared signal."""
        signal = self.client.CredentialsCleared
        signal(APP_NAME)
        expected = self._expected_log_parts(signal)
        self.assertTrue(self.memento.check_info(*expected))

        self.assert_dbus_signal_correct(signal, 's')

    def test_credentials_stored(self):
        """The CredentialsStored signal."""
        signal = self.client.CredentialsStored
        signal(APP_NAME)
        expected = self._expected_log_parts(signal)
        self.assertTrue(self.memento.check_info(*expected))

        self.assert_dbus_signal_correct(signal, 's')

    def test_credentials_error(self):
        """The CredentialsError signal."""
        error = {'error_message': 'failed!', 'detailed error': 'yadda yadda'}
        signal = self.client.CredentialsError
        signal(APP_NAME, error)
        # The error dict itself is expected in the log, at error level.
        expected = self._expected_log_parts(signal, str(error))
        self.assertTrue(self.memento.check_error(*expected))

        self.assert_dbus_signal_correct(signal, 'sa{ss}')

    def test_authorization_denied(self):
        """The AuthorizationDenied signal."""
        signal = self.client.AuthorizationDenied
        signal(APP_NAME)
        expected = self._expected_log_parts(signal)
        self.assertTrue(self.memento.check_info(*expected))

        self.assert_dbus_signal_correct(signal, 's')
12760
=== modified file 'ubuntu_sso/main/tests/test_windows.py'
--- ubuntu_sso/main/tests/test_windows.py 2011-12-19 19:35:06 +0000
+++ ubuntu_sso/main/tests/test_windows.py 2012-01-17 16:51:50 +0000
@@ -1,6 +1,4 @@
1# -*- coding: utf-8 -*-1# -*- coding: utf-8 -*-
2# Authors: Manuel de la Pena <manuel@canonical.com>
3# Alejandro J. Cura <alecu@canonical.com>
4#2#
5# Copyright 2011 Canonical Ltd.3# Copyright 2011 Canonical Ltd.
6#4#
@@ -15,932 +13,16 @@
15#13#
16# You should have received a copy of the GNU General Public License along14# You should have received a copy of the GNU General Public License along
17# with this program. If not, see <http://www.gnu.org/licenses/>.15# with this program. If not, see <http://www.gnu.org/licenses/>.
18"""Windows tests."""16"""Windows specific tests for the main module."""
1917
20# pylint: disable=F040118# pylint: disable=F0401
21from _winreg import REG_SZ19from _winreg import REG_SZ
2220
23from mocker import MATCH, Mocker, MockerTestCase
24
25from twisted.internet import defer, reactor
26from twisted.trial.unittest import TestCase
27from twisted.spread.pb import (
28 DeadReferenceError,
29 PBClientFactory,
30 PBServerFactory,
31 Broker,
32)
33from ubuntu_sso import main
34from ubuntu_sso.main import windows21from ubuntu_sso.main import windows
35from ubuntu_sso.main.windows import (
36 signal,
37 CredentialsManagement,
38 CredentialsManagementClient,
39 LOCALHOST,
40 SignalBroadcaster,
41 SSOLogin,
42 SSOLoginClient,
43 UbuntuSSORoot,
44 UbuntuSSOClient,
45 get_sso_pb_port,
46)
47
48# because we are using twisted we have java like names C0103 and
49# the issues that mocker brings with it like W0104
50# pylint: disable=C0103,W0104,E1101,W0201
51
52
class SaveProtocolServerFactory(PBServerFactory):
    """PBServerFactory that remembers the most recent client protocol."""

    # Overwritten on every clientConnectionMade call; None until then.
    protocolInstance = None

    def clientConnectionMade(self, protocol):
        """Store protocol as the latest connected client."""
        self.protocolInstance = protocol
61
62
class SaveClientFactory(PBClientFactory):
    """PBClientFactory that reports connection and disconnection.

    Both events are reported by firing the deferreds given at creation
    time with True.
    """

    def __init__(self, connected_d, disconnected_d):
        """Keep the deferreds to fire on connect and on disconnect."""
        PBClientFactory.__init__(self)
        self.disconnected_d = disconnected_d
        self.connected_d = connected_d

    def clientConnectionMade(self, broker):
        """Let the base class handle the broker, then fire connected_d."""
        PBClientFactory.clientConnectionMade(self, broker)
        self.connected_d.callback(True)

    def clientConnectionLost(self, connector, reason, reconnecting=0):
        """Fire disconnected_d; the base implementation is not called."""
        self.disconnected_d.callback(True)
80
81
class ServerProtocol(Broker):
    """Broker that tells its factory when the connection goes away."""

    def connectionLost(self, *args):
        """Fire the factory's onConnectionLost deferred with this protocol.

        The arguments received are ignored and the Broker implementation
        is not called.
        """
        self.factory.onConnectionLost.callback(self)
87
88
class ConnectedTestCase(TestCase):
    """Base test case that connects a PB client to a PB server over TCP."""

    @defer.inlineCallbacks
    def setUp(self):
        """Reset the connection bookkeeping before each test."""
        yield super(ConnectedTestCase, self).setUp()
        # Fresh deferreds, waited on by teardown_client_server.
        self.server_disconnected = defer.Deferred()
        self.client_disconnected = defer.Deferred()
        # Nothing is listening nor connected until setup_client_server.
        self.listener = self.connector = None
        self.server_factory = self.client_factory = None

    def setup_client_server(self, sso_root):
        """Serve sso_root, connect a client and schedule the teardown.

        Return the deferred that is handed to the client factory as its
        'connected' deferred.
        """
        port = get_sso_pb_port()
        self.listener = self._listen_server(
            sso_root, self.server_disconnected, port)
        connected = defer.Deferred()
        self.connector = self._connect_client(
            connected, self.client_disconnected, port)
        self.addCleanup(self.teardown_client_server)
        return connected

    def _listen_server(self, sso_root, d, port):
        """Start listening on port, serving sso_root.

        The deferred d is stored in the factory as onConnectionLost.
        """
        factory = SaveProtocolServerFactory(sso_root)
        factory.onConnectionLost = d
        factory.protocol = ServerProtocol
        self.server_factory = factory
        return reactor.listenTCP(port, factory)

    def _connect_client(self, d1, d2, port):
        """Connect a client to port on LOCALHOST.

        d1 and d2 are given to the client factory as its connected and
        disconnected deferreds, respectively.
        """
        factory = SaveClientFactory(d1, d2)
        self.client_factory = factory
        return reactor.connectTCP(LOCALHOST, port, factory)

    def teardown_client_server(self):
        """Disconnect the client, stop listening and wait for both ends."""
        self.connector.disconnect()
        stopped = defer.maybeDeferred(self.listener.stopListening)
        pending = [stopped, self.client_disconnected,
                   self.server_disconnected]
        return defer.gatherResults(pending)
132
133
class FakeDecoratedObject(object):
    """Object whose methods are all wrapped with the signal decorator.

    The method bodies are intentionally empty: only the decorator
    behaviour is of interest.
    """

    def __init__(self):
        """Create a new instance."""
        super(FakeDecoratedObject, self).__init__()

    @signal
    def on_no_args(self):
        """Signal that takes no arguments."""

    @signal
    def on_just_args(self, *args):
        """Signal that takes only positional arguments."""

    @signal
    def on_just_kwargs(self, **kwargs):
        """Signal that takes only keyword arguments."""

    @signal
    def on_both_args(self, *args, **kwargs):
        """Signal that takes positional and keyword arguments."""
156
157
class TestException(Exception):
    """Exception raised on purpose by the tests."""

    @classmethod
    def fail(cls, *args):
        """Build an instance of this class out of args and raise it."""
        error = cls(*args)
        raise error
165
166
167# pylint: disable=W0201
class SignalTestCase(MockerTestCase):
    """Check the arguments the signal decorator passes to the callback.

    Every test stores a mock as the 'on_<signal>_cb' attribute of the
    decorated object, records the expected call on it, and then emits the
    signal.
    """

    @defer.inlineCallbacks
    def setUp(self):
        """Create the decorated object and the mock used as callback."""
        yield super(SignalTestCase, self).setUp()
        self.cb = self.mocker.mock()
        self.fake_object = FakeDecoratedObject()

    def test_no_args(self):
        """The cb gets no arguments when the signal has none."""
        self.cb()
        self.fake_object.on_no_args_cb = self.cb
        self.mocker.replay()
        self.fake_object.on_no_args()

    def test_just_args(self):
        """The cb gets the positional arguments of an *args signal."""
        args = ('first', 'second')
        self.cb(*args)
        self.fake_object.on_just_args_cb = self.cb
        self.mocker.replay()
        self.fake_object.on_just_args(*args)

    def test_just_kwargs(self):
        """The cb gets the keyword arguments of a **kwargs signal."""
        kwargs = dict(first='first', second='second')
        self.cb(**kwargs)
        self.fake_object.on_just_kwargs_cb = self.cb
        self.mocker.replay()
        self.fake_object.on_just_kwargs(**kwargs)

    def test_just_kwargs_empty(self):
        """The cb gets nothing when a **kwargs signal is emitted bare."""
        self.cb()
        self.fake_object.on_just_kwargs_cb = self.cb
        self.mocker.replay()
        self.fake_object.on_just_kwargs()

    def test_both_args(self):
        """The cb gets both positional and keyword arguments."""
        args = ('first', 'second')
        kwargs = dict(first='first', second='second')
        self.cb(*args, **kwargs)
        self.fake_object.on_both_args_cb = self.cb
        self.mocker.replay()
        self.fake_object.on_both_args(*args, **kwargs)

    def test_both_args_no_kwargs(self):
        """The cb gets only positional arguments if no kwargs are sent."""
        args = ('first', 'second')
        self.cb(*args)
        self.fake_object.on_both_args_cb = self.cb
        self.mocker.replay()
        self.fake_object.on_both_args(*args)

    def test_both_args_no_args(self):
        """The cb gets only keyword arguments if no args are sent."""
        kwargs = dict(first='first', second='second')
        self.cb(**kwargs)
        self.fake_object.on_both_args_cb = self.cb
        self.mocker.replay()
        self.fake_object.on_both_args(**kwargs)
236# pylint: enable=W0201
237
238
class FakeDeadRemoteClient(object):
    """Remote client stand-in whose every remote call fails."""

    def callRemote(self, signal_name, *args, **kwargs):
        """Raise DeadReferenceError no matter what is being called."""
        error = DeadReferenceError("Calling Stale Broker")
        raise error
245
246
class FakeAliveRemoteClient(object):
    """Remote client stand-in whose remote calls always succeed."""

    def __init__(self):
        """Start without having been called."""
        # Becomes True as soon as callRemote is invoked.
        self.called = False

    def callRemote(self, signal_name, *args, **kwargs):
        """Record the call and return an already fired deferred."""
        self.called = True
        result = defer.succeed(None)
        return result
257
258
class SignalBroadcasterTestCase(TestCase):
    """Tests for SignalBroadcaster when some clients are dead."""

    def test_emit_signal_dead_reference(self):
        """A client raising DeadReferenceError is dropped when emitting."""
        broadcaster = SignalBroadcaster()
        dead_client = FakeDeadRemoteClient()
        broadcaster.remote_register_to_signals(dead_client)
        self.assertIn(dead_client, broadcaster.clients)

        broadcaster.emit_signal("sample_signal")

        self.assertNotIn(dead_client, broadcaster.clients)

    def test_emit_signal_some_dead_some_not(self):
        """An alive client is still called when registered after a dead one."""
        broadcaster = SignalBroadcaster()
        alive_client = FakeAliveRemoteClient()
        # The dead client goes in first, as the alive one must be reached
        # despite the failure of the former.
        broadcaster.remote_register_to_signals(FakeDeadRemoteClient())
        broadcaster.remote_register_to_signals(alive_client)

        broadcaster.emit_signal("sample_signal")

        self.assertTrue(alive_client.called, "The alive must be called.")
280
281
class SignalHandlingTestCase(TestCase):
    """Base class for the suites that check IPC signal handling."""

    timeout = 3
    # Names of the signals to listen to; subclasses are expected to
    # override this.
    signal_names = []
    # Set by install_handlers: fires with the first signal received.
    first_signal = None

    def create_handler(self, d):
        """Return a handler that fires d with the arguments it receives."""

        def handler(*args):
            """Fire the deferred with the args tuple."""
            return d.callback(args)

        return handler

    def install_handlers(self, client):
        """Set an 'on_<name>_cb' handler in client for every signal name.

        Each handler fires its own deferred; first_signal is a
        DeferredList over all of them that fires on the first result.
        """
        fired = []
        for name in self.signal_names:
            signal_d = defer.Deferred()
            setattr(client, "on_%s_cb" % name, self.create_handler(signal_d))
            fired.append(signal_d)

        self.first_signal = defer.DeferredList(fired,
                                               fireOnOneCallback=True,
                                               fireOnOneErrback=True)

    @defer.inlineCallbacks
    def assert_fired(self, name, *args):
        """Check that the first signal fired was name, called with args.

        Only the leading len(args) arguments of the signal are compared.
        """
        received, index = yield self.first_signal
        leading = received[:len(args)]
        self.assertEqual(args, leading)
        fired_name = self.signal_names[index]
        self.assertEqual(name, fired_name)
313
314
class SSOLoginTestCase(ConnectedTestCase, SignalHandlingTestCase):
    """Test the SSOLogin service through a connected SSOLoginClient.

    Two kinds of tests live here:

    * test_emit_*: trigger a signal on the server side and check that the
      matching 'on_<signal>_cb' handler of the client is the first fired.
    * the rest: replace the root of the service with a mock and check that
      a call made on the client reaches the root with the expected
      arguments and callbacks.
    """

    signal_names = [
        'captcha_generated',
        'captcha_generation_error',
        'user_registered',
        'user_registration_error',
        'logged_in',
        'login_error',
        'user_not_validated',
        'email_validated',
        'email_validation_error',
        'password_reset_token_sent',
        'password_reset_error',
        'password_changed',
        'password_change_error',
    ]

    @defer.inlineCallbacks
    def setUp(self):
        """Start the service, and connect a client to it."""
        yield super(SSOLoginTestCase, self).setUp()
        self.mocker = Mocker()
        self.root = self.mocker.mock()
        self.login = SSOLogin(None)
        # start pb
        self.sso_root = UbuntuSSORoot(sso_login=self.login)
        # pylint: disable=E1101
        yield self.setup_client_server(self.sso_root)
        self.client = yield self._get_client()
        # pylint: enable=E1101

    @defer.inlineCallbacks
    def _get_client(self):
        """Return a SSOLoginClient registered to signals, with handlers."""
        # request the remote object and create a client
        root = yield self.client_factory.getRootObject()
        remote = yield root.callRemote('get_sso_login')
        client = SSOLoginClient(remote)
        yield client.register_to_signals()
        self.addCleanup(client.unregister_to_signals)

        self.install_handlers(client)
        defer.returnValue(client)

    @defer.inlineCallbacks
    def test_emit_captcha_generated(self):
        """The captcha_generated cb gets the app name and the result."""
        app_name = 'app'
        result = 'result'

        self.login.emit_captcha_generated(app_name, result)
        yield self.assert_fired("captcha_generated", app_name, result)

    @defer.inlineCallbacks
    def test_emit_captcha_generation_error(self):
        """The captcha_generation_error cb fires if the processor fails."""
        app_name = 'app'
        filename = 'file'

        self.patch(self.login.root.processor, "generate_captcha",
                   TestException.fail)
        self.login.generate_captcha(app_name, filename)
        yield self.assert_fired("captcha_generation_error", app_name)

    @defer.inlineCallbacks
    def test_generate_captcha(self):
        """The client call to generate_captcha reaches the root."""
        app_name = 'app'
        filename = 'file'
        self.login.root = self.root

        self.root.generate_captcha(app_name, filename,
                                   self.login.emit_captcha_generated,
                                   self.login.emit_captcha_generation_error)
        self.mocker.replay()
        yield self.client.generate_captcha(app_name, filename)
        yield self.client.unregister_to_signals()
        self.mocker.verify()

    @defer.inlineCallbacks
    def test_emit_user_registered(self):
        """The user_registered cb gets the app name and the result."""
        app_name = 'app'
        result = 'result'

        self.login.emit_user_registered(app_name, result)
        yield self.assert_fired("user_registered", app_name, result)

    @defer.inlineCallbacks
    def test_emit_user_registration_error(self):
        """The user_registration_error cb fires if the processor fails."""
        app_name = 'app'

        self.patch(self.login.root.processor, "register_user",
                   TestException.fail)
        self.login.register_user(app_name, "email", "password", "name",
                                 "captcha_id", "captcha_solution")
        yield self.assert_fired("user_registration_error", app_name)

    @defer.inlineCallbacks
    def test_register_user(self):
        """The client call to register_user reaches the root."""
        app_name = 'app'
        email = 'email'
        password = 'password'
        displayname = 'name'
        captcha_id = 'captcha_id'
        captcha_solution = 'captcha_solution'
        self.login.root = self.root

        self.root.register_user(app_name, email, password, displayname,
                                captcha_id, captcha_solution,
                                self.login.emit_user_registered,
                                self.login.emit_user_registration_error)
        self.mocker.replay()
        yield self.client.register_user(app_name, email, password, displayname,
                                        captcha_id, captcha_solution)
        yield self.client.unregister_to_signals()
        self.mocker.verify()

    @defer.inlineCallbacks
    def test_emit_logged_in(self):
        """The logged_in cb gets the app name."""
        app_name = 'app'
        result = 'result'

        self.login.emit_logged_in(app_name, result)
        yield self.assert_fired("logged_in", app_name)

    @defer.inlineCallbacks
    def test_emit_login_error(self):
        """The login_error cb fires if the processor fails."""
        app_name = 'app'

        self.patch(main, "get_token_name", lambda _: None)
        self.patch(self.login.root.processor, "login",
                   TestException.fail)
        self.login.login(app_name, "email", "password")
        yield self.assert_fired("login_error", app_name)

    @defer.inlineCallbacks
    def test_emit_user_not_validated(self):
        """The user_not_validated cb gets the app name."""
        app_name = 'app'
        result = 'result'

        self.login.emit_user_not_validated(app_name, result)
        yield self.assert_fired("user_not_validated", app_name)

    @defer.inlineCallbacks
    def test_login(self):
        """The client call to login reaches the root."""
        app_name = 'app'
        email = 'email'
        password = 'password'
        self.login.root = self.root

        self.root.login(app_name, email, password,
                        self.login.emit_logged_in,
                        self.login.emit_login_error,
                        self.login.emit_user_not_validated)
        self.mocker.replay()
        yield self.client.login(app_name, email, password)
        yield self.client.unregister_to_signals()
        self.mocker.verify()

    @defer.inlineCallbacks
    def test_emit_email_validated(self):
        """The email_validated cb gets the app name."""
        app_name = 'app'
        result = 'result'

        self.login.emit_email_validated(app_name, result)
        yield self.assert_fired("email_validated", app_name)

    @defer.inlineCallbacks
    def test_emit_email_validation_error(self):
        """The email_validation_error cb fires if the processor fails."""
        app_name = 'app'

        self.patch(main, "get_token_name", lambda _: None)
        self.patch(self.login.root.processor, "validate_email",
                   TestException.fail)
        self.login.validate_email(app_name, "email", "password", "token")
        yield self.assert_fired("email_validation_error", app_name)

    @defer.inlineCallbacks
    def test_validate_email(self):
        """The client call to validate_email reaches the root."""
        app_name = 'app'
        email = 'email'
        password = 'password'
        email_token = 'token'
        self.login.root = self.root

        self.root.validate_email(app_name, email, password, email_token,
                                 self.login.emit_email_validated,
                                 self.login.emit_email_validation_error)
        self.mocker.replay()
        yield self.client.validate_email(app_name, email, password,
                                         email_token)
        yield self.client.unregister_to_signals()
        self.mocker.verify()

    @defer.inlineCallbacks
    def test_emit_password_reset_token_sent(self):
        """The password_reset_token_sent cb gets the app name."""
        app_name = 'app'
        result = 'result'

        self.login.emit_password_reset_token_sent(app_name, result)
        yield self.assert_fired("password_reset_token_sent", app_name)

    @defer.inlineCallbacks
    def test_emit_password_reset_error(self):
        """The password_reset_error cb fires if the processor fails."""
        app_name = 'app'

        self.patch(self.login.root.processor, "request_password_reset_token",
                   TestException.fail)
        self.login.request_password_reset_token(app_name, "email")
        yield self.assert_fired("password_reset_error", app_name)

    @defer.inlineCallbacks
    def test_request_password_reset_token(self):
        """The client call to request_password_reset_token reaches the root."""
        app_name = 'app'
        email = 'email'
        self.login.root = self.root

        self.root.request_password_reset_token(
            app_name, email,
            self.login.emit_password_reset_token_sent,
            self.login.emit_password_reset_error)
        self.mocker.replay()
        # Wait for the remote call to finish, as every other test does:
        # otherwise the verification below does not depend on it having
        # reached the root.
        yield self.client.request_password_reset_token(app_name, email)
        yield self.client.unregister_to_signals()
        self.mocker.verify()

    @defer.inlineCallbacks
    def test_emit_password_changed(self):
        """The password_changed cb gets the app name."""
        app_name = 'app'
        result = 'result'

        self.login.emit_password_changed(app_name, result)
        yield self.assert_fired("password_changed", app_name)

    @defer.inlineCallbacks
    def test_emit_password_change_error(self):
        """The password_change_error cb fires if the processor fails."""
        app_name = 'app'

        self.patch(self.login.root.processor, "set_new_password",
                   TestException.fail)
        self.login.set_new_password(app_name, "email", "token", "password")
        yield self.assert_fired("password_change_error", app_name)

    @defer.inlineCallbacks
    def test_set_new_password(self):
        """The client call to set_new_password reaches the root."""
        app_name = 'app'
        email = 'email'
        token = 'token'
        new_password = 'password'
        self.login.root = self.root

        self.root.set_new_password(app_name, email, token, new_password,
                                   self.login.emit_password_changed,
                                   self.login.emit_password_change_error)
        self.mocker.replay()
        yield self.client.set_new_password(app_name, email, token,
                                           new_password)
        yield self.client.unregister_to_signals()
        self.mocker.verify()
591
592
class CredentialsManagementTestCase(ConnectedTestCase, TestCase):
    """Test CredentialsManagement through a connected client.

    The root of the service is a mock: the tests record the calls expected
    on it (and on the client callbacks, which are mocks too), replay, and
    then exercise either the client or the service.
    """

    @defer.inlineCallbacks
    def setUp(self):
        """Start the service with a mocked root, and connect a client."""
        yield super(CredentialsManagementTestCase, self).setUp()
        self.mocker = Mocker()
        # A bare Mocker is not restored automatically, so undo whatever
        # replay() installs (see the replace() below) when the test ends.
        self.addCleanup(self.mocker.restore)
        self.root = self.mocker.mock()
        self.except_to_errdict = self.mocker.replace(
                                        'ubuntu_sso.main.except_to_errdict')
        self.creds = CredentialsManagement(None, None)
        self.creds.root = self.root
        # start pb
        self.sso_root = UbuntuSSORoot(cred_manager=self.creds)
        # pylint: disable=E1101
        yield self.setup_client_server(self.sso_root)
        self.client = yield self._get_client()
        # pylint: enable=E1101

    @defer.inlineCallbacks
    def _get_client(self):
        """Return a client registered to signals, with mocks as callbacks."""
        # request the remote object and create a client
        root = yield self.client_factory.getRootObject()
        remote = yield root.callRemote('get_cred_manager')
        client = CredentialsManagementClient(remote)
        yield client.register_to_signals()
        self.addCleanup(client.unregister_to_signals)
        # set the cb
        for signal_name in ['on_authorization_denied_cb',
                            'on_credentials_found_cb',
                            'on_credentials_not_found_cb',
                            'on_credentials_cleared_cb',
                            'on_credentials_stored_cb',
                            'on_credentials_error_cb']:
            setattr(client, signal_name, self.mocker.mock())
        defer.returnValue(client)

    @defer.inlineCallbacks
    def test_shutdown(self):
        """The client call to shutdown reaches the root."""
        # pylint: disable=W0104
        self.root.ref_count
        # pylint: enable=W0104
        self.mocker.result(1)
        self.root.shutdown()
        self.mocker.replay()
        yield self.client.shutdown()
        yield self.client.unregister_to_signals()

    @defer.inlineCallbacks
    def test_emit_authorization_denied(self):
        """The authorization_denied cb is called with the app name."""
        app_name = 'app'

        # pylint: disable=W0104
        self.root.ref_count
        # pylint: enable=W0104
        self.mocker.result(1)
        self.root.ref_count = 0
        self.client.on_authorization_denied_cb(app_name)
        self.mocker.replay()
        self.creds.emit_authorization_denied(app_name)
        yield self.client.unregister_to_signals()
        self.mocker.verify()

    @defer.inlineCallbacks
    def test_emit_credentials_found(self):
        """The credentials_found cb is called with the app name and creds."""
        app_name = 'app'
        creds = 'creds'

        # pylint: disable=W0104
        self.root.ref_count
        # pylint: enable=W0104
        self.mocker.result(1)
        self.root.ref_count = 0
        self.client.on_credentials_found_cb(app_name, creds)
        self.mocker.replay()
        self.creds.emit_credentials_found(app_name, creds)
        yield self.client.unregister_to_signals()
        self.mocker.verify()

    @defer.inlineCallbacks
    def test_emit_credentials_not_found(self):
        """The credentials_not_found cb is called with the app name."""
        app_name = 'app'

        # pylint: disable=W0104
        self.root.ref_count
        # pylint: enable=W0104
        self.mocker.result(1)
        self.root.ref_count = 0
        self.client.on_credentials_not_found_cb(app_name)
        self.mocker.replay()
        self.creds.emit_credentials_not_found(app_name)
        yield self.client.unregister_to_signals()
        self.mocker.verify()

    @defer.inlineCallbacks
    def test_emit_credentials_cleared(self):
        """The credentials_cleared cb is called with the app name."""
        app_name = 'app'

        # pylint: disable=W0104
        self.root.ref_count
        # pylint: enable=W0104
        self.mocker.result(1)
        self.root.ref_count = 0
        self.client.on_credentials_cleared_cb(app_name)
        self.mocker.replay()
        self.creds.emit_credentials_cleared(app_name)
        yield self.client.unregister_to_signals()
        self.mocker.verify()

    @defer.inlineCallbacks
    def test_emit_credentials_stored(self):
        """The credentials_stored cb is called with the app name."""
        app_name = 'app'

        # pylint: disable=W0104
        self.root.ref_count
        # pylint: enable=W0104
        self.mocker.result(1)
        self.root.ref_count = 0
        self.client.on_credentials_stored_cb(app_name)
        self.mocker.replay()
        self.creds.emit_credentials_stored(app_name)
        yield self.client.unregister_to_signals()
        self.mocker.verify()

    @defer.inlineCallbacks
    def test_emit_credentials_error(self):
        """The credentials_error cb is called with the app name and error."""
        app_name = 'app'
        raised_error = 'error'

        # pylint: disable=W0104
        self.root.ref_count
        # pylint: enable=W0104
        self.mocker.result(1)
        self.root.ref_count = 0
        self.client.on_credentials_error_cb(app_name, raised_error)
        self.mocker.replay()
        self.creds.emit_credentials_error(app_name, raised_error)
        yield self.client.unregister_to_signals()
        self.mocker.verify()

    @defer.inlineCallbacks
    def test_find_credentials(self):
        """The client call to find_credentials reaches the root."""
        app_name = 'app'
        args = 'args'

        # pylint: disable=W0212
        self.root.find_credentials(app_name, args, MATCH(callable),
                                   self.creds._process_failure)
        # pylint: enable=W0212
        self.root.shutdown()
        # Leave record mode before exercising the client, as the sibling
        # tests do: otherwise the calls made by the service are recorded
        # as further expectations instead of being checked.
        self.mocker.replay()
        yield self.client.find_credentials(app_name, args)
        yield self.client.unregister_to_signals()

    @defer.inlineCallbacks
    def test_clear_credentials(self):
        """The client call to clear_credentials reaches the root."""
        app_name = 'app'
        args = 'args'

        # pylint: disable=W0212
        self.root.clear_credentials(app_name, args, MATCH(callable),
                                    self.creds._process_failure)
        # pylint: enable=W0212
        self.mocker.replay()
        yield self.client.clear_credentials(app_name, args)
        yield self.client.unregister_to_signals()

    @defer.inlineCallbacks
    def test_store_credentials(self):
        """The client call to store_credentials reaches the root."""
        app_name = 'app'
        args = 'args'

        # pylint: disable=W0212
        self.root.store_credentials(app_name, args, MATCH(callable),
                                    self.creds._process_failure)
        # pylint: enable=W0212
        self.mocker.replay()
        yield self.client.store_credentials(app_name, args)
        yield self.client.unregister_to_signals()

    @defer.inlineCallbacks
    def test_register(self):
        """The client call to register reaches the root."""
        app_name = 'app'
        args = 'args'

        self.root.register(app_name, args)
        self.mocker.replay()
        yield self.client.register(app_name, args)
        yield self.client.unregister_to_signals()

    @defer.inlineCallbacks
    def test_login(self):
        """The client call to login reaches the root."""
        app_name = 'app'
        args = 'args'

        self.root.login(app_name, args)
        self.mocker.replay()
        yield self.client.login(app_name, args)
        yield self.client.unregister_to_signals()

    @defer.inlineCallbacks
    def test_login_email_password(self):
        """The client call to login_email_password reaches the root."""
        app_name = 'app'
        args = 'args'

        self.root.login_email_password(app_name, args)
        self.mocker.replay()
        yield self.client.login_email_password(app_name, args)
        yield self.client.unregister_to_signals()
816
817
class MockRemoteObject(object):
    """Stand-in for a remote object that answers with more stand-ins."""

    def __init__(self):
        """Start with no children."""
        # Every MockRemoteObject handed out by callRemote, in call order.
        self.children = []

    def callRemote(self, method_name, *args, **kwargs):
        """Return a fired deferred holding a new, remembered, child.

        The method name and the arguments are ignored.
        """
        child = MockRemoteObject()
        self.children.append(child)
        return defer.succeed(child)
830
831
class MockPBClientFactory(object):
    """A mock PBClientFactory."""

    # Must be set to a true value before getRootObject is called.
    connected = False
    # The last MockRemoteObject handed out by getRootObject, if any.
    root_object = None

    def getRootObject(self):
        """Create, store and return (in a fired deferred) the root object.

        Fail with AssertionError if this factory is not connected.
        """
        assert self.connected, "getRootObject called while not connected."
        self.root_object = MockRemoteObject()
        return defer.succeed(self.root_object)
843
844
845class MockReactorResult(object):
846 """A mock result when calling twisted.reactor.connectTCP."""
The diff has been truncated for viewing.

Subscribers

People subscribed via source and target branches