Merge lp:~chipaca/ubuntu-push/client-v0-p4 into lp:ubuntu-push

Proposed by John Lenton
Status: Superseded
Proposed branch: lp:~chipaca/ubuntu-push/client-v0-p4
Merge into: lp:ubuntu-push
Prerequisite: lp:~chipaca/ubuntu-push/client-v0-p3
Diff against target: 449 lines (+155/-10)
4 files modified
client/client.go (+52/-10)
client/client_test.go (+55/-0)
client/session/session.go (+27/-0)
client/session/session_test.go (+21/-0)
To merge this branch: bzr merge lp:~chipaca/ubuntu-push/client-v0-p4
Reviewer Review Type Date Requested Status
Ubuntu Push Hackers Pending
Review via email: mp+204547@code.launchpad.net

This proposal has been superseded by a proposal from 2014-02-03.

Commit message

part 4: handling connectivity events

Description of the change

Part 4: handling connectivity events.

Also, added State to client/session, to aid in testing some aspects of
this.

To post a comment you must log in.
lp:~chipaca/ubuntu-push/client-v0-p4 updated
46. By John Lenton

[r=pedronis] part 3: setting up the bus

47. By John Lenton

gave client/session a State, so that it can be queried from tests

48. By John Lenton

fixed races using cannons

49. By John Lenton

merged trunk

