Merge lp:~nataliabidart/ubuntu/natty/ubuntu-sso-client/ubuntu-sso-client-1.1.12 into lp:ubuntu/natty/ubuntu-sso-client

Proposed by Natalia Bidart
Status: Merged
Merged at revision: 27
Proposed branch: lp:~nataliabidart/ubuntu/natty/ubuntu-sso-client/ubuntu-sso-client-1.1.12
Merge into: lp:ubuntu/natty/ubuntu-sso-client
Diff against target: 6817 lines (+3866/-2670)
35 files modified
PKG-INFO (+1/-1)
debian/changelog (+19/-0)
run-tests (+1/-1)
run-tests.bat (+64/-0)
setup.py (+4/-2)
ubuntu_sso/account.py (+7/-4)
ubuntu_sso/gtk/gui.py (+8/-10)
ubuntu_sso/gtk/tests/test_gui.py (+1/-6)
ubuntu_sso/keyring.py (+0/-188)
ubuntu_sso/keyring/__init__.py (+90/-0)
ubuntu_sso/keyring/linux.py (+141/-0)
ubuntu_sso/keyring/tests/__init__.py (+17/-0)
ubuntu_sso/keyring/tests/test_linux.py (+259/-0)
ubuntu_sso/keyring/tests/test_windows.py (+62/-0)
ubuntu_sso/keyring/windows.py (+60/-0)
ubuntu_sso/main.py (+0/-604)
ubuntu_sso/main/__init__.py (+375/-0)
ubuntu_sso/main/linux.py (+486/-0)
ubuntu_sso/main/tests/__init__.py (+17/-0)
ubuntu_sso/main/tests/test_common.py (+245/-0)
ubuntu_sso/main/tests/test_linux.py (+1306/-0)
ubuntu_sso/main/tests/test_windows.py (+17/-0)
ubuntu_sso/main/windows.py (+17/-0)
ubuntu_sso/networkstate.py (+0/-108)
ubuntu_sso/networkstate/__init__.py (+40/-0)
ubuntu_sso/networkstate/linux.py (+108/-0)
ubuntu_sso/networkstate/tests/__init__.py (+17/-0)
ubuntu_sso/networkstate/tests/test_linux.py (+181/-0)
ubuntu_sso/networkstate/tests/test_windows.py (+119/-0)
ubuntu_sso/networkstate/windows.py (+199/-0)
ubuntu_sso/tests/bin/show_nm_state (+1/-1)
ubuntu_sso/tests/test_account.py (+4/-2)
ubuntu_sso/tests/test_keyring.py (+0/-258)
ubuntu_sso/tests/test_main.py (+0/-1304)
ubuntu_sso/tests/test_networkstate.py (+0/-181)
To merge this branch: bzr merge lp:~nataliabidart/ubuntu/natty/ubuntu-sso-client/ubuntu-sso-client-1.1.12
Reviewer         Review Type  Date Requested  Status
Ubuntu Sponsors                               Pending
Review via email: mp+54531@code.launchpad.net

Description of the change

  * New upstream release:
    [ Natalia B. Bidart <email address hidden> ]
      - Tests for a particular package are now inside that package.
      - Register now uses the 'displayname' field to pass it on to SSO as
        display name (LP: #709494).
    [ Manuel de la Pena <email address hidden> ]
      - Fix main issues.
      - First step of implementing the code in main on windows.
      - Fixed setup.py issues so that we can build .debs (LP: #735383).
      - Added the network status implementation for windows (LP: #727680).
      - Added an implementation of the keyring on windows (LP: #684967).
      - Added script to run tests on windows using u1trial (LP: #684988).
      - Added bat to run tests on windows (LP: #684988).
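
The per-platform split listed above can be pictured roughly as follows. This is a minimal sketch, not code from the branch: `select_backend` is a hypothetical helper introduced only for illustration. The one detail taken from the diff is that `ubuntu_sso/keyring/__init__.py` checks `sys.platform == 'win32'` to choose between its `windows` and `linux` modules.

```python
import sys


def select_backend(platform=None):
    """Return the submodule name a package would import from.

    Hypothetical helper for illustration; the branch performs this
    check inline in ubuntu_sso/keyring/__init__.py.
    """
    if platform is None:
        platform = sys.platform
    # Only 'win32' selects the Windows implementation; everything else
    # falls through to the Linux one, mirroring the if/else in the diff.
    return 'windows' if platform == 'win32' else 'linux'


if __name__ == '__main__':
    print(select_backend())
```

Keeping the check in the package's `__init__.py` means callers can keep importing `Keyring` from `ubuntu_sso.keyring` without knowing which platform module provides it.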


Preview Diff

=== modified file 'PKG-INFO'
--- PKG-INFO 2011-02-03 15:46:51 +0000
+++ PKG-INFO 2011-03-23 14:22:30 +0000
@@ -1,6 +1,6 @@
 Metadata-Version: 1.1
 Name: ubuntu-sso-client
-Version: 1.1.11
+Version: 1.1.12
 Summary: Ubuntu Single Sign-On client
 Home-page: https://launchpad.net/ubuntu-sso-client
 Author: Natalia Bidart
=== modified file 'debian/changelog'
--- debian/changelog 2011-02-04 08:41:02 +0000
+++ debian/changelog 2011-03-23 14:22:30 +0000
@@ -1,3 +1,22 @@
+ubuntu-sso-client (1.1.12-0ubuntu1) UNRELEASED; urgency=low
+
+  * New upstream release:
+
+    [ Natalia B. Bidart <natalia.bidart@canonical.com> ]
+      - Tests for a particular package are now inside that package.
+      - Register now uses the 'displayname' field to pass it on to SSO as
+        display name (LP: #709494).
+    [ Manuel de la Pena <mandel@themacaque.com> ]
+      - Fix main issues.
+      - First step of implementing the code in main on windows.
+      - Fixed setup.py issues so that we can build .debs (LP: #735383).
+      - Added the network status implementation for windows (LP: #727680).
+      - Added an implementation of the keyring on windows (LP: #684967).
+      - Added script to run tests on windows using u1trial (LP: #684988).
+      - Added bat to run tests on windows (LP: #684988).
+
+ -- Natalia Bidart (nessita) <nataliabidart@gmail.com>  Tue, 22 Mar 2011 23:31:22 -0300
+
 ubuntu-sso-client (1.1.11-0ubuntu1) natty; urgency=low
 
   * New upstream release:
=== modified file 'run-tests'
--- run-tests 2011-01-12 18:56:56 +0000
+++ run-tests 2011-03-23 14:22:30 +0000
@@ -33,5 +33,5 @@
 }
 
 echo "Running test suite for ""$MODULE"
-`which xvfb-run` u1trial "$MODULE" && style_check
+`which xvfb-run` u1trial "$MODULE" -i "test_windows.py" && style_check
 rm -rf _trial_temp
=== added file 'run-tests.bat'
--- run-tests.bat 1970-01-01 00:00:00 +0000
+++ run-tests.bat 2011-03-23 14:22:30 +0000
@@ -0,0 +1,64 @@
1:: Author: Manuel de la Pena <manuel@canonical.com>
2::
3:: Copyright 2010 Canonical Ltd.
4::
5:: This program is free software: you can redistribute it and/or modify it
6:: under the terms of the GNU General Public License version 3, as published
7:: by the Free Software Foundation.
8::
9:: This program is distributed in the hope that it will be useful, but
10:: WITHOUT ANY WARRANTY; without even the implied warranties of
11:: MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
12:: PURPOSE. See the GNU General Public License for more details.
13::
14:: You should have received a copy of the GNU General Public License along
15:: with this program. If not, see <http://www.gnu.org/licenses/>.
16@ECHO off
17:: We could have Python 2.6 or 2.7 on Windows. In order to check availability,
18:: we should first check for 2.7, and run the tests, otherwise fall back to 2.6.
19SET PYTHONPATH=""
20:: This is very annoying; FOR /F will work differently depending on the output
21:: of reg which is not consistent between OS versions (XP, 7). We must choose
22:: the tokens according to OS version.
23SET PYTHONPATHTOKENS=3
24VER | FIND "XP" > nul
25IF %ERRORLEVEL% == 0 SET PYTHONPATHTOKENS=4
26ECHO Checking if python 2.7 is in the system
27:: Look for python 2.7
28FOR /F "tokens=%PYTHONPATHTOKENS%" %%A IN ('REG QUERY HKLM\Software\Python\PythonCore\2.7\InstallPath /ve') DO @SET PYTHONPATH=%%A
29IF NOT %PYTHONPATH% == "" GOTO :PYTHONPRESENT
30ECHO Checking if python 2.6 is in the system
31:: we do not have python 2.7 in the system, try to find 2.6
32FOR /F "tokens=%PYTHONPATHTOKENS%" %%A IN ('REG QUERY HKLM\Software\Python\PythonCore\2.6\InstallPath /ve') DO @SET PYTHONPATH=%%A
33IF NOT %PYTHONPATH% == "" GOTO :PYTHONPRESENT
34
35:: we do not have python (2.6 or 2.7) this could hapen in the case that the
36:: user installed the 32version in a 64 machine, let check if the software was installed in the wow key
37
38:: Look for python 2.7 in WoW64
39ECHO Checking if python 2.7 32 is in the system
40FOR /F "tokens=%PYTHONPATHTOKENS%" %%A IN ('REG QUERY HKLM\Software\Wow6432Node\Python\PythonCore\2.7\InstallPath /ve') DO @SET PYTHONPATH=%%A
41IF NOT %PYTHONPATH% == "" GOTO :PYTHONPRESENT
42ECHO Checking if python 2.6 32 is in the system
43:: we do not have python 2.7 in the system, try to find 2.6
44FOR /F "tokens=%PYTHONPATHTOKENS%" %%A IN ('REG QUERY HKLM\Software\Wow6432Node\Python\PythonCore\2.6\InstallPath /ve') DO @SET PYTHONPATH=%%A
45IF NOT %PYTHONPATH% == "" GOTO :PYTHONPRESENT
46
47ECHO Please ensure you have python installed
48GOTO :END
49
50
51:PYTHONPRESENT
52ECHO Python found, executing the tests...
53:: execute the tests with a number of ignored linux only modules
54"%PYTHONPATH%\python.exe" "%PYTHONPATH%\Scripts\u1trial" -c ubuntu_sso -i "test_gui.py, test_linux.py, test_txsecrets.py"
55"%PYTHONPATH%\python.exe" "%PYTHONPATH%\Scripts\u1lint" ubuntu_sso
56:: test for style if we can, if pep8 is not present, move to the end
57IF EXIST "%PYTHONPATH%Scripts\pep8.exe"
58"%PYTHONPATH%\Scripts\pep8.exe" --repeat ubuntu_sso
59ELSE
60ECHO Style checks were not done
61:: Delete the temp folders
62RMDIR /s /q _trial_temp
63RMDIR /s /q .coverage
64:END
\ No newline at end of file
=== modified file 'setup.py'
--- setup.py 2011-02-03 15:46:51 +0000
+++ setup.py 2011-03-23 14:22:30 +0000
@@ -86,7 +86,7 @@
 
 DistUtilsExtra.auto.setup(
     name='ubuntu-sso-client',
-    version='1.1.11',
+    version='1.1.12',
     license='GPL v3',
     author='Natalia Bidart',
     author_email='natalia.bidart@canonical.com',
@@ -94,7 +94,9 @@
     long_description='Desktop service to allow applications to sign in' \
                      'to Ubuntu services via SSO',
     url='https://launchpad.net/ubuntu-sso-client',
-    packages=['ubuntu_sso', 'ubuntu_sso.gtk', 'ubuntu_sso.utils'],
+    packages=['ubuntu_sso', 'ubuntu_sso.gtk', 'ubuntu_sso.utils',
+              'ubuntu_sso.keyring', 'ubuntu_sso.networkstate',
+              'ubuntu_sso.main'],
     data_files=[
         ('share/dbus-1/services', ['data/com.ubuntu.sso.service']),
         ('lib/ubuntu-sso-client', ['bin/ubuntu-sso-login']),
=== modified file 'ubuntu_sso/account.py'
--- ubuntu_sso/account.py 2010-12-16 16:31:34 +0000
+++ ubuntu_sso/account.py 2011-03-23 14:22:30 +0000
@@ -127,11 +127,12 @@
 
         return captcha['captcha_id']
 
-    def register_user(self, email, password, captcha_id, captcha_solution):
+    def register_user(self, email, password, displayname,
+                      captcha_id, captcha_solution):
         """Register a new user with 'email' and 'password'."""
         logger.debug('register_user: email: %r password: <hidden>, '
-                     'captcha_id: %r, captcha_solution: %r',
-                     email, captcha_id, captcha_solution)
+                     'displayname: %r, captcha_id: %r, captcha_solution: %r',
+                     email, displayname, captcha_id, captcha_solution)
         sso_service = self.sso_service_class(None, self.service_url)
         if not self._valid_email(email):
             logger.error('register_user: InvalidEmailError for email: %r',
@@ -142,7 +143,9 @@
             raise InvalidPasswordError()
 
         result = sso_service.registrations.register(
-            email=email, password=password, captcha_id=captcha_id,
+            email=email, password=password,
+            displayname=displayname,
+            captcha_id=captcha_id,
             captcha_solution=captcha_solution)
         logger.info('register_user: email: %r result: %r', email, result)
 
=== modified file 'ubuntu_sso/gtk/gui.py'
--- ubuntu_sso/gtk/gui.py 2011-01-12 18:56:56 +0000
+++ ubuntu_sso/gtk/gui.py 2011-03-23 14:22:30 +0000
@@ -371,8 +371,6 @@
                 msg = 'UbuntuSSOClientGUI: failed set_transient_for win id %r'
                 logger.exception(msg, window_id)
 
-        # Hidding unused widgets to save some space (LP #627440).
-        self.name_entry.hide()
         self.yes_to_updates_checkbutton.hide()
 
         self.window.show()
@@ -777,11 +775,10 @@
 
         error = False
 
-        # Hidding unused widgets to save some space (LP #627440).
-        #name = self.name_entry.get_text()
-        #if not name:
-        #    self.name_entry.set_warning(self.FIELD_REQUIRED)
-        #    error = True
+        name = self.name_entry.get_text()
+        if not name:
+            self.name_entry.set_warning(self.FIELD_REQUIRED)
+            error = True
 
         # check email
         email1 = self.email1_entry.get_text()
@@ -820,10 +817,11 @@
         self.user_password = password1
 
         logger.info('Calling register_user with email %r, password <hidden>,' \
-                    ' captcha_id %r and captcha_solution %r.', email1,
-                    self._captcha_id, captcha_solution)
+                    ' name %r, captcha_id %r and captcha_solution %r.', email1,
+                    name, self._captcha_id, captcha_solution)
         f = self.backend.register_user
-        f(self.app_name, email1, password1, self._captcha_id, captcha_solution,
+        f(self.app_name, email1, password1, name,
+          self._captcha_id, captcha_solution,
           reply_handler=NO_OP, error_handler=NO_OP)
 
     def on_verify_token_button_clicked(self, *args, **kwargs):
=== modified file 'ubuntu_sso/gtk/tests/test_gui.py'
--- ubuntu_sso/gtk/tests/test_gui.py 2011-01-12 18:56:56 +0000
+++ ubuntu_sso/gtk/tests/test_gui.py 2011-03-23 14:22:30 +0000
@@ -713,7 +713,7 @@
         expected = 'register_user'
         self.assertIn(expected, self.ui.backend._called)
         self.assertEqual(self.ui.backend._called[expected],
-                         ((APP_NAME, EMAIL, PASSWORD, CAPTCHA_ID,
+                         ((APP_NAME, EMAIL, PASSWORD, NAME, CAPTCHA_ID,
                            CAPTCHA_SOLUTION),
                           dict(reply_handler=gui.NO_OP,
                                error_handler=gui.NO_OP)))
@@ -1314,11 +1314,6 @@
                          self.ui.FIELD_REQUIRED)
         self.assertNotIn('register_user', self.ui.backend._called)
 
-    # Unused variable 'skip'
-    # pylint: disable=W0612
-    test_warning_is_shown_if_name_empty.skip = \
-        'Unused for now, will be hidden to save space (LP: #627440).'
-
     def test_warning_is_shown_if_empty_email(self):
         """A warning message is shown if emails are empty."""
         self.ui.email1_entry.set_text('')
=== added directory 'ubuntu_sso/keyring'
=== removed file 'ubuntu_sso/keyring.py'
--- ubuntu_sso/keyring.py 2010-11-30 13:21:17 +0000
+++ ubuntu_sso/keyring.py 1970-01-01 00:00:00 +0000
@@ -1,188 +0,0 @@
1# -*- coding: utf-8 -*-
2#
3# Copyright (C) 2010 Canonical
4#
5# Authors:
6# Andrew Higginson
7# Alejandro J. Cura <alecu@canonical.com>
8#
9# This program is free software; you can redistribute it and/or modify it under
10# the terms of the GNU General Public License as published by the Free Software
11# Foundation; version 3.
12#
13# This program is distributed in the hope that it will be useful, but WITHOUT
14# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
15# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
16# details.
17#
18# You should have received a copy of the GNU General Public License along with
19# this program; if not, write to the Free Software Foundation, Inc.,
20# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
21
22"""Handle keys in the local kerying."""
23
24import socket
25import urllib
26import urlparse
27
28from twisted.internet.defer import inlineCallbacks, returnValue
29
30from ubuntu_sso.logger import setup_logging
31from ubuntu_sso.utils.txsecrets import SecretService
32
33
34logger = setup_logging("ubuntu_sso.keyring")
35TOKEN_SEPARATOR = ' @ '
36SEPARATOR_REPLACEMENT = ' AT '
37
38U1_APP_NAME = "Ubuntu One"
39U1_KEY_NAME = "UbuntuOne token for https://ubuntuone.com"
40U1_KEY_ATTR = {
41 "oauth-consumer-key": "ubuntuone",
42 "ubuntuone-realm": "https://ubuntuone.com",
43}
44
45
46def get_old_token_name(app_name):
47 """Build the token name (old style)."""
48 quoted_app_name = urllib.quote(app_name)
49 computer_name = socket.gethostname()
50 quoted_computer_name = urllib.quote(computer_name)
51 return "%s - %s" % (quoted_app_name, quoted_computer_name)
52
53
54def get_token_name(app_name):
55 """Build the token name."""
56 computer_name = socket.gethostname()
57 computer_name = computer_name.replace(TOKEN_SEPARATOR,
58 SEPARATOR_REPLACEMENT)
59 return TOKEN_SEPARATOR.join((app_name, computer_name)).encode('utf-8')
60
61
62class Keyring(object):
63 """A Keyring for a given application name."""
64
65 def __init__(self):
66 """Initialize this instance."""
67 self.service = SecretService()
68
69 @inlineCallbacks
70 def _find_keyring_item(self, app_name, attr=None):
71 """Return the keyring item or None if not found."""
72 if attr is None:
73 logger.debug("getting attr")
74 attr = self._get_keyring_attr(app_name)
75 logger.debug("finding all items")
76 items = yield self.service.search_items(attr)
77 if len(items) == 0:
78 # if no items found, return None
79 logger.debug("No items found")
80 returnValue(None)
81
82 logger.debug("Returning first item found")
83 returnValue(items[0])
84
85 def _get_keyring_attr(self, app_name):
86 """Build the keyring attributes for this credentials."""
87 attr = {"key-type": "Ubuntu SSO credentials",
88 "token-name": get_token_name(app_name)}
89 return attr
90
91 @inlineCallbacks
92 def set_credentials(self, app_name, cred):
93 """Set the credentials of the Ubuntu SSO item."""
94 # Creates the secret from the credentials
95 secret = urllib.urlencode(cred)
96
97 attr = self._get_keyring_attr(app_name)
98 # Add our SSO credentials to the keyring
99 yield self.service.open_session()
100 collection = yield self.service.get_default_collection()
101 yield collection.create_item(app_name, attr, secret, True)
102
103 @inlineCallbacks
104 def _migrate_old_token_name(self, app_name):
105 """Migrate credentials with old name, store them with new name."""
106 logger.debug("getting keyring attr")
107 attr = self._get_keyring_attr(app_name)
108 logger.debug("getting old token name")
109 attr['token-name'] = get_old_token_name(app_name)
110 logger.debug("finding keyring item")
111 item = yield self._find_keyring_item(app_name, attr=attr)
112 if item is not None:
113 logger.debug("setting credentials")
114 yield self.set_credentials(app_name,
115 dict(urlparse.parse_qsl(item.secret)))
116 logger.debug("deleting old item")
117 yield item.delete()
118
119 logger.debug("finding keyring item")
120 result = yield self._find_keyring_item(app_name)
121 logger.debug("returning result value")
122 returnValue(result)
123
124 @inlineCallbacks
125 def get_credentials(self, app_name):
126 """A deferred with the secret of the SSO item in a dictionary."""
127 # If we have no attributes, return None
128 logger.debug("getting credentials")
129 yield self.service.open_session()
130 logger.debug("calling find item")
131 item = yield self._find_keyring_item(app_name)
132 if item is None:
133 logger.debug("migrating token")
134 item = yield self._migrate_old_token_name(app_name)
135
136 if item is not None:
137 logger.debug("parsing secret")
138 secret = yield item.get_value()
139 returnValue(dict(urlparse.parse_qsl(secret)))
140 else:
141 # if no item found, try getting the old credentials
142 if app_name == U1_APP_NAME:
143 logger.debug("trying old credentials")
144 old_creds = yield try_old_credentials(app_name)
145 returnValue(old_creds)
146 # nothing was found
147 returnValue(None)
148
149 @inlineCallbacks
150 def delete_credentials(self, app_name):
151 """Delete a set of credentials from the keyring."""
152 attr = self._get_keyring_attr(app_name)
153 # Add our SSO credentials to the keyring
154 yield self.service.open_session()
155 collection = yield self.service.get_default_collection()
156 yield collection.create_item(app_name, attr, "secret!", True)
157
158 item = yield self._find_keyring_item(app_name)
159 if item is not None:
160 yield item.delete()
161
162
163class UbuntuOneOAuthKeyring(Keyring):
164 """A particular Keyring for Ubuntu One."""
165
166 def _get_keyring_attr(self, app_name):
167 """Build the keyring attributes for this credentials."""
168 return U1_KEY_ATTR
169
170
171@inlineCallbacks
172def try_old_credentials(app_name):
173 """Try to get old U1 credentials and format them as new."""
174 logger.debug('trying to get old credentials.')
175 old_creds = yield UbuntuOneOAuthKeyring().get_credentials(U1_KEY_NAME)
176 if old_creds is not None:
177 # Old creds found, build a new credentials dict with them
178 creds = {
179 'consumer_key': "ubuntuone",
180 'consumer_secret': "hammertime",
181 'name': U1_KEY_NAME,
182 'token': old_creds["oauth_token"],
183 'token_secret': old_creds["oauth_token_secret"],
184 }
185 logger.debug('found old credentials')
186 returnValue(creds)
187 logger.debug('try_old_credentials: No old credentials for this app.')
188 returnValue(None)
=== added file 'ubuntu_sso/keyring/__init__.py'
--- ubuntu_sso/keyring/__init__.py 1970-01-01 00:00:00 +0000
+++ ubuntu_sso/keyring/__init__.py 2011-03-23 14:22:30 +0000
@@ -0,0 +1,90 @@
1# -*- coding: utf-8 -*-
2# Authors:
3# Andrew Higginson
4# Alejandro J. Cura <alecu@canonical.com>
5# Manuel de la Pena <manuel@canonical.com>
6#
7# Copyright 2011 Canonical Ltd.
8#
9# This program is free software: you can redistribute it and/or modify it
10# under the terms of the GNU General Public License version 3, as published
11# by the Free Software Foundation.
12#
13# This program is distributed in the hope that it will be useful, but
14# WITHOUT ANY WARRANTY; without even the implied warranties of
15# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
16# PURPOSE. See the GNU General Public License for more details.
17#
18# You should have received a copy of the GNU General Public License along
19# with this program. If not, see <http://www.gnu.org/licenses/>.
20"""Implementations of different keyrings."""
21
22import socket
23import sys
24import urllib
25
26from twisted.internet.defer import inlineCallbacks, returnValue
27
28from ubuntu_sso.logger import setup_logging
29
30logger = setup_logging("ubuntu_sso.keyring")
31
32TOKEN_SEPARATOR = ' @ '
33SEPARATOR_REPLACEMENT = ' AT '
34
35U1_APP_NAME = "Ubuntu One"
36U1_KEY_NAME = "UbuntuOne token for https://ubuntuone.com"
37U1_KEY_ATTR = {
38 "oauth-consumer-key": "ubuntuone",
39 "ubuntuone-realm": "https://ubuntuone.com",
40}
41
42
43def get_old_token_name(app_name):
44 """Build the token name (old style)."""
45 quoted_app_name = urllib.quote(app_name)
46 computer_name = socket.gethostname()
47 quoted_computer_name = urllib.quote(computer_name)
48 return "%s - %s" % (quoted_app_name, quoted_computer_name)
49
50
51def get_token_name(app_name):
52 """Build the token name."""
53 computer_name = socket.gethostname()
54 computer_name = computer_name.replace(TOKEN_SEPARATOR,
55 SEPARATOR_REPLACEMENT)
56 return TOKEN_SEPARATOR.join((app_name, computer_name)).encode('utf-8')
57
58
59@inlineCallbacks
60def try_old_credentials(app_name):
61 """Try to get old U1 credentials and format them as new."""
62 logger.debug('trying to get old credentials.')
63 old_creds = yield UbuntuOneOAuthKeyring().get_credentials(U1_KEY_NAME)
64 if old_creds is not None:
65 # Old creds found, build a new credentials dict with them
66 creds = {
67 'consumer_key': "ubuntuone",
68 'consumer_secret': "hammertime",
69 'name': U1_KEY_NAME,
70 'token': old_creds["oauth_token"],
71 'token_secret': old_creds["oauth_token_secret"],
72 }
73 logger.debug('found old credentials')
74 returnValue(creds)
75 logger.debug('try_old_credentials: No old credentials for this app.')
76 returnValue(None)
77
78
79if sys.platform == 'win32':
80 from ubuntu_sso.keyring.windows import Keyring
81else:
82 from ubuntu_sso.keyring.linux import Keyring
83
84
85class UbuntuOneOAuthKeyring(Keyring):
86 """A particular Keyring for Ubuntu One."""
87
88 def _get_keyring_attr(self, app_name):
89 """Build the keyring attributes for this credentials."""
90 return U1_KEY_ATTR
=== added file 'ubuntu_sso/keyring/linux.py'
--- ubuntu_sso/keyring/linux.py 1970-01-01 00:00:00 +0000
+++ ubuntu_sso/keyring/linux.py 2011-03-23 14:22:30 +0000
@@ -0,0 +1,141 @@
1# -*- coding: utf-8 -*-
2#
3# Copyright (C) 2010 Canonical
4#
5# Authors:
6# Andrew Higginson
7# Alejandro J. Cura <alecu@canonical.com>
8# Natalia B. Bidart <natalia.bidart@canonical.com>
9# Manuel de la Pena <manuel@canonical.com>
10#
11# This program is free software; you can redistribute it and/or modify it under
12# the terms of the GNU General Public License as published by the Free Software
13# Foundation; version 3.
14#
15# This program is distributed in the hope that it will be useful, but WITHOUT
16# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
17# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
18# details.
19#
20# You should have received a copy of the GNU General Public License along with
21# this program; if not, write to the Free Software Foundation, Inc.,
22# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
23
24"""Handle keys in the local kerying."""
25
26import urllib
27import urlparse
28
29from twisted.internet.defer import inlineCallbacks, returnValue
30
31from ubuntu_sso.logger import setup_logging
32from ubuntu_sso.utils.txsecrets import SecretService
33from ubuntu_sso.keyring import (
34 get_token_name,
35 get_old_token_name,
36 U1_APP_NAME,
37 try_old_credentials)
38
39
40logger = setup_logging("ubuntu_sso.keyring")
41
42
43class Keyring(object):
44 """A Keyring for a given application name."""
45
46 def __init__(self):
47 """Initialize this instance."""
48 self.service = SecretService()
49
50 @inlineCallbacks
51 def _find_keyring_item(self, app_name, attr=None):
52 """Return the keyring item or None if not found."""
53 if attr is None:
54 logger.debug("getting attr")
55 attr = self._get_keyring_attr(app_name)
56 logger.debug("finding all items")
57 items = yield self.service.search_items(attr)
58 if len(items) == 0:
59 # if no items found, return None
60 logger.debug("No items found")
61 returnValue(None)
62
63 logger.debug("Returning first item found")
64 returnValue(items[0])
65
66 def _get_keyring_attr(self, app_name):
67 """Build the keyring attributes for this credentials."""
68 attr = {"key-type": "Ubuntu SSO credentials",
69 "token-name": get_token_name(app_name)}
70 return attr
71
72 @inlineCallbacks
73 def set_credentials(self, app_name, cred):
74 """Set the credentials of the Ubuntu SSO item."""
75 # Creates the secret from the credentials
76 secret = urllib.urlencode(cred)
77
78 attr = self._get_keyring_attr(app_name)
79 # Add our SSO credentials to the keyring
80 yield self.service.open_session()
81 collection = yield self.service.get_default_collection()
82 yield collection.create_item(app_name, attr, secret, True)
83
84 @inlineCallbacks
85 def _migrate_old_token_name(self, app_name):
86 """Migrate credentials with old name, store them with new name."""
87 logger.debug("getting keyring attr")
88 attr = self._get_keyring_attr(app_name)
89 logger.debug("getting old token name")
90 attr['token-name'] = get_old_token_name(app_name)
91 logger.debug("finding keyring item")
92 item = yield self._find_keyring_item(app_name, attr=attr)
93 if item is not None:
94 logger.debug("setting credentials")
95 yield self.set_credentials(app_name,
96 dict(urlparse.parse_qsl(item.secret)))
97 logger.debug("deleting old item")
98 yield item.delete()
99
100 logger.debug("finding keyring item")
101 result = yield self._find_keyring_item(app_name)
102 logger.debug("returning result value")
103 returnValue(result)
104
105 @inlineCallbacks
106 def get_credentials(self, app_name):
107 """A deferred with the secret of the SSO item in a dictionary."""
108 # If we have no attributes, return None
109 logger.debug("getting credentials")
110 yield self.service.open_session()
111 logger.debug("calling find item")
112 item = yield self._find_keyring_item(app_name)
113 if item is None:
114 logger.debug("migrating token")
115 item = yield self._migrate_old_token_name(app_name)
116
117 if item is not None:
118 logger.debug("parsing secret")
119 secret = yield item.get_value()
120 returnValue(dict(urlparse.parse_qsl(secret)))
121 else:
122 # if no item found, try getting the old credentials
123 if app_name == U1_APP_NAME:
124 logger.debug("trying old credentials")
125 old_creds = yield try_old_credentials(app_name)
126 returnValue(old_creds)
127 # nothing was found
128 returnValue(None)
129
130 @inlineCallbacks
131 def delete_credentials(self, app_name):
132 """Delete a set of credentials from the keyring."""
133 attr = self._get_keyring_attr(app_name)
134 # Add our SSO credentials to the keyring
135 yield self.service.open_session()
136 collection = yield self.service.get_default_collection()
137 yield collection.create_item(app_name, attr, "secret!", True)
138
139 item = yield self._find_keyring_item(app_name)
140 if item is not None:
141 yield item.delete()
=== added directory 'ubuntu_sso/keyring/tests'
=== added file 'ubuntu_sso/keyring/tests/__init__.py'
--- ubuntu_sso/keyring/tests/__init__.py 1970-01-01 00:00:00 +0000
+++ ubuntu_sso/keyring/tests/__init__.py 2011-03-23 14:22:30 +0000
@@ -0,0 +1,17 @@
1# -*- coding: utf-8 -*-
2# Author: Manuel de la Pena <manuel@canonical.com>
3#
4# Copyright 2011 Canonical Ltd.
5#
6# This program is free software: you can redistribute it and/or modify it
7# under the terms of the GNU General Public License version 3, as published
8# by the Free Software Foundation.
9#
10# This program is distributed in the hope that it will be useful, but
11# WITHOUT ANY WARRANTY; without even the implied warranties of
12# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
13# PURPOSE. See the GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License along
16# with this program. If not, see <http://www.gnu.org/licenses/>.
17"""Keyring tests."""
=== added file 'ubuntu_sso/keyring/tests/test_linux.py'
--- ubuntu_sso/keyring/tests/test_linux.py 1970-01-01 00:00:00 +0000
+++ ubuntu_sso/keyring/tests/test_linux.py 2011-03-23 14:22:30 +0000
@@ -0,0 +1,259 @@
1# -*- coding: utf-8 -*-
2#
3# test_keyring - tests for ubuntu_sso.keyring
4#
5# Author: Alejandro J. Cura <alecu@canonical.com>
6# Author: Natalia B. Bidart <natalia.bidart@canonical.com>
7#
8# Copyright 2010 Canonical Ltd.
9#
10# This program is free software: you can redistribute it and/or modify it
11# under the terms of the GNU General Public License version 3, as published
12# by the Free Software Foundation.
13#
14# This program is distributed in the hope that it will be useful, but
15# WITHOUT ANY WARRANTY; without even the implied warranties of
16# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
17# PURPOSE. See the GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License along
20# with this program. If not, see <http://www.gnu.org/licenses/>.
21"""Tests for the keyring.py module."""
22
23import socket
24
25from twisted.internet import defer
26from twisted.internet.defer import inlineCallbacks
27from twisted.trial.unittest import TestCase
28
29from ubuntu_sso import keyring as common_keyring
30from ubuntu_sso.keyring import linux as keyring
31from ubuntu_sso.tests import APP_NAME
32
33
34def build_fake_gethostname(fake_hostname):
35 """Return a fake hostname getter."""
36 return lambda *a: fake_hostname
37
38
39class MockItem(object):
40 """An item contains a secret, lookup attributes and has a label."""
41
42 def __init__(self, label, collection, attr, value):
43 """Initialize a new Item."""
44 self.label = label
45 self.collection = collection
46 self.attributes = attr
47 self.value = value
48
49 def get_value(self):
50 """Retrieve the secret for this item."""
51 return defer.succeed(self.value)
52
53 def delete(self):
54 """Delete this item."""
55 self.collection.items.remove(self)
56 return defer.succeed(None)
57
58 def matches(self, search_attr):
59 """See if this item matches a given search."""
60 for k, val in search_attr.items():
61 if k not in self.attributes:
62 return False
63 if self.attributes[k] != val:
64 return False
65 return True
66
67
68class MockCollection(object):
69 """A collection of items containing secrets."""
70
71 def __init__(self, label, service):
72 """Initialize a new collection."""
73 self.label = label
74 self.service = service
75 self.items = []
76
77 def create_item(self, label, attr, value, replace=True):
78 """Create an item with the given attributes, secret and label."""
79 item = MockItem(label, self, attr, value)
80 self.items.append(item)
81 return defer.succeed(item)
82
83
84class MockSecretService(object):
85 """A class that mocks txsecrets.SecretService."""
86
87 def __init__(self, *args, **kwargs):
88 super(MockSecretService, self).__init__(*args, **kwargs)
89 self.collections = {}
90
91 def open_session(self, window_id=0):
92 """Open a unique session for the caller application."""
93 return defer.succeed(self)
94
95 def search_items(self, attributes):
96 """Find items in any collection."""
97 results = []
98 for collection in self.collections.values():
99 for item in collection.items:
100 if item.matches(attributes):
101 results.append(item)
102 return defer.succeed(results)
103
104 def create_collection(self, label):
105 """Create a new collection with the specified properties."""
106 collection = MockCollection(label, self)
107 self.collections[label] = collection
108 if "default" not in self.collections:
109 self.collections["default"] = collection
110 return defer.succeed(collection)
111
112 def get_default_collection(self):
113 """The collection where default items should be created."""
114 if len(self.collections) == 0:
115 self.create_collection("default")
116 return defer.succeed(self.collections["default"])
117
118
119class TestTokenNameBuilder(TestCase):
120 """Test the method that builds the token name."""
121
122 def test_get_simple_token_name(self):
123 """A simple token name is built right."""
124 sample_app_name = "UbuntuTwo"
125 sample_hostname = "Darkstar"
126 expected_result = "UbuntuTwo @ Darkstar"
127
128 fake_gethostname = build_fake_gethostname(sample_hostname)
129 self.patch(socket, "gethostname", fake_gethostname)
130 result = keyring.get_token_name(sample_app_name)
131 self.assertEqual(result, expected_result)
132
133 def test_get_complex_token_name_for_app_name(self):
134 """A complex token name is built right too."""
135 sample_app_name = "Ubuntu @ Eleven"
136 sample_hostname = "Mate+Cocido"
137 expected_result = "Ubuntu @ Eleven @ Mate+Cocido"
138
139 fake_gethostname = build_fake_gethostname(sample_hostname)
140 self.patch(socket, "gethostname", fake_gethostname)
141 result = keyring.get_token_name(sample_app_name)
142 self.assertEqual(result, expected_result)
143
144 def test_get_complex_token_name_for_hostname(self):
145 """A complex token name is built right too."""
146 sample_app_name = "Ubuntu Eleven"
147 sample_hostname = "Mate @ Cocido"
148 expected_result = "Ubuntu Eleven @ Mate AT Cocido"
149
150 fake_gethostname = build_fake_gethostname(sample_hostname)
151 self.patch(socket, "gethostname", fake_gethostname)
152 result = keyring.get_token_name(sample_app_name)
153 self.assertEqual(result, expected_result)
154
155
156class TestKeyring(TestCase):
157 """Test the keyring related functions."""
158
159 timeout = 5
160
161 def setUp(self):
162 """Initialize the mock used in these tests."""
163 self.mock_service = None
164 self.service = self.patch(keyring, "SecretService",
165 self.get_mock_service)
166 fake_gethostname = build_fake_gethostname("darkstar")
167 self.patch(socket, "gethostname", fake_gethostname)
168
169 def get_mock_service(self):
170 """Create only one instance of the mock service per test."""
171 if self.mock_service is None:
172 self.mock_service = MockSecretService()
173 return self.mock_service
174
175 @inlineCallbacks
176 def test_set_credentials(self):
177 """Test that the set method does not erase previous keys."""
178 sample_creds = {"name": "sample creds name"}
179 sample_creds2 = {"name": "sample creds name 2"}
180 kr = keyring.Keyring()
181 yield kr.set_credentials("appname", sample_creds)
182 yield kr.set_credentials("appname", sample_creds2)
183
184 # pylint: disable=E1101
185 self.assertEqual(len(kr.service.collections["default"].items), 2)
186
187 @inlineCallbacks
188 def test_delete_credentials(self):
189 """Test that a given key is deleted."""
190 sample_creds = {"name": "sample creds name"}
191 kr = keyring.Keyring()
192 yield kr.set_credentials("appname", sample_creds)
193 yield kr.delete_credentials("appname")
194
195 # pylint: disable=E1101
196 self.assertEqual(len(kr.service.collections["default"].items), 1)
197
198 @inlineCallbacks
199 def test_get_credentials(self):
200 """Test that credentials are properly retrieved."""
201 sample_creds = {"name": "sample creds name"}
202 kr = keyring.Keyring()
203 yield kr.set_credentials("appname", sample_creds)
204
205 result = yield kr.get_credentials("appname")
206 self.assertEqual(result, sample_creds)
207
208 @inlineCallbacks
209 def test_get_credentials_migrating_token(self):
210 """Test that credentials are properly retrieved and migrated."""
211 sample_creds = {"name": "sample creds name"}
212 kr = keyring.Keyring()
213 self.patch(keyring, "get_token_name", keyring.get_old_token_name)
214 yield kr.set_credentials(APP_NAME, sample_creds)
215
216 result = yield kr.get_credentials(APP_NAME)
217 self.assertEqual(result, sample_creds)
218
219 @inlineCallbacks
220 def test_get_old_cred_found(self):
221 """The method returns a new set of creds if old creds are found."""
222 sample_oauth_token = "sample oauth token"
223 sample_oauth_secret = "sample oauth secret"
224 old_creds = {
225 "oauth_token": sample_oauth_token,
226 "oauth_token_secret": sample_oauth_secret,
227 }
228 u1kr = common_keyring.UbuntuOneOAuthKeyring()
229 yield u1kr.set_credentials(keyring.U1_APP_NAME, old_creds)
230
231 kr = keyring.Keyring()
232 result = yield kr.get_credentials(keyring.U1_APP_NAME)
233 self.assertIn("token", result)
234 self.assertEqual(result["token"], sample_oauth_token)
235 self.assertIn("token_secret", result)
236 self.assertEqual(result["token_secret"], sample_oauth_secret)
237
238 @inlineCallbacks
239 def test_get_old_cred_found_but_not_asked_for(self):
240 """Returns None if old creds are present but the appname is not U1"""
241 sample_oauth_token = "sample oauth token"
242 sample_oauth_secret = "sample oauth secret"
243 old_creds = {
244 "oauth_token": sample_oauth_token,
245 "oauth_token_secret": sample_oauth_secret,
246 }
247 u1kr = common_keyring.UbuntuOneOAuthKeyring()
248 yield u1kr.set_credentials(keyring.U1_APP_NAME, old_creds)
249
250 kr = keyring.Keyring()
251 result = yield kr.get_credentials("Software Center")
252 self.assertEqual(result, None)
253
254 @inlineCallbacks
255 def test_get_old_cred_not_found(self):
256 """The method returns None if no old nor new credentials found."""
257 kr = keyring.Keyring()
258 result = yield kr.get_credentials(keyring.U1_APP_NAME)
259 self.assertEqual(result, None)
0260
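Reviewer note: the three TestTokenNameBuilder cases above pin down the expected token-name format. The app name and hostname are joined with " @ ", and any " @ " inside the hostname is rewritten to " AT ", while the app name is left untouched. The real get_token_name lives in ubuntu_sso/keyring/linux.py and is not shown in this part of the diff, so the following is only a minimal sketch inferred from the tests. The names sketch_token_name, TOKEN_SEPARATOR and SEPARATOR_REPLACEMENT are hypothetical, and the hostname getter is passed in as a parameter instead of being patched on the socket module.

```python
import socket

# Hypothetical constants, inferred from the expected strings in the tests.
TOKEN_SEPARATOR = " @ "
SEPARATOR_REPLACEMENT = " AT "


def sketch_token_name(app_name, gethostname=socket.gethostname):
    """Build '<app name> @ <hostname>' the way the tests expect.

    Only the hostname is sanitized; the app name is kept as given.
    """
    hostname = gethostname().replace(TOKEN_SEPARATOR, SEPARATOR_REPLACEMENT)
    return TOKEN_SEPARATOR.join([app_name, hostname])


# Same inputs as test_get_complex_token_name_for_hostname.
print(sketch_token_name("Ubuntu Eleven", lambda: "Mate @ Cocido"))
```

Sanitizing only the hostname keeps the last " @ " in the token name unambiguous, so the hostname part can presumably be split off again even when the app name itself contains the separator.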
=== added file 'ubuntu_sso/keyring/tests/test_windows.py'
--- ubuntu_sso/keyring/tests/test_windows.py 1970-01-01 00:00:00 +0000
+++ ubuntu_sso/keyring/tests/test_windows.py 2011-03-23 14:22:30 +0000
@@ -0,0 +1,62 @@
1# -*- coding: utf-8 -*-
2# Author: Manuel de la Pena <manuel@canonical.com>
3#
4# Copyright 2011 Canonical Ltd.
5#
6# This program is free software: you can redistribute it and/or modify it
7# under the terms of the GNU General Public License version 3, as published
8# by the Free Software Foundation.
9#
10# This program is distributed in the hope that it will be useful, but
11# WITHOUT ANY WARRANTY; without even the implied warranties of
12# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
13# PURPOSE. See the GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License along
16# with this program. If not, see <http://www.gnu.org/licenses/>.
17"""Test the windows keyring implementation."""
18
19from json import dumps
20from mocker import MockerTestCase
21from twisted.internet.defer import inlineCallbacks
22
23from ubuntu_sso.keyring.windows import Keyring, USERNAME
24
25
26class TestWindowsKeyring(MockerTestCase):
27 """Test the windows keyring implementation."""
28
29 def setUp(self):
30 """Setup tests."""
31 super(TestWindowsKeyring, self).setUp()
32 self.keyring_lib = self.mocker.mock()
33 self.keyring = Keyring(self.keyring_lib)
34
35 @inlineCallbacks
36 def test_set_credentials(self):
37 """Test setting the credentials."""
38 app_name = 'name'
39 password = dict(password='password')
40 self.keyring_lib.set_password(app_name, USERNAME, dumps(password))
41 self.mocker.replay()
42 yield self.keyring.set_credentials(app_name, password)
43
44 @inlineCallbacks
45 def test_get_credentials(self):
46 """Test deleting the credentials."""
47 app_name = 'name'
48 password = dict(password='password')
49 self.keyring_lib.get_password(app_name, USERNAME)
50 self.mocker.result(dumps(password))
51 self.mocker.replay()
52 result = yield self.keyring.get_credentials(app_name)
53 self.assertEqual(password, result)
54
55 @inlineCallbacks
56 def test_delete_credentials(self):
57 """Test deleting the credentials."""
58 app_name = 'name'
59 self.keyring_lib.delete_password(app_name, USERNAME)
60 self.mocker.replay()
61 result = yield self.keyring.delete_credentials(app_name)
62 self.assertTrue(result is None)
063
=== added file 'ubuntu_sso/keyring/windows.py'
--- ubuntu_sso/keyring/windows.py 1970-01-01 00:00:00 +0000
+++ ubuntu_sso/keyring/windows.py 2011-03-23 14:22:30 +0000
@@ -0,0 +1,60 @@
1# -*- coding: utf-8 -*-
2# Author: Manuel de la Pena <manuel@canonical.com>
3#
4# Copyright 2011 Canonical Ltd.
5#
6# This program is free software: you can redistribute it and/or modify it
7# under the terms of the GNU General Public License version 3, as published
8# by the Free Software Foundation.
9#
10# This program is distributed in the hope that it will be useful, but
11# WITHOUT ANY WARRANTY; without even the implied warranties of
12# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
13# PURPOSE. See the GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License along
16# with this program. If not, see <http://www.gnu.org/licenses/>.
17"""Keyring implementation on Windows."""
18
19from json import loads, dumps
20
21from twisted.internet.threads import deferToThread
22
23USERNAME = 'ubuntu_sso'
24
25
26class Keyring(object):
27 """A Keyring for a given application name."""
28
29 def __init__(self, keyring=None):
30 """Create a new instance."""
31 if keyring is None:
32 import keyring as pykeyring
33 keyring = pykeyring
34 self.keyring = keyring
35
36 def set_credentials(self, app_name, cred):
37 """Set the credentials of the Ubuntu SSO item."""
38 # the windows keyring can only store a pair username-password
39 # so we store the data using ubuntu_sso as the user name. Then
40 # the cred will be stored as the string representation of the dict.
41 return deferToThread(self.keyring.set_password, app_name, USERNAME,
42 dumps(cred))
43
44 def _get_credentials_obj(self, app_name):
45 """A dict with the credentials."""
46 creds = self.keyring.get_password(app_name, USERNAME)
47 return loads(creds)
48
49 def get_credentials(self, app_name):
50 """A deferred with the secret of the SSO item in a dictionary."""
51 return deferToThread(self._get_credentials_obj, app_name)
52
53 def delete_credentials(self, app_name):
54 """Delete a set of credentials from the keyring."""
55 # this call depends on a patch I sent to pykeyring. The patch has
56 # not landed as of version 0.5.1. If you have that version you can
57 # clone my patch in the following way:
58 # hg clone https://bitbucket.org/mandel/pykeyring-delete-password
59 # pylint: disable=E1103
60 return deferToThread(self.keyring.delete_password, app_name, USERNAME)
061
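Reviewer note: the comment in set_credentials above explains that the Windows backend can only store a username/password pair, so the credentials dict is serialized to JSON and stored as the password under the fixed user name 'ubuntu_sso'. The round trip can be sketched without pykeyring or Twisted as follows. InMemoryBackend, store_credentials and load_credentials are hypothetical stand-ins written for this note (the backend only mimics the set_password/get_password/delete_password calls used in the diff), and the deferToThread wrapping is left out.

```python
import json

USERNAME = "ubuntu_sso"  # same fixed user name as in the diff


class InMemoryBackend(object):
    """Hypothetical stand-in for a username/password keyring backend."""

    def __init__(self):
        self._store = {}

    def set_password(self, service, username, password):
        self._store[(service, username)] = password

    def get_password(self, service, username):
        # Returns None when nothing is stored for this pair.
        return self._store.get((service, username))

    def delete_password(self, service, username):
        del self._store[(service, username)]


def store_credentials(backend, app_name, cred):
    """Serialize the dict so it fits in the single password slot."""
    backend.set_password(app_name, USERNAME, json.dumps(cred))


def load_credentials(backend, app_name):
    """Decode the stored JSON, or return None if nothing was stored."""
    raw = backend.get_password(app_name, USERNAME)
    return None if raw is None else json.loads(raw)


backend = InMemoryBackend()
store_credentials(backend, "name", {"token": "abc", "token_secret": "xyz"})
print(load_credentials(backend, "name"))
```

One difference from the code under review: the sketch checks for a missing entry before decoding, whereas _get_credentials_obj passes the backend result straight to loads. Whether the missing-entry case needs handling there depends on what the real backend returns, so treat that check as a suggestion only.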
=== added directory 'ubuntu_sso/main'
=== removed file 'ubuntu_sso/main.py'
--- ubuntu_sso/main.py 2011-01-28 16:38:26 +0000
+++ ubuntu_sso/main.py 1970-01-01 00:00:00 +0000
@@ -1,604 +0,0 @@
1# -*- coding: utf-8 -*-
2#
3# ubuntu_sso.main - main login handling interface
4#
5# Author: Natalia Bidart <natalia.bidart@canonical.com>
6# Author: Alejandro J. Cura <alecu@canonical.com>
7#
8# Copyright 2009 Canonical Ltd.
9#
10# This program is free software: you can redistribute it and/or modify it
11# under the terms of the GNU General Public License version 3, as published
12# by the Free Software Foundation.
13#
14# This program is distributed in the hope that it will be useful, but
15# WITHOUT ANY WARRANTY; without even the implied warranties of
16# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
17# PURPOSE. See the GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License along
20# with this program. If not, see <http://www.gnu.org/licenses/>.
21"""Single Sign On login handler.
22
23A utility which accepts requests for Ubuntu Single Sign On login over D-Bus.
24
25The OAuth process is handled, including adding the OAuth access token to the
26local keyring.
27
28"""
29
30import os
31import threading
32import warnings
33
34import dbus.service
35
36from ubuntu_sso import (DBUS_ACCOUNT_PATH, DBUS_IFACE_USER_NAME,
37 DBUS_IFACE_CRED_NAME, DBUS_CREDENTIALS_IFACE, NO_OP)
38from ubuntu_sso.account import Account
39from ubuntu_sso.credentials import (Credentials, HELP_TEXT_KEY, PING_URL_KEY,
40 TC_URL_KEY, UI_CLASS_KEY, UI_MODULE_KEY, WINDOW_ID_KEY,
41 SUCCESS_CB_KEY, ERROR_CB_KEY, DENIAL_CB_KEY,
42 ERROR_KEY, ERROR_DETAIL_KEY)
43from ubuntu_sso.keyring import get_token_name, U1_APP_NAME, Keyring
44from ubuntu_sso.logger import setup_logging
45
46
47# Disable the invalid name warning, as we have a lot of DBus style names
48# pylint: disable=C0103
49
50
51logger = setup_logging("ubuntu_sso.main")
52U1_PING_URL = "https://one.ubuntu.com/oauth/sso-finished-so-get-tokens/"
53TIMEOUT_INTERVAL = 10000 # 10 seconds
54
55
56class SSOLoginProcessor(Account):
57 """Login and register users using the Ubuntu Single Sign On service.
58
59 Alias classname to maintain backwards compatibility. DO NOT USE, use
60 ubuntu_sso.account.Account instead.
61 """
62
63 def __init__(self, sso_service_class=None):
64 """Create a new SSO Account manager."""
65 msg = 'Use ubuntu_sso.account.Account instead.'
66 warnings.warn(msg, DeprecationWarning)
67 super(SSOLoginProcessor, self).__init__(sso_service_class)
68
69
70def except_to_errdict(e):
71 """Turn an exception into a dictionary to return thru DBus."""
72 result = {
73 "errtype": e.__class__.__name__,
74 }
75 if len(e.args) == 0:
76 result["message"] = e.__class__.__doc__
77 elif isinstance(e.args[0], dict):
78 result.update(e.args[0])
79 elif isinstance(e.args[0], basestring):
80 result["message"] = e.args[0]
81
82 return result
83
84
85def blocking(f, app_name, result_cb, error_cb):
86 """Run f in a thread; return or throw an exception thru the callbacks."""
87 def _in_thread():
88 """The part that runs inside the thread."""
89 try:
90 result_cb(app_name, f())
91 except Exception, e: # pylint: disable=W0703
92 msg = "Exception while running DBus blocking code in a thread:"
93 logger.exception(msg)
94 error_cb(app_name, except_to_errdict(e))
95 threading.Thread(target=_in_thread).start()
96
97
98class SSOLogin(dbus.service.Object):
99 """Login thru the Single Sign On service."""
100
101 # Operator not preceded by a space (fails with dbus decorators)
102 # pylint: disable=C0322
103
104 def __init__(self, bus_name, object_path=DBUS_ACCOUNT_PATH,
105 sso_login_processor_class=Account,
106 sso_service_class=None):
107 """Initiate the Login object."""
108 dbus.service.Object.__init__(self, object_path=object_path,
109 bus_name=bus_name)
110 self.sso_login_processor_class = sso_login_processor_class
111 self.processor = self.sso_login_processor_class(
112 sso_service_class=sso_service_class)
113
114 # generate_captcha signals
115 @dbus.service.signal(DBUS_IFACE_USER_NAME, signature="ss")
116 def CaptchaGenerated(self, app_name, result):
117 """Signal thrown after the captcha is generated."""
118 logger.debug('SSOLogin: emitting CaptchaGenerated with app_name "%s" '
119 'and result %r', app_name, result)
120
121 @dbus.service.signal(DBUS_IFACE_USER_NAME, signature="sa{ss}")
122 def CaptchaGenerationError(self, app_name, error):
123 """Signal thrown when there's a problem generating the captcha."""
124 logger.debug('SSOLogin: emitting CaptchaGenerationError with '
125 'app_name "%s" and error %r', app_name, error)
126
127 @dbus.service.method(dbus_interface=DBUS_IFACE_USER_NAME,
128 in_signature='ss')
129 def generate_captcha(self, app_name, filename):
130 """Call the matching method in the processor."""
131 def f():
132 """Inner function that will be run in a thread."""
133 return self.processor.generate_captcha(filename)
134 blocking(f, app_name, self.CaptchaGenerated,
135 self.CaptchaGenerationError)
136
137 # register_user signals
138 @dbus.service.signal(DBUS_IFACE_USER_NAME, signature="ss")
139 def UserRegistered(self, app_name, result):
140 """Signal thrown when the user is registered."""
141 logger.debug('SSOLogin: emitting UserRegistered with app_name "%s" '
142 'and result %r', app_name, result)
143
144 @dbus.service.signal(DBUS_IFACE_USER_NAME, signature="sa{ss}")
145 def UserRegistrationError(self, app_name, error):
146 """Signal thrown when there's a problem registering the user."""
147 logger.debug('SSOLogin: emitting UserRegistrationError with '
148 'app_name "%s" and error %r', app_name, error)
149
150 @dbus.service.method(dbus_interface=DBUS_IFACE_USER_NAME,
151 in_signature='sssss')
152 def register_user(self, app_name, email, password,
153 captcha_id, captcha_solution):
154 """Call the matching method in the processor."""
155 def f():
156 """Inner function that will be run in a thread."""
157 return self.processor.register_user(email, password,
158 captcha_id, captcha_solution)
159 blocking(f, app_name, self.UserRegistered, self.UserRegistrationError)
160
161 # login signals
162 @dbus.service.signal(DBUS_IFACE_USER_NAME, signature="ss")
163 def LoggedIn(self, app_name, result):
164 """Signal thrown when the user is logged in."""
165 logger.debug('SSOLogin: emitting LoggedIn with app_name "%s" '
166 'and result %r', app_name, result)
167
168 @dbus.service.signal(DBUS_IFACE_USER_NAME, signature="sa{ss}")
169 def LoginError(self, app_name, error):
170 """Signal thrown when there is a problem in the login."""
171 logger.debug('SSOLogin: emitting LoginError with '
172 'app_name "%s" and error %r', app_name, error)
173
174 @dbus.service.signal(DBUS_IFACE_USER_NAME, signature="ss")
175 def UserNotValidated(self, app_name, result):
176 """Signal thrown when the user is not validated."""
177 logger.debug('SSOLogin: emitting UserNotValidated with app_name "%s" '
178 'and result %r', app_name, result)
179
180 @dbus.service.method(dbus_interface=DBUS_IFACE_USER_NAME,
181 in_signature='sss')
182 def login(self, app_name, email, password):
183 """Call the matching method in the processor."""
184 def f():
185 """Inner function that will be run in a thread."""
186 token_name = get_token_name(app_name)
187 logger.debug('login: token_name %r, email %r, password <hidden>.',
188 token_name, email)
189 credentials = self.processor.login(email, password, token_name)
190 logger.debug('login returned not None credentials? %r.',
191 credentials is not None)
192 return credentials
193
194 def success_cb(app_name, credentials):
195 """Login finished successfully."""
196 is_validated = self.processor.is_validated(credentials)
197 logger.debug('user is validated? %r.', is_validated)
198 if is_validated:
199 # pylint: disable=E1101
200 d = Keyring().set_credentials(app_name, credentials)
201 d.addCallback(lambda _: self.LoggedIn(app_name, email))
202 d.addErrback(lambda failure: \
203 self.LoginError(app_name,
204 except_to_errdict(failure.value)))
205 else:
206 self.UserNotValidated(app_name, email)
207 blocking(f, app_name, success_cb, self.LoginError)
208
209 # validate_email signals
210 @dbus.service.signal(DBUS_IFACE_USER_NAME, signature="ss")
211 def EmailValidated(self, app_name, result):
212 """Signal thrown after the email is validated."""
213 logger.debug('SSOLogin: emitting EmailValidated with app_name "%s" '
214 'and result %r', app_name, result)
215
216 @dbus.service.signal(DBUS_IFACE_USER_NAME, signature="sa{ss}")
217 def EmailValidationError(self, app_name, error):
218 """Signal thrown when there's a problem validating the email."""
219 logger.debug('SSOLogin: emitting EmailValidationError with '
220 'app_name "%s" and error %r', app_name, error)
221
222 @dbus.service.method(dbus_interface=DBUS_IFACE_USER_NAME,
223 in_signature='ssss')
224 def validate_email(self, app_name, email, password, email_token):
225 """Call the matching method in the processor."""
226
227 def f():
228 """Inner function that will be run in a thread."""
229 token_name = get_token_name(app_name)
230 credentials = self.processor.validate_email(email, password,
231 email_token, token_name)
232 return credentials
233
234 def success_cb(app_name, credentials):
235 """Validation finished successfully."""
236 # pylint: disable=E1101
237 d = Keyring().set_credentials(app_name, credentials)
238 d.addCallback(lambda _: self.EmailValidated(app_name, email))
239 failure_cb = lambda f: self.EmailValidationError(app_name, f.value)
240 d.addErrback(failure_cb)
241
242 blocking(f, app_name, success_cb, self.EmailValidationError)
243
244 # request_password_reset_token signals
245 @dbus.service.signal(DBUS_IFACE_USER_NAME, signature="ss")
246 def PasswordResetTokenSent(self, app_name, result):
247 """Signal thrown when the token is successfully sent."""
248 logger.debug('SSOLogin: emitting PasswordResetTokenSent with app_name '
249 '"%s" and result %r', app_name, result)
250
251 @dbus.service.signal(DBUS_IFACE_USER_NAME, signature="sa{ss}")
252 def PasswordResetError(self, app_name, error):
253 """Signal thrown when there's a problem sending the token."""
254 logger.debug('SSOLogin: emitting PasswordResetError with '
255 'app_name "%s" and error %r', app_name, error)
256
257 @dbus.service.method(dbus_interface=DBUS_IFACE_USER_NAME,
258 in_signature='ss')
259 def request_password_reset_token(self, app_name, email):
260 """Call the matching method in the processor."""
261 def f():
262 """Inner function that will be run in a thread."""
263 return self.processor.request_password_reset_token(email)
264 blocking(f, app_name, self.PasswordResetTokenSent,
265 self.PasswordResetError)
266
267 # set_new_password signals
268 @dbus.service.signal(DBUS_IFACE_USER_NAME, signature="ss")
269 def PasswordChanged(self, app_name, result):
270 """Signal thrown when the password is successfully changed."""
271 logger.debug('SSOLogin: emitting PasswordChanged with app_name "%s" '
272 'and result %r', app_name, result)
273
274 @dbus.service.signal(DBUS_IFACE_USER_NAME, signature="sa{ss}")
275 def PasswordChangeError(self, app_name, error):
276 """Signal thrown when there's a problem changing the password."""
277 logger.debug('SSOLogin: emitting PasswordChangeError with '
278 'app_name "%s" and error %r', app_name, error)
279
280 @dbus.service.method(dbus_interface=DBUS_IFACE_USER_NAME,
281 in_signature='ssss')
282 def set_new_password(self, app_name, email, token, new_password):
283 """Call the matching method in the processor."""
284 def f():
285 """Inner function that will be run in a thread."""
286 return self.processor.set_new_password(email, token,
287 new_password)
288 blocking(f, app_name, self.PasswordChanged, self.PasswordChangeError)
289
290
291class SSOCredentials(dbus.service.Object):
292 """DBus object that gets credentials, and login/registers if needed."""
293
294 # Operator not preceded by a space (fails with dbus decorators)
295 # pylint: disable=C0322
296
297 def __init__(self, *args, **kwargs):
298 dbus.service.Object.__init__(self, *args, **kwargs)
299 self.ping_url = os.environ.get('USSOC_PING_URL', U1_PING_URL)
300
301 def _process_error(self, app_name, error_dict):
302 """Process the 'error_dict' and emit CredentialsError."""
303 msg = error_dict.get(ERROR_KEY, 'No error message given.')
304 detail = error_dict.get(ERROR_DETAIL_KEY, 'No detailed error given.')
305 self.CredentialsError(app_name, msg, detail)
306
307 @dbus.service.signal(DBUS_IFACE_CRED_NAME, signature="s")
308 def AuthorizationDenied(self, app_name):
309 """Signal thrown when the user denies the authorization."""
310 logger.info('SSOCredentials: emitting AuthorizationDenied with '
311 'app_name "%s"', app_name)
312
313 @dbus.service.signal(DBUS_IFACE_CRED_NAME, signature="sa{ss}")
314 def CredentialsFound(self, app_name, credentials):
315 """Signal thrown when the credentials are found."""
316 logger.info('SSOCredentials: emitting CredentialsFound with '
317 'app_name "%s"', app_name)
318
319 @dbus.service.signal(DBUS_IFACE_CRED_NAME, signature="sss")
320 def CredentialsError(self, app_name, error_message, detailed_error):
321 """Signal thrown when there is a problem finding the credentials."""
322 logger.error('SSOCredentials: emitting CredentialsError with app_name '
323 '"%s" and error_message %r', app_name, error_message)
324
325 @dbus.service.method(dbus_interface=DBUS_IFACE_CRED_NAME,
326 in_signature="s", out_signature="a{ss}",
327 async_callbacks=("callback", "errback"))
328 def find_credentials(self, app_name, callback=NO_OP, errback=NO_OP):
329 """Get the credentials from the keyring or {} if not there."""
330
331 def log_result(result):
332 """Log the result and continue."""
333 logger.info('find_credentials: app_name "%s", result is {}? %s',
334 app_name, result == {})
335 return result
336
337 d = Credentials(app_name=app_name).find_credentials()
338 # pylint: disable=E1101
339 d.addCallback(log_result)
340 d.addCallbacks(callback, errback)
341
342 @dbus.service.method(dbus_interface=DBUS_IFACE_CRED_NAME,
343 in_signature="sssx", out_signature="")
344 def login_or_register_to_get_credentials(self, app_name,
345 terms_and_conditions_url,
346 help_text, window_id):
347 """Get credentials if found else prompt GUI to login or register.
348
349 'app_name' will be displayed in the GUI.
350 'terms_and_conditions_url' will be the URL pointing to T&C.
351 'help_text' is an explanatory text for the end-users, will be shown
352 below the headers.
353 'window_id' is the id of the window which will be set as a parent of
354 the GUI. If 0, no parent will be set.
355
356 """
357 ping_url = self.ping_url if app_name == U1_APP_NAME else None
358 obj = Credentials(app_name=app_name, ping_url=ping_url,
359 tc_url=terms_and_conditions_url,
360 help_text=help_text, window_id=window_id,
361 success_cb=self.CredentialsFound,
362 error_cb=self._process_error,
363 denial_cb=self.AuthorizationDenied)
364 obj.register()
365
366 @dbus.service.method(dbus_interface=DBUS_IFACE_CRED_NAME,
367 in_signature="ssx", out_signature="")
368 def login_to_get_credentials(self, app_name, help_text, window_id):
369 """Get credentials if found else prompt GUI just to login.
370
371 'app_name' will be displayed in the GUI.
372 'help_text' is an explanatory text for the end-users, will be shown
373 before the login fields.
374 'window_id' is the id of the window which will be set as a parent of
375 the GUI. If 0, no parent will be set.
376
377 """
378 ping_url = self.ping_url if app_name == U1_APP_NAME else None
379 obj = Credentials(app_name=app_name, ping_url=ping_url, tc_url=None,
380 help_text=help_text, window_id=window_id,
381 success_cb=self.CredentialsFound,
382 error_cb=self._process_error,
383 denial_cb=self.AuthorizationDenied)
384 obj.login()
385
386 @dbus.service.method(dbus_interface=DBUS_IFACE_CRED_NAME,
387 in_signature='s', out_signature='',
388 async_callbacks=("callback", "errback"))
389 def clear_token(self, app_name, callback=NO_OP, errback=NO_OP):
390 """Clear the token for an application from the keyring.
391
392 'app_name' is the name of the application.
393 """
394 d = Credentials(app_name=app_name).clear_credentials()
395 # pylint: disable=E1101
396 d.addCallbacks(lambda _: callback(), errback)
397
398
399class CredentialsManagement(dbus.service.Object):
400 """DBus object that manages credentials.
401
402 Every exposed method in this class requires one mandatory argument:
403
404 - 'app_name': the name of the application. Will be displayed in the
405 GUI header, plus it will be used to find/build/clear tokens.
406
407 And accepts another parameter named 'args', which is a dictionary that
408 can contain the following:
409
410 - 'help_text': an explanatory text for the end-users, will be
411 shown below the header. This is an optional free text field.
412
413 - 'ping_url': the url to open after successful token retrieval. If
414 defined, the email will be attached to the url and will be pinged
415 with a OAuth-signed request.
416
417 - 'tc_url': the link to the Terms and Conditions page. If defined,
418 the checkbox to agree to the terms will link to it.
419
420 - 'window_id': the id of the window which will be set as a parent
421 of the GUI. If not defined, no parent will be set.
422
423 """
424
425 def __init__(self, timeout_func, shutdown_func, *args, **kwargs):
426 super(CredentialsManagement, self).__init__(*args, **kwargs)
427 self._ref_count = 0
428 self.timeout_func = timeout_func
429 self.shutdown_func = shutdown_func
430
431 # Operator not preceded by a space (fails with dbus decorators)
432 # pylint: disable=C0322
433
434 valid_keys = (HELP_TEXT_KEY, PING_URL_KEY, TC_URL_KEY,
435 UI_CLASS_KEY, UI_MODULE_KEY, WINDOW_ID_KEY)
436
437 def _parse_args(self, args):
438 """Retrieve values from the generic param 'args'."""
439 result = dict(i for i in args.iteritems() if i[0] in self.valid_keys)
440 result[WINDOW_ID_KEY] = int(args.get(WINDOW_ID_KEY, 0))
441 result[SUCCESS_CB_KEY] = self.CredentialsFound
442 result[ERROR_CB_KEY] = self.CredentialsError
443 result[DENIAL_CB_KEY] = self.AuthorizationDenied
444 return result
445
446 def _process_failure(self, failure, app_name):
447 """Process the 'failure' and emit CredentialsError."""
448 self.CredentialsError(app_name, except_to_errdict(failure.value))
449
450 def _get_ref_count(self):
451 """Get value of ref_count."""
452 return self._ref_count
453
454 def _set_ref_count(self, new_value):
455 """Set a new value to ref_count."""
456 logger.debug('ref_count is %r, changing value to %r.',
457 self._ref_count, new_value)
458 if new_value < 0:
459 self._ref_count = 0
460 msg = 'Attempting to decrease ref_count to a negative value (%r).'
461 logger.warning(msg, new_value)
462 else:
463 self._ref_count = new_value
464
465 if self._ref_count == 0:
466 logger.debug('Setting up timer with %r (%r, %r).',
467 self.timeout_func, TIMEOUT_INTERVAL, self.shutdown)
468 self.timeout_func(TIMEOUT_INTERVAL, self.shutdown)
469
470 ref_count = property(fget=_get_ref_count, fset=_set_ref_count)
471
472 def shutdown(self):
473 """If no ongoing requests, call self.shutdown_func."""
474 logger.debug('shutdown!, ref_count is %r.', self._ref_count)
475 if self._ref_count == 0:
476 logger.info('Shutting down, calling %r.', self.shutdown_func)
477 self.shutdown_func()
478
479 @dbus.service.signal(DBUS_CREDENTIALS_IFACE, signature='s')
480 def AuthorizationDenied(self, app_name):
481 """Signal thrown when the user denies the authorization."""
482 self.ref_count -= 1
483 logger.info('%s: emitting AuthorizationDenied with app_name "%s".',
484 self.__class__.__name__, app_name)
485
486 @dbus.service.signal(DBUS_CREDENTIALS_IFACE, signature='sa{ss}')
487 def CredentialsFound(self, app_name, credentials):
488 """Signal thrown when the credentials are found."""
489 self.ref_count -= 1
490 logger.info('%s: emitting CredentialsFound with app_name "%s".',
491 self.__class__.__name__, app_name)
492
493 @dbus.service.signal(DBUS_CREDENTIALS_IFACE, signature='s')
494 def CredentialsNotFound(self, app_name):
495 """Signal thrown when the credentials are not found."""
496 self.ref_count -= 1
497 logger.info('%s: emitting CredentialsNotFound with app_name "%s".',
498 self.__class__.__name__, app_name)
499
500 @dbus.service.signal(DBUS_CREDENTIALS_IFACE, signature='s')
501 def CredentialsCleared(self, app_name):
502 """Signal thrown when the credentials were cleared."""
503 self.ref_count -= 1
504 logger.info('%s: emitting CredentialsCleared with app_name "%s".',
505 self.__class__.__name__, app_name)
506
507 @dbus.service.signal(DBUS_CREDENTIALS_IFACE, signature='s')
508 def CredentialsStored(self, app_name):
509 """Signal thrown when the credentials were stored."""
510 self.ref_count -= 1
511 logger.info('%s: emitting CredentialsStored with app_name "%s".',
512 self.__class__.__name__, app_name)
513
514 @dbus.service.signal(DBUS_CREDENTIALS_IFACE, signature='sa{ss}')
515 def CredentialsError(self, app_name, error_dict):
516 """Signal thrown when there is a problem getting the credentials."""
517 self.ref_count -= 1
518 logger.error('%s: emitting CredentialsError with app_name "%s" and '
519 'error_dict %r.', self.__class__.__name__, app_name,
520 error_dict)
521
522 @dbus.service.method(dbus_interface=DBUS_CREDENTIALS_IFACE,
523 in_signature='sa{ss}', out_signature='')
524 def find_credentials(self, app_name, args):
525 """Look for the credentials for an application.
526
527 - 'app_name': the name of the application whose credentials are
528 going to be looked up.
529
530 - 'args' is a dictionary, currently not used.
531
532 """
533 self.ref_count += 1
534
535 def success_cb(credentials):
536 """Find credentials and notify using signals."""
537 if credentials is not None and len(credentials) > 0:
538 self.CredentialsFound(app_name, credentials)
539 else:
540 self.CredentialsNotFound(app_name)
541
542 obj = Credentials(app_name)
543 d = obj.find_credentials()
544 # pylint: disable=E1101
545 d.addCallback(success_cb)
546 d.addErrback(self._process_failure, app_name)
547
548 @dbus.service.method(dbus_interface=DBUS_CREDENTIALS_IFACE,
549 in_signature='sa{ss}', out_signature='')
550 def clear_credentials(self, app_name, args):
551 """Clear the credentials for an application.
552
553 - 'app_name': the name of the application which credentials are
554 going to be removed.
555
556 - 'args' is a dictionary, currently not used.
557
558 """
559 self.ref_count += 1
560
561 obj = Credentials(app_name)
562 d = obj.clear_credentials()
563 # pylint: disable=E1101
564 d.addCallback(lambda _: self.CredentialsCleared(app_name))
565 d.addErrback(self._process_failure, app_name)
566
567 @dbus.service.method(dbus_interface=DBUS_CREDENTIALS_IFACE,
568 in_signature='sa{ss}', out_signature='')
569 def store_credentials(self, app_name, args):
570 """Store the token for an application.
571
572 - 'app_name': the name of the application which credentials are
573 going to be stored.
574
575 - 'args' is the dictionary holding the credentials. Needs to provide
576 the following mandatory keys: 'token', 'token_key', 'consumer_key',
577 'consumer_secret'.
578
579 """
580 self.ref_count += 1
581
582 obj = Credentials(app_name)
583 d = obj.store_credentials(args)
584 # pylint: disable=E1101
585 d.addCallback(lambda _: self.CredentialsStored(app_name))
586 d.addErrback(self._process_failure, app_name)
587
588 @dbus.service.method(dbus_interface=DBUS_CREDENTIALS_IFACE,
589 in_signature='sa{ss}', out_signature='')
590 def register(self, app_name, args):
591 """Get credentials if found else prompt GUI to register."""
592 self.ref_count += 1
593
594 obj = Credentials(app_name, **self._parse_args(args))
595 obj.register()
596
597 @dbus.service.method(dbus_interface=DBUS_CREDENTIALS_IFACE,
598 in_signature='sa{ss}', out_signature='')
599 def login(self, app_name, args):
600 """Get credentials if found else prompt GUI to login."""
601 self.ref_count += 1
602
603 obj = Credentials(app_name, **self._parse_args(args))
604 obj.login()
6050
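Note on the ref_count handling in CredentialsManagement above: every D-Bus method increments the counter, every emitted signal decrements it, and reaching zero only schedules a later shutdown check instead of quitting at once. The following is a minimal sketch of that pattern with no D-Bus involved. The class name RefCounted and the fake timer and shutdown callables are hypothetical, and the sketch is written for Python 3 rather than the Python 2 of the branch.

```python
class RefCounted(object):
    """Hypothetical stand-in for the ref_count part of CredentialsManagement."""

    TIMEOUT_INTERVAL = 10000  # milliseconds, mirroring the 10 second constant

    def __init__(self, timeout_func, shutdown_func):
        self._ref_count = 0
        self.timeout_func = timeout_func
        self.shutdown_func = shutdown_func

    @property
    def ref_count(self):
        return self._ref_count

    @ref_count.setter
    def ref_count(self, new_value):
        # Clamp at zero: a negative value means signals and calls are unbalanced.
        self._ref_count = max(new_value, 0)
        if self._ref_count == 0:
            # Do not quit right away; schedule a check for later.
            self.timeout_func(self.TIMEOUT_INTERVAL, self.shutdown)

    def shutdown(self):
        # Only quit if no request arrived while the timer was pending.
        if self._ref_count == 0:
            self.shutdown_func()


scheduled = []  # (interval, callback) pairs recorded by the fake timer
stopped = []    # filled in when the fake shutdown function runs

counter = RefCounted(lambda interval, cb: scheduled.append((interval, cb)),
                     lambda: stopped.append(True))
counter.ref_count += 1   # a request starts
counter.ref_count -= 1   # its signal is emitted, so the timer is armed
counter.ref_count += 1   # a new request arrives before the timer fires
scheduled[0][1]()        # timer fires while a request is pending: no shutdown
counter.ref_count -= 1   # the second request finishes, the timer is armed again
scheduled[1][1]()        # timer fires with ref_count == 0: shutdown runs
```

Because the zero check is repeated inside shutdown(), a request that arrives during the timeout window keeps the service alive, which appears to be the reason the setter schedules the check rather than calling shutdown_func directly.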
=== added file 'ubuntu_sso/main/__init__.py'
--- ubuntu_sso/main/__init__.py 1970-01-01 00:00:00 +0000
+++ ubuntu_sso/main/__init__.py 2011-03-23 14:22:30 +0000
@@ -0,0 +1,375 @@
1# -*- coding: utf-8 -*-
2#
3# Author: Natalia Bidart <natalia.bidart@canonical.com>
4# Author: Alejandro J. Cura <alecu@canonical.com>
5# Author: Manuel de la Pena <manuel@canonical.com>
6#
7# Copyright 2011 Canonical Ltd.
8#
9# This program is free software: you can redistribute it and/or modify it
10# under the terms of the GNU General Public License version 3, as published
11# by the Free Software Foundation.
12#
13# This program is distributed in the hope that it will be useful, but
14# WITHOUT ANY WARRANTY; without even the implied warranties of
15# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
16# PURPOSE. See the GNU General Public License for more details.
17#
18# You should have received a copy of the GNU General Public License along
19# with this program. If not, see <http://www.gnu.org/licenses/>.
20"""Main object implementations."""
21
22import os
23import sys
24import warnings
25
26from ubuntu_sso import NO_OP
27from ubuntu_sso.account import Account
28from ubuntu_sso.credentials import (Credentials, HELP_TEXT_KEY, PING_URL_KEY,
29 TC_URL_KEY, UI_CLASS_KEY, UI_MODULE_KEY, WINDOW_ID_KEY,
30 SUCCESS_CB_KEY, ERROR_CB_KEY, DENIAL_CB_KEY)
31from ubuntu_sso.keyring import get_token_name, U1_APP_NAME, Keyring
32from ubuntu_sso.logger import setup_logging
33
34logger = setup_logging("ubuntu_sso.main")
35U1_PING_URL = "https://one.ubuntu.com/oauth/sso-finished-so-get-tokens/"
36
37
38class SSOLoginProcessor(Account):
39 """Login and register users using the Ubuntu Single Sign On service.
40
41 Alias classname to maintain backwards compatibility. DO NOT USE, use
42 ubuntu_sso.account.Account instead.
43 """
44
45 def __init__(self, sso_service_class=None):
46 """Create a new SSO Account manager."""
47 msg = 'Use ubuntu_sso.account.Account instead.'
48 warnings.warn(msg, DeprecationWarning)
49 super(SSOLoginProcessor, self).__init__(sso_service_class)
50
51
52def except_to_errdict(e):
53 """Turn an exception into a dictionary to return thru DBus."""
54 result = {
55 "errtype": e.__class__.__name__,
56 }
57 if len(e.args) == 0:
58 result["message"] = e.__class__.__doc__
59 elif isinstance(e.args[0], dict):
60 result.update(e.args[0])
61 elif isinstance(e.args[0], basestring):
62 result["message"] = e.args[0]
63
64 return result
65
66
67class SSOLoginRoot(object):
68 """Login thru the Single Sign On service."""
69
70 def __init__(self, sso_login_processor_class=Account,
71 sso_service_class=None):
72 """Initiate the Login object."""
73 self.sso_login_processor_class = sso_login_processor_class
74 self.processor = self.sso_login_processor_class(
75 sso_service_class=sso_service_class)
76
77 def generate_captcha(self, app_name, filename, thread_execute, result_cb,
78 error_cb):
79 """Call the matching method in the processor."""
80 def f():
81 """Inner function that will be run in a thread."""
82 return self.processor.generate_captcha(filename)
83 thread_execute(f, app_name, result_cb, error_cb)
84
85 def register_user(self, app_name, email, password, name, captcha_id,
86 captcha_solution, thread_execute, result_cb, error_cb):
87 """Call the matching method in the processor."""
88 def f():
89 """Inner function that will be run in a thread."""
90 return self.processor.register_user(email, password, name,
91 captcha_id, captcha_solution)
92 thread_execute(f, app_name, result_cb, error_cb)
93
94 def login(self, app_name, email, password, thread_execute, result_cb,
95 error_cb, not_validated_cb):
96 """Call the matching method in the processor."""
97 def f():
98 """Inner function that will be run in a thread."""
99 token_name = get_token_name(app_name)
100 logger.debug('login: token_name %r, email %r, password <hidden>.',
101 token_name, email)
102 credentials = self.processor.login(email, password, token_name)
103 logger.debug('login returned not None credentials? %r.',
104 credentials is not None)
105 return credentials
106
107 def success_cb(app_name, credentials):
108 """Login finished successfully."""
109 is_validated = self.processor.is_validated(credentials)
110 logger.debug('user is validated? %r.', is_validated)
111 if is_validated:
112 # pylint: disable=E1101
113 d = Keyring().set_credentials(app_name, credentials)
114 d.addCallback(lambda _: result_cb(app_name, email))
115 d.addErrback(lambda failure: \
116 error_cb(app_name,
117 except_to_errdict(failure.value)))
118 else:
119 not_validated_cb(app_name, email)
120 thread_execute(f, app_name, success_cb, error_cb)
121
122 def validate_email(self, app_name, email, password, email_token,
123 thread_execute, result_cb, error_cb):
124 """Call the matching method in the processor."""
125
126 def f():
127 """Inner function that will be run in a thread."""
128 token_name = get_token_name(app_name)
129 credentials = self.processor.validate_email(email, password,
130 email_token, token_name)
131 return credentials
132
133 def success_cb(app_name, credentials):
134 """Validation finished successfully."""
135 # pylint: disable=E1101
136 d = Keyring().set_credentials(app_name, credentials)
137 d.addCallback(lambda _: result_cb(app_name, email))
138 failure_cb = lambda f: error_cb(app_name, except_to_errdict(f.value))
139 d.addErrback(failure_cb)
140
141 thread_execute(f, app_name, success_cb, error_cb)
142
143 def request_password_reset_token(self, app_name, email, thread_execute,
144 result_cb, error_cb):
145 """Call the matching method in the processor."""
146 def f():
147 """Inner function that will be run in a thread."""
148 return self.processor.request_password_reset_token(email)
149 thread_execute(f, app_name, result_cb, error_cb)
150
151 def set_new_password(self, app_name, email, token, new_password,
152 thread_execute, result_cb, error_cb):
153 """Call the matching method in the processor."""
154 def f():
155 """Inner function that will be run in a thread."""
156 return self.processor.set_new_password(email, token,
157 new_password)
158 thread_execute(f, app_name, result_cb, error_cb)
159
160
161class SSOCredentialsRoot(object):
162 """Object that gets credentials, and login/registers if needed."""
163
164 def __init__(self):
165 self.ping_url = os.environ.get('USSOC_PING_URL', U1_PING_URL)
166
167 def find_credentials(self, app_name, callback=NO_OP, errback=NO_OP):
168 """Get the credentials from the keyring or {} if not there."""
169
170 def log_result(result):
171 """Log the result and continue."""
172 logger.info('find_credentials: app_name "%s", result is {}? %s',
173 app_name, result == {})
174 return result
175
176 d = Credentials(app_name=app_name).find_credentials()
177 # pylint: disable=E1101
178 d.addCallback(log_result)
179 d.addCallbacks(callback, errback)
180
181 def login_or_register_to_get_credentials(self, app_name,
182 terms_and_conditions_url,
183 help_text, window_id,
184 success_cb, error_cb, denial_cb):
185 """Get credentials if found else prompt GUI to login or register.
186
187 'app_name' will be displayed in the GUI.
188 'terms_and_conditions_url' will be the URL pointing to T&C.
189 'help_text' is an explanatory text for the end-users, will be shown
190 below the headers.
191 'window_id' is the id of the window which will be set as a parent of
192 the GUI. If 0, no parent will be set.
193
194 """
195 ping_url = self.ping_url if app_name == U1_APP_NAME else None
196 obj = Credentials(app_name=app_name, ping_url=ping_url,
197 tc_url=terms_and_conditions_url,
198 help_text=help_text, window_id=window_id,
199 success_cb=success_cb, error_cb=error_cb,
200 denial_cb=denial_cb)
201 obj.register()
202
203 def login_to_get_credentials(self, app_name, help_text, window_id,
204 success_cb, error_cb, denial_cb):
205 """Get credentials if found else prompt GUI just to login.
206
207 'app_name' will be displayed in the GUI.
208 'help_text' is an explanatory text for the end-users, will be shown
209 before the login fields.
210 'window_id' is the id of the window which will be set as a parent of
211 the GUI. If 0, no parent will be set.
212
213 """
214 ping_url = self.ping_url if app_name == U1_APP_NAME else None
215 obj = Credentials(app_name=app_name, ping_url=ping_url, tc_url=None,
216 help_text=help_text, window_id=window_id,
217 success_cb=success_cb, error_cb=error_cb,
218 denial_cb=denial_cb)
219 obj.login()
220
221 def clear_token(self, app_name, callback=NO_OP, errback=NO_OP):
222 """Clear the token for an application from the keyring.
223
224 'app_name' is the name of the application.
225 """
226 d = Credentials(app_name=app_name).clear_credentials()
227 # pylint: disable=E1101
228 d.addCallbacks(lambda _: callback(), errback)
229
230
231class CredentialsManagementRoot(object):
232 """Object that manages credentials.
233
234 Every exposed method in this class requires one mandatory argument:
235
236 - 'app_name': the name of the application. Will be displayed in the
237 GUI header, plus it will be used to find/build/clear tokens.
238
239 And accepts another parameter named 'args', which is a dictionary that
240 can contain the following:
241
242 - 'help_text': an explanatory text for the end-users, will be
243 shown below the header. This is an optional free text field.
244
245 - 'ping_url': the url to open after successful token retrieval. If
246 defined, the email will be attached to the url and will be pinged
247 with an OAuth-signed request.
248
249 - 'tc_url': the link to the Terms and Conditions page. If defined,
250 the checkbox to agree to the terms will link to it.
251
252 - 'window_id': the id of the window which will be set as a parent
253 of the GUI. If not defined, no parent will be set.
254
255 """
256
257 def __init__(self, found_cb, error_cb, denied_cb, *args, **kwargs):
258 """Create a new instance.
259
260 - 'found_cb' is a callback that will be executed when the credentials
261 were found.
262
263 - 'error_cb' is a callback that will be executed when there was an
264 error getting the credentials.
265
266 - 'denied_cb' is a callback that will be executed when the user denied
267 the use of the credentials.
268
269 """
270 super(CredentialsManagementRoot, self).__init__(*args, **kwargs)
271 self.found_cb = found_cb
272 self.error_cb = error_cb
273 self.denied_cb = denied_cb
274
275 valid_keys = (HELP_TEXT_KEY, PING_URL_KEY, TC_URL_KEY,
276 UI_CLASS_KEY, UI_MODULE_KEY, WINDOW_ID_KEY)
277
278 def _parse_args(self, args):
279 """Retrieve values from the generic param 'args'."""
280 result = dict(i for i in args.iteritems() if i[0] in self.valid_keys)
281 result[WINDOW_ID_KEY] = int(args.get(WINDOW_ID_KEY, 0))
282 result[SUCCESS_CB_KEY] = self.found_cb
283 result[ERROR_CB_KEY] = self.error_cb
284 result[DENIAL_CB_KEY] = self.denied_cb
285 return result
286
287 def find_credentials(self, app_name, args, success_cb, error_cb):
288 """Look for the credentials for an application.
289
290 - 'app_name': the name of the application whose credentials are
291 going to be looked up.
292
293 - 'args' is a dictionary, currently not used.
294
295 - 'success_cb' is a callback that will be executed if the operation was
296 a success.
297
298 - 'error_cb' is a callback that will be executed if the operation had
299 an error.
300
301 """
302
303 obj = Credentials(app_name)
304 d = obj.find_credentials()
305 # pylint: disable=E1101
306 d.addCallback(success_cb)
307 d.addErrback(error_cb, app_name)
308
309 def clear_credentials(self, app_name, args, success_cb, error_cb):
310 """Clear the credentials for an application.
311
312 - 'app_name': the name of the application which credentials are
313 going to be removed.
314
315 - 'args' is a dictionary, currently not used.
316
317 - 'success_cb' is a callback that will be executed if the operation was
318 a success.
319
320 - 'error_cb' is a callback that will be executed if the operation had
321 an error.
322
323 """
324
325 obj = Credentials(app_name)
326 d = obj.clear_credentials()
327 # pylint: disable=E1101
328 d.addCallback(success_cb)
329 d.addErrback(error_cb, app_name)
330
331 def store_credentials(self, app_name, args, success_cb, error_cb):
332 """Store the token for an application.
333
334 - 'app_name': the name of the application which credentials are
335 going to be stored.
336
337 - 'args' is the dictionary holding the credentials. Needs to provide
338 the following mandatory keys: 'token', 'token_key', 'consumer_key',
339 'consumer_secret'.
340
341 - 'success_cb' is a callback that will be executed if the operation was
342 a success.
343
344 - 'error_cb' is a callback that will be executed if the operation had
345 an error.
346 """
347
348 obj = Credentials(app_name)
349 d = obj.store_credentials(args)
350 # pylint: disable=E1101
351 d.addCallback(success_cb)
352 d.addErrback(error_cb, app_name)
353
354 def register(self, app_name, args):
355 """Get credentials if found else prompt GUI to register."""
356 obj = Credentials(app_name, **self._parse_args(args))
357 obj.register()
358
359 def login(self, app_name, args):
360 """Get credentials if found else prompt GUI to login."""
361 obj = Credentials(app_name, **self._parse_args(args))
362 obj.login()
363
364# pylint: disable=C0103
365SSOLogin = None
366SSOCredentials = None
367CredentialsManagement = None
368
369if sys.platform == 'win32':
370 pass
371else:
372 from ubuntu_sso.main import linux
373 SSOLogin = linux.SSOLogin
374 SSOCredentials = linux.SSOCredentials
375 CredentialsManagement = linux.CredentialsManagement
0376
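Note on ubuntu_sso/main/__init__.py above: the *Root classes take the executor ('thread_execute') and the result and error callbacks as parameters, so their logic can be driven without D-Bus or threads. The sketch below illustrates that idea under stated assumptions. except_to_errdict is re-rendered for Python 3 (str replaces basestring), while run_inline, EmptyError and the sample callables are hypothetical names that do not appear in the branch.

```python
def except_to_errdict(e):
    """Python 3 rendering of the helper above (str replaces basestring)."""
    result = {"errtype": e.__class__.__name__}
    if len(e.args) == 0:
        # No arguments: fall back to the docstring of the exception class.
        result["message"] = e.__class__.__doc__
    elif isinstance(e.args[0], dict):
        # A dict argument is merged into the result as-is.
        result.update(e.args[0])
    elif isinstance(e.args[0], str):
        result["message"] = e.args[0]
    return result


def run_inline(f, app_name, result_cb, error_cb):
    """Hypothetical synchronous stand-in for a 'thread_execute' callable."""
    try:
        result_cb(app_name, f())
    except Exception as e:
        error_cb(app_name, except_to_errdict(e))


class EmptyError(Exception):
    """Nothing to report."""


def failing_call():
    """Hypothetical processor call that always fails."""
    raise ValueError("bad captcha")


results, errors = [], []


def on_result(app_name, value):
    results.append((app_name, value))


def on_error(app_name, errdict):
    errors.append((app_name, errdict))


run_inline(lambda: "captcha-id", "My App", on_result, on_error)
run_inline(failing_call, "My App", on_result, on_error)
```

A test double such as run_inline can be passed wherever SSOLoginRoot expects 'thread_execute', which keeps the platform-specific threading and signal emission in the linux module.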
=== added file 'ubuntu_sso/main/linux.py'
--- ubuntu_sso/main/linux.py 1970-01-01 00:00:00 +0000
+++ ubuntu_sso/main/linux.py 2011-03-23 14:22:30 +0000
@@ -0,0 +1,486 @@
1# -*- coding: utf-8 -*-
2#
3# ubuntu_sso.main - main login handling interface
4#
5# Author: Natalia Bidart <natalia.bidart@canonical.com>
6# Author: Alejandro J. Cura <alecu@canonical.com>
7#
8# Copyright 2009 Canonical Ltd.
9#
10# This program is free software: you can redistribute it and/or modify it
11# under the terms of the GNU General Public License version 3, as published
12# by the Free Software Foundation.
13#
14# This program is distributed in the hope that it will be useful, but
15# WITHOUT ANY WARRANTY; without even the implied warranties of
16# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
17# PURPOSE. See the GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License along
20# with this program. If not, see <http://www.gnu.org/licenses/>.
21"""Single Sign On login handler.
22
23A utility which accepts requests for Ubuntu Single Sign On login over D-Bus.
24
25The OAuth process is handled, including adding the OAuth access token to the
26local keyring.
27
28"""
29
30import threading
31
32import dbus.service
33
34from ubuntu_sso import (DBUS_ACCOUNT_PATH, DBUS_IFACE_USER_NAME,
35 DBUS_IFACE_CRED_NAME, DBUS_CREDENTIALS_IFACE, NO_OP)
36from ubuntu_sso.account import Account
37from ubuntu_sso.credentials import ERROR_KEY, ERROR_DETAIL_KEY
38from ubuntu_sso.logger import setup_logging
39from ubuntu_sso.main import (CredentialsManagementRoot, SSOLoginRoot,
40 SSOCredentialsRoot, except_to_errdict)
41
42
43# Disable the invalid name warning, as we have a lot of DBus style names
44# pylint: disable=C0103
45
46
47logger = setup_logging("ubuntu_sso.main")
48TIMEOUT_INTERVAL = 10000 # 10 seconds
49
50
51def blocking(f, app_name, result_cb, error_cb):
52 """Run f in a thread; return or throw an exception thru the callbacks."""
53 def _in_thread():
54 """The part that runs inside the thread."""
55 try:
56 result_cb(app_name, f())
57 except Exception, e: # pylint: disable=W0703
58 msg = "Exception while running DBus blocking code in a thread:"
59 logger.exception(msg)
60 error_cb(app_name, except_to_errdict(e))
61 threading.Thread(target=_in_thread).start()
62
63
64class SSOLogin(dbus.service.Object):
65 """Login thru the Single Sign On service."""
66
67 # Operator not preceded by a space (fails with dbus decorators)
68 # pylint: disable=C0322
69
70 def __init__(self, bus_name, object_path=DBUS_ACCOUNT_PATH,
71 sso_login_processor_class=Account,
72 sso_service_class=None):
73 """Initiate the Login object."""
74 dbus.service.Object.__init__(self, object_path=object_path,
75 bus_name=bus_name)
76 self.root = SSOLoginRoot(sso_login_processor_class, sso_service_class)
77
78 # generate_captcha signals
79 @dbus.service.signal(DBUS_IFACE_USER_NAME, signature="ss")
80 def CaptchaGenerated(self, app_name, result):
81 """Signal thrown after the captcha is generated."""
82 logger.debug('SSOLogin: emitting CaptchaGenerated with app_name "%s" '
83 'and result %r', app_name, result)
84
85 @dbus.service.signal(DBUS_IFACE_USER_NAME, signature="sa{ss}")
86 def CaptchaGenerationError(self, app_name, error):
87 """Signal thrown when there's a problem generating the captcha."""
88 logger.debug('SSOLogin: emitting CaptchaGenerationError with '
89 'app_name "%s" and error %r', app_name, error)
90
91 @dbus.service.method(dbus_interface=DBUS_IFACE_USER_NAME,
92 in_signature='ss')
93 def generate_captcha(self, app_name, filename):
94 """Call the matching method in the processor."""
95 self.root.generate_captcha(app_name, filename, blocking,
96 self.CaptchaGenerated,
97 self.CaptchaGenerationError)
98
99 # register_user signals
100 @dbus.service.signal(DBUS_IFACE_USER_NAME, signature="ss")
101 def UserRegistered(self, app_name, result):
102 """Signal thrown when the user is registered."""
103 logger.debug('SSOLogin: emitting UserRegistered with app_name "%s" '
104 'and result %r', app_name, result)
105
106 @dbus.service.signal(DBUS_IFACE_USER_NAME, signature="sa{ss}")
107 def UserRegistrationError(self, app_name, error):
108 """Signal thrown when there's a problem registering the user."""
109 logger.debug('SSOLogin: emitting UserRegistrationError with '
110 'app_name "%s" and error %r', app_name, error)
111
112 @dbus.service.method(dbus_interface=DBUS_IFACE_USER_NAME,
113 in_signature='ssssss')
114 def register_user(self, app_name, email, password, name,
115 captcha_id, captcha_solution):
116 """Call the matching method in the processor."""
117 self.root.register_user(app_name, email, password, name, captcha_id,
118 captcha_solution, blocking,
119 self.UserRegistered,
120 self.UserRegistrationError)
121
122 # login signals
123 @dbus.service.signal(DBUS_IFACE_USER_NAME, signature="ss")
124 def LoggedIn(self, app_name, result):
125 """Signal thrown when the user is logged in."""
126 logger.debug('SSOLogin: emitting LoggedIn with app_name "%s" '
127 'and result %r', app_name, result)
128
129 @dbus.service.signal(DBUS_IFACE_USER_NAME, signature="sa{ss}")
130 def LoginError(self, app_name, error):
131 """Signal thrown when there is a problem in the login."""
132 logger.debug('SSOLogin: emitting LoginError with '
133 'app_name "%s" and error %r', app_name, error)
134
135 @dbus.service.signal(DBUS_IFACE_USER_NAME, signature="ss")
136 def UserNotValidated(self, app_name, result):
137 """Signal thrown when the user is not validated."""
138 logger.debug('SSOLogin: emitting UserNotValidated with app_name "%s" '
139 'and result %r', app_name, result)
140
141 @dbus.service.method(dbus_interface=DBUS_IFACE_USER_NAME,
142 in_signature='sss')
143 def login(self, app_name, email, password):
144 """Call the matching method in the processor."""
145 self.root.login(app_name, email, password, blocking, self.LoggedIn,
146 self.LoginError, self.UserNotValidated)
147
148 # validate_email signals
149 @dbus.service.signal(DBUS_IFACE_USER_NAME, signature="ss")
150 def EmailValidated(self, app_name, result):
151 """Signal thrown after the email is validated."""
152 logger.debug('SSOLogin: emitting EmailValidated with app_name "%s" '
153 'and result %r', app_name, result)
154
155 @dbus.service.signal(DBUS_IFACE_USER_NAME, signature="sa{ss}")
156 def EmailValidationError(self, app_name, error):
157 """Signal thrown when there's a problem validating the email."""
158 logger.debug('SSOLogin: emitting EmailValidationError with '
159 'app_name "%s" and error %r', app_name, error)
160
161 @dbus.service.method(dbus_interface=DBUS_IFACE_USER_NAME,
162 in_signature='ssss')
163 def validate_email(self, app_name, email, password, email_token):
164 """Call the matching method in the processor."""
165 self.root.validate_email(app_name, email, password, email_token,
166 blocking, self.EmailValidated,
167 self.EmailValidationError)
168
169 # request_password_reset_token signals
170 @dbus.service.signal(DBUS_IFACE_USER_NAME, signature="ss")
171 def PasswordResetTokenSent(self, app_name, result):
172 """Signal thrown when the token is successfully sent."""
173 logger.debug('SSOLogin: emitting PasswordResetTokenSent with app_name '
174 '"%s" and result %r', app_name, result)
175
176 @dbus.service.signal(DBUS_IFACE_USER_NAME, signature="sa{ss}")
177 def PasswordResetError(self, app_name, error):
178 """Signal thrown when there's a problem sending the token."""
179 logger.debug('SSOLogin: emitting PasswordResetError with '
180 'app_name "%s" and error %r', app_name, error)
181
182 @dbus.service.method(dbus_interface=DBUS_IFACE_USER_NAME,
183 in_signature='ss')
184 def request_password_reset_token(self, app_name, email):
185 """Call the matching method in the processor."""
186 self.root.request_password_reset_token(app_name, email, blocking,
187 self.PasswordResetTokenSent,
188 self.PasswordResetError)
189
190 # set_new_password signals
191 @dbus.service.signal(DBUS_IFACE_USER_NAME, signature="ss")
192 def PasswordChanged(self, app_name, result):
193 """Signal thrown when the password is successfully changed."""
194 logger.debug('SSOLogin: emitting PasswordChanged with app_name "%s" '
195 'and result %r', app_name, result)
196
197 @dbus.service.signal(DBUS_IFACE_USER_NAME, signature="sa{ss}")
198 def PasswordChangeError(self, app_name, error):
199 """Signal thrown when there's a problem changing the password."""
200 logger.debug('SSOLogin: emitting PasswordChangeError with '
201 'app_name "%s" and error %r', app_name, error)
202
203 @dbus.service.method(dbus_interface=DBUS_IFACE_USER_NAME,
204 in_signature='ssss')
205 def set_new_password(self, app_name, email, token, new_password):
206 """Call the matching method in the processor."""
207 self.root.set_new_password(app_name, email, token, new_password,
208 blocking, self.PasswordChanged,
209 self.PasswordChangeError)
210
211
212class SSOCredentials(dbus.service.Object):
213 """DBus object that gets credentials, and login/registers if needed."""
214
215 # Operator not preceded by a space (fails with dbus decorators)
216 # pylint: disable=C0322
217
218 def __init__(self, *args, **kwargs):
219 dbus.service.Object.__init__(self, *args, **kwargs)
220 self.root = SSOCredentialsRoot()
221
222 def _process_error(self, app_name, error_dict):
223 """Process the 'error_dict' and emit CredentialsError."""
224 msg = error_dict.get(ERROR_KEY, 'No error message given.')
225 detail = error_dict.get(ERROR_DETAIL_KEY, 'No detailed error given.')
226 self.CredentialsError(app_name, msg, detail)
227
228 @dbus.service.signal(DBUS_IFACE_CRED_NAME, signature="s")
229 def AuthorizationDenied(self, app_name):
230 """Signal thrown when the user denies the authorization."""
231 logger.info('SSOCredentials: emitting AuthorizationDenied with '
232 'app_name "%s"', app_name)
233
234 @dbus.service.signal(DBUS_IFACE_CRED_NAME, signature="sa{ss}")
235 def CredentialsFound(self, app_name, credentials):
236 """Signal thrown when the credentials are found."""
237 logger.info('SSOCredentials: emitting CredentialsFound with '
238 'app_name "%s"', app_name)
239
240 @dbus.service.signal(DBUS_IFACE_CRED_NAME, signature="sss")
241 def CredentialsError(self, app_name, error_message, detailed_error):
242 """Signal thrown when there is a problem finding the credentials."""
243 logger.error('SSOCredentials: emitting CredentialsError with app_name '
244 '"%s" and error_message %r', app_name, error_message)
245
246 @dbus.service.method(dbus_interface=DBUS_IFACE_CRED_NAME,
247 in_signature="s", out_signature="a{ss}",
248 async_callbacks=("callback", "errback"))
249 def find_credentials(self, app_name, callback=NO_OP, errback=NO_OP):
250 """Get the credentials from the keyring or {} if not there."""
251 self.root.find_credentials(app_name, callback, errback)
252
253 @dbus.service.method(dbus_interface=DBUS_IFACE_CRED_NAME,
254 in_signature="sssx", out_signature="")
255 def login_or_register_to_get_credentials(self, app_name,
256 terms_and_conditions_url,
257 help_text, window_id):
258 """Get credentials if found else prompt GUI to login or register.
259
260 'app_name' will be displayed in the GUI.
261 'terms_and_conditions_url' will be the URL pointing to T&C.
262 'help_text' is an explanatory text for the end-users, will be shown
263 below the headers.
264 'window_id' is the id of the window which will be set as a parent of
265 the GUI. If 0, no parent will be set.
266
267 """
268 self.root.login_or_register_to_get_credentials(app_name,
269 terms_and_conditions_url,
270 help_text, window_id,
271 self.CredentialsFound,
272 self._process_error,
273 self.AuthorizationDenied)
274
275 @dbus.service.method(dbus_interface=DBUS_IFACE_CRED_NAME,
276 in_signature="ssx", out_signature="")
277 def login_to_get_credentials(self, app_name, help_text, window_id):
278 """Get credentials if found else prompt GUI just to login.
279
280 'app_name' will be displayed in the GUI.
281 'help_text' is an explanatory text for the end-users, will be shown
282 before the login fields.
283 'window_id' is the id of the window which will be set as a parent of
284 the GUI. If 0, no parent will be set.
285
286 """
287 self.root.login_to_get_credentials(app_name, help_text, window_id,
288 self.CredentialsFound,
289 self._process_error,
290 self.AuthorizationDenied)
291
292 @dbus.service.method(dbus_interface=DBUS_IFACE_CRED_NAME,
293 in_signature='s', out_signature='',
294 async_callbacks=("callback", "errback"))
295 def clear_token(self, app_name, callback=NO_OP, errback=NO_OP):
296 """Clear the token for an application from the keyring.
297
298 'app_name' is the name of the application.
299 """
300 self.root.clear_token(app_name, callback, errback)
301
302
303class CredentialsManagement(dbus.service.Object):
304 """DBus object that manages credentials.
305
306 Every exposed method in this class requires one mandatory argument:
307
308 - 'app_name': the name of the application. Will be displayed in the
309 GUI header, plus it will be used to find/build/clear tokens.
310
311 And accepts another parameter named 'args', which is a dictionary that
312 can contain the following:
313
314 - 'help_text': an explanatory text for the end-users, will be
315 shown below the header. This is an optional free text field.
316
317 - 'ping_url': the url to open after successful token retrieval. If
318 defined, the email will be attached to the url and will be pinged
319 with an OAuth-signed request.
320
321 - 'tc_url': the link to the Terms and Conditions page. If defined,
322 the checkbox to agree to the terms will link to it.
323
324 - 'window_id': the id of the window which will be set as a parent
325 of the GUI. If not defined, no parent will be set.
326
327 """
328
329 def __init__(self, timeout_func, shutdown_func, *args, **kwargs):
330 super(CredentialsManagement, self).__init__(*args, **kwargs)
331 self._ref_count = 0
332 self.timeout_func = timeout_func
333 self.shutdown_func = shutdown_func
334 self.root = CredentialsManagementRoot(self.CredentialsFound,
335 self.CredentialsError,
336 self.AuthorizationDenied)
337
338 # Operator not preceded by a space (fails with dbus decorators)
339 # pylint: disable=C0322
340
341 def _process_failure(self, failure, app_name):
342 """Process the 'failure' and emit CredentialsError."""
343 self.CredentialsError(app_name, except_to_errdict(failure.value))
344
345 def _get_ref_count(self):
346 """Get value of ref_count."""
347 return self._ref_count
348
349 def _set_ref_count(self, new_value):
350 """Set a new value to ref_count."""
351 logger.debug('ref_count is %r, changing value to %r.',
352 self._ref_count, new_value)
353 if new_value < 0:
354 self._ref_count = 0
355 msg = 'Attempting to decrease ref_count to a negative value (%r).'
356 logger.warning(msg, new_value)
357 else:
358 self._ref_count = new_value
359
360 if self._ref_count == 0:
361 logger.debug('Setting up timer with %r (%r, %r).',
362 self.timeout_func, TIMEOUT_INTERVAL, self.shutdown)
363 self.timeout_func(TIMEOUT_INTERVAL, self.shutdown)
364
365 ref_count = property(fget=_get_ref_count, fset=_set_ref_count)
366
367 def shutdown(self):
368 """If no ongoing requests, call self.shutdown_func."""
369 logger.debug('shutdown!, ref_count is %r.', self._ref_count)
370 if self._ref_count == 0:
371 logger.info('Shutting down, calling %r.', self.shutdown_func)
372 self.shutdown_func()
373
374 @dbus.service.signal(DBUS_CREDENTIALS_IFACE, signature='s')
375 def AuthorizationDenied(self, app_name):
376 """Signal thrown when the user denies the authorization."""
377 self.ref_count -= 1
378 logger.info('%s: emitting AuthorizationDenied with app_name "%s".',
379 self.__class__.__name__, app_name)
380
381 @dbus.service.signal(DBUS_CREDENTIALS_IFACE, signature='sa{ss}')
382 def CredentialsFound(self, app_name, credentials):
383 """Signal thrown when the credentials are found."""
384 self.ref_count -= 1
385 logger.info('%s: emitting CredentialsFound with app_name "%s".',
386 self.__class__.__name__, app_name)
387
388 @dbus.service.signal(DBUS_CREDENTIALS_IFACE, signature='s')
389 def CredentialsNotFound(self, app_name):
390 """Signal thrown when the credentials are not found."""
391 self.ref_count -= 1
392 logger.info('%s: emitting CredentialsNotFound with app_name "%s".',
393 self.__class__.__name__, app_name)
394
395 @dbus.service.signal(DBUS_CREDENTIALS_IFACE, signature='s')
396 def CredentialsCleared(self, app_name):
397 """Signal thrown when the credentials were cleared."""
398 self.ref_count -= 1
399 logger.info('%s: emitting CredentialsCleared with app_name "%s".',
400 self.__class__.__name__, app_name)
401
402 @dbus.service.signal(DBUS_CREDENTIALS_IFACE, signature='s')
403 def CredentialsStored(self, app_name):
404 """Signal thrown when the credentials were stored."""
405 self.ref_count -= 1
406 logger.info('%s: emitting CredentialsStored with app_name "%s".',
407 self.__class__.__name__, app_name)
408
409 @dbus.service.signal(DBUS_CREDENTIALS_IFACE, signature='sa{ss}')
410 def CredentialsError(self, app_name, error_dict):
411 """Signal thrown when there is a problem getting the credentials."""
412 self.ref_count -= 1
413 logger.error('%s: emitting CredentialsError with app_name "%s" and '
414 'error_dict %r.', self.__class__.__name__, app_name,
415 error_dict)
416
417 @dbus.service.method(dbus_interface=DBUS_CREDENTIALS_IFACE,
418 in_signature='sa{ss}', out_signature='')
419 def find_credentials(self, app_name, args):
420 """Look for the credentials for an application.
421
422 - 'app_name': the name of the application whose credentials are
423 going to be looked up.
424
425 - 'args' is a dictionary, currently not used.
426
427 """
428 self.ref_count += 1
429
430 def success_cb(credentials):
431 """Find credentials and notify using signals."""
432 if credentials is not None and len(credentials) > 0:
433 self.CredentialsFound(app_name, credentials)
434 else:
435 self.CredentialsNotFound(app_name)
436
437 self.root.find_credentials(app_name, args, success_cb,
438 self._process_failure)
439
440 @dbus.service.method(dbus_interface=DBUS_CREDENTIALS_IFACE,
441 in_signature='sa{ss}', out_signature='')
442 def clear_credentials(self, app_name, args):
443 """Clear the credentials for an application.
444
445 - 'app_name': the name of the application whose credentials are
446 going to be removed.
447
448 - 'args' is a dictionary, currently not used.
449
450 """
451 self.ref_count += 1
452 self.root.clear_credentials(app_name, args,
453 lambda _: self.CredentialsCleared(app_name),
454 self._process_failure)
455
456 @dbus.service.method(dbus_interface=DBUS_CREDENTIALS_IFACE,
457 in_signature='sa{ss}', out_signature='')
458 def store_credentials(self, app_name, args):
459 """Store the token for an application.
460
461 - 'app_name': the name of the application whose credentials are
462 going to be stored.
463
464 - 'args' is the dictionary holding the credentials. Needs to provide
465 the following mandatory keys: 'token', 'token_key', 'consumer_key',
466 'consumer_secret'.
467
468 """
469 self.ref_count += 1
470 self.root.store_credentials(app_name, args,
471 lambda _: self.CredentialsStored(app_name),
472 self._process_failure)
473
474 @dbus.service.method(dbus_interface=DBUS_CREDENTIALS_IFACE,
475 in_signature='sa{ss}', out_signature='')
476 def register(self, app_name, args):
477 """Get credentials if found else prompt GUI to register."""
478 self.ref_count += 1
479 self.root.register(app_name, args)
480
481 @dbus.service.method(dbus_interface=DBUS_CREDENTIALS_IFACE,
482 in_signature='sa{ss}', out_signature='')
483 def login(self, app_name, args):
484 """Get credentials if found else prompt GUI to login."""
485 self.ref_count += 1
486 self.root.login(app_name, args)
0487
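Note on the ref_count handling in CredentialsManagement above: the setter clamps negative values to zero, and every time the count lands on zero it asks timeout_func to schedule self.shutdown, which in turn only calls shutdown_func if nothing is pending by then. A minimal sketch of that behaviour without DBus follows. The RefCounter class name is hypothetical, and the TIMEOUT_INTERVAL value is an assumption for illustration (the real constant lives in ubuntu_sso.main.linux).

```python
import logging

logger = logging.getLogger(__name__)

# Assumed value for illustration only; not taken from the branch.
TIMEOUT_INTERVAL = 500


class RefCounter(object):
    """Hypothetical, DBus-free stand-in for the ref_count logic."""

    def __init__(self, timeout_func, shutdown_func):
        self._ref_count = 0
        self.timeout_func = timeout_func
        self.shutdown_func = shutdown_func

    def _get_ref_count(self):
        """Get value of ref_count."""
        return self._ref_count

    def _set_ref_count(self, new_value):
        """Set ref_count, clamping at zero and arming the timer at zero."""
        if new_value < 0:
            self._ref_count = 0
            logger.warning('Attempting to decrease ref_count to a '
                           'negative value (%r).', new_value)
        else:
            self._ref_count = new_value

        if self._ref_count == 0:
            # No pending requests: schedule a (possibly no-op) shutdown.
            self.timeout_func(TIMEOUT_INTERVAL, self.shutdown)

    ref_count = property(fget=_get_ref_count, fset=_set_ref_count)

    def shutdown(self):
        """Call shutdown_func only if no request arrived in the meantime."""
        if self._ref_count == 0:
            self.shutdown_func()
```

Because each exposed method increments the count and each signal decrements it, a request that arrives between the timer being armed and the timer firing leaves the count above zero, so the scheduled shutdown does nothing.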
=== added directory 'ubuntu_sso/main/tests'
=== added file 'ubuntu_sso/main/tests/__init__.py'
--- ubuntu_sso/main/tests/__init__.py 1970-01-01 00:00:00 +0000
+++ ubuntu_sso/main/tests/__init__.py 2011-03-23 14:22:30 +0000
@@ -0,0 +1,17 @@
1# -*- coding: utf-8 -*-
2# Author: Manuel de la Pena <manuel@canonical.com>
3#
4# Copyright 2011 Canonical Ltd.
5#
6# This program is free software: you can redistribute it and/or modify it
7# under the terms of the GNU General Public License version 3, as published
8# by the Free Software Foundation.
9#
10# This program is distributed in the hope that it will be useful, but
11# WITHOUT ANY WARRANTY; without even the implied warranties of
12# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
13# PURPOSE. See the GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License along
16# with this program. If not, see <http://www.gnu.org/licenses/>.
17"""Test the different main implementations."""
018
=== added file 'ubuntu_sso/main/tests/test_common.py'
--- ubuntu_sso/main/tests/test_common.py 1970-01-01 00:00:00 +0000
+++ ubuntu_sso/main/tests/test_common.py 2011-03-23 14:22:30 +0000
@@ -0,0 +1,245 @@
1# -*- coding: utf-8 -*-
2#
3# test_main - tests for ubuntu_sso.main
4#
5# Author: Natalia Bidart <natalia.bidart@canonical.com>
6# Author: Alejandro J. Cura <alecu@canonical.com>
7# Author: Manuel de la Pena <manuel@canonical.com>
8#
9# Copyright 2009-2010 Canonical Ltd.
10#
11# This program is free software: you can redistribute it and/or modify it
12# under the terms of the GNU General Public License version 3, as published
13# by the Free Software Foundation.
14#
15# This program is distributed in the hope that it will be useful, but
16# WITHOUT ANY WARRANTY; without even the implied warranties of
17# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
18# PURPOSE. See the GNU General Public License for more details.
19#
20# You should have received a copy of the GNU General Public License along
21# with this program. If not, see <http://www.gnu.org/licenses/>.
22 """Tests shared by different platforms."""
23
24import os
25
26from unittest import TestCase
27from mocker import MockerTestCase, MATCH
28from ubuntu_sso.main import (
29 CredentialsManagement,
30 SSOCredentialsRoot,
31 SSOCredentials,
32 SSOLogin,
33 U1_PING_URL)
34
35
36class EnvironOverridesTestCase(TestCase):
37 """Some URLs can be set from the environment for testing/QA purposes."""
38
39 def test_override_ping_url(self):
40 """The ping url can be set from the environ via USSOC_PING_URL."""
41 fake_url = 'this is not really a URL'
42 old_url = os.environ.get('USSOC_PING_URL')
43 os.environ['USSOC_PING_URL'] = fake_url
44 try:
45 creds = SSOCredentialsRoot()
46 self.assertEqual(creds.ping_url, fake_url)
47 finally:
48 if old_url:
49 os.environ['USSOC_PING_URL'] = old_url
50 else:
51 del os.environ['USSOC_PING_URL']
52
53 def test_no_override_ping_url(self):
54 """If the environ is unset, the default ping url is used."""
55 creds = SSOCredentialsRoot()
56 self.assertEqual(creds.ping_url, U1_PING_URL)
57
58
59class SSOLoginMockedTestCase(MockerTestCase):
60 """Test that the calls are relayed correctly."""
61
62 def setUp(self):
63 """Setup tests."""
64 super(SSOLoginMockedTestCase, self).setUp()
65 self.root = self.mocker.mock()
66 mockbusname = self.mocker.mock()
67 mockbus = self.mocker.mock()
68 mockbusname.get_bus()
69 self.mocker.result(mockbus)
70 self.login = SSOLogin(mockbus)
71 self.login.root = self.root
72 self.mocker.reset()
73
74 def test_generate_captcha(self):
75 """Test that the call is relayed."""
76 app_name = 'app'
77 filename = 'file'
78 self.root.generate_captcha(app_name, filename, MATCH(callable),
79 MATCH(callable), MATCH(callable))
80 self.mocker.replay()
81 self.login.generate_captcha(app_name, filename)
82
83 def test_register_user(self):
84 """Test that the call is relayed."""
85 app_name = 'app'
86 email = 'email'
87 password = 'pwd'
88 name = 'display name'
89 captcha_id = 'id'
90 captcha_solution = 'hello'
91 self.root.register_user(app_name, email, password, name, captcha_id,
92 captcha_solution, MATCH(callable),
93 MATCH(callable), MATCH(callable))
94 self.mocker.replay()
95 self.login.register_user(app_name, email, password, name, captcha_id,
96 captcha_solution)
97
98 def test_login(self):
99 """Test that the call is relayed."""
100 app_name = 'app'
101 email = 'email'
102 password = 'password'
103 self.root.login(app_name, email, password, MATCH(callable),
104 MATCH(callable), MATCH(callable),
105 MATCH(callable))
106 self.mocker.mock()
107 self.mocker.replay()
108 self.login.login(app_name, email, password)
109
110 def test_validate_email(self):
111 """Test that the call is relayed."""
112 app_name = 'app'
113 email = 'email'
114 password = 'passwrd'
115 email_token = 'token'
116 self.root.validate_email(app_name, email, password, email_token,
117 MATCH(callable), MATCH(callable),
118 MATCH(callable))
119 self.mocker.replay()
120 self.login.validate_email(app_name, email, password, email_token)
121
122 def test_request_password_reset_token(self):
123 """Test that the call is relayed."""
124 app_name = 'app'
125 email = 'email'
126 self.root.request_password_reset_token(app_name, email,
127 MATCH(callable),
128 MATCH(callable),
129 MATCH(callable))
130 self.mocker.replay()
131 self.login.request_password_reset_token(app_name, email)
132
133 def test_set_new_password(self):
134 """Test that the call is relayed."""
135 app_name = 'app'
136 email = 'email'
137 token = 'token'
138 new_password = 'new'
139 self.root.set_new_password(app_name, email, token, new_password,
140 MATCH(callable), MATCH(callable),
141 MATCH(callable))
142 self.mocker.replay()
143 self.login.set_new_password(app_name, email, token, new_password)
144
145
146class SSOCredentialsMockedTestCase(MockerTestCase):
147 """Test that the calls are relayed correctly."""
148
149 def setUp(self):
150 """Setup tests."""
151 super(SSOCredentialsMockedTestCase, self).setUp()
152 self.root = self.mocker.mock()
153 mockbusname = self.mocker.mock()
154 mockbus = self.mocker.mock()
155 mockbusname.get_bus()
156 self.mocker.result(mockbus)
157 self.cred = SSOCredentials(mockbus)
158 self.cred.root = self.root
159 self.mocker.reset()
160
161 def test_find_credentials(self):
162 """Test that the call is relayed."""
163 app_name = 'app'
164 result_cb = error_cb = lambda: None
165 self.root.find_credentials(app_name, result_cb, error_cb)
166 self.mocker.mock()
167 self.mocker.replay()
168 self.cred.find_credentials(app_name, result_cb, error_cb)
169
170 def test_login_or_register_to_get_credentials(self):
171 """Test that the call is relayed."""
172 app_name = 'app'
173 terms = 'terms'
174 help_text = 'help'
175 window_id = 'id'
176 self.root.login_or_register_to_get_credentials(app_name, terms,
177 help_text, window_id,
178 MATCH(callable),
179 MATCH(callable),
180 MATCH(callable))
181 self.mocker.replay()
182 self.cred.login_or_register_to_get_credentials(app_name, terms,
183 help_text, window_id)
184
185 def test_clear_token(self):
186 """Test that the call is relayed."""
187 app_name = 'app'
188 result_cb = error_cb = lambda: None
189 self.root.clear_token(app_name, result_cb, error_cb)
190 self.mocker.replay()
191 self.cred.clear_token(app_name, result_cb, error_cb)
192
193
194class CredentialsManagementMockedTestCase(MockerTestCase):
195 """Test that the calls are relayed correctly."""
196
197 def setUp(self):
198 """Setup tests."""
199 super(CredentialsManagementMockedTestCase, self).setUp()
200 self.root = self.mocker.mock()
201 self.cred = CredentialsManagement(None, None)
202 self.cred.root = self.root
203
204 def test_find_credentials(self):
205 """Test that the call is relayed."""
206 app_name = 'app'
207 args = 'args'
208 self.root.find_credentials(app_name, args, MATCH(callable),
209 MATCH(callable))
210 self.mocker.replay()
211 self.cred.find_credentials(app_name, args)
212
213 def test_clear_credentials(self):
214 """Test that the call is relayed."""
215 app_name = 'app'
216 args = 'args'
217 self.root.clear_credentials(app_name, args, MATCH(callable),
218 MATCH(callable))
219 self.mocker.replay()
220 self.cred.clear_credentials(app_name, args)
221
222 def test_store_credentials(self):
223 """Test that the call is relayed."""
224 app_name = 'app'
225 args = 'args'
226 self.root.store_credentials(app_name, args, MATCH(callable),
227 MATCH(callable))
228 self.mocker.replay()
229 self.cred.store_credentials(app_name, args)
230
231 def test_register(self):
232 """Test that the call is relayed."""
233 app_name = 'app'
234 args = 'args'
235 self.root.register(app_name, args)
236 self.mocker.replay()
237 self.cred.register(app_name, args)
238
239 def test_login(self):
240 """Test that the call is relayed."""
241 app_name = 'app'
242 args = 'args'
243 self.root.login(app_name, args)
244 self.mocker.replay()
245 self.cred.login(app_name, args)
0246
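Note on EnvironOverridesTestCase above: test_override_ping_url saves and restores USSOC_PING_URL by hand, and its "if old_url:" check treats a previously empty value the same as an unset one. One possible way to factor that out is a small context manager, sketched below. The override_environ helper is hypothetical and not part of this branch.

```python
import os
from contextlib import contextmanager


@contextmanager
def override_environ(name, value):
    """Temporarily set os.environ[name], restoring the prior state on exit."""
    missing = object()  # sentinel: distinguishes "unset" from "empty string"
    old_value = os.environ.get(name, missing)
    os.environ[name] = value
    try:
        yield
    finally:
        if old_value is missing:
            del os.environ[name]
        else:
            os.environ[name] = old_value
```

With such a helper the test body would reduce to creating SSOCredentialsRoot() and asserting on ping_url inside a "with override_environ('USSOC_PING_URL', fake_url):" block.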
=== added file 'ubuntu_sso/main/tests/test_linux.py'
--- ubuntu_sso/main/tests/test_linux.py 1970-01-01 00:00:00 +0000
+++ ubuntu_sso/main/tests/test_linux.py 2011-03-23 14:22:30 +0000
@@ -0,0 +1,1306 @@
1# -*- coding: utf-8 -*-
2#
3# test_main - tests for ubuntu_sso.main
4#
5# Author: Natalia Bidart <natalia.bidart@canonical.com>
6# Author: Alejandro J. Cura <alecu@canonical.com>
7#
8# Copyright 2009-2010 Canonical Ltd.
9#
10# This program is free software: you can redistribute it and/or modify it
11# under the terms of the GNU General Public License version 3, as published
12# by the Free Software Foundation.
13#
14# This program is distributed in the hope that it will be useful, but
15# WITHOUT ANY WARRANTY; without even the implied warranties of
16# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
17# PURPOSE. See the GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License along
20# with this program. If not, see <http://www.gnu.org/licenses/>.
21"""Tests for the main SSO client code."""
22
23import logging
24import os
25
26from mocker import Mocker, MockerTestCase, ARGS, KWARGS
27from twisted.internet import defer
28from twisted.internet.defer import Deferred, inlineCallbacks
29from twisted.trial.unittest import TestCase
30from ubuntuone.devtools.handlers import MementoHandler
31
32import ubuntu_sso.keyring
33import ubuntu_sso.main
34import ubuntu_sso.main.linux
35
36from ubuntu_sso import DBUS_CREDENTIALS_IFACE
37from ubuntu_sso.keyring import U1_APP_NAME
38from ubuntu_sso.main import (U1_PING_URL, except_to_errdict,
39 CredentialsManagement, SSOCredentials, SSOLogin)
40from ubuntu_sso.main.linux import TIMEOUT_INTERVAL, blocking
41from ubuntu_sso.main import (HELP_TEXT_KEY, PING_URL_KEY,
42 TC_URL_KEY, UI_CLASS_KEY, UI_MODULE_KEY, WINDOW_ID_KEY,
43 SUCCESS_CB_KEY, ERROR_CB_KEY, DENIAL_CB_KEY)
44from ubuntu_sso.tests import (APP_NAME, TC_URL, HELP_TEXT, CAPTCHA_ID,
45 CAPTCHA_SOLUTION, EMAIL, EMAIL_TOKEN, NAME, PASSWORD, PING_URL, TOKEN,
46 TOKEN_NAME, WINDOW_ID, TestCase)
47
48
49# Access to a protected member 'yyy' of a client class
50# pylint: disable=W0212
51
52
53class BlockingSampleException(Exception):
54 """The exception that will be thrown by the fake blocking."""
55
56
57def fake_ok_blocking(f, app, cb, eb):
58 """A fake blocking function that succeeds."""
59 cb(app, f())
60
61
62def fake_err_blocking(f, app, cb, eb):
63 """A fake blocking function that fails."""
64 try:
65 f()
66 except Exception, e: # pylint: disable=W0703
67 eb(app, except_to_errdict(e))
68 else:
69 eb(app, except_to_errdict(BlockingSampleException()))
70
71
72class SsoDbusTestCase(TestCase):
73 """Test the SSOLogin DBus interface."""
74
75 timeout = 2
76
77 def setUp(self):
78 """Create the mocking bus."""
79 self.mocker = Mocker()
80 self.mockbusname = self.mocker.mock()
81 mockbus = self.mocker.mock()
82 self.mockbusname.get_bus()
83 self.mocker.result(mockbus)
84 mockbus._register_object_path(ARGS)
85 self.mockprocessorclass = None
86
87 def ksc(keyring, k, val):
88 """Assert over token and app_name."""
89 self.assertEqual(k, APP_NAME)
90 self.assertEqual(val, TOKEN)
91 self.keyring_was_set = True
92 self.keyring_values = k, val
93 return defer.succeed(None)
94
95 self.patch(ubuntu_sso.main.Keyring, "set_credentials", ksc)
96 self.keyring_was_set = False
97 self.keyring_values = None
98
99 def tearDown(self):
100 """Verify the mocking bus and shut it down."""
101 self.mocker.verify()
102 self.mocker.restore()
103
104 def test_creation(self):
105 """Test that the object creation is successful."""
106 self.mocker.replay()
107 SSOLogin(self.mockbusname)
108
109 def create_mock_processor(self):
110 """Create a mock processor from a dummy processor class."""
111 self.mockprocessorclass = self.mocker.mock()
112 mockprocessor = self.mocker.mock()
113 self.mockprocessorclass(ARGS, KWARGS)
114 self.mocker.result(mockprocessor)
115 return mockprocessor
116
117 def test_generate_captcha(self):
118 """Test that the captcha method works ok."""
119 d = Deferred()
120 filename = "sample filename"
121 expected_result = "expected result"
122 self.create_mock_processor().generate_captcha(filename)
123 self.mocker.result(expected_result)
124 self.patch(ubuntu_sso.main.linux, "blocking", fake_ok_blocking)
125 self.mocker.replay()
126
127 def verify(app_name, result):
128 """The actual test."""
129 self.assertEqual(result, expected_result)
130 self.assertEqual(app_name, APP_NAME)
131 d.callback(result)
132
133 client = SSOLogin(self.mockbusname,
134 sso_login_processor_class=self.mockprocessorclass)
135 self.patch(client, "CaptchaGenerated", verify)
136 self.patch(client, "CaptchaGenerationError", d.errback)
137 client.generate_captcha(APP_NAME, filename)
138 return d
139
140 def test_generate_captcha_error(self):
141 """Test that the captcha method fails as expected."""
142 d = Deferred()
143 filename = "sample filename"
144 expected_result = "expected result"
145 self.create_mock_processor().generate_captcha(filename)
146 self.mocker.result(expected_result)
147 self.patch(ubuntu_sso.main.linux, "blocking", fake_err_blocking)
148 self.mocker.replay()
149
150 def verify(app_name, errdict):
151 """The actual test."""
152 self.assertEqual(errdict["errtype"], "BlockingSampleException")
153 self.assertEqual(app_name, APP_NAME)
154 d.callback("Ok")
155
156 client = SSOLogin(self.mockbusname,
157 sso_login_processor_class=self.mockprocessorclass)
158 self.patch(client, "CaptchaGenerated", d.errback)
159 self.patch(client, "CaptchaGenerationError", verify)
160 client.generate_captcha(APP_NAME, filename)
161 return d
162
163 def test_register_user(self):
164 """Test that the register_user method works ok."""
165 d = Deferred()
166 expected_result = "expected result"
167 self.create_mock_processor().register_user(EMAIL, PASSWORD, NAME,
168 CAPTCHA_ID, CAPTCHA_SOLUTION)
169 self.mocker.result(expected_result)
170 self.patch(ubuntu_sso.main.linux, "blocking", fake_ok_blocking)
171 self.mocker.replay()
172
173 def verify(app_name, result):
174 """The actual test."""
175 self.assertEqual(result, expected_result)
176 self.assertEqual(app_name, APP_NAME)
177 d.callback(result)
178
179 client = SSOLogin(self.mockbusname,
180 sso_login_processor_class=self.mockprocessorclass)
181 self.patch(client, "UserRegistered", verify)
182 self.patch(client, "UserRegistrationError", d.errback)
183 client.register_user(APP_NAME, EMAIL, PASSWORD, NAME, CAPTCHA_ID,
184 CAPTCHA_SOLUTION)
185 return d
186
187 def test_register_user_error(self):
188 """Test that the register_user method fails as expected."""
189 d = Deferred()
190 expected_result = "expected result"
191 self.create_mock_processor().register_user(EMAIL, PASSWORD, NAME,
192 CAPTCHA_ID, CAPTCHA_SOLUTION)
193 self.mocker.result(expected_result)
194 self.patch(ubuntu_sso.main.linux, "blocking", fake_err_blocking)
195 self.mocker.replay()
196
197 def verify(app_name, errdict):
198 """The actual test."""
199 self.assertEqual(errdict["errtype"], "BlockingSampleException")
200 self.assertEqual(app_name, APP_NAME)
201 d.callback("Ok")
202
203 client = SSOLogin(self.mockbusname,
204 sso_login_processor_class=self.mockprocessorclass)
205 self.patch(client, "UserRegistered", d.errback)
206 self.patch(client, "UserRegistrationError", verify)
207 client.register_user(APP_NAME, EMAIL, PASSWORD, NAME, CAPTCHA_ID,
208 CAPTCHA_SOLUTION)
209 return d
210
211 def test_login(self):
212 """Test that the login method works ok."""
213 d = Deferred()
214 processor = self.create_mock_processor()
215 processor.login(EMAIL, PASSWORD, TOKEN_NAME)
216 self.mocker.result(TOKEN)
217 processor.is_validated(TOKEN)
218 self.mocker.result(True)
219 self.patch(ubuntu_sso.main.linux, "blocking", fake_ok_blocking)
220 self.mocker.replay()
221
222 def verify(app_name, result):
223 """The actual test."""
224 self.assertEqual(result, EMAIL)
225 self.assertEqual(app_name, APP_NAME)
226 self.assertTrue(self.keyring_was_set, "The keyring should be set")
227 d.callback(result)
228
229 client = SSOLogin(self.mockbusname,
230 sso_login_processor_class=self.mockprocessorclass)
231 self.patch(client, "LoggedIn", verify)
232 self.patch(client, "LoginError", d.errback)
233 self.patch(client, "UserNotValidated", d.errback)
234 client.login(APP_NAME, EMAIL, PASSWORD)
235 return d
236
237 def test_login_user_not_validated(self):
238 """Test that the login sends the UserNotValidated signal."""
239 d = Deferred()
240 processor = self.create_mock_processor()
241 processor.login(EMAIL, PASSWORD, TOKEN_NAME)
242 self.mocker.result(TOKEN)
243 processor.is_validated(TOKEN)
244 self.mocker.result(False)
245 self.patch(ubuntu_sso.main.linux, "blocking", fake_ok_blocking)
246 self.mocker.replay()
247
248 def verify(app_name, email):
249 """The actual test."""
250 self.assertEqual(app_name, APP_NAME)
251 self.assertEqual(email, EMAIL)
252 self.assertFalse(self.keyring_was_set, "Keyring should not be set")
253 d.callback("Ok")
254
255 client = SSOLogin(self.mockbusname,
256 sso_login_processor_class=self.mockprocessorclass)
257 self.patch(client, "LoggedIn", d.errback)
258 self.patch(client, "LoginError", d.errback)
259 self.patch(client, "UserNotValidated", verify)
260 client.login(APP_NAME, EMAIL, PASSWORD)
261 return d
262
263 def test_login_error_get_token_name(self):
264 """The login method fails as expected when get_token_name fails."""
265 d = Deferred()
266 self.create_mock_processor()
267 self.patch(ubuntu_sso.main.linux, "blocking", fake_err_blocking)
268
269 def fake_gtn(*args):
270 """A fake get_token_name that fails."""
271 raise BlockingSampleException()
272
273 self.patch(ubuntu_sso.main, "get_token_name", fake_gtn)
274 self.mocker.replay()
275
276 def verify(app_name, errdict):
277 """The actual test."""
278 self.assertEqual(app_name, APP_NAME)
279 self.assertEqual(errdict["errtype"], "BlockingSampleException")
280 self.assertFalse(self.keyring_was_set, "Keyring should not be set")
281 d.callback("Ok")
282
283 client = SSOLogin(self.mockbusname,
284 sso_login_processor_class=self.mockprocessorclass)
285 self.patch(client, "LoggedIn", d.errback)
286 self.patch(client, "LoginError", verify)
287 self.patch(client, "UserNotValidated", d.errback)
288 client.login(APP_NAME, EMAIL, PASSWORD)
289 return d
290
291 def test_login_error_set_credentials(self):
292 """The login method fails as expected when set_credentials fails."""
293 d = Deferred()
294 processor = self.create_mock_processor()
295 processor.login(EMAIL, PASSWORD, TOKEN_NAME)
296 self.mocker.result(TOKEN)
297 processor.is_validated(TOKEN)
298 self.mocker.result(True)
299
300 self.patch(ubuntu_sso.main.linux, "blocking", fake_ok_blocking)
301
302 def fake_set_creds(*args):
303 """A fake Keyring.set_credentials that fails."""
304 return defer.fail(BlockingSampleException())
305
306 self.patch(ubuntu_sso.main.Keyring, "set_credentials", fake_set_creds)
307 self.mocker.replay()
308
309 def verify(app_name, errdict):
310 """The actual test."""
311 self.assertEqual(app_name, APP_NAME)
312 self.assertEqual(errdict["errtype"], "BlockingSampleException")
313 self.assertFalse(self.keyring_was_set, "Keyring should not be set")
314 d.callback("Ok")
315
316 client = SSOLogin(self.mockbusname,
317 sso_login_processor_class=self.mockprocessorclass)
318 fail = lambda app, res: d.errback((app, res))
319 self.patch(client, "LoggedIn", fail)
320 self.patch(client, "LoginError", verify)
321 self.patch(client, "UserNotValidated", fail)
322 client.login(APP_NAME, EMAIL, PASSWORD)
323 return d
324
325 def test_validate_email(self):
326 """Test that the validate_email method works ok."""
327 d = Deferred()
328 self.create_mock_processor().validate_email(EMAIL, PASSWORD,
329 EMAIL_TOKEN, TOKEN_NAME)
330 self.mocker.result(TOKEN)
331 self.patch(ubuntu_sso.main.linux, "blocking", fake_ok_blocking)
332 self.mocker.replay()
333
334 def verify(app_name, result):
335 """The actual test."""
336 self.assertEqual(result, EMAIL)
337 self.assertEqual(app_name, APP_NAME)
338 self.assertTrue(self.keyring_was_set, "The keyring should be set")
339 d.callback(result)
340
341 client = SSOLogin(self.mockbusname,
342 sso_login_processor_class=self.mockprocessorclass)
343 self.patch(client, "EmailValidated", verify)
344 self.patch(client, "EmailValidationError", d.errback)
345 client.validate_email(APP_NAME, EMAIL, PASSWORD, EMAIL_TOKEN)
346 return d
347
348 def test_validate_email_error(self):
349 """Test that the validate_email method fails as expected."""
350 d = Deferred()
351 self.create_mock_processor()
352 self.patch(ubuntu_sso.main.linux, "blocking", fake_err_blocking)
353
354 def fake_gtn(*args):
355 """A fake get_token_name that fails."""
356 raise BlockingSampleException()
357
358 self.patch(ubuntu_sso.main, "get_token_name", fake_gtn)
359 self.mocker.replay()
360
361 def verify(app_name, errdict):
362 """The actual test."""
363 self.assertEqual(app_name, APP_NAME)
364 self.assertEqual(errdict["errtype"], "BlockingSampleException")
365 self.assertFalse(self.keyring_was_set, "Keyring should not be set")
366 d.callback("Ok")
367
368 client = SSOLogin(self.mockbusname,
369 sso_login_processor_class=self.mockprocessorclass)
370 self.patch(client, "EmailValidated", d.errback)
371 self.patch(client, "EmailValidationError", verify)
372 client.validate_email(APP_NAME, EMAIL, PASSWORD, EMAIL_TOKEN)
373 return d
374
375 def test_request_password_reset_token(self):
376 """Test that the request_password_reset_token method works ok."""
377 d = Deferred()
378 processor = self.create_mock_processor()
379 processor.request_password_reset_token(EMAIL)
380 self.patch(ubuntu_sso.main.linux, "blocking", fake_ok_blocking)
381 self.mocker.result(EMAIL)
382 self.mocker.replay()
383
384 def verify(app_name, result):
385 """The actual test."""
386 self.assertEqual(result, EMAIL)
387 self.assertEqual(app_name, APP_NAME)
388 d.callback(result)
389
390 client = SSOLogin(self.mockbusname,
391 sso_login_processor_class=self.mockprocessorclass)
392 self.patch(client, "PasswordResetTokenSent", verify)
393 self.patch(client, "PasswordResetError", d.errback)
394 client.request_password_reset_token(APP_NAME, EMAIL)
395 return d
396
397 def test_request_password_reset_token_error(self):
398 """Test the request_password_reset_token method fails as expected."""
399 d = Deferred()
400
401 self.create_mock_processor().request_password_reset_token(EMAIL)
402 self.mocker.result(EMAIL)
403 self.patch(ubuntu_sso.main.linux, "blocking", fake_err_blocking)
404 self.mocker.replay()
405
406 def verify(app_name, errdict):
407 """The actual test."""
408 self.assertEqual(errdict["errtype"], "BlockingSampleException")
409 self.assertEqual(app_name, APP_NAME)
410 d.callback("Ok")
411
412 client = SSOLogin(self.mockbusname,
413 sso_login_processor_class=self.mockprocessorclass)
414 self.patch(client, "PasswordResetTokenSent", d.errback)
415 self.patch(client, "PasswordResetError", verify)
416 client.request_password_reset_token(APP_NAME, EMAIL)
417 return d
418
419 def test_set_new_password(self):
420 """Test that the set_new_password method works ok."""
421 d = Deferred()
422 self.create_mock_processor().set_new_password(EMAIL, EMAIL_TOKEN,
423 PASSWORD)
424 self.mocker.result(EMAIL)
425 self.patch(ubuntu_sso.main.linux, "blocking", fake_ok_blocking)
426 self.mocker.replay()
427
428 def verify(app_name, result):
429 """The actual test."""
430 self.assertEqual(result, EMAIL)
431 self.assertEqual(app_name, APP_NAME)
432 d.callback(result)
433
434 client = SSOLogin(self.mockbusname,
435 sso_login_processor_class=self.mockprocessorclass)
436 self.patch(client, "PasswordChanged", verify)
437 self.patch(client, "PasswordChangeError", d.errback)
438 client.set_new_password(APP_NAME, EMAIL, EMAIL_TOKEN, PASSWORD)
439 return d
440
441 def test_set_new_password_error(self):
442 """Test that the set_new_password method fails as expected."""
443 d = Deferred()
444 expected_result = "expected result"
445
446 self.create_mock_processor().set_new_password(EMAIL, EMAIL_TOKEN,
447 PASSWORD)
448 self.mocker.result(expected_result)
449 self.patch(ubuntu_sso.main.linux, "blocking", fake_err_blocking)
450 self.mocker.replay()
451
452 def verify(app_name, errdict):
453 """The actual test."""
454 self.assertEqual(errdict["errtype"], "BlockingSampleException")
455 self.assertEqual(app_name, APP_NAME)
456 d.callback("Ok")
457
458 client = SSOLogin(self.mockbusname,
459 sso_login_processor_class=self.mockprocessorclass)
460 self.patch(client, "PasswordChanged", d.errback)
461 self.patch(client, "PasswordChangeError", verify)
462 client.set_new_password(APP_NAME, EMAIL, EMAIL_TOKEN, PASSWORD)
463 return d
464
465
466class BlockingFunctionTestCase(TestCase):
467 """Tests for the "blocking" function."""
468
469 timeout = 5
470
471 def test_blocking(self):
472 """Test the normal behaviour."""
473 d = Deferred()
474 expected_result = "expected result"
475
476 def f():
477 """No failure."""
478 return expected_result
479
480 def verify(app_name, result):
481 """The actual test."""
482 self.assertEqual(result, expected_result)
483 self.assertEqual(app_name, APP_NAME)
484 d.callback(result)
485
486 blocking(f, APP_NAME, verify, d.errback)
487 return d
488
489 def test_blocking_error(self):
490 """Test the behaviour when an Exception is raised."""
491 d = Deferred()
492 expected_error_message = "expected error message"
493
494 def f():
495 """Failure."""
496 raise BlockingSampleException(expected_error_message)
497
498 def verify(app_name, errdict):
499 """The actual test."""
500 self.assertEqual(app_name, APP_NAME)
501 self.assertEqual(errdict["errtype"], "BlockingSampleException")
502 self.assertEqual(errdict["message"], expected_error_message)
503 d.callback("Ok")
504
505 blocking(f, APP_NAME, d.errback, verify)
506 return d
507
508
509class TestExceptToErrdictException(Exception):
510 """A dummy exception for the following testcase."""
511
512
513class ExceptToErrdictTestCase(TestCase):
514 """Tests for the except_to_errdict function."""
515
516 def test_first_arg_is_dict(self):
517 """If the first arg is a dict, use it as the base dict."""
518 sample_dict = {
519 "errorcode1": "error message 1",
520 "errorcode2": "error message 2",
521 "errorcode3": "error message 3",
522 }
523 e = TestExceptToErrdictException(sample_dict)
524 result = except_to_errdict(e)
525
526 self.assertEqual(result["errtype"], e.__class__.__name__)
527 for k in sample_dict.keys():
528 self.assertIn(k, result)
529 self.assertEqual(result[k], sample_dict[k])
530
531 def test_first_arg_is_str(self):
532 """If the first arg is a str, use it as the message."""
533 sample_string = "a sample string"
534 e = TestExceptToErrdictException(sample_string)
535 result = except_to_errdict(e)
536 self.assertEqual(result["errtype"], e.__class__.__name__)
537 self.assertEqual(result["message"], sample_string)
538
539 def test_first_arg_is_unicode(self):
540 """If the first arg is a unicode, use it as the message."""
541 sample_string = u"a sample string"
542 e = TestExceptToErrdictException(sample_string)
543 result = except_to_errdict(e)
544 self.assertEqual(result["errtype"], e.__class__.__name__)
545 self.assertEqual(result["message"], sample_string)
546
547 def test_no_args_at_all(self):
548 """If there are no args, use the class docstring."""
549 e = TestExceptToErrdictException()
550 result = except_to_errdict(e)
551 self.assertEqual(result["errtype"], e.__class__.__name__)
552 self.assertEqual(result["message"], e.__class__.__doc__)
553
554 def test_some_other_thing_as_first_arg(self):
555 """If first arg is not basestring nor dict, then repr all args."""
556 sample_args = (None, u"unicode2\ufffd", "errorcode3")
557 e = TestExceptToErrdictException(*sample_args)
558 result = except_to_errdict(e)
559 self.assertEqual(result["errtype"], e.__class__.__name__)
560
561
562class RegisterSampleException(Exception):
563 """A mock exception thrown just when testing."""
564
565
566class ApplicationCredentialsTestCase(TestCase, MockerTestCase):
567 """Tests for the ApplicationCredentials related DBus methods."""
568
569 timeout = 5
570
571 def setUp(self):
572 MockerTestCase.setUp(self)
573
574 self.client = SSOCredentials(self.mocker.mock())
575
576 mock_class = self.mocker.replace("ubuntu_sso.credentials.Credentials")
577 mock_class(app_name=APP_NAME)
578 self.creds_obj = self.mocker.mock()
579 self.mocker.result(self.creds_obj)
580
581 @inlineCallbacks
582 def test_find_credentials(self):
583 """find_credentials immediately returns the token when found."""
584 expected_creds = "expected creds"
585 self.creds_obj.find_credentials()
586 self.mocker.result(defer.succeed(expected_creds))
587 self.mocker.replay()
588
589 d = Deferred()
590 self.client.find_credentials(APP_NAME, d.callback, d.errback)
591 creds = yield d
592 self.assertEqual(creds, expected_creds)
593
594 @inlineCallbacks
595 def test_credentials_not_found(self):
596 """find_credentials immediately returns {} when no creds found."""
597 expected_creds = {}
598 self.creds_obj.find_credentials()
599 self.mocker.result(defer.succeed(expected_creds))
600 self.mocker.replay()
601
602 d = Deferred()
603 self.client.find_credentials(APP_NAME, d.callback, d.errback)
604 creds = yield d
605 self.assertEqual(creds, expected_creds)
606
607
608class ApplicationCredentialsGUITestCase(TestCase, MockerTestCase):
609 """Tests for the ApplicationCredentials register/login DBus method."""
610
611 app_name = APP_NAME
612 ping_url = None
613
614 def setUp(self):
615 MockerTestCase.setUp(self)
616 self.client = SSOCredentials(self.mocker.mock())
617 self.args = {PING_URL_KEY: self.ping_url,
618 TC_URL_KEY: TC_URL, HELP_TEXT_KEY: HELP_TEXT,
619 WINDOW_ID_KEY: WINDOW_ID,
620 SUCCESS_CB_KEY: self.client.CredentialsFound,
621 ERROR_CB_KEY: self.client._process_error,
622 DENIAL_CB_KEY: self.client.AuthorizationDenied}
623
624 def test_login_or_register(self):
625 """login_or_register is correct."""
626 mock_class = self.mocker.replace("ubuntu_sso.credentials.Credentials")
627 mock_class(app_name=self.app_name, **self.args)
628 creds_obj = self.mocker.mock()
629 self.mocker.result(creds_obj)
630
631 creds_obj.register()
632 self.mocker.replay()
633
634 args = (self.app_name, TC_URL, HELP_TEXT, WINDOW_ID)
635 self.client.login_or_register_to_get_credentials(*args)
636
637 def test_login_only(self):
638 """login_only is correct."""
639 self.args[TC_URL_KEY] = None
640 mock_class = self.mocker.replace("ubuntu_sso.credentials.Credentials")
641 mock_class(app_name=self.app_name, **self.args)
642 creds_obj = self.mocker.mock()
643 self.mocker.result(creds_obj)
644
645 creds_obj.login()
646 self.mocker.replay()
647
648 args = (self.app_name, HELP_TEXT, WINDOW_ID)
649 self.client.login_to_get_credentials(*args)
650
651
652class ApplicationCredentialsU1TestCase(ApplicationCredentialsGUITestCase):
653 """Tests for the ApplicationCredentials register/login DBus method.
654
655 Specifically for APP_NAME == U1_APP_NAME.
656
657 """
658
659 app_name = U1_APP_NAME
660 ping_url = U1_PING_URL
661
662
663class ApplicationCredentialsClearTokenTestCase(TestCase, MockerTestCase):
664 """Tests for the ApplicationCredentials related DBus methods."""
665
666 def test_clear_token(self):
667 """Check that clear_token tries removing the correct token."""
668 mock_class = self.mocker.replace("ubuntu_sso.credentials.Credentials")
669 mock_class(app_name=APP_NAME)
670 creds_obj = self.mocker.mock()
671 self.mocker.result(creds_obj)
672
673 creds_obj.clear_credentials()
674 self.mocker.result(defer.succeed(None))
675 self.mocker.replay()
676
677 client = SSOCredentials(self.mocker.mock())
678 client.clear_token(APP_NAME)
679
680
681class EnvironOverridesTestCase(TestCase):
682 """Some URLs can be set from the environment for testing/QA purposes."""
683
684 def test_override_ping_url(self):
685 """The ping url can be set from the environ via USSOC_PING_URL."""
686 fake_url = 'this is not really a URL'
687 old_url = os.environ.get('USSOC_PING_URL')
688 os.environ['USSOC_PING_URL'] = fake_url
689 try:
690 creds = SSOCredentials(None)
691 self.assertEqual(creds.root.ping_url, fake_url)
692 finally:
693 if old_url:
694 os.environ['USSOC_PING_URL'] = old_url
695 else:
696 del os.environ['USSOC_PING_URL']
697
698 def test_no_override_ping_url(self):
699 """If the environ is unset, the default ping url is used."""
700 creds = SSOCredentials(None)
701 self.assertEqual(creds.root.ping_url, U1_PING_URL)
702
703
704class CredentialsManagementTestCase(TestCase):
705 """Tests for the CredentialsManagement DBus interface."""
706
707 timeout = 2
708 base_args = {HELP_TEXT_KEY: HELP_TEXT, PING_URL_KEY: PING_URL,
709 TC_URL_KEY: TC_URL, WINDOW_ID_KEY: WINDOW_ID,
710 UI_CLASS_KEY: 'SuperUI', UI_MODULE_KEY: 'foo.bar.baz',
711 }
712
713 def setUp(self):
714 super(CredentialsManagementTestCase, self).setUp()
715
716 self.mocker = Mocker()
717 self.client = CredentialsManagement(timeout_func=lambda *a: None,
718 shutdown_func=lambda *a: None)
719 self.args = {}
720 self.cred_args = {}
721
722 self.memento = MementoHandler()
723 self.memento.setLevel(logging.DEBUG)
724 ubuntu_sso.main.logger.addHandler(self.memento)
725
726 def tearDown(self):
727 """Verify the mocking stuff and shut it down."""
728 self.mocker.verify()
729 self.mocker.restore()
730 super(CredentialsManagementTestCase, self).tearDown()
731
732 def assert_dbus_method_correct(self, method):
733 """Check that 'method' is a dbus method with proper signatures."""
734 self.assertTrue(method._dbus_is_method)
735 self.assertEqual(method._dbus_interface, DBUS_CREDENTIALS_IFACE)
736 self.assertEqual(method._dbus_in_signature, 'sa{ss}')
737 self.assertEqual(method._dbus_out_signature, '')
738
739 def create_mock_backend(self):
740 """Create a mock backend."""
741 mock_class = self.mocker.replace("ubuntu_sso.credentials.Credentials")
742 mock_class(APP_NAME, **self.cred_args)
743 creds_obj = self.mocker.mock()
744 self.mocker.result(creds_obj)
745
746 return creds_obj
747
748 def test_is_dbus_object(self):
749 """CredentialsManagement is a DBus object."""
750 self.assertIsInstance(self.client,
751 ubuntu_sso.main.linux.dbus.service.Object)
752
753
754class FakeCredentials(object):
755 """A very dummy Credentials object."""
756
757 def __init__(self, *a, **kw):
758 self.find_credentials = lambda *a: defer.succeed(TOKEN)
759 self.clear_credentials = lambda *a: defer.succeed(None)
760 self.store_credentials = lambda *a: defer.succeed(None)
761 self.login = self.register = lambda *a: None
762
763
764class CredentialsManagementRefCountingTestCase(CredentialsManagementTestCase):
765 """Tests for the CredentialsManagement ref counting."""
766
767 def setUp(self):
768 super(CredentialsManagementRefCountingTestCase, self).setUp()
769 self.patch(ubuntu_sso.main, 'Credentials', FakeCredentials)
770
771 def test_ref_counting(self):
772 """Ref counting is in place."""
773 self.assertEqual(self.client.ref_count, 0)
774
775 def test_find_credentials(self):
776 """Keep proper track of ongoing requests."""
777 d = Deferred()
778
779 def verify(*args):
780 """Make the check."""
781 self.assertEqual(self.client.ref_count, 1)
782 d.callback(True)
783
784 self.patch(self.client, 'CredentialsFound', verify)
785 self.client.find_credentials(APP_NAME, self.args)
786
787 return d
788
789 def test_clear_credentials(self):
790 """Keep proper track of ongoing requests."""
791 d = Deferred()
792
793 def verify(*args):
794 """Make the check."""
795 self.assertEqual(self.client.ref_count, 1)
796 d.callback(True)
797
798 self.patch(self.client, 'CredentialsCleared', verify)
799 self.client.clear_credentials(APP_NAME, self.args)
800
801 return d
802
803 def test_store_credentials(self):
804 """Keep proper track of ongoing requests."""
805 d = Deferred()
806
807 def verify(*args):
808 """Make the check."""
809 self.assertEqual(self.client.ref_count, 1)
810 d.callback(True)
811
812 self.patch(self.client, 'CredentialsStored', verify)
813 self.client.store_credentials(APP_NAME, self.args)
814
815 return d
816
817 def test_register(self):
818 """Keep proper track of ongoing requests."""
819 self.client.register(APP_NAME, self.args)
820
821 self.assertEqual(self.client.ref_count, 1)
822
823 def test_login(self):
824 """Keep proper track of ongoing requests."""
825 self.client.login(APP_NAME, self.args)
826
827 self.assertEqual(self.client.ref_count, 1)
828
829 def test_several_requests(self):
830 """Requests can be nested."""
831 self.client.login(APP_NAME, self.args)
832 self.client.register(APP_NAME, self.args)
833 self.client.login(APP_NAME, self.args)
834 self.client.register(APP_NAME, self.args)
835 self.client.register(APP_NAME, self.args)
836
837 self.assertEqual(self.client.ref_count, 5)
838
839 def test_credentials_found(self):
840 """Ref counter is decreased when a signal is sent."""
841 self.client.ref_count = 3
842 self.client.CredentialsFound(APP_NAME, TOKEN)
843
844 self.assertEqual(self.client.ref_count, 2)
845
846 def test_credentials_not_found(self):
847 """Ref counter is decreased when a signal is sent."""
848 self.client.ref_count = 3
849 self.client.CredentialsNotFound(APP_NAME)
850
851 self.assertEqual(self.client.ref_count, 2)
852
853 def test_credentials_cleared(self):
854 """Ref counter is decreased when a signal is sent."""
855 self.client.ref_count = 3
856 self.client.CredentialsCleared(APP_NAME)
857
858 self.assertEqual(self.client.ref_count, 2)
859
860 def test_credentials_stored(self):
861 """Ref counter is decreased when a signal is sent."""
862 self.client.ref_count = 3
863 self.client.CredentialsStored(APP_NAME)
864
865 self.assertEqual(self.client.ref_count, 2)
866
867 def test_credentials_error(self):
868 """Ref counter is decreased when a signal is sent."""
869 self.client.ref_count = 3
870 self.client.CredentialsError(APP_NAME, {'error_type': 'test'})
871
872 self.assertEqual(self.client.ref_count, 2)
873
874 def test_authorization_denied(self):
875 """Ref counter is decreased when a signal is sent."""
876 self.client.ref_count = 3
877 self.client.AuthorizationDenied(APP_NAME)
878
879 self.assertEqual(self.client.ref_count, 2)
880
881 def test_credentials_found_when_ref_count_is_not_positive(self):
882 """Ref counter is reset to 0, with a warning, if not positive."""
883 self.client._ref_count = -3
884 self.client.CredentialsFound(APP_NAME, TOKEN)
885
886 self.assertEqual(self.client.ref_count, 0)
887 msg = 'Attempting to decrease ref_count to a negative value (-4).'
888 self.assertTrue(self.memento.check_warning(msg))
889
890 def test_credentials_not_found_when_ref_count_is_not_positive(self):
891 """Ref counter is reset to 0, with a warning, if not positive."""
892 self.client._ref_count = -3
893 self.client.CredentialsNotFound(APP_NAME)
894
895 self.assertEqual(self.client.ref_count, 0)
896 msg = 'Attempting to decrease ref_count to a negative value (-4).'
897 self.assertTrue(self.memento.check_warning(msg))
898
899 def test_credentials_cleared_when_ref_count_is_not_positive(self):
900 """Ref counter is reset to 0, with a warning, if not positive."""
901 self.client._ref_count = -3
902 self.client.CredentialsCleared(APP_NAME)
903
904 self.assertEqual(self.client.ref_count, 0)
905 msg = 'Attempting to decrease ref_count to a negative value (-4).'
906 self.assertTrue(self.memento.check_warning(msg))
907
908 def test_credentials_stored_when_ref_count_is_not_positive(self):
909 """Ref counter is reset to 0, with a warning, if not positive."""
910 self.client._ref_count = -3
911 self.client.CredentialsStored(APP_NAME)
912
913 self.assertEqual(self.client.ref_count, 0)
914 msg = 'Attempting to decrease ref_count to a negative value (-4).'
915 self.assertTrue(self.memento.check_warning(msg))
916
917 def test_credentials_error_when_ref_count_is_not_positive(self):
918 """Ref counter is reset to 0, with a warning, if not positive."""
919 self.client._ref_count = -3
920 self.client.CredentialsError(APP_NAME, {'error_type': 'test'})
921
922 self.assertEqual(self.client.ref_count, 0)
923 msg = 'Attempting to decrease ref_count to a negative value (-4).'
924 self.assertTrue(self.memento.check_warning(msg))
925
926 def test_authorization_denied_when_ref_count_is_not_positive(self):
927 """Ref counter is reset to 0, with a warning, if not positive."""
928 self.client._ref_count = -3
929 self.client.AuthorizationDenied(APP_NAME)
930
931 self.assertEqual(self.client.ref_count, 0)
932 msg = 'Attempting to decrease ref_count to a negative value (-4).'
933 self.assertTrue(self.memento.check_warning(msg))
934
935 def test_on_zero_ref_count_shutdown(self):
936 """When ref count reaches 0, queue shutdown op."""
937 self.client.timeout_func = self._set_called
938 self.client.login(APP_NAME, self.args)
939 self.client.CredentialsFound(APP_NAME, TOKEN)
940
941 self.assertEqual(self._called,
942 ((TIMEOUT_INTERVAL, self.client.shutdown), {}))
943
944 def test_on_non_zero_ref_count_do_not_shutdown(self):
945 """If ref count is not 0, do not queue shutdown op."""
946 self.client.timeout_func = self._set_called
947 self.client.login(APP_NAME, self.args)
948
949 self.assertEqual(self._called, False)
950
951 def test_on_non_zero_ref_count_after_zero_do_not_shutdown(self):
952 """If the shutdown was queued, do not quit if counter is not zero."""
953
954 def fake_timeout_func(interval, func):
955 """Start a new request when the timer is started."""
956 self.client.register(APP_NAME, self.args)
957 assert self.client.ref_count > 0
958 func()
959
960 self.client.timeout_func = fake_timeout_func
961 self.client.shutdown_func = self._set_called
962
963 self.client.login(APP_NAME, self.args)
964 self.client.CredentialsFound(APP_NAME, TOKEN)
965 # counter reached 0, timeout_func was called
966
967 self.assertEqual(self._called, False, 'shutdown_func was not called')
968
969 def test_zero_ref_count_after_zero_do_shutdown(self):
970 """If the shutdown was queued, do quit if counter is zero."""
971
972 def fake_timeout_func(interval, func):
973 """Run the queued function at once, without a new request."""
974 assert self.client.ref_count == 0
975 func()
976
977 self.client.timeout_func = fake_timeout_func
978 self.client.shutdown_func = self._set_called
979
980 self.client.login(APP_NAME, self.args)
981 self.client.CredentialsFound(APP_NAME, TOKEN)
982 # counter reached 0, timeout_func was called
983
984 self.assertEqual(self._called, ((), {}), 'shutdown_func was called')
985
986
987class CredentialsManagementFindTestCase(CredentialsManagementTestCase):
988 """Tests for the CredentialsManagement find method."""
989
990 def test_find_credentials(self):
991 """The credentials are asked and returned in signals."""
992 self.create_mock_backend().find_credentials()
993 self.mocker.result(defer.succeed(None))
994 self.mocker.replay()
995
996 self.client.find_credentials(APP_NAME, self.args)
997 self.assert_dbus_method_correct(self.client.find_credentials)
998
999 def test_find_credentials_does_not_block_when_found(self):
1000 """Calling find_credentials does not block but returns thru signals.
1001
1002 If the creds are found, CredentialsFound is emitted.
1003
1004 """
1005 d = Deferred()
1006
1007 def verify(app_name, creds):
1008 """The actual test."""
1009 try:
1010 self.assertEqual(app_name, APP_NAME)
1011 self.assertEqual(creds, TOKEN)
1012 except Exception, e: # pylint: disable=W0703
1013 d.errback(e)
1014 else:
1015 d.callback(creds)
1016
1017 self.patch(self.client, 'CredentialsFound', verify)
1018 self.patch(self.client, 'CredentialsNotFound', d.errback)
1019
1020 self.create_mock_backend().find_credentials()
1021 self.mocker.result(defer.succeed(TOKEN))
1022 self.mocker.replay()
1023
1024 self.client.find_credentials(APP_NAME, self.args)
1025 return d
1026
1027 def test_find_credentials_does_not_block_when_not_found(self):
1028 """Calling find_credentials does not block but returns thru signals.
1029
1030 If the creds are not found, CredentialsNotFound is emitted.
1031
1032 """
1033 d = Deferred()
1034
1035 def verify(app_name):
1036 """The actual test."""
1037 try:
1038 self.assertEqual(app_name, APP_NAME)
1039 except Exception, e: # pylint: disable=W0703
1040 d.errback(e)
1041 else:
1042 d.callback(app_name)
1043
1044 self.patch(self.client, 'CredentialsFound',
1045 lambda app, creds: d.errback(app))
1046 self.patch(self.client, 'CredentialsNotFound', verify)
1047
1048 self.create_mock_backend().find_credentials()
1049 self.mocker.result(defer.succeed({}))
1050 self.mocker.replay()
1051
1052 self.client.find_credentials(APP_NAME, self.args)
1053 return d
1054
1055 def test_find_credentials_error(self):
1056 """If find_credentials fails, CredentialsError is sent."""
1057 d = Deferred()
1058
1059 def verify(app_name, errdict):
1060 """The actual test."""
1061 self.assertEqual(errdict["errtype"], "BlockingSampleException")
1062 self.assertEqual(app_name, APP_NAME)
1063 d.callback("Ok")
1064
1065 self.patch(self.client, 'CredentialsFound',
1066 lambda app, creds: d.errback(app))
1067 self.patch(self.client, 'CredentialsNotFound', d.errback)
1068 self.patch(self.client, 'CredentialsError', verify)
1069
1070 self.create_mock_backend().find_credentials()
1071 self.mocker.result(defer.fail(BlockingSampleException()))
1072 self.mocker.replay()
1073
1074 self.client.find_credentials(APP_NAME, self.args)
1075 return d
1076
1077
1078class CredentialsManagementClearTestCase(CredentialsManagementTestCase):
1079 """Tests for the CredentialsManagement clear method."""
1080
1081 def test_clear_credentials(self):
1082 """The credentials are removed."""
1083 self.create_mock_backend().clear_credentials()
1084 self.mocker.result(defer.succeed(APP_NAME))
1085 self.mocker.replay()
1086
1087 self.client.clear_credentials(APP_NAME, self.args)
1088 self.assert_dbus_method_correct(self.client.clear_credentials)
1089
1090 def test_clear_credentials_does_not_block(self):
1091 """Calling clear_credentials does not block; it returns thru signals."""
1092 d = Deferred()
1093
1094 def verify(app_name):
1095 """The actual test."""
1096 try:
1097 self.assertEqual(app_name, APP_NAME)
1098 except Exception, e: # pylint: disable=W0703
1099 d.errback(e)
1100 else:
1101 d.callback(app_name)
1102
1103 self.patch(self.client, 'CredentialsCleared', verify)
1104 self.patch(self.client, 'CredentialsError',
1105 lambda app, err: d.errback(app))
1106
1107 self.create_mock_backend().clear_credentials()
1108 self.mocker.result(defer.succeed(APP_NAME))
1109 self.mocker.replay()
1110
1111 self.client.clear_credentials(APP_NAME, self.args)
1112 return d
1113
1114 def test_clear_credentials_error(self):
1115 """If clear_credentials fails, CredentialsError is sent."""
1116 d = Deferred()
1117
1118 def verify(app_name, errdict):
1119 """The actual test."""
1120 self.assertEqual(errdict["errtype"], "BlockingSampleException")
1121 self.assertEqual(app_name, APP_NAME)
1122 d.callback("Ok")
1123
1124 self.patch(self.client, 'CredentialsCleared', d.errback)
1125 self.patch(self.client, 'CredentialsError', verify)
1126
1127 self.create_mock_backend().clear_credentials()
1128 self.mocker.result(defer.fail(BlockingSampleException()))
1129 self.mocker.replay()
1130
1131 self.client.clear_credentials(APP_NAME, self.args)
1132 return d
1133
1134
1135class CredentialsManagementStoreTestCase(CredentialsManagementTestCase):
1136 """Tests for the CredentialsManagement store method."""
1137
1138 def test_store_credentials(self):
1139 """The credentials are stored and the outcome is a signal."""
1140 self.create_mock_backend().store_credentials(TOKEN)
1141 self.mocker.result(defer.succeed(APP_NAME))
1142 self.mocker.replay()
1143
1144 self.client.store_credentials(APP_NAME, TOKEN)
1145 self.assert_dbus_method_correct(self.client.store_credentials)
1146
1147 def test_store_credentials_does_not_block(self):
1148 """Calling store_credentials does not block but returns thru signals.
1149
1150 If the creds are stored, CredentialsStored is emitted.
1151
1152 """
1153 d = Deferred()
1154
1155 def verify(app_name):
1156 """The actual test."""
1157 try:
1158 self.assertEqual(app_name, APP_NAME)
1159 except Exception, e: # pylint: disable=W0703
1160 d.errback(e)
1161 else:
1162 d.callback(app_name)
1163
1164 self.patch(self.client, 'CredentialsStored', verify)
1165 self.patch(self.client, 'CredentialsError',
1166 lambda app, err: d.errback(app))
1167
1168 self.create_mock_backend().store_credentials(TOKEN)
1169 self.mocker.result(defer.succeed(APP_NAME))
1170 self.mocker.replay()
1171
1172 self.client.store_credentials(APP_NAME, TOKEN)
1173 return d
1174
1175 def test_store_credentials_error(self):
1176 """If store_credentials fails, CredentialsError is sent."""
1177 d = Deferred()
1178
1179 def verify(app_name, errdict):
1180 """The actual test."""
1181 self.assertEqual(errdict["errtype"], "BlockingSampleException")
1182 self.assertEqual(app_name, APP_NAME)
1183 d.callback("Ok")
1184
1185 self.patch(self.client, 'CredentialsStored', d.errback)
1186 self.patch(self.client, 'CredentialsError', verify)
1187
1188 self.create_mock_backend().store_credentials(TOKEN)
1189 self.mocker.result(defer.fail(BlockingSampleException()))
1190 self.mocker.replay()
1191
1192 self.client.store_credentials(APP_NAME, TOKEN)
1193 return d
1194
1195
1196class CredentialsManagementOpsTestCase(CredentialsManagementTestCase):
1197 """Tests for the CredentialsManagement login/register methods."""
1198
1199 def setUp(self):
1200 super(CredentialsManagementOpsTestCase, self).setUp()
1201 self.args = dict((k, str(v)) for k, v in self.base_args.iteritems())
1202 self.cred_args = self.base_args.copy()
1203 self.cred_args[SUCCESS_CB_KEY] = self.client.CredentialsFound
1204 self.cred_args[ERROR_CB_KEY] = self.client.CredentialsError
1205 self.cred_args[DENIAL_CB_KEY] = self.client.AuthorizationDenied
1206
1207 def test_register(self):
1208 """The registration is correct."""
1209 self.create_mock_backend().register()
1210 self.mocker.replay()
1211
1212 self.client.register(APP_NAME, self.args)
1213 self.assert_dbus_method_correct(self.client.register)
1214
1215 def test_login(self):
1216 """The login is correct."""
1217 self.create_mock_backend().login()
1218 self.mocker.replay()
1219
1220 self.client.login(APP_NAME, self.args)
1221 self.assert_dbus_method_correct(self.client.login)
1222
1223
1224class CredentialsManagementParamsTestCase(CredentialsManagementOpsTestCase):
1225 """Tests for the CredentialsManagement extra parameters handling."""
1226
1227 def setUp(self):
1228 super(CredentialsManagementParamsTestCase, self).setUp()
1229 self.args['dummy'] = 'nothing useful'
1230
1231
1232class CredentialsManagementSignalsTestCase(TestCase):
1233 """Tests for the CredentialsManagement DBus signals."""
1234
1235 def setUp(self):
1236 self.client = CredentialsManagement(timeout_func=lambda *a: None,
1237 shutdown_func=lambda *a: None)
1238
1239 self.memento = MementoHandler()
1240 self.memento.setLevel(logging.DEBUG)
1241 ubuntu_sso.main.logger.addHandler(self.memento)
1242
1243 def assert_dbus_signal_correct(self, signal, signature):
1244 """Check that 'signal' is a dbus signal with proper 'signature'."""
1245 self.assertTrue(signal._dbus_is_signal)
1246 self.assertEqual(signal._dbus_interface, DBUS_CREDENTIALS_IFACE)
1247 self.assertEqual(signal._dbus_signature, signature)
1248
1249 def test_credentials_found(self):
1250 """The CredentialsFound signal."""
1251 self.client.CredentialsFound(APP_NAME, TOKEN)
1252 msgs = (self.client.__class__.__name__,
1253 self.client.CredentialsFound.__name__, APP_NAME)
1254 self.assertTrue(self.memento.check_info(*msgs))
1255
1256 msg = 'credentials must not be logged (found %r in log).'
1257 for val in TOKEN.itervalues():
1258 self.assertFalse(self.memento.check_info(val), msg % val)
1259
1260 self.assert_dbus_signal_correct(self.client.CredentialsFound, 'sa{ss}')
1261
1262 def test_credentials_not_found(self):
1263 """The CredentialsNotFound signal."""
1264 self.client.CredentialsNotFound(APP_NAME)
1265 msgs = (self.client.__class__.__name__,
1266 self.client.CredentialsNotFound.__name__, APP_NAME)
1267 self.assertTrue(self.memento.check_info(*msgs))
1268 self.assert_dbus_signal_correct(self.client.CredentialsNotFound, 's')
1269
1270 def test_credentials_cleared(self):
1271 """The CredentialsCleared signal."""
1272 self.client.CredentialsCleared(APP_NAME)
1273 msgs = (self.client.__class__.__name__,
1274 self.client.CredentialsCleared.__name__, APP_NAME)
1275 self.assertTrue(self.memento.check_info(*msgs))
1276
1277 self.assert_dbus_signal_correct(self.client.CredentialsCleared, 's')
1278
1279 def test_credentials_stored(self):
1280 """The CredentialsStored signal."""
1281 self.client.CredentialsStored(APP_NAME)
1282 msgs = (self.client.__class__.__name__,
1283 self.client.CredentialsStored.__name__, APP_NAME)
1284 self.assertTrue(self.memento.check_info(*msgs))
1285
1286 self.assert_dbus_signal_correct(self.client.CredentialsStored, 's')
1287
1288 def test_credentials_error(self):
1289 """The CredentialsError signal."""
1290 error = {'error_message': 'failed!', 'detailed error': 'yadda yadda'}
1291 self.client.CredentialsError(APP_NAME, error)
1292 msgs = (self.client.__class__.__name__,
1293 self.client.CredentialsError.__name__,
1294 APP_NAME, str(error))
1295 self.assertTrue(self.memento.check_error(*msgs))
1296
1297 self.assert_dbus_signal_correct(self.client.CredentialsError, 'sa{ss}')
1298
1299 def test_authorization_denied(self):
1300 """The AuthorizationDenied signal."""
1301 self.client.AuthorizationDenied(APP_NAME)
1302 msgs = (self.client.__class__.__name__,
1303 self.client.AuthorizationDenied.__name__, APP_NAME)
1304 self.assertTrue(self.memento.check_info(*msgs))
1305
1306 self.assert_dbus_signal_correct(self.client.AuthorizationDenied, 's')
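
The ref-counting tests in test_linux.py above (CredentialsManagementRefCountingTestCase) pin down a small contract: each request increments a counter, each emitted signal decrements it, a decrement below zero is clamped to 0 with a warning, and reaching 0 queues a shutdown that only fires if the counter is still 0 when the timer runs. What follows is a minimal sketch of that contract, not the code from ubuntu_sso/main/linux.py. The class name `RefCountedService`, the methods `start_request` and `finish_request`, and the value given to `TIMEOUT_INTERVAL` are all assumptions made for illustration.

```python
import logging

logger = logging.getLogger("refcount_sketch")

TIMEOUT_INTERVAL = 500  # hypothetical delay; the real value is not in this diff


class RefCountedService(object):
    """Hypothetical stand-in for a service that quits when idle."""

    def __init__(self, timeout_func, shutdown_func):
        # Set the private attribute directly so that construction
        # does not queue a shutdown.
        self._ref_count = 0
        self.timeout_func = timeout_func
        self.shutdown_func = shutdown_func

    @property
    def ref_count(self):
        """Number of requests still in flight."""
        return self._ref_count

    @ref_count.setter
    def ref_count(self, new_value):
        """Clamp negative values to 0 and queue a shutdown at 0."""
        if new_value < 0:
            logger.warning("Attempting to decrease ref_count to a "
                           "negative value (%r).", new_value)
            new_value = 0
        self._ref_count = new_value
        if new_value == 0:
            self.timeout_func(TIMEOUT_INTERVAL, self.shutdown)

    def shutdown(self):
        """Quit only if no request arrived while the timer was pending."""
        if self._ref_count == 0:
            self.shutdown_func()

    def start_request(self):
        """Called on entry to every request method."""
        self.ref_count += 1

    def finish_request(self):
        """Called whenever a result or error signal is emitted."""
        self.ref_count -= 1
```

Re-checking the counter inside `shutdown` is what lets a request that arrives between "counter hit 0" and "timer fired" keep the service alive, which is the case covered by test_on_non_zero_ref_count_after_zero_do_not_shutdown.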
=== added file 'ubuntu_sso/main/tests/test_windows.py'
--- ubuntu_sso/main/tests/test_windows.py 1970-01-01 00:00:00 +0000
+++ ubuntu_sso/main/tests/test_windows.py 2011-03-23 14:22:30 +0000
@@ -0,0 +1,17 @@
1# -*- coding: utf-8 -*-
2# Author: Manuel de la Pena <manuel@canonical.com>
3#
4# Copyright 2011 Canonical Ltd.
5#
6# This program is free software: you can redistribute it and/or modify it
7# under the terms of the GNU General Public License version 3, as published
8# by the Free Software Foundation.
9#
10# This program is distributed in the hope that it will be useful, but
11# WITHOUT ANY WARRANTY; without even the implied warranties of
12# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
13# PURPOSE. See the GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License along
16# with this program. If not, see <http://www.gnu.org/licenses/>.
17"""Windows tests."""
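
This Windows test module is only a stub in this revision, so the behaviour of `blocking` and `except_to_errdict` is defined solely by the Linux tests earlier in this diff (BlockingFunctionTestCase and ExceptToErrdictTestCase). As a reference for a future port, here is a minimal Python 3 sketch of what those tests require. It is not the upstream implementation: running `f` in a `threading.Thread` and returning that thread are assumptions, and the single `str` check stands in for the separate str and unicode cases of the Python 2 original.

```python
import threading


def except_to_errdict(e):
    """Turn an exception into a flat dict describing it."""
    result = {"errtype": e.__class__.__name__}
    if len(e.args) == 0:
        # No arguments at all: fall back to the class docstring.
        result["message"] = e.__class__.__doc__
    elif isinstance(e.args[0], dict):
        # A dict as first argument is used as the base of the result.
        result.update(e.args[0])
    elif isinstance(e.args[0], str):
        result["message"] = e.args[0]
    else:
        # Anything else: keep a repr of all the arguments.
        result["message"] = repr(e.args)
    return result


def blocking(f, app_name, result_cb, error_cb):
    """Run f off the calling thread and report through callbacks.

    Returning the thread is an assumption of this sketch; it only
    exists so that callers can join() it.
    """

    def _in_thread():
        try:
            result = f()
        except Exception as e:  # report every failure as an errdict
            error_cb(app_name, except_to_errdict(e))
        else:
            result_cb(app_name, result)

    thread = threading.Thread(target=_in_thread)
    thread.start()
    return thread
```

In this sketch the callbacks run on the worker thread. Code driven by a Twisted reactor would normally have to hand them back to the reactor thread, which the sketch leaves out.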
=== added file 'ubuntu_sso/main/windows.py'
--- ubuntu_sso/main/windows.py 1970-01-01 00:00:00 +0000
+++ ubuntu_sso/main/windows.py 2011-03-23 14:22:30 +0000
@@ -0,0 +1,17 @@
1# -*- coding: utf-8 -*-
2# Author: Manuel de la Pena <manuel@canonical.com>
3#
4# Copyright 2011 Canonical Ltd.
5#
6# This program is free software: you can redistribute it and/or modify it
7# under the terms of the GNU General Public License version 3, as published
8# by the Free Software Foundation.
9#
10# This program is distributed in the hope that it will be useful, but
11# WITHOUT ANY WARRANTY; without even the implied warranties of
12# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
13# PURPOSE. See the GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License along
16# with this program. If not, see <http://www.gnu.org/licenses/>.
17"""Main implementation on windows."""
=== added directory 'ubuntu_sso/networkstate'
=== removed file 'ubuntu_sso/networkstate.py'
--- ubuntu_sso/networkstate.py 2010-10-11 13:22:16 +0000
+++ ubuntu_sso/networkstate.py 1970-01-01 00:00:00 +0000
@@ -1,108 +0,0 @@
1# -*- coding: utf-8 -*-
2#
3# networkstate - detect the current state of the network
4#
5# Author: Alejandro J. Cura <alecu@canonical.com>
6#
7# Copyright 2010 Canonical Ltd.
8#
9# This program is free software: you can redistribute it and/or modify it
10# under the terms of the GNU General Public License version 3, as published
11# by the Free Software Foundation.
12#
13# This program is distributed in the hope that it will be useful, but
14# WITHOUT ANY WARRANTY; without even the implied warranties of
15# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
16# PURPOSE. See the GNU General Public License for more details.
17#
18# You should have received a copy of the GNU General Public License along
19# with this program. If not, see <http://www.gnu.org/licenses/>.
20"""Implementation of network state detection."""
21
22import dbus
23
24from ubuntu_sso.logger import setup_logging
25logger = setup_logging("ubuntu_sso.networkstate")
26
27# Values returned by the callback
28ONLINE, OFFLINE, UNKNOWN = object(), object(), object()
29
30NM_STATE_NAMES = {
31 ONLINE: "online",
32 OFFLINE: "offline",
33 UNKNOWN: "unknown",
34}
35
36# Internal NetworkManager State constants
37NM_STATE_UNKNOWN = 0
38NM_STATE_ASLEEP = 1
39NM_STATE_CONNECTING = 2
40NM_STATE_CONNECTED = 3
41NM_STATE_DISCONNECTED = 4
42
43NM_DBUS_INTERFACE = "org.freedesktop.NetworkManager"
44NM_DBUS_OBJECTPATH = "/org/freedesktop/NetworkManager"
45DBUS_UNKNOWN_SERVICE = "org.freedesktop.DBus.Error.ServiceUnknown"
46
47
48class NetworkManagerState(object):
49 """Checks the state of NetworkManager thru DBus."""
50
51 def __init__(self, result_cb, dbus_module=dbus):
52 """Initialize this instance with a result callback."""
53 self.result_cb = result_cb
54 self.dbus = dbus_module
55 self.state_signal = None
56
57 def call_result_cb(self, state):
58 """Return the state thru the result callback."""
59 if self.state_signal:
60 self.state_signal.remove()
61 self.result_cb(state)
62
63 def got_state(self, state):
64 """Called by DBus when the state is retrieved from NM."""
65 if state == NM_STATE_CONNECTED:
66 self.call_result_cb(ONLINE)
67 elif state == NM_STATE_CONNECTING:
68 logger.debug("Currently connecting, waiting for signal")
69 else:
70 self.call_result_cb(OFFLINE)
71
72 def got_error(self, error):
73 """Called by DBus when retrieving the state from NM fails."""
74 if isinstance(error, self.dbus.exceptions.DBusException) and \
75 error.get_dbus_name() == DBUS_UNKNOWN_SERVICE:
76 logger.debug("Network Manager not present")
77 self.call_result_cb(UNKNOWN)
78 else:
79 logger.error("Error contacting NetworkManager: %s" % \
80 str(error))
81 self.call_result_cb(UNKNOWN)
82
83 def state_changed(self, state):
84 """Called when a signal is emitted by Network Manager."""
85 if int(state) == NM_STATE_CONNECTED:
86 self.call_result_cb(ONLINE)
87 elif int(state) == NM_STATE_DISCONNECTED:
88 self.call_result_cb(OFFLINE)
89 else:
90 logger.debug("Not yet connected: continuing to wait")
91
92 def find_online_state(self):
93 """Get the network state and return it thru the set callback."""
94 try:
95 sysbus = self.dbus.SystemBus()
96 nm_proxy = sysbus.get_object(NM_DBUS_INTERFACE,
97 NM_DBUS_OBJECTPATH,
98 follow_name_owner_changes=True)
99 nm_if = self.dbus.Interface(nm_proxy, NM_DBUS_INTERFACE)
100 self.state_signal = nm_if.connect_to_signal(
101 signal_name="StateChanged",
102 handler_function=self.state_changed,
103 dbus_interface=NM_DBUS_INTERFACE)
104 nm_proxy.Get(NM_DBUS_INTERFACE, "State",
105 reply_handler=self.got_state,
106 error_handler=self.got_error)
107 except Exception, e: # pylint: disable=W0703
108 self.got_error(e)
1090
=== added file 'ubuntu_sso/networkstate/__init__.py'
--- ubuntu_sso/networkstate/__init__.py 1970-01-01 00:00:00 +0000
+++ ubuntu_sso/networkstate/__init__.py 2011-03-23 14:22:30 +0000
@@ -0,0 +1,40 @@
1# -*- coding: utf-8 -*-
2# Author: Manuel de la Pena <manuel@canonical.com>
3#
4# Copyright 2011 Canonical Ltd.
5#
6# This program is free software: you can redistribute it and/or modify it
7# under the terms of the GNU General Public License version 3, as published
8# by the Free Software Foundation.
9#
10# This program is distributed in the hope that it will be useful, but
11# WITHOUT ANY WARRANTY; without even the implied warranties of
12# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
13# PURPOSE. See the GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License along
16# with this program. If not, see <http://www.gnu.org/licenses/>.
17"""Platform specific network status."""
18
19import sys
20
21# ignore global naming issues.
22# pylint: disable=C0103
23
24NetworkManagerState = None
25ONLINE = None
26OFFLINE = None
27UNKNOWN = None
28
29if sys.platform == 'win32':
30 from ubuntu_sso.networkstate import windows
31 NetworkManagerState = windows.NetworkManagerState
32 ONLINE = windows.ONLINE
33 OFFLINE = windows.OFFLINE
34 UNKNOWN = windows.UNKNOWN
35else:
36 from ubuntu_sso.networkstate import linux
37 NetworkManagerState = linux.NetworkManagerState
38 ONLINE = linux.ONLINE
39 OFFLINE = linux.OFFLINE
40 UNKNOWN = linux.UNKNOWN
041
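The package `__init__` above picks an implementation at import time by testing `sys.platform`. The dispatch pattern can be sketched in isolation as follows. This is only an illustration: `select_backend`, `LinuxState` and `WindowsState` are hypothetical names, not part of ubuntu_sso, and the `platform` argument exists purely so the choice can be exercised without changing the interpreter.

```python
import sys


class LinuxState(object):
    """Hypothetical stand-in for linux.NetworkManagerState."""
    backend = "linux"


class WindowsState(object):
    """Hypothetical stand-in for windows.NetworkManagerState."""
    backend = "windows"


def select_backend(platform=None):
    """Return the implementation class for the given platform string.

    Mirrors the 'win32 versus everything else' test used in the
    package __init__; any non-win32 platform gets the Linux class.
    """
    if platform is None:
        platform = sys.platform
    if platform == 'win32':
        return WindowsState
    return LinuxState


# Resolved once, at import time, like the module-level names above.
NetworkManagerState = select_backend()
```

Callers then import `NetworkManagerState` from the package and never need to know which platform module supplied it.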
=== added file 'ubuntu_sso/networkstate/linux.py'
--- ubuntu_sso/networkstate/linux.py 1970-01-01 00:00:00 +0000
+++ ubuntu_sso/networkstate/linux.py 2011-03-23 14:22:30 +0000
@@ -0,0 +1,108 @@
1# -*- coding: utf-8 -*-
2#
3# networkstate - detect the current state of the network
4#
5# Author: Alejandro J. Cura <alecu@canonical.com>
6#
7# Copyright 2010 Canonical Ltd.
8#
9# This program is free software: you can redistribute it and/or modify it
10# under the terms of the GNU General Public License version 3, as published
11# by the Free Software Foundation.
12#
13# This program is distributed in the hope that it will be useful, but
14# WITHOUT ANY WARRANTY; without even the implied warranties of
15# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
16# PURPOSE. See the GNU General Public License for more details.
17#
18# You should have received a copy of the GNU General Public License along
19# with this program. If not, see <http://www.gnu.org/licenses/>.
20"""Implementation of network state detection."""
21
22import dbus
23
24from ubuntu_sso.logger import setup_logging
25logger = setup_logging("ubuntu_sso.networkstate")
26
27# Values returned by the callback
28ONLINE, OFFLINE, UNKNOWN = object(), object(), object()
29
30NM_STATE_NAMES = {
31 ONLINE: "online",
32 OFFLINE: "offline",
33 UNKNOWN: "unknown",
34}
35
36# Internal NetworkManager State constants
37NM_STATE_UNKNOWN = 0
38NM_STATE_ASLEEP = 1
39NM_STATE_CONNECTING = 2
40NM_STATE_CONNECTED = 3
41NM_STATE_DISCONNECTED = 4
42
43NM_DBUS_INTERFACE = "org.freedesktop.NetworkManager"
44NM_DBUS_OBJECTPATH = "/org/freedesktop/NetworkManager"
45DBUS_UNKNOWN_SERVICE = "org.freedesktop.DBus.Error.ServiceUnknown"
46
47
48class NetworkManagerState(object):
49 """Checks the state of NetworkManager thru DBus."""
50
51 def __init__(self, result_cb, dbus_module=dbus):
52 """Initialize this instance with a result callback."""
53 self.result_cb = result_cb
54 self.dbus = dbus_module
55 self.state_signal = None
56
57 def call_result_cb(self, state):
58 """Return the state thru the result callback."""
59 if self.state_signal:
60 self.state_signal.remove()
61 self.result_cb(state)
62
63 def got_state(self, state):
64 """Called by DBus when the state is retrieved from NM."""
65 if state == NM_STATE_CONNECTED:
66 self.call_result_cb(ONLINE)
67 elif state == NM_STATE_CONNECTING:
68 logger.debug("Currently connecting, waiting for signal")
69 else:
70 self.call_result_cb(OFFLINE)
71
72 def got_error(self, error):
73 """Called by DBus when retrieving the state from NM fails."""
74 if isinstance(error, self.dbus.exceptions.DBusException) and \
75 error.get_dbus_name() == DBUS_UNKNOWN_SERVICE:
76 logger.debug("Network Manager not present")
77 self.call_result_cb(UNKNOWN)
78 else:
79 logger.error("Error contacting NetworkManager: %s" % \
80 str(error))
81 self.call_result_cb(UNKNOWN)
82
83 def state_changed(self, state):
84 """Called when a signal is emitted by Network Manager."""
85 if int(state) == NM_STATE_CONNECTED:
86 self.call_result_cb(ONLINE)
87 elif int(state) == NM_STATE_DISCONNECTED:
88 self.call_result_cb(OFFLINE)
89 else:
90 logger.debug("Not yet connected: continuing to wait")
91
92 def find_online_state(self):
93 """Get the network state and return it thru the set callback."""
94 try:
95 sysbus = self.dbus.SystemBus()
96 nm_proxy = sysbus.get_object(NM_DBUS_INTERFACE,
97 NM_DBUS_OBJECTPATH,
98 follow_name_owner_changes=True)
99 nm_if = self.dbus.Interface(nm_proxy, NM_DBUS_INTERFACE)
100 self.state_signal = nm_if.connect_to_signal(
101 signal_name="StateChanged",
102 handler_function=self.state_changed,
103 dbus_interface=NM_DBUS_INTERFACE)
104 nm_proxy.Get(NM_DBUS_INTERFACE, "State",
105 reply_handler=self.got_state,
106 error_handler=self.got_error)
107 except Exception, e: # pylint: disable=W0703
108 self.got_error(e)
0109
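The callback flow in `linux.py` is easier to follow without DBus in the way: a CONNECTING reply reports nothing and leaves the signal subscription in place, and the result callback only fires once a later `StateChanged` signal settles the state. The sketch below is a reduced copy of that logic for illustration. `StateSketch` and `FakeSignal` are hypothetical names, and the signal object is a stub standing in for whatever `connect_to_signal` returns.

```python
# Values handed to the result callback, as in the module above.
ONLINE, OFFLINE, UNKNOWN = object(), object(), object()

# NetworkManager state values as listed in the module above.
NM_STATE_CONNECTING = 2
NM_STATE_CONNECTED = 3
NM_STATE_DISCONNECTED = 4


class FakeSignal(object):
    """Hypothetical stub for the object returned by connect_to_signal."""

    def __init__(self):
        self.removed = False

    def remove(self):
        self.removed = True


class StateSketch(object):
    """Reduced copy of the callback logic, with no DBus dependency."""

    def __init__(self, result_cb, signal):
        self.result_cb = result_cb
        self.state_signal = signal

    def call_result_cb(self, state):
        # Stop listening before reporting, as the original does.
        if self.state_signal:
            self.state_signal.remove()
        self.result_cb(state)

    def got_state(self, state):
        if state == NM_STATE_CONNECTED:
            self.call_result_cb(ONLINE)
        elif state == NM_STATE_CONNECTING:
            pass  # report nothing; wait for a StateChanged signal
        else:
            self.call_result_cb(OFFLINE)

    def state_changed(self, state):
        if int(state) == NM_STATE_CONNECTED:
            self.call_result_cb(ONLINE)
        elif int(state) == NM_STATE_DISCONNECTED:
            self.call_result_cb(OFFLINE)
        # any other value: keep waiting


results = []
signal = FakeSignal()
sketch = StateSketch(results.append, signal)
sketch.got_state(NM_STATE_CONNECTING)     # nothing reported yet
pending = list(results)                   # snapshot while still waiting
sketch.state_changed(NM_STATE_CONNECTED)  # now the callback fires
```

Note that `got_state` treats every value other than CONNECTED and CONNECTING as offline, while `state_changed` only reports on CONNECTED and DISCONNECTED and otherwise keeps waiting.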
=== added directory 'ubuntu_sso/networkstate/tests'
=== added file 'ubuntu_sso/networkstate/tests/__init__.py'
--- ubuntu_sso/networkstate/tests/__init__.py 1970-01-01 00:00:00 +0000
+++ ubuntu_sso/networkstate/tests/__init__.py 2011-03-23 14:22:30 +0000
@@ -0,0 +1,17 @@
1# -*- coding: utf-8 -*-
2# Author: Manuel de la Pena <manuel@canonical.com>
3#
4# Copyright 2011 Canonical Ltd.
5#
6# This program is free software: you can redistribute it and/or modify it
7# under the terms of the GNU General Public License version 3, as published
8# by the Free Software Foundation.
9#
10# This program is distributed in the hope that it will be useful, but
11# WITHOUT ANY WARRANTY; without even the implied warranties of
12# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
13# PURPOSE. See the GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License along
16# with this program. If not, see <http://www.gnu.org/licenses/>.
17"""Test the different networkstate implementations."""
018
=== added file 'ubuntu_sso/networkstate/tests/test_linux.py'
--- ubuntu_sso/networkstate/tests/test_linux.py 1970-01-01 00:00:00 +0000
+++ ubuntu_sso/networkstate/tests/test_linux.py 2011-03-23 14:22:30 +0000
@@ -0,0 +1,181 @@
1# -*- coding: utf-8 -*-
2#
3# test_networkstate - tests for ubuntu_sso.networkstate
4#
5# Author: Alejandro J. Cura <alecu@canonical.com>
6#
7# Copyright 2010 Canonical Ltd.
8#
9# This program is free software: you can redistribute it and/or modify it
10# under the terms of the GNU General Public License version 3, as published
11# by the Free Software Foundation.
12#
13# This program is distributed in the hope that it will be useful, but
14# WITHOUT ANY WARRANTY; without even the implied warranties of
15# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
16# PURPOSE. See the GNU General Public License for more details.
17#
18# You should have received a copy of the GNU General Public License along
19# with this program. If not, see <http://www.gnu.org/licenses/>.
20"""Tests for the network state detection code."""
21
22from ubuntu_sso.networkstate import (NetworkManagerState,
23 ONLINE, OFFLINE, UNKNOWN)
24from ubuntu_sso.networkstate.linux import (DBUS_UNKNOWN_SERVICE,
25 NM_STATE_DISCONNECTED,
26 NM_STATE_CONNECTING,
27 NM_STATE_CONNECTED)
28
29from mocker import ARGS, KWARGS, ANY, MockerTestCase
30
31
32class TestException(Exception):
33 """An exception to test error conditions."""
34 def get_dbus_name(self):
35 """A fake dbus name for this exception."""
36 return "Test Exception Message"
37
38
39class TestNmNotAvailableException(Exception):
40 """An exception to test unavailability conditions."""
41 def get_dbus_name(self):
42 """The real name of the dbus error when NM is not running."""
43 return DBUS_UNKNOWN_SERVICE
44
45
46class NetworkManagerStateTestCase(MockerTestCase):
47 """Test NetworkManager state retrieval code."""
48
49 def setUp(self):
50 """Setup the mocker dbus object tree."""
51 self.dbusmock = self.mocker.mock()
52 self.dbusmock.SystemBus()
53 sysbusmock = self.mocker.mock()
54 self.mocker.result(sysbusmock)
55
56 sysbusmock.get_object(ARGS, KWARGS)
57 proxymock = self.mocker.mock()
58 self.mocker.result(proxymock)
59
60 self.dbusmock.Interface(proxymock, ANY)
61 ifmock = self.mocker.mock()
62 self.mocker.result(ifmock)
63
64 ifmock.connect_to_signal(ARGS, KWARGS)
65 signalmock = self.mocker.mock()
66 self.mocker.result(signalmock)
67
68 proxymock.Get(ARGS, KWARGS)
69 signalmock.remove()
70
71 self.mocker.replay()
72
73 def test_nm_online(self):
74 """Check the connected case."""
75
76 def got_state_cb(state):
77 """State was given."""
78 self.assertEquals(state, ONLINE)
79
80 nms = NetworkManagerState(got_state_cb, self.dbusmock)
81 nms.find_online_state()
82 nms.got_state(NM_STATE_CONNECTED)
83
84 def test_nm_offline(self):
85 """Check the disconnected case."""
86
87 def got_state_cb(state):
88 """State was given."""
89 self.assertEquals(state, OFFLINE)
90
91 nms = NetworkManagerState(got_state_cb, self.dbusmock)
92 nms.find_online_state()
93 nms.got_state(NM_STATE_DISCONNECTED)
94
95 def test_nm_connecting_then_online(self):
96 """Check the waiting for connection case."""
97
98 def got_state_cb(state):
99 """State was given."""
100 self.assertEquals(state, ONLINE)
101
102 nms = NetworkManagerState(got_state_cb, self.dbusmock)
103 nms.find_online_state()
104 nms.got_state(NM_STATE_CONNECTING)
105 nms.state_changed(NM_STATE_CONNECTED)
106
107 def test_nm_connecting_then_offline(self):
108 """Check the waiting but fail case."""
109
110 def got_state_cb(state):
111 """State was given."""
112 self.assertEquals(state, OFFLINE)
113
114 nms = NetworkManagerState(got_state_cb, self.dbusmock)
115 nms.find_online_state()
116 nms.got_state(NM_STATE_CONNECTING)
117 nms.state_changed(NM_STATE_DISCONNECTED)
118
119
120class NetworkManagerStateErrorsTestCase(MockerTestCase):
121 """Test NetworkManager state retrieval code."""
122
123 # Statement seems to have no effect
124 # pylint: disable=W0104
125
126 def setUp(self):
127 """Setup the mocker dbus object tree."""
128 self.dbusmock = self.mocker.mock()
129 self.dbusmock.SystemBus()
130 self.sysbusmock = self.mocker.mock()
131 self.mocker.result(self.sysbusmock)
132 self.sysbusmock.get_object(ARGS, KWARGS)
133
134 def mock_except_while_getting_proxy(self, exc):
135 """Simulate an exception while getting the DBus proxy object."""
136 self.mocker.throw(exc)
137 self.dbusmock.exceptions.DBusException
138 self.mocker.result(exc)
139 self.mocker.replay()
140
141 def mock_dbus_error_while_getting_state(self, exc):
142 """Simulate an exception while getting the State."""
143 proxymock = self.mocker.mock()
144 self.mocker.result(proxymock)
145
146 self.dbusmock.Interface(proxymock, ANY)
147 ifmock = self.mocker.mock()
148 self.mocker.result(ifmock)
149
150 ifmock.connect_to_signal(ARGS, KWARGS)
151 signalmock = self.mocker.mock()
152 self.mocker.result(signalmock)
153
154 proxymock.Get(ARGS, KWARGS)
155 self.dbusmock.exceptions.DBusException
156 self.mocker.result(exc)
157 signalmock.remove()
158 self.mocker.replay()
159
160 def test_nm_not_running(self):
161 """Check the case when NM is not running."""
162
163 def got_state_cb(state):
164 """State was given."""
165 self.assertEquals(state, UNKNOWN)
166
167 self.mock_dbus_error_while_getting_state(TestNmNotAvailableException)
168 nms = NetworkManagerState(got_state_cb, self.dbusmock)
169 nms.find_online_state()
170 nms.got_error(TestNmNotAvailableException())
171
172 def test_dbus_problem(self):
173 """Check the case when DBus throws some other exception."""
174
175 def got_state_cb(state):
176 """State was given."""
177 self.assertEquals(state, UNKNOWN)
178
179 self.mock_except_while_getting_proxy(TestException)
180 nms = NetworkManagerState(got_state_cb, self.dbusmock)
181 nms.find_online_state()
0182
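The tests above record the expected DBus calls with `mocker` and then replay them. For readers without that library, roughly the same checks can be written with the standard library's `unittest.mock`, which is a different technique (call first, assert afterwards) and not what this branch uses. In the sketch below, `request_state` is a hypothetical, reduced stand-in for `find_online_state` so that the example runs without ubuntu_sso or dbus installed.

```python
from unittest import mock

NM_NAME = "org.freedesktop.NetworkManager"
NM_PATH = "/org/freedesktop/NetworkManager"


def request_state(dbus_module, reply_handler, error_handler):
    """Hypothetical reduced find_online_state: ask NM for its State."""
    try:
        sysbus = dbus_module.SystemBus()
        proxy = sysbus.get_object(NM_NAME, NM_PATH,
                                  follow_name_owner_changes=True)
        proxy.Get(NM_NAME, "State",
                  reply_handler=reply_handler,
                  error_handler=error_handler)
    except Exception as e:  # same broad catch as the original
        error_handler(e)


# Happy path: the proxy is asked for "State" with both handlers.
dbusmock = mock.Mock()
proxymock = dbusmock.SystemBus.return_value.get_object.return_value
on_reply, on_error = mock.Mock(), mock.Mock()
request_state(dbusmock, on_reply, on_error)
proxymock.Get.assert_called_once_with(
    NM_NAME, "State", reply_handler=on_reply, error_handler=on_error)

# Failure while getting the proxy: the error handler receives it.
failing = mock.Mock()
boom = RuntimeError("no bus")
failing.SystemBus.return_value.get_object.side_effect = boom
on_error2 = mock.Mock()
request_state(failing, mock.Mock(), on_error2)
on_error2.assert_called_once_with(boom)
```

As in the original tests, the fake `dbus` module is injected explicitly; in the real class that is the `dbus_module` constructor argument.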
=== added file 'ubuntu_sso/networkstate/tests/test_windows.py'
--- ubuntu_sso/networkstate/tests/test_windows.py 1970-01-01 00:00:00 +0000
+++ ubuntu_sso/networkstate/tests/test_windows.py 2011-03-23 14:22:30 +0000
@@ -0,0 +1,119 @@
1# -*- coding: utf-8 -*-
2#
3# Author: Manuel de la Pena <manuel@canonical.com>
4#
5# Copyright 2011 Canonical Ltd.
6#
7# This program is free software: you can redistribute it and/or modify it
8# under the terms of the GNU General Public License version 3, as published
9# by the Free Software Foundation.
10#
11# This program is distributed in the hope that it will be useful, but
12# WITHOUT ANY WARRANTY; without even the implied warranties of
13# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
14# PURPOSE. See the GNU General Public License for more details.
15#
16# You should have received a copy of the GNU General Public License along
17# with this program. If not, see <http://www.gnu.org/licenses/>.
18"""Tests for the network manager."""
19from mocker import MockerTestCase
20from ubuntu_sso.networkstate.windows import (
21 NetworkManager,
22 NetworkManagerState,
23 ONLINE,
24 OFFLINE)
25
26
27class TestNetworkManager(MockerTestCase):
28 """Test the Network Manager."""
29
30 def setUp(self):
31 super(TestNetworkManager, self).setUp()
32 self.connection_info = self.mocker.mock()
33 self.connection_no_info = self.mocker.mock()
34 self.disconnected = self.mocker.mock()
35 self.manager = NetworkManager(self.connection_no_info,
36 self.connection_info, self.disconnected)
37
38 def test_connection_made(self):
39 """Ensure cb is called."""
40 self.connection_info()
41 self.mocker.replay()
42 self.manager.ConnectionMade()
43
44 def test_connection_made_no_cb(self):
45 """Ensure nothing fails when no cb is set."""
46 self.manager.connected_cb_info = None
47 self.mocker.replay()
48 self.manager.ConnectionMade()
49
50 def test_connection_made_no_info(self):
51 """Ensure cb is called."""
52 self.connection_no_info()
53 self.mocker.replay()
54 self.manager.ConnectionMadeNoQOCInfo()
55
56 def test_connection_made_no_info_no_cb(self):
57 """Ensure nothing fails when no cb is set."""
58 self.manager.connected_cb = None
59 self.mocker.replay()
60 self.manager.ConnectionMadeNoQOCInfo()
61
62 def test_disconnection(self):
63 """Ensure cb is called."""
64 self.disconnected()
65 self.mocker.replay()
66 self.manager.ConnectionLost()
67
68 def test_disconnection_no_cb(self):
69 """Ensure nothing fails when no cb is set."""
70 self.manager.disconnected_cb = None
71 self.mocker.replay()
72 self.manager.ConnectionLost()
73
74
75class TestNetworkManagerState(MockerTestCase):
76 """Test the Network Manager State."""
77
78 def setUp(self):
79 super(TestNetworkManagerState, self).setUp()
80 self.network_manager = self.mocker.mock()
81 self.is_connected = self.mocker.replace(
82 'ubuntu_sso.networkstate.windows.is_machine_connected')
83 self.thread = self.mocker.mock()
84 self.cb = self.mocker.mock()
85 self.state = NetworkManagerState(self.cb)
86
87 def test_connection_made(self):
88 """Test that the cb is actually called."""
89 self.cb(ONLINE)
90 self.mocker.replay()
91 self.state.connection_made()
92
93 def test_connection_lost(self):
94 """Test that the cb is actually called."""
95 self.cb(OFFLINE)
96 self.mocker.replay()
97 self.state.connection_lost()
98
99 def test_find_online_state_not_connected(self):
100 """Test that we do find the online state correctly."""
101 self.is_connected()
102 self.mocker.result(False)
103 self.cb(OFFLINE)
104 self.mocker.result(self.thread)
105 self.thread.start()
106 self.mocker.replay()
107 self.state.find_online_state(listener=self.network_manager,
108 listener_thread=self.thread)
109
110 def test_find_online_state_connected(self):
111 """Test that we do find the online state correctly."""
112 self.is_connected()
113 self.mocker.result(True)
114 self.cb(ONLINE)
115 self.mocker.result(self.thread)
116 self.thread.start()
117 self.mocker.replay()
118 self.state.find_online_state(listener=self.network_manager,
119 listener_thread=self.thread)
0120
=== added file 'ubuntu_sso/networkstate/windows.py'
--- ubuntu_sso/networkstate/windows.py 1970-01-01 00:00:00 +0000
+++ ubuntu_sso/networkstate/windows.py 2011-03-23 14:22:30 +0000
@@ -0,0 +1,199 @@
1# -*- coding: utf-8 -*-
2# Author: Manuel de la Pena <manuel@canonical.com>
3#
4# Copyright 2011 Canonical Ltd.
5#
6# This program is free software: you can redistribute it and/or modify it
7# under the terms of the GNU General Public License version 3, as published
8# by the Free Software Foundation.
9#
10# This program is distributed in the hope that it will be useful, but
11# WITHOUT ANY WARRANTY; without even the implied warranties of
12# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
13# PURPOSE. See the GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License along
16# with this program. If not, see <http://www.gnu.org/licenses/>.
17"""Network status implementation on Windows."""
18
19
20# pylint: disable=F0401
21# distutils-extra: ignore-import=pythoncom,win32com.server.policy
22# distutils-extra: ignore-import=win32com.client
23import pythoncom
24# pylint: enable=F0401
25from ctypes import windll, byref
26from ctypes.wintypes import DWORD
27from threading import Thread
28# pylint: disable=F0401
29from win32com.server.policy import DesignatedWrapPolicy
30from win32com.client import Dispatch
31# pylint: enable=F0401
32
33from ubuntu_sso.logger import setup_logging
34
35logger = setup_logging("ubuntu_sso.networkstate")
36
37# naming errors are deliberate because we are following the COM naming to make
38# it clear for later developers.
39# pylint: disable=C0103
40
41# Values returned by the callback
42ONLINE, OFFLINE, UNKNOWN = object(), object(), object()
43
44## from EventSys.h
45PROGID_EventSystem = "EventSystem.EventSystem"
46PROGID_EventSubscription = "EventSystem.EventSubscription"
47
48# SENS (System Event Notification Service) values for the events.
49# Each entry holds the uuid of the event, the name of the event to be used,
50# and the name of the method in the ISensNetwork interface that
51# will be executed for the event.
52# For more info look at:
53# http://msdn.microsoft.com/en-us/library/aa377384(v=vs.85).aspx
54
55SUBSCRIPTION_NETALIVE = ('{cd1dcbd6-a14d-4823-a0d2-8473afde360f}',
56 'UbuntuOne Network Alive',
57 'ConnectionMade')
58
59SUBSCRIPTION_NETALIVE_NOQOC = ('{a82f0e80-1305-400c-ba56-375ae04264a1}',
60 'UbuntuOne Net Alive No Info',
61 'ConnectionMadeNoQOCInfo')
62
63SUBSCRIPTION_NETLOST = ('{45233130-b6c3-44fb-a6af-487c47cee611}',
64 'UbuntuOne Network Lost',
65 'ConnectionLost')
66
67SUBSCRIPTION_REACH = ('{4c6b2afa-3235-4185-8558-57a7a922ac7b}',
68 'UbuntuOne Network Reach',
69 'ConnectionMade')
70
71SUBSCRIPTION_REACH_NOQOC = ('{db62fa23-4c3e-47a3-aef2-b843016177cf}',
72 'UbuntuOne Network Reach No Info',
73 'ConnectionMadeNoQOCInfo')
74
75SUBSCRIPTION_REACH_NOQOC2 = ('{d4d8097a-60c6-440d-a6da-918b619ae4b7}',
76 'UbuntuOne Network Reach No Info 2',
77 'ConnectionMadeNoQOCInfo')
78
79SUBSCRIPTIONS = [SUBSCRIPTION_NETALIVE,
80 SUBSCRIPTION_NETALIVE_NOQOC,
81 SUBSCRIPTION_NETLOST,
82 SUBSCRIPTION_REACH,
83 SUBSCRIPTION_REACH_NOQOC,
84 SUBSCRIPTION_REACH_NOQOC2]
85
86SENSGUID_EVENTCLASS_NETWORK = '{d5978620-5b9f-11d1-8dd2-00aa004abd5e}'
87SENSGUID_PUBLISHER = "{5fee1bd6-5b9b-11d1-8dd2-00aa004abd5e}"
88
89# uuid of the implemented com interface
90IID_ISesNetwork = '{d597bab1-5b9f-11d1-8dd2-00aa004abd5e}'
91
92
93class NetworkManager(DesignatedWrapPolicy):
94 """Implement ISensNetwork to know about the network status."""
95
96 _com_interfaces_ = [IID_ISesNetwork]
97 _public_methods_ = ['ConnectionMade',
98 'ConnectionMadeNoQOCInfo',
99 'ConnectionLost']
100 _reg_clsid_ = '{41B032DA-86B5-4907-A7F7-958E59333010}'
101 _reg_progid_ = "UbuntuOne.NetworkManager"
102
103 def __init__(self, connected_cb=None, connected_cb_info=None,
104 disconnected_cb=None):
105 # pylint: disable=E1101
106 self._wrap_(self)
107 # pylint: enable=E1101
108 self.connected_cb = connected_cb
109 self.connected_cb_info = connected_cb_info
110 self.disconnected_cb = disconnected_cb
111
112 def ConnectionMade(self, *args):
113 """Tell that the connection is up again."""
114 logger.info('Connection was made.')
115 if self.connected_cb_info:
116 self.connected_cb_info()
117
118 def ConnectionMadeNoQOCInfo(self, *args):
119 """Tell that the connection is up again."""
120 logger.info('Connection was made no info.')
121 if self.connected_cb:
122 self.connected_cb()
123
124 def ConnectionLost(self, *args):
125 """Tell the connection was lost."""
126 logger.info('Connection was lost.')
127 if self.disconnected_cb:
128 self.disconnected_cb()
129
130 def register(self):
131 """Register to listen to network events."""
132 # call the CoInitialize to allow the registration to run in another
133 # thread
134 pythoncom.CoInitialize()
135 # interface to be used by com
136 manager_interface = pythoncom.WrapObject(self)
137 event_system = Dispatch(PROGID_EventSystem)
138 # register to listen to each of the events to make sure that
139 # the code will work on all platforms.
140 for current_event in SUBSCRIPTIONS:
141 # create an event subscription and add it to the event
142 # service
143 event_subscription = Dispatch(PROGID_EventSubscription)
144 event_subscription.EventClassId = SENSGUID_EVENTCLASS_NETWORK
145 event_subscription.PublisherID = SENSGUID_PUBLISHER
146 event_subscription.SubscriptionID = current_event[0]
147 event_subscription.SubscriptionName = current_event[1]
148 event_subscription.MethodName = current_event[2]
149 event_subscription.SubscriberInterface = manager_interface
150 event_subscription.PerUser = True
151 # store the event
152 try:
153 event_system.Store(PROGID_EventSubscription,
154 event_subscription)
155 except pythoncom.com_error as e:
156 logger.error(
157 'Error registering to event %s: %s', current_event[1], e)
158
159 pythoncom.PumpMessages()
160
161
162def is_machine_connected():
163 """Return whether the machine is connected to the internet."""
164 wininet = windll.wininet
165 flags = DWORD()
166 connected = wininet.InternetGetConnectedState(byref(flags), None)
167 return connected == 1
168
169
170class NetworkManagerState(object):
171 """Check for status changes in the network on Windows."""
172
173 def __init__(self, result_cb, **kwargs):
174 """Initialize this instance with a result callback."""
175 self.result_cb = result_cb
176
177 def connection_made(self):
178 """Report that the connection was made through the callback."""
179 self.result_cb(ONLINE)
180
181 def connection_lost(self):
182 """Report that the connection was lost through the callback."""
183 self.result_cb(OFFLINE)
184
The diff has been truncated for viewing.
