Merge lp:~pedronis/ubuntu-push/kick-out-replaced into lp:ubuntu-push
- kick-out-replaced
- Merge into trunk
Proposed by Samuele Pedroni
Status: | Superseded |
---|---|
Proposed branch: | lp:~pedronis/ubuntu-push/kick-out-replaced |
Merge into: | lp:ubuntu-push |
Diff against target: | 565 lines (+283/-20), 12 files modified: client/session/session.go (+21/-1); client/session/session_test.go (+69/-4); protocol/messages.go (+21/-5); protocol/messages_test.go (+4/-0); server/acceptance/suites/broadcast.go (+0/-2); server/broker/broker.go (+4/-0); server/broker/exchanges.go (+32/-6); server/broker/exchanges_test.go (+52/-0); server/broker/simple/simple.go (+4/-0); server/broker/testsuite/suite.go (+8/-0); server/session/session.go (+13/-2); server/session/session_test.go (+55/-0) |
To merge this branch: | bzr merge lp:~pedronis/ubuntu-push/kick-out-replaced |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Ubuntu Push Hackers | | | Pending |
Review via email: mp+215374@code.launchpad.net |
Commit message
explicitly kick out superseded sessions
Description of the change
explicitly kick out superseded sessions
To post a comment you must log in.
Unmerged revisions
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === modified file 'client/session/session.go' |
2 | --- client/session/session.go 2014-04-04 12:28:40 +0000 |
3 | +++ client/session/session.go 2014-04-11 09:14:12 +0000 |
4 | @@ -49,6 +49,7 @@ |
5 | Type string `json:"T"` |
6 | protocol.BroadcastMsg |
7 | protocol.NotificationsMsg |
8 | + protocol.ConnBrokenMsg |
9 | } |
10 | |
11 | // parseServerAddrSpec recognizes whether spec is a HTTP URL to get |
12 | @@ -176,7 +177,7 @@ |
13 | // getHosts sets deliveryHosts possibly querying a remote endpoint |
14 | func (sess *ClientSession) getHosts() error { |
15 | if sess.getHost != nil { |
16 | - if sess.timeSince(sess.deliveryHostsTimestamp) < sess.HostsCachingExpiryTime { |
17 | + if sess.deliveryHosts != nil && sess.timeSince(sess.deliveryHostsTimestamp) < sess.HostsCachingExpiryTime { |
18 | return nil |
19 | } |
20 | hosts, err := sess.getHost.Get() |
21 | @@ -193,6 +194,10 @@ |
22 | return nil |
23 | } |
24 | |
25 | +func (sess *ClientSession) resetHosts() { |
26 | + sess.deliveryHosts = nil |
27 | +} |
28 | + |
29 | // startConnectionAttempt/nextHostToTry help connect iterating over candidate hosts |
30 | |
31 | func (sess *ClientSession) startConnectionAttempt() { |
32 | @@ -338,6 +343,19 @@ |
33 | return nil |
34 | } |
35 | |
36 | +// handle "connbroken" messages |
37 | +func (sess *ClientSession) handleConnBroken(connBroken *serverMsg) error { |
38 | + sess.setState(Error) |
39 | + reason := connBroken.Reason |
40 | + err := fmt.Errorf("server broke connection: %s", reason) |
41 | + sess.Log.Errorf("%s", err) |
42 | + switch reason { |
43 | + case protocol.BrokenHostMismatch: |
44 | + sess.resetHosts() |
45 | + } |
46 | + return err |
47 | +} |
48 | + |
49 | // loop runs the session with the server, emits a stream of events. |
50 | func (sess *ClientSession) loop() error { |
51 | var err error |
52 | @@ -356,6 +374,8 @@ |
53 | err = sess.handlePing() |
54 | case "broadcast": |
55 | err = sess.handleBroadcast(&recv) |
56 | + case "connbroken": |
57 | + err = sess.handleConnBroken(&recv) |
58 | } |
59 | if err != nil { |
60 | return err |
61 | |
62 | === modified file 'client/session/session_test.go' |
63 | --- client/session/session_test.go 2014-04-03 20:56:25 +0000 |
64 | +++ client/session/session_test.go 2014-04-11 09:14:12 +0000 |
65 | @@ -317,6 +317,29 @@ |
66 | c.Check(sess.deliveryHosts, DeepEquals, []string{"baz:443"}) |
67 | } |
68 | |
69 | +func (cs *clientSessionSuite) TestGetHostsRemoteCachingReset(c *C) { |
70 | + hostGetter := &testHostGetter{[]string{"foo:443", "bar:443"}, nil} |
71 | + sess := &ClientSession{ |
72 | + getHost: hostGetter, |
73 | + ClientSessionConfig: ClientSessionConfig{ |
74 | + HostsCachingExpiryTime: 2 * time.Hour, |
75 | + }, |
76 | + timeSince: time.Since, |
77 | + } |
78 | + err := sess.getHosts() |
79 | + c.Assert(err, IsNil) |
80 | + hostGetter.hosts = []string{"baz:443"} |
81 | + // cached |
82 | + err = sess.getHosts() |
83 | + c.Assert(err, IsNil) |
84 | + c.Check(sess.deliveryHosts, DeepEquals, []string{"foo:443", "bar:443"}) |
85 | + // reset |
86 | + sess.resetHosts() |
87 | + err = sess.getHosts() |
88 | + c.Assert(err, IsNil) |
89 | + c.Check(sess.deliveryHosts, DeepEquals, []string{"baz:443"}) |
90 | +} |
91 | + |
92 | /**************************************************************** |
93 | startConnectionAttempt()/nextHostToTry()/started tests |
94 | ****************************************************************/ |
95 | @@ -587,7 +610,7 @@ |
96 | json.RawMessage("false"), // shouldn't happen but robust |
97 | json.RawMessage(`{"img1/m1":[102,"tubular"]}`), |
98 | }, |
99 | - }, protocol.NotificationsMsg{}} |
100 | + }, protocol.NotificationsMsg{}, protocol.ConnBrokenMsg{}} |
101 | go func() { s.errCh <- s.sess.handleBroadcast(&msg) }() |
102 | c.Check(takeNext(s.downCh), Equals, protocol.AckMsg{"ack"}) |
103 | s.upCh <- nil // ack ok |
104 | @@ -618,7 +641,7 @@ |
105 | ChanId: "0", |
106 | TopLevel: 2, |
107 | Payloads: []json.RawMessage{json.RawMessage(`{"b":1}`)}, |
108 | - }, protocol.NotificationsMsg{}} |
109 | + }, protocol.NotificationsMsg{}, protocol.ConnBrokenMsg{}} |
110 | go func() { s.errCh <- s.sess.handleBroadcast(&msg) }() |
111 | c.Check(takeNext(s.downCh), Equals, protocol.AckMsg{"ack"}) |
112 | failure := errors.New("ACK ACK ACK") |
113 | @@ -635,7 +658,7 @@ |
114 | ChanId: "something awful", |
115 | TopLevel: 2, |
116 | Payloads: []json.RawMessage{json.RawMessage(`{"b":1}`)}, |
117 | - }, protocol.NotificationsMsg{}} |
118 | + }, protocol.NotificationsMsg{}, protocol.ConnBrokenMsg{}} |
119 | go func() { s.errCh <- s.sess.handleBroadcast(&msg) }() |
120 | c.Check(takeNext(s.downCh), Equals, protocol.AckMsg{"ack"}) |
121 | s.upCh <- nil // ack ok |
122 | @@ -652,7 +675,7 @@ |
123 | ChanId: "0", |
124 | TopLevel: 2, |
125 | Payloads: []json.RawMessage{json.RawMessage(`{"b":1}`)}, |
126 | - }, protocol.NotificationsMsg{}} |
127 | + }, protocol.NotificationsMsg{}, protocol.ConnBrokenMsg{}} |
128 | go func() { s.errCh <- s.sess.handleBroadcast(&msg) }() |
129 | s.upCh <- nil // ack ok |
130 | // start returns with error |
131 | @@ -665,6 +688,37 @@ |
132 | } |
133 | |
134 | /**************************************************************** |
135 | + handleConnBroken() tests |
136 | +****************************************************************/ |
137 | + |
138 | +func (s *msgSuite) TestHandleConnBrokenUnkwown(c *C) { |
139 | + msg := serverMsg{"connbroken", |
140 | + protocol.BroadcastMsg{}, protocol.NotificationsMsg{}, |
141 | + protocol.ConnBrokenMsg{ |
142 | + Reason: "REASON", |
143 | + }, |
144 | + } |
145 | + go func() { s.errCh <- s.sess.handleConnBroken(&msg) }() |
146 | + c.Check(<-s.errCh, ErrorMatches, "server broke connection: REASON") |
147 | + c.Check(s.sess.State(), Equals, Error) |
148 | +} |
149 | + |
150 | +func (s *msgSuite) TestHandleConnBrokenHostMismatch(c *C) { |
151 | + msg := serverMsg{"connbroken", |
152 | + protocol.BroadcastMsg{}, protocol.NotificationsMsg{}, |
153 | + protocol.ConnBrokenMsg{ |
154 | + Reason: protocol.BrokenHostMismatch, |
155 | + }, |
156 | + } |
157 | + s.sess.deliveryHosts = []string{"foo:443", "bar:443"} |
158 | + go func() { s.errCh <- s.sess.handleConnBroken(&msg) }() |
159 | + c.Check(<-s.errCh, ErrorMatches, "server broke connection: host-mismatch") |
160 | + c.Check(s.sess.State(), Equals, Error) |
161 | + // hosts were reset |
162 | + c.Check(s.sess.deliveryHosts, IsNil) |
163 | +} |
164 | + |
165 | +/**************************************************************** |
166 | loop() tests |
167 | ****************************************************************/ |
168 | |
169 | @@ -728,6 +782,17 @@ |
170 | c.Check(<-s.errCh, Equals, failure) |
171 | } |
172 | |
173 | +func (s *loopSuite) TestLoopConnBroken(c *C) { |
174 | + c.Check(s.sess.State(), Equals, Running) |
175 | + broken := protocol.ConnBrokenMsg{ |
176 | + Type: "connbroken", |
177 | + Reason: "REASON", |
178 | + } |
179 | + c.Check(takeNext(s.downCh), Equals, "deadline 1ms") |
180 | + s.upCh <- broken |
181 | + c.Check(<-s.errCh, NotNil) |
182 | +} |
183 | + |
184 | /**************************************************************** |
185 | start() tests |
186 | ****************************************************************/ |
187 | |
188 | === modified file 'protocol/messages.go' |
189 | --- protocol/messages.go 2014-03-19 22:31:20 +0000 |
190 | +++ protocol/messages.go 2014-04-11 09:14:12 +0000 |
191 | @@ -49,6 +49,27 @@ |
192 | PingInterval string |
193 | } |
194 | |
195 | +// SplittableMsg are messages that may require and are capable of splitting. |
196 | +type SplittableMsg interface { |
197 | + Split() (done bool) |
198 | +} |
199 | + |
200 | +// CONNBROKEN message, server side is breaking the connection for reason. |
201 | +type ConnBrokenMsg struct { |
202 | + Type string `json:"T"` |
203 | + // reason |
204 | + Reason string |
205 | +} |
206 | + |
207 | +func (m *ConnBrokenMsg) Split() bool { |
208 | + return true |
209 | +} |
210 | + |
211 | +// CONNBROKEN reasons |
212 | +const ( |
213 | + BrokenHostMismatch = "host-mismatch" |
214 | +) |
215 | + |
216 | // PING/PONG messages |
217 | type PingPongMsg struct { |
218 | Type string `json:"T"` |
219 | @@ -56,11 +77,6 @@ |
220 | |
221 | const maxPayloadSize = 62 * 1024 |
222 | |
223 | -// SplittableMsg are messages that may require and are capable of splitting. |
224 | -type SplittableMsg interface { |
225 | - Split() (done bool) |
226 | -} |
227 | - |
228 | // BROADCAST messages |
229 | type BroadcastMsg struct { |
230 | Type string `json:"T"` |
231 | |
232 | === modified file 'protocol/messages_test.go' |
233 | --- protocol/messages_test.go 2014-02-26 16:04:57 +0000 |
234 | +++ protocol/messages_test.go 2014-04-11 09:14:12 +0000 |
235 | @@ -103,3 +103,7 @@ |
236 | b.Reset() |
237 | c.Check(b.splitting, Equals, 0) |
238 | } |
239 | + |
240 | +func (s *messagesSuite) TestSplitConnBrokenMsg(c *C) { |
241 | + c.Check((&ConnBrokenMsg{}).Split(), Equals, true) |
242 | +} |
243 | |
244 | === modified file 'server/acceptance/suites/broadcast.go' |
245 | --- server/acceptance/suites/broadcast.go 2014-04-03 16:47:47 +0000 |
246 | +++ server/acceptance/suites/broadcast.go 2014-04-11 09:14:12 +0000 |
247 | @@ -66,8 +66,6 @@ |
248 | }) |
249 | c.Assert(err, IsNil) |
250 | c.Assert(got, Matches, ".*ok.*") |
251 | - // xxx don't send this one |
252 | - c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:1 payloads:[]`) |
253 | c.Check(NextEvent(events, errCh), Equals, `broadcast chan:0 app: topLevel:2 payloads:[{"img1/m1":20}]`) |
254 | stop() |
255 | c.Assert(NextEvent(s.ServerEvents, nil), Matches, `.* ended with:.*EOF`) |
256 | |
257 | === modified file 'server/broker/broker.go' |
258 | --- server/broker/broker.go 2014-04-03 14:31:10 +0000 |
259 | +++ server/broker/broker.go 2014-04-11 09:14:12 +0000 |
260 | @@ -19,6 +19,7 @@ |
261 | package broker |
262 | |
263 | import ( |
264 | + "errors" |
265 | "fmt" |
266 | |
267 | "launchpad.net/ubuntu-push/protocol" |
268 | @@ -46,6 +47,9 @@ |
269 | Acked(sess BrokerSession, done bool) error |
270 | } |
271 | |
272 | +// ErrNop returned by Prepare means nothing to do/send. |
273 | +var ErrNop = errors.New("nothing to send") |
274 | + |
275 | // LevelsMap is the type for holding channel levels for session. |
276 | type LevelsMap map[store.InternalChannelId]int64 |
277 | |
278 | |
279 | === modified file 'server/broker/exchanges.go' |
280 | --- server/broker/exchanges.go 2014-04-03 16:00:53 +0000 |
281 | +++ server/broker/exchanges.go 2014-04-11 09:14:12 +0000 |
282 | @@ -28,8 +28,9 @@ |
283 | |
284 | // Scratch area for exchanges, sessions should hold one of these. |
285 | type ExchangesScratchArea struct { |
286 | - broadcastMsg protocol.BroadcastMsg |
287 | - ackMsg protocol.AckMsg |
288 | + broadcastMsg protocol.BroadcastMsg |
289 | + ackMsg protocol.AckMsg |
290 | + connBrokenMsg protocol.ConnBrokenMsg |
291 | } |
292 | |
293 | // BroadcastExchange leads a session through delivering a BROADCAST. |
294 | @@ -42,7 +43,7 @@ |
295 | } |
296 | |
297 | // check interface already here |
298 | -var _ Exchange = &BroadcastExchange{} |
299 | +var _ Exchange = (*BroadcastExchange)(nil) |
300 | |
301 | // Init ensures the BroadcastExchange is fully initialized for the sessions. |
302 | func (sbe *BroadcastExchange) Init() { |
303 | @@ -88,14 +89,18 @@ |
304 | |
305 | // Prepare session for a BROADCAST. |
306 | func (sbe *BroadcastExchange) Prepare(sess BrokerSession) (outMessage protocol.SplittableMsg, inMessage interface{}, err error) { |
307 | - scratchArea := sess.ExchangeScratchArea() |
308 | - scratchArea.broadcastMsg.Reset() |
309 | - scratchArea.broadcastMsg.Type = "broadcast" |
310 | clientLevel := sess.Levels()[sbe.ChanId] |
311 | payloads := filterByLevel(clientLevel, sbe.TopLevel, sbe.NotificationPayloads) |
312 | tag := fmt.Sprintf("%s/%s", sess.DeviceImageChannel(), sess.DeviceImageModel()) |
313 | payloads = channelFilter(tag, sbe.ChanId, payloads, sbe.Decoded) |
314 | + if len(payloads) == 0 && sbe.TopLevel >= clientLevel { |
315 | + // empty and don't need to force resync => do nothing |
316 | + return nil, nil, ErrNop |
317 | + } |
318 | |
319 | + scratchArea := sess.ExchangeScratchArea() |
320 | + scratchArea.broadcastMsg.Reset() |
321 | + scratchArea.broadcastMsg.Type = "broadcast" |
322 | // xxx need an AppId as well, later |
323 | scratchArea.broadcastMsg.ChanId = store.InternalChannelIdToHex(sbe.ChanId) |
324 | scratchArea.broadcastMsg.TopLevel = sbe.TopLevel |
325 | @@ -113,3 +118,24 @@ |
326 | sess.Levels()[sbe.ChanId] = sbe.TopLevel |
327 | return nil |
328 | } |
329 | + |
330 | +// ConnBrokenExchange breaks a session giving a reason. |
331 | +type ConnBrokenExchange struct { |
332 | + Reason string |
333 | +} |
334 | + |
335 | +// check interface already here |
336 | +var _ Exchange = (*ConnBrokenExchange)(nil) |
337 | + |
338 | +// Prepare session for a CONNBROKEN. |
339 | +func (cbe *ConnBrokenExchange) Prepare(sess BrokerSession) (outMessage protocol.SplittableMsg, inMessage interface{}, err error) { |
340 | + scratchArea := sess.ExchangeScratchArea() |
341 | + scratchArea.connBrokenMsg.Type = "connbroken" |
342 | + scratchArea.connBrokenMsg.Reason = cbe.Reason |
343 | + return &scratchArea.connBrokenMsg, nil, nil |
344 | +} |
345 | + |
346 | +// CONNBROKEN isn't acked |
347 | +func (cbe *ConnBrokenExchange) Acked(sess BrokerSession, done bool) error { |
348 | + panic("Acked should not get invoked on ConnBrokenExchange") |
349 | +} |
350 | |
351 | === modified file 'server/broker/exchanges_test.go' |
352 | --- server/broker/exchanges_test.go 2014-04-04 09:57:02 +0000 |
353 | +++ server/broker/exchanges_test.go 2014-04-11 09:14:12 +0000 |
354 | @@ -81,6 +81,44 @@ |
355 | c.Check(sess.LevelsMap[store.SystemInternalChannelId], Equals, int64(3)) |
356 | } |
357 | |
358 | +func (s *exchangesSuite) TestBroadcastExchangeEmpty(c *C) { |
359 | + sess := &testing.TestBrokerSession{ |
360 | + LevelsMap: broker.LevelsMap(map[store.InternalChannelId]int64{}), |
361 | + Model: "m1", |
362 | + ImageChannel: "img1", |
363 | + } |
364 | + exchg := &broker.BroadcastExchange{ |
365 | + ChanId: store.SystemInternalChannelId, |
366 | + TopLevel: 3, |
367 | + NotificationPayloads: []json.RawMessage{}, |
368 | + } |
369 | + exchg.Init() |
370 | + outMsg, inMsg, err := exchg.Prepare(sess) |
371 | + c.Assert(err, Equals, broker.ErrNop) |
372 | + c.Check(outMsg, IsNil) |
373 | + c.Check(inMsg, IsNil) |
374 | +} |
375 | + |
376 | +func (s *exchangesSuite) TestBroadcastExchangeEmptyButAhead(c *C) { |
377 | + sess := &testing.TestBrokerSession{ |
378 | + LevelsMap: broker.LevelsMap(map[store.InternalChannelId]int64{ |
379 | + store.SystemInternalChannelId: 10, |
380 | + }), |
381 | + Model: "m1", |
382 | + ImageChannel: "img1", |
383 | + } |
384 | + exchg := &broker.BroadcastExchange{ |
385 | + ChanId: store.SystemInternalChannelId, |
386 | + TopLevel: 3, |
387 | + NotificationPayloads: []json.RawMessage{}, |
388 | + } |
389 | + exchg.Init() |
390 | + outMsg, inMsg, err := exchg.Prepare(sess) |
391 | + c.Assert(err, IsNil) |
392 | + c.Check(outMsg, NotNil) |
393 | + c.Check(inMsg, NotNil) |
394 | +} |
395 | + |
396 | func (s *exchangesSuite) TestBroadcastExchangeReuseVsSplit(c *C) { |
397 | sess := &testing.TestBrokerSession{ |
398 | LevelsMap: broker.LevelsMap(map[store.InternalChannelId]int64{}), |
399 | @@ -210,3 +248,17 @@ |
400 | c.Assert(err, IsNil) |
401 | c.Check(sess.LevelsMap[store.SystemInternalChannelId], Equals, int64(5)) |
402 | } |
403 | + |
404 | +func (s *exchangesSuite) TestConnBrokenExchange(c *C) { |
405 | + sess := &testing.TestBrokerSession{} |
406 | + cbe := &broker.ConnBrokenExchange{"REASON"} |
407 | + outMsg, inMsg, err := cbe.Prepare(sess) |
408 | + c.Assert(err, IsNil) |
409 | + c.Check(inMsg, IsNil) // no answer is expected |
410 | + // check |
411 | + marshalled, err := json.Marshal(outMsg) |
412 | + c.Assert(err, IsNil) |
413 | + c.Check(string(marshalled), Equals, `{"T":"connbroken","Reason":"REASON"}`) |
414 | + |
415 | + c.Check(func() { cbe.Acked(nil, true) }, PanicMatches, "Acked should not get invoked on ConnBrokenExchange") |
416 | +} |
417 | |
418 | === modified file 'server/broker/simple/simple.go' |
419 | --- server/broker/simple/simple.go 2014-04-03 16:00:53 +0000 |
420 | +++ server/broker/simple/simple.go 2014-04-11 09:14:12 +0000 |
421 | @@ -222,6 +222,10 @@ |
422 | delete(b.registry, sess.deviceId) |
423 | } |
424 | } else { // register |
425 | + prev := b.registry[sess.deviceId] |
426 | + if prev != nil { // kick it |
427 | + close(prev.exchanges) |
428 | + } |
429 | b.registry[sess.deviceId] = sess |
430 | sess.registered = true |
431 | sess.done <- true |
432 | |
433 | === modified file 'server/broker/testsuite/suite.go' |
434 | --- server/broker/testsuite/suite.go 2014-04-03 16:00:53 +0000 |
435 | +++ server/broker/testsuite/suite.go 2014-04-11 09:14:12 +0000 |
436 | @@ -164,6 +164,14 @@ |
437 | c.Assert(err, IsNil) |
438 | sess2, err := b.Register(&protocol.ConnectMsg{Type: "connect", DeviceId: "dev-1"}) |
439 | c.Assert(err, IsNil) |
440 | + checkAndFalse := false |
441 | + // previous session got signaled by closing its channel |
442 | + select { |
443 | + case _, ok := <-sess1.SessionChannel(): |
444 | + checkAndFalse = ok == false |
445 | + default: |
446 | + } |
447 | + c.Check(checkAndFalse, Equals, true) |
448 | c.Assert(s.RevealSession(b, "dev-1"), Equals, sess2) |
449 | b.Unregister(sess1) |
450 | // just to make sure the unregister was processed |
451 | |
452 | === modified file 'server/session/session.go' |
453 | --- server/session/session.go 2014-02-10 23:19:08 +0000 |
454 | +++ server/session/session.go 2014-04-11 09:14:12 +0000 |
455 | @@ -62,6 +62,9 @@ |
456 | if err != nil { |
457 | return err |
458 | } |
459 | + if inMsg == nil { // no answer expected, breaking connection |
460 | + return &broker.ErrAbort{"session broken for reason"} |
461 | + } |
462 | err = proto.ReadMessage(inMsg) |
463 | if err != nil { |
464 | return err |
465 | @@ -76,6 +79,7 @@ |
466 | pingTimer := time.NewTimer(pingInterval) |
467 | intervalStart := time.Now() |
468 | ch := sess.SessionChannel() |
469 | +Loop: |
470 | for { |
471 | select { |
472 | case <-pingTimer.C: |
473 | @@ -90,10 +94,17 @@ |
474 | return &broker.ErrAbort{"expected PONG message"} |
475 | } |
476 | pingTimer.Reset(pingInterval) |
477 | - case exchg := <-ch: |
478 | - // xxx later can use ch closing for shutdown/reset |
479 | + case exchg, ok := <-ch: |
480 | pingTimer.Stop() |
481 | + if !ok { |
482 | + return &broker.ErrAbort{"terminated"} |
483 | + } |
484 | outMsg, inMsg, err := exchg.Prepare(sess) |
485 | + if err == broker.ErrNop { // nothing to do |
486 | + pingTimer.Reset(pingInterval) |
487 | + intervalStart = time.Now() |
488 | + continue Loop |
489 | + } |
490 | if err != nil { |
491 | return err |
492 | } |
493 | |
494 | === modified file 'server/session/session_test.go' |
495 | --- server/session/session_test.go 2014-03-19 23:46:18 +0000 |
496 | +++ server/session/session_test.go 2014-04-11 09:14:12 +0000 |
497 | @@ -346,6 +346,42 @@ |
498 | c.Check(err, Equals, io.EOF) |
499 | } |
500 | |
501 | +func (s *sessionSuite) TestSessionLoopKick(c *C) { |
502 | + nopTrack := NewTracker(s.testlog) |
503 | + errCh := make(chan error, 1) |
504 | + up := make(chan interface{}, 5) |
505 | + down := make(chan interface{}, 5) |
506 | + tp := &testProtocol{up, down} |
507 | + exchanges := make(chan broker.Exchange, 1) |
508 | + sess := &testing.TestBrokerSession{Exchanges: exchanges} |
509 | + go func() { |
510 | + errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout, nopTrack) |
511 | + }() |
512 | + close(exchanges) |
513 | + err := <-errCh |
514 | + c.Check(err, DeepEquals, &broker.ErrAbort{"terminated"}) |
515 | +} |
516 | + |
517 | +func (s *sessionSuite) TestSessionLoopExchangeErrNop(c *C) { |
518 | + nopTrack := NewTracker(s.testlog) |
519 | + errCh := make(chan error, 1) |
520 | + up := make(chan interface{}, 5) |
521 | + down := make(chan interface{}, 5) |
522 | + tp := &testProtocol{up, down} |
523 | + exchanges := make(chan broker.Exchange, 1) |
524 | + exchanges <- &testExchange{prepErr: broker.ErrNop} |
525 | + sess := &testing.TestBrokerSession{Exchanges: exchanges} |
526 | + go func() { |
527 | + errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout, nopTrack) |
528 | + }() |
529 | + c.Check(takeNext(down), Equals, "deadline 2ms") |
530 | + c.Check(takeNext(down), DeepEquals, protocol.PingPongMsg{Type: "ping"}) |
531 | + up <- nil // no write error |
532 | + up <- io.EOF |
533 | + err := <-errCh |
534 | + c.Check(err, Equals, io.EOF) |
535 | +} |
536 | + |
537 | func (s *sessionSuite) TestSessionLoopExchangeSplit(c *C) { |
538 | nopTrack := NewTracker(s.testlog) |
539 | errCh := make(chan error, 1) |
540 | @@ -434,6 +470,25 @@ |
541 | c.Check(err, Equals, io.ErrUnexpectedEOF) |
542 | } |
543 | |
544 | +func (s *sessionSuite) TestSessionLoopConnBrokenExchange(c *C) { |
545 | + nopTrack := NewTracker(s.testlog) |
546 | + errCh := make(chan error, 1) |
547 | + up := make(chan interface{}, 5) |
548 | + down := make(chan interface{}, 5) |
549 | + tp := &testProtocol{up, down} |
550 | + exchanges := make(chan broker.Exchange, 1) |
551 | + exchanges <- &broker.ConnBrokenExchange{"REASON"} |
552 | + sess := &testing.TestBrokerSession{Exchanges: exchanges} |
553 | + go func() { |
554 | + errCh <- sessionLoop(tp, sess, cfg5msPingInterval2msExchangeTout, nopTrack) |
555 | + }() |
556 | + c.Check(takeNext(down), Equals, "deadline 2ms") |
557 | + c.Check(takeNext(down), DeepEquals, protocol.ConnBrokenMsg{"connbroken", "REASON"}) |
558 | + up <- nil // no write error |
559 | + err := <-errCh |
560 | + c.Check(err, DeepEquals, &broker.ErrAbort{"session broken for reason"}) |
561 | +} |
562 | + |
563 | type testTracker struct { |
564 | SessionTracker |
565 | interval chan interface{} |