Merge lp:~jameinel/bzr/2.3-bzr-connect-ssh into lp:bzr

Proposed by John A Meinel on 2010-09-01
Status: Merged
Approved by: Vincent Ladeuil on 2010-09-02
Approved revision: 5406
Merged at revision: 5405
Proposed branch: lp:~jameinel/bzr/2.3-bzr-connect-ssh
Merge into: lp:bzr
Diff against target: 112 lines (+31/-24)
3 files modified
NEWS (+3/-0)
bzrlib/tests/stub_sftp.py (+28/-22)
bzrlib/tests/test_transport.py (+0/-2)
To merge this branch: bzr merge lp:~jameinel/bzr/2.3-bzr-connect-ssh
Reviewer Review Type Date Requested Status
Vincent Ladeuil 2010-09-01 Approve on 2010-09-02
Review via email: mp+34331@code.launchpad.net

Commit message

Fix bug #626876 by properly waiting for the paramiko.Transport thread to finish its work.

Description of the change

This should fix the failing test that we started skipping.

Specifically, the code was expecting "ssh_server.start_server()" to finish the conversation. And that is what it appears when looking through the paramiko.Transport code.

However, it turns out that it returns as soon as "negotiation" has completed, not when it is done with the conversation.

It turns out that paramiko will set the "self.completion_event" for a bunch of different reasons, but you also need to check "self.active" to know if it really thinks it is done with the conversation. We didn't notice before because we didn't try to stop the client threads, causing the extra connections to get closed.

As such, I went ahead and created a different event for us to wait on, that just triggers when the thread is actually done running. Maybe we could just use "ssh_server.join(TIMEOUT)" instead. That would make the code cleaner, at least.

Part of the confusion is that:
a) TestServerInAThread is run in a thread, when it gets a connection it
b) Starts a new thread and calls process_request, which ends up calling
   finish_request which is where the TestSFTPConnectionHandler gets instantiated, and
   in __init__ it calls self.setup()
c) self.setup() spawns a new thread in paramiko.Transport. Though it isn't obvious at
   first that paramiko.Transport is actually a subclass of Thread.

Anyway, it makes sense to have the test 'process_request' thread just wait for the paramiko.Transport thread to finish. Arguably we shouldn't be threaded at that level at all, and SftpServer shouldn't be spawning threads per request. (Instead we should just have the main SftpServer thread which spawns and waits for paramiko.Transport threads directly.)

This code works, the test passes reliably, and it doesn't seem to have affected any other tests negatively.

  bzr selftest -s bt.test_transport bzr_ssh
  bzr selftest "(?i)sftp"

To post a comment you must log in.
John A Meinel (jameinel) wrote :

An even simpler one that works is to just change the line:

 time.sleep(0.2)

to

  ssh_server.join()

I've gone ahead and switched to that one, as I think it is easier to understand.

lp:~jameinel/bzr/2.3-bzr-connect-ssh updated on 2010-09-01
5406. By John A Meinel on 2010-09-01

Use the simpler form for the test server

Vincent Ladeuil (vila) wrote :

Well done !

A couple of tweaks and some comments.

8 +* Wait for the SSH server to actually finish, rather than just waiting for
9 + it to negotiate the key exchange. (John Arbash Meinel, #626876)
10 +

You commented on the abuse of the 'server' word but you use it here in a way that will
add to the confusion.

You're waiting for the *connection* to actually finish here and the way paramiko
presents it is misleading (using ssh_server as a variable name wasn't a good idea
in the first place either).

The SFTPServer uses TestingTCPServerInAThread and TestingSFTPServer which uses
TestingThreadingTCPServer.
TestingTCPServerInAThread is where the *server* is really shut down.
TestingThreadingTCPServer creates a new thread for each *connection*.

This connection should not start processing the request (the ssh command here)
before the key exchange. I.e TestingSFTPConnectionHandler.handle() which is
called after setup() should not start before the key exchange is done). The sleep
was enough... in some cases. But the join() *ensures* the synchronisation.

So:

28 server = tcs._server_interface(tcs)
29 + # This blocks until the key exchange has been done
30 ssh_server.start_server(None, server)

is misleading too, that's the join() that guarantees that the key exchange
has occurred and that it's safe to process the request. This comment applies
to the .join() indeed.

I think that one confusing point is that ssh presents an encrypted medium
(paramiko.Transport) from which one or several "connections" (in the TCP sense)
can happen (paramiko.Channel which can be used a socket).
We don't really care about the distinction in our case and view the Transport + Channel
as a single socket that appears unencrypted to us.

