Merge lp:~jameinel/bzr/2.1-client-stream-started-819604 into lp:bzr/2.1

Proposed by John A Meinel
Status: Work in progress
Proposed branch: lp:~jameinel/bzr/2.1-client-stream-started-819604
Merge into: lp:bzr/2.1
Prerequisite: lp:~jameinel/bzr/2.1-client-reconnect-819604
Diff against target: 175 lines (+90/-9)
4 files modified
bzrlib/smart/client.py (+5/-1)
bzrlib/smart/medium.py (+1/-2)
bzrlib/smart/protocol.py (+5/-0)
bzrlib/tests/test_smart_transport.py (+79/-6)
To merge this branch: bzr merge lp:~jameinel/bzr/2.1-client-stream-started-819604
Reviewer: bzr-core (status: Pending)
Review via email: mp+78631@code.launchpad.net

Commit message

Bug #819604, allow reconnecting with a body stream as long as we haven't touched the stream yet.

Description of the change

This layers on top of my 'client reconnect' patch for the case where we have a body stream.

1) It adds a flush just before we process the first entry of the body stream. This adds an extra write call to 'bzr push'. However, we already write after every entry in the stream, and the first entry of the stream is actually very small. (In bzrlib.repository._stream_to_byte_stream(), the first object that gets flushed is 'pack_writer.begin()', which is just the pack header.)
I don't imagine writing out the ProtocolThree headers will add much real-world overhead.
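As a minimal sketch of that ordering (hypothetical class and method names, not bzrlib's actual encoder API): flushing the buffered headers before touching the stream means a dead connection surfaces as a connection-reset error while a retry is still safe.

```python
# Hypothetical sketch, NOT bzrlib's real encoder: flush buffered request
# headers before consuming the body stream, so a dead connection fails
# while the stream is still untouched and a retry remains possible.
class StreamEncoder:
    def __init__(self, write_fn):
        self._write = write_fn          # e.g. a socket/pipe write
        self._buffer = []
        self.body_stream_started = False

    def _push(self, data):
        self._buffer.append(data)

    def flush(self):
        if self._buffer:
            # This is where a ConnectionReset would actually surface.
            self._write(''.join(self._buffer))
            self._buffer = []

    def call_with_body_stream(self, args, stream):
        self._push('args:%r\n' % (args,))
        # Provoke any connection failure *before* consuming the stream.
        self.flush()
        self.body_stream_started = True
        for part in stream:
            self._push('chunk:%s\n' % part)
            self.flush()  # we already write after every stream entry
```

The point is only the ordering: flush(), then set body_stream_started, then iterate.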

2) It tracks once we start consuming body_stream, so that we don't do the auto-retry after that point.

I felt it was clear and clean enough to do it this way. We could arguably do a little better higher up the stack, but not much, as I described in the bug. Specifically, RemoteRepository.insert_stream is the only caller that passes a body_stream, but it, in turn, only receives a stream of records to pass along. So if we've started passing records, we can't retry anyway; the retry logic would have to go all the way up to the Fetch code.
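The retry decision described above can be sketched roughly like this (simplified, hypothetical names; the real code lives in bzrlib/smart/client.py and checks 'noretry' in the debug flags plus the encoder's body_stream_started attribute):

```python
# Illustrative sketch of the retry guard, not the real bzrlib API.
def send_with_retry(send_once, reset_medium, body_stream=None,
                    encoder_state=None, noretry=False):
    """Send a request, retrying once on connection reset.

    encoder_state.body_stream_started records whether any stream entry
    was consumed before the connection died.
    """
    try:
        return send_once()
    except ConnectionResetError:
        # Connection is dead, so close our end of it.
        reset_medium()
        if noretry or (body_stream is not None
                       and encoder_state.body_stream_started):
            # A partially consumed stream cannot be restarted; give up.
            raise
        return send_once()  # one retry on a fresh connection
```

A stream that has not been touched yet is indistinguishable from a plain request for retry purposes, which is what makes this safe.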

Revision history for this message
Andrew Bennetts (spiv) wrote :

[Just a comment, not a full review]

> 1) It adds a flush just before we process the first entry of the body
> stream. This adds an extra write call to 'bzr push'. However, we write
> after every entry in the stream, and the first entry of the stream is
> actually very small. (bzrlib.repository._stream_to_byte_stream() the
> first object that gets flushed is 'pack_writer.begin()', which is just
> the pack header.)
> I don't imagine writing out the ProtocolThree headers will be adding
> tons of real world overhead.

You may be right, but please check. Use the netem stuff under linux to
add latency to loopback (I like 500ms), it's documented on the wiki, and
try some pushes with and without.

> 2) It tracks once we start consuming body_stream, so that we don't do
> the auto-retry after that point.
>
> I felt it was clear and clean enough to do it this way. We could

The layering here sounds reasonable to me.

-Andrew.

Revision history for this message
John A Meinel (jameinel) wrote :


On 10/10/2011 01:37 AM, Andrew Bennetts wrote:
> [Just a comment, not a full review]
>
>> 1) It adds a flush just before we process the first entry of the body
>> stream. This adds an extra write call to 'bzr push'. However, we write
>> after every entry in the stream, and the first entry of the stream is
>> actually very small. (bzrlib.repository._stream_to_byte_stream() the
>> first object that gets flushed is 'pack_writer.begin()', which is just
>> the pack header.)
>> I don't imagine writing out the ProtocolThree headers will be adding
>> tons of real world overhead.
>
> You may be right, but please check. Use the netem stuff under linux to
> add latency to loopback (I like 500ms), it's documented on the wiki, and
> try some pushes with and without.

Short version: this can cause us to send an extra packet, but not always.

I plugged in netem, adding a 500ms delay on the loopback.

I then created a shared repository, branched loggerhead -r -10 into it, and
then pushed to it from the client. I added debugging code that prints
'flushing' whenever we hit that '.flush()'[1] line.

bzr.dev 21.627 21.655 21.645
flush() 22.587 22.575 21.656 21.640

It looks like there are 2 tiers of times possible, one with the extra
round trip, and one without, but the extra round trip is not guaranteed.
(Maybe it depends on whether we get a context switch between the
.flush() call and consuming the first hunk from the iterator.)

I ran 3-4 times before I started this list, and all of them were at the
21.6 tier, but the next 6 runs were split about evenly between the 21.6
and 22.5 tiers.

If I change it so that we just have an empty shared repository, and we
push all of loggerhead's history, I get:

bzr.dev 54.682 54.713 54.742 54.683
flush() 55.052 55.083 55.403 55.043

What is strange is that it is consistently slower for a 'write
everything' call, but it isn't even 500ms slower.

Note that without netem slowing things down, the full push times are:

bzr.dev 3.923 3.975 3.945
flush() 3.974 3.954 3.967

So certainly well within any noise margin.

Note that 'flush()' really just means 'actually write this to the
socket/pipe/etc'; it isn't actually requesting a round trip. Though I
think we might have NODELAY set; if you look at the code, it is going
to write the actual header/content bytes to the socket within a
millisecond or so.

Arguably, while we are sending the bulk data stream we should be
turning NODELAY off.
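For reference, toggling Nagle's algorithm on a TCP socket looks like this in Python (a generic illustration; bzrlib's media wrap their sockets differently):

```python
# Generic illustration of toggling TCP_NODELAY (Nagle's algorithm),
# not bzrlib code.
import socket

def set_nodelay(sock, enabled):
    # TCP_NODELAY=1 disables Nagle, so small writes go out immediately;
    # 0 re-enables coalescing, which suits bulk data streaming.
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY,
                    1 if enabled else 0)
    return sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY)
```

With NODELAY set, the flushed protocol headers would go on the wire in their own small packet, which matches the "extra packet" observation above.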

>
>> 2) It tracks once we start consuming body_stream, so that we don't do
>> the auto-retry after that point.
>>
>> I felt it was clear and clean enough to do it this way. We could
>
> The layering here sounds reasonable to me.
>
> -Andrew.
>
>

We could leave in the check for body_stream_started without adding the
flush(), though the chances of triggering it before the stream starts
are virtually nil, because we buffer up to 1MB of data before
auto-flushing.
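That buffering behaviour can be sketched as follows (the class name and the way the 1MB threshold is applied are illustrative of the description above, not bzrlib's actual writer):

```python
# Illustrative sketch of a write buffer with a ~1MB auto-flush
# threshold; without an explicit flush(), small protocol headers
# would just sit in the buffer until the stream data piles up.
class BufferingWriter:
    def __init__(self, write_fn, max_buffered=1024 * 1024):
        self._write = write_fn
        self._max = max_buffered
        self._chunks = []
        self._buffered = 0

    def write(self, data):
        self._chunks.append(data)
        self._buffered += len(data)
        if self._buffered >= self._max:
            self.flush()  # auto-flush once the threshold is reached

    def flush(self):
        if self._chunks:
            self._write(''.join(self._chunks))
            self._chunks = []
            self._buffered = 0
```

This is why the explicit flush() is what actually provokes an early ConnectionReset: the headers alone are far below the auto-flush threshold.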

[1] I actually see 'flushing' displayed 2 times. Probably the other one
is the 'probe' insert? (The empty one used to determine whether the
remote side supports the API.)

John
=:->


Preview Diff

1=== modified file 'bzrlib/smart/client.py'
2--- bzrlib/smart/client.py 2011-10-10 12:59:12 +0000
3+++ bzrlib/smart/client.py 2011-10-10 12:59:12 +0000
4@@ -239,9 +239,13 @@
5 # Connection is dead, so close our end of it.
6 self.client._medium.reset()
7 if (('noretry' in debug.debug_flags)
8- or self.body_stream is not None):
9+ or (self.body_stream is not None
10+ and encoder.body_stream_started)):
11 # We can't restart a body_stream that has been partially
12 # consumed, so we don't retry.
13+ # Note: We don't have to worry about
14+ # SmartClientRequestProtocolOne or Two, because they don't
15+ # support client-side body streams.
16 raise
17 trace.warning('ConnectionReset calling %r, retrying'
18 % (self.method,))
19
20=== modified file 'bzrlib/smart/medium.py'
21--- bzrlib/smart/medium.py 2011-10-10 12:59:12 +0000
22+++ bzrlib/smart/medium.py 2011-10-10 12:59:12 +0000
23@@ -739,8 +739,7 @@
24 except IOError, e:
25 if e.errno in (errno.EINVAL, errno.EPIPE):
26 raise errors.ConnectionReset(
27- "Error trying to write to subprocess:\n%s"
28- % (e,))
29+ "Error trying to write to subprocess:\n%s" % (e,))
30 raise
31 self._report_activity(len(bytes), 'write')
32
33
34=== modified file 'bzrlib/smart/protocol.py'
35--- bzrlib/smart/protocol.py 2011-10-10 12:59:12 +0000
36+++ bzrlib/smart/protocol.py 2011-10-10 12:59:12 +0000
37@@ -1282,6 +1282,7 @@
38 _ProtocolThreeEncoder.__init__(self, medium_request.accept_bytes)
39 self._medium_request = medium_request
40 self._headers = {}
41+ self.body_stream_started = None
42
43 def set_headers(self, headers):
44 self._headers = headers.copy()
45@@ -1347,6 +1348,7 @@
46 if path is not None:
47 mutter(' (to %s)', path)
48 self._request_start_time = osutils.timer_func()
49+ self.body_stream_started = False
50 self._write_protocol_version()
51 self._write_headers(self._headers)
52 self._write_structure(args)
53@@ -1354,6 +1356,9 @@
54 # have finished sending the stream. We would notice at the end
55 # anyway, but if the medium can deliver it early then it's good
56 # to short-circuit the whole request...
57+ # Provoke any ConnectionReset failures before we start the body stream.
58+ self.flush()
59+ self.body_stream_started = True
60 for exc_info, part in _iter_with_errors(stream):
61 if exc_info is not None:
62 # Iterating the stream failed. Cleanly abort the request.
63
64=== modified file 'bzrlib/tests/test_smart_transport.py'
65--- bzrlib/tests/test_smart_transport.py 2011-10-10 12:59:12 +0000
66+++ bzrlib/tests/test_smart_transport.py 2011-10-10 12:59:12 +0000
67@@ -2944,6 +2944,33 @@
68 'e', # end
69 output.getvalue())
70
71+ def test_records_start_of_body_stream(self):
72+ requester, output = self.make_client_encoder_and_output()
73+ requester.set_headers({})
74+ in_stream = [False]
75+ def stream_checker():
76+ self.assertTrue(requester.body_stream_started)
77+ in_stream[0] = True
78+ yield 'content'
79+ flush_called = []
80+ orig_flush = requester.flush
81+ def tracked_flush():
82+ flush_called.append(in_stream[0])
83+ if in_stream[0]:
84+ self.assertTrue(requester.body_stream_started)
85+ else:
86+ self.assertFalse(requester.body_stream_started)
87+ return orig_flush()
88+ requester.flush = tracked_flush
89+ requester.call_with_body_stream(('one arg',), stream_checker())
90+ self.assertEqual(
91+ 'bzr message 3 (bzr 1.6)\n' # protocol version
92+ '\x00\x00\x00\x02de' # headers
93+ 's\x00\x00\x00\x0bl7:one arge' # args
94+ 'b\x00\x00\x00\x07content' # body
95+ 'e', output.getvalue())
96+ self.assertEqual([False, True, True], flush_called)
97+
98
99 class StubMediumRequest(object):
100 """A stub medium request that tracks the number of times accept_bytes is
101@@ -3458,22 +3485,68 @@
102 vendor.calls)
103 self.assertRaises(errors.ConnectionReset, handler.read_response_tuple)
104
105- def test__send_doesnt_retry_body_stream(self):
106- # We don't know how much of body_stream would get iterated as part of
107- # _send before it failed to actually send the request, so we
108- # just always fail in this condition.
109+ def test__send_request_retries_body_stream_if_not_started(self):
110 output, vendor, smart_client = self.make_client_with_failing_medium()
111 smart_request = client._SmartClientRequest(smart_client, 'hello', (),
112 body_stream=['a', 'b'])
113+ response_handler = smart_request._send(3)
114+ # We connect, get disconnected, and notice before consuming the stream,
115+ # so we try again one time and succeed.
116+ self.assertEqual(
117+ [('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
118+ ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
119+ ('close',),
120+ ('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
121+ ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
122+ ],
123+ vendor.calls)
124+ self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol
125+ '\x00\x00\x00\x02de' # empty headers
126+ 's\x00\x00\x00\tl5:helloe'
127+ 'b\x00\x00\x00\x01a'
128+ 'b\x00\x00\x00\x01b'
129+ 'e',
130+ output.getvalue())
131+
132+ def test__send_request_stops_if_body_started(self):
133+ # We intentionally use the python StringIO so that we can subclass it.
134+ from StringIO import StringIO
135+ response = StringIO()
136+
137+ class FailAfterFirstWrite(StringIO):
138+ """Allow one 'write' call to pass, fail the rest"""
139+ def __init__(self):
140+ StringIO.__init__(self)
141+ self._first = True
142+
143+ def write(self, s):
144+ if self._first:
145+ self._first = False
146+ return StringIO.write(self, s)
147+ raise IOError(errno.EINVAL, 'invalid file handle')
148+ output = FailAfterFirstWrite()
149+
150+ vendor = FirstRejectedStringIOSSHVendor(response, output,
151+ fail_at_write=False)
152+ client_medium = medium.SmartSSHClientMedium(
153+ 'a host', 'a port', 'a user', 'a pass', 'base', vendor,
154+ 'bzr')
155+ smart_client = client._SmartClient(client_medium, headers={})
156+ smart_request = client._SmartClientRequest(smart_client, 'hello', (),
157+ body_stream=['a', 'b'])
158 self.assertRaises(errors.ConnectionReset, smart_request._send, 3)
159- # We got one connect, but it fails, so we disconnect, but we don't
160- # retry it
161+ # We connect, and manage to get to the point that we start consuming
162+ # the body stream. The next write fails, so we just stop.
163 self.assertEqual(
164 [('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
165 ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
166 ('close',),
167 ],
168 vendor.calls)
169+ self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol
170+ '\x00\x00\x00\x02de' # empty headers
171+ 's\x00\x00\x00\tl5:helloe',
172+ output.getvalue())
173
174 def test__send_disabled_retry(self):
175 debug.debug_flags.add('noretry')
