Merge lp:~chipaca/ubuntu-push/introduction-to-quantum-client-sessions into lp:ubuntu-push

Proposed by John Lenton
Status: Merged
Approved by: John Lenton
Approved revision: 49
Merged at revision: 39
Proposed branch: lp:~chipaca/ubuntu-push/introduction-to-quantum-client-sessions
Merge into: lp:ubuntu-push
Prerequisite: lp:~chipaca/ubuntu-push/complex-client-session
Diff against target: 177 lines (+98/-10)
2 files modified
client/session/session.go (+23/-2)
client/session/session_test.go (+75/-8)
To merge this branch: bzr merge lp:~chipaca/ubuntu-push/introduction-to-quantum-client-sessions
Reviewer Review Type Date Requested Status
Samuele Pedroni Approve
Review via email: mp+204128@code.launchpad.net

Commit message

The running man.

Description of the change

Client session, volume 4: A gentle introduction to quantum client sessions.

To post a comment you must log in.
40. By John Lenton

Merged complex-client-session into introduction-to-quantum-client-sessions.

41. By John Lenton

Merged complex-client-session into introduction-to-quantum-client-sessions.

42. By John Lenton

merged and resolved

43. By John Lenton

Merged complex-client-session into introduction-to-quantum-client-sessions.

44. By John Lenton

the good ol' proto.SetDeadline

45. By John Lenton

Merged complex-client-session into introduction-to-quantum-client-sessions.

46. By John Lenton

AckMsg still need to specify Type

47. By John Lenton

Merged complex-client-session into introduction-to-quantum-client-sessions.

48. By John Lenton

Too many deadlines! I know the feeling.

49. By John Lenton

Merged complex-client-session into introduction-to-quantum-client-sessions.

