Merge ~canonical-launchpad-branches/launchpad/+git/bzr-personal:cjwatson/timeout-with-requests into launchpad:master
- Git
- lp:~canonical-launchpad-branches/launchpad/+git/bzr-personal
- cjwatson/timeout-with-requests
- Merge into master
Proposed by
William Grant
Status: Merged
Merged at revision: 59ad69fc9edb62336f98e3883e91341cb4876d58
Proposed branch: ~canonical-launchpad-branches/launchpad/+git/bzr-personal:cjwatson/timeout-with-requests
Merge into: launchpad:master
Diff against target: 439 lines (+167/-60), 3 files modified:
- lib/lp/services/googlesearch/doc/google-searchservice.txt (+8/-0)
- lib/lp/services/tests/test_timeout.py (+47/-30)
- lib/lp/services/timeout.py (+112/-30)
Related bugs: (none)
Reviewer: Canonical Launchpad Branches — Review Type: (none) — Date Requested: (none) — Status: Pending
Review via email: mp+264221@code.launchpad.net
Commit message
Description of the change
To post a comment you must log in.
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | diff --git a/lib/lp/services/googlesearch/doc/google-searchservice.txt b/lib/lp/services/googlesearch/doc/google-searchservice.txt |
2 | index 88c5646..17763b5 100644 |
3 | --- a/lib/lp/services/googlesearch/doc/google-searchservice.txt |
4 | +++ b/lib/lp/services/googlesearch/doc/google-searchservice.txt |
5 | @@ -6,6 +6,12 @@ The GoogleSearchService is a Google Custom Service Business Edition |
6 | (cs-be) client. Given one or more terms, it will retrieve an XML |
7 | summary of the matching launchpad.net pages. |
8 | |
9 | +We silence logging of new HTTP connections from requests throughout. |
10 | + |
11 | + >>> from fixtures import FakeLogger |
12 | + >>> logger = FakeLogger() |
13 | + >>> logger.setUp() |
14 | + |
15 | |
16 | GoogleSearchService |
17 | =================== |
18 | @@ -616,3 +622,5 @@ cases a TimeoutError is issued. |
19 | # Restore the configuration and the timeout state. |
20 | >>> timeout_data = config.pop('timeout_data') |
21 | >>> set_default_timeout_function(old_timeout_function) |
22 | + |
23 | + >>> logger.cleanUp() |
24 | diff --git a/lib/lp/services/tests/test_timeout.py b/lib/lp/services/tests/test_timeout.py |
25 | index 6a20797..e34d7de 100644 |
26 | --- a/lib/lp/services/tests/test_timeout.py |
27 | +++ b/lib/lp/services/tests/test_timeout.py |
28 | @@ -1,4 +1,4 @@ |
29 | -# Copyright 2012 Canonical Ltd. This software is licensed under the |
30 | +# Copyright 2012-2015 Canonical Ltd. This software is licensed under the |
31 | # GNU Affero General Public License version 3 (see the file LICENSE). |
32 | |
33 | """timeout.py tests. |
34 | @@ -6,7 +6,6 @@ |
35 | |
36 | __metaclass__ = type |
37 | |
38 | -from cStringIO import StringIO |
39 | from SimpleXMLRPCServer import ( |
40 | SimpleXMLRPCRequestHandler, |
41 | SimpleXMLRPCServer, |
42 | @@ -14,13 +13,13 @@ from SimpleXMLRPCServer import ( |
43 | import socket |
44 | from textwrap import dedent |
45 | import threading |
46 | -import time |
47 | -import urllib2 |
48 | import xmlrpclib |
49 | |
50 | -from zope.interface import implements |
51 | +from requests.exceptions import ( |
52 | + ConnectionError, |
53 | + InvalidSchema, |
54 | + ) |
55 | |
56 | -from lp.services.log.logger import FakeLogger |
57 | from lp.services.timeout import ( |
58 | get_default_timeout_function, |
59 | set_default_timeout_function, |
60 | @@ -30,11 +29,11 @@ from lp.services.timeout import ( |
61 | with_timeout, |
62 | ) |
63 | from lp.testing import TestCase |
64 | -from lp.testing.layers import BaseLayer |
65 | |
66 | |
67 | @with_timeout() |
68 | -def no_default_timeout(): pass |
69 | +def no_default_timeout(): |
70 | + pass |
71 | |
72 | |
73 | class EchoOrWaitXMLRPCReqHandler(SimpleXMLRPCRequestHandler): |
74 | @@ -54,10 +53,12 @@ class EchoOrWaitXMLRPCReqHandler(SimpleXMLRPCRequestHandler): |
75 | class MySimpleXMLRPCServer(SimpleXMLRPCServer): |
76 | """Create a simple XMLRPC server to listen for requests.""" |
77 | allow_reuse_address = True |
78 | + |
79 | def serve_2_requests(self): |
80 | for i in range(2): |
81 | self.handle_request() |
82 | self.server_close() |
83 | + |
84 | def handle_error(self, request, address): |
85 | pass |
86 | |
87 | @@ -69,11 +70,13 @@ class TestTimeout(TestCase): |
88 | finishes before the supplied timeout, it should function normally. |
89 | """ |
90 | wait_evt = threading.Event() |
91 | + |
92 | @with_timeout(timeout=0.5) |
93 | def wait_100ms(): |
94 | """Function that waits for a supplied number of seconds.""" |
95 | wait_evt.wait(0.1) |
96 | return "Succeeded." |
97 | + |
98 | self.assertEqual("Succeeded.", wait_100ms()) |
99 | |
100 | def test_timeout_overrun(self): |
101 | @@ -87,11 +90,13 @@ class TestTimeout(TestCase): |
102 | # inform us that it is about to exit. |
103 | wait_evt = threading.Event() |
104 | stopping_evt = threading.Event() |
105 | + |
106 | @with_timeout(timeout=0.5) |
107 | def wait_for_event(): |
108 | """Function that waits for a supplied number of seconds.""" |
109 | wait_evt.wait() |
110 | stopping_evt.set() |
111 | + |
112 | self.assertRaises(TimeoutError, wait_for_event) |
113 | wait_evt.set() |
114 | stopping_evt.wait() |
115 | @@ -115,6 +120,7 @@ class TestTimeout(TestCase): |
116 | socket.setdefaulttimeout(5) |
117 | sockets = socket.socketpair() |
118 | closed = [] |
119 | + |
120 | def close_socket(): |
121 | closed.append(True) |
122 | sockets[0].shutdown(socket.SHUT_RDWR) |
123 | @@ -152,16 +158,17 @@ class TestTimeout(TestCase): |
124 | method.""" |
125 | def do_definition(): |
126 | @with_timeout(cleanup='not_a_method', timeout=0.5) |
127 | - def a_function(): pass |
128 | + def a_function(): |
129 | + pass |
130 | self.assertRaises(TypeError, do_definition) |
131 | |
132 | def test_timeout_uses_default(self): |
133 | - """If the timeout parameter isn't provided, it will default to the value |
134 | - returned by the function installed as "default_timeout_function". A |
135 | - function is used because it's useful for the timeout value to be |
136 | - determined dynamically. For example, if you want to limit the |
137 | - overall processing to 30s and you already did 14s, you want that timeout |
138 | - to be 16s. |
139 | + """If the timeout parameter isn't provided, it will default to the |
140 | + value returned by the function installed as |
141 | + "default_timeout_function". A function is used because it's useful |
142 | + for the timeout value to be determined dynamically. For example, if |
143 | + you want to limit the overall processing to 30s and you already did |
144 | + 14s, you want that timeout to be 16s. |
145 | |
146 | By default, there is no default_timeout_function. |
147 | """ |
148 | @@ -177,23 +184,25 @@ class TestTimeout(TestCase): |
149 | str(e)) |
150 | |
151 | def test_set_default_timeout(self): |
152 | - """the set_default_timeout_function() takes a function that should return |
153 | - the number of seconds to wait. |
154 | + """The set_default_timeout_function() takes a function that should |
155 | + return the number of seconds to wait. |
156 | """ |
157 | using_default = [] |
158 | + |
159 | def my_default_timeout(): |
160 | using_default.append(True) |
161 | return 1 |
162 | + |
163 | set_default_timeout_function(my_default_timeout) |
164 | self.addCleanup(set_default_timeout_function, None) |
165 | no_default_timeout() |
166 | self.assertEqual([True], using_default) |
167 | |
168 | def make_test_socket(self): |
169 | - """One common use case for timing out is when making an HTTP request to |
170 | - an external site to fetch content. To this end, the timeout module has |
171 | - a urlfetch() function that retrieve a URL using custom urllib2 handlers |
172 | - that will timeout using the default timeout function and clean-up the |
173 | + """One common use case for timing out is when making an HTTP request |
174 | + to an external site to fetch content. To this end, the timeout |
175 | + module has a urlfetch() function that retrieves a URL in such a way |
176 | + as to timeout using the default timeout function and clean-up the |
177 | socket properly. |
178 | """ |
179 | sock = socket.socket() |
180 | @@ -206,11 +215,11 @@ class TestTimeout(TestCase): |
181 | http_server_url = 'http://%s:%d/' % sock.getsockname() |
182 | return sock, http_server_url |
183 | |
184 | - def test_urlfetch_raises_urllib2_exceptions(self): |
185 | - """Normal urllib2 exceptions are raised.""" |
186 | + def test_urlfetch_raises_requests_exceptions(self): |
187 | + """Normal requests exceptions are raised.""" |
188 | sock, http_server_url = self.make_test_socket() |
189 | |
190 | - e = self.assertRaises(urllib2.URLError, urlfetch, http_server_url) |
191 | + e = self.assertRaises(ConnectionError, urlfetch, http_server_url) |
192 | self.assertIn('Connection refused', str(e)) |
193 | |
194 | def test_urlfetch_timeout_after_listen(self): |
195 | @@ -237,6 +246,7 @@ class TestTimeout(TestCase): |
196 | sock, http_server_url = self.make_test_socket() |
197 | sock.listen(1) |
198 | stop_event = threading.Event() |
199 | + |
200 | def slow_reply(): |
201 | (client_sock, client_addr) = sock.accept() |
202 | content = 'You are veeeeryyy patient!' |
203 | @@ -252,12 +262,15 @@ class TestTimeout(TestCase): |
204 | if stop_event.wait(0.05): |
205 | break |
206 | client_sock.close() |
207 | + |
208 | slow_thread = threading.Thread(target=slow_reply) |
209 | slow_thread.start() |
210 | saved_threads = set(threading.enumerate()) |
211 | self.assertRaises(TimeoutError, urlfetch, http_server_url) |
212 | - # Note that the cleanup also takes care of leaving no worker thread behind. |
213 | - remaining_threads = set(threading.enumerate()).difference(saved_threads) |
214 | + # Note that the cleanup also takes care of leaving no worker thread |
215 | + # behind. |
216 | + remaining_threads = set( |
217 | + threading.enumerate()).difference(saved_threads) |
218 | self.assertEqual(set(), remaining_threads) |
219 | stop_event.set() |
220 | slow_thread.join() |
221 | @@ -266,6 +279,7 @@ class TestTimeout(TestCase): |
222 | """When the request succeeds, the result content is returned.""" |
223 | sock, http_server_url = self.make_test_socket() |
224 | sock.listen(1) |
225 | + |
226 | def success_result(): |
227 | (client_sock, client_addr) = sock.accept() |
228 | client_sock.sendall(dedent("""\ |
229 | @@ -275,17 +289,20 @@ class TestTimeout(TestCase): |
230 | |
231 | Success.""")) |
232 | client_sock.close() |
233 | + |
234 | t = threading.Thread(target=success_result) |
235 | t.start() |
236 | self.assertEqual('Success.', urlfetch(http_server_url)) |
237 | t.join() |
238 | |
239 | - def test_urlfetch_only_supports_http_urls(self): |
240 | - """urlfetch() only supports http urls:""" |
241 | + def test_urlfetch_does_not_support_ftp_urls(self): |
242 | + """urlfetch() does not support ftp urls.""" |
243 | set_default_timeout_function(lambda: 1) |
244 | self.addCleanup(set_default_timeout_function, None) |
245 | - e = self.assertRaises(AssertionError, urlfetch, 'ftp://localhost') |
246 | - self.assertEqual('only http is supported.', str(e)) |
247 | + url = 'ftp://localhost/' |
248 | + e = self.assertRaises(InvalidSchema, urlfetch, url) |
249 | + self.assertEqual( |
250 | + "No connection adapters were found for '%s'" % url, str(e)) |
251 | |
252 | def test_xmlrpc_transport(self): |
253 | """ Another use case for timeouts is communicating with external |
254 | diff --git a/lib/lp/services/timeout.py b/lib/lp/services/timeout.py |
255 | index 877f276..d52c597 100644 |
256 | --- a/lib/lp/services/timeout.py |
257 | +++ b/lib/lp/services/timeout.py |
258 | @@ -1,4 +1,4 @@ |
259 | -# Copyright 2009 Canonical Ltd. This software is licensed under the |
260 | +# Copyright 2009-2015 Canonical Ltd. This software is licensed under the |
261 | # GNU Affero General Public License version 3 (see the file LICENSE). |
262 | |
263 | """Helpers to time out external operations.""" |
264 | @@ -14,16 +14,32 @@ __all__ = [ |
265 | "with_timeout", |
266 | ] |
267 | |
268 | -import httplib |
269 | import socket |
270 | import sys |
271 | -from threading import Thread |
272 | -import urllib2 |
273 | +from threading import ( |
274 | + Lock, |
275 | + Thread, |
276 | + ) |
277 | from xmlrpclib import ( |
278 | SafeTransport, |
279 | Transport, |
280 | ) |
281 | |
282 | +from requests import Session |
283 | +from requests.adapters import ( |
284 | + DEFAULT_POOLBLOCK, |
285 | + HTTPAdapter, |
286 | + ) |
287 | +from requests.packages.urllib3.connectionpool import ( |
288 | + HTTPConnectionPool, |
289 | + HTTPSConnectionPool, |
290 | + ) |
291 | +from requests.packages.urllib3.exceptions import ClosedPoolError |
292 | +from requests.packages.urllib3.poolmanager import ( |
293 | + PoolManager, |
294 | + SSL_KEYWORDS, |
295 | + ) |
296 | + |
297 | |
298 | default_timeout_function = None |
299 | |
300 | @@ -145,47 +161,113 @@ class with_timeout: |
301 | return call_with_timeout |
302 | |
303 | |
304 | -class CleanableHTTPHandler(urllib2.HTTPHandler): |
305 | - """Subclass of `urllib2.HTTPHandler` that can be cleaned-up.""" |
306 | +class CleanableConnectionPoolMixin: |
307 | + """Enhance urllib3's connection pools to support forced socket cleanup.""" |
308 | |
309 | - def http_open(self, req): |
310 | - """See `urllib2.HTTPHandler`.""" |
311 | - def connection_factory(*args, **kwargs): |
312 | - """Save the created connection so that we can clean it up.""" |
313 | - self.__conn = httplib.HTTPConnection(*args, **kwargs) |
314 | - return self.__conn |
315 | - return self.do_open(connection_factory, req) |
316 | + def __init__(self, *args, **kwargs): |
317 | + super(CleanableConnectionPoolMixin, self).__init__(*args, **kwargs) |
318 | + self._all_connections = [] |
319 | + self._all_connections_mutex = Lock() |
320 | |
321 | - def reset_connection(self): |
322 | - """Reset the underlying HTTP connection.""" |
323 | + def _new_conn(self): |
324 | + self._all_connections_mutex.acquire() |
325 | try: |
326 | - self.__conn.sock.shutdown(socket.SHUT_RDWR) |
327 | - except AttributeError: |
328 | - # It's possible that the other thread closed the socket |
329 | - # beforehand. |
330 | - pass |
331 | - self.__conn.close() |
332 | + if self._all_connections is None: |
333 | + raise ClosedPoolError(self, "Pool is closed.") |
334 | + conn = super(CleanableConnectionPoolMixin, self)._new_conn() |
335 | + self._all_connections.append(conn) |
336 | + return conn |
337 | + finally: |
338 | + self._all_connections_mutex.release() |
339 | + |
340 | + def close(self): |
341 | + self._all_connections_mutex.acquire() |
342 | + try: |
343 | + if self._all_connections is None: |
344 | + return |
345 | + for conn in self._all_connections: |
346 | + sock = getattr(conn, "sock", None) |
347 | + if sock is not None: |
348 | + sock.shutdown(socket.SHUT_RDWR) |
349 | + sock.close() |
350 | + conn.sock = None |
351 | + self._all_connections = None |
352 | + finally: |
353 | + self._all_connections_mutex.release() |
354 | + super(CleanableConnectionPoolMixin, self).close() |
355 | + |
356 | + |
357 | +class CleanableHTTPConnectionPool( |
358 | + CleanableConnectionPoolMixin, HTTPConnectionPool): |
359 | + pass |
360 | + |
361 | + |
362 | +class CleanableHTTPSConnectionPool( |
363 | + CleanableConnectionPoolMixin, HTTPSConnectionPool): |
364 | + pass |
365 | + |
366 | + |
367 | +cleanable_pool_classes_by_scheme = { |
368 | + "http": CleanableHTTPConnectionPool, |
369 | + "https": CleanableHTTPSConnectionPool, |
370 | + } |
371 | + |
372 | + |
373 | +class CleanablePoolManager(PoolManager): |
374 | + """A version of urllib3's PoolManager supporting forced socket cleanup.""" |
375 | + |
376 | + # XXX cjwatson 2015-03-11: Reimplements PoolManager._new_pool; check |
377 | + # this when upgrading requests. |
378 | + def _new_pool(self, scheme, host, port): |
379 | + if scheme not in cleanable_pool_classes_by_scheme: |
380 | + raise ValueError("Unhandled scheme: %s" % scheme) |
381 | + pool_cls = cleanable_pool_classes_by_scheme[scheme] |
382 | + if scheme == 'http': |
383 | + kwargs = self.connection_pool_kw.copy() |
384 | + for kw in SSL_KEYWORDS: |
385 | + kwargs.pop(kw, None) |
386 | + |
387 | + return pool_cls(host, port, **kwargs) |
388 | + |
389 | + |
390 | +class CleanableHTTPAdapter(HTTPAdapter): |
391 | + """Enhance HTTPAdapter to use CleanablePoolManager.""" |
392 | + |
393 | + # XXX cjwatson 2015-03-11: Reimplements HTTPAdapter.init_poolmanager; |
394 | + # check this when upgrading requests. |
395 | + def init_poolmanager(self, connections, maxsize, block=DEFAULT_POOLBLOCK): |
396 | + # save these values for pickling |
397 | + self._pool_connections = connections |
398 | + self._pool_maxsize = maxsize |
399 | + self._pool_block = block |
400 | + |
401 | + self.poolmanager = CleanablePoolManager( |
402 | + num_pools=connections, maxsize=maxsize, block=block) |
403 | |
404 | |
405 | class URLFetcher: |
406 | """Object fetching remote URLs with a time out.""" |
407 | |
408 | @with_timeout(cleanup='cleanup') |
409 | - def fetch(self, url, data=None): |
410 | + def fetch(self, url, **request_kwargs): |
411 | """Fetch the URL using a custom HTTP handler supporting timeout.""" |
412 | - assert url.startswith('http://'), "only http is supported." |
413 | - self.handler = CleanableHTTPHandler() |
414 | - opener = urllib2.build_opener(self.handler) |
415 | - return opener.open(url, data).read() |
416 | + request_kwargs.setdefault("method", "GET") |
417 | + self.session = Session() |
418 | + # Don't honour things like environment proxy configuration. |
419 | + self.session.trust_env = False |
420 | + # Mount our custom adapters. |
421 | + self.session.mount("https://", CleanableHTTPAdapter()) |
422 | + self.session.mount("http://", CleanableHTTPAdapter()) |
423 | + return self.session.request(url=url, **request_kwargs).content |
424 | |
425 | def cleanup(self): |
426 | """Reset the connection when the operation timed out.""" |
427 | - self.handler.reset_connection() |
428 | + self.session.close() |
429 | |
430 | |
431 | -def urlfetch(url, data=None): |
432 | - """Wrapper for `urllib2.urlopen()` that times out.""" |
433 | - return URLFetcher().fetch(url, data) |
434 | +def urlfetch(url, **request_kwargs): |
435 | + """Wrapper for `requests.get()` that times out.""" |
436 | + return URLFetcher().fetch(url, **request_kwargs) |
437 | |
438 | |
439 | class TransportWithTimeout(Transport): |