Merge lp:~jameinel/bzr/2.1-all-reconnect-819604 into lp:bzr/2.1

Proposed by John A Meinel
Status: Rejected
Rejected by: John A Meinel
Proposed branch: lp:~jameinel/bzr/2.1-all-reconnect-819604
Merge into: lp:bzr/2.1
Diff against target: 1807 lines (+970/-289)
11 files modified
NEWS (+5/-0)
bzrlib/help_topics/en/debug-flags.txt (+2/-0)
bzrlib/osutils.py (+71/-13)
bzrlib/smart/client.py (+208/-86)
bzrlib/smart/medium.py (+69/-62)
bzrlib/smart/protocol.py (+15/-6)
bzrlib/smart/request.py (+142/-98)
bzrlib/tests/test_bundle.py (+6/-3)
bzrlib/tests/test_osutils.py (+39/-0)
bzrlib/tests/test_smart_request.py (+10/-0)
bzrlib/tests/test_smart_transport.py (+403/-21)
To merge this branch: bzr merge lp:~jameinel/bzr/2.1-all-reconnect-819604
Reviewer: Richard Wilbur
Review status: Approve
Review via email: mp+123901@code.launchpad.net

Commit message

Backport the client reconnect-on-disconnect code to bzr-2.1 (bug #819604)

Description of the change

This is a rollup of all of my client-reconnect patches against bzr-2.1, which is the version in our previous LTS, Lucid.

The patch ends up pretty big, since it combines what I had originally split up into a few patches for review. However, most of this is identical to what we have in bzr-2.5.

The specific differences from 2.5 at this point are:

1) No ConnectionTimeout code. This was meant for the server side, not the client. So no need to bring that code in now.
2) No AlreadyConnected client medium. This was designed around communicating with the SSH subprocess using socketpair. However, I didn't want to bring in all of that code (and it was even a source of one of the bugs we had to fix in 2.5).
3) 2.5 has a bunch more verbs that we added to bzrlib/smart/request.py, but all the requests that exist in 2.1 have identical signatures in this code.

This is where I targeted my original changes, so I went back to this level to finish the job.

I'm reasonably comfortable with this patch at this point. I wish it were a bit smaller for a stable series, but the code is quite well tested on newer versions.

Richard Wilbur (richard-wilbur) wrote :

Looks like a good collection of updates for the LTS 2.1! Thanks for porting the goodness back from 2.5 and fixing the test that broke.
+1

A question regarding the comment in bzrlib/smart/protocol.py:_encode_tuple():
Is the Unicode response issue really an aberrant case? If filesystem names can appear in the "args" list of _encode_tuple, it seems that having Unicode entries would be a matter of course--especially in locales other than "C".

review: Approve
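
The concern in the question above can be made concrete with a small sketch. It is written for Python 3, whereas bzr 2.1 is Python 2 code, and `encode_tuple` and `try_encode` are hypothetical stand-ins, not bzrlib functions. The sketch only mirrors the fallback path in the patched `_encode_tuple`, which encodes the joined value as ASCII when it turns out to be unicode. Under that assumption, an ASCII-only argument list passes through, while a non-ASCII filename raises UnicodeEncodeError instead of being sent.

```python
def encode_tuple(args):
    """Join text args with \\x01, end with a newline, and encode as ASCII.

    Hypothetical Python 3 analogue of the fallback path in _encode_tuple.
    """
    joined = '\x01'.join(args) + '\n'
    return joined.encode('ascii')


def try_encode(args):
    """Return the encoded bytes, or None if an argument is not pure ASCII."""
    try:
        return encode_tuple(args)
    except UnicodeEncodeError:
        return None


# An ASCII-only request encodes cleanly.
ascii_result = try_encode(['get', 'README.txt'])

# A filename with a non-ASCII character cannot be encoded as ASCII.
non_ascii_result = try_encode(['get', 'caf\u00e9.txt'])
```

If filesystem names can reach this function as unicode, the ASCII fallback only helps for names that were ASCII to begin with, which is the case the question is probing.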
John A Meinel (jameinel) wrote :

sent to pqm by email

John A Meinel (jameinel) wrote :

This has sat for so long that I don't think its value has held. It doesn't land directly (some test suite failures, etc.). And the patch is much bigger than the ones for later versions of bzr because of the internal restructuring that has happened since 2.1. So I'm just rejecting this to get it out of the queue.

Preview Diff

1=== modified file 'NEWS'
2--- NEWS 2012-02-02 14:08:45 +0000
3+++ NEWS 2012-09-12 09:29:20 +0000
4@@ -43,6 +43,11 @@
5
6 (John Arbash Meinel, #609187, #812928)
7
8+* Teach the bzr client how to reconnect if we get ``ConnectionReset``
9+ while making an RPC request. This doesn't handle all possible network
10+ disconnects, but it should at least handle when the server is asked to
11+ shutdown gracefully. (John Arbash Meinel, #819604)
12+
13
14 Improvements
15 ************
16
17=== modified file 'bzrlib/help_topics/en/debug-flags.txt'
18--- bzrlib/help_topics/en/debug-flags.txt 2010-01-05 04:30:07 +0000
19+++ bzrlib/help_topics/en/debug-flags.txt 2012-09-12 09:29:20 +0000
20@@ -24,6 +24,8 @@
21 -Dindex Trace major index operations.
22 -Dknit Trace knit operations.
23 -Dlock Trace when lockdir locks are taken or released.
24+-Dnoretry If a connection is reset, fail immediately rather than
25+ retrying the request.
26 -Dprogress Trace progress bar operations.
27 -Dmerge Emit information for debugging merges.
28 -Dno_apport Don't use apport to report crashes.
29
30=== modified file 'bzrlib/osutils.py'
31--- bzrlib/osutils.py 2010-05-27 04:00:01 +0000
32+++ bzrlib/osutils.py 2012-09-12 09:29:20 +0000
33@@ -40,6 +40,7 @@
34 rmtree,
35 )
36 import signal
37+import socket
38 import subprocess
39 import tempfile
40 from tempfile import (
41@@ -1929,40 +1930,97 @@
42 return socket.gethostname().decode(get_user_encoding())
43
44
45-def recv_all(socket, bytes):
46+# We must not read/write any more than 64k at a time from/to a socket so we
47+# don't risk "no buffer space available" errors on some platforms. Windows in
48+# particular is likely to throw WSAECONNABORTED or WSAENOBUFS if given too much
49+# data at once.
50+MAX_SOCKET_CHUNK = 64 * 1024
51+
52+_end_of_stream_errors = [errno.ECONNRESET, errno.EPIPE, errno.EINVAL]
53+for _eno in ['WSAECONNRESET', 'WSAECONNABORTED']:
54+ _eno = getattr(errno, _eno, None)
55+ if _eno is not None:
56+ _end_of_stream_errors.append(_eno)
57+del _eno
58+
59+
60+def read_bytes_from_socket(sock, report_activity=None,
61+ max_read_size=MAX_SOCKET_CHUNK):
62+ """Read up to max_read_size of bytes from sock and notify of progress.
63+
64+ Translates "Connection reset by peer" into file-like EOF (return an
65+ empty string rather than raise an error), and repeats the recv if
66+ interrupted by a signal.
67+ """
68+ while 1:
69+ try:
70+ bytes = sock.recv(max_read_size)
71+ except socket.error, e:
72+ eno = e.args[0]
73+ if eno in _end_of_stream_errors:
74+ # The connection was closed by the other side. Callers expect
75+ # an empty string to signal end-of-stream.
76+ return ""
77+ elif eno == errno.EINTR:
78+ # Retry the interrupted recv.
79+ continue
80+ raise
81+ else:
82+ if report_activity is not None:
83+ report_activity(len(bytes), 'read')
84+ return bytes
85+
86+
87+def recv_all(socket, count):
88 """Receive an exact number of bytes.
89
90 Regular Socket.recv() may return less than the requested number of bytes,
91- dependning on what's in the OS buffer. MSG_WAITALL is not available
92+ depending on what's in the OS buffer. MSG_WAITALL is not available
93 on all platforms, but this should work everywhere. This will return
94 less than the requested amount if the remote end closes.
95
96 This isn't optimized and is intended mostly for use in testing.
97 """
98 b = ''
99- while len(b) < bytes:
100- new = until_no_eintr(socket.recv, bytes - len(b))
101+ while len(b) < count:
102+ new = read_bytes_from_socket(socket, None, count - len(b))
103 if new == '':
104 break # eof
105 b += new
106 return b
107
108
109-def send_all(socket, bytes, report_activity=None):
110+def send_all(sock, bytes, report_activity=None):
111 """Send all bytes on a socket.
112
113- Regular socket.sendall() can give socket error 10053 on Windows. This
114- implementation sends no more than 64k at a time, which avoids this problem.
115+ Breaks large blocks in smaller chunks to avoid buffering limitations on
116+ some platforms, and catches EINTR which may be thrown if the send is
117+ interrupted by a signal.
118+
119+ This is preferred to socket.sendall(), because it avoids portability bugs
120+ and provides activity reporting.
121
122 :param report_activity: Call this as bytes are read, see
123 Transport._report_activity
124 """
125- chunk_size = 2**16
126- for pos in xrange(0, len(bytes), chunk_size):
127- block = bytes[pos:pos+chunk_size]
128- if report_activity is not None:
129- report_activity(len(block), 'write')
130- until_no_eintr(socket.sendall, block)
131+ sent_total = 0
132+ byte_count = len(bytes)
133+ while sent_total < byte_count:
134+ try:
135+ sent = sock.send(buffer(bytes, sent_total, MAX_SOCKET_CHUNK))
136+ except (socket.error, IOError), e:
137+ if e.args[0] in _end_of_stream_errors:
138+ raise errors.ConnectionReset(
139+ "Error trying to write to socket", e)
140+ if e.args[0] != errno.EINTR:
141+ raise
142+ else:
143+ if sent == 0:
144+ raise errors.ConnectionReset('Sending to %s returned 0 bytes'
145+ % (sock,))
146+ sent_total += sent
147+ if report_activity is not None:
148+ report_activity(sent, 'write')
149
150
151 def dereference_path(path):
152
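
The osutils changes above follow one pattern: never hand the socket more than a fixed chunk at a time, treat reset-style errors as end of stream on reads, and turn them into a ConnectionReset on writes. The following is a minimal Python 3 sketch of that pattern, not the bzrlib code. The `ConnectionReset` class, the `chunk_size` parameter, and the demo at the bottom are assumptions added for illustration. Note that Python 3.5 and later retry EINTR automatically in most cases, so the `InterruptedError` branches are mostly defensive here.

```python
import errno
import socket

MAX_SOCKET_CHUNK = 64 * 1024

# errno values treated as "the other side has gone away".
END_OF_STREAM_ERRORS = (errno.ECONNRESET, errno.EPIPE, errno.EINVAL)


class ConnectionReset(Exception):
    """Hypothetical stand-in for bzrlib.errors.ConnectionReset."""


def read_bytes_from_socket(sock, max_read_size=MAX_SOCKET_CHUNK):
    """Read up to max_read_size bytes, mapping a reset to EOF (b'')."""
    while True:
        try:
            return sock.recv(max_read_size)
        except InterruptedError:
            # Interrupted by a signal: retry the recv.
            continue
        except OSError as e:
            if e.errno in END_OF_STREAM_ERRORS:
                return b''
            raise


def send_all(sock, data, report_activity=None, chunk_size=MAX_SOCKET_CHUNK):
    """Send all of data, at most chunk_size bytes per send() call."""
    view = memoryview(data)
    sent_total = 0
    while sent_total < len(view):
        try:
            sent = sock.send(view[sent_total:sent_total + chunk_size])
        except InterruptedError:
            continue
        except OSError as e:
            if e.errno in END_OF_STREAM_ERRORS:
                raise ConnectionReset('Error trying to write to socket') from e
            raise
        if sent == 0:
            raise ConnectionReset('Sending to %r returned 0 bytes' % (sock,))
        sent_total += sent
        if report_activity is not None:
            report_activity(sent, 'write')


# Demo over a local socket pair, with a small chunk size so that the
# payload needs several send() calls.
a, b = socket.socketpair()
payload = b'x' * 1000
chunks = []
send_all(a, payload, report_activity=lambda n, direction: chunks.append(n),
         chunk_size=256)

received = b''
while len(received) < len(payload):
    data = read_bytes_from_socket(b)
    if not data:
        break
    received += data

a.close()
eof = read_bytes_from_socket(b)  # The peer closed, so this reads as EOF.
b.close()
```

Using send() rather than sendall() is what makes per-chunk activity reporting and the zero-byte check possible, at the cost of tracking the offset by hand.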
153=== modified file 'bzrlib/smart/client.py'
154--- bzrlib/smart/client.py 2010-02-17 17:11:16 +0000
155+++ bzrlib/smart/client.py 2012-09-12 09:29:20 +0000
156@@ -14,12 +14,18 @@
157 # along with this program; if not, write to the Free Software
158 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
159
160+from bzrlib import lazy_import
161+lazy_import.lazy_import(globals(), """
162+from bzrlib.smart import request as _mod_request
163+""")
164+
165 import bzrlib
166 from bzrlib.smart import message, protocol
167-from bzrlib.trace import warning
168 from bzrlib import (
169+ debug,
170 errors,
171 hooks,
172+ trace,
173 )
174
175
176@@ -39,93 +45,12 @@
177 def __repr__(self):
178 return '%s(%r)' % (self.__class__.__name__, self._medium)
179
180- def _send_request(self, protocol_version, method, args, body=None,
181- readv_body=None, body_stream=None):
182- encoder, response_handler = self._construct_protocol(
183- protocol_version)
184- encoder.set_headers(self._headers)
185- if body is not None:
186- if readv_body is not None:
187- raise AssertionError(
188- "body and readv_body are mutually exclusive.")
189- if body_stream is not None:
190- raise AssertionError(
191- "body and body_stream are mutually exclusive.")
192- encoder.call_with_body_bytes((method, ) + args, body)
193- elif readv_body is not None:
194- if body_stream is not None:
195- raise AssertionError(
196- "readv_body and body_stream are mutually exclusive.")
197- encoder.call_with_body_readv_array((method, ) + args, readv_body)
198- elif body_stream is not None:
199- encoder.call_with_body_stream((method, ) + args, body_stream)
200- else:
201- encoder.call(method, *args)
202- return response_handler
203-
204- def _run_call_hooks(self, method, args, body, readv_body):
205- if not _SmartClient.hooks['call']:
206- return
207- params = CallHookParams(method, args, body, readv_body, self._medium)
208- for hook in _SmartClient.hooks['call']:
209- hook(params)
210-
211 def _call_and_read_response(self, method, args, body=None, readv_body=None,
212 body_stream=None, expect_response_body=True):
213- self._run_call_hooks(method, args, body, readv_body)
214- if self._medium._protocol_version is not None:
215- response_handler = self._send_request(
216- self._medium._protocol_version, method, args, body=body,
217- readv_body=readv_body, body_stream=body_stream)
218- return (response_handler.read_response_tuple(
219- expect_body=expect_response_body),
220- response_handler)
221- else:
222- for protocol_version in [3, 2]:
223- if protocol_version == 2:
224- # If v3 doesn't work, the remote side is older than 1.6.
225- self._medium._remember_remote_is_before((1, 6))
226- response_handler = self._send_request(
227- protocol_version, method, args, body=body,
228- readv_body=readv_body, body_stream=body_stream)
229- try:
230- response_tuple = response_handler.read_response_tuple(
231- expect_body=expect_response_body)
232- except errors.UnexpectedProtocolVersionMarker, err:
233- # TODO: We could recover from this without disconnecting if
234- # we recognise the protocol version.
235- warning(
236- 'Server does not understand Bazaar network protocol %d,'
237- ' reconnecting. (Upgrade the server to avoid this.)'
238- % (protocol_version,))
239- self._medium.disconnect()
240- continue
241- except errors.ErrorFromSmartServer:
242- # If we received an error reply from the server, then it
243- # must be ok with this protocol version.
244- self._medium._protocol_version = protocol_version
245- raise
246- else:
247- self._medium._protocol_version = protocol_version
248- return response_tuple, response_handler
249- raise errors.SmartProtocolError(
250- 'Server is not a Bazaar server: ' + str(err))
251-
252- def _construct_protocol(self, version):
253- request = self._medium.get_request()
254- if version == 3:
255- request_encoder = protocol.ProtocolThreeRequester(request)
256- response_handler = message.ConventionalResponseHandler()
257- response_proto = protocol.ProtocolThreeDecoder(
258- response_handler, expect_version_marker=True)
259- response_handler.setProtoAndMediumRequest(response_proto, request)
260- elif version == 2:
261- request_encoder = protocol.SmartClientRequestProtocolTwo(request)
262- response_handler = request_encoder
263- else:
264- request_encoder = protocol.SmartClientRequestProtocolOne(request)
265- response_handler = request_encoder
266- return request_encoder, response_handler
267+ request = _SmartClientRequest(self, method, args, body=body,
268+ readv_body=readv_body, body_stream=body_stream,
269+ expect_response_body=expect_response_body)
270+ return request.call_and_read_response()
271
272 def call(self, method, *args):
273 """Call a method on the remote server."""
274@@ -191,6 +116,203 @@
275 return self._medium.remote_path_from_transport(transport)
276
277
278+class _SmartClientRequest(object):
279+ """Encapsulate the logic for a single request.
280+
281+ This class handles things like reconnecting and sending the request a
282+ second time when the connection is reset in the middle. It also handles the
283+ multiple requests that get made if we don't know what protocol the server
284+ supports yet.
285+
286+ Generally, you build up one of these objects, passing in the arguments that
287+ you want to send to the server, and then use 'call_and_read_response' to
288+ get the response from the server.
289+ """
290+
291+ def __init__(self, client, method, args, body=None, readv_body=None,
292+ body_stream=None, expect_response_body=True):
293+ self.client = client
294+ self.method = method
295+ self.args = args
296+ self.body = body
297+ self.readv_body = readv_body
298+ self.body_stream = body_stream
299+ self.expect_response_body = expect_response_body
300+
301+ def call_and_read_response(self):
302+ """Send the request to the server, and read the initial response.
303+
304+ This doesn't read all of the body content of the response, instead it
305+ returns (response_tuple, response_handler). response_tuple is the 'ok',
306+ or 'error' information, and 'response_handler' can be used to get the
307+ content stream out.
308+ """
309+ self._run_call_hooks()
310+ protocol_version = self.client._medium._protocol_version
311+ if protocol_version is None:
312+ return self._call_determining_protocol_version()
313+ else:
314+ return self._call(protocol_version)
315+
316+ def _is_safe_to_send_twice(self):
317+ """Check if the current method is re-entrant safe."""
318+ if self.body_stream is not None or 'noretry' in debug.debug_flags:
319+ # We can't restart a body stream that has already been consumed.
320+ return False
321+ request_type = _mod_request.request_handlers.get_info(self.method)
322+ if request_type in ('read', 'idem', 'semi'):
323+ return True
324+ # If we have gotten this far, 'stream' cannot be retried, because we
325+ # already consumed the local stream.
326+ if request_type in ('semivfs', 'mutate', 'stream'):
327+ return False
328+ trace.mutter('Unknown request type: %s for method %s'
329+ % (request_type, self.method))
330+ return False
331+
332+ def _run_call_hooks(self):
333+ if not _SmartClient.hooks['call']:
334+ return
335+ params = CallHookParams(self.method, self.args, self.body,
336+ self.readv_body, self.client._medium)
337+ for hook in _SmartClient.hooks['call']:
338+ hook(params)
339+
340+ def _call(self, protocol_version):
341+ """We know the protocol version.
342+
343+ So this just sends the request, and then reads the response. This is
344+ where the code will be to retry requests if the connection is closed.
345+ """
346+ response_handler = self._send(protocol_version)
347+ try:
348+ response_tuple = response_handler.read_response_tuple(
349+ expect_body=self.expect_response_body)
350+ except errors.ConnectionReset, e:
351+ self.client._medium.reset()
352+ if not self._is_safe_to_send_twice():
353+ raise
354+ trace.warning('ConnectionReset reading response for %r, retrying'
355+ % (self.method,))
356+ trace.log_exception_quietly()
357+ encoder, response_handler = self._construct_protocol(
358+ protocol_version)
359+ self._send_no_retry(encoder)
360+ response_tuple = response_handler.read_response_tuple(
361+ expect_body=self.expect_response_body)
362+ return (response_tuple, response_handler)
363+
364+ def _call_determining_protocol_version(self):
365+ """Determine what protocol the remote server supports.
366+
367+ We do this by placing a request in the most recent protocol, and
368+ handling the UnexpectedProtocolVersionMarker from the server.
369+ """
370+ for protocol_version in [3, 2]:
371+ if protocol_version == 2:
372+ # If v3 doesn't work, the remote side is older than 1.6.
373+ self.client._medium._remember_remote_is_before((1, 6))
374+ try:
375+ response_tuple, response_handler = self._call(protocol_version)
376+ except errors.UnexpectedProtocolVersionMarker, err:
377+ # TODO: We could recover from this without disconnecting if
378+ # we recognise the protocol version.
379+ trace.warning(
380+ 'Server does not understand Bazaar network protocol %d,'
381+ ' reconnecting. (Upgrade the server to avoid this.)'
382+ % (protocol_version,))
383+ self.client._medium.disconnect()
384+ continue
385+ except errors.ErrorFromSmartServer:
386+ # If we received an error reply from the server, then it
387+ # must be ok with this protocol version.
388+ self.client._medium._protocol_version = protocol_version
389+ raise
390+ else:
391+ self.client._medium._protocol_version = protocol_version
392+ return response_tuple, response_handler
393+ raise errors.SmartProtocolError(
394+ 'Server is not a Bazaar server: ' + str(err))
395+
396+ def _construct_protocol(self, version):
397+ """Build the encoding stack for a given protocol version."""
398+ request = self.client._medium.get_request()
399+ if version == 3:
400+ request_encoder = protocol.ProtocolThreeRequester(request)
401+ response_handler = message.ConventionalResponseHandler()
402+ response_proto = protocol.ProtocolThreeDecoder(
403+ response_handler, expect_version_marker=True)
404+ response_handler.setProtoAndMediumRequest(response_proto, request)
405+ elif version == 2:
406+ request_encoder = protocol.SmartClientRequestProtocolTwo(request)
407+ response_handler = request_encoder
408+ else:
409+ request_encoder = protocol.SmartClientRequestProtocolOne(request)
410+ response_handler = request_encoder
411+ return request_encoder, response_handler
412+
413+ def _send(self, protocol_version):
414+ """Encode the request, and send it to the server.
415+
416+ This will retry a request if we get a ConnectionReset while sending the
417+ request to the server. (Unless we have a body_stream that we have
418+ already started consuming, since we can't restart body_streams)
419+
420+ :return: response_handler as defined by _construct_protocol
421+ """
422+ encoder, response_handler = self._construct_protocol(protocol_version)
423+ try:
424+ self._send_no_retry(encoder)
425+ except errors.ConnectionReset, e:
426+ # If we fail during the _send_no_retry phase, then we can
427+ # be confident that the server did not get our request, because we
428+ # haven't started waiting for the reply yet. So try the request
429+ # again. We only issue a single retry, because if the connection
430+ # really is down, there is no reason to loop endlessly.
431+
432+ # Connection is dead, so close our end of it.
433+ self.client._medium.reset()
434+ if (('noretry' in debug.debug_flags)
435+ or (self.body_stream is not None
436+ and encoder.body_stream_started)):
437+ # We can't restart a body_stream that has been partially
438+ # consumed, so we don't retry.
439+ # Note: We don't have to worry about
440+ # SmartClientRequestProtocolOne or Two, because they don't
441+ # support client-side body streams.
442+ raise
443+ trace.warning('ConnectionReset calling %r, retrying'
444+ % (self.method,))
445+ trace.log_exception_quietly()
446+ encoder, response_handler = self._construct_protocol(
447+ protocol_version)
448+ self._send_no_retry(encoder)
449+ return response_handler
450+
451+ def _send_no_retry(self, encoder):
452+ """Just encode the request and try to send it."""
453+ encoder.set_headers(self.client._headers)
454+ if self.body is not None:
455+ if self.readv_body is not None:
456+ raise AssertionError(
457+ "body and readv_body are mutually exclusive.")
458+ if self.body_stream is not None:
459+ raise AssertionError(
460+ "body and body_stream are mutually exclusive.")
461+ encoder.call_with_body_bytes((self.method, ) + self.args, self.body)
462+ elif self.readv_body is not None:
463+ if self.body_stream is not None:
464+ raise AssertionError(
465+ "readv_body and body_stream are mutually exclusive.")
466+ encoder.call_with_body_readv_array((self.method, ) + self.args,
467+ self.readv_body)
468+ elif self.body_stream is not None:
469+ encoder.call_with_body_stream((self.method, ) + self.args,
470+ self.body_stream)
471+ else:
472+ encoder.call(self.method, *self.args)
473+
474+
475 class SmartClientHooks(hooks.Hooks):
476
477 def __init__(self):
478
479=== modified file 'bzrlib/smart/medium.py'
480--- bzrlib/smart/medium.py 2010-04-27 06:40:37 +0000
481+++ bzrlib/smart/medium.py 2012-09-12 09:29:20 +0000
482@@ -1,4 +1,4 @@
483-# Copyright (C) 2006-2010 Canonical Ltd
484+# Copyright (C) 2006-2012 Canonical Ltd
485 #
486 # This program is free software; you can redistribute it and/or modify
487 # it under the terms of the GNU General Public License as published by
488@@ -50,11 +50,11 @@
489 #usually already imported, and getting IllegalScoperReplacer on it here.
490 from bzrlib import osutils
491
492-# We must not read any more than 64k at a time so we don't risk "no buffer
493-# space available" errors on some platforms. Windows in particular is likely
494-# to give error 10053 or 10055 if we read more than 64k from a socket.
495-_MAX_READ_SIZE = 64 * 1024
496-
497+# Throughout this module buffer size parameters are either limited to be at
498+# most _MAX_READ_SIZE, or are ignored and _MAX_READ_SIZE is used instead.
499+# For this module's purposes, MAX_SOCKET_CHUNK is a reasonable size for reads
500+# from non-sockets as well.
501+_MAX_READ_SIZE = osutils.MAX_SOCKET_CHUNK
502
503 def _get_protocol_factory_for_bytes(bytes):
504 """Determine the right protocol factory for 'bytes'.
505@@ -178,6 +178,14 @@
506 ui.ui_factory.report_transport_activity(self, bytes, direction)
507
508
509+_bad_file_descriptor = (errno.EBADF,)
510+if sys.platform == 'win32':
511+ # Given on Windows if you pass a closed socket to select.select. Probably
512+ # also given if you pass a file handle to select.
513+ WSAENOTSOCK = 10038
514+ _bad_file_descriptor += (WSAENOTSOCK,)
515+
516+
517 class SmartServerStreamMedium(SmartMedium):
518 """Handles smart commands coming over a stream.
519
520@@ -241,6 +249,8 @@
521
522 :param protocol: a SmartServerRequestProtocol.
523 """
524+ if protocol is None:
525+ return
526 try:
527 self._serve_one_request_unguarded(protocol)
528 except KeyboardInterrupt:
529@@ -276,9 +286,9 @@
530 def _serve_one_request_unguarded(self, protocol):
531 while protocol.next_read_size():
532 # We can safely try to read large chunks. If there is less data
533- # than _MAX_READ_SIZE ready, the socket wil just return a short
534- # read immediately rather than block.
535- bytes = self.read_bytes(_MAX_READ_SIZE)
536+ # than MAX_SOCKET_CHUNK ready, the socket will just return a
537+ # short read immediately rather than block.
538+ bytes = self.read_bytes(osutils.MAX_SOCKET_CHUNK)
539 if bytes == '':
540 self.finished = True
541 return
542@@ -287,13 +297,13 @@
543 self._push_back(protocol.unused_data)
544
545 def _read_bytes(self, desired_count):
546- return _read_bytes_from_socket(
547- self.socket.recv, desired_count, self._report_activity)
548+ return osutils.read_bytes_from_socket(
549+ self.socket, self._report_activity)
550
551 def terminate_due_to_error(self):
552 # TODO: This should log to a server log file, but no such thing
553 # exists yet. Andrew Bennetts 2006-09-29.
554- osutils.until_no_eintr(self.socket.close)
555+ self.socket.close()
556 self.finished = True
557
558 def _write_out(self, bytes):
559@@ -345,16 +355,16 @@
560 protocol.accept_bytes(bytes)
561
562 def _read_bytes(self, desired_count):
563- return osutils.until_no_eintr(self._in.read, desired_count)
564+ return self._in.read(desired_count)
565
566 def terminate_due_to_error(self):
567 # TODO: This should log to a server log file, but no such thing
568 # exists yet. Andrew Bennetts 2006-09-29.
569- osutils.until_no_eintr(self._out.close)
570+ self._out.close()
571 self.finished = True
572
573 def _write_out(self, bytes):
574- osutils.until_no_eintr(self._out.write, bytes)
575+ self._out.write(bytes)
576
577
578 class SmartClientMediumRequest(object):
579@@ -712,6 +722,14 @@
580 """
581 return SmartClientStreamMediumRequest(self)
582
583+ def reset(self):
584+ """We have been disconnected, reset current state.
585+
586+ This resets things like _current_request and connected state.
587+ """
588+ self.disconnect()
589+ self._current_request = None
590+
591
592 class SmartSimplePipesClientMedium(SmartClientStreamMedium):
593 """A client medium using simple pipes.
594@@ -726,22 +744,35 @@
595
596 def _accept_bytes(self, bytes):
597 """See SmartClientStreamMedium.accept_bytes."""
598- osutils.until_no_eintr(self._writeable_pipe.write, bytes)
599+ try:
600+ self._writeable_pipe.write(bytes)
601+ except IOError, e:
602+ if e.errno in (errno.EINVAL, errno.EPIPE):
603+ raise errors.ConnectionReset(
604+ "Error trying to write to subprocess", e)
605+ raise
606 self._report_activity(len(bytes), 'write')
607
608 def _flush(self):
609 """See SmartClientStreamMedium._flush()."""
610- osutils.until_no_eintr(self._writeable_pipe.flush)
611+ # Note: If flush were to fail, we'd like to raise ConnectionReset, etc.
612+ # However, testing shows that even when the child process is
613+ # gone, this doesn't error.
614+ self._writeable_pipe.flush()
615
616 def _read_bytes(self, count):
617 """See SmartClientStreamMedium._read_bytes."""
618- bytes = osutils.until_no_eintr(self._readable_pipe.read, count)
619+ bytes_to_read = min(count, _MAX_READ_SIZE)
620+ bytes = self._readable_pipe.read(bytes_to_read)
621 self._report_activity(len(bytes), 'read')
622 return bytes
623
624
625 class SmartSSHClientMedium(SmartClientStreamMedium):
626- """A client medium using SSH."""
627+ """A client medium using SSH.
628+
629+ It delegates IO to a SmartSimplePipesClientMedium.
630+ """
631
632 def __init__(self, host, port=None, username=None, password=None,
633 base=None, vendor=None, bzr_remote_path=None):
634@@ -750,11 +781,11 @@
635 :param vendor: An optional override for the ssh vendor to use. See
636 bzrlib.transport.ssh for details on ssh vendors.
637 """
638- self._connected = False
639 self._host = host
640 self._password = password
641 self._port = port
642 self._username = username
643+ self._real_medium = None
644 # for the benefit of progress making a short description of this
645 # transport
646 self._scheme = 'bzr+ssh'
647@@ -762,11 +793,9 @@
648 # _DebugCounter so we have to store all the values used in our repr
649 # method before calling the super init.
650 SmartClientStreamMedium.__init__(self, base)
651- self._read_from = None
652- self._ssh_connection = None
653 self._vendor = vendor
654- self._write_to = None
655 self._bzr_remote_path = bzr_remote_path
656+ self._ssh_connection = None
657
658 def __repr__(self):
659 if self._port is None:
660@@ -783,21 +812,20 @@
661 def _accept_bytes(self, bytes):
662 """See SmartClientStreamMedium.accept_bytes."""
663 self._ensure_connection()
664- osutils.until_no_eintr(self._write_to.write, bytes)
665- self._report_activity(len(bytes), 'write')
666+ self._real_medium.accept_bytes(bytes)
667
668 def disconnect(self):
669 """See SmartClientMedium.disconnect()."""
670- if not self._connected:
671- return
672- osutils.until_no_eintr(self._read_from.close)
673- osutils.until_no_eintr(self._write_to.close)
674- self._ssh_connection.close()
675- self._connected = False
676+ if self._real_medium is not None:
677+ self._real_medium.disconnect()
678+ self._real_medium = None
679+ if self._ssh_connection is not None:
680+ self._ssh_connection.close()
681+ self._ssh_connection = None
682
683 def _ensure_connection(self):
684 """Connect this medium if not already connected."""
685- if self._connected:
686+ if self._real_medium is not None:
687 return
688 if self._vendor is None:
689 vendor = ssh._get_ssh_vendor()
690@@ -807,22 +835,19 @@
691 self._password, self._host, self._port,
692 command=[self._bzr_remote_path, 'serve', '--inet',
693 '--directory=/', '--allow-writes'])
694- self._read_from, self._write_to = \
695- self._ssh_connection.get_filelike_channels()
696- self._connected = True
697+ read_from, write_to = self._ssh_connection.get_filelike_channels()
698+ self._real_medium = SmartSimplePipesClientMedium(
699+ read_from, write_to, self.base)
700
701 def _flush(self):
702 """See SmartClientStreamMedium._flush()."""
703- self._write_to.flush()
704+ self._real_medium._flush()
705
706 def _read_bytes(self, count):
707 """See SmartClientStreamMedium.read_bytes."""
708- if not self._connected:
709+ if self._real_medium is None:
710 raise errors.MediumNotConnected(self)
711- bytes_to_read = min(count, _MAX_READ_SIZE)
712- bytes = osutils.until_no_eintr(self._read_from.read, bytes_to_read)
713- self._report_activity(len(bytes), 'read')
714- return bytes
715+ return self._real_medium.read_bytes(count)
716
717
718 # Port 4155 is the default port for bzr://, registered with IANA.
719@@ -850,7 +875,7 @@
720 """See SmartClientMedium.disconnect()."""
721 if not self._connected:
722 return
723- osutils.until_no_eintr(self._socket.close)
724+ self._socket.close()
725 self._socket = None
726 self._connected = False
727
728@@ -904,8 +929,8 @@
729 """See SmartClientMedium.read_bytes."""
730 if not self._connected:
731 raise errors.MediumNotConnected(self)
732- return _read_bytes_from_socket(
733- self._socket.recv, count, self._report_activity)
734+ return osutils.read_bytes_from_socket(
735+ self._socket, self._report_activity)
736
737
738 class SmartClientStreamMediumRequest(SmartClientMediumRequest):
739@@ -946,21 +971,3 @@
740 This invokes self._medium._flush to ensure all bytes are transmitted.
741 """
742 self._medium._flush()
743-
744-
745-def _read_bytes_from_socket(sock, desired_count, report_activity):
746- # We ignore the desired_count because on sockets it's more efficient to
747- # read large chunks (of _MAX_READ_SIZE bytes) at a time.
748- try:
749- bytes = osutils.until_no_eintr(sock, _MAX_READ_SIZE)
750- except socket.error, e:
751- if len(e.args) and e.args[0] in (errno.ECONNRESET, 10054):
752- # The connection was closed by the other side. Callers expect an
753- # empty string to signal end-of-stream.
754- bytes = ''
755- else:
756- raise
757- else:
758- report_activity(len(bytes), 'read')
759- return bytes
760-
761
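
The medium.py changes above replace the SSH medium's own pipe handling with delegation to a pipes medium, and add a reset() that disconnects and clears the current request so that the next write reconnects. A rough Python 3 sketch of that shape follows. `PipesMedium`, `DelegatingMedium`, and the `connect` callable are hypothetical names for illustration, and in-memory streams stand in for the SSH subprocess channels.

```python
import io


class PipesMedium(object):
    """Minimal medium that reads and writes a pair of file-like objects."""

    def __init__(self, readable, writeable):
        self._readable = readable
        self._writeable = writeable

    def accept_bytes(self, data):
        self._writeable.write(data)

    def read_bytes(self, count):
        return self._readable.read(count)

    def disconnect(self):
        self._readable.close()
        self._writeable.close()


class DelegatingMedium(object):
    """Connects lazily and forwards all IO to a PipesMedium."""

    def __init__(self, connect):
        # connect is a callable returning (readable, writeable).
        self._connect = connect
        self._real_medium = None
        self._current_request = None
        self.connect_count = 0

    def _ensure_connection(self):
        if self._real_medium is not None:
            return
        readable, writeable = self._connect()
        self._real_medium = PipesMedium(readable, writeable)
        self.connect_count += 1

    def accept_bytes(self, data):
        self._ensure_connection()
        self._real_medium.accept_bytes(data)

    def disconnect(self):
        # Safe to call repeatedly: a missing real medium is a no-op.
        if self._real_medium is not None:
            self._real_medium.disconnect()
            self._real_medium = None

    def reset(self):
        """Drop the connection and forget any in-flight request."""
        self.disconnect()
        self._current_request = None


medium = DelegatingMedium(lambda: (io.BytesIO(b'response'), io.BytesIO()))
medium.accept_bytes(b'hello')      # First write connects.
medium._current_request = object()
medium.reset()                     # Simulate handling a ConnectionReset.
connected_after_reset = medium._real_medium is not None
medium.disconnect()                # A second disconnect is harmless.
medium.accept_bytes(b'hello')      # The next write reconnects.
```

Tracking connection state through `_real_medium` alone, rather than a separate flag, means there is one fewer piece of state that can disagree after a failed connection.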
762=== modified file 'bzrlib/smart/protocol.py'
763--- bzrlib/smart/protocol.py 2010-02-17 17:11:16 +0000
764+++ bzrlib/smart/protocol.py 2012-09-12 09:29:20 +0000
765@@ -62,7 +62,13 @@
766
767 def _encode_tuple(args):
768 """Encode the tuple args to a bytestream."""
769- return '\x01'.join(args) + '\n'
770+ joined = '\x01'.join(args) + '\n'
771+ if type(joined) is unicode:
772+ # XXX: We should fix things so this never happens! -AJB, 20100304
773+ mutter('response args contain unicode, should be only bytes: %r',
774+ joined)
775+ joined = joined.encode('ascii')
776+ return joined
777
778
779 class Requester(object):
780@@ -648,7 +654,7 @@
781 """Make a remote call with a readv array.
782
783 The body is encoded with one line per readv offset pair. The numbers in
784- each pair are separated by a comma, and no trailing \n is emitted.
785+ each pair are separated by a comma, and no trailing \\n is emitted.
786 """
787 if 'hpss' in debug.debug_flags:
788 mutter('hpss call w/readv: %s', repr(args)[1:-1])
789@@ -1075,9 +1081,6 @@
790 self._real_write_func = write_func
791
792 def _write_func(self, bytes):
793- # TODO: It is probably more appropriate to use sum(map(len, _buf))
794- # for total number of bytes to write, rather than buffer based on
795- # the number of write() calls
796 # TODO: Another possibility would be to turn this into an async model.
797 # Where we let another thread know that we have some bytes if
798 # they want it, but we don't actually block for it
799@@ -1225,6 +1228,7 @@
800 if first_chunk is None:
801 first_chunk = chunk
802 self._write_prefixed_body(chunk)
803+ self.flush()
804 if 'hpssdetail' in debug.debug_flags:
805 # Not worth timing separately, as _write_func is
806 # actually buffered
807@@ -1285,6 +1289,7 @@
808 _ProtocolThreeEncoder.__init__(self, medium_request.accept_bytes)
809 self._medium_request = medium_request
810 self._headers = {}
811+ self.body_stream_started = None
812
813 def set_headers(self, headers):
814 self._headers = headers.copy()
815@@ -1325,7 +1330,7 @@
816 """Make a remote call with a readv array.
817
818 The body is encoded with one line per readv offset pair. The numbers in
819- each pair are separated by a comma, and no trailing \n is emitted.
820+ each pair are separated by a comma, and no trailing \\n is emitted.
821 """
822 if 'hpss' in debug.debug_flags:
823 mutter('hpss call w/readv: %s', repr(args)[1:-1])
824@@ -1350,6 +1355,7 @@
825 if path is not None:
826 mutter(' (to %s)', path)
827 self._request_start_time = osutils.timer_func()
828+ self.body_stream_started = False
829 self._write_protocol_version()
830 self._write_headers(self._headers)
831 self._write_structure(args)
832@@ -1357,6 +1363,9 @@
833 # have finished sending the stream. We would notice at the end
834 # anyway, but if the medium can deliver it early then it's good
835 # to short-circuit the whole request...
836+ # Provoke any ConnectionReset failures before we start the body stream.
837+ self.flush()
838+ self.body_stream_started = True
839 for exc_info, part in _iter_with_errors(stream):
840 if exc_info is not None:
841 # Iterating the stream failed. Cleanly abort the request.
842
843=== modified file 'bzrlib/smart/request.py'
844--- bzrlib/smart/request.py 2010-02-17 17:11:16 +0000
845+++ bzrlib/smart/request.py 2012-09-12 09:29:20 +0000
846@@ -1,4 +1,4 @@
847-# Copyright (C) 2006-2010 Canonical Ltd
848+# Copyright (C) 2006-2012 Canonical Ltd
849 #
850 # This program is free software; you can redistribute it and/or modify
851 # it under the terms of the GNU General Public License as published by
852@@ -134,7 +134,7 @@
853 It will return a SmartServerResponse if the command does not expect a
854 body.
855
856- :param *args: the arguments of the request.
857+ :param args: the arguments of the request.
858 """
859 self._check_enabled()
860 return self.do(*args)
861@@ -486,152 +486,196 @@
862 return SuccessfulSmartServerResponse((answer,))
863
864
865+# In the 'info' attribute, we store whether this request is 'safe' to retry if
866+# we get a disconnect while reading the response. It can have the values:
867+# read This is purely a read request, so retrying it is perfectly ok.
868+# idem An idempotent write request. Something like 'put' where if you put
869+# the same bytes twice you end up with the same final bytes.
870+# semi This is a request that isn't strictly idempotent, but doesn't
871+# result in corruption if it is retried. This is for things like
872+# 'lock' and 'unlock'. If you call lock, it updates the disk
873+# structure. If you fail to read the response, you won't be able to
874+# use the lock, because you don't have the lock token. Calling lock
875+# again will fail, because the lock is already taken. However, we
876+# can't tell if the server received our request or not. If it didn't,
877+# then retrying the request is fine, as it will actually do what we
878+# want. If it did, we will interrupt the current operation, but we
879+# are no worse off than interrupting the current operation because of
880+# a ConnectionReset.
881+# semivfs Similar to semi, but specific to a Virtual FileSystem request.
882+# stream This is a request that takes a stream that cannot be restarted if
883+# consumed. This request is 'safe' in that if we determine the
884+# connection is closed before we consume the stream, we can try
885+# again.
886+# mutate State is updated in a way that replaying that request results in a
887+# different state. For example 'append' writes more bytes to a given
888+# file. If append succeeds, it moves the file pointer.
889 request_handlers = registry.Registry()
890 request_handlers.register_lazy(
891- 'append', 'bzrlib.smart.vfs', 'AppendRequest')
892+ 'append', 'bzrlib.smart.vfs', 'AppendRequest', info='mutate')
893 request_handlers.register_lazy(
894 'Branch.get_config_file', 'bzrlib.smart.branch',
895- 'SmartServerBranchGetConfigFile')
896+ 'SmartServerBranchGetConfigFile', info='read')
897 request_handlers.register_lazy(
898- 'Branch.get_parent', 'bzrlib.smart.branch', 'SmartServerBranchGetParent')
899+ 'Branch.get_parent', 'bzrlib.smart.branch', 'SmartServerBranchGetParent',
900+ info='read')
901 request_handlers.register_lazy(
902 'Branch.get_tags_bytes', 'bzrlib.smart.branch',
903- 'SmartServerBranchGetTagsBytes')
904+ 'SmartServerBranchGetTagsBytes', info='read')
905 request_handlers.register_lazy(
906 'Branch.set_tags_bytes', 'bzrlib.smart.branch',
907- 'SmartServerBranchSetTagsBytes')
908-request_handlers.register_lazy(
909- 'Branch.get_stacked_on_url', 'bzrlib.smart.branch', 'SmartServerBranchRequestGetStackedOnURL')
910-request_handlers.register_lazy(
911- 'Branch.last_revision_info', 'bzrlib.smart.branch', 'SmartServerBranchRequestLastRevisionInfo')
912-request_handlers.register_lazy(
913- 'Branch.lock_write', 'bzrlib.smart.branch', 'SmartServerBranchRequestLockWrite')
914-request_handlers.register_lazy( 'Branch.revision_history',
915- 'bzrlib.smart.branch', 'SmartServerRequestRevisionHistory')
916-request_handlers.register_lazy( 'Branch.set_config_option',
917- 'bzrlib.smart.branch', 'SmartServerBranchRequestSetConfigOption')
918-request_handlers.register_lazy( 'Branch.set_last_revision',
919- 'bzrlib.smart.branch', 'SmartServerBranchRequestSetLastRevision')
920+ 'SmartServerBranchSetTagsBytes', info='idem')
921+request_handlers.register_lazy(
922+ 'Branch.get_stacked_on_url', 'bzrlib.smart.branch',
923+ 'SmartServerBranchRequestGetStackedOnURL', info='read')
924+request_handlers.register_lazy(
925+ 'Branch.last_revision_info', 'bzrlib.smart.branch',
926+ 'SmartServerBranchRequestLastRevisionInfo', info='read')
927+request_handlers.register_lazy(
928+ 'Branch.lock_write', 'bzrlib.smart.branch',
929+ 'SmartServerBranchRequestLockWrite', info='semi')
930+request_handlers.register_lazy(
931+ 'Branch.revision_history', 'bzrlib.smart.branch',
932+ 'SmartServerRequestRevisionHistory', info='read')
933+request_handlers.register_lazy(
934+ 'Branch.set_config_option', 'bzrlib.smart.branch',
935+ 'SmartServerBranchRequestSetConfigOption', info='idem')
936+request_handlers.register_lazy(
937+ 'Branch.set_last_revision', 'bzrlib.smart.branch',
938+ 'SmartServerBranchRequestSetLastRevision', info='idem')
939 request_handlers.register_lazy(
940 'Branch.set_last_revision_info', 'bzrlib.smart.branch',
941- 'SmartServerBranchRequestSetLastRevisionInfo')
942+ 'SmartServerBranchRequestSetLastRevisionInfo', info='idem')
943 request_handlers.register_lazy(
944 'Branch.set_last_revision_ex', 'bzrlib.smart.branch',
945- 'SmartServerBranchRequestSetLastRevisionEx')
946+ 'SmartServerBranchRequestSetLastRevisionEx', info='idem')
947 request_handlers.register_lazy(
948 'Branch.set_parent_location', 'bzrlib.smart.branch',
949- 'SmartServerBranchRequestSetParentLocation')
950+ 'SmartServerBranchRequestSetParentLocation', info='idem')
951 request_handlers.register_lazy(
952- 'Branch.unlock', 'bzrlib.smart.branch', 'SmartServerBranchRequestUnlock')
953+ 'Branch.unlock', 'bzrlib.smart.branch',
954+ 'SmartServerBranchRequestUnlock', info='semi')
955 request_handlers.register_lazy(
956 'BzrDir.cloning_metadir', 'bzrlib.smart.bzrdir',
957- 'SmartServerBzrDirRequestCloningMetaDir')
958+ 'SmartServerBzrDirRequestCloningMetaDir', info='read')
959 request_handlers.register_lazy(
960 'BzrDir.create_branch', 'bzrlib.smart.bzrdir',
961- 'SmartServerRequestCreateBranch')
962+ 'SmartServerRequestCreateBranch', info='semi')
963 request_handlers.register_lazy(
964 'BzrDir.create_repository', 'bzrlib.smart.bzrdir',
965- 'SmartServerRequestCreateRepository')
966+ 'SmartServerRequestCreateRepository', info='semi')
967 request_handlers.register_lazy(
968 'BzrDir.find_repository', 'bzrlib.smart.bzrdir',
969- 'SmartServerRequestFindRepositoryV1')
970+ 'SmartServerRequestFindRepositoryV1', info='read')
971 request_handlers.register_lazy(
972 'BzrDir.find_repositoryV2', 'bzrlib.smart.bzrdir',
973- 'SmartServerRequestFindRepositoryV2')
974+ 'SmartServerRequestFindRepositoryV2', info='read')
975 request_handlers.register_lazy(
976 'BzrDir.find_repositoryV3', 'bzrlib.smart.bzrdir',
977- 'SmartServerRequestFindRepositoryV3')
978+ 'SmartServerRequestFindRepositoryV3', info='read')
979 request_handlers.register_lazy(
980 'BzrDir.get_config_file', 'bzrlib.smart.bzrdir',
981- 'SmartServerBzrDirRequestConfigFile')
982+ 'SmartServerBzrDirRequestConfigFile', info='read')
983 request_handlers.register_lazy(
984 'BzrDirFormat.initialize', 'bzrlib.smart.bzrdir',
985- 'SmartServerRequestInitializeBzrDir')
986+ 'SmartServerRequestInitializeBzrDir', info='semi')
987 request_handlers.register_lazy(
988 'BzrDirFormat.initialize_ex_1.16', 'bzrlib.smart.bzrdir',
989- 'SmartServerRequestBzrDirInitializeEx')
990-request_handlers.register_lazy(
991- 'BzrDir.open', 'bzrlib.smart.bzrdir', 'SmartServerRequestOpenBzrDir')
992-request_handlers.register_lazy(
993- 'BzrDir.open_2.1', 'bzrlib.smart.bzrdir', 'SmartServerRequestOpenBzrDir_2_1')
994+ 'SmartServerRequestBzrDirInitializeEx', info='semi')
995+request_handlers.register_lazy(
996+ 'BzrDir.open', 'bzrlib.smart.bzrdir', 'SmartServerRequestOpenBzrDir',
997+ info='read')
998+request_handlers.register_lazy(
999+ 'BzrDir.open_2.1', 'bzrlib.smart.bzrdir',
1000+ 'SmartServerRequestOpenBzrDir_2_1', info='read')
1001 request_handlers.register_lazy(
1002 'BzrDir.open_branch', 'bzrlib.smart.bzrdir',
1003- 'SmartServerRequestOpenBranch')
1004+ 'SmartServerRequestOpenBranch', info='read')
1005 request_handlers.register_lazy(
1006 'BzrDir.open_branchV2', 'bzrlib.smart.bzrdir',
1007- 'SmartServerRequestOpenBranchV2')
1008+ 'SmartServerRequestOpenBranchV2', info='read')
1009 request_handlers.register_lazy(
1010 'BzrDir.open_branchV3', 'bzrlib.smart.bzrdir',
1011- 'SmartServerRequestOpenBranchV3')
1012-request_handlers.register_lazy(
1013- 'delete', 'bzrlib.smart.vfs', 'DeleteRequest')
1014-request_handlers.register_lazy(
1015- 'get', 'bzrlib.smart.vfs', 'GetRequest')
1016-request_handlers.register_lazy(
1017- 'get_bundle', 'bzrlib.smart.request', 'GetBundleRequest')
1018-request_handlers.register_lazy(
1019- 'has', 'bzrlib.smart.vfs', 'HasRequest')
1020-request_handlers.register_lazy(
1021- 'hello', 'bzrlib.smart.request', 'HelloRequest')
1022-request_handlers.register_lazy(
1023- 'iter_files_recursive', 'bzrlib.smart.vfs', 'IterFilesRecursiveRequest')
1024-request_handlers.register_lazy(
1025- 'list_dir', 'bzrlib.smart.vfs', 'ListDirRequest')
1026-request_handlers.register_lazy(
1027- 'mkdir', 'bzrlib.smart.vfs', 'MkdirRequest')
1028-request_handlers.register_lazy(
1029- 'move', 'bzrlib.smart.vfs', 'MoveRequest')
1030-request_handlers.register_lazy(
1031- 'put', 'bzrlib.smart.vfs', 'PutRequest')
1032-request_handlers.register_lazy(
1033- 'put_non_atomic', 'bzrlib.smart.vfs', 'PutNonAtomicRequest')
1034-request_handlers.register_lazy(
1035- 'readv', 'bzrlib.smart.vfs', 'ReadvRequest')
1036-request_handlers.register_lazy(
1037- 'rename', 'bzrlib.smart.vfs', 'RenameRequest')
1038+ 'SmartServerRequestOpenBranchV3', info='read')
1039+request_handlers.register_lazy(
1040+ 'delete', 'bzrlib.smart.vfs', 'DeleteRequest', info='semivfs')
1041+request_handlers.register_lazy(
1042+ 'get', 'bzrlib.smart.vfs', 'GetRequest', info='read')
1043+request_handlers.register_lazy(
1044+ 'get_bundle', 'bzrlib.smart.request', 'GetBundleRequest', info='read')
1045+request_handlers.register_lazy(
1046+ 'has', 'bzrlib.smart.vfs', 'HasRequest', info='read')
1047+request_handlers.register_lazy(
1048+ 'hello', 'bzrlib.smart.request', 'HelloRequest', info='read')
1049+request_handlers.register_lazy(
1050+ 'iter_files_recursive', 'bzrlib.smart.vfs', 'IterFilesRecursiveRequest',
1051+ info='read')
1052+request_handlers.register_lazy(
1053+ 'list_dir', 'bzrlib.smart.vfs', 'ListDirRequest', info='read')
1054+request_handlers.register_lazy(
1055+ 'mkdir', 'bzrlib.smart.vfs', 'MkdirRequest', info='semivfs')
1056+request_handlers.register_lazy(
1057+ 'move', 'bzrlib.smart.vfs', 'MoveRequest', info='semivfs')
1058+request_handlers.register_lazy(
1059+ 'put', 'bzrlib.smart.vfs', 'PutRequest', info='idem')
1060+request_handlers.register_lazy(
1061+ 'put_non_atomic', 'bzrlib.smart.vfs', 'PutNonAtomicRequest', info='idem')
1062+request_handlers.register_lazy(
1063+ 'readv', 'bzrlib.smart.vfs', 'ReadvRequest', info='read')
1064+request_handlers.register_lazy(
1065+ 'rename', 'bzrlib.smart.vfs', 'RenameRequest', info='semivfs')
1066 request_handlers.register_lazy(
1067 'PackRepository.autopack', 'bzrlib.smart.packrepository',
1068- 'SmartServerPackRepositoryAutopack')
1069-request_handlers.register_lazy('Repository.gather_stats',
1070- 'bzrlib.smart.repository',
1071- 'SmartServerRepositoryGatherStats')
1072-request_handlers.register_lazy('Repository.get_parent_map',
1073- 'bzrlib.smart.repository',
1074- 'SmartServerRepositoryGetParentMap')
1075-request_handlers.register_lazy(
1076- 'Repository.get_revision_graph', 'bzrlib.smart.repository', 'SmartServerRepositoryGetRevisionGraph')
1077-request_handlers.register_lazy(
1078- 'Repository.has_revision', 'bzrlib.smart.repository', 'SmartServerRequestHasRevision')
1079-request_handlers.register_lazy(
1080- 'Repository.insert_stream', 'bzrlib.smart.repository', 'SmartServerRepositoryInsertStream')
1081-request_handlers.register_lazy(
1082- 'Repository.insert_stream_1.19', 'bzrlib.smart.repository', 'SmartServerRepositoryInsertStream_1_19')
1083-request_handlers.register_lazy(
1084- 'Repository.insert_stream_locked', 'bzrlib.smart.repository', 'SmartServerRepositoryInsertStreamLocked')
1085-request_handlers.register_lazy(
1086- 'Repository.is_shared', 'bzrlib.smart.repository', 'SmartServerRepositoryIsShared')
1087-request_handlers.register_lazy(
1088- 'Repository.lock_write', 'bzrlib.smart.repository', 'SmartServerRepositoryLockWrite')
1089+ 'SmartServerPackRepositoryAutopack', info='idem')
1090+request_handlers.register_lazy(
1091+ 'Repository.gather_stats', 'bzrlib.smart.repository',
1092+ 'SmartServerRepositoryGatherStats', info='read')
1093+request_handlers.register_lazy(
1094+ 'Repository.get_parent_map', 'bzrlib.smart.repository',
1095+ 'SmartServerRepositoryGetParentMap', info='read')
1096+request_handlers.register_lazy(
1097+ 'Repository.get_revision_graph', 'bzrlib.smart.repository',
1098+ 'SmartServerRepositoryGetRevisionGraph', info='read')
1099+request_handlers.register_lazy(
1100+ 'Repository.has_revision', 'bzrlib.smart.repository',
1101+ 'SmartServerRequestHasRevision', info='read')
1102+request_handlers.register_lazy(
1103+ 'Repository.insert_stream', 'bzrlib.smart.repository',
1104+ 'SmartServerRepositoryInsertStream', info='stream')
1105+request_handlers.register_lazy(
1106+ 'Repository.insert_stream_1.19', 'bzrlib.smart.repository',
1107+ 'SmartServerRepositoryInsertStream_1_19', info='stream')
1108+request_handlers.register_lazy(
1109+ 'Repository.insert_stream_locked', 'bzrlib.smart.repository',
1110+ 'SmartServerRepositoryInsertStreamLocked', info='stream')
1111+request_handlers.register_lazy(
1112+ 'Repository.is_shared', 'bzrlib.smart.repository',
1113+ 'SmartServerRepositoryIsShared', info='read')
1114+request_handlers.register_lazy(
1115+ 'Repository.lock_write', 'bzrlib.smart.repository',
1116+ 'SmartServerRepositoryLockWrite', info='semi')
1117 request_handlers.register_lazy(
1118 'Repository.set_make_working_trees', 'bzrlib.smart.repository',
1119- 'SmartServerRepositorySetMakeWorkingTrees')
1120+ 'SmartServerRepositorySetMakeWorkingTrees', info='idem')
1121 request_handlers.register_lazy(
1122- 'Repository.unlock', 'bzrlib.smart.repository', 'SmartServerRepositoryUnlock')
1123+ 'Repository.unlock', 'bzrlib.smart.repository',
1124+ 'SmartServerRepositoryUnlock', info='semi')
1125 request_handlers.register_lazy(
1126 'Repository.get_rev_id_for_revno', 'bzrlib.smart.repository',
1127- 'SmartServerRepositoryGetRevIdForRevno')
1128+ 'SmartServerRepositoryGetRevIdForRevno', info='read')
1129 request_handlers.register_lazy(
1130 'Repository.get_stream', 'bzrlib.smart.repository',
1131- 'SmartServerRepositoryGetStream')
1132+ 'SmartServerRepositoryGetStream', info='read')
1133 request_handlers.register_lazy(
1134 'Repository.get_stream_1.19', 'bzrlib.smart.repository',
1135- 'SmartServerRepositoryGetStream_1_19')
1136+ 'SmartServerRepositoryGetStream_1_19', info='read')
1137 request_handlers.register_lazy(
1138 'Repository.tarball', 'bzrlib.smart.repository',
1139- 'SmartServerRepositoryTarball')
1140-request_handlers.register_lazy(
1141- 'rmdir', 'bzrlib.smart.vfs', 'RmdirRequest')
1142-request_handlers.register_lazy(
1143- 'stat', 'bzrlib.smart.vfs', 'StatRequest')
1144-request_handlers.register_lazy(
1145- 'Transport.is_readonly', 'bzrlib.smart.request', 'SmartServerIsReadonly')
1146+ 'SmartServerRepositoryTarball', info='read')
1147+request_handlers.register_lazy(
1148+ 'rmdir', 'bzrlib.smart.vfs', 'RmdirRequest', info='semivfs')
1149+request_handlers.register_lazy(
1150+ 'stat', 'bzrlib.smart.vfs', 'StatRequest', info='read')
1151+request_handlers.register_lazy(
1152+ 'Transport.is_readonly', 'bzrlib.smart.request',
1153+ 'SmartServerIsReadonly', info='read')
1154
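Note on the registrations above: the `info` value is only useful once a client consults it after a disconnect. The client-side logic lives in `bzrlib/smart/client.py`, which is outside this part of the diff, so the following is only a sketch of one possible policy, written for Python 3. The `RETRY_INFO` dict stands in for the registry, `is_safe_to_retry` is a hypothetical helper, and refusing to retry `semivfs` is a conservative assumption of this sketch rather than something the comment block states.

```python
# A few verbs and their categories, copied from the registrations above.
RETRY_INFO = {
    'get': 'read',
    'put': 'idem',
    'Branch.lock_write': 'semi',
    'mkdir': 'semivfs',
    'Repository.insert_stream': 'stream',
    'append': 'mutate',
}


def is_safe_to_retry(verb, body_stream_started=False):
    """Return True if re-sending `verb` after a disconnect looks safe.

    Hypothetical policy for illustration only.
    """
    info = RETRY_INFO.get(verb)
    if info in ('read', 'idem', 'semi'):
        return True
    if info == 'stream':
        # Only safe while the local stream has not been consumed yet.
        return not body_stream_started
    # 'semivfs', 'mutate' and unclassified verbs are not retried here.
    return False
```

Under this policy, `is_safe_to_retry('Repository.insert_stream')` is true only until the body stream has started, which is the situation the `body_stream_started` flag in `protocol.py` is there to record.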
1155=== modified file 'bzrlib/tests/test_bundle.py'
1156--- bzrlib/tests/test_bundle.py 2010-02-17 17:11:16 +0000
1157+++ bzrlib/tests/test_bundle.py 2012-09-12 09:29:20 +0000
1158@@ -1855,20 +1855,23 @@
1159 self.sock.bind(('127.0.0.1', 0))
1160 self.sock.listen(1)
1161 self.port = self.sock.getsockname()[1]
1162+ self.stopping = threading.Event()
1163 self.thread = threading.Thread(
1164 name='%s (port %d)' % (self.__class__.__name__, self.port),
1165 target=self.accept_and_close)
1166 self.thread.start()
1167
1168 def accept_and_close(self):
1169- conn, addr = self.sock.accept()
1170- conn.shutdown(socket.SHUT_RDWR)
1171- conn.close()
1172+ while not self.stopping.isSet():
1173+ conn, addr = self.sock.accept()
1174+ conn.shutdown(socket.SHUT_RDWR)
1175+ conn.close()
1176
1177 def get_url(self):
1178 return 'bzr://127.0.0.1:%d/' % (self.port,)
1179
1180 def stop_server(self):
1181+ self.stopping.set()
1182 try:
1183 # make sure the thread dies by connecting to the listening socket,
1184 # just in case the test failed to do so.
1185
1186=== modified file 'bzrlib/tests/test_osutils.py'
1187--- bzrlib/tests/test_osutils.py 2010-11-30 20:42:42 +0000
1188+++ bzrlib/tests/test_osutils.py 2012-09-12 09:29:20 +0000
1189@@ -801,6 +801,45 @@
1190 self.assertEqual(None, osutils.safe_file_id(None))
1191
1192
1193+class TestSendAll(tests.TestCase):
1194+
1195+ def test_send_with_disconnected_socket(self):
1196+ class DisconnectedSocket(object):
1197+ def __init__(self, err):
1198+ self.err = err
1199+ def send(self, content):
1200+ raise self.err
1201+ def close(self):
1202+ pass
1203+ # All of these should be treated as ConnectionReset
1204+ errs = []
1205+ for err_cls in (IOError, socket.error):
1206+ for errnum in osutils._end_of_stream_errors:
1207+ errs.append(err_cls(errnum))
1208+ for err in errs:
1209+ sock = DisconnectedSocket(err)
1210+ self.assertRaises(errors.ConnectionReset,
1211+ osutils.send_all, sock, 'some more content')
1212+
1213+ def test_send_with_no_progress(self):
1214+ # See https://bugs.launchpad.net/bzr/+bug/1047309
1215+ # It seems that paramiko can get into a state where it doesn't error,
1216+ # but it returns 0 bytes sent for requests over and over again.
1217+ class NoSendingSocket(object):
1218+ def __init__(self):
1219+ self.call_count = 0
1220+ def send(self, bytes):
1221+ self.call_count += 1
1222+ if self.call_count > 100:
1223+ # Prevent the test suite from hanging
1224+ raise RuntimeError('too many calls')
1225+ return 0
1226+ sock = NoSendingSocket()
1227+ self.assertRaises(errors.ConnectionReset,
1228+ osutils.send_all, sock, 'content')
1229+ self.assertEqual(1, sock.call_count)
1230+
1231+
1232 class TestWin32Funcs(tests.TestCase):
1233 """Test that _win32 versions of os utilities return appropriate paths."""
1234
1235
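Note on `TestSendAll` above: the two tests pin down that end-of-stream errors from `send` become `ConnectionReset`, and that a `send` returning 0 bytes is treated as a closed connection after a single call instead of looping forever. A minimal sketch of a send loop with those two properties follows. It is Python 3, uses the built-in `ConnectionResetError` in place of `errors.ConnectionReset`, and the `END_OF_STREAM_ERRNOS` set is an assumption standing in for `osutils._end_of_stream_errors`, whose contents are not shown in this part of the diff.

```python
import errno

# Hypothetical stand-in for osutils._end_of_stream_errors.
END_OF_STREAM_ERRNOS = frozenset([errno.ECONNRESET, errno.EPIPE, errno.EBADF])


def send_all(sock, data):
    """Send all of `data`, raising ConnectionResetError on a dead peer."""
    view = memoryview(data)
    sent_total = 0
    while sent_total < len(data):
        try:
            sent = sock.send(view[sent_total:])
        except InterruptedError:
            # EINTR: retry the same send.
            continue
        except OSError as e:
            if e.errno in END_OF_STREAM_ERRNOS:
                raise ConnectionResetError(
                    'error trying to write to socket') from e
            raise
        if sent == 0:
            # No progress at all: treat it as a closed connection rather
            # than spinning on send() indefinitely.
            raise ConnectionResetError('send() reported 0 bytes sent')
        sent_total += sent
```

Raising on the first zero-byte send is what lets the second test assert `call_count == 1`.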
1236=== modified file 'bzrlib/tests/test_smart_request.py'
1237--- bzrlib/tests/test_smart_request.py 2009-07-27 02:11:25 +0000
1238+++ bzrlib/tests/test_smart_request.py 2012-09-12 09:29:20 +0000
1239@@ -109,6 +109,16 @@
1240 self.assertEqual(
1241 [[transport]] * 3, handler._command.jail_transports_log)
1242
1243+ def test_all_registered_requests_are_safety_qualified(self):
1244+ unclassified_requests = []
1245+ allowed_info = ('read', 'idem', 'mutate', 'semivfs', 'semi', 'stream')
1246+ for key in request.request_handlers.keys():
1247+ info = request.request_handlers.get_info(key)
1248+ if info is None or info not in allowed_info:
1249+ unclassified_requests.append(key)
1250+ if unclassified_requests:
1251+ self.fail('These requests were not categorized as safe/unsafe'
1252+ ' to retry: %s' % (unclassified_requests,))
1253
1254
1255 class TestSmartRequestHandlerErrorTranslation(TestCase):
1256
1257=== modified file 'bzrlib/tests/test_smart_transport.py'
1258--- bzrlib/tests/test_smart_transport.py 2010-02-17 17:11:16 +0000
1259+++ bzrlib/tests/test_smart_transport.py 2012-09-12 09:29:20 +0000
1260@@ -18,13 +18,17 @@
1261
1262 # all of this deals with byte strings so this is safe
1263 from cStringIO import StringIO
1264+import errno
1265 import os
1266 import socket
1267+import subprocess
1268+import sys
1269 import threading
1270
1271 import bzrlib
1272 from bzrlib import (
1273 bzrdir,
1274+ debug,
1275 errors,
1276 osutils,
1277 tests,
1278@@ -49,6 +53,29 @@
1279 from bzrlib.transport.http import SmartClientHTTPMediumRequest
1280
1281
1282+def create_file_pipes():
1283+ r, w = os.pipe()
1284+ # These must be opened without buffering, or we get undefined results
1285+ rf = os.fdopen(r, 'rb', 0)
1286+ wf = os.fdopen(w, 'wb', 0)
1287+ return rf, wf
1288+
1289+
1290+def portable_socket_pair():
1291+ """Return a pair of TCP sockets connected to each other.
1292+
1293+ Unlike socket.socketpair, this should work on Windows.
1294+ """
1295+ listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1296+ listen_sock.bind(('127.0.0.1', 0))
1297+ listen_sock.listen(1)
1298+ client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1299+ client_sock.connect(listen_sock.getsockname())
1300+ server_sock, addr = listen_sock.accept()
1301+ listen_sock.close()
1302+ return server_sock, client_sock
1303+
1304+
1305 class StringIOSSHVendor(object):
1306 """A SSH vendor that uses StringIO to buffer writes and answer reads."""
1307
1308@@ -63,6 +90,27 @@
1309 return StringIOSSHConnection(self)
1310
1311
1312+class FirstRejectedStringIOSSHVendor(StringIOSSHVendor):
1313+ """The first connection will be considered closed.
1314+
1315+ The second connection will succeed normally.
1316+ """
1317+
1318+ def __init__(self, read_from, write_to, fail_at_write=True):
1319+ super(FirstRejectedStringIOSSHVendor, self).__init__(read_from,
1320+ write_to)
1321+ self.fail_at_write = fail_at_write
1322+ self._first = True
1323+
1324+ def connect_ssh(self, username, password, host, port, command):
1325+ self.calls.append(('connect_ssh', username, password, host, port,
1326+ command))
1327+ if self._first:
1328+ self._first = False
1329+ return ClosedSSHConnection(self)
1330+ return StringIOSSHConnection(self)
1331+
1332+
1333 class StringIOSSHConnection(object):
1334 """A SSH connection that uses StringIO to buffer writes and answer reads."""
1335
1336@@ -71,11 +119,36 @@
1337
1338 def close(self):
1339 self.vendor.calls.append(('close', ))
1340+ self.vendor.read_from.close()
1341+ self.vendor.write_to.close()
1342
1343 def get_filelike_channels(self):
1344 return self.vendor.read_from, self.vendor.write_to
1345
1346
1347+class ClosedSSHConnection(object):
1348+ """An SSH connection that just has closed channels."""
1349+
1350+ def __init__(self, vendor):
1351+ self.vendor = vendor
1352+
1353+ def close(self):
1354+ self.vendor.calls.append(('close', ))
1355+
1356+ def get_filelike_channels(self):
1357+ # We create matching pipes, and then close the ssh side
1358+ bzr_read, ssh_write = create_file_pipes()
1359+ # We always fail when bzr goes to read
1360+ ssh_write.close()
1361+ if self.vendor.fail_at_write:
1362+ # If set, we'll also fail when bzr goes to write
1363+ ssh_read, bzr_write = create_file_pipes()
1364+ ssh_read.close()
1365+ else:
1366+ bzr_write = self.vendor.write_to
1367+ return bzr_read, bzr_write
1368+
1369+
1370 class _InvalidHostnameFeature(tests.Feature):
1371 """Does 'non_existent.invalid' fail to resolve?
1372
1373@@ -171,6 +244,91 @@
1374 client_medium._accept_bytes('abc')
1375 self.assertEqual('abc', output.getvalue())
1376
1377+ def test_simple_pipes__accept_bytes_subprocess_closed(self):
1378+ # It is unfortunate that we have to use Popen for this. However,
1379+ # os.pipe() does not behave the same as subprocess.Popen().
1380+ # On Windows, if you use os.pipe() and close the write side,
1381+ # read.read() hangs. On Linux, read.read() returns the empty string.
1382+ p = subprocess.Popen([sys.executable, '-c',
1383+ 'import sys\n'
1384+ 'sys.stdout.write(sys.stdin.read(4))\n'
1385+ 'sys.stdout.close()\n'],
1386+ stdout=subprocess.PIPE, stdin=subprocess.PIPE)
1387+ client_medium = medium.SmartSimplePipesClientMedium(
1388+ p.stdout, p.stdin, 'base')
1389+ client_medium._accept_bytes('abc\n')
1390+ self.assertEqual('abc', client_medium._read_bytes(3))
1391+ p.wait()
1392+ # While writing to the underlying pipe,
1393+ # Windows py2.6.6 we get IOError(EINVAL)
1394+ # Lucid py2.6.5, we get IOError(EPIPE)
1395+ # In both cases, it should be wrapped to ConnectionReset
1396+ self.assertRaises(errors.ConnectionReset,
1397+ client_medium._accept_bytes, 'more')
1398+
1399+ def test_simple_pipes__accept_bytes_pipe_closed(self):
1400+ child_read, client_write = create_file_pipes()
1401+ client_medium = medium.SmartSimplePipesClientMedium(
1402+ None, client_write, 'base')
1403+ client_medium._accept_bytes('abc\n')
1404+ self.assertEqual('abc\n', child_read.read(4))
1405+ # While writing to the underlying pipe,
1406+ # Windows py2.6.6 we get IOError(EINVAL)
1407+ # Lucid py2.6.5, we get IOError(EPIPE)
1408+ # In both cases, it should be wrapped to ConnectionReset
1409+ child_read.close()
1410+ self.assertRaises(errors.ConnectionReset,
1411+ client_medium._accept_bytes, 'more')
1412+
1413+ def test_simple_pipes__flush_pipe_closed(self):
1414+ child_read, client_write = create_file_pipes()
1415+ client_medium = medium.SmartSimplePipesClientMedium(
1416+ None, client_write, 'base')
1417+ client_medium._accept_bytes('abc\n')
1418+ child_read.close()
1419+ # Even though the pipe is closed, flush on the write side seems to be a
1420+ # no-op, rather than a failure.
1421+ client_medium._flush()
1422+
1423+ def test_simple_pipes__flush_subprocess_closed(self):
1424+ p = subprocess.Popen([sys.executable, '-c',
1425+ 'import sys\n'
1426+ 'sys.stdout.write(sys.stdin.read(4))\n'
1427+ 'sys.stdout.close()\n'],
1428+ stdout=subprocess.PIPE, stdin=subprocess.PIPE)
1429+ client_medium = medium.SmartSimplePipesClientMedium(
1430+ p.stdout, p.stdin, 'base')
1431+ client_medium._accept_bytes('abc\n')
1432+ p.wait()
1433+ # Even though the child process is dead, flush seems to be a no-op.
1434+ client_medium._flush()
1435+
1436+ def test_simple_pipes__read_bytes_pipe_closed(self):
1437+ child_read, client_write = create_file_pipes()
1438+ client_medium = medium.SmartSimplePipesClientMedium(
1439+ child_read, client_write, 'base')
1440+ client_medium._accept_bytes('abc\n')
1441+ client_write.close()
1442+ self.assertEqual('abc\n', client_medium._read_bytes(4))
1443+ self.assertEqual('', client_medium._read_bytes(4))
1444+
1445+ def test_simple_pipes__read_bytes_subprocess_closed(self):
1446+ p = subprocess.Popen([sys.executable, '-c',
1447+ 'import sys\n'
1448+ 'if sys.platform == "win32":\n'
1449+ ' import msvcrt, os\n'
1450+ ' msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)\n'
1451+ ' msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY)\n'
1452+ 'sys.stdout.write(sys.stdin.read(4))\n'
1453+ 'sys.stdout.close()\n'],
1454+ stdout=subprocess.PIPE, stdin=subprocess.PIPE)
1455+ client_medium = medium.SmartSimplePipesClientMedium(
1456+ p.stdout, p.stdin, 'base')
1457+ client_medium._accept_bytes('abc\n')
1458+ p.wait()
1459+ self.assertEqual('abc\n', client_medium._read_bytes(4))
1460+ self.assertEqual('', client_medium._read_bytes(4))
1461+
1462 def test_simple_pipes_client_disconnect_does_nothing(self):
1463 # calling disconnect does nothing.
1464 input = StringIO()
1465@@ -556,6 +714,28 @@
1466 request.finished_reading()
1467 self.assertRaises(errors.ReadingCompleted, request.read_bytes, None)
1468
1469+ def test_reset(self):
1470+ server_sock, client_sock = portable_socket_pair()
1471+ # TODO: Use SmartClientAlreadyConnectedSocketMedium for the versions of
1472+ # bzr where it exists.
1473+ client_medium = medium.SmartTCPClientMedium(None, None, None)
1474+ client_medium._socket = client_sock
1475+ client_medium._connected = True
1476+ req = client_medium.get_request()
1477+ self.assertRaises(errors.TooManyConcurrentRequests,
1478+ client_medium.get_request)
1479+ client_medium.reset()
1480+ # The stream should be reset, marked as disconnected, though ready for
1481+ # us to make a new request
1482+ self.assertFalse(client_medium._connected)
1483+ self.assertIs(None, client_medium._socket)
1484+ try:
1485+ self.assertEqual('', client_sock.recv(1))
1486+ except socket.error, e:
1487+ if e.errno not in (errno.EBADF,):
1488+ raise
1489+ req = client_medium.get_request()
1490+
1491
1492 class RemoteTransportTests(TestCaseWithSmartMedium):
1493
1494@@ -609,20 +789,6 @@
1495 super(TestSmartServerStreamMedium, self).setUp()
1496 self._captureVar('BZR_NO_SMART_VFS', None)
1497
1498- def portable_socket_pair(self):
1499- """Return a pair of TCP sockets connected to each other.
1500-
1501- Unlike socket.socketpair, this should work on Windows.
1502- """
1503- listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1504- listen_sock.bind(('127.0.0.1', 0))
1505- listen_sock.listen(1)
1506- client_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1507- client_sock.connect(listen_sock.getsockname())
1508- server_sock, addr = listen_sock.accept()
1509- listen_sock.close()
1510- return server_sock, client_sock
1511-
1512 def test_smart_query_version(self):
1513 """Feed a canned query version to a server"""
1514 # wire-to-wire, using the whole stack
1515@@ -687,7 +853,7 @@
1516
1517 def test_socket_stream_with_bulk_data(self):
1518 sample_request_bytes = 'command\n9\nbulk datadone\n'
1519- server_sock, client_sock = self.portable_socket_pair()
1520+ server_sock, client_sock = portable_socket_pair()
1521 server = medium.SmartServerSocketStreamMedium(
1522 server_sock, None)
1523 sample_protocol = SampleRequest(expected_bytes=sample_request_bytes)
1524@@ -706,7 +872,7 @@
1525 self.assertTrue(server.finished)
1526
1527 def test_socket_stream_shutdown_detection(self):
1528- server_sock, client_sock = self.portable_socket_pair()
1529+ server_sock, client_sock = portable_socket_pair()
1530 client_sock.close()
1531 server = medium.SmartServerSocketStreamMedium(
1532 server_sock, None)
1533@@ -726,7 +892,7 @@
1534 rest_of_request_bytes = 'lo\n'
1535 expected_response = (
1536 protocol.RESPONSE_VERSION_TWO + 'success\nok\x012\n')
1537- server_sock, client_sock = self.portable_socket_pair()
1538+ server_sock, client_sock = portable_socket_pair()
1539 server = medium.SmartServerSocketStreamMedium(
1540 server_sock, None)
1541 client_sock.sendall(incomplete_request_bytes)
1542@@ -802,7 +968,7 @@
1543 # _serve_one_request should still process both of them as if they had
1544 # been received separately.
1545 sample_request_bytes = 'command\n'
1546- server_sock, client_sock = self.portable_socket_pair()
1547+ server_sock, client_sock = portable_socket_pair()
1548 server = medium.SmartServerSocketStreamMedium(
1549 server_sock, None)
1550 first_protocol = SampleRequest(expected_bytes=sample_request_bytes)
1551@@ -839,7 +1005,7 @@
1552 self.assertTrue(server.finished)
1553
1554 def test_socket_stream_error_handling(self):
1555- server_sock, client_sock = self.portable_socket_pair()
1556+ server_sock, client_sock = portable_socket_pair()
1557 server = medium.SmartServerSocketStreamMedium(
1558 server_sock, None)
1559 fake_protocol = ErrorRaisingProtocol(Exception('boom'))
1560@@ -860,7 +1026,7 @@
1561 self.assertEqual('', from_server.getvalue())
1562
1563 def test_socket_stream_keyboard_interrupt_handling(self):
1564- server_sock, client_sock = self.portable_socket_pair()
1565+ server_sock, client_sock = portable_socket_pair()
1566 server = medium.SmartServerSocketStreamMedium(
1567 server_sock, None)
1568 fake_protocol = ErrorRaisingProtocol(KeyboardInterrupt('boom'))
1569@@ -877,7 +1043,7 @@
1570 return server._build_protocol()
1571
1572 def build_protocol_socket(self, bytes):
1573- server_sock, client_sock = self.portable_socket_pair()
1574+ server_sock, client_sock = portable_socket_pair()
1575 server = medium.SmartServerSocketStreamMedium(
1576 server_sock, None)
1577 client_sock.sendall(bytes)
1578@@ -2778,6 +2944,33 @@
1579 'e', # end
1580 output.getvalue())
1581
1582+ def test_records_start_of_body_stream(self):
1583+ requester, output = self.make_client_encoder_and_output()
1584+ requester.set_headers({})
1585+ in_stream = [False]
1586+ def stream_checker():
1587+ self.assertTrue(requester.body_stream_started)
1588+ in_stream[0] = True
1589+ yield 'content'
1590+ flush_called = []
1591+ orig_flush = requester.flush
1592+ def tracked_flush():
1593+ flush_called.append(in_stream[0])
1594+ if in_stream[0]:
1595+ self.assertTrue(requester.body_stream_started)
1596+ else:
1597+ self.assertFalse(requester.body_stream_started)
1598+ return orig_flush()
1599+ requester.flush = tracked_flush
1600+ requester.call_with_body_stream(('one arg',), stream_checker())
1601+ self.assertEqual(
1602+ 'bzr message 3 (bzr 1.6)\n' # protocol version
1603+ '\x00\x00\x00\x02de' # headers
1604+ 's\x00\x00\x00\x0bl7:one arge' # args
1605+ 'b\x00\x00\x00\x07content' # body
1606+ 'e', output.getvalue())
1607+ self.assertEqual([False, True, True], flush_called)
1608+
1609
1610 class StubMediumRequest(object):
1611 """A stub medium request that tracks the number of times accept_bytes is
1612@@ -3214,6 +3407,195 @@
1613 # encoder.
1614
1615
1616+class Test_SmartClientRequest(tests.TestCase):
1617+
1618+ def make_client_with_failing_medium(self, fail_at_write=True, response=''):
1619+ response_io = StringIO(response)
1620+ output = StringIO()
1621+ vendor = FirstRejectedStringIOSSHVendor(response_io, output,
1622+ fail_at_write=fail_at_write)
1623+ client_medium = medium.SmartSSHClientMedium(
1624+ 'a host', 'a port', 'a user', 'a pass', 'base', vendor,
1625+ 'bzr')
1626+ smart_client = client._SmartClient(client_medium, headers={})
1627+ return output, vendor, smart_client
1628+
1629+ def make_response(self, args, body=None, body_stream=None):
1630+ response_io = StringIO()
1631+ response = _mod_request.SuccessfulSmartServerResponse(args, body=body,
1632+ body_stream=body_stream)
1633+ responder = protocol.ProtocolThreeResponder(response_io.write)
1634+ responder.send_response(response)
1635+ return response_io.getvalue()
1636+
1637+ def test__call_doesnt_retry_append(self):
1638+ response = self.make_response(('appended', '8'))
1639+ output, vendor, smart_client = self.make_client_with_failing_medium(
1640+ fail_at_write=False, response=response)
1641+ smart_request = client._SmartClientRequest(smart_client, 'append',
1642+ ('foo', ''), body='content\n')
1643+ self.assertRaises(errors.ConnectionReset, smart_request._call, 3)
1644+
1645+ def test__call_retries_get_bytes(self):
1646+ response = self.make_response(('ok',), 'content\n')
1647+ output, vendor, smart_client = self.make_client_with_failing_medium(
1648+ fail_at_write=False, response=response)
1649+ smart_request = client._SmartClientRequest(smart_client, 'get',
1650+ ('foo',))
1651+ response, response_handler = smart_request._call(3)
1652+ self.assertEqual(('ok',), response)
1653+ self.assertEqual('content\n', response_handler.read_body_bytes())
1654+
1655+ def test__call_noretry_get_bytes(self):
1656+ debug.debug_flags.add('noretry')
1657+ response = self.make_response(('ok',), 'content\n')
1658+ output, vendor, smart_client = self.make_client_with_failing_medium(
1659+ fail_at_write=False, response=response)
1660+ smart_request = client._SmartClientRequest(smart_client, 'get',
1661+ ('foo',))
1662+ self.assertRaises(errors.ConnectionReset, smart_request._call, 3)
1663+
1664+ def test__send_no_retry_pipes(self):
1665+ client_read, server_write = create_file_pipes()
1666+ server_read, client_write = create_file_pipes()
1667+ client_medium = medium.SmartSimplePipesClientMedium(client_read,
1668+ client_write, base='/')
1669+ smart_client = client._SmartClient(client_medium)
1670+ smart_request = client._SmartClientRequest(smart_client,
1671+ 'hello', ())
1672+ # Close the server side
1673+ server_read.close()
1674+ encoder, response_handler = smart_request._construct_protocol(3)
1675+ self.assertRaises(errors.ConnectionReset,
1676+ smart_request._send_no_retry, encoder)
1677+
1678+ def test__send_read_response_sockets(self):
1679+ listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1680+ listen_sock.bind(('127.0.0.1', 0))
1681+ listen_sock.listen(1)
1682+ host, port = listen_sock.getsockname()
1683+ client_medium = medium.SmartTCPClientMedium(host, port, '/')
1684+ client_medium._ensure_connection()
1685+ smart_client = client._SmartClient(client_medium)
1686+ smart_request = client._SmartClientRequest(smart_client, 'hello', ())
1687+ # Accept the connection, but don't actually talk to the client.
1688+ server_sock, _ = listen_sock.accept()
1689+ server_sock.close()
1690+ # Sockets buffer and don't really notice that the server has closed the
1691+ # connection until we try to read again.
1692+ handler = smart_request._send(3)
1693+ self.assertRaises(errors.ConnectionReset,
1694+ handler.read_response_tuple, expect_body=False)
1695+
1696+ def test__send_retries_on_write(self):
1697+ output, vendor, smart_client = self.make_client_with_failing_medium()
1698+ smart_request = client._SmartClientRequest(smart_client, 'hello', ())
1699+ handler = smart_request._send(3)
1700+ self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol
1701+ '\x00\x00\x00\x02de' # empty headers
1702+ 's\x00\x00\x00\tl5:helloee',
1703+ output.getvalue())
1704+ self.assertEqual(
1705+ [('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
1706+ ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
1707+ ('close',),
1708+ ('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
1709+ ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
1710+ ],
1711+ vendor.calls)
1712+
1713+ def test__send_doesnt_retry_read_failure(self):
1714+ output, vendor, smart_client = self.make_client_with_failing_medium(
1715+ fail_at_write=False)
1716+ smart_request = client._SmartClientRequest(smart_client, 'hello', ())
1717+ handler = smart_request._send(3)
1718+ self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol
1719+ '\x00\x00\x00\x02de' # empty headers
1720+ 's\x00\x00\x00\tl5:helloee',
1721+ output.getvalue())
1722+ self.assertEqual(
1723+ [('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
1724+ ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
1725+ ],
1726+ vendor.calls)
1727+ self.assertRaises(errors.ConnectionReset, handler.read_response_tuple)
1728+
1729+ def test__send_request_retries_body_stream_if_not_started(self):
1730+ output, vendor, smart_client = self.make_client_with_failing_medium()
1731+ smart_request = client._SmartClientRequest(smart_client, 'hello', (),
1732+ body_stream=['a', 'b'])
1733+ response_handler = smart_request._send(3)
1734+ # We connect, get disconnected, and notice before consuming the stream,
1735+ # so we try again one time and succeed.
1736+ self.assertEqual(
1737+ [('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
1738+ ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
1739+ ('close',),
1740+ ('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
1741+ ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
1742+ ],
1743+ vendor.calls)
1744+ self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol
1745+ '\x00\x00\x00\x02de' # empty headers
1746+ 's\x00\x00\x00\tl5:helloe'
1747+ 'b\x00\x00\x00\x01a'
1748+ 'b\x00\x00\x00\x01b'
1749+ 'e',
1750+ output.getvalue())
1751+
1752+ def test__send_request_stops_if_body_started(self):
1753+ # We intentionally use the python StringIO so that we can subclass it.
1754+ from StringIO import StringIO
1755+ response = StringIO()
1756+
1757+ class FailAfterFirstWrite(StringIO):
1758+ """Allow one 'write' call to pass, fail the rest"""
1759+ def __init__(self):
1760+ StringIO.__init__(self)
1761+ self._first = True
1762+
1763+ def write(self, s):
1764+ if self._first:
1765+ self._first = False
1766+ return StringIO.write(self, s)
1767+ raise IOError(errno.EINVAL, 'invalid file handle')
1768+ output = FailAfterFirstWrite()
1769+
1770+ vendor = FirstRejectedStringIOSSHVendor(response, output,
1771+ fail_at_write=False)
1772+ client_medium = medium.SmartSSHClientMedium(
1773+ 'a host', 'a port', 'a user', 'a pass', 'base', vendor,
1774+ 'bzr')
1775+ smart_client = client._SmartClient(client_medium, headers={})
1776+ smart_request = client._SmartClientRequest(smart_client, 'hello', (),
1777+ body_stream=['a', 'b'])
1778+ self.assertRaises(errors.ConnectionReset, smart_request._send, 3)
1779+ # We connect, and manage to get to the point that we start consuming
1780+ # the body stream. The next write fails, so we just stop.
1781+ self.assertEqual(
1782+ [('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
1783+ ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
1784+ ('close',),
1785+ ],
1786+ vendor.calls)
1787+ self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol
1788+ '\x00\x00\x00\x02de' # empty headers
1789+ 's\x00\x00\x00\tl5:helloe',
1790+ output.getvalue())
1791+
1792+ def test__send_disabled_retry(self):
1793+ debug.debug_flags.add('noretry')
1794+ output, vendor, smart_client = self.make_client_with_failing_medium()
1795+ smart_request = client._SmartClientRequest(smart_client, 'hello', ())
1796+ self.assertRaises(errors.ConnectionReset, smart_request._send, 3)
1797+ self.assertEqual(
1798+ [('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
1799+ ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
1800+ ('close',),
1801+ ],
1802+ vendor.calls)
1803+
1804+
1805 class LengthPrefixedBodyDecoder(tests.TestCase):
1806
1807 # XXX: TODO: make accept_reading_trailer invoke translate_response or
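
For readers skimming the tests in Test_SmartClientRequest, the retry rule they exercise can be sketched roughly as follows. This is a minimal illustration, not bzrlib's implementation: `send_with_retry`, `FlakyChannel`, and the local `ConnectionReset` class are hypothetical names invented here, and the sketch assumes a single reconnect attempt, which is what the `vendor.calls` assertions in the diff suggest.

```python
import errno


class ConnectionReset(Exception):
    """Hypothetical stand-in for bzrlib.errors.ConnectionReset."""


class FlakyChannel(object):
    """Hypothetical channel whose writes fail after `good_writes` successes."""

    def __init__(self, good_writes):
        self.good_writes = good_writes
        self.written = []

    def write(self, data):
        if self.good_writes <= 0:
            raise IOError(errno.EPIPE, 'broken pipe')
        self.good_writes -= 1
        self.written.append(data)


def send_with_retry(connect, header, body_stream=None, retry=True):
    """Send a header and optional body chunks, reconnecting once if safe.

    `connect` is a hypothetical callable that returns a fresh channel.
    Returns the channel that accepted the request and the attempt count.
    """
    attempts = 0
    while True:
        channel = connect()
        attempts += 1
        body_started = False
        try:
            channel.write(header)
            if body_stream is not None:
                for chunk in body_stream:
                    # Once a chunk has been pulled from the stream it cannot
                    # be replayed, so a later failure must not be retried.
                    body_started = True
                    channel.write(chunk)
            return channel, attempts
        except IOError:
            # Give up if retries are disabled, the body stream was already
            # consumed, or the one allowed reconnect has been used.
            if not retry or body_started or attempts >= 2:
                raise ConnectionReset('connection lost while sending')
```

With a channel that rejects its first write, the call reconnects once and succeeds. Once a body chunk has been pulled from the stream, or when retrying is disabled (compare the 'noretry' debug flag used in the tests), the failure is reported instead of retried.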
