Merge lp:~pedronis/ubuntu-push/connack into lp:ubuntu-push

Proposed by Samuele Pedroni
Status: Merged
Approved by: Samuele Pedroni
Approved revision: 46
Merged at revision: 47
Proposed branch: lp:~pedronis/ubuntu-push/connack
Merge into: lp:ubuntu-push
Diff against target: 239 lines (+77/-19)
6 files modified
protocol/messages.go (+13/-0)
server/acceptance/acceptance_test.go (+2/-3)
server/acceptance/acceptanceclient.go (+10/-2)
server/acceptance/cmd/acceptanceclient.go (+0/-2)
server/session/session.go (+8/-0)
server/session/session_test.go (+44/-12)
To merge this branch: bzr merge lp:~pedronis/ubuntu-push/connack
Reviewer Review Type Date Requested Status
John Lenton (community) Approve
Review via email: mp+201810@code.launchpad.net

Commit message

introduce CONNACK message from the server after CONNECT to inform the client of connection params like ping interval

Description of the change

introduce CONNACK message from the server after CONNECT to inform the client of connection params like ping interval

To post a comment you must log in.
Revision history for this message
John Lenton (chipaca) wrote :

Connack the Barbariack!
Meaning, I approve.

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'protocol/messages.go'
2--- protocol/messages.go 2014-01-13 15:21:11 +0000
3+++ protocol/messages.go 2014-01-15 16:18:38 +0000
4@@ -36,6 +36,19 @@
5 Levels map[string]int64
6 }
7
8+// CONNACK message
9+type ConnAckMsg struct {
10+ Type string `json:"T"`
11+ Params ConnAckParams
12+}
13+
14+// ConnAckParams carries the connection parameters from the server on
15+// connection acknowledment.
16+type ConnAckParams struct {
17+ // ping interval formatted time.Duration
18+ PingInterval string
19+}
20+
21 // PING/PONG messages
22 type PingPongMsg struct {
23 Type string `json:"T"`
24
25=== modified file 'server/acceptance/acceptance_test.go'
26--- server/acceptance/acceptance_test.go 2014-01-13 15:21:11 +0000
27+++ server/acceptance/acceptance_test.go 2014-01-15 16:18:38 +0000
28@@ -86,7 +86,6 @@
29 }
30 return &ClientSession{
31 ExchangeTimeout: 100 * time.Millisecond,
32- PingInterval: 500 * time.Millisecond,
33 ServerAddr: addr,
34 CertPEMBlock: certPEMBlock,
35 DeviceId: deviceId,
36@@ -245,7 +244,7 @@
37 c.Assert(err, IsNil)
38 intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) {
39 // would be 3rd ping read, based on logged traffic
40- if op == "read" && ic.totalRead >= 28 {
41+ if op == "read" && ic.totalRead >= 79 {
42 // exit the sess.Run() goroutine, client will close
43 runtime.Goexit()
44 }
45@@ -279,7 +278,7 @@
46 c.Assert(err, IsNil)
47 intercept := func(ic *interceptingConn, op string, b []byte) (bool, int, error) {
48 // would be pong to 2nd ping, based on logged traffic
49- if op == "write" && ic.totalRead >= 28 {
50+ if op == "write" && ic.totalRead >= 67 {
51 time.Sleep(200 * time.Millisecond)
52 // exit the sess.Run() goroutine, client will close
53 runtime.Goexit()
54
55=== modified file 'server/acceptance/acceptanceclient.go'
56--- server/acceptance/acceptanceclient.go 2014-01-13 17:43:34 +0000
57+++ server/acceptance/acceptanceclient.go 2014-01-15 16:18:38 +0000
58@@ -34,7 +34,6 @@
59 // configuration
60 DeviceId string
61 ServerAddr string
62- PingInterval time.Duration
63 ExchangeTimeout time.Duration
64 CertPEMBlock []byte
65 ReportPings bool
66@@ -89,10 +88,19 @@
67 if err != nil {
68 return err
69 }
70+ var connAck protocol.ConnAckMsg
71+ err = proto.ReadMessage(&connAck)
72+ if err != nil {
73+ return err
74+ }
75+ pingInterval, err := time.ParseDuration(connAck.Params.PingInterval)
76+ if err != nil {
77+ return err
78+ }
79 events <- fmt.Sprintf("connected %v", conn.LocalAddr())
80 var recv serverMsg
81 for {
82- deadAfter := sess.PingInterval + sess.ExchangeTimeout
83+ deadAfter := pingInterval + sess.ExchangeTimeout
84 conn.SetDeadline(time.Now().Add(deadAfter))
85 err = proto.ReadMessage(&recv)
86 if err != nil {
87
88=== modified file 'server/acceptance/cmd/acceptanceclient.go'
89--- server/acceptance/cmd/acceptanceclient.go 2014-01-13 17:43:34 +0000
90+++ server/acceptance/cmd/acceptanceclient.go 2014-01-15 16:18:38 +0000
91@@ -33,7 +33,6 @@
92
93 type configuration struct {
94 // session configuration
95- PingInterval config.ConfigTimeDuration `json:"ping_interval"`
96 ExchangeTimeout config.ConfigTimeDuration `json:"exchange_timeout"`
97 // server connection config
98 Addr config.ConfigHostPort
99@@ -61,7 +60,6 @@
100 }
101 session := &acceptance.ClientSession{
102 ExchangeTimeout: cfg.ExchangeTimeout.TimeDuration(),
103- PingInterval: cfg.PingInterval.TimeDuration(),
104 ServerAddr: cfg.Addr.HostPort(),
105 DeviceId: flag.Arg(1),
106 // flags
107
108=== modified file 'server/session/session.go'
109--- server/session/session.go 2014-01-14 14:34:20 +0000
110+++ server/session/session.go 2014-01-15 16:18:38 +0000
111@@ -44,6 +44,14 @@
112 if connMsg.Type != "connect" {
113 return nil, &broker.ErrAbort{"expected CONNECT message"}
114 }
115+ proto.SetDeadline(time.Now().Add(cfg.ExchangeTimeout()))
116+ err = proto.WriteMessage(&protocol.ConnAckMsg{
117+ Type: "connack",
118+ Params: protocol.ConnAckParams{PingInterval: cfg.PingInterval().String()},
119+ })
120+ if err != nil {
121+ return nil, err
122+ }
123 return brkr.Register(&connMsg)
124 }
125
126
127=== modified file 'server/session/session_test.go'
128--- server/session/session_test.go 2014-01-14 14:34:20 +0000
129+++ server/session/session_test.go 2014-01-15 16:18:38 +0000
130@@ -109,7 +109,8 @@
131 return tsc.exchangeTimeout
132 }
133
134-var cfg5msExchangeTout = &testSessionConfig{
135+var cfg10msPingInterval5msExchangeTout = &testSessionConfig{
136+ pingInterval: 10 * time.Millisecond,
137 exchangeTimeout: 5 * time.Millisecond,
138 }
139
140@@ -153,11 +154,17 @@
141 brkr := newTestBroker()
142 go func() {
143 var err error
144- sess, err = sessionStart(tp, brkr, cfg5msExchangeTout)
145+ sess, err = sessionStart(tp, brkr, cfg10msPingInterval5msExchangeTout)
146 errCh <- err
147 }()
148 c.Check(takeNext(down), Equals, "deadline 5ms")
149 up <- protocol.ConnectMsg{Type: "connect", ClientVer: "1", DeviceId: "dev-1"}
150+ c.Check(takeNext(down), Equals, "deadline 5ms")
151+ c.Check(takeNext(down), Equals, protocol.ConnAckMsg{
152+ Type: "connack",
153+ Params: protocol.ConnAckParams{(10 * time.Millisecond).String()},
154+ })
155+ up <- nil // no write error
156 err := <-errCh
157 c.Check(err, IsNil)
158 c.Check(takeNext(brkr.registration), Equals, "register dev-1")
159@@ -175,21 +182,37 @@
160 brkr.err = errRegister
161 go func() {
162 var err error
163- sess, err = sessionStart(tp, brkr, cfg5msExchangeTout)
164+ sess, err = sessionStart(tp, brkr, cfg10msPingInterval5msExchangeTout)
165 errCh <- err
166 }()
167 up <- protocol.ConnectMsg{Type: "connect", ClientVer: "1", DeviceId: "dev-1"}
168+ takeNext(down) // CONNACK
169+ up <- nil // no write error
170 err := <-errCh
171 c.Check(err, Equals, errRegister)
172 }
173
174-func (s *sessionSuite) TestSessionStartErrors(c *C) {
175- up := make(chan interface{}, 5)
176- down := make(chan interface{}, 5)
177- tp := &testProtocol{up, down}
178- up <- io.ErrUnexpectedEOF
179- _, err := sessionStart(tp, nil, cfg5msExchangeTout)
180- c.Check(err, Equals, io.ErrUnexpectedEOF)
181+func (s *sessionSuite) TestSessionStartReadError(c *C) {
182+ up := make(chan interface{}, 5)
183+ down := make(chan interface{}, 5)
184+ tp := &testProtocol{up, down}
185+ up <- io.ErrUnexpectedEOF
186+ _, err := sessionStart(tp, nil, cfg10msPingInterval5msExchangeTout)
187+ c.Check(err, Equals, io.ErrUnexpectedEOF)
188+}
189+
190+func (s *sessionSuite) TestSessionStartWriteError(c *C) {
191+ up := make(chan interface{}, 5)
192+ down := make(chan interface{}, 5)
193+ tp := &testProtocol{up, down}
194+ up <- protocol.ConnectMsg{Type: "connect"}
195+ up <- io.ErrUnexpectedEOF
196+ _, err := sessionStart(tp, nil, cfg10msPingInterval5msExchangeTout)
197+ c.Check(err, Equals, io.ErrUnexpectedEOF)
198+ // sanity
199+ c.Check(takeNext(down), Matches, "deadline.*")
200+ c.Check(takeNext(down), Matches, "deadline.*")
201+ c.Check(takeNext(down), FitsTypeOf, protocol.ConnAckMsg{})
202 }
203
204 func (s *sessionSuite) TestSessionStartMismatch(c *C) {
205@@ -197,7 +220,7 @@
206 down := make(chan interface{}, 5)
207 tp := &testProtocol{up, down}
208 up <- protocol.ConnectMsg{Type: "what"}
209- _, err := sessionStart(tp, nil, cfg5msExchangeTout)
210+ _, err := sessionStart(tp, nil, cfg10msPingInterval5msExchangeTout)
211 c.Check(err, DeepEquals, &broker.ErrAbort{"expected CONNECT message"})
212 }
213
214@@ -479,15 +502,24 @@
215 }()
216 io.WriteString(cli, "\x00")
217 io.WriteString(cli, "\x00\x20{\"T\":\"connect\",\"DeviceId\":\"DEV\"}")
218+ // connack
219 downStream := bufio.NewReader(cli)
220 msg, err := downStream.ReadBytes(byte('}'))
221 c.Check(err, IsNil)
222+ c.Check(msg, DeepEquals, []byte("\x00\x2f{\"T\":\"connack\",\"Params\":{\"PingInterval\":\"5ms\"}"))
223+ // eat the last }
224+ rbr, err := downStream.ReadByte()
225+ c.Check(err, IsNil)
226+ c.Check(rbr, Equals, byte('}'))
227+ // first ping
228+ msg, err = downStream.ReadBytes(byte('}'))
229+ c.Check(err, IsNil)
230 c.Check(msg, DeepEquals, []byte("\x00\x0c{\"T\":\"ping\"}"))
231 c.Check(takeNext(brkr.registration), Equals, "register DEV")
232 c.Check(len(brkr.registration), Equals, 0) // not yet unregistered
233 cli.Close()
234 err = <-errCh
235- c.Check(remSrv.deadlineKind, DeepEquals, []string{"read", "both", "both"})
236+ c.Check(remSrv.deadlineKind, DeepEquals, []string{"read", "both", "both", "both"})
237 c.Check(err, Equals, io.EOF)
238 c.Check(takeNext(brkr.registration), Equals, "unregister DEV")
239 // tracking

Subscribers

People subscribed via source and target branches