Merge lp:~jameinel/bzr/2.2-all-reconnect-819604 into lp:bzr/2.2

Proposed by John A Meinel
Status: Rejected
Rejected by: John A Meinel
Proposed branch: lp:~jameinel/bzr/2.2-all-reconnect-819604
Merge into: lp:bzr/2.2
Diff against target: 1591 lines (+898/-228)
12 files modified
NEWS (+10/-0)
bzrlib/help_topics/en/debug-flags.txt (+2/-0)
bzrlib/osutils.py (+20/-4)
bzrlib/smart/client.py (+208/-86)
bzrlib/smart/medium.py (+46/-6)
bzrlib/smart/protocol.py (+7/-5)
bzrlib/smart/request.py (+145/-100)
bzrlib/tests/test_bundle.py (+6/-3)
bzrlib/tests/test_osutils.py (+39/-0)
bzrlib/tests/test_smart.py (+5/-2)
bzrlib/tests/test_smart_request.py (+10/-0)
bzrlib/tests/test_smart_transport.py (+400/-22)
To merge this branch: bzr merge lp:~jameinel/bzr/2.2-all-reconnect-819604
Reviewer Review Type Date Requested Status
John A Meinel Disapprove
Martin Packman (community) Approve
Review via email: mp+124346@code.launchpad.net

Commit message

