Merge lp:~cjwatson/launchpad/buildmaster-twisted-agent into lp:launchpad

Proposed by Colin Watson
Status: Merged
Merged at revision: 18068
Proposed branch: lp:~cjwatson/launchpad/buildmaster-twisted-agent
Merge into: lp:launchpad
Diff against target: 363 lines (+189/-20)
5 files modified
daemons/buildd-manager.tac (+13/-4)
lib/lp/buildmaster/interactor.py (+112/-10)
lib/lp/buildmaster/tests/mock_slaves.py (+3/-3)
lib/lp/buildmaster/tests/test_interactor.py (+53/-3)
lib/lp/services/config/schema-lazr.conf (+8/-0)
To merge this branch: bzr merge lp:~cjwatson/launchpad/buildmaster-twisted-agent
Reviewer Review Type Date Requested Status
William Grant code Approve
Celso Providelo (community) Approve
Review via email: mp+295593@code.launchpad.net

Commit message

Convert BuilderSlave to twisted.web.client.Agent, causing it to use a connection pool rather than trying to download everything at once.

Description of the change

Convert BuilderSlave to twisted.web.client.Agent, causing it to use a connection pool rather than trying to download everything at once. The old approach could overflow open-file limits.

We'll need to give this a good workout on dogfood.

To post a comment you must log in.
Revision history for this message
Celso Providelo (cprov) wrote :

Thanks Colin, great solution!

From http://twistedmatrix.com/documents/15.2.0/web/howto/client.html#multiple-connections-to-the-same-server it seems that it will sustain at most 2 simultaneous downloads from each slave. Is that reasonable and sufficient?

