Merge lp:~chipaca/ubuntu-push/client-v0-p4 into lp:ubuntu-push

Proposed by John Lenton
Status: Merged
Approved by: John Lenton
Approved revision: 49
Merged at revision: 48
Proposed branch: lp:~chipaca/ubuntu-push/client-v0-p4
Merge into: lp:ubuntu-push
Diff against target: 493 lines (+100/-26)
7 files modified
bus/connectivity/connectivity_test.go (+2/-3)
bus/connectivity/webchecker_test.go (+2/-3)
client/client_test.go (+9/-6)
client/session/session.go (+39/-2)
client/session/session_test.go (+21/-0)
util/redialer.go (+25/-9)
util/redialer_test.go (+2/-3)
To merge this branch: bzr merge lp:~chipaca/ubuntu-push/client-v0-p4
Reviewer Review Type Date Requested Status
Samuele Pedroni Approve
Review via email: mp+204560@code.launchpad.net

This proposal supersedes a proposal from 2014-02-03.

Commit message

Part 4: added State to client/session, to aid in testing some aspects of
this.

Description of the change

Part 4: added State to client/session, to aid in testing some aspects of
this.

To post a comment you must log in.
Revision history for this message
John Lenton (chipaca) wrote :

This is a terrible idea. Why did I think it would work?

lp:~chipaca/ubuntu-push/client-v0-p4 updated
48. By John Lenton

fixed races using cannons

Revision history for this message
John Lenton (chipaca) wrote :

OK, that "fixes" it.

I've added a mutex around the retrier's timeouts, for now; I'll be moving retrier into a class and making the mutex unnecessary. At some point.

Revision history for this message
Samuele Pedroni (pedronis) :
review: Approve
lp:~chipaca/ubuntu-push/client-v0-p4 updated
49. By John Lenton

