Merge lp:~jameinel/bzr/2.1-all-reconnect-819604 into lp:bzr/2.1
- 2.1-all-reconnect-819604
- Merge into 2.1
Status: | Rejected |
---|---|
Rejected by: | John A Meinel |
Proposed branch: | lp:~jameinel/bzr/2.1-all-reconnect-819604 |
Merge into: | lp:bzr/2.1 |
Diff against target: |
1807 lines (+970/-289) 11 files modified
NEWS (+5/-0) bzrlib/help_topics/en/debug-flags.txt (+2/-0) bzrlib/osutils.py (+71/-13) bzrlib/smart/client.py (+208/-86) bzrlib/smart/medium.py (+69/-62) bzrlib/smart/protocol.py (+15/-6) bzrlib/smart/request.py (+142/-98) bzrlib/tests/test_bundle.py (+6/-3) bzrlib/tests/test_osutils.py (+39/-0) bzrlib/tests/test_smart_request.py (+10/-0) bzrlib/tests/test_smart_transport.py (+403/-21) |
To merge this branch: | bzr merge lp:~jameinel/bzr/2.1-all-reconnect-819604 |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Richard Wilbur | Approve | ||
Review via email: mp+123901@code.launchpad.net |
Commit message
Bring the code to have clients reconnect-
Description of the change
This is a rollup of all of my client-reconnect patches against bzr-2.1 which is the version in our previous LTS Lucid.
The patch ends up pretty big, as I had originally split it up into a few patches for review. However, most of this is identical to what we have in bzr-2.5.
The specific differences from 2.5 at this point are:
1) No ConnectionTimeout code. This was meant for the server side, not the client. So no need to bring that code in now.
2) No AlreadyConnected client medium. This was designed around communicating with the SSH subprocess using socketpair. However, I didn't want to bring in all of that code (and it was even a source of one of the bugs we had to fix in 2.5).
3) A bunch of more verbs that we added to bzrlib/
This is where I targeted my original changes, so I went back to this level to finish the job.
I'm reasonably comfortable with this patch at this point. I wish it were a bit smaller for a stable series, but the code is quite well tested on newer versions.
John A Meinel (jameinel) wrote : | # |
sent to pqm by email
John A Meinel (jameinel) wrote : | # |
This has sat for too long, that I don't think its value has held. It doesn't land directly (some test suite failures, etc). And the patch is much bigger than the later versions of bzr because of the internal restructuring that has happened since 2.1. So I'm just rejecting this to get it out of the queue.
Preview Diff
1 | === modified file 'NEWS' | |||
2 | --- NEWS 2012-02-02 14:08:45 +0000 | |||
3 | +++ NEWS 2012-09-12 09:29:20 +0000 | |||
4 | @@ -43,6 +43,11 @@ | |||
5 | 43 | 43 | ||
6 | 44 | (John Arbash Meinel, #609187, #812928) | 44 | (John Arbash Meinel, #609187, #812928) |
7 | 45 | 45 | ||
8 | 46 | * Teach the bzr client how to reconnect if we get ``ConnectionReset`` | ||
9 | 47 | while making an RPC request. This doesn't handle all possible network | ||
10 | 48 | disconnects, but it should at least handle when the server is asked to | ||
11 | 49 | shutdown gracefully. (John Arbash Meinel, #819604) | ||
12 | 50 | |||
13 | 46 | 51 | ||
14 | 47 | Improvements | 52 | Improvements |
15 | 48 | ************ | 53 | ************ |
16 | 49 | 54 | ||
17 | === modified file 'bzrlib/help_topics/en/debug-flags.txt' | |||
18 | --- bzrlib/help_topics/en/debug-flags.txt 2010-01-05 04:30:07 +0000 | |||
19 | +++ bzrlib/help_topics/en/debug-flags.txt 2012-09-12 09:29:20 +0000 | |||
20 | @@ -24,6 +24,8 @@ | |||
21 | 24 | -Dindex Trace major index operations. | 24 | -Dindex Trace major index operations. |
22 | 25 | -Dknit Trace knit operations. | 25 | -Dknit Trace knit operations. |
23 | 26 | -Dlock Trace when lockdir locks are taken or released. | 26 | -Dlock Trace when lockdir locks are taken or released. |
24 | 27 | -Dnoretry If a connection is reset, fail immediately rather than | ||
25 | 28 | retrying the request. | ||
26 | 27 | -Dprogress Trace progress bar operations. | 29 | -Dprogress Trace progress bar operations. |
27 | 28 | -Dmerge Emit information for debugging merges. | 30 | -Dmerge Emit information for debugging merges. |
28 | 29 | -Dno_apport Don't use apport to report crashes. | 31 | -Dno_apport Don't use apport to report crashes. |
29 | 30 | 32 | ||
30 | === modified file 'bzrlib/osutils.py' | |||
31 | --- bzrlib/osutils.py 2010-05-27 04:00:01 +0000 | |||
32 | +++ bzrlib/osutils.py 2012-09-12 09:29:20 +0000 | |||
33 | @@ -40,6 +40,7 @@ | |||
34 | 40 | rmtree, | 40 | rmtree, |
35 | 41 | ) | 41 | ) |
36 | 42 | import signal | 42 | import signal |
37 | 43 | import socket | ||
38 | 43 | import subprocess | 44 | import subprocess |
39 | 44 | import tempfile | 45 | import tempfile |
40 | 45 | from tempfile import ( | 46 | from tempfile import ( |
41 | @@ -1929,40 +1930,97 @@ | |||
42 | 1929 | return socket.gethostname().decode(get_user_encoding()) | 1930 | return socket.gethostname().decode(get_user_encoding()) |
43 | 1930 | 1931 | ||
44 | 1931 | 1932 | ||
46 | 1932 | def recv_all(socket, bytes): | 1933 | # We must not read/write any more than 64k at a time from/to a socket so we |
47 | 1934 | # don't risk "no buffer space available" errors on some platforms. Windows in | ||
48 | 1935 | # particular is likely to throw WSAECONNABORTED or WSAENOBUFS if given too much | ||
49 | 1936 | # data at once. | ||
50 | 1937 | MAX_SOCKET_CHUNK = 64 * 1024 | ||
51 | 1938 | |||
52 | 1939 | _end_of_stream_errors = [errno.ECONNRESET, errno.EPIPE, errno.EINVAL] | ||
53 | 1940 | for _eno in ['WSAECONNRESET', 'WSAECONNABORTED']: | ||
54 | 1941 | _eno = getattr(errno, _eno, None) | ||
55 | 1942 | if _eno is not None: | ||
56 | 1943 | _end_of_stream_errors.append(_eno) | ||
57 | 1944 | del _eno | ||
58 | 1945 | |||
59 | 1946 | |||
60 | 1947 | def read_bytes_from_socket(sock, report_activity=None, | ||
61 | 1948 | max_read_size=MAX_SOCKET_CHUNK): | ||
62 | 1949 | """Read up to max_read_size of bytes from sock and notify of progress. | ||
63 | 1950 | |||
64 | 1951 | Translates "Connection reset by peer" into file-like EOF (return an | ||
65 | 1952 | empty string rather than raise an error), and repeats the recv if | ||
66 | 1953 | interrupted by a signal. | ||
67 | 1954 | """ | ||
68 | 1955 | while 1: | ||
69 | 1956 | try: | ||
70 | 1957 | bytes = sock.recv(max_read_size) | ||
71 | 1958 | except socket.error, e: | ||
72 | 1959 | eno = e.args[0] | ||
73 | 1960 | if eno in _end_of_stream_errors: | ||
74 | 1961 | # The connection was closed by the other side. Callers expect | ||
75 | 1962 | # an empty string to signal end-of-stream. | ||
76 | 1963 | return "" | ||
77 | 1964 | elif eno == errno.EINTR: | ||
78 | 1965 | # Retry the interrupted recv. | ||
79 | 1966 | continue | ||
80 | 1967 | raise | ||
81 | 1968 | else: | ||
82 | 1969 | if report_activity is not None: | ||
83 | 1970 | report_activity(len(bytes), 'read') | ||
84 | 1971 | return bytes | ||
85 | 1972 | |||
86 | 1973 | |||
87 | 1974 | def recv_all(socket, count): | ||
88 | 1933 | """Receive an exact number of bytes. | 1975 | """Receive an exact number of bytes. |
89 | 1934 | 1976 | ||
90 | 1935 | Regular Socket.recv() may return less than the requested number of bytes, | 1977 | Regular Socket.recv() may return less than the requested number of bytes, |
92 | 1936 | dependning on what's in the OS buffer. MSG_WAITALL is not available | 1978 | depending on what's in the OS buffer. MSG_WAITALL is not available |
93 | 1937 | on all platforms, but this should work everywhere. This will return | 1979 | on all platforms, but this should work everywhere. This will return |
94 | 1938 | less than the requested amount if the remote end closes. | 1980 | less than the requested amount if the remote end closes. |
95 | 1939 | 1981 | ||
96 | 1940 | This isn't optimized and is intended mostly for use in testing. | 1982 | This isn't optimized and is intended mostly for use in testing. |
97 | 1941 | """ | 1983 | """ |
98 | 1942 | b = '' | 1984 | b = '' |
101 | 1943 | while len(b) < bytes: | 1985 | while len(b) < count: |
102 | 1944 | new = until_no_eintr(socket.recv, bytes - len(b)) | 1986 | new = read_bytes_from_socket(socket, None, count - len(b)) |
103 | 1945 | if new == '': | 1987 | if new == '': |
104 | 1946 | break # eof | 1988 | break # eof |
105 | 1947 | b += new | 1989 | b += new |
106 | 1948 | return b | 1990 | return b |
107 | 1949 | 1991 | ||
108 | 1950 | 1992 | ||
110 | 1951 | def send_all(socket, bytes, report_activity=None): | 1993 | def send_all(sock, bytes, report_activity=None): |
111 | 1952 | """Send all bytes on a socket. | 1994 | """Send all bytes on a socket. |
112 | 1953 | 1995 | ||
115 | 1954 | Regular socket.sendall() can give socket error 10053 on Windows. This | 1996 | Breaks large blocks in smaller chunks to avoid buffering limitations on |
116 | 1955 | implementation sends no more than 64k at a time, which avoids this problem. | 1997 | some platforms, and catches EINTR which may be thrown if the send is |
117 | 1998 | interrupted by a signal. | ||
118 | 1999 | |||
119 | 2000 | This is preferred to socket.sendall(), because it avoids portability bugs | ||
120 | 2001 | and provides activity reporting. | ||
121 | 1956 | 2002 | ||
122 | 1957 | :param report_activity: Call this as bytes are read, see | 2003 | :param report_activity: Call this as bytes are read, see |
123 | 1958 | Transport._report_activity | 2004 | Transport._report_activity |
124 | 1959 | """ | 2005 | """ |
131 | 1960 | chunk_size = 2**16 | 2006 | sent_total = 0 |
132 | 1961 | for pos in xrange(0, len(bytes), chunk_size): | 2007 | byte_count = len(bytes) |
133 | 1962 | block = bytes[pos:pos+chunk_size] | 2008 | while sent_total < byte_count: |
134 | 1963 | if report_activity is not None: | 2009 | try: |
135 | 1964 | report_activity(len(block), 'write') | 2010 | sent = sock.send(buffer(bytes, sent_total, MAX_SOCKET_CHUNK)) |
136 | 1965 | until_no_eintr(socket.sendall, block) | 2011 | except (socket.error, IOError), e: |
137 | 2012 | if e.args[0] in _end_of_stream_errors: | ||
138 | 2013 | raise errors.ConnectionReset( | ||
139 | 2014 | "Error trying to write to socket", e) | ||
140 | 2015 | if e.args[0] != errno.EINTR: | ||
141 | 2016 | raise | ||
142 | 2017 | else: | ||
143 | 2018 | if sent == 0: | ||
144 | 2019 | raise errors.ConnectionReset('Sending to %s returned 0 bytes' | ||
145 | 2020 | % (sock,)) | ||
146 | 2021 | sent_total += sent | ||
147 | 2022 | if report_activity is not None: | ||
148 | 2023 | report_activity(sent, 'write') | ||
149 | 1966 | 2024 | ||
150 | 1967 | 2025 | ||
151 | 1968 | def dereference_path(path): | 2026 | def dereference_path(path): |
152 | 1969 | 2027 | ||
153 | === modified file 'bzrlib/smart/client.py' | |||
154 | --- bzrlib/smart/client.py 2010-02-17 17:11:16 +0000 | |||
155 | +++ bzrlib/smart/client.py 2012-09-12 09:29:20 +0000 | |||
156 | @@ -14,12 +14,18 @@ | |||
157 | 14 | # along with this program; if not, write to the Free Software | 14 | # along with this program; if not, write to the Free Software |
158 | 15 | # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA | 15 | # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA |
159 | 16 | 16 | ||
160 | 17 | from bzrlib import lazy_import | ||
161 | 18 | lazy_import.lazy_import(globals(), """ | ||
162 | 19 | from bzrlib.smart import request as _mod_request | ||
163 | 20 | """) | ||
164 | 21 | |||
165 | 17 | import bzrlib | 22 | import bzrlib |
166 | 18 | from bzrlib.smart import message, protocol | 23 | from bzrlib.smart import message, protocol |
167 | 19 | from bzrlib.trace import warning | ||
168 | 20 | from bzrlib import ( | 24 | from bzrlib import ( |
169 | 25 | debug, | ||
170 | 21 | errors, | 26 | errors, |
171 | 22 | hooks, | 27 | hooks, |
172 | 28 | trace, | ||
173 | 23 | ) | 29 | ) |
174 | 24 | 30 | ||
175 | 25 | 31 | ||
176 | @@ -39,93 +45,12 @@ | |||
177 | 39 | def __repr__(self): | 45 | def __repr__(self): |
178 | 40 | return '%s(%r)' % (self.__class__.__name__, self._medium) | 46 | return '%s(%r)' % (self.__class__.__name__, self._medium) |
179 | 41 | 47 | ||
180 | 42 | def _send_request(self, protocol_version, method, args, body=None, | ||
181 | 43 | readv_body=None, body_stream=None): | ||
182 | 44 | encoder, response_handler = self._construct_protocol( | ||
183 | 45 | protocol_version) | ||
184 | 46 | encoder.set_headers(self._headers) | ||
185 | 47 | if body is not None: | ||
186 | 48 | if readv_body is not None: | ||
187 | 49 | raise AssertionError( | ||
188 | 50 | "body and readv_body are mutually exclusive.") | ||
189 | 51 | if body_stream is not None: | ||
190 | 52 | raise AssertionError( | ||
191 | 53 | "body and body_stream are mutually exclusive.") | ||
192 | 54 | encoder.call_with_body_bytes((method, ) + args, body) | ||
193 | 55 | elif readv_body is not None: | ||
194 | 56 | if body_stream is not None: | ||
195 | 57 | raise AssertionError( | ||
196 | 58 | "readv_body and body_stream are mutually exclusive.") | ||
197 | 59 | encoder.call_with_body_readv_array((method, ) + args, readv_body) | ||
198 | 60 | elif body_stream is not None: | ||
199 | 61 | encoder.call_with_body_stream((method, ) + args, body_stream) | ||
200 | 62 | else: | ||
201 | 63 | encoder.call(method, *args) | ||
202 | 64 | return response_handler | ||
203 | 65 | |||
204 | 66 | def _run_call_hooks(self, method, args, body, readv_body): | ||
205 | 67 | if not _SmartClient.hooks['call']: | ||
206 | 68 | return | ||
207 | 69 | params = CallHookParams(method, args, body, readv_body, self._medium) | ||
208 | 70 | for hook in _SmartClient.hooks['call']: | ||
209 | 71 | hook(params) | ||
210 | 72 | |||
211 | 73 | def _call_and_read_response(self, method, args, body=None, readv_body=None, | 48 | def _call_and_read_response(self, method, args, body=None, readv_body=None, |
212 | 74 | body_stream=None, expect_response_body=True): | 49 | body_stream=None, expect_response_body=True): |
267 | 75 | self._run_call_hooks(method, args, body, readv_body) | 50 | request = _SmartClientRequest(self, method, args, body=body, |
268 | 76 | if self._medium._protocol_version is not None: | 51 | readv_body=readv_body, body_stream=body_stream, |
269 | 77 | response_handler = self._send_request( | 52 | expect_response_body=expect_response_body) |
270 | 78 | self._medium._protocol_version, method, args, body=body, | 53 | return request.call_and_read_response() |
217 | 79 | readv_body=readv_body, body_stream=body_stream) | ||
218 | 80 | return (response_handler.read_response_tuple( | ||
219 | 81 | expect_body=expect_response_body), | ||
220 | 82 | response_handler) | ||
221 | 83 | else: | ||
222 | 84 | for protocol_version in [3, 2]: | ||
223 | 85 | if protocol_version == 2: | ||
224 | 86 | # If v3 doesn't work, the remote side is older than 1.6. | ||
225 | 87 | self._medium._remember_remote_is_before((1, 6)) | ||
226 | 88 | response_handler = self._send_request( | ||
227 | 89 | protocol_version, method, args, body=body, | ||
228 | 90 | readv_body=readv_body, body_stream=body_stream) | ||
229 | 91 | try: | ||
230 | 92 | response_tuple = response_handler.read_response_tuple( | ||
231 | 93 | expect_body=expect_response_body) | ||
232 | 94 | except errors.UnexpectedProtocolVersionMarker, err: | ||
233 | 95 | # TODO: We could recover from this without disconnecting if | ||
234 | 96 | # we recognise the protocol version. | ||
235 | 97 | warning( | ||
236 | 98 | 'Server does not understand Bazaar network protocol %d,' | ||
237 | 99 | ' reconnecting. (Upgrade the server to avoid this.)' | ||
238 | 100 | % (protocol_version,)) | ||
239 | 101 | self._medium.disconnect() | ||
240 | 102 | continue | ||
241 | 103 | except errors.ErrorFromSmartServer: | ||
242 | 104 | # If we received an error reply from the server, then it | ||
243 | 105 | # must be ok with this protocol version. | ||
244 | 106 | self._medium._protocol_version = protocol_version | ||
245 | 107 | raise | ||
246 | 108 | else: | ||
247 | 109 | self._medium._protocol_version = protocol_version | ||
248 | 110 | return response_tuple, response_handler | ||
249 | 111 | raise errors.SmartProtocolError( | ||
250 | 112 | 'Server is not a Bazaar server: ' + str(err)) | ||
251 | 113 | |||
252 | 114 | def _construct_protocol(self, version): | ||
253 | 115 | request = self._medium.get_request() | ||
254 | 116 | if version == 3: | ||
255 | 117 | request_encoder = protocol.ProtocolThreeRequester(request) | ||
256 | 118 | response_handler = message.ConventionalResponseHandler() | ||
257 | 119 | response_proto = protocol.ProtocolThreeDecoder( | ||
258 | 120 | response_handler, expect_version_marker=True) | ||
259 | 121 | response_handler.setProtoAndMediumRequest(response_proto, request) | ||
260 | 122 | elif version == 2: | ||
261 | 123 | request_encoder = protocol.SmartClientRequestProtocolTwo(request) | ||
262 | 124 | response_handler = request_encoder | ||
263 | 125 | else: | ||
264 | 126 | request_encoder = protocol.SmartClientRequestProtocolOne(request) | ||
265 | 127 | response_handler = request_encoder | ||
266 | 128 | return request_encoder, response_handler | ||
271 | 129 | 54 | ||
272 | 130 | def call(self, method, *args): | 55 | def call(self, method, *args): |
273 | 131 | """Call a method on the remote server.""" | 56 | """Call a method on the remote server.""" |
274 | @@ -191,6 +116,203 @@ | |||
275 | 191 | return self._medium.remote_path_from_transport(transport) | 116 | return self._medium.remote_path_from_transport(transport) |
276 | 192 | 117 | ||
277 | 193 | 118 | ||
278 | 119 | class _SmartClientRequest(object): | ||
279 | 120 | """Encapsulate the logic for a single request. | ||
280 | 121 | |||
281 | 122 | This class handles things like reconnecting and sending the request a | ||
282 | 123 | second time when the connection is reset in the middle. It also handles the | ||
283 | 124 | multiple requests that get made if we don't know what protocol the server | ||
284 | 125 | supports yet. | ||
285 | 126 | |||
286 | 127 | Generally, you build up one of these objects, passing in the arguments that | ||
287 | 128 | you want to send to the server, and then use 'call_and_read_response' to | ||
288 | 129 | get the response from the server. | ||
289 | 130 | """ | ||
290 | 131 | |||
291 | 132 | def __init__(self, client, method, args, body=None, readv_body=None, | ||
292 | 133 | body_stream=None, expect_response_body=True): | ||
293 | 134 | self.client = client | ||
294 | 135 | self.method = method | ||
295 | 136 | self.args = args | ||
296 | 137 | self.body = body | ||
297 | 138 | self.readv_body = readv_body | ||
298 | 139 | self.body_stream = body_stream | ||
299 | 140 | self.expect_response_body = expect_response_body | ||
300 | 141 | |||
301 | 142 | def call_and_read_response(self): | ||
302 | 143 | """Send the request to the server, and read the initial response. | ||
303 | 144 | |||
304 | 145 | This doesn't read all of the body content of the response, instead it | ||
305 | 146 | returns (response_tuple, response_handler). response_tuple is the 'ok', | ||
306 | 147 | or 'error' information, and 'response_handler' can be used to get the | ||
307 | 148 | content stream out. | ||
308 | 149 | """ | ||
309 | 150 | self._run_call_hooks() | ||
310 | 151 | protocol_version = self.client._medium._protocol_version | ||
311 | 152 | if protocol_version is None: | ||
312 | 153 | return self._call_determining_protocol_version() | ||
313 | 154 | else: | ||
314 | 155 | return self._call(protocol_version) | ||
315 | 156 | |||
316 | 157 | def _is_safe_to_send_twice(self): | ||
317 | 158 | """Check if the current method is re-entrant safe.""" | ||
318 | 159 | if self.body_stream is not None or 'noretry' in debug.debug_flags: | ||
319 | 160 | # We can't restart a body stream that has already been consumed. | ||
320 | 161 | return False | ||
321 | 162 | request_type = _mod_request.request_handlers.get_info(self.method) | ||
322 | 163 | if request_type in ('read', 'idem', 'semi'): | ||
323 | 164 | return True | ||
324 | 165 | # If we have gotten this far, 'stream' cannot be retried, because we | ||
325 | 166 | # already consumed the local stream. | ||
326 | 167 | if request_type in ('semivfs', 'mutate', 'stream'): | ||
327 | 168 | return False | ||
328 | 169 | trace.mutter('Unknown request type: %s for method %s' | ||
329 | 170 | % (request_type, self.method)) | ||
330 | 171 | return False | ||
331 | 172 | |||
332 | 173 | def _run_call_hooks(self): | ||
333 | 174 | if not _SmartClient.hooks['call']: | ||
334 | 175 | return | ||
335 | 176 | params = CallHookParams(self.method, self.args, self.body, | ||
336 | 177 | self.readv_body, self.client._medium) | ||
337 | 178 | for hook in _SmartClient.hooks['call']: | ||
338 | 179 | hook(params) | ||
339 | 180 | |||
340 | 181 | def _call(self, protocol_version): | ||
341 | 182 | """We know the protocol version. | ||
342 | 183 | |||
343 | 184 | So this just sends the request, and then reads the response. This is | ||
344 | 185 | where the code will be to retry requests if the connection is closed. | ||
345 | 186 | """ | ||
346 | 187 | response_handler = self._send(protocol_version) | ||
347 | 188 | try: | ||
348 | 189 | response_tuple = response_handler.read_response_tuple( | ||
349 | 190 | expect_body=self.expect_response_body) | ||
350 | 191 | except errors.ConnectionReset, e: | ||
351 | 192 | self.client._medium.reset() | ||
352 | 193 | if not self._is_safe_to_send_twice(): | ||
353 | 194 | raise | ||
354 | 195 | trace.warning('ConnectionReset reading response for %r, retrying' | ||
355 | 196 | % (self.method,)) | ||
356 | 197 | trace.log_exception_quietly() | ||
357 | 198 | encoder, response_handler = self._construct_protocol( | ||
358 | 199 | protocol_version) | ||
359 | 200 | self._send_no_retry(encoder) | ||
360 | 201 | response_tuple = response_handler.read_response_tuple( | ||
361 | 202 | expect_body=self.expect_response_body) | ||
362 | 203 | return (response_tuple, response_handler) | ||
363 | 204 | |||
364 | 205 | def _call_determining_protocol_version(self): | ||
365 | 206 | """Determine what protocol the remote server supports. | ||
366 | 207 | |||
367 | 208 | We do this by placing a request in the most recent protocol, and | ||
368 | 209 | handling the UnexpectedProtocolVersionMarker from the server. | ||
369 | 210 | """ | ||
370 | 211 | for protocol_version in [3, 2]: | ||
371 | 212 | if protocol_version == 2: | ||
372 | 213 | # If v3 doesn't work, the remote side is older than 1.6. | ||
373 | 214 | self.client._medium._remember_remote_is_before((1, 6)) | ||
374 | 215 | try: | ||
375 | 216 | response_tuple, response_handler = self._call(protocol_version) | ||
376 | 217 | except errors.UnexpectedProtocolVersionMarker, err: | ||
377 | 218 | # TODO: We could recover from this without disconnecting if | ||
378 | 219 | # we recognise the protocol version. | ||
379 | 220 | trace.warning( | ||
380 | 221 | 'Server does not understand Bazaar network protocol %d,' | ||
381 | 222 | ' reconnecting. (Upgrade the server to avoid this.)' | ||
382 | 223 | % (protocol_version,)) | ||
383 | 224 | self.client._medium.disconnect() | ||
384 | 225 | continue | ||
385 | 226 | except errors.ErrorFromSmartServer: | ||
386 | 227 | # If we received an error reply from the server, then it | ||
387 | 228 | # must be ok with this protocol version. | ||
388 | 229 | self.client._medium._protocol_version = protocol_version | ||
389 | 230 | raise | ||
390 | 231 | else: | ||
391 | 232 | self.client._medium._protocol_version = protocol_version | ||
392 | 233 | return response_tuple, response_handler | ||
393 | 234 | raise errors.SmartProtocolError( | ||
394 | 235 | 'Server is not a Bazaar server: ' + str(err)) | ||
395 | 236 | |||
396 | 237 | def _construct_protocol(self, version): | ||
397 | 238 | """Build the encoding stack for a given protocol version.""" | ||
398 | 239 | request = self.client._medium.get_request() | ||
399 | 240 | if version == 3: | ||
400 | 241 | request_encoder = protocol.ProtocolThreeRequester(request) | ||
401 | 242 | response_handler = message.ConventionalResponseHandler() | ||
402 | 243 | response_proto = protocol.ProtocolThreeDecoder( | ||
403 | 244 | response_handler, expect_version_marker=True) | ||
404 | 245 | response_handler.setProtoAndMediumRequest(response_proto, request) | ||
405 | 246 | elif version == 2: | ||
406 | 247 | request_encoder = protocol.SmartClientRequestProtocolTwo(request) | ||
407 | 248 | response_handler = request_encoder | ||
408 | 249 | else: | ||
409 | 250 | request_encoder = protocol.SmartClientRequestProtocolOne(request) | ||
410 | 251 | response_handler = request_encoder | ||
411 | 252 | return request_encoder, response_handler | ||
412 | 253 | |||
413 | 254 | def _send(self, protocol_version): | ||
414 | 255 | """Encode the request, and send it to the server. | ||
415 | 256 | |||
416 | 257 | This will retry a request if we get a ConnectionReset while sending the | ||
417 | 258 | request to the server. (Unless we have a body_stream that we have | ||
418 | 259 | already started consuming, since we can't restart body_streams) | ||
419 | 260 | |||
420 | 261 | :return: response_handler as defined by _construct_protocol | ||
421 | 262 | """ | ||
422 | 263 | encoder, response_handler = self._construct_protocol(protocol_version) | ||
423 | 264 | try: | ||
424 | 265 | self._send_no_retry(encoder) | ||
425 | 266 | except errors.ConnectionReset, e: | ||
426 | 267 | # If we fail during the _send_no_retry phase, then we can | ||
427 | 268 | # be confident that the server did not get our request, because we | ||
428 | 269 | # haven't started waiting for the reply yet. So try the request | ||
429 | 270 | # again. We only issue a single retry, because if the connection | ||
430 | 271 | # really is down, there is no reason to loop endlessly. | ||
431 | 272 | |||
432 | 273 | # Connection is dead, so close our end of it. | ||
433 | 274 | self.client._medium.reset() | ||
434 | 275 | if (('noretry' in debug.debug_flags) | ||
435 | 276 | or (self.body_stream is not None | ||
436 | 277 | and encoder.body_stream_started)): | ||
437 | 278 | # We can't restart a body_stream that has been partially | ||
438 | 279 | # consumed, so we don't retry. | ||
439 | 280 | # Note: We don't have to worry about | ||
440 | 281 | # SmartClientRequestProtocolOne or Two, because they don't | ||
441 | 282 | # support client-side body streams. | ||
442 | 283 | raise | ||
443 | 284 | trace.warning('ConnectionReset calling %r, retrying' | ||
444 | 285 | % (self.method,)) | ||
445 | 286 | trace.log_exception_quietly() | ||
446 | 287 | encoder, response_handler = self._construct_protocol( | ||
447 | 288 | protocol_version) | ||
448 | 289 | self._send_no_retry(encoder) | ||
449 | 290 | return response_handler | ||
450 | 291 | |||
451 | 292 | def _send_no_retry(self, encoder): | ||
452 | 293 | """Just encode the request and try to send it.""" | ||
453 | 294 | encoder.set_headers(self.client._headers) | ||
454 | 295 | if self.body is not None: | ||
455 | 296 | if self.readv_body is not None: | ||
456 | 297 | raise AssertionError( | ||
457 | 298 | "body and readv_body are mutually exclusive.") | ||
458 | 299 | if self.body_stream is not None: | ||
459 | 300 | raise AssertionError( | ||
460 | 301 | "body and body_stream are mutually exclusive.") | ||
461 | 302 | encoder.call_with_body_bytes((self.method, ) + self.args, self.body) | ||
462 | 303 | elif self.readv_body is not None: | ||
463 | 304 | if self.body_stream is not None: | ||
464 | 305 | raise AssertionError( | ||
465 | 306 | "readv_body and body_stream are mutually exclusive.") | ||
466 | 307 | encoder.call_with_body_readv_array((self.method, ) + self.args, | ||
467 | 308 | self.readv_body) | ||
468 | 309 | elif self.body_stream is not None: | ||
469 | 310 | encoder.call_with_body_stream((self.method, ) + self.args, | ||
470 | 311 | self.body_stream) | ||
471 | 312 | else: | ||
472 | 313 | encoder.call(self.method, *self.args) | ||
473 | 314 | |||
474 | 315 | |||
475 | 194 | class SmartClientHooks(hooks.Hooks): | 316 | class SmartClientHooks(hooks.Hooks): |
476 | 195 | 317 | ||
477 | 196 | def __init__(self): | 318 | def __init__(self): |
478 | 197 | 319 | ||
479 | === modified file 'bzrlib/smart/medium.py' | |||
480 | --- bzrlib/smart/medium.py 2010-04-27 06:40:37 +0000 | |||
481 | +++ bzrlib/smart/medium.py 2012-09-12 09:29:20 +0000 | |||
482 | @@ -1,4 +1,4 @@ | |||
484 | 1 | # Copyright (C) 2006-2010 Canonical Ltd | 1 | # Copyright (C) 2006-2012 Canonical Ltd |
485 | 2 | # | 2 | # |
486 | 3 | # This program is free software; you can redistribute it and/or modify | 3 | # This program is free software; you can redistribute it and/or modify |
487 | 4 | # it under the terms of the GNU General Public License as published by | 4 | # it under the terms of the GNU General Public License as published by |
488 | @@ -50,11 +50,11 @@ | |||
489 | 50 | #usually already imported, and getting IllegalScoperReplacer on it here. | 50 | #usually already imported, and getting IllegalScoperReplacer on it here. |
490 | 51 | from bzrlib import osutils | 51 | from bzrlib import osutils |
491 | 52 | 52 | ||
497 | 53 | # We must not read any more than 64k at a time so we don't risk "no buffer | 53 | # Throughout this module buffer size parameters are either limited to be at |
498 | 54 | # space available" errors on some platforms. Windows in particular is likely | 54 | # most _MAX_READ_SIZE, or are ignored and _MAX_READ_SIZE is used instead. |
499 | 55 | # to give error 10053 or 10055 if we read more than 64k from a socket. | 55 | # For this module's purposes, MAX_SOCKET_CHUNK is a reasonable size for reads |
500 | 56 | _MAX_READ_SIZE = 64 * 1024 | 56 | # from non-sockets as well. |
501 | 57 | 57 | _MAX_READ_SIZE = osutils.MAX_SOCKET_CHUNK | |
502 | 58 | 58 | ||
503 | 59 | def _get_protocol_factory_for_bytes(bytes): | 59 | def _get_protocol_factory_for_bytes(bytes): |
504 | 60 | """Determine the right protocol factory for 'bytes'. | 60 | """Determine the right protocol factory for 'bytes'. |
505 | @@ -178,6 +178,14 @@ | |||
506 | 178 | ui.ui_factory.report_transport_activity(self, bytes, direction) | 178 | ui.ui_factory.report_transport_activity(self, bytes, direction) |
507 | 179 | 179 | ||
508 | 180 | 180 | ||
509 | 181 | _bad_file_descriptor = (errno.EBADF,) | ||
510 | 182 | if sys.platform == 'win32': | ||
511 | 183 | # Given on Windows if you pass a closed socket to select.select. Probably | ||
512 | 184 | # also given if you pass a file handle to select. | ||
513 | 185 | WSAENOTSOCK = 10038 | ||
514 | 186 | _bad_file_descriptor += (WSAENOTSOCK,) | ||
515 | 187 | |||
516 | 188 | |||
517 | 181 | class SmartServerStreamMedium(SmartMedium): | 189 | class SmartServerStreamMedium(SmartMedium): |
518 | 182 | """Handles smart commands coming over a stream. | 190 | """Handles smart commands coming over a stream. |
519 | 183 | 191 | ||
520 | @@ -241,6 +249,8 @@ | |||
521 | 241 | 249 | ||
522 | 242 | :param protocol: a SmartServerRequestProtocol. | 250 | :param protocol: a SmartServerRequestProtocol. |
523 | 243 | """ | 251 | """ |
524 | 252 | if protocol is None: | ||
525 | 253 | return | ||
526 | 244 | try: | 254 | try: |
527 | 245 | self._serve_one_request_unguarded(protocol) | 255 | self._serve_one_request_unguarded(protocol) |
528 | 246 | except KeyboardInterrupt: | 256 | except KeyboardInterrupt: |
529 | @@ -276,9 +286,9 @@ | |||
530 | 276 | def _serve_one_request_unguarded(self, protocol): | 286 | def _serve_one_request_unguarded(self, protocol): |
531 | 277 | while protocol.next_read_size(): | 287 | while protocol.next_read_size(): |
532 | 278 | # We can safely try to read large chunks. If there is less data | 288 | # We can safely try to read large chunks. If there is less data |
536 | 279 | # than _MAX_READ_SIZE ready, the socket wil just return a short | 289 | # than MAX_SOCKET_CHUNK ready, the socket will just return a |
537 | 280 | # read immediately rather than block. | 290 | # short read immediately rather than block. |
538 | 281 | bytes = self.read_bytes(_MAX_READ_SIZE) | 291 | bytes = self.read_bytes(osutils.MAX_SOCKET_CHUNK) |
539 | 282 | if bytes == '': | 292 | if bytes == '': |
540 | 283 | self.finished = True | 293 | self.finished = True |
541 | 284 | return | 294 | return |
542 | @@ -287,13 +297,13 @@ | |||
543 | 287 | self._push_back(protocol.unused_data) | 297 | self._push_back(protocol.unused_data) |
544 | 288 | 298 | ||
545 | 289 | def _read_bytes(self, desired_count): | 299 | def _read_bytes(self, desired_count): |
548 | 290 | return _read_bytes_from_socket( | 300 | return osutils.read_bytes_from_socket( |
549 | 291 | self.socket.recv, desired_count, self._report_activity) | 301 | self.socket, self._report_activity) |
550 | 292 | 302 | ||
551 | 293 | def terminate_due_to_error(self): | 303 | def terminate_due_to_error(self): |
552 | 294 | # TODO: This should log to a server log file, but no such thing | 304 | # TODO: This should log to a server log file, but no such thing |
553 | 295 | # exists yet. Andrew Bennetts 2006-09-29. | 305 | # exists yet. Andrew Bennetts 2006-09-29. |
555 | 296 | osutils.until_no_eintr(self.socket.close) | 306 | self.socket.close() |
556 | 297 | self.finished = True | 307 | self.finished = True |
557 | 298 | 308 | ||
558 | 299 | def _write_out(self, bytes): | 309 | def _write_out(self, bytes): |
559 | @@ -345,16 +355,16 @@ | |||
560 | 345 | protocol.accept_bytes(bytes) | 355 | protocol.accept_bytes(bytes) |
561 | 346 | 356 | ||
562 | 347 | def _read_bytes(self, desired_count): | 357 | def _read_bytes(self, desired_count): |
564 | 348 | return osutils.until_no_eintr(self._in.read, desired_count) | 358 | return self._in.read(desired_count) |
565 | 349 | 359 | ||
566 | 350 | def terminate_due_to_error(self): | 360 | def terminate_due_to_error(self): |
567 | 351 | # TODO: This should log to a server log file, but no such thing | 361 | # TODO: This should log to a server log file, but no such thing |
568 | 352 | # exists yet. Andrew Bennetts 2006-09-29. | 362 | # exists yet. Andrew Bennetts 2006-09-29. |
570 | 353 | osutils.until_no_eintr(self._out.close) | 363 | self._out.close() |
571 | 354 | self.finished = True | 364 | self.finished = True |
572 | 355 | 365 | ||
573 | 356 | def _write_out(self, bytes): | 366 | def _write_out(self, bytes): |
575 | 357 | osutils.until_no_eintr(self._out.write, bytes) | 367 | self._out.write(bytes) |
576 | 358 | 368 | ||
577 | 359 | 369 | ||
578 | 360 | class SmartClientMediumRequest(object): | 370 | class SmartClientMediumRequest(object): |
579 | @@ -712,6 +722,14 @@ | |||
580 | 712 | """ | 722 | """ |
581 | 713 | return SmartClientStreamMediumRequest(self) | 723 | return SmartClientStreamMediumRequest(self) |
582 | 714 | 724 | ||
583 | 725 | def reset(self): | ||
584 | 726 | """We have been disconnected, reset current state. | ||
585 | 727 | |||
586 | 728 | This resets things like _current_request and connected state. | ||
587 | 729 | """ | ||
588 | 730 | self.disconnect() | ||
589 | 731 | self._current_request = None | ||
590 | 732 | |||
591 | 715 | 733 | ||
592 | 716 | class SmartSimplePipesClientMedium(SmartClientStreamMedium): | 734 | class SmartSimplePipesClientMedium(SmartClientStreamMedium): |
593 | 717 | """A client medium using simple pipes. | 735 | """A client medium using simple pipes. |
594 | @@ -726,22 +744,35 @@ | |||
595 | 726 | 744 | ||
596 | 727 | def _accept_bytes(self, bytes): | 745 | def _accept_bytes(self, bytes): |
597 | 728 | """See SmartClientStreamMedium.accept_bytes.""" | 746 | """See SmartClientStreamMedium.accept_bytes.""" |
599 | 729 | osutils.until_no_eintr(self._writeable_pipe.write, bytes) | 747 | try: |
600 | 748 | self._writeable_pipe.write(bytes) | ||
601 | 749 | except IOError, e: | ||
602 | 750 | if e.errno in (errno.EINVAL, errno.EPIPE): | ||
603 | 751 | raise errors.ConnectionReset( | ||
604 | 752 | "Error trying to write to subprocess", e) | ||
605 | 753 | raise | ||
606 | 730 | self._report_activity(len(bytes), 'write') | 754 | self._report_activity(len(bytes), 'write') |
607 | 731 | 755 | ||
608 | 732 | def _flush(self): | 756 | def _flush(self): |
609 | 733 | """See SmartClientStreamMedium._flush().""" | 757 | """See SmartClientStreamMedium._flush().""" |
611 | 734 | osutils.until_no_eintr(self._writeable_pipe.flush) | 758 | # Note: If flush were to fail, we'd like to raise ConnectionReset, etc. |
612 | 759 | # However, testing shows that even when the child process is | ||
613 | 760 | # gone, this doesn't error. | ||
614 | 761 | self._writeable_pipe.flush() | ||
615 | 735 | 762 | ||
616 | 736 | def _read_bytes(self, count): | 763 | def _read_bytes(self, count): |
617 | 737 | """See SmartClientStreamMedium._read_bytes.""" | 764 | """See SmartClientStreamMedium._read_bytes.""" |
619 | 738 | bytes = osutils.until_no_eintr(self._readable_pipe.read, count) | 765 | bytes_to_read = min(count, _MAX_READ_SIZE) |
620 | 766 | bytes = self._readable_pipe.read(bytes_to_read) | ||
621 | 739 | self._report_activity(len(bytes), 'read') | 767 | self._report_activity(len(bytes), 'read') |
622 | 740 | return bytes | 768 | return bytes |
623 | 741 | 769 | ||
624 | 742 | 770 | ||
625 | 743 | class SmartSSHClientMedium(SmartClientStreamMedium): | 771 | class SmartSSHClientMedium(SmartClientStreamMedium): |
627 | 744 | """A client medium using SSH.""" | 772 | """A client medium using SSH. |
628 | 773 | |||
629 | 774 | It delegates IO to a SmartSimplePipesClientMedium. | ||
630 | 775 | """ | ||
631 | 745 | 776 | ||
632 | 746 | def __init__(self, host, port=None, username=None, password=None, | 777 | def __init__(self, host, port=None, username=None, password=None, |
633 | 747 | base=None, vendor=None, bzr_remote_path=None): | 778 | base=None, vendor=None, bzr_remote_path=None): |
634 | @@ -750,11 +781,11 @@ | |||
635 | 750 | :param vendor: An optional override for the ssh vendor to use. See | 781 | :param vendor: An optional override for the ssh vendor to use. See |
636 | 751 | bzrlib.transport.ssh for details on ssh vendors. | 782 | bzrlib.transport.ssh for details on ssh vendors. |
637 | 752 | """ | 783 | """ |
638 | 753 | self._connected = False | ||
639 | 754 | self._host = host | 784 | self._host = host |
640 | 755 | self._password = password | 785 | self._password = password |
641 | 756 | self._port = port | 786 | self._port = port |
642 | 757 | self._username = username | 787 | self._username = username |
643 | 788 | self._real_medium = None | ||
644 | 758 | # for the benefit of progress making a short description of this | 789 | # for the benefit of progress making a short description of this |
645 | 759 | # transport | 790 | # transport |
646 | 760 | self._scheme = 'bzr+ssh' | 791 | self._scheme = 'bzr+ssh' |
647 | @@ -762,11 +793,9 @@ | |||
648 | 762 | # _DebugCounter so we have to store all the values used in our repr | 793 | # _DebugCounter so we have to store all the values used in our repr |
649 | 763 | # method before calling the super init. | 794 | # method before calling the super init. |
650 | 764 | SmartClientStreamMedium.__init__(self, base) | 795 | SmartClientStreamMedium.__init__(self, base) |
651 | 765 | self._read_from = None | ||
652 | 766 | self._ssh_connection = None | ||
653 | 767 | self._vendor = vendor | 796 | self._vendor = vendor |
654 | 768 | self._write_to = None | ||
655 | 769 | self._bzr_remote_path = bzr_remote_path | 797 | self._bzr_remote_path = bzr_remote_path |
656 | 798 | self._ssh_connection = None | ||
657 | 770 | 799 | ||
658 | 771 | def __repr__(self): | 800 | def __repr__(self): |
659 | 772 | if self._port is None: | 801 | if self._port is None: |
660 | @@ -783,21 +812,20 @@ | |||
661 | 783 | def _accept_bytes(self, bytes): | 812 | def _accept_bytes(self, bytes): |
662 | 784 | """See SmartClientStreamMedium.accept_bytes.""" | 813 | """See SmartClientStreamMedium.accept_bytes.""" |
663 | 785 | self._ensure_connection() | 814 | self._ensure_connection() |
666 | 786 | osutils.until_no_eintr(self._write_to.write, bytes) | 815 | self._real_medium.accept_bytes(bytes) |
665 | 787 | self._report_activity(len(bytes), 'write') | ||
667 | 788 | 816 | ||
668 | 789 | def disconnect(self): | 817 | def disconnect(self): |
669 | 790 | """See SmartClientMedium.disconnect().""" | 818 | """See SmartClientMedium.disconnect().""" |
676 | 791 | if not self._connected: | 819 | if self._real_medium is not None: |
677 | 792 | return | 820 | self._real_medium.disconnect() |
678 | 793 | osutils.until_no_eintr(self._read_from.close) | 821 | self._real_medium = None |
679 | 794 | osutils.until_no_eintr(self._write_to.close) | 822 | if self._ssh_connection is not None: |
680 | 795 | self._ssh_connection.close() | 823 | self._ssh_connection.close() |
681 | 796 | self._connected = False | 824 | self._ssh_connection = None |
682 | 797 | 825 | ||
683 | 798 | def _ensure_connection(self): | 826 | def _ensure_connection(self): |
684 | 799 | """Connect this medium if not already connected.""" | 827 | """Connect this medium if not already connected.""" |
686 | 800 | if self._connected: | 828 | if self._real_medium is not None: |
687 | 801 | return | 829 | return |
688 | 802 | if self._vendor is None: | 830 | if self._vendor is None: |
689 | 803 | vendor = ssh._get_ssh_vendor() | 831 | vendor = ssh._get_ssh_vendor() |
690 | @@ -807,22 +835,19 @@ | |||
691 | 807 | self._password, self._host, self._port, | 835 | self._password, self._host, self._port, |
692 | 808 | command=[self._bzr_remote_path, 'serve', '--inet', | 836 | command=[self._bzr_remote_path, 'serve', '--inet', |
693 | 809 | '--directory=/', '--allow-writes']) | 837 | '--directory=/', '--allow-writes']) |
697 | 810 | self._read_from, self._write_to = \ | 838 | read_from, write_to = self._ssh_connection.get_filelike_channels() |
698 | 811 | self._ssh_connection.get_filelike_channels() | 839 | self._real_medium = SmartSimplePipesClientMedium( |
699 | 812 | self._connected = True | 840 | read_from, write_to, self.base) |
700 | 813 | 841 | ||
701 | 814 | def _flush(self): | 842 | def _flush(self): |
702 | 815 | """See SmartClientStreamMedium._flush().""" | 843 | """See SmartClientStreamMedium._flush().""" |
704 | 816 | self._write_to.flush() | 844 | self._real_medium._flush() |
705 | 817 | 845 | ||
706 | 818 | def _read_bytes(self, count): | 846 | def _read_bytes(self, count): |
707 | 819 | """See SmartClientStreamMedium.read_bytes.""" | 847 | """See SmartClientStreamMedium.read_bytes.""" |
709 | 820 | if not self._connected: | 848 | if self._real_medium is None: |
710 | 821 | raise errors.MediumNotConnected(self) | 849 | raise errors.MediumNotConnected(self) |
715 | 822 | bytes_to_read = min(count, _MAX_READ_SIZE) | 850 | return self._real_medium.read_bytes(count) |
712 | 823 | bytes = osutils.until_no_eintr(self._read_from.read, bytes_to_read) | ||
713 | 824 | self._report_activity(len(bytes), 'read') | ||
714 | 825 | return bytes | ||
716 | 826 | 851 | ||
717 | 827 | 852 | ||
718 | 828 | # Port 4155 is the default port for bzr://, registered with IANA. | 853 | # Port 4155 is the default port for bzr://, registered with IANA. |
719 | @@ -850,7 +875,7 @@ | |||
720 | 850 | """See SmartClientMedium.disconnect().""" | 875 | """See SmartClientMedium.disconnect().""" |
721 | 851 | if not self._connected: | 876 | if not self._connected: |
722 | 852 | return | 877 | return |
724 | 853 | osutils.until_no_eintr(self._socket.close) | 878 | self._socket.close() |
725 | 854 | self._socket = None | 879 | self._socket = None |
726 | 855 | self._connected = False | 880 | self._connected = False |
727 | 856 | 881 | ||
728 | @@ -904,8 +929,8 @@ | |||
729 | 904 | """See SmartClientMedium.read_bytes.""" | 929 | """See SmartClientMedium.read_bytes.""" |
730 | 905 | if not self._connected: | 930 | if not self._connected: |
731 | 906 | raise errors.MediumNotConnected(self) | 931 | raise errors.MediumNotConnected(self) |
734 | 907 | return _read_bytes_from_socket( | 932 | return osutils.read_bytes_from_socket( |
735 | 908 | self._socket.recv, count, self._report_activity) | 933 | self._socket, self._report_activity) |
736 | 909 | 934 | ||
737 | 910 | 935 | ||
738 | 911 | class SmartClientStreamMediumRequest(SmartClientMediumRequest): | 936 | class SmartClientStreamMediumRequest(SmartClientMediumRequest): |
739 | @@ -946,21 +971,3 @@ | |||
740 | 946 | This invokes self._medium._flush to ensure all bytes are transmitted. | 971 | This invokes self._medium._flush to ensure all bytes are transmitted. |
741 | 947 | """ | 972 | """ |
742 | 948 | self._medium._flush() | 973 | self._medium._flush() |
743 | 949 | |||
744 | 950 | |||
745 | 951 | def _read_bytes_from_socket(sock, desired_count, report_activity): | ||
746 | 952 | # We ignore the desired_count because on sockets it's more efficient to | ||
747 | 953 | # read large chunks (of _MAX_READ_SIZE bytes) at a time. | ||
748 | 954 | try: | ||
749 | 955 | bytes = osutils.until_no_eintr(sock, _MAX_READ_SIZE) | ||
750 | 956 | except socket.error, e: | ||
751 | 957 | if len(e.args) and e.args[0] in (errno.ECONNRESET, 10054): | ||
752 | 958 | # The connection was closed by the other side. Callers expect an | ||
753 | 959 | # empty string to signal end-of-stream. | ||
754 | 960 | bytes = '' | ||
755 | 961 | else: | ||
756 | 962 | raise | ||
757 | 963 | else: | ||
758 | 964 | report_activity(len(bytes), 'read') | ||
759 | 965 | return bytes | ||
760 | 966 | |||
761 | 967 | 974 | ||
762 | === modified file 'bzrlib/smart/protocol.py' | |||
763 | --- bzrlib/smart/protocol.py 2010-02-17 17:11:16 +0000 | |||
764 | +++ bzrlib/smart/protocol.py 2012-09-12 09:29:20 +0000 | |||
765 | @@ -62,7 +62,13 @@ | |||
766 | 62 | 62 | ||
767 | 63 | def _encode_tuple(args): | 63 | def _encode_tuple(args): |
768 | 64 | """Encode the tuple args to a bytestream.""" | 64 | """Encode the tuple args to a bytestream.""" |
770 | 65 | return '\x01'.join(args) + '\n' | 65 | joined = '\x01'.join(args) + '\n' |
771 | 66 | if type(joined) is unicode: | ||
772 | 67 | # XXX: We should fix things so this never happens! -AJB, 20100304 | ||
773 | 68 | mutter('response args contain unicode, should be only bytes: %r', | ||
774 | 69 | joined) | ||
775 | 70 | joined = joined.encode('ascii') | ||
776 | 71 | return joined | ||
777 | 66 | 72 | ||
778 | 67 | 73 | ||
779 | 68 | class Requester(object): | 74 | class Requester(object): |
780 | @@ -648,7 +654,7 @@ | |||
781 | 648 | """Make a remote call with a readv array. | 654 | """Make a remote call with a readv array. |
782 | 649 | 655 | ||
783 | 650 | The body is encoded with one line per readv offset pair. The numbers in | 656 | The body is encoded with one line per readv offset pair. The numbers in |
785 | 651 | each pair are separated by a comma, and no trailing \n is emitted. | 657 | each pair are separated by a comma, and no trailing \\n is emitted. |
786 | 652 | """ | 658 | """ |
787 | 653 | if 'hpss' in debug.debug_flags: | 659 | if 'hpss' in debug.debug_flags: |
788 | 654 | mutter('hpss call w/readv: %s', repr(args)[1:-1]) | 660 | mutter('hpss call w/readv: %s', repr(args)[1:-1]) |
789 | @@ -1075,9 +1081,6 @@ | |||
790 | 1075 | self._real_write_func = write_func | 1081 | self._real_write_func = write_func |
791 | 1076 | 1082 | ||
792 | 1077 | def _write_func(self, bytes): | 1083 | def _write_func(self, bytes): |
793 | 1078 | # TODO: It is probably more appropriate to use sum(map(len, _buf)) | ||
794 | 1079 | # for total number of bytes to write, rather than buffer based on | ||
795 | 1080 | # the number of write() calls | ||
796 | 1081 | # TODO: Another possibility would be to turn this into an async model. | 1084 | # TODO: Another possibility would be to turn this into an async model. |
797 | 1082 | # Where we let another thread know that we have some bytes if | 1085 | # Where we let another thread know that we have some bytes if |
798 | 1083 | # they want it, but we don't actually block for it | 1086 | # they want it, but we don't actually block for it |
799 | @@ -1225,6 +1228,7 @@ | |||
800 | 1225 | if first_chunk is None: | 1228 | if first_chunk is None: |
801 | 1226 | first_chunk = chunk | 1229 | first_chunk = chunk |
802 | 1227 | self._write_prefixed_body(chunk) | 1230 | self._write_prefixed_body(chunk) |
803 | 1231 | self.flush() | ||
804 | 1228 | if 'hpssdetail' in debug.debug_flags: | 1232 | if 'hpssdetail' in debug.debug_flags: |
805 | 1229 | # Not worth timing separately, as _write_func is | 1233 | # Not worth timing separately, as _write_func is |
806 | 1230 | # actually buffered | 1234 | # actually buffered |
807 | @@ -1285,6 +1289,7 @@ | |||
808 | 1285 | _ProtocolThreeEncoder.__init__(self, medium_request.accept_bytes) | 1289 | _ProtocolThreeEncoder.__init__(self, medium_request.accept_bytes) |
809 | 1286 | self._medium_request = medium_request | 1290 | self._medium_request = medium_request |
810 | 1287 | self._headers = {} | 1291 | self._headers = {} |
811 | 1292 | self.body_stream_started = None | ||
812 | 1288 | 1293 | ||
813 | 1289 | def set_headers(self, headers): | 1294 | def set_headers(self, headers): |
814 | 1290 | self._headers = headers.copy() | 1295 | self._headers = headers.copy() |
815 | @@ -1325,7 +1330,7 @@ | |||
816 | 1325 | """Make a remote call with a readv array. | 1330 | """Make a remote call with a readv array. |
817 | 1326 | 1331 | ||
818 | 1327 | The body is encoded with one line per readv offset pair. The numbers in | 1332 | The body is encoded with one line per readv offset pair. The numbers in |
820 | 1328 | each pair are separated by a comma, and no trailing \n is emitted. | 1333 | each pair are separated by a comma, and no trailing \\n is emitted. |
821 | 1329 | """ | 1334 | """ |
822 | 1330 | if 'hpss' in debug.debug_flags: | 1335 | if 'hpss' in debug.debug_flags: |
823 | 1331 | mutter('hpss call w/readv: %s', repr(args)[1:-1]) | 1336 | mutter('hpss call w/readv: %s', repr(args)[1:-1]) |
824 | @@ -1350,6 +1355,7 @@ | |||
825 | 1350 | if path is not None: | 1355 | if path is not None: |
826 | 1351 | mutter(' (to %s)', path) | 1356 | mutter(' (to %s)', path) |
827 | 1352 | self._request_start_time = osutils.timer_func() | 1357 | self._request_start_time = osutils.timer_func() |
828 | 1358 | self.body_stream_started = False | ||
829 | 1353 | self._write_protocol_version() | 1359 | self._write_protocol_version() |
830 | 1354 | self._write_headers(self._headers) | 1360 | self._write_headers(self._headers) |
831 | 1355 | self._write_structure(args) | 1361 | self._write_structure(args) |
832 | @@ -1357,6 +1363,9 @@ | |||
833 | 1357 | # have finished sending the stream. We would notice at the end | 1363 | # have finished sending the stream. We would notice at the end |
834 | 1358 | # anyway, but if the medium can deliver it early then it's good | 1364 | # anyway, but if the medium can deliver it early then it's good |
835 | 1359 | # to short-circuit the whole request... | 1365 | # to short-circuit the whole request... |
836 | 1366 | # Provoke any ConnectionReset failures before we start the body stream. | ||
837 | 1367 | self.flush() | ||
838 | 1368 | self.body_stream_started = True | ||
839 | 1360 | for exc_info, part in _iter_with_errors(stream): | 1369 | for exc_info, part in _iter_with_errors(stream): |
840 | 1361 | if exc_info is not None: | 1370 | if exc_info is not None: |
841 | 1362 | # Iterating the stream failed. Cleanly abort the request. | 1371 | # Iterating the stream failed. Cleanly abort the request. |
842 | 1363 | 1372 | ||
843 | === modified file 'bzrlib/smart/request.py' | |||
844 | --- bzrlib/smart/request.py 2010-02-17 17:11:16 +0000 | |||
845 | +++ bzrlib/smart/request.py 2012-09-12 09:29:20 +0000 | |||
846 | @@ -1,4 +1,4 @@ | |||
848 | 1 | # Copyright (C) 2006-2010 Canonical Ltd | 1 | # Copyright (C) 2006-2012 Canonical Ltd |
849 | 2 | # | 2 | # |
850 | 3 | # This program is free software; you can redistribute it and/or modify | 3 | # This program is free software; you can redistribute it and/or modify |
851 | 4 | # it under the terms of the GNU General Public License as published by | 4 | # it under the terms of the GNU General Public License as published by |
852 | @@ -134,7 +134,7 @@ | |||
853 | 134 | It will return a SmartServerResponse if the command does not expect a | 134 | It will return a SmartServerResponse if the command does not expect a |
854 | 135 | body. | 135 | body. |
855 | 136 | 136 | ||
857 | 137 | :param *args: the arguments of the request. | 137 | :param args: the arguments of the request. |
858 | 138 | """ | 138 | """ |
859 | 139 | self._check_enabled() | 139 | self._check_enabled() |
860 | 140 | return self.do(*args) | 140 | return self.do(*args) |
861 | @@ -486,152 +486,196 @@ | |||
862 | 486 | return SuccessfulSmartServerResponse((answer,)) | 486 | return SuccessfulSmartServerResponse((answer,)) |
863 | 487 | 487 | ||
864 | 488 | 488 | ||
865 | 489 | # In the 'info' attribute, we store whether this request is 'safe' to retry if | ||
866 | 490 | # we get a disconnect while reading the response. It can have the values: | ||
867 | 491 | # read This is purely a read request, so retrying it is perfectly ok. | ||
868 | 492 | # idem An idempotent write request. Something like 'put' where if you put | ||
869 | 493 | # the same bytes twice you end up with the same final bytes. | ||
870 | 494 | # semi This is a request that isn't strictly idempotent, but doesn't | ||
871 | 495 | # result in corruption if it is retried. This is for things like | ||
872 | 496 | # 'lock' and 'unlock'. If you call lock, it updates the disk | ||
873 | 497 | # structure. If you fail to read the response, you won't be able to | ||
874 | 498 | # use the lock, because you don't have the lock token. Calling lock | ||
875 | 499 | # again will fail, because the lock is already taken. However, we | ||
876 | 500 | # can't tell if the server received our request or not. If it didn't, | ||
877 | 501 | # then retrying the request is fine, as it will actually do what we | ||
878 | 502 | # want. If it did, we will interrupt the current operation, but we | ||
879 | 503 | # are no worse off than interrupting the current operation because of | ||
880 | 504 | # a ConnectionReset. | ||
881 | 505 | # semivfs Similar to semi, but specific to a Virtual FileSystem request. | ||
882 | 506 | # stream This is a request that takes a stream that cannot be restarted if | ||
883 | 507 | # consumed. This request is 'safe' in that if we determine the | ||
884 | 508 | # connection is closed before we consume the stream, we can try | ||
885 | 509 | # again. | ||
886 | 510 | # mutate State is updated in a way that replaying that request results in a | ||
887 | 511 | # different state. For example 'append' writes more bytes to a given | ||
888 | 512 | # file. If append succeeds, it moves the file pointer. | ||
889 | 489 | request_handlers = registry.Registry() | 513 | request_handlers = registry.Registry() |
890 | 490 | request_handlers.register_lazy( | 514 | request_handlers.register_lazy( |
892 | 491 | 'append', 'bzrlib.smart.vfs', 'AppendRequest') | 515 | 'append', 'bzrlib.smart.vfs', 'AppendRequest', info='mutate') |
893 | 492 | request_handlers.register_lazy( | 516 | request_handlers.register_lazy( |
894 | 493 | 'Branch.get_config_file', 'bzrlib.smart.branch', | 517 | 'Branch.get_config_file', 'bzrlib.smart.branch', |
896 | 494 | 'SmartServerBranchGetConfigFile') | 518 | 'SmartServerBranchGetConfigFile', info='read') |
897 | 495 | request_handlers.register_lazy( | 519 | request_handlers.register_lazy( |
899 | 496 | 'Branch.get_parent', 'bzrlib.smart.branch', 'SmartServerBranchGetParent') | 520 | 'Branch.get_parent', 'bzrlib.smart.branch', 'SmartServerBranchGetParent', |
900 | 521 | info='read') | ||
901 | 497 | request_handlers.register_lazy( | 522 | request_handlers.register_lazy( |
902 | 498 | 'Branch.get_tags_bytes', 'bzrlib.smart.branch', | 523 | 'Branch.get_tags_bytes', 'bzrlib.smart.branch', |
904 | 499 | 'SmartServerBranchGetTagsBytes') | 524 | 'SmartServerBranchGetTagsBytes', info='read') |
905 | 500 | request_handlers.register_lazy( | 525 | request_handlers.register_lazy( |
906 | 501 | 'Branch.set_tags_bytes', 'bzrlib.smart.branch', | 526 | 'Branch.set_tags_bytes', 'bzrlib.smart.branch', |
920 | 502 | 'SmartServerBranchSetTagsBytes') | 527 | 'SmartServerBranchSetTagsBytes', info='idem') |
921 | 503 | request_handlers.register_lazy( | 528 | request_handlers.register_lazy( |
922 | 504 | 'Branch.get_stacked_on_url', 'bzrlib.smart.branch', 'SmartServerBranchRequestGetStackedOnURL') | 529 | 'Branch.get_stacked_on_url', 'bzrlib.smart.branch', |
923 | 505 | request_handlers.register_lazy( | 530 | 'SmartServerBranchRequestGetStackedOnURL', info='read') |
924 | 506 | 'Branch.last_revision_info', 'bzrlib.smart.branch', 'SmartServerBranchRequestLastRevisionInfo') | 531 | request_handlers.register_lazy( |
925 | 507 | request_handlers.register_lazy( | 532 | 'Branch.last_revision_info', 'bzrlib.smart.branch', |
926 | 508 | 'Branch.lock_write', 'bzrlib.smart.branch', 'SmartServerBranchRequestLockWrite') | 533 | 'SmartServerBranchRequestLastRevisionInfo', info='read') |
927 | 509 | request_handlers.register_lazy( 'Branch.revision_history', | 534 | request_handlers.register_lazy( |
928 | 510 | 'bzrlib.smart.branch', 'SmartServerRequestRevisionHistory') | 535 | 'Branch.lock_write', 'bzrlib.smart.branch', |
929 | 511 | request_handlers.register_lazy( 'Branch.set_config_option', | 536 | 'SmartServerBranchRequestLockWrite', info='semi') |
930 | 512 | 'bzrlib.smart.branch', 'SmartServerBranchRequestSetConfigOption') | 537 | request_handlers.register_lazy( |
931 | 513 | request_handlers.register_lazy( 'Branch.set_last_revision', | 538 | 'Branch.revision_history', 'bzrlib.smart.branch', |
932 | 514 | 'bzrlib.smart.branch', 'SmartServerBranchRequestSetLastRevision') | 539 | 'SmartServerRequestRevisionHistory', info='read') |
933 | 540 | request_handlers.register_lazy( | ||
934 | 541 | 'Branch.set_config_option', 'bzrlib.smart.branch', | ||
935 | 542 | 'SmartServerBranchRequestSetConfigOption', info='idem') | ||
936 | 543 | request_handlers.register_lazy( | ||
937 | 544 | 'Branch.set_last_revision', 'bzrlib.smart.branch', | ||
938 | 545 | 'SmartServerBranchRequestSetLastRevision', info='idem') | ||
939 | 515 | request_handlers.register_lazy( | 546 | request_handlers.register_lazy( |
940 | 516 | 'Branch.set_last_revision_info', 'bzrlib.smart.branch', | 547 | 'Branch.set_last_revision_info', 'bzrlib.smart.branch', |
942 | 517 | 'SmartServerBranchRequestSetLastRevisionInfo') | 548 | 'SmartServerBranchRequestSetLastRevisionInfo', info='idem') |
943 | 518 | request_handlers.register_lazy( | 549 | request_handlers.register_lazy( |
944 | 519 | 'Branch.set_last_revision_ex', 'bzrlib.smart.branch', | 550 | 'Branch.set_last_revision_ex', 'bzrlib.smart.branch', |
946 | 520 | 'SmartServerBranchRequestSetLastRevisionEx') | 551 | 'SmartServerBranchRequestSetLastRevisionEx', info='idem') |
947 | 521 | request_handlers.register_lazy( | 552 | request_handlers.register_lazy( |
948 | 522 | 'Branch.set_parent_location', 'bzrlib.smart.branch', | 553 | 'Branch.set_parent_location', 'bzrlib.smart.branch', |
950 | 523 | 'SmartServerBranchRequestSetParentLocation') | 554 | 'SmartServerBranchRequestSetParentLocation', info='idem') |
951 | 524 | request_handlers.register_lazy( | 555 | request_handlers.register_lazy( |
953 | 525 | 'Branch.unlock', 'bzrlib.smart.branch', 'SmartServerBranchRequestUnlock') | 556 | 'Branch.unlock', 'bzrlib.smart.branch', |
954 | 557 | 'SmartServerBranchRequestUnlock', info='semi') | ||
955 | 526 | request_handlers.register_lazy( | 558 | request_handlers.register_lazy( |
956 | 527 | 'BzrDir.cloning_metadir', 'bzrlib.smart.bzrdir', | 559 | 'BzrDir.cloning_metadir', 'bzrlib.smart.bzrdir', |
958 | 528 | 'SmartServerBzrDirRequestCloningMetaDir') | 560 | 'SmartServerBzrDirRequestCloningMetaDir', info='read') |
959 | 529 | request_handlers.register_lazy( | 561 | request_handlers.register_lazy( |
960 | 530 | 'BzrDir.create_branch', 'bzrlib.smart.bzrdir', | 562 | 'BzrDir.create_branch', 'bzrlib.smart.bzrdir', |
962 | 531 | 'SmartServerRequestCreateBranch') | 563 | 'SmartServerRequestCreateBranch', info='semi') |
963 | 532 | request_handlers.register_lazy( | 564 | request_handlers.register_lazy( |
964 | 533 | 'BzrDir.create_repository', 'bzrlib.smart.bzrdir', | 565 | 'BzrDir.create_repository', 'bzrlib.smart.bzrdir', |
966 | 534 | 'SmartServerRequestCreateRepository') | 566 | 'SmartServerRequestCreateRepository', info='semi') |
967 | 535 | request_handlers.register_lazy( | 567 | request_handlers.register_lazy( |
968 | 536 | 'BzrDir.find_repository', 'bzrlib.smart.bzrdir', | 568 | 'BzrDir.find_repository', 'bzrlib.smart.bzrdir', |
970 | 537 | 'SmartServerRequestFindRepositoryV1') | 569 | 'SmartServerRequestFindRepositoryV1', info='read') |
971 | 538 | request_handlers.register_lazy( | 570 | request_handlers.register_lazy( |
972 | 539 | 'BzrDir.find_repositoryV2', 'bzrlib.smart.bzrdir', | 571 | 'BzrDir.find_repositoryV2', 'bzrlib.smart.bzrdir', |
974 | 540 | 'SmartServerRequestFindRepositoryV2') | 572 | 'SmartServerRequestFindRepositoryV2', info='read') |
975 | 541 | request_handlers.register_lazy( | 573 | request_handlers.register_lazy( |
976 | 542 | 'BzrDir.find_repositoryV3', 'bzrlib.smart.bzrdir', | 574 | 'BzrDir.find_repositoryV3', 'bzrlib.smart.bzrdir', |
978 | 543 | 'SmartServerRequestFindRepositoryV3') | 575 | 'SmartServerRequestFindRepositoryV3', info='read') |
979 | 544 | request_handlers.register_lazy( | 576 | request_handlers.register_lazy( |
980 | 545 | 'BzrDir.get_config_file', 'bzrlib.smart.bzrdir', | 577 | 'BzrDir.get_config_file', 'bzrlib.smart.bzrdir', |
982 | 546 | 'SmartServerBzrDirRequestConfigFile') | 578 | 'SmartServerBzrDirRequestConfigFile', info='read') |
983 | 547 | request_handlers.register_lazy( | 579 | request_handlers.register_lazy( |
984 | 548 | 'BzrDirFormat.initialize', 'bzrlib.smart.bzrdir', | 580 | 'BzrDirFormat.initialize', 'bzrlib.smart.bzrdir', |
986 | 549 | 'SmartServerRequestInitializeBzrDir') | 581 | 'SmartServerRequestInitializeBzrDir', info='semi') |
987 | 550 | request_handlers.register_lazy( | 582 | request_handlers.register_lazy( |
988 | 551 | 'BzrDirFormat.initialize_ex_1.16', 'bzrlib.smart.bzrdir', | 583 | 'BzrDirFormat.initialize_ex_1.16', 'bzrlib.smart.bzrdir', |
994 | 552 | 'SmartServerRequestBzrDirInitializeEx') | 584 | 'SmartServerRequestBzrDirInitializeEx', info='semi') |
995 | 553 | request_handlers.register_lazy( | 585 | request_handlers.register_lazy( |
996 | 554 | 'BzrDir.open', 'bzrlib.smart.bzrdir', 'SmartServerRequestOpenBzrDir') | 586 | 'BzrDir.open', 'bzrlib.smart.bzrdir', 'SmartServerRequestOpenBzrDir', |
997 | 555 | request_handlers.register_lazy( | 587 | info='read') |
998 | 556 | 'BzrDir.open_2.1', 'bzrlib.smart.bzrdir', 'SmartServerRequestOpenBzrDir_2_1') | 588 | request_handlers.register_lazy( |
999 | 589 | 'BzrDir.open_2.1', 'bzrlib.smart.bzrdir', | ||
1000 | 590 | 'SmartServerRequestOpenBzrDir_2_1', info='read') | ||
1001 | 557 | request_handlers.register_lazy( | 591 | request_handlers.register_lazy( |
1002 | 558 | 'BzrDir.open_branch', 'bzrlib.smart.bzrdir', | 592 | 'BzrDir.open_branch', 'bzrlib.smart.bzrdir', |
1004 | 559 | 'SmartServerRequestOpenBranch') | 593 | 'SmartServerRequestOpenBranch', info='read') |
1005 | 560 | request_handlers.register_lazy( | 594 | request_handlers.register_lazy( |
1006 | 561 | 'BzrDir.open_branchV2', 'bzrlib.smart.bzrdir', | 595 | 'BzrDir.open_branchV2', 'bzrlib.smart.bzrdir', |
1008 | 562 | 'SmartServerRequestOpenBranchV2') | 596 | 'SmartServerRequestOpenBranchV2', info='read') |
1009 | 563 | request_handlers.register_lazy( | 597 | request_handlers.register_lazy( |
1010 | 564 | 'BzrDir.open_branchV3', 'bzrlib.smart.bzrdir', | 598 | 'BzrDir.open_branchV3', 'bzrlib.smart.bzrdir', |
1038 | 565 | 'SmartServerRequestOpenBranchV3') | 599 | 'SmartServerRequestOpenBranchV3', info='read') |
1039 | 566 | request_handlers.register_lazy( | 600 | request_handlers.register_lazy( |
1040 | 567 | 'delete', 'bzrlib.smart.vfs', 'DeleteRequest') | 601 | 'delete', 'bzrlib.smart.vfs', 'DeleteRequest', info='semivfs') |
1041 | 568 | request_handlers.register_lazy( | 602 | request_handlers.register_lazy( |
1042 | 569 | 'get', 'bzrlib.smart.vfs', 'GetRequest') | 603 | 'get', 'bzrlib.smart.vfs', 'GetRequest', info='read') |
1043 | 570 | request_handlers.register_lazy( | 604 | request_handlers.register_lazy( |
1044 | 571 | 'get_bundle', 'bzrlib.smart.request', 'GetBundleRequest') | 605 | 'get_bundle', 'bzrlib.smart.request', 'GetBundleRequest', info='read') |
1045 | 572 | request_handlers.register_lazy( | 606 | request_handlers.register_lazy( |
1046 | 573 | 'has', 'bzrlib.smart.vfs', 'HasRequest') | 607 | 'has', 'bzrlib.smart.vfs', 'HasRequest', info='read') |
1047 | 574 | request_handlers.register_lazy( | 608 | request_handlers.register_lazy( |
1048 | 575 | 'hello', 'bzrlib.smart.request', 'HelloRequest') | 609 | 'hello', 'bzrlib.smart.request', 'HelloRequest', info='read') |
1049 | 576 | request_handlers.register_lazy( | 610 | request_handlers.register_lazy( |
1050 | 577 | 'iter_files_recursive', 'bzrlib.smart.vfs', 'IterFilesRecursiveRequest') | 611 | 'iter_files_recursive', 'bzrlib.smart.vfs', 'IterFilesRecursiveRequest', |
1051 | 578 | request_handlers.register_lazy( | 612 | info='read') |
1052 | 579 | 'list_dir', 'bzrlib.smart.vfs', 'ListDirRequest') | 613 | request_handlers.register_lazy( |
1053 | 580 | request_handlers.register_lazy( | 614 | 'list_dir', 'bzrlib.smart.vfs', 'ListDirRequest', info='read') |
1054 | 581 | 'mkdir', 'bzrlib.smart.vfs', 'MkdirRequest') | 615 | request_handlers.register_lazy( |
1055 | 582 | request_handlers.register_lazy( | 616 | 'mkdir', 'bzrlib.smart.vfs', 'MkdirRequest', info='semivfs') |
1056 | 583 | 'move', 'bzrlib.smart.vfs', 'MoveRequest') | 617 | request_handlers.register_lazy( |
1057 | 584 | request_handlers.register_lazy( | 618 | 'move', 'bzrlib.smart.vfs', 'MoveRequest', info='semivfs') |
1058 | 585 | 'put', 'bzrlib.smart.vfs', 'PutRequest') | 619 | request_handlers.register_lazy( |
1059 | 586 | request_handlers.register_lazy( | 620 | 'put', 'bzrlib.smart.vfs', 'PutRequest', info='idem') |
1060 | 587 | 'put_non_atomic', 'bzrlib.smart.vfs', 'PutNonAtomicRequest') | 621 | request_handlers.register_lazy( |
1061 | 588 | request_handlers.register_lazy( | 622 | 'put_non_atomic', 'bzrlib.smart.vfs', 'PutNonAtomicRequest', info='idem') |
1062 | 589 | 'readv', 'bzrlib.smart.vfs', 'ReadvRequest') | 623 | request_handlers.register_lazy( |
1063 | 590 | request_handlers.register_lazy( | 624 | 'readv', 'bzrlib.smart.vfs', 'ReadvRequest', info='read') |
1064 | 591 | 'rename', 'bzrlib.smart.vfs', 'RenameRequest') | 625 | request_handlers.register_lazy( |
1065 | 626 | 'rename', 'bzrlib.smart.vfs', 'RenameRequest', info='semivfs') | ||
1066 | 592 | request_handlers.register_lazy( | 627 | request_handlers.register_lazy( |
1067 | 593 | 'PackRepository.autopack', 'bzrlib.smart.packrepository', | 628 | 'PackRepository.autopack', 'bzrlib.smart.packrepository', |
1089 | 594 | 'SmartServerPackRepositoryAutopack') | 629 | 'SmartServerPackRepositoryAutopack', info='idem') |
1090 | 595 | request_handlers.register_lazy('Repository.gather_stats', | 630 | request_handlers.register_lazy( |
1091 | 596 | 'bzrlib.smart.repository', | 631 | 'Repository.gather_stats', 'bzrlib.smart.repository', |
1092 | 597 | 'SmartServerRepositoryGatherStats') | 632 | 'SmartServerRepositoryGatherStats', info='read') |
1093 | 598 | request_handlers.register_lazy('Repository.get_parent_map', | 633 | request_handlers.register_lazy( |
1094 | 599 | 'bzrlib.smart.repository', | 634 | 'Repository.get_parent_map', 'bzrlib.smart.repository', |
1095 | 600 | 'SmartServerRepositoryGetParentMap') | 635 | 'SmartServerRepositoryGetParentMap', info='read') |
1096 | 601 | request_handlers.register_lazy( | 636 | request_handlers.register_lazy( |
1097 | 602 | 'Repository.get_revision_graph', 'bzrlib.smart.repository', 'SmartServerRepositoryGetRevisionGraph') | 637 | 'Repository.get_revision_graph', 'bzrlib.smart.repository', |
1098 | 603 | request_handlers.register_lazy( | 638 | 'SmartServerRepositoryGetRevisionGraph', info='read') |
1099 | 604 | 'Repository.has_revision', 'bzrlib.smart.repository', 'SmartServerRequestHasRevision') | 639 | request_handlers.register_lazy( |
1100 | 605 | request_handlers.register_lazy( | 640 | 'Repository.has_revision', 'bzrlib.smart.repository', |
1101 | 606 | 'Repository.insert_stream', 'bzrlib.smart.repository', 'SmartServerRepositoryInsertStream') | 641 | 'SmartServerRequestHasRevision', info='read') |
1102 | 607 | request_handlers.register_lazy( | 642 | request_handlers.register_lazy( |
1103 | 608 | 'Repository.insert_stream_1.19', 'bzrlib.smart.repository', 'SmartServerRepositoryInsertStream_1_19') | 643 | 'Repository.insert_stream', 'bzrlib.smart.repository', |
1104 | 609 | request_handlers.register_lazy( | 644 | 'SmartServerRepositoryInsertStream', info='stream') |
1105 | 610 | 'Repository.insert_stream_locked', 'bzrlib.smart.repository', 'SmartServerRepositoryInsertStreamLocked') | 645 | request_handlers.register_lazy( |
1106 | 611 | request_handlers.register_lazy( | 646 | 'Repository.insert_stream_1.19', 'bzrlib.smart.repository', |
1107 | 612 | 'Repository.is_shared', 'bzrlib.smart.repository', 'SmartServerRepositoryIsShared') | 647 | 'SmartServerRepositoryInsertStream_1_19', info='stream') |
1108 | 613 | request_handlers.register_lazy( | 648 | request_handlers.register_lazy( |
1109 | 614 | 'Repository.lock_write', 'bzrlib.smart.repository', 'SmartServerRepositoryLockWrite') | 649 | 'Repository.insert_stream_locked', 'bzrlib.smart.repository', |
1110 | 650 | 'SmartServerRepositoryInsertStreamLocked', info='stream') | ||
1111 | 651 | request_handlers.register_lazy( | ||
1112 | 652 | 'Repository.is_shared', 'bzrlib.smart.repository', | ||
1113 | 653 | 'SmartServerRepositoryIsShared', info='read') | ||
1114 | 654 | request_handlers.register_lazy( | ||
1115 | 655 | 'Repository.lock_write', 'bzrlib.smart.repository', | ||
1116 | 656 | 'SmartServerRepositoryLockWrite', info='semi') | ||
1117 | 615 | request_handlers.register_lazy( | 657 | request_handlers.register_lazy( |
1118 | 616 | 'Repository.set_make_working_trees', 'bzrlib.smart.repository', | 658 | 'Repository.set_make_working_trees', 'bzrlib.smart.repository', |
1120 | 617 | 'SmartServerRepositorySetMakeWorkingTrees') | 659 | 'SmartServerRepositorySetMakeWorkingTrees', info='idem') |
1121 | 618 | request_handlers.register_lazy( | 660 | request_handlers.register_lazy( |
1123 | 619 | 'Repository.unlock', 'bzrlib.smart.repository', 'SmartServerRepositoryUnlock') | 661 | 'Repository.unlock', 'bzrlib.smart.repository', |
1124 | 662 | 'SmartServerRepositoryUnlock', info='semi') | ||
1125 | 620 | request_handlers.register_lazy( | 663 | request_handlers.register_lazy( |
1126 | 621 | 'Repository.get_rev_id_for_revno', 'bzrlib.smart.repository', | 664 | 'Repository.get_rev_id_for_revno', 'bzrlib.smart.repository', |
1128 | 622 | 'SmartServerRepositoryGetRevIdForRevno') | 665 | 'SmartServerRepositoryGetRevIdForRevno', info='read') |
1129 | 623 | request_handlers.register_lazy( | 666 | request_handlers.register_lazy( |
1130 | 624 | 'Repository.get_stream', 'bzrlib.smart.repository', | 667 | 'Repository.get_stream', 'bzrlib.smart.repository', |
1132 | 625 | 'SmartServerRepositoryGetStream') | 668 | 'SmartServerRepositoryGetStream', info='read') |
1133 | 626 | request_handlers.register_lazy( | 669 | request_handlers.register_lazy( |
1134 | 627 | 'Repository.get_stream_1.19', 'bzrlib.smart.repository', | 670 | 'Repository.get_stream_1.19', 'bzrlib.smart.repository', |
1136 | 628 | 'SmartServerRepositoryGetStream_1_19') | 671 | 'SmartServerRepositoryGetStream_1_19', info='read') |
1137 | 629 | request_handlers.register_lazy( | 672 | request_handlers.register_lazy( |
1138 | 630 | 'Repository.tarball', 'bzrlib.smart.repository', | 673 | 'Repository.tarball', 'bzrlib.smart.repository', |
1146 | 631 | 'SmartServerRepositoryTarball') | 674 | 'SmartServerRepositoryTarball', info='read') |
1147 | 632 | request_handlers.register_lazy( | 675 | request_handlers.register_lazy( |
1148 | 633 | 'rmdir', 'bzrlib.smart.vfs', 'RmdirRequest') | 676 | 'rmdir', 'bzrlib.smart.vfs', 'RmdirRequest', info='semivfs') |
1149 | 634 | request_handlers.register_lazy( | 677 | request_handlers.register_lazy( |
1150 | 635 | 'stat', 'bzrlib.smart.vfs', 'StatRequest') | 678 | 'stat', 'bzrlib.smart.vfs', 'StatRequest', info='read') |
1151 | 636 | request_handlers.register_lazy( | 679 | request_handlers.register_lazy( |
1152 | 637 | 'Transport.is_readonly', 'bzrlib.smart.request', 'SmartServerIsReadonly') | 680 | 'Transport.is_readonly', 'bzrlib.smart.request', |
1153 | 681 | 'SmartServerIsReadonly', info='read') | ||
1154 | 638 | 682 | ||
1155 | === modified file 'bzrlib/tests/test_bundle.py' | |||
1156 | --- bzrlib/tests/test_bundle.py 2010-02-17 17:11:16 +0000 | |||
1157 | +++ bzrlib/tests/test_bundle.py 2012-09-12 09:29:20 +0000 | |||
1158 | @@ -1855,20 +1855,23 @@ | |||
1159 | 1855 | self.sock.bind(('127.0.0.1', 0)) | 1855 | self.sock.bind(('127.0.0.1', 0)) |
1160 | 1856 | self.sock.listen(1) | 1856 | self.sock.listen(1) |
1161 | 1857 | self.port = self.sock.getsockname()[1] | 1857 | self.port = self.sock.getsockname()[1] |
1162 | 1858 | self.stopping = threading.Event() | ||
1163 | 1858 | self.thread = threading.Thread( | 1859 | self.thread = threading.Thread( |
1164 | 1859 | name='%s (port %d)' % (self.__class__.__name__, self.port), | 1860 | name='%s (port %d)' % (self.__class__.__name__, self.port), |
1165 | 1860 | target=self.accept_and_close) | 1861 | target=self.accept_and_close) |
1166 | 1861 | self.thread.start() | 1862 | self.thread.start() |
1167 | 1862 | 1863 | ||
1168 | 1863 | def accept_and_close(self): | 1864 | def accept_and_close(self): |
1172 | 1864 | conn, addr = self.sock.accept() | 1865 | while not self.stopping.isSet(): |
1173 | 1865 | conn.shutdown(socket.SHUT_RDWR) | 1866 | conn, addr = self.sock.accept() |
1174 | 1866 | conn.close() | 1867 | conn.shutdown(socket.SHUT_RDWR) |
1175 | 1868 | conn.close() | ||
1176 | 1867 | 1869 | ||
1177 | 1868 | def get_url(self): | 1870 | def get_url(self): |
1178 | 1869 | return 'bzr://127.0.0.1:%d/' % (self.port,) | 1871 | return 'bzr://127.0.0.1:%d/' % (self.port,) |
1179 | 1870 | 1872 | ||
1180 | 1871 | def stop_server(self): | 1873 | def stop_server(self): |
1181 | 1874 | self.stopping.set() | ||
1182 | 1872 | try: | 1875 | try: |
1183 | 1873 | # make sure the thread dies by connecting to the listening socket, | 1876 | # make sure the thread dies by connecting to the listening socket, |
1184 | 1874 | # just in case the test failed to do so. | 1877 | # just in case the test failed to do so. |
1185 | 1875 | 1878 | ||
1186 | === modified file 'bzrlib/tests/test_osutils.py' | |||
1187 | --- bzrlib/tests/test_osutils.py 2010-11-30 20:42:42 +0000 | |||
1188 | +++ bzrlib/tests/test_osutils.py 2012-09-12 09:29:20 +0000 | |||
1189 | @@ -801,6 +801,45 @@ | |||
1190 | 801 | self.assertEqual(None, osutils.safe_file_id(None)) | 801 | self.assertEqual(None, osutils.safe_file_id(None)) |
1191 | 802 | 802 | ||
1192 | 803 | 803 | ||
1193 | 804 | class TestSendAll(tests.TestCase): | ||
1194 | 805 | |||
1195 | 806 | def test_send_with_disconnected_socket(self): | ||
1196 | 807 | class DisconnectedSocket(object): | ||
1197 | 808 | def __init__(self, err): | ||
1198 | 809 | self.err = err | ||
1199 | 810 | def send(self, content): | ||
1200 | 811 | raise self.err | ||
1201 | 812 | def close(self): | ||
1202 | 813 | pass | ||
1203 | 814 | # All of these should be treated as ConnectionReset | ||
1204 | 815 | errs = [] | ||
1205 | 816 | for err_cls in (IOError, socket.error): | ||
1206 | 817 | for errnum in osutils._end_of_stream_errors: | ||
1207 | 818 | errs.append(err_cls(errnum)) | ||
1208 | 819 | for err in errs: | ||
1209 | 820 | sock = DisconnectedSocket(err) | ||
1210 | 821 | self.assertRaises(errors.ConnectionReset, | ||
1211 | 822 | osutils.send_all, sock, 'some more content') | ||
1212 | 823 | |||
1213 | 824 | def test_send_with_no_progress(self): | ||
1214 | 825 | # See https://bugs.launchpad.net/bzr/+bug/1047309 | ||
1215 | 826 | # It seems that paramiko can get into a state where it doesn't error, | ||
1216 | 827 | # but it returns 0 bytes sent for requests over and over again. | ||
1217 | 828 | class NoSendingSocket(object): | ||
1218 | 829 | def __init__(self): | ||
1219 | 830 | self.call_count = 0 | ||
1220 | 831 | def send(self, bytes): | ||
1221 | 832 | self.call_count += 1 | ||
1222 | 833 | if self.call_count > 100: | ||
1223 | 834 | # Prevent the test suite from hanging | ||
1224 | 835 | raise RuntimeError('too many calls') | ||
1225 | 836 | return 0 | ||
1226 | 837 | sock = NoSendingSocket() | ||
1227 | 838 | self.assertRaises(errors.ConnectionReset, | ||
1228 | 839 | osutils.send_all, sock, 'content') | ||
1229 | 840 | self.assertEqual(1, sock.call_count) | ||
1230 | 841 | |||
1231 | 842 | |||
1232 | 804 | class TestWin32Funcs(tests.TestCase): | 843 | class TestWin32Funcs(tests.TestCase): |
1233 | 805 | """Test that _win32 versions of os utilities return appropriate paths.""" | 844 | """Test that _win32 versions of os utilities return appropriate paths.""" |
1234 | 806 | 845 | ||
1235 | 807 | 846 | ||
1236 | === modified file 'bzrlib/tests/test_smart_request.py' | |||
1237 | --- bzrlib/tests/test_smart_request.py 2009-07-27 02:11:25 +0000 | |||
1238 | +++ bzrlib/tests/test_smart_request.py 2012-09-12 09:29:20 +0000 | |||
1239 | @@ -109,6 +109,16 @@ | |||
1240 | 109 | self.assertEqual( | 109 | self.assertEqual( |
1241 | 110 | [[transport]] * 3, handler._command.jail_transports_log) | 110 | [[transport]] * 3, handler._command.jail_transports_log) |
1242 | 111 | 111 | ||
1243 | 112 | def test_all_registered_requests_are_safety_qualified(self): | ||
1244 | 113 | unclassified_requests = [] | ||
1245 | 114 | allowed_info = ('read', 'idem', 'mutate', 'semivfs', 'semi', 'stream') | ||
1246 | 115 | for key in request.request_handlers.keys(): | ||
1247 | 116 | info = request.request_handlers.get_info(key) | ||
1248 | 117 | if info is None or info not in allowed_info: | ||
1249 | 118 | unclassified_requests.append(key) | ||
1250 | 119 | if unclassified_requests: | ||
1251 | 120 | self.fail('These requests were not categorized as safe/unsafe' | ||
1252 | 121 | ' to retry: %s' % (unclassified_requests,)) | ||
1253 | 112 | 122 | ||
1254 | 113 | 123 | ||
1255 | 114 | class TestSmartRequestHandlerErrorTranslation(TestCase): | 124 | class TestSmartRequestHandlerErrorTranslation(TestCase): |
1256 | 115 | 125 | ||
1257 | === modified file 'bzrlib/tests/test_smart_transport.py' | |||
1258 | --- bzrlib/tests/test_smart_transport.py 2010-02-17 17:11:16 +0000 | |||
1259 | +++ bzrlib/tests/test_smart_transport.py 2012-09-12 09:29:20 +0000 | |||
1260 | @@ -18,13 +18,17 @@ | |||
1261 | 18 | 18 | ||
1262 | 19 | # all of this deals with byte strings so this is safe | 19 | # all of this deals with byte strings so this is safe |
1263 | 20 | from cStringIO import StringIO | 20 | from cStringIO import StringIO |
1264 | 21 | import errno | ||
1265 | 21 | import os | 22 | import os |
1266 | 22 | import socket | 23 | import socket |
1267 | 24 | import subprocess | ||
1268 | 25 | import sys | ||
1269 | 23 | import threading | 26 | import threading |
1270 | 24 | 27 | ||
1271 | 25 | import bzrlib | 28 | import bzrlib |
1272 | 26 | from bzrlib import ( | 29 | from bzrlib import ( |
1273 | 27 | bzrdir, | 30 | bzrdir, |
1274 | 31 | debug, | ||
1275 | 28 | errors, | 32 | errors, |
1276 | 29 | osutils, | 33 | osutils, |
1277 | 30 | tests, | 34 | tests, |
1278 | @@ -49,6 +53,29 @@ | |||
1279 | 49 | from bzrlib.transport.http import SmartClientHTTPMediumRequest | 53 | from bzrlib.transport.http import SmartClientHTTPMediumRequest |
1280 | 50 | 54 | ||
1281 | 51 | 55 | ||
1282 | 56 | def create_file_pipes(): | ||
1283 | 57 | r, w = os.pipe() | ||
1284 | 58 | # These must be opened without buffering, or we get undefined results | ||
1285 | 59 | rf = os.fdopen(r, 'rb', 0) | ||
1286 | 60 | wf = os.fdopen(w, 'wb', 0) | ||
1287 | 61 | return rf, wf | ||
1288 | 62 | |||
1289 | 63 | |||
1290 | 64 | def portable_socket_pair(): | ||
1291 | 65 | """Return a pair of TCP sockets connected to each other. | ||
1292 | 66 | |||
1293 | 67 | Unlike socket.socketpair, this should work on Windows. | ||
1294 | 68 | """ | ||
1295 | 69 | listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | ||
1296 | 70 | listen_sock.bind(('127.0.0.1', 0)) | ||
1297 | 71 | listen_sock.listen(1) | ||
1298 | 72 | client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | ||
1299 | 73 | client_sock.connect(listen_sock.getsockname()) | ||
1300 | 74 | server_sock, addr = listen_sock.accept() | ||
1301 | 75 | listen_sock.close() | ||
1302 | 76 | return server_sock, client_sock | ||
1303 | 77 | |||
1304 | 78 | |||
1305 | 52 | class StringIOSSHVendor(object): | 79 | class StringIOSSHVendor(object): |
1306 | 53 | """A SSH vendor that uses StringIO to buffer writes and answer reads.""" | 80 | """A SSH vendor that uses StringIO to buffer writes and answer reads.""" |
1307 | 54 | 81 | ||
1308 | @@ -63,6 +90,27 @@ | |||
1309 | 63 | return StringIOSSHConnection(self) | 90 | return StringIOSSHConnection(self) |
1310 | 64 | 91 | ||
1311 | 65 | 92 | ||
1312 | 93 | class FirstRejectedStringIOSSHVendor(StringIOSSHVendor): | ||
1313 | 94 | """The first connection will be considered closed. | ||
1314 | 95 | |||
1315 | 96 | The second connection will succeed normally. | ||
1316 | 97 | """ | ||
1317 | 98 | |||
1318 | 99 | def __init__(self, read_from, write_to, fail_at_write=True): | ||
1319 | 100 | super(FirstRejectedStringIOSSHVendor, self).__init__(read_from, | ||
1320 | 101 | write_to) | ||
1321 | 102 | self.fail_at_write = fail_at_write | ||
1322 | 103 | self._first = True | ||
1323 | 104 | |||
1324 | 105 | def connect_ssh(self, username, password, host, port, command): | ||
1325 | 106 | self.calls.append(('connect_ssh', username, password, host, port, | ||
1326 | 107 | command)) | ||
1327 | 108 | if self._first: | ||
1328 | 109 | self._first = False | ||
1329 | 110 | return ClosedSSHConnection(self) | ||
1330 | 111 | return StringIOSSHConnection(self) | ||
1331 | 112 | |||
1332 | 113 | |||
1333 | 66 | class StringIOSSHConnection(object): | 114 | class StringIOSSHConnection(object): |
1334 | 67 | """A SSH connection that uses StringIO to buffer writes and answer reads.""" | 115 | """A SSH connection that uses StringIO to buffer writes and answer reads.""" |
1335 | 68 | 116 | ||
1336 | @@ -71,11 +119,36 @@ | |||
1337 | 71 | 119 | ||
1338 | 72 | def close(self): | 120 | def close(self): |
1339 | 73 | self.vendor.calls.append(('close', )) | 121 | self.vendor.calls.append(('close', )) |
1340 | 122 | self.vendor.read_from.close() | ||
1341 | 123 | self.vendor.write_to.close() | ||
1342 | 74 | 124 | ||
1343 | 75 | def get_filelike_channels(self): | 125 | def get_filelike_channels(self): |
1344 | 76 | return self.vendor.read_from, self.vendor.write_to | 126 | return self.vendor.read_from, self.vendor.write_to |
1345 | 77 | 127 | ||
1346 | 78 | 128 | ||
1347 | 129 | class ClosedSSHConnection(object): | ||
1348 | 130 | """An SSH connection that just has closed channels.""" | ||
1349 | 131 | |||
1350 | 132 | def __init__(self, vendor): | ||
1351 | 133 | self.vendor = vendor | ||
1352 | 134 | |||
1353 | 135 | def close(self): | ||
1354 | 136 | self.vendor.calls.append(('close', )) | ||
1355 | 137 | |||
1356 | 138 | def get_filelike_channels(self): | ||
1357 | 139 | # We create matching pipes, and then close the ssh side | ||
1358 | 140 | bzr_read, ssh_write = create_file_pipes() | ||
1359 | 141 | # We always fail when bzr goes to read | ||
1360 | 142 | ssh_write.close() | ||
1361 | 143 | if self.vendor.fail_at_write: | ||
1362 | 144 | # If set, we'll also fail when bzr goes to write | ||
1363 | 145 | ssh_read, bzr_write = create_file_pipes() | ||
1364 | 146 | ssh_read.close() | ||
1365 | 147 | else: | ||
1366 | 148 | bzr_write = self.vendor.write_to | ||
1367 | 149 | return bzr_read, bzr_write | ||
1368 | 150 | |||
1369 | 151 | |||
1370 | 79 | class _InvalidHostnameFeature(tests.Feature): | 152 | class _InvalidHostnameFeature(tests.Feature): |
1371 | 80 | """Does 'non_existent.invalid' fail to resolve? | 153 | """Does 'non_existent.invalid' fail to resolve? |
1372 | 81 | 154 | ||
1373 | @@ -171,6 +244,91 @@ | |||
1374 | 171 | client_medium._accept_bytes('abc') | 244 | client_medium._accept_bytes('abc') |
1375 | 172 | self.assertEqual('abc', output.getvalue()) | 245 | self.assertEqual('abc', output.getvalue()) |
1376 | 173 | 246 | ||
1377 | 247 | def test_simple_pipes__accept_bytes_subprocess_closed(self): | ||
1378 | 248 | # It is unfortunate that we have to use Popen for this. However, | ||
1379 | 249 | # os.pipe() does not behave the same as subprocess.Popen(). | ||
1380 | 250 | # On Windows, if you use os.pipe() and close the write side, | ||
1381 | 251 | # read.read() hangs. On Linux, read.read() returns the empty string. | ||
1382 | 252 | p = subprocess.Popen([sys.executable, '-c', | ||
1383 | 253 | 'import sys\n' | ||
1384 | 254 | 'sys.stdout.write(sys.stdin.read(4))\n' | ||
1385 | 255 | 'sys.stdout.close()\n'], | ||
1386 | 256 | stdout=subprocess.PIPE, stdin=subprocess.PIPE) | ||
1387 | 257 | client_medium = medium.SmartSimplePipesClientMedium( | ||
1388 | 258 | p.stdout, p.stdin, 'base') | ||
1389 | 259 | client_medium._accept_bytes('abc\n') | ||
1390 | 260 | self.assertEqual('abc', client_medium._read_bytes(3)) | ||
1391 | 261 | p.wait() | ||
1392 | 262 | # While writing to the underlying pipe, | ||
1393 | 263 | # Windows py2.6.6 we get IOError(EINVAL) | ||
1394 | 264 | # Lucid py2.6.5, we get IOError(EPIPE) | ||
1395 | 265 | # In both cases, it should be wrapped to ConnectionReset | ||
1396 | 266 | self.assertRaises(errors.ConnectionReset, | ||
1397 | 267 | client_medium._accept_bytes, 'more') | ||
1398 | 268 | |||
1399 | 269 | def test_simple_pipes__accept_bytes_pipe_closed(self): | ||
1400 | 270 | child_read, client_write = create_file_pipes() | ||
1401 | 271 | client_medium = medium.SmartSimplePipesClientMedium( | ||
1402 | 272 | None, client_write, 'base') | ||
1403 | 273 | client_medium._accept_bytes('abc\n') | ||
1404 | 274 | self.assertEqual('abc\n', child_read.read(4)) | ||
1405 | 275 | # While writing to the underlying pipe, | ||
1406 | 276 | # Windows py2.6.6 we get IOError(EINVAL) | ||
1407 | 277 | # Lucid py2.6.5, we get IOError(EPIPE) | ||
1408 | 278 | # In both cases, it should be wrapped to ConnectionReset | ||
1409 | 279 | child_read.close() | ||
1410 | 280 | self.assertRaises(errors.ConnectionReset, | ||
1411 | 281 | client_medium._accept_bytes, 'more') | ||
1412 | 282 | |||
1413 | 283 | def test_simple_pipes__flush_pipe_closed(self): | ||
1414 | 284 | child_read, client_write = create_file_pipes() | ||
1415 | 285 | client_medium = medium.SmartSimplePipesClientMedium( | ||
1416 | 286 | None, client_write, 'base') | ||
1417 | 287 | client_medium._accept_bytes('abc\n') | ||
1418 | 288 | child_read.close() | ||
1419 | 289 | # Even though the pipe is closed, flush on the write side seems to be a | ||
1420 | 290 | # no-op, rather than a failure. | ||
1421 | 291 | client_medium._flush() | ||
1422 | 292 | |||
1423 | 293 | def test_simple_pipes__flush_subprocess_closed(self): | ||
1424 | 294 | p = subprocess.Popen([sys.executable, '-c', | ||
1425 | 295 | 'import sys\n' | ||
1426 | 296 | 'sys.stdout.write(sys.stdin.read(4))\n' | ||
1427 | 297 | 'sys.stdout.close()\n'], | ||
1428 | 298 | stdout=subprocess.PIPE, stdin=subprocess.PIPE) | ||
1429 | 299 | client_medium = medium.SmartSimplePipesClientMedium( | ||
1430 | 300 | p.stdout, p.stdin, 'base') | ||
1431 | 301 | client_medium._accept_bytes('abc\n') | ||
1432 | 302 | p.wait() | ||
1433 | 303 | # Even though the child process is dead, flush seems to be a no-op. | ||
1434 | 304 | client_medium._flush() | ||
1435 | 305 | |||
1436 | 306 | def test_simple_pipes__read_bytes_pipe_closed(self): | ||
1437 | 307 | child_read, client_write = create_file_pipes() | ||
1438 | 308 | client_medium = medium.SmartSimplePipesClientMedium( | ||
1439 | 309 | child_read, client_write, 'base') | ||
1440 | 310 | client_medium._accept_bytes('abc\n') | ||
1441 | 311 | client_write.close() | ||
1442 | 312 | self.assertEqual('abc\n', client_medium._read_bytes(4)) | ||
1443 | 313 | self.assertEqual('', client_medium._read_bytes(4)) | ||
1444 | 314 | |||
1445 | 315 | def test_simple_pipes__read_bytes_subprocess_closed(self): | ||
1446 | 316 | p = subprocess.Popen([sys.executable, '-c', | ||
1447 | 317 | 'import sys\n' | ||
1448 | 318 | 'if sys.platform == "win32":\n' | ||
1449 | 319 | ' import msvcrt, os\n' | ||
1450 | 320 | ' msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)\n' | ||
1451 | 321 | ' msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY)\n' | ||
1452 | 322 | 'sys.stdout.write(sys.stdin.read(4))\n' | ||
1453 | 323 | 'sys.stdout.close()\n'], | ||
1454 | 324 | stdout=subprocess.PIPE, stdin=subprocess.PIPE) | ||
1455 | 325 | client_medium = medium.SmartSimplePipesClientMedium( | ||
1456 | 326 | p.stdout, p.stdin, 'base') | ||
1457 | 327 | client_medium._accept_bytes('abc\n') | ||
1458 | 328 | p.wait() | ||
1459 | 329 | self.assertEqual('abc\n', client_medium._read_bytes(4)) | ||
1460 | 330 | self.assertEqual('', client_medium._read_bytes(4)) | ||
1461 | 331 | |||
1462 | 174 | def test_simple_pipes_client_disconnect_does_nothing(self): | 332 | def test_simple_pipes_client_disconnect_does_nothing(self): |
1463 | 175 | # calling disconnect does nothing. | 333 | # calling disconnect does nothing. |
1464 | 176 | input = StringIO() | 334 | input = StringIO() |
1465 | @@ -556,6 +714,28 @@ | |||
1466 | 556 | request.finished_reading() | 714 | request.finished_reading() |
1467 | 557 | self.assertRaises(errors.ReadingCompleted, request.read_bytes, None) | 715 | self.assertRaises(errors.ReadingCompleted, request.read_bytes, None) |
1468 | 558 | 716 | ||
1469 | 717 | def test_reset(self): | ||
1470 | 718 | server_sock, client_sock = portable_socket_pair() | ||
1471 | 719 | # TODO: Use SmartClientAlreadyConnectedSocketMedium for the versions of | ||
1472 | 720 | # bzr where it exists. | ||
1473 | 721 | client_medium = medium.SmartTCPClientMedium(None, None, None) | ||
1474 | 722 | client_medium._socket = client_sock | ||
1475 | 723 | client_medium._connected = True | ||
1476 | 724 | req = client_medium.get_request() | ||
1477 | 725 | self.assertRaises(errors.TooManyConcurrentRequests, | ||
1478 | 726 | client_medium.get_request) | ||
1479 | 727 | client_medium.reset() | ||
1480 | 728 | # The stream should be reset, marked as disconnected, though ready for | ||
1481 | 729 | # us to make a new request | ||
1482 | 730 | self.assertFalse(client_medium._connected) | ||
1483 | 731 | self.assertIs(None, client_medium._socket) | ||
1484 | 732 | try: | ||
1485 | 733 | self.assertEqual('', client_sock.recv(1)) | ||
1486 | 734 | except socket.error, e: | ||
1487 | 735 | if e.errno not in (errno.EBADF,): | ||
1488 | 736 | raise | ||
1489 | 737 | req = client_medium.get_request() | ||
1490 | 738 | |||
1491 | 559 | 739 | ||
1492 | 560 | class RemoteTransportTests(TestCaseWithSmartMedium): | 740 | class RemoteTransportTests(TestCaseWithSmartMedium): |
1493 | 561 | 741 | ||
1494 | @@ -609,20 +789,6 @@ | |||
1495 | 609 | super(TestSmartServerStreamMedium, self).setUp() | 789 | super(TestSmartServerStreamMedium, self).setUp() |
1496 | 610 | self._captureVar('BZR_NO_SMART_VFS', None) | 790 | self._captureVar('BZR_NO_SMART_VFS', None) |
1497 | 611 | 791 | ||
1498 | 612 | def portable_socket_pair(self): | ||
1499 | 613 | """Return a pair of TCP sockets connected to each other. | ||
1500 | 614 | |||
1501 | 615 | Unlike socket.socketpair, this should work on Windows. | ||
1502 | 616 | """ | ||
1503 | 617 | listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | ||
1504 | 618 | listen_sock.bind(('127.0.0.1', 0)) | ||
1505 | 619 | listen_sock.listen(1) | ||
1506 | 620 | client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | ||
1507 | 621 | client_sock.connect(listen_sock.getsockname()) | ||
1508 | 622 | server_sock, addr = listen_sock.accept() | ||
1509 | 623 | listen_sock.close() | ||
1510 | 624 | return server_sock, client_sock | ||
1511 | 625 | |||
1512 | 626 | def test_smart_query_version(self): | 792 | def test_smart_query_version(self): |
1513 | 627 | """Feed a canned query version to a server""" | 793 | """Feed a canned query version to a server""" |
1514 | 628 | # wire-to-wire, using the whole stack | 794 | # wire-to-wire, using the whole stack |
1515 | @@ -687,7 +853,7 @@ | |||
1516 | 687 | 853 | ||
1517 | 688 | def test_socket_stream_with_bulk_data(self): | 854 | def test_socket_stream_with_bulk_data(self): |
1518 | 689 | sample_request_bytes = 'command\n9\nbulk datadone\n' | 855 | sample_request_bytes = 'command\n9\nbulk datadone\n' |
1520 | 690 | server_sock, client_sock = self.portable_socket_pair() | 856 | server_sock, client_sock = portable_socket_pair() |
1521 | 691 | server = medium.SmartServerSocketStreamMedium( | 857 | server = medium.SmartServerSocketStreamMedium( |
1522 | 692 | server_sock, None) | 858 | server_sock, None) |
1523 | 693 | sample_protocol = SampleRequest(expected_bytes=sample_request_bytes) | 859 | sample_protocol = SampleRequest(expected_bytes=sample_request_bytes) |
1524 | @@ -706,7 +872,7 @@ | |||
1525 | 706 | self.assertTrue(server.finished) | 872 | self.assertTrue(server.finished) |
1526 | 707 | 873 | ||
1527 | 708 | def test_socket_stream_shutdown_detection(self): | 874 | def test_socket_stream_shutdown_detection(self): |
1529 | 709 | server_sock, client_sock = self.portable_socket_pair() | 875 | server_sock, client_sock = portable_socket_pair() |
1530 | 710 | client_sock.close() | 876 | client_sock.close() |
1531 | 711 | server = medium.SmartServerSocketStreamMedium( | 877 | server = medium.SmartServerSocketStreamMedium( |
1532 | 712 | server_sock, None) | 878 | server_sock, None) |
1533 | @@ -726,7 +892,7 @@ | |||
1534 | 726 | rest_of_request_bytes = 'lo\n' | 892 | rest_of_request_bytes = 'lo\n' |
1535 | 727 | expected_response = ( | 893 | expected_response = ( |
1536 | 728 | protocol.RESPONSE_VERSION_TWO + 'success\nok\x012\n') | 894 | protocol.RESPONSE_VERSION_TWO + 'success\nok\x012\n') |
1538 | 729 | server_sock, client_sock = self.portable_socket_pair() | 895 | server_sock, client_sock = portable_socket_pair() |
1539 | 730 | server = medium.SmartServerSocketStreamMedium( | 896 | server = medium.SmartServerSocketStreamMedium( |
1540 | 731 | server_sock, None) | 897 | server_sock, None) |
1541 | 732 | client_sock.sendall(incomplete_request_bytes) | 898 | client_sock.sendall(incomplete_request_bytes) |
1542 | @@ -802,7 +968,7 @@ | |||
1543 | 802 | # _serve_one_request should still process both of them as if they had | 968 | # _serve_one_request should still process both of them as if they had |
1544 | 803 | # been received separately. | 969 | # been received separately. |
1545 | 804 | sample_request_bytes = 'command\n' | 970 | sample_request_bytes = 'command\n' |
1547 | 805 | server_sock, client_sock = self.portable_socket_pair() | 971 | server_sock, client_sock = portable_socket_pair() |
1548 | 806 | server = medium.SmartServerSocketStreamMedium( | 972 | server = medium.SmartServerSocketStreamMedium( |
1549 | 807 | server_sock, None) | 973 | server_sock, None) |
1550 | 808 | first_protocol = SampleRequest(expected_bytes=sample_request_bytes) | 974 | first_protocol = SampleRequest(expected_bytes=sample_request_bytes) |
1551 | @@ -839,7 +1005,7 @@ | |||
1552 | 839 | self.assertTrue(server.finished) | 1005 | self.assertTrue(server.finished) |
1553 | 840 | 1006 | ||
1554 | 841 | def test_socket_stream_error_handling(self): | 1007 | def test_socket_stream_error_handling(self): |
1556 | 842 | server_sock, client_sock = self.portable_socket_pair() | 1008 | server_sock, client_sock = portable_socket_pair() |
1557 | 843 | server = medium.SmartServerSocketStreamMedium( | 1009 | server = medium.SmartServerSocketStreamMedium( |
1558 | 844 | server_sock, None) | 1010 | server_sock, None) |
1559 | 845 | fake_protocol = ErrorRaisingProtocol(Exception('boom')) | 1011 | fake_protocol = ErrorRaisingProtocol(Exception('boom')) |
1560 | @@ -860,7 +1026,7 @@ | |||
1561 | 860 | self.assertEqual('', from_server.getvalue()) | 1026 | self.assertEqual('', from_server.getvalue()) |
1562 | 861 | 1027 | ||
1563 | 862 | def test_socket_stream_keyboard_interrupt_handling(self): | 1028 | def test_socket_stream_keyboard_interrupt_handling(self): |
1565 | 863 | server_sock, client_sock = self.portable_socket_pair() | 1029 | server_sock, client_sock = portable_socket_pair() |
1566 | 864 | server = medium.SmartServerSocketStreamMedium( | 1030 | server = medium.SmartServerSocketStreamMedium( |
1567 | 865 | server_sock, None) | 1031 | server_sock, None) |
1568 | 866 | fake_protocol = ErrorRaisingProtocol(KeyboardInterrupt('boom')) | 1032 | fake_protocol = ErrorRaisingProtocol(KeyboardInterrupt('boom')) |
1569 | @@ -877,7 +1043,7 @@ | |||
1570 | 877 | return server._build_protocol() | 1043 | return server._build_protocol() |
1571 | 878 | 1044 | ||
1572 | 879 | def build_protocol_socket(self, bytes): | 1045 | def build_protocol_socket(self, bytes): |
1574 | 880 | server_sock, client_sock = self.portable_socket_pair() | 1046 | server_sock, client_sock = portable_socket_pair() |
1575 | 881 | server = medium.SmartServerSocketStreamMedium( | 1047 | server = medium.SmartServerSocketStreamMedium( |
1576 | 882 | server_sock, None) | 1048 | server_sock, None) |
1577 | 883 | client_sock.sendall(bytes) | 1049 | client_sock.sendall(bytes) |
1578 | @@ -2778,6 +2944,33 @@ | |||
1579 | 2778 | 'e', # end | 2944 | 'e', # end |
1580 | 2779 | output.getvalue()) | 2945 | output.getvalue()) |
1581 | 2780 | 2946 | ||
1582 | 2947 | def test_records_start_of_body_stream(self): | ||
1583 | 2948 | requester, output = self.make_client_encoder_and_output() | ||
1584 | 2949 | requester.set_headers({}) | ||
1585 | 2950 | in_stream = [False] | ||
1586 | 2951 | def stream_checker(): | ||
1587 | 2952 | self.assertTrue(requester.body_stream_started) | ||
1588 | 2953 | in_stream[0] = True | ||
1589 | 2954 | yield 'content' | ||
1590 | 2955 | flush_called = [] | ||
1591 | 2956 | orig_flush = requester.flush | ||
1592 | 2957 | def tracked_flush(): | ||
1593 | 2958 | flush_called.append(in_stream[0]) | ||
1594 | 2959 | if in_stream[0]: | ||
1595 | 2960 | self.assertTrue(requester.body_stream_started) | ||
1596 | 2961 | else: | ||
1597 | 2962 | self.assertFalse(requester.body_stream_started) | ||
1598 | 2963 | return orig_flush() | ||
1599 | 2964 | requester.flush = tracked_flush | ||
1600 | 2965 | requester.call_with_body_stream(('one arg',), stream_checker()) | ||
1601 | 2966 | self.assertEqual( | ||
1602 | 2967 | 'bzr message 3 (bzr 1.6)\n' # protocol version | ||
1603 | 2968 | '\x00\x00\x00\x02de' # headers | ||
1604 | 2969 | 's\x00\x00\x00\x0bl7:one arge' # args | ||
1605 | 2970 | 'b\x00\x00\x00\x07content' # body | ||
1606 | 2971 | 'e', output.getvalue()) | ||
1607 | 2972 | self.assertEqual([False, True, True], flush_called) | ||
1608 | 2973 | |||
1609 | 2781 | 2974 | ||
1610 | 2782 | class StubMediumRequest(object): | 2975 | class StubMediumRequest(object): |
1611 | 2783 | """A stub medium request that tracks the number of times accept_bytes is | 2976 | """A stub medium request that tracks the number of times accept_bytes is |
1612 | @@ -3214,6 +3407,195 @@ | |||
1613 | 3214 | # encoder. | 3407 | # encoder. |
1614 | 3215 | 3408 | ||
1615 | 3216 | 3409 | ||
1616 | 3410 | class Test_SmartClientRequest(tests.TestCase): | ||
1617 | 3411 | |||
1618 | 3412 | def make_client_with_failing_medium(self, fail_at_write=True, response=''): | ||
1619 | 3413 | response_io = StringIO(response) | ||
1620 | 3414 | output = StringIO() | ||
1621 | 3415 | vendor = FirstRejectedStringIOSSHVendor(response_io, output, | ||
1622 | 3416 | fail_at_write=fail_at_write) | ||
1623 | 3417 | client_medium = medium.SmartSSHClientMedium( | ||
1624 | 3418 | 'a host', 'a port', 'a user', 'a pass', 'base', vendor, | ||
1625 | 3419 | 'bzr') | ||
1626 | 3420 | smart_client = client._SmartClient(client_medium, headers={}) | ||
1627 | 3421 | return output, vendor, smart_client | ||
1628 | 3422 | |||
1629 | 3423 | def make_response(self, args, body=None, body_stream=None): | ||
1630 | 3424 | response_io = StringIO() | ||
1631 | 3425 | response = _mod_request.SuccessfulSmartServerResponse(args, body=body, | ||
1632 | 3426 | body_stream=body_stream) | ||
1633 | 3427 | responder = protocol.ProtocolThreeResponder(response_io.write) | ||
1634 | 3428 | responder.send_response(response) | ||
1635 | 3429 | return response_io.getvalue() | ||
1636 | 3430 | |||
1637 | 3431 | def test__call_doesnt_retry_append(self): | ||
1638 | 3432 | response = self.make_response(('appended', '8')) | ||
1639 | 3433 | output, vendor, smart_client = self.make_client_with_failing_medium( | ||
1640 | 3434 | fail_at_write=False, response=response) | ||
1641 | 3435 | smart_request = client._SmartClientRequest(smart_client, 'append', | ||
1642 | 3436 | ('foo', ''), body='content\n') | ||
1643 | 3437 | self.assertRaises(errors.ConnectionReset, smart_request._call, 3) | ||
1644 | 3438 | |||
1645 | 3439 | def test__call_retries_get_bytes(self): | ||
1646 | 3440 | response = self.make_response(('ok',), 'content\n') | ||
1647 | 3441 | output, vendor, smart_client = self.make_client_with_failing_medium( | ||
1648 | 3442 | fail_at_write=False, response=response) | ||
1649 | 3443 | smart_request = client._SmartClientRequest(smart_client, 'get', | ||
1650 | 3444 | ('foo',)) | ||
1651 | 3445 | response, response_handler = smart_request._call(3) | ||
1652 | 3446 | self.assertEqual(('ok',), response) | ||
1653 | 3447 | self.assertEqual('content\n', response_handler.read_body_bytes()) | ||
1654 | 3448 | |||
1655 | 3449 | def test__call_noretry_get_bytes(self): | ||
1656 | 3450 | debug.debug_flags.add('noretry') | ||
1657 | 3451 | response = self.make_response(('ok',), 'content\n') | ||
1658 | 3452 | output, vendor, smart_client = self.make_client_with_failing_medium( | ||
1659 | 3453 | fail_at_write=False, response=response) | ||
1660 | 3454 | smart_request = client._SmartClientRequest(smart_client, 'get', | ||
1661 | 3455 | ('foo',)) | ||
1662 | 3456 | self.assertRaises(errors.ConnectionReset, smart_request._call, 3) | ||
1663 | 3457 | |||
1664 | 3458 | def test__send_no_retry_pipes(self): | ||
1665 | 3459 | client_read, server_write = create_file_pipes() | ||
1666 | 3460 | server_read, client_write = create_file_pipes() | ||
1667 | 3461 | client_medium = medium.SmartSimplePipesClientMedium(client_read, | ||
1668 | 3462 | client_write, base='/') | ||
1669 | 3463 | smart_client = client._SmartClient(client_medium) | ||
1670 | 3464 | smart_request = client._SmartClientRequest(smart_client, | ||
1671 | 3465 | 'hello', ()) | ||
1672 | 3466 | # Close the server side | ||
1673 | 3467 | server_read.close() | ||
1674 | 3468 | encoder, response_handler = smart_request._construct_protocol(3) | ||
1675 | 3469 | self.assertRaises(errors.ConnectionReset, | ||
1676 | 3470 | smart_request._send_no_retry, encoder) | ||
1677 | 3471 | |||
1678 | 3472 | def test__send_read_response_sockets(self): | ||
1679 | 3473 | listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | ||
1680 | 3474 | listen_sock.bind(('127.0.0.1', 0)) | ||
1681 | 3475 | listen_sock.listen(1) | ||
1682 | 3476 | host, port = listen_sock.getsockname() | ||
1683 | 3477 | client_medium = medium.SmartTCPClientMedium(host, port, '/') | ||
1684 | 3478 | client_medium._ensure_connection() | ||
1685 | 3479 | smart_client = client._SmartClient(client_medium) | ||
1686 | 3480 | smart_request = client._SmartClientRequest(smart_client, 'hello', ()) | ||
1687 | 3481 | # Accept the connection, but don't actually talk to the client. | ||
1688 | 3482 | server_sock, _ = listen_sock.accept() | ||
1689 | 3483 | server_sock.close() | ||
1690 | 3484 | # Sockets buffer and don't really notice that the server has closed the | ||
1691 | 3485 | # connection until we try to read again. | ||
1692 | 3486 | handler = smart_request._send(3) | ||
1693 | 3487 | self.assertRaises(errors.ConnectionReset, | ||
1694 | 3488 | handler.read_response_tuple, expect_body=False) | ||
1695 | 3489 | |||
1696 | 3490 | def test__send_retries_on_write(self): | ||
1697 | 3491 | output, vendor, smart_client = self.make_client_with_failing_medium() | ||
1698 | 3492 | smart_request = client._SmartClientRequest(smart_client, 'hello', ()) | ||
1699 | 3493 | handler = smart_request._send(3) | ||
1700 | 3494 | self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol | ||
1701 | 3495 | '\x00\x00\x00\x02de' # empty headers | ||
1702 | 3496 | 's\x00\x00\x00\tl5:helloee', | ||
1703 | 3497 | output.getvalue()) | ||
1704 | 3498 | self.assertEqual( | ||
1705 | 3499 | [('connect_ssh', 'a user', 'a pass', 'a host', 'a port', | ||
1706 | 3500 | ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']), | ||
1707 | 3501 | ('close',), | ||
1708 | 3502 | ('connect_ssh', 'a user', 'a pass', 'a host', 'a port', | ||
1709 | 3503 | ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']), | ||
1710 | 3504 | ], | ||
1711 | 3505 | vendor.calls) | ||
1712 | 3506 | |||
1713 | 3507 | def test__send_doesnt_retry_read_failure(self): | ||
1714 | 3508 | output, vendor, smart_client = self.make_client_with_failing_medium( | ||
1715 | 3509 | fail_at_write=False) | ||
1716 | 3510 | smart_request = client._SmartClientRequest(smart_client, 'hello', ()) | ||
1717 | 3511 | handler = smart_request._send(3) | ||
1718 | 3512 | self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol | ||
1719 | 3513 | '\x00\x00\x00\x02de' # empty headers | ||
1720 | 3514 | 's\x00\x00\x00\tl5:helloee', | ||
1721 | 3515 | output.getvalue()) | ||
1722 | 3516 | self.assertEqual( | ||
1723 | 3517 | [('connect_ssh', 'a user', 'a pass', 'a host', 'a port', | ||
1724 | 3518 | ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']), | ||
1725 | 3519 | ], | ||
1726 | 3520 | vendor.calls) | ||
1727 | 3521 | self.assertRaises(errors.ConnectionReset, handler.read_response_tuple) | ||
1728 | 3522 | |||
1729 | 3523 | def test__send_request_retries_body_stream_if_not_started(self): | ||
1730 | 3524 | output, vendor, smart_client = self.make_client_with_failing_medium() | ||
1731 | 3525 | smart_request = client._SmartClientRequest(smart_client, 'hello', (), | ||
1732 | 3526 | body_stream=['a', 'b']) | ||
1733 | 3527 | response_handler = smart_request._send(3) | ||
1734 | 3528 | # We connect, get disconnected, and notice before consuming the stream, | ||
1735 | 3529 | # so we try again one time and succeed. | ||
1736 | 3530 | self.assertEqual( | ||
1737 | 3531 | [('connect_ssh', 'a user', 'a pass', 'a host', 'a port', | ||
1738 | 3532 | ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']), | ||
1739 | 3533 | ('close',), | ||
1740 | 3534 | ('connect_ssh', 'a user', 'a pass', 'a host', 'a port', | ||
1741 | 3535 | ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']), | ||
1742 | 3536 | ], | ||
1743 | 3537 | vendor.calls) | ||
1744 | 3538 | self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol | ||
1745 | 3539 | '\x00\x00\x00\x02de' # empty headers | ||
1746 | 3540 | 's\x00\x00\x00\tl5:helloe' | ||
1747 | 3541 | 'b\x00\x00\x00\x01a' | ||
1748 | 3542 | 'b\x00\x00\x00\x01b' | ||
1749 | 3543 | 'e', | ||
1750 | 3544 | output.getvalue()) | ||
1751 | 3545 | |||
1752 | 3546 | def test__send_request_stops_if_body_started(self): | ||
1753 | 3547 | # We intentionally use the python StringIO so that we can subclass it. | ||
1754 | 3548 | from StringIO import StringIO | ||
1755 | 3549 | response = StringIO() | ||
1756 | 3550 | |||
1757 | 3551 | class FailAfterFirstWrite(StringIO): | ||
1758 | 3552 | """Allow one 'write' call to pass, fail the rest""" | ||
1759 | 3553 | def __init__(self): | ||
1760 | 3554 | StringIO.__init__(self) | ||
1761 | 3555 | self._first = True | ||
1762 | 3556 | |||
1763 | 3557 | def write(self, s): | ||
1764 | 3558 | if self._first: | ||
1765 | 3559 | self._first = False | ||
1766 | 3560 | return StringIO.write(self, s) | ||
1767 | 3561 | raise IOError(errno.EINVAL, 'invalid file handle') | ||
1768 | 3562 | output = FailAfterFirstWrite() | ||
1769 | 3563 | |||
1770 | 3564 | vendor = FirstRejectedStringIOSSHVendor(response, output, | ||
1771 | 3565 | fail_at_write=False) | ||
1772 | 3566 | client_medium = medium.SmartSSHClientMedium( | ||
1773 | 3567 | 'a host', 'a port', 'a user', 'a pass', 'base', vendor, | ||
1774 | 3568 | 'bzr') | ||
1775 | 3569 | smart_client = client._SmartClient(client_medium, headers={}) | ||
1776 | 3570 | smart_request = client._SmartClientRequest(smart_client, 'hello', (), | ||
1777 | 3571 | body_stream=['a', 'b']) | ||
1778 | 3572 | self.assertRaises(errors.ConnectionReset, smart_request._send, 3) | ||
1779 | 3573 | # We connect, and manage to get to the point that we start consuming | ||
1780 | 3574 | # the body stream. The next write fails, so we just stop. | ||
1781 | 3575 | self.assertEqual( | ||
1782 | 3576 | [('connect_ssh', 'a user', 'a pass', 'a host', 'a port', | ||
1783 | 3577 | ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']), | ||
1784 | 3578 | ('close',), | ||
1785 | 3579 | ], | ||
1786 | 3580 | vendor.calls) | ||
1787 | 3581 | self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol | ||
1788 | 3582 | '\x00\x00\x00\x02de' # empty headers | ||
1789 | 3583 | 's\x00\x00\x00\tl5:helloe', | ||
1790 | 3584 | output.getvalue()) | ||
1791 | 3585 | |||
1792 | 3586 | def test__send_disabled_retry(self): | ||
1793 | 3587 | debug.debug_flags.add('noretry') | ||
1794 | 3588 | output, vendor, smart_client = self.make_client_with_failing_medium() | ||
1795 | 3589 | smart_request = client._SmartClientRequest(smart_client, 'hello', ()) | ||
1796 | 3590 | self.assertRaises(errors.ConnectionReset, smart_request._send, 3) | ||
1797 | 3591 | self.assertEqual( | ||
1798 | 3592 | [('connect_ssh', 'a user', 'a pass', 'a host', 'a port', | ||
1799 | 3593 | ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']), | ||
1800 | 3594 | ('close',), | ||
1801 | 3595 | ], | ||
1802 | 3596 | vendor.calls) | ||
1803 | 3597 | |||
1804 | 3598 | |||
1805 | 3217 | class LengthPrefixedBodyDecoder(tests.TestCase): | 3599 | class LengthPrefixedBodyDecoder(tests.TestCase): |
1806 | 3218 | 3600 | ||
1807 | 3219 | # XXX: TODO: make accept_reading_trailer invoke translate_response or | 3601 | # XXX: TODO: make accept_reading_trailer invoke translate_response or |
Looks like a good collection of updates for the LTS 2.1! Thanks for porting the goodness back from 2.5 and fixing the test that broke.
+1
A question regarding the comment in bzrlib/ smart/protocol. py:_encode_ tuple() :
Is the Unicode response issue really an aberrant case? If filesystem names can appear in the "args" list of _encode_tuple, it seems that having Unicode entries would be a matter of course--especially in locales other than "C".