Merge lp:~chipaca/ubuntu-push/client-v0-p4 into lp:ubuntu-push
- client-v0-p4
- Merge into trunk
Proposed by
John Lenton
Status: | Superseded |
---|---|
Proposed branch: | lp:~chipaca/ubuntu-push/client-v0-p4 |
Merge into: | lp:ubuntu-push |
Prerequisite: | lp:~chipaca/ubuntu-push/client-v0-p3 |
Diff against target: |
449 lines (+155/-10) 4 files modified
client/client.go (+52/-10) client/client_test.go (+55/-0) client/session/session.go (+27/-0) client/session/session_test.go (+21/-0) |
To merge this branch: | bzr merge lp:~chipaca/ubuntu-push/client-v0-p4 |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Ubuntu Push Hackers | Pending | ||
Review via email: mp+204547@code.launchpad.net |
This proposal has been superseded by a proposal from 2014-02-03.
Commit message
part 4: handling connectivity events
Description of the change
Part 4: handling connectivity events.
Also, added State to client/session, to aid in testing some aspects of
this.
To post a comment you must log in.
- 46. By John Lenton
-
[r=pedronis] part 3: setting up the bus
- 47. By John Lenton
-
gave client/session a State, so that it can be queried from tests
- 48. By John Lenton
-
fixed races using cannons
- 49. By John Lenton
-
merged trunk
Unmerged revisions
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === modified file 'client/client.go' |
2 | --- client/client.go 2014-02-03 16:14:16 +0000 |
3 | +++ client/client.go 2014-02-03 18:29:58 +0000 |
4 | @@ -27,6 +27,7 @@ |
5 | "launchpad.net/ubuntu-push/bus/networkmanager" |
6 | "launchpad.net/ubuntu-push/bus/notifications" |
7 | "launchpad.net/ubuntu-push/bus/urldispatcher" |
8 | + "launchpad.net/ubuntu-push/client/session" |
9 | "launchpad.net/ubuntu-push/config" |
10 | "launchpad.net/ubuntu-push/logger" |
11 | "launchpad.net/ubuntu-push/util" |
12 | @@ -47,16 +48,19 @@ |
13 | |
14 | // Client is the Ubuntu Push Notifications client-side daemon. |
15 | type Client struct { |
16 | - config ClientConfig |
17 | - log logger.Logger |
18 | - pem []byte |
19 | - idder identifier.Id |
20 | - deviceId string |
21 | - notificationsEndp bus.Endpoint |
22 | - urlDispatcherEndp bus.Endpoint |
23 | - connectivityEndp bus.Endpoint |
24 | - connCh chan bool |
25 | - actionsCh <-chan notifications.RawActionReply |
26 | + config ClientConfig |
27 | + log logger.Logger |
28 | + pem []byte |
29 | + idder identifier.Id |
30 | + deviceId string |
31 | + notificationsEndp bus.Endpoint |
32 | + urlDispatcherEndp bus.Endpoint |
33 | + connectivityEndp bus.Endpoint |
34 | + connCh chan bool |
35 | + connState bool |
36 | + actionsCh <-chan notifications.RawActionReply |
37 | + session *session.ClientSession |
38 | + sessionRetrierStopper chan bool |
39 | } |
40 | |
41 | // Configure loads the configuration specified in configPath, and sets it up. |
42 | @@ -119,3 +123,41 @@ |
43 | client.actionsCh = actionsCh |
44 | return err |
45 | } |
46 | + |
47 | +// handleConnState deals with connectivity events |
48 | +func (client *Client) handleConnState(connState bool) { |
49 | + if client.connState == connState { |
50 | + // nothing to do! |
51 | + return |
52 | + } |
53 | + client.connState = connState |
54 | + if client.session == nil { |
55 | + sess, err := session.NewSession(string(client.config.Addr), client.pem, |
56 | + client.config.ExchangeTimeout.Duration, client.deviceId, client.log) |
57 | + if err != nil { |
58 | + panic("Don't know how to handle session creation failure.") |
59 | + } |
60 | + client.session = sess |
61 | + } |
62 | + if connState { |
63 | + // connected |
64 | + if client.sessionRetrierStopper != nil { |
65 | + client.sessionRetrierStopper <- true |
66 | + client.sessionRetrierStopper = nil |
67 | + } |
68 | + ar := &util.AutoRetrier{ |
69 | + make(chan bool, 1), |
70 | + client.session.Dial, |
71 | + util.Jitter} |
72 | + client.sessionRetrierStopper = ar.Stop |
73 | + go ar.AutoRetry() |
74 | + } else { |
75 | + // disconnected |
76 | + if client.sessionRetrierStopper != nil { |
77 | + client.sessionRetrierStopper <- true |
78 | + client.sessionRetrierStopper = nil |
79 | + } else { |
80 | + client.session.Close() |
81 | + } |
82 | + } |
83 | +} |
84 | |
85 | === modified file 'client/client_test.go' |
86 | --- client/client_test.go 2014-02-03 18:17:27 +0000 |
87 | +++ client/client_test.go 2014-02-03 18:29:58 +0000 |
88 | @@ -22,6 +22,7 @@ |
89 | . "launchpad.net/gocheck" |
90 | "launchpad.net/ubuntu-push/bus/networkmanager" |
91 | testibus "launchpad.net/ubuntu-push/bus/testing" |
92 | + "launchpad.net/ubuntu-push/client/session" |
93 | "launchpad.net/ubuntu-push/logger" |
94 | helpers "launchpad.net/ubuntu-push/testing" |
95 | "launchpad.net/ubuntu-push/testing/condition" |
96 | @@ -274,3 +275,57 @@ |
97 | c.Check(cli.takeTheBus(), NotNil) |
98 | c.Check(cli.actionsCh, IsNil) |
99 | } |
100 | + |
101 | +/***************************************************************** |
102 | + handleConnState tests |
103 | +******************************************************************/ |
104 | + |
105 | +func (cs *clientSuite) TestHandleConnStateD2C(c *C) { |
106 | + cli := new(Client) |
107 | + |
108 | + // let's pretend the client had a previous attempt at connecting still pending |
109 | + // (hard to trigger in real life, but possible) |
110 | + cli.sessionRetrierStopper = make(chan bool, 1) |
111 | + |
112 | + c.Assert(cli.connState, Equals, false) |
113 | + cli.handleConnState(true) |
114 | + c.Check(cli.connState, Equals, true) |
115 | + c.Assert(cli.session, NotNil) |
116 | + c.Check(cli.session.State, Equals, session.Disconnected) |
117 | +} |
118 | + |
119 | +func (cs *clientSuite) TestHandleConnStateSame(c *C) { |
120 | + cli := new(Client) |
121 | + // here we want to check that we don't do anything |
122 | + c.Assert(cli.session, IsNil) |
123 | + c.Assert(cli.connState, Equals, false) |
124 | + cli.handleConnState(false) |
125 | + c.Check(cli.session, IsNil) |
126 | + |
127 | + cli.connState = true |
128 | + cli.handleConnState(true) |
129 | + c.Check(cli.session, IsNil) |
130 | +} |
131 | + |
132 | +func (cs *clientSuite) TestHandleConnStateC2D(c *C) { |
133 | + cli := new(Client) |
134 | + cli.session, _ = session.NewSession(string(cli.config.Addr), cli.pem, cli.config.ExchangeTimeout.Duration, cli.deviceId, noisylog) |
135 | + cli.session.Dial() |
136 | + cli.connState = true |
137 | + |
138 | + // cli.session.State will be "Error" here, for now at least |
139 | + c.Check(cli.session.State, Not(Equals), session.Disconnected) |
140 | + cli.handleConnState(false) |
141 | + c.Check(cli.session.State, Equals, session.Disconnected) |
142 | +} |
143 | + |
144 | +func (cs *clientSuite) TestHandleConnStateC2DPending(c *C) { |
145 | + cli := new(Client) |
146 | + cli.session, _ = session.NewSession(string(cli.config.Addr), cli.pem, cli.config.ExchangeTimeout.Duration, cli.deviceId, noisylog) |
147 | + cli.sessionRetrierStopper = make(chan bool, 1) |
148 | + cli.connState = true |
149 | + |
150 | + cli.handleConnState(false) |
151 | + c.Check(cli.session.State, Equals, session.Disconnected) |
152 | + c.Check(cli.sessionRetrierStopper, IsNil) |
153 | +} |
154 | |
155 | === modified file 'client/session/session.go' |
156 | --- client/session/session.go 2014-02-01 21:04:32 +0000 |
157 | +++ client/session/session.go 2014-02-03 18:29:58 +0000 |
158 | @@ -42,6 +42,17 @@ |
159 | protocol.NotificationsMsg |
160 | } |
161 | |
162 | +// ClientSessionState is a way to broadly track the progress of the session |
163 | +type ClientSessionState uint8 |
164 | + |
165 | +const ( |
166 | + Error ClientSessionState = iota |
167 | + Disconnected |
168 | + Connected |
169 | + Started |
170 | + Running |
171 | +) |
172 | + |
173 | // ClienSession holds a client<->server session and its configuration. |
174 | type ClientSession struct { |
175 | // configuration |
176 | @@ -57,6 +68,7 @@ |
177 | proto protocol.Protocol |
178 | pingInterval time.Duration |
179 | // status |
180 | + State ClientSessionState |
181 | ErrCh chan error |
182 | MsgCh chan *Notification |
183 | } |
184 | @@ -71,6 +83,7 @@ |
185 | Protocolator: protocol.NewProtocol0, |
186 | Levels: levelmap.NewLevelMap(), |
187 | TLS: &tls.Config{InsecureSkipVerify: true}, // XXX |
188 | + State: Disconnected, |
189 | } |
190 | if pem != nil { |
191 | cp := x509.NewCertPool() |
192 | @@ -88,9 +101,11 @@ |
193 | func (sess *ClientSession) connect() error { |
194 | conn, err := net.DialTimeout("tcp", sess.ServerAddr, sess.ExchangeTimeout) |
195 | if err != nil { |
196 | + sess.State = Error |
197 | return fmt.Errorf("connect: %s", err) |
198 | } |
199 | sess.Connection = tls.Client(conn, sess.TLS) |
200 | + sess.State = Connected |
201 | return nil |
202 | } |
203 | |
204 | @@ -102,6 +117,7 @@ |
205 | // you could do to recover at this stage). |
206 | sess.Connection = nil |
207 | } |
208 | + sess.State = Disconnected |
209 | } |
210 | |
211 | // handle "ping" messages |
212 | @@ -110,6 +126,7 @@ |
213 | if err == nil { |
214 | sess.Log.Debugf("ping.") |
215 | } else { |
216 | + sess.State = Error |
217 | sess.Log.Errorf("unable to pong: %s", err) |
218 | } |
219 | return err |
220 | @@ -119,6 +136,7 @@ |
221 | func (sess *ClientSession) handleBroadcast(bcast *serverMsg) error { |
222 | err := sess.proto.WriteMessage(protocol.AckMsg{"ack"}) |
223 | if err != nil { |
224 | + sess.State = Error |
225 | return err |
226 | } |
227 | sess.Log.Debugf("broadcast chan:%v app:%v topLevel:%d payloads:%s", |
228 | @@ -137,11 +155,13 @@ |
229 | func (sess *ClientSession) loop() error { |
230 | var err error |
231 | var recv serverMsg |
232 | + sess.State = Running |
233 | for { |
234 | deadAfter := sess.pingInterval + sess.ExchangeTimeout |
235 | sess.proto.SetDeadline(time.Now().Add(deadAfter)) |
236 | err = sess.proto.ReadMessage(&recv) |
237 | if err != nil { |
238 | + sess.State = Error |
239 | return err |
240 | } |
241 | switch recv.Type { |
242 | @@ -161,12 +181,14 @@ |
243 | conn := sess.Connection |
244 | err := conn.SetDeadline(time.Now().Add(sess.ExchangeTimeout)) |
245 | if err != nil { |
246 | + sess.State = Error |
247 | return err |
248 | } |
249 | _, err = conn.Write(wireVersionBytes) |
250 | // The Writer docs: Write must return a non-nil error if it returns |
251 | // n < len(p). So, no need to check number of bytes written, hooray. |
252 | if err != nil { |
253 | + sess.State = Error |
254 | return err |
255 | } |
256 | proto := sess.Protocolator(conn) |
257 | @@ -177,23 +199,28 @@ |
258 | Levels: sess.Levels.GetAll(), |
259 | }) |
260 | if err != nil { |
261 | + sess.State = Error |
262 | return err |
263 | } |
264 | var connAck protocol.ConnAckMsg |
265 | err = proto.ReadMessage(&connAck) |
266 | if err != nil { |
267 | + sess.State = Error |
268 | return err |
269 | } |
270 | if connAck.Type != "connack" { |
271 | + sess.State = Error |
272 | return fmt.Errorf("expecting CONNACK, got %#v", connAck.Type) |
273 | } |
274 | pingInterval, err := time.ParseDuration(connAck.Params.PingInterval) |
275 | if err != nil { |
276 | + sess.State = Error |
277 | return err |
278 | } |
279 | sess.proto = proto |
280 | sess.pingInterval = pingInterval |
281 | sess.Log.Debugf("Connected %v.", conn.LocalAddr()) |
282 | + sess.State = Started |
283 | return nil |
284 | } |
285 | |
286 | |
287 | === modified file 'client/session/session_test.go' |
288 | --- client/session/session_test.go 2014-02-01 21:04:32 +0000 |
289 | +++ client/session/session_test.go 2014-02-03 18:29:58 +0000 |
290 | @@ -168,6 +168,7 @@ |
291 | c.Check(err, IsNil) |
292 | // but no root CAs set |
293 | c.Check(sess.TLS.RootCAs, IsNil) |
294 | + c.Check(sess.State, Equals, Disconnected) |
295 | } |
296 | |
297 | var certfile string = helpers.SourceRelative("../../server/acceptance/config/testing.cert") |
298 | @@ -196,6 +197,7 @@ |
299 | c.Assert(err, IsNil) |
300 | err = sess.connect() |
301 | c.Check(err, ErrorMatches, ".*connect.*address.*") |
302 | + c.Check(sess.State, Equals, Error) |
303 | } |
304 | |
305 | func (cs *clientSessionSuite) TestConnectConnects(c *C) { |
306 | @@ -207,6 +209,7 @@ |
307 | err = sess.connect() |
308 | c.Check(err, IsNil) |
309 | c.Check(sess.Connection, NotNil) |
310 | + c.Check(sess.State, Equals, Connected) |
311 | } |
312 | |
313 | func (cs *clientSessionSuite) TestConnectConnectFail(c *C) { |
314 | @@ -217,6 +220,7 @@ |
315 | c.Assert(err, IsNil) |
316 | err = sess.connect() |
317 | c.Check(err, ErrorMatches, ".*connection refused") |
318 | + c.Check(sess.State, Equals, Error) |
319 | } |
320 | |
321 | /**************************************************************** |
322 | @@ -229,6 +233,7 @@ |
323 | sess.Connection = &testConn{Name: "TestClose"} |
324 | sess.Close() |
325 | c.Check(sess.Connection, IsNil) |
326 | + c.Check(sess.State, Equals, Disconnected) |
327 | } |
328 | |
329 | func (cs *clientSessionSuite) TestCloseTwice(c *C) { |
330 | @@ -239,6 +244,7 @@ |
331 | c.Check(sess.Connection, IsNil) |
332 | sess.Close() |
333 | c.Check(sess.Connection, IsNil) |
334 | + c.Check(sess.State, Equals, Disconnected) |
335 | } |
336 | |
337 | func (cs *clientSessionSuite) TestCloseFails(c *C) { |
338 | @@ -247,6 +253,7 @@ |
339 | sess.Connection = &testConn{Name: "TestCloseFails", CloseCondition: condition.Work(false)} |
340 | sess.Close() |
341 | c.Check(sess.Connection, IsNil) // nothing you can do to clean up anyway |
342 | + c.Check(sess.State, Equals, Disconnected) |
343 | } |
344 | |
345 | /**************************************************************** |
346 | @@ -289,6 +296,7 @@ |
347 | c.Check(s.sess.handlePing(), Equals, failure) |
348 | c.Assert(len(s.downCh), Equals, 1) |
349 | c.Check(<-s.downCh, Equals, protocol.PingPongMsg{Type: "pong"}) |
350 | + c.Check(s.sess.State, Equals, Error) |
351 | } |
352 | |
353 | /**************************************************************** |
354 | @@ -328,6 +336,7 @@ |
355 | failure := errors.New("ACK ACK ACK") |
356 | s.upCh <- failure |
357 | c.Assert(<-s.errCh, Equals, failure) |
358 | + c.Check(s.sess.State, Equals, Error) |
359 | } |
360 | |
361 | func (s *msgSuite) TestHandleBroadcastWrongChannel(c *C) { |
362 | @@ -363,12 +372,15 @@ |
363 | } |
364 | |
365 | func (s *loopSuite) TestLoopReadError(c *C) { |
366 | + c.Check(s.sess.State, Equals, Running) |
367 | s.upCh <- errors.New("Read") |
368 | err := <-s.errCh |
369 | c.Check(err, ErrorMatches, "Read") |
370 | + c.Check(s.sess.State, Equals, Error) |
371 | } |
372 | |
373 | func (s *loopSuite) TestLoopPing(c *C) { |
374 | + c.Check(s.sess.State, Equals, Running) |
375 | c.Check(takeNext(s.downCh), Equals, "deadline 1ms") |
376 | s.upCh <- protocol.PingPongMsg{Type: "ping"} |
377 | c.Check(takeNext(s.downCh), Equals, protocol.PingPongMsg{Type: "pong"}) |
378 | @@ -378,6 +390,7 @@ |
379 | } |
380 | |
381 | func (s *loopSuite) TestLoopLoopsDaLoop(c *C) { |
382 | + c.Check(s.sess.State, Equals, Running) |
383 | for i := 1; i < 10; i++ { |
384 | c.Check(takeNext(s.downCh), Equals, "deadline 1ms") |
385 | s.upCh <- protocol.PingPongMsg{Type: "ping"} |
386 | @@ -390,6 +403,7 @@ |
387 | } |
388 | |
389 | func (s *loopSuite) TestLoopBroadcast(c *C) { |
390 | + c.Check(s.sess.State, Equals, Running) |
391 | b := &protocol.BroadcastMsg{ |
392 | Type: "broadcast", |
393 | AppId: "--ignored--", |
394 | @@ -415,6 +429,7 @@ |
395 | DeadlineCondition: condition.Work(false)} // setdeadline will fail |
396 | err = sess.start() |
397 | c.Check(err, ErrorMatches, ".*deadline.*") |
398 | + c.Check(sess.State, Equals, Error) |
399 | } |
400 | |
401 | func (cs *clientSessionSuite) TestStartFailsIfWriteFails(c *C) { |
402 | @@ -424,6 +439,7 @@ |
403 | WriteCondition: condition.Work(false)} // write will fail |
404 | err = sess.start() |
405 | c.Check(err, ErrorMatches, ".*write.*") |
406 | + c.Check(sess.State, Equals, Error) |
407 | } |
408 | |
409 | func (cs *clientSessionSuite) TestStartConnectMessageFails(c *C) { |
410 | @@ -449,6 +465,7 @@ |
411 | upCh <- errors.New("Overflow error in /dev/null") |
412 | err = <-errCh |
413 | c.Check(err, ErrorMatches, "Overflow.*null") |
414 | + c.Check(sess.State, Equals, Error) |
415 | } |
416 | |
417 | func (cs *clientSessionSuite) TestStartConnackReadError(c *C) { |
418 | @@ -472,6 +489,7 @@ |
419 | upCh <- io.EOF |
420 | err = <-errCh |
421 | c.Check(err, ErrorMatches, ".*EOF.*") |
422 | + c.Check(sess.State, Equals, Error) |
423 | } |
424 | |
425 | func (cs *clientSessionSuite) TestStartBadConnack(c *C) { |
426 | @@ -495,6 +513,7 @@ |
427 | upCh <- protocol.ConnAckMsg{Type: "connack"} |
428 | err = <-errCh |
429 | c.Check(err, ErrorMatches, ".*invalid.*") |
430 | + c.Check(sess.State, Equals, Error) |
431 | } |
432 | |
433 | func (cs *clientSessionSuite) TestStartNotConnack(c *C) { |
434 | @@ -518,6 +537,7 @@ |
435 | upCh <- protocol.ConnAckMsg{Type: "connnak"} |
436 | err = <-errCh |
437 | c.Check(err, ErrorMatches, ".*CONNACK.*") |
438 | + c.Check(sess.State, Equals, Error) |
439 | } |
440 | |
441 | func (cs *clientSessionSuite) TestStartWorks(c *C) { |
442 | @@ -545,6 +565,7 @@ |
443 | // start is now done. |
444 | err = <-errCh |
445 | c.Check(err, IsNil) |
446 | + c.Check(sess.State, Equals, Started) |
447 | } |
448 | |
449 | /**************************************************************** |