Merge ~canonical-launchpad-branches/launchpad/+git/bzr-personal:cjwatson/timeout-with-requests into launchpad:master
- Git
- lp:~canonical-launchpad-branches/launchpad/+git/bzr-personal
- cjwatson/timeout-with-requests
- Merge into master
Proposed by
William Grant
Status: | Merged |
---|---|
Merged at revision: | 59ad69fc9edb62336f98e3883e91341cb4876d58 |
Proposed branch: | ~canonical-launchpad-branches/launchpad/+git/bzr-personal:cjwatson/timeout-with-requests |
Merge into: | launchpad:master |
Diff against target: |
439 lines (+167/-60) 3 files modified
lib/lp/services/googlesearch/doc/google-searchservice.txt (+8/-0) lib/lp/services/tests/test_timeout.py (+47/-30) lib/lp/services/timeout.py (+112/-30) |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Canonical Launchpad Branches | Pending | ||
Review via email:
|
Commit message
Description of the change
To post a comment you must log in.
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | diff --git a/lib/lp/services/googlesearch/doc/google-searchservice.txt b/lib/lp/services/googlesearch/doc/google-searchservice.txt | |||
2 | index 88c5646..17763b5 100644 | |||
3 | --- a/lib/lp/services/googlesearch/doc/google-searchservice.txt | |||
4 | +++ b/lib/lp/services/googlesearch/doc/google-searchservice.txt | |||
5 | @@ -6,6 +6,12 @@ The GoogleSearchService is a Google Custom Service Business Edition | |||
6 | 6 | (cs-be) client. Given one or more terms, it will retrieve an XML | 6 | (cs-be) client. Given one or more terms, it will retrieve an XML |
7 | 7 | summary of the matching launchpad.net pages. | 7 | summary of the matching launchpad.net pages. |
8 | 8 | 8 | ||
9 | 9 | We silence logging of new HTTP connections from requests throughout. | ||
10 | 10 | |||
11 | 11 | >>> from fixtures import FakeLogger | ||
12 | 12 | >>> logger = FakeLogger() | ||
13 | 13 | >>> logger.setUp() | ||
14 | 14 | |||
15 | 9 | 15 | ||
16 | 10 | GoogleSearchService | 16 | GoogleSearchService |
17 | 11 | =================== | 17 | =================== |
18 | @@ -616,3 +622,5 @@ cases a TimeoutError is issued. | |||
19 | 616 | # Restore the configuration and the timeout state. | 622 | # Restore the configuration and the timeout state. |
20 | 617 | >>> timeout_data = config.pop('timeout_data') | 623 | >>> timeout_data = config.pop('timeout_data') |
21 | 618 | >>> set_default_timeout_function(old_timeout_function) | 624 | >>> set_default_timeout_function(old_timeout_function) |
22 | 625 | |||
23 | 626 | >>> logger.cleanUp() | ||
24 | diff --git a/lib/lp/services/tests/test_timeout.py b/lib/lp/services/tests/test_timeout.py | |||
25 | index 6a20797..e34d7de 100644 | |||
26 | --- a/lib/lp/services/tests/test_timeout.py | |||
27 | +++ b/lib/lp/services/tests/test_timeout.py | |||
28 | @@ -1,4 +1,4 @@ | |||
30 | 1 | # Copyright 2012 Canonical Ltd. This software is licensed under the | 1 | # Copyright 2012-2015 Canonical Ltd. This software is licensed under the |
31 | 2 | # GNU Affero General Public License version 3 (see the file LICENSE). | 2 | # GNU Affero General Public License version 3 (see the file LICENSE). |
32 | 3 | 3 | ||
33 | 4 | """timeout.py tests. | 4 | """timeout.py tests. |
34 | @@ -6,7 +6,6 @@ | |||
35 | 6 | 6 | ||
36 | 7 | __metaclass__ = type | 7 | __metaclass__ = type |
37 | 8 | 8 | ||
38 | 9 | from cStringIO import StringIO | ||
39 | 10 | from SimpleXMLRPCServer import ( | 9 | from SimpleXMLRPCServer import ( |
40 | 11 | SimpleXMLRPCRequestHandler, | 10 | SimpleXMLRPCRequestHandler, |
41 | 12 | SimpleXMLRPCServer, | 11 | SimpleXMLRPCServer, |
42 | @@ -14,13 +13,13 @@ from SimpleXMLRPCServer import ( | |||
43 | 14 | import socket | 13 | import socket |
44 | 15 | from textwrap import dedent | 14 | from textwrap import dedent |
45 | 16 | import threading | 15 | import threading |
46 | 17 | import time | ||
47 | 18 | import urllib2 | ||
48 | 19 | import xmlrpclib | 16 | import xmlrpclib |
49 | 20 | 17 | ||
51 | 21 | from zope.interface import implements | 18 | from requests.exceptions import ( |
52 | 19 | ConnectionError, | ||
53 | 20 | InvalidSchema, | ||
54 | 21 | ) | ||
55 | 22 | 22 | ||
56 | 23 | from lp.services.log.logger import FakeLogger | ||
57 | 24 | from lp.services.timeout import ( | 23 | from lp.services.timeout import ( |
58 | 25 | get_default_timeout_function, | 24 | get_default_timeout_function, |
59 | 26 | set_default_timeout_function, | 25 | set_default_timeout_function, |
60 | @@ -30,11 +29,11 @@ from lp.services.timeout import ( | |||
61 | 30 | with_timeout, | 29 | with_timeout, |
62 | 31 | ) | 30 | ) |
63 | 32 | from lp.testing import TestCase | 31 | from lp.testing import TestCase |
64 | 33 | from lp.testing.layers import BaseLayer | ||
65 | 34 | 32 | ||
66 | 35 | 33 | ||
67 | 36 | @with_timeout() | 34 | @with_timeout() |
69 | 37 | def no_default_timeout(): pass | 35 | def no_default_timeout(): |
70 | 36 | pass | ||
71 | 38 | 37 | ||
72 | 39 | 38 | ||
73 | 40 | class EchoOrWaitXMLRPCReqHandler(SimpleXMLRPCRequestHandler): | 39 | class EchoOrWaitXMLRPCReqHandler(SimpleXMLRPCRequestHandler): |
74 | @@ -54,10 +53,12 @@ class EchoOrWaitXMLRPCReqHandler(SimpleXMLRPCRequestHandler): | |||
75 | 54 | class MySimpleXMLRPCServer(SimpleXMLRPCServer): | 53 | class MySimpleXMLRPCServer(SimpleXMLRPCServer): |
76 | 55 | """Create a simple XMLRPC server to listen for requests.""" | 54 | """Create a simple XMLRPC server to listen for requests.""" |
77 | 56 | allow_reuse_address = True | 55 | allow_reuse_address = True |
78 | 56 | |||
79 | 57 | def serve_2_requests(self): | 57 | def serve_2_requests(self): |
80 | 58 | for i in range(2): | 58 | for i in range(2): |
81 | 59 | self.handle_request() | 59 | self.handle_request() |
82 | 60 | self.server_close() | 60 | self.server_close() |
83 | 61 | |||
84 | 61 | def handle_error(self, request, address): | 62 | def handle_error(self, request, address): |
85 | 62 | pass | 63 | pass |
86 | 63 | 64 | ||
87 | @@ -69,11 +70,13 @@ class TestTimeout(TestCase): | |||
88 | 69 | finishes before the supplied timeout, it should function normally. | 70 | finishes before the supplied timeout, it should function normally. |
89 | 70 | """ | 71 | """ |
90 | 71 | wait_evt = threading.Event() | 72 | wait_evt = threading.Event() |
91 | 73 | |||
92 | 72 | @with_timeout(timeout=0.5) | 74 | @with_timeout(timeout=0.5) |
93 | 73 | def wait_100ms(): | 75 | def wait_100ms(): |
94 | 74 | """Function that waits for a supplied number of seconds.""" | 76 | """Function that waits for a supplied number of seconds.""" |
95 | 75 | wait_evt.wait(0.1) | 77 | wait_evt.wait(0.1) |
96 | 76 | return "Succeeded." | 78 | return "Succeeded." |
97 | 79 | |||
98 | 77 | self.assertEqual("Succeeded.", wait_100ms()) | 80 | self.assertEqual("Succeeded.", wait_100ms()) |
99 | 78 | 81 | ||
100 | 79 | def test_timeout_overrun(self): | 82 | def test_timeout_overrun(self): |
101 | @@ -87,11 +90,13 @@ class TestTimeout(TestCase): | |||
102 | 87 | # inform us that it is about to exit. | 90 | # inform us that it is about to exit. |
103 | 88 | wait_evt = threading.Event() | 91 | wait_evt = threading.Event() |
104 | 89 | stopping_evt = threading.Event() | 92 | stopping_evt = threading.Event() |
105 | 93 | |||
106 | 90 | @with_timeout(timeout=0.5) | 94 | @with_timeout(timeout=0.5) |
107 | 91 | def wait_for_event(): | 95 | def wait_for_event(): |
108 | 92 | """Function that waits for a supplied number of seconds.""" | 96 | """Function that waits for a supplied number of seconds.""" |
109 | 93 | wait_evt.wait() | 97 | wait_evt.wait() |
110 | 94 | stopping_evt.set() | 98 | stopping_evt.set() |
111 | 99 | |||
112 | 95 | self.assertRaises(TimeoutError, wait_for_event) | 100 | self.assertRaises(TimeoutError, wait_for_event) |
113 | 96 | wait_evt.set() | 101 | wait_evt.set() |
114 | 97 | stopping_evt.wait() | 102 | stopping_evt.wait() |
115 | @@ -115,6 +120,7 @@ class TestTimeout(TestCase): | |||
116 | 115 | socket.setdefaulttimeout(5) | 120 | socket.setdefaulttimeout(5) |
117 | 116 | sockets = socket.socketpair() | 121 | sockets = socket.socketpair() |
118 | 117 | closed = [] | 122 | closed = [] |
119 | 123 | |||
120 | 118 | def close_socket(): | 124 | def close_socket(): |
121 | 119 | closed.append(True) | 125 | closed.append(True) |
122 | 120 | sockets[0].shutdown(socket.SHUT_RDWR) | 126 | sockets[0].shutdown(socket.SHUT_RDWR) |
123 | @@ -152,16 +158,17 @@ class TestTimeout(TestCase): | |||
124 | 152 | method.""" | 158 | method.""" |
125 | 153 | def do_definition(): | 159 | def do_definition(): |
126 | 154 | @with_timeout(cleanup='not_a_method', timeout=0.5) | 160 | @with_timeout(cleanup='not_a_method', timeout=0.5) |
128 | 155 | def a_function(): pass | 161 | def a_function(): |
129 | 162 | pass | ||
130 | 156 | self.assertRaises(TypeError, do_definition) | 163 | self.assertRaises(TypeError, do_definition) |
131 | 157 | 164 | ||
132 | 158 | def test_timeout_uses_default(self): | 165 | def test_timeout_uses_default(self): |
139 | 159 | """If the timeout parameter isn't provided, it will default to the value | 166 | """If the timeout parameter isn't provided, it will default to the |
140 | 160 | returned by the function installed as "default_timeout_function". A | 167 | value returned by the function installed as |
141 | 161 | function is used because it's useful for the timeout value to be | 168 | "default_timeout_function". A function is used because it's useful |
142 | 162 | determined dynamically. For example, if you want to limit the | 169 | for the timeout value to be determined dynamically. For example, if |
143 | 163 | overall processing to 30s and you already did 14s, you want that timeout | 170 | you want to limit the overall processing to 30s and you already did |
144 | 164 | to be 16s. | 171 | 14s, you want that timeout to be 16s. |
145 | 165 | 172 | ||
146 | 166 | By default, there is no default_timeout_function. | 173 | By default, there is no default_timeout_function. |
147 | 167 | """ | 174 | """ |
148 | @@ -177,23 +184,25 @@ class TestTimeout(TestCase): | |||
149 | 177 | str(e)) | 184 | str(e)) |
150 | 178 | 185 | ||
151 | 179 | def test_set_default_timeout(self): | 186 | def test_set_default_timeout(self): |
154 | 180 | """the set_default_timeout_function() takes a function that should return | 187 | """The set_default_timeout_function() takes a function that should |
155 | 181 | the number of seconds to wait. | 188 | return the number of seconds to wait. |
156 | 182 | """ | 189 | """ |
157 | 183 | using_default = [] | 190 | using_default = [] |
158 | 191 | |||
159 | 184 | def my_default_timeout(): | 192 | def my_default_timeout(): |
160 | 185 | using_default.append(True) | 193 | using_default.append(True) |
161 | 186 | return 1 | 194 | return 1 |
162 | 195 | |||
163 | 187 | set_default_timeout_function(my_default_timeout) | 196 | set_default_timeout_function(my_default_timeout) |
164 | 188 | self.addCleanup(set_default_timeout_function, None) | 197 | self.addCleanup(set_default_timeout_function, None) |
165 | 189 | no_default_timeout() | 198 | no_default_timeout() |
166 | 190 | self.assertEqual([True], using_default) | 199 | self.assertEqual([True], using_default) |
167 | 191 | 200 | ||
168 | 192 | def make_test_socket(self): | 201 | def make_test_socket(self): |
173 | 193 | """One common use case for timing out is when making an HTTP request to | 202 | """One common use case for timing out is when making an HTTP request |
174 | 194 | an external site to fetch content. To this end, the timeout module has | 203 | to an external site to fetch content. To this end, the timeout |
175 | 195 | a urlfetch() function that retrieve a URL using custom urllib2 handlers | 204 | module has a urlfetch() function that retrieves a URL in such a way |
176 | 196 | that will timeout using the default timeout function and clean-up the | 205 | as to timeout using the default timeout function and clean-up the |
177 | 197 | socket properly. | 206 | socket properly. |
178 | 198 | """ | 207 | """ |
179 | 199 | sock = socket.socket() | 208 | sock = socket.socket() |
180 | @@ -206,11 +215,11 @@ class TestTimeout(TestCase): | |||
181 | 206 | http_server_url = 'http://%s:%d/' % sock.getsockname() | 215 | http_server_url = 'http://%s:%d/' % sock.getsockname() |
182 | 207 | return sock, http_server_url | 216 | return sock, http_server_url |
183 | 208 | 217 | ||
186 | 209 | def test_urlfetch_raises_urllib2_exceptions(self): | 218 | def test_urlfetch_raises_requests_exceptions(self): |
187 | 210 | """Normal urllib2 exceptions are raised.""" | 219 | """Normal requests exceptions are raised.""" |
188 | 211 | sock, http_server_url = self.make_test_socket() | 220 | sock, http_server_url = self.make_test_socket() |
189 | 212 | 221 | ||
191 | 213 | e = self.assertRaises(urllib2.URLError, urlfetch, http_server_url) | 222 | e = self.assertRaises(ConnectionError, urlfetch, http_server_url) |
192 | 214 | self.assertIn('Connection refused', str(e)) | 223 | self.assertIn('Connection refused', str(e)) |
193 | 215 | 224 | ||
194 | 216 | def test_urlfetch_timeout_after_listen(self): | 225 | def test_urlfetch_timeout_after_listen(self): |
195 | @@ -237,6 +246,7 @@ class TestTimeout(TestCase): | |||
196 | 237 | sock, http_server_url = self.make_test_socket() | 246 | sock, http_server_url = self.make_test_socket() |
197 | 238 | sock.listen(1) | 247 | sock.listen(1) |
198 | 239 | stop_event = threading.Event() | 248 | stop_event = threading.Event() |
199 | 249 | |||
200 | 240 | def slow_reply(): | 250 | def slow_reply(): |
201 | 241 | (client_sock, client_addr) = sock.accept() | 251 | (client_sock, client_addr) = sock.accept() |
202 | 242 | content = 'You are veeeeryyy patient!' | 252 | content = 'You are veeeeryyy patient!' |
203 | @@ -252,12 +262,15 @@ class TestTimeout(TestCase): | |||
204 | 252 | if stop_event.wait(0.05): | 262 | if stop_event.wait(0.05): |
205 | 253 | break | 263 | break |
206 | 254 | client_sock.close() | 264 | client_sock.close() |
207 | 265 | |||
208 | 255 | slow_thread = threading.Thread(target=slow_reply) | 266 | slow_thread = threading.Thread(target=slow_reply) |
209 | 256 | slow_thread.start() | 267 | slow_thread.start() |
210 | 257 | saved_threads = set(threading.enumerate()) | 268 | saved_threads = set(threading.enumerate()) |
211 | 258 | self.assertRaises(TimeoutError, urlfetch, http_server_url) | 269 | self.assertRaises(TimeoutError, urlfetch, http_server_url) |
214 | 259 | # Note that the cleanup also takes care of leaving no worker thread behind. | 270 | # Note that the cleanup also takes care of leaving no worker thread |
215 | 260 | remaining_threads = set(threading.enumerate()).difference(saved_threads) | 271 | # behind. |
216 | 272 | remaining_threads = set( | ||
217 | 273 | threading.enumerate()).difference(saved_threads) | ||
218 | 261 | self.assertEqual(set(), remaining_threads) | 274 | self.assertEqual(set(), remaining_threads) |
219 | 262 | stop_event.set() | 275 | stop_event.set() |
220 | 263 | slow_thread.join() | 276 | slow_thread.join() |
221 | @@ -266,6 +279,7 @@ class TestTimeout(TestCase): | |||
222 | 266 | """When the request succeeds, the result content is returned.""" | 279 | """When the request succeeds, the result content is returned.""" |
223 | 267 | sock, http_server_url = self.make_test_socket() | 280 | sock, http_server_url = self.make_test_socket() |
224 | 268 | sock.listen(1) | 281 | sock.listen(1) |
225 | 282 | |||
226 | 269 | def success_result(): | 283 | def success_result(): |
227 | 270 | (client_sock, client_addr) = sock.accept() | 284 | (client_sock, client_addr) = sock.accept() |
228 | 271 | client_sock.sendall(dedent("""\ | 285 | client_sock.sendall(dedent("""\ |
229 | @@ -275,17 +289,20 @@ class TestTimeout(TestCase): | |||
230 | 275 | 289 | ||
231 | 276 | Success.""")) | 290 | Success.""")) |
232 | 277 | client_sock.close() | 291 | client_sock.close() |
233 | 292 | |||
234 | 278 | t = threading.Thread(target=success_result) | 293 | t = threading.Thread(target=success_result) |
235 | 279 | t.start() | 294 | t.start() |
236 | 280 | self.assertEqual('Success.', urlfetch(http_server_url)) | 295 | self.assertEqual('Success.', urlfetch(http_server_url)) |
237 | 281 | t.join() | 296 | t.join() |
238 | 282 | 297 | ||
241 | 283 | def test_urlfetch_only_supports_http_urls(self): | 298 | def test_urlfetch_does_not_support_ftp_urls(self): |
242 | 284 | """urlfetch() only supports http urls:""" | 299 | """urlfetch() does not support ftp urls.""" |
243 | 285 | set_default_timeout_function(lambda: 1) | 300 | set_default_timeout_function(lambda: 1) |
244 | 286 | self.addCleanup(set_default_timeout_function, None) | 301 | self.addCleanup(set_default_timeout_function, None) |
247 | 287 | e = self.assertRaises(AssertionError, urlfetch, 'ftp://localhost') | 302 | url = 'ftp://localhost/' |
248 | 288 | self.assertEqual('only http is supported.', str(e)) | 303 | e = self.assertRaises(InvalidSchema, urlfetch, url) |
249 | 304 | self.assertEqual( | ||
250 | 305 | "No connection adapters were found for '%s'" % url, str(e)) | ||
251 | 289 | 306 | ||
252 | 290 | def test_xmlrpc_transport(self): | 307 | def test_xmlrpc_transport(self): |
253 | 291 | """ Another use case for timeouts is communicating with external | 308 | """ Another use case for timeouts is communicating with external |
254 | diff --git a/lib/lp/services/timeout.py b/lib/lp/services/timeout.py | |||
255 | index 877f276..d52c597 100644 | |||
256 | --- a/lib/lp/services/timeout.py | |||
257 | +++ b/lib/lp/services/timeout.py | |||
258 | @@ -1,4 +1,4 @@ | |||
260 | 1 | # Copyright 2009 Canonical Ltd. This software is licensed under the | 1 | # Copyright 2009-2015 Canonical Ltd. This software is licensed under the |
261 | 2 | # GNU Affero General Public License version 3 (see the file LICENSE). | 2 | # GNU Affero General Public License version 3 (see the file LICENSE). |
262 | 3 | 3 | ||
263 | 4 | """Helpers to time out external operations.""" | 4 | """Helpers to time out external operations.""" |
264 | @@ -14,16 +14,32 @@ __all__ = [ | |||
265 | 14 | "with_timeout", | 14 | "with_timeout", |
266 | 15 | ] | 15 | ] |
267 | 16 | 16 | ||
268 | 17 | import httplib | ||
269 | 18 | import socket | 17 | import socket |
270 | 19 | import sys | 18 | import sys |
273 | 20 | from threading import Thread | 19 | from threading import ( |
274 | 21 | import urllib2 | 20 | Lock, |
275 | 21 | Thread, | ||
276 | 22 | ) | ||
277 | 22 | from xmlrpclib import ( | 23 | from xmlrpclib import ( |
278 | 23 | SafeTransport, | 24 | SafeTransport, |
279 | 24 | Transport, | 25 | Transport, |
280 | 25 | ) | 26 | ) |
281 | 26 | 27 | ||
282 | 28 | from requests import Session | ||
283 | 29 | from requests.adapters import ( | ||
284 | 30 | DEFAULT_POOLBLOCK, | ||
285 | 31 | HTTPAdapter, | ||
286 | 32 | ) | ||
287 | 33 | from requests.packages.urllib3.connectionpool import ( | ||
288 | 34 | HTTPConnectionPool, | ||
289 | 35 | HTTPSConnectionPool, | ||
290 | 36 | ) | ||
291 | 37 | from requests.packages.urllib3.exceptions import ClosedPoolError | ||
292 | 38 | from requests.packages.urllib3.poolmanager import ( | ||
293 | 39 | PoolManager, | ||
294 | 40 | SSL_KEYWORDS, | ||
295 | 41 | ) | ||
296 | 42 | |||
297 | 27 | 43 | ||
298 | 28 | default_timeout_function = None | 44 | default_timeout_function = None |
299 | 29 | 45 | ||
300 | @@ -145,47 +161,113 @@ class with_timeout: | |||
301 | 145 | return call_with_timeout | 161 | return call_with_timeout |
302 | 146 | 162 | ||
303 | 147 | 163 | ||
306 | 148 | class CleanableHTTPHandler(urllib2.HTTPHandler): | 164 | class CleanableConnectionPoolMixin: |
307 | 149 | """Subclass of `urllib2.HTTPHandler` that can be cleaned-up.""" | 165 | """Enhance urllib3's connection pools to support forced socket cleanup.""" |
308 | 150 | 166 | ||
316 | 151 | def http_open(self, req): | 167 | def __init__(self, *args, **kwargs): |
317 | 152 | """See `urllib2.HTTPHandler`.""" | 168 | super(CleanableConnectionPoolMixin, self).__init__(*args, **kwargs) |
318 | 153 | def connection_factory(*args, **kwargs): | 169 | self._all_connections = [] |
319 | 154 | """Save the created connection so that we can clean it up.""" | 170 | self._all_connections_mutex = Lock() |
313 | 155 | self.__conn = httplib.HTTPConnection(*args, **kwargs) | ||
314 | 156 | return self.__conn | ||
315 | 157 | return self.do_open(connection_factory, req) | ||
320 | 158 | 171 | ||
323 | 159 | def reset_connection(self): | 172 | def _new_conn(self): |
324 | 160 | """Reset the underlying HTTP connection.""" | 173 | self._all_connections_mutex.acquire() |
325 | 161 | try: | 174 | try: |
332 | 162 | self.__conn.sock.shutdown(socket.SHUT_RDWR) | 175 | if self._all_connections is None: |
333 | 163 | except AttributeError: | 176 | raise ClosedPoolError(self, "Pool is closed.") |
334 | 164 | # It's possible that the other thread closed the socket | 177 | conn = super(CleanableConnectionPoolMixin, self)._new_conn() |
335 | 165 | # beforehand. | 178 | self._all_connections.append(conn) |
336 | 166 | pass | 179 | return conn |
337 | 167 | self.__conn.close() | 180 | finally: |
338 | 181 | self._all_connections_mutex.release() | ||
339 | 182 | |||
340 | 183 | def close(self): | ||
341 | 184 | self._all_connections_mutex.acquire() | ||
342 | 185 | try: | ||
343 | 186 | if self._all_connections is None: | ||
344 | 187 | return | ||
345 | 188 | for conn in self._all_connections: | ||
346 | 189 | sock = getattr(conn, "sock", None) | ||
347 | 190 | if sock is not None: | ||
348 | 191 | sock.shutdown(socket.SHUT_RDWR) | ||
349 | 192 | sock.close() | ||
350 | 193 | conn.sock = None | ||
351 | 194 | self._all_connections = None | ||
352 | 195 | finally: | ||
353 | 196 | self._all_connections_mutex.release() | ||
354 | 197 | super(CleanableConnectionPoolMixin, self).close() | ||
355 | 198 | |||
356 | 199 | |||
357 | 200 | class CleanableHTTPConnectionPool( | ||
358 | 201 | CleanableConnectionPoolMixin, HTTPConnectionPool): | ||
359 | 202 | pass | ||
360 | 203 | |||
361 | 204 | |||
362 | 205 | class CleanableHTTPSConnectionPool( | ||
363 | 206 | CleanableConnectionPoolMixin, HTTPSConnectionPool): | ||
364 | 207 | pass | ||
365 | 208 | |||
366 | 209 | |||
367 | 210 | cleanable_pool_classes_by_scheme = { | ||
368 | 211 | "http": CleanableHTTPConnectionPool, | ||
369 | 212 | "https": CleanableHTTPSConnectionPool, | ||
370 | 213 | } | ||
371 | 214 | |||
372 | 215 | |||
373 | 216 | class CleanablePoolManager(PoolManager): | ||
374 | 217 | """A version of urllib3's PoolManager supporting forced socket cleanup.""" | ||
375 | 218 | |||
376 | 219 | # XXX cjwatson 2015-03-11: Reimplements PoolManager._new_pool; check | ||
377 | 220 | # this when upgrading requests. | ||
378 | 221 | def _new_pool(self, scheme, host, port): | ||
379 | 222 | if scheme not in cleanable_pool_classes_by_scheme: | ||
380 | 223 | raise ValueError("Unhandled scheme: %s" % scheme) | ||
381 | 224 | pool_cls = cleanable_pool_classes_by_scheme[scheme] | ||
382 | 225 | if scheme == 'http': | ||
383 | 226 | kwargs = self.connection_pool_kw.copy() | ||
384 | 227 | for kw in SSL_KEYWORDS: | ||
385 | 228 | kwargs.pop(kw, None) | ||
386 | 229 | |||
387 | 230 | return pool_cls(host, port, **kwargs) | ||
388 | 231 | |||
389 | 232 | |||
390 | 233 | class CleanableHTTPAdapter(HTTPAdapter): | ||
391 | 234 | """Enhance HTTPAdapter to use CleanablePoolManager.""" | ||
392 | 235 | |||
393 | 236 | # XXX cjwatson 2015-03-11: Reimplements HTTPAdapter.init_poolmanager; | ||
394 | 237 | # check this when upgrading requests. | ||
395 | 238 | def init_poolmanager(self, connections, maxsize, block=DEFAULT_POOLBLOCK): | ||
396 | 239 | # save these values for pickling | ||
397 | 240 | self._pool_connections = connections | ||
398 | 241 | self._pool_maxsize = maxsize | ||
399 | 242 | self._pool_block = block | ||
400 | 243 | |||
401 | 244 | self.poolmanager = CleanablePoolManager( | ||
402 | 245 | num_pools=connections, maxsize=maxsize, block=block) | ||
403 | 168 | 246 | ||
404 | 169 | 247 | ||
405 | 170 | class URLFetcher: | 248 | class URLFetcher: |
406 | 171 | """Object fetching remote URLs with a time out.""" | 249 | """Object fetching remote URLs with a time out.""" |
407 | 172 | 250 | ||
408 | 173 | @with_timeout(cleanup='cleanup') | 251 | @with_timeout(cleanup='cleanup') |
410 | 174 | def fetch(self, url, data=None): | 252 | def fetch(self, url, **request_kwargs): |
411 | 175 | """Fetch the URL using a custom HTTP handler supporting timeout.""" | 253 | """Fetch the URL using a custom HTTP handler supporting timeout.""" |
416 | 176 | assert url.startswith('http://'), "only http is supported." | 254 | request_kwargs.setdefault("method", "GET") |
417 | 177 | self.handler = CleanableHTTPHandler() | 255 | self.session = Session() |
418 | 178 | opener = urllib2.build_opener(self.handler) | 256 | # Don't honour things like environment proxy configuration. |
419 | 179 | return opener.open(url, data).read() | 257 | self.session.trust_env = False |
420 | 258 | # Mount our custom adapters. | ||
421 | 259 | self.session.mount("https://", CleanableHTTPAdapter()) | ||
422 | 260 | self.session.mount("http://", CleanableHTTPAdapter()) | ||
423 | 261 | return self.session.request(url=url, **request_kwargs).content | ||
424 | 180 | 262 | ||
425 | 181 | def cleanup(self): | 263 | def cleanup(self): |
426 | 182 | """Reset the connection when the operation timed out.""" | 264 | """Reset the connection when the operation timed out.""" |
428 | 183 | self.handler.reset_connection() | 265 | self.session.close() |
429 | 184 | 266 | ||
430 | 185 | 267 | ||
434 | 186 | def urlfetch(url, data=None): | 268 | def urlfetch(url, **request_kwargs): |
435 | 187 | """Wrapper for `urllib2.urlopen()` that times out.""" | 269 | """Wrapper for `requests.get()` that times out.""" |
436 | 188 | return URLFetcher().fetch(url, data) | 270 | return URLFetcher().fetch(url, **request_kwargs) |
437 | 189 | 271 | ||
438 | 190 | 272 | ||
439 | 191 | class TransportWithTimeout(Transport): | 273 | class TransportWithTimeout(Transport): |