Merge lp:~notmyname/swift/internal_proxy_reload into lp:~hudson-openstack/swift/trunk
Status: | Merged
---|---
Approved by: | clayg
Approved revision: | 233
Merged at revision: | 235
Proposed branch: | lp:~notmyname/swift/internal_proxy_reload
Merge into: | lp:~hudson-openstack/swift/trunk
Diff against target: | 428 lines (+241/-83), 3 files modified: swift/common/internal_proxy.py (+70/-74); swift/stats/log_processor.py (+6/-7); test/unit/common/test_internal_proxy.py (+165/-2)
To merge this branch: | bzr merge lp:~notmyname/swift/internal_proxy_reload
Related bugs: |
Reviewer | Review Type | Date Requested | Status
---|---|---|---
clayg | | | Approve
Greg Lange (community) | | | Approve

Review via email: mp+51377@code.launchpad.net
Commit message
change internal proxy to make calls to handle_request with a copy of the request rather than the request itself
Description of the change
Change the internal proxy to call handle_request() with a copy of the request rather than the request itself. This prevents side effects on retries: handle_request() modifies the request object it is given, so reusing one object would carry the changes made during one attempt into the next.
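To make the side effect concrete, here is a minimal, self-contained sketch. It does not use webob or Swift: `FakeRequest`, `mutating_handle_request` and `retry` are hypothetical stand-ins, and the handler's path mutation only imitates the `path_info_pop()` call made by the test double `DumbBaseApplication` in the diff. Retrying with the same object lets each attempt see the previous attempt's changes; retrying with a fresh copy does not.

```python
import copy


class FakeRequest(object):
    """Hypothetical stand-in for webob.Request; only path_info matters here."""

    def __init__(self, path_info):
        self.path_info = path_info

    def copy(self):
        # A shallow copy is enough here: path_info is an immutable string.
        return copy.copy(self)


def mutating_handle_request(req, seen):
    """Record the path this attempt saw, then strip one segment from it,
    roughly imitating the path_info_pop() in DumbBaseApplication."""
    seen.append(req.path_info)
    segments = req.path_info.split('/')
    req.path_info = '/' + '/'.join(segments[2:])
    return 500  # always fail so that every attempt is made


def retry(req, retries, use_copy):
    """Make up to `retries` attempts and return the paths each attempt saw."""
    seen = []
    for _ in range(retries):
        target = req.copy() if use_copy else req
        status = mutating_handle_request(target, seen)
        if 200 <= status < 300:
            break
    return seen
```

Calling `retry(FakeRequest('/v1/a/c/o'), 3, use_copy=False)` records three different paths (`'/v1/a/c/o'`, `'/a/c/o'`, `'/c/o'`), while `use_copy=True` records the original path three times and leaves the caller's request untouched.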
clayg (clay-gerrard) wrote:
I'm really glad that internal_proxy is getting some tests! But, I think some of the new webob_request_copy stuff is still not quite right... I can still reproduce the problem described in the bug report.
Also I have some other thoughts on some other stuff that you may or may not want to do differently...
swift/common/
47: since webob has no idea how big this file-like object is... doesn't this mess with content_length?
47: what happens if you read from req_copy's body? Can you still read that same data from the orig_req?
48: shouldn't you return the req_copy?
77: why update the copy each time? why not update the request before we make a copy?
107: why inline? just parametrize source_file...
111: Why? it already has read? AttributeError if _source_file is StringIO?
112: why?
119: why?
120: why not kwarg these when you create the request?
122: isn't etag normally a header? What is the intention here?
124-135: it'd be nice if webob_request_copy could optionally support req's who need something like "make_request_
170: might be cool to make container a kwarg that defaults to '' - so that you can do account GETs when you don't specify a container arg!
187: should this method really return an empty list on 404? If you don't do a "full_listing" you get None (which seems more correct)
test/unit/
59: that doesn't seem like it should be True, wouldn't the copy have a different object id?
60: what about with a body?
62: this seems to assert that DumbBaseApplica
89: seems like this would fit better up next to test_handle_
99: did you get the 200 or the 500 back? What if you exceed retries?
126: isn't it a container PUT?
133: what about the retry loop?
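The second question on line 47 (can the original request's body still be read after the copy's body has been read?) comes down to whether both requests end up holding the same stream object. The sketch below uses only `io.BytesIO`, not webob, and assumes the worst case of two names bound to one stream; `read_twice` is a hypothetical helper. Once one reader consumes the stream, the other sees nothing until the position is reset with `seek(0)`, which is what `make_request_body_file()` in the diff does for objects that have a `seek` method.

```python
import io


def read_twice(stream, rewind):
    """Read one stream through two aliases, optionally rewinding in between."""
    copied_body = stream    # body of the request copy
    original_body = stream  # body of the original request: the same object
    first = copied_body.read()
    if rewind:
        # make_request_body_file() in the diff seeks to 0 for this reason
        original_body.seek(0)
    second = original_body.read()
    return first, second
```

Without the rewind, `read_twice(io.BytesIO(b'log data'), rewind=False)` returns `(b'log data', b'')`, so a retry that reused the stream would upload an empty body.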
John Dickinson (notmyname) wrote:
inline
On Mar 2, 2011, at 1:46 PM, clayg wrote:
> Review: Needs Fixing
> I'm really glad that internal_proxy is getting some tests! But, I think some of the new webob_request_copy stuff is still not quite right... I can still reproduce the problem described in the bug report.
>
> Also I have some other thoughts on some other stuff that you may or may not want to do differently...
>
> swift/common/
> 47: since webob has no idea how big this file-like object is... doesn't this mess with content_length?
needs fixing for other reasons anyway (see below)
> 47: what happens if you read from req_copy's body? Can you still read that same data from the orig_req?
doesn't seem like it. need a better way to do this
> 48: shouldn't you return the req_copy?
ya, dumb mistake on my part. no wonder things were working for me
> 77: why update the copy each time? why not update the request before we make a copy?
because handle_request modifies the request
> 107: why inline? just parametrize source_file...
because I have to call it again for a retry (to reset the read pointer)
> 111: Why? it already has read? AttributeError if _source_file is StringIO?
unfortunately, StringIO is used in the code, so this needs to be fixed
> 112: why?
to ensure that we read from the start (important for retries)
> 119: why?
needed for the proxy server
> 120: why not kwarg these when you create the request?
meh
> 122: isn't etag normally a header? What is the intention here?
webob sets it with the attribute
> 124-135: it'd be nice if webob_request_copy could optionally support req's who need something like "make_request_
perhaps, but we only need the copy body logic once, in upload_file
> 170: might be cool to make container a kwarg that defaults to '' - so that you can do account GETs when you don't specify a container arg!
sure, but that's not needed now
> 187: should this method really return an empty list on 404? If you don't do a "full_listing" you get None (which seems more correct)
changed to be more like client.py (the original source of this function). now raises an error on an error response. need to update log_processor to handle errors
>
> test/unit/
> 59: that doesn't seem like it should be True, wouldn't the copy have a different object id?
changed to test the specific parts of the request that are important at this time
> 60: what about with a body?
> 62: this seems to assert that DumbBaseApplica
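The question about exceeding retries (test line 99) is not answered in the thread. In the diff, `_handle_request()` loops while `tries < self.retries`, so `retries` caps the total number of attempts; the removed per-method loops used `tries <= self.retries`, which allowed one more. The sketch below mirrors the new loop outside Swift; `handle_with_retries` and `scripted` are hypothetical helpers, and `scripted` repeats its last status code once the list runs out, as the test double `DumbBaseApplication` does.

```python
def handle_with_retries(handle, retries):
    """Mirror of the loop in InternalProxy._handle_request(): `retries` is
    the maximum total number of attempts, not the number of re-tries."""
    status = handle()
    tries = 1
    while not 200 <= status < 300 and tries < retries:
        status = handle()
        tries += 1
    return status, tries


def scripted(codes):
    """Hypothetical helper: a handler that replays canned status codes and
    repeats the last one once the list is exhausted."""
    remaining = list(codes)
    calls = []

    def handle():
        status = remaining.pop(0) if len(remaining) > 1 else remaining[0]
        calls.append(status)
        return status

    return handle, calls
```

With codes `[500, 200]` and `retries=3` the sketch makes two calls and returns 200; with only 500s it stops after three calls and returns the 500, matching what `test_handle_request_with_retries_all_error` asserts.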
- 232. By John Dickinson: improved internal proxy tests and changed internal_proxy.get_container_list
- 233. By John Dickinson: better handling of internal_proxy.get_container_list response in log_processor
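John's reply says `get_container_list()` should raise on an error response, while the diff as shown still returns `[]` for any non-2xx status and leaves a TODO about telling a 404 apart from an empty container. One way to resolve that TODO is sketched below; `ContainerListingError` and `parse_listing` are hypothetical names, not part of `swift.common.internal_proxy`, and the sketch only covers turning a status code and response body into a listing.

```python
import json


class ContainerListingError(Exception):
    """Hypothetical exception; not part of swift.common.internal_proxy."""

    def __init__(self, status_int):
        Exception.__init__(self, 'container listing failed: %d' % status_int)
        self.status_int = status_int


def parse_listing(status_int, body):
    """Turn a listing response into a list of dicts, keeping errors such as
    404 distinct from an empty listing."""
    if not 200 <= status_int < 300:
        raise ContainerListingError(status_int)  # 404, 5xx, ...
    if status_int == 204:
        return []  # treated as an empty listing, as in the diff
    return json.loads(body)
```

A caller such as the listing code in log_processor.py would then need to catch the exception rather than rely on an empty list, since revision 233 removed its `is not None` check.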
Preview Diff
1 | === modified file 'swift/common/internal_proxy.py' |
2 | --- swift/common/internal_proxy.py 2011-01-04 23:34:43 +0000 |
3 | +++ swift/common/internal_proxy.py 2011-03-08 17:00:13 +0000 |
4 | @@ -23,25 +23,45 @@ |
5 | |
6 | class MemcacheStub(object): |
7 | |
8 | - def get(self, *a, **kw): |
9 | - return None |
10 | - |
11 | - def set(self, *a, **kw): |
12 | - return None |
13 | - |
14 | - def incr(self, *a, **kw): |
15 | + def get(self, *a, **kw): # pragma: no cover |
16 | + return None |
17 | + |
18 | + def set(self, *a, **kw): # pragma: no cover |
19 | + return None |
20 | + |
21 | + def incr(self, *a, **kw): # pragma: no cover |
22 | return 0 |
23 | |
24 | - def delete(self, *a, **kw): |
25 | - return None |
26 | - |
27 | - def set_multi(self, *a, **kw): |
28 | - return None |
29 | - |
30 | - def get_multi(self, *a, **kw): |
31 | + def delete(self, *a, **kw): # pragma: no cover |
32 | + return None |
33 | + |
34 | + def set_multi(self, *a, **kw): # pragma: no cover |
35 | + return None |
36 | + |
37 | + def get_multi(self, *a, **kw): # pragma: no cover |
38 | return [] |
39 | |
40 | |
41 | +def make_request_body_file(source_file, compress=True): |
42 | + if hasattr(source_file, 'seek'): |
43 | + source_file.seek(0) |
44 | + else: |
45 | + source_file = open(source_file, 'rb') |
46 | + if compress: |
47 | + compressed_file = CompressingFileReader(source_file) |
48 | + return compressed_file |
49 | + return source_file |
50 | + |
51 | + |
52 | +def webob_request_copy(orig_req, source_file=None, compress=True): |
53 | + req_copy = orig_req.copy() |
54 | + if source_file: |
55 | + req_copy.body_file = make_request_body_file(source_file, |
56 | + compress=compress) |
57 | + req_copy.content_length = orig_req.content_length |
58 | + return req_copy |
59 | + |
60 | + |
61 | class InternalProxy(object): |
62 | """ |
63 | Set up a private instance of a proxy server that allows normal requests |
64 | @@ -59,6 +79,20 @@ |
65 | logger=logger) |
66 | self.retries = retries |
67 | |
68 | + def _handle_request(self, req, source_file=None, compress=True): |
69 | + req = self.upload_app.update_request(req) |
70 | + req_copy = webob_request_copy(req, source_file=source_file, |
71 | + compress=compress) |
72 | + resp = self.upload_app.handle_request(req_copy) |
73 | + tries = 1 |
74 | + while (resp.status_int < 200 or resp.status_int > 299) \ |
75 | + and tries < self.retries: |
76 | + req_copy = webob_request_copy(req, source_file=source_file, |
77 | + compress=compress) |
78 | + resp = self.upload_app.handle_request(req_copy) |
79 | + tries += 1 |
80 | + return resp |
81 | + |
82 | def upload_file(self, source_file, account, container, object_name, |
83 | compress=True, content_type='application/x-gzip', |
84 | etag=None): |
85 | @@ -81,33 +115,14 @@ |
86 | return False |
87 | |
88 | # upload the file to the account |
89 | - req = webob.Request.blank(target_name, |
90 | + req = webob.Request.blank(target_name, content_type=content_type, |
91 | environ={'REQUEST_METHOD': 'PUT'}, |
92 | headers={'Transfer-Encoding': 'chunked'}) |
93 | - if compress: |
94 | - if hasattr(source_file, 'read'): |
95 | - compressed_file = CompressingFileReader(source_file) |
96 | - else: |
97 | - compressed_file = CompressingFileReader( |
98 | - open(source_file, 'rb')) |
99 | - req.body_file = compressed_file |
100 | - else: |
101 | - if not hasattr(source_file, 'read'): |
102 | - source_file = open(source_file, 'rb') |
103 | - req.body_file = source_file |
104 | - req.account = account |
105 | - req.content_type = content_type |
106 | req.content_length = None # to make sure we send chunked data |
107 | if etag: |
108 | - req.etag = etag |
109 | - resp = self.upload_app.handle_request( |
110 | - self.upload_app.update_request(req)) |
111 | - tries = 1 |
112 | - while (resp.status_int < 200 or resp.status_int > 299) \ |
113 | - and tries <= self.retries: |
114 | - resp = self.upload_app.handle_request( |
115 | - self.upload_app.update_request(req)) |
116 | - tries += 1 |
117 | + req.headers['etag'] = etag |
118 | + resp = self._handle_request(req, source_file=source_file, |
119 | + compress=compress) |
120 | if not (200 <= resp.status_int < 300): |
121 | return False |
122 | return True |
123 | @@ -124,15 +139,7 @@ |
124 | req = webob.Request.blank('/v1/%s/%s/%s' % |
125 | (account, container, object_name), |
126 | environ={'REQUEST_METHOD': 'GET'}) |
127 | - req.account = account |
128 | - resp = self.upload_app.handle_request( |
129 | - self.upload_app.update_request(req)) |
130 | - tries = 1 |
131 | - while (resp.status_int < 200 or resp.status_int > 299) \ |
132 | - and tries <= self.retries: |
133 | - resp = self.upload_app.handle_request( |
134 | - self.upload_app.update_request(req)) |
135 | - tries += 1 |
136 | + resp = self._handle_request(req) |
137 | return resp.status_int, resp.app_iter |
138 | |
139 | def create_container(self, account, container): |
140 | @@ -145,37 +152,31 @@ |
141 | """ |
142 | req = webob.Request.blank('/v1/%s/%s' % (account, container), |
143 | environ={'REQUEST_METHOD': 'PUT'}) |
144 | - req.account = account |
145 | - resp = self.upload_app.handle_request( |
146 | - self.upload_app.update_request(req)) |
147 | - tries = 1 |
148 | - while (resp.status_int < 200 or resp.status_int > 299) \ |
149 | - and tries <= self.retries: |
150 | - resp = self.upload_app.handle_request( |
151 | - self.upload_app.update_request(req)) |
152 | - tries += 1 |
153 | + resp = self._handle_request(req) |
154 | return 200 <= resp.status_int < 300 |
155 | |
156 | def get_container_list(self, account, container, marker=None, |
157 | end_marker=None, limit=None, prefix=None, |
158 | delimiter=None, full_listing=True): |
159 | """ |
160 | - Get container listing. |
161 | + Get a listing of objects for the container. |
162 | |
163 | :param account: account name for the container |
164 | - :param container: container name to get the listing of |
165 | + :param container: container name to get a listing for |
166 | :param marker: marker query |
167 | :param end_marker: end marker query |
168 | - :param limit: limit to query |
169 | + :param limit: limit query |
170 | :param prefix: prefix query |
171 | - :param delimeter: delimeter for query |
172 | - :param full_listing: if True, make enough requests to get all listings |
173 | + :param delimeter: string to delimit the queries on |
174 | + :param full_listing: if True, return a full listing, else returns a max |
175 | + of 10000 listings |
176 | :returns: list of objects |
177 | """ |
178 | if full_listing: |
179 | rv = [] |
180 | listing = self.get_container_list(account, container, marker, |
181 | - end_marker, limit, prefix, delimiter, full_listing=False) |
182 | + end_marker, limit, prefix, |
183 | + delimiter, full_listing=False) |
184 | while listing: |
185 | rv.extend(listing) |
186 | if not delimiter: |
187 | @@ -183,9 +184,11 @@ |
188 | else: |
189 | marker = listing[-1].get('name', listing[-1].get('subdir')) |
190 | listing = self.get_container_list(account, container, marker, |
191 | - end_marker, limit, prefix, delimiter, full_listing=False) |
192 | + end_marker, limit, prefix, |
193 | + delimiter, |
194 | + full_listing=False) |
195 | return rv |
196 | - path = '/v1/%s/%s' % (account, container) |
197 | + path = '/v1/%s/%s' % (account, quote(container)) |
198 | qs = 'format=json' |
199 | if marker: |
200 | qs += '&marker=%s' % quote(marker) |
201 | @@ -199,16 +202,9 @@ |
202 | qs += '&delimiter=%s' % quote(delimiter) |
203 | path += '?%s' % qs |
204 | req = webob.Request.blank(path, environ={'REQUEST_METHOD': 'GET'}) |
205 | - req.account = account |
206 | - resp = self.upload_app.handle_request( |
207 | - self.upload_app.update_request(req)) |
208 | - tries = 1 |
209 | - while (resp.status_int < 200 or resp.status_int > 299) \ |
210 | - and tries <= self.retries: |
211 | - resp = self.upload_app.handle_request( |
212 | - self.upload_app.update_request(req)) |
213 | - tries += 1 |
214 | + resp = self._handle_request(req) |
215 | + if resp.status_int < 200 or resp.status_int >= 300: |
216 | + return [] # TODO: distinguish between 404 and empty container |
217 | if resp.status_int == 204: |
218 | return [] |
219 | - if 200 <= resp.status_int < 300: |
220 | - return json_loads(resp.body) |
221 | + return json_loads(resp.body) |
222 | |
223 | === modified file 'swift/stats/log_processor.py' |
224 | --- swift/stats/log_processor.py 2011-03-03 21:48:38 +0000 |
225 | +++ swift/stats/log_processor.py 2011-03-08 17:00:13 +0000 |
226 | @@ -147,13 +147,12 @@ |
227 | marker=search_key, |
228 | end_marker=end_key) |
229 | results = [] |
230 | - if container_listing is not None: |
231 | - if listing_filter is None: |
232 | - listing_filter = set() |
233 | - for item in container_listing: |
234 | - name = item['name'] |
235 | - if name not in listing_filter: |
236 | - results.append(name) |
237 | + if listing_filter is None: |
238 | + listing_filter = set() |
239 | + for item in container_listing: |
240 | + name = item['name'] |
241 | + if name not in listing_filter: |
242 | + results.append(name) |
243 | return results |
244 | |
245 | def get_object_data(self, swift_account, container_name, object_name, |
246 | |
247 | === modified file 'test/unit/common/test_internal_proxy.py' |
248 | --- test/unit/common/test_internal_proxy.py 2011-01-04 23:34:43 +0000 |
249 | +++ test/unit/common/test_internal_proxy.py 2011-03-08 17:00:13 +0000 |
250 | @@ -16,13 +16,176 @@ |
251 | # TODO: Tests |
252 | |
253 | import unittest |
254 | +import webob |
255 | +import tempfile |
256 | +import json |
257 | + |
258 | from swift.common import internal_proxy |
259 | |
260 | +class DumbBaseApplicationFactory(object): |
261 | + |
262 | + def __init__(self, status_codes, body=''): |
263 | + self.status_codes = status_codes[:] |
264 | + self.body = body |
265 | + |
266 | + def __call__(self, *a, **kw): |
267 | + app = DumbBaseApplication(*a, **kw) |
268 | + app.status_codes = self.status_codes |
269 | + try: |
270 | + app.default_status_code = self.status_codes[-1] |
271 | + except IndexError: |
272 | + app.default_status_code = 200 |
273 | + app.body = self.body |
274 | + return app |
275 | + |
276 | +class DumbBaseApplication(object): |
277 | + |
278 | + def __init__(self, *a, **kw): |
279 | + self.status_codes = [] |
280 | + self.default_status_code = 200 |
281 | + self.call_count = 0 |
282 | + self.body = '' |
283 | + |
284 | + def handle_request(self, req): |
285 | + self.call_count += 1 |
286 | + req.path_info_pop() |
287 | + if isinstance(self.body, list): |
288 | + try: |
289 | + body = self.body.pop(0) |
290 | + except IndexError: |
291 | + body = '' |
292 | + else: |
293 | + body = self.body |
294 | + resp = webob.Response(request=req, body=body, |
295 | + conditional_response=True) |
296 | + try: |
297 | + resp.status_int = self.status_codes.pop(0) |
298 | + except IndexError: |
299 | + resp.status_int = self.default_status_code |
300 | + return resp |
301 | + |
302 | + def update_request(self, req): |
303 | + return req |
304 | + |
305 | |
306 | class TestInternalProxy(unittest.TestCase): |
307 | |
308 | - def test_placeholder(self): |
309 | - pass |
310 | + def test_webob_request_copy(self): |
311 | + req = webob.Request.blank('/') |
312 | + req2 = internal_proxy.webob_request_copy(req) |
313 | + self.assertEquals(req.path, req2.path) |
314 | + self.assertEquals(req.path_info, req2.path_info) |
315 | + self.assertFalse(req is req2) |
316 | + |
317 | + def test_handle_request(self): |
318 | + status_codes = [200] |
319 | + internal_proxy.BaseApplication = DumbBaseApplicationFactory( |
320 | + status_codes) |
321 | + p = internal_proxy.InternalProxy() |
322 | + req = webob.Request.blank('/') |
323 | + orig_req = internal_proxy.webob_request_copy(req) |
324 | + resp = p._handle_request(req) |
325 | + self.assertEquals(req.path_info, orig_req.path_info) |
326 | + |
327 | + def test_handle_request_with_retries(self): |
328 | + status_codes = [500, 200] |
329 | + internal_proxy.BaseApplication = DumbBaseApplicationFactory( |
330 | + status_codes) |
331 | + p = internal_proxy.InternalProxy(retries=3) |
332 | + req = webob.Request.blank('/') |
333 | + orig_req = internal_proxy.webob_request_copy(req) |
334 | + resp = p._handle_request(req) |
335 | + self.assertEquals(req.path_info, orig_req.path_info) |
336 | + self.assertEquals(p.upload_app.call_count, 2) |
337 | + self.assertEquals(resp.status_int, 200) |
338 | + |
339 | + def test_get_object(self): |
340 | + status_codes = [200] |
341 | + internal_proxy.BaseApplication = DumbBaseApplicationFactory( |
342 | + status_codes) |
343 | + p = internal_proxy.InternalProxy() |
344 | + code, body = p.get_object('a', 'c', 'o') |
345 | + body = ''.join(body) |
346 | + self.assertEquals(code, 200) |
347 | + self.assertEquals(body, '') |
348 | + |
349 | + def test_create_container(self): |
350 | + status_codes = [200] |
351 | + internal_proxy.BaseApplication = DumbBaseApplicationFactory( |
352 | + status_codes) |
353 | + p = internal_proxy.InternalProxy() |
354 | + resp = p.create_container('a', 'c') |
355 | + self.assertTrue(resp) |
356 | + |
357 | + def test_handle_request_with_retries_all_error(self): |
358 | + status_codes = [500, 500, 500, 500, 500] |
359 | + internal_proxy.BaseApplication = DumbBaseApplicationFactory( |
360 | + status_codes) |
361 | + p = internal_proxy.InternalProxy(retries=3) |
362 | + req = webob.Request.blank('/') |
363 | + orig_req = internal_proxy.webob_request_copy(req) |
364 | + resp = p._handle_request(req) |
365 | + self.assertEquals(req.path_info, orig_req.path_info) |
366 | + self.assertEquals(p.upload_app.call_count, 3) |
367 | + self.assertEquals(resp.status_int, 500) |
368 | + |
369 | + def test_get_container_list_empty(self): |
370 | + status_codes = [200] |
371 | + internal_proxy.BaseApplication = DumbBaseApplicationFactory( |
372 | + status_codes, body='[]') |
373 | + p = internal_proxy.InternalProxy() |
374 | + resp = p.get_container_list('a', 'c') |
375 | + self.assertEquals(resp, []) |
376 | + |
377 | + def test_get_container_list_no_body(self): |
378 | + status_codes = [204] |
379 | + internal_proxy.BaseApplication = DumbBaseApplicationFactory( |
380 | + status_codes, body='') |
381 | + p = internal_proxy.InternalProxy() |
382 | + resp = p.get_container_list('a', 'c') |
383 | + self.assertEquals(resp, []) |
384 | + |
385 | + def test_get_container_list_full_listing(self): |
386 | + status_codes = [200, 200] |
387 | + obj_a = dict(name='foo', hash='foo', bytes=3, |
388 | + content_type='text/plain', last_modified='2011/01/01') |
389 | + obj_b = dict(name='bar', hash='bar', bytes=3, |
390 | + content_type='text/plain', last_modified='2011/01/01') |
391 | + body = [json.dumps([obj_a]), json.dumps([obj_b]), json.dumps([])] |
392 | + internal_proxy.BaseApplication = DumbBaseApplicationFactory( |
393 | + status_codes, body=body) |
394 | + p = internal_proxy.InternalProxy() |
395 | + resp = p.get_container_list('a', 'c') |
396 | + expected = ['foo', 'bar'] |
397 | + self.assertEquals([x['name'] for x in resp], expected) |
398 | + |
399 | + def test_get_container_list_full(self): |
400 | + status_codes = [204] |
401 | + internal_proxy.BaseApplication = DumbBaseApplicationFactory( |
402 | + status_codes, body='') |
403 | + p = internal_proxy.InternalProxy() |
404 | + resp = p.get_container_list('a', 'c', marker='a', end_marker='b', |
405 | + limit=100, prefix='/', delimiter='.') |
406 | + self.assertEquals(resp, []) |
407 | + |
408 | + def test_upload_file(self): |
409 | + status_codes = [200, 200] # container PUT + object PUT |
410 | + internal_proxy.BaseApplication = DumbBaseApplicationFactory( |
411 | + status_codes) |
412 | + p = internal_proxy.InternalProxy() |
413 | + with tempfile.NamedTemporaryFile() as file_obj: |
414 | + resp = p.upload_file(file_obj.name, 'a', 'c', 'o') |
415 | + self.assertTrue(resp) |
416 | + |
417 | + def test_upload_file_with_retries(self): |
418 | + status_codes = [200, 500, 200] # container PUT + error + object PUT |
419 | + internal_proxy.BaseApplication = DumbBaseApplicationFactory( |
420 | + status_codes) |
421 | + p = internal_proxy.InternalProxy(retries=3) |
422 | + with tempfile.NamedTemporaryFile() as file_obj: |
423 | + resp = p.upload_file(file_obj, 'a', 'c', 'o') |
424 | + self.assertTrue(resp) |
425 | + self.assertEquals(p.upload_app.call_count, 3) |
426 | |
427 | |
428 | if __name__ == '__main__': |
Looks good.