Merge lp:~pedronis/ubuntu-push/little-bits-of-info-n-tweaks into lp:ubuntu-push

Proposed by Samuele Pedroni
Status: Merged
Approved by: Samuele Pedroni
Approved revision: 41
Merged at revision: 38
Proposed branch: lp:~pedronis/ubuntu-push/little-bits-of-info-n-tweaks
Merge into: lp:ubuntu-push
Diff against target: 439 lines (+79/-37)
6 files modified
server/broker/broker.go (+2/-2)
server/broker/exchanges.go (+4/-1)
server/broker/exchanges_test.go (+3/-3)
server/session/session.go (+6/-4)
server/session/session_test.go (+59/-27)
server/session/tracker.go (+5/-0)
To merge this branch: bzr merge lp:~pedronis/ubuntu-push/little-bits-of-info-n-tweaks
Reviewer Review Type Date Requested Status
John Lenton (community) Approve
Review via email: mp+204290@code.launchpad.net

Commit message

some more info reported out of session, some tweaks

Description of the change

various bits and bobs but there's logic to the madness:

* increase ping interval/timeout in some test relying in actual timeouts to be less aggressive

* tell the exchange whether it's fully done in Acked

* have the session report what was the effective elapsed ping interval

* remove a SetDeadline too much, exchange timeout is meant to cover pairs read-write, write-read

