Merge lp:~djfroofy/txaws/921419-uploadpart into lp:txaws

Proposed by Drew Smathers
Status: Merged
Approved by: Duncan McGreggor
Approved revision: 145
Merged at revision: 147
Proposed branch: lp:~djfroofy/txaws/921419-uploadpart
Merge into: lp:txaws
Diff against target: 1301 lines (+648/-82)
9 files modified
txaws/client/_producers.py (+122/-0)
txaws/client/base.py (+159/-14)
txaws/client/tests/test_base.py (+53/-11)
txaws/client/tests/test_ssl.py (+20/-4)
txaws/s3/client.py (+85/-26)
txaws/s3/model.py (+29/-0)
txaws/s3/tests/test_client.py (+148/-27)
txaws/testing/payload.py (+9/-0)
txaws/testing/producers.py (+23/-0)
To merge this branch: bzr merge lp:~djfroofy/txaws/921419-uploadpart
Reviewer Review Type Date Requested Status
Duncan McGreggor Approve
Review via email: mp+92583@code.launchpad.net

Description of the change

This depends on initmultipart branch which in turn has other branch deps.

To post a comment you must log in.
Revision history for this message
Duncan McGreggor (oubiwann) wrote :

Merge away!

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== added file 'txaws/client/_producers.py'
--- txaws/client/_producers.py 1970-01-01 00:00:00 +0000
+++ txaws/client/_producers.py 2012-02-10 21:24:20 +0000
@@ -0,0 +1,122 @@
1import os
2
3from zope.interface import implements
4
5from twisted.internet import defer, task
6from twisted.web.iweb import UNKNOWN_LENGTH, IBodyProducer
7
8
9# Code below for FileBodyProducer cut-and-paste from twisted source.
10# Currently this is not released so here temporarily for forward compat.
11
12
13class FileBodyProducer(object):
14 """
15 L{FileBodyProducer} produces bytes from an input file object incrementally
16 and writes them to a consumer.
17
18 Since file-like objects cannot be read from in an event-driven manner,
19 L{FileBodyProducer} uses a L{Cooperator} instance to schedule reads from
20 the file. This process is also paused and resumed based on notifications
21 from the L{IConsumer} provider being written to.
22
23 The file is closed after it has been read, or if the producer is stopped
24 early.
25
26 @ivar _inputFile: Any file-like object, bytes read from which will be
27 written to a consumer.
28
29 @ivar _cooperate: A method like L{Cooperator.cooperate} which is used to
30 schedule all reads.
31
32 @ivar _readSize: The number of bytes to read from C{_inputFile} at a time.
33 """
34 implements(IBodyProducer)
35
36 # Python 2.4 doesn't have these symbolic constants
37 _SEEK_SET = getattr(os, 'SEEK_SET', 0)
38 _SEEK_END = getattr(os, 'SEEK_END', 2)
39
40 def __init__(self, inputFile, cooperator=task, readSize=2 ** 16):
41 self._inputFile = inputFile
42 self._cooperate = cooperator.cooperate
43 self._readSize = readSize
44 self.length = self._determineLength(inputFile)
45
46
47 def _determineLength(self, fObj):
48 """
49 Determine how many bytes can be read out of C{fObj} (assuming it is not
50 modified from this point on). If the determination cannot be made,
51 return C{UNKNOWN_LENGTH}.
52 """
53 try:
54 seek = fObj.seek
55 tell = fObj.tell
56 except AttributeError:
57 return UNKNOWN_LENGTH
58 originalPosition = tell()
59 seek(0, self._SEEK_END)
60 end = tell()
61 seek(originalPosition, self._SEEK_SET)
62 return end - originalPosition
63
64
65 def stopProducing(self):
66 """
67 Permanently stop writing bytes from the file to the consumer by
68 stopping the underlying L{CooperativeTask}.
69 """
70 self._inputFile.close()
71 self._task.stop()
72
73
74 def startProducing(self, consumer):
75 """
76 Start a cooperative task which will read bytes from the input file and
77 write them to C{consumer}. Return a L{Deferred} which fires after all
78 bytes have been written.
79
80 @param consumer: Any L{IConsumer} provider
81 """
82 self._task = self._cooperate(self._writeloop(consumer))
83 d = self._task.whenDone()
84 def maybeStopped(reason):
85 # IBodyProducer.startProducing's Deferred isn't support to fire if
86 # stopProducing is called.
87 reason.trap(task.TaskStopped)
88 return defer.Deferred()
89 d.addCallbacks(lambda ignored: None, maybeStopped)
90 return d
91
92
93 def _writeloop(self, consumer):
94 """
95 Return an iterator which reads one chunk of bytes from the input file
96 and writes them to the consumer for each time it is iterated.
97 """
98 while True:
99 bytes = self._inputFile.read(self._readSize)
100 if not bytes:
101 self._inputFile.close()
102 break
103 consumer.write(bytes)
104 yield None
105
106
107 def pauseProducing(self):
108 """
109 Temporarily suspend copying bytes from the input file to the consumer
110 by pausing the L{CooperativeTask} which drives that activity.
111 """
112 self._task.pause()
113
114
115 def resumeProducing(self):
116 """
117 Undo the effects of a previous C{pauseProducing} and resume copying
118 bytes to the consumer by resuming the L{CooperativeTask} which drives
119 the write activity.
120 """
121 self._task.resume()
122
0123
=== modified file 'txaws/client/base.py'
--- txaws/client/base.py 2011-11-29 08:17:54 +0000
+++ txaws/client/base.py 2012-02-10 21:24:20 +0000
@@ -3,10 +3,25 @@
3except ImportError:3except ImportError:
4 from xml.parsers.expat import ExpatError as ParseError4 from xml.parsers.expat import ExpatError as ParseError
55
6import warnings
7from StringIO import StringIO
8
6from twisted.internet.ssl import ClientContextFactory9from twisted.internet.ssl import ClientContextFactory
10from twisted.internet.protocol import Protocol
11from twisted.internet.defer import Deferred, succeed
12from twisted.python import failure
7from twisted.web import http13from twisted.web import http
14from twisted.web.iweb import UNKNOWN_LENGTH
8from twisted.web.client import HTTPClientFactory15from twisted.web.client import HTTPClientFactory
16from twisted.web.client import Agent
17from twisted.web.client import ResponseDone
18from twisted.web.http import NO_CONTENT
19from twisted.web.http_headers import Headers
9from twisted.web.error import Error as TwistedWebError20from twisted.web.error import Error as TwistedWebError
21try:
22 from twisted.web.client import FileBodyProducer
23except ImportError:
24 from txaws.client._producers import FileBodyProducer
1025
11from txaws.util import parse26from txaws.util import parse
12from txaws.credentials import AWSCredentials27from txaws.credentials import AWSCredentials
@@ -59,9 +74,10 @@
59 @param query_factory: The class or function that produces a query74 @param query_factory: The class or function that produces a query
60 object for making requests to the EC2 service.75 object for making requests to the EC2 service.
61 @param parser: A parser object for parsing responses from the EC2 service.76 @param parser: A parser object for parsing responses from the EC2 service.
77 @param receiver_factory: Factory for receiving responses from EC2 service.
62 """78 """
63 def __init__(self, creds=None, endpoint=None, query_factory=None,79 def __init__(self, creds=None, endpoint=None, query_factory=None,
64 parser=None):80 parser=None, receiver_factory=None):
65 if creds is None:81 if creds is None:
66 creds = AWSCredentials()82 creds = AWSCredentials()
67 if endpoint is None:83 if endpoint is None:
@@ -69,22 +85,109 @@
69 self.creds = creds85 self.creds = creds
70 self.endpoint = endpoint86 self.endpoint = endpoint
71 self.query_factory = query_factory87 self.query_factory = query_factory
88 self.receiver_factory = receiver_factory
72 self.parser = parser89 self.parser = parser
7390
91class StreamingError(Exception):
92 """
93 Raised if more data or less data is received than expected.
94 """
95
96
97class StringIOBodyReceiver(Protocol):
98 """
99 Simple StringIO-based HTTP response body receiver.
100
101 TODO: perhaps there should be an interface specifying why
102 finished (Deferred) and content_length are necessary and
103 how to used them; eg. callback/errback finished on completion.
104 """
105 finished = None
106 content_length = None
107
108 def __init__(self):
109 self._buffer = StringIO()
110 self._received = 0
111
112 def dataReceived(self, bytes):
113 streaming = self.content_length is UNKNOWN_LENGTH
114 if not streaming and (self._received > self.content_length):
115 self.transport.loseConnection()
116 raise StreamingError(
117 "Buffer overflow - received more data than "
118 "Content-Length dictated: %d" % self.content_length)
119 # TODO should be some limit on how much we receive
120 self._buffer.write(bytes)
121 self._received += len(bytes)
122
123 def connectionLost(self, reason):
124 reason.trap(ResponseDone)
125 d = self.finished
126 self.finished = None
127 streaming = self.content_length is UNKNOWN_LENGTH
128 if streaming or (self._received == self.content_length):
129 d.callback(self._buffer.getvalue())
130 else:
131 f = failure.Failure(StreamingError("Connection lost before "
132 "receiving all data"))
133 d.errback(f)
134
135
136class WebClientContextFactory(ClientContextFactory):
137
138 def getContext(self, hostname, port):
139 return ClientContextFactory.getContext(self)
140
141
142class WebVerifyingContextFactory(VerifyingContextFactory):
143
144 def getContext(self, hostname, port):
145 return VerifyingContextFactory.getContext(self)
146
147
148class FakeClient(object):
149 """
150 XXX
151 A fake client object for some degree of backwards compatability for
152 code using the client attibute on BaseQuery to check url, status
153 etc.
154 """
155 url = None
156 status = None
74157
75class BaseQuery(object):158class BaseQuery(object):
76159
77 def __init__(self, action=None, creds=None, endpoint=None, reactor=None):160 def __init__(self, action=None, creds=None, endpoint=None, reactor=None,
161 body_producer=None, receiver_factory=None):
78 if not action:162 if not action:
79 raise TypeError("The query requires an action parameter.")163 raise TypeError("The query requires an action parameter.")
80 self.factory = HTTPClientFactory
81 self.action = action164 self.action = action
82 self.creds = creds165 self.creds = creds
83 self.endpoint = endpoint166 self.endpoint = endpoint
84 if reactor is None:167 if reactor is None:
85 from twisted.internet import reactor168 from twisted.internet import reactor
86 self.reactor = reactor169 self.reactor = reactor
87 self.client = None170 self._client = None
171 self.request_headers = None
172 self.response_headers = None
173 self.body_producer = body_producer
174 self.receiver_factory = receiver_factory or StringIOBodyReceiver
175
176 @property
177 def client(self):
178 if self._client is None:
179 self._client_deprecation_warning()
180 self._client = FakeClient()
181 return self._client
182
183 @client.setter
184 def client(self, value):
185 self._client_deprecation_warning()
186 self._client = value
187
188 def _client_deprecation_warning(self):
189 warnings.warn('The client attribute on BaseQuery is deprecated and'
190 ' will go away in future release.')
88191
89 def get_page(self, url, *args, **kwds):192 def get_page(self, url, *args, **kwds):
90 """193 """
@@ -95,16 +198,39 @@
95 """198 """
96 contextFactory = None199 contextFactory = None
97 scheme, host, port, path = parse(url)200 scheme, host, port, path = parse(url)
98 self.client = self.factory(url, *args, **kwds)201 data = kwds.get('postdata', None)
202 self._method = method = kwds.get('method', 'GET')
203 self.request_headers = self._headers(kwds.get('headers', {}))
204 if (self.body_producer is None) and (data is not None):
205 self.body_producer = FileBodyProducer(StringIO(data))
99 if scheme == "https":206 if scheme == "https":
100 if self.endpoint.ssl_hostname_verification:207 if self.endpoint.ssl_hostname_verification:
101 contextFactory = VerifyingContextFactory(host)208 contextFactory = WebVerifyingContextFactory(host)
102 else:209 else:
103 contextFactory = ClientContextFactory()210 contextFactory = WebClientContextFactory()
104 self.reactor.connectSSL(host, port, self.client, contextFactory)211 agent = Agent(self.reactor, contextFactory)
212 self.client.url = url
213 d = agent.request(method, url, self.request_headers,
214 self.body_producer)
105 else:215 else:
106 self.reactor.connectTCP(host, port, self.client)216 agent = Agent(self.reactor)
107 return self.client.deferred217 d = agent.request(method, url, self.request_headers,
218 self.body_producer)
219 d.addCallback(self._handle_response)
220 return d
221
222 def _headers(self, headers_dict):
223 """
224 Convert dictionary of headers into twisted.web.client.Headers object.
225 """
226 return Headers(dict((k,[v]) for (k,v) in headers_dict.items()))
227
228 def _unpack_headers(self, headers):
229 """
230 Unpack twisted.web.client.Headers object to dict. This is to provide
231 backwards compatability.
232 """
233 return dict((k,v[0]) for (k,v) in headers.getAllRawHeaders())
108234
109 def get_request_headers(self, *args, **kwds):235 def get_request_headers(self, *args, **kwds):
110 """236 """
@@ -114,8 +240,26 @@
114 The AWS S3 API depends upon setting headers. This method is provided as240 The AWS S3 API depends upon setting headers. This method is provided as
115 a convenience for debugging issues with the S3 communications.241 a convenience for debugging issues with the S3 communications.
116 """242 """
117 if self.client:243 if self.request_headers:
118 return self.client.headers244 return self._unpack_headers(self.request_headers)
245
246 def _handle_response(self, response):
247 """
248 Handle the HTTP response by memoing the headers and then delivering
249 bytes.
250 """
251 self.client.status = response.code
252 self.response_headers = headers = response.headers
253 # XXX This workaround (which needs to be improved at that) for possible
254 # bug in Twisted with new client:
255 # http://twistedmatrix.com/trac/ticket/5476
256 if self._method.upper() == 'HEAD' or response.code == NO_CONTENT:
257 return succeed('')
258 receiver = self.receiver_factory()
259 receiver.finished = d = Deferred()
260 receiver.content_length = response.length
261 response.deliverBody(receiver)
262 return d
119263
120 def get_response_headers(self, *args, **kwargs):264 def get_response_headers(self, *args, **kwargs):
121 """265 """
@@ -125,5 +269,6 @@
125 The AWS S3 API depends upon setting headers. This method is used by the269 The AWS S3 API depends upon setting headers. This method is used by the
126 head_object API call for getting a S3 object's metadata.270 head_object API call for getting a S3 object's metadata.
127 """271 """
128 if self.client:272 if self.response_headers:
129 return self.client.response_headers273 return self._unpack_headers(self.response_headers)
274
130275
=== modified file 'txaws/client/tests/test_base.py'
--- txaws/client/tests/test_base.py 2012-01-26 18:43:48 +0000
+++ txaws/client/tests/test_base.py 2012-02-10 21:24:20 +0000
@@ -1,6 +1,9 @@
1import os1import os
22
3from zope.interface import implements
4
3from twisted.internet import reactor5from twisted.internet import reactor
6from twisted.internet.defer import succeed
4from twisted.internet.error import ConnectionRefusedError7from twisted.internet.error import ConnectionRefusedError
5from twisted.protocols.policies import WrappingFactory8from twisted.protocols.policies import WrappingFactory
6from twisted.python import log9from twisted.python import log
@@ -8,14 +11,16 @@
8from twisted.python.failure import Failure11from twisted.python.failure import Failure
9from twisted.test.test_sslverify import makeCertificate12from twisted.test.test_sslverify import makeCertificate
10from twisted.web import server, static13from twisted.web import server, static
14from twisted.web.iweb import IBodyProducer
11from twisted.web.client import HTTPClientFactory15from twisted.web.client import HTTPClientFactory
12from twisted.web.error import Error as TwistedWebError16from twisted.web.error import Error as TwistedWebError
1317
14from txaws.client import ssl18from txaws.client import ssl
15from txaws.client.base import BaseClient, BaseQuery, error_wrapper19from txaws.client.base import BaseClient, BaseQuery, error_wrapper
20from txaws.client.base import StringIOBodyReceiver
16from txaws.service import AWSServiceEndpoint21from txaws.service import AWSServiceEndpoint
17from txaws.testing.base import TXAWSTestCase22from txaws.testing.base import TXAWSTestCase
1823from txaws.testing.producers import StringBodyProducer
1924
20class ErrorWrapperTestCase(TXAWSTestCase):25class ErrorWrapperTestCase(TXAWSTestCase):
2126
@@ -99,7 +104,6 @@
99104
100 def test_creation(self):105 def test_creation(self):
101 query = BaseQuery("an action", "creds", "http://endpoint")106 query = BaseQuery("an action", "creds", "http://endpoint")
102 self.assertEquals(query.factory, HTTPClientFactory)
103 self.assertEquals(query.action, "an action")107 self.assertEquals(query.action, "an action")
104 self.assertEquals(query.creds, "creds")108 self.assertEquals(query.creds, "creds")
105 self.assertEquals(query.endpoint, "http://endpoint")109 self.assertEquals(query.endpoint, "http://endpoint")
@@ -142,16 +146,52 @@
142 def test_get_response_headers_with_client(self):146 def test_get_response_headers_with_client(self):
143147
144 def check_results(results):148 def check_results(results):
149 #self.assertEquals(sorted(results.keys()), [
150 # "accept-ranges", "content-length", "content-type", "date",
151 # "last-modified", "server"])
152 # XXX I think newclient exludes content-length from headers?
153 # Also the header names are capitalized ... do we need to worry
154 # about backwards compat?
145 self.assertEquals(sorted(results.keys()), [155 self.assertEquals(sorted(results.keys()), [
146 "accept-ranges", "content-length", "content-type", "date",156 "Accept-Ranges", "Content-Type", "Date",
147 "last-modified", "server"])157 "Last-Modified", "Server"])
148 self.assertEquals(len(results.values()), 6)158 self.assertEquals(len(results.values()), 5)
149159
150 query = BaseQuery("an action", "creds", "http://endpoint")160 query = BaseQuery("an action", "creds", "http://endpoint")
151 d = query.get_page(self._get_url("file"))161 d = query.get_page(self._get_url("file"))
152 d.addCallback(query.get_response_headers)162 d.addCallback(query.get_response_headers)
153 return d.addCallback(check_results)163 return d.addCallback(check_results)
154164
165 def test_custom_body_producer(self):
166
167 def check_producer_was_used(ignore):
168 self.assertEqual(producer.written, 'test data')
169
170 producer = StringBodyProducer('test data')
171 query = BaseQuery("an action", "creds", "http://endpoint",
172 body_producer=producer)
173 d = query.get_page(self._get_url("file"), method='PUT')
174 return d.addCallback(check_producer_was_used)
175
176 def test_custom_receiver_factory(self):
177
178 class TestReceiverProtocol(StringIOBodyReceiver):
179 used = False
180
181 def __init__(self):
182 StringIOBodyReceiver.__init__(self)
183 TestReceiverProtocol.used = True
184
185 def check_used(ignore):
186 self.assert_(TestReceiverProtocol.used)
187
188 query = BaseQuery("an action", "creds", "http://endpoint",
189 receiver_factory=TestReceiverProtocol)
190 d = query.get_page(self._get_url("file"))
191 d.addCallback(self.assertEquals, "0123456789")
192 d.addCallback(check_used)
193 return d
194
155 # XXX for systems that don't have certs in the DEFAULT_CERT_PATH, this test195 # XXX for systems that don't have certs in the DEFAULT_CERT_PATH, this test
156 # will fail; instead, let's create some certs in a temp directory and set196 # will fail; instead, let's create some certs in a temp directory and set
157 # the DEFAULT_CERT_PATH to point there.197 # the DEFAULT_CERT_PATH to point there.
@@ -167,8 +207,9 @@
167 def __init__(self):207 def __init__(self):
168 self.connects = []208 self.connects = []
169209
170 def connectSSL(self, host, port, client, factory):210 def connectSSL(self, host, port, factory, contextFactory, timeout,
171 self.connects.append((host, port, client, factory))211 bindAddress):
212 self.connects.append((host, port, factory, contextFactory))
172213
173 certs = makeCertificate(O="Test Certificate", CN="something")[1]214 certs = makeCertificate(O="Test Certificate", CN="something")[1]
174 self.patch(ssl, "_ca_certs", certs)215 self.patch(ssl, "_ca_certs", certs)
@@ -176,9 +217,10 @@
176 endpoint = AWSServiceEndpoint(ssl_hostname_verification=True)217 endpoint = AWSServiceEndpoint(ssl_hostname_verification=True)
177 query = BaseQuery("an action", "creds", endpoint, fake_reactor)218 query = BaseQuery("an action", "creds", endpoint, fake_reactor)
178 query.get_page("https://example.com/file")219 query.get_page("https://example.com/file")
179 [(host, port, client, factory)] = fake_reactor.connects220 [(host, port, factory, contextFactory)] = fake_reactor.connects
180 self.assertEqual("example.com", host)221 self.assertEqual("example.com", host)
181 self.assertEqual(443, port)222 self.assertEqual(443, port)
182 self.assertTrue(isinstance(factory, ssl.VerifyingContextFactory))223 wrappedFactory = contextFactory._webContext
183 self.assertEqual("example.com", factory.host)224 self.assertTrue(isinstance(wrappedFactory, ssl.VerifyingContextFactory))
184 self.assertNotEqual([], factory.caCerts)225 self.assertEqual("example.com", wrappedFactory.host)
226 self.assertNotEqual([], wrappedFactory.caCerts)
185227
=== modified file 'txaws/client/tests/test_ssl.py'
--- txaws/client/tests/test_ssl.py 2012-01-26 22:54:44 +0000
+++ txaws/client/tests/test_ssl.py 2012-02-10 21:24:20 +0000
@@ -12,6 +12,10 @@
12from twisted.python.filepath import FilePath12from twisted.python.filepath import FilePath
13from twisted.test.test_sslverify import makeCertificate13from twisted.test.test_sslverify import makeCertificate
14from twisted.web import server, static14from twisted.web import server, static
15try:
16 from twisted.web.client import ResponseFailed
17except ImportError:
18 from twisted.web._newclient import ResponseFailed
1519
16from txaws import exception20from txaws import exception
17from txaws.client import ssl21from txaws.client import ssl
@@ -32,6 +36,11 @@
32PUBSANKEY = sibpath("public_san.ssl")36PUBSANKEY = sibpath("public_san.ssl")
3337
3438
39class WebDefaultOpenSSLContextFactory(DefaultOpenSSLContextFactory):
40 def getContext(self, hostname=None, port=None):
41 return DefaultOpenSSLContextFactory.getContext(self)
42
43
35class BaseQuerySSLTestCase(TXAWSTestCase):44class BaseQuerySSLTestCase(TXAWSTestCase):
3645
37 def setUp(self):46 def setUp(self):
@@ -75,7 +84,7 @@
75 The L{VerifyingContextFactory} properly allows to connect to the84 The L{VerifyingContextFactory} properly allows to connect to the
76 endpoint if the certificates match.85 endpoint if the certificates match.
77 """86 """
78 context_factory = DefaultOpenSSLContextFactory(PRIVKEY, PUBKEY)87 context_factory = WebDefaultOpenSSLContextFactory(PRIVKEY, PUBKEY)
79 self.port = reactor.listenSSL(88 self.port = reactor.listenSSL(
80 0, self.site, context_factory, interface="127.0.0.1")89 0, self.site, context_factory, interface="127.0.0.1")
81 self.portno = self.port.getHost().port90 self.portno = self.port.getHost().port
@@ -90,7 +99,7 @@
90 The L{VerifyingContextFactory} fails with a SSL error the certificates99 The L{VerifyingContextFactory} fails with a SSL error the certificates
91 can't be checked.100 can't be checked.
92 """101 """
93 context_factory = DefaultOpenSSLContextFactory(BADPRIVKEY, BADPUBKEY)102 context_factory = WebDefaultOpenSSLContextFactory(BADPRIVKEY, BADPUBKEY)
94 self.port = reactor.listenSSL(103 self.port = reactor.listenSSL(
95 0, self.site, context_factory, interface="127.0.0.1")104 0, self.site, context_factory, interface="127.0.0.1")
96 self.portno = self.port.getHost().port105 self.portno = self.port.getHost().port
@@ -98,7 +107,14 @@
98 endpoint = AWSServiceEndpoint(ssl_hostname_verification=True)107 endpoint = AWSServiceEndpoint(ssl_hostname_verification=True)
99 query = BaseQuery("an action", "creds", endpoint)108 query = BaseQuery("an action", "creds", endpoint)
100 d = query.get_page(self._get_url("file"))109 d = query.get_page(self._get_url("file"))
101 return self.assertFailure(d, SSLError)110 def fail(ignore):
111 self.fail('Expected SSLError')
112 def check_exception(why):
113 # XXX kind of a mess here ... need to unwrap the
114 # exception and check
115 root_exc = why.value[0][0].value
116 self.assert_(isinstance(root_exc, SSLError))
117 return d.addCallbacks(fail, check_exception)
102118
103 def test_ssl_verification_bypassed(self):119 def test_ssl_verification_bypassed(self):
104 """120 """
@@ -121,7 +137,7 @@
121 L{VerifyingContextFactory} supports checking C{subjectAltName} in the137 L{VerifyingContextFactory} supports checking C{subjectAltName} in the
122 certificate if it's available.138 certificate if it's available.
123 """139 """
124 context_factory = DefaultOpenSSLContextFactory(PRIVSANKEY, PUBSANKEY)140 context_factory = WebDefaultOpenSSLContextFactory(PRIVSANKEY, PUBSANKEY)
125 self.port = reactor.listenSSL(141 self.port = reactor.listenSSL(
126 0, self.site, context_factory, interface="127.0.0.1")142 0, self.site, context_factory, interface="127.0.0.1")
127 self.portno = self.port.getHost().port143 self.portno = self.port.getHost().port
128144
=== modified file 'txaws/s3/client.py'
--- txaws/s3/client.py 2012-01-28 00:39:00 +0000
+++ txaws/s3/client.py 2012-02-10 21:24:20 +0000
@@ -23,7 +23,7 @@
23from txaws.s3.model import (23from txaws.s3.model import (
24 Bucket, BucketItem, BucketListing, ItemOwner, LifecycleConfiguration,24 Bucket, BucketItem, BucketListing, ItemOwner, LifecycleConfiguration,
25 LifecycleConfigurationRule, NotificationConfiguration, RequestPayment,25 LifecycleConfigurationRule, NotificationConfiguration, RequestPayment,
26 VersioningConfiguration, WebsiteConfiguration)26 VersioningConfiguration, WebsiteConfiguration, MultipartInitiationResponse)
27from txaws.s3.exception import S3Error27from txaws.s3.exception import S3Error
28from txaws.service import AWSServiceEndpoint, S3_ENDPOINT28from txaws.service import AWSServiceEndpoint, S3_ENDPOINT
29from txaws.util import XML, calculate_md529from txaws.util import XML, calculate_md5
@@ -74,10 +74,12 @@
74class S3Client(BaseClient):74class S3Client(BaseClient):
75 """A client for S3."""75 """A client for S3."""
7676
77 def __init__(self, creds=None, endpoint=None, query_factory=None):77 def __init__(self, creds=None, endpoint=None, query_factory=None,
78 receiver_factory=None):
78 if query_factory is None:79 if query_factory is None:
79 query_factory = Query80 query_factory = Query
80 super(S3Client, self).__init__(creds, endpoint, query_factory)81 super(S3Client, self).__init__(creds, endpoint, query_factory,
82 receiver_factory=receiver_factory)
8183
82 def list_buckets(self):84 def list_buckets(self):
83 """85 """
@@ -87,7 +89,8 @@
87 the request.89 the request.
88 """90 """
89 query = self.query_factory(91 query = self.query_factory(
90 action="GET", creds=self.creds, endpoint=self.endpoint)92 action="GET", creds=self.creds, endpoint=self.endpoint,
93 receiver_factory=self.receiver_factory)
91 d = query.submit()94 d = query.submit()
92 return d.addCallback(self._parse_list_buckets)95 return d.addCallback(self._parse_list_buckets)
9396
@@ -131,7 +134,7 @@
131 """134 """
132 query = self.query_factory(135 query = self.query_factory(
133 action="GET", creds=self.creds, endpoint=self.endpoint,136 action="GET", creds=self.creds, endpoint=self.endpoint,
134 bucket=bucket)137 bucket=bucket, receiver_factory=self.receiver_factory)
135 d = query.submit()138 d = query.submit()
136 return d.addCallback(self._parse_get_bucket)139 return d.addCallback(self._parse_get_bucket)
137140
@@ -174,7 +177,8 @@
174 """177 """
175 query = self.query_factory(action="GET", creds=self.creds,178 query = self.query_factory(action="GET", creds=self.creds,
176 endpoint=self.endpoint, bucket=bucket,179 endpoint=self.endpoint, bucket=bucket,
177 object_name="?location")180 object_name="?location",
181 receiver_factory=self.receiver_factory)
178 d = query.submit()182 d = query.submit()
179 return d.addCallback(self._parse_bucket_location)183 return d.addCallback(self._parse_bucket_location)
180184
@@ -193,7 +197,8 @@
193 """197 """
194 query = self.query_factory(198 query = self.query_factory(
195 action='GET', creds=self.creds, endpoint=self.endpoint,199 action='GET', creds=self.creds, endpoint=self.endpoint,
196 bucket=bucket, object_name='?lifecycle')200 bucket=bucket, object_name='?lifecycle',
201 receiver_factory=self.receiver_factory)
197 return query.submit().addCallback(self._parse_lifecycle_config)202 return query.submit().addCallback(self._parse_lifecycle_config)
198203
199 def _parse_lifecycle_config(self, xml_bytes):204 def _parse_lifecycle_config(self, xml_bytes):
@@ -221,7 +226,8 @@
221 """226 """
222 query = self.query_factory(227 query = self.query_factory(
223 action='GET', creds=self.creds, endpoint=self.endpoint,228 action='GET', creds=self.creds, endpoint=self.endpoint,
224 bucket=bucket, object_name='?website')229 bucket=bucket, object_name='?website',
230 receiver_factory=self.receiver_factory)
225 return query.submit().addCallback(self._parse_website_config)231 return query.submit().addCallback(self._parse_website_config)
226232
227 def _parse_website_config(self, xml_bytes):233 def _parse_website_config(self, xml_bytes):
@@ -242,7 +248,8 @@
242 """248 """
243 query = self.query_factory(249 query = self.query_factory(
244 action='GET', creds=self.creds, endpoint=self.endpoint,250 action='GET', creds=self.creds, endpoint=self.endpoint,
245 bucket=bucket, object_name='?notification')251 bucket=bucket, object_name='?notification',
252 receiver_factory=self.receiver_factory)
246 return query.submit().addCallback(self._parse_notification_config)253 return query.submit().addCallback(self._parse_notification_config)
247254
248 def _parse_notification_config(self, xml_bytes):255 def _parse_notification_config(self, xml_bytes):
@@ -262,7 +269,8 @@
262 """269 """
263 query = self.query_factory(270 query = self.query_factory(
264 action='GET', creds=self.creds, endpoint=self.endpoint,271 action='GET', creds=self.creds, endpoint=self.endpoint,
265 bucket=bucket, object_name='?versioning')272 bucket=bucket, object_name='?versioning',
273 receiver_factory=self.receiver_factory)
266 return query.submit().addCallback(self._parse_versioning_config)274 return query.submit().addCallback(self._parse_versioning_config)
267275
268 def _parse_versioning_config(self, xml_bytes):276 def _parse_versioning_config(self, xml_bytes):
@@ -279,7 +287,8 @@
279 """287 """
280 query = self.query_factory(288 query = self.query_factory(
281 action='GET', creds=self.creds, endpoint=self.endpoint,289 action='GET', creds=self.creds, endpoint=self.endpoint,
282 bucket=bucket, object_name='?acl')290 bucket=bucket, object_name='?acl',
291 receiver_factory=self.receiver_factory)
283 return query.submit().addCallback(self._parse_acl)292 return query.submit().addCallback(self._parse_acl)
284293
285 def put_bucket_acl(self, bucket, access_control_policy):294 def put_bucket_acl(self, bucket, access_control_policy):
@@ -289,7 +298,8 @@
289 data = access_control_policy.to_xml()298 data = access_control_policy.to_xml()
290 query = self.query_factory(299 query = self.query_factory(
291 action='PUT', creds=self.creds, endpoint=self.endpoint,300 action='PUT', creds=self.creds, endpoint=self.endpoint,
292 bucket=bucket, object_name='?acl', data=data)301 bucket=bucket, object_name='?acl', data=data,
302 receiver_factory=self.receiver_factory)
293 return query.submit().addCallback(self._parse_acl)303 return query.submit().addCallback(self._parse_acl)
294304
295 def _parse_acl(self, xml_bytes):305 def _parse_acl(self, xml_bytes):
@@ -299,8 +309,8 @@
299 """309 """
300 return AccessControlPolicy.from_xml(xml_bytes)310 return AccessControlPolicy.from_xml(xml_bytes)
301311
302 def put_object(self, bucket, object_name, data, content_type=None,312 def put_object(self, bucket, object_name, data=None, content_type=None,
303 metadata={}, amz_headers={}):313 metadata={}, amz_headers={}, body_producer=None):
304 """314 """
305 Put an object in a bucket.315 Put an object in a bucket.
306316
@@ -318,7 +328,8 @@
318 action="PUT", creds=self.creds, endpoint=self.endpoint,328 action="PUT", creds=self.creds, endpoint=self.endpoint,
319 bucket=bucket, object_name=object_name, data=data,329 bucket=bucket, object_name=object_name, data=data,
320 content_type=content_type, metadata=metadata,330 content_type=content_type, metadata=metadata,
321 amz_headers=amz_headers)331 amz_headers=amz_headers, body_producer=body_producer,
332 receiver_factory=self.receiver_factory)
322 return query.submit()333 return query.submit()
323334
324 def copy_object(self, source_bucket, source_object_name, dest_bucket=None,335 def copy_object(self, source_bucket, source_object_name, dest_bucket=None,
@@ -344,7 +355,8 @@
344 query = self.query_factory(355 query = self.query_factory(
345 action="PUT", creds=self.creds, endpoint=self.endpoint,356 action="PUT", creds=self.creds, endpoint=self.endpoint,
346 bucket=dest_bucket, object_name=dest_object_name,357 bucket=dest_bucket, object_name=dest_object_name,
347 metadata=metadata, amz_headers=amz_headers)358 metadata=metadata, amz_headers=amz_headers,
359 receiver_factory=self.receiver_factory)
348 return query.submit()360 return query.submit()
349361
350 def get_object(self, bucket, object_name):362 def get_object(self, bucket, object_name):
@@ -353,7 +365,8 @@
353 """365 """
354 query = self.query_factory(366 query = self.query_factory(
355 action="GET", creds=self.creds, endpoint=self.endpoint,367 action="GET", creds=self.creds, endpoint=self.endpoint,
356 bucket=bucket, object_name=object_name)368 bucket=bucket, object_name=object_name,
369 receiver_factory=self.receiver_factory)
357 return query.submit()370 return query.submit()
358371
359 def head_object(self, bucket, object_name):372 def head_object(self, bucket, object_name):
@@ -384,7 +397,8 @@
384 data = access_control_policy.to_xml()397 data = access_control_policy.to_xml()
385 query = self.query_factory(398 query = self.query_factory(
386 action='PUT', creds=self.creds, endpoint=self.endpoint,399 action='PUT', creds=self.creds, endpoint=self.endpoint,
387 bucket=bucket, object_name='%s?acl' % object_name, data=data)400 bucket=bucket, object_name='%s?acl' % object_name, data=data,
401 receiver_factory=self.receiver_factory)
388 return query.submit().addCallback(self._parse_acl)402 return query.submit().addCallback(self._parse_acl)
389403
390 def get_object_acl(self, bucket, object_name):404 def get_object_acl(self, bucket, object_name):
@@ -393,7 +407,8 @@
393 """407 """
394 query = self.query_factory(408 query = self.query_factory(
395 action='GET', creds=self.creds, endpoint=self.endpoint,409 action='GET', creds=self.creds, endpoint=self.endpoint,
396 bucket=bucket, object_name='%s?acl' % object_name)410 bucket=bucket, object_name='%s?acl' % object_name,
411 receiver_factory=self.receiver_factory)
397 return query.submit().addCallback(self._parse_acl)412 return query.submit().addCallback(self._parse_acl)
398413
399 def put_request_payment(self, bucket, payer):414 def put_request_payment(self, bucket, payer):
@@ -407,7 +422,8 @@
407 data = RequestPayment(payer).to_xml()422 data = RequestPayment(payer).to_xml()
408 query = self.query_factory(423 query = self.query_factory(
409 action="PUT", creds=self.creds, endpoint=self.endpoint,424 action="PUT", creds=self.creds, endpoint=self.endpoint,
410 bucket=bucket, object_name="?requestPayment", data=data)425 bucket=bucket, object_name="?requestPayment", data=data,
426 receiver_factory=self.receiver_factory)
411 return query.submit()427 return query.submit()
412428
413 def get_request_payment(self, bucket):429 def get_request_payment(self, bucket):
@@ -419,7 +435,8 @@
419 """435 """
420 query = self.query_factory(436 query = self.query_factory(
421 action="GET", creds=self.creds, endpoint=self.endpoint,437 action="GET", creds=self.creds, endpoint=self.endpoint,
422 bucket=bucket, object_name="?requestPayment")438 bucket=bucket, object_name="?requestPayment",
439 receiver_factory=self.receiver_factory)
423 return query.submit().addCallback(self._parse_get_request_payment)440 return query.submit().addCallback(self._parse_get_request_payment)
424441
425 def _parse_get_request_payment(self, xml_bytes):442 def _parse_get_request_payment(self, xml_bytes):
@@ -429,17 +446,53 @@
429 """446 """
430 return RequestPayment.from_xml(xml_bytes).payer447 return RequestPayment.from_xml(xml_bytes).payer
431448
449 def init_multipart_upload(self, bucket, object_name, content_type=None,
450 metadata={}):
451 """
452 Initiate a multipart upload to a bucket.
453
454 @param bucket: The name of the bucket
455 @param object_name: The object name
456 @param content_type: The Content-Type for the object
457 @param metadata: C{dict} containing additional metadata
458 @return: C{str} upload_id
459 """
460 objectname_plus = '%s?uploads' % object_name
461 query = self.query_factory(
462 action="POST", creds=self.creds, endpoint=self.endpoint,
463 bucket=bucket, object_name=objectname_plus, data='',
464 content_type=content_type, metadata=metadata)
465 d = query.submit()
466 return d.addCallback(MultipartInitiationResponse.from_xml)
467
468 def upload_part(self, bucket, object_name, upload_id, part_number, data=None,
469 content_type=None, metadata={}, body_producer=None):
470 """
471 Upload a part of data correcsponding to a multipart upload.
472
473 @return: the C{Deferred} from underlying query.submit() call
474 """
475 parms = 'partNumber=%s&uploadId=%s' % (str(part_number), upload_id)
476 objectname_plus = '%s?%s' % (object_name, parms)
477 query = self.query_factory(
478 action="PUT", creds=self.creds, endpoint=self.endpoint,
479 bucket=bucket, object_name=objectname_plus, data=data,
480 content_type=content_type, metadata=metadata, body_producer=body_producer,
481 receiver_factory=self.receiver_factory)
482 d = query.submit()
483 return d.addCallback(query.get_response_headers)
432484
433class Query(BaseQuery):485class Query(BaseQuery):
434 """A query for submission to the S3 service."""486 """A query for submission to the S3 service."""
435487
436 def __init__(self, bucket=None, object_name=None, data="",488 def __init__(self, bucket=None, object_name=None, data="",
437 content_type=None, metadata={}, amz_headers={}, *args,489 content_type=None, metadata={}, amz_headers={},
438 **kwargs):490 body_producer=None, *args, **kwargs):
439 super(Query, self).__init__(*args, **kwargs)491 super(Query, self).__init__(*args, **kwargs)
440 self.bucket = bucket492 self.bucket = bucket
441 self.object_name = object_name493 self.object_name = object_name
442 self.data = data494 self.data = data
495 self.body_producer = body_producer
443 self.content_type = content_type496 self.content_type = content_type
444 self.metadata = metadata497 self.metadata = metadata
445 self.amz_headers = amz_headers498 self.amz_headers = amz_headers
@@ -463,9 +516,14 @@
463 """516 """
464 Build the list of headers needed in order to perform S3 operations.517 Build the list of headers needed in order to perform S3 operations.
465 """518 """
466 headers = {"Content-Length": len(self.data),519 if self.body_producer:
467 "Content-MD5": calculate_md5(self.data),520 content_length = self.body_producer.length
521 else:
522 content_length = len(self.data)
523 headers = {"Content-Length": content_length,
468 "Date": self.date}524 "Date": self.date}
525 if self.body_producer is None:
526 headers["Content-MD5"] = calculate_md5(self.data)
469 for key, value in self.metadata.iteritems():527 for key, value in self.metadata.iteritems():
470 headers["x-amz-meta-" + key] = value528 headers["x-amz-meta-" + key] = value
471 for key, value in self.amz_headers.iteritems():529 for key, value in self.amz_headers.iteritems():
@@ -529,5 +587,6 @@
529 self.endpoint, self.bucket, self.object_name)587 self.endpoint, self.bucket, self.object_name)
530 d = self.get_page(588 d = self.get_page(
531 url_context.get_url(), method=self.action, postdata=self.data,589 url_context.get_url(), method=self.action, postdata=self.data,
532 headers=self.get_headers())590 headers=self.get_headers(), body_producer=self.body_producer,
591 receiver_factory=self.receiver_factory)
533 return d.addErrback(s3_error_wrapper)592 return d.addErrback(s3_error_wrapper)
534593
=== modified file 'txaws/s3/model.py'
--- txaws/s3/model.py 2012-01-28 00:42:38 +0000
+++ txaws/s3/model.py 2012-02-10 21:24:20 +0000
@@ -150,3 +150,32 @@
150 """150 """
151 root = XML(xml_bytes)151 root = XML(xml_bytes)
152 return cls(root.findtext("Payer"))152 return cls(root.findtext("Payer"))
153
154
155class MultipartInitiationResponse(object):
156 """
157 A response to Initiate Multipart Upload
158 """
159
160 def __init__(self, bucket, object_name, upload_id):
161 """
162 @param bucket: The bucket name
163 @param object_name: The object name
164 @param upload_id: The upload id
165 """
166 self.bucket = bucket
167 self.object_name = object_name
168 self.upload_id = upload_id
169
170 @classmethod
171 def from_xml(cls, xml_bytes):
172 """
173 Create an instance of this from XML bytes.
174
175 @param xml_bytes: C{str} bytes of XML to parse
176 @return: and instance of L{MultipartInitiationResponse}
177 """
178 root = XML(xml_bytes)
179 return cls(root.findtext('Bucket'),
180 root.findtext('Key'),
181 root.findtext('UploadId'))
153182
=== modified file 'txaws/s3/tests/test_client.py'
--- txaws/s3/tests/test_client.py 2012-01-28 00:44:53 +0000
+++ txaws/s3/tests/test_client.py 2012-02-10 21:24:20 +0000
@@ -9,7 +9,8 @@
9else:9else:
10 s3clientSkip = None10 s3clientSkip = None
11from txaws.s3.acls import AccessControlPolicy11from txaws.s3.acls import AccessControlPolicy
12from txaws.s3.model import RequestPayment12from txaws.s3.model import RequestPayment, MultipartInitiationResponse
13from txaws.testing.producers import StringBodyProducer
13from txaws.service import AWSServiceEndpoint14from txaws.service import AWSServiceEndpoint
14from txaws.testing import payload15from txaws.testing import payload
15from txaws.testing.base import TXAWSTestCase16from txaws.testing.base import TXAWSTestCase
@@ -100,7 +101,8 @@
100101
101 class StubQuery(client.Query):102 class StubQuery(client.Query):
102103
103 def __init__(query, action, creds, endpoint):104 def __init__(query, action, creds, endpoint,
105 body_producer=None, receiver_factory=None):
104 super(StubQuery, query).__init__(106 super(StubQuery, query).__init__(
105 action=action, creds=creds)107 action=action, creds=creds)
106 self.assertEquals(action, "GET")108 self.assertEquals(action, "GET")
@@ -134,7 +136,8 @@
134136
135 class StubQuery(client.Query):137 class StubQuery(client.Query):
136138
137 def __init__(query, action, creds, endpoint, bucket=None):139 def __init__(query, action, creds, endpoint, bucket=None,
140 body_producer=None, receiver_factory=None):
138 super(StubQuery, query).__init__(141 super(StubQuery, query).__init__(
139 action=action, creds=creds, bucket=bucket)142 action=action, creds=creds, bucket=bucket)
140 self.assertEquals(action, "PUT")143 self.assertEquals(action, "PUT")
@@ -156,7 +159,8 @@
156159
157 class StubQuery(client.Query):160 class StubQuery(client.Query):
158161
159 def __init__(query, action, creds, endpoint, bucket=None):162 def __init__(query, action, creds, endpoint, bucket=None,
163 body_producer=None, receiver_factory=None):
160 super(StubQuery, query).__init__(164 super(StubQuery, query).__init__(
161 action=action, creds=creds, bucket=bucket)165 action=action, creds=creds, bucket=bucket)
162 self.assertEquals(action, "GET")166 self.assertEquals(action, "GET")
@@ -208,7 +212,8 @@
208 class StubQuery(client.Query):212 class StubQuery(client.Query):
209213
210 def __init__(query, action, creds, endpoint, bucket=None,214 def __init__(query, action, creds, endpoint, bucket=None,
211 object_name=None):215 object_name=None, body_producer=None,
216 receiver_factory=None):
212 super(StubQuery, query).__init__(action=action, creds=creds,217 super(StubQuery, query).__init__(action=action, creds=creds,
213 bucket=bucket,218 bucket=bucket,
214 object_name=object_name)219 object_name=object_name)
@@ -243,7 +248,8 @@
243 class StubQuery(client.Query):248 class StubQuery(client.Query):
244249
245 def __init__(query, action, creds, endpoint, bucket=None,250 def __init__(query, action, creds, endpoint, bucket=None,
246 object_name=None):251 object_name=None, body_producer=None,
252 receiver_factory=None):
247 super(StubQuery, query).__init__(action=action, creds=creds,253 super(StubQuery, query).__init__(action=action, creds=creds,
248 bucket=bucket,254 bucket=bucket,
249 object_name=object_name)255 object_name=object_name)
@@ -284,7 +290,8 @@
284 class StubQuery(client.Query):290 class StubQuery(client.Query):
285291
286 def __init__(query, action, creds, endpoint, bucket=None,292 def __init__(query, action, creds, endpoint, bucket=None,
287 object_name=None):293 object_name=None, body_producer=None,
294 receiver_factory=None):
288 super(StubQuery, query).__init__(action=action, creds=creds,295 super(StubQuery, query).__init__(action=action, creds=creds,
289 bucket=bucket,296 bucket=bucket,
290 object_name=object_name)297 object_name=object_name)
@@ -323,7 +330,8 @@
323 class StubQuery(client.Query):330 class StubQuery(client.Query):
324331
325 def __init__(query, action, creds, endpoint, bucket=None,332 def __init__(query, action, creds, endpoint, bucket=None,
326 object_name=None):333 object_name=None, body_producer=None,
334 receiver_factory=None):
327 super(StubQuery, query).__init__(action=action, creds=creds,335 super(StubQuery, query).__init__(action=action, creds=creds,
328 bucket=bucket,336 bucket=bucket,
329 object_name=object_name)337 object_name=object_name)
@@ -360,7 +368,8 @@
360 class StubQuery(client.Query):368 class StubQuery(client.Query):
361369
362 def __init__(query, action, creds, endpoint, bucket=None,370 def __init__(query, action, creds, endpoint, bucket=None,
363 object_name=None):371 object_name=None, body_producer=None,
372 receiver_factory=None):
364 super(StubQuery, query).__init__(action=action, creds=creds,373 super(StubQuery, query).__init__(action=action, creds=creds,
365 bucket=bucket,374 bucket=bucket,
366 object_name=object_name)375 object_name=object_name)
@@ -396,7 +405,8 @@
396 class StubQuery(client.Query):405 class StubQuery(client.Query):
397406
398 def __init__(query, action, creds, endpoint, bucket=None,407 def __init__(query, action, creds, endpoint, bucket=None,
399 object_name=None):408 object_name=None, body_producer=None,
409 receiver_factory=None):
400 super(StubQuery, query).__init__(action=action, creds=creds,410 super(StubQuery, query).__init__(action=action, creds=creds,
401 bucket=bucket,411 bucket=bucket,
402 object_name=object_name)412 object_name=object_name)
@@ -433,7 +443,8 @@
433 class StubQuery(client.Query):443 class StubQuery(client.Query):
434444
435 def __init__(query, action, creds, endpoint, bucket=None,445 def __init__(query, action, creds, endpoint, bucket=None,
436 object_name=None):446 object_name=None, body_producer=None,
447 receiver_factory=None):
437 super(StubQuery, query).__init__(action=action, creds=creds,448 super(StubQuery, query).__init__(action=action, creds=creds,
438 bucket=bucket,449 bucket=bucket,
439 object_name=object_name)450 object_name=object_name)
@@ -473,7 +484,8 @@
473 class StubQuery(client.Query):484 class StubQuery(client.Query):
474485
475 def __init__(query, action, creds, endpoint, bucket=None,486 def __init__(query, action, creds, endpoint, bucket=None,
476 object_name=None):487 object_name=None, body_producer=None,
488 receiver_factory=None):
477 super(StubQuery, query).__init__(action=action, creds=creds,489 super(StubQuery, query).__init__(action=action, creds=creds,
478 bucket=bucket,490 bucket=bucket,
479 object_name=object_name)491 object_name=object_name)
@@ -509,7 +521,8 @@
509 class StubQuery(client.Query):521 class StubQuery(client.Query):
510522
511 def __init__(query, action, creds, endpoint, bucket=None,523 def __init__(query, action, creds, endpoint, bucket=None,
512 object_name=None):524 object_name=None, body_producer=None,
525 receiver_factory=None):
513 super(StubQuery, query).__init__(action=action, creds=creds,526 super(StubQuery, query).__init__(action=action, creds=creds,
514 bucket=bucket,527 bucket=bucket,
515 object_name=object_name)528 object_name=object_name)
@@ -546,7 +559,8 @@
546 class StubQuery(client.Query):559 class StubQuery(client.Query):
547560
548 def __init__(query, action, creds, endpoint, bucket=None,561 def __init__(query, action, creds, endpoint, bucket=None,
549 object_name=None):562 object_name=None, body_producer=None,
563 receiver_factory=None):
550 super(StubQuery, query).__init__(action=action, creds=creds,564 super(StubQuery, query).__init__(action=action, creds=creds,
551 bucket=bucket,565 bucket=bucket,
552 object_name=object_name)566 object_name=object_name)
@@ -576,7 +590,8 @@
576590
577 class StubQuery(client.Query):591 class StubQuery(client.Query):
578592
579 def __init__(query, action, creds, endpoint, bucket=None):593 def __init__(query, action, creds, endpoint, bucket=None,
594 body_producer=None, receiver_factory=None):
580 super(StubQuery, query).__init__(595 super(StubQuery, query).__init__(
581 action=action, creds=creds, bucket=bucket)596 action=action, creds=creds, bucket=bucket)
582 self.assertEquals(action, "DELETE")597 self.assertEquals(action, "DELETE")
@@ -599,7 +614,8 @@
599 class StubQuery(client.Query):614 class StubQuery(client.Query):
600615
601 def __init__(query, action, creds, endpoint, bucket=None,616 def __init__(query, action, creds, endpoint, bucket=None,
602 object_name=None, data=""):617 object_name=None, data="", body_producer=None,
618 receiver_factory=None):
603 super(StubQuery, query).__init__(action=action, creds=creds,619 super(StubQuery, query).__init__(action=action, creds=creds,
604 bucket=bucket,620 bucket=bucket,
605 object_name=object_name,621 object_name=object_name,
@@ -630,7 +646,8 @@
630 class StubQuery(client.Query):646 class StubQuery(client.Query):
631647
632 def __init__(query, action, creds, endpoint, bucket=None,648 def __init__(query, action, creds, endpoint, bucket=None,
633 object_name=None, data=""):649 object_name=None, data="", receiver_factory=None,
650 body_producer=None):
634 super(StubQuery, query).__init__(action=action, creds=creds,651 super(StubQuery, query).__init__(action=action, creds=creds,
635 bucket=bucket,652 bucket=bucket,
636 object_name=object_name,653 object_name=object_name,
@@ -665,7 +682,7 @@
665682
666 def __init__(query, action, creds, endpoint, bucket=None,683 def __init__(query, action, creds, endpoint, bucket=None,
667 object_name=None, data=None, content_type=None,684 object_name=None, data=None, content_type=None,
668 metadata=None):685 metadata=None, body_producer=None, receiver_factory=None):
669 super(StubQuery, query).__init__(686 super(StubQuery, query).__init__(
670 action=action, creds=creds, bucket=bucket,687 action=action, creds=creds, bucket=bucket,
671 object_name=object_name, data=data,688 object_name=object_name, data=data,
@@ -701,7 +718,7 @@
701718
702 def __init__(query, action, creds, endpoint, bucket=None,719 def __init__(query, action, creds, endpoint, bucket=None,
703 object_name=None, data=None, content_type=None,720 object_name=None, data=None, content_type=None,
704 metadata=None):721 metadata=None, body_producer=None, receiver_factory=None):
705 super(StubQuery, query).__init__(722 super(StubQuery, query).__init__(
706 action=action, creds=creds, bucket=bucket,723 action=action, creds=creds, bucket=bucket,
707 object_name=object_name, data=data,724 object_name=object_name, data=data,
@@ -730,7 +747,8 @@
730747
731 def __init__(query, action, creds, endpoint, bucket=None,748 def __init__(query, action, creds, endpoint, bucket=None,
732 object_name=None, data=None, content_type=None,749 object_name=None, data=None, content_type=None,
733 metadata=None, amz_headers=None):750 metadata=None, amz_headers=None, body_producer=None,
751 receiver_factory=None):
734 super(StubQuery, query).__init__(752 super(StubQuery, query).__init__(
735 action=action, creds=creds, bucket=bucket,753 action=action, creds=creds, bucket=bucket,
736 object_name=object_name, data=data,754 object_name=object_name, data=data,
@@ -756,6 +774,42 @@
756 metadata={"key": "some meta data"},774 metadata={"key": "some meta data"},
757 amz_headers={"acl": "public-read"})775 amz_headers={"acl": "public-read"})
758776
777 def test_put_object_with_custom_body_producer(self):
778
779 class StubQuery(client.Query):
780
781 def __init__(query, action, creds, endpoint, bucket=None,
782 object_name=None, data=None, content_type=None,
783 metadata=None, amz_headers=None, body_producer=None,
784 receiver_factory=None):
785 super(StubQuery, query).__init__(
786 action=action, creds=creds, bucket=bucket,
787 object_name=object_name, data=data,
788 content_type=content_type, metadata=metadata,
789 amz_headers=amz_headers, body_producer=body_producer)
790 self.assertEqual(action, "PUT")
791 self.assertEqual(creds.access_key, "foo")
792 self.assertEqual(creds.secret_key, "bar")
793 self.assertEqual(query.bucket, "mybucket")
794 self.assertEqual(query.object_name, "objectname")
795 self.assertEqual(query.content_type, "text/plain")
796 self.assertEqual(query.metadata, {"key": "some meta data"})
797 self.assertEqual(query.amz_headers, {"acl": "public-read"})
798 self.assertIdentical(body_producer, string_producer)
799
800 def submit(query):
801 return succeed(None)
802
803
804 string_producer = StringBodyProducer("some data")
805 creds = AWSCredentials("foo", "bar")
806 s3 = client.S3Client(creds, query_factory=StubQuery)
807 return s3.put_object("mybucket", "objectname",
808 content_type="text/plain",
809 metadata={"key": "some meta data"},
810 amz_headers={"acl": "public-read"},
811 body_producer=string_producer)
812
759 def test_copy_object(self):813 def test_copy_object(self):
760 """814 """
761 L{S3Client.copy_object} creates a L{Query} to copy an object from one815 L{S3Client.copy_object} creates a L{Query} to copy an object from one
@@ -766,7 +820,8 @@
766820
767 def __init__(query, action, creds, endpoint, bucket=None,821 def __init__(query, action, creds, endpoint, bucket=None,
768 object_name=None, data=None, content_type=None,822 object_name=None, data=None, content_type=None,
769 metadata=None, amz_headers=None):823 metadata=None, amz_headers=None, body_producer=None,
824 receiver_factory=None):
770 super(StubQuery, query).__init__(825 super(StubQuery, query).__init__(
771 action=action, creds=creds, bucket=bucket,826 action=action, creds=creds, bucket=bucket,
772 object_name=object_name, data=data,827 object_name=object_name, data=data,
@@ -798,7 +853,8 @@
798853
799 def __init__(query, action, creds, endpoint, bucket=None,854 def __init__(query, action, creds, endpoint, bucket=None,
800 object_name=None, data=None, content_type=None,855 object_name=None, data=None, content_type=None,
801 metadata=None, amz_headers=None):856 metadata=None, amz_headers=None, body_producer=None,
857 receiver_factory=None):
802 super(StubQuery, query).__init__(858 super(StubQuery, query).__init__(
803 action=action, creds=creds, bucket=bucket,859 action=action, creds=creds, bucket=bucket,
804 object_name=object_name, data=data,860 object_name=object_name, data=data,
@@ -822,7 +878,7 @@
822878
823 def __init__(query, action, creds, endpoint, bucket=None,879 def __init__(query, action, creds, endpoint, bucket=None,
824 object_name=None, data=None, content_type=None,880 object_name=None, data=None, content_type=None,
825 metadata=None):881 metadata=None, body_producer=None, receiver_factory=None):
826 super(StubQuery, query).__init__(882 super(StubQuery, query).__init__(
827 action=action, creds=creds, bucket=bucket,883 action=action, creds=creds, bucket=bucket,
828 object_name=object_name, data=data,884 object_name=object_name, data=data,
@@ -846,7 +902,7 @@
846902
847 def __init__(query, action, creds, endpoint, bucket=None,903 def __init__(query, action, creds, endpoint, bucket=None,
848 object_name=None, data=None, content_type=None,904 object_name=None, data=None, content_type=None,
849 metadata=None):905 metadata=None, body_producer=None, receiver_factory=None):
850 super(StubQuery, query).__init__(906 super(StubQuery, query).__init__(
851 action=action, creds=creds, bucket=bucket,907 action=action, creds=creds, bucket=bucket,
852 object_name=object_name, data=data,908 object_name=object_name, data=data,
@@ -869,7 +925,8 @@
869 class StubQuery(client.Query):925 class StubQuery(client.Query):
870926
871 def __init__(query, action, creds, endpoint, bucket=None,927 def __init__(query, action, creds, endpoint, bucket=None,
872 object_name=None, data=""):928 object_name=None, data="", body_producer=None,
929 receiver_factory=None):
873 super(StubQuery, query).__init__(action=action, creds=creds,930 super(StubQuery, query).__init__(action=action, creds=creds,
874 bucket=bucket,931 bucket=bucket,
875 object_name=object_name,932 object_name=object_name,
@@ -902,7 +959,8 @@
902 class StubQuery(client.Query):959 class StubQuery(client.Query):
903960
904 def __init__(query, action, creds, endpoint, bucket=None,961 def __init__(query, action, creds, endpoint, bucket=None,
905 object_name=None, data=""):962 object_name=None, data="", body_producer=None,
963 receiver_factory=None):
906 super(StubQuery, query).__init__(action=action, creds=creds,964 super(StubQuery, query).__init__(action=action, creds=creds,
907 bucket=bucket,965 bucket=bucket,
908 object_name=object_name,966 object_name=object_name,
@@ -926,6 +984,68 @@
926 deferred = s3.get_object_acl("mybucket", "myobject")984 deferred = s3.get_object_acl("mybucket", "myobject")
927 return deferred.addCallback(check_result)985 return deferred.addCallback(check_result)
928986
987 def test_init_multipart_upload(self):
988
989 class StubQuery(client.Query):
990
991 def __init__(query, action, creds, endpoint, bucket=None,
992 object_name=None, data="", body_producer=None,
993 content_type=None, receiver_factory=None, metadata={}):
994 super(StubQuery, query).__init__(action=action, creds=creds,
995 bucket=bucket,
996 object_name=object_name,
997 data=data)
998 self.assertEquals(action, "POST")
999 self.assertEqual(creds.access_key, "foo")
1000 self.assertEqual(creds.secret_key, "bar")
1001 self.assertEqual(query.bucket, "example-bucket")
1002 self.assertEqual(query.object_name, "example-object?uploads")
1003 self.assertEqual(query.data, "")
1004 self.assertEqual(query.metadata, {})
1005
1006 def submit(query, url_context=None):
1007 return succeed(payload.sample_s3_init_multipart_upload_result)
1008
1009
1010 def check_result(result):
1011 self.assert_(isinstance(result, MultipartInitiationResponse))
1012 self.assertEqual(result.bucket, "example-bucket")
1013 self.assertEqual(result.object_name, "example-object")
1014 self.assertEqual(result.upload_id, "deadbeef")
1015
1016 creds = AWSCredentials("foo", "bar")
1017 s3 = client.S3Client(creds, query_factory=StubQuery)
1018 deferred = s3.init_multipart_upload("example-bucket", "example-object")
1019 return deferred.addCallback(check_result)
1020
1021 def test_upload_part(self):
1022
1023 class StubQuery(client.Query):
1024
1025 def __init__(query, action, creds, endpoint, bucket=None,
1026 object_name=None, data="", body_producer=None,
1027 content_type=None, receiver_factory=None, metadata={}):
1028 super(StubQuery, query).__init__(action=action, creds=creds,
1029 bucket=bucket,
1030 object_name=object_name,
1031 data=data)
1032 self.assertEquals(action, "PUT")
1033 self.assertEqual(creds.access_key, "foo")
1034 self.assertEqual(creds.secret_key, "bar")
1035 self.assertEqual(query.bucket, "example-bucket")
1036 self.assertEqual(query.object_name,
1037 "example-object?partNumber=3&uploadId=testid")
1038 self.assertEqual(query.data, "some data")
1039 self.assertEqual(query.metadata, {})
1040
1041 def submit(query, url_context=None):
1042 return succeed(None)
1043
1044 creds = AWSCredentials("foo", "bar")
1045 s3 = client.S3Client(creds, query_factory=StubQuery)
1046 return s3.upload_part("example-bucket", "example-object", "testid", 3,
1047 "some data")
1048
929S3ClientTestCase.skip = s3clientSkip1049S3ClientTestCase.skip = s3clientSkip
9301050
9311051
@@ -1077,7 +1197,8 @@
1077 """1197 """
1078 class StubQuery(client.Query):1198 class StubQuery(client.Query):
10791199
1080 def __init__(query, action, creds, endpoint, bucket):1200 def __init__(query, action, creds, endpoint, bucket,
1201 body_producer=None, receiver_factory=None):
1081 super(StubQuery, query).__init__(1202 super(StubQuery, query).__init__(
1082 action=action, creds=creds, bucket=bucket)1203 action=action, creds=creds, bucket=bucket)
1083 self.assertEquals(action, "GET")1204 self.assertEquals(action, "GET")
10841205
=== modified file 'txaws/testing/payload.py'
--- txaws/testing/payload.py 2012-01-28 00:39:00 +0000
+++ txaws/testing/payload.py 2012-02-10 21:24:20 +0000
@@ -1085,3 +1085,12 @@
1085 <Status>Enabled</Status>1085 <Status>Enabled</Status>
1086 <MfaDelete>Disabled</MfaDelete>1086 <MfaDelete>Disabled</MfaDelete>
1087</VersioningConfiguration>"""1087</VersioningConfiguration>"""
1088
1089sample_s3_init_multipart_upload_result = """\
1090<InitiateMultipartUploadResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
1091 <Bucket>example-bucket</Bucket>
1092 <Key>example-object</Key>
1093 <UploadId>deadbeef</UploadId>
1094</InitiateMultipartUploadResult>"""
1095
1096
10881097
=== added file 'txaws/testing/producers.py'
--- txaws/testing/producers.py 1970-01-01 00:00:00 +0000
+++ txaws/testing/producers.py 2012-02-10 21:24:20 +0000
@@ -0,0 +1,23 @@
1from zope.interface import implements
2
3from twisted.internet.defer import succeed
4from twisted.web.iweb import IBodyProducer
5
6class StringBodyProducer(object):
7 implements(IBodyProducer)
8
9 def __init__(self, data):
10 self.data = data
11 self.length = len(data)
12 self.written = None
13
14 def startProducing(self, consumer):
15 consumer.write(self.data)
16 self.written = self.data
17 return succeed(None)
18
19 def pauseProducing(self):
20 pass
21
22 def stopProducing(self):
23 pass

Subscribers

People subscribed via source and target branches