Merge lp:~chipaca/ubuntu-push/client-v0-p4 into lp:ubuntu-push

Proposed by John Lenton
Status: Superseded
Proposed branch: lp:~chipaca/ubuntu-push/client-v0-p4
Merge into: lp:ubuntu-push
Diff against target: 295 lines (+48/-0)
2 files modified
client/session/session.go (+27/-0)
client/session/session_test.go (+21/-0)
To merge this branch: bzr merge lp:~chipaca/ubuntu-push/client-v0-p4
Reviewer Review Type Date Requested Status
Ubuntu Push Hackers Pending
Review via email: mp+204552@code.launchpad.net

This proposal supersedes a proposal from 2014-02-03.

This proposal has been superseded by a proposal from 2014-02-03.

Description of the change

Part 4: handling connectivity events.

Also, added State to client/session, to aid in testing some aspects of
this.

To post a comment you must log in.
lp:~chipaca/ubuntu-push/client-v0-p4 updated
48. By John Lenton

fixed races using cannons

49. By John Lenton

merged trunk

Unmerged revisions

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'client/session/session.go'
2--- client/session/session.go 2014-02-01 21:04:32 +0000
3+++ client/session/session.go 2014-02-03 19:32:41 +0000
4@@ -42,6 +42,17 @@
5 protocol.NotificationsMsg
6 }
7
8+// ClientSessionState is a way to broadly track the progress of the session
9+type ClientSessionState uint8
10+
11+const (
12+ Error ClientSessionState = iota
13+ Disconnected
14+ Connected
15+ Started
16+ Running
17+)
18+
19 // ClienSession holds a client<->server session and its configuration.
20 type ClientSession struct {
21 // configuration
22@@ -57,6 +68,7 @@
23 proto protocol.Protocol
24 pingInterval time.Duration
25 // status
26+ State ClientSessionState
27 ErrCh chan error
28 MsgCh chan *Notification
29 }
30@@ -71,6 +83,7 @@
31 Protocolator: protocol.NewProtocol0,
32 Levels: levelmap.NewLevelMap(),
33 TLS: &tls.Config{InsecureSkipVerify: true}, // XXX
34+ State: Disconnected,
35 }
36 if pem != nil {
37 cp := x509.NewCertPool()
38@@ -88,9 +101,11 @@
39 func (sess *ClientSession) connect() error {
40 conn, err := net.DialTimeout("tcp", sess.ServerAddr, sess.ExchangeTimeout)
41 if err != nil {
42+ sess.State = Error
43 return fmt.Errorf("connect: %s", err)
44 }
45 sess.Connection = tls.Client(conn, sess.TLS)
46+ sess.State = Connected
47 return nil
48 }
49
50@@ -102,6 +117,7 @@
51 // you could do to recover at this stage).
52 sess.Connection = nil
53 }
54+ sess.State = Disconnected
55 }
56
57 // handle "ping" messages
58@@ -110,6 +126,7 @@
59 if err == nil {
60 sess.Log.Debugf("ping.")
61 } else {
62+ sess.State = Error
63 sess.Log.Errorf("unable to pong: %s", err)
64 }
65 return err
66@@ -119,6 +136,7 @@
67 func (sess *ClientSession) handleBroadcast(bcast *serverMsg) error {
68 err := sess.proto.WriteMessage(protocol.AckMsg{"ack"})
69 if err != nil {
70+ sess.State = Error
71 return err
72 }
73 sess.Log.Debugf("broadcast chan:%v app:%v topLevel:%d payloads:%s",
74@@ -137,11 +155,13 @@
75 func (sess *ClientSession) loop() error {
76 var err error
77 var recv serverMsg
78+ sess.State = Running
79 for {
80 deadAfter := sess.pingInterval + sess.ExchangeTimeout
81 sess.proto.SetDeadline(time.Now().Add(deadAfter))
82 err = sess.proto.ReadMessage(&recv)
83 if err != nil {
84+ sess.State = Error
85 return err
86 }
87 switch recv.Type {
88@@ -161,12 +181,14 @@
89 conn := sess.Connection
90 err := conn.SetDeadline(time.Now().Add(sess.ExchangeTimeout))
91 if err != nil {
92+ sess.State = Error
93 return err
94 }
95 _, err = conn.Write(wireVersionBytes)
96 // The Writer docs: Write must return a non-nil error if it returns
97 // n < len(p). So, no need to check number of bytes written, hooray.
98 if err != nil {
99+ sess.State = Error
100 return err
101 }
102 proto := sess.Protocolator(conn)
103@@ -177,23 +199,28 @@
104 Levels: sess.Levels.GetAll(),
105 })
106 if err != nil {
107+ sess.State = Error
108 return err
109 }
110 var connAck protocol.ConnAckMsg
111 err = proto.ReadMessage(&connAck)
112 if err != nil {
113+ sess.State = Error
114 return err
115 }
116 if connAck.Type != "connack" {
117+ sess.State = Error
118 return fmt.Errorf("expecting CONNACK, got %#v", connAck.Type)
119 }
120 pingInterval, err := time.ParseDuration(connAck.Params.PingInterval)
121 if err != nil {
122+ sess.State = Error
123 return err
124 }
125 sess.proto = proto
126 sess.pingInterval = pingInterval
127 sess.Log.Debugf("Connected %v.", conn.LocalAddr())
128+ sess.State = Started
129 return nil
130 }
131
132
133=== modified file 'client/session/session_test.go'
134--- client/session/session_test.go 2014-02-01 21:04:32 +0000
135+++ client/session/session_test.go 2014-02-03 19:32:41 +0000
136@@ -168,6 +168,7 @@
137 c.Check(err, IsNil)
138 // but no root CAs set
139 c.Check(sess.TLS.RootCAs, IsNil)
140+ c.Check(sess.State, Equals, Disconnected)
141 }
142
143 var certfile string = helpers.SourceRelative("../../server/acceptance/config/testing.cert")
144@@ -196,6 +197,7 @@
145 c.Assert(err, IsNil)
146 err = sess.connect()
147 c.Check(err, ErrorMatches, ".*connect.*address.*")
148+ c.Check(sess.State, Equals, Error)
149 }
150
151 func (cs *clientSessionSuite) TestConnectConnects(c *C) {
152@@ -207,6 +209,7 @@
153 err = sess.connect()
154 c.Check(err, IsNil)
155 c.Check(sess.Connection, NotNil)
156+ c.Check(sess.State, Equals, Connected)
157 }
158
159 func (cs *clientSessionSuite) TestConnectConnectFail(c *C) {
160@@ -217,6 +220,7 @@
161 c.Assert(err, IsNil)
162 err = sess.connect()
163 c.Check(err, ErrorMatches, ".*connection refused")
164+ c.Check(sess.State, Equals, Error)
165 }
166
167 /****************************************************************
168@@ -229,6 +233,7 @@
169 sess.Connection = &testConn{Name: "TestClose"}
170 sess.Close()
171 c.Check(sess.Connection, IsNil)
172+ c.Check(sess.State, Equals, Disconnected)
173 }
174
175 func (cs *clientSessionSuite) TestCloseTwice(c *C) {
176@@ -239,6 +244,7 @@
177 c.Check(sess.Connection, IsNil)
178 sess.Close()
179 c.Check(sess.Connection, IsNil)
180+ c.Check(sess.State, Equals, Disconnected)
181 }
182
183 func (cs *clientSessionSuite) TestCloseFails(c *C) {
184@@ -247,6 +253,7 @@
185 sess.Connection = &testConn{Name: "TestCloseFails", CloseCondition: condition.Work(false)}
186 sess.Close()
187 c.Check(sess.Connection, IsNil) // nothing you can do to clean up anyway
188+ c.Check(sess.State, Equals, Disconnected)
189 }
190
191 /****************************************************************
192@@ -289,6 +296,7 @@
193 c.Check(s.sess.handlePing(), Equals, failure)
194 c.Assert(len(s.downCh), Equals, 1)
195 c.Check(<-s.downCh, Equals, protocol.PingPongMsg{Type: "pong"})
196+ c.Check(s.sess.State, Equals, Error)
197 }
198
199 /****************************************************************
200@@ -328,6 +336,7 @@
201 failure := errors.New("ACK ACK ACK")
202 s.upCh <- failure
203 c.Assert(<-s.errCh, Equals, failure)
204+ c.Check(s.sess.State, Equals, Error)
205 }
206
207 func (s *msgSuite) TestHandleBroadcastWrongChannel(c *C) {
208@@ -363,12 +372,15 @@
209 }
210
211 func (s *loopSuite) TestLoopReadError(c *C) {
212+ c.Check(s.sess.State, Equals, Running)
213 s.upCh <- errors.New("Read")
214 err := <-s.errCh
215 c.Check(err, ErrorMatches, "Read")
216+ c.Check(s.sess.State, Equals, Error)
217 }
218
219 func (s *loopSuite) TestLoopPing(c *C) {
220+ c.Check(s.sess.State, Equals, Running)
221 c.Check(takeNext(s.downCh), Equals, "deadline 1ms")
222 s.upCh <- protocol.PingPongMsg{Type: "ping"}
223 c.Check(takeNext(s.downCh), Equals, protocol.PingPongMsg{Type: "pong"})
224@@ -378,6 +390,7 @@
225 }
226
227 func (s *loopSuite) TestLoopLoopsDaLoop(c *C) {
228+ c.Check(s.sess.State, Equals, Running)
229 for i := 1; i < 10; i++ {
230 c.Check(takeNext(s.downCh), Equals, "deadline 1ms")
231 s.upCh <- protocol.PingPongMsg{Type: "ping"}
232@@ -390,6 +403,7 @@
233 }
234
235 func (s *loopSuite) TestLoopBroadcast(c *C) {
236+ c.Check(s.sess.State, Equals, Running)
237 b := &protocol.BroadcastMsg{
238 Type: "broadcast",
239 AppId: "--ignored--",
240@@ -415,6 +429,7 @@
241 DeadlineCondition: condition.Work(false)} // setdeadline will fail
242 err = sess.start()
243 c.Check(err, ErrorMatches, ".*deadline.*")
244+ c.Check(sess.State, Equals, Error)
245 }
246
247 func (cs *clientSessionSuite) TestStartFailsIfWriteFails(c *C) {
248@@ -424,6 +439,7 @@
249 WriteCondition: condition.Work(false)} // write will fail
250 err = sess.start()
251 c.Check(err, ErrorMatches, ".*write.*")
252+ c.Check(sess.State, Equals, Error)
253 }
254
255 func (cs *clientSessionSuite) TestStartConnectMessageFails(c *C) {
256@@ -449,6 +465,7 @@
257 upCh <- errors.New("Overflow error in /dev/null")
258 err = <-errCh
259 c.Check(err, ErrorMatches, "Overflow.*null")
260+ c.Check(sess.State, Equals, Error)
261 }
262
263 func (cs *clientSessionSuite) TestStartConnackReadError(c *C) {
264@@ -472,6 +489,7 @@
265 upCh <- io.EOF
266 err = <-errCh
267 c.Check(err, ErrorMatches, ".*EOF.*")
268+ c.Check(sess.State, Equals, Error)
269 }
270
271 func (cs *clientSessionSuite) TestStartBadConnack(c *C) {
272@@ -495,6 +513,7 @@
273 upCh <- protocol.ConnAckMsg{Type: "connack"}
274 err = <-errCh
275 c.Check(err, ErrorMatches, ".*invalid.*")
276+ c.Check(sess.State, Equals, Error)
277 }
278
279 func (cs *clientSessionSuite) TestStartNotConnack(c *C) {
280@@ -518,6 +537,7 @@
281 upCh <- protocol.ConnAckMsg{Type: "connnak"}
282 err = <-errCh
283 c.Check(err, ErrorMatches, ".*CONNACK.*")
284+ c.Check(sess.State, Equals, Error)
285 }
286
287 func (cs *clientSessionSuite) TestStartWorks(c *C) {
288@@ -545,6 +565,7 @@
289 // start is now done.
290 err = <-errCh
291 c.Check(err, IsNil)
292+ c.Check(sess.State, Equals, Started)
293 }
294
295 /****************************************************************

Subscribers

People subscribed via source and target branches