Merge lp:~jameinel/bzr/2.1-client-reconnect-819604 into lp:bzr/2.1

Proposed by John A Meinel
Status: Work in progress
Proposed branch: lp:~jameinel/bzr/2.1-client-reconnect-819604
Merge into: lp:bzr/2.1
Diff against target: 1035 lines (+589/-143)
7 files modified
bzrlib/help_topics/en/debug-flags.txt (+2/-0)
bzrlib/osutils.py (+40/-9)
bzrlib/smart/client.py (+170/-86)
bzrlib/smart/medium.py (+43/-24)
bzrlib/smart/protocol.py (+0/-3)
bzrlib/tests/test_osutils.py (+39/-0)
bzrlib/tests/test_smart_transport.py (+295/-21)
To merge this branch: bzr merge lp:~jameinel/bzr/2.1-client-reconnect-819604
Reviewer Review Type Date Requested Status
bzr-core Pending
Review via email: mp+78597@code.launchpad.net

Commit message

Start of bug #819604, allow bzr clients to reconnect if the connection is gone when we go to write a new request.

Description of the change

This is some initial work for having clients reconnect when their bzr+ssh (or bzr://) connection gets closed underneath them.

To start with, the trickiest bit is that it is really quite hard to detect when things are closed. Mostly because of buffering, etc, at lots of different levels (internal memory buffering, socket/pipe buffering, 'ssh.exe' buffering, latency of close message from server back to client, etc.)

I'll also clarify that this isn't intended to handle all possible sorts of connection failures. What we really are trying to handle is allowing a server to be upgraded 'live'. Such that the server will gently disconnect us only between complete requests. Versus having requests terminated randomly in the middle of content. It may help with some of those cases, and we certainly don't want to cause corruption, etc if one of those happens. But it isn't the goal of *this* change.

This specific patch has a few aspects, and does help some cases in real-world testing.

1) Backport a minimum amount of the SmartSSHClientMedium proxies to _real_medium patch from Andrew. In bzr-2.2 Andrew updated the code so that when spawning an 'ssh' subprocess, we use a socketpair when possible instead of pipes. (That way we can read without blocking, allowing us to use large buffers, etc.)

It also meant that he shared the logic from SmartSimplePipeStreamMedium and SmartTCPClientMedium, which I wanted to do, to avoid having to re-implement the logic multiple times.

This is a pretty small part of the patch.

2) Update some of the lower level code so that we get ConnectionReset rather than various IOError and ValueError when writing to a closed connection.

3) Update the _SmartClient._send_request logic. If we get a ConnectionReset while we are trying to write the request, then we can safely retry the request. On the assumption that as long as the server doesn't see the final 'e' terminal byte, it will reject the request, because it is incomplete.

4) The one caveat is if there is a 'body_stream'. body_stream is an iterable, so we can't just rewind it and resume it.

On the plus side, there is only one caller RemoteStreamSink.insert_stream. In theory we could update that caller. In practice, it *also* takes a 'stream' object, which can't simply be rewound. Though we do a 'no-op' stream just before the real content to determine that the remote server actually supports the RPC we want to use.

That would allow us to close the gap a little bit. So we can detect ConnectionReset all the way to the point that we actually start streaming the content.

The other bit that would be possible, is to update ProtocolThreeRequester.call_with_body_stream to allow it to set a flag to indicate whether it has actually started consuming the stream yet. Then update it to flush just before it iterates the body stream to help force detecting closed connections. (otherwise we are likely to have buffered at least the first part of the body_stream in local memory, since the local buffer is 1MB and the first flush is only called after the first chunk of the body_stream.)

This just closes a window where we won't reconnect during 'bzr push'.

5) The next step is that (especially for sockets), we write out the request successfully, and then notice it is closed when we try to read back the response.

Here I'm a lot more concerned about non-idempotent requests. Because the server might have fully read the request, and sent back a response, but we didn't get to see the response.

However, our RPC design is meant to be stateless (all state is passed in the request, not assumed to be stored on the server side). Which means things *should* be idempotent. For something like 'put_bytes', we may write the same content twice, but we shouldn't corrupt anything.

The only one I can think of is 'append_bytes', and the associated open_write_stream() code. (RemoteTransport.open_write_stream uses AppendBasedFileStream as its implementation.)

And that code shouldn't really be used with up-to-date bzr clients and servers. So I'm much more ok with it just dying if it isn't safe to retry.

To post a comment you must log in.
Revision history for this message
John A Meinel (jameinel) wrote :

I should note, I think it would be prudent to land this into bzr.dev first, but I wanted to make sure it was written against 2.1 since that is the oldest client that we want to backport support for.

Also, I expect there will be a modest amount of conflicts bringing this up from bzr-2.1 into bzr.dev. There has been a modest amount of changes inbetween, so I'll try to produce branches for each series.

Revision history for this message
John A Meinel (jameinel) wrote :

Small update, in working on the read-reconnect code, I realized passing all these arguments around was very clumsy. So I refactored the code into a helper class _SmartClientRequest. That way, the 6 arguments being passed around just end up as attributes, and the function call interplay is easier to follow.

Revision history for this message
Vincent Ladeuil (vila) wrote :

> I should note, I think it would be prudent to land this into bzr.dev first,
> but I wanted to make sure it was written against 2.1 since that is the oldest
> client that we want to backport support for.

+4 (+1 for each of 2.1, 2.2, 2.3, 2.4)

No need to ask anymore, good, keep reading my mind ;)

I don't know exactly how we should proceed but as much as possible, all the modifications related to this topic should be done in a separate branch which should be merged into whatever series we target.

Backport is hard, let's avoid it as much as possible (but no more ;)

> Also, I expect there will be a modest amount of conflicts bringing this up
> from bzr-2.1 into bzr.dev. There has been a modest amount of changes
> inbetween, so I'll try to produce branches for each series.

Yup, that sounds a lot like a loom with a thread by targeted series. Or any arrangement of branches that makes our life easier.

Revision history for this message
Vincent Ladeuil (vila) wrote :

> Small update, in working on the read-reconnect code, I realized passing all
> these arguments around was very clumsy. So I refactored the code into a helper
> class _SmartClientRequest. That way, the 6 arguments being passed around just
> end up as attributes, and the function call interplay is easier to follow.

