Merge lp:~niemeyer/goamz/put-all into lp:~gophers/goamz/trunk
Status: | Merged |
---|---|
Merged at revision: | 31 |
Proposed branch: | lp:~niemeyer/goamz/put-all |
Merge into: | lp:~gophers/goamz/trunk |
Diff against target: | 632 lines (+366/-54), 5 files modified: s3/multi.go (+109/-17), s3/multi_test.go (+218/-26), s3/responses_test.go (+6/-6), s3/s3_test.go (+11/-4), s3/s3i_test.go (+22/-1) |
To merge this branch: | bzr merge lp:~niemeyer/goamz/put-all |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
The Go Language Gophers | Pending | ||
Review via email: mp+147829@code.launchpad.net |
Commit message
Description of the change
s3: Multi.PutAll for auto-chunking and resuming
Bucket.Multi method also added for trivial resume-or-init handling.
Gustavo Niemeyer (niemeyer) wrote : | # |
John A Meinel (jameinel) wrote : | # |
LGTM
https:/
File s3/multi.go (right):
https:/
s3/multi.go:272: // the new part, or otherwise overwritten with the new content.
does partSize need to match exactly the pre-uploaded content? I don't
see places where that is checked here. Probably at least a doc comment
about it.
https:/
s3/multi.go:282: if totalSize == 0 || err != nil {
What will this return if totalSize is 0? the return of ListParts? But
doesn't that violate the 'you should return nil, otherwise the caller
gets an interface object that contains nil but isn't nil'?
So I would probably split this:
    if err != nil {
        return nil, err
    } else if totalSize == 0 {
        return nil, nil
    }
Though since you don't have a result object, wouldn't you want a non-nil
err? Otherwise you'll get a segfault accessing the result later.
https:/
s3/multi.go:310: part, err := m.putPart(current, section, partSize, md5b64)
As an aside, would there be any reason to do parallel uploads?
Dimiter Naydenov (dimitern) wrote : | # |
LGTM, a few suggestions.
https:/
File s3/multi.go (right):
https:/
s3/multi.go:227: var listPartsMax = 1000
Is this an absolute limit of all parts per client connection?
https:/
s3/multi.go:282: if totalSize == 0 || err != nil {
On 2013/02/12 07:32:38, jameinel wrote:
> What will this return if totalSize is 0? the return of ListParts? But doesn't
> that violate the 'you should return nil, otherwise the caller gets an interface
> object that contains nil but isn't nil'?
> So I would probably split this:
> if err != nil {
>     return nil, err
> } else if totalSize == 0 {
>     return nil, nil
> }
> Though since you don't have a result object, wouldn't you want a non-nil err?
> Otherwise you'll get a segfault accessing the result later.
ISTM if totalSize == 0 it will return nil, nil, since there won't be an
error. But if it's not obvious, +1 on the suggestion.
https:/
File s3/multi_test.go (right):
https:/
s3/multi_
"JNbR_[
Maybe move this in assertId()? This seems to be used multiple times, so
it'll be better to have it at one place, I think.
Gustavo Niemeyer (niemeyer) wrote : | # |
Thanks for the reviews.
https:/
File s3/multi.go (right):
https:/
s3/multi.go:227: var listPartsMax = 1000
On 2013/02/12 08:02:52, dimitern wrote:
> Is this an absolute limit of all parts per client connection?
No, this is a per-request limit, and 1000 is the default limit as well.
It's here as a variable just so we can test it. I'll add a comment.
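To make the per-request limit concrete, here is a minimal sketch of marker-based paging, assuming a simulated server. The names page, fetchPage and listAll are hypothetical and are not part of goamz; only the IsTruncated / next-marker idea is taken from the ListParts code in the diff.

```go
package main

import "fmt"

// page is a hypothetical stand-in for one ListParts response.
type page struct {
	Parts       []int // part numbers only, for brevity
	IsTruncated bool
	NextMarker  int
}

// fetchPage simulates a server that holds part numbers 1..total and
// returns at most max of them, starting after marker.
func fetchPage(total, marker, max int) page {
	var p page
	for n := marker + 1; n <= total && len(p.Parts) < max; n++ {
		p.Parts = append(p.Parts, n)
	}
	if len(p.Parts) > 0 {
		p.NextMarker = p.Parts[len(p.Parts)-1]
		p.IsTruncated = p.NextMarker < total
	}
	return p
}

// listAll keeps requesting pages until the server reports that the
// listing is no longer truncated. It also counts the requests made.
func listAll(total, max int) ([]int, int) {
	var all []int
	marker, requests := 0, 0
	for {
		p := fetchPage(total, marker, max)
		requests++
		all = append(all, p.Parts...)
		if !p.IsTruncated {
			return all, requests
		}
		marker = p.NextMarker
	}
}

func main() {
	parts, requests := listAll(3, 2)
	fmt.Println(parts, requests) // [1 2 3] 2
}
```

With a limit of 2 and three parts, the second request carries marker 2, which is the same shape the ListParts test in this proposal checks for.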
https:/
s3/multi.go:272: // the new part, or otherwise overwritten with the new content.
On 2013/02/12 07:32:38, jameinel wrote:
> does partSize need to match exactly the pre-uploaded content? I don't see places
> where that is checked here. Probably at least a doc comment about it.
It's actually explicitly checked, but that's a bit redundant with the
above statement. If the size changes, the checksum won't match. That
said, given that you felt the need to ask, it won't hurt to mention it
explicitly.
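As an illustration of the size-plus-checksum comparison, here is a small standalone sketch. The helper names checksums and sameAsUploaded are made up for this example; the quoted hex MD5 form of the ETag follows what the diff compares against.

```go
package main

import (
	"crypto/md5"
	"encoding/base64"
	"encoding/hex"
	"fmt"
)

// checksums returns the size of data plus its MD5 digest in the two
// encodings used in this change: hex (for the ETag comparison) and
// base64 (for the Content-MD5 header).
func checksums(data []byte) (size int64, md5hex, md5b64 string) {
	sum := md5.Sum(data)
	return int64(len(data)), hex.EncodeToString(sum[:]), base64.StdEncoding.EncodeToString(sum[:])
}

// sameAsUploaded reports whether data matches a previously uploaded
// part, given that part's size and its quoted hex-MD5 ETag.
func sameAsUploaded(data []byte, oldSize int64, oldETag string) bool {
	size, md5hex, _ := checksums(data)
	return size == oldSize && oldETag == `"`+md5hex+`"`
}

func main() {
	size, md5hex, md5b64 := checksums(nil)
	fmt.Println(size, md5hex, md5b64)
	// 0 d41d8cd98f00b204e9800998ecf8427e 1B2M2Y8AsgTpgAmY7PhCfg==
	fmt.Println(sameAsUploaded([]byte("x"), 0, `"`+md5hex+`"`)) // false
}
```

The size check is cheap and catches a changed length before the digest comparison is even relevant, which is why stating it explicitly costs nothing.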
https:/
s3/multi.go:282: if totalSize == 0 || err != nil {
On 2013/02/12 07:32:38, jameinel wrote:
> So I would probably split this:
> if err != nil {
>     return nil, err
> } else if totalSize == 0 {
>     return nil, nil
> }
Hmm.. that does exactly the same thing that the current version does.
> Though since you don't have a result object, wouldn't you want a non-nil err?
> Otherwise you'll get a segfault accessing the result later.
That's not the case with slices. The most conventional way to represent
an empty slice is with a nil/zero value. len, append, etc, all work
properly with them, and code taking slices should behave properly as
well, or may otherwise be considered broken.
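A short illustration of the point about nil slices, for readers less used to them. The Part type here only mirrors the field names in the diff, and noParts and countAfterAppend are hypothetical helpers written for this example.

```go
package main

import "fmt"

// Part mirrors the field names of the Part type in the diff.
type Part struct {
	N    int
	ETag string
	Size int64
}

// noParts returns a nil slice and a nil error, the same shape as an
// early "return nil, nil" from a function returning ([]Part, error).
func noParts() ([]Part, error) {
	return nil, nil
}

// countAfterAppend appends one part to the nil slice from noParts.
func countAfterAppend() int {
	parts, _ := noParts()
	parts = append(parts, Part{N: 1, ETag: `"etag1"`, Size: 5})
	return len(parts)
}

func main() {
	parts, err := noParts()
	fmt.Println(parts == nil, err == nil, len(parts)) // true true 0
	for range parts {
		fmt.Println("never reached: ranging over a nil slice is a no-op")
	}
	fmt.Println(countAfterAppend()) // 1
}
```

Nothing here dereferences the nil slice, so there is no crash; only indexing into it, such as parts[0], would panic, exactly as it would for any empty slice.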
https:/
s3/multi.go:310: part, err := m.putPart(current, section, partSize, md5b64)
On 2013/02/12 07:32:38, jameinel wrote:
> As an aside, would there be any reason to do parallel uploads?
That's an interesting question. I guess we could probably get some speed
up by sending parts in parallel. It's a good topic to do some
experimentation on.
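For anyone who wants to experiment, one possible shape for parallel sends is sketched below. This is only an assumption about how it might be done, not anything in this branch: uploadFunc and putParallel are hypothetical names, and the upload function stands in for whatever actually sends a part.

```go
package main

import (
	"fmt"
	"sync"
)

// uploadFunc is a hypothetical stand-in for sending one part. It is
// not part of goamz.
type uploadFunc func(n int, data []byte) (etag string, err error)

// putParallel sends chunks with at most workers uploads in flight and
// returns the ETags in part order. Part numbers start at 1.
func putParallel(chunks [][]byte, workers int, upload uploadFunc) ([]string, error) {
	if workers < 1 {
		workers = 1
	}
	etags := make([]string, len(chunks))
	errs := make([]error, len(chunks))
	sem := make(chan struct{}, workers) // bounds the concurrency
	var wg sync.WaitGroup
	for i, chunk := range chunks {
		wg.Add(1)
		sem <- struct{}{}
		go func(i int, chunk []byte) {
			defer wg.Done()
			defer func() { <-sem }()
			// Each goroutine writes only its own slot, so no lock is needed.
			etags[i], errs[i] = upload(i+1, chunk)
		}(i, chunk)
	}
	wg.Wait()
	for _, err := range errs {
		if err != nil {
			return nil, err
		}
	}
	return etags, nil
}

func main() {
	chunks := [][]byte{[]byte("part1"), []byte("part2"), []byte("last")}
	fake := func(n int, data []byte) (string, error) {
		return fmt.Sprintf("etag%d", n), nil
	}
	fmt.Println(putParallel(chunks, 2, fake)) // [etag1 etag2 etag3] <nil>
}
```

Whether this gives a real speed-up would depend on the network and on the server, which is the experiment suggested above.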
John A Meinel (jameinel) wrote : | # |
LGTM
https:/
File s3/multi.go (right):
https:/
s3/multi.go:282: if totalSize == 0 || err != nil {
That's fine. I wasn't thinking about it being a slice, it does seem
correct there.
The concern was stuff like: http://
But that doesn't apply here because err has type "error" rather than a
custom type that could be a nil pointer.
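The pitfall referred to here can be shown in a few lines. The myErr type and both functions are hypothetical and exist only for this illustration.

```go
package main

import "fmt"

// myErr is a hypothetical custom error type, used only for illustration.
type myErr struct{}

func (e *myErr) Error() string { return "my error" }

// badResult returns a nil *myErr through the error interface. The
// interface value then carries a type, so it does not compare equal to nil.
func badResult() error {
	var e *myErr // nil pointer
	return e
}

// goodResult returns a plain nil, so the interface value itself is nil.
func goodResult() error {
	return nil
}

func main() {
	fmt.Println(badResult() != nil)  // true
	fmt.Println(goodResult() == nil) // true
}
```

A function declared to return the plain error type and written with "return nil, err" or "return nil, nil" stays on the goodResult side of this.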
Roger Peppe (rogpeppe) wrote : | # |
really nice API.
a few suggestions below.
https:/
File s3/multi.go (right):
https:/
s3/multi.go:88: // inside b. If a multipart upload exists for key, it is returned,
if we assume that a bucket implies its key, then i think the explanation
can become simpler. how about this, perhaps?
// Multi returns a multipart upload handler for the bucket.
// If one already exists, it is returned, otherwise
// a new multipart upload is initiated with contType and perm.
https:/
s3/multi.go:229: // ListParts returns the list of previously uploaded parts in m.
// ordered by part number.
?
https:/
s3/multi.go:269: // PutAll sends all of r via a multipart upload with parts no large
s/large/larger/
https:/
s3/multi.go:273: // PutAll returns all the parts of m (reused or not).
nice interface.
https:/
s3/multi.go:279: reuse := 1 // Index of next old part to consider reusing.
I was a little confused here by the one-based indexing used for reuse
here - it makes it look like it's in the same units as current, but
reuse really is an index (into old) and current is a part number.
Also, current isn't really the index of the last good part handled,
AFAICS - it's the part number of the part we're about to consider
putting.
I'd be inclined to do:
reuse := 0 // Index of next old part to consider reusing.
current := 1 // Part number of current part.
and adjust the use of reuse below.
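To show the two units side by side, here is a simplified standalone variant of the matching. It is not the code in the branch: reusable is a hypothetical helper, Part only mirrors the field names in the diff, and old is assumed to be sorted by part number.

```go
package main

import "fmt"

// Part mirrors the field names of the Part type in the diff.
type Part struct {
	N    int
	ETag string
	Size int64
}

// reusable reports, for each wanted part, whether an identical old
// part exists. reuse is a zero-based index into old, while want.N is
// a one-based part number. old must be sorted by N.
func reusable(old, wanted []Part) []bool {
	reuse := 0 // Index of next old part to consider reusing.
	out := make([]bool, 0, len(wanted))
	for _, want := range wanted {
		// Skip old parts whose number is below the one we need.
		for reuse < len(old) && old[reuse].N < want.N {
			reuse++
		}
		out = append(out, reuse < len(old) && old[reuse] == want)
	}
	return out
}

func main() {
	old := []Part{{1, `"a"`, 5}, {2, `"b"`, 5}, {3, `"c"`, 5}}
	wanted := []Part{{1, `"a"`, 5}, {2, `"x"`, 5}, {3, `"c"`, 5}}
	fmt.Println(reusable(old, wanted)) // [true false true]
}
```

Keeping the index zero-based and the part number one-based makes it visible at a glance that they are different quantities.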
https:/
s3/multi.go:282: if totalSize == 0 || err != nil {
On 2013/02/12 08:02:52, dimitern wrote:
> On 2013/02/12 07:32:38, jameinel wrote:
> > What will this return if totalSize is 0? the return of ListParts? But doesn't
> > that violate the 'you should return nil, otherwise the caller gets an interface
> > object that contains nil but isn't nil'?
> >
> > So I would probably split this:
> > if err != nil {
> >     return nil, err
> > } else if totalSize == 0 {
> >     return nil, nil
> > }
> >
> > Though since you don't have a result object, wouldn't you want a non-nil err?
> > Otherwise you'll get a segfault accessing the result later.
> ISTM if totalSize == 0 it will return nil, nil, since there won't be an error.
> But if it's not obvious, +1 on the suggestion.
i don't mind the way it's phrased here, but i would like to see a test
case that tests putting a zero-length file live (i *presume* you can
read a file ok if it's had no PUT requests, but it's only a
presumption). perhaps i missed a relevant test though.
https:/
s3/multi.go:316: }
the logic above is moderately intricate - i'd like to see some more test
cases to go with TestPutAllResume,
at least a case with a reused part in the middle of other non-reused
parts, and vice versa.
Gustavo Niemeyer (niemeyer) wrote : | # |
https:/
File s3/multi.go (right):
https:/
s3/multi.go:88: // inside b. If a multipart upload exists for key, it is returned,
On 2013/02/13 10:22:03, rog wrote:
> if we assume that a bucket implies its key, then i think the explanation can
> become simpler. how about this, perhaps?
> // Multi returns a multipart upload handler for the bucket.
> // If one already exists, it is returned, otherwise
> // a new multipart upload is initiated with contType and perm.
A bucket doesn't imply its key. A bucket contains keys. What's described
above is not the same as what was documented in the method, or what the
method does.
https:/
s3/multi.go:229: // ListParts returns the list of previously uploaded parts in m.
On 2013/02/13 10:22:03, rog wrote:
> // ordered by part number.
> ?
Sounds good.
https:/
s3/multi.go:269: // PutAll sends all of r via a multipart upload with parts no large
On 2013/02/13 10:22:03, rog wrote:
> s/large/larger/
Thanks.
https:/
s3/multi.go:279: reuse := 1 // Index of next old part to consider reusing.
On 2013/02/13 10:22:03, rog wrote:
> I was a little confused here by the one-based indexing used for reuse here - it
> makes it look like it's in the same units as current, but reuse really is an
> index (into old) and current is a part number.
> Also, current isn't really the index of the last good part handled, AFAICS -
> it's the part number of the part we're about to consider putting.
> I'd be inclined to do:
> reuse := 0 // Index of next old part to consider reusing.
> current := 1 // Part number of current part.
> and adjust the use of reuse below.
Sounds good.
https:/
s3/multi.go:282: if totalSize == 0 || err != nil {
On 2013/02/13 10:22:03, rog wrote:
> On 2013/02/12 08:02:52, dimitern wrote:
> > On 2013/02/12 07:32:38, jameinel wrote:
> > > What will this return if totalSize is 0? the return of ListParts? But doesn't
> > > that violate the 'you should return nil, otherwise the caller gets an interface
> > > object that contains nil but isn't nil'?
> > >
> > > So I would probably split this:
> > > if err != nil {
> > >     return nil, err
> > > } else if totalSize == 0 {
> > >     return nil, nil
> > > }
> > >
> > > Though since you don't have a result object, wouldn't you want a non-nil err?
> > > Otherwise you'll get a segfault accessing the result later.
> >
> > ISTM if totalSize == 0 it will return nil, nil, since there won't be an error.
> > But if it's not obvious, +1 on the suggestion.
> i don't mind the way it's phrased here, but i would like to see a test case that
> tests putting a zero-length file live (i *presume* you can read a file ok if
> it's had no PUT requests, but it's only a presumption). perhaps i missed a
> relevant test though.
I don't think it matters whether that works or not in the server. That's
an edge case, and the ser...
- 32. By Gustavo Niemeyer
-
Changes after reviews.
Gustavo Niemeyer (niemeyer) wrote : | # |
Please take a look.
Roger Peppe (rogpeppe) wrote : | # |
LGTM modulo the below.
https:/
File s3/multi.go (right):
https:/
s3/multi.go:276: // new content.
As discussed, perhaps a comment like:
// Multipart uploads of zero length files may not work.
https:/
s3/multi.go:284: current := 1 // Index of latest good part handled.
This comment is still misleading. Current isn't an index in the same way
that reuse is, and at this point we haven't any good parts, so there's
no "latest" good part - current is the part number of the next part to
handle.
- 33. By Gustavo Niemeyer
-
Add edge case for zero-length files.
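The zero-length edge case comes down to how the chunking loop is bounded. Below is a minimal sketch of that loop in isolation, assuming a positive part size. The section type and sections function are hypothetical names; the first flag follows the approach visible in the diff.

```go
package main

import "fmt"

// section describes one chunk of the input: where it starts and how
// many bytes it covers.
type section struct {
	Offset, Size int64
}

// sections splits totalSize bytes into chunks of at most partSize
// bytes. For an empty input it still yields one empty section, so that
// at least one part gets sent. partSize is assumed to be positive.
func sections(totalSize, partSize int64) []section {
	if partSize <= 0 {
		return nil
	}
	var out []section
	first := true // Must produce at least one section, even if empty.
	for offset := int64(0); offset < totalSize || first; offset += partSize {
		first = false
		size := partSize
		if offset+size > totalSize {
			size = totalSize - offset // The last section may be shorter.
		}
		out = append(out, section{offset, size})
	}
	return out
}

func main() {
	fmt.Println(sections(14, 5)) // [{0 5} {5 5} {10 4}]
	fmt.Println(sections(0, 5))  // [{0 0}]
}
```

The 14-byte case matches the "part1part2last" input used in the tests: two full parts and a shorter last one.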
Gustavo Niemeyer (niemeyer) wrote : | # |
Please take a look.
Gustavo Niemeyer (niemeyer) wrote : | # |
*** Submitted:
s3: Multi.PutAll for auto-chunking and resuming
Bucket.Multi method also added for trivial resume-or-init handling.
R=jameinel, dimitern, rog
CC=
https:/
Preview Diff
1 | === modified file 's3/multi.go' |
2 | --- s3/multi.go 2013-02-05 22:56:23 +0000 |
3 | +++ s3/multi.go 2013-02-13 12:22:21 +0000 |
4 | @@ -4,6 +4,7 @@ |
5 | "bytes" |
6 | "crypto/md5" |
7 | "encoding/base64" |
8 | + "encoding/hex" |
9 | "encoding/xml" |
10 | "errors" |
11 | "io" |
12 | @@ -24,6 +25,7 @@ |
13 | UploadId string |
14 | } |
15 | |
16 | +// That's the default. Here just for testing. |
17 | var listMultiMax = 1000 |
18 | |
19 | type listMultiResp struct { |
20 | @@ -83,6 +85,22 @@ |
21 | panic("unreachable") |
22 | } |
23 | |
24 | +// Multi returns a multipart upload handler for the provided key |
25 | +// inside b. If a multipart upload exists for key, it is returned, |
26 | +// otherwise a new multipart upload is initiated with contType and perm. |
27 | +func (b *Bucket) Multi(key, contType string, perm ACL) (*Multi, error) { |
28 | + multis, _, err := b.ListMulti(key, "") |
29 | + if err != nil && !hasCode(err, "NoSuchUpload") { |
30 | + return nil, err |
31 | + } |
32 | + for _, m := range multis { |
33 | + if m.Key == key { |
34 | + return m, nil |
35 | + } |
36 | + } |
37 | + return b.InitMulti(key, contType, perm) |
38 | +} |
39 | + |
40 | // InitMulti initializes a new multipart upload at the provided |
41 | // key inside b and returns a value for manipulating it. |
42 | // |
43 | @@ -124,13 +142,17 @@ |
44 | // |
45 | // See http://goo.gl/pqZer for details. |
46 | func (m *Multi) PutPart(n int, r io.ReadSeeker) (Part, error) { |
47 | - length, b64md5, err := seekerInfo(r) |
48 | + partSize, _, md5b64, err := seekerInfo(r) |
49 | if err != nil { |
50 | return Part{}, err |
51 | } |
52 | + return m.putPart(n, r, partSize, md5b64) |
53 | +} |
54 | + |
55 | +func (m *Multi) putPart(n int, r io.ReadSeeker, partSize int64, md5b64 string) (Part, error) { |
56 | headers := map[string][]string{ |
57 | - "Content-Length": {strconv.FormatInt(length, 10)}, |
58 | - "Content-MD5": {b64md5}, |
59 | + "Content-Length": {strconv.FormatInt(partSize, 10)}, |
60 | + "Content-MD5": {md5b64}, |
61 | } |
62 | params := map[string][]string{ |
63 | "uploadId": {m.UploadId}, |
64 | @@ -164,23 +186,25 @@ |
65 | if etag == "" { |
66 | return Part{}, errors.New("part upload succeeded with no ETag") |
67 | } |
68 | - return Part{n, etag, length}, nil |
69 | + return Part{n, etag, partSize}, nil |
70 | } |
71 | panic("unreachable") |
72 | } |
73 | |
74 | -func seekerInfo(r io.ReadSeeker) (length int64, b64md5 string, err error) { |
75 | +func seekerInfo(r io.ReadSeeker) (size int64, md5hex string, md5b64 string, err error) { |
76 | _, err = r.Seek(0, 0) |
77 | if err != nil { |
78 | - return 0, "", err |
79 | + return 0, "", "", err |
80 | } |
81 | digest := md5.New() |
82 | - length, err = io.Copy(digest, r) |
83 | + size, err = io.Copy(digest, r) |
84 | if err != nil { |
85 | - return 0, "", err |
86 | + return 0, "", "", err |
87 | } |
88 | - b64md5 = base64.StdEncoding.EncodeToString(digest.Sum(nil)) |
89 | - return length, b64md5, nil |
90 | + sum := digest.Sum(nil) |
91 | + md5hex = hex.EncodeToString(sum) |
92 | + md5b64 = base64.StdEncoding.EncodeToString(sum) |
93 | + return size, md5hex, md5b64, nil |
94 | } |
95 | |
96 | type Part struct { |
97 | @@ -189,15 +213,23 @@ |
98 | Size int64 |
99 | } |
100 | |
101 | +type partSlice []Part |
102 | + |
103 | +func (s partSlice) Len() int { return len(s) } |
104 | +func (s partSlice) Less(i, j int) bool { return s[i].N < s[j].N } |
105 | +func (s partSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] } |
106 | + |
107 | type listPartsResp struct { |
108 | NextPartNumberMarker string |
109 | IsTruncated bool |
110 | Part []Part |
111 | } |
112 | |
113 | +// That's the default. Here just for testing. |
114 | var listPartsMax = 1000 |
115 | |
116 | -// ListParts returns the list of previously uploaded parts in m. |
117 | +// ListParts returns the list of previously uploaded parts in m, |
118 | +// ordered by part number. |
119 | // |
120 | // See http://goo.gl/ePioY for details. |
121 | func (m *Multi) ListParts() ([]Part, error) { |
122 | @@ -205,7 +237,7 @@ |
123 | "uploadId": {m.UploadId}, |
124 | "max-parts": {strconv.FormatInt(int64(listPartsMax), 10)}, |
125 | } |
126 | - var parts []Part |
127 | + var parts partSlice |
128 | for attempt := attempts.Start(); attempt.Next(); { |
129 | req := &request{ |
130 | method: "GET", |
131 | @@ -223,6 +255,7 @@ |
132 | } |
133 | parts = append(parts, resp.Part...) |
134 | if !resp.IsTruncated { |
135 | + sort.Sort(parts) |
136 | return parts, nil |
137 | } |
138 | params["part-number-marker"] = []string{resp.NextPartNumberMarker} |
139 | @@ -231,6 +264,65 @@ |
140 | panic("unreachable") |
141 | } |
142 | |
143 | +type ReaderAtSeeker interface { |
144 | + io.ReaderAt |
145 | + io.ReadSeeker |
146 | +} |
147 | + |
148 | +// PutAll sends all of r via a multipart upload with parts no larger |
149 | +// than partSize bytes, which must be set to at least 5MB. |
150 | +// Parts previously uploaded are either reused if their checksum |
151 | +// and size match the new part, or otherwise overwritten with the |
152 | +// new content. |
153 | +// PutAll returns all the parts of m (reused or not). |
154 | +func (m *Multi) PutAll(r ReaderAtSeeker, partSize int64) ([]Part, error) { |
155 | + old, err := m.ListParts() |
156 | + if err != nil && !hasCode(err, "NoSuchUpload") { |
157 | + return nil, err |
158 | + } |
159 | + reuse := 0 // Index of next old part to consider reusing. |
160 | + current := 1 // Part number of latest good part handled. |
161 | + totalSize, err := r.Seek(0, 2) |
162 | + if err != nil { |
163 | + return nil, err |
164 | + } |
165 | + first := true // Must send at least one empty part if the file is empty. |
166 | + var result []Part |
167 | +NextSection: |
168 | + for offset := int64(0); offset < totalSize || first; offset += partSize { |
169 | + first = false |
170 | + if offset+partSize > totalSize { |
171 | + partSize = totalSize - offset |
172 | + } |
173 | + section := io.NewSectionReader(r, offset, partSize) |
174 | + _, md5hex, md5b64, err := seekerInfo(section) |
175 | + if err != nil { |
176 | + return nil, err |
177 | + } |
178 | + for reuse < len(old) && old[reuse].N <= current { |
179 | + // Looks like this part was already sent. |
180 | + part := &old[reuse] |
181 | + etag := `"` + md5hex + `"` |
182 | + if part.N == current && part.Size == partSize && part.ETag == etag { |
183 | + // Checksum matches. Reuse the old part. |
184 | + result = append(result, *part) |
185 | + current++ |
186 | + continue NextSection |
187 | + } |
188 | + reuse++ |
189 | + } |
190 | + |
191 | + // Part wasn't found or doesn't match. Send it. |
192 | + part, err := m.putPart(current, section, partSize, md5b64) |
193 | + if err != nil { |
194 | + return nil, err |
195 | + } |
196 | + result = append(result, part) |
197 | + current++ |
198 | + } |
199 | + return result, nil |
200 | +} |
201 | + |
202 | type completeUpload struct { |
203 | XMLName xml.Name `xml:"CompleteMultipartUpload"` |
204 | Parts completeParts `xml:"Part"` |
205 | @@ -298,14 +390,14 @@ |
206 | // See http://goo.gl/dnyJw for details. |
207 | func (m *Multi) Abort() error { |
208 | params := map[string][]string{ |
209 | - "uploadId": {m.UploadId}, |
210 | + "uploadId": {m.UploadId}, |
211 | } |
212 | for attempt := attempts.Start(); attempt.Next(); { |
213 | req := &request{ |
214 | - method: "DELETE", |
215 | - bucket: m.Bucket.Name, |
216 | - path: m.Key, |
217 | - params: params, |
218 | + method: "DELETE", |
219 | + bucket: m.Bucket.Name, |
220 | + path: m.Key, |
221 | + params: params, |
222 | } |
223 | err := m.Bucket.S3.query(req, nil) |
224 | if shouldRetry(err) && attempt.HasNext() { |
225 | |
226 | === modified file 's3/multi_test.go' |
227 | --- s3/multi_test.go 2013-02-01 05:58:42 +0000 |
228 | +++ s3/multi_test.go 2013-02-13 12:22:21 +0000 |
229 | @@ -2,12 +2,13 @@ |
230 | |
231 | import ( |
232 | "encoding/xml" |
233 | + "io" |
234 | + "io/ioutil" |
235 | "launchpad.net/goamz/s3" |
236 | . "launchpad.net/gocheck" |
237 | "strings" |
238 | ) |
239 | |
240 | - |
241 | func (s *S) TestInitMulti(c *C) { |
242 | testServer.Response(200, nil, InitMultiResultDump) |
243 | |
244 | @@ -26,6 +27,88 @@ |
245 | c.Assert(multi.UploadId, Matches, "JNbR_[A-Za-z0-9.]+QQ--") |
246 | } |
247 | |
248 | +func (s *S) TestMultiNoPreviousUpload(c *C) { |
249 | + // Don't retry the NoSuchUpload error. |
250 | + s.DisableRetries() |
251 | + |
252 | + testServer.Response(404, nil, NoSuchUploadErrorDump) |
253 | + testServer.Response(200, nil, InitMultiResultDump) |
254 | + |
255 | + b := s.s3.Bucket("sample") |
256 | + |
257 | + multi, err := b.Multi("multi", "text/plain", s3.Private) |
258 | + c.Assert(err, IsNil) |
259 | + |
260 | + req := testServer.WaitRequest() |
261 | + c.Assert(req.Method, Equals, "GET") |
262 | + c.Assert(req.URL.Path, Equals, "/sample/") |
263 | + c.Assert(req.Form["uploads"], DeepEquals, []string{""}) |
264 | + c.Assert(req.Form["prefix"], DeepEquals, []string{"multi"}) |
265 | + |
266 | + req = testServer.WaitRequest() |
267 | + c.Assert(req.Method, Equals, "POST") |
268 | + c.Assert(req.URL.Path, Equals, "/sample/multi") |
269 | + c.Assert(req.Form["uploads"], DeepEquals, []string{""}) |
270 | + |
271 | + c.Assert(multi.UploadId, Matches, "JNbR_[A-Za-z0-9.]+QQ--") |
272 | +} |
273 | + |
274 | +func (s *S) TestMultiReturnOld(c *C) { |
275 | + testServer.Response(200, nil, ListMultiResultDump) |
276 | + |
277 | + b := s.s3.Bucket("sample") |
278 | + |
279 | + multi, err := b.Multi("multi1", "text/plain", s3.Private) |
280 | + c.Assert(err, IsNil) |
281 | + c.Assert(multi.Key, Equals, "multi1") |
282 | + c.Assert(multi.UploadId, Equals, "iUVug89pPvSswrikD") |
283 | + |
284 | + req := testServer.WaitRequest() |
285 | + c.Assert(req.Method, Equals, "GET") |
286 | + c.Assert(req.URL.Path, Equals, "/sample/") |
287 | + c.Assert(req.Form["uploads"], DeepEquals, []string{""}) |
288 | + c.Assert(req.Form["prefix"], DeepEquals, []string{"multi1"}) |
289 | +} |
290 | + |
291 | +func (s *S) TestListParts(c *C) { |
292 | + testServer.Response(200, nil, InitMultiResultDump) |
293 | + testServer.Response(200, nil, ListPartsResultDump1) |
294 | + testServer.Response(404, nil, NoSuchUploadErrorDump) // :-( |
295 | + testServer.Response(200, nil, ListPartsResultDump2) |
296 | + |
297 | + b := s.s3.Bucket("sample") |
298 | + |
299 | + multi, err := b.InitMulti("multi", "text/plain", s3.Private) |
300 | + c.Assert(err, IsNil) |
301 | + |
302 | + parts, err := multi.ListParts() |
303 | + c.Assert(err, IsNil) |
304 | + c.Assert(parts, HasLen, 3) |
305 | + c.Assert(parts[0].N, Equals, 1) |
306 | + c.Assert(parts[0].Size, Equals, int64(5)) |
307 | + c.Assert(parts[0].ETag, Equals, `"ffc88b4ca90a355f8ddba6b2c3b2af5c"`) |
308 | + c.Assert(parts[1].N, Equals, 2) |
309 | + c.Assert(parts[1].Size, Equals, int64(5)) |
310 | + c.Assert(parts[1].ETag, Equals, `"d067a0fa9dc61a6e7195ca99696b5a89"`) |
311 | + c.Assert(parts[2].N, Equals, 3) |
312 | + c.Assert(parts[2].Size, Equals, int64(5)) |
313 | + c.Assert(parts[2].ETag, Equals, `"49dcd91231f801159e893fb5c6674985"`) |
314 | + testServer.WaitRequest() |
315 | + req := testServer.WaitRequest() |
316 | + c.Assert(req.Method, Equals, "GET") |
317 | + c.Assert(req.URL.Path, Equals, "/sample/multi") |
318 | + c.Assert(req.Form.Get("uploadId"), Matches, "JNbR_[A-Za-z0-9.]+QQ--") |
319 | + c.Assert(req.Form["max-parts"], DeepEquals, []string{"1000"}) |
320 | + |
321 | + testServer.WaitRequest() // The internal error. |
322 | + req = testServer.WaitRequest() |
323 | + c.Assert(req.Method, Equals, "GET") |
324 | + c.Assert(req.URL.Path, Equals, "/sample/multi") |
325 | + c.Assert(req.Form.Get("uploadId"), Matches, "JNbR_[A-Za-z0-9.]+QQ--") |
326 | + c.Assert(req.Form["max-parts"], DeepEquals, []string{"1000"}) |
327 | + c.Assert(req.Form["part-number-marker"], DeepEquals, []string{"2"}) |
328 | +} |
329 | + |
330 | func (s *S) TestPutPart(c *C) { |
331 | headers := map[string]string{ |
332 | "ETag": `"26f90efd10d614f100252ff56d88dad8"`, |
333 | @@ -54,44 +137,153 @@ |
334 | c.Assert(req.Header["Content-Md5"], DeepEquals, []string{"JvkO/RDWFPEAJS/1bYja2A=="}) |
335 | } |
336 | |
337 | -func (s *S) TestListParts(c *C) { |
338 | +func readAll(r io.Reader) string { |
339 | + data, err := ioutil.ReadAll(r) |
340 | + if err != nil { |
341 | + panic(err) |
342 | + } |
343 | + return string(data) |
344 | +} |
345 | + |
346 | +func (s *S) TestPutAllNoPreviousUpload(c *C) { |
347 | + // Don't retry the NoSuchUpload error. |
348 | + s.DisableRetries() |
349 | + |
350 | + etag1 := map[string]string{"ETag": `"etag1"`} |
351 | + etag2 := map[string]string{"ETag": `"etag2"`} |
352 | + etag3 := map[string]string{"ETag": `"etag3"`} |
353 | + testServer.Response(200, nil, InitMultiResultDump) |
354 | + testServer.Response(404, nil, NoSuchUploadErrorDump) |
355 | + testServer.Response(200, etag1, "") |
356 | + testServer.Response(200, etag2, "") |
357 | + testServer.Response(200, etag3, "") |
358 | + |
359 | + b := s.s3.Bucket("sample") |
360 | + |
361 | + multi, err := b.InitMulti("multi", "text/plain", s3.Private) |
362 | + c.Assert(err, IsNil) |
363 | + |
364 | + parts, err := multi.PutAll(strings.NewReader("part1part2last"), 5) |
365 | + c.Assert(parts, HasLen, 3) |
366 | + c.Assert(parts[0].ETag, Equals, `"etag1"`) |
367 | + c.Assert(parts[1].ETag, Equals, `"etag2"`) |
368 | + c.Assert(parts[2].ETag, Equals, `"etag3"`) |
369 | + c.Assert(err, IsNil) |
370 | + |
371 | + // Init |
372 | + testServer.WaitRequest() |
373 | + |
374 | + // List old parts. Won't find anything. |
375 | + req := testServer.WaitRequest() |
376 | + c.Assert(req.Method, Equals, "GET") |
377 | + c.Assert(req.URL.Path, Equals, "/sample/multi") |
378 | + |
379 | + // Send part 1. |
380 | + req = testServer.WaitRequest() |
381 | + c.Assert(req.Method, Equals, "PUT") |
382 | + c.Assert(req.URL.Path, Equals, "/sample/multi") |
383 | + c.Assert(req.Form["partNumber"], DeepEquals, []string{"1"}) |
384 | + c.Assert(req.Header["Content-Length"], DeepEquals, []string{"5"}) |
385 | + c.Assert(readAll(req.Body), Equals, "part1") |
386 | + |
387 | + // Send part 2. |
388 | + req = testServer.WaitRequest() |
389 | + c.Assert(req.Method, Equals, "PUT") |
390 | + c.Assert(req.URL.Path, Equals, "/sample/multi") |
391 | + c.Assert(req.Form["partNumber"], DeepEquals, []string{"2"}) |
392 | + c.Assert(req.Header["Content-Length"], DeepEquals, []string{"5"}) |
393 | + c.Assert(readAll(req.Body), Equals, "part2") |
394 | + |
395 | + // Send part 3 with shorter body. |
396 | + req = testServer.WaitRequest() |
397 | + c.Assert(req.Method, Equals, "PUT") |
398 | + c.Assert(req.URL.Path, Equals, "/sample/multi") |
399 | + c.Assert(req.Form["partNumber"], DeepEquals, []string{"3"}) |
400 | + c.Assert(req.Header["Content-Length"], DeepEquals, []string{"4"}) |
401 | + c.Assert(readAll(req.Body), Equals, "last") |
402 | +} |
403 | + |
404 | +func (s *S) TestPutAllZeroSizeFile(c *C) { |
405 | + // Don't retry the NoSuchUpload error. |
406 | + s.DisableRetries() |
407 | + |
408 | + etag1 := map[string]string{"ETag": `"etag1"`} |
409 | + testServer.Response(200, nil, InitMultiResultDump) |
410 | + testServer.Response(404, nil, NoSuchUploadErrorDump) |
411 | + testServer.Response(200, etag1, "") |
412 | + |
413 | + b := s.s3.Bucket("sample") |
414 | + |
415 | + multi, err := b.InitMulti("multi", "text/plain", s3.Private) |
416 | + c.Assert(err, IsNil) |
417 | + |
418 | + // Must send at least one part, so that completing it will work. |
419 | + parts, err := multi.PutAll(strings.NewReader(""), 5) |
420 | + c.Assert(parts, HasLen, 1) |
421 | + c.Assert(parts[0].ETag, Equals, `"etag1"`) |
422 | + c.Assert(err, IsNil) |
423 | + |
424 | + // Init |
425 | + testServer.WaitRequest() |
426 | + |
427 | + // List old parts. Won't find anything. |
428 | + req := testServer.WaitRequest() |
429 | + c.Assert(req.Method, Equals, "GET") |
430 | + c.Assert(req.URL.Path, Equals, "/sample/multi") |
431 | + |
432 | + // Send empty part. |
433 | + req = testServer.WaitRequest() |
434 | + c.Assert(req.Method, Equals, "PUT") |
435 | + c.Assert(req.URL.Path, Equals, "/sample/multi") |
436 | + c.Assert(req.Form["partNumber"], DeepEquals, []string{"1"}) |
437 | + c.Assert(req.Header["Content-Length"], DeepEquals, []string{"0"}) |
438 | + c.Assert(readAll(req.Body), Equals, "") |
439 | +} |
440 | + |
441 | +func (s *S) TestPutAllResume(c *C) { |
442 | + etag2 := map[string]string{"ETag": `"etag2"`} |
443 | testServer.Response(200, nil, InitMultiResultDump) |
444 | testServer.Response(200, nil, ListPartsResultDump1) |
445 | - testServer.Response(404, nil, NoSuchUploadErrorDump) // :-( |
446 | testServer.Response(200, nil, ListPartsResultDump2) |
447 | + testServer.Response(200, etag2, "") |
448 | |
449 | b := s.s3.Bucket("sample") |
450 | |
451 | multi, err := b.InitMulti("multi", "text/plain", s3.Private) |
452 | c.Assert(err, IsNil) |
453 | |
454 | - parts, err := multi.ListParts() |
455 | - c.Assert(err, IsNil) |
456 | + // "part1" and "part3" match the checksums in ResultDump1. |
457 | + // The middle one is a mismatch (it refers to "part2"). |
458 | + parts, err := multi.PutAll(strings.NewReader("part1partXpart3"), 5) |
459 | c.Assert(parts, HasLen, 3) |
460 | c.Assert(parts[0].N, Equals, 1) |
461 | - c.Assert(parts[0].Size, Equals, int64(8)) |
462 | - c.Assert(parts[0].ETag, Equals, `"26f90efd10d614f100252ff56d88dad8"`) |
463 | + c.Assert(parts[0].Size, Equals, int64(5)) |
464 | + c.Assert(parts[0].ETag, Equals, `"ffc88b4ca90a355f8ddba6b2c3b2af5c"`) |
465 | c.Assert(parts[1].N, Equals, 2) |
466 | - c.Assert(parts[1].Size, Equals, int64(8)) |
467 | - c.Assert(parts[1].ETag, Equals, `"b572ef59bfa4a719fdf2b3c13c583af8"`) |
468 | + c.Assert(parts[1].Size, Equals, int64(5)) |
469 | + c.Assert(parts[1].ETag, Equals, `"etag2"`) |
470 | c.Assert(parts[2].N, Equals, 3) |
471 | - c.Assert(parts[2].Size, Equals, int64(4)) |
472 | - c.Assert(parts[2].ETag, Equals, `"50dbe110509c7952ba70a96587bd7c40"`) |
473 | + c.Assert(parts[2].Size, Equals, int64(5)) |
474 | + c.Assert(parts[2].ETag, Equals, `"49dcd91231f801159e893fb5c6674985"`) |
475 | + c.Assert(err, IsNil) |
476 | |
477 | + // Init |
478 | testServer.WaitRequest() |
479 | + |
480 | + // List old parts, broken in two requests. |
481 | + for i := 0; i < 2; i++ { |
482 | + req := testServer.WaitRequest() |
483 | + c.Assert(req.Method, Equals, "GET") |
484 | + c.Assert(req.URL.Path, Equals, "/sample/multi") |
485 | + } |
486 | + |
487 | + // Send part 2, as it didn't match the checksum. |
488 | req := testServer.WaitRequest() |
489 | - c.Assert(req.Method, Equals, "GET") |
490 | - c.Assert(req.URL.Path, Equals, "/sample/multi") |
491 | - c.Assert(req.Form.Get("uploadId"), Matches, "JNbR_[A-Za-z0-9.]+QQ--") |
492 | - c.Assert(req.Form["max-parts"], DeepEquals, []string{"1000"}) |
493 | - |
494 | - testServer.WaitRequest() // The internal error. |
495 | - req = testServer.WaitRequest() |
496 | - c.Assert(req.Method, Equals, "GET") |
497 | - c.Assert(req.URL.Path, Equals, "/sample/multi") |
498 | - c.Assert(req.Form.Get("uploadId"), Matches, "JNbR_[A-Za-z0-9.]+QQ--") |
499 | - c.Assert(req.Form["max-parts"], DeepEquals, []string{"1000"}) |
500 | - c.Assert(req.Form["part-number-marker"], DeepEquals, []string{"2"}) |
501 | + c.Assert(req.Method, Equals, "PUT") |
502 | + c.Assert(req.URL.Path, Equals, "/sample/multi") |
503 | + c.Assert(req.Form["partNumber"], DeepEquals, []string{"2"}) |
504 | + c.Assert(req.Header["Content-Length"], DeepEquals, []string{"5"}) |
505 | + c.Assert(readAll(req.Body), Equals, "partX") |
506 | } |
507 | |
508 | func (s *S) TestMultiComplete(c *C) { |
509 | @@ -115,11 +307,11 @@ |
510 | c.Assert(req.URL.Path, Equals, "/sample/multi") |
511 | c.Assert(req.Form.Get("uploadId"), Matches, "JNbR_[A-Za-z0-9.]+QQ--") |
512 | |
513 | - var payload struct{ |
514 | + var payload struct { |
515 | XMLName xml.Name |
516 | - Part []struct{ |
517 | + Part []struct { |
518 | PartNumber int |
519 | - ETag string |
520 | + ETag string |
521 | } |
522 | } |
523 | |
524 | |
525 | === modified file 's3/responses_test.go' |
526 | --- s3/responses_test.go 2013-02-01 05:58:42 +0000 |
527 | +++ s3/responses_test.go 2013-02-13 12:22:21 +0000 |
528 | @@ -87,14 +87,14 @@ |
529 | <Part> |
530 | <PartNumber>1</PartNumber> |
531 | <LastModified>2013-01-30T13:45:51.000Z</LastModified> |
532 | - <ETag>"26f90efd10d614f100252ff56d88dad8"</ETag> |
533 | - <Size>8</Size> |
534 | + <ETag>"ffc88b4ca90a355f8ddba6b2c3b2af5c"</ETag> |
535 | + <Size>5</Size> |
536 | </Part> |
537 | <Part> |
538 | <PartNumber>2</PartNumber> |
539 | <LastModified>2013-01-30T13:45:52.000Z</LastModified> |
540 | - <ETag>"b572ef59bfa4a719fdf2b3c13c583af8"</ETag> |
541 | - <Size>8</Size> |
542 | + <ETag>"d067a0fa9dc61a6e7195ca99696b5a89"</ETag> |
543 | + <Size>5</Size> |
544 | </Part> |
545 | </ListPartsResult> |
546 | ` |
547 | @@ -121,8 +121,8 @@ |
548 | <Part> |
549 | <PartNumber>3</PartNumber> |
550 | <LastModified>2013-01-30T13:46:50.000Z</LastModified> |
551 | - <ETag>"50dbe110509c7952ba70a96587bd7c40"</ETag> |
552 | - <Size>4</Size> |
553 | + <ETag>"49dcd91231f801159e893fb5c6674985"</ETag> |
554 | + <Size>5</Size> |
555 | </Part> |
556 | </ListPartsResult> |
557 | ` |
558 | |
559 | === modified file 's3/s3_test.go' |
560 | --- s3/s3_test.go 2013-02-01 20:20:47 +0000 |
561 | +++ s3/s3_test.go 2013-02-13 12:22:21 +0000 |
562 | @@ -29,6 +29,13 @@ |
563 | testServer.Start() |
564 | auth := aws.Auth{"abc", "123"} |
565 | s.s3 = s3.New(auth, aws.Region{Name: "faux-region-1", S3Endpoint: testServer.URL}) |
566 | +} |
567 | + |
568 | +func (s *S) TearDownSuite(c *C) { |
569 | + s3.SetAttemptStrategy(nil) |
570 | +} |
571 | + |
572 | +func (s *S) SetUpTest(c *C) { |
573 | attempts := aws.AttemptStrategy{ |
574 | Total: 300 * time.Millisecond, |
575 | Delay: 100 * time.Millisecond, |
576 | @@ -36,14 +43,14 @@ |
577 | s3.SetAttemptStrategy(&attempts) |
578 | } |
579 | |
580 | -func (s *S) TearDownSuite(c *C) { |
581 | - s3.SetAttemptStrategy(nil) |
582 | -} |
583 | - |
584 | func (s *S) TearDownTest(c *C) { |
585 | testServer.Flush() |
586 | } |
587 | |
588 | +func (s *S) DisableRetries() { |
589 | + s3.SetAttemptStrategy(&aws.AttemptStrategy{}) |
590 | +} |
591 | + |
592 | // PutBucket docs: http://goo.gl/kBTCu |
593 | |
594 | func (s *S) TestPutBucket(c *C) { |
595 | |
596 | === modified file 's3/s3i_test.go' |
597 | --- s3/s3i_test.go 2013-02-11 21:32:40 +0000 |
598 | +++ s3/s3i_test.go 2013-02-13 12:22:21 +0000 |
599 | @@ -102,7 +102,7 @@ |
600 | } |
601 | |
602 | var attempts = aws.AttemptStrategy{ |
603 | - Min: 5, |
604 | + Min: 5, |
605 | Total: 20 * time.Second, |
606 | Delay: 100 * time.Millisecond, |
607 | } |
608 | @@ -567,3 +567,24 @@ |
609 | c.Assert(multis[1].Key, Equals, "a/multi3") |
610 | c.Assert(multis[1].UploadId, Matches, ".+") |
611 | } |
612 | + |
613 | +func (s *ClientTests) TestMultiPutAllZeroLength(c *C) { |
614 | + b := testBucket(s.s3) |
615 | + err := b.PutBucket(s3.Private) |
616 | + c.Assert(err, IsNil) |
617 | + |
618 | + multi, err := b.InitMulti("multi", "text/plain", s3.Private) |
619 | + c.Assert(err, IsNil) |
620 | + defer multi.Abort() |
621 | + |
622 | + // This tests an edge case. Amazon requires at least one |
623 | + // part for multiprat uploads to work, even the part is empty. |
624 | + parts, err := multi.PutAll(strings.NewReader(""), 5*1024*1024) |
625 | + c.Assert(err, IsNil) |
626 | + c.Assert(parts, HasLen, 1) |
627 | + c.Assert(parts[0].Size, Equals, int64(0)) |
628 | + c.Assert(parts[0].ETag, Equals, `"d41d8cd98f00b204e9800998ecf8427e"`) |
629 | + |
630 | + err = multi.Complete(parts) |
631 | + c.Assert(err, IsNil) |
632 | +} |
Reviewers: mp+147829_code.launchpad.net,
Message:
Please take a look.
Description:
s3: Multi.PutAll for auto-chunking and resuming
Bucket.Multi method also added for trivial resume-or-init handling.
https://code.launchpad.net/~niemeyer/goamz/put-all/+merge/147829
(do not edit description out of merge proposal)
Please review this at https://codereview.appspot.com/7313081/
Affected files:
A [revision details]
M s3/multi.go
M s3/multi_test.go
M s3/responses_test.go
M s3/s3_test.go
M s3/s3i_test.go