Merge lp:~axwalk/juju-core/apiclient-open-parallel into lp:~go-bot/juju-core/trunk
Status: | Merged |
---|---|
Approved by: | Andrew Wilkins |
Approved revision: | no longer in the source branch. |
Merged at revision: | 2520 |
Proposed branch: | lp:~axwalk/juju-core/apiclient-open-parallel |
Merge into: | lp:~go-bot/juju-core/trunk |
Diff against target: | 250 lines (+141/-29), 3 files modified: state/api/apiclient.go (+59/-26), state/api/apiclient_test.go (+79/-0), state/api/state.go (+3/-3) |
To merge this branch: | bzr merge lp:~axwalk/juju-core/apiclient-open-parallel |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Juju Engineering | Pending | ||
Review via email: mp+213401@code.launchpad.net |
Commit message
state/api: dial info.Addrs in parallel
This change is to dial all addresses in
parallel. The first one that successfully
connects is used.
Description of the change
state/api: dial info.Addrs in parallel
This change is to dial all addresses in
parallel. The first one that successfully
connects is used.
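The "first one that successfully connects is used" behaviour can be sketched with the standard library alone. This is a minimal illustration, not the code in this branch: `dialFirst`, `dialFunc` and `fakeDial` are hypothetical names, and the real change uses `utils/parallel.Try` rather than a bare channel.

```go
package main

import (
	"errors"
	"fmt"
)

// dialFunc is a hypothetical stand-in for a real dialer such as
// websocket.DialConfig; it returns a connection label or an error.
type dialFunc func(addr string) (string, error)

// dialFirst dials every address concurrently and returns the first
// successful result. If every dial fails, the last error received is returned.
func dialFirst(addrs []string, dial dialFunc) (string, error) {
	if len(addrs) == 0 {
		return "", errors.New("no addresses to dial")
	}
	type result struct {
		conn string
		err  error
	}
	// Buffered so that losing goroutines never block after we return.
	results := make(chan result, len(addrs))
	for _, addr := range addrs {
		go func(addr string) {
			conn, err := dial(addr)
			results <- result{conn, err}
		}(addr)
	}
	var lastErr error
	for range addrs {
		r := <-results
		if r.err == nil {
			return r.conn, nil
		}
		lastErr = r.err
	}
	return "", lastErr
}

// fakeDial succeeds only for one well-known address.
func fakeDial(addr string) (string, error) {
	if addr == "good:17070" {
		return "conn to " + addr, nil
	}
	return "", fmt.Errorf("cannot dial %q", addr)
}

func main() {
	conn, err := dialFirst([]string{"bad:1", "good:17070", "bad:2"}, fakeDial)
	fmt.Println(conn, err)
}
```

A real implementation would also need to close any connections that succeed after a winner has been chosen; the sketch ignores that because its "connections" are plain strings.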
Andrew Wilkins (axwalk) wrote : | # |
Dimiter Naydenov (dimitern) wrote : | # |
LGTM
https:/
File state/api/
https:/
state/api/
d
John A Meinel (jameinel) wrote : | # |
A couple of comments, but nothing that would block landing this.
LGTM
https:/
File state/api/
https:/
state/api/
is localhost always ok?
Origin would make sense if you were doing actual web requests; I think
it is the location that redirected you here.
I believe the websocket API requires something, but it accepts
localhost, so we just pass that.
It isn't actually used in our implementation, so it is OK.
See something like:
http://
It is intended to avoid cross-site scripting, etc. But we just let
people connect to our websockets, and do the auth inside the websocket
(as part of Login) rather than assuming that connections to us are
already valid.
https:/
state/api/
Would we want to check <-stop at this point, in case we got a stop while
the dial was processing?
I guess it has to be handled up a layer anyway. We just know that
DialConfig is blocking, so we sort of expect to be in the middle of
dialing while we get a stop request. (because someone else succeeded.)
https:/
state/api/
maybe a little more context about "error trying to Dial API server, will
retry: %v" ?
It depends what errors we generally get, but it seems helpful to give a
bit more context than just an error.
Andrew Wilkins (axwalk) wrote : | # |
Please take a look.
https:/
File state/api/
https:/
state/api/
is localhost always ok?
On 2014/03/31 12:22:11, jameinel wrote:
> Origin would make sense if you were doing actual web requests; I think
> it is the location that redirected you here.
> I believe the websocket API requires something, but it accepts
> localhost, so we just pass that.
> It isn't actually used in our implementation, so it is OK.
> See something like:
> http://
> It is intended to avoid cross-site scripting, etc. But we just let
> people connect to our websockets, and do the auth inside the websocket
> (as part of Login) rather than assuming that connections to us are
> already valid.
I just moved that TODO, but thanks for the info :)
I'll remove the TODO, and update the comment.
https:/
state/api/
On 2014/03/31 12:22:11, jameinel wrote:
> Would we want to check <-stop at this point, in case we got a stop
> while the dial was processing?
> I guess it has to be handled up a layer anyway. We just know that
> DialConfig is blocking, so we sort of expect to be in the middle of
> dialing while we get a stop request. (because someone else succeeded.)
It is indeed handled higher up; utils/parallel.
closure that will dispose of the result by Close()ing it once "stop"
("dying" inside Try) is closed.
https:/
state/api/
On 2014/03/31 12:22:11, jameinel wrote:
> maybe a little more context about "error trying to Dial API server,
> will retry: %v" ?
> It depends what errors we generally get, but it seems helpful to give
> a bit more context than just an error.
Done.
https:/
File state/api/
https:/
state/api/
On 2014/03/31 11:55:23, dimitern wrote:
> d
Done.
Go Bot (go-bot) wrote : | # |
Attempt to merge into lp:juju-core failed due to conflicts:
text conflict in state/api/
Andrew Wilkins (axwalk) wrote : | # |
Please take a look.
Roger Peppe (rogpeppe) wrote : | # |
A few fixes that would be nice to land when possible.
https:/
File state/api/
https:/
state/api/
try); err != nil {
This doesn't seem quite right. Try.Start returns ErrStopped if an
earlier attempt has succeeded. If that happens, we want to stop calling
dialWebsocket.
I think something like this might be better:
for _, addr := range info.Addrs {
err := dialWebsocket(addr, opts, pool, try)
if err == parallel.ErrStopped {
break
}
if err != nil {
return nil, err
}
select{
case <-time.
case <-try.Dead():
}
}
with dialAddressInterval being something like 50ms,
so if the connect succeeds fast, we can avoid dialing
most of the addresses.
https:/
state/api/
This isn't quite right - if we used up all our attempts, we probably
want to return a timeout error rather than ErrStopped.
I quite like using HasNext for this kind of thing:
for a := openAttempt.
select{
case <-stop:
return nil, parallel.ErrStopped
default:
}
conn, err := websocket.
if err == nil {
return conn, nil
}
if !a.HasNext() {
return nil, fmt.Errorf("timed out connecting to %q",
cfg.Location)
}
}
panic("
https:/
state/api/
It's a pity we don't have a method on Attempt that lets us use
it in selects, so that it could be interrupted immediately.
At some point, something like:
// NextDuration reports the interval until the
// next attempt should be made.
func (a *Attempt) NextDuration() time.Duration
might be good. Then you could write:
for a := openAttempt.
select {
case <-a.stop:
return nil, parallel.ErrStopped
case <-time.
}
....
}
https:/
state/api/
this is a no-op.
I think you probably want:
return nil, parallel.ErrStopped
It could probably do with a test.
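For readers unfamiliar with why the `break` is a no-op: in Go, an unlabeled `break` inside a `select` terminates only the `select`, not the enclosing `for`. The sketch below demonstrates this; `countWithBreak` and `countWithReturn` are hypothetical names written for this illustration.

```go
package main

import "fmt"

// countWithBreak mirrors the pattern under review: the break only leaves
// the select statement, so the loop body keeps running after stop closes.
func countWithBreak(stop <-chan struct{}, n int) int {
	iterations := 0
	for i := 0; i < n; i++ {
		select {
		case <-stop:
			break // exits the select only, not the for loop
		default:
		}
		iterations++
	}
	return iterations
}

// countWithReturn leaves the function as soon as stop is closed.
func countWithReturn(stop <-chan struct{}, n int) int {
	iterations := 0
	for i := 0; i < n; i++ {
		select {
		case <-stop:
			return iterations
		default:
		}
		iterations++
	}
	return iterations
}

func main() {
	stop := make(chan struct{})
	close(stop)
	// Prints "3 0": the break version still runs all three iterations.
	fmt.Println(countWithBreak(stop, 3), countWithReturn(stop, 3))
}
```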
Andrew Wilkins (axwalk) wrote : | # |
Updates over here: https:/
https:/
File state/api/
https:/
state/api/
try); err != nil {
On 2014/03/31 15:39:01, rog wrote:
> This doesn't seem quite right. Try.Start returns ErrStopped if an
> earlier attempt has succeeded. If that happens, we want to stop calling
> dialWebsocket.
> I think something like this might be better:
> for _, addr := range info.Addrs {
> err := dialWebsocket(addr, opts, pool, try)
> if err == parallel.ErrStopped {
> break
> }
> if err != nil {
> return nil, err
> }
> select{
> case <-time.
> case <-try.Dead():
> }
> }
> with dialAddressInterval being something like 50ms,
> so if the connect succeeds fast, we can avoid dialing
> most of the addresses.
Done.
https:/
state/api/
On 2014/03/31 15:39:01, rog wrote:
> This isn't quite right - if we used up all our attempts, we probably
> want to return a timeout error rather than ErrStopped.
> I quite like using HasNext for this kind of thing:
> for a := openAttempt.
> select{
> case <-stop:
> return nil, parallel.ErrStopped
> default:
> }
> conn, err := websocket.
> if err == nil {
> return conn, nil
> }
> if !a.HasNext() {
> return nil, fmt.Errorf("timed out connecting to %q", cfg.Location)
> }
> }
> panic("
Done.
https:/
state/api/
On 2014/03/31 15:39:01, rog wrote:
> It's a pity we don't have a method on Attempt that lets us use
> it in selects, so that it could be interrupted immediately.
> At some point, something like:
> // NextDuration reports the interval until the
> // next attempt should be made.
> func (a *Attempt) NextDuration() time.Duration
> might be good. Then you could write:
> for a := openAttempt.
> select {
> case <-a.stop:
> return nil, parallel.ErrStopped
> case <-time.
> }
> ....
> }
I looked briefly, but it looks a little complicated. I'll not attempt
(ha ha) this for the moment.
https:/
state/api/
On 2014/03/31 15:39:01, rog wrote:
> this is a no-op.
> I think you probably want:
> return nil, parallel.ErrStopped
> It could probably do with a test.
/facepalm
thanks, fixed and added a test.
Preview Diff
=== modified file 'state/api/apiclient.go'
--- state/api/apiclient.go 2014-03-31 12:24:52 +0000
+++ state/api/apiclient.go 2014-03-31 14:44:35 +0000
@@ -6,23 +6,31 @@
 import (
 	"crypto/tls"
 	"crypto/x509"
+	"io"
 	"time"

 	"code.google.com/p/go.net/websocket"
+	"github.com/juju/loggo"

 	"launchpad.net/juju-core/cert"
 	"launchpad.net/juju-core/instance"
-	"launchpad.net/juju-core/log"
 	"launchpad.net/juju-core/rpc"
 	"launchpad.net/juju-core/rpc/jsoncodec"
 	"launchpad.net/juju-core/state/api/params"
 	"launchpad.net/juju-core/utils"
+	"launchpad.net/juju-core/utils/parallel"
 )

+var logger = loggo.GetLogger("juju.state.api")
+
 // PingPeriod defines how often the internal connection health check
 // will run. It's a variable so it can be changed in tests.
 var PingPeriod = 1 * time.Minute

+// maxParallelDial defines the maximum number addresses to dial in
+// parallel.
+const maxParallelDial = 7
+
 type State struct {
 	client *rpc.Conn
 	conn   *websocket.Conn
@@ -95,48 +103,36 @@
 }

 func Open(info *Info, opts DialOpts) (*State, error) {
-	// TODO Select a random address from info.Addrs
-	// and only fail when we've tried all the addresses.
-	// TODO what does "origin" really mean, and is localhost always ok?
-	cfg, err := websocket.NewConfig("wss://"+info.Addrs[0]+"/", "http://localhost/")
-	if err != nil {
-		return nil, err
-	}
 	pool := x509.NewCertPool()
 	xcert, err := cert.ParseCert(info.CACert)
 	if err != nil {
 		return nil, err
 	}
 	pool.AddCert(xcert)
-	cfg.TlsConfig = &tls.Config{
-		RootCAs:    pool,
-		ServerName: "anything",
-	}
-	var conn *websocket.Conn
-	openAttempt := utils.AttemptStrategy{
-		Total: opts.Timeout,
-		Delay: opts.RetryDelay,
-	}
-	for a := openAttempt.Start(); a.Next(); {
-		log.Infof("state/api: dialing %q", cfg.Location)
-		conn, err = websocket.DialConfig(cfg)
-		if err == nil {
-			break
+
+	// Dial all addresses, with up to maxParallelDial in parallel.
+	try := parallel.NewTry(maxParallelDial, nil)
+	defer try.Kill()
+	for _, addr := range info.Addrs {
+		if err := dialWebsocket(addr, opts, pool, try); err != nil {
+			return nil, err
 		}
-		log.Errorf("state/api: %v", err)
 	}
+	try.Close()
+	result, err := try.Result()
 	if err != nil {
 		return nil, err
 	}
-	log.Infof("state/api: connection established")
+	conn := result.(*websocket.Conn)
+	logger.Infof("connection established to %q", conn.RemoteAddr())

 	client := rpc.NewConn(jsoncodec.NewWebsocket(conn), nil)
 	client.Start()
 	st := &State{
 		client:     client,
 		conn:       conn,
-		addr:       cfg.Location.Host,
-		serverRoot: "https://" + cfg.Location.Host,
+		addr:       conn.Config().Location.Host,
+		serverRoot: "https://" + conn.Config().Location.Host,
 		tag:        info.Tag,
 		password:   info.Password,
 	}
@@ -151,6 +147,43 @@
 	return st, nil
 }

+func dialWebsocket(addr string, opts DialOpts, rootCAs *x509.CertPool, try *parallel.Try) error {
+	// origin is required by the WebSocket API, used for "origin policy"
+	// in websockets. We pass localhost to satisfy the API; it is
+	// inconsequential to us.
+	const origin = "http://localhost/"
+	cfg, err := websocket.NewConfig("wss://"+addr+"/", origin)
+	if err != nil {
+		return err
+	}
+	cfg.TlsConfig = &tls.Config{
+		RootCAs:    rootCAs,
+		ServerName: "anything",
+	}
+	openAttempt := utils.AttemptStrategy{
+		Total: opts.Timeout,
+		Delay: opts.RetryDelay,
+	}
+	return try.Start(func(stop <-chan struct{}) (io.Closer, error) {
+		err := parallel.ErrStopped
+		for a := openAttempt.Start(); a.Next(); {
+			select {
+			case <-stop:
+				break
+			default:
+			}
+			logger.Infof("dialing %q", cfg.Location)
+			var conn *websocket.Conn
+			conn, err = websocket.DialConfig(cfg)
+			if err == nil {
+				return conn, nil
+			}
+			logger.Debugf("error dialing API server, will retry: %v", err)
+		}
+		return nil, err
+	})
+}
+
 func (s *State) heartbeatMonitor() {
 	for {
 		if err := s.Ping(); err != nil {

=== added file 'state/api/apiclient_test.go'
--- state/api/apiclient_test.go 1970-01-01 00:00:00 +0000
+++ state/api/apiclient_test.go 2014-03-31 14:44:35 +0000
@@ -0,0 +1,79 @@
+// Copyright 2014 Canonical Ltd.
+// Licensed under the AGPLv3, see LICENCE file for details.
+
+package api_test
+
+import (
+	"io"
+	"net"
+
+	gc "launchpad.net/gocheck"
+
+	jujutesting "launchpad.net/juju-core/juju/testing"
+	"launchpad.net/juju-core/state/api"
+)
+
+type apiclientSuite struct {
+	jujutesting.JujuConnSuite
+}
+
+var _ = gc.Suite(&apiclientSuite{})
+
+func (s *apiclientSuite) TestOpenMultiple(c *gc.C) {
+	// Create a socket that proxies to the API server.
+	info := s.APIInfo(c)
+	serverAddr := info.Addrs[0]
+	server, err := net.Dial("tcp", serverAddr)
+	c.Assert(err, gc.IsNil)
+	defer server.Close()
+	listener, err := net.Listen("tcp", ":0")
+	c.Assert(err, gc.IsNil)
+	defer listener.Close()
+	go func() {
+		for {
+			client, err := listener.Accept()
+			if err != nil {
+				return
+			}
+			go io.Copy(client, server)
+			go io.Copy(server, client)
+		}
+	}()
+
+	// Check that we can use the proxy to connect.
+	proxyAddr := listener.Addr().String()
+	info.Addrs = []string{proxyAddr}
+	st, err := api.Open(info, api.DialOpts{})
+	c.Assert(err, gc.IsNil)
+	defer st.Close()
+	c.Assert(st.Addr(), gc.Equals, proxyAddr)
+
+	// Now break Addrs[0], and ensure that Addrs[1]
+	// is successfully connected to.
+	info.Addrs = []string{proxyAddr, serverAddr}
+	listener.Close()
+	st, err = api.Open(info, api.DialOpts{})
+	c.Assert(err, gc.IsNil)
+	defer st.Close()
+	c.Assert(st.Addr(), gc.Equals, serverAddr)
+}
+
+func (s *apiclientSuite) TestOpenMultipleError(c *gc.C) {
+	listener, err := net.Listen("tcp", ":0")
+	c.Assert(err, gc.IsNil)
+	defer listener.Close()
+	go func() {
+		for {
+			client, err := listener.Accept()
+			if err != nil {
+				return
+			}
+			client.Close()
+		}
+	}()
+	info := s.APIInfo(c)
+	addr := listener.Addr().String()
+	info.Addrs = []string{addr, addr, addr}
+	_, err = api.Open(info, api.DialOpts{})
+	c.Assert(err, gc.ErrorMatches, "websocket.Dial .*: read tcp .*: connection reset by peer")
+}

=== modified file 'state/api/state.go'
--- state/api/state.go 2014-03-31 12:24:52 +0000
+++ state/api/state.go 2014-03-31 14:44:35 +0000
@@ -14,7 +14,7 @@
 	"launchpad.net/juju-core/state/api/environment"
 	"launchpad.net/juju-core/state/api/firewaller"
 	"launchpad.net/juju-core/state/api/keyupdater"
-	"launchpad.net/juju-core/state/api/logger"
+	apilogger "launchpad.net/juju-core/state/api/logger"
 	"launchpad.net/juju-core/state/api/machiner"
 	"launchpad.net/juju-core/state/api/params"
 	"launchpad.net/juju-core/state/api/provisioner"
@@ -128,8 +128,8 @@
 }

 // Logger returns access to the Logger API
-func (st *State) Logger() *logger.State {
-	return logger.NewState(st)
+func (st *State) Logger() *apilogger.State {
+	return apilogger.NewState(st)
 }

 // KeyUpdater returns access to the KeyUpdater API
Reviewers: mp+213401_code.launchpad.net,
Message:
Please take a look.
Description:
state/api: dial info.Addrs in parallel
This change is to dial all addresses in
parallel. The first one that successfully
connects is used.
https://code.launchpad.net/~axwalk/juju-core/apiclient-open-parallel/+merge/213401
(do not edit description out of merge proposal)
Please review this at https://codereview.appspot.com/82450043/
Affected files (+135, -25 lines): apiclient.go apiclient_test.go
A [revision details]
M state/api/
A state/api/