Merge lp:~jameinel/bzr/2.5-soft-hangup-795025 into lp:bzr

Proposed by John A Meinel
Status: Merged
Approved by: John A Meinel
Approved revision: no longer in the source branch.
Merged at revision: 6170
Proposed branch: lp:~jameinel/bzr/2.5-soft-hangup-795025
Merge into: lp:bzr
Prerequisite: lp:~jameinel/bzr/drop-idle-connections-824797
Diff against target: 1019 lines (+698/-23)
10 files modified
bzrlib/smart/__init__.py (+1/-1)
bzrlib/smart/medium.py (+46/-4)
bzrlib/smart/server.py (+92/-10)
bzrlib/smart/signals.py (+114/-0)
bzrlib/tests/__init__.py (+1/-0)
bzrlib/tests/blackbox/test_serve.py (+28/-0)
bzrlib/tests/test_smart_signals.py (+189/-0)
bzrlib/tests/test_smart_transport.py (+213/-2)
bzrlib/trace.py (+4/-0)
doc/en/release-notes/bzr-2.5.txt (+10/-6)
To merge this branch: bzr merge lp:~jameinel/bzr/2.5-soft-hangup-795025
Reviewer: Jelmer Vernooij (community)
Review status: Approve
Review via email: mp+76608@code.launchpad.net

Commit message

Allow 'bzr serve' to interpret SIGHUP as a graceful shutdown. (bug #795025)

Description of the change

Silly Launchpad, losing changes if you accidentally navigate away from the edit box... :( (And silly laptop, for putting the browser's "previous page" key right next to the up arrow.)

Anyway, this should be done. It does the bits that I wanted it to, and it seems well tested.

1) Sending SIGHUP will start a soft shutdown for both 'bzr serve' and 'bzr serve --inet'. In the former case, the server tells all the client threads that we want a soft shutdown (so each one stops as soon as it is done with its current request), and then waits for them to finish, periodically logging how many clients it is still waiting for.

2) This changes 'serve()' so that it defaults to closing the client connection when done serving. This makes things cleaner IMO, but it means that we will close sys.stdin during 'bzr serve --inet'. And _flush_stdout_stderr was handling some IOError, but on py2.6 on Windows, I was getting a *ValueError*, so I added that to the trap list.

3) The infrastructure is all hooked up on Windows, except that we don't have SIGHUP. For now, I've been using SIGBREAK and then self._stop_gracefully() to make sure it is working.

4) We close the primary server socket after we get the shutdown request, but before we wait for the clients to finish. That should mean that you can get a no-downtime 'bzr serve' restart by doing "kill -SIGHUP <PID> ; bzr serve". So it will shut down the current one, and bring up another one to serve new requests while the first one finishes out the existing requests. For Launchpad, it will be more complex. We'll need to teach the twisted daemon about SIGHUP, and then have it pass that same signal to all the 'bzr serve --inet' children. With the forking server, I imagine it will similarly need to pass the signal around. But something similar should be possible, making the current one quiet (down to HAProxy), close the primary socket, start a new one responding to HAProxy, etc.

5) We now need to teach the client to reconnect when it gets disconnected, and the rest should flow pretty smoothly.

Jelmer Vernooij (jelmer) wrote :

I've begun reviewing this, but threads make me sad and I'm not really familiar with this code (yet). I've noticed some really minor issues, but I'll try to follow up with some actually useful comments (once I understand the global picture a bit better).

You have an import conflict in bzrlib/smart/medium.py.

206 + # have a good way (yet) to poll the spawned clients and
... and what? :)

246 + if e.args[0] not in (errno.EBADF, errno.EINTR):
The comment above this line should probably be updated.

John A Meinel (jameinel) wrote :

On 9/24/2011 11:26 PM, Jelmer Vernooij wrote:
> I've begun reviewing this, but threads make me sad and I'm not
> really familiar with this code (yet). I've noticed some really
> minor issues, but I'll try to follow up with some actually useful
> comments (once I understand the global picture a bit better).
>
> you have an import conflict in bzrlib/smart/medium.py
>
> 206 + # have a good way (yet) to poll the spawned
> clients and ... and what ? :)

I fixed that one, actually, so I just removed the comment.
SmartTCPServer now keeps track of clients and on graceful shutdown
waits for them to finish.
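
The bookkeeping described here might be sketched roughly as follows. This is an illustration in modern Python under stated assumptions, not the bzrlib implementation: `TrackingServer` and `Handler` are hypothetical names, and the handler's loop simply stands in for serving requests until it is told to stop.

```python
import threading


class Handler:
    """Hypothetical per-connection handler; loops until asked to stop."""

    def __init__(self):
        self._finished = threading.Event()

    def serve(self):
        while not self._finished.is_set():
            # Stand-in for "serve one request, then check the flag again".
            self._finished.wait(0.01)

    def stop_gracefully(self):
        self._finished.set()


class TrackingServer:
    """Hypothetical server that remembers (handler, thread) pairs."""

    def __init__(self):
        self._active = []

    def serve_conn(self, handler):
        thread = threading.Thread(target=handler.serve, daemon=True)
        self._active.append((handler, thread))
        thread.start()

    def stop_gracefully(self):
        for handler, _ in self._active:
            handler.stop_gracefully()

    def poll(self, timeout=0.0):
        # join() with a timeout never blocks for long; keep only live threads.
        still_active = []
        for handler, thread in self._active:
            thread.join(timeout)
            if thread.is_alive():
                still_active.append((handler, thread))
        self._active = still_active

    def wait_for_clients(self, poll_timeout=0.05):
        while self._active:
            self.poll(poll_timeout)


server = TrackingServer()
for _ in range(3):
    server.serve_conn(Handler())
server.poll()
before = len(server._active)   # all three are still serving
server.stop_gracefully()
server.wait_for_clients()
after = len(server._active)    # every thread has been joined
```

Polling with a zero timeout after each accepted connection keeps the list from growing without bound, while the longer timeout is only used once a graceful stop has been requested.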

>
> 246 + if e.args[0] not in (errno.EBADF,
> errno.EINTR): The comment above this line should probably be
> updated.

Done. I fixed the import collision, and added gettext() calls to all
the new trace.note() functionality. As an aside, it is really easy to
forget those. It would be nice to have some sort of test/regular
procedure for making sure we get them right.

John
=:->
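
One possible shape for the check wished for above is a small source scan. The sketch below is only an assumption about how such a test could look: `untranslated_notes` is a hypothetical helper, it relies on the standard `ast` module of a modern Python (which cannot parse Python 2-only syntax such as `except X, e`), and it would also flag calls that deliberately pass an untranslated string.

```python
import ast


def untranslated_notes(source):
    """Return line numbers of trace.note() calls with no gettext() in the message."""
    flagged = []
    for node in ast.walk(ast.parse(source)):
        is_trace_note = (
            isinstance(node, ast.Call)
            and isinstance(node.func, ast.Attribute)
            and node.func.attr == 'note'
            and isinstance(node.func.value, ast.Name)
            and node.func.value.id == 'trace')
        if not is_trace_note or not node.args:
            continue
        # Accept gettext(...) anywhere in the first argument, so that
        # gettext('...') % (args,) also counts as translated.
        wrapped = any(
            isinstance(inner, ast.Call)
            and isinstance(inner.func, ast.Name)
            and inner.func.id == 'gettext'
            for inner in ast.walk(node.args[0]))
        if not wrapped:
            flagged.append(node.lineno)
    return sorted(flagged)


sample = (
    "trace.note(gettext('Requested to stop gracefully'))\n"
    "trace.note(gettext('Waiting for %d client(s) to finish') % (n,))\n"
    "trace.note('%s' % (e,))\n"
)
flagged = untranslated_notes(sample)
```

Run over a source tree, a helper like this could fail a test whenever the flagged list is non-empty, with an explicit allow-list for the intentional exceptions.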

Jelmer Vernooij (jelmer) :
review: Approve

John A Meinel (jameinel) wrote :

sent to pqm by email

Preview Diff

1=== modified file 'bzrlib/smart/__init__.py'
2--- bzrlib/smart/__init__.py 2011-09-22 13:25:47 +0000
3+++ bzrlib/smart/__init__.py 2011-09-26 14:29:40 +0000
4@@ -1,4 +1,4 @@
5-# Copyright (C) 2006 Canonical Ltd
6+# Copyright (C) 2006,2011 Canonical Ltd
7 #
8 # This program is free software; you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10
11=== modified file 'bzrlib/smart/medium.py'
12--- bzrlib/smart/medium.py 2011-09-26 14:29:38 +0000
13+++ bzrlib/smart/medium.py 2011-09-26 14:29:40 +0000
14@@ -46,10 +46,10 @@
15 urlutils,
16 )
17 from bzrlib.i18n import gettext
18-from bzrlib.smart import client, protocol, request, vfs
19+from bzrlib.smart import client, protocol, request, signals, vfs
20 from bzrlib.transport import ssh
21 """)
22-from bzrlib import config, osutils
23+from bzrlib import osutils
24
25 # Throughout this module buffer size parameters are either limited to be at
26 # most _MAX_READ_SIZE, or are ignored and _MAX_READ_SIZE is used instead.
27@@ -205,6 +205,8 @@
28 the stream. See also the _push_back method.
29 """
30
31+ _timer = time.time
32+
33 def __init__(self, backing_transport, root_client_path='/', timeout=None):
34 """Construct new server.
35
36@@ -228,6 +230,11 @@
37 try:
38 while not self.finished:
39 server_protocol = self._build_protocol()
40+ # TODO: This seems inelegant:
41+ if server_protocol is None:
42+ # We could 'continue' only to notice that self.finished is
43+ # True...
44+ break
45 self._serve_one_request(server_protocol)
46 except errors.ConnectionTimeout, e:
47 trace.note('%s' % (e,))
48@@ -238,6 +245,12 @@
49 except Exception, e:
50 stderr.write("%s terminating on exception %s\n" % (self, e))
51 raise
52+ self._disconnect_client()
53+
54+ def _stop_gracefully(self):
55+ """When we finish this message, stop looking for more."""
56+ trace.mutter('Stopping %s' % (self,))
57+ self.finished = True
58
59 def _disconnect_client(self):
60 """Close the current connection. We stopped due to a timeout/etc."""
61@@ -267,6 +280,9 @@
62 :returns: a SmartServerRequestProtocol.
63 """
64 self._wait_for_bytes_with_timeout(self._client_timeout)
65+ if self.finished:
66+ # We're stopping, so don't try to do any more work
67+ return None
68 bytes = self._get_line()
69 protocol_factory, unused_bytes = _get_protocol_factory_for_bytes(bytes)
70 protocol = protocol_factory(
71@@ -281,10 +297,12 @@
72 readable handle before timeout_seconds.
73 :return: None
74 """
75- t_end = time.time() + timeout_seconds
76+ t_end = self._timer() + timeout_seconds
77 poll_timeout = min(timeout_seconds, self._client_poll_timeout)
78 rs = xs = None
79- while not rs and not xs and time.time() < t_end:
80+ while not rs and not xs and self._timer() < t_end:
81+ if self.finished:
82+ return
83 try:
84 rs, _, xs = select.select([fd], [], [fd], poll_timeout)
85 except (select.error, socket.error) as e:
86@@ -341,6 +359,18 @@
87 timeout=timeout)
88 sock.setblocking(True)
89 self.socket = sock
90+ # Get the getpeername now, as we might be closed later when we care.
91+ try:
92+ self._client_info = sock.getpeername()
93+ except socket.error:
94+ self._client_info = '<unknown>'
95+
96+ def __str__(self):
97+ return '%s(client=%s)' % (self.__class__.__name__, self._client_info)
98+
99+ def __repr__(self):
100+ return '%s.%s(client=%s)' % (self.__module__, self.__class__.__name__,
101+ self._client_info)
102
103 def _serve_one_request_unguarded(self, protocol):
104 while protocol.next_read_size():
105@@ -412,6 +442,17 @@
106 self._in = in_file
107 self._out = out_file
108
109+ def serve(self):
110+ """See SmartServerStreamMedium.serve"""
111+ # This is the regular serve, except it adds signal trapping for soft
112+ # shutdown.
113+ stop_gracefully = self._stop_gracefully
114+ signals.register_on_hangup(id(self), stop_gracefully)
115+ try:
116+ return super(SmartServerPipeStreamMedium, self).serve()
117+ finally:
118+ signals.unregister_on_hangup(id(self))
119+
120 def _serve_one_request_unguarded(self, protocol):
121 while True:
122 # We need to be careful not to read past the end of the current
123@@ -432,6 +473,7 @@
124
125 def _disconnect_client(self):
126 self._in.close()
127+ self._out.flush()
128 self._out.close()
129
130 def _wait_for_bytes_with_timeout(self, timeout_seconds):
131
132=== modified file 'bzrlib/smart/server.py'
133--- bzrlib/smart/server.py 2011-09-26 14:29:38 +0000
134+++ bzrlib/smart/server.py 2011-09-26 14:29:40 +0000
135@@ -20,6 +20,7 @@
136 import os.path
137 import socket
138 import sys
139+import time
140 import threading
141
142 from bzrlib.hooks import Hooks
143@@ -31,7 +32,10 @@
144 from bzrlib.i18n import gettext
145 from bzrlib.lazy_import import lazy_import
146 lazy_import(globals(), """
147-from bzrlib.smart import medium
148+from bzrlib.smart import (
149+ medium,
150+ signals,
151+ )
152 from bzrlib.transport import (
153 chroot,
154 pathfilter,
155@@ -56,6 +60,10 @@
156 # so the test suite can set it faster. (It thread.interrupt_main() will not
157 # fire a KeyboardInterrupt during socket.accept)
158 _ACCEPT_TIMEOUT = 1.0
159+ _SHUTDOWN_POLL_TIMEOUT = 1.0
160+ _LOG_WAITING_TIMEOUT = 10.0
161+
162+ _timer = time.time
163
164 def __init__(self, backing_transport, root_client_path='/',
165 client_timeout=None):
166@@ -73,6 +81,10 @@
167 self.backing_transport = backing_transport
168 self.root_client_path = root_client_path
169 self._client_timeout = client_timeout
170+ self._active_connections = []
171+ # This is set to indicate we want to wait for clients to finish before
172+ # we disconnect.
173+ self._gracefully_stopping = False
174
175 def start_server(self, host, port):
176 """Create the server listening socket.
177@@ -105,8 +117,14 @@
178 self.port = self._sockname[1]
179 self._server_socket.listen(1)
180 self._server_socket.settimeout(self._ACCEPT_TIMEOUT)
181+ # Once we start accept()ing connections, we set started.
182 self._started = threading.Event()
183+ # Once we stop accept()ing connections (and are closing the socket) we
184+ # set _stopped
185 self._stopped = threading.Event()
186+ # Once we have finished waiting for all clients, etc. We set
187+ # _fully_stopped
188+ self._fully_stopped = threading.Event()
189
190 def _backing_urls(self):
191 # There are three interesting urls:
192@@ -145,7 +163,38 @@
193 for hook in SmartTCPServer.hooks['server_stopped']:
194 hook(backing_urls, self.get_url())
195
196+ def _stop_gracefully(self):
197+ trace.note(gettext('Requested to stop gracefully'))
198+ self._should_terminate = True
199+ self._gracefully_stopping = True
200+ for handler, _ in self._active_connections:
201+ handler._stop_gracefully()
202+
203+ def _wait_for_clients_to_disconnect(self):
204+ self._poll_active_connections()
205+ if not self._active_connections:
206+ return
207+ trace.note(gettext('Waiting for %d client(s) to finish')
208+ % (len(self._active_connections),))
209+ t_next_log = self._timer() + self._LOG_WAITING_TIMEOUT
210+ while self._active_connections:
211+ now = self._timer()
212+ if now >= t_next_log:
213+ trace.note(gettext('Still waiting for %d client(s) to finish')
214+ % (len(self._active_connections),))
215+ t_next_log = now + self._LOG_WAITING_TIMEOUT
216+ self._poll_active_connections(self._SHUTDOWN_POLL_TIMEOUT)
217+
218 def serve(self, thread_name_suffix=''):
219+ # Note: There is a temptation to do
220+ # signals.register_on_hangup(id(self), self._stop_gracefully)
221+ # However, that creates a temporary object which is a bound
222+ # method. signals._on_sighup is a WeakKeyDictionary so it
223+ # immediately gets garbage collected, because nothing else
224+ # references it. Instead, we need to keep a real reference to the
225+ # bound method for the lifetime of the serve() function.
226+ stop_gracefully = self._stop_gracefully
227+ signals.register_on_hangup(id(self), stop_gracefully)
228 self._should_terminate = False
229 # for hooks we are letting code know that a server has started (and
230 # later stopped).
231@@ -161,14 +210,19 @@
232 pass
233 except self._socket_error, e:
234 # if the socket is closed by stop_background_thread
235- # we might get a EBADF here, any other socket errors
236- # should get logged.
237- if e.args[0] != errno.EBADF:
238- trace.warning("listening socket error: %s", e)
239+ # we might get a EBADF here, or if we get a signal we
240+ # can get EINTR, any other socket errors should get
241+ # logged.
242+ if e.args[0] not in (errno.EBADF, errno.EINTR):
243+ trace.warning(gettext("listening socket error: %s")
244+ % (e,))
245 else:
246 if self._should_terminate:
247+ conn.close()
248 break
249 self.serve_conn(conn, thread_name_suffix)
250+ # Cleanout any threads that have finished processing.
251+ self._poll_active_connections()
252 except KeyboardInterrupt:
253 # dont log when CTRL-C'd.
254 raise
255@@ -176,14 +230,18 @@
256 trace.report_exception(sys.exc_info(), sys.stderr)
257 raise
258 finally:
259- self._stopped.set()
260 try:
261 # ensure the server socket is closed.
262 self._server_socket.close()
263 except self._socket_error:
264 # ignore errors on close
265 pass
266+ self._stopped.set()
267+ signals.unregister_on_hangup(id(self))
268 self.run_server_stopped_hooks()
269+ if self._gracefully_stopping:
270+ self._wait_for_clients_to_disconnect()
271+ self._fully_stopped.set()
272
273 def get_url(self):
274 """Return the url of the server"""
275@@ -194,6 +252,23 @@
276 conn, self.backing_transport, self.root_client_path,
277 timeout=self._client_timeout)
278
279+ def _poll_active_connections(self, timeout=0.0):
280+ """Check to see if any active connections have finished.
281+
282+ This will iterate through self._active_connections, and update any
283+ connections that are finished.
284+
285+ :param timeout: The timeout to pass to thread.join(). By default, we
286+ set it to 0, so that we don't hang if threads are not done yet.
287+ :return: None
288+ """
289+ still_active = []
290+ for handler, thread in self._active_connections:
291+ thread.join(timeout)
292+ if thread.isAlive():
293+ still_active.append((handler, thread))
294+ self._active_connections = still_active
295+
296 def serve_conn(self, conn, thread_name_suffix):
297 # For WIN32, where the timeout value from the listening socket
298 # propagates to the newly accepted socket.
299@@ -203,9 +278,7 @@
300 handler = self._make_handler(conn)
301 connection_thread = threading.Thread(
302 None, handler.serve, name=thread_name)
303- # FIXME: This thread is never joined, it should at least be collected
304- # somewhere so that tests that want to check for leaked threads can get
305- # rid of them -- vila 20100531
306+ self._active_connections.append((handler, connection_thread))
307 connection_thread.setDaemon(True)
308 connection_thread.start()
309 return connection_thread
310@@ -355,13 +428,17 @@
311 transport = _mod_transport.get_transport_from_url(expand_userdirs.get_url())
312 self.transport = transport
313
314+ def _get_stdin_stdout(self):
315+ return sys.stdin, sys.stdout
316+
317 def _make_smart_server(self, host, port, inet, timeout):
318 if timeout is None:
319 c = config.GlobalStack()
320 timeout = c.get('serve.client_timeout')
321 if inet:
322+ stdin, stdout = self._get_stdin_stdout()
323 smart_server = medium.SmartServerPipeStreamMedium(
324- sys.stdin, sys.stdout, self.transport, timeout=timeout)
325+ stdin, stdout, self.transport, timeout=timeout)
326 else:
327 if host is None:
328 host = medium.BZR_DEFAULT_INTERFACE
329@@ -387,6 +464,10 @@
330 self.cleanups.append(restore_default_ui_factory_and_lockdir_timeout)
331 ui.ui_factory = ui.SilentUIFactory()
332 lockdir._DEFAULT_TIMEOUT_SECONDS = 0
333+ orig = signals.install_sighup_handler()
334+ def restore_signals():
335+ signals.restore_sighup_handler(orig)
336+ self.cleanups.append(restore_signals)
337
338 def set_up(self, transport, host, port, inet, timeout):
339 self._make_backing_transport(transport)
340@@ -397,6 +478,7 @@
341 for cleanup in reversed(self.cleanups):
342 cleanup()
343
344+
345 def serve_bzr(transport, host=None, port=None, inet=False, timeout=None):
346 """This is the default implementation of 'bzr serve'.
347
348
349=== added file 'bzrlib/smart/signals.py'
350--- bzrlib/smart/signals.py 1970-01-01 00:00:00 +0000
351+++ bzrlib/smart/signals.py 2011-09-26 14:29:40 +0000
352@@ -0,0 +1,114 @@
353+# Copyright (C) 2011 Canonical Ltd
354+#
355+# This program is free software; you can redistribute it and/or modify
356+# it under the terms of the GNU General Public License as published by
357+# the Free Software Foundation; either version 2 of the License, or
358+# (at your option) any later version.
359+#
360+# This program is distributed in the hope that it will be useful,
361+# but WITHOUT ANY WARRANTY; without even the implied warranty of
362+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
363+# GNU General Public License for more details.
364+#
365+# You should have received a copy of the GNU General Public License
366+# along with this program; if not, write to the Free Software
367+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
368+
369+"""Signal handling for the smart server code."""
370+
371+import signal
372+import weakref
373+
374+from bzrlib import trace
375+
376+
377+# I'm pretty sure this has to be global, since signal handling is per-process.
378+_on_sighup = None
379+# TODO: Using a dict means that the order of calls is unordered. We could use a
380+# list and then do something like LIFO ordering. A dict was chosen so
381+# that you could have a key to easily remove your entry. However, you
382+# could just use the callable itself as the indexed part, and even in
383+# large cases, we shouldn't have more than 100 or so callbacks
384+# registered.
385+def _sighup_handler(signal_number, interrupted_frame):
386+ """This is the actual function that is registered for handling SIGHUP.
387+
388+ It will call out to all the registered functions, letting them know that a
389+ graceful termination has been requested.
390+ """
391+ if _on_sighup is None:
392+ return
393+ trace.mutter('Caught SIGHUP, sending graceful shutdown requests.')
394+ for ref in _on_sighup.valuerefs():
395+ try:
396+ cb = ref()
397+ if cb is not None:
398+ cb()
399+ except KeyboardInterrupt:
400+ raise
401+ except Exception:
402+ trace.mutter('Error occurred while running SIGHUP handlers:')
403+ trace.log_exception_quietly()
404+
405+
406+def install_sighup_handler():
407+ """Setup a handler for the SIGHUP signal."""
408+ if getattr(signal, "SIGHUP", None) is None:
409+ # If we can't install SIGHUP, there is no reason (yet) to do graceful
410+ # shutdown.
411+ old_signal = None
412+ else:
413+ old_signal = signal.signal(signal.SIGHUP, _sighup_handler)
414+ old_dict = _setup_on_hangup_dict()
415+ return old_signal, old_dict
416+
417+
418+def _setup_on_hangup_dict():
419+ """Create something for _on_sighup.
420+
421+ This is done when we install the sighup handler, and for tests that want to
422+ test the functionality. If this hasn'nt been called, then
423+ register_on_hangup is a no-op. As is unregister_on_hangup.
424+ """
425+ global _on_sighup
426+ old = _on_sighup
427+ _on_sighup = weakref.WeakValueDictionary()
428+ return old
429+
430+
431+def restore_sighup_handler(orig):
432+ """Pass in the returned value from install_sighup_handler to reset."""
433+ global _on_sighup
434+ old_signal, old_dict = orig
435+ if old_signal is not None:
436+ signal.signal(signal.SIGHUP, old_signal)
437+ _on_sighup = old_dict
438+
439+
440+# TODO: Should these be single-use callables? Meaning that once we've triggered
441+# SIGHUP and called them, they should auto-remove themselves? I don't
442+# think so. Callers need to clean up during shutdown anyway, so that we
443+# don't end up with lots of garbage in the _on_sighup dict. On the other
444+# hand, we made _on_sighup a WeakValueDictionary in case cleanups didn't
445+# get fired properly. Maybe we just assume we don't have to do it?
446+def register_on_hangup(identifier, a_callable):
447+ """Register for us to call a_callable as part of a graceful shutdown."""
448+ if _on_sighup is None:
449+ return
450+ _on_sighup[identifier] = a_callable
451+
452+
453+def unregister_on_hangup(identifier):
454+ """Remove a callback from being called during sighup."""
455+ if _on_sighup is None:
456+ return
457+ try:
458+ del _on_sighup[identifier]
459+ except KeyboardInterrupt:
460+ raise
461+ except Exception:
462+ # This usually runs as a tear-down step. So we don't want to propagate
463+ # most exceptions.
464+ trace.mutter('Error occurred during unregister_on_hangup:')
465+ trace.log_exception_quietly()
466+
467
468=== modified file 'bzrlib/tests/__init__.py'
469--- bzrlib/tests/__init__.py 2011-09-19 18:25:05 +0000
470+++ bzrlib/tests/__init__.py 2011-09-26 14:29:40 +0000
471@@ -4077,6 +4077,7 @@
472 'bzrlib.tests.test_smart',
473 'bzrlib.tests.test_smart_add',
474 'bzrlib.tests.test_smart_request',
475+ 'bzrlib.tests.test_smart_signals',
476 'bzrlib.tests.test_smart_transport',
477 'bzrlib.tests.test_smtp_connection',
478 'bzrlib.tests.test_source',
479
480=== modified file 'bzrlib/tests/blackbox/test_serve.py'
481--- bzrlib/tests/blackbox/test_serve.py 2011-09-26 14:29:38 +0000
482+++ bzrlib/tests/blackbox/test_serve.py 2011-09-26 14:29:40 +0000
483@@ -314,6 +314,34 @@
484 err)
485 self.assertServerFinishesCleanly(process)
486
487+ def test_bzr_serve_graceful_shutdown(self):
488+ big_contents = 'a'*64*1024
489+ self.build_tree_contents([('bigfile', big_contents)])
490+ process, url = self.start_server_port(['--client-timeout=1.0'])
491+ t = transport.get_transport_from_url(url)
492+ m = t.get_smart_medium()
493+ c = client._SmartClient(m)
494+ # Start, but don't finish a response
495+ resp, response_handler = c.call_expecting_body('get', 'bigfile')
496+ self.assertEqual(('ok',), resp)
497+ # Note: process.send_signal is a Python 2.6ism
498+ process.send_signal(signal.SIGHUP)
499+ # Wait for the server to notice the signal, and then read the actual
500+ # body of the response. That way we know that it is waiting for the
501+ # request to finish
502+ self.assertEqual('Requested to stop gracefully\n',
503+ process.stderr.readline())
504+ self.assertEqual('Waiting for 1 client(s) to finish\n',
505+ process.stderr.readline())
506+ body = response_handler.read_body_bytes()
507+ if body != big_contents:
508+ self.fail('Failed to properly read the contents of "bigfile"')
509+ # Now that our request is finished, the medium should notice it has
510+ # been disconnected.
511+ self.assertEqual('', m.read_bytes(1))
512+ # And the server should be stopping
513+ self.assertEqual(0, process.wait())
514+
515
516 class TestCmdServeChrooting(TestBzrServeBase):
517
518
519=== added file 'bzrlib/tests/test_smart_signals.py'
520--- bzrlib/tests/test_smart_signals.py 1970-01-01 00:00:00 +0000
521+++ bzrlib/tests/test_smart_signals.py 2011-09-26 14:29:40 +0000
522@@ -0,0 +1,189 @@
523+# Copyright (C) 2011 Canonical Ltd
524+#
525+# This program is free software; you can redistribute it and/or modify
526+# it under the terms of the GNU General Public License as published by
527+# the Free Software Foundation; either version 2 of the License, or
528+# (at your option) any later version.
529+#
530+# This program is distributed in the hope that it will be useful,
531+# but WITHOUT ANY WARRANTY; without even the implied warranty of
532+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
533+# GNU General Public License for more details.
534+#
535+# You should have received a copy of the GNU General Public License
536+# along with this program; if not, write to the Free Software
537+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
538+
539+
540+import os
541+import signal
542+import threading
543+import weakref
544+
545+from bzrlib import tests, transport
546+from bzrlib.smart import client, medium, server, signals
547+
548+# Windows doesn't define SIGHUP. And while we could just skip a lot of these
549+# tests, we often don't actually care about interaction with 'signal', so we
550+# can still run the tests for code coverage.
551+SIGHUP = getattr(signal, 'SIGHUP', 1)
552+
553+
554+class TestSignalHandlers(tests.TestCase):
555+
556+ def setUp(self):
557+ super(TestSignalHandlers, self).setUp()
558+ # This allows us to mutate the signal handler callbacks, but leave it
559+ # 'pristine' after the test case.
560+ # TODO: Arguably, this could be put into the base test.TestCase, along
561+ # with a tearDown that asserts that all the entries have been
562+ # removed properly. Global state is always a bit messy. A shame
563+ # that we need it for signal handling.
564+ orig = signals._setup_on_hangup_dict()
565+ self.assertIs(None, orig)
566+ def cleanup():
567+ signals._on_sighup = None
568+ self.addCleanup(cleanup)
569+
570+ def test_registered_callback_gets_called(self):
571+ calls = []
572+ def call_me():
573+ calls.append('called')
574+ signals.register_on_hangup('myid', call_me)
575+ signals._sighup_handler(SIGHUP, None)
576+ self.assertEqual(['called'], calls)
577+ signals.unregister_on_hangup('myid')
578+
579+ def test_unregister_not_present(self):
580+ # We don't want unregister to fail, since it is generally run at times
581+ # that shouldn't interrupt other flow.
582+ signals.unregister_on_hangup('no-such-id')
583+ log = self.get_log()
584+ self.assertContainsRe(log, 'Error occurred during unregister_on_hangup:')
585+ self.assertContainsRe(log, '(?s)Traceback.*KeyError')
586+
587+ def test_failing_callback(self):
588+ calls = []
589+ def call_me():
590+ calls.append('called')
591+ def fail_me():
592+ raise RuntimeError('something bad happened')
593+ signals.register_on_hangup('myid', call_me)
594+ signals.register_on_hangup('otherid', fail_me)
595+ # _sighup_handler should call both, even though it got an exception
596+ signals._sighup_handler(SIGHUP, None)
597+ signals.unregister_on_hangup('myid')
598+ signals.unregister_on_hangup('otherid')
599+ log = self.get_log()
600+ self.assertContainsRe(log, '(?s)Traceback.*RuntimeError')
601+ self.assertEqual(['called'], calls)
602+
603+ def test_unregister_during_call(self):
604+ # _sighup_handler should handle if some callbacks actually remove
605+ # themselves while running.
606+ calls = []
607+ def call_me_and_unregister():
608+ signals.unregister_on_hangup('myid')
609+ calls.append('called_and_unregistered')
610+ def call_me():
611+ calls.append('called')
612+ signals.register_on_hangup('myid', call_me_and_unregister)
613+ signals.register_on_hangup('other', call_me)
614+ signals._sighup_handler(SIGHUP, None)
615+
616+ def test_keyboard_interrupt_propagated(self):
617+ # In case we get 'stuck' while running a hangup function, we should
618+ # not suppress KeyboardInterrupt
619+ def call_me_and_raise():
620+ raise KeyboardInterrupt()
621+ signals.register_on_hangup('myid', call_me_and_raise)
622+ self.assertRaises(KeyboardInterrupt,
623+ signals._sighup_handler, SIGHUP, None)
624+ signals.unregister_on_hangup('myid')
625+
626+ def test_weak_references(self):
627+ # TODO: This is probably a very-CPython-specific test
628+ # Adding yourself to the callback should not make you immortal
629+ # We overrideAttr during the test suite, so that we don't pollute the
630+ # original dict. However, we can test that what we override matches
631+ # what we are putting there.
632+ self.assertIsInstance(signals._on_sighup,
633+ weakref.WeakValueDictionary)
634+ calls = []
635+ def call_me():
636+ calls.append('called')
637+ signals.register_on_hangup('myid', call_me)
638+ del call_me
639+ # Non-CPython might want to do a gc.collect() here
640+ signals._sighup_handler(SIGHUP, None)
641+ self.assertEqual([], calls)
642+
643+ def test_not_installed(self):
644+ # If you haven't called bzrlib.smart.signals.install_sighup_handler,
645+ # then _on_sighup should be None, and all the calls become no-ops.
646+ signals._on_sighup = None
647+ calls = []
648+ def call_me():
649+ calls.append('called')
650+ signals.register_on_hangup('myid', calls)
651+ signals._sighup_handler(SIGHUP, None)
652+ signals.unregister_on_hangup('myid')
653+ log = self.get_log()
654+ self.assertEqual('', log)
655+
656+ def test_install_sighup_handler(self):
657+ # install_sighup_handler should set up a signal handler for SIGHUP, as
658+ # well as the signals._on_sighup dict.
659+ signals._on_sighup = None
660+ orig = signals.install_sighup_handler()
661+ if getattr(signal, 'SIGHUP', None) is not None:
662+ cur = signal.getsignal(SIGHUP)
663+ self.assertEqual(signals._sighup_handler, cur)
664+ self.assertIsNot(None, signals._on_sighup)
665+ signals.restore_sighup_handler(orig)
666+ self.assertIs(None, signals._on_sighup)
667+
668+
669+class TestInetServer(tests.TestCase):
670+
671+ def create_file_pipes(self):
672+ r, w = os.pipe()
673+ rf = os.fdopen(r, 'rb')
674+ wf = os.fdopen(w, 'wb')
675+ return rf, wf
676+
677+ def test_inet_server_responds_to_sighup(self):
678+ t = transport.get_transport('memory:///')
679+ content = 'a'*1024*1024
680+ t.put_bytes('bigfile', content)
681+ factory = server.BzrServerFactory()
682+ # Override stdin/stdout so that we can inject our own handles
683+ client_read, server_write = self.create_file_pipes()
684+ server_read, client_write = self.create_file_pipes()
685+ factory._get_stdin_stdout = lambda: (server_read, server_write)
686+ factory.set_up(t, None, None, inet=True, timeout=4.0)
687+ self.addCleanup(factory.tear_down)
688+ started = threading.Event()
689+ stopped = threading.Event()
690+ def serving():
691+ started.set()
692+ factory.smart_server.serve()
693+ stopped.set()
694+ server_thread = threading.Thread(target=serving)
695+ server_thread.start()
696+ started.wait()
697+ client_medium = medium.SmartSimplePipesClientMedium(client_read,
698+ client_write, 'base')
699+ client_client = client._SmartClient(client_medium)
700+ resp, response_handler = client_client.call_expecting_body('get',
701+ 'bigfile')
702+ signals._sighup_handler(SIGHUP, None)
703+ self.assertTrue(factory.smart_server.finished)
704+ # We can still finish reading the file content, but more than that, and
705+ # the file is closed.
706+ v = response_handler.read_body_bytes()
707+ if v != content:
708+ self.fail('Got the wrong content back, expected 1M "a"')
709+ stopped.wait()
710+ server_thread.join()
711+
712
713=== modified file 'bzrlib/tests/test_smart_transport.py'
714--- bzrlib/tests/test_smart_transport.py 2011-09-26 14:29:38 +0000
715+++ bzrlib/tests/test_smart_transport.py 2011-09-26 14:29:40 +0000
716@@ -18,10 +18,14 @@
717
718 # all of this deals with byte strings so this is safe
719 from cStringIO import StringIO
720+import doctest
721 import os
722 import socket
723 import sys
724 import threading
725+import time
726+
727+from testtools.matchers import DocTestMatches
728
729 import bzrlib
730 from bzrlib import (
731@@ -937,6 +941,13 @@
732 server_protocol = self.build_protocol_socket('bzr request 2\n')
733 self.assertProtocolTwo(server_protocol)
734
735+ def test__build_protocol_returns_if_stopping(self):
736+ # _build_protocol should notice that we are stopping, and return
737+ # without waiting for bytes from the client.
738+ server, client_sock = self.create_socket_context(None)
739+ server._stop_gracefully()
740+ self.assertIs(None, server._build_protocol())
741+
742 def test_socket_set_timeout(self):
743 server, _ = self.create_socket_context(None, timeout=1.23)
744 self.assertEqual(1.23, server._client_timeout)
745@@ -975,6 +986,17 @@
746 data = server.read_bytes(1)
747 self.assertEqual('', data)
748
749+ def test_socket_wait_for_bytes_with_shutdown(self):
750+ server, client_sock = self.create_socket_context(None)
751+ t = time.time()
752+ # Override the _timer functionality so that time never increments.
753+ # This way, we can be sure we stopped because of the flag, and not
754+ # because of a timeout, etc.
755+ server._timer = lambda: t
756+ server._client_poll_timeout = 0.1
757+ server._stop_gracefully()
758+ server._wait_for_bytes_with_timeout(1.0)
759+
760 def test_socket_serve_timeout_closes_socket(self):
761 server, client_sock = self.create_socket_context(None, timeout=0.1)
762 # This should timeout quickly, and then close the connection so that
763@@ -1056,6 +1078,75 @@
764
765 class TestSmartTCPServer(tests.TestCase):
766
767+ def make_server(self):
768+ """Create a SmartTCPServer that we can exercise.
769+
770+ Note: we don't use SmartTCPServer_for_testing because the testing
771+ version overrides lots of functionality like 'serve', and we want to
772+ test the raw service.
773+
774+ This will start the server in another thread, and wait for it to
775+ indicate it has finished starting up.
776+
777+ :return: (server, server_thread)
778+ """
779+ t = _mod_transport.get_transport_from_url('memory:///')
780+ server = _mod_server.SmartTCPServer(t, client_timeout=4.0)
781+ server._ACCEPT_TIMEOUT = 0.1
782+ # We don't use 'localhost' because that might be an IPv6 address.
783+ server.start_server('127.0.0.1', 0)
784+ server_thread = threading.Thread(target=server.serve,
785+ args=(self.id(),))
786+ server_thread.start()
787+ # Ensure this gets called at some point
788+ self.addCleanup(server._stop_gracefully)
789+ server._started.wait()
790+ return server, server_thread
791+
792+ def ensure_client_disconnected(self, client_sock):
793+ """Ensure that a socket is closed, discarding all errors."""
794+ try:
795+ client_sock.close()
796+ except Exception:
797+ pass
798+
799+ def connect_to_server(self, server):
800+ """Create a client socket that can talk to the server."""
801+ client_sock = socket.socket()
802+ server_info = server._server_socket.getsockname()
803+ client_sock.connect(server_info)
804+ self.addCleanup(self.ensure_client_disconnected, client_sock)
805+ return client_sock
806+
807+ def connect_to_server_and_hangup(self, server):
808+ """Connect to the server, and then hang up.
809+ That way it doesn't sit waiting for 'accept()' to time out.
810+ """
811+ # If the server has already signaled that the socket is closed, we
812+ # don't need to try to connect to it. If the flag is not set, though,
813+ # the server might still close the socket while we try to connect to
814+ # it, so we still have to catch the exception.
815+ if server._stopped.isSet():
816+ return
817+ try:
818+ client_sock = self.connect_to_server(server)
819+ client_sock.close()
820+ except socket.error, e:
821+ # If the server has hung up already, that is fine.
822+ pass
823+
824+ def say_hello(self, client_sock):
825+ """Send the 'hello' smart RPC, and expect the response."""
826+ client_sock.send('hello\n')
827+ self.assertEqual('ok\x012\n', client_sock.recv(5))
828+
829+ def shutdown_server_cleanly(self, server, server_thread):
830+ server._stop_gracefully()
831+ self.connect_to_server_and_hangup(server)
832+ server._stopped.wait()
833+ server._fully_stopped.wait()
834+ server_thread.join()
835+
836 def test_get_error_unexpected(self):
837 """Error reported by server with no specific representation"""
838 self.overrideEnv('BZR_NO_SMART_VFS', None)
839@@ -1081,10 +1172,130 @@
840
841 def test_propagates_timeout(self):
842 server = _mod_server.SmartTCPServer(None, client_timeout=1.23)
843- server_socket = socket.socket()
844- handler = server._make_handler(server_socket)
845+ server_sock, client_sock = portable_socket_pair()
846+ handler = server._make_handler(server_sock)
847 self.assertEqual(1.23, handler._client_timeout)
848
849+ def test_serve_conn_tracks_connections(self):
850+ server = _mod_server.SmartTCPServer(None, client_timeout=4.0)
851+ server_sock, client_sock = portable_socket_pair()
852+ server.serve_conn(server_sock, '-%s' % (self.id(),))
853+ self.assertEqual(1, len(server._active_connections))
854+ # We still want to talk on the connection. Polling should indicate it
855+ # is still active.
856+ server._poll_active_connections()
857+ self.assertEqual(1, len(server._active_connections))
858+ # Closing the socket will end the active thread, and polling will
859+ # notice and remove it from the active set.
860+ client_sock.close()
861+ server._poll_active_connections(0.1)
862+ self.assertEqual(0, len(server._active_connections))
863+
864+ def test_serve_closes_out_finished_connections(self):
865+ server, server_thread = self.make_server()
866+ # The server is started, connect to it.
867+ client_sock = self.connect_to_server(server)
868+ # We send and receive on the connection, so that we know the
869+ # server-side has seen the connect, and started handling the
870+ # results.
871+ self.say_hello(client_sock)
872+ self.assertEqual(1, len(server._active_connections))
873+ # Grab a handle to the thread that is processing our request
874+ _, server_side_thread = server._active_connections[0]
875+ # Close the connection, ask the server to stop, and wait for the
876+ # server to stop, as well as the thread that was servicing the
877+ # client request.
878+ client_sock.close()
879+ # Wait for the server-side request thread to notice we are closed.
880+ server_side_thread.join()
881+ # Stop the server, it should notice the connection has finished.
882+ self.shutdown_server_cleanly(server, server_thread)
883+ # The server should have noticed that all clients are gone before
884+ # exiting.
885+ self.assertEqual(0, len(server._active_connections))
886+
887+ def test_serve_reaps_finished_connections(self):
888+ server, server_thread = self.make_server()
889+ client_sock1 = self.connect_to_server(server)
890+ # We send and receive on the connection, so that we know the
891+ # server-side has seen the connect, and started handling the
892+ # results.
893+ self.say_hello(client_sock1)
894+ server_handler1, server_side_thread1 = server._active_connections[0]
895+ client_sock1.close()
896+ server_side_thread1.join()
897+ # By waiting until the first connection is fully done, the server
898+ # should notice after another connection that the first has finished.
899+ client_sock2 = self.connect_to_server(server)
900+ self.say_hello(client_sock2)
901+ server_handler2, server_side_thread2 = server._active_connections[-1]
902+ # There is a race condition. We know that client_sock2 has been
903+ # registered, but not that _poll_active_connections has been called. We
904+ # know that it will be called before the server will accept a new
905+ # connection, however. So connect one more time, and assert that we
906+ # either have 1 or 2 active connections (never 3), and that the 'first'
907+ # connection is not connection 1
908+ client_sock3 = self.connect_to_server(server)
909+ self.say_hello(client_sock3)
910+ # Copy the list, so we don't have it mutating behind our back
911+ conns = list(server._active_connections)
912+ self.assertEqual(2, len(conns))
913+ self.assertNotEqual((server_handler1, server_side_thread1), conns[0])
914+ self.assertEqual((server_handler2, server_side_thread2), conns[0])
915+ client_sock2.close()
916+ client_sock3.close()
917+ self.shutdown_server_cleanly(server, server_thread)
918+
919+ def test_graceful_shutdown_waits_for_clients_to_stop(self):
920+ server, server_thread = self.make_server()
921+ # We need something big enough that it won't fit in a single recv. So
922+ # the server thread gets blocked writing content to the client until we
923+ # finish reading on the client.
924+ server.backing_transport.put_bytes('bigfile',
925+ 'a'*1024*1024)
926+ client_sock = self.connect_to_server(server)
927+ self.say_hello(client_sock)
928+ _, server_side_thread = server._active_connections[0]
929+ # Start the RPC, but don't finish reading the response
930+ client_medium = medium.SmartClientAlreadyConnectedSocketMedium(
931+ 'base', client_sock)
932+ client_client = client._SmartClient(client_medium)
933+ resp, response_handler = client_client.call_expecting_body('get',
934+ 'bigfile')
935+ self.assertEqual(('ok',), resp)
936+ # Ask the server to stop gracefully, and wait for it.
937+ server._stop_gracefully()
938+ self.connect_to_server_and_hangup(server)
939+ server._stopped.wait()
940+ # It should not be accepting another connection.
941+ self.assertRaises(socket.error, self.connect_to_server, server)
942+ # It should also not be fully stopped
943+ server._fully_stopped.wait(0.01)
944+ self.assertFalse(server._fully_stopped.isSet())
945+ response_handler.read_body_bytes()
946+ client_sock.close()
947+ server_side_thread.join()
948+ server_thread.join()
949+ self.assertTrue(server._fully_stopped.isSet())
950+ log = self.get_log()
951+ self.assertThat(log, DocTestMatches("""\
952+ INFO Requested to stop gracefully
953+... Stopping SmartServerSocketStreamMedium(client=('127.0.0.1', ...
954+ INFO Waiting for 1 client(s) to finish
955+""", flags=doctest.ELLIPSIS|doctest.REPORT_UDIFF))
956+
957+ def test_stop_gracefully_tells_handlers_to_stop(self):
958+ server, server_thread = self.make_server()
959+ client_sock = self.connect_to_server(server)
960+ self.say_hello(client_sock)
961+ server_handler, server_side_thread = server._active_connections[0]
962+ self.assertFalse(server_handler.finished)
963+ server._stop_gracefully()
964+ self.assertTrue(server_handler.finished)
965+ client_sock.close()
966+ self.connect_to_server_and_hangup(server)
967+ server_thread.join()
968+
969
970 class SmartTCPTests(tests.TestCase):
971 """Tests for connection/end to end behaviour using the TCP server.
972
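The TestSmartTCPServer tests above rely on three behaviours: the server tracks (handler, thread) pairs, polling reaps threads that have finished, and a graceful stop flags every handler but still waits for in-flight work. A minimal Python 3 sketch of that bookkeeping follows. It is an illustration under assumptions, not SmartTCPServer itself; `Handler`, `TrackingServer` and their methods are hypothetical names, and a `threading.Event` stands in for a client that is still reading its response.

```python
import threading


class Handler:
    """Stand-in for a per-client medium (hypothetical, for illustration)."""

    def __init__(self):
        self.finished = False             # set when asked to stop
        self.release = threading.Event()  # set when the client is done

    def serve(self):
        # Pretend to be blocked finishing the current request.
        self.release.wait()


class TrackingServer:
    """Tracks active connections and waits for them on graceful stop."""

    def __init__(self):
        self.active = []                  # list of (handler, thread)
        self.fully_stopped = threading.Event()

    def serve_conn(self, handler):
        thread = threading.Thread(target=handler.serve)
        thread.daemon = True
        self.active.append((handler, thread))
        thread.start()

    def poll_active(self, timeout=0.0):
        """Drop connections whose thread has exited."""
        still_active = []
        for handler, thread in self.active:
            thread.join(timeout)
            if thread.is_alive():
                still_active.append((handler, thread))
        self.active = still_active

    def stop_gracefully(self):
        """Tell each handler to stop once its current request is done."""
        for handler, _ in self.active:
            handler.finished = True

    def wait_for_clients(self, poll_timeout=0.05):
        """Block until every client thread has finished."""
        while self.active:
            self.poll_active(poll_timeout)
        self.fully_stopped.set()
```

The design point the tests check is the ordering: `stop_gracefully()` returns immediately with handlers flagged, while "fully stopped" is only reached after the last client thread exits.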
973=== modified file 'bzrlib/trace.py'
974--- bzrlib/trace.py 2011-07-15 14:13:32 +0000
975+++ bzrlib/trace.py 2011-09-26 14:29:40 +0000
976@@ -559,6 +559,10 @@
977 try:
978 sys.stdout.flush()
979 sys.stderr.flush()
980+ except ValueError, e:
981+ # On Windows, I get ValueError calling stdout.flush() on a closed
982+ # handle
983+ pass
984 except IOError, e:
985 import errno
986 if e.errno in [errno.EINVAL, errno.EPIPE]:
987
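The trace.py hunk above adds ValueError to the errors tolerated while flushing stdout and stderr, since flushing an already-closed handle raises ValueError rather than IOError. A minimal Python 3 sketch of the same idea is below. `flush_quietly` is a hypothetical name, and re-raising for errno values other than EINVAL and EPIPE is an assumption, because the diff only shows the start of that branch.

```python
import errno
import sys


def flush_quietly(streams=None):
    """Flush each stream, ignoring the errors expected from closed handles.

    Hypothetical helper for illustration; not the function in bzrlib/trace.py.
    """
    if streams is None:
        streams = (sys.stdout, sys.stderr)
    for stream in streams:
        try:
            stream.flush()
        except ValueError:
            # Raised when the file object has already been closed.
            pass
        except OSError as e:
            # Assumption: only EINVAL and EPIPE are safe to ignore here.
            if e.errno not in (errno.EINVAL, errno.EPIPE):
                raise
```

Catching ValueError separately keeps the errno check limited to genuine OS-level failures.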
988=== modified file 'doc/en/release-notes/bzr-2.5.txt'
989--- doc/en/release-notes/bzr-2.5.txt 2011-09-26 14:29:38 +0000
990+++ doc/en/release-notes/bzr-2.5.txt 2011-09-26 14:29:40 +0000
991@@ -20,6 +20,16 @@
992
993 .. New commands, options, etc that users may wish to try out.
994
995+* ``bzr serve`` will now disconnect clients if they have not issued an RPC
996+ request after 5 minutes. On POSIX platforms, this will also happen for
997+ ``bzr serve --inet``. This can be overridden with the configuration
998+ variable ``serve.client_timeout`` or with the command line parameter
999+ ``bzr serve --client-timeout=X``. Further, it is possible to request
1000+ ``bzr serve [--inet]`` to shut down gracefully by sending SIGHUP. It will
1001+ finish the current request, and then close the connection.
1002+ (John Arbash Meinel, #824797, #795025)
1003+
1004+
1005 Improvements
1006 ************
1007
1008@@ -161,12 +171,6 @@
1009 The name change also affects ``bzr missing``.
1010 (Martin von Gagern)
1011
1012-* ``bzr serve`` will now disconnect clients if they have not issued an RPC
1013- request after 5minutes. On POSIX platforms, this will also happen for
1014- ``bzr serve --inet``. This can be overridden with the configuration
1015- variable ``serve.client_timeout`` or in the command line parameter
1016- ``bzr serve --client-timeout=X``. (John Arbash Meinel, #824797)
1017-
1018 * ``config.Option`` can now declare ``default_from_env``, a list of
1019 environment variables to get a default value from. (Vincent Ladeuil)
1020