Merge lp:~jameinel/bzr/2.1-client-stream-started-819604 into lp:bzr/2.1

Proposed by John A Meinel
Status: Work in progress
Proposed branch: lp:~jameinel/bzr/2.1-client-stream-started-819604
Merge into: lp:bzr/2.1
Prerequisite: lp:~jameinel/bzr/2.1-client-reconnect-819604
Diff against target: 175 lines (+90/-9)
4 files modified
bzrlib/smart/client.py (+5/-1)
bzrlib/smart/medium.py (+1/-2)
bzrlib/smart/protocol.py (+5/-0)
bzrlib/tests/test_smart_transport.py (+79/-6)
To merge this branch: bzr merge lp:~jameinel/bzr/2.1-client-stream-started-819604
Reviewer: bzr-core (Pending)
Review via email: mp+78631@code.launchpad.net

Commit message

Bug #819604, allow reconnecting with a body stream as long as we haven't touched the stream yet.

Description of the change

This layers on top of my 'client reconnect' patch for the case where we have a body stream.

1) It adds a flush just before we process the first entry of the body stream. This adds an extra write call to 'bzr push'. However, we already write after every entry in the stream, and the first entry of the stream is actually very small. (In bzrlib.repository._stream_to_byte_stream(), the first object that gets flushed is 'pack_writer.begin()', which is just the pack header.)
I don't imagine writing out the ProtocolThree headers will add much real-world overhead.

2) It tracks once we start consuming body_stream, so that we don't do the auto-retry after that point.

I felt it was clear and clean enough to do it this way. We could arguably do a little better higher up the stack, but not much, as I described in the bug. Specifically, RemoteRepository.insert_stream is the only caller that passes a body_stream, but it in turn only receives a stream of records to pass along; once we have started passing records, we can't retry anyway. So the retry logic would have to go all the way up to the Fetch code.
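
The shape of that guard can be pictured with a small stand-alone sketch. The class and function names below are invented for illustration (the real code lives in bzrlib's _SmartClientRequest and the ProtocolThree encoder); only the `body_stream_started` flag and the flush-before-stream ordering mirror the patch:

```python
class ConnectionReset(Exception):
    """Stand-in for bzrlib.errors.ConnectionReset."""


class StreamSender(object):
    """Toy encoder: flush headers first, then mark the body stream started."""

    def __init__(self, fail_first_flush=False):
        self.body_stream_started = False
        self._fail_flush = fail_first_flush
        self.sent = []

    def flush(self):
        # Simulate the connection dying while the headers are being written.
        if self._fail_flush:
            self._fail_flush = False
            raise ConnectionReset('lost connection')

    def send(self, body_stream):
        # Provoke any ConnectionReset *before* touching the stream;
        # this mirrors the flush() the patch adds to call_with_body_stream.
        self.flush()
        self.body_stream_started = True
        for part in body_stream:
            self.sent.append(part)


def send_with_retry(make_sender, body_stream):
    sender = make_sender()
    try:
        sender.send(body_stream)
    except ConnectionReset:
        # Safe to retry only if none of the stream was consumed.
        if sender.body_stream_started:
            raise
        sender = make_sender()
        sender.send(body_stream)
    return sender
```

For example, if the first connection dies during the header flush and the reconnect succeeds, the whole stream still goes out: `senders = iter([StreamSender(fail_first_flush=True), StreamSender()])`, then `send_with_retry(lambda: next(senders), ['a', 'b'])` delivers both parts.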

Revision history for this message
Andrew Bennetts (spiv) wrote :

[Just a comment, not a full review]

> 1) It adds a flush just before we process the first entry of the body
> stream. This adds an extra write call to 'bzr push'. However, we write
> after every entry in the stream, and the first entry of the stream is
> actually very small. (bzrlib.repository._stream_to_byte_stream() the
> first object that gets flushed is 'pack_writer.begin()', which is just
> the pack header.)
> I don't imagine writing out the ProtocolThree headers will be adding
> tons of real world overhead.

You may be right, but please check. Use the netem stuff under linux to
add latency to loopback (I like 500ms), it's documented on the wiki, and
try some pushes with and without.

> 2) It tracks once we start consuming body_stream, so that we don't do
> the auto-retry after that point.
>
> I felt it was clear and clean enough to do it this way. We could

The layering here sounds reasonable to me.

-Andrew.

Revision history for this message
John A Meinel (jameinel) wrote :


On 10/10/2011 01:37 AM, Andrew Bennetts wrote:
> [Just a comment, not a full review]
>
>> 1) It adds a flush just before we process the first entry of the body
>> stream. This adds an extra write call to 'bzr push'. However, we write
>> after every entry in the stream, and the first entry of the stream is
>> actually very small. (bzrlib.repository._stream_to_byte_stream() the
>> first object that gets flushed is 'pack_writer.begin()', which is just
>> the pack header.)
>> I don't imagine writing out the ProtocolThree headers will be adding
>> tons of real world overhead.
>
> You may be right, but please check. Use the netem stuff under linux to
> add latency to loopback (I like 500ms), it's documented on the wiki, and
> try some pushes with and without.

Short version: This can cause us to send an extra packet, but we don't
always.

I plugged in netem, adding a 500ms delay on the loopback.
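
(For the record, the netem setup is roughly the following; this is standard `tc` usage as documented in the tc-netem man page, and requires root:)

```shell
# Add 500ms of latency to every packet on the loopback interface.
tc qdisc add dev lo root netem delay 500ms

# Inspect the active queueing discipline to confirm it took effect.
tc qdisc show dev lo

# Remove the delay again once benchmarking is done.
tc qdisc del dev lo root
```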

I then created a shared repository, branched loggerhead -r -10 into it, and
then pushed to it from the client. I added debugging code that prints
'flushing' whenever we hit that '.flush()'[1] line.

bzr.dev 21.627 21.655 21.645
flush() 22.587 22.575 21.656 21.640

It looks like there are 2 tiers of times possible, one with the extra
round trip, and one without, but the extra round trip is not guaranteed.
(Maybe it depends on whether we get a context switch between the
.flush() call and consuming the first hunk from the iterator.)

I ran 3-4 times before I started this list, and all of them were at 21.6
tier, but the next 6 runs were about even between 21.6 and 22.5.

If I change it so that we just have an empty shared repository, and we
push all of loggerhead's history, I get:

bzr.dev 54.682 54.713 54.742 54.683
flush() 55.052 55.083 55.403 55.043

What is strange is that the full 'write everything' push is consistently
slower, but not by anywhere near 500ms.

Note that without netem slowing things down, the full push times are:

bzr.dev 3.923 3.975 3.945
flush() 3.974 3.954 3.967

So certainly well within any noise margin.

Note that 'flush()' really just means 'actually write this to the
socket/pipe/etc.'; it doesn't request a round trip. Though I think we
may have NODELAY set, in which case, if you look at the code, the actual
header/content bytes will hit the socket within a millisecond or so.

Arguably, while we are sending the bulk data stream, we should turn
NODELAY off.
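
(For what it's worth, toggling Nagle from Python is a one-liner on the socket object. This is a generic standard-library sketch, not the bzrlib medium code:)

```python
import socket

def set_nodelay(sock, enabled):
    # TCP_NODELAY=1 disables Nagle's algorithm, so small writes go out
    # immediately; TCP_NODELAY=0 lets the kernel coalesce them, which is
    # what we'd want while streaming bulk data.
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY,
                    1 if enabled else 0)

# Flip NODELAY off for the bulk-body phase, back on afterwards.
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
set_nodelay(sock, False)   # let the kernel batch the body stream
assert sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY) == 0
set_nodelay(sock, True)    # back to low-latency request/response
assert sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY) != 0
sock.close()
```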

>
>> 2) It tracks once we start consuming body_stream, so that we don't do
>> the auto-retry after that point.
>>
>> I felt it was clear and clean enough to do it this way. We could
>
> The layering here sounds reasonable to me.
>
> -Andrew.
>
>

We could keep the check for body_stream_started without adding the
flush(), though the chances of triggering it before the stream starts are
virtually nil, because we buffer up to 1MB of data before auto-flushing.

[1] I actually see 'flushing' displayed two times. Probably that is the
'probe' insert? (The empty one used to determine whether the remote side
supports the api.)

John
=:->


Preview Diff

=== modified file 'bzrlib/smart/client.py'
--- bzrlib/smart/client.py 2011-10-10 12:59:12 +0000
+++ bzrlib/smart/client.py 2011-10-10 12:59:12 +0000
@@ -239,9 +239,13 @@
             # Connection is dead, so close our end of it.
             self.client._medium.reset()
             if (('noretry' in debug.debug_flags)
-                or self.body_stream is not None):
+                or (self.body_stream is not None
+                    and encoder.body_stream_started)):
                 # We can't restart a body_stream that has been partially
                 # consumed, so we don't retry.
+                # Note: We don't have to worry about
+                # SmartClientRequestProtocolOne or Two, because they don't
+                # support client-side body streams.
                 raise
             trace.warning('ConnectionReset calling %r, retrying'
                           % (self.method,))
=== modified file 'bzrlib/smart/medium.py'
--- bzrlib/smart/medium.py 2011-10-10 12:59:12 +0000
+++ bzrlib/smart/medium.py 2011-10-10 12:59:12 +0000
@@ -739,8 +739,7 @@
         except IOError, e:
             if e.errno in (errno.EINVAL, errno.EPIPE):
                 raise errors.ConnectionReset(
-                    "Error trying to write to subprocess:\n%s"
-                    % (e,))
+                    "Error trying to write to subprocess:\n%s" % (e,))
             raise
         self._report_activity(len(bytes), 'write')
 
=== modified file 'bzrlib/smart/protocol.py'
--- bzrlib/smart/protocol.py 2011-10-10 12:59:12 +0000
+++ bzrlib/smart/protocol.py 2011-10-10 12:59:12 +0000
@@ -1282,6 +1282,7 @@
         _ProtocolThreeEncoder.__init__(self, medium_request.accept_bytes)
         self._medium_request = medium_request
         self._headers = {}
+        self.body_stream_started = None
 
     def set_headers(self, headers):
         self._headers = headers.copy()
@@ -1347,6 +1348,7 @@
         if path is not None:
             mutter(' (to %s)', path)
         self._request_start_time = osutils.timer_func()
+        self.body_stream_started = False
         self._write_protocol_version()
         self._write_headers(self._headers)
         self._write_structure(args)
@@ -1354,6 +1356,9 @@
         # have finished sending the stream. We would notice at the end
         # anyway, but if the medium can deliver it early then it's good
         # to short-circuit the whole request...
+        # Provoke any ConnectionReset failures before we start the body stream.
+        self.flush()
+        self.body_stream_started = True
         for exc_info, part in _iter_with_errors(stream):
             if exc_info is not None:
                 # Iterating the stream failed. Cleanly abort the request.
=== modified file 'bzrlib/tests/test_smart_transport.py'
--- bzrlib/tests/test_smart_transport.py 2011-10-10 12:59:12 +0000
+++ bzrlib/tests/test_smart_transport.py 2011-10-10 12:59:12 +0000
@@ -2944,6 +2944,33 @@
             'e', # end
             output.getvalue())
 
+    def test_records_start_of_body_stream(self):
+        requester, output = self.make_client_encoder_and_output()
+        requester.set_headers({})
+        in_stream = [False]
+        def stream_checker():
+            self.assertTrue(requester.body_stream_started)
+            in_stream[0] = True
+            yield 'content'
+        flush_called = []
+        orig_flush = requester.flush
+        def tracked_flush():
+            flush_called.append(in_stream[0])
+            if in_stream[0]:
+                self.assertTrue(requester.body_stream_started)
+            else:
+                self.assertFalse(requester.body_stream_started)
+            return orig_flush()
+        requester.flush = tracked_flush
+        requester.call_with_body_stream(('one arg',), stream_checker())
+        self.assertEqual(
+            'bzr message 3 (bzr 1.6)\n' # protocol version
+            '\x00\x00\x00\x02de' # headers
+            's\x00\x00\x00\x0bl7:one arge' # args
+            'b\x00\x00\x00\x07content' # body
+            'e', output.getvalue())
+        self.assertEqual([False, True, True], flush_called)
+
 
 class StubMediumRequest(object):
     """A stub medium request that tracks the number of times accept_bytes is
@@ -3458,22 +3485,68 @@
             vendor.calls)
         self.assertRaises(errors.ConnectionReset, handler.read_response_tuple)
 
-    def test__send_doesnt_retry_body_stream(self):
-        # We don't know how much of body_stream would get iterated as part of
-        # _send before it failed to actually send the request, so we
-        # just always fail in this condition.
+    def test__send_request_retries_body_stream_if_not_started(self):
         output, vendor, smart_client = self.make_client_with_failing_medium()
         smart_request = client._SmartClientRequest(smart_client, 'hello', (),
             body_stream=['a', 'b'])
+        response_handler = smart_request._send(3)
+        # We connect, get disconnected, and notice before consuming the stream,
+        # so we try again one time and succeed.
+        self.assertEqual(
+            [('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
+              ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
+             ('close',),
+             ('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
+              ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
+            ],
+            vendor.calls)
+        self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol
+                         '\x00\x00\x00\x02de' # empty headers
+                         's\x00\x00\x00\tl5:helloe'
+                         'b\x00\x00\x00\x01a'
+                         'b\x00\x00\x00\x01b'
+                         'e',
+                         output.getvalue())
+
+    def test__send_request_stops_if_body_started(self):
+        # We intentionally use the python StringIO so that we can subclass it.
+        from StringIO import StringIO
+        response = StringIO()
+
+        class FailAfterFirstWrite(StringIO):
+            """Allow one 'write' call to pass, fail the rest"""
+            def __init__(self):
+                StringIO.__init__(self)
+                self._first = True
+
+            def write(self, s):
+                if self._first:
+                    self._first = False
+                    return StringIO.write(self, s)
+                raise IOError(errno.EINVAL, 'invalid file handle')
+        output = FailAfterFirstWrite()
+
+        vendor = FirstRejectedStringIOSSHVendor(response, output,
+            fail_at_write=False)
+        client_medium = medium.SmartSSHClientMedium(
+            'a host', 'a port', 'a user', 'a pass', 'base', vendor,
+            'bzr')
+        smart_client = client._SmartClient(client_medium, headers={})
+        smart_request = client._SmartClientRequest(smart_client, 'hello', (),
+            body_stream=['a', 'b'])
         self.assertRaises(errors.ConnectionReset, smart_request._send, 3)
-        # We got one connect, but it fails, so we disconnect, but we don't
-        # retry it
+        # We connect, and manage to get to the point that we start consuming
+        # the body stream. The next write fails, so we just stop.
         self.assertEqual(
             [('connect_ssh', 'a user', 'a pass', 'a host', 'a port',
               ['bzr', 'serve', '--inet', '--directory=/', '--allow-writes']),
              ('close',),
            ],
            vendor.calls)
+        self.assertEqual('bzr message 3 (bzr 1.6)\n' # protocol
+                         '\x00\x00\x00\x02de' # empty headers
+                         's\x00\x00\x00\tl5:helloe',
+                         output.getvalue())
 
     def test__send_disabled_retry(self):
         debug.debug_flags.add('noretry')