What this means is that we shouldn't touch the socket before the key exchange has occurred
and your fix ensures that.

Anyway, thank you for finding this nasty one, we may still want to avoid spawning
a subprocess here though and clean up some of the tearDown done here which I'm pretty
sure is now useless.

review: Approve
John A Meinel (jameinel) wrote :

-----BEGIN PGP SIGNED MESSAGE-----
Hash: SHA1

...
>
> This connection should not start processing the request (the ssh command here)
> before the key exchange. I.e TestingSFTPConnectionHandler.handle() which is
> called after setup() should not start before the key exchange is done). The sleep
> was enough... in some cases. But the join() *ensures* the synchronisation.
>
> So:
>
> 28 server = tcs._server_interface(tcs)
> 29 + # This blocks until the key exchange has been done
> 30 ssh_server.start_server(None, server)
>
> is misleading too, that's the join() that guarantees that the key exchange
> has occurred and that it's safe to process the request. This comment applies
> to the .join() indeed.

No. The 'start_server' waits until the key negotiation has occurred.
After that point it does the actual communication on the channel.

The 'ssh_server.join()' waits all the way until the remote end
disconnects and everything has completed.

The reason this is 'okay' is because the .handle() method is *completely
empty*, and is just called immediately after .setup().

The BaseResponseHandler __init__ code is:

 self.setup()
 self.handle()
 self.close() # or finish or whatever

>
> I think that one confusing point is that ssh presents an encrypted medium
> (paramiko.Transport) from which one or several "connections" (in the TCP sense)
> can happen (paramiko.Channel which can be used a socket).
> We don't really care about the distinction in our case and view the Transport + Channel
> as a single socket that appears unencrypted to us.
>
> What this means is that we shouldn't touch the socket before the key exchange has occurred
> and your fix ensures that.

That is true, but still what I said above is correct.

>
> Anyway, thank you for finding this nasty one, we may still want to avoid spawning
> a subprocess here though and clean up some of the tearDown done here which I'm pretty
> sure is now useless.
>

John
=:->
-----BEGIN PGP SIGNATURE-----
Version: GnuPG v1.4.9 (Cygwin)
Comment: Using GnuPG with Mozilla - http://enigmail.mozdev.org/

iEYEARECAAYFAkx/qH0ACgkQJdeBCYSNAAPylQCfWmEOqi3FV7Iatw6ueNi0D3tH
pkoAoIrqn59BYHg6kRojSNotoI2c4RJT
=UPuM
-----END PGP SIGNATURE-----

Vincent Ladeuil (vila) wrote :

>
> No. The 'start_server' waits until the key negotiation has occurred.

start_server() internally calls Thread.start(), are you sure there is only one
other thread that can set self.completion_event ?

> After that point it does the actual communication on the channel.

>
> The 'ssh_server.join()' waits all the way until the remote end
> disconnects and everything has completed.

That's what I said :)

> The reason this is 'okay' is because the .handle() method is *completely
> empty*, and is just called immediately after .setup().

That also matches what I said, it shouldn't be empty.

It will be clearer to separate between setup, handle and finish, and put the join
into finish.

> The BaseResponseHandler __init__ code is:
>
> self.setup()
> self.handle()
> self.close() # or finish or whatever

Yes, finish() not whatever :)

> That is true, but still what I said above is correct.

Confusingly correct, that's my point.

lp:~jameinel/bzr/2.3-bzr-connect-ssh updated on 2010-09-02
5407. By John A Meinel on 2010-09-02

refactor a little bit, so it is clearer what is going on.

John A Meinel (jameinel) wrote :

sent to pqm by email

lp:~jameinel/bzr/2.3-bzr-connect-ssh updated on 2010-09-02
5408. By John A Meinel on 2010-09-02

Add a .finish method to the sftp version as well.

John A Meinel (jameinel) wrote :

