Merge lp:~prateek/duplicity/s3-glacier into lp:duplicity/0.6

Proposed by someone1
Status: Merged
Merged at revision: 963
Proposed branch: lp:~prateek/duplicity/s3-glacier
Merge into: lp:duplicity/0.6
Diff against target: 998 lines (+321/-416)
8 files modified
README (+1/-0)
bin/duplicity (+11/-0)
bin/duplicity.1 (+38/-0)
duplicity/backends/_boto_multi.py (+75/-308)
duplicity/backends/_boto_single.py (+171/-104)
duplicity/backends/botobackend.py (+11/-4)
duplicity/commandline.py (+8/-0)
duplicity/globals.py (+6/-0)
To merge this branch: bzr merge lp:~prateek/duplicity/s3-glacier
Reviewer: edso (review status: Needs Information)
Review via email: mp+207719@code.launchpad.net

Commit message

Fixes https://bugs.launchpad.net/duplicity/+bug/1039511 - Adds support for detecting when a file is stored on Glacier and initiating a restore to S3. Also merges overlapping code in the boto backends.
Fixes https://bugs.launchpad.net/duplicity/+bug/1243246 - Adds a --s3_multipart_max_timeout option to limit the maximum execution time of a chunked upload to S3. Also adds a debug message that reports the upload speed.
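
The timeout behaves roughly as sketched below; this is a condensed sketch of the task loop added to _boto_multi.py in the preview diff (the reset_pool callback stands in for the backend's _setup_pool method, and error handling, logging and progress reporting are omitted):

    import multiprocessing

    def wait_for_chunks(tasks, max_timeout, reset_pool):
        # tasks is a list of AsyncResult objects, one per uploaded chunk.
        while tasks:
            try:
                tasks[0].wait(timeout=max_timeout)
                if tasks[0].ready():
                    if tasks[0].successful():
                        del tasks[0]          # chunk finished, move on
                    else:
                        reset_pool()          # worker raised, abort the upload
                        break
                else:
                    raise multiprocessing.TimeoutError
            except multiprocessing.TimeoutError:
                reset_pool()                  # chunk exceeded the timeout
                break
        return not tasks                      # True only if every chunk finished

If any chunk is left unfinished, the code in the diff cancels the multipart upload and raises a BackendException.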

Description of the change

How this addresses bug 1039511:
If a file stored in S3 is found to be on Glacier, duplicity initiates a restore back to S3 and waits until the file is available before continuing the restore process.

This branch also merges _boto_single and _boto_multi, since the majority of their code overlaps. Making _boto_multi a subclass of _boto_single means that changes to shared code only need to be made in one place.
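
For reference, the Glacier handling boils down to the sketch below; it is a simplified reading of the pre_process_download method added to _boto_single.py in the preview diff, with the retry loop and logging stripped out:

    import time

    def pre_process_download(bucket, key, wait=False):
        # bucket is a boto Bucket; key is a boto Key picked up from bucket.list().
        if key.storage_class == "GLACIER":
            if not bucket.get_key(key.key).ongoing_restore:
                key.restore(days=1)      # temporary copy kept in S3 for one day
            if wait:
                while bucket.get_key(key.key).ongoing_restore:
                    time.sleep(60)       # poll until the restore has completed

bin/duplicity calls pre_process_download on the volumes it is about to fetch, so the Glacier-to-S3 restore is kicked off before the normal download path runs.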

Revision history for this message
edso (ed.so) wrote :

Prateek,

please add your new switches to the bin/duplicity.1 manpage.
also update the requirements documentation in there to match what you updated in the README or changed for the backend.

aside from that i assume you extensively tested the changes?

..ede/duply.net

review: Needs Fixing
Revision history for this message
someone1 (prateek) wrote :

I can add the additions to the manpage. I have been using my branch for backups for 2-3 months now and can do restores from Glacier without issue. The changes affecting the Google Storage backend were not tested, but I have no reason to think they would break it, since it uses the same boto API as the S3 backend. I am not sure what other forms of testing you'd like me to try out.

My backup job runs a full backup every 28 days and incremental backups in between. I back up about 300GB of data.

lp:~prateek/duplicity/s3-glacier updated
933. By someone1

Updated manpage and tweaked boto backend connection reset

Revision history for this message
someone1 (prateek) wrote :

I updated the manpage accordingly and added entries for the other undocumented S3 options (one of which I added myself in a patch submitted years ago). I also noticed that boto has been caching connections in the storage_uri object, and that cache was not being cleared when the S3 connection was reset. I've modified this code and will begin testing it across the 3 production systems I manage.

Let me know what you think.
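
The connection reset now amounts to rebuilding the storage_uri object instead of reusing it, roughly as follows (a condensed sketch of the reworked resetConnection in the preview diff; get_connection is the helper factored out into _boto_single.py by this branch, and the backend argument stands in for the BotoBackend instance):

    import boto
    from duplicity.backends._boto_single import get_connection  # helper added by this branch

    def reset_connection(backend):
        if getattr(backend, 'conn', None):
            backend.conn.close()          # drop the connection boto cached
        backend.storage_uri = boto.storage_uri(backend.boto_uri_str)
        backend.conn = get_connection(backend.scheme, backend.parsed_url,
                                      backend.storage_uri)
        backend.bucket = backend.conn.lookup(backend.bucket_name)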

Revision history for this message
edso (ed.so) wrote :

looks good to me and sounds even better (the testing part :).. thx ede/duply.net

review: Approve
lp:~prateek/duplicity/s3-glacier updated
934. By someone1

Make sure each process in a multipart upload gets its own fresh connection

Revision history for this message
someone1 (prateek) wrote :

There is an import error; I will push an update to this branch to fix it.

Revision history for this message
someone1 (prateek) wrote :

I meant to push this up immediately, but I thought I'd try a test upload first. It ran through fine; here is the fix: http://bazaar.launchpad.net/~prateek/duplicity/s3-glacier/revision/935

Would you prefer that I put in another merge request, or that this fix just be merged separately as a one-off?

Revision history for this message
Kenneth Loafman (kenneth-loafman) wrote :

Will just merge it in separately.

Revision history for this message
edso (ed.so) wrote :

could you please check how duplicity behaves after that change. the lazy imports in _init are intentionally there to circumvent import errors during the initial backend imports.

just check how duplicity behaves when boto is not available and a different backend is used, say file:// for simplicity. it shouldn't complain about missing boto in that case.

..ede/duply.net
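
The pattern being referred to keeps the boto import out of module scope, for example (a generic sketch of the idea, not the exact fix from revision 935):

    import duplicity.backend

    class BotoBackend(duplicity.backend.Backend):
        def __init__(self, parsed_url):
            duplicity.backend.Backend.__init__(self, parsed_url)
            # Deferred import: merely loading this module never touches boto,
            # so duplicity can still start with other backends (file://, ftp,
            # ...) on systems where boto is not installed.
            import boto
            self.boto = boto

With the import inside __init__ (or inside the individual methods), the backend registry can import the module harmlessly, and the failure only surfaces when an S3/GS URL is actually used.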

Revision history for this message
edso (ed.so) wrote :

Prateek.. any news on the above? ..ede

review: Needs Information
Revision history for this message
someone1 (prateek) wrote :

I got the following error:
Import of duplicity.backends.botobackend Failed: No module named boto

I've updated my branch with lazy imports.

Revision history for this message
someone1 (prateek) wrote :

I will put a merge request in for the import fix after I perform a backup tonight and make sure there are no further issues.

As an FYI - a full backup failed last week due to insufficient space on my server at one of the sites I support. There was a successful full backup at another site I manage.

Incremental backups have been going smoothly at all 3 sites I use with the changes I submitted.

Restoration from S3/Glacier has been working as well.

Revision history for this message
edso (ed.so) wrote :

unfortunately that was already merged.. please create a branch against trunk for Ken to merge.

thanks for all your efforts.. ede/duply.net

Preview Diff

1=== modified file 'README'
2--- README 2014-01-24 12:39:40 +0000
3+++ README 2014-02-26 19:49:10 +0000
4@@ -29,6 +29,7 @@
5 * Boto 2.0 or later for single-processing S3 or GCS access (default)
6 * Boto 2.1.1 or later for multi-processing S3 access
7 * Python v2.6 or later for multi-processing S3 access
8+ * Boto 2.7.0 or later for Glacier S3 access
9
10 If you install from the source package, you will also need:
11
12
13=== modified file 'bin/duplicity'
14--- bin/duplicity 2014-02-05 02:57:01 +0000
15+++ bin/duplicity 2014-02-26 19:49:10 +0000
16@@ -735,6 +735,15 @@
17 log.Progress(_('Processed volume %d of %d') % (cur_vol[0], num_vols),
18 cur_vol[0], num_vols)
19
20+ if hasattr(globals.backend, 'pre_process_download'):
21+ file_names = []
22+ for backup_set in backup_setlist:
23+ manifest = backup_set.get_manifest()
24+ volumes = manifest.get_containing_volumes(index)
25+ for vol_num in volumes:
26+ file_names.append(backup_set.volume_name_dict[vol_num])
27+ globals.backend.pre_process_download(file_names)
28+
29 fileobj_iters = map(get_fileobj_iter, backup_setlist)
30 tarfiles = map(patchdir.TarFile_FromFileobjs, fileobj_iters)
31 return patchdir.tarfiles2rop_iter(tarfiles, index)
32@@ -1142,6 +1151,8 @@
33 local_missing = [] # don't download if we can't decrypt
34 for fn in local_spurious:
35 remove_local(fn)
36+ if hasattr(globals.backend, 'pre_process_download'):
37+ globals.backend.pre_process_download(local_missing)
38 for fn in local_missing:
39 copy_to_local(fn)
40 else:
41
42=== modified file 'bin/duplicity.1'
43--- bin/duplicity.1 2014-01-31 12:41:00 +0000
44+++ bin/duplicity.1 2014-02-26 19:49:10 +0000
45@@ -778,6 +778,44 @@
46 characters or other characters that are not valid in a hostname.
47
48 .TP
49+.BI "--s3-use-rrs"
50+Store volumes using Reduced Redundnacy Storage when uploading to Amazon S3.
51+This will lower the cost of storage but also lower the durability of stored
52+volumnes to 99.99% instead the 99.999999999% durability offered by Standard
53+Storage on S3.
54+
55+.TP
56+.BI "--s3-use-multiprocessing"
57+Allow multipart volumne uploads to S3 through multiprocessing. This option
58+requires Python 2.6 and can be used to make uploads to S3 more efficient.
59+If enabled, files duplicity uploads to S3 will be split into chunks and
60+uploaded in parallel. Useful if you want to saturate your bandwidth
61+or if large files are failing during upload.
62+
63+.TP
64+.BI "--s3-multipart-chunk-size"
65+Chunk size (in MB) used for S3 multipart uploads. Make this smaller than
66+.B --volsize
67+to maximize the use of your bandwidth. For example, a chunk size of 10MB
68+with a volsize of 30MB will result in 3 chunks per volume upload.
69+
70+.TP
71+.BI "--s3-multipart-max-procs"
72+Specify the maximum number of processes to spawn when performing a multipart
73+upload to S3. By default, this will choose the number of processors detected
74+on your system (e.g. 4 for a 4-core system). You can adjust this number as
75+required to ensure you don't overload your system while maximizing the use of
76+your bandwidth.
77+
78+.TP
79+.BI "--s3_multipart_max_timeout"
80+You can control the maximum time (in seconds) a multipart upload can spend on
81+uploading a single chunk to S3. This may be useful if you find your system
82+hanging on multipart uploads or if you'd like to control the time variance
83+when uploading to S3 to ensure you kill connections to slow S3 endpoints.
84+
85+
86+.TP
87 .BI "--scp-command " command
88 .B (only ssh pexpect backend with --use-scp enabled)
89 The
90
91=== modified file 'duplicity/backends/_boto_multi.py'
92--- duplicity/backends/_boto_multi.py 2014-01-13 15:54:13 +0000
93+++ duplicity/backends/_boto_multi.py 2014-02-26 19:49:10 +0000
94@@ -22,20 +22,20 @@
95
96 import os
97 import sys
98-import time
99 import threading
100 import Queue
101-
102-import duplicity.backend
103+import time
104+import traceback
105
106 from duplicity import globals
107 from duplicity import log
108 from duplicity.errors import * #@UnusedWildImport
109-from duplicity.util import exception_traceback
110-from duplicity.backend import retry
111 from duplicity.filechunkio import FileChunkIO
112 from duplicity import progress
113
114+from _boto_single import BotoBackend as BotoSingleBackend
115+from _boto_single import get_connection
116+
117 BOTO_MIN_VERSION = "2.1.1"
118
119 # Multiprocessing is not supported on *BSD
120@@ -61,100 +61,13 @@
121 def run(self):
122 while not self.finish:
123 try:
124- args = self.queue.get(True, 1)
125+ args = self.queue.get(True, 1)
126 progress.report_transfer(args[0], args[1])
127 except Queue.Empty, e:
128 pass
129-
130-
131-def get_connection(scheme, parsed_url):
132- try:
133- import boto
134- assert boto.Version >= BOTO_MIN_VERSION
135-
136- from boto.s3.connection import S3Connection
137- assert hasattr(S3Connection, 'lookup')
138-
139- # Newer versions of boto default to using
140- # virtual hosting for buckets as a result of
141- # upstream deprecation of the old-style access
142- # method by Amazon S3. This change is not
143- # backwards compatible (in particular with
144- # respect to upper case characters in bucket
145- # names); so we default to forcing use of the
146- # old-style method unless the user has
147- # explicitly asked us to use new-style bucket
148- # access.
149- #
150- # Note that if the user wants to use new-style
151- # buckets, we use the subdomain calling form
152- # rather than given the option of both
153- # subdomain and vhost. The reason being that
154- # anything addressable as a vhost, is also
155- # addressable as a subdomain. Seeing as the
156- # latter is mostly a convenience method of
157- # allowing browse:able content semi-invisibly
158- # being hosted on S3, the former format makes
159- # a lot more sense for us to use - being
160- # explicit about what is happening (the fact
161- # that we are talking to S3 servers).
162-
163- try:
164- from boto.s3.connection import OrdinaryCallingFormat
165- from boto.s3.connection import SubdomainCallingFormat
166- cfs_supported = True
167- calling_format = OrdinaryCallingFormat()
168- except ImportError:
169- cfs_supported = False
170- calling_format = None
171-
172- if globals.s3_use_new_style:
173- if cfs_supported:
174- calling_format = SubdomainCallingFormat()
175- else:
176- log.FatalError("Use of new-style (subdomain) S3 bucket addressing was"
177- "requested, but does not seem to be supported by the "
178- "boto library. Either you need to upgrade your boto "
179- "library or duplicity has failed to correctly detect "
180- "the appropriate support.",
181- log.ErrorCode.boto_old_style)
182- else:
183- if cfs_supported:
184- calling_format = OrdinaryCallingFormat()
185- else:
186- calling_format = None
187-
188- except ImportError:
189- log.FatalError("This backend (s3) requires boto library, version %s or later, "
190- "(http://code.google.com/p/boto/)." % BOTO_MIN_VERSION,
191- log.ErrorCode.boto_lib_too_old)
192-
193- if scheme == 's3+http':
194- # Use the default Amazon S3 host.
195- conn = S3Connection(is_secure=(not globals.s3_unencrypted_connection))
196- else:
197- assert scheme == 's3'
198- conn = S3Connection(
199- host = parsed_url.hostname,
200- is_secure=(not globals.s3_unencrypted_connection))
201-
202- if hasattr(conn, 'calling_format'):
203- if calling_format is None:
204- log.FatalError("It seems we previously failed to detect support for calling "
205- "formats in the boto library, yet the support is there. This is "
206- "almost certainly a duplicity bug.",
207- log.ErrorCode.boto_calling_format)
208- else:
209- conn.calling_format = calling_format
210-
211- else:
212- # Duplicity hangs if boto gets a null bucket name.
213- # HC: Caught a socket error, trying to recover
214- raise BackendException('Boto requires a bucket name.')
215- return conn
216-
217-
218-class BotoBackend(duplicity.backend.Backend):
219+
220+
221+class BotoBackend(BotoSingleBackend):
222 """
223 Backend for Amazon's Simple Storage System, (aka Amazon S3), though
224 the use of the boto module, (http://code.google.com/p/boto/).
225@@ -167,199 +80,32 @@
226 """
227
228 def __init__(self, parsed_url):
229- duplicity.backend.Backend.__init__(self, parsed_url)
230-
231- from boto.s3.key import Key
232- from boto.s3.multipart import MultiPartUpload
233-
234- # This folds the null prefix and all null parts, which means that:
235- # //MyBucket/ and //MyBucket are equivalent.
236- # //MyBucket//My///My/Prefix/ and //MyBucket/My/Prefix are equivalent.
237- self.url_parts = filter(lambda x: x != '', parsed_url.path.split('/'))
238-
239- if self.url_parts:
240- self.bucket_name = self.url_parts.pop(0)
241- else:
242- # Duplicity hangs if boto gets a null bucket name.
243- # HC: Caught a socket error, trying to recover
244- raise BackendException('Boto requires a bucket name.')
245-
246- self.scheme = parsed_url.scheme
247-
248- self.key_class = Key
249-
250- if self.url_parts:
251- self.key_prefix = '%s/' % '/'.join(self.url_parts)
252- else:
253- self.key_prefix = ''
254-
255- self.straight_url = duplicity.backend.strip_auth_from_url(parsed_url)
256- self.parsed_url = parsed_url
257- self.resetConnection()
258-
259- def resetConnection(self):
260- self.bucket = None
261- self.conn = get_connection(self.scheme, self.parsed_url)
262- self.bucket = self.conn.lookup(self.bucket_name)
263-
264- def put(self, source_path, remote_filename=None):
265- from boto.s3.connection import Location
266- if globals.s3_european_buckets:
267- if not globals.s3_use_new_style:
268- log.FatalError("European bucket creation was requested, but not new-style "
269- "bucket addressing (--s3-use-new-style)",
270- log.ErrorCode.s3_bucket_not_style)
271- #Network glitch may prevent first few attempts of creating/looking up a bucket
272- for n in range(1, globals.num_retries+1):
273- if self.bucket:
274- break
275- if n > 1:
276- time.sleep(30)
277- try:
278- try:
279- self.bucket = self.conn.get_bucket(self.bucket_name, validate=True)
280- except Exception, e:
281- if "NoSuchBucket" in str(e):
282- if globals.s3_european_buckets:
283- self.bucket = self.conn.create_bucket(self.bucket_name,
284- location=Location.EU)
285- else:
286- self.bucket = self.conn.create_bucket(self.bucket_name)
287- else:
288- raise e
289- except Exception, e:
290- log.Warn("Failed to create bucket (attempt #%d) '%s' failed (reason: %s: %s)"
291- "" % (n, self.bucket_name,
292- e.__class__.__name__,
293- str(e)))
294- self.resetConnection()
295-
296- if not remote_filename:
297- remote_filename = source_path.get_filename()
298- key = self.key_prefix + remote_filename
299- for n in range(1, globals.num_retries+1):
300- if n > 1:
301- # sleep before retry (new connection to a **hopeful** new host, so no need to wait so long)
302- time.sleep(10)
303-
304- if globals.s3_use_rrs:
305- storage_class = 'REDUCED_REDUNDANCY'
306- else:
307- storage_class = 'STANDARD'
308- log.Info("Uploading %s/%s to %s Storage" % (self.straight_url, remote_filename, storage_class))
309- try:
310- headers = {
311- 'Content-Type': 'application/octet-stream',
312- 'x-amz-storage-class': storage_class
313- }
314- self.upload(source_path.name, key, headers)
315- self.resetConnection()
316- return
317- except Exception, e:
318- log.Warn("Upload '%s/%s' failed (attempt #%d, reason: %s: %s)"
319- "" % (self.straight_url,
320- remote_filename,
321- n,
322- e.__class__.__name__,
323- str(e)))
324- log.Debug("Backtrace of previous error: %s" % (exception_traceback(),))
325- self.resetConnection()
326- log.Warn("Giving up trying to upload %s/%s after %d attempts" %
327- (self.straight_url, remote_filename, globals.num_retries))
328- raise BackendException("Error uploading %s/%s" % (self.straight_url, remote_filename))
329-
330- def get(self, remote_filename, local_path):
331- key = self.key_class(self.bucket)
332- key.key = self.key_prefix + remote_filename
333- for n in range(1, globals.num_retries+1):
334- if n > 1:
335- # sleep before retry (new connection to a **hopeful** new host, so no need to wait so long)
336- time.sleep(10)
337- log.Info("Downloading %s/%s" % (self.straight_url, remote_filename))
338- try:
339- key.get_contents_to_filename(local_path.name)
340- local_path.setdata()
341- self.resetConnection()
342- return
343- except Exception, e:
344- log.Warn("Download %s/%s failed (attempt #%d, reason: %s: %s)"
345- "" % (self.straight_url,
346- remote_filename,
347- n,
348- e.__class__.__name__,
349- str(e)), 1)
350- log.Debug("Backtrace of previous error: %s" % (exception_traceback(),))
351- self.resetConnection()
352- log.Warn("Giving up trying to download %s/%s after %d attempts" %
353- (self.straight_url, remote_filename, globals.num_retries))
354- raise BackendException("Error downloading %s/%s" % (self.straight_url, remote_filename))
355-
356- def _list(self):
357- if not self.bucket:
358- raise BackendException("No connection to backend")
359-
360- for n in range(1, globals.num_retries+1):
361- if n > 1:
362- # sleep before retry
363- time.sleep(30)
364- log.Info("Listing %s" % self.straight_url)
365- try:
366- return self._list_filenames_in_bucket()
367- except Exception, e:
368- log.Warn("List %s failed (attempt #%d, reason: %s: %s)"
369- "" % (self.straight_url,
370- n,
371- e.__class__.__name__,
372- str(e)), 1)
373- log.Debug("Backtrace of previous error: %s" % (exception_traceback(),))
374- log.Warn("Giving up trying to list %s after %d attempts" %
375- (self.straight_url, globals.num_retries))
376- raise BackendException("Error listng %s" % self.straight_url)
377-
378- def _list_filenames_in_bucket(self):
379- # We add a 'd' to the prefix to make sure it is not null (for boto) and
380- # to optimize the listing of our filenames, which always begin with 'd'.
381- # This will cause a failure in the regression tests as below:
382- # FAIL: Test basic backend operations
383- # <tracback snipped>
384- # AssertionError: Got list: []
385- # Wanted: ['testfile']
386- # Because of the need for this optimization, it should be left as is.
387- #for k in self.bucket.list(prefix = self.key_prefix + 'd', delimiter = '/'):
388- filename_list = []
389- for k in self.bucket.list(prefix = self.key_prefix, delimiter = '/'):
390- try:
391- filename = k.key.replace(self.key_prefix, '', 1)
392- filename_list.append(filename)
393- log.Debug("Listed %s/%s" % (self.straight_url, filename))
394- except AttributeError:
395- pass
396- return filename_list
397-
398- def delete(self, filename_list):
399- for filename in filename_list:
400- self.bucket.delete_key(self.key_prefix + filename)
401- log.Debug("Deleted %s/%s" % (self.straight_url, filename))
402-
403- @retry
404- def _query_file_info(self, filename, raise_errors=False):
405- try:
406- key = self.bucket.lookup(self.key_prefix + filename)
407- if key is None:
408- return {'size': -1}
409- return {'size': key.size}
410- except Exception, e:
411- log.Warn("Query %s/%s failed: %s"
412- "" % (self.straight_url,
413- filename,
414- str(e)))
415- self.resetConnection()
416- if raise_errors:
417- raise e
418- else:
419- return {'size': None}
420+ BotoSingleBackend.__init__(self, parsed_url)
421+ self._setup_pool()
422+
423+ def _setup_pool(self):
424+ number_of_procs = globals.s3_multipart_max_procs
425+ if not number_of_procs:
426+ number_of_procs = multiprocessing.cpu_count()
427+
428+ if getattr(self, '_pool', False):
429+ log.Debug("A process pool already exists. Destroying previous pool.")
430+ self._pool.terminate()
431+ self._pool.join()
432+ self._pool = None
433+
434+ log.Debug("Setting multipart boto backend process pool to %d processes" % number_of_procs)
435+
436+ self._pool = multiprocessing.Pool(processes=number_of_procs)
437+
438+ def close(self):
439+ BotoSingleBackend.close(self)
440+ log.Debug("Closing pool")
441+ self._pool.terminate()
442+ self._pool.join()
443
444 def upload(self, filename, key, headers=None):
445+ import boto
446 chunk_size = globals.s3_multipart_chunk_size
447
448 # Check minimum chunk size for S3
449@@ -379,7 +125,7 @@
450
451 log.Debug("Uploading %d bytes in %d chunks" % (bytes, chunks))
452
453- mp = self.bucket.initiate_multipart_upload(key, headers)
454+ mp = self.bucket.initiate_multipart_upload(key.key, headers)
455
456 # Initiate a queue to share progress data between the pool
457 # workers and a consumer thread, that will collect and report
458@@ -389,57 +135,81 @@
459 queue = manager.Queue()
460 consumer = ConsumerThread(queue)
461 consumer.start()
462-
463- pool = multiprocessing.Pool(processes=chunks)
464+ tasks = []
465 for n in range(chunks):
466- params = [self.scheme, self.parsed_url, self.bucket_name,
467- mp.id, filename, n, chunk_size, globals.num_retries,
468- queue]
469- pool.apply_async(multipart_upload_worker, params)
470- pool.close()
471- pool.join()
472+ storage_uri = boto.storage_uri(self.boto_uri_str)
473+ params = [self.scheme, self.parsed_url, storage_uri, self.bucket_name,
474+ mp.id, filename, n, chunk_size, globals.num_retries,
475+ queue]
476+ tasks.append(self._pool.apply_async(multipart_upload_worker, params))
477+
478+ log.Debug("Waiting for the pool to finish processing %s tasks" % len(tasks))
479+ while tasks:
480+ try:
481+ tasks[0].wait(timeout=globals.s3_multipart_max_timeout)
482+ if tasks[0].ready():
483+ if tasks[0].successful():
484+ del tasks[0]
485+ else:
486+ log.Debug("Part upload not successful, aborting multipart upload.")
487+ self._setup_pool()
488+ break
489+ else:
490+ raise multiprocessing.TimeoutError
491+ except multiprocessing.TimeoutError:
492+ log.Debug("%s tasks did not finish by the specified timeout, aborting multipart upload and resetting pool." % len(tasks))
493+ self._setup_pool()
494+ break
495+
496+ log.Debug("Done waiting for the pool to finish processing")
497
498 # Terminate the consumer thread, if any
499 if globals.progress:
500 consumer.finish = True
501 consumer.join()
502
503- if len(mp.get_all_parts()) < chunks:
504+ if len(tasks) > 0 or len(mp.get_all_parts()) < chunks:
505 mp.cancel_upload()
506 raise BackendException("Multipart upload failed. Aborted.")
507
508 return mp.complete_upload()
509
510
511-def multipart_upload_worker(scheme, parsed_url, bucket_name, multipart_id, filename,
512- offset, bytes, num_retries, queue):
513+def multipart_upload_worker(scheme, parsed_url, storage_uri, bucket_name, multipart_id,
514+ filename, offset, bytes, num_retries, queue):
515 """
516 Worker method for uploading a file chunk to S3 using multipart upload.
517 Note that the file chunk is read into memory, so it's important to keep
518 this number reasonably small.
519 """
520- import traceback
521
522 def _upload_callback(uploaded, total):
523 worker_name = multiprocessing.current_process().name
524 log.Debug("%s: Uploaded %s/%s bytes" % (worker_name, uploaded, total))
525 if not queue is None:
526- queue.put([uploaded, total]) # Push data to the consumer thread
527+ queue.put([uploaded, total]) # Push data to the consumer thread
528
529 def _upload(num_retries):
530 worker_name = multiprocessing.current_process().name
531 log.Debug("%s: Uploading chunk %d" % (worker_name, offset + 1))
532 try:
533- conn = get_connection(scheme, parsed_url)
534+ conn = get_connection(scheme, parsed_url, storage_uri)
535 bucket = conn.lookup(bucket_name)
536
537- for mp in bucket.get_all_multipart_uploads():
538+ for mp in bucket.list_multipart_uploads():
539 if mp.id == multipart_id:
540 with FileChunkIO(filename, 'r', offset=offset * bytes, bytes=bytes) as fd:
541+ start = time.time()
542 mp.upload_part_from_file(fd, offset + 1, cb=_upload_callback,
543- num_cb=max(2, 8 * bytes / (1024 * 1024))
544- ) # Max num of callbacks = 8 times x megabyte
545+ num_cb=max(2, 8 * bytes / (1024 * 1024))
546+ ) # Max num of callbacks = 8 times x megabyte
547+ end = time.time()
548+ log.Debug("{name}: Uploaded chunk {chunk} at roughly {speed} bytes/second".format(name=worker_name, chunk=offset+1, speed=(bytes/max(1, abs(end-start)))))
549 break
550+ conn.close()
551+ conn = None
552+ bucket = None
553+ del conn
554 except Exception, e:
555 traceback.print_exc()
556 if num_retries:
557@@ -452,6 +222,3 @@
558 log.Debug("%s: Upload of chunk %d complete" % (worker_name, offset + 1))
559
560 return _upload(num_retries)
561-
562-duplicity.backend.register_backend("s3", BotoBackend)
563-duplicity.backend.register_backend("s3+http", BotoBackend)
564
565=== modified file 'duplicity/backends/_boto_single.py'
566--- duplicity/backends/_boto_single.py 2014-01-13 15:54:13 +0000
567+++ duplicity/backends/_boto_single.py 2014-02-26 19:49:10 +0000
568@@ -19,6 +19,7 @@
569 # along with duplicity; if not, write to the Free Software Foundation,
570 # Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
571
572+import os
573 import time
574
575 import duplicity.backend
576@@ -29,7 +30,90 @@
577 from duplicity.backend import retry
578 from duplicity import progress
579
580-BOTO_MIN_VERSION = "2.0"
581+BOTO_MIN_VERSION = "2.1.1"
582+
583+
584+def get_connection(scheme, parsed_url, storage_uri):
585+ try:
586+ from boto.s3.connection import S3Connection
587+ assert hasattr(S3Connection, 'lookup')
588+
589+ # Newer versions of boto default to using
590+ # virtual hosting for buckets as a result of
591+ # upstream deprecation of the old-style access
592+ # method by Amazon S3. This change is not
593+ # backwards compatible (in particular with
594+ # respect to upper case characters in bucket
595+ # names); so we default to forcing use of the
596+ # old-style method unless the user has
597+ # explicitly asked us to use new-style bucket
598+ # access.
599+ #
600+ # Note that if the user wants to use new-style
601+ # buckets, we use the subdomain calling form
602+ # rather than given the option of both
603+ # subdomain and vhost. The reason being that
604+ # anything addressable as a vhost, is also
605+ # addressable as a subdomain. Seeing as the
606+ # latter is mostly a convenience method of
607+ # allowing browse:able content semi-invisibly
608+ # being hosted on S3, the former format makes
609+ # a lot more sense for us to use - being
610+ # explicit about what is happening (the fact
611+ # that we are talking to S3 servers).
612+
613+ try:
614+ from boto.s3.connection import OrdinaryCallingFormat
615+ from boto.s3.connection import SubdomainCallingFormat
616+ cfs_supported = True
617+ calling_format = OrdinaryCallingFormat()
618+ except ImportError:
619+ cfs_supported = False
620+ calling_format = None
621+
622+ if globals.s3_use_new_style:
623+ if cfs_supported:
624+ calling_format = SubdomainCallingFormat()
625+ else:
626+ log.FatalError("Use of new-style (subdomain) S3 bucket addressing was"
627+ "requested, but does not seem to be supported by the "
628+ "boto library. Either you need to upgrade your boto "
629+ "library or duplicity has failed to correctly detect "
630+ "the appropriate support.",
631+ log.ErrorCode.boto_old_style)
632+ else:
633+ if cfs_supported:
634+ calling_format = OrdinaryCallingFormat()
635+ else:
636+ calling_format = None
637+
638+ except ImportError:
639+ log.FatalError("This backend (s3) requires boto library, version %s or later, "
640+ "(http://code.google.com/p/boto/)." % BOTO_MIN_VERSION,
641+ log.ErrorCode.boto_lib_too_old)
642+
643+ if not parsed_url.hostname:
644+ # Use the default host.
645+ conn = storage_uri.connect(is_secure=(not globals.s3_unencrypted_connection))
646+ else:
647+ assert scheme == 's3'
648+ conn = storage_uri.connect(host=parsed_url.hostname,
649+ is_secure=(not globals.s3_unencrypted_connection))
650+
651+ if hasattr(conn, 'calling_format'):
652+ if calling_format is None:
653+ log.FatalError("It seems we previously failed to detect support for calling "
654+ "formats in the boto library, yet the support is there. This is "
655+ "almost certainly a duplicity bug.",
656+ log.ErrorCode.boto_calling_format)
657+ else:
658+ conn.calling_format = calling_format
659+
660+ else:
661+ # Duplicity hangs if boto gets a null bucket name.
662+ # HC: Caught a socket error, trying to recover
663+ raise BackendException('Boto requires a bucket name.')
664+ return conn
665
666
667 class BotoBackend(duplicity.backend.Backend):
668@@ -76,96 +160,28 @@
669 # boto uses scheme://bucket[/name] and specifies hostname on connect()
670 self.boto_uri_str = '://'.join((parsed_url.scheme[:2],
671 parsed_url.path.lstrip('/')))
672- self.storage_uri = boto.storage_uri(self.boto_uri_str)
673 self.resetConnection()
674+ self._listed_keys = {}
675+
676+ def close(self):
677+ del self._listed_keys
678+ self._listed_keys = {}
679+ self.bucket = None
680+ self.conn = None
681+ self.storage_uri = None
682+ del self.conn
683+ del self.storage_uri
684
685 def resetConnection(self):
686+ if getattr(self, 'conn', False):
687+ self.conn.close()
688 self.bucket = None
689 self.conn = None
690-
691- try:
692- from boto.s3.connection import S3Connection
693- from boto.s3.key import Key
694- assert hasattr(S3Connection, 'lookup')
695-
696- # Newer versions of boto default to using
697- # virtual hosting for buckets as a result of
698- # upstream deprecation of the old-style access
699- # method by Amazon S3. This change is not
700- # backwards compatible (in particular with
701- # respect to upper case characters in bucket
702- # names); so we default to forcing use of the
703- # old-style method unless the user has
704- # explicitly asked us to use new-style bucket
705- # access.
706- #
707- # Note that if the user wants to use new-style
708- # buckets, we use the subdomain calling form
709- # rather than given the option of both
710- # subdomain and vhost. The reason being that
711- # anything addressable as a vhost, is also
712- # addressable as a subdomain. Seeing as the
713- # latter is mostly a convenience method of
714- # allowing browse:able content semi-invisibly
715- # being hosted on S3, the former format makes
716- # a lot more sense for us to use - being
717- # explicit about what is happening (the fact
718- # that we are talking to S3 servers).
719-
720- try:
721- from boto.s3.connection import OrdinaryCallingFormat
722- from boto.s3.connection import SubdomainCallingFormat
723- cfs_supported = True
724- calling_format = OrdinaryCallingFormat()
725- except ImportError:
726- cfs_supported = False
727- calling_format = None
728-
729- if globals.s3_use_new_style:
730- if cfs_supported:
731- calling_format = SubdomainCallingFormat()
732- else:
733- log.FatalError("Use of new-style (subdomain) S3 bucket addressing was"
734- "requested, but does not seem to be supported by the "
735- "boto library. Either you need to upgrade your boto "
736- "library or duplicity has failed to correctly detect "
737- "the appropriate support.",
738- log.ErrorCode.boto_old_style)
739- else:
740- if cfs_supported:
741- calling_format = OrdinaryCallingFormat()
742- else:
743- calling_format = None
744-
745- except ImportError:
746- log.FatalError("This backend (s3) requires boto library, version %s or later, "
747- "(http://code.google.com/p/boto/)." % BOTO_MIN_VERSION,
748- log.ErrorCode.boto_lib_too_old)
749-
750- if not self.parsed_url.hostname:
751- # Use the default host.
752- self.conn = self.storage_uri.connect(
753- is_secure=(not globals.s3_unencrypted_connection))
754- else:
755- assert self.scheme == 's3'
756- self.conn = self.storage_uri.connect(
757- host=self.parsed_url.hostname,
758- is_secure=(not globals.s3_unencrypted_connection))
759-
760- if hasattr(self.conn, 'calling_format'):
761- if calling_format is None:
762- log.FatalError("It seems we previously failed to detect support for calling "
763- "formats in the boto library, yet the support is there. This is "
764- "almost certainly a duplicity bug.",
765- log.ErrorCode.boto_calling_format)
766- else:
767- self.conn.calling_format = calling_format
768-
769- else:
770- # Duplicity hangs if boto gets a null bucket name.
771- # HC: Caught a socket error, trying to recover
772- raise BackendException('Boto requires a bucket name.')
773-
774+ self.storage_uri = None
775+ del self.conn
776+ del self.storage_uri
777+ self.storage_uri = boto.storage_uri(self.boto_uri_str)
778+ self.conn = get_connection(self.scheme, self.parsed_url, self.storage_uri)
779 self.bucket = self.conn.lookup(self.bucket_name)
780
781 def put(self, source_path, remote_filename=None):
782@@ -181,6 +197,7 @@
783 break
784 if n > 1:
785 time.sleep(30)
786+ self.resetConnection()
787 try:
788 try:
789 self.bucket = self.conn.get_bucket(self.bucket_name, validate=True)
790@@ -198,7 +215,6 @@
791 "" % (n, self.bucket_name,
792 e.__class__.__name__,
793 str(e)))
794- self.resetConnection()
795
796 if not remote_filename:
797 remote_filename = source_path.get_filename()
798@@ -215,14 +231,17 @@
799 storage_class = 'STANDARD'
800 log.Info("Uploading %s/%s to %s Storage" % (self.straight_url, remote_filename, storage_class))
801 try:
802- key.set_contents_from_filename(source_path.name, {'Content-Type': 'application/octet-stream',
803- 'x-amz-storage-class': storage_class},
804- cb=progress.report_transfer,
805- num_cb=(max(2, 8 * globals.volsize / (1024 * 1024)))
806- ) # Max num of callbacks = 8 times x megabyte
807-
808- key.close()
809+ headers = {
810+ 'Content-Type': 'application/octet-stream',
811+ 'x-amz-storage-class': storage_class
812+ }
813+ upload_start = time.time()
814+ self.upload(source_path.name, key, headers)
815+ upload_end = time.time()
816+ total_s = abs(upload_end-upload_start) or 1 # prevent a zero value!
817+ rough_upload_speed = os.path.getsize(source_path.name)/total_s
818 self.resetConnection()
819+ log.Debug("Uploaded %s/%s to %s Storage at roughly %f bytes/second" % (self.straight_url, remote_filename, storage_class, rough_upload_speed))
820 return
821 except Exception, e:
822 log.Warn("Upload '%s/%s' failed (attempt #%d, reason: %s: %s)"
823@@ -238,19 +257,18 @@
824 raise BackendException("Error uploading %s/%s" % (self.straight_url, remote_filename))
825
826 def get(self, remote_filename, local_path):
827+ key_name = self.key_prefix + remote_filename
828+ self.pre_process_download(remote_filename, wait=True)
829+ key = self._listed_keys[key_name]
830 for n in range(1, globals.num_retries+1):
831 if n > 1:
832 # sleep before retry (new connection to a **hopeful** new host, so no need to wait so long)
833 time.sleep(10)
834 log.Info("Downloading %s/%s" % (self.straight_url, remote_filename))
835 try:
836- key_name = self.key_prefix + remote_filename
837- key = self.bucket.get_key(key_name)
838- if key is None:
839- raise BackendException("%s: key not found" % key_name)
840+ self.resetConnection()
841 key.get_contents_to_filename(local_path.name)
842 local_path.setdata()
843- self.resetConnection()
844 return
845 except Exception, e:
846 log.Warn("Download %s/%s failed (attempt #%d, reason: %s: %s)"
847@@ -260,7 +278,7 @@
848 e.__class__.__name__,
849 str(e)), 1)
850 log.Debug("Backtrace of previous error: %s" % (exception_traceback(),))
851- self.resetConnection()
852+
853 log.Warn("Giving up trying to download %s/%s after %d attempts" %
854 (self.straight_url, remote_filename, globals.num_retries))
855 raise BackendException("Error downloading %s/%s" % (self.straight_url, remote_filename))
856@@ -273,6 +291,7 @@
857 if n > 1:
858 # sleep before retry
859 time.sleep(30)
860+ self.resetConnection()
861 log.Info("Listing %s" % self.straight_url)
862 try:
863 return self._list_filenames_in_bucket()
864@@ -298,10 +317,11 @@
865 # Because of the need for this optimization, it should be left as is.
866 #for k in self.bucket.list(prefix = self.key_prefix + 'd', delimiter = '/'):
867 filename_list = []
868- for k in self.bucket.list(prefix = self.key_prefix, delimiter = '/'):
869+ for k in self.bucket.list(prefix=self.key_prefix, delimiter='/'):
870 try:
871 filename = k.key.replace(self.key_prefix, '', 1)
872 filename_list.append(filename)
873+ self._listed_keys[k.key] = k
874 log.Debug("Listed %s/%s" % (self.straight_url, filename))
875 except AttributeError:
876 pass
877@@ -330,6 +350,53 @@
878 else:
879 return {'size': None}
880
881-duplicity.backend.register_backend("gs", BotoBackend)
882-duplicity.backend.register_backend("s3", BotoBackend)
883-duplicity.backend.register_backend("s3+http", BotoBackend)
884+ def upload(self, filename, key, headers):
885+ key.set_contents_from_filename(filename, headers,
886+ cb=progress.report_transfer,
887+ num_cb=(max(2, 8 * globals.volsize / (1024 * 1024)))
888+ ) # Max num of callbacks = 8 times x megabyte
889+ key.close()
890+
891+ def pre_process_download(self, files_to_download, wait=False):
892+ # Used primarily to move files in Glacier to S3
893+ if isinstance(files_to_download, basestring):
894+ files_to_download = [files_to_download]
895+
896+ for remote_filename in files_to_download:
897+ success = False
898+ for n in range(1, globals.num_retries+1):
899+ if n > 1:
900+ # sleep before retry (new connection to a **hopeful** new host, so no need to wait so long)
901+ time.sleep(10)
902+ self.resetConnection()
903+ try:
904+ key_name = self.key_prefix + remote_filename
905+ if not self._listed_keys.get(key_name, False):
906+ self._listed_keys[key_name] = list(self.bucket.list(key_name))[0]
907+ key = self._listed_keys[key_name]
908+
909+ if key.storage_class == "GLACIER":
910+ # We need to move the file out of glacier
911+ if not self.bucket.get_key(key.key).ongoing_restore:
912+ log.Info("File %s is in Glacier storage, restoring to S3" % remote_filename)
913+ key.restore(days=1) # Shouldn't need this again after 1 day
914+ if wait:
915+ log.Info("Waiting for file %s to restore from Glacier" % remote_filename)
916+ while self.bucket.get_key(key.key).ongoing_restore:
917+ time.sleep(60)
918+ self.resetConnection()
919+ log.Info("File %s was successfully restored from Glacier" % remote_filename)
920+ success = True
921+ break
922+ except Exception, e:
923+ log.Warn("Restoration from Glacier for file %s/%s failed (attempt #%d, reason: %s: %s)"
924+ "" % (self.straight_url,
925+ remote_filename,
926+ n,
927+ e.__class__.__name__,
928+ str(e)), 1)
929+ log.Debug("Backtrace of previous error: %s" % (exception_traceback(),))
930+ if not success:
931+ log.Warn("Giving up trying to restore %s/%s after %d attempts" %
932+ (self.straight_url, remote_filename, globals.num_retries))
933+ raise BackendException("Error restoring %s/%s from Glacier to S3" % (self.straight_url, remote_filename))
934
935=== modified file 'duplicity/backends/botobackend.py'
936--- duplicity/backends/botobackend.py 2012-02-29 16:40:41 +0000
937+++ duplicity/backends/botobackend.py 2014-02-26 19:49:10 +0000
938@@ -20,13 +20,20 @@
939 # along with duplicity; if not, write to the Free Software Foundation,
940 # Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
941
942+import duplicity.backend
943 from duplicity import globals
944 import sys
945+from _boto_multi import BotoBackend as BotoMultiUploadBackend
946+from _boto_single import BotoBackend as BotoSingleUploadBackend
947
948 if globals.s3_use_multiprocessing:
949- if sys.version_info[:2] < (2,6):
950- print "Sorry, S3 multiprocessing requires version 2.5 or later of python"
951+ if sys.version_info[:2] < (2, 6):
952+ print "Sorry, S3 multiprocessing requires version 2.6 or later of python"
953 sys.exit(1)
954- import _boto_multi
955+ duplicity.backend.register_backend("gs", BotoMultiUploadBackend)
956+ duplicity.backend.register_backend("s3", BotoMultiUploadBackend)
957+ duplicity.backend.register_backend("s3+http", BotoMultiUploadBackend)
958 else:
959- import _boto_single
960+ duplicity.backend.register_backend("gs", BotoSingleUploadBackend)
961+ duplicity.backend.register_backend("s3", BotoSingleUploadBackend)
962+ duplicity.backend.register_backend("s3+http", BotoSingleUploadBackend)
963
964=== modified file 'duplicity/commandline.py'
965--- duplicity/commandline.py 2014-01-31 12:41:00 +0000
966+++ duplicity/commandline.py 2014-02-26 19:49:10 +0000
967@@ -495,6 +495,14 @@
968 parser.add_option("--s3-multipart-chunk-size", type = "int", action = "callback", metavar = _("number"),
969 callback = lambda o, s, v, p: setattr(p.values, "s3_multipart_chunk_size", v * 1024 * 1024))
970
971+ # Number of processes to set the Processor Pool to when uploading multipart
972+ # uploads to S3. Use this to control the maximum simultaneous uploads to S3.
973+ parser.add_option("--s3-multipart-max-procs", type="int", metavar=_("number"))
974+
975+ # Number of seconds to wait for each part of a multipart upload to S3. Use this
976+ # to prevent hangups when doing a multipart upload to S3.
977+ parser.add_option("--s3_multipart_max_timeout", type="int", metavar=_("number"))
978+
979 # Option to allow the s3/boto backend use the multiprocessing version.
980 # By default it is off since it does not work for Python 2.4 or 2.5.
981 if sys.version_info[:2] >= (2, 6):
982
983=== modified file 'duplicity/globals.py'
984--- duplicity/globals.py 2014-01-31 12:41:00 +0000
985+++ duplicity/globals.py 2014-02-26 19:49:10 +0000
986@@ -200,6 +200,12 @@
987 # Minimum chunk size accepted by S3
988 s3_multipart_minimum_chunk_size = 5 * 1024 * 1024
989
990+# Maximum number of processes to use while doing a multipart upload to S3
991+s3_multipart_max_procs = None
992+
993+# Maximum time to wait for a part to finish when doig a multipart upload to S3
994+s3_multipart_max_timeout = None
995+
996 # Whether to use the full email address as the user name when
997 # logging into an imap server. If false just the user name
998 # part of the email address is used.
