Merge lp:~jameinel/bzr/2.1-all-reconnect-819604 into lp:bzr/2.1

Proposed by John A Meinel
Status: Rejected
Rejected by: John A Meinel
Proposed branch: lp:~jameinel/bzr/2.1-all-reconnect-819604
Merge into: lp:bzr/2.1
Diff against target: 1807 lines (+970/-289)
11 files modified
NEWS (+5/-0)
bzrlib/help_topics/en/debug-flags.txt (+2/-0)
bzrlib/osutils.py (+71/-13)
bzrlib/smart/client.py (+208/-86)
bzrlib/smart/medium.py (+69/-62)
bzrlib/smart/protocol.py (+15/-6)
bzrlib/smart/request.py (+142/-98)
bzrlib/tests/test_bundle.py (+6/-3)
bzrlib/tests/test_osutils.py (+39/-0)
bzrlib/tests/test_smart_request.py (+10/-0)
bzrlib/tests/test_smart_transport.py (+403/-21)
To merge this branch: bzr merge lp:~jameinel/bzr/2.1-all-reconnect-819604
Reviewer: Richard Wilbur (review status: Approve)
Review via email: mp+123901@code.launchpad.net

Commit message

Bring the code to have clients reconnect-on-disconnect to bzr-2.1 (bug #819604)

Description of the change

This is a rollup of all of my client-reconnect patches against bzr-2.1, which is the version shipped in our previous LTS, Lucid.

The patch ends up pretty big, as I had originally split it up into a few patches for review. However, most of this is identical to what we have in bzr-2.5.

The specific differences from 2.5 at this point are:

1) No ConnectionTimeout code. This was meant for the server side, not the client. So no need to bring that code in now.
2) No AlreadyConnected client medium. This was designed around communicating with the SSH subprocess using socketpair. However, I didn't want to bring in all of that code (and it was even a source of one of the bugs we had to fix in 2.5).
3) Fewer verbs. 2.5 added a number of verbs to bzrlib/smart/request.py, but all the requests that exist in 2.1 have identical signatures in this code.

This is where I targeted my original changes, so I went back to this level to finish the job.

I'm reasonably comfortable with this patch at this point. I wish it were a bit smaller for a stable series, but the code is quite well tested on newer versions.
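
The retry policy being ported can be sketched in a few lines. This is a minimal, self-contained Python 3 illustration, not the bzrlib code (which targets Python 2): ConnectionReset, FakeMedium, RETRY_SAFE_KINDS and call_with_one_retry are hypothetical names, and the request-kind labels are borrowed from the _is_safe_to_send_twice check in the diff. It models only the read-phase policy: reset the medium, retry at most once, and only when the request is safe to repeat.

```python
class ConnectionReset(Exception):
    """Stand-in for bzrlib.errors.ConnectionReset (hypothetical here)."""


# Request kinds assumed safe to send a second time.
RETRY_SAFE_KINDS = frozenset(['read', 'idem', 'semi'])


class FakeMedium(object):
    """Hypothetical medium whose first `failures` sends are reset."""

    def __init__(self, failures):
        self.failures = failures
        self.resets = 0
        self.sent = []

    def reset(self):
        # Drop the dead connection so the next send would reconnect.
        self.resets += 1

    def send_and_read(self, method):
        self.sent.append(method)
        if self.failures > 0:
            self.failures -= 1
            raise ConnectionReset(method)
        return ('ok', method)


def call_with_one_retry(medium, method, kind, body_stream=None,
                        noretry=False):
    """Send a request, retrying once if the connection is reset."""
    try:
        return medium.send_and_read(method)
    except ConnectionReset:
        medium.reset()
        safe = (body_stream is None and not noretry
                and kind in RETRY_SAFE_KINDS)
        if not safe:
            raise
        # Only a single retry: a second reset propagates to the caller.
        return medium.send_and_read(method)
```

Passing noretry=True plays the role of the -Dnoretry debug flag: the first reset is re-raised instead of being retried.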

Richard Wilbur (richard-wilbur) wrote :

Looks like a good collection of updates for the LTS 2.1! Thanks for porting the goodness back from 2.5 and fixing the test that broke.
+1

A question regarding the comment in bzrlib/smart/protocol.py:_encode_tuple():
Is the Unicode response issue really an aberrant case? If filesystem names can appear in the "args" list of _encode_tuple, it seems that having Unicode entries would be a matter of course--especially in locales other than "C".

review: Approve
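
To make the question concrete, here is a small Python 3 sketch of one way an argument tuple containing text could be turned into wire bytes. It is not the body of _encode_tuple, which is not reproduced in this discussion; the helper name encode_args, the \x01 separator, the newline terminator and the choice of UTF-8 are all assumptions made for illustration.

```python
def encode_args(args):
    """Join request arguments into one bytestring (hypothetical helper)."""
    encoded = []
    for arg in args:
        if isinstance(arg, str):
            # Text (for example a filesystem name) is encoded before it
            # reaches the wire; UTF-8 is an assumption, not bzr's rule.
            arg = arg.encode('utf-8')
        encoded.append(arg)
    return b'\x01'.join(encoded) + b'\n'
```

Under these assumptions a non-ASCII name is simply another argument that needs encoding, which is the point the question raises.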
John A Meinel (jameinel) wrote :

sent to pqm by email

John A Meinel (jameinel) wrote :

This has sat for so long that I don't think its value has held. It doesn't land directly (some test suite failures, etc.), and the patch is much bigger than the ones for later versions of bzr because of the internal restructuring that has happened since 2.1. So I'm just rejecting this to get it out of the queue.

Preview Diff

=== modified file 'NEWS'
--- NEWS 2012-02-02 14:08:45 +0000
+++ NEWS 2012-09-12 09:29:20 +0000
@@ -43,6 +43,11 @@
 
   (John Arbash Meinel, #609187, #812928)
 
+* Teach the bzr client how to reconnect if we get ``ConnectionReset``
+  while making an RPC request. This doesn't handle all possible network
+  disconnects, but it should at least handle when the server is asked to
+  shutdown gracefully. (John Arbash Meinel, #819604)
+
 
 Improvements
 ************

=== modified file 'bzrlib/help_topics/en/debug-flags.txt'
--- bzrlib/help_topics/en/debug-flags.txt 2010-01-05 04:30:07 +0000
+++ bzrlib/help_topics/en/debug-flags.txt 2012-09-12 09:29:20 +0000
@@ -24,6 +24,8 @@
 -Dindex           Trace major index operations.
 -Dknit            Trace knit operations.
 -Dlock            Trace when lockdir locks are taken or released.
+-Dnoretry         If a connection is reset, fail immediately rather than
+                  retrying the request.
 -Dprogress        Trace progress bar operations.
 -Dmerge           Emit information for debugging merges.
 -Dno_apport       Don't use apport to report crashes.

=== modified file 'bzrlib/osutils.py'
--- bzrlib/osutils.py 2010-05-27 04:00:01 +0000
+++ bzrlib/osutils.py 2012-09-12 09:29:20 +0000
@@ -40,6 +40,7 @@
     rmtree,
     )
 import signal
+import socket
 import subprocess
 import tempfile
 from tempfile import (
@@ -1929,40 +1930,97 @@
     return socket.gethostname().decode(get_user_encoding())
 
 
-def recv_all(socket, bytes):
+# We must not read/write any more than 64k at a time from/to a socket so we
+# don't risk "no buffer space available" errors on some platforms. Windows in
+# particular is likely to throw WSAECONNABORTED or WSAENOBUFS if given too much
+# data at once.
+MAX_SOCKET_CHUNK = 64 * 1024
+
+_end_of_stream_errors = [errno.ECONNRESET, errno.EPIPE, errno.EINVAL]
+for _eno in ['WSAECONNRESET', 'WSAECONNABORTED']:
+    _eno = getattr(errno, _eno, None)
+    if _eno is not None:
+        _end_of_stream_errors.append(_eno)
+del _eno
+
+
+def read_bytes_from_socket(sock, report_activity=None,
+        max_read_size=MAX_SOCKET_CHUNK):
+    """Read up to max_read_size of bytes from sock and notify of progress.
+
+    Translates "Connection reset by peer" into file-like EOF (return an
+    empty string rather than raise an error), and repeats the recv if
+    interrupted by a signal.
+    """
+    while 1:
+        try:
+            bytes = sock.recv(max_read_size)
+        except socket.error, e:
+            eno = e.args[0]
+            if eno in _end_of_stream_errors:
+                # The connection was closed by the other side. Callers expect
+                # an empty string to signal end-of-stream.
+                return ""
+            elif eno == errno.EINTR:
+                # Retry the interrupted recv.
+                continue
+            raise
+        else:
+            if report_activity is not None:
+                report_activity(len(bytes), 'read')
+            return bytes
+
+
+def recv_all(socket, count):
     """Receive an exact number of bytes.
 
     Regular Socket.recv() may return less than the requested number of bytes,
-    dependning on what's in the OS buffer. MSG_WAITALL is not available
+    depending on what's in the OS buffer. MSG_WAITALL is not available
     on all platforms, but this should work everywhere. This will return
     less than the requested amount if the remote end closes.
 
     This isn't optimized and is intended mostly for use in testing.
     """
     b = ''
-    while len(b) < bytes:
-        new = until_no_eintr(socket.recv, bytes - len(b))
+    while len(b) < count:
+        new = read_bytes_from_socket(socket, None, count - len(b))
         if new == '':
             break # eof
         b += new
     return b
 
 
-def send_all(socket, bytes, report_activity=None):
+def send_all(sock, bytes, report_activity=None):
     """Send all bytes on a socket.
 
-    Regular socket.sendall() can give socket error 10053 on Windows. This
-    implementation sends no more than 64k at a time, which avoids this problem.
+    Breaks large blocks in smaller chunks to avoid buffering limitations on
+    some platforms, and catches EINTR which may be thrown if the send is
+    interrupted by a signal.
+
+    This is preferred to socket.sendall(), because it avoids portability bugs
+    and provides activity reporting.
 
     :param report_activity: Call this as bytes are read, see
         Transport._report_activity
     """
-    chunk_size = 2**16
-    for pos in xrange(0, len(bytes), chunk_size):
-        block = bytes[pos:pos+chunk_size]
-        if report_activity is not None:
-            report_activity(len(block), 'write')
-        until_no_eintr(socket.sendall, block)
+    sent_total = 0
+    byte_count = len(bytes)
+    while sent_total < byte_count:
+        try:
+            sent = sock.send(buffer(bytes, sent_total, MAX_SOCKET_CHUNK))
+        except (socket.error, IOError), e:
+            if e.args[0] in _end_of_stream_errors:
+                raise errors.ConnectionReset(
+                    "Error trying to write to socket", e)
+            if e.args[0] != errno.EINTR:
+                raise
+        else:
+            if sent == 0:
+                raise errors.ConnectionReset('Sending to %s returned 0 bytes'
+                                             % (sock,))
+            sent_total += sent
+            if report_activity is not None:
+                report_activity(sent, 'write')
 
 
 def dereference_path(path):

=== modified file 'bzrlib/smart/client.py'
--- bzrlib/smart/client.py 2010-02-17 17:11:16 +0000
+++ bzrlib/smart/client.py 2012-09-12 09:29:20 +0000
@@ -14,12 +14,18 @@
 # along with this program; if not, write to the Free Software
 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
+from bzrlib import lazy_import
+lazy_import.lazy_import(globals(), """
+from bzrlib.smart import request as _mod_request
+""")
+
 import bzrlib
 from bzrlib.smart import message, protocol
-from bzrlib.trace import warning
 from bzrlib import (
+    debug,
     errors,
     hooks,
+    trace,
     )
 
 
@@ -39,93 +45,12 @@
     def __repr__(self):
         return '%s(%r)' % (self.__class__.__name__, self._medium)
 
-    def _send_request(self, protocol_version, method, args, body=None,
-                      readv_body=None, body_stream=None):
-        encoder, response_handler = self._construct_protocol(
-            protocol_version)
-        encoder.set_headers(self._headers)
-        if body is not None:
-            if readv_body is not None:
-                raise AssertionError(
-                    "body and readv_body are mutually exclusive.")
-            if body_stream is not None:
-                raise AssertionError(
-                    "body and body_stream are mutually exclusive.")
-            encoder.call_with_body_bytes((method, ) + args, body)
-        elif readv_body is not None:
-            if body_stream is not None:
-                raise AssertionError(
-                    "readv_body and body_stream are mutually exclusive.")
-            encoder.call_with_body_readv_array((method, ) + args, readv_body)
-        elif body_stream is not None:
-            encoder.call_with_body_stream((method, ) + args, body_stream)
-        else:
-            encoder.call(method, *args)
-        return response_handler
-
-    def _run_call_hooks(self, method, args, body, readv_body):
-        if not _SmartClient.hooks['call']:
-            return
-        params = CallHookParams(method, args, body, readv_body, self._medium)
-        for hook in _SmartClient.hooks['call']:
-            hook(params)
-
     def _call_and_read_response(self, method, args, body=None, readv_body=None,
             body_stream=None, expect_response_body=True):
-        self._run_call_hooks(method, args, body, readv_body)
-        if self._medium._protocol_version is not None:
-            response_handler = self._send_request(
-                self._medium._protocol_version, method, args, body=body,
-                readv_body=readv_body, body_stream=body_stream)
-            return (response_handler.read_response_tuple(
-                        expect_body=expect_response_body),
-                    response_handler)
-        else:
-            for protocol_version in [3, 2]:
-                if protocol_version == 2:
-                    # If v3 doesn't work, the remote side is older than 1.6.
-                    self._medium._remember_remote_is_before((1, 6))
-                response_handler = self._send_request(
-                    protocol_version, method, args, body=body,
-                    readv_body=readv_body, body_stream=body_stream)
-                try:
-                    response_tuple = response_handler.read_response_tuple(
-                        expect_body=expect_response_body)
-                except errors.UnexpectedProtocolVersionMarker, err:
-                    # TODO: We could recover from this without disconnecting if
-                    # we recognise the protocol version.
-                    warning(
-                        'Server does not understand Bazaar network protocol %d,'
-                        ' reconnecting. (Upgrade the server to avoid this.)'
-                        % (protocol_version,))
-                    self._medium.disconnect()
-                    continue
-                except errors.ErrorFromSmartServer:
-                    # If we received an error reply from the server, then it
-                    # must be ok with this protocol version.
-                    self._medium._protocol_version = protocol_version
-                    raise
-                else:
-                    self._medium._protocol_version = protocol_version
-                    return response_tuple, response_handler
-            raise errors.SmartProtocolError(
-                'Server is not a Bazaar server: ' + str(err))
-
-    def _construct_protocol(self, version):
-        request = self._medium.get_request()
-        if version == 3:
-            request_encoder = protocol.ProtocolThreeRequester(request)
-            response_handler = message.ConventionalResponseHandler()
-            response_proto = protocol.ProtocolThreeDecoder(
-                response_handler, expect_version_marker=True)
-            response_handler.setProtoAndMediumRequest(response_proto, request)
-        elif version == 2:
-            request_encoder = protocol.SmartClientRequestProtocolTwo(request)
-            response_handler = request_encoder
-        else:
-            request_encoder = protocol.SmartClientRequestProtocolOne(request)
-            response_handler = request_encoder
-        return request_encoder, response_handler
+        request = _SmartClientRequest(self, method, args, body=body,
+            readv_body=readv_body, body_stream=body_stream,
+            expect_response_body=expect_response_body)
+        return request.call_and_read_response()
 
     def call(self, method, *args):
         """Call a method on the remote server."""
@@ -191,6 +116,203 @@
         return self._medium.remote_path_from_transport(transport)
 
 
+class _SmartClientRequest(object):
+    """Encapsulate the logic for a single request.
+
+    This class handles things like reconnecting and sending the request a
+    second time when the connection is reset in the middle. It also handles the
+    multiple requests that get made if we don't know what protocol the server
+    supports yet.
+
+    Generally, you build up one of these objects, passing in the arguments that
+    you want to send to the server, and then use 'call_and_read_response' to
+    get the response from the server.
+    """
+
+    def __init__(self, client, method, args, body=None, readv_body=None,
+                 body_stream=None, expect_response_body=True):
+        self.client = client
+        self.method = method
+        self.args = args
+        self.body = body
+        self.readv_body = readv_body
+        self.body_stream = body_stream
+        self.expect_response_body = expect_response_body
+
+    def call_and_read_response(self):
+        """Send the request to the server, and read the initial response.
+
+        This doesn't read all of the body content of the response, instead it
+        returns (response_tuple, response_handler). response_tuple is the 'ok',
+        or 'error' information, and 'response_handler' can be used to get the
+        content stream out.
+        """
+        self._run_call_hooks()
+        protocol_version = self.client._medium._protocol_version
+        if protocol_version is None:
+            return self._call_determining_protocol_version()
+        else:
+            return self._call(protocol_version)
+
+    def _is_safe_to_send_twice(self):
+        """Check if the current method is re-entrant safe."""
+        if self.body_stream is not None or 'noretry' in debug.debug_flags:
+            # We can't restart a body stream that has already been consumed.
+            return False
+        request_type = _mod_request.request_handlers.get_info(self.method)
+        if request_type in ('read', 'idem', 'semi'):
+            return True
+        # If we have gotten this far, 'stream' cannot be retried, because we
+        # already consumed the local stream.
+        if request_type in ('semivfs', 'mutate', 'stream'):
+            return False
+        trace.mutter('Unknown request type: %s for method %s'
+                     % (request_type, self.method))
+        return False
+
+    def _run_call_hooks(self):
+        if not _SmartClient.hooks['call']:
+            return
+        params = CallHookParams(self.method, self.args, self.body,
+                                self.readv_body, self.client._medium)
+        for hook in _SmartClient.hooks['call']:
+            hook(params)
+
+    def _call(self, protocol_version):
+        """We know the protocol version.
+
+        So this just sends the request, and then reads the response. This is
+        where the code will be to retry requests if the connection is closed.
+        """
+        response_handler = self._send(protocol_version)
+        try:
+            response_tuple = response_handler.read_response_tuple(
+                expect_body=self.expect_response_body)
+        except errors.ConnectionReset, e:
+            self.client._medium.reset()
+            if not self._is_safe_to_send_twice():
+                raise
+            trace.warning('ConnectionReset reading response for %r, retrying'
+                          % (self.method,))
+            trace.log_exception_quietly()
+            encoder, response_handler = self._construct_protocol(
+                protocol_version)
+            self._send_no_retry(encoder)
+            response_tuple = response_handler.read_response_tuple(
+                expect_body=self.expect_response_body)
+        return (response_tuple, response_handler)
+
+    def _call_determining_protocol_version(self):
+        """Determine what protocol the remote server supports.
+
+        We do this by placing a request in the most recent protocol, and
+        handling the UnexpectedProtocolVersionMarker from the server.
+        """
+        for protocol_version in [3, 2]:
+            if protocol_version == 2:
+                # If v3 doesn't work, the remote side is older than 1.6.
+                self.client._medium._remember_remote_is_before((1, 6))
+            try:
+                response_tuple, response_handler = self._call(protocol_version)
+            except errors.UnexpectedProtocolVersionMarker, err:
+                # TODO: We could recover from this without disconnecting if
+                # we recognise the protocol version.
+                trace.warning(
+                    'Server does not understand Bazaar network protocol %d,'
+                    ' reconnecting. (Upgrade the server to avoid this.)'
+                    % (protocol_version,))
+                self.client._medium.disconnect()
+                continue
+            except errors.ErrorFromSmartServer:
+                # If we received an error reply from the server, then it
+                # must be ok with this protocol version.
+                self.client._medium._protocol_version = protocol_version
+                raise
+            else:
+                self.client._medium._protocol_version = protocol_version
+                return response_tuple, response_handler
+        raise errors.SmartProtocolError(
+            'Server is not a Bazaar server: ' + str(err))
+
+    def _construct_protocol(self, version):
+        """Build the encoding stack for a given protocol version."""
+        request = self.client._medium.get_request()
+        if version == 3:
+            request_encoder = protocol.ProtocolThreeRequester(request)
+            response_handler = message.ConventionalResponseHandler()
+            response_proto = protocol.ProtocolThreeDecoder(
+                response_handler, expect_version_marker=True)
+            response_handler.setProtoAndMediumRequest(response_proto, request)
+        elif version == 2:
+            request_encoder = protocol.SmartClientRequestProtocolTwo(request)
+            response_handler = request_encoder
+        else:
+            request_encoder = protocol.SmartClientRequestProtocolOne(request)
+            response_handler = request_encoder
+        return request_encoder, response_handler
+
+    def _send(self, protocol_version):
+        """Encode the request, and send it to the server.
+
+        This will retry a request if we get a ConnectionReset while sending the
+        request to the server. (Unless we have a body_stream that we have
+        already started consuming, since we can't restart body_streams)
+
+        :return: response_handler as defined by _construct_protocol
+        """
+        encoder, response_handler = self._construct_protocol(protocol_version)
+        try:
+            self._send_no_retry(encoder)
+        except errors.ConnectionReset, e:
+            # If we fail during the _send_no_retry phase, then we can
+            # be confident that the server did not get our request, because we
+            # haven't started waiting for the reply yet. So try the request
+            # again. We only issue a single retry, because if the connection
+            # really is down, there is no reason to loop endlessly.
+
+            # Connection is dead, so close our end of it.
+            self.client._medium.reset()
+            if (('noretry' in debug.debug_flags)
+                or (self.body_stream is not None
+                    and encoder.body_stream_started)):
+                # We can't restart a body_stream that has been partially
+                # consumed, so we don't retry.
+                # Note: We don't have to worry about
+                # SmartClientRequestProtocolOne or Two, because they don't
+                # support client-side body streams.
+                raise
+            trace.warning('ConnectionReset calling %r, retrying'
+                          % (self.method,))
+            trace.log_exception_quietly()
+            encoder, response_handler = self._construct_protocol(
+                protocol_version)
+            self._send_no_retry(encoder)
+        return response_handler
+
+    def _send_no_retry(self, encoder):
+        """Just encode the request and try to send it."""
+        encoder.set_headers(self.client._headers)
+        if self.body is not None:
+            if self.readv_body is not None:
+                raise AssertionError(
+                    "body and readv_body are mutually exclusive.")
+            if self.body_stream is not None:
+                raise AssertionError(
+                    "body and body_stream are mutually exclusive.")
+            encoder.call_with_body_bytes((self.method, ) + self.args, self.body)
+        elif self.readv_body is not None:
+            if self.body_stream is not None:
+                raise AssertionError(
+                    "readv_body and body_stream are mutually exclusive.")
+            encoder.call_with_body_readv_array((self.method, ) + self.args,
+                                               self.readv_body)
+        elif self.body_stream is not None:
+            encoder.call_with_body_stream((self.method, ) + self.args,
+                                          self.body_stream)
+        else:
+            encoder.call(self.method, *self.args)
+
+
 class SmartClientHooks(hooks.Hooks):
 
     def __init__(self):

=== modified file 'bzrlib/smart/medium.py'
--- bzrlib/smart/medium.py 2010-04-27 06:40:37 +0000
+++ bzrlib/smart/medium.py 2012-09-12 09:29:20 +0000
@@ -1,4 +1,4 @@
1# Copyright (C) 2006-2010 Canonical Ltd1# Copyright (C) 2006-2012 Canonical Ltd
2#2#
3# This program is free software; you can redistribute it and/or modify3# This program is free software; you can redistribute it and/or modify
4# it under the terms of the GNU General Public License as published by4# it under the terms of the GNU General Public License as published by
@@ -50,11 +50,11 @@
50#usually already imported, and getting IllegalScoperReplacer on it here.50#usually already imported, and getting IllegalScoperReplacer on it here.
51from bzrlib import osutils51from bzrlib import osutils
5252
53# We must not read any more than 64k at a time so we don't risk "no buffer53# Throughout this module buffer size parameters are either limited to be at
54# space available" errors on some platforms. Windows in particular is likely54# most _MAX_READ_SIZE, or are ignored and _MAX_READ_SIZE is used instead.
55# to give error 10053 or 10055 if we read more than 64k from a socket.55# For this module's purposes, MAX_SOCKET_CHUNK is a reasonable size for reads
56_MAX_READ_SIZE = 64 * 102456# from non-sockets as well.
5757_MAX_READ_SIZE = osutils.MAX_SOCKET_CHUNK
5858
59def _get_protocol_factory_for_bytes(bytes):59def _get_protocol_factory_for_bytes(bytes):
60 """Determine the right protocol factory for 'bytes'.60 """Determine the right protocol factory for 'bytes'.
@@ -178,6 +178,14 @@
178 ui.ui_factory.report_transport_activity(self, bytes, direction)178 ui.ui_factory.report_transport_activity(self, bytes, direction)
179179
180180
181_bad_file_descriptor = (errno.EBADF,)
182if sys.platform == 'win32':
183 # Given on Windows if you pass a closed socket to select.select. Probably
184 # also given if you pass a file handle to select.
185 WSAENOTSOCK = 10038
186 _bad_file_descriptor += (WSAENOTSOCK,)
187
188
181class SmartServerStreamMedium(SmartMedium):189class SmartServerStreamMedium(SmartMedium):
182 """Handles smart commands coming over a stream.190 """Handles smart commands coming over a stream.
183191
@@ -241,6 +249,8 @@
241249
242 :param protocol: a SmartServerRequestProtocol.250 :param protocol: a SmartServerRequestProtocol.
243 """251 """
252 if protocol is None:
253 return
244 try:254 try:
245 self._serve_one_request_unguarded(protocol)255 self._serve_one_request_unguarded(protocol)
246 except KeyboardInterrupt:256 except KeyboardInterrupt:
@@ -276,9 +286,9 @@
276 def _serve_one_request_unguarded(self, protocol):286 def _serve_one_request_unguarded(self, protocol):
277 while protocol.next_read_size():287 while protocol.next_read_size():
278 # We can safely try to read large chunks. If there is less data288 # We can safely try to read large chunks. If there is less data
279 # than _MAX_READ_SIZE ready, the socket wil just return a short289 # than MAX_SOCKET_CHUNK ready, the socket will just return a
280 # read immediately rather than block.290 # short read immediately rather than block.
281 bytes = self.read_bytes(_MAX_READ_SIZE)291 bytes = self.read_bytes(osutils.MAX_SOCKET_CHUNK)
282 if bytes == '':292 if bytes == '':
283 self.finished = True293 self.finished = True
284 return294 return
@@ -287,13 +297,13 @@
287 self._push_back(protocol.unused_data)297 self._push_back(protocol.unused_data)
288298
289 def _read_bytes(self, desired_count):299 def _read_bytes(self, desired_count):
290 return _read_bytes_from_socket(300 return osutils.read_bytes_from_socket(
291 self.socket.recv, desired_count, self._report_activity)301 self.socket, self._report_activity)
292302
293 def terminate_due_to_error(self):303 def terminate_due_to_error(self):
294 # TODO: This should log to a server log file, but no such thing304 # TODO: This should log to a server log file, but no such thing
295 # exists yet. Andrew Bennetts 2006-09-29.305 # exists yet. Andrew Bennetts 2006-09-29.
296 osutils.until_no_eintr(self.socket.close)306 self.socket.close()
297 self.finished = True307 self.finished = True
298308
299 def _write_out(self, bytes):309 def _write_out(self, bytes):
@@ -345,16 +355,16 @@
345 protocol.accept_bytes(bytes)355 protocol.accept_bytes(bytes)
346356
347 def _read_bytes(self, desired_count):357 def _read_bytes(self, desired_count):
348 return osutils.until_no_eintr(self._in.read, desired_count)358 return self._in.read(desired_count)
349359
350 def terminate_due_to_error(self):360 def terminate_due_to_error(self):
351 # TODO: This should log to a server log file, but no such thing361 # TODO: This should log to a server log file, but no such thing
352 # exists yet. Andrew Bennetts 2006-09-29.362 # exists yet. Andrew Bennetts 2006-09-29.
353 osutils.until_no_eintr(self._out.close)363 self._out.close()
354 self.finished = True364 self.finished = True
355365
356 def _write_out(self, bytes):366 def _write_out(self, bytes):
357 osutils.until_no_eintr(self._out.write, bytes)367 self._out.write(bytes)
358368
359369
360class SmartClientMediumRequest(object):370class SmartClientMediumRequest(object):
@@ -712,6 +722,14 @@
712 """722 """
713 return SmartClientStreamMediumRequest(self)723 return SmartClientStreamMediumRequest(self)
714724
725 def reset(self):
726 """We have been disconnected, reset current state.
727
728 This resets things like _current_request and connected state.
729 """
730 self.disconnect()
731 self._current_request = None
732
715733
716class SmartSimplePipesClientMedium(SmartClientStreamMedium):734class SmartSimplePipesClientMedium(SmartClientStreamMedium):
717 """A client medium using simple pipes.735 """A client medium using simple pipes.
@@ -726,22 +744,35 @@
726744
727 def _accept_bytes(self, bytes):745 def _accept_bytes(self, bytes):
728 """See SmartClientStreamMedium.accept_bytes."""746 """See SmartClientStreamMedium.accept_bytes."""
729 osutils.until_no_eintr(self._writeable_pipe.write, bytes)747 try:
748 self._writeable_pipe.write(bytes)
749 except IOError, e:
750 if e.errno in (errno.EINVAL, errno.EPIPE):
751 raise errors.ConnectionReset(
752 "Error trying to write to subprocess", e)
753 raise
730 self._report_activity(len(bytes), 'write')754 self._report_activity(len(bytes), 'write')
731755
732 def _flush(self):756 def _flush(self):
733 """See SmartClientStreamMedium._flush()."""757 """See SmartClientStreamMedium._flush()."""
734 osutils.until_no_eintr(self._writeable_pipe.flush)758 # Note: If flush were to fail, we'd like to raise ConnectionReset, etc.
759 # However, testing shows that even when the child process is
760 # gone, this doesn't error.
761 self._writeable_pipe.flush()
735762
736 def _read_bytes(self, count):763 def _read_bytes(self, count):
737 """See SmartClientStreamMedium._read_bytes."""764 """See SmartClientStreamMedium._read_bytes."""
738 bytes = osutils.until_no_eintr(self._readable_pipe.read, count)765 bytes_to_read = min(count, _MAX_READ_SIZE)
766 bytes = self._readable_pipe.read(bytes_to_read)
739 self._report_activity(len(bytes), 'read')767 self._report_activity(len(bytes), 'read')
740 return bytes768 return bytes
741769
742770
743class SmartSSHClientMedium(SmartClientStreamMedium):771class SmartSSHClientMedium(SmartClientStreamMedium):
744 """A client medium using SSH."""772 """A client medium using SSH.
773
774 It delegates IO to a SmartSimplePipesClientMedium.
775 """
745776
746 def __init__(self, host, port=None, username=None, password=None,777 def __init__(self, host, port=None, username=None, password=None,
747 base=None, vendor=None, bzr_remote_path=None):778 base=None, vendor=None, bzr_remote_path=None):
@@ -750,11 +781,11 @@
750 :param vendor: An optional override for the ssh vendor to use. See781 :param vendor: An optional override for the ssh vendor to use. See
751 bzrlib.transport.ssh for details on ssh vendors.782 bzrlib.transport.ssh for details on ssh vendors.
752 """783 """
753 self._connected = False
754 self._host = host784 self._host = host
755 self._password = password785 self._password = password
756 self._port = port786 self._port = port
757 self._username = username787 self._username = username
788 self._real_medium = None
758 # for the benefit of progress making a short description of this789 # for the benefit of progress making a short description of this
759 # transport790 # transport
760 self._scheme = 'bzr+ssh'791 self._scheme = 'bzr+ssh'
@@ -762,11 +793,9 @@
762 # _DebugCounter so we have to store all the values used in our repr793 # _DebugCounter so we have to store all the values used in our repr
763 # method before calling the super init.794 # method before calling the super init.
764 SmartClientStreamMedium.__init__(self, base)795 SmartClientStreamMedium.__init__(self, base)
765 self._read_from = None
766 self._ssh_connection = None
767 self._vendor = vendor796 self._vendor = vendor
768 self._write_to = None
769 self._bzr_remote_path = bzr_remote_path797 self._bzr_remote_path = bzr_remote_path
798 self._ssh_connection = None
770799
771 def __repr__(self):800 def __repr__(self):
772 if self._port is None:801 if self._port is None:
@@ -783,21 +812,20 @@
     def _accept_bytes(self, bytes):
         """See SmartClientStreamMedium.accept_bytes."""
         self._ensure_connection()
-        osutils.until_no_eintr(self._write_to.write, bytes)
-        self._report_activity(len(bytes), 'write')
+        self._real_medium.accept_bytes(bytes)
 
     def disconnect(self):
         """See SmartClientMedium.disconnect()."""
-        if not self._connected:
-            return
-        osutils.until_no_eintr(self._read_from.close)
-        osutils.until_no_eintr(self._write_to.close)
-        self._ssh_connection.close()
-        self._connected = False
+        if self._real_medium is not None:
+            self._real_medium.disconnect()
+            self._real_medium = None
+        if self._ssh_connection is not None:
+            self._ssh_connection.close()
+            self._ssh_connection = None
 
     def _ensure_connection(self):
         """Connect this medium if not already connected."""
-        if self._connected:
+        if self._real_medium is not None:
             return
         if self._vendor is None:
             vendor = ssh._get_ssh_vendor()
@@ -807,22 +835,19 @@
                 self._password, self._host, self._port,
                 command=[self._bzr_remote_path, 'serve', '--inet',
                          '--directory=/', '--allow-writes'])
-        self._read_from, self._write_to = \
-            self._ssh_connection.get_filelike_channels()
-        self._connected = True
+        read_from, write_to = self._ssh_connection.get_filelike_channels()
+        self._real_medium = SmartSimplePipesClientMedium(
+            read_from, write_to, self.base)
 
     def _flush(self):
         """See SmartClientStreamMedium._flush()."""
-        self._write_to.flush()
+        self._real_medium._flush()
 
     def _read_bytes(self, count):
         """See SmartClientStreamMedium.read_bytes."""
-        if not self._connected:
+        if self._real_medium is None:
             raise errors.MediumNotConnected(self)
-        bytes_to_read = min(count, _MAX_READ_SIZE)
-        bytes = osutils.until_no_eintr(self._read_from.read, bytes_to_read)
-        self._report_activity(len(bytes), 'read')
-        return bytes
+        return self._real_medium.read_bytes(count)
 
 
 # Port 4155 is the default port for bzr://, registered with IANA.
@@ -850,7 +875,7 @@
         """See SmartClientMedium.disconnect()."""
         if not self._connected:
             return
-        osutils.until_no_eintr(self._socket.close)
+        self._socket.close()
         self._socket = None
         self._connected = False
 
@@ -904,8 +929,8 @@
         """See SmartClientMedium.read_bytes."""
         if not self._connected:
             raise errors.MediumNotConnected(self)
-        return _read_bytes_from_socket(
-            self._socket.recv, count, self._report_activity)
+        return osutils.read_bytes_from_socket(
+            self._socket, self._report_activity)
 
 
 class SmartClientStreamMediumRequest(SmartClientMediumRequest):
@@ -946,21 +971,3 @@
         This invokes self._medium._flush to ensure all bytes are transmitted.
         """
         self._medium._flush()
-
-
-def _read_bytes_from_socket(sock, desired_count, report_activity):
-    # We ignore the desired_count because on sockets it's more efficient to
-    # read large chunks (of _MAX_READ_SIZE bytes) at a time.
-    try:
-        bytes = osutils.until_no_eintr(sock, _MAX_READ_SIZE)
-    except socket.error, e:
-        if len(e.args) and e.args[0] in (errno.ECONNRESET, 10054):
-            # The connection was closed by the other side. Callers expect an
-            # empty string to signal end-of-stream.
-            bytes = ''
-        else:
-            raise
-    else:
-        report_activity(len(bytes), 'read')
-    return bytes
-
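
Reviewer note on the medium.py change: the SSH medium no longer keeps its own file handles or a _connected flag. It builds an inner pipe medium when it connects and drops it on disconnect, so "is connected" becomes "has an inner medium". A minimal sketch of that delegation shape, in Python 3 and independent of bzrlib; PipeMedium, DelegatingMedium, MediumNotConnected and make_channels are hypothetical stand-ins, not bzrlib names:

```python
import io


class MediumNotConnected(Exception):
    """Hypothetical stand-in for errors.MediumNotConnected."""


class PipeMedium(object):
    """Hypothetical stand-in for SmartSimplePipesClientMedium."""

    def __init__(self, read_from, write_to):
        self._read_from = read_from
        self._write_to = write_to

    def accept_bytes(self, data):
        self._write_to.write(data)

    def read_bytes(self, count):
        return self._read_from.read(count)

    def disconnect(self):
        self._read_from.close()
        self._write_to.close()


class DelegatingMedium(object):
    """Owns a connection factory; all IO goes through an inner medium."""

    def __init__(self, connect):
        # connect() is assumed to return a (read_from, write_to) pair.
        self._connect = connect
        self._real_medium = None

    def _ensure_connection(self):
        if self._real_medium is not None:
            return
        read_from, write_to = self._connect()
        self._real_medium = PipeMedium(read_from, write_to)

    def accept_bytes(self, data):
        self._ensure_connection()
        self._real_medium.accept_bytes(data)

    def read_bytes(self, count):
        if self._real_medium is None:
            raise MediumNotConnected(self)
        return self._real_medium.read_bytes(count)

    def disconnect(self):
        if self._real_medium is not None:
            self._real_medium.disconnect()
            self._real_medium = None


def make_channels():
    # Hypothetical connection factory: canned reply in, scratch buffer out.
    return io.BytesIO(b'ok\n'), io.BytesIO()


medium = DelegatingMedium(make_channels)
medium.accept_bytes(b'hello\n')      # connects lazily
first_reply = medium.read_bytes(3)
medium.disconnect()                  # drops the inner medium
medium.accept_bytes(b'hello\n')      # reconnects with fresh channels
second_reply = medium.read_bytes(3)
```

Because the outer object holds no per-connection state apart from the inner medium, reconnecting after a reset only needs disconnect() followed by another write.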
=== modified file 'bzrlib/smart/protocol.py'
--- bzrlib/smart/protocol.py 2010-02-17 17:11:16 +0000
+++ bzrlib/smart/protocol.py 2012-09-12 09:29:20 +0000
@@ -62,7 +62,13 @@
 
 def _encode_tuple(args):
     """Encode the tuple args to a bytestream."""
-    return '\x01'.join(args) + '\n'
+    joined = '\x01'.join(args) + '\n'
+    if type(joined) is unicode:
+        # XXX: We should fix things so this never happens! -AJB, 20100304
+        mutter('response args contain unicode, should be only bytes: %r',
+               joined)
+        joined = joined.encode('ascii')
+    return joined
 
 
 class Requester(object):
68class Requester(object):74class Requester(object):
@@ -648,7 +654,7 @@
         """Make a remote call with a readv array.
 
         The body is encoded with one line per readv offset pair. The numbers in
-        each pair are separated by a comma, and no trailing \n is emitted.
+        each pair are separated by a comma, and no trailing \\n is emitted.
         """
         if 'hpss' in debug.debug_flags:
             mutter('hpss call w/readv: %s', repr(args)[1:-1])
@@ -1075,9 +1081,6 @@
         self._real_write_func = write_func
 
     def _write_func(self, bytes):
-        # TODO: It is probably more appropriate to use sum(map(len, _buf))
-        # for total number of bytes to write, rather than buffer based on
-        # the number of write() calls
         # TODO: Another possibility would be to turn this into an async model.
         # Where we let another thread know that we have some bytes if
         # they want it, but we don't actually block for it
@@ -1225,6 +1228,7 @@
                     if first_chunk is None:
                         first_chunk = chunk
                     self._write_prefixed_body(chunk)
+                    self.flush()
                     if 'hpssdetail' in debug.debug_flags:
                         # Not worth timing separately, as _write_func is
                         # actually buffered
@@ -1285,6 +1289,7 @@
         _ProtocolThreeEncoder.__init__(self, medium_request.accept_bytes)
         self._medium_request = medium_request
         self._headers = {}
+        self.body_stream_started = None
 
     def set_headers(self, headers):
         self._headers = headers.copy()
@@ -1325,7 +1330,7 @@
         """Make a remote call with a readv array.
 
         The body is encoded with one line per readv offset pair. The numbers in
-        each pair are separated by a comma, and no trailing \n is emitted.
+        each pair are separated by a comma, and no trailing \\n is emitted.
         """
         if 'hpss' in debug.debug_flags:
             mutter('hpss call w/readv: %s', repr(args)[1:-1])
@@ -1350,6 +1355,7 @@
             if path is not None:
                 mutter(' (to %s)', path)
             self._request_start_time = osutils.timer_func()
+        self.body_stream_started = False
         self._write_protocol_version()
         self._write_headers(self._headers)
         self._write_structure(args)
@@ -1357,6 +1363,9 @@
         # have finished sending the stream. We would notice at the end
         # anyway, but if the medium can deliver it early then it's good
         # to short-circuit the whole request...
+        # Provoke any ConnectionReset failures before we start the body stream.
+        self.flush()
+        self.body_stream_started = True
         for exc_info, part in _iter_with_errors(stream):
             if exc_info is not None:
                 # Iterating the stream failed. Cleanly abort the request.
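
Reviewer note on the protocol.py change: flushing the request header before touching the stream means a dead connection is reported while the stream is still unconsumed, and body_stream_started records which side of that point the failure fell on. A rough Python 3 sketch of the idea; SketchEncoder, dead_write and the local ConnectionReset class are hypothetical and much simpler than the real encoder:

```python
class ConnectionReset(Exception):
    """Hypothetical stand-in for errors.ConnectionReset."""


class SketchEncoder(object):
    """Buffers writes and tracks whether the body stream was started."""

    def __init__(self, write):
        self._write = write          # callable taking bytes; may raise
        self._buf = []
        self.body_stream_started = None

    def flush(self):
        data = b''.join(self._buf)
        self._buf = []
        self._write(data)

    def call_with_body_stream(self, args, stream):
        self.body_stream_started = False
        self._buf.append(b'\x01'.join(args) + b'\n')
        # Push the header out first, so a dead connection fails here,
        # while the stream is still untouched.
        self.flush()
        self.body_stream_started = True
        for part in stream:
            self._buf.append(part)
        self.flush()


def dead_write(data):
    # Simulates a medium whose peer has already gone away.
    raise ConnectionReset('peer went away')


consumed = []


def body():
    for chunk in (b'a', b'b'):
        consumed.append(chunk)
        yield chunk


encoder = SketchEncoder(dead_write)
try:
    encoder.call_with_body_stream(
        (b'Repository.insert_stream', b'path'), body())
except ConnectionReset:
    # The stream was never iterated, so the same call could be replayed.
    can_retry = encoder.body_stream_started is False
```

In this sketch the generator is never advanced when the first flush fails, which is what would make a retry of a streaming request safe.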
=== modified file 'bzrlib/smart/request.py'
--- bzrlib/smart/request.py 2010-02-17 17:11:16 +0000
+++ bzrlib/smart/request.py 2012-09-12 09:29:20 +0000
@@ -1,4 +1,4 @@
-# Copyright (C) 2006-2010 Canonical Ltd
+# Copyright (C) 2006-2012 Canonical Ltd
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -134,7 +134,7 @@
         It will return a SmartServerResponse if the command does not expect a
         body.
 
-        :param *args: the arguments of the request.
+        :param args: the arguments of the request.
         """
         self._check_enabled()
         return self.do(*args)
@@ -486,152 +486,196 @@
         return SuccessfulSmartServerResponse((answer,))
 
 
+# In the 'info' attribute, we store whether this request is 'safe' to retry if
+# we get a disconnect while reading the response. It can have the values:
+#   read    This is purely a read request, so retrying it is perfectly ok.
+#   idem    An idempotent write request. Something like 'put' where if you put
+#           the same bytes twice you end up with the same final bytes.
+#   semi    This is a request that isn't strictly idempotent, but doesn't
+#           result in corruption if it is retried. This is for things like
+#           'lock' and 'unlock'. If you call lock, it updates the disk
+#           structure. If you fail to read the response, you won't be able to
+#           use the lock, because you don't have the lock token. Calling lock
+#           again will fail, because the lock is already taken. However, we
+#           can't tell if the server received our request or not. If it didn't,
+#           then retrying the request is fine, as it will actually do what we
+#           want. If it did, we will interrupt the current operation, but we
+#           are no worse off than interrupting the current operation because of
+#           a ConnectionReset.
+#   semivfs Similar to semi, but specific to a Virtual FileSystem request.
+#   stream  This is a request that takes a stream that cannot be restarted if
+#           consumed. This request is 'safe' in that if we determine the
+#           connection is closed before we consume the stream, we can try
+#           again.
+#   mutate  State is updated in a way that replaying that request results in a
+#           different state. For example 'append' writes more bytes to a given
+#           file. If append succeeds, it moves the file pointer.
 request_handlers = registry.Registry()
 request_handlers.register_lazy(
-    'append', 'bzrlib.smart.vfs', 'AppendRequest')
+    'append', 'bzrlib.smart.vfs', 'AppendRequest', info='mutate')
 request_handlers.register_lazy(
     'Branch.get_config_file', 'bzrlib.smart.branch',
-    'SmartServerBranchGetConfigFile')
+    'SmartServerBranchGetConfigFile', info='read')
 request_handlers.register_lazy(
-    'Branch.get_parent', 'bzrlib.smart.branch', 'SmartServerBranchGetParent')
+    'Branch.get_parent', 'bzrlib.smart.branch', 'SmartServerBranchGetParent',
+    info='read')
 request_handlers.register_lazy(
     'Branch.get_tags_bytes', 'bzrlib.smart.branch',
-    'SmartServerBranchGetTagsBytes')
+    'SmartServerBranchGetTagsBytes', info='read')
 request_handlers.register_lazy(
     'Branch.set_tags_bytes', 'bzrlib.smart.branch',
-    'SmartServerBranchSetTagsBytes')
+    'SmartServerBranchSetTagsBytes', info='idem')
 request_handlers.register_lazy(
-    'Branch.get_stacked_on_url', 'bzrlib.smart.branch', 'SmartServerBranchRequestGetStackedOnURL')
-request_handlers.register_lazy(
-    'Branch.last_revision_info', 'bzrlib.smart.branch', 'SmartServerBranchRequestLastRevisionInfo')
-request_handlers.register_lazy(
-    'Branch.lock_write', 'bzrlib.smart.branch', 'SmartServerBranchRequestLockWrite')
-request_handlers.register_lazy( 'Branch.revision_history',
-    'bzrlib.smart.branch', 'SmartServerRequestRevisionHistory')
-request_handlers.register_lazy( 'Branch.set_config_option',
-    'bzrlib.smart.branch', 'SmartServerBranchRequestSetConfigOption')
-request_handlers.register_lazy( 'Branch.set_last_revision',
-    'bzrlib.smart.branch', 'SmartServerBranchRequestSetLastRevision')
+    'Branch.get_stacked_on_url', 'bzrlib.smart.branch',
+    'SmartServerBranchRequestGetStackedOnURL', info='read')
+request_handlers.register_lazy(
+    'Branch.last_revision_info', 'bzrlib.smart.branch',
+    'SmartServerBranchRequestLastRevisionInfo', info='read')
+request_handlers.register_lazy(
+    'Branch.lock_write', 'bzrlib.smart.branch',
+    'SmartServerBranchRequestLockWrite', info='semi')
+request_handlers.register_lazy(
+    'Branch.revision_history', 'bzrlib.smart.branch',
+    'SmartServerRequestRevisionHistory', info='read')
+request_handlers.register_lazy(
+    'Branch.set_config_option', 'bzrlib.smart.branch',
+    'SmartServerBranchRequestSetConfigOption', info='idem')
+request_handlers.register_lazy(
+    'Branch.set_last_revision', 'bzrlib.smart.branch',
+    'SmartServerBranchRequestSetLastRevision', info='idem')
 request_handlers.register_lazy(
     'Branch.set_last_revision_info', 'bzrlib.smart.branch',
-    'SmartServerBranchRequestSetLastRevisionInfo')
+    'SmartServerBranchRequestSetLastRevisionInfo', info='idem')
 request_handlers.register_lazy(
     'Branch.set_last_revision_ex', 'bzrlib.smart.branch',
-    'SmartServerBranchRequestSetLastRevisionEx')
+    'SmartServerBranchRequestSetLastRevisionEx', info='idem')
 request_handlers.register_lazy(
     'Branch.set_parent_location', 'bzrlib.smart.branch',
-    'SmartServerBranchRequestSetParentLocation')
+    'SmartServerBranchRequestSetParentLocation', info='idem')
 request_handlers.register_lazy(
-    'Branch.unlock', 'bzrlib.smart.branch', 'SmartServerBranchRequestUnlock')
+    'Branch.unlock', 'bzrlib.smart.branch',
+    'SmartServerBranchRequestUnlock', info='semi')
 request_handlers.register_lazy(
     'BzrDir.cloning_metadir', 'bzrlib.smart.bzrdir',
-    'SmartServerBzrDirRequestCloningMetaDir')
+    'SmartServerBzrDirRequestCloningMetaDir', info='read')
 request_handlers.register_lazy(
     'BzrDir.create_branch', 'bzrlib.smart.bzrdir',
-    'SmartServerRequestCreateBranch')
+    'SmartServerRequestCreateBranch', info='semi')
 request_handlers.register_lazy(
     'BzrDir.create_repository', 'bzrlib.smart.bzrdir',
-    'SmartServerRequestCreateRepository')
+    'SmartServerRequestCreateRepository', info='semi')
 request_handlers.register_lazy(
     'BzrDir.find_repository', 'bzrlib.smart.bzrdir',
-    'SmartServerRequestFindRepositoryV1')
+    'SmartServerRequestFindRepositoryV1', info='read')
 request_handlers.register_lazy(
     'BzrDir.find_repositoryV2', 'bzrlib.smart.bzrdir',
-    'SmartServerRequestFindRepositoryV2')
+    'SmartServerRequestFindRepositoryV2', info='read')
 request_handlers.register_lazy(
     'BzrDir.find_repositoryV3', 'bzrlib.smart.bzrdir',
-    'SmartServerRequestFindRepositoryV3')
+    'SmartServerRequestFindRepositoryV3', info='read')
 request_handlers.register_lazy(
     'BzrDir.get_config_file', 'bzrlib.smart.bzrdir',
-    'SmartServerBzrDirRequestConfigFile')
+    'SmartServerBzrDirRequestConfigFile', info='read')
 request_handlers.register_lazy(
     'BzrDirFormat.initialize', 'bzrlib.smart.bzrdir',
-    'SmartServerRequestInitializeBzrDir')
+    'SmartServerRequestInitializeBzrDir', info='semi')
 request_handlers.register_lazy(
     'BzrDirFormat.initialize_ex_1.16', 'bzrlib.smart.bzrdir',
-    'SmartServerRequestBzrDirInitializeEx')
+    'SmartServerRequestBzrDirInitializeEx', info='semi')
 request_handlers.register_lazy(
-    'BzrDir.open', 'bzrlib.smart.bzrdir', 'SmartServerRequestOpenBzrDir')
-request_handlers.register_lazy(
-    'BzrDir.open_2.1', 'bzrlib.smart.bzrdir', 'SmartServerRequestOpenBzrDir_2_1')
+    'BzrDir.open', 'bzrlib.smart.bzrdir', 'SmartServerRequestOpenBzrDir',
+    info='read')
+request_handlers.register_lazy(
+    'BzrDir.open_2.1', 'bzrlib.smart.bzrdir',
+    'SmartServerRequestOpenBzrDir_2_1', info='read')
 request_handlers.register_lazy(
     'BzrDir.open_branch', 'bzrlib.smart.bzrdir',
-    'SmartServerRequestOpenBranch')
+    'SmartServerRequestOpenBranch', info='read')
 request_handlers.register_lazy(
     'BzrDir.open_branchV2', 'bzrlib.smart.bzrdir',
-    'SmartServerRequestOpenBranchV2')
+    'SmartServerRequestOpenBranchV2', info='read')
 request_handlers.register_lazy(
     'BzrDir.open_branchV3', 'bzrlib.smart.bzrdir',
-    'SmartServerRequestOpenBranchV3')
+    'SmartServerRequestOpenBranchV3', info='read')
 request_handlers.register_lazy(
-    'delete', 'bzrlib.smart.vfs', 'DeleteRequest')
+    'delete', 'bzrlib.smart.vfs', 'DeleteRequest', info='semivfs')
 request_handlers.register_lazy(
-    'get', 'bzrlib.smart.vfs', 'GetRequest')
+    'get', 'bzrlib.smart.vfs', 'GetRequest', info='read')
 request_handlers.register_lazy(
-    'get_bundle', 'bzrlib.smart.request', 'GetBundleRequest')
+    'get_bundle', 'bzrlib.smart.request', 'GetBundleRequest', info='read')
 request_handlers.register_lazy(
-    'has', 'bzrlib.smart.vfs', 'HasRequest')
+    'has', 'bzrlib.smart.vfs', 'HasRequest', info='read')
 request_handlers.register_lazy(
-    'hello', 'bzrlib.smart.request', 'HelloRequest')
+    'hello', 'bzrlib.smart.request', 'HelloRequest', info='read')
 request_handlers.register_lazy(
-    'iter_files_recursive', 'bzrlib.smart.vfs', 'IterFilesRecursiveRequest')
-request_handlers.register_lazy(
-    'list_dir', 'bzrlib.smart.vfs', 'ListDirRequest')
-request_handlers.register_lazy(
-    'mkdir', 'bzrlib.smart.vfs', 'MkdirRequest')
-request_handlers.register_lazy(
-    'move', 'bzrlib.smart.vfs', 'MoveRequest')
-request_handlers.register_lazy(
-    'put', 'bzrlib.smart.vfs', 'PutRequest')
-request_handlers.register_lazy(
-    'put_non_atomic', 'bzrlib.smart.vfs', 'PutNonAtomicRequest')
-request_handlers.register_lazy(
-    'readv', 'bzrlib.smart.vfs', 'ReadvRequest')
-request_handlers.register_lazy(
-    'rename', 'bzrlib.smart.vfs', 'RenameRequest')
+    'iter_files_recursive', 'bzrlib.smart.vfs', 'IterFilesRecursiveRequest',
+    info='read')
+request_handlers.register_lazy(
+    'list_dir', 'bzrlib.smart.vfs', 'ListDirRequest', info='read')
+request_handlers.register_lazy(
+    'mkdir', 'bzrlib.smart.vfs', 'MkdirRequest', info='semivfs')
+request_handlers.register_lazy(
+    'move', 'bzrlib.smart.vfs', 'MoveRequest', info='semivfs')
+request_handlers.register_lazy(
+    'put', 'bzrlib.smart.vfs', 'PutRequest', info='idem')
+request_handlers.register_lazy(
+    'put_non_atomic', 'bzrlib.smart.vfs', 'PutNonAtomicRequest', info='idem')
+request_handlers.register_lazy(
+    'readv', 'bzrlib.smart.vfs', 'ReadvRequest', info='read')
+request_handlers.register_lazy(
+    'rename', 'bzrlib.smart.vfs', 'RenameRequest', info='semivfs')
 request_handlers.register_lazy(
     'PackRepository.autopack', 'bzrlib.smart.packrepository',
-    'SmartServerPackRepositoryAutopack')
-request_handlers.register_lazy('Repository.gather_stats',
-                               'bzrlib.smart.repository',
-                               'SmartServerRepositoryGatherStats')
-request_handlers.register_lazy('Repository.get_parent_map',
-                               'bzrlib.smart.repository',
-                               'SmartServerRepositoryGetParentMap')
+    'SmartServerPackRepositoryAutopack', info='idem')
+request_handlers.register_lazy(
+    'Repository.gather_stats', 'bzrlib.smart.repository',
+    'SmartServerRepositoryGatherStats', info='read')
+request_handlers.register_lazy(
+    'Repository.get_parent_map', 'bzrlib.smart.repository',
+    'SmartServerRepositoryGetParentMap', info='read')
 request_handlers.register_lazy(
-    'Repository.get_revision_graph', 'bzrlib.smart.repository', 'SmartServerRepositoryGetRevisionGraph')
-request_handlers.register_lazy(
-    'Repository.has_revision', 'bzrlib.smart.repository', 'SmartServerRequestHasRevision')
-request_handlers.register_lazy(
-    'Repository.insert_stream', 'bzrlib.smart.repository', 'SmartServerRepositoryInsertStream')
-request_handlers.register_lazy(
-    'Repository.insert_stream_1.19', 'bzrlib.smart.repository', 'SmartServerRepositoryInsertStream_1_19')
-request_handlers.register_lazy(
-    'Repository.insert_stream_locked', 'bzrlib.smart.repository', 'SmartServerRepositoryInsertStreamLocked')
-request_handlers.register_lazy(
-    'Repository.is_shared', 'bzrlib.smart.repository', 'SmartServerRepositoryIsShared')
-request_handlers.register_lazy(
-    'Repository.lock_write', 'bzrlib.smart.repository', 'SmartServerRepositoryLockWrite')
+    'Repository.get_revision_graph', 'bzrlib.smart.repository',
+    'SmartServerRepositoryGetRevisionGraph', info='read')
+request_handlers.register_lazy(
+    'Repository.has_revision', 'bzrlib.smart.repository',
+    'SmartServerRequestHasRevision', info='read')
+request_handlers.register_lazy(
+    'Repository.insert_stream', 'bzrlib.smart.repository',
+    'SmartServerRepositoryInsertStream', info='stream')
+request_handlers.register_lazy(
+    'Repository.insert_stream_1.19', 'bzrlib.smart.repository',
+    'SmartServerRepositoryInsertStream_1_19', info='stream')
+request_handlers.register_lazy(
+    'Repository.insert_stream_locked', 'bzrlib.smart.repository',
+    'SmartServerRepositoryInsertStreamLocked', info='stream')
+request_handlers.register_lazy(
+    'Repository.is_shared', 'bzrlib.smart.repository',
+    'SmartServerRepositoryIsShared', info='read')
+request_handlers.register_lazy(
+    'Repository.lock_write', 'bzrlib.smart.repository',
+    'SmartServerRepositoryLockWrite', info='semi')
 request_handlers.register_lazy(
     'Repository.set_make_working_trees', 'bzrlib.smart.repository',
-    'SmartServerRepositorySetMakeWorkingTrees')
+    'SmartServerRepositorySetMakeWorkingTrees', info='idem')
 request_handlers.register_lazy(
-    'Repository.unlock', 'bzrlib.smart.repository', 'SmartServerRepositoryUnlock')
+    'Repository.unlock', 'bzrlib.smart.repository',
+    'SmartServerRepositoryUnlock', info='semi')
 request_handlers.register_lazy(
     'Repository.get_rev_id_for_revno', 'bzrlib.smart.repository',
-    'SmartServerRepositoryGetRevIdForRevno')
+    'SmartServerRepositoryGetRevIdForRevno', info='read')
 request_handlers.register_lazy(
     'Repository.get_stream', 'bzrlib.smart.repository',
-    'SmartServerRepositoryGetStream')
+    'SmartServerRepositoryGetStream', info='read')
 request_handlers.register_lazy(
     'Repository.get_stream_1.19', 'bzrlib.smart.repository',
-    'SmartServerRepositoryGetStream_1_19')
+    'SmartServerRepositoryGetStream_1_19', info='read')
 request_handlers.register_lazy(
     'Repository.tarball', 'bzrlib.smart.repository',
-    'SmartServerRepositoryTarball')
+    'SmartServerRepositoryTarball', info='read')
 request_handlers.register_lazy(
-    'rmdir', 'bzrlib.smart.vfs', 'RmdirRequest')
+    'rmdir', 'bzrlib.smart.vfs', 'RmdirRequest', info='semivfs')
 request_handlers.register_lazy(
-    'stat', 'bzrlib.smart.vfs', 'StatRequest')
+    'stat', 'bzrlib.smart.vfs', 'StatRequest', info='read')
 request_handlers.register_lazy(
-    'Transport.is_readonly', 'bzrlib.smart.request', 'SmartServerIsReadonly')
+    'Transport.is_readonly', 'bzrlib.smart.request',
+    'SmartServerIsReadonly', info='read')
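
Reviewer note on the request.py change: the info values only classify each verb; which classes a client actually replays is a separate policy decision. A small Python 3 sketch of one plausible policy, where the REQUEST_INFO excerpt, the RETRYABLE set and is_safe_to_retry are all hypothetical names, and treating 'semivfs' and 'mutate' as never retryable is an assumption rather than something this hunk states:

```python
# Hypothetical excerpt of the classification; the real table is the
# request_handlers registry in bzrlib/smart/request.py.
REQUEST_INFO = {
    'get': 'read',
    'put': 'idem',
    'Branch.lock_write': 'semi',
    'mkdir': 'semivfs',
    'Repository.insert_stream': 'stream',
    'append': 'mutate',
}

# Assumption: these classes are replayed after a disconnect.
RETRYABLE = frozenset(['read', 'idem', 'semi'])


def is_safe_to_retry(verb, body_stream_started=False):
    """Return True if replaying verb after a disconnect looks safe."""
    info = REQUEST_INFO.get(verb)
    if info is None:
        # Unclassified verbs are treated as unsafe.
        return False
    if info == 'stream':
        # Only safe while the local stream has not been consumed.
        return not body_stream_started
    return info in RETRYABLE
```

For example, is_safe_to_retry('Repository.insert_stream', body_stream_started=True) is False in this sketch, because the stream cannot be restarted once consumed.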
=== modified file 'bzrlib/tests/test_bundle.py'
--- bzrlib/tests/test_bundle.py 2010-02-17 17:11:16 +0000
+++ bzrlib/tests/test_bundle.py 2012-09-12 09:29:20 +0000
@@ -1855,20 +1855,23 @@
         self.sock.bind(('127.0.0.1', 0))
         self.sock.listen(1)
         self.port = self.sock.getsockname()[1]
+        self.stopping = threading.Event()
         self.thread = threading.Thread(
             name='%s (port %d)' % (self.__class__.__name__, self.port),
             target=self.accept_and_close)
         self.thread.start()
 
     def accept_and_close(self):
-        conn, addr = self.sock.accept()
-        conn.shutdown(socket.SHUT_RDWR)
-        conn.close()
+        while not self.stopping.isSet():
+            conn, addr = self.sock.accept()
+            conn.shutdown(socket.SHUT_RDWR)
+            conn.close()
 
     def get_url(self):
         return 'bzr://127.0.0.1:%d/' % (self.port,)
 
     def stop_server(self):
+        self.stopping.set()
         try:
             # make sure the thread dies by connecting to the listening socket,
             # just in case the test failed to do so.
=== modified file 'bzrlib/tests/test_osutils.py'
--- bzrlib/tests/test_osutils.py 2010-11-30 20:42:42 +0000
+++ bzrlib/tests/test_osutils.py 2012-09-12 09:29:20 +0000
@@ -801,6 +801,45 @@
         self.assertEqual(None, osutils.safe_file_id(None))
 
 
+class TestSendAll(tests.TestCase):
+
+    def test_send_with_disconnected_socket(self):
+        class DisconnectedSocket(object):
+            def __init__(self, err):
+                self.err = err
+            def send(self, content):
+                raise self.err
+            def close(self):
+                pass
+        # All of these should be treated as ConnectionReset
+        errs = []
+        for err_cls in (IOError, socket.error):
+            for errnum in osutils._end_of_stream_errors:
+                errs.append(err_cls(errnum))
+        for err in errs:
+            sock = DisconnectedSocket(err)
+            self.assertRaises(errors.ConnectionReset,
+                osutils.send_all, sock, 'some more content')
+
+    def test_send_with_no_progress(self):
+        # See https://bugs.launchpad.net/bzr/+bug/1047309
+        # It seems that paramiko can get into a state where it doesn't error,
+        # but it returns 0 bytes sent for requests over and over again.
+        class NoSendingSocket(object):
+            def __init__(self):
+                self.call_count = 0
+            def send(self, bytes):
+                self.call_count += 1
+                if self.call_count > 100:
+                    # Prevent the test suite from hanging
+                    raise RuntimeError('too many calls')
+                return 0
+        sock = NoSendingSocket()
+        self.assertRaises(errors.ConnectionReset,
+                          osutils.send_all, sock, 'content')
+        self.assertEqual(1, sock.call_count)
+
+
 class TestWin32Funcs(tests.TestCase):
     """Test that _win32 versions of os utilities return appropriate paths."""
 
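
Reviewer note on TestSendAll: the two tests pin down a send loop that maps end-of-stream errnos to ConnectionReset and refuses to spin when send() reports zero bytes. A self-contained Python 3 sketch of a loop with that behaviour; this is not the osutils.send_all from the branch, the local ConnectionReset class is a stand-in, and the contents of END_OF_STREAM_ERRORS are an assumption about what osutils._end_of_stream_errors holds:

```python
import errno
import socket


class ConnectionReset(Exception):
    """Hypothetical stand-in for errors.ConnectionReset."""


# Assumption: the real osutils._end_of_stream_errors may list other errnos.
END_OF_STREAM_ERRORS = (errno.ECONNRESET, errno.EPIPE)


def send_all(sock, data):
    """Send all of data, raising ConnectionReset if the peer is gone."""
    sent_total = 0
    while sent_total < len(data):
        try:
            sent = sock.send(data[sent_total:])
        except (socket.error, IOError) as e:
            if e.args and e.args[0] in END_OF_STREAM_ERRORS:
                raise ConnectionReset('Error sending: %s' % (e,))
            if e.args and e.args[0] == errno.EINTR:
                # Interrupted by a signal; try the same slice again.
                continue
            raise
        if sent == 0:
            # No progress at all: treat it as a dead connection instead
            # of looping forever.
            raise ConnectionReset('send() returned 0 bytes')
        sent_total += sent
```

Raising on the first zero-byte send is what lets test_send_with_no_progress assert a call_count of exactly 1.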
=== modified file 'bzrlib/tests/test_smart_request.py'
--- bzrlib/tests/test_smart_request.py 2009-07-27 02:11:25 +0000
+++ bzrlib/tests/test_smart_request.py 2012-09-12 09:29:20 +0000
@@ -109,6 +109,16 @@
         self.assertEqual(
             [[transport]] * 3, handler._command.jail_transports_log)
 
+    def test_all_registered_requests_are_safety_qualified(self):
+        unclassified_requests = []
+        allowed_info = ('read', 'idem', 'mutate', 'semivfs', 'semi', 'stream')
+        for key in request.request_handlers.keys():
+            info = request.request_handlers.get_info(key)
+            if info is None or info not in allowed_info:
+                unclassified_requests.append(key)
+        if unclassified_requests:
+            self.fail('These requests were not categorized as safe/unsafe'
+                      ' to retry: %s' % (unclassified_requests,))
 
 
 class TestSmartRequestHandlerErrorTranslation(TestCase):
=== modified file 'bzrlib/tests/test_smart_transport.py'
--- bzrlib/tests/test_smart_transport.py 2010-02-17 17:11:16 +0000
+++ bzrlib/tests/test_smart_transport.py 2012-09-12 09:29:20 +0000
@@ -18,13 +18,17 @@
 
 # all of this deals with byte strings so this is safe
 from cStringIO import StringIO
+import errno
 import os
 import socket
+import subprocess
+import sys
 import threading
 
 import bzrlib
 from bzrlib import (
         bzrdir,
+        debug,
         errors,
         osutils,
         tests,
@@ -49,6 +53,29 @@
 from bzrlib.transport.http import SmartClientHTTPMediumRequest
 
 
+def create_file_pipes():
+    r, w = os.pipe()
+    # These must be opened without buffering, or we get undefined results
+    rf = os.fdopen(r, 'rb', 0)
+    wf = os.fdopen(w, 'wb', 0)
+    return rf, wf
+
+
+def portable_socket_pair():
+    """Return a pair of TCP sockets connected to each other.
+
+    Unlike socket.socketpair, this should work on Windows.
+    """
+    listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    listen_sock.bind(('127.0.0.1', 0))
+    listen_sock.listen(1)
+    client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    client_sock.connect(listen_sock.getsockname())
+    server_sock, addr = listen_sock.accept()
+    listen_sock.close()
+    return server_sock, client_sock
+
+
 class StringIOSSHVendor(object):
     """A SSH vendor that uses StringIO to buffer writes and answer reads."""
 
@@ -63,6 +90,27 @@
         return StringIOSSHConnection(self)
 
 
+class FirstRejectedStringIOSSHVendor(StringIOSSHVendor):
+    """The first connection will be considered closed.
+
+    The second connection will succeed normally.
+    """
+
+    def __init__(self, read_from, write_to, fail_at_write=True):
+        super(FirstRejectedStringIOSSHVendor, self).__init__(read_from,
+            write_to)
+        self.fail_at_write = fail_at_write
+        self._first = True
+
+    def connect_ssh(self, username, password, host, port, command):
+        self.calls.append(('connect_ssh', username, password, host, port,
+            command))
+        if self._first:
+            self._first = False
+            return ClosedSSHConnection(self)
+        return StringIOSSHConnection(self)
+
+
 class StringIOSSHConnection(object):
     """A SSH connection that uses StringIO to buffer writes and answer reads."""
 
@@ -71,11 +119,36 @@
 
     def close(self):
         self.vendor.calls.append(('close', ))
+        self.vendor.read_from.close()
+        self.vendor.write_to.close()
 
     def get_filelike_channels(self):
         return self.vendor.read_from, self.vendor.write_to
 
 
+class ClosedSSHConnection(object):
+    """An SSH connection that just has closed channels."""
+
+    def __init__(self, vendor):
+        self.vendor = vendor
+
+    def close(self):
+        self.vendor.calls.append(('close', ))
+
+    def get_filelike_channels(self):
+        # We create matching pipes, and then close the ssh side
+        bzr_read, ssh_write = create_file_pipes()
+        # We always fail when bzr goes to read
+        ssh_write.close()
+        if self.vendor.fail_at_write:
+            # If set, we'll also fail when bzr goes to write
+            ssh_read, bzr_write = create_file_pipes()
+            ssh_read.close()
+        else:
+            bzr_write = self.vendor.write_to
+        return bzr_read, bzr_write
+
+
 class _InvalidHostnameFeature(tests.Feature):
     """Does 'non_existent.invalid' fail to resolve?
 
@@ -171,6 +244,91 @@
         client_medium._accept_bytes('abc')
         self.assertEqual('abc', output.getvalue())
 
+    def test_simple_pipes__accept_bytes_subprocess_closed(self):
+        # It is unfortunate that we have to use Popen for this. However,
+        # os.pipe() does not behave the same as subprocess.Popen().
+        # On Windows, if you use os.pipe() and close the write side,
+        # read.read() hangs. On Linux, read.read() returns the empty string.
+        p = subprocess.Popen([sys.executable, '-c',
+            'import sys\n'
+            'sys.stdout.write(sys.stdin.read(4))\n'
+            'sys.stdout.close()\n'],
+            stdout=subprocess.PIPE, stdin=subprocess.PIPE)
+        client_medium = medium.SmartSimplePipesClientMedium(
+            p.stdout, p.stdin, 'base')
+        client_medium._accept_bytes('abc\n')
+        self.assertEqual('abc', client_medium._read_bytes(3))
+        p.wait()
+        # While writing to the underlying pipe,
+        # Windows py2.6.6 we get IOError(EINVAL)
+        # Lucid py2.6.5, we get IOError(EPIPE)
+        # In both cases, it should be wrapped to ConnectionReset
+        self.assertRaises(errors.ConnectionReset,
+            client_medium._accept_bytes, 'more')
+
+    def test_simple_pipes__accept_bytes_pipe_closed(self):
+        child_read, client_write = create_file_pipes()
+        client_medium = medium.SmartSimplePipesClientMedium(
+            None, client_write, 'base')
+        client_medium._accept_bytes('abc\n')
+        self.assertEqual('abc\n', child_read.read(4))
+        # While writing to the underlying pipe,
+        # Windows py2.6.6 we get IOError(EINVAL)
+        # Lucid py2.6.5, we get IOError(EPIPE)
+        # In both cases, it should be wrapped to ConnectionReset
+        child_read.close()
+        self.assertRaises(errors.ConnectionReset,
+            client_medium._accept_bytes, 'more')
+
+    def test_simple_pipes__flush_pipe_closed(self):
+        child_read, client_write = create_file_pipes()
+        client_medium = medium.SmartSimplePipesClientMedium(
+            None, client_write, 'base')
+        client_medium._accept_bytes('abc\n')
+        child_read.close()
+        # Even though the pipe is closed, flush on the write side seems to be a
+        # no-op, rather than a failure.
+        client_medium._flush()
+
+    def test_simple_pipes__flush_subprocess_closed(self):
+        p = subprocess.Popen([sys.executable, '-c',
+            'import sys\n'
+            'sys.stdout.write(sys.stdin.read(4))\n'
+            'sys.stdout.close()\n'],
+            stdout=subprocess.PIPE, stdin=subprocess.PIPE)
+        client_medium = medium.SmartSimplePipesClientMedium(
+            p.stdout, p.stdin, 'base')
+        client_medium._accept_bytes('abc\n')
+        p.wait()
+        # Even though the child process is dead, flush seems to be a no-op.
+        client_medium._flush()
+
+    def test_simple_pipes__read_bytes_pipe_closed(self):
+        child_read, client_write = create_file_pipes()
+        client_medium = medium.SmartSimplePipesClientMedium(
+            child_read, client_write, 'base')
+        client_medium._accept_bytes('abc\n')
+        client_write.close()
+        self.assertEqual('abc\n', client_medium._read_bytes(4))
+        self.assertEqual('', client_medium._read_bytes(4))
+
+    def test_simple_pipes__read_bytes_subprocess_closed(self):
+        p = subprocess.Popen([sys.executable, '-c',
+            'import sys\n'
+            'if sys.platform == "win32":\n'
+            '    import msvcrt, os\n'
+            '    msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)\n'
+            '    msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY)\n'
+            'sys.stdout.write(sys.stdin.read(4))\n'
+            'sys.stdout.close()\n'],
+            stdout=subprocess.PIPE, stdin=subprocess.PIPE)
+        client_medium = medium.SmartSimplePipesClientMedium(
+            p.stdout, p.stdin, 'base')
+        client_medium._accept_bytes('abc\n')
+        p.wait()
+        self.assertEqual('abc\n', client_medium._read_bytes(4))
+        self.assertEqual('', client_medium._read_bytes(4))
+
     def test_simple_pipes_client_disconnect_does_nothing(self):
         # calling disconnect does nothing.
         input = StringIO()
@@ -556,6 +714,28 @@
         request.finished_reading()
         self.assertRaises(errors.ReadingCompleted, request.read_bytes, None)
 
+    def test_reset(self):
+        server_sock, client_sock = portable_socket_pair()
+        # TODO: Use SmartClientAlreadyConnectedSocketMedium for the versions of
+        # bzr where it exists.
+        client_medium = medium.SmartTCPClientMedium(None, None, None)
+        client_medium._socket = client_sock
+        client_medium._connected = True
+        req = client_medium.get_request()
+        self.assertRaises(errors.TooManyConcurrentRequests,
+            client_medium.get_request)
+        client_medium.reset()
+        # The stream should be reset, marked as disconnected, though ready for
+        # us to make a new request
+        self.assertFalse(client_medium._connected)
+        self.assertIs(None, client_medium._socket)
+        try:
+            self.assertEqual('', client_sock.recv(1))
+        except socket.error, e:
+            if e.errno not in (errno.EBADF,):
+                raise
+        req = client_medium.get_request()
+
 
 class RemoteTransportTests(TestCaseWithSmartMedium):
 
@@ -609,20 +789,6 @@
         super(TestSmartServerStreamMedium, self).setUp()
         self._captureVar('BZR_NO_SMART_VFS', None)
 
-    def portable_socket_pair(self):
-        """Return a pair of TCP sockets connected to each other.
-
-        Unlike socket.socketpair, this should work on Windows.
-        """
-        listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        listen_sock.bind(('127.0.0.1', 0))
-        listen_sock.listen(1)
-        client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        client_sock.connect(listen_sock.getsockname())
-        server_sock, addr = listen_sock.accept()
-        listen_sock.close()
-        return server_sock, client_sock
-
     def test_smart_query_version(self):
         """Feed a canned query version to a server"""
         # wire-to-wire, using the whole stack
@@ -687,7 +853,7 @@
 
     def test_socket_stream_with_bulk_data(self):
         sample_request_bytes = 'command\n9\nbulk datadone\n'
-        server_sock, client_sock = self.portable_socket_pair()
+        server_sock, client_sock = portable_socket_pair()
         server = medium.SmartServerSocketStreamMedium(
             server_sock, None)
         sample_protocol = SampleRequest(expected_bytes=sample_request_bytes)
@@ -706,7 +872,7 @@
         self.assertTrue(server.finished)
 
     def test_socket_stream_shutdown_detection(self):
-        server_sock, client_sock = self.portable_socket_pair()
+        server_sock, client_sock = portable_socket_pair()
         client_sock.close()
         server = medium.SmartServerSocketStreamMedium(
             server_sock, None)
@@ -726,7 +892,7 @@
         rest_of_request_bytes = 'lo\n'
         expected_response = (
             protocol.RESPONSE_VERSION_TWO + 'success\nok\x012\n')
-        server_sock, client_sock = self.portable_socket_pair()
+        server_sock, client_sock = portable_socket_pair()
         server = medium.SmartServerSocketStreamMedium(
             server_sock, None)
         client_sock.sendall(incomplete_request_bytes)
@@ -802,7 +968,7 @@
         # _serve_one_request should still process both of them as if they had
         # been received separately.
         sample_request_bytes = 'command\n'
-        server_sock, client_sock = self.portable_socket_pair()
+        server_sock, client_sock = portable_socket_pair()
         server = medium.SmartServerSocketStreamMedium(
             server_sock, None)
         first_protocol = SampleRequest(expected_bytes=sample_request_bytes)
@@ -839,7 +1005,7 @@
         self.assertTrue(server.finished)
 
     def test_socket_stream_error_handling(self):
-        server_sock, client_sock = self.portable_socket_pair()
+        server_sock, client_sock = portable_socket_pair()
         server = medium.SmartServerSocketStreamMedium(
             server_sock, None)
         fake_protocol = ErrorRaisingProtocol(Exception('boom'))
@@ -860,7 +1026,7 @@
         self.assertEqual('', from_server.getvalue())
 
     def test_socket_stream_keyboard_interrupt_handling(self):
-        server_sock, client_sock = self.portable_socket_pair()
+        server_sock, client_sock = portable_socket_pair()
         server = medium.SmartServerSocketStreamMedium(
             server_sock, None)
         fake_protocol = ErrorRaisingProtocol(KeyboardInterrupt('boom'))
@@ -877,7 +1043,7 @@
         return server._build_protocol()
 
     def build_protocol_socket(self, bytes):
-        server_sock, client_sock = self.portable_socket_pair()
+        server_sock, client_sock = portable_socket_pair()
         server = medium.SmartServerSocketStreamMedium(
             server_sock, None)
         client_sock.sendall(bytes)
@@ -2778,6 +2944,33 @@
             'e', # end
             output.getvalue())
 
+    def test_records_start_of_body_stream(self):
+        requester, output = self.make_client_encoder_and_output()
+        requester.set_headers({})
+        in_stream = [False]
+        def stream_checker():
+            self.assertTrue(requester.body_stream_started)
+            in_stream[0] = True
+            yield 'content'
+        flush_called = []
+        orig_flush = requester.flush
+        def tracked_flush():
+            flush_called.append(in_stream[0])
+            if in_stream[0]:
+                self.assertTrue(requester.body_stream_started)
+            else:
+                self.assertFalse(requester.body_stream_started)
+            return orig_flush()
+        requester.flush = tracked_flush
+        requester.call_with_body_stream(('one arg',), stream_checker())
+        self.assertEqual(
+            'bzr message 3 (bzr 1.6)\n' # protocol version
+            '\x00\x00\x00\x02de' # headers
+            's\x00\x00\x00\x0bl7:one arge' # args
+            'b\x00\x00\x00\x07content' # body
+            'e', output.getvalue())
+        self.assertEqual([False, True, True], flush_called)
+
 
 class StubMediumRequest(object):
     """A stub medium request that tracks the number of times accept_bytes is
@@ -3214,6 +3407,195 @@
         # encoder.
 
 
+class Test_SmartClientRequest(tests.TestCase):
+
+    def make_client_with_failing_medium(self, fail_at_write=True, response=''):
+        response_io = StringIO(response)
+        output = StringIO()
+        vendor = FirstRejectedStringIOSSHVendor(response_io, output,
+            fail_at_write=fail_at_write)
+        client_medium = medium.SmartSSHClientMedium(
+            'a host', 'a port', 'a user', 'a pass', 'base', vendor,
+            'bzr')
+        smart_client = client._SmartClient(client_medium, headers={})
+        return output, vendor, smart_client
+
+    def make_response(self, args, body=None, body_stream=None):
+        response_io = StringIO()
+        response = _mod_request.SuccessfulSmartServerResponse(args, body=body,
+            body_stream=body_stream)
+        responder = protocol.ProtocolThreeResponder(response_io.write)
+        responder.send_response(response)
+        return response_io.getvalue()
+
+    def test__call_doesnt_retry_append(self):
+        response = self.make_response(('appended', '8'))
+        output, vendor, smart_client = self.make_client_with_failing_medium(
+            fail_at_write=False, response=response)
+        smart_request = client._SmartClientRequest(smart_client, 'append',
+            ('foo', ''), body='content\n')
+        self.assertRaises(errors.ConnectionReset, smart_request._call, 3)
+
+    def test__call_retries_get_bytes(self):
+        response = self.make_response(('ok',), 'content\n')
+        output, vendor, smart_client = self.make_client_with_failing_medium(
+            fail_at_write=False, response=response)
+        smart_request = client._SmartClientRequest(smart_client, 'get',
+            ('foo',))
+        response, response_handler = smart_request._call(3)
+        self.assertEqual(('ok',), response)
+        self.assertEqual('content\n', response_handler.read_body_bytes())
+
+    def test__call_noretry_get_bytes(self):
+        debug.debug_flags.add('noretry')
+        response = self.make_response(('ok',), 'content\n')
+        output, vendor, smart_client = self.make_client_with_failing_medium(
+            fail_at_write=False, response=response)
+        smart_request = client._SmartClientRequest(smart_client, 'get',
+            ('foo',))
+        self.assertRaises(errors.ConnectionReset, smart_request._call, 3)
+
+    def test__send_no_retry_pipes(self):
+        client_read, server_write = create_file_pipes()
+        server_read, client_write = create_file_pipes()
+        client_medium = medium.SmartSimplePipesClientMedium(client_read,
+            client_write, base='/')
+        smart_client = client._SmartClient(client_medium)
+        smart_request = client._SmartClientRequest(smart_client,
+            'hello', ())
+        # Close the server side
+        server_read.close()
+        encoder, response_handler = smart_request._construct_protocol(3)
+        self.assertRaises(errors.ConnectionReset,
+            smart_request._send_no_retry, encoder)
+
+    def test__send_read_response_sockets(self):
+        listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        listen_sock.bind(('127.0.0.1', 0))
+        listen_sock.listen(1)
+        host, port = listen_sock.getsockname()
+        client_medium = medium.SmartTCPClientMedium(host, port, '/')
+        client_medium._ensure_connection()
+        smart_client = client._SmartClient(client_medium)
+        smart_request = client._SmartClientRequest(smart_client, 'hello', ())
+        # Accept the connection, but don't actually talk to the client.
+        server_sock, _ = listen_sock.accept()
+        server_sock.close()
+        # Sockets buffer and don't really notice that the server has closed the
+        # connection until we try to read again.
+        handler = smart_request._send(3)
+        self.assertRaises(errors.ConnectionReset,
+            handler.read_response_tuple, expect_body=False)
+
+    def test__send_retries_on_write(self):
+        output, vendor, smart_client = self.make_client_with_failing_medium()
+        smart_request = client._SmartClientRequest(smart_client, 'hello', ())
+        handler = smart_request._send(3)
+        self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol
+                         '\x00\x00\x00\x02de' # empty headers
+                         's\x00\x00\x00\tl5:helloee',
+                         output.getvalue())
+        self.assertEqual(
+            [('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
+              ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
+             ('close',),
+             ('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
+              ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
+            ],
+            vendor.calls)
+
+    def test__send_doesnt_retry_read_failure(self):
+        output, vendor, smart_client = self.make_client_with_failing_medium(
+            fail_at_write=False)
+        smart_request = client._SmartClientRequest(smart_client, 'hello', ())
+        handler = smart_request._send(3)
+        self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol
+                         '\x00\x00\x00\x02de' # empty headers
+                         's\x00\x00\x00\tl5:helloee',
+                         output.getvalue())
+        self.assertEqual(
+            [('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
+              ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
+            ],
+            vendor.calls)
+        self.assertRaises(errors.ConnectionReset, handler.read_response_tuple)
+
+    def test__send_request_retries_body_stream_if_not_started(self):
+        output, vendor, smart_client = self.make_client_with_failing_medium()
+        smart_request = client._SmartClientRequest(smart_client, 'hello', (),
+            body_stream=['a', 'b'])
+        response_handler = smart_request._send(3)
+        # We connect, get disconnected, and notice before consuming the stream,
+        # so we try again one time and succeed.
+        self.assertEqual(
+            [('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
+              ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
+             ('close',),
+             ('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
+              ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
+            ],
+            vendor.calls)
+        self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol
+                         '\x00\x00\x00\x02de' # empty headers
+                         's\x00\x00\x00\tl5:helloe'
+                         'b\x00\x00\x00\x01a'
+                         'b\x00\x00\x00\x01b'
+                         'e',
+                         output.getvalue())
+
+    def test__send_request_stops_if_body_started(self):
+        # We intentionally use the python StringIO so that we can subclass it.
+        from StringIO import StringIO
+        response = StringIO()
+
+        class FailAfterFirstWrite(StringIO):
+            """Allow one 'write' call to pass, fail the rest"""
+            def __init__(self):
+                StringIO.__init__(self)
+                self._first = True
+
+            def write(self, s):
+                if self._first:
+                    self._first = False
+                    return StringIO.write(self, s)
+                raise IOError(errno.EINVAL, 'invalid file handle')
+        output = FailAfterFirstWrite()
+
+        vendor = FirstRejectedStringIOSSHVendor(response, output,
+            fail_at_write=False)
+        client_medium = medium.SmartSSHClientMedium(
+            'a host', 'a port', 'a user', 'a pass', 'base', vendor,
+            'bzr')
+        smart_client = client._SmartClient(client_medium, headers={})
+        smart_request = client._SmartClientRequest(smart_client, 'hello', (),
+            body_stream=['a', 'b'])
+        self.assertRaises(errors.ConnectionReset, smart_request._send, 3)
+        # We connect, and manage to get to the point that we start consuming
+        # the body stream. The next write fails, so we just stop.
+        self.assertEqual(
+            [('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
+              ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
+             ('close',),
+            ],
+            vendor.calls)
+        self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol
+                         '\x00\x00\x00\x02de' # empty headers
+                         's\x00\x00\x00\tl5:helloe',
+                         output.getvalue())
+
+    def test__send_disabled_retry(self):
+        debug.debug_flags.add('noretry')
+        output, vendor, smart_client = self.make_client_with_failing_medium()
+        smart_request = client._SmartClientRequest(smart_client, 'hello', ())
+        self.assertRaises(errors.ConnectionReset, smart_request._send, 3)
+        self.assertEqual(
+            [('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
+              ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
+             ('close',),
+            ],
+            vendor.calls)
+
+
 class LengthPrefixedBodyDecoder(tests.TestCase):
 
     # XXX: TODO: make accept_reading_trailer invoke translate_response or
