Merge lp:~facundo/ubuntuone-client/aqc_errors into lp:ubuntuone-client
Status: Merged
Approved by: Facundo Batista
Approved revision: 484
Merged at revision: not available
Proposed branch: lp:~facundo/ubuntuone-client/aqc_errors
Merge into: lp:ubuntuone-client
Diff against target: 612 lines (+209/-118), 2 files modified:
  tests/syncdaemon/test_action_queue.py (+141/-9)
  ubuntuone/syncdaemon/action_queue.py (+68/-109)
To merge this branch: bzr merge lp:~facundo/ubuntuone-client/aqc_errors
Related bugs: (none)
Reviewers:
  John Lenton (community): Approve
  Natalia Bidart (community): Approve
Review via email: mp+23132@code.launchpad.net
Commit message
Make ActionQueueCommand handle errors by their type, not their message.
Description of the change
Make ActionQueueCommand handle errors by their type, not their message.
The same for a couple of specific commands.
Also fixed a detail that made AQC *never* retry an operation on a retryable error.
Tests included.
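The gist of the change, for reference: instead of comparing failure.getErrorMessage() against sets of strings, ActionQueueCommand now keeps tuples of exception classes and asks twisted's failure.check() which one matched; and because cleanup() clears the running flag, the retry decision snapshots it first (the detail that previously made retries never happen). A condensed, self-contained sketch of the pattern follows; Command and TryAgainError here are illustrative stand-ins for this description, not the real syncdaemon classes:

    from twisted.internet import reactor
    from twisted.internet import error as twisted_errors
    from twisted.python.failure import Failure

    class TryAgainError(Exception):
        """Illustrative stand-in for the protocol's TryAgainError."""

    class Command(object):
        """Condensed sketch of the new ActionQueueCommand error handling."""

        # exception classes, not message strings
        retryable_errors = (TryAgainError,
                            twisted_errors.ConnectionDone,
                            twisted_errors.ConnectionLost)

        def __init__(self):
            self.running = True

        def cleanup(self):
            """As in the real command, cleanup clears the running flag."""
            self.running = False

        def retry(self):
            print "retrying"

        def handle_failure(self, failure):
            print "failed:", failure.getErrorMessage()

        def end_errback(self, failure):
            """Retry or fail according to the error *type*."""
            was_running = self.running  # snapshot before cleanup() clears it
            self.cleanup()
            if failure.check(*self.retryable_errors):
                if was_running:
                    reactor.callLater(0.1, self.retry)
            else:
                return self.handle_failure(failure)

    cmd = Command()
    cmd.end_errback(Failure(TryAgainError("TRY_AGAIN")))  # schedules a retry

Matching by class also means the handling survives changes to the message text in twisted or the protocol library, which is what made the old string sets brittle.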
John Lenton (chipaca) wrote:
Is this a fix for something that is going into Lucid? If so, please link it. If not, I'm afraid it would be a lot easier if we merge it after the freeze.
John Lenton (chipaca) wrote:
OK, brown paper bag for me. I promise I *looked* for the bugs.
Getting more coffee, now.
John Lenton (chipaca) wrote:
The failure.check thing is much better than what it was doing before. So obviously the right thing to do, now that I see it :)
The change from strings to actual classes is also great.
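For reference, Failure.check() takes exception classes, matches the wrapped exception by class (subclasses included), and returns the matching class or None, so it doubles as a boolean test. A tiny illustration, assuming a Twisted where ConnectionDone and ConnectionLost derive from ConnectionClosed:

    from twisted.internet import error as twisted_errors
    from twisted.python.failure import Failure

    f = Failure(twisted_errors.ConnectionDone())

    print f.check(twisted_errors.ConnectionLost)    # no match: None
    print f.check(twisted_errors.ConnectionDone)    # exact match
    print f.check(twisted_errors.ConnectionClosed)  # base class also matches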
dobey (dobey) wrote:
Attempt to merge lp:~facundo/ubuntuone-client/aqc_errors into lp:ubuntuone-client failed due to merge conflicts:
text conflict in tests/syncdaemo
text conflict in ubuntuone/
Facundo Batista (facundo) wrote:
Solved the conflicts!
John Lenton (chipaca) wrote:
The attempt to merge lp:~facundo/ubuntuone-client/aqc_errors into lp:ubuntuone-client failed. Below is the output from the failed tests.
/usr/bin/
checking for autoconf >= 2.53...
  testing autoconf2.50... not found.
  testing autoconf... found 2.65
checking for automake >= 1.10...
  testing automake-1.11... found 1.11.1
checking for libtool >= 1.5...
  testing libtoolize... found 2.2.6b
checking for intltool >= 0.30...
  testing intltoolize... found 0.41.0
checking for pkg-config >= 0.14.0...
  testing pkg-config... found 0.22
Checking for required M4 macros...
Checking for forbidden M4 macros...
Running libtoolize...
libtoolize: putting auxiliary files in `.'.
libtoolize: copying file `./ltmain.sh'
libtoolize: putting macros in AC_CONFIG_
libtoolize: copying file `m4/libtool.m4'
libtoolize: copying file `m4/ltoptions.m4'
libtoolize: copying file `m4/ltsugar.m4'
libtoolize: copying file `m4/ltversion.m4'
libtoolize: copying file `m4/lt~obsolete.m4'
Running intltoolize...
Running aclocal-1.11...
Running autoconf...
Running autoheader...
Running automake-1.11...
Running ./configure --prefix=/usr ...
checking for a BSD-compatible install... /usr/bin/install -c
checking whether build environment is sane... yes
checking for a thread-safe mkdir -p... /bin/mkdir -p
checking for gawk... gawk
checking whether make sets $(MAKE)... yes
checking for style of include used by make... GNU
checking for gcc... gcc
checking whether the C compiler works... yes
checking for C compiler default output file name... a.out
checking for suffix of executables...
checking whether we are cross compiling... no
checking for suffix of object files... o
checking whether we are using the GNU C compiler... yes
checking whether gcc accepts -g... yes
checking for gcc option to accept ISO C89... none needed
checking dependency style of gcc... gcc3
checking for library containing strerror... none required
checking for gcc... (cached) gcc
checking whether we are using the GNU C compiler... (cached) yes
checking whether gcc accepts -g... (cached) yes
checking for gcc option to accept ISO C89... (cached) none needed
checking dependency style of gcc... (cached) gcc3
checking build system type... x86_64-
checking host system type... x86_64-
checking for a sed that does not truncate output... /bin/sed
checking for grep that handles long lines and -e... /bin/grep
checking for egrep... /bin/grep -E
checking for fgrep... /bin/grep -F
checking for ld used by gcc... /usr/bin/ld
checking if the linker (/usr/bin/ld) is GNU ld... yes
checking for BSD- or MS-compatible name lister (nm)... /usr/bin/nm -B
checking the name lister (/usr/bin/nm -B) interface... BSD nm
checking whether ln -s works... yes
checking the maximum length of command line arguments... 1572864
checking whether the shell understands some XSI constructs... yes
checking whether the shell understands "+="... yes
checking for /usr/bin/ld option to reload object files... -r
checking for objdu...
Facundo Batista (facundo) wrote:
It seems there's a problem in your setup there, as that's where the errors are.
I'm setting this back to Approved, for someone else to run it. Chipaca, if you see this message, let's talk :)
John Lenton (chipaca) wrote:
On Mon, Apr 12, 2010 at 05:05:46PM -0000, Facundo Batista wrote:
> It seems there's a problem in your setup there, as that's where the errors are.
>
> I'm setting this back to Approved, for someone else to run it. Chipaca, if you see this message, let's talk :)
sure thing
Preview Diff
=== modified file 'tests/syncdaemon/test_action_queue.py'
--- tests/syncdaemon/test_action_queue.py	2010-04-12 13:19:40 +0000
+++ tests/syncdaemon/test_action_queue.py	2010-04-12 16:06:36 +0000
@@ -33,6 +33,7 @@
 from functools import wraps
 from StringIO import StringIO
 from twisted.internet import defer, threads, reactor
+from twisted.internet import error as twisted_error
 from twisted.python.failure import DefaultException, Failure
 from twisted.web import server

@@ -784,6 +785,113 @@
         self.assertTrue(res is None)


+class ActionQueueCommandErrors(ConnectedBaseTestCase):
+    """Test the error handling in ActionQueueCommand."""
+
+    def setUp(self):
+        res = super(ActionQueueCommandErrors, self).setUp()
+
+        self.deferred = defer.Deferred()
+
+        class MyLogger(object):
+            """Fake logger that just stores error and warning calls."""
+            def __init__(self):
+                self.logged = None
+
+            def error(self, *a):
+                """Mark that this method was called."""
+                self.logged = "error"
+
+            def warn(self, *a):
+                """Mark that this method was called."""
+                self.logged = "warn"
+
+            def debug(self, *a):
+                """Nothing."""
+
+        class MyCommand(ActionQueueCommand):
+            """Inherit ACQ to provide a retry signaller and a custom log."""
+            # class-closure, cannot use self, pylint: disable-msg=E0213
+            def __init__(innerself, request_queue):
+                super(MyCommand, innerself).__init__(request_queue)
+                innerself.log = MyLogger()
+
+            def retry(innerself):
+                """Signal the retry."""
+                self.deferred.callback(True)
+
+        self.rq = RequestQueue(name='foo', action_queue=self.action_queue)
+        self.command = MyCommand(self.rq)
+        return res
+
+    def test_suppressed_yes_knownerrors(self):
+        """Check that the log is in warning for the known errors."""
+        def send_failure_and_check(errnum, exception_class):
+            """Send the failure."""
+            # prepare what to send
+            protocol_msg = protocol_pb2.Message()
+            protocol_msg.type = protocol_pb2.Message.ERROR
+            protocol_msg.error.type = errnum
+            err = exception_class("request", protocol_msg)
+
+            # set up and test
+            self.command.log.logged = None
+            self.command.end_errback(failure=Failure(err))
+            self.assertEqual(self.command.log.logged, "warn",
+                             "Bad log in exception %s" % (exception_class,))
+
+        known_errors = [x for x in errors._error_mapping.items()
+                        if x[1] != errors.InternalError]
+        for errnum, exception_class in known_errors:
+            send_failure_and_check(errnum, exception_class)
+
+    def test_suppressed_no_internalerror(self):
+        """Check that the log is in error for InternalError."""
+        # prepare what to send
+        protocol_msg = protocol_pb2.Message()
+        protocol_msg.type = protocol_pb2.Message.ERROR
+        protocol_msg.error.type = protocol_pb2.Error.INTERNAL_ERROR
+        err = errors.InternalError("request", protocol_msg)
+
+        # set up and test
+        self.command.end_errback(failure=Failure(err))
+        self.assertEqual(self.command.log.logged, "error")
+
+    def test_suppressed_yes_cancelled(self):
+        """Check that the log is in warning for Cancelled."""
+        err = errors.RequestCancelledError("CANCELLED")
+        self.command.end_errback(failure=Failure(err))
+        self.assertEqual(self.command.log.logged, "warn")
+
+    def test_suppressed_yes_and_retry_when_connectiondone(self):
+        """Check that the log is in warning and retries for ConnectionDone."""
+        self.command.running = True
+        err = twisted_error.ConnectionDone()
+        self.command.end_errback(failure=Failure(err))
+        self.assertEqual(self.command.log.logged, "warn")
+        return self.deferred
+
+    def test_retry_connectionlost(self):
+        """Check that it retries when ConnectionLost."""
+        self.command.running = True
+        err = twisted_error.ConnectionLost()
+        self.command.end_errback(failure=Failure(err))
+        return self.deferred
+
+    def test_retry_tryagain(self):
+        """Check that it retries when TryAgain."""
+        # prepare what to send
+        self.command.running = True
+        protocol_msg = protocol_pb2.Message()
+        protocol_msg.type = protocol_pb2.Message.ERROR
+        protocol_msg.error.type = protocol_pb2.Error.TRY_AGAIN
+        err = errors.TryAgainError("request", protocol_msg)
+
+        # set up and test
+        self.command.end_errback(failure=Failure(err))
+        return self.deferred
+
+
 class ListVolumesTestCase(ConnectedBaseTestCase):
     """Test for ListVolumes ActionQueueCommand."""

@@ -1171,9 +1279,8 @@

     def test_failure_with_CANCELLED(self):
         """AQ_DOWNLOAD_CANCELLED is pushed."""
-        msg = 'CANCELLED'
-        failure = Failure(DefaultException(msg))
-        res = self.command.handle_failure(failure=failure)
+        err = errors.RequestCancelledError("CANCELLED")
+        res = self.command.handle_failure(failure=Failure(err))
         kwargs = dict(share_id='a_share_id', node_id='a_node_id',
                       server_hash='a_server_hash')
         events = [('AQ_DOWNLOAD_CANCELLED', (), kwargs)]
@@ -1193,9 +1300,11 @@

     def test_failure_with_DOES_NOT_EXIST(self):
         """AQ_DOWNLOAD_DOES_NOT_EXIST is pushed."""
-        msg = 'DOES_NOT_EXIST'
-        failure = Failure(DefaultException(msg))
-        res = self.command.handle_failure(failure=failure)
+        protocol_msg = protocol_pb2.Message()
+        protocol_msg.type = protocol_pb2.Message.ERROR
+        protocol_msg.error.type = protocol_pb2.Error.DOES_NOT_EXIST
+        err = errors.DoesNotExistError("request", protocol_msg)
+        res = self.command.handle_failure(failure=Failure(err))
         kwargs = dict(share_id='a_share_id', node_id='a_node_id')
         events = [('AQ_DOWNLOAD_DOES_NOT_EXIST', (), kwargs)]
         self.assertEquals(events, self.command.action_queue.event_queue.events)
@@ -1224,9 +1333,10 @@

     def setUp(self):
         """Init."""
-        res = super(UploadTestCase, self).setUp()
+        super(UploadTestCase, self).setUp()

-        request_queue = RequestQueue(name='FOO', action_queue=self.action_queue)
+        self.rq = request_queue = RequestQueue(name='FOO',
+                                               action_queue=self.action_queue)
         self.command = Upload(request_queue, share_id='a_share_id',
                               node_id='a_node_id', previous_hash='prev_hash',
                               hash='yadda', crc32=0, size=0,
@@ -1234,7 +1344,29 @@
                               tempfile_factory=lambda: None)
         self.command.start_unqueued()  # create the logger

-        return res
+    def test_upload_in_progress(self):
+        """Test Upload retries on UploadInProgress."""
+        # monkeypatching is not allowed, let's do inheritance
+        d = defer.Deferred()
+        class MyUpload(Upload):
+            """Just to redefine retry."""
+            def retry(self):
+                """Detect retry was called."""
+                d.callback(True)
+
+        # set up the command
+        command = MyUpload(self.rq, 'share', 'bode', 'prvhash', 'currhash',
+                           0, 0, lambda: None, lambda: None)
+        command.start_unqueued()  # create log in the instance
+        command.running = True
+
+        # send the failure
+        protocol_msg = protocol_pb2.Message()
+        protocol_msg.type = protocol_pb2.Message.ERROR
+        protocol_msg.error.type = protocol_pb2.Error.UPLOAD_IN_PROGRESS
+        err = errors.UploadInProgressError("request", protocol_msg)
+        command.end_errback(failure=Failure(err))
+        return d

     def test_handle_success_push_event(self):
         """Test AQ_UPLOAD_FINISHED is pushed on success."""

=== modified file 'ubuntuone/syncdaemon/action_queue.py'
--- ubuntuone/syncdaemon/action_queue.py	2010-04-12 13:19:40 +0000
+++ ubuntuone/syncdaemon/action_queue.py	2010-04-12 16:06:36 +0000
@@ -44,11 +44,13 @@

 from zope.interface import implements
 from twisted.internet import reactor, defer, threads
+from twisted.internet import error as twisted_errors
 from twisted.names import client as dns_client
 from twisted.python.failure import Failure, DefaultException

 from oauth import oauth
-from ubuntuone.storageprotocol import errors, protocol_pb2
+from ubuntuone.storageprotocol import protocol_pb2
+from ubuntuone.storageprotocol import errors as protocol_errors
 from ubuntuone.storageprotocol.client import (
     ThrottlingStorageClient, ThrottlingStorageClientFactory
 )
@@ -913,12 +915,12 @@
                 return
             except request_error, failure:
                 self.event_queue.push(event_error, error=str(failure))
-            except errors.AuthenticationRequiredError, failure:
+            except protocol_errors.AuthenticationRequiredError, failure:
                 # we need to separate this case from the rest because an
                 # AuthenticationRequiredError is an StorageRequestError,
                 # and we treat it differently.
                 self.event_queue.push('SYS_UNKNOWN_ERROR')
-            except errors.StorageRequestError, failure:
+            except protocol_errors.StorageRequestError, failure:
                 self.event_queue.push('SYS_SERVER_ERROR', error=str(failure))
             except Exception, failure:
                 self.event_queue.push('SYS_UNKNOWN_ERROR')
@@ -940,7 +942,7 @@
         """Check if the client protocol version matches that of the server."""
         check_version_d = self._send_request_and_handle_errors(
             request=self.client.protocol_version,
-            request_error=errors.UnsupportedVersionError,
+            request_error=protocol_errors.UnsupportedVersionError,
             event_error='SYS_PROTOCOL_VERSION_ERROR',
             event_ok='SYS_PROTOCOL_VERSION_OK'
         )
@@ -986,7 +988,7 @@
         """Authenticate the client against the server using oauth_consumer."""
         authenticate_d = self._send_request_and_handle_errors(
             request=self.client.oauth_authenticate,
-            request_error=errors.AuthenticationFailedError,
+            request_error=protocol_errors.AuthenticationFailedError,
             event_error='SYS_AUTH_ERROR', event_ok='SYS_AUTH_OK',
             args=(oauth_consumer, self.token)
         )
@@ -1231,21 +1233,20 @@
 class ActionQueueCommand(object):
     """Base of all the action queue commands."""

-    # protobuf doesn't seem to have much introspectionable stuff
-    # without going into private attributes
+    # the info used in the protocol errors is hidden, but very useful!
     # pylint: disable-msg=W0212
-    known_error_messages = (set(protocol_pb2._ERROR_ERRORTYPE.values_by_name)
-                            | set(['CANCELLED']))
-    suppressed_error_messages = (known_error_messages
-                                 - set(['INTERNAL_ERROR'])
-                                 | set(['Cleaned up',
-                                        'Connection was closed cleanly.']))
-    retryable_errors = set([
-        'Cleaned up',
-        'TRY_AGAIN',
-        'Connection was closed cleanly.',
-        'Connection to the other side was lost in a non-clean fashion.',
-    ])
+    suppressed_error_messages = (
+        [x for x in protocol_errors._error_mapping.values()
+         if x is not protocol_errors.InternalError] +
+        [protocol_errors.RequestCancelledError,
+         twisted_errors.ConnectionDone, RequestCleanedUp]
+    )
+
+    retryable_errors = (
+        protocol_errors.TryAgainError,
+        twisted_errors.ConnectionDone,
+        twisted_errors.ConnectionLost,
+    )

     logged_attrs = ()

@@ -1277,12 +1278,9 @@

     def check_conditions(self):
         """Check conditions on which the command may be waiting."""
-        pass

     def demark(self, *maybe_markers):
-        """
-        Arrange to have maybe_markers realized
-        """
+        """Arrange to have maybe_markers realized."""
         l = []
         for marker in maybe_markers:
             if IMarker.providedBy(marker):
@@ -1303,9 +1301,9 @@

     @staticmethod
     def unwrap(results):
-        """
-        Unpack the values from the result of a DeferredList. If
-        there's a failure, return it instead.
+        """Unpack the values from the result of a DeferredList.
+
+        If there's a failure, return it instead.
         """
         values = []
         for result in results:
@@ -1321,9 +1319,7 @@
         return values

     def end_callback(self, arg):
-        """
-        It worked!
-        """
+        """It worked!"""
         if self.running:
             self.log.debug('success')
             return self.handle_success(arg)
@@ -1331,18 +1327,17 @@
             self.log.debug('not running, so no success')

     def end_errback(self, failure):
-        """
-        It failed!
-        """
+        """It failed!"""
         error_message = failure.getErrorMessage()
-        if error_message not in self.suppressed_error_messages:
+        if failure.check(*self.suppressed_error_messages):
+            self.log.warn('failure', error_message)
+        else:
             self.log.error('failure', error_message)
             self.log.debug('traceback follows:\n\n' + failure.getTraceback())
-        else:
-            self.log.warn('failure', error_message)
+        was_running = self.running
         self.cleanup()
-        if error_message in self.retryable_errors:
-            if self.running:
+        if failure.check(*self.retryable_errors):
+            if was_running:
                 reactor.callLater(0.1, self.retry)
         else:
             return self.handle_failure(failure)
@@ -1352,37 +1347,29 @@
         self.log = self.make_logger()

     def start(self, _=None):
-        """
-        Queue the command.
-        """
+        """Queue the command."""
         self.start_unqueued()
         self.log.debug('queueing in the %s', self._queue.name)
         self._queue.queue(self)

     def cleanup(self):
-        """
-        Do whatever is needed to clean up from a failure (such as stop
-        producers and other such that aren't cleaned up appropriately
-        on their own)
+        """Do whatever is needed to clean up from a failure.
+
+        For example, stop producers and others that aren't cleaned up
+        appropriately on their own.
         """
         self.running = False
         self.log.debug('cleanup')

     def _start(self):
-        """
-        Do the specialized pre-run setup
-        """
+        """Do the specialized pre-run setup."""
         return defer.succeed(None)

     def store_marker_result(self, _):
-        """
-        Called when all the markers are realized.
-        """
+        """Called when all the markers are realized."""

     def run(self):
-        """
-        Do the deed.
-        """
+        """Do the deed."""
         self.running = True
         if self.start_done:
             self.log.debug('retrying')
@@ -1409,37 +1396,31 @@
         return d

     def handle_success(self, success):
-        """
-        Do anthing that's needed to handle success of the operation.
-        """
+        """Do anthing that's needed to handle success of the operation."""
         return success

     def handle_failure(self, failure):
-        """
-        Do anthing that's needed to handle failure of the operation.
+        """Do anthing that's needed to handle failure of the operation.
+
         Note that cancellation and TRY_AGAIN are already handled.
         """
         return failure

     def cleanup_and_retry(self):
-        """
-        First, cleanup; then, retry :)
-        """
+        """First, cleanup; then, retry :)"""
         self.log.debug('cleanup and retry')
         self.cleanup()
         return self.retry()

     def retry(self):
-        """
-        Request cancelled or TRY_AGAIN. Well then, try again!
-        """
+        """Request cancelled or TRY_AGAIN. Well then, try again!"""
         self.running = False
         self.log.debug('will retry')
         return self._queue.queue_top(self)

     @property
     def action_queue(self):
-        """Returns the action queue."""
+        """Return the action queue."""
         return self._queue.action_queue

     def __str__(self, str_attrs=None):
@@ -2214,33 +2195,25 @@
         self.action_queue.cancel_download(self.share_id, self.node_id)

     def _start(self):
-        """
-        Do the specialized pre-run setup
-        """
+        """Do the specialized pre-run setup."""
         return self.demark(self.node_id)

     def cancel(self):
-        """
-        Cancel the download.
-        """
+        """Cancel the download."""
         self.cancelled = True
         if self.download_req is not None:
             self.download_req.cancel()
         self.cleanup()

     def store_marker_result(self, (node_id,)):
-        """
-        Called when all the markers are realized.
-        """
+        """Called when all the markers are realized."""
         self.node_id = node_id
         self.start_done = True

     def _run(self):
-        """
-        Do the actual running
-        """
+        """Do the actual running."""
         if self.cancelled:
-            return defer.fail(RuntimeError('CANCELLED'))
+            return defer.fail(RequestCleanedUp('CANCELLED'))
         if self.fileobj is None:
             try:
                 self.fileobj = self.fileobj_factory()
@@ -2269,16 +2242,14 @@
         self.download_req = req
         downloading[self.share_id, self.node_id]['req'] = req
         d = req.deferred
-        d.addBoth(lambda x: defer.fail(RuntimeError('CANCELLED'))
+        d.addBoth(lambda x: defer.fail(RequestCleanedUp('CANCELLED'))
                   if self.cancelled else x)
         d.addCallback(passit(
             lambda _: downloading.pop((self.share_id, self.node_id))))
         return d

     def handle_success(self, _):
-        """
-        It worked! Push the event.
-        """
+        """It worked! Push the event."""
         self.sync()
         # for directories, a FINISHED; for files, a COMMIT (the Nanny
         # will issue the FINISHED if it's ok)
@@ -2292,27 +2263,25 @@
                                                server_hash=self.server_hash)

     def handle_failure(self, failure):
-        """
-        It didn't work! Push the event.
-        """
+        """It didn't work! Push the event."""
         downloading = self.action_queue.downloading
         if (self.share_id, self.node_id) in downloading and \
               downloading[self.share_id, self.node_id]['command'] is self:
             del downloading[self.share_id, self.node_id]
         self.reset_fileobj()
-        error_msg = failure.getErrorMessage()
-        if error_msg == 'CANCELLED':
+        if failure.check(protocol_errors.RequestCancelledError,
+                         RequestCleanedUp):
             self.action_queue.event_queue.push('AQ_DOWNLOAD_CANCELLED',
                                                share_id=self.share_id,
                                                node_id=self.node_id,
                                                server_hash=self.server_hash)
-        elif error_msg == 'DOES_NOT_EXIST':
+        elif failure.check(protocol_errors.DoesNotExistError):
             self.action_queue.event_queue.push('AQ_DOWNLOAD_DOES_NOT_EXIST',
                                                share_id=self.share_id,
                                                node_id=self.node_id)
         else:
             self.action_queue.event_queue.push('AQ_DOWNLOAD_ERROR',
-                                               error=error_msg,
+                                               error=failure.getErrorMessage(),
                                                share_id=self.share_id,
                                                node_id=self.node_id,
                                                server_hash=self.server_hash)
@@ -2328,9 +2297,7 @@
         self.fileobj.truncate(0)

     def cb(self, bytes):
-        """
-        A streaming decompressor
-        """
+        """A streaming decompressor."""
         dloading = self.action_queue.downloading[self.share_id,
                                                  self.node_id]
         dloading['n_bytes_read'] += len(bytes)
@@ -2339,16 +2306,12 @@
         # see the downloaded size

     def nacb(self, **kwargs):
-        """
-        set the node attrs in the 'currently downloading' dict
-        """
+        """Set the node attrs in the 'currently downloading' dict."""
         self.action_queue.downloading[self.share_id,
                                       self.node_id].update(kwargs)

     def sync(self):
-        """
-        Flush the buffers and sync them to disk if possible
-        """
+        """Flush the buffers and sync them to disk if possible."""
         remains = self.gunzip.flush()
         if remains:
             self.fileobj.write(remains)
@@ -2391,8 +2354,8 @@

     logged_attrs = ('share_id', 'node_id', 'previous_hash', 'hash', 'crc32',
                     'size', 'fileobj_factory')
-    retryable_errors = (ActionQueueCommand.retryable_errors
-                        | set(['UPLOAD_IN_PROGRESS']))
+    retryable_errors = ActionQueueCommand.retryable_errors + (
+        protocol_errors.UploadInProgressError,)

     def __init__(self, request_queue, share_id, node_id, previous_hash, hash,
                  crc32, size, fileobj_factory, tempfile_factory):
@@ -2436,9 +2399,7 @@
         self.upload_req.producer.stopProducing()

     def _start(self):
-        """
-        Do the specialized pre-run setup
-        """
+        """Do the specialized pre-run setup."""
         d = defer.Deferred()

         uploading = {"hash": self.hash, "req": self}
@@ -2446,7 +2407,7 @@

         d = self.action_queue.zip_queue.zip(self)
         d.addCallback(lambda _: self.demark(self.node_id))
-        d.addBoth(lambda x: defer.fail(RuntimeError('CANCELLED'))
+        d.addBoth(lambda x: defer.fail(RequestCleanedUp('CANCELLED'))
                   if self.cancelled else x)
         return d

@@ -2462,11 +2423,9 @@
         self.start_done = True

     def _run(self):
-        """
-        Do the actual running
-        """
+        """Do the actual running."""
         if self.cancelled:
-            return defer.fail(RuntimeError('CANCELLED'))
+            return defer.fail(RequestCleanedUp('CANCELLED'))
         uploading = {"hash": self.hash, "deflated_size": self.deflated_size,
                      "req": self}
         self.action_queue.uploading[self.share_id, self.node_id] = uploading
@@ -2484,7 +2443,7 @@
                                 self.crc32, self.size, self.deflated_size, f)
         self.upload_req = req
         d = req.deferred
-        d.addBoth(lambda x: defer.fail(RuntimeError('CANCELLED'))
+        d.addBoth(lambda x: defer.fail(RequestCleanedUp('CANCELLED'))
                   if self.cancelled else x)
         d.addBoth(passit(lambda _:
                          self.action_queue.uploading.pop((self.share_id,
Very nice!