Merge lp:~axwalk/juju-core/apiclient-open-parallel into lp:~go-bot/juju-core/trunk

Proposed by Andrew Wilkins
Status: Merged
Approved by: Andrew Wilkins
Approved revision: no longer in the source branch.
Merged at revision: 2520
Proposed branch: lp:~axwalk/juju-core/apiclient-open-parallel
Merge into: lp:~go-bot/juju-core/trunk
Diff against target: 250 lines (+141/-29)
3 files modified
state/api/apiclient.go (+59/-26)
state/api/apiclient_test.go (+79/-0)
state/api/state.go (+3/-3)
To merge this branch: bzr merge lp:~axwalk/juju-core/apiclient-open-parallel
Reviewer          Review Type  Date Requested  Status
Juju Engineering                               Pending
Review via email: mp+213401@code.launchpad.net

Commit message

state/api: dial info.Addrs in parallel

This change is to dial all addresses in
parallel. The first one that successfully
connects is used.

https://codereview.appspot.com/82450043/

Description of the change

state/api: dial info.Addrs in parallel

This change is to dial all addresses in
parallel. The first one that successfully
connects is used.

https://codereview.appspot.com/82450043/

Andrew Wilkins (axwalk) wrote :

Reviewers: mp+213401_code.launchpad.net,

Message:
Please take a look.

Description:
state/api: dial info.Addrs in parallel

This change is to dial all addresses in
parallel. The first one that successfully
connects is used.

https://code.launchpad.net/~axwalk/juju-core/apiclient-open-parallel/+merge/213401

(do not edit description out of merge proposal)

Please review this at https://codereview.appspot.com/82450043/

Affected files (+135, -25 lines):
   A [revision details]
   M state/api/apiclient.go
   A state/api/apiclient_test.go

Dimiter Naydenov (dimitern) wrote :
John A Meinel (jameinel) wrote :

A couple of comments, but nothing that would block landing this.

LGTM

https://codereview.appspot.com/82450043/diff/1/state/api/apiclient.go
File state/api/apiclient.go (right):

https://codereview.appspot.com/82450043/diff/1/state/api/apiclient.go#newcode142
state/api/apiclient.go:142: // TODO what does "origin" really mean, and
is localhost always ok?
Origin would make sense if you were doing actual web requests; I think
it is the location that redirected you here.
I believe the websocket requires something, but it accepts localhost, so
we just do it.

It isn't actually used in our implementation, so it is ok.

See something like:
http://learnitcorrect.com/blog/websocket-is-great-but-not-the-origin-policy.html

It is intended to avoid cross-site scripting, etc. But we just let
people connect to our websockets, and do the auth inside the websocket
(as part of Login) rather than assuming that connections to us are
already valid.

https://codereview.appspot.com/82450043/diff/1/state/api/apiclient.go#newcode167
state/api/apiclient.go:167: return conn, nil
Would we want to check <-stop at this point, in case we got a stop while
the dial was processing?

I guess it has to be handled up a layer anyway. We just know that
DialConfig is blocking, so we sort of expect to be in the middle of
dialing while we get a stop request. (because someone else succeeded.)

https://codereview.appspot.com/82450043/diff/1/state/api/apiclient.go#newcode169
state/api/apiclient.go:169: log.Debugf("state/api: %v", err)
maybe a little more context about "error trying to Dial API server, will
retry: %v" ?

It depends what errors we generally get, but it seems helpful to give a
bit more context than just an error.

https://codereview.appspot.com/82450043/

Andrew Wilkins (axwalk) wrote :

Please take a look.

https://codereview.appspot.com/82450043/diff/1/state/api/apiclient.go
File state/api/apiclient.go (right):

https://codereview.appspot.com/82450043/diff/1/state/api/apiclient.go#newcode142
state/api/apiclient.go:142: // TODO what does "origin" really mean, and
is localhost always ok?
On 2014/03/31 12:22:11, jameinel wrote:
> Origin would make sense if you were doing actual web requests; I think it is
> the location that redirected you here.
> I believe the websocket requires something, but it accepts localhost, so we
> just do it.
>
> It isn't actually used in our implementation, so it is ok.
>
> See something like:
> http://learnitcorrect.com/blog/websocket-is-great-but-not-the-origin-policy.html
>
> It is intended to avoid cross-site scripting, etc. But we just let people
> connect to our websockets, and do the auth inside the websocket (as part of
> Login) rather than assuming that connections to us are already valid.

I just moved that TODO, but thanks for the info :)
I'll remove the TODO, and update the comment.

https://codereview.appspot.com/82450043/diff/1/state/api/apiclient.go#newcode167
state/api/apiclient.go:167: return conn, nil
On 2014/03/31 12:22:11, jameinel wrote:
> Would we want to check <-stop at this point, in case we got a stop while the
> dial was processing?
>
> I guess it has to be handled up a layer anyway. We just know that DialConfig is
> blocking, so we sort of expect to be in the middle of dialing while we get a
> stop request. (because someone else succeeded.)

It is indeed handled higher up; utils/parallel.Try.Start creates a
closure that will dispose of the result by Close()ing it once "stop"
("dying" inside Try) is closed.
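The disposal behaviour described above can be sketched with the standard library alone. This is a rough illustration and not the utils/parallel implementation: `firstSuccess`, `dialFunc`, `fakeConn`, `dialTo` and `dialFail` are hypothetical names, the address is made up, and unlike Try this version waits for every attempt to finish before returning.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// fakeConn is a hypothetical io.Closer standing in for *websocket.Conn.
type fakeConn struct {
	addr   string
	closed bool
}

func (c *fakeConn) Close() error {
	c.closed = true
	return nil
}

// dialFunc mirrors the shape of the function passed to Try.Start.
type dialFunc func(stop <-chan struct{}) (io.Closer, error)

// dialTo returns a dialFunc that always succeeds with conn.
func dialTo(conn *fakeConn) dialFunc {
	return func(<-chan struct{}) (io.Closer, error) { return conn, nil }
}

// dialFail returns a dialFunc that always fails with msg.
func dialFail(msg string) dialFunc {
	return func(<-chan struct{}) (io.Closer, error) { return nil, errors.New(msg) }
}

type outcome struct {
	conn io.Closer
	err  error
}

// firstSuccess runs all dials concurrently, keeps the first successful
// result, and closes every success that arrives after it.
func firstSuccess(dials []dialFunc) (io.Closer, error) {
	stop := make(chan struct{})
	results := make(chan outcome, len(dials))
	for _, dial := range dials {
		go func(dial dialFunc) {
			conn, err := dial(stop)
			results <- outcome{conn, err}
		}(dial)
	}
	var winner io.Closer
	lastErr := errors.New("no addresses to dial")
	for range dials {
		o := <-results
		switch {
		case o.err != nil:
			lastErr = o.err
		case winner == nil:
			winner = o.conn
			close(stop) // tell the remaining attempts to give up
		default:
			o.conn.Close() // a late success: dispose of it
		}
	}
	if winner == nil {
		return nil, lastErr
	}
	return winner, nil
}

func main() {
	conn, err := firstSuccess([]dialFunc{
		dialFail("connection refused"),
		dialTo(&fakeConn{addr: "10.0.0.2:17070"}),
	})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	defer conn.Close()
	fmt.Println("connected to", conn.(*fakeConn).addr)
}
```

Because late results are closed by the collector, the dial function itself does not need to re-check `stop` after a successful dial, which is the point made in the reply above.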

https://codereview.appspot.com/82450043/diff/1/state/api/apiclient.go#newcode169
state/api/apiclient.go:169: log.Debugf("state/api: %v", err)
On 2014/03/31 12:22:11, jameinel wrote:
> maybe a little more context about "error trying to Dial API server, will retry:
> %v" ?
>
> It depends what errors we generally get, but it seems helpful to give a bit more
> context than just an error.

Done.

https://codereview.appspot.com/82450043/diff/1/state/api/apiclient_test.go
File state/api/apiclient_test.go (right):

https://codereview.appspot.com/82450043/diff/1/state/api/apiclient_test.go#newcode10
state/api/apiclient_test.go:10: //jc "github.com/juju/testing/checkers"
On 2014/03/31 11:55:23, dimitern wrote:
> d

Done.

https://codereview.appspot.com/82450043/

Go Bot (go-bot) wrote :

Attempt to merge into lp:juju-core failed due to conflicts:

text conflict in state/api/apiclient.go

Andrew Wilkins (axwalk) wrote :
Roger Peppe (rogpeppe) wrote :

A few fixes that would be nice to land when possible.

https://codereview.appspot.com/82450043/diff/40001/state/api/apiclient.go
File state/api/apiclient.go (right):

https://codereview.appspot.com/82450043/diff/40001/state/api/apiclient.go#newcode117
state/api/apiclient.go:117: if err := dialWebsocket(addr, opts, pool,
try); err != nil {
This doesn't seem quite right. Try.Start returns ErrStopped if an
earlier attempt has succeeded. If that happens, we want to stop calling
dialWebsocket.

I think something like this might be better:

for _, addr := range info.Addrs {
     err := dialWebsocket(addr, opts, pool, try)
     if err == parallel.ErrStopped {
        break
     }
     if err != nil {
         return nil, err
     }
     select{
     case <-time.After(dialAddressInterval):
     case <-try.Dead():
     }
}

with dialAddressInterval being something like 50ms,
so if the connect succeeds fast, we can avoid dialing
most of the addresses.
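A self-contained sketch of the staggered approach suggested above, using only the standard library. It is an illustration under assumptions, not the code that landed: `staggeredDial`, `dialResult` and `onlyReachable` are hypothetical names, the addresses are made up, and the 50ms value of `dialAddressInterval` is just the figure floated in the comment.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// dialAddressInterval is the assumed pause between starting attempts.
const dialAddressInterval = 50 * time.Millisecond

type dialResult struct {
	addr string
	err  error
}

// staggeredDial starts one attempt per address, pausing for interval between
// starts, and returns as soon as any attempt succeeds. It reports the winning
// address and how many attempts had been started by then.
func staggeredDial(addrs []string, interval time.Duration, dial func(string) error) (string, int, error) {
	results := make(chan dialResult, len(addrs))
	started, finished := 0, 0
	for _, addr := range addrs {
		started++
		go func(addr string) { results <- dialResult{addr, dial(addr)} }(addr)
		timer := time.After(interval)
	wait:
		for {
			select {
			case r := <-results:
				finished++
				if r.err == nil {
					return r.addr, started, nil
				}
			case <-timer:
				break wait // labelled: leaves the for loop, not just the select
			}
		}
	}
	// Every address has been started; wait for the stragglers.
	for finished < started {
		r := <-results
		finished++
		if r.err == nil {
			return r.addr, started, nil
		}
	}
	return "", started, errors.New("all dial attempts failed")
}

// onlyReachable returns a dial function that succeeds for good alone.
func onlyReachable(good string) func(string) error {
	return func(addr string) error {
		if addr == good {
			return nil
		}
		return errors.New("connection refused: " + addr)
	}
}

func main() {
	addrs := []string{"10.0.0.1:17070", "10.0.0.2:17070", "10.0.0.3:17070"}
	winner, started, err := staggeredDial(addrs, dialAddressInterval, onlyReachable(addrs[0]))
	fmt.Println(winner, started, err)
}
```

When the first address answers quickly, only one attempt is ever started, which is the saving the comment is after.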

https://codereview.appspot.com/82450043/diff/40001/state/api/apiclient.go#newcode168
state/api/apiclient.go:168: err := parallel.ErrStopped
This isn't quite right - if we used up all our attempts, we probably
want to return a timeout error rather than ErrStopped.

I quite like using HasNext for this kind of thing:

for a := openAttempt.Start(); a.Next(); {
    select{
    case <-stop:
        return nil, parallel.ErrStopped
    default:
    }
    conn, err := websocket.DialConfig(...)
    if err == nil {
        return conn, nil
    }
    if !a.HasNext() {
        return nil, fmt.Errorf("timed out connecting to %q", cfg.Location)
    }
}
panic("unreachable")
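The distinction being asked for, a stop request versus running out of attempts, can be sketched without utils.AttemptStrategy. This is a rough stand-in, not the juju-core code: `dialWithRetry`, `errStopped`, `alwaysFail`, `failTimes` and the two example durations are all hypothetical, and a plain deadline replaces the Attempt iterator.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errStopped is a hypothetical stand-in for parallel.ErrStopped.
var errStopped = errors.New("stopped")

// Example durations, chosen only for this sketch.
const (
	exampleTimeout = time.Second
	exampleDelay   = time.Millisecond
)

// dialWithRetry calls dial until it succeeds, stop is closed, or another
// delay would overrun the total budget. A closed stop yields errStopped;
// an exhausted budget yields a timeout error wrapping the last failure.
func dialWithRetry(total, delay time.Duration, stop <-chan struct{}, dial func() error) error {
	deadline := time.Now().Add(total)
	for {
		select {
		case <-stop:
			return errStopped
		default:
		}
		err := dial()
		if err == nil {
			return nil
		}
		if time.Now().Add(delay).After(deadline) {
			return fmt.Errorf("timed out after %v: %v", total, err)
		}
		time.Sleep(delay)
	}
}

// alwaysFail is a dial function that never succeeds.
func alwaysFail() error { return errors.New("connection refused") }

// failTimes returns a dial function that fails n times and then succeeds,
// counting every call in *calls.
func failTimes(n int, calls *int) func() error {
	return func() error {
		*calls++
		if *calls <= n {
			return errors.New("connection refused")
		}
		return nil
	}
}

func main() {
	calls := 0
	err := dialWithRetry(exampleTimeout, exampleDelay, make(chan struct{}), failTimes(2, &calls))
	fmt.Println(err, calls)
}
```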

https://codereview.appspot.com/82450043/diff/40001/state/api/apiclient.go#newcode169
state/api/apiclient.go:169: for a := openAttempt.Start(); a.Next(); {
It's a pity we don't have a method on Attempt that lets us use
it in selects, so that it could be interrupted immediately.

At some point, something like:

// NextDuration reports the interval until the
// next attempt should be made.
func (a *Attempt) NextDuration() time.Duration

might be good. Then you could write:

for a := openAttempt.Start(); a.HasNext(); {
     select {
     case <-a.stop:
         return nil, parallel.ErrStopped
     case <-time.After(a.NextDuration()):
     }
     ....
}
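The interruptible wait that such a method would enable boils down to a two-way select. A minimal sketch, assuming a hypothetical helper `waitOrStop` and a made-up `exampleDelay`; nothing here exists in utils.Attempt:

```go
package main

import (
	"fmt"
	"time"
)

// exampleDelay is a made-up retry delay, long enough to notice if the
// wait were not interrupted.
const exampleDelay = 10 * time.Second

// waitOrStop blocks until d has elapsed and returns true, unless stop is
// closed first, in which case it returns false without waiting further.
func waitOrStop(d time.Duration, stop <-chan struct{}) bool {
	select {
	case <-stop:
		return false
	case <-time.After(d):
		return true
	}
}

func main() {
	stop := make(chan struct{})
	close(stop)
	// The closed stop channel wins immediately; the 10s timer never fires.
	fmt.Println(waitOrStop(exampleDelay, stop)) // prints "false"
}
```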

https://codereview.appspot.com/82450043/diff/40001/state/api/apiclient.go#newcode172
state/api/apiclient.go:172: break
this is a no-op.
I think you probably want:

    return nil, parallel.ErrStopped

It could probably do with a test.
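A small standalone demonstration of why the bare `break` is a no-op: inside a `select`, an unlabelled `break` leaves only the `select`, so the enclosing `for` keeps iterating. The function names are hypothetical and exist only for this sketch.

```go
package main

import "fmt"

// iterationsWithBareBreak counts loop bodies that still run when stop is
// already closed and the select uses an unlabelled break.
func iterationsWithBareBreak(stop <-chan struct{}, limit int) int {
	n := 0
	for i := 0; i < limit; i++ {
		select {
		case <-stop:
			break // leaves only the select; the for loop carries on
		default:
		}
		n++
	}
	return n
}

// iterationsWithReturn does the same but returns as soon as stop is seen.
func iterationsWithReturn(stop <-chan struct{}, limit int) int {
	n := 0
	for i := 0; i < limit; i++ {
		select {
		case <-stop:
			return n
		default:
		}
		n++
	}
	return n
}

func main() {
	stop := make(chan struct{})
	close(stop)
	fmt.Println(iterationsWithBareBreak(stop, 3)) // prints 3
	fmt.Println(iterationsWithReturn(stop, 3))    // prints 0
}
```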

https://codereview.appspot.com/82450043/

Andrew Wilkins (axwalk) wrote :

Updates over here: https://codereview.appspot.com/82900045/

https://codereview.appspot.com/82450043/diff/40001/state/api/apiclient.go
File state/api/apiclient.go (right):

https://codereview.appspot.com/82450043/diff/40001/state/api/apiclient.go#newcode117
state/api/apiclient.go:117: if err := dialWebsocket(addr, opts, pool,
try); err != nil {
On 2014/03/31 15:39:01, rog wrote:
> This doesn't seem quite right. Try.Start returns ErrStopped if an earlier
> attempt has succeeded. If that happens, we want to stop calling dialWebsocket.
>
> I think something like this might be better:
>
> for _, addr := range info.Addrs {
>      err := dialWebsocket(addr, opts, pool, try)
>      if err == parallel.ErrStopped {
>         break
>      }
>      if err != nil {
>          return nil, err
>      }
>      select{
>      case <-time.After(dialAddressInterval):
>      case <-try.Dead():
>      }
> }
>
> with dialAddressInterval being something like 50ms,
> so if the connect succeeds fast, we can avoid dialing
> most of the addresses.

Done.

https://codereview.appspot.com/82450043/diff/40001/state/api/apiclient.go#newcode168
state/api/apiclient.go:168: err := parallel.ErrStopped
On 2014/03/31 15:39:01, rog wrote:
> This isn't quite right - if we used up all our attempts, we probably want to
> return a timeout error rather than ErrStopped.
>
> I quite like using HasNext for this kind of thing:
>
> for a := openAttempt.Start(); a.Next(); {
>     select{
>     case <-stop:
>         return nil, parallel.ErrStopped
>     default:
>     }
>     conn, err := websocket.DialConfig(...)
>     if err == nil {
>         return conn, nil
>     }
>     if !a.HasNext() {
>         return nil, fmt.Errorf("timed out connecting to %q", cfg.Location)
>     }
> }
> panic("unreachable")

Done.

https://codereview.appspot.com/82450043/diff/40001/state/api/apiclient.go#newcode169
state/api/apiclient.go:169: for a := openAttempt.Start(); a.Next(); {
On 2014/03/31 15:39:01, rog wrote:
> It's a pity we don't have a method on Attempt that lets us use
> it in selects, so that it could be interrupted immediately.
>
> At some point, something like:
>
> // NextDuration reports the interval until the
> // next attempt should be made.
> func (a *Attempt) NextDuration() time.Duration
>
> might be good. Then you could write:
>
> for a := openAttempt.Start(); a.HasNext(); {
>      select {
>      case <-a.stop:
>          return nil, parallel.ErrStopped
>      case <-time.After(a.NextDuration()):
>      }
>      ....
> }

I looked briefly, but it looks a little complicated. I'll not attempt
(ha ha) this for the moment.

https://codereview.appspot.com/82450043/diff/40001/state/api/apiclient.go#newcode172
state/api/apiclient.go:172: break
On 2014/03/31 15:39:01, rog wrote:
> this is a no-op.
> I think you probably want:
>
>     return nil, parallel.ErrStopped
>
> It could probably do with a test.

/facepalm
thanks, fixed and added a test.

https://codereview.appspot.com/82450043/

Preview Diff

=== modified file 'state/api/apiclient.go'
--- state/api/apiclient.go 2014-03-31 12:24:52 +0000
+++ state/api/apiclient.go 2014-03-31 14:44:35 +0000
@@ -6,23 +6,31 @@
 import (
 	"crypto/tls"
 	"crypto/x509"
+	"io"
 	"time"
 
 	"code.google.com/p/go.net/websocket"
+	"github.com/juju/loggo"
 
 	"launchpad.net/juju-core/cert"
 	"launchpad.net/juju-core/instance"
-	"launchpad.net/juju-core/log"
 	"launchpad.net/juju-core/rpc"
 	"launchpad.net/juju-core/rpc/jsoncodec"
 	"launchpad.net/juju-core/state/api/params"
 	"launchpad.net/juju-core/utils"
+	"launchpad.net/juju-core/utils/parallel"
 )
 
+var logger = loggo.GetLogger("juju.state.api")
+
 // PingPeriod defines how often the internal connection health check
 // will run. It's a variable so it can be changed in tests.
 var PingPeriod = 1 * time.Minute
 
+// maxParallelDial defines the maximum number addresses to dial in
+// parallel.
+const maxParallelDial = 7
+
 type State struct {
 	client *rpc.Conn
 	conn *websocket.Conn
@@ -95,48 +103,36 @@
 }
 
 func Open(info *Info, opts DialOpts) (*State, error) {
-	// TODO Select a random address from info.Addrs
-	// and only fail when we've tried all the addresses.
-	// TODO what does "origin" really mean, and is localhost always ok?
-	cfg, err := websocket.NewConfig("wss://"+info.Addrs[0]+"/", "http://localhost/")
-	if err != nil {
-		return nil, err
-	}
 	pool := x509.NewCertPool()
 	xcert, err := cert.ParseCert(info.CACert)
 	if err != nil {
 		return nil, err
 	}
 	pool.AddCert(xcert)
-	cfg.TlsConfig = &tls.Config{
-		RootCAs: pool,
-		ServerName: "anything",
-	}
-	var conn *websocket.Conn
-	openAttempt := utils.AttemptStrategy{
-		Total: opts.Timeout,
-		Delay: opts.RetryDelay,
-	}
-	for a := openAttempt.Start(); a.Next(); {
-		log.Infof("state/api: dialing %q", cfg.Location)
-		conn, err = websocket.DialConfig(cfg)
-		if err == nil {
-			break
+
+	// Dial all addresses, with up to maxParallelDial in parallel.
+	try := parallel.NewTry(maxParallelDial, nil)
+	defer try.Kill()
+	for _, addr := range info.Addrs {
+		if err := dialWebsocket(addr, opts, pool, try); err != nil {
+			return nil, err
 		}
-		log.Errorf("state/api: %v", err)
 	}
+	try.Close()
+	result, err := try.Result()
 	if err != nil {
 		return nil, err
 	}
-	log.Infof("state/api: connection established")
+	conn := result.(*websocket.Conn)
+	logger.Infof("connection established to %q", conn.RemoteAddr())
 
 	client := rpc.NewConn(jsoncodec.NewWebsocket(conn), nil)
 	client.Start()
 	st := &State{
 		client: client,
 		conn: conn,
-		addr: cfg.Location.Host,
-		serverRoot: "https://" + cfg.Location.Host,
+		addr: conn.Config().Location.Host,
+		serverRoot: "https://" + conn.Config().Location.Host,
 		tag: info.Tag,
 		password: info.Password,
 	}
@@ -151,6 +147,43 @@
 	return st, nil
 }
 
+func dialWebsocket(addr string, opts DialOpts, rootCAs *x509.CertPool, try *parallel.Try) error {
+	// origin is required by the WebSocket API, used for "origin policy"
+	// in websockets. We pass localhost to satisfy the API; it is
+	// inconsequential to us.
+	const origin = "http://localhost/"
+	cfg, err := websocket.NewConfig("wss://"+addr+"/", origin)
+	if err != nil {
+		return err
+	}
+	cfg.TlsConfig = &tls.Config{
+		RootCAs: rootCAs,
+		ServerName: "anything",
+	}
+	openAttempt := utils.AttemptStrategy{
+		Total: opts.Timeout,
+		Delay: opts.RetryDelay,
+	}
+	return try.Start(func(stop <-chan struct{}) (io.Closer, error) {
+		err := parallel.ErrStopped
+		for a := openAttempt.Start(); a.Next(); {
+			select {
+			case <-stop:
+				break
+			default:
+			}
+			logger.Infof("dialing %q", cfg.Location)
+			var conn *websocket.Conn
+			conn, err = websocket.DialConfig(cfg)
+			if err == nil {
+				return conn, nil
+			}
+			logger.Debugf("error dialing API server, will retry: %v", err)
+		}
+		return nil, err
+	})
+}
+
 func (s *State) heartbeatMonitor() {
 	for {
 		if err := s.Ping(); err != nil {
=== added file 'state/api/apiclient_test.go'
--- state/api/apiclient_test.go 1970-01-01 00:00:00 +0000
+++ state/api/apiclient_test.go 2014-03-31 14:44:35 +0000
@@ -0,0 +1,79 @@
+// Copyright 2014 Canonical Ltd.
+// Licensed under the AGPLv3, see LICENCE file for details.
+
+package api_test
+
+import (
+	"io"
+	"net"
+
+	gc "launchpad.net/gocheck"
+
+	jujutesting "launchpad.net/juju-core/juju/testing"
+	"launchpad.net/juju-core/state/api"
+)
+
+type apiclientSuite struct {
+	jujutesting.JujuConnSuite
+}
+
+var _ = gc.Suite(&apiclientSuite{})
+
+func (s *apiclientSuite) TestOpenMultiple(c *gc.C) {
+	// Create a socket that proxies to the API server.
+	info := s.APIInfo(c)
+	serverAddr := info.Addrs[0]
+	server, err := net.Dial("tcp", serverAddr)
+	c.Assert(err, gc.IsNil)
+	defer server.Close()
+	listener, err := net.Listen("tcp", ":0")
+	c.Assert(err, gc.IsNil)
+	defer listener.Close()
+	go func() {
+		for {
+			client, err := listener.Accept()
+			if err != nil {
+				return
+			}
+			go io.Copy(client, server)
+			go io.Copy(server, client)
+		}
+	}()
+
+	// Check that we can use the proxy to connect.
+	proxyAddr := listener.Addr().String()
+	info.Addrs = []string{proxyAddr}
+	st, err := api.Open(info, api.DialOpts{})
+	c.Assert(err, gc.IsNil)
+	defer st.Close()
+	c.Assert(st.Addr(), gc.Equals, proxyAddr)
+
+	// Now break Addrs[0], and ensure that Addrs[1]
+	// is successfully connected to.
+	info.Addrs = []string{proxyAddr, serverAddr}
+	listener.Close()
+	st, err = api.Open(info, api.DialOpts{})
+	c.Assert(err, gc.IsNil)
+	defer st.Close()
+	c.Assert(st.Addr(), gc.Equals, serverAddr)
+}
+
+func (s *apiclientSuite) TestOpenMultipleError(c *gc.C) {
+	listener, err := net.Listen("tcp", ":0")
+	c.Assert(err, gc.IsNil)
+	defer listener.Close()
+	go func() {
+		for {
+			client, err := listener.Accept()
+			if err != nil {
+				return
+			}
+			client.Close()
+		}
+	}()
+	info := s.APIInfo(c)
+	addr := listener.Addr().String()
+	info.Addrs = []string{addr, addr, addr}
+	_, err = api.Open(info, api.DialOpts{})
+	c.Assert(err, gc.ErrorMatches, "websocket.Dial .*: read tcp .*: connection reset by peer")
+}
=== modified file 'state/api/state.go'
--- state/api/state.go 2014-03-31 12:24:52 +0000
+++ state/api/state.go 2014-03-31 14:44:35 +0000
@@ -14,7 +14,7 @@
 	"launchpad.net/juju-core/state/api/environment"
 	"launchpad.net/juju-core/state/api/firewaller"
 	"launchpad.net/juju-core/state/api/keyupdater"
-	"launchpad.net/juju-core/state/api/logger"
+	apilogger "launchpad.net/juju-core/state/api/logger"
 	"launchpad.net/juju-core/state/api/machiner"
 	"launchpad.net/juju-core/state/api/params"
 	"launchpad.net/juju-core/state/api/provisioner"
@@ -128,8 +128,8 @@
 }
 
 // Logger returns access to the Logger API
-func (st *State) Logger() *logger.State {
-	return logger.NewState(st)
+func (st *State) Logger() *apilogger.State {
+	return apilogger.NewState(st)
 }
 
 // KeyUpdater returns access to the KeyUpdater API
