Merge lp:~pedronis/ubuntu-push/delivery-hosts-details into lp:ubuntu-push
- delivery-hosts-details
- Merge into trunk
Proposed by
Samuele Pedroni
Status: | Superseded |
---|---|
Proposed branch: | lp:~pedronis/ubuntu-push/delivery-hosts-details |
Merge into: | lp:ubuntu-push |
Diff against target: |
1245 lines (+595/-128) 7 files modified
.bzrignore (+1/-0) client/client.go (+31/-5) client/client_test.go (+93/-44) client/gethosts/gethost_test.go (+5/-7) client/session/session.go (+156/-31) client/session/session_test.go (+306/-41) debian/config.json (+3/-0) |
To merge this branch: | bzr merge lp:~pedronis/ubuntu-push/delivery-hosts-details |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Ubuntu Push Hackers | Pending | ||
Review via email: mp+213485@code.launchpad.net |
Commit message
small sanity details
Description of the change
small sanity details:
* panic if no hosts are specified instead of getting stuck retrying forever
* check that the host bit of the config isn't empty
* work around the -race problems in the test differently, following some discussion in the golang bug tracker
To post a comment you must log in.
Unmerged revisions
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === modified file '.bzrignore' |
2 | --- .bzrignore 2014-02-07 19:36:38 +0000 |
3 | +++ .bzrignore 2014-03-31 14:33:42 +0000 |
4 | @@ -11,3 +11,4 @@ |
5 | debian/*.ex |
6 | debian/*.EX |
7 | debian/*.substvars |
8 | +ubuntu-push-client |
9 | |
10 | === modified file 'client/client.go' |
11 | --- client/client.go 2014-03-26 16:26:36 +0000 |
12 | +++ client/client.go 2014-03-31 14:33:42 +0000 |
13 | @@ -20,9 +20,14 @@ |
14 | |
15 | import ( |
16 | "encoding/pem" |
17 | + "errors" |
18 | "fmt" |
19 | "io/ioutil" |
20 | + "os" |
21 | + "strings" |
22 | + |
23 | "launchpad.net/go-dbus/v1" |
24 | + |
25 | "launchpad.net/ubuntu-push/bus" |
26 | "launchpad.net/ubuntu-push/bus/connectivity" |
27 | "launchpad.net/ubuntu-push/bus/networkmanager" |
28 | @@ -34,7 +39,6 @@ |
29 | "launchpad.net/ubuntu-push/logger" |
30 | "launchpad.net/ubuntu-push/util" |
31 | "launchpad.net/ubuntu-push/whoopsie/identifier" |
32 | - "os" |
33 | ) |
34 | |
35 | // ClientConfig holds the client configuration |
36 | @@ -42,8 +46,13 @@ |
37 | connectivity.ConnectivityConfig // q.v. |
38 | // A reasonably large timeout for receive/answer pairs |
39 | ExchangeTimeout config.ConfigTimeDuration `json:"exchange_timeout"` |
40 | - // The server to connect to |
41 | - Addr config.ConfigHostPort |
42 | + // A timeout to use when trying to connect to the server |
43 | + ConnectTimeout config.ConfigTimeDuration `json:"connect_timeout"` |
44 | + // The server to connect to or url to query for hosts to connect to |
45 | + Addr string |
46 | + // Host list management |
47 | + HostsCachingExpiryTime config.ConfigTimeDuration `json:"hosts_cache_expiry"` // potentially refresh host list after |
48 | + ExpectAllRepairedTime config.ConfigTimeDuration `json:"expect_all_repaired"` // worth retrying all servers after |
49 | // The PEM-encoded server certificate |
50 | CertPEMFile string `json:"cert_pem_file"` |
51 | // The logging level (one of "debug", "info", "error") |
52 | @@ -89,6 +98,12 @@ |
53 | if err != nil { |
54 | return fmt.Errorf("reading config: %v", err) |
55 | } |
56 | + // ignore spaces |
57 | + client.config.Addr = strings.Replace(client.config.Addr, " ", "", -1) |
58 | + if client.config.Addr == "" { |
59 | + return errors.New("no hosts specified") |
60 | + } |
61 | + |
62 | // later, we'll be specifying more logging options in the config file |
63 | client.log = logger.NewSimpleLogger(os.Stderr, client.config.LogLevel) |
64 | |
65 | @@ -116,6 +131,17 @@ |
66 | return nil |
67 | } |
68 | |
69 | +// deriveSessionConfig dervies the session configuration from the client configuration bits. |
70 | +func (client *PushClient) deriveSessionConfig() session.ClientSessionConfig { |
71 | + return session.ClientSessionConfig{ |
72 | + ConnectTimeout: client.config.ConnectTimeout.TimeDuration(), |
73 | + ExchangeTimeout: client.config.ExchangeTimeout.TimeDuration(), |
74 | + HostsCachingExpiryTime: client.config.HostsCachingExpiryTime.TimeDuration(), |
75 | + ExpectAllRepairedTime: client.config.ExpectAllRepairedTime.TimeDuration(), |
76 | + PEM: client.pem, |
77 | + } |
78 | +} |
79 | + |
80 | // getDeviceId gets the whoopsie identifier for the device |
81 | func (client *PushClient) getDeviceId() error { |
82 | err := client.idder.Generate() |
83 | @@ -143,8 +169,8 @@ |
84 | |
85 | // initSession creates the session object |
86 | func (client *PushClient) initSession() error { |
87 | - sess, err := session.NewSession(string(client.config.Addr), client.pem, |
88 | - client.config.ExchangeTimeout.Duration, client.deviceId, |
89 | + sess, err := session.NewSession(client.config.Addr, |
90 | + client.deriveSessionConfig(), client.deviceId, |
91 | client.levelMapFactory, client.log) |
92 | if err != nil { |
93 | return err |
94 | |
95 | === modified file 'client/client_test.go' |
96 | --- client/client_test.go 2014-03-26 16:26:36 +0000 |
97 | +++ client/client_test.go 2014-03-31 14:33:42 +0000 |
98 | @@ -17,10 +17,19 @@ |
99 | package client |
100 | |
101 | import ( |
102 | + "encoding/json" |
103 | "errors" |
104 | "fmt" |
105 | "io/ioutil" |
106 | + "net/http" |
107 | + "net/http/httptest" |
108 | + "path/filepath" |
109 | + "reflect" |
110 | + "testing" |
111 | + "time" |
112 | + |
113 | . "launchpad.net/gocheck" |
114 | + |
115 | "launchpad.net/ubuntu-push/bus" |
116 | "launchpad.net/ubuntu-push/bus/networkmanager" |
117 | "launchpad.net/ubuntu-push/bus/notifications" |
118 | @@ -32,11 +41,6 @@ |
119 | "launchpad.net/ubuntu-push/util" |
120 | "launchpad.net/ubuntu-push/whoopsie/identifier" |
121 | idtesting "launchpad.net/ubuntu-push/whoopsie/identifier/testing" |
122 | - "net/http" |
123 | - "net/http/httptest" |
124 | - "path/filepath" |
125 | - "testing" |
126 | - "time" |
127 | ) |
128 | |
129 | func TestClient(t *testing.T) { TestingT(t) } |
130 | @@ -83,22 +87,37 @@ |
131 | cs.timeouts = nil |
132 | } |
133 | |
134 | +func (cs *clientSuite) writeTestConfig(overrides map[string]interface{}) { |
135 | + pem_file := helpers.SourceRelative("../server/acceptance/config/testing.cert") |
136 | + cfgMap := map[string]interface{}{ |
137 | + "connect_timeout": "7ms", |
138 | + "exchange_timeout": "10ms", |
139 | + "hosts_cache_expiry": "1h", |
140 | + "expect_all_repaired": "30m", |
141 | + "stabilizing_timeout": "0ms", |
142 | + "connectivity_check_url": "", |
143 | + "connectivity_check_md5": "", |
144 | + "addr": ":0", |
145 | + "cert_pem_file": pem_file, |
146 | + "recheck_timeout": "3h", |
147 | + "log_level": "debug", |
148 | + } |
149 | + for k, v := range overrides { |
150 | + cfgMap[k] = v |
151 | + } |
152 | + cfgBlob, err := json.Marshal(cfgMap) |
153 | + if err != nil { |
154 | + panic(err) |
155 | + } |
156 | + ioutil.WriteFile(cs.configPath, cfgBlob, 0600) |
157 | +} |
158 | + |
159 | func (cs *clientSuite) SetUpTest(c *C) { |
160 | cs.log = helpers.NewTestLogger(c, "debug") |
161 | dir := c.MkDir() |
162 | cs.configPath = filepath.Join(dir, "config") |
163 | - cfg := fmt.Sprintf(` |
164 | -{ |
165 | - "exchange_timeout": "10ms", |
166 | - "stabilizing_timeout": "0ms", |
167 | - "connectivity_check_url": "", |
168 | - "connectivity_check_md5": "", |
169 | - "addr": ":0", |
170 | - "cert_pem_file": %#v, |
171 | - "recheck_timeout": "3h", |
172 | - "log_level": "debug" |
173 | -}`, helpers.SourceRelative("../server/acceptance/config/testing.cert")) |
174 | - ioutil.WriteFile(cs.configPath, []byte(cfg), 0600) |
175 | + |
176 | + cs.writeTestConfig(nil) |
177 | } |
178 | |
179 | type sqlientSuite struct{ clientSuite } |
180 | @@ -119,7 +138,7 @@ |
181 | err := cli.configure() |
182 | c.Assert(err, IsNil) |
183 | c.Assert(cli.config, NotNil) |
184 | - c.Check(cli.config.ExchangeTimeout.Duration, Equals, time.Duration(10*time.Millisecond)) |
185 | + c.Check(cli.config.ExchangeTimeout.TimeDuration(), Equals, time.Duration(10*time.Millisecond)) |
186 | } |
187 | |
188 | func (cs *clientSuite) TestConfigureSetsUpLog(c *C) { |
189 | @@ -179,41 +198,70 @@ |
190 | } |
191 | |
192 | func (cs *clientSuite) TestConfigureBailsOnBadPEMFilename(c *C) { |
193 | - ioutil.WriteFile(cs.configPath, []byte(` |
194 | -{ |
195 | - "exchange_timeout": "10ms", |
196 | - "stabilizing_timeout": "0ms", |
197 | - "connectivity_check_url": "", |
198 | - "connectivity_check_md5": "", |
199 | - "addr": ":0", |
200 | - "cert_pem_file": "/a/b/c", |
201 | - "log_level": "debug", |
202 | - "recheck_timeout": "3h" |
203 | -}`), 0600) |
204 | - |
205 | + cs.writeTestConfig(map[string]interface{}{ |
206 | + "cert_pem_file": "/a/b/c", |
207 | + }) |
208 | cli := NewPushClient(cs.configPath, cs.leveldbPath) |
209 | err := cli.configure() |
210 | c.Assert(err, ErrorMatches, "reading PEM file: .*") |
211 | } |
212 | |
213 | func (cs *clientSuite) TestConfigureBailsOnBadPEM(c *C) { |
214 | - ioutil.WriteFile(cs.configPath, []byte(` |
215 | -{ |
216 | - "exchange_timeout": "10ms", |
217 | - "stabilizing_timeout": "0ms", |
218 | - "connectivity_check_url": "", |
219 | - "connectivity_check_md5": "", |
220 | - "addr": ":0", |
221 | - "cert_pem_file": "/etc/passwd", |
222 | - "log_level": "debug", |
223 | - "recheck_timeout": "3h" |
224 | -}`), 0600) |
225 | - |
226 | + cs.writeTestConfig(map[string]interface{}{ |
227 | + "cert_pem_file": "/etc/passwd", |
228 | + }) |
229 | cli := NewPushClient(cs.configPath, cs.leveldbPath) |
230 | err := cli.configure() |
231 | c.Assert(err, ErrorMatches, "no PEM found.*") |
232 | } |
233 | |
234 | +func (cs *clientSuite) TestConfigureBailsOnNoHosts(c *C) { |
235 | + cs.writeTestConfig(map[string]interface{}{ |
236 | + "addr": " ", |
237 | + }) |
238 | + cli := NewPushClient(cs.configPath, cs.leveldbPath) |
239 | + err := cli.configure() |
240 | + c.Assert(err, ErrorMatches, "no hosts specified") |
241 | +} |
242 | + |
243 | +func (cs *clientSuite) TestConfigureRemovesBlanksInAddr(c *C) { |
244 | + cs.writeTestConfig(map[string]interface{}{ |
245 | + "addr": " foo: 443", |
246 | + }) |
247 | + cli := NewPushClient(cs.configPath, cs.leveldbPath) |
248 | + err := cli.configure() |
249 | + c.Assert(err, IsNil) |
250 | + c.Check(cli.config.Addr, Equals, "foo:443") |
251 | +} |
252 | + |
253 | +/***************************************************************** |
254 | + deriveSessionConfig tests |
255 | +******************************************************************/ |
256 | + |
257 | +func (cs *clientSuite) TestDeriveSessionConfig(c *C) { |
258 | + cli := NewPushClient(cs.configPath, cs.leveldbPath) |
259 | + err := cli.configure() |
260 | + c.Assert(err, IsNil) |
261 | + expected := session.ClientSessionConfig{ |
262 | + ConnectTimeout: 7 * time.Millisecond, |
263 | + ExchangeTimeout: 10 * time.Millisecond, |
264 | + HostsCachingExpiryTime: 1 * time.Hour, |
265 | + ExpectAllRepairedTime: 30 * time.Minute, |
266 | + PEM: cli.pem, |
267 | + } |
268 | + // sanity check that we are looking at all fields |
269 | + vExpected := reflect.ValueOf(expected) |
270 | + nf := vExpected.NumField() |
271 | + for i := 0; i < nf; i++ { |
272 | + fv := vExpected.Field(i) |
273 | + // field isn't empty/zero |
274 | + c.Assert(fv.Interface(), Not(DeepEquals), reflect.Zero(fv.Type()).Interface(), Commentf("forgot about: %s", vExpected.Type().Field(i).Name)) |
275 | + } |
276 | + // finally compare |
277 | + conf := cli.deriveSessionConfig() |
278 | + c.Check(conf, DeepEquals, expected) |
279 | +} |
280 | + |
281 | /***************************************************************** |
282 | getDeviceId tests |
283 | ******************************************************************/ |
284 | @@ -308,6 +356,7 @@ |
285 | cli := NewPushClient(cs.configPath, cs.leveldbPath) |
286 | cli.log = cs.log |
287 | c.Assert(cli.initSession(), IsNil) |
288 | + cs.log.ResetCapture() |
289 | cli.hasConnectivity = true |
290 | cli.handleErr(errors.New("bananas")) |
291 | c.Check(cs.log.Captured(), Matches, ".*session exited.*bananas\n") |
292 | @@ -363,7 +412,7 @@ |
293 | func (cs *clientSuite) TestHandleConnStateC2D(c *C) { |
294 | cli := NewPushClient(cs.configPath, cs.leveldbPath) |
295 | cli.log = cs.log |
296 | - cli.session, _ = session.NewSession(string(cli.config.Addr), cli.pem, cli.config.ExchangeTimeout.Duration, cli.deviceId, levelmap.NewLevelMap, cs.log) |
297 | + cli.session, _ = session.NewSession(cli.config.Addr, cli.deriveSessionConfig(), cli.deviceId, levelmap.NewLevelMap, cs.log) |
298 | cli.session.Dial() |
299 | cli.hasConnectivity = true |
300 | |
301 | @@ -376,7 +425,7 @@ |
302 | func (cs *clientSuite) TestHandleConnStateC2DPending(c *C) { |
303 | cli := NewPushClient(cs.configPath, cs.leveldbPath) |
304 | cli.log = cs.log |
305 | - cli.session, _ = session.NewSession(string(cli.config.Addr), cli.pem, cli.config.ExchangeTimeout.Duration, cli.deviceId, levelmap.NewLevelMap, cs.log) |
306 | + cli.session, _ = session.NewSession(cli.config.Addr, cli.deriveSessionConfig(), cli.deviceId, levelmap.NewLevelMap, cs.log) |
307 | cli.hasConnectivity = true |
308 | |
309 | cli.handleConnState(false) |
310 | |
311 | === modified file 'client/gethosts/gethost_test.go' |
312 | --- client/gethosts/gethost_test.go 2014-03-24 15:32:29 +0000 |
313 | +++ client/gethosts/gethost_test.go 2014-03-31 14:33:42 +0000 |
314 | @@ -61,18 +61,16 @@ |
315 | } |
316 | |
317 | func (s *getHostsSuite) TestGetTimeout(c *C) { |
318 | - finish := make(chan bool, 1) |
319 | + started := make(chan bool, 1) |
320 | ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { |
321 | - <-finish |
322 | + started <- true |
323 | + time.Sleep(700 * time.Millisecond) |
324 | })) |
325 | defer func() { |
326 | - time.Sleep(100 * time.Millisecond) // work around -race issue |
327 | + <-started |
328 | ts.Close() |
329 | }() |
330 | - defer func() { |
331 | - finish <- true |
332 | - }() |
333 | - gh := New("foobar", ts.URL, 1*time.Second) |
334 | + gh := New("foobar", ts.URL, 500*time.Millisecond) |
335 | _, err := gh.Get() |
336 | c.Check(err, ErrorMatches, ".*closed.*") |
337 | } |
338 | |
339 | === modified file 'client/session/session.go' |
340 | --- client/session/session.go 2014-03-26 16:26:36 +0000 |
341 | +++ client/session/session.go 2014-03-31 14:33:42 +0000 |
342 | @@ -23,14 +23,18 @@ |
343 | "crypto/x509" |
344 | "errors" |
345 | "fmt" |
346 | + "math/rand" |
347 | + "net" |
348 | + "strings" |
349 | + "sync" |
350 | + "sync/atomic" |
351 | + "time" |
352 | + |
353 | + "launchpad.net/ubuntu-push/client/gethosts" |
354 | "launchpad.net/ubuntu-push/client/session/levelmap" |
355 | "launchpad.net/ubuntu-push/logger" |
356 | "launchpad.net/ubuntu-push/protocol" |
357 | "launchpad.net/ubuntu-push/util" |
358 | - "math/rand" |
359 | - "net" |
360 | - "sync/atomic" |
361 | - "time" |
362 | ) |
363 | |
364 | var wireVersionBytes = []byte{protocol.ProtocolWireVersion} |
365 | @@ -45,6 +49,15 @@ |
366 | protocol.NotificationsMsg |
367 | } |
368 | |
369 | +// parseServerAddrSpec recognizes whether spec is a HTTP URL to get |
370 | +// hosts from or a |-separated list of host:port pairs. |
371 | +func parseServerAddrSpec(spec string) (hostsEndpoint string, fallbackHosts []string) { |
372 | + if strings.HasPrefix(spec, "http") { |
373 | + return spec, nil |
374 | + } |
375 | + return "", strings.Split(spec, "|") |
376 | +} |
377 | + |
378 | // ClientSessionState is a way to broadly track the progress of the session |
379 | type ClientSessionState uint32 |
380 | |
381 | @@ -56,15 +69,38 @@ |
382 | Running |
383 | ) |
384 | |
385 | -// ClienSession holds a client<->server session and its configuration. |
386 | +type hostGetter interface { |
387 | + Get() ([]string, error) |
388 | +} |
389 | + |
390 | +// ClientSessionConfig groups the client session configuration. |
391 | +type ClientSessionConfig struct { |
392 | + ConnectTimeout time.Duration |
393 | + ExchangeTimeout time.Duration |
394 | + HostsCachingExpiryTime time.Duration |
395 | + ExpectAllRepairedTime time.Duration |
396 | + PEM []byte |
397 | +} |
398 | + |
399 | +// ClientSession holds a client<->server session and its configuration. |
400 | type ClientSession struct { |
401 | // configuration |
402 | - DeviceId string |
403 | - ServerAddr string |
404 | - ExchangeTimeout time.Duration |
405 | - Levels levelmap.LevelMap |
406 | - Protocolator func(net.Conn) protocol.Protocol |
407 | + DeviceId string |
408 | + ClientSessionConfig |
409 | + Levels levelmap.LevelMap |
410 | + Protocolator func(net.Conn) protocol.Protocol |
411 | + // hosts |
412 | + getHost hostGetter |
413 | + fallbackHosts []string |
414 | + deliveryHostsTimestamp time.Time |
415 | + deliveryHosts []string |
416 | + lastAttemptTimestamp time.Time |
417 | + leftToTry int |
418 | + tryHost int |
419 | + // hook for testing |
420 | + timeSince func(time.Time) time.Duration |
421 | // connection |
422 | + connLock sync.RWMutex |
423 | Connection net.Conn |
424 | Log logger.Logger |
425 | TLS *tls.Config |
426 | @@ -77,7 +113,7 @@ |
427 | MsgCh chan *Notification |
428 | } |
429 | |
430 | -func NewSession(serverAddr string, pem []byte, exchangeTimeout time.Duration, |
431 | +func NewSession(serverAddrSpec string, conf ClientSessionConfig, |
432 | deviceId string, levelmapFactory func() (levelmap.LevelMap, error), |
433 | log logger.Logger) (*ClientSession, error) { |
434 | state := uint32(Disconnected) |
435 | @@ -85,19 +121,27 @@ |
436 | if err != nil { |
437 | return nil, err |
438 | } |
439 | + var getHost hostGetter |
440 | + log.Infof("using addr: %v", serverAddrSpec) |
441 | + hostsEndpoint, fallbackHosts := parseServerAddrSpec(serverAddrSpec) |
442 | + if hostsEndpoint != "" { |
443 | + getHost = gethosts.New(deviceId, hostsEndpoint, conf.ExchangeTimeout) |
444 | + } |
445 | sess := &ClientSession{ |
446 | - ExchangeTimeout: exchangeTimeout, |
447 | - ServerAddr: serverAddr, |
448 | - DeviceId: deviceId, |
449 | - Log: log, |
450 | - Protocolator: protocol.NewProtocol0, |
451 | - Levels: levels, |
452 | - TLS: &tls.Config{InsecureSkipVerify: true}, // XXX |
453 | - stateP: &state, |
454 | + ClientSessionConfig: conf, |
455 | + getHost: getHost, |
456 | + fallbackHosts: fallbackHosts, |
457 | + DeviceId: deviceId, |
458 | + Log: log, |
459 | + Protocolator: protocol.NewProtocol0, |
460 | + Levels: levels, |
461 | + TLS: &tls.Config{InsecureSkipVerify: true}, // XXX |
462 | + stateP: &state, |
463 | + timeSince: time.Since, |
464 | } |
465 | - if pem != nil { |
466 | + if sess.PEM != nil { |
467 | cp := x509.NewCertPool() |
468 | - ok := cp.AppendCertsFromPEM(pem) |
469 | + ok := cp.AppendCertsFromPEM(sess.PEM) |
470 | if !ok { |
471 | return nil, errors.New("could not parse certificate") |
472 | } |
473 | @@ -114,15 +158,90 @@ |
474 | atomic.StoreUint32(sess.stateP, uint32(state)) |
475 | } |
476 | |
477 | +func (sess *ClientSession) setConnection(conn net.Conn) { |
478 | + sess.connLock.Lock() |
479 | + defer sess.connLock.Unlock() |
480 | + sess.Connection = conn |
481 | +} |
482 | + |
483 | +func (sess *ClientSession) getConnection() net.Conn { |
484 | + sess.connLock.RLock() |
485 | + defer sess.connLock.RUnlock() |
486 | + return sess.Connection |
487 | +} |
488 | + |
489 | +// getHosts sets deliveryHosts possibly querying a remote endpoint |
490 | +func (sess *ClientSession) getHosts() error { |
491 | + if sess.getHost != nil { |
492 | + if sess.timeSince(sess.deliveryHostsTimestamp) < sess.HostsCachingExpiryTime { |
493 | + return nil |
494 | + } |
495 | + hosts, err := sess.getHost.Get() |
496 | + if err != nil { |
497 | + sess.Log.Errorf("getHosts: %v", err) |
498 | + sess.setState(Error) |
499 | + return err |
500 | + } |
501 | + sess.deliveryHostsTimestamp = time.Now() |
502 | + sess.deliveryHosts = hosts |
503 | + } else { |
504 | + sess.deliveryHosts = sess.fallbackHosts |
505 | + } |
506 | + return nil |
507 | +} |
508 | + |
509 | +// startConnectionAttempt/nextHostToTry help connect iterating over candidate hosts |
510 | + |
511 | +func (sess *ClientSession) startConnectionAttempt() { |
512 | + if sess.timeSince(sess.lastAttemptTimestamp) > sess.ExpectAllRepairedTime { |
513 | + sess.tryHost = 0 |
514 | + } |
515 | + sess.leftToTry = len(sess.deliveryHosts) |
516 | + if sess.leftToTry == 0 { |
517 | + panic("should have got hosts from config or remote at this point") |
518 | + } |
519 | + sess.lastAttemptTimestamp = time.Now() |
520 | +} |
521 | + |
522 | +func (sess *ClientSession) nextHostToTry() string { |
523 | + if sess.leftToTry == 0 { |
524 | + return "" |
525 | + } |
526 | + res := sess.deliveryHosts[sess.tryHost] |
527 | + sess.tryHost = (sess.tryHost + 1) % len(sess.deliveryHosts) |
528 | + sess.leftToTry-- |
529 | + return res |
530 | +} |
531 | + |
532 | +// we reached the Started state, we can retry with the same host if we |
533 | +// have to retry again |
534 | +func (sess *ClientSession) started() { |
535 | + sess.tryHost-- |
536 | + if sess.tryHost == -1 { |
537 | + sess.tryHost = len(sess.deliveryHosts) - 1 |
538 | + } |
539 | + sess.setState(Started) |
540 | +} |
541 | + |
542 | // connect to a server using the configuration in the ClientSession |
543 | // and set up the connection. |
544 | func (sess *ClientSession) connect() error { |
545 | - conn, err := net.DialTimeout("tcp", sess.ServerAddr, sess.ExchangeTimeout) |
546 | - if err != nil { |
547 | - sess.setState(Error) |
548 | - return fmt.Errorf("connect: %s", err) |
549 | + sess.startConnectionAttempt() |
550 | + var err error |
551 | + var conn net.Conn |
552 | + for { |
553 | + host := sess.nextHostToTry() |
554 | + if host == "" { |
555 | + sess.setState(Error) |
556 | + return fmt.Errorf("connect: %s", err) |
557 | + } |
558 | + sess.Log.Debugf("trying to connect to: %v", host) |
559 | + conn, err = net.DialTimeout("tcp", host, sess.ConnectTimeout) |
560 | + if err == nil { |
561 | + break |
562 | + } |
563 | } |
564 | - sess.Connection = tls.Client(conn, sess.TLS) |
565 | + sess.setConnection(tls.Client(conn, sess.TLS)) |
566 | sess.setState(Connected) |
567 | return nil |
568 | } |
569 | @@ -145,6 +264,8 @@ |
570 | sess.doClose() |
571 | } |
572 | func (sess *ClientSession) doClose() { |
573 | + sess.connLock.Lock() |
574 | + defer sess.connLock.Unlock() |
575 | if sess.Connection != nil { |
576 | sess.Connection.Close() |
577 | // we ignore Close errors, on purpose (the thinking being that |
578 | @@ -224,7 +345,7 @@ |
579 | |
580 | // Call this when you've connected and want to start looping. |
581 | func (sess *ClientSession) start() error { |
582 | - conn := sess.Connection |
583 | + conn := sess.getConnection() |
584 | err := conn.SetDeadline(time.Now().Add(sess.ExchangeTimeout)) |
585 | if err != nil { |
586 | sess.setState(Error) |
587 | @@ -279,15 +400,19 @@ |
588 | sess.proto = proto |
589 | sess.pingInterval = pingInterval |
590 | sess.Log.Debugf("Connected %v.", conn.LocalAddr()) |
591 | - sess.setState(Started) |
592 | + sess.started() // deals with choosing which host to retry with as well |
593 | return nil |
594 | } |
595 | |
596 | // run calls connect, and if it works it calls start, and if it works |
597 | // it runs loop in a goroutine, and ships its return value over ErrCh. |
598 | -func (sess *ClientSession) run(closer func(), connecter, starter, looper func() error) error { |
599 | +func (sess *ClientSession) run(closer func(), hostGetter, connecter, starter, looper func() error) error { |
600 | closer() |
601 | - err := connecter() |
602 | + err := hostGetter() |
603 | + if err != nil { |
604 | + return err |
605 | + } |
606 | + err = connecter() |
607 | if err == nil { |
608 | err = starter() |
609 | if err == nil { |
610 | @@ -317,7 +442,7 @@ |
611 | // keep on trying. |
612 | panic("can't Dial() without a protocol constructor.") |
613 | } |
614 | - return sess.run(sess.doClose, sess.connect, sess.start, sess.loop) |
615 | + return sess.run(sess.doClose, sess.getHosts, sess.connect, sess.start, sess.loop) |
616 | } |
617 | |
618 | func init() { |
619 | |
620 | === modified file 'client/session/session_test.go' |
621 | --- client/session/session_test.go 2014-03-27 13:26:10 +0000 |
622 | +++ client/session/session_test.go 2014-03-31 14:33:42 +0000 |
623 | @@ -23,16 +23,21 @@ |
624 | "fmt" |
625 | "io" |
626 | "io/ioutil" |
627 | + "net" |
628 | + "net/http" |
629 | + "net/http/httptest" |
630 | + "reflect" |
631 | + "testing" |
632 | + "time" |
633 | + |
634 | . "launchpad.net/gocheck" |
635 | + |
636 | "launchpad.net/ubuntu-push/client/session/levelmap" |
637 | + //"launchpad.net/ubuntu-push/client/gethosts" |
638 | "launchpad.net/ubuntu-push/logger" |
639 | "launchpad.net/ubuntu-push/protocol" |
640 | helpers "launchpad.net/ubuntu-push/testing" |
641 | "launchpad.net/ubuntu-push/testing/condition" |
642 | - "net" |
643 | - "reflect" |
644 | - "testing" |
645 | - "time" |
646 | ) |
647 | |
648 | func TestSession(t *testing.T) { TestingT(t) } |
649 | @@ -181,23 +186,51 @@ |
650 | } |
651 | |
652 | /**************************************************************** |
653 | + parseServerAddrSpec() tests |
654 | +****************************************************************/ |
655 | + |
656 | +func (cs *clientSessionSuite) TestParseServerAddrSpec(c *C) { |
657 | + hEp, fallbackHosts := parseServerAddrSpec("http://foo/hosts") |
658 | + c.Check(hEp, Equals, "http://foo/hosts") |
659 | + c.Check(fallbackHosts, IsNil) |
660 | + |
661 | + hEp, fallbackHosts = parseServerAddrSpec("foo:443") |
662 | + c.Check(hEp, Equals, "") |
663 | + c.Check(fallbackHosts, DeepEquals, []string{"foo:443"}) |
664 | + |
665 | + hEp, fallbackHosts = parseServerAddrSpec("foo:443|bar:443") |
666 | + c.Check(hEp, Equals, "") |
667 | + c.Check(fallbackHosts, DeepEquals, []string{"foo:443", "bar:443"}) |
668 | +} |
669 | + |
670 | +/**************************************************************** |
671 | NewSession() tests |
672 | ****************************************************************/ |
673 | |
674 | +var dummyConf = ClientSessionConfig{} |
675 | + |
676 | func (cs *clientSessionSuite) TestNewSessionPlainWorks(c *C) { |
677 | - sess, err := NewSession("", nil, 0, "", cs.lvls, cs.log) |
678 | + sess, err := NewSession("foo:443", dummyConf, "", cs.lvls, cs.log) |
679 | c.Check(sess, NotNil) |
680 | c.Check(err, IsNil) |
681 | + c.Check(sess.fallbackHosts, DeepEquals, []string{"foo:443"}) |
682 | // but no root CAs set |
683 | c.Check(sess.TLS.RootCAs, IsNil) |
684 | c.Check(sess.State(), Equals, Disconnected) |
685 | } |
686 | |
687 | +func (cs *clientSessionSuite) TestNewSessionHostEndpointWorks(c *C) { |
688 | + sess, err := NewSession("http://foo/hosts", dummyConf, "wah", cs.lvls, cs.log) |
689 | + c.Assert(err, IsNil) |
690 | + c.Check(sess.getHost, NotNil) |
691 | +} |
692 | + |
693 | var certfile string = helpers.SourceRelative("../../server/acceptance/config/testing.cert") |
694 | var pem, _ = ioutil.ReadFile(certfile) |
695 | |
696 | func (cs *clientSessionSuite) TestNewSessionPEMWorks(c *C) { |
697 | - sess, err := NewSession("", pem, 0, "wah", cs.lvls, cs.log) |
698 | + conf := ClientSessionConfig{PEM: pem} |
699 | + sess, err := NewSession("", conf, "wah", cs.lvls, cs.log) |
700 | c.Check(sess, NotNil) |
701 | c.Assert(err, IsNil) |
702 | c.Check(sess.TLS.RootCAs, NotNil) |
703 | @@ -205,25 +238,172 @@ |
704 | |
705 | func (cs *clientSessionSuite) TestNewSessionBadPEMFileContentFails(c *C) { |
706 | badpem := []byte("This is not the PEM you're looking for.") |
707 | - sess, err := NewSession("", badpem, 0, "wah", cs.lvls, cs.log) |
708 | + conf := ClientSessionConfig{PEM: badpem} |
709 | + sess, err := NewSession("", conf, "wah", cs.lvls, cs.log) |
710 | c.Check(sess, IsNil) |
711 | c.Check(err, NotNil) |
712 | } |
713 | |
714 | func (cs *clientSessionSuite) TestNewSessionBadLevelMapFails(c *C) { |
715 | ferr := func() (levelmap.LevelMap, error) { return nil, errors.New("Busted.") } |
716 | - sess, err := NewSession("", nil, 0, "wah", ferr, cs.log) |
717 | + sess, err := NewSession("", dummyConf, "wah", ferr, cs.log) |
718 | c.Check(sess, IsNil) |
719 | c.Assert(err, NotNil) |
720 | } |
721 | |
722 | /**************************************************************** |
723 | + getHosts() tests |
724 | +****************************************************************/ |
725 | + |
726 | +func (cs *clientSessionSuite) TestGetHostsFallback(c *C) { |
727 | + fallback := []string{"foo:443", "bar:443"} |
728 | + sess := &ClientSession{fallbackHosts: fallback} |
729 | + err := sess.getHosts() |
730 | + c.Assert(err, IsNil) |
731 | + c.Check(sess.deliveryHosts, DeepEquals, fallback) |
732 | +} |
733 | + |
734 | +type testHostGetter struct { |
735 | + hosts []string |
736 | + err error |
737 | +} |
738 | + |
739 | +func (thg *testHostGetter) Get() ([]string, error) { |
740 | + return thg.hosts, thg.err |
741 | +} |
742 | + |
743 | +func (cs *clientSessionSuite) TestGetHostsRemote(c *C) { |
744 | + hostGetter := &testHostGetter{[]string{"foo:443", "bar:443"}, nil} |
745 | + sess := &ClientSession{getHost: hostGetter, timeSince: time.Since} |
746 | + err := sess.getHosts() |
747 | + c.Assert(err, IsNil) |
748 | + c.Check(sess.deliveryHosts, DeepEquals, []string{"foo:443", "bar:443"}) |
749 | +} |
750 | + |
751 | +func (cs *clientSessionSuite) TestGetHostsRemoteError(c *C) { |
752 | + sess, err := NewSession("", dummyConf, "", cs.lvls, cs.log) |
753 | + c.Assert(err, IsNil) |
754 | + hostsErr := errors.New("failed") |
755 | + hostGetter := &testHostGetter{nil, hostsErr} |
756 | + sess.getHost = hostGetter |
757 | + err = sess.getHosts() |
758 | + c.Assert(err, Equals, hostsErr) |
759 | + c.Check(sess.deliveryHosts, IsNil) |
760 | + c.Check(sess.State(), Equals, Error) |
761 | +} |
762 | + |
763 | +func (cs *clientSessionSuite) TestGetHostsRemoteCaching(c *C) { |
764 | + hostGetter := &testHostGetter{[]string{"foo:443", "bar:443"}, nil} |
765 | + sess := &ClientSession{ |
766 | + getHost: hostGetter, |
767 | + ClientSessionConfig: ClientSessionConfig{ |
768 | + HostsCachingExpiryTime: 2 * time.Hour, |
769 | + }, |
770 | + timeSince: time.Since, |
771 | + } |
772 | + err := sess.getHosts() |
773 | + c.Assert(err, IsNil) |
774 | + hostGetter.hosts = []string{"baz:443"} |
775 | + // cached |
776 | + err = sess.getHosts() |
777 | + c.Assert(err, IsNil) |
778 | + c.Check(sess.deliveryHosts, DeepEquals, []string{"foo:443", "bar:443"}) |
779 | + // expired |
780 | + sess.timeSince = func(ts time.Time) time.Duration { |
781 | + return 3 * time.Hour |
782 | + } |
783 | + err = sess.getHosts() |
784 | + c.Assert(err, IsNil) |
785 | + c.Check(sess.deliveryHosts, DeepEquals, []string{"baz:443"}) |
786 | +} |
787 | + |
788 | +/**************************************************************** |
789 | + startConnectionAttempt()/nextHostToTry()/started tests |
790 | +****************************************************************/ |
791 | + |
792 | +func (cs *clientSessionSuite) TestStartConnectionAttempt(c *C) { |
793 | + since := time.Since(time.Time{}) |
794 | + sess := &ClientSession{ |
795 | + ClientSessionConfig: ClientSessionConfig{ |
796 | + ExpectAllRepairedTime: 10 * time.Second, |
797 | + }, |
798 | + timeSince: func(ts time.Time) time.Duration { |
799 | + return since |
800 | + }, |
801 | + deliveryHosts: []string{"foo:443", "bar:443"}, |
802 | + } |
803 | + // start from first host |
804 | + sess.startConnectionAttempt() |
805 | + c.Check(sess.lastAttemptTimestamp, Not(Equals), 0) |
806 | + c.Check(sess.tryHost, Equals, 0) |
807 | + c.Check(sess.leftToTry, Equals, 2) |
808 | + since = 1 * time.Second |
809 | + sess.tryHost = 1 |
810 | + // just continue |
811 | + sess.startConnectionAttempt() |
812 | + c.Check(sess.tryHost, Equals, 1) |
813 | + sess.tryHost = 2 |
814 | +} |
815 | + |
816 | +func (cs *clientSessionSuite) TestStartConnectionAttemptNoHostsPanic(c *C) { |
817 | + since := time.Since(time.Time{}) |
818 | + sess := &ClientSession{ |
819 | + ClientSessionConfig: ClientSessionConfig{ |
820 | + ExpectAllRepairedTime: 10 * time.Second, |
821 | + }, |
822 | + timeSince: func(ts time.Time) time.Duration { |
823 | + return since |
824 | + }, |
825 | + } |
826 | + c.Check(sess.startConnectionAttempt, PanicMatches, "should have got hosts from config or remote at this point") |
827 | +} |
828 | + |
829 | +func (cs *clientSessionSuite) TestNextHostToTry(c *C) { |
830 | + sess := &ClientSession{ |
831 | + deliveryHosts: []string{"foo:443", "bar:443", "baz:443"}, |
832 | + tryHost: 0, |
833 | + leftToTry: 3, |
834 | + } |
835 | + c.Check(sess.nextHostToTry(), Equals, "foo:443") |
836 | + c.Check(sess.nextHostToTry(), Equals, "bar:443") |
837 | + c.Check(sess.nextHostToTry(), Equals, "baz:443") |
838 | + c.Check(sess.nextHostToTry(), Equals, "") |
839 | + c.Check(sess.nextHostToTry(), Equals, "") |
840 | + c.Check(sess.tryHost, Equals, 0) |
841 | + |
842 | + sess.leftToTry = 3 |
843 | + sess.tryHost = 1 |
844 | + c.Check(sess.nextHostToTry(), Equals, "bar:443") |
845 | + c.Check(sess.nextHostToTry(), Equals, "baz:443") |
846 | + c.Check(sess.nextHostToTry(), Equals, "foo:443") |
847 | + c.Check(sess.nextHostToTry(), Equals, "") |
848 | + c.Check(sess.nextHostToTry(), Equals, "") |
849 | + c.Check(sess.tryHost, Equals, 1) |
850 | +} |
851 | + |
852 | +func (cs *clientSessionSuite) TestStarted(c *C) { |
853 | + sess, err := NewSession("", dummyConf, "", cs.lvls, cs.log) |
854 | + c.Assert(err, IsNil) |
855 | + |
856 | + sess.deliveryHosts = []string{"foo:443", "bar:443", "baz:443"} |
857 | + sess.tryHost = 1 |
858 | + |
859 | + sess.started() |
860 | + c.Check(sess.tryHost, Equals, 0) |
861 | + c.Check(sess.State(), Equals, Started) |
862 | + |
863 | + sess.started() |
864 | + c.Check(sess.tryHost, Equals, 2) |
865 | +} |
866 | + |
867 | +/**************************************************************** |
868 | connect() tests |
869 | ****************************************************************/ |
870 | |
871 | func (cs *clientSessionSuite) TestConnectFailsWithNoAddress(c *C) { |
872 | - sess, err := NewSession("", nil, 0, "wah", cs.lvls, cs.log) |
873 | + sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log) |
874 | c.Assert(err, IsNil) |
875 | + sess.deliveryHosts = []string{"nowhere"} |
876 | err = sess.connect() |
877 | c.Check(err, ErrorMatches, ".*connect.*address.*") |
878 | c.Check(sess.State(), Equals, Error) |
879 | @@ -233,20 +413,36 @@ |
880 | srv, err := net.Listen("tcp", "localhost:0") |
881 | c.Assert(err, IsNil) |
882 | defer srv.Close() |
883 | - sess, err := NewSession(srv.Addr().String(), nil, 0, "wah", cs.lvls, cs.log) |
884 | - c.Assert(err, IsNil) |
885 | - err = sess.connect() |
886 | - c.Check(err, IsNil) |
887 | - c.Check(sess.Connection, NotNil) |
888 | - c.Check(sess.State(), Equals, Connected) |
889 | + sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log) |
890 | + c.Assert(err, IsNil) |
891 | + sess.deliveryHosts = []string{srv.Addr().String()} |
892 | + err = sess.connect() |
893 | + c.Check(err, IsNil) |
894 | + c.Check(sess.Connection, NotNil) |
895 | + c.Check(sess.State(), Equals, Connected) |
896 | +} |
897 | + |
898 | +func (cs *clientSessionSuite) TestConnectSecondConnects(c *C) { |
899 | + srv, err := net.Listen("tcp", "localhost:0") |
900 | + c.Assert(err, IsNil) |
901 | + defer srv.Close() |
902 | + sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log) |
903 | + c.Assert(err, IsNil) |
904 | + sess.deliveryHosts = []string{"nowhere", srv.Addr().String()} |
905 | + err = sess.connect() |
906 | + c.Check(err, IsNil) |
907 | + c.Check(sess.Connection, NotNil) |
908 | + c.Check(sess.State(), Equals, Connected) |
909 | + c.Check(sess.tryHost, Equals, 0) |
910 | } |
911 | |
912 | func (cs *clientSessionSuite) TestConnectConnectFail(c *C) { |
913 | srv, err := net.Listen("tcp", "localhost:0") |
914 | c.Assert(err, IsNil) |
915 | - sess, err := NewSession(srv.Addr().String(), nil, 0, "wah", cs.lvls, cs.log) |
916 | + sess, err := NewSession(srv.Addr().String(), dummyConf, "wah", cs.lvls, cs.log) |
917 | srv.Close() |
918 | c.Assert(err, IsNil) |
919 | + sess.deliveryHosts = []string{srv.Addr().String()} |
920 | err = sess.connect() |
921 | c.Check(err, ErrorMatches, ".*connection refused") |
922 | c.Check(sess.State(), Equals, Error) |
923 | @@ -257,7 +453,7 @@ |
924 | ****************************************************************/ |
925 | |
926 | func (cs *clientSessionSuite) TestClose(c *C) { |
927 | - sess, err := NewSession("", nil, 0, "wah", cs.lvls, cs.log) |
928 | + sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log) |
929 | c.Assert(err, IsNil) |
930 | sess.Connection = &testConn{Name: "TestClose"} |
931 | sess.Close() |
932 | @@ -266,7 +462,7 @@ |
933 | } |
934 | |
935 | func (cs *clientSessionSuite) TestCloseTwice(c *C) { |
936 | - sess, err := NewSession("", nil, 0, "wah", cs.lvls, cs.log) |
937 | + sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log) |
938 | c.Assert(err, IsNil) |
939 | sess.Connection = &testConn{Name: "TestCloseTwice"} |
940 | sess.Close() |
941 | @@ -277,7 +473,7 @@ |
942 | } |
943 | |
944 | func (cs *clientSessionSuite) TestCloseFails(c *C) { |
945 | - sess, err := NewSession("", nil, 0, "wah", cs.lvls, cs.log) |
946 | + sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log) |
947 | c.Assert(err, IsNil) |
948 | sess.Connection = &testConn{Name: "TestCloseFails", CloseCondition: condition.Work(false)} |
949 | sess.Close() |
950 | @@ -291,7 +487,7 @@ |
951 | func (d *derp) Stop() { d.stopped = true } |
952 | |
953 | func (cs *clientSessionSuite) TestCloseStopsRetrier(c *C) { |
954 | - sess, err := NewSession("", nil, 0, "wah", cs.lvls, cs.log) |
955 | + sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log) |
956 | c.Assert(err, IsNil) |
957 | ar := new(derp) |
958 | sess.retrier = ar |
959 | @@ -308,7 +504,7 @@ |
960 | |
961 | func (cs *clientSessionSuite) TestAutoRedialWorks(c *C) { |
962 | // checks that AutoRedial sets up a retrier and tries redialing it |
963 | - sess, err := NewSession("", nil, 0, "wah", cs.lvls, cs.log) |
964 | + sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log) |
965 | c.Assert(err, IsNil) |
966 | ar := new(derp) |
967 | sess.retrier = ar |
968 | @@ -319,7 +515,7 @@ |
969 | |
970 | func (cs *clientSessionSuite) TestAutoRedialStopsRetrier(c *C) { |
971 | // checks that AutoRedial stops the previous retrier |
972 | - sess, err := NewSession("", nil, 0, "wah", cs.lvls, cs.log) |
973 | + sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log) |
974 | c.Assert(err, IsNil) |
975 | ch := make(chan uint32) |
976 | c.Check(sess.retrier, IsNil) |
977 | @@ -344,7 +540,10 @@ |
978 | |
979 | func (s *msgSuite) SetUpTest(c *C) { |
980 | var err error |
981 | - s.sess, err = NewSession("", nil, time.Millisecond, "wah", levelmap.NewLevelMap, helpers.NewTestLogger(c, "debug")) |
982 | + conf := ClientSessionConfig{ |
983 | + ExchangeTimeout: time.Millisecond, |
984 | + } |
985 | + s.sess, err = NewSession("", conf, "wah", levelmap.NewLevelMap, helpers.NewTestLogger(c, "debug")) |
986 | c.Assert(err, IsNil) |
987 | s.sess.Connection = &testConn{Name: "TestHandle*"} |
988 | s.errCh = make(chan error, 1) |
989 | @@ -519,7 +718,7 @@ |
990 | start() tests |
991 | ****************************************************************/ |
992 | func (cs *clientSessionSuite) TestStartFailsIfSetDeadlineFails(c *C) { |
993 | - sess, err := NewSession("", nil, 0, "wah", cs.lvls, cs.log) |
994 | + sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log) |
995 | c.Assert(err, IsNil) |
996 | sess.Connection = &testConn{Name: "TestStartFailsIfSetDeadlineFails", |
997 | DeadlineCondition: condition.Work(false)} // setdeadline will fail |
998 | @@ -529,7 +728,7 @@ |
999 | } |
1000 | |
1001 | func (cs *clientSessionSuite) TestStartFailsIfWriteFails(c *C) { |
1002 | - sess, err := NewSession("", nil, 0, "wah", cs.lvls, cs.log) |
1003 | + sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log) |
1004 | c.Assert(err, IsNil) |
1005 | sess.Connection = &testConn{Name: "TestStartFailsIfWriteFails", |
1006 | WriteCondition: condition.Work(false)} // write will fail |
1007 | @@ -539,7 +738,7 @@ |
1008 | } |
1009 | |
1010 | func (cs *clientSessionSuite) TestStartFailsIfGetLevelsFails(c *C) { |
1011 | - sess, err := NewSession("", nil, 0, "wah", cs.lvls, cs.log) |
1012 | + sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log) |
1013 | c.Assert(err, IsNil) |
1014 | sess.Levels = &brokenLevelMap{} |
1015 | sess.Connection = &testConn{Name: "TestStartConnectMessageFails"} |
1016 | @@ -559,7 +758,7 @@ |
1017 | } |
1018 | |
1019 | func (cs *clientSessionSuite) TestStartConnectMessageFails(c *C) { |
1020 | - sess, err := NewSession("", nil, 0, "wah", cs.lvls, cs.log) |
1021 | + sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log) |
1022 | c.Assert(err, IsNil) |
1023 | sess.Connection = &testConn{Name: "TestStartConnectMessageFails"} |
1024 | errCh := make(chan error, 1) |
1025 | @@ -585,7 +784,7 @@ |
1026 | } |
1027 | |
1028 | func (cs *clientSessionSuite) TestStartConnackReadError(c *C) { |
1029 | - sess, err := NewSession("", nil, 0, "wah", cs.lvls, cs.log) |
1030 | + sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log) |
1031 | c.Assert(err, IsNil) |
1032 | sess.Connection = &testConn{Name: "TestStartConnackReadError"} |
1033 | errCh := make(chan error, 1) |
1034 | @@ -609,7 +808,7 @@ |
1035 | } |
1036 | |
1037 | func (cs *clientSessionSuite) TestStartBadConnack(c *C) { |
1038 | - sess, err := NewSession("", nil, 0, "wah", cs.lvls, cs.log) |
1039 | + sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log) |
1040 | c.Assert(err, IsNil) |
1041 | sess.Connection = &testConn{Name: "TestStartBadConnack"} |
1042 | errCh := make(chan error, 1) |
1043 | @@ -633,7 +832,7 @@ |
1044 | } |
1045 | |
1046 | func (cs *clientSessionSuite) TestStartNotConnack(c *C) { |
1047 | - sess, err := NewSession("", nil, 0, "wah", cs.lvls, cs.log) |
1048 | + sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log) |
1049 | c.Assert(err, IsNil) |
1050 | sess.Connection = &testConn{Name: "TestStartBadConnack"} |
1051 | errCh := make(chan error, 1) |
1052 | @@ -657,7 +856,7 @@ |
1053 | } |
1054 | |
1055 | func (cs *clientSessionSuite) TestStartWorks(c *C) { |
1056 | - sess, err := NewSession("", nil, 0, "wah", cs.lvls, cs.log) |
1057 | + sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log) |
1058 | c.Assert(err, IsNil) |
1059 | sess.Connection = &testConn{Name: "TestStartWorks"} |
1060 | errCh := make(chan error, 1) |
1061 | @@ -688,34 +887,49 @@ |
1062 | run() tests |
1063 | ****************************************************************/ |
1064 | |
1065 | -func (cs *clientSessionSuite) TestRunBailsIfConnectFails(c *C) { |
1066 | - sess, err := NewSession("", nil, 0, "wah", cs.lvls, cs.log) |
1067 | +func (cs *clientSessionSuite) TestRunBailsIfHostGetterFails(c *C) { |
1068 | + sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log) |
1069 | c.Assert(err, IsNil) |
1070 | - failure := errors.New("TestRunBailsIfConnectFails") |
1071 | + failure := errors.New("TestRunBailsIfHostGetterFails") |
1072 | has_closed := false |
1073 | err = sess.run( |
1074 | func() { has_closed = true }, |
1075 | func() error { return failure }, |
1076 | nil, |
1077 | + nil, |
1078 | nil) |
1079 | c.Check(err, Equals, failure) |
1080 | c.Check(has_closed, Equals, true) |
1081 | } |
1082 | |
1083 | +func (cs *clientSessionSuite) TestRunBailsIfConnectFails(c *C) { |
1084 | + sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log) |
1085 | + c.Assert(err, IsNil) |
1086 | + failure := errors.New("TestRunBailsIfConnectFails") |
1087 | + err = sess.run( |
1088 | + func() {}, |
1089 | + func() error { return nil }, |
1090 | + func() error { return failure }, |
1091 | + nil, |
1092 | + nil) |
1093 | + c.Check(err, Equals, failure) |
1094 | +} |
1095 | + |
1096 | func (cs *clientSessionSuite) TestRunBailsIfStartFails(c *C) { |
1097 | - sess, err := NewSession("", nil, 0, "wah", cs.lvls, cs.log) |
1098 | + sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log) |
1099 | c.Assert(err, IsNil) |
1100 | failure := errors.New("TestRunBailsIfStartFails") |
1101 | err = sess.run( |
1102 | func() {}, |
1103 | func() error { return nil }, |
1104 | + func() error { return nil }, |
1105 | func() error { return failure }, |
1106 | nil) |
1107 | c.Check(err, Equals, failure) |
1108 | } |
1109 | |
1110 | func (cs *clientSessionSuite) TestRunRunsEvenIfLoopFails(c *C) { |
1111 | - sess, err := NewSession("", nil, 0, "wah", cs.lvls, cs.log) |
1112 | + sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log) |
1113 | c.Assert(err, IsNil) |
1114 | // just to make a point: until here we haven't set ErrCh & MsgCh (no |
1115 | // biggie if this stops being true) |
1116 | @@ -727,6 +941,7 @@ |
1117 | func() {}, |
1118 | func() error { return nil }, |
1119 | func() error { return nil }, |
1120 | + func() error { return nil }, |
1121 | func() error { sess.MsgCh <- notf; return <-failureCh }) |
1122 | c.Check(err, Equals, nil) |
1123 | // if run doesn't error it sets up the channels |
1124 | @@ -744,7 +959,7 @@ |
1125 | ****************************************************************/ |
1126 | |
1127 | func (cs *clientSessionSuite) TestJitter(c *C) { |
1128 | - sess, err := NewSession("", nil, 0, "wah", cs.lvls, cs.log) |
1129 | + sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log) |
1130 | c.Assert(err, IsNil) |
1131 | num_tries := 20 // should do the math |
1132 | spread := time.Second // |
1133 | @@ -776,12 +991,17 @@ |
1134 | |
1135 | func (cs *clientSessionSuite) TestDialPanics(c *C) { |
1136 | // one last unhappy test |
1137 | - sess, err := NewSession("", nil, 0, "wah", cs.lvls, cs.log) |
1138 | + sess, err := NewSession("", dummyConf, "wah", cs.lvls, cs.log) |
1139 | c.Assert(err, IsNil) |
1140 | sess.Protocolator = nil |
1141 | c.Check(sess.Dial, PanicMatches, ".*protocol constructor.") |
1142 | } |
1143 | |
1144 | +var ( |
1145 | + dialTestTimeout = 100 * time.Millisecond |
1146 | + dialTestConf = ClientSessionConfig{ExchangeTimeout: dialTestTimeout} |
1147 | +) |
1148 | + |
1149 | func (cs *clientSessionSuite) TestDialWorks(c *C) { |
1150 | // happy path thoughts |
1151 | cert, err := tls.X509KeyPair(helpers.TestCertPEMBlock, helpers.TestKeyPEMBlock) |
1152 | @@ -791,10 +1011,22 @@ |
1153 | SessionTicketsDisabled: true, |
1154 | } |
1155 | |
1156 | - timeout := 100 * time.Millisecond |
1157 | lst, err := tls.Listen("tcp", "localhost:0", tlsCfg) |
1158 | c.Assert(err, IsNil) |
1159 | - sess, err := NewSession(lst.Addr().String(), nil, timeout, "wah", cs.lvls, cs.log) |
1160 | + // advertise |
1161 | + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { |
1162 | + b, err := json.Marshal(map[string]interface{}{ |
1163 | + "hosts": []string{"nowhere", lst.Addr().String()}, |
1164 | + }) |
1165 | + if err != nil { |
1166 | + panic(err) |
1167 | + } |
1168 | + w.Header().Set("Content-Type", "application/json") |
1169 | + w.Write(b) |
1170 | + })) |
1171 | + defer ts.Close() |
1172 | + |
1173 | + sess, err := NewSession(ts.URL, dialTestConf, "wah", cs.lvls, cs.log) |
1174 | c.Assert(err, IsNil) |
1175 | tconn := &testConn{CloseCondition: condition.Fail2Work(10)} |
1176 | sess.Connection = tconn |
1177 | @@ -819,10 +1051,13 @@ |
1178 | c.Check(tconn.CloseCondition.String(), Matches, ".* 9 to go.") |
1179 | |
1180 | // now, start: 1. protocol version |
1181 | - v, err := protocol.ReadWireFormatVersion(srv, timeout) |
1182 | + v, err := protocol.ReadWireFormatVersion(srv, dialTestTimeout) |
1183 | c.Assert(err, IsNil) |
1184 | c.Assert(v, Equals, protocol.ProtocolWireVersion) |
1185 | |
1186 | + // if something goes wrong session would try the first/other host |
1187 | + c.Check(sess.tryHost, Equals, 0) |
1188 | + |
1189 | // 2. "connect" (but on the fake protocol above! woo) |
1190 | |
1191 | c.Check(takeNext(downCh), Equals, "deadline 100ms") |
1192 | @@ -843,6 +1078,9 @@ |
1193 | c.Check(takeNext(downCh), Equals, protocol.PingPongMsg{Type: "pong"}) |
1194 | upCh <- nil |
1195 | |
1196 | + // session would retry the same host |
1197 | + c.Check(sess.tryHost, Equals, 1) |
1198 | + |
1199 | // and broadcasts... |
1200 | b := &protocol.BroadcastMsg{ |
1201 | Type: "broadcast", |
1202 | @@ -870,3 +1108,30 @@ |
1203 | upCh <- failure |
1204 | c.Check(<-sess.ErrCh, Equals, failure) |
1205 | } |
1206 | + |
1207 | +func (cs *clientSessionSuite) TestDialWorksDirect(c *C) { |
1208 | + // happy path thoughts |
1209 | + cert, err := tls.X509KeyPair(helpers.TestCertPEMBlock, helpers.TestKeyPEMBlock) |
1210 | + c.Assert(err, IsNil) |
1211 | + tlsCfg := &tls.Config{ |
1212 | + Certificates: []tls.Certificate{cert}, |
1213 | + SessionTicketsDisabled: true, |
1214 | + } |
1215 | + |
1216 | + lst, err := tls.Listen("tcp", "localhost:0", tlsCfg) |
1217 | + c.Assert(err, IsNil) |
1218 | + sess, err := NewSession(lst.Addr().String(), dialTestConf, "wah", cs.lvls, cs.log) |
1219 | + c.Assert(err, IsNil) |
1220 | + defer sess.Close() |
1221 | + |
1222 | + upCh := make(chan interface{}, 5) |
1223 | + downCh := make(chan interface{}, 5) |
1224 | + proto := &testProtocol{up: upCh, down: downCh} |
1225 | + sess.Protocolator = func(net.Conn) protocol.Protocol { return proto } |
1226 | + |
1227 | + go sess.Dial() |
1228 | + |
1229 | + _, err = lst.Accept() |
1230 | + c.Assert(err, IsNil) |
1231 | + // connect done |
1232 | +} |
1233 | |
1234 | === modified file 'debian/config.json' |
1235 | --- debian/config.json 2014-03-20 12:17:40 +0000 |
1236 | +++ debian/config.json 2014-03-31 14:33:42 +0000 |
1237 | @@ -1,5 +1,8 @@ |
1238 | { |
1239 | + "connect_timeout": "20s", |
1240 | "exchange_timeout": "30s", |
1241 | + "hosts_cache_expiry": "12h", |
1242 | + "expect_all_repaired": "40m", |
1243 | "addr": "push-delivery.ubuntu.com:443", |
1244 | "cert_pem_file": "", |
1245 | "stabilizing_timeout": "2s", |