Merge ~canonical-launchpad-branches/launchpad/+git/bzr-personal:cjwatson/timeout-with-requests into launchpad:master

Proposed by William Grant
Status: Merged
Merged at revision: 59ad69fc9edb62336f98e3883e91341cb4876d58
Proposed branch: ~canonical-launchpad-branches/launchpad/+git/bzr-personal:cjwatson/timeout-with-requests
Merge into: launchpad:master
Diff against target: 439 lines (+167/-60)
3 files modified
lib/lp/services/googlesearch/doc/google-searchservice.txt (+8/-0)
lib/lp/services/tests/test_timeout.py (+47/-30)
lib/lp/services/timeout.py (+112/-30)
Reviewer | Review Type | Date Requested | Status
Canonical Launchpad Branches | | | Pending
Review via email: mp+264221@code.launchpad.net
To post a comment you must log in.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
diff --git a/lib/lp/services/googlesearch/doc/google-searchservice.txt b/lib/lp/services/googlesearch/doc/google-searchservice.txt
index 88c5646..17763b5 100644
--- a/lib/lp/services/googlesearch/doc/google-searchservice.txt
+++ b/lib/lp/services/googlesearch/doc/google-searchservice.txt
@@ -6,6 +6,12 @@ The GoogleSearchService is a Google Custom Service Business Edition
 (cs-be) client. Given one or more terms, it will retrieve an XML
 summary of the matching launchpad.net pages.
 
+We silence logging of new HTTP connections from requests throughout.
+
+ >>> from fixtures import FakeLogger
+ >>> logger = FakeLogger()
+ >>> logger.setUp()
+
 
 GoogleSearchService
 ===================
@@ -616,3 +622,5 @@ cases a TimeoutError is issued.
  # Restore the configuration and the timeout state.
  >>> timeout_data = config.pop('timeout_data')
  >>> set_default_timeout_function(old_timeout_function)