Unmerged revisions

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'client/client.go'
2--- client/client.go 2014-02-03 16:14:16 +0000
3+++ client/client.go 2014-02-03 18:29:58 +0000
4@@ -27,6 +27,7 @@
5 "launchpad.net/ubuntu-push/bus/networkmanager"
6 "launchpad.net/ubuntu-push/bus/notifications"
7 "launchpad.net/ubuntu-push/bus/urldispatcher"
8+ "launchpad.net/ubuntu-push/client/session"
9 "launchpad.net/ubuntu-push/config"
10 "launchpad.net/ubuntu-push/logger"
11 "launchpad.net/ubuntu-push/util"
12@@ -47,16 +48,19 @@
13
14 // Client is the Ubuntu Push Notifications client-side daemon.
15 type Client struct {
16- config ClientConfig
17- log logger.Logger
18- pem []byte
19- idder identifier.Id
20- deviceId string
21- notificationsEndp bus.Endpoint
22- urlDispatcherEndp bus.Endpoint
23- connectivityEndp bus.Endpoint
24- connCh chan bool
25- actionsCh <-chan notifications.RawActionReply
26+ config ClientConfig
27+ log logger.Logger
28+ pem []byte
29+ idder identifier.Id
30+ deviceId string
31+ notificationsEndp bus.Endpoint
32+ urlDispatcherEndp bus.Endpoint
33+ connectivityEndp bus.Endpoint
34+ connCh chan bool
35+ connState bool
36+ actionsCh <-chan notifications.RawActionReply
37+ session *session.ClientSession
38+ sessionRetrierStopper chan bool
39 }
40
41 // Configure loads the configuration specified in configPath, and sets it up.
42@@ -119,3 +123,41 @@
43 client.actionsCh = actionsCh
44 return err
45 }
46+
47+// handleConnState deals with connectivity events
48+func (client *Client) handleConnState(connState bool) {
49+ if client.connState == connState {
50+ // nothing to do!
51+ return
52+ }
53+ client.connState = connState
54+ if client.session == nil {
55+ sess, err := session.NewSession(string(client.config.Addr), client.pem,
56+ client.config.ExchangeTimeout.Duration, client.deviceId, client.log)
57+ if err != nil {
58+ panic("Don't know how to handle session creation failure.")
59+ }
60+ client.session = sess
61+ }
62+ if connState {
63+ // connected
64+ if client.sessionRetrierStopper != nil {
65+ client.sessionRetrierStopper <- true
66+ client.sessionRetrierStopper = nil
67+ }
68+ ar := &util.AutoRetrier{
69+ make(chan bool, 1),
70+ client.session.Dial,
71+ util.Jitter}
72+ client.sessionRetrierStopper = ar.Stop
73+ go ar.AutoRetry()
74+ } else {
75+ // disconnected
76+ if client.sessionRetrierStopper != nil {
77+ client.sessionRetrierStopper <- true
78+ client.sessionRetrierStopper = nil
79+ } else {
80+ client.session.Close()
81+ }
82+ }
83+}
84
85=== modified file 'client/client_test.go'
86--- client/client_test.go 2014-02-03 18:17:27 +0000
87+++ client/client_test.go 2014-02-03 18:29:58 +0000
88@@ -22,6 +22,7 @@
89 . "launchpad.net/gocheck"
90 "launchpad.net/ubuntu-push/bus/networkmanager"
91 testibus "launchpad.net/ubuntu-push/bus/testing"
92+ "launchpad.net/ubuntu-push/client/session"
93 "launchpad.net/ubuntu-push/logger"
94 helpers "launchpad.net/ubuntu-push/testing"
95 "launchpad.net/ubuntu-push/testing/condition"
96@@ -274,3 +275,57 @@
97 c.Check(cli.takeTheBus(), NotNil)
98 c.Check(cli.actionsCh, IsNil)
99 }
100+
101+/*****************************************************************
102+ handleConnState tests
103+******************************************************************/
104+
105+func (cs *clientSuite) TestHandleConnStateD2C(c *C) {
106+ cli := new(Client)
107+
108+ // let's pretend the client had a previous attempt at connecting still pending
109+ // (hard to trigger in real life, but possible)
110+ cli.sessionRetrierStopper = make(chan bool, 1)
111+
112+ c.Assert(cli.connState, Equals, false)
113+ cli.handleConnState(true)
114+ c.Check(cli.connState, Equals, true)
115+ c.Assert(cli.session, NotNil)
116+ c.Check(cli.session.State, Equals, session.Disconnected)
117+}
118+
119+func (cs *clientSuite) TestHandleConnStateSame(c *C) {
120+ cli := new(Client)
121+ // here we want to check that we don't do anything
122+ c.Assert(cli.session, IsNil)
123+ c.Assert(cli.connState, Equals, false)
124+ cli.handleConnState(false)
125+ c.Check(cli.session, IsNil)
126+
127+ cli.connState = true
128+ cli.handleConnState(true)
129+ c.Check(cli.session, IsNil)
130+}
131+
132+func (cs *clientSuite) TestHandleConnStateC2D(c *C) {
133+ cli := new(Client)
134+ cli.session, _ = session.NewSession(string(cli.config.Addr), cli.pem, cli.config.ExchangeTimeout.Duration, cli.deviceId, noisylog)
135+ cli.session.Dial()
136+ cli.connState = true
137+
138+ // cli.session.State will be "Error" here, for now at least
139+ c.Check(cli.session.State, Not(Equals), session.Disconnected)
140+ cli.handleConnState(false)
141+ c.Check(cli.session.State, Equals, session.Disconnected)
142+}
143+
144+func (cs *clientSuite) TestHandleConnStateC2DPending(c *C) {
145+ cli := new(Client)
146+ cli.session, _ = session.NewSession(string(cli.config.Addr), cli.pem, cli.config.ExchangeTimeout.Duration, cli.deviceId, noisylog)
147+ cli.sessionRetrierStopper = make(chan bool, 1)
148+ cli.connState = true
149+
150+ cli.handleConnState(false)
151+ c.Check(cli.session.State, Equals, session.Disconnected)
152+ c.Check(cli.sessionRetrierStopper, IsNil)
153+}
154
155=== modified file 'client/session/session.go'
156--- client/session/session.go 2014-02-01 21:04:32 +0000
157+++ client/session/session.go 2014-02-03 18:29:58 +0000
158@@ -42,6 +42,17 @@
159 protocol.NotificationsMsg
160 }
161
162+// ClientSessionState is a way to broadly track the progress of the session
163+type ClientSessionState uint8
164+
165+const (
166+ Error ClientSessionState = iota
167+ Disconnected
168+ Connected
169+ Started
170+ Running
171+)
172+
173 // ClienSession holds a client<->server session and its configuration.
174 type ClientSession struct {
175 // configuration
176@@ -57,6 +68,7 @@
177 proto protocol.Protocol
178 pingInterval time.Duration
179 // status
180+ State ClientSessionState
181 ErrCh chan error
182 MsgCh chan *Notification
183 }
184@@ -71,6 +83,7 @@
185 Protocolator: protocol.NewProtocol0,
186 Levels: levelmap.NewLevelMap(),
187 TLS: &tls.Config{InsecureSkipVerify: true}, // XXX
188+ State: Disconnected,
189 }
190 if pem != nil {
191 cp := x509.NewCertPool()
192@@ -88,9 +101,11 @@
193 func (sess *ClientSession) connect() error {
194 conn, err := net.DialTimeout("tcp", sess.ServerAddr, sess.ExchangeTimeout)
195 if err != nil {
196+ sess.State = Error
197 return fmt.Errorf("connect: %s", err)
198 }
199 sess.Connection = tls.Client(conn, sess.TLS)
200+ sess.State = Connected
201 return nil
202 }
203
204@@ -102,6 +117,7 @@
205 // you could do to recover at this stage).
206 sess.Connection = nil
207 }
208+ sess.State = Disconnected
209 }
210
211 // handle "ping" messages
212@@ -110,6 +126,7 @@
213 if err == nil {
214 sess.Log.Debugf("ping.")
215 } else {
216+ sess.State = Error
217 sess.Log.Errorf("unable to pong: %s", err)
218 }
219 return err
220@@ -119,6 +136,7 @@
221 func (sess *ClientSession) handleBroadcast(bcast *serverMsg) error {
222 err := sess.proto.WriteMessage(protocol.AckMsg{"ack"})
223 if err != nil {
224+ sess.State = Error
225 return err
226 }
227 sess.Log.Debugf("broadcast chan:%v app:%v topLevel:%d payloads:%s",
228@@ -137,11 +155,13 @@
229 func (sess *ClientSession) loop() error {
230 var err error
231 var recv serverMsg
232+ sess.State = Running
233 for {
234 deadAfter := sess.pingInterval + sess.ExchangeTimeout
235 sess.proto.SetDeadline(time.Now().Add(deadAfter))
236 err = sess.proto.ReadMessage(&recv)
237 if err != nil {
238+ sess.State = Error
239 return err
240 }
241 switch recv.Type {
242@@ -161,12 +181,14 @@
243 conn := sess.Connection
244 err := conn.SetDeadline(time.Now().Add(sess.ExchangeTimeout))
245 if err != nil {
246+ sess.State = Error
247 return err
248 }
249 _, err = conn.Write(wireVersionBytes)
250 // The Writer docs: Write must return a non-nil error if it returns
251 // n < len(p). So, no need to check number of bytes written, hooray.
252 if err != nil {
253+ sess.State = Error
254 return err
255 }
256 proto := sess.Protocolator(conn)
257@@ -177,23 +199,28 @@
258 Levels: sess.Levels.GetAll(),
259 })
260 if err != nil {
261+ sess.State = Error
262 return err
263 }
264 var connAck protocol.ConnAckMsg
265 err = proto.ReadMessage(&connAck)
266 if err != nil {
267+ sess.State = Error
268 return err
269 }
270 if connAck.Type != "connack" {
271+ sess.State = Error
272 return fmt.Errorf("expecting CONNACK, got %#v", connAck.Type)
273 }
274 pingInterval, err := time.ParseDuration(connAck.Params.PingInterval)
275 if err != nil {
276+ sess.State = Error
277 return err
278 }
279 sess.proto = proto
280 sess.pingInterval = pingInterval
281 sess.Log.Debugf("Connected %v.", conn.LocalAddr())
282+ sess.State = Started
283 return nil
284 }
285
286
287=== modified file 'client/session/session_test.go'
288--- client/session/session_test.go 2014-02-01 21:04:32 +0000
289+++ client/session/session_test.go 2014-02-03 18:29:58 +0000
290@@ -168,6 +168,7 @@
291 c.Check(err, IsNil)
292 // but no root CAs set
293 c.Check(sess.TLS.RootCAs, IsNil)
294+ c.Check(sess.State, Equals, Disconnected)
295 }
296
297 var certfile string = helpers.SourceRelative("../../server/acceptance/config/testing.cert")
298@@ -196,6 +197,7 @@
299 c.Assert(err, IsNil)
300 err = sess.connect()
301 c.Check(err, ErrorMatches, ".*connect.*address.*")
302+ c.Check(sess.State, Equals, Error)
303 }
304
305 func (cs *clientSessionSuite) TestConnectConnects(c *C) {
306@@ -207,6 +209,7 @@
307 err = sess.connect()
308 c.Check(err, IsNil)
309 c.Check(sess.Connection, NotNil)
310+ c.Check(sess.State, Equals, Connected)
311 }
312
313 func (cs *clientSessionSuite) TestConnectConnectFail(c *C) {
314@@ -217,6 +220,7 @@
315 c.Assert(err, IsNil)
316 err = sess.connect()
317 c.Check(err, ErrorMatches, ".*connection refused")
318+ c.Check(sess.State, Equals, Error)
319 }
320
321 /****************************************************************
322@@ -229,6 +233,7 @@
323 sess.Connection = &testConn{Name: "TestClose"}
324 sess.Close()
325 c.Check(sess.Connection, IsNil)
326+ c.Check(sess.State, Equals, Disconnected)
327 }
328
329 func (cs *clientSessionSuite) TestCloseTwice(c *C) {
330@@ -239,6 +244,7 @@
331 c.Check(sess.Connection, IsNil)
332 sess.Close()
333 c.Check(sess.Connection, IsNil)
334+ c.Check(sess.State, Equals, Disconnected)
335 }
336
337 func (cs *clientSessionSuite) TestCloseFails(c *C) {
338@@ -247,6 +253,7 @@
339 sess.Connection = &testConn{Name: "TestCloseFails", CloseCondition: condition.Work(false)}
340 sess.Close()
341 c.Check(sess.Connection, IsNil) // nothing you can do to clean up anyway
342+ c.Check(sess.State, Equals, Disconnected)
343 }
344
345 /****************************************************************
346@@ -289,6 +296,7 @@
347 c.Check(s.sess.handlePing(), Equals, failure)
348 c.Assert(len(s.downCh), Equals, 1)
349 c.Check(<-s.downCh, Equals, protocol.PingPongMsg{Type: "pong"})
350+ c.Check(s.sess.State, Equals, Error)
351 }
352
353 /****************************************************************
354@@ -328,6 +336,7 @@
355 failure := errors.New("ACK ACK ACK")
356 s.upCh <- failure
357 c.Assert(<-s.errCh, Equals, failure)
358+ c.Check(s.sess.State, Equals, Error)
359 }
360
361 func (s *msgSuite) TestHandleBroadcastWrongChannel(c *C) {
362@@ -363,12 +372,15 @@
363 }
364
365 func (s *loopSuite) TestLoopReadError(c *C) {
366+ c.Check(s.sess.State, Equals, Running)
367 s.upCh <- errors.New("Read")
368 err := <-s.errCh
369 c.Check(err, ErrorMatches, "Read")
370+ c.Check(s.sess.State, Equals, Error)
371 }
372
373 func (s *loopSuite) TestLoopPing(c *C) {
374+ c.Check(s.sess.State, Equals, Running)
375 c.Check(takeNext(s.downCh), Equals, "deadline 1ms")
376 s.upCh <- protocol.PingPongMsg{Type: "ping"}
377 c.Check(takeNext(s.downCh), Equals, protocol.PingPongMsg{Type: "pong"})
378@@ -378,6 +390,7 @@
379 }
380
381 func (s *loopSuite) TestLoopLoopsDaLoop(c *C) {
382+ c.Check(s.sess.State, Equals, Running)
383 for i := 1; i < 10; i++ {
384 c.Check(takeNext(s.downCh), Equals, "deadline 1ms")
385 s.upCh <- protocol.PingPongMsg{Type: "ping"}
386@@ -390,6 +403,7 @@
387 }
388
389 func (s *loopSuite) TestLoopBroadcast(c *C) {
390+ c.Check(s.sess.State, Equals, Running)
391 b := &protocol.BroadcastMsg{
392 Type: "broadcast",
393 AppId: "--ignored--",
394@@ -415,6 +429,7 @@
395 DeadlineCondition: condition.Work(false)} // setdeadline will fail
396 err = sess.start()
397 c.Check(err, ErrorMatches, ".*deadline.*")
398+ c.Check(sess.State, Equals, Error)
399 }
400
401 func (cs *clientSessionSuite) TestStartFailsIfWriteFails(c *C) {
402@@ -424,6 +439,7 @@
403 WriteCondition: condition.Work(false)} // write will fail
404 err = sess.start()
405 c.Check(err, ErrorMatches, ".*write.*")
406+ c.Check(sess.State, Equals, Error)
407 }
408
409 func (cs *clientSessionSuite) TestStartConnectMessageFails(c *C) {
410@@ -449,6 +465,7 @@
411 upCh <- errors.New("Overflow error in /dev/null")
412 err = <-errCh
413 c.Check(err, ErrorMatches, "Overflow.*null")
414+ c.Check(sess.State, Equals, Error)
415 }
416
417 func (cs *clientSessionSuite) TestStartConnackReadError(c *C) {
418@@ -472,6 +489,7 @@
419 upCh <- io.EOF
420 err = <-errCh
421 c.Check(err, ErrorMatches, ".*EOF.*")
422+ c.Check(sess.State, Equals, Error)
423 }
424
425 func (cs *clientSessionSuite) TestStartBadConnack(c *C) {
426@@ -495,6 +513,7 @@
427 upCh <- protocol.ConnAckMsg{Type: "connack"}
428 err = <-errCh
429 c.Check(err, ErrorMatches, ".*invalid.*")
430+ c.Check(sess.State, Equals, Error)
431 }
432
433 func (cs *clientSessionSuite) TestStartNotConnack(c *C) {
434@@ -518,6 +537,7 @@
435 upCh <- protocol.ConnAckMsg{Type: "connnak"}
436 err = <-errCh
437 c.Check(err, ErrorMatches, ".*CONNACK.*")
438+ c.Check(sess.State, Equals, Error)
439 }
440
441 func (cs *clientSessionSuite) TestStartWorks(c *C) {
442@@ -545,6 +565,7 @@
443 // start is now done.
444 err = <-errCh
445 c.Check(err, IsNil)
446+ c.Check(sess.State, Equals, Started)
447 }
448
449 /****************************************************************

Subscribers

People subscribed via source and target branches