Merge lp:~rvb/maas/rework-threadpool into lp:~maas-committers/maas/trunk

Proposed by Raphaël Badin
Status: Merged
Approved by: Raphaël Badin
Approved revision: no longer in the source branch.
Merged at revision: 3820
Proposed branch: lp:~rvb/maas/rework-threadpool
Merge into: lp:~maas-committers/maas/trunk
Diff against target: 203 lines (+50/-16)
4 files modified
src/maasserver/plugin.py (+0/-4)
src/maasserver/tests/test_plugin.py (+0/-2)
src/maasserver/websockets/protocol.py (+22/-7)
src/maasserver/websockets/tests/test_protocol.py (+28/-3)
To merge this branch: bzr merge lp:~rvb/maas/rework-threadpool
Reviewer Review Type Date Requested Status
Blake Rouse (community) Approve
Review via email: mp+256435@code.launchpad.net

Commit message

Revert 3819 and 3815: use the default threadpool size and re-introduce the webcoket threadpool. An unbounded threadpool would end up exhausting the available DB connections.

To post a comment you must log in.
Revision history for this message
Blake Rouse (blake-rouse) :
review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'src/maasserver/plugin.py'
2--- src/maasserver/plugin.py 2015-04-14 08:27:39 +0000
3+++ src/maasserver/plugin.py 2015-04-16 07:48:27 +0000
4@@ -38,10 +38,6 @@
5
6 def makeService(self, options):
7 """Construct a service."""
8- from twisted.internet import reactor
9- # This is a workaround until we implement proper unbounded threadpool
10- # with a cleanup method.
11- reactor.suggestThreadPoolSize(1024 * 10)
12 # Get something going with the logs.
13 from provisioningserver import logger
14 logger.basicConfig()
15
16=== modified file 'src/maasserver/tests/test_plugin.py'
17--- src/maasserver/tests/test_plugin.py 2015-04-14 08:27:39 +0000
18+++ src/maasserver/tests/test_plugin.py 2015-04-16 07:48:27 +0000
19@@ -25,7 +25,6 @@
20 from provisioningserver import logger
21 from provisioningserver.utils.twisted import asynchronous
22 from twisted.application.service import MultiService
23-from twisted.internet import reactor
24
25
26 class TestOptions(MAASTestCase):
27@@ -73,7 +72,6 @@
28 "rpc-advertise",
29 "web",
30 ]
31- self.assertEqual(1024 * 10, reactor.getThreadPool().max)
32 self.assertItemsEqual(expected_services, service.namedServices)
33 self.assertEqual(
34 len(service.namedServices), len(service.services),
35
36=== modified file 'src/maasserver/websockets/protocol.py'
37--- src/maasserver/websockets/protocol.py 2015-04-14 20:37:30 +0000
38+++ src/maasserver/websockets/protocol.py 2015-04-16 07:48:27 +0000
39@@ -37,13 +37,15 @@
40 from maasserver.websockets.listener import PostgresListener
41 from maasserver.websockets.websockets import STATUSES
42 from provisioningserver.utils.twisted import synchronous
43+from twisted.internet import reactor
44 from twisted.internet.defer import inlineCallbacks
45 from twisted.internet.protocol import (
46 Factory,
47 Protocol,
48 )
49-from twisted.internet.threads import deferToThread
50+from twisted.internet.threads import deferToThreadPool
51 from twisted.python import log
52+from twisted.python.threadpool import ThreadPool
53 from twisted.web.server import NOT_DONE_YET
54
55
56@@ -175,7 +177,9 @@
57 "Error authenticating user: %s" % failure.getErrorMessage())
58 return None
59
60- d = deferToThread(self.getUserFromSessionId, session_id)
61+ d = deferToThreadPool(
62+ reactor, self.factory.threadpool,
63+ self.getUserFromSessionId, session_id)
64 d.addCallbacks(got_user, got_user_error)
65
66 return d
67@@ -254,7 +258,8 @@
68 # performed. The execution of this method is defered to a thread
69 # because it interacts with the database which is blocking.
70 transactional_execute = transactional(handler.execute)
71- d = deferToThread(
72+ d = deferToThreadPool(
73+ reactor, self.factory.threadpool,
74 transactional_execute, method, message.get("params", {}))
75 d.addCallbacks(
76 partial(self.sendResult, request_id),
77@@ -300,22 +305,31 @@
78
79
80 class WebSocketFactory(Factory):
81+ """Factory for WebSocketProtocol.
82+
83+ :ivar threadpool: The thread-pool used for servicing websocket
84+ requests.
85+ """
86
87 handlers = {}
88 clients = []
89
90 def __init__(self):
91+ self.threadpool = ThreadPool(name=self.__class__.__name__)
92 self.listener = PostgresListener()
93 self.cacheHandlers()
94 self.registerNotifiers()
95
96 def startFactory(self):
97- """Start the listener."""
98+ """Start the thread pool and the listener."""
99+ self.threadpool.start()
100 return self.listener.start()
101
102 def stopFactory(self):
103- """Stop the listener."""
104- return self.listener.stop()
105+ """Stop the thread pool and the listener."""
106+ stopped = self.listener.stop()
107+ self.threadpool.stop()
108+ return stopped
109
110 def getSessionEngine(self):
111 """Returns the session engine being used by Django.
112@@ -359,7 +373,8 @@
113 def onNotify(self, handler_class, channel, action, obj_id):
114 for client in self.clients:
115 handler = handler_class(client.user, client.cache)
116- data = yield deferToThread(
117+ data = yield deferToThreadPool(
118+ reactor, self.threadpool,
119 self.processNotify, handler, channel, action, obj_id)
120 if data is not None:
121 (name, data) = data
122
123=== modified file 'src/maasserver/websockets/tests/test_protocol.py'
124--- src/maasserver/websockets/tests/test_protocol.py 2015-04-14 20:37:30 +0000
125+++ src/maasserver/websockets/tests/test_protocol.py 2015-04-16 07:48:27 +0000
126@@ -47,12 +47,14 @@
127 from testtools.matchers import (
128 Equals,
129 Is,
130+ IsInstance,
131 )
132 from twisted.internet.defer import (
133 fail,
134 inlineCallbacks,
135 )
136 from twisted.internet.threads import deferToThread
137+from twisted.python.threadpool import ThreadPool
138 from twisted.web.server import NOT_DONE_YET
139
140
141@@ -61,6 +63,8 @@
142 def make_protocol(self, patch_authenticate=True, transport_uri=''):
143 self.patch(protocol_module, "PostgresListener")
144 factory = WebSocketFactory()
145+ factory.startFactory()
146+ self.addCleanup(factory.stopFactory)
147 protocol = factory.buildProtocol(None)
148 protocol.transport = MagicMock()
149 protocol.transport.uri = transport_uri
150@@ -521,9 +525,9 @@
151 self.addCleanup(self.clean_node, node)
152 protocol, factory = self.make_protocol()
153 protocol.user = MagicMock()
154- mock_deferToThread = self.patch_autospec(
155- protocol_module, "deferToThread")
156- mock_deferToThread.return_value = fail(
157+ mock_deferToThreadPool = self.patch_autospec(
158+ protocol_module, "deferToThreadPool")
159+ mock_deferToThreadPool.return_value = fail(
160 maas_factory.make_exception("error"))
161 message = {
162 "type": MSG_TYPE.REQUEST,
163@@ -561,6 +565,8 @@
164
165 def make_protocol_with_factory(self, user=None):
166 factory = WebSocketFactory()
167+ factory.startFactory()
168+ self.addCleanup(factory.stopFactory)
169 protocol = factory.buildProtocol(None)
170 protocol.transport = MagicMock()
171 if user is None:
172@@ -618,12 +624,31 @@
173
174 @wait_for_reactor
175 @inlineCallbacks
176+ def test_startFactory_starts_threadpool(self):
177+ factory = WebSocketFactory()
178+ yield factory.startFactory()
179+ try:
180+ self.assertThat(factory.threadpool, IsInstance(ThreadPool))
181+ self.expectThat(factory.threadpool.started, Equals(True))
182+ finally:
183+ yield factory.stopFactory()
184+
185+ @wait_for_reactor
186+ @inlineCallbacks
187 def test_stopFactory_stops_listener(self):
188 factory = WebSocketFactory()
189 yield factory.startFactory()
190 yield factory.stopFactory()
191 self.expectThat(factory.listener.connected(), Equals(False))
192
193+ @wait_for_reactor
194+ @inlineCallbacks
195+ def test_stopFactory_stops_threadpool(self):
196+ factory = WebSocketFactory()
197+ yield factory.startFactory()
198+ yield factory.stopFactory()
199+ self.assertEqual([], factory.threadpool.threads)
200+
201 def test_registerNotifiers_registers_all_notifiers(self):
202 factory = WebSocketFactory()
203 self.assertItemsEqual(