Merge lp:~axwalk/juju-core/apiclient-open-parallel into lp:~go-bot/juju-core/trunk

Proposed by Andrew Wilkins
Status: Merged
Approved by: Andrew Wilkins
Approved revision: no longer in the source branch.
Merged at revision: 2520
Proposed branch: lp:~axwalk/juju-core/apiclient-open-parallel
Merge into: lp:~go-bot/juju-core/trunk
Diff against target: 250 lines (+141/-29)
3 files modified
state/api/apiclient.go (+59/-26)
state/api/apiclient_test.go (+79/-0)
state/api/state.go (+3/-3)
To merge this branch: bzr merge lp:~axwalk/juju-core/apiclient-open-parallel
Reviewer Review Type Date Requested Status
Juju Engineering Pending
Review via email: mp+213401@code.launchpad.net

Commit message

state/api: dial info.Addrs in parallel

This change is to dial all addresses in
parallel. The first one that successfully
connects is used.

https://codereview.appspot.com/82450043/
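
A minimal sketch of the "dial everything, keep the first success" idea, using only the standard library. It is not the utils/parallel.Try API used in this branch; dialFunc, dialFirst, fakeConn and fakeDial are hypothetical names introduced for illustration, and the fake dialer stands in for a real websocket dial so the example runs offline.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// dialFunc is a hypothetical stand-in for whatever opens one connection.
type dialFunc func(addr string) (io.Closer, error)

var (
	errNoAddrs = errors.New("no addresses to dial")
	errRefused = errors.New("connection refused")
)

// dialFirst dials every address concurrently and returns the first
// connection that succeeds. Connections that succeed later are closed.
// If every dial fails, the last error received is returned.
func dialFirst(addrs []string, dial dialFunc) (io.Closer, error) {
	if len(addrs) == 0 {
		return nil, errNoAddrs
	}
	type result struct {
		conn io.Closer
		err  error
	}
	// Buffered so that no dialing goroutine blocks on send.
	results := make(chan result, len(addrs))
	for _, addr := range addrs {
		go func(addr string) {
			conn, err := dial(addr)
			results <- result{conn, err}
		}(addr)
	}
	var lastErr error
	for i := range addrs {
		r := <-results
		if r.err != nil {
			lastErr = r.err
			continue
		}
		// Close any connections that succeed after the winner.
		go func(remaining int) {
			for ; remaining > 0; remaining-- {
				if late := <-results; late.err == nil {
					late.conn.Close()
				}
			}
		}(len(addrs) - i - 1)
		return r.conn, nil
	}
	return nil, lastErr
}

// fakeConn and fakeDial exist only to make the sketch runnable offline.
type fakeConn struct{ addr string }

func (c *fakeConn) Close() error { return nil }

func fakeDial(addr string) (io.Closer, error) {
	if addr == "good:17070" {
		return &fakeConn{addr}, nil
	}
	return nil, errRefused
}

func main() {
	conn, err := dialFirst([]string{"bad:1", "good:17070", "bad:2"}, fakeDial)
	if err != nil {
		fmt.Println("dial failed:", err)
		return
	}
	defer conn.Close()
	fmt.Println("connected to", conn.(*fakeConn).addr) // connected to good:17070
}
```

The results channel is buffered to the number of addresses so that losing goroutines can always deliver their result and exit, even after the caller has returned.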

Description of the change

state/api: dial info.Addrs in parallel

This change is to dial all addresses in
parallel. The first one that successfully
connects is used.

https://codereview.appspot.com/82450043/

Andrew Wilkins (axwalk) wrote :

Reviewers: mp+213401_code.launchpad.net,

Message:
Please take a look.

Description:
state/api: dial info.Addrs in parallel

This change is to dial all addresses in
parallel. The first one that successfully
connects is used.

https://code.launchpad.net/~axwalk/juju-core/apiclient-open-parallel/+merge/213401

(do not edit description out of merge proposal)

Please review this at https://codereview.appspot.com/82450043/

Affected files (+135, -25 lines):
   A [revision details]
   M state/api/apiclient.go
   A state/api/apiclient_test.go

Dimiter Naydenov (dimitern) wrote :
John A Meinel (jameinel) wrote :

A couple of comments, but nothing that would block landing this.

LGTM

https://codereview.appspot.com/82450043/diff/1/state/api/apiclient.go
File state/api/apiclient.go (right):

https://codereview.appspot.com/82450043/diff/1/state/api/apiclient.go#newcode142
state/api/apiclient.go:142: // TODO what does "origin" really mean, and
is localhost always ok?
Origin would make sense if you were doing actual web requests; I think
it is the location that redirected you here.
I believe the websocket requires something, but it accepts localhost, so
we just do it.

It isn't actually used in our implementation, so it is ok.

See something like:
http://learnitcorrect.com/blog/websocket-is-great-but-not-the-origin-policy.html

It is intended to avoid cross-site scripting, etc. But we just let
people connect to our websockets, and do the auth inside the websocket
(as part of Login) rather than assuming that connections to us are
already valid.

https://codereview.appspot.com/82450043/diff/1/state/api/apiclient.go#newcode167
state/api/apiclient.go:167: return conn, nil
Would we want to check <-stop at this point, in case we got a stop while
the dial was processing?

I guess it has to be handled up a layer anyway. We just know that
DialConfig is blocking, so we sort of expect to be in the middle of
dialing while we get a stop request. (because someone else succeeded.)

https://codereview.appspot.com/82450043/diff/1/state/api/apiclient.go#newcode169
state/api/apiclient.go:169: log.Debugf("state/api: %v", err)
maybe a little more context about "error trying to Dial API server, will
retry: %v" ?

It depends what errors we generally get, but it seems helpful to give a
bit more context than just an error.

https://codereview.appspot.com/82450043/

Andrew Wilkins (axwalk) wrote :

Please take a look.

https://codereview.appspot.com/82450043/diff/1/state/api/apiclient.go
File state/api/apiclient.go (right):

https://codereview.appspot.com/82450043/diff/1/state/api/apiclient.go#newcode142
state/api/apiclient.go:142: // TODO what does "origin" really mean, and
is localhost always ok?
On 2014/03/31 12:22:11, jameinel wrote:
> Origin would make sense if you were doing actual web requests; I think
> it is the location that redirected you here.
> I believe the websocket requires something, but it accepts localhost,
> so we just do it.
>
> It isn't actually used in our implementation, so it is ok.
>
> See something like:
> http://learnitcorrect.com/blog/websocket-is-great-but-not-the-origin-policy.html
>
> It is intended to avoid cross-site scripting, etc. But we just let
> people connect to our websockets, and do the auth inside the websocket
> (as part of Login) rather than assuming that connections to us are
> already valid.

I just moved that TODO, but thanks for the info :)
I'll remove the TODO, and update the comment.

https://codereview.appspot.com/82450043/diff/1/state/api/apiclient.go#newcode167
state/api/apiclient.go:167: return conn, nil
On 2014/03/31 12:22:11, jameinel wrote:
> Would we want to check <-stop at this point, in case we got a stop
> while the dial was processing?
>
> I guess it has to be handled up a layer anyway. We just know that
> DialConfig is blocking, so we sort of expect to be in the middle of
> dialing while we get a stop request. (because someone else succeeded.)

It is indeed handled higher up; utils/parallel.Try.Start creates a
closure that will dispose of the result by Close()ing it once "stop"
("dying" inside Try) is closed.
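
A rough sketch of the disposal behaviour described above, not the actual utils/parallel implementation (which is not shown on this page). The names runAttempt, attemptFunc, tracked, succeed and succeedAfterStop are hypothetical, and errStopped stands in for parallel.ErrStopped. The wrapper checks "stop" after the attempt returns and closes a result that arrived too late.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// errStopped stands in for parallel.ErrStopped.
var errStopped = errors.New("stopped")

// attemptFunc mirrors the shape of the function passed to Try.Start
// in the diff.
type attemptFunc func(stop <-chan struct{}) (io.Closer, error)

// tracked is a hypothetical connection that records whether it was closed.
type tracked struct{ closed bool }

func (t *tracked) Close() error { t.closed = true; return nil }

// runAttempt calls attempt and, if stop was closed while the attempt was
// running, closes the result and reports errStopped instead of leaking it.
func runAttempt(stop <-chan struct{}, attempt attemptFunc) (io.Closer, error) {
	result, err := attempt(stop)
	if err != nil {
		return nil, err
	}
	select {
	case <-stop:
		result.Close()
		return nil, errStopped
	default:
		return result, nil
	}
}

// succeed returns an attempt that succeeds immediately with conn.
func succeed(conn *tracked) attemptFunc {
	return func(<-chan struct{}) (io.Closer, error) { return conn, nil }
}

// succeedAfterStop simulates another attempt winning mid-dial: it closes
// stop and then succeeds with conn.
func succeedAfterStop(stop chan struct{}, conn *tracked) attemptFunc {
	return func(<-chan struct{}) (io.Closer, error) {
		close(stop)
		return conn, nil
	}
}

func main() {
	stop := make(chan struct{})
	late := &tracked{}
	_, err := runAttempt(stop, succeedAfterStop(stop, late))
	fmt.Println(err, late.closed) // stopped true
}
```

With this shape, the dialing function itself does not need a second check of "stop" before returning a connection; the caller owns the cleanup.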

https://codereview.appspot.com/82450043/diff/1/state/api/apiclient.go#newcode169
state/api/apiclient.go:169: log.Debugf("state/api: %v", err)
On 2014/03/31 12:22:11, jameinel wrote:
> maybe a little more context about "error trying to Dial API server,
> will retry: %v" ?
>
> It depends what errors we generally get, but it seems helpful to give
> a bit more context than just an error.

Done.

https://codereview.appspot.com/82450043/diff/1/state/api/apiclient_test.go
File state/api/apiclient_test.go (right):

https://codereview.appspot.com/82450043/diff/1/state/api/apiclient_test.go#newcode10
state/api/apiclient_test.go:10: //jc "github.com/juju/testing/checkers"
On 2014/03/31 11:55:23, dimitern wrote:
> d

Done.

https://codereview.appspot.com/82450043/

Go Bot (go-bot) wrote :

Attempt to merge into lp:juju-core failed due to conflicts:

text conflict in state/api/apiclient.go

Andrew Wilkins (axwalk) wrote :
Roger Peppe (rogpeppe) wrote :

A few fixes that would be nice to land when possible.

https://codereview.appspot.com/82450043/diff/40001/state/api/apiclient.go
File state/api/apiclient.go (right):

https://codereview.appspot.com/82450043/diff/40001/state/api/apiclient.go#newcode117
state/api/apiclient.go:117: if err := dialWebsocket(addr, opts, pool,
try); err != nil {
This doesn't seem quite right. Try.Start returns ErrStopped if an
earlier attempt has succeeded. If that happens, we want to stop calling
dialWebsocket.

I think something like this might be better:

for _, addr := range info.Addrs {
    err := dialWebsocket(addr, opts, pool, try)
    if err == parallel.ErrStopped {
        break
    }
    if err != nil {
        return nil, err
    }
    select {
    case <-time.After(dialAddressInterval):
    case <-try.Dead():
    }
}

with dialAddressInterval being something like 50ms,
so if the connect succeeds fast, we can avoid dialing
most of the addresses.

https://codereview.appspot.com/82450043/diff/40001/state/api/apiclient.go#newcode168
state/api/apiclient.go:168: err := parallel.ErrStopped
This isn't quite right - if we used up all our attempts, we probably
want to return a timeout error rather than ErrStopped.

I quite like using HasNext for this kind of thing:

for a := openAttempt.Start(); a.Next(); {
    select {
    case <-stop:
        return nil, parallel.ErrStopped
    default:
    }
    conn, err := websocket.DialConfig(...)
    if err == nil {
        return conn, nil
    }
    if !a.HasNext() {
        return nil, fmt.Errorf("timed out connecting to %q", cfg.Location)
    }
}
panic("unreachable")
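
A self-contained sketch of the same distinction (stopped versus out of attempts), written against the standard library rather than utils.AttemptStrategy. The names retryDial, errTimedOut, errRefused, demoTotal and demoDelay are hypothetical; errStopped stands in for parallel.ErrStopped, and the dial callback replaces websocket.DialConfig.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var (
	errStopped  = errors.New("stopped") // stand-in for parallel.ErrStopped
	errTimedOut = errors.New("timed out connecting")
	errRefused  = errors.New("connection refused")
)

// Illustrative values only; the real ones come from DialOpts.
const (
	demoTotal = 200 * time.Millisecond
	demoDelay = 5 * time.Millisecond
)

// retryDial calls dial until it succeeds, stop is closed, or there is no
// time left for another attempt within total.
func retryDial(stop <-chan struct{}, total, delay time.Duration, dial func() error) error {
	deadline := time.Now().Add(total)
	for {
		select {
		case <-stop:
			return errStopped
		default:
		}
		if err := dial(); err == nil {
			return nil
		}
		// No time left for another attempt: this is a timeout, not a stop.
		if time.Now().Add(delay).After(deadline) {
			return errTimedOut
		}
		time.Sleep(delay)
	}
}

func main() {
	stop := make(chan struct{})
	err := retryDial(stop, demoTotal, demoDelay, func() error { return errRefused })
	fmt.Println(err) // timed out connecting
}
```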

https://codereview.appspot.com/82450043/diff/40001/state/api/apiclient.go#newcode169
state/api/apiclient.go:169: for a := openAttempt.Start(); a.Next(); {
It's a pity we don't have a method on Attempt that lets us use
it in selects, so that it could be interrupted immediately.

At some point, something like:

// NextDuration reports the interval until the
// next attempt should be made.
func (a *Attempt) NextDuration() time.Duration

might be good. Then you could write:

for a := openAttempt.Start(); a.HasNext(); {
     select {
     case <-a.stop:
         return nil, parallel.ErrStopped
     case <-time.After(a.NextDuration()):
     }
     ....
}
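
Until something like NextDuration exists, the interruptible wait on its own can be sketched with a timer and a select. This is only an illustration of the pattern; waitOrStop, shortWait and longWait are hypothetical names and are not part of utils.Attempt.

```go
package main

import (
	"fmt"
	"time"
)

// Illustrative durations for the demonstration below.
const (
	shortWait = time.Millisecond
	longWait  = time.Hour
)

// waitOrStop waits for d unless stop is closed first. It reports whether
// the full wait elapsed.
func waitOrStop(stop <-chan struct{}, d time.Duration) bool {
	timer := time.NewTimer(d)
	defer timer.Stop()
	select {
	case <-stop:
		return false
	case <-timer.C:
		return true
	}
}

func main() {
	stop := make(chan struct{})
	close(stop)
	// Returns at once instead of sleeping for an hour.
	fmt.Println(waitOrStop(stop, longWait)) // false
}
```

A retry loop built on this would return parallel.ErrStopped when the function reports false, rather than sleeping out the remainder of the delay.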

https://codereview.appspot.com/82450043/diff/40001/state/api/apiclient.go#newcode172
state/api/apiclient.go:172: break
this is a no-op.
I think you probably want:

    return nil, parallel.ErrStopped

It could probably do with a test.
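
For reference, a small standalone demonstration of why the bare break is a no-op: inside a select, break leaves only the select statement, so the enclosing for loop keeps running. The function names are made up for this example.

```go
package main

import "fmt"

// attemptsWithBareBreak counts loop iterations when the stop case uses a
// bare break, which exits the select but not the for loop.
func attemptsWithBareBreak(stop <-chan struct{}, max int) int {
	attempts := 0
	for i := 0; i < max; i++ {
		select {
		case <-stop:
			break // leaves the select only; the loop carries on
		default:
		}
		attempts++
	}
	return attempts
}

// attemptsWithReturn counts loop iterations when the stop case returns.
func attemptsWithReturn(stop <-chan struct{}, max int) int {
	attempts := 0
	for i := 0; i < max; i++ {
		select {
		case <-stop:
			return attempts
		default:
		}
		attempts++
	}
	return attempts
}

func main() {
	stop := make(chan struct{})
	close(stop)
	fmt.Println(attemptsWithBareBreak(stop, 3)) // 3
	fmt.Println(attemptsWithReturn(stop, 3))    // 0
}
```

A labeled break on the for loop would also work, but returning the error directly matches the suggestion above.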

https://codereview.appspot.com/82450043/

Andrew Wilkins (axwalk) wrote :

Updates over here: https://codereview.appspot.com/82900045/

https://codereview.appspot.com/82450043/diff/40001/state/api/apiclient.go
File state/api/apiclient.go (right):

https://codereview.appspot.com/82450043/diff/40001/state/api/apiclient.go#newcode117
state/api/apiclient.go:117: if err := dialWebsocket(addr, opts, pool,
try); err != nil {
On 2014/03/31 15:39:01, rog wrote:
> This doesn't seem quite right. Try.Start returns ErrStopped if an
> earlier attempt has succeeded. If that happens, we want to stop
> calling dialWebsocket.
>
> I think something like this might be better:
>
> for _, addr := range info.Addrs {
>     err := dialWebsocket(addr, opts, pool, try)
>     if err == parallel.ErrStopped {
>         break
>     }
>     if err != nil {
>         return nil, err
>     }
>     select {
>     case <-time.After(dialAddressInterval):
>     case <-try.Dead():
>     }
> }
>
> with dialAddressInterval being something like 50ms,
> so if the connect succeeds fast, we can avoid dialing
> most of the addresses.

Done.

https://codereview.appspot.com/82450043/diff/40001/state/api/apiclient.go#newcode168
state/api/apiclient.go:168: err := parallel.ErrStopped
On 2014/03/31 15:39:01, rog wrote:
> This isn't quite right - if we used up all our attempts, we probably
> want to return a timeout error rather than ErrStopped.
>
> I quite like using HasNext for this kind of thing:
>
> for a := openAttempt.Start(); a.Next(); {
>     select {
>     case <-stop:
>         return nil, parallel.ErrStopped
>     default:
>     }
>     conn, err := websocket.DialConfig(...)
>     if err == nil {
>         return conn, nil
>     }
>     if !a.HasNext() {
>         return nil, fmt.Errorf("timed out connecting to %q", cfg.Location)
>     }
> }
> panic("unreachable")

Done.

https://codereview.appspot.com/82450043/diff/40001/state/api/apiclient.go#newcode169
state/api/apiclient.go:169: for a := openAttempt.Start(); a.Next(); {
On 2014/03/31 15:39:01, rog wrote:
> It's a pity we don't have a method on Attempt that lets us use
> it in selects, so that it could be interrupted immediately.
>
> At some point, something like:
>
> // NextDuration reports the interval until the
> // next attempt should be made.
> func (a *Attempt) NextDuration() time.Duration
>
> might be good. Then you could write:
>
> for a := openAttempt.Start(); a.HasNext(); {
>      select {
>      case <-a.stop:
>          return nil, parallel.ErrStopped
>      case <-time.After(a.NextDuration()):
>      }
>      ....
> }

I looked briefly, but it looks a little complicated. I'll not attempt
(ha ha) this for the moment.

https://codereview.appspot.com/82450043/diff/40001/state/api/apiclient.go#newcode172
state/api/apiclient.go:172: break
On 2014/03/31 15:39:01, rog wrote:
> this is a no-op.
> I think you probably want:
>
>     return nil, parallel.ErrStopped
>
> It could probably do with a test.

/facepalm
thanks, fixed and added a test.

https://codereview.appspot.com/82450043/

Preview Diff

=== modified file 'state/api/apiclient.go'
--- state/api/apiclient.go	2014-03-31 12:24:52 +0000
+++ state/api/apiclient.go	2014-03-31 14:44:35 +0000
@@ -6,23 +6,31 @@
 import (
 	"crypto/tls"
 	"crypto/x509"
+	"io"
 	"time"
 
 	"code.google.com/p/go.net/websocket"
+	"github.com/juju/loggo"
 
 	"launchpad.net/juju-core/cert"
 	"launchpad.net/juju-core/instance"
-	"launchpad.net/juju-core/log"
 	"launchpad.net/juju-core/rpc"
 	"launchpad.net/juju-core/rpc/jsoncodec"
 	"launchpad.net/juju-core/state/api/params"
 	"launchpad.net/juju-core/utils"
+	"launchpad.net/juju-core/utils/parallel"
 )
 
+var logger = loggo.GetLogger("juju.state.api")
+
 // PingPeriod defines how often the internal connection health check
 // will run. It's a variable so it can be changed in tests.
 var PingPeriod = 1 * time.Minute
 
+// maxParallelDial defines the maximum number addresses to dial in
+// parallel.
+const maxParallelDial = 7
+
 type State struct {
 	client *rpc.Conn
 	conn *websocket.Conn
@@ -95,48 +103,36 @@
 }
 
 func Open(info *Info, opts DialOpts) (*State, error) {
-	// TODO Select a random address from info.Addrs
-	// and only fail when we've tried all the addresses.
-	// TODO what does "origin" really mean, and is localhost always ok?
-	cfg, err := websocket.NewConfig("wss://"+info.Addrs[0]+"/", "http://localhost/")
-	if err != nil {
-		return nil, err
-	}
 	pool := x509.NewCertPool()
 	xcert, err := cert.ParseCert(info.CACert)
 	if err != nil {
 		return nil, err
 	}
 	pool.AddCert(xcert)
-	cfg.TlsConfig = &tls.Config{
-		RootCAs: pool,
-		ServerName: "anything",
-	}
-	var conn *websocket.Conn
-	openAttempt := utils.AttemptStrategy{
-		Total: opts.Timeout,
-		Delay: opts.RetryDelay,
-	}
-	for a := openAttempt.Start(); a.Next(); {
-		log.Infof("state/api: dialing %q", cfg.Location)
-		conn, err = websocket.DialConfig(cfg)
-		if err == nil {
-			break
+
+	// Dial all addresses, with up to maxParallelDial in parallel.
+	try := parallel.NewTry(maxParallelDial, nil)
+	defer try.Kill()
+	for _, addr := range info.Addrs {
+		if err := dialWebsocket(addr, opts, pool, try); err != nil {
+			return nil, err
 		}
-		log.Errorf("state/api: %v", err)
 	}
+	try.Close()
+	result, err := try.Result()
 	if err != nil {
 		return nil, err
 	}
-	log.Infof("state/api: connection established")
+	conn := result.(*websocket.Conn)
+	logger.Infof("connection established to %q", conn.RemoteAddr())
 
 	client := rpc.NewConn(jsoncodec.NewWebsocket(conn), nil)
 	client.Start()
 	st := &State{
 		client: client,
 		conn: conn,
-		addr: cfg.Location.Host,
-		serverRoot: "https://" + cfg.Location.Host,
+		addr: conn.Config().Location.Host,
+		serverRoot: "https://" + conn.Config().Location.Host,
 		tag: info.Tag,
 		password: info.Password,
 	}
@@ -151,6 +147,43 @@
 	return st, nil
 }
 
+func dialWebsocket(addr string, opts DialOpts, rootCAs *x509.CertPool, try *parallel.Try) error {
+	// origin is required by the WebSocket API, used for "origin policy"
+	// in websockets. We pass localhost to satisfy the API; it is
+	// inconsequential to us.
+	const origin = "http://localhost/"
+	cfg, err := websocket.NewConfig("wss://"+addr+"/", origin)
+	if err != nil {
+		return err
+	}
+	cfg.TlsConfig = &tls.Config{
+		RootCAs: rootCAs,
+		ServerName: "anything",
+	}
+	openAttempt := utils.AttemptStrategy{
+		Total: opts.Timeout,
+		Delay: opts.RetryDelay,
+	}
+	return try.Start(func(stop <-chan struct{}) (io.Closer, error) {
+		err := parallel.ErrStopped
+		for a := openAttempt.Start(); a.Next(); {
+			select {
+			case <-stop:
+				break
+			default:
+			}
+			logger.Infof("dialing %q", cfg.Location)
+			var conn *websocket.Conn
+			conn, err = websocket.DialConfig(cfg)
+			if err == nil {
+				return conn, nil
+			}
+			logger.Debugf("error dialing API server, will retry: %v", err)
+		}
+		return nil, err
+	})
+}
+
 func (s *State) heartbeatMonitor() {
 	for {
 		if err := s.Ping(); err != nil {

=== added file 'state/api/apiclient_test.go'
--- state/api/apiclient_test.go	1970-01-01 00:00:00 +0000
+++ state/api/apiclient_test.go	2014-03-31 14:44:35 +0000
@@ -0,0 +1,79 @@
+// Copyright 2014 Canonical Ltd.
+// Licensed under the AGPLv3, see LICENCE file for details.
+
+package api_test
+
+import (
+	"io"
+	"net"
+
+	gc "launchpad.net/gocheck"
+
+	jujutesting "launchpad.net/juju-core/juju/testing"
+	"launchpad.net/juju-core/state/api"
+)
+
+type apiclientSuite struct {
+	jujutesting.JujuConnSuite
+}
+
+var _ = gc.Suite(&apiclientSuite{})
+
+func (s *apiclientSuite) TestOpenMultiple(c *gc.C) {
+	// Create a socket that proxies to the API server.
+	info := s.APIInfo(c)
+	serverAddr := info.Addrs[0]
+	server, err := net.Dial("tcp", serverAddr)
+	c.Assert(err, gc.IsNil)
+	defer server.Close()
+	listener, err := net.Listen("tcp", ":0")
+	c.Assert(err, gc.IsNil)
+	defer listener.Close()
+	go func() {
+		for {
+			client, err := listener.Accept()
+			if err != nil {
+				return
+			}
+			go io.Copy(client, server)
+			go io.Copy(server, client)
+		}
+	}()
+
+	// Check that we can use the proxy to connect.
+	proxyAddr := listener.Addr().String()
+	info.Addrs = []string{proxyAddr}
+	st, err := api.Open(info, api.DialOpts{})
+	c.Assert(err, gc.IsNil)
+	defer st.Close()
+	c.Assert(st.Addr(), gc.Equals, proxyAddr)
+
+	// Now break Addrs[0], and ensure that Addrs[1]
+	// is successfully connected to.
+	info.Addrs = []string{proxyAddr, serverAddr}
+	listener.Close()
+	st, err = api.Open(info, api.DialOpts{})
+	c.Assert(err, gc.IsNil)
+	defer st.Close()
+	c.Assert(st.Addr(), gc.Equals, serverAddr)
+}
+
+func (s *apiclientSuite) TestOpenMultipleError(c *gc.C) {
+	listener, err := net.Listen("tcp", ":0")
+	c.Assert(err, gc.IsNil)
+	defer listener.Close()
+	go func() {
+		for {
+			client, err := listener.Accept()
+			if err != nil {
+				return
+			}
+			client.Close()
+		}
+	}()
+	info := s.APIInfo(c)
+	addr := listener.Addr().String()
+	info.Addrs = []string{addr, addr, addr}
+	_, err = api.Open(info, api.DialOpts{})
+	c.Assert(err, gc.ErrorMatches, "websocket.Dial .*: read tcp .*: connection reset by peer")
+}

=== modified file 'state/api/state.go'
--- state/api/state.go	2014-03-31 12:24:52 +0000
+++ state/api/state.go	2014-03-31 14:44:35 +0000
@@ -14,7 +14,7 @@
 	"launchpad.net/juju-core/state/api/environment"
 	"launchpad.net/juju-core/state/api/firewaller"
 	"launchpad.net/juju-core/state/api/keyupdater"
-	"launchpad.net/juju-core/state/api/logger"
+	apilogger "launchpad.net/juju-core/state/api/logger"
 	"launchpad.net/juju-core/state/api/machiner"
 	"launchpad.net/juju-core/state/api/params"
 	"launchpad.net/juju-core/state/api/provisioner"
@@ -128,8 +128,8 @@
 }
 
 // Logger returns access to the Logger API
-func (st *State) Logger() *logger.State {
-	return logger.NewState(st)
+func (st *State) Logger() *apilogger.State {
+	return apilogger.NewState(st)
 }
 
 // KeyUpdater returns access to the KeyUpdater API