+
+ >>> logger.cleanUp()
diff --git a/lib/lp/services/tests/test_timeout.py b/lib/lp/services/tests/test_timeout.py
index 6a20797..e34d7de 100644
--- a/lib/lp/services/tests/test_timeout.py
+++ b/lib/lp/services/tests/test_timeout.py
@@ -1,4 +1,4 @@
1# Copyright 2012 Canonical Ltd. This software is licensed under the1# Copyright 2012-2015 Canonical Ltd. This software is licensed under the
2# GNU Affero General Public License version 3 (see the file LICENSE).2# GNU Affero General Public License version 3 (see the file LICENSE).
33
4"""timeout.py tests.4"""timeout.py tests.
@@ -6,7 +6,6 @@
66
7__metaclass__ = type7__metaclass__ = type
88
9from cStringIO import StringIO
10from SimpleXMLRPCServer import (9from SimpleXMLRPCServer import (
11 SimpleXMLRPCRequestHandler,10 SimpleXMLRPCRequestHandler,
12 SimpleXMLRPCServer,11 SimpleXMLRPCServer,
@@ -14,13 +13,13 @@ from SimpleXMLRPCServer import (
14import socket13import socket
15from textwrap import dedent14from textwrap import dedent
16import threading15import threading
17import time
18import urllib2
19import xmlrpclib16import xmlrpclib
2017
21from zope.interface import implements18from requests.exceptions import (
19 ConnectionError,
20 InvalidSchema,
21 )
2222
23from lp.services.log.logger import FakeLogger
24from lp.services.timeout import (23from lp.services.timeout import (
25 get_default_timeout_function,24 get_default_timeout_function,
26 set_default_timeout_function,25 set_default_timeout_function,
@@ -30,11 +29,11 @@ from lp.services.timeout import (
30 with_timeout,29 with_timeout,
31 )30 )
32from lp.testing import TestCase31from lp.testing import TestCase
33from lp.testing.layers import BaseLayer
3432
3533
36@with_timeout()34@with_timeout()
37def no_default_timeout(): pass35def no_default_timeout():
36 pass
3837
3938
40class EchoOrWaitXMLRPCReqHandler(SimpleXMLRPCRequestHandler):39class EchoOrWaitXMLRPCReqHandler(SimpleXMLRPCRequestHandler):
@@ -54,10 +53,12 @@ class EchoOrWaitXMLRPCReqHandler(SimpleXMLRPCRequestHandler):
54class MySimpleXMLRPCServer(SimpleXMLRPCServer):53class MySimpleXMLRPCServer(SimpleXMLRPCServer):
55 """Create a simple XMLRPC server to listen for requests."""54 """Create a simple XMLRPC server to listen for requests."""
56 allow_reuse_address = True55 allow_reuse_address = True
56
57 def serve_2_requests(self):57 def serve_2_requests(self):
58 for i in range(2):58 for i in range(2):
59 self.handle_request()59 self.handle_request()
60 self.server_close()60 self.server_close()
61
61 def handle_error(self, request, address):62 def handle_error(self, request, address):
62 pass63 pass
6364
@@ -69,11 +70,13 @@ class TestTimeout(TestCase):
69 finishes before the supplied timeout, it should function normally.70 finishes before the supplied timeout, it should function normally.
70 """71 """
71 wait_evt = threading.Event()72 wait_evt = threading.Event()
73
72 @with_timeout(timeout=0.5)74 @with_timeout(timeout=0.5)
73 def wait_100ms():75 def wait_100ms():
74 """Function that waits for a supplied number of seconds."""76 """Function that waits for a supplied number of seconds."""
75 wait_evt.wait(0.1)77 wait_evt.wait(0.1)
76 return "Succeeded."78 return "Succeeded."
79
77 self.assertEqual("Succeeded.", wait_100ms())80 self.assertEqual("Succeeded.", wait_100ms())
7881
79 def test_timeout_overrun(self):82 def test_timeout_overrun(self):
@@ -87,11 +90,13 @@ class TestTimeout(TestCase):
87 # inform us that it is about to exit.90 # inform us that it is about to exit.
88 wait_evt = threading.Event()91 wait_evt = threading.Event()
89 stopping_evt = threading.Event()92 stopping_evt = threading.Event()
93
90 @with_timeout(timeout=0.5)94 @with_timeout(timeout=0.5)
91 def wait_for_event():95 def wait_for_event():
92 """Function that waits for a supplied number of seconds."""96 """Function that waits for a supplied number of seconds."""
93 wait_evt.wait()97 wait_evt.wait()
94 stopping_evt.set()98 stopping_evt.set()
99
95 self.assertRaises(TimeoutError, wait_for_event)100 self.assertRaises(TimeoutError, wait_for_event)
96 wait_evt.set()101 wait_evt.set()
97 stopping_evt.wait()102 stopping_evt.wait()
@@ -115,6 +120,7 @@ class TestTimeout(TestCase):
115 socket.setdefaulttimeout(5)120 socket.setdefaulttimeout(5)
116 sockets = socket.socketpair()121 sockets = socket.socketpair()
117 closed = []122 closed = []
123
118 def close_socket():124 def close_socket():
119 closed.append(True)125 closed.append(True)
120 sockets[0].shutdown(socket.SHUT_RDWR)126 sockets[0].shutdown(socket.SHUT_RDWR)
@@ -152,16 +158,17 @@ class TestTimeout(TestCase):
152 method."""158 method."""
153 def do_definition():159 def do_definition():
154 @with_timeout(cleanup='not_a_method', timeout=0.5)160 @with_timeout(cleanup='not_a_method', timeout=0.5)
155 def a_function(): pass161 def a_function():
162 pass
156 self.assertRaises(TypeError, do_definition)163 self.assertRaises(TypeError, do_definition)
157164
158 def test_timeout_uses_default(self):165 def test_timeout_uses_default(self):
159 """If the timeout parameter isn't provided, it will default to the value166 """If the timeout parameter isn't provided, it will default to the
160 returned by the function installed as "default_timeout_function". A167 value returned by the function installed as
161 function is used because it's useful for the timeout value to be168 "default_timeout_function". A function is used because it's useful
162 determined dynamically. For example, if you want to limit the169 for the timeout value to be determined dynamically. For example, if
163 overall processing to 30s and you already did 14s, you want that timeout170 you want to limit the overall processing to 30s and you already did
164 to be 16s.171 14s, you want that timeout to be 16s.
165172
166 By default, there is no default_timeout_function.173 By default, there is no default_timeout_function.
167 """174 """
@@ -177,23 +184,25 @@ class TestTimeout(TestCase):
177 str(e))184 str(e))
178185
179 def test_set_default_timeout(self):186 def test_set_default_timeout(self):
180 """the set_default_timeout_function() takes a function that should return187 """The set_default_timeout_function() takes a function that should
181 the number of seconds to wait.188 return the number of seconds to wait.
182 """189 """
183 using_default = []190 using_default = []
191
184 def my_default_timeout():192 def my_default_timeout():
185 using_default.append(True)193 using_default.append(True)
186 return 1194 return 1
195
187 set_default_timeout_function(my_default_timeout)196 set_default_timeout_function(my_default_timeout)
188 self.addCleanup(set_default_timeout_function, None)197 self.addCleanup(set_default_timeout_function, None)
189 no_default_timeout()198 no_default_timeout()
190 self.assertEqual([True], using_default)199 self.assertEqual([True], using_default)
191200
192 def make_test_socket(self):201 def make_test_socket(self):
193 """One common use case for timing out is when making an HTTP request to202 """One common use case for timing out is when making an HTTP request
194 an external site to fetch content. To this end, the timeout module has203 to an external site to fetch content. To this end, the timeout
195 a urlfetch() function that retrieve a URL using custom urllib2 handlers204 module has a urlfetch() function that retrieves a URL in such a way
196 that will timeout using the default timeout function and clean-up the205 as to timeout using the default timeout function and clean-up the
197 socket properly.206 socket properly.
198 """207 """
199 sock = socket.socket()208 sock = socket.socket()
@@ -206,11 +215,11 @@ class TestTimeout(TestCase):
206 http_server_url = 'http://%s:%d/' % sock.getsockname()215 http_server_url = 'http://%s:%d/' % sock.getsockname()
207 return sock, http_server_url216 return sock, http_server_url
208217
209 def test_urlfetch_raises_urllib2_exceptions(self):218 def test_urlfetch_raises_requests_exceptions(self):
210 """Normal urllib2 exceptions are raised."""219 """Normal requests exceptions are raised."""
211 sock, http_server_url = self.make_test_socket()220 sock, http_server_url = self.make_test_socket()
212221
213 e = self.assertRaises(urllib2.URLError, urlfetch, http_server_url)222 e = self.assertRaises(ConnectionError, urlfetch, http_server_url)
214 self.assertIn('Connection refused', str(e))223 self.assertIn('Connection refused', str(e))
215224
216 def test_urlfetch_timeout_after_listen(self):225 def test_urlfetch_timeout_after_listen(self):
@@ -237,6 +246,7 @@ class TestTimeout(TestCase):
237 sock, http_server_url = self.make_test_socket()246 sock, http_server_url = self.make_test_socket()
238 sock.listen(1)247 sock.listen(1)
239 stop_event = threading.Event()248 stop_event = threading.Event()
249
240 def slow_reply():250 def slow_reply():
241 (client_sock, client_addr) = sock.accept()251 (client_sock, client_addr) = sock.accept()
242 content = 'You are veeeeryyy patient!'252 content = 'You are veeeeryyy patient!'
@@ -252,12 +262,15 @@ class TestTimeout(TestCase):
252 if stop_event.wait(0.05):262 if stop_event.wait(0.05):
253 break263 break
254 client_sock.close()264 client_sock.close()
265
255 slow_thread = threading.Thread(target=slow_reply)266 slow_thread = threading.Thread(target=slow_reply)
256 slow_thread.start()267 slow_thread.start()
257 saved_threads = set(threading.enumerate())268 saved_threads = set(threading.enumerate())
258 self.assertRaises(TimeoutError, urlfetch, http_server_url)269 self.assertRaises(TimeoutError, urlfetch, http_server_url)
259 # Note that the cleanup also takes care of leaving no worker thread behind.270 # Note that the cleanup also takes care of leaving no worker thread
260 remaining_threads = set(threading.enumerate()).difference(saved_threads)271 # behind.
272 remaining_threads = set(
273 threading.enumerate()).difference(saved_threads)
261 self.assertEqual(set(), remaining_threads)274 self.assertEqual(set(), remaining_threads)
262 stop_event.set()275 stop_event.set()
263 slow_thread.join()276 slow_thread.join()
@@ -266,6 +279,7 @@ class TestTimeout(TestCase):
266 """When the request succeeds, the result content is returned."""279 """When the request succeeds, the result content is returned."""
267 sock, http_server_url = self.make_test_socket()280 sock, http_server_url = self.make_test_socket()
268 sock.listen(1)281 sock.listen(1)
282
269 def success_result():283 def success_result():
270 (client_sock, client_addr) = sock.accept()284 (client_sock, client_addr) = sock.accept()
271 client_sock.sendall(dedent("""\285 client_sock.sendall(dedent("""\
@@ -275,17 +289,20 @@ class TestTimeout(TestCase):
275289
276 Success."""))290 Success."""))
277 client_sock.close()291 client_sock.close()
292
278 t = threading.Thread(target=success_result)293 t = threading.Thread(target=success_result)
279 t.start()294 t.start()
280 self.assertEqual('Success.', urlfetch(http_server_url))295 self.assertEqual('Success.', urlfetch(http_server_url))
281 t.join()296 t.join()
282297
283 def test_urlfetch_only_supports_http_urls(self):298 def test_urlfetch_does_not_support_ftp_urls(self):
284 """urlfetch() only supports http urls:"""299 """urlfetch() does not support ftp urls."""
285 set_default_timeout_function(lambda: 1)300 set_default_timeout_function(lambda: 1)
286 self.addCleanup(set_default_timeout_function, None)301 self.addCleanup(set_default_timeout_function, None)
287 e = self.assertRaises(AssertionError, urlfetch, 'ftp://localhost')302 url = 'ftp://localhost/'
288 self.assertEqual('only http is supported.', str(e))303 e = self.assertRaises(InvalidSchema, urlfetch, url)
304 self.assertEqual(
305 "No connection adapters were found for '%s'" % url, str(e))
289306
290 def test_xmlrpc_transport(self):307 def test_xmlrpc_transport(self):
291 """ Another use case for timeouts is communicating with external308 """ Another use case for timeouts is communicating with external
diff --git a/lib/lp/services/timeout.py b/lib/lp/services/timeout.py
index 877f276..d52c597 100644
--- a/lib/lp/services/timeout.py
+++ b/lib/lp/services/timeout.py
@@ -1,4 +1,4 @@
1# Copyright 2009 Canonical Ltd. This software is licensed under the1# Copyright 2009-2015 Canonical Ltd. This software is licensed under the
2# GNU Affero General Public License version 3 (see the file LICENSE).2# GNU Affero General Public License version 3 (see the file LICENSE).
33
4"""Helpers to time out external operations."""4"""Helpers to time out external operations."""
@@ -14,16 +14,32 @@ __all__ = [
14 "with_timeout",14 "with_timeout",
15 ]15 ]
1616
17import httplib
18import socket17import socket
19import sys18import sys
20from threading import Thread19from threading import (
21import urllib220 Lock,
21 Thread,
22 )
22from xmlrpclib import (23from xmlrpclib import (
23 SafeTransport,24 SafeTransport,
24 Transport,25 Transport,
25 )26 )
2627
28from requests import Session
29from requests.adapters import (
30 DEFAULT_POOLBLOCK,
31 HTTPAdapter,
32 )
33from requests.packages.urllib3.connectionpool import (
34 HTTPConnectionPool,
35 HTTPSConnectionPool,
36 )
37from requests.packages.urllib3.exceptions import ClosedPoolError
38from requests.packages.urllib3.poolmanager import (
39 PoolManager,
40 SSL_KEYWORDS,
41 )
42
2743
28default_timeout_function = None44default_timeout_function = None
2945
@@ -145,47 +161,113 @@ class with_timeout:
145 return call_with_timeout161 return call_with_timeout
146162
147163
148class CleanableHTTPHandler(urllib2.HTTPHandler):164class CleanableConnectionPoolMixin:
149 """Subclass of `urllib2.HTTPHandler` that can be cleaned-up."""165 """Enhance urllib3's connection pools to support forced socket cleanup."""
150166
151 def http_open(self, req):167 def __init__(self, *args, **kwargs):
152 """See `urllib2.HTTPHandler`."""168 super(CleanableConnectionPoolMixin, self).__init__(*args, **kwargs)
153 def connection_factory(*args, **kwargs):169 self._all_connections = []
154 """Save the created connection so that we can clean it up."""170 self._all_connections_mutex = Lock()
155 self.__conn = httplib.HTTPConnection(*args, **kwargs)
156 return self.__conn
157 return self.do_open(connection_factory, req)
158171
159 def reset_connection(self):172 def _new_conn(self):
160 """Reset the underlying HTTP connection."""173 self._all_connections_mutex.acquire()
161 try:174 try:
162 self.__conn.sock.shutdown(socket.SHUT_RDWR)175 if self._all_connections is None:
163 except AttributeError:176 raise ClosedPoolError(self, "Pool is closed.")
164 # It's possible that the other thread closed the socket177 conn = super(CleanableConnectionPoolMixin, self)._new_conn()
165 # beforehand.178 self._all_connections.append(conn)
166 pass179 return conn
167 self.__conn.close()180 finally:
181 self._all_connections_mutex.release()
182
183 def close(self):
184 self._all_connections_mutex.acquire()
185 try:
186 if self._all_connections is None:
187 return
188 for conn in self._all_connections:
189 sock = getattr(conn, "sock", None)
190 if sock is not None:
191 sock.shutdown(socket.SHUT_RDWR)
192 sock.close()
193 conn.sock = None
194 self._all_connections = None
195 finally:
196 self._all_connections_mutex.release()
197 super(CleanableConnectionPoolMixin, self).close()
198
199
200class CleanableHTTPConnectionPool(
201 CleanableConnectionPoolMixin, HTTPConnectionPool):
202 pass
203
204
205class CleanableHTTPSConnectionPool(
206 CleanableConnectionPoolMixin, HTTPSConnectionPool):
207 pass
208
209
210cleanable_pool_classes_by_scheme = {
211 "http": CleanableHTTPConnectionPool,
212 "https": CleanableHTTPSConnectionPool,
213 }
214
215
216class CleanablePoolManager(PoolManager):
217 """A version of urllib3's PoolManager supporting forced socket cleanup."""
218
219 # XXX cjwatson 2015-03-11: Reimplements PoolManager._new_pool; check
220 # this when upgrading requests.
221 def _new_pool(self, scheme, host, port):
222 if scheme not in cleanable_pool_classes_by_scheme:
223 raise ValueError("Unhandled scheme: %s" % scheme)
224 pool_cls = cleanable_pool_classes_by_scheme[scheme]
225 if scheme == 'http':
226 kwargs = self.connection_pool_kw.copy()
227 for kw in SSL_KEYWORDS:
228 kwargs.pop(kw, None)
229
230 return pool_cls(host, port, **kwargs)
231
232
233class CleanableHTTPAdapter(HTTPAdapter):
234 """Enhance HTTPAdapter to use CleanablePoolManager."""
235
236 # XXX cjwatson 2015-03-11: Reimplements HTTPAdapter.init_poolmanager;
237 # check this when upgrading requests.
238 def init_poolmanager(self, connections, maxsize, block=DEFAULT_POOLBLOCK):
239 # save these values for pickling
240 self._pool_connections = connections
241 self._pool_maxsize = maxsize
242 self._pool_block = block
243
244 self.poolmanager = CleanablePoolManager(
245 num_pools=connections, maxsize=maxsize, block=block)
168246
169247
170class URLFetcher:248class URLFetcher:
171 """Object fetching remote URLs with a time out."""249 """Object fetching remote URLs with a time out."""
172250
173 @with_timeout(cleanup='cleanup')251 @with_timeout(cleanup='cleanup')
174 def fetch(self, url, data=None):252 def fetch(self, url, **request_kwargs):
175 """Fetch the URL using a custom HTTP handler supporting timeout."""253 """Fetch the URL using a custom HTTP handler supporting timeout."""
176 assert url.startswith('http://'), "only http is supported."254 request_kwargs.setdefault("method", "GET")
177 self.handler = CleanableHTTPHandler()255 self.session = Session()
178 opener = urllib2.build_opener(self.handler)256 # Don't honour things like environment proxy configuration.
179 return opener.open(url, data).read()257 self.session.trust_env = False
258 # Mount our custom adapters.
259 self.session.mount("https://", CleanableHTTPAdapter())
260 self.session.mount("http://", CleanableHTTPAdapter())
261 return self.session.request(url=url, **request_kwargs).content
180262
181 def cleanup(self):263 def cleanup(self):
182 """Reset the connection when the operation timed out."""264 """Reset the connection when the operation timed out."""
183 self.handler.reset_connection()265 self.session.close()
184266
185267
186def urlfetch(url, data=None):268def urlfetch(url, **request_kwargs):
187 """Wrapper for `urllib2.urlopen()` that times out."""269 """Wrapper for `requests.get()` that times out."""
188 return URLFetcher().fetch(url, data)270 return URLFetcher().fetch(url, **request_kwargs)
189271
190272
191class TransportWithTimeout(Transport):273class TransportWithTimeout(Transport):