Merge lp:~jameinel/bzr/2.5-client-reconnect-819604 into lp:bzr
Status: | Superseded |
---|---|
Proposed branch: | lp:~jameinel/bzr/2.5-client-reconnect-819604 |
Merge into: | lp:bzr |
Diff against target: |
1324 lines (+799/-199) 13 files modified
bzrlib/help_topics/en/debug-flags.txt (+2/-0)
bzrlib/smart/client.py (+208/-86)
bzrlib/smart/medium.py (+20/-5)
bzrlib/smart/protocol.py (+5/-3)
bzrlib/smart/request.py (+145/-100)
bzrlib/tests/test_smart.py (+5/-2)
bzrlib/tests/test_smart_request.py (+10/-0)
bzrlib/tests/test_smart_transport.py (+376/-0)
doc/en/release-notes/bzr-2.1.txt (+5/-0)
doc/en/release-notes/bzr-2.2.txt (+5/-0)
doc/en/release-notes/bzr-2.3.txt (+5/-0)
doc/en/release-notes/bzr-2.4.txt (+8/-3)
doc/en/release-notes/bzr-2.5.txt (+5/-0) |
To merge this branch: | bzr merge lp:~jameinel/bzr/2.5-client-reconnect-819604 |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Jelmer Vernooij (community) | | | Approve |
Martin Pool | | | Approve |
Review via email: mp+78856@code.launchpad.net |
This proposal has been superseded by a proposal from 2011-11-25.
Commit message
Bug #819604, allow clients to reconnect if they get ConnectionReset while trying to send an RPC.
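The behaviour named in the commit message can be sketched in isolation. This is a minimal illustration in standalone Python 3, not bzrlib code: `FlakyMedium`, `send_with_one_retry`, and the local `ConnectionReset` class are hypothetical stand-ins. The actual logic lives in `_SmartClientRequest._send` in the preview diff.

```python
class ConnectionReset(Exception):
    """Stand-in for bzrlib.errors.ConnectionReset (assumption)."""


class FlakyMedium(object):
    """Hypothetical medium whose first `failures` writes fail."""

    def __init__(self, failures):
        self.failures = failures
        self.sent = []
        self.resets = 0

    def accept_bytes(self, data):
        if self.failures > 0:
            self.failures -= 1
            raise ConnectionReset("connection dropped while sending")
        self.sent.append(data)

    def reset(self):
        # Drop the dead connection so the next write can reconnect.
        self.resets += 1


def send_with_one_retry(medium, data, noretry=False):
    """Send `data`, retrying exactly once if the connection resets."""
    try:
        medium.accept_bytes(data)
    except ConnectionReset:
        medium.reset()
        if noretry:
            raise
        # The failure happened while sending, before any reply was awaited,
        # so the request is resent. Only one retry is attempted; a second
        # failure propagates to the caller.
        medium.accept_bytes(data)
```

The single retry mirrors the reasoning in the diff's own comment: if the connection really is down, there is no reason to loop endlessly.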
Description of the change
And finally, this is a merge-up of all the previous versions of my client-reconnect patch, targeting bzr.dev.
The main conflicts were just the test_smart_
Martin Pool (mbp) wrote : | # |
John A Meinel (jameinel) wrote : | # |
On 10/13/2011 7:07 AM, Martin Pool wrote:
> If this is just a merge-up, feel free to just land it. Otherwise
> I'll read it later.
>
This is a merge up (+ resolving conflicts, etc), but the 2.1 code
still needs to be approved.
John
=:->
Martin Pool (mbp) wrote : | # |
I think we should land this feature and give it a good shake-out on 2.5 first, before even thinking about putting it back into previous series, partly because it's just a big patch and partly because so many different cases can come up in handling disconnects. Landing on trunk first also lets us see everything that would land, and avoids reviewing the backport shims right now.
I really like the split out of the request object; that is pretty clear.
[fix] Obviously if we land this only in trunk first, we have to remove those lines from the old news.
I am glad you added a flag to turn it off.
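The flag in question is `-Dnoretry`, documented in the `debug-flags.txt` hunk of the preview diff. A rough sketch of how the flag combines with the per-request categories when a reset happens while reading a response is shown here. It is standalone Python 3 modelled on `_is_safe_to_send_twice` from the diff; the function signature and the two constant names are assumptions made for illustration.

```python
# Category names copied from the comment block added to
# bzrlib/smart/request.py in this proposal.
RETRY_SAFE = frozenset(['read', 'idem', 'semi'])
RETRY_UNSAFE = frozenset(['semivfs', 'mutate', 'stream'])


def is_safe_to_send_twice(request_type, has_body_stream, debug_flags):
    """Return True if a request may be resent after a reset while reading.

    `debug_flags` is any container of flag names, standing in for
    bzrlib.debug.debug_flags (assumption).
    """
    if has_body_stream or 'noretry' in debug_flags:
        # A consumed body stream cannot be replayed, and 'noretry'
        # disables retrying altogether.
        return False
    if request_type in RETRY_SAFE:
        return True
    # Known-unsafe categories and unknown categories are both refused.
    return False
```

Refusing unknown categories keeps the default conservative: a request is only resent when it has been explicitly classified as harmless to repeat.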
I think this is pretty clear and I don't see any problems. I don't see any unaddressed problems in the other reviews (are there?) I wouldn't be surprised if we see some problems on the real network but the only way to tell is to bring it in.
Does anyone else have objections or see problems?
Jelmer Vernooij (jelmer) wrote : | # |
mgz and I just discussed this independently as well, before reading poolie's review, and came to the same conclusion.
Let's land this as early in the cycle as we can so we can shake out bugs.
Independently, we should look into the issues (spurious failures, etc) with some of the smart server tests, some of which have become more frequent after the prerequisites of this branch landed.
We're a bit worried about this feature hiding issues during tests, e.g. with servers dropping connections after requests. One way to work around that would be to add tests that make sure that commands only open X connections. This would also catch some other bugs, such as command implementations not using possible_
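The connection-counting idea in the comment above could look roughly like this. It is a sketch only, in standalone Python 3 with `unittest`: `CountingConnector` and `run_command` are hypothetical stand-ins and do not correspond to any bzrlib test helper.

```python
import unittest


class CountingConnector(object):
    """Hypothetical connection factory that records how often it is used."""

    def __init__(self):
        self.connections_opened = 0

    def connect(self):
        self.connections_opened += 1
        return object()  # placeholder for a real connection


def run_command(connector, reuse=True):
    """Hypothetical command that talks to the server three times."""
    conn = None
    for _ in range(3):
        if conn is None or not reuse:
            # A silent reconnect shows up here as an extra connection.
            conn = connector.connect()


class TestConnectionCount(unittest.TestCase):

    def test_command_opens_one_connection(self):
        connector = CountingConnector()
        run_command(connector)
        # Fails if a dropped connection was hidden by an automatic retry.
        self.assertEqual(1, connector.connections_opened)
```

Asserting on an exact count means a transparent reconnect fails the test instead of masking a server that drops connections after a request.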
Jelmer Vernooij (jelmer) wrote : | # |
sent to pqm by email
Preview Diff
1 | === modified file 'bzrlib/help_topics/en/debug-flags.txt' |
2 | --- bzrlib/help_topics/en/debug-flags.txt 2011-05-27 17:10:05 +0000 |
3 | +++ bzrlib/help_topics/en/debug-flags.txt 2011-10-10 15:16:30 +0000 |
4 | @@ -24,6 +24,8 @@ |
5 | -Dindex Trace major index operations. |
6 | -Dknit Trace knit operations. |
7 | -Dlock Trace when lockdir locks are taken or released. |
8 | +-Dnoretry If a connection is reset, fail immediately rather than |
9 | + retrying the request. |
10 | -Dprogress Trace progress bar operations. |
11 | -Dmem_dump Dump memory to a file upon an out of memory error. |
12 | -Dmerge Emit information for debugging merges. |
13 | |
14 | === modified file 'bzrlib/smart/client.py' |
15 | --- bzrlib/smart/client.py 2011-03-30 11:45:54 +0000 |
16 | +++ bzrlib/smart/client.py 2011-10-10 15:16:30 +0000 |
17 | @@ -14,12 +14,18 @@ |
18 | # along with this program; if not, write to the Free Software |
19 | # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA |
20 | |
21 | +from bzrlib import lazy_import |
22 | +lazy_import.lazy_import(globals(), """ |
23 | +from bzrlib.smart import request as _mod_request |
24 | +""") |
25 | + |
26 | import bzrlib |
27 | from bzrlib.smart import message, protocol |
28 | -from bzrlib.trace import warning |
29 | from bzrlib import ( |
30 | + debug, |
31 | errors, |
32 | hooks, |
33 | + trace, |
34 | ) |
35 | |
36 | |
37 | @@ -39,93 +45,12 @@ |
38 | def __repr__(self): |
39 | return '%s(%r)' % (self.__class__.__name__, self._medium) |
40 | |
41 | - def _send_request(self, protocol_version, method, args, body=None, |
42 | - readv_body=None, body_stream=None): |
43 | - encoder, response_handler = self._construct_protocol( |
44 | - protocol_version) |
45 | - encoder.set_headers(self._headers) |
46 | - if body is not None: |
47 | - if readv_body is not None: |
48 | - raise AssertionError( |
49 | - "body and readv_body are mutually exclusive.") |
50 | - if body_stream is not None: |
51 | - raise AssertionError( |
52 | - "body and body_stream are mutually exclusive.") |
53 | - encoder.call_with_body_bytes((method, ) + args, body) |
54 | - elif readv_body is not None: |
55 | - if body_stream is not None: |
56 | - raise AssertionError( |
57 | - "readv_body and body_stream are mutually exclusive.") |
58 | - encoder.call_with_body_readv_array((method, ) + args, readv_body) |
59 | - elif body_stream is not None: |
60 | - encoder.call_with_body_stream((method, ) + args, body_stream) |
61 | - else: |
62 | - encoder.call(method, *args) |
63 | - return response_handler |
64 | - |
65 | - def _run_call_hooks(self, method, args, body, readv_body): |
66 | - if not _SmartClient.hooks['call']: |
67 | - return |
68 | - params = CallHookParams(method, args, body, readv_body, self._medium) |
69 | - for hook in _SmartClient.hooks['call']: |
70 | - hook(params) |
71 | - |
72 | def _call_and_read_response(self, method, args, body=None, readv_body=None, |
73 | body_stream=None, expect_response_body=True): |
74 | - self._run_call_hooks(method, args, body, readv_body) |
75 | - if self._medium._protocol_version is not None: |
76 | - response_handler = self._send_request( |
77 | - self._medium._protocol_version, method, args, body=body, |
78 | - readv_body=readv_body, body_stream=body_stream) |
79 | - return (response_handler.read_response_tuple( |
80 | - expect_body=expect_response_body), |
81 | - response_handler) |
82 | - else: |
83 | - for protocol_version in [3, 2]: |
84 | - if protocol_version == 2: |
85 | - # If v3 doesn't work, the remote side is older than 1.6. |
86 | - self._medium._remember_remote_is_before((1, 6)) |
87 | - response_handler = self._send_request( |
88 | - protocol_version, method, args, body=body, |
89 | - readv_body=readv_body, body_stream=body_stream) |
90 | - try: |
91 | - response_tuple = response_handler.read_response_tuple( |
92 | - expect_body=expect_response_body) |
93 | - except errors.UnexpectedProtocolVersionMarker, err: |
94 | - # TODO: We could recover from this without disconnecting if |
95 | - # we recognise the protocol version. |
96 | - warning( |
97 | - 'Server does not understand Bazaar network protocol %d,' |
98 | - ' reconnecting. (Upgrade the server to avoid this.)' |
99 | - % (protocol_version,)) |
100 | - self._medium.disconnect() |
101 | - continue |
102 | - except errors.ErrorFromSmartServer: |
103 | - # If we received an error reply from the server, then it |
104 | - # must be ok with this protocol version. |
105 | - self._medium._protocol_version = protocol_version |
106 | - raise |
107 | - else: |
108 | - self._medium._protocol_version = protocol_version |
109 | - return response_tuple, response_handler |
110 | - raise errors.SmartProtocolError( |
111 | - 'Server is not a Bazaar server: ' + str(err)) |
112 | - |
113 | - def _construct_protocol(self, version): |
114 | - request = self._medium.get_request() |
115 | - if version == 3: |
116 | - request_encoder = protocol.ProtocolThreeRequester(request) |
117 | - response_handler = message.ConventionalResponseHandler() |
118 | - response_proto = protocol.ProtocolThreeDecoder( |
119 | - response_handler, expect_version_marker=True) |
120 | - response_handler.setProtoAndMediumRequest(response_proto, request) |
121 | - elif version == 2: |
122 | - request_encoder = protocol.SmartClientRequestProtocolTwo(request) |
123 | - response_handler = request_encoder |
124 | - else: |
125 | - request_encoder = protocol.SmartClientRequestProtocolOne(request) |
126 | - response_handler = request_encoder |
127 | - return request_encoder, response_handler |
128 | + request = _SmartClientRequest(self, method, args, body=body, |
129 | + readv_body=readv_body, body_stream=body_stream, |
130 | + expect_response_body=expect_response_body) |
131 | + return request.call_and_read_response() |
132 | |
133 | def call(self, method, *args): |
134 | """Call a method on the remote server.""" |
135 | @@ -191,6 +116,203 @@ |
136 | return self._medium.remote_path_from_transport(transport) |
137 | |
138 | |
139 | +class _SmartClientRequest(object): |
140 | + """Encapsulate the logic for a single request. |
141 | + |
142 | + This class handles things like reconnecting and sending the request a |
143 | + second time when the connection is reset in the middle. It also handles the |
144 | + multiple requests that get made if we don't know what protocol the server |
145 | + supports yet. |
146 | + |
147 | + Generally, you build up one of these objects, passing in the arguments that |
148 | + you want to send to the server, and then use 'call_and_read_response' to |
149 | + get the response from the server. |
150 | + """ |
151 | + |
152 | + def __init__(self, client, method, args, body=None, readv_body=None, |
153 | + body_stream=None, expect_response_body=True): |
154 | + self.client = client |
155 | + self.method = method |
156 | + self.args = args |
157 | + self.body = body |
158 | + self.readv_body = readv_body |
159 | + self.body_stream = body_stream |
160 | + self.expect_response_body = expect_response_body |
161 | + |
162 | + def call_and_read_response(self): |
163 | + """Send the request to the server, and read the initial response. |
164 | + |
165 | + This doesn't read all of the body content of the response, instead it |
166 | + returns (response_tuple, response_handler). response_tuple is the 'ok', |
167 | + or 'error' information, and 'response_handler' can be used to get the |
168 | + content stream out. |
169 | + """ |
170 | + self._run_call_hooks() |
171 | + protocol_version = self.client._medium._protocol_version |
172 | + if protocol_version is None: |
173 | + return self._call_determining_protocol_version() |
174 | + else: |
175 | + return self._call(protocol_version) |
176 | + |
177 | + def _is_safe_to_send_twice(self): |
178 | + """Check if the current method is re-entrant safe.""" |
179 | + if self.body_stream is not None or 'noretry' in debug.debug_flags: |
180 | + # We can't restart a body stream that has already been consumed. |
181 | + return False |
182 | + request_type = _mod_request.request_handlers.get_info(self.method) |
183 | + if request_type in ('read', 'idem', 'semi'): |
184 | + return True |
185 | + # If we have gotten this far, 'stream' cannot be retried, because we |
186 | + # already consumed the local stream. |
187 | + if request_type in ('semivfs', 'mutate', 'stream'): |
188 | + return False |
189 | + trace.mutter('Unknown request type: %s for method %s' |
190 | + % (request_type, self.method)) |
191 | + return False |
192 | + |
193 | + def _run_call_hooks(self): |
194 | + if not _SmartClient.hooks['call']: |
195 | + return |
196 | + params = CallHookParams(self.method, self.args, self.body, |
197 | + self.readv_body, self.client._medium) |
198 | + for hook in _SmartClient.hooks['call']: |
199 | + hook(params) |
200 | + |
201 | + def _call(self, protocol_version): |
202 | + """We know the protocol version. |
203 | + |
204 | + So this just sends the request, and then reads the response. This is |
205 | + where the code will be to retry requests if the connection is closed. |
206 | + """ |
207 | + response_handler = self._send(protocol_version) |
208 | + try: |
209 | + response_tuple = response_handler.read_response_tuple( |
210 | + expect_body=self.expect_response_body) |
211 | + except errors.ConnectionReset, e: |
212 | + self.client._medium.reset() |
213 | + if not self._is_safe_to_send_twice(): |
214 | + raise |
215 | + trace.warning('ConnectionReset reading response for %r, retrying' |
216 | + % (self.method,)) |
217 | + trace.log_exception_quietly() |
218 | + encoder, response_handler = self._construct_protocol( |
219 | + protocol_version) |
220 | + self._send_no_retry(encoder) |
221 | + response_tuple = response_handler.read_response_tuple( |
222 | + expect_body=self.expect_response_body) |
223 | + return (response_tuple, response_handler) |
224 | + |
225 | + def _call_determining_protocol_version(self): |
226 | + """Determine what protocol the remote server supports. |
227 | + |
228 | + We do this by placing a request in the most recent protocol, and |
229 | + handling the UnexpectedProtocolVersionMarker from the server. |
230 | + """ |
231 | + for protocol_version in [3, 2]: |
232 | + if protocol_version == 2: |
233 | + # If v3 doesn't work, the remote side is older than 1.6. |
234 | + self.client._medium._remember_remote_is_before((1, 6)) |
235 | + try: |
236 | + response_tuple, response_handler = self._call(protocol_version) |
237 | + except errors.UnexpectedProtocolVersionMarker, err: |
238 | + # TODO: We could recover from this without disconnecting if |
239 | + # we recognise the protocol version. |
240 | + trace.warning( |
241 | + 'Server does not understand Bazaar network protocol %d,' |
242 | + ' reconnecting. (Upgrade the server to avoid this.)' |
243 | + % (protocol_version,)) |
244 | + self.client._medium.disconnect() |
245 | + continue |
246 | + except errors.ErrorFromSmartServer: |
247 | + # If we received an error reply from the server, then it |
248 | + # must be ok with this protocol version. |
249 | + self.client._medium._protocol_version = protocol_version |
250 | + raise |
251 | + else: |
252 | + self.client._medium._protocol_version = protocol_version |
253 | + return response_tuple, response_handler |
254 | + raise errors.SmartProtocolError( |
255 | + 'Server is not a Bazaar server: ' + str(err)) |
256 | + |
257 | + def _construct_protocol(self, version): |
258 | + """Build the encoding stack for a given protocol version.""" |
259 | + request = self.client._medium.get_request() |
260 | + if version == 3: |
261 | + request_encoder = protocol.ProtocolThreeRequester(request) |
262 | + response_handler = message.ConventionalResponseHandler() |
263 | + response_proto = protocol.ProtocolThreeDecoder( |
264 | + response_handler, expect_version_marker=True) |
265 | + response_handler.setProtoAndMediumRequest(response_proto, request) |
266 | + elif version == 2: |
267 | + request_encoder = protocol.SmartClientRequestProtocolTwo(request) |
268 | + response_handler = request_encoder |
269 | + else: |
270 | + request_encoder = protocol.SmartClientRequestProtocolOne(request) |
271 | + response_handler = request_encoder |
272 | + return request_encoder, response_handler |
273 | + |
274 | + def _send(self, protocol_version): |
275 | + """Encode the request, and send it to the server. |
276 | + |
277 | + This will retry a request if we get a ConnectionReset while sending the |
278 | + request to the server. (Unless we have a body_stream that we have |
279 | + already started consuming, since we can't restart body_streams) |
280 | + |
281 | + :return: response_handler as defined by _construct_protocol |
282 | + """ |
283 | + encoder, response_handler = self._construct_protocol(protocol_version) |
284 | + try: |
285 | + self._send_no_retry(encoder) |
286 | + except errors.ConnectionReset, e: |
287 | + # If we fail during the _send_no_retry phase, then we can |
288 | + # be confident that the server did not get our request, because we |
289 | + # haven't started waiting for the reply yet. So try the request |
290 | + # again. We only issue a single retry, because if the connection |
291 | + # really is down, there is no reason to loop endlessly. |
292 | + |
293 | + # Connection is dead, so close our end of it. |
294 | + self.client._medium.reset() |
295 | + if (('noretry' in debug.debug_flags) |
296 | + or (self.body_stream is not None |
297 | + and encoder.body_stream_started)): |
298 | + # We can't restart a body_stream that has been partially |
299 | + # consumed, so we don't retry. |
300 | + # Note: We don't have to worry about |
301 | + # SmartClientRequestProtocolOne or Two, because they don't |
302 | + # support client-side body streams. |
303 | + raise |
304 | + trace.warning('ConnectionReset calling %r, retrying' |
305 | + % (self.method,)) |
306 | + trace.log_exception_quietly() |
307 | + encoder, response_handler = self._construct_protocol( |
308 | + protocol_version) |
309 | + self._send_no_retry(encoder) |
310 | + return response_handler |
311 | + |
312 | + def _send_no_retry(self, encoder): |
313 | + """Just encode the request and try to send it.""" |
314 | + encoder.set_headers(self.client._headers) |
315 | + if self.body is not None: |
316 | + if self.readv_body is not None: |
317 | + raise AssertionError( |
318 | + "body and readv_body are mutually exclusive.") |
319 | + if self.body_stream is not None: |
320 | + raise AssertionError( |
321 | + "body and body_stream are mutually exclusive.") |
322 | + encoder.call_with_body_bytes((self.method, ) + self.args, self.body) |
323 | + elif self.readv_body is not None: |
324 | + if self.body_stream is not None: |
325 | + raise AssertionError( |
326 | + "readv_body and body_stream are mutually exclusive.") |
327 | + encoder.call_with_body_readv_array((self.method, ) + self.args, |
328 | + self.readv_body) |
329 | + elif self.body_stream is not None: |
330 | + encoder.call_with_body_stream((self.method, ) + self.args, |
331 | + self.body_stream) |
332 | + else: |
333 | + encoder.call(self.method, *self.args) |
334 | + |
335 | + |
336 | class SmartClientHooks(hooks.Hooks): |
337 | |
338 | def __init__(self): |
339 | |
340 | === modified file 'bzrlib/smart/medium.py' |
341 | --- bzrlib/smart/medium.py 2011-10-03 08:15:39 +0000 |
342 | +++ bzrlib/smart/medium.py 2011-10-10 15:16:30 +0000 |
343 | @@ -881,6 +881,14 @@ |
344 | """ |
345 | return SmartClientStreamMediumRequest(self) |
346 | |
347 | + def reset(self): |
348 | + """We have been disconnected, reset current state. |
349 | + |
350 | + This resets things like _current_request and connected state. |
351 | + """ |
352 | + self.disconnect() |
353 | + self._current_request = None |
354 | + |
355 | |
356 | class SmartSimplePipesClientMedium(SmartClientStreamMedium): |
357 | """A client medium using simple pipes. |
358 | @@ -895,11 +903,20 @@ |
359 | |
360 | def _accept_bytes(self, bytes): |
361 | """See SmartClientStreamMedium.accept_bytes.""" |
362 | - self._writeable_pipe.write(bytes) |
363 | + try: |
364 | + self._writeable_pipe.write(bytes) |
365 | + except IOError, e: |
366 | + if e.errno in (errno.EINVAL, errno.EPIPE): |
367 | + raise errors.ConnectionReset( |
368 | + "Error trying to write to subprocess:\n%s" % (e,)) |
369 | + raise |
370 | self._report_activity(len(bytes), 'write') |
371 | |
372 | def _flush(self): |
373 | """See SmartClientStreamMedium._flush().""" |
374 | + # Note: If flush were to fail, we'd like to raise ConnectionReset, etc. |
375 | + # However, testing shows that even when the child process is |
376 | + # gone, this doesn't error. |
377 | self._writeable_pipe.flush() |
378 | |
379 | def _read_bytes(self, count): |
380 | @@ -924,8 +941,8 @@ |
381 | |
382 | class SmartSSHClientMedium(SmartClientStreamMedium): |
383 | """A client medium using SSH. |
384 | - |
385 | - It delegates IO to a SmartClientSocketMedium or |
386 | + |
387 | + It delegates IO to a SmartSimplePipesClientMedium or |
388 | SmartClientAlreadyConnectedSocketMedium (depending on platform). |
389 | """ |
390 | |
391 | @@ -1164,5 +1181,3 @@ |
392 | This invokes self._medium._flush to ensure all bytes are transmitted. |
393 | """ |
394 | self._medium._flush() |
395 | - |
396 | - |
397 | |
398 | === modified file 'bzrlib/smart/protocol.py' |
399 | --- bzrlib/smart/protocol.py 2011-05-19 09:32:38 +0000 |
400 | +++ bzrlib/smart/protocol.py 2011-10-10 15:16:30 +0000 |
401 | @@ -1081,9 +1081,6 @@ |
402 | self._real_write_func = write_func |
403 | |
404 | def _write_func(self, bytes): |
405 | - # TODO: It is probably more appropriate to use sum(map(len, _buf)) |
406 | - # for total number of bytes to write, rather than buffer based on |
407 | - # the number of write() calls |
408 | # TODO: Another possibility would be to turn this into an async model. |
409 | # Where we let another thread know that we have some bytes if |
410 | # they want it, but we don't actually block for it |
411 | @@ -1292,6 +1289,7 @@ |
412 | _ProtocolThreeEncoder.__init__(self, medium_request.accept_bytes) |
413 | self._medium_request = medium_request |
414 | self._headers = {} |
415 | + self.body_stream_started = None |
416 | |
417 | def set_headers(self, headers): |
418 | self._headers = headers.copy() |
419 | @@ -1357,6 +1355,7 @@ |
420 | if path is not None: |
421 | mutter(' (to %s)', path) |
422 | self._request_start_time = osutils.timer_func() |
423 | + self.body_stream_started = False |
424 | self._write_protocol_version() |
425 | self._write_headers(self._headers) |
426 | self._write_structure(args) |
427 | @@ -1364,6 +1363,9 @@ |
428 | # have finished sending the stream. We would notice at the end |
429 | # anyway, but if the medium can deliver it early then it's good |
430 | # to short-circuit the whole request... |
431 | + # Provoke any ConnectionReset failures before we start the body stream. |
432 | + self.flush() |
433 | + self.body_stream_started = True |
434 | for exc_info, part in _iter_with_errors(stream): |
435 | if exc_info is not None: |
436 | # Iterating the stream failed. Cleanly abort the request. |
437 | |
438 | === modified file 'bzrlib/smart/request.py' |
439 | --- bzrlib/smart/request.py 2011-05-19 09:32:38 +0000 |
440 | +++ bzrlib/smart/request.py 2011-10-10 15:16:30 +0000 |
441 | @@ -1,4 +1,4 @@ |
442 | -# Copyright (C) 2006-2010 Canonical Ltd |
443 | +# Copyright (C) 2006-2011 Canonical Ltd |
444 | # |
445 | # This program is free software; you can redistribute it and/or modify |
446 | # it under the terms of the GNU General Public License as published by |
447 | @@ -491,157 +491,202 @@ |
448 | return SuccessfulSmartServerResponse((answer,)) |
449 | |
450 | |
451 | +# In the 'info' attribute, we store whether this request is 'safe' to retry if |
452 | +# we get a disconnect while reading the response. It can have the values: |
453 | +# read This is purely a read request, so retrying it is perfectly ok. |
454 | +# idem An idempotent write request. Something like 'put' where if you put |
455 | +# the same bytes twice you end up with the same final bytes. |
456 | +# semi This is a request that isn't strictly idempotent, but doesn't |
457 | +# result in corruption if it is retried. This is for things like |
458 | +# 'lock' and 'unlock'. If you call lock, it updates the disk |
459 | +# structure. If you fail to read the response, you won't be able to |
460 | +# use the lock, because you don't have the lock token. Calling lock |
461 | +# again will fail, because the lock is already taken. However, we |
462 | +# can't tell if the server received our request or not. If it didn't, |
463 | +# then retrying the request is fine, as it will actually do what we |
464 | +# want. If it did, we will interrupt the current operation, but we |
465 | +# are no worse off than interrupting the current operation because of |
466 | +# a ConnectionReset. |
467 | +# semivfs Similar to semi, but specific to a Virtual FileSystem request. |
468 | +# stream This is a request that takes a stream that cannot be restarted if |
469 | +# consumed. This request is 'safe' in that if we determine the |
470 | +# connection is closed before we consume the stream, we can try |
471 | +# again. |
472 | +# mutate State is updated in a way that replaying that request results in a |
473 | +# different state. For example 'append' writes more bytes to a given |
474 | +# file. If append succeeds, it moves the file pointer. |
475 | request_handlers = registry.Registry() |
476 | request_handlers.register_lazy( |
477 | - 'append', 'bzrlib.smart.vfs', 'AppendRequest') |
478 | + 'append', 'bzrlib.smart.vfs', 'AppendRequest', info='mutate') |
479 | request_handlers.register_lazy( |
480 | 'Branch.get_config_file', 'bzrlib.smart.branch', |
481 | - 'SmartServerBranchGetConfigFile') |
482 | + 'SmartServerBranchGetConfigFile', info='read') |
483 | request_handlers.register_lazy( |
484 | - 'Branch.get_parent', 'bzrlib.smart.branch', 'SmartServerBranchGetParent') |
485 | + 'Branch.get_parent', 'bzrlib.smart.branch', 'SmartServerBranchGetParent', |
486 | + info='read') |
487 | request_handlers.register_lazy( |
488 | 'Branch.get_tags_bytes', 'bzrlib.smart.branch', |
489 | - 'SmartServerBranchGetTagsBytes') |
490 | + 'SmartServerBranchGetTagsBytes', info='read') |
491 | request_handlers.register_lazy( |
492 | 'Branch.set_tags_bytes', 'bzrlib.smart.branch', |
493 | - 'SmartServerBranchSetTagsBytes') |
494 | + 'SmartServerBranchSetTagsBytes', info='idem') |
495 | request_handlers.register_lazy( |
496 | 'Branch.heads_to_fetch', 'bzrlib.smart.branch', |
497 | - 'SmartServerBranchHeadsToFetch') |
498 | -request_handlers.register_lazy( |
499 | - 'Branch.get_stacked_on_url', 'bzrlib.smart.branch', 'SmartServerBranchRequestGetStackedOnURL') |
500 | -request_handlers.register_lazy( |
501 | - 'Branch.last_revision_info', 'bzrlib.smart.branch', 'SmartServerBranchRequestLastRevisionInfo') |
502 | -request_handlers.register_lazy( |
503 | - 'Branch.lock_write', 'bzrlib.smart.branch', 'SmartServerBranchRequestLockWrite') |
504 | -request_handlers.register_lazy( 'Branch.revision_history', |
505 | - 'bzrlib.smart.branch', 'SmartServerRequestRevisionHistory') |
506 | -request_handlers.register_lazy( 'Branch.set_config_option', |
507 | - 'bzrlib.smart.branch', 'SmartServerBranchRequestSetConfigOption') |
508 | -request_handlers.register_lazy( 'Branch.set_config_option_dict', |
509 | - 'bzrlib.smart.branch', 'SmartServerBranchRequestSetConfigOptionDict') |
510 | -request_handlers.register_lazy( 'Branch.set_last_revision', |
511 | - 'bzrlib.smart.branch', 'SmartServerBranchRequestSetLastRevision') |
512 | + 'SmartServerBranchHeadsToFetch', info='read') |
513 | +request_handlers.register_lazy( |
514 | + 'Branch.get_stacked_on_url', 'bzrlib.smart.branch', |
515 | + 'SmartServerBranchRequestGetStackedOnURL', info='read') |
516 | +request_handlers.register_lazy( |
517 | + 'Branch.last_revision_info', 'bzrlib.smart.branch', |
518 | + 'SmartServerBranchRequestLastRevisionInfo', info='read') |
519 | +request_handlers.register_lazy( |
520 | + 'Branch.lock_write', 'bzrlib.smart.branch', |
521 | + 'SmartServerBranchRequestLockWrite', info='semi') |
522 | +request_handlers.register_lazy( |
523 | + 'Branch.revision_history', 'bzrlib.smart.branch', |
524 | + 'SmartServerRequestRevisionHistory', info='read') |
525 | +request_handlers.register_lazy( |
526 | + 'Branch.set_config_option', 'bzrlib.smart.branch', |
527 | + 'SmartServerBranchRequestSetConfigOption', info='idem') |
528 | +request_handlers.register_lazy( |
529 | + 'Branch.set_config_option_dict', 'bzrlib.smart.branch', |
530 | + 'SmartServerBranchRequestSetConfigOptionDict', info='idem') |
531 | +request_handlers.register_lazy( |
532 | + 'Branch.set_last_revision', 'bzrlib.smart.branch', |
533 | + 'SmartServerBranchRequestSetLastRevision', info='idem') |
534 | request_handlers.register_lazy( |
535 | 'Branch.set_last_revision_info', 'bzrlib.smart.branch', |
536 | - 'SmartServerBranchRequestSetLastRevisionInfo') |
537 | + 'SmartServerBranchRequestSetLastRevisionInfo', info='idem') |
538 | request_handlers.register_lazy( |
539 | 'Branch.set_last_revision_ex', 'bzrlib.smart.branch', |
540 | - 'SmartServerBranchRequestSetLastRevisionEx') |
541 | + 'SmartServerBranchRequestSetLastRevisionEx', info='idem') |
542 | request_handlers.register_lazy( |
543 | 'Branch.set_parent_location', 'bzrlib.smart.branch', |
544 | - 'SmartServerBranchRequestSetParentLocation') |
545 | + 'SmartServerBranchRequestSetParentLocation', info='idem') |
546 | request_handlers.register_lazy( |
547 | - 'Branch.unlock', 'bzrlib.smart.branch', 'SmartServerBranchRequestUnlock') |
548 | + 'Branch.unlock', 'bzrlib.smart.branch', 'SmartServerBranchRequestUnlock', |
549 | + info='semi') |
550 | request_handlers.register_lazy( |
551 | 'BzrDir.cloning_metadir', 'bzrlib.smart.bzrdir', |
552 | - 'SmartServerBzrDirRequestCloningMetaDir') |
553 | + 'SmartServerBzrDirRequestCloningMetaDir', info='read') |
554 | request_handlers.register_lazy( |
555 | 'BzrDir.create_branch', 'bzrlib.smart.bzrdir', |
556 | - 'SmartServerRequestCreateBranch') |
557 | + 'SmartServerRequestCreateBranch', info='semi') |
558 | request_handlers.register_lazy( |
559 | 'BzrDir.create_repository', 'bzrlib.smart.bzrdir', |
560 | - 'SmartServerRequestCreateRepository') |
561 | + 'SmartServerRequestCreateRepository', info='semi') |
562 | request_handlers.register_lazy( |
563 | 'BzrDir.find_repository', 'bzrlib.smart.bzrdir', |
564 | - 'SmartServerRequestFindRepositoryV1') |
565 | + 'SmartServerRequestFindRepositoryV1', info='read') |
566 | request_handlers.register_lazy( |
567 | 'BzrDir.find_repositoryV2', 'bzrlib.smart.bzrdir', |
568 | - 'SmartServerRequestFindRepositoryV2') |
569 | + 'SmartServerRequestFindRepositoryV2', info='read') |
570 | request_handlers.register_lazy( |
571 | 'BzrDir.find_repositoryV3', 'bzrlib.smart.bzrdir', |
572 | - 'SmartServerRequestFindRepositoryV3') |
573 | + 'SmartServerRequestFindRepositoryV3', info='read') |
574 | request_handlers.register_lazy( |
575 | 'BzrDir.get_config_file', 'bzrlib.smart.bzrdir', |
576 | - 'SmartServerBzrDirRequestConfigFile') |
577 | + 'SmartServerBzrDirRequestConfigFile', info='read') |
578 | request_handlers.register_lazy( |
579 | 'BzrDirFormat.initialize', 'bzrlib.smart.bzrdir', |
580 | - 'SmartServerRequestInitializeBzrDir') |
581 | + 'SmartServerRequestInitializeBzrDir', info='semi') |
582 | request_handlers.register_lazy( |
583 | 'BzrDirFormat.initialize_ex_1.16', 'bzrlib.smart.bzrdir', |
584 | - 'SmartServerRequestBzrDirInitializeEx') |
585 | -request_handlers.register_lazy( |
586 | - 'BzrDir.open', 'bzrlib.smart.bzrdir', 'SmartServerRequestOpenBzrDir') |
587 | -request_handlers.register_lazy( |
588 | - 'BzrDir.open_2.1', 'bzrlib.smart.bzrdir', 'SmartServerRequestOpenBzrDir_2_1') |
589 | + 'SmartServerRequestBzrDirInitializeEx', info='semi') |
590 | +request_handlers.register_lazy( |
591 | + 'BzrDir.open', 'bzrlib.smart.bzrdir', 'SmartServerRequestOpenBzrDir', |
592 | + info='read') |
593 | +request_handlers.register_lazy( |
594 | + 'BzrDir.open_2.1', 'bzrlib.smart.bzrdir', |
595 | + 'SmartServerRequestOpenBzrDir_2_1', info='read') |
596 | request_handlers.register_lazy( |
597 | 'BzrDir.open_branch', 'bzrlib.smart.bzrdir', |
598 | - 'SmartServerRequestOpenBranch') |
599 | + 'SmartServerRequestOpenBranch', info='read') |
600 | request_handlers.register_lazy( |
601 | 'BzrDir.open_branchV2', 'bzrlib.smart.bzrdir', |
602 | - 'SmartServerRequestOpenBranchV2') |
603 | + 'SmartServerRequestOpenBranchV2', info='read') |
604 | request_handlers.register_lazy( |
605 | 'BzrDir.open_branchV3', 'bzrlib.smart.bzrdir', |
606 | - 'SmartServerRequestOpenBranchV3') |
607 | -request_handlers.register_lazy( |
608 | - 'delete', 'bzrlib.smart.vfs', 'DeleteRequest') |
609 | -request_handlers.register_lazy( |
610 | - 'get', 'bzrlib.smart.vfs', 'GetRequest') |
611 | -request_handlers.register_lazy( |
612 | - 'get_bundle', 'bzrlib.smart.request', 'GetBundleRequest') |
613 | -request_handlers.register_lazy( |
614 | - 'has', 'bzrlib.smart.vfs', 'HasRequest') |
615 | -request_handlers.register_lazy( |
616 | - 'hello', 'bzrlib.smart.request', 'HelloRequest') |
617 | -request_handlers.register_lazy( |
618 | - 'iter_files_recursive', 'bzrlib.smart.vfs', 'IterFilesRecursiveRequest') |
619 | -request_handlers.register_lazy( |
620 | - 'list_dir', 'bzrlib.smart.vfs', 'ListDirRequest') |
621 | -request_handlers.register_lazy( |
622 | - 'mkdir', 'bzrlib.smart.vfs', 'MkdirRequest') |
623 | -request_handlers.register_lazy( |
624 | - 'move', 'bzrlib.smart.vfs', 'MoveRequest') |
625 | -request_handlers.register_lazy( |
626 | - 'put', 'bzrlib.smart.vfs', 'PutRequest') |
627 | -request_handlers.register_lazy( |
628 | - 'put_non_atomic', 'bzrlib.smart.vfs', 'PutNonAtomicRequest') |
629 | -request_handlers.register_lazy( |
630 | - 'readv', 'bzrlib.smart.vfs', 'ReadvRequest') |
631 | -request_handlers.register_lazy( |
632 | - 'rename', 'bzrlib.smart.vfs', 'RenameRequest') |
633 | + 'SmartServerRequestOpenBranchV3', info='read') |
634 | +request_handlers.register_lazy( |
635 | + 'delete', 'bzrlib.smart.vfs', 'DeleteRequest', info='semivfs') |
636 | +request_handlers.register_lazy( |
637 | + 'get', 'bzrlib.smart.vfs', 'GetRequest', info='read') |
638 | +request_handlers.register_lazy( |
639 | + 'get_bundle', 'bzrlib.smart.request', 'GetBundleRequest', info='read') |
640 | +request_handlers.register_lazy( |
641 | + 'has', 'bzrlib.smart.vfs', 'HasRequest', info='read') |
642 | +request_handlers.register_lazy( |
643 | + 'hello', 'bzrlib.smart.request', 'HelloRequest', info='read') |
644 | +request_handlers.register_lazy( |
645 | + 'iter_files_recursive', 'bzrlib.smart.vfs', 'IterFilesRecursiveRequest', |
646 | + info='read') |
647 | +request_handlers.register_lazy( |
648 | + 'list_dir', 'bzrlib.smart.vfs', 'ListDirRequest', info='read') |
649 | +request_handlers.register_lazy( |
650 | + 'mkdir', 'bzrlib.smart.vfs', 'MkdirRequest', info='semivfs') |
651 | +request_handlers.register_lazy( |
652 | + 'move', 'bzrlib.smart.vfs', 'MoveRequest', info='semivfs') |
653 | +request_handlers.register_lazy( |
654 | + 'put', 'bzrlib.smart.vfs', 'PutRequest', info='idem') |
655 | +request_handlers.register_lazy( |
656 | + 'put_non_atomic', 'bzrlib.smart.vfs', 'PutNonAtomicRequest', info='idem') |
657 | +request_handlers.register_lazy( |
658 | + 'readv', 'bzrlib.smart.vfs', 'ReadvRequest', info='read') |
659 | +request_handlers.register_lazy( |
660 | + 'rename', 'bzrlib.smart.vfs', 'RenameRequest', info='semivfs') |
661 | request_handlers.register_lazy( |
662 | 'PackRepository.autopack', 'bzrlib.smart.packrepository', |
663 | - 'SmartServerPackRepositoryAutopack') |
664 | -request_handlers.register_lazy('Repository.gather_stats', |
665 | - 'bzrlib.smart.repository', |
666 | - 'SmartServerRepositoryGatherStats') |
667 | -request_handlers.register_lazy('Repository.get_parent_map', |
668 | - 'bzrlib.smart.repository', |
669 | - 'SmartServerRepositoryGetParentMap') |
670 | -request_handlers.register_lazy( |
671 | - 'Repository.get_revision_graph', 'bzrlib.smart.repository', 'SmartServerRepositoryGetRevisionGraph') |
672 | -request_handlers.register_lazy( |
673 | - 'Repository.has_revision', 'bzrlib.smart.repository', 'SmartServerRequestHasRevision') |
674 | -request_handlers.register_lazy( |
675 | - 'Repository.insert_stream', 'bzrlib.smart.repository', 'SmartServerRepositoryInsertStream') |
676 | -request_handlers.register_lazy( |
677 | - 'Repository.insert_stream_1.19', 'bzrlib.smart.repository', 'SmartServerRepositoryInsertStream_1_19') |
678 | -request_handlers.register_lazy( |
679 | - 'Repository.insert_stream_locked', 'bzrlib.smart.repository', 'SmartServerRepositoryInsertStreamLocked') |
680 | -request_handlers.register_lazy( |
681 | - 'Repository.is_shared', 'bzrlib.smart.repository', 'SmartServerRepositoryIsShared') |
682 | -request_handlers.register_lazy( |
683 | - 'Repository.lock_write', 'bzrlib.smart.repository', 'SmartServerRepositoryLockWrite') |
684 | + 'SmartServerPackRepositoryAutopack', info='idem') |
685 | +request_handlers.register_lazy( |
686 | + 'Repository.gather_stats', 'bzrlib.smart.repository', |
687 | + 'SmartServerRepositoryGatherStats', info='read') |
688 | +request_handlers.register_lazy( |
689 | + 'Repository.get_parent_map', 'bzrlib.smart.repository', |
690 | + 'SmartServerRepositoryGetParentMap', info='read') |
691 | +request_handlers.register_lazy( |
692 | + 'Repository.get_revision_graph', 'bzrlib.smart.repository', |
693 | + 'SmartServerRepositoryGetRevisionGraph', info='read') |
694 | +request_handlers.register_lazy( |
695 | + 'Repository.has_revision', 'bzrlib.smart.repository', |
696 | + 'SmartServerRequestHasRevision', info='read') |
697 | +request_handlers.register_lazy( |
698 | + 'Repository.insert_stream', 'bzrlib.smart.repository', |
699 | + 'SmartServerRepositoryInsertStream', info='stream') |
700 | +request_handlers.register_lazy( |
701 | + 'Repository.insert_stream_1.19', 'bzrlib.smart.repository', |
702 | + 'SmartServerRepositoryInsertStream_1_19', info='stream') |
703 | +request_handlers.register_lazy( |
704 | + 'Repository.insert_stream_locked', 'bzrlib.smart.repository', |
705 | + 'SmartServerRepositoryInsertStreamLocked', info='stream') |
706 | +request_handlers.register_lazy( |
707 | + 'Repository.is_shared', 'bzrlib.smart.repository', |
708 | + 'SmartServerRepositoryIsShared', info='read') |
709 | +request_handlers.register_lazy( |
710 | + 'Repository.lock_write', 'bzrlib.smart.repository', |
711 | + 'SmartServerRepositoryLockWrite', info='semi') |
712 | request_handlers.register_lazy( |
713 | 'Repository.set_make_working_trees', 'bzrlib.smart.repository', |
714 | - 'SmartServerRepositorySetMakeWorkingTrees') |
715 | + 'SmartServerRepositorySetMakeWorkingTrees', info='idem') |
716 | request_handlers.register_lazy( |
717 | - 'Repository.unlock', 'bzrlib.smart.repository', 'SmartServerRepositoryUnlock') |
718 | + 'Repository.unlock', 'bzrlib.smart.repository', |
719 | + 'SmartServerRepositoryUnlock', info='semi') |
720 | request_handlers.register_lazy( |
721 | 'Repository.get_rev_id_for_revno', 'bzrlib.smart.repository', |
722 | - 'SmartServerRepositoryGetRevIdForRevno') |
723 | + 'SmartServerRepositoryGetRevIdForRevno', info='read') |
724 | request_handlers.register_lazy( |
725 | 'Repository.get_stream', 'bzrlib.smart.repository', |
726 | - 'SmartServerRepositoryGetStream') |
727 | + 'SmartServerRepositoryGetStream', info='read') |
728 | request_handlers.register_lazy( |
729 | 'Repository.get_stream_1.19', 'bzrlib.smart.repository', |
730 | - 'SmartServerRepositoryGetStream_1_19') |
731 | + 'SmartServerRepositoryGetStream_1_19', info='read') |
732 | request_handlers.register_lazy( |
733 | 'Repository.tarball', 'bzrlib.smart.repository', |
734 | - 'SmartServerRepositoryTarball') |
735 | -request_handlers.register_lazy( |
736 | - 'rmdir', 'bzrlib.smart.vfs', 'RmdirRequest') |
737 | -request_handlers.register_lazy( |
738 | - 'stat', 'bzrlib.smart.vfs', 'StatRequest') |
739 | -request_handlers.register_lazy( |
740 | - 'Transport.is_readonly', 'bzrlib.smart.request', 'SmartServerIsReadonly') |
741 | + 'SmartServerRepositoryTarball', info='read') |
742 | +request_handlers.register_lazy( |
743 | + 'rmdir', 'bzrlib.smart.vfs', 'RmdirRequest', info='semivfs') |
744 | +request_handlers.register_lazy( |
745 | + 'stat', 'bzrlib.smart.vfs', 'StatRequest', info='read') |
746 | +request_handlers.register_lazy( |
747 | + 'Transport.is_readonly', 'bzrlib.smart.request', 'SmartServerIsReadonly', |
748 | + info='read') |
749 | |
750 | === modified file 'bzrlib/tests/test_smart.py' |
751 | --- bzrlib/tests/test_smart.py 2011-08-19 22:34:02 +0000 |
752 | +++ bzrlib/tests/test_smart.py 2011-10-10 15:16:30 +0000 |
753 | @@ -1860,8 +1860,11 @@ |
754 | """All registered request_handlers can be found.""" |
755 | # If there's a typo in a register_lazy call, this loop will fail with |
756 | # an AttributeError. |
757 | - for key, item in smart_req.request_handlers.iteritems(): |
758 | - pass |
759 | + for key in smart_req.request_handlers.keys(): |
760 | + try: |
761 | + item = smart_req.request_handlers.get(key) |
762 | + except AttributeError, e: |
763 | + raise AttributeError('failed to get %s: %s' % (key, e)) |
764 | |
765 | def assertHandlerEqual(self, verb, handler): |
766 | self.assertEqual(smart_req.request_handlers.get(verb), handler) |
767 | |
768 | === modified file 'bzrlib/tests/test_smart_request.py' |
769 | --- bzrlib/tests/test_smart_request.py 2011-08-19 22:34:02 +0000 |
770 | +++ bzrlib/tests/test_smart_request.py 2011-10-10 15:16:30 +0000 |
771 | @@ -118,6 +118,16 @@ |
772 | self.assertEqual( |
773 | [[transport]] * 3, handler._command.jail_transports_log) |
774 | |
775 | + def test_all_registered_requests_are_safety_qualified(self): |
776 | + unclassified_requests = [] |
777 | + allowed_info = ('read', 'idem', 'mutate', 'semivfs', 'semi', 'stream') |
778 | + for key in request.request_handlers.keys(): |
779 | + info = request.request_handlers.get_info(key) |
780 | + if info is None or info not in allowed_info: |
781 | + unclassified_requests.append(key) |
782 | + if unclassified_requests: |
783 | + self.fail('These requests were not categorized as safe/unsafe' |
784 | + ' to retry: %s' % (unclassified_requests,)) |
785 | |
786 | |
787 | class TestSmartRequestHandlerErrorTranslation(TestCase): |
788 | |
789 | === modified file 'bzrlib/tests/test_smart_transport.py' |
790 | --- bzrlib/tests/test_smart_transport.py 2011-09-23 20:24:51 +0000 |
791 | +++ bzrlib/tests/test_smart_transport.py 2011-10-10 15:16:30 +0000 |
792 | @@ -19,8 +19,10 @@ |
793 | # all of this deals with byte strings so this is safe |
794 | from cStringIO import StringIO |
795 | import doctest |
796 | +import errno |
797 | import os |
798 | import socket |
799 | +import subprocess |
800 | import sys |
801 | import threading |
802 | import time |
803 | @@ -30,6 +32,7 @@ |
804 | import bzrlib |
805 | from bzrlib import ( |
806 | bzrdir, |
807 | + debug, |
808 | errors, |
809 | osutils, |
810 | tests, |
811 | @@ -59,6 +62,14 @@ |
812 | ) |
813 | |
814 | |
815 | +def create_file_pipes(): |
816 | + r, w = os.pipe() |
817 | + # These must be opened without buffering, or we get undefined results |
818 | + rf = os.fdopen(r, 'rb', 0) |
819 | + wf = os.fdopen(w, 'wb', 0) |
820 | + return rf, wf |
821 | + |
822 | + |
823 | def portable_socket_pair(): |
824 | """Return a pair of TCP sockets connected to each other. |
825 | |
826 | @@ -88,6 +99,27 @@ |
827 | return StringIOSSHConnection(self) |
828 | |
829 | |
830 | +class FirstRejectedStringIOSSHVendor(StringIOSSHVendor): |
831 | + """The first connection will be considered closed. |
832 | + |
833 | + The second connection will succeed normally. |
834 | + """ |
835 | + |
836 | + def __init__(self, read_from, write_to, fail_at_write=True): |
837 | + super(FirstRejectedStringIOSSHVendor, self).__init__(read_from, |
838 | + write_to) |
839 | + self.fail_at_write = fail_at_write |
840 | + self._first = True |
841 | + |
842 | + def connect_ssh(self, username, password, host, port, command): |
843 | + self.calls.append(('connect_ssh', username, password, host, port, |
844 | + command)) |
845 | + if self._first: |
846 | + self._first = False |
847 | + return ClosedSSHConnection(self) |
848 | + return StringIOSSHConnection(self) |
849 | + |
850 | + |
851 | class StringIOSSHConnection(ssh.SSHConnection): |
852 | """A SSH connection that uses StringIO to buffer writes and answer reads.""" |
853 | |
854 | @@ -103,6 +135,29 @@ |
855 | return 'pipes', (self.vendor.read_from, self.vendor.write_to) |
856 | |
857 | |
858 | +class ClosedSSHConnection(ssh.SSHConnection): |
859 | + """An SSH connection that just has closed channels.""" |
860 | + |
861 | + def __init__(self, vendor): |
862 | + self.vendor = vendor |
863 | + |
864 | + def close(self): |
865 | + self.vendor.calls.append(('close', )) |
866 | + |
867 | + def get_sock_or_pipes(self): |
868 | + # We create matching pipes, and then close the ssh side |
869 | + bzr_read, ssh_write = create_file_pipes() |
870 | + # We always fail when bzr goes to read |
871 | + ssh_write.close() |
872 | + if self.vendor.fail_at_write: |
873 | + # If set, we'll also fail when bzr goes to write |
874 | + ssh_read, bzr_write = create_file_pipes() |
875 | + ssh_read.close() |
876 | + else: |
877 | + bzr_write = self.vendor.write_to |
878 | + return 'pipes', (bzr_read, bzr_write) |
879 | + |
880 | + |
881 | class _InvalidHostnameFeature(features.Feature): |
882 | """Does 'non_existent.invalid' fail to resolve? |
883 | |
884 | @@ -198,6 +253,91 @@ |
885 | client_medium._accept_bytes('abc') |
886 | self.assertEqual('abc', output.getvalue()) |
887 | |
888 | + def test_simple_pipes__accept_bytes_subprocess_closed(self): |
889 | + # It is unfortunate that we have to use Popen for this. However, |
890 | + # os.pipe() does not behave the same as subprocess.Popen(). |
891 | + # On Windows, if you use os.pipe() and close the write side, |
892 | + # read.read() hangs. On Linux, read.read() returns the empty string. |
893 | + p = subprocess.Popen([sys.executable, '-c', |
894 | + 'import sys\n' |
895 | + 'sys.stdout.write(sys.stdin.read(4))\n' |
896 | + 'sys.stdout.close()\n'], |
897 | + stdout=subprocess.PIPE, stdin=subprocess.PIPE) |
898 | + client_medium = medium.SmartSimplePipesClientMedium( |
899 | + p.stdout, p.stdin, 'base') |
900 | + client_medium._accept_bytes('abc\n') |
901 | + self.assertEqual('abc', client_medium._read_bytes(3)) |
902 | + p.wait() |
903 | + # While writing to the underlying pipe, |
904 | + # Windows py2.6.6 we get IOError(EINVAL) |
905 | + # Lucid py2.6.5, we get IOError(EPIPE) |
906 | + # In both cases, it should be wrapped to ConnectionReset |
907 | + self.assertRaises(errors.ConnectionReset, |
908 | + client_medium._accept_bytes, 'more') |
909 | + |
910 | + def test_simple_pipes__accept_bytes_pipe_closed(self): |
911 | + child_read, client_write = create_file_pipes() |
912 | + client_medium = medium.SmartSimplePipesClientMedium( |
913 | + None, client_write, 'base') |
914 | + client_medium._accept_bytes('abc\n') |
915 | + self.assertEqual('abc\n', child_read.read(4)) |
916 | + # While writing to the underlying pipe, |
917 | + # Windows py2.6.6 we get IOError(EINVAL) |
918 | + # Lucid py2.6.5, we get IOError(EPIPE) |
919 | + # In both cases, it should be wrapped to ConnectionReset |
920 | + child_read.close() |
921 | + self.assertRaises(errors.ConnectionReset, |
922 | + client_medium._accept_bytes, 'more') |
923 | + |
924 | + def test_simple_pipes__flush_pipe_closed(self): |
925 | + child_read, client_write = create_file_pipes() |
926 | + client_medium = medium.SmartSimplePipesClientMedium( |
927 | + None, client_write, 'base') |
928 | + client_medium._accept_bytes('abc\n') |
929 | + child_read.close() |
930 | + # Even though the pipe is closed, flush on the write side seems to be a |
931 | + # no-op, rather than a failure. |
932 | + client_medium._flush() |
933 | + |
934 | + def test_simple_pipes__flush_subprocess_closed(self): |
935 | + p = subprocess.Popen([sys.executable, '-c', |
936 | + 'import sys\n' |
937 | + 'sys.stdout.write(sys.stdin.read(4))\n' |
938 | + 'sys.stdout.close()\n'], |
939 | + stdout=subprocess.PIPE, stdin=subprocess.PIPE) |
940 | + client_medium = medium.SmartSimplePipesClientMedium( |
941 | + p.stdout, p.stdin, 'base') |
942 | + client_medium._accept_bytes('abc\n') |
943 | + p.wait() |
944 | + # Even though the child process is dead, flush seems to be a no-op. |
945 | + client_medium._flush() |
946 | + |
947 | + def test_simple_pipes__read_bytes_pipe_closed(self): |
948 | + child_read, client_write = create_file_pipes() |
949 | + client_medium = medium.SmartSimplePipesClientMedium( |
950 | + child_read, client_write, 'base') |
951 | + client_medium._accept_bytes('abc\n') |
952 | + client_write.close() |
953 | + self.assertEqual('abc\n', client_medium._read_bytes(4)) |
954 | + self.assertEqual('', client_medium._read_bytes(4)) |
955 | + |
956 | + def test_simple_pipes__read_bytes_subprocess_closed(self): |
957 | + p = subprocess.Popen([sys.executable, '-c', |
958 | + 'import sys\n' |
959 | + 'if sys.platform == "win32":\n' |
960 | + ' import msvcrt, os\n' |
961 | + ' msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)\n' |
962 | + ' msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY)\n' |
963 | + 'sys.stdout.write(sys.stdin.read(4))\n' |
964 | + 'sys.stdout.close()\n'], |
965 | + stdout=subprocess.PIPE, stdin=subprocess.PIPE) |
966 | + client_medium = medium.SmartSimplePipesClientMedium( |
967 | + p.stdout, p.stdin, 'base') |
968 | + client_medium._accept_bytes('abc\n') |
969 | + p.wait() |
970 | + self.assertEqual('abc\n', client_medium._read_bytes(4)) |
971 | + self.assertEqual('', client_medium._read_bytes(4)) |
972 | + |
973 | def test_simple_pipes_client_disconnect_does_nothing(self): |
974 | # calling disconnect does nothing. |
975 | input = StringIO() |
976 | @@ -585,6 +725,28 @@ |
977 | request.finished_reading() |
978 | self.assertRaises(errors.ReadingCompleted, request.read_bytes, None) |
979 | |
980 | + def test_reset(self): |
981 | + server_sock, client_sock = portable_socket_pair() |
982 | + # TODO: Use SmartClientAlreadyConnectedSocketMedium for the versions of |
983 | + # bzr where it exists. |
984 | + client_medium = medium.SmartTCPClientMedium(None, None, None) |
985 | + client_medium._socket = client_sock |
986 | + client_medium._connected = True |
987 | + req = client_medium.get_request() |
988 | + self.assertRaises(errors.TooManyConcurrentRequests, |
989 | + client_medium.get_request) |
990 | + client_medium.reset() |
991 | + # The stream should be reset, marked as disconnected, though ready for |
992 | + # us to make a new request |
993 | + self.assertFalse(client_medium._connected) |
994 | + self.assertIs(None, client_medium._socket) |
995 | + try: |
996 | + self.assertEqual('', client_sock.recv(1)) |
997 | + except socket.error, e: |
998 | + if e.errno not in (errno.EBADF,): |
999 | + raise |
1000 | + req = client_medium.get_request() |
1001 | + |
1002 | |
1003 | class RemoteTransportTests(test_smart.TestCaseWithSmartMedium): |
1004 | |
1005 | @@ -3101,6 +3263,33 @@ |
1006 | 'e', # end |
1007 | output.getvalue()) |
1008 | |
1009 | + def test_records_start_of_body_stream(self): |
1010 | + requester, output = self.make_client_encoder_and_output() |
1011 | + requester.set_headers({}) |
1012 | + in_stream = [False] |
1013 | + def stream_checker(): |
1014 | + self.assertTrue(requester.body_stream_started) |
1015 | + in_stream[0] = True |
1016 | + yield 'content' |
1017 | + flush_called = [] |
1018 | + orig_flush = requester.flush |
1019 | + def tracked_flush(): |
1020 | + flush_called.append(in_stream[0]) |
1021 | + if in_stream[0]: |
1022 | + self.assertTrue(requester.body_stream_started) |
1023 | + else: |
1024 | + self.assertFalse(requester.body_stream_started) |
1025 | + return orig_flush() |
1026 | + requester.flush = tracked_flush |
1027 | + requester.call_with_body_stream(('one arg',), stream_checker()) |
1028 | + self.assertEqual( |
1029 | + 'bzr message 3 (bzr 1.6)\n' # protocol version |
1030 | + '\x00\x00\x00\x02de' # headers |
1031 | + 's\x00\x00\x00\x0bl7:one arge' # args |
1032 | + 'b\x00\x00\x00\x07content' # body |
1033 | + 'e', output.getvalue()) |
1034 | + self.assertEqual([False, True, True], flush_called) |
1035 | + |
1036 | |
1037 | class StubMediumRequest(object): |
1038 | """A stub medium request that tracks the number of times accept_bytes is |
1039 | @@ -3526,6 +3715,193 @@ |
1040 | # encoder. |
1041 | |
1042 | |
1043 | +class Test_SmartClientRequest(tests.TestCase): |
1044 | + |
1045 | + def make_client_with_failing_medium(self, fail_at_write=True, response=''): |
1046 | + response_io = StringIO(response) |
1047 | + output = StringIO() |
1048 | + vendor = FirstRejectedStringIOSSHVendor(response_io, output, |
1049 | + fail_at_write=fail_at_write) |
1050 | + ssh_params = medium.SSHParams('a host', 'a port', 'a user', 'a pass') |
1051 | + client_medium = medium.SmartSSHClientMedium('base', ssh_params, vendor) |
1052 | + smart_client = client._SmartClient(client_medium, headers={}) |
1053 | + return output, vendor, smart_client |
1054 | + |
1055 | + def make_response(self, args, body=None, body_stream=None): |
1056 | + response_io = StringIO() |
1057 | + response = _mod_request.SuccessfulSmartServerResponse(args, body=body, |
1058 | + body_stream=body_stream) |
1059 | + responder = protocol.ProtocolThreeResponder(response_io.write) |
1060 | + responder.send_response(response) |
1061 | + return response_io.getvalue() |
1062 | + |
1063 | + def test__call_doesnt_retry_append(self): |
1064 | + response = self.make_response(('appended', '8')) |
1065 | + output, vendor, smart_client = self.make_client_with_failing_medium( |
1066 | + fail_at_write=False, response=response) |
1067 | + smart_request = client._SmartClientRequest(smart_client, 'append', |
1068 | + ('foo', ''), body='content\n') |
1069 | + self.assertRaises(errors.ConnectionReset, smart_request._call, 3) |
1070 | + |
1071 | + def test__call_retries_get_bytes(self): |
1072 | + response = self.make_response(('ok',), 'content\n') |
1073 | + output, vendor, smart_client = self.make_client_with_failing_medium( |
1074 | + fail_at_write=False, response=response) |
1075 | + smart_request = client._SmartClientRequest(smart_client, 'get', |
1076 | + ('foo',)) |
1077 | + response, response_handler = smart_request._call(3) |
1078 | + self.assertEqual(('ok',), response) |
1079 | + self.assertEqual('content\n', response_handler.read_body_bytes()) |
1080 | + |
1081 | + def test__call_noretry_get_bytes(self): |
1082 | + debug.debug_flags.add('noretry') |
1083 | + response = self.make_response(('ok',), 'content\n') |
1084 | + output, vendor, smart_client = self.make_client_with_failing_medium( |
1085 | + fail_at_write=False, response=response) |
1086 | + smart_request = client._SmartClientRequest(smart_client, 'get', |
1087 | + ('foo',)) |
1088 | + self.assertRaises(errors.ConnectionReset, smart_request._call, 3) |
1089 | + |
1090 | + def test__send_no_retry_pipes(self): |
1091 | + client_read, server_write = create_file_pipes() |
1092 | + server_read, client_write = create_file_pipes() |
1093 | + client_medium = medium.SmartSimplePipesClientMedium(client_read, |
1094 | + client_write, base='/') |
1095 | + smart_client = client._SmartClient(client_medium) |
1096 | + smart_request = client._SmartClientRequest(smart_client, |
1097 | + 'hello', ()) |
1098 | + # Close the server side |
1099 | + server_read.close() |
1100 | + encoder, response_handler = smart_request._construct_protocol(3) |
1101 | + self.assertRaises(errors.ConnectionReset, |
1102 | + smart_request._send_no_retry, encoder) |
1103 | + |
1104 | + def test__send_read_response_sockets(self): |
1105 | + listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
1106 | + listen_sock.bind(('127.0.0.1', 0)) |
1107 | + listen_sock.listen(1) |
1108 | + host, port = listen_sock.getsockname() |
1109 | + client_medium = medium.SmartTCPClientMedium(host, port, '/') |
1110 | + client_medium._ensure_connection() |
1111 | + smart_client = client._SmartClient(client_medium) |
1112 | + smart_request = client._SmartClientRequest(smart_client, 'hello', ()) |
1113 | + # Accept the connection, but don't actually talk to the client. |
1114 | + server_sock, _ = listen_sock.accept() |
1115 | + server_sock.close() |
1116 | + # Sockets buffer and don't really notice that the server has closed the |
1117 | + # connection until we try to read again. |
1118 | + handler = smart_request._send(3) |
1119 | + self.assertRaises(errors.ConnectionReset, |
1120 | + handler.read_response_tuple, expect_body=False) |
1121 | + |
1122 | + def test__send_retries_on_write(self): |
1123 | + output, vendor, smart_client = self.make_client_with_failing_medium() |
1124 | + smart_request = client._SmartClientRequest(smart_client, 'hello', ()) |
1125 | + handler = smart_request._send(3) |
1126 | + self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol |
1127 | + '\x00\x00\x00\x02de' # empty headers |
1128 | + 's\x00\x00\x00\tl5:helloee', |
1129 | + output.getvalue()) |
1130 | + self.assertEqual( |
1131 | + [('connect_ssh', 'a user', 'a pass', 'a host', 'a port', |
1132 | + ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']), |
1133 | + ('close',), |
1134 | + ('connect_ssh', 'a user', 'a pass', 'a host', 'a port', |
1135 | + ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']), |
1136 | + ], |
1137 | + vendor.calls) |
1138 | + |
1139 | + def test__send_doesnt_retry_read_failure(self): |
1140 | + output, vendor, smart_client = self.make_client_with_failing_medium( |
1141 | + fail_at_write=False) |
1142 | + smart_request = client._SmartClientRequest(smart_client, 'hello', ()) |
1143 | + handler = smart_request._send(3) |
1144 | + self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol |
1145 | + '\x00\x00\x00\x02de' # empty headers |
1146 | + 's\x00\x00\x00\tl5:helloee', |
1147 | + output.getvalue()) |
1148 | + self.assertEqual( |
1149 | + [('connect_ssh', 'a user', 'a pass', 'a host', 'a port', |
1150 | + ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']), |
1151 | + ], |
1152 | + vendor.calls) |
1153 | + self.assertRaises(errors.ConnectionReset, handler.read_response_tuple) |
1154 | + |
1155 | + def test__send_request_retries_body_stream_if_not_started(self): |
1156 | + output, vendor, smart_client = self.make_client_with_failing_medium() |
1157 | + smart_request = client._SmartClientRequest(smart_client, 'hello', (), |
1158 | + body_stream=['a', 'b']) |
1159 | + response_handler = smart_request._send(3) |
1160 | + # We connect, get disconnected, and notice before consuming the stream, |
1161 | + # so we try again one time and succeed. |
1162 | + self.assertEqual( |
1163 | + [('connect_ssh', 'a user', 'a pass', 'a host', 'a port', |
1164 | + ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']), |
1165 | + ('close',), |
1166 | + ('connect_ssh', 'a user', 'a pass', 'a host', 'a port', |
1167 | + ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']), |
1168 | + ], |
1169 | + vendor.calls) |
1170 | + self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol |
1171 | + '\x00\x00\x00\x02de' # empty headers |
1172 | + 's\x00\x00\x00\tl5:helloe' |
1173 | + 'b\x00\x00\x00\x01a' |
1174 | + 'b\x00\x00\x00\x01b' |
1175 | + 'e', |
1176 | + output.getvalue()) |
1177 | + |
1178 | + def test__send_request_stops_if_body_started(self): |
1179 | + # We intentionally use the python StringIO so that we can subclass it. |
1180 | + from StringIO import StringIO |
1181 | + response = StringIO() |
1182 | + |
1183 | + class FailAfterFirstWrite(StringIO): |
1184 | + """Allow one 'write' call to pass, fail the rest""" |
1185 | + def __init__(self): |
1186 | + StringIO.__init__(self) |
1187 | + self._first = True |
1188 | + |
1189 | + def write(self, s): |
1190 | + if self._first: |
1191 | + self._first = False |
1192 | + return StringIO.write(self, s) |
1193 | + raise IOError(errno.EINVAL, 'invalid file handle') |
1194 | + output = FailAfterFirstWrite() |
1195 | + |
1196 | + vendor = FirstRejectedStringIOSSHVendor(response, output, |
1197 | + fail_at_write=False) |
1198 | + ssh_params = medium.SSHParams('a host', 'a port', 'a user', 'a pass') |
1199 | + client_medium = medium.SmartSSHClientMedium('base', ssh_params, vendor) |
1200 | + smart_client = client._SmartClient(client_medium, headers={}) |
1201 | + smart_request = client._SmartClientRequest(smart_client, 'hello', (), |
1202 | + body_stream=['a', 'b']) |
1203 | + self.assertRaises(errors.ConnectionReset, smart_request._send, 3) |
1204 | + # We connect, and manage to get to the point that we start consuming |
1205 | + # the body stream. The next write fails, so we just stop. |
1206 | + self.assertEqual( |
1207 | + [('connect_ssh', 'a user', 'a pass', 'a host', 'a port', |
1208 | + ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']), |
1209 | + ('close',), |
1210 | + ], |
1211 | + vendor.calls) |
1212 | + self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol |
1213 | + '\x00\x00\x00\x02de' # empty headers |
1214 | + 's\x00\x00\x00\tl5:helloe', |
1215 | + output.getvalue()) |
1216 | + |
1217 | + def test__send_disabled_retry(self): |
1218 | + debug.debug_flags.add('noretry') |
1219 | + output, vendor, smart_client = self.make_client_with_failing_medium() |
1220 | + smart_request = client._SmartClientRequest(smart_client, 'hello', ()) |
1221 | + self.assertRaises(errors.ConnectionReset, smart_request._send, 3) |
1222 | + self.assertEqual( |
1223 | + [('connect_ssh', 'a user', 'a pass', 'a host', 'a port', |
1224 | + ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']), |
1225 | + ('close',), |
1226 | + ], |
1227 | + vendor.calls) |
1228 | + |
1229 | + |
1230 | class LengthPrefixedBodyDecoder(tests.TestCase): |
1231 | |
1232 | # XXX: TODO: make accept_reading_trailer invoke translate_response or |
1233 | |
1234 | === modified file 'doc/en/release-notes/bzr-2.1.txt' |
1235 | --- doc/en/release-notes/bzr-2.1.txt 2011-08-20 09:28:27 +0000 |
1236 | +++ doc/en/release-notes/bzr-2.1.txt 2011-10-10 15:16:30 +0000 |
1237 | @@ -43,6 +43,11 @@ |
1238 | |
1239 | (John Arbash Meinel, #609187, #812928) |
1240 | |
1241 | +* Teach the bzr client how to reconnect if we get ``ConnectionReset`` |
1242 | + while making an RPC request. This doesn't handle all possible network |
1243 | + disconnects, but it should at least handle when the server is asked to |
1244 | + shutdown gracefully. (John Arbash Meinel, #819604) |
1245 | + |
1246 | Improvements |
1247 | ************ |
1248 | |
1249 | |
1250 | === modified file 'doc/en/release-notes/bzr-2.2.txt' |
1251 | --- doc/en/release-notes/bzr-2.2.txt 2011-09-09 13:30:12 +0000 |
1252 | +++ doc/en/release-notes/bzr-2.2.txt 2011-10-10 15:16:30 +0000 |
1253 | @@ -19,6 +19,11 @@ |
1254 | Bug Fixes |
1255 | ********* |
1256 | |
1257 | +* Teach the bzr client how to reconnect if we get ``ConnectionReset`` |
1258 | + while making an RPC request. This doesn't handle all possible network |
1259 | + disconnects, but it should at least handle when the server is asked to |
1260 | + shutdown gracefully. (John Arbash Meinel, #819604) |
1261 | + |
1262 | Improvements |
1263 | ************ |
1264 | |
1265 | |
1266 | === modified file 'doc/en/release-notes/bzr-2.3.txt' |
1267 | --- doc/en/release-notes/bzr-2.3.txt 2011-09-09 13:30:12 +0000 |
1268 | +++ doc/en/release-notes/bzr-2.3.txt 2011-10-10 15:16:30 +0000 |
1269 | @@ -35,6 +35,11 @@ |
1270 | * Cope cleanly with buggy HTTP proxies that close the socket in the middle |
1271 | of a multipart response. (Martin Pool, #198646). |
1272 | |
1273 | +* Teach the bzr client how to reconnect if we get ``ConnectionReset`` |
1274 | + while making an RPC request. This doesn't handle all possible network |
1275 | + disconnects, but it should at least handle when the server is asked to |
1276 | + shutdown gracefully. (John Arbash Meinel, #819604) |
1277 | + |
1278 | Documentation |
1279 | ************* |
1280 | |
1281 | |
1282 | === modified file 'doc/en/release-notes/bzr-2.4.txt' |
1283 | --- doc/en/release-notes/bzr-2.4.txt 2011-10-10 13:59:22 +0000 |
1284 | +++ doc/en/release-notes/bzr-2.4.txt 2011-10-10 15:16:30 +0000 |
1285 | @@ -32,8 +32,8 @@ |
1286 | Bug Fixes |
1287 | ********* |
1288 | |
1289 | -* Fixed an infinite loop when creating a repo at the root of the filesystem, |
1290 | - i.e. "/", due to posixpath.normpath() not collapsing 2 leading slashes into |
1291 | +* Fixed an infinite loop when creating a repo at the root of the filesystem, |
1292 | + i.e. "/", due to posixpath.normpath() not collapsing 2 leading slashes into |
1293 | one, thus respecting the POSIX standard, but making relpath() loop infinitely. |
1294 | (Florian Vichot, #861008) |
1295 | |
1296 | @@ -50,7 +50,12 @@ |
1297 | * Return early from create_delta_index_from_delta given tiny inputs. This |
1298 | avoids raising a spurious MemoryError on certain platforms such as AIX. |
1299 | (John Arbash Meinel, #856731) |
1300 | - |
1301 | + |
1302 | +* Teach the bzr client how to reconnect if we get ``ConnectionReset`` |
1303 | + while making an RPC request. This doesn't handle all possible network |
1304 | + disconnects, but it should at least handle when the server is asked to |
1305 | + shutdown gracefully. (John Arbash Meinel, #819604) |
1306 | + |
1307 | Documentation |
1308 | ************* |
1309 | |
1310 | |
1311 | === modified file 'doc/en/release-notes/bzr-2.5.txt' |
1312 | --- doc/en/release-notes/bzr-2.5.txt 2011-10-09 13:52:06 +0000 |
1313 | +++ doc/en/release-notes/bzr-2.5.txt 2011-10-10 15:16:30 +0000 |
1314 | @@ -46,6 +46,11 @@ |
1315 | * ``bzr mkdir --quiet`` now does not print a line for every created |
1316 | directory. (Martin von Gagern, #869915) |
1317 | |
1318 | +* Teach the bzr client how to reconnect if we get ``ConnectionReset`` |
1319 | + while making an RPC request. This doesn't handle all possible network |
1320 | + disconnects, but it should at least handle when the server is asked to |
1321 | + shutdown gracefully. (John Arbash Meinel, #819604) |
1322 | + |
1323 | Documentation |
1324 | ************* |
1325 |
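For readers skimming the release-note entry above, the behaviour it describes (reconnect and retry when ``ConnectionReset`` is raised while sending an RPC) can be sketched roughly as below. This is only an illustration and is not the code from bzrlib/smart/client.py: ``ConnectionReset`` is redefined locally as a stand-in, and ``FakeMedium``, ``call_with_reconnect`` and ``max_retries`` are hypothetical names invented for the example.

```python
class ConnectionReset(Exception):
    """Local stand-in for the error named in the release note."""


class FakeMedium:
    """Hypothetical transport: fails the first `failures` sends, then succeeds."""

    def __init__(self, failures):
        self.failures = failures
        self.reconnects = 0

    def send(self, request):
        if self.failures > 0:
            self.failures -= 1
            raise ConnectionReset("server closed the connection")
        return "ok:" + request

    def reconnect(self):
        # A real client would open a new connection here.
        self.reconnects += 1


def call_with_reconnect(medium, request, max_retries=1):
    """Send `request`; on ConnectionReset, reconnect and retry.

    The retry count is bounded so a persistently failing server
    still surfaces the error to the caller.
    """
    attempts = 0
    while True:
        try:
            return medium.send(request)
        except ConnectionReset:
            if attempts >= max_retries:
                raise
            attempts += 1
            medium.reconnect()


if __name__ == "__main__":
    medium = FakeMedium(failures=1)
    print(call_with_reconnect(medium, "hello"), medium.reconnects)
```

The sketch retries only the send step and only a bounded number of times, which matches the caveat in the entry that this does not cover every kind of network disconnect.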
If this is just a merge-up, feel free to just land it. Otherwise I'll
read it later.