Merge lp:~jameinel/bzr/2.4-client-reconnect-819604 into lp:bzr/2.4

Proposed by John A Meinel
Status: Merged
Approved by: John A Meinel
Approved revision: no longer in the source branch.
Merged at revision: 6076
Proposed branch: lp:~jameinel/bzr/2.4-client-reconnect-819604
Merge into: lp:bzr/2.4
Diff against target: 1576 lines (+891/-230)
14 files modified
bzrlib/help_topics/en/debug-flags.txt (+2/-0)
bzrlib/osutils.py (+12/-4)
bzrlib/smart/client.py (+208/-86)
bzrlib/smart/medium.py (+46/-6)
bzrlib/smart/protocol.py (+5/-3)
bzrlib/smart/request.py (+145/-100)
bzrlib/tests/__init__.py (+3/-1)
bzrlib/tests/test_bundle.py (+6/-3)
bzrlib/tests/test_osutils.py (+40/-1)
bzrlib/tests/test_remote.py (+4/-2)
bzrlib/tests/test_smart.py (+5/-2)
bzrlib/tests/test_smart_request.py (+10/-0)
bzrlib/tests/test_smart_transport.py (+399/-21)
doc/en/release-notes/bzr-2.4.txt (+6/-1)
To merge this branch: bzr merge lp:~jameinel/bzr/2.4-client-reconnect-819604
Reviewer         Review Type   Date Requested   Status
Richard Wilbur                                  Approve
Review via email: mp+78844@code.launchpad.net

Commit message

Bug #819604, merge the bzr-2.3 client-reconnect 819604 changes into bzr-2.4.

Description of the change

The bzr-2.4 version of client-reconnect (bug #819604).

The only difference between this and the bzr-2.3 branch is that "Branch.heads_to_fetch" was added as an RPC that needed to be classified.
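
As a rough illustration of what classifying an RPC means here, the following is a minimal sketch of the retry policy, not the branch's actual code. The names `REQUEST_INFO`, `is_safe_to_send_twice`, `call_with_one_retry`, and the local `ConnectionReset` class are hypothetical stand-ins; only the category values ('read', 'idem', 'semi', 'mutate') and the four example registrations are taken from the diff to bzrlib/smart/request.py.

```python
# Hypothetical, simplified model of the retry policy; not bzrlib's API.

# Categories that the diff treats as safe to resend after a reset.
RETRY_SAFE = frozenset(['read', 'idem', 'semi'])

# Retry category per RPC name. The real table is the request_handlers
# registry in bzrlib/smart/request.py; this is a small excerpt.
REQUEST_INFO = {
    'Branch.heads_to_fetch': 'read',
    'Branch.set_tags_bytes': 'idem',
    'Branch.lock_write': 'semi',
    'append': 'mutate',
}


class ConnectionReset(Exception):
    """Stand-in for bzrlib.errors.ConnectionReset."""


def is_safe_to_send_twice(method, body_stream=None, noretry=False):
    """Return True if `method` may be resent after a dropped connection."""
    if body_stream is not None or noretry:
        # A consumed body stream cannot be replayed, and -Dnoretry
        # disables retrying altogether.
        return False
    # Unknown methods have no category, so they are never retried.
    return REQUEST_INFO.get(method) in RETRY_SAFE


def call_with_one_retry(send, method, **kwargs):
    """Call send(method), retrying exactly once on ConnectionReset."""
    try:
        return send(method)
    except ConnectionReset:
        if not is_safe_to_send_twice(method, **kwargs):
            raise
        # A single retry only: if the connection is really down,
        # the second failure propagates to the caller.
        return send(method)
```

In this sketch an unclassified RPC is simply never retried, which is presumably why a newly added RPC such as "Branch.heads_to_fetch" needs an entry before it can benefit from reconnecting.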

John A Meinel (jameinel) wrote :

Resurrecting this, since I'm thinking to try and land it in 2.4, but not worry about 2.1 (which has very different internals from 2.4).

Richard Wilbur (richard-wilbur) wrote :

This looks like a measured response to failures caused by network misbehaviour. It seems like a good idea to get these into a version that can be tested and then tweak them further, if needed, based on experience.
+1

review: Approve
John A Meinel (jameinel) wrote :

On 2013-05-24 8:36, Richard Wilbur wrote:
> Review: Approve
>
> This looks like a measured response to failures caused by network
> misbehaviour. It seems like a good idea to get these into a
> version that can be tested and then tweak them further, if needed,
> based on experience. +1
>

These changes have actually been in trunk for a long time. This is
just backporting it to earlier versions so that they also are in old
stable releases. (Launchpad itself will start disconnecting clients if
they are idle for too long, so we wanted people to not get too many
disconnects. Obviously releasing took longer than originally planned.)

So the changes should be pretty safe to land, since we've been
actively using them for >1year now.

John
=:->

Richard Wilbur (richard-wilbur) wrote :

In light of the fact that these changes have already been in trunk under active use for over a year, I'd like to change my vote to: I unreservedly approve.
+2

review: Approve
John A Meinel (jameinel) wrote :

sent to pqm by email

Preview Diff

=== modified file 'bzrlib/help_topics/en/debug-flags.txt'
--- bzrlib/help_topics/en/debug-flags.txt 2011-05-27 17:10:05 +0000
+++ bzrlib/help_topics/en/debug-flags.txt 2013-05-23 13:00:35 +0000
@@ -24,6 +24,8 @@
 -Dindex           Trace major index operations.
 -Dknit            Trace knit operations.
 -Dlock            Trace when lockdir locks are taken or released.
+-Dnoretry         If a connection is reset, fail immediately rather than
+                  retrying the request.
 -Dprogress        Trace progress bar operations.
 -Dmem_dump        Dump memory to a file upon an out of memory error.
 -Dmerge           Emit information for debugging merges.

=== modified file 'bzrlib/osutils.py'
--- bzrlib/osutils.py 2013-05-23 08:25:07 +0000
+++ bzrlib/osutils.py 2013-05-23 13:00:35 +0000
@@ -1,4 +1,4 @@
-# Copyright (C) 2005-2011 Canonical Ltd
+# Copyright (C) 2005-2012 Canonical Ltd
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -32,6 +32,7 @@
 # and need the former on windows
 import shutil
 from shutil import rmtree
+import signal
 import socket
 import subprocess
 # We need to import both tempfile and mkdtemp as we export the later on posix
@@ -2044,7 +2045,7 @@
 # data at once.
 MAX_SOCKET_CHUNK = 64 * 1024

-_end_of_stream_errors = [errno.ECONNRESET]
+_end_of_stream_errors = [errno.ECONNRESET, errno.EPIPE, errno.EINVAL]
 for _eno in ['WSAECONNRESET', 'WSAECONNABORTED']:
     _eno = getattr(errno, _eno, None)
     if _eno is not None:
@@ -2116,12 +2117,19 @@
     while sent_total < byte_count:
         try:
             sent = sock.send(buffer(bytes, sent_total, MAX_SOCKET_CHUNK))
-        except socket.error, e:
+        except (socket.error, IOError), e:
+            if e.args[0] in _end_of_stream_errors:
+                raise errors.ConnectionReset(
+                    "Error trying to write to socket", e)
             if e.args[0] != errno.EINTR:
                 raise
         else:
+            if sent == 0:
+                raise errors.ConnectionReset('Sending to %s returned 0 bytes'
+                                             % (sock,))
             sent_total += sent
-            report_activity(sent, 'write')
+            if report_activity is not None:
+                report_activity(sent, 'write')


 def connect_socket(address):

=== modified file 'bzrlib/smart/client.py'
--- bzrlib/smart/client.py 2011-03-30 11:45:54 +0000
+++ bzrlib/smart/client.py 2013-05-23 13:00:35 +0000
@@ -14,12 +14,18 @@
 # along with this program; if not, write to the Free Software
 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

+from bzrlib import lazy_import
+lazy_import.lazy_import(globals(), """
+from bzrlib.smart import request as _mod_request
+""")
+
 import bzrlib
 from bzrlib.smart import message, protocol
-from bzrlib.trace import warning
 from bzrlib import (
+    debug,
     errors,
     hooks,
+    trace,
     )


@@ -39,93 +45,12 @@
     def __repr__(self):
         return '%s(%r)' % (self.__class__.__name__, self._medium)

-    def _send_request(self, protocol_version, method, args, body=None,
-                      readv_body=None, body_stream=None):
-        encoder, response_handler = self._construct_protocol(
-            protocol_version)
-        encoder.set_headers(self._headers)
-        if body is not None:
-            if readv_body is not None:
-                raise AssertionError(
-                    "body and readv_body are mutually exclusive.")
-            if body_stream is not None:
-                raise AssertionError(
-                    "body and body_stream are mutually exclusive.")
-            encoder.call_with_body_bytes((method, ) + args, body)
-        elif readv_body is not None:
-            if body_stream is not None:
-                raise AssertionError(
-                    "readv_body and body_stream are mutually exclusive.")
-            encoder.call_with_body_readv_array((method, ) + args, readv_body)
-        elif body_stream is not None:
-            encoder.call_with_body_stream((method, ) + args, body_stream)
-        else:
-            encoder.call(method, *args)
-        return response_handler
-
-    def _run_call_hooks(self, method, args, body, readv_body):
-        if not _SmartClient.hooks['call']:
-            return
-        params = CallHookParams(method, args, body, readv_body, self._medium)
-        for hook in _SmartClient.hooks['call']:
-            hook(params)
-
     def _call_and_read_response(self, method, args, body=None, readv_body=None,
             body_stream=None, expect_response_body=True):
-        self._run_call_hooks(method, args, body, readv_body)
-        if self._medium._protocol_version is not None:
-            response_handler = self._send_request(
-                self._medium._protocol_version, method, args, body=body,
-                readv_body=readv_body, body_stream=body_stream)
-            return (response_handler.read_response_tuple(
-                        expect_body=expect_response_body),
-                    response_handler)
-        else:
-            for protocol_version in [3, 2]:
-                if protocol_version == 2:
-                    # If v3 doesn't work, the remote side is older than 1.6.
-                    self._medium._remember_remote_is_before((1, 6))
-                response_handler = self._send_request(
-                    protocol_version, method, args, body=body,
-                    readv_body=readv_body, body_stream=body_stream)
-                try:
-                    response_tuple = response_handler.read_response_tuple(
-                        expect_body=expect_response_body)
-                except errors.UnexpectedProtocolVersionMarker, err:
-                    # TODO: We could recover from this without disconnecting if
-                    # we recognise the protocol version.
-                    warning(
-                        'Server does not understand Bazaar network protocol %d,'
-                        ' reconnecting. (Upgrade the server to avoid this.)'
-                        % (protocol_version,))
-                    self._medium.disconnect()
-                    continue
-                except errors.ErrorFromSmartServer:
-                    # If we received an error reply from the server, then it
-                    # must be ok with this protocol version.
-                    self._medium._protocol_version = protocol_version
-                    raise
-                else:
-                    self._medium._protocol_version = protocol_version
-                    return response_tuple, response_handler
-            raise errors.SmartProtocolError(
-                'Server is not a Bazaar server: ' + str(err))
-
-    def _construct_protocol(self, version):
-        request = self._medium.get_request()
-        if version == 3:
-            request_encoder = protocol.ProtocolThreeRequester(request)
-            response_handler = message.ConventionalResponseHandler()
-            response_proto = protocol.ProtocolThreeDecoder(
-                response_handler, expect_version_marker=True)
-            response_handler.setProtoAndMediumRequest(response_proto, request)
-        elif version == 2:
-            request_encoder = protocol.SmartClientRequestProtocolTwo(request)
-            response_handler = request_encoder
-        else:
-            request_encoder = protocol.SmartClientRequestProtocolOne(request)
-            response_handler = request_encoder
-        return request_encoder, response_handler
+        request = _SmartClientRequest(self, method, args, body=body,
+            readv_body=readv_body, body_stream=body_stream,
+            expect_response_body=expect_response_body)
+        return request.call_and_read_response()

     def call(self, method, *args):
         """Call a method on the remote server."""
@@ -191,6 +116,203 @@
         return self._medium.remote_path_from_transport(transport)


+class _SmartClientRequest(object):
+    """Encapsulate the logic for a single request.
+
+    This class handles things like reconnecting and sending the request a
+    second time when the connection is reset in the middle. It also handles the
+    multiple requests that get made if we don't know what protocol the server
+    supports yet.
+
+    Generally, you build up one of these objects, passing in the arguments that
+    you want to send to the server, and then use 'call_and_read_response' to
+    get the response from the server.
+    """
+
+    def __init__(self, client, method, args, body=None, readv_body=None,
+                 body_stream=None, expect_response_body=True):
+        self.client = client
+        self.method = method
+        self.args = args
+        self.body = body
+        self.readv_body = readv_body
+        self.body_stream = body_stream
+        self.expect_response_body = expect_response_body
+
+    def call_and_read_response(self):
+        """Send the request to the server, and read the initial response.
+
+        This doesn't read all of the body content of the response, instead it
+        returns (response_tuple, response_handler). response_tuple is the 'ok',
+        or 'error' information, and 'response_handler' can be used to get the
+        content stream out.
+        """
+        self._run_call_hooks()
+        protocol_version = self.client._medium._protocol_version
+        if protocol_version is None:
+            return self._call_determining_protocol_version()
+        else:
+            return self._call(protocol_version)
+
+    def _is_safe_to_send_twice(self):
+        """Check if the current method is re-entrant safe."""
+        if self.body_stream is not None or 'noretry' in debug.debug_flags:
+            # We can't restart a body stream that has already been consumed.
+            return False
+        request_type = _mod_request.request_handlers.get_info(self.method)
+        if request_type in ('read', 'idem', 'semi'):
+            return True
+        # If we have gotten this far, 'stream' cannot be retried, because we
+        # already consumed the local stream.
+        if request_type in ('semivfs', 'mutate', 'stream'):
+            return False
+        trace.mutter('Unknown request type: %s for method %s'
+                     % (request_type, self.method))
+        return False
+
+    def _run_call_hooks(self):
+        if not _SmartClient.hooks['call']:
+            return
+        params = CallHookParams(self.method, self.args, self.body,
+                                self.readv_body, self.client._medium)
+        for hook in _SmartClient.hooks['call']:
+            hook(params)
+
+    def _call(self, protocol_version):
+        """We know the protocol version.
+
+        So this just sends the request, and then reads the response. This is
+        where the code will be to retry requests if the connection is closed.
+        """
+        response_handler = self._send(protocol_version)
+        try:
+            response_tuple = response_handler.read_response_tuple(
+                expect_body=self.expect_response_body)
+        except errors.ConnectionReset, e:
+            self.client._medium.reset()
+            if not self._is_safe_to_send_twice():
+                raise
+            trace.warning('ConnectionReset reading response for %r, retrying'
+                          % (self.method,))
+            trace.log_exception_quietly()
+            encoder, response_handler = self._construct_protocol(
+                protocol_version)
+            self._send_no_retry(encoder)
+            response_tuple = response_handler.read_response_tuple(
+                expect_body=self.expect_response_body)
+        return (response_tuple, response_handler)
+
+    def _call_determining_protocol_version(self):
+        """Determine what protocol the remote server supports.
+
+        We do this by placing a request in the most recent protocol, and
+        handling the UnexpectedProtocolVersionMarker from the server.
+        """
+        for protocol_version in [3, 2]:
+            if protocol_version == 2:
+                # If v3 doesn't work, the remote side is older than 1.6.
+                self.client._medium._remember_remote_is_before((1, 6))
+            try:
+                response_tuple, response_handler = self._call(protocol_version)
+            except errors.UnexpectedProtocolVersionMarker, err:
+                # TODO: We could recover from this without disconnecting if
+                # we recognise the protocol version.
+                trace.warning(
+                    'Server does not understand Bazaar network protocol %d,'
+                    ' reconnecting. (Upgrade the server to avoid this.)'
+                    % (protocol_version,))
+                self.client._medium.disconnect()
+                continue
+            except errors.ErrorFromSmartServer:
+                # If we received an error reply from the server, then it
+                # must be ok with this protocol version.
+                self.client._medium._protocol_version = protocol_version
+                raise
+            else:
+                self.client._medium._protocol_version = protocol_version
+                return response_tuple, response_handler
+        raise errors.SmartProtocolError(
+            'Server is not a Bazaar server: ' + str(err))
+
+    def _construct_protocol(self, version):
+        """Build the encoding stack for a given protocol version."""
+        request = self.client._medium.get_request()
+        if version == 3:
+            request_encoder = protocol.ProtocolThreeRequester(request)
+            response_handler = message.ConventionalResponseHandler()
+            response_proto = protocol.ProtocolThreeDecoder(
+                response_handler, expect_version_marker=True)
+            response_handler.setProtoAndMediumRequest(response_proto, request)
+        elif version == 2:
+            request_encoder = protocol.SmartClientRequestProtocolTwo(request)
+            response_handler = request_encoder
+        else:
+            request_encoder = protocol.SmartClientRequestProtocolOne(request)
+            response_handler = request_encoder
+        return request_encoder, response_handler
+
+    def _send(self, protocol_version):
+        """Encode the request, and send it to the server.
+
+        This will retry a request if we get a ConnectionReset while sending the
+        request to the server. (Unless we have a body_stream that we have
+        already started consuming, since we can't restart body_streams)
+
+        :return: response_handler as defined by _construct_protocol
+        """
+        encoder, response_handler = self._construct_protocol(protocol_version)
+        try:
+            self._send_no_retry(encoder)
+        except errors.ConnectionReset, e:
+            # If we fail during the _send_no_retry phase, then we can
+            # be confident that the server did not get our request, because we
+            # haven't started waiting for the reply yet. So try the request
+            # again. We only issue a single retry, because if the connection
+            # really is down, there is no reason to loop endlessly.
+
+            # Connection is dead, so close our end of it.
+            self.client._medium.reset()
+            if (('noretry' in debug.debug_flags)
+                or (self.body_stream is not None
+                    and encoder.body_stream_started)):
+                # We can't restart a body_stream that has been partially
+                # consumed, so we don't retry.
+                # Note: We don't have to worry about
+                #   SmartClientRequestProtocolOne or Two, because they don't
+                #   support client-side body streams.
+                raise
+            trace.warning('ConnectionReset calling %r, retrying'
+                          % (self.method,))
+            trace.log_exception_quietly()
+            encoder, response_handler = self._construct_protocol(
+                protocol_version)
+            self._send_no_retry(encoder)
+        return response_handler
+
+    def _send_no_retry(self, encoder):
+        """Just encode the request and try to send it."""
+        encoder.set_headers(self.client._headers)
+        if self.body is not None:
+            if self.readv_body is not None:
+                raise AssertionError(
+                    "body and readv_body are mutually exclusive.")
+            if self.body_stream is not None:
+                raise AssertionError(
+                    "body and body_stream are mutually exclusive.")
+            encoder.call_with_body_bytes((self.method, ) + self.args, self.body)
+        elif self.readv_body is not None:
+            if self.body_stream is not None:
+                raise AssertionError(
+                    "readv_body and body_stream are mutually exclusive.")
+            encoder.call_with_body_readv_array((self.method, ) + self.args,
+                                               self.readv_body)
+        elif self.body_stream is not None:
+            encoder.call_with_body_stream((self.method, ) + self.args,
+                                          self.body_stream)
+        else:
+            encoder.call(self.method, *self.args)
+
+
 class SmartClientHooks(hooks.Hooks):

     def __init__(self):

=== modified file 'bzrlib/smart/medium.py'
--- bzrlib/smart/medium.py 2011-04-07 10:36:24 +0000
+++ bzrlib/smart/medium.py 2013-05-23 13:00:35 +0000
@@ -1,4 +1,4 @@
-# Copyright (C) 2006-2011 Canonical Ltd
+# Copyright (C) 2006-2012 Canonical Ltd
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -24,6 +24,7 @@
 bzrlib/transport/smart/__init__.py.
 """

+import errno
 import os
 import sys
 import urllib
@@ -175,6 +176,14 @@
         ui.ui_factory.report_transport_activity(self, bytes, direction)


+_bad_file_descriptor = (errno.EBADF,)
+if sys.platform == 'win32':
+    # Given on Windows if you pass a closed socket to select.select. Probably
+    # also given if you pass a file handle to select.
+    WSAENOTSOCK = 10038
+    _bad_file_descriptor += (WSAENOTSOCK,)
+
+
 class SmartServerStreamMedium(SmartMedium):
     """Handles smart commands coming over a stream.

@@ -238,6 +247,8 @@

         :param protocol: a SmartServerRequestProtocol.
         """
+        if protocol is None:
+            return
         try:
             self._serve_one_request_unguarded(protocol)
         except KeyboardInterrupt:
@@ -709,6 +720,14 @@
         """
         return SmartClientStreamMediumRequest(self)

+    def reset(self):
+        """We have been disconnected, reset current state.
+
+        This resets things like _current_request and connected state.
+        """
+        self.disconnect()
+        self._current_request = None
+

 class SmartSimplePipesClientMedium(SmartClientStreamMedium):
     """A client medium using simple pipes.
@@ -723,11 +742,20 @@

     def _accept_bytes(self, bytes):
         """See SmartClientStreamMedium.accept_bytes."""
-        self._writeable_pipe.write(bytes)
+        try:
+            self._writeable_pipe.write(bytes)
+        except IOError, e:
+            if e.errno in (errno.EINVAL, errno.EPIPE):
+                raise errors.ConnectionReset(
+                    "Error trying to write to subprocess", e)
+            raise
         self._report_activity(len(bytes), 'write')

     def _flush(self):
         """See SmartClientStreamMedium._flush()."""
+        # Note: If flush were to fail, we'd like to raise ConnectionReset, etc.
+        #       However, testing shows that even when the child process is
+        #       gone, this doesn't error.
         self._writeable_pipe.flush()

     def _read_bytes(self, count):
@@ -752,8 +780,8 @@

 class SmartSSHClientMedium(SmartClientStreamMedium):
     """A client medium using SSH.
-    
-    It delegates IO to a SmartClientSocketMedium or
+
+    It delegates IO to a SmartSimplePipesClientMedium or
     SmartClientAlreadyConnectedSocketMedium (depending on platform).
     """

@@ -896,6 +924,20 @@
         SmartClientSocketMedium.__init__(self, base)
         self._host = host
         self._port = port
+        self._socket = None
+
+    def _accept_bytes(self, bytes):
+        """See SmartClientMedium.accept_bytes."""
+        self._ensure_connection()
+        osutils.send_all(self._socket, bytes, self._report_activity)
+
+    def disconnect(self):
+        """See SmartClientMedium.disconnect()."""
+        if not self._connected:
+            return
+        self._socket.close()
+        self._socket = None
+        self._connected = False

     def _ensure_connection(self):
         """Connect this medium if not already connected."""
@@ -992,5 +1034,3 @@
         This invokes self._medium._flush to ensure all bytes are transmitted.
         """
         self._medium._flush()
-
-

=== modified file 'bzrlib/smart/protocol.py'
--- bzrlib/smart/protocol.py 2011-05-19 09:32:38 +0000
+++ bzrlib/smart/protocol.py 2013-05-23 13:00:35 +0000
@@ -1081,9 +1081,6 @@
         self._real_write_func = write_func

     def _write_func(self, bytes):
-        # TODO: It is probably more appropriate to use sum(map(len, _buf))
-        #       for total number of bytes to write, rather than buffer based on
-        #       the number of write() calls
         # TODO: Another possibility would be to turn this into an async model.
         #       Where we let another thread know that we have some bytes if
         #       they want it, but we don't actually block for it
@@ -1292,6 +1289,7 @@
         _ProtocolThreeEncoder.__init__(self, medium_request.accept_bytes)
         self._medium_request = medium_request
         self._headers = {}
+        self.body_stream_started = None

     def set_headers(self, headers):
         self._headers = headers.copy()
@@ -1357,6 +1355,7 @@
             if path is not None:
                 mutter(' (to %s)', path)
             self._request_start_time = osutils.timer_func()
+        self.body_stream_started = False
         self._write_protocol_version()
         self._write_headers(self._headers)
         self._write_structure(args)
@@ -1364,6 +1363,9 @@
         #       have finished sending the stream. We would notice at the end
         #       anyway, but if the medium can deliver it early then it's good
         #       to short-circuit the whole request...
+        # Provoke any ConnectionReset failures before we start the body stream.
+        self.flush()
+        self.body_stream_started = True
         for exc_info, part in _iter_with_errors(stream):
             if exc_info is not None:
                 # Iterating the stream failed. Cleanly abort the request.

=== modified file 'bzrlib/smart/request.py'
--- bzrlib/smart/request.py 2011-05-19 09:32:38 +0000
+++ bzrlib/smart/request.py 2013-05-23 13:00:35 +0000
@@ -1,4 +1,4 @@
-# Copyright (C) 2006-2010 Canonical Ltd
+# Copyright (C) 2006-2012 Canonical Ltd
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -491,157 +491,202 @@
491 return SuccessfulSmartServerResponse((answer,))491 return SuccessfulSmartServerResponse((answer,))
492492
493493
494# In the 'info' attribute, we store whether this request is 'safe' to retry if
495# we get a disconnect while reading the response. It can have the values:
496# read This is purely a read request, so retrying it is perfectly ok.
497# idem An idempotent write request. Something like 'put' where if you put
498# the same bytes twice you end up with the same final bytes.
499# semi This is a request that isn't strictly idempotent, but doesn't
500# result in corruption if it is retried. This is for things like
501# 'lock' and 'unlock'. If you call lock, it updates the disk
502# structure. If you fail to read the response, you won't be able to
503# use the lock, because you don't have the lock token. Calling lock
504# again will fail, because the lock is already taken. However, we
505# can't tell if the server received our request or not. If it didn't,
506# then retrying the request is fine, as it will actually do what we
507# want. If it did, we will interrupt the current operation, but we
508# are no worse off than interrupting the current operation because of
509# a ConnectionReset.
510# semivfs Similar to semi, but specific to a Virtual FileSystem request.
511# stream This is a request that takes a stream that cannot be restarted if
512# consumed. This request is 'safe' in that if we determine the
513# connection is closed before we consume the stream, we can try
514# again.
515# mutate State is updated in a way that replaying that request results in a
516# different state. For example 'append' writes more bytes to a given
517# file. If append succeeds, it moves the file pointer.
494request_handlers = registry.Registry()518request_handlers = registry.Registry()
495request_handlers.register_lazy(519request_handlers.register_lazy(
-    'append', 'bzrlib.smart.vfs', 'AppendRequest')
+    'append', 'bzrlib.smart.vfs', 'AppendRequest', info='mutate')
 request_handlers.register_lazy(
     'Branch.get_config_file', 'bzrlib.smart.branch',
-    'SmartServerBranchGetConfigFile')
+    'SmartServerBranchGetConfigFile', info='read')
 request_handlers.register_lazy(
-    'Branch.get_parent', 'bzrlib.smart.branch', 'SmartServerBranchGetParent')
+    'Branch.get_parent', 'bzrlib.smart.branch', 'SmartServerBranchGetParent',
+    info='read')
 request_handlers.register_lazy(
     'Branch.get_tags_bytes', 'bzrlib.smart.branch',
-    'SmartServerBranchGetTagsBytes')
+    'SmartServerBranchGetTagsBytes', info='read')
 request_handlers.register_lazy(
     'Branch.set_tags_bytes', 'bzrlib.smart.branch',
-    'SmartServerBranchSetTagsBytes')
+    'SmartServerBranchSetTagsBytes', info='idem')
 request_handlers.register_lazy(
     'Branch.heads_to_fetch', 'bzrlib.smart.branch',
-    'SmartServerBranchHeadsToFetch')
+    'SmartServerBranchHeadsToFetch', info='read')
 request_handlers.register_lazy(
-    'Branch.get_stacked_on_url', 'bzrlib.smart.branch', 'SmartServerBranchRequestGetStackedOnURL')
-request_handlers.register_lazy(
-    'Branch.last_revision_info', 'bzrlib.smart.branch', 'SmartServerBranchRequestLastRevisionInfo')
-request_handlers.register_lazy(
-    'Branch.lock_write', 'bzrlib.smart.branch', 'SmartServerBranchRequestLockWrite')
-request_handlers.register_lazy( 'Branch.revision_history',
-    'bzrlib.smart.branch', 'SmartServerRequestRevisionHistory')
-request_handlers.register_lazy( 'Branch.set_config_option',
-    'bzrlib.smart.branch', 'SmartServerBranchRequestSetConfigOption')
-request_handlers.register_lazy( 'Branch.set_config_option_dict',
-    'bzrlib.smart.branch', 'SmartServerBranchRequestSetConfigOptionDict')
-request_handlers.register_lazy( 'Branch.set_last_revision',
-    'bzrlib.smart.branch', 'SmartServerBranchRequestSetLastRevision')
+    'Branch.get_stacked_on_url', 'bzrlib.smart.branch',
+    'SmartServerBranchRequestGetStackedOnURL', info='read')
+request_handlers.register_lazy(
+    'Branch.last_revision_info', 'bzrlib.smart.branch',
+    'SmartServerBranchRequestLastRevisionInfo', info='read')
+request_handlers.register_lazy(
+    'Branch.lock_write', 'bzrlib.smart.branch',
+    'SmartServerBranchRequestLockWrite', info='semi')
+request_handlers.register_lazy(
+    'Branch.revision_history', 'bzrlib.smart.branch',
+    'SmartServerRequestRevisionHistory', info='read')
+request_handlers.register_lazy(
+    'Branch.set_config_option', 'bzrlib.smart.branch',
+    'SmartServerBranchRequestSetConfigOption', info='idem')
+request_handlers.register_lazy(
+    'Branch.set_config_option_dict', 'bzrlib.smart.branch',
+    'SmartServerBranchRequestSetConfigOptionDict', info='idem')
+request_handlers.register_lazy(
+    'Branch.set_last_revision', 'bzrlib.smart.branch',
+    'SmartServerBranchRequestSetLastRevision', info='idem')
 request_handlers.register_lazy(
     'Branch.set_last_revision_info', 'bzrlib.smart.branch',
-    'SmartServerBranchRequestSetLastRevisionInfo')
+    'SmartServerBranchRequestSetLastRevisionInfo', info='idem')
 request_handlers.register_lazy(
     'Branch.set_last_revision_ex', 'bzrlib.smart.branch',
-    'SmartServerBranchRequestSetLastRevisionEx')
+    'SmartServerBranchRequestSetLastRevisionEx', info='idem')
 request_handlers.register_lazy(
     'Branch.set_parent_location', 'bzrlib.smart.branch',
-    'SmartServerBranchRequestSetParentLocation')
+    'SmartServerBranchRequestSetParentLocation', info='idem')
 request_handlers.register_lazy(
-    'Branch.unlock', 'bzrlib.smart.branch', 'SmartServerBranchRequestUnlock')
+    'Branch.unlock', 'bzrlib.smart.branch',
+    'SmartServerBranchRequestUnlock', info='semi')
 request_handlers.register_lazy(
     'BzrDir.cloning_metadir', 'bzrlib.smart.bzrdir',
-    'SmartServerBzrDirRequestCloningMetaDir')
+    'SmartServerBzrDirRequestCloningMetaDir', info='read')
 request_handlers.register_lazy(
     'BzrDir.create_branch', 'bzrlib.smart.bzrdir',
-    'SmartServerRequestCreateBranch')
+    'SmartServerRequestCreateBranch', info='semi')
 request_handlers.register_lazy(
     'BzrDir.create_repository', 'bzrlib.smart.bzrdir',
-    'SmartServerRequestCreateRepository')
+    'SmartServerRequestCreateRepository', info='semi')
 request_handlers.register_lazy(
     'BzrDir.find_repository', 'bzrlib.smart.bzrdir',
-    'SmartServerRequestFindRepositoryV1')
+    'SmartServerRequestFindRepositoryV1', info='read')
 request_handlers.register_lazy(
     'BzrDir.find_repositoryV2', 'bzrlib.smart.bzrdir',
-    'SmartServerRequestFindRepositoryV2')
+    'SmartServerRequestFindRepositoryV2', info='read')
 request_handlers.register_lazy(
     'BzrDir.find_repositoryV3', 'bzrlib.smart.bzrdir',
-    'SmartServerRequestFindRepositoryV3')
+    'SmartServerRequestFindRepositoryV3', info='read')
 request_handlers.register_lazy(
     'BzrDir.get_config_file', 'bzrlib.smart.bzrdir',
-    'SmartServerBzrDirRequestConfigFile')
+    'SmartServerBzrDirRequestConfigFile', info='read')
 request_handlers.register_lazy(
     'BzrDirFormat.initialize', 'bzrlib.smart.bzrdir',
-    'SmartServerRequestInitializeBzrDir')
+    'SmartServerRequestInitializeBzrDir', info='semi')
 request_handlers.register_lazy(
     'BzrDirFormat.initialize_ex_1.16', 'bzrlib.smart.bzrdir',
-    'SmartServerRequestBzrDirInitializeEx')
+    'SmartServerRequestBzrDirInitializeEx', info='semi')
 request_handlers.register_lazy(
-    'BzrDir.open', 'bzrlib.smart.bzrdir', 'SmartServerRequestOpenBzrDir')
-request_handlers.register_lazy(
-    'BzrDir.open_2.1', 'bzrlib.smart.bzrdir', 'SmartServerRequestOpenBzrDir_2_1')
+    'BzrDir.open', 'bzrlib.smart.bzrdir', 'SmartServerRequestOpenBzrDir',
+    info='read')
+request_handlers.register_lazy(
+    'BzrDir.open_2.1', 'bzrlib.smart.bzrdir',
+    'SmartServerRequestOpenBzrDir_2_1', info='read')
 request_handlers.register_lazy(
     'BzrDir.open_branch', 'bzrlib.smart.bzrdir',
-    'SmartServerRequestOpenBranch')
+    'SmartServerRequestOpenBranch', info='read')
 request_handlers.register_lazy(
     'BzrDir.open_branchV2', 'bzrlib.smart.bzrdir',
-    'SmartServerRequestOpenBranchV2')
+    'SmartServerRequestOpenBranchV2', info='read')
 request_handlers.register_lazy(
     'BzrDir.open_branchV3', 'bzrlib.smart.bzrdir',
-    'SmartServerRequestOpenBranchV3')
+    'SmartServerRequestOpenBranchV3', info='read')
 request_handlers.register_lazy(
-    'delete', 'bzrlib.smart.vfs', 'DeleteRequest')
+    'delete', 'bzrlib.smart.vfs', 'DeleteRequest', info='semivfs')
 request_handlers.register_lazy(
-    'get', 'bzrlib.smart.vfs', 'GetRequest')
+    'get', 'bzrlib.smart.vfs', 'GetRequest', info='read')
 request_handlers.register_lazy(
-    'get_bundle', 'bzrlib.smart.request', 'GetBundleRequest')
+    'get_bundle', 'bzrlib.smart.request', 'GetBundleRequest', info='read')
 request_handlers.register_lazy(
-    'has', 'bzrlib.smart.vfs', 'HasRequest')
+    'has', 'bzrlib.smart.vfs', 'HasRequest', info='read')
 request_handlers.register_lazy(
-    'hello', 'bzrlib.smart.request', 'HelloRequest')
+    'hello', 'bzrlib.smart.request', 'HelloRequest', info='read')
 request_handlers.register_lazy(
-    'iter_files_recursive', 'bzrlib.smart.vfs', 'IterFilesRecursiveRequest')
-request_handlers.register_lazy(
-    'list_dir', 'bzrlib.smart.vfs', 'ListDirRequest')
-request_handlers.register_lazy(
-    'mkdir', 'bzrlib.smart.vfs', 'MkdirRequest')
-request_handlers.register_lazy(
-    'move', 'bzrlib.smart.vfs', 'MoveRequest')
-request_handlers.register_lazy(
-    'put', 'bzrlib.smart.vfs', 'PutRequest')
-request_handlers.register_lazy(
-    'put_non_atomic', 'bzrlib.smart.vfs', 'PutNonAtomicRequest')
-request_handlers.register_lazy(
-    'readv', 'bzrlib.smart.vfs', 'ReadvRequest')
-request_handlers.register_lazy(
-    'rename', 'bzrlib.smart.vfs', 'RenameRequest')
+    'iter_files_recursive', 'bzrlib.smart.vfs', 'IterFilesRecursiveRequest',
+    info='read')
+request_handlers.register_lazy(
+    'list_dir', 'bzrlib.smart.vfs', 'ListDirRequest', info='read')
+request_handlers.register_lazy(
+    'mkdir', 'bzrlib.smart.vfs', 'MkdirRequest', info='semivfs')
+request_handlers.register_lazy(
+    'move', 'bzrlib.smart.vfs', 'MoveRequest', info='semivfs')
+request_handlers.register_lazy(
+    'put', 'bzrlib.smart.vfs', 'PutRequest', info='idem')
+request_handlers.register_lazy(
+    'put_non_atomic', 'bzrlib.smart.vfs', 'PutNonAtomicRequest', info='idem')
+request_handlers.register_lazy(
+    'readv', 'bzrlib.smart.vfs', 'ReadvRequest', info='read')
+request_handlers.register_lazy(
+    'rename', 'bzrlib.smart.vfs', 'RenameRequest', info='semivfs')
 request_handlers.register_lazy(
     'PackRepository.autopack', 'bzrlib.smart.packrepository',
-    'SmartServerPackRepositoryAutopack')
-request_handlers.register_lazy('Repository.gather_stats',
-                               'bzrlib.smart.repository',
-                               'SmartServerRepositoryGatherStats')
-request_handlers.register_lazy('Repository.get_parent_map',
-                               'bzrlib.smart.repository',
-                               'SmartServerRepositoryGetParentMap')
+    'SmartServerPackRepositoryAutopack', info='idem')
+request_handlers.register_lazy(
+    'Repository.gather_stats', 'bzrlib.smart.repository',
+    'SmartServerRepositoryGatherStats', info='read')
+request_handlers.register_lazy(
+    'Repository.get_parent_map', 'bzrlib.smart.repository',
+    'SmartServerRepositoryGetParentMap', info='read')
 request_handlers.register_lazy(
-    'Repository.get_revision_graph', 'bzrlib.smart.repository', 'SmartServerRepositoryGetRevisionGraph')
-request_handlers.register_lazy(
-    'Repository.has_revision', 'bzrlib.smart.repository', 'SmartServerRequestHasRevision')
-request_handlers.register_lazy(
-    'Repository.insert_stream', 'bzrlib.smart.repository', 'SmartServerRepositoryInsertStream')
-request_handlers.register_lazy(
-    'Repository.insert_stream_1.19', 'bzrlib.smart.repository', 'SmartServerRepositoryInsertStream_1_19')
-request_handlers.register_lazy(
-    'Repository.insert_stream_locked', 'bzrlib.smart.repository', 'SmartServerRepositoryInsertStreamLocked')
-request_handlers.register_lazy(
-    'Repository.is_shared', 'bzrlib.smart.repository', 'SmartServerRepositoryIsShared')
-request_handlers.register_lazy(
-    'Repository.lock_write', 'bzrlib.smart.repository', 'SmartServerRepositoryLockWrite')
+    'Repository.get_revision_graph', 'bzrlib.smart.repository',
+    'SmartServerRepositoryGetRevisionGraph', info='read')
+request_handlers.register_lazy(
+    'Repository.has_revision', 'bzrlib.smart.repository',
+    'SmartServerRequestHasRevision', info='read')
+request_handlers.register_lazy(
+    'Repository.insert_stream', 'bzrlib.smart.repository',
+    'SmartServerRepositoryInsertStream', info='stream')
+request_handlers.register_lazy(
+    'Repository.insert_stream_1.19', 'bzrlib.smart.repository',
+    'SmartServerRepositoryInsertStream_1_19', info='stream')
+request_handlers.register_lazy(
+    'Repository.insert_stream_locked', 'bzrlib.smart.repository',
+    'SmartServerRepositoryInsertStreamLocked', info='stream')
+request_handlers.register_lazy(
+    'Repository.is_shared', 'bzrlib.smart.repository',
+    'SmartServerRepositoryIsShared', info='read')
+request_handlers.register_lazy(
+    'Repository.lock_write', 'bzrlib.smart.repository',
+    'SmartServerRepositoryLockWrite', info='semi')
 request_handlers.register_lazy(
     'Repository.set_make_working_trees', 'bzrlib.smart.repository',
-    'SmartServerRepositorySetMakeWorkingTrees')
+    'SmartServerRepositorySetMakeWorkingTrees', info='idem')
 request_handlers.register_lazy(
-    'Repository.unlock', 'bzrlib.smart.repository', 'SmartServerRepositoryUnlock')
+    'Repository.unlock', 'bzrlib.smart.repository',
+    'SmartServerRepositoryUnlock', info='semi')
 request_handlers.register_lazy(
     'Repository.get_rev_id_for_revno', 'bzrlib.smart.repository',
-    'SmartServerRepositoryGetRevIdForRevno')
+    'SmartServerRepositoryGetRevIdForRevno', info='read')
 request_handlers.register_lazy(
     'Repository.get_stream', 'bzrlib.smart.repository',
-    'SmartServerRepositoryGetStream')
+    'SmartServerRepositoryGetStream', info='read')
 request_handlers.register_lazy(
     'Repository.get_stream_1.19', 'bzrlib.smart.repository',
-    'SmartServerRepositoryGetStream_1_19')
+    'SmartServerRepositoryGetStream_1_19', info='read')
 request_handlers.register_lazy(
     'Repository.tarball', 'bzrlib.smart.repository',
-    'SmartServerRepositoryTarball')
+    'SmartServerRepositoryTarball', info='read')
 request_handlers.register_lazy(
-    'rmdir', 'bzrlib.smart.vfs', 'RmdirRequest')
+    'rmdir', 'bzrlib.smart.vfs', 'RmdirRequest', info='semivfs')
 request_handlers.register_lazy(
-    'stat', 'bzrlib.smart.vfs', 'StatRequest')
+    'stat', 'bzrlib.smart.vfs', 'StatRequest', info='read')
 request_handlers.register_lazy(
-    'Transport.is_readonly', 'bzrlib.smart.request', 'SmartServerIsReadonly')
+    'Transport.is_readonly', 'bzrlib.smart.request',
+    'SmartServerIsReadonly', info='read')

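Reader's note on the registrations above: every verb now carries an info label, and test_all_registered_requests_are_safety_qualified (in the test_smart_request.py part of this diff) accepts exactly six values: 'read', 'idem', 'mutate', 'semivfs', 'semi' and 'stream'. The sketch below shows one way such a label could be stored next to a handler and consulted before resending a request. ToyRegistry, is_retry_safe and unclassified are hypothetical names, not bzrlib APIs, and ASSUMED_RETRYABLE is an assumption about the retry policy; the real decision is made in bzrlib/smart/client.py, which is not shown in this part of the diff.

```python
# Illustrative sketch only. ToyRegistry, is_retry_safe and unclassified are
# hypothetical names; they are not part of bzrlib.

# The six labels accepted by the test in this merge proposal.
ALLOWED_INFO = ('read', 'idem', 'mutate', 'semivfs', 'semi', 'stream')

# ASSUMPTION: which labels a client would be willing to resend after a
# dropped connection. The real policy is in code outside this section.
ASSUMED_RETRYABLE = frozenset(['read', 'idem', 'semi'])


class ToyRegistry(object):
    """Map a verb name to a (handler, info) pair."""

    def __init__(self):
        self._entries = {}

    def register(self, key, obj, info=None, override_existing=False):
        if key in self._entries and not override_existing:
            raise KeyError('key already registered: %r' % (key,))
        self._entries[key] = (obj, info)

    def get(self, key):
        return self._entries[key][0]

    def get_info(self, key):
        return self._entries[key][1]

    def keys(self):
        return sorted(self._entries)


def is_retry_safe(registry, verb):
    """True if the verb's label is in the assumed retryable set."""
    return registry.get_info(verb) in ASSUMED_RETRYABLE


def unclassified(registry):
    """Verbs whose label is missing or not one of the allowed values."""
    return [key for key in registry.keys()
            if registry.get_info(key) not in ALLOWED_INFO]
```

Keeping the label beside the handler means that code which swaps a handler out and back (as the test helpers in this diff do with get_info and info=orig_info) has to carry the label along, otherwise the restored verb ends up unclassified.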
=== modified file 'bzrlib/tests/__init__.py'
--- bzrlib/tests/__init__.py 2012-11-05 11:20:40 +0000
+++ bzrlib/tests/__init__.py 2013-05-23 13:00:35 +0000
@@ -2336,8 +2336,10 @@
         from bzrlib.smart import request
         request_handlers = request.request_handlers
         orig_method = request_handlers.get(verb)
+        orig_info = request_handlers.get_info(verb)
         request_handlers.remove(verb)
-        self.addCleanup(request_handlers.register, verb, orig_method)
+        self.addCleanup(request_handlers.register, verb, orig_method,
+                        info=orig_info)


 class CapturedCall(object):

=== modified file 'bzrlib/tests/test_bundle.py'
--- bzrlib/tests/test_bundle.py 2011-05-13 12:51:05 +0000
+++ bzrlib/tests/test_bundle.py 2013-05-23 13:00:35 +0000
@@ -1852,20 +1852,23 @@
         self.sock.bind(('127.0.0.1', 0))
         self.sock.listen(1)
         self.port = self.sock.getsockname()[1]
+        self.stopping = threading.Event()
         self.thread = threading.Thread(
             name='%s (port %d)' % (self.__class__.__name__, self.port),
             target=self.accept_and_close)
         self.thread.start()

     def accept_and_close(self):
-        conn, addr = self.sock.accept()
-        conn.shutdown(socket.SHUT_RDWR)
-        conn.close()
+        while not self.stopping.isSet():
+            conn, addr = self.sock.accept()
+            conn.shutdown(socket.SHUT_RDWR)
+            conn.close()

     def get_url(self):
         return 'bzr://127.0.0.1:%d/' % (self.port,)

     def stop_server(self):
+        self.stopping.set()
         try:
             # make sure the thread dies by connecting to the listening socket,
             # just in case the test failed to do so.

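Reader's note on the test_bundle.py change: the server thread used to accept a single connection and exit. With a client that reconnects, a second connection attempt would presumably never be accepted, so the loop keeps accepting until a stop flag is set. A minimal standalone sketch of that pattern follows. AcceptAndCloseServer is a hypothetical stand-in, written for current Python rather than the Python 2 used by bzrlib, and the try/except around shutdown is an addition that is not in the diff.

```python
import socket
import threading


class AcceptAndCloseServer(object):
    """Hypothetical stand-in: accept connections and drop them at once."""

    def start_server(self):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.bind(('127.0.0.1', 0))
        self.sock.listen(1)
        self.port = self.sock.getsockname()[1]
        self.stopping = threading.Event()
        self.thread = threading.Thread(target=self.accept_and_close)
        self.thread.start()

    def accept_and_close(self):
        # Keep looping so that a client which reconnects is also accepted
        # and dropped, instead of waiting on a socket nobody services.
        while not self.stopping.is_set():
            conn, addr = self.sock.accept()
            try:
                conn.shutdown(socket.SHUT_RDWR)
            except OSError:
                pass  # the peer may already have gone away
            conn.close()

    def stop_server(self):
        self.stopping.set()
        try:
            # Wake the thread out of accept() so it can notice the flag.
            waker = socket.create_connection(
                ('127.0.0.1', self.port), timeout=5)
            waker.close()
        except OSError:
            pass
        self.thread.join(5)
        self.sock.close()
```

The flag alone is not enough to stop the thread, because accept() blocks; the extra connection made in stop_server is what lets the loop re-check the flag and exit.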
=== modified file 'bzrlib/tests/test_osutils.py'
--- bzrlib/tests/test_osutils.py 2013-05-23 08:25:07 +0000
+++ bzrlib/tests/test_osutils.py 2013-05-23 13:00:35 +0000
@@ -1,4 +1,4 @@
-# Copyright (C) 2005-2011 Canonical Ltd
+# Copyright (C) 2005-2012 Canonical Ltd
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -872,6 +872,45 @@
         self.assertEqual('/etc/shadow', osutils._posix_normpath('///etc/shadow'))


+class TestSendAll(tests.TestCase):
+
+    def test_send_with_disconnected_socket(self):
+        class DisconnectedSocket(object):
+            def __init__(self, err):
+                self.err = err
+            def send(self, content):
+                raise self.err
+            def close(self):
+                pass
+        # All of these should be treated as ConnectionReset
+        errs = []
+        for err_cls in (IOError, socket.error):
+            for errnum in osutils._end_of_stream_errors:
+                errs.append(err_cls(errnum))
+        for err in errs:
+            sock = DisconnectedSocket(err)
+            self.assertRaises(errors.ConnectionReset,
+                osutils.send_all, sock, 'some more content')
+
+    def test_send_with_no_progress(self):
+        # See https://bugs.launchpad.net/bzr/+bug/1047309
+        # It seems that paramiko can get into a state where it doesn't error,
+        # but it returns 0 bytes sent for requests over and over again.
+        class NoSendingSocket(object):
+            def __init__(self):
+                self.call_count = 0
+            def send(self, bytes):
+                self.call_count += 1
+                if self.call_count > 100:
+                    # Prevent the test suite from hanging
+                    raise RuntimeError('too many calls')
+                return 0
+        sock = NoSendingSocket()
+        self.assertRaises(errors.ConnectionReset,
+                          osutils.send_all, sock, 'content')
+        self.assertEqual(1, sock.call_count)
+
+
 class TestWin32Funcs(tests.TestCase):
     """Test that _win32 versions of os utilities return appropriate paths."""


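Reader's note on TestSendAll: the two tests pin down two behaviours of osutils.send_all. An error from send() whose errno is in osutils._end_of_stream_errors becomes ConnectionReset, and a send() that returns 0 bytes is treated as a dead connection after the first call rather than retried forever. The osutils.py side of the change is not in this part of the diff, so the following is only a sketch of a loop that would satisfy both tests. send_all_sketch, ConnectionReset, ASSUMED_END_OF_STREAM and MAX_CHUNK are hypothetical stand-ins, the errno list is a guess, and the code targets current Python rather than Python 2.

```python
import errno

# ASSUMPTION: stand-in for osutils._end_of_stream_errors; its real contents
# are not shown in this section.
ASSUMED_END_OF_STREAM = (errno.ECONNRESET, errno.EPIPE, errno.EINVAL)

# ASSUMPTION: an arbitrary per-call chunk size for the sketch.
MAX_CHUNK = 64 * 1024


class ConnectionReset(Exception):
    """Hypothetical stand-in for bzrlib.errors.ConnectionReset."""


def send_all_sketch(sock, data):
    """Send all of data through sock.send(), or raise ConnectionReset."""
    sent_total = 0
    while sent_total < len(data):
        try:
            sent = sock.send(data[sent_total:sent_total + MAX_CHUNK])
        except (IOError, OSError) as exc:
            code = exc.args[0] if exc.args else None
            if code in ASSUMED_END_OF_STREAM:
                raise ConnectionReset('error writing to socket: %r' % (exc,))
            if code != errno.EINTR:
                raise
            continue  # interrupted system call: retry the same chunk
        if sent == 0:
            # No progress at all: give up instead of spinning forever.
            raise ConnectionReset('send returned 0 bytes')
        sent_total += sent
```

Raising on the first zero-byte send is what makes the call_count assertion in test_send_with_no_progress equal to 1.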
=== modified file 'bzrlib/tests/test_remote.py'
--- bzrlib/tests/test_remote.py 2011-08-09 14:18:05 +0000
+++ bzrlib/tests/test_remote.py 2013-05-23 13:00:35 +0000
@@ -3411,9 +3411,11 @@
     def override_verb(self, verb_name, verb):
         request_handlers = request.request_handlers
         orig_verb = request_handlers.get(verb_name)
-        request_handlers.register(verb_name, verb, override_existing=True)
+        orig_info = request_handlers.get_info(verb_name)
+        request_handlers.register(verb_name, verb, override_existing=True,
+            info=orig_info)
         self.addCleanup(request_handlers.register, verb_name, orig_verb,
-                override_existing=True)
+                override_existing=True, info=orig_info)

     def test_fetch_everything_backwards_compat(self):
         """Can fetch with EverythingResult even with pre 2.4 servers.

=== modified file 'bzrlib/tests/test_smart.py'
--- bzrlib/tests/test_smart.py 2011-05-26 08:05:45 +0000
+++ bzrlib/tests/test_smart.py 2013-05-23 13:00:35 +0000
@@ -1860,8 +1860,11 @@
         """All registered request_handlers can be found."""
         # If there's a typo in a register_lazy call, this loop will fail with
         # an AttributeError.
-        for key, item in smart_req.request_handlers.iteritems():
-            pass
+        for key in smart_req.request_handlers.keys():
+            try:
+                item = smart_req.request_handlers.get(key)
+            except AttributeError, e:
+                raise AttributeError('failed to get %s: %s' % (key, e))

     def assertHandlerEqual(self, verb, handler):
         self.assertEqual(smart_req.request_handlers.get(verb), handler)

=== modified file 'bzrlib/tests/test_smart_request.py'
--- bzrlib/tests/test_smart_request.py 2011-03-02 21:04:00 +0000
+++ bzrlib/tests/test_smart_request.py 2013-05-23 13:00:35 +0000
@@ -118,6 +118,16 @@
         self.assertEqual(
             [[transport]] * 3, handler._command.jail_transports_log)

+    def test_all_registered_requests_are_safety_qualified(self):
+        unclassified_requests = []
+        allowed_info = ('read', 'idem', 'mutate', 'semivfs', 'semi', 'stream')
+        for key in request.request_handlers.keys():
+            info = request.request_handlers.get_info(key)
+            if info is None or info not in allowed_info:
+                unclassified_requests.append(key)
+        if unclassified_requests:
+            self.fail('These requests were not categorized as safe/unsafe'
+                      ' to retry: %s' % (unclassified_requests,))


 class TestSmartRequestHandlerErrorTranslation(TestCase):

=== modified file 'bzrlib/tests/test_smart_transport.py'
--- bzrlib/tests/test_smart_transport.py 2011-05-26 08:05:45 +0000
+++ bzrlib/tests/test_smart_transport.py 2013-05-23 13:00:35 +0000
@@ -18,13 +18,17 @@

 # all of this deals with byte strings so this is safe
 from cStringIO import StringIO
+import errno
 import os
 import socket
+import subprocess
+import sys
 import threading

 import bzrlib
 from bzrlib import (
         bzrdir,
+        debug,
         errors,
         osutils,
         tests,
@@ -53,6 +57,29 @@
         )


+def create_file_pipes():
+    r, w = os.pipe()
+    # These must be opened without buffering, or we get undefined results
+    rf = os.fdopen(r, 'rb', 0)
+    wf = os.fdopen(w, 'wb', 0)
+    return rf, wf
+
+
+def portable_socket_pair():
+    """Return a pair of TCP sockets connected to each other.
+
+    Unlike socket.socketpair, this should work on Windows.
+    """
+    listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    listen_sock.bind(('127.0.0.1', 0))
+    listen_sock.listen(1)
+    client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    client_sock.connect(listen_sock.getsockname())
+    server_sock, addr = listen_sock.accept()
+    listen_sock.close()
+    return server_sock, client_sock
+
+
 class StringIOSSHVendor(object):
     """A SSH vendor that uses StringIO to buffer writes and answer reads."""

@@ -67,6 +94,27 @@
         return StringIOSSHConnection(self)


+class FirstRejectedStringIOSSHVendor(StringIOSSHVendor):
+    """The first connection will be considered closed.
+
+    The second connection will succeed normally.
+    """
+
+    def __init__(self, read_from, write_to, fail_at_write=True):
+        super(FirstRejectedStringIOSSHVendor, self).__init__(read_from,
+            write_to)
+        self.fail_at_write = fail_at_write
+        self._first = True
+
+    def connect_ssh(self, username, password, host, port, command):
+        self.calls.append(('connect_ssh', username, password, host, port,
+            command))
+        if self._first:
+            self._first = False
+            return ClosedSSHConnection(self)
+        return StringIOSSHConnection(self)
+
+
 class StringIOSSHConnection(ssh.SSHConnection):
     """A SSH connection that uses StringIO to buffer writes and answer reads."""

@@ -82,6 +130,29 @@
         return 'pipes', (self.vendor.read_from, self.vendor.write_to)


+class ClosedSSHConnection(ssh.SSHConnection):
+    """An SSH connection that just has closed channels."""
+
+    def __init__(self, vendor):
+        self.vendor = vendor
+
+    def close(self):
+        self.vendor.calls.append(('close', ))
+
+    def get_sock_or_pipes(self):
+        # We create matching pipes, and then close the ssh side
+        bzr_read, ssh_write = create_file_pipes()
+        # We always fail when bzr goes to read
+        ssh_write.close()
+        if self.vendor.fail_at_write:
+            # If set, we'll also fail when bzr goes to write
+            ssh_read, bzr_write = create_file_pipes()
+            ssh_read.close()
+        else:
+            bzr_write = self.vendor.write_to
+        return 'pipes', (bzr_read, bzr_write)
+
+
 class _InvalidHostnameFeature(tests.Feature):
     """Does 'non_existent.invalid' fail to resolve?

@@ -177,6 +248,91 @@
         client_medium._accept_bytes('abc')
         self.assertEqual('abc', output.getvalue())

+    def test_simple_pipes__accept_bytes_subprocess_closed(self):
+        # It is unfortunate that we have to use Popen for this. However,
+        # os.pipe() does not behave the same as subprocess.Popen().
+        # On Windows, if you use os.pipe() and close the write side,
+        # read.read() hangs. On Linux, read.read() returns the empty string.
+        p = subprocess.Popen([sys.executable, '-c',
+            'import sys\n'
+            'sys.stdout.write(sys.stdin.read(4))\n'
+            'sys.stdout.close()\n'],
+            stdout=subprocess.PIPE, stdin=subprocess.PIPE)
+        client_medium = medium.SmartSimplePipesClientMedium(
+            p.stdout, p.stdin, 'base')
+        client_medium._accept_bytes('abc\n')
+        self.assertEqual('abc', client_medium._read_bytes(3))
+        p.wait()
+        # While writing to the underlying pipe,
+        # Windows py2.6.6 we get IOError(EINVAL)
+        # Lucid py2.6.5, we get IOError(EPIPE)
+        # In both cases, it should be wrapped to ConnectionReset
+        self.assertRaises(errors.ConnectionReset,
+                          client_medium._accept_bytes, 'more')
+
+    def test_simple_pipes__accept_bytes_pipe_closed(self):
+        child_read, client_write = create_file_pipes()
+        client_medium = medium.SmartSimplePipesClientMedium(
+            None, client_write, 'base')
+        client_medium._accept_bytes('abc\n')
+        self.assertEqual('abc\n', child_read.read(4))
+        # While writing to the underlying pipe,
+        # Windows py2.6.6 we get IOError(EINVAL)
+        # Lucid py2.6.5, we get IOError(EPIPE)
+        # In both cases, it should be wrapped to ConnectionReset
+        child_read.close()
+        self.assertRaises(errors.ConnectionReset,
+                          client_medium._accept_bytes, 'more')
+
+    def test_simple_pipes__flush_pipe_closed(self):
+        child_read, client_write = create_file_pipes()
+        client_medium = medium.SmartSimplePipesClientMedium(
+            None, client_write, 'base')
+        client_medium._accept_bytes('abc\n')
+        child_read.close()
+        # Even though the pipe is closed, flush on the write side seems to be a
+        # no-op, rather than a failure.
+        client_medium._flush()
+
+    def test_simple_pipes__flush_subprocess_closed(self):
+        p = subprocess.Popen([sys.executable, '-c',
+            'import sys\n'
+            'sys.stdout.write(sys.stdin.read(4))\n'
+            'sys.stdout.close()\n'],
+            stdout=subprocess.PIPE, stdin=subprocess.PIPE)
+        client_medium = medium.SmartSimplePipesClientMedium(
+            p.stdout, p.stdin, 'base')
+        client_medium._accept_bytes('abc\n')
+        p.wait()
+        # Even though the child process is dead, flush seems to be a no-op.
+        client_medium._flush()
+
+    def test_simple_pipes__read_bytes_pipe_closed(self):
+        child_read, client_write = create_file_pipes()
+        client_medium = medium.SmartSimplePipesClientMedium(
+            child_read, client_write, 'base')
+        client_medium._accept_bytes('abc\n')
+        client_write.close()
+        self.assertEqual('abc\n', client_medium._read_bytes(4))
+        self.assertEqual('', client_medium._read_bytes(4))
+
+    def test_simple_pipes__read_bytes_subprocess_closed(self):
+        p = subprocess.Popen([sys.executable, '-c',
+            'import sys\n'
+            'if sys.platform == "win32":\n'
+            ' import msvcrt, os\n'
+            ' msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)\n'
+            ' msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY)\n'
+            'sys.stdout.write(sys.stdin.read(4))\n'
+            'sys.stdout.close()\n'],
+            stdout=subprocess.PIPE, stdin=subprocess.PIPE)
+        client_medium = medium.SmartSimplePipesClientMedium(
+            p.stdout, p.stdin, 'base')
+        client_medium._accept_bytes('abc\n')
+        p.wait()
+        self.assertEqual('abc\n', client_medium._read_bytes(4))
+        self.assertEqual('', client_medium._read_bytes(4))
+
     def test_simple_pipes_client_disconnect_does_nothing(self):
         # calling disconnect does nothing.
         input = StringIO()
@@ -564,6 +720,28 @@
         request.finished_reading()
         self.assertRaises(errors.ReadingCompleted, request.read_bytes, None)

+    def test_reset(self):
+        server_sock, client_sock = portable_socket_pair()
+        # TODO: Use SmartClientAlreadyConnectedSocketMedium for the versions of
+        # bzr where it exists.
+        client_medium = medium.SmartTCPClientMedium(None, None, None)
+        client_medium._socket = client_sock
+        client_medium._connected = True
+        req = client_medium.get_request()
+        self.assertRaises(errors.TooManyConcurrentRequests,
+            client_medium.get_request)
+        client_medium.reset()
+        # The stream should be reset, marked as disconnected, though ready for
+        # us to make a new request
+        self.assertFalse(client_medium._connected)
+        self.assertIs(None, client_medium._socket)
+        try:
+            self.assertEqual('', client_sock.recv(1))
+        except socket.error, e:
+            if e.errno not in (errno.EBADF,):
+                raise
+        req = client_medium.get_request()
+

 class RemoteTransportTests(test_smart.TestCaseWithSmartMedium):

@@ -617,20 +795,6 @@
         super(TestSmartServerStreamMedium, self).setUp()
         self.overrideEnv('BZR_NO_SMART_VFS', None)

-    def portable_socket_pair(self):
-        """Return a pair of TCP sockets connected to each other.
-
-        Unlike socket.socketpair, this should work on Windows.
-        """
-        listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        listen_sock.bind(('127.0.0.1', 0))
-        listen_sock.listen(1)
-        client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        client_sock.connect(listen_sock.getsockname())
-        server_sock, addr = listen_sock.accept()
-        listen_sock.close()
-        return server_sock, client_sock
-
     def test_smart_query_version(self):
         """Feed a canned query version to a server"""
         # wire-to-wire, using the whole stack
@@ -695,7 +859,7 @@
 
     def test_socket_stream_with_bulk_data(self):
         sample_request_bytes = 'command\n9\nbulk datadone\n'
-        server_sock, client_sock = self.portable_socket_pair()
+        server_sock, client_sock = portable_socket_pair()
         server = medium.SmartServerSocketStreamMedium(
             server_sock, None)
         sample_protocol = SampleRequest(expected_bytes=sample_request_bytes)
@@ -714,7 +878,7 @@
         self.assertTrue(server.finished)
 
     def test_socket_stream_shutdown_detection(self):
-        server_sock, client_sock = self.portable_socket_pair()
+        server_sock, client_sock = portable_socket_pair()
         client_sock.close()
         server = medium.SmartServerSocketStreamMedium(
             server_sock, None)
@@ -734,7 +898,7 @@
         rest_of_request_bytes = 'lo\n'
         expected_response = (
             protocol.RESPONSE_VERSION_TWO + 'success\nok\x012\n')
-        server_sock, client_sock = self.portable_socket_pair()
+        server_sock, client_sock = portable_socket_pair()
         server = medium.SmartServerSocketStreamMedium(
             server_sock, None)
         client_sock.sendall(incomplete_request_bytes)
@@ -810,7 +974,7 @@
         # _serve_one_request should still process both of them as if they had
         # been received separately.
         sample_request_bytes = 'command\n'
-        server_sock, client_sock = self.portable_socket_pair()
+        server_sock, client_sock = portable_socket_pair()
         server = medium.SmartServerSocketStreamMedium(
             server_sock, None)
         first_protocol = SampleRequest(expected_bytes=sample_request_bytes)
@@ -847,7 +1011,7 @@
         self.assertTrue(server.finished)
 
     def test_socket_stream_error_handling(self):
-        server_sock, client_sock = self.portable_socket_pair()
+        server_sock, client_sock = portable_socket_pair()
         server = medium.SmartServerSocketStreamMedium(
             server_sock, None)
         fake_protocol = ErrorRaisingProtocol(Exception('boom'))
@@ -868,7 +1032,7 @@
         self.assertEqual('', from_server.getvalue())
 
     def test_socket_stream_keyboard_interrupt_handling(self):
-        server_sock, client_sock = self.portable_socket_pair()
+        server_sock, client_sock = portable_socket_pair()
         server = medium.SmartServerSocketStreamMedium(
             server_sock, None)
         fake_protocol = ErrorRaisingProtocol(KeyboardInterrupt('boom'))
@@ -885,7 +1049,7 @@
         return server._build_protocol()
 
     def build_protocol_socket(self, bytes):
-        server_sock, client_sock = self.portable_socket_pair()
+        server_sock, client_sock = portable_socket_pair()
         server = medium.SmartServerSocketStreamMedium(
             server_sock, None)
         client_sock.sendall(bytes)
@@ -2793,6 +2957,33 @@
             'e', # end
             output.getvalue())
 
+    def test_records_start_of_body_stream(self):
+        requester, output = self.make_client_encoder_and_output()
+        requester.set_headers({})
+        in_stream = [False]
+        def stream_checker():
+            self.assertTrue(requester.body_stream_started)
+            in_stream[0] = True
+            yield 'content'
+        flush_called = []
+        orig_flush = requester.flush
+        def tracked_flush():
+            flush_called.append(in_stream[0])
+            if in_stream[0]:
+                self.assertTrue(requester.body_stream_started)
+            else:
+                self.assertFalse(requester.body_stream_started)
+            return orig_flush()
+        requester.flush = tracked_flush
+        requester.call_with_body_stream(('one arg',), stream_checker())
+        self.assertEqual(
+            'bzr message 3 (bzr 1.6)\n' # protocol version
+            '\x00\x00\x00\x02de' # headers
+            's\x00\x00\x00\x0bl7:one arge' # args
+            'b\x00\x00\x00\x07content' # body
+            'e', output.getvalue())
+        self.assertEqual([False, True, True], flush_called)
+
 
 class StubMediumRequest(object):
     """A stub medium request that tracks the number of times accept_bytes is
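Note on test_records_start_of_body_stream: the test expects body_stream_started to be False while the headers and arguments are written, and True by the time the first element is pulled from the stream. A simplified Python 3 sketch of that ordering follows. SketchRequester and its tuple-based output are hypothetical and do not reproduce the real protocol-3 encoder or its wire format.

```python
class SketchRequester(object):
    """Hypothetical encoder that records when body streaming begins."""

    def __init__(self, write):
        self._write = write
        self.body_stream_started = None

    def call_with_body_stream(self, args, stream):
        # Nothing from the stream has been consumed yet, so a failure
        # while writing the arguments could still be retried safely.
        self.body_stream_started = False
        self._write(('args', args))
        # Flip the flag before pulling the first element: once the
        # iterator has been advanced, it cannot be replayed.
        self.body_stream_started = True
        for chunk in stream:
            self._write(('body', chunk))
        self._write(('end',))
```

The flag gives the caller a cheap way to tell a failure that happened before any stream data was consumed from one that happened after.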
@@ -3218,6 +3409,193 @@
         # encoder.
 
 
+class Test_SmartClientRequest(tests.TestCase):
+
+    def make_client_with_failing_medium(self, fail_at_write=True, response=''):
+        response_io = StringIO(response)
+        output = StringIO()
+        vendor = FirstRejectedStringIOSSHVendor(response_io, output,
+            fail_at_write=fail_at_write)
+        ssh_params = medium.SSHParams('a host', 'a port', 'a user', 'a pass')
+        client_medium = medium.SmartSSHClientMedium('base', ssh_params, vendor)
+        smart_client = client._SmartClient(client_medium, headers={})
+        return output, vendor, smart_client
+
+    def make_response(self, args, body=None, body_stream=None):
+        response_io = StringIO()
+        response = _mod_request.SuccessfulSmartServerResponse(args, body=body,
+            body_stream=body_stream)
+        responder = protocol.ProtocolThreeResponder(response_io.write)
+        responder.send_response(response)
+        return response_io.getvalue()
+
+    def test__call_doesnt_retry_append(self):
+        response = self.make_response(('appended', '8'))
+        output, vendor, smart_client = self.make_client_with_failing_medium(
+            fail_at_write=False, response=response)
+        smart_request = client._SmartClientRequest(smart_client, 'append',
+            ('foo', ''), body='content\n')
+        self.assertRaises(errors.ConnectionReset, smart_request._call, 3)
+
+    def test__call_retries_get_bytes(self):
+        response = self.make_response(('ok',), 'content\n')
+        output, vendor, smart_client = self.make_client_with_failing_medium(
+            fail_at_write=False, response=response)
+        smart_request = client._SmartClientRequest(smart_client, 'get',
+            ('foo',))
+        response, response_handler = smart_request._call(3)
+        self.assertEqual(('ok',), response)
+        self.assertEqual('content\n', response_handler.read_body_bytes())
+
+    def test__call_noretry_get_bytes(self):
+        debug.debug_flags.add('noretry')
+        response = self.make_response(('ok',), 'content\n')
+        output, vendor, smart_client = self.make_client_with_failing_medium(
+            fail_at_write=False, response=response)
+        smart_request = client._SmartClientRequest(smart_client, 'get',
+            ('foo',))
+        self.assertRaises(errors.ConnectionReset, smart_request._call, 3)
+
+    def test__send_no_retry_pipes(self):
+        client_read, server_write = create_file_pipes()
+        server_read, client_write = create_file_pipes()
+        client_medium = medium.SmartSimplePipesClientMedium(client_read,
+            client_write, base='/')
+        smart_client = client._SmartClient(client_medium)
+        smart_request = client._SmartClientRequest(smart_client,
+            'hello', ())
+        # Close the server side
+        server_read.close()
+        encoder, response_handler = smart_request._construct_protocol(3)
+        self.assertRaises(errors.ConnectionReset,
+            smart_request._send_no_retry, encoder)
+
+    def test__send_read_response_sockets(self):
+        listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        listen_sock.bind(('127.0.0.1', 0))
+        listen_sock.listen(1)
+        host, port = listen_sock.getsockname()
+        client_medium = medium.SmartTCPClientMedium(host, port, '/')
+        client_medium._ensure_connection()
+        smart_client = client._SmartClient(client_medium)
+        smart_request = client._SmartClientRequest(smart_client, 'hello', ())
+        # Accept the connection, but don't actually talk to the client.
+        server_sock, _ = listen_sock.accept()
+        server_sock.close()
+        # Sockets buffer and don't really notice that the server has closed the
+        # connection until we try to read again.
+        handler = smart_request._send(3)
+        self.assertRaises(errors.ConnectionReset,
+            handler.read_response_tuple, expect_body=False)
+
+    def test__send_retries_on_write(self):
+        output, vendor, smart_client = self.make_client_with_failing_medium()
+        smart_request = client._SmartClientRequest(smart_client, 'hello', ())
+        handler = smart_request._send(3)
+        self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol
+                         '\x00\x00\x00\x02de' # empty headers
+                         's\x00\x00\x00\tl5:helloee',
+                         output.getvalue())
+        self.assertEqual(
+            [('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
+              ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
+             ('close',),
+             ('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
+              ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
+            ],
+            vendor.calls)
+
+    def test__send_doesnt_retry_read_failure(self):
+        output, vendor, smart_client = self.make_client_with_failing_medium(
+            fail_at_write=False)
+        smart_request = client._SmartClientRequest(smart_client, 'hello', ())
+        handler = smart_request._send(3)
+        self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol
+                         '\x00\x00\x00\x02de' # empty headers
+                         's\x00\x00\x00\tl5:helloee',
+                         output.getvalue())
+        self.assertEqual(
+            [('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
+              ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
+            ],
+            vendor.calls)
+        self.assertRaises(errors.ConnectionReset, handler.read_response_tuple)
+
+    def test__send_request_retries_body_stream_if_not_started(self):
+        output, vendor, smart_client = self.make_client_with_failing_medium()
+        smart_request = client._SmartClientRequest(smart_client, 'hello', (),
+            body_stream=['a', 'b'])
+        response_handler = smart_request._send(3)
+        # We connect, get disconnected, and notice before consuming the stream,
+        # so we try again one time and succeed.
+        self.assertEqual(
+            [('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
+              ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
+             ('close',),
+             ('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
+              ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
+            ],
+            vendor.calls)
+        self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol
+                         '\x00\x00\x00\x02de' # empty headers
+                         's\x00\x00\x00\tl5:helloe'
+                         'b\x00\x00\x00\x01a'
+                         'b\x00\x00\x00\x01b'
+                         'e',
+                         output.getvalue())
+
+    def test__send_request_stops_if_body_started(self):
+        # We intentionally use the python StringIO so that we can subclass it.
+        from StringIO import StringIO
+        response = StringIO()
+
+        class FailAfterFirstWrite(StringIO):
+            """Allow one 'write' call to pass, fail the rest"""
+            def __init__(self):
+                StringIO.__init__(self)
+                self._first = True
+
+            def write(self, s):
+                if self._first:
+                    self._first = False
+                    return StringIO.write(self, s)
+                raise IOError(errno.EINVAL, 'invalid file handle')
+        output = FailAfterFirstWrite()
+
+        vendor = FirstRejectedStringIOSSHVendor(response, output,
+            fail_at_write=False)
+        ssh_params = medium.SSHParams('a host', 'a port', 'a user', 'a pass')
+        client_medium = medium.SmartSSHClientMedium('base', ssh_params, vendor)
+        smart_client = client._SmartClient(client_medium, headers={})
+        smart_request = client._SmartClientRequest(smart_client, 'hello', (),
+            body_stream=['a', 'b'])
+        self.assertRaises(errors.ConnectionReset, smart_request._send, 3)
+        # We connect, and manage to get to the point that we start consuming
+        # the body stream. The next write fails, so we just stop.
+        self.assertEqual(
+            [('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
+              ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
+             ('close',),
+            ],
+            vendor.calls)
+        self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol
+                         '\x00\x00\x00\x02de' # empty headers
+                         's\x00\x00\x00\tl5:helloe',
+                         output.getvalue())
+
+    def test__send_disabled_retry(self):
+        debug.debug_flags.add('noretry')
+        output, vendor, smart_client = self.make_client_with_failing_medium()
+        smart_request = client._SmartClientRequest(smart_client, 'hello', ())
+        self.assertRaises(errors.ConnectionReset, smart_request._send, 3)
+        self.assertEqual(
+            [('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
+              ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
+             ('close',),
+            ],
+            vendor.calls)
+
+
 class LengthPrefixedBodyDecoder(tests.TestCase):
 
     # XXX: TODO: make accept_reading_trailer invoke translate_response or

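Note on the _send tests above: taken together they describe a send path that reconnects at most once after a write failure, and only if retry is enabled and the body stream has not started. The Python 3 sketch below shows that decision under those assumptions. ConnectionReset is a local stand-in for bzrlib.errors.ConnectionReset; RequestState, send_with_one_retry and the retry parameter (standing in for the 'noretry' debug flag) are hypothetical names and not the branch's actual implementation.

```python
class ConnectionReset(Exception):
    """Local stand-in for bzrlib.errors.ConnectionReset."""


class RequestState(object):
    """Tracks whether the body stream has begun to be consumed."""

    def __init__(self):
        self.body_stream_started = False


def send_with_one_retry(connect, write_request, retry=True):
    """Write a request, reconnecting once if the first write is reset.

    ``connect()`` returns a connection object with a ``close()`` method.
    ``write_request(conn, state)`` writes the request and sets
    ``state.body_stream_started`` once it starts consuming the body.
    """
    state = RequestState()
    conn = connect()
    try:
        write_request(conn, state)
        return conn
    except ConnectionReset:
        conn.close()
        # A partly consumed body stream cannot be replayed, and the
        # caller may have disabled retries altogether.
        if not retry or state.body_stream_started:
            raise
    # Exactly one reconnect attempt; a second failure propagates.
    conn = connect()
    write_request(conn, RequestState())
    return conn
```

With a connection factory that logs its calls, a single dropped write yields connect, close, connect, which matches the shape of vendor.calls in test__send_retries_on_write.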
=== modified file 'doc/en/release-notes/bzr-2.4.txt'
--- doc/en/release-notes/bzr-2.4.txt 2013-05-23 08:30:09 +0000
+++ doc/en/release-notes/bzr-2.4.txt 2013-05-23 13:00:35 +0000
@@ -145,7 +145,12 @@
   avoids raising a spurious MemoryError on certain platforms such as AIX.
   (John Arbash Meinel, #856731)
 
-
+* Teach the bzr client how to reconnect if we get ``ConnectionReset``
+  while making an RPC request. This doesn't handle all possible network
+  disconnects, but it should at least handle when the server is asked to
+  shutdown gracefully. (John Arbash Meinel, #819604)
+
+
 Documentation
 *************
 
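Note on "doesn't handle all possible network disconnects": the _call tests in this proposal retry a 'get' after a failed read but let the same failure propagate for 'append', since repeating an append could write the data twice. A Python 3 sketch of that per-verb decision follows. RETRY_SAFE_VERBS and call_with_retry are hypothetical; how the branch actually classifies RPCs is in bzrlib/smart/request.py, which is not reproduced here.

```python
class ConnectionReset(Exception):
    """Local stand-in for bzrlib.errors.ConnectionReset."""


# Hypothetical classification: only verbs that are safe to repeat.
RETRY_SAFE_VERBS = frozenset(['get'])


def call_with_retry(verb, do_call, retry_safe=RETRY_SAFE_VERBS):
    """Run ``do_call()``, repeating it once only for retry-safe verbs."""
    try:
        return do_call()
    except ConnectionReset:
        # The server may already have acted on the request, so only
        # repeat verbs whose effect is the same when run twice.
        if verb not in retry_safe:
            raise
    return do_call()
```

The design choice is conservative: when it is unclear whether the server processed a request, an unsafe verb surfaces the error to the user instead of risking a duplicate write.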

Subscribers

People subscribed via source and target branches