Merge lp:~pedronis/ubuntu-push/little-bits-of-info-n-tweaks into lp:ubuntu-push

Proposed by Samuele Pedroni
Status: Merged
Approved by: Samuele Pedroni
Approved revision: 41
Merged at revision: 38
Proposed branch: lp:~pedronis/ubuntu-push/little-bits-of-info-n-tweaks
Merge into: lp:ubuntu-push
Diff against target: 439 lines (+79/-37)
6 files modified
server/broker/broker.go (+2/-2)
server/broker/exchanges.go (+4/-1)
server/broker/exchanges_test.go (+3/-3)
server/session/session.go (+6/-4)
server/session/session_test.go (+59/-27)
server/session/tracker.go (+5/-0)
To merge this branch: bzr merge lp:~pedronis/ubuntu-push/little-bits-of-info-n-tweaks
Reviewer Review Type Date Requested Status
John Lenton (community) Approve
Review via email: mp+204290@code.launchpad.net

Commit message

some more info reported out of session, some tweaks

Description of the change

various bits and bobs but there's logic to the madness:

* increase the ping interval/timeout in some tests that rely on actual timeouts, to make them less aggressive

* tell the exchange, via a new done argument to Acked, whether it is fully done

* have the session report the effective elapsed ping interval

* remove one SetDeadline too many; the exchange timeout is meant to cover read-write and write-read pairs

