Merge ~canonical-launchpad-branches/launchpad/+git/bzr-personal:cjwatson/timeout-with-requests into launchpad:master

Proposed by William Grant
Status: Merged
Merged at revision: 59ad69fc9edb62336f98e3883e91341cb4876d58
Proposed branch: ~canonical-launchpad-branches/launchpad/+git/bzr-personal:cjwatson/timeout-with-requests
Merge into: launchpad:master
Diff against target: 439 lines (+167/-60)
3 files modified
lib/lp/services/googlesearch/doc/google-searchservice.txt (+8/-0)
lib/lp/services/tests/test_timeout.py (+47/-30)
lib/lp/services/timeout.py (+112/-30)
Reviewer | Review Type | Date Requested | Status
Canonical Launchpad Branches | | | Pending
Review via email: mp+264221@code.launchpad.net
To post a comment you must log in.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1diff --git a/lib/lp/services/googlesearch/doc/google-searchservice.txt b/lib/lp/services/googlesearch/doc/google-searchservice.txt
2index 88c5646..17763b5 100644
3--- a/lib/lp/services/googlesearch/doc/google-searchservice.txt
4+++ b/lib/lp/services/googlesearch/doc/google-searchservice.txt
5@@ -6,6 +6,12 @@ The GoogleSearchService is a Google Custom Service Business Edition
6 (cs-be) client. Given one or more terms, it will retrieve an XML
7 summary of the matching launchpad.net pages.
8
9+We silence logging of new HTTP connections from requests throughout.
10+
11+ >>> from fixtures import FakeLogger
12+ >>> logger = FakeLogger()
13+ >>> logger.setUp()
14+
15
16 GoogleSearchService
17 ===================
18@@ -616,3 +622,5 @@ cases a TimeoutError is issued.
19 # Restore the configuration and the timeout state.
20 >>> timeout_data = config.pop('timeout_data')
21 >>> set_default_timeout_function(old_timeout_function)
22+
23+ >>> logger.cleanUp()
24diff --git a/lib/lp/services/tests/test_timeout.py b/lib/lp/services/tests/test_timeout.py
25index 6a20797..e34d7de 100644
26--- a/lib/lp/services/tests/test_timeout.py
27+++ b/lib/lp/services/tests/test_timeout.py
28@@ -1,4 +1,4 @@
29-# Copyright 2012 Canonical Ltd. This software is licensed under the
30+# Copyright 2012-2015 Canonical Ltd. This software is licensed under the
31 # GNU Affero General Public License version 3 (see the file LICENSE).
32
33 """timeout.py tests.
34@@ -6,7 +6,6 @@
35
36 __metaclass__ = type
37
38-from cStringIO import StringIO
39 from SimpleXMLRPCServer import (
40 SimpleXMLRPCRequestHandler,
41 SimpleXMLRPCServer,
42@@ -14,13 +13,13 @@ from SimpleXMLRPCServer import (
43 import socket
44 from textwrap import dedent
45 import threading
46-import time
47-import urllib2
48 import xmlrpclib
49
50-from zope.interface import implements
51+from requests.exceptions import (
52+ ConnectionError,
53+ InvalidSchema,
54+ )
55
56-from lp.services.log.logger import FakeLogger
57 from lp.services.timeout import (
58 get_default_timeout_function,
59 set_default_timeout_function,
60@@ -30,11 +29,11 @@ from lp.services.timeout import (
61 with_timeout,
62 )
63 from lp.testing import TestCase
64-from lp.testing.layers import BaseLayer
65
66
67 @with_timeout()
68-def no_default_timeout(): pass
69+def no_default_timeout():
70+ pass
71
72
73 class EchoOrWaitXMLRPCReqHandler(SimpleXMLRPCRequestHandler):
74@@ -54,10 +53,12 @@ class EchoOrWaitXMLRPCReqHandler(SimpleXMLRPCRequestHandler):
75 class MySimpleXMLRPCServer(SimpleXMLRPCServer):
76 """Create a simple XMLRPC server to listen for requests."""
77 allow_reuse_address = True
78+
79 def serve_2_requests(self):
80 for i in range(2):
81 self.handle_request()
82 self.server_close()
83+
84 def handle_error(self, request, address):
85 pass
86
87@@ -69,11 +70,13 @@ class TestTimeout(TestCase):
88 finishes before the supplied timeout, it should function normally.
89 """
90 wait_evt = threading.Event()
91+
92 @with_timeout(timeout=0.5)
93 def wait_100ms():
94 """Function that waits for a supplied number of seconds."""
95 wait_evt.wait(0.1)
96 return "Succeeded."
97+
98 self.assertEqual("Succeeded.", wait_100ms())
99
100 def test_timeout_overrun(self):
101@@ -87,11 +90,13 @@ class TestTimeout(TestCase):
102 # inform us that it is about to exit.
103 wait_evt = threading.Event()
104 stopping_evt = threading.Event()
105+
106 @with_timeout(timeout=0.5)
107 def wait_for_event():
108 """Function that waits for a supplied number of seconds."""
109 wait_evt.wait()
110 stopping_evt.set()
111+
112 self.assertRaises(TimeoutError, wait_for_event)
113 wait_evt.set()
114 stopping_evt.wait()
115@@ -115,6 +120,7 @@ class TestTimeout(TestCase):
116 socket.setdefaulttimeout(5)
117 sockets = socket.socketpair()
118 closed = []
119+
120 def close_socket():
121 closed.append(True)
122 sockets[0].shutdown(socket.SHUT_RDWR)
123@@ -152,16 +158,17 @@ class TestTimeout(TestCase):
124 method."""
125 def do_definition():
126 @with_timeout(cleanup='not_a_method', timeout=0.5)
127- def a_function(): pass
128+ def a_function():
129+ pass
130 self.assertRaises(TypeError, do_definition)
131
132 def test_timeout_uses_default(self):
133- """If the timeout parameter isn't provided, it will default to the value
134- returned by the function installed as "default_timeout_function". A
135- function is used because it's useful for the timeout value to be
136- determined dynamically. For example, if you want to limit the
137- overall processing to 30s and you already did 14s, you want that timeout
138- to be 16s.
139+ """If the timeout parameter isn't provided, it will default to the
140+ value returned by the function installed as
141+ "default_timeout_function". A function is used because it's useful
142+ for the timeout value to be determined dynamically. For example, if
143+ you want to limit the overall processing to 30s and you already did
144+ 14s, you want that timeout to be 16s.
145
146 By default, there is no default_timeout_function.
147 """
148@@ -177,23 +184,25 @@ class TestTimeout(TestCase):
149 str(e))
150
151 def test_set_default_timeout(self):
152- """the set_default_timeout_function() takes a function that should return
153- the number of seconds to wait.
154+ """The set_default_timeout_function() takes a function that should
155+ return the number of seconds to wait.
156 """
157 using_default = []
158+
159 def my_default_timeout():
160 using_default.append(True)
161 return 1
162+
163 set_default_timeout_function(my_default_timeout)
164 self.addCleanup(set_default_timeout_function, None)
165 no_default_timeout()
166 self.assertEqual([True], using_default)
167
168 def make_test_socket(self):
169- """One common use case for timing out is when making an HTTP request to
170- an external site to fetch content. To this end, the timeout module has
171- a urlfetch() function that retrieve a URL using custom urllib2 handlers
172- that will timeout using the default timeout function and clean-up the
173+ """One common use case for timing out is when making an HTTP request
174+ to an external site to fetch content. To this end, the timeout
175+ module has a urlfetch() function that retrieves a URL in such a way
176+ as to timeout using the default timeout function and clean-up the
177 socket properly.
178 """
179 sock = socket.socket()
180@@ -206,11 +215,11 @@ class TestTimeout(TestCase):
181 http_server_url = 'http://%s:%d/' % sock.getsockname()
182 return sock, http_server_url
183
184- def test_urlfetch_raises_urllib2_exceptions(self):
185- """Normal urllib2 exceptions are raised."""
186+ def test_urlfetch_raises_requests_exceptions(self):
187+ """Normal requests exceptions are raised."""
188 sock, http_server_url = self.make_test_socket()
189
190- e = self.assertRaises(urllib2.URLError, urlfetch, http_server_url)
191+ e = self.assertRaises(ConnectionError, urlfetch, http_server_url)
192 self.assertIn('Connection refused', str(e))
193
194 def test_urlfetch_timeout_after_listen(self):
195@@ -237,6 +246,7 @@ class TestTimeout(TestCase):
196 sock, http_server_url = self.make_test_socket()
197 sock.listen(1)
198 stop_event = threading.Event()
199+
200 def slow_reply():
201 (client_sock, client_addr) = sock.accept()
202 content = 'You are veeeeryyy patient!'
203@@ -252,12 +262,15 @@ class TestTimeout(TestCase):
204 if stop_event.wait(0.05):
205 break
206 client_sock.close()
207+
208 slow_thread = threading.Thread(target=slow_reply)
209 slow_thread.start()
210 saved_threads = set(threading.enumerate())
211 self.assertRaises(TimeoutError, urlfetch, http_server_url)
212- # Note that the cleanup also takes care of leaving no worker thread behind.
213- remaining_threads = set(threading.enumerate()).difference(saved_threads)
214+ # Note that the cleanup also takes care of leaving no worker thread
215+ # behind.
216+ remaining_threads = set(
217+ threading.enumerate()).difference(saved_threads)
218 self.assertEqual(set(), remaining_threads)
219 stop_event.set()
220 slow_thread.join()
221@@ -266,6 +279,7 @@ class TestTimeout(TestCase):
222 """When the request succeeds, the result content is returned."""
223 sock, http_server_url = self.make_test_socket()
224 sock.listen(1)
225+
226 def success_result():
227 (client_sock, client_addr) = sock.accept()
228 client_sock.sendall(dedent("""\
229@@ -275,17 +289,20 @@ class TestTimeout(TestCase):
230
231 Success."""))
232 client_sock.close()
233+
234 t = threading.Thread(target=success_result)
235 t.start()
236 self.assertEqual('Success.', urlfetch(http_server_url))
237 t.join()
238
239- def test_urlfetch_only_supports_http_urls(self):
240- """urlfetch() only supports http urls:"""
241+ def test_urlfetch_does_not_support_ftp_urls(self):
242+ """urlfetch() does not support ftp urls."""
243 set_default_timeout_function(lambda: 1)
244 self.addCleanup(set_default_timeout_function, None)
245- e = self.assertRaises(AssertionError, urlfetch, 'ftp://localhost')
246- self.assertEqual('only http is supported.', str(e))
247+ url = 'ftp://localhost/'
248+ e = self.assertRaises(InvalidSchema, urlfetch, url)
249+ self.assertEqual(
250+ "No connection adapters were found for '%s'" % url, str(e))
251
252 def test_xmlrpc_transport(self):
253 """ Another use case for timeouts is communicating with external
254diff --git a/lib/lp/services/timeout.py b/lib/lp/services/timeout.py
255index 877f276..d52c597 100644
256--- a/lib/lp/services/timeout.py
257+++ b/lib/lp/services/timeout.py
258@@ -1,4 +1,4 @@
259-# Copyright 2009 Canonical Ltd. This software is licensed under the
260+# Copyright 2009-2015 Canonical Ltd. This software is licensed under the
261 # GNU Affero General Public License version 3 (see the file LICENSE).
262
263 """Helpers to time out external operations."""
264@@ -14,16 +14,32 @@ __all__ = [
265 "with_timeout",
266 ]
267
268-import httplib
269 import socket
270 import sys
271-from threading import Thread
272-import urllib2
273+from threading import (
274+ Lock,
275+ Thread,
276+ )
277 from xmlrpclib import (
278 SafeTransport,
279 Transport,
280 )
281
282+from requests import Session
283+from requests.adapters import (
284+ DEFAULT_POOLBLOCK,
285+ HTTPAdapter,
286+ )
287+from requests.packages.urllib3.connectionpool import (
288+ HTTPConnectionPool,
289+ HTTPSConnectionPool,
290+ )
291+from requests.packages.urllib3.exceptions import ClosedPoolError
292+from requests.packages.urllib3.poolmanager import (
293+ PoolManager,
294+ SSL_KEYWORDS,
295+ )
296+
297
298 default_timeout_function = None
299
300@@ -145,47 +161,113 @@ class with_timeout:
301 return call_with_timeout
302
303
304-class CleanableHTTPHandler(urllib2.HTTPHandler):
305- """Subclass of `urllib2.HTTPHandler` that can be cleaned-up."""
306+class CleanableConnectionPoolMixin:
307+ """Enhance urllib3's connection pools to support forced socket cleanup."""
308
309- def http_open(self, req):
310- """See `urllib2.HTTPHandler`."""
311- def connection_factory(*args, **kwargs):
312- """Save the created connection so that we can clean it up."""
313- self.__conn = httplib.HTTPConnection(*args, **kwargs)
314- return self.__conn
315- return self.do_open(connection_factory, req)
316+ def __init__(self, *args, **kwargs):
317+ super(CleanableConnectionPoolMixin, self).__init__(*args, **kwargs)
318+ self._all_connections = []
319+ self._all_connections_mutex = Lock()
320
321- def reset_connection(self):
322- """Reset the underlying HTTP connection."""
323+ def _new_conn(self):
324+ self._all_connections_mutex.acquire()
325 try:
326- self.__conn.sock.shutdown(socket.SHUT_RDWR)
327- except AttributeError:
328- # It's possible that the other thread closed the socket
329- # beforehand.
330- pass
331- self.__conn.close()
332+ if self._all_connections is None:
333+ raise ClosedPoolError(self, "Pool is closed.")
334+ conn = super(CleanableConnectionPoolMixin, self)._new_conn()
335+ self._all_connections.append(conn)
336+ return conn
337+ finally:
338+ self._all_connections_mutex.release()
339+
340+ def close(self):
341+ self._all_connections_mutex.acquire()
342+ try:
343+ if self._all_connections is None:
344+ return
345+ for conn in self._all_connections:
346+ sock = getattr(conn, "sock", None)
347+ if sock is not None:
348+ sock.shutdown(socket.SHUT_RDWR)
349+ sock.close()
350+ conn.sock = None
351+ self._all_connections = None
352+ finally:
353+ self._all_connections_mutex.release()
354+ super(CleanableConnectionPoolMixin, self).close()
355+
356+
357+class CleanableHTTPConnectionPool(
358+ CleanableConnectionPoolMixin, HTTPConnectionPool):
359+ pass
360+
361+
362+class CleanableHTTPSConnectionPool(
363+ CleanableConnectionPoolMixin, HTTPSConnectionPool):
364+ pass
365+
366+
367+cleanable_pool_classes_by_scheme = {
368+ "http": CleanableHTTPConnectionPool,
369+ "https": CleanableHTTPSConnectionPool,
370+ }
371+
372+
373+class CleanablePoolManager(PoolManager):
374+ """A version of urllib3's PoolManager supporting forced socket cleanup."""
375+
376+ # XXX cjwatson 2015-03-11: Reimplements PoolManager._new_pool; check
377+ # this when upgrading requests.
378+ def _new_pool(self, scheme, host, port):
379+ if scheme not in cleanable_pool_classes_by_scheme:
380+ raise ValueError("Unhandled scheme: %s" % scheme)
381+ pool_cls = cleanable_pool_classes_by_scheme[scheme]
382+ if scheme == 'http':
383+ kwargs = self.connection_pool_kw.copy()
384+ for kw in SSL_KEYWORDS:
385+ kwargs.pop(kw, None)
386+
387+ return pool_cls(host, port, **kwargs)
388+
389+
390+class CleanableHTTPAdapter(HTTPAdapter):
391+ """Enhance HTTPAdapter to use CleanablePoolManager."""
392+
393+ # XXX cjwatson 2015-03-11: Reimplements HTTPAdapter.init_poolmanager;
394+ # check this when upgrading requests.
395+ def init_poolmanager(self, connections, maxsize, block=DEFAULT_POOLBLOCK):
396+ # save these values for pickling
397+ self._pool_connections = connections
398+ self._pool_maxsize = maxsize
399+ self._pool_block = block
400+
401+ self.poolmanager = CleanablePoolManager(
402+ num_pools=connections, maxsize=maxsize, block=block)
403
404
405 class URLFetcher:
406 """Object fetching remote URLs with a time out."""
407
408 @with_timeout(cleanup='cleanup')
409- def fetch(self, url, data=None):
410+ def fetch(self, url, **request_kwargs):
411 """Fetch the URL using a custom HTTP handler supporting timeout."""
412- assert url.startswith('http://'), "only http is supported."
413- self.handler = CleanableHTTPHandler()
414- opener = urllib2.build_opener(self.handler)
415- return opener.open(url, data).read()
416+ request_kwargs.setdefault("method", "GET")
417+ self.session = Session()
418+ # Don't honour things like environment proxy configuration.
419+ self.session.trust_env = False
420+ # Mount our custom adapters.
421+ self.session.mount("https://", CleanableHTTPAdapter())
422+ self.session.mount("http://", CleanableHTTPAdapter())
423+ return self.session.request(url=url, **request_kwargs).content
424
425 def cleanup(self):
426 """Reset the connection when the operation timed out."""
427- self.handler.reset_connection()
428+ self.session.close()
429
430
431-def urlfetch(url, data=None):
432- """Wrapper for `urllib2.urlopen()` that times out."""
433- return URLFetcher().fetch(url, data)
434+def urlfetch(url, **request_kwargs):
435+ """Wrapper for `requests.get()` that times out."""
436+ return URLFetcher().fetch(url, **request_kwargs)
437
438
439 class TransportWithTimeout(Transport):

Subscribers

People subscribed via source and target branches

to status/vote changes: