Merge lp:~axwalk/juju-core/apiclient-open-parallel-fixes into lp:~go-bot/juju-core/trunk
Status: | Merged |
---|---|
Approved by: | Andrew Wilkins |
Approved revision: | no longer in the source branch. |
Merged at revision: | 2526 |
Proposed branch: | lp:~axwalk/juju-core/apiclient-open-parallel-fixes |
Merge into: | lp:~go-bot/juju-core/trunk |
Diff against target: | 141 lines (+48/-12), 3 files modified: state/api/apiclient.go (+33/-11), state/api/apiclient_test.go (+11/-1), state/api/export_test.go (+4/-0) |
To merge this branch: | bzr merge lp:~axwalk/juju-core/apiclient-open-parallel-fixes |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Juju Engineering | | | Pending |
Review via email: mp+213569@code.launchpad.net |
Commit message
state/api: fixes to parallel api.Open
With these changes, api.Open stops
starting new dialers and uses the
successful connection if one of the
dialers succeeds before all are started.
We also introduce a short delay between
starting dialers, to avoid unnecessary
dial attempts.
Andrew Wilkins (axwalk) wrote : | # |
William Reade (fwereade) wrote : | # |
LGTM
https:/
File state/api/
https:/
state/api/
out connecting to "wss://.*/"`)
much nicer, thanks
Roger Peppe (rogpeppe) wrote : | # |
LGTM, thanks
https:/
File state/api/
https:/
state/api/
s/else//
https:/
state/api/
will retry: %v", err)
perhaps include the cfg.Location here, otherwise when we've got a few of
these happening concurrently, we won't be able to tell which address is
giving which error.
Andrew Wilkins (axwalk) wrote : | # |
Please take a look.
https:/
File state/api/
https:/
state/api/
On 2014/04/01 07:42:07, rog wrote:
> s/else//
Done.
https:/
state/api/
will retry: %v", err)
On 2014/04/01 07:42:07, rog wrote:
> perhaps include the cfg.Location here, otherwise when we've got a few of these
> happening concurrently, we won't be able to tell which address is giving which
> error.
Done.
Go Bot (go-bot) wrote : | # |
The attempt to merge lp:~axwalk/juju-core/apiclient-open-parallel-fixes into lp:juju-core failed. Below is the output from the failed tests.
ok launchpad.
ok launchpad.
ok launchpad.
ok launchpad.
ok launchpad.
ok launchpad.
ok launchpad.
? launchpad.
? launchpad.
ok launchpad.
ok launchpad.
ok launchpad.
ok launchpad.
? launchpad.
? launchpad.
ok launchpad.
ok launchpad.
ok launchpad.
? launchpad.
ok launchpad.
? launchpad.
ok launchpad.
ok launchpad.
ok launchpad.
ok launchpad.
ok launchpad.
? launchpad.
ok launchpad.
? launchpad.
? launchpad.
? launchpad.
ok launchpad.
ok launchpad.
ok launchpad.
ok launchpad.
ok launchpad.
ok launchpad.
ok launchpad.
ok launchpad.
ok launchpad.
? launchpad.
ok launchpad.
ok launchpad.
ok launchpad.
ok launchpad.
? launchpad.
ok launchpad.
ok launchpad.
ok launchpad.
ok launchpad.
ok launchpad.
? launchpad.
ok launchpad.
ok launchpad.
? launchpad.
ok launchpad.
ok launchpad.
Go Bot (go-bot) wrote : | # |
The attempt to merge lp:~axwalk/juju-core/apiclient-open-parallel-fixes into lp:juju-core failed. Below is the output from the failed tests.
ok launchpad.
ok launchpad.
ok launchpad.
ok launchpad.
ok launchpad.
ok launchpad.
ok launchpad.
? launchpad.
? launchpad.
ok launchpad.
ok launchpad.
ok launchpad.
ok launchpad.
? launchpad.
? launchpad.
warning: building out-of-date packages:
launchpad.
launchpad.
github.
github.
github.
github.
github.
github.
launchpad.
launchpad.
launchpad.
launchpad.
launchpad.
launchpad.
installing these packages with 'go test -i ./...' will speed future tests.
Preview Diff
=== modified file 'state/api/apiclient.go'
--- state/api/apiclient.go	2014-03-31 14:43:24 +0000
+++ state/api/apiclient.go	2014-04-01 07:50:42 +0000
@@ -6,6 +6,7 @@
 import (
 	"crypto/tls"
 	"crypto/x509"
+	"fmt"
 	"io"
 	"time"

@@ -84,6 +85,10 @@
 // DialOpts holds configuration parameters that control the
 // Dialing behavior when connecting to a state server.
 type DialOpts struct {
+	// DialAddressInterval is the amount of time to wait
+	// before starting to dial another address.
+	DialAddressInterval time.Duration
+
 	// Timeout is the amount of time to wait contacting
 	// a state server.
 	Timeout time.Duration
@@ -97,8 +102,9 @@
 // parameters for contacting a state server.
 func DefaultDialOpts() DialOpts {
 	return DialOpts{
-		Timeout:    10 * time.Minute,
-		RetryDelay: 2 * time.Second,
+		DialAddressInterval: 50 * time.Millisecond,
+		Timeout:             10 * time.Minute,
+		RetryDelay:          2 * time.Second,
 	}
 }

@@ -114,9 +120,17 @@
 	try := parallel.NewTry(maxParallelDial, nil)
 	defer try.Kill()
 	for _, addr := range info.Addrs {
-		if err := dialWebsocket(addr, opts, pool, try); err != nil {
+		err := dialWebsocket(addr, opts, pool, try)
+		if err == parallel.ErrStopped {
+			break
+		}
+		if err != nil {
 			return nil, err
 		}
+		select {
+		case <-time.After(opts.DialAddressInterval):
+		case <-try.Dead():
+		}
 	}
 	try.Close()
 	result, err := try.Result()
@@ -160,28 +174,36 @@
 		RootCAs:    rootCAs,
 		ServerName: "anything",
 	}
+	return try.Start(newWebsocketDialer(cfg, opts))
+}
+
+// new WebsocketDialler returns a function that
+// can be passed to utils/parallel.Try.Start.
+func newWebsocketDialer(cfg *websocket.Config, opts DialOpts) func(<-chan struct{}) (io.Closer, error) {
 	openAttempt := utils.AttemptStrategy{
 		Total: opts.Timeout,
 		Delay: opts.RetryDelay,
 	}
-	return try.Start(func(stop <-chan struct{}) (io.Closer, error) {
-		err := parallel.ErrStopped
+	return func(stop <-chan struct{}) (io.Closer, error) {
 		for a := openAttempt.Start(); a.Next(); {
 			select {
 			case <-stop:
-				break
+				return nil, parallel.ErrStopped
 			default:
 			}
 			logger.Infof("dialing %q", cfg.Location)
-			var conn *websocket.Conn
-			conn, err = websocket.DialConfig(cfg)
+			conn, err := websocket.DialConfig(cfg)
 			if err == nil {
 				return conn, nil
 			}
-			logger.Debugf("error dialing API server, will retry: %v", err)
+			if a.HasNext() {
+				logger.Debugf("error dialing %q, will retry: %v", cfg.Location, err)
+			} else {
+				return nil, fmt.Errorf("timed out connecting to %q", cfg.Location)
+			}
 		}
-		return nil, err
-	})
+		panic("unreachable")
+	}
 }

 func (s *State) heartbeatMonitor() {

=== modified file 'state/api/apiclient_test.go'
--- state/api/apiclient_test.go	2014-03-31 13:03:59 +0000
+++ state/api/apiclient_test.go	2014-04-01 07:50:42 +0000
@@ -11,6 +11,7 @@

 	jujutesting "launchpad.net/juju-core/juju/testing"
 	"launchpad.net/juju-core/state/api"
+	"launchpad.net/juju-core/utils/parallel"
 )

 type apiclientSuite struct {
@@ -75,5 +76,14 @@
 	addr := listener.Addr().String()
 	info.Addrs = []string{addr, addr, addr}
 	_, err = api.Open(info, api.DialOpts{})
-	c.Assert(err, gc.ErrorMatches, "websocket.Dial .*: read tcp .*: connection reset by peer")
+	c.Assert(err, gc.ErrorMatches, `timed out connecting to "wss://.*/"`)
+}
+
+func (s *apiclientSuite) TestDialWebsocketStopped(c *gc.C) {
+	stopped := make(chan struct{})
+	f := api.NewWebsocketDialer(nil, api.DialOpts{})
+	close(stopped)
+	result, err := f(stopped)
+	c.Assert(err, gc.Equals, parallel.ErrStopped)
+	c.Assert(result, gc.IsNil)
 }

=== modified file 'state/api/export_test.go'
--- state/api/export_test.go	2013-12-17 15:30:13 +0000
+++ state/api/export_test.go	2014-04-01 07:50:42 +0000
@@ -3,6 +3,10 @@

 package api

+var (
+	NewWebsocketDialer = newWebsocketDialer
+)
+
 // SetServerRoot allows changing the URL to the internal API server
 // that AddLocalCharm uses in order to test NotImplementedError.
 func SetServerRoot(c *Client, root string) {
Reviewers: mp+213569_code.launchpad.net,
Message:
Please take a look.
Description:
state/api: fixes to parallel api.Open
With these changes, api.Open will now
do the right thing if one of the dialers
succeeds before all are started. We also
introduce a short delay between starting
dialers, to avoid unnecessary dial
attempts.
https://code.launchpad.net/~axwalk/juju-core/apiclient-open-parallel-fixes/+merge/213569
(do not edit description out of merge proposal)
Please review this at https://codereview.appspot.com/82900045/
Affected files (+49, -12 lines):
A [revision details]
M state/api/apiclient.go
M state/api/apiclient_test.go
M state/api/export_test.go
Index: [revision details]
=== added file '[revision details]'
--- [revision details] 2012-01-01 00:00:00 +0000
+++ [revision details] 2012-01-01 00:00:00 +0000
@@ -0,0 +1,2 @@
+Old revision: tarmac-20140401002201-jdbrb1jd9gomhhci
+New revision: <email address hidden>
Index: state/api/apiclient.go
=== modified file 'state/api/apiclient.go'
--- state/api/apiclient.go	2014-03-31 14:43:24 +0000
+++ state/api/apiclient.go	2014-04-01 03:13:49 +0000
@@ -6,6 +6,7 @@
 import (
 	"crypto/tls"
 	"crypto/x509"
+	"fmt"
 	"io"
 	"time"

@@ -84,6 +85,10 @@
 // DialOpts holds configuration parameters that control the
 // Dialing behavior when connecting to a state server.
 type DialOpts struct {
+	// DialAddressInterval is the amount of time to wait
+	// before starting to dial another address.
+	DialAddressInterval time.Duration
+
 	// Timeout is the amount of time to wait contacting
 	// a state server.
 	Timeout time.Duration
@@ -97,8 +102,9 @@
 // parameters for contacting a state server.
 func DefaultDialOpts() DialOpts {
 	return DialOpts{
-		Timeout:    10 * time.Minute,
-		RetryDelay: 2 * time.Second,
+		DialAddressInterval: 50 * time.Millisecond,
+		Timeout:             10 * time.Minute,
+		RetryDelay:          2 * time.Second,
 	}
 }

@@ -114,9 +120,16 @@
 	try := parallel.NewTry(maxParallelDial, nil)
 	defer try.Kill()
 	for _, addr := range info.Addrs {
-		if err := dialWebsocket(addr, opts, pool, try); err != nil {
+		err := dialWebsocket(addr, opts, pool, try)
+		if err == parallel.ErrStopped {
+			break
+		} else if err != nil {
 			return nil, err
 		}
+		select {
+		case <-time.After(opts.DialAddressInterval):
+		case <-try.Dead():
+		}
 	}
 	try.Close()
 	result, err := try.Result()
@@ -160,28 +173,36 @@
 		RootCAs:    rootCAs,
 		ServerName: "anything",
 	}
+	return try.Start(newWebsocketDialer(cfg, opts))
+}
+
+// new WebsocketDialler returns a function that
+// can be passed to utils/parallel.Try.Start.
+func newWebsocketDialer(cfg *websocket.Config, opts DialOpts) func(<-chan struct{}) (io.Closer, error) {
 	openAttempt := utils.AttemptStrategy{
 		Total: opts.Timeout,
 		Delay: opts.RetryDelay,
 	}
-	return try.Start(func(stop <-chan struct{}) (io.Closer, error) {
-		err := parallel.ErrStopped
+	return func(stop <-chan struct{}) (io.Closer, error) {
 		for a := openAttempt.Start(); a.Next(); {
 			select {
 			case <-stop:
-				break
+				return nil, parallel.ErrStopped
 			default:
 			}...