Merge lp:~dkeeney/txaws/add-multipart-upload into lp:txaws

Proposed by Zooko Wilcox-O'Hearn
Status: Needs review
Proposed branch: lp:~dkeeney/txaws/add-multipart-upload
Merge into: lp:txaws
Diff against target: 589 lines (+467/-5) (has conflicts)
8 files modified
bin/txaws-delete-upload (+45/-0)
bin/txaws-list-uploads (+40/-0)
bin/txaws-post-upload (+86/-0)
bin/txaws-put-object (+1/-1)
txaws/client/base.py (+6/-0)
txaws/s3/client.py (+205/-4)
txaws/s3/multipart.py (+81/-0)
txaws/script.py (+3/-0)
Text conflict in txaws/client/base.py
Text conflict in txaws/s3/client.py
To merge this branch: bzr merge lp:~dkeeney/txaws/add-multipart-upload
Reviewer: Zooko Wilcox-O'Hearn
Review status: Needs Fixing
Review via email: mp+76647@code.launchpad.net

Description of the change

Okay, I read the patches on this branch: lp:~dkeeney/txaws/add-multipart-upload. The only problem I noticed is that the most recent changes -- http://bazaar.launchpad.net/~dkeeney/txaws/add-multipart-upload/revision/77 , http://bazaar.launchpad.net/~dkeeney/txaws/add-multipart-upload/revision/78 , and http://bazaar.launchpad.net/~dkeeney/txaws/add-multipart-upload/revision/79 -- don't have accompanying unit tests. From perusing the patches, it looks to me like this branch should be sufficient to support multipart upload to S3.
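
For context, the calling pattern the branch is aiming at (condensed from bin/txaws-post-upload in the preview diff below) is roughly the following sketch; the credentials, bucket, key, and file name are placeholders, and error handling and reactor startup are left out:

    from txaws.credentials import AWSCredentials
    from txaws.service import AWSServiceRegion
    from txaws.s3.multipart import MultipartManager

    creds = AWSCredentials("ACCESS_KEY", "SECRET_KEY")
    region = AWSServiceRegion(creds=creds)
    client = region.get_s3_client()

    source = open("big-file.bin", "rb")
    mgr = MultipartManager(client, "mybucket", "big-file.bin",
                           "application/octet-stream")

    def send_next(ignored=None):
        data = source.read(6000000)   # one ~6 MB part per request
        if data:
            # PUT ?partNumber=N&uploadId=...; the manager records the ETag
            # from the response headers for the completion request.
            return mgr.send_part(data).addCallback(send_next)
        # POST ?uploadId=... with the accumulated <CompleteMultipartUpload> body.
        return mgr.finish()

    d = mgr.initialize()   # POST ?uploads and save the returned uploadId
    d.addCallback(send_next)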

Revision history for this message
Zooko Wilcox-O'Hearn (zooko) wrote:

Oh, and also a lack of tests for all of the other changes in the branch. I would favor adding tests before merging, actually, so I guess I should now undo my "suggest merge" action?

review: Needs Fixing
Revision history for this message
David Keeney (dkeeney) wrote:

I would be flattered to have my code merged, so I will write some
tests for the parts that are currently untested.

Should I request a merge myself, once that is done?

David

On Thu, Sep 22, 2011 at 2:39 PM, Zooko O'Whielacronx <email address hidden> wrote:
> Review: Needs Fixing
>
> Oh, and also a lack of tests for all of the other changes in the branch. I would favor adding tests before merging, actually, so I guess I should now undo my "suggest merge" action?
> --
> https://code.launchpad.net/~dkeeney/txaws/add-multipart-upload/+merge/76647
> You are the owner of lp:~dkeeney/txaws/add-multipart-upload.
>
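
For reference, a first test for the new post_object_init call could look roughly like the sketch below, driving S3Client through its query_factory hook. The FakeQuery stub and the canned XML response are invented for illustration and may not match the conventions of the existing txaws test suite:

    from twisted.trial import unittest
    from twisted.internet import defer

    from txaws.credentials import AWSCredentials
    from txaws.service import AWSServiceEndpoint
    from txaws.s3.client import S3Client

    INIT_RESPONSE = """<InitiateMultipartUploadResult>
      <Bucket>mybucket</Bucket>
      <Key>mykey</Key>
      <UploadId>VXBsb2FkIElE</UploadId>
    </InitiateMultipartUploadResult>"""


    class FakeQuery(object):
        """Records the request arguments and returns a canned S3 response."""
        calls = []

        def __init__(self, **kwargs):
            FakeQuery.calls.append(kwargs)

        def submit(self):
            return defer.succeed(INIT_RESPONSE)


    class PostObjectInitTestCase(unittest.TestCase):

        def test_parses_upload_id(self):
            creds = AWSCredentials("access", "secret")
            endpoint = AWSServiceEndpoint("https://s3.amazonaws.com/")
            client = S3Client(creds=creds, endpoint=endpoint,
                              query_factory=FakeQuery)
            d = client.post_object_init("mybucket", "mykey")

            def check(upload_id):
                # The parsed UploadId and the request shape both matter.
                self.assertEqual(upload_id, "VXBsb2FkIElE")
                call = FakeQuery.calls[-1]
                self.assertEqual(call["action"], "POST")
                self.assertEqual(call["object_name"], "mykey?uploads")
            return d.addCallback(check)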

Unmerged revisions

79. By David Keeney <email address hidden>

raise exception on no page from finish

78. By David Keeney <email address hidden>

refine upload finish handling

77. By David Keeney <email address hidden>

response reading added to multipart finish

76. By Eugene Oden

added a get_object_stream() method to the s3 client

75. By Eugene Oden

modified txaws-post-upload to avoid reading files into memory
minor documentation fix for txaws/script.py

74. By David Keeney <email address hidden>

removed transitional crap

73. By David Keeney <email address hidden>

removed transitional crap

72. By Eugene Oden

mark the new scripts as executable

71. By Eugene Oden

the rest of david's changes

70. By Eugene Oden

merging david's changes with the latest trunk

Preview Diff

1=== added file 'bin/txaws-delete-upload'
2--- bin/txaws-delete-upload 1970-01-01 00:00:00 +0000
3+++ bin/txaws-delete-upload 2011-09-22 20:23:28 +0000
4@@ -0,0 +1,45 @@
5+#!/usr/bin/env python
6+"""
7+%prog [options]
8+"""
9+
10+import sys
11+
12+from txaws.credentials import AWSCredentials
13+from txaws.script import parse_options
14+from txaws.service import AWSServiceRegion
15+from txaws.reactor import reactor
16+
17+def printResults(results):
18+ #print results
19+ return 0
20+
21+def printError(error):
22+ print error.value
23+ return 1
24+
25+def finish(return_code):
26+ reactor.stop(exitStatus=return_code)
27+
28+options, args = parse_options(__doc__.strip())
29+if options.bucket is None:
30+ print "Error Message: A bucket name is required."
31+ sys.exit(1)
32+elif options.object_name is None:
33+ print "Error Message: An object name is required."
34+ sys.exit(1)
35+elif options.uploadid is None:
36+ print "Error Message: An uploadId (-u) is required."
37+ sys.exit(1)
38+creds = AWSCredentials(options.access_key, options.secret_key)
39+region = AWSServiceRegion(
40+ creds=creds, region=options.region, s3_uri=options.url)
41+client = region.get_s3_client()
42+
43+d = client.abort_object(options.bucket, options.object_name, options.uploadid)
44+d.addCallback(printResults)
45+d.addErrback(printError)
46+d.addCallback(finish)
47+# We use a custom reactor so that we can return the exit status from
48+# reactor.run().
49+sys.exit(reactor.run())
50
51=== added file 'bin/txaws-list-uploads'
52--- bin/txaws-list-uploads 1970-01-01 00:00:00 +0000
53+++ bin/txaws-list-uploads 2011-09-22 20:23:28 +0000
54@@ -0,0 +1,40 @@
55+#!/usr/bin/env python
56+"""
57+%prog [options]
58+"""
59+
60+import sys
61+
62+from txaws.credentials import AWSCredentials
63+from txaws.script import parse_options
64+from txaws.service import AWSServiceRegion
65+from txaws.reactor import reactor
66+
67+
68+def printResults(results):
69+ print "\nUploads:"
70+ for upload in results:
71+ print "%s\n %s (%s)" %(upload[0],upload[2],upload[1])
72+ print "Total uploads: %s\n" % len(list(results))
73+ return 0
74+
75+def printError(error):
76+ print error.value
77+ return 1
78+
79+def finish(return_code):
80+ reactor.stop(exitStatus=return_code)
81+
82+options, args = parse_options(__doc__.strip())
83+creds = AWSCredentials(options.access_key, options.secret_key)
84+region = AWSServiceRegion(
85+ creds=creds, region=options.region, s3_uri=options.url)
86+client = region.get_s3_client()
87+
88+d = client.list_mpuploads(options.bucket)
89+d.addCallback(printResults)
90+d.addErrback(printError)
91+d.addCallback(finish)
92+# We use a custom reactor so that we can return the exit status from
93+# reactor.run().
94+sys.exit(reactor.run())
95
96=== added file 'bin/txaws-post-upload'
97--- bin/txaws-post-upload 1970-01-01 00:00:00 +0000
98+++ bin/txaws-post-upload 2011-09-22 20:23:28 +0000
99@@ -0,0 +1,86 @@
100+#!/usr/bin/env python
101+"""
102+%prog [options]
103+"""
104+
105+import os
106+import sys
107+import StringIO
108+
109+from txaws.credentials import AWSCredentials
110+from txaws.script import parse_options
111+from txaws.service import AWSServiceRegion
112+from txaws.reactor import reactor
113+from txaws.s3.multipart import MultipartManager
114+
115+def printResults(results):
116+ print results
117+ print 'next part num:', mgr.partNumber()
118+ return 0
119+
120+def printError(error):
121+ print error.value
122+ return 1
123+
124+def finish(return_code):
125+ reactor.stop(exitStatus=return_code)
126+
127+
128+options, args = parse_options(__doc__.strip())
129+if options.bucket is None:
130+ print "Error Message: A bucket name is required."
131+ sys.exit(1)
132+
133+filename = options.object_filename
134+if filename:
135+ options.object_name = os.path.basename(filename)
136+ try:
137+ options.object_data = open(filename,'rb')
138+ except Exception, error:
139+ print error
140+ sys.exit(1)
141+elif options.object_name is None:
142+ print "Error Message: An object name is required."
143+ sys.exit(1)
144+else:
145+ # turn input data into file-like obj
146+ options.object_data = StringIO.StringIO(options.object_data)
147+ options.object_data.seek(0)
148+
149+creds = AWSCredentials(options.access_key, options.secret_key)
150+region = AWSServiceRegion(creds=creds, region=options.region,
151+ s3_uri=options.url)
152+client = region.get_s3_client()
153+
154+mgr = MultipartManager(client, options.bucket, options.object_name,
155+ options.content_type)
156+
157+# send first block of data
158+def startdata(uploadId):
159+ if uploadId:
160+ senddata()
161+ else:
162+ print 'uploadId not obtained'
163+ reactor.stop(exitStatus=1)
164+
165+# send each block of data
166+def senddata(ign=None):
167+ tosend = options.object_data.read(6000000)
168+ if tosend:
169+ d = mgr.send_part(tosend)
170+ d.addErrback(printError)
171+ d.addCallback(senddata)
172+ else:
173+ d = mgr.finish()
174+ d.addCallback(printResults)
175+ d.addCallback(finish)
176+ d.addErrback(printError)
177+
178+# start init
179+d = mgr.initialize()
180+d.addCallback(startdata)
181+d.addErrback(printError)
182+
183+# We use a custom reactor so that we can return the exit status from
184+# reactor.run().
185+sys.exit(reactor.run())
186
187=== modified file 'bin/txaws-put-object'
188--- bin/txaws-put-object 2011-04-13 03:23:49 +0000
189+++ bin/txaws-put-object 2011-09-22 20:23:28 +0000
190@@ -33,7 +33,7 @@
191 if filename:
192 options.object_name = os.path.basename(filename)
193 try:
194- options.object_data = open(filename).read()
195+ options.object_data = open(filename,'rb').read()
196 except Exception, error:
197 print error
198 sys.exit(1)
199
200=== modified file 'txaws/client/base.py'
201--- txaws/client/base.py 2011-04-21 21:16:37 +0000
202+++ txaws/client/base.py 2011-09-22 20:23:28 +0000
203@@ -1,7 +1,12 @@
204+<<<<<<< TREE
205 try:
206 from xml.etree.ElementTree import ParseError
207 except ImportError:
208 from xml.parsers.expat import ExpatError as ParseError
209+=======
210+
211+from xml.parsers.expat import ExpatError
212+>>>>>>> MERGE-SOURCE
213
214 from twisted.internet import reactor, ssl
215 from twisted.web import http
216@@ -91,6 +96,7 @@
217 """
218 contextFactory = None
219 scheme, host, port, path = parse(url)
220+ #print >> sys.stderr, 'gp:'+str(path)
221 self.client = self.factory(url, *args, **kwds)
222 if scheme == 'https':
223 contextFactory = ssl.ClientContextFactory()
224
225=== modified file 'txaws/s3/client.py'
226--- txaws/s3/client.py 2011-04-14 19:50:30 +0000
227+++ txaws/s3/client.py 2011-09-22 20:23:28 +0000
228@@ -12,8 +12,13 @@
229 functionality in this wrapper.
230 """
231 import mimetypes
232+import time
233
234 from twisted.web.http import datetimeToString
235+from twisted.web2 import stream, http_headers
236+from twisted.web2 import http as web2_http
237+from twisted.web2.client import http as web2_client_http
238+from twisted.internet import protocol, reactor, ssl
239
240 from epsilon.extime import Time
241
242@@ -23,11 +28,14 @@
243 Bucket, BucketItem, BucketListing, ItemOwner, RequestPayment)
244 from txaws.s3.exception import S3Error
245 from txaws.service import AWSServiceEndpoint, S3_ENDPOINT
246-from txaws.util import XML, calculate_md5
247+from txaws.util import XML, calculate_md5, parse
248
249
250 def s3_error_wrapper(error):
251- error_wrapper(error, S3Error)
252+ print '\n'
253+ print str(error.__dict__)
254+ print str(error)
255+ #error_wrapper(error, S3Error)
256
257
258 class URLContext(object):
259@@ -64,9 +72,15 @@
260 class S3Client(BaseClient):
261 """A client for S3."""
262
263- def __init__(self, creds=None, endpoint=None, query_factory=None):
264+ def __init__(self, creds=None, endpoint=None, query_factory=None, streaming_query_factory=None):
265 if query_factory is None:
266 query_factory = Query
267+
268+ if streaming_query_factory is None:
269+ streaming_query_factory = StreamingQuery
270+
271+ self.streaming_query_factory = streaming_query_factory
272+
273 super(S3Client, self).__init__(creds, endpoint, query_factory)
274
275 def list_buckets(self):
276@@ -257,6 +271,15 @@
277 bucket=bucket, object_name=object_name)
278 return query.submit()
279
280+ def get_object_stream(self, bucket, object_name):
281+ """
282+ Get an object from a bucket.
283+ """
284+ query = self.streaming_query_factory(
285+ action="GET", creds=self.creds, endpoint=self.endpoint,
286+ bucket=bucket, object_name=object_name)
287+ return query.submit()
288+
289 def head_object(self, bucket, object_name):
290 """
291 Retrieve object metadata only.
292@@ -278,6 +301,7 @@
293 bucket=bucket, object_name=object_name)
294 return query.submit()
295
296+<<<<<<< TREE
297 def get_object_acl(self, bucket, object_name):
298 """
299 Get the access control policy for an object.
300@@ -320,6 +344,140 @@
301 """
302 return RequestPayment.from_xml(xml_bytes).payer
303
304+=======
305+ def post_object_init(self, bucket, object_name, content_type=None,
306+ metadata={}):
307+ """
308+ Starts a multipart upload to a bucket.
309+ Any existing object of the same name will be replaced.
310+
311+ returns a string uploadId
312+ """
313+ objectname_plus = object_name+'?uploads'
314+ query = self.query_factory(
315+ action="POST", creds=self.creds, endpoint=self.endpoint,
316+ bucket=bucket, object_name=objectname_plus, data='',
317+ content_type=content_type, metadata=metadata)
318+ d = query.submit()
319+ return d.addCallback(self._parse_post_init)
320+
321+ def _parse_post_init(self, xml_bytes):
322+ """parse the response to post_object_init"""
323+ root = XML(xml_bytes)
324+ uploadId = root.findtext("UploadId")
325+ return uploadId
326+
327+ def put_object_part(self,bucket,object_name,uploadId,data,partNumber,
328+ content_type=None,metadata={}):
329+ """
330+ Continues a multipart upload to a bucket.
331+ Sends another slug of data
332+
333+ returns deferred from query.submit
334+ """
335+ assert(uploadId,'start multipart upload with a .post_object_init')
336+ # this is backwards from docs, but it works
337+ parms = 'partNumber='+str(partNumber)+'&uploadId='+uploadId
338+ objectname_plus = object_name+'?'+parms
339+ query = self.query_factory(
340+ action="PUT", creds=self.creds, endpoint=self.endpoint,
341+ bucket=bucket, object_name=objectname_plus, data=data,
342+ content_type=content_type, metadata=metadata)
343+ d = query.submit()
344+ d.addCallback(query.get_response_headers)
345+ return d
346+
347+ def abort_object(self,bucket,object_name,uploadId,
348+ content_type=None,metadata={}):
349+ """
350+ Aborts a multipart upload to a bucket.
351+
352+ returns deferred from query.submit
353+ """
354+ assert(uploadId,'cannot abort an unidentified upload')
355+ objectname_plus = object_name+'?uploadId='+uploadId
356+ query = self.query_factory(
357+ action="DELETE", creds=self.creds, endpoint=self.endpoint,
358+ bucket=bucket, object_name=objectname_plus, data='',
359+ content_type=content_type, metadata=metadata)
360+ d = query.submit()
361+ return d
362+
363+ def post_object_finish(self, bucket, object_name, uploadId, partsList,
364+ content_type=None, metadata={}):
365+ """
366+ Completes the multipart upload.
367+
368+ Can be slow, returns deferred
369+ """
370+ assert(uploadId,'start multipart upload with a .post_object_init')
371+ # assemble xml
372+ body = self._buildxmlPartsList(partsList)
373+ #
374+ objectname_plus = object_name+'?uploadId='+uploadId
375+ query = self.query_factory(
376+ action="POST", creds=self.creds, endpoint=self.endpoint,
377+ bucket=bucket, object_name=objectname_plus, data=body,
378+ content_type=content_type, metadata=metadata)
379+ d = query.submit()
380+ d.addCallback(self._parse_finish_response)
381+ return d
382+
383+ def _buildxmlPartsList(self,partsList):
384+ xml = []
385+ partsList.sort(key=lambda p: p[0])
386+ xml.append('<CompleteMultipartUpload>')
387+ for pt in partsList:
388+ xml.append('<Part>')
389+ xmlp = ''.join(['<PartNumber>',pt[0],'</PartNumber>'])
390+ xml.append(xmlp)
391+ xmlp = ''.join(['<ETag>',pt[1],'</ETag>'])
392+ xml.append(xmlp)
393+ xml.append('</Part>')
394+ xml.append('</CompleteMultipartUpload>')
395+ return '\n'.join(xml)
396+
397+ def _parse_finish_response(self,xml_bytes):
398+ """parse the response to post_object_init"""
399+ root = XML(xml_bytes)
400+ errorNode = root.find("Error")
401+ uploadRes = root.findtext("Key")
402+ if errorNode:
403+ error = errorNode.findtext('Message')
404+ raise txaws.s3.exception.S3Error('Error: %s'%error)
405+ if uploadRes:
406+ return 'multipart upload complete %s'%uploadRes
407+ raise txaws.s3.exception.S3Error(
408+ 'multipart upload finish did not return valid page: \n%s'%xml_bytes)
409+
410+ def list_mpuploads(self, bucket, content_type=None, metadata={}):
411+ """
412+ Gets a list of started but not finished multipart uploads in a bucket.
413+
414+ returns a list
415+ """
416+ path = '?uploads'
417+ query = self.query_factory(
418+ action="GET", creds=self.creds, endpoint=self.endpoint,
419+ bucket=bucket, object_name=path, data='',
420+ content_type=content_type, metadata=metadata)
421+ d = query.submit()
422+ return d.addCallback(self._parse_mpupload_list)
423+
424+ def _parse_mpupload_list(self, xml_bytes):
425+ """
426+ Parse XML multipart upload list response.
427+ """
428+ root = XML(xml_bytes)
429+ uploads = []
430+ for uploads_data in root.findall("Upload"):
431+ uploadId = uploads_data.findtext("UploadId")
432+ key = uploads_data.findtext("Key")
433+ initdate = uploads_data.findtext("Initiated")
434+ uploads.append((uploadId,initdate,key))
435+ return uploads
436+
437+>>>>>>> MERGE-SOURCE
438
439 class Query(BaseQuery):
440 """A query for submission to the S3 service."""
441@@ -417,4 +575,47 @@
442 d = self.get_page(
443 url_context.get_url(), method=self.action, postdata=self.data,
444 headers=self.get_headers())
445- return d.addErrback(s3_error_wrapper)
446+ return d
447+ #return d.addErrback(s3_error_wrapper)
448+
449+class StreamingQuery(Query):
450+ def get_page(self, url, method="GET", postdata=None, headers=None,
451+ agent=None, timeout=0, cookies=None,
452+ followRedirect=True, redirectLimit=20, afterFoundGet=False):
453+ scheme, host, port, path = parse(url)
454+
455+ postdata = stream.MemoryStream(postdata)
456+
457+ rawHeaders = {}
458+
459+ for name, value in headers.items():
460+ rawHeaders[name] = [str(value)]
461+
462+ headers = http_headers.Headers(rawHeaders=rawHeaders)
463+
464+ request = web2_client_http.ClientRequest(method, url, headers, None)
465+
466+ client = protocol.ClientCreator(reactor, web2_client_http.HTTPClientProtocol)
467+
468+ if scheme == 'https':
469+ contextFactory = ssl.ClientContextFactory()
470+ deferred = client.connectSSL(host, port, contextFactory)
471+ else:
472+ deferred = client.connectTCP(host, port)
473+
474+ def connected(proto):
475+ d = proto.submitRequest(request)
476+ d.addCallback(handleResponse)
477+
478+ return d
479+
480+ def handleResponse(response):
481+ if not response.code in (200, 204):
482+ error = web2_http.HTTPError(response)
483+ raise error
484+
485+ return response.stream
486+
487+ deferred.addCallback(connected)
488+
489+ return deferred
490
491=== added file 'txaws/s3/multipart.py'
492--- txaws/s3/multipart.py 1970-01-01 00:00:00 +0000
493+++ txaws/s3/multipart.py 2011-09-22 20:23:28 +0000
494@@ -0,0 +1,81 @@
495+
496+
497+
498+class MultipartManager(object):
499+ """A client for S3."""
500+
501+ def __init__(self, client, bucket=None, object_name=None, content_type=None,
502+ metadata={}):
503+ self.client = client
504+ self.object_name = object_name
505+ self.bucket = bucket
506+ self.content_type = content_type
507+ self.metadata = metadata
508+ self.uploadId = False
509+ self.partTuples = []
510+
511+ def initialize(self):
512+ """
513+ Starts a multipart upload to a bucket.
514+ Any existing object of the same name will be replaced.
515+
516+ returns a deferred
517+ """
518+ def saveUploadId(uploadId):
519+ self.uploadId = uploadId
520+ return uploadId
521+ d = self.client.post_object_init(bucket=self.bucket,
522+ object_name=self.object_name,
523+ content_type=self.content_type,
524+ metadata=self.metadata)
525+ return d.addCallback(saveUploadId)
526+
527+ def partNumber(self):
528+ return str(len(self.partTuples)+1)
529+
530+ def send_part(self, data):
531+ """
532+ Continues a multipart upload to a bucket.
533+ Sends another slug of data
534+
535+ returns deferred from query.submit
536+ """
537+ assert(self.uploadId,'start multipart upload with a .post_object_init')
538+ def process_headers(hdrs):
539+ # save eTags and partNumbers for use by finish
540+ etag = hdrs['etag'][0].strip('""')
541+ partNum = self.partNumber()
542+ tupl = (partNum,etag)
543+ self.partTuples.append(tupl)
544+ return hdrs
545+ d = self.client.put_object_part(bucket=self.bucket,
546+ object_name=self.object_name,
547+ uploadId=self.uploadId, data=data,
548+ partNumber=self.partNumber())
549+ d.addCallback(process_headers)
550+ return d
551+
552+ def abort(self):
553+ """
554+ Aborts the multipart upload.
555+
556+ Returns deferred
557+ """
558+ d = self.client.abort_object(bucket=self.bucket,
559+ object_name=self.object_name,
560+ uploadId=self.uploadId)
561+ return d
562+
563+ def finish(self):
564+ """
565+ Completes the multipart upload.
566+
567+ Can be slow, returns deferred
568+ """
569+ d = self.client.post_object_finish(bucket=self.bucket,
570+ object_name=self.object_name,
571+ uploadId=self.uploadId,
572+ partsList=self.partTuples)
573+ return d
574+
575+
576
577=== modified file 'txaws/script.py'
578--- txaws/script.py 2009-11-22 21:53:54 +0000
579+++ txaws/script.py 2011-09-22 20:23:28 +0000
580@@ -32,6 +32,9 @@
581 parser.add_option(
582 "-c", "--content-type", dest="content_type",
583 help="content type of the object")
584+ parser.add_option(
585+ "-u", "--upload-id", dest="uploadid",
586+ help="upload id of the object")
587 options, args = parser.parse_args()
588 if not (options.access_key and options.secret_key):
589 parser.error(
