Merge lp:~chipaca/ubuntu-push/introduction-to-quantum-client-sessions into lp:ubuntu-push

Proposed by John Lenton
Status: Merged
Approved by: John Lenton
Approved revision: 49
Merged at revision: 39
Proposed branch: lp:~chipaca/ubuntu-push/introduction-to-quantum-client-sessions
Merge into: lp:ubuntu-push
Prerequisite: lp:~chipaca/ubuntu-push/complex-client-session
Diff against target: 177 lines (+98/-10)
2 files modified
client/session/session.go (+23/-2)
client/session/session_test.go (+75/-8)
To merge this branch: bzr merge lp:~chipaca/ubuntu-push/introduction-to-quantum-client-sessions
Reviewer Review Type Date Requested Status
Samuele Pedroni Approve
Review via email: mp+204128@code.launchpad.net

Commit message

The running man.

Description of the change

Client session, volume 4: A gentle introduction to quantum client sessions.

To post a comment you must log in.
40. By John Lenton

Merged complex-client-session into introduction-to-quantum-client-sessions.

41. By John Lenton

Merged complex-client-session into introduction-to-quantum-client-sessions.

42. By John Lenton

merged and resolved

43. By John Lenton

Merged complex-client-session into introduction-to-quantum-client-sessions.

44. By John Lenton

the good ol' proto.SetDeadline

45. By John Lenton

Merged complex-client-session into introduction-to-quantum-client-sessions.

46. By John Lenton

AckMsg still need to specify Type

47. By John Lenton

Merged complex-client-session into introduction-to-quantum-client-sessions.

48. By John Lenton

Too many deadlines! I know the feeling.

49. By John Lenton

Merged complex-client-session into introduction-to-quantum-client-sessions.