From the description alone this part got my approval. If some helpers need to move there too, clarifying the class features, even better ( no pre-conceptions here, just saying).

Revision history for this message
John A Meinel (jameinel) wrote :

Some small code cleanups, and add -Dnoretry as a debugging flag that will disable the retry-on-write.

Revision history for this message
Martin Pool (mbp) wrote :

I'm going to (reluctantly) bump these out of the merge queue until this is landed and well tested in 2.5 - probably until after the 2.5.0 final release is out. Then we can come back and look at landing to earlier series.

Revision history for this message
John A Meinel (jameinel) wrote :

-----BEGIN PGP SIGNED MESSAGE-----
Hash: SHA1

On 11/29/2011 7:32 AM, Martin Pool wrote:
> I'm going to (reluctantly) bump these out of the merge queue until
> this is landed and well tested in 2.5 - probably until after the
> 2.5.0 final release is out. Then we can come back and look at
> landing to earlier series.

I don't think it is something you should feel bad about :). That was
certainly the intent.

1) I wrote it against 2.1 so we would have a chance to land it there
   cleanly.

2) I tried to split it up into multiple patches to make it easier to
   review.

3) I merged it up through the stack because there are a fair number of
   changes to this code, and thus conflicts, etc.

4) It should certainly land on trunk and get a thorough shakeout there.
   I'm not sure how much time you think is enough. Certainly if enough
   time passes, 2.1 will be fully obsolete :).

Anyway, thanks for getting this landed in 2.5.

John
=:->

-----BEGIN PGP SIGNATURE-----
Version: GnuPG v1.4.9 (Cygwin)
Comment: Using GnuPG with Mozilla - http://enigmail.mozdev.org/

