Merge lp:~chipaca/ubuntu-push/client-v0-p4 into lp:ubuntu-push
- client-v0-p4
- Merge into trunk
Proposed by
John Lenton
Status: | Merged |
---|---|
Approved by: | John Lenton |
Approved revision: | 49 |
Merged at revision: | 48 |
Proposed branch: | lp:~chipaca/ubuntu-push/client-v0-p4 |
Merge into: | lp:ubuntu-push |
Diff against target: |
493 lines (+100/-26) 7 files modified
bus/connectivity/connectivity_test.go (+2/-3) bus/connectivity/webchecker_test.go (+2/-3) client/client_test.go (+9/-6) client/session/session.go (+39/-2) client/session/session_test.go (+21/-0) util/redialer.go (+25/-9) util/redialer_test.go (+2/-3) |
To merge this branch: | bzr merge lp:~chipaca/ubuntu-push/client-v0-p4 |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Samuele Pedroni | Approve | ||
Review via email:
|
This proposal supersedes a proposal from 2014-02-03.
Commit message
Part 4: added State to client/session, to aid in testing some aspects of
this.
Description of the change
Part 4: added State to client/session, to aid in testing some aspects of
this.
To post a comment you must log in.
Revision history for this message
![](/+icing/build/overlay/assets/skins/sam/images/close.gif)
John Lenton (chipaca) wrote : | # |
- 48. By John Lenton
-
fixed races using cannons
Revision history for this message
![](/+icing/build/overlay/assets/skins/sam/images/close.gif)
John Lenton (chipaca) wrote : | # |
OK, that "fixes" it.
I've added a mutex around the retrier's timeouts, for now; I'll be moving retrier into a class and making the mutex unnecessary. At some point.
Revision history for this message
![](/+icing/build/overlay/assets/skins/sam/images/close.gif)
Samuele Pedroni (pedronis) : | # |
review:
Approve
- 49. By John Lenton
-
merged trunk
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === modified file 'bus/connectivity/connectivity_test.go' |
2 | --- bus/connectivity/connectivity_test.go 2014-01-31 13:31:09 +0000 |
3 | +++ bus/connectivity/connectivity_test.go 2014-02-04 13:39:58 +0000 |
4 | @@ -42,12 +42,11 @@ |
5 | var nullog = logger.NewSimpleLogger(ioutil.Discard, "error") |
6 | |
7 | func (s *ConnSuite) SetUpSuite(c *C) { |
8 | - s.timeouts = util.Timeouts |
9 | - util.Timeouts = []time.Duration{0, 0, 0, 0} |
10 | + s.timeouts = util.SwapTimeouts([]time.Duration{0, 0, 0, 0}) |
11 | } |
12 | |
13 | func (s *ConnSuite) TearDownSuite(c *C) { |
14 | - util.Timeouts = s.timeouts |
15 | + util.SwapTimeouts(s.timeouts) |
16 | s.timeouts = nil |
17 | } |
18 | |
19 | |
20 | === modified file 'bus/connectivity/webchecker_test.go' |
21 | --- bus/connectivity/webchecker_test.go 2014-01-24 14:12:22 +0000 |
22 | +++ bus/connectivity/webchecker_test.go 2014-02-04 13:39:58 +0000 |
23 | @@ -61,12 +61,11 @@ |
24 | } |
25 | |
26 | func (s *WebcheckerSuite) SetUpSuite(c *C) { |
27 | - s.timeouts = util.Timeouts |
28 | - util.Timeouts = []time.Duration{0} |
29 | + s.timeouts = util.SwapTimeouts([]time.Duration{0}) |
30 | } |
31 | |
32 | func (s *WebcheckerSuite) TearDownSuite(c *C) { |
33 | - util.Timeouts = s.timeouts |
34 | + util.SwapTimeouts(s.timeouts) |
35 | s.timeouts = nil |
36 | } |
37 | |
38 | |
39 | === modified file 'client/client_test.go' |
40 | --- client/client_test.go 2014-02-03 18:17:27 +0000 |
41 | +++ client/client_test.go 2014-02-04 13:39:58 +0000 |
42 | @@ -70,9 +70,16 @@ |
43 | } |
44 | } |
45 | |
46 | +func (cs *clientSuite) SetUpSuite(c *C) { |
47 | + cs.timeouts = util.SwapTimeouts([]time.Duration{0}) |
48 | +} |
49 | + |
50 | +func (cs *clientSuite) TearDownSuite(c *C) { |
51 | + util.SwapTimeouts(cs.timeouts) |
52 | + cs.timeouts = nil |
53 | +} |
54 | + |
55 | func (cs *clientSuite) SetUpTest(c *C) { |
56 | - cs.timeouts = util.Timeouts |
57 | - util.Timeouts = []time.Duration{0} |
58 | dir := c.MkDir() |
59 | cs.configPath = filepath.Join(dir, "config") |
60 | cfg := fmt.Sprintf(` |
61 | @@ -88,10 +95,6 @@ |
62 | ioutil.WriteFile(cs.configPath, []byte(cfg), 0600) |
63 | } |
64 | |
65 | -func (cs *clientSuite) TearDownTest(c *C) { |
66 | - util.Timeouts = cs.timeouts |
67 | -} |
68 | - |
69 | /***************************************************************** |
70 | Configure tests |
71 | ******************************************************************/ |
72 | |
73 | === modified file 'client/session/session.go' |
74 | --- client/session/session.go 2014-02-01 21:04:32 +0000 |
75 | +++ client/session/session.go 2014-02-04 13:39:58 +0000 |
76 | @@ -27,6 +27,7 @@ |
77 | "launchpad.net/ubuntu-push/logger" |
78 | "launchpad.net/ubuntu-push/protocol" |
79 | "net" |
80 | + "sync/atomic" |
81 | "time" |
82 | ) |
83 | |
84 | @@ -42,6 +43,17 @@ |
85 | protocol.NotificationsMsg |
86 | } |
87 | |
88 | +// ClientSessionState is a way to broadly track the progress of the session |
89 | +type ClientSessionState uint32 |
90 | + |
91 | +const ( |
92 | + Error ClientSessionState = iota |
93 | + Disconnected |
94 | + Connected |
95 | + Started |
96 | + Running |
97 | +) |
98 | + |
99 | // ClienSession holds a client<->server session and its configuration. |
100 | type ClientSession struct { |
101 | // configuration |
102 | @@ -57,12 +69,14 @@ |
103 | proto protocol.Protocol |
104 | pingInterval time.Duration |
105 | // status |
106 | - ErrCh chan error |
107 | - MsgCh chan *Notification |
108 | + stateP *uint32 |
109 | + ErrCh chan error |
110 | + MsgCh chan *Notification |
111 | } |
112 | |
113 | func NewSession(serverAddr string, pem []byte, exchangeTimeout time.Duration, |
114 | deviceId string, log logger.Logger) (*ClientSession, error) { |
115 | + state := uint32(Disconnected) |
116 | sess := &ClientSession{ |
117 | ExchangeTimeout: exchangeTimeout, |
118 | ServerAddr: serverAddr, |
119 | @@ -71,6 +85,7 @@ |
120 | Protocolator: protocol.NewProtocol0, |
121 | Levels: levelmap.NewLevelMap(), |
122 | TLS: &tls.Config{InsecureSkipVerify: true}, // XXX |
123 | + stateP: &state, |
124 | } |
125 | if pem != nil { |
126 | cp := x509.NewCertPool() |
127 | @@ -83,14 +98,24 @@ |
128 | return sess, nil |
129 | } |
130 | |
131 | +func (sess *ClientSession) State() ClientSessionState { |
132 | + return ClientSessionState(atomic.LoadUint32(sess.stateP)) |
133 | +} |
134 | + |
135 | +func (sess *ClientSession) setState(state ClientSessionState) { |
136 | + atomic.StoreUint32(sess.stateP, uint32(state)) |
137 | +} |
138 | + |
139 | // connect to a server using the configuration in the ClientSession |
140 | // and set up the connection. |
141 | func (sess *ClientSession) connect() error { |
142 | conn, err := net.DialTimeout("tcp", sess.ServerAddr, sess.ExchangeTimeout) |
143 | if err != nil { |
144 | + sess.setState(Error) |
145 | return fmt.Errorf("connect: %s", err) |
146 | } |
147 | sess.Connection = tls.Client(conn, sess.TLS) |
148 | + sess.setState(Connected) |
149 | return nil |
150 | } |
151 | |
152 | @@ -102,6 +127,7 @@ |
153 | // you could do to recover at this stage). |
154 | sess.Connection = nil |
155 | } |
156 | + sess.setState(Disconnected) |
157 | } |
158 | |
159 | // handle "ping" messages |
160 | @@ -110,6 +136,7 @@ |
161 | if err == nil { |
162 | sess.Log.Debugf("ping.") |
163 | } else { |
164 | + sess.setState(Error) |
165 | sess.Log.Errorf("unable to pong: %s", err) |
166 | } |
167 | return err |
168 | @@ -119,6 +146,7 @@ |
169 | func (sess *ClientSession) handleBroadcast(bcast *serverMsg) error { |
170 | err := sess.proto.WriteMessage(protocol.AckMsg{"ack"}) |
171 | if err != nil { |
172 | + sess.setState(Error) |
173 | return err |
174 | } |
175 | sess.Log.Debugf("broadcast chan:%v app:%v topLevel:%d payloads:%s", |
176 | @@ -137,11 +165,13 @@ |
177 | func (sess *ClientSession) loop() error { |
178 | var err error |
179 | var recv serverMsg |
180 | + sess.setState(Running) |
181 | for { |
182 | deadAfter := sess.pingInterval + sess.ExchangeTimeout |
183 | sess.proto.SetDeadline(time.Now().Add(deadAfter)) |
184 | err = sess.proto.ReadMessage(&recv) |
185 | if err != nil { |
186 | + sess.setState(Error) |
187 | return err |
188 | } |
189 | switch recv.Type { |
190 | @@ -161,12 +191,14 @@ |
191 | conn := sess.Connection |
192 | err := conn.SetDeadline(time.Now().Add(sess.ExchangeTimeout)) |
193 | if err != nil { |
194 | + sess.setState(Error) |
195 | return err |
196 | } |
197 | _, err = conn.Write(wireVersionBytes) |
198 | // The Writer docs: Write must return a non-nil error if it returns |
199 | // n < len(p). So, no need to check number of bytes written, hooray. |
200 | if err != nil { |
201 | + sess.setState(Error) |
202 | return err |
203 | } |
204 | proto := sess.Protocolator(conn) |
205 | @@ -177,23 +209,28 @@ |
206 | Levels: sess.Levels.GetAll(), |
207 | }) |
208 | if err != nil { |
209 | + sess.setState(Error) |
210 | return err |
211 | } |
212 | var connAck protocol.ConnAckMsg |
213 | err = proto.ReadMessage(&connAck) |
214 | if err != nil { |
215 | + sess.setState(Error) |
216 | return err |
217 | } |
218 | if connAck.Type != "connack" { |
219 | + sess.setState(Error) |
220 | return fmt.Errorf("expecting CONNACK, got %#v", connAck.Type) |
221 | } |
222 | pingInterval, err := time.ParseDuration(connAck.Params.PingInterval) |
223 | if err != nil { |
224 | + sess.setState(Error) |
225 | return err |
226 | } |
227 | sess.proto = proto |
228 | sess.pingInterval = pingInterval |
229 | sess.Log.Debugf("Connected %v.", conn.LocalAddr()) |
230 | + sess.setState(Started) |
231 | return nil |
232 | } |
233 | |
234 | |
235 | === modified file 'client/session/session_test.go' |
236 | --- client/session/session_test.go 2014-02-01 21:04:32 +0000 |
237 | +++ client/session/session_test.go 2014-02-04 13:39:58 +0000 |
238 | @@ -168,6 +168,7 @@ |
239 | c.Check(err, IsNil) |
240 | // but no root CAs set |
241 | c.Check(sess.TLS.RootCAs, IsNil) |
242 | + c.Check(sess.State(), Equals, Disconnected) |
243 | } |
244 | |
245 | var certfile string = helpers.SourceRelative("../../server/acceptance/config/testing.cert") |
246 | @@ -196,6 +197,7 @@ |
247 | c.Assert(err, IsNil) |
248 | err = sess.connect() |
249 | c.Check(err, ErrorMatches, ".*connect.*address.*") |
250 | + c.Check(sess.State(), Equals, Error) |
251 | } |
252 | |
253 | func (cs *clientSessionSuite) TestConnectConnects(c *C) { |
254 | @@ -207,6 +209,7 @@ |
255 | err = sess.connect() |
256 | c.Check(err, IsNil) |
257 | c.Check(sess.Connection, NotNil) |
258 | + c.Check(sess.State(), Equals, Connected) |
259 | } |
260 | |
261 | func (cs *clientSessionSuite) TestConnectConnectFail(c *C) { |
262 | @@ -217,6 +220,7 @@ |
263 | c.Assert(err, IsNil) |
264 | err = sess.connect() |
265 | c.Check(err, ErrorMatches, ".*connection refused") |
266 | + c.Check(sess.State(), Equals, Error) |
267 | } |
268 | |
269 | /**************************************************************** |
270 | @@ -229,6 +233,7 @@ |
271 | sess.Connection = &testConn{Name: "TestClose"} |
272 | sess.Close() |
273 | c.Check(sess.Connection, IsNil) |
274 | + c.Check(sess.State(), Equals, Disconnected) |
275 | } |
276 | |
277 | func (cs *clientSessionSuite) TestCloseTwice(c *C) { |
278 | @@ -239,6 +244,7 @@ |
279 | c.Check(sess.Connection, IsNil) |
280 | sess.Close() |
281 | c.Check(sess.Connection, IsNil) |
282 | + c.Check(sess.State(), Equals, Disconnected) |
283 | } |
284 | |
285 | func (cs *clientSessionSuite) TestCloseFails(c *C) { |
286 | @@ -247,6 +253,7 @@ |
287 | sess.Connection = &testConn{Name: "TestCloseFails", CloseCondition: condition.Work(false)} |
288 | sess.Close() |
289 | c.Check(sess.Connection, IsNil) // nothing you can do to clean up anyway |
290 | + c.Check(sess.State(), Equals, Disconnected) |
291 | } |
292 | |
293 | /**************************************************************** |
294 | @@ -289,6 +296,7 @@ |
295 | c.Check(s.sess.handlePing(), Equals, failure) |
296 | c.Assert(len(s.downCh), Equals, 1) |
297 | c.Check(<-s.downCh, Equals, protocol.PingPongMsg{Type: "pong"}) |
298 | + c.Check(s.sess.State(), Equals, Error) |
299 | } |
300 | |
301 | /**************************************************************** |
302 | @@ -328,6 +336,7 @@ |
303 | failure := errors.New("ACK ACK ACK") |
304 | s.upCh <- failure |
305 | c.Assert(<-s.errCh, Equals, failure) |
306 | + c.Check(s.sess.State(), Equals, Error) |
307 | } |
308 | |
309 | func (s *msgSuite) TestHandleBroadcastWrongChannel(c *C) { |
310 | @@ -363,12 +372,15 @@ |
311 | } |
312 | |
313 | func (s *loopSuite) TestLoopReadError(c *C) { |
314 | + c.Check(s.sess.State(), Equals, Running) |
315 | s.upCh <- errors.New("Read") |
316 | err := <-s.errCh |
317 | c.Check(err, ErrorMatches, "Read") |
318 | + c.Check(s.sess.State(), Equals, Error) |
319 | } |
320 | |
321 | func (s *loopSuite) TestLoopPing(c *C) { |
322 | + c.Check(s.sess.State(), Equals, Running) |
323 | c.Check(takeNext(s.downCh), Equals, "deadline 1ms") |
324 | s.upCh <- protocol.PingPongMsg{Type: "ping"} |
325 | c.Check(takeNext(s.downCh), Equals, protocol.PingPongMsg{Type: "pong"}) |
326 | @@ -378,6 +390,7 @@ |
327 | } |
328 | |
329 | func (s *loopSuite) TestLoopLoopsDaLoop(c *C) { |
330 | + c.Check(s.sess.State(), Equals, Running) |
331 | for i := 1; i < 10; i++ { |
332 | c.Check(takeNext(s.downCh), Equals, "deadline 1ms") |
333 | s.upCh <- protocol.PingPongMsg{Type: "ping"} |
334 | @@ -390,6 +403,7 @@ |
335 | } |
336 | |
337 | func (s *loopSuite) TestLoopBroadcast(c *C) { |
338 | + c.Check(s.sess.State(), Equals, Running) |
339 | b := &protocol.BroadcastMsg{ |
340 | Type: "broadcast", |
341 | AppId: "--ignored--", |
342 | @@ -415,6 +429,7 @@ |
343 | DeadlineCondition: condition.Work(false)} // setdeadline will fail |
344 | err = sess.start() |
345 | c.Check(err, ErrorMatches, ".*deadline.*") |
346 | + c.Check(sess.State(), Equals, Error) |
347 | } |
348 | |
349 | func (cs *clientSessionSuite) TestStartFailsIfWriteFails(c *C) { |
350 | @@ -424,6 +439,7 @@ |
351 | WriteCondition: condition.Work(false)} // write will fail |
352 | err = sess.start() |
353 | c.Check(err, ErrorMatches, ".*write.*") |
354 | + c.Check(sess.State(), Equals, Error) |
355 | } |
356 | |
357 | func (cs *clientSessionSuite) TestStartConnectMessageFails(c *C) { |
358 | @@ -449,6 +465,7 @@ |
359 | upCh <- errors.New("Overflow error in /dev/null") |
360 | err = <-errCh |
361 | c.Check(err, ErrorMatches, "Overflow.*null") |
362 | + c.Check(sess.State(), Equals, Error) |
363 | } |
364 | |
365 | func (cs *clientSessionSuite) TestStartConnackReadError(c *C) { |
366 | @@ -472,6 +489,7 @@ |
367 | upCh <- io.EOF |
368 | err = <-errCh |
369 | c.Check(err, ErrorMatches, ".*EOF.*") |
370 | + c.Check(sess.State(), Equals, Error) |
371 | } |
372 | |
373 | func (cs *clientSessionSuite) TestStartBadConnack(c *C) { |
374 | @@ -495,6 +513,7 @@ |
375 | upCh <- protocol.ConnAckMsg{Type: "connack"} |
376 | err = <-errCh |
377 | c.Check(err, ErrorMatches, ".*invalid.*") |
378 | + c.Check(sess.State(), Equals, Error) |
379 | } |
380 | |
381 | func (cs *clientSessionSuite) TestStartNotConnack(c *C) { |
382 | @@ -518,6 +537,7 @@ |
383 | upCh <- protocol.ConnAckMsg{Type: "connnak"} |
384 | err = <-errCh |
385 | c.Check(err, ErrorMatches, ".*CONNACK.*") |
386 | + c.Check(sess.State(), Equals, Error) |
387 | } |
388 | |
389 | func (cs *clientSessionSuite) TestStartWorks(c *C) { |
390 | @@ -545,6 +565,7 @@ |
391 | // start is now done. |
392 | err = <-errCh |
393 | c.Check(err, IsNil) |
394 | + c.Check(sess.State(), Equals, Started) |
395 | } |
396 | |
397 | /**************************************************************** |
398 | |
399 | === modified file 'util/redialer.go' |
400 | --- util/redialer.go 2014-02-03 18:17:27 +0000 |
401 | +++ util/redialer.go 2014-02-04 13:39:58 +0000 |
402 | @@ -18,6 +18,7 @@ |
403 | |
404 | import ( |
405 | "math/rand" |
406 | + "sync" |
407 | "time" |
408 | ) |
409 | |
410 | @@ -30,14 +31,28 @@ |
411 | Jitter(time.Duration) time.Duration |
412 | } |
413 | |
414 | -// The timeouts used during backoff. While this is public, you'd |
415 | -// usually not need to meddle with it. |
416 | -var Timeouts []time.Duration |
417 | +// The timeouts used during backoff. |
418 | +var timeouts []time.Duration |
419 | +var trwlock sync.RWMutex |
420 | |
421 | var ( // for use in testing |
422 | quitRedialing chan bool = make(chan bool) |
423 | ) |
424 | |
425 | +func Timeouts() []time.Duration { |
426 | + trwlock.RLock() |
427 | + defer trwlock.RUnlock() |
428 | + return timeouts |
429 | +} |
430 | + |
431 | +// for testing |
432 | +func SwapTimeouts(newTimeouts []time.Duration) (oldTimeouts []time.Duration) { |
433 | + trwlock.Lock() |
434 | + defer trwlock.Unlock() |
435 | + oldTimeouts, timeouts = timeouts, newTimeouts |
436 | + return |
437 | +} |
438 | + |
439 | // Jitter returns a random time.Duration somewhere in [-spread, spread]. |
440 | // |
441 | // This is meant as a default implementation for Dialers to use if wanted. |
442 | @@ -60,15 +75,16 @@ |
443 | func (ar *AutoRetrier) Retry() uint32 { |
444 | var timeout time.Duration |
445 | var dialAttempts uint32 = 0 // unsigned so it can wrap safely ... |
446 | - var numTimeouts uint32 = uint32(len(Timeouts)) |
447 | + timeouts := Timeouts() |
448 | + var numTimeouts uint32 = uint32(len(timeouts)) |
449 | for { |
450 | if ar.Dial() == nil { |
451 | return dialAttempts + 1 |
452 | } |
453 | if dialAttempts < numTimeouts { |
454 | - timeout = Timeouts[dialAttempts] |
455 | + timeout = timeouts[dialAttempts] |
456 | } else { |
457 | - timeout = Timeouts[numTimeouts-1] |
458 | + timeout = timeouts[numTimeouts-1] |
459 | } |
460 | timeout += ar.Jitter(timeout) |
461 | dialAttempts++ |
462 | @@ -90,10 +106,10 @@ |
463 | |
464 | func init() { |
465 | ps := []int{1, 2, 5, 11, 19, 37, 67, 113, 191} // 3 pₙ₊₁ ≥ 5 pₙ |
466 | - Timeouts = make([]time.Duration, len(ps)) |
467 | + timeouts := make([]time.Duration, len(ps)) |
468 | for i, n := range ps { |
469 | - Timeouts[i] = time.Duration(n) * time.Second |
470 | + timeouts[i] = time.Duration(n) * time.Second |
471 | } |
472 | - |
473 | + SwapTimeouts(timeouts) |
474 | rand.Seed(time.Now().Unix()) // good enough for us (not crypto, yadda) |
475 | } |
476 | |
477 | === modified file 'util/redialer_test.go' |
478 | --- util/redialer_test.go 2014-02-03 18:17:27 +0000 |
479 | +++ util/redialer_test.go 2014-02-04 13:39:58 +0000 |
480 | @@ -38,12 +38,11 @@ |
481 | var _ = Suite(&RedialerSuite{}) |
482 | |
483 | func (s *RedialerSuite) SetUpSuite(c *C) { |
484 | - s.timeouts = Timeouts |
485 | - Timeouts = []time.Duration{0, 0} |
486 | + s.timeouts = SwapTimeouts([]time.Duration{0, 0}) |
487 | } |
488 | |
489 | func (s *RedialerSuite) TearDownSuite(c *C) { |
490 | - Timeouts = s.timeouts |
491 | + SwapTimeouts(s.timeouts) |
492 | s.timeouts = nil |
493 | } |
494 |
This is a terrible idea. Why did I think it would work?