Revision history for this message
Samuele Pedroni (pedronis) :
review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'client/session/session.go'
2--- client/session/session.go 2014-01-31 16:05:03 +0000
3+++ client/session/session.go 2014-01-31 17:43:04 +0000
4@@ -116,7 +116,6 @@
5
6 // handle "ping" messages
7 func (sess *ClientSession) handlePing() error {
8- sess.proto.SetDeadline(time.Now().Add(sess.ExchangeTimeout))
9 err := sess.proto.WriteMessage(protocol.PingPongMsg{Type: "pong"})
10 if err == nil {
11 sess.Log.Debugf("ping.")
12@@ -128,7 +127,6 @@
13
14 // handle "broadcast" messages
15 func (sess *ClientSession) handleBroadcast(bcast *serverMsg) error {
16- sess.proto.SetDeadline(time.Now().Add(sess.ExchangeTimeout))
17 err := sess.proto.WriteMessage(protocol.AckMsg{"ack"})
18 if err != nil {
19 return err
20@@ -144,3 +142,26 @@
21 }
22 return nil
23 }
24+
25+// Run the session with the server, emits a stream of events.
26+func (sess *ClientSession) run() error {
27+ var err error
28+ var recv serverMsg
29+ for {
30+ deadAfter := sess.pingInterval + sess.ExchangeTimeout
31+ sess.proto.SetDeadline(time.Now().Add(deadAfter))
32+ err = sess.proto.ReadMessage(&recv)
33+ if err != nil {
34+ return err
35+ }
36+ switch recv.Type {
37+ case "ping":
38+ err = sess.handlePing()
39+ case "broadcast":
40+ err = sess.handleBroadcast(&recv)
41+ }
42+ if err != nil {
43+ return err
44+ }
45+ }
46+}
47
48=== modified file 'client/session/session_test.go'
49--- client/session/session_test.go 2014-01-31 16:05:03 +0000
50+++ client/session/session_test.go 2014-01-31 17:43:04 +0000
51@@ -104,7 +104,19 @@
52 }
53
54 func (c *testProtocol) ReadMessage(dest interface{}) error {
55- panic("ReadMessage not implemented.")
56+ switch v := takeNext(c.up).(type) {
57+ case error:
58+ return v
59+ default:
60+ // make sure JSON.Unmarshal works with dest
61+ var marshalledMsg []byte
62+ marshalledMsg, err := json.Marshal(v)
63+ if err != nil {
64+ return fmt.Errorf("can't jsonify test value %v: %s", v, err)
65+ }
66+ return json.Unmarshal(marshalledMsg, dest)
67+ }
68+ return nil
69 }
70
71 func (c *testProtocol) WriteMessage(src interface{}) error {
72@@ -277,8 +289,7 @@
73 func (s *msgSuite) TestHandlePingWorks(c *C) {
74 s.upCh <- nil // no error
75 c.Check(s.sess.handlePing(), IsNil)
76- c.Assert(len(s.downCh), Equals, 2)
77- c.Check(<-s.downCh, Equals, "deadline 1ms")
78+ c.Assert(len(s.downCh), Equals, 1)
79 c.Check(<-s.downCh, Equals, protocol.PingPongMsg{Type: "pong"})
80 }
81
82@@ -287,8 +298,7 @@
83 s.upCh <- failure
84
85 c.Check(s.sess.handlePing(), Equals, failure)
86- c.Assert(len(s.downCh), Equals, 2)
87- c.Check(<-s.downCh, Equals, "deadline 1ms")
88+ c.Assert(len(s.downCh), Equals, 1)
89 c.Check(<-s.downCh, Equals, protocol.PingPongMsg{Type: "pong"})
90 }
91
92@@ -306,7 +316,6 @@
93 Payloads: []json.RawMessage{json.RawMessage(`{"b":1}`)},
94 }, protocol.NotificationsMsg{}}
95 go func() { s.errCh <- s.sess.handleBroadcast(&msg) }()
96- c.Check(takeNext(s.downCh), Equals, "deadline 1ms")
97 c.Check(takeNext(s.downCh), Equals, protocol.AckMsg{"ack"})
98 s.upCh <- nil // ack ok
99 c.Check(<-s.errCh, Equals, nil)
100@@ -326,7 +335,6 @@
101 Payloads: []json.RawMessage{json.RawMessage(`{"b":1}`)},
102 }, protocol.NotificationsMsg{}}
103 go func() { s.errCh <- s.sess.handleBroadcast(&msg) }()
104- c.Check(takeNext(s.downCh), Equals, "deadline 1ms")
105 c.Check(takeNext(s.downCh), Equals, protocol.AckMsg{"ack"})
106 failure := errors.New("ACK ACK ACK")
107 s.upCh <- failure
108@@ -343,9 +351,68 @@
109 Payloads: []json.RawMessage{json.RawMessage(`{"b":1}`)},
110 }, protocol.NotificationsMsg{}}
111 go func() { s.errCh <- s.sess.handleBroadcast(&msg) }()
112- c.Check(takeNext(s.downCh), Equals, "deadline 1ms")
113 c.Check(takeNext(s.downCh), Equals, protocol.AckMsg{"ack"})
114 s.upCh <- nil // ack ok
115 c.Check(<-s.errCh, IsNil)
116 c.Check(len(s.sess.MsgCh), Equals, 0)
117 }
118+
119+/****************************************************************
120+ run() tests
121+****************************************************************/
122+
123+type runSuite msgSuite
124+
125+var _ = Suite(&runSuite{})
126+
127+func (s *runSuite) SetUpTest(c *C) {
128+ (*msgSuite)(s).SetUpTest(c)
129+ s.sess.Connection.(*testConn).Name = "TestRun* (small r)"
130+ go func() {
131+ s.errCh <- s.sess.run()
132+ }()
133+}
134+
135+func (s *runSuite) TestRunReadError(c *C) {
136+ s.upCh <- errors.New("Read")
137+ err := <-s.errCh
138+ c.Assert(err, NotNil)
139+ c.Check(err.Error(), Equals, "Read")
140+}
141+
142+func (s *runSuite) TestRunPing(c *C) {
143+ c.Check(takeNext(s.downCh), Equals, "deadline 1ms")
144+ s.upCh <- protocol.PingPongMsg{Type: "ping"}
145+ c.Check(takeNext(s.downCh), Equals, protocol.PingPongMsg{Type: "pong"})
146+ failure := errors.New("pong")
147+ s.upCh <- failure
148+ c.Check(<-s.errCh, Equals, failure)
149+}
150+
151+func (s *runSuite) TestRunLoopsDaLoop(c *C) {
152+ for i := 1; i < 10; i++ {
153+ c.Check(takeNext(s.downCh), Equals, "deadline 1ms")
154+ s.upCh <- protocol.PingPongMsg{Type: "ping"}
155+ c.Check(takeNext(s.downCh), Equals, protocol.PingPongMsg{Type: "pong"})
156+ s.upCh <- nil
157+ }
158+ failure := errors.New("pong")
159+ s.upCh <- failure
160+ c.Check(<-s.errCh, Equals, failure)
161+}
162+
163+func (s *runSuite) TestRunBroadcast(c *C) {
164+ b := &protocol.BroadcastMsg{
165+ Type: "broadcast",
166+ AppId: "--ignored--",
167+ ChanId: "0",
168+ TopLevel: 2,
169+ Payloads: []json.RawMessage{json.RawMessage(`{"b":1}`)},
170+ }
171+ c.Check(takeNext(s.downCh), Equals, "deadline 1ms")
172+ s.upCh <- b
173+ c.Check(takeNext(s.downCh), Equals, protocol.AckMsg{"ack"})
174+ failure := errors.New("ack")
175+ s.upCh <- failure
176+ c.Check(<-s.errCh, Equals, failure)
177+}

Subscribers

People subscribed via source and target branches