merged trunk

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'bus/connectivity/connectivity_test.go'
2--- bus/connectivity/connectivity_test.go 2014-01-31 13:31:09 +0000
3+++ bus/connectivity/connectivity_test.go 2014-02-04 13:39:58 +0000
4@@ -42,12 +42,11 @@
5 var nullog = logger.NewSimpleLogger(ioutil.Discard, "error")
6
7 func (s *ConnSuite) SetUpSuite(c *C) {
8- s.timeouts = util.Timeouts
9- util.Timeouts = []time.Duration{0, 0, 0, 0}
10+ s.timeouts = util.SwapTimeouts([]time.Duration{0, 0, 0, 0})
11 }
12
13 func (s *ConnSuite) TearDownSuite(c *C) {
14- util.Timeouts = s.timeouts
15+ util.SwapTimeouts(s.timeouts)
16 s.timeouts = nil
17 }
18
19
20=== modified file 'bus/connectivity/webchecker_test.go'
21--- bus/connectivity/webchecker_test.go 2014-01-24 14:12:22 +0000
22+++ bus/connectivity/webchecker_test.go 2014-02-04 13:39:58 +0000
23@@ -61,12 +61,11 @@
24 }
25
26 func (s *WebcheckerSuite) SetUpSuite(c *C) {
27- s.timeouts = util.Timeouts
28- util.Timeouts = []time.Duration{0}
29+ s.timeouts = util.SwapTimeouts([]time.Duration{0})
30 }
31
32 func (s *WebcheckerSuite) TearDownSuite(c *C) {
33- util.Timeouts = s.timeouts
34+ util.SwapTimeouts(s.timeouts)
35 s.timeouts = nil
36 }
37
38
39=== modified file 'client/client_test.go'
40--- client/client_test.go 2014-02-03 18:17:27 +0000
41+++ client/client_test.go 2014-02-04 13:39:58 +0000
42@@ -70,9 +70,16 @@
43 }
44 }
45
46+func (cs *clientSuite) SetUpSuite(c *C) {
47+ cs.timeouts = util.SwapTimeouts([]time.Duration{0})
48+}
49+
50+func (cs *clientSuite) TearDownSuite(c *C) {
51+ util.SwapTimeouts(cs.timeouts)
52+ cs.timeouts = nil
53+}
54+
55 func (cs *clientSuite) SetUpTest(c *C) {
56- cs.timeouts = util.Timeouts
57- util.Timeouts = []time.Duration{0}
58 dir := c.MkDir()
59 cs.configPath = filepath.Join(dir, "config")
60 cfg := fmt.Sprintf(`
61@@ -88,10 +95,6 @@
62 ioutil.WriteFile(cs.configPath, []byte(cfg), 0600)
63 }
64
65-func (cs *clientSuite) TearDownTest(c *C) {
66- util.Timeouts = cs.timeouts
67-}
68-
69 /*****************************************************************
70 Configure tests
71 ******************************************************************/
72
73=== modified file 'client/session/session.go'
74--- client/session/session.go 2014-02-01 21:04:32 +0000
75+++ client/session/session.go 2014-02-04 13:39:58 +0000
76@@ -27,6 +27,7 @@
77 "launchpad.net/ubuntu-push/logger"
78 "launchpad.net/ubuntu-push/protocol"
79 "net"
80+ "sync/atomic"
81 "time"
82 )
83
84@@ -42,6 +43,17 @@
85 protocol.NotificationsMsg
86 }
87
88+// ClientSessionState is a way to broadly track the progress of the session
89+type ClientSessionState uint32
90+
91+const (
92+ Error ClientSessionState = iota
93+ Disconnected
94+ Connected
95+ Started
96+ Running
97+)
98+
99 // ClienSession holds a client<->server session and its configuration.
100 type ClientSession struct {
101 // configuration
102@@ -57,12 +69,14 @@
103 proto protocol.Protocol
104 pingInterval time.Duration
105 // status
106- ErrCh chan error
107- MsgCh chan *Notification
108+ stateP *uint32
109+ ErrCh chan error
110+ MsgCh chan *Notification
111 }
112
113 func NewSession(serverAddr string, pem []byte, exchangeTimeout time.Duration,
114 deviceId string, log logger.Logger) (*ClientSession, error) {
115+ state := uint32(Disconnected)
116 sess := &ClientSession{
117 ExchangeTimeout: exchangeTimeout,
118 ServerAddr: serverAddr,
119@@ -71,6 +85,7 @@
120 Protocolator: protocol.NewProtocol0,
121 Levels: levelmap.NewLevelMap(),
122 TLS: &tls.Config{InsecureSkipVerify: true}, // XXX
123+ stateP: &state,
124 }
125 if pem != nil {
126 cp := x509.NewCertPool()
127@@ -83,14 +98,24 @@
128 return sess, nil
129 }
130
131+func (sess *ClientSession) State() ClientSessionState {
132+ return ClientSessionState(atomic.LoadUint32(sess.stateP))
133+}
134+
135+func (sess *ClientSession) setState(state ClientSessionState) {
136+ atomic.StoreUint32(sess.stateP, uint32(state))
137+}
138+
139 // connect to a server using the configuration in the ClientSession
140 // and set up the connection.
141 func (sess *ClientSession) connect() error {
142 conn, err := net.DialTimeout("tcp", sess.ServerAddr, sess.ExchangeTimeout)
143 if err != nil {
144+ sess.setState(Error)
145 return fmt.Errorf("connect: %s", err)
146 }
147 sess.Connection = tls.Client(conn, sess.TLS)
148+ sess.setState(Connected)
149 return nil
150 }
151
152@@ -102,6 +127,7 @@
153 // you could do to recover at this stage).
154 sess.Connection = nil
155 }
156+ sess.setState(Disconnected)
157 }
158
159 // handle "ping" messages
160@@ -110,6 +136,7 @@
161 if err == nil {
162 sess.Log.Debugf("ping.")
163 } else {
164+ sess.setState(Error)
165 sess.Log.Errorf("unable to pong: %s", err)
166 }
167 return err
168@@ -119,6 +146,7 @@
169 func (sess *ClientSession) handleBroadcast(bcast *serverMsg) error {
170 err := sess.proto.WriteMessage(protocol.AckMsg{"ack"})
171 if err != nil {
172+ sess.setState(Error)
173 return err
174 }
175 sess.Log.Debugf("broadcast chan:%v app:%v topLevel:%d payloads:%s",
176@@ -137,11 +165,13 @@
177 func (sess *ClientSession) loop() error {
178 var err error
179 var recv serverMsg
180+ sess.setState(Running)
181 for {
182 deadAfter := sess.pingInterval + sess.ExchangeTimeout
183 sess.proto.SetDeadline(time.Now().Add(deadAfter))
184 err = sess.proto.ReadMessage(&recv)
185 if err != nil {
186+ sess.setState(Error)
187 return err
188 }
189 switch recv.Type {
190@@ -161,12 +191,14 @@
191 conn := sess.Connection
192 err := conn.SetDeadline(time.Now().Add(sess.ExchangeTimeout))
193 if err != nil {
194+ sess.setState(Error)
195 return err
196 }
197 _, err = conn.Write(wireVersionBytes)
198 // The Writer docs: Write must return a non-nil error if it returns
199 // n < len(p). So, no need to check number of bytes written, hooray.
200 if err != nil {
201+ sess.setState(Error)
202 return err
203 }
204 proto := sess.Protocolator(conn)
205@@ -177,23 +209,28 @@
206 Levels: sess.Levels.GetAll(),
207 })
208 if err != nil {
209+ sess.setState(Error)
210 return err
211 }
212 var connAck protocol.ConnAckMsg
213 err = proto.ReadMessage(&connAck)
214 if err != nil {
215+ sess.setState(Error)
216 return err
217 }
218 if connAck.Type != "connack" {
219+ sess.setState(Error)
220 return fmt.Errorf("expecting CONNACK, got %#v", connAck.Type)
221 }
222 pingInterval, err := time.ParseDuration(connAck.Params.PingInterval)
223 if err != nil {
224+ sess.setState(Error)
225 return err
226 }
227 sess.proto = proto
228 sess.pingInterval = pingInterval
229 sess.Log.Debugf("Connected %v.", conn.LocalAddr())
230+ sess.setState(Started)
231 return nil
232 }
233
234
235=== modified file 'client/session/session_test.go'
236--- client/session/session_test.go 2014-02-01 21:04:32 +0000
237+++ client/session/session_test.go 2014-02-04 13:39:58 +0000
238@@ -168,6 +168,7 @@
239 c.Check(err, IsNil)
240 // but no root CAs set
241 c.Check(sess.TLS.RootCAs, IsNil)
242+ c.Check(sess.State(), Equals, Disconnected)
243 }
244
245 var certfile string = helpers.SourceRelative("../../server/acceptance/config/testing.cert")
246@@ -196,6 +197,7 @@
247 c.Assert(err, IsNil)
248 err = sess.connect()
249 c.Check(err, ErrorMatches, ".*connect.*address.*")
250+ c.Check(sess.State(), Equals, Error)
251 }
252
253 func (cs *clientSessionSuite) TestConnectConnects(c *C) {
254@@ -207,6 +209,7 @@
255 err = sess.connect()
256 c.Check(err, IsNil)
257 c.Check(sess.Connection, NotNil)
258+ c.Check(sess.State(), Equals, Connected)
259 }
260
261 func (cs *clientSessionSuite) TestConnectConnectFail(c *C) {
262@@ -217,6 +220,7 @@
263 c.Assert(err, IsNil)
264 err = sess.connect()
265 c.Check(err, ErrorMatches, ".*connection refused")
266+ c.Check(sess.State(), Equals, Error)
267 }
268
269 /****************************************************************
270@@ -229,6 +233,7 @@
271 sess.Connection = &testConn{Name: "TestClose"}
272 sess.Close()
273 c.Check(sess.Connection, IsNil)
274+ c.Check(sess.State(), Equals, Disconnected)
275 }
276
277 func (cs *clientSessionSuite) TestCloseTwice(c *C) {
278@@ -239,6 +244,7 @@
279 c.Check(sess.Connection, IsNil)
280 sess.Close()
281 c.Check(sess.Connection, IsNil)
282+ c.Check(sess.State(), Equals, Disconnected)
283 }
284
285 func (cs *clientSessionSuite) TestCloseFails(c *C) {
286@@ -247,6 +253,7 @@
287 sess.Connection = &testConn{Name: "TestCloseFails", CloseCondition: condition.Work(false)}
288 sess.Close()
289 c.Check(sess.Connection, IsNil) // nothing you can do to clean up anyway
290+ c.Check(sess.State(), Equals, Disconnected)
291 }
292
293 /****************************************************************
294@@ -289,6 +296,7 @@
295 c.Check(s.sess.handlePing(), Equals, failure)
296 c.Assert(len(s.downCh), Equals, 1)
297 c.Check(<-s.downCh, Equals, protocol.PingPongMsg{Type: "pong"})
298+ c.Check(s.sess.State(), Equals, Error)
299 }
300
301 /****************************************************************
302@@ -328,6 +336,7 @@
303 failure := errors.New("ACK ACK ACK")
304 s.upCh <- failure
305 c.Assert(<-s.errCh, Equals, failure)
306+ c.Check(s.sess.State(), Equals, Error)
307 }
308
309 func (s *msgSuite) TestHandleBroadcastWrongChannel(c *C) {
310@@ -363,12 +372,15 @@
311 }
312
313 func (s *loopSuite) TestLoopReadError(c *C) {
314+ c.Check(s.sess.State(), Equals, Running)
315 s.upCh <- errors.New("Read")
316 err := <-s.errCh
317 c.Check(err, ErrorMatches, "Read")
318+ c.Check(s.sess.State(), Equals, Error)
319 }
320
321 func (s *loopSuite) TestLoopPing(c *C) {
322+ c.Check(s.sess.State(), Equals, Running)
323 c.Check(takeNext(s.downCh), Equals, "deadline 1ms")
324 s.upCh <- protocol.PingPongMsg{Type: "ping"}
325 c.Check(takeNext(s.downCh), Equals, protocol.PingPongMsg{Type: "pong"})
326@@ -378,6 +390,7 @@
327 }
328
329 func (s *loopSuite) TestLoopLoopsDaLoop(c *C) {
330+ c.Check(s.sess.State(), Equals, Running)
331 for i := 1; i < 10; i++ {
332 c.Check(takeNext(s.downCh), Equals, "deadline 1ms")
333 s.upCh <- protocol.PingPongMsg{Type: "ping"}
334@@ -390,6 +403,7 @@
335 }
336
337 func (s *loopSuite) TestLoopBroadcast(c *C) {
338+ c.Check(s.sess.State(), Equals, Running)
339 b := &protocol.BroadcastMsg{
340 Type: "broadcast",
341 AppId: "--ignored--",
342@@ -415,6 +429,7 @@
343 DeadlineCondition: condition.Work(false)} // setdeadline will fail
344 err = sess.start()
345 c.Check(err, ErrorMatches, ".*deadline.*")
346+ c.Check(sess.State(), Equals, Error)
347 }
348
349 func (cs *clientSessionSuite) TestStartFailsIfWriteFails(c *C) {
350@@ -424,6 +439,7 @@
351 WriteCondition: condition.Work(false)} // write will fail
352 err = sess.start()
353 c.Check(err, ErrorMatches, ".*write.*")
354+ c.Check(sess.State(), Equals, Error)
355 }
356
357 func (cs *clientSessionSuite) TestStartConnectMessageFails(c *C) {
358@@ -449,6 +465,7 @@
359 upCh <- errors.New("Overflow error in /dev/null")
360 err = <-errCh
361 c.Check(err, ErrorMatches, "Overflow.*null")
362+ c.Check(sess.State(), Equals, Error)
363 }
364
365 func (cs *clientSessionSuite) TestStartConnackReadError(c *C) {
366@@ -472,6 +489,7 @@
367 upCh <- io.EOF
368 err = <-errCh
369 c.Check(err, ErrorMatches, ".*EOF.*")
370+ c.Check(sess.State(), Equals, Error)
371 }
372
373 func (cs *clientSessionSuite) TestStartBadConnack(c *C) {
374@@ -495,6 +513,7 @@
375 upCh <- protocol.ConnAckMsg{Type: "connack"}
376 err = <-errCh
377 c.Check(err, ErrorMatches, ".*invalid.*")
378+ c.Check(sess.State(), Equals, Error)
379 }
380
381 func (cs *clientSessionSuite) TestStartNotConnack(c *C) {
382@@ -518,6 +537,7 @@
383 upCh <- protocol.ConnAckMsg{Type: "connnak"}
384 err = <-errCh
385 c.Check(err, ErrorMatches, ".*CONNACK.*")
386+ c.Check(sess.State(), Equals, Error)
387 }
388
389 func (cs *clientSessionSuite) TestStartWorks(c *C) {
390@@ -545,6 +565,7 @@
391 // start is now done.
392 err = <-errCh
393 c.Check(err, IsNil)
394+ c.Check(sess.State(), Equals, Started)
395 }
396
397 /****************************************************************
398
399=== modified file 'util/redialer.go'
400--- util/redialer.go 2014-02-03 18:17:27 +0000
401+++ util/redialer.go 2014-02-04 13:39:58 +0000
402@@ -18,6 +18,7 @@
403
404 import (
405 "math/rand"
406+ "sync"
407 "time"
408 )
409
410@@ -30,14 +31,28 @@
411 Jitter(time.Duration) time.Duration
412 }
413
414-// The timeouts used during backoff. While this is public, you'd
415-// usually not need to meddle with it.
416-var Timeouts []time.Duration
417+// The timeouts used during backoff.
418+var timeouts []time.Duration
419+var trwlock sync.RWMutex
420
421 var ( // for use in testing
422 quitRedialing chan bool = make(chan bool)
423 )
424
425+func Timeouts() []time.Duration {
426+ trwlock.RLock()
427+ defer trwlock.RUnlock()
428+ return timeouts
429+}
430+
431+// for testing
432+func SwapTimeouts(newTimeouts []time.Duration) (oldTimeouts []time.Duration) {
433+ trwlock.Lock()
434+ defer trwlock.Unlock()
435+ oldTimeouts, timeouts = timeouts, newTimeouts
436+ return
437+}
438+
439 // Jitter returns a random time.Duration somewhere in [-spread, spread].
440 //
441 // This is meant as a default implementation for Dialers to use if wanted.
442@@ -60,15 +75,16 @@
443 func (ar *AutoRetrier) Retry() uint32 {
444 var timeout time.Duration
445 var dialAttempts uint32 = 0 // unsigned so it can wrap safely ...
446- var numTimeouts uint32 = uint32(len(Timeouts))
447+ timeouts := Timeouts()
448+ var numTimeouts uint32 = uint32(len(timeouts))
449 for {
450 if ar.Dial() == nil {
451 return dialAttempts + 1
452 }
453 if dialAttempts < numTimeouts {
454- timeout = Timeouts[dialAttempts]
455+ timeout = timeouts[dialAttempts]
456 } else {
457- timeout = Timeouts[numTimeouts-1]
458+ timeout = timeouts[numTimeouts-1]
459 }
460 timeout += ar.Jitter(timeout)
461 dialAttempts++
462@@ -90,10 +106,10 @@
463
464 func init() {
465 ps := []int{1, 2, 5, 11, 19, 37, 67, 113, 191} // 3 pₙ₊₁ ≥ 5 pₙ
466- Timeouts = make([]time.Duration, len(ps))
467+ timeouts := make([]time.Duration, len(ps))
468 for i, n := range ps {
469- Timeouts[i] = time.Duration(n) * time.Second
470+ timeouts[i] = time.Duration(n) * time.Second
471 }
472-
473+ SwapTimeouts(timeouts)
474 rand.Seed(time.Now().Unix()) // good enough for us (not crypto, yadda)
475 }
476
477=== modified file 'util/redialer_test.go'
478--- util/redialer_test.go 2014-02-03 18:17:27 +0000
479+++ util/redialer_test.go 2014-02-04 13:39:58 +0000
480@@ -38,12 +38,11 @@
481 var _ = Suite(&RedialerSuite{})
482
483 func (s *RedialerSuite) SetUpSuite(c *C) {
484- s.timeouts = Timeouts
485- Timeouts = []time.Duration{0, 0}
486+ s.timeouts = SwapTimeouts([]time.Duration{0, 0})
487 }
488
489 func (s *RedialerSuite) TearDownSuite(c *C) {
490- Timeouts = s.timeouts
491+ SwapTimeouts(s.timeouts)
492 s.timeouts = nil
493 }
494

Subscribers

People subscribed via source and target branches