To post a comment you must log in.
Revision history for this message
John Lenton (chipaca) :
review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'server/broker/broker.go'
2--- server/broker/broker.go 2014-01-23 20:13:22 +0000
3+++ server/broker/broker.go 2014-01-31 17:25:49 +0000
4@@ -40,8 +40,8 @@
5
6 // Exchange leads the session through performing an exchange, typically delivery.
7 type Exchange interface {
8- Prepare(BrokerSession) (outMessage protocol.SplittableMsg, inMessage interface{}, err error)
9- Acked(BrokerSession) error
10+ Prepare(sess BrokerSession) (outMessage protocol.SplittableMsg, inMessage interface{}, err error)
11+ Acked(sess BrokerSession, done bool) error
12 }
13
14 // LevelsMap is the type for holding channel levels for session.
15
16=== modified file 'server/broker/exchanges.go'
17--- server/broker/exchanges.go 2014-01-23 18:39:10 +0000
18+++ server/broker/exchanges.go 2014-01-31 17:25:49 +0000
19@@ -39,6 +39,9 @@
20 NotificationPayloads []json.RawMessage
21 }
22
23+// check interface already here
24+var _ Exchange = &BroadcastExchange{}
25+
26 func filterByLevel(clientLevel, topLevel int64, payloads []json.RawMessage) []json.RawMessage {
27 c := int64(len(payloads))
28 delta := topLevel - clientLevel
29@@ -63,7 +66,7 @@
30 }
31
32 // Acked deals with an ACK for a BROADCAST.
33-func (sbe *BroadcastExchange) Acked(sess BrokerSession) error {
34+func (sbe *BroadcastExchange) Acked(sess BrokerSession, done bool) error {
35 scratchArea := sess.ExchangeScratchArea()
36 if scratchArea.ackMsg.Type != "ack" {
37 return &ErrAbort{"expected ACK message"}
38
39=== modified file 'server/broker/exchanges_test.go'
40--- server/broker/exchanges_test.go 2014-01-23 20:13:22 +0000
41+++ server/broker/exchanges_test.go 2014-01-31 17:25:49 +0000
42@@ -52,7 +52,7 @@
43 c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"x"},{"a":"y"}]}`)
44 err = json.Unmarshal([]byte(`{"T":"ack"}`), outMsg)
45 c.Assert(err, IsNil)
46- err = exchg.Acked(sess)
47+ err = exchg.Acked(sess, true)
48 c.Assert(err, IsNil)
49 c.Check(sess.LevelsMap[store.SystemInternalChannelId], Equals, int64(3))
50 }
51@@ -76,7 +76,7 @@
52 c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"y"}]}`)
53 err = json.Unmarshal([]byte(`{}`), outMsg)
54 c.Assert(err, IsNil)
55- err = exchg.Acked(sess)
56+ err = exchg.Acked(sess, true)
57 c.Assert(err, Not(IsNil))
58 c.Check(sess.LevelsMap[store.SystemInternalChannelId], Equals, int64(0))
59 }
60@@ -103,6 +103,6 @@
61 c.Check(string(marshalled), Equals, `{"T":"broadcast","ChanId":"0","TopLevel":3,"Payloads":[{"a":"y"}]}`)
62 err = json.Unmarshal([]byte(`{"T":"ack"}`), outMsg)
63 c.Assert(err, IsNil)
64- err = exchg.Acked(sess)
65+ err = exchg.Acked(sess, true)
66 c.Assert(err, IsNil)
67 }
68
69=== modified file 'server/session/session.go'
70--- server/session/session.go 2014-01-15 15:54:20 +0000
71+++ server/session/session.go 2014-01-31 17:25:49 +0000
72@@ -44,7 +44,6 @@
73 if connMsg.Type != "connect" {
74 return nil, &broker.ErrAbort{"expected CONNECT message"}
75 }
76- proto.SetDeadline(time.Now().Add(cfg.ExchangeTimeout()))
77 err = proto.WriteMessage(&protocol.ConnAckMsg{
78 Type: "connack",
79 Params: protocol.ConnAckParams{PingInterval: cfg.PingInterval().String()},
80@@ -70,14 +69,16 @@
81 }
82
83 // sessionLoop manages the exchanges of the protocol session.
84-func sessionLoop(proto protocol.Protocol, sess broker.BrokerSession, cfg SessionConfig) error {
85+func sessionLoop(proto protocol.Protocol, sess broker.BrokerSession, cfg SessionConfig, track SessionTracker) error {
86 pingInterval := cfg.PingInterval()
87 exchangeTimeout := cfg.ExchangeTimeout()
88 pingTimer := time.NewTimer(pingInterval)
89+ intervalStart := time.Now()
90 ch := sess.SessionChannel()
91 for {
92 select {
93 case <-pingTimer.C:
94+ track.EffectivePingInterval(time.Since(intervalStart))
95 pingMsg := &protocol.PingPongMsg{"ping"}
96 var pongMsg protocol.PingPongMsg
97 err := exchange(proto, pingMsg, &pongMsg, exchangeTimeout)
98@@ -103,8 +104,9 @@
99 }
100 if done {
101 pingTimer.Reset(pingInterval)
102+ intervalStart = time.Now()
103 }
104- err = exchg.Acked(sess)
105+ err = exchg.Acked(sess, done)
106 if err != nil {
107 return err
108 }
109@@ -134,5 +136,5 @@
110 }
111 track.Registered(sess)
112 defer brkr.Unregister(sess)
113- return track.End(sessionLoop(proto, sess, cfg))
114+ return track.End(sessionLoop(proto, sess, cfg, track))
115 }
116
117=== modified file 'server/session/session_test.go'
118--- server/session/session_test.go 2014-01-23 20:13:22 +0000
119+++ server/session/session_test.go 2014-01-31 17:25:49 +0000
120@@ -147,7 +147,6 @@
121 }()
122 c.Check(takeNext(down), Equals, "deadline 5ms")
123 up <- protocol.ConnectMsg{Type: "connect", ClientVer: "1", DeviceId: "dev-1"}
124- c.Check(takeNext(down), Equals, "deadline 5ms")
125 c.Check(takeNext(down), Equals, protocol.ConnAckMsg{
126 Type: "connack",
127 Params: protocol.ConnAckParams{(10 * time.Millisecond).String()},
128@@ -199,7 +198,6 @@
129 c.Check(err, Equals, io.ErrUnexpectedEOF)
130 // sanity
131 c.Check(takeNext(down), Matches, "deadline.*")
132- c.Check(takeNext(down), Matches, "deadline.*")
133 c.Check(takeNext(down), FitsTypeOf, protocol.ConnAckMsg{})
134 }
135
136@@ -218,13 +216,14 @@
137 }
138
139 func (s *sessionSuite) TestSessionLoop(c *C) {
140+ nopTrack := NewTracker(nopLogger)
141 errCh := make(chan error, 1)
142 up := make(chan interface{}, 5)
143 down := make(chan interface{}, 5)
144 tp := &testProtocol{up, down}
145 sess := &testing.TestBrokerSession{}
146 go func() {
147- errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout)
148+ errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout, nopTrack)
149 }()
150 c.Check(takeNext(down), Equals, "deadline 2ms")
151 c.Check(takeNext(down), DeepEquals, protocol.PingPongMsg{Type: "ping"})
152@@ -239,13 +238,14 @@
153 }
154
155 func (s *sessionSuite) TestSessionLoopWriteError(c *C) {
156+ nopTrack := NewTracker(nopLogger)
157 errCh := make(chan error, 1)
158 up := make(chan interface{}, 5)
159 down := make(chan interface{}, 5)
160 tp := &testProtocol{up, down}
161 sess := &testing.TestBrokerSession{}
162 go func() {
163- errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout)
164+ errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout, nopTrack)
165 }()
166 c.Check(takeNext(down), Equals, "deadline 2ms")
167 c.Check(takeNext(down), FitsTypeOf, protocol.PingPongMsg{})
168@@ -255,13 +255,14 @@
169 }
170
171 func (s *sessionSuite) TestSessionLoopMismatch(c *C) {
172+ nopTrack := NewTracker(nopLogger)
173 errCh := make(chan error, 1)
174 up := make(chan interface{}, 5)
175 down := make(chan interface{}, 5)
176 tp := &testProtocol{up, down}
177 sess := &testing.TestBrokerSession{}
178 go func() {
179- errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout)
180+ errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout, nopTrack)
181 }()
182 c.Check(takeNext(down), Equals, "deadline 2ms")
183 c.Check(takeNext(down), DeepEquals, protocol.PingPongMsg{Type: "ping"})
184@@ -294,18 +295,29 @@
185 finErr error
186 finSleep time.Duration
187 nParts int
188+ done chan interface{}
189 }
190
191 func (exchg *testExchange) Prepare(sess broker.BrokerSession) (outMsg protocol.SplittableMsg, inMsg interface{}, err error) {
192 return &testMsg{Type: "msg", nParts: exchg.nParts}, &exchg.inMsg, exchg.prepErr
193 }
194
195-func (exchg *testExchange) Acked(sess broker.BrokerSession) error {
196+func (exchg *testExchange) Acked(sess broker.BrokerSession, done bool) error {
197 time.Sleep(exchg.finSleep)
198+ if exchg.done != nil {
199+ var doneStr string
200+ if done {
201+ doneStr = "y"
202+ } else {
203+ doneStr = "n"
204+ }
205+ exchg.done <- doneStr
206+ }
207 return exchg.finErr
208 }
209
210 func (s *sessionSuite) TestSessionLoopExchange(c *C) {
211+ nopTrack := NewTracker(nopLogger)
212 errCh := make(chan error, 1)
213 up := make(chan interface{}, 5)
214 down := make(chan interface{}, 5)
215@@ -314,7 +326,7 @@
216 exchanges <- &testExchange{}
217 sess := &testing.TestBrokerSession{Exchanges: exchanges}
218 go func() {
219- errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout)
220+ errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout, nopTrack)
221 }()
222 c.Check(takeNext(down), Equals, "deadline 2ms")
223 c.Check(takeNext(down), DeepEquals, testMsg{Type: "msg"})
224@@ -329,24 +341,28 @@
225 }
226
227 func (s *sessionSuite) TestSessionLoopExchangeSplit(c *C) {
228+ nopTrack := NewTracker(nopLogger)
229 errCh := make(chan error, 1)
230 up := make(chan interface{}, 5)
231 down := make(chan interface{}, 5)
232 tp := &testProtocol{up, down}
233 exchanges := make(chan broker.Exchange, 1)
234- exchanges <- &testExchange{nParts: 2}
235+ exchange := &testExchange{nParts: 2, done: make(chan interface{}, 2)}
236+ exchanges <- exchange
237 sess := &testing.TestBrokerSession{Exchanges: exchanges}
238 go func() {
239- errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout)
240+ errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout, nopTrack)
241 }()
242 c.Check(takeNext(down), Equals, "deadline 2ms")
243 c.Check(takeNext(down), DeepEquals, testMsg{Type: "msg", Part: 1, nParts: 2})
244 up <- nil // no write error
245 up <- testMsg{Type: "ack"}
246+ c.Check(takeNext(exchange.done), Equals, "n")
247 c.Check(takeNext(down), Equals, "deadline 2ms")
248 c.Check(takeNext(down), DeepEquals, testMsg{Type: "msg", Part: 2, nParts: 2})
249 up <- nil // no write error
250 up <- testMsg{Type: "ack"}
251+ c.Check(takeNext(exchange.done), Equals, "y")
252 c.Check(takeNext(down), Equals, "deadline 2ms")
253 c.Check(takeNext(down), DeepEquals, protocol.PingPongMsg{Type: "ping"})
254 up <- nil // no write error
255@@ -356,6 +372,7 @@
256 }
257
258 func (s *sessionSuite) TestSessionLoopExchangePrepareError(c *C) {
259+ nopTrack := NewTracker(nopLogger)
260 errCh := make(chan error, 1)
261 up := make(chan interface{}, 5)
262 down := make(chan interface{}, 5)
263@@ -365,13 +382,14 @@
264 exchanges <- &testExchange{prepErr: prepErr}
265 sess := &testing.TestBrokerSession{Exchanges: exchanges}
266 go func() {
267- errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout)
268+ errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout, nopTrack)
269 }()
270 err := <-errCh
271 c.Check(err, Equals, prepErr)
272 }
273
274 func (s *sessionSuite) TestSessionLoopExchangeAckedError(c *C) {
275+ nopTrack := NewTracker(nopLogger)
276 errCh := make(chan error, 1)
277 up := make(chan interface{}, 5)
278 down := make(chan interface{}, 5)
279@@ -381,7 +399,7 @@
280 exchanges <- &testExchange{finErr: finErr}
281 sess := &testing.TestBrokerSession{Exchanges: exchanges}
282 go func() {
283- errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout)
284+ errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout, nopTrack)
285 }()
286 c.Check(takeNext(down), Equals, "deadline 2ms")
287 c.Check(takeNext(down), DeepEquals, testMsg{Type: "msg"})
288@@ -392,6 +410,7 @@
289 }
290
291 func (s *sessionSuite) TestSessionLoopExchangeWriteError(c *C) {
292+ nopTrack := NewTracker(nopLogger)
293 errCh := make(chan error, 1)
294 up := make(chan interface{}, 5)
295 down := make(chan interface{}, 5)
296@@ -400,7 +419,7 @@
297 exchanges <- &testExchange{}
298 sess := &testing.TestBrokerSession{Exchanges: exchanges}
299 go func() {
300- errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout)
301+ errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout, nopTrack)
302 }()
303 c.Check(takeNext(down), Equals, "deadline 2ms")
304 c.Check(takeNext(down), FitsTypeOf, testMsg{})
305@@ -409,25 +428,38 @@
306 c.Check(err, Equals, io.ErrUnexpectedEOF)
307 }
308
309+type testTracker struct {
310+ SessionTracker
311+ interval chan interface{}
312+}
313+
314+func (trk *testTracker) EffectivePingInterval(interval time.Duration) {
315+ trk.interval <- interval
316+}
317+
318 func (s *sessionSuite) TestSessionLoopExchangeNextPing(c *C) {
319+ track := &testTracker{NewTracker(nopLogger), make(chan interface{}, 1)}
320 errCh := make(chan error, 1)
321 up := make(chan interface{}, 5)
322 down := make(chan interface{}, 5)
323 tp := &testProtocol{up, down}
324 exchanges := make(chan broker.Exchange, 1)
325- exchanges <- &testExchange{finSleep: 3 * time.Millisecond}
326+ exchanges <- &testExchange{finSleep: 6 * time.Millisecond}
327 sess := &testing.TestBrokerSession{Exchanges: exchanges}
328 go func() {
329- errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout)
330+ errCh <- sessionLoop(tp, sess, cfg10msPingInterval5msExchangeTout, track)
331 }()
332- c.Check(takeNext(down), Equals, "deadline 2ms")
333+ c.Check(takeNext(down), Equals, "deadline 5ms")
334 c.Check(takeNext(down), DeepEquals, testMsg{Type: "msg"})
335 up <- nil // no write error
336 up <- testMsg{Type: "ack"}
337- tack := time.Now() // next ping interval starts around here
338- c.Check(takeNext(down), Equals, "deadline 2ms")
339+ // next ping interval starts around here
340+ interval := takeNext(track.interval).(time.Duration)
341+ c.Check(takeNext(down), Equals, "deadline 5ms")
342 c.Check(takeNext(down), DeepEquals, protocol.PingPongMsg{Type: "ping"})
343- c.Check(time.Since(tack) < (3+5)*time.Millisecond, Equals, true)
344+ effectiveOfPing := float64(interval) / float64(10*time.Millisecond)
345+ c.Check(effectiveOfPing > 0.95, Equals, true)
346+ c.Check(effectiveOfPing < 1.15, Equals, true)
347 up <- nil // no write error
348 up <- io.EOF
349 err := <-errCh
350@@ -470,9 +502,9 @@
351 return c.Conn.SetDeadline(t)
352 }
353
354-var cfg5msPingInterval = &testSessionConfig{
355- pingInterval: 5 * time.Millisecond,
356- exchangeTimeout: 5 * time.Millisecond,
357+var cfg50msPingInterval = &testSessionConfig{
358+ pingInterval: 50 * time.Millisecond,
359+ exchangeTimeout: 10 * time.Millisecond,
360 }
361
362 var nopLogger = logger.NewSimpleLogger(ioutil.Discard, "error")
363@@ -486,7 +518,7 @@
364 remSrv := &rememberDeadlineConn{srv, make([]string, 0, 2)}
365 brkr := newTestBroker()
366 go func() {
367- errCh <- Session(remSrv, brkr, cfg5msPingInterval, track)
368+ errCh <- Session(remSrv, brkr, cfg50msPingInterval, track)
369 }()
370 io.WriteString(cli, "\x00")
371 io.WriteString(cli, "\x00\x20{\"T\":\"connect\",\"DeviceId\":\"DEV\"}")
372@@ -494,7 +526,7 @@
373 downStream := bufio.NewReader(cli)
374 msg, err := downStream.ReadBytes(byte('}'))
375 c.Check(err, IsNil)
376- c.Check(msg, DeepEquals, []byte("\x00\x2f{\"T\":\"connack\",\"Params\":{\"PingInterval\":\"5ms\"}"))
377+ c.Check(msg, DeepEquals, []byte("\x00\x30{\"T\":\"connack\",\"Params\":{\"PingInterval\":\"50ms\"}"))
378 // eat the last }
379 rbr, err := downStream.ReadByte()
380 c.Check(err, IsNil)
381@@ -507,7 +539,7 @@
382 c.Check(len(brkr.registration), Equals, 0) // not yet unregistered
383 cli.Close()
384 err = <-errCh
385- c.Check(remSrv.deadlineKind, DeepEquals, []string{"read", "both", "both", "both"})
386+ c.Check(remSrv.deadlineKind, DeepEquals, []string{"read", "both", "both"})
387 c.Check(err, Equals, io.EOF)
388 c.Check(takeNext(brkr.registration), Equals, "unregister DEV")
389 // tracking
390@@ -538,7 +570,7 @@
391 defer lst.Close()
392 brkr := newTestBroker()
393 go func() {
394- errCh <- Session(srv, brkr, cfg5msPingInterval, track)
395+ errCh <- Session(srv, brkr, cfg50msPingInterval, track)
396 }()
397 io.WriteString(cli, "\x10")
398 err := <-errCh
399@@ -557,7 +589,7 @@
400 defer lst.Close()
401 brkr := newTestBroker()
402 go func() {
403- errCh <- Session(srv, brkr, cfg5msPingInterval, track)
404+ errCh <- Session(srv, brkr, cfg50msPingInterval, track)
405 }()
406 cli.Close()
407 err := <-errCh
408@@ -575,7 +607,7 @@
409 defer lst.Close()
410 brkr := newTestBroker()
411 go func() {
412- errCh <- Session(srv, brkr, cfg5msPingInterval, track)
413+ errCh <- Session(srv, brkr, cfg50msPingInterval, track)
414 }()
415 io.WriteString(cli, "\x00")
416 io.WriteString(cli, "\x00")
417
418=== modified file 'server/session/tracker.go'
419--- server/session/tracker.go 2014-01-23 20:13:22 +0000
420+++ server/session/tracker.go 2014-01-31 17:25:49 +0000
421@@ -30,6 +30,8 @@
422 Start(WithRemoteAddr)
423 // Session got registered with broker as sess BrokerSession.
424 Registered(sess broker.BrokerSession)
425+ // Report effective elapsed ping interval.
426+ EffectivePingInterval(time.Duration)
427 // Session got ended with error err (can be nil).
428 End(error) error
429 }
430@@ -60,6 +62,9 @@
431 trk.Infof("session(%x) registered %v", trk.sessionId, sess.DeviceIdentifier())
432 }
433
434+func (trk *tracker) EffectivePingInterval(time.Duration) {
435+}
436+
437 func (trk *tracker) End(err error) error {
438 trk.Debugf("session(%x) ended with: %v", trk.sessionId, err)
439 return err

Subscribers

People subscribed via source and target branches