review: Approve
Revision history for this message
William Grant (wgrant) :
review: Approve (code)
Revision history for this message
Colin Watson (cjwatson) :

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'daemons/buildd-manager.tac'
--- daemons/buildd-manager.tac 2011-12-29 05:29:36 +0000
+++ daemons/buildd-manager.tac 2016-05-25 15:34:01 +0000
@@ -1,14 +1,18 @@
1# Copyright 2009-2011 Canonical Ltd. This software is licensed under the1# Copyright 2009-2016 Canonical Ltd. This software is licensed under the
2# GNU Affero General Public License version 3 (see the file LICENSE).2# GNU Affero General Public License version 3 (see the file LICENSE).
33
4# Twisted Application Configuration file.4# Twisted Application Configuration file.
5# Use with "twistd2.4 -y <file.tac>", e.g. "twistd -noy server.tac"5# Use with "twistd2.4 -y <file.tac>", e.g. "twistd -noy server.tac"
66
7import resource
8
7from twisted.application import service9from twisted.application import service
8from twisted.scripts.twistd import ServerOptions10from twisted.scripts.twistd import ServerOptions
9from twisted.web import server
1011
11from lp.services.config import dbconfig12from lp.services.config import (
13 config,
14 dbconfig,
15 )
12from lp.services.daemons import readyservice16from lp.services.daemons import readyservice
13from lp.services.scripts import execute_zcml_for_scripts17from lp.services.scripts import execute_zcml_for_scripts
14from lp.buildmaster.manager import BuilddManager18from lp.buildmaster.manager import BuilddManager
@@ -21,6 +25,12 @@
21# Should be removed from callsites verified to not need it.25# Should be removed from callsites verified to not need it.
22set_immediate_mail_delivery(True)26set_immediate_mail_delivery(True)
2327
28# Allow generous slack for database connections, idle download connections,
29# etc.
30soft_nofile = config.builddmaster.download_connections + 1024
31_, hard_nofile = resource.getrlimit(resource.RLIMIT_NOFILE)
32resource.setrlimit(resource.RLIMIT_NOFILE, (soft_nofile, hard_nofile))
33
24options = ServerOptions()34options = ServerOptions()
25options.parseOptions()35options.parseOptions()
2636
@@ -34,4 +44,3 @@
34# Service for scanning buildd slaves.44# Service for scanning buildd slaves.
35service = BuilddManager()45service = BuilddManager()
36service.setServiceParent(application)46service.setServiceParent(application)
37
3847
=== modified file 'lib/lp/buildmaster/interactor.py'
--- lib/lp/buildmaster/interactor.py 2015-02-17 05:38:37 +0000
+++ lib/lp/buildmaster/interactor.py 2016-05-25 15:34:01 +0000
@@ -1,4 +1,4 @@
1# Copyright 2009-2014 Canonical Ltd. This software is licensed under the1# Copyright 2009-2016 Canonical Ltd. This software is licensed under the
2# GNU Affero General Public License version 3 (see the file LICENSE).2# GNU Affero General Public License version 3 (see the file LICENSE).
33
4__metaclass__ = type4__metaclass__ = type
@@ -13,9 +13,17 @@
13from urlparse import urlparse13from urlparse import urlparse
1414
15import transaction15import transaction
16from twisted.internet import defer16from twisted.internet import (
17 defer,
18 reactor as default_reactor,
19 )
20from twisted.internet.protocol import Protocol
17from twisted.web import xmlrpc21from twisted.web import xmlrpc
18from twisted.web.client import downloadPage22from twisted.web.client import (
23 Agent,
24 HTTPConnectionPool,
25 ResponseDone,
26 )
19from zope.security.proxy import (27from zope.security.proxy import (
20 isinstance as zope_isinstance,28 isinstance as zope_isinstance,
21 removeSecurityProxy,29 removeSecurityProxy,
@@ -46,6 +54,89 @@
46 noisy = False54 noisy = False
4755
4856
class FileWritingProtocol(Protocol):
    """A protocol that saves data to a file.

    The target file is opened lazily when the first chunk of body data
    arrives.  `finished` is fired exactly once: with None when the whole
    body was written and the file closed cleanly, or with a failure if
    opening, writing or closing the file failed or the connection ended
    for any reason other than `ResponseDone`.

    :param finished: A Deferred to fire once the transfer is over.
    :param filename: Path of the file to write the body to.
    """

    def __init__(self, finished, filename):
        self.finished = finished
        self.filename = filename
        self.file = None
        # Set once `finished` has been errbacked because of a local I/O
        # error, so that later events neither reopen (and truncate) the
        # file nor fire `finished` a second time.
        self._failed = False

    def dataReceived(self, data):
        """Append a chunk of the response body to the file."""
        if self._failed:
            # We already reported an error; discard the rest of the body.
            return
        try:
            # Open inside the try block so that a failure to create the
            # file is reported through `finished` as well.
            if self.file is None:
                self.file = open(self.filename, "wb")
            self.file.write(data)
        except IOError:
            self._failed = True
            if self.file is not None:
                try:
                    self.file.close()
                except IOError:
                    pass
                self.file = None
            # No argument: the Deferred wraps the exception currently
            # being handled.
            self.finished.errback()

    def connectionLost(self, reason):
        """Close the file and report the outcome via `finished`.

        :param reason: A Failure describing why the body delivery ended;
            `ResponseDone` means the body was delivered completely.
        """
        if self._failed:
            # `finished` has already fired from dataReceived.
            return
        complete = reason.check(ResponseDone)
        try:
            if self.file is None and complete:
                # An empty body never triggers dataReceived; still leave
                # an (empty) file behind for the caller to read.
                self.file = open(self.filename, "wb")
            if self.file is not None:
                self.file.close()
                self.file = None
        except IOError:
            self._failed = True
            self.finished.errback()
        else:
            if complete:
                self.finished.callback(None)
            else:
                self.finished.errback(reason)
88
89
class LimitedHTTPConnectionPool(HTTPConnectionPool):
    """A connection pool with an upper limit on open connections.

    Each call to `getConnection` first takes a slot from a
    `DeferredSemaphore` sized by `limit`; the slot is handed back when the
    connection is returned to the pool, or straight away if the
    connection could not be obtained at all.

    :param reactor: The reactor passed on to `HTTPConnectionPool`.
    :param limit: The maximum number of connections that may be in use
        at once.
    :param persistent: Passed on to `HTTPConnectionPool`.
    """

    # XXX cjwatson 2016-05-25: This actually only limits active connections,
    # and doesn't count idle but open connections towards the limit; this is
    # because it's very difficult to do the latter with HTTPConnectionPool's
    # current design.  Users of this pool must therefore expect some
    # additional file descriptors to be open for idle connections.

    def __init__(self, reactor, limit, persistent=True):
        super(LimitedHTTPConnectionPool, self).__init__(
            reactor, persistent=persistent)
        self._semaphore = defer.DeferredSemaphore(limit)

    def getConnection(self, key, endpoint):
        """Wait for a free slot, then get a connection from the pool.

        :return: A Deferred firing with the connection.
        """
        def release_slot(failure):
            # A connection that was never established will never reach
            # _putConnection, so give its slot back here; otherwise every
            # failed connection attempt would permanently shrink the pool.
            self._semaphore.release()
            return failure

        def connect(_):
            d = super(LimitedHTTPConnectionPool, self).getConnection(
                key, endpoint)
            d.addErrback(release_slot)
            return d

        # The errback is attached to the inner Deferred only, so that a
        # failure of acquire() itself (when no slot was taken) does not
        # release a slot we never held.
        acquired = self._semaphore.acquire()
        acquired.addCallback(connect)
        return acquired

    def _putConnection(self, key, connection):
        """Return a connection to the pool and free its slot."""
        super(LimitedHTTPConnectionPool, self)._putConnection(key, connection)
        # Only release the semaphore in the next main loop iteration; if we
        # release it here then the next request may start using this
        # connection's parser before this request has quite finished with
        # it.
        # NOTE(review): a connection that is lost without ever being
        # returned here presumably still keeps its slot -- confirm against
        # HTTPConnectionPool's behaviour.
        self._reactor.callLater(0, self._semaphore.release)
118
119
120_default_pool = None
121
122
def default_pool(reactor=None):
    """Return the shared download connection pool, creating it on first use.

    :param reactor: The reactor to build the pool with; defaults to the
        global Twisted reactor.  Only consulted when the pool does not
        exist yet.
    """
    global _default_pool
    if _default_pool is not None:
        return _default_pool

    # Circular import.
    from lp.buildmaster.manager import SlaveScanner

    builddmaster = config.builddmaster
    pool = _default_pool = LimitedHTTPConnectionPool(
        default_reactor if reactor is None else reactor,
        builddmaster.download_connections)
    pool.maxPersistentPerHost = (
        builddmaster.idle_download_connections_per_builder)
    # Short cached connection timeout to avoid potential weirdness with
    # virtual builders that reboot frequently.
    pool.cachedConnectionTimeout = SlaveScanner.SCAN_INTERVAL
    return pool
138
139
49class BuilderSlave(object):140class BuilderSlave(object):
50 """Add in a few useful methods for the XMLRPC slave.141 """Add in a few useful methods for the XMLRPC slave.
51142
@@ -58,7 +149,7 @@
58 # many false positives in your test run and will most likely break149 # many false positives in your test run and will most likely break
59 # production.150 # production.
60151
61 def __init__(self, proxy, builder_url, vm_host, timeout, reactor):152 def __init__(self, proxy, builder_url, vm_host, timeout, reactor, pool):
62 """Initialize a BuilderSlave.153 """Initialize a BuilderSlave.
63154
64 :param proxy: An XML-RPC proxy, implementing 'callRemote'. It must155 :param proxy: An XML-RPC proxy, implementing 'callRemote'. It must
@@ -71,11 +162,16 @@
71 self._file_cache_url = urlappend(builder_url, 'filecache')162 self._file_cache_url = urlappend(builder_url, 'filecache')
72 self._server = proxy163 self._server = proxy
73 self.timeout = timeout164 self.timeout = timeout
165 if reactor is None:
166 reactor = default_reactor
74 self.reactor = reactor167 self.reactor = reactor
168 if pool is None:
169 pool = default_pool(reactor=reactor)
170 self.pool = pool
75171
76 @classmethod172 @classmethod
77 def makeBuilderSlave(cls, builder_url, vm_host, timeout, reactor=None,173 def makeBuilderSlave(cls, builder_url, vm_host, timeout, reactor=None,
78 proxy=None):174 proxy=None, pool=None):
79 """Create and return a `BuilderSlave`.175 """Create and return a `BuilderSlave`.
80176
81 :param builder_url: The URL of the slave buildd machine,177 :param builder_url: The URL of the slave buildd machine,
@@ -84,6 +180,7 @@
84 here.180 here.
85 :param reactor: Used by tests to override the Twisted reactor.181 :param reactor: Used by tests to override the Twisted reactor.
86 :param proxy: Used By tests to override the xmlrpc.Proxy.182 :param proxy: Used By tests to override the xmlrpc.Proxy.
183 :param pool: Used by tests to override the HTTPConnectionPool.
87 """184 """
88 rpc_url = urlappend(builder_url.encode('utf-8'), 'rpc')185 rpc_url = urlappend(builder_url.encode('utf-8'), 'rpc')
89 if proxy is None:186 if proxy is None:
@@ -92,7 +189,7 @@
92 server_proxy.queryFactory = QuietQueryFactory189 server_proxy.queryFactory = QuietQueryFactory
93 else:190 else:
94 server_proxy = proxy191 server_proxy = proxy
95 return cls(server_proxy, builder_url, vm_host, timeout, reactor)192 return cls(server_proxy, builder_url, vm_host, timeout, reactor, pool)
96193
97 def _with_timeout(self, d, timeout=None):194 def _with_timeout(self, d, timeout=None):
98 return cancel_on_timeout(d, timeout or self.timeout, self.reactor)195 return cancel_on_timeout(d, timeout or self.timeout, self.reactor)
@@ -138,10 +235,15 @@
138 errback with the error string.235 errback with the error string.
139 """236 """
140 file_url = urlappend(self._file_cache_url, sha_sum).encode('utf8')237 file_url = urlappend(self._file_cache_url, sha_sum).encode('utf8')
141 # If desired we can pass a param "timeout" here but let's leave238 d = Agent(self.reactor, pool=self.pool).request("GET", file_url)
142 # it at the default value if it becomes obvious we need to239
143 # change it.240 def got_response(response):
144 return downloadPage(file_url, file_to_write, followRedirect=0)241 finished = defer.Deferred()
242 response.deliverBody(FileWritingProtocol(finished, file_to_write))
243 return finished
244
245 d.addCallback(got_response)
246 return d
145247
146 def getFiles(self, files):248 def getFiles(self, files):
147 """Fetch many files from the builder.249 """Fetch many files from the builder.
148250
=== modified file 'lib/lp/buildmaster/tests/mock_slaves.py'
--- lib/lp/buildmaster/tests/mock_slaves.py 2015-09-28 17:38:45 +0000
+++ lib/lp/buildmaster/tests/mock_slaves.py 2016-05-25 15:34:01 +0000
@@ -1,4 +1,4 @@
1# Copyright 2009-2015 Canonical Ltd. This software is licensed under the1# Copyright 2009-2016 Canonical Ltd. This software is licensed under the
2# GNU Affero General Public License version 3 (see the file LICENSE).2# GNU Affero General Public License version 3 (see the file LICENSE).
33
4"""Mock Build objects for tests soyuz buildd-system."""4"""Mock Build objects for tests soyuz buildd-system."""
@@ -290,14 +290,14 @@
290 lambda: open(tachandler.logfile, 'r').readlines()))290 lambda: open(tachandler.logfile, 'r').readlines()))
291 return tachandler291 return tachandler
292292
293 def getClientSlave(self, reactor=None, proxy=None):293 def getClientSlave(self, reactor=None, proxy=None, pool=None):
294 """Return a `BuilderSlave` for use in testing.294 """Return a `BuilderSlave` for use in testing.
295295
296 Points to a fixed URL that is also used by `BuilddSlaveTestSetup`.296 Points to a fixed URL that is also used by `BuilddSlaveTestSetup`.
297 """297 """
298 return BuilderSlave.makeBuilderSlave(298 return BuilderSlave.makeBuilderSlave(
299 self.BASE_URL, 'vmhost', config.builddmaster.socket_timeout,299 self.BASE_URL, 'vmhost', config.builddmaster.socket_timeout,
300 reactor, proxy)300 reactor=reactor, proxy=proxy, pool=pool)
301301
302 def makeCacheFile(self, tachandler, filename):302 def makeCacheFile(self, tachandler, filename):
303 """Make a cache file available on the remote slave.303 """Make a cache file available on the remote slave.
304304
=== modified file 'lib/lp/buildmaster/tests/test_interactor.py'
--- lib/lp/buildmaster/tests/test_interactor.py 2015-11-04 14:30:09 +0000
+++ lib/lp/buildmaster/tests/test_interactor.py 2016-05-25 15:34:01 +0000
@@ -1,4 +1,4 @@
1# Copyright 2009-2015 Canonical Ltd. This software is licensed under the1# Copyright 2009-2016 Canonical Ltd. This software is licensed under the
2# GNU Affero General Public License version 3 (see the file LICENSE).2# GNU Affero General Public License version 3 (see the file LICENSE).
33
4"""Test BuilderInteractor features."""4"""Test BuilderInteractor features."""
@@ -20,9 +20,16 @@
20 AsynchronousDeferredRunTestForBrokenTwisted,20 AsynchronousDeferredRunTestForBrokenTwisted,
21 SynchronousDeferredRunTest,21 SynchronousDeferredRunTest,
22 )22 )
23from testtools.matchers import ContainsAll23from testtools.matchers import (
24 ContainsAll,
25 HasLength,
26 MatchesDict,
27 )
24from testtools.testcase import ExpectedException28from testtools.testcase import ExpectedException
25from twisted.internet import defer29from twisted.internet import (
30 defer,
31 reactor as default_reactor,
32 )
26from twisted.internet.task import Clock33from twisted.internet.task import Clock
27from twisted.python.failure import Failure34from twisted.python.failure import Failure
28from twisted.web.client import getPage35from twisted.web.client import getPage
@@ -38,6 +45,7 @@
38 BuilderInteractor,45 BuilderInteractor,
39 BuilderSlave,46 BuilderSlave,
40 extract_vitals_from_db,47 extract_vitals_from_db,
48 LimitedHTTPConnectionPool,
41 )49 )
42from lp.buildmaster.interfaces.builder import (50from lp.buildmaster.interfaces.builder import (
43 BuildDaemonIsolationError,51 BuildDaemonIsolationError,
@@ -789,6 +797,7 @@
789 for sha1, local_file in files:797 for sha1, local_file in files:
790 with open(local_file) as f:798 with open(local_file) as f:
791 self.assertEqual(content_map[sha1], f.read())799 self.assertEqual(content_map[sha1], f.read())
800 return slave.pool.closeCachedConnections()
792801
793 def finished_uploading(ignored):802 def finished_uploading(ignored):
794 d = slave.getFiles(files)803 d = slave.getFiles(files)
@@ -812,3 +821,44 @@
812 dl.append(d)821 dl.append(d)
813822
814 return defer.DeferredList(dl).addCallback(finished_uploading)823 return defer.DeferredList(dl).addCallback(finished_uploading)
824
825 def test_getFiles_open_connections(self):
826 # getFiles honours the configured limit on active download
827 # connections.
828 pool = LimitedHTTPConnectionPool(default_reactor, 2)
829 contents = [self.factory.getUniqueString() for _ in range(10)]
830 self.slave_helper.getServerSlave()
831 slave = self.slave_helper.getClientSlave(pool=pool)
832 files = []
833 content_map = {}
834
835 def got_files(ignored):
836 # Called back when getFiles finishes. Make sure all the
837 # content is as expected.
838 for sha1, local_file in files:
839 with open(local_file) as f:
840 self.assertEqual(content_map[sha1], f.read())
841 # Only two connections were used.
842 self.assertThat(
843 slave.pool._connections,
844 MatchesDict({("http", "localhost", 8221): HasLength(2)}))
845 return slave.pool.closeCachedConnections()
846
847 def finished_uploading(ignored):
848 d = slave.getFiles(files)
849 return d.addCallback(got_files)
850
851 # Set up some files on the builder and store details in
852 # content_map so we can compare downloads later.
853 dl = []
854 for content in contents:
855 filename = content + '.txt'
856 lf = self.factory.makeLibraryFileAlias(filename, content=content)
857 content_map[lf.content.sha1] = content
858 files.append((lf.content.sha1, tempfile.mkstemp()[1]))
859 self.addCleanup(os.remove, files[-1][1])
860 self.layer.txn.commit()
861 d = slave.ensurepresent(lf.content.sha1, lf.http_url, "", "")
862 dl.append(d)
863
864 return defer.DeferredList(dl).addCallback(finished_uploading)
815865
=== modified file 'lib/lp/services/config/schema-lazr.conf'
--- lib/lp/services/config/schema-lazr.conf 2016-05-18 03:59:44 +0000
+++ lib/lp/services/config/schema-lazr.conf 2016-05-25 15:34:01 +0000
@@ -72,6 +72,14 @@
72# datatype: integer72# datatype: integer
73virtualized_socket_timeout: 3073virtualized_socket_timeout: 30
7474
75# The maximum number of idle file download connections per builder that
76# may be kept open.
77idle_download_connections_per_builder: 10
78
79# The maximum number of file download connections that may be open
80# across all builders.
81download_connections: 2048
82
75# Activate the Build Notification system.83# Activate the Build Notification system.
76# datatype: boolean84# datatype: boolean
77send_build_notification: True85send_build_notification: True