To post a comment you must log in.
Revision history for this message
John Lenton (chipaca) :
review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'server/broker/broker.go'
--- server/broker/broker.go 2014-01-23 20:13:22 +0000
+++ server/broker/broker.go 2014-01-31 17:25:49 +0000
@@ -40,8 +40,8 @@
4040
41// Exchange leads the session through performing an exchange, typically delivery.41// Exchange leads the session through performing an exchange, typically delivery.
42type Exchange interface {42type Exchange interface {
43 Prepare(BrokerSession) (outMessage protocol.SplittableMsg, inMessage interface{}, err error)43 Prepare(sess BrokerSession) (outMessage protocol.SplittableMsg, inMessage interface{}, err error)
44 Acked(BrokerSession) error44 Acked(sess BrokerSession, done bool) error
45}45}
4646
47// LevelsMap is the type for holding channel levels for session.47// LevelsMap is the type for holding channel levels for session.
4848
=== modified file 'server/broker/exchanges.go'
--- server/broker/exchanges.go 2014-01-23 18:39:10 +0000
+++ server/broker/exchanges.go 2014-01-31 17:25:49 +0000
@@ -39,6 +39,9 @@
39 NotificationPayloads []json.RawMessage39 NotificationPayloads []json.RawMessage
40}40}
4141
42// check interface already here
43var _ Exchange = &BroadcastExchange{}
44
42func filterByLevel(clientLevel, topLevel int64, payloads []json.RawMessage) []json.RawMessage {45func filterByLevel(clientLevel, topLevel int64, payloads []json.RawMessage) []json.RawMessage {
43 c := int64(len(payloads))46 c := int64(len(payloads))
44 delta := topLevel - clientLevel47 delta := topLevel - clientLevel
@@ -63,7 +66,7 @@
63}66}
6467
65// Acked deals with an ACK for a BROADCAST.68// Acked deals with an ACK for a BROADCAST.
66func (sbe *BroadcastExchange) Acked(sess BrokerSession) error {69func (sbe *BroadcastExchange) Acked(sess BrokerSession, done bool) error {
67 scratchArea := sess.ExchangeScratchArea()70 scratchArea := sess.ExchangeScratchArea()
68 if scratchArea.ackMsg.Type != "ack" {71 if scratchArea.ackMsg.Type != "ack" {
69 return &ErrAbort{"expected ACK message"}72 return &ErrAbort{"expected ACK message"}
7073
=== modified file 'server/broker/exchanges_test.go'
--- server/broker/exchanges_test.go 2014-01-23 20:13:22 +0000
+++ server/broker/exchanges_test.go 2014-01-31 17:25:49 +0000
@@ -52,7 +52,7 @@
52 c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"x"},{"a":"y"}]}`)52 c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"x"},{"a":"y"}]}`)
53 err = json.Unmarshal([]byte(`{"T":"ack"}`), outMsg)53 err = json.Unmarshal([]byte(`{"T":"ack"}`), outMsg)
54 c.Assert(err, IsNil)54 c.Assert(err, IsNil)
55 err = exchg.Acked(sess)55 err = exchg.Acked(sess, true)
56 c.Assert(err, IsNil)56 c.Assert(err, IsNil)
57 c.Check(sess.LevelsMap[store.SystemInternalChannelId], Equals, int64(3))57 c.Check(sess.LevelsMap[store.SystemInternalChannelId], Equals, int64(3))
58}58}
@@ -76,7 +76,7 @@
76 c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"y"}]}`)76 c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"y"}]}`)
77 err = json.Unmarshal([]byte(`{}`), outMsg)77 err = json.Unmarshal([]byte(`{}`), outMsg)
78 c.Assert(err, IsNil)78 c.Assert(err, IsNil)
79 err = exchg.Acked(sess)79 err = exchg.Acked(sess, true)
80 c.Assert(err, Not(IsNil))80 c.Assert(err, Not(IsNil))
81 c.Check(sess.LevelsMap[store.SystemInternalChannelId], Equals, int64(0))81 c.Check(sess.LevelsMap[store.SystemInternalChannelId], Equals, int64(0))
82}82}
@@ -103,6 +103,6 @@
103 c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"y"}]}`)103 c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"y"}]}`)
104 err = json.Unmarshal([]byte(`{"T":"ack"}`), outMsg)104 err = json.Unmarshal([]byte(`{"T":"ack"}`), outMsg)
105 c.Assert(err, IsNil)105 c.Assert(err, IsNil)
106 err = exchg.Acked(sess)106 err = exchg.Acked(sess, true)
107 c.Assert(err, IsNil)107 c.Assert(err, IsNil)
108}108}
109109
=== modified file 'server/session/session.go'
--- server/session/session.go 2014-01-15 15:54:20 +0000
+++ server/session/session.go 2014-01-31 17:25:49 +0000
@@ -44,7 +44,6 @@
44 if connMsg.Type != "connect" {44 if connMsg.Type != "connect" {
45 return nil, &broker.ErrAbort{"expected CONNECT message"}45 return nil, &broker.ErrAbort{"expected CONNECT message"}
46 }46 }
47 proto.SetDeadline(time.Now().Add(cfg.ExchangeTimeout()))
48 err = proto.WriteMessage(&protocol.ConnAckMsg{47 err = proto.WriteMessage(&protocol.ConnAckMsg{
49 Type: "connack",48 Type: "connack",
50 Params: protocol.ConnAckParams{PingInterval: cfg.PingInterval().String()},49 Params: protocol.ConnAckParams{PingInterval: cfg.PingInterval().String()},
@@ -70,14 +69,16 @@
70}69}
7170
72// sessionLoop manages the exchanges of the protocol session.71// sessionLoop manages the exchanges of the protocol session.
73func sessionLoop(proto protocol.Protocol, sess broker.BrokerSession, cfg SessionConfig) error {72func sessionLoop(proto protocol.Protocol, sess broker.BrokerSession, cfg SessionConfig, track SessionTracker) error {
74 pingInterval := cfg.PingInterval()73 pingInterval := cfg.PingInterval()
75 exchangeTimeout := cfg.ExchangeTimeout()74 exchangeTimeout := cfg.ExchangeTimeout()
76 pingTimer := time.NewTimer(pingInterval)75 pingTimer := time.NewTimer(pingInterval)
76 intervalStart := time.Now()
77 ch := sess.SessionChannel()77 ch := sess.SessionChannel()
78 for {78 for {
79 select {79 select {
80 case <-pingTimer.C:80 case <-pingTimer.C:
81 track.EffectivePingInterval(time.Since(intervalStart))
81 pingMsg := &protocol.PingPongMsg{"ping"}82 pingMsg := &protocol.PingPongMsg{"ping"}
82 var pongMsg protocol.PingPongMsg83 var pongMsg protocol.PingPongMsg
83 err := exchange(proto, pingMsg, &pongMsg, exchangeTimeout)84 err := exchange(proto, pingMsg, &pongMsg, exchangeTimeout)
@@ -103,8 +104,9 @@
103 }104 }
104 if done {105 if done {
105 pingTimer.Reset(pingInterval)106 pingTimer.Reset(pingInterval)
107 intervalStart = time.Now()
106 }108 }
107 err = exchg.Acked(sess)109 err = exchg.Acked(sess, done)
108 if err != nil {110 if err != nil {
109 return err111 return err
110 }112 }
@@ -134,5 +136,5 @@
134 }136 }
135 track.Registered(sess)137 track.Registered(sess)
136 defer brkr.Unregister(sess)138 defer brkr.Unregister(sess)
137 return track.End(sessionLoop(proto, sess, cfg))139 return track.End(sessionLoop(proto, sess, cfg, track))
138}140}
139141
=== modified file 'server/session/session_test.go'
--- server/session/session_test.go 2014-01-23 20:13:22 +0000
+++ server/session/session_test.go 2014-01-31 17:25:49 +0000
@@ -147,7 +147,6 @@
147 }()147 }()
148 c.Check(takeNext(down), Equals, "deadline 5ms")148 c.Check(takeNext(down), Equals, "deadline 5ms")
149 up <- protocol.ConnectMsg{Type: "connect", ClientVer: "1", DeviceId: "dev-1"}149 up <- protocol.ConnectMsg{Type: "connect", ClientVer: "1", DeviceId: "dev-1"}
150 c.Check(takeNext(down), Equals, "deadline 5ms")
151 c.Check(takeNext(down), Equals, protocol.ConnAckMsg{150 c.Check(takeNext(down), Equals, protocol.ConnAckMsg{
152 Type: "connack",151 Type: "connack",
153 Params: protocol.ConnAckParams{(10 * time.Millisecond).String()},152 Params: protocol.ConnAckParams{(10 * time.Millisecond).String()},
@@ -199,7 +198,6 @@
199 c.Check(err, Equals, io.ErrUnexpectedEOF)198 c.Check(err, Equals, io.ErrUnexpectedEOF)
200 // sanity199 // sanity
201 c.Check(takeNext(down), Matches, "deadline.*")200 c.Check(takeNext(down), Matches, "deadline.*")
202 c.Check(takeNext(down), Matches, "deadline.*")
203 c.Check(takeNext(down), FitsTypeOf, protocol.ConnAckMsg{})201 c.Check(takeNext(down), FitsTypeOf, protocol.ConnAckMsg{})
204}202}
205203
@@ -218,13 +216,14 @@
218}216}
219217
220func (s *sessionSuite) TestSessionLoop(c *C) {218func (s *sessionSuite) TestSessionLoop(c *C) {
219 nopTrack := NewTracker(nopLogger)
221 errCh := make(chan error, 1)220 errCh := make(chan error, 1)
222 up := make(chan interface{}, 5)221 up := make(chan interface{}, 5)
223 down := make(chan interface{}, 5)222 down := make(chan interface{}, 5)
224 tp := &testProtocol{up, down}223 tp := &testProtocol{up, down}
225 sess := &testing.TestBrokerSession{}224 sess := &testing.TestBrokerSession{}
226 go func() {225 go func() {
227 errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout)226 errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout, nopTrack)
228 }()227 }()
229 c.Check(takeNext(down), Equals, "deadline 2ms")228 c.Check(takeNext(down), Equals, "deadline 2ms")
230 c.Check(takeNext(down), DeepEquals, protocol.PingPongMsg{Type: "ping"})229 c.Check(takeNext(down), DeepEquals, protocol.PingPongMsg{Type: "ping"})
@@ -239,13 +238,14 @@
239}238}
240239
241func (s *sessionSuite) TestSessionLoopWriteError(c *C) {240func (s *sessionSuite) TestSessionLoopWriteError(c *C) {
241 nopTrack := NewTracker(nopLogger)
242 errCh := make(chan error, 1)242 errCh := make(chan error, 1)
243 up := make(chan interface{}, 5)243 up := make(chan interface{}, 5)
244 down := make(chan interface{}, 5)244 down := make(chan interface{}, 5)
245 tp := &testProtocol{up, down}245 tp := &testProtocol{up, down}
246 sess := &testing.TestBrokerSession{}246 sess := &testing.TestBrokerSession{}
247 go func() {247 go func() {
248 errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout)248 errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout, nopTrack)
249 }()249 }()
250 c.Check(takeNext(down), Equals, "deadline 2ms")250 c.Check(takeNext(down), Equals, "deadline 2ms")
251 c.Check(takeNext(down), FitsTypeOf, protocol.PingPongMsg{})251 c.Check(takeNext(down), FitsTypeOf, protocol.PingPongMsg{})
@@ -255,13 +255,14 @@
255}255}
256256
257func (s *sessionSuite) TestSessionLoopMismatch(c *C) {257func (s *sessionSuite) TestSessionLoopMismatch(c *C) {
258 nopTrack := NewTracker(nopLogger)
258 errCh := make(chan error, 1)259 errCh := make(chan error, 1)
259 up := make(chan interface{}, 5)260 up := make(chan interface{}, 5)
260 down := make(chan interface{}, 5)261 down := make(chan interface{}, 5)
261 tp := &testProtocol{up, down}262 tp := &testProtocol{up, down}
262 sess := &testing.TestBrokerSession{}263 sess := &testing.TestBrokerSession{}
263 go func() {264 go func() {
264 errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout)265 errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout, nopTrack)
265 }()266 }()
266 c.Check(takeNext(down), Equals, "deadline 2ms")267 c.Check(takeNext(down), Equals, "deadline 2ms")
267 c.Check(takeNext(down), DeepEquals, protocol.PingPongMsg{Type: "ping"})268 c.Check(takeNext(down), DeepEquals, protocol.PingPongMsg{Type: "ping"})
@@ -294,18 +295,29 @@
294 finErr error295 finErr error
295 finSleep time.Duration296 finSleep time.Duration
296 nParts int297 nParts int
298 done chan interface{}
297}299}
298300
299func (exchg *testExchange) Prepare(sess broker.BrokerSession) (outMsg protocol.SplittableMsg, inMsg interface{}, err error) {301func (exchg *testExchange) Prepare(sess broker.BrokerSession) (outMsg protocol.SplittableMsg, inMsg interface{}, err error) {
300 return &testMsg{Type: "msg", nParts: exchg.nParts}, &exchg.inMsg, exchg.prepErr302 return &testMsg{Type: "msg", nParts: exchg.nParts}, &exchg.inMsg, exchg.prepErr
301}303}
302304
303func (exchg *testExchange) Acked(sess broker.BrokerSession) error {305func (exchg *testExchange) Acked(sess broker.BrokerSession, done bool) error {
304 time.Sleep(exchg.finSleep)306 time.Sleep(exchg.finSleep)
307 if exchg.done != nil {
308 var doneStr string
309 if done {
310 doneStr = "y"
311 } else {
312 doneStr = "n"
313 }
314 exchg.done <- doneStr
315 }
305 return exchg.finErr316 return exchg.finErr
306}317}
307318
308func (s *sessionSuite) TestSessionLoopExchange(c *C) {319func (s *sessionSuite) TestSessionLoopExchange(c *C) {
320 nopTrack := NewTracker(nopLogger)
309 errCh := make(chan error, 1)321 errCh := make(chan error, 1)
310 up := make(chan interface{}, 5)322 up := make(chan interface{}, 5)
311 down := make(chan interface{}, 5)323 down := make(chan interface{}, 5)
@@ -314,7 +326,7 @@
314 exchanges <- &testExchange{}326 exchanges <- &testExchange{}
315 sess := &testing.TestBrokerSession{Exchanges: exchanges}327 sess := &testing.TestBrokerSession{Exchanges: exchanges}
316 go func() {328 go func() {
317 errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout)329 errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout, nopTrack)
318 }()330 }()
319 c.Check(takeNext(down), Equals, "deadline 2ms")331 c.Check(takeNext(down), Equals, "deadline 2ms")
320 c.Check(takeNext(down), DeepEquals, testMsg{Type: "msg"})332 c.Check(takeNext(down), DeepEquals, testMsg{Type: "msg"})
@@ -329,24 +341,28 @@
329}341}
330342
331func (s *sessionSuite) TestSessionLoopExchangeSplit(c *C) {343func (s *sessionSuite) TestSessionLoopExchangeSplit(c *C) {
344 nopTrack := NewTracker(nopLogger)
332 errCh := make(chan error, 1)345 errCh := make(chan error, 1)
333 up := make(chan interface{}, 5)346 up := make(chan interface{}, 5)
334 down := make(chan interface{}, 5)347 down := make(chan interface{}, 5)
335 tp := &testProtocol{up, down}348 tp := &testProtocol{up, down}
336 exchanges := make(chan broker.Exchange, 1)349 exchanges := make(chan broker.Exchange, 1)
337 exchanges <- &testExchange{nParts: 2}350 exchange := &testExchange{nParts: 2, done: make(chan interface{}, 2)}
351 exchanges <- exchange
338 sess := &testing.TestBrokerSession{Exchanges: exchanges}352 sess := &testing.TestBrokerSession{Exchanges: exchanges}
339 go func() {353 go func() {
340 errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout)354 errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout, nopTrack)
341 }()355 }()
342 c.Check(takeNext(down), Equals, "deadline 2ms")356 c.Check(takeNext(down), Equals, "deadline 2ms")
343 c.Check(takeNext(down), DeepEquals, testMsg{Type: "msg", Part: 1, nParts: 2})357 c.Check(takeNext(down), DeepEquals, testMsg{Type: "msg", Part: 1, nParts: 2})
344 up <- nil // no write error358 up <- nil // no write error
345 up <- testMsg{Type: "ack"}359 up <- testMsg{Type: "ack"}
360 c.Check(takeNext(exchange.done), Equals, "n")
346 c.Check(takeNext(down), Equals, "deadline 2ms")361 c.Check(takeNext(down), Equals, "deadline 2ms")
347 c.Check(takeNext(down), DeepEquals, testMsg{Type: "msg", Part: 2, nParts: 2})362 c.Check(takeNext(down), DeepEquals, testMsg{Type: "msg", Part: 2, nParts: 2})
348 up <- nil // no write error363 up <- nil // no write error
349 up <- testMsg{Type: "ack"}364 up <- testMsg{Type: "ack"}
365 c.Check(takeNext(exchange.done), Equals, "y")
350 c.Check(takeNext(down), Equals, "deadline 2ms")366 c.Check(takeNext(down), Equals, "deadline 2ms")
351 c.Check(takeNext(down), DeepEquals, protocol.PingPongMsg{Type: "ping"})367 c.Check(takeNext(down), DeepEquals, protocol.PingPongMsg{Type: "ping"})
352 up <- nil // no write error368 up <- nil // no write error
@@ -356,6 +372,7 @@
356}372}
357373
358func (s *sessionSuite) TestSessionLoopExchangePrepareError(c *C) {374func (s *sessionSuite) TestSessionLoopExchangePrepareError(c *C) {
375 nopTrack := NewTracker(nopLogger)
359 errCh := make(chan error, 1)376 errCh := make(chan error, 1)
360 up := make(chan interface{}, 5)377 up := make(chan interface{}, 5)
361 down := make(chan interface{}, 5)378 down := make(chan interface{}, 5)
@@ -365,13 +382,14 @@
365 exchanges <- &testExchange{prepErr: prepErr}382 exchanges <- &testExchange{prepErr: prepErr}
366 sess := &testing.TestBrokerSession{Exchanges: exchanges}383 sess := &testing.TestBrokerSession{Exchanges: exchanges}
367 go func() {384 go func() {
368 errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout)385 errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout, nopTrack)
369 }()386 }()
370 err := <-errCh387 err := <-errCh
371 c.Check(err, Equals, prepErr)388 c.Check(err, Equals, prepErr)
372}389}
373390
374func (s *sessionSuite) TestSessionLoopExchangeAckedError(c *C) {391func (s *sessionSuite) TestSessionLoopExchangeAckedError(c *C) {
392 nopTrack := NewTracker(nopLogger)
375 errCh := make(chan error, 1)393 errCh := make(chan error, 1)
376 up := make(chan interface{}, 5)394 up := make(chan interface{}, 5)
377 down := make(chan interface{}, 5)395 down := make(chan interface{}, 5)
@@ -381,7 +399,7 @@
381 exchanges <- &testExchange{finErr: finErr}399 exchanges <- &testExchange{finErr: finErr}
382 sess := &testing.TestBrokerSession{Exchanges: exchanges}400 sess := &testing.TestBrokerSession{Exchanges: exchanges}
383 go func() {401 go func() {
384 errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout)402 errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout, nopTrack)
385 }()403 }()
386 c.Check(takeNext(down), Equals, "deadline 2ms")404 c.Check(takeNext(down), Equals, "deadline 2ms")
387 c.Check(takeNext(down), DeepEquals, testMsg{Type: "msg"})405 c.Check(takeNext(down), DeepEquals, testMsg{Type: "msg"})
@@ -392,6 +410,7 @@
392}410}
393411
394func (s *sessionSuite) TestSessionLoopExchangeWriteError(c *C) {412func (s *sessionSuite) TestSessionLoopExchangeWriteError(c *C) {
413 nopTrack := NewTracker(nopLogger)
395 errCh := make(chan error, 1)414 errCh := make(chan error, 1)
396 up := make(chan interface{}, 5)415 up := make(chan interface{}, 5)
397 down := make(chan interface{}, 5)416 down := make(chan interface{}, 5)
@@ -400,7 +419,7 @@
400 exchanges <- &testExchange{}419 exchanges <- &testExchange{}
401 sess := &testing.TestBrokerSession{Exchanges: exchanges}420 sess := &testing.TestBrokerSession{Exchanges: exchanges}
402 go func() {421 go func() {
403 errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout)422 errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout, nopTrack)
404 }()423 }()
405 c.Check(takeNext(down), Equals, "deadline 2ms")424 c.Check(takeNext(down), Equals, "deadline 2ms")
406 c.Check(takeNext(down), FitsTypeOf, testMsg{})425 c.Check(takeNext(down), FitsTypeOf, testMsg{})
@@ -409,25 +428,38 @@
409 c.Check(err, Equals, io.ErrUnexpectedEOF)428 c.Check(err, Equals, io.ErrUnexpectedEOF)
410}429}
411430
431type testTracker struct {
432 SessionTracker
433 interval chan interface{}
434}
435
436func (trk *testTracker) EffectivePingInterval(interval time.Duration) {
437 trk.interval <- interval
438}
439
412func (s *sessionSuite) TestSessionLoopExchangeNextPing(c *C) {440func (s *sessionSuite) TestSessionLoopExchangeNextPing(c *C) {
441 track := &testTracker{NewTracker(nopLogger), make(chan interface{}, 1)}
413 errCh := make(chan error, 1)442 errCh := make(chan error, 1)
414 up := make(chan interface{}, 5)443 up := make(chan interface{}, 5)
415 down := make(chan interface{}, 5)444 down := make(chan interface{}, 5)
416 tp := &testProtocol{up, down}445 tp := &testProtocol{up, down}
417 exchanges := make(chan broker.Exchange, 1)446 exchanges := make(chan broker.Exchange, 1)
418 exchanges <- &testExchange{finSleep: 3 * time.Millisecond}447 exchanges <- &testExchange{finSleep: 6 * time.Millisecond}
419 sess := &testing.TestBrokerSession{Exchanges: exchanges}448 sess := &testing.TestBrokerSession{Exchanges: exchanges}
420 go func() {449 go func() {
421 errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout)450 errCh <- sessionLoop(tp, sess, cfg10msPingInterval5msExchangeTout, track)
422 }()451 }()
423 c.Check(takeNext(down), Equals, "deadline 2ms")452 c.Check(takeNext(down), Equals, "deadline 5ms")
424 c.Check(takeNext(down), DeepEquals, testMsg{Type: "msg"})453 c.Check(takeNext(down), DeepEquals, testMsg{Type: "msg"})
425 up <- nil // no write error454 up <- nil // no write error
426 up <- testMsg{Type: "ack"}455 up <- testMsg{Type: "ack"}
427 tack := time.Now() // next ping interval starts around here456 // next ping interval starts around here
428 c.Check(takeNext(down), Equals, "deadline 2ms")457 interval := takeNext(track.interval).(time.Duration)
458 c.Check(takeNext(down), Equals, "deadline 5ms")
429 c.Check(takeNext(down), DeepEquals, protocol.PingPongMsg{Type: "ping"})459 c.Check(takeNext(down), DeepEquals, protocol.PingPongMsg{Type: "ping"})
430 c.Check(time.Since(tack) < (3+5)*time.Millisecond, Equals, true)460 effectiveOfPing := float64(interval) / float64(10*time.Millisecond)
461 c.Check(effectiveOfPing > 0.95, Equals, true)
462 c.Check(effectiveOfPing < 1.15, Equals, true)
431 up <- nil // no write error463 up <- nil // no write error
432 up <- io.EOF464 up <- io.EOF
433 err := <-errCh465 err := <-errCh
@@ -470,9 +502,9 @@
470 return c.Conn.SetDeadline(t)502 return c.Conn.SetDeadline(t)
471}503}
472504
473var cfg5msPingInterval = &testSessionConfig{505var cfg50msPingInterval = &testSessionConfig{
474 pingInterval: 5 * time.Millisecond,506 pingInterval: 50 * time.Millisecond,
475 exchangeTimeout: 5 * time.Millisecond,507 exchangeTimeout: 10 * time.Millisecond,
476}508}
477509
478var nopLogger = logger.NewSimpleLogger(ioutil.Discard, "error")510var nopLogger = logger.NewSimpleLogger(ioutil.Discard, "error")
@@ -486,7 +518,7 @@
486 remSrv := &rememberDeadlineConn{srv, make([]string, 0, 2)}518 remSrv := &rememberDeadlineConn{srv, make([]string, 0, 2)}
487 brkr := newTestBroker()519 brkr := newTestBroker()
488 go func() {520 go func() {
489 errCh <- Session(remSrv, brkr, cfg5msPingInterval, track)521 errCh <- Session(remSrv, brkr, cfg50msPingInterval, track)
490 }()522 }()
491 io.WriteString(cli, "\x00")523 io.WriteString(cli, "\x00")
492 io.WriteString(cli, "\x00\x20{\"T\":\"connect\",\"DeviceId\":\"DEV\"}")524 io.WriteString(cli, "\x00\x20{\"T\":\"connect\",\"DeviceId\":\"DEV\"}")
@@ -494,7 +526,7 @@
494 downStream := bufio.NewReader(cli)526 downStream := bufio.NewReader(cli)
495 msg, err := downStream.ReadBytes(byte('}'))527 msg, err := downStream.ReadBytes(byte('}'))
496 c.Check(err, IsNil)528 c.Check(err, IsNil)
497 c.Check(msg, DeepEquals, []byte("\x00\x2f{\"T\":\"connack\",\"Params\":{\"PingInterval\":\"5ms\"}"))529 c.Check(msg, DeepEquals, []byte("\x00\x30{\"T\":\"connack\",\"Params\":{\"PingInterval\":\"50ms\"}"))
498 // eat the last }530 // eat the last }
499 rbr, err := downStream.ReadByte()531 rbr, err := downStream.ReadByte()
500 c.Check(err, IsNil)532 c.Check(err, IsNil)
@@ -507,7 +539,7 @@
507 c.Check(len(brkr.registration), Equals, 0) // not yet unregistered539 c.Check(len(brkr.registration), Equals, 0) // not yet unregistered
508 cli.Close()540 cli.Close()
509 err = <-errCh541 err = <-errCh
510 c.Check(remSrv.deadlineKind, DeepEquals, []string{"read", "both", "both", "both"})542 c.Check(remSrv.deadlineKind, DeepEquals, []string{"read", "both", "both"})
511 c.Check(err, Equals, io.EOF)543 c.Check(err, Equals, io.EOF)
512 c.Check(takeNext(brkr.registration), Equals, "unregister DEV")544 c.Check(takeNext(brkr.registration), Equals, "unregister DEV")
513 // tracking545 // tracking
@@ -538,7 +570,7 @@
538 defer lst.Close()570 defer lst.Close()
539 brkr := newTestBroker()571 brkr := newTestBroker()
540 go func() {572 go func() {
541 errCh <- Session(srv, brkr, cfg5msPingInterval, track)573 errCh <- Session(srv, brkr, cfg50msPingInterval, track)
542 }()574 }()
543 io.WriteString(cli, "\x10")575 io.WriteString(cli, "\x10")
544 err := <-errCh576 err := <-errCh
@@ -557,7 +589,7 @@
557 defer lst.Close()589 defer lst.Close()
558 brkr := newTestBroker()590 brkr := newTestBroker()
559 go func() {591 go func() {
560 errCh <- Session(srv, brkr, cfg5msPingInterval, track)592 errCh <- Session(srv, brkr, cfg50msPingInterval, track)
561 }()593 }()
562 cli.Close()594 cli.Close()
563 err := <-errCh595 err := <-errCh
@@ -575,7 +607,7 @@
575 defer lst.Close()607 defer lst.Close()
576 brkr := newTestBroker()608 brkr := newTestBroker()
577 go func() {609 go func() {
578 errCh <- Session(srv, brkr, cfg5msPingInterval, track)610 errCh <- Session(srv, brkr, cfg50msPingInterval, track)
579 }()611 }()
580 io.WriteString(cli, "\x00")612 io.WriteString(cli, "\x00")
581 io.WriteString(cli, "\x00")613 io.WriteString(cli, "\x00")
582614
=== modified file 'server/session/tracker.go'
--- server/session/tracker.go 2014-01-23 20:13:22 +0000
+++ server/session/tracker.go 2014-01-31 17:25:49 +0000
@@ -30,6 +30,8 @@
30 Start(WithRemoteAddr)30 Start(WithRemoteAddr)
31 // Session got registered with broker as sess BrokerSession.31 // Session got registered with broker as sess BrokerSession.
32 Registered(sess broker.BrokerSession)32 Registered(sess broker.BrokerSession)
33 // Report effective elapsed ping interval.
34 EffectivePingInterval(time.Duration)
33 // Session got ended with error err (can be nil).35 // Session got ended with error err (can be nil).
34 End(error) error36 End(error) error
35}37}
@@ -60,6 +62,9 @@
60 trk.Infof("session(%x) registered %v", trk.sessionId, sess.DeviceIdentifier())62 trk.Infof("session(%x) registered %v", trk.sessionId, sess.DeviceIdentifier())
61}63}
6264
65func (trk *tracker) EffectivePingInterval(time.Duration) {
66}
67
63func (trk *tracker) End(err error) error {68func (trk *tracker) End(err error) error {
64 trk.Debugf("session(%x) ended with: %v", trk.sessionId, err)69 trk.Debugf("session(%x) ended with: %v", trk.sessionId, err)
65 return err70 return err

Subscribers

People subscribed via source and target branches