Revision history for this message
Samuele Pedroni (pedronis) :
review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'client/session/session.go'
--- client/session/session.go 2014-01-31 16:05:03 +0000
+++ client/session/session.go 2014-01-31 17:43:04 +0000
@@ -116,7 +116,6 @@
116116
117// handle "ping" messages117// handle "ping" messages
118func (sess *ClientSession) handlePing() error {118func (sess *ClientSession) handlePing() error {
119 sess.proto.SetDeadline(time.Now().Add(sess.ExchangeTimeout))
120 err := sess.proto.WriteMessage(protocol.PingPongMsg{Type: "pong"})119 err := sess.proto.WriteMessage(protocol.PingPongMsg{Type: "pong"})
121 if err == nil {120 if err == nil {
122 sess.Log.Debugf("ping.")121 sess.Log.Debugf("ping.")
@@ -128,7 +127,6 @@
128127
129// handle "broadcast" messages128// handle "broadcast" messages
130func (sess *ClientSession) handleBroadcast(bcast *serverMsg) error {129func (sess *ClientSession) handleBroadcast(bcast *serverMsg) error {
131 sess.proto.SetDeadline(time.Now().Add(sess.ExchangeTimeout))
132 err := sess.proto.WriteMessage(protocol.AckMsg{"ack"})130 err := sess.proto.WriteMessage(protocol.AckMsg{"ack"})
133 if err != nil {131 if err != nil {
134 return err132 return err
@@ -144,3 +142,26 @@
144 }142 }
145 return nil143 return nil
146}144}
145
146// Run the session with the server, emits a stream of events.
147func (sess *ClientSession) run() error {
148 var err error
149 var recv serverMsg
150 for {
151 deadAfter := sess.pingInterval + sess.ExchangeTimeout
152 sess.proto.SetDeadline(time.Now().Add(deadAfter))
153 err = sess.proto.ReadMessage(&recv)
154 if err != nil {
155 return err
156 }
157 switch recv.Type {
158 case "ping":
159 err = sess.handlePing()
160 case "broadcast":
161 err = sess.handleBroadcast(&recv)
162 }
163 if err != nil {
164 return err
165 }
166 }
167}
147168
=== modified file 'client/session/session_test.go'
--- client/session/session_test.go 2014-01-31 16:05:03 +0000
+++ client/session/session_test.go 2014-01-31 17:43:04 +0000
@@ -104,7 +104,19 @@
104}104}
105105
106func (c *testProtocol) ReadMessage(dest interface{}) error {106func (c *testProtocol) ReadMessage(dest interface{}) error {
107 panic("ReadMessage not implemented.")107 switch v := takeNext(c.up).(type) {
108 case error:
109 return v
110 default:
111 // make sure JSON.Unmarshal works with dest
112 var marshalledMsg []byte
113 marshalledMsg, err := json.Marshal(v)
114 if err != nil {
115 return fmt.Errorf("can't jsonify test value %v: %s", v, err)
116 }
117 return json.Unmarshal(marshalledMsg, dest)
118 }
119 return nil
108}120}
109121
110func (c *testProtocol) WriteMessage(src interface{}) error {122func (c *testProtocol) WriteMessage(src interface{}) error {
@@ -277,8 +289,7 @@
277func (s *msgSuite) TestHandlePingWorks(c *C) {289func (s *msgSuite) TestHandlePingWorks(c *C) {
278 s.upCh <- nil // no error290 s.upCh <- nil // no error
279 c.Check(s.sess.handlePing(), IsNil)291 c.Check(s.sess.handlePing(), IsNil)
280 c.Assert(len(s.downCh), Equals, 2)292 c.Assert(len(s.downCh), Equals, 1)
281 c.Check(<-s.downCh, Equals, "deadline 1ms")
282 c.Check(<-s.downCh, Equals, protocol.PingPongMsg{Type: "pong"})293 c.Check(<-s.downCh, Equals, protocol.PingPongMsg{Type: "pong"})
283}294}
284295
@@ -287,8 +298,7 @@
287 s.upCh <- failure298 s.upCh <- failure
288299
289 c.Check(s.sess.handlePing(), Equals, failure)300 c.Check(s.sess.handlePing(), Equals, failure)
290 c.Assert(len(s.downCh), Equals, 2)301 c.Assert(len(s.downCh), Equals, 1)
291 c.Check(<-s.downCh, Equals, "deadline 1ms")
292 c.Check(<-s.downCh, Equals, protocol.PingPongMsg{Type: "pong"})302 c.Check(<-s.downCh, Equals, protocol.PingPongMsg{Type: "pong"})
293}303}
294304
@@ -306,7 +316,6 @@
306 Payloads: []json.RawMessage{json.RawMessage(`{"b":1}`)},316 Payloads: []json.RawMessage{json.RawMessage(`{"b":1}`)},
307 }, protocol.NotificationsMsg{}}317 }, protocol.NotificationsMsg{}}
308 go func() { s.errCh <- s.sess.handleBroadcast(&msg) }()318 go func() { s.errCh <- s.sess.handleBroadcast(&msg) }()
309 c.Check(takeNext(s.downCh), Equals, "deadline 1ms")
310 c.Check(takeNext(s.downCh), Equals, protocol.AckMsg{"ack"})319 c.Check(takeNext(s.downCh), Equals, protocol.AckMsg{"ack"})
311 s.upCh <- nil // ack ok320 s.upCh <- nil // ack ok
312 c.Check(<-s.errCh, Equals, nil)321 c.Check(<-s.errCh, Equals, nil)
@@ -326,7 +335,6 @@
326 Payloads: []json.RawMessage{json.RawMessage(`{"b":1}`)},335 Payloads: []json.RawMessage{json.RawMessage(`{"b":1}`)},
327 }, protocol.NotificationsMsg{}}336 }, protocol.NotificationsMsg{}}
328 go func() { s.errCh <- s.sess.handleBroadcast(&msg) }()337 go func() { s.errCh <- s.sess.handleBroadcast(&msg) }()
329 c.Check(takeNext(s.downCh), Equals, "deadline 1ms")
330 c.Check(takeNext(s.downCh), Equals, protocol.AckMsg{"ack"})338 c.Check(takeNext(s.downCh), Equals, protocol.AckMsg{"ack"})
331 failure := errors.New("ACK ACK ACK")339 failure := errors.New("ACK ACK ACK")
332 s.upCh <- failure340 s.upCh <- failure
@@ -343,9 +351,68 @@
343 Payloads: []json.RawMessage{json.RawMessage(`{"b":1}`)},351 Payloads: []json.RawMessage{json.RawMessage(`{"b":1}`)},
344 }, protocol.NotificationsMsg{}}352 }, protocol.NotificationsMsg{}}
345 go func() { s.errCh <- s.sess.handleBroadcast(&msg) }()353 go func() { s.errCh <- s.sess.handleBroadcast(&msg) }()
346 c.Check(takeNext(s.downCh), Equals, "deadline 1ms")
347 c.Check(takeNext(s.downCh), Equals, protocol.AckMsg{"ack"})354 c.Check(takeNext(s.downCh), Equals, protocol.AckMsg{"ack"})
348 s.upCh <- nil // ack ok355 s.upCh <- nil // ack ok
349 c.Check(<-s.errCh, IsNil)356 c.Check(<-s.errCh, IsNil)
350 c.Check(len(s.sess.MsgCh), Equals, 0)357 c.Check(len(s.sess.MsgCh), Equals, 0)
351}358}
359
360/****************************************************************
361 run() tests
362****************************************************************/
363
364type runSuite msgSuite
365
366var _ = Suite(&runSuite{})
367
368func (s *runSuite) SetUpTest(c *C) {
369 (*msgSuite)(s).SetUpTest(c)
370 s.sess.Connection.(*testConn).Name = "TestRun* (small r)"
371 go func() {
372 s.errCh <- s.sess.run()
373 }()
374}
375
376func (s *runSuite) TestRunReadError(c *C) {
377 s.upCh <- errors.New("Read")
378 err := <-s.errCh
379 c.Assert(err, NotNil)
380 c.Check(err.Error(), Equals, "Read")
381}
382
383func (s *runSuite) TestRunPing(c *C) {
384 c.Check(takeNext(s.downCh), Equals, "deadline 1ms")
385 s.upCh <- protocol.PingPongMsg{Type: "ping"}
386 c.Check(takeNext(s.downCh), Equals, protocol.PingPongMsg{Type: "pong"})
387 failure := errors.New("pong")
388 s.upCh <- failure
389 c.Check(<-s.errCh, Equals, failure)
390}
391
392func (s *runSuite) TestRunLoopsDaLoop(c *C) {
393 for i := 1; i < 10; i++ {
394 c.Check(takeNext(s.downCh), Equals, "deadline 1ms")
395 s.upCh <- protocol.PingPongMsg{Type: "ping"}
396 c.Check(takeNext(s.downCh), Equals, protocol.PingPongMsg{Type: "pong"})
397 s.upCh <- nil
398 }
399 failure := errors.New("pong")
400 s.upCh <- failure
401 c.Check(<-s.errCh, Equals, failure)
402}
403
404func (s *runSuite) TestRunBroadcast(c *C) {
405 b := &protocol.BroadcastMsg{
406 Type: "broadcast",
407 AppId: "--ignored--",
408 ChanId: "0",
409 TopLevel: 2,
410 Payloads: []json.RawMessage{json.RawMessage(`{"b":1}`)},
411 }
412 c.Check(takeNext(s.downCh), Equals, "deadline 1ms")
413 s.upCh <- b
414 c.Check(takeNext(s.downCh), Equals, protocol.AckMsg{"ack"})
415 failure := errors.New("ack")
416 s.upCh <- failure
417 c.Check(<-s.errCh, Equals, failure)
418}

Subscribers

People subscribed via source and target branches