Merge lp:~jameinel/bzr/2.2-all-reconnect-819604 into lp:bzr/2.2

Proposed by John A Meinel
Status: Rejected
Rejected by: John A Meinel
Proposed branch: lp:~jameinel/bzr/2.2-all-reconnect-819604
Merge into: lp:bzr/2.2
Diff against target: 1591 lines (+898/-228)
12 files modified
NEWS (+10/-0)
bzrlib/help_topics/en/debug-flags.txt (+2/-0)
bzrlib/osutils.py (+20/-4)
bzrlib/smart/client.py (+208/-86)
bzrlib/smart/medium.py (+46/-6)
bzrlib/smart/protocol.py (+7/-5)
bzrlib/smart/request.py (+145/-100)
bzrlib/tests/test_bundle.py (+6/-3)
bzrlib/tests/test_osutils.py (+39/-0)
bzrlib/tests/test_smart.py (+5/-2)
bzrlib/tests/test_smart_request.py (+10/-0)
bzrlib/tests/test_smart_transport.py (+400/-22)
To merge this branch: bzr merge lp:~jameinel/bzr/2.2-all-reconnect-819604
Reviewers:
John A Meinel: Disapprove
Martin Packman (community): Approve
Review via email: mp+124346@code.launchpad.net

Commit message

Teach bzr-2.2 how to reconnect if it gets disconnected in the middle of a discussion. (Bug #819604)

Description of the change

This is a merge-up of my 2.1 patch.
Fortunately, bzr-2.2 added the AlreadyConnected socket medium, so this is almost identical to the 2.5 code. The main difference is still just not having the connection timeout code.
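
The retry policy this branch brings to 2.2 can be sketched outside of bzrlib roughly as follows. This is a minimal standalone Python 3 illustration, not the bzrlib code: `is_safe_to_send_twice`, `call_with_one_retry` and `SAFE_TO_RETRY` are hypothetical names, and the built-in `ConnectionResetError` stands in for bzrlib's `errors.ConnectionReset`. Only the request categories ('read', 'idem', 'semi', 'semivfs', 'mutate', 'stream') and the single-retry rule are taken from the diff.

```python
# Standalone sketch (Python 3). All names are hypothetical, not bzrlib API.

# Request categories the diff treats as safe to resend after a disconnect
# that happens while reading the response.
SAFE_TO_RETRY = frozenset(['read', 'idem', 'semi'])


def is_safe_to_send_twice(request_type, has_body_stream=False, noretry=False):
    """Return True if a request of this category may be sent a second time."""
    if has_body_stream or noretry:
        # A consumed body stream cannot be replayed, and a 'noretry' debug
        # flag disables retrying altogether.
        return False
    # 'semivfs', 'mutate', 'stream' and unknown categories are not retried.
    return request_type in SAFE_TO_RETRY


def call_with_one_retry(do_call, reset_connection, request_type, **kwargs):
    """Run do_call(); on ConnectionResetError, reset and retry at most once."""
    try:
        return do_call()
    except ConnectionResetError:
        # The connection is dead, so drop our end of it before deciding.
        reset_connection()
        if not is_safe_to_send_twice(request_type, **kwargs):
            raise
        # Single retry only: if the connection really is down, fail here.
        return do_call()
```

A caller would pass a closure that sends the request and reads the response, plus a callback that tears down the dead connection. A second `ConnectionResetError` propagates to the caller rather than looping.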

Martin Packman (gz) wrote :

This is still very scary, but bringing the related code down from later versions to fix this issue seems reasonable enough.

review: Approve
John A Meinel (jameinel) wrote :

The only reason to land in 2.2 was because 2.1 is in the last LTS (Lucid). However, if I'm rejecting 2.1, I'm not going to bother with 2.2 or 2.3, instead just start with 2.4+.

review: Disapprove

Preview Diff

=== modified file 'NEWS'
--- NEWS 2012-03-15 14:37:25 +0000
+++ NEWS 2012-09-14 07:34:20 +0000
@@ -20,6 +20,11 @@
 Bug Fixes
 *********
 
+* Teach the bzr client how to reconnect if we get ``ConnectionReset``
+  while making an RPC request. This doesn't handle all possible network
+  disconnects, but it should at least handle when the server is asked to
+  shutdown gracefully. (John Arbash Meinel, #819604)
+
 Improvements
 ************
 
@@ -111,6 +116,11 @@
 
   (John Arbash Meinel, #609187, #812928)
 
+* Teach the bzr client how to reconnect if we get ``ConnectionReset``
+  while making an RPC request. This doesn't handle all possible network
+  disconnects, but it should at least handle when the server is asked to
+  shutdown gracefully. (John Arbash Meinel, #819604)
+
 
 Improvements
 ************
=== modified file 'bzrlib/help_topics/en/debug-flags.txt'
--- bzrlib/help_topics/en/debug-flags.txt 2010-01-05 04:30:07 +0000
+++ bzrlib/help_topics/en/debug-flags.txt 2012-09-14 07:34:20 +0000
@@ -24,6 +24,8 @@
 -Dindex           Trace major index operations.
 -Dknit            Trace knit operations.
 -Dlock            Trace when lockdir locks are taken or released.
+-Dnoretry         If a connection is reset, fail immediately rather than
+                  retrying the request.
 -Dprogress        Trace progress bar operations.
 -Dmerge           Emit information for debugging merges.
 -Dno_apport       Don't use apport to report crashes.
=== modified file 'bzrlib/osutils.py'
--- bzrlib/osutils.py 2010-07-09 16:16:11 +0000
+++ bzrlib/osutils.py 2012-09-14 07:34:20 +0000
@@ -1,4 +1,4 @@
-# Copyright (C) 2005-2010 Canonical Ltd
+# Copyright (C) 2005-2012 Canonical Ltd
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -32,6 +32,7 @@
 # and need the former on windows
 import shutil
 from shutil import rmtree
+import signal
 import socket
 import subprocess
 # We need to import both tempfile and mkdtemp as we export the later on posix
@@ -1993,6 +1994,14 @@
 # data at once.
 MAX_SOCKET_CHUNK = 64 * 1024
 
+_end_of_stream_errors = [errno.ECONNRESET, errno.EPIPE, errno.EINVAL]
+for _eno in ['WSAECONNRESET', 'WSAECONNABORTED']:
+    _eno = getattr(errno, _eno, None)
+    if _eno is not None:
+        _end_of_stream_errors.append(_eno)
+del _eno
+
+
 def read_bytes_from_socket(sock, report_activity=None,
         max_read_size=MAX_SOCKET_CHUNK):
     """Read up to max_read_size of bytes from sock and notify of progress.
@@ -2006,7 +2015,7 @@
             bytes = sock.recv(max_read_size)
         except socket.error, e:
             eno = e.args[0]
-            if eno == getattr(errno, "WSAECONNRESET", errno.ECONNRESET):
+            if eno in _end_of_stream_errors:
                 # The connection was closed by the other side. Callers expect
                 # an empty string to signal end-of-stream.
                 return ""
@@ -2057,12 +2066,19 @@
     while sent_total < byte_count:
         try:
             sent = sock.send(buffer(bytes, sent_total, MAX_SOCKET_CHUNK))
-        except socket.error, e:
+        except (socket.error, IOError), e:
+            if e.args[0] in _end_of_stream_errors:
+                raise errors.ConnectionReset(
+                    "Error trying to write to socket", e)
             if e.args[0] != errno.EINTR:
                 raise
         else:
+            if sent == 0:
+                raise errors.ConnectionReset('Sending to %s returned 0 bytes'
+                                             % (sock,))
             sent_total += sent
-            report_activity(sent, 'write')
+            if report_activity is not None:
+                report_activity(sent, 'write')
 
 
 def dereference_path(path):
=== modified file 'bzrlib/smart/client.py'
--- bzrlib/smart/client.py 2010-02-17 17:11:16 +0000
+++ bzrlib/smart/client.py 2012-09-14 07:34:20 +0000
@@ -14,12 +14,18 @@
 # along with this program; if not, write to the Free Software
 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
+from bzrlib import lazy_import
+lazy_import.lazy_import(globals(), """
+from bzrlib.smart import request as _mod_request
+""")
+
 import bzrlib
 from bzrlib.smart import message, protocol
-from bzrlib.trace import warning
 from bzrlib import (
+    debug,
     errors,
     hooks,
+    trace,
     )
 
 
@@ -39,93 +45,12 @@
     def __repr__(self):
         return '%s(%r)' % (self.__class__.__name__, self._medium)
 
-    def _send_request(self, protocol_version, method, args, body=None,
-                      readv_body=None, body_stream=None):
-        encoder, response_handler = self._construct_protocol(
-            protocol_version)
-        encoder.set_headers(self._headers)
-        if body is not None:
-            if readv_body is not None:
-                raise AssertionError(
-                    "body and readv_body are mutually exclusive.")
-            if body_stream is not None:
-                raise AssertionError(
-                    "body and body_stream are mutually exclusive.")
-            encoder.call_with_body_bytes((method, ) + args, body)
-        elif readv_body is not None:
-            if body_stream is not None:
-                raise AssertionError(
-                    "readv_body and body_stream are mutually exclusive.")
-            encoder.call_with_body_readv_array((method, ) + args, readv_body)
-        elif body_stream is not None:
-            encoder.call_with_body_stream((method, ) + args, body_stream)
-        else:
-            encoder.call(method, *args)
-        return response_handler
-
-    def _run_call_hooks(self, method, args, body, readv_body):
-        if not _SmartClient.hooks['call']:
-            return
-        params = CallHookParams(method, args, body, readv_body, self._medium)
-        for hook in _SmartClient.hooks['call']:
-            hook(params)
-
     def _call_and_read_response(self, method, args, body=None, readv_body=None,
             body_stream=None, expect_response_body=True):
-        self._run_call_hooks(method, args, body, readv_body)
-        if self._medium._protocol_version is not None:
-            response_handler = self._send_request(
-                self._medium._protocol_version, method, args, body=body,
-                readv_body=readv_body, body_stream=body_stream)
-            return (response_handler.read_response_tuple(
-                        expect_body=expect_response_body),
-                    response_handler)
-        else:
-            for protocol_version in [3, 2]:
-                if protocol_version == 2:
-                    # If v3 doesn't work, the remote side is older than 1.6.
-                    self._medium._remember_remote_is_before((1, 6))
-                response_handler = self._send_request(
-                    protocol_version, method, args, body=body,
-                    readv_body=readv_body, body_stream=body_stream)
-                try:
-                    response_tuple = response_handler.read_response_tuple(
-                        expect_body=expect_response_body)
-                except errors.UnexpectedProtocolVersionMarker, err:
-                    # TODO: We could recover from this without disconnecting if
-                    # we recognise the protocol version.
-                    warning(
-                        'Server does not understand Bazaar network protocol %d,'
-                        ' reconnecting. (Upgrade the server to avoid this.)'
-                        % (protocol_version,))
-                    self._medium.disconnect()
-                    continue
-                except errors.ErrorFromSmartServer:
-                    # If we received an error reply from the server, then it
-                    # must be ok with this protocol version.
-                    self._medium._protocol_version = protocol_version
-                    raise
-                else:
-                    self._medium._protocol_version = protocol_version
-                    return response_tuple, response_handler
-            raise errors.SmartProtocolError(
-                'Server is not a Bazaar server: ' + str(err))
-
-    def _construct_protocol(self, version):
-        request = self._medium.get_request()
-        if version == 3:
-            request_encoder = protocol.ProtocolThreeRequester(request)
-            response_handler = message.ConventionalResponseHandler()
-            response_proto = protocol.ProtocolThreeDecoder(
-                response_handler, expect_version_marker=True)
-            response_handler.setProtoAndMediumRequest(response_proto, request)
-        elif version == 2:
-            request_encoder = protocol.SmartClientRequestProtocolTwo(request)
-            response_handler = request_encoder
-        else:
-            request_encoder = protocol.SmartClientRequestProtocolOne(request)
-            response_handler = request_encoder
-        return request_encoder, response_handler
+        request = _SmartClientRequest(self, method, args, body=body,
+            readv_body=readv_body, body_stream=body_stream,
+            expect_response_body=expect_response_body)
+        return request.call_and_read_response()
 
     def call(self, method, *args):
         """Call a method on the remote server."""
@@ -191,6 +116,203 @@
         return self._medium.remote_path_from_transport(transport)
 
 
+class _SmartClientRequest(object):
+    """Encapsulate the logic for a single request.
+
+    This class handles things like reconnecting and sending the request a
+    second time when the connection is reset in the middle. It also handles the
+    multiple requests that get made if we don't know what protocol the server
+    supports yet.
+
+    Generally, you build up one of these objects, passing in the arguments that
+    you want to send to the server, and then use 'call_and_read_response' to
+    get the response from the server.
+    """
+
+    def __init__(self, client, method, args, body=None, readv_body=None,
+                 body_stream=None, expect_response_body=True):
+        self.client = client
+        self.method = method
+        self.args = args
+        self.body = body
+        self.readv_body = readv_body
+        self.body_stream = body_stream
+        self.expect_response_body = expect_response_body
+
+    def call_and_read_response(self):
+        """Send the request to the server, and read the initial response.
+
+        This doesn't read all of the body content of the response, instead it
+        returns (response_tuple, response_handler). response_tuple is the 'ok',
+        or 'error' information, and 'response_handler' can be used to get the
+        content stream out.
+        """
+        self._run_call_hooks()
+        protocol_version = self.client._medium._protocol_version
+        if protocol_version is None:
+            return self._call_determining_protocol_version()
+        else:
+            return self._call(protocol_version)
+
+    def _is_safe_to_send_twice(self):
+        """Check if the current method is re-entrant safe."""
+        if self.body_stream is not None or 'noretry' in debug.debug_flags:
+            # We can't restart a body stream that has already been consumed.
+            return False
+        request_type = _mod_request.request_handlers.get_info(self.method)
+        if request_type in ('read', 'idem', 'semi'):
+            return True
+        # If we have gotten this far, 'stream' cannot be retried, because we
+        # already consumed the local stream.
+        if request_type in ('semivfs', 'mutate', 'stream'):
+            return False
+        trace.mutter('Unknown request type: %s for method %s'
+                     % (request_type, self.method))
+        return False
+
+    def _run_call_hooks(self):
+        if not _SmartClient.hooks['call']:
+            return
+        params = CallHookParams(self.method, self.args, self.body,
+                                self.readv_body, self.client._medium)
+        for hook in _SmartClient.hooks['call']:
+            hook(params)
+
+    def _call(self, protocol_version):
+        """We know the protocol version.
+
+        So this just sends the request, and then reads the response. This is
+        where the code will be to retry requests if the connection is closed.
+        """
+        response_handler = self._send(protocol_version)
+        try:
+            response_tuple = response_handler.read_response_tuple(
+                expect_body=self.expect_response_body)
+        except errors.ConnectionReset, e:
+            self.client._medium.reset()
+            if not self._is_safe_to_send_twice():
+                raise
+            trace.warning('ConnectionReset reading response for %r, retrying'
+                          % (self.method,))
+            trace.log_exception_quietly()
+            encoder, response_handler = self._construct_protocol(
+                protocol_version)
+            self._send_no_retry(encoder)
+            response_tuple = response_handler.read_response_tuple(
+                expect_body=self.expect_response_body)
+        return (response_tuple, response_handler)
+
+    def _call_determining_protocol_version(self):
+        """Determine what protocol the remote server supports.
+
+        We do this by placing a request in the most recent protocol, and
+        handling the UnexpectedProtocolVersionMarker from the server.
+        """
+        for protocol_version in [3, 2]:
+            if protocol_version == 2:
+                # If v3 doesn't work, the remote side is older than 1.6.
+                self.client._medium._remember_remote_is_before((1, 6))
+            try:
+                response_tuple, response_handler = self._call(protocol_version)
+            except errors.UnexpectedProtocolVersionMarker, err:
+                # TODO: We could recover from this without disconnecting if
+                # we recognise the protocol version.
+                trace.warning(
+                    'Server does not understand Bazaar network protocol %d,'
+                    ' reconnecting. (Upgrade the server to avoid this.)'
+                    % (protocol_version,))
+                self.client._medium.disconnect()
+                continue
+            except errors.ErrorFromSmartServer:
+                # If we received an error reply from the server, then it
+                # must be ok with this protocol version.
+                self.client._medium._protocol_version = protocol_version
+                raise
+            else:
+                self.client._medium._protocol_version = protocol_version
+                return response_tuple, response_handler
+        raise errors.SmartProtocolError(
+            'Server is not a Bazaar server: ' + str(err))
+
+    def _construct_protocol(self, version):
+        """Build the encoding stack for a given protocol version."""
+        request = self.client._medium.get_request()
+        if version == 3:
+            request_encoder = protocol.ProtocolThreeRequester(request)
+            response_handler = message.ConventionalResponseHandler()
+            response_proto = protocol.ProtocolThreeDecoder(
+                response_handler, expect_version_marker=True)
+            response_handler.setProtoAndMediumRequest(response_proto, request)
+        elif version == 2:
+            request_encoder = protocol.SmartClientRequestProtocolTwo(request)
+            response_handler = request_encoder
+        else:
+            request_encoder = protocol.SmartClientRequestProtocolOne(request)
+            response_handler = request_encoder
+        return request_encoder, response_handler
+
+    def _send(self, protocol_version):
+        """Encode the request, and send it to the server.
+
+        This will retry a request if we get a ConnectionReset while sending the
+        request to the server. (Unless we have a body_stream that we have
+        already started consuming, since we can't restart body_streams)
+
+        :return: response_handler as defined by _construct_protocol
+        """
+        encoder, response_handler = self._construct_protocol(protocol_version)
+        try:
+            self._send_no_retry(encoder)
+        except errors.ConnectionReset, e:
+            # If we fail during the _send_no_retry phase, then we can
+            # be confident that the server did not get our request, because we
+            # haven't started waiting for the reply yet. So try the request
+            # again. We only issue a single retry, because if the connection
+            # really is down, there is no reason to loop endlessly.
+
+            # Connection is dead, so close our end of it.
+            self.client._medium.reset()
+            if (('noretry' in debug.debug_flags)
+                or (self.body_stream is not None
+                    and encoder.body_stream_started)):
+                # We can't restart a body_stream that has been partially
+                # consumed, so we don't retry.
+                # Note: We don't have to worry about
+                # SmartClientRequestProtocolOne or Two, because they don't
+                # support client-side body streams.
+                raise
+            trace.warning('ConnectionReset calling %r, retrying'
+                          % (self.method,))
+            trace.log_exception_quietly()
+            encoder, response_handler = self._construct_protocol(
+                protocol_version)
+            self._send_no_retry(encoder)
+        return response_handler
+
+    def _send_no_retry(self, encoder):
+        """Just encode the request and try to send it."""
+        encoder.set_headers(self.client._headers)
+        if self.body is not None:
+            if self.readv_body is not None:
+                raise AssertionError(
+                    "body and readv_body are mutually exclusive.")
+            if self.body_stream is not None:
+                raise AssertionError(
+                    "body and body_stream are mutually exclusive.")
+            encoder.call_with_body_bytes((self.method, ) + self.args, self.body)
+        elif self.readv_body is not None:
+            if self.body_stream is not None:
+                raise AssertionError(
+                    "readv_body and body_stream are mutually exclusive.")
+            encoder.call_with_body_readv_array((self.method, ) + self.args,
+                                               self.readv_body)
+        elif self.body_stream is not None:
+            encoder.call_with_body_stream((self.method, ) + self.args,
+                                          self.body_stream)
+        else:
+            encoder.call(self.method, *self.args)
+
+
 class SmartClientHooks(hooks.Hooks):
 
     def __init__(self):
=== modified file 'bzrlib/smart/medium.py'
--- bzrlib/smart/medium.py 2010-06-21 08:08:04 +0000
+++ bzrlib/smart/medium.py 2012-09-14 07:34:20 +0000
@@ -1,4 +1,4 @@
-# Copyright (C) 2006-2010 Canonical Ltd
+# Copyright (C) 2006-2012 Canonical Ltd
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -24,6 +24,7 @@
 bzrlib/transport/smart/__init__.py.
 """
 
+import errno
 import os
 import sys
 import urllib
@@ -176,6 +177,14 @@
         ui.ui_factory.report_transport_activity(self, bytes, direction)
 
 
+_bad_file_descriptor = (errno.EBADF,)
+if sys.platform == 'win32':
+    # Given on Windows if you pass a closed socket to select.select. Probably
+    # also given if you pass a file handle to select.
+    WSAENOTSOCK = 10038
+    _bad_file_descriptor += (WSAENOTSOCK,)
+
+
 class SmartServerStreamMedium(SmartMedium):
     """Handles smart commands coming over a stream.
 
@@ -239,6 +248,8 @@
 
         :param protocol: a SmartServerRequestProtocol.
         """
+        if protocol is None:
+            return
         try:
             self._serve_one_request_unguarded(protocol)
         except KeyboardInterrupt:
@@ -710,6 +721,14 @@
         """
         return SmartClientStreamMediumRequest(self)
 
+    def reset(self):
+        """We have been disconnected, reset current state.
+
+        This resets things like _current_request and connected state.
+        """
+        self.disconnect()
+        self._current_request = None
+
 
 class SmartSimplePipesClientMedium(SmartClientStreamMedium):
     """A client medium using simple pipes.
@@ -724,11 +743,20 @@
 
     def _accept_bytes(self, bytes):
         """See SmartClientStreamMedium.accept_bytes."""
-        self._writeable_pipe.write(bytes)
+        try:
+            self._writeable_pipe.write(bytes)
+        except IOError, e:
+            if e.errno in (errno.EINVAL, errno.EPIPE):
+                raise errors.ConnectionReset(
+                    "Error trying to write to subprocess", e)
+            raise
         self._report_activity(len(bytes), 'write')
 
     def _flush(self):
         """See SmartClientStreamMedium._flush()."""
+        # Note: If flush were to fail, we'd like to raise ConnectionReset, etc.
+        #       However, testing shows that even when the child process is
+        #       gone, this doesn't error.
         self._writeable_pipe.flush()
 
     def _read_bytes(self, count):
@@ -753,8 +781,8 @@
 
 class SmartSSHClientMedium(SmartClientStreamMedium):
     """A client medium using SSH.
-    
-    It delegates IO to a SmartClientSocketMedium or
+
+    It delegates IO to a SmartSimplePipesClientMedium or
     SmartClientAlreadyConnectedSocketMedium (depending on platform).
     """
 
@@ -897,6 +925,20 @@
         SmartClientSocketMedium.__init__(self, base)
         self._host = host
         self._port = port
+        self._socket = None
+
+    def _accept_bytes(self, bytes):
+        """See SmartClientMedium.accept_bytes."""
+        self._ensure_connection()
+        osutils.send_all(self._socket, bytes, self._report_activity)
+
+    def disconnect(self):
+        """See SmartClientMedium.disconnect()."""
+        if not self._connected:
+            return
+        self._socket.close()
+        self._socket = None
+        self._connected = False
 
     def _ensure_connection(self):
         """Connect this medium if not already connected."""
@@ -993,5 +1035,3 @@
         This invokes self._medium._flush to ensure all bytes are transmitted.
         """
         self._medium._flush()
-
-
=== modified file 'bzrlib/smart/protocol.py'
--- bzrlib/smart/protocol.py 2010-06-11 05:57:09 +0000
+++ bzrlib/smart/protocol.py 2012-09-14 07:34:20 +0000
@@ -654,7 +654,7 @@
         """Make a remote call with a readv array.
 
         The body is encoded with one line per readv offset pair. The numbers in
-        each pair are separated by a comma, and no trailing \n is emitted.
+        each pair are separated by a comma, and no trailing \\n is emitted.
         """
         if 'hpss' in debug.debug_flags:
             mutter('hpss call w/readv: %s', repr(args)[1:-1])
@@ -1081,9 +1081,6 @@
         self._real_write_func = write_func
 
     def _write_func(self, bytes):
-        # TODO: It is probably more appropriate to use sum(map(len, _buf))
-        #       for total number of bytes to write, rather than buffer based on
-        #       the number of write() calls
         # TODO: Another possibility would be to turn this into an async model.
         #       Where we let another thread know that we have some bytes if
         #       they want it, but we don't actually block for it
@@ -1292,6 +1289,7 @@
         _ProtocolThreeEncoder.__init__(self, medium_request.accept_bytes)
         self._medium_request = medium_request
         self._headers = {}
+        self.body_stream_started = None
 
     def set_headers(self, headers):
         self._headers = headers.copy()
@@ -1332,7 +1330,7 @@
         """Make a remote call with a readv array.
 
         The body is encoded with one line per readv offset pair. The numbers in
-        each pair are separated by a comma, and no trailing \n is emitted.
+        each pair are separated by a comma, and no trailing \\n is emitted.
         """
         if 'hpss' in debug.debug_flags:
             mutter('hpss call w/readv: %s', repr(args)[1:-1])
@@ -1357,6 +1355,7 @@
             if path is not None:
                 mutter(' (to %s)', path)
             self._request_start_time = osutils.timer_func()
+        self.body_stream_started = False
         self._write_protocol_version()
         self._write_headers(self._headers)
         self._write_structure(args)
@@ -1364,6 +1363,9 @@
         # have finished sending the stream. We would notice at the end
         # anyway, but if the medium can deliver it early then it's good
         # to short-circuit the whole request...
+        # Provoke any ConnectionReset failures before we start the body stream.
+        self.flush()
+        self.body_stream_started = True
         for exc_info, part in _iter_with_errors(stream):
             if exc_info is not None:
                 # Iterating the stream failed. Cleanly abort the request.
=== modified file 'bzrlib/smart/request.py'
--- bzrlib/smart/request.py 2010-05-13 16:17:54 +0000
+++ bzrlib/smart/request.py 2012-09-14 07:34:20 +0000
@@ -1,4 +1,4 @@
-# Copyright (C) 2006-2010 Canonical Ltd
+# Copyright (C) 2006-2012 Canonical Ltd
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -134,7 +134,7 @@
         It will return a SmartServerResponse if the command does not expect a
         body.
 
-        :param *args: the arguments of the request.
+        :param args: the arguments of the request.
         """
         self._check_enabled()
         return self.do(*args)
@@ -486,154 +486,199 @@
         return SuccessfulSmartServerResponse((answer,))
 
 
+# In the 'info' attribute, we store whether this request is 'safe' to retry if
+# we get a disconnect while reading the response. It can have the values:
+#   read    This is purely a read request, so retrying it is perfectly ok.
+#   idem    An idempotent write request. Something like 'put' where if you put
+#           the same bytes twice you end up with the same final bytes.
+#   semi    This is a request that isn't strictly idempotent, but doesn't
+#           result in corruption if it is retried. This is for things like
+#           'lock' and 'unlock'. If you call lock, it updates the disk
+#           structure. If you fail to read the response, you won't be able to
+#           use the lock, because you don't have the lock token. Calling lock
+#           again will fail, because the lock is already taken. However, we
+#           can't tell if the server received our request or not. If it didn't,
+#           then retrying the request is fine, as it will actually do what we
+#           want. If it did, we will interrupt the current operation, but we
+#           are no worse off than interrupting the current operation because of
+#           a ConnectionReset.
+#   semivfs Similar to semi, but specific to a Virtual FileSystem request.
+#   stream  This is a request that takes a stream that cannot be restarted if
+#           consumed. This request is 'safe' in that if we determine the
+#           connection is closed before we consume the stream, we can try
+#           again.
+#   mutate  State is updated in a way that replaying that request results in a
+#           different state. For example 'append' writes more bytes to a given
+#           file. If append succeeds, it moves the file pointer.
 request_handlers = registry.Registry()
 request_handlers.register_lazy(
-    'append', 'bzrlib.smart.vfs', 'AppendRequest')
+    'append', 'bzrlib.smart.vfs', 'AppendRequest', info='mutate')
 request_handlers.register_lazy(
     'Branch.get_config_file', 'bzrlib.smart.branch',
-    'SmartServerBranchGetConfigFile')
+    'SmartServerBranchGetConfigFile', info='read')
 request_handlers.register_lazy(
-    'Branch.get_parent', 'bzrlib.smart.branch', 'SmartServerBranchGetParent')
+    'Branch.get_parent', 'bzrlib.smart.branch', 'SmartServerBranchGetParent',
+    info='read')
 request_handlers.register_lazy(
     'Branch.get_tags_bytes', 'bzrlib.smart.branch',
-    'SmartServerBranchGetTagsBytes')
+    'SmartServerBranchGetTagsBytes', info='read')
 request_handlers.register_lazy(
     'Branch.set_tags_bytes', 'bzrlib.smart.branch',
-    'SmartServerBranchSetTagsBytes')
+    'SmartServerBranchSetTagsBytes', info='idem')
 request_handlers.register_lazy(
504 'Branch.get_stacked_on_url', 'bzrlib.smart.branch', 'SmartServerBranchRequestGetStackedOnURL')529 'Branch.get_stacked_on_url', 'bzrlib.smart.branch',
505request_handlers.register_lazy(530 'SmartServerBranchRequestGetStackedOnURL', info='read')
506 'Branch.last_revision_info', 'bzrlib.smart.branch', 'SmartServerBranchRequestLastRevisionInfo')531request_handlers.register_lazy(
507request_handlers.register_lazy(532 'Branch.last_revision_info', 'bzrlib.smart.branch',
508 'Branch.lock_write', 'bzrlib.smart.branch', 'SmartServerBranchRequestLockWrite')533 'SmartServerBranchRequestLastRevisionInfo', info='read')
509request_handlers.register_lazy( 'Branch.revision_history',534request_handlers.register_lazy(
510 'bzrlib.smart.branch', 'SmartServerRequestRevisionHistory')535 'Branch.lock_write', 'bzrlib.smart.branch',
511request_handlers.register_lazy( 'Branch.set_config_option',536 'SmartServerBranchRequestLockWrite', info='semi')
512 'bzrlib.smart.branch', 'SmartServerBranchRequestSetConfigOption')537request_handlers.register_lazy(
513request_handlers.register_lazy( 'Branch.set_config_option_dict',538 'Branch.revision_history', 'bzrlib.smart.branch',
514 'bzrlib.smart.branch', 'SmartServerBranchRequestSetConfigOptionDict')539 'SmartServerRequestRevisionHistory', info='read')
515request_handlers.register_lazy( 'Branch.set_last_revision',540request_handlers.register_lazy(
516 'bzrlib.smart.branch', 'SmartServerBranchRequestSetLastRevision')541 'Branch.set_config_option', 'bzrlib.smart.branch',
542 'SmartServerBranchRequestSetConfigOption', info='idem')
543request_handlers.register_lazy(
544 'Branch.set_config_option_dict', 'bzrlib.smart.branch',
545 'SmartServerBranchRequestSetConfigOptionDict', info='idem')
546request_handlers.register_lazy(
547 'Branch.set_last_revision', 'bzrlib.smart.branch',
548 'SmartServerBranchRequestSetLastRevision', info='idem')
517request_handlers.register_lazy(549request_handlers.register_lazy(
518 'Branch.set_last_revision_info', 'bzrlib.smart.branch',550 'Branch.set_last_revision_info', 'bzrlib.smart.branch',
519 'SmartServerBranchRequestSetLastRevisionInfo')551 'SmartServerBranchRequestSetLastRevisionInfo', info='idem')
520request_handlers.register_lazy(552request_handlers.register_lazy(
521 'Branch.set_last_revision_ex', 'bzrlib.smart.branch',553 'Branch.set_last_revision_ex', 'bzrlib.smart.branch',
522 'SmartServerBranchRequestSetLastRevisionEx')554 'SmartServerBranchRequestSetLastRevisionEx', info='idem')
523request_handlers.register_lazy(555request_handlers.register_lazy(
524 'Branch.set_parent_location', 'bzrlib.smart.branch',556 'Branch.set_parent_location', 'bzrlib.smart.branch',
525 'SmartServerBranchRequestSetParentLocation')557 'SmartServerBranchRequestSetParentLocation', info='idem')
526request_handlers.register_lazy(558request_handlers.register_lazy(
527 'Branch.unlock', 'bzrlib.smart.branch', 'SmartServerBranchRequestUnlock')559 'Branch.unlock', 'bzrlib.smart.branch',
560 'SmartServerBranchRequestUnlock', info='semi')
 request_handlers.register_lazy(
     'BzrDir.cloning_metadir', 'bzrlib.smart.bzrdir',
-    'SmartServerBzrDirRequestCloningMetaDir')
+    'SmartServerBzrDirRequestCloningMetaDir', info='read')
 request_handlers.register_lazy(
     'BzrDir.create_branch', 'bzrlib.smart.bzrdir',
-    'SmartServerRequestCreateBranch')
+    'SmartServerRequestCreateBranch', info='semi')
 request_handlers.register_lazy(
     'BzrDir.create_repository', 'bzrlib.smart.bzrdir',
-    'SmartServerRequestCreateRepository')
+    'SmartServerRequestCreateRepository', info='semi')
 request_handlers.register_lazy(
     'BzrDir.find_repository', 'bzrlib.smart.bzrdir',
-    'SmartServerRequestFindRepositoryV1')
+    'SmartServerRequestFindRepositoryV1', info='read')
 request_handlers.register_lazy(
     'BzrDir.find_repositoryV2', 'bzrlib.smart.bzrdir',
-    'SmartServerRequestFindRepositoryV2')
+    'SmartServerRequestFindRepositoryV2', info='read')
 request_handlers.register_lazy(
     'BzrDir.find_repositoryV3', 'bzrlib.smart.bzrdir',
-    'SmartServerRequestFindRepositoryV3')
+    'SmartServerRequestFindRepositoryV3', info='read')
 request_handlers.register_lazy(
     'BzrDir.get_config_file', 'bzrlib.smart.bzrdir',
-    'SmartServerBzrDirRequestConfigFile')
+    'SmartServerBzrDirRequestConfigFile', info='read')
 request_handlers.register_lazy(
     'BzrDirFormat.initialize', 'bzrlib.smart.bzrdir',
-    'SmartServerRequestInitializeBzrDir')
+    'SmartServerRequestInitializeBzrDir', info='semi')
 request_handlers.register_lazy(
     'BzrDirFormat.initialize_ex_1.16', 'bzrlib.smart.bzrdir',
-    'SmartServerRequestBzrDirInitializeEx')
+    'SmartServerRequestBzrDirInitializeEx', info='semi')
 request_handlers.register_lazy(
-    'BzrDir.open', 'bzrlib.smart.bzrdir', 'SmartServerRequestOpenBzrDir')
-request_handlers.register_lazy(
-    'BzrDir.open_2.1', 'bzrlib.smart.bzrdir', 'SmartServerRequestOpenBzrDir_2_1')
+    'BzrDir.open', 'bzrlib.smart.bzrdir', 'SmartServerRequestOpenBzrDir',
+    info='read')
+request_handlers.register_lazy(
+    'BzrDir.open_2.1', 'bzrlib.smart.bzrdir',
+    'SmartServerRequestOpenBzrDir_2_1', info='read')
 request_handlers.register_lazy(
     'BzrDir.open_branch', 'bzrlib.smart.bzrdir',
-    'SmartServerRequestOpenBranch')
+    'SmartServerRequestOpenBranch', info='read')
 request_handlers.register_lazy(
     'BzrDir.open_branchV2', 'bzrlib.smart.bzrdir',
-    'SmartServerRequestOpenBranchV2')
+    'SmartServerRequestOpenBranchV2', info='read')
 request_handlers.register_lazy(
     'BzrDir.open_branchV3', 'bzrlib.smart.bzrdir',
-    'SmartServerRequestOpenBranchV3')
+    'SmartServerRequestOpenBranchV3', info='read')
 request_handlers.register_lazy(
-    'delete', 'bzrlib.smart.vfs', 'DeleteRequest')
+    'delete', 'bzrlib.smart.vfs', 'DeleteRequest', info='semivfs')
 request_handlers.register_lazy(
-    'get', 'bzrlib.smart.vfs', 'GetRequest')
+    'get', 'bzrlib.smart.vfs', 'GetRequest', info='read')
 request_handlers.register_lazy(
-    'get_bundle', 'bzrlib.smart.request', 'GetBundleRequest')
+    'get_bundle', 'bzrlib.smart.request', 'GetBundleRequest', info='read')
 request_handlers.register_lazy(
-    'has', 'bzrlib.smart.vfs', 'HasRequest')
+    'has', 'bzrlib.smart.vfs', 'HasRequest', info='read')
 request_handlers.register_lazy(
-    'hello', 'bzrlib.smart.request', 'HelloRequest')
+    'hello', 'bzrlib.smart.request', 'HelloRequest', info='read')
 request_handlers.register_lazy(
-    'iter_files_recursive', 'bzrlib.smart.vfs', 'IterFilesRecursiveRequest')
-request_handlers.register_lazy(
-    'list_dir', 'bzrlib.smart.vfs', 'ListDirRequest')
-request_handlers.register_lazy(
-    'mkdir', 'bzrlib.smart.vfs', 'MkdirRequest')
-request_handlers.register_lazy(
-    'move', 'bzrlib.smart.vfs', 'MoveRequest')
-request_handlers.register_lazy(
-    'put', 'bzrlib.smart.vfs', 'PutRequest')
-request_handlers.register_lazy(
-    'put_non_atomic', 'bzrlib.smart.vfs', 'PutNonAtomicRequest')
-request_handlers.register_lazy(
-    'readv', 'bzrlib.smart.vfs', 'ReadvRequest')
-request_handlers.register_lazy(
-    'rename', 'bzrlib.smart.vfs', 'RenameRequest')
+    'iter_files_recursive', 'bzrlib.smart.vfs', 'IterFilesRecursiveRequest',
+    info='read')
+request_handlers.register_lazy(
+    'list_dir', 'bzrlib.smart.vfs', 'ListDirRequest', info='read')
+request_handlers.register_lazy(
+    'mkdir', 'bzrlib.smart.vfs', 'MkdirRequest', info='semivfs')
+request_handlers.register_lazy(
+    'move', 'bzrlib.smart.vfs', 'MoveRequest', info='semivfs')
+request_handlers.register_lazy(
+    'put', 'bzrlib.smart.vfs', 'PutRequest', info='idem')
+request_handlers.register_lazy(
+    'put_non_atomic', 'bzrlib.smart.vfs', 'PutNonAtomicRequest', info='idem')
+request_handlers.register_lazy(
+    'readv', 'bzrlib.smart.vfs', 'ReadvRequest', info='read')
+request_handlers.register_lazy(
+    'rename', 'bzrlib.smart.vfs', 'RenameRequest', info='semivfs')
 request_handlers.register_lazy(
     'PackRepository.autopack', 'bzrlib.smart.packrepository',
-    'SmartServerPackRepositoryAutopack')
-request_handlers.register_lazy('Repository.gather_stats',
-                               'bzrlib.smart.repository',
-                               'SmartServerRepositoryGatherStats')
-request_handlers.register_lazy('Repository.get_parent_map',
-                               'bzrlib.smart.repository',
-                               'SmartServerRepositoryGetParentMap')
+    'SmartServerPackRepositoryAutopack', info='idem')
+request_handlers.register_lazy(
+    'Repository.gather_stats', 'bzrlib.smart.repository',
+    'SmartServerRepositoryGatherStats', info='read')
+request_handlers.register_lazy(
+    'Repository.get_parent_map', 'bzrlib.smart.repository',
+    'SmartServerRepositoryGetParentMap', info='read')
 request_handlers.register_lazy(
-    'Repository.get_revision_graph', 'bzrlib.smart.repository', 'SmartServerRepositoryGetRevisionGraph')
-request_handlers.register_lazy(
-    'Repository.has_revision', 'bzrlib.smart.repository', 'SmartServerRequestHasRevision')
-request_handlers.register_lazy(
-    'Repository.insert_stream', 'bzrlib.smart.repository', 'SmartServerRepositoryInsertStream')
-request_handlers.register_lazy(
-    'Repository.insert_stream_1.19', 'bzrlib.smart.repository', 'SmartServerRepositoryInsertStream_1_19')
-request_handlers.register_lazy(
-    'Repository.insert_stream_locked', 'bzrlib.smart.repository', 'SmartServerRepositoryInsertStreamLocked')
-request_handlers.register_lazy(
-    'Repository.is_shared', 'bzrlib.smart.repository', 'SmartServerRepositoryIsShared')
-request_handlers.register_lazy(
-    'Repository.lock_write', 'bzrlib.smart.repository', 'SmartServerRepositoryLockWrite')
+    'Repository.get_revision_graph', 'bzrlib.smart.repository',
+    'SmartServerRepositoryGetRevisionGraph', info='read')
+request_handlers.register_lazy(
+    'Repository.has_revision', 'bzrlib.smart.repository',
+    'SmartServerRequestHasRevision', info='read')
+request_handlers.register_lazy(
+    'Repository.insert_stream', 'bzrlib.smart.repository',
+    'SmartServerRepositoryInsertStream', info='stream')
+request_handlers.register_lazy(
+    'Repository.insert_stream_1.19', 'bzrlib.smart.repository',
+    'SmartServerRepositoryInsertStream_1_19', info='stream')
+request_handlers.register_lazy(
+    'Repository.insert_stream_locked', 'bzrlib.smart.repository',
+    'SmartServerRepositoryInsertStreamLocked', info='stream')
+request_handlers.register_lazy(
+    'Repository.is_shared', 'bzrlib.smart.repository',
+    'SmartServerRepositoryIsShared', info='read')
+request_handlers.register_lazy(
+    'Repository.lock_write', 'bzrlib.smart.repository',
+    'SmartServerRepositoryLockWrite', info='semi')
 request_handlers.register_lazy(
     'Repository.set_make_working_trees', 'bzrlib.smart.repository',
-    'SmartServerRepositorySetMakeWorkingTrees')
+    'SmartServerRepositorySetMakeWorkingTrees', info='idem')
 request_handlers.register_lazy(
-    'Repository.unlock', 'bzrlib.smart.repository', 'SmartServerRepositoryUnlock')
+    'Repository.unlock', 'bzrlib.smart.repository',
+    'SmartServerRepositoryUnlock', info='semi')
 request_handlers.register_lazy(
     'Repository.get_rev_id_for_revno', 'bzrlib.smart.repository',
-    'SmartServerRepositoryGetRevIdForRevno')
+    'SmartServerRepositoryGetRevIdForRevno', info='read')
 request_handlers.register_lazy(
     'Repository.get_stream', 'bzrlib.smart.repository',
-    'SmartServerRepositoryGetStream')
+    'SmartServerRepositoryGetStream', info='read')
 request_handlers.register_lazy(
     'Repository.get_stream_1.19', 'bzrlib.smart.repository',
-    'SmartServerRepositoryGetStream_1_19')
+    'SmartServerRepositoryGetStream_1_19', info='read')
 request_handlers.register_lazy(
     'Repository.tarball', 'bzrlib.smart.repository',
-    'SmartServerRepositoryTarball')
+    'SmartServerRepositoryTarball', info='read')
 request_handlers.register_lazy(
-    'rmdir', 'bzrlib.smart.vfs', 'RmdirRequest')
+    'rmdir', 'bzrlib.smart.vfs', 'RmdirRequest', info='semivfs')
 request_handlers.register_lazy(
-    'stat', 'bzrlib.smart.vfs', 'StatRequest')
+    'stat', 'bzrlib.smart.vfs', 'StatRequest', info='read')
 request_handlers.register_lazy(
-    'Transport.is_readonly', 'bzrlib.smart.request', 'SmartServerIsReadonly')
+    'Transport.is_readonly', 'bzrlib.smart.request',
+    'SmartServerIsReadonly', info='read')

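The retry categories documented in the comment block of this hunk could drive a client-side decision roughly as follows. This is a minimal sketch only, not the code from bzrlib/smart/client.py in this branch: the helper name `is_safe_to_retry`, the `stream_consumed` and `allow_vfs_retry` parameters, and the conservative default for 'semivfs' are all assumptions made for illustration.

```python
# Hypothetical helper for illustration; not part of bzrlib.
ALWAYS_RETRYABLE = frozenset(['read', 'idem', 'semi'])


def is_safe_to_retry(info, stream_consumed=False, allow_vfs_retry=False):
    """Return True if a request with this 'info' category may be resent."""
    if info in ALWAYS_RETRYABLE:
        return True
    if info == 'semivfs':
        # Assumption: VFS requests are retried only when the caller opts in.
        return allow_vfs_retry
    if info == 'stream':
        # A stream can only be resent if none of it has been consumed yet.
        return not stream_consumed
    # 'mutate' and anything unclassified: replaying could change state.
    return False
```

Under these assumptions, `is_safe_to_retry('stream')` is true only until the stream has been read, while 'mutate' requests such as 'append' are never resent.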
=== modified file 'bzrlib/tests/test_bundle.py'
--- bzrlib/tests/test_bundle.py 2010-02-23 07:43:11 +0000
+++ bzrlib/tests/test_bundle.py 2012-09-14 07:34:20 +0000
@@ -1855,20 +1855,23 @@
         self.sock.bind(('127.0.0.1', 0))
         self.sock.listen(1)
         self.port = self.sock.getsockname()[1]
+        self.stopping = threading.Event()
         self.thread = threading.Thread(
             name='%s (port %d)' % (self.__class__.__name__, self.port),
             target=self.accept_and_close)
         self.thread.start()
 
     def accept_and_close(self):
-        conn, addr = self.sock.accept()
-        conn.shutdown(socket.SHUT_RDWR)
-        conn.close()
+        while not self.stopping.isSet():
+            conn, addr = self.sock.accept()
+            conn.shutdown(socket.SHUT_RDWR)
+            conn.close()
 
     def get_url(self):
         return 'bzr://127.0.0.1:%d/' % (self.port,)
 
     def stop_server(self):
+        self.stopping.set()
         try:
             # make sure the thread dies by connecting to the listening socket,
             # just in case the test failed to do so.

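The accept-until-stopped pattern introduced in this hunk can be tried in isolation with a sketch like the one below. It is a standalone approximation, not the test server class from test_bundle.py: the class name `AcceptAndCloseServer`, the `accepted` counter, and the tolerant `shutdown` handling are assumptions added for illustration.

```python
import socket
import threading


class AcceptAndCloseServer(object):
    """Hypothetical stand-in: accepts connections and closes them at once."""

    def __init__(self):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.bind(('127.0.0.1', 0))
        self.sock.listen(1)
        self.port = self.sock.getsockname()[1]
        self.stopping = threading.Event()
        self.accepted = 0
        self.thread = threading.Thread(target=self.accept_and_close)
        self.thread.daemon = True
        self.thread.start()

    def accept_and_close(self):
        # Keep serving so that a reconnecting client is rejected again.
        while not self.stopping.is_set():
            conn, addr = self.sock.accept()
            self.accepted += 1
            try:
                conn.shutdown(socket.SHUT_RDWR)
            except socket.error:
                # The peer may already be gone; closing is still safe.
                pass
            conn.close()

    def stop(self):
        self.stopping.set()
        try:
            # Wake the thread out of its blocking accept() call.
            socket.create_connection(('127.0.0.1', self.port), 5).close()
        except socket.error:
            pass
        self.thread.join(5)
        self.sock.close()
```

The flag has to be set before the wake-up connection is made; otherwise the loop would simply block in `accept()` again after closing it.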
=== modified file 'bzrlib/tests/test_osutils.py'
--- bzrlib/tests/test_osutils.py 2010-12-02 09:23:10 +0000
+++ bzrlib/tests/test_osutils.py 2012-09-14 07:34:20 +0000
@@ -804,6 +804,45 @@
         self.assertEqual(None, osutils.safe_file_id(None))
 
 
+class TestSendAll(tests.TestCase):
+
+    def test_send_with_disconnected_socket(self):
+        class DisconnectedSocket(object):
+            def __init__(self, err):
+                self.err = err
+            def send(self, content):
+                raise self.err
+            def close(self):
+                pass
+        # All of these should be treated as ConnectionReset
+        errs = []
+        for err_cls in (IOError, socket.error):
+            for errnum in osutils._end_of_stream_errors:
+                errs.append(err_cls(errnum))
+        for err in errs:
+            sock = DisconnectedSocket(err)
+            self.assertRaises(errors.ConnectionReset,
+                              osutils.send_all, sock, 'some more content')
+
+    def test_send_with_no_progress(self):
+        # See https://bugs.launchpad.net/bzr/+bug/1047309
+        # It seems that paramiko can get into a state where it doesn't error,
+        # but it returns 0 bytes sent for requests over and over again.
+        class NoSendingSocket(object):
+            def __init__(self):
+                self.call_count = 0
+            def send(self, bytes):
+                self.call_count += 1
+                if self.call_count > 100:
+                    # Prevent the test suite from hanging
+                    raise RuntimeError('too many calls')
+                return 0
+        sock = NoSendingSocket()
+        self.assertRaises(errors.ConnectionReset,
+                          osutils.send_all, sock, 'content')
+        self.assertEqual(1, sock.call_count)
+
+
 class TestWin32Funcs(tests.TestCase):
     """Test that _win32 versions of os utilities return appropriate paths."""
 

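The behaviour that test_send_with_no_progress expects (give up after the first send that makes no progress) can be sketched as below. This is not the osutils.send_all change from this branch, whose hunk is not shown in this part of the diff; the local `ConnectionReset` class is a stand-in for bzrlib.errors.ConnectionReset, and the errno-based handling checked by test_send_with_disconnected_socket is deliberately left out.

```python
class ConnectionReset(Exception):
    """Stand-in for bzrlib.errors.ConnectionReset (assumption)."""


def send_all(sock, data):
    """Send all of data, treating a zero-byte send as a dropped connection."""
    sent_total = 0
    while sent_total < len(data):
        sent = sock.send(data[sent_total:])
        if sent == 0:
            # No progress: looping here would spin forever, so report it
            # as a reset instead of retrying.
            raise ConnectionReset('Sending to %r returned 0 bytes' % (sock,))
        sent_total += sent
```

Raising on the first zero-byte result is what keeps the call count at 1 in the test above; a partial send still loops until every byte has been handed to the socket.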
=== modified file 'bzrlib/tests/test_smart.py'
--- bzrlib/tests/test_smart.py 2010-05-13 16:17:54 +0000
+++ bzrlib/tests/test_smart.py 2012-09-14 07:34:20 +0000
@@ -1849,8 +1849,11 @@
         """All registered request_handlers can be found."""
         # If there's a typo in a register_lazy call, this loop will fail with
         # an AttributeError.
-        for key, item in smart_req.request_handlers.iteritems():
-            pass
+        for key in smart_req.request_handlers.keys():
+            try:
+                item = smart_req.request_handlers.get(key)
+            except AttributeError, e:
+                raise AttributeError('failed to get %s: %s' % (key, e))
 
     def assertHandlerEqual(self, verb, handler):
         self.assertEqual(smart_req.request_handlers.get(verb), handler)

=== modified file 'bzrlib/tests/test_smart_request.py'
--- bzrlib/tests/test_smart_request.py 2010-06-20 11:18:38 +0000
+++ bzrlib/tests/test_smart_request.py 2012-09-14 07:34:20 +0000
@@ -111,6 +111,16 @@
         self.assertEqual(
             [[transport]] * 3, handler._command.jail_transports_log)
 
+    def test_all_registered_requests_are_safety_qualified(self):
+        unclassified_requests = []
+        allowed_info = ('read', 'idem', 'mutate', 'semivfs', 'semi', 'stream')
+        for key in request.request_handlers.keys():
+            info = request.request_handlers.get_info(key)
+            if info is None or info not in allowed_info:
+                unclassified_requests.append(key)
+        if unclassified_requests:
+            self.fail('These requests were not categorized as safe/unsafe'
+                      ' to retry: %s' % (unclassified_requests,))
 
 
 class TestSmartRequestHandlerErrorTranslation(TestCase):

=== modified file 'bzrlib/tests/test_smart_transport.py'
--- bzrlib/tests/test_smart_transport.py 2010-06-25 09:56:07 +0000
+++ bzrlib/tests/test_smart_transport.py 2012-09-14 07:34:20 +0000
@@ -1,4 +1,4 @@
-# Copyright (C) 2006-2010 Canonical Ltd
+# Copyright (C) 2006-2011 Canonical Ltd
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -18,13 +18,17 @@
 
 # all of this deals with byte strings so this is safe
 from cStringIO import StringIO
+import errno
 import os
 import socket
+import subprocess
+import sys
 import threading
 
 import bzrlib
 from bzrlib import (
     bzrdir,
+    debug,
     errors,
     osutils,
     tests,
@@ -50,6 +54,29 @@
         )
 
 
+def create_file_pipes():
+    r, w = os.pipe()
+    # These must be opened without buffering, or we get undefined results
+    rf = os.fdopen(r, 'rb', 0)
+    wf = os.fdopen(w, 'wb', 0)
+    return rf, wf
+
+
+def portable_socket_pair():
+    """Return a pair of TCP sockets connected to each other.
+
+    Unlike socket.socketpair, this should work on Windows.
+    """
+    listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    listen_sock.bind(('127.0.0.1', 0))
+    listen_sock.listen(1)
+    client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    client_sock.connect(listen_sock.getsockname())
+    server_sock, addr = listen_sock.accept()
+    listen_sock.close()
+    return server_sock, client_sock
+
+
 class StringIOSSHVendor(object):
     """A SSH vendor that uses StringIO to buffer writes and answer reads."""
 
@@ -64,6 +91,27 @@
         return StringIOSSHConnection(self)
 
 
+class FirstRejectedStringIOSSHVendor(StringIOSSHVendor):
+    """The first connection will be considered closed.
+
+    The second connection will succeed normally.
+    """
+
+    def __init__(self, read_from, write_to, fail_at_write=True):
+        super(FirstRejectedStringIOSSHVendor, self).__init__(read_from,
+            write_to)
+        self.fail_at_write = fail_at_write
+        self._first = True
+
+    def connect_ssh(self, username, password, host, port, command):
+        self.calls.append(('connect_ssh', username, password, host, port,
+            command))
+        if self._first:
+            self._first = False
+            return ClosedSSHConnection(self)
+        return StringIOSSHConnection(self)
+
+
 class StringIOSSHConnection(ssh.SSHConnection):
     """A SSH connection that uses StringIO to buffer writes and answer reads."""
 
@@ -79,6 +127,29 @@
         return 'pipes', (self.vendor.read_from, self.vendor.write_to)
 
 
+class ClosedSSHConnection(ssh.SSHConnection):
+    """An SSH connection that just has closed channels."""
+
+    def __init__(self, vendor):
+        self.vendor = vendor
+
+    def close(self):
+        self.vendor.calls.append(('close', ))
+
+    def get_sock_or_pipes(self):
+        # We create matching pipes, and then close the ssh side
+        bzr_read, ssh_write = create_file_pipes()
+        # We always fail when bzr goes to read
+        ssh_write.close()
+        if self.vendor.fail_at_write:
+            # If set, we'll also fail when bzr goes to write
+            ssh_read, bzr_write = create_file_pipes()
+            ssh_read.close()
+        else:
+            bzr_write = self.vendor.write_to
+        return 'pipes', (bzr_read, bzr_write)
+
+
 class _InvalidHostnameFeature(tests.Feature):
     """Does 'non_existent.invalid' fail to resolve?
 
@@ -174,6 +245,91 @@
         client_medium._accept_bytes('abc')
         self.assertEqual('abc', output.getvalue())
 
+    def test_simple_pipes__accept_bytes_subprocess_closed(self):
+        # It is unfortunate that we have to use Popen for this. However,
+        # os.pipe() does not behave the same as subprocess.Popen().
+        # On Windows, if you use os.pipe() and close the write side,
+        # read.read() hangs. On Linux, read.read() returns the empty string.
+        p = subprocess.Popen([sys.executable, '-c',
+            'import sys\n'
+            'sys.stdout.write(sys.stdin.read(4))\n'
+            'sys.stdout.close()\n'],
+            stdout=subprocess.PIPE, stdin=subprocess.PIPE)
+        client_medium = medium.SmartSimplePipesClientMedium(
+            p.stdout, p.stdin, 'base')
+        client_medium._accept_bytes('abc\n')
+        self.assertEqual('abc', client_medium._read_bytes(3))
+        p.wait()
+        # While writing to the underlying pipe,
+        # Windows py2.6.6 we get IOError(EINVAL)
+        # Lucid py2.6.5, we get IOError(EPIPE)
+        # In both cases, it should be wrapped to ConnectionReset
+        self.assertRaises(errors.ConnectionReset,
+                          client_medium._accept_bytes, 'more')
+
+    def test_simple_pipes__accept_bytes_pipe_closed(self):
+        child_read, client_write = create_file_pipes()
+        client_medium = medium.SmartSimplePipesClientMedium(
+            None, client_write, 'base')
+        client_medium._accept_bytes('abc\n')
+        self.assertEqual('abc\n', child_read.read(4))
+        # While writing to the underlying pipe,
+        # Windows py2.6.6 we get IOError(EINVAL)
+        # Lucid py2.6.5, we get IOError(EPIPE)
+        # In both cases, it should be wrapped to ConnectionReset
+        child_read.close()
+        self.assertRaises(errors.ConnectionReset,
+                          client_medium._accept_bytes, 'more')
+
+    def test_simple_pipes__flush_pipe_closed(self):
+        child_read, client_write = create_file_pipes()
+        client_medium = medium.SmartSimplePipesClientMedium(
+            None, client_write, 'base')
+        client_medium._accept_bytes('abc\n')
+        child_read.close()
+        # Even though the pipe is closed, flush on the write side seems to be a
+        # no-op, rather than a failure.
+        client_medium._flush()
+
+    def test_simple_pipes__flush_subprocess_closed(self):
+        p = subprocess.Popen([sys.executable, '-c',
+            'import sys\n'
+            'sys.stdout.write(sys.stdin.read(4))\n'
+            'sys.stdout.close()\n'],
+            stdout=subprocess.PIPE, stdin=subprocess.PIPE)
+        client_medium = medium.SmartSimplePipesClientMedium(
+            p.stdout, p.stdin, 'base')
+        client_medium._accept_bytes('abc\n')
+        p.wait()
+        # Even though the child process is dead, flush seems to be a no-op.
+        client_medium._flush()
+
+    def test_simple_pipes__read_bytes_pipe_closed(self):
+        child_read, client_write = create_file_pipes()
+        client_medium = medium.SmartSimplePipesClientMedium(
+            child_read, client_write, 'base')
+        client_medium._accept_bytes('abc\n')
+        client_write.close()
+        self.assertEqual('abc\n', client_medium._read_bytes(4))
+        self.assertEqual('', client_medium._read_bytes(4))
+
+    def test_simple_pipes__read_bytes_subprocess_closed(self):
+        p = subprocess.Popen([sys.executable, '-c',
+            'import sys\n'
+            'if sys.platform == "win32":\n'
+            ' import msvcrt, os\n'
+            ' msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)\n'
+            ' msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY)\n'
+            'sys.stdout.write(sys.stdin.read(4))\n'
+            'sys.stdout.close()\n'],
+            stdout=subprocess.PIPE, stdin=subprocess.PIPE)
+        client_medium = medium.SmartSimplePipesClientMedium(
+            p.stdout, p.stdin, 'base')
+        client_medium._accept_bytes('abc\n')
+        p.wait()
+        self.assertEqual('abc\n', client_medium._read_bytes(4))
+        self.assertEqual('', client_medium._read_bytes(4))
+
     def test_simple_pipes_client_disconnect_does_nothing(self):
         # calling disconnect does nothing.
         input = StringIO()
@@ -561,6 +717,28 @@
         request.finished_reading()
         self.assertRaises(errors.ReadingCompleted, request.read_bytes, None)
 
+    def test_reset(self):
+        server_sock, client_sock = portable_socket_pair()
+        # TODO: Use SmartClientAlreadyConnectedSocketMedium for the versions of
+        # bzr where it exists.
+        client_medium = medium.SmartTCPClientMedium(None, None, None)
+        client_medium._socket = client_sock
+        client_medium._connected = True
+        req = client_medium.get_request()
+        self.assertRaises(errors.TooManyConcurrentRequests,
+                          client_medium.get_request)
+        client_medium.reset()
+        # The stream should be reset, marked as disconnected, though ready for
+        # us to make a new request
+        self.assertFalse(client_medium._connected)
+        self.assertIs(None, client_medium._socket)
+        try:
+            self.assertEqual('', client_sock.recv(1))
+        except socket.error, e:
+            if e.errno not in (errno.EBADF,):
+                raise
+        req = client_medium.get_request()
+
 
 class RemoteTransportTests(test_smart.TestCaseWithSmartMedium):
 
@@ -614,20 +792,6 @@
         super(TestSmartServerStreamMedium, self).setUp()
         self._captureVar('BZR_NO_SMART_VFS', None)
 
-    def portable_socket_pair(self):
-        """Return a pair of TCP sockets connected to each other.
-
-        Unlike socket.socketpair, this should work on Windows.
-        """
-        listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        listen_sock.bind(('127.0.0.1', 0))
-        listen_sock.listen(1)
-        client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        client_sock.connect(listen_sock.getsockname())
-        server_sock, addr = listen_sock.accept()
-        listen_sock.close()
-        return server_sock, client_sock
-
     def test_smart_query_version(self):
         """Feed a canned query version to a server"""
         # wire-to-wire, using the whole stack
@@ -692,7 +856,7 @@
 
     def test_socket_stream_with_bulk_data(self):
         sample_request_bytes = 'command\n9\nbulk datadone\n'
-        server_sock, client_sock = self.portable_socket_pair()
+        server_sock, client_sock = portable_socket_pair()
         server = medium.SmartServerSocketStreamMedium(
             server_sock, None)
         sample_protocol = SampleRequest(expected_bytes=sample_request_bytes)
@@ -711,7 +875,7 @@
         self.assertTrue(server.finished)
 
     def test_socket_stream_shutdown_detection(self):
-        server_sock, client_sock = self.portable_socket_pair()
+        server_sock, client_sock = portable_socket_pair()
         client_sock.close()
         server = medium.SmartServerSocketStreamMedium(
             server_sock, None)
@@ -731,7 +895,7 @@
731 rest_of_request_bytes = 'lo\n'895 rest_of_request_bytes = 'lo\n'
732 expected_response = (896 expected_response = (
733 protocol.RESPONSE_VERSION_TWO + 'success\nok\x012\n')897 protocol.RESPONSE_VERSION_TWO + 'success\nok\x012\n')
734 server_sock, client_sock = self.portable_socket_pair()898 server_sock, client_sock = portable_socket_pair()
735 server = medium.SmartServerSocketStreamMedium(899 server = medium.SmartServerSocketStreamMedium(
736 server_sock, None)900 server_sock, None)
737 client_sock.sendall(incomplete_request_bytes)901 client_sock.sendall(incomplete_request_bytes)
@@ -807,7 +971,7 @@
807 # _serve_one_request should still process both of them as if they had971 # _serve_one_request should still process both of them as if they had
808 # been received separately.972 # been received separately.
809 sample_request_bytes = 'command\n'973 sample_request_bytes = 'command\n'
810 server_sock, client_sock = self.portable_socket_pair()974 server_sock, client_sock = portable_socket_pair()
811 server = medium.SmartServerSocketStreamMedium(975 server = medium.SmartServerSocketStreamMedium(
812 server_sock, None)976 server_sock, None)
813 first_protocol = SampleRequest(expected_bytes=sample_request_bytes)977 first_protocol = SampleRequest(expected_bytes=sample_request_bytes)
@@ -844,7 +1008,7 @@
844 self.assertTrue(server.finished)1008 self.assertTrue(server.finished)
8451009
846 def test_socket_stream_error_handling(self):1010 def test_socket_stream_error_handling(self):
847 server_sock, client_sock = self.portable_socket_pair()1011 server_sock, client_sock = portable_socket_pair()
848 server = medium.SmartServerSocketStreamMedium(1012 server = medium.SmartServerSocketStreamMedium(
849 server_sock, None)1013 server_sock, None)
850 fake_protocol = ErrorRaisingProtocol(Exception('boom'))1014 fake_protocol = ErrorRaisingProtocol(Exception('boom'))
@@ -865,7 +1029,7 @@
865 self.assertEqual('', from_server.getvalue())1029 self.assertEqual('', from_server.getvalue())
8661030
867 def test_socket_stream_keyboard_interrupt_handling(self):1031 def test_socket_stream_keyboard_interrupt_handling(self):
868 server_sock, client_sock = self.portable_socket_pair()1032 server_sock, client_sock = portable_socket_pair()
869 server = medium.SmartServerSocketStreamMedium(1033 server = medium.SmartServerSocketStreamMedium(
870 server_sock, None)1034 server_sock, None)
871 fake_protocol = ErrorRaisingProtocol(KeyboardInterrupt('boom'))1035 fake_protocol = ErrorRaisingProtocol(KeyboardInterrupt('boom'))
@@ -882,7 +1046,7 @@
882 return server._build_protocol()1046 return server._build_protocol()
8831047
884 def build_protocol_socket(self, bytes):1048 def build_protocol_socket(self, bytes):
885 server_sock, client_sock = self.portable_socket_pair()1049 server_sock, client_sock = portable_socket_pair()
886 server = medium.SmartServerSocketStreamMedium(1050 server = medium.SmartServerSocketStreamMedium(
887 server_sock, None)1051 server_sock, None)
888 client_sock.sendall(bytes)1052 client_sock.sendall(bytes)
@@ -2785,6 +2949,33 @@
             'e', # end
             output.getvalue())
 
+    def test_records_start_of_body_stream(self):
+        requester, output = self.make_client_encoder_and_output()
+        requester.set_headers({})
+        in_stream = [False]
+        def stream_checker():
+            self.assertTrue(requester.body_stream_started)
+            in_stream[0] = True
+            yield 'content'
+        flush_called = []
+        orig_flush = requester.flush
+        def tracked_flush():
+            flush_called.append(in_stream[0])
+            if in_stream[0]:
+                self.assertTrue(requester.body_stream_started)
+            else:
+                self.assertFalse(requester.body_stream_started)
+            return orig_flush()
+        requester.flush = tracked_flush
+        requester.call_with_body_stream(('one arg',), stream_checker())
+        self.assertEqual(
+            'bzr message 3 (bzr 1.6)\n' # protocol version
+            '\x00\x00\x00\x02de' # headers
+            's\x00\x00\x00\x0bl7:one arge' # args
+            'b\x00\x00\x00\x07content' # body
+            'e', output.getvalue())
+        self.assertEqual([False, True, True], flush_called)
+
 
 class StubMediumRequest(object):
     """A stub medium request that tracks the number of times accept_bytes is
@@ -3209,6 +3400,193 @@
         # encoder.
 
 
+class Test_SmartClientRequest(tests.TestCase):
+
+    def make_client_with_failing_medium(self, fail_at_write=True, response=''):
+        response_io = StringIO(response)
+        output = StringIO()
+        vendor = FirstRejectedStringIOSSHVendor(response_io, output,
+                    fail_at_write=fail_at_write)
+        ssh_params = medium.SSHParams('a host', 'a port', 'a user', 'a pass')
+        client_medium = medium.SmartSSHClientMedium('base', ssh_params, vendor)
+        smart_client = client._SmartClient(client_medium, headers={})
+        return output, vendor, smart_client
+
+    def make_response(self, args, body=None, body_stream=None):
+        response_io = StringIO()
+        response = _mod_request.SuccessfulSmartServerResponse(args, body=body,
+            body_stream=body_stream)
+        responder = protocol.ProtocolThreeResponder(response_io.write)
+        responder.send_response(response)
+        return response_io.getvalue()
+
+    def test__call_doesnt_retry_append(self):
+        response = self.make_response(('appended', '8'))
+        output, vendor, smart_client = self.make_client_with_failing_medium(
+            fail_at_write=False, response=response)
+        smart_request = client._SmartClientRequest(smart_client, 'append',
+            ('foo', ''), body='content\n')
+        self.assertRaises(errors.ConnectionReset, smart_request._call, 3)
+
+    def test__call_retries_get_bytes(self):
+        response = self.make_response(('ok',), 'content\n')
+        output, vendor, smart_client = self.make_client_with_failing_medium(
+            fail_at_write=False, response=response)
+        smart_request = client._SmartClientRequest(smart_client, 'get',
+            ('foo',))
+        response, response_handler = smart_request._call(3)
+        self.assertEqual(('ok',), response)
+        self.assertEqual('content\n', response_handler.read_body_bytes())
+
+    def test__call_noretry_get_bytes(self):
+        debug.debug_flags.add('noretry')
+        response = self.make_response(('ok',), 'content\n')
+        output, vendor, smart_client = self.make_client_with_failing_medium(
+            fail_at_write=False, response=response)
+        smart_request = client._SmartClientRequest(smart_client, 'get',
+            ('foo',))
+        self.assertRaises(errors.ConnectionReset, smart_request._call, 3)
+
+    def test__send_no_retry_pipes(self):
+        client_read, server_write = create_file_pipes()
+        server_read, client_write = create_file_pipes()
+        client_medium = medium.SmartSimplePipesClientMedium(client_read,
+            client_write, base='/')
+        smart_client = client._SmartClient(client_medium)
+        smart_request = client._SmartClientRequest(smart_client,
+            'hello', ())
+        # Close the server side
+        server_read.close()
+        encoder, response_handler = smart_request._construct_protocol(3)
+        self.assertRaises(errors.ConnectionReset,
+            smart_request._send_no_retry, encoder)
+
+    def test__send_read_response_sockets(self):
+        listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        listen_sock.bind(('127.0.0.1', 0))
+        listen_sock.listen(1)
+        host, port = listen_sock.getsockname()
+        client_medium = medium.SmartTCPClientMedium(host, port, '/')
+        client_medium._ensure_connection()
+        smart_client = client._SmartClient(client_medium)
+        smart_request = client._SmartClientRequest(smart_client, 'hello', ())
+        # Accept the connection, but don't actually talk to the client.
+        server_sock, _ = listen_sock.accept()
+        server_sock.close()
+        # Sockets buffer and don't really notice that the server has closed the
+        # connection until we try to read again.
+        handler = smart_request._send(3)
+        self.assertRaises(errors.ConnectionReset,
+            handler.read_response_tuple, expect_body=False)
+
+    def test__send_retries_on_write(self):
+        output, vendor, smart_client = self.make_client_with_failing_medium()
+        smart_request = client._SmartClientRequest(smart_client, 'hello', ())
+        handler = smart_request._send(3)
+        self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol
+                         '\x00\x00\x00\x02de' # empty headers
+                         's\x00\x00\x00\tl5:helloee',
+                         output.getvalue())
+        self.assertEqual(
+            [('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
+              ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
+             ('close',),
+             ('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
+              ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
+            ],
+            vendor.calls)
+
+    def test__send_doesnt_retry_read_failure(self):
+        output, vendor, smart_client = self.make_client_with_failing_medium(
+            fail_at_write=False)
+        smart_request = client._SmartClientRequest(smart_client, 'hello', ())
+        handler = smart_request._send(3)
+        self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol
+                         '\x00\x00\x00\x02de' # empty headers
+                         's\x00\x00\x00\tl5:helloee',
+                         output.getvalue())
+        self.assertEqual(
+            [('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
+              ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
+            ],
+            vendor.calls)
+        self.assertRaises(errors.ConnectionReset, handler.read_response_tuple)
+
+    def test__send_request_retries_body_stream_if_not_started(self):
+        output, vendor, smart_client = self.make_client_with_failing_medium()
+        smart_request = client._SmartClientRequest(smart_client, 'hello', (),
+            body_stream=['a', 'b'])
+        response_handler = smart_request._send(3)
+        # We connect, get disconnected, and notice before consuming the stream,
+        # so we try again one time and succeed.
+        self.assertEqual(
+            [('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
+              ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
+             ('close',),
+             ('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
+              ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
+            ],
+            vendor.calls)
+        self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol
+                         '\x00\x00\x00\x02de' # empty headers
+                         's\x00\x00\x00\tl5:helloe'
+                         'b\x00\x00\x00\x01a'
+                         'b\x00\x00\x00\x01b'
+                         'e',
+                         output.getvalue())
+
+    def test__send_request_stops_if_body_started(self):
+        # We intentionally use the python StringIO so that we can subclass it.
+        from StringIO import StringIO
+        response = StringIO()
+
+        class FailAfterFirstWrite(StringIO):
+            """Allow one 'write' call to pass, fail the rest"""
+            def __init__(self):
+                StringIO.__init__(self)
+                self._first = True
+
+            def write(self, s):
+                if self._first:
+                    self._first = False
+                    return StringIO.write(self, s)
+                raise IOError(errno.EINVAL, 'invalid file handle')
+        output = FailAfterFirstWrite()
+
+        vendor = FirstRejectedStringIOSSHVendor(response, output,
+                    fail_at_write=False)
+        ssh_params = medium.SSHParams('a host', 'a port', 'a user', 'a pass')
+        client_medium = medium.SmartSSHClientMedium('base', ssh_params, vendor)
+        smart_client = client._SmartClient(client_medium, headers={})
+        smart_request = client._SmartClientRequest(smart_client, 'hello', (),
+            body_stream=['a', 'b'])
+        self.assertRaises(errors.ConnectionReset, smart_request._send, 3)
+        # We connect, and manage to get to the point that we start consuming
+        # the body stream. The next write fails, so we just stop.
+        self.assertEqual(
+            [('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
+              ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
+             ('close',),
+            ],
+            vendor.calls)
+        self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol
+                         '\x00\x00\x00\x02de' # empty headers
+                         's\x00\x00\x00\tl5:helloe',
+                         output.getvalue())
+
+    def test__send_disabled_retry(self):
+        debug.debug_flags.add('noretry')
+        output, vendor, smart_client = self.make_client_with_failing_medium()
+        smart_request = client._SmartClientRequest(smart_client, 'hello', ())
+        self.assertRaises(errors.ConnectionReset, smart_request._send, 3)
+        self.assertEqual(
+            [('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
+              ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
+             ('close',),
+            ],
+            vendor.calls)
+
+
 class LengthPrefixedBodyDecoder(tests.TestCase):
 
     # XXX: TODO: make accept_reading_trailer invoke translate_response or
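
Note on the `portable_socket_pair` hunks in this file: the diff removes the method from `TestSmartServerStreamMedium` and switches every caller to a module-level function of the same name, but the new definition is not visible in this part of the diff. A minimal sketch of what such a function could look like, assuming it keeps the removed method's body unchanged and only drops `self`:

```python
import socket


def portable_socket_pair():
    """Return a pair of TCP sockets connected to each other.

    Unlike socket.socketpair, this should work on Windows.
    """
    # Listen on an ephemeral loopback port chosen by the OS.
    listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listen_sock.bind(('127.0.0.1', 0))
    listen_sock.listen(1)
    # Connect a client to that port, then accept the server end.
    client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client_sock.connect(listen_sock.getsockname())
    server_sock, addr = listen_sock.accept()
    # The listener is no longer needed once the pair is established.
    listen_sock.close()
    return server_sock, client_sock
```

Making it a plain function would let test classes other than `TestSmartServerStreamMedium` obtain a connected pair without inheriting from it; whether that is the motivation here is an assumption, not something the diff states.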
