Merge lp:~pedronis/ubuntu-push/kick-out-replaced into lp:ubuntu-push

Proposed by Samuele Pedroni
Status: Superseded
Proposed branch: lp:~pedronis/ubuntu-push/kick-out-replaced
Merge into: lp:ubuntu-push
Diff against target: 565 lines (+283/-20)
12 files modified
client/session/session.go (+21/-1)
client/session/session_test.go (+69/-4)
protocol/messages.go (+21/-5)
protocol/messages_test.go (+4/-0)
server/acceptance/suites/broadcast.go (+0/-2)
server/broker/broker.go (+4/-0)
server/broker/exchanges.go (+32/-6)
server/broker/exchanges_test.go (+52/-0)
server/broker/simple/simple.go (+4/-0)
server/broker/testsuite/suite.go (+8/-0)
server/session/session.go (+13/-2)
server/session/session_test.go (+55/-0)
To merge this branch: bzr merge lp:~pedronis/ubuntu-push/kick-out-replaced
| Reviewer | Review Type | Date Requested | Status |
| Ubuntu Push Hackers | | | Pending |
Review via email: mp+215374@code.launchpad.net

Commit message

explicitly kick out superseded sessions

Description of the change

explicitly kick out superseded sessions

To post a comment you must log in.

Unmerged revisions

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'client/session/session.go'
2--- client/session/session.go 2014-04-04 12:28:40 +0000
3+++ client/session/session.go 2014-04-11 09:14:12 +0000
4@@ -49,6 +49,7 @@
5 Type string `json:"T"`
6 protocol.BroadcastMsg
7 protocol.NotificationsMsg
8+ protocol.ConnBrokenMsg
9 }
10
11 // parseServerAddrSpec recognizes whether spec is a HTTP URL to get
12@@ -176,7 +177,7 @@
13 // getHosts sets deliveryHosts possibly querying a remote endpoint
14 func (sess *ClientSession) getHosts() error {
15 if sess.getHost != nil {
16- if sess.timeSince(sess.deliveryHostsTimestamp) < sess.HostsCachingExpiryTime {
17+ if sess.deliveryHosts != nil && sess.timeSince(sess.deliveryHostsTimestamp) < sess.HostsCachingExpiryTime {
18 return nil
19 }
20 hosts, err := sess.getHost.Get()
21@@ -193,6 +194,10 @@
22 return nil
23 }
24
25+func (sess *ClientSession) resetHosts() {
26+ sess.deliveryHosts = nil
27+}
28+
29 // startConnectionAttempt/nextHostToTry help connect iterating over candidate hosts
30
31 func (sess *ClientSession) startConnectionAttempt() {
32@@ -338,6 +343,19 @@
33 return nil
34 }
35
36+// handle "connbroken" messages
37+func (sess *ClientSession) handleConnBroken(connBroken *serverMsg) error {
38+ sess.setState(Error)
39+ reason := connBroken.Reason
40+ err := fmt.Errorf("server broke connection: %s", reason)
41+ sess.Log.Errorf("%s", err)
42+ switch reason {
43+ case protocol.BrokenHostMismatch:
44+ sess.resetHosts()
45+ }
46+ return err
47+}
48+
49 // loop runs the session with the server, emits a stream of events.
50 func (sess *ClientSession) loop() error {
51 var err error
52@@ -356,6 +374,8 @@
53 err = sess.handlePing()
54 case "broadcast":
55 err = sess.handleBroadcast(&recv)
56+ case "connbroken":
57+ err = sess.handleConnBroken(&recv)
58 }
59 if err != nil {
60 return err
61
62=== modified file 'client/session/session_test.go'
63--- client/session/session_test.go 2014-04-03 20:56:25 +0000
64+++ client/session/session_test.go 2014-04-11 09:14:12 +0000
65@@ -317,6 +317,29 @@
66 c.Check(sess.deliveryHosts, DeepEquals, []string{"baz:443"})
67 }
68
69+func (cs *clientSessionSuite) TestGetHostsRemoteCachingReset(c *C) {
70+ hostGetter := &testHostGetter{[]string{"foo:443", "bar:443"}, nil}
71+ sess := &ClientSession{
72+ getHost: hostGetter,
73+ ClientSessionConfig: ClientSessionConfig{
74+ HostsCachingExpiryTime: 2 * time.Hour,
75+ },
76+ timeSince: time.Since,
77+ }
78+ err := sess.getHosts()
79+ c.Assert(err, IsNil)
80+ hostGetter.hosts = []string{"baz:443"}
81+ // cached
82+ err = sess.getHosts()
83+ c.Assert(err, IsNil)
84+ c.Check(sess.deliveryHosts, DeepEquals, []string{"foo:443", "bar:443"})
85+ // reset
86+ sess.resetHosts()
87+ err = sess.getHosts()
88+ c.Assert(err, IsNil)
89+ c.Check(sess.deliveryHosts, DeepEquals, []string{"baz:443"})
90+}
91+
92 /****************************************************************
93 startConnectionAttempt()/nextHostToTry()/started tests
94 ****************************************************************/
95@@ -587,7 +610,7 @@
96 json.RawMessage("false"), // shouldn't happen but robust
97 json.RawMessage(`{"img1/m1":[102,"tubular"]}`),
98 },
99- }, protocol.NotificationsMsg{}}
100+ }, protocol.NotificationsMsg{}, protocol.ConnBrokenMsg{}}
101 go func() { s.errCh <- s.sess.handleBroadcast(&msg) }()
102 c.Check(takeNext(s.downCh), Equals, protocol.AckMsg{"ack"})
103 s.upCh <- nil // ack ok
104@@ -618,7 +641,7 @@
105 ChanId: "0",
106 TopLevel: 2,
107 Payloads: []json.RawMessage{json.RawMessage(`{"b":1}`)},
108- }, protocol.NotificationsMsg{}}
109+ }, protocol.NotificationsMsg{}, protocol.ConnBrokenMsg{}}
110 go func() { s.errCh <- s.sess.handleBroadcast(&msg) }()
111 c.Check(takeNext(s.downCh), Equals, protocol.AckMsg{"ack"})
112 failure := errors.New("ACK ACK ACK")
113@@ -635,7 +658,7 @@
114 ChanId: "something awful",
115 TopLevel: 2,
116 Payloads: []json.RawMessage{json.RawMessage(`{"b":1}`)},
117- }, protocol.NotificationsMsg{}}
118+ }, protocol.NotificationsMsg{}, protocol.ConnBrokenMsg{}}
119 go func() { s.errCh <- s.sess.handleBroadcast(&msg) }()
120 c.Check(takeNext(s.downCh), Equals, protocol.AckMsg{"ack"})
121 s.upCh <- nil // ack ok
122@@ -652,7 +675,7 @@
123 ChanId: "0",
124 TopLevel: 2,
125 Payloads: []json.RawMessage{json.RawMessage(`{"b":1}`)},
126- }, protocol.NotificationsMsg{}}
127+ }, protocol.NotificationsMsg{}, protocol.ConnBrokenMsg{}}
128 go func() { s.errCh <- s.sess.handleBroadcast(&msg) }()
129 s.upCh <- nil // ack ok
130 // start returns with error
131@@ -665,6 +688,37 @@
132 }
133
134 /****************************************************************
135+ handleConnBroken() tests
136+****************************************************************/
137+
138+func (s *msgSuite) TestHandleConnBrokenUnkwown(c *C) {
139+ msg := serverMsg{"connbroken",
140+ protocol.BroadcastMsg{}, protocol.NotificationsMsg{},
141+ protocol.ConnBrokenMsg{
142+ Reason: "REASON",
143+ },
144+ }
145+ go func() { s.errCh <- s.sess.handleConnBroken(&msg) }()
146+ c.Check(<-s.errCh, ErrorMatches, "server broke connection: REASON")
147+ c.Check(s.sess.State(), Equals, Error)
148+}
149+
150+func (s *msgSuite) TestHandleConnBrokenHostMismatch(c *C) {
151+ msg := serverMsg{"connbroken",
152+ protocol.BroadcastMsg{}, protocol.NotificationsMsg{},
153+ protocol.ConnBrokenMsg{
154+ Reason: protocol.BrokenHostMismatch,
155+ },
156+ }
157+ s.sess.deliveryHosts = []string{"foo:443", "bar:443"}
158+ go func() { s.errCh <- s.sess.handleConnBroken(&msg) }()
159+ c.Check(<-s.errCh, ErrorMatches, "server broke connection: host-mismatch")
160+ c.Check(s.sess.State(), Equals, Error)
161+ // hosts were reset
162+ c.Check(s.sess.deliveryHosts, IsNil)
163+}
164+
165+/****************************************************************
166 loop() tests
167 ****************************************************************/
168
169@@ -728,6 +782,17 @@
170 c.Check(<-s.errCh, Equals, failure)
171 }
172
173+func (s *loopSuite) TestLoopConnBroken(c *C) {
174+ c.Check(s.sess.State(), Equals, Running)
175+ broken := protocol.ConnBrokenMsg{
176+ Type: "connbroken",
177+ Reason: "REASON",
178+ }
179+ c.Check(takeNext(s.downCh), Equals, "deadline 1ms")
180+ s.upCh <- broken
181+ c.Check(<-s.errCh, NotNil)
182+}
183+
184 /****************************************************************
185 start() tests
186 ****************************************************************/
187
188=== modified file 'protocol/messages.go'
189--- protocol/messages.go 2014-03-19 22:31:20 +0000
190+++ protocol/messages.go 2014-04-11 09:14:12 +0000
191@@ -49,6 +49,27 @@
192 PingInterval string
193 }
194
195+// SplittableMsg are messages that may require and are capable of splitting.
196+type SplittableMsg interface {
197+ Split() (done bool)
198+}
199+
200+// CONNBROKEN message, server side is breaking the connection for reason.
201+type ConnBrokenMsg struct {
202+ Type string `json:"T"`
203+ // reason
204+ Reason string
205+}
206+
207+func (m *ConnBrokenMsg) Split() bool {
208+ return true
209+}
210+
211+// CONNBROKEN reasons
212+const (
213+ BrokenHostMismatch = "host-mismatch"
214+)
215+
216 // PING/PONG messages
217 type PingPongMsg struct {
218 Type string `json:"T"`
219@@ -56,11 +77,6 @@
220
221 const maxPayloadSize = 62 * 1024
222
223-// SplittableMsg are messages that may require and are capable of splitting.
224-type SplittableMsg interface {
225- Split() (done bool)
226-}
227-
228 // BROADCAST messages
229 type BroadcastMsg struct {
230 Type string `json:"T"`
231
232=== modified file 'protocol/messages_test.go'
233--- protocol/messages_test.go 2014-02-26 16:04:57 +0000
234+++ protocol/messages_test.go 2014-04-11 09:14:12 +0000
235@@ -103,3 +103,7 @@
236 b.Reset()
237 c.Check(b.splitting, Equals, 0)
238 }
239+
240+func (s *messagesSuite) TestSplitConnBrokenMsg(c *C) {
241+ c.Check((&ConnBrokenMsg{}).Split(), Equals, true)
242+}
243
244=== modified file 'server/acceptance/suites/broadcast.go'
245--- server/acceptance/suites/broadcast.go 2014-04-03 16:47:47 +0000
246+++ server/acceptance/suites/broadcast.go 2014-04-11 09:14:12 +0000
247@@ -66,8 +66,6 @@
248 })
249 c.Assert(err, IsNil)
250 c.Assert(got, Matches, ".*ok.*")
251- // xxx don't send this one
252- c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[]`)
253 c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"img1/m1":20}]`)
254 stop()
255 c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`)
256
257=== modified file 'server/broker/broker.go'
258--- server/broker/broker.go 2014-04-03 14:31:10 +0000
259+++ server/broker/broker.go 2014-04-11 09:14:12 +0000
260@@ -19,6 +19,7 @@
261 package broker
262
263 import (
264+ "errors"
265 "fmt"
266
267 "launchpad.net/ubuntu-push/protocol"
268@@ -46,6 +47,9 @@
269 Acked(sess BrokerSession, done bool) error
270 }
271
272+// ErrNop returned by Prepare means nothing to do/send.
273+var ErrNop = errors.New("nothing to send")
274+
275 // LevelsMap is the type for holding channel levels for session.
276 type LevelsMap map[store.InternalChannelId]int64
277
278
279=== modified file 'server/broker/exchanges.go'
280--- server/broker/exchanges.go 2014-04-03 16:00:53 +0000
281+++ server/broker/exchanges.go 2014-04-11 09:14:12 +0000
282@@ -28,8 +28,9 @@
283
284 // Scratch area for exchanges, sessions should hold one of these.
285 type ExchangesScratchArea struct {
286- broadcastMsg protocol.BroadcastMsg
287- ackMsg protocol.AckMsg
288+ broadcastMsg protocol.BroadcastMsg
289+ ackMsg protocol.AckMsg
290+ connBrokenMsg protocol.ConnBrokenMsg
291 }
292
293 // BroadcastExchange leads a session through delivering a BROADCAST.
294@@ -42,7 +43,7 @@
295 }
296
297 // check interface already here
298-var _ Exchange = &BroadcastExchange{}
299+var _ Exchange = (*BroadcastExchange)(nil)
300
301 // Init ensures the BroadcastExchange is fully initialized for the sessions.
302 func (sbe *BroadcastExchange) Init() {
303@@ -88,14 +89,18 @@
304
305 // Prepare session for a BROADCAST.
306 func (sbe *BroadcastExchange) Prepare(sess BrokerSession) (outMessage protocol.SplittableMsg, inMessage interface{}, err error) {
307- scratchArea := sess.ExchangeScratchArea()
308- scratchArea.broadcastMsg.Reset()
309- scratchArea.broadcastMsg.Type = "broadcast"
310 clientLevel := sess.Levels()[sbe.ChanId]
311 payloads := filterByLevel(clientLevel, sbe.TopLevel, sbe.NotificationPayloads)
312 tag := fmt.Sprintf("%s/%s", sess.DeviceImageChannel(), sess.DeviceImageModel())
313 payloads = channelFilter(tag, sbe.ChanId, payloads, sbe.Decoded)
314+ if len(payloads) == 0 && sbe.TopLevel >= clientLevel {
315+ // empty and don't need to force resync => do nothing
316+ return nil, nil, ErrNop
317+ }
318
319+ scratchArea := sess.ExchangeScratchArea()
320+ scratchArea.broadcastMsg.Reset()
321+ scratchArea.broadcastMsg.Type = "broadcast"
322 // xxx need an AppId as well, later
323 scratchArea.broadcastMsg.ChanId = store.InternalChannelIdToHex(sbe.ChanId)
324 scratchArea.broadcastMsg.TopLevel = sbe.TopLevel
325@@ -113,3 +118,24 @@
326 sess.Levels()[sbe.ChanId] = sbe.TopLevel
327 return nil
328 }
329+
330+// ConnBrokenExchange breaks a session giving a reason.
331+type ConnBrokenExchange struct {
332+ Reason string
333+}
334+
335+// check interface already here
336+var _ Exchange = (*ConnBrokenExchange)(nil)
337+
338+// Prepare session for a CONNBROKEN.
339+func (cbe *ConnBrokenExchange) Prepare(sess BrokerSession) (outMessage protocol.SplittableMsg, inMessage interface{}, err error) {
340+ scratchArea := sess.ExchangeScratchArea()
341+ scratchArea.connBrokenMsg.Type = "connbroken"
342+ scratchArea.connBrokenMsg.Reason = cbe.Reason
343+ return &scratchArea.connBrokenMsg, nil, nil
344+}
345+
346+// CONNBROKEN isn't acked
347+func (cbe *ConnBrokenExchange) Acked(sess BrokerSession, done bool) error {
348+ panic("Acked should not get invoked on ConnBrokenExchange")
349+}
350
351=== modified file 'server/broker/exchanges_test.go'
352--- server/broker/exchanges_test.go 2014-04-04 09:57:02 +0000
353+++ server/broker/exchanges_test.go 2014-04-11 09:14:12 +0000
354@@ -81,6 +81,44 @@
355 c.Check(sess.LevelsMap[store.SystemInternalChannelId], Equals, int64(3))
356 }
357
358+func (s *exchangesSuite) TestBroadcastExchangeEmpty(c *C) {
359+ sess := &testing.TestBrokerSession{
360+ LevelsMap: broker.LevelsMap(map[store.InternalChannelId]int64{}),
361+ Model: "m1",
362+ ImageChannel: "img1",
363+ }
364+ exchg := &broker.BroadcastExchange{
365+ ChanId: store.SystemInternalChannelId,
366+ TopLevel: 3,
367+ NotificationPayloads: []json.RawMessage{},
368+ }
369+ exchg.Init()
370+ outMsg, inMsg, err := exchg.Prepare(sess)
371+ c.Assert(err, Equals, broker.ErrNop)
372+ c.Check(outMsg, IsNil)
373+ c.Check(inMsg, IsNil)
374+}
375+
376+func (s *exchangesSuite) TestBroadcastExchangeEmptyButAhead(c *C) {
377+ sess := &testing.TestBrokerSession{
378+ LevelsMap: broker.LevelsMap(map[store.InternalChannelId]int64{
379+ store.SystemInternalChannelId: 10,
380+ }),
381+ Model: "m1",
382+ ImageChannel: "img1",
383+ }
384+ exchg := &broker.BroadcastExchange{
385+ ChanId: store.SystemInternalChannelId,
386+ TopLevel: 3,
387+ NotificationPayloads: []json.RawMessage{},
388+ }
389+ exchg.Init()
390+ outMsg, inMsg, err := exchg.Prepare(sess)
391+ c.Assert(err, IsNil)
392+ c.Check(outMsg, NotNil)
393+ c.Check(inMsg, NotNil)
394+}
395+
396 func (s *exchangesSuite) TestBroadcastExchangeReuseVsSplit(c *C) {
397 sess := &testing.TestBrokerSession{
398 LevelsMap: broker.LevelsMap(map[store.InternalChannelId]int64{}),
399@@ -210,3 +248,17 @@
400 c.Assert(err, IsNil)
401 c.Check(sess.LevelsMap[store.SystemInternalChannelId], Equals, int64(5))
402 }
403+
404+func (s *exchangesSuite) TestConnBrokenExchange(c *C) {
405+ sess := &testing.TestBrokerSession{}
406+ cbe := &broker.ConnBrokenExchange{"REASON"}
407+ outMsg, inMsg, err := cbe.Prepare(sess)
408+ c.Assert(err, IsNil)
409+ c.Check(inMsg, IsNil) // no answer is expected
410+ // check
411+ marshalled, err := json.Marshal(outMsg)
412+ c.Assert(err, IsNil)
413+ c.Check(string(marshalled), Equals, `{"T":"connbroken","Reason":"REASON"}`)
414+
415+ c.Check(func() { cbe.Acked(nil, true) }, PanicMatches, "Acked should not get invoked on ConnBrokenExchange")
416+}
417
418=== modified file 'server/broker/simple/simple.go'
419--- server/broker/simple/simple.go 2014-04-03 16:00:53 +0000
420+++ server/broker/simple/simple.go 2014-04-11 09:14:12 +0000
421@@ -222,6 +222,10 @@
422 delete(b.registry, sess.deviceId)
423 }
424 } else { // register
425+ prev := b.registry[sess.deviceId]
426+ if prev != nil { // kick it
427+ close(prev.exchanges)
428+ }
429 b.registry[sess.deviceId] = sess
430 sess.registered = true
431 sess.done <- true
432
433=== modified file 'server/broker/testsuite/suite.go'
434--- server/broker/testsuite/suite.go 2014-04-03 16:00:53 +0000
435+++ server/broker/testsuite/suite.go 2014-04-11 09:14:12 +0000
436@@ -164,6 +164,14 @@
437 c.Assert(err, IsNil)
438 sess2, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"})
439 c.Assert(err, IsNil)
440+ checkAndFalse := false
441+ // previous session got signaled by closing its channel
442+ select {
443+ case _, ok := <-sess1.SessionChannel():
444+ checkAndFalse = ok == false
445+ default:
446+ }
447+ c.Check(checkAndFalse, Equals, true)
448 c.Assert(s.RevealSession(b, "dev-1"), Equals, sess2)
449 b.Unregister(sess1)
450 // just to make sure the unregister was processed
451
452=== modified file 'server/session/session.go'
453--- server/session/session.go 2014-02-10 23:19:08 +0000
454+++ server/session/session.go 2014-04-11 09:14:12 +0000
455@@ -62,6 +62,9 @@
456 if err != nil {
457 return err
458 }
459+ if inMsg == nil { // no answer expected, breaking connection
460+ return &broker.ErrAbort{"session broken for reason"}
461+ }
462 err = proto.ReadMessage(inMsg)
463 if err != nil {
464 return err
465@@ -76,6 +79,7 @@
466 pingTimer := time.NewTimer(pingInterval)
467 intervalStart := time.Now()
468 ch := sess.SessionChannel()
469+Loop:
470 for {
471 select {
472 case <-pingTimer.C:
473@@ -90,10 +94,17 @@
474 return &broker.ErrAbort{"expected PONG message"}
475 }
476 pingTimer.Reset(pingInterval)
477- case exchg := <-ch:
478- // xxx later can use ch closing for shutdown/reset
479+ case exchg, ok := <-ch:
480 pingTimer.Stop()
481+ if !ok {
482+ return &broker.ErrAbort{"terminated"}
483+ }
484 outMsg, inMsg, err := exchg.Prepare(sess)
485+ if err == broker.ErrNop { // nothing to do
486+ pingTimer.Reset(pingInterval)
487+ intervalStart = time.Now()
488+ continue Loop
489+ }
490 if err != nil {
491 return err
492 }
493
494=== modified file 'server/session/session_test.go'
495--- server/session/session_test.go 2014-03-19 23:46:18 +0000
496+++ server/session/session_test.go 2014-04-11 09:14:12 +0000
497@@ -346,6 +346,42 @@
498 c.Check(err, Equals, io.EOF)
499 }
500
501+func (s *sessionSuite) TestSessionLoopKick(c *C) {
502+ nopTrack := NewTracker(s.testlog)
503+ errCh := make(chan error, 1)
504+ up := make(chan interface{}, 5)
505+ down := make(chan interface{}, 5)
506+ tp := &testProtocol{up, down}
507+ exchanges := make(chan broker.Exchange, 1)
508+ sess := &testing.TestBrokerSession{Exchanges: exchanges}
509+ go func() {
510+ errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout, nopTrack)
511+ }()
512+ close(exchanges)
513+ err := <-errCh
514+ c.Check(err, DeepEquals, &broker.ErrAbort{"terminated"})
515+}
516+
517+func (s *sessionSuite) TestSessionLoopExchangeErrNop(c *C) {
518+ nopTrack := NewTracker(s.testlog)
519+ errCh := make(chan error, 1)
520+ up := make(chan interface{}, 5)
521+ down := make(chan interface{}, 5)
522+ tp := &testProtocol{up, down}
523+ exchanges := make(chan broker.Exchange, 1)
524+ exchanges <- &testExchange{prepErr: broker.ErrNop}
525+ sess := &testing.TestBrokerSession{Exchanges: exchanges}
526+ go func() {
527+ errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout, nopTrack)
528+ }()
529+ c.Check(takeNext(down), Equals, "deadline 2ms")
530+ c.Check(takeNext(down), DeepEquals, protocol.PingPongMsg{Type: "ping"})
531+ up <- nil // no write error
532+ up <- io.EOF
533+ err := <-errCh
534+ c.Check(err, Equals, io.EOF)
535+}
536+
537 func (s *sessionSuite) TestSessionLoopExchangeSplit(c *C) {
538 nopTrack := NewTracker(s.testlog)
539 errCh := make(chan error, 1)
540@@ -434,6 +470,25 @@
541 c.Check(err, Equals, io.ErrUnexpectedEOF)
542 }
543
544+func (s *sessionSuite) TestSessionLoopConnBrokenExchange(c *C) {
545+ nopTrack := NewTracker(s.testlog)
546+ errCh := make(chan error, 1)
547+ up := make(chan interface{}, 5)
548+ down := make(chan interface{}, 5)
549+ tp := &testProtocol{up, down}
550+ exchanges := make(chan broker.Exchange, 1)
551+ exchanges <- &broker.ConnBrokenExchange{"REASON"}
552+ sess := &testing.TestBrokerSession{Exchanges: exchanges}
553+ go func() {
554+ errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout, nopTrack)
555+ }()
556+ c.Check(takeNext(down), Equals, "deadline 2ms")
557+ c.Check(takeNext(down), DeepEquals, protocol.ConnBrokenMsg{"connbroken", "REASON"})
558+ up <- nil // no write error
559+ err := <-errCh
560+ c.Check(err, DeepEquals, &broker.ErrAbort{"session broken for reason"})
561+}
562+
563 type testTracker struct {
564 SessionTracker
565 interval chan interface{}

Subscribers

People subscribed via source and target branches