iEYEARECAAYFAk7UiEcACgkQJdeBCYSNAANSFwCgiwx+EeIpiYLJF84P0NPcLFd5
T90AoJDimVbLmeDiqn1FWFDmdWtMXaWq
=94bN
-----END PGP SIGNATURE-----

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'bzrlib/help_topics/en/debug-flags.txt'
2--- bzrlib/help_topics/en/debug-flags.txt 2010-01-05 04:30:07 +0000
3+++ bzrlib/help_topics/en/debug-flags.txt 2012-09-11 12:31:20 +0000
4@@ -24,6 +24,8 @@
5 -Dindex Trace major index operations.
6 -Dknit Trace knit operations.
7 -Dlock Trace when lockdir locks are taken or released.
8+-Dnoretry If a connection is reset, fail immediately rather than
9+ retrying the request.
10 -Dprogress Trace progress bar operations.
11 -Dmerge Emit information for debugging merges.
12 -Dno_apport Don't use apport to report crashes.
13
14=== modified file 'bzrlib/osutils.py'
15--- bzrlib/osutils.py 2010-05-27 04:00:01 +0000
16+++ bzrlib/osutils.py 2012-09-11 12:31:20 +0000
17@@ -40,6 +40,7 @@
18 rmtree,
19 )
20 import signal
21+import socket
22 import subprocess
23 import tempfile
24 from tempfile import (
25@@ -1929,6 +1930,20 @@
26 return socket.gethostname().decode(get_user_encoding())
27
28
29+# We must not read/write any more than 64k at a time from/to a socket so we
30+# don't risk "no buffer space available" errors on some platforms. Windows in
31+# particular is likely to throw WSAECONNABORTED or WSAENOBUFS if given too much
32+# data at once.
33+MAX_SOCKET_CHUNK = 64 * 1024
34+
35+_end_of_stream_errors = [errno.ECONNRESET, errno.EPIPE, errno.EINVAL]
36+for _eno in ['WSAECONNRESET', 'WSAECONNABORTED']:
37+ _eno = getattr(errno, _eno, None)
38+ if _eno is not None:
39+ _end_of_stream_errors.append(_eno)
40+del _eno
41+
42+
43 def recv_all(socket, bytes):
44 """Receive an exact number of bytes.
45
46@@ -1948,21 +1963,37 @@
47 return b
48
49
50-def send_all(socket, bytes, report_activity=None):
51+def send_all(sock, bytes, report_activity=None):
52 """Send all bytes on a socket.
53
54- Regular socket.sendall() can give socket error 10053 on Windows. This
55- implementation sends no more than 64k at a time, which avoids this problem.
56+ Breaks large blocks in smaller chunks to avoid buffering limitations on
57+ some platforms, and catches EINTR which may be thrown if the send is
58+ interrupted by a signal.
59+
60+ This is preferred to socket.sendall(), because it avoids portability bugs
61+ and provides activity reporting.
62
63 :param report_activity: Call this as bytes are read, see
64 Transport._report_activity
65 """
66- chunk_size = 2**16
67- for pos in xrange(0, len(bytes), chunk_size):
68- block = bytes[pos:pos+chunk_size]
69- if report_activity is not None:
70- report_activity(len(block), 'write')
71- until_no_eintr(socket.sendall, block)
72+ sent_total = 0
73+ byte_count = len(bytes)
74+ while sent_total < byte_count:
75+ try:
76+ sent = sock.send(buffer(bytes, sent_total, MAX_SOCKET_CHUNK))
77+ except (socket.error, IOError), e:
78+ if e.args[0] in _end_of_stream_errors:
79+ raise errors.ConnectionReset(
80+ "Error trying to write to socket", e)
81+ if e.args[0] != errno.EINTR:
82+ raise
83+ else:
84+ if sent == 0:
85+ raise errors.ConnectionReset('Sending to %s returned 0 bytes'
86+ % (sock,))
87+ sent_total += sent
88+ if report_activity is not None:
89+ report_activity(sent, 'write')
90
91
92 def dereference_path(path):
93
94=== modified file 'bzrlib/smart/client.py'
95--- bzrlib/smart/client.py 2010-02-17 17:11:16 +0000
96+++ bzrlib/smart/client.py 2012-09-11 12:31:20 +0000
97@@ -16,10 +16,11 @@
98
99 import bzrlib
100 from bzrlib.smart import message, protocol
101-from bzrlib.trace import warning
102 from bzrlib import (
103+ debug,
104 errors,
105 hooks,
106+ trace,
107 )
108
109
110@@ -39,93 +40,12 @@
111 def __repr__(self):
112 return '%s(%r)' % (self.__class__.__name__, self._medium)
113
114- def _send_request(self, protocol_version, method, args, body=None,
115- readv_body=None, body_stream=None):
116- encoder, response_handler = self._construct_protocol(
117- protocol_version)
118- encoder.set_headers(self._headers)
119- if body is not None:
120- if readv_body is not None:
121- raise AssertionError(
122- "body and readv_body are mutually exclusive.")
123- if body_stream is not None:
124- raise AssertionError(
125- "body and body_stream are mutually exclusive.")
126- encoder.call_with_body_bytes((method, ) + args, body)
127- elif readv_body is not None:
128- if body_stream is not None:
129- raise AssertionError(
130- "readv_body and body_stream are mutually exclusive.")
131- encoder.call_with_body_readv_array((method, ) + args, readv_body)
132- elif body_stream is not None:
133- encoder.call_with_body_stream((method, ) + args, body_stream)
134- else:
135- encoder.call(method, *args)
136- return response_handler
137-
138- def _run_call_hooks(self, method, args, body, readv_body):
139- if not _SmartClient.hooks['call']:
140- return
141- params = CallHookParams(method, args, body, readv_body, self._medium)
142- for hook in _SmartClient.hooks['call']:
143- hook(params)
144-
145 def _call_and_read_response(self, method, args, body=None, readv_body=None,
146 body_stream=None, expect_response_body=True):
147- self._run_call_hooks(method, args, body, readv_body)
148- if self._medium._protocol_version is not None:
149- response_handler = self._send_request(
150- self._medium._protocol_version, method, args, body=body,
151- readv_body=readv_body, body_stream=body_stream)
152- return (response_handler.read_response_tuple(
153- expect_body=expect_response_body),
154- response_handler)
155- else:
156- for protocol_version in [3, 2]:
157- if protocol_version == 2:
158- # If v3 doesn't work, the remote side is older than 1.6.
159- self._medium._remember_remote_is_before((1, 6))
160- response_handler = self._send_request(
161- protocol_version, method, args, body=body,
162- readv_body=readv_body, body_stream=body_stream)
163- try:
164- response_tuple = response_handler.read_response_tuple(
165- expect_body=expect_response_body)
166- except errors.UnexpectedProtocolVersionMarker, err:
167- # TODO: We could recover from this without disconnecting if
168- # we recognise the protocol version.
169- warning(
170- 'Server does not understand Bazaar network protocol %d,'
171- ' reconnecting. (Upgrade the server to avoid this.)'
172- % (protocol_version,))
173- self._medium.disconnect()
174- continue
175- except errors.ErrorFromSmartServer:
176- # If we received an error reply from the server, then it
177- # must be ok with this protocol version.
178- self._medium._protocol_version = protocol_version
179- raise
180- else:
181- self._medium._protocol_version = protocol_version
182- return response_tuple, response_handler
183- raise errors.SmartProtocolError(
184- 'Server is not a Bazaar server: ' + str(err))
185-
186- def _construct_protocol(self, version):
187- request = self._medium.get_request()
188- if version == 3:
189- request_encoder = protocol.ProtocolThreeRequester(request)
190- response_handler = message.ConventionalResponseHandler()
191- response_proto = protocol.ProtocolThreeDecoder(
192- response_handler, expect_version_marker=True)
193- response_handler.setProtoAndMediumRequest(response_proto, request)
194- elif version == 2:
195- request_encoder = protocol.SmartClientRequestProtocolTwo(request)
196- response_handler = request_encoder
197- else:
198- request_encoder = protocol.SmartClientRequestProtocolOne(request)
199- response_handler = request_encoder
200- return request_encoder, response_handler
201+ request = _SmartClientRequest(self, method, args, body=body,
202+ readv_body=readv_body, body_stream=body_stream,
203+ expect_response_body=expect_response_body)
204+ return request.call_and_read_response()
205
206 def call(self, method, *args):
207 """Call a method on the remote server."""
208@@ -191,6 +111,170 @@
209 return self._medium.remote_path_from_transport(transport)
210
211
212+class _SmartClientRequest(object):
213+ """Encapsulate the logic for a single request.
214+
215+ This class handles things like reconnecting and sending the request a
216+ second time when the connection is reset in the middle. It also handles the
217+ multiple requests that get made if we don't know what protocol the server
218+ supports yet.
219+
220+ Generally, you build up one of these objects, passing in the arguments that
221+ you want to send to the server, and then use 'call_and_read_response' to
222+ get the response from the server.
223+ """
224+
225+ def __init__(self, client, method, args, body=None, readv_body=None,
226+ body_stream=None, expect_response_body=True):
227+ self.client = client
228+ self.method = method
229+ self.args = args
230+ self.body = body
231+ self.readv_body = readv_body
232+ self.body_stream = body_stream
233+ self.expect_response_body = expect_response_body
234+
235+ def call_and_read_response(self):
236+ """Send the request to the server, and read the initial response.
237+
238+ This doesn't read all of the body content of the response, instead it
239+ returns (response_tuple, response_handler). response_tuple is the 'ok',
240+ or 'error' information, and 'response_handler' can be used to get the
241+ content stream out.
242+ """
243+ self._run_call_hooks()
244+ protocol_version = self.client._medium._protocol_version
245+ if protocol_version is None:
246+ return self._call_determining_protocol_version()
247+ else:
248+ return self._call(protocol_version)
249+
250+ def _run_call_hooks(self):
251+ if not _SmartClient.hooks['call']:
252+ return
253+ params = CallHookParams(self.method, self.args, self.body,
254+ self.readv_body, self.client._medium)
255+ for hook in _SmartClient.hooks['call']:
256+ hook(params)
257+
258+ def _call(self, protocol_version):
259+ """We know the protocol version.
260+
261+ So this just sends the request, and then reads the response. This is
262+ where the code will be to retry requests if the connection is closed.
263+ """
264+ response_handler = self._send(protocol_version)
265+ response_tuple = response_handler.read_response_tuple(
266+ expect_body=self.expect_response_body)
267+ return (response_tuple, response_handler)
268+
269+ def _call_determining_protocol_version(self):
270+ """Determine what protocol the remote server supports.
271+
272+ We do this by placing a request in the most recent protocol, and
273+ handling the UnexpectedProtocolVersionMarker from the server.
274+ """
275+ for protocol_version in [3, 2]:
276+ if protocol_version == 2:
277+ # If v3 doesn't work, the remote side is older than 1.6.
278+ self.client._medium._remember_remote_is_before((1, 6))
279+ try:
280+ response_tuple, response_handler = self._call(protocol_version)
281+ except errors.UnexpectedProtocolVersionMarker, err:
282+ # TODO: We could recover from this without disconnecting if
283+ # we recognise the protocol version.
284+ trace.warning(
285+ 'Server does not understand Bazaar network protocol %d,'
286+ ' reconnecting. (Upgrade the server to avoid this.)'
287+ % (protocol_version,))
288+ self.client._medium.disconnect()
289+ continue
290+ except errors.ErrorFromSmartServer:
291+ # If we received an error reply from the server, then it
292+ # must be ok with this protocol version.
293+ self.client._medium._protocol_version = protocol_version
294+ raise
295+ else:
296+ self.client._medium._protocol_version = protocol_version
297+ return response_tuple, response_handler
298+ raise errors.SmartProtocolError(
299+ 'Server is not a Bazaar server: ' + str(err))
300+
301+ def _construct_protocol(self, version):
302+ """Build the encoding stack for a given protocol version."""
303+ request = self.client._medium.get_request()
304+ if version == 3:
305+ request_encoder = protocol.ProtocolThreeRequester(request)
306+ response_handler = message.ConventionalResponseHandler()
307+ response_proto = protocol.ProtocolThreeDecoder(
308+ response_handler, expect_version_marker=True)
309+ response_handler.setProtoAndMediumRequest(response_proto, request)
310+ elif version == 2:
311+ request_encoder = protocol.SmartClientRequestProtocolTwo(request)
312+ response_handler = request_encoder
313+ else:
314+ request_encoder = protocol.SmartClientRequestProtocolOne(request)
315+ response_handler = request_encoder
316+ return request_encoder, response_handler
317+
318+ def _send(self, protocol_version):
319+ """Encode the request, and send it to the server.
320+
321+ This will retry a request if we get a ConnectionReset while sending the
322+ request to the server. (Unless we have a body_stream that we have
323+ already started consuming, since we can't restart body_streams)
324+
325+ :return: response_handler as defined by _construct_protocol
326+ """
327+ encoder, response_handler = self._construct_protocol(protocol_version)
328+ try:
329+ self._send_no_retry(encoder)
330+ except errors.ConnectionReset, e:
331+ # If we fail during the _send_no_retry phase, then we can
332+ # be confident that the server did not get our request, because we
333+ # haven't started waiting for the reply yet. So try the request
334+ # again. We only issue a single retry, because if the connection
335+ # really is down, there is no reason to loop endlessly.
336+
337+ # Connection is dead, so close our end of it.
338+ self.client._medium.reset()
339+ if (('noretry' in debug.debug_flags)
340+ or self.body_stream is not None):
341+ # We can't restart a body_stream that has been partially
342+ # consumed, so we don't retry.
343+ raise
344+ trace.warning('ConnectionReset calling %r, retrying'
345+ % (self.method,))
346+ trace.log_exception_quietly()
347+ encoder, response_handler = self._construct_protocol(
348+ protocol_version)
349+ self._send_no_retry(encoder)
350+ return response_handler
351+
352+ def _send_no_retry(self, encoder):
353+ """Just encode the request and try to send it."""
354+ encoder.set_headers(self.client._headers)
355+ if self.body is not None:
356+ if self.readv_body is not None:
357+ raise AssertionError(
358+ "body and readv_body are mutually exclusive.")
359+ if self.body_stream is not None:
360+ raise AssertionError(
361+ "body and body_stream are mutually exclusive.")
362+ encoder.call_with_body_bytes((self.method, ) + self.args, self.body)
363+ elif self.readv_body is not None:
364+ if self.body_stream is not None:
365+ raise AssertionError(
366+ "readv_body and body_stream are mutually exclusive.")
367+ encoder.call_with_body_readv_array((self.method, ) + self.args,
368+ self.readv_body)
369+ elif self.body_stream is not None:
370+ encoder.call_with_body_stream((self.method, ) + self.args,
371+ self.body_stream)
372+ else:
373+ encoder.call(self.method, *self.args)
374+
375+
376 class SmartClientHooks(hooks.Hooks):
377
378 def __init__(self):
379
380=== modified file 'bzrlib/smart/medium.py'
381--- bzrlib/smart/medium.py 2010-04-27 06:40:37 +0000
382+++ bzrlib/smart/medium.py 2012-09-11 12:31:20 +0000
383@@ -712,6 +712,14 @@
384 """
385 return SmartClientStreamMediumRequest(self)
386
387+ def reset(self):
388+ """We have been disconnected, reset current state.
389+
390+ This resets things like _current_request and connected state.
391+ """
392+ self.disconnect()
393+ self._current_request = None
394+
395
396 class SmartSimplePipesClientMedium(SmartClientStreamMedium):
397 """A client medium using simple pipes.
398@@ -726,11 +734,21 @@
399
400 def _accept_bytes(self, bytes):
401 """See SmartClientStreamMedium.accept_bytes."""
402- osutils.until_no_eintr(self._writeable_pipe.write, bytes)
403+ try:
404+ osutils.until_no_eintr(self._writeable_pipe.write, bytes)
405+ except IOError, e:
406+ if e.errno in (errno.EINVAL, errno.EPIPE):
407+ raise errors.ConnectionReset(
408+ "Error trying to write to subprocess:\n%s"
409+ % (e,))
410+ raise
411 self._report_activity(len(bytes), 'write')
412
413 def _flush(self):
414 """See SmartClientStreamMedium._flush()."""
415+ # Note: If flush were to fail, we'd like to raise ConnectionReset, etc.
416+ # However, testing shows that even when the child process is
417+ # gone, this doesn't error.
418 osutils.until_no_eintr(self._writeable_pipe.flush)
419
420 def _read_bytes(self, count):
421@@ -741,7 +759,10 @@
422
423
424 class SmartSSHClientMedium(SmartClientStreamMedium):
425- """A client medium using SSH."""
426+ """A client medium using SSH.
427+
428+ It delegates IO to a SmartSimplePipesClientMedium.
429+ """
430
431 def __init__(self, host, port=None, username=None, password=None,
432 base=None, vendor=None, bzr_remote_path=None):
433@@ -750,11 +771,11 @@
434 :param vendor: An optional override for the ssh vendor to use. See
435 bzrlib.transport.ssh for details on ssh vendors.
436 """
437- self._connected = False
438 self._host = host
439 self._password = password
440 self._port = port
441 self._username = username
442+ self._real_medium = None
443 # for the benefit of progress making a short description of this
444 # transport
445 self._scheme = 'bzr+ssh'
446@@ -762,10 +783,8 @@
447 # _DebugCounter so we have to store all the values used in our repr
448 # method before calling the super init.
449 SmartClientStreamMedium.__init__(self, base)
450- self._read_from = None
451 self._ssh_connection = None
452 self._vendor = vendor
453- self._write_to = None
454 self._bzr_remote_path = bzr_remote_path
455
456 def __repr__(self):
457@@ -783,21 +802,20 @@
458 def _accept_bytes(self, bytes):
459 """See SmartClientStreamMedium.accept_bytes."""
460 self._ensure_connection()
461- osutils.until_no_eintr(self._write_to.write, bytes)
462- self._report_activity(len(bytes), 'write')
463+ self._real_medium.accept_bytes(bytes)
464
465 def disconnect(self):
466 """See SmartClientMedium.disconnect()."""
467- if not self._connected:
468- return
469- osutils.until_no_eintr(self._read_from.close)
470- osutils.until_no_eintr(self._write_to.close)
471- self._ssh_connection.close()
472- self._connected = False
473+ if self._real_medium is not None:
474+ self._real_medium.disconnect()
475+ self._real_medium = None
476+ if self._ssh_connection is not None:
477+ self._ssh_connection.close()
478+ self._ssh_connection = None
479
480 def _ensure_connection(self):
481 """Connect this medium if not already connected."""
482- if self._connected:
483+ if self._real_medium is not None:
484 return
485 if self._vendor is None:
486 vendor = ssh._get_ssh_vendor()
487@@ -807,22 +825,19 @@
488 self._password, self._host, self._port,
489 command=[self._bzr_remote_path, 'serve', '--inet',
490 '--directory=/', '--allow-writes'])
491- self._read_from, self._write_to = \
492- self._ssh_connection.get_filelike_channels()
493- self._connected = True
494+ read_from, write_to = self._ssh_connection.get_filelike_channels()
495+ self._real_medium = SmartSimplePipesClientMedium(
496+ read_from, write_to, self.base)
497
498 def _flush(self):
499 """See SmartClientStreamMedium._flush()."""
500- self._write_to.flush()
501+ self._real_medium._flush()
502
503 def _read_bytes(self, count):
504 """See SmartClientStreamMedium.read_bytes."""
505- if not self._connected:
506+ if self._real_medium is None:
507 raise errors.MediumNotConnected(self)
508- bytes_to_read = min(count, _MAX_READ_SIZE)
509- bytes = osutils.until_no_eintr(self._read_from.read, bytes_to_read)
510- self._report_activity(len(bytes), 'read')
511- return bytes
512+ return self._real_medium.read_bytes(count)
513
514
515 # Port 4155 is the default port for bzr://, registered with IANA.
516@@ -948,13 +963,17 @@
517 self._medium._flush()
518
519
520+WSAECONNABORTED = 10053
521+WSAECONNRESET = 10054
522+
523 def _read_bytes_from_socket(sock, desired_count, report_activity):
524 # We ignore the desired_count because on sockets it's more efficient to
525 # read large chunks (of _MAX_READ_SIZE bytes) at a time.
526 try:
527 bytes = osutils.until_no_eintr(sock, _MAX_READ_SIZE)
528 except socket.error, e:
529- if len(e.args) and e.args[0] in (errno.ECONNRESET, 10054):
530+ if len(e.args) and e.args[0] in (errno.ECONNRESET, WSAECONNABORTED,
531+ WSAECONNRESET):
532 # The connection was closed by the other side. Callers expect an
533 # empty string to signal end-of-stream.
534 bytes = ''
535
536=== modified file 'bzrlib/smart/protocol.py'
537--- bzrlib/smart/protocol.py 2010-02-17 17:11:16 +0000
538+++ bzrlib/smart/protocol.py 2012-09-11 12:31:20 +0000
539@@ -1075,9 +1075,6 @@
540 self._real_write_func = write_func
541
542 def _write_func(self, bytes):
543- # TODO: It is probably more appropriate to use sum(map(len, _buf))
544- # for total number of bytes to write, rather than buffer based on
545- # the number of write() calls
546 # TODO: Another possibility would be to turn this into an async model.
547 # Where we let another thread know that we have some bytes if
548 # they want it, but we don't actually block for it
549
550=== modified file 'bzrlib/tests/test_osutils.py'
551--- bzrlib/tests/test_osutils.py 2010-11-30 20:42:42 +0000
552+++ bzrlib/tests/test_osutils.py 2012-09-11 12:31:20 +0000
553@@ -801,6 +801,45 @@
554 self.assertEqual(None, osutils.safe_file_id(None))
555
556
557+class TestSendAll(tests.TestCase):
558+
559+ def test_send_with_disconnected_socket(self):
560+ class DisconnectedSocket(object):
561+ def __init__(self, err):
562+ self.err = err
563+ def send(self, content):
564+ raise self.err
565+ def close(self):
566+ pass
567+ # All of these should be treated as ConnectionReset
568+ errs = []
569+ for err_cls in (IOError, socket.error):
570+ for errnum in osutils._end_of_stream_errors:
571+ errs.append(err_cls(errnum))
572+ for err in errs:
573+ sock = DisconnectedSocket(err)
574+ self.assertRaises(errors.ConnectionReset,
575+ osutils.send_all, sock, 'some more content')
576+
577+ def test_send_with_no_progress(self):
578+ # See https://bugs.launchpad.net/bzr/+bug/1047309
579+ # It seems that paramiko can get into a state where it doesn't error,
580+ # but it returns 0 bytes sent for requests over and over again.
581+ class NoSendingSocket(object):
582+ def __init__(self):
583+ self.call_count = 0
584+ def send(self, bytes):
585+ self.call_count += 1
586+ if self.call_count > 100:
587+ # Prevent the test suite from hanging
588+ raise RuntimeError('too many calls')
589+ return 0
590+ sock = NoSendingSocket()
591+ self.assertRaises(errors.ConnectionReset,
592+ osutils.send_all, sock, 'content')
593+ self.assertEqual(1, sock.call_count)
594+
595+
596 class TestWin32Funcs(tests.TestCase):
597 """Test that _win32 versions of os utilities return appropriate paths."""
598
599
600=== modified file 'bzrlib/tests/test_smart_transport.py'
601--- bzrlib/tests/test_smart_transport.py 2010-02-17 17:11:16 +0000
602+++ bzrlib/tests/test_smart_transport.py 2012-09-11 12:31:20 +0000
603@@ -18,13 +18,17 @@
604
605 # all of this deals with byte strings so this is safe
606 from cStringIO import StringIO
607+import errno
608 import os
609 import socket
610+import subprocess
611+import sys
612 import threading
613
614 import bzrlib
615 from bzrlib import (
616 bzrdir,
617+ debug,
618 errors,
619 osutils,
620 tests,
621@@ -49,6 +53,29 @@
622 from bzrlib.transport.http import SmartClientHTTPMediumRequest
623
624
625+def create_file_pipes():
626+ r, w = os.pipe()
627+ # These must be opened without buffering, or we get undefined results
628+ rf = os.fdopen(r, 'rb', 0)
629+ wf = os.fdopen(w, 'wb', 0)
630+ return rf, wf
631+
632+
633+def portable_socket_pair():
634+ """Return a pair of TCP sockets connected to each other.
635+
636+ Unlike socket.socketpair, this should work on Windows.
637+ """
638+ listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
639+ listen_sock.bind(('127.0.0.1', 0))
640+ listen_sock.listen(1)
641+ client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
642+ client_sock.connect(listen_sock.getsockname())
643+ server_sock, addr = listen_sock.accept()
644+ listen_sock.close()
645+ return server_sock, client_sock
646+
647+
648 class StringIOSSHVendor(object):
649 """A SSH vendor that uses StringIO to buffer writes and answer reads."""
650
651@@ -63,6 +90,27 @@
652 return StringIOSSHConnection(self)
653
654
655+class FirstRejectedStringIOSSHVendor(StringIOSSHVendor):
656+ """The first connection will be considered closed.
657+
658+ The second connection will succeed normally.
659+ """
660+
661+ def __init__(self, read_from, write_to, fail_at_write=True):
662+ super(FirstRejectedStringIOSSHVendor, self).__init__(read_from,
663+ write_to)
664+ self.fail_at_write = fail_at_write
665+ self._first = True
666+
667+ def connect_ssh(self, username, password, host, port, command):
668+ self.calls.append(('connect_ssh', username, password, host, port,
669+ command))
670+ if self._first:
671+ self._first = False
672+ return ClosedSSHConnection(self)
673+ return StringIOSSHConnection(self)
674+
675+
676 class StringIOSSHConnection(object):
677 """A SSH connection that uses StringIO to buffer writes and answer reads."""
678
679@@ -71,11 +119,36 @@
680
681 def close(self):
682 self.vendor.calls.append(('close', ))
683+ self.vendor.read_from.close()
684+ self.vendor.write_to.close()
685
686 def get_filelike_channels(self):
687 return self.vendor.read_from, self.vendor.write_to
688
689
690+class ClosedSSHConnection(object):
691+ """An SSH connection that just has closed channels."""
692+
693+ def __init__(self, vendor):
694+ self.vendor = vendor
695+
696+ def close(self):
697+ self.vendor.calls.append(('close', ))
698+
699+ def get_filelike_channels(self):
700+ # We create matching pipes, and then close the ssh side
701+ bzr_read, ssh_write = create_file_pipes()
702+ # We always fail when bzr goes to read
703+ ssh_write.close()
704+ if self.vendor.fail_at_write:
705+ # If set, we'll also fail when bzr goes to write
706+ ssh_read, bzr_write = create_file_pipes()
707+ ssh_read.close()
708+ else:
709+ bzr_write = self.vendor.write_to
710+ return bzr_read, bzr_write
711+
712+
713 class _InvalidHostnameFeature(tests.Feature):
714 """Does 'non_existent.invalid' fail to resolve?
715
716@@ -171,6 +244,91 @@
717 client_medium._accept_bytes('abc')
718 self.assertEqual('abc', output.getvalue())
719
720+ def test_simple_pipes__accept_bytes_subprocess_closed(self):
721+ # It is unfortunate that we have to use Popen for this. However,
722+ # os.pipe() does not behave the same as subprocess.Popen().
723+ # On Windows, if you use os.pipe() and close the write side,
724+ # read.read() hangs. On Linux, read.read() returns the empty string.
725+ p = subprocess.Popen([sys.executable, '-c',
726+ 'import sys\n'
727+ 'sys.stdout.write(sys.stdin.read(4))\n'
728+ 'sys.stdout.close()\n'],
729+ stdout=subprocess.PIPE, stdin=subprocess.PIPE)
730+ client_medium = medium.SmartSimplePipesClientMedium(
731+ p.stdout, p.stdin, 'base')
732+ client_medium._accept_bytes('abc\n')
733+ self.assertEqual('abc', client_medium._read_bytes(3))
734+ p.wait()
735+ # While writing to the underlying pipe,
736+ # Windows py2.6.6 we get IOError(EINVAL)
737+ # Lucid py2.6.5, we get IOError(EPIPE)
738+ # In both cases, it should be wrapped to ConnectionReset
739+ self.assertRaises(errors.ConnectionReset,
740+ client_medium._accept_bytes, 'more')
741+
742+ def test_simple_pipes__accept_bytes_pipe_closed(self):
743+ child_read, client_write = create_file_pipes()
744+ client_medium = medium.SmartSimplePipesClientMedium(
745+ None, client_write, 'base')
746+ client_medium._accept_bytes('abc\n')
747+ self.assertEqual('abc\n', child_read.read(4))
748+ # While writing to the underlying pipe,
749+ # Windows py2.6.6 we get IOError(EINVAL)
750+ # Lucid py2.6.5, we get IOError(EPIPE)
751+ # In both cases, it should be wrapped to ConnectionReset
752+ child_read.close()
753+ self.assertRaises(errors.ConnectionReset,
754+ client_medium._accept_bytes, 'more')
755+
756+ def test_simple_pipes__flush_pipe_closed(self):
757+ child_read, client_write = create_file_pipes()
758+ client_medium = medium.SmartSimplePipesClientMedium(
759+ None, client_write, 'base')
760+ client_medium._accept_bytes('abc\n')
761+ child_read.close()
762+ # Even though the pipe is closed, flush on the write side seems to be a
763+ # no-op, rather than a failure.
764+ client_medium._flush()
765+
766+ def test_simple_pipes__flush_subprocess_closed(self):
767+ p = subprocess.Popen([sys.executable, '-c',
768+ 'import sys\n'
769+ 'sys.stdout.write(sys.stdin.read(4))\n'
770+ 'sys.stdout.close()\n'],
771+ stdout=subprocess.PIPE, stdin=subprocess.PIPE)
772+ client_medium = medium.SmartSimplePipesClientMedium(
773+ p.stdout, p.stdin, 'base')
774+ client_medium._accept_bytes('abc\n')
775+ p.wait()
776+ # Even though the child process is dead, flush seems to be a no-op.
777+ client_medium._flush()
778+
779+ def test_simple_pipes__read_bytes_pipe_closed(self):
780+ child_read, client_write = create_file_pipes()
781+ client_medium = medium.SmartSimplePipesClientMedium(
782+ child_read, client_write, 'base')
783+ client_medium._accept_bytes('abc\n')
784+ client_write.close()
785+ self.assertEqual('abc\n', client_medium._read_bytes(4))
786+ self.assertEqual('', client_medium._read_bytes(4))
787+
788+ def test_simple_pipes__read_bytes_subprocess_closed(self):
789+ p = subprocess.Popen([sys.executable, '-c',
790+ 'import sys\n'
791+ 'if sys.platform == "win32":\n'
792+ ' import msvcrt, os\n'
793+ ' msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)\n'
794+ ' msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY)\n'
795+ 'sys.stdout.write(sys.stdin.read(4))\n'
796+ 'sys.stdout.close()\n'],
797+ stdout=subprocess.PIPE, stdin=subprocess.PIPE)
798+ client_medium = medium.SmartSimplePipesClientMedium(
799+ p.stdout, p.stdin, 'base')
800+ client_medium._accept_bytes('abc\n')
801+ p.wait()
802+ self.assertEqual('abc\n', client_medium._read_bytes(4))
803+ self.assertEqual('', client_medium._read_bytes(4))
804+
805 def test_simple_pipes_client_disconnect_does_nothing(self):
806 # calling disconnect does nothing.
807 input = StringIO()
808@@ -556,6 +714,28 @@
809 request.finished_reading()
810 self.assertRaises(errors.ReadingCompleted, request.read_bytes, None)
811
812+ def test_reset(self):
813+ server_sock, client_sock = portable_socket_pair()
814+ # TODO: Use SmartClientAlreadyConnectedSocketMedium for the versions of
815+ # bzr where it exists.
816+ client_medium = medium.SmartTCPClientMedium(None, None, None)
817+ client_medium._socket = client_sock
818+ client_medium._connected = True
819+ req = client_medium.get_request()
820+ self.assertRaises(errors.TooManyConcurrentRequests,
821+ client_medium.get_request)
822+ client_medium.reset()
823+ # The stream should be reset, marked as disconnected, though ready for
824+ # us to make a new request
825+ self.assertFalse(client_medium._connected)
826+ self.assertIs(None, client_medium._socket)
827+ try:
828+ self.assertEqual('', client_sock.recv(1))
829+ except socket.error, e:
830+ if e.errno not in (errno.EBADF,):
831+ raise
832+ req = client_medium.get_request()
833+
834
835 class RemoteTransportTests(TestCaseWithSmartMedium):
836
837@@ -609,20 +789,6 @@
838 super(TestSmartServerStreamMedium, self).setUp()
839 self._captureVar('BZR_NO_SMART_VFS', None)
840
841- def portable_socket_pair(self):
842- """Return a pair of TCP sockets connected to each other.
843-
844- Unlike socket.socketpair, this should work on Windows.
845- """
846- listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
847- listen_sock.bind(('127.0.0.1', 0))
848- listen_sock.listen(1)
849- client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
850- client_sock.connect(listen_sock.getsockname())
851- server_sock, addr = listen_sock.accept()
852- listen_sock.close()
853- return server_sock, client_sock
854-
855 def test_smart_query_version(self):
856 """Feed a canned query version to a server"""
857 # wire-to-wire, using the whole stack
858@@ -687,7 +853,7 @@
859
860 def test_socket_stream_with_bulk_data(self):
861 sample_request_bytes = 'command\n9\nbulk datadone\n'
862- server_sock, client_sock = self.portable_socket_pair()
863+ server_sock, client_sock = portable_socket_pair()
864 server = medium.SmartServerSocketStreamMedium(
865 server_sock, None)
866 sample_protocol = SampleRequest(expected_bytes=sample_request_bytes)
867@@ -706,7 +872,7 @@
868 self.assertTrue(server.finished)
869
870 def test_socket_stream_shutdown_detection(self):
871- server_sock, client_sock = self.portable_socket_pair()
872+ server_sock, client_sock = portable_socket_pair()
873 client_sock.close()
874 server = medium.SmartServerSocketStreamMedium(
875 server_sock, None)
876@@ -726,7 +892,7 @@
877 rest_of_request_bytes = 'lo\n'
878 expected_response = (
879 protocol.RESPONSE_VERSION_TWO + 'success\nok\x012\n')
880- server_sock, client_sock = self.portable_socket_pair()
881+ server_sock, client_sock = portable_socket_pair()
882 server = medium.SmartServerSocketStreamMedium(
883 server_sock, None)
884 client_sock.sendall(incomplete_request_bytes)
885@@ -802,7 +968,7 @@
886 # _serve_one_request should still process both of them as if they had
887 # been received separately.
888 sample_request_bytes = 'command\n'
889- server_sock, client_sock = self.portable_socket_pair()
890+ server_sock, client_sock = portable_socket_pair()
891 server = medium.SmartServerSocketStreamMedium(
892 server_sock, None)
893 first_protocol = SampleRequest(expected_bytes=sample_request_bytes)
894@@ -839,7 +1005,7 @@
895 self.assertTrue(server.finished)
896
897 def test_socket_stream_error_handling(self):
898- server_sock, client_sock = self.portable_socket_pair()
899+ server_sock, client_sock = portable_socket_pair()
900 server = medium.SmartServerSocketStreamMedium(
901 server_sock, None)
902 fake_protocol = ErrorRaisingProtocol(Exception('boom'))
903@@ -860,7 +1026,7 @@
904 self.assertEqual('', from_server.getvalue())
905
906 def test_socket_stream_keyboard_interrupt_handling(self):
907- server_sock, client_sock = self.portable_socket_pair()
908+ server_sock, client_sock = portable_socket_pair()
909 server = medium.SmartServerSocketStreamMedium(
910 server_sock, None)
911 fake_protocol = ErrorRaisingProtocol(KeyboardInterrupt('boom'))
912@@ -877,7 +1043,7 @@
913 return server._build_protocol()
914
915 def build_protocol_socket(self, bytes):
916- server_sock, client_sock = self.portable_socket_pair()
917+ server_sock, client_sock = portable_socket_pair()
918 server = medium.SmartServerSocketStreamMedium(
919 server_sock, None)
920 client_sock.sendall(bytes)
921@@ -3214,6 +3380,114 @@
922 # encoder.
923
924
925+class Test_SmartClientRequest(tests.TestCase):
926+
927+ def make_client_with_failing_medium(self, fail_at_write=True):
928+ response = StringIO()
929+ output = StringIO()
930+ vendor = FirstRejectedStringIOSSHVendor(response, output,
931+ fail_at_write=fail_at_write)
932+ client_medium = medium.SmartSSHClientMedium(
933+ 'a host', 'a port', 'a user', 'a pass', 'base', vendor,
934+ 'bzr')
935+ smart_client = client._SmartClient(client_medium, headers={})
936+ return output, vendor, smart_client
937+
938+ def test__send_no_retry_pipes(self):
939+ client_read, server_write = create_file_pipes()
940+ server_read, client_write = create_file_pipes()
941+ client_medium = medium.SmartSimplePipesClientMedium(client_read,
942+ client_write, base='/')
943+ smart_client = client._SmartClient(client_medium)
944+ smart_request = client._SmartClientRequest(smart_client,
945+ 'hello', ())
946+ # Close the server side
947+ server_read.close()
948+ encoder, response_handler = smart_request._construct_protocol(3)
949+ self.assertRaises(errors.ConnectionReset,
950+ smart_request._send_no_retry, encoder)
951+
952+ def test__send_read_response_sockets(self):
953+ listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
954+ listen_sock.bind(('127.0.0.1', 0))
955+ listen_sock.listen(1)
956+ host, port = listen_sock.getsockname()
957+ client_medium = medium.SmartTCPClientMedium(host, port, '/')
958+ client_medium._ensure_connection()
959+ smart_client = client._SmartClient(client_medium)
960+ smart_request = client._SmartClientRequest(smart_client, 'hello', ())
961+ # Accept the connection, but don't actually talk to the client.
962+ server_sock, _ = listen_sock.accept()
963+ server_sock.close()
964+ # Sockets buffer and don't really notice that the server has closed the
965+ # connection until we try to read again.
966+ handler = smart_request._send(3)
967+ self.assertRaises(errors.ConnectionReset,
968+ handler.read_response_tuple, expect_body=False)
969+
970+ def test__send_retries_on_write(self):
971+ output, vendor, smart_client = self.make_client_with_failing_medium()
972+ smart_request = client._SmartClientRequest(smart_client, 'hello', ())
973+ handler = smart_request._send(3)
974+ self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol
975+ '\x00\x00\x00\x02de' # empty headers
976+ 's\x00\x00\x00\tl5:helloee',
977+ output.getvalue())
978+ self.assertEqual(
979+ [('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
980+ ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
981+ ('close',),
982+ ('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
983+ ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
984+ ],
985+ vendor.calls)
986+
987+ def test__send_doesnt_retry_read_failure(self):
988+ output, vendor, smart_client = self.make_client_with_failing_medium(
989+ fail_at_write=False)
990+ smart_request = client._SmartClientRequest(smart_client, 'hello', ())
991+ handler = smart_request._send(3)
992+ self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol
993+ '\x00\x00\x00\x02de' # empty headers
994+ 's\x00\x00\x00\tl5:helloee',
995+ output.getvalue())
996+ self.assertEqual(
997+ [('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
998+ ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
999+ ],
1000+ vendor.calls)
1001+ self.assertRaises(errors.ConnectionReset, handler.read_response_tuple)
1002+
1003+ def test__send_doesnt_retry_body_stream(self):
1004+ # We don't know how much of body_stream would get iterated as part of
1005+ # _send before it failed to actually send the request, so we
1006+ # just always fail in this condition.
1007+ output, vendor, smart_client = self.make_client_with_failing_medium()
1008+ smart_request = client._SmartClientRequest(smart_client, 'hello', (),
1009+ body_stream=['a', 'b'])
1010+ self.assertRaises(errors.ConnectionReset, smart_request._send, 3)
1011+ # We got one connect, but it fails, so we disconnect, but we don't
1012+ # retry it
1013+ self.assertEqual(
1014+ [('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
1015+ ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
1016+ ('close',),
1017+ ],
1018+ vendor.calls)
1019+
1020+ def test__send_disabled_retry(self):
1021+ debug.debug_flags.add('noretry')
1022+ output, vendor, smart_client = self.make_client_with_failing_medium()
1023+ smart_request = client._SmartClientRequest(smart_client, 'hello', ())
1024+ self.assertRaises(errors.ConnectionReset, smart_request._send, 3)
1025+ self.assertEqual(
1026+ [('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
1027+ ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
1028+ ('close',),
1029+ ],
1030+ vendor.calls)
1031+
1032+
1033 class LengthPrefixedBodyDecoder(tests.TestCase):
1034
1035 # XXX: TODO: make accept_reading_trailer invoke translate_response or

Subscribers

People subscribed via source and target branches