Merge ~barryprice/spade/+git/spade:black into spade:master

Proposed by Barry Price
Status: Superseded
Proposed branch: ~barryprice/spade/+git/spade:black
Merge into: spade:master
Prerequisite: ~barryprice/spade/+git/spade:flake8
Diff against target: 905 lines (+201/-191)
10 files modified
Makefile (+2/-1)
spade/AzureBucket.py (+61/-63)
spade/BucketObject.py (+2/-3)
spade/GCEBucket.py (+14/-33)
spade/LocalBucket.py (+15/-3)
spade/ObjectIO.py (+5/-8)
spade/S3Bucket.py (+61/-43)
spade/Spade.py (+14/-8)
spade/StdioBucket.py (+1/-1)
spade/SwiftBucket.py (+26/-28)
Reviewer           Review Type   Date Requested   Status
Spade Developers                                  Pending
Review via email: mp+394570@code.launchpad.net

This proposal has been superseded by a proposal from 2020-11-27.

Commit message

Reformatted with black
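For reviewers unfamiliar with black's output: where a wrapped expression fits within the line limit, black collapses it onto one line; where it does not, black explodes it to one element per line with a trailing comma. A minimal before/after sketch of the first style, mirroring the "wanted" list change in AzureBucket.py in the diff below (the snippet is illustrative, not part of this proposal):

    # Before black: manually wrapped list
    wanted = ["private_key",
              "storage_account"]

    # After black: collapsed, since it fits on one line
    wanted = ["private_key", "storage_account"]

Several reformatted lines in this diff run well past black's default 88-character limit, so the project presumably configures a longer line length; that setting is not visible in this diff.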

~barryprice/spade/+git/spade:black updated
c85283a... by Barry Price

Typo

6817bc5... by Barry Price

Merge branch 'master' into flake8

1734965... by Barry Price

Merge branch 'flake8' into black

Unmerged commits

1734965... by Barry Price

Merge branch 'flake8' into black

6817bc5... by Barry Price

Merge branch 'master' into flake8

c85283a... by Barry Price

Typo

a86afa5... by Barry Price

Reformatted with black

aa2d273... by Barry Price

Address all Flake8 errors bar complexity (noqa: C901)

7e34f18... by Barry Price

Modernise, switch to python3 exclusively

Preview Diff

diff --git a/Makefile b/Makefile
index 03544a4..ceb744e 100644
--- a/Makefile
+++ b/Makefile
@@ -2,7 +2,8 @@ version := $(shell python3 -c "from spade import __version__; print(__version__)
 tempfiles_snap := snap/snapcraft.yaml spade*.snap
 
 blacken:
-	@echo "Normalising python layout with black (but not really)."
+	@echo "Normalising python layout with black"
+	@tox -e black
 
 lint: blacken
 	@echo "Running flake8"
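The new blacken target only delegates to tox, so the formatter invocation itself lives in a tox environment that is not part of this diff. A minimal sketch of what such an environment could look like (the env name comes from the Makefile; the deps and command are assumptions):

    # tox.ini (sketch, not part of this diff)
    [testenv:black]
    deps = black
    commands = black spade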
diff --git a/spade/AzureBucket.py b/spade/AzureBucket.py
index 0f7a62d..e04830a 100644
--- a/spade/AzureBucket.py
+++ b/spade/AzureBucket.py
@@ -26,15 +26,14 @@ from spade.ObjectIO import ObjectIO
 
 DEFAULT_BLOCKSIZE = 1024 * 1024 * 10
 DEFAULT_DELIMITER = "/"
 AZURE_SEGMENTSIZE = 1024 * 1024 * 4  # Azure block (segment) limit = 4MB
 MAX_AZURE_OBJECTSIZE = 64 * 1024 * 1024  # Azure block blob limit = 64MB
 USER_META_PREFIX = "x-ms-meta-"
 
 
-class AzureBucket():
+class AzureBucket:
     def __init__(self, creds):
-        wanted = ["private_key",
-                  "storage_account"]
+        wanted = ["private_key", "storage_account"]
         for k in wanted:
             if k not in creds:
                 # Missing creds
@@ -67,9 +66,19 @@ class AzureBucket():
     def _create_azure_auth_signature(self, bucket, headers, query, method):
         # Note: auth_headers are in a specific order
         # see https://msdn.microsoft.com/en-gb/library/azure/dd179428.aspx
-        auth_headers = ("Content-Encoding", "Content-Language", "Content-Length", "Content-MD5",
-                        "Content-Type", "Date", "If-Modified-Since", "If-Match", "If-None-Match",
-                        "If-Unmodified-Since", "Range")
+        auth_headers = (
+            "Content-Encoding",
+            "Content-Language",
+            "Content-Length",
+            "Content-MD5",
+            "Content-Type",
+            "Date",
+            "If-Modified-Since",
+            "If-Match",
+            "If-None-Match",
+            "If-Unmodified-Since",
+            "Range",
+        )
         canonicalized_headers = self._dict_to_str(headers, strip_final=False, startswith="x-ms-")
         query_string = self._dict_to_str(query)
 
@@ -84,9 +93,11 @@ class AzureBucket():
         signature += canonicalized_headers
         signature += canonicalized_resource
 
-        encoded_sig = base64.b64encode(hmac.new(base64.b64decode(self.primaryKey),
-                                                msg=signature.encode("utf-8"),
-                                                digestmod=hashlib.sha256).digest())
+        encoded_sig = base64.b64encode(
+            hmac.new(
+                base64.b64decode(self.primaryKey), msg=signature.encode("utf-8"), digestmod=hashlib.sha256
+            ).digest()
+        )
         return encoded_sig.decode('utf-8')
 
     # Requests documented here: https://msdn.microsoft.com/en-GB/library/azure/dd135733.aspx
@@ -104,10 +115,9 @@ class AzureBucket():
         headers["Authorization"] = "SharedKey {}:{}".format(self.storageAccount, auth)
 
         query_string = self._dict_to_str(query, kv_sep="=", entry_sep="&")
-        response, content = h.request(uri=self.blobUri + bucket + "?" + query_string,
-                                      method=method,
-                                      body=body,
-                                      headers=headers)
+        response, content = h.request(
+            uri=self.blobUri + bucket + "?" + query_string, method=method, body=body, headers=headers
+        )
         return response, content
 
     def listBuckets(self):
@@ -153,9 +163,7 @@ class AzureBucket():
 
     def getBucketMetadata(self, bucketName):
         # Standard metadata
-        response, content = self._send_azure_request(bucketName,
-                                                     query={"restype": "container"},
-                                                     method="HEAD")
+        response, content = self._send_azure_request(bucketName, query={"restype": "container"}, method="HEAD")
         if response.get("status") == "200":
             co = BucketObject(bucketName)
             co.size = int(response.get("content-length", 0))
@@ -169,9 +177,7 @@ class AzureBucket():
 
         # User-defined metadata
         query = {"restype": "container", "comp": "metadata"}
-        response, content = self._send_azure_request(bucketName,
-                                                     query=query,
-                                                     method="HEAD")
+        response, content = self._send_azure_request(bucketName, query=query, method="HEAD")
         if response.get("status") == "200":
             for k, v in response.items():
                 if k.lower().startswith(USER_META_PREFIX):
@@ -187,8 +193,7 @@ class AzureBucket():
 
     def getObjectMetadata(self, bucketName, objectName):
         # Standard metadata
-        response, content = self._send_azure_request(bucketName + "/" + objectName,
-                                                     method="HEAD")
+        response, content = self._send_azure_request(bucketName + "/" + objectName, method="HEAD")
         if response.get("status") == "200":
             co = BucketObject(objectName)
             co.size = int(response.get("content-length", 0))
@@ -201,9 +206,9 @@ class AzureBucket():
             raise BucketError("Azure Failed to find object metadata")
 
         # User-defined metadata
-        response, content = self._send_azure_request(bucketName + "/" + objectName,
-                                                     query={"comp": "metadata"},
-                                                     method="HEAD")
+        response, content = self._send_azure_request(
+            bucketName + "/" + objectName, query={"comp": "metadata"}, method="HEAD"
+        )
         if response.get("status") == "200":
             for k, v in response.items():
                 if k.lower().startswith(USER_META_PREFIX):
@@ -225,9 +230,13 @@ class AzureBucket():
         return metadata.rawMetadata.get("etag", "")
 
     def setObjectMetadata(self, bucketName, objectName, metadata, overwrite=False):
-        systemMetadata = ["x-ms-blob-cache-control", "x-ms-blob-content-type",
-                          "x-ms-blob-content-md5", "x-ms-blob-content-encoding",
-                          "x-ms-blob-content-language"]
+        systemMetadata = [
+            "x-ms-blob-cache-control",
+            "x-ms-blob-content-type",
+            "x-ms-blob-content-md5",
+            "x-ms-blob-content-encoding",
+            "x-ms-blob-content-language",
+        ]
 
         newStandardMetadata = {}
         newUserMetadata = {}
@@ -252,18 +261,16 @@ class AzureBucket():
             newUserMetadata[USER_META_PREFIX + k] = v
 
         # Standard metadata
-        response, content = self._send_azure_request(bucketName + "/" + objectName,
-                                                     query={"comp": "properties"},
-                                                     headers=newStandardMetadata,
-                                                     method="PUT")
+        response, content = self._send_azure_request(
+            bucketName + "/" + objectName, query={"comp": "properties"}, headers=newStandardMetadata, method="PUT"
+        )
         if response.get("status") != "200":
             raise BucketError("Azure Failed to set standard object metadata")
 
         # User metadata
-        response, content = self._send_azure_request(bucketName + "/" + objectName,
-                                                     query={"comp": "metadata"},
-                                                     headers=newUserMetadata,
-                                                     method="PUT")
+        response, content = self._send_azure_request(
+            bucketName + "/" + objectName, query={"comp": "metadata"}, headers=newUserMetadata, method="PUT"
+        )
         if response.get("status") != "200":
             raise BucketError("Azure Failed to set user object metadata")
         return
@@ -272,8 +279,7 @@ class AzureBucket():
         headers = {}
         if rangeStart is not None and rangeEnd is not None:
             headers["Range"] = "bytes={}-{}".format(rangeStart, rangeEnd)
-        response, content = self._send_azure_request(bucketName + "/" + objectName,
-                                                     headers=headers)
+        response, content = self._send_azure_request(bucketName + "/" + objectName, headers=headers)
         if response.get("status") == "200" or response.get("status") == "206":
             return content
         elif response.get("status") == "416":
@@ -293,11 +299,13 @@ class AzureBucket():
             blockId = base64.b64encode("{:06}".format(blockNumber).encode('utf-8')).decode('utf-8')
             headers = {}
             headers["Content-Length"] = str(len(blockContent))
-            response, content = self._send_azure_request(bucketName + "/" + objectName,
-                                                         query={"comp": "block", "blockid": blockId},
-                                                         headers=headers,
-                                                         body=blockContent,
-                                                         method="PUT")
+            response, content = self._send_azure_request(
+                bucketName + "/" + objectName,
+                query={"comp": "block", "blockid": blockId},
+                headers=headers,
+                body=blockContent,
+                method="PUT",
+            )
             if not response.get("status") == "201":
                 raise BucketError("Azure Failed to upload multi-part block")
             blockList.append(blockId)
@@ -312,11 +320,9 @@ class AzureBucket():
         xml += "</BlockList>\n"
         headers = {}
         headers["Content-Length"] = str(len(xml))
-        response, content = self._send_azure_request(bucketName + "/" + objectName,
-                                                     query={"comp": "blocklist"},
-                                                     headers=headers,
-                                                     body=xml,
-                                                     method="PUT")
+        response, content = self._send_azure_request(
+            bucketName + "/" + objectName, query={"comp": "blocklist"}, headers=headers, body=xml, method="PUT"
+        )
         if not response.get("status") == "201":
             raise BucketError("Azure Failed to upload blocklist")
         return
@@ -329,10 +335,9 @@ class AzureBucket():
         headers["Content-Length"] = str(size)
         headers["Content-Type"] = "application/octet-stream"
         headers["x-ms-blob-type"] = "BlockBlob"
-        response, content = self._send_azure_request(bucketName + "/" + objectName,
-                                                     headers=headers,
-                                                     body=src,
-                                                     method="PUT")
+        response, content = self._send_azure_request(
+            bucketName + "/" + objectName, headers=headers, body=src, method="PUT"
+        )
         if response.get("status") == "201":
             return
         else:
@@ -342,8 +347,7 @@ class AzureBucket():
         return ObjectIO(self, bucketName, objectName, blockSize)
 
     def deleteObject(self, bucketName, objectName):
-        response, content = self._send_azure_request(bucketName + "/" + objectName,
-                                                     method="DELETE")
+        response, content = self._send_azure_request(bucketName + "/" + objectName, method="DELETE")
         if response.get("status") == "202":
             return
         else:
@@ -351,9 +355,7 @@ class AzureBucket():
 
     def createBucket(self, bucketName):
         query = {"restype": "container"}
-        response, content = self._send_azure_request(bucketName,
-                                                     query=query,
-                                                     method="PUT")
+        response, content = self._send_azure_request(bucketName, query=query, method="PUT")
         if response.get("status") == "201":
             return
         else:
@@ -362,9 +364,7 @@ class AzureBucket():
     # Note: Azure doesn't care if the container is empty or not
     def deleteBucket(self, bucketName):
         query = {"restype": "container"}
-        response, content = self._send_azure_request(bucketName,
-                                                     query=query,
-                                                     method="DELETE")
+        response, content = self._send_azure_request(bucketName, query=query, method="DELETE")
         if response.get("status") == "202":
             return
         else:
@@ -378,9 +378,7 @@ class AzureBucket():
             return []
 
         query = {"restype": "container", "comp": "acl"}
-        response, content = self._send_azure_request(bucketName,
-                                                     query=query,
-                                                     method="HEAD")
+        response, content = self._send_azure_request(bucketName, query=query, method="HEAD")
         if response.get("status") != "200":
             raise BucketError("Azure Failed to read ACL data")
 
diff --git a/spade/BucketObject.py b/spade/BucketObject.py
index 9f8a258..7d35d24 100644
--- a/spade/BucketObject.py
+++ b/spade/BucketObject.py
@@ -1,5 +1,4 @@
-class BucketObject():
-
+class BucketObject:
     class ObjectType:
         File = 0
         Directory = 1
@@ -18,7 +17,7 @@ class BucketError(Exception):
     pass
 
 
-class BucketAcl():
+class BucketAcl:
     def __init__(self):
         self.acls = {}
 
diff --git a/spade/GCEBucket.py b/spade/GCEBucket.py
index 2029886..56761f1 100644
--- a/spade/GCEBucket.py
+++ b/spade/GCEBucket.py
@@ -23,6 +23,7 @@ import datetime
 import base64
 from apiclient import discovery, errors
 from apiclient.http import MediaIoBaseUpload
+
 try:
     from oauth2client.service_account import ServiceAccountCredentials
 except ImportError:
@@ -34,11 +35,9 @@ DEFAULT_BLOCKSIZE = 1024 * 1024 * 10
 DEFAULT_DELIMITER = "/"
 
 
-class GCEBucket():
+class GCEBucket:
     def __init__(self, creds):
-        wanted = ["client_email",
-                  "private_key",
-                  "project"]
+        wanted = ["client_email", "private_key", "project"]
         for k in wanted:
             if k not in creds:
                 # Missing creds
@@ -47,12 +46,9 @@ class GCEBucket():
         # httplib2.debuglevel = 4
         cloud_storage_auth_scope = 'https://www.googleapis.com/auth/devstorage.full_control'
         try:
-            c = ServiceAccountCredentials.from_json_keyfile_dict(creds,
-                                                                 scopes=cloud_storage_auth_scope)
+            c = ServiceAccountCredentials.from_json_keyfile_dict(creds, scopes=cloud_storage_auth_scope)
         except NameError:
-            c = SignedJwtAssertionCredentials(creds['client_email'],
-                                              creds['private_key'],
-                                              cloud_storage_auth_scope)
+            c = SignedJwtAssertionCredentials(creds['client_email'], creds['private_key'], cloud_storage_auth_scope)
         http = c.authorize(httplib2.Http())
         self.gce = discovery.build('storage', 'v1', http=http)
         self.project = creds["project"]
@@ -79,10 +75,7 @@ class GCEBucket():
     def listBucketContents(self, bucketName, path=None, delimiter=DEFAULT_DELIMITER):
         contents = []
         fields_to_return = 'nextPageToken,prefixes,items'
-        req = self.gce.objects().list(bucket=bucketName,
-                                      prefix=path,
-                                      delimiter=delimiter,
-                                      fields=fields_to_return)
+        req = self.gce.objects().list(bucket=bucketName, prefix=path, delimiter=delimiter, fields=fields_to_return)
         while req:
             try:
                 resp = req.execute()
@@ -106,8 +99,7 @@ class GCEBucket():
         return contents
 
     def getBucketMetadata(self, bucketName):
-        req = self.gce.buckets().get(bucket=bucketName,
-                                     projection="full")
+        req = self.gce.buckets().get(bucket=bucketName, projection="full")
         try:
             resp = req.execute()
         except errors.HttpError:
@@ -126,9 +118,7 @@ class GCEBucket():
         raise BucketError("GCE Setting bucket metadata is not supported in GCE")
 
     def getObjectMetadata(self, bucketName, objectName):
-        req = self.gce.objects().get(bucket=bucketName,
-                                     object=objectName,
-                                     projection="full")
+        req = self.gce.objects().get(bucket=bucketName, object=objectName, projection="full")
         try:
             resp = req.execute()
         except errors.HttpError:
@@ -157,8 +147,7 @@ class GCEBucket():
         return hexChecksum
 
     def setObjectMetadata(self, bucketName, objectName, metadata, overwrite=False):
-        systemMetadata = ["cacheControl", "contentDisposition", "contentEncoding",
-                          "contentLanguage", "contentType"]
+        systemMetadata = ["cacheControl", "contentDisposition", "contentEncoding", "contentLanguage", "contentType"]
         newMetadata = {}
         m = self.getObjectMetadata(bucketName, objectName)
         # Keep existing acls, system metadata and (optionally) user metadata
@@ -177,9 +166,7 @@ class GCEBucket():
                 newMetadata[k] = v
             else:
                 newMetadata["metadata"][k] = v
-        req = self.gce.objects().update(bucket=bucketName,
-                                        object=objectName,
-                                        body=newMetadata)
+        req = self.gce.objects().update(bucket=bucketName, object=objectName, body=newMetadata)
         try:
             req.execute()
         except errors.HttpError:
@@ -190,22 +177,18 @@ class GCEBucket():
         headers = {}
         if rangeStart is not None and rangeEnd is not None:
             headers['range'] = "bytes={}-{}".format(rangeStart, rangeEnd)
-        req = self.gce.objects().get_media(bucket=bucketName,
-                                           object=objectName)
+        req = self.gce.objects().get_media(bucket=bucketName, object=objectName)
         http = req.http
         resp, data = http.request(req.uri, headers=headers)
         if resp.get("status") == "416":
             # Requested range not satisfiable
             data = ""
         elif resp.get("status")[0] != "2":
-            raise(BucketError("GCE Failed to find object"))
+            raise (BucketError("GCE Failed to find object"))
         return data
 
     def putObject(self, bucketName, objectName, src, size, blockSize=DEFAULT_BLOCKSIZE):
-        media = MediaIoBaseUpload(src,
-                                  mimetype="application/octet-stream",
-                                  chunksize=blockSize,
-                                  resumable=True)
+        media = MediaIoBaseUpload(src, mimetype="application/octet-stream", chunksize=blockSize, resumable=True)
         req = self.gce.objects().insert(bucket=bucketName, name=objectName, media_body=media)
         resp = None
         while resp is None:
@@ -243,9 +226,7 @@ class GCEBucket():
         return
 
     aclTypes = ["list", "read", "write"]
-    aclTypeMap = {"read": "READER",
-                  "write": "WRITER",
-                  "list": "READER"}
+    aclTypeMap = {"read": "READER", "write": "WRITER", "list": "READER"}
 
     def translateAcl(self, aclData, aliases):
         """
diff --git a/spade/LocalBucket.py b/spade/LocalBucket.py
index f2293ec..9f3f974 100644
--- a/spade/LocalBucket.py
+++ b/spade/LocalBucket.py
@@ -10,7 +10,7 @@ DEFAULT_BLOCKSIZE = 1024 * 1024 * 10
 DEFAULT_DELIMITER = "/"
 
 
-class LocalBucket():
+class LocalBucket:
     def __init__(self, creds):
         wanted = ["bucket_root"]
         for k in wanted:
@@ -79,8 +79,20 @@ class LocalBucket():
         co.date = self.standardTimestamp(stat.st_mtime)
         co.contentType = "text/plain"
         raw = {}
-        for num, name in enumerate(["st_mode", "st_ino", "st_dev", "st_nlink", "st_uid",
-                                    "st_gid", "st_size", "st_atime", "st_mtime", "st_ctime"]):
+        for num, name in enumerate(
+            [
+                "st_mode",
+                "st_ino",
+                "st_dev",
+                "st_nlink",
+                "st_uid",
+                "st_gid",
+                "st_size",
+                "st_atime",
+                "st_mtime",
+                "st_ctime",
+            ]
+        ):
             raw[name] = stat[num]
         co.rawMetadata = raw
         return co
diff --git a/spade/ObjectIO.py b/spade/ObjectIO.py
index fc2b38f..dfdb804 100644
--- a/spade/ObjectIO.py
+++ b/spade/ObjectIO.py
@@ -2,7 +2,7 @@ import os
 from spade.BucketObject import BucketError
 
 
-class ObjectIO():
+class ObjectIO:
     def __init__(self, parent, bucketName, objectName, blockSize=-1):
         self.parent = parent
         self.bucketName = bucketName
@@ -36,10 +36,7 @@ class ObjectIO():
         if self.closed:
             raise ValueError
 
-        data = self.parent.getObject(self.bucketName,
-                                     self.objectName,
-                                     self.pos,
-                                     min(self.pos + n - 1, self.len))
+        data = self.parent.getObject(self.bucketName, self.objectName, self.pos, min(self.pos + n - 1, self.len))
         self.pos = self.pos + len(data)
         return data
 
@@ -56,19 +53,19 @@ class ObjectIO():
         while self.readlinePos < len(self.readlineBuffer):
             lineEndPos = self.readlineBuffer.find(b"\n", self.readlinePos)
             if lineEndPos >= 0:
-                line = self.readlineBuffer[self.readlinePos:lineEndPos + 1].decode('utf-8')
+                line = self.readlineBuffer[self.readlinePos : lineEndPos + 1].decode('utf-8')
                 self.readlinePos = lineEndPos + 1
                 return line
             else:
                 newData = self.read(self.blockSize)
                 if not newData:
                     return
-                self.readlineBuffer = self.readlineBuffer[self.readlinePos:] + newData
+                self.readlineBuffer = self.readlineBuffer[self.readlinePos :] + newData
                 self.readlinePos = 0
 
         # In case the last line has no newline
         if self.readlinePos < len(self.readlineBuffer):
-            return self.readlineBuffer[self.readlinePos:]
+            return self.readlineBuffer[self.readlinePos :]
 
     def readlines(self):
         while True:
diff --git a/spade/S3Bucket.py b/spade/S3Bucket.py
index 6d3036e..67f38fb 100644
--- a/spade/S3Bucket.py
+++ b/spade/S3Bucket.py
@@ -25,15 +25,13 @@ from spade.ObjectIO import ObjectIO
 DEFAULT_BLOCKSIZE = 1024 * 1024 * 10
 DEFAULT_DELIMITER = "/"
 MAX_S3_OBJECTSIZE = 5 * 1024 * 1024 * 1024  # S3 object PUT limit = 5GB
 S3_SEGMENTSIZE = 1024 * 1024 * 1024  # Use 1GB segments for large object uploads
 USER_META_PREFIX = "x-amz-meta-"
 
 
-class S3Bucket():
+class S3Bucket:
     def __init__(self, creds):
-        wanted = ["aws_access_key_id",
-                  "aws_secret_access_key",
-                  "aws_region"]
+        wanted = ["aws_access_key_id", "aws_secret_access_key", "aws_region"]
         for k in wanted:
             if k not in creds:
                 # Missing creds
@@ -44,8 +42,9 @@ class S3Bucket():
 
     def oldLibraryFixes(self):
         try:
-            self.regions = [{"name": region.name, "endpoint": region.endpoint}
-                            for region in boto.regioninfo.get_regions("s3")]
+            self.regions = [
+                {"name": region.name, "endpoint": region.endpoint} for region in boto.regioninfo.get_regions("s3")
+            ]
         except AttributeError:
             # Old boto library doesn't have this, so we'll use the ones we know about
             self.regions = [
@@ -63,9 +62,22 @@ class S3Bucket():
             {'name': 'us-east-1', 'endpoint': 's3.amazonaws.com'},
         ]
 
-        self.base_fields = set(['content-length', 'content-language', 'content-disposition',
-                                'content-encoding', 'expires', 'content-md5', 'last-modified',
-                                'etag', 'cache-control', 'date', 'content-type', 'x-robots-tag'])
+        self.base_fields = set(
+            [
+                'content-length',
+                'content-language',
+                'content-disposition',
+                'content-encoding',
+                'expires',
+                'content-md5',
+                'last-modified',
+                'etag',
+                'cache-control',
+                'date',
+                'content-type',
+                'x-robots-tag',
+            ]
+        )
 
     def _connect(self):
         self.endpoint = None
@@ -78,11 +90,13 @@ class S3Bucket():
         if not self.endpoint:
             raise NotImplementedError
 
-        self.s3 = boto.s3.connection.S3Connection(self.creds['aws_access_key_id'],
-                                                  self.creds['aws_secret_access_key'],
-                                                  is_secure=True,
-                                                  host=self.endpoint,
-                                                  calling_format=OrdinaryCallingFormat())
+        self.s3 = boto.s3.connection.S3Connection(
+            self.creds['aws_access_key_id'],
+            self.creds['aws_secret_access_key'],
+            is_secure=True,
+            host=self.endpoint,
+            calling_format=OrdinaryCallingFormat(),
+        )
 
     # Boto/S3 gives a 301 redirect response if we try to access a bucket
     # in another region, so make sure we're in the right one
@@ -94,7 +108,7 @@ class S3Bucket():
 
         # Strip off the first section (the current region)
         try:
-            websiteEndpoint = websiteEndpoint[websiteEndpoint.find(".") + 1:]
+            websiteEndpoint = websiteEndpoint[websiteEndpoint.find(".") + 1 :]
         except UnboundLocalError:
             raise BucketError("S3 Failed to find bucket")
 
@@ -176,10 +190,12 @@ class S3Bucket():
         return metadata.rawMetadata.get("etag", "").strip('"')
 
     def setObjectMetadata(self, bucketName, objectName, metadata, overwrite=False):
-        systemMetadata = ["x-amz-server-side-encryption",
-                          "x-amz-storage-class",
-                          "x-amz-website-redirect-location",
-                          "x-amz-server-side-encryption-aws-kms-key-id"]
+        systemMetadata = [
+            "x-amz-server-side-encryption",
+            "x-amz-storage-class",
+            "x-amz-website-redirect-location",
+            "x-amz-server-side-encryption-aws-kms-key-id",
+        ]
         self.confirmRegion(bucketName)
 
         # Metadata to remove (if any)
@@ -300,19 +316,17 @@ class S3Bucket():
             return
 
     aclTypes = ["list", "read", "write", "readacl", "writeacl"]
-    aclTypeMap = {"read": "READ",
-                  "write": "WRITE",
-                  "list": "READ",
-                  "readacl": "READ_ACP",
-                  "writeacl": "WRITE_ACP"}
+    aclTypeMap = {"read": "READ", "write": "WRITE", "list": "READ", "readacl": "READ_ACP", "writeacl": "WRITE_ACP"}
 
     def createAclData(self, user, type):
-        aclData = {"s3_display_name": None,
-                   "s3_id": user,
-                   "s3_uri": None,
-                   "s3_email_address": None,
-                   "s3_type": "CanonicalUser",
-                   "s3_permission": self.aclTypeMap[type]}
+        aclData = {
+            "s3_display_name": None,
+            "s3_id": user,
+            "s3_uri": None,
+            "s3_email_address": None,
+            "s3_type": "CanonicalUser",
+            "s3_permission": self.aclTypeMap[type],
+        }
         return aclData
 
     def translateAcl(self, aclData, aliases):
@@ -343,12 +357,14 @@ class S3Bucket():
 
         acls = BucketAcl()
         for grant in acl.acl.grants:
-            aclData = {"s3_display_name": grant.display_name,
-                       "s3_id": grant.id,
-                       "s3_uri": grant.uri,
-                       "s3_email_address": grant.email_address,
-                       "s3_type": grant.type,
-                       "s3_permission": grant.permission}
+            aclData = {
+                "s3_display_name": grant.display_name,
+                "s3_id": grant.id,
+                "s3_uri": grant.uri,
+                "s3_email_address": grant.email_address,
+                "s3_type": grant.type,
+                "s3_permission": grant.permission,
+            }
             if grant.permission in ["READ", "FULL_CONTROL"]:
                 if objectName:
                     acls.add("read", aclData)
@@ -392,12 +408,14 @@ class S3Bucket():
             uniqueAcls.append(user)
 
         for user in uniqueAcls:
-            grant = boto.s3.acl.Grant(permission=user["s3_permission"],
-                                      type=user["s3_type"],
-                                      id=user["s3_id"],
-                                      display_name=user["s3_display_name"],
-                                      uri=user["s3_uri"],
-                                      email_address=["s3_email_address"])
+            grant = boto.s3.acl.Grant(
+                permission=user["s3_permission"],
+                type=user["s3_type"],
+                id=user["s3_id"],
+                display_name=user["s3_display_name"],
+                uri=user["s3_uri"],
+                email_address=["s3_email_address"],
+            )
             s3acl.acl.add_grant(grant)
 
         try:
diff --git a/spade/Spade.py b/spade/Spade.py
index b8f4b12..5ebdfc0 100644
--- a/spade/Spade.py
+++ b/spade/Spade.py
@@ -6,21 +6,25 @@ import re
 import json
 import fnmatch
 from spade.BucketObject import BucketObject, BucketError
+
 try:
     from urllib.parse import urlparse
 except ImportError:
     from urlparse import urlparse
 
 import gzip
+
 compressionMethods = ["gzip"]
 if sys.version_info.major >= 3 and sys.version_info.minor >= 3:
     import bz2
+
     compressionMethods.append("bzip2")
     import lzma
+
     compressionMethods.append("xz")
 else:
     # Define a fake lzma.LZMAError
-    class lzma():
+    class lzma:
         class LZMAError(Exception):
             pass
 
@@ -29,7 +33,7 @@ DEFAULT_BLOCKSIZE = 1024 * 1024 * 10
 DEFAULT_DELIMITER = "/"
 
 
-class Result():
+class Result:
     def __init__(self):
         self.output = []
         self.error = None
@@ -38,7 +42,7 @@ class Result():
         return "<Result object. output: '{}' error: '{}'>".format(self.output, self.error)
 
 
-class Bucket():
+class Bucket:
     def __init__(self, configFile):
         self.configFile = configFile
         self.config = json.loads(open(configFile, 'r').read())
@@ -102,21 +106,27 @@ class Bucket():
 
         if bucketType == "swift":
             from spade.SwiftBucket import SwiftBucket
+
             return SwiftBucket(self.creds[scheme])
         elif bucketType == "s3":
             from spade.S3Bucket import S3Bucket
+
             return S3Bucket(self.creds[scheme])
         elif bucketType == "gce":
             from spade.GCEBucket import GCEBucket
+
             return GCEBucket(self.creds[scheme])
         elif bucketType == "azure":
             from spade.AzureBucket import AzureBucket
+
             return AzureBucket(self.creds[scheme])
         elif bucketType == "local":
             from spade.LocalBucket import LocalBucket
+
             return LocalBucket(self.creds[scheme])
        elif bucketType == "stdio":
             from spade.StdioBucket import StdioBucket
+
             return StdioBucket(self.creds[scheme])
         else:
             raise NotImplementedError
@@ -449,11 +459,7 @@ class Bucket():
             ret.error = e
             yield ret
         try:
-            dstBucket.putObject(dstUrl.netloc,
-                                dstObjName,
-                                fSrc,
-                                objMetadata.size,
-                                blockSize=args.blockSize)
+            dstBucket.putObject(dstUrl.netloc, dstObjName, fSrc, objMetadata.size, blockSize=args.blockSize)
         except BucketError as e:
             ret.error = e
             fSrc.close()
diff --git a/spade/StdioBucket.py b/spade/StdioBucket.py
index ef6cb43..31ba1f6 100644
--- a/spade/StdioBucket.py
+++ b/spade/StdioBucket.py
@@ -8,7 +8,7 @@ from spade.ObjectIO import ObjectIO
 DEFAULT_BLOCKSIZE = 1024 * 1024 * 10
 
 
-class StdioBucket():
+class StdioBucket:
     def __init__(self, creds):
         return
 
diff --git a/spade/SwiftBucket.py b/spade/SwiftBucket.py
index 2125b85..0c3adb0 100644
--- a/spade/SwiftBucket.py
+++ b/spade/SwiftBucket.py
@@ -23,19 +23,15 @@ from spade.ObjectIO import ObjectIO
 
 DEFAULT_BLOCKSIZE = 1024 * 1024 * 10
 DEFAULT_DELIMITER = "/"
 MAX_SWIFT_OBJECTSIZE = 5 * 1024 * 1024 * 1024  # Swift object limit = 5GB
 SWIFT_SEGMENTSIZE = 1024 * 1024 * 1024  # Use 1GB segments for large object uploads
 USER_META_PREFIX = "x-object-meta-"
 ACL_META_PREFIX = "x-container-"
 
 
-class SwiftBucket():
+class SwiftBucket:
     def __init__(self, creds):
-        wanted = ["os_username",
-                  "os_password",
-                  "os_tenant_name",
-                  "os_auth_url",
-                  "os_region_name"]
+        wanted = ["os_username", "os_password", "os_tenant_name", "os_auth_url", "os_region_name"]
         for k in wanted:
             if k not in creds:
                 # Missing creds
@@ -51,7 +47,8 @@ class SwiftBucket():
             key=creds["os_password"],
             retries=5,
             auth_version="2.0",
-            os_options=options)
+            os_options=options,
+        )
 
     def standardTimestamp(self, timestamp):
         try:
@@ -109,8 +106,7 @@ class SwiftBucket():
         return co
 
     def setBucketMetadata(self, bucketName, metadata, overwrite=False):  # noqa: C901
-        systemMetadata = ["content-type", "x-container-bytes-used",
-                          "x-container-object-count:", "x-timestamp"]
+        systemMetadata = ["content-type", "x-container-bytes-used", "x-container-object-count:", "x-timestamp"]
         newMetadata = {}
         # Keep existing system metadata and (optionally) user metadata
         m = self.getBucketMetadata(bucketName)
@@ -163,8 +159,14 @@ class SwiftBucket():
         return metadata.rawMetadata.get("etag", "").strip('"')
 
     def setObjectMetadata(self, bucketName, objectName, metadata, overwrite=False):
-        systemMetadata = ["content-type", "content-encoding", "content-disposition",
-                          "x-delete-at", "x-delete-after", "etag"]
+        systemMetadata = [
+            "content-type",
+            "content-encoding",
+            "content-disposition",
+            "x-delete-at",
+            "x-delete-after",
+            "etag",
+        ]
         newMetadata = {}
         # Keep existing system metadata and (optionally) user metadata
         m = self.getObjectMetadata(bucketName, objectName)
@@ -223,11 +225,13 @@ class SwiftBucket():
             segmentName = '{}/{}/{}/{}/{:08d}'.format(objectName, mtime, size, SWIFT_SEGMENTSIZE, segmentId)
             segmentId += 1
             try:
-                self.swift.put_object(segmentBucketName,
-                                      segmentName,
-                                      src,
-                                      content_length=segmentSize,
-                                      content_type="application/octet-stream")
+                self.swift.put_object(
+                    segmentBucketName,
+                    segmentName,
+                    src,
+                    content_length=segmentSize,
+                    content_type="application/octet-stream",
+                )
             except swiftclient.ClientException:
                 raise BucketError("Swift Failed to put large object segment")
             bytesRead += segmentSize
@@ -239,12 +243,9 @@ class SwiftBucket():
             'x-object-meta-mtime': mtime,
         }
         try:
-            self.swift.put_object(bucketName,
-                                  objectName,
-                                  '',
-                                  content_length=0,
-                                  content_type="application/octet-stream",
-                                  headers=headers)
+            self.swift.put_object(
+                bucketName, objectName, '', content_length=0, content_type="application/octet-stream", headers=headers
+            )
         except swiftclient.ClientException:
             raise BucketError("Swift Failed to put large object manifest")
         return
@@ -254,10 +255,7 @@ class SwiftBucket():
             return self.putLargeObject(bucketName, objectName, src, size, min(blockSize, SWIFT_SEGMENTSIZE))
 
         try:
-            self.swift.put_object(bucketName,
-                                  objectName,
-                                  src,
-                                  content_type="application/octet-stream")
+            self.swift.put_object(bucketName, objectName, src, content_type="application/octet-stream")
         except swiftclient.exceptions.ClientException:
             raise BucketError("Swift Failed to put object")
         return