sent to pqm by email

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'NEWS'
2--- NEWS 2010-09-01 18:06:19 +0000
3+++ NEWS 2010-09-02 19:22:46 +0000
4@@ -158,6 +158,9 @@
5 to a symlink, now returns information about the symlink.
6 (Martin Pool)
7
8+* Wait for the SSH server to actually finish, rather than just waiting for
9+ it to negotiate the key exchange. (John Arbash Meinel, #626876)
10+
11 Improvements
12 ************
13
14
15=== modified file 'bzrlib/tests/stub_sftp.py'
16--- bzrlib/tests/stub_sftp.py 2010-07-01 15:14:35 +0000
17+++ bzrlib/tests/stub_sftp.py 2010-09-02 19:22:46 +0000
18@@ -344,22 +344,25 @@
19 def setup(self):
20 self.wrap_for_latency()
21 tcs = self.server.test_case_server
22- ssh_server = paramiko.Transport(self.request)
23- ssh_server.add_server_key(tcs.get_host_key())
24- ssh_server.set_subsystem_handler('sftp', paramiko.SFTPServer,
25- StubSFTPServer, root=tcs._root,
26- home=tcs._server_homedir)
27+ ptrans = paramiko.Transport(self.request)
28+ self.paramiko_transport = ptrans
29+ # Set it to a channel under 'bzr' so that we get debug info
30+ ptrans.set_log_channel('bzr.paramiko.transport')
31+ ptrans.add_server_key(tcs.get_host_key())
32+ ptrans.set_subsystem_handler('sftp', paramiko.SFTPServer,
33+ StubSFTPServer, root=tcs._root,
34+ home=tcs._server_homedir)
35 server = tcs._server_interface(tcs)
36- ssh_server.start_server(None, server)
37- # FIXME: Long story short:
38- # bt.test_transport.TestSSHConnections.test_bzr_connect_to_bzr_ssh
39- # fails if we wait less than 0.2 seconds... paramiko uses a lot of
40- # timeouts internally which probably mask a synchronisation
41- # problem. Note that this is the only test that requires this hack and
42- # the test may need to be fixed instead, but it's late and the test is
43- # horrible as mentioned in its comments :) -- vila 20100623
44- import time
45- time.sleep(0.2)
46+ # This blocks until the key exchange has been done
47+ ptrans.start_server(None, server)
48+
49+ def finish(self):
50+ # Wait for the conversation to finish, when the paramiko.Transport
51+ # thread finishes
52+ # TODO: Consider timing out after XX seconds rather than hanging.
53+ # Also we could check paramiko_transport.active and possibly
54+ # paramiko_transport.getException().
55+ self.paramiko_transport.join()
56
57 def wrap_for_latency(self):
58 tcs = self.server.test_case_server
59@@ -380,7 +383,7 @@
60 def get_transport(self):
61 return self
62 def get_log_channel(self):
63- return 'paramiko'
64+ return 'bzr.paramiko'
65 def get_name(self):
66 return '1'
67 def get_hexdump(self):
68@@ -389,11 +392,13 @@
69 pass
70
71 tcs = self.server.test_case_server
72- server = paramiko.SFTPServer(
73+ sftp_server = paramiko.SFTPServer(
74 FakeChannel(), 'sftp', StubServer(tcs), StubSFTPServer,
75 root=tcs._root, home=tcs._server_homedir)
76+ self.sftp_server = sftp_server
77+ sys_stderr = sys.stderr # Used in error reporting during shutdown
78 try:
79- server.start_subsystem(
80+ sftp_server.start_subsystem(
81 'sftp', None, ssh.SocketAsChannelAdapter(self.request))
82 except socket.error, e:
83 if (len(e.args) > 0) and (e.args[0] == errno.EPIPE):
84@@ -409,10 +414,11 @@
85 # seems to be the best we can do.
86 # FIXME: All interpreter shutdown errors should have been related
87 # to daemon threads, cleanup needed -- vila 20100623
88- import sys
89- sys.stderr.write('\nEXCEPTION %r: ' % (e.__class__,))
90- sys.stderr.write('%s\n\n' % (e,))
91- server.finish_subsystem()
92+ sys_stderr.write('\nEXCEPTION %r: ' % (e.__class__,))
93+ sys_stderr.write('%s\n\n' % (e,))
94+
95+ def finish(self):
96+ self.sftp_server.finish_subsystem()
97
98
99 class TestingSFTPServer(test_server.TestingThreadingTCPServer):
100
101=== modified file 'bzrlib/tests/test_transport.py'
102--- bzrlib/tests/test_transport.py 2010-08-30 21:27:50 +0000
103+++ bzrlib/tests/test_transport.py 2010-09-02 19:22:46 +0000
104@@ -900,8 +900,6 @@
105
106 bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
107 """
108- raise tests.TestSkipped('this test was recently broken,'
109- ' see bug #626876')
110 # This test actually causes a bzr instance to be invoked, which is very
111 # expensive: it should be the only such test in the test suite.
112 # A reasonable evolution for this would be to simply check inside