Teach bzr-2.2 how to reconnect if it gets disconnected in the middle of a discussion. (Bug #819604)

Description of the change

This is a merge-up of my 2.1 patch.
Fortunately, bzr-2.2 added the AlreadyConnected socket medium, so this is almost identical to the 2.5 code. The main remaining difference is that this branch still does not have the connection timeout code.

To post a comment you must log in.
Revision history for this message
Martin Packman (gz) wrote :

This is still very scary, but bringing the related code down from later versions to fix this issue seems reasonable enough.

review: Approve
Revision history for this message
John A Meinel (jameinel) wrote :

The only reason to land this in 2.2 was that 2.1 is in the last LTS (Lucid). However, if I'm rejecting 2.1, I'm not going to bother with 2.2 or 2.3 either; instead I'll just start with 2.4+.

review: Disapprove

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'NEWS'
2--- NEWS 2012-03-15 14:37:25 +0000
3+++ NEWS 2012-09-14 07:34:20 +0000
4@@ -20,6 +20,11 @@
5 Bug Fixes
6 *********
7
8+* Teach the bzr client how to reconnect if we get ``ConnectionReset``
9+ while making an RPC request. This doesn't handle all possible network
10+ disconnects, but it should at least handle when the server is asked to
11+ shutdown gracefully. (John Arbash Meinel, #819604)
12+
13 Improvements
14 ************
15
16@@ -111,6 +116,11 @@
17
18 (John Arbash Meinel, #609187, #812928)
19
20+* Teach the bzr client how to reconnect if we get ``ConnectionReset``
21+ while making an RPC request. This doesn't handle all possible network
22+ disconnects, but it should at least handle when the server is asked to
23+ shutdown gracefully. (John Arbash Meinel, #819604)
24+
25
26 Improvements
27 ************
28
29=== modified file 'bzrlib/help_topics/en/debug-flags.txt'
30--- bzrlib/help_topics/en/debug-flags.txt 2010-01-05 04:30:07 +0000
31+++ bzrlib/help_topics/en/debug-flags.txt 2012-09-14 07:34:20 +0000
32@@ -24,6 +24,8 @@
33 -Dindex Trace major index operations.
34 -Dknit Trace knit operations.
35 -Dlock Trace when lockdir locks are taken or released.
36+-Dnoretry If a connection is reset, fail immediately rather than
37+ retrying the request.
38 -Dprogress Trace progress bar operations.
39 -Dmerge Emit information for debugging merges.
40 -Dno_apport Don't use apport to report crashes.
41
42=== modified file 'bzrlib/osutils.py'
43--- bzrlib/osutils.py 2010-07-09 16:16:11 +0000
44+++ bzrlib/osutils.py 2012-09-14 07:34:20 +0000
45@@ -1,4 +1,4 @@
46-# Copyright (C) 2005-2010 Canonical Ltd
47+# Copyright (C) 2005-2012 Canonical Ltd
48 #
49 # This program is free software; you can redistribute it and/or modify
50 # it under the terms of the GNU General Public License as published by
51@@ -32,6 +32,7 @@
52 # and need the former on windows
53 import shutil
54 from shutil import rmtree
55+import signal
56 import socket
57 import subprocess
58 # We need to import both tempfile and mkdtemp as we export the later on posix
59@@ -1993,6 +1994,14 @@
60 # data at once.
61 MAX_SOCKET_CHUNK = 64 * 1024
62
63+_end_of_stream_errors = [errno.ECONNRESET, errno.EPIPE, errno.EINVAL]
64+for _eno in ['WSAECONNRESET', 'WSAECONNABORTED']:
65+ _eno = getattr(errno, _eno, None)
66+ if _eno is not None:
67+ _end_of_stream_errors.append(_eno)
68+del _eno
69+
70+
71 def read_bytes_from_socket(sock, report_activity=None,
72 max_read_size=MAX_SOCKET_CHUNK):
73 """Read up to max_read_size of bytes from sock and notify of progress.
74@@ -2006,7 +2015,7 @@
75 bytes = sock.recv(max_read_size)
76 except socket.error, e:
77 eno = e.args[0]
78- if eno == getattr(errno, "WSAECONNRESET", errno.ECONNRESET):
79+ if eno in _end_of_stream_errors:
80 # The connection was closed by the other side. Callers expect
81 # an empty string to signal end-of-stream.
82 return ""
83@@ -2057,12 +2066,19 @@
84 while sent_total < byte_count:
85 try:
86 sent = sock.send(buffer(bytes, sent_total, MAX_SOCKET_CHUNK))
87- except socket.error, e:
88+ except (socket.error, IOError), e:
89+ if e.args[0] in _end_of_stream_errors:
90+ raise errors.ConnectionReset(
91+ "Error trying to write to socket", e)
92 if e.args[0] != errno.EINTR:
93 raise
94 else:
95+ if sent == 0:
96+ raise errors.ConnectionReset('Sending to %s returned 0 bytes'
97+ % (sock,))
98 sent_total += sent
99- report_activity(sent, 'write')
100+ if report_activity is not None:
101+ report_activity(sent, 'write')
102
103
104 def dereference_path(path):
105
106=== modified file 'bzrlib/smart/client.py'
107--- bzrlib/smart/client.py 2010-02-17 17:11:16 +0000
108+++ bzrlib/smart/client.py 2012-09-14 07:34:20 +0000
109@@ -14,12 +14,18 @@
110 # along with this program; if not, write to the Free Software
111 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
112
113+from bzrlib import lazy_import
114+lazy_import.lazy_import(globals(), """
115+from bzrlib.smart import request as _mod_request
116+""")
117+
118 import bzrlib
119 from bzrlib.smart import message, protocol
120-from bzrlib.trace import warning
121 from bzrlib import (
122+ debug,
123 errors,
124 hooks,
125+ trace,
126 )
127
128
129@@ -39,93 +45,12 @@
130 def __repr__(self):
131 return '%s(%r)' % (self.__class__.__name__, self._medium)
132
133- def _send_request(self, protocol_version, method, args, body=None,
134- readv_body=None, body_stream=None):
135- encoder, response_handler = self._construct_protocol(
136- protocol_version)
137- encoder.set_headers(self._headers)
138- if body is not None:
139- if readv_body is not None:
140- raise AssertionError(
141- "body and readv_body are mutually exclusive.")
142- if body_stream is not None:
143- raise AssertionError(
144- "body and body_stream are mutually exclusive.")
145- encoder.call_with_body_bytes((method, ) + args, body)
146- elif readv_body is not None:
147- if body_stream is not None:
148- raise AssertionError(
149- "readv_body and body_stream are mutually exclusive.")
150- encoder.call_with_body_readv_array((method, ) + args, readv_body)
151- elif body_stream is not None:
152- encoder.call_with_body_stream((method, ) + args, body_stream)
153- else:
154- encoder.call(method, *args)
155- return response_handler
156-
157- def _run_call_hooks(self, method, args, body, readv_body):
158- if not _SmartClient.hooks['call']:
159- return
160- params = CallHookParams(method, args, body, readv_body, self._medium)
161- for hook in _SmartClient.hooks['call']:
162- hook(params)
163-
164 def _call_and_read_response(self, method, args, body=None, readv_body=None,
165 body_stream=None, expect_response_body=True):
166- self._run_call_hooks(method, args, body, readv_body)
167- if self._medium._protocol_version is not None:
168- response_handler = self._send_request(
169- self._medium._protocol_version, method, args, body=body,
170- readv_body=readv_body, body_stream=body_stream)
171- return (response_handler.read_response_tuple(
172- expect_body=expect_response_body),
173- response_handler)
174- else:
175- for protocol_version in [3, 2]:
176- if protocol_version == 2:
177- # If v3 doesn't work, the remote side is older than 1.6.
178- self._medium._remember_remote_is_before((1, 6))
179- response_handler = self._send_request(
180- protocol_version, method, args, body=body,
181- readv_body=readv_body, body_stream=body_stream)
182- try:
183- response_tuple = response_handler.read_response_tuple(
184- expect_body=expect_response_body)
185- except errors.UnexpectedProtocolVersionMarker, err:
186- # TODO: We could recover from this without disconnecting if
187- # we recognise the protocol version.
188- warning(
189- 'Server does not understand Bazaar network protocol %d,'
190- ' reconnecting. (Upgrade the server to avoid this.)'
191- % (protocol_version,))
192- self._medium.disconnect()
193- continue
194- except errors.ErrorFromSmartServer:
195- # If we received an error reply from the server, then it
196- # must be ok with this protocol version.
197- self._medium._protocol_version = protocol_version
198- raise
199- else:
200- self._medium._protocol_version = protocol_version
201- return response_tuple, response_handler
202- raise errors.SmartProtocolError(
203- 'Server is not a Bazaar server: ' + str(err))
204-
205- def _construct_protocol(self, version):
206- request = self._medium.get_request()
207- if version == 3:
208- request_encoder = protocol.ProtocolThreeRequester(request)
209- response_handler = message.ConventionalResponseHandler()
210- response_proto = protocol.ProtocolThreeDecoder(
211- response_handler, expect_version_marker=True)
212- response_handler.setProtoAndMediumRequest(response_proto, request)
213- elif version == 2:
214- request_encoder = protocol.SmartClientRequestProtocolTwo(request)
215- response_handler = request_encoder
216- else:
217- request_encoder = protocol.SmartClientRequestProtocolOne(request)
218- response_handler = request_encoder
219- return request_encoder, response_handler
220+ request = _SmartClientRequest(self, method, args, body=body,
221+ readv_body=readv_body, body_stream=body_stream,
222+ expect_response_body=expect_response_body)
223+ return request.call_and_read_response()
224
225 def call(self, method, *args):
226 """Call a method on the remote server."""
227@@ -191,6 +116,203 @@
228 return self._medium.remote_path_from_transport(transport)
229
230
231+class _SmartClientRequest(object):
232+ """Encapsulate the logic for a single request.
233+
234+ This class handles things like reconnecting and sending the request a
235+ second time when the connection is reset in the middle. It also handles the
236+ multiple requests that get made if we don't know what protocol the server
237+ supports yet.
238+
239+ Generally, you build up one of these objects, passing in the arguments that
240+ you want to send to the server, and then use 'call_and_read_response' to
241+ get the response from the server.
242+ """
243+
244+ def __init__(self, client, method, args, body=None, readv_body=None,
245+ body_stream=None, expect_response_body=True):
246+ self.client = client
247+ self.method = method
248+ self.args = args
249+ self.body = body
250+ self.readv_body = readv_body
251+ self.body_stream = body_stream
252+ self.expect_response_body = expect_response_body
253+
254+ def call_and_read_response(self):
255+ """Send the request to the server, and read the initial response.
256+
257+ This doesn't read all of the body content of the response, instead it
258+ returns (response_tuple, response_handler). response_tuple is the 'ok',
259+ or 'error' information, and 'response_handler' can be used to get the
260+ content stream out.
261+ """
262+ self._run_call_hooks()
263+ protocol_version = self.client._medium._protocol_version
264+ if protocol_version is None:
265+ return self._call_determining_protocol_version()
266+ else:
267+ return self._call(protocol_version)
268+
269+ def _is_safe_to_send_twice(self):
270+ """Check if the current method is re-entrant safe."""
271+ if self.body_stream is not None or 'noretry' in debug.debug_flags:
272+ # We can't restart a body stream that has already been consumed.
273+ return False
274+ request_type = _mod_request.request_handlers.get_info(self.method)
275+ if request_type in ('read', 'idem', 'semi'):
276+ return True
277+ # If we have gotten this far, 'stream' cannot be retried, because we
278+ # already consumed the local stream.
279+ if request_type in ('semivfs', 'mutate', 'stream'):
280+ return False
281+ trace.mutter('Unknown request type: %s for method %s'
282+ % (request_type, self.method))
283+ return False
284+
285+ def _run_call_hooks(self):
286+ if not _SmartClient.hooks['call']:
287+ return
288+ params = CallHookParams(self.method, self.args, self.body,
289+ self.readv_body, self.client._medium)
290+ for hook in _SmartClient.hooks['call']:
291+ hook(params)
292+
293+ def _call(self, protocol_version):
294+ """We know the protocol version.
295+
296+ So this just sends the request, and then reads the response. This is
297+ where the code will be to retry requests if the connection is closed.
298+ """
299+ response_handler = self._send(protocol_version)
300+ try:
301+ response_tuple = response_handler.read_response_tuple(
302+ expect_body=self.expect_response_body)
303+ except errors.ConnectionReset, e:
304+ self.client._medium.reset()
305+ if not self._is_safe_to_send_twice():
306+ raise
307+ trace.warning('ConnectionReset reading response for %r, retrying'
308+ % (self.method,))
309+ trace.log_exception_quietly()
310+ encoder, response_handler = self._construct_protocol(
311+ protocol_version)
312+ self._send_no_retry(encoder)
313+ response_tuple = response_handler.read_response_tuple(
314+ expect_body=self.expect_response_body)
315+ return (response_tuple, response_handler)
316+
317+ def _call_determining_protocol_version(self):
318+ """Determine what protocol the remote server supports.
319+
320+ We do this by placing a request in the most recent protocol, and
321+ handling the UnexpectedProtocolVersionMarker from the server.
322+ """
323+ for protocol_version in [3, 2]:
324+ if protocol_version == 2:
325+ # If v3 doesn't work, the remote side is older than 1.6.
326+ self.client._medium._remember_remote_is_before((1, 6))
327+ try:
328+ response_tuple, response_handler = self._call(protocol_version)
329+ except errors.UnexpectedProtocolVersionMarker, err:
330+ # TODO: We could recover from this without disconnecting if
331+ # we recognise the protocol version.
332+ trace.warning(
333+ 'Server does not understand Bazaar network protocol %d,'
334+ ' reconnecting. (Upgrade the server to avoid this.)'
335+ % (protocol_version,))
336+ self.client._medium.disconnect()
337+ continue
338+ except errors.ErrorFromSmartServer:
339+ # If we received an error reply from the server, then it
340+ # must be ok with this protocol version.
341+ self.client._medium._protocol_version = protocol_version
342+ raise
343+ else:
344+ self.client._medium._protocol_version = protocol_version
345+ return response_tuple, response_handler
346+ raise errors.SmartProtocolError(
347+ 'Server is not a Bazaar server: ' + str(err))
348+
349+ def _construct_protocol(self, version):
350+ """Build the encoding stack for a given protocol version."""
351+ request = self.client._medium.get_request()
352+ if version == 3:
353+ request_encoder = protocol.ProtocolThreeRequester(request)
354+ response_handler = message.ConventionalResponseHandler()
355+ response_proto = protocol.ProtocolThreeDecoder(
356+ response_handler, expect_version_marker=True)
357+ response_handler.setProtoAndMediumRequest(response_proto, request)
358+ elif version == 2:
359+ request_encoder = protocol.SmartClientRequestProtocolTwo(request)
360+ response_handler = request_encoder
361+ else:
362+ request_encoder = protocol.SmartClientRequestProtocolOne(request)
363+ response_handler = request_encoder
364+ return request_encoder, response_handler
365+
366+ def _send(self, protocol_version):
367+ """Encode the request, and send it to the server.
368+
369+ This will retry a request if we get a ConnectionReset while sending the
370+ request to the server. (Unless we have a body_stream that we have
371+ already started consuming, since we can't restart body_streams)
372+
373+ :return: response_handler as defined by _construct_protocol
374+ """
375+ encoder, response_handler = self._construct_protocol(protocol_version)
376+ try:
377+ self._send_no_retry(encoder)
378+ except errors.ConnectionReset, e:
379+ # If we fail during the _send_no_retry phase, then we can
380+ # be confident that the server did not get our request, because we
381+ # haven't started waiting for the reply yet. So try the request
382+ # again. We only issue a single retry, because if the connection
383+ # really is down, there is no reason to loop endlessly.
384+
385+ # Connection is dead, so close our end of it.
386+ self.client._medium.reset()
387+ if (('noretry' in debug.debug_flags)
388+ or (self.body_stream is not None
389+ and encoder.body_stream_started)):
390+ # We can't restart a body_stream that has been partially
391+ # consumed, so we don't retry.
392+ # Note: We don't have to worry about
393+ # SmartClientRequestProtocolOne or Two, because they don't
394+ # support client-side body streams.
395+ raise
396+ trace.warning('ConnectionReset calling %r, retrying'
397+ % (self.method,))
398+ trace.log_exception_quietly()
399+ encoder, response_handler = self._construct_protocol(
400+ protocol_version)
401+ self._send_no_retry(encoder)
402+ return response_handler
403+
404+ def _send_no_retry(self, encoder):
405+ """Just encode the request and try to send it."""
406+ encoder.set_headers(self.client._headers)
407+ if self.body is not None:
408+ if self.readv_body is not None:
409+ raise AssertionError(
410+ "body and readv_body are mutually exclusive.")
411+ if self.body_stream is not None:
412+ raise AssertionError(
413+ "body and body_stream are mutually exclusive.")
414+ encoder.call_with_body_bytes((self.method, ) + self.args, self.body)
415+ elif self.readv_body is not None:
416+ if self.body_stream is not None:
417+ raise AssertionError(
418+ "readv_body and body_stream are mutually exclusive.")
419+ encoder.call_with_body_readv_array((self.method, ) + self.args,
420+ self.readv_body)
421+ elif self.body_stream is not None:
422+ encoder.call_with_body_stream((self.method, ) + self.args,
423+ self.body_stream)
424+ else:
425+ encoder.call(self.method, *self.args)
426+
427+
428 class SmartClientHooks(hooks.Hooks):
429
430 def __init__(self):
431
432=== modified file 'bzrlib/smart/medium.py'
433--- bzrlib/smart/medium.py 2010-06-21 08:08:04 +0000
434+++ bzrlib/smart/medium.py 2012-09-14 07:34:20 +0000
435@@ -1,4 +1,4 @@
436-# Copyright (C) 2006-2010 Canonical Ltd
437+# Copyright (C) 2006-2012 Canonical Ltd
438 #
439 # This program is free software; you can redistribute it and/or modify
440 # it under the terms of the GNU General Public License as published by
441@@ -24,6 +24,7 @@
442 bzrlib/transport/smart/__init__.py.
443 """
444
445+import errno
446 import os
447 import sys
448 import urllib
449@@ -176,6 +177,14 @@
450 ui.ui_factory.report_transport_activity(self, bytes, direction)
451
452
453+_bad_file_descriptor = (errno.EBADF,)
454+if sys.platform == 'win32':
455+ # Given on Windows if you pass a closed socket to select.select. Probably
456+ # also given if you pass a file handle to select.
457+ WSAENOTSOCK = 10038
458+ _bad_file_descriptor += (WSAENOTSOCK,)
459+
460+
461 class SmartServerStreamMedium(SmartMedium):
462 """Handles smart commands coming over a stream.
463
464@@ -239,6 +248,8 @@
465
466 :param protocol: a SmartServerRequestProtocol.
467 """
468+ if protocol is None:
469+ return
470 try:
471 self._serve_one_request_unguarded(protocol)
472 except KeyboardInterrupt:
473@@ -710,6 +721,14 @@
474 """
475 return SmartClientStreamMediumRequest(self)
476
477+ def reset(self):
478+ """We have been disconnected, reset current state.
479+
480+ This resets things like _current_request and connected state.
481+ """
482+ self.disconnect()
483+ self._current_request = None
484+
485
486 class SmartSimplePipesClientMedium(SmartClientStreamMedium):
487 """A client medium using simple pipes.
488@@ -724,11 +743,20 @@
489
490 def _accept_bytes(self, bytes):
491 """See SmartClientStreamMedium.accept_bytes."""
492- self._writeable_pipe.write(bytes)
493+ try:
494+ self._writeable_pipe.write(bytes)
495+ except IOError, e:
496+ if e.errno in (errno.EINVAL, errno.EPIPE):
497+ raise errors.ConnectionReset(
498+ "Error trying to write to subprocess", e)
499+ raise
500 self._report_activity(len(bytes), 'write')
501
502 def _flush(self):
503 """See SmartClientStreamMedium._flush()."""
504+ # Note: If flush were to fail, we'd like to raise ConnectionReset, etc.
505+ # However, testing shows that even when the child process is
506+ # gone, this doesn't error.
507 self._writeable_pipe.flush()
508
509 def _read_bytes(self, count):
510@@ -753,8 +781,8 @@
511
512 class SmartSSHClientMedium(SmartClientStreamMedium):
513 """A client medium using SSH.
514-
515- It delegates IO to a SmartClientSocketMedium or
516+
517+ It delegates IO to a SmartSimplePipesClientMedium or
518 SmartClientAlreadyConnectedSocketMedium (depending on platform).
519 """
520
521@@ -897,6 +925,20 @@
522 SmartClientSocketMedium.__init__(self, base)
523 self._host = host
524 self._port = port
525+ self._socket = None
526+
527+ def _accept_bytes(self, bytes):
528+ """See SmartClientMedium.accept_bytes."""
529+ self._ensure_connection()
530+ osutils.send_all(self._socket, bytes, self._report_activity)
531+
532+ def disconnect(self):
533+ """See SmartClientMedium.disconnect()."""
534+ if not self._connected:
535+ return
536+ self._socket.close()
537+ self._socket = None
538+ self._connected = False
539
540 def _ensure_connection(self):
541 """Connect this medium if not already connected."""
542@@ -993,5 +1035,3 @@
543 This invokes self._medium._flush to ensure all bytes are transmitted.
544 """
545 self._medium._flush()
546-
547-
548
549=== modified file 'bzrlib/smart/protocol.py'
550--- bzrlib/smart/protocol.py 2010-06-11 05:57:09 +0000
551+++ bzrlib/smart/protocol.py 2012-09-14 07:34:20 +0000
552@@ -654,7 +654,7 @@
553 """Make a remote call with a readv array.
554
555 The body is encoded with one line per readv offset pair. The numbers in
556- each pair are separated by a comma, and no trailing \n is emitted.
557+ each pair are separated by a comma, and no trailing \\n is emitted.
558 """
559 if 'hpss' in debug.debug_flags:
560 mutter('hpss call w/readv: %s', repr(args)[1:-1])
561@@ -1081,9 +1081,6 @@
562 self._real_write_func = write_func
563
564 def _write_func(self, bytes):
565- # TODO: It is probably more appropriate to use sum(map(len, _buf))
566- # for total number of bytes to write, rather than buffer based on
567- # the number of write() calls
568 # TODO: Another possibility would be to turn this into an async model.
569 # Where we let another thread know that we have some bytes if
570 # they want it, but we don't actually block for it
571@@ -1292,6 +1289,7 @@
572 _ProtocolThreeEncoder.__init__(self, medium_request.accept_bytes)
573 self._medium_request = medium_request
574 self._headers = {}
575+ self.body_stream_started = None
576
577 def set_headers(self, headers):
578 self._headers = headers.copy()
579@@ -1332,7 +1330,7 @@
580 """Make a remote call with a readv array.
581
582 The body is encoded with one line per readv offset pair. The numbers in
583- each pair are separated by a comma, and no trailing \n is emitted.
584+ each pair are separated by a comma, and no trailing \\n is emitted.
585 """
586 if 'hpss' in debug.debug_flags:
587 mutter('hpss call w/readv: %s', repr(args)[1:-1])
588@@ -1357,6 +1355,7 @@
589 if path is not None:
590 mutter(' (to %s)', path)
591 self._request_start_time = osutils.timer_func()
592+ self.body_stream_started = False
593 self._write_protocol_version()
594 self._write_headers(self._headers)
595 self._write_structure(args)
596@@ -1364,6 +1363,9 @@
597 # have finished sending the stream. We would notice at the end
598 # anyway, but if the medium can deliver it early then it's good
599 # to short-circuit the whole request...
600+ # Provoke any ConnectionReset failures before we start the body stream.
601+ self.flush()
602+ self.body_stream_started = True
603 for exc_info, part in _iter_with_errors(stream):
604 if exc_info is not None:
605 # Iterating the stream failed. Cleanly abort the request.
606
607=== modified file 'bzrlib/smart/request.py'
608--- bzrlib/smart/request.py 2010-05-13 16:17:54 +0000
609+++ bzrlib/smart/request.py 2012-09-14 07:34:20 +0000
610@@ -1,4 +1,4 @@
611-# Copyright (C) 2006-2010 Canonical Ltd
612+# Copyright (C) 2006-2012 Canonical Ltd
613 #
614 # This program is free software; you can redistribute it and/or modify
615 # it under the terms of the GNU General Public License as published by
616@@ -134,7 +134,7 @@
617 It will return a SmartServerResponse if the command does not expect a
618 body.
619
620- :param *args: the arguments of the request.
621+ :param args: the arguments of the request.
622 """
623 self._check_enabled()
624 return self.do(*args)
625@@ -486,154 +486,199 @@
626 return SuccessfulSmartServerResponse((answer,))
627
628
629+# In the 'info' attribute, we store whether this request is 'safe' to retry if
630+# we get a disconnect while reading the response. It can have the values:
631+# read This is purely a read request, so retrying it is perfectly ok.
632+# idem An idempotent write request. Something like 'put' where if you put
633+# the same bytes twice you end up with the same final bytes.
634+# semi This is a request that isn't strictly idempotent, but doesn't
635+# result in corruption if it is retried. This is for things like
636+# 'lock' and 'unlock'. If you call lock, it updates the disk
637+# structure. If you fail to read the response, you won't be able to
638+# use the lock, because you don't have the lock token. Calling lock
639+# again will fail, because the lock is already taken. However, we
640+# can't tell if the server received our request or not. If it didn't,
641+# then retrying the request is fine, as it will actually do what we
642+# want. If it did, we will interrupt the current operation, but we
643+# are no worse off than interrupting the current operation because of
644+# a ConnectionReset.
645+# semivfs Similar to semi, but specific to a Virtual FileSystem request.
646+# stream This is a request that takes a stream that cannot be restarted if
647+# consumed. This request is 'safe' in that if we determine the
648+# connection is closed before we consume the stream, we can try
649+# again.
650+# mutate State is updated in a way that replaying that request results in a
651+# different state. For example 'append' writes more bytes to a given
652+# file. If append succeeds, it moves the file pointer.
653 request_handlers = registry.Registry()
654 request_handlers.register_lazy(
655- 'append', 'bzrlib.smart.vfs', 'AppendRequest')
656+ 'append', 'bzrlib.smart.vfs', 'AppendRequest', info='mutate')
657 request_handlers.register_lazy(
658 'Branch.get_config_file', 'bzrlib.smart.branch',
659- 'SmartServerBranchGetConfigFile')
660+ 'SmartServerBranchGetConfigFile', info='read')
661 request_handlers.register_lazy(
662- 'Branch.get_parent', 'bzrlib.smart.branch', 'SmartServerBranchGetParent')
663+ 'Branch.get_parent', 'bzrlib.smart.branch', 'SmartServerBranchGetParent',
664+ info='read')
665 request_handlers.register_lazy(
666 'Branch.get_tags_bytes', 'bzrlib.smart.branch',
667- 'SmartServerBranchGetTagsBytes')
668+ 'SmartServerBranchGetTagsBytes', info='read')
669 request_handlers.register_lazy(
670 'Branch.set_tags_bytes', 'bzrlib.smart.branch',
671- 'SmartServerBranchSetTagsBytes')
672-request_handlers.register_lazy(
673- 'Branch.get_stacked_on_url', 'bzrlib.smart.branch', 'SmartServerBranchRequestGetStackedOnURL')
674-request_handlers.register_lazy(
675- 'Branch.last_revision_info', 'bzrlib.smart.branch', 'SmartServerBranchRequestLastRevisionInfo')
676-request_handlers.register_lazy(
677- 'Branch.lock_write', 'bzrlib.smart.branch', 'SmartServerBranchRequestLockWrite')
678-request_handlers.register_lazy( 'Branch.revision_history',
679- 'bzrlib.smart.branch', 'SmartServerRequestRevisionHistory')
680-request_handlers.register_lazy( 'Branch.set_config_option',
681- 'bzrlib.smart.branch', 'SmartServerBranchRequestSetConfigOption')
682-request_handlers.register_lazy( 'Branch.set_config_option_dict',
683- 'bzrlib.smart.branch', 'SmartServerBranchRequestSetConfigOptionDict')
684-request_handlers.register_lazy( 'Branch.set_last_revision',
685- 'bzrlib.smart.branch', 'SmartServerBranchRequestSetLastRevision')
686+ 'SmartServerBranchSetTagsBytes', info='idem')
687+request_handlers.register_lazy(
688+ 'Branch.get_stacked_on_url', 'bzrlib.smart.branch',
689+ 'SmartServerBranchRequestGetStackedOnURL', info='read')
690+request_handlers.register_lazy(
691+ 'Branch.last_revision_info', 'bzrlib.smart.branch',
692+ 'SmartServerBranchRequestLastRevisionInfo', info='read')
693+request_handlers.register_lazy(
694+ 'Branch.lock_write', 'bzrlib.smart.branch',
695+ 'SmartServerBranchRequestLockWrite', info='semi')
696+request_handlers.register_lazy(
697+ 'Branch.revision_history', 'bzrlib.smart.branch',
698+ 'SmartServerRequestRevisionHistory', info='read')
699+request_handlers.register_lazy(
700+ 'Branch.set_config_option', 'bzrlib.smart.branch',
701+ 'SmartServerBranchRequestSetConfigOption', info='idem')
702+request_handlers.register_lazy(
703+ 'Branch.set_config_option_dict', 'bzrlib.smart.branch',
704+ 'SmartServerBranchRequestSetConfigOptionDict', info='idem')
705+request_handlers.register_lazy(
706+ 'Branch.set_last_revision', 'bzrlib.smart.branch',
707+ 'SmartServerBranchRequestSetLastRevision', info='idem')
708 request_handlers.register_lazy(
709 'Branch.set_last_revision_info', 'bzrlib.smart.branch',
710- 'SmartServerBranchRequestSetLastRevisionInfo')
711+ 'SmartServerBranchRequestSetLastRevisionInfo', info='idem')
712 request_handlers.register_lazy(
713 'Branch.set_last_revision_ex', 'bzrlib.smart.branch',
714- 'SmartServerBranchRequestSetLastRevisionEx')
715+ 'SmartServerBranchRequestSetLastRevisionEx', info='idem')
716 request_handlers.register_lazy(
717 'Branch.set_parent_location', 'bzrlib.smart.branch',
718- 'SmartServerBranchRequestSetParentLocation')
719+ 'SmartServerBranchRequestSetParentLocation', info='idem')
720 request_handlers.register_lazy(
721- 'Branch.unlock', 'bzrlib.smart.branch', 'SmartServerBranchRequestUnlock')
722+ 'Branch.unlock', 'bzrlib.smart.branch',
723+ 'SmartServerBranchRequestUnlock', info='semi')
724 request_handlers.register_lazy(
725 'BzrDir.cloning_metadir', 'bzrlib.smart.bzrdir',
726- 'SmartServerBzrDirRequestCloningMetaDir')
727+ 'SmartServerBzrDirRequestCloningMetaDir', info='read')
728 request_handlers.register_lazy(
729 'BzrDir.create_branch', 'bzrlib.smart.bzrdir',
730- 'SmartServerRequestCreateBranch')
731+ 'SmartServerRequestCreateBranch', info='semi')
732 request_handlers.register_lazy(
733 'BzrDir.create_repository', 'bzrlib.smart.bzrdir',
734- 'SmartServerRequestCreateRepository')
735+ 'SmartServerRequestCreateRepository', info='semi')
736 request_handlers.register_lazy(
737 'BzrDir.find_repository', 'bzrlib.smart.bzrdir',
738- 'SmartServerRequestFindRepositoryV1')
739+ 'SmartServerRequestFindRepositoryV1', info='read')
740 request_handlers.register_lazy(
741 'BzrDir.find_repositoryV2', 'bzrlib.smart.bzrdir',
742- 'SmartServerRequestFindRepositoryV2')
743+ 'SmartServerRequestFindRepositoryV2', info='read')
744 request_handlers.register_lazy(
745 'BzrDir.find_repositoryV3', 'bzrlib.smart.bzrdir',
746- 'SmartServerRequestFindRepositoryV3')
747+ 'SmartServerRequestFindRepositoryV3', info='read')
748 request_handlers.register_lazy(
749 'BzrDir.get_config_file', 'bzrlib.smart.bzrdir',
750- 'SmartServerBzrDirRequestConfigFile')
751+ 'SmartServerBzrDirRequestConfigFile', info='read')
752 request_handlers.register_lazy(
753 'BzrDirFormat.initialize', 'bzrlib.smart.bzrdir',
754- 'SmartServerRequestInitializeBzrDir')
755+ 'SmartServerRequestInitializeBzrDir', info='semi')
756 request_handlers.register_lazy(
757 'BzrDirFormat.initialize_ex_1.16', 'bzrlib.smart.bzrdir',
758- 'SmartServerRequestBzrDirInitializeEx')
759-request_handlers.register_lazy(
760- 'BzrDir.open', 'bzrlib.smart.bzrdir', 'SmartServerRequestOpenBzrDir')
761-request_handlers.register_lazy(
762- 'BzrDir.open_2.1', 'bzrlib.smart.bzrdir', 'SmartServerRequestOpenBzrDir_2_1')
763+ 'SmartServerRequestBzrDirInitializeEx', info='semi')
764+request_handlers.register_lazy(
765+ 'BzrDir.open', 'bzrlib.smart.bzrdir', 'SmartServerRequestOpenBzrDir',
766+ info='read')
767+request_handlers.register_lazy(
768+ 'BzrDir.open_2.1', 'bzrlib.smart.bzrdir',
769+ 'SmartServerRequestOpenBzrDir_2_1', info='read')
770 request_handlers.register_lazy(
771 'BzrDir.open_branch', 'bzrlib.smart.bzrdir',
772- 'SmartServerRequestOpenBranch')
773+ 'SmartServerRequestOpenBranch', info='read')
774 request_handlers.register_lazy(
775 'BzrDir.open_branchV2', 'bzrlib.smart.bzrdir',
776- 'SmartServerRequestOpenBranchV2')
777+ 'SmartServerRequestOpenBranchV2', info='read')
778 request_handlers.register_lazy(
779 'BzrDir.open_branchV3', 'bzrlib.smart.bzrdir',
780- 'SmartServerRequestOpenBranchV3')
781-request_handlers.register_lazy(
782- 'delete', 'bzrlib.smart.vfs', 'DeleteRequest')
783-request_handlers.register_lazy(
784- 'get', 'bzrlib.smart.vfs', 'GetRequest')
785-request_handlers.register_lazy(
786- 'get_bundle', 'bzrlib.smart.request', 'GetBundleRequest')
787-request_handlers.register_lazy(
788- 'has', 'bzrlib.smart.vfs', 'HasRequest')
789-request_handlers.register_lazy(
790- 'hello', 'bzrlib.smart.request', 'HelloRequest')
791-request_handlers.register_lazy(
792- 'iter_files_recursive', 'bzrlib.smart.vfs', 'IterFilesRecursiveRequest')
793-request_handlers.register_lazy(
794- 'list_dir', 'bzrlib.smart.vfs', 'ListDirRequest')
795-request_handlers.register_lazy(
796- 'mkdir', 'bzrlib.smart.vfs', 'MkdirRequest')
797-request_handlers.register_lazy(
798- 'move', 'bzrlib.smart.vfs', 'MoveRequest')
799-request_handlers.register_lazy(
800- 'put', 'bzrlib.smart.vfs', 'PutRequest')
801-request_handlers.register_lazy(
802- 'put_non_atomic', 'bzrlib.smart.vfs', 'PutNonAtomicRequest')
803-request_handlers.register_lazy(
804- 'readv', 'bzrlib.smart.vfs', 'ReadvRequest')
805-request_handlers.register_lazy(
806- 'rename', 'bzrlib.smart.vfs', 'RenameRequest')
807+ 'SmartServerRequestOpenBranchV3', info='read')
808+request_handlers.register_lazy(
809+ 'delete', 'bzrlib.smart.vfs', 'DeleteRequest', info='semivfs')
810+request_handlers.register_lazy(
811+ 'get', 'bzrlib.smart.vfs', 'GetRequest', info='read')
812+request_handlers.register_lazy(
813+ 'get_bundle', 'bzrlib.smart.request', 'GetBundleRequest', info='read')
814+request_handlers.register_lazy(
815+ 'has', 'bzrlib.smart.vfs', 'HasRequest', info='read')
816+request_handlers.register_lazy(
817+ 'hello', 'bzrlib.smart.request', 'HelloRequest', info='read')
818+request_handlers.register_lazy(
819+ 'iter_files_recursive', 'bzrlib.smart.vfs', 'IterFilesRecursiveRequest',
820+ info='read')
821+request_handlers.register_lazy(
822+ 'list_dir', 'bzrlib.smart.vfs', 'ListDirRequest', info='read')
823+request_handlers.register_lazy(
824+ 'mkdir', 'bzrlib.smart.vfs', 'MkdirRequest', info='semivfs')
825+request_handlers.register_lazy(
826+ 'move', 'bzrlib.smart.vfs', 'MoveRequest', info='semivfs')
827+request_handlers.register_lazy(
828+ 'put', 'bzrlib.smart.vfs', 'PutRequest', info='idem')
829+request_handlers.register_lazy(
830+ 'put_non_atomic', 'bzrlib.smart.vfs', 'PutNonAtomicRequest', info='idem')
831+request_handlers.register_lazy(
832+ 'readv', 'bzrlib.smart.vfs', 'ReadvRequest', info='read')
833+request_handlers.register_lazy(
834+ 'rename', 'bzrlib.smart.vfs', 'RenameRequest', info='semivfs')
835 request_handlers.register_lazy(
836 'PackRepository.autopack', 'bzrlib.smart.packrepository',
837- 'SmartServerPackRepositoryAutopack')
838-request_handlers.register_lazy('Repository.gather_stats',
839- 'bzrlib.smart.repository',
840- 'SmartServerRepositoryGatherStats')
841-request_handlers.register_lazy('Repository.get_parent_map',
842- 'bzrlib.smart.repository',
843- 'SmartServerRepositoryGetParentMap')
844-request_handlers.register_lazy(
845- 'Repository.get_revision_graph', 'bzrlib.smart.repository', 'SmartServerRepositoryGetRevisionGraph')
846-request_handlers.register_lazy(
847- 'Repository.has_revision', 'bzrlib.smart.repository', 'SmartServerRequestHasRevision')
848-request_handlers.register_lazy(
849- 'Repository.insert_stream', 'bzrlib.smart.repository', 'SmartServerRepositoryInsertStream')
850-request_handlers.register_lazy(
851- 'Repository.insert_stream_1.19', 'bzrlib.smart.repository', 'SmartServerRepositoryInsertStream_1_19')
852-request_handlers.register_lazy(
853- 'Repository.insert_stream_locked', 'bzrlib.smart.repository', 'SmartServerRepositoryInsertStreamLocked')
854-request_handlers.register_lazy(
855- 'Repository.is_shared', 'bzrlib.smart.repository', 'SmartServerRepositoryIsShared')
856-request_handlers.register_lazy(
857- 'Repository.lock_write', 'bzrlib.smart.repository', 'SmartServerRepositoryLockWrite')
858+ 'SmartServerPackRepositoryAutopack', info='idem')
859+request_handlers.register_lazy(
860+ 'Repository.gather_stats', 'bzrlib.smart.repository',
861+ 'SmartServerRepositoryGatherStats', info='read')
862+request_handlers.register_lazy(
863+ 'Repository.get_parent_map', 'bzrlib.smart.repository',
864+ 'SmartServerRepositoryGetParentMap', info='read')
865+request_handlers.register_lazy(
866+ 'Repository.get_revision_graph', 'bzrlib.smart.repository',
867+ 'SmartServerRepositoryGetRevisionGraph', info='read')
868+request_handlers.register_lazy(
869+ 'Repository.has_revision', 'bzrlib.smart.repository',
870+ 'SmartServerRequestHasRevision', info='read')
871+request_handlers.register_lazy(
872+ 'Repository.insert_stream', 'bzrlib.smart.repository',
873+ 'SmartServerRepositoryInsertStream', info='stream')
874+request_handlers.register_lazy(
875+ 'Repository.insert_stream_1.19', 'bzrlib.smart.repository',
876+ 'SmartServerRepositoryInsertStream_1_19', info='stream')
877+request_handlers.register_lazy(
878+ 'Repository.insert_stream_locked', 'bzrlib.smart.repository',
879+ 'SmartServerRepositoryInsertStreamLocked', info='stream')
880+request_handlers.register_lazy(
881+ 'Repository.is_shared', 'bzrlib.smart.repository',
882+ 'SmartServerRepositoryIsShared', info='read')
883+request_handlers.register_lazy(
884+ 'Repository.lock_write', 'bzrlib.smart.repository',
885+ 'SmartServerRepositoryLockWrite', info='semi')
886 request_handlers.register_lazy(
887 'Repository.set_make_working_trees', 'bzrlib.smart.repository',
888- 'SmartServerRepositorySetMakeWorkingTrees')
889+ 'SmartServerRepositorySetMakeWorkingTrees', info='idem')
890 request_handlers.register_lazy(
891- 'Repository.unlock', 'bzrlib.smart.repository', 'SmartServerRepositoryUnlock')
892+ 'Repository.unlock', 'bzrlib.smart.repository',
893+ 'SmartServerRepositoryUnlock', info='semi')
894 request_handlers.register_lazy(
895 'Repository.get_rev_id_for_revno', 'bzrlib.smart.repository',
896- 'SmartServerRepositoryGetRevIdForRevno')
897+ 'SmartServerRepositoryGetRevIdForRevno', info='read')
898 request_handlers.register_lazy(
899 'Repository.get_stream', 'bzrlib.smart.repository',
900- 'SmartServerRepositoryGetStream')
901+ 'SmartServerRepositoryGetStream', info='read')
902 request_handlers.register_lazy(
903 'Repository.get_stream_1.19', 'bzrlib.smart.repository',
904- 'SmartServerRepositoryGetStream_1_19')
905+ 'SmartServerRepositoryGetStream_1_19', info='read')
906 request_handlers.register_lazy(
907 'Repository.tarball', 'bzrlib.smart.repository',
908- 'SmartServerRepositoryTarball')
909-request_handlers.register_lazy(
910- 'rmdir', 'bzrlib.smart.vfs', 'RmdirRequest')
911-request_handlers.register_lazy(
912- 'stat', 'bzrlib.smart.vfs', 'StatRequest')
913-request_handlers.register_lazy(
914- 'Transport.is_readonly', 'bzrlib.smart.request', 'SmartServerIsReadonly')
915+ 'SmartServerRepositoryTarball', info='read')
916+request_handlers.register_lazy(
917+ 'rmdir', 'bzrlib.smart.vfs', 'RmdirRequest', info='semivfs')
918+request_handlers.register_lazy(
919+ 'stat', 'bzrlib.smart.vfs', 'StatRequest', info='read')
920+request_handlers.register_lazy(
921+ 'Transport.is_readonly', 'bzrlib.smart.request',
922+ 'SmartServerIsReadonly', info='read')
923
924=== modified file 'bzrlib/tests/test_bundle.py'
925--- bzrlib/tests/test_bundle.py 2010-02-23 07:43:11 +0000
926+++ bzrlib/tests/test_bundle.py 2012-09-14 07:34:20 +0000
927@@ -1855,20 +1855,23 @@
928 self.sock.bind(('127.0.0.1', 0))
929 self.sock.listen(1)
930 self.port = self.sock.getsockname()[1]
931+ self.stopping = threading.Event()
932 self.thread = threading.Thread(
933 name='%s (port %d)' % (self.__class__.__name__, self.port),
934 target=self.accept_and_close)
935 self.thread.start()
936
937 def accept_and_close(self):
938- conn, addr = self.sock.accept()
939- conn.shutdown(socket.SHUT_RDWR)
940- conn.close()
941+ while not self.stopping.isSet():
942+ conn, addr = self.sock.accept()
943+ conn.shutdown(socket.SHUT_RDWR)
944+ conn.close()
945
946 def get_url(self):
947 return 'bzr://127.0.0.1:%d/' % (self.port,)
948
949 def stop_server(self):
950+ self.stopping.set()
951 try:
952 # make sure the thread dies by connecting to the listening socket,
953 # just in case the test failed to do so.
954
955=== modified file 'bzrlib/tests/test_osutils.py'
956--- bzrlib/tests/test_osutils.py 2010-12-02 09:23:10 +0000
957+++ bzrlib/tests/test_osutils.py 2012-09-14 07:34:20 +0000
958@@ -804,6 +804,45 @@
959 self.assertEqual(None, osutils.safe_file_id(None))
960
961
962+class TestSendAll(tests.TestCase):
963+
964+ def test_send_with_disconnected_socket(self):
965+ class DisconnectedSocket(object):
966+ def __init__(self, err):
967+ self.err = err
968+ def send(self, content):
969+ raise self.err
970+ def close(self):
971+ pass
972+ # All of these should be treated as ConnectionReset
973+ errs = []
974+ for err_cls in (IOError, socket.error):
975+ for errnum in osutils._end_of_stream_errors:
976+ errs.append(err_cls(errnum))
977+ for err in errs:
978+ sock = DisconnectedSocket(err)
979+ self.assertRaises(errors.ConnectionReset,
980+ osutils.send_all, sock, 'some more content')
981+
982+ def test_send_with_no_progress(self):
983+ # See https://bugs.launchpad.net/bzr/+bug/1047309
984+ # It seems that paramiko can get into a state where it doesn't error,
985+ # but it returns 0 bytes sent for requests over and over again.
986+ class NoSendingSocket(object):
987+ def __init__(self):
988+ self.call_count = 0
989+ def send(self, bytes):
990+ self.call_count += 1
991+ if self.call_count > 100:
992+ # Prevent the test suite from hanging
993+ raise RuntimeError('too many calls')
994+ return 0
995+ sock = NoSendingSocket()
996+ self.assertRaises(errors.ConnectionReset,
997+ osutils.send_all, sock, 'content')
998+ self.assertEqual(1, sock.call_count)
999+
1000+
1001 class TestWin32Funcs(tests.TestCase):
1002 """Test that _win32 versions of os utilities return appropriate paths."""
1003
1004
1005=== modified file 'bzrlib/tests/test_smart.py'
1006--- bzrlib/tests/test_smart.py 2010-05-13 16:17:54 +0000
1007+++ bzrlib/tests/test_smart.py 2012-09-14 07:34:20 +0000
1008@@ -1849,8 +1849,11 @@
1009 """All registered request_handlers can be found."""
1010 # If there's a typo in a register_lazy call, this loop will fail with
1011 # an AttributeError.
1012- for key, item in smart_req.request_handlers.iteritems():
1013- pass
1014+ for key in smart_req.request_handlers.keys():
1015+ try:
1016+ item = smart_req.request_handlers.get(key)
1017+ except AttributeError, e:
1018+ raise AttributeError('failed to get %s: %s' % (key, e))
1019
1020 def assertHandlerEqual(self, verb, handler):
1021 self.assertEqual(smart_req.request_handlers.get(verb), handler)
1022
1023=== modified file 'bzrlib/tests/test_smart_request.py'
1024--- bzrlib/tests/test_smart_request.py 2010-06-20 11:18:38 +0000
1025+++ bzrlib/tests/test_smart_request.py 2012-09-14 07:34:20 +0000
1026@@ -111,6 +111,16 @@
1027 self.assertEqual(
1028 [[transport]] * 3, handler._command.jail_transports_log)
1029
1030+ def test_all_registered_requests_are_safety_qualified(self):
1031+ unclassified_requests = []
1032+ allowed_info = ('read', 'idem', 'mutate', 'semivfs', 'semi', 'stream')
1033+ for key in request.request_handlers.keys():
1034+ info = request.request_handlers.get_info(key)
1035+ if info is None or info not in allowed_info:
1036+ unclassified_requests.append(key)
1037+ if unclassified_requests:
1038+ self.fail('These requests were not categorized as safe/unsafe'
1039+ ' to retry: %s' % (unclassified_requests,))
1040
1041
1042 class TestSmartRequestHandlerErrorTranslation(TestCase):
1043
1044=== modified file 'bzrlib/tests/test_smart_transport.py'
1045--- bzrlib/tests/test_smart_transport.py 2010-06-25 09:56:07 +0000
1046+++ bzrlib/tests/test_smart_transport.py 2012-09-14 07:34:20 +0000
1047@@ -1,4 +1,4 @@
1048-# Copyright (C) 2006-2010 Canonical Ltd
1049+# Copyright (C) 2006-2011 Canonical Ltd
1050 #
1051 # This program is free software; you can redistribute it and/or modify
1052 # it under the terms of the GNU General Public License as published by
1053@@ -18,13 +18,17 @@
1054
1055 # all of this deals with byte strings so this is safe
1056 from cStringIO import StringIO
1057+import errno
1058 import os
1059 import socket
1060+import subprocess
1061+import sys
1062 import threading
1063
1064 import bzrlib
1065 from bzrlib import (
1066 bzrdir,
1067+ debug,
1068 errors,
1069 osutils,
1070 tests,
1071@@ -50,6 +54,29 @@
1072 )
1073
1074
1075+def create_file_pipes():
1076+ r, w = os.pipe()
1077+ # These must be opened without buffering, or we get undefined results
1078+ rf = os.fdopen(r, 'rb', 0)
1079+ wf = os.fdopen(w, 'wb', 0)
1080+ return rf, wf
1081+
1082+
1083+def portable_socket_pair():
1084+ """Return a pair of TCP sockets connected to each other.
1085+
1086+ Unlike socket.socketpair, this should work on Windows.
1087+ """
1088+ listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1089+ listen_sock.bind(('127.0.0.1', 0))
1090+ listen_sock.listen(1)
1091+ client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1092+ client_sock.connect(listen_sock.getsockname())
1093+ server_sock, addr = listen_sock.accept()
1094+ listen_sock.close()
1095+ return server_sock, client_sock
1096+
1097+
1098 class StringIOSSHVendor(object):
1099 """A SSH vendor that uses StringIO to buffer writes and answer reads."""
1100
1101@@ -64,6 +91,27 @@
1102 return StringIOSSHConnection(self)
1103
1104
1105+class FirstRejectedStringIOSSHVendor(StringIOSSHVendor):
1106+ """The first connection will be considered closed.
1107+
1108+ The second connection will succeed normally.
1109+ """
1110+
1111+ def __init__(self, read_from, write_to, fail_at_write=True):
1112+ super(FirstRejectedStringIOSSHVendor, self).__init__(read_from,
1113+ write_to)
1114+ self.fail_at_write = fail_at_write
1115+ self._first = True
1116+
1117+ def connect_ssh(self, username, password, host, port, command):
1118+ self.calls.append(('connect_ssh', username, password, host, port,
1119+ command))
1120+ if self._first:
1121+ self._first = False
1122+ return ClosedSSHConnection(self)
1123+ return StringIOSSHConnection(self)
1124+
1125+
1126 class StringIOSSHConnection(ssh.SSHConnection):
1127 """A SSH connection that uses StringIO to buffer writes and answer reads."""
1128
1129@@ -79,6 +127,29 @@
1130 return 'pipes', (self.vendor.read_from, self.vendor.write_to)
1131
1132
1133+class ClosedSSHConnection(ssh.SSHConnection):
1134+ """An SSH connection that just has closed channels."""
1135+
1136+ def __init__(self, vendor):
1137+ self.vendor = vendor
1138+
1139+ def close(self):
1140+ self.vendor.calls.append(('close', ))
1141+
1142+ def get_sock_or_pipes(self):
1143+ # We create matching pipes, and then close the ssh side
1144+ bzr_read, ssh_write = create_file_pipes()
1145+ # We always fail when bzr goes to read
1146+ ssh_write.close()
1147+ if self.vendor.fail_at_write:
1148+ # If set, we'll also fail when bzr goes to write
1149+ ssh_read, bzr_write = create_file_pipes()
1150+ ssh_read.close()
1151+ else:
1152+ bzr_write = self.vendor.write_to
1153+ return 'pipes', (bzr_read, bzr_write)
1154+
1155+
1156 class _InvalidHostnameFeature(tests.Feature):
1157 """Does 'non_existent.invalid' fail to resolve?
1158
1159@@ -174,6 +245,91 @@
1160 client_medium._accept_bytes('abc')
1161 self.assertEqual('abc', output.getvalue())
1162
1163+ def test_simple_pipes__accept_bytes_subprocess_closed(self):
1164+ # It is unfortunate that we have to use Popen for this. However,
1165+ # os.pipe() does not behave the same as subprocess.Popen().
1166+ # On Windows, if you use os.pipe() and close the write side,
1167+ # read.read() hangs. On Linux, read.read() returns the empty string.
1168+ p = subprocess.Popen([sys.executable, '-c',
1169+ 'import sys\n'
1170+ 'sys.stdout.write(sys.stdin.read(4))\n'
1171+ 'sys.stdout.close()\n'],
1172+ stdout=subprocess.PIPE, stdin=subprocess.PIPE)
1173+ client_medium = medium.SmartSimplePipesClientMedium(
1174+ p.stdout, p.stdin, 'base')
1175+ client_medium._accept_bytes('abc\n')
1176+ self.assertEqual('abc', client_medium._read_bytes(3))
1177+ p.wait()
1178+ # While writing to the underlying pipe:
1179+ # on Windows py2.6.6 we get IOError(EINVAL),
1180+ # on Lucid py2.6.5 we get IOError(EPIPE).
1181+ # In both cases, it should be wrapped in ConnectionReset.
1182+ self.assertRaises(errors.ConnectionReset,
1183+ client_medium._accept_bytes, 'more')
1184+
1185+ def test_simple_pipes__accept_bytes_pipe_closed(self):
1186+ child_read, client_write = create_file_pipes()
1187+ client_medium = medium.SmartSimplePipesClientMedium(
1188+ None, client_write, 'base')
1189+ client_medium._accept_bytes('abc\n')
1190+ self.assertEqual('abc\n', child_read.read(4))
1191+ # While writing to the underlying pipe:
1192+ # on Windows py2.6.6 we get IOError(EINVAL),
1193+ # on Lucid py2.6.5 we get IOError(EPIPE).
1194+ # In both cases, it should be wrapped in ConnectionReset.
1195+ child_read.close()
1196+ self.assertRaises(errors.ConnectionReset,
1197+ client_medium._accept_bytes, 'more')
1198+
1199+ def test_simple_pipes__flush_pipe_closed(self):
1200+ child_read, client_write = create_file_pipes()
1201+ client_medium = medium.SmartSimplePipesClientMedium(
1202+ None, client_write, 'base')
1203+ client_medium._accept_bytes('abc\n')
1204+ child_read.close()
1205+ # Even though the pipe is closed, flush on the write side seems to be a
1206+ # no-op, rather than a failure.
1207+ client_medium._flush()
1208+
1209+ def test_simple_pipes__flush_subprocess_closed(self):
1210+ p = subprocess.Popen([sys.executable, '-c',
1211+ 'import sys\n'
1212+ 'sys.stdout.write(sys.stdin.read(4))\n'
1213+ 'sys.stdout.close()\n'],
1214+ stdout=subprocess.PIPE, stdin=subprocess.PIPE)
1215+ client_medium = medium.SmartSimplePipesClientMedium(
1216+ p.stdout, p.stdin, 'base')
1217+ client_medium._accept_bytes('abc\n')
1218+ p.wait()
1219+ # Even though the child process is dead, flush seems to be a no-op.
1220+ client_medium._flush()
1221+
1222+ def test_simple_pipes__read_bytes_pipe_closed(self):
1223+ child_read, client_write = create_file_pipes()
1224+ client_medium = medium.SmartSimplePipesClientMedium(
1225+ child_read, client_write, 'base')
1226+ client_medium._accept_bytes('abc\n')
1227+ client_write.close()
1228+ self.assertEqual('abc\n', client_medium._read_bytes(4))
1229+ self.assertEqual('', client_medium._read_bytes(4))
1230+
1231+ def test_simple_pipes__read_bytes_subprocess_closed(self):
1232+ p = subprocess.Popen([sys.executable, '-c',
1233+ 'import sys\n'
1234+ 'if sys.platform == "win32":\n'
1235+ ' import msvcrt, os\n'
1236+ ' msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)\n'
1237+ ' msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY)\n'
1238+ 'sys.stdout.write(sys.stdin.read(4))\n'
1239+ 'sys.stdout.close()\n'],
1240+ stdout=subprocess.PIPE, stdin=subprocess.PIPE)
1241+ client_medium = medium.SmartSimplePipesClientMedium(
1242+ p.stdout, p.stdin, 'base')
1243+ client_medium._accept_bytes('abc\n')
1244+ p.wait()
1245+ self.assertEqual('abc\n', client_medium._read_bytes(4))
1246+ self.assertEqual('', client_medium._read_bytes(4))
1247+
1248 def test_simple_pipes_client_disconnect_does_nothing(self):
1249 # calling disconnect does nothing.
1250 input = StringIO()
1251@@ -561,6 +717,28 @@
1252 request.finished_reading()
1253 self.assertRaises(errors.ReadingCompleted, request.read_bytes, None)
1254
1255+ def test_reset(self):
1256+ server_sock, client_sock = portable_socket_pair()
1257+ # TODO: Use SmartClientAlreadyConnectedSocketMedium for the versions of
1258+ # bzr where it exists.
1259+ client_medium = medium.SmartTCPClientMedium(None, None, None)
1260+ client_medium._socket = client_sock
1261+ client_medium._connected = True
1262+ req = client_medium.get_request()
1263+ self.assertRaises(errors.TooManyConcurrentRequests,
1264+ client_medium.get_request)
1265+ client_medium.reset()
1266+ # The stream should be reset, marked as disconnected, though ready for
1267+ # us to make a new request
1268+ self.assertFalse(client_medium._connected)
1269+ self.assertIs(None, client_medium._socket)
1270+ try:
1271+ self.assertEqual('', client_sock.recv(1))
1272+ except socket.error, e:
1273+ if e.errno not in (errno.EBADF,):
1274+ raise
1275+ req = client_medium.get_request()
1276+
1277
1278 class RemoteTransportTests(test_smart.TestCaseWithSmartMedium):
1279
1280@@ -614,20 +792,6 @@
1281 super(TestSmartServerStreamMedium, self).setUp()
1282 self._captureVar('BZR_NO_SMART_VFS', None)
1283
1284- def portable_socket_pair(self):
1285- """Return a pair of TCP sockets connected to each other.
1286-
1287- Unlike socket.socketpair, this should work on Windows.
1288- """
1289- listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1290- listen_sock.bind(('127.0.0.1', 0))
1291- listen_sock.listen(1)
1292- client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1293- client_sock.connect(listen_sock.getsockname())
1294- server_sock, addr = listen_sock.accept()
1295- listen_sock.close()
1296- return server_sock, client_sock
1297-
1298 def test_smart_query_version(self):
1299 """Feed a canned query version to a server"""
1300 # wire-to-wire, using the whole stack
1301@@ -692,7 +856,7 @@
1302
1303 def test_socket_stream_with_bulk_data(self):
1304 sample_request_bytes = 'command\n9\nbulk datadone\n'
1305- server_sock, client_sock = self.portable_socket_pair()
1306+ server_sock, client_sock = portable_socket_pair()
1307 server = medium.SmartServerSocketStreamMedium(
1308 server_sock, None)
1309 sample_protocol = SampleRequest(expected_bytes=sample_request_bytes)
1310@@ -711,7 +875,7 @@
1311 self.assertTrue(server.finished)
1312
1313 def test_socket_stream_shutdown_detection(self):
1314- server_sock, client_sock = self.portable_socket_pair()
1315+ server_sock, client_sock = portable_socket_pair()
1316 client_sock.close()
1317 server = medium.SmartServerSocketStreamMedium(
1318 server_sock, None)
1319@@ -731,7 +895,7 @@
1320 rest_of_request_bytes = 'lo\n'
1321 expected_response = (
1322 protocol.RESPONSE_VERSION_TWO + 'success\nok\x012\n')
1323- server_sock, client_sock = self.portable_socket_pair()
1324+ server_sock, client_sock = portable_socket_pair()
1325 server = medium.SmartServerSocketStreamMedium(
1326 server_sock, None)
1327 client_sock.sendall(incomplete_request_bytes)
1328@@ -807,7 +971,7 @@
1329 # _serve_one_request should still process both of them as if they had
1330 # been received separately.
1331 sample_request_bytes = 'command\n'
1332- server_sock, client_sock = self.portable_socket_pair()
1333+ server_sock, client_sock = portable_socket_pair()
1334 server = medium.SmartServerSocketStreamMedium(
1335 server_sock, None)
1336 first_protocol = SampleRequest(expected_bytes=sample_request_bytes)
1337@@ -844,7 +1008,7 @@
1338 self.assertTrue(server.finished)
1339
1340 def test_socket_stream_error_handling(self):
1341- server_sock, client_sock = self.portable_socket_pair()
1342+ server_sock, client_sock = portable_socket_pair()
1343 server = medium.SmartServerSocketStreamMedium(
1344 server_sock, None)
1345 fake_protocol = ErrorRaisingProtocol(Exception('boom'))
1346@@ -865,7 +1029,7 @@
1347 self.assertEqual('', from_server.getvalue())
1348
1349 def test_socket_stream_keyboard_interrupt_handling(self):
1350- server_sock, client_sock = self.portable_socket_pair()
1351+ server_sock, client_sock = portable_socket_pair()
1352 server = medium.SmartServerSocketStreamMedium(
1353 server_sock, None)
1354 fake_protocol = ErrorRaisingProtocol(KeyboardInterrupt('boom'))
1355@@ -882,7 +1046,7 @@
1356 return server._build_protocol()
1357
1358 def build_protocol_socket(self, bytes):
1359- server_sock, client_sock = self.portable_socket_pair()
1360+ server_sock, client_sock = portable_socket_pair()
1361 server = medium.SmartServerSocketStreamMedium(
1362 server_sock, None)
1363 client_sock.sendall(bytes)
1364@@ -2785,6 +2949,33 @@
1365 'e', # end
1366 output.getvalue())
1367
1368+ def test_records_start_of_body_stream(self):
1369+ requester, output = self.make_client_encoder_and_output()
1370+ requester.set_headers({})
1371+ in_stream = [False]
1372+ def stream_checker():
1373+ self.assertTrue(requester.body_stream_started)
1374+ in_stream[0] = True
1375+ yield 'content'
1376+ flush_called = []
1377+ orig_flush = requester.flush
1378+ def tracked_flush():
1379+ flush_called.append(in_stream[0])
1380+ if in_stream[0]:
1381+ self.assertTrue(requester.body_stream_started)
1382+ else:
1383+ self.assertFalse(requester.body_stream_started)
1384+ return orig_flush()
1385+ requester.flush = tracked_flush
1386+ requester.call_with_body_stream(('one arg',), stream_checker())
1387+ self.assertEqual(
1388+ 'bzr message 3 (bzr 1.6)\n' # protocol version
1389+ '\x00\x00\x00\x02de' # headers
1390+ 's\x00\x00\x00\x0bl7:one arge' # args
1391+ 'b\x00\x00\x00\x07content' # body
1392+ 'e', output.getvalue())
1393+ self.assertEqual([False, True, True], flush_called)
1394+
1395
1396 class StubMediumRequest(object):
1397 """A stub medium request that tracks the number of times accept_bytes is
1398@@ -3209,6 +3400,193 @@
1399 # encoder.
1400
1401
1402+class Test_SmartClientRequest(tests.TestCase):
1403+
1404+ def make_client_with_failing_medium(self, fail_at_write=True, response=''):
1405+ response_io = StringIO(response)
1406+ output = StringIO()
1407+ vendor = FirstRejectedStringIOSSHVendor(response_io, output,
1408+ fail_at_write=fail_at_write)
1409+ ssh_params = medium.SSHParams('a host', 'a port', 'a user', 'a pass')
1410+ client_medium = medium.SmartSSHClientMedium('base', ssh_params, vendor)
1411+ smart_client = client._SmartClient(client_medium, headers={})
1412+ return output, vendor, smart_client
1413+
1414+ def make_response(self, args, body=None, body_stream=None):
1415+ response_io = StringIO()
1416+ response = _mod_request.SuccessfulSmartServerResponse(args, body=body,
1417+ body_stream=body_stream)
1418+ responder = protocol.ProtocolThreeResponder(response_io.write)
1419+ responder.send_response(response)
1420+ return response_io.getvalue()
1421+
1422+ def test__call_doesnt_retry_append(self):
1423+ response = self.make_response(('appended', '8'))
1424+ output, vendor, smart_client = self.make_client_with_failing_medium(
1425+ fail_at_write=False, response=response)
1426+ smart_request = client._SmartClientRequest(smart_client, 'append',
1427+ ('foo', ''), body='content\n')
1428+ self.assertRaises(errors.ConnectionReset, smart_request._call, 3)
1429+
1430+ def test__call_retries_get_bytes(self):
1431+ response = self.make_response(('ok',), 'content\n')
1432+ output, vendor, smart_client = self.make_client_with_failing_medium(
1433+ fail_at_write=False, response=response)
1434+ smart_request = client._SmartClientRequest(smart_client, 'get',
1435+ ('foo',))
1436+ response, response_handler = smart_request._call(3)
1437+ self.assertEqual(('ok',), response)
1438+ self.assertEqual('content\n', response_handler.read_body_bytes())
1439+
1440+ def test__call_noretry_get_bytes(self):
1441+ debug.debug_flags.add('noretry')
1442+ response = self.make_response(('ok',), 'content\n')
1443+ output, vendor, smart_client = self.make_client_with_failing_medium(
1444+ fail_at_write=False, response=response)
1445+ smart_request = client._SmartClientRequest(smart_client, 'get',
1446+ ('foo',))
1447+ self.assertRaises(errors.ConnectionReset, smart_request._call, 3)
1448+
1449+ def test__send_no_retry_pipes(self):
1450+ client_read, server_write = create_file_pipes()
1451+ server_read, client_write = create_file_pipes()
1452+ client_medium = medium.SmartSimplePipesClientMedium(client_read,
1453+ client_write, base='/')
1454+ smart_client = client._SmartClient(client_medium)
1455+ smart_request = client._SmartClientRequest(smart_client,
1456+ 'hello', ())
1457+ # Close the server side
1458+ server_read.close()
1459+ encoder, response_handler = smart_request._construct_protocol(3)
1460+ self.assertRaises(errors.ConnectionReset,
1461+ smart_request._send_no_retry, encoder)
1462+
1463+ def test__send_read_response_sockets(self):
1464+ listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1465+ listen_sock.bind(('127.0.0.1', 0))
1466+ listen_sock.listen(1)
1467+ host, port = listen_sock.getsockname()
1468+ client_medium = medium.SmartTCPClientMedium(host, port, '/')
1469+ client_medium._ensure_connection()
1470+ smart_client = client._SmartClient(client_medium)
1471+ smart_request = client._SmartClientRequest(smart_client, 'hello', ())
1472+ # Accept the connection, but don't actually talk to the client.
1473+ server_sock, _ = listen_sock.accept()
1474+ server_sock.close()
1475+ # Sockets buffer and don't really notice that the server has closed the
1476+ # connection until we try to read again.
1477+ handler = smart_request._send(3)
1478+ self.assertRaises(errors.ConnectionReset,
1479+ handler.read_response_tuple, expect_body=False)
1480+
1481+ def test__send_retries_on_write(self):
1482+ output, vendor, smart_client = self.make_client_with_failing_medium()
1483+ smart_request = client._SmartClientRequest(smart_client, 'hello', ())
1484+ handler = smart_request._send(3)
1485+ self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol
1486+ '\x00\x00\x00\x02de' # empty headers
1487+ 's\x00\x00\x00\tl5:helloee',
1488+ output.getvalue())
1489+ self.assertEqual(
1490+ [('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
1491+ ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
1492+ ('close',),
1493+ ('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
1494+ ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
1495+ ],
1496+ vendor.calls)
1497+
1498+ def test__send_doesnt_retry_read_failure(self):
1499+ output, vendor, smart_client = self.make_client_with_failing_medium(
1500+ fail_at_write=False)
1501+ smart_request = client._SmartClientRequest(smart_client, 'hello', ())
1502+ handler = smart_request._send(3)
1503+ self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol
1504+ '\x00\x00\x00\x02de' # empty headers
1505+ 's\x00\x00\x00\tl5:helloee',
1506+ output.getvalue())
1507+ self.assertEqual(
1508+ [('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
1509+ ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
1510+ ],
1511+ vendor.calls)
1512+ self.assertRaises(errors.ConnectionReset, handler.read_response_tuple)
1513+
def test__send_request_retries_body_stream_if_not_started(self):
    """A request with a body stream is retried if the stream is untouched.

    Checks that the vendor recorded connect, close, connect, and that the
    output holds one request followed by both body chunks and the end
    marker.
    """
    sent, ssh_vendor, smart_client = self.make_client_with_failing_medium()
    request = client._SmartClientRequest(
        smart_client, 'hello', (), body_stream=['a', 'b'])
    request._send(3)
    # We connect, get disconnected, and notice before consuming the stream,
    # so we try again one time and succeed.
    connect_call = (
        'connect_ssh', 'a user', 'a pass', 'a host', 'a port',
        ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes'])
    expected_calls = [connect_call, ('close',), connect_call]
    self.assertEqual(expected_calls, ssh_vendor.calls)
    expected_request = (
        'bzr message 3 (bzr 1.6)\n'  # protocol
        '\x00\x00\x00\x02de'  # empty headers
        's\x00\x00\x00\tl5:helloe'
        'b\x00\x00\x00\x01a'
        'b\x00\x00\x00\x01b'
        'e')
    self.assertEqual(expected_request, sent.getvalue())
1536+
def test__send_request_stops_if_body_started(self):
    """_send() raises ConnectionReset, with no reconnect, on a later write.

    The output stream accepts one write() and rejects every later one.
    Checks that the vendor recorded connect then close, and that only the
    request without any body chunk reached the output.
    """
    # We intentionally use the python StringIO so that we can subclass it.
    from StringIO import StringIO

    class FailAfterFirstWrite(StringIO):
        """Allow one 'write' call to pass, fail the rest"""

        def __init__(self):
            StringIO.__init__(self)
            self._may_write = True

        def write(self, s):
            if not self._may_write:
                raise IOError(errno.EINVAL, 'invalid file handle')
            self._may_write = False
            return StringIO.write(self, s)

    from_server = StringIO()
    to_server = FailAfterFirstWrite()
    ssh_vendor = FirstRejectedStringIOSSHVendor(from_server, to_server,
                                                fail_at_write=False)
    params = medium.SSHParams('a host', 'a port', 'a user', 'a pass')
    ssh_medium = medium.SmartSSHClientMedium('base', params, ssh_vendor)
    smart_client = client._SmartClient(ssh_medium, headers={})
    request = client._SmartClientRequest(
        smart_client, 'hello', (), body_stream=['a', 'b'])
    self.assertRaises(errors.ConnectionReset, request._send, 3)
    # We connect, and manage to get to the point that we start consuming
    # the body stream. The next write fails, so we just stop.
    connect_call = (
        'connect_ssh', 'a user', 'a pass', 'a host', 'a port',
        ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes'])
    self.assertEqual([connect_call, ('close',)], ssh_vendor.calls)
    expected_request = (
        'bzr message 3 (bzr 1.6)\n'  # protocol
        '\x00\x00\x00\x02de'  # empty headers
        's\x00\x00\x00\tl5:helloe')
    self.assertEqual(expected_request, to_server.getvalue())
1575+
def test__send_disabled_retry(self):
    """With the 'noretry' debug flag set, _send() raises and does not retry.

    Checks that ConnectionReset propagates and that the vendor recorded
    only connect then close.
    """
    # NOTE(review): the flag is not removed here; presumably the test
    # base class restores debug flags after each test -- confirm.
    debug.debug_flags.add('noretry')
    _sent, ssh_vendor, smart_client = (
        self.make_client_with_failing_medium())
    request = client._SmartClientRequest(smart_client, 'hello', ())
    self.assertRaises(errors.ConnectionReset, request._send, 3)
    connect_call = (
        'connect_ssh', 'a user', 'a pass', 'a host', 'a port',
        ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes'])
    self.assertEqual([connect_call, ('close',)], ssh_vendor.calls)
1587+
1588+
1589 class LengthPrefixedBodyDecoder(tests.TestCase):
1590
1591 # XXX: TODO: make accept_reading_trailer invoke translate_response or

Subscribers

People subscribed via source and target branches