Merge lp:~chipaca/ubuntuone-client/multicon into lp:ubuntuone-client

Proposed by John Lenton
Status: Merged
Approved by: dobey
Approved revision: 202
Merged at revision: not available
Proposed branch: lp:~chipaca/ubuntuone-client/multicon
Merge into: lp:ubuntuone-client
Diff against target: None lines
To merge this branch: bzr merge lp:~chipaca/ubuntuone-client/multicon
Reviewer | Review Type | Date Requested | Status
Rick McBride (community) | | | Approve
dobey (community) | | | Approve
Nicola Larosa (community) | | | Approve
Review via email: mp+10998@code.launchpad.net

Commit message

detect and avoid multiple connections that break the single-minded state machine

To post a comment you must log in.
Revision history for this message
John Lenton (chipaca) wrote :

This should avoid the multiple simultaneous live connections issue that is breaking things for a lot of people.

202. By John Lenton

moved server rescan to action queue too, for symmetry

Revision history for this message
Nicola Larosa (teknico) wrote :

Code looks good, tests pass.

review: Approve
Revision history for this message
dobey (dobey) :
review: Approve
Revision history for this message
Rick McBride (rmcbride) wrote :

Looks good

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'ubuntuone/syncdaemon/action_queue.py'
2--- ubuntuone/syncdaemon/action_queue.py 2009-08-31 15:41:20 +0000
3+++ ubuntuone/syncdaemon/action_queue.py 2009-09-01 18:51:00 +0000
4@@ -28,6 +28,7 @@
5 import os
6 import random
7 import tempfile
8+import traceback
9 import zlib
10
11 from zope.interface import implements
12@@ -213,7 +214,7 @@
13 might not be described in failure).
14 """
15 logger.warning('connection lost: %s' % reason.getErrorMessage())
16- self.factory.event_queue.push('SYS_CONNECTION_LOST')
17+ self.factory.connectionLost(self, reason)
18 LoggingStorageClient.connectionLost(self, reason)
19
20
21@@ -721,6 +722,14 @@
22
23 event_queue.subscribe(self)
24
25+ def connectionLost(self, protocol, reason):
26+ """
27+ The connection went down, for some reason (which might or
28+ might not be described in reason).
29+ """
30+ if protocol is self.client:
31+ self.event_queue.push('SYS_CONNECTION_LOST')
32+
33 def check_conditions(self):
34 """Poll conditions on which running actions may be waiting."""
35 self.content_queue.check_conditions()
36@@ -837,6 +846,7 @@
37 """
38 Start the circus going.
39 """
40+ self.client = None
41 self.deferred = defer.Deferred()
42 d = self._lookup_srv()
43 def _connect(result):
44@@ -850,12 +860,116 @@
45 d.addCallback(_connect)
46 return self.deferred
47
48+ def disconnect(self):
49+ """
50+ Shut down the client, if it isn't already.
51+ """
52+ if self.client is not None:
53+ self.client.disconnect()
54+ self.client = None
55+
56 def conectionFailed(self, reason=None):
57 """
58 Called when the connect() call fails
59 """
60 self.deferred.errback(reason)
61
62+ @defer.inlineCallbacks
63+ def check_version(self):
64+ """
65+ Check the client protocol version matches that of the
66+ server. Call callback on success, errback on failure.
67+ """
68+ # if the client changes while we're waiting, this message is
69+ # old news and should be discarded (the message would
70+ # typically be a failure: timeout or disconnect). So keep the
71+ # original client around for comparison.
72+ client = self.client
73+ try:
74+ yield client.protocol_version()
75+ except Exception, e:
76+ if client is self.client:
77+ logger.error("Protocol version error")
78+ logger.debug('traceback follows:\n\n' + traceback.format_exc())
79+ if e.message == 'UNSUPPORTED_VERSION':
80+ self.event_queue.push('SYS_PROTOCOL_VERSION_ERROR',
81+ error=e.message)
82+ else:
83+ self.event_queue.push('SYS_UNKNOWN_ERROR')
84+ # it looks like we won't be authenticating, so hook up the
85+ # for-testing deferred now
86+ self.deferred.callback(Failure(e))
87+ else:
88+ if client is self.client:
89+ logger.info("Protocol version OK")
90+ self.event_queue.push('SYS_PROTOCOL_VERSION_OK')
91+
92+ @defer.inlineCallbacks
93+ def set_capabilities(self, caps):
94+ """Set the capabilities with the server"""
95+ client = self.client
96+ is_failed = None
97+ try:
98+ req = (yield client.query_caps(caps))
99+ except Exception, e:
100+ is_failed = e
101+ else:
102+ if not req.accepted:
103+ is_failed = StandardError("The server doesn't have"
104+ " the requested capabilities")
105+
106+ if client is self.client:
107+ if is_failed is not None:
108+ logger.error("Capabilities query failed: %s" % is_failed)
109+ # Push the error to the event queue,
110+ self.event_queue.push('SYS_SET_CAPABILITIES_ERROR',
111+ error=is_failed.message)
112+ # it looks like we won't be authenticating, so hook up the
113+ # for-testing deferred now
114+ self.deferred.callback(Failure(is_failed))
115+ return
116+
117+ is_failed = None
118+ try:
119+ req = (yield client.set_caps(caps))
120+ except Exception, e:
121+ if client is self.client:
122+ is_failed = e
123+ else:
124+ if client is self.client:
125+ if req.accepted:
126+ self.event_queue.push('SYS_SET_CAPABILITIES_OK')
127+ defer.returnValue(self.client)
128+ else:
129+ is_failed = StandardError("The server denied setting"
130+ " %s capabilities" % req.caps)
131+ if is_failed is not None:
132+ logger.error("Capabilities set failed: %s" % is_failed)
133+ # Push the error to the event queue,
134+ self.event_queue.push('SYS_SET_CAPABILITIES_ERROR',
135+ error=is_failed.message)
136+ # it looks like we won't be authenticating, so hook up the
137+ # for-testing deferred now
138+ self.deferred.callback(Failure(is_failed))
139+
140+ @defer.inlineCallbacks
141+ def authenticate(self, oauth_consumer):
142+ client = self.client
143+ try:
144+ yield client.oauth_authenticate(oauth_consumer, self.token)
145+ except Exception, e:
146+ if client is not self.client:
147+ return
148+ logger.error("OAuth failed: %s", e.message)
149+ self.event_queue.push('SYS_OAUTH_ERROR', error=e.message)
150+ self.deferred.callback(Failure(e))
151+ else:
152+ if client is not self.client:
153+ return
154+ logger.info("Oauth OK")
155+ self.event_queue.push('SYS_OAUTH_OK')
156+ self.deferred.callback(client)
157+
158 def get_root(self, marker):
159 """
160 Get the user's root uuid. Use the uuid_map, so the caller can
161
162=== modified file 'ubuntuone/syncdaemon/main.py'
163--- ubuntuone/syncdaemon/main.py 2009-08-27 18:44:53 +0000
164+++ ubuntuone/syncdaemon/main.py 2009-09-01 18:41:31 +0000
165@@ -252,106 +252,28 @@
166 """
167 Check the client protocol version matches that of the server.
168 """
169- d = self.action_q.client.protocol_version()
170- def protocol_callback(_):
171- """protocol check was OK"""
172- self.logger.info("Protocol version OK")
173- self.event_q.push('SYS_PROTOCOL_VERSION_OK')
174- def protocol_errback(failure):
175- """protocol check was *not* OK"""
176- self.logger.error("Protocol version error")
177- self.logger.debug('traceback follows:\n\n' + failure.getTraceback())
178- if failure.value.message == 'UNSUPPORTED_VERSION':
179- self.event_q.push('SYS_PROTOCOL_VERSION_ERROR',
180- error=failure.getErrorMessage())
181- else:
182- self.event_q.push('SYS_UNKNOWN_ERROR')
183- # it looks like we won't be authenticating, so hook up the
184- # for-testing action_queue deferred now
185- d.chainDeferred(self.action_q.deferred)
186- return failure
187- d.addCallbacks(protocol_callback, protocol_errback)
188+ self.action_q.check_version()
189
190 def authenticate(self):
191 """
192 Do the OAuth dance.
193 """
194- d = self.action_q.client.oauth_authenticate(self.oauth_client.consumer,
195- self.action_q.token)
196- def oauth_errback(failure):
197- "OAuth failed"
198- self.logger.error("OAuth failed: %s", failure)
199- self.event_q.push('SYS_OAUTH_ERROR', error=failure)
200- return failure
201- def oauth_callback(a):
202- "OAuth succeeded"
203- self.logger.info("Oauth OK")
204- self.event_q.push('SYS_OAUTH_OK')
205- return self.action_q.client
206- d.addCallbacks(oauth_callback, oauth_errback)
207- d.chainDeferred(self.action_q.deferred)
208+ self.action_q.authenticate(self.oauth_client.consumer)
209
210 def set_capabilities(self):
211 """Set the capabilities with the server"""
212 self.logger.debug("capabilities query: %r", syncdaemon.REQUIRED_CAPS)
213- d = self.action_q.client.query_caps(syncdaemon.REQUIRED_CAPS)
214-
215- def set_caps_callback(req):
216- "Caps query succeeded"
217- if req.accepted:
218- self.event_q.push('SYS_SET_CAPABILITIES_OK')
219- return self.action_q.client
220- else:
221- de = defer.fail("The server denied setting %s capabilities" % \
222- req.caps)
223- de.addErrback(set_caps_errback)
224- return de
225-
226- def set_caps_errback(failure):
227- "Caps set failed"
228- self.logger.error("Capabilities set failed: %s", failure)
229- self.event_q.push('SYS_SET_CAPABILITIES_ERROR',
230- error=failure.getErrorMessage())
231- # it looks like we won't be authenticating, so hook up the
232- # for-testing action_queue deferred now
233- d.chainDeferred(self.action_q.deferred)
234- return failure
235-
236- def query_caps_callback(req):
237- "Caps query succeeded"
238- self.logger.debug("Capabilities query accepted: %r", req.accepted)
239- if req.accepted:
240- set_d = self.action_q.client.set_caps(syncdaemon.REQUIRED_CAPS)
241- set_d.addCallbacks(set_caps_callback, set_caps_errback)
242- return set_d
243- else:
244- # the server don't have the requested capabilities.
245- # Add the query_errback and return a failure,
246- # in the future we might want to reconnect to another server
247- de = defer.fail("The server don't have the requested"
248- " capabilities")
249- de.addErrback(query_caps_errback)
250- return de
251-
252- def query_caps_errback(failure):
253- "Caps query failed"
254- self.logger.error("Capabilities query failed: %s", failure)
255- # Push the error to the event queue,
256- self.event_q.push('SYS_SET_CAPABILITIES_ERROR',
257- error=failure.getErrorMessage())
258- # it looks like we won't be authenticating, so hook up the
259- # for-testing action_queue deferred now
260- d.chainDeferred(self.action_q.deferred)
261- return failure
262-
263- d.addCallbacks(query_caps_callback, query_caps_errback)
264+ self.action_q.set_capabilities(syncdaemon.REQUIRED_CAPS)
265
266 @defer.inlineCallbacks
267 def server_rescan(self):
268 """
269 Do the server rescan
270 """
271+ client = self.action_q.client
272 yield self.get_root(object())
273+ if client is not self.action_q.client:
274+ return
275 self.event_q.push('SYS_SERVER_RESCAN_STARTING')
276 data = self.fs.get_data_for_server_rescan()
277 self.logger.info("server rescan: will query %d objects" % len(data))
278@@ -362,6 +284,8 @@
279 % (share or '/root/', node, hash))
280 self.logger.debug("server rescan: all data shown")
281 yield self.action_q.client.query(data)
282+ if client is not self.action_q.client:
283+ return
284 self.logger.info("server rescan: done")
285 self.event_q.push('SYS_SERVER_RESCAN_DONE')
286
287
288=== modified file 'ubuntuone/syncdaemon/state.py'
289--- ubuntuone/syncdaemon/state.py 2009-08-21 13:44:34 +0000
290+++ ubuntuone/syncdaemon/state.py 2009-08-28 20:46:35 +0000
291@@ -95,9 +95,7 @@
292 """
293 self.watchdog = None
294 self.num_timeouts += 1
295- client = self.main.action_q.client
296- if client is not None:
297- client.disconnect()
298+ self.main.action_q.disconnect()
299 self.main.event_q.push('SYS_HANDSHAKE_TIMEOUT')
300
301 @property

Subscribers

People subscribed via source and target branches