Merge lp:~facundo/ubuntuone-client/aqc_errors into lp:ubuntuone-client

Proposed by Facundo Batista
Status: Merged
Approved by: Facundo Batista
Approved revision: 484
Merged at revision: not available
Proposed branch: lp:~facundo/ubuntuone-client/aqc_errors
Merge into: lp:ubuntuone-client
Diff against target: 612 lines (+209/-118)
2 files modified
tests/syncdaemon/test_action_queue.py (+141/-9)
ubuntuone/syncdaemon/action_queue.py (+68/-109)
To merge this branch: bzr merge lp:~facundo/ubuntuone-client/aqc_errors
Reviewer Review Type Date Requested Status
John Lenton (community) Approve
Natalia Bidart (community) Approve
Review via email: mp+23132@code.launchpad.net

Commit message

Make ActionQueueCommand to handle errors by their type, not message.

Description of the change

Make ActionQueueCommand to handle errors by their type, not message.

The same for a couple of specific commands.

And fixed a detail that made ACQ to *never* retry an operation on a retryable error.

Tests included.

To post a comment you must log in.
Revision history for this message
Natalia Bidart (nataliabidart) wrote :

Very nice!

review: Approve
Revision history for this message
John Lenton (chipaca) wrote :

Is this a fix for something that is going into Lucid? If so, please link it. If not, I'm afraid it would be a lot easier if we merge it after the freeze.

review: Needs Information
Revision history for this message
John Lenton (chipaca) wrote :

OK, brown paper bag for me. I promise I *looked* for the bugs.
Getting more coffee, now.

Revision history for this message
John Lenton (chipaca) :
review: Abstain
Revision history for this message
John Lenton (chipaca) wrote :

The failure.check thing is much better than what it was doing before. So obviously the right thing to do, now that I see it :)

The change from strings to actual classes is also great.

review: Approve
Revision history for this message
dobey (dobey) wrote :

Attempt to merge lp:~facundo/ubuntuone-client/aqc_errors into lp:ubuntuone-client failed due to merge conflicts:

text conflict in tests/syncdaemon/test_action_queue.py
text conflict in ubuntuone/syncdaemon/action_queue.py

Revision history for this message
Facundo Batista (facundo) wrote :

Solved the conflicts!

Revision history for this message
John Lenton (chipaca) wrote :
Download full text (34.6 KiB)

The attempt to merge lp:~facundo/ubuntuone-client/aqc_errors into lp:ubuntuone-client failed.Below is the output from the failed tests.

