Merge lp:~prateek/duplicity/s3-glacier into lp:duplicity/0.6
Status: Merged
Merged at revision: 963
Proposed branch: lp:~prateek/duplicity/s3-glacier
Merge into: lp:duplicity/0.6
Diff against target: 998 lines (+321/-416), 8 files modified:
  README (+1/-0)
  bin/duplicity (+11/-0)
  bin/duplicity.1 (+38/-0)
  duplicity/backends/_boto_multi.py (+75/-308)
  duplicity/backends/_boto_single.py (+171/-104)
  duplicity/backends/botobackend.py (+11/-4)
  duplicity/commandline.py (+8/-0)
  duplicity/globals.py (+6/-0)
To merge this branch: bzr merge lp:~prateek/duplicity/s3-glacier
Related bugs:
Reviewer: edso (Needs Information)
Review via email: mp+207719@code.launchpad.net
Commit message
Fixes https:/
Fixes https:/
Description of the change
How this addresses bug 1039511:
If a file located in S3 is found to be stored in Glacier, duplicity initiates a restore back to S3 and waits until the file is available before continuing the restore process; a condensed sketch of this logic is shown below.
This branch also merges _boto_single and _boto_multi, since the majority of their code overlaps. Making _boto_multi a subclass of _boto_single means changes to shared code only have to be made in one place.
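In outline, the new pre_process_download hook boils down to the following (a condensed sketch of the code in the _boto_single.py hunk of the diff below; the helper name is illustrative, and it assumes boto 2.7.0 or later, where S3 keys expose storage_class, ongoing_restore and restore()):

```python
import time

def wait_for_glacier_restore(bucket, key_name, poll_interval=60):
    # Condensed sketch of the Glacier handling added in pre_process_download().
    # `bucket` is a boto S3 bucket; requires boto >= 2.7.0 for Key.restore()
    # and the ongoing_restore attribute.
    key = bucket.get_key(key_name)
    if key.storage_class == "GLACIER":
        # Kick off a temporary restore into S3 unless one is already running.
        if not key.ongoing_restore:
            key.restore(days=1)  # the restored copy only needs to live briefly
        # Poll until S3 reports the restore as finished, after which the
        # normal download path can fetch the file as usual.
        while bucket.get_key(key_name).ongoing_restore:
            time.sleep(poll_interval)
```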
someone1 (prateek) wrote:
I can add the additions to the manpage document. I have been using my branch for backups for 2-3 months now and can do restores from Glacier without issue. The addition of the Google Storage backend was not tested, but I have no reason to think my changes should break it, as it utilizes the same boto API as the S3 backend (the backend registration sketched below shows why). I am not sure what other forms of testing you'd like me to try out.
My backup job runs a full backup every 28 days and incremental backups in between; I back up about 300GB of data.
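For context, the reason Google Storage is expected to keep working is that botobackend.py (see the diff below) registers one and the same boto-based backend class for the gs, s3 and s3+http schemes. Simplified from that file:

```python
# Simplified from the duplicity/backends/botobackend.py changes in this branch:
# the same boto-based backend class is registered for Google Storage and S3.
import sys
import duplicity.backend
from duplicity import globals

if globals.s3_use_multiprocessing:
    if sys.version_info[:2] < (2, 6):
        print "Sorry, S3 multiprocessing requires version 2.6 or later of python"
        sys.exit(1)
    from _boto_multi import BotoBackend as SelectedBackend
else:
    from _boto_single import BotoBackend as SelectedBackend

for scheme in ("gs", "s3", "s3+http"):
    duplicity.backend.register_backend(scheme, SelectedBackend)
```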
someone1 (prateek) wrote:
I updated the manpage accordingly and added entries for the other undocumented S3 options (one of which I added myself in a patch submitted years ago). I also noticed that boto has been caching connections in the storage_uri object, which was not being cleared out when resetting the S3 connection. I've modified this code (a condensed version of the change is sketched after this comment) and will begin testing it across the 3 production systems I manage.
Let me know what you think.
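Condensed from the _boto_single.py hunk in the diff below, resetConnection() now discards and rebuilds the storage_uri object so the connection boto cached inside it is actually dropped rather than silently reused (boto and get_connection are imported and defined elsewhere in that module):

```python
def resetConnection(self):
    # Close the old connection explicitly; boto caches the live connection
    # inside the storage_uri object, so reusing that object would hand the
    # stale connection straight back.
    if getattr(self, 'conn', False):
        self.conn.close()
    self.bucket = None
    self.conn = None
    self.storage_uri = None
    # Rebuild the storage_uri from the saved URI string, then reconnect and
    # look the bucket up again on the fresh connection.
    self.storage_uri = boto.storage_uri(self.boto_uri_str)
    self.conn = get_connection(self.scheme, self.parsed_url, self.storage_uri)
    self.bucket = self.conn.lookup(self.bucket_name)
```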
edso (ed.so) wrote:
looks good to me and sounds even better (the testing part :).. thx ede/duply.net
someone1 (prateek) wrote:
There is an import error; I will push an update to this branch to fix it.
someone1 (prateek) wrote:
I meant to push this up immediately, but I thought I'd try a test upload first. It ran through fine; here is the fix: http://
Do you prefer that I put in another merge request, or just do this as a one-off separately?
Kenneth Loafman (kenneth-loafman) wrote:
Will just merge it in separately.
edso (ed.so) wrote:
Could you please check how duplicity behaves after that? The lazy imports in __init__ are intentionally there to circumvent import errors during the initial backend imports.
Just check how duplicity behaves when boto is not available and a different backend is used, say file:// for simplicity. It shouldn't complain about missing boto in that case. A minimal sketch of the pattern follows.
..ede/duply.net
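The convention edso refers to looks roughly like this (a hypothetical minimal backend for illustration, not the actual duplicity code): backend modules are imported at startup only to register their URL schemes, so optional dependencies such as boto are imported inside methods rather than at module level.

```python
import duplicity.backend

class LazyBotoBackend(duplicity.backend.Backend):
    # Hypothetical backend showing the lazy-import convention.
    def __init__(self, parsed_url):
        duplicity.backend.Backend.__init__(self, parsed_url)
        # Importing boto here rather than at module level means a user of,
        # say, the file:// backend never triggers this import, so a missing
        # boto installation cannot break startup of unrelated backends.
        import boto
        self.conn = boto.connect_s3()
```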
edso (ed.so) wrote:
Prateek.. any news on the above? ..ede
someone1 (prateek) wrote:
I got the following error:
Import of duplicity.
I've updated my branch with lazy imports.
someone1 (prateek) wrote:
I will put a merge request in for the import fix after I perform a backup tonight and make sure there are no further issues.
As an FYI: a full backup failed last week due to insufficient space on my server at one of the sites I support. There was a successful full backup at another site I manage.
Incremental backups have been going smoothly at all 3 sites I use with the changes I submitted.
Restoration from S3/Glacier has been working as well.
edso (ed.so) wrote:
unfortunately that was already merged.. please create a branch against trunk for Ken to merge.
thanks for all your efforts.. ede/duply.net
Preview Diff
1 | === modified file 'README' | |||
2 | --- README 2014-01-24 12:39:40 +0000 | |||
3 | +++ README 2014-02-26 19:49:10 +0000 | |||
4 | @@ -29,6 +29,7 @@ | |||
5 | 29 | * Boto 2.0 or later for single-processing S3 or GCS access (default) | 29 | * Boto 2.0 or later for single-processing S3 or GCS access (default) |
6 | 30 | * Boto 2.1.1 or later for multi-processing S3 access | 30 | * Boto 2.1.1 or later for multi-processing S3 access |
7 | 31 | * Python v2.6 or later for multi-processing S3 access | 31 | * Python v2.6 or later for multi-processing S3 access |
8 | 32 | * Boto 2.7.0 or later for Glacier S3 access | ||
9 | 32 | 33 | ||
10 | 33 | If you install from the source package, you will also need: | 34 | If you install from the source package, you will also need: |
11 | 34 | 35 | ||
12 | 35 | 36 | ||
13 | === modified file 'bin/duplicity' | |||
14 | --- bin/duplicity 2014-02-05 02:57:01 +0000 | |||
15 | +++ bin/duplicity 2014-02-26 19:49:10 +0000 | |||
16 | @@ -735,6 +735,15 @@ | |||
17 | 735 | log.Progress(_('Processed volume %d of %d') % (cur_vol[0], num_vols), | 735 | log.Progress(_('Processed volume %d of %d') % (cur_vol[0], num_vols), |
18 | 736 | cur_vol[0], num_vols) | 736 | cur_vol[0], num_vols) |
19 | 737 | 737 | ||
20 | 738 | if hasattr(globals.backend, 'pre_process_download'): | ||
21 | 739 | file_names = [] | ||
22 | 740 | for backup_set in backup_setlist: | ||
23 | 741 | manifest = backup_set.get_manifest() | ||
24 | 742 | volumes = manifest.get_containing_volumes(index) | ||
25 | 743 | for vol_num in volumes: | ||
26 | 744 | file_names.append(backup_set.volume_name_dict[vol_num]) | ||
27 | 745 | globals.backend.pre_process_download(file_names) | ||
28 | 746 | |||
29 | 738 | fileobj_iters = map(get_fileobj_iter, backup_setlist) | 747 | fileobj_iters = map(get_fileobj_iter, backup_setlist) |
30 | 739 | tarfiles = map(patchdir.TarFile_FromFileobjs, fileobj_iters) | 748 | tarfiles = map(patchdir.TarFile_FromFileobjs, fileobj_iters) |
31 | 740 | return patchdir.tarfiles2rop_iter(tarfiles, index) | 749 | return patchdir.tarfiles2rop_iter(tarfiles, index) |
32 | @@ -1142,6 +1151,8 @@ | |||
33 | 1142 | local_missing = [] # don't download if we can't decrypt | 1151 | local_missing = [] # don't download if we can't decrypt |
34 | 1143 | for fn in local_spurious: | 1152 | for fn in local_spurious: |
35 | 1144 | remove_local(fn) | 1153 | remove_local(fn) |
36 | 1154 | if hasattr(globals.backend, 'pre_process_download'): | ||
37 | 1155 | globals.backend.pre_process_download(local_missing) | ||
38 | 1145 | for fn in local_missing: | 1156 | for fn in local_missing: |
39 | 1146 | copy_to_local(fn) | 1157 | copy_to_local(fn) |
40 | 1147 | else: | 1158 | else: |
41 | 1148 | 1159 | ||
42 | === modified file 'bin/duplicity.1' | |||
43 | --- bin/duplicity.1 2014-01-31 12:41:00 +0000 | |||
44 | +++ bin/duplicity.1 2014-02-26 19:49:10 +0000 | |||
45 | @@ -778,6 +778,44 @@ | |||
46 | 778 | characters or other characters that are not valid in a hostname. | 778 | characters or other characters that are not valid in a hostname. |
47 | 779 | 779 | ||
48 | 780 | .TP | 780 | .TP |
49 | 781 | .BI "--s3-use-rrs" | ||
50 | 782 | Store volumes using Reduced Redundnacy Storage when uploading to Amazon S3. | ||
51 | 783 | This will lower the cost of storage but also lower the durability of stored | ||
52 | 784 | volumnes to 99.99% instead the 99.999999999% durability offered by Standard | ||
53 | 785 | Storage on S3. | ||
54 | 786 | |||
55 | 787 | .TP | ||
56 | 788 | .BI "--s3-use-multiprocessing" | ||
57 | 789 | Allow multipart volumne uploads to S3 through multiprocessing. This option | ||
58 | 790 | requires Python 2.6 and can be used to make uploads to S3 more efficient. | ||
59 | 791 | If enabled, files duplicity uploads to S3 will be split into chunks and | ||
60 | 792 | uploaded in parallel. Useful if you want to saturate your bandwidth | ||
61 | 793 | or if large files are failing during upload. | ||
62 | 794 | |||
63 | 795 | .TP | ||
64 | 796 | .BI "--s3-multipart-chunk-size" | ||
65 | 797 | Chunk size (in MB) used for S3 multipart uploads. Make this smaller than | ||
66 | 798 | .B --volsize | ||
67 | 799 | to maximize the use of your bandwidth. For example, a chunk size of 10MB | ||
68 | 800 | with a volsize of 30MB will result in 3 chunks per volume upload. | ||
69 | 801 | |||
70 | 802 | .TP | ||
71 | 803 | .BI "--s3-multipart-max-procs" | ||
72 | 804 | Specify the maximum number of processes to spawn when performing a multipart | ||
73 | 805 | upload to S3. By default, this will choose the number of processors detected | ||
74 | 806 | on your system (e.g. 4 for a 4-core system). You can adjust this number as | ||
75 | 807 | required to ensure you don't overload your system while maximizing the use of | ||
76 | 808 | your bandwidth. | ||
77 | 809 | |||
78 | 810 | .TP | ||
79 | 811 | .BI "--s3_multipart_max_timeout" | ||
80 | 812 | You can control the maximum time (in seconds) a multipart upload can spend on | ||
81 | 813 | uploading a single chunk to S3. This may be useful if you find your system | ||
82 | 814 | hanging on multipart uploads or if you'd like to control the time variance | ||
83 | 815 | when uploading to S3 to ensure you kill connections to slow S3 endpoints. | ||
84 | 816 | |||
85 | 817 | |||
86 | 818 | .TP | ||
87 | 781 | .BI "--scp-command " command | 819 | .BI "--scp-command " command |
88 | 782 | .B (only ssh pexpect backend with --use-scp enabled) | 820 | .B (only ssh pexpect backend with --use-scp enabled) |
89 | 783 | The | 821 | The |
90 | 784 | 822 | ||
91 | === modified file 'duplicity/backends/_boto_multi.py' | |||
92 | --- duplicity/backends/_boto_multi.py 2014-01-13 15:54:13 +0000 | |||
93 | +++ duplicity/backends/_boto_multi.py 2014-02-26 19:49:10 +0000 | |||
94 | @@ -22,20 +22,20 @@ | |||
95 | 22 | 22 | ||
96 | 23 | import os | 23 | import os |
97 | 24 | import sys | 24 | import sys |
98 | 25 | import time | ||
99 | 26 | import threading | 25 | import threading |
100 | 27 | import Queue | 26 | import Queue |
103 | 28 | 27 | import time | |
104 | 29 | import duplicity.backend | 28 | import traceback |
105 | 30 | 29 | ||
106 | 31 | from duplicity import globals | 30 | from duplicity import globals |
107 | 32 | from duplicity import log | 31 | from duplicity import log |
108 | 33 | from duplicity.errors import * #@UnusedWildImport | 32 | from duplicity.errors import * #@UnusedWildImport |
109 | 34 | from duplicity.util import exception_traceback | ||
110 | 35 | from duplicity.backend import retry | ||
111 | 36 | from duplicity.filechunkio import FileChunkIO | 33 | from duplicity.filechunkio import FileChunkIO |
112 | 37 | from duplicity import progress | 34 | from duplicity import progress |
113 | 38 | 35 | ||
114 | 36 | from _boto_single import BotoBackend as BotoSingleBackend | ||
115 | 37 | from _boto_single import get_connection | ||
116 | 38 | |||
117 | 39 | BOTO_MIN_VERSION = "2.1.1" | 39 | BOTO_MIN_VERSION = "2.1.1" |
118 | 40 | 40 | ||
119 | 41 | # Multiprocessing is not supported on *BSD | 41 | # Multiprocessing is not supported on *BSD |
120 | @@ -61,100 +61,13 @@ | |||
121 | 61 | def run(self): | 61 | def run(self): |
122 | 62 | while not self.finish: | 62 | while not self.finish: |
123 | 63 | try: | 63 | try: |
125 | 64 | args = self.queue.get(True, 1) | 64 | args = self.queue.get(True, 1) |
126 | 65 | progress.report_transfer(args[0], args[1]) | 65 | progress.report_transfer(args[0], args[1]) |
127 | 66 | except Queue.Empty, e: | 66 | except Queue.Empty, e: |
128 | 67 | pass | 67 | pass |
219 | 68 | 68 | ||
220 | 69 | 69 | ||
221 | 70 | def get_connection(scheme, parsed_url): | 70 | class BotoBackend(BotoSingleBackend): |
132 | 71 | try: | ||
133 | 72 | import boto | ||
134 | 73 | assert boto.Version >= BOTO_MIN_VERSION | ||
135 | 74 | |||
136 | 75 | from boto.s3.connection import S3Connection | ||
137 | 76 | assert hasattr(S3Connection, 'lookup') | ||
138 | 77 | |||
139 | 78 | # Newer versions of boto default to using | ||
140 | 79 | # virtual hosting for buckets as a result of | ||
141 | 80 | # upstream deprecation of the old-style access | ||
142 | 81 | # method by Amazon S3. This change is not | ||
143 | 82 | # backwards compatible (in particular with | ||
144 | 83 | # respect to upper case characters in bucket | ||
145 | 84 | # names); so we default to forcing use of the | ||
146 | 85 | # old-style method unless the user has | ||
147 | 86 | # explicitly asked us to use new-style bucket | ||
148 | 87 | # access. | ||
149 | 88 | # | ||
150 | 89 | # Note that if the user wants to use new-style | ||
151 | 90 | # buckets, we use the subdomain calling form | ||
152 | 91 | # rather than given the option of both | ||
153 | 92 | # subdomain and vhost. The reason being that | ||
154 | 93 | # anything addressable as a vhost, is also | ||
155 | 94 | # addressable as a subdomain. Seeing as the | ||
156 | 95 | # latter is mostly a convenience method of | ||
157 | 96 | # allowing browse:able content semi-invisibly | ||
158 | 97 | # being hosted on S3, the former format makes | ||
159 | 98 | # a lot more sense for us to use - being | ||
160 | 99 | # explicit about what is happening (the fact | ||
161 | 100 | # that we are talking to S3 servers). | ||
162 | 101 | |||
163 | 102 | try: | ||
164 | 103 | from boto.s3.connection import OrdinaryCallingFormat | ||
165 | 104 | from boto.s3.connection import SubdomainCallingFormat | ||
166 | 105 | cfs_supported = True | ||
167 | 106 | calling_format = OrdinaryCallingFormat() | ||
168 | 107 | except ImportError: | ||
169 | 108 | cfs_supported = False | ||
170 | 109 | calling_format = None | ||
171 | 110 | |||
172 | 111 | if globals.s3_use_new_style: | ||
173 | 112 | if cfs_supported: | ||
174 | 113 | calling_format = SubdomainCallingFormat() | ||
175 | 114 | else: | ||
176 | 115 | log.FatalError("Use of new-style (subdomain) S3 bucket addressing was" | ||
177 | 116 | "requested, but does not seem to be supported by the " | ||
178 | 117 | "boto library. Either you need to upgrade your boto " | ||
179 | 118 | "library or duplicity has failed to correctly detect " | ||
180 | 119 | "the appropriate support.", | ||
181 | 120 | log.ErrorCode.boto_old_style) | ||
182 | 121 | else: | ||
183 | 122 | if cfs_supported: | ||
184 | 123 | calling_format = OrdinaryCallingFormat() | ||
185 | 124 | else: | ||
186 | 125 | calling_format = None | ||
187 | 126 | |||
188 | 127 | except ImportError: | ||
189 | 128 | log.FatalError("This backend (s3) requires boto library, version %s or later, " | ||
190 | 129 | "(http://code.google.com/p/boto/)." % BOTO_MIN_VERSION, | ||
191 | 130 | log.ErrorCode.boto_lib_too_old) | ||
192 | 131 | |||
193 | 132 | if scheme == 's3+http': | ||
194 | 133 | # Use the default Amazon S3 host. | ||
195 | 134 | conn = S3Connection(is_secure=(not globals.s3_unencrypted_connection)) | ||
196 | 135 | else: | ||
197 | 136 | assert scheme == 's3' | ||
198 | 137 | conn = S3Connection( | ||
199 | 138 | host = parsed_url.hostname, | ||
200 | 139 | is_secure=(not globals.s3_unencrypted_connection)) | ||
201 | 140 | |||
202 | 141 | if hasattr(conn, 'calling_format'): | ||
203 | 142 | if calling_format is None: | ||
204 | 143 | log.FatalError("It seems we previously failed to detect support for calling " | ||
205 | 144 | "formats in the boto library, yet the support is there. This is " | ||
206 | 145 | "almost certainly a duplicity bug.", | ||
207 | 146 | log.ErrorCode.boto_calling_format) | ||
208 | 147 | else: | ||
209 | 148 | conn.calling_format = calling_format | ||
210 | 149 | |||
211 | 150 | else: | ||
212 | 151 | # Duplicity hangs if boto gets a null bucket name. | ||
213 | 152 | # HC: Caught a socket error, trying to recover | ||
214 | 153 | raise BackendException('Boto requires a bucket name.') | ||
215 | 154 | return conn | ||
216 | 155 | |||
217 | 156 | |||
218 | 157 | class BotoBackend(duplicity.backend.Backend): | ||
222 | 158 | """ | 71 | """ |
223 | 159 | Backend for Amazon's Simple Storage System, (aka Amazon S3), though | 72 | Backend for Amazon's Simple Storage System, (aka Amazon S3), though |
224 | 160 | the use of the boto module, (http://code.google.com/p/boto/). | 73 | the use of the boto module, (http://code.google.com/p/boto/). |
225 | @@ -167,199 +80,32 @@ | |||
226 | 167 | """ | 80 | """ |
227 | 168 | 81 | ||
228 | 169 | def __init__(self, parsed_url): | 82 | def __init__(self, parsed_url): |
420 | 170 | duplicity.backend.Backend.__init__(self, parsed_url) | 83 | BotoSingleBackend.__init__(self, parsed_url) |
421 | 171 | 84 | self._setup_pool() | |
422 | 172 | from boto.s3.key import Key | 85 | |
423 | 173 | from boto.s3.multipart import MultiPartUpload | 86 | def _setup_pool(self): |
424 | 174 | 87 | number_of_procs = globals.s3_multipart_max_procs | |
425 | 175 | # This folds the null prefix and all null parts, which means that: | 88 | if not number_of_procs: |
426 | 176 | # //MyBucket/ and //MyBucket are equivalent. | 89 | number_of_procs = multiprocessing.cpu_count() |
427 | 177 | # //MyBucket//My///My/Prefix/ and //MyBucket/My/Prefix are equivalent. | 90 | |
428 | 178 | self.url_parts = filter(lambda x: x != '', parsed_url.path.split('/')) | 91 | if getattr(self, '_pool', False): |
429 | 179 | 92 | log.Debug("A process pool already exists. Destroying previous pool.") | |
430 | 180 | if self.url_parts: | 93 | self._pool.terminate() |
431 | 181 | self.bucket_name = self.url_parts.pop(0) | 94 | self._pool.join() |
432 | 182 | else: | 95 | self._pool = None |
433 | 183 | # Duplicity hangs if boto gets a null bucket name. | 96 | |
434 | 184 | # HC: Caught a socket error, trying to recover | 97 | log.Debug("Setting multipart boto backend process pool to %d processes" % number_of_procs) |
435 | 185 | raise BackendException('Boto requires a bucket name.') | 98 | |
436 | 186 | 99 | self._pool = multiprocessing.Pool(processes=number_of_procs) | |
437 | 187 | self.scheme = parsed_url.scheme | 100 | |
438 | 188 | 101 | def close(self): | |
439 | 189 | self.key_class = Key | 102 | BotoSingleBackend.close(self) |
440 | 190 | 103 | log.Debug("Closing pool") | |
441 | 191 | if self.url_parts: | 104 | self._pool.terminate() |
442 | 192 | self.key_prefix = '%s/' % '/'.join(self.url_parts) | 105 | self._pool.join() |
252 | 193 | else: | ||
253 | 194 | self.key_prefix = '' | ||
254 | 195 | |||
255 | 196 | self.straight_url = duplicity.backend.strip_auth_from_url(parsed_url) | ||
256 | 197 | self.parsed_url = parsed_url | ||
257 | 198 | self.resetConnection() | ||
258 | 199 | |||
259 | 200 | def resetConnection(self): | ||
260 | 201 | self.bucket = None | ||
261 | 202 | self.conn = get_connection(self.scheme, self.parsed_url) | ||
262 | 203 | self.bucket = self.conn.lookup(self.bucket_name) | ||
263 | 204 | |||
264 | 205 | def put(self, source_path, remote_filename=None): | ||
265 | 206 | from boto.s3.connection import Location | ||
266 | 207 | if globals.s3_european_buckets: | ||
267 | 208 | if not globals.s3_use_new_style: | ||
268 | 209 | log.FatalError("European bucket creation was requested, but not new-style " | ||
269 | 210 | "bucket addressing (--s3-use-new-style)", | ||
270 | 211 | log.ErrorCode.s3_bucket_not_style) | ||
271 | 212 | #Network glitch may prevent first few attempts of creating/looking up a bucket | ||
272 | 213 | for n in range(1, globals.num_retries+1): | ||
273 | 214 | if self.bucket: | ||
274 | 215 | break | ||
275 | 216 | if n > 1: | ||
276 | 217 | time.sleep(30) | ||
277 | 218 | try: | ||
278 | 219 | try: | ||
279 | 220 | self.bucket = self.conn.get_bucket(self.bucket_name, validate=True) | ||
280 | 221 | except Exception, e: | ||
281 | 222 | if "NoSuchBucket" in str(e): | ||
282 | 223 | if globals.s3_european_buckets: | ||
283 | 224 | self.bucket = self.conn.create_bucket(self.bucket_name, | ||
284 | 225 | location=Location.EU) | ||
285 | 226 | else: | ||
286 | 227 | self.bucket = self.conn.create_bucket(self.bucket_name) | ||
287 | 228 | else: | ||
288 | 229 | raise e | ||
289 | 230 | except Exception, e: | ||
290 | 231 | log.Warn("Failed to create bucket (attempt #%d) '%s' failed (reason: %s: %s)" | ||
291 | 232 | "" % (n, self.bucket_name, | ||
292 | 233 | e.__class__.__name__, | ||
293 | 234 | str(e))) | ||
294 | 235 | self.resetConnection() | ||
295 | 236 | |||
296 | 237 | if not remote_filename: | ||
297 | 238 | remote_filename = source_path.get_filename() | ||
298 | 239 | key = self.key_prefix + remote_filename | ||
299 | 240 | for n in range(1, globals.num_retries+1): | ||
300 | 241 | if n > 1: | ||
301 | 242 | # sleep before retry (new connection to a **hopeful** new host, so no need to wait so long) | ||
302 | 243 | time.sleep(10) | ||
303 | 244 | |||
304 | 245 | if globals.s3_use_rrs: | ||
305 | 246 | storage_class = 'REDUCED_REDUNDANCY' | ||
306 | 247 | else: | ||
307 | 248 | storage_class = 'STANDARD' | ||
308 | 249 | log.Info("Uploading %s/%s to %s Storage" % (self.straight_url, remote_filename, storage_class)) | ||
309 | 250 | try: | ||
310 | 251 | headers = { | ||
311 | 252 | 'Content-Type': 'application/octet-stream', | ||
312 | 253 | 'x-amz-storage-class': storage_class | ||
313 | 254 | } | ||
314 | 255 | self.upload(source_path.name, key, headers) | ||
315 | 256 | self.resetConnection() | ||
316 | 257 | return | ||
317 | 258 | except Exception, e: | ||
318 | 259 | log.Warn("Upload '%s/%s' failed (attempt #%d, reason: %s: %s)" | ||
319 | 260 | "" % (self.straight_url, | ||
320 | 261 | remote_filename, | ||
321 | 262 | n, | ||
322 | 263 | e.__class__.__name__, | ||
323 | 264 | str(e))) | ||
324 | 265 | log.Debug("Backtrace of previous error: %s" % (exception_traceback(),)) | ||
325 | 266 | self.resetConnection() | ||
326 | 267 | log.Warn("Giving up trying to upload %s/%s after %d attempts" % | ||
327 | 268 | (self.straight_url, remote_filename, globals.num_retries)) | ||
328 | 269 | raise BackendException("Error uploading %s/%s" % (self.straight_url, remote_filename)) | ||
329 | 270 | |||
330 | 271 | def get(self, remote_filename, local_path): | ||
331 | 272 | key = self.key_class(self.bucket) | ||
332 | 273 | key.key = self.key_prefix + remote_filename | ||
333 | 274 | for n in range(1, globals.num_retries+1): | ||
334 | 275 | if n > 1: | ||
335 | 276 | # sleep before retry (new connection to a **hopeful** new host, so no need to wait so long) | ||
336 | 277 | time.sleep(10) | ||
337 | 278 | log.Info("Downloading %s/%s" % (self.straight_url, remote_filename)) | ||
338 | 279 | try: | ||
339 | 280 | key.get_contents_to_filename(local_path.name) | ||
340 | 281 | local_path.setdata() | ||
341 | 282 | self.resetConnection() | ||
342 | 283 | return | ||
343 | 284 | except Exception, e: | ||
344 | 285 | log.Warn("Download %s/%s failed (attempt #%d, reason: %s: %s)" | ||
345 | 286 | "" % (self.straight_url, | ||
346 | 287 | remote_filename, | ||
347 | 288 | n, | ||
348 | 289 | e.__class__.__name__, | ||
349 | 290 | str(e)), 1) | ||
350 | 291 | log.Debug("Backtrace of previous error: %s" % (exception_traceback(),)) | ||
351 | 292 | self.resetConnection() | ||
352 | 293 | log.Warn("Giving up trying to download %s/%s after %d attempts" % | ||
353 | 294 | (self.straight_url, remote_filename, globals.num_retries)) | ||
354 | 295 | raise BackendException("Error downloading %s/%s" % (self.straight_url, remote_filename)) | ||
355 | 296 | |||
356 | 297 | def _list(self): | ||
357 | 298 | if not self.bucket: | ||
358 | 299 | raise BackendException("No connection to backend") | ||
359 | 300 | |||
360 | 301 | for n in range(1, globals.num_retries+1): | ||
361 | 302 | if n > 1: | ||
362 | 303 | # sleep before retry | ||
363 | 304 | time.sleep(30) | ||
364 | 305 | log.Info("Listing %s" % self.straight_url) | ||
365 | 306 | try: | ||
366 | 307 | return self._list_filenames_in_bucket() | ||
367 | 308 | except Exception, e: | ||
368 | 309 | log.Warn("List %s failed (attempt #%d, reason: %s: %s)" | ||
369 | 310 | "" % (self.straight_url, | ||
370 | 311 | n, | ||
371 | 312 | e.__class__.__name__, | ||
372 | 313 | str(e)), 1) | ||
373 | 314 | log.Debug("Backtrace of previous error: %s" % (exception_traceback(),)) | ||
374 | 315 | log.Warn("Giving up trying to list %s after %d attempts" % | ||
375 | 316 | (self.straight_url, globals.num_retries)) | ||
376 | 317 | raise BackendException("Error listng %s" % self.straight_url) | ||
377 | 318 | |||
378 | 319 | def _list_filenames_in_bucket(self): | ||
379 | 320 | # We add a 'd' to the prefix to make sure it is not null (for boto) and | ||
380 | 321 | # to optimize the listing of our filenames, which always begin with 'd'. | ||
381 | 322 | # This will cause a failure in the regression tests as below: | ||
382 | 323 | # FAIL: Test basic backend operations | ||
383 | 324 | # <tracback snipped> | ||
384 | 325 | # AssertionError: Got list: [] | ||
385 | 326 | # Wanted: ['testfile'] | ||
386 | 327 | # Because of the need for this optimization, it should be left as is. | ||
387 | 328 | #for k in self.bucket.list(prefix = self.key_prefix + 'd', delimiter = '/'): | ||
388 | 329 | filename_list = [] | ||
389 | 330 | for k in self.bucket.list(prefix = self.key_prefix, delimiter = '/'): | ||
390 | 331 | try: | ||
391 | 332 | filename = k.key.replace(self.key_prefix, '', 1) | ||
392 | 333 | filename_list.append(filename) | ||
393 | 334 | log.Debug("Listed %s/%s" % (self.straight_url, filename)) | ||
394 | 335 | except AttributeError: | ||
395 | 336 | pass | ||
396 | 337 | return filename_list | ||
397 | 338 | |||
398 | 339 | def delete(self, filename_list): | ||
399 | 340 | for filename in filename_list: | ||
400 | 341 | self.bucket.delete_key(self.key_prefix + filename) | ||
401 | 342 | log.Debug("Deleted %s/%s" % (self.straight_url, filename)) | ||
402 | 343 | |||
403 | 344 | @retry | ||
404 | 345 | def _query_file_info(self, filename, raise_errors=False): | ||
405 | 346 | try: | ||
406 | 347 | key = self.bucket.lookup(self.key_prefix + filename) | ||
407 | 348 | if key is None: | ||
408 | 349 | return {'size': -1} | ||
409 | 350 | return {'size': key.size} | ||
410 | 351 | except Exception, e: | ||
411 | 352 | log.Warn("Query %s/%s failed: %s" | ||
412 | 353 | "" % (self.straight_url, | ||
413 | 354 | filename, | ||
414 | 355 | str(e))) | ||
415 | 356 | self.resetConnection() | ||
416 | 357 | if raise_errors: | ||
417 | 358 | raise e | ||
418 | 359 | else: | ||
419 | 360 | return {'size': None} | ||
443 | 361 | 106 | ||
444 | 362 | def upload(self, filename, key, headers=None): | 107 | def upload(self, filename, key, headers=None): |
445 | 108 | import boto | ||
446 | 363 | chunk_size = globals.s3_multipart_chunk_size | 109 | chunk_size = globals.s3_multipart_chunk_size |
447 | 364 | 110 | ||
448 | 365 | # Check minimum chunk size for S3 | 111 | # Check minimum chunk size for S3 |
449 | @@ -379,7 +125,7 @@ | |||
450 | 379 | 125 | ||
451 | 380 | log.Debug("Uploading %d bytes in %d chunks" % (bytes, chunks)) | 126 | log.Debug("Uploading %d bytes in %d chunks" % (bytes, chunks)) |
452 | 381 | 127 | ||
454 | 382 | mp = self.bucket.initiate_multipart_upload(key, headers) | 128 | mp = self.bucket.initiate_multipart_upload(key.key, headers) |
455 | 383 | 129 | ||
456 | 384 | # Initiate a queue to share progress data between the pool | 130 | # Initiate a queue to share progress data between the pool |
457 | 385 | # workers and a consumer thread, that will collect and report | 131 | # workers and a consumer thread, that will collect and report |
458 | @@ -389,57 +135,81 @@ | |||
459 | 389 | queue = manager.Queue() | 135 | queue = manager.Queue() |
460 | 390 | consumer = ConsumerThread(queue) | 136 | consumer = ConsumerThread(queue) |
461 | 391 | consumer.start() | 137 | consumer.start() |
464 | 392 | 138 | tasks = [] | |
463 | 393 | pool = multiprocessing.Pool(processes=chunks) | ||
465 | 394 | for n in range(chunks): | 139 | for n in range(chunks): |
472 | 395 | params = [self.scheme, self.parsed_url, self.bucket_name, | 140 | storage_uri = boto.storage_uri(self.boto_uri_str) |
473 | 396 | mp.id, filename, n, chunk_size, globals.num_retries, | 141 | params = [self.scheme, self.parsed_url, storage_uri, self.bucket_name, |
474 | 397 | queue] | 142 | mp.id, filename, n, chunk_size, globals.num_retries, |
475 | 398 | pool.apply_async(multipart_upload_worker, params) | 143 | queue] |
476 | 399 | pool.close() | 144 | tasks.append(self._pool.apply_async(multipart_upload_worker, params)) |
477 | 400 | pool.join() | 145 | |
478 | 146 | log.Debug("Waiting for the pool to finish processing %s tasks" % len(tasks)) | ||
479 | 147 | while tasks: | ||
480 | 148 | try: | ||
481 | 149 | tasks[0].wait(timeout=globals.s3_multipart_max_timeout) | ||
482 | 150 | if tasks[0].ready(): | ||
483 | 151 | if tasks[0].successful(): | ||
484 | 152 | del tasks[0] | ||
485 | 153 | else: | ||
486 | 154 | log.Debug("Part upload not successful, aborting multipart upload.") | ||
487 | 155 | self._setup_pool() | ||
488 | 156 | break | ||
489 | 157 | else: | ||
490 | 158 | raise multiprocessing.TimeoutError | ||
491 | 159 | except multiprocessing.TimeoutError: | ||
492 | 160 | log.Debug("%s tasks did not finish by the specified timeout, aborting multipart upload and resetting pool." % len(tasks)) | ||
493 | 161 | self._setup_pool() | ||
494 | 162 | break | ||
495 | 163 | |||
496 | 164 | log.Debug("Done waiting for the pool to finish processing") | ||
497 | 401 | 165 | ||
498 | 402 | # Terminate the consumer thread, if any | 166 | # Terminate the consumer thread, if any |
499 | 403 | if globals.progress: | 167 | if globals.progress: |
500 | 404 | consumer.finish = True | 168 | consumer.finish = True |
501 | 405 | consumer.join() | 169 | consumer.join() |
502 | 406 | 170 | ||
504 | 407 | if len(mp.get_all_parts()) < chunks: | 171 | if len(tasks) > 0 or len(mp.get_all_parts()) < chunks: |
505 | 408 | mp.cancel_upload() | 172 | mp.cancel_upload() |
506 | 409 | raise BackendException("Multipart upload failed. Aborted.") | 173 | raise BackendException("Multipart upload failed. Aborted.") |
507 | 410 | 174 | ||
508 | 411 | return mp.complete_upload() | 175 | return mp.complete_upload() |
509 | 412 | 176 | ||
510 | 413 | 177 | ||
513 | 414 | def multipart_upload_worker(scheme, parsed_url, bucket_name, multipart_id, filename, | 178 | def multipart_upload_worker(scheme, parsed_url, storage_uri, bucket_name, multipart_id, |
514 | 415 | offset, bytes, num_retries, queue): | 179 | filename, offset, bytes, num_retries, queue): |
515 | 416 | """ | 180 | """ |
516 | 417 | Worker method for uploading a file chunk to S3 using multipart upload. | 181 | Worker method for uploading a file chunk to S3 using multipart upload. |
517 | 418 | Note that the file chunk is read into memory, so it's important to keep | 182 | Note that the file chunk is read into memory, so it's important to keep |
518 | 419 | this number reasonably small. | 183 | this number reasonably small. |
519 | 420 | """ | 184 | """ |
520 | 421 | import traceback | ||
521 | 422 | 185 | ||
522 | 423 | def _upload_callback(uploaded, total): | 186 | def _upload_callback(uploaded, total): |
523 | 424 | worker_name = multiprocessing.current_process().name | 187 | worker_name = multiprocessing.current_process().name |
524 | 425 | log.Debug("%s: Uploaded %s/%s bytes" % (worker_name, uploaded, total)) | 188 | log.Debug("%s: Uploaded %s/%s bytes" % (worker_name, uploaded, total)) |
525 | 426 | if not queue is None: | 189 | if not queue is None: |
527 | 427 | queue.put([uploaded, total]) # Push data to the consumer thread | 190 | queue.put([uploaded, total]) # Push data to the consumer thread |
528 | 428 | 191 | ||
529 | 429 | def _upload(num_retries): | 192 | def _upload(num_retries): |
530 | 430 | worker_name = multiprocessing.current_process().name | 193 | worker_name = multiprocessing.current_process().name |
531 | 431 | log.Debug("%s: Uploading chunk %d" % (worker_name, offset + 1)) | 194 | log.Debug("%s: Uploading chunk %d" % (worker_name, offset + 1)) |
532 | 432 | try: | 195 | try: |
534 | 433 | conn = get_connection(scheme, parsed_url) | 196 | conn = get_connection(scheme, parsed_url, storage_uri) |
535 | 434 | bucket = conn.lookup(bucket_name) | 197 | bucket = conn.lookup(bucket_name) |
536 | 435 | 198 | ||
538 | 436 | for mp in bucket.get_all_multipart_uploads(): | 199 | for mp in bucket.list_multipart_uploads(): |
539 | 437 | if mp.id == multipart_id: | 200 | if mp.id == multipart_id: |
540 | 438 | with FileChunkIO(filename, 'r', offset=offset * bytes, bytes=bytes) as fd: | 201 | with FileChunkIO(filename, 'r', offset=offset * bytes, bytes=bytes) as fd: |
541 | 202 | start = time.time() | ||
542 | 439 | mp.upload_part_from_file(fd, offset + 1, cb=_upload_callback, | 203 | mp.upload_part_from_file(fd, offset + 1, cb=_upload_callback, |
545 | 440 | num_cb=max(2, 8 * bytes / (1024 * 1024)) | 204 | num_cb=max(2, 8 * bytes / (1024 * 1024)) |
546 | 441 | ) # Max num of callbacks = 8 times x megabyte | 205 | ) # Max num of callbacks = 8 times x megabyte |
547 | 206 | end = time.time() | ||
548 | 207 | log.Debug("{name}: Uploaded chunk {chunk} at roughly {speed} bytes/second".format(name=worker_name, chunk=offset+1, speed=(bytes/max(1, abs(end-start))))) | ||
549 | 442 | break | 208 | break |
550 | 209 | conn.close() | ||
551 | 210 | conn = None | ||
552 | 211 | bucket = None | ||
553 | 212 | del conn | ||
554 | 443 | except Exception, e: | 213 | except Exception, e: |
555 | 444 | traceback.print_exc() | 214 | traceback.print_exc() |
556 | 445 | if num_retries: | 215 | if num_retries: |
557 | @@ -452,6 +222,3 @@ | |||
558 | 452 | log.Debug("%s: Upload of chunk %d complete" % (worker_name, offset + 1)) | 222 | log.Debug("%s: Upload of chunk %d complete" % (worker_name, offset + 1)) |
559 | 453 | 223 | ||
560 | 454 | return _upload(num_retries) | 224 | return _upload(num_retries) |
561 | 455 | |||
562 | 456 | duplicity.backend.register_backend("s3", BotoBackend) | ||
563 | 457 | duplicity.backend.register_backend("s3+http", BotoBackend) | ||
564 | 458 | 225 | ||
565 | === modified file 'duplicity/backends/_boto_single.py' | |||
566 | --- duplicity/backends/_boto_single.py 2014-01-13 15:54:13 +0000 | |||
567 | +++ duplicity/backends/_boto_single.py 2014-02-26 19:49:10 +0000 | |||
568 | @@ -19,6 +19,7 @@ | |||
569 | 19 | # along with duplicity; if not, write to the Free Software Foundation, | 19 | # along with duplicity; if not, write to the Free Software Foundation, |
570 | 20 | # Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA | 20 | # Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA |
571 | 21 | 21 | ||
572 | 22 | import os | ||
573 | 22 | import time | 23 | import time |
574 | 23 | 24 | ||
575 | 24 | import duplicity.backend | 25 | import duplicity.backend |
576 | @@ -29,7 +30,90 @@ | |||
577 | 29 | from duplicity.backend import retry | 30 | from duplicity.backend import retry |
578 | 30 | from duplicity import progress | 31 | from duplicity import progress |
579 | 31 | 32 | ||
581 | 32 | BOTO_MIN_VERSION = "2.0" | 33 | BOTO_MIN_VERSION = "2.1.1" |
582 | 34 | |||
583 | 35 | |||
584 | 36 | def get_connection(scheme, parsed_url, storage_uri): | ||
585 | 37 | try: | ||
586 | 38 | from boto.s3.connection import S3Connection | ||
587 | 39 | assert hasattr(S3Connection, 'lookup') | ||
588 | 40 | |||
589 | 41 | # Newer versions of boto default to using | ||
590 | 42 | # virtual hosting for buckets as a result of | ||
591 | 43 | # upstream deprecation of the old-style access | ||
592 | 44 | # method by Amazon S3. This change is not | ||
593 | 45 | # backwards compatible (in particular with | ||
594 | 46 | # respect to upper case characters in bucket | ||
595 | 47 | # names); so we default to forcing use of the | ||
596 | 48 | # old-style method unless the user has | ||
597 | 49 | # explicitly asked us to use new-style bucket | ||
598 | 50 | # access. | ||
599 | 51 | # | ||
600 | 52 | # Note that if the user wants to use new-style | ||
601 | 53 | # buckets, we use the subdomain calling form | ||
602 | 54 | # rather than given the option of both | ||
603 | 55 | # subdomain and vhost. The reason being that | ||
604 | 56 | # anything addressable as a vhost, is also | ||
605 | 57 | # addressable as a subdomain. Seeing as the | ||
606 | 58 | # latter is mostly a convenience method of | ||
607 | 59 | # allowing browse:able content semi-invisibly | ||
608 | 60 | # being hosted on S3, the former format makes | ||
609 | 61 | # a lot more sense for us to use - being | ||
610 | 62 | # explicit about what is happening (the fact | ||
611 | 63 | # that we are talking to S3 servers). | ||
612 | 64 | |||
613 | 65 | try: | ||
614 | 66 | from boto.s3.connection import OrdinaryCallingFormat | ||
615 | 67 | from boto.s3.connection import SubdomainCallingFormat | ||
616 | 68 | cfs_supported = True | ||
617 | 69 | calling_format = OrdinaryCallingFormat() | ||
618 | 70 | except ImportError: | ||
619 | 71 | cfs_supported = False | ||
620 | 72 | calling_format = None | ||
621 | 73 | |||
622 | 74 | if globals.s3_use_new_style: | ||
623 | 75 | if cfs_supported: | ||
624 | 76 | calling_format = SubdomainCallingFormat() | ||
625 | 77 | else: | ||
626 | 78 | log.FatalError("Use of new-style (subdomain) S3 bucket addressing was" | ||
627 | 79 | "requested, but does not seem to be supported by the " | ||
628 | 80 | "boto library. Either you need to upgrade your boto " | ||
629 | 81 | "library or duplicity has failed to correctly detect " | ||
630 | 82 | "the appropriate support.", | ||
631 | 83 | log.ErrorCode.boto_old_style) | ||
632 | 84 | else: | ||
633 | 85 | if cfs_supported: | ||
634 | 86 | calling_format = OrdinaryCallingFormat() | ||
635 | 87 | else: | ||
636 | 88 | calling_format = None | ||
637 | 89 | |||
638 | 90 | except ImportError: | ||
639 | 91 | log.FatalError("This backend (s3) requires boto library, version %s or later, " | ||
640 | 92 | "(http://code.google.com/p/boto/)." % BOTO_MIN_VERSION, | ||
641 | 93 | log.ErrorCode.boto_lib_too_old) | ||
642 | 94 | |||
643 | 95 | if not parsed_url.hostname: | ||
644 | 96 | # Use the default host. | ||
645 | 97 | conn = storage_uri.connect(is_secure=(not globals.s3_unencrypted_connection)) | ||
646 | 98 | else: | ||
647 | 99 | assert scheme == 's3' | ||
648 | 100 | conn = storage_uri.connect(host=parsed_url.hostname, | ||
649 | 101 | is_secure=(not globals.s3_unencrypted_connection)) | ||
650 | 102 | |||
651 | 103 | if hasattr(conn, 'calling_format'): | ||
652 | 104 | if calling_format is None: | ||
653 | 105 | log.FatalError("It seems we previously failed to detect support for calling " | ||
654 | 106 | "formats in the boto library, yet the support is there. This is " | ||
655 | 107 | "almost certainly a duplicity bug.", | ||
656 | 108 | log.ErrorCode.boto_calling_format) | ||
657 | 109 | else: | ||
658 | 110 | conn.calling_format = calling_format | ||
659 | 111 | |||
660 | 112 | else: | ||
661 | 113 | # Duplicity hangs if boto gets a null bucket name. | ||
662 | 114 | # HC: Caught a socket error, trying to recover | ||
663 | 115 | raise BackendException('Boto requires a bucket name.') | ||
664 | 116 | return conn | ||
665 | 33 | 117 | ||
666 | 34 | 118 | ||
667 | 35 | class BotoBackend(duplicity.backend.Backend): | 119 | class BotoBackend(duplicity.backend.Backend): |
668 | @@ -76,96 +160,28 @@ | |||
669 | 76 | # boto uses scheme://bucket[/name] and specifies hostname on connect() | 160 | # boto uses scheme://bucket[/name] and specifies hostname on connect() |
670 | 77 | self.boto_uri_str = '://'.join((parsed_url.scheme[:2], | 161 | self.boto_uri_str = '://'.join((parsed_url.scheme[:2], |
671 | 78 | parsed_url.path.lstrip('/'))) | 162 | parsed_url.path.lstrip('/'))) |
672 | 79 | self.storage_uri = boto.storage_uri(self.boto_uri_str) | ||
673 | 80 | self.resetConnection() | 163 | self.resetConnection() |
674 | 164 | self._listed_keys = {} | ||
675 | 165 | |||
676 | 166 | def close(self): | ||
677 | 167 | del self._listed_keys | ||
678 | 168 | self._listed_keys = {} | ||
679 | 169 | self.bucket = None | ||
680 | 170 | self.conn = None | ||
681 | 171 | self.storage_uri = None | ||
682 | 172 | del self.conn | ||
683 | 173 | del self.storage_uri | ||
684 | 81 | 174 | ||
685 | 82 | def resetConnection(self): | 175 | def resetConnection(self): |
686 | 176 | if getattr(self, 'conn', False): | ||
687 | 177 | self.conn.close() | ||
688 | 83 | self.bucket = None | 178 | self.bucket = None |
689 | 84 | self.conn = None | 179 | self.conn = None |
774 | 85 | 180 | self.storage_uri = None | |
775 | 86 | try: | 181 | del self.conn |
776 | 87 | from boto.s3.connection import S3Connection | 182 | del self.storage_uri |
777 | 88 | from boto.s3.key import Key | 183 | self.storage_uri = boto.storage_uri(self.boto_uri_str) |
778 | 89 | assert hasattr(S3Connection, 'lookup') | 184 | self.conn = get_connection(self.scheme, self.parsed_url, self.storage_uri) |
695 | 90 | |||
696 | 91 | # Newer versions of boto default to using | ||
697 | 92 | # virtual hosting for buckets as a result of | ||
698 | 93 | # upstream deprecation of the old-style access | ||
699 | 94 | # method by Amazon S3. This change is not | ||
700 | 95 | # backwards compatible (in particular with | ||
701 | 96 | # respect to upper case characters in bucket | ||
702 | 97 | # names); so we default to forcing use of the | ||
703 | 98 | # old-style method unless the user has | ||
704 | 99 | # explicitly asked us to use new-style bucket | ||
705 | 100 | # access. | ||
706 | 101 | # | ||
707 | 102 | # Note that if the user wants to use new-style | ||
708 | 103 | # buckets, we use the subdomain calling form | ||
709 | 104 | # rather than given the option of both | ||
710 | 105 | # subdomain and vhost. The reason being that | ||
711 | 106 | # anything addressable as a vhost, is also | ||
712 | 107 | # addressable as a subdomain. Seeing as the | ||
713 | 108 | # latter is mostly a convenience method of | ||
714 | 109 | # allowing browse:able content semi-invisibly | ||
715 | 110 | # being hosted on S3, the former format makes | ||
716 | 111 | # a lot more sense for us to use - being | ||
717 | 112 | # explicit about what is happening (the fact | ||
718 | 113 | # that we are talking to S3 servers). | ||
719 | 114 | |||
720 | 115 | try: | ||
721 | 116 | from boto.s3.connection import OrdinaryCallingFormat | ||
722 | 117 | from boto.s3.connection import SubdomainCallingFormat | ||
723 | 118 | cfs_supported = True | ||
724 | 119 | calling_format = OrdinaryCallingFormat() | ||
725 | 120 | except ImportError: | ||
726 | 121 | cfs_supported = False | ||
727 | 122 | calling_format = None | ||
728 | 123 | |||
729 | 124 | if globals.s3_use_new_style: | ||
730 | 125 | if cfs_supported: | ||
731 | 126 | calling_format = SubdomainCallingFormat() | ||
732 | 127 | else: | ||
733 | 128 | log.FatalError("Use of new-style (subdomain) S3 bucket addressing was" | ||
734 | 129 | "requested, but does not seem to be supported by the " | ||
735 | 130 | "boto library. Either you need to upgrade your boto " | ||
736 | 131 | "library or duplicity has failed to correctly detect " | ||
737 | 132 | "the appropriate support.", | ||
738 | 133 | log.ErrorCode.boto_old_style) | ||
739 | 134 | else: | ||
740 | 135 | if cfs_supported: | ||
741 | 136 | calling_format = OrdinaryCallingFormat() | ||
742 | 137 | else: | ||
743 | 138 | calling_format = None | ||
744 | 139 | |||
745 | 140 | except ImportError: | ||
746 | 141 | log.FatalError("This backend (s3) requires boto library, version %s or later, " | ||
747 | 142 | "(http://code.google.com/p/boto/)." % BOTO_MIN_VERSION, | ||
748 | 143 | log.ErrorCode.boto_lib_too_old) | ||
749 | 144 | |||
750 | 145 | if not self.parsed_url.hostname: | ||
751 | 146 | # Use the default host. | ||
752 | 147 | self.conn = self.storage_uri.connect( | ||
753 | 148 | is_secure=(not globals.s3_unencrypted_connection)) | ||
754 | 149 | else: | ||
755 | 150 | assert self.scheme == 's3' | ||
756 | 151 | self.conn = self.storage_uri.connect( | ||
757 | 152 | host=self.parsed_url.hostname, | ||
758 | 153 | is_secure=(not globals.s3_unencrypted_connection)) | ||
759 | 154 | |||
760 | 155 | if hasattr(self.conn, 'calling_format'): | ||
761 | 156 | if calling_format is None: | ||
762 | 157 | log.FatalError("It seems we previously failed to detect support for calling " | ||
763 | 158 | "formats in the boto library, yet the support is there. This is " | ||
764 | 159 | "almost certainly a duplicity bug.", | ||
765 | 160 | log.ErrorCode.boto_calling_format) | ||
766 | 161 | else: | ||
767 | 162 | self.conn.calling_format = calling_format | ||
768 | 163 | |||
769 | 164 | else: | ||
770 | 165 | # Duplicity hangs if boto gets a null bucket name. | ||
771 | 166 | # HC: Caught a socket error, trying to recover | ||
772 | 167 | raise BackendException('Boto requires a bucket name.') | ||
773 | 168 | |||
779 | 169 | self.bucket = self.conn.lookup(self.bucket_name) | 185 | self.bucket = self.conn.lookup(self.bucket_name) |
780 | 170 | 186 | ||
781 | 171 | def put(self, source_path, remote_filename=None): | 187 | def put(self, source_path, remote_filename=None): |
782 | @@ -181,6 +197,7 @@ | |||
783 | 181 | break | 197 | break |
784 | 182 | if n > 1: | 198 | if n > 1: |
785 | 183 | time.sleep(30) | 199 | time.sleep(30) |
786 | 200 | self.resetConnection() | ||
787 | 184 | try: | 201 | try: |
788 | 185 | try: | 202 | try: |
789 | 186 | self.bucket = self.conn.get_bucket(self.bucket_name, validate=True) | 203 | self.bucket = self.conn.get_bucket(self.bucket_name, validate=True) |
790 | @@ -198,7 +215,6 @@ | |||
791 | 198 | "" % (n, self.bucket_name, | 215 | "" % (n, self.bucket_name, |
792 | 199 | e.__class__.__name__, | 216 | e.__class__.__name__, |
793 | 200 | str(e))) | 217 | str(e))) |
794 | 201 | self.resetConnection() | ||
795 | 202 | 218 | ||
796 | 203 | if not remote_filename: | 219 | if not remote_filename: |
797 | 204 | remote_filename = source_path.get_filename() | 220 | remote_filename = source_path.get_filename() |
798 | @@ -215,14 +231,17 @@ | |||
799 | 215 | storage_class = 'STANDARD' | 231 | storage_class = 'STANDARD' |
800 | 216 | log.Info("Uploading %s/%s to %s Storage" % (self.straight_url, remote_filename, storage_class)) | 232 | log.Info("Uploading %s/%s to %s Storage" % (self.straight_url, remote_filename, storage_class)) |
801 | 217 | try: | 233 | try: |
809 | 218 | key.set_contents_from_filename(source_path.name, {'Content-Type': 'application/octet-stream', | 234 | headers = { |
810 | 219 | 'x-amz-storage-class': storage_class}, | 235 | 'Content-Type': 'application/octet-stream', |
811 | 220 | cb=progress.report_transfer, | 236 | 'x-amz-storage-class': storage_class |
812 | 221 | num_cb=(max(2, 8 * globals.volsize / (1024 * 1024))) | 237 | } |
813 | 222 | ) # Max num of callbacks = 8 times x megabyte | 238 | upload_start = time.time() |
814 | 223 | 239 | self.upload(source_path.name, key, headers) | |
815 | 224 | key.close() | 240 | upload_end = time.time() |
816 | 241 | total_s = abs(upload_end-upload_start) or 1 # prevent a zero value! | ||
817 | 242 | rough_upload_speed = os.path.getsize(source_path.name)/total_s | ||
818 | 225 | self.resetConnection() | 243 | self.resetConnection() |
819 | 244 | log.Debug("Uploaded %s/%s to %s Storage at roughly %f bytes/second" % (self.straight_url, remote_filename, storage_class, rough_upload_speed)) | ||
820 | 226 | return | 245 | return |
821 | 227 | except Exception, e: | 246 | except Exception, e: |
822 | 228 | log.Warn("Upload '%s/%s' failed (attempt #%d, reason: %s: %s)" | 247 | log.Warn("Upload '%s/%s' failed (attempt #%d, reason: %s: %s)" |
823 | @@ -238,19 +257,18 @@ | |||
824 | 238 | raise BackendException("Error uploading %s/%s" % (self.straight_url, remote_filename)) | 257 | raise BackendException("Error uploading %s/%s" % (self.straight_url, remote_filename)) |
825 | 239 | 258 | ||
826 | 240 | def get(self, remote_filename, local_path): | 259 | def get(self, remote_filename, local_path): |
827 | 260 | key_name = self.key_prefix + remote_filename | ||
828 | 261 | self.pre_process_download(remote_filename, wait=True) | ||
829 | 262 | key = self._listed_keys[key_name] | ||
830 | 241 | for n in range(1, globals.num_retries+1): | 263 | for n in range(1, globals.num_retries+1): |
831 | 242 | if n > 1: | 264 | if n > 1: |
832 | 243 | # sleep before retry (new connection to a **hopeful** new host, so no need to wait so long) | 265 | # sleep before retry (new connection to a **hopeful** new host, so no need to wait so long) |
833 | 244 | time.sleep(10) | 266 | time.sleep(10) |
834 | 245 | log.Info("Downloading %s/%s" % (self.straight_url, remote_filename)) | 267 | log.Info("Downloading %s/%s" % (self.straight_url, remote_filename)) |
835 | 246 | try: | 268 | try: |
840 | 247 | key_name = self.key_prefix + remote_filename | 269 | self.resetConnection() |
837 | 248 | key = self.bucket.get_key(key_name) | ||
838 | 249 | if key is None: | ||
839 | 250 | raise BackendException("%s: key not found" % key_name) | ||
841 | 251 | key.get_contents_to_filename(local_path.name) | 270 | key.get_contents_to_filename(local_path.name) |
842 | 252 | local_path.setdata() | 271 | local_path.setdata() |
843 | 253 | self.resetConnection() | ||
844 | 254 | return | 272 | return |
845 | 255 | except Exception, e: | 273 | except Exception, e: |
846 | 256 | log.Warn("Download %s/%s failed (attempt #%d, reason: %s: %s)" | 274 | log.Warn("Download %s/%s failed (attempt #%d, reason: %s: %s)" |
847 | @@ -260,7 +278,7 @@ | |||
848 | 260 | e.__class__.__name__, | 278 | e.__class__.__name__, |
849 | 261 | str(e)), 1) | 279 | str(e)), 1) |
850 | 262 | log.Debug("Backtrace of previous error: %s" % (exception_traceback(),)) | 280 | log.Debug("Backtrace of previous error: %s" % (exception_traceback(),)) |
852 | 263 | self.resetConnection() | 281 | |
853 | 264 | log.Warn("Giving up trying to download %s/%s after %d attempts" % | 282 | log.Warn("Giving up trying to download %s/%s after %d attempts" % |
854 | 265 | (self.straight_url, remote_filename, globals.num_retries)) | 283 | (self.straight_url, remote_filename, globals.num_retries)) |
855 | 266 | raise BackendException("Error downloading %s/%s" % (self.straight_url, remote_filename)) | 284 | raise BackendException("Error downloading %s/%s" % (self.straight_url, remote_filename)) |
856 | @@ -273,6 +291,7 @@ | |||
857 | 273 | if n > 1: | 291 | if n > 1: |
858 | 274 | # sleep before retry | 292 | # sleep before retry |
859 | 275 | time.sleep(30) | 293 | time.sleep(30) |
860 | 294 | self.resetConnection() | ||
861 | 276 | log.Info("Listing %s" % self.straight_url) | 295 | log.Info("Listing %s" % self.straight_url) |
862 | 277 | try: | 296 | try: |
863 | 278 | return self._list_filenames_in_bucket() | 297 | return self._list_filenames_in_bucket() |
864 | @@ -298,10 +317,11 @@ | |||
865 | 298 | # Because of the need for this optimization, it should be left as is. | 317 | # Because of the need for this optimization, it should be left as is. |
866 | 299 | #for k in self.bucket.list(prefix = self.key_prefix + 'd', delimiter = '/'): | 318 | #for k in self.bucket.list(prefix = self.key_prefix + 'd', delimiter = '/'): |
867 | 300 | filename_list = [] | 319 | filename_list = [] |
869 | 301 | for k in self.bucket.list(prefix = self.key_prefix, delimiter = '/'): | 320 | for k in self.bucket.list(prefix=self.key_prefix, delimiter='/'): |
870 | 302 | try: | 321 | try: |
871 | 303 | filename = k.key.replace(self.key_prefix, '', 1) | 322 | filename = k.key.replace(self.key_prefix, '', 1) |
872 | 304 | filename_list.append(filename) | 323 | filename_list.append(filename) |
873 | 324 | self._listed_keys[k.key] = k | ||
874 | 305 | log.Debug("Listed %s/%s" % (self.straight_url, filename)) | 325 | log.Debug("Listed %s/%s" % (self.straight_url, filename)) |
875 | 306 | except AttributeError: | 326 | except AttributeError: |
876 | 307 | pass | 327 | pass |
877 | @@ -330,6 +350,53 @@ | |||
878 | 330 | else: | 350 | else: |
879 | 331 | return {'size': None} | 351 | return {'size': None} |
880 | 332 | 352 | ||
884 | 333 | duplicity.backend.register_backend("gs", BotoBackend) | 353 | def upload(self, filename, key, headers): |
885 | 334 | duplicity.backend.register_backend("s3", BotoBackend) | 354 | key.set_contents_from_filename(filename, headers, |
886 | 335 | duplicity.backend.register_backend("s3+http", BotoBackend) | 355 | cb=progress.report_transfer, |
887 | 356 | num_cb=(max(2, 8 * globals.volsize / (1024 * 1024))) | ||
888 | 357 | ) # Max num of callbacks = 8 times x megabyte | ||
889 | 358 | key.close() | ||
890 | 359 | |||
891 | 360 | def pre_process_download(self, files_to_download, wait=False): | ||
892 | 361 | # Used primarily to move files in Glacier to S3 | ||
893 | 362 | if isinstance(files_to_download, basestring): | ||
894 | 363 | files_to_download = [files_to_download] | ||
895 | 364 | |||
896 | 365 | for remote_filename in files_to_download: | ||
897 | 366 | success = False | ||
898 | 367 | for n in range(1, globals.num_retries+1): | ||
899 | 368 | if n > 1: | ||
900 | 369 | # sleep before retry (new connection to a **hopeful** new host, so no need to wait so long) | ||
901 | 370 | time.sleep(10) | ||
902 | 371 | self.resetConnection() | ||
903 | 372 | try: | ||
904 | 373 | key_name = self.key_prefix + remote_filename | ||
905 | 374 | if not self._listed_keys.get(key_name, False): | ||
906 | 375 | self._listed_keys[key_name] = list(self.bucket.list(key_name))[0] | ||
907 | 376 | key = self._listed_keys[key_name] | ||
908 | 377 | |||
909 | 378 | if key.storage_class == "GLACIER": | ||
910 | 379 | # We need to move the file out of glacier | ||
911 | 380 | if not self.bucket.get_key(key.key).ongoing_restore: | ||
912 | 381 | log.Info("File %s is in Glacier storage, restoring to S3" % remote_filename) | ||
913 | 382 | key.restore(days=1) # Shouldn't need this again after 1 day | ||
914 | 383 | if wait: | ||
915 | 384 | log.Info("Waiting for file %s to restore from Glacier" % remote_filename) | ||
916 | 385 | while self.bucket.get_key(key.key).ongoing_restore: | ||
917 | 386 | time.sleep(60) | ||
918 | 387 | self.resetConnection() | ||
919 | 388 | log.Info("File %s was successfully restored from Glacier" % remote_filename) | ||
920 | 389 | success = True | ||
921 | 390 | break | ||
922 | 391 | except Exception, e: | ||
923 | 392 | log.Warn("Restoration from Glacier for file %s/%s failed (attempt #%d, reason: %s: %s)" | ||
924 | 393 | "" % (self.straight_url, | ||
925 | 394 | remote_filename, | ||
926 | 395 | n, | ||
927 | 396 | e.__class__.__name__, | ||
928 | 397 | str(e)), 1) | ||
929 | 398 | log.Debug("Backtrace of previous error: %s" % (exception_traceback(),)) | ||
930 | 399 | if not success: | ||
931 | 400 | log.Warn("Giving up trying to restore %s/%s after %d attempts" % | ||
932 | 401 | (self.straight_url, remote_filename, globals.num_retries)) | ||
933 | 402 | raise BackendException("Error restoring %s/%s from Glacier to S3" % (self.straight_url, remote_filename)) | ||
934 | 336 | 403 | ||
935 | === modified file 'duplicity/backends/botobackend.py' | |||
936 | --- duplicity/backends/botobackend.py 2012-02-29 16:40:41 +0000 | |||
937 | +++ duplicity/backends/botobackend.py 2014-02-26 19:49:10 +0000 | |||
938 | @@ -20,13 +20,20 @@ | |||
939 | 20 | # along with duplicity; if not, write to the Free Software Foundation, | 20 | # along with duplicity; if not, write to the Free Software Foundation, |
940 | 21 | # Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA | 21 | # Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA |
941 | 22 | 22 | ||
942 | 23 | import duplicity.backend | ||
943 | 23 | from duplicity import globals | 24 | from duplicity import globals |
944 | 24 | import sys | 25 | import sys |
945 | 26 | from _boto_multi import BotoBackend as BotoMultiUploadBackend | ||
946 | 27 | from _boto_single import BotoBackend as BotoSingleUploadBackend | ||
947 | 25 | 28 | ||
948 | 26 | if globals.s3_use_multiprocessing: | 29 | if globals.s3_use_multiprocessing: |
951 | 27 | if sys.version_info[:2] < (2,6): | 30 | if sys.version_info[:2] < (2, 6): |
952 | 28 | print "Sorry, S3 multiprocessing requires version 2.5 or later of python" | 31 | print "Sorry, S3 multiprocessing requires version 2.6 or later of python" |
953 | 29 | sys.exit(1) | 32 | sys.exit(1) |
955 | 30 | import _boto_multi | 33 | duplicity.backend.register_backend("gs", BotoMultiUploadBackend) |
956 | 34 | duplicity.backend.register_backend("s3", BotoMultiUploadBackend) | ||
957 | 35 | duplicity.backend.register_backend("s3+http", BotoMultiUploadBackend) | ||
958 | 31 | else: | 36 | else: |
960 | 32 | import _boto_single | 37 | duplicity.backend.register_backend("gs", BotoSingleUploadBackend) |
961 | 38 | duplicity.backend.register_backend("s3", BotoSingleUploadBackend) | ||
962 | 39 | duplicity.backend.register_backend("s3+http", BotoSingleUploadBackend) | ||
963 | 33 | 40 | ||
964 | === modified file 'duplicity/commandline.py' | |||
965 | --- duplicity/commandline.py 2014-01-31 12:41:00 +0000 | |||
966 | +++ duplicity/commandline.py 2014-02-26 19:49:10 +0000 | |||
967 | @@ -495,6 +495,14 @@ | |||
968 | 495 | parser.add_option("--s3-multipart-chunk-size", type = "int", action = "callback", metavar = _("number"), | 495 | parser.add_option("--s3-multipart-chunk-size", type = "int", action = "callback", metavar = _("number"), |
969 | 496 | callback = lambda o, s, v, p: setattr(p.values, "s3_multipart_chunk_size", v * 1024 * 1024)) | 496 | callback = lambda o, s, v, p: setattr(p.values, "s3_multipart_chunk_size", v * 1024 * 1024)) |
970 | 497 | 497 | ||
971 | 498 | # Number of processes to set the Processor Pool to when uploading multipart | ||
972 | 499 | # uploads to S3. Use this to control the maximum simultaneous uploads to S3. | ||
973 | 500 | parser.add_option("--s3-multipart-max-procs", type="int", metavar=_("number")) | ||
974 | 501 | |||
975 | 502 | # Number of seconds to wait for each part of a multipart upload to S3. Use this | ||
976 | 503 | # to prevent hangups when doing a multipart upload to S3. | ||
977 | 504 | parser.add_option("--s3_multipart_max_timeout", type="int", metavar=_("number")) | ||
978 | 505 | |||
979 | 498 | # Option to allow the s3/boto backend use the multiprocessing version. | 506 | # Option to allow the s3/boto backend use the multiprocessing version. |
980 | 499 | # By default it is off since it does not work for Python 2.4 or 2.5. | 507 | # By default it is off since it does not work for Python 2.4 or 2.5. |
981 | 500 | if sys.version_info[:2] >= (2, 6): | 508 | if sys.version_info[:2] >= (2, 6): |
982 | 501 | 509 | ||
983 | === modified file 'duplicity/globals.py' | |||
984 | --- duplicity/globals.py 2014-01-31 12:41:00 +0000 | |||
985 | +++ duplicity/globals.py 2014-02-26 19:49:10 +0000 | |||
986 | @@ -200,6 +200,12 @@ | |||
987 | 200 | # Minimum chunk size accepted by S3 | 200 | # Minimum chunk size accepted by S3 |
988 | 201 | s3_multipart_minimum_chunk_size = 5 * 1024 * 1024 | 201 | s3_multipart_minimum_chunk_size = 5 * 1024 * 1024 |
989 | 202 | 202 | ||
990 | 203 | # Maximum number of processes to use while doing a multipart upload to S3 | ||
991 | 204 | s3_multipart_max_procs = None | ||
992 | 205 | |||
993 | 206 | # Maximum time to wait for a part to finish when doig a multipart upload to S3 | ||
994 | 207 | s3_multipart_max_timeout = None | ||
995 | 208 | |||
996 | 203 | # Whether to use the full email address as the user name when | 209 | # Whether to use the full email address as the user name when |
997 | 204 | # logging into an imap server. If false just the user name | 210 | # logging into an imap server. If false just the user name |
998 | 205 | # part of the email address is used. | 211 | # part of the email address is used. |
Prateek,
please add your new switches to the bin/duplicity.1 manpage.
Also update the requirements documentation in there to match what you updated in the README or changed for the backend.
Aside from that, I assume you extensively tested the changes?
..ede/duply.net