Merge lp:~pedronis/ubuntu-push/little-bits-of-info-n-tweaks into lp:ubuntu-push
- little-bits-of-info-n-tweaks
- Merge into trunk
Proposed by
Samuele Pedroni
Status: | Merged |
---|---|
Approved by: | Samuele Pedroni |
Approved revision: | 41 |
Merged at revision: | 38 |
Proposed branch: | lp:~pedronis/ubuntu-push/little-bits-of-info-n-tweaks |
Merge into: | lp:ubuntu-push |
Diff against target: |
439 lines (+79/-37) 6 files modified
server/broker/broker.go (+2/-2) server/broker/exchanges.go (+4/-1) server/broker/exchanges_test.go (+3/-3) server/session/session.go (+6/-4) server/session/session_test.go (+59/-27) server/session/tracker.go (+5/-0) |
To merge this branch: | bzr merge lp:~pedronis/ubuntu-push/little-bits-of-info-n-tweaks |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
John Lenton (community) | Approve | ||
Review via email:
|
Commit message
some more info reported out of session, some tweaks
Description of the change
various bits and bobs but there's logic to the madness:
* increase the ping interval/timeout in some tests that rely on actual timeouts, to be less aggressive
* tell the exchange whether it's fully done in Acked
* have the session report the effective elapsed ping interval
* remove one SetDeadline too many; the exchange timeout is meant to cover read-write and write-read pairs
To post a comment you must log in.
Revision history for this message
![](/+icing/build/overlay/assets/skins/sam/images/close.gif)
John Lenton (chipaca) : | # |
review:
Approve
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === modified file 'server/broker/broker.go' | |||
2 | --- server/broker/broker.go 2014-01-23 20:13:22 +0000 | |||
3 | +++ server/broker/broker.go 2014-01-31 17:25:49 +0000 | |||
4 | @@ -40,8 +40,8 @@ | |||
5 | 40 | 40 | ||
6 | 41 | // Exchange leads the session through performing an exchange, typically delivery. | 41 | // Exchange leads the session through performing an exchange, typically delivery. |
7 | 42 | type Exchange interface { | 42 | type Exchange interface { |
10 | 43 | Prepare(BrokerSession) (outMessage protocol.SplittableMsg, inMessage interface{}, err error) | 43 | Prepare(sess BrokerSession) (outMessage protocol.SplittableMsg, inMessage interface{}, err error) |
11 | 44 | Acked(BrokerSession) error | 44 | Acked(sess BrokerSession, done bool) error |
12 | 45 | } | 45 | } |
13 | 46 | 46 | ||
14 | 47 | // LevelsMap is the type for holding channel levels for session. | 47 | // LevelsMap is the type for holding channel levels for session. |
15 | 48 | 48 | ||
16 | === modified file 'server/broker/exchanges.go' | |||
17 | --- server/broker/exchanges.go 2014-01-23 18:39:10 +0000 | |||
18 | +++ server/broker/exchanges.go 2014-01-31 17:25:49 +0000 | |||
19 | @@ -39,6 +39,9 @@ | |||
20 | 39 | NotificationPayloads []json.RawMessage | 39 | NotificationPayloads []json.RawMessage |
21 | 40 | } | 40 | } |
22 | 41 | 41 | ||
23 | 42 | // check interface already here | ||
24 | 43 | var _ Exchange = &BroadcastExchange{} | ||
25 | 44 | |||
26 | 42 | func filterByLevel(clientLevel, topLevel int64, payloads []json.RawMessage) []json.RawMessage { | 45 | func filterByLevel(clientLevel, topLevel int64, payloads []json.RawMessage) []json.RawMessage { |
27 | 43 | c := int64(len(payloads)) | 46 | c := int64(len(payloads)) |
28 | 44 | delta := topLevel - clientLevel | 47 | delta := topLevel - clientLevel |
29 | @@ -63,7 +66,7 @@ | |||
30 | 63 | } | 66 | } |
31 | 64 | 67 | ||
32 | 65 | // Acked deals with an ACK for a BROADCAST. | 68 | // Acked deals with an ACK for a BROADCAST. |
34 | 66 | func (sbe *BroadcastExchange) Acked(sess BrokerSession) error { | 69 | func (sbe *BroadcastExchange) Acked(sess BrokerSession, done bool) error { |
35 | 67 | scratchArea := sess.ExchangeScratchArea() | 70 | scratchArea := sess.ExchangeScratchArea() |
36 | 68 | if scratchArea.ackMsg.Type != "ack" { | 71 | if scratchArea.ackMsg.Type != "ack" { |
37 | 69 | return &ErrAbort{"expected ACK message"} | 72 | return &ErrAbort{"expected ACK message"} |
38 | 70 | 73 | ||
39 | === modified file 'server/broker/exchanges_test.go' | |||
40 | --- server/broker/exchanges_test.go 2014-01-23 20:13:22 +0000 | |||
41 | +++ server/broker/exchanges_test.go 2014-01-31 17:25:49 +0000 | |||
42 | @@ -52,7 +52,7 @@ | |||
43 | 52 | c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"x"},{"a":"y"}]}`) | 52 | c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"x"},{"a":"y"}]}`) |
44 | 53 | err = json.Unmarshal([]byte(`{"T":"ack"}`), outMsg) | 53 | err = json.Unmarshal([]byte(`{"T":"ack"}`), outMsg) |
45 | 54 | c.Assert(err, IsNil) | 54 | c.Assert(err, IsNil) |
47 | 55 | err = exchg.Acked(sess) | 55 | err = exchg.Acked(sess, true) |
48 | 56 | c.Assert(err, IsNil) | 56 | c.Assert(err, IsNil) |
49 | 57 | c.Check(sess.LevelsMap[store.SystemInternalChannelId], Equals, int64(3)) | 57 | c.Check(sess.LevelsMap[store.SystemInternalChannelId], Equals, int64(3)) |
50 | 58 | } | 58 | } |
51 | @@ -76,7 +76,7 @@ | |||
52 | 76 | c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"y"}]}`) | 76 | c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"y"}]}`) |
53 | 77 | err = json.Unmarshal([]byte(`{}`), outMsg) | 77 | err = json.Unmarshal([]byte(`{}`), outMsg) |
54 | 78 | c.Assert(err, IsNil) | 78 | c.Assert(err, IsNil) |
56 | 79 | err = exchg.Acked(sess) | 79 | err = exchg.Acked(sess, true) |
57 | 80 | c.Assert(err, Not(IsNil)) | 80 | c.Assert(err, Not(IsNil)) |
58 | 81 | c.Check(sess.LevelsMap[store.SystemInternalChannelId], Equals, int64(0)) | 81 | c.Check(sess.LevelsMap[store.SystemInternalChannelId], Equals, int64(0)) |
59 | 82 | } | 82 | } |
60 | @@ -103,6 +103,6 @@ | |||
61 | 103 | c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"y"}]}`) | 103 | c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"y"}]}`) |
62 | 104 | err = json.Unmarshal([]byte(`{"T":"ack"}`), outMsg) | 104 | err = json.Unmarshal([]byte(`{"T":"ack"}`), outMsg) |
63 | 105 | c.Assert(err, IsNil) | 105 | c.Assert(err, IsNil) |
65 | 106 | err = exchg.Acked(sess) | 106 | err = exchg.Acked(sess, true) |
66 | 107 | c.Assert(err, IsNil) | 107 | c.Assert(err, IsNil) |
67 | 108 | } | 108 | } |
68 | 109 | 109 | ||
69 | === modified file 'server/session/session.go' | |||
70 | --- server/session/session.go 2014-01-15 15:54:20 +0000 | |||
71 | +++ server/session/session.go 2014-01-31 17:25:49 +0000 | |||
72 | @@ -44,7 +44,6 @@ | |||
73 | 44 | if connMsg.Type != "connect" { | 44 | if connMsg.Type != "connect" { |
74 | 45 | return nil, &broker.ErrAbort{"expected CONNECT message"} | 45 | return nil, &broker.ErrAbort{"expected CONNECT message"} |
75 | 46 | } | 46 | } |
76 | 47 | proto.SetDeadline(time.Now().Add(cfg.ExchangeTimeout())) | ||
77 | 48 | err = proto.WriteMessage(&protocol.ConnAckMsg{ | 47 | err = proto.WriteMessage(&protocol.ConnAckMsg{ |
78 | 49 | Type: "connack", | 48 | Type: "connack", |
79 | 50 | Params: protocol.ConnAckParams{PingInterval: cfg.PingInterval().String()}, | 49 | Params: protocol.ConnAckParams{PingInterval: cfg.PingInterval().String()}, |
80 | @@ -70,14 +69,16 @@ | |||
81 | 70 | } | 69 | } |
82 | 71 | 70 | ||
83 | 72 | // sessionLoop manages the exchanges of the protocol session. | 71 | // sessionLoop manages the exchanges of the protocol session. |
85 | 73 | func sessionLoop(proto protocol.Protocol, sess broker.BrokerSession, cfg SessionConfig) error { | 72 | func sessionLoop(proto protocol.Protocol, sess broker.BrokerSession, cfg SessionConfig, track SessionTracker) error { |
86 | 74 | pingInterval := cfg.PingInterval() | 73 | pingInterval := cfg.PingInterval() |
87 | 75 | exchangeTimeout := cfg.ExchangeTimeout() | 74 | exchangeTimeout := cfg.ExchangeTimeout() |
88 | 76 | pingTimer := time.NewTimer(pingInterval) | 75 | pingTimer := time.NewTimer(pingInterval) |
89 | 76 | intervalStart := time.Now() | ||
90 | 77 | ch := sess.SessionChannel() | 77 | ch := sess.SessionChannel() |
91 | 78 | for { | 78 | for { |
92 | 79 | select { | 79 | select { |
93 | 80 | case <-pingTimer.C: | 80 | case <-pingTimer.C: |
94 | 81 | track.EffectivePingInterval(time.Since(intervalStart)) | ||
95 | 81 | pingMsg := &protocol.PingPongMsg{"ping"} | 82 | pingMsg := &protocol.PingPongMsg{"ping"} |
96 | 82 | var pongMsg protocol.PingPongMsg | 83 | var pongMsg protocol.PingPongMsg |
97 | 83 | err := exchange(proto, pingMsg, &pongMsg, exchangeTimeout) | 84 | err := exchange(proto, pingMsg, &pongMsg, exchangeTimeout) |
98 | @@ -103,8 +104,9 @@ | |||
99 | 103 | } | 104 | } |
100 | 104 | if done { | 105 | if done { |
101 | 105 | pingTimer.Reset(pingInterval) | 106 | pingTimer.Reset(pingInterval) |
102 | 107 | intervalStart = time.Now() | ||
103 | 106 | } | 108 | } |
105 | 107 | err = exchg.Acked(sess) | 109 | err = exchg.Acked(sess, done) |
106 | 108 | if err != nil { | 110 | if err != nil { |
107 | 109 | return err | 111 | return err |
108 | 110 | } | 112 | } |
109 | @@ -134,5 +136,5 @@ | |||
110 | 134 | } | 136 | } |
111 | 135 | track.Registered(sess) | 137 | track.Registered(sess) |
112 | 136 | defer brkr.Unregister(sess) | 138 | defer brkr.Unregister(sess) |
114 | 137 | return track.End(sessionLoop(proto, sess, cfg)) | 139 | return track.End(sessionLoop(proto, sess, cfg, track)) |
115 | 138 | } | 140 | } |
116 | 139 | 141 | ||
117 | === modified file 'server/session/session_test.go' | |||
118 | --- server/session/session_test.go 2014-01-23 20:13:22 +0000 | |||
119 | +++ server/session/session_test.go 2014-01-31 17:25:49 +0000 | |||
120 | @@ -147,7 +147,6 @@ | |||
121 | 147 | }() | 147 | }() |
122 | 148 | c.Check(takeNext(down), Equals, "deadline 5ms") | 148 | c.Check(takeNext(down), Equals, "deadline 5ms") |
123 | 149 | up <- protocol.ConnectMsg{Type: "connect", ClientVer: "1", DeviceId: "dev-1"} | 149 | up <- protocol.ConnectMsg{Type: "connect", ClientVer: "1", DeviceId: "dev-1"} |
124 | 150 | c.Check(takeNext(down), Equals, "deadline 5ms") | ||
125 | 151 | c.Check(takeNext(down), Equals, protocol.ConnAckMsg{ | 150 | c.Check(takeNext(down), Equals, protocol.ConnAckMsg{ |
126 | 152 | Type: "connack", | 151 | Type: "connack", |
127 | 153 | Params: protocol.ConnAckParams{(10 * time.Millisecond).String()}, | 152 | Params: protocol.ConnAckParams{(10 * time.Millisecond).String()}, |
128 | @@ -199,7 +198,6 @@ | |||
129 | 199 | c.Check(err, Equals, io.ErrUnexpectedEOF) | 198 | c.Check(err, Equals, io.ErrUnexpectedEOF) |
130 | 200 | // sanity | 199 | // sanity |
131 | 201 | c.Check(takeNext(down), Matches, "deadline.*") | 200 | c.Check(takeNext(down), Matches, "deadline.*") |
132 | 202 | c.Check(takeNext(down), Matches, "deadline.*") | ||
133 | 203 | c.Check(takeNext(down), FitsTypeOf, protocol.ConnAckMsg{}) | 201 | c.Check(takeNext(down), FitsTypeOf, protocol.ConnAckMsg{}) |
134 | 204 | } | 202 | } |
135 | 205 | 203 | ||
136 | @@ -218,13 +216,14 @@ | |||
137 | 218 | } | 216 | } |
138 | 219 | 217 | ||
139 | 220 | func (s *sessionSuite) TestSessionLoop(c *C) { | 218 | func (s *sessionSuite) TestSessionLoop(c *C) { |
140 | 219 | nopTrack := NewTracker(nopLogger) | ||
141 | 221 | errCh := make(chan error, 1) | 220 | errCh := make(chan error, 1) |
142 | 222 | up := make(chan interface{}, 5) | 221 | up := make(chan interface{}, 5) |
143 | 223 | down := make(chan interface{}, 5) | 222 | down := make(chan interface{}, 5) |
144 | 224 | tp := &testProtocol{up, down} | 223 | tp := &testProtocol{up, down} |
145 | 225 | sess := &testing.TestBrokerSession{} | 224 | sess := &testing.TestBrokerSession{} |
146 | 226 | go func() { | 225 | go func() { |
148 | 227 | errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout) | 226 | errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout, nopTrack) |
149 | 228 | }() | 227 | }() |
150 | 229 | c.Check(takeNext(down), Equals, "deadline 2ms") | 228 | c.Check(takeNext(down), Equals, "deadline 2ms") |
151 | 230 | c.Check(takeNext(down), DeepEquals, protocol.PingPongMsg{Type: "ping"}) | 229 | c.Check(takeNext(down), DeepEquals, protocol.PingPongMsg{Type: "ping"}) |
152 | @@ -239,13 +238,14 @@ | |||
153 | 239 | } | 238 | } |
154 | 240 | 239 | ||
155 | 241 | func (s *sessionSuite) TestSessionLoopWriteError(c *C) { | 240 | func (s *sessionSuite) TestSessionLoopWriteError(c *C) { |
156 | 241 | nopTrack := NewTracker(nopLogger) | ||
157 | 242 | errCh := make(chan error, 1) | 242 | errCh := make(chan error, 1) |
158 | 243 | up := make(chan interface{}, 5) | 243 | up := make(chan interface{}, 5) |
159 | 244 | down := make(chan interface{}, 5) | 244 | down := make(chan interface{}, 5) |
160 | 245 | tp := &testProtocol{up, down} | 245 | tp := &testProtocol{up, down} |
161 | 246 | sess := &testing.TestBrokerSession{} | 246 | sess := &testing.TestBrokerSession{} |
162 | 247 | go func() { | 247 | go func() { |
164 | 248 | errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout) | 248 | errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout, nopTrack) |
165 | 249 | }() | 249 | }() |
166 | 250 | c.Check(takeNext(down), Equals, "deadline 2ms") | 250 | c.Check(takeNext(down), Equals, "deadline 2ms") |
167 | 251 | c.Check(takeNext(down), FitsTypeOf, protocol.PingPongMsg{}) | 251 | c.Check(takeNext(down), FitsTypeOf, protocol.PingPongMsg{}) |
168 | @@ -255,13 +255,14 @@ | |||
169 | 255 | } | 255 | } |
170 | 256 | 256 | ||
171 | 257 | func (s *sessionSuite) TestSessionLoopMismatch(c *C) { | 257 | func (s *sessionSuite) TestSessionLoopMismatch(c *C) { |
172 | 258 | nopTrack := NewTracker(nopLogger) | ||
173 | 258 | errCh := make(chan error, 1) | 259 | errCh := make(chan error, 1) |
174 | 259 | up := make(chan interface{}, 5) | 260 | up := make(chan interface{}, 5) |
175 | 260 | down := make(chan interface{}, 5) | 261 | down := make(chan interface{}, 5) |
176 | 261 | tp := &testProtocol{up, down} | 262 | tp := &testProtocol{up, down} |
177 | 262 | sess := &testing.TestBrokerSession{} | 263 | sess := &testing.TestBrokerSession{} |
178 | 263 | go func() { | 264 | go func() { |
180 | 264 | errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout) | 265 | errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout, nopTrack) |
181 | 265 | }() | 266 | }() |
182 | 266 | c.Check(takeNext(down), Equals, "deadline 2ms") | 267 | c.Check(takeNext(down), Equals, "deadline 2ms") |
183 | 267 | c.Check(takeNext(down), DeepEquals, protocol.PingPongMsg{Type: "ping"}) | 268 | c.Check(takeNext(down), DeepEquals, protocol.PingPongMsg{Type: "ping"}) |
184 | @@ -294,18 +295,29 @@ | |||
185 | 294 | finErr error | 295 | finErr error |
186 | 295 | finSleep time.Duration | 296 | finSleep time.Duration |
187 | 296 | nParts int | 297 | nParts int |
188 | 298 | done chan interface{} | ||
189 | 297 | } | 299 | } |
190 | 298 | 300 | ||
191 | 299 | func (exchg *testExchange) Prepare(sess broker.BrokerSession) (outMsg protocol.SplittableMsg, inMsg interface{}, err error) { | 301 | func (exchg *testExchange) Prepare(sess broker.BrokerSession) (outMsg protocol.SplittableMsg, inMsg interface{}, err error) { |
192 | 300 | return &testMsg{Type: "msg", nParts: exchg.nParts}, &exchg.inMsg, exchg.prepErr | 302 | return &testMsg{Type: "msg", nParts: exchg.nParts}, &exchg.inMsg, exchg.prepErr |
193 | 301 | } | 303 | } |
194 | 302 | 304 | ||
196 | 303 | func (exchg *testExchange) Acked(sess broker.BrokerSession) error { | 305 | func (exchg *testExchange) Acked(sess broker.BrokerSession, done bool) error { |
197 | 304 | time.Sleep(exchg.finSleep) | 306 | time.Sleep(exchg.finSleep) |
198 | 307 | if exchg.done != nil { | ||
199 | 308 | var doneStr string | ||
200 | 309 | if done { | ||
201 | 310 | doneStr = "y" | ||
202 | 311 | } else { | ||
203 | 312 | doneStr = "n" | ||
204 | 313 | } | ||
205 | 314 | exchg.done <- doneStr | ||
206 | 315 | } | ||
207 | 305 | return exchg.finErr | 316 | return exchg.finErr |
208 | 306 | } | 317 | } |
209 | 307 | 318 | ||
210 | 308 | func (s *sessionSuite) TestSessionLoopExchange(c *C) { | 319 | func (s *sessionSuite) TestSessionLoopExchange(c *C) { |
211 | 320 | nopTrack := NewTracker(nopLogger) | ||
212 | 309 | errCh := make(chan error, 1) | 321 | errCh := make(chan error, 1) |
213 | 310 | up := make(chan interface{}, 5) | 322 | up := make(chan interface{}, 5) |
214 | 311 | down := make(chan interface{}, 5) | 323 | down := make(chan interface{}, 5) |
215 | @@ -314,7 +326,7 @@ | |||
216 | 314 | exchanges <- &testExchange{} | 326 | exchanges <- &testExchange{} |
217 | 315 | sess := &testing.TestBrokerSession{Exchanges: exchanges} | 327 | sess := &testing.TestBrokerSession{Exchanges: exchanges} |
218 | 316 | go func() { | 328 | go func() { |
220 | 317 | errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout) | 329 | errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout, nopTrack) |
221 | 318 | }() | 330 | }() |
222 | 319 | c.Check(takeNext(down), Equals, "deadline 2ms") | 331 | c.Check(takeNext(down), Equals, "deadline 2ms") |
223 | 320 | c.Check(takeNext(down), DeepEquals, testMsg{Type: "msg"}) | 332 | c.Check(takeNext(down), DeepEquals, testMsg{Type: "msg"}) |
224 | @@ -329,24 +341,28 @@ | |||
225 | 329 | } | 341 | } |
226 | 330 | 342 | ||
227 | 331 | func (s *sessionSuite) TestSessionLoopExchangeSplit(c *C) { | 343 | func (s *sessionSuite) TestSessionLoopExchangeSplit(c *C) { |
228 | 344 | nopTrack := NewTracker(nopLogger) | ||
229 | 332 | errCh := make(chan error, 1) | 345 | errCh := make(chan error, 1) |
230 | 333 | up := make(chan interface{}, 5) | 346 | up := make(chan interface{}, 5) |
231 | 334 | down := make(chan interface{}, 5) | 347 | down := make(chan interface{}, 5) |
232 | 335 | tp := &testProtocol{up, down} | 348 | tp := &testProtocol{up, down} |
233 | 336 | exchanges := make(chan broker.Exchange, 1) | 349 | exchanges := make(chan broker.Exchange, 1) |
235 | 337 | exchanges <- &testExchange{nParts: 2} | 350 | exchange := &testExchange{nParts: 2, done: make(chan interface{}, 2)} |
236 | 351 | exchanges <- exchange | ||
237 | 338 | sess := &testing.TestBrokerSession{Exchanges: exchanges} | 352 | sess := &testing.TestBrokerSession{Exchanges: exchanges} |
238 | 339 | go func() { | 353 | go func() { |
240 | 340 | errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout) | 354 | errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout, nopTrack) |
241 | 341 | }() | 355 | }() |
242 | 342 | c.Check(takeNext(down), Equals, "deadline 2ms") | 356 | c.Check(takeNext(down), Equals, "deadline 2ms") |
243 | 343 | c.Check(takeNext(down), DeepEquals, testMsg{Type: "msg", Part: 1, nParts: 2}) | 357 | c.Check(takeNext(down), DeepEquals, testMsg{Type: "msg", Part: 1, nParts: 2}) |
244 | 344 | up <- nil // no write error | 358 | up <- nil // no write error |
245 | 345 | up <- testMsg{Type: "ack"} | 359 | up <- testMsg{Type: "ack"} |
246 | 360 | c.Check(takeNext(exchange.done), Equals, "n") | ||
247 | 346 | c.Check(takeNext(down), Equals, "deadline 2ms") | 361 | c.Check(takeNext(down), Equals, "deadline 2ms") |
248 | 347 | c.Check(takeNext(down), DeepEquals, testMsg{Type: "msg", Part: 2, nParts: 2}) | 362 | c.Check(takeNext(down), DeepEquals, testMsg{Type: "msg", Part: 2, nParts: 2}) |
249 | 348 | up <- nil // no write error | 363 | up <- nil // no write error |
250 | 349 | up <- testMsg{Type: "ack"} | 364 | up <- testMsg{Type: "ack"} |
251 | 365 | c.Check(takeNext(exchange.done), Equals, "y") | ||
252 | 350 | c.Check(takeNext(down), Equals, "deadline 2ms") | 366 | c.Check(takeNext(down), Equals, "deadline 2ms") |
253 | 351 | c.Check(takeNext(down), DeepEquals, protocol.PingPongMsg{Type: "ping"}) | 367 | c.Check(takeNext(down), DeepEquals, protocol.PingPongMsg{Type: "ping"}) |
254 | 352 | up <- nil // no write error | 368 | up <- nil // no write error |
255 | @@ -356,6 +372,7 @@ | |||
256 | 356 | } | 372 | } |
257 | 357 | 373 | ||
258 | 358 | func (s *sessionSuite) TestSessionLoopExchangePrepareError(c *C) { | 374 | func (s *sessionSuite) TestSessionLoopExchangePrepareError(c *C) { |
259 | 375 | nopTrack := NewTracker(nopLogger) | ||
260 | 359 | errCh := make(chan error, 1) | 376 | errCh := make(chan error, 1) |
261 | 360 | up := make(chan interface{}, 5) | 377 | up := make(chan interface{}, 5) |
262 | 361 | down := make(chan interface{}, 5) | 378 | down := make(chan interface{}, 5) |
263 | @@ -365,13 +382,14 @@ | |||
264 | 365 | exchanges <- &testExchange{prepErr: prepErr} | 382 | exchanges <- &testExchange{prepErr: prepErr} |
265 | 366 | sess := &testing.TestBrokerSession{Exchanges: exchanges} | 383 | sess := &testing.TestBrokerSession{Exchanges: exchanges} |
266 | 367 | go func() { | 384 | go func() { |
268 | 368 | errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout) | 385 | errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout, nopTrack) |
269 | 369 | }() | 386 | }() |
270 | 370 | err := <-errCh | 387 | err := <-errCh |
271 | 371 | c.Check(err, Equals, prepErr) | 388 | c.Check(err, Equals, prepErr) |
272 | 372 | } | 389 | } |
273 | 373 | 390 | ||
274 | 374 | func (s *sessionSuite) TestSessionLoopExchangeAckedError(c *C) { | 391 | func (s *sessionSuite) TestSessionLoopExchangeAckedError(c *C) { |
275 | 392 | nopTrack := NewTracker(nopLogger) | ||
276 | 375 | errCh := make(chan error, 1) | 393 | errCh := make(chan error, 1) |
277 | 376 | up := make(chan interface{}, 5) | 394 | up := make(chan interface{}, 5) |
278 | 377 | down := make(chan interface{}, 5) | 395 | down := make(chan interface{}, 5) |
279 | @@ -381,7 +399,7 @@ | |||
280 | 381 | exchanges <- &testExchange{finErr: finErr} | 399 | exchanges <- &testExchange{finErr: finErr} |
281 | 382 | sess := &testing.TestBrokerSession{Exchanges: exchanges} | 400 | sess := &testing.TestBrokerSession{Exchanges: exchanges} |
282 | 383 | go func() { | 401 | go func() { |
284 | 384 | errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout) | 402 | errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout, nopTrack) |
285 | 385 | }() | 403 | }() |
286 | 386 | c.Check(takeNext(down), Equals, "deadline 2ms") | 404 | c.Check(takeNext(down), Equals, "deadline 2ms") |
287 | 387 | c.Check(takeNext(down), DeepEquals, testMsg{Type: "msg"}) | 405 | c.Check(takeNext(down), DeepEquals, testMsg{Type: "msg"}) |
288 | @@ -392,6 +410,7 @@ | |||
289 | 392 | } | 410 | } |
290 | 393 | 411 | ||
291 | 394 | func (s *sessionSuite) TestSessionLoopExchangeWriteError(c *C) { | 412 | func (s *sessionSuite) TestSessionLoopExchangeWriteError(c *C) { |
292 | 413 | nopTrack := NewTracker(nopLogger) | ||
293 | 395 | errCh := make(chan error, 1) | 414 | errCh := make(chan error, 1) |
294 | 396 | up := make(chan interface{}, 5) | 415 | up := make(chan interface{}, 5) |
295 | 397 | down := make(chan interface{}, 5) | 416 | down := make(chan interface{}, 5) |
296 | @@ -400,7 +419,7 @@ | |||
297 | 400 | exchanges <- &testExchange{} | 419 | exchanges <- &testExchange{} |
298 | 401 | sess := &testing.TestBrokerSession{Exchanges: exchanges} | 420 | sess := &testing.TestBrokerSession{Exchanges: exchanges} |
299 | 402 | go func() { | 421 | go func() { |
301 | 403 | errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout) | 422 | errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout, nopTrack) |
302 | 404 | }() | 423 | }() |
303 | 405 | c.Check(takeNext(down), Equals, "deadline 2ms") | 424 | c.Check(takeNext(down), Equals, "deadline 2ms") |
304 | 406 | c.Check(takeNext(down), FitsTypeOf, testMsg{}) | 425 | c.Check(takeNext(down), FitsTypeOf, testMsg{}) |
305 | @@ -409,25 +428,38 @@ | |||
306 | 409 | c.Check(err, Equals, io.ErrUnexpectedEOF) | 428 | c.Check(err, Equals, io.ErrUnexpectedEOF) |
307 | 410 | } | 429 | } |
308 | 411 | 430 | ||
309 | 431 | type testTracker struct { | ||
310 | 432 | SessionTracker | ||
311 | 433 | interval chan interface{} | ||
312 | 434 | } | ||
313 | 435 | |||
314 | 436 | func (trk *testTracker) EffectivePingInterval(interval time.Duration) { | ||
315 | 437 | trk.interval <- interval | ||
316 | 438 | } | ||
317 | 439 | |||
318 | 412 | func (s *sessionSuite) TestSessionLoopExchangeNextPing(c *C) { | 440 | func (s *sessionSuite) TestSessionLoopExchangeNextPing(c *C) { |
319 | 441 | track := &testTracker{NewTracker(nopLogger), make(chan interface{}, 1)} | ||
320 | 413 | errCh := make(chan error, 1) | 442 | errCh := make(chan error, 1) |
321 | 414 | up := make(chan interface{}, 5) | 443 | up := make(chan interface{}, 5) |
322 | 415 | down := make(chan interface{}, 5) | 444 | down := make(chan interface{}, 5) |
323 | 416 | tp := &testProtocol{up, down} | 445 | tp := &testProtocol{up, down} |
324 | 417 | exchanges := make(chan broker.Exchange, 1) | 446 | exchanges := make(chan broker.Exchange, 1) |
326 | 418 | exchanges <- &testExchange{finSleep: 3 * time.Millisecond} | 447 | exchanges <- &testExchange{finSleep: 6 * time.Millisecond} |
327 | 419 | sess := &testing.TestBrokerSession{Exchanges: exchanges} | 448 | sess := &testing.TestBrokerSession{Exchanges: exchanges} |
328 | 420 | go func() { | 449 | go func() { |
330 | 421 | errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout) | 450 | errCh <- sessionLoop(tp, sess, cfg10msPingInterval5msExchangeTout, track) |
331 | 422 | }() | 451 | }() |
333 | 423 | c.Check(takeNext(down), Equals, "deadline 2ms") | 452 | c.Check(takeNext(down), Equals, "deadline 5ms") |
334 | 424 | c.Check(takeNext(down), DeepEquals, testMsg{Type: "msg"}) | 453 | c.Check(takeNext(down), DeepEquals, testMsg{Type: "msg"}) |
335 | 425 | up <- nil // no write error | 454 | up <- nil // no write error |
336 | 426 | up <- testMsg{Type: "ack"} | 455 | up <- testMsg{Type: "ack"} |
339 | 427 | tack := time.Now() // next ping interval starts around here | 456 | // next ping interval starts around here |
340 | 428 | c.Check(takeNext(down), Equals, "deadline 2ms") | 457 | interval := takeNext(track.interval).(time.Duration) |
341 | 458 | c.Check(takeNext(down), Equals, "deadline 5ms") | ||
342 | 429 | c.Check(takeNext(down), DeepEquals, protocol.PingPongMsg{Type: "ping"}) | 459 | c.Check(takeNext(down), DeepEquals, protocol.PingPongMsg{Type: "ping"}) |
344 | 430 | c.Check(time.Since(tack) < (3+5)*time.Millisecond, Equals, true) | 460 | effectiveOfPing := float64(interval) / float64(10*time.Millisecond) |
345 | 461 | c.Check(effectiveOfPing > 0.95, Equals, true) | ||
346 | 462 | c.Check(effectiveOfPing < 1.15, Equals, true) | ||
347 | 431 | up <- nil // no write error | 463 | up <- nil // no write error |
348 | 432 | up <- io.EOF | 464 | up <- io.EOF |
349 | 433 | err := <-errCh | 465 | err := <-errCh |
350 | @@ -470,9 +502,9 @@ | |||
351 | 470 | return c.Conn.SetDeadline(t) | 502 | return c.Conn.SetDeadline(t) |
352 | 471 | } | 503 | } |
353 | 472 | 504 | ||
357 | 473 | var cfg5msPingInterval = &testSessionConfig{ | 505 | var cfg50msPingInterval = &testSessionConfig{ |
358 | 474 | pingInterval: 5 * time.Millisecond, | 506 | pingInterval: 50 * time.Millisecond, |
359 | 475 | exchangeTimeout: 5 * time.Millisecond, | 507 | exchangeTimeout: 10 * time.Millisecond, |
360 | 476 | } | 508 | } |
361 | 477 | 509 | ||
362 | 478 | var nopLogger = logger.NewSimpleLogger(ioutil.Discard, "error") | 510 | var nopLogger = logger.NewSimpleLogger(ioutil.Discard, "error") |
363 | @@ -486,7 +518,7 @@ | |||
364 | 486 | remSrv := &rememberDeadlineConn{srv, make([]string, 0, 2)} | 518 | remSrv := &rememberDeadlineConn{srv, make([]string, 0, 2)} |
365 | 487 | brkr := newTestBroker() | 519 | brkr := newTestBroker() |
366 | 488 | go func() { | 520 | go func() { |
368 | 489 | errCh <- Session(remSrv, brkr, cfg5msPingInterval, track) | 521 | errCh <- Session(remSrv, brkr, cfg50msPingInterval, track) |
369 | 490 | }() | 522 | }() |
370 | 491 | io.WriteString(cli, "\x00") | 523 | io.WriteString(cli, "\x00") |
371 | 492 | io.WriteString(cli, "\x00\x20{\"T\":\"connect\",\"DeviceId\":\"DEV\"}") | 524 | io.WriteString(cli, "\x00\x20{\"T\":\"connect\",\"DeviceId\":\"DEV\"}") |
372 | @@ -494,7 +526,7 @@ | |||
373 | 494 | downStream := bufio.NewReader(cli) | 526 | downStream := bufio.NewReader(cli) |
374 | 495 | msg, err := downStream.ReadBytes(byte('}')) | 527 | msg, err := downStream.ReadBytes(byte('}')) |
375 | 496 | c.Check(err, IsNil) | 528 | c.Check(err, IsNil) |
377 | 497 | c.Check(msg, DeepEquals, []byte("\x00\x2f{\"T\":\"connack\",\"Params\":{\"PingInterval\":\"5ms\"}")) | 529 | c.Check(msg, DeepEquals, []byte("\x00\x30{\"T\":\"connack\",\"Params\":{\"PingInterval\":\"50ms\"}")) |
378 | 498 | // eat the last } | 530 | // eat the last } |
379 | 499 | rbr, err := downStream.ReadByte() | 531 | rbr, err := downStream.ReadByte() |
380 | 500 | c.Check(err, IsNil) | 532 | c.Check(err, IsNil) |
381 | @@ -507,7 +539,7 @@ | |||
382 | 507 | c.Check(len(brkr.registration), Equals, 0) // not yet unregistered | 539 | c.Check(len(brkr.registration), Equals, 0) // not yet unregistered |
383 | 508 | cli.Close() | 540 | cli.Close() |
384 | 509 | err = <-errCh | 541 | err = <-errCh |
386 | 510 | c.Check(remSrv.deadlineKind, DeepEquals, []string{"read", "both", "both", "both"}) | 542 | c.Check(remSrv.deadlineKind, DeepEquals, []string{"read", "both", "both"}) |
387 | 511 | c.Check(err, Equals, io.EOF) | 543 | c.Check(err, Equals, io.EOF) |
388 | 512 | c.Check(takeNext(brkr.registration), Equals, "unregister DEV") | 544 | c.Check(takeNext(brkr.registration), Equals, "unregister DEV") |
389 | 513 | // tracking | 545 | // tracking |
390 | @@ -538,7 +570,7 @@ | |||
391 | 538 | defer lst.Close() | 570 | defer lst.Close() |
392 | 539 | brkr := newTestBroker() | 571 | brkr := newTestBroker() |
393 | 540 | go func() { | 572 | go func() { |
395 | 541 | errCh <- Session(srv, brkr, cfg5msPingInterval, track) | 573 | errCh <- Session(srv, brkr, cfg50msPingInterval, track) |
396 | 542 | }() | 574 | }() |
397 | 543 | io.WriteString(cli, "\x10") | 575 | io.WriteString(cli, "\x10") |
398 | 544 | err := <-errCh | 576 | err := <-errCh |
399 | @@ -557,7 +589,7 @@ | |||
400 | 557 | defer lst.Close() | 589 | defer lst.Close() |
401 | 558 | brkr := newTestBroker() | 590 | brkr := newTestBroker() |
402 | 559 | go func() { | 591 | go func() { |
404 | 560 | errCh <- Session(srv, brkr, cfg5msPingInterval, track) | 592 | errCh <- Session(srv, brkr, cfg50msPingInterval, track) |
405 | 561 | }() | 593 | }() |
406 | 562 | cli.Close() | 594 | cli.Close() |
407 | 563 | err := <-errCh | 595 | err := <-errCh |
408 | @@ -575,7 +607,7 @@ | |||
409 | 575 | defer lst.Close() | 607 | defer lst.Close() |
410 | 576 | brkr := newTestBroker() | 608 | brkr := newTestBroker() |
411 | 577 | go func() { | 609 | go func() { |
413 | 578 | errCh <- Session(srv, brkr, cfg5msPingInterval, track) | 610 | errCh <- Session(srv, brkr, cfg50msPingInterval, track) |
414 | 579 | }() | 611 | }() |
415 | 580 | io.WriteString(cli, "\x00") | 612 | io.WriteString(cli, "\x00") |
416 | 581 | io.WriteString(cli, "\x00") | 613 | io.WriteString(cli, "\x00") |
417 | 582 | 614 | ||
418 | === modified file 'server/session/tracker.go' | |||
419 | --- server/session/tracker.go 2014-01-23 20:13:22 +0000 | |||
420 | +++ server/session/tracker.go 2014-01-31 17:25:49 +0000 | |||
421 | @@ -30,6 +30,8 @@ | |||
422 | 30 | Start(WithRemoteAddr) | 30 | Start(WithRemoteAddr) |
423 | 31 | // Session got registered with broker as sess BrokerSession. | 31 | // Session got registered with broker as sess BrokerSession. |
424 | 32 | Registered(sess broker.BrokerSession) | 32 | Registered(sess broker.BrokerSession) |
425 | 33 | // Report effective elapsed ping interval. | ||
426 | 34 | EffectivePingInterval(time.Duration) | ||
427 | 33 | // Session got ended with error err (can be nil). | 35 | // Session got ended with error err (can be nil). |
428 | 34 | End(error) error | 36 | End(error) error |
429 | 35 | } | 37 | } |
430 | @@ -60,6 +62,9 @@ | |||
431 | 60 | trk.Infof("session(%x) registered %v", trk.sessionId, sess.DeviceIdentifier()) | 62 | trk.Infof("session(%x) registered %v", trk.sessionId, sess.DeviceIdentifier()) |
432 | 61 | } | 63 | } |
433 | 62 | 64 | ||
434 | 65 | func (trk *tracker) EffectivePingInterval(time.Duration) { | ||
435 | 66 | } | ||
436 | 67 | |||
437 | 63 | func (trk *tracker) End(err error) error { | 68 | func (trk *tracker) End(err error) error { |
438 | 64 | trk.Debugf("session(%x) ended with: %v", trk.sessionId, err) | 69 | trk.Debugf("session(%x) ended with: %v", trk.sessionId, err) |
439 | 65 | return err | 70 | return err |