/usr/bin/gnome-autogen.sh
checking for autoconf >= 2.53...
(B testing autoconf2.50... not found.
  testing autoconf... found 2.65
checking for automake >= 1.10...
(B testing automake-1.11... found 1.11.1
checking for libtool >= 1.5...
(B testing libtoolize... found 2.2.6b
checking for intltool >= 0.30...
(B testing intltoolize... found 0.41.0
checking for pkg-config >= 0.14.0...
(B testing pkg-config... found 0.22
Checking for required M4 macros...
(BChecking for forbidden M4 macros...
(BProcessing ./configure.ac
(BRunning libtoolize...
(Blibtoolize: putting auxiliary files in `.'.
libtoolize: copying file `./ltmain.sh'
libtoolize: putting macros in AC_CONFIG_MACRO_DIR, `m4'.
libtoolize: copying file `m4/libtool.m4'
libtoolize: copying file `m4/ltoptions.m4'
libtoolize: copying file `m4/ltsugar.m4'
libtoolize: copying file `m4/ltversion.m4'
libtoolize: copying file `m4/lt~obsolete.m4'
Running intltoolize...
(BRunning aclocal-1.11...
(BRunning autoconf...
(BRunning autoheader...
(BRunning automake-1.11...
(BRunning ./configure --prefix=/usr ...
(Bchecking for a BSD-compatible install... /usr/bin/install -c
checking whether build environment is sane... yes
checking for a thread-safe mkdir -p... /bin/mkdir -p
checking for gawk... gawk
checking whether make sets $(MAKE)... yes
checking for style of include used by make... GNU
checking for gcc... gcc
checking whether the C compiler works... yes
checking for C compiler default output file name... a.out
checking for suffix of executables...
checking whether we are cross compiling... no
checking for suffix of object files... o
checking whether we are using the GNU C compiler... yes
checking whether gcc accepts -g... yes
checking for gcc option to accept ISO C89... none needed
checking dependency style of gcc... gcc3
checking for library containing strerror... none required
checking for gcc... (cached) gcc
checking whether we are using the GNU C compiler... (cached) yes
checking whether gcc accepts -g... (cached) yes
checking for gcc option to accept ISO C89... (cached) none needed
checking dependency style of gcc... (cached) gcc3
checking build system type... x86_64-unknown-linux-gnu
checking host system type... x86_64-unknown-linux-gnu
checking for a sed that does not truncate output... /bin/sed
checking for grep that handles long lines and -e... /bin/grep
checking for egrep... /bin/grep -E
checking for fgrep... /bin/grep -F
checking for ld used by gcc... /usr/bin/ld
checking if the linker (/usr/bin/ld) is GNU ld... yes
checking for BSD- or MS-compatible name lister (nm)... /usr/bin/nm -B
checking the name lister (/usr/bin/nm -B) interface... BSD nm
checking whether ln -s works... yes
checking the maximum length of command line arguments... 1572864
checking whether the shell understands some XSI constructs... yes
checking whether the shell understands "+="... yes
checking for /usr/bin/ld option to reload object files... -r
checking for objdu...

Revision history for this message
Facundo Batista (facundo) wrote :

It seems there's a problem in your set up, there, as errors is there.

I'm setting this back to approved, to run by someone else. Chipaca, if you see this message, let's talk, :)

Revision history for this message
John Lenton (chipaca) wrote :

On Mon, Apr 12, 2010 at 05:05:46PM -0000, Facundo Batista wrote:
> It seems there's a problem in your set up, there, as errors is there.
>
> I'm setting this back to approved, to run by someone else. Chipaca, if you see this message, let's talk, :)

sure thing

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'tests/syncdaemon/test_action_queue.py'
2--- tests/syncdaemon/test_action_queue.py 2010-04-12 13:19:40 +0000
3+++ tests/syncdaemon/test_action_queue.py 2010-04-12 16:06:36 +0000
4@@ -33,6 +33,7 @@
5 from functools import wraps
6 from StringIO import StringIO
7 from twisted.internet import defer, threads, reactor
8+from twisted.internet import error as twisted_error
9 from twisted.python.failure import DefaultException, Failure
10 from twisted.web import server
11
12@@ -784,6 +785,113 @@
13 self.assertTrue(res is None)
14
15
16+class ActionQueueCommandErrors(ConnectedBaseTestCase):
17+ """Test the error handling in ActionQueueCommand."""
18+
19+ def setUp(self):
20+ res = super(ActionQueueCommandErrors, self).setUp()
21+
22+ self.deferred = defer.Deferred()
23+
24+ class MyLogger(object):
25+ """Fake logger that just stores error and warning calls."""
26+ def __init__(self):
27+ self.logged = None
28+
29+ def error(self, *a):
30+ """Mark that this method was called."""
31+ self.logged = "error"
32+
33+ def warn(self, *a):
34+ """Mark that this method was called."""
35+ self.logged = "warn"
36+
37+ def debug(self, *a):
38+ """Nothing."""
39+
40+ class MyCommand(ActionQueueCommand):
41+ """Inherit ACQ to provide a retry signaller and a custom log."""
42+ # class-closure, cannot use self, pylint: disable-msg=E0213
43+ def __init__(innerself, request_queue):
44+ super(MyCommand, innerself).__init__(request_queue)
45+ innerself.log = MyLogger()
46+
47+ def retry(innerself):
48+ """Signal the retry."""
49+ self.deferred.callback(True)
50+
51+ self.rq = RequestQueue(name='foo', action_queue=self.action_queue)
52+ self.command = MyCommand(self.rq)
53+ return res
54+
55+ def test_suppressed_yes_knownerrors(self):
56+ """Check that the log is in warning for the known errors."""
57+ def send_failure_and_check(errnum, exception_class):
58+ """Send the failure."""
59+ # prepare what to send
60+ protocol_msg = protocol_pb2.Message()
61+ protocol_msg.type = protocol_pb2.Message.ERROR
62+ protocol_msg.error.type = errnum
63+ err = exception_class("request", protocol_msg)
64+
65+ # set up and test
66+ self.command.log.logged = None
67+ self.command.end_errback(failure=Failure(err))
68+ self.assertEqual(self.command.log.logged, "warn",
69+ "Bad log in exception %s" % (exception_class,))
70+
71+ known_errors = [x for x in errors._error_mapping.items()
72+ if x[1] != errors.InternalError]
73+ for errnum, exception_class in known_errors:
74+ send_failure_and_check(errnum, exception_class)
75+
76+ def test_suppressed_no_internalerror(self):
77+ """Check that the log is in error for InternalError."""
78+ # prepare what to send
79+ protocol_msg = protocol_pb2.Message()
80+ protocol_msg.type = protocol_pb2.Message.ERROR
81+ protocol_msg.error.type = protocol_pb2.Error.INTERNAL_ERROR
82+ err = errors.InternalError("request", protocol_msg)
83+
84+ # set up and test
85+ self.command.end_errback(failure=Failure(err))
86+ self.assertEqual(self.command.log.logged, "error")
87+
88+ def test_suppressed_yes_cancelled(self):
89+ """Check that the log is in warning for Cancelled."""
90+ err = errors.RequestCancelledError("CANCELLED")
91+ self.command.end_errback(failure=Failure(err))
92+ self.assertEqual(self.command.log.logged, "warn")
93+
94+ def test_suppressed_yes_and_retry_when_connectiondone(self):
95+ """Check that the log is in warning and retries for ConnectionDone."""
96+ self.command.running = True
97+ err = twisted_error.ConnectionDone()
98+ self.command.end_errback(failure=Failure(err))
99+ self.assertEqual(self.command.log.logged, "warn")
100+ return self.deferred
101+
102+ def test_retry_connectionlost(self):
103+ """Check that it retries when ConnectionLost."""
104+ self.command.running = True
105+ err = twisted_error.ConnectionLost()
106+ self.command.end_errback(failure=Failure(err))
107+ return self.deferred
108+
109+ def test_retry_tryagain(self):
110+ """Check that it retries when TryAgain."""
111+ # prepare what to send
112+ self.command.running = True
113+ protocol_msg = protocol_pb2.Message()
114+ protocol_msg.type = protocol_pb2.Message.ERROR
115+ protocol_msg.error.type = protocol_pb2.Error.TRY_AGAIN
116+ err = errors.TryAgainError("request", protocol_msg)
117+
118+ # set up and test
119+ self.command.end_errback(failure=Failure(err))
120+ return self.deferred
121+
122+
123 class ListVolumesTestCase(ConnectedBaseTestCase):
124 """Test for ListVolumes ActionQueueCommand."""
125
126@@ -1171,9 +1279,8 @@
127
128 def test_failure_with_CANCELLED(self):
129 """AQ_DOWNLOAD_CANCELLED is pushed."""
130- msg = 'CANCELLED'
131- failure = Failure(DefaultException(msg))
132- res = self.command.handle_failure(failure=failure)
133+ err = errors.RequestCancelledError("CANCELLED")
134+ res = self.command.handle_failure(failure=Failure(err))
135 kwargs = dict(share_id='a_share_id', node_id='a_node_id',
136 server_hash='a_server_hash')
137 events = [('AQ_DOWNLOAD_CANCELLED', (), kwargs)]
138@@ -1193,9 +1300,11 @@
139
140 def test_failure_with_DOES_NOT_EXIST(self):
141 """AQ_DOWNLOAD_DOES_NOT_EXIST is pushed."""
142- msg = 'DOES_NOT_EXIST'
143- failure = Failure(DefaultException(msg))
144- res = self.command.handle_failure(failure=failure)
145+ protocol_msg = protocol_pb2.Message()
146+ protocol_msg.type = protocol_pb2.Message.ERROR
147+ protocol_msg.error.type = protocol_pb2.Error.DOES_NOT_EXIST
148+ err = errors.DoesNotExistError("request", protocol_msg)
149+ res = self.command.handle_failure(failure=Failure(err))
150 kwargs = dict(share_id='a_share_id', node_id='a_node_id')
151 events = [('AQ_DOWNLOAD_DOES_NOT_EXIST', (), kwargs)]
152 self.assertEquals(events, self.command.action_queue.event_queue.events)
153@@ -1224,9 +1333,10 @@
154
155 def setUp(self):
156 """Init."""
157- res = super(UploadTestCase, self).setUp()
158+ super(UploadTestCase, self).setUp()
159
160- request_queue = RequestQueue(name='FOO', action_queue=self.action_queue)
161+ self.rq = request_queue = RequestQueue(name='FOO',
162+ action_queue=self.action_queue)
163 self.command = Upload(request_queue, share_id='a_share_id',
164 node_id='a_node_id', previous_hash='prev_hash',
165 hash='yadda', crc32=0, size=0,
166@@ -1234,7 +1344,29 @@
167 tempfile_factory=lambda: None)
168 self.command.start_unqueued() # create the logger
169
170- return res
171+ def test_upload_in_progress(self):
172+ """Test Upload retries on UploadInProgress."""
173+ # monkeypatching is not allowed, let's do inheritance
174+ d = defer.Deferred()
175+ class MyUpload(Upload):
176+ """Just to redefine retry."""
177+ def retry(self):
178+ """Detect retry was called."""
179+ d.callback(True)
180+
181+ # set up the command
182+ command = MyUpload(self.rq, 'share', 'bode', 'prvhash', 'currhash',
183+ 0, 0, lambda: None, lambda: None)
184+ command.start_unqueued() # create log in the instance
185+ command.running = True
186+
187+ # send the failure
188+ protocol_msg = protocol_pb2.Message()
189+ protocol_msg.type = protocol_pb2.Message.ERROR
190+ protocol_msg.error.type = protocol_pb2.Error.UPLOAD_IN_PROGRESS
191+ err = errors.UploadInProgressError("request", protocol_msg)
192+ command.end_errback(failure=Failure(err))
193+ return d
194
195 def test_handle_success_push_event(self):
196 """Test AQ_UPLOAD_FINISHED is pushed on success."""
197
198=== modified file 'ubuntuone/syncdaemon/action_queue.py'
199--- ubuntuone/syncdaemon/action_queue.py 2010-04-12 13:19:40 +0000
200+++ ubuntuone/syncdaemon/action_queue.py 2010-04-12 16:06:36 +0000
201@@ -44,11 +44,13 @@
202
203 from zope.interface import implements
204 from twisted.internet import reactor, defer, threads
205+from twisted.internet import error as twisted_errors
206 from twisted.names import client as dns_client
207 from twisted.python.failure import Failure, DefaultException
208
209 from oauth import oauth
210-from ubuntuone.storageprotocol import errors, protocol_pb2
211+from ubuntuone.storageprotocol import protocol_pb2
212+from ubuntuone.storageprotocol import errors as protocol_errors
213 from ubuntuone.storageprotocol.client import (
214 ThrottlingStorageClient, ThrottlingStorageClientFactory
215 )
216@@ -913,12 +915,12 @@
217 return
218 except request_error, failure:
219 self.event_queue.push(event_error, error=str(failure))
220- except errors.AuthenticationRequiredError, failure:
221+ except protocol_errors.AuthenticationRequiredError, failure:
222 # we need to separate this case from the rest because an
223 # AuthenticationRequiredError is an StorageRequestError,
224 # and we treat it differently.
225 self.event_queue.push('SYS_UNKNOWN_ERROR')
226- except errors.StorageRequestError, failure:
227+ except protocol_errors.StorageRequestError, failure:
228 self.event_queue.push('SYS_SERVER_ERROR', error=str(failure))
229 except Exception, failure:
230 self.event_queue.push('SYS_UNKNOWN_ERROR')
231@@ -940,7 +942,7 @@
232 """Check if the client protocol version matches that of the server."""
233 check_version_d = self._send_request_and_handle_errors(
234 request=self.client.protocol_version,
235- request_error=errors.UnsupportedVersionError,
236+ request_error=protocol_errors.UnsupportedVersionError,
237 event_error='SYS_PROTOCOL_VERSION_ERROR',
238 event_ok='SYS_PROTOCOL_VERSION_OK'
239 )
240@@ -986,7 +988,7 @@
241 """Authenticate the client against the server using oauth_consumer."""
242 authenticate_d = self._send_request_and_handle_errors(
243 request=self.client.oauth_authenticate,
244- request_error=errors.AuthenticationFailedError,
245+ request_error=protocol_errors.AuthenticationFailedError,
246 event_error='SYS_AUTH_ERROR', event_ok='SYS_AUTH_OK',
247 args=(oauth_consumer, self.token)
248 )
249@@ -1231,21 +1233,20 @@
250 class ActionQueueCommand(object):
251 """Base of all the action queue commands."""
252
253- # protobuf doesn't seem to have much introspectionable stuff
254- # without going into private attributes
255+ # the info used in the protocol errors is hidden, but very useful!
256 # pylint: disable-msg=W0212
257- known_error_messages = (set(protocol_pb2._ERROR_ERRORTYPE.values_by_name)
258- | set(['CANCELLED']))
259- suppressed_error_messages = (known_error_messages
260- - set(['INTERNAL_ERROR'])
261- | set(['Cleaned up',
262- 'Connection was closed cleanly.']))
263- retryable_errors = set([
264- 'Cleaned up',
265- 'TRY_AGAIN',
266- 'Connection was closed cleanly.',
267- 'Connection to the other side was lost in a non-clean fashion.',
268- ])
269+ suppressed_error_messages = (
270+ [x for x in protocol_errors._error_mapping.values()
271+ if x is not protocol_errors.InternalError] +
272+ [protocol_errors.RequestCancelledError,
273+ twisted_errors.ConnectionDone, RequestCleanedUp]
274+ )
275+
276+ retryable_errors = (
277+ protocol_errors.TryAgainError,
278+ twisted_errors.ConnectionDone,
279+ twisted_errors.ConnectionLost,
280+ )
281
282 logged_attrs = ()
283
284@@ -1277,12 +1278,9 @@
285
286 def check_conditions(self):
287 """Check conditions on which the command may be waiting."""
288- pass
289
290 def demark(self, *maybe_markers):
291- """
292- Arrange to have maybe_markers realized
293- """
294+ """Arrange to have maybe_markers realized."""
295 l = []
296 for marker in maybe_markers:
297 if IMarker.providedBy(marker):
298@@ -1303,9 +1301,9 @@
299
300 @staticmethod
301 def unwrap(results):
302- """
303- Unpack the values from the result of a DeferredList. If
304- there's a failure, return it instead.
305+ """Unpack the values from the result of a DeferredList.
306+
307+ If there's a failure, return it instead.
308 """
309 values = []
310 for result in results:
311@@ -1321,9 +1319,7 @@
312 return values
313
314 def end_callback(self, arg):
315- """
316- It worked!
317- """
318+ """It worked!"""
319 if self.running:
320 self.log.debug('success')
321 return self.handle_success(arg)
322@@ -1331,18 +1327,17 @@
323 self.log.debug('not running, so no success')
324
325 def end_errback(self, failure):
326- """
327- It failed!
328- """
329+ """It failed!"""
330 error_message = failure.getErrorMessage()
331- if error_message not in self.suppressed_error_messages:
332+ if failure.check(*self.suppressed_error_messages):
333+ self.log.warn('failure', error_message)
334+ else:
335 self.log.error('failure', error_message)
336 self.log.debug('traceback follows:\n\n' + failure.getTraceback())
337- else:
338- self.log.warn('failure', error_message)
339+ was_running = self.running
340 self.cleanup()
341- if error_message in self.retryable_errors:
342- if self.running:
343+ if failure.check(*self.retryable_errors):
344+ if was_running:
345 reactor.callLater(0.1, self.retry)
346 else:
347 return self.handle_failure(failure)
348@@ -1352,37 +1347,29 @@
349 self.log = self.make_logger()
350
351 def start(self, _=None):
352- """
353- Queue the command.
354- """
355+ """Queue the command."""
356 self.start_unqueued()
357 self.log.debug('queueing in the %s', self._queue.name)
358 self._queue.queue(self)
359
360 def cleanup(self):
361- """
362- Do whatever is needed to clean up from a failure (such as stop
363- producers and other such that aren't cleaned up appropriately
364- on their own)
365+ """Do whatever is needed to clean up from a failure.
366+
367+ For example, stop producers and others that aren't cleaned up
368+ appropriately on their own.
369 """
370 self.running = False
371 self.log.debug('cleanup')
372
373 def _start(self):
374- """
375- Do the specialized pre-run setup
376- """
377+ """Do the specialized pre-run setup."""
378 return defer.succeed(None)
379
380 def store_marker_result(self, _):
381- """
382- Called when all the markers are realized.
383- """
384+ """Called when all the markers are realized."""
385
386 def run(self):
387- """
388- Do the deed.
389- """
390+ """Do the deed."""
391 self.running = True
392 if self.start_done:
393 self.log.debug('retrying')
394@@ -1409,37 +1396,31 @@
395 return d
396
397 def handle_success(self, success):
398- """
399- Do anthing that's needed to handle success of the operation.
400- """
401+ """Do anthing that's needed to handle success of the operation."""
402 return success
403
404 def handle_failure(self, failure):
405- """
406- Do anthing that's needed to handle failure of the operation.
407+ """Do anthing that's needed to handle failure of the operation.
408+
409 Note that cancellation and TRY_AGAIN are already handled.
410 """
411 return failure
412
413 def cleanup_and_retry(self):
414- """
415- First, cleanup; then, retry :)
416- """
417+ """First, cleanup; then, retry :)"""
418 self.log.debug('cleanup and retry')
419 self.cleanup()
420 return self.retry()
421
422 def retry(self):
423- """
424- Request cancelled or TRY_AGAIN. Well then, try again!
425- """
426+ """Request cancelled or TRY_AGAIN. Well then, try again!"""
427 self.running = False
428 self.log.debug('will retry')
429 return self._queue.queue_top(self)
430
431 @property
432 def action_queue(self):
433- """Returns the action queue."""
434+ """Return the action queue."""
435 return self._queue.action_queue
436
437 def __str__(self, str_attrs=None):
438@@ -2214,33 +2195,25 @@
439 self.action_queue.cancel_download(self.share_id, self.node_id)
440
441 def _start(self):
442- """
443- Do the specialized pre-run setup
444- """
445+ """Do the specialized pre-run setup."""
446 return self.demark(self.node_id)
447
448 def cancel(self):
449- """
450- Cancel the download.
451- """
452+ """Cancel the download."""
453 self.cancelled = True
454 if self.download_req is not None:
455 self.download_req.cancel()
456 self.cleanup()
457
458 def store_marker_result(self, (node_id,)):
459- """
460- Called when all the markers are realized.
461- """
462+ """Called when all the markers are realized."""
463 self.node_id = node_id
464 self.start_done = True
465
466 def _run(self):
467- """
468- Do the actual running
469- """
470+ """Do the actual running."""
471 if self.cancelled:
472- return defer.fail(RuntimeError('CANCELLED'))
473+ return defer.fail(RequestCleanedUp('CANCELLED'))
474 if self.fileobj is None:
475 try:
476 self.fileobj = self.fileobj_factory()
477@@ -2269,16 +2242,14 @@
478 self.download_req = req
479 downloading[self.share_id, self.node_id]['req'] = req
480 d = req.deferred
481- d.addBoth(lambda x: defer.fail(RuntimeError('CANCELLED'))
482+ d.addBoth(lambda x: defer.fail(RequestCleanedUp('CANCELLED'))
483 if self.cancelled else x)
484 d.addCallback(passit(
485 lambda _: downloading.pop((self.share_id, self.node_id))))
486 return d
487
488 def handle_success(self, _):
489- """
490- It worked! Push the event.
491- """
492+ """It worked! Push the event."""
493 self.sync()
494 # for directories, a FINISHED; for files, a COMMIT (the Nanny
495 # will issue the FINISHED if it's ok)
496@@ -2292,27 +2263,25 @@
497 server_hash=self.server_hash)
498
499 def handle_failure(self, failure):
500- """
501- It didn't work! Push the event.
502- """
503+ """It didn't work! Push the event."""
504 downloading = self.action_queue.downloading
505 if (self.share_id, self.node_id) in downloading and \
506 downloading[self.share_id, self.node_id]['command'] is self:
507 del downloading[self.share_id, self.node_id]
508 self.reset_fileobj()
509- error_msg = failure.getErrorMessage()
510- if error_msg == 'CANCELLED':
511+ if failure.check(protocol_errors.RequestCancelledError,
512+ RequestCleanedUp):
513 self.action_queue.event_queue.push('AQ_DOWNLOAD_CANCELLED',
514 share_id=self.share_id,
515 node_id=self.node_id,
516 server_hash=self.server_hash)
517- elif error_msg == 'DOES_NOT_EXIST':
518+ elif failure.check(protocol_errors.DoesNotExistError):
519 self.action_queue.event_queue.push('AQ_DOWNLOAD_DOES_NOT_EXIST',
520 share_id=self.share_id,
521 node_id=self.node_id)
522 else:
523 self.action_queue.event_queue.push('AQ_DOWNLOAD_ERROR',
524- error=error_msg,
525+ error=failure.getErrorMessage(),
526 share_id=self.share_id,
527 node_id=self.node_id,
528 server_hash=self.server_hash)
529@@ -2328,9 +2297,7 @@
530 self.fileobj.truncate(0)
531
532 def cb(self, bytes):
533- """
534- A streaming decompressor
535- """
536+ """A streaming decompressor."""
537 dloading = self.action_queue.downloading[self.share_id,
538 self.node_id]
539 dloading['n_bytes_read'] += len(bytes)
540@@ -2339,16 +2306,12 @@
541 # see the downloaded size
542
543 def nacb(self, **kwargs):
544- """
545- set the node attrs in the 'currently downloading' dict
546- """
547+ """Set the node attrs in the 'currently downloading' dict."""
548 self.action_queue.downloading[self.share_id,
549 self.node_id].update(kwargs)
550
551 def sync(self):
552- """
553- Flush the buffers and sync them to disk if possible
554- """
555+ """Flush the buffers and sync them to disk if possible."""
556 remains = self.gunzip.flush()
557 if remains:
558 self.fileobj.write(remains)
559@@ -2391,8 +2354,8 @@
560
561 logged_attrs = ('share_id', 'node_id', 'previous_hash', 'hash', 'crc32',
562 'size', 'fileobj_factory')
563- retryable_errors = (ActionQueueCommand.retryable_errors
564- | set(['UPLOAD_IN_PROGRESS']))
565+ retryable_errors = ActionQueueCommand.retryable_errors + (
566+ protocol_errors.UploadInProgressError,)
567
568 def __init__(self, request_queue, share_id, node_id, previous_hash, hash,
569 crc32, size, fileobj_factory, tempfile_factory):
570@@ -2436,9 +2399,7 @@
571 self.upload_req.producer.stopProducing()
572
573 def _start(self):
574- """
575- Do the specialized pre-run setup
576- """
577+ """Do the specialized pre-run setup."""
578 d = defer.Deferred()
579
580 uploading = {"hash": self.hash, "req": self}
581@@ -2446,7 +2407,7 @@
582
583 d = self.action_queue.zip_queue.zip(self)
584 d.addCallback(lambda _: self.demark(self.node_id))
585- d.addBoth(lambda x: defer.fail(RuntimeError('CANCELLED'))
586+ d.addBoth(lambda x: defer.fail(RequestCleanedUp('CANCELLED'))
587 if self.cancelled else x)
588 return d
589
590@@ -2462,11 +2423,9 @@
591 self.start_done = True
592
593 def _run(self):
594- """
595- Do the actual running
596- """
597+ """Do the actual running."""
598 if self.cancelled:
599- return defer.fail(RuntimeError('CANCELLED'))
600+ return defer.fail(RequestCleanedUp('CANCELLED'))
601 uploading = {"hash": self.hash, "deflated_size": self.deflated_size,
602 "req": self}
603 self.action_queue.uploading[self.share_id, self.node_id] = uploading
604@@ -2484,7 +2443,7 @@
605 self.crc32, self.size, self.deflated_size, f)
606 self.upload_req = req
607 d = req.deferred
608- d.addBoth(lambda x: defer.fail(RuntimeError('CANCELLED'))
609+ d.addBoth(lambda x: defer.fail(RequestCleanedUp('CANCELLED'))
610 if self.cancelled else x)
611 d.addBoth(passit(lambda _:
612 self.action_queue.uploading.pop((self.share_id,

Subscribers

People subscribed via source and target branches